...
- Each task gets its own hostname (hostname:port pair) and a sorted list containing the hostnames of all the other peers.
- Each task prints the LOG string "Hello BSP" only when its turn comes
Sequence Diagram
- at intervals of 5 seconds.
BSP implementation of Serialize Printing of "Hello BSP"
- See the BSP programming model of HAMA if you didn't read yet.
No Format |
---|
public class SerializePrintingClassSerializePrinting { extends public static classBSP<NullWritable, HelloBSPNullWritable, extendsIntWritable, BSPText> { public static final Logint LOGNUM_SUPERSTEPS = LogFactory.getLog(HelloBSP.class); private Configuration conf15; @Override public void bsp(BSPPeer bspPeer) throws IOException, KeeperException,BSPPeer<NullWritable, NullWritable, IntWritable, Text> bspPeer) throws IOException, SyncException, InterruptedException { for (int numi = Integer.parseInt(conf.get("bsp.peers.num")); int i = 0;0; i < NUM_SUPERSTEPS; i++) { for (Map.Entry<String, String> eString otherPeer : bspPeer.getAllPeersgetAllPeerNames().entrySet()) { if(bspPeer.getHostName().equals(e.getValue())) { LOG.info("Hello BSP from " + i + " of " + num + ": " + bspPeer.getHostName()); send(otherPeer, new IntegerMessage(bspPeer.getPeerName(), i)); } Thread.sleep(200); bspPeer.sync(); IntegerMessage msg = i++null; } while } @Override public Configuration getConf() { return conf; } @Override public void setConf(Configuration conf) { this.conf = conf; } } public static void main(String[] args) throws InterruptedException, IOException((msg = (IntegerMessage) bspPeer.getCurrentMessage()) != null) { // BSP job configuration HamaConfiguration conf = new HamaConfiguration(); // Execute locally // conf.set("bsp.master.address", "local"); BSPJob bsp = new BSPJob(conf, SerializePrinting.class bspPeer.write(new IntWritable(msg.getData()), new Text(msg.getTag())); // Set the job name} bsp.setJobName("serialize printing"); bsp.setBspClass(HelloBSP.class); BSPJobClient.runJob(bsp);} } } |