Combiner
In the Hama BSP model, a Combiner merges messages during the barrier synchronization stage. This reduces the number of messages sent to other peers, which is worthwhile because every message sent incurs overhead [1].
An example
Combiner usage can be seen in the source file CombinerExample.java, in which a user-defined combiner, SumCombiner, sums the messages to be sent and adds the result to a BSPMessageBundle. This reduces the total number of messages to a single IntegerMessage holding the sum of all values.
    public static class SumCombiner extends Combiner {

      @Override
      public BSPMessageBundle combine(Iterable<BSPMessage> messages) {
        BSPMessageBundle bundle = new BSPMessageBundle();
        int sum = 0;

        // Add up the payload of every queued message.
        Iterator<BSPMessage> it = messages.iterator();
        while (it.hasNext()) {
          sum += ((IntegerMessage) it.next()).getData();
        }

        // Send a single message carrying the total instead of the originals.
        bundle.addMessage(new IntegerMessage("Sum", sum));
        return bundle;
      }
    }
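The effect of a sum combiner can be tried without a Hama installation. The following is a minimal stand-alone sketch, not the Hama implementation: the class SumCombinerSketch and the type IntMessage are hypothetical stand-ins invented for this illustration, and only the idea (many messages in, one summed message out) is taken from the example above.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Stand-alone sketch: these types are hypothetical stand-ins, not Hama classes.
public class SumCombinerSketch {

  /** Hypothetical stand-in for a tagged integer message. */
  static final class IntMessage {
    final String tag;
    final int data;

    IntMessage(String tag, int data) {
      this.tag = tag;
      this.data = data;
    }
  }

  /** Collapses all queued messages into one message carrying their sum. */
  static List<IntMessage> combine(Iterable<IntMessage> messages) {
    int sum = 0;
    for (IntMessage m : messages) {
      sum += m.data;
    }
    List<IntMessage> bundle = new ArrayList<>();
    bundle.add(new IntMessage("Sum", sum));
    return bundle;
  }

  public static void main(String[] args) {
    List<IntMessage> queued = Arrays.asList(
        new IntMessage("a", 1), new IntMessage("b", 2), new IntMessage("c", 3));
    List<IntMessage> combined = combine(queued);
    // Prints: 3 messages -> 1 message, sum = 6
    System.out.println(queued.size() + " messages -> " + combined.size()
        + " message, sum = " + combined.get(0).data);
  }
}
```

Summation suits a combiner because it is associative and commutative, so the receiver obtains the same total whether it gets three messages or one.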
The Combiner is invoked in the sync function,
    public void sync() throws IOException, SyncException, InterruptedException {
      ...
      final BSPMessageBundle bundle = combineMessages(messages);
      ...
    }
and in the combineMessages function the system checks whether a Combiner needs to be applied.
    private BSPMessageBundle combineMessages(Iterable<BSPMessage> messages) {
      if (!conf.getClass("bsp.combiner.class", Combiner.class).equals(
          Combiner.class)) {
        Combiner combiner = (Combiner) ReflectionUtils.newInstance(conf.getClass(
            "bsp.combiner.class", Combiner.class), conf);
        return combiner.combine(messages);
      } else {
        ...
      }
    }
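The check above treats the base Combiner class as a sentinel: if the "bsp.combiner.class" key still resolves to the default, no user combiner was configured. The following stand-alone sketch imitates that pattern with plain Java reflection. CombinerLookupSketch, ConfSketch, getClassOrDefault and lookup are hypothetical names made up for this illustration and are not Hama or Hadoop APIs. The source elides what the else branch does, so the sketch simply returns null there as a placeholder.

```java
import java.util.HashMap;
import java.util.Map;

// Stand-alone sketch: all types here are hypothetical, not Hama or Hadoop APIs.
public class CombinerLookupSketch {

  /** Base type; doubles as the "no combiner configured" sentinel. */
  public static class Combiner {
    public int combine(int[] values) {
      throw new UnsupportedOperationException("no combiner configured");
    }
  }

  /** User-defined combiner that sums its inputs. */
  public static class SumCombiner extends Combiner {
    @Override
    public int combine(int[] values) {
      int sum = 0;
      for (int v : values) {
        sum += v;
      }
      return sum;
    }
  }

  /** Hypothetical minimal configuration mapping a key to a class. */
  static final class ConfSketch {
    private final Map<String, Class<?>> classes = new HashMap<>();

    void setClass(String key, Class<?> cls) {
      classes.put(key, cls);
    }

    Class<?> getClassOrDefault(String key, Class<?> defaultValue) {
      return classes.getOrDefault(key, defaultValue);
    }
  }

  /** Returns a combiner instance, or null (placeholder) when none is configured. */
  static Combiner lookup(ConfSketch conf) {
    Class<?> cls = conf.getClassOrDefault("bsp.combiner.class", Combiner.class);
    if (cls.equals(Combiner.class)) {
      return null; // placeholder: the original else branch is elided in the source
    }
    try {
      // Instantiate the configured class through its no-argument constructor.
      return (Combiner) cls.getDeclaredConstructor().newInstance();
    } catch (ReflectiveOperationException e) {
      throw new IllegalStateException("cannot instantiate " + cls.getName(), e);
    }
  }

  public static void main(String[] args) {
    ConfSketch conf = new ConfSketch();
    System.out.println(lookup(conf) == null); // true: nothing configured yet

    conf.setClass("bsp.combiner.class", SumCombiner.class);
    System.out.println(lookup(conf).combine(new int[] {1, 2, 3})); // 6
  }
}
```

Using the base class as the default keeps the configuration lookup to a single key, at the cost of making it impossible to configure the base class itself as the combiner.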
[1] Malewicz, G., Austern, M. H., Bik, A. J., Dehnert, J. C., Horn, I., Leiser, N., and Czajkowski, G. Pregel: A system for large-scale graph processing. In Proceedings of SIGMOD (2010).