...
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
Motivation
While implementing several connectors on top of FLIP-171: Async Sink, we encountered a couple of issues, such as:
FLINK-29854: Forcing implementors to use the record size trigger even if it is not supported.
FLINK-34071: Lacking a timeout option for Async Sink records might cause a deadlock.
Additionally, the current interface for passing fatal exceptions and retrying records relies on Java consumers, which makes it harder to understand. While adding an adjustable trigger mechanism is addressed in FLIP-284: Making AsyncSinkWriter Flush triggers adjustable, this FLIP addresses the other issues listed.
Proposed Changes
We propose introducing a new org.apache.flink.connector.base.sink.writer.ResultHandler, inspired by org.apache.flink.streaming.api.functions.async.ResultFuture, which provides the following API:
...
```java
@PublicEvolving
public abstract class AsyncSinkWriter<InputT, RequestEntryT extends Serializable>
        implements StatefulSink.StatefulSinkWriter<InputT, BufferedRequestState<RequestEntryT>> {

    ....

    private void flush() throws InterruptedException {
        // create batch

        ResultHandler<RequestEntryT> handler =
                new ResultHandler<>() {
                    ScheduledFuture<?> timeoutTimer = registerRequestTimeout(sendTime);
                    List<RequestEntryT> requestEntries = batch;
                    // Guards against resolving the same request twice (callback vs. timeout).
                    AtomicBoolean closed = new AtomicBoolean(false);

                    public void complete() {
                        if (closed.compareAndSet(false, true)) {
                            timeoutTimer.cancel(false);
                            mailboxExecutor.execute(
                                    () ->
                                            completeRequest(
                                                    Collections.emptyList(),
                                                    batch.size(),
                                                    System.currentTimeMillis()),
                                    "Mark in-flight request as completed");
                        }
                    }

                    private void timeout() {
                        if (closed.compareAndSet(false, true)) {
                            timeoutTimer.cancel(false);
                            if (failOnTimeout) {
                                mailboxExecutor.execute(...fail);
                            } else {
                                // pseudocode: hand the whole batch back to completeRequest for retry
                                mailboxExecutor.execute(completeRequest(batch, sendTime));
                            }
                        }
                    }
                };

        submitRequestEntries(batch, handler);
    }
}
```
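The at-most-once resolution guarded by `closed` can be illustrated outside Flink. The following is a minimal, self-contained sketch under stated assumptions, not the proposed implementation: `SketchResultHandler`, `TimeoutGuardedHandler`, and the `completeRequest` / `onFatal` callbacks are hypothetical names, and callbacks run on the calling thread rather than being handed to a mailbox executor.

```java
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;

/** Hypothetical stand-in for the proposed handler; not the Flink interface. */
interface SketchResultHandler<T> {
    void complete();

    void retryForEntries(List<T> entriesToRetry);
}

/** Resolves a request at most once: either by the client callback or by the timeout. */
final class TimeoutGuardedHandler<T> implements SketchResultHandler<T> {
    private final AtomicBoolean closed = new AtomicBoolean(false);
    private final List<T> batch;
    private final boolean failOnTimeout;
    private final Consumer<List<T>> completeRequest; // receives the entries to retry
    private final Consumer<Exception> onFatal; // receives fatal errors
    private volatile ScheduledFuture<?> timeoutTimer;

    private TimeoutGuardedHandler(
            List<T> batch,
            boolean failOnTimeout,
            Consumer<List<T>> completeRequest,
            Consumer<Exception> onFatal) {
        this.batch = batch;
        this.failOnTimeout = failOnTimeout;
        this.completeRequest = completeRequest;
        this.onFatal = onFatal;
    }

    static <T> TimeoutGuardedHandler<T> create(
            List<T> batch,
            long timeoutMs,
            boolean failOnTimeout,
            ScheduledExecutorService timer,
            Consumer<List<T>> completeRequest,
            Consumer<Exception> onFatal) {
        TimeoutGuardedHandler<T> handler =
                new TimeoutGuardedHandler<>(batch, failOnTimeout, completeRequest, onFatal);
        // Register the timer only after construction so the task never sees a half-built object.
        handler.timeoutTimer =
                timer.schedule(handler::timeout, timeoutMs, TimeUnit.MILLISECONDS);
        return handler;
    }

    @Override
    public void complete() {
        resolve(Collections.emptyList());
    }

    @Override
    public void retryForEntries(List<T> entriesToRetry) {
        resolve(entriesToRetry);
    }

    private void resolve(List<T> entriesToRetry) {
        // Only the first caller wins; later callbacks and the timeout become no-ops.
        if (closed.compareAndSet(false, true)) {
            timeoutTimer.cancel(false);
            completeRequest.accept(entriesToRetry);
        }
    }

    private void timeout() {
        if (closed.compareAndSet(false, true)) {
            if (failOnTimeout) {
                onFatal.accept(new TimeoutException("Request timed out"));
            } else {
                completeRequest.accept(batch); // treat the whole batch as needing a retry
            }
        }
    }
}
```

Because both paths go through the same `compareAndSet`, a late client callback arriving after the timeout has fired is silently ignored, which is the property that prevents a batch from being both retried and marked as completed.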
Compatibility, Deprecation, and Migration Plan
To maintain backward compatibility, we will provide a default implementation for the new method that delegates to the existing Consumer-based one:
```java
protected void submitRequestEntries(
        List<RequestEntryT> requestEntries, ResultHandler<RequestEntryT> resultHandler) {
    submitRequestEntries(
            requestEntries,
            (entries) -> {
                if (entries.isEmpty()) {
                    resultHandler.complete();
                } else {
                    resultHandler.retryForEntries(entries);
                }
            });
}
```
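To see why existing sinks keep working, the bridging can be reproduced in isolation. This is a sketch only: `CompatResultHandler`, `SketchAsyncWriter`, and `LegacySink` are hypothetical names, and making the legacy method non-abstract with a throwing default is an assumption of this example rather than something stated in this proposal.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Hypothetical stand-in for the proposed handler; not the Flink interface. */
interface CompatResultHandler<T> {
    void complete();

    void retryForEntries(List<T> entriesToRetry);
}

/** Hypothetical, stripped-down writer showing only the two submit variants. */
abstract class SketchAsyncWriter<T> {

    /** Legacy entry point: existing sinks override this Consumer-based variant. */
    protected void submitRequestEntries(List<T> entries, Consumer<List<T>> requestToRetry) {
        // Assumption of this sketch: a writer must override at least one variant.
        throw new UnsupportedOperationException("Override one submitRequestEntries variant");
    }

    /** New entry point: by default it bridges to the legacy variant. */
    protected void submitRequestEntries(List<T> entries, CompatResultHandler<T> handler) {
        submitRequestEntries(
                entries,
                failed -> {
                    if (failed.isEmpty()) {
                        handler.complete(); // nothing to retry: request succeeded
                    } else {
                        handler.retryForEntries(failed);
                    }
                });
    }
}

/** A sink written against the old API only; it never mentions the handler. */
final class LegacySink extends SketchAsyncWriter<String> {
    @Override
    protected void submitRequestEntries(
            List<String> entries, Consumer<List<String>> requestToRetry) {
        // Pretend that entries starting with "bad" failed and must be retried.
        List<String> failed = new ArrayList<>();
        for (String entry : entries) {
            if (entry.startsWith("bad")) {
                failed.add(entry);
            }
        }
        requestToRetry.accept(failed);
    }
}
```

The lambda passed in the default method can only bind to the `Consumer` overload because the handler type declares more than one abstract method, so the two overloads do not become ambiguous.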
Default Options
We will default failOnTimeout and retryOnTimeout to false to maintain backward-compatible behaviour. We will leave the default timeout value for sink implementers to choose.
Test Plan
We will add unit tests for the existing AsyncSink.
Rejected Alternatives
NA