This effort is still a "work in progress". Please feel free to add comments.
But please make the content less visible by using smaller fonts. – Edward J. Yoon
Overview
This is intended to explain and to illustrate the concept of Hama. There are two main parts:
- How to store the matrices?
- How to perform matrix operations using MapReduce?
Introduction
Hama (Hadoop Matrix) is a distributed matrix computation package currently in incubation at Apache. It is a library of matrix operations for large-scale processing and development environments, as well as a Map/Reduce framework for large-scale numerical analysis and data mining tasks that need the intensive computational power of matrix operations such as inversion, e.g., linear regression, PCA, and SVM. It will be useful for many scientific applications, e.g., physics computations, linear algebra, computational fluid dynamics, statistics, graphics rendering, and many more.
Block Diagram
http://wiki.apache.org/hama-data/attachments/Architecture/attachments/block.png
Implementation
User Interfaces
Storage Structure
Store Matrices & Graphs
A matrix or network structure that changes frequently needs a flexible storage structure that allows easy updates, along with indices that point to the appropriate entries. We also need a model that supports column-iterative methods.
HBase is an open-source, distributed, column-oriented store modeled after Google's Bigtable. Hama uses HBase to store the matrices and graphs in their mathematical representation.
Matrices are basically tables: they are a way of storing numbers and other data. A typical matrix has rows and columns and is often called a 2-way matrix because it has two dimensions; for example, you might have respondents-by-attitudes. Of course, you might collect the same data on the same people at 5 points in time. In that case, you either have 5 different 2-way matrices, or you could think of it as a 3-way matrix, that is, respondent-by-attitude-by-time.
– Just a thought: considering the depleted activity in HBase, should we not explore ways to avoid HBase? --Prasen
– HBase seems active at this time. However, yes, we should think about it. --Edward
Structure Considerations
A large number of columns causes significant storage expense, so I propose that we store a piece of a Vector in each cell.
DenseMatrix Table scheme:
column: metadata:
===============================================================================
row1 column:startLocation <sub-vector1> metadata:subVectorInterval <1000>
column:startLocation <sub-vector2> metadata:matrixType <DenseMatrix>
column:startLocation <sub-vector3> ...
...
row2
...
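As a sketch of this sub-vector scheme, a dense row could be split into fixed-interval pieces, with one HBase cell per piece rather than one cell per entry. The function name and interval value below are illustrative, not Hama's actual API:

```python
def split_row(row, interval):
    """Split a dense row vector into fixed-size sub-vectors.

    Each piece would be stored in one HBase cell, keyed by its start
    location, matching the metadata:subVectorInterval idea above.
    (Illustrative sketch, not Hama's actual API.)
    """
    return {start: row[start:start + interval]
            for start in range(0, len(row), interval)}

# A 10-element row split with interval 4 yields cells at 0, 4, and 8.
cells = split_row(list(range(10)), 4)
```

With an interval of 1000, as in the scheme above, a row of one million entries becomes 1000 cells instead of one million.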
SparseMatrix Table scheme:
column: metadata:
===============================================================================
row1 column:column1 <entry1> metadata:matrixType <SparseMatrix>
column:column2 <entry2> ...
column:column3 <entry3>
...
row2
...
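The sparse scheme can be sketched the same way: only the nonzero entries of a row are kept, keyed by their column index (illustrative Python, not Hama's actual API):

```python
def sparse_row(row):
    """Keep only the nonzero entries of a row, keyed by column index,
    mirroring the SparseMatrix scheme where each cell column:columnJ
    holds one nonzero entry. (Illustrative sketch.)"""
    return {j: v for j, v in enumerate(row) if v != 0}

# Only columns 1 and 4 are stored; zero entries consume no cells.
entries = sparse_row([0, 3.5, 0, 0, 7.0])
```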
Algorithms
Map/Reduce is designed to process large data sets in a distributed fashion by connecting many commodity computers together, so if an algorithm requires inter-node communication, the elapsed run time gets worse as nodes are added. Consequently, an "effective" algorithm should avoid large amounts of communication.
Basic Algorithms
Addition
Addition of multiple matrices
Multiplication
- Iterative Approach
For i = 0 step 1 until N - 1
Job: Computes the ith row of C = Matrix-Vector multiplication
Iterative job:
- A map task receives row n of B as a key, and the vector of that row as its value
- Multiplies it by the corresponding entries of the ith row of A
- The reduce task sums the partial products to form the ith row of C
1st
+ + + +
| a11 a12 a13 | | a11 a21 a31 |
| ... ... ... | X | a12 a22 a32 |
| ... ... ... | | a13 a23 a33 |
+ + + +
2nd
+ + + +
| ... ... ... | | a11 a21 a31 |
| a21 a22 a23 | X | a12 a22 a32 |
| ... ... ... | | a13 a23 a33 |
+ + + +
....
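The iterative job can be simulated in a few lines: for each row i of C, the map phase emits one partial row per row of B (row n of B scaled by A[i][n]), and the reduce phase sums the partial rows. This is a sketch under the assumption that both matrices fit in memory:

```python
def matmul_iterative(A, B):
    """Simulate the iterative approach: one 'job' per row i of C.
    Map: for each row n of B, emit the scaled row A[i][n] * B[n].
    Reduce: sum the partial rows elementwise into row i of C."""
    n_cols = len(B[0])
    C = []
    for i in range(len(A)):
        # map phase: one partial product per row of B
        partials = [[A[i][n] * b for b in B[n]] for n in range(len(B))]
        # reduce phase: elementwise sum of the partial rows
        C.append([sum(p[j] for p in partials) for j in range(n_cols)])
    return C

A = [[1, 2], [3, 4]]
B = [[5, 6], [7, 8]]
C = matmul_iterative(A, B)
```

Note that this launches one job per row, which is exactly the iteration overhead the blocking approach below tries to avoid.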
- Blocking Algorithm Approach
To multiply two dense matrices A and B, we first collect the blocks into a 'collectionTable' using map/reduce. Rows are named c(i, j) with the sequential number (N^2 * i) + (j * N) + k to avoid duplicate records.
CollectionTable:
matrix A matrix B
------------------------+-------------------------------
block(0, 0)-0 block(0, 0) block(0, 0)
block(0, 0)-1 block(0, 1) block(1, 0)
block(0, 0)-2 block(0, 2) block(2, 0)
... N ...
block(N-1, n-1)-(N^3-1) block(N-1, N-1) block(N-1, N-1)
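The row-naming formula can be checked with a small sketch: for any N, the sequential numbers (N^2 * i) + (j * N) + k are unique over all (i, j, k) triples, so no two block pairs collide:

```python
def block_row_id(i, j, k, N):
    """Sequential row name for the block pair (a(i, k), b(k, j)) in
    the collectionTable: (N^2 * i) + (j * N) + k. Since j and k are
    both below N, each (i, j, k) maps to a distinct id."""
    return (N * N * i) + (j * N) + k

N = 3
ids = [block_row_id(i, j, k, N)
       for i in range(N) for j in range(N) for k in range(N)]
```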
Each row holds the two sub-matrices a(i, k) and b(k, j), which minimizes data movement and network cost.
Blocking jobs:
Collect the blocks to 'collectionTable' from A and B.
- A map task receives a row n as a key, and vector of each row as its value
- emit (blockID, sub-vector) pairs
- Reduce task merges block structures based on the information of blockID
Multiplication job:
- A map task receives a blockID as its key, and the two sub-matrices of A and B as its value
- Multiplies the two sub-matrices: a[i][k] * b[k][j]
- The reduce task computes the sum of the products:
- c[i][j] += multiplied blocks
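The two jobs can be sketched end to end. For brevity the "blocks" here are plain numbers rather than sub-matrices, and the function names are illustrative, not Hama's actual API:

```python
from collections import defaultdict

def collect_blocks(A, B, N):
    """Blocking job: each collectionTable row, keyed by the unique
    sequential number (N^2 * i) + (j * N) + k, holds the block pair
    (a(i, k), b(k, j))."""
    return {(N * N * i) + (j * N) + k: (A[(i, k)], B[(k, j)])
            for i in range(N) for j in range(N) for k in range(N)}

def multiply_and_reduce(table, N):
    """Multiplication job: the map multiplies each row's pair, and the
    reduce sums products belonging to the same c(i, j). The target
    (i, j) is recovered from the blockID."""
    C = defaultdict(int)
    for block_id, (a, b) in table.items():
        i, j = block_id // (N * N), (block_id % (N * N)) // N
        C[(i, j)] += a * b
    return dict(C)

A = {(0, 0): 1, (0, 1): 2, (1, 0): 3, (1, 1): 4}
B = {(0, 0): 5, (0, 1): 6, (1, 0): 7, (1, 1): 8}
C = multiply_and_reduce(collect_blocks(A, B, 2), 2)
```

Because each collectionTable row already pairs a(i, k) with b(k, j), the multiplication job needs no further joins between A and B.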
Matrix Norm
- Find the maximum absolute row sum of matrix
Matrix.Norm.One finds the maximum absolute row sum of a matrix. Comparatively, it is a good fit for the MapReduce model because it needs neither iterative jobs nor table/file JOIN operations.
The maximum absolute row sum = max_{1<=i<=n} ( sum_{j=1}^{n} | a_{i,j} | )
- A map task receives row i as a key, and the vector of that row as its value
- emit (row, the sum of the absolute values of its entries)
- The reduce task selects the maximum
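A minimal simulation of this single-pass job (assuming the matrix fits in memory; the function name is illustrative):

```python
def max_abs_row_sum(matrix):
    """Map: emit (row index, sum of absolute entry values) per row.
    Reduce: keep the maximum. A single pass, matching the
    non-iterative structure described above."""
    mapped = {i: sum(abs(v) for v in row) for i, row in enumerate(matrix)}
    return max(mapped.values())

# Row 0 sums to |1| + |-7| + |3| = 11, row 1 to 6, so the norm is 11.
norm = max_abs_row_sum([[1, -7, 3], [2, 2, 2]])
```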
NOTE: Matrix.infinity, Matrix.Maxvalue, and Matrix.Frobenius are computed in almost the same way.
Compute the transpose of matrix
The transpose of a matrix is another matrix in which the rows and columns have been swapped. The matrix need not be square: the transpose of an m-by-n matrix is an n-by-m matrix.
+ + + +
| a11 a12 a13 | | a11 a21 a31 |
| a21 a22 a23 | => | a12 a22 a32 |
| a31 a32 a33 | | a13 a23 a33 |
+ + + +
- A map task receives row i as a key, and the vector of that row as its value
- emit (swapped index (j, i), the entry at index (i, j))
- The reduce task writes the entries under their swapped indices
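A sketch of the transpose job in plain Python, with the map/reduce steps noted in the comments:

```python
def transpose(matrix):
    """Map: for each entry (i, j, value), emit it under the swapped
    index (j, i). Reduce: regroup the entries by their new row index.
    Works for any m x n matrix, not only square ones."""
    rows, cols = len(matrix), len(matrix[0])
    return [[matrix[i][j] for i in range(rows)] for j in range(cols)]

# A 2x3 matrix transposes to a 3x2 matrix.
T = transpose([[1, 2, 3], [4, 5, 6]])
```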
Compute the determinant of square matrix
Decomposition Algorithms
Cholesky Decomposition
Singular Value Decomposition
Components
Apache Hama, based on the Bulk Synchronous Parallel model [1], comprises three major components:
- BSPMaster
- GroomServer
- ZooKeeper
It is very similar to the Hadoop architecture, except for the communication and synchronization mechanisms.
In a normal use case, the user submits a so-called "Job", which is a definition of how to run a computation. A job, once submitted, will have multiple tasks that are launched across the cluster.
BSPMaster
BSPMaster is responsible for the following:
- Maintaining its own state.
- Maintaining groom server status.
- Maintaining supersteps and other counters in a cluster.
- Maintaining jobs and tasks.
- Scheduling Jobs and assigning tasks to groom servers
- Distributing execution classes and configuration across groom servers.
- Providing users with the cluster control interface (web and console based).
A BSP master and multiple grooms are started by the startup script. The BSP master then starts an RPC server with which groom servers can dynamically register themselves. Each groom server starts up with a BSPPeer instance - later, BSPPeer needs to be integrated with GroomServer - and an RPC proxy to contact the BSP master. Once started, each groom periodically sends a heartbeat message that encloses its groom server status, including maximum task capacity, unused memory, and so on.
Each time the BSP master receives a heartbeat message, it brings the groom server status up to date - the BSP master makes use of the groom servers' status to effectively assign tasks to idle groom servers - and returns a heartbeat response that contains the assigned tasks and other actions the groom server has to perform. For now, we have a FIFO job scheduler and a very simple task assignment algorithm.
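The FIFO assignment could be sketched as follows; the heartbeat fields and function names here are assumptions for illustration, not Hama's actual classes:

```python
from collections import deque

def assign_tasks(pending_tasks, heartbeat):
    """Sketch of FIFO assignment: on each heartbeat, the master hands
    the oldest pending tasks to the reporting groom, up to that
    groom's free task capacity. (Illustrative names only.)"""
    free = heartbeat["max_tasks"] - heartbeat["running_tasks"]
    assigned = []
    while free > 0 and pending_tasks:
        assigned.append(pending_tasks.popleft())
        free -= 1
    return assigned

tasks = deque(["task-1", "task-2", "task-3"])
# A groom with capacity 2 and nothing running receives two tasks.
actions = assign_tasks(tasks, {"max_tasks": 2, "running_tasks": 0})
```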
GroomServer
A Groom Server (referred to simply as a groom) is a process that manages the life cycle of BSP tasks assigned by the BSPMaster. Each groom contacts the BSPMaster and reports task statuses by means of periodic piggybacks on its heartbeats to the BSPMaster. Each groom is designed to run with HDFS or other distributed storage; basically, a groom server and a data node should run on one physical node to get the best performance from data locality. Note that in a massively parallel environment, the benefit of data locality is lost when a large number of virtual processes must be multiplexed onto physical processes [2].
Zookeeper
ZooKeeper is used to manage efficient barrier synchronization of the BSPPeers. Later, it will also be used as part of the fault tolerance system.
Communication and Synchronization Process
Each BSP task has an Outgoing Message Manager and an Incoming Queue.
The Outgoing Message Manager collects the messages to be sent, serializes them, compresses them, and puts them into bundles. At the barrier synchronization phase, the BSP tasks exchange the bundles; each receiving task deserializes and decompresses them and puts the messages into its Incoming Queue.
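The bundle handling can be sketched as follows. Hama uses its own Writable-based serialization; JSON and zlib below are stand-ins for illustration only:

```python
import json
import zlib

def make_bundle(messages):
    """Outgoing side: serialize the collected messages and compress
    them into one bundle per destination peer. (Sketch: real Hama
    uses Writable serialization, not JSON.)"""
    return zlib.compress(json.dumps(messages).encode("utf-8"))

def open_bundle(bundle):
    """Incoming side: decompress and deserialize the bundle so its
    messages can be placed into the Incoming Queue."""
    return json.loads(zlib.decompress(bundle).decode("utf-8"))

msgs = [{"peer": "groom1", "value": 42}, {"peer": "groom1", "value": 7}]
# The round trip recovers exactly the messages that were sent.
received = open_bundle(make_bundle(msgs))
```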
System Diagram
- BSPMaster starts up
- GroomServer starts up
- ZooKeeper cluster starts up
- GroomServer dynamically registers itself to BSPMaster
- GroomServer forks/ manages BSPPeer(s)
- BSPPeers communicate/ perform barrier synchronization through ZooKeeper cluster.
Reference
[1] Valiant, Leslie G. A bridging model for parallel computation. Communications of the ACM, 33(8):103-111, August 1990.
[2] David B. Skillicorn, Jonathan M. D. Hill, and W. F. McColl. Questions and Answers about BSP. Scientific Programming, 6(3):249-274, Fall 1997.