Nested Logical Plan Model

{!!!draft 0.2 !!!}

Preface

At the least, this page should help us better understand the Pig language and its logical model conceptually. Some concepts might be useful for implementations; others might be too far from practical application.

Benefits

What is Logical Plan?

A logical plan is an isomorph of a Pig query. It captures all the information needed for logical execution (an abstract execution performed sequentially by logical reasoning). Any implementation of a physical plan and physical execution engine must always yield the same result as the logical execution, regardless of the underlying backend or optimization techniques.

For example, one can think about it this way:-

3 * (4 + 5)

The logical execution of this plan should always yield 27, regardless of whether we use a calculator or a Macbook to process it, or whether we distribute the 3 over the addition or not.

In practice, a logical plan can be annotated with additional information/flags that dictate some of the physical execution's behavior (e.g. degree of parallelism or optimization hints). This has to be considered carefully to keep the language generic.

Also, given the definition above, we will need a reference execution engine implementation once we support more backends.

What is Load/Store?

In current implementation

In inner plan

Abstraction

Load/Store bridges data from one space to another space (and from one type to another type in most cases).

By the above abstraction, it is possible to define different Load/Store operators:-

Operator        From Type    To Type
Legacy Load     File         Bag
Legacy Store    Bag          File
LoadTuple       Bag          Tuple
StoreTuple      Tuple        Bag
LoadField       Tuple        Field
StoreField      Field        Tuple

  1. This makes it possible to create a consistent nested processing model.
  2. This gives more flexibility to the language beyond plans only consisting of bag-based operators.
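
As an illustration, here is a minimal Java sketch of the bridge abstraction (all names here, such as Bridge and loadField, are invented for this page and are not Pig classes):-

import java.util.List;

// A Load/Store operator viewed abstractly: it bridges a value
// from one space/type to another.
interface Bridge<From, To> {
    To bridge(From input);
}

class BridgeDemo {
    // LoadField from the table above: Tuple -> Field. A tuple is modelled
    // here as a plain List<Object> and a field as an Object.
    static Bridge<List<Object>, Object> loadField(int index) {
        return tuple -> tuple.get(index);
    }

    public static void main(String[] args) {
        List<Object> tuple = List.of(1, "two", 3.0);
        System.out.println(loadField(1).bridge(tuple)); // prints: two
    }
}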

What is Pig Query?

From the query:-

A = LOAD '/tmp/data1.txt' ;
B = COGROUP A BY $0*$1 ;
C = FILTER B BY $0 > 5 ;

Functionally:-

A : File -> Bag
B : Bag x (f: Tuple -> Tuple) -> Bag
C : Bag x (f: Tuple -> Boolean) -> Bag

By

  1. Neglecting "A", which we consider the bridge between the file system and the Pig processing space
  2. Focusing only on the main components of the data flow

We can write:-

(C o B) : Bag -> Bag

We can say "A Pig query is a function from Bag to Bag" :-

f: Bag -> Bag         This is our current logical plan.

By generalizing it, we should be able to say:-

f: Datum -> Datum     This is generalized logical plan

This abstraction indicates that a Pig query (and the logical plan as its isomorph) should be able to process any data type into any data type.
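
A hedged sketch of this view in Java, modelling Bag as List<List<Object>> and the query as a composed java.util.function.Function; the transformations below are simplified stand-ins for B and C above:-

import java.util.List;
import java.util.function.Function;
import java.util.stream.Collectors;

class QueryAsFunction {
    public static void main(String[] args) {
        List<List<Object>> bag = List.of(List.of(2, 3), List.of(4, 5));

        // B : Bag -> Bag (stand-in for the per-tuple $0*$1 step of COGROUP)
        Function<List<List<Object>>, List<List<Object>>> b =
            in -> in.stream()
                    .map(t -> List.<Object>of((Integer) t.get(0) * (Integer) t.get(1)))
                    .collect(Collectors.toList());

        // C : Bag -> Bag (FILTER ... BY $0 > 5)
        Function<List<List<Object>>, List<List<Object>>> c =
            in -> in.stream()
                    .filter(t -> (Integer) t.get(0) > 5)
                    .collect(Collectors.toList());

        // (C o B) : Bag -> Bag
        System.out.println(b.andThen(c).apply(bag)); // [[6], [20]]
    }
}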

What is nested plan?

From the query:-

A = LOAD '/tmp/data1.txt' ;
B = COGROUP A BY $0*$1 ;
C = FILTER B BY $0 > 5 ;

By considering the COGROUP operator, these are the operations that have to be taken sequentially:-

  1. Iterate through the bag from the input port
  2. For each tuple in the bag, calculate $0 * $1, append the result as an element of the tuple, and tag it as the field used for grouping.
  3. Once (2) is complete for the whole bag, start grouping by the tagged field.
  4. Output the data to the output port.

This is the generalized version:-

  1. Iterate through the bag from the input port
  2. For each tuple in the bag, apply f: Tuple -> Tuple, append the output tuple as an element of the tuple, and tag it as the field used for grouping.
  3. Once (2) is complete for the whole bag, start grouping by the tagged field.
  4. Output the data to the output port.

From the generalized logical plan:-

f: Datum -> Datum

it follows that the f: Tuple -> Tuple inside the COGROUP operator is itself a logical plan. This maps to the actual implementation: the COGROUP operator needs an inner logical plan.
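
A minimal sketch of the four steps above in Java, assuming a Tuple is a List<Object> and the inner plan is an ordinary function; the names (cogroup, innerPlan) are invented for this page, not Pig classes:-

import java.util.*;
import java.util.function.Function;

class CogroupSketch {
    static Map<List<Object>, List<List<Object>>> cogroup(
            List<List<Object>> bag,
            Function<List<Object>, List<Object>> innerPlan) {
        Map<List<Object>, List<List<Object>>> groups = new LinkedHashMap<>();
        for (List<Object> tuple : bag) {                    // (1) iterate the input bag
            List<Object> key = innerPlan.apply(tuple);      // (2) apply f: Tuple -> Tuple
            groups.computeIfAbsent(key, k -> new ArrayList<>()).add(tuple); // (3) group
        }
        return groups;                                      // (4) output
    }

    public static void main(String[] args) {
        List<List<Object>> bag = List.of(List.of(2, 3), List.of(3, 2), List.of(1, 1));
        // Inner plan for "COGROUP A BY $0*$1"
        System.out.println(cogroup(bag,
            t -> List.of((Integer) t.get(0) * (Integer) t.get(1))));
        // {[6]=[[2, 3], [3, 2]], [1]=[[1, 1]]}
    }
}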

Applications

For example, suppose I have implemented a UDF called TupleMinMax of type Tuple -> Tuple that finds the MIN/MAX across all the elements. One way to try it out in Grunt would be:-

A = <1,2,3,4,5,6,7,8,9,10> ;
B = TupleMinMax(A) ;
dump B ;

Executing the query..........
B = <1,10>

The generated plan may look like this:-

Constant(<1,2,3,4,5,6,7,8,9,10>)
              |
              |
       UDF(TupleMinMax())

Now I want to do real data processing using TupleMinMax. I might do it like this:-

X = LOAD '/tmp/data1.txt' ;
Y = FILTER X BY ELEM($_) > 0 ;
Z = FOREACH Y  {
                S = TupleMinMax($_) ;
                GENERATE S.$0 * S.$1, S.$0 + 2 ;
               } ;
STORE Z INTO '/tmp/output' ;

NOTE: $0, $1, $2, ... only refer to elements within a tuple, so I imagine I could use $_ to refer to the whole tuple.

The top level plan may look like this:-

                  Load('/tmp/data1.txt')
                          |
                          |
                        FILTER
                          |
                          |
                        FOREACH 
                          |
                          |
                   Store('/tmp/output')

The inner plan in FILTER may look like this:-

                       LoadTuple
                           |
                           |
                         ELEM (Count the number of elements in tuple)
                           |
                           |
                      GreaterThan 0
                           |
                           |
                       StoreField (Will be boolean)

NOTE: This inner plan applies to each tuple in the input bag. The FILTER operator iterates through the bag, handing each tuple to the inner plan, and takes the output of the inner plan, which is a boolean atom in this example. That atom is then used by FILTER to determine whether to forward the tuple to the output port.
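
For illustration, a Java sketch of this driving loop; the predicate stands in for the LoadTuple / ELEM / GreaterThan / StoreField inner plan, and all names are invented:-

import java.util.*;
import java.util.function.Predicate;

class FilterSketch {
    static List<List<Object>> filter(List<List<Object>> bag,
                                     Predicate<List<Object>> innerPlan) {
        List<List<Object>> out = new ArrayList<>();
        for (List<Object> tuple : bag) {     // iterate the input bag
            if (innerPlan.test(tuple)) {     // inner plan yields a boolean atom
                out.add(tuple);              // forward the tuple to the output port
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<List<Object>> bag = List.of(List.of(1, 2), List.of());
        // Inner plan for "FILTER X BY ELEM($_) > 0": count elements, compare with 0
        System.out.println(filter(bag, t -> t.size() > 0)); // [[1, 2]]
    }
}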

The inner plan in FOREACH may look like this:-

                       LoadTuple
                           |
                           |
                     UDF(TupleMinMax())
                       /         \ 
                      /           \
             FieldProject(0)   FieldProject(1)
                  /    |          |
     Const(2)--PLUS     \        /
                 |       \      / 
                 |        MULTIPLY
                 |          |
                 |          |
            InsertField(1)  InsertField(0)
                 \          /
                  \        /
                  StoreTuple

NOTE: In the real implementation, keeping a separate inner plan per output field might be simpler. For example, "GENERATE $1+$2, ($1+$2)*5" can become one plan for $1+$2 and another plan for ($1+$2)*5, so that we don't have to care about merging them. /!\ Open question /!\

[shrav] Pig already kind of does what you are saying here; it just does it implicitly. The LoadTuple is in fact what happens when a nested plan is processed. I guess the way to extend the language would be to just allow inside a nested plan all the operators that we allow outside of it. In fact the execution side, that is the physical side, already supports this. We just need to make the appropriate parser changes, and the hard part would be the type checking and the parsing itself.

[pi] I think in the logical plan change it would be better to have something that indicates the link between outer and inner. I don't find any existing operator that fits this. I agree that it doesn't have to be explicit.

More examples

Given GENERATE: Tuple -> Tuple, suppose we would like to generate { $1, ($2+5) * $1 } from { $1, $2, $3, $4 }.

The plan may look like this:-

            LoadTuple
            /        \
FieldProject(1)       FieldProject(2)                   
   |\                      |                    
   | \                     |  --------Constant(5)
   |  \                    | /
   |   \                  ADD
   |    \                  |
   |     \                 |
   |      ----------------\|
   |                      MUL 
   |                       |
InsertField(1)           InsertField(2)
     \                    /
      \ _____        ____/
             \      /
            StoreTuple

Diagram B1

[shrav] Are you saying that Pig does not support this now?

[Pi] No, this wiki page is solely from my imagination.

This looks similar to a common relational plan:-

 Load('file1')         Load('file2')
   |                       |
  Filter                 Foreach
   |                       |
   |                      /
   |   -------------------
   |  /                     
  Cogroup
   |
  Store('outputfile')

Diagram B2

This allows us to fully utilize the notion of inner plans, as we can now define our processing model recursively.

From diagram B2 and Table C, FOREACH is an operator that accepts a bag and outputs a bag. The internal mechanism is to iterate through all the tuples in the input bag; for each tuple, it performs the data processing defined in its inner plan. A plan that accepts a tuple and outputs a tuple, like B1, can be embedded in the FOREACH operator, as sketched below.
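
For instance, diagram B1 could be written as an inner Tuple -> Tuple plan in the following hedged Java form (the List-based Tuple model and all names are assumptions of this sketch, not Pig code):-

import java.util.List;
import java.util.function.Function;

class InnerPlanB1 {
    // Generate { $1, ($2+5) * $1 } from { $1, $2, $3, $4 }
    // ($1, $2 are 1-based field references, as in the text above)
    static final Function<List<Object>, List<Object>> B1 = t -> {
        int f1 = (Integer) t.get(0);        // FieldProject(1)
        int f2 = (Integer) t.get(1);        // FieldProject(2)
        return List.of(f1, (f2 + 5) * f1);  // ADD, MUL, InsertField(1..2), StoreTuple
    };

    public static void main(String[] args) {
        // A FOREACH-style operator would apply B1 to every tuple of its input bag.
        System.out.println(B1.apply(List.of(3, 4, 9, 9))); // [3, 27]
    }
}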

Dynamic Execution Engine Selection

This might not be relevant :)

Sample use case

We've got log files stored in different physical locations where the directory structure indicates the time dimension. We want to be able to process them all at once.

Here is the directory structure:-

  /data/01_2008
  /data/02_2008
  /data/03_2008

We may extend the LOAD statement to be able to do something like this:-

X = LOAD '/data/01_2008', '/data/02_2008', '/data/03_2008' 
    AS { <'01_2008', $0 >, <'02_2008', $1 >, <'03_2008', $2 > } ;    /* Alias matching here will be a bit tricky. */

As can be seen here, $0, $1, $2 are bags embedded in tuples within a bigger bag.

One may write:-

Y = ForEach X   
         {
            A = FILTER $1 BY $1.$0 > 100 ;
            B = DISTINCT $1 ; 
            GENERATE $0, FLATTEN B ;
         } ; 

In terms of the execution plan model, Pig currently supports nested queries like the one above. But because the execution engine selection is static (outer plan ==> MapReduce, inner plan ==> local engine), the execution of the above query is unlikely to finish (given that each input file is huge).

To make the processing engine more flexible, the decision whether to use the distributed backend or the local engine to process a plan should be dynamic. An easy way to do this might be to define a processing-size threshold:-

ProcessingSize > K        Use distributed engine
Otherwise                 Use local engine

One naive way to measure the ProcessingSize is to use the input data size.
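
A sketch of this threshold rule in Java; the constant K and the byte-based size measure are made-up placeholders, not real Pig configuration:-

class EngineSelector {
    enum Engine { DISTRIBUTED, LOCAL }

    // Hypothetical threshold K (here 1 GiB); how to tune it is an open question.
    static final long K = 1L << 30;

    // Naive measure: use the input data size as the ProcessingSize.
    static Engine select(long processingSizeBytes) {
        return processingSizeBytes > K ? Engine.DISTRIBUTED : Engine.LOCAL;
    }

    public static void main(String[] args) {
        System.out.println(select(5L << 30)); // DISTRIBUTED
        System.out.println(select(1024));     // LOCAL
    }
}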

Current Logical Operators as Higher-Order Functions

Table C

FILTER :    Bag x (f: Tuple -> Boolean ) -> Bag

COGroup :   [ Bag, f: Tuple -> Tuple ]^n -> Bag

FOREACH  :  Bag x (f: Tuple -> Tuple) -> Bag

GENERATE :  Tuple x (list of flatten flags) -> Tuple
Note: If we just forget the flatten flags of GENERATE, it can be reduced to "GENERATE: Tuple -> Tuple"

PROJECT :   Tuple x (list of indices) -> Tuple

CROSS :     Bag x Bag -> Bag

ORDER :     Bag x (f: Tuple x Tuple -> CompareResult) -> Bag

DISTINCT:   Bag x (f: Tuple x Tuple -> CompareResult) -> Bag

UNION:      Bag x Bag -> Bag

SPLIT:      Bag x [f: Tuple -> Boolean ]^n -> Bag^n

JOIN :      This can be constructed by COGroup

[shrav] GENERATE looks oversimplified to me. First, the input need not just be a tuple; it can be a combination of tuples and bags, and FLATTEN in that case actually produces the Cartesian product. Also, in FOREACH, the function inside can be a full plan, so it can process bags as well, not just tuples.

[pi] f: Tuple -> Tuple is the full plan inside FOREACH. GENERATE here is still ambiguous (and might not be correct) until we clearly separate its responsibilities from its counterpart's.
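
To make Table C concrete, here is one way to read those signatures as Java functional types, over Bag = List<Tuple> and Tuple = List<Object>. This is purely illustrative and not Pig's API:-

import java.util.*;
import java.util.function.*;

// A few rows of Table C rendered as method signatures; Pig's real operators
// are plan nodes, not methods like these.
interface TableCSignatures {
    // FILTER : Bag x (f: Tuple -> Boolean) -> Bag
    List<List<Object>> filter(List<List<Object>> bag, Predicate<List<Object>> f);

    // FOREACH : Bag x (f: Tuple -> Tuple) -> Bag
    List<List<Object>> foreach(List<List<Object>> bag,
                               Function<List<Object>, List<Object>> f);

    // ORDER : Bag x (f: Tuple x Tuple -> CompareResult) -> Bag
    List<List<Object>> order(List<List<Object>> bag, Comparator<List<Object>> f);

    // CROSS : Bag x Bag -> Bag
    List<List<Object>> cross(List<List<Object>> left, List<List<Object>> right);
}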

Problems with current Operators (5-May-2008)

LOGenerate

- In a top-level query, LOGenerate is just a splitter in the syntax (and possibly makes it easier to read). The main operator that has the functionality is LOForEach.

X = FOREACH Y GENERATE $0, $1 ;

- In a nested query, GENERATE is just a marker before the list of expected outputs. Again, all the processing goes to LOForEach.

Y = ForEach X   
         {
            A = DISTINCT $1 ; 
            GENERATE $0, FLATTEN A ;
         } ;

is equivalent to

Y = ForEach X GENERATE $0, FLATTEN(DISTINCT($1)) ;

It seems LOGenerate is not needed at all. GENERATE is more like just a part of the FOREACH syntax (analogous to BY in FILTER).

[shrav] I don't agree with this. In fact it is the other way round. The FOREACH is a dummy while the GENERATE does all the work. The FOREACH just takes each input tuple and uses the GENERATE specification to process it. The GENERATE spec is the one that defines the transformation.

[pi] We can think about this in two ways: either only one of them does all the work, or we split the responsibilities between them. I'm confused about which it is; we should come up with a clear division of responsibilities. Though, if you say "FOREACH just takes each input and uses...", then it is not a dummy.

[pi] This is one possible way to describe the internal operations of FOREACH GENERATE:-

Operator FOREACH:

FOREACH: Bag x (f: Tuple -> Tuple) x (list of flatten indexes) -> Bag
  1. Iterate through the bag from the input port
  2. For each tuple in the bag, apply f: Tuple -> Tuple (which is the inner plan)
  3. Flatten and put all the output tuples into the output bag, then repeat from (2) for the next tuple.
  4. Output the bag to the output port.

This way we don't need GENERATE and can use a normal inner plan in FOREACH. The list of flatten flags belongs to FOREACH.
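
A sketch of these four steps in Java, with FOREACH owning the flatten indexes and the inner plan reduced to a plain function. It only flattens nested tuples; flattening a field that is a bag would instead produce the Cartesian product shrav mentions above. All names are invented:-

import java.util.*;
import java.util.function.Function;

class ForeachSketch {
    // FOREACH: Bag x (f: Tuple -> Tuple) x (list of flatten indexes) -> Bag
    @SuppressWarnings("unchecked")
    static List<List<Object>> foreach(List<List<Object>> bag,
                                      Function<List<Object>, List<Object>> innerPlan,
                                      Set<Integer> flattenIndexes) {
        List<List<Object>> out = new ArrayList<>();
        for (List<Object> tuple : bag) {                  // (1) iterate the input bag
            List<Object> t = innerPlan.apply(tuple);      // (2) apply the inner plan
            List<Object> flat = new ArrayList<>();        // (3) flatten flagged fields
            for (int i = 0; i < t.size(); i++) {
                if (flattenIndexes.contains(i)) {
                    flat.addAll((List<Object>) t.get(i)); // spill the nested fields out
                } else {
                    flat.add(t.get(i));
                }
            }
            out.add(flat);
        }
        return out;                                       // (4) output bag
    }

    public static void main(String[] args) {
        List<List<Object>> bag = List.of(List.of(1, List.of(2, 3)));
        System.out.println(foreach(bag, t -> t, Set.of(1))); // [[1, 2, 3]]
    }
}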

LOProject

This operator only maps an input tuple to an output tuple (e.g. {A,B,C,D,E} ==> {A,C,D}). Given that we allow users to specify fields in COGROUP, FILTER, and FOREACH as expressions, LOProject becomes just a special case where users merely specify a direct mapping. Since we have agreed upon the concept of inner plans, I think LOProject is not needed.

[shrav] Project is a consistent way of implementing these fields that the user mentions, without making the user bother about all the conversions he might need to do if we just passed him the raw tuple. Also, you can only project out one field, not multiple fields.

[pi] What you mentioned here is different from the current implementation.
