Pluggable MessageChooser


When a Samza job is reading messages from more than one system/stream/partition (SSP), a decision needs to be made about the order in which the messages from these SSPs are processed. For example, if a message is available for (stream: AdViewEvent, partition: 7), and so is a message for (stream: AdClickEvent, partition: 7), which message should the SamzaContainer process next?

It turns out, the order to process messages can be determined based on any number of strategies:

Let's look at how different strategies can enable different use cases.

Use cases

Streams with different latency SLAs

Some Samza jobs consume two streams: one stream is fed by a real-time system and the other stream is fed by a batch system. A typical pattern is to have a Samza processor with a statistical model that is ranking a real-time feed of data. Periodically, this model needs to be retrained and updated. The Samza processor can be re-deployed with the new model, but how do you re-process all of the old data that the processor has already seen? This can be accomplished by having a batch system send messages to the Samza processor for any data that needs to be re-processed.

A concrete example of this is a Samza processor that is determining which country a user is coming from when they sign up for a website. This processor receives "please classify user" messages, and emits "user belongs to country X" message.

Initially, the processor might simply use the user's IP address to determine which country they are located in. At some point in the future, the processor could be updated to use not only the IP address of the user, but also the user's language setting. This new strategy leads to much more accurate classification, and we now want to re-process all of the old users using the new model. How? A naive approach would be to re-process all of the old users in an offline system like Hadoop, and then push the computed data to the same store that the Samza processor is writing to. This is not desirable because it means that you have to duplicate your classification logic in two places. A better approach is to have the batch system simply re-send "please classify user" messages for all users that need to be re-classified.

The problem with the second approach is that it leads to a situation where the batch system can produce thousand or even millions of messages a second. We don't want to starve the real-time system, whose traffic is much higher priority, since the users have just signed up, and are probably actively browsing the website when the message is being processed. An optimal prioritization between these two streams would be to always process messages from the real-time system when they are available, and only process messages from the batch system when no real-time processing are available to process.

Bootstrapping state streams

Some Samza jobs wish to fully consume a stream from offset 0 all the way through to the last message in the stream before they process messages from any other stream. This is useful for streams that have some key-value data that a Samza job wishes to use when processing messages from another stream.

Consider a case where you want to read a currencyCode stream, which has mappings of country code (e.g. USD) to symbols (e.g. $), and is partitioned by country code. You might want to join these symbols to a stream called transactions which is also partitioned by currency, and has a schema like {"country": "USD", "amount": 1234}. You could then have your StreamTask join the currency symbol to each transaction message, and emit messages like {"amount": "$1234"}.

To bootstrap the currencyCode stream, you need to read it from offset 0 all the way to the last message in the stream (what I'm calling head). It is not desirable to read any message from the transactions stream until the currencyCode stream has been fully read, or else you might try to join a transaction message to a country code that hasn't yet been read.

Time-aligned streams

Some Samza jobs wish to keep their input streams as time-aligned as possible. This allows lets StreamTasks to have a smaller buffer when trying to joins or buffered sorts.

Imagine that a Samza job is reading from two streams: AdViews and AdClicks. It's joining AdViews and AdClicks to produce a click-through stream that defines the % of views that had a click. This Samza job would buffer all AdViews for some period of time (say 5 minutes). As AdClicks come in, the job would join the AdView with the AdClick, and emit a message like (ad-id: 1234, click: true). Any AdViews that are in the window for more than 5 minutes are assumed to have no click, and messages are sent with (ad-id: 2345, click: false).

The problem that this kind of job runs into is that it wants to keep the AdView events and AdClick events as closely aligned (based on the time the messages were sent) as possible, because it allows the job to shrink the buffering window, which frees up memory and disk space. If you can keep AdView and AdClick messages within 2 minutes of each other, you only need a 2 minute buffer. It doesn't make any sense to process and buffer an AdView event from two seconds ago if an AdClick from 1 minute ago can be processed and its AdView can be removed from the buffer (thereby freeing up space).


In Samza 0.7.0 we've introduced a class called MessageChooser. This is an object that is given up to one message at a time from each input stream/partition pair (when they're available). The chooser's job is to return the incoming message envelope that should be processed next.

