Status

Current state: Accepted

Discussion thread: async-high-level-api

JIRA: SAMZA-2055

Released: 

Problem

Traditionally, Samza has offered a solution for performing async IO using the processAsync API. In 0.13, Samza introduced the high-level API, which provides an easier and more powerful interface for expressing application logic. With more users adopting the new programming model, it is imperative to have feature parity between both offerings (low-level and high-level). In this SEP, we introduce async APIs and concurrency support for high-level applications.

Motivation

Streaming applications are increasingly complex and need to interact with external systems to perform expensive computations. These interactions are often time consuming and are best done asynchronously for efficiency. Further, async APIs and concurrency support allow high-level applications to process multiple inputs to their application DAG in parallel, make non-blocking remote calls, and achieve better performance.

Current

We currently offer an async process API in the low-level API. Windows and timers are single threaded and blocking. Since low-level tasks are disjoint in nature and are chained externally through systems like Kafka, the majority of use cases do not truly demand asynchronous windows and timers.

High Level Applications

Before diving into the async design for the high-level API, it is useful to understand high-level applications and some of their internals. A typical high-level Samza application is composed of multiple operations chained together, analogous to chaining multiple low-level Samza jobs. For example, consider an application that joins two streams:

  • A low-level application consists of two jobs: one to partition the input streams using the same partition key, and another to perform the actual join logic.
  • On the other hand, a high-level application consists of only one job, which does both the partitioning of the input streams and the join.


For high level applications, the application logic is represented using a DAG internally. The operators in the DAG can be classified into terminal and non-terminal operators.

  1. Terminal Operator - A terminal operator typically does not cascade messages to its sub-DAG directly and acts as a sink for the incoming message. E.g. sendTo, partitionBy
  2. Non-Terminal Operator - A non-terminal operator cascades messages directly to its sub-DAG. E.g. filter, map, flatMap, merge

Note: Window and join operators are exceptions since they act as terminal (storing messages to RocksDB) and non-terminal (early triggers for windows, successful joins) for some messages.

A simple example to illustrate terminal and non-terminal operators in a sample user application.


Here is a complex example that has an interim terminal operator in the application DAG.

Message Delivery

There are two ways for messages to get funneled into the application DAG.

  1. Messages from input sources (process API)
  2. Messages generated by windows and timer operators (window, timer API)


The framework supports concurrency within a task for the first delivery mechanism. The second mechanism, however, is single threaded and blocking. It is important to be clear about this since it dictates the async characteristics of the DAG.

Consider the scenario where a window gets triggered in the complex example above. The messages generated by the window operator are propagated downstream, and the window call completes only when all of the generated messages reach a terminal operator.

Samza fundamentally guarantees that no process calls happen during window or timer calls. This exclusivity is important for thread safety and to maintain happens-before semantics. However, it impacts the async characteristics of the pipeline. Operations that follow a window or timer, whether async or not, will block the run loop until the window/timer call completes. This can potentially starve out the sub-DAG (e.g. the preceding window) if the sub-DAG downstream of windows/timers performs expensive computations.

Proposed Changes

API

We introduce only one API for async, which takes an async flat map UDF.

  • Pros:
    • Reduce API explosion
  • Cons:
    • Forces users to use the same operator to represent any async function
    • Inconsistent with the current set of operators (map, filter, etc.)

We will introduce a new async version of the UDF: AsyncFlatMapFunction.

Public Interfaces

public interface MessageStream<M> {
  ...
  /**
   * Applies the provided 1:n function to transform a message in this {@link MessageStream}
   * to n messages in the transformed {@link MessageStream}
   *
   * @param asyncFlatMapFn the function to transform a message to zero or more messages
   * @param <OM> the type of messages in the transformed {@link MessageStream}
   * @return the transformed {@link MessageStream}
   */
  <OM> MessageStream<OM> asyncFlatMap(AsyncFlatMapFunction<? super M, ? extends OM> asyncFlatMapFn);
  ...
}


/**
 * Transforms an input message into a collection of 0 or more messages, possibly of a different type.
 *
 * @param <M> type of the input message
 * @param <OM> type of the transformed messages
 */
@InterfaceStability.Unstable
public interface AsyncFlatMapFunction<M, OM> extends InitableFunction, ClosableTask, Serializable {

  /**
   * Transforms the provided message into a collection of 0 or more messages.
   *
   * @param message the input message to be transformed
   * @return a collection of 0 or more transformed messages
   */
  CompletableFuture<Collection<OM>> apply(M message);
}
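For illustration, here is a minimal sketch of what the body of such an async UDF could look like. The `fetchProfile` lookup is hypothetical and stands in for any blocking remote call; a real implementation would implement `AsyncFlatMapFunction` rather than a static method, but the future-composition pattern is the same.

```java
import java.util.Collection;
import java.util.Collections;
import java.util.concurrent.CompletableFuture;

public class EnrichmentExample {
  // Hypothetical remote lookup; stands in for any blocking I/O call.
  static String fetchProfile(String memberId) {
    return "profile-for-" + memberId;
  }

  // Sketch of an AsyncFlatMapFunction<String, String> body: transform one
  // input message into a future of zero or more output messages without
  // blocking the caller.
  static CompletableFuture<Collection<String>> apply(String memberId) {
    return CompletableFuture.supplyAsync(() -> fetchProfile(memberId))
        .<Collection<String>>thenApply(Collections::singleton);
  }

  public static void main(String[] args) {
    // The framework would chain further stages on this future; here we
    // simply join for demonstration.
    System.out.println(apply("m1").join()); // prints [profile-for-m1]
  }
}
```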


Implementation and Test Plan

Callback vs Future

We have two widely adopted abstractions for asynchronous computation: callbacks and futures. We already have a detailed analysis of both of these approaches from our table async APIs. You can find more information here.

We will use CompletionStage in our API for the following reasons:

  • Composable
  • Ease of use
  • Better error handling
  • Native Java API with better support
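These properties can be seen with plain JDK `CompletableFuture` (the standard implementation of `CompletionStage`); the chain below is a generic sketch, not Samza code:

```java
import java.util.concurrent.CompletableFuture;

public class CompositionExample {
  public static void main(String[] args) {
    // Composable: chain dependent async steps without nested callbacks,
    // with error handling expressed in the same fluent chain.
    CompletableFuture<Integer> result = CompletableFuture
        .supplyAsync(() -> "42")        // e.g. an async remote fetch
        .thenApply(Integer::parseInt)   // transform the response
        .exceptionally(t -> -1);        // fallback on failure

    System.out.println(result.join()); // prints 42
  }
}
```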

Beam Integration

In order to support async use cases for applications that are written using Beam SDKs and use the Samza runner underneath, we need the following.

  1. Beam native async support - Currently, Beam doesn’t have a native async API. The work to add an async API to the Beam SDK is outside the scope of this document and will be covered separately.
  2. Samza runner async support - Integrate the Beam native async API with the Samza async API through the Samza runner.

Beam currently uses an OpAdapter to run Samza operators for Beam. Essentially, we will need either an async variant of this adapter, or to modify the existing adapter to return a future of the output collection to the Samza operator. The specifics of the integration will be covered in a separate document.

Stream-Table Joins

The existing table async APIs can be easily integrated with the high level async APIs without any user facing changes.

Window/Timer Operator

Although window/timer is single threaded and blocking, it is still useful to support concurrent propagation of the output messages of these operators. To accomplish this, we need to introduce a thread pool at the operator level. Note that this would still block the run loop until all of the asynchronous computations finish, and only then mark the window/timer complete. Alternatively, we could explore adding an asynchronous window/timer, which would make the pipeline truly async, although at the cost of violating our fundamental guarantee of exclusivity between process and window/timer.

Checkpointing

For this discussion, we will limit ourselves to when a message is considered ready for checkpointing. The actual checkpointing design is out of scope and is agnostic to the type of application (low-level or high-level).

Low Level applications

A message is considered ready for checkpointing once it has been processed by the user logic successfully. The user logic can be categorized as either synchronous or asynchronous. In the case of asynchronous logic, the message is considered ready when the application invokes the onComplete method on the callback provided by the framework. For synchronous logic, the message is considered ready upon completion of the user logic.

High Level applications

A message is considered ready for checkpointing when it flows through the (sub-)DAG and lands on a terminal operator. Note that the terminal operator may not be the termination stage of the application. This distinction is important because we currently checkpoint messages even if they have not been completely processed by the pipeline. To guarantee that a message has been completely processed by the pipeline, we would need to ensure that state, triggers, and timers are persisted. However, this is out of scope for this design; as far as checkpointing is concerned, we provide the same guarantees as the existing high-level checkpointing semantics.
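Conceptually (this is an illustration of the readiness condition, not actual Samza internals), each terminal operator a message reaches along its async sub-DAG yields a future, and the message becomes ready for checkpointing only when all of those futures complete. `CompletableFuture.allOf` expresses exactly this:

```java
import java.util.concurrent.CompletableFuture;

public class ReadinessExample {
  public static void main(String[] args) {
    // One future per terminal operator reached by the message
    // (e.g. a sendTo and a partitionBy); the bodies are placeholders.
    CompletableFuture<Void> sendToDone = CompletableFuture.runAsync(() -> { /* sendTo */ });
    CompletableFuture<Void> partitionByDone = CompletableFuture.runAsync(() -> { /* partitionBy */ });

    // The message is ready for checkpointing only when all complete.
    CompletableFuture<Void> ready = CompletableFuture.allOf(sendToDone, partitionByDone);
    ready.join();
    System.out.println("message ready for checkpointing");
  }
}
```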

Error Handling

We can broadly categorize errors into two types.

  1. Exceptions during processing - These can be propagated to the framework using the task callback provided by Samza. For messages that originate from timers and windows, since those calls are blocking, we can fail them in case of a downstream exception in the DAG.
  2. Timeouts during processing - Timeouts, on the other hand, are much more common, especially in the async model. Again, we will leverage the task callback provided by Samza in the process API to propagate timeouts. For messages that originate from timers and windows, we will reuse the task.callback.timeout.ms configuration and propagate a timeout exception to the framework if the downstream operators take longer than the task callback timeout.
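As a sketch of how a timeout could be bounded on an async stage, JDK 9+ `CompletableFuture.orTimeout` completes a pending future exceptionally with a `TimeoutException` after a deadline; the framework could surface such an exception through the task callback. The 50 ms bound below is an arbitrary stand-in for task.callback.timeout.ms:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class TimeoutExample {
  public static void main(String[] args) {
    // A stage that never completes, standing in for a stuck remote call.
    CompletableFuture<String> stuck = new CompletableFuture<>();

    // Bound the stage, analogous to task.callback.timeout.ms (orTimeout is Java 9+).
    CompletableFuture<String> bounded = stuck.orTimeout(50, TimeUnit.MILLISECONDS);

    try {
      bounded.join();
    } catch (CompletionException e) {
      // The framework would fail the task callback with this TimeoutException.
      System.out.println(e.getCause() instanceof TimeoutException); // prints true
    }
  }
}
```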

Test Plan


Rejected Alternatives

Option B: Overload existing APIs (map, filter, flatMap and sink) to take async UDFs (AsyncFilterFunction, AsyncMapFunction, AsyncFlatMapFunction and AsyncSinkFunction)

  • Pros:
    • Reduce API explosion
    • Eliminates redundancy since the UDFs already carry the signature of whether the operation is async or not
  • Cons:
    • Breaks backward compatibility. Our UDFs are functional interfaces that allow users to write lambdas to express their logic. With overloads, we would not be able to differentiate whether a provided lambda is async or not; users would need to explicitly cast their lambdas. More information on this can be found in this discussion

Option C: Introduce a new set of APIs (mapAsync, filterAsync, flatMapAsync and sinkAsync) that take async UDFs.

  • Pros:
    • Backward compatible
    • Explicit and unambiguous
    • Simpler and cleaner
  • Cons:
    • API explosion


