
This effort is still a "work in progress". Please feel free to add comments. But please make the content less visible by using smaller fonts. – Edward J. Yoon

Overview

This page is intended to explain and illustrate the concepts behind Hama. There are two main parts:

  • How are the matrices stored?
  • How are matrix operations performed using MapReduce?

Building Block

Building block diagram: http://wiki.apache.org/hama-data/attachments/Architecture/attachments/block.png

Store Matrices & Graphs

A matrix or network structure that changes frequently needs a flexible storage structure that supports easy updates, with indices that point to the appropriate entries. We also need a model that supports column-iterative methods.

Hama uses HBase (http://hadoop.apache.org/hbase/) to store the matrices.

Matrices are basically tables: a way of storing numbers and other values. A typical matrix has rows and columns and is often called a 2-way matrix because it has two dimensions. For example, you might have respondents-by-attitudes. Of course, you might collect the same data on the same people at 5 points in time; in that case, you either have 5 different 2-way matrices, or you can think of it as a 3-way matrix, that is, respondent-by-attitude-by-time.
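
As a concrete illustration, a single matrix entry could be written to HBase like this. The table name "matrixA", the column family "column", and the use of the plain 0.90-era HBase client API are illustrative assumptions, not Hama's actual schema:

No Format

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;

public class MatrixStore {
  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    HTable table = new HTable(conf, "matrixA");   // table name is an assumption

    // store a(0, 2) = 3.14: the HBase row key is the matrix row index i,
    // and the column qualifier inside family "column" is the column index j
    Put put = new Put(Bytes.toBytes(0));
    put.add(Bytes.toBytes("column"), Bytes.toBytes(2), Bytes.toBytes(3.14));
    table.put(put);
    table.close();
  }
}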

Just a thought: considering the reduced activity in HBase, should we not explore ways to avoid HBase? --Prasen
HBase seems active at this time. However, yes, we should think about it. --Edward

Represent a graph using an adjacency matrix

Perform matrix operations

Hadoop/HBase is designed to process large data sets efficiently by connecting many commodity computers to work in parallel. But if an algorithm requires inter-node communication, the elapsed run time grows as more nodes are added. Consequently, an "effective" algorithm should avoid large amounts of communication.

Algorithms

Dense Matrix-Matrix addition

The addition of multiple matrices
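
Element-wise addition maps naturally onto a single reduce-side join: each mapper re-keys its matrix's rows by row index, and the reducer adds the vectors that share a key. A minimal sketch, assuming each input matrix is stored as text lines of the form "i<TAB>v1 v2 ..." (this layout, and the class names, are illustrative assumptions rather than Hama's actual format):

No Format

import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;

public class MatrixAdd {

  // Map: re-key each row of A and B by its row index i.
  public static class RowMapper
      extends Mapper<LongWritable, Text, IntWritable, Text> {
    @Override
    protected void map(LongWritable offset, Text line, Context context)
        throws IOException, InterruptedException {
      String[] fields = line.toString().split("\t");   // "i<TAB>v1 v2 v3 ..."
      context.write(new IntWritable(Integer.parseInt(fields[0])),
                    new Text(fields[1]));
    }
  }

  // Reduce: element-wise sum of the row vectors that share a row index.
  public static class SumReducer
      extends Reducer<IntWritable, Text, IntWritable, Text> {
    @Override
    protected void reduce(IntWritable row, Iterable<Text> vectors, Context context)
        throws IOException, InterruptedException {
      double[] sum = null;
      for (Text v : vectors) {
        String[] entries = v.toString().split(" ");
        if (sum == null) sum = new double[entries.length];
        for (int j = 0; j < entries.length; j++)
          sum[j] += Double.parseDouble(entries[j]);
      }
      StringBuilder out = new StringBuilder();
      for (double d : sum) out.append(d).append(' ');
      context.write(row, new Text(out.toString().trim()));
    }
  }
}

Because the reducer simply adds every vector it receives for a row key, the same job sums any number of input matrices at once.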

Matrix-Matrix multiplication

Dense matrix multiplication

Iterative Approach
No Format

For i = 0 step 1 until N - 1
  Job: Compute the i-th row of C by a matrix-vector multiplication

Iterative job:

- A map task receives a row index n of B as its key, and the row vector as its value
 - It multiplies the row vector by a(i, n), the n-th entry of the i-th row of A
- The reduce task sums the partial products for each column of the i-th row of C

1st
+             +   +             +
| a11 a12 a13 |   | a11 a21 a31 |
| ... ... ... | X | a12 a22 a32 |
| ... ... ... |   | a13 a23 a33 |
+             +   +             +

2nd
+             +   +             +
| ... ... ... |   | a11 a21 a31 |
| a21 a22 a23 | X | a12 a22 a32 |
| ... ... ... |   | a13 a23 a33 |
+             +   +             +
....
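
For concreteness, here is a hedged sketch of one iteration's job in Hadoop's MapReduce API. Passing the i-th row of A through the job configuration (under the hypothetical key "aRow") and the "n<TAB>v1 v2 ..." text layout for rows of B are illustration-only assumptions, not Hama's implementation:

No Format

import java.io.IOException;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;

public class RowTimesVector {

  // Map: one row of B against the broadcast i-th row of A.
  public static class RowMapper
      extends Mapper<LongWritable, Text, IntWritable, DoubleWritable> {
    private double[] aRow;   // the i-th row of A, read from the configuration

    @Override
    protected void setup(Context context) {
      String[] parts = context.getConfiguration().get("aRow").split(" ");
      aRow = new double[parts.length];
      for (int k = 0; k < parts.length; k++)
        aRow[k] = Double.parseDouble(parts[k]);
    }

    @Override
    protected void map(LongWritable offset, Text line, Context context)
        throws IOException, InterruptedException {
      String[] fields = line.toString().split("\t");   // "n<TAB>v1 v2 v3 ..."
      int n = Integer.parseInt(fields[0]);             // row index of B
      String[] entries = fields[1].split(" ");
      // partial product: a(i, n) * b(n, j) contributes to c(i, j)
      for (int j = 0; j < entries.length; j++) {
        double b = Double.parseDouble(entries[j]);
        context.write(new IntWritable(j), new DoubleWritable(aRow[n] * b));
      }
    }
  }

  // Reduce: sum the partial products for each column j of the i-th row of C.
  public static class SumReducer
      extends Reducer<IntWritable, DoubleWritable, IntWritable, DoubleWritable> {
    @Override
    protected void reduce(IntWritable j, Iterable<DoubleWritable> parts, Context context)
        throws IOException, InterruptedException {
      double sum = 0.0;
      for (DoubleWritable p : parts) sum += p.get();
      context.write(j, new DoubleWritable(sum));
    }
  }
}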

Blocking Algorithm Approach

To multiply two dense matrices A and B, we first collect the blocks into a 'collectionTable' using MapReduce. Rows are named c(i, j) with the sequential number (N^2 * i) + (N * j) + k to avoid duplicate records. For example, with N = 2 blocks per side, the block pair (i=1, j=0, k=1) is stored under sequential number (4 * 1) + (2 * 0) + 1 = 5.

No Format

CollectionTable:

row name                    matrix A         matrix B
------------------------+-------------------------------
block(0, 0)-0               block(0, 0)      block(0, 0)
block(0, 0)-1               block(0, 1)      block(1, 0)
block(0, 0)-2               block(0, 2)      block(2, 0)
...                         ...              ...
block(N-1, N-1)-(N^3-1)     block(N-1, N-1)  block(N-1, N-1)

Each row holds the two sub-matrices a(i, k) and b(k, j), so data movement and network cost are minimized.

No Format

Blocking jobs:

Collect the blocks into 'collectionTable' from A and B.

- A map task receives a row index n as its key, and the row vector as its value
 - It emits (blockID, sub-vector) pairs
- The reduce task merges the block structures based on the blockID

Multiplication job:

- A map task receives a blockID as its key, and the two sub-matrices a(i, k) and b(k, j) as its value
 - It multiplies the two sub-matrices: a(i, k) * b(k, j)
- The reduce task sums the multiplied blocks:
 - c(i, j) += a(i, k) * b(k, j)

If A is stored on HBase and we collect B into A, can we reduce the run time of the blocking job? --Edward
Hmm, it's possible to do the blocking in a single pass. --Edward
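
For illustration, a small sketch of the sequential pieces inside these jobs: the row-name computation described above, a plain triple-loop product of two square sub-matrices, and the accumulation the reduce task performs. Class and method names are assumptions:

No Format

public class BlockMath {

  // Sequential number (N^2 * i) + (N * j) + k uniquely names the record
  // for c-block (i, j) and contraction index k, so no records collide.
  static long rowName(int n, int i, int j, int k) {
    return (long) n * n * i + (long) n * j + k;
  }

  // Multiply two square sub-matrices a(i, k) and b(k, j); the reduce
  // task then accumulates these products into c(i, j).
  static double[][] multiplyBlock(double[][] a, double[][] b) {
    int s = a.length;
    double[][] c = new double[s][s];
    for (int i = 0; i < s; i++)
      for (int j = 0; j < s; j++)
        for (int k = 0; k < s; k++)
          c[i][j] += a[i][k] * b[k][j];
    return c;
  }

  // Accumulate one multiplied block into the running sum for c(i, j).
  static void accumulate(double[][] sum, double[][] block) {
    for (int i = 0; i < sum.length; i++)
      for (int j = 0; j < sum[i].length; j++)
        sum[i][j] += block[i][j];
  }
}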

Sparse matrix multiplication

Find the maximum absolute row sum of dense/sparse matrix

Matrix.Norm.One finds the maximum absolute row sum of a matrix. (In the usual convention, the one-norm of a matrix is the maximum absolute column sum and the infinity norm the maximum absolute row sum; the naming here follows the descriptions below.) Comparatively, it is a good fit for the MapReduce model because it needs no iterative jobs and no table/file JOIN operations.

No Format


                                         j=n
The maximum absolute row sum =   max   ( sum | a_{i,j} | ) 
                               1<=i<=n   j=1


- A map task receives a row index n as its key, and the row vector as its value
 - It emits (row, the sum of the absolute values of the row's entries)
- The reduce task selects the maximum sum
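
A hedged sketch of this job in Hadoop's MapReduce API, again assuming the illustrative "n<TAB>v1 v2 ..." text layout. Emitting every row sum under one shared key lets a single reduce task (also usable as a combiner) pick the maximum:

No Format

import java.io.IOException;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;

public class NormOne {

  private static final Text KEY = new Text("max");   // one key: global maximum

  // Map: emit the absolute sum of each row under the shared key.
  public static class RowSumMapper
      extends Mapper<LongWritable, Text, Text, DoubleWritable> {
    @Override
    protected void map(LongWritable offset, Text line, Context context)
        throws IOException, InterruptedException {
      String[] fields = line.toString().split("\t");   // "n<TAB>v1 v2 v3 ..."
      double sum = 0.0;
      for (String entry : fields[1].split(" "))
        sum += Math.abs(Double.parseDouble(entry));
      context.write(KEY, new DoubleWritable(sum));
    }
  }

  // Reduce (and combiner): keep only the maximum row sum.
  public static class MaxReducer
      extends Reducer<Text, DoubleWritable, Text, DoubleWritable> {
    @Override
    protected void reduce(Text key, Iterable<DoubleWritable> sums, Context context)
        throws IOException, InterruptedException {
      double max = Double.NEGATIVE_INFINITY;
      for (DoubleWritable s : sums) max = Math.max(max, s.get());
      context.write(key, new DoubleWritable(max));
    }
  }
}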

Compute the infinity norm

It is computed here as the maximum absolute column sum of a dense/sparse matrix.

No Format

                                            i=n
The maximum absolute column sum =   max   ( sum | a_{i,j} | )
                                  1<=j<=n   i=1

- A map task receives a row index n as its key, and the row vector as its value
 - It emits (column, the absolute value of each entry)
- The reduce task sums the absolute values per column and selects the maximum one.
  Note that picking the global maximum across columns requires either a single
  reduce task or a short follow-up scan over the per-column sums.

Compute the transpose of dense/sparse matrix

The transpose of a matrix is another matrix in which the rows and columns have been interchanged. The matrix need not be square: the transpose of an m x n matrix is an n x m matrix, although the illustration below uses a square one.

No Format

+             +    +             +
| a11 a12 a13 |    | a11 a21 a31 |
| a21 a22 a23 | => | a12 a22 a32 |
| a31 a32 a33 |    | a13 a23 a33 |
+             +    +             +

- A map task receives a row index n as its key, and the row vector as its value
 - It emits (reversed index, the entry at the given index)

ex) row 1 = { 1: a11, 2: a12, 3: a13 } emits:

position (1,1), value a11
position (2,1), value a12
position (3,1), value a13
...

- The reduce task writes the entries at their reversed positions
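
A minimal map-only sketch under the same assumed text layout; each entry (i, j) is simply re-emitted under the swapped position (j, i), and the output is the transpose:

No Format

import java.io.IOException;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class Transpose {

  // Map: for row i with entries a(i, 1..n), emit key "j,i" with value a(i, j).
  public static class SwapMapper
      extends Mapper<LongWritable, Text, Text, DoubleWritable> {
    @Override
    protected void map(LongWritable offset, Text line, Context context)
        throws IOException, InterruptedException {
      String[] fields = line.toString().split("\t");   // "i<TAB>v1 v2 v3 ..."
      int i = Integer.parseInt(fields[0]);             // 1-based row index
      String[] entries = fields[1].split(" ");
      for (int j = 0; j < entries.length; j++) {
        // reversed index: entry (i, j) lands at (j, i) in the transpose
        context.write(new Text((j + 1) + "," + i),
                      new DoubleWritable(Double.parseDouble(entries[j])));
      }
    }
  }
}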

Compute the determinant of square matrix

Decompositions

Cholesky Decomposition

Apache Hama, based on the Bulk Synchronous Parallel model [1], comprises three major components: BSPMaster, GroomServer, and Zookeeper.

It is very similar to the Hadoop architecture, except for the communication and synchronization mechanisms.

In a normal use case, the user submits a so-called "Job", which is a definition of how to run a computation. A job, once submitted, will have multiple tasks that are launched across the cluster.

BSPMaster

BSPMaster is responsible for the following:

  • Maintaining its own state.
  • Maintaining groom server status.
  • Maintaining supersteps and other counters in a cluster.
  • Maintaining jobs and tasks.
  • Scheduling jobs and assigning tasks to groom servers.
  • Distributing execution classes and configuration across groom servers.
  • Providing users with the cluster control interface (web and console based).

A BSP Master and multiple grooms are started by the startup script. The BSP master then starts an RPC server to which groom servers can dynamically register themselves. Each groom server starts with a BSPPeer instance - later, BSPPeer needs to be integrated with GroomServer - and an RPC proxy to contact the BSP master. After starting, each groom periodically sends a heartbeat message that encloses its groom server status, including maximum task capacity, unused memory, and so on.

Each time the BSP master receives a heartbeat message, it brings the groom server status up to date - the BSP master makes use of the groom servers' status to effectively assign tasks to idle groom servers - and returns a heartbeat response that contains the assigned tasks and other actions the groom server has to perform. For now, we have a FIFO job scheduler and a very simple task assignment algorithm.

GroomServer

A Groom Server (shortly referred to as a groom) is a process that manages the life cycle of BSP tasks assigned by the BSPMaster. Each groom contacts the BSPMaster and reports task statuses by means of periodic piggybacks with the BSPMaster. Each groom is designed to run with HDFS or other distributed storage. Basically, a groom server and a data node should run on the same physical node to get the best performance from data locality. Note that in a massively parallel environment, the benefit of data locality is lost when a large number of virtual processes must be multiplexed onto physical processes [2].

Zookeeper

Zookeeper is used to manage the efficient barrier synchronization of the BSPPeers. Later, it will also be used in the fault tolerance system.

Communication and Synchronization Process

Each BSP task has an Outgoing Message Manager and an Incoming Queue.

The Outgoing Message Manager collects the messages to be sent, serializes and compresses them, and puts them into bundles. In the barrier synchronization phase, the BSP tasks exchange the bundles, then deserialize and decompress them and put the messages into the Incoming Queue.
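
To make the superstep structure concrete, here is a hedged sketch of a BSP task. The class and method names (BSP, BSPPeer, send, sync, getCurrentMessage) follow the programming interface of later Hama releases and are an assumption here, since the version described on this page may differ:

No Format

import java.io.IOException;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hama.bsp.BSP;
import org.apache.hama.bsp.BSPPeer;
import org.apache.hama.bsp.sync.SyncException;

public class HelloBSP extends BSP<NullWritable, NullWritable, Text, Text, Text> {
  @Override
  public void bsp(BSPPeer<NullWritable, NullWritable, Text, Text, Text> peer)
      throws IOException, SyncException, InterruptedException {
    // local computation and message-sending phase: messages are collected
    // by the Outgoing Message Manager until the barrier
    for (String other : peer.getAllPeerNames()) {
      peer.send(other, new Text("hello from " + peer.getPeerName()));
    }

    peer.sync();   // barrier synchronization: bundles are exchanged here

    // after the barrier, drain the Incoming Queue
    Text msg;
    while ((msg = peer.getCurrentMessage()) != null) {
      peer.write(new Text(peer.getPeerName()), msg);
    }
  }
}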

System Diagram


  1. BSPMaster starts up
  2. GroomServer starts up
  3. ZooKeeper cluster starts up
  4. GroomServer dynamically registers itself to BSPMaster
  5. GroomServer forks and manages BSPPeer(s)
  6. BSPPeers communicate and perform barrier synchronization through the ZooKeeper cluster.

Reference

[1] Leslie G. Valiant. A bridging model for parallel computation. Communications of the ACM, 33(8):103-111, August 1990.

[2] David B. Skillicorn, Jonathan M. D. Hill, and W. F. McColl. Questions and Answers about BSP. Scientific Programming, 6(3):249-274, Fall 1997.

...