In plain-spoken English, Samza says, "I have IncomingMessageEnvelopes from stream/partition X, and stream/partitionY, which should I process next?" It's the MessageChooser's job to answer this question.

The MessageChooser's interface currently looks like this:

interface MessageChooser {
   * Notify the chooser that a new envelope is available for a processing. A
   * MessageChooser will receive, at most, one outstanding envelope per
   * system/stream/partition combination. For example, if update is called for
   * partition 7 of kafka.mystream, then update will not be called with an
   * envelope from partition 7 of kafka.mystream until the previous envelope has
   * been returned via the choose method. Update will only be invoked after the
   * chooser has been started.
   * @param envelope
   *          An unprocessed envelope.
  void update(IncomingMessageEnvelope envelope);

   * The choose method is invoked when the SamzaContainer is ready to process a
   * new message. The chooser may elect to return any envelope that it's been
   * given via the update method, which hasn't yet been returned. Choose will
   * only be called after the chooser has been started.
   * @return The next envelope to process, or null if the chooser has no
   *         messages or doesn't want to process any at the moment.
  IncomingMessageEnvelope choose();

The manner in which MessageChooser is used is:

  1. SystemConsumers buffers messages from all SSPs as they become available.

  2. If MessageChooser has no messages for a given SSP, and SystemConsumers has a message in its buffer for the SSP, the MessageChooser will be updated once with the next message in the buffer.

  3. When SamzaContainer is ready to process another message, it calls SystemConsumers.choose, which in-turn calls MessageChooser.choose.

Since the MessageChooser only receives one message at a time per SSP, it can be used to order messages between different SSPs, but it can't be used to re-order messages within a single SSP (a buffered sort). This must be done within a StreamTask.

The contract between the MessageChooser and the SystemConsumers class is:


The MessageChooser is currently hard-wired to a default MessageChooser implementation, which uses a round-robin strategy to pick messages from all streams with outstanding messages. This implementation is insufficient.

Problems to think about:

  1. Do we accept that the use cases and strategies listed above are legitimate?
  2. Is the MessageChooser the appropriate way to support the use cases outlined above?

  3. Is the MessageChooser's interface correct to support the use cases outlined above?

  4. Can we implement a single default MessageChooser that solves all of the use cases listed above?

  5. Should the MessageChooser be pluggable?

The general feedback I've gotten is that question 4 is most important. I pretty much agree. The default implementation should:

  1. Allow preferring certain streams for the state bootstrap.
  2. Keep all streams of the same priority roughly in sync.

Such an implementation would allow us to support the QoS, bootstrapping, and time-alignment use cases.


First, some Axioms that I believe to be true:

  1. We can't support prioritization based on message content without the MessageChooser being pluggable.

  2. We can't support time-aligned prioritization without either a pluggable MessageChooser (that can introspect a message and extract the timestamp), a timestamp field in IncomingMessageEnvelope, or some equivalent way to get message stream-time.

  3. The MessageChooser must know if a stream is "caught up" or still being bootstrapped when making a decision about which message to process next.

  4. Once a stream is "caught up" (envelope.getOffset == offsetOfLastMessageInStream at least once), we can assume the stream has been fully "bootstrapped".

I believe this set of rules implies that we have to have a pluggable MessageChooser, unless we decide to add some concept of time to Samza.


To support bootstrapping, I propose that we change the SystemConsumers behavior slightly. Right now, SystemConsumers alternates between calling update and choose methods. We should change SystemConsumers to only call choose if the MessageChooser has been updated with at least one envelope from all streams that are not yet at head. For example, if a bootstrap stream with four partitions is not at head, then MessageChooser.choose will only be called if an envelope from at least on of the streams' four partitions has been given to the MessageChooser.

If we make this change, then a simple hard-coded priority chooser will allow us to bootstrap simply by prioritizing the bootstrap stream as higher priority than anything else (e.g. currencyCodes=1, transactions=0). If you do this, the chooser will always choose the bootstrap stream, and the SystemConsumers will guarantee that the chooser will always have a message from the bootstrap stream until it's at head.

The changes required to do this are:

