People generally ask for the ability to throttle Samza jobs because they expect it to help them:
1. Prevent a high-volume stream from overwhelming or starving a low-volume stream. A typical pattern here is a realtime stream and a Hadoop-based stream feeding into the same job.
2. Prevent their Samza job from negatively impacting other processes that are co-located on the same host.
3. Prevent their Samza job from hammering a remote database or web service. Samza jobs can process messages much faster than a remote DB or service can handle RPC requests. When a Samza job makes an RPC call to a remote web service to join data onto each event, for instance, developers want a way to limit processing so the service isn't overwhelmed.
For request 2, Samza relies on YARN's cgroups support, which currently covers only memory and CPU. As cgroups and YARN mature, we expect to support other resources as well.
For request 3, Samza currently doesn't have a framework-level answer. Developers can implement throttling themselves by calling Thread.sleep in the StreamTask.process method. Still, it would be nice to make life easier for developers by providing a framework-level throttling mechanism.
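To make the do-it-yourself option concrete, here is a minimal sketch of a sleep-based limiter that a StreamTask could call at the top of process(). The class name SleepThrottle and its acquire() method are hypothetical, not part of Samza; the sketch assumes a single thread calls it and simply spaces calls at least 1/maxPerSec seconds apart.

```java
import java.util.concurrent.TimeUnit;

// Hypothetical helper, not a Samza API: spaces calls at least 1/maxPerSec seconds apart.
class SleepThrottle {
    private final long minIntervalNanos; // minimum spacing between two calls
    private long nextAllowedNanos;       // earliest time the next call may proceed

    SleepThrottle(double maxPerSec) {
        if (maxPerSec <= 0) {
            throw new IllegalArgumentException("maxPerSec must be positive");
        }
        this.minIntervalNanos = (long) (TimeUnit.SECONDS.toNanos(1) / maxPerSec);
        this.nextAllowedNanos = System.nanoTime();
    }

    // Intended to be called at the top of StreamTask.process(); blocks until the next slot is free.
    void acquire() throws InterruptedException {
        long now = System.nanoTime();
        long waitNanos = nextAllowedNanos - now;
        if (waitNanos > 0) {
            TimeUnit.NANOSECONDS.sleep(waitNanos);
        }
        // The next slot is one interval after whichever is later: now, or the slot we waited for.
        nextAllowedNanos = Math.max(now, nextAllowedNanos) + minIntervalNanos;
    }
}
```

One caveat with this approach: if the container's event loop is single-threaded, sleeping inside one task's process() also stalls every other task in that container, which is part of why a framework-level mechanism is attractive.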
The throttling methods proposed so far are:
1. Adding a TaskContext.maybeThrottle method. We could simply borrow Kafka's Throttler class.
2. Providing an out-of-the-box TaskLifecycleListener that implements throttling. Since the TaskLifecycleListener is per-SamzaContainer and receives beforeProcess and afterProcess alerts, it could simply Thread.sleep to slow things down.
3. Throttling based on arbitrary metrics. Samza provides a MetricsRegistry that supports Counters and Gauges. This MetricsRegistry exposes many framework-level metrics (msgs/sec, bytes/sec, process invocations, windows, commits, heap usage, etc.), and also allows developers to plug in arbitrary metrics from their StreamTask.
Instinctively, I like the third solution best, but I haven't given it much thought. The advantages I see are that it's very flexible, pretty intuitive, and requires no API change (the simpler the API, the better, IMO).
One confusing aspect of metrics-based throttling, though, is that each SamzaContainer actually has one MetricsRegistry per partition, plus one for the SamzaContainer itself. This lets a StreamTask create a metric (e.g. RpcCalls) while keeping per-partition granularity (i.e. partition 0's StreamTask has an RpcCalls count distinct from partition 1's). This is useful when you have per-partition issues (skew, for example), since you need per-partition metrics to detect which partition is having problems. It makes things a bit unintuitive for developers who want to throttle on a metric, though. Do you throttle when the aggregate sum of RpcCalls across all partitions in a SamzaContainer exceeds the limit, or when any individual partition's RpcCalls exceeds it? What about throttling on metrics that live in the SamzaContainer itself (e.g. JVM metrics)? Should we handle throttling for Gauges, and if so, how?
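The two policies can diverge sharply. Suppose partitions 0, 1 and 2 in one container made 1,200, 1,100 and 900 RpcCalls in the last second and the limit is 3,000/sec: the aggregate policy throttles (1,200 + 1,100 + 900 = 3,200 > 3,000), while the per-partition policy does not, because no single partition exceeds 3,000. The sketch below shows both checks; PartitionPolicies and its methods are hypothetical names, and per-interval counts are assumed to be already collected into a map keyed by partition id.

```java
import java.util.Map;

// Hypothetical illustration of two ways to apply one limit to per-partition counts.
class PartitionPolicies {
    // Throttle when the container-wide total across all partitions exceeds the limit.
    static boolean aggregateExceeds(Map<Integer, Long> countsByPartition, long limit) {
        long total = 0;
        for (long c : countsByPartition.values()) {
            total += c;
        }
        return total > limit;
    }

    // Throttle when any single partition exceeds the limit on its own.
    static boolean anyPartitionExceeds(Map<Integer, Long> countsByPartition, long limit) {
        for (long c : countsByPartition.values()) {
            if (c > limit) {
                return true;
            }
        }
        return false;
    }
}
```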
Straw man proposal
I think we should throttle on the aggregate count of the metric across all MetricsRegistries. We can add a throttle step to the SamzaContainer event loop that invokes Throttler.maybeThrottle. The Throttler class could do several things:
- Take a group name, metric name, and threshold as its parameters.
- Hold on to every metric that's registered under that group/metric name.
- When maybeThrottle is called, sum the count values of all metrics the Throttler is holding. Throttle if that sum is growing faster than the threshold, which is a per-second rate.
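A minimal sketch of the Throttler's bookkeeping follows. Because the proposed threshold is expressed per second, it compares how fast the summed count grew since the last check against the threshold, rather than the raw cumulative sum. Everything here is hypothetical: MetricSumThrottler, onRegister and overLimit are illustrative names, each metric is modeled as a LongSupplier standing in for a Samza counter, and time is passed in explicitly so the logic is easy to test. Sleeping is left out; the smoothing question is separate.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.LongSupplier;

// Hypothetical sketch of the straw-man Throttler; LongSupplier stands in for a Samza counter.
class MetricSumThrottler {
    private final String group;
    private final String metric;
    private final double maxPerSec;
    private final List<LongSupplier> held = new ArrayList<>();
    private long lastSum = 0;   // summed count at the previous check
    private long lastCheckMs;   // time of the previous check

    MetricSumThrottler(String group, String metric, double maxPerSec, long startMs) {
        this.group = group;
        this.metric = metric;
        this.maxPerSec = maxPerSec;
        this.lastCheckMs = startMs;
    }

    // Called for every metric registered in any registry (task- or container-level);
    // only metrics matching the configured group/name are held.
    void onRegister(String metricGroup, String metricName, LongSupplier count) {
        if (group.equals(metricGroup) && metric.equals(metricName)) {
            held.add(count);
            lastSum += count.getAsLong(); // don't count pre-existing values as new growth
        }
    }

    // Current sum of all held metrics.
    long currentSum() {
        long sum = 0;
        for (LongSupplier s : held) {
            sum += s.getAsLong();
        }
        return sum;
    }

    // True if the summed count grew faster than maxPerSec since the last check.
    boolean overLimit(long nowMs) {
        long elapsedMs = nowMs - lastCheckMs;
        if (elapsedMs <= 0) {
            return false;
        }
        long sum = currentSum();
        double ratePerSec = (sum - lastSum) * 1000.0 / elapsedMs;
        lastSum = sum;
        lastCheckMs = nowMs;
        return ratePerSec > maxPerSec;
    }
}
```

Since matching is done purely on group and metric name, the same code path picks up a task-level RpcCalls counter from every partition's registry as well as any container-level metric with that name.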
The nice thing about this approach is that it works for both container-level and task-level metrics, and it's intuitive for developers to use (I think). It also works for both counters and numeric gauges.
Developers can define throttling on metrics like this:
job.throttler.group=com.foo.MyTask
job.throttler.metric=RpcCalls
job.throttler.max.per.sec=3000
This example would throttle a single SamzaContainer's RPC call count to 3000/sec (I couldn't think of a good name for the max.per.sec config).
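For illustration, here is how a container might read those three proposed keys. ThrottlerConfig and fromJobConfig are hypothetical names, not an existing Samza config class; the sketch assumes job config is available as a Map of strings, treats a missing group key as "throttling disabled", and rejects a partially specified or non-positive configuration.

```java
import java.util.Map;
import java.util.Optional;

// Hypothetical parser for the proposed job.throttler.* keys; not part of Samza.
final class ThrottlerConfig {
    final String group;
    final String metric;
    final double maxPerSec;

    private ThrottlerConfig(String group, String metric, double maxPerSec) {
        this.group = group;
        this.metric = metric;
        this.maxPerSec = maxPerSec;
    }

    // Empty when throttling is not configured; throws if the config is incomplete or invalid.
    static Optional<ThrottlerConfig> fromJobConfig(Map<String, String> config) {
        String group = config.get("job.throttler.group");
        if (group == null) {
            return Optional.empty();
        }
        String metric = require(config, "job.throttler.metric");
        double maxPerSec = Double.parseDouble(require(config, "job.throttler.max.per.sec"));
        if (maxPerSec <= 0) {
            throw new IllegalArgumentException("job.throttler.max.per.sec must be positive");
        }
        return Optional.of(new ThrottlerConfig(group, metric, maxPerSec));
    }

    private static String require(Map<String, String> config, String key) {
        String value = config.get(key);
        if (value == null) {
            throw new IllegalArgumentException("missing config: " + key);
        }
        return value;
    }
}
```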
We need a good algorithm for smoothing out the throttling. Kafka's Throttler class would be a good place to look.
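One common smoothing scheme, in the spirit of Kafka's Throttler as we understand it (this is written from scratch, not copied from Kafka), is to accumulate observed work and, once per check interval, sleep just long enough that the average rate over that interval falls back to the target. SmoothingThrottler and its parameters are hypothetical; the clock and sleep function are injected so the arithmetic can be tested without real sleeping.

```java
import java.util.function.LongConsumer;
import java.util.function.LongSupplier;

// Hypothetical rate smoother: check once per interval, sleep off any excess.
class SmoothingThrottler {
    private final double desiredPerSec;
    private final long checkIntervalMs;
    private final LongSupplier clockMs;   // returns current time in ms
    private final LongConsumer sleeperMs; // sleeps for the given number of ms
    private long periodStartMs;
    private double observedSoFar = 0;

    SmoothingThrottler(double desiredPerSec, long checkIntervalMs,
                       LongSupplier clockMs, LongConsumer sleeperMs) {
        if (desiredPerSec <= 0 || checkIntervalMs <= 0) {
            throw new IllegalArgumentException("rate and interval must be positive");
        }
        this.desiredPerSec = desiredPerSec;
        this.checkIntervalMs = checkIntervalMs;
        this.clockMs = clockMs;
        this.sleeperMs = sleeperMs;
        this.periodStartMs = clockMs.getAsLong();
    }

    // Records `observed` units of work (e.g. new RpcCalls); returns the ms slept, if any.
    long maybeThrottle(double observed) {
        observedSoFar += observed;
        long elapsedMs = clockMs.getAsLong() - periodStartMs;
        if (elapsedMs < checkIntervalMs || observedSoFar <= 0) {
            return 0;
        }
        long slept = 0;
        double ratePerSec = observedSoFar * 1000.0 / elapsedMs;
        if (ratePerSec > desiredPerSec) {
            // How long this much work should have taken at the desired rate, minus time already spent.
            long targetMs = Math.round(observedSoFar * 1000.0 / desiredPerSec);
            slept = Math.max(0, targetMs - elapsedMs);
            if (slept > 0) {
                sleeperMs.accept(slept);
            }
        }
        periodStartMs = clockMs.getAsLong(); // start a new period after any sleep
        observedSoFar = 0;
        return slept;
    }
}
```

In a real container the clock could be System::currentTimeMillis and the sleeper a lambda wrapping Thread.sleep. For example, at 3000/sec with a 100 ms interval, observing 600 calls in 100 ms (6000/sec) leads to a 100 ms sleep, so those 600 calls end up spread over 200 ms.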