The PR is closed, and the Horovod solution will be used for distributed training.

The current implementation has been partially merged into the Horovod solution.

The resulting solution therefore combines the advantages of both this PR and Horovod, which benefits the whole community.

https://github.com/apache/incubator-mxnet/pull/10696

 

Problem Statement

MXNet's built-in distributed training mechanism, the parameter server, provides efficient communication for asynchronous SGD and fault tolerance, especially in cloud environments. However, we found that at a small number of nodes (8-64), MPI allreduce can achieve close-to-linear scaling efficiency without deploying any extra server nodes. We therefore suggest adding mpi-allreduce as an alternative choice for MXNet multi-node distributed training.

Following is the performance data we collected for MXNet multi-node training on a 10GbE network.

Machine: SKX6148, Network: 10GbE, Topology: VGG16, Local Batch Size: 64, KVStore Type: dist_sync (parameter server).

worker num | server num | Per Node FPS (pic/s) | Scaling Efficiency
8 | 8 (worker and server share node) | 19.87 | 67.81%
8 | 8 | 27.3 | 93.17%
8 | 4 | 22.7 | 77.47%
8 | 2 | 11.11 | 37.90%

Command line: python tools/launch.py -n 8 -s <server_num> --launcher ssh -H hosts python example/image-classification/train_vgg16.py --kv-store dist_sync


Following is the result of MXNet multi-node with mpi-allreduce support from our proof of concept (ready):

Node Num | Per Node FPS (pic/s) | Scaling Efficiency
8 | 27.76 | 94.74%

Command line: mpirun -n 8 -ppn 1 -machinefile hosts python example/image-classification/train_vgg16.py --kv-store dist_sync_mpi

MPI Allreduce's good scalability comes from its much lower communication time compared with a parameter server that is not allocated enough server nodes.

Following is an allreduce benchmark (400M payload, 8 workers) for reference:

Method | server num | worker num | Time (s)
ParameterServer (push+pull) | 1 | 8 | 6.5
ParameterServer (push+pull) | 2 | 8 | 3.4
ParameterServer (push+pull) | 4 | 8 | 2.0
ParameterServer (push+pull) | 8 | 8 | 1.2
MPI.Allreduce | N/A | 8 | 1.0
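
These timings are consistent with a simple back-of-envelope bandwidth model. The sketch below is not part of the original measurements; it assumes the 400M payload is roughly 400 MB, a usable 10GbE bandwidth of about 1.25 GB/s, that every worker pushes and pulls the full payload, and a ring allreduce. It ignores latency and overlap, and is only meant to show why too few servers become a bandwidth hot spot.

# Back-of-envelope bandwidth model for the benchmark above (illustrative only).
# Assumptions: payload ~= 0.4 GB, usable 10GbE bandwidth ~= 1.25 GB/s,
# each worker pushes and pulls the full payload, ring allreduce for MPI.
payload_gb = 0.4
link_gb_per_s = 1.25
workers = 8

for servers in (1, 2, 4, 8):
    # Each server holds 1/servers of the parameters and exchanges that shard
    # with every worker, so its NIC moves 2 * workers * payload / servers.
    traffic = 2.0 * (workers / servers) * payload_gb
    print(f"PS with {servers} server(s): ~{traffic / link_gb_per_s:.1f} s")

# Ring allreduce: each node sends/receives about 2 * (N - 1) / N * payload.
ring_traffic = 2.0 * (workers - 1) / workers * payload_gb
print(f"Ring allreduce: ~{ring_traffic / link_gb_per_s:.2f} s")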


From the performance data, we can draw the following conclusions:

a)    MPI Allreduce doesn't need extra server nodes, yet it can match the best performance the parameter server can reach in synchronous SGD multi-node training. For the parameter server, by contrast, an insufficient number of servers becomes a network-bandwidth hot spot if the deployment is not well configured.

b)    Unlike the parameter server, MPI Allreduce is easy to deploy on hardware. With a parameter server, the server:worker ratio needs to be meticulously calculated and is not fixed (it depends on topology and network bandwidth). Moreover, if the parameter servers run on low-end machines, MXNet has to be compiled for the low-end configuration in the final deployment (e.g. if the servers don't support AVX2 instructions while the workers do, MXNet has to be compiled without AVX2 support).

      Following is the overview architecture of mpi-allreduce and the parameter server:

      [Figure: AllReduce-based distributed training]

      [Figure: PS-based distributed training (ref here)]

It is no coincidence that in 2016-2017 both Baidu and Uber proposed mpi-allreduce-based distributed training frameworks (tensorflow-allreduce, horovod) for TensorFlow, motivated by the same drawbacks we mentioned for TensorFlow's built-in parameter-server-based distributed training.

http://research.baidu.com/bringing-hpc-techniques-deep-learning

https://github.com/uber/horovod

https://github.com/baidu-research/tensorflow-allreduce/tree/master/tensorflow/contrib/mpi

Moreover, we noticed that most mainstream deep learning frameworks have an mpi-allreduce-based distributed training mechanism:

Framework | Distributed Communication Mechanism
Tensorflow | PS + mpi-allreduce (baidu allreduce, uber horovod)
MXNet | PS
Caffe | mpi-allreduce
Torch + PyTorch | mpi-allreduce
Chainer | mpi-allreduce (mpi4py)

 

Goal

Besides the existing parameter server mechanism in MXNet, we suggest adding mpi-allreduce as an alternative distributed training mechanism, which can significantly enhance multi-node scaling efficiency for synchronous SGD distributed training at minimal cost.

POC & Usage

Our POC already implements a Python package named mpi_collectives as an MXNet submodule. The initial experiments show good performance, as the data above shows. For allreduce-based MXNet multi-node training, most of the interfaces exposed by mpi_collectives.kvstore are the same as those of the default kvstore. Following is an example of using them:

Note: the following example is based on the assumption that mpi.kvstore (dist_sync_mpi) is implemented in a Python package. If mpi.kvstore is implemented in CPP, there is no need to import the mpi_collectives package.

Typical example of image classification:

import mxnet as mx
import mpi_collectives as mpi

def fit(args, network, data_loader, **kwargs):
    """
    Train a model.
    args        : argparse returns
    network     : the symbol definition of the neural network
    data_loader : function that returns the train and val data iterators
    """
    # kvstore
    kv = mpi.kvstore.create('dist_sync_mpi')

    # data iterators
    (train, val) = data_loader(args, kv)

    ...

    # run
    model.fit(train,
              begin_epoch=args.load_epoch if args.load_epoch else 0,
              num_epoch=args.num_epochs,
              eval_data=val,
              eval_metric=eval_metrics,
              kvstore=kv,
              optimizer=args.optimizer,
              optimizer_params=optimizer_params,
              initializer=initializer,
              arg_params=arg_params,
              aux_params=aux_params,
              batch_end_callback=batch_end_callbacks,
              epoch_end_callback=checkpoint,
              allow_missing=True,
              monitor=monitor)

Architecture

Nodes

There are two types of nodes in an MPI-parallel distributed training architecture: the coordinator node and normal worker nodes. Each node has a dedicated CPU thread to send and receive MPI messages.

  • Coordinator node: The coordinator node is the rank 0 node of all MPI worker nodes. Its dedicated thread monitors tensor status on all worker nodes. Once a tensor is ready on all nodes, the coordinator sends a start-allreduce command for that tensor to all worker nodes. This synchronization mechanism makes sure that the allreduce executions for every tensor happen in the same order on all nodes.
  • Normal worker node: The dedicated thread on a worker node receives allreduce requests from the local node. When it receives one, it notifies the coordinator that the tensor is ready and then waits for the start-allreduce command from the coordinator. A minimal sketch of this handshake follows the list.
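
The following is a minimal sketch of the coordination handshake described above, assuming mpi4py for messaging; the message tags, the local request queue, and the helper names are illustrative and not the actual implementation.

# Minimal sketch of the coordinator/worker handshake described above
# (not the actual implementation). Assumes mpi4py; tags and names are illustrative.
from mpi4py import MPI

comm = MPI.COMM_WORLD
READY_TAG, START_TAG = 1, 2      # hypothetical message tags

def coordinator_thread():
    """Runs on rank 0: waits until a tensor is ready on every node, then
    tells every node to start the allreduce for that tensor."""
    ready = {}                                    # tensor name -> #nodes ready
    while True:
        name = comm.recv(source=MPI.ANY_SOURCE, tag=READY_TAG)
        ready[name] = ready.get(name, 0) + 1
        if ready[name] == comm.Get_size():        # ready on all nodes
            del ready[name]
            for rank in range(comm.Get_size()):   # issue the start command
                comm.send(name, dest=rank, tag=START_TAG)

def worker_thread(request_queue, tensors):
    """Runs on every node: reports local readiness of a tensor, waits for the
    start command, then performs the actual allreduce."""
    while True:
        name = request_queue.get()                # allreduce request from the local engine
        comm.send(name, dest=0, tag=READY_TAG)    # notify the coordinator
        start = comm.recv(source=0, tag=START_TAG)
        # Every node receives start commands in the same global order,
        # so the collective call below is safe.
        comm.Allreduce(MPI.IN_PLACE, tensors[start], op=MPI.SUM)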

KVStore

To add MPI allReduce distributed training to MXNet and make it cooperate with existing components such as the dependency engine, the execution engine, operators, and NDArray, a new KVStore-MPI is implemented to extend the functionality of the existing KVStore in MXNet. In this section, we introduce the design and APIs of KVStore-MPI.

Implementation

1)      CPP

To support more frontend languages and improve the efficiency of KVStore-MPI, implementing KVStore-MPI in CPP, at the same level as the original KVStore, is our preferable option, since the KVStore factory pattern is originally implemented at the CPP level.

2)      Python

Python is widely used by data scientists and is undoubtedly the dominant language in the deep learning and machine learning field. A Python-oriented KVStore-MPI can be easily implemented to support most MXNet distributed training workloads. Besides that, very little code in the MXNet core components needs to change for a Python-oriented KVStore-MPI interface. A minimal sketch of such a Python-level KVStore-MPI is shown below.
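
The following is a minimal sketch of what a Python-level KVStore-MPI could look like, assuming mpi4py as the communication backend; the class name, the property forms, and the method bodies are illustrative only and omit the dedicated communication thread and the dependency-engine integration.

# Illustrative Python-level KVStore-MPI skeleton (not the actual implementation).
# Assumes mpi4py as the backend; the dedicated communication thread and the
# dependency-engine integration are omitted for brevity.
import mxnet as mx
from mpi4py import MPI


class MPIKVStore(object):
    """Sketch of a 'dist_sync_mpi'-style kvstore exposing pushpull/broadcast."""

    def __init__(self):
        self._comm = MPI.COMM_WORLD

    @property
    def type(self):
        return 'dist_sync_mpi'

    @property
    def rank(self):
        return self._comm.Get_rank()

    @property
    def num_workers(self):
        return self._comm.Get_size()

    def pushpull(self, key, ins, outs, priority=0):
        # Sum the tensor across all workers in one fused step; whether the
        # result is averaged here or in the optimizer is a design choice.
        buf = ins.asnumpy()
        self._comm.Allreduce(MPI.IN_PLACE, buf, op=MPI.SUM)
        outs[:] = mx.nd.array(buf)

    def broadcast(self, key, value, root_rank, priority=0):
        # Make sure every worker starts from the same parameter values.
        buf = value.asnumpy()
        self._comm.Bcast(buf, root=root_rank)
        value[:] = mx.nd.array(buf)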

Launching

In order to support MPI allReduce distributed training in MXNet, a new option is added to the original job launching script. The new launching script will be backward compatible.

python launch.py -n 8 -H hosts --launcher ssh --sync-dst-dir /tmp/mxnet_job/ --mode mpi python image_classification.py --dataset cifar10 --model vgg11 --num-epochs 1 --kvstore dist_sync

 Options:

  • -n: denotes the number of worker nodes to be launched
  • -H: requires the path of the hosts file which contains IPs of the machines in the cluster
  • --launcher: denotes the mode of communication. The options are: ssh, mpi, sge, yarn, local
  • --sync-dst-dir: takes the path of a directory on all hosts to which the current working directory will be synchronized
  • --mode: new option to support MPI-based distributed training. When it is specified as “mpi”, the training will be performed with MPI-based distributed training and the KVStore type will be set to “dist_sync_mpi” by default. A sketch of how the launcher could dispatch this option is shown after this list.
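
The following is a minimal sketch of how launch.py could translate the new --mode option into an mpirun invocation; the function name is hypothetical and the mpirun flags mirror the command line shown earlier, so this is an illustration rather than the actual launcher change.

# Illustrative sketch of dispatching --mode mpi to an mpirun invocation
# (not the actual launcher implementation; names and flags are assumptions).
import subprocess

def launch_mpi(num_workers, hostfile, command):
    """Run one MPI process per worker; the training script keeps its own
    arguments and defaults its kvstore type to 'dist_sync_mpi'."""
    cmd = ['mpirun', '-n', str(num_workers), '-machinefile', hostfile] + command
    subprocess.check_call(cmd)

# Example (hypothetical):
# launch_mpi(8, 'hosts', ['python', 'image_classification.py',
#                         '--dataset', 'cifar10', '--model', 'vgg11'])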

Future GPU extension

Currently, our allReduce-based distributed training solution is designed and demoed on CPU-only multi-node clusters. It can, however, be easily extended to multi-node with multiple GPU cards per node. In that scenario, tensors on each GPU card are first aggregated to the host with the existing intra-node methods used by the parameter server, such as KVStore-local-device and KVStore-NCCL. Then our new allReduce method handles the tensors across all nodes.

Build

a) If the allReduce-based solution is implemented at the CPP level
To build MXNet with MPI support, specify the USE_MPI_DIST_KVSTORE flag in make/config.mk:
USE_MPI_DIST_KVSTORE = 1
You also need to specify the MPI root dir:
MPI_ROOT=/usr/mpi

b) If the allReduce-based solution is implemented in a Python package as an MXNet submodule
Same as above, you need to specify USE_MPI_DIST_KVSTORE and MPI_ROOT. In addition, this submodule will be built after libmxnet has been built.

API Spec

The following MPI KVStore interfaces will be exposed and used in MXNet:

  1. init(self, key, value):
    Initializes a single or a sequence of key-value pairs into the store.
    Not supported in kvstore with type dist_sync_mpi

  2. pushpull(self, key, ins, outs, priority=0):
    Use this command to replace the KVStore push and pull operations.
    The pushpull API is a new interface for the “dist_sync_mpi” KVStore and MPI-based distributed training. It fuses the original push and pull APIs of KVStore into one API and offers a convenient approach to aggregating tensors with MPI allreduce APIs. A usage sketch is shown after this list.

  3. broadcast(self, key, value, root_rank, priority=0):
    Use this command to broadcast tensors on root_rank to all other nodes.
    The broadcast API is a new interface for the "dist_sync_mpi" KVStore and MPI-based distributed training. It broadcasts the value of a tensor on root_rank to all other nodes with MPI broadcast APIs.

  4. push(self, key, value, priority=0):
    Not supported in kvstore with type dist_sync_mpi

  5. pull(self, key, out=None, priority=0):
    Not supported in kvstore with type dist_sync_mpi

  6. row_sparse_pull(self, key, out=None, priority=0, row_ids=None):
    Not supported in kvstore with type dist_sync_mpi

  7. set_gradient_compression(self, compression_params):
    Specifies type of low-bit quantization for gradient compression and additional arguments depending on the type of compression being used. Currently it's not supported in kvstore with type dist_sync_mpi

  8. set_optimizer(self, optimizer):
    Not supported in kvstore with type dist_sync_mpi

  9. type(self):
    Returns the type of this kvstore.

  10. rank(self):
    Returns the index of the current process in the MPI group.

  11. num_workers(self):
    Returns the number of running MPI processes.

  12. save_optimizer_states(self, fname, dump_optimizer=False):
    Not supported in kvstore with type dist_sync_mpi

  13. load_optimizer_states(self, fname):
    Not supported in kvstore with type dist_sync_mpi
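
Below is a brief usage sketch of the new pushpull and broadcast interfaces, assuming the Python-package form of the kvstore (mpi_collectives); the keys and tensor shapes are made-up examples.

# Illustrative usage of the dist_sync_mpi kvstore interfaces (sketch only).
# Assumes the Python-package form (mpi_collectives); keys/shapes are examples.
import mxnet as mx
import mpi_collectives as mpi

kv = mpi.kvstore.create('dist_sync_mpi')

# Broadcast the initial weights from rank 0 so every worker starts identically.
weight = mx.nd.random.uniform(shape=(128, 256))
kv.broadcast('fc1_weight', weight, root_rank=0)

# Fused push+pull: aggregate local gradients across all workers with allreduce.
grad = mx.nd.ones((128, 256))
agg = mx.nd.zeros((128, 256))
kv.pushpull('fc1_weight', grad, agg)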

    
