Hama Pipes is equivalent to Hadoop Pipes and offers the possibility to use Hama with C/C++.
The current status of Hama Pipes is experimental and can be found here: HAMA-619
...
Installation
You can compile Hama Pipes by executing the following commands:
No Format |
---|
cd $HAMA_HOME/c++/utils
./configure
make install
cd $HAMA_HOME/c++/pipes
./configure
make install
|
Interface
Hama Pipes provides the following methods for C/C++ integration: (similar to the BSPModel)
...
Function | Description |
| Opens a SequenceFile with option "r" or "w", key/value type and returns the corresponding fileID. |
| Reads the next key/value pair from the SequenceFile. |
| Appends the next key/value pair to the SequenceFile. |
| Closes a SequenceFile. |
C++ BSP example
Finally here is the Pi Estimator example implemented with Hama Pipes:
No Format |
---|
#include "hama/Pipes.hh"
#include "hama/TemplateFactory.hh"
#include "hadoop/StringUtils.hh"
#include <time.h>
#include <math.h>
#include <string>
#include <iostream>
#include <cstdlib>
using std::string;
using std::cout;
using HamaPipes::BSP;
using HamaPipes::BSPContext;
using namespace HadoopUtils;
class PiCalculationBSP: public BSP {
private:
string masterTask;
int iterations;
public:
PiCalculationBSP(BSPContext& context) {
iterations = 10000;
}
inline double closed_interval_rand(double x0, double x1) {
return x0 + (x1 - x0) * rand() / ((double) RAND_MAX);
}
void bsp(BSPContext& context) {
// initialize random seed
srand(time(NULL));
int in = 0;
for (int i = 0; i < iterations; i++) {
//rand() -> greater than or equal to 0.0 and less than 1.0.
double x = 2.0 * closed_interval_rand(0, 1) - 1.0;
double y = 2.0 * closed_interval_rand(0, 1) - 1.0;
if (sqrt(x * x + y * y) < 1.0) {
in++;
}
}
double data = 4.0 * in / iterations;
context.sendMessage(masterTask, toString(data));
context.sync();
}
void setup(BSPContext& context) {
// Choose one as a master
masterTask = context.getPeerName(context.getNumPeers() / 2);
}
void cleanup(BSPContext& context) {
if (context.getPeerName().compare(masterTask)==0) {
double pi = 0.0;
int msgCount = context.getNumCurrentMessages();
string received;
for (int i=0; i<msgCount; i++) {
string received = context.getCurrentMessage();
pi += toDouble(received);
}
pi = pi / msgCount; //msgCount = numPeers
context.write("Estimated value of PI is", toString(pi));
}
}
};
int main(int argc, char *argv[]) {
return HamaPipes::runTask(HamaPipes::TemplateFactory<PiCalculationBSP>());
}
|
...