MPI Note[8]: 分布式机器学习AllReduce

最近华为发布了2000亿参数的模型盘古,阿里也有推出了270亿参数的PLUG模型.竞争越来越激烈。另一方面则是中美之间关于E级超算的竞争。

关于分布式机器学习,以前有一篇文章讲过一些内容[1].神经网络计算通常是一些矩阵-向量乘法是指 , 其中 是一个 维的矩阵, 是一个 维向量,列向量 可以按如下方式计算:

并行算法相对简单,我们可以为每个进程分配 行进行计算即可. 或者对矩阵进行一些特殊的划分减小通信量:

MPI Note[8]: 分布式机器学习AllReduce

当然矩阵的并行计算我们还是在下一个章节再来谈,今天先来看另一个重要的问题,训练参数的同步,也就是经常会听到的一个词AllReduce.而且随着系统规模扩大,它对整个训练速度的影响越来越关键..

MPI Note[8]: 分布式机器学习AllReduce

分布式机器学习

由于数据规模模型规模的扩大必须利用计算机集群来构建分布式机器学习框架,并行处理的方式主要是在两个维度。数据规模比较好理解随着IOT和数据采集,可供于训练的数据集通常可以达到数十TB, 而模型规模主要是业界竞争的重点,例如世界第一华为盘古2000亿, 全球领先GPT-3参数已经达到1700亿,而常见的各厂家模型也到了数十亿级(nVidia MegatronLM 83亿,微软Turing-NLG 170亿,阿里PLUG 270亿).

数据并行很容易解释,主要是如何存储训练样本,并且在多机器之间传递混淆样本,基本上大家大同小异的都在采用SSD、分布式存储解决这些问题.另一个问题便是模型并行,当单个工作节点无法存储时,就需要对模型本身进行分割,而如何分割这个模型又是一个最优化的过程…

模型分割了,那么就存在一个非常关键的问题了,参数的同步:

而通常的做法是采用Model Average(MA)需要将每个子模型的参数平均,那么需要在多个节点中将参数求和的处理方式:

这个过程就是AllReduce操作.关于AllReduce可以参考腾讯机智团队分享–AllReduce算法的前世今生[2] 讲的蛮细的,而今天我们专门来用MPI实现一下RingAllreduce[3]

RingAllreduce并不是需要将Underlay网络构造成一个环,普通的CLOS交换网(当然要保证无阻塞)就可以了,通过环状通信的方式,完全利用了所有的带宽,并且非常聪明的避开了Incast的影响.

Ring AllReduce

准备数据

我们还是采用一个MPI集群来做这个实验,首先我们来构造一些数据,因为没有那么多张GPU和服务器集群,所以伪造几百万个便于验证的浮点数作为参数集,同样的参数构造两组是为了对比ringAllReduce和腾讯提出的层次化ringAllReduce

    int num_ele_per_node = atoi(argv[2]);
    
    float *local_param = (float *)malloc(sizeof(float) * num_ele_per_node);
    float *local_param2 = (float *)malloc(sizeof(float) * num_ele_per_node);
    for (int i = 0; i < num_ele_per_node; i++)
    {
        local_param[i] = world_rank;
        local_param2[i] = world_rank;
    }

标准库实现

标准的MPI Allreduce实现如下,稍后我们会用global_sum来验证我们自己写的ringAllreduce结果.

    float *global_sum = (float *)malloc(sizeof(float) * num_ele_per_node);
    MPI_Allreduce(local_param, global_sum, num_ele_per_node, MPI_FLOAT, MPI_SUM, MPI_COMM_WORLD);

当然这一步也可以采用如下方式计时对比后面的ringAllreduce算法.

    //start time
    MPI_Barrier(MPI_COMM_WORLD);
    mpi_start_time = MPI_Wtime();

    MPI_Allreduce(local_param, global_sum, num_ele_per_node, MPI_FLOAT, MPI_SUM, MPI_COMM_WORLD);
    
    //endtime
    MPI_Barrier(MPI_COMM_WORLD);
    mpi_end_time = MPI_Wtime();
    if (world_rank == 0)
        printf("mpi  time:%15.0fus\n", (mpi_end_time - mpi_start_time) * 1e6);

RingAllreduce

接下来我们来构造这个函数, Reduce Operator我们hardcode成reduceSUM了.

void reduceSUM(float *dst, float *src, int size)
{
    for (int i = 0; i < size; i++)
        dst[i] += src[i];
}

void ringAllReduce(float *data, int count, MPI_Datatype datatype, MPI_Comm communicator)

首先我们需要将数据划分成N个Segment,如果不能整除,那么剩下的数每个segment多一个,尽量使得segment划分均匀,然后再划分的同时,我们记录下每段的开始元素位置.

  int comm_rank;
    int comm_size;
    MPI_Comm_rank(communicator, &comm_rank);
    MPI_Comm_size(communicator, &comm_size);

    //split dataset
    int segment_size = count / comm_size;
    int residual = count % comm_size;
    int *segment_sizes = (int *)malloc(sizeof(int) * comm_size);
    int *segment_start_ptr = (int *)malloc(sizeof(int) * comm_size);

    int segment_ptr = 0;
    for (int i = 0; i < comm_size; i++)
    {
        segment_start_ptr[i] = segment_ptr;
        segment_sizes[i] = segment_size;
        if (i < residual)
            segment_sizes[i]++;
        segment_ptr += segment_sizes[i];
    }
    
    //verify
    if (segment_start_ptr[comm_size - 1] + segment_sizes[comm_size - 1] != count)
    {
        MPI_Abort(MPI_COMM_WORLD, MPI_ERR_COUNT);
    }

