Mohit Sabharwal and Xuefu Zhang, 06/30/2015

Objective

The initial patch of Pig on Spark feature was delivered by Sigmoid Analytics in September 2014. Since then, there has been effort by a small team comprising of developers from Intel, Sigmoid Analytics and Cloudera towards feature completeness. This document gives a broad overview of the project. It describes the current design, identifies remaining feature gaps and finally, defines project milestones.

Introduction

Pig on Spark project proposes to add Spark as an execution engine option for Pig, similar to current options of MapReduce and Tez.

Pig Latin commands can be easily translated to Spark transformations and actions. Each

command carries out a single data transformation such as filtering, grouping or aggregation. This characteristic translates well to Spark, where the data flow model enables step-by-step transformations of Resilient Distributed Datasets (RDDs).

Spark will be simply “plugged in” as a new execution engine. Any optimizations or features added to Pig (like new UDFs, logical plan or physical plan optimization) will be automatically available to the Spark engine.

For more information on Pig and Spark projects, see References.

Motivation

The main motivation for enabling Pig to run on Spark is to:

  • Increase Pig adoption amongst users who would like to standardize on one (Spark) execution backend for operational convenience.

  • Improve performance:

    • For Pig query plans that result in multiple MapReduce jobs, such jobs can be combined into a single Spark job such that each intermediate shuffle output (“working dataset”) is stored on local disk, and not replicated across the network on HDFS only to be read back again.

    • Spark re-uses YARN containers, so does not need to launch new AppMaster and Task JVMs for each job.

    • Spark allows explicit in-memory caching of RDD dataset, which supports multi-query implementation in Pig.

    • Spark features like broadcast variables support implementation of specialized joins in Pig like fragment-replicate join.

Functionality

Pig on Spark users can expect all existing Pig functionality.

Users may switch to the Spark execution engine by:

  • Setting the SPARK_MASTER environment variable to point to user’s spark cluster, and

  • specifying the -x spark argument in pig command line.

Note: At this stage of development, testing has only been done in Spark “local” mode (i.e. with SPARK_MASTER as “local”).  Additional code changes and environment settings may be required to configure Pig with a Spark cluster.

Spark engine will support:

  • EXPLAIN command that displays the Spark execution engine operator plan.

  • Progress, statistics and completion status for commands as well as error and debug logs.

Design

The design approach is to implement Pig Latin semantics using Spark primitives.

Since a Pig Latin command approximates a Spark RDD transformation, expressing Pig semantics directly as Spark primitives is a natural option. Moreover, like Pig, Spark supports lazy execution which is triggered only when certain commands (actions in Spark) are invoked.

This design was part of the initial patch, and is inline with that of Pig on Tez.

Note that this approach is different from one adopted by Hive on Spark project, which implements Hive QL semantics as MapReduce primitives which, in turn, are translated to Spark primitives.

Design Components

Pig Input Data Files as Spark RDDs

The first step in a Pig Latin program is to specify what the input data is, and how it’s contents are to be deserialized, i.e., converted from “input format” into Pig’s data model which views input as a sequence of Tuples (aka Bag). This step is carried out by Pig’s LOAD command which returns a handle to the bag. This bag is then processed by the next Pig command, and so on.

For Spark engine, an input Pig bag is simply a RDD of Tuples, and each subsequent Pig command can be translated to one or more RDD transformations.

InputFormat and OutputFormat

PigInputFormat abstracts out the underlying input format for the execution engine such that it always returns Pig Tuples.It is a wrapper around Pig LoadFunc which, in turn, is a wrapper around underlying Hadoop InputFormat. 

All input and output formats supported with Pig should work with Spark engine. No changes are expected related to input or output formats.

Logical Plan

A Pig Latin program is translated in a one-to-one fashion to a query plan called LogicalPlan containing LogicalRelationalOperators.  Pig builds a logical plan for every independent bag defined in the program.

No changes are expected to the logical plan.

