Multi-query Performance

Currently, scripts with multiple store commands can result in a lot of duplicated work. The idea of how to avoid the duplication is described here: https://issues.apache.org/jira/browse/PIG-627

Note: In this document <handle> means the same as <alias>, i.e. a Pig identifier.

External

Use cases:

Explicit/implicit split:

There might be cases in which you want different processing on separate parts of the same data stream. Like so:

A = load ...
...
split A' into B if ..., C if ...
...
store B' ...
store C' ...

or

A=load ...
...
B=filter A' ...
C=filter A' ...
...
store B' ...
store C' ...

In the current system the first example will dump A' to disk and then start jobs for B' and C'. In the second example Pig will execute all the dependencies of B' and store it, and then execute all the dependencies of C' and store it.

Both of the above are equivalent, but the performance will be different.

Here's what the multi-query feature does to increase the performance:

Storing intermediate results

Sometimes people will store intermediate results.

A=load ...
...
store A'
...
store A''

If the script doesn't re-load A' for the processing of A'', the steps above A' will be duplicated. This is basically a special case of the implicit split described above, so the same steps apply. With the proposed changes the script will basically process A and dump A' as a side-effect, which is probably what the user wanted to begin with.
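
A concrete version of this pattern might look like the following sketch (the field and path names are made up):

A = load 'page_views' as (user, timespent);
A1 = filter A by user is not null;
store A1 into 'cleaned_page_views';   -- intermediate result, stored as a side-effect
A2 = group A1 by user;
A3 = foreach A2 generate group, SUM(A1.timespent);
store A3 into 'time_by_user';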

Why?

Pig's philosophy is: Optimize it yourself, why don't you.

However:

Changes

Execution in batch mode

Batch mode is entered when Pig is given a script to execute (e.g. invoking Pig with a script as parameter, or using the "-e" or "-f" Pig options). Interactive mode is the grunt shell ("grunt>"). Previously there was not much difference between the two. In order to optimize the multi-query case, we needed to distinguish them more clearly.

Right now, whenever the parser sees a store (or dump, illustrate) it will kick off the execution of that part of the script. Part of the changes was to make batch mode parse the entire script first and see if work can be combined to reduce the overall amount that needs to be done. Only after that will the execution start.

The high level changes are:

Some problems with this:

Additional changes therefore are:

Run

(See https://issues.apache.org/jira/browse/PIG-574 - this is basically the same as requested there)

The new command has the format:

run [-param <key>=<value>]* [-param_file <filename>] <script name>

This runs the script in interactive mode, so every store triggers execution. The statements from the script are put into the command history, and all the handles defined in the script can be referenced in subsequent statements after the run command has completed. Issuing a run command on the grunt command line has basically the same effect as typing the statements in manually.
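
For example (the script name, parameter and handles are hypothetical; C is assumed to be defined inside the script):

grunt> run -param input=/user/alice/page_views daily.pig
grunt> D = filter C by $0 is not null;
grunt> store D into 'non_null_records';

Since run executes in interactive mode, any store inside daily.pig runs during the run command itself, and the store of D triggers another execution.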

Exec

The new command has the format:

exec [-param <key>=<value>]* [-param_file <filename>]* [<script name>]

This runs the script in batch mode. Store statements will not trigger execution; rather, the entire script is parsed before execution starts. Unlike the "run" command, exec does not change the command history or remember the handles used inside the script. Exec without any parameters can be used in scripts to force execution up to the point in the script where the exec occurs.
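
For example, a hypothetical invocation with parameter substitution (the script name and parameter are made up):

grunt> exec -param date=20090520 daily_report.pig

Handles defined inside daily_report.pig are not visible after the exec completes.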

Explain

Changes to the command:

explain [-out <path>] [-brief] [-dot] [-param <key>=<value>]* [-param_file <filename>]* [-script <scriptname>] [<handle>]

Behavior:

If a handle is given, only the plan for that handle is explained; with the -script option an entire script can be explained without executing it.

Dot:

With -dot the plans are printed in DOT format (suitable for graph rendering tools) instead of as plain text.

Out:

With -out <path> the output is written to a file instead of being printed on the console.

Brief:

With -brief nested plans are not expanded, resulting in less verbose output.

Param/Param_file:

-param and -param_file supply values for parameter substitution, which is applied to the script before it is explained.
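
For example, hypothetical invocations combining these options (the handle B, the script name and the output path are made up):

grunt> explain -brief B;
grunt> explain -dot -out plans -script report.pig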

Turning off the multi-query optimization

By default the multi-query optimization is enabled and script execution is handled accordingly. To turn off the optimization and revert to the "execute-on-store" behavior, the "-M" or "-no_multiquery" switches can be used.

In order to run script "foo.pig" without the optimization, execute pig as follows:

$ pig -M foo.pig
or:
$ pig -no_multiquery foo.pig

If Pig is launched in interactive mode with this switch, "exec" statements are also going to run in interactive mode.

Error Handling

With multi-query, Pig processes an entire script or a batch of statements at once. By default Pig tries to run all the jobs that result from that, regardless of whether some jobs fail during execution. To check which jobs have succeeded or failed, a user has the following options:

Pig logs all successful and failed store commands. Store commands are identified by their output path. At the end of execution a summary line indicates success, partial failure, or failure of all store commands.

Pig also returns a different return code upon completion for each of these scenarios:

Turning off best effort execution

In some cases it might be desirable to fail the entire script upon detecting the first failed job. This can be achieved with the "-F" or "-stop_on_failure" command line flags. If used, Pig will stop execution when the first failed job is detected and discontinue further processing. This also means that file commands that come after a failed store in the script will not be executed (this can be used to create "done" files).

This is how the flag is used:

$ pig -F foo.pig
or:
$ pig -stop_on_failure foo.pig

Incompatible Changes

Most existing scripts produce the same result with or without the multi-query optimization. There are cases, though, where this is not true.

Path Names and Schemes

Any script is parsed in its entirety before it is sent to execution. Since the current directory can change throughout the script, any path used in load or store is translated to a fully qualified and absolute path.

In map-reduce mode, the following script:

cd /;
A = load 'foo';
cd tmp;
store A into 'bar';

will load from "hdfs://<host>:<port>/foo" and store into "hdfs://<host>:<port>/tmp/bar".

These expanded paths are going to be passed to any LoadFunc or Slicer implementation. In some cases, especially when a LoadFunc/Slicer is not used to read from a dfs file or path (e.g.: loading from an SQL database), this can cause problems.

Solutions are to either turn off the multi-query optimization (see above) or to use an explicit scheme in the load arguments:

Arguments used in a load statement that have a scheme other than "hdfs" or "file" will not be expanded and will be passed to the LoadFunc/Slicer unchanged.

In the SQL case:

A = load "sql://footable" using SQLLoader();

Will invoke the SQLLoader function with "sql://footable".

HBaseStorage

Scripts using the HBaseStorage loader will trigger a warning with the multi-query optimization turned on. The reason is the same as described above: table names (since they are given without a scheme) will be interpreted as relative hdfs paths, and the HBaseStorage function will see an expanded path of the form "hdfs://<host>:<port>/.../<tablename>". The storage function will in this case take whatever comes after the last "/" in the string and try to use it as the name of the requested table. The warning notifies the user of this situation.

Scripts like:

A = load 'table' using org.apache.pig.backend.hadoop.hbase.HBaseStorage('a','b');

Should be changed to:

A = load 'hbase://table' using org.apache.pig.backend.hadoop.hbase.HBaseStorage('a','b');

To avoid the warning. Using "-M" or "-no_multiquery" will also remove the warning.

Implicit Dependencies

If a script has dependencies on the execution order outside of what Pig knows about, execution might fail.

For instance:

...
store A into 'foo';
B = load 'bar';
C = foreach B generate MYUDF($0,'foo');
store C into 'baz';

MYUDF might try to read from the file foo, a file that handle A was just stored into. However, Pig does not know that MYUDF depends on the file foo and might submit the jobs producing the files baz and foo at the same time, so execution might fail.

In order to make this work, the script has to be changed to:

...
store A into 'foo';
exec;
B = load 'bar';
C = foreach B generate MYUDF($0,'foo');
store C into 'baz';

The exec statement will trigger the execution of the job resulting in the file foo. This way the right execution order is enforced.

Execution Plans

Here is a closer look at what statements get combined into a single map-reduce plan.

Any implicit or explicit split is first compiled into a map-reduce job (the splitter) that stores the input of the split, and, for each branch of the split, another map-reduce job (a splittee) that loads the split input and processes the branch.

Implicit vs. Explicit Splits

An explicit split is a split that is specified by using the split statement.

E.g.:

A = load 'foo';
split A into B if $0 is not null, C if $0 is null;
store B into 'bar';
store C into 'baz';

An implicit split is a split that is produced by using the same handle as an input in multiple statements.

E.g.:

A = load 'foo';
B = filter A by $0 is not null;
C = filter A by $0 is null;
store B into 'bar';
store C into 'baz';

The following will not produce a split, because different handles are used in the filter statements, even though the statements are logically the same as above:

A = load 'foo';
B = filter A by $0 is not null;
A = load 'foo';
C = filter A by $0 is null;
store B into 'bar';
store C into 'baz';

The multi-query optimization then tries to combine splitters and splittees in the same job.

Map-only Splittees

If a splittee is a map-only job (it doesn't require a join, cogroup, group, etc.), the splittee is merged into the splitter, into either the map or the reduce plan. The use case of storing temporary results during execution falls into this category.

The script:

A = load '/user/pig/tests/data/pigmix/page_views'
    as (user, action, timespent, query_term, ip_addr, timestamp,
        estimated_revenue, page_info, page_links);
B = filter A by user is not null;
store B into 'filtered_by_user';
C = filter B by query_term is null;
store C into 'filtered_by_query';

Will be executed as:

map-only.png

The same works in the reducer. The script:

A = load '/user/pig/tests/data/pigmix/page_views'
    as (user, action, timespent, query_term, ip_addr, timestamp,
        estimated_revenue, page_info, page_links);
B = group A by user;
C = foreach B generate group, MIN(A.timespent);
D = foreach B generate group, MAX(A.timespent);
store C into 'min_timespent';
store D into 'max_timespent';

Will be executed as:

reduce-only.png

Map-reduce Splittees

If a split happens in the map plan and one of the splittees is a map-(combine)-reduce job, the map plan will be a combined plan of all the splittee and splitter map plans, and the reduce plan will be the one of the map-(combine)-reduce job.

The script:

A = load '/user/pig/tests/data/pigmix/page_views'
    as (user, action, timespent, query_term, ip_addr, timestamp,
        estimated_revenue, page_info, page_links);
B = filter A by user is not null;
store B into 'filtered_user';
C = group B by action;
D = foreach C generate B.action, COUNT(B);
store D into 'count';

Will be executed as:

map-mapreduce.png

In a similar way, if multiple splittees are map-(combine)-reduce jobs the combine and reduce plans are also merged.

The script:

A = load '/user/pig/tests/data/pigmix/page_views'
    as (user, action, timespent, query_term, ip_addr, timestamp,
        estimated_revenue, page_info, page_links);
B = group A by user;
C = foreach B generate group, MAX(A.estimated_revenue);
store C into 'highest_values';
D = group A by query_term;
E = foreach D generate group, SUM(A.timespent);
store E into 'total_time';

Will be executed as:

mapreduce-mapreduce.png

If a split happens in a reduce plan, splittees have to be map-only jobs to be merged into the splitter. If there are map-reduce splittees the reduce will result in a tmp store and the splittees are run in separate jobs.
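
For illustration, a sketch of such a case (fields and paths are made up): the split on C happens in the reduce plan of the group-by-user job, and both branches need their own reduce phase, so C would be stored to a temporary file and the two splittees would run as separate jobs.

A = load 'page_views' as (user, timespent);
B = group A by user;
C = foreach B generate group as user, SUM(A.timespent) as total;
D = group C by total;
E = group C by user;
store D into 'grouped_by_total';
store E into 'grouped_by_user';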

Multi-input splittees

Splittees that have multiple inputs (such as produced by cogroup or join) are not merged into the splitter. In this case the splitter will write the split input to a temporary file and the splittee will be started in a different job.
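
For illustration, a sketch of such a case (fields and paths are made up): the join below is a splittee with two inputs, so it is not merged; the splitter writes B to a temporary file and the join runs as a separate job.

A = load 'page_views' as (user, query_term);
B = filter A by user is not null;
C = load 'users' as (user, age);
D = join B by user, C by user;
store B into 'filtered';
store D into 'joined';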

Store-load sequences

If a script stores to and loads from the same file, some special processing takes place to ensure that the jobs are executed in the right sequence.

Reversible LoadStoreFunc

If the store and load are processed using the same function and the LoadStoreFunc is reversible, the store is processed, but the load is removed from the plan. Instead the parent of the store is used as input for the dependent processing nodes.

The script:

A = load 'page_views';
store A into 'tmp1' using PigStorage();
B = load 'tmp1' using PigStorage();
C = filter B by $0 is not null;
store C into 'tmp2';

Will result in the following logical plan:

load-store-rev.png

If, on the other hand, different load and store functions are used, or the function is not reversible, the store and load will be connected in the logical plan and will eventually result in 2 jobs running in sequence.

The script:

A = load 'page_views';
store A into 'tmp1' using PigStorage();
B = load 'tmp1' using BinStorage();
C = filter B by $0 is not null;
store C into 'tmp2';

Will result in the following logical plan:

load-store-non.png

File commands

Commands like rm, rmf, mv, copyToLocal and copy will trigger execution of all the stores that were defined before the command. This is done to make sure that the targets of these commands exist by the time the command runs.

For instance:

A = load 'foo';
store A into 'bar';
mv bar baz;
rm foo;
A = load 'baz';
store A into 'foo';

Will result in a job that produces bar, then the mv and rm are executed. Finally, another job is run that will generate foo.

Internal Changes

GruntParser/PigServer

The parser currently uses a bottom-up approach: when it sees a store (dump, explain), it goes bottom up and generates the plan that needs to happen for this particular store. In order to optimize the multi-query case we need, however, a view of the entire graph of a script (interactive mode is handled differently).

The high-level changes to the parser/server are:

The new "run" command will simply feed all the lines of a script through the interactive mode.

The PigServer has a new interface:

Internally all different batches and the interactive mode are represented as Graph objects. A graph object maintains a cache of the registered queries, keeps track of the logical plan and the processed/unprocessed stores. Graphs themselves can be interactive or batch. Interactive graphs will execute on store, batch graphs won't.

The PigServer maintains a stack of these graph objects so that setBatchOn/discardBatch operations basically become push and pop operations.

If the multi-query optimization is turned off all graphs will be generated as interactive, which is how we revert the behavior.

The merging of the different logical plans is done in the OperatorPlan, where merge operations can now either check for disjoint graphs or merge them with overlaps. Merged plans are later passed on to the implicit split inserter at which point all the overlapping operators from the merge will result in an explicit split.

Finally, the store-load handling is done in the pig server. It will either transform the plan or add a store-load connection. Absolute filenames will be available, since the QueryParser now translates them when it sees them.

The grunt parser makes use of the new PigServer APIs. It will use the batch API to parse and execute scripts. Since scripts can be nested, the script execution and the stack of graphs in the PigServer are closely related.

Explain

As described above the changes are:

The explain command already supports writing to PrintStreams, so adding the capability of writing to files was fairly straightforward.

Parameters for the desired output format as well as the verbosity were added to the explain calls in the execution engine, logical and physical plan.

The dot output is realized by the DotPlanDumper. Since dot does not care about any specific order, we simply iterate over the nodes and edges and dump them. Nested plans are realized as subgraphs, which will cause a recursion in the DotPlanDumper. Printing a map-reduce plan will first start with the MROperator plan and then recurse into the physical plans.

Invisible nodes and edges are used to force the right layout of subgraphs. There is one invisible input and one invisible output per subgraph, connected (via invisible edges) to the roots and leaves of the nested plan respectively. This way a top-to-bottom layout of the nested graph is ensured.

CoGroups are special in that their nested plans are connected to input operators. This is modeled as subgraphs as well, and the subgraphs are connected to the respective input operators.

Map Reduce Engine

Let's first look at how a multi-query is processed in the hadoop case. We have already discussed how the logical plan is generated, driven by the PigServer and GruntParser. The next step is the translation of the logical plan to the physical plan. This has not changed, except that in the multi-query case it has to carry connections from store to load operators through to the physical plan.

MRCompiler

After that the MRCompiler takes the physical plan and breaks it into a plan of map-reduce operators. The handling of split operators has not changed. What happens is:

When a split is encountered, it is removed from the plan and replaced by a store to a temporary path with BinStorage. The split branches are placed in new map-reduce operators that load from the temporary path.
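
Conceptually (written here as Pig Latin rather than as physical operators), the implicit split from the earlier filter example is first rewritten into something like the following, where '<tmp>' stands for the generated temporary path; this is the shape that the MultiQueryOptimizer later merges back together:

A = load 'foo';
store A into '<tmp>' using BinStorage();   -- the splitter stores the split input
B1 = load '<tmp>' using BinStorage();      -- splittee 1 reloads it
B = filter B1 by $0 is not null;
store B into 'bar';
C1 = load '<tmp>' using BinStorage();      -- splittee 2 reloads it
C = filter C1 by $0 is null;
store C into 'baz';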

Some minor changes to the MRCompiler are:

MultiQueryOptimizer

After the compiler the optimizers are run. The last one in the chain is the MultiQueryOptimizer. This optimizer looks at all splitters and splittees and merges them recursively, producing what was described above in Execution Plans. Since this step is run after the combiner optimizer, it can also go ahead and merge combiners if necessary. If the optimizer cannot merge, it will keep the existing store that the split produced in the MRCompiler.

The merging of splittees into a splitter consists of:

Note: as an end result, this merging produces Split or Demux operators with multiple stores tucked away in their nested plans.

There are two more minor optimizers:

MapReduceLauncher

The map reduce launcher receives the map-reduce operator plan from the compilation stage. It uses the JobControlCompiler to compile map-reduce operators into hadoop job control objects and submits them to hadoop for execution.

The biggest change stems from the fact that with the current hadoop system we need to store multiple outputs from a single job in a temp directory first and then move them to their real locations. Map-reduce operators that are connected in the graph mean that the successor needs to read files that the parent produces. In order to do that we need to submit the graph in several steps, and after each one we need to move the files to their final locations. So the overall job submission works like this:

JobControlCompiler

The JobControlCompiler distinguishes between multi-store and single-store map-reduce operators. Single-store operators are processed the same way as before: the store is removed and the job configuration is set up to put the records that come out of the pipeline into the right files.

Multi-store plans are handled differently. The compiler sets the output path to a newly created temporary directory. It also leaves the store operators in the plan; at execution time these operators create subdirectories of the temporary path and direct records to them.

Let's assume we have 2 stores in the plan to directories "foo" and "/out/bar". After running the job one gets the following temporary directory structure:

/tmp/temp-<number>/abs/out/bar/part-*
/tmp/temp-<number>/rel/foo/part-*

The JobControlCompiler will then as a final step move the result files to their final locations.
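
Continuing the example, after this move the part files end up under the stores' real locations, roughly as follows (assuming "foo" resolves relative to the directory that was current at the time of the store):

/out/bar/part-*
<working directory>/foo/part-*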

Store Operator

Finally, after the MapReduceLauncher has submitted the jobs execution starts on the backend.

The store operator was changed to deal with multiple outputs from a single map-reduce job, as well as to enable the local engine to handle multiple stores in the same plan.

The operator itself has a store implementation that is instantiated by the execution engine that handles the output. Store has a "getNext" function that stores the input record and returns a null record. That basically means that stores can be placed into an execution pipeline as an operator with a side-effect that will consume all input records.

The local engine's implementation of the store is straightforward. It creates the StoreFunc and binds the output stream to it.

The map reduce engine's implementation of the store needs to set up a few more things. It creates a copy of the job configuration and overwrites some key fields. It changes the output and temporary work output directory to point to a temporary path specific to the store being processed (see JobControlCompiler). It also sets PigOutputFormat as the output class as well as registering the actual store function that PigOutputFormat is going to use. Finally it creates a new record writer and output collector with these settings.

Split operator

The split operator can now be used at execution time too. It used to always be removed by the compiler (and replaced by store-load combinations). So, here's how the split operator works:

Demux Operator

The demux operator is used in combiners and reducers where the input is a mix of records from the different split plans of the mapper. The outputs of the split plans are indexed and, based on the index, the demux operator decides which of its nested plans a record belongs to and then attaches it to that particular plan.

More precisely, these are the steps to merge a map-reduce splittee into the splitter:

  1. Add the map plan of the splittee to the inner plan list of the split operator.
  2. Set the index on the leaf operator of the map plan based on the position of this map plan in the inner plan list.
  3. Add the reduce plan of the splittee to the inner plan list of the demux operator in the same order as the corresponding map plan.
  4. The outputs of the merged map plan of the splitter are indexed key/value pairs and are sent to the reduce tasks.
  5. The demux operator extracts the index from the key/values it receives and uses it to find the corresponding reduce plan in its inner plan list.
  6. The chosen reduce plan consumes the key/value data.

Partition Scheme

What is the parallelism (the number of reduce tasks requested) of the merged splitter job? How do we partition the keys of the merged inner plans?

After considering several partition schemes, we settled on this one:

This scheme has advantages:

To avoid key collisions between different inner plans with this scheme, the PigNullableWritable class is modified to take the indexes into account when two keys are compared (or hashed).

Local Execution Engine

The local engine has not changed as much as the map reduce engine. The local engine executes the physical plan directly. The main changes were:

The local engine will thus execute the physical plan as follows:

Performance Results

We have run the PigMix suite on a Hadoop cluster to compare this enhancement to the code in the trunk. Here are the results (average over 3 runs):

Test Case | Prior to Multiquery | MultiQuery
L1        | 4min07sec           | 4min10sec
L2        | 1min46sec           | 1min54sec
L3        | 5min12sec           | 4min40sec
L4        | 2min39sec           | 2min36sec
L5        | 2min15sec           | 2min14sec
L6        | 2min27sec           | 2min28sec
L7        | 2min14sec           | 2min06sec
L8        | 2min07sec           | 2min01sec
L9        | 8min58sec           | 8min42sec
L10       | 9min10sec           | 9min15sec
L11       | 3min23sec           | 3min28sec
L12       | 10min14sec          | 2min47sec

L12 is the only multi-query script in the suite and we see a roughly 70% performance improvement. For the non-multi-query scripts, the performance is unchanged.

We also ran several multi-query tests on a 40-node Hadoop cluster against a text file with 200 million rows (each row had 3 columns). All tests used 40 reducers. These tests compared the performance of the trunk (released version) with the multiquery branch (latest version). Again, there were significant performance gains in the cases where either all splittees had combiners (Test Cases 1 & 2) or no splittee had a combiner (Test Cases 3 & 4). There were also significant performance gains where some splittees were map-only (Test Case 5). The test results (average over 3 runs) are listed below.

Test Case | Prior to Multiquery | MultiQuery | Gain | Description
1         | 311sec              | 172sec     | 44%  | Merge 3 McR splittees into splitter and all splittees have combiners
2         | 523sec              | 247sec     | 52%  | Merge 5 McR splittees into splitter and all splittees have combiners
3         | 549sec              | 264sec     | 51%  | Merge 3 MR splittees into splitter and no splittee has combiner
4         | 725sec              | 516sec     | 28%  | Merge 5 MR splittees into splitter and no splittee has combiner
5         | 203sec              | 114sec     | 43%  | Merge 3 MR splittees into splitter and 2 splittees are map-only