  1. Add getLastOffset(String streamName, Partition partition): String (*) to SystemAdmin interface.

  2. Modify SamzaContainer to get SystemAdmin for input stream's system.

  3. Modify SamzaContainer to use SystemAdmins.getLastOffset to construct a Map[SystemStreamPartition, String], where value is last offset.

  4. Modify SystemConsumers to take a lastOffsets: Map[SystemStreamPartition, String] parameter in the constructor.

  5. Modify SystemConsumers to have a streamsYetToHitHead: Set[SystemStreamPartition] variable that is initialized to contain all input system stream partitions.

  6. Modify SystemConsumers so that chooser.choose is called only if streamsYetToHitHead.length == 0 or the chooser has been given a message from every stream that has yet to hit head.

  7. Modify SystemConsumers to remove chooser.choose.getSystemStreamPartition from streamsYetToHitHead whenever lastOffsets(envelope.getSystemStreamPartition).equals(envelope.getOffset)

  8. Modify SystemConsumers.register to remove SystemStreamPartition from streamsYetToHitHead if lastReadOffset.equals(lastOffsets(SystemStreamPartition))

  9. Add a getLastOffset(String streamName, Partition partition): String method to KafkaSystemAdmin

* Note, we might want to batch these offset requests to getLastOffset(Set[SystemStreamPartition] SystemStreamPartition): Map[SystemStreamPartition, String]


This is actually the easiest use case. Again, I think a simple priority chooser will take care of this. We don't even need to change the SystemConsumers behavior to support this use case. If we set the priority for real-time=1 and batch-stream=0, then real-time messages will always be processed before batch messages, provided that they're available. If we make the change to SystemConsumers outlined in the bootstrapping section, this will also guarantee that, when a processor is stopped and started, the real-time stream will be fully caught up to "head" before any batch messages are processed.

Time alignment

As mentioned in the axiom list above, I don't think we can support time-aligned streams without either adding a concept of time to Samza or allowing the MessageChooser to introspect into the message. I am abandoning supporting this feature in the default message chooser. Instead, I think we should just support pluggable MessageChoosers, and allow developers to implement their own time-aligned MessageChoosers.

A question worth asking is: can we make the default chooser support a poor-man's version of this? Maybe we can't align directly by stream-time, but other proxies are:


Offset is the only one we could support with no API changes. The problems with supporting offset-aligned streams are:

  1. Different stream systems might have different offset styles. How do you sort between two entirely different offset styles?
  2. Offsets are represented as strings. Kafka offsets are longs. Sorting strings that contains longs using lexicographical sort leads to "2" > "100".

Additionally, the behavior of a chooser that chooses messages by lowest offset is somewhat unintuitive. If you have a Kafka stream X with current offset 1 billion, and a new Kafka stream Y with offset 0, prioritizing by offset effectively means that you are always going to take messages from stream Y until its offset number catches up to stream X, which might never happen if stream X gets more messages/sec than stream Y.

Messages behind head

This approach would require changing the API to allow the MessageChooser to know how far behind a message was from head.

The behavior of this strategy is not desirable in cases where we have one very high throughput stream, and one very low throughput stream. In such a case, the low throughput stream might be 100 messages behind, which might be equivalent to two days in wall-clock time. The high throughput stream might be 1000 messages behind, which might be equivalent to 10 seconds behind, in wall-clock time. Using this strategy, the high throughput stream's message would be picked. This is counter intuitive, since we're trying to find a proxy for time alignment, and this approach is actually not aligning by time in this scenario.

Percent behind head

This approach would require changing the API to allow the MessageChooser to know (or derive) what percentage behind head a message is.

In cases where you have two input created at dramatically different points in time (for example a year ago, and a day ago), the percentage behind head is a misleading measurement. 50% behind a stream that was created a year ago means you have half a year's worth of messages to process. 90% behind a stream that was created a day ago means you have 21 hours of messages to process. In this scenario, this strategy would pick messages from the stream created yesterday, even though it's actually much closer to "now" in wall-clock time. This, again, is counter intuitive, since our goal is to find a proxy for time-alginment.

Maintain starting alignment

This approach would require changing the API to allow the MessageChooser to know how far a message was from head.

It appears that we can't come up with a good general-purpose proxy for time alignment. In the absence of such a strategy, the next best thing to do seems to be just to guarantee maintaining the alignment of the streams that existed before the Samza job started.

For example, take the case where there are two input streams at job-start time: one that's 100 messages behind, and the other that's 1000 messages behind. There are three possible states for the streams to be in with this example: both streams are behind their starting alignments (> 100 messages and > 1000 messages behind, respectively), one stream is behind and the other is ahead of its starting alignment (> 100 messages behind and < 1000 messages behind), or both streams are ahead of their starting alignment. The strategy for the MessageChooser then becomes:

