Proposal for hash-based aggregation in map (in-map combiner)


Pig does (sort-based) partial aggregation on the map side through the use of the combiner. MR serializes the map output to a buffer, sorts it on the keys, deserializes it, and passes the values grouped by key to the combiner phase. The same work the combiner does can be done in the map phase itself by using a hash-map on the keys. This hash-based (partial) aggregation can be done with or without a combiner phase.
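To make the idea concrete, here is a minimal sketch (class and method names are illustrative, not Pig APIs) of map-side partial aggregation for a SUM: instead of letting MR sort-and-group the records for the combiner, the map task folds each value into a running partial aggregate keyed by the group key.

```java
import java.util.HashMap;
import java.util.Map;

// Illustrative sketch: hash-based partial aggregation of (key, value)
// records inside the map, for a SUM aggregate.
public class InMapCombine {
    public static Map<String, Long> partialSums(String[][] records) {
        Map<String, Long> partials = new HashMap<>();
        for (String[] rec : records) {
            String key = rec[0];
            long val = Long.parseLong(rec[1]);
            // Fold the new value into the existing partial aggregate.
            partials.merge(key, val, Long::sum);
        }
        return partials; // these partials become the map output
    }
}
```

For records {a,1}, {b,2}, {a,3} this emits one partial per key (a=4, b=2) instead of three raw records, which is exactly the output-size reduction the combiner provides, without the serialize-sort-deserialize cycle.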




Design Option 1

In this option, the grouping of values by key that MR performs before invoking the combiner plan is simulated within the map using a hash-map. When the hash-map grows big enough, the accumulated groups are run through the combiner plan, and the results of that in-map combine execution are written as the map output. This option is easier to implement, but it is not memory-efficient: input tuples are kept around until the configured memory limit forces them to be combined. As a result, fewer key-values fit in memory at a time, fewer values get aggregated per key, and the map output is larger.
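A rough sketch of this option, with assumed names and a record-count threshold standing in for a real memory limit: tuples are accumulated per key, and only when the map grows past the threshold is the combine function (here just SUM) run over each accumulated group, mimicking what MR's sort-and-group would have fed the combiner.

```java
import java.util.*;

// Illustrative sketch of Design Option 1: buffer whole groups, then run
// the combine plan over them when the buffer gets too big.
public class Option1Combine {
    static final int THRESHOLD = 4; // stand-in for a memory-based limit

    final Map<String, List<Long>> groups = new HashMap<>();
    final Map<String, Long> output = new LinkedHashMap<>();

    void collect(String key, long value) {
        // Keep the raw tuple around, grouped by key, like the sort would.
        groups.computeIfAbsent(key, k -> new ArrayList<>()).add(value);
        if (totalValues() >= THRESHOLD) {
            combineAll();
        }
    }

    private int totalValues() {
        int n = 0;
        for (List<Long> g : groups.values()) n += g.size();
        return n;
    }

    // Run the "combine plan" (here just SUM) over every accumulated group
    // and emit the results as map output.
    void combineAll() {
        for (Map.Entry<String, List<Long>> e : groups.entrySet()) {
            long sum = e.getValue().stream().mapToLong(Long::longValue).sum();
            output.merge(e.getKey(), sum, Long::sum);
        }
        groups.clear();
    }

    Map<String, Long> drain() { combineAll(); return output; }
}
```

The memory problem is visible in the sketch: every raw value sits in `groups` until the threshold fires, so the budget is consumed by unaggregated tuples rather than by one compact partial per key.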

Query plan

The MapReduceOper class, which represents an MR job in the Pig MR plan, will have a new member, inMapCombinePlan, which is a PhysicalPlan. In the initial implementation, the combiner physical plan (the member called combinePlan) can be cloned into it.
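The classes below are stand-ins, not the real Pig classes (MapReduceOper and PhysicalPlan have much richer APIs); the sketch only shows where the proposed member would live and how it could be seeded by cloning the existing combiner plan.

```java
// Illustrative stand-ins for the Pig plan classes named in the proposal.
public class PlanSketch {
    static class PhysicalPlan {
        final String desc;
        PhysicalPlan(String desc) { this.desc = desc; }
        PhysicalPlan copy() { return new PhysicalPlan(desc); } // stand-in for plan cloning
    }

    static class MapReduceOper {
        PhysicalPlan combinePlan;      // existing member
        PhysicalPlan inMapCombinePlan; // proposed new member

        // Initial implementation: seed the in-map combine plan by cloning
        // the combiner plan, when one exists.
        void seedInMapCombine() {
            if (combinePlan != null) {
                inMapCombinePlan = combinePlan.copy();
            }
        }
    }
}
```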

But to support in-map combining in cases where the combiner does not get used (e.g. when there is a bag or non-algebraic udf), the MR plan optimizer rules need to change. In such cases, the output types of the map and combine plans would differ, which could be a problem.

Plan execution

A new class that extends PigMapBase will have a collect call that collects the key-values into a hash-map. The hash-map will spill into the combine plan when its estimated size exceeds a configurable threshold. This would be similar to the InternalCacheBag implementation.
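A sketch of that collect call, with assumed names and a crude fixed per-entry byte cost standing in for the kind of sampled size estimation InternalCacheBag does: the footprint estimate grows with each collected key-value, and a spill callback (representing the combine plan) fires when the estimate crosses the configured limit.

```java
import java.util.*;

// Illustrative sketch of threshold-driven spilling from the map's
// hash-map into the combine plan.
public class SpillingCollector {
    interface SpillHandler { void spill(Map<String, List<Long>> groups); }

    static final long BYTES_PER_ENTRY = 64; // crude stand-in for size sampling
    final long memLimitBytes;
    final SpillHandler handler;
    final Map<String, List<Long>> groups = new HashMap<>();
    long estimatedBytes = 0;

    SpillingCollector(long memLimitBytes, SpillHandler handler) {
        this.memLimitBytes = memLimitBytes;
        this.handler = handler;
    }

    void collect(String key, long value) {
        groups.computeIfAbsent(key, k -> new ArrayList<>()).add(value);
        estimatedBytes += BYTES_PER_ENTRY;
        if (estimatedBytes >= memLimitBytes) {
            handler.spill(groups); // run the combine plan over the groups
            groups.clear();
            estimatedBytes = 0;
        }
    }
}
```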

Design Option 2

In this option, there will be a new physical operator, POGroupHash, that does the hash-based aggregation. It will be the last node in the map plan of the MR job corresponding to the group operation. Once a group key (set) has accumulated two, or some other small number of, values, the operator will compute the new partial aggregate value and store it in the hash-map. Memory management of the hash-map will be similar to that of InternalCacheBag: it will estimate its own memory footprint and flush some percentage (5%?) of entries when it exceeds a configurable memory limit. The least recently used keys can be chosen for flushing, and the flushed entries are written as map output.
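The POGroupHash operator and the 5% figure come from the proposal above, but the code is an assumed illustration: partial aggregates are kept per key in an access-ordered map (so iteration order is least-recently-used first), and when a size limit is hit the LRU fraction of entries is flushed as map output.

```java
import java.util.*;

// Illustrative sketch of the POGroupHash idea: aggregate eagerly per key,
// flush the least recently used entries when over the limit.
public class GroupHashSketch {
    static final int MAX_KEYS = 100;           // stand-in for a memory limit
    static final double FLUSH_FRACTION = 0.05; // flush ~5% of entries

    // accessOrder=true makes iteration order least-recently-used first.
    final Map<String, Long> partials = new LinkedHashMap<>(16, 0.75f, true);
    final List<Map.Entry<String, Long>> flushed = new ArrayList<>();

    void aggregate(String key, long value) {
        partials.merge(key, value, Long::sum); // fold into the partial aggregate
        if (partials.size() > MAX_KEYS) {
            flushLru();
        }
    }

    void flushLru() {
        int toFlush = Math.max(1, (int) (partials.size() * FLUSH_FRACTION));
        Iterator<Map.Entry<String, Long>> it = partials.entrySet().iterator();
        while (toFlush-- > 0 && it.hasNext()) {
            Map.Entry<String, Long> e = it.next();
            flushed.add(Map.entry(e.getKey(), e.getValue())); // written as map output
            it.remove();
        }
    }
}
```

Because a key flushed early can reappear later with a new partial, the reducer must still merge partials for the same key, just as it does for ordinary combiner output.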

It might be useful to have a new udf interface that accepts one tuple at a time to compute a partial aggregate, so that a new bag does not have to be created for each tuple that needs to be aggregated. Alternatively, the bag-creation overhead and the overhead of calling the udf many times could both be reduced by calling the udf only after a few values have accumulated in the hash-map.
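Such a tuple-at-a-time interface could look like the hypothetical sketch below (this is not an existing Pig API): the operator folds each incoming value into the partial directly, and a merge method lets partials flushed at different times be recombined downstream.

```java
// Hypothetical udf interface sketch: consume one value at a time,
// so no bag has to be built per key just to invoke the aggregate.
public class AccumulatorSketch {
    interface PartialAgg<V, P> {
        P init();
        P accumulate(P partial, V value); // fold one tuple into the partial
        P merge(P a, P b);                // combine two partials downstream
    }

    // A SUM aggregate expressed against the interface.
    static final PartialAgg<Long, Long> SUM = new PartialAgg<Long, Long>() {
        public Long init() { return 0L; }
        public Long accumulate(Long p, Long v) { return p + v; }
        public Long merge(Long a, Long b) { return a + b; }
    };
}
```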

This design option will have a smaller memory footprint because input tuples can be aggregated sooner, and also a smaller output size because more records can be held in the hash-map. But it involves more work: MR plan generation will need to change to use the new relational operator when the query is a 'combinable' query, and this can have an impact on other visitors and optimizer rules as well.

PigHashBasedAggInMap (last edited 2011-06-13 22:52:44 by nat-dip6)