Hbase, MapReduce and the CLASSPATH


DEPRECATED!!! See instead the new mapreduce package description, which supersedes the old mapred package description.


MapReduce jobs deployed to a mapreduce cluster do not by default have access to the hbase configuration under $HBASE_CONF_DIR nor to hbase classes.

You could add hbase-site.xml to $HADOOP_HOME/conf, add the hbase jar to $HADOOP_HOME/lib, and copy these changes across your cluster, but the cleanest way to add the hbase configuration and classes to the cluster CLASSPATH is to uncomment HADOOP_CLASSPATH in $HADOOP_HOME/conf/hadoop-env.sh and add the path to the hbase jar and to the $HBASE_CONF_DIR directory. Then copy the amended configuration around the cluster. You will probably need to restart the mapreduce cluster for it to notice the new configuration (you may not have to).

Below is an example of how you would amend $HADOOP_HOME/conf/hadoop-env.sh to add the hbase jar and conf directory. This example assumes you are using the hbase-0.2.0 release; commented-out export lines are included for other releases/builds:

# Extra Java CLASSPATH elements.  Optional.
# export HADOOP_CLASSPATH=
# for hbase-0.2.0 release
export HADOOP_CLASSPATH=$HBASE_HOME/build/hbase-0.2.0.jar:$HBASE_HOME/conf
# for 0.16.0 release
#export HADOOP_CLASSPATH=$HBASE_HOME/hadoop-0.16.0-hbase.jar:$HBASE_HOME/conf
# for 0.16.0 developer build
#export HADOOP_CLASSPATH=$HBASE_HOME/build/test:$HBASE_HOME/build/hadoop-0.16.0-hbase.jar:$HBASE_HOME/conf

Expand $HBASE_HOME in accordance with your local environment. To use a developer-build HADOOP_CLASSPATH, you first need to run "ant" or "ant build" in $HBASE_HOME; this creates the jar files in the $HBASE_HOME/build directory. To use the PerformanceEvaluation class from the hbase test classes, you must use a developer build. Then, this is how you would run the PerformanceEvaluation MR job with 4 clients:

 > $HADOOP_HOME/bin/hadoop org.apache.hadoop.hbase.PerformanceEvaluation sequentialWrite 4

The PerformanceEvaluation class will be found on the CLASSPATH because you added $HBASE_HOME/build/test to HADOOP_CLASSPATH.

NOTE: While it used to be possible to bundle the hbase.jar inside the job jar you submit to hadoop, as of 0.2.0RC2 this is no longer the case. See HBASE-797.

Hbase as MapReduce job data source and sink

Hbase can be used as a data source (TableInputFormat) and as a data sink (TableOutputFormat) for mapreduce jobs. When writing mapreduce jobs that read or write hbase, you'll probably want to subclass TableMap and/or TableReduce. See the do-nothing pass-through classes IdentityTableMap and IdentityTableReduce for basic usage. For a more involved example, see BuildTableIndex from the same package, or review the org.apache.hadoop.hbase.mapred.TestTableMapReduce unit test.

When running mapreduce jobs that have hbase as a source or sink, you'll need to specify the source/sink table and column names in your job configuration.
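
For example, a job that reads from one table and writes to another might be wired up roughly as follows. This is only a sketch: the table names, the "colfam1:" column specification and the MyTableJob/MyTableMap/MyTableReduce classes are placeholders for your own TableMap/TableReduce subclasses (here assumed to use Text keys and values, as in the examples further down this page).

// Sketch only: "sourcetable", "targettable", "colfam1:" and the MyTable*
// classes are placeholders.
JobConf jobConf = new JobConf(new HBaseConfiguration(), MyTableJob.class);
jobConf.setJobName("mytablejob");
// Source: table name plus a space-delimited list of columns to scan.
TableMap.initJob("sourcetable", "colfam1:", MyTableMap.class,
  Text.class, Text.class, jobConf);
// Sink: table that the TableReduce subclass writes into.
TableReduce.initJob("targettable", MyTableReduce.class, jobConf);
JobClient.runJob(jobConf);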

Reading from hbase, the TableInputFormat asks hbase for the list of regions and makes a map per region. When writing, it may make sense to avoid the reduce step and write back into hbase from inside your map. You'd do this when your job does not need the sort and collation that MR does in its reduce; on insert, hbase sorts anyway, so there is no point double-sorting (and shuffling data around your MR cluster) unless you need to. If you do not need the reduce, you might just have your map emit counts of records processed so the framework can produce its report of records processed when the job is done. See the example code below. If running the reduce step does make sense in your case, it's better to use lots of reducers so the load is spread across the hbase cluster.
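
In terms of job configuration, that choice might look something like the following sketch, given the JobConf for your job (here called jobConf); the reducer count shown is only an illustrative guess and should be tuned to your cluster.

// Map-only variant: the map commits BatchUpdates itself, so skip
// sort/shuffle/reduce entirely.
jobConf.setNumReduceTasks(0);
jobConf.setOutputFormat(NullOutputFormat.class);

// Or, if you do need the reduce, spread the write load across the hbase
// cluster with many reducers (the multiplier here is just an example).
// jobConf.setNumReduceTasks(numberOfRegionServers * 2);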

Example to bulk import/load a text file into an HTable

Here's a sample program from Allen Day that takes an HDFS text file path and an HBase table name as inputs, and loads the contents of the text file to the table.

package com.spicylogic.hbase;
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.io.BatchUpdate;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.lib.NullOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class BulkImport implements Tool {
  private static final String NAME = "BulkImport";
  private Configuration conf;

  public static class InnerMap extends MapReduceBase implements Mapper<LongWritable, Text, Text, Text> {
    private HTable table;
    private HBaseConfiguration HBconf;

    public void map(LongWritable key, Text value, OutputCollector<Text, Text> output, Reporter reporter) throws IOException {
      if ( table == null )
        throw new IOException("table is null");
      
      String [] splits = value.toString().split("\t");
      if ( splits.length != 4 )
        return;

      String rowID     = splits[0];
      int timestamp    = Integer.parseInt( splits[1] );
      String colID     = splits[2];
      String cellValue = splits[3];

      reporter.setStatus("Map emitting cell for row='" + rowID + "', column='" + colID + "', time='" + timestamp + "'");

      BatchUpdate bu = new BatchUpdate( rowID );
      if ( timestamp > 0 )
        bu.setTimestamp( timestamp );

      bu.put(colID, cellValue.getBytes());      
      table.commit( bu );      
    }
    public void configure(JobConf job) {
      HBconf = new HBaseConfiguration();
      try {
        table = new HTable( HBconf, job.get("input.table") );
      } catch (IOException e) {
        // Could not open the table; map() above will fail fast with an IOException.
        e.printStackTrace();
      }
    }
  }
  
  
  public JobConf createSubmittableJob(String[] args) {
    JobConf c = new JobConf(getConf(), BulkImport.class);
    c.setJobName(NAME);
    c.setInputPath(new Path(args[0]));

    c.set("input.table", args[1]);
    c.setMapperClass(InnerMap.class);
    c.setNumReduceTasks(0);
    c.setOutputFormat(NullOutputFormat.class);
    return c;
  }
  
  static int printUsage() {
    System.err.println("Usage: " + NAME + " <input> <table_name>");
    System.err.println("\twhere <input> is a tab-delimited text file with 4 columns.");
    System.err.println("\t\tcolumn 1 = row ID");
    System.err.println("\t\tcolumn 2 = timestamp (use a negative value for current time)");
    System.err.println("\t\tcolumn 3 = column ID");
    System.err.println("\t\tcolumn 4 = cell value");
    return -1;
  } 

  public int run(@SuppressWarnings("unused") String[] args) throws Exception {
    // Make sure there are exactly 2 parameters left.
    if (args.length != 2) {
      return printUsage();
    }
    JobClient.runJob(createSubmittableJob(args));
    return 0;
  }

  public Configuration getConf() {
    return this.conf;
  } 

  public void setConf(final Configuration c) {
    this.conf = c;
  }

  public static void main(String[] args) throws Exception {
    int errCode = ToolRunner.run(new Configuration(), new BulkImport(), args);
    System.exit(errCode);
  }
}
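
To launch the job above (assuming the class is packaged into a job jar and hbase is on HADOOP_CLASSPATH as described at the top of this page; the jar name, input path and table name here are only examples), you might run something like:

 > $HADOOP_HOME/bin/hadoop jar BulkImport.jar com.spicylogic.hbase.BulkImport /user/me/input.txt myTable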

Example to map rows/column families between two HTables

Here is another sample program from Allen Day that iterates over all rows of one table for the specified column families and inserts those rows/columns into a second table.

package com.spicylogic.hbase;

import java.io.IOException;
import java.util.Map;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.io.BatchUpdate;
import org.apache.hadoop.hbase.io.Cell;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.io.RowResult;
import org.apache.hadoop.hbase.mapred.TableMap;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.lib.IdentityReducer;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class BulkCopy extends TableMap<Text, Text> implements Tool {
  static final String NAME = "bulkcopy";  
  private Configuration conf;
  
  public void map(ImmutableBytesWritable row, RowResult value, OutputCollector<Text, Text> output, Reporter reporter) throws IOException {
    HTable table = new HTable(new HBaseConfiguration(), conf.get("output.table"));
    if ( table == null ) {
      throw new IOException("output table is null");
    }

    BatchUpdate bu = new BatchUpdate( row.get() );

    // Copy every non-empty cell from the source row into the update.
    for (Map.Entry<byte [], Cell> e: value.entrySet()) {
      Cell cell = e.getValue();
      if (cell != null && cell.getValue().length > 0) {
        bu.put(e.getKey(), cell.getValue());
      }
    }
    table.commit( bu );
  }

  public JobConf createSubmittableJob(String[] args) throws IOException {
    JobConf c = new JobConf(getConf(), BulkCopy.class);
    //table = new HTable(new HBaseConfiguration(), args[2]);
    c.set("output.table", args[2]);
    c.setJobName(NAME);
    // Columns are space delimited
    StringBuilder sb = new StringBuilder();
    final int columnoffset = 3;
    for (int i = columnoffset; i < args.length; i++) {
      if (i > columnoffset) {
        sb.append(" ");
      }
      sb.append(args[i]);
    }
    // Second argument is the table name.
    TableMap.initJob(args[1], sb.toString(), this.getClass(),
    Text.class, Text.class, c);
    c.setReducerClass(IdentityReducer.class);
    // First arg is the output directory.
    c.setOutputPath(new Path(args[0]));
    return c;
  }
  
  static int printUsage() {
    System.out.println(NAME +" <outputdir> <input tablename> <output tablename> <column1> [<column2>...]");
    return -1;
  }
  
  public int run(final String[] args) throws Exception {
    // Make sure there are at least 3 parameters
    if (args.length < 3) {
      System.err.println("ERROR: Wrong number of parameters: " + args.length);
      return printUsage();
    }
    JobClient.runJob(createSubmittableJob(args));
    return 0;
  }

  public Configuration getConf() {
    return this.conf;
  }

  public void setConf(final Configuration c) {
    this.conf = c;
  }

  public static void main(String[] args) throws Exception {
    //String[] aa = {"/tmp/foobar", "M2", "M3", "R:"};
    int errCode = ToolRunner.run(new HBaseConfiguration(), new BulkCopy(), args);
    System.exit(errCode);
  }
}
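
As with the previous example, you might launch this along the following lines (the jar name, output directory, table names and column family are placeholders):

 > $HADOOP_HOME/bin/hadoop jar BulkCopy.jar com.spicylogic.hbase.BulkCopy /tmp/bulkcopy-out sourcetable targettable colfam1: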

Sample of running HBase inserts out of the Map task

Here's sample code from Andrew Purtell that does the HBase insert inside the mapper rather than via TableReduce.

public class MyMap 
  extends TableMap<ImmutableBytesWritable,MapWritable> // or whatever
{
  private HTable table;

  public void configure(JobConf job) {
    super.configure(job);
    try {
      HBaseConfiguration conf = new HBaseConfiguration(job);
      table = new HTable(conf, "mytable");
    } catch (Exception e) {
      // can't do anything about this now
    }
  }

  public void map(ImmutableBytesWritable key, RowResult value,
    OutputCollector<ImmutableBytesWritable,MapWritable> output,
    Reporter reporter) throws IOException
  {
    // now we can report an exception opening the table
    if (table == null)
      throw new IOException("could not open mytable");

    // ...

    // commit the result
    BatchUpdate update = new BatchUpdate();
    // ...
    table.commit(update);
  }
}

This assumes that you do this when setting up your job: JobConf conf = new JobConf(new HBaseConfiguration());

Or maybe something like this:

conf.set("hbase.master", myMaster);}}}

Sample MR+HBase Jobs

A students/classes example by Naama Kraus.

Sample MR Bulk Uploader

Read the class comment below for specification of inputs, prerequisites, etc. In particular, note that the class comment says that this code is for hbase 0.1.x.

package org.apache.hadoop.hbase.mapred;

import java.io.IOException;
import java.util.Iterator;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.MapWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

/**
 * Sample uploader.
 * 
 * This is EXAMPLE code.  You will need to change it to work for your context.
 * 
 * Uses TableReduce to put the data into hbase. Change the InputFormat to suit
 * your data. Use the map to massage the input so it fits hbase.  Currently its
 * just a pass-through map.  In the reduce, you need to output a row and a
 * map of columns to cells.  Change map and reduce to suit your input.
 * 
 * <p>The below is wired up to handle an input whose format is a text file
 * which has a line format as follow:
 * <pre>
 * row columnname columndata
 * </pre>
 * 
 * <p>The table and columnfamily we're to insert into must preexist.
 * 
 * <p> To run, edit your hadoop-env.sh and add hbase classes and conf to your
 * HADOOP_CLASSPATH.  For example:
 * <pre>
 * export HADOOP_CLASSPATH=/Users/stack/Documents/checkouts/hbase/branches/0.1/build/classes:/Users/stack/Documents/checkouts/hbase/branches/0.1/conf
 * </pre>
 * <p>Restart your MR cluster after making the above change (you need to be
 * running in pseudo-distributed mode at a minimum for hadoop to see the
 * above additions to your CLASSPATH).
 * 
 * <p>Start up your hbase cluster.
 * 
 * <p>Next do the following to start the MR job:
 * <pre>
 * ./bin/hadoop org.apache.hadoop.hbase.mapred.SampleUploader /tmp/input.txt TABLE_NAME
 * </pre>
 * 
 * <p>This code was written against hbase 0.1 branch.
 */
public class SampleUploader extends MapReduceBase
implements Mapper<LongWritable, Text, Text, MapWritable>, Tool {
  private static final String NAME = "SampleUploader";
  private Configuration conf;

  public JobConf createSubmittableJob(String[] args) {
    JobConf c = new JobConf(getConf(), SampleUploader.class);
    c.setJobName(NAME);
    c.setInputPath(new Path(args[0]));
    c.setMapperClass(this.getClass());
    c.setMapOutputKeyClass(Text.class);
    c.setMapOutputValueClass(MapWritable.class);
    c.setReducerClass(TableUploader.class);
    TableReduce.initJob(args[1], TableUploader.class, c);
    return c;
  } 

  public void map(LongWritable k, Text v,
    OutputCollector<Text, MapWritable> output, Reporter r)
  throws IOException {
    // Lines are space-delimited; first item is row, next the columnname and
    // then the third the cell value.
    String tmp = v.toString();
    if (tmp.length() == 0) {
      return;
    }
    String [] splits = v.toString().split(" ");
    MapWritable mw = new MapWritable();
    mw.put(new Text(splits[1]),
      new ImmutableBytesWritable(splits[2].getBytes()));
    String row = splits[0];
    r.setStatus("Map emitting " + row + " for record " + k.toString());
    output.collect(new Text(row), mw);
  }
  
  public static class TableUploader
  extends TableReduce<Text, MapWritable> {
    @Override
    public void reduce(Text k, Iterator<MapWritable> v,
      OutputCollector<Text, MapWritable> output, Reporter r)
    throws IOException {
      while (v.hasNext()) {
        r.setStatus("Reducer committing " + k);
        output.collect(k, v.next());
      }
    }
  }
  
  static int printUsage() {
    System.out.println(NAME + " <input> <table_name>");
    return -1;
  } 
    
  public int run(@SuppressWarnings("unused") String[] args) throws Exception {
    // Make sure there are exactly 2 parameters left.
    if (args.length != 2) {
      System.out.println("ERROR: Wrong number of parameters: " +
        args.length + " instead of 2.");
      return printUsage();
    }
    JobClient.runJob(createSubmittableJob(args));
    return 0;
  }
    
  public Configuration getConf() {
    return this.conf;
  } 
  
  public void setConf(final Configuration c) {
    this.conf = c;
  }

  public static void main(String[] args) throws Exception {
    int errCode = ToolRunner.run(new Configuration(), new SampleUploader(),
      args);
    System.exit(errCode);
  }
}
