Multiplication of dense matrices
public static void main() throws IOException { HamaConfiguration conf = new HamaConfiguration(); Matrix m1 = DenseMatrix.random(conf, 1000, 1000); Matrix m2 = DenseMatrix.random(conf, 1000, 1000); Matrix result = m1.mult(m2); // or multiplication using blocking algorithm Matrix result = m1.mult(m2, 100); }
Example: multiplying dense matrices stored as files on HDFS
Assume we have two 4,000 × 4,000 matrices, A and B, each stored as a sequence file.
Each record has a key of type IntWritable (the row index) and a value of type MapWritable<IntWritable, DoubleWritable> (column index → entry). The data can be represented as follows:
0 : (0, 0.34), (1, 0.52), (2, 0.12), (3, 0.56) ...
1 : (0, 0.74), (1, 0.25), (2, 0.44), (3, 0.12) ...
...
3999 : (0, 0.24), (1, 0.48), (2, 0.32), (3, 0.46) ...
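To multiply two dense matrices A and B, we first use map/reduce to collect their blocks into 'collectionTable'. Each row of that table corresponds to a block product contributing to c(i, j). Each row is identified by the sequential number $N^2 i + N j + k$ (where $N$ is the number of blocks along each dimension and $0 \le i, j, k < N$), which guarantees that no two records collide. Each row holds the two sub-matrices a(i, k) and b(k, j), which minimizes data movement and network cost. Finally, we multiply each pair of sub-matrices and sum the products for every c(i, j).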
Blocking jobs: collect the blocks of A and B into 'collectionTable'.
- A map task receives a row index n as its key and that row's vector as its value. It emits (blockID, sub-vector) pairs.
- A reduce task merges the sub-vectors into block structures based on their blockID.
Multiplication job:
- A map task receives a blockID n as its key and the two sub-matrices of A and B as its value.
- A reduce task computes the sum of the blocks.
See the full example code below:
public class FileMatrixBlockMult extends AbstractExample { final static Log LOG = LogFactory.getLog(FileMatrixBlockMult.class.getName()); private static int BLOCKSIZE; private static int ROWS; private static int COLUMNS; /** * Collect blocks from sequence file, */ public static class MyMapper extends CollectBlocksMapReduceBase implements CollectBlocksMap<IntWritable, MapWritable> { private MapWritable value; @Override public void map(IntWritable key, MapWritable value, OutputCollector<BlockID, MapWritable> output, Reporter reporter) throws IOException { int startColumn, endColumn, blkRow = key.get() / mBlockRowSize, i = 0; this.value = value; do { startColumn = i * mBlockColSize; endColumn = startColumn + mBlockColSize - 1; if (endColumn >= mColumns) // the last sub vector endColumn = mColumns - 1; output.collect(new BlockID(blkRow, i), subVector(key.get(), startColumn, endColumn)); i++; } while (endColumn < (mColumns - 1)); } private MapWritable subVector(int row, int i0, int i1) { DenseVector res = new DenseVector(); res.setRow(row); for (int i = i0; i <= i1; i++) { res.set(i, ((DoubleWritable) this.value.get(new IntWritable(i))).get()); } return res.getEntries(); } } /** * @param a the path of matrix A * @param b the path of matrix B * @return the result C * @throws IOException */ private static DenseMatrix matMult(Path a, Path b) throws IOException { HamaConfiguration conf = new HamaConfiguration(); Matrix collectionTable = new DenseMatrix(conf); collectBlocksFromFile(a, true, collectionTable.getPath(), conf); collectBlocksFromFile(b, false, collectionTable.getPath(), conf); DenseMatrix result = new DenseMatrix(conf); JobConf jobConf = new JobConf(conf); jobConf.setJobName("multiplication MR job : " + result.getPath()); BlockMultiplyMap.initJob(collectionTable.getPath(), BlockMultiplyMap.class, BlockID.class, BlockWritable.class, jobConf); BlockMultiplyReduce.initJob(result.getPath(), BlockMultiplyReduce.class, jobConf); JobManager.execute(jobConf, result); 
return result; } public static void main(String[] args) throws IOException { if (args.length < 5) { System.out .println("multfiles [-m maps] [-r reduces] <seqfile1> <seqfile1> <blocks> <rows> <columns>"); System.exit(-1); } else { parseArgs(args); } Path a = new Path(ARGS.get(0)); Path b = new Path(ARGS.get(1)); BLOCKSIZE = Integer.parseInt(ARGS.get(2)); // You should know dimensions ROWS = Integer.parseInt(ARGS.get(3)); COLUMNS = Integer.parseInt(ARGS.get(4)); DenseMatrix result = matMult(a, b); System.out.println("result: " + result.getRows() + " by " + result.getColumns()); } private static void collectBlocksFromFile(Path path, boolean b, String collectionTable, HamaConfiguration conf) throws IOException { JobConf jobConf = new JobConf(conf); jobConf.setJobName("Blocking MR job" + path); jobConf.setMapperClass(MyMapper.class); jobConf.setInputFormat(SequenceFileInputFormat.class); FileInputFormat.addInputPath(jobConf, path); MyMapper.initJob(collectionTable, b, BLOCKSIZE, ROWS, COLUMNS, jobConf); JobClient.runJob(jobConf); } }