conf/README.txt Required configuration files ============================
cassandra.yaml: main Cassandra configuration file log4j-server.properties: log4j configuration file for Cassandra server
Optional configuration files ============================
access.properties: used for authorization passwd.properties: used for authentication cassandra-rack.properties: used by PropertyFileSnitch
# Licensed to the Apache Software Foundation (ASF) under one # or more contributor license agreements. See the NOTICE file # distributed with this work for additional information # regarding copyright ownership. The ASF licenses this file # to you under the Apache License, Version 2.0 (the # "License"); you may not use this file except in compliance # with the License. You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License.
# Derive default JVM heap settings from the host's memory size and CPU count.
#
# Arguments: none
# Globals written:
#   system_memory_in_mb  - detected physical memory in MB (default 2048)
#   system_cpu_cores     - detected number of CPU cores (default 2)
#   MAX_HEAP_SIZE        - half of system memory, e.g. "1024M"
#   HEAP_NEWSIZE         - young generation size, e.g. "200M":
#                          min(100 MB * cores, 1/4 of the heap)
#
# Detection is per-OS (Linux, FreeBSD, SunOS). Any other OS, or a probe that
# returns nothing numeric, falls back to the defaults above.
calculate_heap_sizes()
{
    case "$(uname)" in
        Linux)
            system_memory_in_mb=$(free -m | awk '/Mem:/ {print $2}')
            system_cpu_cores=$(grep -E -c 'processor([[:space:]]+):.*' /proc/cpuinfo)
        ;;
        FreeBSD)
            system_memory_in_bytes=$(sysctl hw.physmem | awk '{print $2}')
            # Only do arithmetic on a clean number; anything else is left
            # empty so the fallback below applies.
            case "$system_memory_in_bytes" in
                ''|*[!0-9]*) system_memory_in_mb="" ;;
                *) system_memory_in_mb=$((system_memory_in_bytes / 1024 / 1024)) ;;
            esac
            system_cpu_cores=$(sysctl hw.ncpu | awk '{print $2}')
        ;;
        SunOS)
            system_memory_in_mb=$(prtconf | awk '/Memory size:/ {print $3}')
            # awk strips the padding some wc implementations put around the count
            system_cpu_cores=$(psrinfo | wc -l | awk '{print $1}')
        ;;
        *)
            # assume reasonable defaults for e.g. a modern desktop or
            # cheap server
            system_memory_in_mb="2048"
            system_cpu_cores="2"
        ;;
    esac

    # A missing tool or unexpected output must not break the arithmetic
    # below: fall back to the same defaults as an unknown OS.
    case "$system_memory_in_mb" in
        ''|0|*[!0-9]*) system_memory_in_mb="2048" ;;
    esac
    case "$system_cpu_cores" in
        ''|0|*[!0-9]*) system_cpu_cores="2" ;;
    esac

    max_heap_size_in_mb=$((system_memory_in_mb / 2))
    MAX_HEAP_SIZE="${max_heap_size_in_mb}M"

    # Young gen: min(max_sensible_per_modern_cpu_core * num_cores, 1/4 * heap size)
    max_sensible_yg_per_core_in_mb="100"
    max_sensible_yg_in_mb=$((max_sensible_yg_per_core_in_mb * system_cpu_cores))

    desired_yg_in_mb=$((max_heap_size_in_mb / 4))

    if [ "$desired_yg_in_mb" -gt "$max_sensible_yg_in_mb" ]
    then
        HEAP_NEWSIZE="${max_sensible_yg_in_mb}M"
    else
        HEAP_NEWSIZE="${desired_yg_in_mb}M"
    fi
}
# Override these to set the amount of memory to allocate to the JVM at # start-up. For production use you almost certainly want to adjust # this for your environment. MAX_HEAP_SIZE is the total amount of # memory dedicated to the Java heap; HEAP_NEWSIZE refers to the size # of the young generation. Both MAX_HEAP_SIZE and HEAP_NEWSIZE should # be either set or not (if you set one, set the other). # # The main trade-off for the young generation is that the larger it # is, the longer GC pause times will be. The shorter it is, the more # expensive GC will be (usually). # # The example HEAP_NEWSIZE assumes a modern 8-core+ machine for decent pause # times. If in doubt, and if you do not particularly want to tweak, go with # 100 MB per physical CPU core.
#MAX_HEAP_SIZE="4G" #HEAP_NEWSIZE="800M"
# MAX_HEAP_SIZE and HEAP_NEWSIZE must be configured as a pair:
#   - neither set  -> compute both from the host via calculate_heap_sizes
#   - exactly one  -> refuse to start, with a message on stderr
#   - both set     -> use them as given
if [ -z "$MAX_HEAP_SIZE" ] && [ -z "$HEAP_NEWSIZE" ]; then
    calculate_heap_sizes
