...
- Each task gets its own hostname (hostname:port pair) and a sorted list containing the hostnames of all the other peers.
- Each task prints the LOG string "Hello BSP" only when its turn comes; the turns occur at 5-second intervals.
...
No Format |
---|
public class SerializePrinting { private static String TMP_OUTPUT = "/tmp/test-example/"; public static class HelloBSP extends BSP { public static final Log LOG = LogFactory.getLog(HelloBSP.class); private Configuration conf; private final static int PRINT_INTERVAL = 5000; @Override public void bsp(BSPPeerBSPPeerProtocol bspPeer) throws IOException, KeeperException, KeeperException, InterruptedException { int num = Integer.parseInt(conf.get("bsp.peers.num")); FileSystem fileSys = FileSystem.get(conf); int i = 0; for (String otherPeer : bspPeer.getAllPeerNames()) { if (bspPeer.getPeerName().equals(otherPeer)) { SequenceFile.Writer writer = SequenceFile.createWriter(fileSys, conf, new Path(TMP_OUTPUT + i), LongWritable.class, Text.class, LOG.info( CompressionType.NONE); writer.append(new LongWritable(System.currentTimeMillis()), new Text( "Hello BSP from " + (i + 1) + " of " + num + ": " + bspPeer.getPeerName())); }writer.close(); } Thread.sleep(PRINT_INTERVAL); bspPeer.sync(); i++; } } @Override public Configuration getConf() { return conf; } @Override public void setConf(Configuration conf) { this.conf = conf; } } public static void main(String[] args) throws InterruptedException, IOException { // BSP job configuration HamaConfiguration conf = new HamaConfiguration(); BSPJob bsp = new BSPJob(conf, SerializePrinting.class); // Execute locally Set the job name bsp.setJobName("serialize printing"); bsp.setBspClass(HelloBSP.class); // Set the task size as a number of GroomServer BSPJobClient jobClient = new BSPJobClient(conf); ClusterStatus cluster = jobClient.getClusterStatus(false); bsp.setNumBspTask(cluster.getGroomServers()); FileSystem fileSys = FileSystem.get(conf.set("bsp.master.address", "local"); ); if (fileSys.exists(new Path(TMP_OUTPUT))) { fileSys.delete(new Path(TMP_OUTPUT), true); } BSPJobClient.runJob(bsp); System.out.println("Each task printed the \"Hello World\" as below:"); for (int i = 0; i < cluster.getGroomServers(); i++) { BSPJob bsp = 
SequenceFile.Reader reader = new SequenceFile.Reader(fileSys, new BSPJob(conf, SerializePrinting.class); Path( TMP_OUTPUT + i), conf); LongWritable timestamp = new LongWritable(); // Text Setmessage the= job namenew Text(); bsp reader.setJobNamenext("serialize printing"timestamp, message); bsp.setBspClass(HelloBSP.class System.out.println(new Date(timestamp.get()) + ": " + message); BSPJobClientreader.runJobclose(bsp); } } } |