Turing Complete Pig

This document lays out the high level design for adding control flow and modularity constructs to Pig.

Introduction

Pig Latin is a data flow language. As such, it does not offer the control flow and modularity features found in general purpose programming languages, including functions, modules, loops, and branches. Because it is a data flow language, adding these constructs is neither straightforward nor reasonable. However, users do want the separation and code sharing offered by functions and modules, as well as the control flow offered by functions, loops, and branches. This document proposes a way to accomplish these goals while preserving Pig Latin's data flow orientation.

The Proposal

Approach

Since direct integration of data flow and control flow is neither reasonable nor desirable, a heuristic is needed to productively combine the two. One approach that is being used in other projects is to view the control flow as building pipelines of data flow. This is used in Cascading and FlumeJava.

Given this approach, it makes sense to embed Pig Latin in an existing scripting language. A scripting language has several advantages over a compiled language such as Java:

Compared to an approach that integrates control flow and data flow, such as PL/SQL, embedding in an existing scripting language is a much lower development and maintenance effort. It will also be much easier for users, who will be able to use existing development tools (IDEs, debuggers, etc.) to work with their scripts.

(Note however, that if a given user prefers Java, there is nothing to prevent them from using the proposed interface with Java.)

We propose to solve these issues with two significant changes to Pig.

Add Macro Expansion To Pig Latin

Part one of this proposal is to add macro expansion to Pig Latin. The existing DEFINE keyword will be expanded to allow definitions of Pig macros. A new INLINE keyword will be added that inlines a macro at a particular point. A new IMPORT keyword will also be added to allow macros from other files to be inlined as well. See below for full details.

define bot_cleanser (X, user) returns Y {
    $Y = filter $X by not is_a_bot($user);
}

A = load 'fact';
B = bot_cleanser (A, 'username');
C = group B by user;
...
store Z into 'processed';

Use Scripting Language For Control Flow

The second part of the proposal is to embed Pig Latin scripts in the host scripting language via a JDBC like compile, bind, run model. An example script would look like:

p = Pig.compile("""A = load '$in';
                   C = group A by user;
                   D = foreach C generate group, myUDF(A);
                   store D into '$out';
                   F = group D all;
                   G = foreach F generate MAX(D);
                   store G into 'tmp';
                """)
error = 100.0
input = "original"
output = "output-0"
final = "final-output"

for i in range(1, 100):
    r = p.bind({'in':input, 'out':output}) # attaches $in, $out in Pig Latin to input, output Python variables
    results = r.run()
    
    if results.getStatus("D") == "FAILED" or results.getStatus("G") == "FAILED":
        raise Exception("Pig job failed")
    iter = results.getResults("G")
    if iter.next() > 1:
        input = output
        output = "output-" + str(i)
    else:
        H = Pig.fs("mv " + output + " " + final)
        break

Architecture

Functional Viewpoint

Part one of this design is to add macro expansion to Pig Latin. The second part of the proposal is to embed Pig Latin scripts in host scripting languages via a JDBC like compile, bind, run model. Pig will pass the control flow script to the host language for processing. This in turn will end up producing sets of data flows which will be passed to a new Pig object.

An example script would look like:

p = Pig.compile("""A = load '$in';
                   C = group A by user;
                   D = foreach C generate group, myUDF(A);
                   store D into '$out';
                   F = group D all;
                   G = foreach F generate MAX(D);
                   store G into 'tmp';
                """)
error = 100.0
input = "original"
output = "output-0"
final = "final-output"

for i in range(1, 100):
    # attaches $in, $out in Pig Latin to input, output Python variables
    # all variables passed in must be strings or convertible to strings
    r = p.bind({'in':input, 'out':output})
    results = r.run()

    if results.getStatus("D") == "FAILED" or results.getStatus("G") == "FAILED":
        raise Exception("Pig job failed")
    iter = results.getResults("G")
    # keep going until our error value has converged to under 1
    if iter.next() > 1:
        input = output
        output = "output-" + str(i)
    else:
        H = Pig.fs("mv " + output + " " + final)
        break
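The loop above can be reduced to a small driver that is independent of Pig. The following is a minimal sketch, not part of the proposed API: `converge` and `run_once` are hypothetical names, and `run_once` stands in for the bind() and run() calls plus reading the error value from the result.

```python
def converge(run_once, first_input, max_iters=100):
    """Feed each run's output into the next run until the error is at most 1.

    run_once(source, target) is a hypothetical stand-in for bind() plus run();
    it must return the error value read from the pipeline's result.
    """
    source = first_input
    for i in range(max_iters):
        target = "output-%d" % i
        error = run_once(source, target)
        if error <= 1:
            return target, i + 1  # final output name and number of runs
        source = target
    raise RuntimeError("no convergence after %d runs" % max_iters)


# Stand-in pipeline: pretend the error halves on every run.
errors = iter([8.0, 4.0, 2.0, 1.0])
final_output, runs = converge(lambda source, target: next(errors), "original")
```

Raising when the iteration limit is reached, rather than falling out of the loop silently, makes a non-converging job visible to the caller.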

Scripting Language

The scripting language chosen must be able to execute in the JVM. This allows us to package a jar with Pig (or pull it via Ivy or Maven) and run it without the user needing to download and install any packages. The candidates are Python, Ruby, Groovy, Scala, and Clojure. (Scala and Clojure are not scripting languages, though they can act like one.)

The code will be written in such a way that users can, with a minimal amount of coding, adapt it to the language of their choice. However, the Pig team will pick one language to support as a default and reference implementation.

JRuby would perhaps be a good choice, except that it is under the GPL. If I understand the license correctly, packaging it as part of Pig would mean that Pig would need to be under the GPL. That obviously is a non-starter. (Note, others have challenged this assertion on my part, saying that Pig itself would only become GPL if it modified and packaged JRuby. If there is sufficient interest in Ruby we may need to pursue this further.)

Scala is cool and powerful, but relatively unknown at this point. Similarly Groovy seems to have a small user community and relatively few available libraries.

Clojure is a functional language, a paradigm which seems to engender one of love, terror, or confusion. As such, it probably is not a good choice for Pig Latin. Also, Cascalog already exists for those who like Clojure.

This leaves Python, which has the advantages of a Java implementation, well established community, and extensive libraries. My impression is that Python also has a wider adoption and user base than Ruby. One downside is that Pig Latin has more Java like syntax (use of braces to open and close nested sections, semicolons to terminate lines) rather than Python like syntax. This mixing of multiple syntaxes may confuse users.

What Can Be Embedded

It will be possible to embed all of Pig's relational operators as well as REGISTER, DEFINE, set, and fs. REGISTER, DEFINE, set, and fs will also be accessible through static functions.

Grunt will not be supported in this setup. So Grunt specific commands such as mkdir, cd etc. will not be supported. We are in the process of deprecating these in standard Pig Latin anyway. The new fs commands will be supported.

In addition to accepting strings in the compile call, it will also be possible to read Pig Latin code from a file. The contents of this file must be valid Pig Latin. It is expected that this file will be present on the local machine. In the case of Oozie this can be accomplished by specifying it as a file to be included in files sent to the Task Node.

Converge.pig

A = load '$in';
C = group A by user;
D = foreach C generate group, myUDF(A);
store D into '$out';
F = group D all;
G = foreach F generate MAX(D);
store G into 'tmp';

Converge.py

p = Pig.compileFromFile("Converge.pig")
error = 100.0
input = "original"
output = "output-0"
final = "final-output"

for i in range(1, 100):
    r = p.bind({'in':input, 'out':output}) # attaches $in, $out in Pig Latin to input, output Python variables
    results = r.run()

    if results.getStatus("D") == "FAILED" or results.getStatus("G") == "FAILED":
        raise Exception("Pig job failed")
    iter = results.getResults("G")
    if iter.next() > 1:
        input = output
        output = "output-" + str(i)
    else:
        H = Pig.fs("mv " + output + " " + final)
        break

Use of Parameters to Be Bound Later

When defining a Pig object, it will be legal to leave certain values undefined, replacing them with a parameter name. For example a user may wish to define a pipeline without yet defining the location of the input to the pipeline. The parameter will be indicated by a dollar sign followed by a sequence of alpha-numeric or underscore characters. Values for these parameters must be provided later at the time bind() is called on the Pig object. To call bind without providing values for all parameters will be an error. Parameters can be used to replace any expression in the Pig Latin script. Examples of expressions include:

Note that this is more restrictive than the current parameter substitution in Pig, which allows general string replacement anywhere in the script. The reason for the additional restriction is to enable the parser to do upfront verification. If p = Pig.compile("A = $SCRIPT") is a valid call, the parser cannot do any checking.
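The parameter rules can be illustrated with a small sketch. It is only an approximation of the proposal: it does plain text substitution and does not restrict parameters to expression positions as the parser would. The helper names `find_params` and `bind` are hypothetical, and the sketch assumes parameter names start with a letter or underscore so that positional references such as $0 are left alone.

```python
import re

# '$' followed by alphanumerics or underscores. Assumption: the first
# character is not a digit, so positional references like $0 are ignored.
PARAM = re.compile(r"\$([A-Za-z_][A-Za-z0-9_]*)")


def find_params(script):
    """Return the set of parameter names used in a Pig Latin string."""
    return set(PARAM.findall(script))


def bind(script, values):
    """Substitute every parameter; fail if any parameter has no value."""
    missing = find_params(script) - set(values)
    if missing:
        raise ValueError("unbound parameters: " + ", ".join(sorted(missing)))
    return PARAM.sub(lambda m: str(values[m.group(1)]), script)


script = "A = load '$in'; B = foreach A generate $0; store B into '$out';"
bound = bind(script, {"in": "original", "out": "output-0"})
```

Checking for missing values before substituting mirrors the rule that calling bind without values for all parameters is an error.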

Parallel Execution of Similar Pipelines

A common use case is for users to run an identical pipeline across multiple input sets. For example, a user might want to run the same pipeline over data from different countries. A for loop in the host language would support this, but would result in the pipelines running serially rather than in parallel. Many users wish to run them in parallel instead. To meet this need, Pig will support binding lists of variables to the same Pig object in one bind call. Calling run on that object will then result in the script being run for each set of bound variables in parallel.

For example, say that a pipeline exists that the user wishes to run for US, UK, and Brazil, all in parallel.

p = Pig.compile("""A = load '$in';
                   B = filter A by user is not null;
                   ...
                   store Z into '$out';
                """)

# Pass a list of hashes, instead of one hash
r = p.bind([{'in':'us_raw','out':'us_processed'},
            {'in':'uk_raw','out':'uk_processed'},
            {'in':'brazil_raw','out':'brazil_processed'}])
r.run()
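The intended semantics, one run per bound set with all runs proceeding at once, can be sketched with a thread pool. This is an illustration only and says nothing about how Pig would schedule the jobs; `run_pipeline` and `run_all` are hypothetical stand-ins.

```python
from concurrent.futures import ThreadPoolExecutor


def run_pipeline(params):
    """Stand-in for running one bound pipeline; returns a status record."""
    return {"in": params["in"], "out": params["out"], "status": "COMPLETED"}


def run_all(param_sets):
    """Run one pipeline per parameter set concurrently, keeping input order."""
    workers = max(1, len(param_sets))
    with ThreadPoolExecutor(max_workers=workers) as pool:
        # map() yields results in the order of the inputs, not of completion.
        return list(pool.map(run_pipeline, param_sets))


param_sets = [{"in": "us_raw", "out": "us_processed"},
              {"in": "uk_raw", "out": "uk_processed"},
              {"in": "brazil_raw", "out": "brazil_processed"}]
results = run_all(param_sets)
```

Returning results in input order lets the caller match each result to the parameter set that produced it.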

Macro Expansion

The existing DEFINE keyword will be expanded to allow definitions of Pig macros. A new INCLUDE keyword will be added to allow macros from other files to be inlined as well. See below for full details.

define bot_cleanser (X, user) returns Y {
    $Y = filter $X by not is_a_bot($user);
}

A = load 'fact';
B = bot_cleanser (A, 'username');
C = group B by user;
...
store Z into 'processed';

Data entry points into macros will be handled by specifying the list of in aliases in the parameter list. Data exit points from macros will be handled by specifying the list of out aliases in a returns clause. Calling a macro without having provided the proper number of parameters and output aliases will result in an error when the Pig script is compiled. Note that if a user does not wish to use some of the outputs from a macro he can simply not reference the assigned aliases again in his script. The Pig optimizer will handle removing those portions of the execution graph that do not result in a store or dump.

These macros will allow simple parameter substitution. This is not a generic substitution. These macro parameters can occur in the same places as the bind parameters defined above. In cases where these arguments represent field names (such as the example given above) there is no way to enforce the expected type of the column.

Thus a macro that does a join and then groups, counts, and stores on two separate keys would look like:

define join_group_split_count(A, B, a_join_key, b_join_key, first_group_key, second_group_key) returns X, Y {
    C = join $A by $a_join_key, $B by $b_join_key;
    D = group C by $first_group_key;
    $X = foreach D generate group, COUNT(C);
    E = group C by $second_group_key;
    $Y = foreach E generate group, COUNT(C);
}

alpha = load 'foo' as (user, zip, url);
beta = load 'bar' as (name, age);
(gamma, delta) = join_group_split_count(alpha, beta, user, name, zip, age);
...
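One way to picture the expansion is as a text rewrite of the macro body. The sketch below is an assumption about how expansion might work, not the planned parser implementation: `expand_macro`, the dictionary layout, the `top_users` macro, and the `macro_<name>_<id>_` prefix for local aliases are all hypothetical. It also ignores quoted strings, which a real parser would have to respect.

```python
import re

IDENT = re.compile(r"(?<![\w$])([A-Za-z_]\w*)")  # identifiers not preceded by '$'
PARAM = re.compile(r"\$([A-Za-z_]\w*)")          # $name macro parameters
ASSIGN = re.compile(r"\s*([A-Za-z_]\w*)\s*=")    # alias assigned without '$'


def expand_macro(macro, args, outputs, call_id):
    """Expand one macro call into plain Pig Latin statements."""
    if len(args) != len(macro["params"]) or len(outputs) != len(macro["returns"]):
        raise ValueError("wrong number of arguments or output aliases")
    subst = dict(zip(macro["params"], args))
    subst.update(zip(macro["returns"], outputs))
    prefix = "macro_%s_%d_" % (macro["name"], call_id)
    # Aliases assigned inside the body without '$' are local to the macro.
    local = set()
    for line in macro["body"]:
        m = ASSIGN.match(line)
        if m:
            local.add(m.group(1))
    expanded = []
    for line in macro["body"]:
        # Rename locals first so caller aliases substituted later are untouched.
        line = IDENT.sub(
            lambda m: prefix + m.group(1) if m.group(1) in local else m.group(1),
            line)
        line = PARAM.sub(lambda m: subst.get(m.group(1), m.group(0)), line)
        expanded.append(line)
    return expanded


macro = {"name": "top_users", "params": ["X", "key"], "returns": ["Y"],
         "body": ["G = group $X by $key;",
                  "$Y = foreach G generate group, COUNT($X);"]}
expanded = expand_macro(macro, ["A", "user"], ["B"], 1)
```

Renaming local aliases per call keeps two expansions of the same macro from colliding, which is also why an original id needs to be recorded for error messages.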

The new INCLUDE keyword can be used to include files in Pig Latin:

bot_filter.pig

define bot_cleanser (X, user) returns Y {
    $Y = filter $X by not is_a_bot($user);
}

main.pig

include bot_filter.pig;

A = load 'fact';
B = bot_cleanser(A, 'username');
C = group B by user;
...
store Z into 'processed';

In the embedded case, the INCLUDE keyword can also refer to named Pig objects that are compiled elsewhere in the host language.

bot_cleanser = Pig.compile("bc", """define bot_cleanser(X, user) returns Y {
                                      $Y = filter $X by not is_a_bot($user);
                                  }
                               """)

main = Pig.compile("""include bc;
                      A = load 'fact';
                      B = bot_cleanser(A, 'username');
                      C = group B by user;
                      ...
                      store Z into 'processed';
                   """)

Named modules in the host language will take precedence over filenames when including. So if there is an include x statement in an embedded pipeline and a separate Pig object named x and a file in the current working directory named x, the Pig object will be used and not the file. If the file is desired, it can be referred to by the full path or by ./x.
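The precedence rule can be sketched as a small lookup. `resolve_include` is a hypothetical helper, and the dictionary of named objects stands in for whatever registry the Pig class would keep.

```python
import os
import tempfile


def resolve_include(name, named_objects, cwd="."):
    """Return ('object', source) or ('file', path) for an include target."""
    # A bare name that matches a named Pig object wins over a file.
    if name in named_objects:
        return ("object", named_objects[name])
    # os.path.join leaves an absolute path untouched and resolves ./x under cwd.
    path = os.path.join(cwd, name)
    if os.path.isfile(path):
        return ("file", path)
    raise IOError("cannot resolve include: " + name)


# Demonstration: an object and a file are both named x.
with tempfile.TemporaryDirectory() as workdir:
    with open(os.path.join(workdir, "x"), "w") as handle:
        handle.write("define noop(X) returns Y { $Y = foreach $X generate *; }")
    objects = {"x": "define bot_cleanser(X, user) returns Y { ... }"}
    by_name = resolve_include("x", objects, cwd=workdir)
    by_path = resolve_include("./x", objects, cwd=workdir)
```

Here `by_name` resolves to the named object, while `by_path` reaches the file because ./x is not a registered name.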

Note that no namespace mechanism will be provided. All macro names are global. All code in an included file is visible to (and potentially runnable by) the including file.

Requirements This Drives on the Parser

The Pig class will need a way to annotate the Pig Latin with notes that the new parser can then keep and place in the AST. Examples are line numbers and file names, which will be completely different from the line numbers and file names the Pig Latin parser will see (since it will only see sections of the constructed pipelines, not the script as a whole). It is convenient to do this in comment syntax so that we do not introduce more tokens or keywords into Pig Latin. A comment of the form

/*@@@ key=value */

will be taken to be an annotation. Key here will be one of a set of predefined keys that the parser will understand how to use to annotate a node in the parse tree.

Key          Meaning
linenumber   Original line number in the source file that this line occurred on.
filename     Original file this line occurred in.
origid       Original id assigned to on this line (used because ids are rewritten during macro expansion).
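Recognizing these annotations amounts to matching a specially marked comment. The sketch below shows one possible pattern; the regular expression and the helper `parse_annotations` are assumptions, since the exact grammar is left to the parser.

```python
import re

# Matches /*@@@ key=value */ and captures the key and the value.
ANNOTATION = re.compile(r"/\*@@@\s*(\w+)\s*=\s*(.*?)\s*\*/")
KNOWN_KEYS = {"linenumber", "filename", "origid"}


def parse_annotations(line):
    """Return a dict of the recognized annotations found in one line."""
    found = {}
    for key, value in ANNOTATION.findall(line):
        if key in KNOWN_KEYS:  # unknown keys are ignored like ordinary comments
            found[key] = value
    return found


line = "A = load 'fact'; /*@@@ linenumber=12 */ /*@@@ filename=main.py */ /* note */"
notes = parse_annotations(line)
```

Ordinary comments such as the trailing `/* note */` do not carry the `@@@` marker and are therefore left alone.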

The parser will need to change to have a parse-only mode, where it does only parsing and some very basic semantic checks, to support upfront error checking on the embedded Pig Latin. In this upfront mode the parser will also have to tolerate certain constructs that would not be valid Pig Latin, such as the presence of unresolved parameters in the code.

Finally, the parser will need to change to support the new macro expansion feature. This will require some integration with the parameter substitution functionality, as well as keeping a symbol table of macros that have been seen.

Automatic Registration of Control Flow Functions

As of Pig 0.8 it is possible to write Pig UDFs in scripting languages that can be compiled to the JVM. Currently, this integration only exists for Jython. With relatively little work it could be added for other languages. For this to work currently the Jython functions must be located in separate .py files. As part of the embedding work Pig will automatically discover and register control flow functions located in the control flow script. For example:

def square(num):
  return ((num)*(num))

p = Pig.compile("""A = load 'foo';
                   B = foreach A generate square($0);""")
...
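Discovery of candidate functions could work by inspecting the script's namespace. The sketch below is one possible approach under that assumption; `discover_functions` is a hypothetical helper, and actual registration with Pig is not shown.

```python
import types


def discover_functions(namespace):
    """Return the names of plain Python functions defined in a namespace dict."""
    return sorted(name for name, obj in namespace.items()
                  if isinstance(obj, types.FunctionType)
                  and not name.startswith("_"))


def square(num):
    return num * num


# A namespace as a control flow script might see it: one user function,
# one plain variable, and one builtin that should not be registered.
found = discover_functions({"square": square, "error": 100.0, "len": len})
```

Filtering on `types.FunctionType` skips builtins and plain values, so only functions written in the script itself are offered for registration.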

Dependency Management

Since the control flow will be running on the same machine as the Pig script, it will be the responsibility of the user to ensure that any modules required by the control flow script are present on that machine. In the case of Oozie, where the control flow will run on a Task Node, users will need to include any modules in the list of files Oozie needs to transport as part of the task.

Changes to Main and PigRunner

Pig's main function will now need to check whether the script it has been handed is data flow only (that is, a Pig Latin file) or control flow. Embedded Pig Latin will only be supported in file mode, not interactive mode, so in any case except being invoked with a script file, main will know to parse the input as simple Pig Latin, as it does today.

A new embedded command line option will be added to Pig to indicate that this is an embedded script. The user can give an argument to embedded to indicate the language of the control flow script. Initially the only valid options will be j|python. If embedded is used without an argument, then Jython will be used. In cases where main is provided with a script and embedded is not included in the command line, it will look for a #! line at the beginning of the file. If the basename portion of the path matches python or jython, it will use the Jython implementation.

If main determines that the script given to it is a control flow script, it will need to invoke the appropriate scripting language. If possible, we plan to use the javax.script interface for this. If this proves infeasible, we will design a simple interface to invoke the underlying scripting engine. Using javax.script will have the significant benefit that developers will not need to write integration code for new control flow languages beyond that needed by the new Pig.bind() call (detailed below).
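The selection logic described above can be sketched as a single function. The name `choose_engine` and its arguments are hypothetical, and the sketch reads the #! rule literally: only the basename of the interpreter path is compared, so a line such as `#!/usr/bin/env python` would not match.

```python
import posixpath

# Languages accepted initially, mapped to the implementation used for them.
SUPPORTED = {"python": "jython", "jython": "jython"}


def choose_engine(first_line, embedded_given=False, embedded_lang=None):
    """Return the engine name for a control flow script, or None for Pig Latin."""
    if embedded_given:
        lang = embedded_lang or "jython"  # no argument: default to Jython
        if lang not in SUPPORTED:
            raise ValueError("unsupported control flow language: " + lang)
        return SUPPORTED[lang]
    if first_line.startswith("#!"):
        parts = first_line[2:].split()
        if parts:
            return SUPPORTED.get(posixpath.basename(parts[0]))
    return None


from_shebang = choose_engine("#!/usr/bin/python")
from_option = choose_engine("A = load 'x';", embedded_given=True)
plain_pig = choose_engine("A = load 'x';")
```

Returning None for anything unrecognized keeps today's behavior as the default: the file is parsed as simple Pig Latin.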

The PigRunner class will not change. Oozie will use this as before to invoke Pig. The PigStats class returned by PigRunner.run will change slightly. See the PigStats interface below for details.

Use Cases

Convergence

There is a whole class of problems that involve iterating over a pipeline an indeterminate number of times. Examples include running until values or errors converge and graph traversal algorithms. The while and for loops of the host language, along with the ability to test the output of a pipeline and either run it again or move to the next step, will enable users to satisfy this use case. For example:

p = Pig.compile("""A = load '$in';
                   C = group A by user;
                   D = foreach C generate group, myUDF(A);
                   store D into '$out';
                   F = group D all;
                   G = foreach F generate MAX(D);
                   store G into 'tmp';
                """)
error = 100.0
input = "original"
output = "output-0"
final = "final-output"

for i in range(1, 100):
    r = p.bind({'in':input, 'out':output}) # attaches $in, $out in Pig Latin to input, output Python variables
    results = r.run()

    if results.getStatus("D") == "FAILED" or results.getStatus("G") == "FAILED":
        raise Exception("Pig job failed")
    iter = results.getResults("G")
    if iter.next() > 1:
        input = output
        output = "output-" + str(i)
    else:
        H = Pig.fs("mv " + output + " " + final)
        break

Inlining Macros

Users often decompose their Pig Latin into modules and then wish to invoke scripts from these modules. Today this is being done via the exec command. The new inline functionality will make this easier and allow argument passing. It will also allow these modules to be called without the need to store and load data in between, as is currently required by exec. Macros will be allowed to inline other macros. However, forward declarations will not be supported. That is, if macro a references macro b, then b will have to be defined before a. The parser will keep track of the stack of macros as it inlines in order to avoid circular references.

Note that macro inlining is not a complete replacement for function calls, as it does not support recursive calls. However, we found no examples where recursive function calls were necessary. An example of this use case is:

foo.pig

define foo(input) returns output {
    ...
}

bar.pig

define bar(input) returns output {
    ...
}

main.pig

include foo.pig;
include bar.pig;

A = load 'raw';
B = foo(A);
C = bar(B);
C = ...
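The stack the parser keeps while inlining can be sketched as a depth-first walk that refuses to enter a macro already being expanded. This models only the circular reference check, not definition order; the function `inline` and the dictionary of macro names to the macros they call are hypothetical.

```python
def inline(name, macros, stack=None):
    """Return the macros inlined for `name`, depth first, rejecting cycles."""
    stack = stack or []
    if name in stack:
        raise ValueError("circular macro reference: " + " -> ".join(stack + [name]))
    if name not in macros:
        raise KeyError("undefined macro: " + name)
    order = [name]
    for callee in macros[name]:
        # Pass a new list so sibling calls do not see each other's stack.
        order += inline(callee, macros, stack + [name])
    return order


# foo calls bar, and bar calls nothing.
order = inline("foo", {"bar": [], "foo": ["bar"]})
```

Because the stack holds only the macros currently being expanded, the same macro may still be inlined twice from different places without being reported as a cycle.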

Automated Pig Latin Generation

A number of user frameworks have sprung up that do automated generation of Pig Latin. Usually this is used to do things such as set the proper date for the data to be loaded, etc. An example of this use case is:

import sys

today = sys.argv[1]
p = Pig.compile("""A = load 'fact' using HowlLoader();
                   B = filter A by datestamp == '$date';
                   ...
                   store Z into 'aggregated' using HowlStorage('datestamp = $date');
                """)
r = p.bind({'date':today}).run()

Conditional Compilation

A sub-use case of automated generation is conditional code generation. For example, different processing might be required based on whether today is a weekday or a weekend day:

str = "A = load 'input';"
if today.isWeekday():
    str = str + "B = filter A by weekday_filter(*);"
else:
    str = str + "B = filter A by weekend_filter(*);"
str = str + "C = group B by user;"

Parallel Execution

Another sub-use case of automated generation is parallel execution of identical pipelines. Often users have a single pipeline that they would like to run multiple data sets through in parallel. Using the bind option with a list of maps of variables satisfies this use case. For example, the following will run the pipeline for the US, the UK, and Brazil:

p = Pig.compile("""A = load '$in';
                   B = filter A by user is not null;
                   ...
                   store Z into '$out';
                """)

r = p.bind([{'in':'us_raw','out':'us_processed'},
            {'in':'uk_raw','out':'uk_processed'},
            {'in':'brazil_raw','out':'brazil_processed'}])
r.run()

Key Interfaces

API for Pig Class

The public interface for this functionality will be a new Java Pig class. The class will be abstract, since implementations of the no-argument bind() will need to differ depending on the target language. A default implementation will be provided that works for any language that can make use of Java objects; it will not implement the no-argument bind(). The Main class will contain a map from control flow languages to the subclass of Pig to use for each language. See above for a description of how Main will choose which control flow language is being used.

Pig.java

public abstract class Pig {

    /**
     * Run a filesystem command.  Any output from this command is written to
     * stdout or stderr as appropriate.
     * @param cmd Filesystem command to run along with its arguments as one
     * string.
     * @throws IOException
     */
    public static void fs(String cmd) throws IOException {...}

    /**
     * Register a jar for use in Pig.  Once this is done this jar will be
     * registered for <b>all subsequent</b> Pig pipelines in this script.  If you wish to
     * register it for only a single Pig pipeline, use register within that
     * definition.
     * @param jarfile Path of jar to include.
     * @throws IOException if the indicated jarfile cannot be found.
     */
    public static void register(String jarfile) throws IOException {...}

    /**
     * Define an alias for a UDF or a streaming command.  This definition
     * will then be present for <b>all subsequent</b> Pig pipelines defined in this
     * script.  If you wish to define it for only a single Pig pipeline, use
     * define within that definition.
     * @param alias name of the defined alias
     * @param definition string this alias is defined as
     */
    public static void define(String alias, String definition) {...}

    /**
     * Set a variable for use in Pig Latin.  This set
     * will then be present for <b>all subsequent</b> Pig pipelines defined in this
     * script.  If you wish to set it for only a single Pig pipeline, use
     * set within that definition.
     * @param var variable to set
     * @param value to set it to
     */
    public static void set(String var, String value) {...}

    /**
     * Define a Pig pipeline.
     * @param pl Pig Latin definition of the pipeline.
     * @return Pig object representing this pipeline.
     * @throws IOException if the Pig Latin does not compile.
     */
    public static Pig compile(String pl) throws IOException {...}

    /**
     * Define a named portion of a Pig pipeline.  This allows it
     * to be included into another pipeline.
     * @param name Name that will be used to define this pipeline.
     * The namespace is global.
     * @param pl Pig Latin definition of the pipeline.
     * @return Pig object representing this pipeline.
     * @throws IOException if the Pig Latin does not compile.
     */
    public static Pig compile(String name, String pl) throws IOException {...}

    /**
     * Define a Pig pipeline based on Pig Latin in a separate file.
     * @param filename File to read Pig Latin from.  This must be a
     * purely Pig Latin file.  It cannot contain host language constructs
     * in it.
     * @return Pig object representing this pipeline.
     * @throws IOException if the Pig Latin does not compile or the file
     * cannot be found.
     */
    public static Pig compileFromFile(String filename) throws IOException {...}

    /**
     * Define a named Pig pipeline based on Pig Latin in a separate file.
     * This allows it to be included into another pipeline.
     * @param name Name that will be used to define this pipeline.
     * The namespace is global.
     * @param filename File to read Pig Latin from.  This must be a
     * purely Pig Latin file.  It cannot contain host language constructs
     * in it.
     * @return Pig object representing this pipeline.
     * @throws IOException if the Pig Latin does not compile or the file
     * cannot be found.
     */
    public static Pig compileFromFile(String name,
                                      String filename) throws IOException {...}

    /**
     * Bind a Pig object to variables in the host language (optional
     * operation).  This does an implicit mapping of variables in the host
     * language to parameters in Pig Latin.  For example, if the user
     * provides a Pig Latin statement
     * <tt> p = Pig.compile("A = load '$input';");</tt>
     * and then calls this function it will look for a variable called
     * <tt>input</tt> in the host language.  Scoping rules of the host
     * language will be followed in selecting which variable to bind.  The
     * variable bound must contain a string value.  This method is optional
     * because not all host languages may support searching for in scope
     * variables.
     * @return an object that can be run.
     * @throws IOException if host language variables are not found to resolve all
     * Pig Latin parameters or if they contain unsupported types.
     */
    public abstract BoundScript bind() throws IOException;

    /**
     * Bind a Pig object to a set of variables.  This must be called before the
     * run if there are any variables in the Pig object.  Values must be provided
     * for all Pig Latin parameters.
     * @param vars map of variables to bind.  Keys should be parameters defined
     * in the Pig Latin.  Values should be strings that provide values for those
     * parameters.  They can be either constants or variables from the host
     * language.  Host language variables must contain strings.
     * @return an object that can be run.
     * @throws IOException if there is not a key for each
     * Pig Latin parameter or if they contain unsupported types.
     */
    public BoundScript bind(Map<String, String> vars) throws IOException  {...}

    /**
     * Bind a Pig object to multiple sets of variables.  This will
     * cause the Pig Latin script to be executed in parallel over these sets of
     * variables.
     * @param vars list of maps of variables to bind.  Keys should be parameters defined
     * in the Pig Latin.  Values should be strings that provide values for those
     * variables.  They can be either constants or variables from the host
     * language.  Host language variables must be strings.
     * @return an object that can be run.
     * @throws IOException  if there is not a key for each
     * Pig Latin parameter or if they contain unsupported types.
     */
    public BoundScript bind(List<Map<String, String>> vars) throws IOException  {...}

}

public class BoundScript {

    /**
     * Run a pipeline on Hadoop.  If there are no stores in this pipeline then nothing
     * will be run.  Bind must be called before this method.
     * @return a list of PigResult, one for each map of variables passed
     * to bind.
     */
    public List<PigResult> run() {
        return run((Properties)null);
    }

    /**
     * Run a pipeline on Hadoop.  If there are no stores in this pipeline then nothing
     * will be run.  Bind must be called before this method.
     * @param prop Properties that Pig should set when running the script.
     * @return a list of PigResults, one for each map of variables passed
     * to bind.
     */
    public List<PigResult> run(Properties prop) {...}

    /**
     * Run a pipeline on Hadoop.  If there are no stores in this pipeline then nothing
     * will be run.  Bind must be called before this method.
     * @param prop Map of properties that Pig should set when running the script.
     * This is intended for use with scripting languages that do not support
     * the Properties object.
     * @return a list of PigResults, one for each map of variables passed
     * to bind.
     */
    public List<PigResult> run(Map<String, String> prop) {...}

    /**
     * Run a pipeline on Hadoop.  If there are no stores in this pipeline then nothing
     * will be run.  Bind must be called before this method.
     * @param propfile File with properties that Pig should set when running the script.
     * @return a list of PigResults, one for each map of variables passed
     * to bind.
     */
    public List<PigResult> run(String propfile) {...}

    /**
     * Run illustrate for a Pig pipeline.  Results will be printed
     * to stdout.  If a list of variables have been bound to this Pig object,
     * only one will be used for this illustrate.  Bind must be called before
     * this method.
     * @throws IOException if illustrate fails.
     */
    public void illustrate() throws IOException {...}

    /**
     * Explain a Pig pipeline.  Results will be printed to stdout.
     * If a list of variables have been bound to this Pig object,
     * only one will be used for this explain.  Bind must be called before this
     * method.
     * @throws IOException if explain fails.
     */
    public void explain() throws IOException {...}

    /**
     * Describe the schema of an alias in a Pig pipeline.
     * Results will be printed to stdout.
     * If a list of variables have been bound to this Pig object,
     * only one will be used for this describe.  Bind must be called before
     * this method.
     * @param alias to be described
     * @throws IOException if describe fails.
     */
    public void describe(String alias) throws IOException {...}
}

public class PigResult {

    /**
     * Get a summary status of all stores in the Pig pipeline.
     * @return ExecJob.JOB_STATUS enum.  COMPLETED indicates that all
     * stores in the script finished successfully, FAILED indicates that
     * at least one (not necessarily all) failed.
     */
    public ExecJob.JOB_STATUS getSummaryStatus() {...}

    /**
     * Get the status of a particular store in a Pig pipeline.
     * @param alias Name of alias that was stored
     * @return ExecJob.JOB_STATUS enum indicating status of the store or
     * null if there is no such alias.
     */
    public ExecJob.JOB_STATUS getStatus(String alias) {...}

    /**
     * Get status of the first store in a Pig pipeline.  This method should
     * only be used in cases where there is a single store in the pipeline.
     * @return ExecJob.JOB_STATUS enum indicating status of the store
     */
    public ExecJob.JOB_STATUS getStatus() {...}

    /**
     * Get statistics from a particular store in a Pig pipeline.
     * @param alias Name of the alias that was stored.
     * @return PigStats object or null if there is no such alias.
     */
    public PigStats getStatistics(String alias) {...}

    /**
     * Get statistics from the first store in a Pig pipeline.  This method should
     * only be used in cases where there is a single store in the pipeline.
     * @return PigStats object
     */
    public PigStats getStatistics() {...}

    /**
     * Open an iterator to read results from a store.
     * @param alias Name of the alias that was stored.
     * @return iterator of tuples from this store or
     * null if there is no such alias.
     */
    public Iterator<Tuple> getResults(String alias) {...}

    /**
     * Open an iterator to read results from the first store.  This method should
     * only be used in cases where there is a single store in the pipeline.
     * @return iterator of tuples from the first store.
     */
    public Iterator<Tuple> getResults() {...}

}
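
The status rules described in the PigResult Javadoc above can be sketched as follows. This is a minimal illustration under stated assumptions, not Pig's implementation: the JobStatus enum is a hypothetical stand-in for ExecJob.JOB_STATUS reduced to two values, and SummaryStatusSketch, summarize and statusOf are names invented for this example.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical stand-in for ExecJob.JOB_STATUS, reduced to the two values
// that the getSummaryStatus() Javadoc mentions.
enum JobStatus { COMPLETED, FAILED }

class SummaryStatusSketch {

    // COMPLETED only if every store completed; FAILED if at least one failed.
    // Assumption: a pipeline with no stores is reported as COMPLETED.
    static JobStatus summarize(Map<String, JobStatus> statusByAlias) {
        for (JobStatus status : statusByAlias.values()) {
            if (status == JobStatus.FAILED) {
                return JobStatus.FAILED;
            }
        }
        return JobStatus.COMPLETED;
    }

    // Per-alias lookup: returns null when no such alias was stored.
    static JobStatus statusOf(Map<String, JobStatus> statusByAlias, String alias) {
        return statusByAlias.get(alias);
    }

    public static void main(String[] args) {
        Map<String, JobStatus> stores = new LinkedHashMap<>();
        stores.put("A", JobStatus.COMPLETED);
        stores.put("B", JobStatus.FAILED);
        System.out.println(summarize(stores));     // one failed store fails the summary
        System.out.println(statusOf(stores, "C")); // unknown alias
    }
}
```

A control flow script would typically check the summary first and fall back to the per-alias lookup only when it needs to know which store failed.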

Changes to PigStats

Because PigRunner.run() will now return results for both embedded and non-embedded executions of Pig, the PigStats class must change to support both cases. PigStats will become an abstract class with the following new methods:

    /**
     * Determine if this run of Pig was embedded in a control flow script.
     */
    public abstract boolean isEmbedded();

    /**
     * Get detailed information about all of the jobs run in this control flow.
     * @return null if isEmbedded() is false.  Otherwise a structure with
     * exhaustive detail of all data flows executed by the control flow.  It is
     * a map from the name of the Pig Latin script to a list of results.  The
     * name is the one (optionally) provided during the compile step
     * (see {@link Pig#compile(String, String)}).  Pig Latin scripts compiled
     * without a name will be stored under a unique ID created by Pig at runtime.
     * Each name has a list because it is possible to run the same script multiple
     * times (either as the result of a loop in the control flow or because of
     * the parallel bind option).  Each entry in the list is a PigStats object
     * with all of the details for that job run.
     */
    public abstract Map<String, List<PigStats>> getAllStats();

    /**
     * Get the error messages from all Pig Latin scripts run in this
     * control flow.
     * @return a list of error messages.
     */
    public List<String> getAllErrorMessages() {...}

Two new classes will be added that subclass PigStats: SimplePigStats and EmbeddedPigStats. PigStats as it is today will become SimplePigStats. SimplePigStats.getAllStats() will return null.
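
The shape of the value returned by getAllStats() can be sketched as follows. This is an illustration under stated assumptions rather than the proposed implementation: AllStatsSketch, keyFor and record are hypothetical names, the type parameter S stands in for PigStats, and the "unnamed-" plus UUID format for generated IDs is invented here, since the proposal only says that Pig creates a unique ID at runtime.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;

// Hypothetical collector; S stands in for PigStats.
class AllStatsSketch<S> {

    private final Map<String, List<S>> allStats = new LinkedHashMap<>();

    // Pick the map key for a compiled script: the name supplied at compile
    // time, or a generated unique ID when no name was given.
    // Assumption: the ID format below is illustrative only.
    String keyFor(String nameOrNull) {
        return nameOrNull != null ? nameOrNull : "unnamed-" + UUID.randomUUID();
    }

    // Append one run's stats under its script key.  The same script can be
    // run several times (loops, parallel bind), so each key holds a list.
    void record(String key, S stats) {
        allStats.computeIfAbsent(key, k -> new ArrayList<>()).add(stats);
    }

    Map<String, List<S>> getAllStats() {
        return allStats;
    }
}
```

Recording two runs under the same name yields a single map entry whose list has two elements, which matches the description of a script executed inside a loop.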

EmbeddedPigStats will return the following for PigStats methods:

Method                         Return
isSuccessful                   boolean AND of isSuccessful on each PigStats contained in getAllStats()
getReturnCode                  -1 if the return code of any PigStats contained in getAllStats() is negative; otherwise 1 if any is positive; otherwise 0
getErrorMessage                first error message from getAllErrorMessages()
getErrorCode                   null
getPigProperties               null
getJobGraph                    null
getOutputLocations             null
getOutputNames                 null
getNumberBytes                 sum of calling getNumberBytes() on every PigStats contained in getAllStats()
getNumberRecords               sum of calling getNumberRecords() on every PigStats contained in getAllStats()
getOutputAlias                 null
getSMMSpillCount               sum of calling getSMMSpillCount() on every PigStats contained in getAllStats()
getProactiveSpillCountObjects  sum of calling getProactiveSpillCountObjects() on every PigStats contained in getAllStats()
getProactiveSpillCountRecords  sum of calling getProactiveSpillCountRecords() on every PigStats contained in getAllStats()
getBytesWritten                sum of calling getBytesWritten() on every PigStats contained in getAllStats()
getRecordsWritten              sum of calling getRecordsWritten() on every PigStats contained in getAllStats()
getHadoopVersion               same as now
getPigVersion                  same as now
getScriptId                    null
getFeatures                    null
getDuration                    MAX(endTime of all PigStats contained in getAllStats()) - MIN(startTime of all PigStats contained in getAllStats())
getNumberJobs                  sum of calling getNumberJobs() on every PigStats contained in getAllStats()
getOutputStats                 null
getInputStats                  null
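
The aggregation rules listed above for EmbeddedPigStats can be sketched as follows. This is a minimal illustration, not the proposed implementation: RunStats is a hypothetical stand-in that carries only a few of the values a PigStats exposes, EmbeddedStatsSketch and its method bodies are written for this example, and returning 0 for the duration of an empty run list is an assumption.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for the few per-run values used below;
// the real PigStats exposes many more.
class RunStats {
    final boolean successful;
    final int returnCode;
    final long startTime;    // milliseconds
    final long endTime;      // milliseconds
    final long bytesWritten;

    RunStats(boolean successful, int returnCode,
             long startTime, long endTime, long bytesWritten) {
        this.successful = successful;
        this.returnCode = returnCode;
        this.startTime = startTime;
        this.endTime = endTime;
        this.bytesWritten = bytesWritten;
    }
}

class EmbeddedStatsSketch {

    // Flatten the name -> runs map that getAllStats() is described as returning.
    static List<RunStats> flatten(Map<String, List<RunStats>> allStats) {
        List<RunStats> runs = new ArrayList<>();
        for (List<RunStats> perScript : allStats.values()) {
            runs.addAll(perScript);
        }
        return runs;
    }

    // Boolean AND of isSuccessful over every run.
    static boolean isSuccessful(List<RunStats> runs) {
        for (RunStats r : runs) {
            if (!r.successful) {
                return false;
            }
        }
        return true;
    }

    // -1 if any return code is negative, else 1 if any is positive, else 0.
    static int getReturnCode(List<RunStats> runs) {
        boolean anyPositive = false;
        for (RunStats r : runs) {
            if (r.returnCode < 0) {
                return -1;
            }
            if (r.returnCode > 0) {
                anyPositive = true;
            }
        }
        return anyPositive ? 1 : 0;
    }

    // Summed counters all follow this pattern; bytes written is shown.
    static long getBytesWritten(List<RunStats> runs) {
        long total = 0L;
        for (RunStats r : runs) {
            total += r.bytesWritten;
        }
        return total;
    }

    // MAX(endTime) - MIN(startTime); assumption: 0 when there are no runs.
    static long getDuration(List<RunStats> runs) {
        if (runs.isEmpty()) {
            return 0L;
        }
        long minStart = Long.MAX_VALUE;
        long maxEnd = Long.MIN_VALUE;
        for (RunStats r : runs) {
            minStart = Math.min(minStart, r.startTime);
            maxEnd = Math.max(maxEnd, r.endTime);
        }
        return maxEnd - minStart;
    }
}
```

Because the duration is taken from the earliest start to the latest end, it measures the wall-clock span of the whole control flow, including any idle time between runs, rather than the sum of the individual run durations.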

TuringCompletePig (last edited 2010-12-15 19:25:51 by AlanGates)