バージョン0.6以降のCassandraは、保持しているデータに対するHadoopのジョブ実行をサポートしています。https://svn.apache.org/repos/asf/cassandra/trunk/contrib/word_count/ にサンプルがあります(Hadoopジョブの出力をCassandraに入力するのももちろん可能です)。Cassandraの行または行フラグメント(キーと、カラムのSortedMapのペア)がMapタスクの入力となります。それは各行からどのカラムをフェッチするかを決めるための SlicePredicate で指定されます。word_countサンプルでは1つのカラムを各行から選択していますが、その部分の処理は以下のようになっています。

            ConfigHelper.setColumnFamily(job.getConfiguration(), KEYSPACE, COLUMN_FAMILY);
            SlicePredicate predicate = new SlicePredicate().setColumn_names(Arrays.asList(columnName.getBytes()));
            ConfigHelper.setSlicePredicate(job.getConfiguration(), predicate);

Javaのコードを記述するのではなくPigのDSLでジョブを実行出来るようにCassandraはLoadFuncを提供します。https://svn.apache.org/repos/asf/cassandra/trunk/contrib/pig/ にあります。

Cassandraのsplitはロケーションアウェアです(これはHadoopのInputSplitの設計の本質でもあります)。CassandraはデータのsplitとともにロケーションのリストもHadoopに渡します。Hadoopはデータに一番近いインスタンスでジョブを実行するようにスケジューリングします。ということはHadoopインスタンスをCassandraのマシンそれぞれに載せるのがよいでしょう。

0.6.2/0.7より前のリリースでは、ジョブを失敗させる原因となるリソースのリークが確認されています(コネクションが正常にリリースされないことによるリソースリーク)。環境の設定によりますが、この問題に遭遇することがあります。回避策としては、プロセスがオープン可能なファイルディスクリプタの上限を上げることです(linux/bashではulimit -n 32000とする、等)。エラーはHadoopのジョブ側にThriftのTimedOutExceptionとして報告されます。

1つのノードに対してHadoop連携テストを行っていて何らかのエラーが発生した場合、それは正常なことかもしれません。1台のマシンに対して負荷を与えすぎるとタイムアウトエラーが発生します。その場合、同時実行するタスクの数を減らすことで回避することが可能です。

             Configuration conf = job.getConfiguration(); 
             conf.setInt("mapred.tasktracker.map.tasks.maximum",1); 

また、Cassandraからバッチで読み込む行数を減らすには以下のようにします。

             ConfigHelper.setRangeBatchSize(job.getConfiguration(), 1000);

stats

HadoopSupport_JP (last edited 2013-11-14 20:00:32 by 107)