General Information

In Apache Hama, you implement your own BSP program by extending the org.apache.hama.bsp.BSP class. This class declares the bsp() function, which you override to write your BSP program.

The bsp() function handles the whole parallel part of the program. It is therefore called only once, not repeatedly.

There are also setup() and cleanup() functions, which are called at the beginning and at the end of your computation, respectively.

cleanup() is guaranteed to run after the computation, even in case of failure.

You can simply override the functions you need from the BSP class.
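The calling order described above can be pictured with a small stand-alone sketch. It does not use Hama itself: `SketchTask`, `LifecycleSketch` and `run` are hypothetical names, and the way the real framework drives a task may differ in detail. The sketch only assumes that setup() runs first, bsp() runs once, and cleanup() sits in a finally block so that it still runs when bsp() fails.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for a BSP task; NOT the real org.apache.hama.bsp.BSP class.
abstract class SketchTask {
  void setup(List<String> log) { log.add("setup"); }
  abstract void bsp(List<String> log) throws Exception;
  void cleanup(List<String> log) { log.add("cleanup"); }
}

class LifecycleSketch {
  // Assumed driver logic: setup once, bsp once, cleanup always.
  static List<String> run(SketchTask task) {
    List<String> log = new ArrayList<>();
    try {
      task.setup(log);
      task.bsp(log);
    } catch (Exception e) {
      log.add("failed: " + e.getMessage());
    } finally {
      // Runs after a normal finish and after a failure alike.
      task.cleanup(log);
    }
    return log;
  }

  public static void main(String[] args) {
    // A task that completes normally.
    System.out.println(run(new SketchTask() {
      @Override void bsp(List<String> log) { log.add("bsp"); }
    }));
    // A task whose bsp() fails; cleanup is still logged.
    System.out.println(run(new SketchTask() {
      @Override void bsp(List<String> log) throws Exception {
        throw new Exception("boom");
      }
    }));
  }
}
```

In both runs the last log entry is "cleanup", which is the property the guarantee above is about.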

Basically, a BSP program consists of a sequence of supersteps. Each superstep consists of three phases:

  • Local computation
  • Process communication
  • Barrier synchronization

NOTE that these phases always run in sequential order.

In Apache Hama, the communication between tasks (or peers) is done within the barrier synchronization.
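To make the three phases concrete, here is a minimal simulation in plain Java threads. It is a sketch and not Hama code: `SuperstepSketch`, the `pending` and `inbox` queues, and the use of `java.util.concurrent.CyclicBarrier` are all assumptions chosen for illustration. Each simulated peer computes a value, queues a message for every peer, and then waits at the barrier. Messages are only moved into the receivers' inboxes while the barrier is being crossed.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CyclicBarrier;

class SuperstepSketch {
  // Returns {inbox sizes before the barrier, inbox sizes after the barrier}.
  static int[][] run(int numPeers) {
    List<Queue<String>> pending = new ArrayList<>();
    List<Queue<String>> inbox = new ArrayList<>();
    for (int i = 0; i < numPeers; i++) {
      pending.add(new ConcurrentLinkedQueue<>());
      inbox.add(new ConcurrentLinkedQueue<>());
    }
    // Barrier action: deliver everything that was sent in the superstep that just ended.
    CyclicBarrier barrier = new CyclicBarrier(numPeers, () -> {
      for (int i = 0; i < numPeers; i++) {
        String m;
        while ((m = pending.get(i).poll()) != null) {
          inbox.get(i).add(m);
        }
      }
    });
    int[] before = new int[numPeers];
    int[] after = new int[numPeers];
    Thread[] threads = new Thread[numPeers];
    for (int p = 0; p < numPeers; p++) {
      final int me = p;
      threads[p] = new Thread(() -> {
        try {
          // Phase 1: local computation.
          String greeting = "hello from peer " + me;
          // Phase 2: communication (messages are only queued here).
          for (int other = 0; other < numPeers; other++) {
            pending.get(other).add(greeting);
          }
          before[me] = inbox.get(me).size();
          // Phase 3: barrier synchronization (delivery happens here).
          barrier.await();
          after[me] = inbox.get(me).size();
        } catch (Exception e) {
          throw new RuntimeException(e);
        }
      });
      threads[p].start();
    }
    try {
      for (Thread t : threads) t.join();
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new RuntimeException(e);
    }
    return new int[][] {before, after};
  }

  public static void main(String[] args) {
    int[][] r = run(3);
    System.out.println(Arrays.toString(r[0])); // inbox sizes before the barrier
    System.out.println(Arrays.toString(r[1])); // inbox sizes after the barrier
  }
}
```

With three peers, every inbox is empty before the barrier and holds three messages after it, one from each peer including the receiver itself.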

Communication

Within the bsp() function, you can use the communication functions of BSPPeer for many purposes. We tried to follow the standard library of the BSP world as closely as possible.

Incoming messages are stored in a queue, and no particular message order is guaranteed.

The following table describes all the functions you can use:

| Function | Description |
| --- | --- |
| send(String peerName, BSPMessage msg) | Send a message to another peer. |
| getCurrentMessage() | Get a received message from the queue. |
| getNumCurrentMessages() | Get the number of messages currently in the queue. |
| sync() | Start the barrier synchronization. |
| getPeerName() | Get the peer name of this task. |
| getPeerName(int index) | Get the n-th peer name. |
| getNumPeers() | Get the number of peers. |
| getAllPeerNames() | Get all peer names (including "this" task). (Hint: These are always sorted in ascending order) |

Here is an example that sends a message to all peers:

  public final void bsp(
      BSPPeer<KEYIN, VALUEIN, KEYOUT, VALUEOUT> peer)
      throws IOException, InterruptedException, SyncException {

    // Send one message to every peer, including this one.
    for (String otherPeer : peer.getAllPeerNames()) {
      LongMessage msg =
        new LongMessage("Hello from " + peer.getPeerName());
      peer.send(otherPeer, msg);
    }

    // Enter the barrier; the messages are delivered during synchronization.
    peer.sync();
  }

The generic type parameters of BSPPeer relate to the Input and Output System.
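The example above only sends. The receiving side can be sketched with a small in-memory model of the message functions from the table. This is not the Hama BSPPeer: `MailboxSketch` is a hypothetical single-threaded stand-in, its methods take an explicit peer name because one object plays all peers, and returning null from getCurrentMessage() on an empty queue is an assumed convention here.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Queue;

// Hypothetical in-memory stand-in for a group of peers; NOT the Hama BSPPeer.
class MailboxSketch {
  private final Map<String, Queue<String>> outgoing = new HashMap<>();
  private final Map<String, Queue<String>> received = new HashMap<>();

  MailboxSketch(String... peerNames) {
    for (String name : peerNames) {
      outgoing.put(name, new ArrayDeque<>());
      received.put(name, new ArrayDeque<>());
    }
  }

  // Queue a message; it is not visible to the receiver until sync().
  void send(String peerName, String msg) {
    outgoing.get(peerName).add(msg);
  }

  // Simulated barrier: move all queued messages into the receive queues.
  void sync() {
    for (Map.Entry<String, Queue<String>> e : outgoing.entrySet()) {
      received.get(e.getKey()).addAll(e.getValue());
      e.getValue().clear();
    }
  }

  int getNumCurrentMessages(String peerName) {
    return received.get(peerName).size();
  }

  // Next received message, or null when the queue is empty (assumed convention).
  String getCurrentMessage(String peerName) {
    return received.get(peerName).poll();
  }

  // Receive loop: drain the queue until it is empty.
  static List<String> drain(MailboxSketch box, String peerName) {
    List<String> out = new ArrayList<>();
    String msg;
    while ((msg = box.getCurrentMessage(peerName)) != null) {
      out.add(msg);
    }
    return out;
  }

  public static void main(String[] args) {
    MailboxSketch box = new MailboxSketch("peer0", "peer1");
    box.send("peer1", "hello");
    box.send("peer1", "world");
    System.out.println(box.getNumCurrentMessages("peer1")); // nothing delivered yet
    box.sync();
    System.out.println(box.getNumCurrentMessages("peer1")); // both messages delivered
    System.out.println(drain(box, "peer1"));
  }
}
```

Because the order of incoming messages is not guaranteed, a receive loop of this shape should not rely on the order in which messages come out of the queue.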

Synchronization

When all processes have entered the barrier via the sync() function, Hama proceeds to the next superstep. In the previous example, the BSP job finishes after a single synchronization, once the message “Hello from ...” has been sent to all peers. The sync() function is very flexible. For example, it can also be placed inside a for loop:

  public final void bsp(
      BSPPeer<KEYIN, VALUEIN, KEYOUT, VALUEOUT> peer)
      throws IOException, InterruptedException, SyncException {

    // Each loop iteration is one superstep.
    for (int i = 0; i < 100; i++) {
      // send some messages
      peer.sync();
    }

  }

The BSP job finishes only when no process has any remaining entries in its local or outgoing queues and all processes are done, or when the job is killed by the user.

Note that barrier synchronization is very costly because it is a global synchronization, so you should synchronize as rarely as possible.
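One way to follow this advice is to send several messages before each barrier instead of synchronizing after every send. The sketch below only counts barriers and does not call Hama: `SyncBatchingSketch`, `countSyncs` and `batchSize` are hypothetical names, and the increments stand in for peer.send(...) and peer.sync(). Batching like this is only valid when later sends do not depend on replies to earlier ones.

```java
class SyncBatchingSketch {
  // Number of barriers needed when syncing after every `batchSize` sends,
  // plus one final barrier for a partial batch.
  static int countSyncs(int messages, int batchSize) {
    int syncs = 0;
    int unsynced = 0;
    for (int i = 0; i < messages; i++) {
      unsynced++;                 // stands in for peer.send(...)
      if (unsynced == batchSize) {
        syncs++;                  // stands in for peer.sync()
        unsynced = 0;
      }
    }
    if (unsynced > 0) {
      syncs++;                    // flush the last partial batch
    }
    return syncs;
  }

  public static void main(String[] args) {
    System.out.println(countSyncs(100, 1));   // one barrier per message: 100
    System.out.println(countSyncs(100, 30));  // 30 + 30 + 30 + 10: 4
    System.out.println(countSyncs(100, 100)); // a single barrier: 1
  }
}
```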
