Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

State downloading in Flink can be a time- and CPU-consuming operation, which is especially visible if the CPU resources per task slot are strictly restricted, for example to a single CPU. Downloading 1GB of state can take a significant amount of time, and the code doing so is quite inefficient.

Currently, when downloading state files, Flink creates an FSDataInputStream for the remote file and copies its bytes to an OutputStream pointing to a local file (in the RocksDBStateDownloader#downloadDataForStateHandle method). The FSDataInputStream is internally wrapped in many layers of abstractions and indirections and, what's worse, every file is copied individually, which leads to quite high overhead for small files. Download times and the CPU efficiency of the download process can be significantly improved if we introduce an API that allows an org.apache.flink.core.fs.FileSystem to copy many files natively, all at once.
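For illustration, the current per-file copy path boils down to something like the following simplified sketch (not the exact Flink code; buffer size and local output handling are simplified, and Path/FileSystem refer to the org.apache.flink.core.fs classes):

// A simplified sketch of the current per-file download path (not the exact Flink code):
// every remote file is opened as an FSDataInputStream and its bytes are pumped through
// a small buffer into a local OutputStream, one file at a time.
private static void downloadFile(Path remotePath, java.nio.file.Path localPath) throws IOException {
    FileSystem fileSystem = remotePath.getFileSystem();
    byte[] buffer = new byte[64 * 1024];
    try (FSDataInputStream inputStream = fileSystem.open(remotePath);
            OutputStream outputStream = Files.newOutputStream(localPath)) {
        int numBytes;
        while ((numBytes = inputStream.read(buffer)) != -1) {
            outputStream.write(buffer, 0, numBytes);
        }
    }
}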

For S3, there are at least two potential implementations. The first is to use AWS SDKv2 directly (Flink currently uses AWS SDKv1, wrapped by Hadoop/Presto) together with the Amazon S3 Transfer Manager. The second option is to use a third-party tool called s5cmd, which is claimed to be a faster alternative to the official AWS clients; our benchmarks confirmed this claim.

Benchmark results

The benchmarking setup consisted of two Task Managers, each with one task slot and one vCPU. Below are the benchmark results obtained with all three options, plus Flink 1.17 for comparison:

| 1GB state total recovery times | Flink 1.17 | Flink 1.18 | SDKv2 | s5cmd |
| scaling up 1 → 2               | ~37s       | ~19s       | ~15s  | ~8.5s |

Total recovery times here are dominated by the state download speed, as when scaling up from 1 to 2 RocksDB uses a very fast range delete. The non-download phases of recovery took less than ~1s.

The difference between Flink 1.17 and Flink 1.18 is mainly due to improved parallel downloads (https://issues.apache.org/jira/browse/FLINK-32345), which leave less (almost no) idle CPU during recovery.

Flink 1.18, SDKv2, and s5cmd all use roughly 100% of the available CPU resources, so the recovery process is bottlenecked by the CPU. The difference between these three options is how efficiently the available CPU is used.

Please note that all of this was measured in an environment where the available CPU is limited to a single vCPU per task slot. With a large enough number of vCPUs, both SDKv2 and s5cmd can saturate the available network; however, that is not a realistic scenario for a Flink cluster. In real production setups, Task Managers may have a large number of vCPUs available, but those are shared among an equally large number of actually used task slots. That is why the above-mentioned setup of small TMs with restricted CPU resources is a good approximation of a production environment.

Proposed Changes

The main proposal is to allow FileSystems to copy files natively, bypassing the manual byte copying via input/output streams. In order to enable that, a couple of things would need to change.

Runtime code

It would be beneficial to clean up the FileSystem API by extracting the actual interface into IFileSystem and adding the following method there:

/**
 * @return if this {@link IFileSystem} supports an optimised way to natively copy paths. In other
 *     words if it implements {@link PathsCopyingFileSystem}.
 */
default boolean canCopyPaths() {
    return false;
}

Additionally we would need to introduce the following APIs:

/**
 * An interface marking that the given {@link FileSystem} has an optimised path for copying paths
 * instead of using {@link FSDataOutputStream} or {@link FSDataInputStream}.
 */
@Experimental
public interface PathsCopyingFileSystem extends IFileSystem {
    /**
     * A POJO representing a task to download a file from the {@link #getSrcPath()} to the {@link
     * #getDestPath()}.
     */
    class CopyTask {
        private final Path srcPath;
        private final Path destPath;

        public CopyTask(Path srcPath, Path destPath) {
            this.srcPath = srcPath;
            this.destPath = destPath;
        }

        public Path getSrcPath() {
            return srcPath;
        }

        public Path getDestPath() {
            return destPath;
        }

        @Override
        public String toString() {
            return "CopyTask{" + "srcPath=" + srcPath + ", destPath=" + destPath + '}';
        }
    }

    /**
     * Copies the given list of {@link CopyTask} in a single batch using this {@link
     * PathsCopyingFileSystem}. In case of an exception, some files might have already been copied
     * fully or partially; the caller should clean those up. The copy can be interrupted via the
     * {@link ICloseableRegistry}.
     */
    void copyFiles(List<CopyTask> copyTasks, ICloseableRegistry closeableRegistry)
            throws IOException;

    @Override
    default boolean canCopyPaths() {
        return true;
    }
}

In order for the runtime to differentiate between state handles that can be copied as files (like FileStateHandle) and those that cannot (like ByteStreamStateHandle), StreamStateHandle would have to be enriched with:

public interface StreamStateHandle {
    (...)
    
    /**
     * @return Path to an underlying file represented by this {@link StreamStateHandle} or {@link
     *     Optional#empty()} if there is no such file.
     */
    default Optional<org.apache.flink.core.fs.Path> maybeGetPath() {
        return Optional.empty();
    }
}
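
For example, a file-backed handle such as FileStateHandle could then expose its underlying path roughly as follows (a sketch only; the actual field holding the path may be named differently):

// Hypothetical override in a file-backed handle such as FileStateHandle.
// "filePath" stands in for whatever field the handle already uses to store its path.
@Override
public Optional<org.apache.flink.core.fs.Path> maybeGetPath() {
    return Optional.of(filePath);
}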

Then places like RocksDBStateDownloader would be able to use the faster PathsCopyingFileSystem::copyFiles call for those state handles and FileSystems that support it. A rough, hypothetical sketch of how such a downloader could split the work is shown below; it is not the actual RocksDBStateDownloader code, and localPathFor and downloadViaStreams are made-up helpers standing in for existing logic.
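
// Hypothetical sketch, not the actual RocksDBStateDownloader implementation:
// partition the handles into those that can be copied natively (file-backed handle
// on a PathsCopyingFileSystem) and those that need the existing stream-based fallback.
void download(List<StreamStateHandle> handlesToDownload, ICloseableRegistry closeableRegistry)
        throws IOException {

    List<PathsCopyingFileSystem.CopyTask> copyTasks = new ArrayList<>();
    List<StreamStateHandle> streamDownloads = new ArrayList<>();

    for (StreamStateHandle handle : handlesToDownload) {
        Optional<Path> srcPath = handle.maybeGetPath();
        if (srcPath.isPresent() && srcPath.get().getFileSystem().canCopyPaths()) {
            // File-backed handle on a FileSystem with a native copy path.
            copyTasks.add(
                    new PathsCopyingFileSystem.CopyTask(srcPath.get(), localPathFor(handle)));
        } else {
            // E.g. ByteStreamStateHandle: keep using the existing stream-based copy.
            streamDownloads.add(handle);
        }
    }

    if (!copyTasks.isEmpty()) {
        PathsCopyingFileSystem fileSystem =
                (PathsCopyingFileSystem) copyTasks.get(0).getSrcPath().getFileSystem();
        // One batched, native copy for all file-backed handles.
        fileSystem.copyFiles(copyTasks, closeableRegistry);
    }
    downloadViaStreams(streamDownloads);
}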

FileSystems

Each FileSystem could then be augmented with a native file copying code path wherever it is actually beneficial. As part of this FLIP we propose to contribute s5cmd support for both the Presto-based (FlinkS3PrestoFileSystem) and the Hadoop-based (FlinkS3FileSystem) S3 FileSystem implementations.

We prefer s5cmd as it has proven to be faster and more CPU efficient. In order to configure s5cmd, the user would need to specify the path to the s5cmd binary.
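
As an illustration only, one possible shape of such an implementation is to write the requested copies into a command file and hand it to the s5cmd binary in a single invocation. The s5CmdBinaryPath field, the exact invocation, and the use of registerCloseable on the registry are assumptions for this sketch, not the final design:

// Hypothetical sketch of copyFiles() shelling out to s5cmd: write one
// "cp <s3-url> <local-path>" line per CopyTask into a temporary command file
// and execute it with "s5cmd run" as a single batch.
@Override
public void copyFiles(List<CopyTask> copyTasks, ICloseableRegistry closeableRegistry)
        throws IOException {
    java.nio.file.Path commandFile = Files.createTempFile("s5cmd-copy", ".txt");
    try {
        List<String> commands = new ArrayList<>();
        for (CopyTask task : copyTasks) {
            commands.add("cp " + task.getSrcPath() + " " + task.getDestPath());
        }
        Files.write(commandFile, commands);

        Process process =
                new ProcessBuilder(s5CmdBinaryPath, "run", commandFile.toString())
                        .inheritIO()
                        .start();
        // Allow the copy to be cancelled via the registry (assuming ICloseableRegistry
        // keeps CloseableRegistry's registerCloseable method).
        closeableRegistry.registerCloseable(process::destroy);

        int exitCode;
        try {
            exitCode = process.waitFor();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IOException("Interrupted while copying files via s5cmd", e);
        }
        if (exitCode != 0) {
            throw new IOException("s5cmd exited with code " + exitCode);
        }
    } finally {
        Files.deleteIfExists(commandFile);
    }
}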

Public Interfaces

The aforementioned IFileSystem interface would be extracted from the abstract FileSystem class, and a new PathsCopyingFileSystem interface would be introduced.

Future work

The same mechanism of natively copying files could also be used for uploading files. However, this has lower priority, as uploads can be done incrementally: Flink doesn't have to upload all files at once, only those that are newly created.

An independent piece of future work would be to investigate equivalent optimisations for the other FileSystems supported by Flink (GCP, Azure, …).

Compatibility, Deprecation, and Migration Plan

This feature is fully forward and backward compatible. The feature toggle enabling s5cmd can be disabled or enabled at any point.

Test Plan

WIP

Rejected Alternatives

Using AWS SDKv2

AWS SDKv2 proved not to be as fast or as CPU efficient as s5cmd, and offered no large benefit over the pre-existing method of copying files in Flink.