Physical Plan

The LogicalPlan is converted to PhysicalPlan containing PhysicalOperators.

Note that some operators in LogicalPlan and PhysicalPlan may contain optimization information that is available to be used by the execution engine (Spark). One such scenario is when ORDER BY is following by a LIMIT operator. This is discussed in optimizations section later in the document.

Spark Plan

Spark Plan Compilation

The SparkCompiler identifies Spark jobs that need to be run for a given physical plan. It groups physical operators into one or more SparkOperators such that each SparkOperator represents a distinct Spark job. A SparkPlan is simply a DAG of SparkOperators.

The physical plan inside each SparkOperator forms the operator pipeline that gets executed by the execution engine.

The purpose of creating a SparkPlan is  two fold:

  • It identifies all Spark jobs that need to be run.

  • It allows for Spark specific optimizations to be performed to the plan before execution.

Design for SparkPlan needs improvement. In the current implementation, we convert the SparkPlan into a pipeline of RDD transformations and immediately execute the RDD pipeline (by performing a Spark action). There is no intermediate step that allows optimization of the RDD pipeline, if so deemed necessary, before execution. This task will entail re-working of the current sparkPlanToRDD()code, for example by introducing an RDDPlan of RDDOperators.

Spark Plan Execution

Executing a SparkPlan entails converting underlying PhysicalOperators to RDDs and then triggering the execution via a Spark action.

Execution begins by converting the POLoad operator into an RDD<Tuple> using Spark’s API that accepts Hadoop input format (PigInputFormat). Next, we move down SparkPlan’s operator pipeline and perform a series of RDD transformations, resulting in a new RDD at each step. Step-by-step conversion of physical operators to RDDs is show in the example below:

For a given physical operator, an RDD transformation generally involves taking input tuples one by by from the predecessor RDD, “attaching” it to the underlying physical plan and calling getNextTuple() on  the leaf operator of the physical plan to do the actual processing. (Pig uses the “pull” model for execution of physical operator pipeline.)

Physical Operator to Spark RDD Conversion

Every Pig Latin command translates to one or more physical operators. Converting every physical operator to a Spark RDD is an important milestone in feature completion.

 

Physical Operator

Converter Description

Spark APIs

Status

POLoad

Creates an RDD for given HDFS file read with PigInputFormat. FileSpec info is passed to input format via config. Returns RDD of Pig Tuples, i.e. RDD<Tuple>

sc.newAPIHadoopFile(), rdd.map()

    

PODistinct

Shuffles using reduceByKey()

Note that Spark has rdd.distinct() API as well - needs investigation whether using distinct() is more optimal.

rdd.reduceByKey()

  

POForEach

Attaches Pig tuple as input to underlying physical operator and calls getNextTuple().

When GROUP BY is followed by FOREACH with algebraic UDFs or nested DISTINCT, there is opportunity to use a Combiner function. This optimization remains to be done for Spark engine.  

rdd.mapPartitions(...)

  ✔ 

POFilter

Attaches Pig tuple as input to underlying physical operator and calls getNextTuple() .

rdd.filter(...)

  ✔ 

POCross

N/A since POCross is only used when CROSS is used inside nested ForEach.

Note that for (non-nested) CROSS, Pig parallelizes the operation by generating a randomized synthetic key (in GFCross UDF) for every record,  replicating the records, performing a shuffle based on the synthetic key and then joining records in each reducer. Spark engine simply re-uses the physical plan without any changes.

    

  N/A

POLimit

Attaches Pig tuple as input to underlying physical operator and calls getNextTuple().

rdd.coalesce(), rdd.mapPartitions()

  

POSort

Sorts using JavaPairRDD.sortBykey()

using POSort.SortComparator

rdd.map()

rdd.sortByKey(), rdd.mapPartitions()

 

POSplit

Used either explicitly or implicitly in case of multiple stores ( “multi-query execution” feature)

 

  

POStore

