We want to use Pig to process arbitrary Avro data and store results as Avro files. This page summarizes how to use AvroStorage, a Pig load/store func for Avro data.
AvroStorage() acts as both a Pig LoadFunc and a Pig StoreFunc, so it can be used to load and store Avro data in Pig scripts.
Due to discrepancies of Avro and Pig data models, AvroStorage has:
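For simplicity, we also make the following assumption: all Avro data files under an input directory, including those in its sub-directories, share the same schema.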
Below is an example Pig script using AvroStorage.
```pig
REGISTER avro-1.4.0.jar;
REGISTER json-simple-1.1.jar;
REGISTER piggybank.jar;
REGISTER jackson-core-asl-1.5.5.jar;
REGISTER jackson-mapper-asl-1.5.5.jar;

fs -rmr $home/testOut/testCnt;

avro = LOAD '$home/avro/TestInput/case1/part-00000.avro'
       USING org.apache.pig.piggybank.storage.avro.AvroStorage();
groups = GROUP avro BY value.member_id;
sc = FOREACH groups GENERATE group AS member_id, COUNT(avro) AS cnt;
STORE sc INTO '$home/testOut/testCnt'
      USING org.apache.pig.piggybank.storage.avro.AvroStorage();
```
Users can provide parameters to AvroStorage when used as a LoadFunc or StoreFunc.
AvroStorage assumes that all Avro files in the sub-directories of an input directory share the same schema, and by default it checks this. The check may take seconds when the input directory contains many sub-directories; users can pass the option "no_schema_check" to disable it.
```pig
-- checks that all sub-directories under /testIn/dir share the same schema (default)
data = LOAD '/testIn/dir' USING org.apache.pig.piggybank.storage.avro.AvroStorage();

-- disables the schema check of sub-directories under /testIn/dir
data = LOAD '/testIn/dir' USING org.apache.pig.piggybank.storage.avro.AvroStorage('no_schema_check');
```
Additionally, if Avro files in different directories have schemas that differ by a column, the first schema is used to read all the other Avro files under the input directory, which can produce undesirable results.
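Checking the same-schema assumption means opening every Avro file under the input directory, which is why it can take seconds on large inputs. The following is a minimal Python sketch of such a check, not AvroStorage's own (Java) code: it reads the writer schema from the header of each Avro object container file (the magic bytes `Obj` followed by byte 1, then a metadata map whose `avro.schema` entry holds the schema JSON, as described in the Avro specification) and compares it with the first schema found. The function names and the sorted, top-down visiting order are assumptions made for illustration.

```python
import json
import os


def _read_long(f):
    """Decode one Avro long: a zig-zag encoded, variable-length integer."""
    shift = accum = 0
    while True:
        byte = f.read(1)
        if not byte:
            raise EOFError("truncated Avro long")
        accum |= (byte[0] & 0x7F) << shift
        if not byte[0] & 0x80:             # high bit clear: last byte of this long
            return (accum >> 1) ^ -(accum & 1)
        shift += 7


def read_avro_schema(path):
    """Return the writer schema stored in the header of an Avro data file."""
    with open(path, "rb") as f:
        if f.read(4) != b"Obj\x01":
            raise ValueError(f"not an Avro object container file: {path}")
        meta = {}
        while True:
            count = _read_long(f)          # header metadata is an Avro map<bytes>
            if count == 0:                 # a zero-count block ends the map
                break
            if count < 0:                  # negative count: a block byte size follows
                count = -count
                _read_long(f)
            for _ in range(count):
                key = f.read(_read_long(f)).decode("utf-8")
                meta[key] = f.read(_read_long(f))
    return json.loads(meta["avro.schema"])


def check_same_schema(input_dir):
    """Compare the schemas of all .avro files under input_dir, recursively.

    Returns (reference_schema, mismatched_paths). The first file visited
    (sorted, top-down walk) supplies the reference schema.
    """
    reference, mismatched = None, []
    for root, dirs, files in os.walk(input_dir):
        dirs.sort()                        # visit sub-directories in a stable order
        for name in sorted(files):
            if not name.endswith(".avro"):
                continue
            path = os.path.join(root, name)
            schema = read_avro_schema(path)
            if reference is None:
                reference = schema
            elif schema != reference:
                mismatched.append(path)
    return reference, mismatched
```

For example, `check_same_schema('/testIn/dir')` would return the schema of the first file plus every path whose schema differs from it; with 'no_schema_check', AvroStorage skips this kind of comparison.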
Users can choose not to pass any parameters to AvroStorage, in which case the Avro schema of the output data is derived from its Pig schema. This may produce undesirable schemas because of discrepancies between the Pig and Avro data models, or because of problems in Pig itself.
We therefore strongly recommend that users provide output schema information using the following options.
Pig results are tuples, which may contain multiple fields. Users can provide parameters that apply to all fields (global parameters) or to specific fields (field parameters).
Compatibility between Pig types (rows) and Avro types (columns):

| Pig type \ Avro type | record | array | map | enum | fixed | bytes | boolean | int | long | float | double | string |
|---|---|---|---|---|---|---|---|---|---|---|---|---|
| tuple | P | P | P | P | P | P | P | P | P | P | P | P |
| bag | | P | | | | | | | | | | |
| map | | | Y | | | | | | | | | |
| chararray | | | | P | | | | | | | | Y |
| bigchararray | | | | P | | | | | | | | Y |
| bytearray | | | | | P | Y | | | | | | |
| boolean | | | | | | | Y | | | | | |
| int | | | | | | | Y | Y | Y | Y | Y | |
| long | | | | | | | | | Y | Y | Y | |
| float | | | | | | | | | | Y | Y | |
| double | | | | | | | | | | | Y | |
| undefined | Y | Y | Y | Y | Y | Y | Y | Y | Y | Y | Y | Y |
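For scripted checks of an output schema, the compatibility table can be transcribed into a lookup. Below is a minimal Python sketch; the names AVRO_TYPES, COMPATIBILITY and compatibility are hypothetical and not part of AvroStorage. The Y/P markers are copied from the table as-is, and empty cells become None.

```python
AVRO_TYPES = ["record", "array", "map", "enum", "fixed", "bytes",
              "boolean", "int", "long", "float", "double", "string"]

# Pig type -> {Avro type: marker}, transcribed from the compatibility table.
# Cells that are empty in the table are simply absent here.
COMPATIBILITY = {
    "tuple": {t: "P" for t in AVRO_TYPES},
    "bag": {"array": "P"},
    "map": {"map": "Y"},
    "chararray": {"enum": "P", "string": "Y"},
    "bigchararray": {"enum": "P", "string": "Y"},
    "bytearray": {"fixed": "P", "bytes": "Y"},
    "boolean": {"boolean": "Y"},
    "int": {"boolean": "Y", "int": "Y", "long": "Y", "float": "Y", "double": "Y"},
    "long": {"long": "Y", "float": "Y", "double": "Y"},
    "float": {"float": "Y", "double": "Y"},
    "double": {"double": "Y"},
    "undefined": {t: "Y" for t in AVRO_TYPES},
}


def compatibility(pig_type, avro_type):
    """Return the table marker ('Y' or 'P') for a Pig/Avro type pair, or None if the cell is empty."""
    if avro_type not in AVRO_TYPES:
        raise ValueError(f"unknown Avro type: {avro_type}")
    return COMPATIBILITY[pig_type].get(avro_type)
```

For instance, `compatibility("long", "int")` returns None because that cell of the table is empty, so a Pig long should not be given an Avro "int" schema, while `compatibility("int", "float")` returns "Y".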
Users can provide schema information for specific fields with the parameter fieldn, where n is the index of the field, starting from 0 (for example, field0 and field1).
Users can also put all of the above parameters in a single JSON record. For instance:
```pig
....
STORE data INTO 'out_location'
      USING org.apache.pig.piggybank.storage.avro.AvroStorage(
          '{ "debug": 5, "index": 2, "schema": {"type":"record","name":"Y", "fields":[{"name":"b1","type":"float"}, {"name":"b2","type":"float"}]} }');
```
Note that users can pass parameters either as a list of strings or as a single JSON record.
```pig
REGISTER avro-1.4.0.jar;
REGISTER json-simple-1.1.jar;
REGISTER piggybank.jar;
REGISTER jackson-core-asl-1.5.5.jar;
REGISTER jackson-mapper-asl-1.5.5.jar;

fs -rmr testOut/testSplit1_1;
fs -rmr testOut/testSplit1_2;

B = LOAD 'testIn/B' AS (b1:int, b2:int);
DUMP B;  /* get (2,4) (8,9) (1,3) */
SPLIT B INTO X IF b1 < 3, Y IF b1 > 7;

-- parameters passed as a list of strings
STORE X INTO 'testOut/testSplit1_1'
      USING org.apache.pig.piggybank.storage.avro.AvroStorage(
          'index', '1',
          'schema', ' {"type":"record","name":"X", "fields":[{"name":"b1","type":"int"}, {"name":"b2","type":"int"}]}');

-- parameters passed as a single JSON record
STORE Y INTO 'testOut/testSplit1_2'
      USING org.apache.pig.piggybank.storage.avro.AvroStorage(
          '{"index": 2, "schema": {"type":"record","name":"Y", "fields": [{"name":"b1","type":"float"}, {"name":"b2","type":"float"} ] } }');
```
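The two parameter styles carry the same information. The following minimal Python sketch shows one way to normalize them into global parameters and field parameters (fieldn, with n counted from 0). The function name is hypothetical, it is an illustration of the calling convention rather than AvroStorage's actual parser, and flag-style arguments such as 'no_schema_check' are deliberately not handled.

```python
import json
import re

# Matches field-parameter keys such as "field0" or "field12".
FIELD_KEY = re.compile(r"field(\d+)")


def parse_avro_storage_args(*args):
    """Normalize AvroStorage-style arguments into (global_params, field_params).

    Accepts either a single JSON record or a flat list of key/value strings.
    Field parameter values are kept verbatim (e.g. 'def:ItemType').
    """
    if len(args) == 1 and args[0].lstrip().startswith("{"):
        params = json.loads(args[0])                   # JSON record form
    else:
        if len(args) % 2:
            raise ValueError("string-list parameters must come in key/value pairs")
        params = dict(zip(args[0::2], args[1::2]))     # string-list form
        if "schema" in params:
            params["schema"] = json.loads(params["schema"])  # the value is JSON text

    global_params, field_params = {}, {}
    for key, value in params.items():
        match = FIELD_KEY.fullmatch(key)
        if match:
            field_params[int(match.group(1))] = value  # field index n starts from 0
        else:
            global_params[key] = value
    return global_params, field_params
```

Applied to the two STORE statements above, both calls yield a "schema" entry holding a parsed record schema; the only difference is that the string-list form leaves "index" as the string '1', while the JSON form gives the number 2.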
```pig
REGISTER avro-1.4.0.jar;
REGISTER json-simple-1.1.jar;
REGISTER piggybank.jar;
REGISTER jackson-core-asl-1.5.5.jar;
REGISTER jackson-mapper-asl-1.5.5.jar;

fs -rmr testOut/testPrimitive4;

records = LOAD 'avro/testInput/case2/part-00000.avro'
          USING org.apache.pig.piggybank.storage.avro.AvroStorage();
B = GROUP records ALL;
sum = FOREACH B GENERATE COUNT(records);
-- store the single count with a primitive "long" output schema
STORE sum INTO 'testOut/testPrimitive4'
      USING org.apache.pig.piggybank.storage.avro.AvroStorage('schema', '"long"');
```
Note that if no parameter is specified, the schema of the output data is:
```json
{"type": "record", "name": "TUPLE",
 "fields": [{"name": "FIELD", "type": ["null", "long"]}]}
```
```pig
REGISTER avro-1.4.0.jar;
REGISTER json-simple-1.1.jar;
REGISTER piggybank.jar;
REGISTER jackson-core-asl-1.5.5.jar;
REGISTER jackson-mapper-asl-1.5.5.jar;

fs -rmr testOut/testFields;

records = LOAD 'inputData' USING org.apache.pig.piggybank.storage.avro.AvroStorage();
records = FILTER records BY pageNumber > 1;
out = FOREACH records GENERATE impressionDetails.id AS id, impressionDetails.type AS type;
STORE out INTO 'testOut/testFields'
      USING org.apache.pig.piggybank.storage.avro.AvroStorage(
          'data', 'inputData',                   /* Avro data whose schema is used for lookups */
          'field0', 'def:impressionDetails.id',  /* use field name to look up schema */
          'field1', 'def:ItemType');             /* use type name to look up schema */
```
Suppose inputData contains Avro data with the following schema:
```json
{
  "name": "ImpressionSetEvent",
  "type": "record",
  "fields": [
    {"name": "pageNumber", "type": ["int", "null"]},
    {
      "name": "impressionDetails",
      "type": {
        "name": "ImpressionDetailsRecord",
        "type": "record",
        "fields": [
          {"name": "id", "type": "int"},
          {
            "name": "type",
            "type": {
              "type": "enum",
              "name": "ItemType",
              "symbols": ["person", "job", "group", "company", "nus", "news", "ayn"]
            }
          },
          {"name": "details", "type": {"type": "map", "values": "string"}}
        ]
      }
    }
  ]
}
```
AvroStorage constructs two maps from this schema. The first maps type names to schemas:
| Type name | Schema |
|---|---|
| ImpressionSetEvent | the whole schema |
| ImpressionDetailsRecord | {"type": "record", "name": "ImpressionDetailsRecord", "fields": [{"name": "id", "type": "int"}, {"name": "type", "type": {"type": "enum", "name": "ItemType", "symbols": ["person", "job", "group", "company", "nus", "news", "ayn"]}}, {"name": "details", "type": {"type": "map", "values": "string"}}]} |
| ItemType | {"type": "enum", "name": "ItemType", "symbols": ["person", "job", "group", "company", "nus", "news", "ayn"]} |
The second maps field names to schemas:
| Field name | Schema |
|---|---|
| pageNumber | ["int", "null"] |
| impressionDetails | ImpressionDetailsRecord |
| impressionDetails.id | int |
| impressionDetails.type | ItemType |
| impressionDetails.details | {"type": "map", "values": "string"} |
Users can thus specify which schema to use either by field name, as for field0 ('def:impressionDetails.id'), or by type name, as for field1 ('def:ItemType').
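Both lookup maps can be built with a single walk over the schema. Below is a minimal Python sketch under stated assumptions: the schema is already parsed JSON, Avro namespaces are ignored, and resolve_def tries field names before type names (AvroStorage's actual precedence may differ). Both function names are hypothetical.

```python
NAMED_TYPES = ("record", "enum", "fixed")


def build_schema_maps(schema):
    """Return (type_map, field_map) for an Avro schema given as parsed JSON."""
    type_map, field_map = {}, {}

    def visit(node, path):
        if isinstance(node, list):                # union: visit every branch
            for branch in node:
                visit(branch, path)
            return
        if not isinstance(node, dict):            # primitive name or named-type reference
            return
        if node.get("type") in NAMED_TYPES:
            type_map[node["name"]] = node         # type name -> schema
        if node.get("type") == "record":
            for field in node["fields"]:
                field_path = f"{path}.{field['name']}" if path else field["name"]
                field_map[field_path] = field["type"]  # dotted field name -> schema
                visit(field["type"], field_path)
        elif node.get("type") == "array":
            visit(node["items"], path)
        elif node.get("type") == "map":
            visit(node["values"], path)

    visit(schema, "")
    return type_map, field_map


def resolve_def(spec, type_map, field_map):
    """Resolve a 'def:<name>' value, trying field names first (an assumption)."""
    if not spec.startswith("def:"):
        raise ValueError("expected a value of the form 'def:<name>'")
    name = spec[len("def:"):]
    if name in field_map:
        return field_map[name]
    if name in type_map:
        return type_map[name]
    raise KeyError(name)
```

Run on the ImpressionSetEvent schema, this yields the same keys as the two tables: field0's 'def:impressionDetails.id' resolves to "int" through the field map, and field1's 'def:ItemType' resolves to the enum schema through the type map.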
```pig
REGISTER avro-1.4.0.jar;
REGISTER json-simple-1.1.jar;
REGISTER piggybank.jar;
REGISTER jackson-core-asl-1.5.5.jar;
REGISTER jackson-mapper-asl-1.5.5.jar;

data = LOAD 'input/part-00000.avro' USING org.apache.pig.piggybank.storage.avro.AvroStorage();
ret = FILTER data BY value.member_id > 2000;
-- 'same': use the schema of input/part-00000.avro as the output schema
STORE ret INTO 'output'
      USING org.apache.pig.piggybank.storage.avro.AvroStorage('same', 'input/part-00000.avro');
```
```pig
A = LOAD 'complex.txt' USING PigStorage('\t')
    AS (mymap:map[chararray],
        mytuple:(num:int, str:chararray, dbl:double),
        bagofmap:{t:(m:map[chararray])},
        rownum:int);
DESCRIBE A;
-- no parameters: the Avro output schema is derived from the Pig schema of A
STORE A INTO 'avro_complex.out' USING org.apache.pig.piggybank.storage.avro.AvroStorage();
```
This documentation was originally written by Lin Guo, and appeared at http://linkedin.jira.com/wiki/display/HTOOLS/AvroStorage+-+Pig+support+for+Avro+data