
Discussion thread: here (https://lists.apache.org/list.html?dev@flink.apache.org)
Vote thread: here (https://lists.apache.org/list.html?dev@flink.apache.org)
JIRA: FLINK-20217

Release: <Flink Version>

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

Currently, many operators fire multiple timers when processing a watermark, and each of those timers can produce multiple output records. If this happens under back-pressure, the subtask thread can easily get stuck in the hard back-pressure state for an extended period of time. While it is stuck, the mailbox cannot process any incoming mails, such as those processing checkpoint barriers and completing unaligned checkpoints.

To address this issue, we propose to make firing multiple timers interruptible, so that Flink can interrupt firing timers, perform a checkpoint and then resume firing the remaining timers.

Proposed Changes

Currently, when AbstractStreamOperator or AbstractStreamOperatorV2 processes a watermark, it calls InternalTimerServiceImpl#advanceWatermark, which does the following (simplified pseudocode):

public void advanceWatermark(long time) throws Exception {
  InternalTimer<K, N> timer;
  while ((timer = eventTimeTimersQueue.peek()) != null
          && timer.getTimestamp() <= time) {
    eventTimeTimersQueue.poll();
    triggerTarget.onEventTime(timer);
  }
}

This method returns only once all timers with a timestamp up to the watermark have fired, which can take a long time, as described in the Motivation section.

We propose to check, after firing each timer, whether firing should be interrupted. For this purpose, MailboxExecutor would gain a new method:

@PublicEvolving
public interface MailboxExecutor {
  /**
   * Returns whether the operator/function should interrupt a longer computation and return from
   * the currently processed element/watermark, for example in order to let Flink perform a
   * checkpoint.
   *
   * @return whether operator/function should interrupt its computation.
   */
  boolean isInterrupted();
}

If the MailboxExecutor#isInterrupted() check returns true while firing timers, the code would:

  1. Enqueue a continuation mail at the end of the mailbox queue, to continue firing timers later.

  2. Return from the #processWatermark call stack.

This would allow the StreamTask to perform the checkpoint and, once that is done, execute the continuation mail and resume firing timers.

// Pseudocode - the actual implementation would be different
// This only highlights the concept.
public boolean tryAdvanceWatermark(long time)
        throws Exception {
    InternalTimer<K, N> timer;
    while ((timer = eventTimeTimersQueue.peek()) != null
            && timer.getTimestamp() <= time) {
        eventTimeTimersQueue.poll();
        triggerTarget.onEventTime(timer);
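        if (shouldInterrupt()) { // e.g. backed by MailboxExecutor#isInterrupted()
            // enqueue a continuation mail that calls tryAdvanceWatermark(time) again
            scheduleFiringTimersContinuation(time);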
            return false;
        }
    }
    return true;
}

Continuation mail

The continuation mail can be as simple as calling tryAdvanceWatermark(time) again, with the same argument value as the interrupted call.
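To make the interplay between the mailbox, the checkpoint and the continuation mail concrete, here is a minimal single-threaded sketch. It is a toy model, not Flink code: MailboxSimulation, its two mail queues, the onTimerFired hook and the policy of interrupting only when a priority mail (standing in for a checkpoint barrier) is waiting are all assumptions made for illustration, and timers are plain long timestamps.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.PriorityQueue;

// Hypothetical single-threaded model of a StreamTask mailbox; none of these
// classes exist in Flink, they only illustrate the continuation-mail pattern.
class MailboxSimulation {
    final Deque<Runnable> mails = new ArrayDeque<>();          // regular mails, FIFO
    final Deque<Runnable> priorityMails = new ArrayDeque<>();  // e.g. checkpoint barriers
    final PriorityQueue<Long> eventTimeTimers = new PriorityQueue<>();
    final List<String> log = new ArrayList<>();

    // Assumed policy: interrupt only if a priority mail is waiting.
    boolean isInterrupted() {
        return !priorityMails.isEmpty();
    }

    // Runnable counterpart of the tryAdvanceWatermark(time) pseudocode.
    boolean tryAdvanceWatermark(long time) {
        Long timer;
        while ((timer = eventTimeTimers.peek()) != null && timer <= time) {
            eventTimeTimers.poll();
            log.add("fire " + timer);
            onTimerFired(timer);
            if (isInterrupted()) {
                // Continuation mail at the end of the queue: same watermark value.
                mails.addLast(() -> tryAdvanceWatermark(time));
                return false;
            }
        }
        return true;
    }

    // Hook used by the demo to simulate a barrier arriving while timers fire.
    void onTimerFired(long timer) {}

    // Runs mails until both queues are empty, priority mails first.
    void runMailboxLoop() {
        while (!priorityMails.isEmpty() || !mails.isEmpty()) {
            Runnable next = !priorityMails.isEmpty() ? priorityMails.poll() : mails.poll();
            next.run();
        }
    }

    static List<String> demo() {
        MailboxSimulation sim = new MailboxSimulation() {
            @Override
            void onTimerFired(long timer) {
                if (timer == 2L) {
                    // A checkpoint barrier arrives while timers are being fired.
                    priorityMails.addLast(() -> log.add("checkpoint"));
                }
            }
        };
        for (long t = 1; t <= 4; t++) {
            sim.eventTimeTimers.add(t);
        }
        sim.mails.addLast(() -> sim.tryAdvanceWatermark(10L));
        sim.runMailboxLoop();
        return sim.log;
    }
}
```

In demo(), the checkpoint mail shows up while timer 2 is being fired, so the log reads fire 1, fire 2, checkpoint, fire 3, fire 4: the checkpoint runs between the two halves of the watermark, and the continuation mail fires the rest.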

Recovery

In case of recovery, the continuation mail that fires the remaining timers is not persisted, but the underlying timer queue (eventTimeTimersQueue) with the remaining, not-yet-fired timers is. Those timers are then fired after recovery, once a new watermark arrives.

This can delay firing some timers, as there is no guarantee when the next watermark arrives. However, this is not a breaking issue, as it does not violate any existing guarantees. Moreover, the same delay (timers waiting until the next watermark is emitted) can already happen with unaligned checkpoints.
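The recovery behaviour can be illustrated with another toy model (RecoverableTimers and its methods are hypothetical, not Flink classes), assuming that only the timer queue is snapshotted and the continuation mail is simply lost. After restoring, the remaining timers stay in the queue until a newer watermark is passed to tryAdvanceWatermark.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.PriorityQueue;
import java.util.function.BooleanSupplier;

// Hypothetical model: only the timer queue is part of the snapshot; the
// continuation mail is not persisted.
class RecoverableTimers {
    final PriorityQueue<Long> eventTimeTimers = new PriorityQueue<>();
    final List<Long> fired = new ArrayList<>();

    // Fires due timers until done or until shouldInterrupt asks to stop.
    boolean tryAdvanceWatermark(long time, BooleanSupplier shouldInterrupt) {
        Long timer;
        while ((timer = eventTimeTimers.peek()) != null && timer <= time) {
            fired.add(eventTimeTimers.poll());
            if (shouldInterrupt.getAsBoolean()) {
                return false; // a continuation mail would be scheduled here
            }
        }
        return true;
    }

    // The snapshot contains only the remaining, not-yet-fired timers.
    List<Long> snapshot() {
        List<Long> state = new ArrayList<>(eventTimeTimers);
        state.sort(null); // natural ordering of the timestamps
        return state;
    }

    // Restores the queue; nothing fires until the next watermark is processed.
    static RecoverableTimers restore(List<Long> state) {
        RecoverableTimers restored = new RecoverableTimers();
        restored.eventTimeTimers.addAll(state);
        return restored;
    }
}
```

Interrupting after the second of four timers leaves [3, 4] in the snapshot; the restored instance fires them only once the next watermark is processed.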

Semantic change

There will be a slight semantic change in how the #processWatermark(...) method is invoked for operators. Without this FLIP, that call is atomic; with this FLIP, it is not. When #processWatermark(...) is called, only some of the timers might fire before the code returns from the method. If an operator overrides #processWatermark(...) and assumes that all timers have fired after calling super.processWatermark(...), this FLIP would break that assumption. For this reason, we propose to add the following method to AbstractStreamOperator and AbstractStreamOperatorV2 (where InternalTimerServiceImpl is used, and where interruptible timer firing will be integrated):

    /**
     * Can be overridden to enable splittable timers for this particular operator. Splittable
     * timers are used only if this method returns {@code true} and the config option is enabled.
     * By default, splittable timers are disabled.
     *
     * @return {@code true} if splittable timers should be used (subject to the config option).
     *     {@code false} if splittable timers should never be used, even if the config option is
     *     enabled.
     */
    @Internal
    public boolean useSplittableTimers() {
        return false;
    }

Despite that, I have looked through some of the existing operators, and I think it is safe to enable this feature for at least TableStreamOperator and CepOperator. I haven't found an example that would actually break with this change.
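The opt-in rule can be sketched as follows, under the assumption (consistent with the useSplittableTimers() Javadoc) that the config option and the operator's useSplittableTimers() must both be true before timer firing may be split. ToyOperator, ToyCepOperator and SplittableTimersGate are hypothetical stand-ins, not Flink classes.

```java
// Hypothetical stand-in for an operator base class; mirrors the proposed default.
class ToyOperator {
    // Splittable timers are disabled unless an operator opts in.
    boolean useSplittableTimers() {
        return false;
    }
}

// An operator that does not rely on processWatermark being atomic opts in.
class ToyCepOperator extends ToyOperator {
    @Override
    boolean useSplittableTimers() {
        return true;
    }
}

final class SplittableTimersGate {
    // Assumed rule: both the job-level option and the operator must agree.
    static boolean splittingAllowed(boolean configEnabled, ToyOperator operator) {
        return configEnabled && operator.useSplittableTimers();
    }
}
```

Keeping the default at false means that operators overriding #processWatermark(...) with atomicity assumptions keep their current behaviour even when a user enables the config option.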

Issue with mails and checkpoint barriers priorities

This feature will be affected by a pre-existing issue, FLINK-35051 (https://issues.apache.org/jira/browse/FLINK-35051). The fix for FLINK-35051 will have to be made compatible with the implementation of MailboxExecutor#isInterrupted.

Until FLINK-35051 is fixed, a couple of smaller issues might occur:

  • Firing timers can be interrupted in favour of other, non-priority mails, such as AsyncWaitOperator results. This is not ideal, as it adds the small overhead of re-enqueuing the mail without any benefit.

  • If a mailbox action other than an unaligned checkpoint (UC) preempts firing timers while, at the same time, the mail handling UC barriers is enqueued, the mail that fires timers might be executed first. If that happens, however, firing timers will be interrupted again as soon as the mail handling UC barriers is enqueued. The reaction time to execute the UC will be slightly longer than it could be, but still better than the current status quo.

Public Interfaces

Apart from the aforementioned boolean MailboxExecutor#isInterrupted(), the following would be added to the public API.

StreamOperatorParameters

StreamOperatorParameters would gain a new accessor:

StreamOperatorParameters#getMailboxExecutor

to give every AbstractStreamOperatorV2 access to the MailboxExecutor.

YieldingOperator

/**
 * A V1 operator that needs access to the {@link MailboxExecutor} should implement this interface.
 * Note, this interface is not needed when using {@link StreamOperatorFactory} or {@link
 * AbstractStreamOperatorV2} as those have access to the {@link MailboxExecutor} via {@link
 * StreamOperatorParameters#getMailboxExecutor()}
 */
@Internal
public interface YieldingOperator<OUT> extends StreamOperator<OUT> {
    void setMailboxExecutor(MailboxExecutor mailboxExecutor);
}

YieldingOperator would be added to give AbstractStreamOperator subclasses easy access to the MailboxExecutor without modifying a large chunk of the codebase. Without YieldingOperator, we would have to change the #setup() method signature of every operator to pass the MailboxExecutor there.
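The wiring can be sketched with hypothetical stand-in interfaces (ToyMailboxExecutor, ToyStreamOperator, ToyYieldingOperator and OperatorWiring are illustrative names, not Flink types), assuming that the code creating the operator hands over the executor with an instanceof check before calling setup(). Operators that do not implement the marker interface are left untouched.

```java
// Hypothetical stand-in for MailboxExecutor, reduced to the proposed check.
interface ToyMailboxExecutor {
    boolean isInterrupted();
}

// Hypothetical stand-in for StreamOperator; its setup() signature stays unchanged.
interface ToyStreamOperator {
    void setup();
}

// Mirrors the shape of the proposed YieldingOperator interface.
interface ToyYieldingOperator extends ToyStreamOperator {
    void setMailboxExecutor(ToyMailboxExecutor mailboxExecutor);
}

final class OperatorWiring {
    // Operators implementing the yielding interface receive the executor;
    // all other operators are set up exactly as before.
    static void setUp(ToyStreamOperator operator, ToyMailboxExecutor executor) {
        if (operator instanceof ToyYieldingOperator) {
            ((ToyYieldingOperator) operator).setMailboxExecutor(executor);
        }
        operator.setup();
    }
}
```

The design choice is the usual opt-in marker interface: only operators that want the executor implement it, so no existing #setup() signature has to change.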

Configuration

ExecutionCheckpointingOptions would gain a new config value:

@Experimental
public static final ConfigOption<Boolean> ENABLE_UNALIGNED_SPLITTABLE_TIMERS =
        ConfigOptions.key("execution.checkpointing.unaligned.splittable-timers.enabled")
                .booleanType()
                .defaultValue(false)
                .withDescription(
                        "Allows unaligned checkpoints to skip timers that are currently being fired.");

while CheckpointConfig would gain:

@Experimental
public void enableUnalignedCheckpointsSplittableTimers(boolean enabled) {
    configuration.set(
            ExecutionCheckpointingOptions.ENABLE_UNALIGNED_SPLITTABLE_TIMERS, enabled);
}

Future work

Some concepts from this FLIP could be used in other places. MailboxExecutor#isInterrupted() could also be used by other operators to hand execution control back to the StreamTask sooner.

For example, when AsyncWaitOperator is back-pressured and lacks the buffer capacity to process a new element, it could use the #isInterrupted() signal to implement a buffer overdraft feature (as discussed in https://issues.apache.org/jira/browse/FLINK-34704?focusedCommentId=17836106&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-17836106 ): temporarily exceeding the allowed buffer size to expedite the checkpoint.
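A possible shape of the overdraft decision, as a hypothetical sketch (OverdraftQueue, capacity and maxOverdraft are illustrative names, not AsyncWaitOperator internals): a full queue normally forces the caller to keep waiting for capacity, but when the isInterrupted() signal reports a pending checkpoint, a bounded number of extra elements is accepted so that the operator can return control to the StreamTask.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.function.BooleanSupplier;

// Hypothetical bounded in-flight queue with an overdraft; not Flink's
// StreamElementQueue, only an illustration of the idea.
class OverdraftQueue<T> {
    private final Deque<T> inFlight = new ArrayDeque<>();
    private final int capacity;
    private final int maxOverdraft;

    OverdraftQueue(int capacity, int maxOverdraft) {
        this.capacity = capacity;
        this.maxOverdraft = maxOverdraft;
    }

    // Accepts the element if there is regular capacity. When the queue is full,
    // it is accepted only if shouldInterrupt reports a pending checkpoint and the
    // overdraft budget is not exhausted; otherwise the caller keeps waiting.
    boolean tryAdd(T element, BooleanSupplier shouldInterrupt) {
        if (inFlight.size() < capacity
                || (shouldInterrupt.getAsBoolean()
                        && inFlight.size() < capacity + maxOverdraft)) {
            inFlight.addLast(element);
            return true;
        }
        return false;
    }

    // Simulates the oldest async request completing and freeing a slot.
    T completeOldest() {
        return inFlight.pollFirst();
    }

    int size() {
        return inFlight.size();
    }
}
```

Bounding the overdraft keeps memory usage predictable while still letting a pending checkpoint proceed without waiting for in-flight requests to complete.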

Other operators (like joins, aggregations and CepOperator) might be able to use #isInterrupted() and the pattern of enqueuing a continuation mail to interrupt their own long computations, as long as they can support checkpointing while interrupted (for example, a join operator remembering the currently processed record and the index in the build table from which emitting more output records should resume after the checkpoint or recovery).
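The join case could look roughly like the following hypothetical sketch, in which ResumableJoinEmitter keeps a cursor (the current probe record and the next index into its build-side matches). Making this recoverable would additionally require storing that cursor in checkpointed state, which the sketch omits.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BooleanSupplier;

// Hypothetical join fragment that emits the matches of one probe record in
// chunks. The (pendingProbe, nextIndex) cursor is what would have to be part
// of the operator's checkpointed state for this to survive recovery.
class ResumableJoinEmitter {
    private final List<String> buildMatches;
    final List<String> output = new ArrayList<>();
    private String pendingProbe; // null when no emission is in progress
    private int nextIndex;

    ResumableJoinEmitter(List<String> buildMatches) {
        this.buildMatches = buildMatches;
    }

    // Starts emitting joined rows for a new probe record.
    boolean process(String probe, BooleanSupplier shouldInterrupt) {
        pendingProbe = probe;
        nextIndex = 0;
        return resume(shouldInterrupt);
    }

    // Continues from the remembered cursor; returns false if interrupted.
    boolean resume(BooleanSupplier shouldInterrupt) {
        while (nextIndex < buildMatches.size()) {
            output.add(pendingProbe + "|" + buildMatches.get(nextIndex));
            nextIndex++;
            // Only interrupt if there is still work left for a continuation mail.
            if (nextIndex < buildMatches.size() && shouldInterrupt.getAsBoolean()) {
                return false; // a continuation mail would call resume(...) later
            }
        }
        pendingProbe = null;
        return true;
    }
}
```

Here a continuation mail would simply call resume(...), analogous to re-invoking tryAdvanceWatermark(time) for timers.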

Compatibility, Deprecation, and Migration Plan

The feature will be fully backward and forward compatible, so no migration plan is needed. It can be enabled and disabled at any point in time.

YieldingOperatorFactory

YieldingOperatorFactory would become deprecated in favour of StreamOperatorParameters#getMailboxExecutor.

Rejected Alternatives

None.

Credits

Special thanks to David Moravek for the initial implementation of the PoC.