Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.
Comment: updated to new api

...

Each BSP task of the HAMA cluster will print the LOG string "Hello BSP" in serial order. This example will help you to understand the concepts of the BSP computing model.

  • Each task gets its own hostname (hostname:port pair) and a sorted list containing the hostnames of all the other peers.
  • Each task prints the LOG string "Hello BSP" only when its turn comes, at intervals of 5 seconds.

See the diagram below if this is unclear:

Image Removed

BSP implementation of Serialize Printing of "Hello BSP"

No Format
  public class SerializePrintingClassSerializePrinting {extends
  
  publicBSP<NullWritable, staticNullWritable, classIntWritable, HelloBSPText> extends BSP {

    public static final Logint LOGNUM_SUPERSTEPS = LogFactory.getLog(HelloBSP.class);
    private Configuration conf15;

    @Override
    public void bsp(BSPPeer bspPeer) throws IOException, KeeperException,BSPPeer<NullWritable, NullWritable, IntWritable, Text> bspPeer)
      throws IOException, SyncException, InterruptedException {

    for  (int numi = Integer.parseInt(conf.get("bsp.peers.num"));

      int i = 0;0; i < NUM_SUPERSTEPS; i++) {
      for (Map.Entry<String, String> eString otherPeer : bspPeer.getAllPeersgetAllPeerNames().entrySet()) {
        if(bspPeer.getHostName().equals(e.getValue())) {
          LOG.info("Hello BSP from " + i + " of " + num + ": "
              + bspPeer.getHostName(send(otherPeer, new IntegerMessage(bspPeer.getPeerName(), i));
        }
        
        Thread.sleep(200);
        bspPeer.sync();
      IntegerMessage  i++msg = null;
      }
while    }

    @Override
    public Configuration getConf() {
      return conf;
    }

    @Override
    public void setConf(Configuration conf) {
      this.conf = conf;
    }

  }

  public static void main(String[] args) throws InterruptedException,
      IOException {
    // BSP job configuration
    HamaConfiguration conf = new HamaConfiguration();
    // Execute locally
    // conf.set("bsp.master.address", "local");

    BSPJob bsp = new BSPJob(conf, SerializePrinting.class);
    // Set the job name
    bsp.setJobName("serialize printing");
    bsp.setBspClass(HelloBSP.class);

    BSPJobClient.runJob(bsp);((msg = (IntegerMessage) bspPeer.getCurrentMessage()) != null) {
        bspPeer.write(new IntWritable(msg.getData()), new Text(msg.getTag()));
      }
    }
  }
}