Reporting in Pig

Hadoop assumes that a task which does not report progress may be stuck, and kills it after a timeout. To carry this over to Pig's Map-Reduce layer, where we execute operator plans, we need to integrate progress reporting into the Pig operators. To support this, we propose a reporter interface that any backend in Pig can use. For now, the only thing a reporter has to do is report progress, optionally with a status message, so the following interface should be enough. Other methods can be added as the need arises.

public interface PigProgressable {
    // Use this just to signal that you are alive
    public void progress();
    
    //If you have a status to report
    public void progress(String msg);
}
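
As a minimal sketch of what an implementation of this interface could look like, the class below just records the calls it receives. The name RecordingReporter and its accessor methods are hypothetical and are not part of the proposal; something like this might be handy for unit-testing operators without a running Hadoop task.

```java
// The interface as proposed above, repeated so the sketch compiles on its own.
interface PigProgressable {
    void progress();
    void progress(String msg);
}

// Hypothetical implementation (not part of the proposal): it records
// calls instead of forwarding them to a real backend.
class RecordingReporter implements PigProgressable {
    private int calls = 0;
    private String lastStatus = null;

    @Override
    public void progress() {
        calls++;
    }

    @Override
    public void progress(String msg) {
        calls++;
        lastStatus = msg;
    }

    // Number of progress notifications received so far.
    int getCalls() {
        return calls;
    }

    // Most recent status message, or null if none was reported.
    String getLastStatus() {
        return lastStatus;
    }
}
```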

Changes to current code

In the mapReduceLayer, we have to implement this interface with a class that wraps the Hadoop reporter. To minimize code changes, I plan to add a static variable to the PhysicalOperator class, which the map and reduce functions will set when they start. The processInput method would also call PhysicalOperator.reporter.progress(), so operators that go through processInput need not worry about reporting. Only operators that have a special processing model because of multiple inputs need to change: each must call the progress method of this static variable as soon as it starts processing a tuple. For example, in POUnion, the current code would be changed as follows:

Current:

while (true) {
    if (done.nextClearBit(0) >= inputs.size()) {
        res = new Result();
        res.returnStatus = POStatus.STATUS_EOP;
        clearDone();
        return res;
    }
...

Changed to:

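while (true) {
    // Report progress at the start of each iteration so that Hadoop
    // does not consider the task stuck.
    PhysicalOperator.reporter.progress();
    if (done.nextClearBit(0) >= inputs.size()) {
        res = new Result();
        res.returnStatus = POStatus.STATUS_EOP;
        clearDone();
        return res;
    }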
...
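
The wiring described in this section can be sketched roughly as follows. This is an illustration under stated assumptions, not the actual patch: HadoopReporterStandIn is a hypothetical stand-in for the two methods of Hadoop's Reporter that are used here (progress and setStatus), and ProgressableReporter and PhysicalOperatorSketch are made-up names, so that the example compiles without Hadoop or Pig on the classpath.

```java
// The proposed interface, repeated so the sketch compiles on its own.
interface PigProgressable {
    void progress();
    void progress(String msg);
}

// Hypothetical stand-in for the parts of Hadoop's Reporter used below.
interface HadoopReporterStandIn {
    void progress();
    void setStatus(String status);
}

// Wraps the Hadoop reporter behind the Pig interface.
class ProgressableReporter implements PigProgressable {
    private final HadoopReporterStandIn rep;

    ProgressableReporter(HadoopReporterStandIn rep) {
        this.rep = rep;
    }

    @Override
    public void progress() {
        rep.progress();
    }

    @Override
    public void progress(String msg) {
        // Publish the status text, then signal that the task is alive.
        rep.setStatus(msg);
        rep.progress();
    }
}

// Simplified stand-in for PhysicalOperator: only the static reporter
// and the reporting call inside processInput are shown.
class PhysicalOperatorSketch {
    // Set once by the map or reduce function before any tuple is processed.
    static PigProgressable reporter;

    void processInput() {
        if (reporter != null) {
            reporter.progress();
        }
        // Fetching the actual input is omitted in this sketch.
    }
}
```

With this shape, the map and reduce functions would assign the static reporter once at the start, and every operator that goes through processInput reports progress without further changes. The null check is an assumption added here so that operators can also run where no reporter has been set.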