Changes to Pig Being Done on the Types Branch

As of 1/12/09, the types branch has been merged into trunk and all the code is now available on trunk. The old version of the trunk has been saved in the pretypes branch.

Objective

A number of Pig developers have spent the past six months reworking the Pig execution pipeline and adding types support. This work has been done on the types branch rather than in trunk because the changes were very destabilizing. That work is now near completion. At some point in the near future the changes will be merged into the trunk.

These changes add a number of new features to Pig. They are also not fully backward compatible. This document describes the new features, calls out areas of incompatibility, and provides users with information on what they will need to change in their Pig Latin scripts and UDFs in order to work with the new version of Pig.

For the purposes of this document the work done on the types branch will be referred to as Pig 0.2.0. The intention is to release these changes at that revision number.

New Features and Extensions

This section covers new features and changes or enhancements to existing ones, as well as the work required from users to adapt to these changes.

Addition of Types

Pig 0.1.0 supported a single basic type - unicode String.

Pig 0.2.0 supports the following types: the scalar types int, long, float, double, chararray (unicode string), and bytearray (the default type), and the complex types tuple, bag, and map.

Types in the load statement

The syntax of the AS clause has been extended to include type information:

A = LOAD 'data' USING PigStorage() AS (name, age: long, gpa: float);

The original syntax can still be used for the entire AS statement or parts of it. If type specification is omitted for a particular field, it is assumed to be of type bytearray.

AS can also be used to define complex structures in your data such as tuples, bags, or maps.

A = LOAD 'data' USING MyStorage() AS (T: tuple(name, age: long, gpa: float), B: bag{tuple(number: int)}, M: map[]); 
...

The words tuple, bag, and map in the type declaration are optional. Also, in Pig 0.2.0, bags can only contain tuples; this might change in the future. Finally, note that the map does not take names or types: the key of a map must be a scalar (no tuples, maps, or bags) and the value can be of any type.

Users are encouraged to declare the type of the data whenever possible. This will result in better parse time error checking as well as more efficient execution.

Constants

Pig 0.2.0 provides constant representation for scalar and complex types.

String (chararray) constants can be specified as ASCII characters (e.g. x MATCHES 'abc' ).

Any numeric constant consisting of just digits (e.g. 123) is assigned the type int. To specify a long constant, l or L must be appended to the number (e.g. 12345678L). If l or L is not specified but the number is too large to fit into an int, the problem will be detected at parse time and processing will be terminated.

Any numeric constant with a decimal point (e.g. 1.5) and/or an exponent (e.g. 5e+1) is treated as a double unless it ends with f or F, in which case it is assigned type float (e.g. 1.5f).
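
For example, the following illustrative script (field names and values are made up) uses a long and a float constant:

A = LOAD 'data' AS (x: long, y: float);
B = FILTER A BY x > 12345678L;    -- L marks a long constant
C = FOREACH B GENERATE y + 1.5f;  -- f marks a float constant; 1.5 alone would be a double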

The complex constants are represented as shown below:

{(1, 5, 18)} - creates a bag
('john', 25, 5.6f) - creates a tuple
['john'#25, 'dan'#18, 'alice'#20] - creates a map

Note that bags must contain tuples; tuples can contain any data; keys of the map must be of scalar type and values can be of any type.

Complex constants can be used in the same places scalar constants can be used: in filters and generate commands:

A = LOAD 'data' USING MyStorage() AS (T: tuple(name:chararray, age: int));
B = FILTER A BY T == ('john', 25);
D = FOREACH B GENERATE T.name, [25#5.6], {(1, 5, 18)};
...

Cast

In Pig 0.1.0 all scalar data fields were represented as chararray and were cast to double for any arithmetic computations.

With the addition of types in 0.2.0, Pig now has a much richer set of implicit casts. It also introduces explicit casts.

The table below outlines the allowed casts. Casts that are not supported will result in a parse time error.

from \ to     bag     tuple   map     int     long    float   double  chararray  bytearray
bag           -       error   error   error   error   error   error   error      error
tuple         error   -       error   error   error   error   error   error      error
map           error   error   -       error   error   error   error   error      error
int           error   error   error   -       yes     yes     yes     yes        error
long          error   error   error   yes     -       yes     yes     yes        error
float         error   error   error   yes     yes     -       yes     yes        error
double        error   error   error   yes     yes     yes     -       yes        error
chararray     error   error   error   yes     yes     yes     yes     -          error
bytearray     yes     yes     yes     yes     yes     yes     yes     yes        -

(A dash marks a cast from a type to itself, which is not listed in the table.)

Implicit casts are used when the declared/default type of the data does not match the operation in which it is used. For instance,

A = LOAD 'data' AS (a: int, b: float);
B = foreach A generate a + b;

In this case, type promotion is used and a is implicitly cast to float.

A = LOAD 'data' AS (a, b);
B = foreach A generate a + b;

In this case, since the type of neither operand is declared, the default of bytearray is used, and both operands are implicitly cast to double to preserve compatibility with previous versions.

The following case is a little more tricky:

A = LOAD 'data' AS (a, b);
B = foreach A generate a + 1, b + 1;

Because the type of b is not specified, the type of the operation will be chosen based on the constant, so integer arithmetic will be used. If field b contains, for instance, float values, they will be truncated. This is a change from the 0.1.0 version of Pig, where data was always promoted to double for arithmetic operations.

Input data:
1, 1.5
2, 2.5

Pig 0.1.0:
2.0, 2.5
3.0, 3.5

Pig 0.2.0:
2, 2
3, 3

There are a few ways to address this issue. The first, and easiest, is to declare the correct schema for the input. The second is to use constants of the appropriate type:

A = LOAD 'data' AS (a, b);
B = foreach A generate a + 1, b + 1.0;

The third is to use an explicit cast:

A = LOAD 'data' AS (a, b);
B = FOREACH A GENERATE a + 1, (float)b + 1;

0.1.0 Operations

The addition of types required a complete rewrite of the Pig operations. A detailed discussion is available at http://wiki.apache.org/pig/PigTypesFunctionalSpec#Expression_Operators. This section describes the changes with the highest impact on users.

A general rule is followed that the result of any arithmetic computation is of the same type as the operands. For instance, a division or multiplication of two integers results in an integer value.
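
For instance, in the illustrative script below, the division of two int fields produces an int, so any fractional part is lost unless a cast is applied:

A = LOAD 'data' AS (a: int, b: int);
B = FOREACH A GENERATE a / b;          -- integer division: 5 / 2 yields 2
C = FOREACH A GENERATE (double)a / b;  -- cast one operand if a fractional result is needed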

Since in the 0.1.0 version of Pig there were no numeric constants, string constants that contained numbers could be used as numbers, for example: filter a by $1 > '5'. Now '5' will be interpreted as a chararray containing the character 5, not as the number 5. In this particular case, a lexicographical comparison rather than a numeric one will be applied, which might not be what you want.

In Pig 0.1.0 and earlier, the comparison operators <, >, <=, >=, !=, == forced a numeric comparison, while lt, gt, lte, gte, neq, eq forced a string comparison. Now lt, et al. are deprecated but still allowed, and they no longer force a string comparison. <, et al. are applied to the data types as they are, without forcing them to numeric. So $1 > '5' now compares $1 to '5' as a chararray, not as an integer. To compare $1 to the number 5, the syntax is now: $1 > 5
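
For example, under the 0.2.0 rules (an illustrative script):

A = LOAD 'data' AS (name, age);
B = FILTER A BY $1 > '5';   -- lexicographical comparison against the chararray '5'
C = FILTER A BY $1 > 5;     -- numeric comparison; $1 (bytearray by default) is cast to int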

Having different types in the equality comparison in a FILTER can cause unexpected results. Consider the following code:

A = LOAD 'data' AS (a: float, b: int);
B = FILTER A by a == 1.42;

In this case, a float column is compared to a double constant, which causes the float to be promoted to a double. Unfortunately, Java promotion does not preserve the exact value, and as a result the comparison will fail even when a contains 1.42. The solution is to avoid the promotion by making the constant a float as well:

A = LOAD 'data' AS (a: float, b: int);
B = FILTER A by a == 1.42f;

Alternatively, you could declare the column to be double but that would be less efficient.

Similarly, in the 0.1.0 version of Pig, the following expression was valid:

A = LOAD 'data' AS (a, b);
B = FOREACH A generate a + '1.42';

This would cause both a and '1.42' to be converted to double prior to the addition. In Pig 0.2.0 this produces an error, since addition is not defined for strings and '1.42' is now interpreted as a string constant. The code above should look as follows in Pig 0.2.0:

A = LOAD 'data' AS (a, b);
B = FOREACH A generate a + 1.42;

A % operator that computes modulo has been added:

A = LOAD 'data' AS (a: int, b: int);
B = FOREACH A generate a%b;

NULL Support

Pig 0.2.0 introduces the notion of NULL values. NULLs follow the SQL notion of missing data. NULLs can occur naturally in the data or can be the result of an operation. If NULLs are part of the data, it is the responsibility of the load function to handle them correctly; see the discussion later in this document.

The following operations can result in NULL values:

A = LOAD 'data' AS (a, b);
B = foreach A generate b;

In this case, if some rows of the data contain only a single column, a NULL will be injected into B for those rows.

Similarly, for loads without declared schema, NULLs will be injected if the columns are missing.

A = LOAD 'data';
B = foreach A generate $2;

However, in the following case, an error will be generated:

A = LOAD 'data' AS (a, b);
B = foreach A generate $3;

This is because the requested field is outside of the declared schema. This is a change compared to earlier Pig versions.

The table below describes how different operators interact with NULLs:

Operator                         Interaction
Comparison operators             If either sub-expression is null, the result of the comparison will be null
Matches                          If either the string being matched against or the string defining the match is null, the result will be null
Is null                          Returns true if the tested value is null
Arithmetic operators, concat     If either sub-expression is null, the resulting expression is null
Size                             If the tested object is null, size will return null
Dereference of a map or tuple    If the dereferenced map or tuple is null, the result will be null
Cast                             Casting a null from one type to another will result in a null
Aggregates                       Built-in aggregate functions ignore nulls, just as they do in SQL; user defined aggregates are free to handle nulls as they see fit

Boolean expressions that result from comparison or matches operators can only appear in a FILTER operator, and if they evaluate to null, the filter does not pass the record through. (Note that if X is null, both X and !X are null and would both be rejected by a filter.)

A = LOAD 'data' AS (name, age: int, gpa: float);
B = FILTER A by age > 30;
dump B;

In this case, records with a NULL value of age will not be dumped.

To test for null values, is {not} null construct can be used.

A = LOAD 'data' AS (name, age: int, gpa: float);
B = foreach A generate (gpa is null? 0 :gpa) ;

In this case, B will contain a value of 0 wherever the value of gpa was missing from the data.

You can also use is {not} null in a filter:

A = LOAD 'data' AS (name, age: int, gpa: float);
B = filter A by age is not null and gpa is null;

B will retain all the entries where age is specified while gpa is missing.

Loaders maintained by Pig, such as PigStorage, have been updated to produce NULL wherever data is missing. This means that the code below, which worked with 0.1.0,

A = LOAD 'data' AS (name, age, gpa);
B = FILTER A by name neq '';
.....

will no longer produce correct results, since empty strings are now interpreted as NULL values. The following code should be used instead:

A = LOAD 'data' AS (name, age, gpa);
B = FILTER A by name is not null;
.....

Relational Operators

ORDER BY Extensions

In Pig 0.1.0 only ascending order was supported. With 0.2.0 you can specify the desired order on a per-column basis.

A = LOAD 'data' AS (name, age: int, gpa: float);
B = ORDER A BY age, gpa desc;
...

The default order is ascending. In the example above, the data would be sorted in ascending order of age and in descending order of gpa.

The other change for ORDER BY is that the declared type of the data is used. In the example above, numeric ordering will be used for both columns. If the type of a column is not specified, lexicographical order is used, the same way it was in 0.1.0.
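
If the schema is not declared but numeric ordering is desired, one option is to cast the column first (an illustrative sketch):

A = LOAD 'data' AS (name, age);
B = FOREACH A GENERATE name, (int)age;
C = ORDER B BY $1;   -- $1 is now an int, so numeric ordering is used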

Limit

A new LIMIT operator has been introduced that limits the number of output tuples produced.

A = LOAD 'data' AS (name, age: int, gpa: float);
B = limit A 100;
...

In the example above, the output will consist of 100 rows of A if A is large enough, or all the rows of A if A has fewer than 100 rows. Note that there are no guarantees on which rows are returned, and the rows can change from one run to the next. Also, this is not a well distributed random sample.

A particular set of tuples can be requested by placing an ORDER BY operator before the LIMIT:

A = LOAD 'data' AS (name, age: int, gpa: float);
B = ORDER A BY age, gpa desc;
C = limit B 100;
...

In the example above, the first 100 rows of B would be returned, in the order of B.

Note that the LIMIT operator is optimized, and in most cases it will run more efficiently than the identical query without the limit. It is always a good idea to use LIMIT if you can.

Grunt Improvements

Command History and Completion

Using the up and down arrows now allows users to navigate their command history, similarly to most Unix shells.

Also, the TAB key can be used to auto-complete Pig's reserved words as well as builtin functions. Users can also provide their own autocompletion list, for instance to add their UDFs to the tab completion list. To do so, create a file called autocomplete in the directory where the pig command is run. The file should contain the names of all the UDFs that the user uses. Note that the names are case sensitive:

UPPER
LOWER
....

Currently, autocompletion is not available for file system navigation.

Returning correct line number for syntax errors

In the 0.1.0 version of Pig, the error message resulting from a syntax error was misleading, always saying that the error was on line 1. This has been addressed in Pig 0.2.0, where the correct line number is used in the message.

Performance improvements

Several performance improvements have been made in 0.2.0.

Most of these improvements are transparent to users; however, some require user intervention or understanding.

We have also made more aggressive use of the combiner. In Pig 0.2.0, the combiner is invoked in the following cases:

...
Y = group X ...;
Z = foreach Y generate simple_project || algebraic ...
...

In the example above, simple_project is just a projection of one of the columns of Y (group or X in this case), and algebraic is an algebraic UDF or a set of algebraic UDFs. The group key can be omitted from the projection.

The following queries will all use the combiner:

....
Y = group X by $0;
Z = foreach Y generate group, COUNT(X), SUM(X);
....
Y = group X by $0, $1;
Z = foreach Y  generate flatten(group), COUNT(X);
....
Y = group X all;
Z = foreach Y generate COUNT(X);
....
Y = group X by $0;
Z = foreach Y generate COUNT(X), group;
....

Cogroups and queries with non-algebraic UDFs do not yet use the combiner.
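
For instance, a query like the following would not use the combiner (MyNonAlgebraicUDF is a hypothetical UDF that does not implement the Algebraic interface):

....
Y = group X by $0;
Z = foreach Y generate group, MyNonAlgebraicUDF(X);
....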

Builtins Update

Arithmetic UDFs

In the 0.1.0 version of Pig, the numeric values passed into and out of UDFs were doubles. With Pig 0.2.0, the real type of the input data can be used to make operations more efficient. To take advantage of this, the data types must be declared in the AS clause of the load, or the inputs to the UDF must be cast to the appropriate type.
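
For example, in the illustrative script below, declaring qty as int allows the int-specific implementation of SUM (described later in this document) to be chosen instead of the default bytearray-to-double path:

A = LOAD 'data' AS (name: chararray, qty: int);
B = GROUP A BY name;
C = FOREACH B GENERATE group, SUM(A.qty);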

PigStorage Extension

In Pig 0.2.0, the PigStorage UDF has been extended to handle complex data types. On the load side, fields containing tuples, bags, and maps will be recognized and processed accordingly. Similarly, on the store side, complex fields will be serialized using the formats below. The same formats are expected on the load side as well:

Bag: {(tuple1),(tuple2),...,(tupleN)}
Tuple: (type1, type2,...,typeN)
Map: [key1#value1,key2#value2,...,keyN#valueN]
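
For example, an input line like the one below (a made-up sample, with fields separated by tabs, the default PigStorage delimiter) could be loaded with the following illustrative script:

(john,25)    {(1),(2),(3)}    [gpa#3.5]

A = LOAD 'complex_data' USING PigStorage() AS (T: tuple(name: chararray, age: int), B: bag{tuple(n: int)}, M: map[]);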

Changes to PigDump

There are a couple of formatting changes to PigDump. First, the output no longer includes spaces between the fields, as they made it hard to distinguish fields from the now-supported NULL values when the data is read back in.

In the 0.1.0 version of Pig, the data would be produced in the following format:

john smith, 39, 3.81
bob jones, 22, 2.01

In 0.2.0, the same data will look like:

john smith,39,3.81
bob jones,22,2.01

Also, if the result of an operation is a long or a float, the value will have L or f appended, respectively.

A = LOAD 'data';
B = group A all;
C = foreach B generate COUNT(A);
store C into 'output' using PigDump();

Assuming A contains 100 rows, the result will be 100L, since COUNT produces a long.

Since PigDump is used by the dump command, the changes above also affect dump.

New UDFs

A SIZE function has been added that can be used to get the size of any data type. It replaces ARITY, which is still valid but deprecated. The following table describes what SIZE returns for each type:

Type         Returns
bag          Number of tuples in the bag
tuple        Number of fields in the tuple
map          Number of keys in the map
chararray    Number of characters in the chararray
bytearray    Number of bytes in the bytearray
int          1
long         1
float        1
double       1

A CONCAT function has been added. It concatenates two chararray or bytearray values.
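
For example (field names are illustrative):

A = LOAD 'data' AS (first: chararray, last: chararray);
B = FOREACH A GENERATE CONCAT(first, last);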

COUNT and NULLs

In Pig 0.2.0, COUNT counts NULL values. This is compatible with the 0.1.0 version, which did not recognize NULLs; however, it is not consistent with the SQL treatment of NULLs. We might choose to fix that in later versions of the software, especially once we start supporting SQL. This also affects AVG, since it is computed as SUM(A)/COUNT(A).
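
If SQL-like behavior is desired, nulls can be filtered out explicitly before counting (an illustrative sketch):

A = LOAD 'data' AS (name, gpa: float);
B = FILTER A BY gpa is not null;
C = GROUP B ALL;
D = FOREACH C GENERATE COUNT(B);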

Incompatible Changes

This section describes the changes that require user attention in order to make their code and UDFs work with Pig 0.2.0.

Data Type Changes

DataAtom Class is Gone

This class is no longer available in Pig 0.2.0. Pig now allows fields to use basic Java types such as String, Integer, etc. directly.

Old UDF code:

public class TOLOWERCASE extends EvalFunc<DataAtom>

New UDF code:

public class TOLOWERCASE extends EvalFunc<String>

The following table describes which Java types are used to represent Pig types:

Pig Type     Java Type
int          java.lang.Integer
long         java.lang.Long
float        java.lang.Float
double       java.lang.Double
chararray    java.lang.String
bytearray    org.apache.pig.data.DataByteArray
map          java.util.Map<java.lang.Object, java.lang.Object>

For tuples and bags, see below.

DataMap Class is Gone

DataMap has been removed and Map<Object, Object> is used instead.

Old UDF code:

public class URLPARSE extends EvalFunc<DataMap>

New UDF code:

public class URLPARSE extends EvalFunc<Map<Object, Object> >

Tuple Changes

In Pig 0.2.0, the Tuple class is no longer a concrete class but an interface. This means that tuples can't be instantiated directly; instead, a factory class should be used to create one:

import org.apache.pig.data.TupleFactory;
....
Tuple t =  TupleFactory.getInstance().newTuple();

If you know the number of fields in the tuple, it is beneficial to pass that to the newTuple call.

Here is the complete TupleFactory interface:

public interface TupleFactory {
    public static TupleFactory getInstance() {}

     /**
     * Create an empty tuple.  This should be used as infrequently as
     * possible, use newTuple(int) instead.
     */
    public Tuple newTuple() {}

   /**
     * Create a tuple with size fields.  Whenever possible this is preferred
     * over the nullary constructor, as the constructor can preallocate the
     * size of the container holding the fields.  Once this is called, it
     * is legal to call Tuple.set(x, object), where x < size.
     * @param size Number of fields in the tuple.
     */
    public Tuple newTuple(int size) {}

    /**
     * Create a tuple from the provided list of objects.
     * @param c List of objects to use as the fields of the tuple.
     */
    public Tuple newTuple(List c) {}

    /**
     * Create a tuple with a single element.  This is useful because of
     * the fact that bags (currently) only take tuples, we often end up
     * sticking a single element in a tuple in order to put it in a bag.
     * @param datum Datum to put in the tuple.
     */
    public Tuple newTuple(Object datum) {}

   /**
     * Return the actual class representing a tuple that the implementing
     * factory will be returning.  This is needed because hadoop (and
     * possibly other systems) we use need to know the exact class we will
     * be using for input and output.
     * @return Class that implements tuple.
     */

    public Class tupleClass() {}
}
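
For example, a UDF that builds a two-field tuple might use the factory as follows. This is a minimal sketch, not code from the Pig sources; the values are made up and the import paths assume the Pig 0.2.0 package layout:

import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.data.Tuple;
import org.apache.pig.data.TupleFactory;

public class TupleExample {
    public static Tuple makeTuple() throws ExecException {
        // preallocate space for two fields rather than appending them later
        Tuple t = TupleFactory.getInstance().newTuple(2);
        t.set(0, "john");           // field 0: chararray
        t.set(1, new Integer(25));  // field 1: int
        return t;
    }
}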

The Tuple interface itself has changed significantly. Here is the new version:

public interface Tuple extends WritableComparable, Serializable {

    /**
     * Marker for indicating whether the value this object holds
     * is a null
     */
    public static byte NULL = 0x00;

    /**
     * Marker for indicating whether the value this object holds
     * is not a null
     */
    public static byte NOTNULL = 0x01;

    /**
     * Make this tuple reference the contents of another.  This method does not copy
     * the underlying data.   It maintains references to the data from the original
     * tuple (and possibly even to the data structure holding the data).
     * @param t Tuple to reference.
     */
    void reference(Tuple t);

    /**
     * Find the size of the tuple.  Used to be called arity().
     * @return number of fields in the tuple.
     */
    int size();

    /**
     * Find out if a given field is null.
     * @param fieldNum Number of field to check for null.
     * @return true if the field is null, false otherwise.
     * @throws ExecException if the field number given is greater
     * than or equal to the number of fields in the tuple.
     */
    boolean isNull(int fieldNum) throws ExecException;

    /**
     * Find the type of a given field.
     * @param fieldNum Number of field to get the type for.
     * @return type, encoded as a byte value.  The values are taken from
     * the class DataType.  If the field is null, then DataType.UNKNOWN
     * will be returned.
     * @throws ExecException if the field number is greater than or equal to
     * the number of fields in the tuple.
     */
    byte getType(int fieldNum) throws ExecException;

    /**
     * Get the value in a given field.
     * @param fieldNum Number of the field to get the value for.
     * @return value, as an Object.
     * @throws ExecException if the field number is greater than or equal to
     * the number of fields in the tuple.
     */
    Object get(int fieldNum) throws ExecException;

    /**
     * Get all of the fields in the tuple as a list.
     * @return List<Object> containing the fields of the tuple
     * in order.
     */
    List<Object> getAll();

    /**
     * Set the value in a given field.
     * @param fieldNum Number of the field to set the value for.
     * @param val Object to put in the indicated field.
     * @throws ExecException if the field number is greater than or equal to
     * the number of fields in the tuple.
     */
    void set(int fieldNum, Object val) throws ExecException;

    /**
     * Append a field to a tuple.  This method is not efficient as it may
     * force copying of existing data in order to grow the data structure.
     * Whenever possible you should construct your Tuple with the
     * newTuple(int) method and then fill in the values with set(), rather
     * than construct it with newTuple() and append values.
     * @param val Object to append to the tuple.
     */
    void append(Object val);

    /**
     * Determine the size of tuple in memory.  This is used by data bags
     * to determine their memory size.  This need not be exact, but it
     * should be a decent estimation.
     * @return estimated memory size.
     */
    long getMemorySize();

    /**
     * Write a tuple of atomic values into a string.  All values in the
     * tuple must be atomic (no bags, tuples, or maps).
     * @param delim Delimiter to use in the string.
     * @return A string containing the tuple.
     * @throws ExecException if a non-atomic value is found.
     */
    String toDelimitedString(String delim) throws ExecException;

    /**
     * @return true if this Tuple is null
     */
    public boolean isNull();

    /**
     * @param isNull boolean indicating whether this tuple is null
     */
    public void setNull(boolean isNull);
}

DataBag Changes

Similarly to Tuple, DataBag is now also an interface, and new bags are created via calls to BagFactory:

import org.apache.pig.data.BagFactory;
import org.apache.pig.data.DataBag;
....
DataBag bag =  BagFactory.getInstance().newDefaultBag();

Here is the BagFactory class:

public abstract class BagFactory {
    public static BagFactory getInstance() {
        ...
    }

    /**
     * Get a default (unordered, not distinct) data bag.
     */
    public abstract DataBag newDefaultBag();

    /**
     * Get a sorted data bag.
     * @param comp Comparator that controls how the data is sorted.
     * If null, default comparator will be used.
     */
    public abstract DataBag newSortedBag(Comparator<Tuple> comp);

    /**
     * Get a distinct data bag.
     */
    public abstract DataBag newDistinctBag();

    protected BagFactory() {
        gMemMgr = new SpillableMemoryManager();
    }

    protected void registerBag(DataBag b) {
        gMemMgr.registerSpillable(b);
    }
}

The DataBag classes themselves are mostly unchanged. The full interface can be seen at http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/data/DataBag.java?view=markup
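
As an example, building a bag of single-field tuples might look like the sketch below (an illustration, not code from the Pig sources; the import paths assume the Pig 0.2.0 package layout):

import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.data.BagFactory;
import org.apache.pig.data.DataBag;
import org.apache.pig.data.Tuple;
import org.apache.pig.data.TupleFactory;

public class BagExample {
    public static DataBag makeBag() throws ExecException {
        DataBag bag = BagFactory.getInstance().newDefaultBag();
        // bags can only contain tuples, so wrap the value in a single-field tuple
        Tuple t = TupleFactory.getInstance().newTuple(1);
        t.set(0, "john");
        bag.add(t);
        return bag;
    }
}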

EvalFunc Changes

exec Signature Changes

In Pig 0.1.0, the exec function had the following signature:

public void exec(Tuple input, T output) throws IOException;

In 0.2.0, the signature looks as follows:

public T exec(Tuple input) throws IOException;

The change is to return the output value rather than passing the output in a parameter. This is done to allow a function to return a NULL value.

The contract with the UDF is that for fatal errors that should terminate the entire processing, the UDF should throw an exception. For non-fatal errors, it should return null, which will be recorded as a null value in the output.

Let's take a look at the TOLOWERCASE function, which takes a string and generates a lowercase version of it.

In the 0.1.0 version of Pig, it looks as follows:

public class TOLOWERCASE extends EvalFunc<DataAtom>
{
      public void exec(Tuple input, DataAtom output) throws IOException {
                String str = input.getAtomField(0).strval();
                output.setValue(str.toLowerCase());
      }
}

In Pig 0.2.0, the same function looks as follows:

1. public class TOLOWERCASE extends EvalFunc<String>
2. {
3.   public String exec(Tuple input) throws IOException {
4.        if (input == null || input.size() == 0)
5.            return null;
6.        String str;
7.        try {
8.            str = (String)input.get(0);
9.        } catch (Exception e){
10.          System.err.println("Can't convert field to a string; error = " + e.getMessage());
11.          return null;
12.      }
13.      return str.toLowerCase();
14.   }
15.}

There are a couple of things to note here:

  1. The initial check for a null or empty Tuple (lines 4-5) is needed for all UDFs that don't provide type-specific implementations, as discussed later.

  2. Data-related exceptions are caught and converted to a warning and a null return value (lines 9-12). Note that, for now, the warning should go to stderr; in later versions we will propagate a logger to the UDFs. As with #1, converting exceptions to nulls is not needed for type-specific implementations, as discussed later.

Input Data Changes

In the 0.1.0 version of Pig, the data was always passed to UDFs in the form of strings. Now the default type of the data is bytearray. The main reason for this is that treating non-ASCII data as a string can corrupt the data; the other reason is the cost of converting the data to a string.

As a result, if a UDF expects data of a particular type, the script needs to make sure the data is passed to it as that type. This can be done either by declaring the correct data type in the AS clause of the LOAD statement or by explicitly casting the data going to the UDF to the required type.

Bad:

A = LOAD 'data';
B = foreach A generate TOLOWERCASE($1);

Good:

A = LOAD 'data' as (name: chararray);
B = foreach A generate TOLOWERCASE(name);

outputSchema changes

If a UDF does not override the outputSchema function, the output schema is assumed to be a single value of type chararray. This is compatible with the 0.1.0 version of Pig. If a UDF returns a value other than a string, it will work correctly without specifying the schema, but it will force extra data conversion and thus be less efficient.

Here is an example of SIZE UDF:

public class SIZE extends EvalFunc<Long> {
    .....
    @Override
    public Schema outputSchema(Schema input) {
        return new Schema(new Schema.FieldSchema(null, DataType.LONG));
    }
    ....
}

The UDF declares that its output is of type long.

Type Specific Functions

In the 0.1.0 version of Pig, since type information was not available, each UDF had to choose the type in which to perform computations. Most generic functions, like the Pig builtins, chose to use Double for arithmetic computations. With Pig 0.2.0, type-specific functions can be written to provide more efficient computation. Below is an example of the transformation of the SUM function:

Old code with some details omitted:

1.  public class SUM extends EvalFunc<DataAtom> implements Algebraic {
2.      public void exec(Tuple input, DataAtom output) throws IOException {
3.          output.setValue(sum(input));
4.      }
    ....
5.  static protected double sum(Tuple input) throws IOException {
        ....
6.  }    
7. }

New code with some details omitted:

1.  public class SUM extends EvalFunc<Double> implements Algebraic {
2.      public Double exec(Tuple input) throws IOException {
3.          try {
4.              return sum(input);
5.       } catch (ExecException ee) {
6.              IOException oughtToBeEE = new IOException();
7.              oughtToBeEE.initCause(ee);
8.              throw oughtToBeEE;
10.     }
11. }
    ....
12.  static protected Double sum(Tuple input) throws ExecException {
13.      DataBag values = (DataBag)input.get(0);

14.      // if we were handed an empty bag, return NULL (compatible with SQL)
15.      if(values.size() == 0) {
16.            return null;
17.      }
18.      double sum = 0;
19.      boolean sawNonNull = false;
20.      for (Iterator<Tuple> it = values.iterator(); it.hasNext();) {
21.          Tuple t = it.next();
22.          try {
23.                Double d = DataType.toDouble(t.get(0));
24.                if (d == null) continue;
25.                sawNonNull = true;
26.                sum += d;
27.          }catch(NumberFormatException nfe){
28.               // do nothing - essentially treat this particular input as null
29.          }catch(RuntimeException exp) {
30.               ExecException newE =  new ExecException("Error processing: " +
31.               t.toString() + exp.getMessage(), exp);
32.               throw newE;
33.          }
34.      }
35.      if(sawNonNull) {
36.          return new Double(sum);
37.      } else {
38.          return null;
39       }
40.    }

41.   public Schema outputSchema(Schema input) {
42.        return new Schema(new Schema.FieldSchema(null, DataType.DOUBLE));
43.   }

44.   public List<FuncSpec> getArgToFuncMapping() throws FrontendException {
45.        List<FuncSpec> funcList = new ArrayList<FuncSpec>();
46.        funcList.add(new FuncSpec(this.getClass().getName(), Schema.generateNestedSchema(DataType.BAG, DataType.BYTEARRAY)));
47.        funcList.add(new FuncSpec(DoubleSum.class.getName(), Schema.generateNestedSchema(DataType.BAG, DataType.DOUBLE)));
48.        funcList.add(new FuncSpec(FloatSum.class.getName(), Schema.generateNestedSchema(DataType.BAG, DataType.FLOAT)));
49.        funcList.add(new FuncSpec(IntSum.class.getName(), Schema.generateNestedSchema(DataType.BAG, DataType.INTEGER)));
50.        funcList.add(new FuncSpec(LongSum.class.getName(), Schema.generateNestedSchema(DataType.BAG, DataType.LONG)));
51.        return funcList;
52.    }
53.}

The main thing to notice here is the getArgToFuncMapping method (lines 44-52), which allows different classes to be specified to handle different input types. (The need for a nested schema is due to the fact that the input in this case is a bag of tuples, not just a tuple.) The main class handles the default case of bytearray by converting the data to double (line 23). This ensures backward compatibility. Note that it treats invalid data as null. Also, it sets the output schema to double (lines 41-43).

Below is an example of one of the type-specific classes, the one that handles integer arithmetic:

1. public class IntSum extends EvalFunc<Long> implements Algebraic {
2.     public Long exec(Tuple input) throws IOException {
3.       try {
4.          return sum(input);
5.       } catch (ExecException ee) {
6.          IOException oughtToBeEE = new IOException();
7.          oughtToBeEE.initCause(ee);
8.          throw oughtToBeEE;
9.       }
10.   }
    ....
11.   static protected  Long sum(Tuple input) throws ExecException {
12.        DataBag values = (DataBag)input.get(0);
13.        if(values.size() == 0) {
14.            return null;
15.        }
16.        long sum = 0;
17.        boolean sawNonNull = false;
18.        for (Iterator<Tuple> it = values.iterator(); it.hasNext();) {
19.            Tuple t = (Tuple) it.next();
20.            try {
21.                Integer i = (Integer)(t.get(0));
22.                if (i == null) continue;
23.                sawNonNull = true;
24.                sum += i;
25.            }catch(RuntimeException exp) {
26.                ExecException newE =  new ExecException("Error processing: " +
27.                t.toString() + exp.getMessage(), exp);
28.                throw newE;
29.           }
30.        }
31.        if(sawNonNull) {
32.            return new Long(sum);
33.        } else {
34.            return null;
35.        }
36.    }
37.    public Schema outputSchema(Schema input) {
38.        return new Schema(new Schema.FieldSchema(null, DataType.LONG));
39.    }    
40. }

There are a couple of things to notice here:

  1. Unlike the default class, this class does not convert the data to the specified type; instead, it expects the data to already be of that type (line 21). It also does not check for null/empty input. This is because Pig takes care of this by casting the inputs to the appropriate type and handling null values where appropriate.
  2. The output schema is set to indicate that the output type is long (lines 37-39).

Report Progress

Part of the Hadoop infrastructure determines whether a job is hanging and kills it if so. To avoid being killed, a program needs to periodically report its progress. Pig's infrastructure takes care of this, so most UDFs do not need to report progress. However, a function that takes a very long time (more than 5 minutes) to process a single Tuple needs to report periodic progress.

It can do so by periodically calling reporter.progress() during its processing.

public class MyUDF extends EvalFunc<String>
{
     public String exec(Tuple input) throws IOException {
        if (input == null || input.size() == 0)
            return null;
        // do some processing
        reporter.progress();
        // some more processing
   }
}

Load/Store function changes

There are a couple of changes in Pig 0.2.0 that affect load/store functions.

First, since Tuple is now an interface, the loader can't rely on read/write functions of the Tuple class to load/store the data, as that would be implementation specific. Instead, each load/store function needs to provide its own implementation.

Second, load and store functions need to be type-aware. The load function is expected to do two things: (1) produce a tuple with fields of bytearray type and (2) provide conversion routines from bytearray to all other types (unless it overrides determineSchema; for more on this see below). There are a couple of reasons to separate the two. First, since the user is not required to provide a schema, Pig will treat the data as bytearray in the absence of one. Second, lazy conversion is likely to be more efficient, since Pig might not need to convert all of the data, for instance if some of it is thrown away by a filter or projected out by a generate. For store functions, being type-aware means performing conversions from the actual type of the data to the format in which it should be stored.

Below is an example of how PigStorage looks now (with some details omitted):

1. public class PigStorage extends Utf8StorageConverter implements ReversibleLoadStoreFunc {
        ... 
2. public Tuple getNext() throws IOException {
3.     if (in == null || in.getPosition() > end) {
4.          return null;
5.     }
6.     if (mBuf == null) mBuf = new ByteArrayOutputStream(4096);
7.     mBuf.reset();
8.     while (true) {
9.        // Hadoop's FSDataInputStream (which my input stream is based
10.      // on at some point) is buffered, so I don't need to buffer.
11.      int b = in.read();
12.      if (b == fieldDel) {
13.          readField();
14.      } else if (b == recordDel) {
15.          readField();
16.          Tuple t =  mTupleFactory.newTuple(mProtoTuple);
17.          mProtoTuple.clear();
18.          return t;
19.      } else if (b == -1) {
20.           // hit end of file
21.           return null;
22.      } else {
23.            mBuf.write(b);
25.      }
26.   }
27. }
28. public void putNext(Tuple f) throws IOException {
29.     // I have to convert integer fields to string, and then to bytes.
30.     // If I use a DataOutputStream to convert directly from integer to
31.     // bytes, I don't get a string representation.
32.     int sz = f.size();
33.     for (int i = 0; i < sz; i++) {
34.         Object field;
35.         try {
36.             field = f.get(i);
37.         } catch (ExecException ee) {
38.             throw new IOException(ee);
39.         }
40.         switch (DataType.findType(field)) {
41.             case DataType.NULL:
42.                 break; // just leave it empty
43.             case DataType.BOOLEAN:
44.                 mOut.write(((Boolean)field).toString().getBytes());
45.                 break;
             ....
46.  }
47.  public Schema determineSchema(URL fileName) throws IOException {
48.     return null;
49.  }
50.  public void fieldsToRead(Schema schema) {
        // do nothing
51.  }
52. }

There are several things to note here:

  1. Use of TupleFactory (line 16), discussed earlier.

  2. Type-specific handling when writing tuples (lines 40-45).

  3. Presence of the determineSchema function. PigStorage returns null here (lines 47-49) since it cannot determine the schema of its data; see the discussion of determineSchema below.

  4. Presence of the fieldsToRead function. This is for future use and should be left empty in Pig 0.2.0.

  5. Extension of the Utf8StorageConverter class (line 1). This is the class that performs conversion of UTF-8 strings into Pig types and can be used by other loaders that work with data stored in the same format. In the future we will also provide a similar class for data stored in a binary format.

abstract public class Utf8StorageConverter {
....
    public DataBag bytesToBag(byte[] b) throws IOException {
        Object o;
        try {
            o = parseFromBytes(b);
        } catch (ParseException pe) {
            throw new IOException(pe.getMessage());
        }
        return (DataBag)o;
    }

    public String bytesToCharArray(byte[] b) throws IOException {
        return new String(b);
    }

    ....
}

Use of determineSchema

Some load functions may know the types of the data they create. For example, a loader loading JSON data would know the type of each datum as it creates it. If this is the case, the user need not declare the types of the data in order to use types. Instead, the load function can implement the determineSchema method. The parser calls this method when parsing the Pig Latin script, and hence can get the schema of the data from the load function.

One important difference for load functions implementing determineSchema is that Pig does not do lazy type conversion in this situation. Instead, it expects the load function to do the type conversion at load time.
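
For example, a loader that knows its data is (name: chararray, age: int) could report that to the parser roughly as follows. This is a hypothetical sketch; it reuses the Schema, Schema.FieldSchema, and DataType classes shown elsewhere in this document, and assumes Schema provides an add method for appending fields:

public Schema determineSchema(URL fileName) throws IOException {
    // build a two-field schema: (name: chararray, age: int)
    Schema s = new Schema();
    s.add(new Schema.FieldSchema("name", DataType.CHARARRAY));
    s.add(new Schema.FieldSchema("age", DataType.INTEGER));
    return s;
}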

Operational Changes

In the 0.1.0 version of Pig, jobs that did not require a reduce phase produced their output in files named map-XXXXX. With 0.2.0, the data will be stored in files named part-XXXXX. Map-only queries are those that don't contain GROUP/COGROUP/JOIN, DISTINCT, or ORDER BY.