Multiplication of Hama matrices
public static void main(String[] args) throws IOException {
  HamaConfiguration conf = new HamaConfiguration();
  Matrix m1 = DenseMatrix.random(conf, 1000, 1000);
  Matrix m2 = DenseMatrix.random(conf, 1000, 1000);
  Matrix result = m1.mult(m2);
}
Multiplication example of file matrices on HDFS
Let's assume we have two 4-by-4 matrices, A and B, each stored as a sequence file with the following record layout.
Each record is keyed by the row index, K : IntWritable, with value V : MapWritable&lt;IntWritable, DoubleWritable&gt; mapping column indices to entries. A matrix can then be represented as below:

0 : (0, 0.34), (1, 0.52), (2, 0.12), (3, 0.56)
1 : (0, 0.74), (1, 0.25), (2, 0.44), (3, 0.12)
..
3 : (0, 0.24), (1, 0.48), (2, 0.32), (3, 0.46)
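For reference, such a file could be produced with the plain Hadoop sequence-file API. The sketch below is a minimal, hypothetical example (the class name, output path, and use of random values are assumptions, not part of the Hama API):

import java.io.IOException;
import java.util.Random;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.MapWritable;
import org.apache.hadoop.io.SequenceFile;

public class WriteMatrixSeqFile {
  public static void main(String[] args) throws IOException {
    Configuration conf = new Configuration();
    FileSystem fs = FileSystem.get(conf);
    Path path = new Path("/user/hama/matrixA.seq"); // hypothetical path
    SequenceFile.Writer writer = SequenceFile.createWriter(fs, conf, path,
        IntWritable.class, MapWritable.class);
    Random rand = new Random();
    for (int row = 0; row < 4; row++) {
      // Build one row as a column-index -> entry map.
      MapWritable vector = new MapWritable();
      for (int col = 0; col < 4; col++) {
        vector.put(new IntWritable(col), new DoubleWritable(rand.nextDouble()));
      }
      writer.append(new IntWritable(row), vector); // one row per record
    }
    writer.close();
  }
}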
First, we collect the blocks (sub-matrices) into a 'collectionTable' using MyMapper; this blocking step minimizes data movement and network cost. The map/reduce flow is as follows (a standalone sketch of the row-splitting step appears after the list):
- Map task takes <Row, <Column, Entry>>
- Emit &lt;BlockID, SubVector&gt; pairs as it iterates over the sub-vectors of the row
- Reduce task gets (BlockID, SubVector*)
- Merge vectors into a Block
- Emit (BlockID, Block)
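To make the blocking concrete, here is a standalone sketch (plain Java, not the Hama API; the 2-by-2 block size is an assumption for illustration) of how one row of the example matrix is split into sub-vectors keyed by block coordinates:

public class BlockingSketch {
  public static void main(String[] args) {
    int columns = 4, blockSize = 2;  // 4-by-4 matrix, 2-by-2 blocks (assumed)
    int row = 1;                     // second row of the example above
    double[] rowValues = {0.74, 0.25, 0.44, 0.12};

    int blockRow = row / blockSize;
    for (int blockCol = 0; blockCol * blockSize < columns; blockCol++) {
      int start = blockCol * blockSize;
      int end = Math.min(start + blockSize - 1, columns - 1);
      StringBuilder sub = new StringBuilder();
      for (int i = start; i <= end; i++) {
        sub.append("(").append(i).append(", ").append(rowValues[i]).append(") ");
      }
      // Emitted pair: <BlockID(blockRow, blockCol), sub-vector>
      System.out.println("BlockID(" + blockRow + ", " + blockCol + ") : " + sub);
    }
  }
}

Running this prints BlockID(0, 0) : (0, 0.74) (1, 0.25) and BlockID(0, 1) : (2, 0.44) (3, 0.12), i.e. row 1 contributes one sub-vector to each block in block-row 0.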
[http://lh5.ggpht.com/_DBxyBGtfa3g/SXAixuOid_I/AAAAAAAAAr0/w-_KhIMSOC0/s800/mat-mult.PNG]
Finally, we multiply and sum the collected blocks using BlockMultiplyMap/Reduce; a minimal sketch of the per-block math is shown below.
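Conceptually, each output block C(i,j) accumulates the products of the matching block pairs, C(i,j) += A(i,k) * B(k,j). A minimal sketch with plain 2-D arrays (not Hama's BlockWritable API) is:

public class BlockMultiplySketch {
  // Per-block multiply-and-accumulate: c[i][j] += sum over k of a[i][k] * b[k][j].
  static void multiplyAndAccumulate(double[][] a, double[][] b, double[][] c) {
    int n = a.length;
    for (int i = 0; i < n; i++)
      for (int j = 0; j < n; j++)
        for (int k = 0; k < n; k++)
          c[i][j] += a[i][k] * b[k][j];
  }

  public static void main(String[] args) {
    double[][] a = {{1, 2}, {3, 4}};
    double[][] b = {{5, 6}, {7, 8}};
    double[][] c = new double[2][2];  // accumulator block, starts at zero
    multiplyAndAccumulate(a, b, c);   // c is now {{19, 22}, {43, 50}}
    System.out.println(c[0][0] + " " + c[1][1]); // 19.0 50.0
  }
}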
See the full example code:
public class FileMatrixBlockMult extends AbstractExample {
  final static Log LOG = LogFactory.getLog(FileMatrixBlockMult.class.getName());
  private static int BLOCKSIZE;
  private static int ROWS;
  private static int COLUMNS;

  /**
   * Collect blocks (sub-vectors) from the sequence file.
   */
  public static class MyMapper extends CollectBlocksMapReduceBase implements
      CollectBlocksMap<IntWritable, MapWritable> {
    private MapWritable value;

    @Override
    public void map(IntWritable key, MapWritable value,
        OutputCollector<BlockID, MapWritable> output, Reporter reporter)
        throws IOException {
      int startColumn, endColumn, blkRow = key.get() / mBlockRowSize, i = 0;
      this.value = value;

      // Split the row into sub-vectors, one per block column.
      do {
        startColumn = i * mBlockColSize;
        endColumn = startColumn + mBlockColSize - 1;
        if (endColumn >= mColumns) // the last sub-vector
          endColumn = mColumns - 1;
        output.collect(new BlockID(blkRow, i),
            subVector(key.get(), startColumn, endColumn));
        i++;
      } while (endColumn < (mColumns - 1));
    }

    // Extract the entries of one row between columns i0 and i1 (inclusive).
    private MapWritable subVector(int row, int i0, int i1) {
      DenseVector res = new DenseVector();
      res.setRow(row);
      for (int i = i0; i <= i1; i++) {
        res.set(i, ((DoubleWritable) this.value.get(new IntWritable(i))).get());
      }
      return res.getEntries();
    }
  }

  /**
   * @param a the path of matrix A
   * @param b the path of matrix B
   * @return the result C
   * @throws IOException
   */
  private static DenseMatrix matMult(Path a, Path b) throws IOException {
    HamaConfiguration conf = new HamaConfiguration();
    Matrix collectionTable = new DenseMatrix(conf);

    collectBlocksFromFile(a, true, collectionTable.getPath(), conf);
    collectBlocksFromFile(b, false, collectionTable.getPath(), conf);

    DenseMatrix result = new DenseMatrix(conf);
    JobConf jobConf = new JobConf(conf);
    jobConf.setJobName("multiplication MR job : " + result.getPath());

    BlockMultiplyMap.initJob(collectionTable.getPath(), BlockMultiplyMap.class,
        BlockID.class, BlockWritable.class, jobConf);
    BlockMultiplyReduce.initJob(result.getPath(), BlockMultiplyReduce.class,
        jobConf);
    JobManager.execute(jobConf, result);
    return result;
  }

  public static void main(String[] args) throws IOException {
    if (args.length < 5) {
      System.out.println(
          "multfiles [-m maps] [-r reduces] <seqfile1> <seqfile2> <blocks> <rows> <columns>");
      System.exit(-1);
    } else {
      parseArgs(args);
    }

    Path a = new Path(ARGS.get(0));
    Path b = new Path(ARGS.get(1));

    BLOCKSIZE = Integer.parseInt(ARGS.get(2));
    // You should know the matrix dimensions in advance.
    ROWS = Integer.parseInt(ARGS.get(3));
    COLUMNS = Integer.parseInt(ARGS.get(4));

    DenseMatrix result = matMult(a, b);
    System.out.println("result: " + result.getRows() + " by "
        + result.getColumns());
  }

  private static void collectBlocksFromFile(Path path, boolean b,
      String collectionTable, HamaConfiguration conf) throws IOException {
    JobConf jobConf = new JobConf(conf);
    jobConf.setJobName("Blocking MR job : " + path);

    jobConf.setMapperClass(MyMapper.class);
    jobConf.setInputFormat(SequenceFileInputFormat.class);
    FileInputFormat.addInputPath(jobConf, path);

    MyMapper.initJob(collectionTable, b, BLOCKSIZE, ROWS, COLUMNS, jobConf);
    JobClient.runJob(jobConf);
  }
}