Status

Discussion thread: https://lists.apache.org/thread/57xfo6p25rbrhcg01dhyok46zt6jc5q1


Vote thread
JIRA
Release

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

In FLIP-435[1], we proposed Materialized Table, which is designed to simplify the data processing pipeline. Based on the defined freshness, a Materialized Table derives one of two data refresh modes: Full Refresh mode or Continuous Refresh mode. In Full Refresh mode, the Materialized Table relies on a workflow scheduler to perform periodic refresh operations to achieve the desired data freshness.

There are numerous open-source workflow schedulers available, popular ones including Airflow[2] and DolphinScheduler[3]. In addition, cloud vendors[4] and medium-to-large enterprises generally have their own internally developed workflow schedulers. To enable Materialized Table to work with different workflow schedulers, this FLIP proposes a pluggable workflow scheduler interface for Materialized Table.

Outline Design

First, let's go through a timing diagram of how Materialized Table interacts with the Workflow Scheduler in Full Refresh mode to help understand the overall public interface design.

Public Interfaces

WorkflowOperation

Since we need to support various operations on the refresh workflow, such as create, suspend, and resume, we introduce the RefreshWorkflow interface and its sub-interfaces (the workflow operations) to represent the information needed by each operation.

WorkflowOperation
/**
 * {@link RefreshWorkflow} is the basic interface that provides the related information to operate
 * the refresh workflow of {@link CatalogMaterializedTable}. Operations on the refresh workflow
 * include create, modify, drop, etc.
 *
 * @see CreateRefreshWorkflow
 * @see ModifyRefreshWorkflow
 * @see DeleteRefreshWorkflow
 */
@PublicEvolving
public interface RefreshWorkflow {}
   
/**
 * {@link CreateRefreshWorkflow} provides the related information to create refresh workflow of
 * {@link CatalogMaterializedTable}.
 */
@PublicEvolving
public interface CreateRefreshWorkflow extends RefreshWorkflow {}

/**
 * {@link ModifyRefreshWorkflow} provides the related information to modify refresh workflow of
 * {@link CatalogMaterializedTable}.
 *
 * @param <T> The type of {@link RefreshHandler} used by specific {@link WorkflowScheduler} to
 *     locate the refresh workflow in scheduler service.
 */
@PublicEvolving
public interface ModifyRefreshWorkflow<T extends RefreshHandler> extends RefreshWorkflow {

    /**
     * Return {@link RefreshHandler} from corresponding {@link WorkflowScheduler} which provides
     * meta info that points to the refresh workflow in scheduler service.
     */
    T getRefreshHandler();
}

/**
 * {@link DeleteRefreshWorkflow} provides the related information to delete refresh workflow of {@link
 * CatalogMaterializedTable}.
 *
 * @param <T> The type of {@link RefreshHandler} used by specific {@link WorkflowScheduler} to
 *     locate the refresh workflow in scheduler service.
 */
@PublicEvolving
public class DeleteRefreshWorkflow<T extends RefreshHandler> implements RefreshWorkflow {

    private final T refreshHandler;

    public DeleteRefreshWorkflow(T refreshHandler) {
        this.refreshHandler = refreshHandler;
    }

    /**
     * Return {@link RefreshHandler} from corresponding {@link WorkflowScheduler} which provides
     * meta info that points to the refresh workflow in scheduler service.
     */
    public T getRefreshHandler() {
        return refreshHandler;
    }
}

Note that RefreshWorkflow implementations are essentially POJOs. Because the engine needs to provide different information to the scheduler for each operation (create, suspend, resume, etc.), RefreshWorkflow itself is just a marker interface.

The RefreshWorkflow implementation classes are provided by the engine to the WorkflowScheduler. Currently, they include CreatePeriodicRefreshWorkflow, SuspendRefreshWorkflow, ResumeRefreshWorkflow, and ModifyRefreshWorkflowCronExpr.
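To make the marker-interface idea concrete, here is a minimal sketch of what two engine-provided operations might look like. The class names follow the list above, but every field (materializedTableIdentifier, cronExpression, executionConfig) and the stand-in interfaces at the top are illustrative assumptions, not the final Flink classes.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

// Stand-ins for the FLIP interfaces so that this sketch compiles on its own.
interface RefreshHandler {}

interface RefreshWorkflow {}

interface CreateRefreshWorkflow extends RefreshWorkflow {}

interface ModifyRefreshWorkflow<T extends RefreshHandler> extends RefreshWorkflow {
    T getRefreshHandler();
}

/** Hypothetical create operation; all fields below are illustrative assumptions. */
final class CreatePeriodicRefreshWorkflow implements CreateRefreshWorkflow {
    private final String materializedTableIdentifier; // e.g. "cat.db.mt"
    private final String cronExpression; // assumed to be derived from the table freshness
    private final Map<String, String> executionConfig;

    CreatePeriodicRefreshWorkflow(
            String materializedTableIdentifier,
            String cronExpression,
            Map<String, String> executionConfig) {
        this.materializedTableIdentifier = materializedTableIdentifier;
        this.cronExpression = cronExpression;
        // Defensive read-only copy: the operation object is a plain value holder.
        this.executionConfig = Collections.unmodifiableMap(new HashMap<>(executionConfig));
    }

    String getMaterializedTableIdentifier() {
        return materializedTableIdentifier;
    }

    String getCronExpression() {
        return cronExpression;
    }

    Map<String, String> getExecutionConfig() {
        return executionConfig;
    }
}

/** Hypothetical suspend operation: it only needs the handler that locates the workflow. */
final class SuspendRefreshWorkflow<T extends RefreshHandler> implements ModifyRefreshWorkflow<T> {
    private final T refreshHandler;

    SuspendRefreshWorkflow(T refreshHandler) {
        this.refreshHandler = refreshHandler;
    }

    @Override
    public T getRefreshHandler() {
        return refreshHandler;
    }
}
```

The asymmetry is the point: a suspend operation carries only the handler, while a create operation carries everything needed to build a workflow, so a shared method set on RefreshWorkflow would not fit both.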

WorkflowException

We introduce a new checked WorkflowException for WorkflowScheduler.

WorkflowException
/**
 * A workflow-related operation exception to materialized table, including create, suspend, resume,
 * drop workflow operation, etc.
 */
@PublicEvolving
public class WorkflowException extends Exception {

    public WorkflowException(String message) {
        super(message);
    }

    public WorkflowException(String message, Throwable cause) {
        super(message, cause);
    }
}

WorkflowScheduler

We introduce the WorkflowScheduler interface for interacting with a specific scheduler service; it supports creating, suspending, resuming, and deleting the refresh workflows of Materialized Tables.

WorkflowScheduler
/**
 * This interface is used to interact with specific workflow scheduler services that support
 * creating, modifying, and deleting refresh workflows of Materialized Table.
 *
 * @param <T> The type of {@link RefreshHandler} used by specific {@link WorkflowScheduler} to
 *     locate the refresh workflow in scheduler service.
 */
@PublicEvolving
public interface WorkflowScheduler<T extends RefreshHandler> {

    /**
     * Open the WorkflowScheduler. Used for any required preparation in initialization phase.
     *
     * @throws WorkflowException if initializing the workflow scheduler fails
     */
    void open() throws WorkflowException;

    /**
     * Close the WorkflowScheduler when it is no longer needed and release any resource that it
     * might be holding.
     *
     * @throws WorkflowException if releasing the resources of the workflow scheduler fails
     */
    void close() throws WorkflowException;

    /**
     * Return a {@link RefreshHandlerSerializer} instance to serialize and deserialize {@link
     * RefreshHandler} created by specific workflow scheduler service.
     */
    RefreshHandlerSerializer getRefreshHandlerSerializer();

    /**
     * Create a refresh workflow in specific scheduler service for the materialized table, return a
     * {@link RefreshHandler} instance which can locate the refresh workflow detail information.
     *
     * <p>This method supports creating workflow for periodic refresh, as well as workflow for a
     * one-time refresh only.
     *
     * @param createRefreshWorkflow The details for creating the refresh workflow of the
     *     materialized table.
     * @return The meta info which points to the refresh workflow in scheduler service.
     * @throws WorkflowException if create refresh workflow failed
     */
    T createRefreshWorkflow(CreateRefreshWorkflow createRefreshWorkflow) throws WorkflowException;

    /**
     * Modify the refresh workflow status in scheduler service. This includes suspend, resume,
     * modify schedule cron operation, and so on.
     *
     * @param modifyRefreshWorkflow The details for modifying the refresh workflow of the
     *     materialized table.
     * @throws WorkflowException if modify refresh workflow failed
     */
    void modifyRefreshWorkflow(ModifyRefreshWorkflow<T> modifyRefreshWorkflow)
            throws WorkflowException;

    /**
     * Delete the refresh workflow in scheduler service.
     *
     * @param deleteRefreshWorkflow The details for deleting the refresh workflow of the
     *     materialized table.
     * @throws WorkflowException if delete refresh workflow failed
     */
    void deleteRefreshWorkflow(DeleteRefreshWorkflow<T> deleteRefreshWorkflow)
            throws WorkflowException;
}
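As a rough illustration of how an implementation might dispatch on the operation objects, the following sketch keeps workflow state in a map, in the spirit of the test-only in-memory scheduler mentioned in the implementation plan. InMemoryRefreshHandler, SetSuspended, and the map-based state are hypothetical; the interfaces are simplified stand-ins, and getRefreshHandlerSerializer() is omitted to keep the example short.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

// Simplified stand-ins for the FLIP types; getRefreshHandlerSerializer() is omitted.
interface RefreshHandler {}
interface RefreshWorkflow {}
interface CreateRefreshWorkflow extends RefreshWorkflow {}
interface ModifyRefreshWorkflow<T extends RefreshHandler> extends RefreshWorkflow {
    T getRefreshHandler();
}
class DeleteRefreshWorkflow<T extends RefreshHandler> implements RefreshWorkflow {
    private final T refreshHandler;
    DeleteRefreshWorkflow(T refreshHandler) { this.refreshHandler = refreshHandler; }
    T getRefreshHandler() { return refreshHandler; }
}
class WorkflowException extends Exception {
    WorkflowException(String message) { super(message); }
}
interface WorkflowScheduler<T extends RefreshHandler> {
    void open() throws WorkflowException;
    void close() throws WorkflowException;
    T createRefreshWorkflow(CreateRefreshWorkflow create) throws WorkflowException;
    void modifyRefreshWorkflow(ModifyRefreshWorkflow<T> modify) throws WorkflowException;
    void deleteRefreshWorkflow(DeleteRefreshWorkflow<T> delete) throws WorkflowException;
}

/** Hypothetical handler: this toy "scheduler service" locates workflows by a numeric id. */
final class InMemoryRefreshHandler implements RefreshHandler {
    final long workflowId;
    InMemoryRefreshHandler(long workflowId) { this.workflowId = workflowId; }
}

/** Hypothetical modify operation that suspends (true) or resumes (false) a workflow. */
final class SetSuspended implements ModifyRefreshWorkflow<InMemoryRefreshHandler> {
    private final InMemoryRefreshHandler handler;
    final boolean suspended;

    SetSuspended(InMemoryRefreshHandler handler, boolean suspended) {
        this.handler = handler;
        this.suspended = suspended;
    }

    @Override
    public InMemoryRefreshHandler getRefreshHandler() { return handler; }
}

/** Test-only scheduler: state lives in a map, so there is no HA and nothing survives a restart. */
final class InMemoryWorkflowScheduler implements WorkflowScheduler<InMemoryRefreshHandler> {
    private final Map<Long, Boolean> suspendedById = new ConcurrentHashMap<>();
    private final AtomicLong nextId = new AtomicLong();

    @Override
    public void open() {}

    @Override
    public void close() { suspendedById.clear(); }

    @Override
    public InMemoryRefreshHandler createRefreshWorkflow(CreateRefreshWorkflow create) {
        long id = nextId.incrementAndGet();
        suspendedById.put(id, false); // new workflows start in the running state
        return new InMemoryRefreshHandler(id);
    }

    @Override
    public void modifyRefreshWorkflow(ModifyRefreshWorkflow<InMemoryRefreshHandler> modify)
            throws WorkflowException {
        long id = modify.getRefreshHandler().workflowId;
        if (!suspendedById.containsKey(id)) {
            throw new WorkflowException("Unknown refresh workflow: " + id);
        }
        // Dispatch on the concrete operation type; anything unrecognized is rejected.
        if (modify instanceof SetSuspended) {
            suspendedById.put(id, ((SetSuspended) modify).suspended);
        } else {
            throw new WorkflowException("Unsupported operation: " + modify.getClass().getName());
        }
    }

    @Override
    public void deleteRefreshWorkflow(DeleteRefreshWorkflow<InMemoryRefreshHandler> delete)
            throws WorkflowException {
        if (suspendedById.remove(delete.getRefreshHandler().workflowId) == null) {
            throw new WorkflowException("Unknown refresh workflow");
        }
    }

    boolean isSuspended(InMemoryRefreshHandler handler) {
        return Boolean.TRUE.equals(suspendedById.get(handler.workflowId));
    }
}
```

Because new operations arrive as new ModifyRefreshWorkflow classes, a scheduler that does not understand one can reject it with a WorkflowException instead of the interface itself having to change.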

WorkflowSchedulerFactory

WorkflowSchedulerFactory is used to create a specific WorkflowScheduler via the SPI mechanism.

WorkflowSchedulerFactory
/**
 * A factory to create a {@link WorkflowScheduler} instance.
 *
 * <p>See {@link Factory} for more information about the general design of a factory.
 */
@PublicEvolving
public interface WorkflowSchedulerFactory extends Factory {

    /** Create a workflow scheduler instance which interacts with external scheduler service. */
    WorkflowScheduler<?> createWorkflowScheduler(Context context);

    /** Context provided when a workflow scheduler is created. */
    @PublicEvolving
    interface Context {

        /** Returns the configuration used to create the {@link WorkflowScheduler}. */
        ReadableConfig getConfiguration();

        /**
         * Returns the options with which the workflow scheduler is created. All options that are
         * prefixed with the workflow scheduler identifier are included in the map.
         *
         * <p>All the keys in the options are pruned with the prefix. For example, the option {@code
         * workflow-scheduler.airflow.endpoint}'s key is {@code endpoint} in the map.
         *
         * <p>An implementation should perform validation of these options.
         */
        Map<String, String> getWorkflowSchedulerOptions();
    }
}
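The sketch below shows one plausible way to pick a factory: load all implementations registered through the JDK's java.util.ServiceLoader and match the configured 'workflow-scheduler.type' against factoryIdentifier(). The Factory and WorkflowSchedulerFactory types here are trimmed stand-ins (describe() is a hypothetical placeholder for createWorkflowScheduler), and WorkflowSchedulerFactoryDiscovery is not an existing Flink utility.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.ServiceLoader;

// Stand-in for Flink's Factory; only the identifier is needed for discovery here.
interface Factory {
    String factoryIdentifier();
}

// Trimmed stand-in: describe() is a hypothetical placeholder for createWorkflowScheduler(Context).
interface WorkflowSchedulerFactory extends Factory {
    String describe();
}

/** Hypothetical helper, not an existing Flink class. */
final class WorkflowSchedulerFactoryDiscovery {

    /** Loads factories registered via ServiceLoader on the classpath and selects one by type. */
    static WorkflowSchedulerFactory discover(String schedulerType) {
        return select(ServiceLoader.load(WorkflowSchedulerFactory.class), schedulerType);
    }

    /** Returns the single factory whose identifier equals the 'workflow-scheduler.type' value. */
    static WorkflowSchedulerFactory select(
            Iterable<? extends WorkflowSchedulerFactory> factories, String schedulerType) {
        List<WorkflowSchedulerFactory> matches = new ArrayList<>();
        for (WorkflowSchedulerFactory factory : factories) {
            if (factory.factoryIdentifier().equals(schedulerType)) {
                matches.add(factory);
            }
        }
        // Zero matches means the plugin is missing; more than one means the type is ambiguous.
        if (matches.size() != 1) {
            throw new IllegalStateException(
                    "Expected exactly one workflow scheduler factory for type '"
                            + schedulerType + "', but found " + matches.size());
        }
        return matches.get(0);
    }
}
```

Keeping selection separate from loading makes the matching rule testable without registering real plugins on the classpath.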

Configuration

We also introduce the option 'workflow-scheduler.type' to allow users to specify the scheduler. For simplicity, we don't plan to introduce a separate YAML file for the workflow scheduler; users can specify the scheduler options in flink-conf.yaml.

Key                        Required   Default   Type     Description
workflow-scheduler.type    Yes        None      String   Specifies the type of workflow scheduler to use.


For example, if the user uses Airflow, the following options should be configured in flink-conf.yaml:

workflow-scheduler.type: airflow
workflow-scheduler.airflow.endpoint: localhost:8085
workflow-scheduler.airflow.user-name: admin
workflow-scheduler.airflow.password: 123456

To avoid conflicts between scheduler options and framework options, all scheduler options must be prefixed with 'workflow-scheduler.' followed by the specific scheduler type, e.g. 'workflow-scheduler.airflow.'.
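The prefix pruning described in the WorkflowSchedulerFactory.Context Javadoc can be sketched as a small function over a flat configuration map. WorkflowSchedulerOptions and schedulerOptions are hypothetical names; a real implementation would presumably work on Flink's configuration objects rather than a plain Map.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical helper illustrating how scheduler options could be extracted and pruned. */
final class WorkflowSchedulerOptions {
    static final String PREFIX = "workflow-scheduler.";
    static final String TYPE_KEY = PREFIX + "type";

    /**
     * Returns the options of the configured scheduler with the "workflow-scheduler.<type>."
     * prefix removed, e.g. "workflow-scheduler.airflow.endpoint" becomes "endpoint". Options of
     * other scheduler types and framework options are ignored.
     */
    static Map<String, String> schedulerOptions(Map<String, String> flinkConf) {
        String type = flinkConf.get(TYPE_KEY);
        if (type == null || type.isEmpty()) {
            throw new IllegalArgumentException("Missing required option '" + TYPE_KEY + "'");
        }
        String typePrefix = PREFIX + type + ".";
        Map<String, String> pruned = new HashMap<>();
        for (Map.Entry<String, String> entry : flinkConf.entrySet()) {
            if (entry.getKey().startsWith(typePrefix)) {
                pruned.put(entry.getKey().substring(typePrefix.length()), entry.getValue());
            }
        }
        return pruned;
    }
}
```

With the example flink-conf.yaml above, the Airflow factory would receive {endpoint=localhost:8085, user-name=admin, password=123456}, while options for any other scheduler type are filtered out.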

SQL Gateway

Refresh Rest API

After a refresh workflow for the Materialized Table has been created on the scheduler service, the scheduler relies on the SQL Gateway[5] to carry out each refresh of the workflow. Therefore, the SQL Gateway needs to provide a corresponding Refresh API.

This is an asynchronous API. To keep the SQL Gateway API uniform, all concepts are bound to the session. On the one hand, this reduces maintenance and learning costs, since only one architecture needs to be maintained for the basic concepts. On the other hand, an asynchronous architecture keeps state on the server side: even if a long-lived connection is disrupted by network jitter, the original result can still be retrieved through the session and operation handles.

v3/sessions/:session_handle/materialized-tables/:identifier/refresh

Verb: POST

Response code: 200 OK

Trigger a refresh operation of the materialized table.

Request

Body:

{
  "isPeriodic": true,          # required
  "scheduleTime": "",          # optional
  "dynamicOptions": {          # optional
    "key": "value"
  },
  "staticPartitions": {        # optional
    "key": "value"
  },
  "executionConfig": {         # optional
    "key": "value"
  }
}

Response

Body:

{
  "jobID": "",
  "clusterInfo": {
    "key": "value"
  }
}
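For a scheduler plugin author, triggering a refresh comes down to one HTTP POST. The sketch below builds that request with the JDK's java.net.http API. The gateway address, the hand-rolled JSON, and the RefreshRequestBuilder class are assumptions; only isPeriodic, scheduleTime, and executionConfig are populated, and sessionHandle is assumed to come from a session opened earlier on the gateway.

```java
import java.net.URI;
import java.net.http.HttpRequest;
import java.util.Map;
import java.util.StringJoiner;

/** Hypothetical scheduler-side helper for the refresh REST API. */
final class RefreshRequestBuilder {

    /** Builds POST {endpoint}/v3/sessions/{session}/materialized-tables/{identifier}/refresh. */
    static HttpRequest build(
            String gatewayEndpoint, // assumed, e.g. "http://localhost:8083"
            String sessionHandle,
            String materializedTableIdentifier, // fully qualified: catalog.database.object
            boolean isPeriodic,
            String scheduleTime, // "yyyy-MM-dd HH:mm:ss", or null to omit the field
            Map<String, String> executionConfig) {
        URI uri = URI.create(
                gatewayEndpoint + "/v3/sessions/" + sessionHandle
                        + "/materialized-tables/" + materializedTableIdentifier + "/refresh");
        return HttpRequest.newBuilder(uri)
                .header("Content-Type", "application/json")
                .POST(HttpRequest.BodyPublishers.ofString(
                        body(isPeriodic, scheduleTime, executionConfig)))
                .build();
    }

    /** Hand-rolled JSON body; optional fields are omitted when absent or empty. */
    static String body(boolean isPeriodic, String scheduleTime, Map<String, String> executionConfig) {
        StringJoiner fields = new StringJoiner(",", "{", "}");
        fields.add("\"isPeriodic\":" + isPeriodic);
        if (scheduleTime != null) {
            fields.add("\"scheduleTime\":" + quote(scheduleTime));
        }
        if (!executionConfig.isEmpty()) {
            StringJoiner config = new StringJoiner(",", "{", "}");
            executionConfig.forEach((k, v) -> config.add(quote(k) + ":" + quote(v)));
            fields.add("\"executionConfig\":" + config);
        }
        return fields.toString();
    }

    private static String quote(String s) {
        // Escapes only backslashes and double quotes, enough for plain option keys and values.
        return "\"" + s.replace("\\", "\\\\").replace("\"", "\\\"") + "\"";
    }
}
```

Sending is then a matter of HttpClient.newHttpClient().send(request, HttpResponse.BodyHandlers.ofString()); a production plugin would use a proper JSON library instead of manual escaping.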

The materialized table identifier must be fully qualified, i.e. 'catalogName.databaseName.objectName'; it is used to locate the materialized table in the catalog.
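A minimal sketch of validating the identifier before it is used for catalog lookup, assuming a naive split on '.'. MaterializedTableIdentifier is a hypothetical class; quoted identifiers that themselves contain dots (such as backtick-quoted names in Flink SQL) are not handled by this sketch.

```java
/** Hypothetical value type holding the three parts of a fully qualified identifier. */
final class MaterializedTableIdentifier {
    final String catalogName;
    final String databaseName;
    final String objectName;

    private MaterializedTableIdentifier(String catalogName, String databaseName, String objectName) {
        this.catalogName = catalogName;
        this.databaseName = databaseName;
        this.objectName = objectName;
    }

    /** Naive parser: requires exactly three non-empty parts separated by '.'. */
    static MaterializedTableIdentifier parse(String identifier) {
        // A limit of -1 keeps trailing empty strings, so "a.b." is rejected rather than truncated.
        String[] parts = identifier.split("\\.", -1);
        if (parts.length != 3) {
            throw new IllegalArgumentException(
                    "Expected 'catalogName.databaseName.objectName', got: " + identifier);
        }
        for (String part : parts) {
            if (part.isEmpty()) {
                throw new IllegalArgumentException("Empty identifier part in: " + identifier);
            }
        }
        return new MaterializedTableIdentifier(parts[0], parts[1], parts[2]);
    }
}
```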

For scheduleTime, we uniformly use the SQL standard timestamp format 'yyyy-MM-dd HH:mm:ss' without time zone, which covers all our time-related needs; the workflow scheduler must pass scheduleTime in this format. The materialized table manager then interprets this time in the local time zone.
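The parsing rule can be sketched with java.time: parse the string as a LocalDateTime using the agreed pattern, then attach the local time zone. ScheduleTimes is a hypothetical helper, and the zone is passed in explicitly here; ZoneId.systemDefault() is one possible choice for "local".

```java
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.ZonedDateTime;
import java.time.format.DateTimeFormatter;
import java.time.format.DateTimeParseException;

/** Hypothetical helper that interprets a zone-less scheduleTime in a given local zone. */
final class ScheduleTimes {
    private static final DateTimeFormatter FORMAT =
            DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");

    static ZonedDateTime parse(String scheduleTime, ZoneId localZone) {
        try {
            // The string carries no zone, so it is parsed as a LocalDateTime first.
            LocalDateTime local = LocalDateTime.parse(scheduleTime, FORMAT);
            return local.atZone(localZone);
        } catch (DateTimeParseException e) {
            throw new IllegalArgumentException(
                    "scheduleTime must use 'yyyy-MM-dd HH:mm:ss': " + scheduleTime, e);
        }
    }
}
```

One consequence of a zone-less format is worth noting: in zones with daylight saving time, atZone shifts a time that falls in a gap forward by the gap length and picks the earlier offset in an overlap, so schedulers should avoid relying on wall-clock times inside those transitions.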

SqlGatewayService

Corresponding to the refresh REST API, the SqlGatewayService interface also needs a new refreshMaterializedTable method to support the refresh operation.

SqlGatewayService
/** A service of SQL gateway is responsible for handling requests from the endpoints. */
@PublicEvolving
public interface SqlGatewayService {
    /**
     * Trigger a refresh operation of specific materialized table.
     *
     * @param sessionHandle handle to identify the session.
     * @param materializedTableIdentifier A fully qualified materialized table identifier:
     *     'catalogName.databaseName.objectName', used for locating the materialized table in
     *     catalog.
     * @param isPeriodic Represents whether the workflow is refreshed periodically or
     *     one-time-only.
     * @param scheduleTime The time point at which the scheduler triggers this refresh operation;
     *     may be null.
     * @param dynamicOptions The dynamic options of the materialized table for this refresh.
     * @param staticPartitions The specific partitions for one-time-only refresh workflow.
     * @param executionConfig The flink job config.
     * @return handle to identify the operation.
     */
    OperationHandle refreshMaterializedTable(
            SessionHandle sessionHandle,
            String materializedTableIdentifier,
            boolean isPeriodic,
            @Nullable String scheduleTime,
            Map<String, String> dynamicOptions,
            Map<String, String> staticPartitions,
            Map<String, String> executionConfig);
}

Implementation Plan

  1. Implement an in-memory WorkflowScheduler, which lacks High-Availability (HA) capabilities and is solely used for testing purposes.

  2. After the release of Flink 1.20, provide an open-source DolphinScheduler WorkflowScheduler plugin implementation that can be used in production environments.

Compatibility, Deprecation, and Migration Plan

This is a new feature, so no compatibility, deprecation, or migration plan is needed.

Test Plan

Unit tests, integration tests, and manual tests will be added to verify this change.

Rejected Alternatives

For the WorkflowScheduler interface, we also considered another design that does not rely on RefreshWorkflow operation objects and instead provides a dedicated method for each type of operation. Its advantage is that each operation has an explicit method; its downside is poor interface compatibility: every new type of operation requires a new method, and a fixed method signature may not meet the needs of different schedulers. Therefore, we rejected this option.

WorkflowScheduler
/**
 * This interface is used to interact with specific workflow scheduler services that support
 * creating, suspending, resuming, and deleting refresh workflows of Materialized Table.
 */
@PublicEvolving
public interface WorkflowScheduler<T extends RefreshHandler> {

    /**
     * Open the WorkflowScheduler. Used for any required preparation in initialization phase.
     *
     * @throws WorkflowException if initializing the workflow scheduler fails
     */
    void open() throws WorkflowException;

    /**
     * Close the WorkflowScheduler when it is no longer needed and release any resource that it
     * might be holding.
     *
     * @throws WorkflowException if releasing the resources of the workflow scheduler fails
     */
    void close() throws WorkflowException;

    /**
     * Return a {@link RefreshHandlerSerializer} instance to serialize and deserialize {@link
     * RefreshHandler} created by specific workflow scheduler service.
     */
    RefreshHandlerSerializer getRefreshHandlerSerializer();

    /**
     * Create a refresh workflow in specific scheduler service for the materialized table, return a
     * {@link RefreshHandler} instance which can locate the refresh workflow detail information.
     *
     * <p>This method supports creating workflow for periodic refresh, as well as workflow for a
     * one-time refresh only.
     *
     * @param mtIdentifier The materialized table identifier to create a refresh workflow.
     * @param refreshDescription The description of refresh statement for print or log.
     * @param isPeriodic Represents whether the workflow is refreshed periodically or one-time-only.
     * @param refreshCron The cron expression used for periodic refresh workflow.
     * @param staticPartitions The specific partitions for one-time-only refresh workflow.
     * @param executionConfig The flink job config.
     * @param refreshTriggerUrl The url used by scheduler service to trigger the execution of
     *     refresh workflow.
     * @return The meta info which points to the detail refresh workflow in scheduler service.
     * @throws WorkflowException if create refresh workflow failed
     */
    T createRefreshWorkflow(
            ObjectIdentifier mtIdentifier,
            String refreshDescription,
            boolean isPeriodic,
            @Nullable String refreshCron,
            @Nullable Map<String, String> staticPartitions,
            Map<String, String> executionConfig,
            String refreshTriggerUrl)
            throws WorkflowException;

    /**
     * Suspend the refresh workflow in scheduler service.
     *
     * @param refreshHandler The meta info which points to the detail refresh workflow in scheduler
     *     service.
     * @throws WorkflowException if suspend refresh workflow failed
     */
    void suspendRefreshWorkflow(T refreshHandler) throws WorkflowException;

    /**
     * Resume the refresh workflow in scheduler service with new execution config and materialized
     * table options.
     *
     * @param refreshHandler The meta info which points to the detail refresh workflow in scheduler
     *     service.
     * @param executionConfig The updated flink job config.
     * @param mtDynamicOptions The dynamic options of materialized table that can improve the sink
     *     write behavior of flink job.
     * @throws WorkflowException if resume refresh workflow failed
     */
    void resumeRefreshWorkflow(
            T refreshHandler,
            Map<String, String> executionConfig,
            Map<String, String> mtDynamicOptions)
            throws WorkflowException;

    /**
     * Modify the schedule cron expression of refresh workflow in scheduler service.
     *
     * @param refreshHandler The meta info which points to the detail refresh workflow in scheduler
     *     service.
     * @param refreshCron The updated cron expression.
     * @throws WorkflowException if modify refresh workflow cron expression failed
     */
    void modifyRefreshWorkflow(T refreshHandler, String refreshCron) throws WorkflowException;

    /**
     * Delete the refresh workflow in scheduler service.
     *
     * @param refreshHandler The meta info which points to the detail refresh workflow in scheduler
     *     service.
     * @throws WorkflowException if delete refresh workflow failed
     */
    void deleteRefreshWorkflow(T refreshHandler) throws WorkflowException;
}

References

  1. https://cwiki.apache.org/confluence/display/FLINK/FLIP-435%3A+Introduce+a+New+Materialized+Table+for+Simplifying+Data+Pipelines

  2. https://airflow.apache.org

  3. https://dolphinscheduler.apache.org

  4. https://learn.microsoft.com/en-us/azure/data-factory/introduction

  5. FLIP-91: Support SQL Gateway