Error handling in Pig scripts

The current behavior of Pig when a UDF throws an exception is to fail and stop processing. We want to extend this behavior to give users finer-grained control over error handling.

Depending on the use case, users would like to have several options for how errors are handled.

Jira

https://issues.apache.org/jira/browse/PIG-2620

Proposal

The proposal is to add an ONERROR keyword ("on error", following the existing naming conventions: http://pig.apache.org/docs/r0.8.0/piglatin_ref2.html#Reserved+Keywords). We are still looking for a good keyword: ON_ERROR (underscores are not used so far) vs ONERROR (harder to read) vs ON ERROR (ON is not a keyword so far) vs ...

<relation> = <Pig statement ...> ONERROR <optional handler> [SPLIT INTO <relation> ...]

Usage

DEFAULT ONERROR <error handler>;

public interface ErrorHandler<T> {

// input is not the input of the UDF, it's the tuple from the relation 
T handle(IOException ioe, EvalFunc evalFunc, Tuple input) throws IOException;

Schema outputSchema(Schema input);

// called afterwards on the client side 
void collectResult() throws IOException;

}
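
For illustration, here is a minimal sketch of what the Ignore() handler used in the examples below might look like against this proposed interface; the details (returning null, leaving the schema untouched, a no-op collectResult()) are assumptions for illustration, not part of the proposal.

import java.io.IOException;

import org.apache.pig.EvalFunc;
import org.apache.pig.data.Tuple;
import org.apache.pig.impl.logicalLayer.schema.Schema;

// Hypothetical implementation of the proposed ErrorHandler interface:
// swallow the failure and keep going.
public class Ignore implements ErrorHandler<Object> {

  // what handle() should return is still under discussion (see Alan's question 5 below);
  // here it returns null, i.e. nothing is added for the failed record
  public Object handle(IOException ioe, EvalFunc evalFunc, Tuple input) throws IOException {
    return null;
  }

  // the handler does not change the schema (see Alan's question 8 below)
  public Schema outputSchema(Schema input) {
    return input;
  }

  // nothing to report back on the client side
  public void collectResult() throws IOException {
  }

}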

Examples

DEFAULT ONERROR Ignore(); ...

DESCRIBE A; 
A: {name: chararray, age: int, gpa: float}

-- fail if more than 1% of records cause errors
B1 = FOREACH A GENERATE Foo(age, gpa), Bar(name) ONERROR FailOnThreshold(0.01) ;

-- custom handler that counts errors and logs on the client side
C1 = FOREACH A GENERATE Foo(age, gpa), Bar(name) ONERROR CountMyErrors() ;

-- uses default handler and SPLIT
B2 = FOREACH A GENERATE Foo(age, gpa), Bar(name) ONERROR SPLIT INTO B2_ERRORS;

-- B2_ERRORS cannot contain the input to the UDF, as it would have a different schema depending on which UDF failed
DESCRIBE B2_ERRORS; 
B2_ERRORS: {input: (name: chararray, age: int, gpa: float), udf: chararray, error:(class: chararray, message: chararray, stacktrace: chararray) }

-- example of filtering on the udf
C2 = FOREACH A GENERATE Foo(age, gpa), Bar(name) ONERROR SPLIT INTO C2_FOO_ERRORS IF udf='Foo', C2_BAR_ERRORS IF udf='Bar';

-- uses handler and SPLIT 
A3 = FOREACH A GENERATE Foo(age, gpa), Bar(name) ONERROR HandleItMyWay() SPLIT INTO A3_ERRORS;
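
For illustration, the CountMyErrors() handler from example C1 above might be sketched as follows against the proposed interface. The counting field and the stderr logging are assumptions; how counts collected on the tasks actually reach the client in collectResult() is one of the open questions discussed below.

import java.io.IOException;

import org.apache.pig.EvalFunc;
import org.apache.pig.data.Tuple;
import org.apache.pig.impl.logicalLayer.schema.Schema;

// Hypothetical sketch of a handler that counts errors and reports them on the client side.
public class CountMyErrors implements ErrorHandler<Object> {

  private long errors = 0;

  // count the failure; returning null means nothing is added for the failed record
  public Object handle(IOException ioe, EvalFunc evalFunc, Tuple input) throws IOException {
    errors++;
    return null;
  }

  // the handler does not change the schema
  public Schema outputSchema(Schema input) {
    return input;
  }

  // called afterwards on the client side; how the per-task counts get here
  // is still an open question (see Alan's question 7 below)
  public void collectResult() throws IOException {
    System.err.println("CountMyErrors: " + errors + " records failed");
  }

}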

Questions and Comments

Alan, 1/21/11:

I really like the proposal. I have a number of questions and comments:

  1. I'm confused whether you envision this working just for foreach with UDFs or for all Pig statements. The syntax definition states that it works with all Pig statements. But your opening sentence, examples, and interface indicate it only works with foreach and UDFs.
  2. I'm confused on how the split works. Can I both pass something to an error handler and split it into another relation, or must I choose one or the other?
  3. The split case will take a lot more effort to implement because you'll have to manage dependencies in the graph. For example:

    A = load 'bla';
    B = foreach A generate myudf(*) onerror split into B_error;
    C = load 'morebla';
    D = join C by x, B_error by x;
    Now the logical plan has to build a dependency from D to B even though B isn't in the join. That will be a lot of work in the plan and the optimizer to get right. You may want to start without this feature so you can get this out sooner.
  4. I suspect that the dominant use case for split will be to store the errors into a separate file. To make that easier you might want to provide a Store error handler so that users don't need two statements (onerror split followed by store) for this very common case.
  5. Why is ErrorHandler a generic interface? What do you envision handle() returning?

  6. Assuming that the answer to 1 above is that this is for UDFs, this interface is at the wrong level. You're giving the user a chance to set the error handling at the statement level. But what if I have five UDFs in my statement and I want to handle errors for each of them differently? If you want to provide error handling for UDFs, it would be better to allow users to define the error handling in a define statement, like define myudf org.mycompany.myudf onerror Ignore;. If you want it at a statement level, then why limit it to UDFs? It would be nice to catch things like division by 0, etc.

  7. How do you envision collectResult() getting back information from the tasks? There's no general purpose channel for information flow from the tasks to the client. There are counters, but in recent versions MapReduce has put fairly tight lids on how many counters per job you can have (like 50 or 100 or something like that), and Pig is already using at least half of what they give us for things like input/output record counts, etc. The other way is to stash info in HDFS and read it back.

  8. When is ErrorHandler.outputSchema() called?

  9. Why is handle() passed the EvalFunc? Do you expect it to interact with the eval func in any way other than getting its name?

  10. The way to set defaults in Pig Latin is via set, so instead of default onerror Ignore() we'll want set default_onerror = Ignore.

Julien, 1/21/11: Answers to Alan

  1. Yes, it should be for all Pig statements. We need more examples here. For now I would say that it is only to handle exceptions from UDFs, as exceptions caused by Pig operators should be fatal. Unless there are some cases I am not aware of; please comment if so.
  2. The SPLIT is for saving the tuples that caused errors, along with the error message, in a different relation. The handler has the ability to add more information to the error tuple. Possibly this is overkill and we should simplify so that the handler is just notified and does not produce data, while SPLIT can still save the data in a different relation.
  3. Yes: then D depends on B. I agree, we may want to have 2 different JIRAs for that. We could see your example rewritten as follows:

    DEFINE udf onError("myudf")
    A = load 'bla';
    temp_B = foreach A generate udf(*);
    SPLIT temp_B INTO B IF code = 0, B_ERROR IF code = 1;
    B = FOREACH B GENERATE result;
    B_ERROR = FOREACH B_ERROR GENERATE result;
    
    C = load 'morebla';
    D = join C by x, B_error by x;

    onError is a wrapper UDF that looks roughly like this (simplified sketch, for illustration purposes):

      public Object exec(Tuple tuple) throws IOException {
          // uses org.apache.pig.data.TupleFactory and java.util.Arrays
          TupleFactory tf = TupleFactory.getInstance();
          try {
              Object result = delegateUDF.exec(tuple);
              // code 0 marks success; the second field carries the delegate's result
              return tf.newTuple(Arrays.asList(0, result));
          } catch (Exception e) {
              // code 1 marks failure; the second field carries the UDF name, the error message and the input
              return tf.newTuple(Arrays.asList(1,
                      tf.newTuple(Arrays.asList(delegateUDF.getClass().getName(), e.getMessage(), tuple))));
          }
      }
      // outputSchema = (code: int, result: inputSchema)
  4. For example, something like this? ... ONERROR STORE INTO 'errors'

  5. That was so the error handler has the ability to add information to the error relation (it was similar to the EvalFunc<T> definition), but I'm having second thoughts. I propose to remove this.

  6. That's a good point. In that case the handler probably needs the name of the relation as a parameter as well. So do we have the handler in DEFINE or in a STATEMENT in general? ONERROR is like a catch block for Pig. If it is defined in a statement then it is not the same thing anymore.
  7. Yes, this is going to require more thoughts.
  8. Same answer as 5.
  9. I think Dmitriy was mentioning that could be useful if the eval func has some state.
  10. OK

Olga, 1/21/11

Glad we are discussing this! A couple of questions/comments and follow ups on discussion with Alan:

  1. Is the error handler a property of a UDF, an operator, or both? If it is a property of the UDF, then perhaps it can be attached to the UDF via the DEFINE statement rather than to the Pig statement itself.
  2. How does FailOnThreshold work? You will still have to run the job to completion since there is no way to get a continuous global count, right?

  3. How does this interact with error handling in Pig? In Pig we also have a way to generate and aggregate warnings. Should UDFs be allowed to do the same and, rather than FailOnError, just be able to report an aggregated warning count so that users can decide what to do?

  4. Related to 3, UDFs can also choose to patch invalid records with NULL values. Currently, they have no way to communicate that back other than printing warnings to stderr. Perhaps this case should also be addressed in this proposal.

Julien, 1/23/11: Answers to Olga

  1. The definition could be at multiple levels. The handle method would become:

       void handle(String relation, Exception e, EvalFunc udf, Tuple inputTuple /* of the UDF */) throws IOException;
       void collectResult(String relation) throws IOException;
    The handling would still be per statement. The handler decides whether we continue or fail, and there is an option to filter invalid inputs. The use of ON ERROR SPLIT would still have the same side effects. There would be three levels: a global level (default) that applies to all statements unless overridden; a UDF level, to handle exceptions of a specific UDF; and a statement level, to handle all exceptions at the statement level (including division by zero, ...). A consolidated sketch of the revised interface is given after this list.
  2. Right. Possibly it fails one or two relations later if several relations are combined in the same job.
  3. I would say that the existing aggregate warnings become the default implementation of the handler. That's a good idea.
  4. Agreed.
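
Putting the answers above together (dropping the generic return type and outputSchema per answers 5 and 8 to Alan, and adding the relation name per answer 1 to Olga), the revised interface might look like the sketch below. This is an illustration of one possible consolidation, not a settled API.

import java.io.IOException;

import org.apache.pig.EvalFunc;
import org.apache.pig.data.Tuple;

// Sketch of a revised, non-generic handler interface (illustration only).
public interface ErrorHandler {

  // called on the task side for each failure; inputTuple is the input of the UDF
  void handle(String relation, Exception e, EvalFunc udf, Tuple inputTuple) throws IOException;

  // called afterwards on the client side, once per relation
  void collectResult(String relation) throws IOException;

}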
