Discussion thread
Vote thread
JIRA


Release: 2.0

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

This is a sub-FLIP of the disaggregated state management effort and its related work; please read FLIP-423 first for the full context.

Motivation

FLIP-423[1] introduced disaggregated state management, and FLIP-425[2] introduced a new execution model for asynchronous state access in an event-driven way. This model can significantly boost performance by leveraging parallel I/O operations. However, it increases draining times during checkpoints, which creates a trade-off between system throughput and checkpoint synchronization delay. This balance can be calibrated by adjusting the buffer size. As a follow-up to FLIP-425, this FLIP proposes a faster way to checkpoint: the state requests waiting in the buffer of the "Asynchronous Execution Controller (AEC)" are snapshotted as part of the checkpoint instead of being drained. Compared with the original plan in FLIP-425, we expect this approach to greatly reduce the draining-time overhead, especially under high back-pressure. To snapshot state requests, the association between each request and its user callback must survive across job attempts. This FLIP therefore introduces a novel approach for declaring element processing, in which all callbacks are re-declared during the operator's initialization phase and bound to the corresponding state requests restored from the previous checkpoint. This ensures that the entire pipeline can be accurately restored and that operations resume smoothly after a job restart.

Public Interfaces

There is no change to public APIs. Some new interfaces will be added to declare async state processing in operators; they are for internal usage only (e.g. SQL operator development).

Proposed Changes

The core change lies in how state requests are checkpointed and restored. Requests that are already in progress cannot be checkpointed, otherwise the state would become inconsistent. Requests that are still waiting in the buffer of the AEC can be safely snapshotted. A state request consists of the following five parts:

  • The target state
  • Request description: the request type and input
  • The stream record and corresponding key group
  • The callback of this request
  • Local variables for processing each record

Serializing the request description and the stream record is straightforward. The state itself can easily be retrieved in the new job by its name, so only the name needs to be serialized and stored in the checkpoint. For user callbacks, we do NOT plan to serialize the anonymous callback class or lambda itself, as this brings several problems (as described in the previous discussion[3]):

  • Not compatible across different JREs
  • Problems when updating dependencies of the code inside the lambdas, if the updated dependencies are not binary compatible.
  • No way to fix bugs inside the lambdas - users might get stuck in an unrecoverable state
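To make the split between "serialize the data" and "store only a name" concrete, here is a minimal sketch of how one waiting request could be laid out in a checkpoint. The class `RequestSnapshot`, its field encoding, and the use of `DataOutputStream` are illustrative assumptions, not the proposed on-disk format; the record, input and variables are assumed to be pre-serialized by their own serializers.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

/**
 * Hypothetical checkpoint layout of one waiting state request. Fields follow the five parts
 * listed above; the state and the callback are stored by name only, never as objects/lambdas.
 */
final class RequestSnapshot {
    final String stateName;     // state is looked up again by name in the new job
    final String requestType;   // e.g. "GET" or "UPDATE" (illustrative encoding)
    final byte[] input;         // request input, pre-serialized
    final byte[] record;        // stream record, pre-serialized
    final int keyGroup;         // key group of the record's key
    final String callbackName;  // identifier of the declared callback
    final byte[] variables;     // declared variables of this record, pre-serialized

    RequestSnapshot(String stateName, String requestType, byte[] input, byte[] record,
                    int keyGroup, String callbackName, byte[] variables) {
        this.stateName = stateName;
        this.requestType = requestType;
        this.input = input;
        this.record = record;
        this.keyGroup = keyGroup;
        this.callbackName = callbackName;
        this.variables = variables;
    }

    byte[] toBytes() {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bos)) {
            out.writeUTF(stateName);
            out.writeUTF(requestType);
            writeBlob(out, input);
            writeBlob(out, record);
            out.writeInt(keyGroup);
            out.writeUTF(callbackName);
            writeBlob(out, variables);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bos.toByteArray();
    }

    static RequestSnapshot fromBytes(byte[] bytes) {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(bytes))) {
            // Java evaluates constructor arguments left to right, matching the write order.
            return new RequestSnapshot(in.readUTF(), in.readUTF(), readBlob(in), readBlob(in),
                    in.readInt(), in.readUTF(), readBlob(in));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    private static void writeBlob(DataOutputStream out, byte[] b) throws IOException {
        out.writeInt(b.length);   // length prefix so the reader knows where the blob ends
        out.write(b);
    }

    private static byte[] readBlob(DataInputStream in) throws IOException {
        byte[] b = new byte[in.readInt()];
        in.readFully(b);
        return b;
    }
}
```

Because only names are written, a restored snapshot is independent of the JRE and of the bytecode of the original lambdas, which is exactly what the three problems above call for.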

Consequently, we propose that users redeclare their lambda expressions upon each job restart and re-associate them with the requests from previous checkpoints. When redeclaring a lambda, users would provide a unique name or identifier for it; the checkpoint snapshot would record this identifier, which would then be used to match with the newly declared lambda in the new job.
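The name-based re-association described above can be sketched as a simple lookup table. `CallbackRegistry`, `register` and `rebind` are hypothetical names chosen for illustration; the sketch assumes each restored request carries only its callback name.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

/** Hypothetical registry of callbacks declared by the current job attempt, keyed by unique name. */
final class CallbackRegistry {
    private final Map<String, Function<Object, Object>> byName = new HashMap<>();

    /** Called while the new job re-declares its lambdas; names must be unique. */
    void register(String name, Function<Object, Object> callback) {
        if (byName.putIfAbsent(name, callback) != null) {
            throw new IllegalArgumentException("Callback name already declared: " + name);
        }
    }

    /** Binds a restored request, which stored only the callback name, to the newly declared lambda. */
    Function<Object, Object> rebind(String restoredCallbackName) {
        Function<Object, Object> callback = byName.get(restoredCallbackName);
        if (callback == null) {
            // The checkpoint refers to a callback the new job no longer declares.
            throw new IllegalStateException("No callback declared for restored name: " + restoredCallbackName);
        }
        return callback;
    }
}
```

Since the lambda body is re-declared by the new job, a bug fix in the callback takes effect for the restored requests as well; only the name has to stay stable.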

Likewise, the local variables of each record need serializers in order to be included in the checkpoint, so we propose a way to declare them as well.

Internal Interfaces for Process Declaration

To enable users to declare the lambdas before the task runs, this FLIP proposes introducing declareProcess alongside the original processElement. Any Input or Operator that wants to declare its callbacks upfront should implement the corresponding interface. In that case, processElement does not take effect and is overridden by the result of declareProcess. The interface definitions:

Interface for one input
/** Input that can declare the processing by a predefined function. */
@Internal
public interface DeclarativeProcessingInput<IN> extends Input<IN> {

    /**
     * A hook for declaring the process in {@code processElement}. If a subclass wants to define
     * its {@code processElement} in a declarative way, it should implement this interface. If so,
     * the {@code processElement} will not take effect and, instead, the return value of this
     * method will become the processing logic. This method will be called after {@code open()}
     * of the operator.
     *
     * @param context the context that provides useful methods to define named callbacks.
     * @return the whole processing logic just like {@code processElement}.
     */
    ThrowingConsumer<StreamRecord<IN>, Exception> declareProcess(DeclarationContext context)
            throws DeclarationException;
}
Interface for operator with two inputs
/**
 * Operator with two inputs that can declare the processing of each input by a predefined function.
 */
@Internal
public interface DeclarativeProcessingTwoInputOperator<IN1, IN2, OUT>
        extends TwoInputStreamOperator<IN1, IN2, OUT> {

    /**
     * A hook for declaring the process in {@code processElement1}. If a subclass wants to define
     * its {@code processElement1} in a declarative way, it should implement this interface. If so,
     * the {@code processElement1} will not take effect and, instead, the return value of this
     * method will become the processing logic. This method will be called after {@code open()}
     * of the operator.
     *
     * @param context the context that provides useful methods to define named callbacks.
     * @return the whole processing logic just like {@code processElement1}.
     */
    ThrowingConsumer<StreamRecord<IN1>, Exception> declareProcess1(DeclarationContext context)
            throws DeclarationException;

    /**
     * A hook for declaring the process in {@code processElement2}. If a subclass wants to define
     * its {@code processElement2} in a declarative way, it should implement this interface. If so,
     * the {@code processElement2} will not take effect and, instead, the return value of this
     * method will become the processing logic. This method will be called after {@code open()}
     * of the operator.
     *
     * @param context the context that provides useful methods to define named callbacks.
     * @return the whole processing logic just like {@code processElement2}.
     */
    ThrowingConsumer<StreamRecord<IN2>, Exception> declareProcess2(DeclarationContext context)
            throws DeclarationException;
}

The return types of these methods match the corresponding processElement methods: each returns a consumer of StreamRecord that takes the place of processElement.
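The override rule can be illustrated with simplified stand-ins. `SimpleInput`, `SimpleDeclarativeInput`, `ThrowingConsumer` and `ProcessorResolver` below are hypothetical and much smaller than Flink's `Input`, `DeclarativeProcessingInput` and `ThrowingConsumer`; the sketch only shows how a runtime could pick the processing logic once, after `open()`.

```java
/** Hypothetical stand-in for a consumer that may throw (simplified to a fixed Exception type). */
@FunctionalInterface
interface ThrowingConsumer<T> {
    void accept(T t) throws Exception;
}

/** Hypothetical stand-in for Input: the imperative processing hook. */
interface SimpleInput<IN> {
    void processElement(IN record) throws Exception;
}

/** Hypothetical stand-in for DeclarativeProcessingInput. */
interface SimpleDeclarativeInput<IN> extends SimpleInput<IN> {
    ThrowingConsumer<IN> declareProcess();
}

/** Hypothetical runtime helper that resolves the per-record logic once. */
final class ProcessorResolver {
    static <IN> ThrowingConsumer<IN> resolve(SimpleInput<IN> input) {
        if (input instanceof SimpleDeclarativeInput) {
            // The declared logic replaces processElement entirely.
            return ((SimpleDeclarativeInput<IN>) input).declareProcess();
        }
        // No declaration: fall back to the imperative processElement.
        return input::processElement;
    }
}
```

Resolving once, rather than checking on every record, keeps the per-record path identical to the imperative case; whether the actual runtime does it this way is left to the implementation.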

DeclarationContext

The newly introduced DeclarationContext is the entry point for users to declare processing. It provides the following methods (implementation omitted):

DeclarationContext
public class DeclarationContext {

    // ------------- Declaring Callback part ----------------

    /** Declare a callback with a name. */
    public <T> NamedConsumer<T> declare(
            String name, ThrowingConsumer<T, ? extends Exception> callback)
            throws DeclarationException {
    }

    /** Declare a callback with a name. */
    public <T, V> NamedFunction<T, V> declare(
            String name, FunctionWithException<T, V, ? extends Exception> callback)
            throws DeclarationException {
    }

    /** Declare a callback with a name. */
    public <T, U, V> NamedBiFunction<T, U, V> declare(
            String name, BiFunctionWithException<T, U, V, ? extends Exception> callback)
            throws DeclarationException {
    }

    /** Declare a callback with an automatically assigned name. */
    public <T> NamedConsumer<T> declare(ThrowingConsumer<T, ? extends Exception> callback)
            throws DeclarationException {
    }

    /** Declare a callback with an automatically assigned name. */
    public <T, V> NamedFunction<T, V> declare(
            FunctionWithException<T, V, ? extends Exception> callback) throws DeclarationException {
    }

    /** Declare a callback with an automatically assigned name. */
    public <T, U, V> NamedBiFunction<T, U, V> declare(
            BiFunctionWithException<T, U, V, ? extends Exception> callback)
            throws DeclarationException {
    }

    // ----------- End of Declaring Callback part ------------

    /**
     * Declare a processing chain.
     *
     * @param first the first code block
     * @return the chain itself.
     * @param <IN> the input type of the first block
     * @param <T> the output type of the state future given by the first block
     */
    public <IN, T> DeclarationChain<IN, T>.DeclarationStage<T> declareChain(
            FunctionWithException<IN, StateFuture<T>, Exception> first)
            throws DeclarationException {
    }

    /**
     * Declare a variable used across the callbacks.
     *
     * @param type the type information of the variable
     * @param name the unique name of this variable
     * @param initialValue the initial value when the variable is created.
     * @return the variable itself, which can be used by lambdas.
     * @param <T> the variable type.
     */
    public <T> DeclaredVariable<T> declareVariable(
            TypeInformation<T> type, String name, Supplier<T> initialValue)
            throws DeclarationException {
    }
}

A simple PoC (https://github.com/apache/flink/pull/24719) has been implemented for DeclarationContext as well as the APIs.
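The difference between the named and the auto-named `declare` overloads can be sketched as follows. `MiniDeclarationContext`, `NamedFn`, the `___auto_` prefix and this local `DeclarationException` are hypothetical; in particular, the assumption that auto-assigned names follow declaration order is ours. Under that assumption an auto name is only stable across restarts if `declareProcess` declares its callbacks in the same order every time, which is one reason to prefer explicit names.

```java
import java.util.HashSet;
import java.util.Set;
import java.util.function.Function;

/** Hypothetical checked exception mirroring the DeclarationException in the API above. */
class DeclarationException extends Exception {
    DeclarationException(String message) {
        super(message);
    }
}

/** Hypothetical named callback, in the spirit of NamedFunction. */
final class NamedFn<T, V> implements Function<T, V> {
    final String name;
    private final Function<T, V> fn;

    NamedFn(String name, Function<T, V> fn) {
        this.name = name;
        this.fn = fn;
    }

    @Override
    public V apply(T t) {
        return fn.apply(t);
    }
}

/** Hypothetical, reduced DeclarationContext that only handles callback naming. */
final class MiniDeclarationContext {
    private final Set<String> names = new HashSet<>();
    private int autoCounter = 0;

    /** Explicit name: must be unique within the operator. */
    <T, V> NamedFn<T, V> declare(String name, Function<T, V> fn) throws DeclarationException {
        if (!names.add(name)) {
            throw new DeclarationException("Duplicate callback name: " + name);
        }
        return new NamedFn<>(name, fn);
    }

    /** Auto name: derived from declaration order (assumption), so the order must not change. */
    <T, V> NamedFn<T, V> declare(Function<T, V> fn) throws DeclarationException {
        return declare("___auto_" + (autoCounter++), fn);
    }
}
```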

Declare Callbacks

An example usage where an operator implements declareProcess:

Example operator
// In a customized operator class whose input type is Tuple2<Integer, String>
@Override
public ThrowingConsumer<StreamRecord<Tuple2<Integer, String>>, Exception> declareProcess(DeclarationContext context) throws DeclarationException {

    NamedFunction<Integer, StateFuture<Integer>> adder =
            context.declare(
                    "counter",    // can be omitted with an auto-assigned name
                    (v) -> {
                        int updated = v == null ? 1 : (v + 1);
                        return FutureUtils.wrapWithAnotherResult(state.asyncUpdate(updated), updated);
                    });
    NamedConsumer<Integer> output =
            context.declare(
                    "output",     // can be omitted with an auto-assigned name
                    (v) -> {
                        context.getCollector().collect(v);
                    });
    return (e) -> {
        state.asyncGet().thenCompose(adder).thenAccept(output);
    };
}

Alternatively, DeclarationContext provides a chain-style way to declare a series of chained `thenCompose` and `thenAccept` calls, which is the most common use case in async processing.

Chain-style declaration
@Override
public ThrowingConsumer<StreamRecord<Tuple2<Integer, String>>, Exception> declareProcess(DeclarationContext context) throws DeclarationException {

    return context.<StreamRecord<Tuple2<Integer, String>>, Integer>declareChain(e -> state.asyncGet())
            .thenCompose(v -> {
                int updated = v == null ? 1 : (v + 1);
                return FutureUtils.wrapWithAnotherResult(state.asyncUpdate(updated), updated);
            })
            .withName("counter")   // can be omitted with an auto-assigned name
            .thenAccept(v -> context.getCollector().collect(v))
            .withName("output")   // can be omitted with an auto-assigned name
            .finish();
}
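The chain-style API can be approximated in plain Java to show how the stages compose and where `withName` attaches. `MiniChain` is a hypothetical builder, and `CompletableFuture` stands in for `StateFuture`; the real `DeclarationChain` additionally registers each stage as a named callback so that it can be re-bound after restore.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;
import java.util.function.Function;

/**
 * Hypothetical chain builder in the spirit of declareChain(). Each stage gets an auto-assigned
 * name until withName() renames it.
 */
final class MiniChain<IN, T> {
    private final Function<IN, CompletableFuture<T>> pipeline; // stages composed so far
    final List<String> names;                                   // one name per declared stage

    private MiniChain(Function<IN, CompletableFuture<T>> pipeline, List<String> names) {
        this.pipeline = pipeline;
        this.names = names;
    }

    static <IN, T> MiniChain<IN, T> declareChain(Function<IN, CompletableFuture<T>> first) {
        return new MiniChain<IN, T>(first, new ArrayList<>());
    }

    <U> MiniChain<IN, U> thenCompose(Function<T, CompletableFuture<U>> fn) {
        return new MiniChain<IN, U>(in -> pipeline.apply(in).thenCompose(fn), withAutoName());
    }

    MiniChain<IN, Void> thenAccept(Consumer<T> fn) {
        return new MiniChain<IN, Void>(in -> pipeline.apply(in).thenAccept(fn), withAutoName());
    }

    /** Renames the most recently declared stage. */
    MiniChain<IN, T> withName(String name) {
        List<String> next = new ArrayList<>(names);
        next.set(next.size() - 1, name);
        return new MiniChain<IN, T>(pipeline, next);
    }

    /** The finished chain is simply the per-record consumer returned from declareProcess. */
    Consumer<IN> finish() {
        return in -> pipeline.apply(in);
    }

    private List<String> withAutoName() {
        List<String> next = new ArrayList<>(names);
        next.add("___auto_" + next.size());
        return next;
    }
}
```

In this sketch `finish()` only hands back a consumer, which matches the return type of declareProcess; the stage names are what a checkpoint would record next to each buffered request.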

A declared chain can also be used to define parallel processing by invoking several chains in the final consumer:

Chain-style declaration with parallel processing
@Override
public ThrowingConsumer<StreamRecord<Tuple2<Integer, String>>, Exception> declareProcess(DeclarationContext context) throws DeclarationException {
    ThrowingConsumer<Integer, Exception> chain1 = context.<Integer, Integer>declareChain(c -> {
                return state1.asyncGet();
            }).thenCompose(v -> {
                int updated = v == null ? 1 : (v + 1);
                return FutureUtils.wrapWithAnotherResult(state1.asyncUpdate(updated), updated);
            })
            .withName("adder1")
            .thenAccept(v -> context.getCollector().collect(v))
            .withName("collector")
            .finish();

    ThrowingConsumer<Integer, Exception> chain2 = context.<Integer, Integer>declareChain(c -> {
                return state2.asyncGet();
            }).thenCompose(v -> {
                int updated = v == null ? 1 : (v + 1);
                return FutureUtils.wrapWithAnotherResult(state2.asyncUpdate(updated), updated);
            })
            .withName("adder2")
            .finish();

    return (e) -> {
        chain1.accept(e.getValue().f0);
        chain2.accept(e.getValue().f0);
    };
}

Declare Variables

There might be local variables in processElement that are accessed throughout the processing of a record. In async state processing, such a variable may be accessed by multiple callbacks. Since the state requests are materialized in the checkpoint, those variables must be persisted as well. DeclarationContext provides a method named declareVariable, which declares a variable as part of the RecordContext. Declared variables can be safely accessed in callback lambdas and are persisted in the checkpoint for each in-flight record.

An example usage:

Declare Variables Example
@Override
public ThrowingConsumer<StreamRecord<Tuple2<Integer, String>>, Exception> declareProcess(DeclarationContext context) throws DeclarationException {
    DeclaredVariable<Integer> updated = context.declareVariable(Types.INT, "updated", () -> 0);
    return context.<StreamRecord<Tuple2<Integer, String>>, Integer>declareChain(
                    e -> {
                        return state.asyncGet();
                    })
            .thenCompose(v -> {
                updated.set(v == null ? 1 : (v + 1));
                return state.asyncUpdate(updated.get());
            })
            .withName("adder")
            .thenAccept(v -> context.getCollector().collect(updated.get()))
            .withName("collector")
            .finish();
}

Note: Only variables accessed across callbacks need to be declared. Local variables used within a single callback lambda need not be declared.
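Why declared variables live in the RecordContext rather than in a captured local can be seen with two interleaved in-flight records. `RecordContext` and `DeclaredVar` below are hypothetical, reduced stand-ins; the sketch assumes the runtime switches the "current" context before invoking each callback, as the async execution model does for in-flight records.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Supplier;

/** Hypothetical per-record context; its variable map is what would be snapshotted per record. */
final class RecordContext {
    final Map<String, Object> variables = new HashMap<>();
}

/** Hypothetical declared variable: reads and writes go to whichever record context is current. */
final class DeclaredVar<T> {
    private final String name;
    private final Supplier<T> initialValue;
    private final Supplier<RecordContext> current; // provided by the runtime (assumption)

    DeclaredVar(String name, Supplier<T> initialValue, Supplier<RecordContext> current) {
        this.name = name;
        this.initialValue = initialValue;
        this.current = current;
    }

    @SuppressWarnings("unchecked")
    T get() {
        // Lazily initialize the value for the current record on first access.
        return (T) current.get().variables.computeIfAbsent(name, k -> initialValue.get());
    }

    void set(T value) {
        current.get().variables.put(name, value);
    }
}
```

Each record keeps its own value even when its callbacks interleave with another record's, and that per-record map is exactly the data a checkpoint can serialize with the declared `TypeInformation`.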

Limitations

The APIs introduced above lack some flexibility compared with the original ones. To allow in-flight requests to be checkpointed, users (SQL developers) should:

  • Use named callbacks everywhere. If there are anonymous callbacks, the checkpoint will not proceed until all anonymous callbacks and their corresponding requests finish. See the next section for details.
  • Only use declared variables. If other local variables are captured in any named callback, their values might be lost after recovery, which leads to correctness issues.

Checkpoint the in-flight state requests

Not every state request within the AEC is suitable for serialization and inclusion in a checkpoint. A request must fulfill the following criteria to be eligible for checkpoint inclusion:

  1. The request should not be in progress when the checkpoint is triggered.
  2. The request's callback must be a named one.
  3. The request's callback must depend on a single request rather than on multiple requests (e.g. it must not be the callback of thenCombine(), StateFutureUtils.combineAll(), or any iteration).
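The three criteria above amount to a simple predicate over each buffered request, and a checkpoint then splits the AEC buffer into requests kept in the snapshot and requests that must be drained first. `BufferedRequest` and its fields are hypothetical simplifications of what the AEC tracks.

```java
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

/** Hypothetical view of a request in the AEC buffer, with only the fields the criteria need. */
final class BufferedRequest {
    final boolean inProgress;             // already fired to the state backend
    final String callbackName;            // null if the callback is anonymous
    final boolean dependsOnSingleRequest; // false for thenCombine()/combineAll()/iteration callbacks

    BufferedRequest(boolean inProgress, String callbackName, boolean dependsOnSingleRequest) {
        this.inProgress = inProgress;
        this.callbackName = callbackName;
        this.dependsOnSingleRequest = dependsOnSingleRequest;
    }

    /** Criteria 1-3: not in progress, named callback, single upstream request. */
    boolean canBeSnapshotted() {
        return !inProgress && callbackName != null && dependsOnSingleRequest;
    }

    /** true -> kept in the checkpoint; false -> must be processed and drained before the snapshot. */
    static Map<Boolean, List<BufferedRequest>> partition(List<BufferedRequest> buffer) {
        return buffer.stream().collect(Collectors.partitioningBy(BufferedRequest::canBeSnapshotted));
    }
}
```

`Collectors.partitioningBy` always returns both the `true` and `false` keys, so an empty side still yields an empty list.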

The rationale for condition 3 arises from the complexity involved in tracking dependencies, orchestrating the linkage of several requests to a single callback instance during recovery, and managing partially completed results of multiple requests. Upon triggering a checkpoint, the AEC will stop firing any requests that satisfy conditions 2 and 3. All existing requests that do not meet these conditions will continue to be processed and drained before the checkpoint is executed. Figure 1 illustrates the overview of the AEC and state requests at the moment of checkpoint arrival.

Fig. 1: Overview of the AEC and state requests at the time of checkpoint arrival

Compared with the original plan of draining all existing state requests in the AEC, if all user callbacks are named, the current plan keeps most of the requests in the checkpoint instead of executing them, which greatly accelerates the draining process.

Configuration

We plan to make the state request checkpoint part of the unaligned checkpoint, as both checkpoint in-flight data and are therefore similar in semantics. When unaligned checkpoints are enabled, the in-flight state requests can be included in the checkpoint; otherwise, the checkpoint proceeds in the original way (drain before snapshot). Ideally, most SQL operators with async state processing will be built on top of the declarative APIs, which makes it possible to include their state requests in unaligned checkpoints.

Whether to introduce a separate option apart from unaligned checkpoints, or to enable this by default, can be discussed later if concrete user needs arise.

Trade-off

The price of faster checkpointing is a somewhat larger checkpoint. The increase depends on the number of in-flight requests, the average size of user records, and the registered variables. We expect it to be similar to the size of an unaligned checkpoint, which is acceptable in most production environments.
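As a rough way to reason about the size factors listed above, the extra bytes can be estimated as requests × (record + variables + fixed overhead). The model, the class name `InFlightSizeEstimate`, and the 64-byte overhead for names and key group are illustrative assumptions, not measurements; counting a full record copy per request may also overestimate when several requests belong to the same record.

```java
/** Hypothetical back-of-the-envelope model for the extra checkpoint size of in-flight requests. */
final class InFlightSizeEstimate {
    /**
     * @param inFlightRequests requests held in the AEC buffer at checkpoint time (per subtask)
     * @param avgRecordBytes average serialized size of the stream record
     * @param avgVariableBytes average serialized size of the declared variables per record
     * @param perRequestOverheadBytes assumed fixed cost: state name, request type, key group, callback name
     */
    static long extraBytes(long inFlightRequests, long avgRecordBytes,
                           long avgVariableBytes, long perRequestOverheadBytes) {
        return inFlightRequests * (avgRecordBytes + avgVariableBytes + perRequestOverheadBytes);
    }
}
```

For example, 10,000 buffered requests with 200-byte records, 16 bytes of declared variables and an assumed 64-byte overhead give 10,000 × 280 = 2,800,000 bytes, roughly 2.8 MB per subtask.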

Recovery & Rescaling

The snapshot of state requests is organized by key group to simplify rescaling. During recovery, all related state requests are restored into the AEC. Once the named callbacks are declared, they are re-associated with the restored requests in the AEC, and the whole pipeline then proceeds.
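Organizing the snapshot by key group means restored requests can be routed with the same key-group-to-subtask mapping that keyed state already uses. The sketch below reproduces the formula of Flink's `KeyGroupRangeAssignment#computeOperatorIndexForKeyGroup`; `RequestRedistribution` and `assign` are hypothetical names, and requests are reduced to their key-group ids for brevity.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical helper that routes snapshotted requests to subtasks after rescaling. */
final class RequestRedistribution {
    /** Same formula as Flink's KeyGroupRangeAssignment#computeOperatorIndexForKeyGroup. */
    static int operatorIndexForKeyGroup(int maxParallelism, int parallelism, int keyGroup) {
        return keyGroup * parallelism / maxParallelism;
    }

    /** Groups restored requests (identified here only by key group) by the subtask that restores them. */
    static Map<Integer, List<Integer>> assign(List<Integer> keyGroupsOfRequests,
                                              int maxParallelism, int newParallelism) {
        Map<Integer, List<Integer>> bySubtask = new TreeMap<>();
        for (int keyGroup : keyGroupsOfRequests) {
            int subtask = operatorIndexForKeyGroup(maxParallelism, newParallelism, keyGroup);
            bySubtask.computeIfAbsent(subtask, k -> new ArrayList<>()).add(keyGroup);
        }
        return bySubtask;
    }
}
```

Because a request always lands on the subtask that owns its key group's state, the restored request and the state it targets stay co-located after rescaling.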

Compatibility, Deprecation, and Migration Plan

Since the async execution model (FLIP-425) is not released yet, no compatibility issues need to be considered.

processElement still takes effect when the user does not implement declareProcess. Users can restore a job written with declareProcess from a checkpoint taken by the processElement version, but not the other way around.

Rejected Alternatives

  • Serialize the anonymous callback class or lambda itself in checkpoint.


[1] https://cwiki.apache.org/confluence/x/R4p3EQ

[2] https://cwiki.apache.org/confluence/x/S4p3EQ

[3] https://lists.apache.org/thread/4gxj5wb7tpjd8bhfsx8q8t6k09x25fgn
