(This page is still under construction)

Streaming 2.0

The streaming protocol was redesigned from the ground up in Apache Cassandra 2.0. This page gives an overview of the protocol design and implementation.

Design goals

  • Better control
    • One API for all (bootstrap, move, bulkload, repair...)
    • Sending/receiving data in the same session
  • Better performance
    • Pipelined stream
    • Persistent connection per host
  • Better reporting
    • Better logging/tracing
    • Event notification
    • More metrics

Highlights

Stream Plan

In the previous version, sending and receiving data were handled separately from each other and from the operation that triggered them. Streaming 2.0 instead groups all related stream sessions under the same "Stream Plan".

Stream Plan for repair, bootstrap, bulkload, etc.
 |- Stream session with Endpoint 1
      |- Stream receiving tasks
      |- Stream transfer tasks
 |
 |- Stream session with Endpoint 2
 .
 .
 .

File transfer and messages

Streaming messages and file transfers are pipelined over the same persistent TCP connection.
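To illustrate what pipelining messages and file content on one connection can look like, here is a minimal sketch. It is not Cassandra's wire format: the frame layout (type byte, length, payload), the CONTROL and FILE constants, and the class name PipelinedStreamSketch are all assumptions made for illustration, and a byte array stands in for the TCP connection.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

public class PipelinedStreamSketch
{
    // Hypothetical frame types; not Cassandra's actual wire format.
    static final byte CONTROL = 1;
    static final byte FILE = 2;

    // Writes one frame: type byte, payload length, payload.
    static void writeFrame(DataOutputStream out, byte type, byte[] payload) throws IOException
    {
        out.writeByte(type);
        out.writeInt(payload.length);
        out.write(payload);
    }

    // Reads frames back in the order they were written.
    static List<String> readFrames(DataInputStream in) throws IOException
    {
        List<String> seen = new ArrayList<>();
        while (in.available() > 0)
        {
            byte type = in.readByte();
            byte[] payload = new byte[in.readInt()];
            in.readFully(payload);
            String kind = type == CONTROL ? "CONTROL" : "FILE";
            seen.add(kind + ":" + new String(payload, StandardCharsets.UTF_8));
        }
        return seen;
    }

    // Sends two files and a control message back to back on one stream.
    static List<String> demo()
    {
        try
        {
            ByteArrayOutputStream wire = new ByteArrayOutputStream(); // stands in for the TCP connection
            DataOutputStream out = new DataOutputStream(wire);
            writeFrame(out, FILE, "sstable-1 bytes".getBytes(StandardCharsets.UTF_8));
            writeFrame(out, FILE, "sstable-2 bytes".getBytes(StandardCharsets.UTF_8));
            writeFrame(out, CONTROL, "Complete".getBytes(StandardCharsets.UTF_8));
            out.flush();
            return readFrames(new DataInputStream(new ByteArrayInputStream(wire.toByteArray())));
        }
        catch (IOException e)
        {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args)
    {
        for (String frame : demo())
            System.out.println(frame);
    }
}
```

The point of the sketch is only that the sender never waits for a reply between frames, and the receiver can tell control messages from file content by the frame header.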

Stream event support

Event notification is finer-grained. With JMX notification support, even external clients can listen for events.

API

Public APIs

  • StreamPlan
    • Builder for a streaming plan (what to transfer, what to request). Internally builds StreamSessions to interact with the other nodes and associates them with a StreamResultFuture, which asynchronously returns the final StreamState.
  • StreamResultFuture
    • Represents the future result of a StreamPlan execution. You can attach a StreamEventHandler to track the progress of the streaming plan.
  • StreamState
    • State of a streaming execution. You can get a snapshot of in-progress streaming from StreamResultFuture#getCurrentState, or the final state as the return value of StreamResultFuture#get.
  • StreamManager
    • Manages all streaming progress.
    • Provides various metrics through JMX, including notifications.
  • StreamEventHandler
    • Listens for stream events.

Basic API usage is as follows:

// Start building your streaming plan
StreamPlan bulkloadPlan = new StreamPlan("Bulkload");
// Add transfer files tasks for each destination
for (InetAddress remote : remoteTargets)
    bulkloadPlan.transferFiles(remote, ranges, sstables);
// Execute your plan
StreamResultFuture result = bulkloadPlan.execute();
try
{
    // ... and wait for streaming to complete
    result.get();
    // all streams succeeded
}
catch (Exception e)
{
    // some stream failed
}

Alternatively, StreamResultFuture implements Guava's ListenableFuture<StreamState>, so you can use a FutureCallback<StreamState> to capture stream success and failure.

Futures.addCallback(result, new FutureCallback<StreamState>()
{
    public void onSuccess(StreamState result)
    {
        // Yes, we did it!
    }

    public void onFailure(Throwable t)
    {
        // O_o something goes wrong
    }
});

You can add an event listener to the StreamResultFuture to receive stream events:

StreamResultFuture result = bulkloadPlan.execute();
result.addEventListener(new StreamEventHandler()
{
    public void handleEvent(StreamEvent event)
    {
        // called for each stream event (session prepared, file progress, session complete)
    }
});

Internal APIs

  • StreamSession
    • Group of stream tasks (INs and/or OUTs) per destination
  • StreamTask
    • Represents each IN/OUT stream task
    • Each task MUST belong to one stream session
  • StreamReceiveTask
    • The execute method sends a stream request to the destination and waits for the reply.
  • StreamTransferTask
  • ConnectionHandler
    • Receives/sends streaming messages.

Stream session

A stream session handles the streaming of one or more SSTables to and from a specific remote node. Both this node and the remote one create a symmetrical StreamSession. A streaming session has the following life-cycle:

  1. Connection initialization
    (a) A node (the initiator in the following) creates a new StreamSession, initializes it, and then starts it. Starting creates a ConnectionHandler, which opens two connections to the remote node to stream with (the follower in the following) and sends a StreamInit message. For the initiator, the first connection is the incoming connection and the second is the outgoing connection.
    (b) Upon reception of the StreamInit message, the follower creates and initializes its own StreamSession if one does not exist yet, and attaches the connecting socket to its ConnectionHandler.
    (c) When both the incoming and outgoing connections are established, the StreamSession starts the streaming preparation phase.

  2. Streaming preparation phase
    (a) The initiator sends a Prepare message that lists which files/sections it will stream to the follower and what the follower needs to stream back. If the initiator has nothing to receive from the follower, it goes directly to the streaming phase. Otherwise, it waits for the follower's Prepare message.
    (b) Upon reception of the Prepare message, the follower records which files/sections it will receive and sends back its own Prepare message with a summary of the files/sections that will be sent to the initiator. After sending that message, the follower goes to the streaming phase.
    (c) When the initiator receives the follower's Prepare message, it records which files/sections it will receive and then goes to the streaming phase.

  3. Streaming phase
    (a) The sender sends File messages sequentially. Each File message consists of a header that indicates which file is coming, followed by the streamed content of that file. When all files are sent, the task is marked as complete.
    (b) On the receiving side, an SSTable is written for the incoming file. Once the File message is fully received, the file is marked as complete and a Received message is sent back. Once all files are received, they are added to the ColumnFamilyStore, secondary indexes are built, and the task is marked as complete.
    (c) If an I/O error occurs during streaming, the receiving node sends a Retry message for the file (up to max_streaming_retries times, default 3). On receiving a Retry message, the sender simply queues a new File message for that file.
    (d) When all transfer and receive tasks for the session are complete, the session moves to the completion phase.

  4. Completion phase
    (a) When a node has finished all of its transfer and receive tasks, it sends a Complete message. The stream session is considered complete once the node has both sent a Complete message and received a Complete message from the other side.
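The preparation, streaming, and completion phases above can be sketched as a small state machine. This is a simplified model under stated assumptions, not Cassandra's StreamSession: the State names, the Session class, and the in-memory FIFO queue standing in for the network are all hypothetical. Connection setup, Received messages, and Retry handling are left out, and the follower is assumed to always reply with its own Prepare message.

```java
import java.util.ArrayDeque;
import java.util.Queue;

public class SessionLifecycleSketch
{
    // Hypothetical state names mirroring the phases described above.
    enum State { INITIALIZED, PREPARING, STREAMING, COMPLETE }

    // Stands in for the network: messages are delivered in FIFO order.
    static final Queue<Runnable> wire = new ArrayDeque<>();

    static class Session
    {
        Session peer;
        State state = State.INITIALIZED;
        int toSend, toReceive, received;
        boolean completeSent, completeReceived;

        // Initiator, phase 2(a): announce what will be sent and what is requested.
        void startPrepare(int filesToSend, int filesRequested)
        {
            state = State.PREPARING;
            toSend = filesToSend;
            wire.add(() -> peer.onPrepare(filesToSend, filesRequested));
            if (filesRequested == 0)
                startStreaming(); // nothing to receive, so no need to wait for a reply
        }

        // Follower, phase 2(b): record the plan, reply with a summary, start streaming.
        void onPrepare(int incoming, int requested)
        {
            toReceive = incoming;
            toSend = requested;
            wire.add(() -> peer.onPrepareReply(requested)); // assumption: always reply
            startStreaming();
        }

        // Initiator, phase 2(c): record what will arrive, then start streaming.
        void onPrepareReply(int incoming)
        {
            toReceive = incoming;
            if (state == State.PREPARING)
                startStreaming();
        }

        // Phase 3: queue one File message per file, then check for completion.
        void startStreaming()
        {
            state = State.STREAMING;
            for (int i = 0; i < toSend; i++)
                wire.add(() -> peer.onFile());
            maybeComplete();
        }

        void onFile()
        {
            received++;
            maybeComplete();
        }

        void onComplete()
        {
            completeReceived = true;
            maybeComplete();
        }

        // Phase 4: send Complete once local tasks are done; finish when both sides have.
        void maybeComplete()
        {
            if (state == State.STREAMING && !completeSent && received == toReceive)
            {
                completeSent = true;
                wire.add(() -> peer.onComplete());
            }
            if (completeSent && completeReceived)
                state = State.COMPLETE;
        }
    }

    // Runs one session pair to the end and returns {initiator state, follower state}.
    static State[] run(int filesToSend, int filesRequested)
    {
        wire.clear();
        Session initiator = new Session();
        Session follower = new Session();
        initiator.peer = follower;
        follower.peer = initiator;
        initiator.startPrepare(filesToSend, filesRequested);
        while (!wire.isEmpty())
            wire.poll().run();
        return new State[] { initiator.state, follower.state };
    }

    public static void main(String[] args)
    {
        State[] result = run(2, 1);
        System.out.println("initiator=" + result[0] + " follower=" + result[1]);
    }
}
```

Calling run(2, 0) models a one-way transfer such as a bulk load, where the initiator skips the wait for the follower's Prepare message; run(2, 1) models a bidirectional exchange.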

Events

StreamResultFuture emits a StreamEvent in the following cases:

  • Stream session prepared (SESSION_PREPARED)
    Fired when a stream session has finished preparing to receive/send files. It tells the event handler the number of files and the total bytes to be received/sent.
  • Stream session complete (SESSION_COMPLETE)
    Fired when a session completes.
  • Stream progress (FILE_PROGRESS)
    Fired when progress is made on receiving/sending a file.

To listen for StreamEvents, implement StreamEventHandler and register the handler with the StreamResultFuture.
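As a rough illustration of how a handler might dispatch on the three event types, here is a self-contained sketch. Only the event type names come from the list above. The Event and Handler types are stand-ins rather than Cassandra's classes, and the meaning of the bytes field (total bytes for SESSION_PREPARED, bytes transferred since the last event for FILE_PROGRESS) is an assumption.

```java
public class StreamEventSketch
{
    // Event kinds named after the list above; everything else here is a stand-in.
    enum Type { SESSION_PREPARED, SESSION_COMPLETE, FILE_PROGRESS }

    // Hypothetical event: 'bytes' is the session total for SESSION_PREPARED and
    // the bytes transferred since the previous event for FILE_PROGRESS.
    static class Event
    {
        final Type type;
        final long bytes;

        Event(Type type, long bytes)
        {
            this.type = type;
            this.bytes = bytes;
        }
    }

    // Hypothetical handler interface, shaped like the handleEvent example earlier.
    interface Handler
    {
        void handleEvent(Event event);
    }

    // Tracks overall progress by dispatching on the event type.
    static class ProgressTracker implements Handler
    {
        long totalBytes;
        long transferredBytes;
        boolean complete;

        public void handleEvent(Event event)
        {
            switch (event.type)
            {
                case SESSION_PREPARED:
                    totalBytes += event.bytes;
                    break;
                case FILE_PROGRESS:
                    transferredBytes += event.bytes;
                    break;
                case SESSION_COMPLETE:
                    complete = true;
                    break;
            }
        }

        // Integer percentage of bytes transferred so far.
        int percent()
        {
            return totalBytes == 0 ? 0 : (int) (100 * transferredBytes / totalBytes);
        }
    }

    // Feeds a fixed sequence of events through the tracker.
    static ProgressTracker demo()
    {
        ProgressTracker tracker = new ProgressTracker();
        tracker.handleEvent(new Event(Type.SESSION_PREPARED, 1000));
        tracker.handleEvent(new Event(Type.FILE_PROGRESS, 400));
        tracker.handleEvent(new Event(Type.FILE_PROGRESS, 600));
        tracker.handleEvent(new Event(Type.SESSION_COMPLETE, 0));
        return tracker;
    }

    public static void main(String[] args)
    {
        ProgressTracker tracker = demo();
        System.out.println(tracker.percent() + "% complete=" + tracker.complete);
    }
}
```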

JMX support

JMX support is provided through the StreamManager MBean. You can get the list of streaming states of all currently running stream plans. It also provides JMX notification support, so you can subscribe to the stream events above through the JMX interface.
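The general mechanism for subscribing to JMX notifications can be sketched with the standard javax.management API. This page does not give the object name or notification types of the real streaming MBean, so the example registers its own stand-in MBean: DemoStream, the object name sketch.streaming:type=DemoStream, and the demo.* notification types are all made up for illustration.

```java
import java.lang.management.ManagementFactory;
import java.util.ArrayList;
import java.util.List;
import javax.management.JMException;
import javax.management.MBeanServer;
import javax.management.Notification;
import javax.management.NotificationBroadcasterSupport;
import javax.management.NotificationListener;
import javax.management.ObjectName;

public class JmxNotificationSketch
{
    // Hypothetical MBean interface standing in for the streaming MBean.
    public interface DemoStreamMBean
    {
        int getActiveStreamCount();
    }

    // Stand-in MBean that can broadcast notifications to subscribers.
    public static class DemoStream extends NotificationBroadcasterSupport implements DemoStreamMBean
    {
        private long sequence = 0;

        public int getActiveStreamCount()
        {
            return 1;
        }

        void fire(String type, String message)
        {
            sendNotification(new Notification(type, this, ++sequence, message));
        }
    }

    // Registers the stand-in MBean, subscribes a listener, and fires two notifications.
    static List<String> demo()
    {
        List<String> seen = new ArrayList<>();
        try
        {
            MBeanServer server = ManagementFactory.getPlatformMBeanServer();
            ObjectName name = new ObjectName("sketch.streaming:type=DemoStream"); // made-up name
            DemoStream bean = new DemoStream();
            server.registerMBean(bean, name);
            try
            {
                NotificationListener listener = (notification, handback) ->
                    seen.add(notification.getType() + " " + notification.getMessage());
                server.addNotificationListener(name, listener, null, null);
                bean.fire("demo.session.prepared", "2 files, 1024 bytes");
                bean.fire("demo.session.complete", "ok");
            }
            finally
            {
                server.unregisterMBean(name);
            }
        }
        catch (JMException e)
        {
            throw new IllegalStateException(e);
        }
        return seen;
    }

    public static void main(String[] args)
    {
        for (String line : demo())
            System.out.println(line);
    }
}
```

An external client would do the same through an MBeanServerConnection obtained from a JMX connector instead of the platform MBean server, using the object name that the running node actually exposes.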
