Status
Discussion thread | https://lists.apache.org/thread/wtkl4yn847tdd0wrqm5xgv9wc0cb0kr8 |
Vote thread | |
JIRA | |
Release | 1.20 or beyond |
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
- Motivation
- Public Interfaces
- Proposed Changes
- Ensure all the ConfigOptions are properly annotated
- Ensure all user-facing configurations are included in the documentation generation process
- Make the existing ConfigOptions use the proper type
- Mark all internally used ConfigOptions with the @Internal annotation
- Compatibility, Deprecation, and Migration Plan
- Test Plan
Motivation
Currently, if users have Avro schemas in an Apicurio Registry (an open source Apache 2 licensed schema registry), then the natural way to work with those Avro events is to use the schemas in the Apicurio Repository. This FLIP proposes a new Kafka oriented Avro Apicurio format, to allow Flink users to work with Avro schemas stored in the Apicurio Registry.
Messages in the Apicurio format have a schema ID (usually the global ID in a Kafka header), which identifies the schema in the Apicurio Registry. The new format will:
- For inbound messages, use the ID to find the Avro schema
- For outbound messages, register a schema in Apicurio and include the ID in the message
In this way Apache Flink can be used to consume and produce Apicurio events.
Public Interfaces
The main changes associated with this Flip are:
- Core Flink to add new the Serialize and deserialize methods allowing additionalProperties to be passed from the Kafka connector
- The Kafka connector calls the new methods
- The Apicurio Avro format overrides the new methods to allow passing information between the Kafka connector and the format.
The following details the changes to the core Flink DeSer and the Avro SchemaCoder interfaces.
In core Flink add 2 new default methods to the DeserializeSchema interface
/** * Deserializes the byte message with additional input Properties. * * @param message The message, as a byte array. * @param additionalInputProperties map of additional input Properties that can be used * for deserialization. Override this method to make use of the additionalInputProperties, * @return The deserialized message as an object (null if the message cannot be deserialized). */ @PublicEvolving default T deserializeWithAdditionalProperties( byte[] message, Map<String, Object> additionalInputProperties) throws IOException { return deserialize(message); } @PublicEvolving default void deserializeWithAdditionalProperties( byte[] message, Map<String, Object> additionalInputProperties, Collector<T> out) throws IOException { T deserialize = deserializeWithAdditionalProperties(message, additionalInputProperties); if (deserialize != null) { out.collect(deserialize); } }
In core Flink add a new default method to the SerializeSchema interface
/** * Serializes the incoming element to a specified type and populates an output. * The additional input properties can be used by the serialization and the output * additionalProperties can be populated by the serialization. * * @param element The incoming element to be serialized * @param additionalInputProperties additional input properties map supplied to serialization * @param additionalOutputProperties additional output properties that can be populated by the * serialization * @return The serialized element. */ @PublicEvolving default byte[] serialize( T element, Map<String, Object> additionalInputProperties, Map<String, Object> additionalOutputProperties) { throw new RuntimeException( "Method serialize(T element, Map<String, Object> additionalInputProperties,\n" + "Map<String, Object> additionalOutputProperties) should be Overridden."); }
The new SchemaCoder interfaces are
default Schema readSchemaWithAdditionalParameters(InputStream in, Map<String, Object> additionalInputProperties) throws IOException { throw new RuntimeException("readSchema passing additional input properties should be overridden."); } default void writeSchema( Schema schema, OutputStream out, Map<String, Object> additionalInputProperties, Map<String, Object> additionalOutputProperties) throws IOException { throw new RuntimeException("writeSchema passing additional input and output properties should be overridden.");
Proposed Changes
In this section, we describe in detail all the configurations that need updating.
Apicurio avro Format Options
Option | Required | Forwarded | Default | Type | Sink only | Description |
Format | required | no | (none) | String | Specify what format to use, here should be 'avro-apicurio'. | |
properties | optional | yes | (none) | Map | This is the apicurio-registry client configuration properties. Any Flink properties take precedence, | |
apicurio.registry.request.ssl.truststore.location | optional | yes | (none) | String | Location / File of SSL truststore | |
apicurio.registry.request.ssl.truststore.type | optional | yes | (none) | String | Type of SSL truststore | |
apicurio.registry.request.ssl.truststore.password | optional | yes | (none) | String | Password of SSL truststore | |
apicurio.registry.request.ssl.keystore.location | optional | yes | (none) | String | Location / File of SSL keystore | |
apicurio.registry.request.ssl.keystore.type | optional | yes | (none) | String | Type of SSL keystore | |
apicurio.registry.request.ssl.keystore.password | optional | yes | (none) | String | Password of SSL keystore | |
apicurio-auth-basic-userid | optional | yes | (none) | String | Basic auth userId | |
apicurio-auth-basic-password | optional | yes | (none) | String | Basic auth password | |
apicurio-auth-oidc-url | optional | yes | (none)l | String | The auth URL to use for OIDC | |
apicurio-auth-oidc-clientID | optional | yes | (none) | String | Client ID to use for OIDC | |
apicurio.auth.oidc.clientSecret | optional | yes | (none) | String | Client secret to use for OIDC | |
apicurio-auth-oidc-scope | optional | yes | (none) | String | Client scope to use for OIDC | |
apicurio-auth-oidc-client-token-expiration-reduction | optional | yes | 1 | String | The token expiration to use for OIDC. This is a Duration in seconds. This is the amount of time before the token expires that Apicurio requests a new token. | |
apicurio-avro.use-headers | optional | yes | true | boolean | Configures to read/write the artifact identifier to Kafka message headers instead of in the message payload. | |
apicurio-avro.use-globalid | optional | yes | true | boolean | Used by serializers and deserializers. Configures the identifier for artifacts.
Instructs the serializer to write the specified ID to Kafka, and instructs the deserializer to use this ID to find the schema. | |
avro-apicurio.artifactId | optional | yes | <topic-name>-value or <topic-name>-key | String | Y | Specifies the artifactId of the artifact to be registered. If not specified, then for a key this is the topic name suffixed with “-key” and for a value it is the topic name suffixed with “-value”. A key and value can be specified for the same topic. |
avro-apicurio.artifactName | optional | yes | <topic-name>-value or <topic-name>-key
| String | Y | This specifies the name of the artifact to be registered. If not specified, then for a key this is the topic name suffixed with “-key” and for a value it is the topic name suffixed with “-value”.A key and value can be specified for the same topic.
|
avro-apicurio.artifactDescription | optional | yes | Schema registered by Apache Flink.
| String | Y | This specifies the description of the artifact to be registered. |
avro-apicurio.artifactVersion | optional | yes | “1” | String | Y | This specifies the version of the artifact to be registered. |
avro-apicurio.schema | optional | yes | (none) | String | The schema registered or to be registered in the Apicurio Registry. If no schema is provided Flink converts the table schema to avro schema. The schema provided must match the table schema. | |
avro-apicurio.groupId | optional | yes | (none) | String | Y | The group id to use when creating a schema. |
avro-apicurio.register-schema | optional | yes | True | Boolean | Y | When true the schema is registered, otherwise the schema is not registered. |
Compatibility, Deprecation, and Migration Plan
The changes made in this FLIP are backward compatible as the Kafka connector will reflectively check whether the new serialize and deserialiize methods are present before attempting to call them. If the new Deser methods are not present the existing ones will be called.
Test Plan
Existing UT/IT can ensure compatibility with old options. New tests will cover the new options.
Rejected Alternatives
- Running Apicurio in Confluent mode, so that the Confluent Avro format could be used. This would be great for some users , but this FLIP is provides function to facilitate working naturally with Apicurio schemas where the global/content ID are in the Kafka headers.
-