This effort is still a "work in progress". Please feel free to add comments.
Introduction
HAMA is a distributed computing framework built on Hadoop for massive scientific computations. It is currently an incubator project at the Apache Software Foundation.
Project Goal
The goal of the Hama project is to provide a scientific computation environment on Hadoop with its own computing engine, called BSP. We are focusing on the following:
- Compatibility
- Scalability
- Flexibility
- Usability and Applicability
The overall architecture of HAMA
Hama consists of BSP, a network-based distributed computing engine, and a set of applications.
BSP framework
Hama BSP is a network-based distributed computing engine. Neither network-based distributed computing on commodity PCs nor the BSP (Bulk Synchronous Parallel) model is new. See http://en.wikipedia.org/wiki/Bulk_synchronous_parallel for more detailed information on BSP. Inspired by Google Pregel, we believe that the BSP programming paradigm is ideally suited to complex problems that require communication of data, such as matrix and graph computations.
A BSP cluster consists of one BSPMaster, multiple GroomServers and one or more ZooKeeper servers in a network environment.
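A BSP superstep combines local computation, communication and a barrier synchronization. To make that structure concrete, here is a minimal single-JVM sketch in plain Java. It does not use the Hama API: the class name `SuperstepSketch`, the per-peer inbox queues and the ring-style choice of neighbor are all assumptions made for illustration, and a `CyclicBarrier` stands in for the cluster-wide synchronization.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CyclicBarrier;

/** Hypothetical single-JVM illustration of one BSP superstep; not Hama code. */
final class SuperstepSketch {

  /** Runs one superstep on `peers` threads and returns what each peer received. */
  static int[] runOneSuperstep(int peers) throws InterruptedException {
    // One inbox per peer; messages sent in this superstep are read after the barrier.
    List<ConcurrentLinkedQueue<Integer>> inboxes = new ArrayList<>();
    for (int i = 0; i < peers; i++) {
      inboxes.add(new ConcurrentLinkedQueue<>());
    }
    CyclicBarrier barrier = new CyclicBarrier(peers);
    int[] received = new int[peers];
    Thread[] threads = new Thread[peers];

    for (int id = 0; id < peers; id++) {
      final int self = id;
      threads[id] = new Thread(() -> {
        try {
          // 1. Local computation.
          int value = self * 10;
          // 2. Communication: send the value to the next peer in a ring.
          inboxes.get((self + 1) % peers).add(value);
          // 3. Barrier synchronization: no peer continues until all have sent.
          barrier.await();
          // After the barrier, the message from the previous peer is available.
          received[self] = inboxes.get(self).poll();
        } catch (Exception e) {
          throw new IllegalStateException(e);
        }
      });
      threads[id].start();
    }
    for (Thread t : threads) {
      t.join();
    }
    return received;
  }

  public static void main(String[] args) throws InterruptedException {
    int[] received = runOneSuperstep(4);
    for (int i = 0; i < received.length; i++) {
      System.out.println("peer " + i + " received " + received[i]);
    }
  }
}
```

Because every peer sends before it waits at the barrier, each inbox is guaranteed to hold its message once the barrier releases. That ordering guarantee is what the synchronization step of a superstep provides.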
BSPMaster
BSPMaster is responsible for the following:
- Maintaining groom server status.
- Controlling supersteps in a cluster.
- Maintaining job progress information.
- Scheduling jobs and assigning tasks to groom servers.
- Distributing the execution class to groom servers.
- Handling faults.
- Providing users with the cluster control interface.
A BSPMaster and multiple grooms are started by the script. The BSPMaster starts up with an RPC server for the groom servers. Each groom server starts up with a BSPPeer instance (BSPPeer is to be integrated with GroomServer later) and an RPC proxy to contact the BSPMaster. Once started, each groom periodically sends a heartbeat message that carries its groom server status, including maximum task capacity, unused memory, and so on.
Each time the BSPMaster receives a heartbeat message, it updates its record of that groom server's status, which it uses to assign tasks to idle groom servers effectively. It then returns a heartbeat response that contains the assigned tasks and any other actions the groom server has to perform. For now, we have a FIFO job scheduler and very simple task assignment algorithms.
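The heartbeat exchange described above can be sketched as a small FIFO queue that hands out tasks to a groom with free capacity. This is an illustration only, not Hama's scheduler: the class `FifoSchedulerSketch`, the methods `submit` and `onHeartbeat`, and the use of plain strings as task identifiers are all hypothetical.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Deque;
import java.util.List;

/** Hypothetical sketch of heartbeat-driven FIFO task assignment; not Hama's scheduler. */
final class FifoSchedulerSketch {

  // Pending tasks in submission order (first in, first out).
  private final Deque<String> pendingTasks = new ArrayDeque<>();

  /** Queues the tasks of a newly submitted job behind all earlier ones. */
  void submit(List<String> tasks) {
    pendingTasks.addAll(tasks);
  }

  /**
   * Handles one heartbeat. The groom reports its maximum task capacity and the
   * number of tasks it is running; the response lists its newly assigned tasks.
   */
  List<String> onHeartbeat(int maxTasks, int runningTasks) {
    List<String> assigned = new ArrayList<>();
    int freeSlots = Math.max(0, maxTasks - runningTasks);
    while (freeSlots > 0 && !pendingTasks.isEmpty()) {
      assigned.add(pendingTasks.pollFirst());
      freeSlots--;
    }
    return assigned;
  }

  public static void main(String[] args) {
    FifoSchedulerSketch master = new FifoSchedulerSketch();
    master.submit(Arrays.asList("job1-task0", "job1-task1"));
    master.submit(Arrays.asList("job2-task0"));
    // A groom with one free slot receives the oldest pending task.
    System.out.println(master.onHeartbeat(2, 1));
    // An idle groom with two slots receives the next two tasks in order.
    System.out.println(master.onHeartbeat(2, 0));
  }
}
```

Tasks from an earlier job are always assigned before tasks from a later one, which is the defining property of FIFO scheduling.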
GroomServer
A GroomServer (groom for short) is a process that performs BSP tasks assigned by the BSPMaster. Each groom contacts the BSPMaster, takes its assigned tasks and reports its status by piggybacking on the periodic heartbeat exchange with the BSPMaster. Each groom is designed to run with HDFS or another distributed storage system. Typically, a groom server and a data node should run on the same physical node.
Fault Tolerance Mechanisms
BSP Programming Interface
The Hama BSP programming interface is designed to be similar to the MapReduce interface. This allows the framework to be less complex than traditional BSP libraries capable of executing sequential code, and lets us provide a user-friendly and intuitive programming interface to existing users of the MapReduce framework. See the example code below:
public class BSPExample {

  public static class MyBSP extends BSP {
    private Configuration conf;

    @Override
    public void bsp(BSPPeer bspPeer) throws IOException, KeeperException,
        InterruptedException {
      // A communication and synchronization phase of a BSP superstep

      // Send data to a neighbor node
      // (hostname and msg are placeholders supplied by the application)
      bspPeer.send(hostname, msg);

      // Superstep synchronization
      bspPeer.sync();

      // Receive current messages
      bspPeer.getCurrentMessage();
    }

    @Override
    public Configuration getConf() {
      return conf;
    }

    @Override
    public void setConf(Configuration conf) {
      this.conf = conf;
    }
  }

  // BSP job configuration
  public static void main(String[] args) throws Exception {
    BSPJob bsp = new BSPJob(new HamaConfiguration(), BSPExample.class);
    // Set the job name
    bsp.setJobName("My BSP Job");
    bsp.setBspClass(MyBSP.class);

    // Submit job
    BSPJobClient.runJob(bsp);
  }
}
- An input/output system is planned for the next version.
Examples
Pi Estimation
The value of PI can be calculated in a number of ways. Consider the following method of estimating PI:
- Inscribe a circle in a square
- Randomly generate points in the square
- Determine the number of points in the square that are also in the circle
- Let r be the number of points in the circle divided by the number of points in the square
- PI ~ 4 r
Serial pseudocode for this procedure is as follows:
iterations = 10000
circle_count = 0

do j = 1,iterations
  generate 2 random numbers between 0 and 1
  xcoordinate = random1
  ycoordinate = random2
  if (xcoordinate, ycoordinate) inside circle
  then circle_count = circle_count + 1
end do

PI = 4.0*circle_count/iterations
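The pseudocode above translates directly to Java. The sketch below is one possible rendering, not part of Hama: the class name `SerialPi` and the explicit `seed` parameter (added so that runs are repeatable) are assumptions. It samples points in the unit square and counts those inside the quarter circle of radius 1, which gives the same ratio as a full circle inscribed in a square.

```java
import java.util.Random;

/** Serial Monte Carlo estimate of PI; an illustrative sketch. */
final class SerialPi {

  /** Estimates PI from `iterations` random points in the unit square. */
  static double estimate(int iterations, long seed) {
    Random random = new Random(seed);
    int circleCount = 0;
    for (int j = 0; j < iterations; j++) {
      double x = random.nextDouble();
      double y = random.nextDouble();
      // The point lies inside the quarter circle of radius 1 around the origin.
      if (x * x + y * y < 1.0) {
        circleCount++;
      }
    }
    // The fraction of points inside approximates PI / 4.
    return 4.0 * circleCount / iterations;
  }

  public static void main(String[] args) {
    System.out.println("Estimated value of PI is " + estimate(10000, 42L));
  }
}
```

The accuracy improves only slowly with the number of iterations (the error shrinks roughly with the square root of the sample count), which is why splitting the loop across many tasks is attractive.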
The BSP Algorithm for Pi
The parallel strategy is to break the loop into portions that can be executed by the tasks.
- For the task of estimating PI:
- Each task executes its portion of the loop a number of times.
- Each task can do its work without requiring any information from the other tasks (there are no data dependencies).
- One task acts as master and collects the results.
import java.io.IOException;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hama.HamaConfiguration;
import org.apache.hama.bsp.BSP;
import org.apache.hama.bsp.BSPJob;
import org.apache.hama.bsp.BSPJobClient;
import org.apache.hama.bsp.BSPMessage;
import org.apache.hama.bsp.BSPPeer;
import org.apache.hama.bsp.ClusterStatus;
import org.apache.hama.util.Bytes;
import org.apache.zookeeper.KeeperException;

public class PiEstimator {
  private static String MASTER_TASK = "master.task.";

  public static class MyEstimator extends BSP {
    public static final Log LOG = LogFactory.getLog(MyEstimator.class);
    private Configuration conf;
    private String masterTask;
    private static final int iterations = 10000;

    @Override
    public void bsp(BSPPeer bspPeer) throws IOException, KeeperException,
        InterruptedException {
      // Each task counts how many random points fall inside the circle
      int in = 0;
      for (int i = 0; i < iterations; i++) {
        double x = 2.0 * Math.random() - 1.0, y = 2.0 * Math.random() - 1.0;
        if (Math.sqrt(x * x + y * y) < 1.0) {
          in++;
        }
      }

      // Send the local estimate to the master task
      byte[] tagName = Bytes.toBytes(getName().toString());
      byte[] myData = Bytes.toBytes(4.0 * (double) in / (double) iterations);
      BSPMessage estimate = new BSPMessage(tagName, myData);

      bspPeer.send(bspPeer.getAddress(masterTask), estimate);
      bspPeer.sync();

      // Only the master task receives messages; take the mean of all estimates
      double sum = 0.0;
      int count = 0;
      BSPMessage received;
      while ((received = bspPeer.getCurrentMessage()) != null) {
        sum += Bytes.toDouble(received.getData());
        count++;
      }

      if (count > 0) {
        System.out.println("Estimated value of PI is " + sum / count);
      }
    }

    @Override
    public Configuration getConf() {
      return conf;
    }

    @Override
    public void setConf(Configuration conf) {
      this.conf = conf;
      this.masterTask = conf.get(MASTER_TASK);
    }
  }

  public static void main(String[] args) throws InterruptedException,
      IOException {
    // BSP job configuration
    HamaConfiguration conf = new HamaConfiguration();
    // Execute locally
    // conf.set("bsp.master.address", "local");

    BSPJob bsp = new BSPJob(conf, PiEstimator.class);
    // Set the job name
    bsp.setJobName("pi estimation example");
    bsp.setBspClass(MyEstimator.class);

    BSPJobClient jobClient = new BSPJobClient(conf);
    ClusterStatus cluster = jobClient.getClusterStatus(true);

    // Choose one as a master
    for (String name : cluster.getActiveGroomNames()) {
      conf.set(MASTER_TASK, name);
      break;
    }

    BSPJobClient.runJob(bsp);
  }
}
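The same parallel strategy can be tried on a single machine, without a Hama cluster, as a way to check the arithmetic. The sketch below is an assumption-laden stand-in rather than Hama code: `ParallelPiSketch` and its methods are hypothetical names, a thread pool plays the role of the BSP tasks, and the calling thread plays the role of the master, combining the partial estimates with an arithmetic mean.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

/** Hypothetical thread-pool illustration of the parallel PI strategy; not Hama code. */
final class ParallelPiSketch {

  /** One task's portion of the loop: a local estimate from its own random points. */
  static double partialEstimate(int iterations, long seed) {
    Random random = new Random(seed);
    int in = 0;
    for (int i = 0; i < iterations; i++) {
      double x = random.nextDouble();
      double y = random.nextDouble();
      if (x * x + y * y < 1.0) {
        in++;
      }
    }
    return 4.0 * in / iterations;
  }

  /** Runs `tasks` independent portions and returns the mean of their estimates. */
  static double estimate(int tasks, int iterationsPerTask) throws Exception {
    ExecutorService pool = Executors.newFixedThreadPool(tasks);
    try {
      List<Future<Double>> partials = new ArrayList<>();
      for (int t = 0; t < tasks; t++) {
        final long seed = t; // a distinct seed per task, so tasks draw different points
        Callable<Double> work = () -> partialEstimate(iterationsPerTask, seed);
        partials.add(pool.submit(work));
      }
      // The "master" collects every partial result and averages them.
      double sum = 0.0;
      for (Future<Double> partial : partials) {
        sum += partial.get();
      }
      return sum / tasks;
    } finally {
      pool.shutdown();
    }
  }

  public static void main(String[] args) throws Exception {
    System.out.println("Estimated value of PI is " + estimate(4, 250000));
  }
}
```

Because every task uses the same number of iterations, the plain mean of the partial estimates equals the estimate that a single loop over all points would produce.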