Reporting in Pig
Hadoop assumes that a task that does not report progress may be stuck, and kills it after a timeout. To carry this over to Pig's Map Reduce layer, where we execute operator plans, we need to integrate reporting into the Pig operators. To support this, we propose a reporter interface that any backend in Pig can use. For now, the main thing a reporter needs is a way to report progress, optionally with a status message, so the following should be enough. We can add other things as the need arises.
public interface PigProgressable {
    // Use to just inform that you are alive
    public void progress();

    // If you have a status to report
    public void progress(String msg);
}
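As a rough illustration of how small an implementation of this interface can be, the sketch below shows a reporter that only records what it is told, which might suit a backend with no timeout of its own or a unit test. `RecordingReporter` and its getters are hypothetical names introduced for this example, not part of the proposal; the interface is repeated only so that the snippet compiles on its own.

```java
// The proposed interface, repeated so this sketch compiles on its own.
interface PigProgressable {
    void progress();
    void progress(String msg);
}

// Hypothetical implementation (not part of the proposal): it counts
// progress calls and remembers the most recent status message.
class RecordingReporter implements PigProgressable {
    private int calls = 0;
    private String lastMessage = null;

    @Override
    public void progress() {
        calls++;
    }

    @Override
    public void progress(String msg) {
        calls++;
        lastMessage = msg;
    }

    int getCalls() {
        return calls;
    }

    String getLastMessage() {
        return lastMessage;
    }
}
```

A caller would invoke `progress()` once per unit of work and `progress(String)` whenever there is a status worth showing.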
Changes to current code
In the mapReduceLayer, we have to implement this interface with a class that wraps the Hadoop reporter. To minimize code changes, I plan to add a static variable to the PhysicalOperator class, which the map and reduce functions will set when they start. The processInput method would also call PhysicalOperator.reporter.progress(), so operators that go through processInput need not worry about reporting. Only operators that have a special processing model because of multiple inputs would need changes: each adds a call to the progress method of this static variable as soon as it starts processing a tuple. For example, in POUnion, the current code would be changed as follows:
Current:
while(true){
    if (done.nextClearBit(0) >= inputs.size()) {
        res = new Result();
        res.returnStatus = POStatus.STATUS_EOP;
        clearDone();
        return res;
    }
    ...
Changed to:
while(true){
    PhysicalOperator.reporter.progress();
    if (done.nextClearBit(0) >= inputs.size()) {
        res = new Result();
        res.returnStatus = POStatus.STATUS_EOP;
        clearDone();
        return res;
    }
    ...
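The wiring described in this section (a wrapper around the Hadoop reporter, a static variable on the operator base class, and a progress call inside processInput) could look roughly like the sketch below. All names other than `PigProgressable` are assumptions made for illustration: `BackendReporter` is a stand-in for the Hadoop reporter so the snippet compiles without Hadoop on the classpath, `ProgressableReporter` is a hypothetical name for the wrapper, and `PhysicalOperatorSketch` models only the reporting hook of PhysicalOperator, not the real class.

```java
// The proposed interface, repeated so this sketch compiles on its own.
interface PigProgressable {
    void progress();
    void progress(String msg);
}

// Stand-in for the Hadoop reporter (assumption: in the real code this
// would be the reporter object handed to the map and reduce functions).
interface BackendReporter {
    void progress();
    void setStatus(String status);
}

// Hypothetical wrapper living in the map-reduce layer.
class ProgressableReporter implements PigProgressable {
    private final BackendReporter delegate;

    ProgressableReporter(BackendReporter delegate) {
        this.delegate = delegate;
    }

    @Override
    public void progress() {
        delegate.progress();
    }

    @Override
    public void progress(String msg) {
        // Publish the status text, then signal liveness.
        delegate.setStatus(msg);
        delegate.progress();
    }
}

// Simplified stand-in for PhysicalOperator: only the reporting hook.
abstract class PhysicalOperatorSketch {
    // Set once by the map or reduce function before any tuple is processed.
    static PigProgressable reporter;

    public void processInput() {
        // The null guard is an addition of this sketch, so that operators
        // can also run where no reporter was set.
        if (reporter != null) {
            reporter.progress();
        }
        // ... fetching the next input tuple would follow here
    }
}
```

With this arrangement, the map or reduce function would assign `PhysicalOperatorSketch.reporter = new ProgressableReporter(...)` once at the start, and every operator that pulls its input through `processInput` reports progress without further changes. A static field keeps the change small, at the cost of sharing one reporter across all operators in the JVM.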