Differences between revisions 8 and 9
Revision 8 as of 2011-02-08 13:11:59
Size: 3854
Editor: edwardyoon
Comment:
Revision 9 as of 2011-02-15 05:56:05
Size: 2182
Editor: edwardyoon
Comment:
Deletions are marked like this. Additions are marked like this.
Line 36: Line 36:
public class SerializePrinting {
  private static String TMP_OUTPUT = "/tmp/test-example/";

  public static class HelloBSP extends BSP {
    public static final Log LOG = LogFactory.getLog(HelloBSP.class);
    private Configuration conf;
    private final static int PRINT_INTERVAL = 5000;
Line 46: Line 38:
      int num = Integer.parseInt(conf.get("bsp.peers.num"));
      FileSystem fileSys = FileSystem.get(conf);
      int in = 0, out = 0;
      for (int i = 0; i < iterations; i++) {
        double x = 2.0 * Math.random() - 1.0, y = 2.0 * Math.random() - 1.0;
        if ((Math.sqrt(x * x + y * y) < 1.0)) {
          in++;
        } else {
          out++;
        }
      }
Line 49: Line 48:
      int i = 0;
      for (String otherPeer : bspPeer.getAllPeerNames()) {
        if (bspPeer.getPeerName().equals(otherPeer)) {
          LOG.info("Hello BSP from " + (i + 1) + " of " + num + ": "
              + bspPeer.getPeerName());
      byte[] tagName = Bytes.toBytes(bspPeer.getPeerName());
      byte[] myData = Bytes.toBytes(4.0 * (double) in / (double) iterations);
      BSPMessage estimate = new BSPMessage(tagName, myData);
Line 55: Line 52:
          SequenceFile.Writer writer = SequenceFile.createWriter(fileSys, conf,
              new Path(TMP_OUTPUT + i), LongWritable.class, Text.class,
              CompressionType.NONE);
          writer.append(new LongWritable(System.currentTimeMillis()), new Text(
              "Hello BSP from " + (i + 1) + " of " + num + ": "
                  + bspPeer.getPeerName()));
          writer.close();
      bspPeer.send(masterTask, estimate);
      bspPeer.sync();
Line 63: Line 55:
        }       double pi = 0.0;
      int numPeers = bspPeer.getNumCurrentMessages();
      BSPMessage received;
      while ((received = bspPeer.getCurrentMessage()) != null) {
        pi += Bytes.toDouble(received.getData());
      }
Line 65: Line 62:
        Thread.sleep(PRINT_INTERVAL);
        bspPeer.sync();
        i++;
      if (bspPeer.getPeerName().equals(masterTask)) {
        pi = pi / numPeers;
        writeResult(pi);
Line 70: Line 67:

    public Configuration getConf() {
      return conf;
    }

    public void setConf(Configuration conf) {
      this.conf = conf;
    }

  }

  public static void main(String[] args) throws InterruptedException,
      IOException {
    // BSP job configuration
    HamaConfiguration conf = new HamaConfiguration();

    BSPJob bsp = new BSPJob(conf, SerializePrinting.class);
    // Set the job name
    bsp.setJobName("serialize printing");
    bsp.setBspClass(HelloBSP.class);

    // Set the task size as a number of GroomServer
    BSPJobClient jobClient = new BSPJobClient(conf);
    ClusterStatus cluster = jobClient.getClusterStatus(false);
    bsp.setNumBspTask(cluster.getGroomServers());

    FileSystem fileSys = FileSystem.get(conf);
    if (fileSys.exists(new Path(TMP_OUTPUT))) {
      fileSys.delete(new Path(TMP_OUTPUT), true);
    }
    BSPJobClient.runJob(bsp);

    System.out.println("Each task printed the \"Hello World\" as below:");
    for (int i = 0; i < cluster.getGroomServers(); i++) {
      SequenceFile.Reader reader = new SequenceFile.Reader(fileSys, new Path(
          TMP_OUTPUT + i), conf);
      LongWritable timestamp = new LongWritable();
      Text message = new Text();
      reader.next(timestamp, message);
      System.out.println(new Date(timestamp.get()) + ": " + message);
      reader.close();
    }
  }
}

Pi Estimator

The value of PI can be calculated in a number of ways. Consider the following method of estimating PI:

  • Inscribe a circle in a square
  • Randomly generate points in the square
  • Determine the number of points in the square that are also in the circle
  • Let r be the number of points in the circle divided by the number of points in the square
  • PI ~ 4 r

Serial pseudocode for this procedure is shown below:

iterations = 10000
circle_count = 0

do j = 1,iterations
  generate 2 random numbers between 0 and 1
  xcoordinate = random1
  ycoordinate = random2
  if (xcoordinate, ycoordinate) inside circle
  then circle_count = circle_count + 1
end do

PI = 4.0*circle_count/iterations

The BSP implementation for Pi

A distributed strategy in HAMA using the BSP programming model is to break the loop into portions that can be executed by the tasks.

  • Each task locally executes its portion of the loop a number of times.
  • One task acts as master and collects the results through the BSP communication interface.

    /**
     * Computes a local Monte Carlo estimate of PI and averages all estimates on
     * the master task.
     *
     * <p>Each peer samples {@code iterations} points uniformly from the square
     * [-1, 1) x [-1, 1). Its local estimate is
     * {@code 4 * (points inside the unit circle) / iterations}, and that
     * estimate is sent to {@code masterTask}. After the barrier
     * ({@code sync()}), the peer named {@code masterTask} averages the
     * estimates it received and passes the result to {@code writeResult}.
     *
     * @param bspPeer the peer used for messaging and barrier synchronization
     * @throws IOException if messaging or writing the result fails
     * @throws KeeperException if thrown by the peer's synchronization
     * @throws InterruptedException if thrown by the peer's synchronization
     */
    public void bsp(BSPPeerProtocol bspPeer) throws IOException,
        KeeperException, InterruptedException {
      int in = 0;
      for (int i = 0; i < iterations; i++) {
        double x = 2.0 * Math.random() - 1.0;
        double y = 2.0 * Math.random() - 1.0;
        // Compare the squared distance with 1.0 (= 1^2): same test as
        // sqrt(x*x + y*y) < 1.0 without a square root per sample.
        if (x * x + y * y < 1.0) {
          in++;
        }
      }

      byte[] tagName = Bytes.toBytes(bspPeer.getPeerName());
      byte[] myData = Bytes.toBytes(4.0 * (double) in / (double) iterations);
      BSPMessage estimate = new BSPMessage(tagName, myData);

      // Every peer sends its estimate to masterTask, then waits at the barrier.
      bspPeer.send(masterTask, estimate);
      bspPeer.sync();

      // Count before draining: the count is the divisor for the average.
      double pi = 0.0;
      int numPeers = bspPeer.getNumCurrentMessages();
      BSPMessage received;
      while ((received = bspPeer.getCurrentMessage()) != null) {
        pi += Bytes.toDouble(received.getData());
      }

      if (bspPeer.getPeerName().equals(masterTask)) {
        pi = pi / numPeers;
        writeResult(pi);
      }
    }

PiEstimator (last edited 2013-04-17 10:50:29 by edwardyoon)