Skewed Join

Introduction

Parallel joins are vulnerable to skew in the underlying data. If the data is sufficiently skewed, load imbalances will swamp any of the parallelism gains (1). To counteract this problem, skewed join computes a histogram of the key space and uses this data to allocate reducers for a given key. Skewed join places no restriction on the size of the input keys. It accomplishes this by splitting one of the inputs on the join predicate and streaming the other input.

Use cases

Skewed join can be used when the underlying data is sufficiently skewed and the user needs a finer control over the allocation of reducers to counteract the skew. It should also be used when the data associated with a given key is too large to fit in memory.

big = LOAD 'big_data' AS (b1,b2,b3);

massive = LOAD 'massive_data' AS (m1,m2,m3);

C = JOIN big BY b1, massive BY m1 USING 'skewed';

Requirements

In order to use skewed join,

Implementation

Skewed join translates into two map/reduce jobs - Sample and Join. The first job samples the input records and computes a histogram of the underlying key space. The second job partitions the input table and performs the join on the predicate. In order to join the two tables, one of the tables is partitioned and the other is streamed to the reducer. The map task of the join job uses the pig.keydist file to determine the number of reducers per key. It then sends the key to each of those reducers in a round robin fashion. The join itself happens in the reduce phase of the join job.

[Figure: partition.jpg]

Sampler phase

If the underlying data is sufficiently skewed, load imbalances will result in a few reducers receiving a disproportionate share of the data. As a first task, the sampler creates a histogram of the key distribution and stores it in the pig.keydist file. In order to reduce spillage, the sampler conservatively estimates the number of rows that can be sent to a single reducer, based on the memory available to the reducer. The memory available to the reducer is the product of the heap size and the memusage parameter specified by the user. Using this information, it creates a map of the skewed keys to reducers. For the table which is partitioned, the partitioner uses the key distribution to send the data to the reducers in a round robin fashion. For the table which is streamed, the mapper task uses the pig.keydist file to copy the data to each of the reduce partitions.
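The row-budget estimate described above can be sketched as follows. This is a minimal model of the idea, not the Pig implementation; the heap size, memusage value, and average in-memory tuple size below are illustrative assumptions.

```python
# Hedged sketch: estimate how many rows fit in one reducer.
# Memory available = heap size * memusage; rows = memory / avg tuple size.
def rows_per_reducer(heap_bytes, memusage, avg_tuple_mem_bytes):
    """Conservative estimate of rows a single reducer can hold in memory."""
    available = heap_bytes * memusage
    return int(available // avg_tuple_mem_bytes)

# Hypothetical numbers: a 1 GB heap, memusage of 0.3, ~200-byte tuples.
print(rows_per_reducer(1 << 30, 0.3, 200))
```

Keys whose estimated row count exceeds this budget are the ones the sampler marks as skewed and spreads over multiple reducers.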

As a first stab at the implementation, we will use the random sampler used by ORDER BY. The sampler currently outputs neither the key distribution nor the size of the sample record; it will be modified to support both.

Join Phase

Skewed join happens in the reduce phase. As a convention, the first table in the join command is partitioned and sent to the various reducers. Partitioning allows us to support massive tables without having to worry about memory limitations. The partitioner is overridden to send the data in a round robin fashion to each of the reducers associated with a key, obtaining the reducer information from the key distribution file. If the input is highly skewed and the number of reducers is very low, the task will bail out and report an error.
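The round robin behaviour of the overridden partitioner can be sketched as below. This is a simplified model, not the actual SkewedPartitioner code; the hash fallback for keys absent from the distribution is an assumption.

```python
# Hedged sketch of round-robin partitioning for skewed keys.
# reducer_map maps each skewed key to the reducer indexes the sampler
# allocated to it; other keys fall back to hash partitioning.
class RoundRobinPartitioner:
    def __init__(self, reducer_map, total_reducers):
        self.reducer_map = reducer_map    # skewed key -> [reducer indexes]
        self.total_reducers = total_reducers
        self.cursor = {}                  # per-key round-robin position

    def get_partition(self, key):
        reducers = self.reducer_map.get(key)
        if reducers is None:
            # Non-skewed key: default hash partitioning (an assumption here).
            return hash(key) % self.total_reducers
        i = self.cursor.get(key, 0)
        self.cursor[key] = (i + 1) % len(reducers)
        return reducers[i]

p = RoundRobinPartitioner({"k1": [0, 1]}, 3)
print([p.get_partition("k1") for _ in range(4)])  # cycles 0, 1, 0, 1
```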

For the streaming table, since more than one reducer can be associated with a key, the streamed records that match the key need to be copied over to each of these reducers. The mapper uses the key distribution in the pig.keydist file to copy the records over to each of the partitions. It accomplishes this by inserting a PRop into the logical plan. The PRop sets a partition index on each key/value pair, which is then used by the partitioner to send the pair to the right reducer.
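The replication step for the streamed table can be sketched as follows; the function and parameter names are illustrative, not from the Pig source.

```python
# Hedged sketch: a streamed record whose key is skewed must be emitted
# once for every reducer allocated to that key; non-skewed keys go to a
# single (default) reducer.
def replicate_streamed_record(key, value, reducer_map, default_reducer):
    """Yield (reducer_index, key, value) once per target reducer."""
    for idx in reducer_map.get(key, [default_reducer]):
        yield (idx, key, value)

reducer_map = {"k1": [0, 1]}
print(list(replicate_streamed_record("k1", "row", reducer_map, 2)))
```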

Partition Rearrange operator

The partition rearrange operator (PRop) is an overloaded version of the local rearrange operator. Similar to local rearrange, it takes an input tuple and outputs a key/value pair with the tuple being the value. PRop however outputs the reducer index along with the tuple. The reducer index is represented as a 1 byte field. This index is used by the partitioner to copy the streaming input record to the multiple reducers.
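The 1-byte reducer index can be illustrated with a hypothetical helper; this is not the PRop implementation, only a demonstration of packing the index into a single byte as the text describes.

```python
import struct

# Hedged sketch: emit (packed 1-byte reducer index, key, value), mirroring
# the (index, key/value) output shape the text attributes to PRop.
def pr_output(reducer_index, key, value):
    """Pack the reducer index into one byte, as PRop's 1-byte field."""
    if not 0 <= reducer_index <= 255:
        raise ValueError("reducer index must fit in one byte")
    return (struct.pack("B", reducer_index), key, value)

idx, k, v = pr_output(5, "k1", ("f1", "f2"))
print(struct.unpack("B", idx)[0])  # → 5
```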

Determining the number of reducers per key

The number of reducers for a key is obtained from the key distribution file. Along with the distribution, the sampler estimates the number of reducers needed for a key by calculating the number of records that fit in a reducer. It computes this by estimating the size of the sample record and the fraction of heap available to the JVM for the join operation. The fraction of heap is provided by the user as the config parameter pig.mapred.skewedjoin.memusage. Knowing the number of records per reducer helps minimize disk spillage.

Handling 3-way joins

Currently we do not support more than two tables for skewed join. Specifying a 3+ way join will fail validation. For such joins, we rely on the user to break them up into 2-way joins.

Implementation stages

The implementation of skewed join is split into two phases:

Skewed Join performance

We have run the PigMix suite L3 test on a Hadoop cluster to compare skewed join with the regular join. Averaged over 3 runs, skewed join took around 24 hours 30 minutes to complete, whereas the regular join had to be killed after running for 5 days.

We conducted various performance tests to come up with a "magic" value for the memusage parameter. We ran the PigMix L3 query to join an input with 9 columns against an input with 2 columns. Here are the results:

Number of tuples    Number of Reducers    Total Time     Memusage
262159 x 2607       2                     8min 10sec     0.5
262159 x 2607       3                     5min 8sec      0.3
262159 x 2607       5                     3min 23sec     0.2
262159 x 2607       9                     2min 6sec      0.1
262159 x 2607       18                    1min 15sec     0.05
262159 x 2607       36                    1min 12sec     0.025
262159 x 2607       90                    1min 13sec     0.01
262159 x 2607       112                   1min 17sec     0.008
262159 x 26195      2                     77min 10sec    0.5
262159 x 26195      3                     47min 58sec    0.3
262159 x 26195      5                     27min 47sec    0.2
262159 x 26195      9                     16min 38sec    0.1
262159 x 26195      18                    8min 31sec     0.05
262159 x 26195      36                    4min 37sec     0.025
262159 x 26195      90                    3min 56sec     0.01
262159 x 26195      112                   4min 42sec     0.008

When both the inputs had 2 columns, the value of memusage had to be even lower:

Number of tuples    Number of Reducers    Total Time     Memusage
262159 x 2607       1                     11min 57sec    0.5
262159 x 2607       1                     11min 57sec    0.3
262159 x 2607       1                     11min 57sec    0.2
262159 x 2607       1                     11min 57sec    0.1
262159 x 2607       1                     11min 57sec    0.05
262159 x 2607       2                     6min 22sec     0.025
262159 x 2607       5                     2min 40sec     0.01
262159 x 2607       6                     2min 19sec     0.008
262159 x 2607       14                    1min 16sec     0.003
262159 x 2607       42                    1min 8sec      0.001
262159 x 2607       83                    1min 7sec      0.0005
262159 x 26195      1                     113min 48sec   0.5
262159 x 26195      1                     113min 48sec   0.3
262159 x 26195      1                     113min 48sec   0.2
262159 x 26195      1                     113min 48sec   0.1
262159 x 26195      1                     113min 48sec   0.05
262159 x 26195      2                     60min 17sec    0.025
262159 x 26195      5                     23min 35sec    0.01
262159 x 26195      6                     20min 9sec     0.008
262159 x 26195      14                    9min 20sec     0.003
262159 x 26195      42                    5min 41sec     0.001
262159 x 26195      83                    3min 42sec     0.0005

As evident from the results, the performance of skewed join varies significantly with the value of memusage. We advise keeping a low value for memusage, thus using multiple reducers for the join. Note that setting an extremely low value increases the copying cost, since the streaming table then needs to be copied to more reducers. We have seen good performance when this value was set in the range of 0.1 - 0.4 for the PigMix tests.

Internal Changes

Skewed Partitioner

SkewedPartitioner overrides the default Hadoop partitioner. For skewed keys, it uses the key distribution to send the data to the reducers in a round robin fashion; non-skewed keys fall back to default hash partitioning. It maintains the reducer map as a static hashmap, along with a current index which is used to obtain the next reducer index.

NullablePartitionWritable

This is an adapter class which provides a partition index to the NullableWritable class. The partition index is used by both the partitioning and the streaming table. For non-skewed keys, this value is set to -1.

PigMapReduce

A new class called MapWithPartitionIndex is implemented. This class has the logic to set the appropriate key/value depending on whether the input is the partitioning or the streaming table. The JobControlCompiler sets this as the Map class.

POPartitionRearrange

POPartitionRearrange is a physical operator which extends POLocalRearrange. It reads the reducer map from the sampler phase and sets the appropriate reducer indexes. For this purpose, it overrides the constructPROutput and getNext methods of its parent class: getNext loads the reducerMap if it is not loaded, whereas constructPROutput sets the reducerIndex.

MapRedUtil

This class implements a function to load and parse the reducer map obtained after the sampler phase. It is invoked by both SkewedPartitioner and POPartitionRearrange. The former is used by the partitioning table to send tuples to the reducers in a round robin fashion. The latter uses the map to stream the second input rows to each of the reducers as specified by the reducer map.

QueryParser

The parser is changed to handle the 'skewed' keyword and perform basic validation.

LOJoin

The logical operator will distinguish between the various types of join. The various types are represented as an enum JOIN_TYPE. The rest of the join operators will be modified to make use of this operator.

POSkewedJoin

A new physical operator used for skewed join. It contains the plans to retrieve join keys.

MRCompiler

A new method, visitSkewedJoin(), is added to convert POSkewedJoin into two MR jobs: a sampling job and a skewed join job.

The getSamplingJob() method is refactored to be used by both skewed join and order by. The caller specifies the POSort operator used to sort samples, the previous job before the sampling job, the transformation plans to operate on samples, the input file, the output file, the UDF to calculate output, the UDF arguments, and the class name of the sample loader. This method creates a new MapReduceOper for the sampling job, then adds the transformation plans into its map plan. For the reduce plan, the method creates an operator to add a constant value "all" into the tuple, and uses POPackage to group all tuples by the "all" column into one bag. It then creates a POForEach as its successor. Under the POForEach, there are two physical plans for inputs: the first has a ConstantExpression with the value of the parallelism, and the second has the POSort to sort samples. The output from this POForEach contains (parallelism, bag of samples). After that, another POForEach is created, which has a physical plan to call the UDF to generate the output results.

For skewed join, getSamplingJob() is called from getSkewedJoinSampleJob(), which passes PoissonSampleLoader as the sample loader and PartitionSkewedKeys as the UDF to generate output results.

Following is an example of what the MR plan looks like for sampling job:

(Name: MapReduce(1) - 1-28:
|   Store(hdfs://localhost/tmp/temp591209209/tmp-1005985854:org.apache.pig.builtin.BinStorage) - 1-40
|   |
|   |---New For Each(false)[tuple] - 1-39
|       |   |
|       |   POUserFunc(org.apache.pig.impl.builtin.PartitionSkewedKeys)[tuple] - 1-38
|       |   |
|       |   |---Project[tuple][*] - 1-37
|       |
|       |---New For Each(false,false)[tuple] - 1-36
|           |   |
|           |   Constant(1) - 1-35
|           |   |
|           |   POSort[bag]() - 1-20
|           |   |   |
|           |   |   Project[bytearray][0] - 1-34
|           |   |
|           |   |---Project[tuple][1] - 1-33
|           |
|           |---Package[tuple]{chararray} - 1-32
|   Local Rearrange[tuple]{chararray}(false) - 1-31
|   |   |
|   |   Constant(all) - 1-30
|   |
|   |---New For Each(true,true)[tuple] - 1-29
|       |   |
|       |   Project[bytearray][0] - 1-18
|       |   |
|       |   POUserFunc(org.apache.pig.impl.builtin.TupleSize)[tuple] - 1-26
|       |   |
|       |   |---Project[tuple][*] - 1-25
|       |
|       |---Load(hdfs://localhost/tmp/temp591209209/tmp-1276626748:org.apache.pig.impl.builtin.PoissonSampleLoader('org.apache.pig.builtin.BinStorage','100')) - 1-27 Operator Key: 1-28)

TupleSize

A UDF used by the sampling job to get the memory usage and disk usage of a tuple. It takes the sample tuple as input and outputs another tuple with two fields: the memory usage and the disk usage.

PartitionSkewedKeys

A UDF used by the sampling job to calculate reducer indexes for skewed keys. This UDF takes the maximum amount of memory and multiplies it by a user-specified percentage to get the memory available to hold tuples. It then goes through the sample bag, which is sorted by the sample keys, and gets the number of samples for a given key. Using the average memory usage and disk usage for that key, it estimates how many tuples for this key exist in the input file, and therefore how many reducers the key would need.

       Number of Tuples Fit in Memory (tupleMCount) = totalMemory / avgMemUsage
       Number of Tuples from First Table (tupleCount) = (sampleCount / totalSampleCount) * (inputFileSize / avgDiskUsage)
       Number of Reducers = (int) Math.round(Math.ceil((double) tupleCount / tupleMCount));

For example, if we assume

then,
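The original worked example did not survive in this copy; the following sketch plugs purely hypothetical numbers into the formulas above (none of these figures are from the wiki).

```python
import math

# Hypothetical inputs: suppose the sampler saw 200 samples of a key out
# of 1000 total, tuples average 256 bytes in memory and 64 bytes on
# disk, the input file is 1 GB, and 100 MB of memory is available.
total_memory   = 100 * 1024 * 1024   # bytes available per reducer
avg_mem_usage  = 256                 # avg in-memory bytes per tuple
avg_disk_usage = 64                  # avg on-disk bytes per tuple
input_size     = 1 << 30             # input file size in bytes
sample_count, total_sample_count = 200, 1000

# Number of tuples that fit in memory.
tuple_m_count = total_memory / avg_mem_usage
# Estimated number of tuples for this key in the input file.
tuple_count = (sample_count / total_sample_count) * (input_size / avg_disk_usage)
# Number of reducers needed for this key.
reducers = int(math.ceil(tuple_count / tuple_m_count))
print(reducers)  # → 9
```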

This calculation is done on every key of samples. If a key requires more than 1 reducer, it is regarded as a skewed key, and pre-allocated with multiple reducers. The reducers are allocated to skewed keys in round robin fashion.
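The round robin allocation of reducers to skewed keys can be sketched as below. This is a simplified model, not the PartitionSkewedKeys code; with 3 reducers it happens to reproduce the k1/k2 allocation used as an example later in this document.

```python
# Hedged sketch: hand out reducer indexes to skewed keys in round robin
# fashion, wrapping around when the total number of reducers is reached.
def allocate(skewed_keys, total_reducers):
    """skewed_keys: list of (key, reducers_needed). Returns key -> indexes."""
    allocation, next_idx = {}, 0
    for key, needed in skewed_keys:
        allocation[key] = [(next_idx + i) % total_reducers for i in range(needed)]
        next_idx = (next_idx + needed) % total_reducers
    return allocation

print(allocate([("k1", 2), ("k2", 3)], 3))  # → {'k1': [0, 1], 'k2': [2, 0, 1]}
```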

This UDF generates an output which is used by the following join job. The format of the output file is a map with two keys: totalreducers and partition.list.

For example, if the output file contains {totalreducers=3, partition.list={k1,0,1}{k2,2,1}}, then two skewed keys were found, k1 and k2. k1 needs 2 reducers: 0 and 1. k2 needs 3 reducers: 2, 0 and 1.
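A sketch of parsing this textual rendering follows. The real file is produced by the sampling job (via BinStorage), so this parser is purely illustrative; interpreting the second and third fields of each entry as the first and last reducer index (with wraparound) is an inference from the example above, not confirmed against the Pig source.

```python
import re

# Hedged sketch: parse "{totalreducers=N, partition.list={key,first,last}...}".
def parse_keydist(text):
    total = int(re.search(r"totalreducers=(\d+)", text).group(1))
    partition = {}
    for key, start, end in re.findall(r"\{(\w+),(\d+),(\d+)\}", text):
        start, end = int(start), int(end)
        # Reducer indexes run from start to end inclusive, wrapping at total.
        if end >= start:
            partition[key] = list(range(start, end + 1))
        else:
            partition[key] = list(range(start, total)) + list(range(0, end + 1))
    return total, partition

print(parse_keydist("{totalreducers=3, partition.list={k1,0,1}{k2,2,1}}"))
```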

MapReduceOper

A variable is added to hold the name of the partition file produced by the sampling job. The name of this file needs to be set into the JobConf of the second (skewed join) job as reference data.

JobControlCompiler

During creation of the JobConf, the compiler checks whether the MapReduceOper is for a skewed join; if it is, it sets the partition file into the job conf, and sets the proper partitioner, mapper class and output key class.

FileLocalizer

A method is added to get the size of a file, or the total size of the files in an input path.

References

PigSkewedJoinSpec (last edited 2009-12-15 00:52:17 by yinghe)