This document captures the specification for native mapreduce jobs and the proposal for executing them inside a Pig script. This work is tracked at PIG-506.

Introduction

Pig needs to provide a way to natively run mapreduce jobs written in Java. There are some advantages of this-

  1. The user need not worry about coordination between the jobs; Pig takes care of it.

  2. Users can make use of existing Java applications without being Java programmers.

Syntax

To support native mapreduce jobs, Pig will support the following syntax-

X = ... ;
Y = MAPREDUCE 'mymr.jar' [('other.jar', ...)] STORE X INTO 'inputLocation' USING storeFunc LOAD 'outputLocation' USING loadFunc AS schema [`params, ... `];

This stores X into inputLocation using storeFunc; the native mapreduce job then reads its data from there. After mymr.jar's mapreduce job runs, the data is loaded back from outputLocation into the alias Y using loadFunc, with the given schema.

params are the extra parameters required by the native mapreduce job.

mymr.jar is any mapreduce jar file that can be run through the "hadoop jar mymr.jar params" command. Thus, the contract for inputLocation and outputLocation is typically communicated through params.

For example, to run the wordcount mapreduce program from Pig, we write

A = load 'WordcountInput.txt';
B = MAPREDUCE 'wordcount.jar' Store A into 'inputDir' Load 'outputDir' as (word:chararray, count: int) `org.myorg.WordCount inputDir outputDir`;

Note that the files specified as input and output locations in the MAPREDUCE statement will NOT be deleted by Pig automatically; the user has to delete them manually.
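
For reference, the main class named in the backtick clause is an ordinary Hadoop driver that takes its input and output locations from its command-line arguments. A minimal sketch of what mymr.jar's org.myorg.WordCount could look like, modeled on the standard Hadoop WordCount example (new mapreduce API):

    import java.io.IOException;
    import java.util.StringTokenizer;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.Reducer;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

    public class WordCount {

        public static class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable> {
            private final static IntWritable one = new IntWritable(1);
            private Text word = new Text();
            public void map(Object key, Text value, Context context)
                    throws IOException, InterruptedException {
                StringTokenizer itr = new StringTokenizer(value.toString());
                while (itr.hasMoreTokens()) {
                    word.set(itr.nextToken());
                    context.write(word, one);
                }
            }
        }

        public static class IntSumReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
            private IntWritable result = new IntWritable();
            public void reduce(Text key, Iterable<IntWritable> values, Context context)
                    throws IOException, InterruptedException {
                int sum = 0;
                for (IntWritable val : values) {
                    sum += val.get();
                }
                result.set(sum);
                context.write(key, result);
            }
        }

        public static void main(String[] args) throws Exception {
            // args[0] and args[1] are 'inputDir' and 'outputDir' from the backtick clause
            Job job = new Job(new Configuration(), "word count");
            job.setJarByClass(WordCount.class);
            job.setMapperClass(TokenizerMapper.class);
            job.setCombinerClass(IntSumReducer.class);
            job.setReducerClass(IntSumReducer.class);
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(IntWritable.class);
            FileInputFormat.addInputPath(job, new Path(args[0]));   // 'inputDir', written by Pig's store
            FileOutputFormat.setOutputPath(job, new Path(args[1])); // 'outputDir', read back by Pig's load
            System.exit(job.waitForCompletion(true) ? 0 : 1);
        }
    }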

Comparison with similar features

Pig Streaming

The purpose of Pig streaming is to send data through an external script or program, transforming a dataset into a different dataset using a custom script written in any programming or scripting language. Pig streaming builds on Hadoop streaming support to achieve this. Custom programs can be registered in a Pig script either inline in the stream clause or using a define clause. Pig also provides a level of data guarantees on the processing, features for job management, and the ability to ship the scripts via the distributed cache (configurable). Streaming applications run locally on the individual mapper and reducer nodes to transform the data.

Hive Transforms

With Hive transforms, users can also plug their own custom mappers and reducers into the data stream. This is likewise an application of the custom streaming supported by Hadoop; the mappers and reducers can be written in any scripting language and can be registered in the distributed cache to help performance. To run custom mapreduce programs written in Java (see Bizo's blog), custom mappers and reducers can be used as data streaming functions and invoked with 'java -cp mymr.jar'. This does not launch a mapreduce task of its own; it merely transforms the data locally within the map or reduce task.

Thus, both of these features can transform the data submitted to a mapreduce job (in the mapper) into a different data set, and/or transform the data produced by a mapreduce job (in the reducer) into a different data set. But note that the transformation takes place on a single machine and does not take advantage of the mapreduce framework itself. Also, these constructs only allow custom transformations inside the data pipeline and do not break the pipeline.

With native job support, Pig can run native mapreduce jobs written in Java that convert one data set into a different data set by applying custom mapreduce functions of any complexity.

Implementation Details

X = ... ;
Y = MAPREDUCE 'mymr.jar' [('other.jar', ...)] STORE X INTO 'inputLocation' USING storeFunc LOAD 'outputLocation' USING loadFunc AS schema [`params, ... `];

Pig Plans

Logical Plan- The logical plan creates an LONative operator with an internal plan that consists of a store and a load operator. The store operator cannot be attached to X at this level, as that would store X at inputLocation for every plan that includes X, which is not intended. Although we could create the LOLoad operator for Y at this point, we delay it until the mapreduce plan and track it with the LONative operator. Since Y has a dataflow dependency on X, we make a connection between the operators corresponding to these aliases in the logical plan.

    X = ... ;
        |
        |
        |                            |--- (LOStore) Store X into 'inputLocation'
    Y = MapReduce ... ;              |
      (LONative)   --  innerPlan  ---|
        mymr.jar                     |
        params                       |--- (LOLoad) Load 'outputLocation'
        |
        |
        ...

TypeCastInserter- This is a mandatory optimization that adds a foreach with cast operators after a load, so that each loaded field is converted to the type declared in the schema. In its absence, we fail with a cast exception after the load completes. Currently, this optimizer is applied to LOLoad and LOStream, as both can declare an "AS schema" clause. Since the mapreduce clause also corresponds to a load operation, this optimization is applicable to the LONative operator as well. A test case for this scenario is-

B = mapreduce 'mapreduce.jar' Store A into 'input' Load 'output' as (name:chararray, count:int) `wordcount input output`;
C = foreach B generate count+1;

Physical Plan- The logical plan is visited to convert the internal load/store plan into the corresponding physical plan operators; connections are maintained as in the logical plan.

    X = ... ;
        |
        |
        |                            |--- (POStore) Store X into 'inputLocation'
    Y = MapReduce ... ;              |
      (PONative)   --  innerPlan  ---|
        mymr.jar                     |
        params                       |--- (POLoad) Load 'outputLocation'
        |
        |
        ...

MapReduce Plan- While compiling the mapreduce plan with the MRCompiler, we introduce a new MapReduceOper, NativeMapReduceOper, which tracks the presence of a native mapreduce job inside the plan. It also holds the required parameters and the jar name.

    X = ... ;
        |
        |
        |--- (POStore) Store X into 'inputLocation'

--------------- MR boundary -------------------------
    Y = MapReduce ... ;
     (NativeMapReduceOper)
        mymr.jar              
        params
--------------- MR boundary -------------------------
    Y = (POLoad) Load 'outputLocation'
        |
        |
        ...

Inside the JobControlCompiler's compile method, if we find the native mapreduce operator, we run org.apache.hadoop.util.RunJar's main method with the specified parameters. We also make sure that all job dependencies are obeyed for native jobs.
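
As an illustration, the invocation boils down to something like the following sketch (the wrapper class below is hypothetical, for illustration only; RunJar.main is the real Hadoop entry point):

    import org.apache.hadoop.util.RunJar;

    // Hypothetical wrapper, for illustration only: runs the native jar
    // in-process, equivalent to "hadoop jar mymr.jar params".
    public class NativeJobRunner {
        public static void runNativeJob(String jarPath, String[] params) throws Throwable {
            // RunJar expects: jarFile [mainClass] args...
            String[] args = new String[params.length + 1];
            args[0] = jarPath;                                    // e.g. "mymr.jar"
            System.arraycopy(params, 0, args, 1, params.length);  // e.g. org.myorg.WordCount inputDir outputDir
            // Runs the jar's main class in the current JVM. The driver usually
            // ends with System.exit(), which Pig intercepts with the security
            // manager described in the next section.
            RunJar.main(args);
        }
    }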

Security Manager

The hadoop jar command is equivalent to invoking org.apache.hadoop.util.RunJar's main function with the required arguments. RunJar can internally invoke several levels of driver classes before executing the hadoop job (for example, hadoop-example.jar). To detect the success or failure of the job, we need to capture the innermost exit value and return it to Pig. To achieve this, we install our own RunJarSecurityManager, which delegates security management to the current security manager and captures the innermost exit code.
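
A minimal sketch of such a delegating security manager (the class name follows the description above; the details are illustrative and may differ from Pig's actual implementation):

    // Illustrative sketch: delegates security checks to the existing manager
    // and records the innermost exit code instead of letting the JVM exit.
    public class RunJarSecurityManager extends SecurityManager {
        private final SecurityManager delegate = System.getSecurityManager();
        private volatile boolean exitInvoked = false;
        private volatile int exitCode = 0;

        @Override
        public void checkPermission(java.security.Permission perm) {
            if (delegate != null) {
                delegate.checkPermission(perm); // delegate everything else
            }
        }

        @Override
        public void checkExit(int status) {
            // Only the first (innermost) System.exit() carries the job's real
            // status; outer driver classes may call exit again while unwinding.
            if (!exitInvoked) {
                exitInvoked = true;
                exitCode = status;
            }
            // Prevent the JVM from actually exiting so Pig keeps running.
            throw new SecurityException("Intercepted System.exit(" + status + ")");
        }

        public boolean exitInvoked() { return exitInvoked; }
        public int getExitCode() { return exitCode; }
    }

Pig would install this manager with System.setSecurityManager() around the RunJar.main() call, restore the previous manager afterwards, and treat a captured exit code of 0 as success of the native job.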

Pig Stats

Pig stats are populated by treating the native job as a single mapreduce job, and progress is reported under the same assumption. As the native job is not under Pig's control, it is hard to capture any information about it other than the exit code. Thus, the generated job id is a pseudo id and does not correspond to an actual hadoop job id. We mark native jobs with a new FEATURE called NATIVE.

References

  1. PIG-506, "Does pig need a NATIVE keyword?", https://issues.apache.org/jira/browse/PIG-506

  2. Pig Wiki, "Pig Streaming Functional Specification", http://wiki.apache.org/pig/PigStreamingFunctionalSpec

  3. Hive Wiki, "Transform/Map-Reduce Syntax", http://wiki.apache.org/hadoop/Hive/LanguageManual/Transform

  4. Bizo's blog, "hive map reduce in java", http://dev.bizo.com/2009/10/hive-map-reduce-in-java.html