Persists Pig Tuples (i.e. RDD<Tuple>) using PigOutputFormat to HDFS.

PairRDDFunctions.saveAsNewAPIHadoopFile()

 ✔

POUnion

Returns union of all predecessor RDDs as a new UnionRDD.

new UnionRDD()

 

POStream

Attaches Pig tuple as input to underlying physical operator and calls getNextTuple()

rdd.mapPartitions()

POSkewedJoin

Optimizes case where there is significant skew in the number of records per key.

Currently implemented as a regular RDD join.

JavaPairRDD.join()

 

POFRJoin

No shuffle join when one input fits in memory.

Currently implemented as a regular RDD join.

JavaPairRDD.join()

JavaPairRDD.leftOuterJoin()

 

POMergeJoin

A no-shuffle join if both inputs are already sorted. Currently implemented as a regular RDD join.

JavaPairRDD.join()

 

POLocalRearrange

Attaches Pig tuple as input to underlying physical operator and calls getNextTuple(). Generates tuples of the form (index, key,(Tuple without key)).

rdd.map()

 

POGlobalRearrange

Creates a new CoGroupRDD of predecessor RDDs. Generates tuples of the form (bag index, key, {tuple without key}).

The output is always processed next by the POPackage operator.

Note that PIG represents ordinary shuffle operations like GROUP BY as three physical operators: LocalRearrange (to identify the key and source), GlobalRearrange (to do the actual shuffle) and Package (to generate the output in each reducer). We use a Spark API to do the shuffle  (CoGroupRDD). We just need to identify the key, not the sources. So, the Packaging step can be combined with GlobalRearrange step for Spark. This optimization remains to be done for Spark engine.

new CoGroupRDD()

rdd.map()

 ✔

POPackage

Packages globally rearranged tuples into format required by co-group. Attaches Pig tuple as input to underlying physical operator and calls getNextTuple().

rdd.map()

 ✔

PONative

Native MR. Follow up with Native Spark.

 

POCollectedGroup

Attaches Pig tuple as input to underlying physical operator and calls getNextTuple().

rdd.mapPartitions()

POMergeGroup

  

  ⨯

POCounter

This operator supports the RANK command and appears right before the PORank operator in the plan.

The output is an RDD of tuples of the form  (partition index, counter, tuple)

where the counter is incremented for every record (there is special handling for  “DENSE” rank).

rdd.mapPartitionsWithIndex()

PORank

The operator appears right after the POCounter operator.

Runs two Spark jobs. First to compute number of records per partition index. And a second Spark job to compute the rank of each tuple by adding offset to counter values in tuples based on output of the first job.  

rdd.mapToPair().groupByKey().sortByKey().collectAsMap()


rdd.map()

 

= Implemented, = Needs optimal implementation, = Not Implemented

Special Considerations

Multi-Query Execution

Multi-query execution in Pig is motivated by the fact that users often process the same data set in multiple ways, but do not want to pay the cost of reading it multiple times. To address this case, Pig inserts a SPLIT operator for every logical operator that has multiple outputs, which essentially means “materialize state at this point”. For Spark engine, a SPLIT can be translated to an optimization step where the RDD data set is pulled into Spark’s cluster-wide in-memory cache, such that child operators read from the cache. (In the MapReduce engine, child operators read from disk.)

Without this optimization, Spark engine Pig jobs will run significantly slower in the multi-query case because RDDs will need to be recomputed. This optimization needs to be implemented.

Remaining Optimizations

Specialized Joins and Groups

Pig supports specialized joins like fragment replicate join, merge join and skew join, as well as specialized grouping like collected groups and merge groups. These are explicitly specified by the user with the USING clause in the Pig command. (Pig does not automatically choose a specific join or group based on input data set.)

These are currently implemented as regular joins and groups. Specialized versions need to be implemented.

Secondary Sort

In Pig with MapReduce engine, there are several map-side performance optimizations.

