Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

  • Each task gets its own hostname (hostname:port pair) and a sorted list containing the hostnames of all the other peers.
  • Each task prints the LOG string "Hello BSP" only when its turn comes at intervals of 5 seconds.

...

No Format
public class SerializePrinting {
  private static String TMP_OUTPUT = "/tmp/test-example/";

  public static class HelloBSP extends BSP {
    public static final Log LOG = LogFactory.getLog(HelloBSP.class);
    private Configuration conf;
    private final static int PRINT_INTERVAL = 5000;

    @Override
    public void bsp(BSPPeerBSPPeerProtocol bspPeer) throws IOException,
 KeeperException,
       KeeperException, InterruptedException {
      int num = Integer.parseInt(conf.get("bsp.peers.num"));
      FileSystem fileSys = FileSystem.get(conf);

      int i = 0;
      for (String otherPeer : bspPeer.getAllPeerNames()) {
        if (bspPeer.getPeerName().equals(otherPeer)) {

          SequenceFile.Writer writer = SequenceFile.createWriter(fileSys, conf,
              new Path(TMP_OUTPUT + i), LongWritable.class, Text.class,
    LOG.info(          CompressionType.NONE);
          writer.append(new LongWritable(System.currentTimeMillis()), new Text(
              "Hello BSP from " + (i + 1) + " of " + num + ": "
                  + bspPeer.getPeerName()));
          }writer.close();

        }

        Thread.sleep(PRINT_INTERVAL);
        bspPeer.sync();
        i++;
      }
    }

    @Override
    public Configuration getConf() {
      return conf;
    }

    @Override
    public void setConf(Configuration conf) {
      this.conf = conf;
    }

  }

  public static void main(String[] args) throws InterruptedException,
      IOException {
    // BSP job configuration
    HamaConfiguration conf = new HamaConfiguration();

    BSPJob bsp = new BSPJob(conf, SerializePrinting.class);
    // Execute locally Set the job name
    bsp.setJobName("serialize printing");
    bsp.setBspClass(HelloBSP.class);

    // Set the task size as a number of GroomServer
    BSPJobClient jobClient = new BSPJobClient(conf);
    ClusterStatus cluster = jobClient.getClusterStatus(false);
    bsp.setNumBspTask(cluster.getGroomServers());

    FileSystem fileSys = FileSystem.get(conf.set("bsp.master.address", "local");
);
    if (fileSys.exists(new Path(TMP_OUTPUT))) {
      fileSys.delete(new Path(TMP_OUTPUT), true);
    }
    BSPJobClient.runJob(bsp);

    System.out.println("Each task printed the \"Hello World\" as below:");
    for (int i = 0; i < cluster.getGroomServers(); i++) {
    BSPJob bsp =  SequenceFile.Reader reader = new SequenceFile.Reader(fileSys, new BSPJob(conf, SerializePrinting.class);
Path(
          TMP_OUTPUT + i), conf);
      LongWritable timestamp = new LongWritable();
     // Text Setmessage the= job namenew Text();
    bsp  reader.setJobNamenext("serialize printing"timestamp, message);
    bsp.setBspClass(HelloBSP.class  System.out.println(new Date(timestamp.get()) + ": " + message);

      BSPJobClientreader.runJobclose(bsp);
    }
  }
}