
Skewed Join

Introduction

Parallel joins are vulnerable to the presence of skew in the underlying data. If the underlying data is sufficiently skewed, load imbalances will swamp any of the parallelism gains (1). To counteract this problem, skewed join computes a histogram of the key space and uses this data to allocate reducers for a given key. Skewed join does not place a restriction on the size of the input keys. It accomplishes this by splitting one of the inputs on the join predicate and streaming the other input.

Use cases

Skewed join can be used when the underlying data is sufficiently skewed and the user needs a finer control over the allocation of reducers to counteract the skew. It should also be used when the data associated with a given key is too large to fit in memory.

```
big = LOAD 'big_data' AS (b1,b2,b3);
massive = LOAD 'massive_data' AS (m1,m2,m3);
C = JOIN big BY b1, massive BY m1 USING "skewed";
```

In order to use skewed join,

• Skewed join currently works only with two-table inner joins.
• Append the 'using "skewed"' construct to the join to force Pig to use skewed join.
• pig.skewedjoin.reduce.memusage specifies the fraction of heap available for the reducer to perform the join. A low fraction forces Pig to use more reducers but increases the copying cost. For the PigMix tests, we have seen good performance when this value was set in the range 0.1 - 0.4. Note, however, that this is hardly an exact range: the right value depends on the amount of heap available for the operation, the number of columns in the input, and the skew, and is best found by experiment. The default value is 0.5.
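
To illustrate the trade-off, here is a minimal sketch (not Pig's actual code; the function name and all numbers are hypothetical) of how the memusage fraction drives the number of reducers pre-allocated for a skewed key:

```python
# Sketch only: a lower memusage fraction means fewer tuples fit in one
# reducer's share of the heap, hence more reducers per skewed key.
def reducers_for_key(heap_bytes, memusage, avg_mem_per_tuple, est_tuples):
    """Estimate the number of reducers needed for one skewed key."""
    available = heap_bytes * memusage              # memory usable for the join
    per_reducer = int(available / avg_mem_per_tuple)  # tuples fitting in one reducer
    return max(1, -(-est_tuples // per_reducer))   # ceiling division

# Hypothetical 1 GB heap, 150-byte tuples, 10M tuples for the key:
for frac in (0.5, 0.1, 0.025):
    print(frac, reducers_for_key(1 << 30, frac, 150, 10_000_000))
```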

Requirements

• Support a 'skewed' condition for the join command - Modify Join operator to have a "skewed" option.
• Handle considerably large skew in the input data efficiently
• Join tables whose keys are too big to fit in memory

Implementation

Skewed join translates into two map/reduce jobs - Sample and Join. The first job samples the input records and computes a histogram of the underlying key space. The second map/reduce job partitions the input table and performs the join on the predicate. To join the two tables, one of the tables is partitioned and the other is streamed to the reducer. The map task of the join job uses the pig.keydist file to determine the number of reducers per key. It then sends the key to each of the reducers in a round-robin fashion. The skewed join itself happens in the reduce phase of the join job.

Sampler phase

If the underlying data is sufficiently skewed, load imbalances will result in a few reducers getting a lot of keys. As a first task, the sampler creates a histogram of the key distribution and stores it in the pig.keydist file. In order to reduce spillage, the sampler conservatively estimates the number of rows that can be sent to a single reducer based on the memory available for the reducer. The memory available for the reducer is a product of the heap size and the memusage parameter specified by the user. Using this information it creates a map of the reducers and the skewed keys. For the table which is partitioned, the partitioner uses the key distribution to send the data to the reducer in a round robin fashion. For the table which is streamed, the mapper task uses the pig.keydist file to copy the data to each of the reduce partitions.
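
The sampler's bookkeeping can be sketched as follows. This is a simplified illustration with invented sample data, not Pig's sampler code; it produces the kind of per-key distribution that ends up in the pig.keydist file:

```python
from collections import Counter

# Hypothetical samples: (key, in-memory size in bytes) per sampled tuple.
samples = [("k1", 150), ("k1", 150), ("k2", 90), ("k3", 200), ("k1", 150)]

counts = Counter(key for key, _ in samples)
mem_total = Counter()
for key, mem in samples:
    mem_total[key] += mem

# Per-key fraction of the sample and average in-memory tuple size,
# the ingredients needed to estimate rows-per-reducer later.
keydist = {
    key: {"fraction": counts[key] / len(samples),
          "avg_mem": mem_total[key] / counts[key]}
    for key in counts
}
print(keydist["k1"])
```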

As a first stab at the implementation, we will reuse the random sampler used by ORDER BY. The sampler currently outputs neither the key distribution nor the size of the sample record; it will be modified to do both.

Join Phase

Skewed join happens in the reduce phase. As a convention, the first table in the join command is partitioned and sent to the various reducers. Partitioning allows us to support massive tables without having to worry about the memory limitations. The partitioner is overridden to send the data in a round robin fashion to each of the reducers associated with a key. The partitioner obtains the reducer information from the key distribution file. If the input is highly skewed and the number of reducers is very low, the task will bail out and report an error.

For the streaming table, since more than one reducer can be associated with a key, the streamed records that match the key need to be copied to each of these reducers. The mapper uses the key distribution in the pig.keydist file to copy the records to each of the partitions. It accomplishes this by inserting a PRop into the logical plan. The PRop attaches a partition index to each key/value pair, which the partitioner then uses to send the pair to the right reducer.
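
The fan-out for the streamed table can be sketched as follows. This is a simplified illustration: the reducer map and function names are invented, and in Pig this work is done by the PRop and the partitioner rather than a generator:

```python
# Hypothetical reducer map from the sampler: skewed key k1 owns reducers 0 and 1.
reducer_map = {"k1": [0, 1]}
total_reducers = 3

def tag_streamed(key, value):
    """Yield (reducer_index, key, value) for one streamed-table record."""
    if key in reducer_map:
        # Skewed key: a copy must reach every reducer that holds a slice
        # of the partitioned table for this key.
        for idx in reducer_map[key]:
            yield (idx, key, value)
    else:
        # Non-skewed key: a single copy, placed by ordinary hashing.
        yield (hash(key) % total_reducers, key, value)

print(list(tag_streamed("k1", "row-a")))
```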

Partition Rearrange operator

The partition rearrange operator (PRop) is an overloaded version of the local rearrange operator. Similar to local rearrange, it takes an input tuple and outputs a key/value pair with the tuple as the value. PRop, however, also outputs the reducer index along with the tuple. The reducer index is represented as a 1-byte field. This index is used by the partitioner to copy the streaming input record to the multiple reducers.

Determining the number of reducers per key

The number of reducers for a key is obtained from the key distribution file. Along with the distribution, the sampler estimates the number of reducers needed for a key by calculating the number of records that fit in a reducer. It computes this by estimating the size of the sample and the fraction of heap available to the jvm for the join operation. The fraction of heap is provided as a config parameter pig.mapred.skewedjoin.memusage by the user. Knowing the number of records per reducer helps minimize disk spillage.

Handling 3-way joins

Currently we do not support more than two tables for skewed join. Specifying 3+ way joins will fail validation. For such joins, we rely on the user to break them up into 2 way joins.

Implementation stages

The implementation of skewed join is split into two phases:

• In the first phase the skewed join uses the order by sampling to compute a histogram of the records. It then relies on user configs to pass the intermediate keys to the right reducers.
• In the second phase the current uniform random sampling used by order by will be replaced by a block level sampler which will avoid the problem of over-sampling the data for large inputs.

Skewed Join performance

We have run the PigMix suite L3 test on a Hadoop cluster to compare skewed join with the regular join. Averaged over 3 runs, skewed join took around 24 hours 30 minutes to complete, whereas the regular join had to be killed after running for 5 days.

We conducted various performance tests to come up with a "magic" value for the memusage parameter. We ran the PigMix L3 query to join an input with 9 columns against an input with 2 columns. Here are the results:

Number of tuples | Number of Reducers | Total Time | Memusage
262159 x 2607 | 2 | 8min 10sec | 0.5
262159 x 2607 | 3 | 5min 8sec | 0.3
262159 x 2607 | 5 | 3min 23sec | 0.2
262159 x 2607 | 9 | 2min 6sec | 0.1
262159 x 2607 | 18 | 1min 15sec | 0.05
262159 x 2607 | 36 | 1min 12sec | 0.025
262159 x 2607 | 90 | 1min 13sec | 0.01
262159 x 2607 | 112 | 1min 17sec | 0.008
262159 x 26195 | 2 | 77min 10sec | 0.5
262159 x 26195 | 3 | 47min 58sec | 0.3
262159 x 26195 | 5 | 27min 47sec | 0.2
262159 x 26195 | 9 | 16min 38sec | 0.1
262159 x 26195 | 18 | 8min 31sec | 0.05
262159 x 26195 | 36 | 4min 37sec | 0.025
262159 x 26195 | 90 | 3min 56sec | 0.01
262159 x 26195 | 112 | 4min 42sec | 0.008

When both the inputs had 2 columns, the value of memusage had to be even lower:

Number of tuples | Number of Reducers | Total Time | Memusage
262159 x 2607 | 1 | 11min 57sec | 0.5
262159 x 2607 | 1 | 11min 57sec | 0.3
262159 x 2607 | 1 | 11min 57sec | 0.2
262159 x 2607 | 1 | 11min 57sec | 0.1
262159 x 2607 | 1 | 11min 57sec | 0.05
262159 x 2607 | 2 | 6min 22sec | 0.025
262159 x 2607 | 5 | 2min 40sec | 0.01
262159 x 2607 | 6 | 2min 19sec | 0.008
262159 x 2607 | 14 | 1min 16sec | 0.003
262159 x 2607 | 42 | 1min 8sec | 0.001
262159 x 2607 | 83 | 1min 7sec | 0.0005
262159 x 26195 | 1 | 113min 48sec | 0.5
262159 x 26195 | 1 | 113min 48sec | 0.3
262159 x 26195 | 1 | 113min 48sec | 0.2
262159 x 26195 | 1 | 113min 48sec | 0.1
262159 x 26195 | 1 | 113min 48sec | 0.05
262159 x 26195 | 2 | 60min 17sec | 0.025
262159 x 26195 | 5 | 23min 35sec | 0.01
262159 x 26195 | 6 | 20min 9sec | 0.008
262159 x 26195 | 14 | 9min 20sec | 0.003
262159 x 26195 | 42 | 5min 41sec | 0.001
262159 x 26195 | 83 | 3min 42sec | 0.0005

As evident from the results, the performance of skewed join varies significantly with the value of memusage. We advise keeping a low value for memusage, thus using multiple reducers for the join. Note, however, that setting an extremely low value increases the copying cost, since the streaming table then needs to be copied to more reducers. We have seen good performance when this value was set in the range 0.1 - 0.4 for the PigMix tests.

Internal Changes

Skewed Partitioner

SkewedPartitioner overrides the default Hadoop partitioner. For skewed keys, it uses the key distribution to send the data to the associated reducers in a round-robin fashion; non-skewed keys fall back to the default partitioning. It maintains the reducer map as a static hashmap, along with a current index that is used to obtain the next reducer index for a key.
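
A minimal sketch of that round-robin bookkeeping, with invented names and a hypothetical reducer map (not the SkewedPartitioner source):

```python
# Hypothetical sampler output: skewed key k1 owns reducers 0 and 1.
reducer_map = {"k1": [0, 1]}
current = {}                  # per-key rotating position (round-robin state)
total_reducers = 3

def get_partition(key):
    """Return the reducer index for the next tuple of `key`."""
    if key not in reducer_map:
        return hash(key) % total_reducers   # non-skewed: ordinary hashing
    owners = reducer_map[key]
    pos = current.get(key, 0)
    current[key] = (pos + 1) % len(owners)  # advance the round-robin index
    return owners[pos]

print([get_partition("k1") for _ in range(4)])  # alternates between 0 and 1
```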

NullablePartitionWritable

This is an adapter class that adds a partition index to the NullableWritable class. The partition index is used by both the partitioning and the streaming tables. For non-skewed keys, this value is set to -1.

PigMapReduce

A new class called MapWithPartitionIndex is implemented. It contains the logic to set the appropriate key/value pair depending on whether the input is the partitioning or the streaming table. The JobControlCompiler sets this as the Map class.

POPartitionRearrange

POPartitionRearrange is a physical operator which extends POLocalRearrange. It reads the reducer map produced by the sampler phase and sets the appropriate reducer indexes. It overrides the constructPROutput and getNext methods of its parent class for this purpose: getNext loads the reducerMap if it is not yet loaded, while constructPROutput sets the reducerIndex.

MapRedUtil

This class implements a function to load and parse the reducer map obtained after the sampler phase. It is invoked by both SkewedPartitioner and POPartitionRearrange. The former is used by the partitioning table to send tuples to the reducers in a round robin fashion. The latter uses the map to stream the second input rows to each of the reducers as specified by the reducer map.

QueryParser

The parser has been changed to handle the "skewed" keyword and perform basic validations.

LOJoin

The logical operator will distinguish between the various types of join. The various types are represented as an enum JOIN_TYPE. The rest of the join operators will be modified to make use of this operator.

POSkewedJoin

A new physical operator used for skewed join. It contains the plans to retrieve join keys.

MRCompiler

A new method, visitSkewedJoin(), is added to convert POSkewedJoin into two MR jobs: a sampling job and a skewed join job.

• Creation of Sampling Job

The sampling job is created by getSkewedJoinSampleJob(). Sampling is executed only on the first table, so this method wraps the operators processed so far for the first table into an MRReduceOper, then uses the output of that MR job as the input to a new sampling job. The method collects the operators that retrieve keys from the first table and adds a new operator that calls a UDF to compute the memory usage of a tuple. These operators are used by getSamplingJob() as the transformation plan applied to the input data.

getSamplingJob() has been refactored so it can be used by both skewed join and order by. The caller specifies the POSort operator used to sort samples, the job that precedes the sampling job, the transformation plans to operate on samples, the input file, the output file, the UDF that calculates the output, the UDF arguments, and the class name of the sample loader. This method creates a new MapReduceOper for the sampling job and adds the transformation plans to its map plan. For the reduce plan, it creates an operator that adds the constant value "all" to each tuple and uses POPackage to group all tuples by the "all" column into one bag. It then creates a POForEach as its successor. Under the POForEach there are two physical input plans: the first holds a ConstantExpression with the value of the parallelism, the second holds the POSort that sorts the samples. The output of this POForEach is (parallelism, bag of samples). After that, another POForEach is created, with a physical plan that calls the UDF to generate the output results.

For skewed join, getSamplingJob() is called from getSkewedJoinSampleJob(), which passes PoissonSampleLoader as the sample loader and PartitionSkewedKeys as the UDF that generates the output results.

Following is an example of what the MR plan looks like for sampling job:

```
(Name: MapReduce(1) - 1-28:
|   Store(hdfs://localhost/tmp/temp591209209/tmp-1005985854:org.apache.pig.builtin.BinStorage) - 1-40
|   |
|   |---New For Each(false)[tuple] - 1-39
|       |   |
|       |   POUserFunc(org.apache.pig.impl.builtin.PartitionSkewedKeys)[tuple] - 1-38
|       |   |
|       |   |---Project[tuple][*] - 1-37
|       |
|       |---New For Each(false,false)[tuple] - 1-36
|           |   |
|           |   Constant(1) - 1-35
|           |   |
|           |   POSort[bag]() - 1-20
|           |   |   |
|           |   |   Project[bytearray][0] - 1-34
|           |   |
|           |   |---Project[tuple][1] - 1-33
|           |
|           |---Package[tuple]{chararray} - 1-32
|   Local Rearrange[tuple]{chararray}(false) - 1-31
|   |   |
|   |   Constant(all) - 1-30
|   |
|   |---New For Each(true,true)[tuple] - 1-29
|       |   |
|       |   Project[bytearray][0] - 1-18
|       |   |
|       |   POUserFunc(org.apache.pig.impl.builtin.TupleSize)[tuple] - 1-26
|       |   |
|       |   |---Project[tuple][*] - 1-25
|       |
```

• Creation of Skewed Join Job

After creating the sampling job, visitSkewedJoin() creates another MR job for the skewed join. It does this by creating a POLocalRearrange for the first table and a POPartitionRearrange for the second table, followed by a POGlobalRearrange and a POPackage, and then runs the MRCompiler against these operators to get the MR plan. The MR plan for this job is similar to the plan for a regular join, except that the POLocalRearrange for the second table is replaced with a POPartitionRearrange.

TupleSize

A UDF used by the sampling job to get the memory usage and disk usage of a tuple. It takes the sample tuple as input and outputs another tuple with two fields: the memory usage and the disk usage.

PartitionSkewedKeys

A UDF used by the sampling job to calculate reducer indexes for skewed keys. This UDF gets the maximum amount of memory and multiplies it by a user-specified fraction to get the memory available to hold tuples. It then walks through the sample bag, which is sorted by the sample keys, and obtains the number of samples for a given key. Using the average memory usage and disk usage for that key, it estimates how many tuples for this key exist in the input file and, from that, how many reducers the key would need.

```
Number of Tuples Fit in Memory (tupleMCount) = totalMemory / avgMemUsage
Number of Tuples from First Table (tupleCount) = (sampleCount / totalSampleCount) * (inputFileSize / avgDiskUsage)
Number of Reducers = (int) Math.round(Math.ceil((double) tupleCount / tupleMCount));
```

For example, if we assume

• total number of samples = 200
• total number of samples with key k1 = 30
• size of input file = 1G.
• totalMemory = 150M
• avgMemUsage for tuples of k1 = 150 bytes
• avgDiskUsage for tuples of k1 = 100 bytes

then,

• estimated total number of k1 that can fit in memory = 150M/150 = 1M
• estimated total number of tuples from input file = 1G/100 = 10M tuples
• estimated number of tuples for k1 from input file = (30/200) * 10M = 1.5M
• estimated total number of reducers for k1 = Math.ceil (1.5M/1M) = 2

This calculation is done on every key of samples. If a key requires more than 1 reducer, it is regarded as a skewed key, and pre-allocated with multiple reducers. The reducers are allocated to skewed keys in round robin fashion.
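
The estimate can be written out directly; plugging in the example numbers above reproduces the answer of 2 reducers for k1. This is a sketch, not the PartitionSkewedKeys source:

```python
import math

def reducers_needed(total_samples, key_samples, input_size,
                    total_memory, avg_mem, avg_disk):
    """Estimate reducers for one key from sampler statistics."""
    fit_in_memory = total_memory / avg_mem                 # tuples per reducer
    tuples_in_input = input_size / avg_disk                # tuples in input file
    key_tuples = (key_samples / total_samples) * tuples_in_input
    return math.ceil(key_tuples / fit_in_memory)

# Example from the text: 200 samples, 30 for k1, 1G input,
# 150M memory, 150 bytes/tuple in memory, 100 bytes/tuple on disk.
print(reducers_needed(200, 30, 1_000_000_000, 150_000_000, 150, 100))  # 2
```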

This UDF generates an output which will be used by the following join job. The format of the output file is a map. It has two keys:

• totalreducers: the number of total reducers for second job
• partition.list: a list of tuples with format of (skewed key, beginning index, ending index). It means for a skewed key, its tuples should be split to use reducer <beginning index> to <ending index>. <ending index> could be less than <beginning index> for the case of wrapping.

For example, if the output file is the following: {totalreducers=3, partition.list={k1,0,1}{k2,2,1}}, it means two skewed keys were found, k1 and k2. k1 needs 2 reducers: 0 and 1. k2 needs 3 reducers: 2, 0, 1.
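
A small sketch of how such a (key, begin, end) entry expands into a reducer list, including the wrap-around case (function name invented):

```python
def expand(begin, end, total_reducers):
    """Expand a partition.list index range into explicit reducer indexes."""
    if begin <= end:
        return list(range(begin, end + 1))
    # Wrapped range: run from begin to the last reducer, then from 0 to end.
    return list(range(begin, total_reducers)) + list(range(0, end + 1))

print(expand(0, 1, 3))  # k1 -> [0, 1]
print(expand(2, 1, 3))  # k2 -> [2, 0, 1]
```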

MapReduceOper

Adds a variable holding the name of the partition file produced by the sampling job. The name of this file needs to be set in the JobConf of the second (skewed join) job as reference data.

JobControlCompiler

During creation of the JobConf, it needs to check whether the MapReduceOper is for a skewed join; if it is, it sets the partition file in the job conf, along with the proper partitioner, mapper class, and output key class.

FileLocalizer

Adds a method to get the size of a file, or the total size of the files in an input path.

References

• (1) "Practical Skew Handling in Parallel Joins" - David J. Dewitt, Jeffrey F. Naughton, Donovan A. Schneider, S. Seshadri

PigSkewedJoinSpec (last edited 2009-12-15 00:52:17 by socks1)