Status

Current state: ACCEPTED

Discussion thread

JIRA: SAMZA-1804

Motivation

Samza jobs using the High Level API currently specify their inputs and outputs partly in code (i.e., the streamIds) and partly in configuration (e.g., system name, physical name and everything else). The complexity of configuring these inputs and outputs has been a long-standing issue for Samza users. We want to allow users to describe their system, inputs, outputs and their properties entirely in code. To do this, we propose new System and Stream "descriptors" APIs. Now that StreamApplication/TaskApplication APIs for Samza are available (SEP-13), these may be used for describing inputs and outputs in the Low Level API as well.

We want to support the following use cases:

  1. Allow users to specify their system and stream properties in code instead of configuration.
  2. Let System implementers set reasonable defaults for their systems and streams (e.g., enforce Avro as the message serde).
  3. Support advanced use cases like delivering incoming message metadata to the application, and logical to physical stream expansion.

Requirements

User Experience

The general user workflow should be:

  1. Create a new SystemDescriptor instance and optionally configure it.
  2. Obtain a new InputDescriptor<I> and OutputDescriptor<O> using the SystemDescriptor above and optionally configure them.
  3. Call StreamApplicationDescriptor#getInputStream(InputDescriptor<I>) to get the MessageStream<I> and StreamApplicationDescriptor#getOutputStream(OutputDescriptor<O>) to get the OutputStream<O>.
  4. Apply transforms on the input MessageStream and produce the results to OutputStream.

As an example:

Usage Example
KafkaSystemDescriptor<KV<String, GenericRecord>> ksd = new KafkaSystemDescriptor<>("local") // system name 
    .withConsumerZkConnect("localhost:80")
    .withProducerBootstrapServers(Collections.singletonList("localhost:81"))
    .withSamzaFetchThreshold(10000)
    .withProducerConfigs(ImmutableMap.of(...));

KafkaInputDescriptor<KV<String, GenericRecord>> isd = ksd.getInputDescriptor("input") // stream-id 
    .withPhysicalName("my-input-stream") // physical name 
    .withBootstrap(true); 
KafkaOutputDescriptor<KV<String, GenericRecord>> osd = ksd.getOutputDescriptor("output"); // stream-id 

MessageStream<KV<String, GenericRecord>> is = graph.getInputStream(isd); 
OutputStream<KV<String, GenericRecord>> os = graph.getOutputStream(osd); 

is.map(...).filter(...).sendTo(os);


Users will not be required to describe everything in code; they will have the option to do so. They may describe as much of their system/stream properties in code as they like, and the rest may be in configuration. System/stream properties from user-provided descriptors will be converted to their corresponding configurations and merged with the user-provided configuration.


Basic requirements

The basic requirements for the new APIs are:

  1. Users should be able to set any Samza properties for their input and output streams using these descriptors (e.g., bootstrap status, input priority, offset management, serdes etc.).
  2. System implementers should be able to provide custom implementations of these descriptors that support additional system-specific properties. For example, a KafkaSystemDescriptor may allow setting Kafka producer and consumer properties. 
    Users should choose the extended descriptor for their system if one is available. Otherwise, they may use the Samza-provided GenericSystemDescriptor/GenericInputDescriptor/GenericOutputDescriptor implementations and provide system-specific properties as the appropriate key-value configurations (see the sketch after this list).
  3. Users should be able to use a fluent API for configuring these descriptors.
  4. The types of MessageStream/OutputStream created using these descriptors should match their contents. E.g., for simple descriptors, the type of MessageStream/OutputStream should be determined by the system or stream level Serdes, whether provided by the system as a default or configured by the user explicitly.
  5. Users should be able to provide a default SystemDescriptor which should be used to configure the properties of the system used for creating intermediate streams in the application. This is equivalent to setting and configuring job.default.system and its properties.
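
As a sketch of requirement 2, the generic descriptors might be used roughly as follows. The system factory class, stream ids, and extra config keys are illustrative assumptions, and the fluent method names (e.g., withSystemConfigs/withStreamConfigs) are placeholders showing intent rather than a committed API.

Generic Descriptor Usage Sketch
GenericSystemDescriptor mySystem = new GenericSystemDescriptor("my-system", "com.example.MySystemFactory") // system name, SystemFactory class
    .withSystemConfigs(ImmutableMap.of("custom.system.property", "some-value")); // system-specific key-value configs (illustrative)

GenericInputDescriptor<String> rawInput = mySystem.getInputDescriptor("my-input", new StringSerde())
    .withStreamConfigs(ImmutableMap.of("custom.stream.property", "some-value")); // stream-specific key-value configs (illustrative)

GenericOutputDescriptor<KV<String, Integer>> countsOutput =
    mySystem.getOutputDescriptor("my-output", KVSerde.of(new StringSerde(), new IntegerSerde()));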

Advanced requirements

We will allow System implementers to extend these Descriptor implementations so that they can provide the following additional features to users:

System Specific Input/OutputDescriptor Getters

System implementers should be able to provide convenience methods to get Input/OutputDescriptors for their users.

For example, for simple cases where system configuration is either unnecessary or provided by default, users should be able to obtain an input descriptor using just the stream and system names:

Usage Example At LinkedIn
KafkaInputDescriptor<KV<String, GenericRecord>> pvd = KafkaInputDescriptor.from("PageViewEvent", "tracking"); // properties for the "tracking" system are provided in configuration
KafkaOutputDescriptor<KV<String, PageViewCount>> pvcd = KafkaOutputDescriptor.from("PageViewCount", "tracking"); // properties for the "tracking" system are provided in configuration

MessageStream<KV<String, GenericRecord>> pvs = graph.getInputStream(pvd); 
OutputStream<KV<String, PageViewCount>> pvcs = graph.getOutputStream(pvcd); 

pvs.window(...).map(...).sendTo(pvcs);


Similarly, System implementers should be able to dictate a specific serialization format for messages in their streams. For example, if a system enforces Avro as the message format, users should be able to obtain an Input/OutputDescriptor without providing an explicit serde, and the input and output descriptors should have a KV<String, GenericRecord> message type by default.

Also, implementers of systems that support only input or only output, but not both, should be able to disallow creating the unsupported type of descriptor.
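
For example, a descriptor for a hypothetical consume-only, Avro-only system might look roughly like the sketch below. All class names, the base-class constructor, and the serde used are illustrative assumptions to show the intent; they are not proposed Samza classes.

Hypothetical Consume-Only Avro System Descriptor
public class TrackingSystemDescriptor extends SystemDescriptor<KV<String, GenericRecord>, TrackingSystemDescriptor> {

  public TrackingSystemDescriptor(String systemName) {
    // the system-level default serde fixes the message type to KV<String, GenericRecord>,
    // so users never need to provide an explicit serde for this system's streams
    super(systemName, KVSerde.of(new StringSerde(), new HypotheticalAvroSerde())); // constructor and serde are illustrative
  }

  // only input descriptors are exposed; there is deliberately no getOutputDescriptor,
  // since this hypothetical system does not support producing
  public TrackingInputDescriptor getInputDescriptor(String streamId) {
    return new TrackingInputDescriptor(streamId, this);
  }
}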

Input Transformers

InputTransformer functions allow System implementers to apply custom transformations to messages in their input streams before they're delivered to the High Level API StreamApplication. The primary use case of this feature is to allow systems to add additional metadata to each incoming message by extending IncomingMessageEnvelope. In this use case, the transformer is provided by the System implementers as a system-level default, and is not visible to the user.

The interface is:

InputTransformer
/**
 * Transforms an {@link IncomingMessageEnvelope} with deserialized key and message to a message of type {@code OM}
 * which is delivered to the {@code MessageStream}.
 * <p>
 * May be provided by default by a {@code TransformingSystemDescriptor}, or set on a stream level on an
 * {@code InputStream}.
 *
 * @param <OM> type of the transformed message
 */
public interface InputTransformer<OM> extends InitableFunction, ClosableFunction, Serializable {

 /**
  * Transforms the provided {@link IncomingMessageEnvelope} with deserialized key and message into another message
  * which is delivered to the {@code MessageStream}.
  *
  * @param ime the {@link IncomingMessageEnvelope} to be transformed
  * @return the transformed message
  */
  OM apply(IncomingMessageEnvelope ime);

}


The InputTransformer function is applied at runtime in InputOperatorImpl on each IncomingMessageEnvelope after the key and value have been deserialized. For an InputDescriptor with an InputTransformer, the type of messages in the resulting MessageStream will always be the result type of the InputTransformer function. I.e., providing stream level serdes should not change the type of the MessageStream generated from the descriptor, since the InputTransformer is applied after deserialization and overrides the Serde type.
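
As an illustration, a system-level transformer that enriches each envelope with delivery metadata might look like the sketch below. DecoratedIncomingMessageEnvelope and its constructor are hypothetical, not existing Samza classes.

Hypothetical Metadata-Enriching InputTransformer
public class MetadataInputTransformer implements InputTransformer<DecoratedIncomingMessageEnvelope> {

  @Override
  public DecoratedIncomingMessageEnvelope apply(IncomingMessageEnvelope ime) {
    // invoked by InputOperatorImpl after the key and message have been deserialized;
    // the returned message is what the user's MessageStream operators see
    return new DecoratedIncomingMessageEnvelope(ime, System.currentTimeMillis() /* e.g. delivery timestamp */);
  }
}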

We will not support user-provided stream level transformers in the initial API, but we'll keep the option of doing so available in the future.

Stream Expanders

StreamExpander functions allow System implementers to expand a single user-provided input descriptor into multiple input descriptors, apply custom MessageStream transforms on the resulting expanded input streams, and then return a new MessageStream to the High Level API StreamApplication for further processing. The primary use case of this feature is to allow system implementers to resolve a "logical" user-provided input stream into multiple underlying "physical" input stream sources at execution planning time, while still presenting a unified logical view to the user for further processing. The interface is:

StreamExpander
/**
 * Expands the provided {@link InputDescriptor} to a sub-DAG of one or more operators on the {@link StreamApplicationDescriptor},
 * and returns a new {@link MessageStream} with the combined results. Called when {@link StreamApplicationDescriptor#getInputStream} 
 * is being used to get a {@link MessageStream} using an {@link InputDescriptor} from an 
 * {@link org.apache.samza.operators.descriptors.base.system.ExpandingSystemDescriptor}
 * <p>
 * This is provided by default by {@link org.apache.samza.operators.descriptors.base.system.ExpandingSystemDescriptor}
 * implementations and can not be overridden or set on a per stream level.
 *
 * @param <OM> type of the messages in the resultant {@link MessageStream}
 */
public interface StreamExpander<OM> extends Serializable {

 /**
  * Expands the provided {@link InputDescriptor} to a sub-DAG of one or more operators on the {@link StreamApplicationDescriptor},
  * and returns a new {@link MessageStream} with the combined results. Called when the {@link InputDescriptor}
  * is being used to get a {@link MessageStream} using {@link StreamApplicationDescriptor#getInputStream}.
  * <p>
  * Note: Take care to avoid infinite recursion in the implementation; e.g., by ensuring that it doesn't call
  * {@link StreamApplicationDescriptor#getInputStream} with an {@link InputDescriptor} from an 
  * {@link org.apache.samza.operators.descriptors.base.system.ExpandingSystemDescriptor} (like this one) again.
  *
  * @param streamAppDescriptor the {@link StreamApplicationDescriptor} to register the expanded sub-DAG of operators on
  * @param inputDescriptor the {@link InputDescriptor} to be expanded
  * @return the {@link MessageStream} containing the combined results of the sub-DAG of operators
  */
  MessageStream<OM> apply(StreamApplicationDescriptor streamAppDescriptor, InputDescriptor inputDescriptor);

}


These functions are executed once in the ApplicationRunner when creating/describing the StreamApplication.

We do not plan to support user provided stream level StreamExpanders.

Similar to a SystemDescriptor with an InputTransformer, the type of the messages in a MessageStream (but not an OutputStream) obtained from such InputDescriptors will always be the result type of the StreamExpander. The same parameterization rules apply.
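
For illustration, an expander that resolves one logical input into two regional physical inputs and merges them might look like the sketch below. The per-region descriptors are assumed to be created by the owning ExpandingSystemDescriptor and passed in; per the recursion note above, they must not themselves come from an expanding system descriptor.

Hypothetical Regional StreamExpander
public class RegionalStreamExpander implements StreamExpander<KV<String, GenericRecord>> {
  // per-region physical input descriptors, resolved by the owning system descriptor (illustrative)
  private final InputDescriptor<KV<String, GenericRecord>, ?> eastInput;
  private final InputDescriptor<KV<String, GenericRecord>, ?> westInput;

  public RegionalStreamExpander(InputDescriptor<KV<String, GenericRecord>, ?> eastInput,
      InputDescriptor<KV<String, GenericRecord>, ?> westInput) {
    this.eastInput = eastInput;
    this.westInput = westInput;
  }

  @Override
  public MessageStream<KV<String, GenericRecord>> apply(
      StreamApplicationDescriptor streamAppDescriptor, InputDescriptor inputDescriptor) {
    // register both physical inputs on the application descriptor and present them as one merged stream
    MessageStream<KV<String, GenericRecord>> east = streamAppDescriptor.getInputStream(eastInput);
    MessageStream<KV<String, GenericRecord>> west = streamAppDescriptor.getInputStream(westInput);
    return east.merge(Collections.singletonList(west));
  }
}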

Proposed Implementation

Base Classes and Type Bounds

The requirements above impose some constraints on the number of new API classes and their type parameters:

  1. A Fluent API for describing inputs and outputs: This requires each super-class method to return an instance of the concrete sub-class it was called on for further chaining. This means that each super class needs to carry the type of its extending class in its type parameters (a minimal sketch of this pattern follows after this list). E.g.:

    Descriptor Sub-type
    public abstract class SystemDescriptor<SystemMessageType, SubClass extends SystemDescriptor<SystemMessageType, SubClass>> 
    public final class GenericSystemDescriptor<SystemMessageType> extends SystemDescriptor<SystemMessageType, GenericSystemDescriptor<SystemMessageType>>
  2. Support both System and Stream level serdes: If system implementers want to set a default serde for a SystemDescriptor, any Input/OutputDescriptors created from it should inherit the default serde type. They may or may not allow users to provide stream level serdes that override the system level serde. If users provide a stream level serde, the Input/OutputDescriptors should have the same type as the stream level serde. The simplest way to do this is to require stream-level serdes to be provided while creating the Input/OutputDescriptors. 

    BaseSystemDescriptor API
    class SystemDescriptor<SystemMessageType> {
        public InputDescriptor<A> getInputDescriptor(String id) // case 1 
        public InputDescriptor<B> getInputDescriptor(String id, Serde<StreamMessageType> s) // case 2
    } 
  3. Stream level serde should not affect InputDescriptor type when used with InputTransformer. This is in conflict with (or an exception to) the requirement above. For an InputDescriptor with an InputTransformer, the type of messages in the resulting MessageStream will always be the result type of the InputTransformer function. I.e., providing system/stream level serdes should not affect the type of the descriptor, since the InputTransformer is applied after deserialization and overrides the Serde type. A user-provided stream-level InputTransformer overrides the system-provided default InputTransformer. 

    class SystemDescriptor<SystemMessageType> {
        ... // 1 and 2 as above, plus:
        public InputDescriptor<C> getInputDescriptor(String id, InputTransformer<StreamTransformerType> t) // case 3 
        public InputDescriptor<D> getInputDescriptor(String id, InputTransformer<StreamTransformerType> t, Serde<StreamMessageType> s) // case 4
    }

    While we will consider the use case of stream-level InputTransformer functions in the API design here, we will not add them to the API in the initial version. This will simplify the SystemDescriptor so that users have 1-2 ways of getting an input descriptor from a system descriptor instead of 4.


  4. System or stream level serde should not affect InputDescriptor type when used with StreamExpander. The type of such InputDescriptors will always be the result type of the StreamExpander function.
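
Returning to constraint 1, a minimal sketch of the recursive "self type" pattern it requires is shown below; the withSystemConfigs method and its backing field are illustrative.

Fluent Self-Type Sketch
public abstract class SystemDescriptor<SystemMessageType, SubClass extends SystemDescriptor<SystemMessageType, SubClass>> {
  private final Map<String, String> systemConfigs = new HashMap<>();

  @SuppressWarnings("unchecked")
  public SubClass withSystemConfigs(Map<String, String> configs) {
    systemConfigs.putAll(configs);
    // returning the concrete sub-type keeps sub-class-specific methods available for further chaining
    return (SubClass) this;
  }
}

public final class GenericSystemDescriptor<SystemMessageType>
    extends SystemDescriptor<SystemMessageType, GenericSystemDescriptor<SystemMessageType>> {
  // chained calls such as new GenericSystemDescriptor<...>().withSystemConfigs(...) now return
  // GenericSystemDescriptor<...> rather than the abstract base type
}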

The constraints above imply that the desired InputDescriptor types for the methods above are:

  1. For non-transforming System:
    A = SystemMessageType, B = StreamMessageType, C = StreamTransformerType, D = StreamTransformerType
  2. For System with default InputTransformer<SystemTransformerType>:
    A = SystemTransformerType, B = SystemTransformerType, C = StreamTransformerType, D = StreamTransformerType
  3. For System with StreamExpander<SystemExpanderType>:
    A = SystemExpanderType, B = SystemExpanderType, C = SystemExpanderType, D = SystemExpanderType
  4. For System with InputTransformer<SystemTransformerType> and StreamExpander<SystemExpanderType>:
    A = SystemExpanderType, B = SystemExpanderType, C = SystemExpanderType, D = SystemExpanderType

In other words, the type patterns above are:


                               | getISD(id)            | getISD(id, serde)     | getISD(id, txfn)      | getISD(id, txfn, serde)
Non Transforming System        | SystemMessageType     | StreamMessageType     | StreamTransformerType | StreamTransformerType
Transforming System            | SystemTransformerType | SystemTransformerType | StreamTransformerType | StreamTransformerType
Expanding System               | SystemExpanderType    | SystemExpanderType    | SystemExpanderType    | SystemExpanderType
Expanding Transforming System  | SystemExpanderType    | SystemExpanderType    | SystemExpanderType    | SystemExpanderType

Clearly, there is no single SystemDescriptor class that can satisfy the type constraints for all four cases. In fact, there are three different sets of interface method signatures above (Cases 1-3).


Note that if we add new features that affect the message type of InputDescriptors, we may need yet another set of getInputStream methods. In general, there will have to be 2^n ways of getting an InputDescriptor to support all combinations of features, where n is the number of features that affect the type of messages in the input stream. Currently n == 1 if we only support stream level serdes, or 2 if we support stream level transform functions as well. This might be a concern for future API evolution, but it's not as bad as it appears because:

  1. This is only an issue for features that affect the type of messages in the stream. For other features, their properties/functions can be set on the Input/OutputDescriptor after it has been created. E.g., "delete committed messages" is a new feature that can be used directly with the InputDescriptor since it doesn't affect the type of messages in the stream.
  2. The variations in the ways of getting an InputDescriptor apply to the SystemDescriptor base classes, not the InputDescriptor classes. Also, the new API methods will be in addition to the methods proposed here. This means that when we add these methods for the new feature:
    1. The feature may only be necessary for a specific system implementation, in which case we could introduce a new SystemDescriptor and only add those methods there. All existing code will be unchanged.
    2. If the feature does need to be supported by all existing systems, the new methods would be added to the existing abstract base classes, which could provide a default implementation. Any existing user code that doesn't use the feature will remain unchanged.

For these reasons, we consider this to be a reasonable tradeoff for providing a type-safe, fluent descriptor API to our users.

Proposed APIs

To satisfy the requirements above, we propose the following class hierarchy of System and Stream Descriptors.

Base Classes:

  • SystemDescriptor

  • StreamDescriptor
    • InputDescriptor
    • OutputDescriptor

To provide flexibility to system implementers to customize whether and how users get input and output descriptors (e.g. advanced requirement 1), we do not enforce a particular API for getInputDescriptor and getOutputDescriptor. System implementers are free to extend the base SystemDescriptor class and add any getInputDescriptor/getOutputDescriptor variants. We do provide the following interfaces as guidelines for System implementers based on the discussion above:

  • OutputDescriptorProvider:  Interface for simple SystemDescriptors that return OutputDescriptors parameterized by the type of the provided stream level serde.

  • SimpleInputDescriptorProvider: Interface for simple SystemDescriptors that return InputDescriptors parameterized by the type of the provided stream level serde. (Case 1 above)

  • TransformingInputDescriptorProvider: Interface for advanced SystemDescriptors that constrain the type of returned InputDescriptors to their own InputTransformer or StreamExpander function result types. Note that this interface is used for both transforming and expanding system descriptors for now, since in the absence of stream level transformer functions the method signature for getInputStream(id, serde) is the same in both cases. We may split this into two interfaces when we support stream level transformer functions.
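
As a rough sketch, the shapes of these provider interfaces might be as follows; the method signatures are illustrative of the type constraints discussed above rather than a finalized API.

Descriptor Provider Interface Sketch
public interface SimpleInputDescriptorProvider {
  // the returned descriptor is parameterized by the provided stream-level serde type (Case 1)
  <StreamMessageType> InputDescriptor<StreamMessageType, ?> getInputDescriptor(String streamId, Serde<StreamMessageType> serde);
}

public interface TransformingInputDescriptorProvider<SystemTransformerType> {
  // the returned descriptor is always parameterized by the system-level InputTransformer
  // (or StreamExpander) result type, regardless of the provided serde
  InputDescriptor<SystemTransformerType, ?> getInputDescriptor(String streamId, Serde serde);
}

public interface OutputDescriptorProvider {
  // the returned descriptor is parameterized by the provided stream-level serde type
  <StreamMessageType> OutputDescriptor<StreamMessageType, ?> getOutputDescriptor(String streamId, Serde<StreamMessageType> serde);
}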

We also provide the following generic implementations of the System, Input and Output Descriptors. If a system doesn't provide a more specific implementation, users can use these to configure any Samza properties and provide the system-specific properties as a Map<String, String> of configs.

Generic (System Agnostic) Implementations:

  • GenericSystemDescriptor
  • GenericInputDescriptor
  • GenericOutputDescriptor

The new StreamApplicationDescriptor interface with these classes is:

public interface StreamApplicationDescriptor {
  void setDefaultSystem(SystemDescriptor<?, ?> defaultSystemDescriptor);
  <M> MessageStream<M> getInputStream(InputDescriptor<M, ?> inputDescriptor);
  <M> OutputStream<M> getOutputStream(OutputDescriptor<M, ?> outputDescriptor);
  <K, V> Table<KV<K, V>> getTable(TableDescriptor<K, V, ?> tableDescriptor);
}


Default System Descriptor and Intermediate Streams

The "default system" for a job, currently specified by setting the "job.default.system" configuration and its properties, is used for creating intermediate streams (e.g. for partitionBy and broadcast), as well as some internal streams like coordinator, checkpoint and store changelog streams, unless their system has been explicitly set using job.coordinator.system, task.checkpoint.system, or job.changelog.system respectively.

We will add a new ApplicationDescriptor#setDefaultSystem(SystemDescriptor) API that can be used to set and configure the default system used by the job. Like everything else in the Descriptors API, users should be able to set default system properties in config instead of in code if they wish to.
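
For example, assuming appDescriptor is the job's StreamApplicationDescriptor and ksd is the KafkaSystemDescriptor from the earlier usage example:

Default System Sketch
// equivalent to configuring job.default.system and its properties; intermediate streams
// created by partitionBy/broadcast will be placed on this system
appDescriptor.setDefaultSystem(ksd);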

We will not provide a way to set intermediate stream properties in code using a StreamDescriptor since we don't expect this to be a common use case. If users need to manually configure them, they'll have to find the intermediate stream name and set its properties in configuration.

This default system will also not be used to create input and output streams. I.e., users will always have to associate an input/output stream explicitly with a system name, either by getting its descriptor from a system descriptor, or by providing the system name when creating the InputDescriptor using the InputDescriptor#from convenience factory methods.

Serdes For Streams

Every InputDescriptor and OutputDescriptor in the DAG must have a non-null Serde instance associated with it. This serde may be:

  1. Inherited from their SystemDescriptor, if they were created using a SystemDescriptor with a default serde, or
  2. Provided explicitly when creating the descriptor, either using SystemDescriptor#getInputDescriptor(String, Serde) etc., or using the InputDescriptor#from convenience factory methods.

This implies that, unlike other configurations, input/output stream serdes *must* be specified using descriptors, and cannot be specified in configuration. Users and SystemDescriptors may specify a KVSerde<NoOpSerde, NoOpSerde> if they don't want messages in the system/stream to be deserialized by Samza. This may be useful when the SystemConsumer/Producer handles the serialization itself.
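
For instance, assuming mySystem is a GenericSystemDescriptor and the stream id is illustrative, a pass-through serde could be provided roughly like this:

NoOp Serde Sketch
// Samza will not attempt to (de)serialize keys or messages for this stream;
// useful when the SystemConsumer/SystemProducer handles serialization itself
KVSerde<Object, Object> passThroughSerde = KVSerde.of(new NoOpSerde<>(), new NoOpSerde<>());
GenericInputDescriptor<KV<Object, Object>> rawInput = mySystem.getInputDescriptor("raw-events", passThroughSerde);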

Going forward, serdes will be required for intermediate streams.

Stream Configuration Generation

During StreamApplication#init(StreamApplicationDescriptor), users create System and Stream Descriptors, and get input/output streams from them using StreamApplicationDescriptor#getInputStream(InputDescriptor). The StreamApplicationDescriptor tracks all Input/OutputDescriptors and the default SystemDescriptor used by the application. When the planner runs, it generates configurations for the input and output streams in the following way:

Planning inputs: the user-provided configuration and the user-provided System and Stream Descriptors.

Step 1: Create an instance of the StreamApplicationDescriptor and the StreamApplication, and call StreamApplication#init(StreamApplicationDescriptor)

Step 2: Collect all the user provided System/StreamDescriptors from the StreamApplicationDescriptor.

Step 3: Merge the user provided configuration with the Descriptor generated configuration.

Step 4: Run the planning phase using this merged configuration. The planner internally creates StreamSpec instances based on this merged configuration, mutates these StreamSpecs to set additional inferred stream properties, creates/validates internal streams using them, and then merges StreamSpec generated configuration with previously merged configuration from Step 3. StreamSpec generated configuration has higher precedence.

This is the final configuration that gets sent to the executing containers.
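
In effect, the precedence from lowest to highest is: user-provided configuration, then descriptor-generated configuration, then StreamSpec-generated configuration. A simplified sketch of the merge order (the real planner code differs; the three map variables are illustrative):

Config Merge Order Sketch
Map<String, String> merged = new HashMap<>();
merged.putAll(userProvidedConfig);        // lowest precedence
merged.putAll(descriptorGeneratedConfig); // from System/Stream descriptors (Step 3)
merged.putAll(streamSpecGeneratedConfig); // inferred by the planner (Step 4), highest precedence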

Note: System and Stream descriptor generated configuration have higher precedence than user provided configuration. We will need to offer a mechanism to override these descriptor generated configs in cfg2. This is useful, for example, to support fabric/instance level overrides. This can currently be done by specifying them with a special jobs.job-name.* override scope. For example, the value for config "foo.bar" generated in step 4 can be overridden using the config "jobs.job-name.foo.bar".

Note: In the case where there are "expanded" streams, configs will be generated for the "expanded" streams only, not the initial logical stream.

Default Config Values and Descriptors

Some configurations have default values associated with them. E.g., the default value of "streams.stream-id.samza.bootstrap" is false. Similarly, users may get default values for some configurations from another source (e.g., app-def).

As described above, user provided descriptors are used to generate system and stream configurations during planning. Note that in Step 3 above any configurations generated using the descriptor override any configurations provided by the user.

For these reasons, if the user hasn't explicitly set a value for a property on the descriptor, the descriptor will not generate any configuration for that property, not even the default value. In the example above, if the user has not explicitly set the bootstrap status for a stream to true or false, the correct behavior is for InputDescriptor#toConfig() to not generate the "streams.stream-id.samza.bootstrap" configuration at all, not to set it to false.

By convention, all such fields are marked as Optional<T> to clarify that they may not have been set by the user.
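
A sketch of this convention for the bootstrap example above; the class and method names are illustrative, and only the config key is taken from the example.

Optional Field Config Generation Sketch
public abstract class InputDescriptorSketch<SubClass extends InputDescriptorSketch<SubClass>> {
  private final String streamId;
  private Optional<Boolean> isBootstrap = Optional.empty(); // unset unless the user calls withBootstrap

  protected InputDescriptorSketch(String streamId) {
    this.streamId = streamId;
  }

  @SuppressWarnings("unchecked")
  public SubClass withBootstrap(boolean bootstrap) {
    this.isBootstrap = Optional.of(bootstrap);
    return (SubClass) this;
  }

  public Map<String, String> toConfig() {
    Map<String, String> configs = new HashMap<>();
    // no entry is generated unless the user explicitly set the bootstrap status, so the
    // configuration-level default (false) still applies when this property is absent
    isBootstrap.ifPresent(b -> configs.put(String.format("streams.%s.samza.bootstrap", streamId), Boolean.toString(b)));
    return configs;
  }
}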

Compatibility, Deprecation, and Migration Plan

The changes in the StreamApplicationDescriptor interface are backwards incompatible. Specifically, instead of getting an input/output stream using a streamId, users will now have to get an input/output stream using SystemDescriptors and StreamDescriptors.

The rest of the code remains the same, and users can migrate to specifying more of their input/output properties using descriptors instead of configuration over time.


3 Comments

  1. StreamGraph
    I think this would be updated to StreamAppDescriptor after SEP-13?
  2. StreamGraph
    Just realized one issue: the broadcast stream is now only supported in MessageStream#broadcast() API as an intermediate stream. How do we expect to configure/setup the broadcast input stream? Is broadcast going to be a feature of InputDescriptor/OutputDescriptor? To support low-level API, I have added addBroadcastStream() as the TaskAppDescriptor's API. Should we do the same here? Or add the isBroadcast to the StreamDescriptor s.t. we still use getInputStream()/getOutputStream API here?
    1. Yi Pan, I created SAMZA-1841 to support declaring an input stream as a broadcast stream using its descriptor.