Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.
Comment: Migration of unmigrated content due to installation of a new plugin
Table of Contents

This effort is still a "work in progress". Please feel free to add comments.

But please make the content less visible by using smaller fonts. – Edward J. Yoon

Overview

This is intended to explain and to illustrate the concept of Hama. There are two main parts:

  • How to store the matrices?
  • How to perform matrix operations using MapReduce?

Introduction

Hama (Hadoop Matrix) is a distributed matrix computation package currently in incubation with Apache. It is a library of matrix operations for large-scale processing and development environments as well as a Map/Reduce framework for a large-scale numerical analysis and data mining, that need the intensive computation power of matrix inversion, e.g., linear regression, PCA, SVM and etc. It will be useful for many scientific applications, e.g., physics computations, linear algebra, computational fluid dynamics, statistics, graphic rendering and many more.

Block Diagram

http://wiki.apache.org/hama-data/attachments/Architecture/attachments/block.png

Implementation

User Interfaces

Storage Structure

Store Matrices & Graphs

The matrix or network structure that frequently changes should have flexible storage structure for easy update and indicies that point to the appropriate entry. Also, we need a model that uses the concept of column-iterative methods.

HBase is an open-source, distributed, column-oriented store modeled as a google bigtable. Hama uses Hbase to store the matrices and graphs which are represented mathematically.

Matrices are basically tables. They are ways of storing numbers and other things. Typical matrix has rows and columns which is often called a 2-way matrix because it has two dimensions. For example, you might have respondents-by-attitudes. Of course, you might collect the same data on the same people at 5 points in time. In that case, you either have 5 different 2-way matrices, or you could think of it as a 3-way matrix, that is respondent-by-attitude-by-time.

Just a thought, considering the depleted activity in HBase should we not explore ways to avoid HBase ? --Prasen
Hbase seems activity at this time, However Yes. We should think about it. --Edward

Structure Considerations

A lot of columns causes huge storage expense. So I propose that we store a piece of Vector to each cell.

No Format

DenseMatrix Table scheme:

       column:                               metadata:
===============================================================================
row1   column:startLocation  <sub-vector1>   metadata:subVectorInterval <1000>
       column:startLocation  <sub-vector2>   metadata:matrixType <DenseMatrix>
       column:startLocation  <sub-vector3>   ...
       ...
row2
...
No Format

SparseMatrix Table scheme:

       column:                               metadata:
===============================================================================
row1   column:column1  <entry1>   metadata:matrixType <SparseMatrix>
       column:column2  <entry2>   ...
       column:column3  <entry3>   
       ...
row2
...

Algorithms

The Map/Reduce is designed to distributed process large data set by connecting many commodity computers together so, If there's a inter-node communication, the elapsed run time will be slower with more nodes. Consequently, an "effective" algorithm should avoid large amounts of communication.

Basic Algorithms

Addition

Addition of multiple matrices

Multiplication

  • Iterative Approach
No Format

For i = 0 step 1 until N -1
  Job: Computes the ith row of C = Matrix-Vector multiplication

Iterative job:

- A map task receives a row n of B as a key, and vector of row as its value
 - Multiplying by all columns of ith row of A
- Reduce task find and add the ith product

1st
+             +   +             +
| a11 a12 a13 |   | a11 a21 a31 |
| ... ... ... | X | a12 a22 a32 |
| ... ... ... |   | a13 a23 a33 |
+             +   +             +

2nd
+             +   +             +
| ... ... ... |   | a11 a21 a31 |
| a21 a22 a23 | X | a12 a22 a32 |
| ... ... ... |   | a13 a23 a33 |
+             +   +             +
....

  • Blocking Algorithm Approach

