...
No Format |
---|
public class SerializePrinting {
public static class HelloBSP extends BSP {
public static final Log LOG = LogFactory.getLog(HelloBSP.class);
private Configuration conf;
@Override
public void bsp(BSPPeer bspPeer) throws IOException, KeeperException,
InterruptedException {
int num = Integer.parseInt(conf.get("bsp.peers.num"));
int i = 0;
for(Map.Entry<String, String> e : bspPeer.getAllPeers().entrySet()) {
if(bspPeer.getHostName().equals(e.getValue())) {
LOG.info("Hello BSP from " + i + " of " + num + ": "
+ bspPeer.getHostName());
}
Thread.sleep(200);
bspPeer.sync();
i++;
}
}
@Override
public Configuration getConf() {
return conf;
}
@Override
public void setConf(Configuration conf) {
this.conf = conf;
}
}
public static void main(String[] args) throws InterruptedException,
IOException {
// BSP job configuration
HamaConfiguration conf = new HamaConfiguration();
// Execute locally
// conf.set("bsp.master.address", "local");
BSPJob bsp = new BSPJob(conf, SerializePrinting.class);
// Set the job name
bsp.setJobName("serialize printing");
bsp.setBspClass(HelloBSP.class);
BSPJobClient.runJob(bsp);
}
}
|