Pig Performance

This document publishes current (as of 11/07/07) performance numbers for Pig. The objective is to establish baseline numbers to compare against before we start making major changes to the system.

In addition, the document publishes numbers for Hadoop programs that perform the identical computation. One of the objectives of Pig is to provide performance as close as possible (within 10-20%) to native Hadoop code. These numbers allow us to establish how wide the current gap is.

Test Setup

Both the Pig and Hadoop tests ran on an 11-machine cluster, with 1 machine dedicated to the Name Node and Job Tracker and 10 compute nodes. Each machine had the following hardware configuration:

The cluster had Hadoop 0.14.1 installed and had the following configuration:

Test Data

Two data sets were used. Both contained tab-delimited, auto-generated data with the same three-column schema used in the scripts below (name, age, gpa).

The first dataset (studenttab200M) contained 200 million rows (4384624709 bytes) and was used for all tests. The second dataset (studenttab10k) contained 10 thousand rows (219190 bytes) and was used as the second input to the cogroup/join test.

The data can be generated using DataGenerator (see the reference at the end of this page).

Test Cases

Load and Store

A = load 'studenttab200M' using PigStorage('\t');
store A into 'my_studenttab200M' using PigStorage();
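
The hand-coded Hadoop programs used for comparison are not reproduced on this page. As an illustration only, here is a minimal sketch of what a native, map-only equivalent of this load/store test might look like. It is written against the classic org.apache.hadoop.mapred API of later Hadoop releases (the original jobs targeted Hadoop 0.14, whose API differed slightly); the class name is hypothetical and this is not the code that produced the published numbers.

import java.io.IOException;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.*;

// Hypothetical sketch: copy each input line to the output unchanged, map-only.
public class LoadStore {

    public static class CopyMapper extends MapReduceBase
            implements Mapper<LongWritable, Text, Text, NullWritable> {
        public void map(LongWritable offset, Text line,
                        OutputCollector<Text, NullWritable> out, Reporter reporter)
                throws IOException {
            // TextOutputFormat skips NullWritable values, so only the line is written.
            out.collect(line, NullWritable.get());
        }
    }

    public static void main(String[] args) throws IOException {
        JobConf conf = new JobConf(LoadStore.class);
        conf.setJobName("loadstore");
        conf.setMapperClass(CopyMapper.class);
        conf.setNumReduceTasks(0);                 // map-only, like the Pig script
        conf.setOutputKeyClass(Text.class);
        conf.setOutputValueClass(NullWritable.class);
        FileInputFormat.setInputPaths(conf, new Path("studenttab200M"));
        FileOutputFormat.setOutputPath(conf, new Path("my_studenttab200M"));
        JobClient.runJob(conf);
    }
}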

Filter that removes 10% of data

A = load 'studenttab200M' using PigStorage('\t') as (name, age, gpa);
B = filter A by gpa < '3.6';
store B into 'my_studenttab200M_filter_10' using PigStorage();

Filter that removes 90% of data

A = load 'studenttab200M' using PigStorage('\t') as (name, age, gpa);
B = filter A by age < '25';
store B into 'my_studenttab200M_filter_90' using PigStorage();

Generate with basic arithmetic

A = load 'studenttab200M' using PigStorage('\t') as (name, age, gpa);
B = foreach A generate age * gpa + 3, age/gpa - 1.5;
store B into 'my_studenttab200M_projection' using PigStorage();

Grouping

A = load 'studenttab200M' using PigStorage('\t') as (name, age, gpa);
B = group A by name;
C = foreach B generate flatten(group), COUNT(A.age);
store C into 'my_studenttab200M_group' using PigStorage();
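
The published Hadoop numbers for the grouping and cogrouping tests come from full map/reduce jobs. Again as an illustration only (not the actual comparison code), here is a minimal sketch of a hand-coded counterpart to this grouping test, using the same classic mapred API as the sketch above and a hypothetical class name.

import java.io.IOException;
import java.util.Iterator;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.*;

// Hypothetical sketch: group the rows by name and count them, mirroring
// "group A by name; ... COUNT(A.age)".
public class GroupCount {

    public static class Map extends MapReduceBase
            implements Mapper<LongWritable, Text, Text, LongWritable> {
        private static final LongWritable ONE = new LongWritable(1);
        private final Text name = new Text();

        public void map(LongWritable offset, Text line,
                        OutputCollector<Text, LongWritable> out, Reporter reporter)
                throws IOException {
            // name is the first tab-delimited field.
            String[] fields = line.toString().split("\t");
            name.set(fields[0]);
            out.collect(name, ONE);
        }
    }

    public static class Reduce extends MapReduceBase
            implements Reducer<Text, LongWritable, Text, LongWritable> {
        public void reduce(Text name, Iterator<LongWritable> counts,
                           OutputCollector<Text, LongWritable> out, Reporter reporter)
                throws IOException {
            long sum = 0;
            while (counts.hasNext()) {
                sum += counts.next().get();
            }
            out.collect(name, new LongWritable(sum));
        }
    }

    public static void main(String[] args) throws IOException {
        JobConf conf = new JobConf(GroupCount.class);
        conf.setJobName("groupcount");
        conf.setMapperClass(Map.class);
        conf.setCombinerClass(Reduce.class);   // partial counts in the map stage
        conf.setReducerClass(Reduce.class);
        conf.setOutputKeyClass(Text.class);
        conf.setOutputValueClass(LongWritable.class);
        FileInputFormat.setInputPaths(conf, new Path("studenttab200M"));
        FileOutputFormat.setOutputPath(conf, new Path("my_studenttab200M_group"));
        JobClient.runJob(conf);
    }
}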

Cogrouping/Join

A = load 'studenttab200M' using PigStorage('\t') as (name, age, gpa);
B = load 'studenttab10k' using PigStorage('\t') as (name, age, gpa);
C = cogroup A by name inner, B by name inner;
D = foreach C generate flatten(A), flatten(B);
store D into 'my_studenttab200M_cogroup_small' using PigStorage();

Performance Numbers

The wall-clock time for each Pig script and Hadoop job was measured using the time command on the client. In addition, average CPU and memory utilization for both the map and reduce stages were measured by observing top. All tests were run 3 times and the performance numbers averaged.

Test       | Output Size(bytes) | Pig Time(s) | Pig Map CPU(%) | Pig Map Mem(MB) | Pig Reduce CPU(%) | Pig Reduce Mem(MB) | Hadoop Time(s) | Hadoop Map CPU(%) | Hadoop Map Mem(MB) | Hadoop Reduce CPU(%) | Hadoop Reduce Mem(MB)
-----------|--------------------|-------------|----------------|-----------------|-------------------|--------------------|----------------|-------------------|--------------------|----------------------|----------------------
LoadStore  | 4384624709         | 185         | 99             | 40              | N/A               | N/A                | 103            | 95                | 40                 | N/A                  | N/A
Filter 10% | 3940641765         | 207         | 99             | 40              | N/A               | N/A                | 137            | 95                | 40                 | N/A                  | N/A
Filter 90% | 511609335          | 170         | 99             | 40              | N/A               | N/A                | 100            | 95                | 40                 | N/A                  | N/A
Project    | 7083620758         | 427         | 99             | 150             | N/A               | N/A                | 127            | 95                | 40                 | N/A                  | N/A
Group      | 14144              | 1072        | 99             | 400             | 99                | 750                | 180            | 99                | 300                | 15                   | 20
Cogroup    | 129701186044       | 2480        | 99             | 450             | 50-90             | 500                | 1425           | 30-99*            | 140                | N/A                  | N/A

* In the cogroup case, the data node was consuming up to 70% of the CPU, taking CPU away from one of the tasks.

Improvements

Change            | Test    | Old Time(s) | New Time(s) | Improvement(%)
------------------|---------|-------------|-------------|---------------
binary comparator | Group   | 1072        | 655         | 39
binary comparator | Cogroup | 2480        | 2040        | 18

Additional Work

We also need to do the following:

Pig Streaming Performance

This section was added on 5/30/08 to provide initial performance numbers for the newly implemented streaming support. These tests have a different setup from the others:

(1) The data was of the same type as for the other tests but 100 GB in size
(2) The tests ran on 100 machines with 2 map and 2 reduce slots and 500 MB per task
(3) The tests ran against a Hadoop 0.16 cluster

See PigStreamingFunctionalSpec for details of streaming.

Test Cases

The following Pig scripts and Hadoop streaming job were executed.

Load/Store

This is just to establish a baseline:

IP = load '/pig/in'; 
store IP into '/pig/out';

With binary optimization turned on:

IP = load '/pig/in' split by file; 
store IP into '/pig/out';

Load/Stream/Store

define CMD `filter.pl` ship('./filter.pl'); 
IP = load '/pig/in'; 
OP = stream IP through CMD; 
store OP into '/pig/out';

filter.pl implements the same filtering as the Load/Filter/Stream/Store test case below.

We also ran this with the optimization turned on:

define CMD `filter.pl` ship('./filter.pl'); 
IP = load '/pig/in' split by file; 
OP = stream IP through CMD; 
store OP into '/pig/out';

Load/Filter/Stream/Store

IP = load '/pig/in';
FILTERED_DATA = filter IP by $1 > '0';
OP = stream FILTERED_DATA through `perl -ne 'print $_;'`;
store OP into '/pig/out';

Hadoop Streaming

The Hadoop streaming code mimicked the behavior of Load/Filter/Stream/Store.

Performance Numbers

Test                        | Time(sec)
----------------------------|----------
Load/Store                  | 1464
Load/Store optimized        | 423
Load/Stream/Store           | 1683
Load/Stream/Store optimized | 773
Load/Filter/Stream/Store    | 1673
Hadoop                      | 810

Note that the last 4 test cases produce exactly the same data, so their timings can be compared directly.

See DataGenerator for details on test data generation.
