Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
Merge branch 'master' into CRTP
[simgrid.git] / src / smpi / colls / allreduce / allreduce-smp-rdb.cpp
1 /* Copyright (c) 2013-2019. The SimGrid Team.
2  * All rights reserved.                                                     */
3
4 /* This program is free software; you can redistribute it and/or modify it
5  * under the terms of the license (GNU LGPL) which comes with this package. */
6
7 #include "../colls_private.hpp"
8 /* IMPLEMENTED BY PITCH PATARASUK
9    Non-topoloty-specific (however, number of cores/node need to be changed)
10    all-reduce operation designed for smp clusters
11    It uses 2-layer communication: binomial for intra-communication
12    and rdb for inter-communication*/
13
14
15 /* ** NOTE **
16    Use -DMPICH2 if this code does not compile.
17    MPICH1 code also work on MPICH2 on our cluster and the performance are similar.
18    This code assume commutative and associative reduce operator (MPI_SUM, MPI_MAX, etc).
19 */
20
21 //#include <star-reduction.c>
22
23 /*
24 This fucntion performs all-reduce operation as follow.
25 1) binomial_tree reduce inside each SMP node
26 2) Recursive doubling intra-communication between root of each SMP node
27 3) binomial_tree bcast inside each SMP node
28 */
29 namespace simgrid{
30 namespace smpi{
31 int Coll_allreduce_smp_rdb::allreduce(const void *send_buf, void *recv_buf, int count,
32                                       MPI_Datatype dtype, MPI_Op op,
33                                       MPI_Comm comm)
34 {
35   int comm_size, rank;
36   int tag = COLL_TAG_ALLREDUCE;
37   int mask, src, dst;
38   MPI_Status status;
39   if(comm->get_leaders_comm()==MPI_COMM_NULL){
40     comm->init_smp();
41   }
42   int num_core=1;
43   if (comm->is_uniform()){
44     num_core = comm->get_intra_comm()->size();
45   }
46   /*
47      #ifdef MPICH2_REDUCTION
48      MPI_User_function * uop = MPIR_Op_table[op % 16 - 1];
49      #else
50      MPI_User_function *uop;
51      MPIR_OP *op_ptr;
52      op_ptr = MPIR_ToPointer(op);
53      uop  = op_ptr->op;
54      #endif
55    */
56   comm_size = comm->size();
57   rank = comm->rank();
58   MPI_Aint extent;
59   extent = dtype->get_extent();
60   unsigned char* tmp_buf = smpi_get_tmp_sendbuffer(count * extent);
61
62   /* compute intra and inter ranking */
63   int intra_rank, inter_rank;
64   intra_rank = rank % num_core;
65   inter_rank = rank / num_core;
66
67   /* size of processes participate in intra communications =>
68      should be equal to number of machines */
69   int inter_comm_size = (comm_size + num_core - 1) / num_core;
70
71   /* copy input buffer to output buffer */
72   Request::sendrecv(send_buf, count, dtype, rank, tag,
73                recv_buf, count, dtype, rank, tag, comm, &status);
74
75   /* start binomial reduce intra communication inside each SMP node */
76   mask = 1;
77   while (mask < num_core) {
78     if ((mask & intra_rank) == 0) {
79       src = (inter_rank * num_core) + (intra_rank | mask);
80       if (src < comm_size) {
81         Request::recv(tmp_buf, count, dtype, src, tag, comm, &status);
82         if(op!=MPI_OP_NULL) op->apply( tmp_buf, recv_buf, &count, dtype);
83       }
84     } else {
85       dst = (inter_rank * num_core) + (intra_rank & (~mask));
86       Request::send(recv_buf, count, dtype, dst, tag, comm);
87       break;
88     }
89     mask <<= 1;
90   }                             /* end binomial reduce intra-communication */
91
92
93   /* start rdb (recursive doubling) all-reduce inter-communication
94      between each SMP nodes : each node only have one process that can communicate
95      to other nodes */
96   if (intra_rank == 0) {
97
98     /* find nearest power-of-two less than or equal to inter_comm_size */
99     int pof2, rem, newrank, newdst;
100     pof2 = 1;
101     while (pof2 <= inter_comm_size)
102       pof2 <<= 1;
103     pof2 >>= 1;
104     rem = inter_comm_size - pof2;
105
106     /* In the non-power-of-two case, all even-numbered
107        processes of rank < 2*rem send their data to
108        (rank+1). These even-numbered processes no longer
109        participate in the algorithm until the very end.
110      */
111     if (inter_rank < 2 * rem) {
112       if (inter_rank % 2 == 0) {
113         dst = rank + num_core;
114         Request::send(recv_buf, count, dtype, dst, tag, comm);
115         newrank = -1;
116       } else {
117         src = rank - num_core;
118         Request::recv(tmp_buf, count, dtype, src, tag, comm, &status);
119         if(op!=MPI_OP_NULL) op->apply( tmp_buf, recv_buf, &count, dtype);
120         newrank = inter_rank / 2;
121       }
122     } else {
123       newrank = inter_rank - rem;
124     }
125
126     /* example inter-communication RDB rank change algorithm
127        0,4,8,12..36 <= true rank (assume 4 core per SMP)
128        0123 4567 89 <= inter_rank
129        1 3 4567 89 (1,3 got data from 0,2 : 0,2 will be idle until the end)
130        0 1 4567 89
131        0 1 2345 67 => newrank
132      */
133
134     if (newrank != -1) {
135       mask = 1;
136       while (mask < pof2) {
137         newdst = newrank ^ mask;
138         /* find real rank of dest */
139         dst = (newdst < rem) ? newdst * 2 + 1 : newdst + rem;
140         dst *= num_core;
141
142         /* exchange data in rdb manner */
143         Request::sendrecv(recv_buf, count, dtype, dst, tag, tmp_buf, count, dtype,
144                      dst, tag, comm, &status);
145         if(op!=MPI_OP_NULL) op->apply( tmp_buf, recv_buf, &count, dtype);
146         mask <<= 1;
147       }
148     }
149
150     /* non pof2 case
151        left-over processes (all even ranks: < 2 * rem) get the result
152      */
153     if (inter_rank < 2 * rem) {
154       if (inter_rank % 2) {
155         Request::send(recv_buf, count, dtype, rank - num_core, tag, comm);
156       } else {
157         Request::recv(recv_buf, count, dtype, rank + num_core, tag, comm, &status);
158       }
159     }
160   }
161
162   /* start binomial broadcast intra-communication inside each SMP nodes */
163   int num_core_in_current_smp = num_core;
164   if (inter_rank == (inter_comm_size - 1)) {
165     num_core_in_current_smp = comm_size - (inter_rank * num_core);
166   }
167   mask = 1;
168   while (mask < num_core_in_current_smp) {
169     if (intra_rank & mask) {
170       src = (inter_rank * num_core) + (intra_rank - mask);
171       Request::recv(recv_buf, count, dtype, src, tag, comm, &status);
172       break;
173     }
174     mask <<= 1;
175   }
176   mask >>= 1;
177
178   while (mask > 0) {
179     dst = (inter_rank * num_core) + (intra_rank + mask);
180     if (dst < comm_size) {
181       Request::send(recv_buf, count, dtype, dst, tag, comm);
182     }
183     mask >>= 1;
184   }
185
186   smpi_free_tmp_buffer(tmp_buf);
187   return MPI_SUCCESS;
188 }
189 }
190 }