Overview

This page describes how to migrate from the old LoadFunc and StoreFunc interfaces (Pig 0.1.0 through Pig 0.6.0) to the new interfaces proposed in http://wiki.apache.org/pig/LoadStoreRedesignProposal and planned to be released in Pig 0.7.0. Besides the examples on this page, users can also look at the LoadFunc and StoreFunc implementations in the piggybank codebase (contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage) for examples of migration. For example, MultiStorage implements a custom OutputFormat.

A general note applicable to both LoadFunc and StoreFunc implementations: they should use the new Hadoop 0.20 API based classes (InputFormat/OutputFormat and related classes) under the org.apache.hadoop.mapreduce package instead of the old org.apache.hadoop.mapred package.

The main motivation for these changes is to move closer to using Hadoop's InputFormat and OutputFormat classes. This way Pig users/developers can create new LoadFunc and StoreFunc implementations based on existing Hadoop InputFormat and OutputFormat classes with minimal code. The complexity of reading the data and creating a record will now lie in the InputFormat; likewise, on the writing end, the complexity of writing will lie in the OutputFormat. This enables Pig to easily read/write data in new storage formats as and when a Hadoop InputFormat and OutputFormat become available for them.

LoadFunc Migration

The methods in the old LoadFunc have been split between a LoadFunc abstract class, which has the main methods for loading data, and three new interfaces (LoadMetadata, LoadPushDown and LoadCaster), described below.

The main change is that the new LoadFunc API is based on an InputFormat to read the data. Implementations can choose to use an existing InputFormat, like TextInputFormat, or implement a new one.

The mapping below lists old API calls against their new equivalents, in rough order of call sequence. Each entry gives the old method in LoadFunc, the equivalent new method, the class or interface in which the new method is present, and an explanation.

Old method: No equivalent method
New method: setUDFContextSignature()
Class/Interface: LoadFunc
Explanation: This method will be called by Pig, both on the front end and on the back end, to pass a unique signature to the loader. The signature can be used to store into the UDFContext any information which the loader needs to keep between various method invocations in the front end and back end. One use case is to store the RequiredFieldList passed to LoadPushDown.pushProjection(RequiredFieldList) for use in the back end before returning tuples in getNext() (see the sketch following this mapping). The default implementation in LoadFunc has an empty body. This method will be called before other methods.

Old method: No equivalent method
New method: relativeToAbsolutePath()
Class/Interface: LoadFunc
Explanation: Pig runtime will call this method to allow the loader to convert a relative load location to an absolute location. The default implementation provided in LoadFunc handles this for FileSystem locations. If the load source is something else, the loader implementation may choose to override this.

Old method: determineSchema()
New method: getSchema()
Class/Interface: LoadMetadata
Explanation: determineSchema() was used by the old code to ask the loader to provide a schema for the data it returns; the same semantics are now achieved through getSchema() of the LoadMetadata interface. LoadMetadata is an optional interface for loaders to implement; if a loader does not implement it, this indicates to the Pig runtime that the loader cannot return a schema for the data (a sketch of getSchema() appears after the new loader implementation in the Examples section).

Old method: fieldsToRead()
New method: pushProjection()
Class/Interface: LoadPushDown
Explanation: fieldsToRead() was used by the old code to convey to the loader the exact fields required by the Pig script; the same semantics are now achieved through pushProjection() of the LoadPushDown interface. LoadPushDown is an optional interface for loaders to implement; if a loader does not implement it, this indicates to the Pig runtime that the loader is not capable of returning just the required fields and will return all fields in the data. If a loader implementation is able to efficiently return only the required fields, it should implement LoadPushDown to improve query performance (see the sketch following this mapping).

Old method: No equivalent method
New method: getInputFormat()
Class/Interface: LoadFunc
Explanation: This method will be called by Pig to get the InputFormat used by the loader. The methods in the InputFormat (and the underlying RecordReader) will be called by Pig in the same manner (and in the same context) as by Hadoop in a map-reduce Java program. If the InputFormat is a Hadoop-packaged one, the implementation should use the new API based version under org.apache.hadoop.mapreduce. If it is a custom InputFormat, it should be implemented using the new API in org.apache.hadoop.mapreduce. If a custom loader using a text-based or file-based InputFormat needs to read files in all subdirectories under a given input directory recursively, it should use the PigFileInputFormat and PigTextInputFormat classes provided in org.apache.pig.backend.hadoop.executionengine.mapReduceLayer. This works around a current limitation in Hadoop's TextInputFormat and FileInputFormat, which only read one level down from the provided input directory. For example, if the input in the load statement is 'dir1' and there are subdirectories 'dir2' and 'dir2/dir3' underneath it, Hadoop's TextInputFormat or FileInputFormat will read only files directly under 'dir1'; with PigFileInputFormat or PigTextInputFormat (or by extending them), files in all the directories can be read.

Old method: No equivalent method
New method: setLocation()
Class/Interface: LoadFunc
Explanation: This method is called by Pig to communicate the load location to the loader. The loader should use this method to communicate the same information to the underlying InputFormat. This method is called multiple times by Pig; implementations should bear this in mind and ensure there are no inconsistent side effects due to the multiple calls.

Old method: bindTo()
New method: prepareToRead()
Class/Interface: LoadFunc
Explanation: bindTo() was the old method which would provide an InputStream, among other things, to the LoadFunc. The LoadFunc implementation would then read from the InputStream in getNext(). In the new API, reading of the data is through the InputFormat provided by the LoadFunc. So the equivalent call is prepareToRead(), wherein the RecordReader associated with the InputFormat provided by the LoadFunc is passed to the LoadFunc. The RecordReader can then be used by the implementation in getNext() to return a tuple representing a record of data back to Pig.

Old method: getNext()
New method: getNext()
Class/Interface: LoadFunc
Explanation: The meaning of getNext() has not changed; it is called by the Pig runtime to get the next tuple in the data. In the new API, this is the method in which the implementation uses the underlying RecordReader to construct a tuple.

Old method: bytesToInteger(), ..., bytesToBag()
New method: bytesToInteger(), ..., bytesToBag()
Class/Interface: LoadCaster
Explanation: The signatures of the bytesToTuple() and bytesToBag() methods have changed to take a field schema of the tuple/bag, and they should construct the tuple/bag in conformance with the given field schema. The meaning of these methods has not changed; they are called by the Pig runtime to cast DataByteArray fields to the right type when needed. In the new API, a LoadFunc implementation should give a LoadCaster object back to Pig as the return value of the getLoadCaster() method so that it can be used for casting. The default implementation in LoadFunc returns an instance of Utf8StorageConverter, which can handle casting from UTF-8 bytes to different types. If null is returned, casting from DataByteArray to any other type (implicitly or explicitly) in the Pig script will not be possible.
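
To illustrate how setUDFContextSignature() and pushProjection() typically work together, here is a minimal sketch (not taken from the Pig codebase) of a loader that records the pushed-down projection in the UDFContext on the front end and reads it back on the back end. It extends the SimpleTextLoader shown in the Examples section; the property key "pig.simpletextloader.requiredFields" and the comma-separated encoding of column indexes are illustrative choices, and imports are omitted as in the other examples on this page.

public class ProjectedTextLoader extends SimpleTextLoader implements LoadPushDown {
    private String signature;

    @Override
    public void setUDFContextSignature(String signature) {
        // Called by Pig on both the front end and the back end before other methods;
        // remembering the signature lets both sides look up the same UDFContext slot.
        this.signature = signature;
    }

    @Override
    public List<OperatorSet> getFeatures() {
        return Collections.singletonList(OperatorSet.PROJECTION);
    }

    @Override
    public RequiredFieldResponse pushProjection(RequiredFieldList requiredFieldList)
            throws FrontendException {
        // Front end: store the requested column indexes in the UDFContext so the
        // information is shipped to the back end along with the job.
        StringBuilder cols = new StringBuilder();
        for (RequiredField f : requiredFieldList.getFields()) {
            if (cols.length() > 0) {
                cols.append(',');
            }
            cols.append(f.getIndex());
        }
        getUDFProperties().setProperty("pig.simpletextloader.requiredFields", cols.toString());
        return new RequiredFieldResponse(true);
    }

    @Override
    public void prepareToRead(RecordReader reader, PigSplit split) {
        super.prepareToRead(reader, split);
        // Back end: recover the projection stored on the front end; getNext() could
        // then emit only these columns instead of the whole tuple.
        String cols = getUDFProperties().getProperty("pig.simpletextloader.requiredFields");
        // ... parse 'cols' and use it when constructing tuples in getNext() ...
    }

    private Properties getUDFProperties() {
        return UDFContext.getUDFContext()
                .getUDFProperties(this.getClass(), new String[] { signature });
    }
}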

An example of how a simple LoadFunc implementation based on the old interface can be converted to the new interfaces is shown in the Examples section below.

StoreFunc Migration

StoreFunc is now an abstract class providing default implementations for some of the methods. The main change is that the new StoreFunc API is based on an OutputFormat to write the data. Implementations can choose to use an existing OutputFormat, like TextOutputFormat, or implement a new one.

The mapping below lists old API calls against their new equivalents, in rough order of call sequence. Each entry gives the old method in StoreFunc, the equivalent new method, the class or interface in which the new method is present, and an explanation.

Old method: No equivalent method
New method: setStoreFuncUDFContextSignature()
Class/Interface: StoreFunc
Explanation: This method will be called by Pig, both on the front end and on the back end, to pass a unique signature to the storer. The signature can be used to store into the UDFContext any information which the storer needs to keep between various method invocations in the front end and back end. The default implementation in StoreFunc has an empty body. This method will be called before other methods.

Old method: No equivalent method
New method: relToAbsPathForStoreLocation()
Class/Interface: StoreFunc
Explanation: Pig runtime will call this method to allow the storer to convert a relative store location to an absolute location. An implementation is provided in StoreFunc which handles this for FileSystem based locations.

Old method: No equivalent method
New method: checkSchema()
Class/Interface: StoreFunc
Explanation: A store function should implement this method to check that a given schema describing the data to be written is acceptable to it (see the sketch following this mapping). The default implementation in StoreFunc has an empty body. This method will be called before any calls to setStoreLocation().

Old method: No equivalent method
New method: setStoreLocation()
Class/Interface: StoreFunc
Explanation: This method is called by Pig to communicate the store location to the storer. The storer should use this method to communicate the same information to the underlying OutputFormat. This method is called multiple times by Pig; implementations should bear this in mind and ensure there are no inconsistent side effects due to the multiple calls.

Old method: getStorePreparationClass()
New method: getOutputFormat()
Class/Interface: StoreFunc
Explanation: In the old API, getStorePreparationClass() was the means by which the implementation could communicate to Pig the OutputFormat to use for writing; this is now achieved through getOutputFormat(). getOutputFormat() is NOT an optional method and implementations SHOULD provide an OutputFormat to use. The methods in the OutputFormat (and the underlying RecordWriter and OutputCommitter) will be called by Pig in the same manner (and in the same context) as by Hadoop in a map-reduce Java program. If the OutputFormat is a Hadoop-packaged one, the implementation should use the new API based version under org.apache.hadoop.mapreduce. If it is a custom OutputFormat, it should be implemented using the new API under org.apache.hadoop.mapreduce. The checkOutputSpecs() method of the OutputFormat will be called by Pig to check the output location up front. This method will also be called as part of the Hadoop call sequence when the job is launched, so implementations should ensure that it can be called multiple times without inconsistent side effects.

Old method: bindTo()
New method: prepareToWrite()
Class/Interface: StoreFunc
Explanation: bindTo() was the old method which would provide an OutputStream, among other things, to the StoreFunc. The StoreFunc implementation would then write to the OutputStream in putNext(). In the new API, writing of the data is through the OutputFormat provided by the StoreFunc. So the equivalent call is prepareToWrite(), wherein the RecordWriter associated with the OutputFormat provided by the StoreFunc is passed to the StoreFunc. The RecordWriter can then be used by the implementation in putNext() to write a tuple representing a record of data in the manner expected by the RecordWriter.

Old method: putNext()
New method: putNext()
Class/Interface: StoreFunc
Explanation: The meaning of putNext() has not changed; it is called by the Pig runtime to write the next tuple of data. In the new API, this is the method in which the implementation uses the underlying RecordWriter to write the Tuple out.

Old method: finish()
New method: No equivalent method in StoreFunc; implementations can use commitTask() in OutputCommitter
Class/Interface: OutputCommitter
Explanation: finish() has been removed from StoreFunc since the same semantics can be achieved with OutputCommitter.commitTask() (OutputCommitter.needsTaskCommit() should return true for commitTask() to be invoked). A sketch of this appears after the new storer implementation in the Examples section.
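
As a small illustration of checkSchema(), the sketch below (not from the Pig codebase) shows how a storer that only knows how to serialize scalar fields might reject schemas containing complex types up front. It is written as a hypothetical subclass of the SimpleTextStorer from the Examples section, with imports omitted as in the other examples; the real SimpleTextStorer below does handle complex types, so this particular check is purely illustrative.

public class ScalarOnlyTextStorer extends SimpleTextStorer {

    @Override
    public void checkSchema(ResourceSchema s) throws IOException {
        // Front-end validation: called before setStoreLocation(); reject any schema
        // that contains map, tuple or bag fields since this storer only writes scalars.
        for (ResourceSchema.ResourceFieldSchema field : s.getFields()) {
            byte type = field.getType();
            if (type == DataType.MAP || type == DataType.TUPLE || type == DataType.BAG) {
                throw new IOException("Cannot store complex field '" + field.getName()
                        + "' of type " + DataType.findTypeName(type));
            }
        }
    }
}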

An example of how a simple StoreFunc implementation based on the old interface can be converted to the new interfaces is shown in the Examples section below.

Examples

Loader

The loader implementation in the example is a loader for text data with '\n' as the line delimiter and '\t' as the default field delimiter (which can be overridden by passing a different field delimiter in the constructor) - this is similar to the current PigStorage loader in Pig. The new implementation uses an existing Hadoop-supported InputFormat, TextInputFormat, as the underlying InputFormat.

Old Implementation

/**
 * A load function that parses a line of input into fields using a delimiter to set the fields.
 */
public class SimpleTextLoader extends Utf8StorageConverter {
    protected BufferedPositionedInputStream in = null;

    long                end            = Long.MAX_VALUE;
    private byte recordDel = '\n';
    private byte fieldDel = '\t';
    private ByteArrayOutputStream mBuf;
    private ArrayList<Object> mProtoTuple;
    private TupleFactory mTupleFactory = TupleFactory.getInstance();
    private static final String UTF8 = "UTF-8";

    public SimpleTextLoader() {
    }

    /**
     * Constructs a Pig loader that uses specified character as a field delimiter.
     *
     * @param delimiter
     *            the single byte character that is used to separate fields.
     *            ("\t" is the default.)
     */
    public SimpleTextLoader(String delimiter) {
        this();
        if (delimiter.length() == 1) {
            this.fieldDel = (byte)delimiter.charAt(0);
        } else if (delimiter.length() > 1 && delimiter.charAt(0) == '\\') {
            switch (delimiter.charAt(1)) {
            case 't':
                this.fieldDel = (byte)'\t';
                break;

            case 'x':
            case 'u':
                this.fieldDel =
                    (byte)Integer.parseInt(delimiter.substring(2));
                break;

            default:
                throw new RuntimeException("Unknown delimiter " + delimiter);
            }
        } else {
            throw new RuntimeException("PigStorage delimiter must be a single character");
        }
    }

    public Tuple getNext() throws IOException {
        if (in == null || in.getPosition() > end) {
            return null;
        }

        if (mBuf == null) mBuf = new ByteArrayOutputStream(4096);
        mBuf.reset();
        while (true) {
            // BufferedPositionedInputStream is buffered, so I don't need
            // to buffer.
            int b = in.read();

            if (b == fieldDel) {
                readField();
            } else if (b == recordDel) {
                readField();
                Tuple t =  mTupleFactory.newTupleNoCopy(mProtoTuple);
                mProtoTuple = null;
                return t;
            } else if (b == -1) {
                // hit end of file
                return null;
            } else {
                mBuf.write(b);
            }
        }
    }

    public void bindTo(String fileName, BufferedPositionedInputStream in, long offset, long end) throws IOException {
        this.in = in;
        this.end = end;

        // Since we are not block aligned we throw away the first
        // record and count on a different instance to read it
        if (offset != 0) {
            getNext();
        }
    }

    private void readField() {
        if (mProtoTuple == null) mProtoTuple = new ArrayList<Object>();
        if (mBuf.size() == 0) {
            // NULL value
            mProtoTuple.add(null);
        } else {

            byte[] array = mBuf.toByteArray();
            if (array.length==0)
                mProtoTuple.add(null);
            else
                mProtoTuple.add(new DataByteArray(array));
        }
        mBuf.reset();
    }

    public Schema determineSchema(String fileName, ExecType execType,
            DataStorage storage) throws IOException {
        return null;
    }

    public RequiredFieldResponse fieldsToRead(RequiredFieldList requiredFieldList)
    throws FrontendException {
        // indicate to pig that this loader will return all fields and not just
        // the required fields passed in the argument.
        return new RequiredFieldResponse(false);
    }
 }

New Implementation

public class SimpleTextLoader extends LoadFunc {
    protected RecordReader in;
    private byte fieldDel = '\t';
    private ArrayList<Object> mProtoTuple;
    private TupleFactory mTupleFactory = TupleFactory.getInstance();
    private static final int BUFFER_SIZE = 1024;

    public SimpleTextLoader() {
    }

    /**
     * Constructs a Pig loader that uses specified character as a field delimiter.
     *
     * @param delimiter
     *            the single byte character that is used to separate fields.
     *            ("\t" is the default.)
     */
    public SimpleTextLoader(String delimiter) {
        this();
        if (delimiter.length() == 1) {
            this.fieldDel = (byte)delimiter.charAt(0);
        } else if (delimiter.length() > 1 && delimiter.charAt(0) == '\\') {
            switch (delimiter.charAt(1)) {
            case 't':
                this.fieldDel = (byte)'\t';
                break;

            case 'x':
               fieldDel =
                    (byte)Integer.parseInt(delimiter.substring(2), 16);
               break;

            case 'u':
                this.fieldDel =
                    (byte)Integer.parseInt(delimiter.substring(2));
                break;

            default:
                throw new RuntimeException("Unknown delimiter " + delimiter);
            }
        } else {
            throw new RuntimeException("PigStorage delimiter must be a single character");
        }
    }

    @Override
    public Tuple getNext() throws IOException {
        try {
            boolean notDone = in.nextKeyValue();
            if (!notDone) {
                return null;
            }
            Text value = (Text) in.getCurrentValue();
            byte[] buf = value.getBytes();
            int len = value.getLength();
            int start = 0;

            for (int i = 0; i < len; i++) {
                if (buf[i] == fieldDel) {
                    readField(buf, start, i);
                    start = i + 1;
                }
            }
            // pick up the last field
            readField(buf, start, len);

            Tuple t =  mTupleFactory.newTupleNoCopy(mProtoTuple);
            mProtoTuple = null;
            return t;
        } catch (InterruptedException e) {
            int errCode = 6018;
            String errMsg = "Error while reading input";
            throw new ExecException(errMsg, errCode,
                    PigException.REMOTE_ENVIRONMENT, e);
        }

    }

    private void readField(byte[] buf, int start, int end) {
        if (mProtoTuple == null) {
            mProtoTuple = new ArrayList<Object>();
        }

        if (start == end) {
            // NULL value
            mProtoTuple.add(null);
        } else {
            mProtoTuple.add(new DataByteArray(buf, start, end));
        }
    }

    @Override
    public InputFormat getInputFormat() {
        return new TextInputFormat();
    }

    @Override
    public void prepareToRead(RecordReader reader, PigSplit split) {
        in = reader;
    }

    @Override
    public void setLocation(String location, Job job)
            throws IOException {
        FileInputFormat.setInputPaths(job, location);
    }
}
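
If the loader can describe the data it returns, it can also implement LoadMetadata so that getSchema() takes over the role of the old determineSchema(). The sketch below is not part of the original example; it declares two chararray columns with made-up aliases ("first", "second") purely for illustration, and the remaining LoadMetadata methods simply report that no additional metadata is available (imports are omitted as in the other examples).

public class SimpleTextLoaderWithSchema extends SimpleTextLoader implements LoadMetadata {

    @Override
    public ResourceSchema getSchema(String location, Job job) throws IOException {
        // New-API equivalent of determineSchema(): describe the tuples this loader returns.
        Schema schema = new Schema();
        schema.add(new Schema.FieldSchema("first", DataType.CHARARRAY));
        schema.add(new Schema.FieldSchema("second", DataType.CHARARRAY));
        return new ResourceSchema(schema);
    }

    @Override
    public ResourceStatistics getStatistics(String location, Job job) throws IOException {
        return null;    // no statistics available for this data
    }

    @Override
    public String[] getPartitionKeys(String location, Job job) throws IOException {
        return null;    // the data is not partitioned
    }

    @Override
    public void setPartitionFilter(Expression partitionFilter) throws IOException {
        // nothing to do - no partition keys were reported
    }
}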

Storer

The storer implementation in the example is a storer for text data with '\n' as the line delimiter and '\t' as the default field delimiter (which can be overridden by passing a different field delimiter in the constructor) - this is similar to the current PigStorage storer in Pig. The new implementation uses an existing Hadoop-supported OutputFormat, TextOutputFormat, as the underlying OutputFormat.

Old Implementation

public class SimpleTextStorer implements StoreFunc {

    protected byte recordDel = '\n';
    protected byte fieldDel = '\t';

    protected static final String UTF8 = "UTF-8";

    public SimpleTextStorer() {}

    public SimpleTextStorer(String delimiter) {
        this();
        if (delimiter.length() == 1) {
            this.fieldDel = (byte)delimiter.charAt(0);
        } else if (delimiter.length() > 1 && delimiter.charAt(0) == '\\') {
            switch (delimiter.charAt(1)) {
            case 't':
                this.fieldDel = (byte)'\t';
                break;

            case 'x':
            case 'u':
                this.fieldDel =
                    (byte)Integer.parseInt(delimiter.substring(2));
                break;

            default:
                throw new RuntimeException("Unknown delimiter " + delimiter);
            }
        } else {
            throw new RuntimeException("PigStorage delimiter must be a single character");
        }
    }

    OutputStream mOut;
    public void bindTo(OutputStream os) throws IOException {
        mOut = os;
    }

    @SuppressWarnings("unchecked")
    private void putField(Object field) throws IOException {
        //string constants for each delimiter
        String tupleBeginDelim = "(";
        String tupleEndDelim = ")";
        String bagBeginDelim = "{";
        String bagEndDelim = "}";
        String mapBeginDelim = "[";
        String mapEndDelim = "]";
        String fieldDelim = ",";
        String mapKeyValueDelim = "#";

        switch (DataType.findType(field)) {
        case DataType.NULL:
            break; // just leave it empty

        case DataType.BOOLEAN:
            mOut.write(((Boolean)field).toString().getBytes());
            break;

        case DataType.INTEGER:
            mOut.write(((Integer)field).toString().getBytes());
            break;

        case DataType.LONG:
            mOut.write(((Long)field).toString().getBytes());
            break;

        case DataType.FLOAT:
            mOut.write(((Float)field).toString().getBytes());
            break;

        case DataType.DOUBLE:
            mOut.write(((Double)field).toString().getBytes());
            break;

        case DataType.BYTEARRAY: {
            byte[] b = ((DataByteArray)field).get();
            mOut.write(b, 0, b.length);
            break;
                                 }

        case DataType.CHARARRAY:
            // write the chararray out as UTF-8 bytes
            mOut.write(((String)field).getBytes(UTF8));
            break;

        case DataType.MAP:
            boolean mapHasNext = false;
            Map<String, Object> m = (Map<String, Object>)field;
            mOut.write(mapBeginDelim.getBytes(UTF8));
            for(Map.Entry<String, Object> e: m.entrySet()) {
                if(mapHasNext) {
                    mOut.write(fieldDelim.getBytes(UTF8));
                } else {
                    mapHasNext = true;
                }
                putField(e.getKey());
                mOut.write(mapKeyValueDelim.getBytes(UTF8));
                putField(e.getValue());
            }
            mOut.write(mapEndDelim.getBytes(UTF8));
            break;

        case DataType.TUPLE:
            boolean tupleHasNext = false;
            Tuple t = (Tuple)field;
            mOut.write(tupleBeginDelim.getBytes(UTF8));
            for(int i = 0; i < t.size(); ++i) {
                if(tupleHasNext) {
                    mOut.write(fieldDelim.getBytes(UTF8));
                } else {
                    tupleHasNext = true;
                }
                try {
                    putField(t.get(i));
                } catch (ExecException ee) {
                    throw ee;
                }
            }
            mOut.write(tupleEndDelim.getBytes(UTF8));
            break;

        case DataType.BAG:
            boolean bagHasNext = false;
            mOut.write(bagBeginDelim.getBytes(UTF8));
            Iterator<Tuple> tupleIter = ((DataBag)field).iterator();
            while(tupleIter.hasNext()) {
                if(bagHasNext) {
                    mOut.write(fieldDelim.getBytes(UTF8));
                } else {
                    bagHasNext = true;
                }
                putField((Object)tupleIter.next());
            }
            mOut.write(bagEndDelim.getBytes(UTF8));
            break;

        default: {
            int errCode = 2108;
            String msg = "Could not determine data type of field: " + field;
            throw new ExecException(msg, errCode, PigException.BUG);
        }

        }
    }

    public void putNext(Tuple f) throws IOException {
        // I have to convert integer fields to string, and then to bytes.
        // If I use a DataOutputStream to convert directly from integer to
        // bytes, I don't get a string representation.
        int sz = f.size();
        for (int i = 0; i < sz; i++) {
            Object field;
            try {
                field = f.get(i);
            } catch (ExecException ee) {
                throw ee;
            }

            putField(field);

            if (i == sz - 1) {
                // last field in tuple.
                mOut.write(recordDel);
            } else {
                mOut.write(fieldDel);
            }
        }
    }

    public void finish() throws IOException {
    }

    /* (non-Javadoc)
     * @see org.apache.pig.StoreFunc#getStorePreparationClass()
     */

    public Class getStorePreparationClass() throws IOException {
        return null;
    }

}

New Implementation

public class SimpleTextStorer extends StoreFunc {
    protected RecordWriter writer = null;

    private byte fieldDel = '\t';
    private static final int BUFFER_SIZE = 1024;
    private static final String UTF8 = "UTF-8";
    public SimpleTextStorer() {
    }

    public SimpleTextStorer(String delimiter) {
        this();
        if (delimiter.length() == 1) {
            this.fieldDel = (byte)delimiter.charAt(0);
        } else if (delimiter.length() > 1 && delimiter.charAt(0) == '\\') {
            switch (delimiter.charAt(1)) {
            case 't':
                this.fieldDel = (byte)'\t';
                break;

            case 'x':
               fieldDel =
                    (byte)Integer.parseInt(delimiter.substring(2), 16);
               break;
            case 'u':
                this.fieldDel =
                    (byte)Integer.parseInt(delimiter.substring(2));
                break;

            default:
                throw new RuntimeException("Unknown delimiter " + delimiter);
            }
        } else {
            throw new RuntimeException("PigStorage delimiter must be a single character");
        }
    }

    ByteArrayOutputStream mOut = new ByteArrayOutputStream(BUFFER_SIZE);

    @Override
    public void putNext(Tuple f) throws IOException {
        int sz = f.size();
        for (int i = 0; i < sz; i++) {
            Object field;
            try {
                field = f.get(i);
            } catch (ExecException ee) {
                throw ee;
            }

            putField(field);

            if (i != sz - 1) {
                mOut.write(fieldDel);
            }
        }
        Text text = new Text(mOut.toByteArray());
        try {
            writer.write(null, text);
            mOut.reset();
        } catch (InterruptedException e) {
            throw new IOException(e);
        }
    }

    @SuppressWarnings("unchecked")
    private void putField(Object field) throws IOException {
        //string constants for each delimiter
        String tupleBeginDelim = "(";
        String tupleEndDelim = ")";
        String bagBeginDelim = "{";
        String bagEndDelim = "}";
        String mapBeginDelim = "[";
        String mapEndDelim = "]";
        String fieldDelim = ",";
        String mapKeyValueDelim = "#";

        switch (DataType.findType(field)) {
        case DataType.NULL:
            break; // just leave it empty

        case DataType.BOOLEAN:
            mOut.write(((Boolean)field).toString().getBytes());
            break;

        case DataType.INTEGER:
            mOut.write(((Integer)field).toString().getBytes());
            break;

        case DataType.LONG:
            mOut.write(((Long)field).toString().getBytes());
            break;

        case DataType.FLOAT:
            mOut.write(((Float)field).toString().getBytes());
            break;

        case DataType.DOUBLE:
            mOut.write(((Double)field).toString().getBytes());
            break;

        case DataType.BYTEARRAY: {
            byte[] b = ((DataByteArray)field).get();
            mOut.write(b, 0, b.length);
            break;
                                 }

        case DataType.CHARARRAY:
            // write the chararray out as UTF-8 bytes
            mOut.write(((String)field).getBytes(UTF8));
            break;

        case DataType.MAP:
            boolean mapHasNext = false;
            Map<String, Object> m = (Map<String, Object>)field;
            mOut.write(mapBeginDelim.getBytes(UTF8));
            for(Map.Entry<String, Object> e: m.entrySet()) {
                if(mapHasNext) {
                    mOut.write(fieldDelim.getBytes(UTF8));
                } else {
                    mapHasNext = true;
                }
                putField(e.getKey());
                mOut.write(mapKeyValueDelim.getBytes(UTF8));
                putField(e.getValue());
            }
            mOut.write(mapEndDelim.getBytes(UTF8));
            break;

        case DataType.TUPLE:
            boolean tupleHasNext = false;
            Tuple t = (Tuple)field;
            mOut.write(tupleBeginDelim.getBytes(UTF8));
            for(int i = 0; i < t.size(); ++i) {
                if(tupleHasNext) {
                    mOut.write(fieldDelim.getBytes(UTF8));
                } else {
                    tupleHasNext = true;
                }
                try {
                    putField(t.get(i));
                } catch (ExecException ee) {
                    throw ee;
                }
            }
            mOut.write(tupleEndDelim.getBytes(UTF8));
            break;

        case DataType.BAG:
            boolean bagHasNext = false;
            mOut.write(bagBeginDelim.getBytes(UTF8));
            Iterator<Tuple> tupleIter = ((DataBag)field).iterator();
            while(tupleIter.hasNext()) {
                if(bagHasNext) {
                    mOut.write(fieldDelim.getBytes(UTF8));
                } else {
                    bagHasNext = true;
                }
                putField((Object)tupleIter.next());
            }
            mOut.write(bagEndDelim.getBytes(UTF8));
            break;

        default: {
            int errCode = 2108;
            String msg = "Could not determine data type of field: " + field;
            throw new ExecException(msg, errCode, PigException.BUG);
        }

        }
    }

    @Override
    public OutputFormat getOutputFormat() {
        return new TextOutputFormat<WritableComparable, Text>();
    }

    @Override
    public void prepareToWrite(RecordWriter writer) {
        this.writer = writer;
    }

    @Override
    public void setStoreLocation(String location, Job job) throws IOException {
        job.getConfiguration().set("mapred.textoutputformat.separator", "");
        FileOutputFormat.setOutputPath(job, new Path(location));
        if (location.endsWith(".bz2")) {
            FileOutputFormat.setCompressOutput(job, true);
            FileOutputFormat.setOutputCompressorClass(job,  BZip2Codec.class);
        }  else if (location.endsWith(".gz")) {
            FileOutputFormat.setCompressOutput(job, true);
            FileOutputFormat.setOutputCompressorClass(job, GzipCodec.class);
        }
    }

}
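
The old finish() hook has no direct equivalent in the new StoreFunc; any per-task completion work belongs in OutputCommitter.commitTask() instead. One way to arrange this (sketched below, not part of the original example) is to have getOutputFormat() return a small OutputFormat subclass whose committer does the extra work; FileOutputCommitter typically returns true from needsTaskCommit() for tasks that produced output, so commitTask() will be invoked. The class name SimpleTextOutputFormat is hypothetical and imports are omitted as in the other examples.

public class SimpleTextOutputFormat extends TextOutputFormat<WritableComparable, Text> {

    @Override
    public OutputCommitter getOutputCommitter(TaskAttemptContext context) throws IOException {
        // Reuse the standard file committer, but add the work formerly done in finish()
        // to commitTask(), which runs once per successful task attempt.
        return new FileOutputCommitter(FileOutputFormat.getOutputPath(context), context) {
            @Override
            public void commitTask(TaskAttemptContext taskContext) throws IOException {
                // ... per-task cleanup or bookkeeping that finish() used to perform ...
                super.commitTask(taskContext);
            }
        };
    }
}

The storer's getOutputFormat() would then return new SimpleTextOutputFormat() instead of TextOutputFormat directly.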

Notes:

1) The checkOutputSpecs() method of a user-provided OutputFormat should be side-effect free; that is, it should behave consistently if called multiple times. This is because Pig will call this method multiple times and assumes it is safe to do so.
