Status

Current state: [ UNDER DISCUSSION ]

Discussion thread: <link to mailing list DISCUSS thread>

JIRA: SAMZA-1773

Released: Yes

Problem

With the growth of stateful stream processing applications, a class of applications (e.g. machine learning applications) often has large local state that is periodically updated from a secondary data source (e.g. via Hadoop push jobs), but is otherwise read-only. The rows (keys) in this dataset are independent of each other, and there is no requirement for the entire dataset to be updated atomically. This means that once the store is bootstrapped, it can be updated in change capture mode in the background while processing continues. This dataset is typically used for scoring a real-time stream of data. In this SEP, we propose a feature to make data ingestion easy and efficient for such datasets in Samza.

Motivation

  1. ML Model Push Efficiency - Typically, model data is pushed from secondary data sources like Hadoop, potentially repartitioned/transformed by a Samza job, and finally consumed as an input and stored in a local store with a changelog for fast recovery. The real-time input is then co-partitioned and joined with this feature data.
  2. Hot Standby - We rely on host affinity to reduce bootstrap times for jobs with large state. However, host affinity is a YARN-specific feature; supporting hot standbys helps reduce bootstrap times regardless of the deployment model. Side inputs can be leveraged to restore the changelog in the background.


Proposed Changes

APIs

We will introduce the following APIs with no default implementations.

SideInputsProcessor
import java.io.Serializable;
import java.util.Collection;

import org.apache.samza.annotation.InterfaceStability;
import org.apache.samza.storage.kv.Entry;
import org.apache.samza.storage.kv.KeyValueStore;
import org.apache.samza.system.IncomingMessageEnvelope;

/**
 * The processing logic for store side inputs. Accepts incoming messages from side input streams
 * and the current store contents, and returns the new key-value entries to be written to the store.
 */
@FunctionalInterface
@InterfaceStability.Unstable
public interface SideInputsProcessor extends Serializable {

  /**
   * Process the incoming side input message for the {@code store}.
   *
   * @param message incoming message envelope
   * @param store the store associated with the incoming message envelope
   * @return a {@link Collection} of {@link Entry}s that will be written to the {@code store}.
   */
  Collection<Entry<?, ?>> process(IncomingMessageEnvelope message, KeyValueStore store);
}
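For illustration, here is a minimal sketch of a processor that writes each incoming side input message to the store as-is. The class name and the pass-through logic are hypothetical, not part of the proposal; real processors may read the current store contents to merge, or transform the message before writing.

import java.util.Collection;
import java.util.Collections;

import org.apache.samza.storage.kv.Entry;
import org.apache.samza.storage.kv.KeyValueStore;
import org.apache.samza.system.IncomingMessageEnvelope;

// Hypothetical example: treats each incoming message as a ready-to-store key-value pair.
public class PassthroughSideInputsProcessor implements SideInputsProcessor {
  @Override
  public Collection<Entry<?, ?>> process(IncomingMessageEnvelope message, KeyValueStore store) {
    // The store argument can be used to read current contents, e.g. for merging; it is unused here.
    return Collections.<Entry<?, ?>>singletonList(
        new Entry<>(message.getKey(), message.getMessage()));
  }
}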
SideInputsProcessorFactory
import java.io.Serializable;

import org.apache.samza.annotation.InterfaceStability;
import org.apache.samza.config.Config;
import org.apache.samza.metrics.MetricsRegistry;

/**
 * A factory to build {@link SideInputsProcessor}s.
 *
 * Implementations should return a new instance for every invocation of
 * {@link #getSideInputsProcessor(Config, MetricsRegistry)}.
 */
@FunctionalInterface
@InterfaceStability.Unstable
public interface SideInputsProcessorFactory extends Serializable {

  /**
   * Creates a new instance of a {@link SideInputsProcessor}.
   *
   * @param config the configuration
   * @param metricsRegistry the metrics registry
   * @return an instance of {@link SideInputsProcessor}
   */
  SideInputsProcessor getSideInputsProcessor(Config config, MetricsRegistry metricsRegistry);
}
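A minimal sketch of a factory that wires up the hypothetical processor above, returning a new instance per invocation as the contract requires:

import org.apache.samza.config.Config;
import org.apache.samza.metrics.MetricsRegistry;

// Hypothetical factory: returns a new processor instance on every invocation,
// as required by the contract described above.
public class PassthroughSideInputsProcessorFactory implements SideInputsProcessorFactory {
  @Override
  public SideInputsProcessor getSideInputsProcessor(Config config, MetricsRegistry metricsRegistry) {
    return new PassthroughSideInputsProcessor();
  }
}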

Configurations

stores.store-name.side.inputs

A comma-separated list of streams, where each stream is in the format system-name.stream-name. Additionally, applications should add the side inputs to the job inputs (task.inputs) and configure a side input processor via stores.store-name.side.inputs.processor.factory.

stores.store-name.side.inputs.processor.factory

The value is the fully-qualified name of a Java class that implements SideInputsProcessorFactory. It is a required configuration for stores with side inputs (stores.store-name.side.inputs).
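Putting the two configurations together, a hypothetical store profile-store fed by a Kafka stream profile-updates would be configured as follows (all store, stream, and class names are illustrative):

stores.profile-store.side.inputs=kafka.profile-updates
stores.profile-store.side.inputs.processor.factory=com.example.PassthroughSideInputsProcessorFactory
# Side inputs must also be added to the job inputs.
task.inputs=kafka.page-views,kafka.profile-updates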

Implementation

Changelogs, Bootstrap Streams and Side Inputs

Since the new feature has similarities with both changelogs and bootstrap streams, here is how it differs from the two:

  1. Samza will be able to bootstrap a local store from an (ideally compacted) stream. Bootstrap refers to populating the store from the local checkpoint (see 3 below) up to the latest message in the topic before regular processing starts.
  2. Once the bootstrap is complete and regular processing has started, Samza should be able to keep updating the store in the background as new data comes into the topic. These updates don't need to be versioned or atomic swaps, and should not block regular processing.
  3. Stores will restore from the side input topics when containers restart or move to a different host. Since we need to support host affinity, the side input offsets will be checkpointed to local disk (and not to the checkpoint topic) when store contents are flushed.
  4. Support a more general processing model for these side input topics:
    1. Stores can be configured with more than one side input topic.
    2. Applications can leverage the SideInputsProcessor to plug in custom processing logic before the messages are written to the store. The SideInputsProcessor implementation is plugged in via the factory class described above.
  5. We will introduce APIs for Tables to specify the side inputs and the SideInputsProcessor (see the sketch after this list). We will also implement the configuration generation for the inputs and processor specified using a TableDescriptor.
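To make item 5 concrete, here is a rough sketch of what specifying side inputs through a table descriptor might look like. The descriptor methods (withSideInputs, withSideInputsProcessor) and the Profile value type are illustrative assumptions, not a finalized API:

import java.util.Arrays;
import java.util.Collections;

import org.apache.samza.storage.kv.Entry;

// Illustrative sketch only: the side input methods on the descriptor are assumed
// names for the proposed TableDescriptor integration, not a finalized API.
RocksDbTableDescriptor<String, Profile> profileTable =
    new RocksDbTableDescriptor<String, Profile>("profile-table")
        .withSideInputs(Arrays.asList("kafka.profile-updates"))
        .withSideInputsProcessor((message, store) ->
            Collections.<Entry<?, ?>>singletonList(
                new Entry<>(message.getKey(), message.getMessage())));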


Option A: Implement using run loop

  • We can leverage the run loop since it offers flexibility and extensibility for supporting bootstrap, broadcast and priority semantics for side input streams.
  • It simplifies the change capture scenario since we don't have to do additional work once bootstrap is completed.

Option B: Implement using SystemConsumers

  • It can be leveraged by standby containers and store restoration logic, consolidating all of store restoration in one place.
  • It requires handling change capture in the background.
  • Additional effort is needed to support stream semantics like broadcast, bootstrap, and priority.


For the first iteration, we will go down the run loop path. However, we can evaluate the SystemConsumers option when we implement standby containers.

Test Plan

  1. Unit tests for newly introduced components 
    1. TaskSideInputStorageManager
    2. TableProviders
    3. StorageConfig
  2. Integration tests for high level application using table provider
  3. Integration tests for low level application

