Introduction

Task: Redesign Pig logical plan structure and execution engine

Goals:

  • Address current inefficiencies.
  • Open up new opportunities for optimization.
  • Support arbitrary operator DAGs (which may arise in a single pig program, or when jointly executing groups of interrelated programs).

Logical Plan Structure

Current Problems:

  • We use operators to manipulate the outermost dataset, but eval specs to manipulate the nested data, which reduces code reuse and increases complexity.
  • Eval specs are assumed to form a linear chain, which makes splits and general DAGs difficult to support.

Proposal:

  1. Get rid of eval specs; make everything operators.
  2. Since Pig deals with nested data models and allows manipulation of nested data, it is only natural for the logical plan structure to be fully nestable; e.g., the FOREACH operator can have a nested query plan that it uses to process each input tuple.
  3. Execute the outermost operators using their map-reduce implementations, if any (see the table below). Execute all nested query plans using local execution mode.
  4. Add a SPLIT operator that replicates input data along multiple outgoing branches. This will help us support multiple outputs and DAGs.

Advantages of a nested query plan:

  • The same operator set is used for processing both the outermost and the nested data: no code duplication, easier to understand.
  • Can reuse the local execution mode to process the nested query plans.
  • Allows for a future generalization of Pig Latin where the language within FOREACH is the same as that outside it.

Here is a list of proposed operators. For each operator we give its attributes, the number of inputs it has in the query plan, its semantics, and whether its map-reduce (M-R) and local implementations are the same or different.

LOAD
  • Attributes: file names, load function
  • Inputs: 0
  • Semantics: Loads the contents of the files into a bag, using the given load function.
  • Implementation: Same (with a file system abstraction layer).

STORE
  • Attributes: file name, store function; in future, a hierarchy of fields to partition by
  • Inputs: 1
  • Semantics: Stores the input bag into the given file (in future, partitioned by the given field hierarchy).
  • Implementation: Same (with a file system abstraction layer).

FOREACH
  • Attributes: nested query plan
  • Inputs: 1
  • Semantics: Applies the nested query plan to each item of the input bag to produce an output bag.
  • Implementation: Same.

GENERATE
  • Attributes: none
  • Inputs: >=1
  • Semantics: Computes the output(s) of its child query plan(s) and concatenates them into output tuple(s). This will typically be the topmost operator in the nested query plan within the FOREACH operator.
  • Implementation: Same.

FILTER
  • Attributes: nested query plan for the condition
  • Inputs: 1
  • Semantics: Applies the nested query plan to each item of the input bag. If the plan returns true, the item is put in the output bag, otherwise not.
  • Implementation: Same.

GROUP/COGROUP
  • Attributes: nested query plans, one for the grouping criteria of each input
  • Inputs: >=1
  • Semantics: Applies the appropriate nested query plan to each item of each input to determine its group. Items within the same group are grouped together into bags.
  • Implementation: Different. M-R will use map-reduce; local will use our new SortedBag to sort and collect data into bags.

SORT
  • Attributes: list of columns on which to sort, with an ASC/DESC flag for each column
  • Inputs: 1
  • Semantics: Orders the input bag to produce an output bag.
  • Implementation: Different. M-R will use quantiles and map-reduce to do a parallel sort; local will use SortedBag.

DISTINCT (blocking)
  • Attributes: none
  • Inputs: 1
  • Semantics: Eliminates duplicates in the input bag to produce an output bag.
  • Implementation: Different. M-R will rewrite into GROUP/FOREACH; local will use DistinctBag.

PROJECT
  • Attributes: list of column numbers or schema names
  • Inputs: 1
  • Semantics: Selects the specified columns of the input tuple into the output tuple.
  • Implementation: Same.

MAP_LOOKUP
  • Attributes: a list of keys to look up
  • Inputs: 1
  • Semantics: Selects the contents of the specified keys into an output tuple.
  • Implementation: Same.

BINCOND
  • Attributes: 3 nested query plans: one specifying the condition, one specifying what to output when the condition is true, and one specifying what to output when it is false
  • Inputs: 1
  • Semantics: Same as the conditional expression in C++/Java (a > b ? c : d).
  • Implementation: Same.

COMPARISON (<, <=, >, >=, matches, ==, !=)
  • Attributes: none
  • Inputs: >=1
  • Semantics: Computes the output(s) of its child query plan(s) and compares them according to the specified comparison operator; outputs a boolean.
  • Implementation: Same.

AND/OR/NOT
  • Attributes: none
  • Inputs: >=1
  • Semantics: Computes the (boolean) output(s) of its child query plan(s) and combines them according to the specified logical operator; outputs a boolean.
  • Implementation: Same.

CONSTANT
  • Attributes: constant value
  • Inputs: 0
  • Semantics: Outputs a constant value.
  • Implementation: Same.

UDF_APPLY
  • Attributes: UDF to apply
  • Inputs: >=1
  • Semantics: Computes the output(s) of its child query plans, assembles the results into a tuple, and applies the UDF with that tuple as the argument. The result is passed on as output.
  • Implementation: Same.

STREAM
  • Attributes: program to invoke
  • Inputs: >=1 (may have multiple outputs as well)
  • Semantics: Streams the input to the external program without waiting for output; the output arrives at some later point in time.
  • Implementation: Same.

SPLIT
  • Attributes: none
  • Inputs: 1 (the only operator, apart from STREAM, to have multiple outputs)
  • Semantics: Replicates the input along all output branches.
  • Implementation: Different (depends on our choice of push vs. pull model). If pull, M-R buffers to DFS; local buffers in memory (spilling to disk if necessary).

UNION
  • Attributes: none
  • Inputs: >=1
  • Semantics: Union of child query plans.
  • Implementation: Different. A map-side union will be a no-op; a reduce-side union will cause a break in the pipeline. In local mode, straightforward.

Plan Execution

Each of the above logical operators will translate to a physical operator (in many cases, the physical operator will be shared between backends, as shown in the above table).

Once the physical operators have been linked together into a query plan, they must be executed. There are mainly two models to choose from for execution (assume that data flows downwards in an execution plan):

simple.png

  1. Push: Operator A pushes data to B that operates on it, and pushes the result to C.

  2. Pull: Operator C asks B for its next data item. If B has nothing pending to return, it asks A. When A returns a data item, B operates on it, and finally returns the result to C.

Pull API

public interface Pull<T> {
    /*
     * T is the return type. In the most general case
     * it can be Object, so we might omit T altogether.
     */

    public void open();

    public T getNext();

    public void close();
}

Push API:

public interface Push<T> {
    /*
     * T is the output type. In the most general case
     * it can be Object, so we might omit T altogether.
     */

    public void open();

    public void putNext(T t);

    public void close();
}
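
To make the push model concrete, here is a minimal, hypothetical push-based FILTER written against the Push API above. The PushFilter and Condition names are assumptions for this sketch only; Condition stands in for the nested condition plan.

// Hypothetical sketch: a push-based FILTER. The upstream operator calls
// putNext(), and the filter forwards only items whose condition is true.
public class PushFilter<T> implements Push<T> {

    /** Stands in for the nested condition plan. */
    public interface Condition<T> {
        boolean test(T t);
    }

    private final Push<T> downstream;     // operator we push results to
    private final Condition<T> condition;

    public PushFilter(Push<T> downstream, Condition<T> condition) {
        this.downstream = downstream;
        this.condition = condition;
    }

    public void open() { downstream.open(); }

    public void putNext(T t) {
        if (condition.test(t)) {
            downstream.putNext(t);   // forward matching items
        }                            // non-matching items are simply dropped
    }

    public void close() { downstream.close(); }
}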

Each model has its own unique advantages and disadvantages. Pull is more natural when there are multiple inputs, push is natural when there are multiple outputs. In the context of our operators, there are several places where one of the models is a natural fit, and is hard to replace.

Pull - Natural use in:

  • UDFs already pull data by using an iterator.
  • FILTER, GROUP/COGROUP, BINCOND : Evaluate their nested query plans using pull (can be converted to push, though unnatural).
  • COMPARISON, AND/OR/NOT: Pull data because they have multiple inputs

Push - Natural use in:

  • EvalFunc<Bag>: Eval functions push data (through bag.add()). If we go with a pull model, we have to hold in memory any bag output by a function (which seems reasonable).

  • SPLIT: Multiple outputs are most naturally supported through the push model.

Disadvantages:

Pull:

  • Requires buffering whenever there are multiple outputs.
  • Requires multiple passes over the data if multiple aggregates are to be computed.

Push:

  • No convenient API to push data to UDFs that have multiple inputs.

Proposal

  1. Single-threaded execution.
  2. No API changes to UDFs in the short term. In the long term, we might introduce an aggregation function API that is push-based, along the lines of the Postgres user-defined aggregation function API.

  3. Make the model entirely push-based. Reasons:

    1. Accommodates multiple outputs more naturally.
    2. Accommodates a possible future change to a special aggregate function API that iterates over the data only once.
    3. Functions can still pull data; we will push an entire iterator to them, instead of pushing tuple by tuple (see the sketch after this list).
  4. Implement hadoop iterator cloning/rewind so that we don't have to do our own disk writes.
  5. Before hadoop iterator cloning becomes available, we could even materialize bags in memory, as we do today, and this model still works.
  6. When multiple iterators become available (either by a reduce-side extension of hadoop, or when doing a map-side cogroup), they will fit nicely into this model.
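
Point 3 above deserves a sketch. Below is a minimal, hypothetical adapter (the IteratorPushAdapter and IteratorFunc names are assumptions) that accepts pushed tuples, materializes them in memory (a spillable bag in practice), and hands the entire iterator to a pull-style function once the input is exhausted.

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Hypothetical sketch: tuples are pushed in one at a time, but the consuming
// function still sees a whole iterator, as pull-style UDFs do today.
public class IteratorPushAdapter implements Push<Object> {

    /** Pull-style function that consumes an iterator. */
    public interface IteratorFunc {
        void exec(Iterator<Object> it);
    }

    private final IteratorFunc func;
    private List<Object> buffer;

    public IteratorPushAdapter(IteratorFunc func) {
        this.func = func;
    }

    public void open() {
        buffer = new ArrayList<Object>();
    }

    public void putNext(Object t) {
        buffer.add(t);   // collect pushed tuples (a spillable bag in practice)
    }

    public void close() {
        func.exec(buffer.iterator());   // push the entire iterator to the function
        buffer = null;
    }
}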

Use Cases

To be written

Logical and Physical Plans

  1. A criterion we are adopting in the redesign of the logical and physical layers in Pig is to promote what used to be EvalSpecs and Conds to operators.

  2. This approach provides: 1) a clearer definition of the language; 2) better identification of opportunities for optimizations of various forms.
  3. Logical operators provide the logical representation of a Pig computation and support type checking.
  4. The aspects that pertain to carrying out the actual computations on a given dataset belong to physical operators. Exploiting type information should enable efficient computations.
  5. The mapping from logical operators to physical operators aims at preserving the isomorphism of computation trees. Hence, most logical operators have their counterpart in physical operators; for instance, a logical FILTER will be translated into a physical FILTER. See the translation scheme below.
  6. The exceptions are the following:
    1. A logical Cogroup is translated into a physical LocalRearrange and GlobalRearrange.

    2. Chris mentioned that even Algebraic Functions are exceptions to this.

Logical Plan

The logical plan will consist of a directed acyclic graph (DAG) with logical operators as the nodes and data flow between the operators as the edges.

The focus of the logical operators is post-parse checking (such as type checking), optimization, and translation to a physical plan. The logical operators will contain information that facilitates these objectives.

A list of the classes used to model the logical plan follows, with a description of each class and the interfaces defined for the major classes (in some cases, one interface is given as an example of a whole set of classes).

LogicalPlan

The class LogicalPlan will contain the collection of logical operators. It will not contain the edges between the operators. To see why, consider the following simple pig script:

a = load 'myfile';
b = filter a by $0 > 5;
store b into 'myfilteredfile';

This will generate a logical plan that looks something like this:

SimpleLP.jpg

Notice that the graph edges represent data flow between relational operators (LOAD, FILTER, STORE, PROJECT) and between expression operators (greater than, CONSTANT), and between the two types. This means that edges in the graph have meaning beyond simply input and output for a node. For example, the filter node has two inputs: the load node and its condition (in this case, the greater-than node). These inputs, however, have different semantics, as tuples coming from the load input are evaluated based on the boolean result coming from the conditional input and then possibly passed on to the store node. Given the differing semantics of different inputs, it seems better to encode the edges of the graph in the logical operators themselves, rather than in a generic graph container object.

In addition to containing the collection of logical operators, LogicalPlan will provide methods for callers to insert logical operators into the graph and connect the inputs and outputs of operators. These connections will only be for data-flow inputs and outputs, not contextual inputs such as the condition on a filter node. This strains the model described above somewhat, but it allows for generic manipulation of the inputs and outputs of the operators without every visitor to the tree needing to understand all the different operator types.

The interface for LogicalPlan is:

public class LogicalPlan {
    private static final long serialVersionUID = 2L;

    protected PigContext mContext = null;

    protected Map<LogicalOperator, OperatorKey> mOps;
    protected Map<OperatorKey, LogicalOperator> mKeys;

    private List<LogicalOperator> mRoots;

    public LogicalPlan(PigContext pigContext) {
        ...
    }

    public List<LogicalOperator> getRoots() {
        ...
    }

    public PigContext getPigContext() {
        return mContext;
    }

    public byte getOutputType() {
        // Assumes a single root.
        return mRoots.get(0).getType();
    }

    /**
     * Given an operator, find its OperatorKey.
     * @param op Logical operator.
     * @return associated OperatorKey
     */
    public OperatorKey getOperatorKey(LogicalOperator op) {
        return mOps.get(op);
    }

    /**
     * Given an OperatorKey, find the associated operator.
     * @param opKey OperatorKey
     * @return associated operator.
     */
    public LogicalOperator getOperator(OperatorKey opKey) {
        return mKeys.get(opKey);
    }

    /**
     * Insert an operator into the plan.  This only inserts it as a node in
     * the graph, it does not connect it to any other operators.  That should
     * be done as a separate step using makeInput or addInput.
     * @param op Logical Operator to add to the plan.
     */
    public void add(LogicalOperator op) {
        ...
    }

    /**
     * Make one operator the <b>sole</b> input of another.  If that operator
     * already has an input, that operator will become the passed in
     * operator's input.  So, for example, if the plan currently contains
     * three nodes, a, b, c, and a is currently c's input, and this
     * function is called as makeInput(b, c), then a will become b's input
     * and b will become c's input.
     * @param op Operator to make input of another operator.
     * @param inputOf Operator to make op an input of.
     * @throws IOException if op or inputOf are not in the plan.
     */
    public void makeInput(LogicalOperator op,
                          LogicalOperator inputOf) throws IOException {
        ...
    }

    /**
     * Make one operator an <b>additional</b> input of another.  This can only
     * legally be called on operators that can have multiple inputs, such as
     * Cogroup, Generate, or BinaryExpression.
     * @param op Operator to make input of another operator.
     * @param inputOf Operator to make op an input of.
     * @throws IOException if op or inputOf are not in the plan.
     */
    public void addInput(LogicalOperator op,
                         LogicalOperator inputOf) throws IOException {
        ...
    }

    /**
     * Remove an operator from the plan.  Connections in the graph will be
     * reconnected after the operator is removed.  So if a is b's input and b
     * is c's input, and b is removed, then a will become c's input.
     * @param op Operator to remove.
     * @throws IOException if op is not in the plan.
     */
    public void remove(LogicalOperator op) throws IOException {
        ...
    }
}
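
To illustrate the intended use of this interface, here is a hypothetical sketch that assembles the load/filter/store plan from the earlier script. The LOLoad, LOFilter, and LOStore constructors and the OperatorKey arguments are assumed for illustration; only add() and makeInput() come from the interface above.

// Hypothetical usage sketch; operator constructors are assumed, and the
// IOException thrown by makeInput() is not handled here.
LogicalPlan plan = new LogicalPlan(pigContext);

LogicalOperator load   = new LOLoad(new OperatorKey("scope", 1), "myfile");
LogicalOperator filter = new LOFilter(new OperatorKey("scope", 2), condition);
LogicalOperator store  = new LOStore(new OperatorKey("scope", 3), "myfilteredfile");

plan.add(load);
plan.add(filter);
plan.add(store);

plan.makeInput(load, filter);    // load becomes filter's sole data input
plan.makeInput(filter, store);   // filter becomes store's input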

LogicalOperator

All logical operators will be a subclass of LogicalOperator. LogicalOperator itself will contain lists of the inputs and outputs of the operator, the schema for the operator, and the data type of the operator.

abstract public class LogicalOperator {
    private static final long serialVersionUID = 2L;

    /**
     * Schema associated with this logical operator.
     */
    protected Schema mSchema;

    /**
     * OperatorKey associated with this operator.  This key is used to find the
     * operator in the LogicalPlan.
     */
    protected OperatorKey mKey;

    /**
     * Datatype of the output of this operator.  Operators start out with the
     * data type set to UNKNOWN, and have it set for them by the type checker.
     */
    protected byte mType = DataType.UNKNOWN;

    /**
     * Requested level of parallelism for this operation.
     */
    protected int mRequestedParallelism;

    /**
     * References to this operator's inputs.
     */
    protected List<LogicalOperator> mInputs;

    /**
     * Back pointers so that the logical plan can be navigated in either direction.
     */
    protected List<LogicalOperator> mOutputs;

    /**
     * Equivalent to LogicalOperator(k, 0).
     * @param k Operator key to assign to this node.
     */
    public LogicalOperator(OperatorKey k) {
        this(k, 0);
    }

    /**
     * @param k Operator key to assign to this node.
     * @param rp Degree of requested parallelism with which to execute this node.
     */
    public LogicalOperator(OperatorKey k, int rp) {
        ...
    }

    /**
     * Get the operator key for this operator.
     */
    public OperatorKey getOperatorKey() {
        return mKey;
    }

    /**
     * Set the schema for this operator.
     * @param schema Schema to set.
     */
    public void setSchema(Schema schema) {
        mSchema = schema;
    }

    /**
     * Get a copy of the schema for the output of this operator.
     */
    public Schema getSchema() {
        return mSchema;
    }

    /**
     * Set the type of this operator.  This should only be called by the type
     * checking routines.
     * @param t Type to set this operator to.
     */
    final public void setType(byte t) {
        mType = t;
    }

    /**
     * Get the type of this operator.
     */
    public byte getType() {
        return mType;
    }

    /**
     * Get a list of all inputs to the operator.
     */
    public List<LogicalOperator> getInputs() {
        return mInputs;
    }

    /**
     * Get a list of all outputs of the operator.
     */
    public List<LogicalOperator> getOutputs() {
        return mOutputs;
    }

    public abstract void visit(LOVisitor v) throws ParseException;

    public abstract String name();

    @Override
    public String toString() {
        ...
    }
}

Each of the relational operators will be modeled as a logical operator. There will be a class ExpressionOperator that extends LogicalOperator and represents all types of expression operators. The class hierarchy will look like:

Extenders of LogicalOperator:

  • LOLoad
  • LOStore
  • LOForEach
  • LOGenerate
  • LOFilter
  • LOCoGroup
  • LOSort
  • LODistinct
  • LOProject
  • LOMapLookup
  • LOStream
  • LOSplit
  • LOUnion
  • ExpressionOperator (abstract, represents all expression types)

Extenders of ExpressionOperator:

  • BinaryExpressionOperator (abstract, represents all binary expressions)
  • UnaryExpressionOperator (abstract, represents all unary expressions)
  • LOBinCond
  • LOConst (constant values)
  • LOUserFunc (invocation of a user-defined function)
  • LOParend
  • LOCast

Extenders of BinaryExpressionOperator:

Extenders of UnaryExpressionOperator:

  • LONot
  • LONegative
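
As an example of how a concrete operator fits this hierarchy, here is a hypothetical sketch of LOFilter. The condition is held as a contextual reference to the root of a nested plan rather than as a data-flow input, matching the edge semantics discussed earlier; getCondition() and getInput() are the accessors the visitor below relies on.

// Hypothetical sketch of one concrete subclass.
public class LOFilter extends LogicalOperator {

    // Root of the nested condition plan; held separately from mInputs
    // because it is a contextual input, not a data-flow input.
    private LogicalOperator mCondition;

    public LOFilter(OperatorKey k, LogicalOperator condition) {
        super(k);
        mCondition = condition;
    }

    public LogicalOperator getCondition() {
        return mCondition;
    }

    /** The single data-flow input: the bag being filtered. */
    public LogicalOperator getInput() {
        return mInputs.get(0);
    }

    @Override
    public void visit(LOVisitor v) throws ParseException {
        v.visitFilter(this);   // double dispatch into the visitor
    }

    @Override
    public String name() {
        return "Filter " + mKey;
    }
}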

Logical Plan Visitors

The method for accessing the logical plan will be a visitor class, LOVisitor. This class will contain the logic for traversing logical plans. Any class that needs to operate on the plan should extend this class. The extending class need not provide logic to navigate the logical plan (unless it needs to navigate it in some non-standard way); it just needs to provide logic for the specific operations it wants to perform on the tree.

LOVisitor:

abstract public class LOVisitor {

    /**
     * Extenders of this class should implement this to call either
     * depthFirst() or dependencyOrder().  If order is not important,
     * depthFirst() should be called, as it's faster.  If it is important
     * that your nodes only be visited after all the nodes they depend on
     * have been visited, then you should call dependencyOrder() instead.
     */
    public abstract void visit();

    /**
     * Only LOFilter.visit() and subclass implementations of this function
     * should ever call this method.
     */
    void visitFilter(LOFilter f) throws ParseException {
        f.getCondition().visit(this);
        f.getInput().visit(this);
    }

    // And so on with other LO operators

    /**
     * Visit the graph in a depth first traversal.
     * (Protected, not private, so that extenders can call it as
     * instructed above.)
     */
    protected void depthFirst() {
        // TODO
    }

    /**
     * Visit the graph in a way that guarantees that no node is visited before
     * all the nodes it depends on (that is, all those giving it input) have
     * already been visited.
     */
    protected void dependencyOrder() {
        // TODO
    }
}
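
As an example of an extender, here is a hypothetical visitor that prints the name of each operator it encounters. The traversal itself comes entirely from the base class; the subclass only chooses the order and supplies per-operator logic.

// Hypothetical example of a concrete visitor.
public class LOPrinter extends LOVisitor {

    @Override
    public void visit() {
        // Visit order does not matter for printing, so use the faster traversal.
        depthFirst();
    }

    @Override
    void visitFilter(LOFilter f) throws ParseException {
        System.out.println(f.name());
        super.visitFilter(f);   // continue into the condition and the input
    }

    // ... analogous overrides for the other operators
}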

Physical Plan Structure

TBD

Logical to Physical Translation Scheme

FILTER

FILTERTranslation.png

GROUP

The logical operator co-group will be converted to three physical operators: Local Rearrange, Global Rearrange, and Package, as shown below:

Group.png

There will be a Local Rearrange operator for each input, all of which feed into a Global Rearrange followed by a Package, as shown below:

GroupPhy.png

The Local Rearrange takes the input tuple and outputs a key-value pair with the group field as the key and the tuple as the value. For example, (1,R) will be converted to {1,(1,R)}. The tuple is also tagged with the index of the input it originated from. In our case, if (1,R) came from A it would be tagged 1, and if it was from B it would be tagged 2.

The Global Rearrange converts the key-value pairs of keys belonging to a partition into a set of (key, list of values) pairs. The partition is determined by which reducer the Global Rearrange is catering to. This need not be implemented by us, as it is the intermediate step that happens between the mapper and the reducer.

The Package just takes each (key, list of values) pair and puts it in the appropriate format required by the co-group. So let's say we have (1,R),(2,G) in A and (1,B),(2,Y) in B. If there are two reducers, the Global Rearrange catering to reducer 1 will have {1,{(1,R),(1,B)}} as its (key, list of values), which should be converted into an output tuple for the co-group based on the tagged index of the tuples in the list. So this would be converted to {1,{(1,R)},{(1,B)}}. Similarly, {2,{(2,G),(2,Y)}} will be converted to {2,{(2,G)},{(2,Y)}} by reducer 2.

Logical to Physical Stubs and APIs

Below are code samples that define possible APIs for the physical pipeline implementation. The fragments aim at validating how the pull model and splitting can be used in the generation of a physical plan from a logical plan.

The sample code does not take advantage of type information at this point. An actual implementation would use type information as made available from the logical plan.

The example focuses on the case of the FILTER operator, pull model (via iterators), and splitting.

package org.apache.pig.optimization;

import java.util.Iterator;
import java.util.Vector;

public abstract class PhysicalOperator {

        public Vector<PhysicalOperator> inputs;
        
        public PhysicalOperator() {
                inputs = new Vector<PhysicalOperator>();
        }
        
        public abstract Iterator<Object> iterator();
}

package org.apache.pig.optimization;

import java.util.Iterator;

public class POFilter extends PhysicalOperator {

        public PhysicalOperator input;
        public PhysicalOperator condition;
        
        public POFilter(PhysicalOperator input,
                       PhysicalOperator condition) {
                this.input = input;
                this.condition = condition;
        }
        
        private class POFilterIterator implements Iterator<Object> {
                private Iterator<Object> inputIter;
                private Iterator<Object> condIter;
                
                public POFilterIterator(Iterator<Object> inputIter,
                                             Iterator<Object> condIter) {
                        this.inputIter = inputIter;
                        this.condIter = condIter;
                }
                
                public Object next() {
                        Object nextVal = null;
                        
                        while(inputIter.hasNext()) {
                                if (((Boolean)condIter.next()).booleanValue() == true) {
                                        nextVal = inputIter.next();
                                        break;
                                }
                                else {
                                        // skip val
                                        inputIter.next();
                                }
                        }
                        
                        return nextVal;
                }
                
                public boolean hasNext() {                      
                        assert(inputIter.hasNext() ==
                                   condIter.hasNext());
                        
                        return inputIter.hasNext();
                }
                
                public void remove() {
                        throw new RuntimeException("Not supported");
                }
        }
        
        public Iterator<Object> iterator() {
                return new POFilterIterator(input.iterator(),
                                                  condition.iterator());
        }
}

package org.apache.pig.optimization;

import java.util.Iterator;
import java.util.Vector;
import java.util.Map;
import java.util.HashMap;

public class POSplit extends PhysicalOperator {

        // most naive implementation:
        // all is cached, saving comes from sharing the same
        // values without recomputing them from the split input
        //
        Vector<Object> buffer;
        
        // key = split reader
        // val = position in buffer that reader key is at
        //
        Map<POSplitReader, Integer> readerMap;
        
        Iterator<Object> iter;
        
        public POSplit(PhysicalOperator input) {
                super();
                inputs.add(input);
                buffer = new Vector<Object>();
                readerMap = new HashMap<POSplitReader, Integer>();
                iter = null;
        }
        
        public PhysicalOperator[] addReaders(int num) {
                PhysicalOperator[] result = new PhysicalOperator[num];
                for (int i = 0; i < num; ++i) {
                        result[i] = new POSplitReader(this);
                        
                        readerMap.put((POSplitReader)result[i],
                                                  new Integer(-1));
                }
                return result;
        }
        
        private class POSplitIterator implements Iterator<Object> {
        
                private POSplitReader reader;
                private POSplit source;
                
                public POSplitIterator(POSplitReader reader,
                                                       POSplit source) {
                        this.reader = reader;
                        this.source = source;
                }
                
                public boolean hasNext() {
                        int readerPos = source.readerMap.get(reader).intValue();
                        int bufferSize = source.buffer.size();
                        
                        if (readerPos + 1 == bufferSize) {
                                // need to bring one value in if possible
                                //
                                if (source.iter.hasNext()) {
                                        return true; //there is more stuff, not cached yet
                                }
                                else {
                                        return false; // reached the end
                                }
                        }
                        else {
                                return true; // next value is cached already
                        }
                }
                
                public Object next() {
                        int readerPos = source.readerMap.get(reader).intValue();
                        int bufferSize = source.buffer.size();
                        
                        if (readerPos + 1 == bufferSize) {
                                Object nextVal = source.iter.next();
                                
                                if (nextVal == null) {
                                        return null;
                                }
                                else {
                                        source.buffer.add(nextVal);
                                }
                        }

                        ++readerPos;
                        source.readerMap.put(reader, new Integer(readerPos));
                                
                        return source.buffer.elementAt(readerPos);
                }
                
                public void remove() {
                        throw new RuntimeException("Unsupported");
                }
        }
        
        public Iterator<Object> iterator(POSplitReader reader) {
                // Create the shared input iterator only once; re-creating it
                // for every reader would rewind the input for all readers.
                if (iter == null) {
                        iter = inputs.get(0).iterator();
                }
                return new POSplitIterator(reader, this);
        }
        
        public Iterator<Object> iterator() {
                throw new RuntimeException("Do not iterate directly; use a split reader");
        }
}

package org.apache.pig.optimization;

import java.util.Iterator;

public class POSplitReader extends PhysicalOperator {

        public POSplit source;
        
        public POSplitReader(POSplit source) {
                super();
                this.source = source;
        }
        
        public Iterator<Object> iterator() {
                return source.iterator(this);
        }
}

package org.apache.pig.optimization;

public class PhysicalCompiler {

        public static PhysicalOperator compile(LogicalOperator lo) {
                IR ir = new IR(lo);
                
                return ir.compile();
        }
}

---

package org.apache.pig.optimization;

import java.util.HashMap;
import java.util.Map;
import java.util.Vector;

/**
 * In some cases we may need some pre-processing...
 *
 */
class IR {
        LogicalOperator root;
        
        // key = logical node 
        // val = vector of nodes under root that match key
        //
        Map<LogicalOperator, Vector<LogicalOperator>> nodeMap;
        
        // key = logical node
        // val = vector of physical operators to use in the translation
        //       process
        //
        Map<LogicalOperator, Vector<PhysicalOperator>> translationMap;
        
        IR(LogicalOperator root) {
                this.root = root;
                this.nodeMap = new HashMap<LogicalOperator,
                                                 Vector<LogicalOperator>>();
                this.translationMap = new HashMap<LogicalOperator,
                                                          Vector<PhysicalOperator>>();
        }       
        
        private void doMapNode(LogicalOperator root,
                                                   LogicalOperator nodeToMap) {
                if (root.equals(nodeToMap)) {
                        Vector<LogicalOperator> map = nodeMap.get(nodeToMap);
                        
                        map.add(root);
                }
                else {
                        for (LogicalOperator newRoot : root.inputs) {
                                doMapNode(newRoot, nodeToMap);
                        }
                }
        }
        
        public void mapNode(LogicalOperator nodeToMap) {
                if (nodeMap.get(nodeToMap) == null) {
                        nodeMap.put(nodeToMap, new Vector<LogicalOperator>());
                        
                        doMapNode(root, nodeToMap);
                }
        }
        
        public int getNodeMapOccurence(LogicalOperator mapNode) {
                mapNode(mapNode);
                
                Vector<LogicalOperator> map = nodeMap.get(mapNode);
                
                if (map != null) {
                        return map.size();
                }
                else {
                        return 0;
                }
        }
        
        public void translateNodeTo(LogicalOperator nodeToTranslate,
                    PhysicalOperator[] translation) {
                Vector<PhysicalOperator> vec = new Vector<PhysicalOperator>();
                
                for (PhysicalOperator po : translation) {
                        vec.add(po);
                }
                
                translateNodeTo(nodeToTranslate,
                                   vec);
        }

        
        public void translateNodeTo(LogicalOperator nodeToTranslate,
                                         Vector<PhysicalOperator> translation) {
                mapNode(nodeToTranslate);
                
                Vector<LogicalOperator> map = nodeMap.get(nodeToTranslate);
                
                if (map.size() != translation.size()) {
                        throw new RuntimeException("Mismatch!");
                }
                
                translationMap.put(nodeToTranslate, translation);
        }
        
        private POFilter compileLOFilter(LOFilter filter) {
                assert(filter.condition.type == LogicalOperator.PIG_TYPE.BOOLEAN);
                
                PhysicalOperator physicalInput = (new IR(filter.input)).compile();
                
                IR booleanCondIR = new IR(filter.condition);
                
                POSplit split = new POSplit(physicalInput);
                
                // One reader feeds the filter's data input; the others replace
                // the occurrences of the input inside the condition subtree.
                // (Passing all the readers to translateNodeTo would trip its
                // size check.)
                int condOccurrences =
                                booleanCondIR.getNodeMapOccurence(filter.input);
                
                PhysicalOperator splitReads[] = split.addReaders(1 + condOccurrences);
                
                PhysicalOperator[] condReads = new PhysicalOperator[condOccurrences];
                System.arraycopy(splitReads, 1, condReads, 0, condOccurrences);
                
                booleanCondIR.translateNodeTo(filter.input, condReads);
                
                PhysicalOperator booleanCond = booleanCondIR.compile();
                
                return new POFilter(splitReads[0], booleanCond);
        }

        public PhysicalOperator compile() {
                if (root instanceof LOFilter) {
                        return compileLOFilter((LOFilter) root);
                }
                else {
                        throw new RuntimeException("Unsupported Logical Operator");
                }
        }
}

Stubs for LocalRearrange and Package

There is no need to provide an implementation for GlobalRearrange, as it is taken care of by the shuffle phase. However, for performance improvements, we can add an identity combiner so that the local rearrange outputs a <groupKey, list of tuples>. In this case, we would have to slightly change the part where input is provided to POPackage, as we need to merge the list of lists into a flattened bag or a bag of bags. If we have a bag of bags, there will be a slight improvement in performance, as all tuples inside an inner bag belong to the same input and we just need to put the bag into its appropriate place.

package org.apache.pig.optimization;

import java.io.IOException;
import java.util.Iterator;

import org.apache.pig.data.Tuple;
import org.apache.pig.impl.physicalLayer.PhysicalOperator;

public class POLocalRearrange extends PhysicalOperator {

        public PhysicalOperator input;
        public PhysicalOperator groupCondition;
        //The input index
        public int index;
        
        public POLocalRearrange(PhysicalOperator input,
                        PhysicalOperator groupCondition, int index) {
                // The data input is passed in explicitly and registered;
                // the inherited inputs vector is empty at construction time.
                this.input = input;
                inputs.add(input);
                this.groupCondition = groupCondition;
                this.index = index;
        }
        
        private class POLRIterator implements Iterator<Object> {
                private Iterator<Object> inputIter;
                private Iterator<Object> gcIter;
                
                public POLRIterator(Iterator<Object> inputIter,
                                             Iterator<Object> gcIter) {
                        this.inputIter = inputIter;
                        this.gcIter = gcIter;
                }
                
                public Tuple next() {
                        if(!inputIter.hasNext()) return null;
                        //create a tuple(groupKey,Tuple) tagged with input index
                        Tuple res = new IndexedTuple(index,gcIter.next(),inputIter.next());
                        return res;
                }
                
                public boolean hasNext() {                      
                        assert(inputIter.hasNext() ==
                                   gcIter.hasNext());
                        
                        return inputIter.hasNext();
                }
                
                public void remove() {
                        throw new RuntimeException("Not supported");
                }
        }
        
        public Iterator<Object> iterator() {
                return new POLRIterator(input.iterator(),
                                                  groupCondition.iterator());
        }
}

package org.apache.pig.optimization;

import java.util.Iterator;

import org.apache.pig.data.Tuple;
import org.apache.pig.impl.physicalLayer.PhysicalOperator;

public class POPackage extends PhysicalOperator{
        //Assuming the input is appropriately set to load from 
        //<key,list of vals> passed to the reducer by Hadoop
        PhysicalOperator input;
        
        public POPackage(PhysicalOperator input){
                this.input = input;
        }
        
        private class POPkgIterator implements Iterator<Object>{
                Iterator<Object> inputIter;
                
                public POPkgIterator(Iterator<Object> inputIter){
                        this.inputIter = inputIter;
                }
                
                public Tuple next(){
                        if(!inputIter.hasNext()) return null;
                        //Assuming the input Tuples will be of the form <groupKey, Bag of Tuples>
                        Tuple t = (Tuple) inputIter.next();
                        Tuple res = new Tuple();
                        res.addField(t.getField(0));
                        //Walk the bag of tagged tuples (the cast reflects the
                        //assumption that field 1 is a bag of IndexedTuples)
                        for(IndexedTuple bt : (Iterable<IndexedTuple>) t.getField(1)){
                                //Add the tuple value at the appropriate index
                                res.addField(bt.getIndex(), bt.getField(0));
                        }
                        return res;
                }
                
                public boolean hasNext(){
                        return inputIter.hasNext();
                }
                
                public void remove(){
                        throw new RuntimeException("Not supported");
                }
        }
        
        public Iterator<Object> iterator(){
                return new POPkgIterator(input.iterator());
        }
}
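
Both stubs rely on an IndexedTuple helper that tags a tuple with the index of the co-group input it originated from. Here is a minimal, hypothetical sketch; keeping the group key out of the tuple's fields is one consistent reading that makes bt.getField(0) in POPackage yield the original tuple.

package org.apache.pig.optimization;

import org.apache.pig.data.Tuple;

// Hypothetical helper: a tuple tagged with the index of the co-group input
// it came from, so POPackage can route it into the right output position.
public class IndexedTuple extends Tuple {

        private int index;
        private Object groupKey;   // becomes the map-output key in M-R
        
        public IndexedTuple(int index, Object groupKey, Object value) {
                this.index = index;
                this.groupKey = groupKey;
                addField(value);   // field 0: the original tuple, so that
                                   // bt.getField(0) in POPackage retrieves it
        }
        
        public int getIndex() {
                return index;
        }
        
        public Object getGroupKey() {
                return groupKey;
        }
}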

There is a problem with the above model when it is working on a nested bag. For example, consider the following script:

A = load 'a';
B = group A by $1;
C = foreach B {
            D = filter A by $0<=2;
            generate D;
    }

A:
(1,R)
(2,R)
(3,B)
B:
(R,{(1,R),(2,R)})
(B,{(3,B)})

For each tuple in B, the filter in the above script works on the bag nested inside the tuple. Since this is an explicit bag, a project operator in the above model would not be able to handle it: it would just pass the bag along instead of streaming its contents. Hence the project inside the condition of the filter would fail. As a solution, we propose overloading the Project operator to check whether its input is a bag of tuples and, if so, to stream its contents instead of passing the entire bag.

phynested.png

As can be seen from the figure, the overloaded Project operator (indicated in the figure) works even with the nested bag.
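
Here is a minimal, hypothetical sketch of the overloaded Project behavior, in the same iterator style as the stubs above. The project() helper stands for the ordinary column selection and is assumed; DataBag stands for Pig's bag type.

        public Iterator<Object> iterator() {
                final Iterator<Object> inputIter = inputs.get(0).iterator();
                
                return new Iterator<Object>() {
                        private Iterator<Tuple> bagIter = null;  // non-null while draining a bag
                        
                        public boolean hasNext() {
                                return (bagIter != null && bagIter.hasNext()) || inputIter.hasNext();
                        }
                        
                        public Object next() {
                                // First drain any nested bag we are currently streaming.
                                if (bagIter != null && bagIter.hasNext()) {
                                        return bagIter.next();
                                }
                                Object field = project((Tuple) inputIter.next());
                                if (field instanceof DataBag) {
                                        // Overloaded case: stream the bag's contents
                                        // instead of passing the bag as a single value.
                                        bagIter = ((DataBag) field).iterator();
                                        return bagIter.hasNext() ? bagIter.next() : null;
                                }
                                return field;   // ordinary projection
                        }
                        
                        public void remove() {
                                throw new RuntimeException("Not supported");
                        }
                };
        }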

Pipeline Implementation (PipelineImpl): Plan and Issues
