Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

Join is a very common operation in data analysis: users can efficiently obtain information by joining two or more tables. However, join is resource- and time-intensive, especially when the input data volume is huge, because its execution involves scanning the underlying tables, shuffling, and combining the tables. If we can reduce the amount of data reaching the join as much as possible, we can both improve query performance and reduce resource consumption (network, I/O, CPU, etc.), which means we can support more queries with the same resources.

Runtime filter is a common optimization for improving join performance. It dynamically generates filter conditions for certain join queries at runtime to reduce the amount of scanned or shuffled data, avoid unnecessary I/O and network transmission, and speed up the query. It works as follows: first, a filter (e.g. a bloom filter) is built from the data on the small-table side (the build side); then this filter is passed to the large-table side (the probe side) to filter out irrelevant data there. This reduces the amount of data reaching the join and improves performance.

In this FLIP, we propose to introduce runtime filters for Flink batch jobs.

Public Interfaces

We intend to introduce the following new configuration parameters.

| Key | Type | Default Value | Description |
| --- | --- | --- | --- |
| table.optimizer.runtime-filter.enabled | Boolean | false | A flag to enable or disable the runtime filter. |
| table.optimizer.runtime-filter.max-build-data-size | MemorySize | 150 MB | Data volume threshold of the runtime filter build side. The estimated build-side data volume must be below this value for the planner to try to inject a runtime filter. |
| table.optimizer.runtime-filter.min-probe-data-size | MemorySize | 10 GB | Data volume threshold of the runtime filter probe side. The estimated probe-side data volume must be above this value for the planner to try to inject a runtime filter. |
| table.optimizer.runtime-filter.min-filter-ratio | Double | 0.5 | Filter ratio threshold of the runtime filter. The estimated filter ratio must be above this value for the planner to try to inject a runtime filter. |

Proposed Changes

The Whole Workflow for Runtime Filter

The runtime filter changes mainly involve two parts: the planner part and the runtime part.

In the planner part, the planner will try to inject runtime filters for eligible joins during the optimization phase. We build the runtime filter in two stages: first, each build-side subtask builds a local filter from its local data and sends it to a global aggregation node; then the global aggregation node merges the received filters into a global filter and sends it to all probe-side subtasks. Therefore, we will add LocalRuntimeFilterBuilder, GlobalRuntimeFilterBuilder and RuntimeFilter nodes to the physical plan (Figure 1 is an example).

In the runtime part, we need to provide the corresponding operator implementations, so we will add three new operators: LocalRuntimeFilterBuilderOperator, GlobalRuntimeFilterBuilderOperator and RuntimeFilterOperator. The scheduler schedules them in topological order: first the build side (with its chained LocalRuntimeFilterBuilderOperator), then the GlobalRuntimeFilterBuilderOperator, and then the RuntimeFilterOperator. The built local and global filters are sent via data edges.


CREATE TABLE dim (
  x INT,
  y INT,
  z BIGINT);

CREATE TABLE fact (
  a INT,
  b BIGINT,
  c INT,
  d VARCHAR,
  e BIGINT);

SELECT * FROM fact, dim WHERE x = a AND z = 2;

Next, we use the above query (where dim is the small table and fact is the large table) as an example to show the whole workflow of the runtime filter (Figure 1 shows the planner changes, and Figure 2 shows the whole workflow):

  1. The parser parses the given query and converts it into an AST (Abstract Syntax Tree) plan. The optimizer detects the runtime filter pattern and tries to inject a runtime filter; the optimized physical plan is then converted into an ExecNode plan, which is in turn converted into a StreamGraph and then a JobGraph.
  2. The client submits the generated JobGraph to the JobManager. The scheduler then schedules the execution of job vertices in topological order.
  3. The TableSource(dim) with its chained LocalRuntimeFilterBuilder is scheduled first.
  4. After the TableSource(dim) has finished, the scheduler schedules the GlobalRuntimeFilterBuilder, which generates the global filter.
  5. After the GlobalRuntimeFilterBuilder has finished, the scheduler schedules the TableSource(fact) with its chained RuntimeFilter, which uses the received filter to filter the data.
  6. Finally, the Join is scheduled.
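The ordering in steps 3 to 6 follows from the data dependencies that the runtime filter adds to the job graph. The following is a minimal sketch, assuming a simplified graph whose vertex names are hypothetical labels (not real Flink JobVertex IDs) and using Kahn's algorithm for the topological sort; Flink's actual scheduler logic is more involved.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model of the example job graph; vertex names are illustrative only.
class TopoScheduleSketch {

    /** Kahn's algorithm: returns vertices so that every producer precedes its consumers. */
    static List<String> topologicalOrder(Map<String, List<String>> edges) {
        Map<String, Integer> inDegree = new LinkedHashMap<>();
        for (Map.Entry<String, List<String>> e : edges.entrySet()) {
            inDegree.putIfAbsent(e.getKey(), 0);
            for (String target : e.getValue()) {
                inDegree.merge(target, 1, Integer::sum);
            }
        }
        Deque<String> ready = new ArrayDeque<>();
        for (Map.Entry<String, Integer> e : inDegree.entrySet()) {
            if (e.getValue() == 0) {
                ready.add(e.getKey()); // vertices without inputs can run first
            }
        }
        List<String> order = new ArrayList<>();
        while (!ready.isEmpty()) {
            String v = ready.poll();
            order.add(v);
            for (String target : edges.getOrDefault(v, List.of())) {
                if (inDegree.merge(target, -1, Integer::sum) == 0) {
                    ready.add(target); // all producers of target are done
                }
            }
        }
        if (order.size() != inDegree.size()) {
            throw new IllegalStateException("cycle detected");
        }
        return order;
    }

    static Map<String, List<String>> exampleGraph() {
        Map<String, List<String>> edges = new LinkedHashMap<>();
        // Build side feeds both the join and the global builder.
        edges.put("Source(dim)+LocalRuntimeFilterBuilder", List.of("GlobalRuntimeFilterBuilder", "Join"));
        // The global filter is broadcast to the probe-side source with its chained RuntimeFilter.
        edges.put("GlobalRuntimeFilterBuilder", List.of("Source(fact)+RuntimeFilter"));
        edges.put("Source(fact)+RuntimeFilter", List.of("Join"));
        return edges;
    }
}
```

For the example graph, the computed order is Source(dim)+LocalRuntimeFilterBuilder, GlobalRuntimeFilterBuilder, Source(fact)+RuntimeFilter, Join, which matches steps 3 to 6.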

Planner

Supported Join types

Flink supports six join types: inner, left outer, right outer, full outer, left semi and left anti join. The runtime filter only supports the following four join cases:

  1. inner join
  2. semi join
  3. left outer join, with the left side as the build side
  4. right outer join, with the right side as the build side

For these join types, there are currently three join implementation strategies: HashJoin, SortMergeJoin and NestedLoopJoin. Since NestedLoopJoin is usually used only when the data volume is small or when there is no equi-join condition, the runtime filter will only support HashJoin and SortMergeJoin.
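The eligibility rules above can be summarized as a single predicate. This is a minimal sketch; the enums and the method are hypothetical stand-ins, not Flink planner classes. The outer-join restrictions follow from correctness: a left outer join must preserve every left row, so only the right (probe) side may be filtered, which requires the left side to be the build side; the right outer join is symmetric.

```java
// Hypothetical enums for illustration; these are not Flink's planner classes.
enum JoinType { INNER, LEFT, RIGHT, FULL, SEMI, ANTI }

enum BuildSide { LEFT, RIGHT }

enum JoinStrategy { HASH_JOIN, SORT_MERGE_JOIN, NESTED_LOOP_JOIN }

class RuntimeFilterJoinSupport {

    /** Mirrors the four supported join cases and the two supported strategies listed above. */
    static boolean supportsRuntimeFilter(JoinType type, BuildSide buildSide, JoinStrategy strategy) {
        if (strategy == JoinStrategy.NESTED_LOOP_JOIN) {
            return false; // only HashJoin and SortMergeJoin are supported
        }
        switch (type) {
            case INNER:
            case SEMI:
                return true;
            case LEFT:
                return buildSide == BuildSide.LEFT;  // probe (right) side may be filtered
            case RIGHT:
                return buildSide == BuildSide.RIGHT; // probe (left) side may be filtered
            default:
                return false; // full outer and anti joins are not supported
        }
    }
}
```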

Conditions of Injecting Runtime Filter

We will inject a runtime filter only if all of the following requirements are met:

  1. The estimated data volume of the build side must be below "table.optimizer.runtime-filter.max-build-data-size". If the build side is too large, the cost of building the filter becomes too high and may hurt job performance.
  2. The estimated data volume of the probe side must be above "table.optimizer.runtime-filter.min-probe-data-size". If the probe side is too small, the benefit does not justify the overhead of building a runtime filter.
  3. The estimated filter ratio must be above "table.optimizer.runtime-filter.min-filter-ratio". As with 2, a low filter ratio does not justify building a runtime filter. The filter ratio can be estimated by the following formula:

     $$\text{filterRatio} = 1 - \frac{\text{buildNdv}}{\text{probeNdv}}$$

     where buildNdv is the number of distinct values (NDV) of the build-side data and probeNdv is the number of distinct values of the probe-side data. If the NDV cannot be estimated, the row count is used instead.
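The three conditions can be sketched as a single check. This is a minimal sketch, assuming the default thresholds from the configuration table, with MemorySize values interpreted as 1024-based units (1 MB = 1024 * 1024 bytes); the class and method names are hypothetical. For example, a build side with 1,000 distinct join keys and a probe side with 100,000 gives filterRatio = 1 - 1,000 / 100,000 = 0.99, well above the default 0.5.

```java
// Hypothetical helper mirroring the three injection conditions; not Flink planner code.
class RuntimeFilterInjectionCheck {
    // Defaults from the configuration table (assumed 1024-based memory units).
    static final long MAX_BUILD_DATA_BYTES = 150L * 1024 * 1024;       // 150 MB
    static final long MIN_PROBE_DATA_BYTES = 10L * 1024 * 1024 * 1024; // 10 GB
    static final double MIN_FILTER_RATIO = 0.5;

    /** filterRatio = 1 - buildNdv / probeNdv; pass row counts if NDVs cannot be estimated. */
    static double filterRatio(double buildNdv, double probeNdv) {
        return 1.0 - buildNdv / probeNdv;
    }

    /** All three conditions must hold for the planner to try to inject a runtime filter. */
    static boolean shouldInject(long buildBytes, long probeBytes, double buildNdv, double probeNdv) {
        return buildBytes < MAX_BUILD_DATA_BYTES                       // condition 1
                && probeBytes > MIN_PROBE_DATA_BYTES                   // condition 2
                && filterRatio(buildNdv, probeNdv) > MIN_FILTER_RATIO; // condition 3
    }
}
```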

Placement of Runtime Filter

In theory, the runtime filter can be pushed down along the probe side, as close to the data sources as possible. For simplicity, however, the first version will only inject the runtime filter directly before the join, without pushdown; runtime filter pushdown can be a future improvement.

Runtime

Supported Shuffle Types

The runtime filter can work well with all shuffle modes: pipeline shuffle, blocking shuffle, and hybrid shuffle.

LocalRuntimeFilterBuilderOperator

The LocalRuntimeFilterBuilderOperator is responsible for building a bloom filter from the current subtask's local data. While building the bloom filter, it also compares the size of the received data with "table.optimizer.runtime-filter.max-build-data-size". Once this limit is exceeded, it outputs a fake filter (which always returns true).
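A minimal sketch of the local building logic, assuming long join keys, a caller-supplied per-record size in bytes, and a BitSet-based bloom filter that uses double hashing over a splitmix64-style mix function. The class and method names are hypothetical and do not reflect Flink's actual operator API.

```java
import java.util.BitSet;

// Hypothetical sketch of a local runtime filter builder; not Flink's operator API.
class LocalBloomFilterBuilder {
    private final BitSet bits;
    private final int numBits;
    private final int numHashes;
    private final long maxBuildBytes; // plays the role of max-build-data-size
    private long receivedBytes;
    private boolean fake;             // true once the size limit is exceeded

    LocalBloomFilterBuilder(int numBits, int numHashes, long maxBuildBytes) {
        this.bits = new BitSet(numBits);
        this.numBits = numBits;
        this.numHashes = numHashes;
        this.maxBuildBytes = maxBuildBytes;
    }

    // splitmix64-style mixing of a long key
    private static long mix(long z) {
        z += 0x9E3779B97F4A7C15L;
        z = (z ^ (z >>> 30)) * 0xBF58476D1CE4E5B9L;
        z = (z ^ (z >>> 27)) * 0x94D049BB133111EBL;
        return z ^ (z >>> 31);
    }

    void add(long key, long recordBytes) {
        if (fake) {
            return; // already gave up; remaining input is not inserted
        }
        receivedBytes += recordBytes;
        if (receivedBytes > maxBuildBytes) {
            fake = true;  // output a filter that always returns true
            bits.clear(); // the partial bits are no longer needed
            return;
        }
        long h1 = mix(key);
        long h2 = mix(h1) | 1L; // double hashing: position_i = h1 + i * h2
        for (int i = 0; i < numHashes; i++) {
            bits.set((int) Math.floorMod(h1 + i * h2, (long) numBits));
        }
    }

    boolean isFake() {
        return fake;
    }

    boolean mightContain(long key) {
        if (fake) {
            return true;
        }
        long h1 = mix(key);
        long h2 = mix(h1) | 1L;
        for (int i = 0; i < numHashes; i++) {
            if (!bits.get((int) Math.floorMod(h1 + i * h2, (long) numBits))) {
                return false; // definitely not on the build side
            }
        }
        return true; // possibly on the build side
    }
}
```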

GlobalRuntimeFilterBuilderOperator

The GlobalRuntimeFilterBuilderOperator is responsible for merging all received bloom filters into a global bloom filter with a bitwise OR. It outputs a fake filter if any of the received filters is fake. The parallelism of this operator will be 1, and the global bloom filter will be broadcast to all related RuntimeFilterOperators.
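The OR merge is correct only if every local filter uses the same number of bits and the same hash functions; under that assumption, the bitwise OR of the local bit arrays is exactly the bloom filter of the union of all local keys. A minimal sketch with hypothetical names, not Flink's operator API:

```java
import java.util.BitSet;
import java.util.List;

// Hypothetical sketch of the global merge step; names are illustrative only.
class GlobalFilterMerge {

    static final class LocalFilter {
        final BitSet bits;  // all local filters must share size and hash functions
        final boolean fake; // a fake filter accepts every key

        LocalFilter(BitSet bits, boolean fake) {
            this.bits = bits;
            this.fake = fake;
        }
    }

    static LocalFilter merge(List<LocalFilter> received) {
        BitSet global = new BitSet();
        for (LocalFilter f : received) {
            if (f.fake) {
                // One fake input makes the global filter fake: some build keys were never inserted.
                return new LocalFilter(new BitSet(), true);
            }
            global.or(f.bits); // union of key sets == bitwise OR of bloom filter bits
        }
        return new LocalFilter(global, false);
    }
}
```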

RuntimeFilterOperator

The RuntimeFilterOperator is responsible for filtering the probe-side data using the received bloom filter. It should normally be chained with the probe side.
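A minimal sketch of the probe-side check, assuming long join keys and the same BitSet-based bloom filter layout as the builder. In the proposal the bits arrive from the GlobalRuntimeFilterBuilderOperator; here a hypothetical add() stands in for that so the example is self-contained. All names are illustrative, not Flink's RuntimeFilterOperator API.

```java
import java.util.ArrayList;
import java.util.BitSet;
import java.util.List;
import java.util.function.ToLongFunction;

// Hypothetical sketch of probe-side filtering; not Flink's operator API.
class ProbeSideRuntimeFilter {
    private final BitSet bits;
    private final int numBits;
    private final int numHashes;

    ProbeSideRuntimeFilter(int numBits, int numHashes) {
        this.bits = new BitSet(numBits);
        this.numBits = numBits;
        this.numHashes = numHashes;
    }

    // splitmix64-style mixing of a long key
    private static long mix(long z) {
        z += 0x9E3779B97F4A7C15L;
        z = (z ^ (z >>> 30)) * 0xBF58476D1CE4E5B9L;
        z = (z ^ (z >>> 27)) * 0x94D049BB133111EBL;
        return z ^ (z >>> 31);
    }

    // Stand-in for receiving the global filter's bits from the build side.
    void add(long key) {
        long h1 = mix(key);
        long h2 = mix(h1) | 1L;
        for (int i = 0; i < numHashes; i++) {
            bits.set((int) Math.floorMod(h1 + i * h2, (long) numBits));
        }
    }

    boolean mightContain(long key) {
        long h1 = mix(key);
        long h2 = mix(h1) | 1L;
        for (int i = 0; i < numHashes; i++) {
            if (!bits.get((int) Math.floorMod(h1 + i * h2, (long) numBits))) {
                return false;
            }
        }
        return true;
    }

    /** Forwards only the probe rows whose join key may exist on the build side. */
    <T> List<T> filter(List<T> probeRows, ToLongFunction<T> joinKey) {
        List<T> kept = new ArrayList<>();
        for (T row : probeRows) {
            if (mightContain(joinKey.applyAsLong(row))) {
                kept.add(row);
            }
        }
        return kept;
    }
}
```

With build-side keys 1, 2 and 3, filtering probe rows with keys 1 to 1000 keeps those three keys and, apart from rare false positives, drops the rest before they reach the join.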

Compatibility, Deprecation, and Migration Plan

In the first version, the runtime filter will be an optional optimization that users must enable explicitly by setting the config option table.optimizer.runtime-filter.enabled to true. This means that Flink's default behavior won't change.

Future Improvements

More underlying filter implementations

In this version, the runtime filter is implemented with a bloom filter. In the future, we can introduce more underlying implementations for further optimization. For example, when the build-side input is small enough, we can use an in-filter to reduce the building overhead and avoid false positives. We can also introduce a min-max filter, which can easily be pushed down to the source to reduce scan I/O.
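A min-max filter records only the smallest and largest build-side key, so it maps naturally onto a range predicate that a source can evaluate. A minimal sketch, assuming long keys; the class and the predicate string format are hypothetical. Like the bloom filter, local min-max filters could be merged in the global stage, here by taking the minimum of the minimums and the maximum of the maximums.

```java
// Hypothetical min-max filter sketch for a possible future filter type.
class MinMaxFilter {
    private long min = Long.MAX_VALUE; // an empty filter has min > max and accepts nothing
    private long max = Long.MIN_VALUE;

    void add(long key) {
        min = Math.min(min, key);
        max = Math.max(max, key);
    }

    /** Global merge of two local filters: the union of two ranges is covered by [min, max]. */
    void merge(MinMaxFilter other) {
        min = Math.min(min, other.min);
        max = Math.max(max, other.max);
    }

    /** No false negatives; absent keys inside [min, max] are false positives. */
    boolean mightContain(long key) {
        return key >= min && key <= max;
    }

    /** A range predicate that could be handed to a source for pushdown. */
    String toPredicate(String column) {
        return column + " BETWEEN " + min + " AND " + max;
    }
}
```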

Runtime Filter PushDown

Like normal filters, runtime filters can be pushed further down along the probe side, as close to the data sources as possible. As mentioned above, we can even push runtime filters into the source to reduce scan I/O if the source supports it.

Use the real execution information

We need to provide the expected number of records when creating a bloom filter. Currently, this number is estimated in the planning phase. A better solution would be to let the RuntimeFilterBuilder know the real number of build-side records during the execution phase; we may do this in the future.
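The record count matters because the standard bloom filter sizing formulas depend on it: for n expected records and a target false-positive probability p, the optimal number of bits is m = ceil(-n ln p / (ln 2)^2) and the optimal number of hash functions is k = round((m / n) ln 2). The sketch below uses these textbook formulas (the proposal does not specify its sizing; helper names are hypothetical) together with the approximate false-positive rate (1 - e^(-kn/m))^k to show what happens when the planner underestimates n.

```java
// Hypothetical sizing helpers based on the standard bloom filter formulas.
class BloomFilterSizing {

    /** m = ceil(-n * ln(p) / (ln 2)^2) bits for n expected records and target FPP p. */
    static long optimalNumBits(long expectedRecords, double fpp) {
        return (long) Math.ceil(-expectedRecords * Math.log(fpp) / (Math.log(2) * Math.log(2)));
    }

    /** k = round(m / n * ln 2), at least 1. */
    static int optimalNumHashes(long expectedRecords, long numBits) {
        return Math.max(1, (int) Math.round((double) numBits / expectedRecords * Math.log(2)));
    }

    /** Approximate false-positive probability (1 - e^(-k * n / m))^k after inserting n records. */
    static double falsePositiveRate(long insertedRecords, long numBits, int numHashes) {
        return Math.pow(1 - Math.exp(-(double) numHashes * insertedRecords / numBits), numHashes);
    }
}
```

With n = 1,000 and p = 0.01 this gives m = 9,586 bits and k = 7. If 10,000 records are actually inserted into that filter, the estimated false-positive rate rises to about 0.995, so the filter removes almost nothing; knowing the real count avoids this.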

Reuse the hash table for deduplication

When the join is a hash join, we can reuse the hash table built in the join operator to build the bloom filter. The key set of the hash table gives us exact NDV counts and deduplicated keys, which avoids inserting the same key into the bloom filter twice. This idea came from the mailing list discussion; see the mailing list for more details.

Use blocked bloom filters to improve cache efficiency

To improve cache efficiency when building larger filters, we could structure them as blocked bloom filters, where the filter is split into blocks and all bits of one key go into a single block. This idea also came from the mailing list discussion; see the mailing list for more details.
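A minimal sketch of a blocked bloom filter, assuming long keys, 512-bit blocks (eight 64-bit words, the size of a typical 64-byte cache line) and at most 7 hash functions taken as 9-bit slices of one mixed hash. The class name and all parameters are hypothetical. Note that the JVM does not guarantee that a long[] is aligned to cache-line boundaries, so in practice a block may straddle two lines.

```java
// Hypothetical blocked bloom filter sketch; block size and hashing are assumptions.
class BlockedBloomFilter {
    private static final int WORDS_PER_BLOCK = 8; // 8 x 64 bits = 512 bits per block
    private final long[] words;
    private final int numBlocks;
    private final int numHashes;

    BlockedBloomFilter(int numBlocks, int numHashes) {
        if (numBlocks < 1) {
            throw new IllegalArgumentException("numBlocks must be positive");
        }
        if (numHashes < 1 || numHashes > 7) {
            throw new IllegalArgumentException("numHashes must be in [1, 7]"); // 7 slices of 9 bits fit in 64 bits
        }
        this.numBlocks = numBlocks;
        this.numHashes = numHashes;
        this.words = new long[numBlocks * WORDS_PER_BLOCK];
    }

    // splitmix64-style mixing of a long key
    private static long mix(long z) {
        z += 0x9E3779B97F4A7C15L;
        z = (z ^ (z >>> 30)) * 0xBF58476D1CE4E5B9L;
        z = (z ^ (z >>> 27)) * 0x94D049BB133111EBL;
        return z ^ (z >>> 31);
    }

    void add(long key) {
        long h = mix(key);
        int base = (int) Math.floorMod(h, (long) numBlocks) * WORDS_PER_BLOCK; // one block per key
        long h2 = mix(h);
        for (int i = 0; i < numHashes; i++) {
            int bit = (int) ((h2 >>> (9 * i)) & 511); // 9-bit slice i selects a bit within the block
            words[base + (bit >>> 6)] |= 1L << (bit & 63);
        }
    }

    boolean mightContain(long key) {
        long h = mix(key);
        int base = (int) Math.floorMod(h, (long) numBlocks) * WORDS_PER_BLOCK;
        long h2 = mix(h);
        for (int i = 0; i < numHashes; i++) {
            int bit = (int) ((h2 >>> (9 * i)) & 511);
            if ((words[base + (bit >>> 6)] & (1L << (bit & 63))) == 0) {
                return false;
            }
        }
        return true;
    }
}
```

The trade-off is a somewhat higher false-positive rate than a standard bloom filter with the same number of bits, because each key's bits are confined to one block instead of being spread across the whole array.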

Test Plan

The proposed changes will be tested for correctness and performance through the TPC-DS benchmark suite in a real cluster.