
Hama Pipes is the Hama counterpart of Hadoop Pipes and makes it possible to write Hama applications in C++.

Installation

The native compilation is integrated into the Maven build process.

mvn install

Interface

Hama Pipes provides the following methods for C++ integration (similar to the functions of the BSP communication model):

Function

Description

void sendMessage(const string& peerName, const M& msg)

Sends a message to another peer. Messages sent by this method are not guaranteed to be received in the order in which they were sent.

M getCurrentMessage()

Returns the next message from the peer's received-message queue (a FIFO).

int getNumCurrentMessages()

Returns the number of messages in the peer's received messages queue.

void sync()

Starts the barrier synchronization and sends all the messages in the outgoing message queues to the corresponding remote peers.

long getSuperstepCount()

Returns the count of the current superstep.

string& getPeerName()

Returns the name of this peer in the format "hostname:port".

string& getPeerName(int index)

Returns the name of the peer at position index in the array of peers sorted by name.

int getPeerIndex()

Returns the index of this peer in the array of peers sorted by name.

vector<string> getAllPeerNames()

Returns the names of all the peers executing tasks from the same job (including this peer).

int getNumPeers()

Returns the number of peers.

void clear()

Clears all queue entries.

void write(const K2& key, const V2& value)

Writes a key/value pair to the output collector.

bool readNext(K1& key, V1& value)

Deserializes the next input key/value pair into the given objects.

void reopenInput()

Closes the input and reopens it immediately, so that the file pointer is at the beginning again.
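The interplay of sendMessage, sync, getNumCurrentMessages and getCurrentMessage can be illustrated with a small single-process model. This is only a sketch: MockBus, MockContext, syncAll and sumAtMaster are hypothetical names invented for illustration and are not part of Hama Pipes. The model fixes the message type to int and delivers messages in send order, which the real runtime does not guarantee.

```cpp
#include <cassert>
#include <cstddef>
#include <map>
#include <queue>
#include <string>
#include <vector>

// Hypothetical shared state that stands in for the BSP runtime.
struct MockBus {
  std::map<std::string, std::vector<int> > outgoing;  // buffered until the barrier
  std::map<std::string, std::queue<int> > inbox;      // delivered, readable messages
  long superstep;
  MockBus() : superstep(0) {}
};

// Hypothetical per-peer view; the method names follow the table above.
class MockContext {
public:
  MockContext(MockBus* bus, const std::string& name) : bus_(bus), name_(name) {}

  const std::string& getPeerName() const { return name_; }

  // Only buffers the message; the receiver cannot see it yet.
  void sendMessage(const std::string& peerName, int msg) {
    bus_->outgoing[peerName].push_back(msg);
  }

  int getNumCurrentMessages() {
    return static_cast<int>(bus_->inbox[name_].size());
  }

  // Pops the oldest delivered message (FIFO).
  int getCurrentMessage() {
    std::queue<int>& q = bus_->inbox[name_];
    assert(!q.empty());
    int msg = q.front();
    q.pop();
    return msg;
  }

private:
  MockBus* bus_;
  std::string name_;
};

// Models all peers reaching the barrier: buffered messages are delivered
// and the superstep counter advances.
void syncAll(MockBus& bus) {
  std::map<std::string, std::vector<int> >::iterator it;
  for (it = bus.outgoing.begin(); it != bus.outgoing.end(); ++it) {
    for (std::size_t i = 0; i < it->second.size(); ++i) {
      bus.inbox[it->first].push(it->second[i]);
    }
    it->second.clear();
  }
  ++bus.superstep;
}

// Every peer sends one value to the master; the master sums them after sync.
int sumAtMaster(const std::vector<int>& values) {
  assert(!values.empty());
  MockBus bus;
  std::vector<MockContext> peers;
  for (std::size_t i = 0; i < values.size(); ++i) {
    peers.push_back(MockContext(&bus, "peer" + std::to_string(i)));
  }
  MockContext& master = peers[peers.size() / 2];  // middle peer acts as master

  for (std::size_t i = 0; i < peers.size(); ++i) {
    peers[i].sendMessage(master.getPeerName(), values[i]);
  }
  assert(master.getNumCurrentMessages() == 0);  // nothing arrives before the barrier

  syncAll(bus);

  int total = 0;
  int count = master.getNumCurrentMessages();
  for (int i = 0; i < count; ++i) {
    total += master.getCurrentMessage();
  }
  return total;
}
```

Calling sumAtMaster with the values 1, 2 and 3 returns 6. The point of the model is that messages become readable only after the barrier, which is why a real task has to call sync() between sending and receiving.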

The following additional methods support access to SequenceFiles in C++:

Function

Description

int sequenceFileOpen(const string& path, const string& option, const string& keyType, const string& valueType)

Opens a SequenceFile with option "r" (read) or "w" (write) and the given key and value types, and returns the corresponding fileID.

bool sequenceFileReadNext(int fileID, K& key, V& value)

Reads the next key/value pair from the SequenceFile.

bool sequenceFileAppend(int fileID, const K& key, const V& value)

Appends the next key/value pair to the SequenceFile.

bool sequenceFileClose(int fileID)

Closes a SequenceFile.
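The fileID returned by sequenceFileOpen is the handle that the other three functions expect. The following sketch shows the usual open, read/append, close pattern against an in-memory stand-in. MockSequenceFiles and doubleValues are hypothetical names, the stand-in is fixed to string keys and int values, and passing Java Writable class names as keyType and valueType is an assumption, not something this page confirms.

```cpp
#include <cassert>
#include <cstddef>
#include <map>
#include <string>
#include <utility>
#include <vector>

// Hypothetical in-memory stand-in for the four functions listed above.
class MockSequenceFiles {
public:
  int sequenceFileOpen(const std::string& path, const std::string& option,
                       const std::string& /*keyType*/,
                       const std::string& /*valueType*/) {
    assert(option == "r" || option == "w");
    if (option == "w") files_[path].clear();  // assumption: "w" starts empty
    Handle h;
    h.path = path;
    h.writable = (option == "w");
    h.open = true;
    h.pos = 0;
    handles_.push_back(h);
    return static_cast<int>(handles_.size()) - 1;  // the fileID
  }

  bool sequenceFileReadNext(int fileID, std::string& key, int& value) {
    Handle& h = handles_.at(fileID);
    const std::vector<Record>& records = files_[h.path];
    if (!h.open || h.writable || h.pos >= records.size()) return false;
    key = records[h.pos].first;
    value = records[h.pos].second;
    ++h.pos;
    return true;
  }

  bool sequenceFileAppend(int fileID, const std::string& key, const int& value) {
    Handle& h = handles_.at(fileID);
    if (!h.open || !h.writable) return false;
    files_[h.path].push_back(Record(key, value));
    return true;
  }

  bool sequenceFileClose(int fileID) {
    Handle& h = handles_.at(fileID);
    if (!h.open) return false;
    h.open = false;
    return true;
  }

private:
  typedef std::pair<std::string, int> Record;
  struct Handle { std::string path; bool writable; bool open; std::size_t pos; };
  std::map<std::string, std::vector<Record> > files_;
  std::vector<Handle> handles_;
};

// Copies every record from src to dst with the value doubled and returns
// the number of records written. Files can be any type that offers the
// four sequenceFile* methods with the shapes used here.
template <class Files>
int doubleValues(Files& fs, const std::string& src, const std::string& dst) {
  // The type-name strings below are assumed values for illustration.
  int in = fs.sequenceFileOpen(src, "r", "org.apache.hadoop.io.Text",
                               "org.apache.hadoop.io.IntWritable");
  int out = fs.sequenceFileOpen(dst, "w", "org.apache.hadoop.io.Text",
                                "org.apache.hadoop.io.IntWritable");
  std::string key;
  int value = 0;
  int written = 0;
  while (fs.sequenceFileReadNext(in, key, value)) {
    if (!fs.sequenceFileAppend(out, key, 2 * value)) break;
    ++written;
  }
  fs.sequenceFileClose(in);
  fs.sequenceFileClose(out);
  return written;
}
```

Because doubleValues is a template, the same loop could in principle be pointed at any object that offers these four methods. Whether the real context matches these exact shapes should be checked against the Hama Pipes headers.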

Compilation

The following command can be used to compile a Hama Pipes application:

g++ -Ic++/src/main/native/utils/api \
    -Ic++/src/main/native/pipes/api \
    -g -Wall -O2 \
    PROGRAM.cc \
    -o PROGRAM \
    -Lc++/target/native \
    -lhadooputils -lpthread

Note that the paths must be adjusted if you are not working in the Hama source folder.

Examples

Hama Pipes includes several examples. They are located in c++/src/main/native/examples and explained in c++/src/main/native/examples/README.txt.

PiEstimator Example

This is a Hama Pipes C++ implementation of PiEstimator.

#include "hama/Pipes.hh"
#include "hama/TemplateFactory.hh"
#include "hadoop/StringUtils.hh"

#include <time.h>
#include <math.h>
#include <stdlib.h>
#include <string>
#include <iostream>

using std::string;
using std::cout;

using HamaPipes::BSP;
using HamaPipes::BSPContext;
using namespace HadoopUtils;

class PiEstimatorBSP: public BSP<string,string,string,double,int> {
private:
  string master_task_;
  long iterations_; // iterations_per_bsp_task
public:
  PiEstimatorBSP(BSPContext<string,string,string,double,int>& context) {
    iterations_ = 1000000L;
  }
  
  inline double closed_interval_rand(double x0, double x1) {
    return x0 + (x1 - x0) * rand() / ((double) RAND_MAX);
  }
  
  void setup(BSPContext<string,string,string,double,int>& context) {
    // Choose one as a master
    master_task_ = context.getPeerName(context.getNumPeers() / 2);
  }
  
  void bsp(BSPContext<string,string,string,double,int>& context) {
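    /* initialize random seed; the peer index is mixed in so that peers
       started within the same second do not draw identical samples */
    srand(time(NULL) + context.getPeerIndex());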
    
    int in = 0;
    for (long i = 0; i < iterations_; i++) {
      double x = 2.0 * closed_interval_rand(0, 1) - 1.0;
      double y = 2.0 * closed_interval_rand(0, 1) - 1.0;
      if (sqrt(x * x + y * y) < 1.0) {
        in++;
      }
    }
    
    context.sendMessage(master_task_, in);
    context.sync();
  }
  
  void cleanup(BSPContext<string,string,string,double,int>& context) {
    if (context.getPeerName().compare(master_task_)==0) {
      
      long total_hits = 0;
      int msg_count = context.getNumCurrentMessages();
      for (int i=0; i < msg_count; i++) {
        total_hits += context.getCurrentMessage();
      }
      
      double pi = 4.0 * total_hits / (msg_count * iterations_);
      context.write("Estimated value of PI", pi);
    }
  }
};

int main(int argc, char *argv[]) {
  return HamaPipes::runTask<string,string,string,double,int>(HamaPipes::TemplateFactory<PiEstimatorBSP,string,string,string,double,int>());
}
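The estimate computed in cleanup() is 4 * total_hits / (msg_count * iterations_), because the fraction of uniformly drawn points that land inside the unit circle approaches pi/4. The arithmetic can be checked without a Hama cluster using the single-process sketch below. It is not the example's code: countHits and estimatePi are hypothetical helper names, the C++11 <random> generator replaces rand(), the interval is half-open rather than closed, and each simulated peer gets its own fixed seed.

```cpp
#include <cassert>
#include <random>

// Counts how many of `iterations` points drawn uniformly from
// [-1, 1) x [-1, 1) fall inside the unit circle; mirrors the loop in bsp().
long countHits(long iterations, unsigned seed) {
  std::mt19937 gen(seed);
  std::uniform_real_distribution<double> dist(-1.0, 1.0);
  long in = 0;
  for (long i = 0; i < iterations; ++i) {
    double x = dist(gen);
    double y = dist(gen);
    if (x * x + y * y < 1.0) {
      ++in;
    }
  }
  return in;
}

// Mirrors cleanup(): sums the per-peer hit counts and scales by 4.
double estimatePi(int num_peers, long iterations_per_peer) {
  assert(num_peers > 0 && iterations_per_peer > 0);
  long total_hits = 0;
  for (int peer = 0; peer < num_peers; ++peer) {
    // A distinct, arbitrary seed per simulated peer keeps the samples different.
    total_hits += countHits(iterations_per_peer, 1234u + peer);
  }
  return 4.0 * total_hits /
         (static_cast<double>(num_peers) * iterations_per_peer);
}
```

A call such as estimatePi(3, 1000000) corresponds to three peers with the example's default iteration count. The result varies with the seeds but should be close to 3.14.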

The corresponding job configuration for PiEstimator, piestimator.xml, looks like this:

<configuration>
  <property>
    <name>hama.pipes.executable</name>
    <value>hdfs:/examples/bin/piestimator</value>
  </property>
  <property>
    <name>hama.pipes.java.recordreader</name>
    <value>true</value>
  </property>
  <property>
    <name>hama.pipes.java.recordwriter</name>
    <value>true</value>
  </property>
  <property>
    <name>bsp.input.format.class</name>
    <value>org.apache.hama.bsp.NullInputFormat</value>
  </property>
  <property>
    <name>bsp.output.format.class</name>
    <value>org.apache.hama.bsp.SequenceFileOutputFormat</value>
  </property>
  <property>
    <name>bsp.output.key.class</name>
    <value>org.apache.hadoop.io.Text</value>
  </property>
  <property>
    <name>bsp.output.value.class</name>
    <value>org.apache.hadoop.io.DoubleWritable</value>
  </property>
  <property>
    <name>bsp.message.class</name>
    <value>org.apache.hadoop.io.IntWritable</value>
  </property>
  <property>
    <name>hama.pipes.logging</name>
    <value>false</value>
  </property>
  <property>
    <name>bsp.peers.num</name>
    <value>3</value>
  </property>
</configuration>

Finally, you can run the PiEstimator example by executing the following commands:

# First, copy the piestimator binary to the DFS
hadoop fs -put c++/target/native/examples/piestimator \
 /examples/bin/piestimator

# Run piestimator example
hama pipes \
 -conf c++/src/main/native/examples/conf/piestimator.xml \
 -output /examples/output/piestimator

# View output data
hama seqdumper -file /examples/output/piestimator/part-00001

# You should see
# Input Path: /examples/output/piestimator/part-00001
# Key class: class org.apache.hadoop.io.Text
# Value Class: class org.apache.hadoop.io.DoubleWritable
# Key: Estimated value of PI: Value: 3.139116
# Count: 1

# Delete output folder
hadoop fs -rmr /examples/output/piestimator

HamaPipes (last edited 2013-12-11 17:17:59 by MartinIllecker)