You are viewing an old version of this page. View the current version.

Compare with Current View Page History

« Previous Version 53 Next »

Serialize Printing of "Hello BSP"

Each BSP task of the Hama cluster prints the string "Hello BSP" in serial order. This example will help you understand the concepts of the BSP computing model.

  • Each task gets its own peer name (a hostname:port pair) and a sorted list containing the peer names of all peers, including itself.
  • Each task prints the string "Hello BSP" only when its turn comes; consecutive turns are 5 seconds apart.

BSP implementation of Serialize Printing of "Hello BSP"

/**
 * Example BSP job in which every task emits a "Hello BSP" message in turn.
 *
 * <p>Each task writes its message to its own sequence file under
 * {@link #TMP_OUTPUT}; after the job finishes, {@link #main(String[])} reads
 * the files back in index order and prints them to standard output.
 */
public class SerializePrinting {
  /** Directory holding one sequence file per task, named by the task's index. */
  private static final String TMP_OUTPUT = "/tmp/test-example/";

  /**
   * BSP task that walks the list of all peer names and writes its message
   * in the superstep whose index matches its own position in that list.
   */
  public static class HelloBSP extends BSP {
    public static final Log LOG = LogFactory.getLog(HelloBSP.class);
    private Configuration conf;
    /** Pause, in milliseconds, before each barrier synchronization. */
    private final static int PRINT_INTERVAL = 5000;

    /**
     * Runs one superstep per peer. In the superstep belonging to this task
     * a single (timestamp, message) record is written to
     * {@code TMP_OUTPUT + index}; every task then sleeps and synchronizes.
     *
     * @param bspPeer handle used to query peer names and to synchronize
     * @throws IOException if the output file cannot be created or written
     * @throws KeeperException declared for the synchronization call
     * @throws InterruptedException if the sleep or the barrier is interrupted
     */
    public void bsp(BSPPeerProtocol bspPeer) throws IOException,
        KeeperException, InterruptedException {
      int num = Integer.parseInt(conf.get("bsp.peers.num"));
      FileSystem fileSys = FileSystem.get(conf);

      int i = 0;
      for (String otherPeer : bspPeer.getAllPeerNames()) {
        if (bspPeer.getPeerName().equals(otherPeer)) {

          SequenceFile.Writer writer = SequenceFile.createWriter(fileSys, conf,
              new Path(TMP_OUTPUT + i), LongWritable.class, Text.class,
              CompressionType.NONE);
          try {
            writer.append(new LongWritable(System.currentTimeMillis()),
                new Text("Hello BSP from " + (i + 1) + " of " + num + ": "
                    + bspPeer.getPeerName()));
          } finally {
            // Release the file handle even when the append fails.
            writer.close();
          }

        }

        // All tasks wait and meet at the barrier, whether or not they wrote.
        Thread.sleep(PRINT_INTERVAL);
        bspPeer.sync();
        i++;
      }
    }

    /** @return the configuration previously supplied through {@link #setConf} */
    public Configuration getConf() {
      return conf;
    }

    /** @param conf configuration to be used by {@link #bsp} */
    public void setConf(Configuration conf) {
      this.conf = conf;
    }

  }

  /**
   * Configures and runs the job with one task per groom server, then prints
   * the message written by each task.
   *
   * @param args unused
   * @throws InterruptedException declared by the original entry point
   * @throws IOException if the file system or an output file cannot be accessed
   */
  public static void main(String[] args) throws InterruptedException,
      IOException {
    // BSP job configuration
    HamaConfiguration conf = new HamaConfiguration();

    BSPJob bsp = new BSPJob(conf, SerializePrinting.class);
    // Set the job name
    bsp.setJobName("serialize printing");
    bsp.setBspClass(HelloBSP.class);

    // Set the task size as a number of GroomServer
    BSPJobClient jobClient = new BSPJobClient(conf);
    ClusterStatus cluster = jobClient.getClusterStatus(false);
    // Read the count once so the job size and the number of files read agree.
    int numTasks = cluster.getGroomServers();
    bsp.setNumBspTask(numTasks);

    // Start from a clean output directory so stale files are never printed.
    FileSystem fileSys = FileSystem.get(conf);
    Path outputDir = new Path(TMP_OUTPUT);
    if (fileSys.exists(outputDir)) {
      fileSys.delete(outputDir, true);
    }
    BSPJobClient.runJob(bsp);

    System.out.println("Each task printed the \"Hello BSP\" as below:");
    for (int i = 0; i < numTasks; i++) {
      SequenceFile.Reader reader = new SequenceFile.Reader(fileSys, new Path(
          TMP_OUTPUT + i), conf);
      try {
        LongWritable timestamp = new LongWritable();
        Text message = new Text();
        // Print only when a record was actually read; an empty file would
        // otherwise show up as an epoch date with an empty message.
        if (reader.next(timestamp, message)) {
          System.out.println(new Date(timestamp.get()) + ": " + message);
        }
      } finally {
        // Release the file handle even when reading fails.
        reader.close();
      }
    }
  }
}
  • No labels