Combiner
Wiki Markup |
---|
Combiner in Hama BSP model is used to combine messages during [barrier sync|SyncService] stage. This can reduce messages sent to other peers because sending messages somehow incurs overhead\[1\]. |
An example
Combiners
Combiners are used for performing message aggregation to reduce communication overhead in cases when messages can be summarized arithmetically e.g., min, max, sum, and average at the sender side. Suppose that you want to send the integer messages to a specific processor from 0 to 1000 and sum all received the integer messages from all processorsCombiner's usage can be seen in source code - CombinerExample.java, in which an user defined combiner, SumCombiner, sums up messages to be sent and adds the result to BSPMessageBundle class. This decreases the total messages to just one IntegerMessage with a summary of all values.
No Format |
---|
public static class SumCombiner extends Combiner {
@Override
void bsp(BSPPeer<NullWritable, NullWritable, NullWritable, NullWritable> peer) throws IOException,
public BSPMessageBundle combine(Iterable<BSPMessage> messages)SyncException, InterruptedException {
for BSPMessageBundle(int bundlei = 0; i new< BSPMessageBundle()1000;
i++) {
int sum = 0;
peer.send(masterTask, new IntegerMessage(peer.getPeerName(), i));
Iterator<BSPMessage>}
it = messagespeer.iteratorsync();
whileif (peer.getPeerName(it).hasNextequals(masterTask)) {
IntegerMessage sum +received;
while ((received = ((IntegerMessage) itpeer.nextgetCurrentMessage()).getData(); != null) {
}
sum += bundlereceived.addMessage(new IntegerMessage("Sum", sum)getData();
return bundle;}
}
}
|
If you follow the previous example, Each bsp processor will send a bundle of thousand Integer messages to a masterTask. Instead, you could use a Combiners in your BSP program to perform a sum Integer messages and to write more concise and maintainable as below, that is why you use Combiners.Combiner actually gets called in sync function,
No Format |
---|
public
static class publicSumCombiner voidextends sync() throws IOException, SyncException, InterruptedException {Combiner {
@Override
... public BSPMessageBundle combine(Iterable<BSPMessage> messages) {
final BSPMessageBundle bundle = combineMessagesnew BSPMessageBundle(messages);
int ...
sum = }
|
and in combineMessages function the system checks if Combiner needs to apply.
No Format |
---|
0;
private BSPMessageBundle combineMessages(Iterable<BSPMessage> messages) {
Iterator<BSPMessage> it if (!conf.getClass("bsp.combiner.class", Combiner.class).equals(= messages.iterator();
while Combiner.class(it.hasNext()) {
Combiner combinersum += (Combiner(IntegerMessage) ReflectionUtilsit.newInstancenext(conf)).getClassgetData();
}
bundle.addMessage(new "bsp.combiner.classIntegerMessage("Sum", Combiner.class), confsum));
return combiner.combine(messages)bundle;
} else {
...
}
}
|
...