Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

Hama Pipes is equivalent to Hadoop Pipes and offers the possibility to use Hama with C/C++.

Hama Pipes is currently experimental; its status is tracked in HAMA-619.

...

Installation

You can compile Hama Pipes by executing the following commands:

No Format

cd $HAMA_HOME/c++/utils
./configure
make install

cd $HAMA_HOME/c++/pipes
./configure
make install

Interface

Hama Pipes provides the following methods for C/C++ integration (similar to the BSPModel):

...

Function

Description

int sequenceFileOpen(const string& path, const string& option, const string& keyType, const string& valueType)

Opens a SequenceFile with the given option ("r" or "w") and the given key and value types, and returns the corresponding fileID.

bool sequenceFileReadNext(int fileID, string& key, string& value)

Reads the next key/value pair from the SequenceFile.

bool sequenceFileAppend(int fileID, const string& key, const string& value)

Appends the next key/value pair to the SequenceFile.

bool sequenceFileClose(int fileID)

Closes a SequenceFile.

C++ BSP example

Finally, here is the Pi Estimator example implemented with Hama Pipes:

No Format
#include "hama/Pipes.hh"
#include "hama/TemplateFactory.hh"
#include "hadoop/StringUtils.hh"

#include <time.h>
#include <math.h>
#include <string>
#include <iostream>
#include <cstdlib>

using std::string;
using std::cout;

using HamaPipes::BSP;
using HamaPipes::BSPContext;
using namespace HadoopUtils;

class PiCalculationBSP: public BSP {
private:
    string masterTask;
    int iterations;
public:
  PiCalculationBSP(BSPContext& context) {
      iterations = 10000;
  }
    
  inline double closed_interval_rand(double x0, double x1) {
    return x0 + (x1 - x0) * rand() / ((double) RAND_MAX);
  }

  void bsp(BSPContext& context) {
      
    // initialize random seed
    srand(time(NULL));
    
    int in = 0;
    for (int i = 0; i < iterations; i++) {
      //rand() -> greater than or equal to 0.0 and less than 1.0. 
      double x = 2.0 * closed_interval_rand(0, 1) - 1.0;
      double y = 2.0 * closed_interval_rand(0, 1) - 1.0;    
      if (sqrt(x * x + y * y) < 1.0) {
        in++;
      }
    }      
      
    double data = 4.0 * in / iterations;
      
    context.sendMessage(masterTask, toString(data));
    context.sync();
  }
    
  void setup(BSPContext& context) {
    // Choose one as a master
    masterTask = context.getPeerName(context.getNumPeers() / 2);
  }
    
  void cleanup(BSPContext& context) {
    if (context.getPeerName().compare(masterTask)==0) {
      double pi = 0.0;
      int msgCount = context.getNumCurrentMessages();
      string received;
      for (int i=0; i<msgCount; i++) {
        string received = context.getCurrentMessage();
        pi += toDouble(received);
      }

      pi = pi / msgCount; //msgCount = numPeers
      context.write("Estimated value of PI is", toString(pi));
    }
  }
};

int main(int argc, char *argv[]) {
  return HamaPipes::runTask(HamaPipes::TemplateFactory<PiCalculationBSP>());
}

...