To mutliply two dense matrices A and B, We collect the blocks to 'collectionTable' firstly using map/reduce. Rows are named as c(i, j) with sequential number ((N^2 * i) + ((j * N) + k) to avoid duplicated records.

No Format

CollectionTable:

                            matrix A         matrix B
------------------------+-------------------------------             
block(0, 0)-0               block(0, 0)      block(0, 0)
block(0, 0)-1               block(0, 1)      block(1, 0)
block(0, 0)-2               block(0, 2)      block(2, 0)
...         N               ...
block(N-1, n-1)-(N^3-1)     block(N-1, N-1)  block(N-1, N-1)

Each row has a two sub matrices of a(i, k) and b(k, j) so that minimized data movement and network cost.

No Format

Blocking jobs:

Collect the blocks to 'collectionTable' from A and B.

- A map task receives a row n as a key, and vector of each row as its value
 - emit (blockID, sub-vector) pairs
- Reduce task merges block structures based on the information of blockID

Multiplication job:

- A map task receives a blockID n as a key, and two sub-matrices of A and B as its value
 - Multiply two sub-matrices: a[i][j] * b[j][k]
- Reduce task computes sum of blocks 
 - c[i][k] += multiplied blocks

Matrix Norm

  • Find the maximum absolute row sum of matrix

Matrix.Norm.One is that find the maximum absolute row sum of matrix. Comparatively, it's a good fit with MapReduce model because doesn't need iterative jobs or table/file JOIN operations.

No Format


                                         j=n
The maximum absolute row sum =   max   ( sum | a_{i,j} | ) 
                               1<=i<=n   j=1


- A map task receives a row n as a key, and vector of each row as its value
 - emit (row, the sum of the absolute value of each entries)
- Reduce task select the maximum one

NOTE: Matrix.infinity, Matrix.Maxvalue and Matrix.Frobenius are almost same with this.

Compute the transpose of matrix

The transpose of a matrix is another matrix in which the rows and columns have been reversed. The matrix must be square for this work.

No Format

+             +    +             +
| a11 a12 a13 |    | a11 a21 a31 |
| a21 a22 a23 | => | a12 a22 a32 |
| a31 a32 a33 |    | a13 a23 a33 |
+             +    +             +

- A map task receives a row n as a key, and vector of each row as its value
 - emit (Reversed index, the entry with the given index)
- Reduce task sets the reversed values

Compute the determinant of square matrix

Decomposition Algorithms

Cholesky Decomposition

Singular Value Decompostion

Components

Wiki Markup
Apache Hama, based on Bulk Synchronous Parallel model\[1\], comprises three major components: 

It is very similar with Hadoop architecture, only except the portion of communication and synchronization mechanisms.

In a normal usecase the user submits a so called "Job" which is a definition of how to run a computation. A job once submitted will have multiple tasks that are launched across the cluster.

BSPMaster

BSPMaster is responsible for the following:

  • Maintaining its own state.
  • Maintaining groom server status.
  • Maintaining supersteps and other counters in a cluster.
  • Maintaining jobs and tasks.
  • Scheduling Jobs and assigning tasks to groom servers
  • Distributing execution classes and configuration across groom servers.
  • Providing users with the cluster control interface (web and console based).

A BSP Master and multiple grooms are started by the script. Then, the bsp master starts up with a RPC server to which groom servers can dynamically register itself. Groom servers starts up with a BSPPeer instance - later, BSPPeer needs to be integrated with GroomServer - and a RPC proxy to contact the bsp master. After started, each groom periodically sends a heartbeat message that encloses its groom server status, including maximum task capacity, unused memory, and so on.

Each time the bsp master receives a heartbeat message, it brings up-to-date groom server status - the bsp master makes use of groom servers' status in order to effectively assign tasks to idle groom servers - and returns a heartbeat response that contains assigned tasks and others actions that a groom server has to do. For now, we have a FIFO job scheduler and very simple task assignment algorithms.

GroomServer

Wiki Markup
A [Groom Server|GroomServer] (shortly referred to as groom) is a process that manages life cycle of bsp tasks assigned by BSPMaster. Each groom contacts the BSPMaster, and reports task statuses by means of periodical piggybacks with BSPMaster. Each groom is designed to run with HDFS or other distributed storages. Basically, a groom server and a data node should run on one physical node to get the best performance for data-locality. Note that in a massive parallel environment, the benefit of data locality is lost when large amount of virtual processes must be multiplexed onto physical processes\[2\].

Zookeeper

A Zookeeper is used to manage the efficient barrier synchronization of the BSPPeers. Later, it will also be used for the area of a fault tolerance system.

Communication and Synchronization Process

Each BSP task has a set of Outgoing Message Manager and Incoming Queue.

Outgoing Message Manager collects the message to be sent, serializes it, compresses it and puts it in a bundles. At barrier synchronization phase, each BSP task exchanges the bundles, deserializes it, decompresses it and puts it into the Incoming Queue.

System Diagram

Image Added

  1. BSPMaster starts up
  2. GroomServer starts up
  3. ZooKeeper cluster starts up
  4. GroomServer dynamically registers itself to BSPMaster
  5. GroomServer forks/ manages BSPPeer(s)
  6. BSPPeers communicate/ perform barrier synchronization through ZooKeeper cluster.

Reference

Wiki Markup
\[1\]. Valiant, Leslie G., A bridging model for parallel computation. 

Wiki Markup
\[2\]. David B. Skillicorn, Jonathan M. D. Hill, and W. F. [McColl]. Questions and Answers about BSP. Scientific Programming, 6(3):249-274, Fall 1997.

...