A good example is secondary key sort:

B = GROUP A by FOO;
C = FOREACH B {
  D = ORDER A by BAR;
  GENERATE D;
}

MapReduce provides specialized API to support secondary key sort within groups. Spark currently does not have support for secondary sort (SPARK-3655).

Currently, secondary sort in Spark engine is implemented using two shuffles. This needs to be fixed.

Combiner Optimizations

Using a combiner lowers shuffle volume and skew on the reduce side.

The Pig combiner is an optimization that applies to certain FOREACH cases:

  • In nested foreach when the only nested operation is DISTINCT (i.e. dedupes in map phase to reduce shuffle volume).

  • In non-nested foreach, where all projections are either:

    • Expressions on the GROUP column, Or

    • UDFs implementing the Algebraic interface.

The combiner either translates to MR combiner or a special Pig operator which does in-memory combining in the map stage (“partial aggregation” feature).

Combiner support is currently not implemented for Spark engine.

Limit after Sort

In MapReduce engine, a sort entails three map reduce jobs - first one for computing quantiles from samples of input data, second one for performing the shuffle partitioned based on quantile ranges, and third one which is a 1-reduce-task shuffle to generate the final output.

In the scenario where ORDER BY is followed by LIMIT n, logical and physical plans do not have the POLimit operator. Instead, the sort operator (POSort) contains the limit information  (see LimitOptimizer and LimitAdjuster). MapReduce uses the limit information to optimize the cost of sorting in the second MR job where the combiner and the reducer stages output just the top n items.

Currently, Spark sort API does not take limit information. Hence no limit related optimization is implemented for the Spark engine. See PIG-4438.

Use Optimal Spark API for shuffling

Currently shuffle is implemented using Spark’s groupBy, sortByKey and CoGroupRDD APIs. However, Spark has since added other variants like aggregateByKey (which also support combiner functions).

Parallelism during shuffle

Currently no parallelism estimate is made when calling Spark’s shuffle APIs, leaving Spark to set it.

Native operator for Spark

For several reasons (performance, difficult translation to Pig, legacy code, etc.), user may want to directly run Spark code written in Scala, Python or Java from a Pig script.

This entails breaking the Pig pipeline, writing data to disk (added POStore), invoking the native Spark script, and then reading data back from disk (added POLoad). Some issues:

  • Co-ordination between Pig’s spark jobs and native spark jobs.

  • Adding stats and progress for native Spark jobs.

  • Handling any security implications when running Spark code natively.

This is a low priority item for the first version.

Packaging as part of GlobalRearrange

As described earlier, the Packaging operator does not necessarily need it’s own RDD transformation in Spark and may be made part of the GlobalRearrange RDD transformation.

This is an optimization step which can save a few extra transformations. Though it might make it more confusing to diverge the behavior from MR and Tez.

Progress Reporting and Statistics

Basic support for Spark job progress reporting, statistics and logs has been implemented. Needs more work for comprehensive support.

Test Infrastructure

Unit Tests

Status of latest unit test run is here.

Unit tests with Spark engine use the standard miniDFS cluster. However, currently unit tests run in Spark “local” mode. Spark offers a way to run jobs in “local cluster” mode, where a cluster is made up of a given number of processes on the local machine. Unit test execution needs to be switched to local-cluster mode once local mode tests pass.

More Spark specific unit tests need to be added.

No testing has been done so far with actual Spark cluster.

Not much thought has been given so far on benchmark and performance testing.

Summary of Remaining Tasks

  • Design

    • Current design for SparkPlan needs improvement as mentioned earlier.

  • Functionality

    • All physical operators are supported at this point, except PONative.

    • Unit test failures point to some important gaps in existing implementation. These are highlighted as items that need to implemented as part of Milestone 1 below.

  • Optimizations

    • Specialized versions of joins and cogroups.

    • Running combiner for Algebraic UDFs and Foreach optimizaton.

    • Computing optimal parallelism for shuffles.

    • Spark now has several similar shuffle APIs. Need to choose the optimal ones.

    • More efficient implementation of secondary sort.

  • Spark integration

    • Progress and error reporting support is implemented but needs improvement.

  • Tests

    • Test with Spark local-cluster mode.

    • Add remaining unit tests for Spark engine specific code.

    • Test on Spark cluster.

    • Benchmarking and performance tests.

