...

No Format
  @Override
  public final void bsp(
      BSPPeer<LongWritable, Text, KEYOUT, VALUEOUT, MESSAGE_TYPE> peer)
      throws IOException, InterruptedException, SyncException {

    // read the next key/value record from the input file
    KeyValuePair<LongWritable, Text> pair = peer.readNext();

    // the following lines do the same, filling the given key and value:
    LongWritable key = new LongWritable();
    Text value = new Text();
    peer.readNext(key, value);
  }

...

No Format
    @Override
    public void bsp(
        BSPPeer<NullWritable, NullWritable, Text, DoubleWritable, LongMessage> peer)
        throws IOException, SyncException, InterruptedException {

      for (String peerName : peer.getAllPeerNames()) {
        peer.send(peerName, 
          new LongMessage("Hello from " + peer.getPeerName(), System.currentTimeMillis()));
      }

      peer.sync();
    }
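
The barrier behaviour that the example above relies on can be modelled without a cluster. The following is a minimal sketch in plain Java, not Hama code: the class `SuperstepSketch` and its methods are hypothetical stand-ins that only borrow the names `send`, `sync` and `getCurrentMessage`. It assumes the usual BSP rule that a message sent during a superstep becomes readable only after the next barrier.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Queue;

// Hypothetical in-memory model of BSP messaging; it is NOT part of Hama.
public class SuperstepSketch {
  // Messages sent during the current superstep, keyed by receiver name.
  private final Map<String, List<String>> outgoing = new LinkedHashMap<>();
  // Messages delivered by earlier barriers, keyed by receiver name.
  private final Map<String, Queue<String>> inbox = new LinkedHashMap<>();

  public SuperstepSketch(List<String> peerNames) {
    for (String name : peerNames) {
      outgoing.put(name, new ArrayList<>());
      inbox.put(name, new ArrayDeque<>());
    }
  }

  // Buffers a message; the receiver cannot see it before the next sync().
  public void send(String receiver, String message) {
    outgoing.get(receiver).add(message);
  }

  // Barrier: moves every buffered message into its receiver's inbox.
  public void sync() {
    for (Map.Entry<String, List<String>> entry : outgoing.entrySet()) {
      inbox.get(entry.getKey()).addAll(entry.getValue());
      entry.getValue().clear();
    }
  }

  // Returns the next delivered message for the peer, or null if none is left.
  public String getCurrentMessage(String peer) {
    return inbox.get(peer).poll();
  }

  public static void main(String[] args) {
    List<String> peers = Arrays.asList("peer0", "peer1", "peer2");
    SuperstepSketch net = new SuperstepSketch(peers);

    // Every peer greets every peer, itself included.
    for (String sender : peers) {
      for (String receiver : peers) {
        net.send(receiver, "Hello from " + sender);
      }
    }

    // Nothing has been delivered yet, so this prints "null".
    System.out.println(net.getCurrentMessage("peer1"));

    net.sync();

    // After the barrier, peer1 can read one greeting per peer.
    String message;
    while ((message = net.getCurrentMessage("peer1")) != null) {
      System.out.println(message);
    }
  }
}
```

The model is single-threaded and leaves out anything the text does not describe, such as networking or what happens to unread messages at the next barrier.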

...

No Format
    @Override
    public void bsp(
        BSPPeer<NullWritable, NullWritable, Text, DoubleWritable, Writable> peer)
        throws IOException, SyncException, InterruptedException {

      for (int i = 0; i < 100; i++) {
        // send some messages
        peer.sync();
      }

    }

...

No Format
    // enum definition
    enum LoopCounter{
      LOOPS
    }

    @Override
    public void bsp(
        BSPPeer<NullWritable, NullWritable, Text, DoubleWritable, DoubleWritable> peer)
        throws IOException, SyncException, InterruptedException {
      for (int i = 0; i < iterations; i++) {
        // details omitted
        peer.getCounter(LoopCounter.LOOPS).increment(1L);
      }
      // rest omitted
    }

In 0.4.0, counters cannot be used for flow control because they are not synchronized during the sync phase. See HAMA-515 for details.

Setup and Cleanup

Since 0.4.0 you can use setup and cleanup methods in your BSP code. Both are inherited from the BSP class and can be overridden like this:

...

No Format
    public void bsp(
        BSPPeer<NullWritable, NullWritable, NullWritable, NullWritable, IntegerMessage> peer)
        throws IOException, SyncException, InterruptedException {

      for (int i = 0; i < 1000; i++) {
        peer.send(masterTask, new IntegerMessage(peer.getPeerName(), i));
      }
      peer.sync();

      if (peer.getPeerName().equals(masterTask)) {
        IntegerMessage received;
        while ((received = (IntegerMessage) peer.getCurrentMessage()) != null) {
          sum += received.getData();
        }
      }
    }
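
As a sanity check for the example above, the value the master task should end up with can be worked out offline. The sketch below is plain Java with no Hama dependency. The class `MasterSumSketch` and its `simulate` method are hypothetical names, and it assumes that every peer, the master included, sends the integers 0 to 999 and that all of them arrive at the barrier.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical offline model of the "send to master, then sum" pattern; NOT Hama code.
public class MasterSumSketch {

  // Simulates `peers` tasks that each send 0..messagesPerPeer-1 to one master task
  // and returns the sum the master computes after the barrier.
  public static long simulate(int peers, int messagesPerPeer) {
    List<Integer> inFlight = new ArrayList<>();    // sent, but not yet delivered
    List<Integer> masterInbox = new ArrayList<>(); // readable by the master

    // Superstep 0: every peer sends its values to the master.
    for (int p = 0; p < peers; p++) {
      for (int i = 0; i < messagesPerPeer; i++) {
        inFlight.add(i);
      }
    }

    // Barrier: everything that was sent is delivered.
    masterInbox.addAll(inFlight);
    inFlight.clear();

    // Superstep 1: the master drains its inbox and accumulates the sum.
    long sum = 0;
    for (int value : masterInbox) {
      sum += value;
    }
    return sum;
  }

  public static void main(String[] args) {
    // Each peer contributes 0 + 1 + ... + 999 = 499500, so three peers give 1498500.
    System.out.println(simulate(3, 1000)); // prints 1498500
  }
}
```

In general the expected total is the number of peers times n(n-1)/2 for n messages per peer, which is a convenient value to compare against when testing a real job.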

...