Avoiding Serialization/De-serialization in Pig

Serialization/de-serialization is expensive, and avoiding or delaying it improves performance. This wiki page discusses ideas that can help with that.

Delaying/Avoiding deserialization at runtime

These approaches (except approach 5) do not involve major changes to core Pig code. Load functions, or the serialization between map and reduce, can be changed separately to improve performance.

  1. Load functions make use of the public LoadPushDown interface and its pushProjection() method. Columns that are not in the required field list are not deserialized. This should always improve performance. PigStorage indirectly works this way: if a column is not used, the optimizer removes the casting (i.e. deserialization) of that column from the type-casting foreach statement that comes after the load. A sketch of a load function that does this follows.
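
As an illustration only (this is not PigStorage or any existing loader), a load function honoring projection push-down might look roughly like the sketch below. The class name, the tab-separated record format, the choice to leave unused columns as null, and the omission of UDFContext bookkeeping are all assumptions of this sketch.

import java.io.IOException;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.pig.LoadFunc;
import org.apache.pig.LoadPushDown;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigSplit;
import org.apache.pig.data.Tuple;
import org.apache.pig.data.TupleFactory;
import org.apache.pig.impl.logicalLayer.FrontendException;

// Sketch of a loader that records which columns the script needs and
// deserializes (keeps) only those columns; everything else stays null.
public class ProjectionAwareLoader extends LoadFunc implements LoadPushDown {

    private RecordReader reader;
    private Set<Integer> requiredColumns;   // null means "all columns are required"
    private final TupleFactory tupleFactory = TupleFactory.getInstance();

    @Override
    public List<OperatorSet> getFeatures() {
        return Collections.singletonList(OperatorSet.PROJECTION);
    }

    @Override
    public RequiredFieldResponse pushProjection(RequiredFieldList requiredFieldList)
            throws FrontendException {
        // Remember which column indexes the script actually uses. A real loader
        // would also persist this through UDFContext so it is visible on the
        // backend; that bookkeeping is omitted here for brevity.
        requiredColumns = new HashSet<Integer>();
        for (RequiredField field : requiredFieldList.getFields()) {
            requiredColumns.add(field.getIndex());
        }
        return new RequiredFieldResponse(true);
    }

    @Override
    public Tuple getNext() throws IOException {
        try {
            if (!reader.nextKeyValue()) {
                return null;
            }
            String[] raw = reader.getCurrentValue().toString().split("\t");
            Tuple tuple = tupleFactory.newTuple(raw.length);
            for (int i = 0; i < raw.length; i++) {
                // Only required columns are materialized; unused columns stay null.
                if (requiredColumns == null || requiredColumns.contains(i)) {
                    tuple.set(i, raw[i]);
                }
            }
            return tuple;
        } catch (InterruptedException e) {
            throw new IOException(e);
        }
    }

    @Override
    public InputFormat getInputFormat() {
        return new TextInputFormat();
    }

    @Override
    public void prepareToRead(RecordReader reader, PigSplit split) {
        this.reader = reader;
    }

    @Override
    public void setLocation(String location, Job job) throws IOException {
        FileInputFormat.setInputPaths(job, location);
    }
}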

  2. The load function returns a custom tuple that deserializes a field only when tuple.get(i) is called on it. This can be useful if the first operator after the load is a filter: the whole filter expression might not have to be evaluated, so deserialization of all columns might not have to be done. Assuming the first approach is already implemented, this approach is likely to add some overhead for queries where tuple.get(i) ends up being called on every column of every row. A sketch of such a lazy tuple is shown below.
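
A minimal sketch of the idea, assuming a simple text format where each field is kept as raw bytes until first access. LazyTextTuple and its string-based decoding are illustrative assumptions, not part of Pig; a real loader would decode using its own format and LoadCaster.

import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.data.DefaultTuple;

// A tuple that keeps the serialized bytes of each field and deserializes a
// field only the first time get(i) is called on it.
public class LazyTextTuple extends DefaultTuple {

    private final byte[][] rawFields;   // serialized form of each field
    private final boolean[] decoded;    // which fields have been deserialized

    public LazyTextTuple(byte[][] rawFields) {
        super(rawFields.length);
        this.rawFields = rawFields;
        this.decoded = new boolean[rawFields.length];
    }

    @Override
    public Object get(int fieldNum) throws ExecException {
        if (!decoded[fieldNum]) {
            // Deserialize just this one field on first access. Here the bytes are
            // simply materialized as a String; a real implementation would apply
            // the loader's own decoding for the field's declared type.
            set(fieldNum, new String(rawFields[fieldNum]));
            decoded[fieldNum] = true;
        }
        return super.get(fieldNum);
    }
}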

  3. The load function delays deserialization of map and bag types until a member function of java.util.Map or DataBag is called. The load function uses subclasses of Map and DataBag that hold the serialized copy of the data. This delays deserialization even further. This can't be done for scalar types because the classes Pig uses for them are final; even if that were not the case, we might not see much of a performance gain, because the cost of creating a copy of the serialized data might be high compared to the cost of deserialization itself. This will only delay deserialization up to the MR (map/reduce) boundary.

Example of a query where this will help:
l = LOAD 'file1' AS (a : int, b : map [ ]);
f = FOREACH l GENERATE udf1(a), b;       -- Approach 2 will not help in delaying deserialization beyond this point.
fil = FILTER f BY $0 > 5;
dump fil; -- Deserialization of column b can be delayed until here using this approach.
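
For illustration, a map wrapper of the kind approach 3 describes could hold the serialized bytes and build the real java.util.Map only when a member function is first called. The class name and the deserialize() hook are assumptions of this sketch, not Pig APIs; a DataBag subclass could follow the same pattern.

import java.util.AbstractMap;
import java.util.Map;
import java.util.Set;

// A map that defers deserialization until it is actually touched.
public abstract class LazyPigMap extends AbstractMap<String, Object> {

    private final byte[] serialized;          // bytes in the loader's own format
    private Map<String, Object> materialized; // filled in on first access

    protected LazyPigMap(byte[] serialized) {
        this.serialized = serialized;
    }

    /** Loader-specific decoding of the serialized bytes into a real map. */
    protected abstract Map<String, Object> deserialize(byte[] raw);

    private Map<String, Object> map() {
        if (materialized == null) {
            materialized = deserialize(serialized);
        }
        return materialized;
    }

    // Every Map member function goes through map(), so deserialization
    // happens only if the map is actually used by an operator or UDF.
    @Override
    public Set<Map.Entry<String, Object>> entrySet() { return map().entrySet(); }

    @Override
    public Object get(Object key) { return map().get(key); }

    @Override
    public int size() { return map().size(); }
}
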
  4. Set the property "pig.data.tuple.factory.name" so that Pig uses a tuple that understands the serialization format used for bags and maps in approach 3. Serialized data from the loader can then be passed across MR boundaries in the load function's own serialization format. The write() and readFields() functions of the tuple returned by the TupleFactory are used to serialize data between map and reduce, so to use a new custom tuple you need a custom TupleFactory that returns tuples of this type. This approach will only work if all load functions in the query share the same serialization format for maps and bags. A small example of selecting the factory is shown below.
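
For illustration, the factory is selected through that JVM system property before Pig first requests it. The factory class name used here is hypothetical; it would have to be on the classpath and extend org.apache.pig.data.TupleFactory, and in practice the property would be set when launching Pig (for example through PIG_OPTS) rather than from user code.

import org.apache.pig.data.Tuple;
import org.apache.pig.data.TupleFactory;

public class SelectTupleFactory {
    public static void main(String[] args) throws Exception {
        // Hypothetical factory class; this call will fail at runtime unless
        // com.example.LazyTupleFactory actually exists on the classpath and
        // produces tuples whose write()/readFields() understand the loader's
        // serialization format for maps and bags (approach 3).
        System.setProperty("pig.data.tuple.factory.name", "com.example.LazyTupleFactory");

        // Pig reads the property the first time the factory is requested.
        Tuple t = TupleFactory.getInstance().newTuple(1);
        t.set(0, 42);
        System.out.println(t);
    }
}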

  5. Expose the load function's sedes (serialization/de-serialization) functionality in a new interface and track the lineage of columns. This would be the elegant and extensible way of doing what is proposed in approach 4. For each serialized column, if we know the deserialization function, we can delay deserialization across MR boundaries. A rough sketch of what such an interface could look like follows.
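
Purely as an illustration of the proposal (no such interface exists in Pig), the exposed sedes functionality might look like the hypothetical interface below. With column lineage tracked, the execution engine could look up the sedes implementation that produced each still-serialized column and call deserialize() only at the operator that first needs the value.

import java.io.IOException;

// Hypothetical interface sketching approach 5: a loader exposes how a column
// was serialized, so Pig can carry the raw bytes across MR boundaries and
// deserialize only when the column's value is finally needed. All names here
// are illustrative; this is not an existing Pig API.
public interface ColumnSedes {

    /** Serialize a column value into the loader's own format. */
    byte[] serialize(Object value) throws IOException;

    /** Deserialize a column previously produced by serialize(). */
    Object deserialize(byte[] raw) throws IOException;
}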