We want to use Pig to process arbitrary Avro data and store results as Avro files. This page summarizes how to use AvroStorage, a Pig load/store func for Avro data.

1. Introduction

AvroStorage() extends two PigFuncs: LoadFunc and StoreFunc, and is used to load and store Avro data in Pig scripts.

Due to discrepancies of Avro and Pig data models, AvroStorage has:

For simplicity, we also make the following assumption:

Below is an example Pig script that uses AvroStorage.

REGISTER avro-1.4.0.jar
REGISTER json-simple-1.1.jar
REGISTER piggybank.jar
REGISTER jackson-core-asl-1.5.5.jar
REGISTER jackson-mapper-asl-1.5.5.jar

fs -rmr $home/testOut/testCnt;

avro = LOAD '$home/avro/TestInput/case1/part-00000.avro'
USING AvroStorage ();

groups = GROUP avro BY value.member_id;
sc = FOREACH groups GENERATE group AS member_id, COUNT(avro) AS cnt;

STORE sc INTO '$home/testOut/testCnt'
USING AvroStorage ();

Users can provide parameters to AvroStorage when it is used as a LoadFunc or as a StoreFunc.

2. Input Parameters (as a LoadFunc)

AvroStorage assumes that all Avro files in the sub-directories of an input directory share the same schema, and by default it checks that they do. This check may take seconds when the input directory contains many sub-directories. Users can pass the option "no_schema_check" to disable it.


-- does schema check of sub-directories under /testIn/dir
data = LOAD '/testIn/dir' USING storage.avro.AvroStorage ();

-- disable schema check of sub-directories under /testIn/dir
data = LOAD '/testIn/dir' USING storage.avro.AvroStorage ('no_schema_check');


Additionally, if multiple Avro files in different directories have schemas that vary by a column, the first schema is used to read all the other Avro files in that directory. This can sometimes produce undesirable results.
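
The risk of reading every file with the first schema can be made concrete with a small sketch. It is written in Python rather than Pig, and it is not how AvroStorage is implemented; it only illustrates the idea of comparing each file's schema against the first one found. The function name `find_schema_mismatches` and the sub-directory names are hypothetical.

```python
import json

def find_schema_mismatches(schemas_by_path):
    """Return the paths whose schema differs from the first schema encountered.

    schemas_by_path maps a file path to its Avro schema as a JSON string.
    Illustrative only: this is an assumption about what a schema check
    conceptually does, not AvroStorage's actual code.
    """
    paths = sorted(schemas_by_path)
    if not paths:
        return []
    # The first schema acts as the reference, mirroring the behaviour described above.
    reference = json.loads(schemas_by_path[paths[0]])
    return [p for p in paths[1:] if json.loads(schemas_by_path[p]) != reference]

# Hypothetical input directory: two files agree, the third has an extra column.
schemas = {
    "/testIn/dir/a/part-00000.avro":
        '{"type":"record","name":"R","fields":[{"name":"id","type":"int"}]}',
    "/testIn/dir/b/part-00000.avro":
        '{"type": "record", "name": "R", "fields": [{"name": "id", "type": "int"}]}',
    "/testIn/dir/c/part-00000.avro":
        '{"type":"record","name":"R","fields":[{"name":"id","type":"int"},'
        '{"name":"extra","type":"string"}]}',
}
mismatches = find_schema_mismatches(schemas)
print(mismatches)
```

A file reported here would be read with a schema that does not describe it, which is the situation the paragraph above warns about.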

3. Input Parameters (as a StoreFunc)

No Parameter

Users can choose not to provide any parameters to AvroStorage, in which case the Avro schema of the output data is derived from its Pig schema. This may result in undesirable schemas because of discrepancies between the Pig and Avro data models or problems in Pig itself.

We thus highly recommend users provide output schema information using the following options.

Pig results are tuples which may contain multiple fields. Users can provide parameters applying to all fields, denoted as global parameters, or specific fields, denoted as field parameters.

Global Parameters

Field Parameters

Users can provide schema information for specific fields. The index of field, n, starts from 0.
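
To illustrate the distinction between global and field parameters, here is a minimal Python sketch that splits a flat name/value list into the two groups. The `field<n>` naming and the sample values are taken from the examples later on this page; the helper `split_parameters` is hypothetical, and treating every name that is not of the form `field<n>` as a global parameter is an assumption, not a description of AvroStorage internals.

```python
import re

# Matches parameter names such as "field0", "field1", ... (index starts from 0).
FIELD_KEY = re.compile(r"^field(\d+)$")

def split_parameters(args):
    """Split a flat ['name', 'value', ...] list into global and per-field parameters."""
    if len(args) % 2 != 0:
        raise ValueError("parameters must come in name/value pairs")
    global_params, field_params = {}, {}
    for name, value in zip(args[0::2], args[1::2]):
        match = FIELD_KEY.match(name)
        if match:
            # Key the parameter by the zero-based field index.
            field_params[int(match.group(1))] = value
        else:
            # Assumption: anything else applies to all fields.
            global_params[name] = value
    return global_params, field_params

global_params, field_params = split_parameters(
    ["data", "inputData",
     "field0", "def:impressionDetails.id",
     "field1", "def:ItemType"]
)
print(global_params)
print(field_params)
```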

Put Parameters in a JSON record

Users can put all of the above parameters in a JSON record. For instance:

....
STORE data INTO 'out_location'
USING AvroStorage(
'{ "debug": 5,
"index": 2,
"schema": {"type":"record","name":"Y",
"fields":[{"name":"b1","type":"float"},
{"name":"b2","type":"float"}]}
}');

4. Examples

A. How to store data in different ways.

Note that users can pass parameters either as a list of strings or as a JSON record.

REGISTER avro-1.4.0.jar
REGISTER json-simple-1.1.jar
REGISTER piggybank.jar
REGISTER jackson-core-asl-1.5.5.jar
REGISTER jackson-mapper-asl-1.5.5.jar

fs -rmr testOut/testSplit1_1;
fs -rmr testOut/testSplit1_2;

B = LOAD 'testIn/B' AS (b1:int,b2:int);
DUMP B;
/* get
(2,4)
(8,9)
(1,3)
*/

SPLIT B INTO X IF b1 < 3, Y IF b1 > 7;
STORE X INTO 'testOut/testSplit1_1'
USING org.apache.pig.piggybank.storage.avro.AvroStorage(
'index', '1',
'schema',
' {"type":"record","name":"X",
"fields":[{"name":"b1","type":"int"},
{"name":"b2","type":"int"}]}');

STORE Y INTO 'testOut/testSplit1_2'
USING org.apache.pig.piggybank.storage.avro.AvroStorage(
'{"index": 2,
"schema": {"type":"record","name":"Y",
"fields": [{"name":"b1","type":"float"},
{"name":"b2","type":"float"}
]
}
}');
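
The two STORE statements above pass the same kinds of parameters in two forms. The following Python sketch shows one plausible way the string-list form could be turned into the JSON-record form. It is an illustration under assumptions: the helper `params_to_json_record` is hypothetical, and the rule that values which parse as JSON (such as '1' or a schema string) become JSON values while everything else stays a plain string is a guess about the correspondence, not documented AvroStorage behaviour.

```python
import json

def params_to_json_record(args):
    """Convert ['name', 'value', ...] into a JSON record string."""
    record = {}
    for name, value in zip(args[0::2], args[1::2]):
        try:
            # '1' becomes the number 1; a schema string becomes a nested object.
            record[name] = json.loads(value)
        except ValueError:
            # Not valid JSON (for example a path), so keep it as a string.
            record[name] = value
    return json.dumps(record)

# Parameters from the first STORE statement above, in string-list form.
args = [
    "index", "1",
    "schema",
    '{"type":"record","name":"X",'
    '"fields":[{"name":"b1","type":"int"},{"name":"b2","type":"int"}]}',
]
json_record = params_to_json_record(args)
print(json_record)
```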

B. How to get rid of unwanted tuple wrappers

REGISTER avro-1.4.0.jar
REGISTER json-simple-1.1.jar
REGISTER piggybank.jar
REGISTER jackson-core-asl-1.5.5.jar
REGISTER jackson-mapper-asl-1.5.5.jar

fs -rmr testOut/testPrimitive4;

in = LOAD 'avro/testInput/case2/part-00000.avro' USING AvroStorage ();

B = GROUP in ALL;
sum = FOREACH B GENERATE COUNT(in) ;

STORE sum INTO 'testOut/testPrimitive4'
USING AvroStorage('schema','"long"');

Note that if no input parameter is specified, the schema of the output is:

{"type":"record","name":"TUPLE",
"fields":[{"name":"FIELD","type":["null","long"]}]}

C. How to use predefined schemas in data files

REGISTER avro-1.4.0.jar
REGISTER json-simple-1.1.jar
REGISTER piggybank.jar
REGISTER jackson-core-asl-1.5.5.jar
REGISTER jackson-mapper-asl-1.5.5.jar

fs -rmr testOut/testFields;

in = LOAD 'inputData'
USING AvroStorage ();

in = FILTER in BY pageNumber > 1;

out = FOREACH in GENERATE impressionDetails.id as id,
impressionDetails.type as type;

STORE out INTO 'testOut/testFields' USING AvroStorage (
'data', 'inputData',
'field0', 'def:impressionDetails.id', /* use field name to look up schema*/
'field1', 'def:ItemType'); /* use type name to look up schema */

Suppose inputData contains Avro data with the following schema:

{  
   "name" : "ImpressionSetEvent",
   "type" : "record",
   "fields" : [
      {  
         "name" : "pageNumber",
         "type" : [ "int", "null" ] 
      },
      {  
         "name" : "impressionDetails",
         "type" : {
            "name" : "ImpressionDetailsRecord",
            "type" : "record",
            "fields" : [
               {  
                  "name" : "id",
                  "type" : "int"
               },
               {  
                  "name" : "type",
                  "type" : {
                     "type" : "enum",
                     "name" : "ItemType",
                     "symbols" : [ "person", "job", "group", "company", "nus",
"news", "ayn" ]
                  }
               },
               {  
                  "name" : "details",
                  "type" : {
                     "type" : "map",
                     "values" : "string"
                  }
               }
            ]
         }
      }
   ]
}

AvroStorage will construct two maps. The first maps type names to schemas:

type name                  schema
ImpressionSetEvent         the whole schema
ImpressionDetailsRecord    {"type":"record", "name":"ImpressionDetailsRecord", "fields":[{"name":"id", "type":"int"}, {"name":"type", "type":{"type":"enum", "name":"ItemType", "symbols":["person", "job", "group", "company", "nus", "news", "ayn"]}}, {"name":"details", "type":{"type":"map", "values":"string"}}]}
ItemType                   {"type":"enum", "name":"ItemType", "symbols":["person", "job", "group", "company", "nus", "news", "ayn"]}

The other maps field names to schemas:

field name                   schema
pageNumber                   ["int", "null"]
impressionDetails            ImpressionDetailsRecord
impressionDetails.id         int
impressionDetails.type       ItemType
impressionDetails.details    {"type":"map","values":"string"}

Users can specify which schema to use by field name (as in field 0) or type name (as in field 1).
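
As a rough illustration of the two maps described above, the following Python sketch walks the example schema and builds a type-name map and a field-name map, then resolves the two `def:` parameters from the STORE statement. This is not AvroStorage's actual code: the function names `build_schema_maps` and `resolve_def` are hypothetical, and looking a `def:` name up among field names before type names is an assumption made for the sketch.

```python
import json

# The example schema from this section, in compact form.
SCHEMA = json.loads("""
{"name": "ImpressionSetEvent", "type": "record", "fields": [
  {"name": "pageNumber", "type": ["int", "null"]},
  {"name": "impressionDetails", "type": {
    "name": "ImpressionDetailsRecord", "type": "record", "fields": [
      {"name": "id", "type": "int"},
      {"name": "type", "type": {"type": "enum", "name": "ItemType",
        "symbols": ["person", "job", "group", "company", "nus", "news", "ayn"]}},
      {"name": "details", "type": {"type": "map", "values": "string"}}
    ]}}
]}
""")

def build_schema_maps(schema, prefix="", type_map=None, field_map=None):
    """Collect named types and dotted field paths from a parsed Avro schema."""
    if type_map is None:
        type_map, field_map = {}, {}
    if isinstance(schema, list):
        # A union: visit each branch.
        for branch in schema:
            build_schema_maps(branch, prefix, type_map, field_map)
    elif isinstance(schema, dict):
        kind = schema.get("type")
        if kind in ("record", "enum", "fixed"):
            # Named types go into the type-name map.
            type_map[schema["name"]] = schema
        if kind == "record":
            for field in schema["fields"]:
                path = prefix + field["name"]
                field_map[path] = field["type"]
                build_schema_maps(field["type"], path + ".", type_map, field_map)
        elif kind == "array":
            build_schema_maps(schema["items"], prefix, type_map, field_map)
        elif kind == "map":
            build_schema_maps(schema["values"], prefix, type_map, field_map)
    return type_map, field_map

def resolve_def(spec, type_map, field_map):
    """Resolve 'def:<name>' by field name first, then by type name (assumed order)."""
    name = spec[len("def:"):] if spec.startswith("def:") else spec
    if name in field_map:
        return field_map[name]
    return type_map[name]

type_map, field_map = build_schema_maps(SCHEMA)
print(sorted(type_map))
print(sorted(field_map))
print(resolve_def("def:impressionDetails.id", type_map, field_map))
print(resolve_def("def:ItemType", type_map, field_map)["type"])
```

Top-level fields keep their bare names, while nested fields are prefixed with the path of the enclosing field, which matches the field names listed in the second map.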

D. How to store results using the schema of an existing Avro file

REGISTER avro-1.4.0.jar
REGISTER json-simple-1.1.jar
REGISTER piggybank.jar
REGISTER jackson-core-asl-1.5.5.jar
REGISTER jackson-mapper-asl-1.5.5.jar

data = LOAD 'input/part-00000.avro' USING AvroStorage ();
ret = FILTER data BY value.member_id > 2000;

STORE ret INTO 'output' USING AvroStorage (
'same', 'input/part-00000.avro');

5. Known Issues

6. Related Work

7. Acknowledgments

This documentation was originally written by Lin Guo, and appeared at http://linkedin.jira.com/wiki/display/HTOOLS/AvroStorage+-+Pig+support+for+Avro+data