There are two main instances of streaming (post 0.7):
- Transfer - Occurs when a Source pushes SSTables for certain ranges to a Destination. Initiated and controlled by the Source.
- Request - Occurs when a Destination requests a set of ranges from a Source. Initiated and controlled by the Destination.
The following steps occur for Stream Transfers.
- Source has a list of ranges it must transfer to another node.
- Source copies the data in those ranges to sstable files in preparation for streaming. This is called anti-compaction (because compaction merges multiple sstable files into one, and this does the opposite).
- Source builds a list of PendingFile objects, each containing information on an SSTable to be transferred.
- Source starts streaming the first file from the list and logs "Waiting for transfer to $some_node to complete". The stream header contains information on the streamed file so that the Destination knows what to do with the incoming stream.
- Destination receives the file, writes it to disk, and sends a FileStatus.
- On a successful transfer, the Source streams the next file, continuing until it is done; on error, it re-streams the same file.
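
The transfer steps above can be sketched as a small simulation. This is only an illustration, not Cassandra's implementation: the `PendingFile` fields, the `send` callback (which stands in for streaming a file and waiting for the Destination's FileStatus), and the `max_attempts` limit are all assumptions made for the example.

```python
from dataclasses import dataclass
from typing import Callable, List


@dataclass
class PendingFile:
    """Hypothetical stand-in for the per-SSTable record; fields are assumed."""
    path: str
    size: int


def stream_transfer(files: List[PendingFile],
                    send: Callable[[PendingFile], bool],
                    max_attempts: int = 3) -> List[str]:
    """Stream files one at a time, re-streaming a file when its status is an error.

    `send` returns True when the Destination reports success, False on error.
    The retry limit is an assumption; the text does not specify one.
    """
    log = []
    for pending in files:
        for attempt in range(1, max_attempts + 1):
            ok = send(pending)
            log.append(f"{pending.path} attempt {attempt}: {'ok' if ok else 'error'}")
            if ok:
                break  # move on to the next file in the list
        else:
            raise RuntimeError(f"giving up on {pending.path}")
    return log


# Example: the second file fails once and is re-streamed.
failures_left = {"b-Data.db": 1}


def flaky_send(pending: PendingFile) -> bool:
    if failures_left.get(pending.path, 0) > 0:
        failures_left[pending.path] -= 1
        return False
    return True


transfer_log = stream_transfer(
    [PendingFile("a-Data.db", 10), PendingFile("b-Data.db", 20)], flaky_send)
```

The point of the sketch is that the Source drives the loop: it never starts the next file until the current one has been acknowledged as successful.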
The following steps occur for Stream Requests.
- Destination compiles a list of ranges it needs from another node.
- Destination sends a StreamRequestMessage to the Source node with the list of ranges.
- Source prepares the SSTables for those ranges and creates the PendingFile objects.
- Source starts streaming the first file in the list. The header for the first stream contains information on the current stream and a list of the remaining PendingFile objects that fall in the requested ranges.
- Destination receives the stream and writes it to disk, followed by the log message "Streaming added org.apache.cassandra.io.sstable.SSTableReader(path='/var/lib/cassandra/data/Keyspace1/Standard1-e-1-Data.db')".
- Destination then takes the lead and requests the remaining files one at a time. If an error occurs, it re-requests the same file; otherwise it continues with the next file until done.
- Source streams each of the requested files. The files are already anti-compacted, so it just streams them to the Destination.
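
The request steps above differ from a transfer mainly in who drives the loop. A minimal sketch, assuming a hypothetical `FakeSource` whose `open_stream` returns the first file together with the list of remaining files (modelling the first stream's header) and whose `request_file` returns True on success; none of these names come from Cassandra, and the retry limit is likewise assumed:

```python
from typing import Callable, List, Sequence, Tuple

Range = Tuple[int, int]


def request_ranges(ranges: Sequence[Range],
                   open_stream: Callable[[Sequence[Range]], Tuple[str, List[str]]],
                   request_file: Callable[[str], bool],
                   max_attempts: int = 3) -> List[str]:
    """Destination-driven flow: receive the first file, then pull the rest one by one."""
    first, remaining = open_stream(ranges)  # header lists the remaining files
    received = [first]
    for name in remaining:
        for _ in range(max_attempts):
            if request_file(name):  # on error, re-request the same file
                received.append(name)
                break
        else:
            raise RuntimeError(f"giving up on {name}")
    return received


class FakeSource:
    """Hypothetical Source holding already-prepared files for the requested ranges."""

    def __init__(self, files, fail_once=()):
        self.files = list(files)
        self.fail_once = set(fail_once)
        self.requests = []

    def open_stream(self, ranges):
        return self.files[0], self.files[1:]

    def request_file(self, name):
        self.requests.append(name)
        if name in self.fail_once:
            self.fail_once.discard(name)
            return False
        return True


source = FakeSource(["f1", "f2", "f3"], fail_once=["f2"])
received = request_ranges([(0, 100)], source.open_stream, source.request_file)
```

In this run the Destination requests `f2` twice because the first attempt fails, then moves on to `f3`.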
Transfer (0.6 and below)
- Source starts with STREAM_INITIATE (Prepares the request ranges and sends a list of pending files)
- Destination acknowledges with STREAM_INITIATE_DONE (Adds to list of pending files per node)
- Source starts streaming the first file from the list of files it has prepared for that Destination node.
- Destination receives the file and returns a Stream_Status. The order of the files is maintained, and only one transfer from a given node can happen at a time.
- Based on the status, Source re-streams the file or streams the next file until complete.
Request (0.6 and below)
- Destination invokes STREAM_REQUEST (Compiles a set of ranges that it needs from the source)
- From this point on the steps in Transfer (0.6) are followed.
Note: Order is very important in 0.6 streaming. The source can only transfer one file at a time, and the destination can only receive one set of transfers from a source at any instant. Multiple streams would break the process.
Streaming in Cassandra between nodes is invoked in the following contexts:
- Bootstrapping - During bootstrap the node requests ranges from other nodes. It invokes Stream Request.
- Repair - The Anti-Entropy service performs repairs by comparing Merkle trees. The final step in the process is to transfer conflicting ranges to other nodes and then request conflicting changes from other nodes, in that order, so both Transfer and Request are invoked here.
- Restore Replica - StorageService performs a Stream Request.
- Un-bootstrap - During node decommission or move, un-bootstrap is invoked to transfer the node's ranges to other nodes. Transfer is invoked in this case.
Anti-compaction, during both request and transfer, takes the most time. It can be monitored using the org.apache.cassandra.db.CompactionManager MBean on the Source.
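
To make the cost of anti-compaction more concrete, here is a toy sketch of the idea: every row of every SSTable is examined, and the rows whose token falls in a requested range are copied out to new files. The data model is an assumption made for illustration (SSTables as lists of `(token, value)` pairs, ranges as `(left, right]` intervals with no ring wraparound) and does not reflect Cassandra's on-disk format.

```python
from typing import List, Sequence, Tuple

Row = Tuple[int, str]
Range = Tuple[int, int]


def anti_compact(sstables: Sequence[Sequence[Row]],
                 ranges: Sequence[Range]) -> List[List[Row]]:
    """Copy rows in the requested ranges into new per-SSTable outputs.

    Assumes each range is (left, right] and does not wrap around the ring.
    """
    outputs = []
    for rows in sstables:
        kept = [(token, value) for token, value in rows
                if any(left < token <= right for left, right in ranges)]
        if kept:  # an SSTable with no matching rows produces no output file
            outputs.append(kept)
    return outputs


sstables = [[(5, "a"), (50, "b")], [(120, "c")], [(60, "d"), (10, "e")]]
to_stream = anti_compact(sstables, [(0, 10), (55, 100)])
```

Because every row has to be read even when only a few are kept, the work grows with the total size of the SSTables rather than with the amount of data finally streamed.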
The status of streaming on both Source and Destination nodes can be monitored through the org.apache.cassandra.streaming.StreamingService MBean. The Status attribute gives an easy indication of what a node is doing with respect to streaming. The operations getOutgoingFiles(host) and getIncomingFiles(host) each return a list of strings describing the status of individual files being streamed to and from a given host. Each string follows this format:
[path to file] [bytes sent/received]/[file size]
If you think that streaming is taking too long on your cluster, the first thing you should do is check StreamSources or StreamDestinations to figure out which hosts are streaming files. Use those hosts as inputs to getOutgoingFiles() or getIncomingFiles() to check on the status of individual files from the problematic source and destination nodes.
Streaming is conducted in 32MB chunks, so you should refresh the file status after a few seconds to see if the sent/received values change. If they do not change, or change more slowly than you'd like, something is wrong.
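
The refresh-and-compare check can be automated once the status strings have been fetched. The sketch below only parses and compares strings; it does not talk to JMX. It assumes the square brackets in the format above are placeholders, so a real line looks like `<path> <bytes>/<size>`; the function names are made up for this example.

```python
from typing import Iterable, List, NamedTuple


class FileStatus(NamedTuple):
    path: str
    done: int   # bytes sent or received so far
    total: int  # file size in bytes


def parse_status(line: str) -> FileStatus:
    """Parse '<path> <bytes>/<size>' (assumed layout, no literal brackets)."""
    path, progress = line.rsplit(" ", 1)
    done, total = progress.split("/")
    return FileStatus(path, int(done), int(total))


def stalled_files(before: Iterable[str], after: Iterable[str]) -> List[str]:
    """Return paths of unfinished files whose byte count did not change."""
    earlier = {s.path: s.done for s in map(parse_status, before)}
    return [s.path for s in map(parse_status, after)
            if s.done < s.total and earlier.get(s.path) == s.done]


# Two snapshots taken a few seconds apart (values invented for the example).
snapshot_1 = ["/data/A-Data.db 0/100", "/data/B-Data.db 50/200"]
snapshot_2 = ["/data/A-Data.db 32/100", "/data/B-Data.db 50/200"]
stalled = stalled_files(snapshot_1, snapshot_2)
```

Here `B-Data.db` is reported because its byte count stayed at 50 between the two snapshots while the file is still incomplete.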
The streaming status can also be monitored using nodetool -h <hostname/IP> -p <jmxport> netstats