...
- See the BSP programming model of HAMA if you didn't read yet.
No Format |
---|
public class SerializePrinting { private static String TMP_OUTPUT = "/tmp/test-example/"; public static class HelloBSP extends BSP { public static final Log LOG = LogFactory.getLog(HelloBSP.class); private Configuration conf; private final static int PRINT_INTERVAL = 5000; public void bsp(BSPPeerProtocol bspPeer) throws IOException, KeeperException, InterruptedException { int num = Integer.parseInt(conf.get("bsp.peers.num")); FileSystem fileSys = FileSystem.get(conf); int i = 0; for (String otherPeer : bspPeer.getAllPeerNames()) { if (bspPeer.getPeerName().equals(otherPeer)) { SequenceFile.Writer writerString peerName = SequenceFilebspPeer.createWriter(fileSys, conf, new Path(TMP_OUTPUT + i), LongWritable.class, Text.class, CompressionType.NONEgetPeerName(); if writer.append(new LongWritable(System.currentTimeMillis()), new Text((peerName.equals(otherPeer)) { "Hello BSP from " + (i + 1) + " of " + num + ": " + bspPeer.getPeerName())); writer.close(); writeLogToFile(peerName, i); } Thread.sleep(PRINT_INTERVAL); bspPeer.sync(); i++; } } public Configuration getConf() { return conf; } public void setConf(Configuration conf) { this.conf = conf; } } public static void main(String[] args) throws InterruptedException, IOException { // BSP job configuration HamaConfiguration conf = new HamaConfiguration(); BSPJob bsp = new BSPJob(conf, SerializePrinting.class); // Set the job name bsp.setJobName("serialize printing"); bsp.setBspClass(HelloBSP.class); // Set the task size as a number of GroomServer BSPJobClient jobClient = new BSPJobClient(conf); ClusterStatus cluster = jobClient.getClusterStatus(false); bsp.setNumBspTask(cluster.getGroomServers()); FileSystem fileSys = FileSystem.get(conf); if (fileSys.exists(new Path(TMP_OUTPUT))) { fileSys.delete(new Path(TMP_OUTPUT), true); } BSPJobClient.runJob(bsp); System.out.println("Each task printed the \"Hello World\" as below:"); for (int i = 0; i < cluster.getGroomServers(); i++) { SequenceFile.Reader reader = new SequenceFile.Reader(fileSys, new Path( TMP_OUTPUT + i), conf); LongWritable timestamp = new LongWritable(); Text message = new Text(); reader.next(timestamp, message); System.out.println(new Date(timestamp.get()) + ": " + message); reader.close(); } } } |