Pig Map Side Cogroup Proposal

This document details the proposal to build a mapside cogroup operator in Pig.


The map side cogroup would initially work with the following restrictions:

  1. All loaded files must be of the same storage type (no cogrouping Zebra and PigStorage files).

  2. There must exist a loader for this storage type that implements Pig's IndexableLoadFunc interface (currently only Zebra's TableLoader meets this criterion).

  3. The loader used for this must guarantee that it does not split a single value of a key across multiple splits.
  4. Inputs to cogroup must be aliases of load statements. No other operators are allowed.


The map side cogroup would work as follows:

  1. Add an interface KeyValuesNotSplitLoader. This interface will have only one method; when called, it indicates to the loader that it must not split values for a key across splits for this particular load.
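A minimal sketch of what such an interface might look like; the method name is invented here, as the proposal only fixes that the interface carries a single method:

```java
// Sketch of the proposed KeyValuesNotSplitLoader interface.
// The method name is hypothetical.
interface KeyValuesNotSplitLoader {
    // Tells the loader that, for this particular load, all values
    // for a given key must be kept within a single split.
    void doNotSplitKeysAcrossSplits();
}
```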

  2. Run an initial MR job to build an index on the leftmost (presumably the largest) input using Pig's MergeJoinIndexer.

  3. Pig will need to construct the second MR job such that splits from the leftmost input are used as the splits for the job. Loads of the other inputs (presumably the smaller inputs) will be done via the IndexableLoadFunc.

  4. Currently PigSplit contains the split number. This information will need to be recorded in the UDFContext so that it can be retrieved by POMergeCogroup operator. This transfer should probably be done in PigInputFormat.createRecordReader.
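The handoff in step 4 amounts to stashing the split number under a well-known key at record-reader creation time and reading it back inside the operator. A self-contained sketch of the mechanism, using a plain Properties object to stand in for Pig's UDFContext and an invented property name:

```java
import java.util.Properties;

// Sketch of the split-number handoff. A plain Properties object
// stands in for Pig's per-task UDFContext, and the property name
// is invented for illustration.
class SplitIndexHandoff {
    static final Properties UDF_PROPS = new Properties();
    static final String SPLIT_INDEX_KEY = "pig.mergecogroup.split.index";

    // In the proposal this would happen in PigInputFormat.createRecordReader.
    static void recordSplitIndex(int splitIndex) {
        UDF_PROPS.setProperty(SPLIT_INDEX_KEY, Integer.toString(splitIndex));
    }

    // The POMergeCogroup operator would read it back later in the same task.
    static int retrieveSplitIndex() {
        return Integer.parseInt(UDF_PROPS.getProperty(SPLIT_INDEX_KEY));
    }
}
```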

  5. Inside a new POMergeCogroup operator:

    Determine which split we are in
    if (not last split) {
        Using the index generated by the MergeJoinIndexer,
            determine the first key of the next split
    }
    if (in first split) {
        open each of the other inputs at the beginning
    } else {
        Using the index generated in step 2, determine the first key of our split
        open each of the other inputs at this first key
    }

    construct a heap, using the cogrouping key as the key for the heap
    foreach (input) {
        pull first record
        annotate with input number it came from
        insert into heap
    }
    while (there are records in heap) {
        pull top record from heap
        place in bag based on which input it came from
        if (record is from the large input && EOF on this split) continue
        pull next record from the input the top record came from
        if (key pulled < first key in next split) insert into heap
    }
    output final record
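Stripped of the split-boundary handling (the index lookups, the EOF-on-split check, and the "key < first key of next split" test), the heap loop above reduces to an in-memory merge over key-sorted inputs. A sketch of that core, with all names illustrative:

```java
import java.util.*;

class MergeCogroup {
    // One input record: the cogroup key plus an opaque payload.
    static final class Rec {
        final int key; final String payload;
        Rec(int key, String payload) { this.key = key; this.payload = payload; }
    }

    // Cogroup n key-sorted inputs, mirroring the heap loop above.
    // Result: for each key, in key order, one bag of payloads per input.
    static LinkedHashMap<Integer, List<List<String>>> cogroup(List<Iterator<Rec>> inputs) {
        int n = inputs.size();
        // Heap entry: {key, input number}, ordered by the cogrouping key.
        PriorityQueue<int[]> heap =
            new PriorityQueue<>((a, b) -> a[0] != b[0] ? Integer.compare(a[0], b[0])
                                                       : Integer.compare(a[1], b[1]));
        Rec[] head = new Rec[n];               // current head record of each input
        for (int i = 0; i < n; i++) {          // pull the first record of each input,
            if (inputs.get(i).hasNext()) {     // annotate with its input number,
                head[i] = inputs.get(i).next();
                heap.add(new int[]{head[i].key, i});   // and insert into the heap
            }
        }
        LinkedHashMap<Integer, List<List<String>>> out = new LinkedHashMap<>();
        while (!heap.isEmpty()) {
            int[] top = heap.poll();           // pull top record from heap
            int key = top[0], src = top[1];
            // place it in the bag for (key, input it came from)
            List<List<String>> bags = out.computeIfAbsent(key, k -> {
                List<List<String>> b = new ArrayList<>();
                for (int i = 0; i < n; i++) b.add(new ArrayList<>());
                return b;
            });
            bags.get(src).add(head[src].payload);
            // pull the next record from the same input and reinsert
            if (inputs.get(src).hasNext()) {
                head[src] = inputs.get(src).next();
                heap.add(new int[]{head[src].key, src});
            }
        }
        return out;
    }
}
```

Because each input contributes at most one record to the heap at a time, the heap never holds more than n entries, which is what makes the operator viable map-side.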


Assume a set of tables consisting of one large file and n smaller files. These files have schemas:

large: (k, x, y)
small1: (k, x, z)
smalln: (k, z, alpha)

Then the Pig Latin script would look like:

A = load 'large';
B1 = load 'small1';
Bn = load 'smalln';
C = cogroup A by k, B1 by k, ..., Bn by k using 'merge';

And C would have a schema of:

k, bag: large{(k, x, y)}, bag: small1{(k, x, z)}, ..., bag: smalln{(k, z, alpha)}
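For concreteness, with a few invented rows the merge cogroup would emit one tuple per key, holding one (possibly empty) bag per input:

```
large:  (1,x1,y1) (2,x2,y2)
small1: (1,x1,z1)
smalln: (2,z2,alpha2)

C:      (1, {(1,x1,y1)}, {(1,x1,z1)}, ..., {})
        (2, {(2,x2,y2)}, {},          ..., {(2,z2,alpha2)})
```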