
Combiners

In the Hama BSP model, a combiner is used to combine messages during the barrier sync stage (see SyncService). This can reduce the number of messages sent to other peers, which matters because sending messages incurs overhead [1].

An example

Combiners perform message aggregation to reduce communication overhead in cases where messages can be summarized arithmetically at the sender side, e.g., min, max, sum, and average. Their usage can be seen in the source file CombinerExample.java, in which a user-defined combiner, SumCombiner, sums up the messages to be sent and adds the result to a BSPMessageBundle. This reduces the outgoing messages to a single IntegerMessage that summarizes all values.

Suppose that you want to send the integers from 0 to 999 as messages to a specific processor and sum all the integer messages received from all processors.

  // sum and masterTask are fields of the enclosing BSP class (not shown).
  public void bsp(BSPPeer<NullWritable, NullWritable, NullWritable, NullWritable> peer)
      throws IOException, SyncException, InterruptedException {

    // Send 1000 integer messages to the master task.
    for (int i = 0; i < 1000; i++) {
      peer.send(masterTask, new IntegerMessage(peer.getPeerName(), i));
    }
    peer.sync();

    // Only the master task sums the messages it received.
    if (peer.getPeerName().equals(masterTask)) {
      IntegerMessage received;
      while ((received = (IntegerMessage) peer.getCurrentMessage()) != null) {
        sum += received.getData();
      }
    }
  }

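In the previous example, each BSP processor sends a bundle of a thousand integer messages to masterTask. Instead, you can use a combiner in your BSP program to sum the integer messages on the sender side, which also keeps the program concise and maintainable, as below.

  public static class SumCombiner extends Combiner {

    @Override
    public BSPMessageBundle combine(Iterable<BSPMessage> messages) {
      BSPMessageBundle bundle = new BSPMessageBundle();
      int sum = 0;

      // Add up the payloads of all outgoing messages.
      Iterator<BSPMessage> it = messages.iterator();
      while (it.hasNext()) {
        sum += ((IntegerMessage) it.next()).getData();
      }

      // Replace them with a single message that carries the total.
      bundle.addMessage(new IntegerMessage("Sum", sum));
      return bundle;
    }

  }

The combiner is called in the sync function:

  public void sync() throws IOException, SyncException, InterruptedException {
    ...
    final BSPMessageBundle bundle = combineMessages(messages);
    ...
  }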
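As a rough illustration of what combining at sync time buys, here is a self-contained sketch in plain Java. It does not use any Hama classes: SketchPeer, its outbox, and the combiner passed in as a function are hypothetical stand-ins for a peer's outgoing message queue and a configured combiner, assumed only for this example.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.function.Function;

// Hypothetical stand-in for a BSP peer; not part of the Hama API.
final class SketchPeer {
  private final List<Integer> outbox = new ArrayList<>();
  // May be null, meaning "no combiner configured".
  private final Function<List<Integer>, List<Integer>> combiner;

  SketchPeer(Function<List<Integer>, List<Integer>> combiner) {
    this.combiner = combiner;
  }

  // Queues a message; nothing is transmitted until sync().
  void send(int value) {
    outbox.add(value);
  }

  // Returns the messages that would actually be transmitted at the barrier.
  List<Integer> sync() {
    List<Integer> toSend;
    if (combiner == null || outbox.isEmpty()) {
      toSend = new ArrayList<>(outbox);
    } else {
      toSend = combiner.apply(Collections.unmodifiableList(outbox));
    }
    outbox.clear();
    return toSend;
  }

  // Sender-side sum: many messages collapse into one.
  static List<Integer> sum(List<Integer> messages) {
    int total = 0;
    for (int m : messages) {
      total += m;
    }
    return Collections.singletonList(total);
  }

  // Sender-side max: only the largest value is transmitted.
  static List<Integer> max(List<Integer> messages) {
    return Collections.singletonList(Collections.max(messages));
  }

  public static void main(String[] args) {
    SketchPeer plain = new SketchPeer(null);
    SketchPeer summing = new SketchPeer(SketchPeer::sum);
    for (int i = 0; i < 1000; i++) {
      plain.send(i);
      summing.send(i);
    }
    System.out.println(plain.sync().size()); // prints 1000
    System.out.println(summing.sync());      // prints [499500]
  }
}
```

The receiving side sees one message instead of a thousand while the total is unchanged. Passing SketchPeer::max instead of SketchPeer::sum gives the same reduction for a maximum.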

In the combineMessages function, the system checks whether a combiner needs to be applied.

  private BSPMessageBundle combineMessages(Iterable<BSPMessage> messages) {
    if (!conf.getClass("bsp.combiner.class", Combiner.class).equals(
        Combiner.class)) {
      Combiner combiner = (Combiner) ReflectionUtils.newInstance(conf.getClass(
          "bsp.combiner.class", Combiner.class), conf);

      return combiner.combine(messages);
    } else {
      ...
    }
  }
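The configuration check can be imitated without Hama. The sketch below is only an analogue under stated assumptions: it uses java.util.Properties in place of the Hama configuration and plain Java reflection in place of ReflectionUtils, and CombinerLookup, IntCombiner, and SumIntCombiner are hypothetical names introduced for illustration. Only the key bsp.combiner.class comes from the text above. Passing the messages through unchanged when no combiner is configured is also an assumption, since the original else branch is elided.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Properties;

// Hypothetical helper; mirrors the idea of combineMessages, not Hama's code.
final class CombinerLookup {
  static final String KEY = "bsp.combiner.class";

  // Applies the configured combiner if one is set; otherwise (assumption)
  // returns the messages unchanged.
  static List<Integer> combineMessages(Properties conf, List<Integer> messages) {
    String className = conf.getProperty(KEY);
    if (className == null) {
      return new ArrayList<>(messages);
    }
    try {
      // Instantiate the configured class through its no-argument constructor.
      IntCombiner combiner = Class.forName(className)
          .asSubclass(IntCombiner.class)
          .getDeclaredConstructor()
          .newInstance();
      return combiner.combine(messages);
    } catch (ReflectiveOperationException e) {
      throw new IllegalStateException("Cannot create combiner " + className, e);
    }
  }

  public static void main(String[] args) {
    List<Integer> messages = Arrays.asList(1, 2, 3, 4);
    Properties conf = new Properties();
    System.out.println(combineMessages(conf, messages)); // prints [1, 2, 3, 4]

    conf.setProperty(KEY, SumIntCombiner.class.getName());
    System.out.println(combineMessages(conf, messages)); // prints [10]
  }
}

// Hypothetical combiner contract for this sketch (not Hama's Combiner class).
interface IntCombiner {
  List<Integer> combine(List<Integer> messages);
}

// Hypothetical combiner that sums all messages into one.
final class SumIntCombiner implements IntCombiner {
  @Override
  public List<Integer> combine(List<Integer> messages) {
    int sum = 0;
    for (int m : messages) {
      sum += m;
    }
    return Collections.singletonList(sum);
  }
}
```

Keeping the combiner behind a configuration key means the program code in bsp does not change when a combiner is switched on or off; only the configuration does.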

[1] Malewicz, G., Austern, M. H., Bik, A. J., Dehnert, J. C., Horn, I., Leiser, N., and Czajkowski, G. Pregel: A system for large-scale graph processing. In Proceedings of SIGMOD (2010).