elif [ -z "$MAX_HEAP_SIZE" ] || [ -z "$HEAP_NEWSIZE" ]; then
    echo "please set or unset MAX_HEAP_SIZE and HEAP_NEWSIZE in pairs (see cassandra-env.sh)" >&2
    exit 1
fi
# Specifies the default port over which Cassandra will be available for
# JMX connections.
JMX_PORT="7199"

# Here we create the arguments that will get passed to the jvm when
# starting cassandra.

# enable assertions.  disabling this in production will give a modest
# performance benefit (around 5%).
JVM_OPTS="$JVM_OPTS -ea"

# add the jamm javaagent, unless the second line of `java -version`
# identifies the runtime as OpenJDK
check_openjdk=$("${JAVA:-java}" -version 2>&1 | awk '{if (NR == 2) {print $1}}')
if [ "$check_openjdk" != "OpenJDK" ]
then
    JVM_OPTS="$JVM_OPTS -javaagent:$CASSANDRA_HOME/lib/jamm-0.2.2.jar"
fi

# enable thread priorities, primarily so we can give periodic tasks
# a lower priority to avoid interfering with client workload
JVM_OPTS="$JVM_OPTS -XX:+UseThreadPriorities"
# allows lowering thread priority without being root.  see
# http://tech.stolsvik.com/2010/01/linux-java-thread-priorities-workaround.html
JVM_OPTS="$JVM_OPTS -XX:ThreadPriorityPolicy=42"

# min and max heap sizes should be set to the same value to avoid
# stop-the-world GC pauses during resize, and so that we can lock the
# heap in memory on startup to prevent any of it from being swapped
# out.
JVM_OPTS="$JVM_OPTS -Xms${MAX_HEAP_SIZE}"
JVM_OPTS="$JVM_OPTS -Xmx${MAX_HEAP_SIZE}"
JVM_OPTS="$JVM_OPTS -Xmn${HEAP_NEWSIZE}"
JVM_OPTS="$JVM_OPTS -XX:+HeapDumpOnOutOfMemoryError"

if [ "$(uname)" = "Linux" ] ; then
    # reduce the per-thread stack size to minimize the impact of Thrift
    # thread-per-client.  (Best practice is for client connections to
    # be pooled anyway.) Only do so on Linux where it is known to be
    # supported.
    JVM_OPTS="$JVM_OPTS -Xss128k"
fi

# GC tuning options
JVM_OPTS="$JVM_OPTS -XX:+UseParNewGC"
JVM_OPTS="$JVM_OPTS -XX:+UseConcMarkSweepGC"
JVM_OPTS="$JVM_OPTS -XX:+CMSParallelRemarkEnabled"
JVM_OPTS="$JVM_OPTS -XX:SurvivorRatio=8"
JVM_OPTS="$JVM_OPTS -XX:MaxTenuringThreshold=1"
JVM_OPTS="$JVM_OPTS -XX:CMSInitiatingOccupancyFraction=75"
JVM_OPTS="$JVM_OPTS -XX:+UseCMSInitiatingOccupancyOnly"

