Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
In Flink 1.9, TableEnvironment introduced the `execute(String jobName)` method to trigger execution of a Flink table program, and extended the `void sqlUpdate(String sql)` method to evaluate not only INSERT statements but also DDL statements and USE statements. As more use cases have come up, some serious shortcomings in the current API design have become apparent.
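Consider the following example of the buffered SQLs/Tables execution problem. The DDL statements take effect immediately, while the INSERT is buffered until `execute()` is called, so the INSERT may end up writing to `/tmp2` instead of the intended `/tmp1`:
    tEnv.sqlUpdate("CREATE TABLE test (...) with (path = '/tmp1')");
    tEnv.sqlUpdate("INSERT INTO test SELECT ...");
    tEnv.sqlUpdate("DROP TABLE test");
    tEnv.sqlUpdate("CREATE TABLE test (...) with (path = '/tmp2')");
    tEnv.execute();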
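The effect of mixing immediate DDL with buffered DML can be reproduced with a small toy model. The sketch below is plain Java and does not use Flink; the class `BufferedSqlDemo` and its methods are hypothetical stand-ins that only imitate the behavior described here (DDL applied at once, INSERT resolved when `execute()` runs).

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy model (NOT Flink code): DDL runs immediately, INSERT is buffered until execute().
class BufferedSqlDemo {
    // Catalog: table name -> storage path.
    private final Map<String, String> catalog = new HashMap<>();
    // Target tables of buffered INSERT statements, waiting for execute().
    private final List<String> bufferedInserts = new ArrayList<>();

    void createTable(String name, String path) { catalog.put(name, path); }

    void dropTable(String name) { catalog.remove(name); }

    void insertInto(String name) { bufferedInserts.add(name); }

    // Resolves each buffered INSERT against the catalog as it is *now*.
    List<String> execute() {
        List<String> writtenPaths = new ArrayList<>();
        for (String table : bufferedInserts) {
            writtenPaths.add(catalog.get(table));
        }
        bufferedInserts.clear();
        return writtenPaths;
    }

    // Replays the four statements of the example in the same order.
    static List<String> run() {
        BufferedSqlDemo env = new BufferedSqlDemo();
        env.createTable("test", "/tmp1");
        env.insertInto("test");
        env.dropTable("test");
        env.createTable("test", "/tmp2");
        return env.execute();
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints [/tmp2]
    }
}
```

In this model the INSERT was issued while `test` pointed at `/tmp1`, yet the write lands in `/tmp2`, which is the kind of surprise the proposal aims to remove.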
The goal of this FLIP is to address the shortcomings mentioned above and make the APIs in TableEnvironment & Table clearer and more stable. This FLIP does not cover multiline statements, which need more discussion in a future FLIP. (Some conclusions have already been reached; please see the appendix.)
We propose to deprecate the following methods:
TableEnvironment.sqlUpdate(String)
Meanwhile, we propose to introduce the following new methods:
    interface TableEnvironment {
        // execute the given single statement, and return the execution result.
        TableResult executeSql(String statement);

        // get the AST and the execution plan for the given single statement (DQL, DML)
        String explainSql(String statement, ExplainDetail... extraDetails);

        // create a StatementSet instance which can add DML statements or Tables
        // to the set and explain or execute them as a batch.
        StatementSet createStatementSet();
    }

    interface Table {
        // write the Table to a TableSink that was registered
        // under the specified path.
        TableResult executeInsert(String tablePath);

        // write the Table to a TableSink that was registered
        // under the specified path, optionally overwriting the existing data.
        TableResult executeInsert(String tablePath, boolean overwrite);

        // return the AST and the execution plan to compute
        // the result of the current Table.
        String explain(ExplainDetail... extraDetails);

        // get the contents of the current table.
        TableResult execute();
    }

    interface TableResult {
        // return JobClient if a Flink job is submitted
        // (for DML/DQL statement), else return empty (e.g. for DDL).
        Optional<JobClient> getJobClient();

        // return the schema of the result
        TableSchema getTableSchema();

        // return the ResultKind which can avoid custom parsing of
        // an "OK" row in programming
        ResultKind getResultKind();

        // get the row contents as an iterator over rows
        Iterator<Row> collect();

        // print the result contents
        void print();
    }

    public enum ResultKind {
        // for DDL, DCL and statements with a simple "OK"
        SUCCESS,

        // rows with important content are available (DML, DQL)
        SUCCESS_WITH_CONTENT
    }

    interface StatementSet {
        // add single INSERT statement into the set
        StatementSet addInsertSql(String statement);

        // add Table with the given sink table name to the set
        StatementSet addInsert(String targetPath, Table table);

        // add Table with the given sink table name to the set
        StatementSet addInsert(String targetPath, Table table, boolean overwrite);

        // returns the AST and the execution plan to compute
        // the result of all statements and Tables
        String explain(ExplainDetail... extraDetails);

        // execute all statements and Tables as a batch
        TableResult execute();
    }

    public enum ExplainDetail {
        STATE_SIZE_ESTIMATE,
        UID,
        HINTS,
        ...
    }
Currently, the `void sqlUpdate(String sql)` method executes DDLs immediately, while DMLs are buffered and only triggered by `TableEnvironment.execute()`. The two behaviors should be consistent, so this method will be deprecated. We propose a new blocking method that returns the execution result:
    interface TableEnvironment {
        /**
         * Execute the given single statement. The statement can be
         * DDL/DML/DQL/SHOW/DESCRIBE/EXPLAIN/USE.
         *
         * If the statement is translated to a Flink job (DML/DQL),
         * the TableResult is returned once the job has been submitted, and
         * contains a JobClient instance associated with the job.
         * Otherwise, the TableResult is returned once the statement
         * execution has finished, and does not contain a JobClient instance.
         *
         * @return result for DQL/SHOW/DESCRIBE/EXPLAIN, the affected row count
         * for `DML` (-1 means unknown), or a string message ("OK") for other
         * statements.
         */
        TableResult executeSql(String statement);
    }
This method only supports executing a single statement, which can be DDL, DML, DQL, SHOW, DESCRIBE, EXPLAIN or USE. For DML and DQL, this method returns a TableResult once the job has been submitted. For DDL and DCL statements, the TableResult is returned once the operation has finished. TableResult is the representation of the execution result, and contains the result data and the result schema. TableResult also contains a JobClient associated with the job if the statement is DML or DQL.
    /**
     * A TableResult is the representation of the statement execution result.
     */
    interface TableResult {
        /**
         * return JobClient if a Flink job is submitted
         * (for DML/DQL statement), else return empty (e.g. DDL).
         */
        Optional<JobClient> getJobClient();

        /**
         * Get the schema of result.
         */
        TableSchema getTableSchema();

        /**
         * return the ResultKind which can avoid custom parsing of
         * an "OK" row in programming
         */
        ResultKind getResultKind();

        /**
         * Get the result contents as an iterator over rows.
         */
        Iterator<Row> collect();

        /**
         * Print the result contents.
         */
        void print();
    }

    /**
     * ResultKind defines the types of the result.
     */
    public enum ResultKind {
        // for DDL, DCL and statements with a simple "OK"
        SUCCESS,

        // rows with important content are available (DML, DQL)
        SUCCESS_WITH_CONTENT
    }
The following table describes the result for each kind of statement:
Statement | Result Schema | Result Value | Result Kind | Examples
DDL | field name: result; field type: VARCHAR(2) | "OK" (single row) | SUCCESS | CREATE TABLE new_table (col1 BIGINT, ...)
DML (INSERT/UPDATE/DELETE) | field name: affected_rowcount; field type: BIGINT | the affected row count (-1 means unknown) | SUCCESS_WITH_CONTENT | INSERT INTO sink_table SELECT …
SHOW xx | field name: result; field type: VARCHAR(n) (n is the max length of values) | list all objects (multiple rows) | SUCCESS_WITH_CONTENT | SHOW CATALOGS
DESCRIBE xx | field name: result; field type: VARCHAR(n) (n is the max length of values) | describe the detail of an object (single row) | SUCCESS_WITH_CONTENT | DESCRIBE CATALOG catalog_name
EXPLAIN xx | field name: result; field type: VARCHAR(n) (n is the max length of values) | explain the plan of a query (single row) | SUCCESS_WITH_CONTENT | EXPLAIN PLAN FOR SELECT …
USE xx | field name: result; field type: VARCHAR(2) | "OK" (single row) | SUCCESS | USE CATALOG catalog_name
SELECT xx | (select schema) | (select value) | SUCCESS_WITH_CONTENT | SELECT * FROM ...
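The mapping from statement type to result kind in the table can be written down as a small lookup. The sketch below is a hypothetical illustration in plain Java, not Flink's parser: the class `ResultKindSketch` and the method `resultKindOf` are invented names, the classification looks at the leading keyword only, and treating `DROP` and `ALTER` as DDL is an assumption beyond the `CREATE TABLE` example in the table. Only `INSERT` is covered for DML.

```java
import java.util.Locale;

// Hypothetical sketch: mirrors the result table above, NOT Flink's parser.
class ResultKindSketch {
    enum ResultKind { SUCCESS, SUCCESS_WITH_CONTENT }

    // Classifies by the leading keyword only; a real implementation would parse the statement.
    static ResultKind resultKindOf(String statement) {
        String normalized = statement.trim().toUpperCase(Locale.ROOT);
        String keyword = normalized.split("\\s+", 2)[0];
        switch (keyword) {
            case "CREATE": // DDL
            case "DROP":   // DDL (assumption)
            case "ALTER":  // DDL (assumption)
            case "USE":
                return ResultKind.SUCCESS;
            case "INSERT": // DML
            case "SHOW":
            case "DESCRIBE":
            case "EXPLAIN":
            case "SELECT": // DQL
                return ResultKind.SUCCESS_WITH_CONTENT;
            default:
                throw new IllegalArgumentException("Unsupported statement: " + statement);
        }
    }

    public static void main(String[] args) {
        System.out.println(resultKindOf("CREATE TABLE new_table (col1 BIGINT)")); // prints SUCCESS
        System.out.println(resultKindOf("SHOW CATALOGS")); // prints SUCCESS_WITH_CONTENT
    }
}
```

A caller that only needs to know whether rows are worth reading can branch on the returned kind instead of parsing an "OK" row.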
Like the `sqlUpdate` method, `TableEnvironment.insertInto(String, Table)` and `Table.insertInto(String)` also buffer the Tables and cause the same buffering problem. So these two methods will be deprecated.
Since buffering of SQLs/Tables and plans will be disabled, it is meaningless to provide `execute(String)` as the trigger entry point, and the `explain(boolean)` method should not be used anymore either. So we advise deprecating those two methods. Instead, we introduce a new method named `createStatementSet` and a new class named `StatementSet` to support optimizing multiple SQLs/Tables together. Only DML statements or Tables can be added to a StatementSet. For DML, only `INSERT` is supported for now; DELETE and UPDATE can be supported in the future.
StatementSet supports adding a list of DMLs and Tables through the `addXX` methods, getting the plan of all statements and Tables through the `explain` method, and optimizing all statements and Tables together and submitting the job through the `execute` method. The added statements and Tables are cleared when the `execute` method is called.
    interface TableEnvironment {
        /**
         * Create a StatementSet instance which can add DML statements or Tables
         * to the set, the planner can optimize all added statements and Tables
         * together for better performance.
         */
        StatementSet createStatementSet();
    }

    interface StatementSet {
        /**
         * add insert statement to the set.
         */
        StatementSet addInsertSql(String statement);

        /**
         * add Table with the given sink table name to the set.
         */
        StatementSet addInsert(String targetPath, Table table);

        /**
         * add Table with the given sink table name to the set.
         */
        StatementSet addInsert(String targetPath, Table table, boolean overwrite);

        /**
         * returns the AST and the execution plan to compute the result of
         * all statements and Tables.
         *
         * @param extraDetails the extra details which the plan should contain.
         * e.g. estimated cost, uid
         */
        String explain(ExplainDetail... extraDetails);

        /**
         * execute all statements and Tables as a batch.
         *
         * The added statements and Tables will be cleared when executing
         * this method.
         */
        TableResult execute();
    }

    /**
     * ExplainDetail defines the types of details for explain result
     */
    public enum ExplainDetail {
        STATE_SIZE_ESTIMATE,
        UID,
        HINTS,
        ...
    }
Each statement or Table has a return value, which is its affected row count. So the TableResult has multiple columns. All column types are BIGINT, and each column name is "affected_rowcount_" plus the index of the statement or Table. For example:
    StatementSet stmtSet = tEnv.createStatementSet();
    stmtSet.addInsertSql("insert into xx ...");
    stmtSet.addInsert("yy", tEnv.sqlQuery("select ..."));
    stmtSet.execute();
The schema and data in TableResult:
 | column1 (insert into xx ...) | column2 (stmtSet.addInsert("yy", tEnv.sqlQuery("select ...")))
Schema | name: affected_rowcount_0; type: BIGINT | name: affected_rowcount_1; type: BIGINT
Data (single row) | -1 | -1
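The buffering contract of `StatementSet` and the naming of the result columns can be summarized in a toy model. This is a sketch in plain Java under stated assumptions, not the Flink implementation: `StatementSetSketch` is a hypothetical class, statements are kept as plain strings, and `execute()` returns only the result column names rather than a `TableResult`.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model of the buffering contract described above; NOT the Flink implementation.
class StatementSetSketch {
    private final List<String> statements = new ArrayList<>();

    // Fluent add, in the style of addInsertSql in the proposal.
    StatementSetSketch addInsertSql(String statement) {
        statements.add(statement);
        return this;
    }

    // Number of statements currently buffered.
    int size() { return statements.size(); }

    // "Executes" the batch: one result column name per statement, then clears the buffer.
    List<String> execute() {
        List<String> columnNames = new ArrayList<>();
        for (int i = 0; i < statements.size(); i++) {
            columnNames.add("affected_rowcount_" + i);
        }
        statements.clear();
        return columnNames;
    }

    public static void main(String[] args) {
        StatementSetSketch set = new StatementSetSketch()
                .addInsertSql("insert into xx ...")
                .addInsertSql("insert into yy ...");
        System.out.println(set.execute()); // prints [affected_rowcount_0, affected_rowcount_1]
        System.out.println(set.size());    // prints 0
    }
}
```

Because the buffer is cleared on `execute()`, the same instance can be reused for a second, independent batch.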
Since FLIP-64 provides `ConnectTableDescriptor#createTemporaryTable` to register a TableSource in TableEnvironment, the `TableEnvironment.fromTableSource(TableSource)` method should be deprecated too; this was an omission in that FLIP.
Currently, a statement cannot be explained directly in TableEnvironment; it must first be converted to a Table through the `TableEnvironment.sqlQuery` method. As a consequence, an INSERT statement cannot be explained at all, because it cannot be converted to a Table. We introduce the `TableEnvironment.explainSql()` method to support explaining DQL and DML statements directly. The `explainSql` method only accepts a single statement.
    interface TableEnvironment {
        /**
         * returns the AST and the execution plan to compute the result of
         * the given statement.
         * The statement must be DQL or DML, and only single statement is
         * supported.
         *
         * @param extraDetails the extra details which the plan should contain.
         * e.g. estimated cost, uid
         */
        String explainSql(String statement, ExplainDetail... extraDetails);
    }
We also introduce the following methods to make the programming more fluent on Table.
    interface Table {
        /**
         * Write the Table to a TableSink that was registered
         * under the specified path.
         *
         * @param tablePath The path of the registered TableSink to which
         * the Table is written.
         */
        TableResult executeInsert(String tablePath);

        /**
         * Write the Table to a TableSink that was registered
         * under the specified path.
         *
         * @param tablePath The path of the registered TableSink to which
         * the Table is written.
         * @param overwrite Whether to overwrite the existing data
         */
        TableResult executeInsert(String tablePath, boolean overwrite);

        /**
         * Returns the AST and the execution plan to compute the result of
         * the current Table.
         *
         * @param extraDetails the extra details which the plan should contain.
         * e.g. estimated cost, uid
         */
        String explain(ExplainDetail... extraDetails);

        /**
         * Get the contents of the current table.
         */
        TableResult execute();
    }
First, let’s discuss the buffer problem in depth. There are actually two levels of buffering: TableEnvironment buffers SQLs/Tables, and StreamExecutionEnvironment buffers transformations in order to generate the StreamGraph. Each TableEnvironment instance holds a StreamExecutionEnvironment instance. Currently, when a FlinkRelNode is translated into a Flink operator, the generated transformations are added to StreamExecutionEnvironment’s buffer. The bug[2] is caused by this behavior. Here is another simple example that illustrates the problem with StreamExecutionEnvironment’s buffer.
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

    // will add transformations to env when translating to execution plan
    tEnv.sqlUpdate("INSERT INTO sink1 SELECT a, b FROM MyTable1");

    Table table = tEnv.sqlQuery("SELECT c, d from MyTable2");
    DataStream dataStream = tEnv.toAppendStream(table, Row.class);
    dataStream…

    env.execute("job name");
    // or tEnv.execute("job name")
Whichever execute method is called, the submitted job contains the topology of both queries, which confuses users. As suggested in "Public Interfaces", `StreamExecutionEnvironment.execute` should only trigger DataStream program execution, and `TableEnvironment.execute` should only trigger table program execution. So the expected behavior for the above example is that `env.execute("job name")` submits the second query, and `tEnv.execute("job name")` submits the first query.
To meet this requirement, we will change the current behavior of TableEnvironment: the TableEnvironment instance buffers the SQLs/Tables itself and does not add the generated transformations to the StreamExecutionEnvironment instance when translating to an execution plan. The solution is similar to DummyStreamExecutionEnvironment. We can use StreamGraphGenerator to generate the StreamGraph from the transformations. This requires that StreamTableSink always returns a DataStream, and the StreamTableSink.emitDataStream method should be removed since it has been deprecated since Flink 1.9. The StreamExecutionEnvironment instance then only buffers the transformations translated from DataStream programs.
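The intended separation of the two buffers can be illustrated with a toy model. The sketch below is plain Java and does not use Flink; `StreamEnv` and `TableEnv` are hypothetical stand-ins for StreamExecutionEnvironment and TableEnvironment, and queries are represented as plain strings rather than transformations.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model (NOT Flink code) of the proposed separation of buffers.
class SeparateBuffersSketch {
    // Stand-in for StreamExecutionEnvironment: buffers DataStream transformations only.
    static class StreamEnv {
        final List<String> transformations = new ArrayList<>();

        // Submits whatever this environment has buffered, then clears the buffer.
        List<String> execute() {
            List<String> job = new ArrayList<>(transformations);
            transformations.clear();
            return job;
        }
    }

    // Stand-in for TableEnvironment: keeps table operations in its own buffer.
    static class TableEnv {
        final StreamEnv env;
        final List<String> bufferedOperations = new ArrayList<>();

        TableEnv(StreamEnv env) { this.env = env; }

        // Buffered in the table environment, never pushed to the StreamEnv.
        void sqlUpdate(String insert) { bufferedOperations.add(insert); }

        // Converting to a DataStream hands the query over to the StreamEnv.
        void toAppendStream(String query) { env.transformations.add(query); }

        List<String> execute() {
            List<String> job = new ArrayList<>(bufferedOperations);
            bufferedOperations.clear();
            return job;
        }
    }

    public static void main(String[] args) {
        StreamEnv env = new StreamEnv();
        TableEnv tEnv = new TableEnv(env);
        tEnv.sqlUpdate("INSERT INTO sink1 SELECT a, b FROM MyTable1");
        tEnv.toAppendStream("SELECT c, d FROM MyTable2");
        System.out.println(env.execute());  // contains only the second query
        System.out.println(tEnv.execute()); // contains only the first query
    }
}
```

With separate buffers, each `execute` call submits only the work that was handed to its own environment.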
The solution for BatchTableEnvironment is similar to the streaming case: the `BatchTableSink.emitDataSet` method should return a DataSink, and the DataSet plan can be created by a plan generator based on the DataSinks. The ExecutionEnvironment instance only buffers the DataSinks translated from DataSet programs.
Now, we introduce `StatementSet` to require users to explicitly buffer SQLs/Tables in order to support multiple-sink optimization. Although the `insertInto`, `sqlUpdate` and `execute` methods are deprecated, they will not be deleted immediately, so the deprecated methods and the new methods must work together for one or more versions. The TableEnvironment’s buffer will be removed once the deprecated methods are deleted.
Once the behavior of the `execute` method is corrected, users can write table programs easily and correctly even when the deprecated methods, the new methods and the `to DataStream` methods are mixed.
This section lists some examples using the old API and the proposed API for a straightforward comparison.
Current Interface | New Interface |
tEnv.sqlUpdate("CREATE TABLE test (...) with (path = '/tmp1')"); | TableResult result = tEnv.executeSql("CREATE TABLE test (...) with (path = '/tmp1')"); result... |
tEnv.sqlUpdate("INSERT INTO test SELECT ..."); tEnv.execute("test"); | TableResult result = tEnv.executeSql("INSERT INTO test SELECT ..."); JobClient jobClient = result.getJobClient().get(); jobClient... result.print(); |
Current Interface | New Interface |
tEnv.sqlUpdate("insert into xx ..."); tEnv.sqlUpdate("insert into yy ..."); tEnv.execute("test"); // tEnv.explain(false) | StatementSet stmtSet = tEnv.createStatementSet(); stmtSet.addInsertSql("insert into xx ..."); stmtSet.addInsertSql("insert into yy ..."); TableResult result = stmtSet.execute(); // stmtSet.explain() |
Table table1 = tEnv.sqlQuery("select xx ...")... Table table2 = tEnv.sqlQuery("select yy ...")... tEnv.insertInto("sink1", table1); tEnv.insertInto("sink2", table2); tEnv.execute("test"); // tEnv.explain(false) | Table table1 = tEnv.sqlQuery("select xx ...")... Table table2 = tEnv.sqlQuery("select yy ...")... StatementSet stmtSet = tEnv.createStatementSet(); stmtSet.addInsert("sink1", table1); stmtSet.addInsert("sink2", table2); TableResult result = stmtSet.execute(); // stmtSet.explain() |
    TableEnvironment tEnv = ...
    tEnv.explainSql("insert into s1 ...");
    tEnv.explainSql("select xx ...");

    Table table1 = tEnv.sqlQuery("select xx ...")...
    String explanation = table1.explain();
    TableResult result1 = table1.executeInsert("sink1");

    Table table2 = tEnv.sqlQuery("select yy ...")...
    TableResult result2 = table2.execute();
    result2.print();

    TableEnvironment tEnv = ...
    StatementSet stmtSet = tEnv.createStatementSet();

    tEnv.sqlUpdate("insert into s1 ..."); // statement1
    stmtSet.addInsertSql("insert into s2 ..."); // statement2
    tEnv.insertInto("sink1", tEnv.sqlQuery("select xx...")); // statement3

    tEnv.executeSql("insert into s3 ..."); // only submit the plan of this statement

    tEnv.explain(false); // explain the plan of statement1 and statement3
    tEnv.execute("test1"); // submit the plan of statement1 and statement3

    stmtSet.addInsert("sink2", tEnv.sqlQuery("select yy...")); // statement4
    stmtSet.explain(); // explain the plan of statement2 and statement4
    TableResult result = stmtSet.execute(); // submit the plan of statement2 and statement4

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

    tEnv.sqlUpdate("insert into s1 ..."); // statement1

    StatementSet stmtSet = tEnv.createStatementSet();
    stmtSet.addInsert("sink1", tEnv.sqlQuery("select xx...")); // statement2

    Table table = tEnv.sqlQuery("select yy...");
    DataStream dataStream = tEnv.toAppendStream(table, Row.class); // statement3
    dataStream…

    tEnv.explain(false); // explain the plan of statement1
    stmtSet.explain(); // explain the plan of statement2

    env.execute("test1"); // submit the plan of statement3
    tEnv.execute("test2"); // submit the plan of statement1
    stmtSet.execute(); // submit the plan of statement2
Class | Methods | Comments
TableEnvironment | JobExecutionResult execute(String jobName) | deprecated
TableEnvironment | String explain(boolean extended) | deprecated
TableEnvironment | void sqlUpdate(String sql) | deprecated
TableEnvironment | void insertInto(String, Table) | deprecated
TableEnvironment | Table fromTableSource(TableSource tableSource) | deprecated
TableEnvironment | TableResult executeSql(String statement) | added
TableEnvironment | String explainSql(String, ExplainDetail... extraDetails) | added
TableEnvironment | StatementSet createStatementSet() | added
Table | insertInto(String tablePath) | deprecated
Table | TableResult executeInsert(String tablePath) | added
Table | TableResult executeInsert(String tablePath, boolean overwrite) | added
Table | String explain(ExplainDetail... extraDetails) | added
Table | TableResult execute() | added
Statement | single statement | multiple statements
DDL | executeSql(String) | Unsupported (supports multiple DDLs for easy testing in the future)
SHOW/DESCRIBE/USE | executeSql(String) | Unsupported
DQL | executeSql(String) | Unsupported
DML | executeSql(String) | createStatementSet() -> StatementSet -> execute()
EXPLAIN | explain(Table), explainSql(String) | createStatementSet() -> StatementSet -> explain()
The `StatementSet#explain` method can be tested with unit tests, and the other new methods can be tested with integration tests. We will also add some integration tests to verify that the new methods work correctly together with the deprecated methods.
TableEnvironment#executeBatch(String... statement)
This method is consistent with the style of the other methods in TableEnvironment; however, it does not support the Table API and cannot explain the plan.
[1] FLIP-69 Flink SQL DDL Enhancement
[2] discuss planner buffer execute
[3] FLIP-64: Support for Temporary Objects in Table module
[4] JDBC statement addBatch interface
[5] multiple statements in SQL CLI
[6] multiple statements in TableEnvironment
[7] FLIP-73 Introducing Executors for job submission
[8] FLIP-74 Flink JobClient API
[9] Sqline deal with batch execute
[10] Feedback Summary
[11] Feedback Summary discussion thread
"Multiline statements" is also an important feature for the SQL client and third-party SQL-based platforms. In the SQL client, the most typical scenario is executing a SQL script which contains multiple statements. The main open question is how this method should behave when executing each single-line statement in batch, streaming and mixed scenarios (the scenarios are listed in [10]).
A preliminary draft of the method is:
    interface TableEnvironment {
        /**
         * Execute multiline statement separated by a semicolon, return Iterator
         * over all TableResults that corresponds to each single line statement.
         * The Iterator.next() method would trigger the next statement execution.
         * This allows a caller to decide whether execute the statement
         * synchronously or asynchronously.
         *
         * @param statements multiline statement separated by a semicolon
         */
        Iterator<TableResult> executeMultilineSql(String statements);
    }
The draft introduces the `executeMultilineSql` method in TableEnvironment, which returns an `Iterator<TableResult>` whose `next()` call triggers the submission of the next statement. This allows a caller to decide synchronously when to submit each statement asynchronously to the cluster. Thus, a service such as the SQL Client can handle the result of each statement individually and process the script statement by statement, in order.
Please refer to the feedback summary document[10] and the discussion thread[11] for more detail. The discussion will continue in the future.
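The iterator-driven contract of the draft can be sketched in plain Java. This is a hypothetical illustration of the lazy behavior only, not the proposed Flink implementation: `LazyMultilineSketch` is an invented class, results are plain strings instead of `TableResult`, and the statement splitting is deliberately naive (it ignores semicolons inside string literals or comments).

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;

// Hypothetical sketch of the lazy contract only; NOT the proposed Flink implementation.
class LazyMultilineSketch {
    // Records which statements have actually been "executed" so far.
    final List<String> executed = new ArrayList<>();

    Iterator<String> executeMultilineSql(String statements) {
        // Naive split: ignores semicolons inside string literals or comments.
        final List<String> parts = new ArrayList<>();
        for (String part : statements.split(";")) {
            if (!part.trim().isEmpty()) {
                parts.add(part.trim());
            }
        }
        return new Iterator<String>() {
            private int next = 0;

            @Override
            public boolean hasNext() { return next < parts.size(); }

            @Override
            public String next() {
                if (!hasNext()) {
                    throw new NoSuchElementException();
                }
                String statement = parts.get(next++);
                executed.add(statement); // execution happens here, not when the iterator is created
                return "OK: " + statement;
            }
        };
    }

    public static void main(String[] args) {
        LazyMultilineSketch env = new LazyMultilineSketch();
        Iterator<String> results =
                env.executeMultilineSql("CREATE TABLE t (...); INSERT INTO t SELECT ...");
        System.out.println(env.executed.size()); // prints 0, nothing has run yet
        System.out.println(results.next());
        System.out.println(env.executed.size()); // prints 1
    }
}
```

The caller controls the pace: it can inspect or wait on one result before calling `next()` again, or drain the iterator eagerly.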