...

No Format
  for (int i = 0; i < 5; i++) {
    LongWritable key = new LongWritable();
    Text value = new Text();
    while (peer.readNext(key, value)) {
      // read everything
    }
    // reopen the input so that the next iteration can read it again
    peer.reopenInput();
  }

You do not have to consume the whole input before reopening it.

Custom InputFormat

You can implement your own input format. The API is similar to Hadoop MapReduce's input formats, so the existing literature on those is a good starting point.

...

In 0.4.0, counters cannot be used for flow control, because they are not synchronized during the sync phase. See HAMA-515 for details.

Setup and Cleanup

Since 0.4.0 you can use setup and cleanup methods in your BSP code. Both are inherited from the BSP class and can be overridden like this:

No Format

  public class MyEstimator extends
      BSP<NullWritable, NullWritable, Text, DoubleWritable, DoubleWritable> {

    // name of the peer chosen as master in setup()
    private String masterTask;

    @Override
    public void setup(
        BSPPeer<NullWritable, NullWritable, Text, DoubleWritable, DoubleWritable> peer)
        throws IOException {
      // Setup: choose one peer as the master
      this.masterTask = peer.getPeerName(peer.getNumPeers() / 2);
    }

    @Override
    public void cleanup(
        BSPPeer<NullWritable, NullWritable, Text, DoubleWritable, DoubleWritable> peer)
        throws IOException {
      // your cleanup here
    }

    @Override
    public void bsp(
        BSPPeer<NullWritable, NullWritable, Text, DoubleWritable, DoubleWritable> peer)
        throws IOException, SyncException, InterruptedException {
      // your computation here
    }
  }

The setup method is called before the bsp method, and cleanup is executed at the end, after bsp has finished. Everything that is available in bsp is also available in setup and cleanup: you can sync, send messages, increment counters, write output, or even read from the input.

Combiners

Combiners perform message aggregation at the sender side to reduce communication overhead in cases where messages can be summarized arithmetically, e.g., by min, max, sum, or average. Suppose that you want to send the integer messages from 0 to 1000 to a specific processor and have it sum all the integer messages received from all processors.
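To make the idea concrete, here is a minimal standalone sketch of what sender-side combining does for the sum case. It deliberately does not use the Hama API: the class name SumCombinerSketch and its combine method are hypothetical stand-ins, and the range is assumed to include both 0 and 1000.

```java
import java.util.ArrayList;
import java.util.List;

// Standalone illustration of sender-side combining; this is not the Hama Combiner API.
public class SumCombinerSketch {

    // Collapses all integer messages queued for one destination into a single sum.
    static int combine(Iterable<Integer> messages) {
        int sum = 0;
        for (int m : messages) {
            sum += m;
        }
        return sum;
    }

    public static void main(String[] args) {
        // Queue the messages 0..1000 (inclusive, by assumption) for one destination peer.
        List<Integer> outgoing = new ArrayList<>();
        for (int i = 0; i <= 1000; i++) {
            outgoing.add(i);
        }

        // Without combining, every queued message is sent separately;
        // with combining, only this single value has to be sent.
        int combined = combine(outgoing);
        System.out.println(outgoing.size() + " messages combined into one: " + combined);
    }
}
```

The receiver still gets the correct total because addition is associative and commutative: summing the partial sums from all senders gives the same result as summing every individual message.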

...