Table of Contents

Overview

BSP Function

Communication Model

Synchronization

Inputs and Outputs

Setup and Cleanup

Combiners

Counters

— General Information —

In Apache Hama, you can implement your own BSP method by extending the org.apache.hama.bsp.BSP class. This class provides a user-defined function, bsp(), in which you write your own BSP program.

The bsp() function handles the whole parallel part of the program, so it is called only once rather than repeatedly.

There are also setup() and cleanup(), which are called at the beginning and at the end of your computation, respectively.

cleanup() is guaranteed to run after the computation or in case of failure.

You can simply override the functions you need from the BSP class.
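
The call order described above can be illustrated with a small plain-Java sketch. It does not use any Hama classes: Task, run() and trace() are hypothetical names made up for this illustration, and try/finally is only one assumed way a framework could guarantee that cleanup() runs even after a failure.

```java
import java.util.ArrayList;
import java.util.List;

public class LifecycleSketch {

  /** Hypothetical stand-in for a user task; this is NOT the Hama BSP class. */
  public abstract static class Task {
    public void setup() {}
    public abstract void bsp() throws Exception;
    public void cleanup() {}
  }

  /** Calls setup(), then bsp() exactly once, and always cleanup(). Returns true on success. */
  public static boolean run(Task task) {
    try {
      task.setup();
      task.bsp();
      return true;
    } catch (Exception e) {
      return false;
    } finally {
      // Runs after normal completion and after a failure alike.
      task.cleanup();
    }
  }

  /** Records the order of lifecycle calls for a task that optionally fails inside bsp(). */
  public static List<String> trace(boolean fail) {
    List<String> calls = new ArrayList<>();
    run(new Task() {
      @Override public void setup() { calls.add("setup"); }
      @Override public void bsp() throws Exception {
        calls.add("bsp");
        if (fail) {
          throw new Exception("simulated failure");
        }
      }
      @Override public void cleanup() { calls.add("cleanup"); }
    });
    return calls;
  }

  public static void main(String[] args) {
    System.out.println(trace(false)); // prints [setup, bsp, cleanup]
    System.out.println(trace(true));  // prints [setup, bsp, cleanup]
  }
}
```

In both runs cleanup() is the last call, which is the behaviour the text promises for a real BSP task.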

Hama is based on the Bulk Synchronous Parallel (BSP) model, in which a computation consists of a sequence of supersteps. In each superstep, many distributed computational peers work independently and then synchronize at the end of the superstep. Each superstep consists of the following three phases:

  • Local computation
  • Process communication
  • Barrier synchronization

NOTE that these phases should always be executed in sequential order.

In Apache Hama, the communication between tasks (or peers) is done within the barrier synchronization.
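
To make the three phases concrete, here is a minimal plain-Java simulation of a single superstep. It is only a sketch under stated assumptions: the peers are threads in one JVM, the inboxes are in-memory queues, and java.util.concurrent.CyclicBarrier stands in for the barrier. None of the names below (SuperstepSketch, runOneSuperstep) belong to the Hama API.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CyclicBarrier;

public class SuperstepSketch {

  /** Runs numPeers threads through one superstep; returns the sum each peer received. */
  public static long[] runOneSuperstep(int numPeers) {
    // One inbox per simulated peer.
    List<Queue<Long>> inboxes = new ArrayList<>();
    for (int i = 0; i < numPeers; i++) {
      inboxes.add(new ConcurrentLinkedQueue<>());
    }
    CyclicBarrier barrier = new CyclicBarrier(numPeers);
    long[] received = new long[numPeers];
    Thread[] threads = new Thread[numPeers];

    for (int p = 0; p < numPeers; p++) {
      final int self = p;
      threads[p] = new Thread(() -> {
        try {
          // Phase 1: local computation.
          long local = (self + 1) * 10L;
          // Phase 2: process communication (send the local value to every peer).
          for (Queue<Long> inbox : inboxes) {
            inbox.add(local);
          }
          // Phase 3: barrier synchronization.
          barrier.await();
          // Past the barrier, every message of this superstep has been delivered.
          long sum = 0;
          for (long v : inboxes.get(self)) {
            sum += v;
          }
          received[self] = sum;
        } catch (Exception e) {
          throw new IllegalStateException(e);
        }
      });
      threads[p].start();
    }
    for (Thread t : threads) {
      try {
        t.join();
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        throw new IllegalStateException(e);
      }
    }
    return received;
  }

  public static void main(String[] args) {
    long[] sums = runOneSuperstep(3);
    for (int i = 0; i < sums.length; i++) {
      System.out.println("peer " + i + " received sum " + sums[i]); // 60 for every peer
    }
  }
}
```

With three peers the local values are 10, 20 and 30, so every peer reads a total of 60 once it has passed the barrier. Reading the inbox before the barrier could miss messages from slower peers, which is why the phase order matters.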

— Communication —


BSP Function

Hama provides a user-defined function “bsp()” that can be used to write your own BSP program. The bsp() function handles the whole parallel part of the program. In the 0.2 version, it only takes one argument, which is a communication protocol interface. Later, more arguments such as input or reporter could also be included.

Communication Model

Within the bsp() function, you can use the communication functions of BSPPeer for many purposes. We tried to follow the standard library of the BSP world as much as possible.

Incoming messages are stored in a queue; no particular ordering of the messages is guaranteed.

The following table describes all the functions you can use:

Function                                 Description

send(String peerName, BSPMessage msg)    Send a message to another peer.
getCurrentMessage()                      Get a received message from the queue.
getNumCurrentMessages()                  Get the number of messages currently in the queue.
sync()                                   Start the barrier synchronization.
getPeerName()                            Get the peer name of this task.
getPeerName(int index)                   Get the n-th peer name.
getNumPeers()                            Get the number of peers.
getAllPeerNames()                        Get all peer names (including "this" task). Hint: these are always sorted in ascending order.

The send() and all the other functions are very flexible. Here is an example that sends a message to all peers:

No Format
    @Override
    public final void bsp(
        BSPPeer<NullWritable, NullWritable, Text, DoubleWritable> peer)
        throws IOException, SyncException, InterruptedException {

      // Send a greeting to every peer, including this one.
      for (String peerName : peer.getAllPeerNames()) {
        peer.send(peerName,
            new LongMessage("Hello from " + peer.getPeerName(), System.currentTimeMillis()));
      }

      // Enter the barrier; the messages are delivered during synchronization.
      peer.sync();
    }

...

...

Synchronization

...

When all processes have entered the barrier via the sync() function, Hama proceeds to the next superstep. In the previous example, the BSP job finishes after a single synchronization, once the message "Hello from ..." has been sent to all peers.

But keep in mind that the sync() function is not the end of the BSP job. As previously mentioned, all the communication functions are very flexible. For example, sync() can also be called in a for loop, so that you can program iterative methods sequentially:

No Format

    @Override
    public final void bsp(
        BSPPeer<NullWritable, NullWritable, Text, DoubleWritable> peer)
        throws IOException, SyncException, InterruptedException {

      // One superstep per loop iteration.
      for (int i = 0; i < 100; i++) {
        // send some messages
        peer.sync();
      }

    }

The BSP job finishes only when no process has any remaining entries in its local or outgoing queues and all processes are done, or when the job is killed by the user.
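
As an illustration of an iterative method built from repeated supersteps, the following plain-Java sketch propagates the maximum value around a ring of peers, one hop per superstep. It is a single-threaded simulation, not Hama code: the class and method names are hypothetical, and swapping the outgoing queues into the inbox plays the role of sync(), under the assumption that messages sent in one superstep become readable in the next.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

public class IterativeSuperstepSketch {

  private static List<Queue<Integer>> newQueues(int n) {
    List<Queue<Integer>> queues = new ArrayList<>();
    for (int i = 0; i < n; i++) {
      queues.add(new ArrayDeque<>());
    }
    return queues;
  }

  /** Folds all pending messages of peer p into its current value. */
  private static void drain(Queue<Integer> inbox, int[] value, int p) {
    Integer msg;
    while ((msg = inbox.poll()) != null) {
      value[p] = Math.max(value[p], msg);
    }
  }

  /** After n - 1 supersteps every simulated peer holds the global maximum. */
  public static int[] propagateMax(int[] initial) {
    int n = initial.length;
    int[] value = initial.clone();
    List<Queue<Integer>> inbox = newQueues(n);

    for (int step = 0; step < n - 1; step++) {
      List<Queue<Integer>> outgoing = newQueues(n);
      for (int p = 0; p < n; p++) {
        // Local computation: use what arrived in the previous superstep.
        drain(inbox.get(p), value, p);
        // Communication: send the current value to the next peer in the ring.
        outgoing.get((p + 1) % n).add(value[p]);
      }
      // Simulated barrier: sent messages become readable in the next superstep.
      inbox = outgoing;
    }
    // Read the messages delivered by the last barrier.
    for (int p = 0; p < n; p++) {
      drain(inbox.get(p), value, p);
    }
    return value;
  }

  public static void main(String[] args) {
    int[] result = propagateMax(new int[] {3, 9, 4, 1});
    System.out.println(java.util.Arrays.toString(result)); // prints [9, 9, 9, 9]
  }
}
```

Four peers need three supersteps here, because the largest value has to travel at most three hops around the ring. Each hop costs one global synchronization.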

Inputs and Outputs

Counters

Combiners perform message aggregation at the sender side to reduce communication overhead in cases where messages can be summarized arithmetically, e.g., by min, max, sum, or average. Suppose that every processor sends the integers from 0 to 999 as messages to one specific processor, which then sums all the integer messages it received from all processors.

No Format

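    // masterTask (the name of the receiving peer) and sum are assumed to be
    // fields of the enclosing BSP class.
    public void bsp(BSPPeer<NullWritable, NullWritable, NullWritable, NullWritable> peer) throws IOException,
        SyncException, InterruptedException {

      // Every peer sends 1000 separate integer messages to the master task.
      for (int i = 0; i < 1000; i++) {
        peer.send(masterTask, new IntegerMessage(peer.getPeerName(), i));
      }
      peer.sync();

      // Only the master task drains its queue and sums the received values.
      if (peer.getPeerName().equals(masterTask)) {
        IntegerMessage received;
        while ((received = (IntegerMessage) peer.getCurrentMessage()) != null) {
          sum += received.getData();
        }
      }
    }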

If you follow the previous example, each BSP processor sends a bundle of a thousand integer messages to the masterTask. Instead, you can use a Combiner in your BSP program to sum the integer messages before they are sent, which also keeps the program concise and maintainable:

No Format

  public static class SumCombiner extends Combiner {

    @Override
    public BSPMessageBundle combine(Iterable<BSPMessage> messages) {
      BSPMessageBundle bundle = new BSPMessageBundle();
      int sum = 0;

      Iterator<BSPMessage> it = messages.iterator();
      while (it.hasNext()) {
        sum += ((IntegerMessage) it.next()).getData();
      }

      bundle.addMessage(new IntegerMessage("Sum", sum));
      return bundle;
    }

  }
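
The saving can be seen in a small plain-Java sketch that applies the same summing idea to an in-memory list. It does not use the Hama Combiner or BSPMessageBundle classes: CombinerSketch and its combine() method are hypothetical stand-ins that model the outgoing queue as a List<Integer>.

```java
import java.util.ArrayList;
import java.util.List;

public class CombinerSketch {

  /** Sender-side combine step: collapses all queued integers into a single sum message. */
  public static List<Integer> combine(List<Integer> outgoing) {
    int sum = 0;
    for (int v : outgoing) {
      sum += v;
    }
    List<Integer> combined = new ArrayList<>();
    combined.add(sum);
    return combined;
  }

  public static void main(String[] args) {
    // The 1000 messages one peer would otherwise send to the master task.
    List<Integer> outgoing = new ArrayList<>();
    for (int i = 0; i < 1000; i++) {
      outgoing.add(i);
    }
    List<Integer> combined = combine(outgoing);
    // prints "1000 messages -> 1 message, sum = 499500"
    System.out.println(outgoing.size() + " messages -> " + combined.size()
        + " message, sum = " + combined.get(0));
  }
}
```

Each peer then transfers one message instead of a thousand, and the master only has to add up one partial sum per peer.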

Setup and Cleanup

...

Note that barrier synchronization is very costly because it is a global synchronization, so you should synchronize as rarely as possible.