Skewed Join

Introduction

Parallel joins are vulnerable to skew in the underlying data: if the data is sufficiently skewed, load imbalances will swamp any of the parallelism gains (1). To counteract this problem, skewed join computes a histogram of the key space and uses this data to allocate reducers for a given key. Skewed join places no restriction on the size of the input keys. It accomplishes this by splitting one of the inputs on the join predicate and streaming the other input.

Use cases

Skewed join can be used when the underlying data is sufficiently skewed and the user needs finer control over the allocation of reducers to counteract the skew. It should also be used when the data associated with a given key is too large to fit in memory.

big = LOAD 'big_data' AS (b1,b2,b3);
massive = LOAD 'massive_data' AS (m1,m2,m3);
C = JOIN big BY b1, massive BY m1 USING "skewed";

In order to use skewed join,

  • Skewed join currently works with two-table inner join.
  • Append the 'using "skewed"' clause to the join to force Pig to use skewed join.
  • pig.skewedjoin.reduce.memusage specifies the fraction of the heap available for the reducer to perform the join. A low fraction forces Pig to use more reducers but increases the copying cost. For the PigMix tests, we have seen good performance when this value is set in the range 0.1 - 0.4. However, this is hardly a universal range: the right value depends on the amount of heap available for the operation, the number of columns in the input, and the skew, and is best obtained by experiment. The default value is 0.5. The sketch after this list shows the arithmetic this fraction controls.
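To make the parameter concrete, here is a minimal, illustrative Java sketch (not Pig's actual internals) of the budget it controls, following the rule stated later in this spec that the memory available to a reducer for the join is the product of the heap size and memusage:

public class MemusageBudget {
    public static void main(String[] args) {
        // Per this spec: join memory budget = reducer JVM heap * memusage.
        // 0.2 stands in for the value of pig.skewedjoin.reduce.memusage.
        long heapBytes = Runtime.getRuntime().maxMemory();
        double memusage = 0.2;
        long joinBudgetBytes = (long) (heapBytes * memusage);
        System.out.println("Join memory budget: " + joinBudgetBytes + " bytes");
    }
}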

Requirements

  • Support a 'skewed' condition for the join command - Modify Join operator to have a "skewed" option.
  • Handle considerably large skew in the input data efficiently
  • Join tables whose keys are too big to fit in memory

Implementation

Skewed join translates into two map/reduce jobs: Sample and Join. The first job samples the input records and computes a histogram of the underlying key space. The second job partitions the input table and performs the join on the predicate. To join the two tables, one of the tables is partitioned and the other is streamed to the reducers. The map task of the join job uses the pig.keydist file to determine the number of reducers per key, and then sends records for a key to each of its reducers in a round-robin fashion. The join itself happens in the reduce phase of the join job.
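Conceptually, the key distribution maps each skewed key to the set of reducers reserved for it. The following Java sketch is illustrative only; the names and in-memory representation are assumptions, not the actual pig.keydist format:

import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Illustrative shape of the sampled key distribution: each skewed key
// owns a set of reducer partitions; all other keys fall through to
// ordinary hash partitioning.
public class KeyDistribution {
    private final Map<String, List<Integer>> skewedKeyToReducers = new HashMap<>();

    public KeyDistribution() {
        // e.g. the heavily skewed key "us" owns reducers 3, 4 and 5
        skewedKeyToReducers.put("us", Arrays.asList(3, 4, 5));
    }

    // null means the key is not skewed
    public List<Integer> reducersFor(String key) {
        return skewedKeyToReducers.get(key);
    }
}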

[Figure: partition.jpg]

Sampler phase

If the underlying data is sufficiently skewed, load imbalances will cause a few reducers to receive most of the records. As its first task, the sampler creates a histogram of the key distribution and stores it in the pig.keydist file. To reduce spillage, the sampler conservatively estimates the number of rows that can be sent to a single reducer, based on the memory available to the reducer. The memory available to the reducer is the product of the heap size and the memusage parameter specified by the user. Using this information, the sampler builds a map from skewed keys to reducers. For the table that is partitioned, the partitioner uses this key distribution to send data to the reducers in a round-robin fashion. For the table that is streamed, the map task uses the pig.keydist file to copy matching data to each of the reduce partitions associated with a key.
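A minimal Java sketch of the sampler's bookkeeping, assuming a uniform random sample of keys is already available; the record-size estimation and the scale-up from sample counts to full-input counts are elided:

import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Illustrative sampler pass: count sampled keys, then flag as skewed any
// key whose estimated row count exceeds what one reducer can hold.
public class Sampler {
    public static Map<String, Long> histogram(List<String> sampledKeys) {
        Map<String, Long> counts = new HashMap<>();
        for (String key : sampledKeys) {
            counts.merge(key, 1L, Long::sum);
        }
        return counts;
    }

    // rowsPerReducer is derived from heap size * memusage / record size.
    public static boolean isSkewed(long estimatedRows, long rowsPerReducer) {
        return estimatedRows > rowsPerReducer;
    }
}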

As a first stab at the implementation, we will reuse the random sampler used by ORDER BY. The sampler currently outputs neither the key distribution nor the size of the sample records; it will be modified to do so.

Join Phase

Skewed join happens in the reduce phase. By convention, the first table in the join command is the one that is partitioned and sent to the various reducers. Partitioning allows us to support massive tables without running into memory limitations. The partitioner is overridden to send the data in a round-robin fashion to each of the reducers associated with a key; it obtains the reducer information from the key distribution file. If the input is highly skewed and the number of reducers is too low, the task bails out and reports an error.
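A minimal sketch of this round-robin routing for the partitioned table, reusing the illustrative key distribution above; the real implementation overrides Hadoop's partitioner, which is elided here:

import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Illustrative round-robin partitioner: a skewed key cycles through its
// reserved reducers; a non-skewed key uses ordinary hash partitioning.
public class SkewedPartitioner {
    private final Map<String, List<Integer>> skewedKeyToReducers;
    private final Map<String, Integer> sent = new HashMap<>();

    public SkewedPartitioner(Map<String, List<Integer>> distribution) {
        this.skewedKeyToReducers = distribution;
    }

    public int getPartition(String key, int numReducers) {
        List<Integer> reducers = skewedKeyToReducers.get(key);
        if (reducers == null) {
            return (key.hashCode() & Integer.MAX_VALUE) % numReducers;
        }
        int n = sent.merge(key, 1, Integer::sum) - 1; // records seen so far
        return reducers.get(n % reducers.size());
    }
}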

For the streamed table, since more than one reducer can be associated with a key, the streamed records that match the key need to be copied over to each of those reducers. The map function uses the key distribution in the pig.keydist file to copy the records over to each of the partitions. It accomplishes this by inserting a PRop into the logical plan. The PRop attaches a partition index to each key/value pair, which the partitioner then uses to send the pair to the right reducer.

Partition Rearrange operator

The partition rearrange operator (PRop) is an overloaded version of the local rearrange operator. Like local rearrange, it takes an input tuple and outputs a key/value pair with the tuple as the value. PRop, however, also outputs a reducer index along with the tuple, represented as a 1-byte field. This index is used by the partitioner to copy a streamed input record to multiple reducers.
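A rough sketch of what PRop emits for a streamed record, again using illustrative names (the 1-byte index is modeled as an int here):

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Illustrative PRop output for the streamed table: one (index, key, value)
// triple per reducer associated with the key, so a copy of the record
// reaches every reducer that may hold matching partitioned rows.
public class PartitionRearrange {
    public record Emitted(int reducerIndex, String key, String value) {}

    public static List<Emitted> rearrange(String key, String value,
                                          List<Integer> reducersForKey) {
        List<Emitted> out = new ArrayList<>();
        for (int reducer : reducersForKey) {
            out.add(new Emitted(reducer, key, value)); // replicated copy
        }
        return out;
    }

    public static void main(String[] args) {
        // key "us" owns reducers 3, 4 and 5, so the record is emitted 3 times
        System.out.println(rearrange("us", "row1", Arrays.asList(3, 4, 5)));
    }
}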

Determining the number of reducers per key

The number of reducers for a key is obtained from the key distribution file. Along with the distribution, the sampler estimates the number of reducers needed for a key by calculating the number of records that fit in one reducer. It computes this from the estimated size of a sample record and the fraction of the heap available to the JVM for the join operation, provided by the user as the config parameter pig.skewedjoin.reduce.memusage. Knowing the number of records per reducer helps minimize disk spillage.
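The arithmetic, as a minimal sketch under the same assumptions as above (avgRecordBytes would come from measuring sampled records):

// Illustrative estimate: rows per reducer = (heap * memusage) / record size;
// reducers for a key = ceil(estimated rows for key / rows per reducer).
public class ReducerEstimate {
    public static int reducersForKey(long estimatedRowsForKey, long heapBytes,
                                     double memusage, long avgRecordBytes) {
        long budgetBytes = (long) (heapBytes * memusage);
        long rowsPerReducer = Math.max(1, budgetBytes / avgRecordBytes);
        return (int) Math.ceil((double) estimatedRowsForKey / rowsPerReducer);
    }

    public static void main(String[] args) {
        // e.g. 10M rows of ~200 bytes for one key, 1 GB heap, memusage 0.2
        System.out.println(reducersForKey(10_000_000L, 1L << 30, 0.2, 200)); // 10
    }
}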

Handling 3-way joins

Currently we do not support more than two tables for skewed join; specifying a join of three or more tables will fail validation. For such joins, we rely on the user to break them up into two-way joins.

Implementation stages

The implementation of skewed join is split into two phases:

  • In the first phase, skewed join uses the ORDER BY sampler to compute a histogram of the records, and relies on user configs to pass the intermediate keys to the right reducers.
  • In the second phase, the uniform random sampling used by ORDER BY will be replaced by a block-level sampler, which avoids over-sampling the data for large inputs.

Skewed Join performance

We ran the PigMix L3 test on a Hadoop cluster to compare skewed join with the regular join. Averaged over 3 runs, skewed join took around 24 hours 30 minutes to complete, whereas the regular join had to be killed after running for 5 days.

We also conducted various performance tests to find a "magic" value for the memusage parameter. We ran the PigMix L3 query to join an input with 9 columns against an input with 2 columns. Here are the results:

Number of tuples   Number of reducers   Total time    Memusage
262159 x 2607      2                    8min 10sec    0.5
262159 x 2607      3                    5min 8sec     0.3
262159 x 2607      5                    3min 23sec    0.2
262159 x 2607      9                    2min 6sec     0.1
262159 x 2607      18                   1min 15sec    0.05
262159 x 2607      36                   1min 12sec    0.025
262159 x 2607      90                   1min 13sec    0.01
262159 x 2607      112                  1min 17sec    0.008
262159 x 26195     2                    77min 10sec   0.5
262159 x 26195     3                    47min 58sec   0.3
262159 x 26195     5                    27min 47sec   0.2
262159 x 26195     9                    16min 38sec   0.1
262159 x 26195     18                   8min 31sec    0.05
262159 x 26195     36                   4min 37sec    0.025
262159 x 26195     90                   3min 56sec    0.01
262159 x 26195     112                  4min 42sec    0.008

When both the inputs had 2 columns, the value of memusage had to be even lower:

Number of tuples   Number of reducers   Total time    Memusage
262159 x 2607      1                    11min 57sec   0.5
262159 x 2607      1                    11min 57sec   0.3
262159 x 2607      1                    11min 57sec   0.2
262159 x 2607      1                    11min 57sec   0.1
262159 x 2607      1                    11min 57sec   0.05
262159 x 2607      2                    6min 22sec    0.025
262159 x 2607      5                    2min 40sec    0.01
262159 x 2607      6                    2min 19sec    0.008
262159 x 2607      14                   1min 16sec    0.003
262159 x 2607      42                   1min 8sec     0.001
262159 x 2607      83                   1min 7sec     0.0005
262159 x 26195     1                    113min 48sec  0.5
262159 x 26195     1                    113min 48sec  0.3
262159 x 26195     1                    113min 48sec  0.2
262159 x 26195     1                    113min 48sec  0.1
262159 x 26195     1                    113min 48sec  0.05
262159 x 26195     2                    60min 17sec   0.025
262159 x 26195     5                    23min 35sec   0.01
262159 x 26195     6                    20min 9sec    0.008
262159 x 26195     14                   9min 20sec    0.003
262159 x 26195     42                   5min 41sec    0.001
262159 x 26195     83                   3min 42sec    0.0005

As is evident from the results, the performance of skewed join varies significantly with the value of memusage. We advise keeping memusage low, thus using multiple reducers for the join. Note, however, that an extremely low value increases the copying cost, since the streamed table must then be copied to more reducers. For the PigMix tests we saw good performance when this value was set in the range 0.1 - 0.4.

References

  • (1) David J. DeWitt, Jeffrey F. Naughton, Donovan A. Schneider, S. Seshadri. "Practical Skew Handling in Parallel Joins."
