...
No Format |
---|
public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
HamaConfiguration conf = new HamaConfiguration();
BSPJob job = new BSPJob(conf);
// set the BSP class which shall be executed
job.setBspClass(RealTime.class);
// help Hama to locale the jar to be distributed
job.setJarByClass(RealTime.class);
// give it a name
job.setJobName("Producer Consumer Test");
// use 4 tasks
job.setNumBspTask(4);
// output path must be defined
job.setOutputPath(new Path("/tmp/realtime-test"));
job.setOutputFormat(NullOutputFormat.class);
// submit the job to the localrunner and wait for its completion, while outputting logs
job.waitForCompletion(true);
}
|
...
No Format |
---|
@Override public void bsp( BSPPeer<NullWritable, NullWritable, NullWritable, NullWritable, IntWritable> peer) throws IOException, SyncException, InterruptedException { if (isMaster) { Random random = new Random(); // produces new random integers every second while (true) { int newInt = random.nextInt(); // just send to other not master peers peer.send(peer.getAllPeerNames()[Math.abs(newInt % (peer.getNumPeers()-1))+1], new IntWritable(newInt)); System.out.println("Sending " + newInt); peer.sync(); Thread.sleep(1000); } } else { // if I'm not the master, then I am one of the slaves! while (true) { peer.sync(); IntWritable msg = null; while ((msg = peer.getCurrentMessage()) != null) { System.out.println(msg.get() + " received!"); } } } } |
...