Pig Streaming Functional Specification

Streaming can have three separate meanings in the context of the Pig project:

  1. A specific way of submitting jobs to Hadoop: Hadoop Streaming
  2. A form of processing in which the entire portion of the dataset that corresponds to a task is sent to the task and the output streams out. There is no temporal or causal correspondence between an input record and specific output records.
  3. The use of non-Java functions with Pig.

The goal of Pig with respect to streaming is to support #2 for (a) Java UDFs, (b) non-Java UDFs, and (c) user-specified binaries/scripts. We will start with (c) since it would be most beneficial for users. It is not our goal to be feature-by-feature compatible with Hadoop streaming, as it is too open-ended and might force us to implement features that we don't necessarily want in Pig.

This document proposes the functional specification to fulfill 2.c.

Feature Specification

1 Computation Specification

1.1 Ability to specify streaming via binary/script.

A = load 'data';
B = stream A through `stream.pl`;

1.2 Ability to specify parameters to streaming

A = load 'data';
B = stream A through `stream.pl -n 5`;
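
As an illustration of the kind of executable such a command could launch, here is a minimal sketch written in Python rather than Perl. The specification does not say what stream.pl does or what -n means, so the behaviour below (keep the first n tab-delimited fields of every line) and the function names are assumptions made purely for the example.

```python
import argparse
import sys


def keep_first_fields(lines, n):
    """Yield each tab-delimited line cut down to its first n fields."""
    for line in lines:
        fields = line.rstrip("\n").split("\t")
        yield "\t".join(fields[:n])


def main(argv=None, stdin=None, stdout=None):
    """Parse the command-line options, then filter stdin to stdout."""
    parser = argparse.ArgumentParser(description="hypothetical streaming script")
    # Assumed meaning of -n: how many leading fields to keep.
    parser.add_argument("-n", type=int, default=5)
    args = parser.parse_args(argv)
    stdin = sys.stdin if stdin is None else stdin
    stdout = sys.stdout if stdout is None else stdout
    for out_line in keep_first_fields(stdin, args.n):
        stdout.write(out_line + "\n")
```

A script file that ends with a call to main() would read its records from stdin and write its results to stdout, which is all the stream operator expects from the command.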

1.3 Separation of streaming specification from stream operator.

For simple specifications, it is convenient for the user to be able to specify the computation directly within streaming operator:

A = load 'data';
B = stream A through `stream.pl`;

However, for more complex specifications involving a serializer/deserializer or input/output via a file, the user will be required to use the define command:

<define command> ::= define <alias> <computation spec>
<alias> ::= pig identifier
<computation spec> ::= <UDF spec> | <command spec>
<UDF spec> ::= pig standard function spec
<command spec> ::= `<command>` [<input spec>] [<output spec>] [<ship spec>] [<cache spec>]
<command> ::= standard Unix command including the arguments
<input spec>::= input (<input stream spec> [using <serializer>]{, <input stream spec> [using <serializer>]})
<output spec>::= output (<output stream spec> [using <deserializer>]{, <output stream spec> [using <deserializer>]})
<ship spec>::=ship(<file spec>{,<file spec>})
<cache spec>::=cache(<cache file spec>{,<cache file spec>})
<input stream spec> ::= stdin | <file spec>
<output stream spec> ::= stdout | <file spec>
<cache file spec>::='<dfs file spec>#<dfs file name>'
<file spec> ::= unix file path enclosed in single quotes
<dfs file spec> ::= path specification of the distributed file system
<dfs file name> ::= file name on the distributed file system
<serializer> ::= <UDF spec>
<deserializer> ::= <UDF spec>

Note that we use backticks to enclose the streaming command. This is because we want to reserve quotes for use within the command without the need to escape them. Backticks should be reasonably intuitive for a user since in Unix they are associated with the notion of "execute this".

Given the definition above, the stream command will look as follows:

<stream command>::= <alias>{,<alias>} = stream <alias>{,<alias>} through <streaming spec> [<schema spec>]
<streaming spec>::= <alias> | <UDF spec> | <command>
<schema spec>::= standard pig schema spec, for example: as (x,yz)

Within the stream operator, if the streaming specification is enclosed in backticks, it is assumed to be a streaming command specified inline. Otherwise, the string is looked up in the alias hash and, if found, is assumed to be an alias. If it is not found, an error is reported.
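
The lookup rule can be sketched as follows. This is an illustration in Python, not the parser's actual code; the function name and the use of a plain dictionary for the alias hash are assumptions.

```python
def resolve_streaming_spec(spec, aliases):
    """Classify the text that follows 'through' in a stream statement.

    Returns ("command", text) for an inline backtick command, or
    ("alias", definition) when the name was registered with define.
    """
    spec = spec.strip()
    if len(spec) >= 2 and spec.startswith("`") and spec.endswith("`"):
        # Inline command: strip the enclosing backticks.
        return ("command", spec[1:-1])
    if spec in aliases:
        return ("alias", aliases[spec])
    # Neither an inline command nor a known alias.
    raise ValueError(f"unknown streaming specification: {spec!r}")
```

For example, with aliases = {"cmd": "stream_cmd -input file.dat"}, the text cmd resolves to the alias definition, while an undefined name raises an error.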

Note that the left-hand side of the streaming command allows multiple aliases. This is needed for the case of streaming creating multiple outputs and will not be supported in the initial implementation. Also note that both the define and stream operators refer to UDF specs. This is to accommodate streaming UDFs and will not be supported in the initial release.

A = load 'data';
define cmd `stream_cmd -input file.dat`;
S1 = stream A through cmd;

More examples in the later sections.

1.4 Data Guarantees

The guarantees made to the user will be determined by the position of the streaming operator in the pig script. The following types of input will be supported: unordered data, grouped data, and grouped and ordered data (see Examples 1 through 3 below).

In addition to position, the data grouping and ordering can be determined by the data itself. For now, users would need to know the properties of the data to be able to take advantage of its structure; however, eventually, this should be part of the metadata.

Example 1: The data given to streaming is unordered

A = load 'data';
B = stream A through `stream.pl`;

Example 2: The data is grouped

A = load 'data';
B = group A by $1;
C = foreach B generate flatten(A);
D = stream C through `stream.pl`;

Example 3: data is grouped and ordered

A = load 'data';
B = group A by $1;
C = foreach B {
    D = order A by ($3, $4);
    generate D;
};
E = stream C through `stream.pl`;

1.5 Chaining Streaming Operations

Multiple streaming operators can appear in the same script. They can be adjacent to each other or have other operations in between.

2 Job Management

2.1 Ability to ship computation and support data

We need to be able to ship streaming binary and supporting files, if any, from the client box to the computation nodes.

A user can specify the files to ship via the ship clause of the define command. For instance,

define X `stream.pl foo.txt` ship('stream.pl', 'foo.txt');

Note that Pig would not automatically ship any dependencies. It is the responsibility of the user to specify all the dependencies explicitly and also to make sure that the software the processing relies on (for instance, perl or python) is installed on the cluster. The files will be shipped to the task's current working directory, and only relative paths to them should be specified. Any pre-installed binaries are expected to be on the PATH.

Also note that only files, not directories, can be specified in the ship statement. One way to work around this limitation is to tar all the dependencies into a tar file that accurately reflects the structure needed on the compute nodes, then have a wrapper for your script that untars the dependencies prior to execution. In future versions, we should support this functionality behind the scenes.
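
A wrapper of the kind described could look roughly like the Python sketch below. It is only an illustration under stated assumptions: the function name is hypothetical, the archive is assumed to have been shipped next to the wrapper, and the archive is trusted since it is extracted without any member filtering.

```python
import subprocess
import tarfile


def unpack_and_run(archive, command, workdir="."):
    """Extract the shipped archive into workdir, then run command there.

    Returns the exit code of the command so the caller can propagate it.
    """
    with tarfile.open(archive) as tar:
        # Recreates the directory layout stored in the tar file.
        tar.extractall(workdir)
    return subprocess.call(command, cwd=workdir)
```

Both the wrapper and the tar file would then be listed in the ship clause, and the wrapper would be named as the streaming command.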

If ship and cache options are not specified, pig will attempt to ship the binary in the following way:

Note that we need to make sure that executables retain their permissions and can be executed on the compute nodes.

If the user does supply a DEFINE for a given streaming command, then the above 'auto-shipping' is turned off.

Note that the ship spec has two facets: the source (provided in the ship() clause) is the user's view of his machine, while what is specified as the command is the view on the actual cluster. The user has to be aware of these two separate views. The _only_ guarantee offered to the user is that the shipped files are available in the current working directory of the launched job and that this directory is also on the PATH environment variable.

Thus important points to keep in mind:

1. If Pig determines that it needs to auto-ship an absolute path e.g.

OP = stream IP through `/a/b/c/script`;

or

OP = stream IP through `perl /a/b/c/script.pl`;

it will not ship it at all since there is no way to ship files to the necessary location (lack of permissions etc.).

A few other points to keep in mind with regards to auto-ship:

Pig will also not auto-ship files in the following system directories (this is determined by executing 'which <file>' command)

Also, for auto-shipping to work, the file in question must be found via the PATH. So if the file is in the current working directory, then the current working directory must be on the PATH.
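
The two rules stated so far (absolute paths are never auto-shipped, and the file must be found via the PATH) can be sketched as a small check. This is a Python illustration, not Pig's implementation; the function name is hypothetical, and skip_dirs is a caller-supplied parameter standing in for the list of excluded system directories.

```python
import os
import shutil


def auto_ship_candidate(name, skip_dirs=()):
    """Return the local path that would be shipped for name, or None."""
    if os.path.isabs(name):
        # Absolute paths are never auto-shipped.
        return None
    found = shutil.which(name)  # same lookup the 'which' command performs
    if found is None:
        # Not reachable through PATH, so there is nothing to ship.
        return None
    if os.path.dirname(os.path.abspath(found)) in skip_dirs:
        # Lives in an excluded (pre-installed) directory.
        return None
    return found
```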

2. It is safe only to ship files to be executed from the cwd on the task on the cluster:

OP = stream IP through `script`;

or

DEFINE CMD `script` ship('/a/b/script');
OP = stream IP through CMD;

Shipping files to relative paths or absolute paths is undefined and will most likely fail, since users might not have permissions to read/write/execute from arbitrary paths on the actual clusters.

2.2 Ability to cache data

The approach described above works fine for binaries/jars and small data sets. For larger datasets, loading them at run time for every execution can have serious performance consequences.

Similarly to 2.1, a user will be able to specify cached files via the cache clause of the define statement. For instance,

define X `stream.pl foo.gz` ship('stream.pl') cache('/input/foo.gz#foo.gz');

Pig will not ship the files specified in the cache spec but would expect the files to be available on the compute nodes in the specified location. The name that follows '#', indicates how the user should refer to the cached file in the script.

As with ship, cache only allows individual files to be specified, not directories.
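
To make the cache spec format concrete, the following Python sketch splits a spec of the form '<dfs file spec>#<dfs file name>' into its two parts. The function name and the exact validation rules are assumptions for illustration.

```python
def parse_cache_spec(spec):
    """Split 'dfs_path#name' into (dfs_path, name).

    dfs_path is where the file lives on the distributed file system;
    name is how the streaming command refers to it.
    """
    dfs_path, sep, name = spec.rpartition("#")
    if not sep or not dfs_path or not name:
        raise ValueError(f"expected '<dfs path>#<name>', got {spec!r}")
    if "/" in name:
        # Only a plain file name is allowed after '#', not a path.
        raise ValueError(f"name after '#' must not contain '/': {name!r}")
    return dfs_path, name
```

With the define statement above, parse_cache_spec('/input/foo.gz#foo.gz') yields the DFS path /input/foo.gz and the name foo.gz that stream.pl receives as its argument.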

2.3 Execution guarantee

The streaming binary/script will be invoked *ONLY* if there is any input to send to it. If there is no input it will not be invoked.
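
One way to honour this guarantee is to delay starting the process until the first record arrives. The Python sketch below illustrates the idea; it is not Pig's implementation, the function name is hypothetical, and for brevity it buffers the whole input instead of truly streaming it.

```python
import subprocess


def stream_through(command, records):
    """Send records (an iterable of text lines) to command via stdin.

    Returns the command's stdout as a list of lines, or None when
    there was no input and the command was therefore never started.
    """
    iterator = iter(records)
    try:
        first = next(iterator)
    except StopIteration:
        return None  # no input: do not invoke the command at all
    data = first + "".join(iterator)
    result = subprocess.run(
        command, input=data, capture_output=True, text=True, check=True
    )
    return result.stdout.splitlines()
```

Because the process is created only after the first record is read, an empty input never reaches subprocess.run, so even a command that does not exist on the machine causes no error in that case.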

3 Input/Output Handling

3.1 Data serialization/deserialization

Serialization is needed to convert data from tuples into a stream that is passed to the streaming application; deserialization is needed to convert the streaming application's output back into tuples.

By default, the data going into the streaming command and the data coming out of it are assumed to be tab delimited.

S = stream A through `stream.pl`;

In the example above, the default serializer (PigStorage) takes tuples out of A and converts them into tab-delimited lines that are passed to stream.pl. The output of streaming is processed by the default deserializer (PigStorage) one line at a time and split on tabs.

The user would be able to provide an alternative delimiter to default (de)serializer via define command:

define X `stream.pl` input(stdin using PigStorage('^A')) output (stdout using PigStorage('^A'));
S = stream A through X;
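
The effect of the delimiter on the data exchanged with the streaming application can be sketched in Python as follows. These two functions only mimic the line-oriented behaviour described above and are not PigStorage itself; the function names are hypothetical, and '^A' is assumed to denote the Ctrl-A character (\x01).

```python
def serialize(tuples, delimiter="\t"):
    """Turn tuples into delimited text, one line per tuple."""
    return "".join(
        delimiter.join(str(field) for field in row) + "\n" for row in tuples
    )


def deserialize(text, delimiter="\t"):
    """Split delimited text back into tuples of string fields."""
    lines = text.split("\n")
    if lines and lines[-1] == "":
        lines.pop()  # drop the empty piece after the final newline
    return [tuple(line.split(delimiter)) for line in lines]
```

Note that in this sketch the fields come back as strings, so a round trip of (1, 'a') yields ('1', 'a').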

Users will also be able to provide a custom serializer and deserializer. This would look something like the example below and is similar to the load/store commands. In fact, a load function is identical to a deserializer and a store function to a serializer.

define X `stream.pl` input(stdin using MySerializer) output (stdout using MyDeserializer);
S = stream A through X;

In addition to PigStorage, the following serializer/deserializer pair will be part of the pig distribution:

  1. BinarySerializer, BinaryDeserializer - treat the entire file as a byte stream, with no formatting or interpretation.

Each deserializer will implement the LoadFunc interface. Each serializer will implement the StoreFunc interface.

3.2 Ability to declare schema for streaming output

Just like the case with load, rather than relying on the position, the user can give names to the output columns:

S = stream A through `stream.pl` as (a, b, c);

3.3 Ability to let streaming read from file on disk

This is useful in two cases:

  1. To use existing binaries that expect input in a file rather than stdin.
  2. To provide multiple inputs.

In the initial release, we would only address the first case. Support for multiple inputs would be delayed until later versions.

In the case of a single input coming from a file, the command would look as follows:

define Y `stream.pl foo` input('foo' using MySerializer) output (stdout using MyDeserializer);
Z = stream X through Y;

This statement will cause the content of X to be written to file foo in the task's working directory using MySerializer.
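
The sequence of events (write the input to the named file, then launch the command, which reads that file) can be sketched in Python. This is an illustration only; the function name is hypothetical, the records are assumed to be already serialized lines, and the output is simply captured from stdout.

```python
import os
import subprocess


def run_with_file_input(command, records, input_name, workdir):
    """Write records to workdir/input_name, then run command in workdir.

    Returns whatever the command wrote to stdout.
    """
    path = os.path.join(workdir, input_name)
    with open(path, "w") as handle:
        handle.writelines(records)  # the file is complete before launch
    result = subprocess.run(
        command, cwd=workdir, capture_output=True, text=True, check=True
    )
    return result.stdout
```

The important property is ordering: the file must be fully written before the command starts, since the command sees it as an ordinary file and not as a stream.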

3.4 Ability to write output(s) to the disk

The motivation here is similar to the ability to read data from disk described in 3.3:

  1. To accommodate existing applications that write output to the disk rather than stdout.
  2. To provide ability to create multiple outputs.

In the initial release, only limited support for multiple outputs would be provided as described below.

To accommodate the first case, the following command can be used:

define Z `stream.pl outputfile` output('outputfile' using MyDeserializer);
Y = stream X through Z;

This tells pig that the streaming application stored its complete output into a file called outputfile in the task's working directory and that the content of that file should be deserialized into Y using MyDeserializer.

A user can specify multiple outputs but only the first one will be automatically loaded; the rest would be stored in dfs using the file name specified in the output spec:

define A `stream.pl` output('output1', 'output2' using MyDeserializer);
Y = stream X through A;

In this example, output1 will be loaded into Y while the second output will be stored as a subdirectory with name output2 in the output directory specified for pig script.
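
The rule for multiple outputs can be summarised in a short Python sketch. The function name and the path-joining convention are assumptions for illustration; only the split between the first output and the rest comes from the text above.

```python
def plan_outputs(output_files, job_output_dir):
    """Decide what happens to each declared output file.

    Returns (loaded, stored): the first output is loaded into the
    relation, every other output maps to a subdirectory of the job's
    output directory named after the file.
    """
    if not output_files:
        raise ValueError("at least one output must be declared")
    loaded = output_files[0]
    base = job_output_dir.rstrip("/")
    stored = {name: f"{base}/{name}" for name in output_files[1:]}
    return loaded, stored
```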

4 Non-streaming Features

This section describes a set of features that are not directly related to streaming but would be very useful in the context of streaming.

4.1 Logging/Debugging/Error handling

4.1.1 Logging

Users will have control over the handling of stderr of their streaming application by requesting that stderr be stored in DFS for both successful and failed jobs. This is done by adding a stderr spec to the streaming command declaration:

define CMD `stream.pl` stderr('<dir>' limit 100);

In this case, the streaming stderr will be stored in _logs/<dir> directory in the job's output directory. Note that the same Pig job can have multiple streaming applications associated with it. It would be up to the user to make sure that different names are used for this to avoid conflicts by passing the right directory to the stderr spec.

Pig would store logs of up to 100 tasks per streaming job in this location (so that is 100*4 = 400 logs, assuming 4 retries per task). The limit is imposed to make sure that we don't create a large number of small files in HDFS and waste space and name node resources. The user can specify a smaller number via the limit keyword in the stderr spec.

define CMD `stream.pl` stderr('CMD_logs' limit 100);
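
The arithmetic behind the log limit can be written out as a small Python helper. The constant and function names are hypothetical, and the figure of 4 attempts per task is the assumption used in the text above.

```python
MAX_TASKS_LOGGED = 100  # upper bound on tasks whose stderr is kept


def max_stderr_logs(limit=MAX_TASKS_LOGGED, attempts_per_task=4):
    """Upper bound on the number of stderr log files for one streaming job."""
    if not 0 <= limit <= MAX_TASKS_LOGGED:
        raise ValueError(f"limit must be between 0 and {MAX_TASKS_LOGGED}")
    return limit * attempts_per_task
```

With the defaults this gives 400 files; a user who sets limit 10 in the stderr spec would see at most 40.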

The logs would only contain stderr information from the streaming application. The content will include a header and a footer. The header will include task name, start time, input size, input file and input range if available. The footer will contain result code, end time, and outputs' sizes.

4.1.2 Error Handling

If the streaming application fails, pig will report the error, including the return code and any stderr output produced by the streaming application.

If available, it would also report on the amount of data consumed so far.

4.2 Ability to templatize pig scripts

See https://issues.apache.org/jira/browse/PIG-58 for details

4.3 Ability to process binary data

Sometimes applications need to consume an entire data file without any parsing. In those cases, they can specify the split by 'file' option for the LoadFunc being used. In addition, they can use BinaryStorage to tell Pig not to parse the data at all, so that the application receives the raw data directly.

A = load 'data' using BinaryStorage() split by 'file';
B = stream A through `stream.pl`;

Note: BinaryStorage *needs* the split by 'file' option to be specified, else it isn't guaranteed to work.

4.4 Working with scalars

Several users need to have a way to compute a scalar using pig and use it in later computations.

They want to be able to do something like this:

A = load 'data1';
B = group A all;
C = foreach B generate COUNT(A);
D = load 'data2';
E = foreach D generate $1/C;

This does not work in pig because C is a relation, not a scalar. The proposed short-term solution is to do the following:

A = load 'data1';
B = group A all;
C = foreach B generate COUNT(A);
store C into 'count';
D = load 'data2';
E = foreach D generate $1/GetScalar(C);

GetScalar is a UDF that reads the file on the first invocation and produces its content in the form of a value. This UDF will be provided as part of the pig distribution until a better solution is available.
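
The read-once behaviour of such a UDF can be sketched in Python. This is only an illustration of the caching idea: a real UDF would be written against Pig's function interface and would read from DFS, whereas this hypothetical class reads a local file that is assumed to contain a single number.

```python
class GetScalar:
    """Callable that loads a single numeric value from a file on first use."""

    def __init__(self, path):
        self._path = path
        self._value = None
        self._loaded = False

    def __call__(self):
        if not self._loaded:
            # First invocation: read and parse the stored value.
            with open(self._path) as handle:
                self._value = float(handle.read().strip())
            self._loaded = True
        # Later invocations reuse the cached value without touching the file.
        return self._value
```

Caching matters here because the function is called once per input tuple, so re-reading the file on every call would dominate the cost of the division.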

5 Performance

We should have a performance target in mind relative to Hadoop streaming. I think for the initial release it would make sense to aim for no more than 30% overhead for streaming in Pig.

5.1 Load/Stream and Stream/Store optimizations

In cases where the STREAM operator immediately follows the LOAD operator, or directly precedes the STORE operator, and the two have the same LoadFunc/StoreFunc specification, Pig will try to optimize away the interpretation of the data in the LoadFunc/StoreFunc (i.e. the need to break up raw input into tuples) by substituting BinaryStorage for the equivalent {Load|Store}Funcs. For the LOAD/STREAM case, the caveat is that this is feasible only when each task processes all of the data in a given input file (i.e. the split by 'file' option is specified on the LOAD operator). *As a result, for the optimization to take place, split by 'file' must be specified on the load statement.*

E.g. Pig will optimize:

IP = load 'data' split by 'file';
OP = stream IP through `myscript`;
store OP into 'output';

and

define CMD `myscript` input(stdin using BinaryStorage()) output(stdout using BinaryStorage());
IP = load 'data' using BinaryStorage() split by 'file';
OP = stream IP through CMD;
store OP into 'output' using BinaryStorage();

However, in

IP = load 'data' using PigStorage(',') split by 'file';
OP = stream IP through `myscript`;
store OP into 'output';

Pig cannot optimize the LOAD/STREAM pair since they have different LoadFuncs (load uses PigStorage(',') and stream uses the default PigStorage()). The STREAM/STORE pair will still be optimized to use BinaryStorage.
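
The conditions for the two optimizations can be restated as a Python sketch. The function names are hypothetical, and comparing function specifications as plain strings is a simplification made for the example; it is not how Pig compares them.

```python
DEFAULT_FUNC = "PigStorage()"  # used when no 'using' clause is given


def can_optimize_load_stream(load_func=DEFAULT_FUNC,
                             stream_input_func=DEFAULT_FUNC,
                             split_by_file=False):
    """LOAD/STREAM: same spec on both sides and whole files per task."""
    return split_by_file and load_func == stream_input_func


def can_optimize_stream_store(stream_output_func=DEFAULT_FUNC,
                              store_func=DEFAULT_FUNC):
    """STREAM/STORE: same spec on both sides is sufficient."""
    return stream_output_func == store_func
```

Applied to the scripts above, the first two satisfy both checks, while the third fails the LOAD/STREAM check (PigStorage(',') versus the default) and passes the STREAM/STORE check.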

See performance numbers for the initial release at http://wiki.apache.org/pig/PigPerformance.


Streaming is fully supported in local mode. There is one thing to keep in mind: the ship and cache options are not used; instead, the binaries and the supporting data must be accessible from the directory in which the streaming command is executed.

PigStreamingFunctionalSpec (last edited 2009-09-20 23:38:09 by localhost)