...

Task                                                          Status
Write documentation for new integrated systems.               TODO
Review checkpointing mechanism for new integrated systems.    TODO

Project Reports

Report Ending Week

...

June 22nd

Project description

Integration between Apache Samza and Amazon Kinesis

Review of Previous Actions

Discussion of the pros and cons of the current implementation, which is based on the Amazon Kinesis Client Library (KCL).

For this first phase of the project, two approaches were explored:

  • The first approach created a single KCL instance for each YARN container. Although this would use fewer resources and allow more containers on the same server, it suffers from the fact that ingested messages are mapped to specific tasks.
  • The second approach gave every task its own KCL instance. Each task then handles however many shard readers its KCL decides to run, based on the number of available shards and the number of workers registered through the KCL.

In both approaches, losing messages is possible in the cases explained below.

Objectives

We needed to review the current implementation, which aims for higher scalability and robustness through the use of the KCL, because there are specific cases in which it can lead to lost messages on the Samza side.

Pros

  • The main advantage of the approaches explained above is that we can rely on the KCL for load balancing, resharding management, error handling, and request retries.
  • The current implementation provides a KCL to each task. If any of the shards assigned to a specific container is deleted or merged, the tasks' KCLs rebalance their work by coordinating through Amazon DynamoDB and automatically know where to fetch the data from. All of this happens behind the scenes, and Samza doesn't have to be bothered with it.

Cons

  1. If a whole container goes down
    • The tasks assigned to the container also go down.
    • The KCLs of the surviving tasks coordinate among themselves and start consuming from the (temporarily) unconsumed shards. This behaviour is great in itself, but the problem is that the messages from those shards were never assigned to the container that is still alive.
    • This in turn means that the tasks in that container will not process those messages.
    • Even if the dead container comes back, its tasks' KCLs will start from a later point, because the messages in between have already been "read" by the tasks in the container that didn't go down. Those messages are skipped.
  2. If a task goes down
    • The shards assigned to it through its KCL are redirected to any other KCL worker. If we are lucky, another task (with its own KCL) inside the same container picks them up, and everything just works.
    • If the shards assigned to the dead task are redirected to a different container, the failure case is the same as when the whole container goes down.
  3. If resharding happens (adding new Kinesis shards or deleting existing ones)
    • The KCLs within the tasks take care of this, but again the messages might end up in containers where no tasks were assigned to them.
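
The container failure case can be made concrete with a toy model. This is only a sketch under stated assumptions, not Samza or KCL code: the names `rebalance`, `misrouted_shards`, `shard_to_task`, and the container/task/shard layout are hypothetical, and lease rebalancing is simplified to handing orphaned shards to the surviving workers in round-robin order.

```python
# Toy model: Samza's static shard-to-task mapping vs. KCL lease rebalancing.
# All names and the layout are hypothetical; this is not KCL or Samza API.

# Static assignment fixed at job start: shard -> task expected to process it.
shard_to_task = {"shard-0": "task-0", "shard-1": "task-1",
                 "shard-2": "task-2", "shard-3": "task-3"}
# Container each task runs in.
task_to_container = {"task-0": "container-A", "task-1": "container-A",
                     "task-2": "container-B", "task-3": "container-B"}


def rebalance(leases, live_workers):
    """Give every shard whose KCL worker died to a surviving worker (round-robin)."""
    live = sorted(live_workers)
    new_leases = {}
    orphan_index = 0
    for shard in sorted(leases):
        worker = leases[shard]
        if worker in live_workers:
            new_leases[shard] = worker
        else:
            new_leases[shard] = live[orphan_index % len(live)]
            orphan_index += 1
    return new_leases


def misrouted_shards(leases):
    """Shards whose records now arrive at a task they were never assigned to."""
    return sorted(s for s, worker in leases.items() if shard_to_task[s] != worker)


# Initially each task's KCL worker holds the lease for its own shard.
leases = dict(shard_to_task)

# container-B dies, taking task-2 and task-3 (and their KCL workers) with it.
live = {t for t, c in task_to_container.items() if c != "container-B"}
leases = rebalance(leases, live)

print(misrouted_shards(leases))  # ['shard-2', 'shard-3']
```

In this model the KCL side stays healthy (every shard has a live reader), yet two shards are now read by tasks that the static mapping never associated with them, which is the mismatch described in case 1.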

Future Actions

My mentor and I decided that it is better to have a fully correct implementation first, rather than one with more options that could exhibit undesired behaviour.

Replace the Kinesis Client Library with a finer-grained Kinesis access method, based on the simple consumer requests provided by Amazon. This would help us in two important aspects of the work:

  • The correctness of the implementation (a really important one).
  • The ability to control checkpointing from the Samza side. We still have to checkpoint on the Kinesis side as well, but in case of failures Samza should be able to handle them, at least with the current implementation. At some point checkpointing could perhaps also be delegated to the underlying system.
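
As a rough illustration of shard-level reading with caller-controlled checkpoints, the sketch below assumes a client object exposing `get_shard_iterator` and `get_records` with the request and response shape of the Kinesis API (as in boto3's Kinesis client). The names `read_from_checkpoint` and `FakeKinesis`, and the stream and shard identifiers, are hypothetical; the fake client exists only so the example runs without AWS, and the project itself would do this from the Samza (JVM) side.

```python
def read_from_checkpoint(client, stream, shard_id, checkpoint=None, limit=100):
    """Read one batch from a shard, resuming after `checkpoint` if given.

    Returns (records, new_checkpoint). The caller decides when to persist
    new_checkpoint, for example only after processing has succeeded.
    """
    if checkpoint is None:
        # No checkpoint yet: start from the oldest record still in the shard.
        it = client.get_shard_iterator(
            StreamName=stream, ShardId=shard_id,
            ShardIteratorType="TRIM_HORIZON")
    else:
        it = client.get_shard_iterator(
            StreamName=stream, ShardId=shard_id,
            ShardIteratorType="AFTER_SEQUENCE_NUMBER",
            StartingSequenceNumber=checkpoint)
    response = client.get_records(ShardIterator=it["ShardIterator"], Limit=limit)
    records = response["Records"]
    new_checkpoint = records[-1]["SequenceNumber"] if records else checkpoint
    return records, new_checkpoint


class FakeKinesis:
    """In-memory stand-in for the client (hypothetical, illustration only)."""

    def __init__(self, records):
        self.records = records  # list of dicts ordered by SequenceNumber

    def get_shard_iterator(self, StreamName, ShardId, ShardIteratorType,
                           StartingSequenceNumber=None):
        if ShardIteratorType == "TRIM_HORIZON":
            position = 0
        else:  # AFTER_SEQUENCE_NUMBER
            seqs = [r["SequenceNumber"] for r in self.records]
            position = seqs.index(StartingSequenceNumber) + 1
        return {"ShardIterator": str(position)}

    def get_records(self, ShardIterator, Limit):
        start = int(ShardIterator)
        return {"Records": self.records[start:start + Limit]}


client = FakeKinesis([{"SequenceNumber": str(n), "Data": b"msg-%d" % n}
                      for n in range(5)])
batch, ckpt = read_from_checkpoint(client, "my-stream", "shardId-000000000000",
                                   limit=2)
# Simulate a restart: a new reader resumes from the persisted checkpoint.
batch2, ckpt2 = read_from_checkpoint(client, "my-stream", "shardId-000000000000",
                                     checkpoint=ckpt, limit=2)
print([r["SequenceNumber"] for r in batch2])  # ['2', '3']
```

Because the checkpoint is an ordinary return value, the restarted reader picks up exactly after the last record the caller acknowledged, rather than after whatever another worker happened to read.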

I also got the chance to discuss some of these ideas with people from Amazon, and this issue (the hard mapping between containers/tasks/partitions/messages) is something they are aware of in the initial implementation.

Some ideas for future improvement of this part are:

  • Creating a different hashing strategy from the incoming messages to the tasks. This has to be agnostic of the number of partitions, and should rely only on the messages and on the tasks currently executing.
  • Thus, we would probably have to create a different ContainerModel (so we don't tie the partitions to the tasks) and a different MessageChooser (so we can redirect new messages to specific tasks).
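
One technique that yields a mapping depending only on the message and the set of running tasks is rendezvous (highest-random-weight) hashing. It is named here as one possible option, not as something decided for the project; `choose_task`, the task names, and the message keys below are hypothetical.

```python
import hashlib


def choose_task(message_key, live_tasks):
    """Pick the task with the highest hash score for this key.

    The result depends only on the key and the set of live tasks,
    not on the number of Kinesis shards.
    """
    def score(task):
        digest = hashlib.sha256(f"{task}:{message_key}".encode()).digest()
        return int.from_bytes(digest[:8], "big")
    return max(live_tasks, key=score)


tasks = ["task-0", "task-1", "task-2"]
keys = [f"user-{i}" for i in range(1000)]
before = {k: choose_task(k, tasks) for k in keys}

# task-2 goes down: only the keys it owned should move to another task.
after = {k: choose_task(k, ["task-0", "task-1"]) for k in keys}
moved = [k for k in keys if before[k] != after[k]]
print(all(before[k] == "task-2" for k in moved))
```

A property worth noting is that removing a task only remaps the keys that task owned; keys handled by the surviving tasks stay where they were.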

I will start documenting the follow-up JIRA issues for this initial integration, because the ideas above fall outside the scope of this GSoC project.

Mentors Comments