This page is meant as a template for writing a FLIP. To create a FLIP choose Tools->Copy on this page and modify with your content and replace the heading with the next FLIP number and a description of your issue. Replace anything in italics with your own description.

Status

Current state: Accepted

Discussion thread: https://lists.apache.org/thread/q5dzjwj0qk34rbg2sczyypfhokxoc3q7

JIRA: FLINK-35263 - Getting issue details... STATUS

Released: <Flink Version>

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

Currently in the Operator, there are three ways to take savepoints:

  • use the triggerSavepointNonce field
  • set up periodic savepoints
  • set upgradeMode: savepoint and suspend a job

And two ways to take checkpoints:

  • use the checkpointTriggerNonce field
  • set up periodic checkpoints


By creating a new Kubernetes custom resource and providing the ability to create/delete savepoints using a new CR called FlinkStateSnapshot, users will have much greater and more user-friendly control and observability over their savepoints and checkpoints.

The Operator already has a well-established way to trigger and check these snapshots, and it’s possible to re-use most of the code from there.

You can find an initial POC here: https://github.com/mateczagany/flink-kubernetes-operator/tree/savepoints 

Examples

Savepoint example resource:

apiVersion: flink.apache.org/v1beta1
kind: FlinkStateSnapshot
metadata:
  name: savepoint-01
spec:
  jobReference:
    kind: FlinkSessionJob
    name: session-job
  savepoint:
    path: /tmp/savepoint-out
    formatType: NATIVE
    disposeOnDelete: false

 backoffLimit: 0

Savepoint example result status:

status:
  path: file:/flink-data/checkpoints/9c51a75779c673166bf4edc84500f9fb/chk-137
  resultTimestamp: "2024-04-20T07:47:25Z"
  state: COMPLETED
  triggerId: 9c800db0f6cef07ce07e9be3883401aa
  triggerTimestamp: "2024-04-20T07:47:10Z"
  failures: 0

Checkpoint example resource:

apiVersion: flink.apache.org/v1beta1
kind: FlinkStateSnapshot
metadata:
  name: checkpoint-01
spec:
  jobReference:
    kind: FlinkDeployment
    name: deployment-job
  checkpoint:
    checkpointType: FULL

Checkpoint example result status:

status:
  path: file:/flink-data/checkpoints/9c51a75779c673166bf4edc84500f9fb/chk-137
  resultTimestamp: "2024-04-20T07:47:25Z"
  state: COMPLETED
  triggerId: 9c800db0f6cef07ce07e9be3883401aa
  triggerTimestamp: "2024-04-20T07:47:10Z"
  failures: 0

Public Interfaces

FlinkStateSnapshot:

public class FlinkStateSnapshotSpec implements Diffable<FlinkStateSnapshotSpec> {
/** Source to take a snapshot of. */
private JobReference jobReference;

/** Spec in case of savepoint. */
private SavepointSpec savepoint = null;

/** Spec in case of checkpoint. */
private CheckpointSpec checkpoint = null;

/** Maximum number of retries before the snapshot is considered as failed. Set to -1 for unlimited or 0 for no retries. */
private int backoffLimit = -1;
}

Exactly one of the savepoint or checkpoint fields must be configured for the Operator to know which type of snapshot to take. This means that even if a user wants to use the default checkpoint or savepoint configurations, they will have to add an empty map in their YAML definition, e.g: checkpoint: {}

SavepointSpec:

public class SavepointSpec {
/** Optional path for the savepoint. */
private String path;

/** Savepoint format to use. */
private SavepointFormatType formatType = SavepointFormatType.CANONICAL;

/** Dispose the savepoints upon CR deletion. */
private boolean disposeOnDelete = true;


/**
* Indicates that the savepoint already exists on the given path. The Operator will not trigger any new savepoints, just update the status of the resource as a completed snapshot.
*/
private boolean alreadyExists = false;
}



It is also possible to set the path in the spec in case of a savepoint, but if left empty, the Flink JobManager will decide the savepoint path.

If disposeOnDelete for a savepoint, it will be automatically deleted using the same method the Operator uses currently for other savepoints when the CR gets deleted.

CheckpointSpec:

public class CheckpointSpec {
/** Type of checkpoint to take. */
private CheckpointType checkpointType = CheckpointType.FULL;
}

JobReference:

public class JobReference {

/** Kind of the Flink resource, FlinkDeployment or FlinkSessionJob. */
private JobKind kind;

/** Name of the Flink resource. */
private String name;
}

JobKind:

public enum JobKind {
/** FlinkDeployment CR kind. */
@JsonProperty(CrdConstants.KIND_FLINK_DEPLOYMENT)
FLINK_DEPLOYMENT,

/** FlinkSessionJob CR kind. */
@JsonProperty(CrdConstants.KIND_SESSION_JOB)
FLINK_SESSION_JOB
}

FlinkStateSnapshotStatus:

public class FlinkStateSnapshotStatus implements Diffable<FlinkStateSnapshotStatus> {

/** Current state of the snapshot. */
private FlinkStateSnapshotState state = FlinkStateSnapshotState.TRIGGER_PENDING;

/** Trigger ID of the snapshot. */
private String triggerId;

/** Trigger timestamp of a pending snapshot operation. */
private String triggerTimestamp;

/** Timestamp when the snapshot was last created/failed. */
private String resultTimestamp;

/** Final path of the snapshot. */
private String path;

/** Optional error information about the FlinkStateSnapshot. */
private String error;

      /** Number of failures, used for tracking max retries. */
      private int failures = 0;
}

FlinkStateSnapshotState:

