Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.
Comment: Add explanation of Serialize Printing Of Hello BSP

...

Each BSP task of the HAMA cluster, will print the LOG string "Hello BSP" in serial order. This example will help you to understand the concepts of the BSP computing model.

  • Each task gets its own hostname (hostname:port pair) and a sorted list containing the hostnames of all the other peershostname in HAMA cluster.
  • Each task prints print the LOG string "Hello BSP" only when its turn comes at intervals of 5 seconds.

BSP implementation of Serialize Printing of "Hello BSP"

No Format
public class publicSerializePrinting class{
 ClassSerializePrinting extends
  public static BSP<NullWritable,class NullWritable,HelloBSP IntWritable,extends Text>BSP {

    public static final intLog NUM_SUPERSTEPSLOG = 15 LogFactory.getLog(HelloBSP.class);
    private Configuration conf;

    @Override
    public void bsp(BSPPeer<NullWritable, NullWritableBSPPeer bspPeer) throws IOException, IntWritableKeeperException, Text> bspPeer)
      throws IOException, SyncException, InterruptedException {

     for (int inum = 0; i < NUM_SUPERSTEPS; i++) {Integer.parseInt(conf.get("bsp.peers.num"));

      int i = 0;
      for (String otherPeer(Map.Entry<String, String> e : bspPeer.getAllPeers().getAllPeerNamesentrySet()) {
        if(bspPeer.send(otherPeer, new IntegerMessage(bspPeer.getPeerName(), i.getHostName().equals(e.getValue())) {
          LOG.info("Hello BSP from " + i + " of " + num + ": "
              + bspPeer.getHostName());
        }
        
        Thread.sleep(200);
        bspPeer.sync();
        i++;
      }
    }

    @Override
    public Configuration getConf() {
    IntegerMessage msg =return nullconf;
    }

    @Override
   while ((msgpublic =void setConf(IntegerMessage) bspPeer.getCurrentMessage()) != null) {
  Configuration conf) {
      this.conf = conf;
    }

  }

  public static void main(String[] args) throws InterruptedException,
      IOException {
    // BSP job configuration
    HamaConfiguration  bspPeer.write(conf = new IntWritable(msg.getData()), new Text(msg.getTag()));
      }
    }HamaConfiguration();
    // Execute locally
    // conf.set("bsp.master.address", "local");

    BSPJob bsp = new BSPJob(conf, SerializePrinting.class);
    // Set the job name
    bsp.setJobName("serialize printing");
    bsp.setBspClass(HelloBSP.class);

    BSPJobClient.runJob(bsp);
  }
}