Hama Pipes is equivalent to Hadoop Pipes and offers the possibility to use Hama with C/C++.
The current status of Hama Pipes is experimental and can be found here: HAMA-619
Hama Pipes will be part of Hama 0.6.0.
Hama Pipes provides the following methods for C/C++ integration: (similar to the BSPModel)
Function | Description |
| Send a message to another peer. Messages sent by this method are not guaranteed to be received in a sent order. |
| Returns a message from the peer's received messages queue (a FIFO). |
| Returns the number of messages in the peer's received messages queue. |
| Starts the barrier synchronization and sends all the messages in the outgoing message queues to the corresponding remote peers. |
| Returns the count of current super-step. |
| Returns the name of this peer in the format "hostname:port". |
| Returns the name of n-th peer from sorted array by name. |
| Returns the index of this peer from sorted array by name. |
| Returns the names of all the peers executing tasks from the same job (including this peer). |
| Returns the number of peers. |
| Clears all queues entries. |
| Writes a key/value pair to the output collector. |
| Deserializes the next input key value into the given objects. |
| Closes the input and opens it right away, so that the file pointer is at the beginning again. |
The following additional methods support access to SequenceFiles under C/C++:
Function | Description |
| Opens a SequenceFile with option "r" or "w" returns the corresponding fileID. |
| Reads the next key/value pair from the SequenceFile. |
| Appends the next key/value pair to the SequenceFile. |
| Closes a SequenceFile. |
Finally here is the Pi Estimator example implemented with Hama Pipes:
No Format |
---|
#include "hama/Pipes.hh"
#include "hama/TemplateFactory.hh"
#include "hadoop/StringUtils.hh"
#include <time.h>
#include <math.h>
#include <string>
#include <iostream>
using std::string;
using std::cout;
using HamaPipes::BSP;
using HamaPipes::BSPContext;
using namespace HadoopUtils;
class PiCalculationBSP: public BSP {
private:
string masterTask;
int iterations;
public:
PiCalculationBSP(BSPContext& context) {
iterations = 10000;
}
inline double closed_interval_rand(double x0, double x1) {
return x0 + (x1 - x0) * rand() / ((double) RAND_MAX);
}
void bsp(BSPContext& context) {
// initialize random seed
srand(time(NULL));
int in = 0;
for (int i = 0; i < iterations; i++) {
//rand() -> greater than or equal to 0.0 and less than 1.0.
double x = 2.0 * closed_interval_rand(0, 1) - 1.0;
double y = 2.0 * closed_interval_rand(0, 1) - 1.0;
if (sqrt(x * x + y * y) < 1.0) {
in++;
}
}
double data = 4.0 * in / iterations;
context.sendMessage(masterTask, toString(data));
context.sync();
}
void setup(BSPContext& context) {
// Choose one as a master
masterTask = context.getPeerName(context.getNumPeers() / 2);
}
void cleanup(BSPContext& context) {
if (context.getPeerName().compare(masterTask)==0) {
double pi = 0.0;
int msgCount = context.getNumCurrentMessages();
string received;
for (int i=0; i<msgCount; i++) {
string received = context.getCurrentMessage();
pi += toDouble(received);
}
pi = pi / msgCount; //msgCount = numPeers
context.write("Estimated value of PI is", toString(pi));
}
}
};
int main(int argc, char *argv[]) {
return HamaPipes::runTask(HamaPipes::TemplateFactory<PiCalculationBSP>());
}
|
Makefile for this example:
No Format |
---|
CC = g++
CPPFLAGS = -m64 -I$(HAMA_HOME)/c++/install/include
PiCalculation: PiCalculation.cc
$(CC) $(CPPFLAGS) $< -L$(HAMA_HOME)/c++/install/lib -lhamapipes -lhadooputils -lcrypto -lpthread -g -O2 -o $@
clean:
rm -f PiCalculation
|
The corresponding job configuration PiCalculation_job.xml
looks like that:
No Format |
---|
<?xml version="1.0"?>
<configuration>
<property>
// Set the binary path on DFS
<name>hama.pipes.executable</name>
<value>bin/PiCalculation</value>
</property>
<property>
<name>hama.pipes.java.recordreader</name>
<value>true</value>
</property>
<property>
<name>hama.pipes.java.recordwriter</name>
<value>true</value>
</property>
<property>
<name>bsp.input.format.class</name>
<value>org.apache.hama.bsp.NullInputFormat</value>
</property>
<property>
<name>bsp.output.format.class</name>
<value>org.apache.hama.bsp.TextOutputFormat</value>
</property>
<property>
<name>hama.pipes.logging</name>
<value>false</value>
</property>
<property>
<name>bsp.peers.num</name>
<value>10</value>
</property>
</configuration>
|
Finally the PiCalculation example can be submitted with these commands:
No Format |
---|
# delete output dir
hadoop dfs -rmr output/PiCalculation
# copy piCalculation binary to HDFS
hadoop dfs -rmr bin/PiCalculation
hadoop dfs -put PiCalculation bin/PiCalculation
# submit job
hama pipes -conf PiCalculation_job.xml -output output/PiCalculation
|
...