Combiner

In the Hama BSP model, a Combiner merges messages during the barrier synchronization stage. Because every message sent to another peer incurs overhead, combining messages reduces the number that has to be sent [1].

An example

A Combiner's usage can be seen in the source file CombinerExample.java, in which a user-defined combiner, SumCombiner, sums the messages to be sent and adds the result to a BSPMessageBundle. This reduces the outgoing messages to a single IntegerMessage holding the sum of all values.

  public static class SumCombiner extends Combiner {

    @Override
    public BSPMessageBundle combine(Iterable<BSPMessage> messages) {
      BSPMessageBundle bundle = new BSPMessageBundle();
      int sum = 0;

      Iterator<BSPMessage> it = messages.iterator();
      while (it.hasNext()) {
        sum += ((IntegerMessage) it.next()).getData();
      }

      bundle.addMessage(new IntegerMessage("Sum", sum));
      return bundle;
    }

  }
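To make the effect concrete without a Hama dependency, the following is a minimal sketch of the same summing idea. The class SumCombinerSketch and the type IntMsg are hypothetical stand-ins for illustration only (IntMsg loosely mirrors IntegerMessage, and a plain List plays the role of the bundle); they are not part of the Hama API.

```java
import java.util.ArrayList;
import java.util.List;

public class SumCombinerSketch {

  // Hypothetical stand-in for IntegerMessage: a tag plus an int payload.
  public static final class IntMsg {
    public final String tag;
    public final int data;

    public IntMsg(String tag, int data) {
      this.tag = tag;
      this.data = data;
    }
  }

  // Collapse all queued messages into a one-element list holding their sum.
  // The returned list plays the role of the message bundle in this sketch.
  public static List<IntMsg> combine(Iterable<IntMsg> messages) {
    int sum = 0;
    for (IntMsg m : messages) {
      sum += m.data;
    }
    List<IntMsg> bundle = new ArrayList<>();
    bundle.add(new IntMsg("Sum", sum));
    return bundle;
  }

  public static void main(String[] args) {
    List<IntMsg> queued = new ArrayList<>();
    for (int i = 1; i <= 4; i++) {
      queued.add(new IntMsg("value", i));
    }
    List<IntMsg> combined = combine(queued);
    // prints "4 messages -> 1, sum = 10"
    System.out.println(queued.size() + " messages -> " + combined.size()
        + ", sum = " + combined.get(0).data);
  }
}
```

Four queued messages leave the peer as one, which is where the saving comes from. Note that this only works because addition is associative and commutative; a combiner for an operation without those properties would need more care.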

The Combiner is actually invoked inside the sync function,

  
  public void sync() throws IOException, SyncException, InterruptedException {
    ...
    final BSPMessageBundle bundle = combineMessages(messages);
    ...
  }

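and the combineMessages function checks whether a Combiner needs to be applied. A Combiner is applied only when the class configured under "bsp.combiner.class" differs from the base Combiner class, which serves as the default; in that case the configured class is instantiated through reflection and its combine method is called.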

  private BSPMessageBundle combineMessages(Iterable<BSPMessage> messages) {
    if (!conf.getClass("bsp.combiner.class", Combiner.class).equals(
        Combiner.class)) {
      Combiner combiner = (Combiner) ReflectionUtils.newInstance(conf.getClass(
          "bsp.combiner.class", Combiner.class), conf);

      return combiner.combine(messages);
    } else {
      ...
    }
  }
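The lookup pattern above (use the base class as a "nothing configured" default, otherwise instantiate the configured class reflectively) can be sketched in plain Java. Everything here is an assumption for illustration: a Map stands in for the configuration object, the nested Combiner and SumCombiner types are simplified stand-ins, and the pass-through in the final branch is a guess, since the original elides what the else branch does.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class CombinerLookupSketch {

  // Hypothetical base type. Finding it in the configuration means "no combiner".
  public static class Combiner {
    public List<Integer> combine(List<Integer> messages) {
      return messages;
    }
  }

  // Hypothetical user-defined combiner that sums all values.
  public static class SumCombiner extends Combiner {
    @Override
    public List<Integer> combine(List<Integer> messages) {
      int sum = 0;
      for (int m : messages) {
        sum += m;
      }
      return Collections.singletonList(sum);
    }
  }

  // Stand-in for conf.getClass(key, default): resolve a class name or fall back.
  static Class<?> getClass(Map<String, String> conf, String key, Class<?> dflt)
      throws ClassNotFoundException {
    String name = conf.get(key);
    return name == null ? dflt : Class.forName(name);
  }

  public static List<Integer> combineMessages(Map<String, String> conf,
      List<Integer> messages) throws ReflectiveOperationException {
    Class<?> cls = getClass(conf, "bsp.combiner.class", Combiner.class);
    if (!cls.equals(Combiner.class)) {
      // A user-defined combiner is configured: instantiate it reflectively.
      Combiner combiner = (Combiner) cls.getDeclaredConstructor().newInstance();
      return combiner.combine(messages);
    }
    // Assumption: with no combiner configured, messages pass through unchanged.
    return messages;
  }

  public static void main(String[] args) throws Exception {
    List<Integer> messages = Arrays.asList(1, 2, 3);
    Map<String, String> conf = new HashMap<>();
    System.out.println(combineMessages(conf, messages)); // prints [1, 2, 3]

    conf.put("bsp.combiner.class", SumCombiner.class.getName());
    System.out.println(combineMessages(conf, messages)); // prints [6]
  }
}
```

Using the base class itself as the default avoids a separate "is a combiner set?" flag: a single equality check on the resolved class answers the question.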

[1] Malewicz, G., Austern, M. H., Bik, A. J., Dehnert, J. C., Horn, I., Leiser, N., and Czajkowski, G. Pregel: A system for large-scale graph processing. In Proceedings of SIGMOD (2010).
