...

No Format
 @Override
  public final void bsp(
      BSPPeer<LongWritable, Text, KEYOUT, VALUEOUT, MESSAGE_TYPE> peer)
      throws IOException, InterruptedException, SyncException {
      
      // this method reads the next key value record from file
      KeyValuePair<LongWritable, Text> pair = peer.readNext();

      // the following lines do the same:
      LongWritable key = new LongWritable();
      Text value = new Text();
      peer.readNext(key, value);
  }

...

No Format
  for (int i = 0; i < 5; i++) {
    LongWritable key = new LongWritable();
    Text value = new Text();
    while (peer.readNext(key, value)) {
       // read everything
    }
    // reopens the input
    peer.reopenInput();
  }

You do not have to consume the whole input before reopening it.
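The reopen behaviour can be illustrated with a small standalone sketch. It does not use Hama: the class ReopenableInput and its methods are hypothetical stand-ins for a peer's input, and it assumes that reopening simply resets the read position to the first record.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical stand-in for a peer's input; NOT part of the Hama API.
class ReopenableInput {
  private final List<String> records;
  private int position = 0;

  ReopenableInput(List<String> records) {
    this.records = new ArrayList<String>(records);
  }

  // Returns the next record, or null once the input is exhausted.
  String readNext() {
    return position < records.size() ? records.get(position++) : null;
  }

  // Resets the read position; may be called at any time,
  // not only after the last record has been read.
  void reopenInput() {
    position = 0;
  }

  public static void main(String[] args) {
    ReopenableInput in = new ReopenableInput(Arrays.asList("a", "b", "c"));
    System.out.println(in.readNext()); // a
    in.reopenInput();                  // reopen after a partial read
    System.out.println(in.readNext()); // a
  }
}
```

In this model a partial read followed by reopenInput() starts again at the first record, which is the behaviour the loop above relies on.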

Custom Inputformat

You can implement your own input format. It is similar to Hadoop MapReduce's input formats, so you can use the existing literature on those to get started.

...

You can implement your own output format. It is similar to Hadoop MapReduce's output formats, so you can use the existing literature on those to get started.

Communication Model

Within the bsp() function, you communicate with other peers through the functions of BSPPeer. We tried to follow the conventions of the standard BSP libraries as closely as possible. The following table describes all the functions you can use:

Function                                Description
send(String peerName, BSPMessage msg)   Send a message to another peer.
getCurrentMessage()                     Get a received message from the queue.
getNumCurrentMessages()                 Get the number of messages currently in the queue.
sync()                                  Starts the barrier synchronization.
getPeerName()                           Get the peer name of this task.
getPeerName(int index)                  Gets the n-th peer name.
getNumPeers()                           Get the number of peers.
getAllPeerNames()                       Get all peer names (including "this" task). (Hint: These are always sorted in ascending order)
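To show how these calls fit together, here is a toy single-process model. It is only a sketch and not the Hama implementation: the class ToyBspCluster is hypothetical, messages are plain Strings instead of BSPMessage, the receiving peer is passed explicitly because one object stands in for all peers, and it assumes the usual BSP rule that messages sent in a superstep become readable only after the barrier.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.TreeMap;

// Toy single-process model of the messaging calls; NOT the Hama implementation.
class ToyBspCluster {
  // Messages sent in the current superstep, keyed by receiver name.
  private final Map<String, List<String>> outgoing = new TreeMap<String, List<String>>();
  // Messages delivered by earlier sync() calls, keyed by receiver name.
  private final Map<String, Queue<String>> inbox = new TreeMap<String, Queue<String>>();

  ToyBspCluster(String... peerNames) {
    for (String name : peerNames) {
      outgoing.put(name, new ArrayList<String>());
      inbox.put(name, new ArrayDeque<String>());
    }
  }

  // The TreeMap keeps the peer names sorted in ascending order.
  String[] getAllPeerNames() {
    return inbox.keySet().toArray(new String[0]);
  }

  int getNumPeers() {
    return inbox.size();
  }

  // Queues a message; it is not visible to the receiver until sync().
  void send(String peerName, String msg) {
    outgoing.get(peerName).add(msg);
  }

  // Barrier: everything sent so far becomes readable by its receiver.
  void sync() {
    for (Map.Entry<String, List<String>> e : outgoing.entrySet()) {
      inbox.get(e.getKey()).addAll(e.getValue());
      e.getValue().clear();
    }
  }

  int getNumCurrentMessages(String peerName) {
    return inbox.get(peerName).size();
  }

  // Returns the next received message, or null if the queue is empty.
  String getCurrentMessage(String peerName) {
    return inbox.get(peerName).poll();
  }

  public static void main(String[] args) {
    ToyBspCluster cluster = new ToyBspCluster("peer1", "peer0");
    cluster.send("peer0", "Hello from peer1");
    System.out.println(cluster.getNumCurrentMessages("peer0")); // 0
    cluster.sync();
    System.out.println(cluster.getNumCurrentMessages("peer0")); // 1
    System.out.println(cluster.getCurrentMessage("peer0"));     // Hello from peer1
  }
}
```

The model makes the send, sync, receive order explicit: a message is readable only after the barrier, and getCurrentMessage() returns null once the queue is drained.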

...

No Format
    @Override
    public void bsp(
        BSPPeer<NullWritable, NullWritable, Text, DoubleWritable, LongMessage> peer)
        throws IOException, SyncException, InterruptedException {

      for (String peerName : peer.getAllPeerNames()) {
        peer.send(peerName, 
          new LongMessage("Hello from " + peer.getPeerName(), System.currentTimeMillis()));
      }

      peer.sync();
    }

...

No Format
    @Override
    public void bsp(
        BSPPeer<NullWritable, NullWritable, Text, DoubleWritable, Writable> peer)
        throws IOException, SyncException, InterruptedException {

      for (int i = 0; i < 100; i++) {
        // send some messages
        peer.sync();
      }

    }

...

No Format
    // enum definition
    enum LoopCounter{
      LOOPS
    }

    @Override
    public void bsp(
        BSPPeer<NullWritable, NullWritable, Text, DoubleWritable, DoubleWritable> peer)
        throws IOException, SyncException, InterruptedException {
      for (int i = 0; i < iterations; i++) {
        // details omitted
        peer.getCounter(LoopCounter.LOOPS).increment(1L);
      }
      // rest omitted
    }

In 0.4.0, counters are not usable for flow control, because they are not synced during the sync phase. Watch HAMA-515 for details.

Setup and Cleanup

You can use Setup and Cleanup methods in your BSP code. They can be inherited from the BSP class like this:

No Format

 public class MyEstimator extends
      BSP<NullWritable, NullWritable, Text, DoubleWritable, DoubleWritable> {

    // name of the peer chosen as master in setup()
    private String masterTask;

    @Override
    public void setup(
        BSPPeer<NullWritable, NullWritable, Text, DoubleWritable, DoubleWritable> peer)
        throws IOException {
      // Setup: Choose one as a master
      this.masterTask = peer.getPeerName(peer.getNumPeers() / 2);
    }

    @Override
    public void cleanup(
        BSPPeer<NullWritable, NullWritable, Text, DoubleWritable, DoubleWritable> peer)
        throws IOException {
      // your cleanup here
    }

    @Override
    public void bsp(
        BSPPeer<NullWritable, NullWritable, Text, DoubleWritable, DoubleWritable> peer)
        throws IOException, SyncException, InterruptedException {
      // your computation here
    }
}

Setup is called before the bsp method, and cleanup is executed at the end, after bsp. You can do everything in setup and cleanup: sync, send, increment counters, write output or even read from the input.
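The call order can be sketched in a few lines of plain Java. This is a hypothetical miniature, not Hama's task runner: ToyBsp, ToyEstimator and LifecycleDemo are made-up names, the hooks take a log list instead of a BSPPeer, and the master name "peer0" is an arbitrary placeholder.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical driver that calls the hooks in the documented order.
class LifecycleDemo {
  static List<String> run(ToyBsp task) {
    List<String> log = new ArrayList<String>();
    task.setup(log);   // first
    task.bsp(log);     // then the computation
    task.cleanup(log); // last
    return log;
  }

  public static void main(String[] args) {
    for (String entry : run(new ToyEstimator())) {
      System.out.println(entry);
    }
  }
}

// Hypothetical miniature of the BSP base class; setup and cleanup are optional hooks.
abstract class ToyBsp {
  void setup(List<String> log) {}
  abstract void bsp(List<String> log);
  void cleanup(List<String> log) {}
}

// State initialised in setup is available in bsp.
class ToyEstimator extends ToyBsp {
  private String masterTask;

  @Override
  void setup(List<String> log) {
    masterTask = "peer0"; // placeholder master name
    log.add("setup");
  }

  @Override
  void bsp(List<String> log) {
    log.add("bsp with master " + masterTask);
  }

  @Override
  void cleanup(List<String> log) {
    log.add("cleanup");
  }
}
```

The point of the sketch is that a field assigned in setup, such as masterTask, is already set when bsp runs.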

Combiners

Combiners aggregate messages at the sender side to reduce communication overhead in cases where messages can be summarized arithmetically, e.g., as a min, max, sum, or average. Suppose that every processor sends the integers from 0 to 999 as messages to one specific processor, which sums all the integer messages it receives from all processors.

No Format

    public void bsp(BSPPeer<NullWritable, NullWritable, NullWritable, NullWritable, IntegerMessage> peer) throws IOException,
        SyncException, InterruptedException {

      for (int i = 0; i < 1000; i++) {
        peer.send(masterTask, new IntegerMessage(peer.getPeerName(), i));
      }
      peer.sync();

      if (peer.getPeerName().equals(masterTask)) {
        IntegerMessage received;
        while ((received = peer.getCurrentMessage()) != null) {
          sum += received.getData();
        }
      }
    }

In the previous example, each BSP processor sends a bundle of a thousand Integer messages to the masterTask. Instead, you can use a Combiner in your BSP program to sum the Integer messages at the sender side, which also makes the program more concise and maintainable, as shown below:

No Format

  public static class SumCombiner extends Combiner {

    @Override
    public BSPMessageBundle combine(Iterable<BSPMessage> messages) {
      BSPMessageBundle bundle = new BSPMessageBundle();
      int sum = 0;

      Iterator<BSPMessage> it = messages.iterator();
      while (it.hasNext()) {
        sum += ((IntegerMessage) it.next()).getData();
      }

      bundle.addMessage(new IntegerMessage("Sum", sum));
      return bundle;
    }

  }
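The effect of sender-side combining on message volume can be checked with a standalone sketch. It uses plain Java lists rather than the Hama Combiner API: CombinerSketch and its methods are hypothetical, and Integer values stand in for IntegerMessage.

```java
import java.util.ArrayList;
import java.util.List;

// Standalone illustration of sender-side combining; NOT the Hama Combiner API.
class CombinerSketch {
  // The 1000 messages (0..999) that one peer queues for the master.
  static List<Integer> queuedMessages() {
    List<Integer> messages = new ArrayList<Integer>();
    for (int i = 0; i < 1000; i++) {
      messages.add(i);
    }
    return messages;
  }

  // Folds all queued messages into a single message holding their sum.
  static List<Integer> combine(List<Integer> messages) {
    int sum = 0;
    for (int m : messages) {
      sum += m;
    }
    List<Integer> combined = new ArrayList<Integer>();
    combined.add(sum);
    return combined;
  }

  public static void main(String[] args) {
    List<Integer> queued = queuedMessages();
    List<Integer> combined = combine(queued);
    System.out.println(queued.size());   // 1000
    System.out.println(combined.size()); // 1
    System.out.println(combined.get(0)); // 499500
  }
}
```

Without combining, each peer transfers 1000 messages to the master; with it, each peer transfers one message that carries the same total.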

Implementation notes

Internal implementation details

BSPJobClient

  1. Create the splits for the job
  2. writeNewSplits()
  3. job.set("bsp.job.split.file", submitSplitFile.toString());
  4. Sets the number of peers to split.length

JobInProgress

  1. Receives splitFile
  2. Add split argument to TaskInProgress constructor

Task

  1. Gets its split from the Groom
  2. Initializes everything in BSPPeerImpl