
Multiplication of dense matrices

  public static void main(String[] args) throws IOException {
    HamaConfiguration conf = new HamaConfiguration();
    Matrix m1 = DenseMatrix.random(conf, 1000, 1000);
    Matrix m2 = DenseMatrix.random(conf, 1000, 1000);

    Matrix result = m1.mult(m2);
    // or multiplication using the blocking algorithm with 100 blocks
    Matrix blockedResult = m1.mult(m2, 100);
  }

Multiplication of file-based (dense) matrices on HDFS

Let's assume we have two 4,000 by 4,000 matrices A and B, each stored as a sequence file.

  K : IntWritable
  V : MapWritable<IntWritable, DoubleWritable>

Each matrix can be represented as below:

  0 : (0, 0.34), (1, 0.52), (2, 0.12), (3, 0.56) ...
  1 : (0, 0.74), (1, 0.25), (2, 0.44), (3, 0.12) ...
  ..
  3999 : (0, 0.24), (1, 0.48), (2, 0.32), (3, 0.46) ...
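
For reference, a matrix in this layout can be written with the plain Hadoop SequenceFile API. Below is a minimal sketch that fills a 4,000 by 4,000 matrix with random entries; the class name and the output path /tmp/matrixA.seq are hypothetical:

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.MapWritable;
import org.apache.hadoop.io.SequenceFile;

public class WriteRandomMatrix {
  public static void main(String[] args) throws IOException {
    Configuration conf = new Configuration();
    FileSystem fs = FileSystem.get(conf);
    Path path = new Path("/tmp/matrixA.seq"); // hypothetical output path

    // K : IntWritable (row index), V : MapWritable (column index -> value)
    SequenceFile.Writer writer = SequenceFile.createWriter(fs, conf, path,
        IntWritable.class, MapWritable.class);
    try {
      for (int i = 0; i < 4000; i++) {
        MapWritable row = new MapWritable();
        for (int j = 0; j < 4000; j++) {
          row.put(new IntWritable(j), new DoubleWritable(Math.random()));
        }
        writer.append(new IntWritable(i), row);
      }
    } finally {
      writer.close();
    }
  }
}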

To multiply two dense matrices A and B, we first collect their blocks into a 'collectionTable' using map/reduce. The row that carries the k-th term of output block c(i, j) is numbered sequentially as (N^2 * i) + (j * N) + k, where N is the number of blocks per dimension, so no two records collide. Each row holds the two sub-matrices a(i, k) and b(k, j) that will be multiplied together, which minimizes data movement and network cost. Finally, we multiply and sum the blocks sequentially. A small sketch of the numbering scheme is shown below.
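
To make the numbering concrete, here is a minimal sketch with a hypothetical rowIndex helper (not part of the Hama API):

public class BlockRowIndex {
  // Sequential row number for the k-th term a(i, k) * b(k, j) of block c(i, j).
  static int rowIndex(int n, int i, int j, int k) {
    return (n * n * i) + (j * n) + k;
  }

  public static void main(String[] args) {
    int n = 2; // a 2 x 2 grid of blocks
    // Every (i, j, k) triple maps to a distinct row number in 0 .. n^3 - 1,
    // so no two collected records collide.
    for (int i = 0; i < n; i++)
      for (int j = 0; j < n; j++)
        for (int k = 0; k < n; k++)
          System.out.printf("c(%d,%d), term k=%d -> row %d%n", i, j, k,
              rowIndex(n, i, j, k));
  }
}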

Blocking jobs:

Collect the blocks from A and B into the 'collectionTable'.

- A map task receives a row n as a key, and the vector of that row as its value
  - emits (blockID, sub-vector) pairs
- A reduce task merges the block structures based on the blockID

Multiplication job:

- A map task receives a blockID n as a key, and the two sub-matrices of A and B as its value
- A reduce task computes the sum of the multiplied blocks for each c(i, j), as sketched below
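
Conceptually, the multiplication job performs a block multiply-and-accumulate: the map side produces the products a(i, k) * b(k, j) and the reduce side sums them over k. A minimal sketch with plain double[][] blocks, rather than Hama's BlockWritable, looks like this:

public class BlockMultSketch {
  // Multiplies two square blocks and accumulates the product into c,
  // mirroring the multiplication job described above.
  static void multAndAccumulate(double[][] a, double[][] b, double[][] c) {
    int n = a.length;
    for (int row = 0; row < n; row++)
      for (int col = 0; col < n; col++) {
        double sum = 0;
        for (int k = 0; k < n; k++)
          sum += a[row][k] * b[k][col];
        c[row][col] += sum; // accumulation across the k blocks
      }
  }
}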

See the full example code:

public class FileMatrixBlockMult extends AbstractExample {
  final static Log LOG = LogFactory.getLog(FileMatrixBlockMult.class.getName());
  private static int BLOCKSIZE;
  private static int ROWS;
  private static int COLUMNS;

  /**
   * Collects blocks from the sequence file.
   */
  public static class MyMapper extends CollectBlocksMapReduceBase implements
      CollectBlocksMap<IntWritable, MapWritable> {
    private MapWritable value;

    // Splits the incoming row into one sub-vector per column block and
    // emits each under its BlockID (block row, block column).
    @Override
    public void map(IntWritable key, MapWritable value,
        OutputCollector<BlockID, MapWritable> output, Reporter reporter)
        throws IOException {
      int startColumn, endColumn, blkRow = key.get() / mBlockRowSize, i = 0;
      this.value = value;
      
      do {
        startColumn = i * mBlockColSize;
        endColumn = startColumn + mBlockColSize - 1;
        if (endColumn >= mColumns) // the last sub vector
          endColumn = mColumns - 1;
        output.collect(new BlockID(blkRow, i), subVector(key.get(), startColumn, endColumn));

        i++;
      } while (endColumn < (mColumns - 1));
    }

    private MapWritable subVector(int row, int i0, int i1) {
      DenseVector res = new DenseVector();
      res.setRow(row);
      
      for (int i = i0; i <= i1; i++) {
        res.set(i, ((DoubleWritable) this.value.get(new IntWritable(i))).get());
      }

      return res.getEntries();
    }
  }

  /**
   * @param a the path of matrix A
   * @param b the path of matrix B
   * @return the result C
   * @throws IOException
   */
  private static DenseMatrix matMult(Path a, Path b) throws IOException {
    HamaConfiguration conf = new HamaConfiguration();
    Matrix collectionTable = new DenseMatrix(conf);

    collectBlocksFromFile(a, true, collectionTable.getPath(), conf);
    collectBlocksFromFile(b, false, collectionTable.getPath(), conf);

    DenseMatrix result = new DenseMatrix(conf);
    JobConf jobConf = new JobConf(conf);
    jobConf.setJobName("multiplication MR job : " + result.getPath());

    BlockMultiplyMap.initJob(collectionTable.getPath(), BlockMultiplyMap.class,
        BlockID.class, BlockWritable.class, jobConf);
    BlockMultiplyReduce.initJob(result.getPath(), BlockMultiplyReduce.class,
        jobConf);

    JobManager.execute(jobConf, result);

    return result;
  }

  public static void main(String[] args) throws IOException {
    if (args.length < 5) {
      System.out
          .println("multfiles  [-m maps] [-r reduces] <seqfile1> <seqfile1> <blocks> <rows> <columns>");
      System.exit(-1);
    } else {
      parseArgs(args);
    }

    Path a = new Path(ARGS.get(0));
    Path b = new Path(ARGS.get(1));

    BLOCKSIZE = Integer.parseInt(ARGS.get(2));
    // the dimensions of the input matrices must be known in advance
    ROWS = Integer.parseInt(ARGS.get(3));
    COLUMNS = Integer.parseInt(ARGS.get(4));

    DenseMatrix result = matMult(a, b);
    System.out.println("result: " + result.getRows() + " by "
        + result.getColumns());
  }

  /**
   * Runs a blocking MR job that collects blocks from the given sequence file
   * into the collectionTable; the boolean flag appears to mark whether the
   * blocks belong to matrix A (true) or matrix B (false).
   */
  private static void collectBlocksFromFile(Path path, boolean b,
      String collectionTable, HamaConfiguration conf) throws IOException {
    JobConf jobConf = new JobConf(conf);
    jobConf.setJobName("Blocking MR job" + path);

    jobConf.setMapperClass(MyMapper.class);
    jobConf.setInputFormat(SequenceFileInputFormat.class);
    FileInputFormat.addInputPath(jobConf, path);

    MyMapper.initJob(collectionTable, b, BLOCKSIZE, ROWS, COLUMNS, jobConf);
    JobClient.runJob(jobConf);
  }
}
