Converting UDFs from 0.1.x Pig to 0.2.x Pig

EvalFunc Changes

exec Signature Changes

In the earlier versions of Pig, the exec function had the following signature:

public void exec(Tuple input, T output) throws IOException;

In 0.2, the signature looks as follows:

public T exec(Tuple input) throws IOException;

The change is to return the output value rather than passing the output in a parameter. This is done to allow a function to return a NULL value.

The contract with the UDF is as follows: for fatal errors that should terminate the entire processing, the UDF should throw an exception. For non-fatal errors, it should return null, which is recorded as a null value in the output.

Let's take a look at the TOLOWERCASE function, which takes a string and generates a lower-case version of it.

In Pig 0.1.x, it looks as follows:

public class TOLOWERCASE extends EvalFunc<DataAtom>
{
      public void exec(Tuple input, DataAtom output) throws IOException {
                String str = input.getAtomField(0).strval();
                output.setValue(str.toLowerCase());
      }
}

In Pig 0.2, the same function looks as follows:

1. public class TOLOWERCASE extends EvalFunc<String>
2. {
3.   public String exec(Tuple input) throws IOException {
4.        if (input == null || input.size() == 0)
5.            return null;
6.        String str;
7.        try {
8.            str = (String)input.get(0);
9.        } catch (Exception e){
10.          System.err.println("Can't convert field to a string; error = " + e.getMessage());
11.          return null;
12.      }
13.      return str.toLowerCase();
14.   }
15. }

There are a couple of things to note here:

  1. The initial check for a null or empty Tuple (lines 4-5) is needed for all UDFs that don't provide type-specific implementations, as discussed later.

  2. Data-related exceptions are caught and converted to a warning and a null return value (lines 9-12). Note that, for now, the warning should go to stderr. In later versions, we will propagate a logger to the UDFs. As with #1, converting exceptions to nulls is not needed for UDFs that provide type-specific implementations, as discussed later.

Input Data Changes

In earlier versions of Pig, data was always passed to UDFs in the form of strings. Now the default type of the data is bytearray. The main reason for this is that treating non-ASCII data as a string can corrupt the data. The other is the cost of converting data to a string.

As a result, if a UDF expects data of a particular type, the script needs to make sure to pass the data to it in that type. This can be done either by declaring the correct data type in the AS clause of the LOAD statement or by explicitly casting the data going to the UDF to the required type.


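In the following script no schema is declared, so $1 reaches the UDF as a bytearray:

A = LOAD 'data';
B = foreach A generate TOLOWERCASE($1);

Here the AS clause declares the field as chararray, so the UDF receives a string:

A = LOAD 'data' as (name: chararray);
B = foreach A generate TOLOWERCASE(name);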
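
The concern about non-ASCII data can be seen with a small, self-contained Java sketch. It does not use any Pig classes; the names ByteArrayRoundTrip and roundTrip are made up for illustration. It decodes raw bytes as a UTF-8 string and encodes them back, which is roughly what a string-only pipeline would do to each field:

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

// Illustration only: not part of Pig. Shows what happens to raw bytes
// when they are forced through a String and back.
class ByteArrayRoundTrip {
    // Decode the bytes as UTF-8 text, then encode the text as UTF-8 again.
    static byte[] roundTrip(byte[] raw) {
        String asString = new String(raw, StandardCharsets.UTF_8);
        return asString.getBytes(StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        byte[] ascii = "pig".getBytes(StandardCharsets.US_ASCII);
        // 0xFF and 0xFE are never valid in UTF-8, so decoding replaces them.
        byte[] binary = { (byte) 0xFF, (byte) 0xFE, 0x41 };

        System.out.println(Arrays.equals(ascii, roundTrip(ascii)));   // prints "true"
        System.out.println(Arrays.equals(binary, roundTrip(binary))); // prints "false"
    }
}
```

Plain ASCII survives the round trip, while the binary field comes back changed. Keeping fields as bytearray until a type is actually requested avoids this kind of silent loss.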

outputSchema changes

If a UDF does not override the outputSchema function, the output schema is assumed to be a single value of chararray type. This is compatible with the earlier versions of Pig. *If a UDF returns a value other than a string, it will work correctly without specifying the schema, but it will force an extra data conversion and thus be inefficient.*

Here is an example of the SIZE UDF:

public class SIZE extends EvalFunc<Long> {
    // exec method omitted
    public Schema outputSchema(Schema input) {
        return new Schema(new Schema.FieldSchema(null, DataType.LONG));
    }
}

The UDF declares that its output is of type long.

Type Specific Functions

In the earlier versions of Pig, since type information was not available, each UDF had to choose the type in which to perform computations. Most generic functions, like the Pig builtins, chose to use Double for arithmetic computations. With Pig 0.2, type-specific functions can be written to provide more efficient computation. Below is an example of how the SUM function was transformed:

Old code with some details omitted:

public class SUM extends EvalFunc<DataAtom> implements Algebraic {
    public void exec(Tuple input, DataAtom output) throws IOException {
        output.setValue(sum(input));
    }
    static protected double sum(Tuple input) throws IOException {
        // details omitted
    }
}

New code with some details omitted:

1.  public class SUM extends EvalFunc<Double> implements Algebraic {
2.      public Double exec(Tuple input) throws IOException {
3.          try {
4.              return sum(input);
5.       } catch (ExecException ee) {
6.              IOException oughtToBeEE = new IOException();
7.              oughtToBeEE.initCause(ee);
8.              throw oughtToBeEE;
10.     }
11. }
12.  static protected Double sum(Tuple input) throws ExecException {
13.      DataBag values = (DataBag)input.get(0);

14.      // if we were handed an empty bag, return NULL (compatible with SQL)
15.      if(values.size() == 0) {
16.            return null;
17.      }
18.      double sum = 0;
19.      boolean sawNonNull = false;
20.      for (Iterator<Tuple> it = values.iterator(); it.hasNext();) {
21.          Tuple t = it.next();
22.          try {
23.                Double d = DataType.toDouble(t.get(0));
24.                if (d == null) continue;
25.                sawNonNull = true;
26.                sum += d;
27.          }catch(NumberFormatException nfe){
28.               // do nothing - essentially treat this particular input as null
29.          }catch(RuntimeException exp) {
30.               ExecException newE =  new ExecException("Error processing: " +
31.               t.toString() + exp.getMessage(), exp);
32.               throw newE;
33.          }
34.      }
35.      if(sawNonNull) {
36.          return new Double(sum);
37.      } else {
38.          return null;
39.      }
40.    }

41.   public Schema outputSchema(Schema input) {
42.        return new Schema(new Schema.FieldSchema(null, DataType.DOUBLE));
43.   }

44.   public List<FuncSpec> getArgToFuncMapping() throws FrontendException {
45.        List<FuncSpec> funcList = new ArrayList<FuncSpec>();
46.        funcList.add(new FuncSpec(this.getClass().getName(), Schema.generateNestedSchema(DataType.BAG, DataType.BYTEARRAY)));
47.        funcList.add(new FuncSpec(DoubleSum.class.getName(), Schema.generateNestedSchema(DataType.BAG, DataType.DOUBLE)));
48.        funcList.add(new FuncSpec(FloatSum.class.getName(), Schema.generateNestedSchema(DataType.BAG, DataType.FLOAT)));
49.        funcList.add(new FuncSpec(IntSum.class.getName(), Schema.generateNestedSchema(DataType.BAG, DataType.INTEGER)));
50.        funcList.add(new FuncSpec(LongSum.class.getName(), Schema.generateNestedSchema(DataType.BAG, DataType.LONG)));
51.        return funcList;
52.    }
53. }

The main thing to notice here is the getArgToFuncMapping method (lines 44-52), which lets you specify different classes to handle different input types. (The nested schema is needed because the input in this case is a bag of tuples, not just a tuple.) The main class handles the default case of bytearray by converting the data to double (line 23). This ensures backward compatibility. Note that it treats invalid data as null. Also, it sets the output schema to double (lines 41-43).

Below is an example of one of the type-specific classes, which handles Integer arithmetic:

1. public class IntSum extends EvalFunc<Long> implements Algebraic {
2.     public Long exec(Tuple input) throws IOException {
3.       try {
4.          return sum(input);
5.       } catch (ExecException ee) {
6.          IOException oughtToBeEE = new IOException();
7.          oughtToBeEE.initCause(ee);
8.          throw oughtToBeEE;
9.       }
10.   }
11.   static protected  Long sum(Tuple input) throws ExecException {
12.        DataBag values = (DataBag)input.get(0);
13.        if(values.size() == 0) {
14.            return null;
15.        }
16.        long sum = 0;
17.        boolean sawNonNull = false;
18.        for (Iterator<Tuple> it = values.iterator(); it.hasNext();) {
19.            Tuple t = (Tuple)it.next();
20.            try {
21.                Integer i = (Integer)(t.get(0));
22.                if (i == null) continue;
23.                sawNonNull = true;
24.                sum += i;
25.            }catch(RuntimeException exp) {
26.                ExecException newE =  new ExecException("Error processing: " +
27.                t.toString() + exp.getMessage(), exp);
28.                throw newE;
29.           }
30.        }
31.        if(sawNonNull) {
32.            return new Long(sum);
33.        } else {
34.            return null;
35.        }
36.    }
37.    public Schema outputSchema(Schema input) {
38.        return new Schema(new Schema.FieldSchema(null, DataType.LONG));
39.    }    
40. }

There are a couple of things to notice here:

  1. Unlike the default class, this class does not convert the data to the specified type; instead, it expects the data to already be in that type (line 21). It also does not check for null or empty input. This is because Pig takes care of this by casting the inputs to the appropriate type and handling null values where appropriate.
  2. The output schema is set to indicate that the output type is long (lines 37-39).
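
The difference between the default path and a type-specific path can be tried out without a Pig installation. The sketch below is a stand-in, not Pig code: it uses plain Java lists in place of DataBag, and the names SumSketch, sumAsDouble, and sumInts are hypothetical. It mirrors the null handling described above, assuming that an empty input and an input with no usable values both yield null:

```java
import java.util.List;

// Illustration only: plain lists stand in for Pig's DataBag.
class SumSketch {
    // Default path: values of unknown type are converted to double.
    // Values that cannot be parsed are treated as null and skipped.
    static Double sumAsDouble(List<?> values) {
        if (values.isEmpty()) {
            return null; // empty input yields null, as in SQL
        }
        double sum = 0;
        boolean sawNonNull = false;
        for (Object v : values) {
            if (v == null) continue;
            try {
                sum += Double.parseDouble(v.toString());
                sawNonNull = true;
            } catch (NumberFormatException nfe) {
                // treat this particular input as null
            }
        }
        return sawNonNull ? Double.valueOf(sum) : null;
    }

    // Type-specific path: values are already Integers, so no conversion
    // or parse-error handling is needed; the result is widened to long.
    static Long sumInts(List<Integer> values) {
        if (values.isEmpty()) {
            return null;
        }
        long sum = 0;
        boolean sawNonNull = false;
        for (Integer i : values) {
            if (i == null) continue;
            sawNonNull = true;
            sum += i;
        }
        return sawNonNull ? Long.valueOf(sum) : null;
    }
}
```

For instance, sumAsDouble applied to the values "1.5", "oops", null, "2" skips the two unusable entries and returns 3.5, while sumInts applied to 1, null, 2 returns 3 without any conversion step.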

Report Progress

Part of the Hadoop infrastructure's job is to determine if a job is hanging and to kill it. To avoid being killed, the program needs to periodically report its progress. Pig's infrastructure takes care of this, and most UDFs do not need to report progress. However, a function that takes a very long time (more than 5 minutes) to process a single Tuple needs to report progress periodically.

It can do so by periodically calling reporter.progress() during its processing:

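public class MyUDF extends EvalFunc<String>
{
     public String exec(Tuple input) throws IOException {
        if (input == null || input.size() == 0)
            return null;
        // do some processing
        reporter.progress();
        // some more processing
        reporter.progress();
        // ... build and return the result (details omitted)
     }
}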
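
When the long-running work is a loop, one possible pattern is to report once every fixed number of iterations rather than after every step. The sketch below is only an illustration under that assumption: it uses a plain Runnable as a stand-in for the reporter, and the names ProgressSketch, process, and interval are hypothetical.

```java
import java.util.List;

// Illustration only: a Runnable stands in for Pig's reporter.
class ProgressSketch {
    // Lower-cases and concatenates the items, invoking the progress
    // callback once after every `interval` items processed.
    static String process(List<String> items, int interval, Runnable progress) {
        StringBuilder out = new StringBuilder();
        int seen = 0;
        for (String item : items) {
            out.append(item.toLowerCase());
            seen++;
            if (seen % interval == 0) {
                progress.run(); // in a real UDF this would be reporter.progress()
            }
        }
        return out.toString();
    }
}
```

The interval is a tuning choice: it only needs to be small enough that the callback fires well within the timeout.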

Load/Store function changes

There are a couple of changes in Pig 0.2 that affect load/store functions.

First, since Tuple is now an interface, the loader can't rely on read/write functions of the Tuple class to load/store the data as it would be implementation specific. Instead, each load/store function needs to provide its own implementation.

Second, load and store functions need to be type-aware. The load function is expected to do two things: (1) produce a tuple with fields of bytearray type and (2) provide conversion routines from bytearray to all other types. There are a couple of reasons to separate the two. First, since the user is not required to provide a schema, Pig treats the data as bytearray in the absence of one. Second, lazy conversion is likely to be more efficient since we might not need to convert all the data, for instance, if some of it is thrown away by a filter or projected out by generate. For a store function, it means performing conversions from the real type of the data to the format in which it should be stored.

Below is an example of how PigStorage looks now (with some details omitted):
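
The two responsibilities of a load function can be sketched in isolation. The code below is not Pig's implementation: the names LazyFieldSketch, splitFields, and bytesToInteger are hypothetical, and returning null for a field that cannot be parsed is an assumption made for this illustration. The first method only splits a record into raw byte fields; the second converts a single field when, and only when, a typed value is requested.

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Illustration only: separates "produce raw fields" from "convert on demand".
class LazyFieldSketch {
    // Step 1: split a record into raw byte[] fields without interpreting them.
    static List<byte[]> splitFields(byte[] record, byte fieldDel) {
        List<byte[]> fields = new ArrayList<>();
        ByteArrayOutputStream buf = new ByteArrayOutputStream();
        for (byte b : record) {
            if (b == fieldDel) {
                fields.add(buf.toByteArray());
                buf.reset();
            } else {
                buf.write(b);
            }
        }
        fields.add(buf.toByteArray()); // last field has no trailing delimiter
        return fields;
    }

    // Step 2: convert one field to an Integer only when it is needed.
    // Assumption for this sketch: empty or unparsable input yields null.
    static Integer bytesToInteger(byte[] b) {
        if (b == null || b.length == 0) {
            return null;
        }
        try {
            return Integer.valueOf(new String(b, StandardCharsets.UTF_8));
        } catch (NumberFormatException nfe) {
            return null;
        }
    }
}
```

If a later filter discards the record, or generate projects the field out, bytesToInteger is never called for it, which is where the saving from lazy conversion comes from.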

1. public class PigStorage extends Utf8StorageConverter implements ReversibleLoadStoreFunc {
2. public Tuple getNext() throws IOException {
3.     if (in == null || in.getPosition() > end) {
4.          return null;
5.     }
6.     if (mBuf == null) mBuf = new ByteArrayOutputStream(4096);
7.     mBuf.reset();
8.     while (true) {
9.        // Hadoop's FSDataInputStream (which my input stream is based
10.      // on at some point) is buffered, so I don't need to buffer.
11.      int b = in.read();
12.      if (b == fieldDel) {
13.          readField();
14.      } else if (b == recordDel) {
15.          readField();
16.          Tuple t =  mTupleFactory.newTuple(mProtoTuple);
17.          mProtoTuple.clear();
18.          return t;
19.      } else if (b == -1) {
20.           // hit end of file
21.           return null;
22.      } else {
23.            mBuf.write(b);
25.      }
26.   }
27. }
28. public void putNext(Tuple f) throws IOException {
29.     // I have to convert integer fields to string, and then to bytes.
30.     // If I use a DataOutputStream to convert directly from integer to
31.     // bytes, I don't get a string representation.
32.     int sz = f.size();
33.     for (int i = 0; i < sz; i++) {
34.         Object field;
35.         try {
36.             field = f.get(i);
37.         } catch (ExecException ee) {
38.             throw new IOException(ee);
39.         }
40.         switch (DataType.findType(field)) {
41.             case DataType.NULL:
42.                 break; // just leave it empty
43.             case DataType.BOOLEAN:
44.                 mOut.write(((Boolean)field).toString().getBytes());
45.                 break;
46.  }
47.  public Schema determineSchema(URL fileName) throws IOException {
48.     return null;
49.  }
50.  public void fieldsToRead(Schema schema) {
        // do nothing
51.  }
52. }

There are several things to note here:

  1. Use of TupleFactory (line 16), discussed earlier.

  2. Type-specific handling of writing tuples (lines 40-45).

  3. Presence of the determineSchema function. This is for future use and should be set to return null in Pig 0.2.

  4. Presence of the fieldsToRead function. This is again for future use and should be left empty in Pig 0.2.

  5. Extension of the Utf8StorageConverter class (line 1). This is the class that performs conversion of UTF8 strings into Pig types, and it can be used by other loaders that work with data stored in the same format. In the future we will also provide a similar class for data stored in a binary format.

abstract public class Utf8StorageConverter {
    public DataBag bytesToBag(byte[] b) throws IOException {
        Object o;
        try {
            o = parseFromBytes(b);
        } catch (ParseException pe) {
            throw new IOException(pe.getMessage());
        }
        return (DataBag)o;
    }

    public String bytesToCharArray(byte[] b) throws IOException {
        return new String(b);
    }
    // remaining conversion methods omitted
}


ConvertingUDFs (last edited 2009-09-20 23:38:23 by localhost)