Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

Flink SQL currently supports the SHOW CREATE TABLE statement [1], which displays the DDL for a specified table. This feature is crucial for users who want to understand the structure of an existing table or recreate it in another database without additional services.

As the application scenarios of Catalog expand, it plays an increasingly important role in Flink. Services such as JDBC (MySQL, PostgreSQL), Hive, and data lakes (Paimon, Iceberg) all use Catalog in Flink. However, we currently cannot obtain detailed metadata from existing catalogs, which makes it hard for users to reuse them. Some well-known engines, such as Apache Doris [2], already support this syntax. FLIP-295 [3] introduced CatalogStore, which makes it possible to implement this long-awaited feature.

Therefore, I propose supporting the SHOW CREATE CATALOG statement, which would be helpful for both users and developers.

After online discussion, we reached a consensus to introduce more catalog-related syntax in this FLIP: ALTER CATALOG, SHOW CREATE CATALOG, DESCRIBE CATALOG, and an enhanced CREATE CATALOG.

Public Interfaces

SQL Syntax

1. SHOW CREATE CATALOG catalog_name

Shows the creation statement for the specified catalog. The output includes the catalog name and its properties, which makes it easy to reuse an existing catalog definition.

Flink SQL> create catalog cat2 WITH ('type'='generic_in_memory', 'default-database'='db');
[INFO] Execute statement succeeded.

Flink SQL> show create catalog cat2;
+---------------------------------------------------------------------------------------------+
|                                                                                      result |
+---------------------------------------------------------------------------------------------+
| CREATE CATALOG `cat2` WITH (
  'default-database' = 'db',
  'type' = 'generic_in_memory'
)
 |
+---------------------------------------------------------------------------------------------+
1 row in set

Flink SQL>  CREATE CATALOG cat1 WITH (
>    'type' = 'jdbc', 'default-database' = 'mysql?useSSL=false',
>    'base-url' = 'jdbc:mysql://localhost:3307',
>    'username' = 'root',
>    'password' = 'root'
> );
[INFO] Execute statement succeeded.

Flink SQL> show create catalog cat1;
+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|                                                                                                                                                                                         result |
+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| CREATE CATALOG `cat1` WITH (
  'base-url' = 'jdbc:mysql://localhost:3307',
  'password' = 'root',
  'default-database' = 'mysql?useSSL=false',
  'type' = 'jdbc',
  'username' = 'root'
)
 |
+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
1 row in set

2. DESCRIBE/DESC CATALOG [EXTENDED] catalog_name

Describe the metadata of an existing catalog. The metadata information includes the catalog’s name, type, and comment. If the optional EXTENDED option is specified, catalog properties are also returned.

NOTICE: The parser part of this syntax was implemented in FLIP-69 [4], but the statement is not actually usable yet. We can complete it in this FLIP.

Flink SQL> describe catalog cat2;
+-----------+-------------------+
| info name |        info value |
+-----------+-------------------+
|      name |              cat2 |
|      type | generic_in_memory |
|   comment |                   |
+-----------+-------------------+
3 rows in set

Flink SQL> describe catalog extended cat2;
+-------------------------+-------------------+
|               info name |        info value |
+-------------------------+-------------------+
|                    name |              cat2 |
|                    type | generic_in_memory |
|                 comment |                   |
| option:default-database |                db |
+-------------------------+-------------------+
4 rows in set
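
The row layout above can be sketched as follows. This is a minimal illustration rather than the proposed implementation: `DescribeCatalogSketch` and `describe` are hypothetical names, the catalog is modeled as a plain name, comment, and option map instead of a `CatalogDescriptor`, and sorting the option rows by key is an assumption. Because `type` is always shown, it is not repeated as an `option:` row.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical sketch of the DESCRIBE CATALOG row layout; not a Flink class.
final class DescribeCatalogSketch {

    /** Builds (info name, info value) pairs for a catalog. */
    static List<String[]> describe(
            String name, String comment, Map<String, String> options, boolean extended) {
        List<String[]> rows = new ArrayList<>();
        rows.add(new String[] {"name", name});
        rows.add(new String[] {"type", options.getOrDefault("type", "")});
        // A missing comment is rendered as an empty value.
        rows.add(new String[] {"comment", comment == null ? "" : comment});
        if (extended) {
            // Assumption: option rows are emitted in key order.
            for (Map.Entry<String, String> e : new TreeMap<>(options).entrySet()) {
                if (!"type".equals(e.getKey())) {
                    rows.add(new String[] {"option:" + e.getKey(), e.getValue()});
                }
            }
        }
        return rows;
    }

    public static void main(String[] args) {
        Map<String, String> options =
                Map.of("type", "generic_in_memory", "default-database", "db");
        for (String[] row : describe("cat2", null, options, true)) {
            System.out.println(row[0] + " | " + row[1]);
        }
    }
}
```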

3. ALTER CATALOG catalog_name SET (key1=val1, key2=val2, ...)

Set one or more properties in the specified catalog. If a particular property is already set in the catalog, override the old value with the new one.

Flink SQL> create catalog cat2 WITH ('type'='generic_in_memory');
[INFO] Execute statement succeeded.

Flink SQL> ALTER CATALOG cat2 set ('default-database'='db');
[INFO] Execute statement succeeded.

Flink SQL> show create catalog cat2;
+---------------------------------------------------------------------------------------------+
|                                                                                      result |
+---------------------------------------------------------------------------------------------+
| CREATE CATALOG `cat2` WITH (
  'default-database' = 'db',
  'type' = 'generic_in_memory'
)
 |
+---------------------------------------------------------------------------------------------+
1 row in set

Flink SQL> ALTER CATALOG cat2 set ('k1'='v1');
[ERROR] Could not execute SQL statement. Reason:
org.apache.flink.table.api.ValidationException: Unsupported options found for 'generic_in_memory'.

Unsupported options:

k1

Supported options:

default-database
property-version
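
The override semantics of SET, together with the rejection of unknown keys shown in the error above, can be illustrated with a small sketch. It assumes that the set of supported option keys is handed in by the caller. `AlterCatalogSetSketch`, `applySet`, and the use of `IllegalArgumentException` are hypothetical stand-ins, not Flink APIs, and the sketch does not claim where the real validation takes place.

```java
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.stream.Collectors;

// Hypothetical sketch of ALTER CATALOG ... SET semantics; not a Flink class.
final class AlterCatalogSetSketch {

    /** Returns the options after applying SET; new values override old ones. */
    static Map<String, String> applySet(
            Map<String, String> current,
            Map<String, String> changes,
            Set<String> supportedKeys) {
        // Reject keys that the catalog type does not understand.
        List<String> unsupported =
                changes.keySet().stream()
                        .filter(key -> !supportedKeys.contains(key))
                        .sorted()
                        .collect(Collectors.toList());
        if (!unsupported.isEmpty()) {
            throw new IllegalArgumentException("Unsupported options: " + unsupported);
        }
        // Copy first so the stored options are only replaced on success.
        Map<String, String> merged = new TreeMap<>(current);
        merged.putAll(changes);
        return merged;
    }

    public static void main(String[] args) {
        Set<String> supported = Set.of("default-database", "property-version");
        Map<String, String> current = Map.of("type", "generic_in_memory");
        System.out.println(applySet(current, Map.of("default-database", "db"), supported));
    }
}
```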

4. ALTER CATALOG catalog_name RESET (key1, key2, ...)

Reset one or more properties to their default values in the specified catalog.

Flink SQL> create catalog cat2 WITH ('type'='generic_in_memory', 'default-database'='db');
[INFO] Execute statement succeeded.

Flink SQL> show create catalog cat2;
+---------------------------------------------------------------------------------------------+
|                                                                                      result |
+---------------------------------------------------------------------------------------------+
| CREATE CATALOG `cat2` WITH (
  'default-database' = 'db',
  'type' = 'generic_in_memory'
)
 |
+---------------------------------------------------------------------------------------------+
1 row in set

Flink SQL> ALTER CATALOG cat2 reset ('default-database');
[INFO] Execute statement succeeded.

Flink SQL> show create catalog cat2;
+---------------------------------------------------------------------------------------------+
|                                                                                      result |
+---------------------------------------------------------------------------------------------+
| CREATE CATALOG `cat2` WITH (
  'type' = 'generic_in_memory'
)
 |
+---------------------------------------------------------------------------------------------+
1 row in set
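
RESET can be read as removing the listed keys from the stored options, so that the catalog falls back to its defaults, as the output above suggests. A minimal sketch under that reading follows. `AlterCatalogResetSketch` and `applyReset` are hypothetical names, and refusing to reset `type` is an assumption of this sketch (a catalog without a type could not be re-created), not something this proposal specifies.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical sketch of ALTER CATALOG ... RESET semantics; not a Flink class.
final class AlterCatalogResetSketch {

    /** Returns the options that remain after the given keys are reset. */
    static Map<String, String> applyReset(Map<String, String> current, List<String> keys) {
        // Assumption: 'type' identifies the catalog factory and cannot be reset.
        if (keys.contains("type")) {
            throw new IllegalArgumentException("Option 'type' cannot be reset.");
        }
        Map<String, String> remaining = new TreeMap<>(current);
        for (String key : keys) {
            // Removing a key that is not set is a no-op.
            remaining.remove(key);
        }
        return remaining;
    }

    public static void main(String[] args) {
        Map<String, String> current =
                Map.of("type", "generic_in_memory", "default-database", "db");
        System.out.println(applyReset(current, List.of("default-database")));
    }
}
```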

5. CREATE CATALOG [IF NOT EXISTS] catalog_name [COMMENT catalog_comment] WITH (key1=val1, key2=val2, ...)

IF NOT EXISTS clause: If the catalog already exists, nothing happens.

COMMENT clause: An optional string literal that describes the catalog.

NOTICE: We only need to add the '[IF NOT EXISTS]' and '[COMMENT]' clauses to the 'create catalog' statement.

Flink SQL> create catalog cat2 comment 'hello' WITH ( 'type'='generic_in_memory', 'default-database'='db');
[INFO] Execute statement succeeded.

Flink SQL> create catalog if not exists cat2 comment 'hello2' WITH ( 'type'='generic_in_memory', 'default-database'='db');
[INFO] Execute statement succeeded.
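
The effect of IF NOT EXISTS in the example above (the second statement succeeds but changes nothing, so the comment stays 'hello') can be sketched with an in-memory registry. `CreateCatalogIfNotExistsSketch` and its methods are hypothetical and only model the name-to-comment mapping. The exception type used for the duplicate case is likewise an assumption.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of CREATE CATALOG [IF NOT EXISTS] semantics; not a Flink class.
final class CreateCatalogIfNotExistsSketch {

    private final Map<String, String> commentsByCatalog = new HashMap<>();

    /** Returns true if the catalog was created, false if creation was skipped. */
    boolean create(String name, String comment, boolean ifNotExists) {
        if (commentsByCatalog.containsKey(name)) {
            if (ifNotExists) {
                // Existing catalog is left untouched, including its comment.
                return false;
            }
            throw new IllegalStateException("Catalog " + name + " already exists.");
        }
        commentsByCatalog.put(name, comment);
        return true;
    }

    String commentOf(String name) {
        return commentsByCatalog.get(name);
    }

    public static void main(String[] args) {
        CreateCatalogIfNotExistsSketch registry = new CreateCatalogIfNotExistsSketch();
        registry.create("cat2", "hello", false);
        registry.create("cat2", "hello2", true);
        System.out.println(registry.commentOf("cat2"));
    }
}
```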

API Changes

  • For syntax 1 and 2: introduce CatalogManager.getCatalogDescriptor()
  • For syntax 3 and 4: introduce CatalogManager.alterCatalog()
  • For syntax 5: introduce CatalogDescriptor.getComment() and add a `comment` argument to the constructor


Proposed Changes

1. SHOW CREATE CATALOG, DESCRIBE CATALOG

NOTICE: The changes below cover the SHOW CREATE CATALOG syntax. The implementation of the DESCRIBE CATALOG syntax is almost the same, except that its parser part, discussed in FLIP-69 [4], does not need to be implemented again.

We propose introducing a getCatalogDescriptor() method in CatalogManager to expose the ability to obtain the metadata of a specified existing catalog.

public final class CatalogManager implements CatalogRegistry, AutoCloseable {
    ...

    public Optional<CatalogDescriptor> getCatalogDescriptor(String catalogName) {
        return catalogStoreHolder.catalogStore().getCatalog(catalogName);
    }

    ...
}

Extend the SQL parser implementation of the 'SHOW CREATE ...' statement.

SqlShowCreate SqlShowCreate() :
{
    ...
}
{
    <SHOW> <CREATE>
    (
        <TABLE>
        ...
    |
        <VIEW>
        ...
    |
        <CATALOG>
        { pos = getPos(); }
        sqlIdentifier = SimpleIdentifier()
        {
            return new SqlShowCreateCatalog(pos, sqlIdentifier);
        }
    )
}

Add the corresponding SqlNode.

public class SqlShowCreateCatalog extends SqlShowCreate {

    public static final SqlSpecialOperator OPERATOR =
            new SqlSpecialOperator("SHOW CREATE CATALOG", SqlKind.OTHER_DDL);

    ...

    @Override
    public void unparse(SqlWriter writer, int leftPrec, int rightPrec) {
        writer.keyword("SHOW CREATE CATALOG");
        sqlIdentifier.unparse(writer, leftPrec, rightPrec);
    }
}  

Add the corresponding Operation.

public class ShowCreateCatalogOperation implements ShowOperation {
    ...

    @Override
    public TableResultInternal execute(Context ctx) {
        CatalogDescriptor catalogDescriptor =
                ctx.getCatalogManager()
                        .getCatalogDescriptor(catalogName)
                        .orElseThrow(
                                () ->
                                        new ValidationException(
                                                String.format(
                                                        "Cannot obtain metadata information from Catalog %s.",
                                                        catalogName)));
        String resultRow = ShowCreateUtil.buildShowCreateCatalogRow(catalogDescriptor);

        return buildStringArrayResult("result", new String[] {resultRow});
    }
}
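
`ShowCreateUtil.buildShowCreateCatalogRow` is referenced above but not shown. A rough sketch of what such a helper might do follows. It assumes the helper receives the catalog name and its options as a plain map, whereas the call above passes a `CatalogDescriptor`. The class name, the escaping helpers, and the sorting of options by key are all assumptions made for this illustration.

```java
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Collectors;

// Hypothetical sketch of rendering a CREATE CATALOG statement; not a Flink class.
final class ShowCreateCatalogSketch {

    /** Doubles single quotes so the text can sit inside a SQL string literal. */
    static String escapeLiteral(String text) {
        return text.replace("'", "''");
    }

    /** Doubles backticks so the text can sit inside a quoted identifier. */
    static String escapeIdentifier(String text) {
        return text.replace("`", "``");
    }

    static String buildShowCreateCatalogRow(String catalogName, Map<String, String> options) {
        // Assumption: options are printed in key order for a stable result.
        String body =
                new TreeMap<>(options)
                        .entrySet().stream()
                                .map(
                                        e ->
                                                "  '"
                                                        + escapeLiteral(e.getKey())
                                                        + "' = '"
                                                        + escapeLiteral(e.getValue())
                                                        + "'")
                                .collect(Collectors.joining(",\n"));
        return "CREATE CATALOG `" + escapeIdentifier(catalogName) + "` WITH (\n" + body + "\n)\n";
    }

    public static void main(String[] args) {
        Map<String, String> options =
                Map.of("type", "generic_in_memory", "default-database", "db");
        System.out.print(buildShowCreateCatalogRow("cat2", options));
    }
}
```

Escaping quotes matters here because the output is meant to be pasted back into a SQL client; an unescaped quote in an option value would produce a statement that no longer parses.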

2. ALTER CATALOG

NOTICE: The changes below cover the ALTER CATALOG SET syntax. The implementation of the ALTER CATALOG RESET syntax is almost the same.

CatalogManager already provides catalog operations such as create, get, and unregister. To stay consistent with them and to facilitate a future implementation of audit tracking, we propose introducing an alterCatalog() method in CatalogManager to expose the ability to modify the metadata of a specified existing catalog.

Add the SQL parser implementation of the 'ALTER CATALOG' statement.

SqlAlterCatalog SqlAlterCatalog() :
{
    SqlParserPos startPos;
    SqlIdentifier catalogName;
    SqlNodeList propertyList = SqlNodeList.EMPTY;
}
{
    <ALTER> <CATALOG> { startPos = getPos(); }
    catalogName = SimpleIdentifier()
    <SET>
    propertyList = TableProperties()
    {
        return new SqlAlterCatalog(startPos.plus(getPos()),
                    catalogName,
                    propertyList);
    }
}

Add the corresponding SqlNode.

public class SqlAlterCatalog extends SqlCall {
    ...

    @Override
    public void unparse(SqlWriter writer, int leftPrec, int rightPrec) {
        writer.keyword("ALTER CATALOG");
        catalogName.unparse(writer, leftPrec, rightPrec);
        writer.keyword("SET");
        SqlWriter.Frame withFrame = writer.startList("(", ")");
        for (SqlNode property : propertyList) {
            SqlUnparseUtils.printIndent(writer);
            property.unparse(writer, leftPrec, rightPrec);
        }
        writer.newlineAndIndent();
        writer.endList(withFrame);
    }
}

Add the corresponding Operation.

public class AlterCatalogOperation implements AlterOperation {
    ...

    @Override
    public TableResultInternal execute(Context ctx) {
        try {
            ctx.getCatalogManager()
                    .alterCatalog(
                            catalogName,
                            CatalogDescriptor.of(catalogName, Configuration.fromMap(properties)));

            return TableResultImpl.TABLE_RESULT_OK;
        } catch (CatalogException e) {
            throw new ValidationException(
                    String.format("Could not execute %s", asSummaryString()), e);
        }
    }
}

3. Enhance `CREATE CATALOG`

We propose introducing a `getComment()` method in `CatalogDescriptor`, for the following reasons.

1. Design consistency. FLIP-295 [3], which introduced the `CatalogStore` component, defined `CatalogDescriptor` to hold the catalog's name and attributes, both of which describe the catalog, so `comment` can be added to it naturally.

2. Extending an existing class is preferable to adding a new method to an existing interface. In particular, the `Catalog` interface is a core interface that is used by a series of important components such as `CatalogFactory`, `CatalogManager` and `FactoryUtil`, and is implemented by a large number of connectors such as JDBC, Paimon, and Hive. Adding methods to it would greatly increase implementation complexity and, more importantly, the cost of iteration, maintenance, and verification.

public class CatalogDescriptor {

    ...

    private final String comment;

    public String getComment() {
        return comment;
    }

    private CatalogDescriptor(String catalogName, Configuration configuration, String comment) {
        this.catalogName = catalogName;
        this.configuration = configuration;
        this.comment = comment;
    }

    public static CatalogDescriptor of(String catalogName, Configuration configuration, String comment) {
        return new CatalogDescriptor(catalogName, configuration, comment);
    }

    public static CatalogDescriptor of(String catalogName, Configuration configuration) {
        return new CatalogDescriptor(catalogName, configuration, null);
    }
}

Modify CreateCatalogOperation to initialize comment when the "create catalog" statement is executed.

    @Override
    public TableResultInternal execute(Context ctx) {
        try {
            ctx.getCatalogManager()
                    .createCatalog(
                            catalogName,
                            CatalogDescriptor.of(catalogName, Configuration.fromMap(properties), comment));

            return TableResultImpl.TABLE_RESULT_OK;
        } catch (CatalogException e) {
            throw new ValidationException(
                    String.format("Could not execute %s", asSummaryString()), e);
        }
    }

Compatibility, Deprecation, and Migration Plan


Test Plan

The goal is for all existing tests to keep passing with the current architecture.

We will also add new integration and unit tests.

Rejected Alternatives

Exposing CatalogStore from CatalogManager.

  1. Introducing a getCatalogStore() method in CatalogManager would make it hard for developers to implement audit tracking in CatalogManager in the future.
  2. The proposed feature only requires the getCatalogDescriptor() method. Exposing a component with more functionality than needed would bring unnecessary risks.

The original output style of the DESCRIBE CATALOG syntax.

Flink SQL> desc catalog extended cat2;
+--------------------------+---------------------------------------------------------+
| catalog_description_item |                               catalog_description_value |
+--------------------------+---------------------------------------------------------+
|                     Name |                                                    cat2 |
|                     Type |                                       generic_in_memory |
|                  Comment |                                                         |
|               Properties | ('default-database','db'), ('type','generic_in_memory') |
+--------------------------+---------------------------------------------------------+
4 rows in set

After offline discussions with Jane Chan and Jark Wu, we reached a consensus to improve this style for the following reasons (see details in https://lists.apache.org/thread/tkgg1lv9hg8s3p44256nh5pl48wfwmtf):
1. The column titles should be consistent with engines such as Databricks for easy understanding, and also with Flink's own naming style. Therefore, the titles are "info name" and "info value", and the key names are unified in lowercase, so "Name" is replaced by "name".

Note: The Databricks output style [5] is as follows:

> DESCRIBE CATALOG main;
 info_name     info_value
 ------------  ------------------------------------
 Catalog Name                                  main
      Comment           Main catalog (auto-created)
        Owner                 metastore-admin-users
 Catalog Type                               Regular

2. A catalog may have many properties, and showing them all on one line is hard to read. They should be expanded into multiple rows, with each key name prefixed by "option:" to mark it as a property row. Since `type` is important information about the catalog, it should be displayed even when `extended` is not specified; correspondingly, "option:type" is removed to avoid redundancy.



[1] https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sql/show/#show-create-table

[2] https://doris.incubator.apache.org/docs/sql-manual/sql-reference/Show-Statements/SHOW-CREATE-CATALOG

[3] FLIP-295: Support lazy initialization of catalogs and persistence of catalog configurations

[4] FLIP-69: Flink SQL DDL Enhancement

[5] https://learn.microsoft.com/en-us/azure/databricks/sql/language-manual/sql-ref-syntax-aux-describe-catalog
