Differences between revisions 23 and 24
Revision 23 as of 2009-05-26 23:50:40
Size: 34455
Editor: fa0-1-wlan-rtr
Revision 24 as of 2009-09-20 23:38:12
Size: 34479
Editor: localhost
Comment: converted to 1.6 markup

Multi-query Performance

Currently, scripts with multiple store commands can result in a lot of duplicated work. The idea for avoiding this duplication is described here: https://issues.apache.org/jira/browse/PIG-627

Note: In this document, <handle> is synonymous with <alias>, i.e. a Pig identifier.


Use cases:

Explicit/implicit split:

There might be cases in which you want different processing on separate parts of the same data stream. Like so:

A = load ...
split A' into B if ..., C if ...
store B' ...
store C' ...


A=load ...
B=filter A' ...
C=filter A' ...
store B' ...
store C' ...

In the current system the first example will dump A' to disk and then start jobs for B' and C'. In the second example Pig will execute all the dependencies of B' and store it, and then execute all the dependencies of C' and store it.

Both of the above are equivalent, but the performance will be different.

Here's what the multi-query feature does to increase the performance:

  • In the second case we add an implicit split to transform the query to case number one. That eliminates the processing of A' multiple times.
  • Make the split non-blocking and allow processing to continue. This helps to reduce the amount of data that has to be stored right at the split.
  • Allow multiple outputs from a job. This way we can store some results as a side-effect of the main job. This is also necessary to make the previous item work.
  • Allow multiple split branches to be carried on to the combiner/reducer. This reduces the amount of IO again in the case where multiple branches in the split can benefit from a combiner run.

Storing intermediate results

Sometimes people will store intermediate results.

A=load ...
store A'
store A''

If the script doesn't re-load A' for the processing of A, the steps above A' will be duplicated. This is basically a special case of Number 2 above, so the same steps are recommended. With the proposed changes the script will basically process A and dump A' as a side-effect, which is probably what the user wanted to begin with.


Pig's philosophy is: Optimize it yourself, why don't you.


  • Implicit splits: It's probably what you expect when you use the same handle in different stores.
  • Store/Load vs Split: When optimizing, it's a reasonable assumption that splits are faster than load/store combinations
  • Side-files: Side-files (multiple outputs from a single map-reduce job) are available in Hadoop, but cannot be used from Pig in the current system.


Execution in batch mode

Batch mode is entered when Pig is given a script to execute (e.g. when invoked with a script as a parameter, or with the "-e" or "-f" Pig options). Interactive mode is the grunt shell ("grunt:>"). There wasn't much difference between the two. In order to optimize the multi-query case, we needed to distinguish them more clearly.

Right now, whenever the parser sees a store (or dump, illustrate) it will kick off the execution of that part of the script. Part of the change was to have batch mode parse the entire script first and see if we can combine things to reduce the overall amount of work that needs to be done. Only after that will the execution start.

The high level changes are:

  • Store does not trigger an immediate execution. The entire script is considered before the execution starts.
  • Explicit splits will be put in places where a handle has multiple children.
  • Multiple split branches/stores in the script will be combined into the same job, if possible.

Some problems with this:

  • Explain works on handles, which only gives you a slice of the entire script execution at a time. What's more, at the point where an explain occurs in a script it might not give you an accurate picture of the situation, since the execution plans might change once the entire script is handled.
  • Debugging on the grunt shell is more complicated, since scripts run differently than what one might type on the shell.

Additional changes therefore are:

  • Add run and exec commands to the shell to execute a script in interactive or batch mode for debugging.
  • Add scripts as a target (in addition to handles) to explain.
  • Add dot as an output type to explain (a graphical explanation of the graph will make multi-query explains more understandable.)


(See https://issues.apache.org/jira/browse/PIG-574 - this is basically the same as requested there)

The new run command has the format:

run [-param <key>=<value>]* [-param_file <filename>] <script name>

Which runs the script in interactive mode, so every store triggers execution. The statements from the script are put into the command history and all the handles defined in the script can be referenced in subsequent statements after the run command has completed. Issuing a run command on the grunt command line has basically the same effect as typing the statements manually.


The new exec command has the format:

exec [-param <key>=<value>]* [-param_file <filename>]* [<script name>]

Which will run the script in batch mode. Store statements will not trigger execution; rather, the entire script will be parsed before execution starts. Unlike the "run" command, exec does not change the command history or remember the handles used inside the script. Exec without any parameters can be used in scripts to force execution up to the point in the script where the exec occurs.


Changes to the explain command:

explain [-out <path>] [-brief] [-dot] [-param <key>=<value>]* [-param_file <filename>]* [-script <scriptname>] [<handle>]


-script <scriptname>:

  • If explain is given a script without a handle, it will output the entire execution graph (logical, physical, MR)
  • If explain is given a script with a handle, it will output the plan for the handle given
  • If no script is given, explain works as before

-dot:

  • Text will give what we have today, dot will output a format that can be passed to dot for graphical display.
  • In text mode, multiple outputs (split) will be broken out in sections.
  • Default (no -dot): Text

-out <path>:

  • Will generate logical_plan.[txt||dot], physical_plan.[text||dot], exec_plan.[text||dot] in the specified directory.
  • Default (no path given): Stdout

-brief:

  • Does not expand nested plans (presenting a smaller graph for overview)

-param, -param_file:

  • Allows for param substitution in scripts.

Turning off the multi-query optimization

By default the multi-query optimization is enabled and script execution will be handled accordingly. To turn off the optimization and revert to "execute-on-store" behavior, use the "-M" or "-no_multiquery" switch.

In order to run script "foo.pig" without the optimization, execute pig as follows:

$ pig -M foo.pig
$ pig -no_multiquery foo.pig

If Pig is launched in interactive mode with this switch, "exec" statements are also going to run in interactive mode.

Error Handling

With multiquery Pig processes an entire script or a batch of statements at once. By default Pig tries to run all the jobs that result from that - regardless of whether some jobs fail during execution. A user has the following options to check which jobs have succeeded or failed:

Pig logs all successful and failed store commands. Store commands are identified by output path. At the end of execution a summary line indicates success, partial failure or failure of all store commands.

Pig also returns a different return code upon completion for each of these scenarios:

  • Return code 0: All jobs succeeded
  • (Return code 1 is used for retriable errors)
  • Return code 2: All jobs have failed
  • Return code 3: Some jobs have failed
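
A wrapper that launches Pig could interpret these return codes along the following lines. This is only a minimal sketch: the dictionary and function names are hypothetical and are not part of Pig; only the four code values come from the list above.

```python
# Hypothetical helper; only the code values 0-3 are taken from the list above.
PIG_RETURN_CODES = {
    0: "all jobs succeeded",
    1: "retriable error",
    2: "all jobs failed",
    3: "some jobs failed",
}


def describe_return_code(code):
    """Translate a Pig return code into a short description."""
    return PIG_RETURN_CODES.get(code, "unknown return code")


def is_partial_failure(code):
    """Only code 3 means that some stores succeeded while others failed."""
    return code == 3
```

With a partial failure, the per-store log lines mentioned above are the place to look for which output paths were actually produced.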

Turning off best effort execution

In some cases it might be desirable to fail the entire script upon detecting the first failed job. This can be achieved with the "-F" or "-stop_on_failure" command line flag. If used, Pig will stop execution when the first failed job is detected and discontinue further processing. This also means that file commands that come after a failed store in the script will not be executed (This can be used to create "done" files).

This is how the flag is used:

$ pig -F foo.pig
$ pig -stop_on_failure foo.pig

Incompatible Changes

Most existing scripts produce the same result with or without the multi-query optimization. There are cases, though, where this is not true.

Path Names and Schemes

Any script is parsed in its entirety before it is sent to execution. Since the current directory can change throughout the script, any path used in load or store is translated to a fully qualified and absolute path.

In map-reduce mode, the following script:

cd /;
A = load 'foo';
cd tmp;
store A into 'bar';

will load from "hdfs://<host>:<port>/foo" and store into "hdfs://<host>:<port>/tmp/bar".

These expanded paths are going to be passed to any LoadFunc or Slicer implementation. In some cases, especially when a LoadFunc/Slicer is not used to read from a dfs file or path (e.g.: loading from an SQL database), this can cause problems.

Solutions are to either:

  • Specify "-M" or "-no_multiquery" to revert to the old names
  • Specify a custom scheme for the LoadFunc/Slicer

Arguments used in a load statement that have a scheme other than "hdfs" or "file" will not be expanded and will be passed to the LoadFunc/Slicer unchanged.

In the SQL case:

A = load 'sql://footable' using SQLLoader();

Will invoke the SQLLoader function with "sql://footable".
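
The path handling described in this section can be illustrated with a small simulation. It is a sketch under stated assumptions, not Pig's implementation: `cd` and `qualify` are hypothetical helpers, the prefix is the same placeholder used in the text, and arguments that already carry an "hdfs" or "file" scheme are assumed to be passed through as they are.

```python
import posixpath

# Placeholder prefix, written exactly as in the text above.
DFS_PREFIX = "hdfs://<host>:<port>"


def cd(cwd, target):
    """Return the new current directory after a 'cd target' statement."""
    return posixpath.normpath(posixpath.join(cwd, target))


def qualify(path, cwd):
    """Expand a load/store argument relative to the current directory."""
    if "://" in path:
        # Arguments with a scheme (e.g. sql://, hbase://) are left untouched.
        # Assumption: hdfs:// and file:// arguments are already fully qualified.
        return path
    return DFS_PREFIX + posixpath.normpath(posixpath.join(cwd, path))


# Replays the script above: cd /; load 'foo'; cd tmp; store ... into 'bar'
cwd = cd("/", "/")
load_path = qualify("foo", cwd)
cwd = cd(cwd, "tmp")
store_path = qualify("bar", cwd)
```

In this replay `load_path` ends in "/foo" and `store_path` ends in "/tmp/bar", while `qualify("sql://footable", cwd)` returns its argument unchanged.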


Scripts using the HBaseStorage loader will trigger a warning with the multi-query optimization turned on. The reason is the same as described above. Table names (since they are given without a scheme) will be interpreted as relative hdfs paths and the HBaseStorage function will see an expanded path of the form "hdfs://<host>:<port>/.../<tablename>". The storage function will in this case take whatever is after the last "/" in the string and try to use it as the name of the requested table. The warning will notify the user of this situation.

Scripts like:

A = load 'table' using org.apache.pig.backend.hadoop.hbase.HBaseStorage('a','b');

Should be changed to:

A = load 'hbase://table' using org.apache.pig.backend.hadoop.hbase.HBaseStorage('a','b');

To avoid the warning. Using "-M" or "-no_multiquery" will also remove the warning.

Implicit Dependencies

If a script has dependencies on the execution order outside of what Pig knows about, execution might fail.

For instance:

store A into 'foo';
B = load 'bar';
C = foreach B generate MYUDF($0,'foo');
store C into 'baz';

MYUDF might try to read from the file foo, a file that handle A was just stored into. However, Pig does not know that MYUDF depends on the file foo and might submit the jobs producing the files baz and foo at the same time, so execution might fail.

In order to make this work, the script has to be changed to:

store A into 'foo';
exec;
B = load 'bar';
C = foreach B generate MYUDF($0,'foo');
store C into 'baz';

The exec statement will trigger the execution of the job resulting in the file foo. This way the right execution order is enforced.

Execution Plans

Here is a closer look at what statements get combined into a single map-reduce plan.

Any implicit or explicit split is first compiled into a map-reduce job (the splitter) that stores the input of the split and another map-reduce job (the splittee) for each branch of the split that loads the split input and processes the branch.

Implicit vs. Explicit Splits

An explicit split is a split that is specified by using the split statement.


A = load 'foo';
split A into B if $0 is not null, C if $0 is null;
store B into 'bar';
store C into 'baz';

An implicit split is a split that is produced by using the same handle in multiple statements as input handles.


A = load 'foo';
B = filter A by $0 is not null;
C = filter A by $0 is null;
store B into 'bar';
store C into 'baz';

The following will not produce a split, because different handles are used in the filter statements, even though the statements are logically the same as above:

A = load 'foo';
B = filter A by $0 is not null;
A = load 'foo';
C = filter A by $0 is null;
store B into 'bar';
store C into 'baz';
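
The difference between the last two scripts can be made concrete by counting how many statements consume each operator. The following is a rough sketch only: the `(alias, inputs)` statement tuples are a hypothetical mini-representation, not Pig's logical plan, and the function name is made up for illustration.

```python
def find_implicit_splits(statements):
    """Return the indices of statements whose output feeds more than one consumer.

    statements: list of (alias, inputs) tuples in script order; alias is None
    for a store. Re-assigning an alias creates a new operator, so later
    readers of that alias no longer share the earlier one.
    """
    current = {}    # alias -> index of the statement that currently defines it
    consumers = {}  # operator index -> number of statements reading it
    for index, (alias, inputs) in enumerate(statements):
        for name in inputs:
            op = current[name]
            consumers[op] = consumers.get(op, 0) + 1
        if alias is not None:
            current[alias] = index
    return sorted(op for op, count in consumers.items() if count > 1)


# A = load; B = filter A; C = filter A; store B; store C
shared_handle = [("A", []), ("B", ["A"]), ("C", ["A"]), (None, ["B"]), (None, ["C"])]

# A = load; B = filter A; A = load; C = filter A; store B; store C
reloaded_handle = [("A", []), ("B", ["A"]), ("A", []), ("C", ["A"]),
                   (None, ["B"]), (None, ["C"])]
```

For `shared_handle` the load at index 0 has two consumers and is reported as a split point; for `reloaded_handle` nothing is reported, matching the statement above that no split is produced.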

The multi-query optimization then tries to combine splitters and splittees in the same job.

Map-only Splittees

If a splittee is a map-only job (doesn't require join, cogroup, group, etc) the splittee is merged into the splitter - into either the map or reduce plan. The use-case of storing temporary results during execution falls into this category.

The script:

A = load '/user/pig/tests/data/pigmix/page_views'
    as (user, action, timespent, query_term, ip_addr, timestamp,
        estimated_revenue, page_info, page_links);
B = filter A by user is not null;
store B into 'filtered_by_user';
C = filter B by query_term is null;
store C into 'filtered_by_query';

Will be executed as:


The same works in the reducer. The script:

A = load '/user/pig/tests/data/pigmix/page_views'
    as (user, action, timespent, query_term, ip_addr, timestamp,
        estimated_revenue, page_info, page_links);
B = group A by user;
C = foreach B generate group, MIN(A.timespent);
D = foreach B generate group, MAX(A.timespent);
store C into 'min_timespent';
store D into 'max_timespent';

Will be executed as:


Map-reduce Splittees

If a split happens in the map plan and one of the splittees is a map-(combine)-reduce job, the map plan will be a combined plan of all the splittee and splitter map plans, and the reduce plan will be the one of the map-(combine)-reduce job.

The script:

A = load '/user/pig/tests/data/pigmix/page_views'
    as (user, action, timespent, query_term, ip_addr, timestamp,
        estimated_revenue, page_info, page_links);
B = filter A by user is not null;
store B into 'filtered_user';
C = group B by action;
D = foreach C generate B.action, COUNT(B);
store D into 'count';

Will be executed as:


In a similar way, if multiple splittees are map-(combine)-reduce jobs the combine and reduce plans are also merged.

The script:

A = load '/user/pig/tests/data/pigmix/page_views'
    as (user, action, timespent, query_term, ip_addr, timestamp,
        estimated_revenue, page_info, page_links);
B = group A by user;
C = foreach B generate group, MAX(A.estimated_revenue);
store C into 'highest_values';
D = group A by query_term;
E = foreach D generate group, SUM(A.timespent);
store E into 'total_time';

Will be executed as:


If a split happens in a reduce plan, splittees have to be map-only jobs to be merged into the splitter. If there are map-reduce splittees the reduce will result in a tmp store and the splittees are run in separate jobs.

Multi-input splittees

Splittees that have multiple inputs (such as produced by cogroup or join) are not merged into the splitter. In this case the splitter will write the split input to a temporary file and the splittee will be started in a different job.

Store-load sequences

If a script stores to and loads from the same file, some special processing takes place to ensure that the jobs are executed in the right sequence.

Reversible LoadStoreFunc

If the store and load are processed using the same function and the LoadStoreFunc is reversible, the store is processed, but the load is removed from the plan. Instead the parent of the store is used as input for the dependent processing nodes.

The script:

A = load 'page_views';
store A into 'tmp1' using PigStorage();
B = load 'tmp1' using PigStorage();
C = filter B by $0 is not null;
store C into 'tmp2';

Will result in the following logical plan:


If, on the other hand, different load and store functions are used or the function is not reversible, the store and load will be connected in the logical plan and will eventually result in 2 jobs running in sequence.

The script:

A = load 'page_views';
store A into 'tmp1' using PigStorage();
B = load 'tmp1' using BinStorage();
C = filter B by $0 is not null;
store C into 'tmp2';

Will result in the following logical plan:


File commands

Commands like rm, rmf, mv, copyToLocal and copy will trigger execution of all the stores that were defined before the command. This is done so that we can make sure that the targets of these commands will be there.

For instance:

A = load 'foo';
store A into 'bar';
mv bar baz;
rm foo;
A = load 'baz';
store A into 'foo';

Will result in a job that produces bar, then the mv and rm are executed. Finally, another job is run that will generate foo.

Internal Changes


The parser currently uses a bottom-up approach. When it sees a store (dump, explain), it goes bottom up and generates the plan that needs to happen for this particular store. In order to optimize the multi-query example, however, we need a view of the entire graph for a script (interactive mode is handled differently).

The high-level changes to the parser/server are:

  • Do not execute the plan when we see a store or dump
  • Alter the already existing merge functionality to allow intersecting graphs to be joined into a single logical plan.
  • Wait until the entire script is parsed and merged before sending the plan on to do validation, optimization, etc.

The new "run" command will simply feed all the lines of a script through the interactive mode.

The PigServer has a new interface:

  • setBatchOn()
    • By default batch mode is off and we are in interactive mode. setBatchOn starts a new batch for execution that will not execute on store.
    • setBatchOn can be called multiple times and will produce a nested set of batches
  • executeBatch()
    • Whenever batch mode is on, execute will process all the stores that are currently in the batch and have not been processed before
  • discardBatch()
    • Removes the current batch and goes back to the previous batch (or interactive mode, if there are no more)
  • isBatchOn()
    • Tells whether batch mode is on
  • isBatchEmpty()
    • Helper function that tells whether there are any unprocessed stores in the current batch

Internally all different batches and the interactive mode are represented as Graph objects. A graph object maintains a cache of the registered queries, keeps track of the logical plan and the processed/unprocessed stores. Graphs themselves can be interactive or batch. Interactive graphs will execute on store, batch graphs won't.

The PigServer maintains a stack of these graph objects so that setBatchOn/discardBatch operations basically become push and pop operations.
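
The stack behaviour can be modelled in a few lines. This is a toy model under stated assumptions, not the PigServer code: the class names, the snake_case method names and the `store` method are hypothetical stand-ins for the interface listed above, and "executing" a store just records its path.

```python
class Graph:
    """Toy stand-in for a graph object: tracks unprocessed and processed stores."""

    def __init__(self, batch):
        self.batch = batch
        self.pending = []
        self.processed = []


class MiniServer:
    """Toy model of the batch stack; method names mirror the interface above."""

    def __init__(self):
        self.graphs = [Graph(batch=False)]  # bottom of the stack: interactive
        self.executed = []                  # one entry per executed group of stores

    def set_batch_on(self):
        self.graphs.append(Graph(batch=True))   # push a new batch

    def discard_batch(self):
        if len(self.graphs) > 1:
            self.graphs.pop()                   # pop back to the previous graph

    def is_batch_on(self):
        return self.graphs[-1].batch

    def is_batch_empty(self):
        return not self.graphs[-1].pending

    def execute_batch(self):
        return self._run(self.graphs[-1])

    def store(self, path):
        graph = self.graphs[-1]
        graph.pending.append(path)
        if not graph.batch:
            self._run(graph)                    # interactive graphs execute on store

    def _run(self, graph):
        ran, graph.pending = graph.pending, []
        graph.processed.extend(ran)
        if ran:
            self.executed.append(ran)
        return ran
```

In this model a store issued in interactive mode runs immediately, while stores issued after `set_batch_on()` accumulate until `execute_batch()` is called.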

If the multi-query optimization is turned off all graphs will be generated as interactive, which is how we revert the behavior.

The merging of the different logical plans is done in the OperatorPlan, where merge operations can now either check for disjoint graphs or merge them with overlaps. Merged plans are later passed on to the implicit split inserter at which point all the overlapping operators from the merge will result in an explicit split.

Finally, the store-load handling is done in the pig server. It will either transform the plan or add a store-load connection. Absolute filenames will be available, since the QueryParser now translates them when it sees them.

The grunt parser makes use of the new PigServer APIs. It will use the batch API to parse and execute scripts. Since scripts can be nested, the script execution and the stack of graphs in the PigServer are closely related.


As described above the changes are:

  • Add options to explain to work on a script file as well as a handle.
  • Add the ability to print plans as dot files and to write output to files.
  • Allow for a "brief" option to control verbosity.

The explain command already supports writing to PrintStreams, so adding the capability of writing to files was fairly straightforward.

Parameters for the desired output format as well as the verbosity were added to the explain calls in the execution engine, logical and physical plan.

The dot output is realized by the DotPlanDumper. Since dot does not care about any specific order, we simply iterate over the nodes and edges and dump them. Nested plans are realized as subgraphs, which will cause a recursion in the DotPlanDumper. Printing a map-reduce plan will first start with the MROperator plan and then recurse into the physical plans.

Invisible nodes and edges are used to force the right layout of subgraphs. There is one invisible input and one invisible output per subgraph, connected (via invisible edges) to the roots and leaves of the nested plan respectively. This way a top-to-bottom layout of the nested graph is ensured.

CoGroups are special in that their nested plans are connected to input operators. This is modeled as subgraphs as well and the subgraphs are connected to the respective input operator.
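
To make the subgraph and invisible-node idea concrete, here is a small generator of dot text. It is an illustrative sketch, not the DotPlanDumper itself: the dictionary-based plan format and all function names are hypothetical, and CoGroup handling is left out.

```python
def _is_nested(plan, node):
    return node in plan.get("nested", {})


def _entry(plan, prefix, node):
    # Edges into a nested plan are attached to its invisible input node.
    return prefix + node + ".in" if _is_nested(plan, node) else prefix + node


def _exit(plan, prefix, node):
    # Edges out of a nested plan are attached to its invisible output node.
    return prefix + node + ".out" if _is_nested(plan, node) else prefix + node


def _dump(plan, prefix, lines):
    for node in plan["nodes"]:
        if not _is_nested(plan, node):
            lines.append('"%s" [label="%s"];' % (prefix + node, node))
            continue
        sub, inner = prefix + node + ".", plan["nested"][node]
        lines.append('subgraph "cluster_%s" {' % (prefix + node))
        lines.append('label="%s";' % node)
        lines.append('"%sin" [style=invis];' % sub)
        lines.append('"%sout" [style=invis];' % sub)
        _dump(inner, sub, lines)  # recursion for the nested plan
        has_parent = {dst for _, dst in inner["edges"]}
        has_child = {src for src, _ in inner["edges"]}
        for n in inner["nodes"]:
            if n not in has_parent:  # root of the nested plan
                lines.append('"%sin" -> "%s" [style=invis];'
                             % (sub, _entry(inner, sub, n)))
            if n not in has_child:   # leaf of the nested plan
                lines.append('"%s" -> "%sout" [style=invis];'
                             % (_exit(inner, sub, n), sub))
        lines.append("}")
    for src, dst in plan["edges"]:
        lines.append('"%s" -> "%s";'
                     % (_exit(plan, prefix, src), _entry(plan, prefix, dst)))


def dump_dot(plan, name="plan"):
    """Render a (possibly nested) plan as dot text."""
    lines = ["digraph %s {" % name]
    _dump(plan, "", lines)
    lines.append("}")
    return "\n".join(lines)


example = {
    "nodes": ["job1", "job2"],
    "edges": [("job1", "job2")],
    "nested": {"job1": {"nodes": ["load", "store"], "edges": [("load", "store")]}},
}
dot_text = dump_dot(example)
```

The resulting text can be fed to the dot tool; the invisible edges only influence layout and are not drawn.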

Map Reduce Engine

Let's first look at how a multi-query is processed in the hadoop case. We have already discussed how the logical plan is generated, driven by the PigServer and GruntParser. The next step is translation of the logical plan to the physical plan. This has not changed except that in the multi-query case it has to carry connections of store to load operator through to the physical plan.


After that the MRCompiler takes the physical plan and breaks it into a plan of map-reduce operators. The handling of split operators has not changed. What happens is:

When a split is encountered, it is removed from the plan and replaced by a store to a temporary path with BinStorage. The split branches are placed in new map-reduce operators that load from the temporary path.

Some minor changes to the MRCompiler are:

  • Edges from store to load operator are removed and their respective map-reduce operators are connected. That will enforce that the store job runs before the load job.
  • Jobs that result from a split are marked as splitters and splittees, so that in the optimization we can look at those.


After the compiler, the optimizers are run. The last one in the chain is the MultiQueryOptimizer. This optimizer looks at all splitters and splittees and merges them recursively, producing what was described above under Execution Plans. Since this step runs after the combiner optimizer, it can also merge combiners if necessary. If the optimizer cannot merge, it keeps the existing store that the split produced in the MRCompiler.

The merging of splittees into a splitter consists of:

  • Creating a split operator in the map or reduce and setting the splittee plans as nested plans of the split
  • If it needs to merge combiners it will introduce a Demux operator to route the input from mixed split branches in the mapper to the right combine plan. The separate combiner plans are the nested plans of the Demux operator
  • If it needs to merge reduce plans, it will do so using the Demux operator the same way the combiner is merged.
  • In the cases where some splittees have combiners and some do not have combiners, the optimizer chooses either the subset of splittees with combiners or the subset of splittees without combiners--depending on which subset is larger--and merges the splittees in the chosen subset into the splitter. The other subset--if not empty--will not be merged.
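
The last rule (mixed splittees with and without combiners) amounts to picking the larger of two subsets. A minimal sketch follows; the function name and the `(name, has_combiner)` tuples are hypothetical, and the tie-breaking rule is an assumption, since the text does not say what happens when both subsets have the same size.

```python
def choose_splittees_to_merge(splittees):
    """Split splittees into (merged, not_merged) by the larger-subset rule.

    splittees: list of (name, has_combiner) tuples.
    """
    with_combiner = [name for name, has in splittees if has]
    without_combiner = [name for name, has in splittees if not has]
    # Assumption: on a tie, prefer the subset with combiners.
    if len(with_combiner) >= len(without_combiner):
        return with_combiner, without_combiner
    return without_combiner, with_combiner


merged, separate = choose_splittees_to_merge(
    [("s1", True), ("s2", False), ("s3", False)]
)
```

Here `merged` holds the two combiner-less splittees and `separate` holds the one with a combiner, which would keep running as its own job.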

Note: As an end result this merging will result in Split or Demux operators with multiple stores tucked away in their nested plans.

There are two more minor optimizers:

  • NoopFilterRemover will remove constant-true filters that splits produce

  • NoopStoreRemover will remove implicit stores to temporary files, if there is a user initiated store that contains the same information


The map reduce launcher receives the map-reduce operator plan from the compilation stage. It uses the JobControlCompiler to compile map-reduce operators into hadoop job control objects and submits them to hadoop for execution.

The biggest change stems from the fact that with the current Hadoop system we need to store multiple outputs from a single job in a temp directory first and then move them to the real location. Map-reduce operators that are connected in the graph mean that the successor needs to read files that the parent produces. In order to do that we need to submit the graph in several steps, and after each one we need to move the files to their final location. So the overall job submission works like this:

  • Remove all root operators
  • Compile these operators into jobs
  • Submit jobs to hadoop
  • Move result files or collect failure
  • Repeat unless map-reduce plan is empty
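
The submission loop above can be sketched as a wave-by-wave traversal of the job graph. This is a simplified model, not the MapReduceLauncher: `submit_in_waves` and `run_job` are hypothetical names, moving files is reduced to a boolean result, and skipping the successors of a failed job is an assumption that the text does not spell out.

```python
def submit_in_waves(deps, run_job):
    """Run jobs root-first in waves.

    deps: dict mapping each job to the set of jobs it depends on.
    run_job: callable returning True on success (results moved into place).
    Returns (waves, failed, skipped).
    """
    remaining = {job: set(parents) for job, parents in deps.items()}
    waves, failed, skipped = [], [], []
    while remaining:
        # Remove all root operators (jobs with no unfinished parents).
        roots = sorted(job for job, parents in remaining.items() if not parents)
        if not roots:
            raise ValueError("dependency cycle in the job plan")
        for job in roots:
            del remaining[job]
        waves.append(roots)
        # "Compile and submit" the wave, then collect successes and failures.
        succeeded = set()
        for job in roots:
            if run_job(job):
                succeeded.add(job)
            else:
                failed.append(job)
        for parents in remaining.values():
            parents -= succeeded
        # Assumption: jobs downstream of a failure are skipped, not run.
        dead = set(failed) | set(skipped)
        changed = True
        while changed:
            changed = False
            for job in sorted(remaining):
                if remaining[job] & dead:
                    del remaining[job]
                    skipped.append(job)
                    dead.add(job)
                    changed = True
    return waves, failed, skipped


plan = {"split": set(), "b": {"split"}, "c": {"split"}, "d": {"b"}}
all_ok = submit_in_waves(plan, lambda job: True)
b_fails = submit_in_waves(plan, lambda job: job != "b")
```

Each inner list in `waves` corresponds to one round of the loop, so successors are only submitted once the files of their parents are in place.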


The JobControlCompiler distinguishes between multi-store and single-store map-reduce operators. Single-store operators are processed the same way as before: the store is removed and the job configuration is set up to put the records that come out of the pipeline in the right files.

Multi-store plans are handled differently. The compiler sets the output path to a newly created temporary directory. It also leaves the store operators in the plan; at execution time these operators will create subdirectories of the temporary path and direct records to those directories.

Let's assume we have 2 stores in the plan to directories "foo" and "/out/bar". After running the job one gets the following temporary directory structure:


The JobControlCompiler will then as a final step move the result files to their final locations.

Store Operator

Finally, after the MapReduceLauncher has submitted the jobs, execution starts on the backend.

The store operator changed to deal with multiple outputs from a single map-reduce job as well as enabling the local engine to handle multiple stores in the same plan.

The operator itself has a store implementation that is instantiated by the execution engine that handles the output. Store has a "getNext" function that stores the input record and returns a null record. That basically means that stores can be placed into an execution pipeline as an operator with a side-effect that will consume all input records.

The local engine's implementation of the store is straightforward. It creates the StoreFunc and binds the output stream to it.

The map reduce engine's implementation of the store needs to set up a few more things. It creates a copy of the job configuration and overwrites some key fields. It changes the output and temporary work output directory to point to a temporary path specific to the store being processed (see JobControlCompiler). It also sets PigOutputFormat as the output class as well as registering the actual store function that PigOutputFormat is going to use. Finally it creates a new record writer and output collector with these settings.

Split operator

The split operator can now be used at execution time too. It used to always be removed by the compiler (and replaced by store-load combinations). So, here's how the split operator works:

  • Every input record will be attached to each nested plan in sequence
  • Until a plan is exhausted it will keep returning records from that plan
  • Once EOP is returned the next plan will be pulled from.
  • If all plans have been dealt with already the next input record is fetched
  • If there are no more input records the EOP is passed on to the requester
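
The pull loop described in this list maps naturally onto a generator. The sketch below is illustrative only: `split_operator` is a hypothetical function, and nested plans are modelled as plain callables that return the output records for one input record.

```python
def split_operator(records, nested_plans):
    """Yield outputs of every nested plan for every input record.

    nested_plans: list of callables; each takes one record and returns an
    iterable of output records (possibly empty).
    """
    for record in records:              # fetch the next input record
        for plan in nested_plans:       # attach it to each nested plan in turn
            for output in plan(record):  # drain the plan before moving on
                yield output
    # Running off the end corresponds to passing EOP on to the requester.


def keep_even(record):
    return [record] if record % 2 == 0 else []


def times_ten(record):
    return [record * 10]


outputs = list(split_operator([1, 2], [keep_even, times_ten]))
```

For the two input records the result interleaves the branches per record: the first record produces output only from the second plan, the second record from both.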

Demux Operator

The demux operator is used in combiners and reducers where the input is a mix of different split plans of the mapper. The outputs of split plans are indexed, and based on the index the demux operator will decide which of its nested plans a record belongs to and then attach it to that particular plan.

More precisely, these are the steps to merge a map-reduce splittee into the splitter:

  1. Add the map plan of the splittee to the inner plan list of the split operator.
  2. Set the index on the leaf operator of the map plan based on the position of this map plan in the inner plan list.
  3. Add the reduce plan of the splittee to the inner plan list of the demux operator in the same order as the corresponding map plan.
  4. The outputs of merged map plan of the splitter are indexed key/value pairs and are sent to the reduce tasks.
  5. The demux operator extracts the index from the key/values it receives and uses it to find the corresponding reduce plan in its inner plan list.
  6. The chosen reduce plan consumes the key/values data.
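
The six steps can be simulated in memory to show how the index ties a map plan to its reduce plan. This is a toy sketch, not the Pig operators: `run_merged_job` is a hypothetical function, the shuffle is a dictionary, and the sample data loosely mirrors the page_views script shown earlier (maximum revenue per user, total time per query term).

```python
def run_merged_job(records, map_plans, reduce_plans):
    """Simulate a merged job: indexed map outputs, demux on the reduce side.

    map_plans[i] takes a record and returns (key, value) pairs;
    reduce_plans[i] takes (key, values) and returns one result.
    """
    shuffled = {}
    for record in records:
        # Split on the map side: every map plan sees every record and its
        # output is tagged with the plan's position in the inner plan list.
        for index, map_plan in enumerate(map_plans):
            for key, value in map_plan(record):
                shuffled.setdefault((index, key), []).append(value)
    results = [[] for _ in reduce_plans]
    for (index, key), values in sorted(shuffled.items()):
        # Demux on the reduce side: the index selects the reduce plan.
        results[index].append(reduce_plans[index](key, values))
    return results


page_views = [
    {"user": "a", "term": "x", "revenue": 1, "timespent": 5},
    {"user": "a", "term": "y", "revenue": 3, "timespent": 2},
    {"user": "b", "term": "x", "revenue": 2, "timespent": 4},
]
map_plans = [
    lambda r: [(r["user"], r["revenue"])],
    lambda r: [(r["term"], r["timespent"])],
]
reduce_plans = [
    lambda key, values: (key, max(values)),
    lambda key, values: (key, sum(values)),
]
highest_values, total_time = run_merged_job(page_views, map_plans, reduce_plans)
```

Because the index is part of the shuffle key, equal key values coming from different branches never end up in the same group.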

Partition Scheme

What is the parallelism (the number of reduce tasks requested) of the merged splitter job? How do we partition the keys of the merged inner plans?

After considering several partition schemes, we settled on this one:

  • The parallelism of the merged splitter job is the maximum of the parallelisms of all splittee jobs.
  • The keys from inner plans are partitioned into all the buckets via the default hash partitioner.

This scheme has advantages:

  • Simplicity. No new partition class needed.
  • Performance. The parallelism of a job specified by users is most likely determined by the number of available reducers (machines), so the merged parallelism conforms to the user's expectation.

To avoid key collisions between different inner plans with this scheme, the PigNullableWritable class is modified to take the indexes into account when two keys are compared (hashed).
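
The scheme can be written down in a few lines. This is a sketch under stated assumptions: both function names are hypothetical, and a CRC over the (index, key) pair stands in for the hash that Hadoop's default partitioner would compute from the key; it only illustrates that the index takes part in hashing.

```python
import zlib


def merged_parallelism(splittee_parallelisms):
    """The merged splitter job uses the maximum parallelism of its splittees."""
    return max(splittee_parallelisms)


def partition(index, key, num_reducers):
    """Pick a reducer bucket for a key coming from inner plan `index`.

    Assumption: a CRC of the (index, key) pair stands in for the real hash.
    """
    composite = repr((index, key)).encode("utf-8")
    return zlib.crc32(composite) % num_reducers


reducers = merged_parallelism([10, 40, 20])
bucket = partition(0, "alice", reducers)
```

Keys from all inner plans are spread over the same set of buckets, so no custom partition class is needed in this model either.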

Local Execution Engine

The local engine has not changed as much as the map reduce engine. The local engine executes the physical plan directly. The main changes were:

  • Allow for multiple stores in the physical plan
  • Respect order of execution when there are edges between store and load operators

The local engine will thus execute the physical plan as follows:

  • Get all the store operators with dependent load operators in a dependency order
  • Execute these stores one by one
  • Get all remaining stores and execute them all at once
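
The three steps can be expressed with a topological sort over the store-to-load edges. The following is a sketch, not the local engine's code: `local_execution_plan` and the `feeds` mapping are hypothetical, and the standard-library `graphlib` module (Python 3.9 or later) is used for the dependency ordering.

```python
from graphlib import TopologicalSorter


def local_execution_plan(stores, feeds):
    """Return (sequential, rest) for a physical plan.

    stores: store names in script order.
    feeds: dict mapping a store to the stores whose load reads its output.
    """
    # Stores that have dependent loads must run first, in dependency order.
    with_dependents = [s for s in stores if feeds.get(s)]
    graph = {
        s: {p for p in with_dependents if s in feeds[p]}
        for s in with_dependents
    }
    sequential = list(TopologicalSorter(graph).static_order())
    # Everything else can be executed together afterwards.
    rest = [s for s in stores if s not in graph]
    return sequential, rest


sequential, rest = local_execution_plan(
    ["tmp1", "tmp2", "out"],
    {"tmp1": {"tmp2"}, "tmp2": {"out"}},
)
```

In this example `tmp1` and `tmp2` are executed one after the other because later loads depend on them, and `out` is left for the final combined step.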

Performance Results

We have run the PigMix suite on a Hadoop cluster to compare this enhancement to the code in the trunk. Here are the results (average over 3 runs):

[Table: PigMix timings by test case; columns include "Test Case" and "Prior to Multiquery".]
L12 is the only multi-query script in the suite, and for it we see a 70% performance improvement. For the non-multi-query scripts, the performance is unchanged.

We also ran several multi-query tests on a 40-node Hadoop cluster against a text file with 200 million rows (each row had 3 columns). All tests used 40 reducers. These tests compared the performance of the trunk (released version) with the multiquery branch (latest version). Again, there were significant performance gains in the cases where either all splittees had combiners (Test Cases 1 and 2) or no splittee had a combiner (Test Cases 3 and 4). There were also significant performance gains where some splittees were map-only (Test Case 5). The test results (average over 3 runs) are listed below.

[Table: timings by test case; columns include "Test Case" and "Prior to Multiquery". The test cases, in order:]

  1. Merge 3 McR splittees into splitter and all splittees have combiners
  2. Merge 5 McR splittees into splitter and all splittees have combiners
  3. Merge 3 MR splittees into splitter and no splittee has combiner
  4. Merge 5 MR splittees into splitter and no splittee has combiner
  5. Merge 3 MR splittees into splitter and 2 splittees are map-only

PigMultiQueryPerformanceSpecification (last edited 2009-09-20 23:38:12 by localhost)