Fragment Replicate Join

Fragment Replicate Join (FRJ) is useful when joining a huge table with a very small table (small enough to fit in memory), provided the join does not expand the data by much. The idea is to distribute the processing of the huge file by fragmenting it and replicating the small file to every machine that receives a fragment of the huge file. Because the entire small file is available on each machine, the join becomes a trivial task that needs no break in the pipeline.
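The idea described above can be sketched in plain Java, outside of Pig. This is only an illustration under stated assumptions: the class and method names (FragmentReplicateSketch, buildReplicatedTable, joinFragment) are hypothetical, rows are modelled as String arrays, and a HashMap stands in for whatever structure the real operator keeps in memory.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical illustration; not the Pig operator itself.
final class FragmentReplicateSketch {

    /** Index the small ("replicate") input by join key. Each task would hold its own copy of this table. */
    static Map<String, List<String[]>> buildReplicatedTable(List<String[]> repl, int keyField) {
        Map<String, List<String[]>> table = new HashMap<>();
        for (String[] row : repl) {
            table.computeIfAbsent(row[keyField], k -> new ArrayList<>()).add(row);
        }
        return table;
    }

    /** Stream one fragment of the large input and emit left+right for every matching pair. */
    static List<String[]> joinFragment(List<String[]> fragment,
                                       Map<String, List<String[]>> table,
                                       int keyField) {
        List<String[]> out = new ArrayList<>();
        for (String[] left : fragment) {
            List<String[]> matches =
                    table.getOrDefault(left[keyField], Collections.<String[]>emptyList());
            for (String[] right : matches) {
                String[] joined = Arrays.copyOf(left, left.length + right.length);
                System.arraycopy(right, 0, joined, left.length, right.length);
                out.add(joined);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<String[]> repl = Arrays.asList(new String[]{"1", "x"}, new String[]{"2", "y"});
        List<String[]> fragment = Arrays.asList(
                new String[]{"1", "a"}, new String[]{"3", "b"}, new String[]{"1", "c"});
        for (String[] row : joinFragment(fragment, buildReplicatedTable(repl, 0), 0)) {
            System.out.println(String.join(",", row));
        }
    }
}
```

Each fragment is processed in a single pass with no shuffle or reduce step, which is why the join needs no break in the pipeline. The cost is that every task must hold the whole replicated table in memory.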

NOTE: In the initial version of the implementation, the first input in the join statement is treated as the "fragment" input and the other inputs are treated as the "replicate" inputs. In tests, around 80MB appeared to be the maximum size for the "replicate" input. Above that, there is a good probability of getting memory errors.


The following is a set of parameters that we can alter to compare the performance of the different types of join algorithms:

  1. Query being compared
    1. A = load 'frag';
      B = load 'repl';
      C = join A by $0, B by $0 using replicated;
    2. A = load 'frag';
      B = load 'repl';
      C = join A by $0, B by $0 using replicated;
      D = group C by $1;
      E = foreach D generate group, COUNT(C);
  2. Frag & Repl Files

    1. Repl File has at most one tuple per key (shrinking or constant-size join)

      2.1.*.1 Key Set of Frag = Key Set of Repl
      2.1.*.2 (Key Set of Frag - Key Set of Repl) != {}
      2.1.*.3 (Key Set of Repl - Key Set of Frag) != {}
      2.1.#.1 Range of frag files: 20M to 20G, each 10 times the previous one
      2.1.$.1 Range of repl files: starts at 2K and ends where the repl file is as large as the frag file, each 10 times the previous one
    2. Repl File has a lot of tuples per key (expanding join)

      2.2.*.1 Key Set of Frag = Key Set of Repl
      2.2.*.2 (Key Set of Frag - Key Set of Repl) != {}
      2.2.*.3 (Key Set of Repl - Key Set of Frag) != {}
      2.2.#.1 Range of frag files: 20M to 20G, each 10 times the previous one
      2.2.$.1 Range of repl files: 2K to 2M, each 10 times the previous one
  3. Number of map & reduce slots available

          3.1 One preallocated cluster with equal number of map & reduce slots and the same being equal to the maximum number of tasks that will be created by any job that will run on this cluster
          3.2 One preallocated cluster with 0 reduce slots and the number of map slots being equal to twice the maximum number of map tasks that will be created by any map-only job that will run on this cluster (Can run only map-only jobs)
  4. Number of map tasks created

          4.1 InputSplit logic unchanged
          4.2 InputSplit logic changed to produce more map tasks than would have been actually created
  5. Number of reduce tasks created

          5.1 number of reduce tasks = number of maps used
          5.2 number of reduce tasks is left unassigned, in which case it is set to 0.9 times the max reduce slots configured in the cluster
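The file-size ranges in parameter 2 can be enumerated with a short sketch. It assumes decimal units (K = 10^3, M = 10^6, G = 10^9), which is the reading under which a repl series starting at 2K lands exactly on the frag file size. The class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper for listing the sizes used in the parameter sweep.
final class SweepSizesSketch {
    // Assumption: decimal units. The source does not say whether K/M/G are decimal or binary.
    static final long K = 1_000L;
    static final long M = 1_000_000L;
    static final long G = 1_000_000_000L;

    /** Sizes from start to end inclusive, each 10 times the previous one. */
    static List<Long> tenfoldRange(long start, long end) {
        List<Long> sizes = new ArrayList<>();
        for (long s = start; s <= end; s *= 10) {
            sizes.add(s);
        }
        return sizes;
    }

    public static void main(String[] args) {
        for (long frag : tenfoldRange(20 * M, 20 * G)) {
            // Case 2.1: repl sizes grow until they reach the frag size.
            // Case 2.2: repl sizes stop at 2M regardless of the frag size.
            System.out.println("frag=" + frag
                    + " repl(2.1)=" + tenfoldRange(2 * K, frag)
                    + " repl(2.2)=" + tenfoldRange(2 * K, 2 * M));
        }
    }
}
```

Under these assumptions there are four frag sizes (20M, 200M, 2G, 20G). In case 2.1 the smallest frag file is paired with five repl sizes and the largest with eight, while case 2.2 always uses the same four repl sizes.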


We compare the running times of FRJ implemented as a new operator against Symmetric Hash Join (the normal map-reduce join) and a UDF implementation of FRJ. The changes to the logical side are as per JoinFramework. We distinguish joins whose result is larger than the input (Expanding Join) from those whose result is smaller (Reducing Join). Each case is tagged with a set of numbers (1.2, 2.1.*.2, etc.) that identify the parameter settings listed above. The following graphs show the performance of the various algorithms:

Experiment 1: Reducing Join (1.2, 2.1.*.2, 3.1, 4.1, 5.2)

Experiment 2: Expanding Join (1.2, 2.2.*.1, 3.1, 4.1, 5.2)



Experiment 3: Utilization: (1.2, 2.1.*.2, 3.1, 4.1, 5.2)

Experiment 4: Sorted Bag (1.2, 2.1.*.2, 3.1, 4.1, 5.2)

We measure cluster utilization under the various algorithms by running 10 homogeneous jobs simultaneously and calculating the number of jobs that same-sized clusters can run in a minute for each algorithm. The following graphs give the results of the experiments run.
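One way to read this metric is as a simple throughput figure: the batch size divided by the batch's wall-clock time in minutes. The sketch below assumes that interpretation. The names are hypothetical and the 300 seconds in the example is a made-up input, not a measured result.

```java
// Hypothetical helper; assumes utilization is reported as jobs completed per minute.
final class UtilizationSketch {

    /** Jobs per minute for a batch of simultaneous jobs that took batchSeconds of wall-clock time. */
    static double jobsPerMinute(int jobsInBatch, double batchSeconds) {
        if (batchSeconds <= 0) {
            throw new IllegalArgumentException("batchSeconds must be positive");
        }
        return jobsInBatch * 60.0 / batchSeconds;
    }

    public static void main(String[] args) {
        // Illustrative input only: 10 jobs finishing in 300 seconds.
        System.out.println(jobsPerMinute(10, 300.0));
    }
}
```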

One serious limitation of FRJ is that it tries to read the replicated table into memory. If the file is even slightly too big, the job dies with an out-of-memory exception. To work around this problem, we can read the replicated tables and the fragment of the fragmented table into Sorted Bags, which are disk-backed structures, and perform a merge join. However, from the graphs below, this doesn't seem like a viable alternative.
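The merge-join step of this workaround can be sketched as follows. In-memory lists sorted by key stand in for Pig's disk-backed Sorted Bags, so this shows only the merge logic and not the spilling behaviour. All names are hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

// Hypothetical illustration of a sort-merge join; sorted lists stand in for Sorted Bags.
final class SortedMergeJoinSketch {

    static List<String[]> mergeJoin(List<String[]> left, List<String[]> right, int keyField) {
        Comparator<String[]> byKey = Comparator.comparing((String[] row) -> row[keyField]);
        List<String[]> l = new ArrayList<>(left);
        List<String[]> r = new ArrayList<>(right);
        l.sort(byKey);
        r.sort(byKey);

        List<String[]> out = new ArrayList<>();
        int i = 0, j = 0;
        while (i < l.size() && j < r.size()) {
            int cmp = l.get(i)[keyField].compareTo(r.get(j)[keyField]);
            if (cmp < 0) {
                i++;                       // left key has no partner yet; advance left
            } else if (cmp > 0) {
                j++;                       // right key has no partner yet; advance right
            } else {
                String key = l.get(i)[keyField];
                int jEnd = j;              // find the end of the run of equal keys on the right
                while (jEnd < r.size() && r.get(jEnd)[keyField].equals(key)) {
                    jEnd++;
                }
                // Pair every left row in the run with every right row in the run.
                for (; i < l.size() && l.get(i)[keyField].equals(key); i++) {
                    for (int k = j; k < jEnd; k++) {
                        String[] a = l.get(i), b = r.get(k);
                        String[] joined = Arrays.copyOf(a, a.length + b.length);
                        System.arraycopy(b, 0, joined, a.length, b.length);
                        out.add(joined);
                    }
                }
                j = jEnd;
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<String[]> frag = Arrays.asList(
                new String[]{"b", "1"}, new String[]{"a", "2"}, new String[]{"a", "3"});
        List<String[]> repl = Arrays.asList(
                new String[]{"a", "x"}, new String[]{"c", "y"}, new String[]{"a", "z"});
        for (String[] row : mergeJoin(frag, repl, 0)) {
            System.out.println(String.join(",", row));
        }
    }
}
```

Unlike the hash-table approach, both inputs must be sorted before any output is produced. That extra work is a plausible contributor to the cost of this alternative.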



UDF used

public static class FRJoin extends EvalFunc<DataBag>{
        String repl;                // path of the "replicate" file
        int keyField;               // position of the join key; defaults to 0
        boolean isTblSetUp = false;
        Hashtable<DataByteArray, DataBag> replTbl = new Hashtable<DataByteArray, DataBag>();

        public FRJoin(){
        }

        public FRJoin(String repl){
            this.repl = repl;
        }

        public DataBag exec(Tuple input) throws IOException {
            // Load the replicate file lazily, on the first call only.
            if(!isTblSetUp){
                setUpHashTable();
                isTblSetUp = true;
            }
            try {
                DataByteArray key = (DataByteArray) input.get(keyField);
                // No match: return an empty bag.
                if(!replTbl.containsKey(key)) return BagFactory.getInstance().newDefaultBag();
                return replTbl.get(key);
            } catch (ExecException e) {
                throw new IOException(e.getMessage());
            }
        }

        private void setUpHashTable() throws IOException {
            FileSpec replFile = new FileSpec(repl,new FuncSpec(PigStorage.class.getName()+"()"));
            POLoad ld = new POLoad(new OperatorKey("Repl File Loader", 1L), replFile, false);
            PigContext pc = new PigContext(ExecType.MAPREDUCE,ConfigurationUtil.toProperties(PigMapReduce.sJobConf));
            try {
                // Assumption: pc is otherwise unused, so it is presumably connected and handed to the loader here.
                pc.connect();
                ld.setPc(pc);
                Tuple dummyTuple = null;
                for(Result res=ld.getNext(dummyTuple);res.returnStatus!=POStatus.STATUS_EOP;res=ld.getNext(dummyTuple)){
                    Tuple tup = (Tuple) res.result;
                    DataByteArray key = (DataByteArray)tup.get(keyField);
                    // Collect all replicate tuples that share a key into one bag.
                    DataBag vals = null;
                    if(replTbl.containsKey(key)){
                        vals = replTbl.get(key);
                    } else {
                        vals = BagFactory.getInstance().newDefaultBag();
                        replTbl.put(key, vals);
                    }
                    vals.add(tup);
                }
            } catch (ExecException e) {
                throw new IOException(e.getMessage());
            }
        }
}


FRJ has been submitted as a patch. The jira following this issue is