  1. All behind: pick a message from the stream that is farthest behind its original alignment (in terms of number of messages).
  2. Partially behind: pick a message from the stream that is farthest behind its original alignment (in terms of number of messages).
  3. All ahead: pick a message from the stream that is closest to its original alignment (in terms of number of messages).

Choosing messages when all streams are ahead of their original alignments (3) is an interesting case. Taking our original example, if the 100-message-behind-stream is now 90 messages behind head, then it is 10 messages from its original alignment. If the 1000-messages-behind-stream is now 900 messages behind head, then it is 100 messages from its original alignment. The MessageChooser would pick the stream that is 10 messages ahead of its original alignment, because it is deemed to be "closest" to its original alignment.

I'm going to declare this strategy as out of scope for SAMZA-2, but SAMZA-67 is open to track it.


I think the default chooser should be a simple hard-coded priority chooser, which lets you say, "If you have messages from stream X, always read them before messages from stream Y." This can be implemented with a config:


The convention is task.chooser.priorities.<system>.<stream>=<priority>, where higher priority streams get processed between lower priority streams.

In cases where two streams have the same priority, we need to implement a strategy (either time-aligned, round robin, or some proxy for time-aligned). I haven't come up with an opinion on which strategy we should use yet.


I mentioned batching at the beginning of this document, but haven't mentioned it since. I think we can make the DefaultChooser batch simply by having an affinity to the last SSP it picked in cases where all envelopes have the same priority.

For example, if two streams, X and Y, are of the same priority, and each has an envelope available, the chooser would execute some tie breaking logic (time-aligned, round robin, etc) to choose the next envelope. The next time choose is called, if stream X and Y both have envelopes again, the chooser doesn't execute the same tie-breaking logic; it simply picks the envelope from the SSP that it picked last time. It can do this up to the batch size, at which point it can then re-execute the tie-breaking logic, and reset its batch counter.

The advantage of batching is that it will lead to smaller replay log messages (read X, read Y, read X, read Y vs. read 2 X, read 2 Y).

MessageChooser interface

I think the MessageChooser interface is fine as it is. Initially, I wanted to add register/start/stop methods to it.

The main motivation for start/stop is that it allows developers to setup a client that queries some outside service to make picking decisions. I'm not saying that this is advisable, but I know people will try and do it. Without stop, there's no way to shut down the client when the service stops. If we assume the service never stops, then this isn't a problem, but if there is a definite "end" to the processor (i.e. TaskCoordinator.shutdown), then the chooser needs a graceful shutdown.

The motivation for register is that there are situations where you want to initialize your data structures (or whatever) on startup before any messages are received. Letting the MessageChooser know which SystemStreamPartitions it's going to be receiving messages from just seems like a good idea.

I have since backed off on this idea since I can't come up with a good concrete example of why we need them, and all of the reference implementations we've written so far wouldn't need them.

Open questions

A (probably partial) list of open questions:

  1. How should we prioritize streams that are the same priority in the DefaultChooser?

  2. Should we add a concept of time to Samza?
  3. What is the behavior of aligning streams by messages behind high watermark, or percent behind high watermark?

Pluggable MessageChooser (last edited 2013-10-28 18:20:51 by ChrisRiccomini)