Comparison with Pig on Tez

Tez, as a backend execution engine, is very similar to Spark in that it offers the same optimizations that Spark does (speeds up scenarios that require multiple shuffles by storing intermediate output in local disk or memory, re-use of YARN containers and support for distributed in-memory caching.). The main implementation difference when using Tez as a backend engine is that Tez offers a much lower level API for expressing computation. From the direct user perspective, Tez also does not offer a built-in shell.

Pig on Tez design is very similar to current Pig on Spark design, in that it constructs a new plan directly from the PhysicalPlan.

In Pig on Tez, every shuffle boundary translates into two Tez vertices and the connecting edge expresses the fact we are shuffling.

 

In Pig on Spark, the API is not as low level, so every shuffle is expressed as a high level call to Spark like reduceBy  or CoGroupRDD.  

Significant refactoring of code was done in Pig 0.13 to support backends other than MapReduce starting with Pig on Tez. Pig on Spark builds on that effort.

Milestones

This document lists milestones for ongoing work on Pig on Spark.

Milestone 1

Goal: Functional completeness of major items

ETA: ~ 10-12 developer weeks

Missing Operators

 

Operator

Comment

Owner

ETA

Status

POCross (Top level & Nested Cross)

PIG-4549 (top level) and PIG-4552 (nested)

Mohit

3 days

POFRJoin

PIG-4278

Mohit

1 day

POMergeJoin

PIG-4422

Mohit

1 day

PONative

Low priority item for first version. Need a (no-op) implementation.

 

1 day

 

POMergeGroup

  

1 day

 

POSkewJoin

PIG-4421

 Kelly

 

 

 

Fix or disable tests for specialized JOIN and GROUP/COGROUP operators.

Missing Features

 

Feature

Comment

Test

Owner

ETA

Status

Support for custom Partitioner

PIG-4565

Used in DISTINCT, GROUP, JOIN, CROSS. Need to wrap user’s custom MR Partitioner in Spark Partitioner object.

TestCustomPartitioner

Mohit

1 week

Combiner support for Algebraic UDFs    

 

TestCombiner

 

TBD

 

Spark should call cleanup in MR OutputCommitter API

Low priority clean-up item. Not a must-do for first version.

TestStore

 

3 days

Blocked by SPARK-7953 and multi-query

Support for HBase storage

PIG-4585, PIG-4611

TestHBaseStorage

Mohit/Kelly

4 days

 

Support for secondary sort

PIG-4504

TestAccumulator#testAccumWithSort

 Kelly

2 days

 

Use optimal shuffle API

Currently, we use groupByKey, which assumes all values for a key will fit in memory. Use aggregateByKey or reduceByKey instead.

  

2 days

 

Secondary Sort using one shuffle

PIG-4504 implements secondary sort using 2 shuffles. We should do it in one (PIG-4553). This is a performance item, but a well used feature, so we should do it in first milestone.

  

4 days

 

Multi-query support

TestMultiQuery, TestMultiQueryLocal broken

 

Kelly

1 week

 

 

Critical Tests

Corner cases failing in already implemented features

 

Test

Comment

Owner

ETA

Status

TestJoin

Joining empty tuples fails

 Kelly

1 day

 

TestPigContext, TestGrunt

PIG-4295. Deserializing UDF classpath config fails in Spark because it’s thread local.

Kelly

3 days

TestProjectRange

PIG-4297 Range expressions fail with groupby

Kelly

2 days

 

TestAssert

FilterConverter issue?

 

1 day

 