# GC logging options -- uncomment to enable
# JVM_OPTS="$JVM_OPTS -XX:+PrintGCDetails"
# JVM_OPTS="$JVM_OPTS -XX:+PrintGCTimeStamps"
# JVM_OPTS="$JVM_OPTS -XX:+PrintClassHistogram"
# JVM_OPTS="$JVM_OPTS -XX:+PrintTenuringDistribution"
# JVM_OPTS="$JVM_OPTS -XX:+PrintGCApplicationStoppedTime"
# JVM_OPTS="$JVM_OPTS -Xloggc:/var/log/cassandra/gc-`date +%s`.log"

# uncomment to have Cassandra JVM listen for remote debuggers/profilers on port 1414
# JVM_OPTS="$JVM_OPTS -Xdebug -Xnoagent -Xrunjdwp:transport=dt_socket,server=y,suspend=n,address=1414"

# Prefer binding to IPv4 network interfaces (when net.ipv6.bindv6only=1). See
# http://bugs.sun.com/bugdatabase/view_bug.do?bug_id=6342561 (short version:
# comment out this entry to enable IPv6 support).
JVM_OPTS="$JVM_OPTS -Djava.net.preferIPv4Stack=true"

# jmx: metrics and administration interface
#
# add this if you're having trouble connecting:
# JVM_OPTS="$JVM_OPTS -Djava.rmi.server.hostname=<public name>"
#
# see
# http://blogs.sun.com/jmxetc/entry/troubleshooting_connection_problems_in_jconsole
# for more on configuring JMX through firewalls, etc. (Short version:
# get it working with no firewall first.)
JVM_OPTS="$JVM_OPTS -Dcom.sun.management.jmxremote.port=$JMX_PORT"
JVM_OPTS="$JVM_OPTS -Dcom.sun.management.jmxremote.ssl=false"
JVM_OPTS="$JVM_OPTS -Dcom.sun.management.jmxremote.authenticate=false"
/**
 * Cassandra has a back door called the Binary Memtable. The purpose of this backdoor is to
 * mass import large amounts of data, without using the Thrift interface.
 *
 * Inserting data through the binary memtable allows you to skip the commit log overhead, and an ack
 * from Thrift on every insert. The example below utilizes Hadoop to generate all the data necessary
 * to send to Cassandra, and sends it using the Binary Memtable interface. What Hadoop ends up doing is
 * creating the actual data that gets put into an SSTable as if you were using Thrift. With enough Hadoop nodes
 * inserting the data, the bottleneck at this point should become the network.
 *
 * We recommend adjusting the compaction threshold to 0 while the import is running. After the import, you need
 * to run nodeprobe -host <IP> flush_binary <Keyspace> on every node, as this will flush the remaining data still left
 * in memory to disk. Then it's recommended to adjust the compaction threshold to its original value.
 *
 * The example below is a sample Hadoop job, and it inserts SuperColumns. It can be tweaked to work with normal Columns.
 *
 * You should construct the data you want to import as rows delimited by a new line. You end up grouping by <Key>
 * in the mapper, so that the end result generates the data set into a column oriented subset. Once you get to the
 * reduce aspect, you can generate the ColumnFamilies you want inserted, and send it to your nodes.
 *
 * For Cassandra 0.6.4, we modified this example to wait for acks from all Cassandra nodes for each row
 * before proceeding to the next.  This means to keep Cassandra similarly busy you can either
 * 1) add more reducer tasks,
 * 2) remove the "wait for acks" block of code,
 * 3) parallelize the writing of rows to Cassandra, e.g. with an Executor.
 *
 * THIS CANNOT RUN ON THE SAME IP ADDRESS AS A CASSANDRA INSTANCE.
 */
