Status

Discussion thread

https://lists.apache.org/thread/wtkl4yn847tdd0wrqm5xgv9wc0cb0kr8

Vote thread


JIRA


Release

1.20 or beyond

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

Currently, if users have Avro schemas in an Apicurio Registry (an open source Apache 2 licensed schema registry), then the natural way to work with those Avro events is to use the schemas in the Apicurio Repository. This FLIP proposes a new Kafka oriented Avro Apicurio format, to allow Flink users to work with Avro schemas stored in the Apicurio Registry.

 

Messages in the Apicurio format have a schema ID (usually the global ID in a Kafka header), which identifies the schema in the Apicurio Registry. The new format will:

  • For inbound messages, use the ID to find the Avro schema 
  • For outbound messages, register a schema in Apicurio and include the ID in the message 

In this way Apache Flink can be used to consume and produce Apicurio events.

Public Interfaces

The main changes associated with this Flip are:

  • Core Flink to add new the Serialize and deserialize methods allowing additionalProperties to be passed from the Kafka connector
  • The Kafka connector calls the new methods
  • The Apicurio Avro format overrides the new methods to allow passing information between the Kafka connector and the format. 

The following details the changes to the core Flink DeSer and the Avro SchemaCoder interfaces. 

In core Flink add 2 new default methods to the DeserializeSchema interface

/**

 * Deserializes the byte message with additional input Properties.

 *

 * @param message The message, as a byte array.

 * @param additionalInputProperties map of additional input Properties that can be used

 *     for deserialization. Override this method to make use of the additionalInputProperties,

 * @return The deserialized message as an object (null if the message cannot be deserialized).

 */

@PublicEvolving

default T deserializeWithAdditionalProperties(

        byte[] message, Map<String, Object> additionalInputProperties) throws IOException {

    return deserialize(message);

}

@PublicEvolving

default void deserializeWithAdditionalProperties(

        byte[] message, Map<String, Object> additionalInputProperties, Collector<T> out)

        throws IOException {

    T deserialize = deserializeWithAdditionalProperties(message, additionalInputProperties);

    if (deserialize != null) {

        out.collect(deserialize);

    }

}


In core Flink add a new default method to the SerializeSchema interface

/**

 * Serializes the incoming element to a specified type and populates an output.

 * The additional input properties can be used by the serialization and the output 

 * additionalProperties can be populated by the serialization.

 *

 * @param element The incoming element to be serialized

 * @param additionalInputProperties additional input properties map supplied to serialization

 * @param additionalOutputProperties additional output properties that can be populated by the

 *     serialization

 * @return The serialized element.

 */

@PublicEvolving

default byte[] serialize(

        T element,

        Map<String, Object> additionalInputProperties,

        Map<String, Object> additionalOutputProperties) {

    throw new RuntimeException(

            "Method serialize(T element, Map<String, Object> additionalInputProperties,\n"

                    + "Map<String, Object> additionalOutputProperties) should be Overridden.");

}


The new SchemaCoder interfaces are


default Schema readSchemaWithAdditionalParameters(InputStream in, Map<String, Object> additionalInputProperties)

            throws IOException {

        throw new RuntimeException("readSchema passing additional input properties should be overridden.");

    }




default void writeSchema(

       Schema schema,

       OutputStream out,

       Map<String, Object> additionalInputProperties,

       Map<String, Object> additionalOutputProperties)

       throws IOException {

   throw new RuntimeException("writeSchema passing additional input and output properties should be overridden.");

 

Proposed Changes

In this section, we describe in detail all the configurations that need updating.

Apicurio avro Format Options 

Option

Required

Forwarded

Default

Type

Sink only

Description

Format

required

no

(none)

String


Specify what format to use, here should be 'avro-apicurio'.

properties

optional

yes

(none)

Map


This is the apicurio-registry client configuration properties. Any Flink properties take precedence,

apicurio.registry.request.ssl.truststore.location

optional

yes

(none)

String


Location / File of SSL truststore

apicurio.registry.request.ssl.truststore.type

optional

yes

(none)

String


Type of SSL truststore

apicurio.registry.request.ssl.truststore.password

optional

yes

(none)

String


Password of SSL truststore

apicurio.registry.request.ssl.keystore.location

optional

yes

(none)

String


Location / File of SSL keystore

apicurio.registry.request.ssl.keystore.type

optional

yes

(none)

String


Type of SSL keystore

apicurio.registry.request.ssl.keystore.password

optional

yes

(none)

String


Password of SSL keystore

apicurio-auth-basic-userid

optional

yes

(none)

String


Basic auth userId

apicurio-auth-basic-password

optional

yes

(none)

String


Basic auth password

apicurio-auth-oidc-url

optional

yes

(none)l

String


The auth URL to use for OIDC

apicurio-auth-oidc-clientID

optional

yes

(none)

String


Client ID to use for OIDC

apicurio.auth.oidc.clientSecret

optional

yes

(none)

String


Client secret to use for OIDC

apicurio-auth-oidc-scope

optional

yes

(none)

String


Client scope to use for OIDC

apicurio-auth-oidc-client-token-expiration-reduction

optional

yes

1

String


The token expiration to use for OIDC. This is a Duration in seconds. This is the amount of time before the token expires that Apicurio requests a new token.  

apicurio-avro.use-headers

optional

yes

true

boolean


Configures to read/write the artifact identifier to Kafka message headers instead of in the message payload.

apicurio-avro.use-globalid

optional

yes

true

boolean


Used by serializers and deserializers. Configures the  identifier for artifacts. 

  • True means use global ID
  • false means use content ID.

 Instructs the serializer to write the specified ID to Kafka, and instructs the deserializer to use this ID to find the schema.

avro-apicurio.artifactId

optional

yes

<topic-name>-value or <topic-name>-key

String

Y

Specifies the artifactId of the artifact to be registered. If not specified, then for a key this is the topic name suffixed with “-key” and for a value it is the topic name suffixed with “-value”. A key and value can be specified for the same topic.

avro-apicurio.artifactName

optional

yes

<topic-name>-value or <topic-name>-key

 

String

Y

This specifies the name of the artifact to be registered. If not specified, then for a key this is the topic name suffixed with “-key” and for a value it is the topic name suffixed with “-value”.A key and value can be specified for the same topic.

 

avro-apicurio.artifactDescription

optional

yes

Schema registered by Apache Flink.

 

String

Y

This specifies the description of the artifact to be registered.

avro-apicurio.artifactVersion

optional

yes

“1”

String

Y

This specifies the version of the artifact to be registered.

avro-apicurio.schema

optional

yes

(none)

String


The schema registered or to be registered in the Apicurio Registry. If no schema is provided Flink converts the table schema to avro schema. The schema provided must match the table schema.

avro-apicurio.groupId

optional

yes

(none)

String

Y

The group id to use when creating  a schema.

avro-apicurio.register-schema

optional

yes

True

Boolean

Y

When true the schema is registered, otherwise the schema is not registered.

Compatibility, Deprecation, and Migration Plan

The changes made in this FLIP are backward compatible as the Kafka connector will reflectively check whether the new serialize and deserialiize methods are present before attempting to call them. If the new Deser methods are not present the existing ones will be called.

Test Plan

Existing UT/IT can ensure compatibility with old options. New tests will cover the new options.

 

Rejected Alternatives

-   Running Apicurio in Confluent mode,  so that the Confluent Avro format could be used. This would be great for some users , but this FLIP is provides function to facilitate working naturally with Apicurio schemas where the global/content ID are in the Kafka headers.

-      







  • No labels