Proposed Redesign For Load, Store, and Slicer in Pig
Goals
The current design of the LoadFunc, StoreFunc, and Slicer interfaces in Pig is not adequate. This proposed redesign has the following goals:
The Slicer interface is redundant. Remove it and allow users to directly use Hadoop InputFormats in Pig.
It is not currently easy to use a separate OutputFormat for a StoreFunc. This should be made easy so that users can store data in locations other than HDFS.
Currently, users who wish to operate on both Pig and Map-Reduce are required to write Hadoop InputFormats and OutputFormats as well as Pig load and store functions. While Pig load and store functions will always be necessary to take full advantage of Pig, it would be good for users to be able to use Hadoop InputFormat and OutputFormat classes directly to minimize the data interchange cost.
New storage formats such as Zebra are being implemented for Hadoop that include metadata information such as schema, etc. The LoadFunc interface needs to allow Pig to obtain this metadata. There is a describeSchema call in the current interface. More functions may be necessary.
- These new storage formats also plan to support pushing at least projection and selection down into the storage layer. Pig needs to be able to query loaders to determine what, if any, pushdown capabilities they support and then make use of those capabilities.
- There already exists one metadata system in Hadoop (Hive's metastore), and there is a proposal to add another (Owl). Pig needs to be able to query these metadata systems for information about data to be read. It also needs to be able to record information in these metadata systems when writing data. The load and store functions are a reasonable place for these operations, since that is the point at which Pig reads and writes data. This will also allow Pig to read and write data from and to multiple metadata stores in a single Pig Latin script if that is desired.
One requirement for the implementation does not fit into the goals above. The existing Pig implementation is tightly tied to Hadoop and is becoming more so all the time, but we do not want to tie Pig Latin tightly to Hadoop. Therefore, while we plan to allow users to easily interact with Hadoop InputFormats and OutputFormats, these should not be exposed as such to Pig Latin. Pig Latin must still view these as load and store functions; only the underlying implementation will know that they are Hadoop classes and handle them appropriately.
Interfaces
With these proposed changes, load and store functions in Pig are becoming very weighty objects. The current LoadFunc interface already provides mechanisms for reading the data, getting some schema information, casting data, and some placeholders for pushing projections down into the loader. This proposal will add more file-level metadata, selection pushdown, and interaction with InputFormats. It will also add OutputFormats to store functions. If we create two monster interfaces that attempt to provide everything, the burden of creating a new load or store function in Pig will become overwhelming. Instead, this proposal envisions splitting the interface into a number of interfaces, each with a clear responsibility. Load and store functions will then only be required to implement the interfaces for the functionality they offer.
For load functions:
LoadFunc will be pared down to just contain functions directly associated with reading data, such as getNext.
A new LoadCaster interface will be added. This interface will contain all of the bytesToX methods currently in LoadFunc. LoadFunc will add a getLoadCaster routine that returns an object that can provide casts. The existing UTF8StorageConverter class will change to implement this interface. Load functions will then be free to use this class as their caster or to provide their own. Existing load functions that provide all of the bytesToX methods can implement the LoadCaster interface and return themselves from getLoadCaster. If a loader does not provide a LoadCaster, casts from byte array to other Pig types will not be supported for data loaded via that loader.
A new LoadMetadata interface will be added. Calls that find metadata about the data being loaded, such as determineSchema, will be placed in this interface. If a loader does not implement this interface, then Pig will assume that no metadata is obtainable for this data.
A new LoadPushDown interface will be added. Calls that determine what can be pushed down, and calls that push that functionality down into the loader, will be placed in this interface. If a loader does not implement this interface, Pig will assume that the loader is not capable of pushing down any functionality.
For store functions:
A new method getOutputFormat will be added to StoreFunc to allow a storage function to return its output format.
A new interface StoreMetadata will be added to provide a way for storage functions to record metadata. If a given storage function does not implement this interface, Pig will assume that it is unable to record metadata.
Details
LoadFunc
/**
* This interface is used to implement functions to parse records
* from a dataset.
*/
public interface LoadFunc {
/**
* This method is called by the Pig runtime in the front end to convert the
* input location to an absolute path if the location is relative. The
* loadFunc implementation is free to choose how it converts a relative
* location to an absolute location since this may depend on what the location
* string represents (an HDFS path or some other data source).
*
* @param location location as provided in the "load" statement of the script
* @param curDir the current working directory based on any "cd" statements
* in the script before the "load" statement. If there are no "cd" statements
* in the script, this would be the home directory -
* <pre>/user/<username> </pre>
* @return the absolute location based on the arguments passed
* @throws IOException if the conversion is not possible
*/
String relativeToAbsolutePath(String location, Path curDir) throws IOException;
/**
* Communicate to the loader the location of the object(s) being loaded.
* The location string passed to the LoadFunc here is the return value of
* {@link LoadFunc#relativeToAbsolutePath(String, Path)}.
*
* This method will be called multiple times in the backend, so implementations
* should ensure that repeated calls do not cause inconsistent side effects.
*
* @param location Location as returned by
* {@link LoadFunc#relativeToAbsolutePath(String, Path)}.
* @param job the {@link Job} object
* @throws IOException if the location is not valid.
*/
void setLocation(String location, Job job) throws IOException;
/**
* This will be called during planning on the front end. This is the
* instance of InputFormat (rather than the class name) because the
* load function may need to instantiate the InputFormat in order
* to control how it is constructed.
* @return the InputFormat associated with this loader.
* @throws IOException if there is an exception during InputFormat
* construction
*/
InputFormat getInputFormat() throws IOException;
/**
* This will be called on the front end during planning and not on the back
* end during execution.
* @return the {@link LoadCaster} associated with this loader. Returning null
* indicates that casts from byte array are not supported for this loader.
* @throws IOException if there is an exception during LoadCaster
* construction
*/
LoadCaster getLoadCaster() throws IOException;
/**
* Initializes LoadFunc for reading data. This will be called during execution
* before any calls to getNext. The RecordReader needs to be passed here because
* it has been instantiated for a particular InputSplit.
* @param reader {@link RecordReader} to be used by this instance of the LoadFunc
* @param split The input {@link PigSplit} to process
* @throws IOException if there is an exception during initialization
*/
void prepareToRead(RecordReader reader, PigSplit split) throws IOException;
/**
* Retrieves the next tuple to be processed.
* @return the next tuple to be processed or null if there are no more tuples
* to be processed.
* @throws IOException if there is an exception while retrieving the next
* tuple
*/
Tuple getNext() throws IOException;
}
The LoadCaster interface will include the bytesToInt, bytesToLong, etc. functions currently in LoadFunc. UTF8StorageConverter will implement this interface.
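The following is a minimal, self-contained sketch of how casting is separated from reading. The interface shows only a hypothetical subset of the bytesToX methods, with names taken from the text above. Utf8TextCaster decodes bytes as UTF-8 in the spirit of UTF8StorageConverter. Mapping unparsable input to null is an assumption of this sketch, not a statement of Pig's behavior. SelfCastingLoader shows a loader that returns itself from getLoadCaster, as described for existing load functions.

```java
import java.nio.charset.StandardCharsets;

/** Hypothetical subset of the proposed LoadCaster interface. */
interface LoadCaster {
    Integer bytesToInt(byte[] b);
    Long bytesToLong(byte[] b);
    String bytesToCharArray(byte[] b);
}

/** Sketch of a UTF-8 text caster; names and null-on-failure behavior are assumptions. */
class Utf8TextCaster implements LoadCaster {
    private static String decode(byte[] b) {
        return b == null ? null : new String(b, StandardCharsets.UTF_8);
    }

    public Integer bytesToInt(byte[] b) {
        String s = decode(b);
        if (s == null) return null;
        try {
            return Integer.valueOf(s);
        } catch (NumberFormatException e) {
            return null; // assumed: a value that does not parse casts to null
        }
    }

    public Long bytesToLong(byte[] b) {
        String s = decode(b);
        if (s == null) return null;
        try {
            return Long.valueOf(s);
        } catch (NumberFormatException e) {
            return null;
        }
    }

    public String bytesToCharArray(byte[] b) {
        return decode(b);
    }
}

/** Hypothetical loader that provides the bytesToX methods itself. */
class SelfCastingLoader extends Utf8TextCaster {
    LoadCaster getLoadCaster() {
        return this; // the loader is its own caster
    }
}
```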
Open Question: Should the methods to convert to a Bag, Tuple and Map take a Schema (ResourceSchema?) argument?
LoadMetadata
/**
* This interface defines how to retrieve metadata related to data to be loaded.
* If a given loader does not implement this interface, it will be assumed that it
* is unable to provide metadata about the associated data.
*/
public interface LoadMetadata {
/**
* Get a schema for the data to be loaded.
* @param location Location as returned by
* {@link LoadFunc#relativeToAbsolutePath(String, Path)}
* @param conf The {@link Configuration} object
* @return schema for the data to be loaded. This schema should represent
* all tuples of the returned data. If the schema is unknown or it is
* not possible to return a schema that represents all returned data,
* then null should be returned.
* @throws IOException if an exception occurs while determining the schema
*/
ResourceSchema getSchema(String location, Configuration conf) throws
IOException;
/**
* Get statistics about the data to be loaded. If no statistics are
* available, then null should be returned.
* @param location Location as returned by
* {@link LoadFunc#relativeToAbsolutePath(String, Path)}
* @param conf The {@link Configuration} object
* @return statistics about the data to be loaded. If no statistics are
* available, then null should be returned.
* @throws IOException if an exception occurs while retrieving statistics
*/
ResourceStatistics getStatistics(String location, Configuration conf)
throws IOException;
/**
* Find what columns are partition keys for this input.
* @param location Location as returned by
* {@link LoadFunc#relativeToAbsolutePath(String, Path)}
* @param conf The {@link Configuration} object
* @return array of field names of the partition keys.
* @throws IOException if an exception occurs while retrieving partition keys
*/
String[] getPartitionKeys(String location, Configuration conf)
throws IOException;
/**
* Set the filter for partitioning. It is assumed that this filter
* will only contain references to fields given as partition keys in
* getPartitionKeys.
* @param plan the plan that describes the filter for partitioning
* @throws IOException if the filter is not compatible with the storage
* mechanism or contains non-partition fields.
*/
void setPartitionFilter(OperatorPlan plan) throws IOException;
}
ResourceSchema will be a top-level object (org.apache.pig.ResourceSchema) used to communicate information about data to be loaded or that is being stored. It is not the same as the existing org.apache.pig.impl.logicalLayer.schema.Schema.
public class ResourceSchema {
int version;
public static class ResourceFieldSchema {
public String name;
public byte type; // one of the org.apache.pig.data.DataType constants
public String description;
public ResourceSchema schema; // nested tuples and bags will have their own schema
}
public ResourceFieldSchema[] fields;
public enum Order { ASCENDING, DESCENDING }
public int[] sortKeys; // each entry is an offset into the fields array.
public Order[] sortKeyOrders;
}
Feedback from Pradeep: We must fix the two-level access issues with the schema of bags in the current schema before we make these changes; otherwise that same contagion will afflict us here.
ResourceStatistics
public class ResourceStatistics {
public static class ResourceFieldStatistics {
int version;
public enum Distribution {UNIFORM, NORMAL, POWER}
public long numDistinctValues; // number of distinct values represented in this field
public Distribution distribution; // how values in this field are distributed
// We need some way to represent a histogram of values in the field,
// as those will be useful. However, we can't count on being
// able to hold such histograms in memory. Have to figure out
// how they can be kept on disk and represented here.
// Probably more in here
}
public long mBytes; // "disk" size in megabytes (file size or equivalent)
public long numRecords; // number of records
public ResourceFieldStatistics[] fields;
// Probably more in here
}
At this point, ResourceStatistics is poorly understood, and in initial versions we may choose not to implement it. In addition to questions about what should be in the statistics, there are questions about how statistics should be communicated in relation to partitions. For example, when loading from a table that is stored in Owl or Hive, one or more partitions may be loaded. Assuming statistics are kept at the partition level, how are these statistics then communicated to Pig? Is it the loader's job to combine the statistics for all of the partitions being read? Or does it return an array of ResourceStatistics? If so, what does Pig do with it, since it does not know which tuples belong to which partitions (and does not want to know)? The store side is even worse. Any statistics Pig has to report cover all of the data being stored, but the storage function underneath may choose to partition the data. How does it then separate those statistics for the different partitions? In these cases, should store functions be in charge of calculating statistics? Perhaps Pig should calculate and send down the statistics that can easily be distributed across partitions (such as distribution types), and leave the rest to the store functions. Alternatively, statistics on store could be driven by callbacks that Pig defines and implements, which store functions could call after the data has been partitioned if they want to.
LoadPushDown
/**
* This interface defines how to communicate to Pig what functionality can
* be pushed into the loader. If a given loader does not implement this interface
* it will be assumed that it is unable to accept any functionality for push down.
*/
interface LoadPushDown {
/**
* Set of possible operations that Pig can push down to a loader.
*/
enum OperatorSet {PROJECTION, SELECTION};
/**
* Determine the operators that can be pushed to the loader.
* Note that by indicating a loader can accept a certain operator
* (such as selection) the loader is not promising that it can handle
* all selections. When it is passed the actual operators to
* push down it will still have a chance to reject them.
* @return list of all features that the loader can support
*/
List<OperatorSet> getFeatures();
/**
* Propose a set of operators to push to the loader.
* @param plan Plan containing proposed operators
* @return true if the loader accepts the plan, false if not.
* If false is returned Pig may choose to trim the plan and call
* this method again.
*/
boolean pushOperators(OperatorPlan plan);
}
An open question for LoadPushDown is how to communicate what needs to be pushed down. In the above, OperatorPlan is envisioned as a simple syntax tree that is sufficient to communicate operators and their expressions to the storage functions. Initially we considered using the LogicalPlan as is, but our LogicalPlan is a mess right now. It is also tightly tied to our implementation, so exposing it as an interface is unwise. We could use a SQL-like string, but then loaders would have to implement a parser.
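The accept/trim/retry protocol described for pushOperators can be sketched as follows. Because the proposal leaves the plan representation open, this sketch models OperatorPlan as a hypothetical ordered list of operator strings. It also assumes that Pig trims operators from the end of the plan. Neither choice is part of the proposal.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/** Hypothetical stand-in for OperatorPlan: an ordered list of operator descriptions. */
class OperatorPlan {
    final List<String> operators;

    OperatorPlan(List<String> operators) {
        this.operators = new ArrayList<>(operators);
    }
}

interface LoadPushDown {
    enum OperatorSet { PROJECTION, SELECTION }

    List<OperatorSet> getFeatures();

    boolean pushOperators(OperatorPlan plan);
}

/** A loader that advertises projection and rejects any plan containing anything else. */
class ProjectionOnlyLoader implements LoadPushDown {
    OperatorPlan accepted; // last plan this loader agreed to handle

    public List<OperatorSet> getFeatures() {
        return Collections.singletonList(OperatorSet.PROJECTION);
    }

    public boolean pushOperators(OperatorPlan plan) {
        for (String op : plan.operators) {
            if (!op.startsWith("project")) {
                return false; // advertising a feature does not oblige the loader to accept every plan
            }
        }
        accepted = plan;
        return true;
    }
}

/** Pig's side: offer the whole plan, then shorter prefixes, until the loader accepts one. */
class PushDownNegotiator {
    static int negotiate(LoadPushDown loader, List<String> proposed) {
        if (loader.getFeatures().isEmpty()) {
            return 0; // nothing can be pushed to this loader
        }
        for (int n = proposed.size(); n > 0; n--) {
            if (loader.pushOperators(new OperatorPlan(proposed.subList(0, n)))) {
                return n; // number of operators the loader took over
            }
        }
        return 0;
    }
}
```

For example, offering a projection followed by a filter to ProjectionOnlyLoader first fails. The trimmed plan containing only the projection is then accepted.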
StoreFunc
/**
* This interface is used to implement functions to write records
* to a dataset.
*/
public interface StoreFunc {
/**
* This method is called by the Pig runtime in the front end to convert the
* output location to an absolute path if the location is relative. The
* StoreFunc implementation is free to choose how it converts a relative
* location to an absolute location since this may depend on what the location
* string represents (an HDFS path or some other data source).
*
* @param location location as provided in the "store" statement of the script
* @param curDir the current working directory based on any "cd" statements
* in the script before the "store" statement. If there are no "cd" statements
* in the script, this would be the home directory -
* <pre>/user/<username> </pre>
* @return the absolute location based on the arguments passed
* @throws IOException if the conversion is not possible
*/
String relToAbsPathForStoreLocation(String location, Path curDir) throws IOException;
/**
* Return the OutputFormat associated with StoreFunc. This will be called
* on the front end during planning and not on the backend during
* execution.
* @return the {@link OutputFormat} associated with StoreFunc
* @throws IOException if an exception occurs while constructing the
* OutputFormat
*
*/
OutputFormat getOutputFormat() throws IOException;
/**
* Communicate to the store function the location used in Pig Latin to refer
* to the object(s) being stored. That is, if the PL script is
* <b>store A into 'bla'</b>
* then 'bla' is the location. This location should be either a file name
* or a URI. If it does not have a URI scheme Pig will assume it is a
* filename. This will be
* called during planning on the front end, not during execution on
* the backend.
* @param location Location indicated in store statement.
* @param job The {@link Job} object
* @throws IOException if the location is not valid.
*/
void setStoreLocation(String location, Job job) throws IOException;
/**
* Check the schema for data to be stored. This will be called on the
* front end during planning. A store function should implement this method to
* check that a given schema is acceptable to it. For example, it
* can check that the correct partition keys are included;
* a storage function to be written directly to an OutputFormat can
* make sure the schema will translate in a well-defined way.
* @param s the schema to be checked
* @throws IOException if this schema is not acceptable. It should include
* a detailed error message indicating what is wrong with the schema.
*/
void checkSchema(ResourceSchema s) throws IOException;
/**
* Initialize StoreFunc to write data. This will be called during
* execution before any calls to putNext.
* @param writer RecordWriter to use.
* @throws IOException if an exception occurs during initialization
*/
void prepareToWrite(RecordWriter writer) throws IOException;
/**
* Write a tuple to the output stream to which this instance was
* previously bound.
*
* @param t the tuple to store.
* @throws IOException if an exception occurs during the write
*/
void putNext(Tuple t) throws IOException;
}
StoreMetadata
/**
* This interface defines how to write metadata related to data being stored.
* If a given store function does not implement this interface, it will be assumed that it
* is unable to record metadata about the associated data.
*/
interface StoreMetadata {
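/**
* Set statistics about the data being written.
* @param stats statistics about the data being written
* @param location location of the data being written
* @param conf the {@link Configuration} object
* @throws IOException if the statistics cannot be recorded
*/
void setStatistics(ResourceStatistics stats, String location, Configuration conf) throws IOException;
/**
* Set the schema of the data being written.
* @param schema schema of the data being written
* @param location location of the data being written
* @param conf the {@link Configuration} object
* @throws IOException if the schema cannot be recorded
*/
void setSchema(ResourceSchema schema, String location, Configuration conf) throws IOException;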
}
Given the uncertainty noted above under ResourceStatistics about how statistics should be stored, it is not clear that this interface makes sense.
LoadFunc and InputFormat Interaction
The Slicer and Slice interfaces duplicate much of the InputFormat and InputSplit interfaces provided by Hadoop. The current SliceWrapper helper class interacts with these slices to encode information about input sources into each InputSplit, so that Pig can correctly manage which processing pipeline a tuple from a given input is assigned to. Load functions also duplicate much of the functionality of Hadoop's RecordReader interface: currently they are given a data stream and asked to parse both the records and the fields within records. RecordReader already provides the functionality to parse out records, though it always returns two values, a key and a value, rather than a tuple of any number of values. An integral part of this proposal is to change LoadFunc to call RecordReader.getCurrentKey and getCurrentValue and then parse the fields of the tuple out of that result.
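The following sketch shows the intended division of labor: the RecordReader finds records and the LoadFunc splits each record into fields. LineRecordReaderStub and TabDelimitedLoader are hypothetical stand-ins, and tuples are modeled as List&lt;Object&gt;. The nextKeyValue/getCurrentValue names mirror Hadoop's new RecordReader API, but the sketch does not depend on Hadoop.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

/** Hypothetical stand-in for a RecordReader that yields one text value per record. */
class LineRecordReaderStub {
    private final Iterator<String> lines;
    private String current;

    LineRecordReaderStub(List<String> lines) {
        this.lines = lines.iterator();
    }

    boolean nextKeyValue() {
        if (!lines.hasNext()) return false;
        current = lines.next();
        return true;
    }

    String getCurrentValue() {
        return current;
    }
}

/** Hypothetical tab-delimited loader: records come from the reader, fields are parsed here. */
class TabDelimitedLoader {
    private LineRecordReaderStub reader;

    void prepareToRead(LineRecordReaderStub reader) {
        this.reader = reader;
    }

    /** Returns the next tuple (a list of fields), or null when the input is exhausted. */
    List<Object> getNext() {
        if (!reader.nextKeyValue()) return null;
        String[] fields = reader.getCurrentValue().split("\t", -1); // -1 keeps empty fields
        return new ArrayList<Object>(Arrays.asList(fields));
    }
}
```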
Since Pig still needs to add information to InputSplits, user-provided InputFormats and InputSplits cannot be used directly. Instead, the proposal is to change PigInputFormat to represent the job's InputFormat to Hadoop and to handle internally the complexity of multiple inputs, and hence multiple InputFormats. PigInputFormat will return PigSplits, each of which contains an InputSplit. In addition, PigSplit will contain the information Pig needs to route tuples to the correct data processing pipeline.
In order to support arbitrary Hadoop InputFormats, Pig can provide a load function, InputFormatLoader, that will take an InputFormat as a constructor argument. Only InputFormats which have zero argument constructors can be supported since Pig will try to instantiate the supplied InputFormat using reflection. When asked by Pig which InputFormat to use, it will return the one indicated by the user. Its call to getNext will then take the key and value provided by the associated RecordReader and construct a two field tuple. These types will be converted to Pig types as follows:
| Writable | Pig type | Comment |
| Text | chararray | |
| IntWritable | integer | |
| | integer | |
| LongWritable | long | |
| | long | |
| FloatWritable | float | |
| DoubleWritable | double | |
| BooleanWritable | int | In the future, if Pig exposes boolean as a first-class type, this would change to boolean |
| ByteWritable | int | |
| NullWritable | null | |
| All others | byte array | How do we construct a byte array from arbitrary types? |
Since the format of any other type is unknown to Pig and cannot be generalized, it does not make sense to provide casts from byte array to Pig types via a LoadCaster. If users wish to use an InputFormat whose types go beyond these and cast them to Pig types, they can extend InputFormatLoader and return a custom-written LoadCaster that handles casting their data types.
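The sketch below illustrates two responsibilities of InputFormatLoader under stated assumptions:

- It instantiates the user's InputFormat by reflection through a zero-argument constructor. The sketch assumes the class name is passed as a string, since Pig Latin constructor arguments are strings.
- It converts a key/value pair of Writables into a two-field tuple, using part of the table above.

All *Stub classes are hypothetical stand-ins for the Hadoop types. The "All others" row is deliberately left unimplemented, because how to construct the byte array is still an open question.

```java
import java.util.Arrays;
import java.util.List;

/** Hypothetical stand-in for Hadoop's InputFormat. */
abstract class InputFormatStub {}

/** Has a public zero-argument constructor, so it can be created reflectively. */
class TextInputFormatStub extends InputFormatStub {
    public TextInputFormatStub() {}
}

/** Requires a constructor argument, so InputFormatLoader could not support it. */
class ConfiguredInputFormatStub extends InputFormatStub {
    public ConfiguredInputFormatStub(String setting) {}
}

/** Hypothetical stand-ins for a few Writable types. */
class TextStub { final String s; TextStub(String s) { this.s = s; } }
class IntWritableStub { final int v; IntWritableStub(int v) { this.v = v; } }
class LongWritableStub { final long v; LongWritableStub(long v) { this.v = v; } }
class BooleanWritableStub { final boolean v; BooleanWritableStub(boolean v) { this.v = v; } }
final class NullWritableStub {
    static final NullWritableStub INSTANCE = new NullWritableStub();
    private NullWritableStub() {}
}

class InputFormatLoaderSketch {
    private final String inputFormatClassName;

    InputFormatLoaderSketch(String inputFormatClassName) {
        this.inputFormatClassName = inputFormatClassName;
    }

    /** Instantiates the user's InputFormat through its zero-argument constructor. */
    InputFormatStub getInputFormat() {
        try {
            Class<?> cls = Class.forName(inputFormatClassName);
            return (InputFormatStub) cls.getDeclaredConstructor().newInstance();
        } catch (NoSuchMethodException e) {
            throw new IllegalArgumentException(inputFormatClassName + " has no zero-argument constructor", e);
        } catch (ReflectiveOperationException e) {
            throw new IllegalArgumentException("cannot instantiate " + inputFormatClassName, e);
        }
    }

    /** Converts one Writable to a Pig value for a subset of the rows in the table above. */
    static Object toPig(Object w) {
        if (w instanceof TextStub) return ((TextStub) w).s;                            // chararray
        if (w instanceof IntWritableStub) return ((IntWritableStub) w).v;              // integer
        if (w instanceof LongWritableStub) return ((LongWritableStub) w).v;            // long
        if (w instanceof BooleanWritableStub) return ((BooleanWritableStub) w).v ? 1 : 0; // int for now
        if (w instanceof NullWritableStub) return null;                                // null
        // "All others" map to byte array, but how to build those bytes is still open.
        throw new UnsupportedOperationException("no conversion for " + w.getClass().getName());
    }

    /** Builds the two-field (key, value) tuple that getNext would return. */
    static List<Object> toTuple(Object key, Object value) {
        return Arrays.asList(toPig(key), toPig(value));
    }
}
```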
Details
Currently, in addition to the information regularly passed with an InputSplit, Pig also passes along additional information via SliceWrapper (Pig's implementation of InputSplit). This includes three items:

- An index. It's not clear what this is for, and there are no calls to SliceWrapper.getIndex() in the code; boy, comments in code sure are nice.
- An exec type to indicate MR or local mode.
- The list of target operators (the pipeline) for this input.

These items can continue to be carried along as is in the proposed PigSplit.
Pig's Slice interface currently supports the following calls:
- getLocations: replaced by InputSplit.getLocations
- init: replaced by RecordReader.initialize
- getStart: see below
- getLength: replaced by InputSplit.getLength
- close: replaced by RecordReader.close
- getPos: see below
- getProgress: replaced by RecordReader.getProgress
- next: replaced by RecordReader.getCurrentKey and getCurrentValue
Positioning information presents a problem. In Hadoop 0.18 the RecordReader interface has a getPos call, but the new RecordReader API in 0.20 (org.apache.hadoop.mapreduce.RecordReader) no longer has it. Input from files can generally be assigned a position, though it may not always be accurate, as in the bzip case. For some input formats, however, position may have no meaning, which is why the call was dropped. Even if Pig does not switch to using InputFormats, it will have to deal with this issue, just as MR has.
These changes will affect the SamplableLoader interface. Currently it uses skip and getPos to move the underlying stream so that it can pick up a sample of tuples out of a block. Since it would sit atop an InputFormat, it would no longer have access to the underlying stream; instead, it would be changed to skip a number of tuples.
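Sampling by skipping tuples instead of seeking could look roughly like the sketch below. SkipSampler is a hypothetical helper, and its fixed skip count stands in for whatever sampling rate a real loader would choose.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

/** Hypothetical helper: sample tuples by reading and discarding, with no seek. */
class SkipSampler {
    static <T> List<T> sample(Iterator<T> tuples, int skip, int maxSamples) {
        List<T> samples = new ArrayList<>();
        while (samples.size() < maxSamples && tuples.hasNext()) {
            samples.add(tuples.next()); // keep one tuple
            for (int i = 0; i < skip && tuples.hasNext(); i++) {
                tuples.next(); // read and discard the next `skip` tuples
            }
        }
        return samples;
    }
}
```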
However, in some places Pig needs this position information. In particular, when building an index for a merge join, Pig needs a way to mark a location in an input while building the index and then return to that position during the join. In this new proposal, the merge join index will contain the file name and the split index (the index of the split in the List returned by InputFormat.getSplits()). At run time, the merge join code will then seek to the right split in the file and process from that split on.

For this to work, we must assume that reading from a given split through the last split yields sorted data, i.e. that the splits returned by getSplits() preserve ordering. Since this cannot be guaranteed in general, the proposal is to sample the first and last keys in each split and record both values in the index entry for that split. The index is then sorted on both the first and the last key. During merge join processing, the implementation seeks into the right file based on the join key. It then reads the relevant splits of the right file as indicated in the index, from the matching index entry through the last index entry.
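The following sketch of the index described above uses integer join keys for brevity; real keys would be arbitrary comparable values. IndexEntry and MergeJoinIndex are hypothetical names. The lookup scans the sorted index for the first entry whose last key is at least the join key. Every earlier entry ends below the key, so reading can safely start at that entry and continue to the last one.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;

/** Hypothetical merge-join index entry: one per split of the right-hand input. */
class IndexEntry {
    final String fileName;
    final int splitIndex; // position in the List returned by getSplits()
    final int firstKey;   // first join key sampled from the split
    final int lastKey;    // last join key sampled from the split

    IndexEntry(String fileName, int splitIndex, int firstKey, int lastKey) {
        this.fileName = fileName;
        this.splitIndex = splitIndex;
        this.firstKey = firstKey;
        this.lastKey = lastKey;
    }
}

class MergeJoinIndex {
    private final List<IndexEntry> entries = new ArrayList<>();

    MergeJoinIndex(List<IndexEntry> raw) {
        entries.addAll(raw);
        // Sort on the first key, then on the last key, as the proposal describes.
        entries.sort(Comparator.comparingInt((IndexEntry e) -> e.firstKey)
                               .thenComparingInt(e -> e.lastKey));
    }

    /** Splits to read for a join key: from the first entry whose last key is >= the key to the end. */
    List<IndexEntry> splitsToRead(int joinKey) {
        for (int i = 0; i < entries.size(); i++) {
            if (entries.get(i).lastKey >= joinKey) {
                return entries.subList(i, entries.size());
            }
        }
        return Collections.emptyList(); // key is beyond every split
    }
}
```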
These changes will also affect loaders that need to read a record in order to determine their schema, such as BinStorage or a JSON loader. These loaders will need to know how to read at least the first record on their own, without the benefit of the underlying InputFormat, since they will need to call this on the front end where an InputFormat has not yet been used.
In addition to opening files as part of Map-Reduce, Pig loaders also open files on the side in MR jobs, and the new load interface needs to be able to open these side files as well. To do this, we would create a new instance of the appropriate InputFormat and call getSplits. We would then iterate over the splits: for each split, create a RecordReader, process the data it returns, and move on to the next split.
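The side-file loop could be sketched as follows. InputFormatLike and RecordReaderLike are hypothetical, simplified stand-ins. In real Hadoop, createRecordReader also takes a TaskAttemptContext and the reader must be initialized with the split; both steps are omitted here. InMemoryFormat is a toy format in which each split is simply a list of lines.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

/** Hypothetical simplified reader interface. */
interface RecordReaderLike {
    boolean nextKeyValue();
    String getCurrentValue();
    void close();
}

/** Hypothetical simplified InputFormat over split type S. */
interface InputFormatLike<S> {
    List<S> getSplits();
    RecordReaderLike createRecordReader(S split);
}

class SideFileReader {
    /** Reads every record: getSplits, then one reader per split, in split order. */
    static <S> List<String> readAll(InputFormatLike<S> format) {
        List<String> records = new ArrayList<>();
        for (S split : format.getSplits()) {
            RecordReaderLike reader = format.createRecordReader(split);
            try {
                while (reader.nextKeyValue()) {
                    records.add(reader.getCurrentValue());
                }
            } finally {
                reader.close(); // release the reader before moving to the next split
            }
        }
        return records;
    }
}

/** Toy format: each split is a list of lines; counts closed readers. */
class InMemoryFormat implements InputFormatLike<List<String>> {
    private final List<List<String>> splits;
    int closed;

    InMemoryFormat(List<List<String>> splits) {
        this.splits = splits;
    }

    public List<List<String>> getSplits() {
        return splits;
    }

    public RecordReaderLike createRecordReader(List<String> split) {
        Iterator<String> it = split.iterator();
        return new RecordReaderLike() {
            private String current;
            public boolean nextKeyValue() {
                if (!it.hasNext()) return false;
                current = it.next();
                return true;
            }
            public String getCurrentValue() { return current; }
            public void close() { closed++; }
        };
    }
}
```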
Performance concerns
With this proposal we are strongly encouraging users to double parse their data, since they will parse out the records and then parse those records into fields. This could carry a performance penalty. We are also removing the ability to seek when sampling, which could carry a performance penalty as well. A few thoughts on this:
- We are already in the process of moving PigStorage to use LineRecordReader underneath, and we are getting a 30% speedup from doing it. This is only one case, but at least in this case double parsing is not hurting us.
- If users who are writing both an InputFormat and a load function are concerned about double parsing, nothing stops them from having their InputFormat track where field boundaries are and pass that information along to their loader, so that they can parse in a single pass.
- For sampling, it is not clear how severe the penalty is for removing seek. Hadoop reads much of the information off disk into buffers anyway, and a sequential read on disk is not that much worse than a seek.
All this said, before committing to this we should do some prototyping and performance testing to convince ourselves that these changes will be performance neutral.
StoreFunc and OutputFormat Interaction
In the same way that LoadFunc currently duplicates some functionality of InputFormat, StoreFunc duplicates some functionality of OutputFormat. StoreFunc will be changed to deal primarily with converting a tuple to a key value pair that can be stored by Hadoop.
To support arbitrary OutputFormats, a new storage function, OutputFormatStorage, could be written that takes an OutputFormat as a constructor argument. Only OutputFormats that have zero-argument constructors can be supported, since Pig will try to instantiate the supplied OutputFormat using reflection. Tuples stored by this storage function must have either one or two fields. If they have two fields, the first will be taken as the key and the second as the value. If they have one, the key will be set to null and the value will be taken from the single field. Data type conversion on this data will be done in the same way as noted above for InputFormatLoader.
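The one-or-two-field rule for OutputFormatStorage could be sketched as follows. OutputFormatStorageSketch is hypothetical, tuples are modeled as List&lt;Object&gt;, and the Pig-to-Writable type conversion is left out.

```java
import java.util.AbstractMap;
import java.util.List;
import java.util.Map;

class OutputFormatStorageSketch {
    /** Maps a one- or two-field tuple to the (key, value) pair handed to the OutputFormat. */
    static Map.Entry<Object, Object> toKeyValue(List<Object> tuple) {
        switch (tuple.size()) {
            case 1:
                return new AbstractMap.SimpleEntry<>(null, tuple.get(0)); // key is null
            case 2:
                return new AbstractMap.SimpleEntry<>(tuple.get(0), tuple.get(1));
            default:
                throw new IllegalArgumentException(
                    "expected a tuple with one or two fields, got " + tuple.size());
        }
    }
}
```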
Open Questions:
Does all this force us to switch to Hadoop for local mode as well? We aren't opposed to using Hadoop for local mode; it just needs to get reasonably fast. Can we use InputFormat et al. on local files without using the whole HDFS structure? Answer: according to the Hadoop documentation, TextInputFormat works on local files as well as HDFS files. We may need to detect that we are in local mode and change the filename to file://, or switch to using Hadoop's local mode.
How will we work with compressed files? FileInputFormat already works with bzip and gzip compressed files, producing reasonable splits. PigStorage will be reworked to depend on FileInputFormat (or a descendant thereof, see next item) and should therefore be able to use this functionality. Currently Pig supports gz/bzip for arbitrary loadfunc/storefunc combinations. With this proposal, the gz/bzip formats will only be supported for load/store using PigStorage.
Implementation details and status
Current status
https://issues.apache.org/jira/browse/PIG-966 is the main JIRA for tracking progress. A branch, 'load-store-redesign' (http://svn.apache.org/repos/asf/hadoop/pig/branches/load-store-redesign), has been created to undertake work on this proposal.
Status on Nov 2, 2009: This branch has simple load-store working for PigStorage and BinStorage. Joins on multiple inputs, and multi-store queries with multi-query optimization, also work. Some of the recent changes in the proposal above (the changes noted under Nov 2, 2009 in the Changes below) have not been incorporated. A list of remaining tasks, possibly not comprehensive, is given in a subsection below.
Notes on implementation details
This section is to document changes made at a high level to give an overall connected picture which code comments may not provide.
Changes to work with Hadoop InputFormat model
Hadoop has the notion of a single InputFormat per job. This is restrictive since Pig processes multiple inputs in the same map reduce job (in the case of Join, Union or Cogroup). This is handled by PigInputFormat which is the InputFormat Pig communicates to Hadoop as the Job's InputFormat. In PigInputFormat.getSplits(), the implementation processes each input in the following manner:
Instantiate the LoadFunc associated with the input
Make a clone of the Configuration passed in the getSplits() call and then invoke LoadFunc.setLocation() using the clone. A clone is necessary because, in the setLocation() method, the LoadFunc generally communicates the location to its underlying InputFormat, and InputFormats typically store the location in the Configuration for use in the getSplits() call. For example, FileInputFormat does this through FileInputFormat.setInputPaths(Job job, String location). We don't want updates to the Configuration for different inputs to overwrite each other, hence the clone.
Call getInputFormat() on the LoadFunc and then getSplits() on the InputFormat returned. Note that the above setLocation call needs to happen *before* the getSplits() call, and the getSplits() call needs to be given a JobContext built out of the "updated (with location)" cloned Configuration.
Wrap each returned InputSplit in a PigSplit to store information such as the list of target operators (the pipeline) for this input and the index of the split in the List of splits returned by getSplits (this index is used during merge join index creation). The comments in PigSplit explain the members.
The list of target operators helps pig give the tuples from an input to the correct part of the pipeline in a multi input pipeline (like in join, cogroup, union).
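The per-input loop above can be sketched with a plain Map standing in for Configuration. SimpleLoadFunc, SimpleInputFormat, PigSplitSketch, ToyLoader and the configuration key are all hypothetical names. inputIndex stands in for the list of target operators that a real PigSplit carries.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical stand-in: an InputFormat reduced to getSplits over a Configuration-like map. */
interface SimpleInputFormat {
    List<String> getSplits(Map<String, String> conf);
}

/** Hypothetical stand-in for the two LoadFunc calls the front end needs. */
interface SimpleLoadFunc {
    void setLocation(String location, Map<String, String> conf);
    SimpleInputFormat getInputFormat();
}

/** Wraps a split with Pig's bookkeeping. */
class PigSplitSketch {
    final String wrapped;
    final int inputIndex;  // stands in for the target operators of this input
    final int splitIndex;  // position in the list returned by getSplits

    PigSplitSketch(String wrapped, int inputIndex, int splitIndex) {
        this.wrapped = wrapped;
        this.inputIndex = inputIndex;
        this.splitIndex = splitIndex;
    }
}

class PigInputFormatSketch {
    static List<PigSplitSketch> getSplits(Map<String, String> jobConf,
                                          List<SimpleLoadFunc> loaders, List<String> locations) {
        List<PigSplitSketch> result = new ArrayList<>();
        for (int i = 0; i < loaders.size(); i++) {
            SimpleLoadFunc loader = loaders.get(i);
            // Per-input clone so that one input's settings cannot overwrite another's.
            Map<String, String> clone = new HashMap<>(jobConf);
            loader.setLocation(locations.get(i), clone); // must precede getSplits
            List<String> splits = loader.getInputFormat().getSplits(clone);
            for (int j = 0; j < splits.size(); j++) {
                result.add(new PigSplitSketch(splits.get(j), i, j));
            }
        }
        return result;
    }
}

/** Toy loader: setLocation records the path in the configuration, as a file-based loader might. */
class ToyLoader implements SimpleLoadFunc {
    static final String KEY = "toy.input.dir"; // hypothetical configuration key

    public void setLocation(String location, Map<String, String> conf) {
        conf.put(KEY, location);
    }

    public SimpleInputFormat getInputFormat() {
        // One split per comma-separated path found in the configuration.
        return conf -> Arrays.asList(conf.get(KEY).split(","));
    }
}
```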
The other method in InputFormat is createRecordReader, which needs to be given a TaskAttemptContext. The Configuration in the TaskAttemptContext needs to have any information that might have been put into it as a result of the above LoadFunc.setLocation() call. However, PigInputFormat.getSplits() is called by Hadoop in the front end, while PigInputFormat.createRecordReader() is called in the back end. So we would need to somehow pass a map from each input to its input-specific Configuration (updated with the location and other information from the relevant LoadFunc.setLocation() call) from the front end to the back end.

One way to pass this map would be in the Configuration of the JobContext passed to PigInputFormat.getSplits(). However, in Hadoop 0.20.1 that Configuration is only a copy of the Configuration that is serialized to the back end and used to create the TaskAttemptContext passed to PigInputFormat.createRecordReader(). Passing the map this way is therefore not possible. Instead, PigInputFormat.createRecordReader() re-creates the side effects of the LoadFunc.setLocation() call made in PigInputFormat.getSplits(), using the following sequence:
Instantiate the LoadFunc associated with the input represented by the PigSplit passed to PigInputFormat.createRecordReader().
Invoke LoadFunc.setLocation().
Call getInputFormat() on the LoadFunc and then createRecordReader() on the InputFormat returned. Note that the setLocation() call above must happen *before* the createRecordReader() call, and the createRecordReader() call must be given a TaskAttemptContext built from the Configuration updated with the location.
Wrap the RecordReader returned above in a PigRecordReader, which is returned to Hadoop as the RecordReader. PigRecordReader has Text as its key type (the key is always null, since Pig does not extract a key from input records) and Tuple as its value type (a tuple constructed from the input record).
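A comparable sketch of the back-end sequence uses the same assumption: a Configuration is modelled as a Map<String, String>. BackendLoadFunc, BackendSplit, SketchPigRecordReader, BackendPigInputFormat and TabLoader are all hypothetical. In this sketch, the split carries a factory for the LoadFunc plus its location. That lets createRecordReader() redo the setLocation() side effects before asking for the underlying reader, and the wrapper always reports a null key. How the real PigSplit identifies its LoadFunc is not modelled here.

```java
import java.util.*;
import java.util.function.Supplier;

// Hypothetical back-end contract; the real code goes through LoadFunc,
// InputFormat.createRecordReader() and a TaskAttemptContext.
interface BackendLoadFunc {
    void setLocation(String location, Map<String, String> conf);
    // Stands in for getInputFormat().createRecordReader(...) plus reading records.
    Iterator<String> createReader(String split, Map<String, String> conf);
    // Stands in for LoadFunc.getNext(): turns a raw record into a tuple.
    List<Object> toTuple(String record);
}

// What the front end ships in the split: enough to redo setLocation().
final class BackendSplit {
    final String wrapped;
    final String location;
    final Supplier<BackendLoadFunc> loaderFactory;

    BackendSplit(String wrapped, String location, Supplier<BackendLoadFunc> loaderFactory) {
        this.wrapped = wrapped;
        this.location = location;
        this.loaderFactory = loaderFactory;
    }
}

// Plays the role of PigRecordReader: key is always null, value is a tuple.
final class SketchPigRecordReader {
    private final Iterator<String> in;
    private final BackendLoadFunc lf;
    private List<Object> current;

    SketchPigRecordReader(Iterator<String> in, BackendLoadFunc lf) {
        this.in = in;
        this.lf = lf;
    }

    boolean nextKeyValue() {
        if (!in.hasNext()) return false;
        current = lf.toTuple(in.next());
        return true;
    }

    Object getCurrentKey() { return null; }  // Pig extracts no key from input
    List<Object> getCurrentValue() { return current; }
}

final class BackendPigInputFormat {
    static SketchPigRecordReader createRecordReader(BackendSplit split, Map<String, String> taskConf) {
        BackendLoadFunc lf = split.loaderFactory.get();                   // 1. instantiate
        lf.setLocation(split.location, taskConf);                         // 2. redo side effects
        Iterator<String> raw = lf.createReader(split.wrapped, taskConf);  // 3. underlying reader
        return new SketchPigRecordReader(raw, lf);                        // 4. wrap
    }
}

// Hypothetical loader that refuses to read unless setLocation() ran first.
final class TabLoader implements BackendLoadFunc {
    public void setLocation(String location, Map<String, String> conf) {
        conf.put("input.dir", location);
    }

    public Iterator<String> createReader(String split, Map<String, String> conf) {
        if (!conf.containsKey("input.dir")) {
            throw new IllegalStateException("setLocation() must be called first");
        }
        return Arrays.asList(split.split("\n")).iterator();
    }

    public List<Object> toTuple(String record) {
        List<Object> t = new ArrayList<>();
        Collections.addAll(t, record.split("\t"));
        return t;
    }
}
```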
Open Question: We are relying on the LoadFunc to set up the input location on the Configuration in the setLocation() call, and then using that Configuration in the createRecordReader() call. What if a LoadFunc does this in getInputFormat() instead?
Changes to work with Hadoop OutputFormat model
Hadoop has the notion of a single OutputFormat per job. PigOutputFormat is the class indicated by Pig as the OutputFormat for map reduce jobs compiled from pig scripts.
In PigOutputFormat.checkOutputSpecs(), the implementation iterates over the POStore(s) in the map and reduce phases and for each such store does the following:
Instantiate the StoreFunc associated with the POStore.
Make a clone of the JobContext passed in the PigOutputFormat.checkOutputSpecs() call and then invoke StoreFunc.setStoreLocation() using the clone. The clone is necessary because in the setStoreLocation() method the StoreFunc generally communicates the location to its underlying OutputFormat, and OutputFormats typically store the location in the Configuration for use in the checkOutputSpecs() call. For example, FileOutputFormat does this through FileOutputFormat.setOutputPath(Job job, Path location). We don't want updates to the Configuration for different outputs to overwrite each other, hence the clone.
Call getOutputFormat() on the StoreFunc and then checkOutputSpecs() on the OutputFormat returned. Note that the setStoreLocation() call above must happen *before* the checkOutputSpecs() call, and the checkOutputSpecs() call must be given the cloned JobContext (the one updated with the location).
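The per-store check can be sketched the same way. Again, this is a hypothetical illustration. SpecOutputFormat, SpecStoreFunc, SpecStore, SpecPigOutputFormat and DirStore are stand-ins, and the JobContext's Configuration is modelled as a Map<String, String>. DirStore only imitates FileOutputFormat's refusal to write into an existing output directory.

```java
import java.io.IOException;
import java.util.*;

// Hypothetical stand-ins; a JobContext's Configuration is modelled as a Map.
interface SpecOutputFormat {
    void checkOutputSpecs(Map<String, String> conf) throws IOException;
}

interface SpecStoreFunc {
    void setStoreLocation(String location, Map<String, String> conf) throws IOException;
    SpecOutputFormat getOutputFormat();
}

// One POStore: its StoreFunc and the location it writes to.
final class SpecStore {
    final SpecStoreFunc func;
    final String location;

    SpecStore(SpecStoreFunc func, String location) {
        this.func = func;
        this.location = location;
    }
}

final class SpecPigOutputFormat {
    static void checkOutputSpecs(Map<String, String> jobConf, List<SpecStore> stores)
            throws IOException {
        for (SpecStore store : stores) {
            Map<String, String> clone = new HashMap<>(jobConf);  // one clone per store
            store.func.setStoreLocation(store.location, clone);  // before the check
            store.func.getOutputFormat().checkOutputSpecs(clone);
        }
    }
}

// Hypothetical file-like StoreFunc whose OutputFormat rejects existing directories.
final class DirStore implements SpecStoreFunc {
    private final Set<String> existingDirs;

    DirStore(Set<String> existingDirs) {
        this.existingDirs = existingDirs;
    }

    public void setStoreLocation(String location, Map<String, String> conf) {
        conf.put("output.dir", location);
    }

    public SpecOutputFormat getOutputFormat() {
        return conf -> {
            String dir = conf.get("output.dir");
            if (dir == null) throw new IOException("output location not set");
            if (existingDirs.contains(dir)) throw new IOException("output " + dir + " already exists");
        };
    }
}
```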
PigOutputFormat.getOutputCommitter() returns a PigOutputCommitter object. The PigOutputCommitter internally keeps a list of the OutputCommitters belonging to the OutputFormats of the StoreFunc(s) in the POStore(s) in the map and reduce phases. It delegates each OutputCommitter call that Hadoop invokes to the appropriate underlying committers.
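The delegation done by PigOutputCommitter amounts to a composite committer. The sketch below is hypothetical. SketchCommitter covers only part of the OutputCommitter lifecycle and drops the JobContext/TaskAttemptContext arguments. Committing only the delegates whose needsTaskCommit() returns true is a design choice of this sketch, not something this document specifies.

```java
import java.io.IOException;
import java.util.*;

// Hypothetical subset of the OutputCommitter lifecycle (context arguments omitted).
interface SketchCommitter {
    void setupJob() throws IOException;
    boolean needsTaskCommit() throws IOException;
    void commitTask() throws IOException;
    void abortTask() throws IOException;
    void cleanupJob() throws IOException;
}

// Plays the role of PigOutputCommitter: fans each call out to the store committers.
final class SketchPigOutputCommitter implements SketchCommitter {
    private final List<SketchCommitter> delegates;

    SketchPigOutputCommitter(List<SketchCommitter> delegates) {
        this.delegates = new ArrayList<>(delegates);
    }

    public void setupJob() throws IOException {
        for (SketchCommitter c : delegates) c.setupJob();
    }

    // A task needs committing if any underlying committer says so.
    public boolean needsTaskCommit() throws IOException {
        for (SketchCommitter c : delegates) if (c.needsTaskCommit()) return true;
        return false;
    }

    // Only delegates that asked for a commit are committed (choice of this sketch).
    public void commitTask() throws IOException {
        for (SketchCommitter c : delegates) if (c.needsTaskCommit()) c.commitTask();
    }

    public void abortTask() throws IOException {
        for (SketchCommitter c : delegates) c.abortTask();
    }

    public void cleanupJob() throws IOException {
        for (SketchCommitter c : delegates) c.cleanupJob();
    }
}

// Hypothetical committer that records the calls it receives.
final class RecordingCommitter implements SketchCommitter {
    private final String name;
    private final List<String> log;
    private final boolean needsCommit;

    RecordingCommitter(String name, List<String> log, boolean needsCommit) {
        this.name = name;
        this.log = log;
        this.needsCommit = needsCommit;
    }

    public void setupJob() { log.add(name + ".setupJob"); }
    public boolean needsTaskCommit() { return needsCommit; }
    public void commitTask() { log.add(name + ".commitTask"); }
    public void abortTask() { log.add(name + ".abortTask"); }
    public void cleanupJob() { log.add(name + ".cleanupJob"); }
}
```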
The other method in OutputFormat is getRecordWriter(). In the single store case, PigOutputFormat.getRecordWriter() does the following:
Instantiate the StoreFunc associated with the single POStore.
Invoke StoreFunc.setStoreLocation().
Call getOutputFormat() on the StoreFunc and then getRecordWriter() on the OutputFormat returned. Note that the setStoreLocation() call above must happen *before* the getRecordWriter() call, and the getRecordWriter() call must be given a TaskAttemptContext whose Configuration has been updated with the location.
Wrap the RecordWriter returned above in a PigRecordWriter, which is returned to Hadoop as the RecordWriter. PigRecordWriter has WritableComparable as its key type (the key is always null when we write, since Pig has no key to store in the output) and Tuple as its value type (the output tuple).
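Here is a sketch of the PigRecordWriter wrapper. It assumes a tuple is a List<Object> and uses hypothetical types (SketchRecordWriter, SketchPigRecordWriter, ListWriter). The Function passed in stands in for however the StoreFunc turns a tuple into the wrapped writer's value.

```java
import java.io.IOException;
import java.util.*;
import java.util.function.Function;

// Hypothetical minimal writer contract, loosely modelled on Hadoop's RecordWriter.
interface SketchRecordWriter<K, V> {
    void write(K key, V value) throws IOException;
    void close() throws IOException;
}

// Plays the role of PigRecordWriter: accepts (null, tuple) and forwards the
// formatted tuple to the OutputFormat's own writer.
final class SketchPigRecordWriter implements SketchRecordWriter<Object, List<Object>> {
    private final SketchRecordWriter<Object, String> wrapped;
    private final Function<List<Object>, String> format;  // stand-in for the StoreFunc

    SketchPigRecordWriter(SketchRecordWriter<Object, String> wrapped,
                          Function<List<Object>, String> format) {
        this.wrapped = wrapped;
        this.format = format;
    }

    public void write(Object key, List<Object> tuple) throws IOException {
        // Pig has no key to store, so the key is ignored and null is passed on.
        wrapped.write(null, format.apply(tuple));
    }

    public void close() throws IOException {
        wrapped.close();
    }
}

// Hypothetical in-memory writer showing what reaches the OutputFormat.
final class ListWriter implements SketchRecordWriter<Object, String> {
    final List<String> lines = new ArrayList<>();
    boolean closed = false;

    public void write(Object key, String value) { lines.add(value); }
    public void close() { closed = true; }
}
```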
For the multi-query optimized multi-store case, there are multiple POStores in the same map reduce job. In this case, the data is written out by the POStore operator within the Pig map or reduce pipeline itself. Details can be found in http://wiki.apache.org/pig/PigMultiQueryPerformanceSpecification ("Internal Changes" section, "Store Operator" subsection). So the Pig runtime code never calls Context.write() (which would internally have called PigRecordWriter.write()), and this redesign does not change how multiple stores write their data out.
Mechanism to read side files
Pig needs to read side files in many places, such as merge join, order by, skew join, and dump. To make this easy, a utility LoadFunc called ReadToEndLoader has been introduced. Although it is implemented as a LoadFunc, the only LoadFunc method it truly implements is getNext(). The usage pattern is to construct an instance with a reference to the true LoadFunc (the one that can read the side file data) and then call getNext() repeatedly until it returns null. ReadToEndLoader hides the work of getting InputSplits from the underlying InputFormat and then, for each split in turn, getting its RecordReader and reading all the data in the split before moving on to the next.
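The behaviour described for ReadToEndLoader can be sketched as follows. This is a hypothetical model, not the real class. SplitSource stands in for the true LoadFunc's InputFormat, and each split is simply the list of records its RecordReader would return. Opening a reader is reduced to taking an iterator.

```java
import java.util.*;

// Hypothetical: the true loader's InputFormat, modelled as a list of splits,
// each split being the records its RecordReader would return.
interface SplitSource<T> {
    List<List<T>> getSplits();
}

// Plays the role of ReadToEndLoader: drains split after split behind getNext().
final class SketchReadToEndLoader<T> {
    private final Iterator<List<T>> splits;
    private Iterator<T> reader = Collections.emptyIterator();

    SketchReadToEndLoader(SplitSource<T> source) {
        this.splits = source.getSplits().iterator();
    }

    // Returns the next record, or null once every split has been read.
    T getNext() {
        while (!reader.hasNext()) {
            if (!splits.hasNext()) return null;
            reader = splits.next().iterator();  // "open a RecordReader" on the next split
        }
        return reader.next();
    }
}
```

An empty split is skipped transparently, so callers of getNext() never have to handle split boundaries themselves.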
Changes to skew join sampling (PoissonSampleLoader)
See discussion in PIG-1062 .
Problem 1: The earlier version of PoissonSampleLoader stored the on-disk size of each record as an extra last column in the sampled tuples it returned in the map phase of the sampling MR job. The PartitionSkewedKeys udf used this in the reduce stage of the sampling job to compute the total number of tuples as input-file-size/avg-disk-sz-from-samples. Avg-disk-sz-from-samples is not available with the new loader design, because getPosition() is no longer provided.
Solution: In addition to the sampled tuples, PoissonSampleLoader returns a special tuple containing the number of rows in that map. To create this special tuple, the loader tracks the maximum row length among the sampled tuples and creates a new tuple of size max_row_length + 2, with:
- spl_tuple[max_row_length] = "marker_string"
- spl_tuple[max_row_length + 1] = num_rows
The size max_row_length + 2 is used because the join key can be an expression that is evaluated on the columns of the tuples returned by the sampler, and the expression might expect specific data types in certain positions (<= max_row_length) of the tuple. If the number of tuples in the sample is 0, the special tuple is not sent.
In the reduce stage, the PartitionSkewedKeys udf iterates over the tuples to find these special tuples and calculates the total number of rows.
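Below is a minimal sketch of the special tuple and the reduce-side count. It assumes tuples are modelled as List<Object>. The class name SampleCounts is hypothetical, and "marker_string" is the placeholder from the text, not necessarily Pig's actual marker value. The sketch recognizes a special tuple by the marker in its second-to-last field, so it assumes no real sampled record carries that string in that position.

```java
import java.util.*;

final class SampleCounts {
    static final String MARKER = "marker_string";  // placeholder value from the text

    // Map side: size maxRowLength + 2, marker at maxRowLength, row count last.
    // Earlier positions are left null in this sketch.
    static List<Object> specialTuple(int maxRowLength, long numRows) {
        List<Object> t = new ArrayList<>(Collections.nCopies(maxRowLength + 2, (Object) null));
        t.set(maxRowLength, MARKER);
        t.set(maxRowLength + 1, numRows);
        return t;
    }

    static boolean isSpecial(List<Object> t) {
        return t.size() >= 2 && MARKER.equals(t.get(t.size() - 2));
    }

    // Reduce side: add up the row counts carried by the special tuples.
    static long totalRows(List<List<Object>> tuples) {
        long total = 0;
        for (List<Object> t : tuples) {
            if (isSpecial(t)) total += (Long) t.get(t.size() - 1);
        }
        return total;
    }
}
```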
Problem 2: PoissonSampleLoader samples 17 tuples from every set of tuples that will fit into reducer memory (see PigSkewedJoinSpec). Let X be the number of tuples that fit into reducer memory; i.e., we need to sample one tuple every X/17 tuples. Earlier, the number of tuples to sample was calculated before any tuples were read, in PoissonSampleLoader.computeSamples(..). The number of samples to take in each map was:
num_samples = number-of-reducer-memories-needed * 17 / number-of-splits
where number-of-reducer-memories-needed = (total_file_size * disk_to_mem_factor) / available_reducer_heap_size,
and disk_to_mem_factor has a default of 2.
PoissonSampleLoader would then return sampled tuples by skipping split-size/num_samples bytes at a time.
With the new loader we have to skip a number of tuples instead of a number of bytes, but we don't have an estimate of the total number of tuples in the input.
One way to work around this would be to estimate the on-disk size of a tuple from its in-memory size using the disk_to_mem_factor above; the number of tuples to skip would then be (split-size/avg_mem_size_of_tuple)/numSamples.
But the use of disk_to_mem_factor is very dubious: the real disk-to-memory ratio will vary with the compression algorithm, data characteristics (sorting etc.), and encoding.
Solution: The goal is to sample one tuple every X/17 tuples (X = number of tuples that fit in available reducer memory).
To estimate X, we can use available_reducer_heap_size/average-tuple-mem-size.
Number of tuples skipped for every sampled tuple = 1/17 * (available_reducer_heap_size/average-tuple-mem-size)
The average-tuple-mem-size and number-of-tuples-to-be-skipped-every-sampled-tuple are recalculated after each new tuple is sampled.
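The recalculation described above can be sketched as a small sampler. The class AdaptiveSkipSampler is hypothetical, and the caller is assumed to supply a per-tuple in-memory size estimate. The skip count follows the formula above literally, 1/17 * (available_reducer_heap_size/average-tuple-mem-size), and is recomputed after every sampled tuple.

```java
import java.util.*;
import java.util.function.ToLongFunction;

final class AdaptiveSkipSampler<T> {
    private static final int SAMPLES_PER_REDUCER_MEMORY = 17;

    private final long heapBytes;             // available_reducer_heap_size
    private final ToLongFunction<T> memSize;  // hypothetical in-memory size estimate
    private long sampledBytes = 0;
    private long sampledCount = 0;
    private long toSkip = 0;                  // tuples left to skip before the next sample

    AdaptiveSkipSampler(long heapBytes, ToLongFunction<T> memSize) {
        this.heapBytes = heapBytes;
        this.memSize = memSize;
    }

    List<T> sample(Iterator<T> input) {
        List<T> samples = new ArrayList<>();
        while (input.hasNext()) {
            T t = input.next();
            if (toSkip > 0) {
                toSkip--;
                continue;
            }
            samples.add(t);
            sampledBytes += memSize.applyAsLong(t);
            sampledCount++;
            // Recompute after every sample: X ~= heap / avg size; skip X/17 tuples.
            double avgSize = Math.max(1.0, (double) sampledBytes / sampledCount);
            toSkip = (long) (heapBytes / avgSize / SAMPLES_PER_REDUCER_MEMORY);
        }
        return samples;
    }
}
```

With a 1700-byte heap and tuples estimated at 10 bytes each, X is about 170. The sampler therefore takes a tuple and then skips the next 10.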
Changes to order-by sampling (RandomSampler)
Problem: With the new interface, we cannot use the old approach, in which each map task of the sampling MR job divides the split size by the number of samples required to determine the number of bytes to skip after each sample. With the new interface, we can skip only by number of tuples, not by bytes on disk.
Proposal: The goal is to sample tuples so that every tuple has the same probability of being sampled (assuming the number of tuples to be sampled is much smaller than the total number of tuples).
Let N be the number of samples required. In getNext(), allocate a buffer for N elements, populate it with the first N tuples, and continue scanning the partition. For the ith next() call (counting from 1), generate a random number r such that 0 <= r < i, and if r < N, insert the new tuple into the buffer at position r, replacing the tuple there. This gives a uniformly random sample of the tuples in the partition.
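The buffer update described above is reservoir sampling (Algorithm R). Below is a minimal sketch. ReservoirSample is a hypothetical class name, and its int counter limits it to about 2^31 tuples per partition.

```java
import java.util.*;

// Reservoir sampling (Algorithm R) as described for RandomSampler.getNext().
final class ReservoirSample<T> {
    private final int n;            // N, the number of samples required
    private final Random rng;
    private final List<T> buffer;
    private int seen = 0;           // tuples scanned so far

    ReservoirSample(int n, Random rng) {
        this.n = n;
        this.rng = rng;
        this.buffer = new ArrayList<>(n);
    }

    void offer(T tuple) {
        seen++;                               // this tuple is the i-th, i = seen
        if (buffer.size() < n) {
            buffer.add(tuple);                // fill with the first N tuples
            return;
        }
        int r = rng.nextInt(seen);            // 0 <= r < i
        if (r < n) buffer.set(r, tuple);      // kept with probability N/i
    }

    List<T> sample() {
        return Collections.unmodifiableList(buffer);
    }
}
```

Each tuple ends up in the buffer with probability N/(total tuples) without knowing the total in advance. That is why this approach fits an interface that can only skip by tuples.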
Changes to Streaming
Currently Pig streaming uses store and load functions to handle the serialization and deserialization of data between Pig and the user's executable. Based on the changes proposed in this redesign that will no longer be feasible. Hadoop's InputFormat and OutputFormat are not appropriate abstractions for communicating with an external executable.
Pig's current support for variable data formats in streaming is more extensive than Hadoop's. Conceptually users could create executables that read data from BinStorage or other non-text formats. The Pig development team are not aware of any users doing this, nor is it clear why they would choose to. Removing this functionality will make Pig's streaming implementation simpler. It also avoids shoe-horning streaming into the new load store design. (For complete details of Hadoop's streaming see http://hadoop.apache.org/common/docs/r0.20.1/streaming.html .)
I propose that Pig move to Hadoop's default streaming format, which expects newline-separated records with tab as the field separator. Hadoop allows users to redefine the field separator, and so should Pig. This also matches the current default of using PigStorage as the (de)serializer for streaming. As before, Pig should support communicating with the executable via either stdin and stdout or files. This will force a syntax change in Pig Latin. Currently, if a user wants to stream data to an executable with comma-separated fields instead of tab-separated fields, the syntax is:
define CMD `perl PigStreaming.pl - foo nameMap` input(stdin using PigStorage(',')) output(stdout using PigStorage(','));
A = load 'file';
B = stream A through CMD;
The syntax should change to remove the references to load and store functions, as they are no longer meaningful. Thus the above would become:
define CMD `perl PigStreaming.pl - foo nameMap` input(stdin using ',') output(stdout using ',');
A = load 'file';
B = stream A through CMD;
From an implementation viewpoint, the functionality required to write to and read from the streaming binary will be equivalent to the tuple parsing and serialization of PigStorage.getNext() and PigStorage.putNext(). While it will not be possible to use PigStorage directly, every effort should be made to share this code (most likely by putting the actual code in static utility methods that can be called by each class) to avoid maintaining two copies of it.
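Here is one way the shared code could look, sketched under assumptions. DelimitedTupleCodec is a hypothetical static utility, not an existing Pig class. Tuples are modelled as lists, and mapping empty fields to null is a choice of this sketch. The parser avoids String.split() because split() silently drops trailing empty fields.

```java
import java.util.*;

// Hypothetical shared helper for newline-separated, delimiter-separated records.
final class DelimitedTupleCodec {
    private DelimitedTupleCodec() {}

    // Writes one record without its trailing newline; null fields become empty.
    static String serialize(List<?> tuple, char sep) {
        StringBuilder sb = new StringBuilder();
        for (int i = 0; i < tuple.size(); i++) {
            if (i > 0) sb.append(sep);
            Object field = tuple.get(i);
            if (field != null) sb.append(field);
        }
        return sb.toString();
    }

    // Parses one record (newline already removed). Keeps trailing empty fields;
    // an empty field is read back as null in this sketch. Fields stay untyped strings.
    static List<Object> parse(String line, char sep) {
        List<Object> fields = new ArrayList<>();
        int start = 0;
        for (int i = 0; i <= line.length(); i++) {
            if (i == line.length() || line.charAt(i) == sep) {
                fields.add(i == start ? null : line.substring(start, i));
                start = i + 1;
            }
        }
        return fields;
    }
}
```

Both PigStorage and the streaming input/output handlers could call these two methods. They would pass '\t' by default and ',' for a definition like the one above.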
Remaining Tasks
- BinStorage needs to implement LoadMetadata's getSchema() to replace the current determineSchema()
- piggybank loaders/storers need to be ported
- Fix lineage code to use LoadCaster instead of LoadFunc
- Local mode needs to be ported
- PigDump needs to be ported
- POLoad needs to be ported
- Need to handle passing LoadFunc-specific info between different instances of a LoadFunc (different instances in the front end, and between the front end and back end; we need what is required in PIG-602). For example, setPartitionFilter() and pushOperators() need this: these methods are called in the front end, but the information passed is needed in the back end.
- For ResourceSchema to be used effectively for communicating schema, we must fix the two-level access issues with the schema of bags in the current schema before we make these changes; otherwise that same contagion will afflict us here.
- Input/Output handler code in streaming needs to be ported
- Split by file will have to be removed from the language
- Fix code with FIXME in comments relating to the load-store redesign
- Decide what we should do with ReversibleLoadFunc and multiquery optimization
- Need to fix issues with bytesToBag, bytesToTuple and bytesToMap creating complex types with the right schema
- Address any Open Questions in this document
Changes
Sept 23 2009, Gates
- Changed setURI to setLocation in LoadFunc and StoreFunc. Also changed it to throw IOException when the passed-in location is not valid for this load or store mechanism.
- Changed LoadSchema to ResourceSchema and LoadStatistics to ResourceStatistics
- Added getPartitionKeys and setPartitionFilter to LoadMetadata
Sept 25 2009, Gates
- Added allFinished call to StoreFunc
Sept 29 2009, Gates
- Added answer for open question 1. Added and answered open questions 2 and 3.
Nov 2 2009, Pradeep Kamath
In LoadFunc:
- Added relativeToAbsolutePath() method per http://issues.apache.org/jira/browse/PIG-879?focusedCommentId=12768818&page=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#action_12768818
- Changed comments in setLocation regarding the location passed: the location will now be the return value of relativeToAbsolutePath()
- setLocation() now also takes a Job argument, since the main purpose of this call is to give the LoadFunc implementation an opportunity to communicate the input location to the underlying InputFormat. InputFormat implementations in turn seem to store this information in the Job. For example, FileInputFormat has the following static method to set the input location: setInputPaths(JobConf conf, String commaSeparatedPaths);
- Removed doneReading() method, since Hadoop already calls RecordReader.close(), in which all the work needed on completion of reading can be done.
- All methods can now throw IOException; this keeps the interface more flexible for exception cases
In LoadMetadata:
- getSchema(), getStatistics() and getPartitionKeys() now take a location and Configuration argument so that the implementation can use that information in returning the information requested.
In StoreFunc:
- Added relToAbsPathForStoreLocation() method per http://issues.apache.org/jira/browse/PIG-879?focusedCommentId=12768818&page=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#action_12768818
- Methods that did not throw IOException now do so, to enable exceptions in implementations
- Removed doneWriting(): the same functionality is already present in RecordWriter.close() and OutputCommitter.commitTask()
- Changed setSchema() to checkSchema(), since this method is called only to allow the StoreFunc to check the schema
- Removed allFinished(): the same functionality is already present in OutputCommitter.cleanupJob()
- Added a new section 'Implementation details and status'
Nov 11, Dmitriy Ryaboy
- Minor clarification of the meaning of mBytes in ResourceStatistics
Nov 12 2009, Thejas Nair
- Added section "Changes to order-by sampling (RandomSampler)"
- Added section "Changes to skew join sampling (PoissonSampleLoader)"
Nov 23 2009, Gates
- Added section "Changes to Streaming"
Nov 23 2009, Dmitriy Ryaboy
- Updated StoreMetadata to match changes made to LoadMetadata