Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

As main memory grows, query performance is increasingly determined by the raw CPU cost of query processing itself. Query processing techniques based on interpreted execution perform poorly on modern CPUs because of their lack of locality and frequent instruction mispredictions. The industry is therefore researching how to improve engine performance by making operator execution more efficient. In addition, while optimizing Flink's performance on TPC-DS queries, we found that a significant amount of CPU time was spent on virtual function calls, framework collector calls, and unnecessary calculations, all of which can be reduced to improve overall engine performance. Operator execution efficiency can be improved in the following ways:

  • Reduce virtual function calls 
  • Keep data in registers as much as possible to avoid materializing it in memory
  • Generate optimal code for queries to reduce instruction cache misses
  • Use modern compilers to automatically unroll loops, reducing loop iteration and CPU instruction jump counts
  • Compute values lazily

After researching these requirements, we found that operator fusion code generation, proposed by Thomas Neumann in [1], can achieve these goals. After comparing the performance benefits of vectorization and codegen [2][3], and considering the high development cost of vectorization, we decided to introduce Operator Fusion Codegen in Flink, referred to here as OFCG. We have finished a PoC [5] and measured a 12% overall gain with only the Calc, HashJoin, and HashAgg operators supported. Some queries gain more than 30%.
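To make the targeted overhead concrete, the following is a minimal sketch that contrasts an interpreted operator chain with the kind of fused loop a code generator could emit for the same plan. All class and method names here are hypothetical illustrations and are not Flink classes.

```java
/** Hypothetical illustration of interpreted versus fused execution; not Flink code. */
final class FusionMotivationSketch {

    /** Interpreted style: every operator is an object and every row crosses a virtual call. */
    interface Operator {
        void processElement(long value);
    }

    /** Keeps even values only. */
    static final class Filter implements Operator {
        private final Operator next;

        Filter(Operator next) {
            this.next = next;
        }

        @Override
        public void processElement(long value) {
            if (value % 2 == 0) {
                next.processElement(value);
            }
        }
    }

    /** Multiplies each value by three. */
    static final class Project implements Operator {
        private final Operator next;

        Project(Operator next) {
            this.next = next;
        }

        @Override
        public void processElement(long value) {
            next.processElement(value * 3);
        }
    }

    /** Accumulates the sum of everything it receives. */
    static final class SumSink implements Operator {
        long sum;

        @Override
        public void processElement(long value) {
            sum += value;
        }
    }

    static long interpreted(long[] input) {
        SumSink sink = new SumSink();
        Operator pipeline = new Filter(new Project(sink));
        for (long v : input) {
            pipeline.processElement(v); // up to three virtual calls per row
        }
        return sink.sum;
    }

    /** Fused style: one loop, no virtual calls, intermediate values stay in local variables. */
    static long fused(long[] input) {
        long sum = 0;
        for (long v : input) {
            if (v % 2 == 0) {
                sum += v * 3;
            }
        }
        return sum;
    }

    public static void main(String[] args) {
        long[] input = {1, 2, 3, 4, 5, 6};
        System.out.println(interpreted(input) + " " + fused(input)); // prints "36 36"
    }
}
```

Both methods compute the same result; the fused version simply removes the per-row dispatch between operators, which is the effect the list above aims for.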

                                           

Public Interfaces

OFCG does not change any public interface. However, because it is a significant new feature that is still experimental, it will need iterative development over multiple versions. We therefore introduce the option `table.exec.operator-fusion-codegen.enabled` to enable the feature. The option is disabled by default. As the feature matures, we plan to enable it by default after two or three versions.

Proposed Changes

Outline Design

The following image illustrates how SQL text is converted to a Transformation DAG.

RelNode is a relational operator defined in Apache Calcite. A relational expression is represented by a tree of RelNode, which can be optimized by the planner.

ExecNode is an execution node defined in the planner. Currently, each physical RelNode corresponds to an ExecNode, and an ExecNode will be translated to a Transformation.

The part inside the blue dashed box in the image is the part relevant to our design: the work of OFCG mainly concerns the translation from ExecNode to Transformation.

Workflow of OFCG

Drawing on the background information provided and the capabilities of Flink SQL, we have designed the overall workflow of OFCG by combining the produce-consume interface proposed in the paper [1] with the structure of Flink engine's runtime operators. The flowchart is shown below:

  1. Traverse the ExecNode DAG and create a FusionExecNode for physical operators that can be fused together.

  2. Traverse the member operators of every FusionExecNode to determine whether they support code generation.

  3. If any member operator does not support codegen, generate a Transformation DAG based on the topological relationship of the member ExecNodes and jump to step 8.

  4. If all member ExecNodes support codegen, traverse them based on their topological relationships. For each ExecNode, generate a corresponding FusionCodegenSpec object that describes the information required to perform codegen on that operator. The resulting FusionCodegenSpec objects form a DAG.

  5. Traverse the FusionCodegenSpec DAG based on topological relationships. First, call the produce-consume interface of the process method to generate code fragments for all member operators that process data, then concatenate them together.

  6. Next, call the produce-consume interface of endInput to generate code fragments for the endInput method of all member operators, then concatenate them as well.

  7. Assemble the generated process and endInput code fragments into a fused operator.

  8. Generate a FusionTransformation, setting the parallelism and managed memory for the fused operator.

  9. Generate the JobGraph.
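The decision in steps 2 to 4 can be sketched as follows. This is a simplified illustration under the assumption that each member operator exposes a single support flag; `Member`, `Plan`, and `choosePlan` are hypothetical names and not part of the planner.

```java
import java.util.Arrays;
import java.util.List;

/** Hypothetical sketch of the fuse-or-fall-back decision; not Flink planner code. */
final class FusionPlanningSketch {

    /** Stand-in for a member ExecNode of a FusionExecNode. */
    static final class Member {
        final String name;
        final boolean supportsFusionCodegen;

        Member(String name, boolean supportsFusionCodegen) {
            this.name = name;
            this.supportsFusionCodegen = supportsFusionCodegen;
        }
    }

    enum Plan { FUSED_OPERATOR, DEFAULT_TRANSFORMATIONS }

    /** Fuse only when every member operator supports codegen. */
    static Plan choosePlan(List<Member> members) {
        for (Member member : members) {
            if (!member.supportsFusionCodegen) {
                // Step 3: a single unsupported member disables fusion for the whole node.
                return Plan.DEFAULT_TRANSFORMATIONS;
            }
        }
        // Steps 4 to 7: generate the specs and assemble the fused operator.
        return Plan.FUSED_OPERATOR;
    }

    public static void main(String[] args) {
        List<Member> supported =
                Arrays.asList(new Member("Calc", true), new Member("HashJoin", true));
        List<Member> mixed =
                Arrays.asList(new Member("Calc", true), new Member("UnsupportedOp", false));
        System.out.println(choosePlan(supported)); // prints FUSED_OPERATOR
        System.out.println(choosePlan(mixed)); // prints DEFAULT_TRANSFORMATIONS
    }
}
```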

When performing operator fusion on ExecNode, there are two operator fusion mechanisms to consider. The first mechanism is proposed in [4], which supports fusing multiple operators with multiple inputs into one, known as BatchExecMultipleInput. This is an existing design and we can reuse its capability. The second mechanism is fusing multiple operators with only one input that can be fused together. Regarding this mechanism, we only perform OFCG on the ExecNode layer if all the operators that can be fused together support codegen. Otherwise, we delegate the fusion to the runtime's OperatorChain mechanism because there is no benefit to fusing non-codegen operators together at the ExecNode layer.

It is important to note that for the mechanism proposed in [4], if there are operators within BatchExecMultipleInput that do not support codegen, we will abandon OFCG and fall back to the default implementation. There are two main reasons for this decision. Firstly, performing OFCG on only the subset of ExecNodes that support codegen would make the design and implementation of BatchExecMultipleInput more complex. Secondly, if there are only a few codegen-supported operators within the BatchExecMultipleInput, the benefits of performing OFCG are negligible. Weighing the complexity against the benefits, we have decided not to perform codegen at all in this case. Instead, we will put more effort into making more operators support OFCG.

Flink OFCG produce-consume 

Because Flink is a unified batch and stream compute engine, and batch jobs eventually end, the structure of its runtime operators is more complex. Currently, Flink's runtime operators have two core methods, processElement and endInput. The processElement method contains the core computation logic of the operator and is required in both streaming and batch mode. In batch mode, however, data is bounded, so the job will eventually end. Batch jobs therefore need an additional endInput method to perform some cleanup work after all input data has been received. For example, the HashAgg operator flushes its data only after receiving all input, and the build side of HashJoin completes the hash table at that point. Given the needs of both scenarios, the produce-consume interface proposed in [1] cannot directly cover our case. We need two sets of produce-consume interfaces. The first corresponds to processElement and generates the code with which all operators process data. The second corresponds to endInput and generates the code for the remaining logic that must run after the data has been processed. The two interfaces work together to generate the complete code for OFCG.

Next, let's take an example to see how the two sets of produce-consume interfaces work with each other to complete OFCG.

The concept of pipeline breaker is proposed and defined in [1]. According to its definition, this pattern can be partitioned into four pipelines, and the yellow output operator is a virtual operator introduced for OFCG, whose role is to bridge the fusion operator with the downstream operator.

Normally, in streaming mode the job is long-running, so each operator only needs to implement the processElement method. Correspondingly, in the OFCG case only the process produce-consume interface needs to be implemented. In batch mode the job ends, and an operator may need to do some cleanup work in the endInput method, such as flushing data downstream, so in the OFCG case both the process and endInput produce-consume interfaces need to be implemented.

In the first step, we recursively call the processProduce method, starting from the Output node, until we reach the leaf node of the operator tree. The leaf node starts producing data and then calls the downstream consume method, which generates the code fragment each operator uses to process the data. When the recursion reaches the HashAgg operator, which is a blocking operator, it must wait until all data has been processed before sending data downstream. It therefore does not call the consume method of its downstream operator immediately, and the recursive call for the process method ends there.

In the second step, we recursively call the endInputProduce method, starting from the Output node, until we reach the leaf node of the operator tree, which then starts calling back the endInputConsume method. If the current operator needs to send data downstream in its endInput method, it first calls the processConsume method of the downstream node to consume the data. Once the data has been sent, it calls endInputConsume of the downstream operator to trigger that operator's cleanup. In this pattern, because HashAgg is a blocking operator, when its endInputConsume method is called during the recursion, it first calls the processConsume method of the downstream Join operator to consume the data it produces, and then calls the endInputConsume method of the Join operator to trigger its remaining actions. The recursion continues until it reaches the Output node, which is responsible for sending the data to the downstream node.
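The two passes can be illustrated with a heavily simplified sketch. It assumes a linear chain Input → Calc → HashAgg → Output rather than the join pattern discussed above, generates plain strings instead of real operator code, and uses hypothetical class names (`Op`, `Input`, `Calc`, `HashAgg`, `Output`) and generated identifiers (`aggMap`, `collector`) that only loosely mirror the interfaces in this proposal.

```java
/** Hypothetical, string-based sketch of the two produce-consume passes; not Flink code. */
final class ProduceConsumeSketch {

    abstract static class Op {
        final Op input; // upstream operator, null for the leaf
        Op output; // downstream operator, null for the Output node

        Op(Op input) {
            this.input = input;
            if (input != null) {
                input.output = this;
            }
        }

        // Produce: by default just recurse towards the leaf.
        void processProduce(StringBuilder code) {
            input.processProduce(code);
        }

        void endInputProduce(StringBuilder code) {
            input.endInputProduce(code);
        }

        // Consume: return the code that handles one row, or the end of the input.
        abstract String processConsume(String row);

        String endInputConsume() {
            return output.endInputConsume(); // pipelined operators only forward the signal
        }
    }

    /** Leaf: starts the consume chain for both passes. */
    static final class Input extends Op {
        Input() {
            super(null);
        }

        @Override
        void processProduce(StringBuilder code) {
            code.append(output.processConsume("in"));
        }

        @Override
        void endInputProduce(StringBuilder code) {
            code.append(output.endInputConsume());
        }

        @Override
        String processConsume(String row) {
            throw new UnsupportedOperationException("the leaf has no upstream operator");
        }
    }

    /** Pipelined operator: wraps the downstream code in its own filter condition. */
    static final class Calc extends Op {
        Calc(Op input) {
            super(input);
        }

        @Override
        String processConsume(String row) {
            return "if (" + row + ".a > 0) {\n" + output.processConsume(row) + "}\n";
        }
    }

    /** Blocking operator: buffers in process, emits downstream only in endInput. */
    static final class HashAgg extends Op {
        HashAgg(Op input) {
            super(input);
        }

        @Override
        String processConsume(String row) {
            return "aggMap.add(" + row + ");\n"; // downstream consume is NOT called here
        }

        @Override
        String endInputConsume() {
            return "for (Row out : aggMap.results()) {\n"
                    + output.processConsume("out")
                    + "}\n"
                    + output.endInputConsume();
        }
    }

    /** Virtual output node: bridges the fused operator to the downstream operator. */
    static final class Output extends Op {
        Output(Op input) {
            super(input);
        }

        @Override
        String processConsume(String row) {
            return "collector.collect(" + row + ");\n";
        }

        @Override
        String endInputConsume() {
            return "";
        }
    }

    /** Returns {processCode, endInputCode} for Input -> Calc -> HashAgg -> Output. */
    static String[] generate() {
        Op sink = new Output(new HashAgg(new Calc(new Input())));
        StringBuilder process = new StringBuilder();
        sink.processProduce(process); // first pass
        StringBuilder endInput = new StringBuilder();
        sink.endInputProduce(endInput); // second pass
        return new String[] {process.toString(), endInput.toString()};
    }

    public static void main(String[] args) {
        String[] code = generate();
        System.out.println("process:\n" + code[0]);
        System.out.println("endInput:\n" + code[1]);
    }
}
```

In this sketch the process code stops at `aggMap.add(in);` because the blocking aggregate does not call its downstream consume method, while the endInput code contains the loop that pushes the aggregated rows to `collector.collect(out);`.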

We complete the OFCG of multiple operators through the cooperation of process and endInput produce-consume interfaces, and the overall flow is relatively brief and clear.

Other Considerations

Janino Compilation

Because the Java code generated by OFCG may be too long for Janino to compile successfully, we will try to compile the generated code at the end of OFCG. If compilation fails, we fall back to the original, non-fused plan so that a compilation problem does not prevent the whole query from executing.

Regarding method length, we reuse the code splitting framework already introduced in Flink to split the generated code and avoid overly long methods.
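The compile-then-fall-back behaviour might look like the sketch below. `SourceCompiler` is a hypothetical stand-in for the real compiler call, and the returned plan names are placeholders; the sketch only shows the control flow, not how Flink invokes Janino.

```java
/** Hypothetical sketch of the compile-or-fall-back control flow; not Flink code. */
final class CompileFallbackSketch {

    /** Stand-in for the real compiler; assumed to throw when the source cannot be compiled. */
    interface SourceCompiler {
        void compile(String className, String source) throws Exception;
    }

    /** Returns true when the fused code compiles, false when the planner should fall back. */
    static boolean tryCompile(SourceCompiler compiler, String className, String source) {
        try {
            compiler.compile(className, source);
            return true;
        } catch (Exception e) {
            // A compilation problem must not fail the query; the caller falls back instead.
            return false;
        }
    }

    /** Chooses between the fused operator and the original, non-fused operators. */
    static String choosePlan(SourceCompiler compiler, String fusedSource) {
        return tryCompile(compiler, "FusedOperator", fusedSource)
                ? "fused-operator"
                : "default-operators";
    }

    public static void main(String[] args) {
        // Toy compiler that rejects sources above an arbitrary length limit.
        SourceCompiler toyCompiler = (name, source) -> {
            if (source.length() > 20) {
                throw new IllegalStateException("generated code too long");
            }
        };
        System.out.println(choosePlan(toyCompiler, "class A {}")); // prints fused-operator
        System.out.println(choosePlan(toyCompiler, "class A { /* very long body */ }"));
    }
}
```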

Parallelism

Regarding the parallelism of the OFCG fused operator, we take the maximum parallelism of all its input operators, which is consistent with [4]. Operators that can only run with a parallelism of 1 are not fused into OFCG. Source and Sink operators do not support codegen, so they are not fused into OFCG nodes either, and they work fine even if their parallelism is set separately.

In the streaming scenario, users may want to set the parallelism of individual operators. Flink SQL does not currently support setting parallelism per operator, so there is no problem today. If per-operator parallelism is supported in the future, OFCG will not be able to honor it, because operator parallelism cannot be observed directly at the ExecNode layer. This is a current limitation, and users who need to set operator parallelism individually would have to turn off the OFCG feature. We will look for a way to support per-operator parallelism in fused operators if that capability is added.
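As a small illustration of the maximum rule, assuming the input parallelisms are already known as plain integers (the method name is hypothetical):

```java
import java.util.Arrays;
import java.util.List;

/** Hypothetical sketch of the parallelism rule for a fused operator; not Flink code. */
final class FusedParallelismSketch {

    /** The fused operator takes the largest parallelism among its input operators. */
    static int fusedParallelism(List<Integer> inputParallelisms) {
        return inputParallelisms.stream()
                .mapToInt(Integer::intValue)
                .max()
                .orElseThrow(
                        () -> new IllegalArgumentException("a fused operator needs an input"));
    }

    public static void main(String[] args) {
        System.out.println(fusedParallelism(Arrays.asList(4, 16, 8))); // prints 16
    }
}
```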

Batch

In batch mode, HashJoin and NestedLoopJoin start to read the probe side input only after the build side input is finished. In addition, the HashJoin and HashAgg operators need managed memory, so OFCG for batch must deal with both "input selection" for the inputs and "managed memory".

Input Selection

Input selection is handled in the same way as described in the input selection section of [4].

Managed Memory

If some operators need managed memory, the managed memory ratio of each operator contained in the fused operator is calculated in the same way as described in the managed memory section of [4].
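As a rough illustration only, the sketch below assumes that each member operator's fraction is its own managed memory requirement divided by the total requirement of the fused operator. That proportional rule is an assumption made for this example; the authoritative calculation is the one described in [4]. All names are hypothetical.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical sketch of a proportional managed memory split; not Flink code. */
final class ManagedMemorySketch {

    /** Assumption: fraction = operator requirement / sum of all requirements. */
    static Map<String, Double> fractions(Map<String, Long> managedMemoryBytes) {
        long total = 0;
        for (long bytes : managedMemoryBytes.values()) {
            total += bytes;
        }
        Map<String, Double> result = new LinkedHashMap<>();
        for (Map.Entry<String, Long> entry : managedMemoryBytes.entrySet()) {
            double fraction = total == 0 ? 0.0 : entry.getValue().doubleValue() / total;
            result.put(entry.getKey(), fraction);
        }
        return result;
    }

    public static void main(String[] args) {
        Map<String, Long> requirements = new LinkedHashMap<>();
        requirements.put("HashJoin", 192L);
        requirements.put("HashAgg", 64L);
        System.out.println(fractions(requirements)); // prints {HashJoin=0.75, HashAgg=0.25}
    }
}
```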

Streaming

In streaming mode, we need to consider state, checkpoints, and barriers once OFCG is supported.

State & Checkpoint

Once OFCG is supported, all operators are executed inside a single fused operator and each original operator becomes a function within it, so state initialization needs no special handling. However, a fused operator may contain several operators of the same kind that use the same state names. To handle this, we must guarantee during codegen that all state names are unique. Otherwise, operators would unintentionally share state.
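One possible way to keep state names unique is to give every operator instance its own prefix and derive state names from it. The sketch below is an assumption about how this could be done, with hypothetical names, and is not the naming scheme of the actual implementation.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical sketch of unique state naming inside one fused operator; not Flink code. */
final class StateNameSketch {

    // Number of prefixes handed out so far, per operator kind.
    private final Map<String, Integer> issued = new HashMap<>();

    /** Hands out a distinct prefix per operator instance, e.g. "hashAgg_0", "hashAgg_1". */
    String newOperatorPrefix(String operatorKind) {
        int id = issued.merge(operatorKind, 1, Integer::sum) - 1;
        return operatorKind + "_" + id;
    }

    /** Builds the state name from the operator's prefix and its local state name. */
    static String stateName(String operatorPrefix, String localName) {
        return operatorPrefix + "_" + localName;
    }

    public static void main(String[] args) {
        StateNameSketch names = new StateNameSketch();
        // Two aggregates of the same kind that both declare a state called "buffer".
        System.out.println(stateName(names.newOperatorPrefix("hashAgg"), "buffer"));
        System.out.println(stateName(names.newOperatorPrefix("hashAgg"), "buffer"));
    }
}
```

The two printed names are `hashAgg_0_buffer` and `hashAgg_1_buffer`, so the two aggregates no longer collide on the shared local name.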

State compatibility

State compatibility will be broken, as described in the state compatibility section of [4].

Barrier

Barriers are handled in the same way as described in the barrier section of [4].

Summary

From a technical implementation point of view, this design is feasible in both streaming and batch scenarios, so the design considers both modes. In the streaming scenario, however, our business experience is that the bottleneck of stateful operators is mostly state access, so the benefit of OFCG for streaming would not be particularly large, and we do not prioritize it for now. In the batch scenario, by contrast, the CPU is the bottleneck, so this optimization pays off. Taking the above into account, this FLIP is currently scoped to batch mode, and streaming operators will not be supported in the short term.

Internal Interface Changes

OpFusionContext

/**
 * An {@link OpFusionContext} contains the context information that an {@link OpFusionCodegenSpec}
 * needs to do operator fusion codegen.
 */
public interface OpFusionContext {

    /** Return the output type of current {@link OpFusionCodegenSpecGenerator}. */
    RowType getOutputType();

    /**
     * Return the managed memory fraction that this {@link OpFusionCodegenSpecGenerator} needs
     * among all fused operators.
     */
    double getManagedMemoryFraction();

    /** Return the input {@link OpFusionContext} of this {@link OpFusionCodegenSpecGenerator}. */
    List<OpFusionContext> getInputFusionContexts();

    /**
     * Generate Java source code to process the rows from the operator's corresponding input.
     * Delegates to the {@link OpFusionCodegenSpecGenerator#processProduce(CodeGeneratorContext)}
     * method.
     */
    void processProduce(CodeGeneratorContext codegenCtx);

    /**
     * Generate Java source code to do cleanup work for the operator's corresponding input.
     * Delegates to the {@link OpFusionCodegenSpecGenerator#endInputProduce(CodeGeneratorContext)}
     * method.
     */
    void endInputProduce(CodeGeneratorContext codegenCtx);

    default String processConsume(List<GeneratedExpression> outputVars) {
        return processConsume(outputVars, null);
    }

    /**
     * Consume the generated columns or row from the current {@link OpFusionCodegenSpec}. Delegates
     * to the {@link OpFusionCodegenSpecGenerator#processConsume(List, String)} method.
     *
     * <p>Note that `outputVars` and `row` can't both be null.
     */
    String processConsume(List<GeneratedExpression> outputVars, String row);

    /**
     * Generate Java source code to do cleanup work for the input corresponding to this {@link
     * OpFusionCodegenSpec}. Delegates to the {@link OpFusionCodegenSpecGenerator#endInputConsume()}
     * method.
     */
    String endInputConsume();
}


OpFusionCodegenSpec

This interface maintains the required information an operator needs to support OFCG.

/** An interface for those physical operators that support operator fusion codegen. */
@Internal
public interface OpFusionCodegenSpec {

    /**
     * Initializes the operator spec. Sets access to the context. This method must be called before
     * doProduce and doConsume related methods.
     */
    void setup(OpFusionContext opFusionContext);

    /** Prefix used in the current operator's variable names. */
    String variablePrefix();

    /**
     * The subset of input column indices that should be evaluated before this operator.
     *
     * <p>We use this to insert code that accesses the columns actually used by the current
     * operator before calling doProcessConsume().
     */
    Set<Integer> usedInputColumns(int inputId);

    /**
     * The {@link RowData} class that the current operator requires for the given inputId. This is
     * used to notify the upstream operator to wrap rows in the proper {@link RowData} type before
     * calling the doProcessConsume method. For example, the HashJoin build side needs {@link
     * BinaryRowData}.
     */
    Class<? extends RowData> getInputRowDataClass(int inputId);

    /**
     * Every operator needs one {@link CodeGeneratorContext} to store the context needed during
     * operator fusion codegen.
     */
    CodeGeneratorContext getCodeGeneratorContext();

    /** Get the {@link ExprCodeGenerator} used by this operator during operator fusion codegen. */
    ExprCodeGenerator getExprCodeGenerator();

    /**
     * Generate the Java source code to process rows. Only the leaf operator in the operator DAG
     * needs to generate the code that produces rows; intermediate operators normally just call
     * their input's {@link OpFusionCodegenSpecGenerator#processProduce(CodeGeneratorContext)},
     * unless the operator has some specific logic. The leaf operator produces a row first, and
     * then calls the {@link OpFusionContext#processConsume(List)} method to consume it.
     *
     * <p>The code generated by the leaf operator is saved in codegenCtx, so this method has no
     * return value.
     */
    void doProcessProduce(CodeGeneratorContext codegenCtx);

    /**
     * The process method is responsible for the operator data processing logic, so each operator
     * needs to implement this method to generate the code to process the row. This should only be
     * called from {@link OpFusionCodegenSpecGenerator#processConsume(List, String)}.
     *
     * <p>Note: An operator can consume the rows either as RowData (row) or as a list of variables
     * (inputVars).
     *
     * @param inputId This is numbered starting from 1, and `1` indicates the first input.
     * @param inputVars field variables of current input.
     * @param row row variable of current input.
     */
    String doProcessConsume(
            int inputId, List<GeneratedExpression> inputVars, GeneratedExpression row);

    /**
     * Generate the Java source code to do the operator's cleanup work. Only the leaf operator in
     * the operator DAG needs to generate the code; intermediate operators normally just call
     * their input's `endInputProduce`, unless the operator has some specific logic.
     *
     * <p>The code generated by the leaf operator is saved in codegenCtx, so this method has no
     * return value.
     */
    void doEndInputProduce(CodeGeneratorContext codegenCtx);

    /**
     * The endInput method is used to do cleanup work for the operator's corresponding input. For
     * example, the HashAgg operator needs to flush data, and the HashJoin build side needs to
     * finish building the hash table, so each operator needs to implement the corresponding
     * cleanup logic in this method.
     *
     * <p>For blocking operators such as HashAgg, the {@link OpFusionContext#processConsume(List,
     * String)} method needs to be called first to consume the data, followed by the
     * `endInputConsume` method to do the cleanup work of the downstream operators. For pipeline
     * operators such as Project, you only need to call the `endInputConsume` method.
     *
     * @param inputId This is numbered starting from 1, and `1` indicates the first input.
     */
    String doEndInputConsume(int inputId);
}


OpFusionCodegenSpecGenerator

/**
 * {@link OpFusionCodegenSpecGenerator} is used by operator fusion codegen to generate the fused
 * code. It can have multiple inputs and outputs, which together form a DAG. Every
 * OpFusionCodegenSpecGenerator holds an {@link OpFusionCodegenSpec} that is used to generate the
 * operator's row processing code. In addition, it provides some meta information that codegen
 * needs.
 */
@Internal
public abstract class OpFusionCodegenSpecGenerator {

    private final RowType outputType;
    protected final OpFusionCodegenSpec opFusionCodegenSpec;
    private final OpFusionContext opFusionContext;
    private double managedMemoryFraction = 0;

    public OpFusionCodegenSpecGenerator(
            RowType outputType, OpFusionCodegenSpec opFusionCodegenSpec) {
        this.outputType = outputType;
        this.opFusionCodegenSpec = opFusionCodegenSpec;
        this.opFusionContext = new OpFusionContextImpl(this);
    }

    /**
     * Initializes the information needed by the operator spec generator. This method must be
     * called before the produce and consume related methods.
     */
    public void setup(Context context) {
        this.managedMemoryFraction = context.getManagedMemoryFraction();
        this.opFusionCodegenSpec.setup(opFusionContext);
    }

    public OpFusionCodegenSpec getOpFusionCodegenSpec() {
        return opFusionCodegenSpec;
    }

    public OpFusionContext getOpFusionContext() {
        return opFusionContext;
    }

    public abstract long getManagedMemory();

    public abstract List<OpFusionCodegenSpecGenerator> getInputs();

    /**
     * Add the specific {@link OpFusionCodegenSpecGenerator} as the output of current operator spec
     * generator, one {@link OpFusionCodegenSpecGenerator} may have multiple outputs that form a
     * DAG.
     *
     * @param inputIdOfOutput This is numbered starting from 1, and `1` indicates the first input of
     *     output {@link OpFusionCodegenSpecGenerator}.
     * @param output The {@link OpFusionCodegenSpecGenerator} as output of current spec generator.
     */
    public abstract void addOutput(int inputIdOfOutput, OpFusionCodegenSpecGenerator output);

    /** Generate Java source code to process the rows from the operator's corresponding input. */
    public abstract void processProduce(CodeGeneratorContext fusionCtx);

    /**
     * Consume the generated columns or row from current operator, call its output's {@link
     * OpFusionCodegenSpec#doProcessConsume(int, List, GeneratedExpression)} method.
     *
     * <p>Note that `outputVars` and `row` can't both be null.
     */
    public abstract String processConsume(List<GeneratedExpression> outputVars, String row);

    /** Generate Java source code to do cleanup work for the operator's corresponding input. */
    public abstract void endInputProduce(CodeGeneratorContext fusionCtx);

    /**
     * Generate code to trigger the operator's cleanup work by calling its output's {@link
     * OpFusionCodegenSpec#doEndInputConsume(int)}. The leaf operator starts the chain of
     * endInputConsume calls.
     */
    public abstract String endInputConsume();
}


FusionCodegenExecNode

If an ExecNode wants to support OFCG, it needs to implement this interface.

/** An {@link ExecNode} that supports operator fusion codegen. */
public interface FusionCodegenExecNode {

    /** Whether this ExecNode supports OFCG or not. */
    boolean supportFusionCodegen();

    /**
     * Translates this node into a {@link OpFusionCodegenSpecGenerator}.
     *
     * <p>NOTE: This method should return the same spec generator if called multiple times.
     *
     * @param planner The {@link Planner} of the translated graph.
     */
    OpFusionCodegenSpecGenerator translateToFusionCodegenSpec(Planner planner);
}

Implementation Plan

  1. In release-1.18, we will prioritize OFCG support for the Calc, HashJoin, and HashAgg operators in batch mode because they are used very frequently.

  2. Our final goal is to support all operators in batch mode, so we will work on OFCG support for the remaining operators over the subsequent two or three releases.

  3. The first step is to support OFCG based on BatchExecMultipleInput, and the second step is to support one-input operators for OFCG.

  4. After two to three versions of iteration, we will enable the table.exec.operator-fusion-codegen.enabled option by default.

Compatibility, Deprecation, and Migration Plan

N/A

Test Plan

UT/IT

Appendix

  1. Efficiently Compiling Efficient Query Plans for Modern Hardware
  2. Everything You Always Wanted to Know About Compiled and Vectorized Queries But Were Afraid to Ask
  3. Photon: A Fast Query Engine for Lakehouse Systems
  4. Support Multiple Input for Blink Planner
  5. PoC: https://github.com/lsyldliu/flink/tree/OFCG