Status
Current state: Accepted
Discussion thread: async-high-level-api
JIRA: SAMZA-2055
Released:
Problem
Traditionally, Samza has offered async IO through the low-level processAsync API. In 0.13, Samza introduced the high level API, which provides an easier and more powerful interface to express application logic. With more users adopting the new programming model, it is imperative to have feature parity between the two offerings (low-level and high-level). In this SEP, we introduce async APIs and concurrency support for high-level applications.
Motivation
Streaming applications are increasingly complex and need to interact with external systems to perform their computations. These interactions are often expensive and time consuming, and are best done asynchronously for efficiency. Async APIs and concurrency support allow high-level applications to process multiple inputs to their application DAG in parallel, make non-blocking remote calls, and achieve higher throughput.
Current
We currently offer an async process API in the low-level API. Window and timer calls are single threaded and blocking. Since low-level tasks are disjoint in nature and are chained externally through systems like Kafka, synchronous windows and timers do not truly demand asynchronous behavior for the majority of use cases.
High Level Applications
Before diving into the async design for the high level API, it is useful to understand high level applications and some of their internals. A typical high level Samza application is composed of multiple operations chained together; it is analogous to chaining multiple low-level Samza jobs. For example, consider an application that joins two streams:
- A low level application consists of two jobs: one to repartition the input streams using the same partition key, and another to perform the actual join logic.
- A high level application, on the other hand, consists of only one job, which does both the repartitioning of the input streams and the join.
For high level applications, the application logic is represented using a DAG internally. The operators in the DAG can be classified into terminal and non-terminal operators.
- Terminal Operator - A terminal operator is one that typically doesn’t cascade the messages to its sub DAG directly and acts as a sink for the incoming message. E.g. sendTo, partitionBy
- Non-Terminal Operator - A non-terminal operator is one that cascades messages directly to its sub DAG. E.g. filter, map, flatmap, merge
Note: Window and join operators are exceptions, since they act as terminal (storing messages to RocksDB) and non-terminal (early trigger for windows, successful join) for some messages.
A simple example illustrating terminal and non-terminal operators in a sample user application.
Here is a complex example with an interim terminal operator in the application DAG.
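To make the distinction concrete, here is a small, self-contained toy model of an operator DAG. This is NOT the Samza API; it only illustrates how non-terminal operators cascade messages to their sub DAG while a terminal operator acts as a sink:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Predicate;

public class TerminalVsNonTerminal {

  /** Toy model of a high level operator DAG; NOT the Samza API. */
  static class ToyStream<M> {
    private final List<Consumer<M>> downstream = new ArrayList<>();

    /** Non-terminal: transforms the message and cascades it to its sub DAG. */
    <OM> ToyStream<OM> map(Function<M, OM> fn) {
      ToyStream<OM> next = new ToyStream<>();
      downstream.add(m -> next.deliver(fn.apply(m)));
      return next;
    }

    /** Non-terminal: cascades the message only when the predicate passes. */
    ToyStream<M> filter(Predicate<M> pred) {
      ToyStream<M> next = new ToyStream<>();
      downstream.add(m -> { if (pred.test(m)) next.deliver(m); });
      return next;
    }

    /** Terminal: acts as a sink; the message cascades no further. */
    void sendTo(Consumer<M> sink) {
      downstream.add(sink);
    }

    void deliver(M m) {
      downstream.forEach(op -> op.accept(m));
    }
  }

  public static void main(String[] args) {
    List<Integer> sink = new ArrayList<>();
    ToyStream<Integer> input = new ToyStream<>();
    // filter and map are non-terminal and cascade; sendTo is terminal and sinks.
    input.filter(n -> n % 2 == 0).map(n -> n * 10).sendTo(sink::add);
    input.deliver(1); // dropped by the filter, never reaches the sink
    input.deliver(2); // passes the filter, mapped to 20, sunk
    System.out.println(sink); // [20]
  }
}
```

A message is "done" with the DAG exactly when every cascaded copy of it lands on such a sink, which is the property the checkpointing discussion below relies on.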
Message Delivery
There are two ways for messages to get funneled into the application DAG.
- Messages from input sources (process API)
- Messages generated by windows and timer operators (window, timer API)
The framework supports concurrency within a task for the first delivery mechanism. The second mechanism, however, is single threaded and blocking. It is important to be clear about this, since it dictates the async characteristics of the DAG.
Consider the scenario where a window fires in the complex example above. The messages generated by the window operator are propagated downstream, and the window call completes only when all of the generated messages reach a terminal operator.
Samza fundamentally guarantees that no process calls happen during a window or timer call. This exclusivity is important for thread safety and to maintain happens-before semantics. However, it impacts the async characteristics of the pipeline: operators that follow a window or timer, whether async or not, block the run loop until the window/timer call completes. This can potentially starve other parts of the DAG (e.g. a preceding window) if the sub DAG downstream of a window/timer performs expensive computations.
Proposed Changes
API
We introduce only one new API for async, which takes an async flat map UDF.
- Pros:
- Reduce API explosion
- Cons:
- Forces users to use the same operator to represent any async function
- Inconsistent with the current set of synchronous operator APIs
We will introduce an async version of the flat map UDF: AsyncFlatMapFunction.
Public Interfaces
```java
public interface MessageStream<M> {
  ...

  /**
   * Applies the provided async function to each message in this MessageStream
   * and returns the transformed MessageStream.
   */
  <OM> MessageStream<OM> flatMapAsync(AsyncFlatMapFunction<? super M, ? extends OM> flatMapFn);
}

@FunctionalInterface
public interface AsyncFlatMapFunction<M, OM> extends InitableFunction, ClosableFunction, Serializable {
  /**
   * Asynchronously transforms the supplied message into a collection of
   * output messages.
   */
  CompletionStage<Collection<OM>> apply(M message);
}
```
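A UDF against this interface might look like the following sketch. The AsyncFlatMapFunction interface is restated here as a minimal stand-in so the snippet is self-contained, and the remote service call is simulated; only the shape of the signature is taken from the proposal:

```java
import java.util.Collection;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;

public class AsyncFlatMapExample {

  /** Minimal stand-in for Samza's AsyncFlatMapFunction, for illustration. */
  @FunctionalInterface
  interface AsyncFlatMapFunction<M, OM> {
    CompletionStage<Collection<OM>> apply(M message);
  }

  /** Simulates a non-blocking call to an external service. */
  static CompletionStage<String> lookupProfile(String userId) {
    return CompletableFuture.supplyAsync(() -> "profile-of-" + userId);
  }

  public static void main(String[] args) {
    // The UDF returns immediately; the framework considers the message
    // processed when the returned CompletionStage completes.
    AsyncFlatMapFunction<String, String> enrich =
        userId -> lookupProfile(userId).<Collection<String>>thenApply(p -> List.of(p));

    Collection<String> out = enrich.apply("alice").toCompletableFuture().join();
    System.out.println(out); // [profile-of-alice]
  }
}
```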
Implementation and Test Plan
Callback vs Future
We have two widely adopted abstractions for asynchronous computation: callbacks and futures. We already have a detailed analysis of both approaches from our Table async APIs. You can find more information here.
We will use CompletionStage in our API for the following reasons:
- Composable
- Ease of use
- Better error handling
- Native java API and better support
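Composability in particular is what makes CompletionStage attractive: dependent async calls chain without nested callbacks, and errors flow through the chain. A minimal, Samza-independent illustration (the fetchCount helper is invented for the example):

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;

public class CompositionExample {

  /** Pretend async lookup: resolves to the length of the key. */
  static CompletionStage<Integer> fetchCount(String key) {
    return CompletableFuture.supplyAsync(key::length);
  }

  public static void main(String[] args) {
    // Compose two async calls, transform the result, and recover from
    // errors, all without nested callbacks.
    int result = fetchCount("hello")
        .thenCompose(n -> fetchCount("x".repeat(n))) // chain a dependent async call
        .thenApply(n -> n * 2)                       // pure transformation
        .exceptionally(t -> -1)                      // in-line error handling
        .toCompletableFuture()
        .join();
    System.out.println(result); // 10
  }
}
```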
Beam Integration
In order to support async use cases for applications that are written using Beam SDKs and use the Samza runner underneath, we need the following:
- Beam native async support - Beam currently doesn't have a native async API. The work to add an async API to the Beam SDK is outside the scope of this document and will be covered separately.
- Samza runner async support - Integrate the Beam native async API with the Samza async API through the Samza runner.
Beam currently uses an OpAdapter to run Samza operators for Beam. Essentially, we will need an async variant of the adapter, or extend the existing one, to return a future of the output collection to the Samza operator. The specifics of the integration will be covered in a separate document.
Stream-Table Joins
The existing table async APIs can be easily integrated with the high level async APIs without any user facing changes.
Window/Timer Operator
Although window/timer calls are single threaded and blocking, it is still useful to support concurrent propagation of their output messages. To accomplish this concurrency, we introduce a thread pool at the operator level. Note that the run loop still blocks until all of the asynchronous computations finish, and only then is the window/timer call marked complete. Alternatively, we could explore adding a truly asynchronous window/timer, which would make the pipeline fully async at the cost of violating our fundamental guarantee of exclusivity between process and window/timer.
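The thread-pool approach can be sketched independently of Samza: fan the window's output messages out to a pool, then block until every message has reached a terminal operator before marking the window call complete. The pool size and downstream processing below are illustrative, not the runner's actual values:

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class WindowFanOut {
  // Records which window outputs have reached the downstream sub DAG.
  static final ConcurrentLinkedQueue<String> reached = new ConcurrentLinkedQueue<>();

  /** Stand-in for the window's downstream sub DAG. */
  static void processDownstream(String msg) {
    reached.add(msg);
  }

  public static void main(String[] args) {
    ExecutorService operatorPool = Executors.newFixedThreadPool(4); // per-operator pool
    List<String> windowOutput = List.of("a", "b", "c");

    // Propagate each output message downstream concurrently...
    CompletableFuture<?>[] inFlight = windowOutput.stream()
        .map(msg -> CompletableFuture.runAsync(() -> processDownstream(msg), operatorPool))
        .toArray(CompletableFuture[]::new);

    // ...but still block the run loop until all of them finish; only then
    // is the window/timer call marked complete.
    CompletableFuture.allOf(inFlight).join();
    System.out.println("window call complete, delivered " + reached.size());
    operatorPool.shutdown();
  }
}
```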
Checkpointing
For our discussion, we will limit ourselves to the details about the readiness of a message for checkpointing. The actual checkpointing design is out of scope and is agnostic to the type of application (low-level or high level).
Low Level applications
A message is considered ready for checkpointing once it has been processed successfully by the user logic. The user logic can be either synchronous or asynchronous. For asynchronous logic, the message is considered ready when the application invokes complete() on the TaskCallback provided by the framework. For synchronous logic, the message is considered ready upon completion of the user logic.
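The low-level contract can be sketched as follows. TaskCallback is restated here as a minimal stand-in (in Samza the framework supplies the callback to processAsync): the message becomes checkpointable only once complete() fires.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;

public class CallbackReadiness {

  /** Minimal stand-in for Samza's TaskCallback, for illustration. */
  interface TaskCallback {
    void complete();
    void failure(Throwable t);
  }

  /** Runs fake async user logic and reports whether the message became ready. */
  static boolean processOneMessage() {
    AtomicBoolean readyForCheckpoint = new AtomicBoolean(false);
    TaskCallback callback = new TaskCallback() {
      public void complete() { readyForCheckpoint.set(true); }
      public void failure(Throwable t) { /* would fail the task */ }
    };

    // processAsync-style logic: returns immediately; the message is ready
    // for checkpointing only once complete() is invoked.
    CompletableFuture.runAsync(() -> {
      // ... non-blocking remote call would happen here ...
    }).thenRun(callback::complete).join();

    return readyForCheckpoint.get();
  }

  public static void main(String[] args) {
    System.out.println(processOneMessage()); // true
  }
}
```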
High Level applications
A message is considered ready for checkpointing when it flows through the (sub)DAG and lands on a terminal operator. Note that the terminal operator may not be the termination stage of the application. This distinction is important because we currently checkpoint messages even if they have not been completely processed by the pipeline. To guarantee that a message has been completely processed by the pipeline, we would need to ensure that state, triggers and timers are persisted. However, that is out of scope for this design; as far as checkpointing is concerned, we provide the same guarantee as the existing high level checkpointing semantics.
Error Handling
We can broadly categorize errors into two types.
- Exceptions during processing - These can be propagated to the framework using the task callback provided by Samza. For messages that originate from timers and windows, since those calls are blocking, we can fail them in case of a downstream exception in the DAG.
- Timeouts during processing - Timeouts are much more common, especially in the async model. Again, we will leverage the task callback provided by Samza in the process API to propagate timeouts. For messages that originate from timers and windows, we will reuse the task.callback.timeout.ms configuration and propagate a timeout exception to the framework if the downstream operators take longer than the task callback timeout.
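For the async UDFs themselves, the same timeout behavior can be expressed directly on the returned future. A sketch using CompletableFuture.orTimeout (Java 9+), where the illustrative timeout value stands in for task.callback.timeout.ms:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class TimeoutExample {

  /** Classifies the outcome of an async call that is guarded by a timeout. */
  static String guardedOutcome(CompletableFuture<String> call, long timeoutMs) {
    return call
        .orTimeout(timeoutMs, TimeUnit.MILLISECONDS) // analogous to task.callback.timeout.ms
        .exceptionally(t -> t instanceof TimeoutException ? "timed out" : "failed")
        .join();
  }

  public static void main(String[] args) {
    // A future that never completes, standing in for a hung remote call.
    CompletableFuture<String> hungCall = new CompletableFuture<>();
    System.out.println(guardedOutcome(hungCall, 100)); // timed out
  }
}
```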
Test Plan
Rejected Alternatives
Option B: Overload existing APIs (map, filter, flatMap and sink) to take async UDFs (AsyncFilterFunction, AsyncMapFunction, AsyncFlatMapFunction and AsyncSinkFunction)
- Pros:
- Reduce API explosion
- Eliminates redundancy since the UDFs already carry the signature of whether the operation is async or not
- Cons:
- Breaks backward compatibility. Our UDFs are functional interfaces that allow users to express their logic as lambdas. With overloads, the compiler cannot determine whether a provided lambda is async or not, so users would need to explicitly cast their lambdas. More information can be found in this discussion
Option C: Introduce a new set of APIs (mapAsync, filterAsync, flatMapAsync and sinkAsync) that take async UDFs.
- Pros:
- Backward compatible
- Explicit and unambiguous
- Simpler and cleaner
- Cons:
- API explosion