Document the state by adding a label to the FLIP page with one of "discussion", "accepted", "released", "rejected".

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

[This FLIP proposal is joint work between Zhanghao Chen and Dewei Su.]

Motivation

Currently, Flink Table/SQL jobs do not expose fine-grained control of operator parallelism to users. FLIP-146 added support for setting the parallelism of sinks, but apart from that, users can only set a default global parallelism that all other operators share. However, in many cases, setting the parallelism of sources individually is preferable:

  • Many connectors have an upper bound on the parallelism with which they can efficiently ingest data. For example, the parallelism of a Kafka source is bounded by the number of topic partitions; any extra tasks would sit idle.

  • Other operators may involve intensive computation and need a larger parallelism.

Goals

  • Support setting parallelism for Table/SQL sources to provide more flexibility for performance tuning for Table/SQL pipelines.

Non-Goals

We deliberately exclude the following from our goals:

  • Support setting parallelism for individual Table/SQL operators.

    • Unlike sources, whose parallelism can easily be set via a connector option, setting the parallelism of each individual operator would inevitably touch many public interfaces.

    • Setting parallelism for individual operators is also more complex, so we leave it for future work to keep this FLIP focused.

  • Support parallelism inference for Table/SQL sources.

    • It requires support from the sources themselves; we will consider adding it once that support is ready.

Public Interfaces

Make all classes that implement the ScanRuntimeProvider interface also implement the ParallelismProvider interface:

  • SourceProvider

  • SourceFunctionProvider

  • InputFormatProvider

  • DataStreamScanProvider

  • TransformationScanProvider


For each of them, add a new provider-creation helper method that takes an extra parameter specifying the parallelism. Take SourceProvider as an example:

/** Helper method for creating a Source provider with a provided source parallelism. */
static SourceProvider of(Source<RowData, ?, ?> source, @Nullable Integer sourceParallelism) {
    return new SourceProvider() {

        @Override
        public Source<RowData, ?, ?> createSource() {
            return source;
        }

        @Override
        public boolean isBounded() {
            return Boundedness.BOUNDED.equals(source.getBoundedness());
        }

        @Override
        public Optional<Integer> getParallelism() {
            return Optional.ofNullable(sourceParallelism);
        }
    };
}
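To illustrate the contract of the helper above in isolation, the following is a self-contained sketch. The Flink types (SourceProvider, Source, etc.) are replaced by minimal local stand-ins, and the class name SourceProviderSketch is invented for this example; only the Optional.ofNullable pattern for the nullable parallelism is the point.

```java
import java.util.Optional;

public class SourceProviderSketch {

    // Minimal stand-in for ParallelismProvider: an empty Optional means
    // "use the global default parallelism".
    interface ParallelismProvider {
        default Optional<Integer> getParallelism() {
            return Optional.empty();
        }
    }

    // Stand-in for a runtime provider that can report its parallelism.
    interface ScanRuntimeProvider extends ParallelismProvider {}

    // Mirrors the helper method's handling of a nullable parallelism.
    static ScanRuntimeProvider of(Integer sourceParallelism) {
        return new ScanRuntimeProvider() {
            @Override
            public Optional<Integer> getParallelism() {
                return Optional.ofNullable(sourceParallelism);
            }
        };
    }

    public static void main(String[] args) {
        // An explicitly configured parallelism is reported as-is ...
        System.out.println(of(4).getParallelism().orElse(-1)); // 4
        // ... while null falls back to Optional.empty(), i.e. the global default.
        System.out.println(of(null).getParallelism().orElse(-1)); // -1
    }
}
```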

Users specify a custom parallelism for a source through a new connector option:

Option             Type      Default value
scan.parallelism   Integer   None (use the global parallelism setting)
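For a concrete picture of how the option would be used, here is a hypothetical DDL statement (table name, schema, and topic are invented; the Kafka connector options are the standard ones, with scan.parallelism being the new option proposed by this FLIP):

```sql
-- 'scan.parallelism' overrides the global default parallelism for this source only
CREATE TABLE user_behavior (
    user_id BIGINT,
    item_id BIGINT,
    ts TIMESTAMP(3)
) WITH (
    'connector' = 'kafka',
    'topic' = 'user_behavior',
    'properties.bootstrap.servers' = 'localhost:9092',
    'format' = 'json',
    'scan.parallelism' = '4'
);
```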

Proposed Changes

  • Make all classes that implement the ScanRuntimeProvider interface also implement the ParallelismProvider interface to allow configuring source parallelism.

  • Adapt CommonExecTableSourceScan to obtain the parallelism from ParallelismProvider when available and, after validation, configure it on the source transformation.

  • Integrate with the Kafka and datagen connectors to support the new connector option scan.parallelism for setting source parallelism.

Two subtleties need to be taken care of when configuring the source parallelism; they are detailed below.

Keep downstream operators' parallelism unchanged by wrapping the source transformation in a phantom transformation

Currently, all table exec nodes (except for the sink exec node, which can configure its own parallelism) inherit the parallelism of their input. As a result, adjusting the parallelism of a source would change the parallelism of all downstream operators accordingly, contradicting our goal of providing more flexibility for performance tuning of Table/SQL pipelines.
We propose to add a new subclass of Transformation named SourceTransformationWrapper to solve this issue. Specifically, when the parallelism of a source is set to a value different from the default parallelism, the source transformation is wrapped in a SourceTransformationWrapper. The wrapper's getWrapped() method returns the true source transformation, so the real operator and its parallelism are used when building the StreamGraph. The parallelism of the SourceTransformationWrapper itself is set to the default parallelism, so the parallelism of downstream operators does not change. Note that if the source and its downstream operators have the same parallelism, the SourceTransformationWrapper is not used, and wrapping the legacy LegacySourceTransformation is not supported.
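The wrapping idea can be sketched as follows. This is not the actual Flink Transformation API; Transformation is reduced to a minimal local class, and the class name WrapperSketch is invented. It only demonstrates the two-view trick: downstream operators see the default parallelism, while the StreamGraph builder unwraps the real source with its user-set parallelism.

```java
public class WrapperSketch {

    // Minimal stand-in for Flink's Transformation: just a name and a parallelism.
    static class Transformation {
        final String name;
        final int parallelism;

        Transformation(String name, int parallelism) {
            this.name = name;
            this.parallelism = parallelism;
        }
    }

    // Sketch of the proposed SourceTransformationWrapper: it reports the
    // default parallelism to downstream operators, while getWrapped() still
    // exposes the real source transformation for StreamGraph construction.
    static class SourceTransformationWrapper extends Transformation {
        private final Transformation wrapped;

        SourceTransformationWrapper(Transformation source, int defaultParallelism) {
            super("SourceWrapper[" + source.name + "]", defaultParallelism);
            this.wrapped = source;
        }

        Transformation getWrapped() {
            return wrapped;
        }
    }

    public static void main(String[] args) {
        Transformation source = new Transformation("KafkaSource", 4); // user-set
        SourceTransformationWrapper wrapper = new SourceTransformationWrapper(source, 16); // default

        // Downstream operators derive their parallelism from the wrapper ...
        System.out.println(wrapper.parallelism); // 16
        // ... while the StreamGraph builder uses the real source transformation.
        System.out.println(wrapper.getWrapped().parallelism); // 4
    }
}
```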

Deal with changelog messages

A source can generate update and delete messages, e.g. when the Upsert Kafka connector is used to consume a changelog. If the parallelism of the source differs from that of its downstream operators, a rebalance partitioner is used by default, which may break the keyed partitioning.
Specifically, when the source and downstream operators have different parallelism, two cases must be distinguished based on the kind of data the source generates:

  1. When the source only generates insert-only data, keep the current default behavior: the shuffle mode between the source and downstream operators is rebalance.

  2. When the source generates update and delete data (determined by checking the changelog mode of the source), the source uses a hash partitioner to send data. Note that the hash partitioner requires the existence of a primary key; an exception is thrown if no primary key is defined in the source schema.

This also aligns with the existing implementation for setting sink parallelism.
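The invariant behind case 2 above can be illustrated with a simplified channel-selection function (class and method names are invented; Flink's real keyed partitioner routes through key groups, but the property shown is the same: equal primary keys always land on the same downstream channel, so the ordering of changelog pairs for a key is preserved).

```java
public class ChangelogRoutingSketch {

    // Simplified hash partitioner: pick a downstream channel from the
    // primary key's hash. floorMod keeps the result non-negative.
    static int hashChannel(String primaryKey, int downstreamParallelism) {
        return Math.floorMod(primaryKey.hashCode(), downstreamParallelism);
    }

    public static void main(String[] args) {
        int parallelism = 3;

        // An UPDATE_BEFORE/UPDATE_AFTER pair for the same primary key is
        // routed to the same channel, so their relative order is preserved.
        int before = hashChannel("user-42", parallelism);
        int after = hashChannel("user-42", parallelism);
        System.out.println(before == after); // true

        // A round-robin rebalance would instead alternate channels,
        // potentially reordering the pair across subtasks.
    }
}
```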

Other Concerns

Setting parallelism for sources may break implicitly keyed streams

Some users may implicitly reinterpret normal data streams as keyed streams, relying on the fact that all operators use the same parallelism to maintain the keyed property of the streams. For example, a Kafka-to-Kafka ETL job may have its source Kafka topics partitioned by key before entering the job, and users may expect the streams to remain keyed during processing. Specifying the parallelism of sources may break this by introducing shuffles between tasks, leading to unexpected behavior. However, Flink does not offer such guarantees, and users are free to explicitly set a primary key to enforce keyed semantics, or simply not specify a source parallelism.

Compatibility, Deprecation, and Migration Plan

For users who want to upgrade an existing job to set a source parallelism different from the default parallelism, the following incompatibilities shall be noted:

  1. Old state may be incompatible due to topology changes resulting from breaking the source chain.
  2. For CDC sources, a primary key constraint is mandatory in this case to ensure data correctness.

Test Plan

The changes will be covered by UTs.