...
cleanup()
is guaranteed to run after the computation, or in case of failure. (In 0.4.0 it actually is not; we expect this to be fixed in 0.5.0.)
You can simply override the functions you need from the BSP class.
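A minimal sketch of overriding these hooks (the generic type parameters and exception lists are assumptions based on the bsp() signature shown later on this page; check them against the BSP class of your Hama version):

```java
public class MyBSP extends
    BSP<NullWritable, NullWritable, Text, DoubleWritable, DoubleWritable> {

  @Override
  public void setup(
      BSPPeer<NullWritable, NullWritable, Text, DoubleWritable, DoubleWritable> peer)
      throws IOException, SyncException, InterruptedException {
    // runs once before the computation, e.g. to initialize fields
  }

  @Override
  public void bsp(
      BSPPeer<NullWritable, NullWritable, Text, DoubleWritable, DoubleWritable> peer)
      throws IOException, SyncException, InterruptedException {
    // the actual computation with supersteps and peer.sync()
  }

  @Override
  public void cleanup(
      BSPPeer<NullWritable, NullWritable, Text, DoubleWritable, DoubleWritable> peer)
      throws IOException {
    // runs once after the computation (see the 0.4.0 caveat above)
  }
}
```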
...
Since Hama 0.4.0 we provide an input and output system for BSP jobs.
We chose the key/value model from Hadoop because we want to provide a coherent API with widely used products like Hadoop MapReduce (SequenceFiles) and HBase (column storage).
Input
Configuring Input
When setting up a BSPJob, you can provide an InputFormat and a Path where to find the input.
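A minimal configuration sketch (the constructor and setter names follow the 0.4.x BSPJob convention; treat SequenceFileInputFormat and the exact method names as assumptions to verify against your version):

```java
BSPJob job = new BSPJob(new HamaConfiguration(), MyBSP.class);
// tell the job how to read the input records
job.setInputFormat(SequenceFileInputFormat.class);
// tell the job where to find the input
job.setInputPath(new Path("/tmp/in"));
```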
...
You can implement your own InputFormat as well. It is similar to Hadoop MapReduce's input formats, so you can use existing literature to get started.
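Once input is configured, the records can be consumed from within bsp(). This sketch assumes a readNext(key, value) style reader on BSPPeer, which should be checked against the BSPPeer interface of your version:

```java
@Override
public void bsp(
    BSPPeer<Text, DoubleWritable, Text, DoubleWritable, DoubleWritable> peer)
    throws IOException, SyncException, InterruptedException {
  Text key = new Text();
  DoubleWritable value = new DoubleWritable();
  // read the records of the assigned input split one by one
  while (peer.readNext(key, value)) {
    // process key/value here
  }
}
```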
Output
Configuring Output
Like the input, you can configure the output while setting up your BSPJob.
No Format |
---|
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(DoubleWritable.class);
job.setOutputFormat(TextOutputFormat.class);
FileOutputFormat.setOutputPath(job, TMP_OUTPUT);
|
As you can see, there are three major sections.
The first section sets the classes for the output key and the output value.
The second section sets the format of your output. In this case it is TextOutputFormat, which separates the key from the value with a tab ('\t'). Each record (key+value) is separated by a newline ('\n').
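For illustration only (plain Java, not Hama API), this standalone snippet mimics the record layout that TextOutputFormat writes:

```java
public class TextOutputFormatLayout {

  // Mimics TextOutputFormat's layout: key TAB value, one record per line.
  public static String record(String key, String value) {
    return key + "\t" + value + "\n";
  }

  public static void main(String[] args) {
    System.out.print(record("Estimated value of PI is", "3.14"));
  }
}
```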
The third and last section sets the path where your output should go. You can use the static method in your chosen OutputFormat as well as the convenience method in BSPJob:
No Format |
---|
job.setOutputPath(new Path("/tmp/out"));
|
If you don't configure any output, no output folder or collector will be allocated.
Using Output
From your BSP, you can output like this:
No Format |
---|
@Override
public void bsp(
BSPPeer<NullWritable, NullWritable, Text, DoubleWritable, DoubleWritable> peer)
throws IOException, SyncException, InterruptedException {
peer.write(new Text("Estimated value of PI is"), new DoubleWritable(3.14));
}
|
Note that you can always write output, even from the setup or cleanup methods!
Custom Outputformat
You can implement your own OutputFormat. It is similar to Hadoop MapReduce's output formats, so you can use existing literature to get started.
Implementation notes
Internal implementation details
...
The BSP job is finished only when all processes have no more entries in their local and outgoing queues and all processes are done, or when the job is killed by the user.
Counters
Just like in Hadoop MapReduce you can use Counters.
Counters are basically enums that you can only increment. You can use them to track meaningful metrics in your code, e.g. how often a loop has been executed.
From your BSP code you can use counters like this:
No Format |
---|
// enum definition
enum LoopCounter {
  LOOPS
}

@Override
public void bsp(
    BSPPeer<NullWritable, NullWritable, Text, DoubleWritable, DoubleWritable> peer)
    throws IOException, SyncException, InterruptedException {
  for (int i = 0; i < iterations; i++) {
    // details omitted
    peer.getCounter(LoopCounter.LOOPS).increment(1L);
  }
  // rest omitted
}
|
In 0.4.0, counters are not usable for flow control, since they are not synced during the sync phase. Watch HAMA-515 for details.
Combiners
...
No Format |
---|
public static class SumCombiner extends Combiner {

  @Override
  public BSPMessageBundle combine(Iterable<BSPMessage> messages) {
    BSPMessageBundle bundle = new BSPMessageBundle();
    int sum = 0;

    Iterator<BSPMessage> it = messages.iterator();
    while (it.hasNext()) {
      sum += ((IntegerMessage) it.next()).getData();
    }

    bundle.addMessage(new IntegerMessage("Sum", sum));
    return bundle;
  }
}
|
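To activate a combiner, register it when setting up the job. The setCombinerClass setter is assumed from the 0.4.x BSPJob API by analogy with Hadoop MapReduce; verify the name against your version:

```java
// register the combiner so outgoing messages are combined before sending
job.setCombinerClass(SumCombiner.class);
```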
Job Configuration and Submission
...