Multiplication of matrices
public static void main() throws IOException { HamaConfiguration conf = new HamaConfiguration(); Matrix m1 = DenseMatrix.random(conf, 1000, 1000); Matrix m2 = DenseMatrix.random(conf, 1000, 1000); Matrix result = m1.mult(m2); // or multiplication using blocking algorithm Matrix result = m1.mult(m2, 100); }
Multiplication of file matrices
public static void main(String[] args) throws IOException { collectBlocksFromFile(path[0], true, collectionTable, conf); // path of matrix A collectBlocksFromFile(path[1], false, collectionTable, conf); // path of matrix B Matrix result = new DenseMatrix(conf, 4, 4); Job job = new Job(conf, "multiplication MR job : " + result.getPath()); Scan scan = new Scan(); scan.addFamily(Bytes.toBytes(Constants.BLOCK)); TableMapReduceUtil.initTableMapperJob(collectionTable, scan, BlockMultMap.class, BlockID.class, BytesWritable.class, job); TableMapReduceUtil.initTableReducerJob(result.getPath(), BlockMultReduce.class, job); try { job.waitForCompletion(true); } catch (InterruptedException e) { e.printStackTrace(); } catch (ClassNotFoundException e) { e.printStackTrace(); } } private static void collectBlocksFromFile(Path path, boolean b, String collectionTable, HamaConfiguration conf) throws IOException { Job job = new Job(conf, "Blocking MR job" + path); job.setMapperClass(MyMapper.class); job.setMapOutputKeyClass(BlockID.class); job.setMapOutputValueClass(MapWritable.class); job.setInputFormatClass(SequenceFileInputFormat.class); SequenceFileInputFormat.addInputPath(job, path); job.getConfiguration().set(MyMapper.BLOCK_SIZE, String.valueOf(2)); job.getConfiguration().set(MyMapper.ROWS, String.valueOf(4)); job.getConfiguration().set(MyMapper.COLUMNS, String.valueOf(4)); job.getConfiguration().setBoolean(MyMapper.MATRIX_POS, b); TableMapReduceUtil.initTableReducerJob(collectionTable, org.apache.hama.mapreduce.CollectBlocksReducer.class, job); try { job.waitForCompletion(true); } catch (InterruptedException e) { e.printStackTrace(); } catch (ClassNotFoundException e) { e.printStackTrace(); } } public static class MyMapper extends Mapper<IntWritable, MapWritable, BlockID, MapWritable> implements Configurable { private Configuration conf = null; /** Parameter of the path of the matrix to be blocked * */ public static final String BLOCK_SIZE = "hama.blocking.size"; public static final String ROWS = "hama.blocking.rows"; public static final String COLUMNS = "hama.blocking.columns"; public static final String MATRIX_POS = "a.or.b"; private int mBlockNum; private int mBlockRowSize; private int mBlockColSize; private int mRows; private int mColumns; public void map(IntWritable key, MapWritable value, Context context) throws IOException, InterruptedException { int startColumn, endColumn, blkRow = key.get() / mBlockRowSize, i = 0; DenseVector dv = new DenseVector(key.get(), value); do { startColumn = i * mBlockColSize; endColumn = startColumn + mBlockColSize - 1; if (endColumn >= mColumns) // the last sub vector endColumn = mColumns - 1; context.write(new BlockID(blkRow, i), dv.subVector(startColumn, endColumn).getEntries()); i++; } while (endColumn < (mColumns - 1)); } @Override public Configuration getConf() { return conf; } @Override public void setConf(Configuration conf) { this.conf = conf; mBlockNum = conf.getInt(BLOCK_SIZE, 2); mRows = conf.getInt(ROWS, 4); mColumns = conf.getInt(COLUMNS, 4); mBlockRowSize = mRows / mBlockNum; mBlockColSize = mColumns / mBlockNum; } }