...
```java
// inside bsp(...); peer is the BSPPeer passed to the method
for (int i = 0; i < 5; i++) {
  LongWritable key = new LongWritable();
  Text value = new Text();
  while (peer.readNext(key, value)) {
    // read everything
  }
  // reopens the input
  peer.reopenInput();
}
```
You do not have to consume the whole input before reopening it.
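The loop above reads the input five times. The sketch below imitates that multi-pass pattern without a Hama cluster, using a hypothetical in-memory stand-in (`InMemoryInput`, `MultiPassDemo`) whose `readNext` and `reopenInput` methods only mimic the shape of the `BSPPeer` calls. It is not Hama's implementation, and it also shows that reopening works after a partial read.

```java
import java.util.List;

// Hypothetical in-memory stand-in for a peer's input; NOT the Hama API.
class InMemoryInput {
    private final List<String> records;
    private int position = 0;

    InMemoryInput(List<String> records) {
        this.records = records;
    }

    // Returns the next record, or null when the input is exhausted
    // (Hama's readNext instead fills key/value objects and returns a boolean).
    String readNext() {
        return position < records.size() ? records.get(position++) : null;
    }

    // Rewinds to the first record, imitating peer.reopenInput().
    void reopenInput() {
        position = 0;
    }
}

class MultiPassDemo {
    // Reads the input `passes` times, stopping after at most `limit` records
    // per pass, and returns the total number of records read.
    static int readPasses(InMemoryInput input, int passes, int limit) {
        int total = 0;
        for (int i = 0; i < passes; i++) {
            int readThisPass = 0;
            while (readThisPass < limit && input.readNext() != null) {
                readThisPass++;
            }
            total += readThisPass;
            input.reopenInput(); // a partial read is fine; the next pass starts over
        }
        return total;
    }
}
```

With three records, five full passes read 15 records, while a limit of 2 per pass reads 10, because each pass rewinds regardless of where the previous one stopped.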
Custom InputFormat
You can implement your own InputFormat. It works like Hadoop MapReduce's input formats, so existing Hadoop literature on writing input formats is a good starting point.
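An input format has two jobs: divide the input into splits, and provide a record reader that turns one split into key/value records. The sketch below shows that idea in plain Java with hypothetical, simplified interfaces (`SimpleInputFormat`, `SimpleRecordReader`, `LineSplit`, `KeyValue`); they are NOT Hama's or Hadoop's real signatures. Keys here are global line numbers rather than the byte offsets Hadoop's text input uses.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// One key/value record produced by a reader.
final class KeyValue {
    final long key;
    final String value;
    KeyValue(long key, String value) { this.key = key; this.value = value; }
}

// A contiguous chunk of lines; firstLine is the global index of its first line.
final class LineSplit {
    final long firstLine;
    final List<String> lines;
    LineSplit(long firstLine, List<String> lines) { this.firstLine = firstLine; this.lines = lines; }
}

// Hypothetical simplified contracts (not the real Hama/Hadoop interfaces).
interface SimpleRecordReader {
    KeyValue next(); // returns null when the split is exhausted
}

interface SimpleInputFormat {
    List<LineSplit> getSplits(int numSplits); // numSplits must be >= 1
    SimpleRecordReader getRecordReader(LineSplit split);
}

// Splits in-memory text by lines and emits (line number, line) records.
class LineInputFormat implements SimpleInputFormat {
    private final List<String> lines;

    LineInputFormat(String text) {
        this.lines = new ArrayList<>(Arrays.asList(text.split("\n")));
    }

    // Divides the lines into at most numSplits contiguous, roughly equal chunks.
    public List<LineSplit> getSplits(int numSplits) {
        List<LineSplit> splits = new ArrayList<>();
        int chunk = (lines.size() + numSplits - 1) / numSplits; // ceiling division
        for (int start = 0; start < lines.size(); start += chunk) {
            int end = Math.min(start + chunk, lines.size());
            splits.add(new LineSplit(start, lines.subList(start, end)));
        }
        return splits;
    }

    // Returns a reader that walks one split line by line.
    public SimpleRecordReader getRecordReader(LineSplit split) {
        return new SimpleRecordReader() {
            private int i = 0;

            public KeyValue next() {
                if (i >= split.lines.size()) {
                    return null;
                }
                KeyValue kv = new KeyValue(split.firstLine + i, split.lines.get(i));
                i++;
                return kv;
            }
        };
    }
}
```

For five lines and two splits, the first split holds lines 0 to 2 and the second holds lines 3 and 4, so each task would read only its own chunk.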
...
In 0.4.0, counters cannot be used for flow control, because they are not synchronized during the sync phase. See HAMA-515 for details.
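Because counters are not synchronized, a peer cannot rely on seeing other peers' increments when deciding whether to keep looping. One common workaround, not taken from the Hama documentation, is to send each peer's local value as a message and let every peer compute the same global value after the sync barrier. The sketch below simulates one such superstep in plain Java; `GlobalDecisionDemo`, `decide`, and the threshold rule are hypothetical names chosen for illustration.

```java
import java.util.ArrayList;
import java.util.List;

// Simulates one superstep in which every peer broadcasts its local count,
// so that all peers base their loop decision on the same global total.
class GlobalDecisionDemo {
    // localCounts[p] is peer p's local value (hypothetical example data).
    // Returns, for each peer, whether it decides to run another superstep.
    static boolean[] decide(long[] localCounts, long threshold) {
        int numPeers = localCounts.length;

        // "send" phase: each peer sends its local count to every peer
        List<List<Long>> inboxes = new ArrayList<>();
        for (int p = 0; p < numPeers; p++) {
            inboxes.add(new ArrayList<>());
        }
        for (int sender = 0; sender < numPeers; sender++) {
            for (int receiver = 0; receiver < numPeers; receiver++) {
                inboxes.get(receiver).add(localCounts[sender]);
            }
        }

        // after the (simulated) sync barrier every peer sums identical messages
        boolean[] continueLoop = new boolean[numPeers];
        for (int p = 0; p < numPeers; p++) {
            long global = 0;
            for (long v : inboxes.get(p)) {
                global += v;
            }
            continueLoop[p] = global > threshold;
        }
        return continueLoop;
    }
}
```

With local values {5, 0, 3} and a threshold of 4, deciding locally would let only the first peer continue, while the exchanged total of 8 makes all three peers agree to continue.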
Setup and Cleanup
Since 0.4.0 you can use setup and cleanup methods in your BSP code. Override them from the BSP class like this:
```java
import java.io.IOException;

import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hama.bsp.BSP;
import org.apache.hama.bsp.BSPPeer;
import org.apache.hama.bsp.sync.SyncException;

public class MyEstimator extends
    BSP<NullWritable, NullWritable, Text, DoubleWritable, DoubleWritable> {

  // name of the peer chosen as master in setup()
  private String masterTask;

  @Override
  public void setup(
      BSPPeer<NullWritable, NullWritable, Text, DoubleWritable, DoubleWritable> peer)
      throws IOException {
    // Setup: choose one peer as the master
    this.masterTask = peer.getPeerName(peer.getNumPeers() / 2);
  }

  @Override
  public void cleanup(
      BSPPeer<NullWritable, NullWritable, Text, DoubleWritable, DoubleWritable> peer)
      throws IOException {
    // your cleanup here
  }

  @Override
  public void bsp(
      BSPPeer<NullWritable, NullWritable, Text, DoubleWritable, DoubleWritable> peer)
      throws IOException, SyncException, InterruptedException {
    // your computation here
  }
}
```
The setup method is called before bsp, and cleanup is called after bsp has finished. In setup and cleanup you can do everything you can do in bsp: sync, send messages, increment counters, write output, or even read from the input.
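To make the call order concrete, here is a plain-Java miniature of the lifecycle. `MiniBSP`, `MiniPeer`, and `MasterElectingTask` are hypothetical stand-ins, not Hama classes, and the `run` method only assumes the setup, bsp, cleanup order described above. The master-selection line uses the same rule as the example class: the peer at index `getNumPeers() / 2`.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical peer exposing only what this example needs; NOT Hama's BSPPeer.
class MiniPeer {
    private final String[] peerNames;
    final List<String> log = new ArrayList<>(); // records which hooks ran

    MiniPeer(String[] peerNames) { this.peerNames = peerNames; }
    int getNumPeers() { return peerNames.length; }
    String getPeerName(int index) { return peerNames[index]; }
}

// Hypothetical miniature of the BSP lifecycle; NOT Hama's BSP class.
abstract class MiniBSP {
    void setup(MiniPeer peer) {}
    abstract void bsp(MiniPeer peer);
    void cleanup(MiniPeer peer) {}

    // Calls the three hooks in the documented order.
    final void run(MiniPeer peer) {
        setup(peer);
        bsp(peer);
        cleanup(peer);
    }
}

class MasterElectingTask extends MiniBSP {
    String masterTask;

    @Override
    void setup(MiniPeer peer) {
        // choose the middle peer as master, as in MyEstimator.setup
        masterTask = peer.getPeerName(peer.getNumPeers() / 2);
        peer.log.add("setup");
    }

    @Override
    void bsp(MiniPeer peer) { peer.log.add("bsp"); }

    @Override
    void cleanup(MiniPeer peer) { peer.log.add("cleanup"); }
}
```

Because every peer evaluates the same deterministic rule over the same list of peer names, all peers agree on the master without exchanging any messages.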
Combiners
Combiners aggregate messages on the sender side to reduce communication overhead in cases where messages can be summarized arithmetically, e.g., by min, max, sum, or average. For example, suppose every processor sends the integers 0 to 1000 as messages to one specific processor, which sums all the integer messages it receives from all processors.
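The following plain-Java simulation shows why a sum combiner helps in that scenario. It uses a hypothetical `IntCombiner` interface rather than Hama's own combiner API, and `CombinerDemo.deliver` stands in for the messaging layer. Each sender collapses its 1001 outgoing integers into a single message carrying their sum, 500500.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical combiner contract: many outgoing messages in, one message out.
interface IntCombiner {
    int combine(Iterable<Integer> messages);
}

// Sums all messages; valid because addition is associative and commutative.
class SumCombiner implements IntCombiner {
    @Override
    public int combine(Iterable<Integer> messages) {
        int sum = 0;
        for (int m : messages) {
            sum += m;
        }
        return sum;
    }
}

class CombinerDemo {
    // Each sender queues 0..1000 for the same receiver; the combiner collapses
    // that outgoing queue into one message before it is "sent".
    static List<Integer> deliver(int numSenders, IntCombiner combiner) {
        List<Integer> receiverInbox = new ArrayList<>();
        for (int s = 0; s < numSenders; s++) {
            List<Integer> outgoing = new ArrayList<>();
            for (int i = 0; i <= 1000; i++) {
                outgoing.add(i);
            }
            receiverInbox.add(combiner.combine(outgoing)); // 1 message instead of 1001
        }
        return receiverInbox;
    }
}
```

With four senders, the receiver gets four messages of 500500 each instead of 4004 individual integers, and summing them still yields the same total of 2002000.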
...