Differences between revisions 107 and 108
Revision 107 as of 2013-03-08 23:25:05
Size: 26799
Comment: cleanup
Revision 108 as of 2013-07-13 00:37:59
Size: 26799
Editor: DaveBrosius
Comment: AudoBootstrap is auto_bootstrap


See CassandraHardware


See PerformanceTuning

Schema management

Server clocks should be synchronized with something like NTP. Otherwise, schema changes may be rejected as being obsolete.

See LiveSchemaUpdates [refers to functionality in 0.7]

Ring management

Each Cassandra server [node] is assigned a unique Token that determines what keys it is the first replica for. If you sort all nodes' Tokens, the Range of keys each is responsible for is (PreviousToken, MyToken], that is, from the previous token (exclusive) to the node's token (inclusive). The machine with the lowest Token gets both all keys less than that token, and all keys greater than the largest Token; this is called a "wrapping Range."

(Note that there is nothing special about being the "primary" replica, in the sense of being a point of failure.)

When the RandomPartitioner is used, Tokens are integers from 0 to 2**127. Keys are converted to this range by MD5 hashing for comparison with Tokens. (Thus, keys are always convertible to Tokens, but the reverse is not always true.)
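As a rough illustration of the range rule above, the following sketch maps a key to a token and finds the node whose range (PreviousToken, MyToken] contains it, including the wrapping Range. The helper names key_to_token and primary_replica are hypothetical, and the MD5-to-integer conversion is a simplification that is not guaranteed to match the exact conversion RandomPartitioner performs.

```python
import bisect
import hashlib

TOKEN_SPACE = 2 ** 127


def key_to_token(key: bytes) -> int:
    """Hash a key into the token space.

    Simplified assumption: the real partitioner's conversion may differ in detail.
    """
    digest = hashlib.md5(key).digest()
    return int.from_bytes(digest, "big") % TOKEN_SPACE


def primary_replica(token: int, node_tokens: list) -> int:
    """Return the token of the node whose range (PreviousToken, MyToken] holds `token`."""
    ring = sorted(node_tokens)
    # First node token that is >= the key's token (upper bound is inclusive).
    idx = bisect.bisect_left(ring, token)
    # Past the largest token we wrap around to the node with the lowest token.
    return ring[idx % len(ring)]
```

For a ring with tokens 0, 10 and 20, a key hashing to 5 or 10 belongs to the node at 10, while a key hashing to 25 wraps around to the node at 0.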

Token selection

Using a strong hash function means RandomPartitioner keys will, on average, be evenly spread across the Token space, but you can still have imbalances if your Tokens do not divide up the range evenly, so you should specify InitialToken to your first nodes as i * (2**127 / N) for i = 0 .. N-1. In Cassandra 0.7, you should specify initial_token in cassandra.yaml.

With NetworkTopologyStrategy, you should calculate the tokens for the nodes in each DC independently. Tokens still need to be unique, so you can add 1 to the tokens in the 2nd DC, add 2 in the 3rd, and so on. Thus, for a 4-node cluster in 2 datacenters, you would have

node 1 = 0
node 2 = 85070591730234615865843651857942052864

node 3 = 1
node 4 = 85070591730234615865843651857942052865
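The per-datacenter calculation above can be sketched as follows. The function name dc_tokens and its parameters are hypothetical; it simply applies i * (2**127 / N) within one datacenter and adds the datacenter's index as the uniqueness offset.

```python
def dc_tokens(nodes_in_dc: int, dc_index: int) -> list:
    """Evenly spaced tokens for one datacenter, offset by the datacenter's index.

    dc_index is 0 for the first DC, 1 for the second, and so on.
    """
    step = 2 ** 127 // nodes_in_dc
    return [i * step + dc_index for i in range(nodes_in_dc)]
```

Calling dc_tokens(2, 0) and dc_tokens(2, 1) gives the four tokens listed above.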

If you happen to have the same number of nodes in each data center, you can also alternate data centers when assigning tokens:

[DC1] node 1 = 0
[DC2] node 2 = 42535295865117307932921825928971026432
[DC1] node 3 = 85070591730234615865843651857942052864
[DC2] node 4 = 127605887595351923798765477786913079296
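The alternating layout can be generated the same way. This is a minimal sketch assuming every datacenter has the same number of nodes; alternating_tokens is a hypothetical helper name.

```python
def alternating_tokens(datacenters: list, nodes_per_dc: int) -> list:
    """Spread tokens evenly over the whole ring, cycling through datacenters."""
    total = len(datacenters) * nodes_per_dc
    step = 2 ** 127 // total
    return [(datacenters[i % len(datacenters)], i * step) for i in range(total)]
```

With two datacenters of two nodes each, alternating_tokens(["DC1", "DC2"], 2) reproduces the assignment shown above.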

With order preserving partitioners, your key distribution will be application-dependent. You should still take your best guess at specifying initial tokens (guided by sampling actual data, if possible), but you will be more dependent on active load balancing (see below) and/or adding new nodes to hot spots.

Once data is placed on the cluster, the partitioner may not be changed without wiping and starting over.


A Cassandra cluster always divides up the key space into ranges delimited by Tokens as described above, but additional replica placement is customizable via IReplicaPlacementStrategy in the configuration file. The standard strategies are

  • RackUnawareStrategy: replicas are always placed on the next (in increasing Token order) N-1 nodes along the ring

  • RackAwareStrategy: replica 2 is placed in the first node along the ring that belongs in another data center than the first; the remaining N-2 replicas, if any, are placed on the first nodes along the ring in the same rack as the first

Note that with RackAwareStrategy, succeeding nodes along the ring should alternate data centers to avoid hot spots. For instance, if you have nodes A, B, C, and D in increasing Token order, and instead of alternating you place A and B in DC1, and C and D in DC2, then nodes C and A will have disproportionately more data on them because they will be the replica destination for every Token range in the other data center.

  • The corollary to this is, if you want to start with a single DC and add another later, when you add the second DC you should add as many nodes as you have in the first rather than adding a node or two at a time gradually.
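The hot-spot effect described above can be reproduced with a small simulation. This is only a sketch of the "first node along the ring in another data center" rule for the second replica, not Cassandra's strategy code; second_replica_targets is a hypothetical name.

```python
from collections import Counter


def second_replica_targets(ring: list) -> Counter:
    """Count how many ranges pick each node as their second replica.

    `ring` is a list of (node, datacenter) pairs in increasing token order.
    For each node's range, the second replica goes to the first following
    node (wrapping around) that is in a different datacenter.
    """
    counts = Counter()
    n = len(ring)
    for i, (_, dc) in enumerate(ring):
        for step in range(1, n):
            other, other_dc = ring[(i + step) % n]
            if other_dc != dc:
                counts[other] += 1
                break
    return counts
```

With A and B in DC1 followed by C and D in DC2, nodes C and A each receive the second replica for two ranges while B and D receive none. With alternating datacenters, every node receives exactly one.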

Replication factor is not really intended to be changed in a live cluster either, but increasing it is conceptually simple: update the replication_factor from the CLI (see below), then run repair against each node in your cluster so that all the new replicas that are supposed to have the data, actually do.

Until repair is finished, you have 3 options:

  • read at ConsistencyLevel.QUORUM or ALL (depending on your existing replication factor) to make sure that a replica that actually has the data is consulted

  • continue reading at lower CL, accepting that some requests will fail (usually only the first for a given query, if ReadRepair is enabled)

  • take downtime while repair runs

The same options apply to changing replication strategy.

Reducing replication factor is easily done and only requires running cleanup afterwards to remove extra replicas.

To update the replication factor on a live cluster, forget about cassandra.yaml. Rather you want to use cassandra-cli:

  • update keyspace Keyspace1 with strategy_options = {replication_factor:3};

Network topology

Besides datacenters, you can also tell Cassandra which nodes are in the same rack within a datacenter. Cassandra will use this to route both reads and data movement for Range changes to the nearest replicas. This is configured by a user-pluggable EndpointSnitch class in the configuration file.

EndpointSnitch is related to, but distinct from, replication strategy itself: RackAwareStrategy needs a properly configured Snitch to place replicas correctly, but even absent a Strategy that cares about datacenters, the rest of Cassandra will still be location-sensitive.

There is an example of a custom Snitch implementation in http://svn.apache.org/repos/asf/cassandra/tags/cassandra-0.6.1/contrib/property_snitch/.

Range changes


Adding new nodes is called "bootstrapping."

To bootstrap a node, turn auto_bootstrap on in the configuration file, and start it.

If you explicitly specify an InitialToken in the configuration, the new node will bootstrap to that position on the ring. Otherwise, it will pick a Token that will give it half the keys from the node with the most disk space used, that does not already have another node bootstrapping into its Range. If you wish to enable vnodes, do not set the InitialToken, but set the num_tokens parameter. 256 is the recommended setting.

Important things to note:

  1. You should wait long enough for all the nodes in your cluster to become aware of the bootstrapping node via gossip before starting another bootstrap. The new node will log "Bootstrapping" when this is safe, 2 minutes after starting. (90s to make sure it has accurate load information, and 30s waiting for other nodes to start sending it inserts happening in its to-be-assumed part of the token ring.)
  2. Relating to point 1, one can only bootstrap N nodes at a time with automatic non-vnode token picking, where N is the size of the existing cluster. If you need to more than double the size of your cluster, you have to wait for the first N nodes to finish until your cluster is size 2N before bootstrapping more nodes. So if your current cluster is 5 nodes and you want to add 7 nodes, bootstrap 5 and let those finish before bootstrapping the last two.
  3. As a safety measure, Cassandra does not automatically remove data from nodes that "lose" part of their Token Range to a newly added node. Run nodetool cleanup on the source node(s) (neighboring nodes that shared the same subrange) when you are satisfied the new node is up and working. If you do not do this the old data will still be counted against the load on that node.
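The doubling limit in point 2 can be turned into a simple batch plan. The following is a sketch under the assumption that each batch may be at most as large as the cluster at the time the batch starts; bootstrap_batches is a hypothetical helper, not a Cassandra tool.

```python
def bootstrap_batches(current_size: int, nodes_to_add: int) -> list:
    """Split new nodes into batches that never exceed the current cluster size."""
    if current_size < 1:
        raise ValueError("need at least one existing node")
    batches = []
    while nodes_to_add > 0:
        batch = min(current_size, nodes_to_add)
        batches.append(batch)
        current_size += batch  # the cluster has grown once the batch finishes
        nodes_to_add -= batch
    return batches
```

For the example in point 2, bootstrap_batches(5, 7) returns [5, 2].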

Cassandra is smart enough to transfer data from the nearest source node(s), if your EndpointSnitch is configured correctly. So, the new node doesn't need to be in the same datacenter as the primary replica for the Range it is bootstrapping into, as long as another replica is in the datacenter with the new one.

Bootstrap progress can be monitored using nodetool with the netstats argument.

During bootstrap nodetool may report that the new node is neither receiving nor sending any streams, in which case it may be building secondary indexes, visible in compactionstats.

Moving or Removing nodes

Removing nodes entirely

You can take a node out of the cluster with nodetool decommission to a live node, or nodetool removetoken (to any other machine) to remove a dead one. This will assign the ranges the old node was responsible for to other nodes, and replicate the appropriate data there. If decommission is used, the data will stream from the decommissioned node. If removetoken is used, the data will stream from the remaining replicas.

No data is removed automatically from the node being decommissioned, so if you want to put the node back into service at a different token on the ring, the old data should be removed manually.

Moving nodes

nodetool move: move the target node to a given Token. Moving is both a convenience over and more efficient than decommission + bootstrap. After moving a node, nodetool cleanup should be run to remove any unnecessary data.

As with bootstrap, see Streaming for how to monitor progress.

Load balancing

If you add nodes to your cluster, your ring will be unbalanced, and the only way to get perfect balance is to compute new tokens for every node and assign them to each node manually using the nodetool move command.

Here's a Python function which can be used to calculate new tokens for the nodes. There's more info on the subject in Ben Black's presentation at Cassandra Summit 2010: http://www.datastax.com/blog/slides-and-videos-cassandra-summit-2010

    def tokens(nodes):
        # Print one evenly spaced token per node (Python 3 syntax).
        for x in range(nodes):
            print(2 ** 127 // nodes * x)
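To decide which nodes actually need to move, the evenly spaced tokens can be compared with the tokens the nodes currently hold. This is a minimal sketch; rebalance_plan and the host names used with it are hypothetical, and it assumes nodes keep their relative order on the ring.

```python
def rebalance_plan(current: dict) -> list:
    """Return (host, new_token) pairs for hosts whose token must change.

    `current` maps each host to the token it holds now.
    """
    hosts = sorted(current, key=current.get)  # ring order by current token
    step = 2 ** 127 // len(hosts)
    plan = []
    for i, host in enumerate(hosts):
        target = i * step
        if current[host] != target:
            plan.append((host, target))
    return plan
```

Each entry in the returned plan corresponds to one nodetool move against that host, followed by nodetool cleanup once the move has finished.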

In versions of Cassandra 0.7.* and lower, there's also nodetool loadbalance: essentially a convenience over decommission + bootstrap, only instead of telling the target node where to move on the ring it will choose its location based on the same heuristic as Token selection on bootstrap. You should not use this as it doesn't rebalance the entire ring.

The status of move and balancing operations can be monitored using nodetool with the netstats argument. (Cassandra 0.6.* and lower use the streams argument.)

Replacing a Dead Node

Since Cassandra 1.0, a dead node can be replaced with a new one using the property "cassandra.replace_token=<Token>". This property can be set with the -D option when starting the Cassandra daemon process.

(Note: This property takes effect only when the node has no data in it. You might want to empty the data directory if you want to force the node replacement.)

(Note: This property also does not take effect if the node is listed as a seed node in its local cassandra.yaml. Even if the node has no data, it will join the ring immediately and start serving reads and writes right away, a situation you want to avoid.)

Use this property only when replacing a dead node; if you try to replace an existing live node, the bootstrapping node will throw an exception. The token supplied via this property must be part of the ring, and the node that owned it must be dead.

Once this property is enabled, the node starts in a hibernate state, during which all the other nodes will see it as down. The new node then bootstraps its data from the rest of the nodes in the cluster. (The main difference from normal bootstrapping of a new node is that this node will not accept any writes during this phase.) Once bootstrapping is complete, the node is marked "UP". Hinted handoff is relied on to make the node consistent, since it has not accepted writes since the start of the bootstrap.

Note: We strongly suggest repairing the node once the bootstrap is completed, because hinted handoff is "best effort and not a guarantee".


Cassandra allows clients to specify the desired consistency level on reads and writes. (See API.) If R + W > N, where R, W, and N are respectively the read replica count, the write replica count, and the replication factor, all client reads will see the most recent write. Otherwise, readers may see older versions, for periods of typically a few ms; this is called "eventual consistency." See http://www.allthingsdistributed.com/2008/12/eventually_consistent.html and http://queue.acm.org/detail.cfm?id=1466448 for more.
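The R + W > N condition is easy to check for a given combination of consistency levels. The sketch below uses hypothetical helper names and treats QUORUM as a strict majority of the replication factor.

```python
def quorum(replication_factor: int) -> int:
    """Number of replicas that make up a strict majority."""
    return replication_factor // 2 + 1


def reads_see_latest_write(r: int, w: int, n: int) -> bool:
    """True when the read and write replica sets are guaranteed to overlap."""
    return r + w > n
```

With a replication factor of 3, reading and writing at QUORUM (2 + 2 > 3) satisfies the condition, while reading and writing at ONE (1 + 1 > 3 is false) does not.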

See below about consistent backups.

Repairing missing or inconsistent data

Cassandra repairs missing data in three ways:

  1. HintedHandoff: when a replica does not acknowledge an update, the coordinator will store the update and replay it later.

  2. ReadRepair: when a read is performed, Cassandra compares the versions at each replica (in the background, if a low consistency was requested by the reader to minimize latency), and the newest version is sent to any out-of-date replicas. By default, Cassandra does this with 10% of all requests; this can be configured per-columnfamily with read_repair_chance and dclocal_read_repair_chance.

  3. AntiEntropy: when nodetool repair is run, Cassandra computes a Merkle tree for each range of data on that node, and compares it with the versions on other replicas, to catch any out of sync data that hasn't been read recently. This is intended to be run gradually (e.g., continuously over a period of about a week) since computing the Merkle tree is relatively expensive in disk i/o and CPU, since it scans ALL the data on the machine (but it is very network efficient).

It is safe to run repair against multiple machines at the same time, but to minimize the impact on your application workload it is recommended to wait for it to complete on one node before invoking it against the next. Using the --partitioner-range ("partitioner range") option to repair will repair only the range assigned to each node by the partitioner -- i.e., not additional ranges added by the replication strategy. This is recommended for continuous repair, since it does no redundant work when performed against different nodes.

Repair shares the compaction_throughput_mb_per_sec limit with compaction -- that is, repair's scanning and compaction together will not exceed that limit. This keeps repair from negatively impacting your application workload. If you need to get a repair done quickly, you can still minimize the impact on your cluster by using the --with-snapshot option, which will cause repair to take a snapshot and then have one pair of replicas compare Merkle trees at a time. Thus, if you have three replicas, you will always leave one completely unencumbered by repair.

Frequency of nodetool repair

Unless your application performs no deletes, it is strongly recommended that production clusters run nodetool repair periodically on all nodes in the cluster. The hard requirement for repair frequency is the value used for GCGraceSeconds (see DistributedDeletes). Running nodetool repair often enough to guarantee that all nodes have performed a repair in a given period GCGraceSeconds long, ensures that deletes are not "forgotten" in the cluster.
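A scheduling check for this requirement might look like the sketch below. It assumes you record the time of each node's last completed repair yourself (Cassandra is not assumed to expose this), uses timestamps in seconds, and defaults to a GCGraceSeconds of 10 days. The name overdue_for_repair is hypothetical.

```python
TEN_DAYS = 10 * 24 * 60 * 60  # 864000 seconds


def overdue_for_repair(last_repair: dict, now: float,
                       gc_grace_seconds: int = TEN_DAYS) -> list:
    """Return nodes whose last completed repair is GCGraceSeconds old or older.

    `last_repair` maps a node name to the timestamp of its last repair.
    """
    return sorted(
        node for node, finished_at in last_repair.items()
        if now - finished_at >= gc_grace_seconds
    )
```

In practice you would want to alert well before a node reaches the limit, for example by passing a smaller threshold than the configured GCGraceSeconds.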

*IF* your operations team is sufficiently on the ball, you can get by without repair as long as you do not have hardware failure -- in that case, HintedHandoff is adequate to repair successful updates that some replicas have missed. Hinted handoff is active for max_hint_window_in_ms after a replica fails.

Full repair or re-bootstrap is necessary to re-replicate data lost to hardware failure (see below).

Dealing with the consequences of nodetool repair not running within GCGraceSeconds

If nodetool repair has not been run often enough, to the point that GCGraceSeconds has passed, you risk forgotten deletes (see DistributedDeletes). In addition to deleted data popping back up, you may see inconsistencies in data returned from different nodes that will not self-heal by read-repair or further nodetool repair. Some further details on this latter effect are documented in CASSANDRA-1316.

There are at least three ways to deal with this scenario.

  1. Treat the node in question as failed, and replace it as described further below.
  2. To minimize the number of forgotten deletes, first increase GCGraceSeconds for all Column Families via the CLI or your client, perform a full repair on all nodes, and then change GCGraceSeconds back again. This has the advantage of ensuring tombstones spread as much as possible, minimizing the amount of data that may "pop back up" (forgotten delete).
  3. Yet another option, which will result in more forgotten deletes than the previous suggestion but is easier to do, is to ensure 'nodetool repair' has been run on all nodes, and then perform a compaction to expire tombstones. Following this, read-repair and regular nodetool repair should cause the cluster to converge.

Handling failure

If a node fails and subsequently recovers, the ordinary repair mechanisms will be adequate to deal with any inconsistent data. Remember though that if a node misses updates and is not repaired for longer than your configured GCGraceSeconds (default: 10 days), it could have missed remove operations permanently. Unless your application performs no removes, you should wipe its data directory, re-bootstrap it, and removetoken its old entry in the ring (see below).

If a node goes down entirely, then you have two options:

  1. (Recommended approach) Bring up the replacement node with a new IP address, with the initial token set to (failure node's token) - 1 and auto_bootstrap set to true in cassandra.yaml. This places the replacement node in front of the failure node, and the bootstrap process then begins. The node will not receive reads until bootstrap has finished. Once bootstrap has finished on the replacement node, run nodetool removetoken once, supplying the token of the dead node, and then run nodetool cleanup on each node. You can obtain the dead node's token by running nodetool ring on any live node, unless there was some kind of outage and the other nodes came back up but the dead one did not; in that case, you can retrieve the token from the live nodes' system tables.

  2. (Alternative approach) Bring up a replacement node with the same IP and token as the old, and run nodetool repair. Until the repair process is complete, clients reading only from this node may get no data back. Using a higher ConsistencyLevel on reads will avoid this.

The reason why you run nodetool cleanup on all live nodes is to remove old Hinted Handoff writes stored for the dead node.

Backing up data

Cassandra can snapshot data while online using nodetool snapshot. You can then back up those snapshots using any desired system, although leaving them where they are is probably the option that makes the most sense on large clusters. nodetool snapshot triggers a node-wide flush, so all data written before the execution of the snapshot command is contained within the snapshot.

With some combinations of operating system/jvm you may receive an error related to the inability to create a process during the snapshotting, such as this on Linux

Exception in thread "main" java.io.IOException: Cannot run program "ln": java.io.IOException: error=12, Cannot allocate memory

This is caused by the operating system trying to allocate the child "ln" process a memory space as large as the parent process (the cassandra server), even though it's not going to use it. So if you have a machine with 8GB of RAM and no swap, and you gave 6GB to the cassandra server, it will fail during this because the operating system wants 12 GB of virtual memory before allowing you to create the process.

This error can be worked around by any of the following:

  • dropping the jna.jar file into Cassandra's lib folder (requires at least Cassandra 0.6.6)

  • creating a swap file, snapshotting, removing swap file

  • turning on "memory overcommit"

To restore a snapshot:

  1. shut down the node
  2. clear out the old commitlog and sstables
  3. move the sstables from the snapshot location to the live data directory.

Consistent backups

You can get an eventually consistent backup by snapshotting all nodes; no individual node's backup is guaranteed to be consistent, but if you restore from that snapshot then clients will get eventually consistent behavior as usual.

There is no such thing as a consistent view of the data in the strict sense, except in the trivial case of writes with consistency level = ALL.

Import / export

As an alternative to taking snapshots it's possible to export SSTables to JSON format using the bin/sstable2json command:

Usage: sstable2json <sstable> [-k key [-k key [...]]]

bin/sstable2json accepts as a required argument the full path to an SSTable data file (files ending in -Data.db). You can optionally pass the names of specific keys using the -k argument to limit what is exported. Output goes to stdout.

Note: If you are not running the exporter on in-place SSTables, there are a couple of things to keep in mind.

  1. The corresponding configuration must be present (same as it would be to run a node).
  2. SSTables are expected to be in a directory named for the keyspace (same as they would be on a production node).

JSON exported SSTables can be "imported" to create new SSTables using bin/json2sstable:

Usage: json2sstable -K keyspace -c column_family <json> <sstable>

bin/json2sstable takes arguments for keyspace and column family names, and full paths for the JSON input file and the destination SSTable file name.

You can also import pre-serialized rows of data using the BinaryMemtable interface. This is useful for importing via Hadoop or another source where you want to do some preprocessing of the data to import.

NOTE: Starting with version 0.7, json2sstable and sstable2json must be run in such a way that the schema can be loaded from system tables. This means that cassandra.yaml must be found in the classpath and refer to valid storage directories.


Running nodetool cfstats can provide an overview of each Column Family, and important metrics to graph your cluster. Cassandra also exposes internal metrics as JMX data. This is a common standard in the JVM world; OpenNMS, Nagios, and Munin at least offer some level of JMX support. For a non-stupid JMX plugin for Munin, check out https://github.com/tcurdt/jmx2munin. The specifics of the JMX Interface are documented at JmxInterface.

For those who prefer to work with non-JMX clients, there is a JMX-to-REST bridge available at http://code.google.com/p/polarrose-jmx-rest-bridge/. Bridging to SNMP is a bit more work but can be done with https://github.com/tcurdt/jmx2snmp.

Important metrics to watch on a per-Column Family basis would be: Read Count, Read Latency, Write Count and Write Latency. Pending Tasks tell you if things are backing up. These metrics can also be exposed using any JMX client such as jconsole. (See also http://simplygenius.com/2010/08/jconsole-via-socks-ssh-tunnel.html for how to proxy JConsole to firewalled machines.)

You can also use jconsole, and the MBeans tab to look at PendingTasks for thread pools. If you see one particular thread backing up, this can give you an indication of a problem. One example would be ROW-MUTATION-STAGE indicating that write requests are arriving faster than they can be handled. A more subtle example is the FLUSH stages: if these start backing up, cassandra is accepting writes into memory fast enough, but the sort-and-write-to-disk stages are falling behind.

If you are seeing a lot of tasks being built up, your hardware or configuration tuning is probably the bottleneck.

Running nodetool tpstats will dump all of those threads to console if you don't want to use jconsole. Example:

Pool Name                    Active   Pending      Completed   Blocked  All time blocked
ReadStage                         0         0              0         0                 0
RequestResponseStage              0         0              0         0                 0
MutationStage                     0         0          12024         0                 0
ReadRepairStage                   0         0              0         0                 0
ReplicateOnWriteStage             0         0              0         0                 0
GossipStage                       0         0              0         0                 0
AntiEntropyStage                  0         0              0         0                 0
MigrationStage                    0         0              0         0                 0
MemtablePostFlusher               0         0              2         0                 0
StreamStage                       0         0              0         0                 0
FlushWriter                       0         0              2         0                 0
MiscStage                         0         0              0         0                 0
AntiEntropySessions               0         0              2         0                 0
InternalResponseStage             0         0              0         0                 0
HintedHandoff                     0         0              0         0                 0

Message type           Dropped
RANGE_SLICE                  0
READ_REPAIR                  0
BINARY                       0
READ                         0
MUTATION                     0
REQUEST_RESPONSE             0
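If you collect this output regularly, the thread pool table can be parsed with a few lines of code. This sketch assumes the six-column layout shown above (the format may differ between versions); parse_tpstats and backed_up_pools are hypothetical helper names.

```python
def parse_tpstats(text: str) -> dict:
    """Parse the thread pool table of `nodetool tpstats` into a dict per pool."""
    pools = {}
    for line in text.splitlines():
        parts = line.split()
        # Pool rows have a name followed by exactly five integer columns;
        # the header and the dropped-message table do not match and are skipped.
        if len(parts) == 6 and all(p.isdigit() for p in parts[1:]):
            active, pending, completed, blocked, all_time_blocked = map(int, parts[1:])
            pools[parts[0]] = {
                "active": active,
                "pending": pending,
                "completed": completed,
                "blocked": blocked,
                "all_time_blocked": all_time_blocked,
            }
    return pools


def backed_up_pools(pools: dict, max_pending: int = 0) -> list:
    """Return the names of pools with more pending tasks than the threshold."""
    return sorted(name for name, s in pools.items() if s["pending"] > max_pending)
```

A pool that stays in the result of backed_up_pools over several samples is a candidate for the bottleneck analysis described above.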

Monitoring with MX4J

mx4j provides an HTML and HTTP interface to JMX. Starting from version 0.7.0 cassandra lets you hook up mx4j very easily. To enable mx4j on a Cassandra node:

  • Download mx4j-tools.jar from http://mx4j.sourceforge.net/

  • Add mx4j-tools.jar to the classpath (e.g. under lib/)
  • Start cassandra
  • In the log you should see a message such as
    • HttpAdaptor version 3.0.2 started on port 8081
  • To choose a different port (8081 is the default) or a different listen address ( is not the default) edit conf/cassandra-env.sh and uncomment #MX4J_ADDRESS="-Dmx4jaddress=" and #MX4J_PORT="-Dmx4jport=8081"

Now browse to http://cassandra:8081/ and use the HTML interface.

If you want XML then add &template=identity to the end of any URL, e.g. http://cassandra:8081/?&template=identity

For more details see https://issues.apache.org/jira/browse/CASSANDRA-1068

Operations (last edited 2016-08-15 15:43:57 by JonathanEllis)