Attachment 'TableMapReduceExample.java'

Download

   1 package org.apache.zebra.mapred;
   2 
   3 import org.apache.hadoop.conf.Configuration;
   4 import org.apache.hadoop.conf.Configured;
   5 import org.apache.hadoop.fs.Path;
   6 import org.apache.hadoop.io.BytesWritable;
   7 import org.apache.hadoop.io.IntWritable;
   8 import org.apache.hadoop.io.LongWritable;
   9 import org.apache.hadoop.io.Text;
  10 import org.apache.hadoop.mapred.*;
  11 import org.apache.hadoop.util.Tool;
  12 import org.apache.hadoop.util.ToolRunner;
  13 import org.apache.pig.data.Tuple;
  14 import org.apache.zebra.types.ParseException;
  15 import org.apache.zebra.types.Schema;
  16 import org.apache.zebra.types.TypesUtils;
  17 
  18 import java.io.IOException;
  19 import java.util.Iterator;
  20 
  21 /**
   22  * <code>TableMapReduceExample</code> is a map-reduce example for Table Input/Output Format.
  23  * <p/>
  24  * Schema for Table is set to two columns containing Word of type <i>string</i> and Count of type <i>int</i> using <code> BasicTableOutputFormat.setSchema(jobConf, "word:string, count:int"); </code>
  25  * <p/>
  26  * Hint for creation of Column Groups is specified using <code>  BasicTableOutputFormat.setStorageHint(jobConf, "[word];[count]"); </code>. Here we have two column groups.
  27  * <p/>
  28  * Input file should contain rows of word and count, separated by a space. For example:
  29  * <pre>
  30  * this 2
  31  * is 1
  32  * a 5
  33  * test 2
  34  * hello 1
  35  * world 3
  36  * </pre>
  37  * <p/>
   38  * <p> Second job reads output from the first job, which is in Table Format. Here we specify <i>count</i> as the projection column. Table Input Format projects the input row,
   39  * which has both word and count, into a row containing only the count column and hands it to map.
  40  * <p/>
  41  * Reducer sums the counts and produces a sum of counts which should match total number of words in original text.
  42  */
  43 
  44 public class TableMapReduceExample extends Configured implements Tool {
  45 
  46   static class Map extends MapReduceBase implements Mapper<LongWritable, Text, BytesWritable, Tuple> {
  47     private BytesWritable bytesKey;
  48     private Tuple tupleRow;
  49 
  50     /**
  51      * Map method for reading input.
  52      */
  53     @Override
  54     public void map(LongWritable key, Text value,
  55                     OutputCollector<BytesWritable, Tuple> output, Reporter reporter)
  56         throws IOException {
  57 
  58       // value should contain "word count"
  59       String[] wordCount = value.toString().split(" ");
  60       if (wordCount.length != 2) {
  61         // LOG the error
  62         throw new IOException("Value does not contain two fields:" + value);
  63       }
  64 
  65       byte[] word = wordCount[0].getBytes();
  66       bytesKey.set(word, 0, word.length);
  67       tupleRow.set(0, new String(word));
  68       tupleRow.set(1, Integer.parseInt(wordCount[1]));
  69 
  70       output.collect(bytesKey, tupleRow);
  71     }
  72 
  73     /**
  74      * Configuration of the job. Here we create an empty Tuple Row.
  75      */
  76     @Override
  77     public void configure(JobConf job) {
  78       bytesKey = new BytesWritable();
  79       try {
  80         Schema outSchema = BasicTableOutputFormat.getSchema(job);
  81         tupleRow = TypesUtils.createTuple(outSchema);
  82       } catch (IOException e) {
  83         throw new RuntimeException(e);
  84       } catch (ParseException e) {
  85         throw new RuntimeException(e);
  86       }
  87     }
  88 
  89   }
  90 
  91   static class ProjectionMap extends MapReduceBase implements Mapper<BytesWritable, Tuple, Text, IntWritable> {
  92     private final static Text all = new Text("All");
  93 
  94     /**
  95      * Map method which gets count column after projection.
  96      *
  97      * @throws IOException
  98      */
  99     @Override
 100     public void map(BytesWritable key, Tuple value,
 101                     OutputCollector<Text, IntWritable> output, Reporter reporter)
 102         throws IOException {
 103       output.collect(all, new IntWritable((Integer) value.get(0)));
 104     }
 105   }
 106 
 107   public static class ProjectionReduce extends MapReduceBase implements Reducer<Text, IntWritable, Text, IntWritable> {
 108     /**
 109      * Reduce method which implements summation. Acts as both reducer and combiner.
 110      *
 111      * @throws IOException
 112      */
 113     public void reduce(Text key, Iterator<IntWritable> values, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException {
 114       int sum = 0;
 115       while (values.hasNext()) {
 116         sum += values.next().get();
 117       }
 118       output.collect(key, new IntWritable(sum));
 119     }
 120   }
 121 
 122   /**
 123    * Where jobs and their settings and sequence is set.
 124    *
 125    * @param args arguments with exception of Tools understandable ones.
 126    */
 127   public int run(String[] args) throws Exception {
 128     if (args == null || args.length != 3) {
 129       System.out.println("usage: TableMapReduceExample input_path_for_text_file output_path_for_table output_path_for_text_file");
 130       System.exit(-1);
 131     }
 132 
 133     /*
 134        First MR Job creating a Table with two columns
 135      */
 136     JobConf jobConf = new JobConf();
 137     jobConf.setJobName("TableMapReduceExample");
 138     jobConf.set("table.output.tfile.compression", "none");
 139 
 140     // Input settings
 141     jobConf.setInputFormat(TextInputFormat.class);
 142     jobConf.setMapperClass(Map.class);
 143     FileInputFormat.setInputPaths(jobConf, new Path(args[0]));
 144 
 145     // Output settings
 146     jobConf.setOutputFormat(BasicTableOutputFormat.class);
 147     BasicTableOutputFormat.setOutputPath(jobConf, new Path(args[1]));
 148 
 149     // set the logical schema with 2 columns
 150     BasicTableOutputFormat.setSchema(jobConf, "word:string, count:int");
 151 
 152     // for demo purposes, create 2 physical column groups
 153     BasicTableOutputFormat.setStorageHint(jobConf, "[word];[count]");
 154 
 155     // set map-only job.
 156     jobConf.setNumReduceTasks(0);
 157 
 158     // Run Job
 159     JobClient.runJob(jobConf);
 160 
 161     /*
 162       Second MR Job for Table Projection of count column
 163     */
 164     JobConf projectionJobConf = new JobConf();
 165     projectionJobConf.setJobName("TableProjectionMapReduceExample");
 166 
 167     // Input settings
 168     projectionJobConf.setMapperClass(ProjectionMap.class);
 169     projectionJobConf.setInputFormat(TableInputFormat.class);
 170     TableInputFormat.setProjection(projectionJobConf, "count");
 171     TableInputFormat.setInputPaths(projectionJobConf, new Path(args[1]));
 172     projectionJobConf.setMapOutputKeyClass(Text.class);
 173     projectionJobConf.setMapOutputValueClass(IntWritable.class);
 174 
 175     // Output settings
 176     projectionJobConf.setOutputFormat(TextOutputFormat.class);
 177     FileOutputFormat.setOutputPath(projectionJobConf, new Path(args[2]));
 178     projectionJobConf.setReducerClass(ProjectionReduce.class);
 179     projectionJobConf.setCombinerClass(ProjectionReduce.class);
 180 
 181     // Run Job
 182     JobClient.runJob(projectionJobConf);
 183 
 184     return 0;
 185   }
 186 
 187   public static void main(String[] args) throws Exception {
 188     int res = ToolRunner.run(new Configuration(), new TableMapReduceExample(), args);
 189     System.exit(res);
 190   }
 191 }

Attached Files

To refer to attachments on a page, use attachment:filename, as shown below in the list of files. Do NOT use the URL of the [get] link, since this is subject to change and can break easily.

You are not allowed to attach a file to this page.