Accumulator UDF

Introduction

For data processing with PIG, it is very common to call "group by" or "cogroup" to group input tuples by a key, then call one or more UDFs to process each group. For example:

A = load 'mydata';
B = group A by $0;
C = foreach B generate group, myUDF1(A), myUDF2(A, 'some_param'), myUDF3(A);
store C into 'myresult';

The current implementation is during grouping process, all tuples that belongs to the same key are materialized into a DataBag, and the DataBag(s) are passed to the UDFs. This causes performance and memory problem. For a large key, if its tuples can not fit into memory, performance has to sacrifice to spill extra data into disk.

Since many UDFs do not really need to see all the tuples that belongs to a key at the same time, it is possible to pass those tuples as batches. A good example would be like COUNT(), SUM(). Tuples can be passed to UDFs in accumulative manner. When all the tuples are passed, the final method is called to retrieve the value. This way, we can minimize the memory usage and improve performance by avoiding data spill.

UDF change

An Accumulator interface is defined. UDFs that are able to process tuples in accumulative manner should implement this interface. It is defined as following:

public interface Accumulator <T> {
    /**
     * Pass tuples to the UDF.  You can retrive DataBag by calling b.get(index).
     * Each DataBag may contain 0 to many tuples for current key
     */
    public void accumulate(Tuple b) throws IOException;

    /**
     * Called when all tuples from current key have been passed to accumulate.
     * @return the value for the UDF for this key.
     */
    public T getValue();

    /**
     * Called after getValue() to prepare processing for next key.
     */
    public void cleanup();

}

UDF should still extend EvalFunc as before. The PIG engine would detect based on context whether tuples can be processed accumulatively. If not, then regular EvalFunc would be called. Therefore, for a UDF, both interfaces should be implemented properly

Use Cases

PIG engine would process tuples accumulatively only when all of the UDFs implements Accumulator interface. If one of the UDF is not Accumulator, then all UDFs are called by their EvalFunc interface as regular UDFs. Following are examples accumulator interface of UDFs would be called:

When to Call Accumulator

Therefore, if under POForEach, there are multiple UDFs, some are Accumulators,while some are not, the Accumulator would be off.

Design

Once the optimizer detects that the reduce plan can run accumulatively, it set a flag to POPackage and POForEach to indicate the data is going to be processed in accumulative mode. POForEach in turn sets this flat to all the operations of its input plans.

During runtime, POPackages creates a tuple with AccumultiveBag as its fields. This bag wraps up an AccumulativeTupleFeeder, which has a handler to the reducer Iterator to pull next batch of tuples. It also has a buffer to hold tuples of current batch. All AccumulativeBag shares the same feeder. The tuple generated by POPackage is passed to POForeach, POForeach is able to get AccumulativeTupleFeeder from AccumulativeBag. It then calls feeder.nextBatch() to fill the AccumulativeBag with first batch of tuples, pass them to the POUserFunc. Because POUserFunc is marked as accumulative, it would call the accumulate() of the UDF. The POUserFunc returns with a code of STATUS_BATCH_OK. Then POForeach pulls next batch, and so on until the last batch of tuples are retrieved and processed. At the end, POForeach notifies POUserFunc that accumulation is done. It makes a final call to POUserFunc, which in turn calls getValue() to return the final result.

Following is the sequence diagram of the data flow:

/homes/yinghe/Desktop/SequenceDiagram.jpg

Internal Changes

Accumulator

PhysicalOperator

MapReduceLauncher

AccumulatorOptimizer

POStatus

POForEach

AccumulativeBag

AccumulativeTupleBuffer

POPackage

ExpressionOperator

POUserFunc

Other ExpressionOperators

Buildin UDFs

Performance Test

PigAccumulatorSpec (last edited 2009-11-16 19:15:10 by yinghe)