package org.apache.cassandra.bulkloader;
import java.io.IOException; import java.io.UnsupportedEncodingException; import java.math.BigInteger; import java.net.URI; import java.net.URISyntaxException; import java.util.ArrayList; import java.util.Iterator; import java.util.LinkedList; import java.util.List;
import org.apache.cassandra.config.CFMetaData; import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.db.clock.TimestampReconciler; import org.apache.cassandra.db.*; import org.apache.cassandra.db.filter.QueryPath; import org.apache.cassandra.dht.BigIntegerToken; import org.apache.cassandra.io.util.DataOutputBuffer; import java.net.InetAddress; import java.net.UnknownHostException; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException;
import org.apache.cassandra.net.IAsyncResult; import org.apache.cassandra.net.Message; import org.apache.cassandra.net.MessagingService; import org.apache.cassandra.service.StorageService; import org.apache.cassandra.utils.FBUtilities; import org.apache.hadoop.filecache.DistributedCache; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapred.*;
public class CassandraBulkLoader {
public static class Map extends MapReduceBase implements Mapper<Text, Text, Text, Text> {
- private Text word = new Text();
public void map(Text key, Text value, OutputCollector<Text, Text> output, Reporter reporter) throws IOException {
- // This is a simple key/value mapper. output.collect(key, value);
public static class Reduce extends MapReduceBase implements Reducer<Text, Text, Text, Text> {
- private Path[] localFiles;
private ArrayList<String> tokens = new ArrayList<String>(); private JobConf jobconf;
public void configure(JobConf job) {
- this.jobconf = job; String cassConfig; // Get the cached files try {
localFiles = DistributedCache.getLocalCacheFiles(job);
throw new RuntimeException(e);
StorageService.instance.initClient();
throw new RuntimeException(e);
- Thread.sleep(10*1000);
catch (InterruptedException e) {
throw new RuntimeException(e);
- try {
- // release the cache
DistributedCache.releaseCache(new URI("/cassandra/storage-conf.xml#storage-conf.xml"), this.jobconf);
throw new RuntimeException(e);
throw new RuntimeException(e);
- // Sleep just in case the number of keys we send over is small Thread.sleep(3*1000);
catch (InterruptedException e) {
throw new RuntimeException(e);
StorageService.instance.stopClient();
- // release the cache
public void reduce(Text key, Iterator<Text> values, OutputCollector<Text, Text> output, Reporter reporter) throws IOException {
ColumnFamily columnFamily; String keyspace = "Keyspace1"; String cfName = "Super1"; Message message; List<ColumnFamily> columnFamilies; columnFamilies = new LinkedList<ColumnFamily>(); String line;
columnFamily = ColumnFamily.create(keyspace, cfName); while (values.hasNext()) {
- // Split the value (line based on your own delimiter) line = values.next().toString(); String[] fields = line.split("\1");
String SuperColumnName = fields[1]; String ColumnName = fields[2]; String ColumnValue = fields[3]; int timestamp = 0; columnFamily.addColumn(new QueryPath(cfName, SuperColumnName.getBytes("UTF-8"), ColumnName.getBytes("UTF-8")), ColumnValue.getBytes(), new TimestampClock(timestamp));
message = createMessage(keyspace, key.getBytes(), cfName, columnFamilies); List<IAsyncResult> results = new ArrayList<IAsyncResult>(); for (InetAddress endpoint: StorageService.instance.getNaturalEndpoints(keyspace, key.getBytes())) {
results.add(MessagingService.instance.sendRR(message, endpoint));
for (IAsyncResult result : results) {
- try {
result.get(DatabaseDescriptor.getRpcTimeout(), TimeUnit.MILLISECONDS);
catch (TimeoutException e) {
- // you should probably add retry logic here
throw new RuntimeException(e);
- // Split the value (line based on your own delimiter) line = values.next().toString(); String[] fields = line.split("\1");
- this.jobconf = job; String cassConfig; // Get the cached files try {
JobConf conf = new JobConf(CassandraBulkLoader.class);
if(args.length >= 4) {
- conf.setNumReduceTasks(new Integer(args[3]));
- // We store the cassandra storage-conf.xml on the HDFS cluster
DistributedCache.addCacheFile(new URI("/cassandra/storage-conf.xml#storage-conf.xml"), conf);
throw new RuntimeException(e);
conf.setInputFormat(KeyValueTextInputFormat.class); conf.setJobName("CassandraBulkLoader_v2"); conf.setMapperClass(Map.class); conf.setReducerClass(Reduce.class); conf.setOutputKeyClass(Text.class); conf.setOutputValueClass(Text.class);
FileInputFormat.setInputPaths(conf, new Path(args[1])); FileOutputFormat.setOutputPath(conf, new Path(args[2])); try {
JobClient.runJob(conf);
throw new RuntimeException(e);
public static Message createMessage(String Keyspace, byte[] Key, String CFName, List<ColumnFamily> ColumnFamiles) {
ColumnFamily baseColumnFamily; DataOutputBuffer bufOut = new DataOutputBuffer(); RowMutation rm; Message message; Column column;
baseColumnFamily = new ColumnFamily(ColumnFamilyType.Standard,
ClockType.Timestamp, DatabaseDescriptor.getComparator(Keyspace, CFName), DatabaseDescriptor.getSubComparator(Keyspace, CFName), TimestampReconciler.instance, CFMetaData.getId(Keyspace, CFName));
for(ColumnFamily cf : ColumnFamiles) {
- bufOut.reset();
ColumnFamily.serializer().serializeWithIndexes(cf, bufOut); byte[] data = new byte[bufOut.getLength()]; System.arraycopy(bufOut.getData(), 0, data, 0, bufOut.getLength());
column = new Column(FBUtilities.toByteArray(cf.id()), data, new TimestampClock(0)); baseColumnFamily.addColumn(column);
rm = new RowMutation(Keyspace, Key); rm.add(baseColumnFamily); try {
message = rm.makeRowMutationMessage(StorageService.Verb.BINARY);
throw new RuntimeException(e);
- runJob(args);
- private Text word = new Text();
}
#!/bin/bash
#
# /etc/init.d/cassandra
#
# Startup script for Cassandra
#
# chkconfig: 2345 20 80
# description: Starts and stops Cassandra
#
# Usage: cassandra start|stop|restart|reload
# Exit status: 0 on success, 1 on a failed action or an unknown argument.

. /etc/rc.d/init.d/functions

export JAVA_HOME=/usr/lib/jvm/java-1.6.0-openjdk-1.6.0.0/
export CASSANDRA_HOME=/usr/share/cassandra/
export CASSANDRA_INCLUDE=/usr/share/cassandra/cassandra.in.sh
export CASSANDRA_CONF=/etc/cassandra/conf
export CASSANDRA_OWNR=cassandra
log_file=/var/log/cassandra/cassandra.log
pid_file=/var/run/cassandra/cassandra.pid
CASSANDRA_PROG=/usr/sbin/cassandra

case "$1" in
    start)
        # Cassandra startup
        echo -n "Starting Cassandra: "
        if su "$CASSANDRA_OWNR" -c "$CASSANDRA_PROG -p $pid_file" > "$log_file" 2>&1; then
            echo "OK"
        else
            echo "FAILED (see $log_file)"
            exit 1
        fi
        ;;
    stop)
        # Cassandra shutdown
        echo -n "Shutdown Cassandra: "
        if [ ! -r "$pid_file" ]; then
            # nothing to kill; stopping an already-stopped service is not an error
            echo "not running (no pid file $pid_file)"
        else
            pid=$(cat "$pid_file")
            if su "$CASSANDRA_OWNR" -c "kill $pid"; then
                echo "OK"
            else
                echo "FAILED"
                exit 1
            fi
        fi
        ;;
    reload|restart)
        "$0" stop
        "$0" start
        ;;
    status)
        # NOTE(review): the label of this empty arm was lost in the reviewed text;
        # presumably a no-op "status" action -- confirm.
        ;;
    *)
        echo "Usage: $(basename "$0") start|stop|restart|reload"
        exit 1
        ;;
esac

exit 0