

Discussion thread: here (<- link to https://lists.apache.org/list.html?dev@flink.apache.org)
Vote thread: here (<- link to https://lists.apache.org/list.html?dev@flink.apache.org)
JIRA: here (<- link to https://issues.apache.org/jira/browse/FLINK-XXXX)
Release: <Flink Version>

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

State downloading in Flink can be a time- and CPU-consuming operation, which is especially visible if CPU resources per task slot are strictly restricted, for example to a single CPU. Downloading 1GB of state can take a significant amount of time, while the code doing so is quite inefficient.

Currently, when downloading state files, Flink creates an FSDataInputStream from the remote file and copies its bytes to an OutputStream pointing to a local file (in the RocksDBStateDownloader#downloadDataForStateHandle method). FSDataInputStream is internally wrapped by many layers of abstractions and indirections and, what's worse, every file is copied individually, which leads to quite high overheads for small files. Download times and the CPU efficiency of the download process could be significantly improved if we introduced an API allowing org.apache.flink.core.fs.FileSystem to copy many files natively, all at once.
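For reference, the current code path boils down to a plain per-file byte-copy loop between the two streams. The sketch below is a simplified illustration of that pattern, not the actual RocksDBStateDownloader code:

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;

public class StreamCopySketch {
    // Simplified version of what the stream-based download effectively does for
    // every single file: pull bytes through an input stream and push them into
    // an output stream, one buffer at a time, paying per-file setup overhead.
    public static long copyBytes(InputStream in, OutputStream out) throws IOException {
        byte[] buffer = new byte[64 * 1024];
        long total = 0;
        int read;
        while ((read = in.read(buffer)) != -1) {
            out.write(buffer, 0, read);
            total += read;
        }
        return total;
    }

    public static void main(String[] args) throws IOException {
        byte[] data = new byte[123_456];
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        long copied = copyBytes(new ByteArrayInputStream(data), out);
        System.out.println("copied " + copied + " bytes");
    }
}
```

For many small files, the fixed cost of setting up this loop (plus the stream wrapper layers above it) dominates, which is what the native batch copy avoids.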

For S3, there are at least two potential implementations. The first uses AWS SDKv2 directly (Flink currently uses AWS SDKv1 wrapped by hadoop/presto) together with the Amazon S3 Transfer Manager. The second option is to use a 3rd party tool called s5cmd. It is claimed to be a faster alternative to the official AWS clients, which our benchmarks confirmed.

Benchmark results

The benchmarking setup consisted of two Task Managers, each with 1 task slot and 1 vCPU. Below are the benchmark results obtained with all three options, and with Flink 1.17 for comparison:

1GB State total recovery times

| Scenario | Flink 1.17 | Flink 1.18 | SDKv2 | s5cmd |
| --- | --- | --- | --- | --- |
| scaling up 1 → 2 | ~37s | ~19s | ~15s | ~8.5s |

Total recovery times here are dominated by state download speed, as during scaling up from 1 to 2 RocksDB uses a very fast range delete. Non-state-download phases of recovery took under ~1s.

The difference between Flink 1.17 and Flink 1.18 is mainly due to improved parallel downloads (https://issues.apache.org/jira/browse/FLINK-32345), leaving almost no idle CPU during recovery.

Flink 1.18, SDKv2 and s5cmd all use roughly 100% of the available CPU resources, so the recovery process is bottlenecked by the CPU. The difference between these three options is how efficiently the available CPU is used.

Please note that all of the above was measured in an environment where available CPU was limited to a single vCPU per task slot. With a large enough number of vCPUs, both SDKv2 and s5cmd can saturate the available network. However, that's not a realistic scenario for a Flink cluster. In real production setups, Task Managers can have a large number of vCPUs available, but those are shared among an equally large number of actually used task slots. That's why the above setup of small TMs with restricted CPU resources is a good approximation of a production environment.

Proposed Changes

The main proposal is to allow FileSystems to copy files natively, bypassing the manual byte copying via input/output streams. In order to enable that, a couple of things would need to change.

Runtime code

It would be beneficial to clean up the FileSystem API, by extracting the actual interface to IFileSystem and adding there the following method:

/**
  * Tells if we can perform fast copy between given paths.
  *
  * @param source The path of the source file to copy
  * @param destination The path where to copy the source file
  * @return if this {@link IFileSystem} supports an optimised way to natively copy paths. In other
  *     words if it implements {@link PathsCopyingFileSystem}.
  */
default boolean canCopyPaths(Path source, Path destination) throws IOException {
    return false;
}

Additionally we would need to introduce the following APIs:


/**
 * An interface marking that a given {@link FileSystem} has an optimised path for copying paths
 * instead of using {@link FSDataOutputStream} or {@link FSDataInputStream}.
 */
@Experimental
public interface PathsCopyingFileSystem extends IFileSystem {
    /** A pair of source and destination to duplicate a file. */
    interface CopyRequest {
        /** The path of the source file to duplicate. */
        Path getSource();

        /** The path where to duplicate the source file. */
        Path getDestination();

        /** A factory method for creating a simple pair of source/destination. */
        static CopyRequest of(Path source, Path destination) {
            return new CopyRequest() {
                @Override
                public Path getSource() {
                    return source;
                }

                @Override
                public Path getDestination() {
                    return destination;
                }
                
                @Override
                public String toString() {
                    return "CopyRequest{" + "source=" + source + ", destination=" + destination + '}';
                }
            };
        }
    }

    /**
     * Copies over a list of {@link CopyRequest}s in one batch by this {@link
     * PathsCopyingFileSystem}. In case of an exception, some files might have already been copied
     * fully or partially; the caller should clean those up. The copy can be interrupted via the
     * {@link CloseableRegistry}.
     */
    void copyFiles(List<CopyRequest> requests, ICloseableRegistry closeableRegistry)
            throws IOException;

    @Override
    default boolean canCopyPaths(Path source, Path destination) throws IOException {
        return true;
    }
}
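As an illustration of what an implementation could do with such a batch, the hypothetical sketch below translates a list of copy requests into the line-based `cp <source> <destination>` commands that s5cmd's `run` subcommand accepts on stdin. The class and method names are invented for this sketch; a real implementation would also have to handle quoting, error reporting and the CloseableRegistry:

```java
import java.util.ArrayList;
import java.util.List;

public class S5cmdBatchSketch {
    /** Minimal stand-in for the proposed CopyRequest pair (hypothetical, for illustration). */
    public static final class CopyRequest {
        final String source;
        final String destination;

        public CopyRequest(String source, String destination) {
            this.source = source;
            this.destination = destination;
        }
    }

    // Translate a batch of copy requests into one "cp <src> <dst>" command per line,
    // which could then be fed to a single s5cmd child process over stdin.
    public static String toRunCommands(List<CopyRequest> requests) {
        StringBuilder sb = new StringBuilder();
        for (CopyRequest r : requests) {
            sb.append("cp ").append(r.source).append(' ').append(r.destination).append('\n');
        }
        return sb.toString();
    }

    // The child process invocation itself (never actually spawned in this sketch).
    public static List<String> command(String s5cmdPath) {
        List<String> cmd = new ArrayList<>();
        cmd.add(s5cmdPath);
        cmd.add("run");
        return cmd;
    }

    public static void main(String[] args) {
        String cmds = toRunCommands(List.of(
                new CopyRequest("s3://bucket/chk-42/sst-1", "/tmp/recovery/sst-1"),
                new CopyRequest("s3://bucket/chk-42/sst-2", "/tmp/recovery/sst-2")));
        System.out.print(cmds);
    }
}
```

Batching all files into a single child process is exactly what avoids the per-file overhead of the stream-based copy.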

In order for the runtime to differentiate between state handles that can be copied as files (like FileStateHandle) and those that cannot (like ByteStreamStateHandle), StreamStateHandle would have to be enriched with:

public interface StreamStateHandle {
    (...)
    
    /**
     * @return Path to an underlying file represented by this {@link StreamStateHandle} or {@link
     *     Optional#empty()} if there is no such file.
     */
    default Optional<org.apache.flink.core.fs.Path> maybeGetPath() {
        return Optional.empty();
    }
}
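To show how the runtime could use this, here is a hypothetical sketch (with simplified stand-ins, not the actual Flink classes) of splitting handles into those that can go through the batched native copy and those that must fall back to stream copying:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;

public class HandlePartitionSketch {
    /** Minimal stand-in for StreamStateHandle with the proposed maybeGetPath() (illustrative). */
    public interface StreamStateHandle {
        default Optional<String> maybeGetPath() {
            return Optional.empty();
        }
    }

    /** Handle backed by a remote file, like FileStateHandle. */
    public static final class FileHandle implements StreamStateHandle {
        final String path;

        public FileHandle(String path) {
            this.path = path;
        }

        @Override
        public Optional<String> maybeGetPath() {
            return Optional.of(path);
        }
    }

    /** Handle backed by inlined bytes, like ByteStreamStateHandle. */
    public static final class BytesHandle implements StreamStateHandle {}

    // Collect the paths that can go through the batched native copy; handles
    // without an underlying file are added to the stream-based fallback list.
    public static List<String> copyablePaths(
            List<StreamStateHandle> handles, List<StreamStateHandle> fallback) {
        List<String> paths = new ArrayList<>();
        for (StreamStateHandle h : handles) {
            Optional<String> p = h.maybeGetPath();
            if (p.isPresent()) {
                paths.add(p.get());
            } else {
                fallback.add(h);
            }
        }
        return paths;
    }

    public static void main(String[] args) {
        List<StreamStateHandle> fallback = new ArrayList<>();
        List<String> paths = copyablePaths(
                List.of(new FileHandle("s3://bucket/chk-1/file-1"), new BytesHandle()), fallback);
        System.out.println(paths.size() + " copyable, " + fallback.size() + " via streams");
    }
}
```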

Then places like RocksDBStateDownloader would be able to use the faster PathsCopyingFileSystem::copyFiles call for the handles and FileSystems that support that.

FileSystems

Each FileSystem could then be augmented with the native file copying code path wherever it would actually be beneficial. As part of this FLIP we propose to contribute s5cmd support for both the Presto (FlinkS3PrestoFileSystem) and Hadoop (FlinkS3FileSystem) based S3 FileSystem implementations.

We prefer s5cmd as it has proven to be faster and more CPU efficient. In order to configure s5cmd, a Flink user would need to specify the path to the s5cmd binary.

Apart from allowing all of s5cmd's arguments to be configured, there will be an additional parameter to configure a cpulimit for the s5cmd process, in order to control the CPU usage of that process and prevent it from overloading the TaskManager.

Public Interfaces

The aforementioned IFileSystem interface would be extracted from the abstract FileSystem class, and a new PathsCopyingFileSystem interface would be introduced.

Both Presto and Hadoop versions of the S3 FileSystem would gain the following new config options:


    public static final ConfigOption<String> S5CMD_PATH =
            ConfigOptions.key("s3.s5cmd.path")
                    .stringType()
                    .noDefaultValue()
                    .withDescription(
                            "When specified, s5cmd will be used for copying files to/from S3. Currently supported only "
                                    + "during RocksDB incremental state recovery.");

    public static final ConfigOption<String> S5CMD_EXTRA_ARGS =
            ConfigOptions.key("s3.s5cmd.args")
                    .stringType()
                    .defaultValue("-r 0")
                    .withDescription(
                            "Extra arguments to be passed to s5cmd. For example, --no-sign-request for public buckets and -r 10 for 10 retries.");

    public static final ConfigOption<Double> S5CMD_CPULIMIT =
            ConfigOptions.key("s3.s5cmd.cpulimit")
                    .doubleType()
                    .noDefaultValue()
                    .withDescription(
                            "Optional cpulimit value to set for the s5cmd process to prevent it from overloading the TaskManager.");

Future work

The same mechanism of natively copying files could also be used for uploading files. However, this has lower priority, as uploading files can be done incrementally - Flink doesn't have to upload all files at once, only those that are newly created.

An independent line of future work would be to investigate equivalent optimisations for the other FileSystems supported by Flink (GCP, Azure, …).

Compatibility, Deprecation, and Migration Plan

This feature is fully forward and backward compatible; the feature toggle enabling s5cmd can be disabled or enabled at any point.

Rejected Alternatives

Using AWS SDKv2

AWS SDKv2 proved not to be as fast or as CPU-efficient as s5cmd, and offered no large benefit over the pre-existing method of copying files in Flink.