...
Each BSP task of the HAMA cluster will print the LOG string "Hello BSP" in serial order. This example will help you understand the concepts of the BSP computing model.
- Each task gets its own hostname (hostname:port pair) and a sorted list containing the hostnames of all the other peers.
- Each task prints the LOG string "Hello BSP" only when its turn comes, at intervals of 5 seconds.
See the diagram below if this is unclear:
BSP implementation of Serialize Printing of "Hello BSP"
- See the BSP programming model of HAMA if you haven't read it yet.
No Format |
---|
public class SerializePrintingClassSerializePrinting {extends publicBSP<NullWritable, staticNullWritable, classIntWritable, HelloBSPText> extends BSP { public static final Logint LOGNUM_SUPERSTEPS = LogFactory.getLog(HelloBSP.class); private Configuration conf15; @Override public void bsp(BSPPeer bspPeer) throws IOException, KeeperException,BSPPeer<NullWritable, NullWritable, IntWritable, Text> bspPeer) throws IOException, SyncException, InterruptedException { for (int numi = Integer.parseInt(conf.get("bsp.peers.num")); int i = 0;0; i < NUM_SUPERSTEPS; i++) { for (Map.Entry<String, String> eString otherPeer : bspPeer.getAllPeersgetAllPeerNames().entrySet()) { if(bspPeer.getHostName().equals(e.getValue())) { LOG.info("Hello BSP from " + i + " of " + num + ": " + bspPeer.getHostName(send(otherPeer, new IntegerMessage(bspPeer.getPeerName(), i)); } Thread.sleep(200); bspPeer.sync(); IntegerMessage i++msg = null; } while } @Override public Configuration getConf() { return conf; } @Override public void setConf(Configuration conf) { this.conf = conf; } } public static void main(String[] args) throws InterruptedException, IOException { // BSP job configuration HamaConfiguration conf = new HamaConfiguration(); // Execute locally // conf.set("bsp.master.address", "local"); BSPJob bsp = new BSPJob(conf, SerializePrinting.class); // Set the job name bsp.setJobName("serialize printing"); bsp.setBspClass(HelloBSP.class); BSPJobClient.runJob(bsp);((msg = (IntegerMessage) bspPeer.getCurrentMessage()) != null) { bspPeer.write(new IntWritable(msg.getData()), new Text(msg.getTag())); } } } } |