Getting Scribe to write to Cassandra can be very easy by just implementing a custom scribe server.
Here's a super simple example written in java and using Hector. It uses a column family according to the LogEntry's category, and puts the payload directly into a column name "scribePayload".
1 class ScribeToCassandraService implements com.facebook.scribe.scribe.Iface {
2
3 private static final String CLUSTER_NAME = "myCluster";
4 private static final String HOST_PORT = "localhost:9160";
5 private static final String KEYSPACE = "keyspace1";
6 private static final String COLUMN = "scribePayload";
7 ...
8 public ResultCode Log(final List<LogEntry> logEntries) throws TException {
9
10 Cluster cluster = HFactory.getOrCreateCluster(CLUSTER_NAME, HOST_PORT);
11 Mutator<UUID> mutator = HFactory.createMutator(
12 HFactory.createKeyspace(KEYSPACE, cluster),
13 UUIDSerializer.get());
14
15 for (LogEntry logEntry : logEntries) {
16 try{
17 String payload = logEntry.getMessage();
18 String category = msg.getCategory();
19
20 HColumn<String,String> column = HFactory.createColumn(
21 COLUMN,
22 payload,
23 StringSerializer.get(),
24 StringSerializer.get());
25
26 UUID uuid = UUID.fromString(UUIDGenerator.getInstance().generateTimeBasedUUID().toString());
27 mutator.addInsertion(uuid, category, column);
28
29 }catch(Exception je){
30 // example catch code
31 LOG.error("invalid LogEntry\n"
32 + "category=" + logEntry.getCategory()
33 + "\nmessage is\n" + logEntry.getMessage() + "\n",
34 je);
35 }
36 }
37 mutator.execute();
38 return ResultCode.OK;
39 }
40 ...
41 }
If you are running an embedded cassandra you can directly use the StorageProxy for faster performance,
1 // replace above lines 11-13 with
2 List<RowMutation> mutations = new ArrayList<RowMutation>();
3
4 // replace above lines 20-27 with
5 UUID uuid = UUID.fromString(UUIDGenerator.getInstance().generateTimeBasedUUID().toString());
6 RowMutation change = new RowMutation(keyspaceName, UUIDSerializer.get().toByteBuffer(uuid));
7 ColumnPath cp = new ColumnPath(category).setColumn(COLUMN.getBytes());
8 change.add(new QueryPath(cp), ByteBuffer.wrap(payloadBytes), HFactory.createClock());
9 mutations.add(change);
10
11 // replace above line 37 with
12 StorageProxy.mutate(mutations, ConsistencyLevel.ONE);