# Pig Merge Join

## Problem Statement

Often users' data is stored such that both inputs are already totally sorted on the join key. In this case, it is possible to join the data in the map phase of a MapReduce job, which provides a significant performance improvement over passing all of the data through unneeded sort and shuffle phases.

## Proposed Solution

Pig will implement a merge join algorithm (or sort-merge join, although in this case the sort is already assumed to have been done). As with other join algorithm choices Pig will not attempt to make a choice for the user at this point. The user will instruct Pig to use this algorithm with a using clause:

`C = join A by $0, B by $0 using "merge";`

Pig will implement this algorithm by selecting the left input of the join as the input file for the map phase and the right input as the side file. It will then sample records from the right input to build an index that contains, for each sampled record, the key(s), the filename, and the offset into the file at which the record begins. This sampling will be done in an initial map-only job. A second MR job will then be initiated, with the left input as its input. Each map will use the index to seek to the appropriate record in the right input and begin doing the join.
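The index lookup a map task performs at startup can be sketched in Python. This is illustrative only; `find_seek_entry` and the list-of-tuples index layout are assumptions of this sketch, not Pig's actual code:

```python
import bisect

def find_seek_entry(index, first_left_key):
    """index: list of (key, filename, offset) entries sorted by key.
    Return the entry whose offset the map should seek to, or None if
    the split starts before every indexed key. An entry whose key equals
    first_left_key is deliberately skipped: that key may have started
    before the sampled record, so we must back up one entry."""
    keys = [entry[0] for entry in index]
    pos = bisect.bisect_left(keys, first_left_key)  # first entry >= key
    return index[pos - 1] if pos > 0 else None
```

The map task then opens the right input, seeks to the returned offset, and begins advancing through records from there.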

## Preconditions for merge join

In the first release, merge join will only work under the following conditions:

* Both inputs are sorted in *ascending* order of join keys. If an input consists of many files, there should be a total ordering across the files in *ascending order of file name*. For example, if one of the inputs to the join is a directory called input1 with files a and b under it, the data should be sorted in ascending order of join key when read starting at a and ending at b. Likewise, if an input directory has part files part-00000, part-00001, part-00002, and part-00003, the data should be sorted if the files are read in the sequence part-00000, part-00001, part-00002, part-00003.
* The merge join has only two inputs.
* The loadfunc for the right input of the join must implement the SamplableLoader interface (PigStorage does implement it).
* Only inner join will be supported.
* Between the load of the sorted input and the merge join statement there can only be filter statements and foreach statements, where each foreach statement must meet the following conditions:
  * There are no UDFs in the foreach statement.
  * The foreach statement does not change the position of the join keys.
  * There is no transformation on the join keys that would change the sort order.
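The first precondition (a global ascending sort across part files read in file-name order) can be checked with a small script. This is a hypothetical helper, not part of Pig, and it assumes one record per line with a caller-supplied key extractor:

```python
import os

def is_totally_sorted(input_dir, key_of):
    """Return True if keys are non-decreasing across all files in
    input_dir, visiting files in ascending file-name order
    (part-00000, part-00001, ...), as merge join requires."""
    prev_key = None
    for name in sorted(os.listdir(input_dir)):
        with open(os.path.join(input_dir, name)) as f:
            for line in f:
                key = key_of(line)
                if prev_key is not None and key < prev_key:
                    return False
                prev_key = key
    return True
```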

### Performance precondition

* For optimal performance, each part file of the left (sorted) input of the join should be at least one HDFS block in size (for example, if the HDFS block size is 128 MB, each part file should be at least 128 MB). If the total input size (including all part files) is less than a block size, the part files should be uniform in size (without large skews). The main idea is to eliminate skew in the amount of input that the final map job performing the merge join will process.

In local mode, merge join will fall back to the regular join.

## Implementation Details

### Logical Plan

In the logical plan, use of this join will be recorded in !LOJoin (similar to the way fragment-replicate join and skew join are).

### Physical Plan

In the physical plan a !POMergeJoin operator will be created. It will contain the logic to implement the join. The logic will be:

```
open left input;
open index;
// Even if an index entry's key equals the first left key, select the
// previous entry, since that key may begin before the sampled record.
find last entry in index with key < first left key;
open right input;
seek to offset in right input indicated by selected index entry;
while (left input has records) {
    advance right input until right key >= left key;
    if (right key == left key) {
        read left records until the key changes, storing them in a list;
        while (right key is the same) {
            join right record with each left record in the list;
            read next right record;
        }
    }
    // else: no match for this left key; move on to the next left key
}
```
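The pseudocode above can be sketched as a runnable Python function. This is illustrative only, operating on in-memory lists of (key, value) tuples rather than Pig's record streams and index:

```python
def merge_join(left, right):
    """Inner merge join of two inputs, each a list of (key, value)
    tuples sorted ascending by key. Returns a list of
    (key, left_value, right_value) tuples for every matching pair."""
    results = []
    r = 0  # cursor into the right input
    i = 0  # cursor into the left input
    while i < len(left):
        key = left[i][0]
        # Read left records until the key changes, storing them in a list.
        group = []
        while i < len(left) and left[i][0] == key:
            group.append(left[i][1])
            i += 1
        # Advance the right input until right key >= left key.
        while r < len(right) and right[r][0] < key:
            r += 1
        # Join while the right key matches.
        while r < len(right) and right[r][0] == key:
            for left_value in group:
                results.append((key, left_value, right[r][1]))
            r += 1
    return results
```

For example, joining `[(2, 'b'), (2, 'c')]` on the left with `[(2, 'y')]` on the right produces both pairings of the duplicate left key, mirroring the buffered-list behavior in the pseudocode.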

### Map Reduce Plan

The MR compiler will introduce a sampling MR job before the MR job that contains the !POMergeJoin. (The sampling algorithm is described below.) This sampling job can read as input the output of the previous map reduce job (or if there is no previous map reduce job the initial input file) even if there are physical operators before the !POMergeJoin in the current MR job. No MR boundary is created immediately before the sampling as there is with order by or skew join. For example:

```
A = load 'input1';
B = load 'input2';
C = filter A by $1 is not null;
D = join B by $0, C by $0 using "merge";
```

can produce a map reduce plan:

```
Job 1 (sampling):
Map:    sample the right input and emit index entries
Reduce: sort the index

Job 2:
Map:    filter -> join
Reduce: none (map-only)
```

The reason for this difference is that a key's location in the file is not affected by the filter, so the sample need not be taken after the filter; in the skew join and order by cases, by contrast, the skew of the keys may be affected by the filter.

The sampling algorithm will need to record the key, the filename, and the offset into the input file at which the record begins. This is done by the MergeJoinIndexer, which extracts the key(s) from each input tuple and appends the filename and offset.

How many records per block to sample (and thus how large to make the index) is not clear. Initially it will sample one record per block. We can then experiment to understand the space and performance trade-offs of increasing the number of records sampled per block.
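The one-record-per-block sampling can be sketched as follows. This is an assumption-laden illustration, not Pig code: it assumes newline-delimited records with a tab-separated join key in the first field, and it samples the first complete record at or after each block boundary:

```python
def build_index(filename, block_size):
    """Build a sparse index with one entry per block of `filename`.
    Each entry is (key, filename, offset), where offset is the byte
    position at which the sampled record begins."""
    index = []
    with open(filename, 'rb') as f:
        file_size = f.seek(0, 2)  # seek to end to learn the size
        pos = 0
        while pos < file_size:
            f.seek(pos)
            if pos != 0:
                f.readline()  # skip the (possibly partial) record at the boundary
            offset = f.tell()
            line = f.readline()
            if not line:
                break
            key = line.split(b'\t', 1)[0]  # first field is the join key
            index.append((key, filename, offset))
            pos += block_size
    return index
```

At join time a map task would binary-search this index for the last entry whose key is strictly less than its first left key, then seek the right input to that entry's offset.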

### Local Mode

In local mode !LOJoin should not be translated to !POMergeJoin, even when the user requests a sort merge join. We do not need to implement a version of this join that does not require the sampling.

## Outer Join

This design will work for inner joins and, with slight modifications, for left outer joins. It will not work for right outer or full outer joins. If we wish to extend it to those cases at some point in the future, it will have to be modified to also sample the left input. The reason is that in the current implementation !POMergeJoin does not know how far past the end of its input to keep accepting non-matching keys on the right side. It will need to know the key on which the next block of the left input starts in order to determine when it should stop reading keys from the right input. A sampling pass on the left input that reads the first key of each block could provide this information.

(Is the intent that each map task will, at the end of its input, continue reading keys from the right side until the first key of the next block and perform the outer join? From the first key of the next block onwards, the map task corresponding to that block will handle the processing. The extra corner case is that, for the first key of the left input, the outer join for all the right keys less than that key will need to be done by the map task processing that first key; that first key would be the first entry in the index for the left side.) Perhaps a figure will help illustrate:

```
Left input block 1:   ==================
                        keys 25 .. 35

Left input block 2:   ==================
                        keys 45 .. 65

Right input:          ==========================================
                        keys 10 .. 24 | 25 .. 35 | .. 44 | 45 ..
```

The first map would need to know that it is the first map and hence handle the outer join for all values on the right side less than the first key (10 up to 24). It would then handle the join of all values present in the first block of the left input (25 to 35). It would also need to continue reading the right side up to the first value in the next block of the left input (i.e., up to 44 inclusive). The next map (on the second block) would handle the join of all values from 45 to EOF on the right side.
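The right-key range each map task would be responsible for can be expressed as a small function. This is a hypothetical sketch of the scheme described above; it assumes a sampling pass over the left input has produced the first key of each left block (such a pass does not exist in the current implementation):

```python
def right_key_range(block_index, left_block_first_keys):
    """Return (lo, hi): the map task for left block `block_index` must
    cover right-side keys k with lo <= k < hi for full-outer semantics.
    None means unbounded on that side.
    left_block_first_keys: first key of each left block, in order,
    from the assumed left-side sampling pass."""
    lo = None if block_index == 0 else left_block_first_keys[block_index]
    hi = (left_block_first_keys[block_index + 1]
          if block_index + 1 < len(left_block_first_keys) else None)
    return lo, hi
```

With the figure's data (`first_keys = [25, 45]`), the first map covers right keys below 45, i.e. 10 through 44 inclusive, and the second map covers 45 to EOF, matching the description above.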

In the current implementation (r806281), only inner joins are supported.

## Multiway Join

This algorithm could theoretically be extended to support joins of three or more inputs. For now it will not be; Pig will give an error if users give more than two inputs to a merge join. Users who wish to do three-plus-way joins with this algorithm can decompose them into a series of two-way joins.

## Performance Benchmark

We benchmarked the performance of merge join; the numbers are in the table below. Since joins produce a large number of output rows (n-squared in the worst case), the output rows are filtered out and not written to disk, so runtimes consist mostly of CPU time. Data size is approximately 1.5 GB per 1M rows.

| # of rows (left) | # of rows (right) | Sym-Hash Join | Merge-Join | Remarks |
|------------------|-------------------|---------------|------------|---------|
| 100K | 100K | 1m30.588s | 0m41.764s | |
| 5M | 5M | 1m5.579s | 0m51.535s | |
| 15M | 15M | 2m55.812s | 2m16.906s | |
| 15M | 15M | 5m21.030s | 4m12.125s | |
| 20M | 20M | 10m30.819s | 5m42.302s | |
| 20M | 20M | 12m25.878s | 5m42.302s | |
| 20M | 20M | 15m56.533s | 7m57.646s | |
| 20M | 20M | 112m20.478s | 17m16.542s | Zipf Key Distribution |
| 100M | 20M | 30mins | 15mins | |
| 50M | 100M | 42mins | 28mins | |
| 50M | 100M | 42mins | 28mins | |
| 50M | 100M | 42mins | 28mins | |
| 100M | 100M | 74mins | 44mins | |

In our tests, we found merge join to be faster than symmetric hash join by anywhere from 30% to 100%, depending on the size of the data, skew in the keys, the number of map waves run, etc. One thing that particularly affects merge join is how the input data is distributed among files. We recommend that all input files be approximately equal in size, or at least as large as a block.

## Phase 2

Phase 1, which was committed in r804310, has a few limitations. They are enumerated below with possible solutions:

Predecessors: only filter and foreach are currently allowed as predecessors of merge join.

MRCompiler maintains state while compiling physical operators. One piece of that state is the list of MR jobs that have already been created; these jobs contain the pipelines of physical operators that have already been compiled. In the case of merge join, at least two MR jobs will have been created by the time POMergeJoin is visited. POMergeJoin needs to identify which of these MR jobs corresponds to the left input and which to the right. It does so by matching its predecessor physical operators in the physical plan against the physical operators in the compiled MR jobs. This is not a reliable way: confusion arises especially when a preceding operator generated more than one MR job (e.g., in the case of order-by). To make merge join work in these scenarios we need a reliable way of knowing which physical operator belongs to which MR job. A proposal to fix this is to introduce a PhyOpToMROp map in the spirit of LogToPhyMap. More details at: https://issues.apache.org/jira/browse/PIG-858

Sort order: data must be sorted in ascending order.

In POMergeJoin, comparison of keys should be done by a comparator that can be set based on user input.

End-of-all-input: POMergeJoin needs to know when it is being called for the last time. It does so by checking the end-of-all-input flag. The problem is that it assumes that when this flag is true, the pipeline is running without any input and with status EOP. This holds in all cases except when one of the predecessors of the merge join is streaming: streaming also makes use of the end-of-all-input flag and can potentially generate one or more tuples after the flag is set.

getNext() in POMergeJoin should be updated so that it does not make this assumption.

Performance: currently POMergeJoin buffers all the tuples belonging to the same key on the left side before looking at the right key. If the keys don't match, it throws away that buffer and moves on. A better way is to look at both keys first and only then decide whether buffering is needed. This will not save CPU time, because the amount of work is nearly equal in either case, but it can reduce the memory footprint: for a skewed key that doesn't match, we would otherwise be consuming memory unnecessarily.
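The proposed fix can be sketched as follows. This is an illustrative Python fragment, not the actual POMergeJoin code; `process_left_group` and its in-memory list arguments are inventions of this sketch:

```python
def process_left_group(left, i, right_key):
    """left: list of (key, value) tuples sorted by key; i: index where
    the current left group starts; right_key: the current right key.
    Returns (group, next_i). The group is materialized only when its
    key matches right_key -- the proposed Phase 2 behavior. A
    non-matching (possibly skewed) group is skipped without buffering."""
    key = left[i][0]
    if key != right_key:
        # Advance past the group without storing its tuples.
        while i < len(left) and left[i][0] == key:
            i += 1
        return None, i
    group = []
    while i < len(left) and left[i][0] == key:
        group.append(left[i][1])
        i += 1
    return group, i
```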

PigMergeJoin (last edited 2009-09-20 23:38:38 by localhost)