Adding User Specified Ordering to Pig


This document proposes changing Pig to allow users to specify their own comparison function in an ORDER BY statement. Users whose data is not scalar, or needs to be ordered in a special way, will then be able to accomplish that ordering in Pig. In the short term it will also allow users to do numeric and descending ordering.

Syntax Changes

A class that contains an ordering function provided by the user will need to be part of a jar that is registered with Pig using REGISTER, in the same way that evaluation functions currently are. The ORDER BY clause in Pig will then change to allow a user to specify the class containing the function (not the function itself):

ORDER BY keys [ USING class ];

Here class is the fully qualified class name (package and class name). For example:

a = LOAD 'myfile' as name, address, zipcode;
b = ORDER a BY zipcode USING com.mycompany.myproject.MyOrderByClass;

Specification for the Ordering Class

The ordering class provided by the user will need to implement the interface java.util.Comparator for Tuple. The examples in this document do so by extending ComparisonFunc:

class MyOrderByClass extends ComparisonFunc {

    public int compare(Tuple t1, Tuple t2) { ... }

}



Example 1

This code implements a string-based comparator that produces the data in descending order.

package org.apache.pig.test;

import org.apache.pig.ComparisonFunc;
import org.apache.pig.data.Tuple;

public class OrdDesc extends ComparisonFunc {
    // This is a simple example. A more complex comparison will need to
    // break out the individual values; since compare cannot throw a
    // checked exception, any IOException raised while doing so must be
    // caught and rethrown as new RuntimeException("msg", e).
    public int compare(Tuple t1, Tuple t2) {
        // Reversing the operands yields descending order.
        return t2.compareTo(t1);
    }
}

Example 2

This code implements a numeric comparator that produces the data in descending order. Note that you need a recursive compare to account for arbitrarily complex tuples.

package org.apache.pig.test;

import java.io.IOException;

import org.apache.pig.ComparisonFunc;
import org.apache.pig.data.DataAtom;
import org.apache.pig.data.Datum;
import org.apache.pig.data.Tuple;

public class OrdDescNumeric extends ComparisonFunc {
    public int compare(Tuple t1, Tuple t2) {
        try {
            // Compare field by field; the first difference decides.
            for (int i = 0; i < t1.arity(); i++) {
                Datum d1 = t1.getField(i);
                Datum d2 = t2.getField(i);
                int comp;
                if (d1 instanceof DataAtom) {
                    comp = compare((DataAtom)d1, (DataAtom)d2);
                } else {
                    // Nested tuple: recurse.
                    comp = compare((Tuple)d1, (Tuple)d2);
                }
                if (comp != 0) {
                    return comp;
                }
            }
            return 0;
        } catch (IOException e) {
            throw new RuntimeException("Error comparing keys in OrdDescNumeric", e);
        }
    }

    // Numeric comparison with the operands reversed, giving descending order.
    private int compare(DataAtom a1, DataAtom a2) throws IOException {
        double num1 = a1.numval();
        double num2 = a2.numval();
        if (num2 > num1) {
            return 1;
        } else if (num2 < num1) {
            return -1;
        }
        return 0;
    }
}


The comparator provided by the user will be applied to the whole ORDER BY key. This was chosen for two reasons:

  1. Providing a separate comparator for every element in the key would make the syntax burdensome.
  2. This mechanism is not intended to provide comparators for user-defined types. Those should be provided as part of the type instead. This lessens the need for per-key comparators.

Logical and Physical Plan Changes

When an ORDER BY clause is encountered in a query, a ProjectSpec is created and passed to the SortDistinctSpec that controls how a sort is done. ProjectSpec is a subclass of EvalSpec. EvalSpec includes a function getComparator that returns a java.util.Comparator object. Currently EvalSpec hard-wires the comparator that this function returns.

EvalSpec will be changed to include a new member Comparator<Tuple> comparator and a new method void setComparator(Comparator<Tuple> comparator). The existing anonymous class currently returned by EvalSpec.getComparator() will be retained, and will be the default value for EvalSpec.comparator. If the user specifies a comparator in the ORDER BY clause, then the parser will call EvalSpec.setComparator on the ProjectSpec for the associated SortDistinctSpec.
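The shape of this change can be sketched as follows. This is a minimal illustration only: EvalSpecSketch is a hypothetical stand-in for EvalSpec, and it is generic in the key type so that it compiles without Pig on the classpath (in Pig the key type would be Tuple).

```java
import java.util.Comparator;

// Hypothetical stand-in for Pig's EvalSpec; not the real class.
class EvalSpecSketch<K extends Comparable<K>> {
    // New member proposed above. Its initial value stands in for the
    // anonymous class that EvalSpec.getComparator() returns today.
    private Comparator<K> comparator = new Comparator<K>() {
        public int compare(K a, K b) {
            return a.compareTo(b);
        }
    };

    // New method proposed above; the parser would call it when the
    // ORDER BY clause carries a USING class.
    public void setComparator(Comparator<K> comparator) {
        this.comparator = comparator;
    }

    // Existing accessor: returns the default unless one was set.
    public Comparator<K> getComparator() {
        return comparator;
    }
}
```

Callers that only ever use getComparator() need no changes under this arrangement, since the default behaviour is preserved until setComparator is called.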

These changes will handle nested sorts (that is, those inside a foreach).

For top level sorts, a few more changes need to be made:

First, SortPartitioner, which is used to determine which partition a key is in, needs to change to use the designated comparator rather than the default. In the function getPartition it calls Arrays.binarySearch(Object[], Object). This needs to be changed to use the generic version that accepts a comparator, Arrays.binarySearch(T[], T, Comparator<? super T>). The comparator to use will be recorded in the JobConf passed to configure, and can be obtained from it by calling getOutputKeyComparatorClass.
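A rough sketch of the comparator-aware lookup is shown here. PartitionLookupSketch and its parameter names are hypothetical, and String keys stand in for Tuple so that the code runs without Pig or Hadoop. It assumes the quantile boundaries are already sorted with the same comparator that is passed in.

```java
import java.util.Arrays;
import java.util.Comparator;

// Hypothetical stand-in for the lookup inside SortPartitioner.getPartition.
class PartitionLookupSketch {
    // quantiles must be sorted according to cmp, otherwise the result of
    // binarySearch is undefined.
    static int getPartition(String[] quantiles, String key,
                            Comparator<String> cmp, int numPartitions) {
        int index = Arrays.binarySearch(quantiles, key, cmp);
        if (index < 0) {
            // Key not found: binarySearch returned -(insertionPoint) - 1.
            index = -index - 1;
        }
        // Guard against more boundaries than partitions.
        return Math.min(index, numPartitions - 1);
    }
}
```

With descending boundaries such as {"m", "g"} and a reversed comparator, larger keys land in the lower-numbered partitions, which the default comparator could not do on the same boundary array.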

A member, userComparator of type Comparator, will need to be added to POMapReduce.

In MapReduceLauncher.launchPig, a check needs to be made to see whether POMapReduce.userComparator is non-null. If so, then in addition to calling JobConf.setOutputKeyClass it will also need to call JobConf.setOutputKeyComparatorClass and provide the user-specified comparator. (NOTE: Need to check with Hadoop people and make sure I have this right.)

MapreducePlanCompiler.getQuantileJob will need to change to set userComparator for the POMapReduce object it constructs for the quantiles job (currently named quantileJob). The method getSortJob will need to change in the same way, setting userComparator for the sortJob POMapReduce object. Both of these methods can obtain the proper comparator by calling getSortSpec().getComparator() on the passed-in loSort argument. They must not blindly copy the result into userComparator: they need to check whether getComparator is returning the default comparator or a user-provided one, and set userComparator only in the latter case.
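One possible way to make that check is an identity comparison against the default comparator, as sketched here. This assumes the default is a single shared instance, which the proposal does not specify; UserComparatorCheckSketch, DEFAULT_COMPARATOR and toUserComparator are hypothetical names, and String keys stand in for Tuple.

```java
import java.util.Comparator;

// Hypothetical helper; not part of Pig.
class UserComparatorCheckSketch {
    // Stands in for the default comparator returned when the query has
    // no USING clause (assumed here to be one shared instance).
    static final Comparator<String> DEFAULT_COMPARATOR = new Comparator<String>() {
        public int compare(String a, String b) {
            return a.compareTo(b);
        }
    };

    // Value to assign to POMapReduce.userComparator: null means the
    // launcher leaves the job's key comparator untouched.
    static Comparator<String> toUserComparator(Comparator<String> fromSortSpec) {
        return fromSortSpec == DEFAULT_COMPARATOR ? null : fromSortSpec;
    }
}
```

If the default comparator ends up being created per EvalSpec instance instead, an explicit flag set by setComparator would be needed in place of the identity test.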

Future Considerations

In the future, it is proposed that ORDER BY will allow ascending and descending order per key, in a SQL-like manner (see ProposedRoadMap). This will not work well with a user-provided comparator, because that comparator works on the entire key. If ascending/descending is added in the future, the parser should also be modified to make it an error to use these keywords together with a user-provided comparator.

UserDefinedOrdering (last edited 2009-09-20 23:38:10 by localhost)