public enum FlinkStateSnapshotState {
/** Snapshot was successful and available. */
COMPLETED,

/** Error during snapshot. */
FAILED,

/** Snapshot in progress. */
IN_PROGRESS,

/** Not yet processed by the operator. */
TRIGGER_PENDING,

/** Savepoint abandoned due to job failure/upgrade. */
ABANDONED
}

FlinkStateSnapshotReference:

public class FlinkStateSnapshotReference {

/** Namespace of the snapshot resource. */
private String namespace;

/** Name of the snapshot resource. */
private String name;


/** If a path is given, all other fields will be ignored, and this will be used as the initial savepoint path. */
private String path;
}

Modified Public Interfaces

JobSpec new field:

/**
* Snapshot reference used by the job the first time it is deployed or during savepoint redeployments (triggered by changing the savepoint/checkpoint redeploy nonce or during savepoint upgrades).
*/
private FlinkStateSnapshotReference flinkStateSnapshotReference;

In the JobSpec class, next to the already existing initialSavepointPath there will be a new field called initialSavepointName, which can be used to declare the initial savepoint via its Kubernetes name. Setting both fields will be considered a configuration error.

FlinkResourceMutator new method:

/**
* Mutate snapshot and return the mutated Object.
*
* @param stateSnapshot the savepoint to be mutated.
*/
FlinkStateSnapshot mutateStateSnapshot(FlinkStateSnapshot stateSnapshot);

FlinkResourceListener new methods:

/**
* Called when a new FlinkStateSnapshot event is triggered.
*
* @param ctx Context of the event and resource
*/
void onStateSnapshotEvent(FlinkStateSnapshotEventContext ctx);

/**
* Called when a FlinkStateSnapshot status gets updated.
*
* @param ctx Context of the new status and resource
*/
void onStateSnapshotStatusUpdate(FlinkStateSnapshotStatusUpdateContext ctx);

These two interface changes will make it possible to update currently existing plugins to be able to also handle new snapshot events.


New Configuration Options

Key

Type

Description

Default

kubernetes.operator.snapshot.resource.enabled

boolean

Create new FlinkStateSnapshot resources for storing snapshots. Disable if you wish to use the deprecated mode and save snapshot results to FlinkDeployment/FlinkSessionJob status fields. The Operator will fallback to the legacy mode during runtime if the CRD is not found, even if this value is true.

true

Proposed Changes

Manual savepoints

A single FlinkStateSnapshot resource in the Kubernetes cluster will map to exactly one snapshot on the filesystem, and thus it won’t be able to hold a history of savepoints like FlinkDeployment/FlinkSessionJob currently can.

In the FlinkStateSnapshot spec, we will have to define the job we would like to take a savepoint/checkpoint of. This can be either a FlinkDeployment (application mode only) or a FlinkSessionJob, and can’t be changed later in the lifetime of the snapshot.

We can also define the output path of a savepoint, this will be forwarded to the JobManager of the target Flink job. If not defined, this will be decided by the JobManager, and the result will be later saved in the status field of the resource.

We will also be able to instruct the operator to dispose of a savepoint from the filesystem when the FlinkStateSnapshot resource is deleted. However if the target FlinkSessionJob/FlinkDeployment is no longer running when the FlinkStateSnapshot is deleted, the Operator will not be able to dispose of the savepoint, and the deletion will stall. In this case the user either has to restart the job, or remove the finalizer from the FlinkStateSnapshot CR.

In case of checkpoints, deleting the FlinkStateSnapshot CR will not dispose of the checkpoint, and the snapshot data is considered to be owned and managed by Flink.

Upon creating a new FlinkStateSnapshot resource, its initial state will be TRIGGER_PENDING. The Operator will change it to IN_PROGRESS and update the field triggerTimestamp and triggerId when the savepoint request has been submitted.

On success: resultTimestamp, path fields will be updated accordingly and the field state will be set to SUCCESS

On failure: please see Error Handling below for more info.

Periodic savepoints/checkpoints

The already used configuration kubernetes.operator.periodic.savepoint.interval and kubernetes.operator.periodic.checkpoint.interval in Flink jobs will be used to instruct the operator to periodically create FlinkSavepoint CRs unless the new config option kubernetes.operator.snapshot.resource.enabled is set to false.

Upgrade savepoints

By default, savepoints created when stopping a job with upgrade-mode=savepoint will create new FlinkStateSnapshot CRs unless kubernetes.operator.snapshot.resource.enabled is set to false.

Error Handling

When snapshotting fails for some reason, resultTimestamp will be modified, and the Operator will keep retrying the operation with an exponential back-off delay. The max number of retries is configured using spec.backoffLimit. Number of previously observed failures will be counted in status.failed. When max retries are reached, the field state will be set to FAILED

Compatibility, Deprecation, and Migration Plan

In the current JobSpec class, the following fields will be deprecated:

  • initialSavepointPath
  • savepointTriggerNonce
  • checkpointTriggerNonce

In the JobStatus class, the following fields will be deprecated:

  • savepointInfo
  • checkpointInfo

By default, all savepoints and checkpoints created by the Operator will have new FlinkStateSnapshot resources created. It will still be possible to save snapshot data to the mentioned status fields by disabling the new configuration option below.

Test Plan

  1. Extend current unit-tests in the Operator repo to test any new/modified classes.
  2. Add new e2e-tests in the Operator repo that will test:
    • Creating savepoint via FlinkStateSnapshot
    • Deleting FlinkStateSnapshot
    • Starting job from existing FlinkStateSnapshot

Rejected Alternatives

If there are alternative ways of accomplishing the same thing, what were they? The purpose of this section is to motivate why the design is the way it is and not some other way.

  • No labels