Status

Current state: Accepted

Discussion thread: Add in-memory system

JIRA: SAMZA-1395

Released: 

Problem

With the 0.13 release, the rich high-level API allows users to chain complex processing logic into one coherent and fluent application. With so much power comes a need for inherent support for ease of testing. Currently, users have to get their hands dirty and understand some implementation details of Samza in order to write exhaustive integration tests. We want to tackle this problem in steps, and this SEP takes us one step closer to that goal by introducing an in-memory system in Samza.

Terminology

  • IME - Incoming Message Envelope
  • EOS - End of Stream

Motivation

With an in-memory system, we will alleviate the following pain points.

  1. Dependency on Kafka for intermediate streams for testing

  2. Running time for tests (time spent on setting up and tearing down)

  3. Ease of testing

  4. Lack of collection-based input for testing (this SEP partly addresses this problem)

Assumptions

  1. The in-memory system is applicable only to jobs in the local execution environment; the remote execution environment isn’t supported.

  2. The scope of the in-memory system and the data it handles is limited to a single container, i.e. there is no support for process-to-process interaction or sharing.

  3. Checkpointing is not supported; consumers always start from the beginning on restart.

  4. The in-memory system doesn’t support persistence and is not the source of truth for the data. The data in the queue is lost when the job restarts or shuts down unexpectedly.

Design

Data Source & Sink

The input data source for the in-memory system can be broadly classified as bounded or unbounded. We are limiting the scope of this SEP to immutable, bounded data sources as input; this simplifies both the view of the data and the initialization step for the consumers. The in-memory system for intermediate streams, however, supports both bounded and unbounded data. The sink, a.k.a. the output, is modeled as mutable.

Data Type

Samza has a pluggable system design that allows users to implement their own system consumers and producers. Typically, consumers consume raw messages and wrap them in an IME. However, it is possible for some systems to introduce subclasses of IME and pass those to the tasks instead. For this reason, we need to support different data types within the in-memory collection.

  1. Raw messages: The in-memory system behaves like a typical consumer and wraps the raw message in an IME. The offset and key fields for the message are populated by the in-memory system: the offset is defined as the position of the data in the collection, and the key is the hash code of the raw message. If users need fine-grained control over these fields, they should construct their own IMEs. (A sketch covering both cases follows this list.)
  2. Subtypes of IME: The in-memory system acts as a pass-through system consumer, passing the actual message envelope to the task without any processing.
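
To make these two cases concrete, below is a minimal sketch of the wrapping logic. The helper class name is hypothetical; `IncomingMessageEnvelope` and its (ssp, offset, key, message) constructor are the existing Samza API.

import java.util.ArrayList;
import java.util.List;

import org.apache.samza.system.IncomingMessageEnvelope;
import org.apache.samza.system.SystemStreamPartition;

/**
 * Sketch: wrap raw messages in IMEs; pre-built IMEs pass through untouched.
 */
public class InMemoryConsumerHelper {
  public static List<IncomingMessageEnvelope> wrap(List<Object> messages, SystemStreamPartition ssp) {
    List<IncomingMessageEnvelope> envelopes = new ArrayList<>();
    for (int position = 0; position < messages.size(); position++) {
      Object message = messages.get(position);
      if (message instanceof IncomingMessageEnvelope) {
        // Case 2: pass the envelope (or its subclass) through untouched
        envelopes.add((IncomingMessageEnvelope) message);
      } else {
        // Case 1: offset = position in the collection, key = hash code of the message
        envelopes.add(new IncomingMessageEnvelope(ssp, String.valueOf(position), message.hashCode(), message));
      }
    }
    return envelopes;
  }
}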

Data Partitioning

Samza is a distributed stream-processing framework that achieves parallelism through partitioned data. With a bounded data source, we need to think about how the data is partitioned and how it maps to a SystemStreamPartition in Samza. Partitioning is only interesting when the input source is raw messages; with IMEs, the partition information is already part of the envelope, and the in-memory system will respect it.

Single Partition

We can take the trivial, simpler approach of associating the entire data source with a single partition. This is not a bad strategy: the primary use case for the in-memory system is testing, and the volume of data is small enough that the effects of parallelism are barely noticeable. However, it comes with the downside of constraining users to test their job with only one task, which might not be a desirable or exhaustive testing strategy from a user’s perspective.

Multiple Partitions

In order to exploit the parallelism that the Samza framework offers, and to enable users to test their job with multiple tasks, we need to support multiple partitions.

Partitioning at source

In this approach, we push the partitioning to the source. For example, we can read off a `Collection<Collection<T>>` and have each inner collection assigned to one partition. This is surprisingly simple yet powerful, since it eliminates the need for a repartitioning phase and allows users to group the data however they see fit. The downside of this approach is that the input collections can be skewed, and Samza does not control the evenness of the data distribution. Since the primary use case is testing, though, the skew should have negligible impact.
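
For illustration, assuming the `StreamDescriptor` API from the examples below (names such as PageView, pv1, pv2, and pv3 are placeholders), a two-partition input could be shaped like this:

// Each inner list becomes one partition of "page-views".
ImmutableList<PageView> partition0 = ImmutableList.of(pv1, pv2);
ImmutableList<PageView> partition1 = ImmutableList.of(pv3);

StreamDescriptor.Input<String, PageView> input = StreamDescriptor.<String, PageView>input("page-views")
       .from(ImmutableList.of(partition0, partition1));   // Collection<Collection<T>>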

End of Stream

The in-memory system will leverage the EOS feature introduced in SEP-6 to mark the end of stream for bounded sources.
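
As a sketch, once a bounded partition’s collection is drained, the consumer can enqueue the end-of-stream envelope. The helper class below is hypothetical; `buildEndOfStreamEnvelope` is the envelope builder that accompanied SEP-6.

import java.util.Queue;

import org.apache.samza.system.IncomingMessageEnvelope;
import org.apache.samza.system.SystemStreamPartition;

/**
 * Sketch: signal EOS for a fully consumed bounded partition (per SEP-6).
 */
public class EndOfStreamHelper {
  public static void signalEndOfStream(Queue<IncomingMessageEnvelope> queue, SystemStreamPartition ssp) {
    queue.add(IncomingMessageEnvelope.buildEndOfStreamEnvelope(ssp));
  }
}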

Proposed Changes

Architecture

(architecture diagram)
Implementation

  • Approach A - Use the existing BlockingEnvelopeMap and have one common class that takes on the responsibilities of both consumer and producer. The class handles producing and consuming messages off the same queue.
  • Approach B - Have a separate producer and consumer. Tie the consumer to the producer so that the producer has hooks to produce to the same underlying `BlockingEnvelopeMap` that the consumer uses.
  • Approach C - Have a separate consumer and producer. Introduce a custom queue that is shared between the consumer and producer; the queue lifecycle is managed by the SystemAdmin.

I am leaning towards Approach C, as it is simpler, is not tied to BlockingEnvelopeMap, and has a clean separation of concerns.
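
Below is a minimal sketch of Approach C with hypothetical class names: the queues live in one place, their lifecycle is driven through the SystemAdmin, and the producer and consumer interact only with the shared queue, never with each other.

import java.util.Map;
import java.util.Queue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;

import org.apache.samza.system.IncomingMessageEnvelope;
import org.apache.samza.system.SystemStreamPartition;

/**
 * Sketch: one queue per SystemStreamPartition, shared by producer and consumer.
 */
public class InMemoryManager {
  private final Map<SystemStreamPartition, Queue<IncomingMessageEnvelope>> queues = new ConcurrentHashMap<>();

  // Invoked through the SystemAdmin when a stream is created; owns the queue lifecycle.
  public Queue<IncomingMessageEnvelope> getOrCreateQueue(SystemStreamPartition ssp) {
    return queues.computeIfAbsent(ssp, k -> new ConcurrentLinkedQueue<>());
  }

  // Producer side: append an envelope to the partition's queue.
  public void put(SystemStreamPartition ssp, IncomingMessageEnvelope envelope) {
    getOrCreateQueue(ssp).add(envelope);
  }

  // Consumer side: poll the next envelope, or null if none is available.
  public IncomingMessageEnvelope poll(SystemStreamPartition ssp) {
    return getOrCreateQueue(ssp).poll();
  }
}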

Example Usages

High level stateless application

/**
 * Sample test case with multiple input partitions using a collection-based system.
 */
 
...
...
 
ImmutableList<IV> inputA = ...
ImmutableList<IV> inputB = ...
 
List<OV> outputData = ...   // mutable
 
StreamDescriptor.Input<IK,IV> input = StreamDescriptor.<IK,IV>input("test-stream")
       .from(ImmutableList.of(inputA, inputB));
 
StreamDescriptor.Output<OK,OV> output = StreamDescriptor.<OK,OV>output("output-test-stream")
       .from(outputData);
 
// application logic
StreamApplication app = StreamApplication.create(...);
app.from(input)
   .map(...)
   .sendTo(output);
 
app.run();
app.waitForFinish();
 
// assertions on outputData


Low level stateless application

/**
 * Sample test case using a collection-based system for a low-level application.
 */

...
...

ImmutableList<IV> inputA = ...

List<OV> outputData = ... // mutable

StreamDescriptor.Input<IK,IV> input = StreamDescriptor.<IK,IV>input("input-stream-low-level-app")
	.from(ImmutableList.of(inputA));
StreamDescriptor.Output<OK,OV> output = StreamDescriptor.<OK,OV>output("output-stream-low-level-app")
	.from(outputData);

// application logic
StreamTaskApplication app = StreamTaskApplication.create(config, new MyTaskFactory())
	.addInputs(Collections.singletonList(input))
	.addOutputs(Collections.singletonList(output));

app.run();
app.waitForFinish();

// assertions on outputData


Low level application with custom IME

/**
 * Sample test case using a collection-based system for a low-level application with a custom IME.
 * It demonstrates the use of IMEs as a data source as opposed to raw messages. Users are responsible for creating a
 * complete IME object with partition information.
 */
 
...
...
 
ImmutableList<MyIME> inputData = Utils.createMyIME(...);
 
List<OV> outputData = ... // mutable
 
StreamDescriptor.Input<Object, Object> input = StreamDescriptor.<Object, Object>input("input-stream-low-level-app")
	.from(inputData);
StreamDescriptor.Output<OK,OV> output = StreamDescriptor.<OK,OV>output("output-stream-low-level-app")
	.from(outputData);
 
// application logic
StreamTaskApplication app = StreamTaskApplication.create(config, new MyTaskFactory())
	.addInputs(Collections.singletonList(input))
	.addOutputs(Collections.singletonList(output));

app.run();
app.waitForFinish();

// assertions on outputData

Unsupported Use cases

For V1, we will not support the following use cases, since they carry dependencies that are out of scope for this SEP.

  • High level application with durable state
  • Low level application with durable state
  • Application with manual checkpointing. Note: manual checkpointing will result in a no-op and might not produce the desired behavior.

Samza SQL application

Users should be able to leverage the in-memory collection-based system to test Samza SQL applications, provided Samza SQL integrates with SEP-2.

 


3 Comments

  1. Thanks for pulling this together.
    1. can you please flesh out the Test Plan section as well?
    2. you mention a config for the memory pool... can you provide details? I am not sure it is required as part of the min-bar for this feature... Frankly, I think it will get complicated... I would suggest you skip it unless we explicitly hear from customers that this is needed.
    3. is this designed to work only for the fluent API? I assume it also works for StreamTask... can you please provide a sample for that as well?
    4. what happens when apps do manual checkpointing? Hopefully it doesn't throw an exception... Basically, just because we don't support checkpointing shouldn't mean that the in-memory system consumer is not applicable for such apps. (need a test case for this as well)
    5. since the goal is for this to work with 99% of apps, RocksDB-based apps clearly have to work, which would imply that the in-memory producer should work for the changelog. We need samples (and test cases for those).
    6. will broadcast streams work? I assume yes?
    7. It would be good to mention if there are any Samza applications that cannot be tested with this feature. What is the 1%?
    1. Done.
    2. I agree it might not be necessary for this to be configurable in a test setting. If we were to use the in-memory system outside a test setting, we would need to limit the queue size; we can go with the default for now.
    3. It does work for StreamTask as well. I have updated the test cases.
    4. Manual checkpointing will result in the desired behavior everywhere except the in-memory system, where it will just be a no-op.
    5. Updated the test plan. It is possible for the changelog to use the in-memory system, though it is managed by Samza on behalf of the users. In order to inspect the changelog, we would need a StoreDescriptor similar to the input and output StreamDescriptor.
    6. Yes it does.
    7. Will do
  2. How would users test code that uses triggers deterministically? Beam allows you to advance both the processing time and watermarks as a part of its test stream. Here's an example:

     

    TestStream<String> source = TestStream.create(StringUtf8Coder.of())
        .addElements(TimestampedValue.of("this", start))
        .addElements(TimestampedValue.of("that", start))
        .addElements(TimestampedValue.of("future", start.plus(Duration.standardMinutes(1))))
        .advanceProcessingTime(Duration.standardMinutes(3))
        .advanceWatermarkToInfinity();