接下来我们做N-1轮迭代,每一轮中,我们将数据发送到下一个节点,然后由下一个节点接收完成后,执行reduceSUM

你可以注意到每个节点的上一个节点和下一个节点为

int next(int rank, int size)
{
    return ((rank + 1) % size);
}

int prev(int rank, int size)
{
    return ((size + rank - 1) % size);
}

而每一轮迭代需要传输的segment为

   int recv_chunk = (comm_rank - iter - 1 + comm_size) % comm_size;
   int send_chunk = (comm_rank - iter + comm_size) % comm_size;

因此RingAllreduce计算如下:


    MPI_Status recv_status;
    MPI_Request recv_req;

    float *buffer = (float *)malloc(sizeof(float) * segment_sizes[0]);
    for (int iter = 0; iter < comm_size - 1; iter++)
    {

        int recv_chunk = (comm_rank - iter - 1 + comm_size) % comm_size;
        int send_chunk = (comm_rank - iter + comm_size) % comm_size;
        float *sending_segment = &(data[segment_start_ptr[send_chunk]]);
        MPI_Irecv(buffer, segment_sizes[recv_chunk], datatype, prev(comm_rank, comm_size), 0, communicator, &recv_req);
        MPI_Send(sending_segment, segment_sizes[send_chunk], datatype, next(comm_rank, comm_size), 0, communicator);
        float *updating_segment = &(data[segment_start_ptr[recv_chunk]]);

        MPI_Wait(&recv_req, &recv_status);
        //after send recieve finshed, execute reduce
        reduceSUM(updating_segment, buffer, segment_sizes[recv_chunk]);
    }

紧接着第二三四轮… 

N-1轮处理完了然后执行AllGather操作,将每个完成计算的Segment数据分发给其它节点. 

    for (int iter = 0; iter < comm_size - 1; iter++)
    {
        int recv_chunk = (comm_rank - iter + comm_size) % comm_size;
        int send_chunk = (comm_rank - iter + 1 + comm_size) % comm_size;
        float *sending_segment = &(data[segment_start_ptr[send_chunk]]);
        float *updating_segment = &(data[segment_start_ptr[recv_chunk]]);
        MPI_Sendrecv(sending_segment, segment_sizes[send_chunk], datatype, next(comm_rank, comm_size), 0, updating_segment, segment_sizes[recv_chunk], datatype, prev(comm_rank, comm_size), 0, communicator, &recv_status);
    }

后续流程如下:

当然我们在计算时还可以考虑NUMA结构或者物理机器多核结构,采用层次化的方式处理,例如腾讯利用GPU的NVLink带宽,组内直接Reduce,然后组间在RingAllReduce,做完后再组内BCast

我们来看一下MPI节点分割的技巧, 通过取模染色colorMPI_Comm_split即可分为不同的subgroup,然后将每个subgroup中找一个leader出来作为maingroup.

 // Split subgroup by host and create SUBGRP_COMM
    int color = world_rank / node_per_host;
    MPI_Comm SUBGRP_COMM;
    MPI_Comm_split(MPI_COMM_WORLD, color, world_rank, &SUBGRP_COMM);

    int subgrp_rank, subgrp_size;
    MPI_Comm_rank(SUBGRP_COMM, &subgrp_rank);
    MPI_Comm_size(SUBGRP_COMM, &subgrp_size);

    // Create main group
    MPI_Group main_grp, world_grp;
    MPI_Comm MAINGRP_COMM;
    MPI_Comm_group(MPI_COMM_WORLD, &world_grp);

    int host_num = world_size / node_per_host;
    int *maingrp_ranks = (int *)malloc(sizeof(int) * host_num);
    for (int i = 0; i < host_num; i++)
    {
        maingrp_ranks[i] = i * node_per_host;
    }

    MPI_Group_incl(world_grp, host_num, maingrp_ranks, &main_grp);
    MPI_Comm_create_group(MPI_COMM_WORLD, main_grp, 0, &MAINGRP_COMM);

后面我们就用到了两种算法来对比,第一个是层次化的算法

    ringAllReduce(local_param, num_ele_per_node, MPI_FLOAT, SUBGRP_COMM);
    if (MAINGRP_COMM != MPI_COMM_NULL)
    {
        ringAllReduce(local_param, num_ele_per_node, MPI_FLOAT, MAINGRP_COMM);
    }
    MPI_Bcast(local_param, num_ele_per_node, MPI_FLOAT, 0, SUBGRP_COMM);

第二个是直接对MPI_COMM_WORLD加载ringAllReduce

ringAllReduce(local_param2, num_ele_per_node, MPI_FLOAT, MPI_COMM_WORLD);

计算结果

kevin@netdev:~/Desktop/mpi/07_ringallreduce$ mpicc collective.c main.c -o foo
kevin@netdev:~/Desktop/mpi/07_ringallreduce$ mpiexec -np 16 ./foo 4 6000000
build-in mpi  time:         146276us
hierachy-ring time:         163840us
allnode-ring  time:         101049us

源代码 collective.c


collective.h

void ringAllReduce(float *data, int count,  MPI_Datatype datatype, MPI_Comm communicator);

main.c

Reference

[1]

Ruta for AI:分布式机器学习的网络优化: https://mp.weixin.qq.com/s/bCX4Rbyb21NbDrgNg5Utlw


[2]

腾讯机智团队分享--AllReduce算法的前世今生: https://zhuanlan.zhihu.com/p/79030485


[3]

浅谈Tensorflow分布式架构:ring all-reduce算法: https://zhuanlan.zhihu.com/p/69797852

MPI Note[8]: 分布式机器学习AllReduce》来自互联网,仅为收藏学习,如侵权请联系删除。本文URL:http://www.bookhoes.com/751.html