You are viewing an old version of this page. View the current version.

Compare with Current View Page History

« Previous Version 88 Next »

Multiplication of matrices

  /**
   * Example: multiplies two dense matrices created by
   * {@code DenseMatrix.random(conf, 1000, 1000)}, once with
   * {@code mult(m2)} and once with the blocking variant {@code mult(m2, 100)}.
   *
   * @throws IOException declared by the original example; propagated from the
   *         matrix operations
   */
  public static void main() throws IOException {
    HamaConfiguration conf = new HamaConfiguration();
    Matrix m1 = DenseMatrix.random(conf, 1000, 1000);
    Matrix m2 = DenseMatrix.random(conf, 1000, 1000);

    Matrix result = m1.mult(m2);
    // or multiplication using blocking algorithm
    // (kept in its own variable: declaring "result" twice in one scope does
    // not compile)
    Matrix blockedResult = m1.mult(m2, 100);
  }

Multiplication of matrices stored in files

 /**
  * Example: blocks the two input matrices into one collection table and then
  * runs the block-multiplication MapReduce job over that table, writing into
  * the table of a new 4x4 {@code DenseMatrix}.
  *
  * NOTE(review): {@code path}, {@code collectionTable} and {@code conf} are not
  * declared in this snippet; presumably they are defined by the enclosing
  * class -- confirm before reuse.
  *
  * @param args command-line arguments (not read by this method)
  * @throws IOException if job setup fails, the job reports failure, the wait
  *         is interrupted, or a job class cannot be found
  */
 public static void main(String[] args) throws IOException {
    collectBlocksFromFile(path[0], true, collectionTable, conf); // path of matrix A
    collectBlocksFromFile(path[1], false, collectionTable, conf); // path of matrix B

    Matrix result = new DenseMatrix(conf, 4, 4);
    Job job = new Job(conf, "multiplication MR job : " + result.getPath());

    // Read only the block column family of the collection table.
    Scan scan = new Scan();
    scan.addFamily(Bytes.toBytes(Constants.BLOCK));

    TableMapReduceUtil.initTableMapperJob(collectionTable, scan,
        BlockMultMap.class, BlockID.class, BytesWritable.class, job);
    TableMapReduceUtil.initTableReducerJob(result.getPath(),
        BlockMultReduce.class, job);

    try {
      // waitForCompletion reports failure through its return value, so it
      // must not be ignored.
      if (!job.waitForCompletion(true)) {
        throw new IOException("multiplication MR job failed : "
            + result.getPath());
      }
    } catch (InterruptedException e) {
      // Restore the interrupt flag so callers up the stack can still see it.
      Thread.currentThread().interrupt();
      throw new IOException("interrupted while waiting for multiplication MR job",
          e);
    } catch (ClassNotFoundException e) {
      throw new IOException("class not found while running multiplication MR job",
          e);
    }
  }

  /**
   * Runs a MapReduce job that reads a matrix from a sequence file, splits its
   * rows into blocks with {@link MyMapper}, and writes the blocks into
   * {@code collectionTable} through {@code CollectBlocksReducer}.
   *
   * The job is configured with a fixed blocking value of 2 and fixed matrix
   * dimensions of 4 rows by 4 columns.
   *
   * @param path input path of the matrix sequence file
   * @param b value stored under {@code MyMapper.MATRIX_POS}; the visible caller
   *        passes {@code true} for matrix A and {@code false} for matrix B
   * @param collectionTable name of the table that receives the blocks
   * @param conf configuration used to create the job
   * @throws IOException if job setup fails, the job reports failure, the wait
   *         is interrupted, or a job class cannot be found
   */
  private static void collectBlocksFromFile(Path path, boolean b,
      String collectionTable, HamaConfiguration conf) throws IOException {
    Job job = new Job(conf, "Blocking MR job" + path);
    job.setMapperClass(MyMapper.class);
    job.setMapOutputKeyClass(BlockID.class);
    job.setMapOutputValueClass(MapWritable.class);
    job.setInputFormatClass(SequenceFileInputFormat.class);
    SequenceFileInputFormat.addInputPath(job, path);

    job.getConfiguration().set(MyMapper.BLOCK_SIZE, String.valueOf(2));
    job.getConfiguration().set(MyMapper.ROWS, String.valueOf(4));
    job.getConfiguration().set(MyMapper.COLUMNS, String.valueOf(4));
    job.getConfiguration().setBoolean(MyMapper.MATRIX_POS, b);

    TableMapReduceUtil.initTableReducerJob(collectionTable,
        org.apache.hama.mapreduce.CollectBlocksReducer.class, job);

    try {
      // A failed blocking job leaves the collection table incomplete, so the
      // failure must reach the caller instead of being printed and ignored.
      if (!job.waitForCompletion(true)) {
        throw new IOException("Blocking MR job failed for " + path);
      }
    } catch (InterruptedException e) {
      // Restore the interrupt flag so callers up the stack can still see it.
      Thread.currentThread().interrupt();
      throw new IOException("interrupted while blocking " + path, e);
    } catch (ClassNotFoundException e) {
      throw new IOException("class not found while blocking " + path, e);
    }
  }

  /**
   * Mapper that cuts one matrix row into column blocks.
   *
   * For each input row (key = row index, value = row entries) it emits one
   * record per column block: the key is {@code BlockID(blockRow, blockColumn)}
   * and the value is the entries of the matching sub-vector of the row.
   * Block sizes are derived from the job configuration in
   * {@link #setConf(Configuration)}.
   */
  public static class MyMapper extends
      Mapper<IntWritable, MapWritable, BlockID, MapWritable> implements
      Configurable {
    private Configuration conf = null;
    /**
     * Configuration key for the blocking value. Despite the name, the value is
     * used as a divisor: block row/column size = rows/columns divided by it,
     * i.e. it acts as the number of blocks per dimension.
     */
    public static final String BLOCK_SIZE = "hama.blocking.size";
    /** Configuration key for the number of rows of the matrix. */
    public static final String ROWS = "hama.blocking.rows";
    /** Configuration key for the number of columns of the matrix. */
    public static final String COLUMNS = "hama.blocking.columns";
    /** Boolean configuration key; this mapper itself does not read it. */
    public static final String MATRIX_POS = "a.or.b";

    private int mBlockNum;
    private int mBlockRowSize;
    private int mBlockColSize;
    private int mRows;
    private int mColumns;

    /**
     * Splits the row into consecutive column ranges of {@code mBlockColSize}
     * columns and writes each range under its block id.
     *
     * @param key row index of the input vector
     * @param value entries of the input row
     * @param context output collector of the map task
     * @throws IOException if writing to the context fails
     * @throws InterruptedException if writing to the context is interrupted
     */
    @Override
    public void map(IntWritable key, MapWritable value, Context context)
        throws IOException, InterruptedException {
      int startColumn, endColumn, blkRow = key.get() / mBlockRowSize, i = 0;
      DenseVector dv = new DenseVector(key.get(), value);

      do {
        startColumn = i * mBlockColSize;
        endColumn = startColumn + mBlockColSize - 1;
        if (endColumn >= mColumns) { // the last sub vector
          endColumn = mColumns - 1;
        }
        context.write(new BlockID(blkRow, i), dv.subVector(startColumn,
            endColumn).getEntries());

        i++;
      } while (endColumn < (mColumns - 1));
    }

    @Override
    public Configuration getConf() {
      return conf;
    }

    /**
     * Stores the configuration and derives the block sizes from it.
     * Defaults are a blocking value of 2 and a 4x4 matrix.
     *
     * @param conf job configuration to read the blocking parameters from
     * @throws IllegalArgumentException if the blocking value is not positive
     *         or the derived block row/column size is not positive
     */
    @Override
    public void setConf(Configuration conf) {
      this.conf = conf;

      mBlockNum = conf.getInt(BLOCK_SIZE, 2);
      mRows = conf.getInt(ROWS, 4);
      mColumns = conf.getInt(COLUMNS, 4);

      // Guard the divisions below against a zero or negative divisor.
      if (mBlockNum <= 0) {
        throw new IllegalArgumentException(BLOCK_SIZE
            + " must be positive, but was " + mBlockNum);
      }

      mBlockRowSize = mRows / mBlockNum;
      mBlockColSize = mColumns / mBlockNum;

      // A zero row size makes map() divide by zero; a zero column size keeps
      // its loop from ever advancing. Reject both up front.
      if (mBlockRowSize <= 0 || mBlockColSize <= 0) {
        throw new IllegalArgumentException("matrix of " + mRows + "x" + mColumns
            + " cannot be split by a blocking value of " + mBlockNum);
      }
    }
  }
  • No labels