TestLocationInPhysicalPlan

  

1 day

 

 

Other Tests

ETA: 1 week

Several tests are failing due to either:

  • Ordering difference in shuffle results (MR returns sorted results, Spark doesn’t), Or

  • Gaps in SparkPigStats.

We should fix these tests as these as we find time as these are “low hanging fruit” and might help us uncover other issues. These include TestScriptLanguage, TestPigRunner, TestJoinLocal, TestEvalPipelineLocal, TestDefaultDateTimeZone, etc.

Investigate

And fix if needed

 

Feature

Comment

Owner

ETA

Status

Streaming

Test are passing, but we need confirmation that this feature works.

 

1 day

 

 

Spark Unit Tests

Few Spark engine specific unit tests have been written so far (for features that have Spark specific implementations). Following is a partial list of what we need to add. Need to update this list as we add more Spark specific code. We should also add tests for POConverter implementations.

 

Test

Comment

Owner

ETA

Status

TestSparkLauncher

    

TestSparkPigStats

    

TestSecondarySortSpark

 

Kelly

 

 

Enhance Test Infrastructure

ETA: ~ 2 weeks (additional test failures expected)

Use “local-cluster” mode to run unit tests and fix resulting failures.

Milestone 2

Goal: Spark Integration & remaining functionality items

ETA: ~ 5 developer weeks

Spark Integration

ETA: 2 weeks

Including error reporting, improved progress and stats reporting

Fix Remaining Tests

ETA: 3 weeks

TestScriptLanguageJavaScript

TestPigRunner

TestPruneColumn: Fixed in PIG-4582

TestForEachNestedPlanLocal:Fixed in PIG-4552 

TestRank1

TestPoissonSampleLoader

TestPigServerLocal

TestNullConstant:   Fixed in PIG-4597

TestCase: Fixed PIG-4589

TestOrcStoragePushdown

Milestone 3

Goal: Performance optimization and code cleanup

ETA: TBD

Performance Tests

TBD

Performance Optimizations

 

Feature

Comment

Owner

ETA

Status

Split / MultiQuery using RDD.cache()

    

In Group + Foreach aggregations, use aggregateByKey or reduceByKey for much better performance

For example: COUNT or DISTINCT aggregation inside nested foreach is handled by Pig code. We should use Spark to do in more efficiently

   

Compute optimal Shuffle Parallelism

Currently we let Spark pick the default

   

Combiner support for “Group+ForEach”

    

Multiple GROUP BYs on the same data set can avoid multiple shuffles.

See MultiQueryPackager

   

Switch to Kryo for Spark data serialization

Are all Pig serializable classes compatible with Kryo ?

   

FR Join

    

Merge Join (including sparse merge join)

    

Skew Join

    

Merge CoGroup

    

Collected CoGroup

    

 

Note that there is ongoing work in Spark SQL to support specialized joins: See SPARK-2211. As an example, support for merge join is in Spark SQL in Spark 1.4 (SPARK-2213 and SPARK-7165). This implies that Spark community will not be adding support for these joins in Spark Core library.

Re-design Spark Plan

Currently, the SparkLauncher converts the SparkPlan to RDD pipeline and immediately executes it. There is no intermediate step that allows optimization of the RDD pipeline, if so deemed necessary, before execution. This will need re-working of sparkPlanToRDD(), perhaps by introduction of a RDDPlan of RDDOperators.

Other Features

  • Native Spark operator support.

  • Allow Spark Partitioner to be specified using PARTITION BY.

Getting Started

Github: https://github.com/apache/pig/tree/spark

Please refer to PIG-4059 for instructions on how to setup your development environment, PIG-4266 for instructions on how to run unit tests and PIG-4604 for instructions on package import order.

References

[1] Pig

[2] Pig Latin   

[3] Pig Execution Model

[4] Apache Spark wiki 

[5] Spark 

[6] Spark blog post

[7] Hive on Spark design doc



  • No labels