...
- Each task gets its own hostname (hostname:port pair) and a sorted list containing the hostnames of all the other peers.
- Each task prints the LOG string "Hello BSP" only when its turn comes
Sequence Diagram
...
- at intervals of 5 seconds.
BSP implementation of Serialize Printing of "Hello BSP"
- See the BSP programming model of HAMA if you didn't read yet.
No Format |
---|
public class SerializePrintingClassSerializePrinting {extends publicBSP<NullWritable, staticNullWritable, class HelloBSP extends BSPIntWritable, Text> { public static final Logint LOGNUM_SUPERSTEPS = LogFactory.getLog(HelloBSP.class); private Configuration conf15; @Override public void bsp(BSPPeer bspPeer) throws IOException, KeeperException, BSPPeer<NullWritable, NullWritable, IntWritable, Text> bspPeer) throws IOException, SyncException, InterruptedException { for (int numi = Integer.parseInt(conf.get("bsp.peers.num")); int i = 0;0; i < NUM_SUPERSTEPS; i++) { for (String otherPeer : bspPeer.getAllPeerNames()) { if (bspPeer.getPeerName().equalssend(otherPeer)), { LOG.info("Hello BSP from " + i + " of " + num + ": " + new IntegerMessage(bspPeer.getPeerName(), i)); } Thread.sleep(200); bspPeer.sync(); IntegerMessage msg = i++null; } while } @Override public Configuration getConf() { return conf; } @Override public void setConf(Configuration conf) { this.conf = conf; } } public static void main(String[] args) throws InterruptedException, IOException((msg = (IntegerMessage) bspPeer.getCurrentMessage()) != null) { // BSP job configuration HamaConfiguration conf = new HamaConfiguration(); // Execute locally // conf.set("bsp.master.address", "local"); BSPJob bsp = new BSPJob(conf, SerializePrinting.class); // Set the job name bsp.setJobName("serialize printing"); bsp.setBspClass(HelloBSP.class); BSPJobClient.runJob(bsp); bspPeer.write(new IntWritable(msg.getData()), new Text(msg.getTag())); } } } } |