...

cleanup() is guaranteed to run after the computation or in case of failure. (In 0.4.0 this is not actually the case; we expect it to be fixed in 0.5.0.)

You can simply override the functions you need from the BSP class.

...

Since Hama 0.4.0 we provide an input and output system for BSP jobs. TODO: Document the key/value model here, including what happens when no input is configured.

We chose the key/value model from Hadoop, since we want to provide a coherent API to widely used products like Hadoop MapReduce (SequenceFiles) and HBase (column storage).

Input

Configuring Input

When setting up a BSPJob, you can provide an InputFormat and a Path that points to the input.

...

You can implement your own InputFormat. It is similar to Hadoop MapReduce's input formats, so you can use existing literature to get into it.

Output

Configuring Output

Like the input, you can configure the output while setting up your BSPJob.

    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(DoubleWritable.class);

    job.setOutputFormat(TextOutputFormat.class);

    FileOutputFormat.setOutputPath(job, TMP_OUTPUT);

As you can see, there are three major sections.

The first section is about setting the classes for output key and output value.

The second section is about setting the format of your output. In this case it is TextOutputFormat, which writes each key separated from its value by a tab ('\t'). Records (key plus value) are separated from each other by a newline ('\n').

The third and last section is about setting the path where your output should go. You can use the static method of your chosen OutputFormat as well as the convenience method in BSPJob:

 job.setOutputPath(new Path("/tmp/out"));

If you don't configure an output, no output folder or collector will be allocated.

Using Output

From your BSP, you can output like this:

    @Override
    public void bsp(
        BSPPeer<NullWritable, NullWritable, Text, DoubleWritable, DoubleWritable> peer)
        throws IOException, SyncException, InterruptedException {
      peer.write(new Text("Estimated value of PI is"), new DoubleWritable(3.14));
    }

Note that you can always write output, even from the setup() or cleanup() methods.
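
To make the TextOutputFormat layout described earlier concrete, here is a minimal plain-Java sketch of the line that the write call in the example would be expected to produce: key, tab, value, newline. The class TextRecordSketch and its formatRecord helper are hypothetical names used only for illustration; they are not part of Hama, and the real format class may differ in details.

```java
// Hypothetical stand-in for illustration only; this class is not part of Hama.
final class TextRecordSketch {

  // Joins key and value with a tab and ends the record with a newline,
  // mirroring the key/value text layout described for TextOutputFormat.
  static String formatRecord(Object key, Object value) {
    return key + "\t" + value + "\n";
  }

  public static void main(String[] args) {
    // Same key and value as the peer.write(...) call in the example.
    System.out.print(formatRecord("Estimated value of PI is", 3.14));
  }
}
```

Because each record occupies one line and the separator is a single tab, such output is easy to read back with ordinary line-oriented tools.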

Custom Outputformat

You can implement your own OutputFormat. It is similar to Hadoop MapReduce's output formats, so you can use existing literature to get into it.

Implementation notes

Internal implementation details

...

The BSP job finishes only when no process has any entries left in its local or outgoing queues and all processes are done, or when the job is killed by the user.

Counters

Just like in Hadoop MapReduce you can use Counters.

Counters are basically enums that you can only increment. You can use them to track meaningful metrics in your code, e.g. how often a loop has been executed.

From your BSP code you can use counters like this:

    // enum definition
    enum LoopCounter{
      LOOPS
    }

    @Override
    public void bsp(
        BSPPeer<NullWritable, NullWritable, Text, DoubleWritable, DoubleWritable> peer)
        throws IOException, SyncException, InterruptedException {
      for (int i = 0; i < iterations; i++) {
        // details omitted
        peer.getCounter(LoopCounter.LOOPS).increment(1L);
      }
      // rest omitted
    }

In 0.4.0, counters cannot be used for flow control, since they are not synchronized during the sync phase. See HAMA-515 for details.
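
The idea of "an enum that you can only increment" can be sketched in plain Java, independently of Hama. The class CounterSketch and its methods are hypothetical and only illustrate the concept; they are not Hama's counter implementation. Rejecting negative amounts is an assumption of this sketch.

```java
import java.util.EnumMap;
import java.util.Map;

// Hypothetical stand-in for illustration only; not Hama's counter implementation.
final class CounterSketch {

  enum LoopCounter { LOOPS }

  // One running total per enum constant.
  private final Map<LoopCounter, Long> totals = new EnumMap<>(LoopCounter.class);

  // Adds to the running total. Rejecting negative amounts is an assumption
  // of this sketch, made to reflect "you can only increment".
  void increment(LoopCounter counter, long amount) {
    if (amount < 0) {
      throw new IllegalArgumentException("counters can only be incremented");
    }
    totals.merge(counter, amount, Long::sum);
  }

  // Returns the current total, or 0 if the counter was never incremented.
  long get(LoopCounter counter) {
    return totals.getOrDefault(counter, 0L);
  }

  public static void main(String[] args) {
    CounterSketch counters = new CounterSketch();
    for (int i = 0; i < 5; i++) {
      counters.increment(LoopCounter.LOOPS, 1L);
    }
    System.out.println(counters.get(LoopCounter.LOOPS));
  }
}
```

Note that this sketch keeps its totals in one local object. It says nothing about how values from different peers are merged, which is exactly the part that is not synchronized in 0.4.0.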

Setup and Cleanup

Combiners

...

  public static class SumCombiner extends Combiner {

    @Override
    public BSPMessageBundle combine(Iterable<BSPMessage> messages) {
      BSPMessageBundle bundle = new BSPMessageBundle();
      int sum = 0;

      for (BSPMessage message : messages) {
        sum += ((IntegerMessage) message).getData();
      }

      bundle.addMessage(new IntegerMessage("Sum", sum));
      return bundle;
    }

  }
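
The effect of a sum combiner like the one above can be shown without any Hama classes: several queued integer payloads collapse into one. The following is a minimal plain-Java sketch under that assumption. SumCombinerSketch and its combine method are hypothetical names, and plain Integer values stand in for Hama's message types.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Hypothetical stand-in for illustration only; it does not use Hama's
// Combiner or message classes.
final class SumCombinerSketch {

  // Collapses all queued integer payloads into a single summed payload.
  static List<Integer> combine(Iterable<Integer> messages) {
    int sum = 0;
    for (int value : messages) {
      sum += value;
    }
    return Collections.singletonList(sum);
  }

  public static void main(String[] args) {
    // Four outgoing payloads become one combined payload.
    List<Integer> outgoing = Arrays.asList(1, 2, 3, 4);
    System.out.println(combine(outgoing));
  }
}
```

This only works because addition is associative and commutative, so the receiver gets the same total regardless of how the messages were grouped before sending.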

Job Configuration and Submission

...