General
The configuration file is parsed by DatabaseDescriptor (which also holds the default values, where there are any).
Thrift generates an API interface in Cassandra.java; the implementation is CassandraServer, and CassandraDaemon ties it together.
CassandraServer turns thrift requests into the internal equivalents, StorageProxy does the actual work, and CassandraServer then turns the result back into thrift.
StorageService is kind of the internal counterpart to CassandraDaemon. It handles turning raw gossip into the right internal state.
AbstractReplicationStrategy controls what nodes get secondary, tertiary, etc. replicas of each key range. Primary replica is always determined by the token ring (in TokenMetadata) but you can do a lot of variation with the others. RackUnaware just puts replicas on the next N-1 nodes in the ring. RackAware puts the first non-primary replica on the next node in the ring that is in ANOTHER data center than the primary, then puts the remaining replicas in the same data center as the primary.
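The RackUnaware placement rule can be sketched in a few lines. This is an illustration only, not Cassandra's code: the function name, the list-of-tuples ring, and the integer tokens are assumptions made for the example.

```python
from bisect import bisect_left

def rack_unaware_replicas(ring, key_token, replication_factor):
    """ring: list of (token, node) sorted by token (hypothetical layout).

    The primary replica is the first node whose token is >= the key's token,
    wrapping around the ring; the other replicas are the next N-1 nodes.
    """
    tokens = [token for token, _ in ring]
    start = bisect_left(tokens, key_token) % len(ring)
    count = min(replication_factor, len(ring))
    return [ring[(start + i) % len(ring)][1] for i in range(count)]

ring = [(0, "A"), (25, "B"), (50, "C"), (75, "D")]
print(rack_unaware_replicas(ring, 60, 3))  # ['D', 'A', 'B']
```

A RackAware variant would walk the ring in the same way but skip nodes until it finds one in a different data center for the first non-primary replica.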
MessagingService handles connection pooling and running internal commands on the appropriate stage (basically, a threaded executorservice). Stages are set up in StageManager; currently there are read, write, and stream stages. (Streaming is for when one node copies large sections of its SSTables to another, for bootstrap or relocation on the ring.) The internal commands are defined in StorageService; look for registerVerbHandlers.
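As a rough sketch of the stage idea (one thread pool per kind of work, with each verb's handler bound to a stage), the following uses Python's standard thread pools. The stage names follow the text above; the pool sizes, verb names, and functions are assumptions for illustration and do not mirror the real StageManager or registerVerbHandlers code.

```python
from concurrent.futures import ThreadPoolExecutor

# One executor per stage; the pool sizes are arbitrary for this sketch.
STAGES = {
    "read": ThreadPoolExecutor(max_workers=4),
    "write": ThreadPoolExecutor(max_workers=4),
    "stream": ThreadPoolExecutor(max_workers=1),
}

# verb -> (stage name, handler); hypothetical registry
VERB_HANDLERS = {}

def register_verb_handler(verb, stage, handler):
    VERB_HANDLERS[verb] = (stage, handler)

def deliver(verb, payload):
    """Run the handler registered for `verb` on its stage; returns a Future."""
    stage, handler = VERB_HANDLERS[verb]
    return STAGES[stage].submit(handler, payload)

register_verb_handler("READ", "read", lambda payload: ("read", payload))
register_verb_handler("MUTATION", "write", lambda payload: ("applied", payload))

result = deliver("MUTATION", {"key": "k1"}).result()
```

Keeping a separate pool per stage means a backlog of one kind of work (for example streaming) does not starve the others.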
Configuration for the node (administrative stuff, such as which directories to store data in, as well as global configuration, such as which global partitioner to use) is held by DatabaseDescriptor. Per-KS, per-CF, and per-Column metadata are all stored as migrations across the database and can be updated through the system_update/add_* thrift calls, or can be changed locally and temporarily at runtime. See ConfigurationNotes.
Write path
StorageProxy gets the nodes responsible for replicas of the keys from the ReplicationStrategy, then sends RowMutation messages to them.
If nodes are changing position on the ring, "pending ranges" are associated with their destinations in TokenMetadata and these are also written to.
If nodes that should accept the write are down, but the remaining nodes can fulfill the requested ConsistencyLevel, the writes for the down nodes will be sent to another node instead, with a header (a "hint") saying that data associated with that key should be sent to the replica node when it comes back up. This is called HintedHandoff and reduces the "eventual" in "eventual consistency." Note that HintedHandoff is only an optimization; ArchitectureAntiEntropy is responsible for restoring consistency more completely.
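A minimal sketch of the hinting decision follows, assuming a hypothetical policy that picks hint holders round-robin from the live nodes. How the stand-in node is really chosen is not described here, so treat that part, and all the names, as assumptions.

```python
def plan_write(replicas, live, required_acks, all_nodes):
    """Return (direct_targets, hints), or raise if the consistency level
    cannot be met. `hints` maps a stand-in node to the down replicas it
    holds hinted writes for."""
    up = [node for node in replicas if node in live]
    down = [node for node in replicas if node not in live]
    if len(up) < required_acks:
        raise RuntimeError("not enough live replicas for the consistency level")
    # Hypothetical policy: spread hints round-robin over the live nodes.
    candidates = [node for node in all_nodes if node in live]
    hints = {}
    if candidates:
        for i, target in enumerate(down):
            holder = candidates[i % len(candidates)]
            hints.setdefault(holder, []).append(target)
    return up, hints

def replay_hints(hints, recovered):
    """Holders that must forward their hinted data once `recovered` is back."""
    return [holder for holder, targets in hints.items() if recovered in targets]

direct, hints = plan_write(["A", "B", "C"], {"A", "B", "D"}, 2, ["A", "B", "C", "D"])
```

Here the write for the down replica C is parked on a live node, and replay_hints tells us which node owes C its data when C returns.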
On the destination node, RowMutationVerbHandler uses Table.apply to hand the write first to CommitLog.java, then to the Memtable for the appropriate ColumnFamily.
When a Memtable is full, it gets sorted and written out as an SSTable asynchronously by ColumnFamilyStore.switchMemtable
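The ordering described above (commit log first, then Memtable, then a sorted flush once the Memtable is full) can be sketched as below. The class, its fields, and the entry-count limit are assumptions for illustration, and the flush runs synchronously here, whereas the real one is asynchronous.

```python
class ColumnFamilyStoreSketch:
    """Toy model of the write path; not Cassandra's ColumnFamilyStore."""

    def __init__(self, memtable_limit):
        self.commit_log = []   # append-only list standing in for the commit log
        self.memtable = {}     # key -> (timestamp, value)
        self.sstables = []     # each SSTable: key-sorted list of (key, (timestamp, value))
        self.memtable_limit = memtable_limit

    def apply(self, key, value, timestamp):
        # The commit log is written before the Memtable is touched.
        self.commit_log.append((key, value, timestamp))
        current = self.memtable.get(key)
        if current is None or timestamp >= current[0]:
            self.memtable[key] = (timestamp, value)
        if len(self.memtable) >= self.memtable_limit:
            self.switch_memtable()

    def switch_memtable(self):
        # Swap in a fresh Memtable, then write the full one out sorted by key.
        full, self.memtable = self.memtable, {}
        self.sstables.append(sorted(full.items()))

store = ColumnFamilyStoreSketch(memtable_limit=2)
store.apply("b", "v1", timestamp=1)
store.apply("a", "v2", timestamp=2)
```

After the second write the Memtable reaches its limit, so it is replaced by an empty one and its contents appear as a single key-sorted SSTable.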
When enough SSTables exist, they are merged by CompactionManager.doCompaction
- Making this concurrency-safe without blocking writes or reads while we remove the old SSTables from the list and add the new one is tricky, because naive approaches require waiting for all readers of the old sstables to finish before deleting them (since we can't know if they have actually started opening the file yet; if they have not and we delete the file first, they will error out). The approach we have settled on is to not actually delete old SSTables synchronously; instead we register a phantom reference with the garbage collector, so when no references to the SSTable exist it will be deleted. (We also write a compaction marker to the file system so if the server is restarted before that happens, we clean out the old SSTables at startup time.)
A "major" compaction of merging _all_ sstables may be manually initiated by the user; this results in submitMajor calling doCompaction with all the sstables in the ColumnFamily, rather than just sstables of similar size.
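To make the difference between the two kinds of compaction concrete, here is a small sketch. The size-ratio rule used to decide what counts as "similar size", the newest-timestamp-wins merge, and the function names are all assumptions; the actual selection logic in CompactionManager is not described in this page.

```python
def similar_size_buckets(sstables, ratio=2.0):
    """Group SSTables whose entry counts are within `ratio` of the smallest
    member of the bucket (hypothetical rule for minor compactions)."""
    buckets = []
    for table in sorted(sstables, key=len):
        if buckets and len(table) <= ratio * len(buckets[-1][0]):
            buckets[-1].append(table)
        else:
            buckets.append([table])
    return buckets

def compact(sstables):
    """Merge key-sorted SSTables of (key, (timestamp, value)) into one,
    keeping the entry with the highest timestamp for each key."""
    merged = {}
    for table in sstables:
        for key, (timestamp, value) in table:
            if key not in merged or timestamp > merged[key][0]:
                merged[key] = (timestamp, value)
    return sorted(merged.items())

t1 = [("a", (1, "x"))]
t2 = [("a", (2, "y")), ("b", (1, "z"))]
t3 = [("k%02d" % i, (1, "v")) for i in range(10)]

minor_candidates = similar_size_buckets([t1, t2, t3])[0]  # only t1 and t2
major_result = compact([t1, t2, t3])                      # everything
```

A minor compaction would call compact on one bucket, while a major compaction passes every SSTable in the ColumnFamily.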
See ArchitectureSSTable and ArchitectureCommitLog for more details
Read path
StorageProxy gets the endpoints (nodes) responsible for replicas of the keys from the ReplicationStrategy as a function of the row key (the key of the row being read)
The read command may be a SliceFromReadCommand, a SliceByNamesReadCommand, or a RangeSliceReadCommand, depending on the kind of request.
StorageProxy filters the endpoints to contain only those that are currently up/alive
StorageProxy then asks the endpoint snitch to sort the responsible nodes by "proximity".
- The definition of "proximity" is up to the endpoint snitch
With a SimpleSnitch, proximity directly corresponds to proximity on the token ring.
With implementations based on AbstractNetworkTopologySnitch (such as PropertyFileSnitch), endpoints that are in the same rack are always considered "closer" than those that are not. Failing that, endpoints in the same data center are always considered "closer" than those that are not.
The DynamicSnitch, typically enabled in the configuration, wraps whatever underlying snitch is configured (such as SimpleSnitch or NetworkTopologySnitch) and dynamically adjusts the perceived "closeness" of endpoints based on their recent performance. The aim is to avoid routing traffic to endpoints that are slow to respond.
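The filter-then-sort step for a topology-aware snitch can be sketched as follows. The topology dictionary and function name are assumptions for the example; a real snitch obtains rack and data center information from its own configuration.

```python
def sort_by_proximity(local, endpoints, alive, topology):
    """topology maps node -> (data_center, rack) (hypothetical layout).

    Dead endpoints are dropped first. Among the rest, same-rack nodes sort
    before same-data-center nodes, which sort before everything else.
    """
    local_dc, local_rack = topology[local]

    def distance(node):
        dc, rack = topology[node]
        if dc == local_dc and rack == local_rack:
            return 0
        if dc == local_dc:
            return 1
        return 2

    live = [node for node in endpoints if node in alive]
    return sorted(live, key=distance)  # stable: ties keep their input order

topology = {
    "A": ("dc1", "r1"),
    "B": ("dc1", "r2"),
    "C": ("dc2", "r1"),
    "D": ("dc1", "r1"),
}
print(sort_by_proximity("A", ["C", "B", "D"], {"B", "C", "D"}, topology))  # ['D', 'B', 'C']
```

A dynamic snitch in this picture would wrap the distance function and adjust the scores using recently observed response times.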
StorageProxy then arranges for messages to be sent to nodes as required:
- The closest node (as determined by proximity sorting as described above) will be sent a command to perform an actual data read (i.e., return data to the co-ordinating node).
- As required by consistency level, additional nodes may be sent digest commands, asking them to perform the read locally but send back the digest only.
For example, at replication factor 3 a read at consistency level QUORUM would require one digest read in addition to the data read sent to the closest node. (See ReadCallback, instantiated by StorageProxy)
If read repair is enabled (probabilistically if read repair chance is somewhere between 0% and 100%), remaining nodes responsible for the row will be sent messages to compute the digest of the response. (Again, see ReadCallback, instantiated by StorageProxy)
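The message plan described above can be sketched like this. It assumes the endpoints are already filtered and sorted by proximity, and that `block_for` is the number of responses the consistency level requires; the function names and the injectable random source are illustrative assumptions, not the ReadCallback API.

```python
import random

def quorum(replication_factor):
    """Number of replicas that make up a majority."""
    return replication_factor // 2 + 1

def plan_read(sorted_endpoints, block_for, read_repair_chance, rng=random.random):
    """Return (data_node, digest_nodes, read_repair_nodes)."""
    data_node = sorted_endpoints[0]                 # closest node does the data read
    digest_nodes = sorted_endpoints[1:block_for]    # enough digests to satisfy the level
    repair_nodes = []
    if rng() < read_repair_chance:                  # probabilistic read repair
        repair_nodes = sorted_endpoints[block_for:]
    return data_node, digest_nodes, repair_nodes

print(plan_read(["A", "B", "C"], quorum(3), read_repair_chance=0.0))
# ('A', ['B'], [])
```

With replication factor 3 at QUORUM this yields one data read and one digest read, matching the example in the text; with read repair chance at 100% the third replica is asked for a digest as well.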
On the data node, ReadVerbHandler gets the data from CFS.getColumnFamily or CFS.getRangeSlice and sends it back as a ReadResponse
- The row is located by doing a binary search on the index in SSTableReader.getPosition
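As an illustration of locating a row through a sorted index, the sketch below binary-searches a list of (key, offset) pairs. The in-memory list and string keys are assumptions; it does not reproduce how SSTableReader.getPosition orders or stores its index.

```python
from bisect import bisect_left

def get_position(index, key):
    """index: list of (key, file_offset) sorted by key (hypothetical layout).
    Returns the offset of `key` in the data file, or None if it is absent."""
    # Building the key list is O(n); a real index would keep it ready-made.
    keys = [k for k, _ in index]
    i = bisect_left(keys, key)
    if i < len(index) and index[i][0] == key:
        return index[i][1]
    return None

index = [("apple", 0), ("kiwi", 120), ("pear", 300)]
print(get_position(index, "kiwi"))  # 120
```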
For single-row requests, we use a QueryFilter subclass to pick the data from the Memtable and SSTables that we are looking for. The Memtable read is straightforward. The SSTable read is a little different depending on which kind of request it is:
- If we are reading a slice of columns, we use the row-level column index to find where to start reading, and deserialize block-at-a-time (where "block" is the group of columns covered by a single index entry) so we can handle the "reversed" case without reading vast amounts into memory
- If we are reading a group of columns by name, we still use the column index to locate each column, but first we check the row-level bloom filter to see if we need to do anything at all
- The column readers provide an Iterator interface, so the filter can easily stop when it's done, without reading more columns than necessary
Since we need to potentially merge columns from multiple SSTable versions, the reader iterators are combined through a ReducingIterator, which takes an iterator of uncombined columns as input, and yields combined versions as output
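The reducing step can be sketched with a lazy k-way merge followed by grouping. The tuple layout and the rule that the highest timestamp wins are assumptions made for the example; the function name is hypothetical and this is not the ReducingIterator class itself.

```python
import heapq
from itertools import groupby

def reduce_columns(*column_iterators):
    """Each input yields (name, timestamp, value) tuples sorted by name.
    Yields one combined column per name, lazily, so a caller can stop early."""
    merged = heapq.merge(*column_iterators, key=lambda column: column[0])
    for _, versions in groupby(merged, key=lambda column: column[0]):
        # Assumed reconciliation rule: the newest version wins.
        yield max(versions, key=lambda column: column[1])

memtable = [("a", 5, "new"), ("c", 1, "c1")]
sstable = [("a", 2, "old"), ("b", 3, "b1")]
print(list(reduce_columns(iter(memtable), iter(sstable))))
# [('a', 5, 'new'), ('b', 3, 'b1'), ('c', 1, 'c1')]
```

Because both the merge and the grouping are lazy, a filter that only needs the first few columns never pulls the rest from the underlying readers.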
In addition:
At any point if a message is destined for the local node, the appropriate piece of work (data read or digest read) is directly submitted to the appropriate local stage (see StageManager) rather than going through messaging over the network.
- The fact that a data read is only submitted to the closest replica is intended as an optimization to avoid sending excessive amounts of data over the network. A digest read will take the full cost of a read internally on the node (CPU and in particular disk), but will avoid taxing the network.
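The trade-off can be seen in a small sketch: the digest node does the full local read but ships only a fixed-size hash, which the coordinator compares with a hash of the data response. The choice of SHA-256, the serialization via repr, and the function names are assumptions for illustration only.

```python
import hashlib

def digest(columns):
    """Hash a row given as a list of (name, timestamp, value) tuples."""
    h = hashlib.sha256()  # hash function chosen for the sketch
    for column in columns:
        h.update(repr(column).encode("utf-8"))
    return h.hexdigest()

def digests_match(data_response, digest_responses):
    """True if every digest agrees with the row returned by the data read."""
    expected = digest(data_response)
    return all(d == expected for d in digest_responses)

row = [("a", 1, "x")]
stale = [("a", 0, "w")]
consistent = digests_match(row, [digest(row)])
mismatch = not digests_match(row, [digest(stale)])
```

A mismatch tells the coordinator that the replicas disagree, without any replica other than the closest one having sent its data over the network.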
Deletes
Gossip
Based on "Efficient reconciliation and flow control for anti-entropy protocols": http://www.cs.cornell.edu/home/rvr/papers/flowgossip.pdf
See ArchitectureGossip for more details
Failure detection
Based on "The Phi accrual failure detector": http://vsedach.googlepages.com/HDY04.pdf
Further reading
The idea of dividing work into "stages" with separate thread pools comes from the famous SEDA paper: http://www.eecs.harvard.edu/~mdw/papers/seda-sosp01.pdf
Crash-only design is another broadly applied principle. Valerie Henson's LWN article is a good introduction.
Cassandra's distribution is closely related to the one presented in Amazon's Dynamo paper. Read repair, adjustable consistency levels, hinted handoff, and other concepts are discussed there. This is required background material: http://www.allthingsdistributed.com/2007/10/amazons_dynamo.html. The related article on eventual consistency is also relevant. Jeff Darcy's article on Availability and Partition Tolerance explains the underlying principle of CAP better than most.
Cassandra's on-disk storage model is loosely based on sections 5.3 and 5.4 of the Bigtable paper.
Facebook's Cassandra team authored a paper on Cassandra for LADIS 09: http://www.cs.cornell.edu/projects/ladis2009/papers/lakshman-ladis2009.pdf. Most of the information there is applicable to Apache Cassandra (the main exception is the integration of ZooKeeper).