Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

No Format
 /**
  * Entry point: builds the "Producer Consumer Test" BSP job around
  * {@code RealTime}, submits it, and blocks until the job has finished.
  *
  * @param args command-line arguments; not read by this method
  * @throws IOException propagated unchanged from the job setup/submission calls
  * @throws InterruptedException propagated unchanged from the job setup/submission calls
  * @throws ClassNotFoundException propagated unchanged from the job setup/submission calls
  */
 public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
   final BSPJob realTimeJob = new BSPJob(new HamaConfiguration());

   // RealTime is both the BSP implementation to execute and the class
   // Hama uses to locate the jar that has to be distributed.
   final Class<RealTime> bspClass = RealTime.class;
   realTimeJob.setBspClass(bspClass);
   realTimeJob.setJarByClass(bspClass);

   realTimeJob.setJobName("Producer Consumer Test");

   // Run with four BSP tasks.
   realTimeJob.setNumBspTask(4);

   // An output path has to be defined even though NullOutputFormat is used.
   final Path outputLocation = new Path("/tmp/realtime-test");
   realTimeJob.setOutputPath(outputLocation);
   realTimeJob.setOutputFormat(NullOutputFormat.class);

   // Submit to the local runner and wait for completion, printing logs meanwhile.
   realTimeJob.waitForCompletion(true);
 }

...

No Format
    /**
     * Runs the producer/consumer loop for this peer; neither branch returns
     * normally.
     *
     * <p>When {@code isMaster} is set, this peer produces one random integer
     * per second, sends it to one of the peers at index 1 or higher of
     * {@code peer.getAllPeerNames()}, and synchronizes. Otherwise the peer
     * synchronizes and prints every message it received in that superstep.
     *
     * <p>NOTE(review): target selection skips index 0, which presumably is the
     * master's own entry in {@code getAllPeerNames()} - confirm against the
     * code that assigns {@code isMaster}.
     *
     * @param peer the BSP peer used for sending, receiving and synchronizing
     * @throws IOException propagated from the peer's messaging calls
     * @throws SyncException propagated from {@code peer.sync()}
     * @throws InterruptedException if the one-second pause (or a peer call) is interrupted
     * @throws IllegalStateException if this peer is the master and the job has
     *         fewer than two peers, i.e. there is nobody to send to
     */
    @Override
    public void bsp(
            BSPPeer<NullWritable, NullWritable, NullWritable, NullWritable, IntWritable> peer)
            throws IOException, SyncException, InterruptedException {

        if (isMaster) {
            Random random = new Random();
            // produces a new random integer every second
            while (true) {
                // Index 0 is never a target, so at least one further peer must
                // exist. Without this check the remainder below would divide
                // by zero and fail with an opaque ArithmeticException.
                int consumerCount = peer.getNumPeers() - 1;
                if (consumerCount < 1) {
                    throw new IllegalStateException(
                            "Producer needs at least one other peer to send to, but the job has "
                                    + peer.getNumPeers() + " peer(s)");
                }

                int newInt = random.nextInt();
                // |newInt % consumerCount| lies in [0, consumerCount); the +1
                // shifts it to [1, consumerCount], i.e. every index except 0.
                int targetIndex = Math.abs(newInt % consumerCount) + 1;
                peer.send(peer.getAllPeerNames()[targetIndex], new IntWritable(newInt));
                System.out.println("Sending " + newInt);
                peer.sync();
                Thread.sleep(1000);
            }
        } else {
            // Not the master: act as a consumer.
            while (true) {
                // Wait for the superstep barrier, then drain what was delivered.
                peer.sync();
                IntWritable msg = null;
                while ((msg = peer.getCurrentMessage()) != null) {
                    System.out.println(msg.get() + " received!");
                }
            }
        }
    }

...