Differences between revisions 16 and 17
Revision 16 as of 2009-01-15 01:57:02
Size: 8179
Editor: OlgaN
Revision 17 as of 2009-09-20 23:38:22
Size: 8179
Editor: localhost
Comment: converted to 1.6 markup
No differences found!

Join Framework


x This document provides a comprehensive view of performing joins in Pig. By JOIN here we mean traditional inner/outer SQL joins which in Pig are realized via COGROUP followed by flatten of the relations.

Some of the approaches described in this document can also be applied to CROSS and GROUP as well.


Currently, Pig running on top of Hadoop executes all joins in the same way. During the map stage, the data from each relation is annotated with the index of that relation. Then, the data is sorted and partitioned by the join key and provided to the reducer. This is similar to SQL's hash join. The data from the same relation is guaranteed to be continuous for the same key. This is to allow optimization that only keep N-1 relations in memory.

In some situations, more efficient join implementations can be constructed if more is known about the data of the relations. They are described in the section.

Pre-partitioned Join (PPJ)

This join type takes advantage of the fact that the data of all relations is already partition by the join key or its prefix which means that the join can be done completely independently on separate nodes. It further helps if the data is sorted on the key; otherwise it might have to get sorted before the join.

Also if some but not other tables are partitioned on the join key, the unpartitioned tables can be shuffled before the join.

In the case of Hadoop, this type of join means that the join can be done in a Map avoiding SORT/SHUFFLE/REDUCE stages. The performance would be even better if the partitions for the same key ranges were co-located on the same nodes and if the computation was scheduled to run on this nodes. However, for now this is outside of Pig's control.

Note that GROUP can take advantage of this knowledge as well.

To support this type of joing the data can be layed out in 2 ways. First, the data is globally sorted on the join key and range index is available. Second, the data is staticly partitioned in a fixed number of buckets using the same partitioning function. The first approach is more flexible since it allows arbitrary level of parallelism for processing the data but it is more complex and expensive to generate the data. Also, an open question for the second approach is how to identify matching partitions; using file name seems like a pretty fragile approach.

Fragment Replicate Join (FRJ)

This join type takes advantage of the fact that N-1 relations in the join are small. In this case, the small tables can be copied onto all the nodes and be joined with the data from the larger table. This saves the cost of sorting and partitioning the large table. The performance benefits can be even greater if small tables fit into main memory; otherwise, both the small tables and the partition of the large need to be sorted which is still better than having to shuffle the large table.

There are a couple of open questions here:

  1. How do we know that the data is small enough to use this type of join. If the join comes right after the load, we know the input data sizes. However, if data undergone many transformations in between we would not be able to tell. Seems like at least initially, we should limit this to joins coming right after load or to FRJ explicitly requested by the user.
  2. How do we know if the data will fit into memory. One way to do it is to try to load it and if it fails to fall back to the other type. We need to investigate how feasible this approach is.

If you have several larger tables in the join, it might be beneficial to split the join to fit FRJ pattern since it would significantly reduce the size of the data going into the next join and might even allow to use FRJ again.

For Hadoop this type of join means that the join can happen on the map side.

The data coming out of the join is not guaranteed to be sorted on the join key which could cause problems for queries that follow join by GROUP or ORDER BY on the prefix of the join key. This should be taken into account when choosing join type.

Note that CROSS can take advantage of this approach as well.

Details of the current implementation and performance measurements can be found in http://wiki.apache.org/pig/PigFRJoin.

Indexed Join (IJ)

This join type takes advantage of the fact that one or more tables participating in the join have index on the join key or its prefix. This is similar in structure to FRJ join but could be even more efficient since processing time can be proportional to the size of the non-indexed and hopefully smaller table.

In Hadoop, this will also result in a map side join.

Currently neither Pig nor Hadoop have indexing structure. So getting to this point might take some time and needs some compelling use cases to make the investment.


To choose best join algorithm, additional information about the data is required. This data can be stored with the data or in a separate repository in which case Pig can consume this data and make choices on user's behalf. However, part of Pig philosophy is to eat anything which means in this case to operate correctly and as efficiently as possible in the absence of the metadata. Also, even if metadata is available user should be able to disable its use.

Metadata Available

If metadata is available pig will pull this metadata and use it as part of optimization. The details of how this would be done is beyond the scope of this document. The required data would need to be communicated as part of Pig requirements of the metadata repository whenever one is available.

A user can disable optimizations by using set command:

set optimizations.all 'off'

We might choose later to allow only particular types of optimizations to be disabled like:

set optimizations.join 'off'
set optimizations.reorder 'off'

Also, a user should be able to specify a particular type of join to perform even if it contradicts the choices made by the optimizer. To support this we would need to extend the JOIN keyword to support outer joins and also to support Join type.

C = JOIN A by name, B by name USING <JOIN TYPE>;

The JOIN TYPE is a string that represents a type of a join like partitioned, ordered partitioned indexed, replicated, etc.

For FRJ join type, the order of the tables might have to imply which table is to be replicated. For IJ, the index metadata will need to be supplied.

No Metadata Available

If no external meta data is available, the user would need to provide additional information to help the optimizer to make good choice. The user could also explicitly specify the join TYPE to use as shown above.

For PPJ, the needed meta data is partition key and sort key. This information can be provided by extending LOAD statement

A = LOAD 'data' using PigStorage() as (x, y, z) PARTITIONED BY (x, y) SORTED BY (x,y);

Also, the user might choose not to give the information and just force the join type by doing

C = JOIN A by name, B by name USING 'partitioned';
C = JOIN A by name, B by name USING 'ordered partitioned';

It is reasonable to allow the user to force the join type for simple cases when all tables fit the same profile like all partitioned and ordered. But if different tables have different profiles like one is both partitioned and ordered, another only partitioned and yet another neither, we would force users to provide per table metadata for pig to make right choices.

For FRJ, we don't really need additional metadata for now. We can just work of input data sizes. Eventually, it would be nice to estimate the actual amount of data going into join as oppose to just input sizes but for that we would need data statistics. For this particular type, explicit user specification via join type might be useful.

For indexed join, user needs to provide index information including index key and how to find the index. Not going to elaborate here since we are ways of from supporting this.

JoinFramework (last edited 2009-09-20 23:38:22 by localhost)