Proposed Redesign For Load, Store, and Slicer in Pig

Goals

The current design of the LoadFunc, StoreFunc, and Slicer interfaces in Pig is not adequate. This proposed redesign has the following goals:

  1. The Slicer interface is redundant. Remove it and allow users to directly use Hadoop InputFormats in Pig.

  2. It is not currently easy to use a separate OutputFormat for a StoreFunc. This should be made easy to allow users to store data into locations other than HDFS.

  3. Currently, users who wish to use both Pig and Map-Reduce on the same data are required to write a Hadoop InputFormat and OutputFormat as well as Pig load and store functions. While Pig load and store functions will always be necessary to take the most advantage of Hadoop, it would be good for users to be able to use Hadoop InputFormat and OutputFormat classes directly in order to minimize the data interchange cost.

  4. New storage formats, such as Zebra, are being implemented for Hadoop that include metadata such as schema information. The LoadFunc interface needs to allow Pig to obtain this metadata. There is a describeSchema call in the current interface; more functions may be necessary.

  5. These new storage formats also plan to support pushing of, at least, projection and selection into the storage layer. Pig needs to be able to query loaders to determine what, if any, pushdown capabilities they support and then make use of those capabilities.

  6. There already exists one metadata system in Hadoop (Hive's metastore) and there is a proposal to add another (Owl). Pig needs to be able to query these metadata systems for information about data to be read. It also needs to be able to record information to these metadata systems when writing data. The load and store functions are a reasonable place for these operations, since that is the point at which Pig reads and writes data. This will also allow Pig to read and write data from and to multiple metadata stores in a single Pig Latin script, if that is desired.

A requirement for the implementation that does not fit into the goals above is that while the existing Pig implementation is tightly tied to Hadoop (and is becoming more tightly tied all the time), we do not want to tie Pig Latin tightly to Hadoop. Therefore while we plan to allow users to easily interact with Hadoop InputFormats and OutputFormats, these should not be exposed as such to Pig Latin. Pig Latin must still view these as load and store functions; it will only be the underlying implementation that will realize that they are Hadoop classes and handle them appropriately.

Interfaces

With these proposed changes, load and store functions in Pig are becoming very weighty objects. The current LoadFunc interface already provides mechanisms for reading the data, getting some schema information, casting data, and some placeholders for pushing projections down into the loader. This proposal will add more file-level metadata, selection push down, and interaction with InputFormats. It will also add OutputFormats to store functions. If we create two monster interfaces that attempt to provide everything, the burden of creating a new load or store function in Pig will become overwhelming. Instead, this proposal envisions splitting the interface into a number of interfaces, each with a clear responsibility. Load and store functions will then only be required to implement the interfaces for the functionality they offer.

For load functions:

  * LoadFunc - basic functionality for reading data
  * LoadCaster - converting byte arrays to Pig types
  * LoadMetadata - obtaining schema, statistics, and partition information
  * LoadPushDown - accepting operations (initially projection) pushed from Pig into the loader

For store functions:

  * StoreFunc - basic functionality for writing data
  * StoreMetadata - recording schema and statistics

Details

LoadFunc (for brevity, the implementation of the getAbsolutePath() method is not shown below)

/**
 * <code>LoadFunc</code> provides functions directly associated with reading 
 * records from a data set.
 */
public abstract class LoadFunc {
    
    /**
     * This method is called by the Pig runtime in the front end to convert the
     * input location to an absolute path if the location is relative. The
     * loadFunc implementation is free to choose how it converts a relative 
     * location to an absolute location since this may depend on what the location
     * string represents (an HDFS path or some other data source)
     * 
     * @param location location as provided in the "load" statement of the script
     * @param curDir the current working directory based on any "cd" statements
     * in the script before the "load" statement. If there are no "cd" statements
     * in the script, this would be the home directory - 
     * <pre>/user/<username> </pre>
     * @return the absolute location based on the arguments passed
     * @throws IOException if the conversion is not possible
     */
    public String relativeToAbsolutePath(String location, Path curDir) 
            throws IOException {      
        return getAbsolutePath(location, curDir);
    }    

    /**
     * Communicate to the loader the location of the object(s) being loaded.  
     * The location string passed to the LoadFunc here is the return value of 
     * {@link LoadFunc#relativeToAbsolutePath(String, Path)}. Implementations
     * should use this method to communicate the location (and any other information)
     * to its underlying InputFormat through the Job object.
     * 
     * This method will be called in the backend multiple times. Implementations
     * should bear in mind that this method is called multiple times and should
     * ensure there are no inconsistent side effects due to the multiple calls.
     * 
     * @param location Location as returned by 
     * {@link LoadFunc#relativeToAbsolutePath(String, Path)}
     * @param job the {@link Job} object
     * @throws IOException if the location is not valid.
     */
    public abstract void setLocation(String location, Job job) throws IOException;
    
    /**
     * This will be called during planning on the front end. This is the
     * instance of InputFormat (rather than the class name) because the 
     * load function may need to instantiate the InputFormat in order 
     * to control how it is constructed.
     * @return the InputFormat associated with this loader.
     * @throws IOException if there is an exception during InputFormat 
     * construction
     */
    @SuppressWarnings("unchecked")
    public abstract InputFormat getInputFormat() throws IOException;

    /**
     * This will be called on the front end during planning and not on the back 
     * end during execution.
     * @return the {@link LoadCaster} associated with this loader. Returning null 
     * indicates that casts from byte array are not supported for this loader. 
     * @throws IOException if there is an exception during LoadCaster 
     * construction
     */
    public LoadCaster getLoadCaster() throws IOException {
        return new Utf8StorageConverter();
    }

    /**
     * Initializes LoadFunc for reading data.  This will be called during execution
     * before any calls to getNext.  The RecordReader needs to be passed here because
     * it has been instantiated for a particular InputSplit.
     * @param reader {@link RecordReader} to be used by this instance of the LoadFunc
     * @param split The input {@link PigSplit} to process
     * @throws IOException if there is an exception during initialization
     */
    @SuppressWarnings("unchecked")
    public abstract void prepareToRead(RecordReader reader, PigSplit split) throws IOException;

    /**
     * Retrieves the next tuple to be processed. Implementations should NOT reuse
     * tuple objects (or inner member objects) they return across calls and 
     * should return a different tuple object in each call.
     * @return the next tuple to be processed or null if there are no more tuples
     * to be processed.
     * @throws IOException if there is an exception while retrieving the next
     * tuple
     */
    public abstract Tuple getNext() throws IOException;

    /**
     * This method will be called by Pig both in the front end and back end to
     * pass a unique signature to the {@link LoadFunc}. The signature can be used
     * to store into the {@link UDFContext} any information which the 
     * {@link LoadFunc} needs to store between various method invocations in the
     * front end and back end. A use case is to store {@link RequiredFieldList} 
     * passed to it in {@link LoadPushDown#pushProjection(RequiredFieldList)} for
     * use in the back end before returning tuples in {@link LoadFunc#getNext()}.
     * This method will be called before other methods in {@link LoadFunc}.
     * @param signature a unique signature to identify this LoadFunc
     */
    public void setUDFContextSignature(String signature) {
        // default implementation is a no-op
    }
       
}
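To illustrate how these methods fit together, a minimal loader built on Hadoop's TextInputFormat might look roughly like the following sketch. The class name is invented, package locations for PigSplit and friends are assumptions, and error handling is kept to a minimum; the real PigStorage is considerably more involved.

    import java.io.IOException;

    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.InputFormat;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.RecordReader;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
    import org.apache.pig.LoadFunc;
    import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigSplit;
    import org.apache.pig.data.DataByteArray;
    import org.apache.pig.data.Tuple;
    import org.apache.pig.data.TupleFactory;

    // Hypothetical loader that wraps Hadoop's new-API TextInputFormat and
    // returns each line as a single-field tuple.
    public class SimpleTextLoader extends LoadFunc {

        private RecordReader<LongWritable, Text> reader;
        private final TupleFactory tupleFactory = TupleFactory.getInstance();

        @Override
        public void setLocation(String location, Job job) throws IOException {
            // Hand the input location to the underlying InputFormat via the Job.
            FileInputFormat.setInputPaths(job, location);
        }

        @Override
        public InputFormat getInputFormat() throws IOException {
            return new TextInputFormat();
        }

        @Override
        public void prepareToRead(RecordReader reader, PigSplit split) throws IOException {
            this.reader = reader;
        }

        @Override
        public Tuple getNext() throws IOException {
            try {
                if (!reader.nextKeyValue()) {
                    return null;                  // no more records in this split
                }
                Text value = reader.getCurrentValue();
                // Return the whole line as one bytearray field; a real loader
                // would split it into fields here.
                return tupleFactory.newTuple(
                        new DataByteArray(value.getBytes(), 0, value.getLength()));
            } catch (InterruptedException e) {
                throw new IOException(e);
            }
        }
    }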

The LoadCaster interface will include the bytesToInt, bytesToLong, etc. functions currently in LoadFunc. Utf8StorageConverter will implement this interface. The only change will be in bytesToTuple() and bytesToBag(): these methods will take an additional ResourceFieldSchema argument describing the schema of the tuple or bag, respectively, that the bytes need to be cast to.
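A rough sketch of what this interface might look like under the proposal; the exact method names and signatures are an assumption based on the description above and the conversion routines on the existing LoadFunc.

    // Sketch only: the method list is abbreviated.
    public interface LoadCaster {
        Integer bytesToInteger(byte[] b) throws IOException;
        Long bytesToLong(byte[] b) throws IOException;
        Float bytesToFloat(byte[] b) throws IOException;
        Double bytesToDouble(byte[] b) throws IOException;
        String bytesToCharArray(byte[] b) throws IOException;
        Map<String, Object> bytesToMap(byte[] b) throws IOException;

        // The schema argument is the addition described above: it tells the
        // caster what the target tuple or bag should look like.
        Tuple bytesToTuple(byte[] b, ResourceFieldSchema fieldSchema) throws IOException;
        DataBag bytesToBag(byte[] b, ResourceFieldSchema fieldSchema) throws IOException;
    }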

LoadMetadata

/**
 * This interface defines how to retrieve metadata related to data to be loaded.
 * If a given loader does not implement this interface, it will be assumed that it
 * is unable to provide metadata about the associated data.
 */
public interface LoadMetadata {

    /**
     * Get a schema for the data to be loaded.  
     * @param location Location as returned by 
     * {@link LoadFunc#relativeToAbsolutePath(String, org.apache.hadoop.fs.Path)}
     * @param job The {@link Job} object - this should be used only to obtain 
     * cluster properties through {@link Job#getConfiguration()} and not to set/query
     * any runtime job information.  
     * @return schema for the data to be loaded. This schema should represent
     * all tuples of the returned data.  If the schema is unknown or it is
     * not possible to return a schema that represents all returned data,
     * then null should be returned.
     * @throws IOException if an exception occurs while determining the schema
     */
    ResourceSchema getSchema(String location, Job job) throws 
    IOException;

    /**
     * Get statistics about the data to be loaded.  If no statistics are
     * available, then null should be returned.
     * @param location Location as returned by 
     * {@link LoadFunc#relativeToAbsolutePath(String, org.apache.hadoop.fs.Path)}
     * @param job The {@link Job} object - this should be used only to obtain 
     * cluster properties through {@link Job#getConfiguration()} and not to set/query
     * any runtime job information.  
     * @return statistics about the data to be loaded.  If no statistics are
     * available, then null should be returned.
     * @throws IOException if an exception occurs while retrieving statistics
     */
    ResourceStatistics getStatistics(String location, Job job) 
    throws IOException;

    /**
     * Find what columns are partition keys for this input.
     * @param location Location as returned by 
     * {@link LoadFunc#relativeToAbsolutePath(String, org.apache.hadoop.fs.Path)}
     * @param job The {@link Job} object - this should be used only to obtain 
     * cluster properties through {@link Job#getConfiguration()} and not to set/query
     * any runtime job information.  
     * @return array of field names of the partition keys. Implementations 
     * should return null to indicate that there are no partition keys
     * @throws IOException if an exception occurs while retrieving partition keys
     */
    String[] getPartitionKeys(String location, Job job) 
    throws IOException;

    /**
     * Set the filter for partitioning.  It is assumed that this filter
     * will only contain references to fields given as partition keys in
     * getPartitionKeys. So if the implementation returns null in 
     * {@link #getPartitionKeys(String, Job)}, then this method is not
     * called by the Pig runtime. This method is also not called by the Pig runtime
     * if there are no partition filter conditions. 
     * @param partitionFilter that describes filter for partitioning
     * @throws IOException if the filter is not compatible with the storage
     * mechanism or contains non-partition fields.
     */
    void setPartitionFilter(Expression partitionFilter) throws IOException;

}

ResourceSchema will be a top level object (org.apache.pig.ResourceSchema) used to communicate information about data to be loaded or that is being stored. It is not the same as the existing org.apache.pig.impl.logicalLayer.schema.Schema.

    public class ResourceSchema {

        int version;

        public class ResourceFieldSchema {
            public String name;
            public DataType type;
            public String description;
            public ResourceFieldSchema schema; // nested tuples and bags will have their own schema
        }

        public ResourceFieldSchema[] fields;

        enum Order { ASCENDING, DESCENDING }
        public int[] sortKeys; // each entry is an offset into the fields array.
        public Order[] sortKeyOrders;
    }

IMPORTANT NOTE: In the ResourceFieldSchema for a bag field, the only field allowed in the subschema is a tuple field. The tuple itself can have a schema with more than one field. For example, a bag field b would be described as b: {t: (x: int, y: int)} - the bag's subschema contains only the tuple t, and the tuple's subschema contains the fields x and y.

ResourceStatistics

    public class ResourceStatistics {

        public class ResourceFieldStatistics {

            int version;

            enum Distribution {UNIFORM, NORMAL, POWER};

            public long numDistinctValues;  // number of distinct values represented in this field
            public Distribution distribution; // how values in this field are distributed

            // We need some way to represent a histogram of values in the field,
            // as those will be useful.  However, we can't count on being
            // able to hold such histograms in memory.  Have to figure out
            // how they can be kept on disk and represented here.

            // Probably more in here
        }

        public long mBytes; // "disk" size in megabytes (file size or equivalent)
        public long numRecords;  // number of records
        public ResourceFieldStatistics[] fields;

        // Probably more in here
    }

At this point, ResourceStatistics is poorly understood. In initial versions we may choose not to implement it. In addition to questions about what should be in the statistics, there are questions about how statistics should be communicated in relation to partitions. For example, when loading from a table that is stored in Owl or Hive, one or more partitions may be being loaded. Assuming statistics are kept at the partition level, how are these statistics then communicated to Pig? Is it the loader's job to combine the statistics for all of the partitions being read? Or does it return an array of ResourceStatistics? And if so, what does Pig do with it, since it does not know which tuples belong to which partitions (and does not want to know)?

It is even worse on store: any statistics Pig can report are across all data being stored, but the storage function underneath may choose to partition the data. How does it then separate those statistics for the different partitions? In these cases, should store functions be in charge of calculating statistics? Perhaps some statistics that can be easily distributed across partitions (such as distribution types) should be calculated and sent down by Pig, with the rest left to the store functions. Perhaps statistics on store should instead be driven by callbacks that Pig defines and implements, and which store functions call after data has been partitioned if they choose to.

LoadPushDown

/**
 * This interface defines how to communicate to Pig what functionality can
 * be pushed into the loader.  If a given loader does not implement this interface
 * it will be assumed that it is unable to accept any functionality for push down.
 */
public interface LoadPushDown {

    /**
     * Set of possible operations that Pig can push down to a loader. 
     */
    enum OperatorSet {PROJECTION};

    /**
     * Determine the operators that can be pushed to the loader.  
     * Note that by indicating a loader can accept a certain operator
     * (such as selection) the loader is not promising that it can handle
     * all selections.  When it is passed the actual operators to 
     * push down it will still have a chance to reject them.
     * @return list of all features that the loader can support
     */
    List<OperatorSet> getFeatures();

    /**
     * Indicate to the loader which fields will be needed.  This can be useful for
     * loaders that access data stored in a columnar format, where indicating the
     * columns to be accessed ahead of time will save scans.  This method will
     * not be invoked by the Pig runtime if all fields are required, so implementations
     * should assume that if this method is not invoked, then all fields from 
     * the input are required. If the loader function cannot make use of this 
     * information, it is free to ignore it by returning an appropriate RequiredFieldResponse.
     * @param requiredFieldList RequiredFieldList indicating which columns will be needed
     * @return response indicating whether the loader will honor the request
     */
    public RequiredFieldResponse pushProjection(RequiredFieldList 
            requiredFieldList) throws FrontendException;
    
    public static class RequiredField implements Serializable {
        
        private static final long serialVersionUID = 1L;
        
        // will hold name of the field (would be null if not supplied)
        private String alias; 

        // will hold the index (position) of the required field (would be -1 if not supplied), index is 0 based
        private int index; 

        // A list of sub fields in this field (this could be a list of hash keys for example). 
        // This would be null if the entire field is required and no specific sub fields are required. 
        // In the initial implementation only one level of subfields will be populated.
        private List<RequiredField> subFields;
        
        // Type of this field - the value could be any Pig DataType (as specified by the constants in the DataType class).
        private byte type;

        public RequiredField() {
            // to allow piece-meal construction
        }
        
        /**
         * @param alias
         * @param index
         * @param subFields
         * @param type
         */
        public RequiredField(String alias, int index,
                List<RequiredField> subFields, byte type) {
            this.alias = alias;
            this.index = index;
            this.subFields = subFields;
            this.type = type;
        }

        /**
         * @return the alias
         */
        public String getAlias() {
            return alias;
        }

        /**
         * @return the index
         */
        public int getIndex() {
            return index;
        }

        
        /**
         * @return the required sub fields. The return value is null if all
         *         subfields are required
         */
        public List<RequiredField> getSubFields() {
            return subFields;
        }
        
        public void setSubFields(List<RequiredField> subFields)
        {
            this.subFields = subFields;
        }

        /**
         * @return the type
         */
        public byte getType() {
            return type;
        }

        public void setType(byte t) {
            type = t;
        }
        
        public void setIndex(int i) {
            index = i;
        }
        
        public void setAlias(String alias)
        {
            this.alias = alias;
        }

    }

    public static class RequiredFieldList implements Serializable {
        
        private static final long serialVersionUID = 1L;
        
        // list of Required fields, this will be null if all fields are required
        private List<RequiredField> fields = new ArrayList<RequiredField>(); 
        
        /**
         * @param fields
         */
        public RequiredFieldList(List<RequiredField> fields) {
            this.fields = fields;
        }

        /**
         * @return the required fields - this will be null if all fields are
         *         required
         */
        public List<RequiredField> getFields() {
            return fields;
        }

        public RequiredFieldList() {
        }
        
        public void add(RequiredField rf)
        {
            fields.add(rf);
        }
    }

    public static class RequiredFieldResponse {
        // the loader should pass true if it will return data containing
        // only the List of RequiredFields in that order. false if it
        // will return all fields in the data
        private boolean requiredFieldRequestHonored;

        public RequiredFieldResponse(boolean requiredFieldRequestHonored) {
            this.requiredFieldRequestHonored = requiredFieldRequestHonored;
        }

        // true if the loader will return data containing only the List of
        // RequiredFields in that order. false if the loader will return all
        // fields in the data
        public boolean getRequiredFieldResponse() {
            return requiredFieldRequestHonored;
        }

        // the loader should pass true if it will return data containing
        // only the List of RequiredFields in that order. false if it
        // will return all fields in the data
        public void setRequiredFieldResponse(boolean honored) {
            requiredFieldRequestHonored = honored;
        }
    }

    
}
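For illustration, a loader for a columnar format might implement this interface along the following lines. The class is hypothetical, only the push-down pieces are shown (so it is left abstract), and imports are omitted as in the listings above.

    // Hypothetical columnar loader; the LoadFunc methods are omitted.
    public abstract class MyColumnarLoader extends LoadFunc implements LoadPushDown {

        private RequiredFieldList requiredFields;   // null means "all fields required"

        @Override
        public List<OperatorSet> getFeatures() {
            // This loader only knows how to prune columns.
            return Collections.singletonList(OperatorSet.PROJECTION);
        }

        @Override
        public RequiredFieldResponse pushProjection(RequiredFieldList requiredFieldList)
                throws FrontendException {
            // Remember the request for use in getNext() on the back end.  In
            // practice it would be stored in the UDFContext, keyed by the
            // signature passed to setUDFContextSignature().
            this.requiredFields = requiredFieldList;
            // Returning true promises that getNext() will emit only these
            // fields, in this order.
            return new RequiredFieldResponse(true);
        }
    }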

StoreFunc

/**
 * This abstract class is used to implement functions that write records
 * from Pig into a data store.
 */

public abstract class StoreFunc implements StoreFuncInterface {

    /**
     * This method is called by the Pig runtime in the front end to convert the
     * output location to an absolute path if the location is relative. The
     * StoreFunc implementation is free to choose how it converts a relative 
     * location to an absolute location since this may depend on what the location
     * string represents (an HDFS path or some other data source). 
     *  
     * 
     * @param location location as provided in the "store" statement of the script
     * @param curDir the current working directory based on any "cd" statements
     * in the script before the "store" statement. If there are no "cd" statements
     * in the script, this would be the home directory - 
     * <pre>/user/<username> </pre>
     * @return the absolute location based on the arguments passed
     * @throws IOException if the conversion is not possible
     */
    @Override
    public String relToAbsPathForStoreLocation(String location, Path curDir) 
    throws IOException {
        return LoadFunc.getAbsolutePath(location, curDir);
    }

    /**
     * Return the OutputFormat associated with StoreFunc.  This will be called
     * on the front end during planning and not on the backend during
     * execution. 
     * @return the {@link OutputFormat} associated with StoreFunc
     * @throws IOException if an exception occurs while constructing the 
     * OutputFormat
     *
     */
    public abstract OutputFormat getOutputFormat() throws IOException;

    /**
     * Communicate to the storer the location where the data needs to be stored.  
     * The location string passed to the {@link StoreFunc} here is the 
     * return value of {@link StoreFunc#relToAbsPathForStoreLocation(String, Path)} 
     * This method will be called in the frontend and backend multiple times. Implementations
     * should bear in mind that this method is called multiple times and should
     * ensure there are no inconsistent side effects due to the multiple calls.
     * {@link #checkSchema(ResourceSchema)} will be called before any call to
     * {@link #setStoreLocation(String, Job)}.
     * 
     * @param location Location returned by 
     * {@link StoreFunc#relToAbsPathForStoreLocation(String, Path)}
     * @param job The {@link Job} object
     * @throws IOException if the location is not valid.
     */
    public abstract void setStoreLocation(String location, Job job) throws IOException;
 
    /**
     * Set the schema for data to be stored.  This will be called on the
     * front end during planning if the store is associated with a schema.
     * A Store function should implement this function to
     * check that a given schema is acceptable to it.  For example, it
     * can check that the correct partition keys are included;
     * a storage function to be written directly to an OutputFormat can
     * make sure the schema will translate in a well defined way.  
     * @param s to be checked
     * @throws IOException if this schema is not acceptable.  It should include
     * a detailed error message indicating what is wrong with the schema.
     */
    @Override
    public void checkSchema(ResourceSchema s) throws IOException {
        // default implementation is a no-op
    }

    /**
     * Initialize StoreFunc to write data.  This will be called during
     * execution before the call to putNext.
     * @param writer RecordWriter to use.
     * @throws IOException if an exception occurs during initialization
     */
    public abstract void prepareToWrite(RecordWriter writer) throws IOException;

    /**
     * Write a tuple to the output stream to which this instance was
     * previously bound.
     * 
     * @param t the tuple to store.
     * @throws IOException if an exception occurs during the write
     */
    public abstract void putNext(Tuple t) throws IOException;
    
    /**
     * This method will be called by Pig both in the front end and back end to
     * pass a unique signature to the {@link StoreFunc} which it can use to store
     * information in the {@link UDFContext} which it needs to store between
     * various method invocations in the front end and back end. This method 
     * will be called before other methods in {@link StoreFunc}.
     * @param signature a unique signature to identify this StoreFunc
     */
    @Override
    public void setStoreFuncUDFContextSignature(String signature) {
        // default implementation is a no-op
    }
    
    /**
     * This method will be called by Pig if the job which contains this store
     * fails. Implementations can clean up output locations in this method to
     * ensure that no incorrect/incomplete results are left in the output location.
     * The implementation in {@link StoreFunc} deletes the output location if it
     * is a {@link FileSystem} location.
     * @param location Location returned by 
     * {@link StoreFunc#relToAbsPathForStoreLocation(String, Path)}
     * @param job The {@link Job} object - this should be used only to obtain 
     * cluster properties through {@link Job#getConfiguration()} and not to set/query
     * any runtime job information. 
     */
    @Override
    public void cleanupOnFailure(String location, Job job) 
    throws IOException {
        cleanupOnFailureImpl(location, job);
    }
    
    /**
     * Implementation for {@link #cleanupOnFailure(String, Job)}
     * @param location
     * @param job
     * @throws IOException
     */
    public static void cleanupOnFailureImpl(String location, Job job) 
    throws IOException {
        FileSystem fs = FileSystem.get(job.getConfiguration());
        Path path = new Path(location);
        if(fs.exists(path)){
            fs.delete(path, true);
        }    
    }
}
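As with the loader example earlier, a minimal storer built on this class and Hadoop's TextOutputFormat might look roughly like the following sketch. The class name is invented, package locations are assumptions, and field serialization is deliberately simplistic.

    import java.io.IOException;

    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.OutputFormat;
    import org.apache.hadoop.mapreduce.RecordWriter;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
    import org.apache.pig.StoreFunc;
    import org.apache.pig.data.Tuple;

    // Hypothetical storer that writes tuples as tab-separated text.
    public class SimpleTextStorer extends StoreFunc {

        private RecordWriter writer;

        @Override
        public OutputFormat getOutputFormat() throws IOException {
            return new TextOutputFormat<Object, Text>();
        }

        @Override
        public void setStoreLocation(String location, Job job) throws IOException {
            // Hand the output location to the underlying OutputFormat via the Job.
            FileOutputFormat.setOutputPath(job, new Path(location));
        }

        @Override
        public void prepareToWrite(RecordWriter writer) throws IOException {
            this.writer = writer;
        }

        @Override
        public void putNext(Tuple t) throws IOException {
            // Serialize the tuple as tab-separated text; a real storer would do
            // type-aware conversion of each field.
            StringBuilder sb = new StringBuilder();
            for (int i = 0; i < t.size(); i++) {
                if (i > 0) sb.append('\t');
                Object field = t.get(i);
                sb.append(field == null ? "" : field.toString());
            }
            try {
                // TextOutputFormat writes only the value when the key is null.
                writer.write(null, new Text(sb.toString()));
            } catch (InterruptedException e) {
                throw new IOException(e);
            }
        }
    }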

StoreMetadata

/**
 * This interface defines how to write metadata related to the data being stored.
 * If a given store function does not implement this interface, it will be assumed that it
 * is unable to record metadata about the associated data.
 */

public interface StoreMetadata {

    /**
     * Store statistics about the data being written.
     * @param stats statistics to be recorded
     * @param location Location as returned by 
     * {@link StoreFunc#relToAbsPathForStoreLocation(String, Path)}
     * @param job The {@link Job} object - this should be used only to obtain 
     * cluster properties through {@link Job#getConfiguration()} and not to set/query
     * any runtime job information.  
     * @throws IOException if the statistics cannot be recorded
     */
    void storeStatistics(ResourceStatistics stats, String location, Job job) throws IOException;

    /**
     * Store the schema of the data being written.
     * @param schema the schema of the data being written
     * @param location Location as returned by 
     * {@link StoreFunc#relToAbsPathForStoreLocation(String, Path)}
     * @param job The {@link Job} object - this should be used only to obtain 
     * cluster properties through {@link Job#getConfiguration()} and not to set/query
     * any runtime job information.  
     * @throws IOException if the schema cannot be recorded
     */
    void storeSchema(ResourceSchema schema, String location, Job job) throws IOException;
}

Given the uncertainty noted above under ResourceStatistics about how statistics should be stored, it is not clear that this interface makes sense.

LoadFunc and InputFormat Interaction

The Slicer and Slice interfaces duplicate much of the functionality of the InputFormat and InputSplit interfaces provided by Hadoop. The current SliceWrapper helper class interacts with these slices to encode information about input sources into each InputSplit, so that Pig can correctly manage which processing pipeline a tuple from a given input is assigned to. Load functions also currently duplicate much of the functionality of Hadoop's RecordReader interface: they are handed a data stream and asked to parse both records and fields within records. RecordReader already provides the functionality to parse out records, though it always returns two values, a key and a value, rather than a tuple of an arbitrary number of values. An integral part of this proposal is to change LoadFunc to call RecordReader.getCurrentKey() and getCurrentValue() and then parse out the fields of the tuple from that result.

Since Pig still needs to add information to InputSplits, user-provided InputFormats and InputSplits cannot be used directly. Instead, the proposal is to change PigInputFormat to represent the job's InputFormat to Hadoop and to internally handle the complexity of multiple inputs and hence multiple InputFormats. PigInputFormat will return PigSplits, each of which contains an InputSplit. In addition, PigSplit will contain the necessary information to allow Pig to route tuples to the correct data processing pipeline.

In order to support arbitrary Hadoop InputFormats, Pig can provide a load function, InputFormatLoader, that takes an InputFormat as a constructor argument. Only InputFormats which have zero-argument constructors can be supported, since Pig will try to instantiate the supplied InputFormat using reflection. When asked by Pig which InputFormat to use, this loader will return the one indicated by the user. Its getNext() will then take the key and value provided by the associated RecordReader and construct a two-field tuple. These types will be converted to Pig types as follows:

Writable         | Pig type   | Comment
-----------------|------------|------------------------------------------------------------------------------------
Text             | chararray  |
IntWritable      | integer    |
VIntWritable     | integer    |
LongWritable     | long       |
VLongWritable    | long       |
FloatWritable    | float      |
DoubleWritable   | double     |
BooleanWritable  | int        | In the future if Pig exposes boolean as a first class type, this would change to boolean
ByteWritable     | int        |
NullWritable     | null       |
All others       | byte array | How do we construct a byte array from arbitrary types?

Since the format of any other type is unknown to Pig and cannot be generalized, it does not make sense to provide casts from byte array to Pig types via a LoadCaster. If users wish to use an InputFormat that uses types beyond these and cast them to Pig types, they can extend InputFormatLoader and return a custom-written LoadCaster that will handle casting their data types.
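A sketch of the conversion step inside a hypothetical InputFormatLoader.getNext() might look like the fragment below; only some of the mappings from the table are shown, and the byte-array fallback (using Writable serialization) is just one possible answer to the open question in the last row.

    // Fragment only: convert a Hadoop Writable into the corresponding Pig value
    // per the table above.  Uses org.apache.hadoop.io.* Writable classes,
    // org.apache.hadoop.io.WritableUtils and org.apache.pig.data.DataByteArray.
    private Object writableToPig(Writable w) {
        if (w instanceof Text) {
            return ((Text) w).toString();                        // chararray
        } else if (w instanceof IntWritable) {
            return Integer.valueOf(((IntWritable) w).get());     // integer
        } else if (w instanceof LongWritable) {
            return Long.valueOf(((LongWritable) w).get());       // long
        } else if (w instanceof FloatWritable) {
            return Float.valueOf(((FloatWritable) w).get());     // float
        } else if (w instanceof DoubleWritable) {
            return Double.valueOf(((DoubleWritable) w).get());   // double
        } else if (w instanceof BooleanWritable) {
            // int until boolean is a first-class Pig type
            return Integer.valueOf(((BooleanWritable) w).get() ? 1 : 0);
        } else if (w instanceof NullWritable) {
            return null;
        } else {
            // All other types: fall back to a byte array.  Serializing the
            // Writable is one possibility; see the open question above.
            return new DataByteArray(WritableUtils.toByteArray(w));
        }
    }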

Details

Currently, in addition to information regularly passed with an InputSplit, Pig also passes along additional information via the SliceWrapper (Pig's implementation of InputSplit). This includes an index (it's not clear what this is for, and there are no calls to SliceWrapper.getIndex() in the code; boy, comments in code sure are nice), an exec type to indicate MR or local mode, and the list of target operators (the pipeline) for this input. These items can continue to be carried along as-is in the proposed PigSplit.

Pig's Slice interface currently supports the following calls:

Positioning information presents a problem. Hadoop 0.18 has a getPos call in its RecordReader, but it has been removed in 0.20. The reason is that input from files can generally be assigned a position, though it may not always be accurate (as in the bzip case), but for some input formats position may have no meaning. Even if Pig does not switch to using InputFormats it will have to deal with this issue, just as MR has.

These changes will affect the SamplableLoader interface. Currently it uses skip and getPos to move the underlying stream so that it can pick up a sample of tuples out of a block. Since it would sit atop InputFormat it would no longer have access to the underlying stream. It would be changed instead to skip a number of tuples.

However, in some places Pig needs this position information. In particular, when building an index for a merge join, Pig needs a way to mark a location in an input while building the index and then return to that position during the join. In this new proposal, the merge join index will contain the filename and split index (the index of the split in the list returned by InputFormat.getSplits()). The merge join code at run time will then seek to the right split in the file and process from that split on. For this to work, the assumption is that if we start from a split and read from there on to the last split, we get sorted data - i.e., the order of splits returned by getSplits() preserves the data ordering. Since this in general cannot be guaranteed, the proposal to handle this is to sample the first and last keys in each split and record both values in the index entry for that split. The index is then sorted based on both the first and last key. While seeking into the appropriate file based on the join key during merge join processing, the implementation will then read the relevant splits in that file as indicated in the index (reading the splits from the matching index entry to the last index entry).
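A rough sketch of what one entry in such an index might hold (the class and field names are invented for illustration):

    import java.io.Serializable;

    // One entry of the merge join index described above: enough information to
    // locate a split, plus the sampled first and last keys used to decide which
    // splits can contain a given join key.
    public class MergeJoinIndexEntry implements Serializable {
        private static final long serialVersionUID = 1L;

        public String fileName;    // input file the split belongs to
        public int splitIndex;     // index of the split in InputFormat.getSplits()
        public Object firstKey;    // join key of the first record sampled in the split
        public Object lastKey;     // join key of the last record sampled in the split
    }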

These changes will also affect loaders that need to read a record in order to determine their schema, such as BinStorage or a JSON loader. These loaders will need to know how to read at least the first record on their own, without the benefit of the underlying InputFormat, since they will need to call this on the front end where an InputFormat has not yet been used.

In addition to opening files as part of Map-Reduce, Pig loaders also open files on the side in MR jobs. The new load interface needs to be able to open these side files as well. For this we would need to create a new instance of the appropriate InputFormat, call getSplits(), and then iterate over the splits, creating a RecordReader for each split and processing the data it returns before moving on to the next split.

Performance concerns.

With this proposal we are strongly encouraging users to double parse their data (since they will parse out the records and then parse those records into fields). This could carry a performance penalty. Also, we are removing the ability to seek when sampling, which could also carry a performance penalty. A few thoughts on this:

  1. We are already in the process of moving PigStorage to use LineRecordReader underneath, and we are getting a 30% speed up for doing it. This is only one case, but at least in this case double parsing is not hurting us.

  2. If users that are writing both an InputFormat and a load function are concerned about double parsing, there is nothing stopping them from having their InputFormat track where field boundaries are and pass that information along to their loader so that they can parse in a single pass.

  3. For sampling, it is not clear how severe the penalty is for removing seek, since Hadoop is reading much of the information off disk into its buffers anyway, and a sequential read on disk is not that much worse than a seek.

All this said, before committing to this we should do some prototyping and performance testing to convince ourselves that these changes will be performance neutral.

StoreFunc and OutputFormat Interaction

In the same way that LoadFunc currently duplicates some functionality of InputFormat, StoreFunc duplicates some functionality of OutputFormat. StoreFunc will be changed to deal primarily with converting a tuple to a key value pair that can be stored by Hadoop.

To support arbitrary OutputFormats, a new storage function, OutputFormatStorage, could be written that takes an OutputFormat as a constructor argument. Only OutputFormats which have zero-argument constructors can be supported, since Pig will try to instantiate the supplied OutputFormat using reflection. Tuples to be stored by this storage function must have either one or two fields. If they have two fields, the first will be taken to be the key and the second the value. If they have one, the key will be set to null and the value will be taken from the single field. Data type conversion on this data will be done in the same way as noted above for InputFormatLoader.
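The tuple-to-key/value mapping described above might look like this inside a hypothetical OutputFormatStorage (fragment only; writer is the RecordWriter obtained in prepareToWrite(), and pigToWritable() is an invented helper performing the reverse of the type mapping table above):

    @Override
    public void putNext(Tuple t) throws IOException {
        Object key;
        Object value;
        if (t.size() == 2) {
            key = t.get(0);       // first field is the key
            value = t.get(1);     // second field is the value
        } else if (t.size() == 1) {
            key = null;           // no key; the single field is the value
            value = t.get(0);
        } else {
            throw new IOException(
                "OutputFormatStorage expects tuples with one or two fields, got " + t.size());
        }
        try {
            writer.write(pigToWritable(key), pigToWritable(value));
        } catch (InterruptedException e) {
            throw new IOException(e);
        }
    }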

Open Questions:

  1. Does all this force us to switch to Hadoop for local mode as well? We aren't opposed to using Hadoop for local mode; it just needs to get reasonably fast. Can we use InputFormat et al. on local files without using the whole HDFS structure? Answer: According to the Hadoop documentation, TextInputFormat works on local files as well as HDFS files. We may need to detect that we are in local mode and change the filename to file://, OR change to using Hadoop's local mode.

  2. How will we work with compressed files? FileInputFormat already works with bzip and gzip compressed files, producing reasonable splits. PigStorage will be reworked to depend on FileInputFormat (or a descendant thereof; see the next item) and should therefore be able to use this functionality. Currently Pig supports gzip/bzip for arbitrary loadfunc/storefunc combinations. With this proposal, gzip/bzip formats will only be supported for load/store using PigStorage.

Implementation details and status

Current status

https://issues.apache.org/jira/browse/PIG-966 is the main JIRA for tracking progress. A branch, 'load-store-redesign' (http://svn.apache.org/repos/asf/hadoop/pig/branches/load-store-redesign), has been created to undertake work on this proposal.

Status as of Nov 2, 2009: This branch has simple load-store working for PigStorage and BinStorage. Joins on multiple inputs and multi-store queries with multi-query optimization also work. Some of the recent changes in the proposal above (the changes noted under Nov 2, 2009 in the Changes section below) have not been incorporated. A list (possibly not comprehensive) of remaining tasks is given in a subsection below.

Notes on implementation details

This section is to document changes made at a high level to give an overall connected picture which code comments may not provide.

Changes to work with Hadoop InputFormat model

Hadoop has the notion of a single InputFormat per job. This is restrictive for Pig, since Pig processes multiple inputs in the same map reduce job (in the case of Join, Union, or Cogroup). This is handled by PigInputFormat, which is the InputFormat Pig communicates to Hadoop as the job's InputFormat. In PigInputFormat.getSplits(), the implementation processes each input in the following manner:

The list of target operators helps Pig route the tuples from an input to the correct part of the pipeline in a multi-input pipeline (as in join, cogroup, or union).

The other method in InputFormat is createRecordReader(), which needs to be given a TaskAttemptContext. The Configuration present in the TaskAttemptContext needs to contain any information that might have been put into it as a result of the LoadFunc.setLocation() call above. However, PigInputFormat.getSplits() is called in the front-end by Hadoop while PigInputFormat.createRecordReader() is called in the back-end. So we would need to somehow pass a map from each input to its input-specific Configuration (updated with location and other information from the relevant LoadFunc.setLocation() call) from the front-end to the back-end. One way to pass this map would be in the Configuration of the JobContext passed to PigInputFormat.getSplits(). However, in Hadoop 0.20.1 the Configuration present in the JobContext passed to PigInputFormat.getSplits() is only a copy of the Configuration that is serialized to the back-end and used to create the TaskAttemptContext passed to PigInputFormat.createRecordReader(), so changes made to it in getSplits() never reach the back-end. Hence passing the map this way is not possible. Instead, we re-create the side effects of the LoadFunc.setLocation() call made in PigInputFormat.getSplits() inside PigInputFormat.createRecordReader(), through the following sequence:

Open Question: We are hoping that the LoadFunc actually sets up the input location in the Configuration in the setLocation() call, since that Configuration is then used in the createRecordReader() call - what if it does this in getInputFormat() instead?

Changes to work with Hadoop OutputFormat model

Hadoop has the notion of a single OutputFormat per job. PigOutputFormat is the class indicated by Pig as the OutputFormat for map reduce jobs compiled from pig scripts.

In PigOutputFormat.checkOutputSpecs(), the implementation iterates over the POStore(s) in the map and reduce phases and for each such store does the following:

PigOutputFormat.getOutputCommitter() returns a PigOutputCommitter object. The PigOutputCommitter internally keeps a list of OutputCommitters corresponding to OutputFormat of StoreFunc(s) in the POStore(s) in the map and reduce phases. It delegates all calls in the OutputCommitter class invoked by Hadoop to calls on the appropriate underlying committers.

The other method in OutputFormat is the getRecordWriter() method. In the single store case PigOutputFormat.getRecordWriter() does the following:

For the multi-query optimized multi-store case, there are multiple POStores in the same map reduce job. In this case, the data is written out in the Pig map or reduce pipeline itself through the POStore operator. Details of this can be found in http://wiki.apache.org/pig/PigMultiQueryPerformanceSpecification - "Internal Changes" section - "Store Operator" subsection. So from the Pig runtime code, we never call Context.write() (which would have internally called PigRecordWriter.write()). So the handling of writing data out for multiple stores has not changed with this redesign.

Mechanism to read side files

Pig needs to read side files in many places, such as in merge join, order by, skew join, dump, etc. To facilitate doing this easily, a utility LoadFunc called ReadToEndLoader has been introduced. Though it is implemented as a LoadFunc, the only LoadFunc method that is truly implemented is getNext(). The usage pattern is to construct an instance using a constructor which takes a reference to the true LoadFunc (which can read the side file data) and then to repeatedly call getNext() until null is returned. The implementation of ReadToEndLoader hides the work of getting the InputSplits from the underlying InputFormat and then processing each split by getting its RecordReader and processing the data in the split before moving to the next.
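A typical usage might look like the sketch below. The constructor arguments shown (wrapped loader, Hadoop Configuration, side file location, index of the first split to read) reflect the description above but are an assumption about the exact signature; conf and sideFileLocation are assumed to be in scope.

    // Read an entire side file with the loader that understands its format.
    LoadFunc loader = new ReadToEndLoader(new BinStorage(), conf, sideFileLocation, 0);
    Tuple t;
    while ((t = loader.getNext()) != null) {
        // process t
    }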

Changes to skew join sampling (PoissonSampleLoader)

See discussion in PIG-1062 .

Problem 1: The earlier version of PoissonSampleLoader stored the size on disk as an extra last column in the sampled tuples it returned in the map phase of the sampling MR job. This was used by the PartitionSkewedKeys udf in the reduce stage of the sampling job to compute the total number of tuples as input-file-size / avg-disk-size-from-samples. The average disk size from samples is not available with the new loader design, because getPosition() is no longer there.

Solution: PoissonSampleLoader returns a special tuple with the number of rows in that map, in addition to the sampled tuples. To create this special tuple, the maximum row length among the sampled input tuples is tracked, and a new tuple of size max_row_length + 2 is created, with spl_tuple[max_row_length] = "marker_string".

The size of max_row_length + 2 is used because the join key can be an expression which is evaluated on the columns of the tuples returned by the sampler, and the expression might expect specific data types to be present in certain (<= max_row_length) positions of the tuple. If the number of tuples in the sample is 0, the special tuple is not sent.

In the reduce stage, the PartitionSkewedKeys udf iterates over the tuples to find these special tuples and calculates the total number of rows.

Problem 2: PoissonSampleLoader samples 17 tuples from every set of tuples that will fit into reducer memory (see PigSkewedJoinSpec). Let us call the number of tuples that fit into reducer memory X; i.e., we need to sample one tuple every X/17 tuples. Earlier, the number of tuples to be sampled was calculated before the tuples were read, in PoissonSampleLoader.computeSamples(..). The number of samples to take in a map was computed as:

    number-of-samples = number-of-reducer-memories-needed * 17 / number-of-splits
    number-of-reducer-memories-needed = (total_file_size * disk_to_mem_factor) / available_reducer_heap_size

where disk_to_mem_factor has a default of 2.

PoissonSampleLoader would then return sampled tuples by skipping split-size / num_samples bytes at a time.

With the new loader we have to skip some number of tuples instead of bytes, but we don't have an estimate of the total number of tuples in the input. One way to work around this would be to use the size of a tuple in memory to estimate its size on disk via the disk_to_mem_factor above; the number of tuples to skip would then be (split-size / avg_mem_size_of_tuple) / numSamples.

But the use of disk_to_mem_factor is very dubious: the real disk-to-memory factor will vary based on the compression algorithm, data characteristics (sorting etc.), and encoding.

Solution: The goal is to sample one tuple every X/17 tuples (X = the number of tuples that fit in the available reducer memory). To estimate X, we can use available_reducer_heap_size / average-tuple-mem-size, so:

    number-of-tuples-skipped-per-sampled-tuple = 1/17 * (available_reducer_heap_size / average-tuple-mem-size)

The average tuple memory size, and hence the number of tuples to skip before the next sample, is recalculated after each new tuple is sampled, as sketched below.
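In code, the recalculation described above amounts to something like the following sketch (the method and variable names are illustrative only):

    // Recompute how many tuples to skip between samples, using the running
    // average in-memory tuple size.  numSamplesPerReducerMemory corresponds to
    // the constant 17 above.
    long computeSkipInterval(long availableReducerHeapBytes,
                             long avgTupleMemSizeBytes,
                             int numSamplesPerReducerMemory) {
        // X = number of tuples that fit in the available reducer memory
        long tuplesPerReducerMemory = availableReducerHeapBytes / avgTupleMemSizeBytes;
        // sample one tuple every X / 17 tuples
        return Math.max(1, tuplesPerReducerMemory / numSamplesPerReducerMemory);
    }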

Changes to order-by sampling (RandomSampler)

Problem: With the new interface, we cannot use the old approach, which was, in each map task of the sampling MR job, to divide the split size by the number of samples required in order to determine the number of bytes to skip after each sample. With the new interface we can only skip by a number of tuples, not by bytes on disk.

Proposal: The goal is to sample tuples such that every tuple has an equal probability of being sampled (assuming the number of tuples to be sampled is much smaller than the total number of tuples). One way to achieve this without knowing the total number of tuples up front is sketched below.
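For illustration only, a standard technique that satisfies this goal without knowing the total tuple count in advance is reservoir sampling; the sketch below is not necessarily what the sampler ultimately implements (numSamples and loader are assumed to be in scope, and java.util.ArrayList, List, Random are assumed imported):

    // Keep a reservoir of numSamples tuples such that, at any point, every
    // tuple seen so far has had the same probability of being retained.
    List<Tuple> reservoir = new ArrayList<Tuple>(numSamples);
    Random rand = new Random();
    long seen = 0;
    Tuple t;
    while ((t = loader.getNext()) != null) {
        seen++;
        if (reservoir.size() < numSamples) {
            reservoir.add(t);
        } else {
            // Replace an existing sample with probability numSamples / seen.
            long j = (long) (rand.nextDouble() * seen);
            if (j < numSamples) {
                reservoir.set((int) j, t);
            }
        }
    }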

Changes to Streaming

Currently Pig streaming uses store and load functions to handle the serialization and deserialization of data between Pig and the user's executable. Based on the changes proposed in this redesign, that will no longer be feasible. Hadoop's InputFormat and OutputFormat are not appropriate abstractions for communicating with an external executable.

Pig's current support for variable data formats in streaming is more extensive than Hadoop's. Conceptually, users could create executables that read data from BinStorage or other non-text formats. The Pig development team is not aware of any users doing this, nor is it clear why they would choose to. Removing this functionality will make Pig's streaming implementation simpler. It also avoids shoe-horning streaming into the new load/store design. (For complete details of Hadoop's streaming see http://hadoop.apache.org/common/docs/r0.20.1/streaming.html .)

I propose that Pig move to a model of using Hadoop's default streaming format, which is to expect newline-separated records with tab used as the field separator. Hadoop allows users to redefine the field separator, and so should Pig. This will also match the current default of using PigStorage as the (de)serializer for streaming. As before, Pig should support communicating with the executable via either stdin/stdout or files. This will force a syntax change in Pig Latin. Currently, if a user wants to stream data to an executable with comma-separated fields instead of tab-separated fields, the syntax is:

define CMD `perl PigStreaming.pl - foo nameMap` input(stdin using PigStorage(',')) output(stdout using PigStorage(','));
A = load 'file';
B = stream A through CMD;

The syntax should change the references from store and load functions to the PigToStream and StreamToPig functions defined below. Thus the above would become:

define CMD `perl PigStreaming.pl - foo nameMap` input(stdin using PigStreaming(',')) output(stdout using PigStreaming(','));
A = load 'file';
B = stream A through CMD;

From an implementation viewpoint, the functionality required to write to and read from the streaming binary will be equivalent to the tuple parsing and serialization of PigStorage.getNext() and PigStorage.putNext(). While it will not be possible to use PigStorage directly, every effort should be made to share this code (most likely by putting the actual code in static utility methods that can be called by each class) to avoid maintaining the code twice.

It has been suggested that we should switch to the typed bytes protocol that is available in Hadoop and Hive. Whatever format is chosen, the (de)serialization of data across the streaming connection should be an interface so that users can easily extend it in the future. The interface should be quite simple:

    interface PigToStream {

        /**
         * Given a tuple, produce an array of bytes to be passed to the streaming
         * executable.
         */
        public byte[] serialize(Tuple t) throws IOException;
    }

    interface StreamToPig {

        /**
         *  Given a byte array from a streaming executable, produce a tuple.
         */
        public Tuple deserialize(byte[] b) throws IOException;

        /**
         * This will be called on the front end during planning and not on the back
         * end during execution.
         *
         * @return the {@link LoadCaster} associated with this object.
         * @throws IOException if there is an exception during LoadCaster
         * construction
         */
        public LoadCaster getLoadCaster() throws IOException;
    }

The default implementation of this would be as suggested above. The syntax for describing how data is (de)serialized would then stay as it currently is, except that instead of giving a StoreFunc the user would specify a PigToStream, and instead of specifying a LoadFunc, a StreamToPig.
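For illustration, a minimal delimiter-based PigToStream along the lines of the default described above might look like this (the class name and details are hypothetical; the real default would share code with PigStorage as noted earlier):

    import java.io.ByteArrayOutputStream;
    import java.io.IOException;
    import org.apache.pig.data.Tuple;

    // Sketch of a delimited serializer for the streaming interface above.
    public class DelimitedPigToStream implements PigToStream {

        private final byte fieldDelimiter;

        public DelimitedPigToStream(String delimiter) {
            this.fieldDelimiter = (byte) delimiter.charAt(0);
        }

        @Override
        public byte[] serialize(Tuple t) throws IOException {
            ByteArrayOutputStream out = new ByteArrayOutputStream();
            for (int i = 0; i < t.size(); i++) {
                if (i > 0) {
                    out.write(fieldDelimiter);
                }
                Object field = t.get(i);
                if (field != null) {
                    out.write(field.toString().getBytes("UTF-8"));
                }
            }
            out.write('\n');   // record terminator expected by the executable
            return out.toByteArray();
        }
    }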

Additionally, it has been noted that this change takes away the current optimization of Pig Latin scripts such as the following:

A = load 'myfile' split by 'file';
B = stream A through 'mycmd';
store B into 'outfile';

In this case Pig will optimize the query by removing the load function and replacing it with BinaryStorage, a function which simply passes the data as-is to the streaming executable. It does no record or field parsing. Similarly, the store in the above script would be replaced with BinaryStorage.

We have two options to replace this. First, we could say that if a class implementing PigToStream also implements InputFormat, then Pig will drop the Load statement and use that InputFormat directly to load data and then pass the results to the stream. The same would be done with StreamToPig, OutputFormat and store. Second, we could create IdentityLoader and IdentityStreamToPig functions. IdentityLoader.getNext would return a tuple that just had one bytearray, which would be the entire record. This would then be a trivial serialization via the default PigToStream. Similarly IdentityStreamToPig would take the bytes returned by the stream and put them in a tuple of a single bytearray. The store function would then naturally translate this tuple into the underlying bytes. Functionally these are basically equivalent, since Pig would need to write code similar to the IdentityLoader etc. for the second case. So I believe the primary difference is in how it is presented to the user not the functionality or code written underneath.

Both of these approaches suffer from the problem that they assume TextInputFormat and TextOutputFormat. For any other IF/OF it will not be clear how to parse key, value pairs out of the stream data.

This optimization represents a fair amount of work. As the current optimization is not documented, it is not clear how many users are using it. Based on that I vote that we do not implement this optimization until such time as we see a need for it.

Remaining Tasks

Changes

Sept 23 2009, Gates

Sept 25 2009, Gates

Sept 29 2009, Gates

Nov 2 2009, Pradeep Kamath

In LoadFunc:

In LoadMetadata:

In StoreFunc:

Added a new section 'Implementation details and status'

Nov 11, Dmitriy Ryaboy

Nov 12 2009, Thejas Nair Added sections -

Nov 23 2009, Gates

Nov 23 2009, Dmitriy Ryaboy

Nov 25 2009, Gates

Dec 16 2009, Richard Ding

Mar 1 2010, Pradeep Kamath
