Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

While implementing several connectors on top of FLIP-171: Async Sink, we encountered a couple of issues such as:

  • FLINK-29854: Forcing implementors to use the record size trigger even if it is not supported.
  • FLINK-34071: The lack of a timeout option for Async Sink records might cause a deadlock.

Additionally, the current interface for passing fatal exceptions and retrying records relies on Java Consumers, which makes it harder to understand. While adding an adjustable trigger mechanism is addressed in FLIP-284: Making AsyncSinkWriter Flush triggers adjustable, this FLIP addresses the other issues listed.

Proposed Changes

We propose introducing a new org.apache.flink.connector.base.sink.writer.ResultHandler, inspired by org.apache.flink.streaming.api.functions.async.ResultFuture, which provides the following API:

...

Code Block (java): AsyncSink code changes
@PublicEvolving
public abstract class AsyncSinkWriter<InputT, RequestEntryT extends Serializable>
        implements StatefulSink.StatefulSinkWriter<InputT, BufferedRequestState<RequestEntryT>> {
    ....
    private void flush() throws InterruptedException {
        // create batch
        ResultHandler<RequestEntryT> handler = new ResultHandler<>() {
            // Timer that triggers timeout() if the request does not finish in time.
            ScheduledFuture<?> timeoutTimer = registerRequestTimeout(sendTime);
            List<RequestEntryT> requestEntries = batch;
            // Ensures that only the first of complete() / timeout() takes effect.
            AtomicBoolean closed = new AtomicBoolean(false);

            @Override
            public void complete() {
                if (closed.compareAndSet(false, true)) {
                    timeoutTimer.cancel(false);
                    mailboxExecutor.execute(
                            () -> completeRequest(Collections.emptyList(), batch.size(), System.currentTimeMillis()),
                            "Mark in-flight request as completed");
                }
            }

            private void timeout() {
                if (closed.compareAndSet(false, true)) {
                    timeoutTimer.cancel(false);
                    if (failOnTimeout) {
                        mailboxExecutor.execute(...fail)
                    } else {
                        mailboxExecutor.execute(completeRequest(batch, sendTime))
                    }
                }
            }
        };
        submitRequestEntries(batch, handler);
    }
}
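
The handler above relies on closed.compareAndSet(false, true) so that whichever of complete() or timeout() runs first decides the outcome and the later call becomes a no-op. The following is a minimal stand-alone sketch of that guard only, not Flink code: TimeoutGuardDemo, GuardedRequest and firstCallWins are hypothetical names, a plain ScheduledExecutorService stands in for the writer's timer, and the mailbox executor is left out.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

/** Hypothetical stand-alone demo of the "first call wins" guard; not part of Flink. */
public class TimeoutGuardDemo {

    /** One in-flight request whose outcome can be decided exactly once. */
    static final class GuardedRequest {
        private final AtomicBoolean closed = new AtomicBoolean(false);
        private volatile String outcome = "PENDING";
        private final ScheduledFuture<?> timeoutTimer;

        GuardedRequest(ScheduledExecutorService timer, long timeoutMillis) {
            // Schedule the timeout path; it is a no-op if complete() has already run.
            this.timeoutTimer = timer.schedule(this::timeout, timeoutMillis, TimeUnit.MILLISECONDS);
        }

        void complete() {
            // Only the caller that flips the flag from false to true proceeds.
            if (closed.compareAndSet(false, true)) {
                timeoutTimer.cancel(false);
                outcome = "COMPLETED";
            }
        }

        void timeout() {
            if (closed.compareAndSet(false, true)) {
                outcome = "TIMED_OUT";
            }
        }

        String outcome() {
            return outcome;
        }
    }

    /** Invokes both paths in the given order and reports which one took effect. */
    public static String firstCallWins(boolean completeFirst) {
        ScheduledExecutorService timer = Executors.newSingleThreadScheduledExecutor();
        try {
            // Long timeout so that the order of the explicit calls below decides the result.
            GuardedRequest request = new GuardedRequest(timer, 60_000L);
            if (completeFirst) {
                request.complete();
                request.timeout();
            } else {
                request.timeout();
                request.complete();
            }
            return request.outcome();
        } finally {
            timer.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println(firstCallWins(true));
        System.out.println(firstCallWins(false));
    }
}
```

In this sketch the second call never overwrites the first outcome, which is the property the proposed handler needs so that a late response after a timeout (or a late timeout after a response) cannot complete the same request twice.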



Compatibility, Deprecation, and Migration Plan

To maintain backward compatibility, we will provide a default implementation of the new method that delegates to the existing Consumer-based submitRequestEntries:

Code Block
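protected void submitRequestEntries(
        List<RequestEntryT> requestEntries, ResultHandler<RequestEntryT> resultHandler) {
    // Bridge to the existing Consumer-based method so current sinks keep working.
    submitRequestEntries(requestEntries, (entries) -> {
        if (entries.isEmpty()) {
            // Nothing was returned for retry: the whole batch succeeded.
            resultHandler.complete();
        } else {
            // Hand the returned entries back to be retried.
            resultHandler.retryForEntries(entries);
        }
    });
}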
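
To illustrate how this default keeps existing sinks working, here is a minimal stand-alone sketch under stated assumptions. The Writer and ResultHandler types below are simplified stand-ins that only mimic the shapes used in this FLIP (the sketched ResultHandler declares just the two methods the default implementation calls), and OddRejectingWriter and send are hypothetical names used for the example.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Consumer;

/** Hypothetical stand-alone demo; the nested types are simplified stand-ins, not Flink classes. */
public class LegacyAdapterDemo {

    /** Assumed minimal handler with only the two methods the default implementation uses. */
    interface ResultHandler<T> {
        void complete();

        void retryForEntries(List<T> entriesToRetry);
    }

    /** Stand-in for a writer exposing both the existing and the new method. */
    abstract static class Writer<T> {
        /** Existing Consumer-based method that current sinks already override. */
        protected abstract void submitRequestEntries(List<T> entries, Consumer<List<T>> requestResult);

        /** New method; its default bridges to the existing one. */
        protected void submitRequestEntries(List<T> entries, ResultHandler<T> resultHandler) {
            submitRequestEntries(entries, (failed) -> {
                if (failed.isEmpty()) {
                    resultHandler.complete();
                } else {
                    resultHandler.retryForEntries(failed);
                }
            });
        }
    }

    /** A legacy sink that overrides only the old method and reports odd numbers as failed. */
    static final class OddRejectingWriter extends Writer<Integer> {
        @Override
        protected void submitRequestEntries(List<Integer> entries, Consumer<List<Integer>> requestResult) {
            List<Integer> failed = new ArrayList<>();
            for (Integer entry : entries) {
                if (entry % 2 != 0) {
                    failed.add(entry);
                }
            }
            requestResult.accept(failed);
        }
    }

    /** Sends a batch through the new method and describes which handler callback fired. */
    public static String send(List<Integer> batch) {
        StringBuilder result = new StringBuilder();
        new OddRejectingWriter().submitRequestEntries(batch, new ResultHandler<Integer>() {
            @Override
            public void complete() {
                result.append("complete");
            }

            @Override
            public void retryForEntries(List<Integer> entriesToRetry) {
                result.append("retry ").append(entriesToRetry);
            }
        });
        return result.toString();
    }

    public static void main(String[] args) {
        System.out.println(send(Arrays.asList(2, 4)));
        System.out.println(send(Arrays.asList(1, 2, 3)));
    }
}
```

In this sketch the legacy writer is never touched: the caller invokes the handler-based overload, and the default routes the result of the old callback to complete() or retryForEntries().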


Default Options

We will default failOnTimeout and retryOnTimeout to false to maintain backward-compatible behaviour. We will leave the default timeout value for sink implementers to handle.

Test Plan

We will add unit tests for these changes to the existing AsyncSink.

Rejected Alternatives

NA