...
No Format |
---|
@Override
public final void bsp(
    BSPPeer<LongWritable, Text, KEYOUT, VALUEOUT, MESSAGE_TYPE> peer)
    throws IOException, InterruptedException, SyncException {
  // this method reads the next key value record from file
  KeyValuePair<LongWritable, Text> pair = peer.readNext();

  // the following lines do the same:
  LongWritable key = new LongWritable();
  Text value = new Text();
  peer.readNext(key, value);
}
|
...
No Format |
---|
for (int i = 0; i < 5; i++) {
  LongWritable key = new LongWritable();
  Text value = new Text();
  while (peer.readNext(key, value)) {
    // read everything
  }
  // reopens the input
  peer.reopenInput();
}
|
You do not need to consume the whole input before reopening it.
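The reopen behaviour can be pictured with a small stand-alone model. This is only a sketch: `ReopenableInput` and its in-memory record list are hypothetical stand-ins for a peer's input split, not Hama classes, and the sketch assumes that reopening rewinds to the first record even when the input was only partly read.

```java
import java.util.Arrays;
import java.util.List;

/** Hypothetical in-memory stand-in for a peer's input; not Hama code. */
final class ReopenableInput {
  private final List<String> records;
  private int position = 0;

  ReopenableInput(List<String> records) {
    this.records = records;
  }

  /** Returns the next record, or null at the end of the input. */
  String readNext() {
    return position < records.size() ? records.get(position++) : null;
  }

  /** Rewinds to the first record, no matter how much was consumed. */
  void reopenInput() {
    position = 0;
  }

  public static void main(String[] args) {
    ReopenableInput input = new ReopenableInput(Arrays.asList("a", "b", "c"));
    String first = input.readNext();  // consume only one record
    input.reopenInput();              // rewind without reading the rest
    String again = input.readNext();
    System.out.println(first.equals(again));
  }
}
```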
Custom Inputformat
You can implement your own input format. It is similar to Hadoop MapReduce's input formats, so the existing literature on those is a good starting point.
...
You can implement your own output format. It is similar to Hadoop MapReduce's output formats, so the existing literature on those is a good starting point.
Implementation notes
Internal implementation details
BSPJobClient
1. Create the splits for the job
2. writeNewSplits()
3. job.set("bsp.job.split.file", submitSplitFile.toString());
4. Sets the number of peers to split.length

1. Receives splitFile
2. Add split argument to TaskInProgress constructor

Task
1. Gets its split from the Groom
2. Initializes everything in BSPPeerImpl
Communication Model
Within the bsp() function, you can communicate with the other peers through BSPPeer. We tried to follow the standard BSP library conventions as closely as possible. The following table describes the functions you can use:
Function | Description |
send(String peerName, BSPMessage msg) | Send a message to another peer. |
getCurrentMessage() | Get a received message from the queue. |
getNumCurrentMessages() | Get the number of messages currently in the queue. |
sync() | Starts the barrier synchronization. |
getPeerName() | Get the peer name of this task. |
getPeerName(int index) | Gets the n-th peer name. |
getNumPeers() | Get the number of peers. |
getAllPeerNames() | Get all peer names (including "this" task). (Hint: These are always sorted in ascending order) |
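The interplay of send, sync and the message queue can be sketched with a stand-alone toy model. `ToyBspCluster` is hypothetical and is not part of Hama; it only assumes what the table states (sorted peer names, a per-peer message queue) plus the usual BSP rule that messages sent in a superstep become readable after the barrier.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.TreeMap;

/** Hypothetical in-memory model of BSP messaging; not Hama code. */
final class ToyBspCluster {
  // Messages sent during the current superstep, keyed by receiver name.
  private final Map<String, List<String>> outgoing = new TreeMap<>();
  // Messages that are readable by each receiver.
  private final Map<String, Queue<String>> inbox = new HashMap<>();

  ToyBspCluster(String... peerNames) {
    for (String name : peerNames) {
      outgoing.put(name, new ArrayList<>());
      inbox.put(name, new ArrayDeque<>());
    }
  }

  /** Peer names in ascending order (the TreeMap keeps them sorted). */
  List<String> getAllPeerNames() {
    return new ArrayList<>(outgoing.keySet());
  }

  /** Buffers a message; the receiver cannot read it before sync(). */
  void send(String peerName, String msg) {
    outgoing.get(peerName).add(msg);
  }

  /** Barrier: moves the buffered messages into the receivers' inboxes. */
  void sync() {
    for (Map.Entry<String, List<String>> e : outgoing.entrySet()) {
      inbox.get(e.getKey()).addAll(e.getValue());
      e.getValue().clear();
    }
  }

  int getNumCurrentMessages(String peerName) {
    return inbox.get(peerName).size();
  }

  /** Returns the next received message, or null when the queue is empty. */
  String getCurrentMessage(String peerName) {
    return inbox.get(peerName).poll();
  }

  public static void main(String[] args) {
    ToyBspCluster cluster = new ToyBspCluster("peer-b", "peer-a");
    for (String sender : cluster.getAllPeerNames()) {
      for (String receiver : cluster.getAllPeerNames()) {
        cluster.send(receiver, "Hello from " + sender);
      }
    }
    // Before the barrier no message is readable yet.
    System.out.println(cluster.getNumCurrentMessages("peer-a"));
    cluster.sync();
    System.out.println(cluster.getNumCurrentMessages("peer-a"));
  }
}
```

In this model the per-peer methods take the peer name as an argument because a single object plays all peers; in a real job each task only sees its own BSPPeer.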
...
No Format |
---|
@Override
public void bsp(
    BSPPeer<NullWritable, NullWritable, Text, DoubleWritable, LongMessage> peer)
    throws IOException, SyncException, InterruptedException {
  // send a greeting with the current time to every peer, including this one
  for (String peerName : peer.getAllPeerNames()) {
    peer.send(peerName,
        new LongMessage("Hello from " + peer.getPeerName(), System.currentTimeMillis()));
  }
  peer.sync();
}
|
...
No Format |
---|
@Override
public void bsp(
    BSPPeer<NullWritable, NullWritable, Text, DoubleWritable, Writable> peer)
    throws IOException, SyncException, InterruptedException {
  for (int i = 0; i < 100; i++) {
    // send some messages
    peer.sync();
  }
}
|
...
No Format |
---|
// enum definition
enum LoopCounter {
  LOOPS
}

@Override
public void bsp(
    BSPPeer<NullWritable, NullWritable, Text, DoubleWritable, DoubleWritable> peer)
    throws IOException, SyncException, InterruptedException {
  for (int i = 0; i < iterations; i++) {
    // details omitted
    peer.getCounter(LoopCounter.LOOPS).increment(1L);
  }
  // rest omitted
}
|
In 0.4.0, counters cannot be used for flow control, because they are not synchronized during the sync phase. Watch HAMA-515 for details.
Setup and Cleanup
You can use setup and cleanup methods in your BSP code. They are inherited from the BSP class and can be overridden like this:
No Format |
---|
public class MyEstimator extends
    BSP<NullWritable, NullWritable, Text, DoubleWritable, DoubleWritable> {

  private String masterTask;

  @Override
  public void setup(
      BSPPeer<NullWritable, NullWritable, Text, DoubleWritable, DoubleWritable> peer)
      throws IOException {
    // Setup: Choose one as a master
    this.masterTask = peer.getPeerName(peer.getNumPeers() / 2);
  }

  @Override
  public void cleanup(
      BSPPeer<NullWritable, NullWritable, Text, DoubleWritable, DoubleWritable> peer)
      throws IOException {
    // your cleanup here
  }

  @Override
  public void bsp(
      BSPPeer<NullWritable, NullWritable, Text, DoubleWritable, DoubleWritable> peer)
      throws IOException, SyncException, InterruptedException {
    // your computation here
  }
}
|
Setup is called before the bsp method, and cleanup is executed at the end, after bsp. You can do everything in setup and cleanup that you can do in bsp: sync, send, increment counters, write output, or even read from the input.
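The call order can be illustrated with a stand-alone sketch. `ToyBspTask` and `LifecycleDemo` are hypothetical names invented for this illustration; in a real job the framework invokes the hooks itself, and this sketch assumes nothing beyond the order stated in the text.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical demo task; the names are illustrative, not Hama API. */
final class LifecycleDemo extends ToyBspTask {
  @Override
  void setup() {
    trace.add("setup");
  }

  @Override
  void bsp() {
    trace.add("bsp");
  }

  @Override
  void cleanup() {
    trace.add("cleanup");
  }

  public static void main(String[] args) {
    LifecycleDemo task = new LifecycleDemo();
    task.run();
    System.out.println(task.trace); // prints [setup, bsp, cleanup]
  }
}

/** Toy base class that calls the hooks in the order the text describes. */
abstract class ToyBspTask {
  final List<String> trace = new ArrayList<>();

  void setup() {}      // optional hook, empty by default

  abstract void bsp(); // the computation itself

  void cleanup() {}    // optional hook, empty by default

  final void run() {
    setup();
    bsp();
    cleanup();
  }
}
```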
Combiners
Combiners aggregate messages at the sender side to reduce communication overhead. They apply when messages can be summarized arithmetically, e.g., as a min, max, sum, or average. Suppose that every processor sends the integers from 0 up to (but not including) 1000 as messages to one specific processor, which then sums all the integers it receives from all processors.
No Format |
---|
public void bsp(
    BSPPeer<NullWritable, NullWritable, NullWritable, NullWritable, IntegerMessage> peer)
    throws IOException, SyncException, InterruptedException {
  // every peer sends the integers 0..999 to the master task
  for (int i = 0; i < 1000; i++) {
    peer.send(masterTask, new IntegerMessage(peer.getPeerName(), i));
  }
  peer.sync();

  // only the master task sums up what it received
  if (peer.getPeerName().equals(masterTask)) {
    IntegerMessage received;
    while ((received = peer.getCurrentMessage()) != null) {
      sum += received.getData();
    }
  }
}
|
If you follow the previous example, each BSP processor sends a bundle of a thousand integer messages to the masterTask. Instead, you can use a Combiner in your BSP program to sum the integer messages at the sender side, which reduces the traffic and keeps the program concise and maintainable:
No Format |
---|
public static class SumCombiner extends Combiner {

  @Override
  public BSPMessageBundle combine(Iterable<BSPMessage> messages) {
    BSPMessageBundle bundle = new BSPMessageBundle();
    int sum = 0;

    Iterator<BSPMessage> it = messages.iterator();
    while (it.hasNext()) {
      sum += ((IntegerMessage) it.next()).getData();
    }

    bundle.addMessage(new IntegerMessage("Sum", sum));
    return bundle;
  }
}
|
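The effect of sender-side combining on the message count can be checked with a stand-alone sketch. `SumCombinerSketch` is hypothetical and uses plain integers instead of Hama message classes; it only mirrors the summing logic of the combiner shown above.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-alone illustration of sender-side combining; not Hama code. */
final class SumCombinerSketch {

  /** Collapses all queued integer messages into a single sum message. */
  static List<Integer> combine(List<Integer> messages) {
    int sum = 0;
    for (int value : messages) {
      sum += value;
    }
    List<Integer> bundle = new ArrayList<>();
    bundle.add(sum);
    return bundle;
  }

  /** Builds the 1000 messages (0..999) that one peer queues for the master. */
  static List<Integer> queuedMessages() {
    List<Integer> queued = new ArrayList<>();
    for (int i = 0; i < 1000; i++) {
      queued.add(i);
    }
    return queued;
  }

  public static void main(String[] args) {
    List<Integer> queued = queuedMessages();
    List<Integer> combined = combine(queued);
    System.out.println("messages before combining: " + queued.size());
    System.out.println("messages after combining: " + combined.size());
    System.out.println("combined sum: " + combined.get(0));
  }
}
```

With combining, each peer would ship one message instead of a thousand, and the master still arrives at the same total.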