Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
Concatenate nested namespaces (sonar).
[simgrid.git] / src / smpi / colls / allreduce / allreduce-smp-rdb.cpp
1 /* Copyright (c) 2013-2022. The SimGrid Team.
2  * All rights reserved.                                                     */
3
4 /* This program is free software; you can redistribute it and/or modify it
5  * under the terms of the license (GNU LGPL) which comes with this package. */
6
7 #include "../colls_private.hpp"
8 /* IMPLEMENTED BY PITCH PATARASUK
9    Non-topology-specific (however, number of cores/node need to be changed)
10    all-reduce operation designed for smp clusters
11    It uses 2-layer communication: binomial for intra-communication
12    and rdb for inter-communication*/
13
14 /* ** NOTE **
15    Use -DMPICH2 if this code does not compile.
16    MPICH1 code also work on MPICH2 on our cluster and the performance are similar.
17    This code assume commutative and associative reduce operator (MPI_SUM, MPI_MAX, etc).
18 */
19
20 //#include <star-reduction.c>
21
22 /*
23 This function performs all-reduce operation as follow.
24 1) binomial_tree reduce inside each SMP node
25 2) Recursive doubling intra-communication between root of each SMP node
26 3) binomial_tree bcast inside each SMP node
27 */
28 namespace simgrid::smpi {
29 int allreduce__smp_rdb(const void *send_buf, void *recv_buf, int count,
30                        MPI_Datatype dtype, MPI_Op op,
31                        MPI_Comm comm)
32 {
33   int comm_size, rank;
34   int tag = COLL_TAG_ALLREDUCE;
35   int mask, src, dst;
36   MPI_Status status;
37   if(comm->get_leaders_comm()==MPI_COMM_NULL){
38     comm->init_smp();
39   }
40   int num_core=1;
41   if (comm->is_uniform()){
42     num_core = comm->get_intra_comm()->size();
43   }
44   /*
45      #ifdef MPICH2_REDUCTION
46      MPI_User_function * uop = MPIR_Op_table[op % 16 - 1];
47      #else
48      MPI_User_function *uop;
49      MPIR_OP *op_ptr;
50      op_ptr = MPIR_ToPointer(op);
51      uop  = op_ptr->op;
52      #endif
53    */
54   comm_size = comm->size();
55   rank = comm->rank();
56   MPI_Aint extent;
57   extent = dtype->get_extent();
58   unsigned char* tmp_buf = smpi_get_tmp_sendbuffer(count * extent);
59
60   /* compute intra and inter ranking */
61   int intra_rank, inter_rank;
62   intra_rank = rank % num_core;
63   inter_rank = rank / num_core;
64
65   /* size of processes participate in intra communications =>
66      should be equal to number of machines */
67   int inter_comm_size = (comm_size + num_core - 1) / num_core;
68
69   /* copy input buffer to output buffer */
70   Request::sendrecv(send_buf, count, dtype, rank, tag,
71                recv_buf, count, dtype, rank, tag, comm, &status);
72
73   /* start binomial reduce intra communication inside each SMP node */
74   mask = 1;
75   while (mask < num_core) {
76     if ((mask & intra_rank) == 0) {
77       src = (inter_rank * num_core) + (intra_rank | mask);
78       if (src < comm_size) {
79         Request::recv(tmp_buf, count, dtype, src, tag, comm, &status);
80         if(op!=MPI_OP_NULL) op->apply( tmp_buf, recv_buf, &count, dtype);
81       }
82     } else {
83       dst = (inter_rank * num_core) + (intra_rank & (~mask));
84       Request::send(recv_buf, count, dtype, dst, tag, comm);
85       break;
86     }
87     mask <<= 1;
88   }                             /* end binomial reduce intra-communication */
89
90
91   /* start rdb (recursive doubling) all-reduce inter-communication
92      between each SMP nodes : each node only have one process that can communicate
93      to other nodes */
94   if (intra_rank == 0) {
95
96     /* find nearest power-of-two less than or equal to inter_comm_size */
97     int pof2, rem, newrank, newdst;
98     pof2 = 1;
99     while (pof2 <= inter_comm_size)
100       pof2 <<= 1;
101     pof2 >>= 1;
102     rem = inter_comm_size - pof2;
103
104     /* In the non-power-of-two case, all even-numbered
105        processes of rank < 2*rem send their data to
106        (rank+1). These even-numbered processes no longer
107        participate in the algorithm until the very end.
108      */
109     if (inter_rank < 2 * rem) {
110       if (inter_rank % 2 == 0) {
111         dst = rank + num_core;
112         Request::send(recv_buf, count, dtype, dst, tag, comm);
113         newrank = -1;
114       } else {
115         src = rank - num_core;
116         Request::recv(tmp_buf, count, dtype, src, tag, comm, &status);
117         if(op!=MPI_OP_NULL) op->apply( tmp_buf, recv_buf, &count, dtype);
118         newrank = inter_rank / 2;
119       }
120     } else {
121       newrank = inter_rank - rem;
122     }
123
124     /* example inter-communication RDB rank change algorithm
125        0,4,8,12..36 <= true rank (assume 4 core per SMP)
126        0123 4567 89 <= inter_rank
127        1 3 4567 89 (1,3 got data from 0,2 : 0,2 will be idle until the end)
128        0 1 4567 89
129        0 1 2345 67 => newrank
130      */
131
132     if (newrank != -1) {
133       mask = 1;
134       while (mask < pof2) {
135         newdst = newrank ^ mask;
136         /* find real rank of dest */
137         dst = (newdst < rem) ? newdst * 2 + 1 : newdst + rem;
138         dst *= num_core;
139
140         /* exchange data in rdb manner */
141         Request::sendrecv(recv_buf, count, dtype, dst, tag, tmp_buf, count, dtype,
142                      dst, tag, comm, &status);
143         if(op!=MPI_OP_NULL) op->apply( tmp_buf, recv_buf, &count, dtype);
144         mask <<= 1;
145       }
146     }
147
148     /* non pof2 case
149        left-over processes (all even ranks: < 2 * rem) get the result
150      */
151     if (inter_rank < 2 * rem) {
152       if (inter_rank % 2) {
153         Request::send(recv_buf, count, dtype, rank - num_core, tag, comm);
154       } else {
155         Request::recv(recv_buf, count, dtype, rank + num_core, tag, comm, &status);
156       }
157     }
158   }
159
160   /* start binomial broadcast intra-communication inside each SMP nodes */
161   int num_core_in_current_smp = num_core;
162   if (inter_rank == (inter_comm_size - 1)) {
163     num_core_in_current_smp = comm_size - (inter_rank * num_core);
164   }
165   mask = 1;
166   while (mask < num_core_in_current_smp) {
167     if (intra_rank & mask) {
168       src = (inter_rank * num_core) + (intra_rank - mask);
169       Request::recv(recv_buf, count, dtype, src, tag, comm, &status);
170       break;
171     }
172     mask <<= 1;
173   }
174   mask >>= 1;
175
176   while (mask > 0) {
177     dst = (inter_rank * num_core) + (intra_rank + mask);
178     if (dst < comm_size) {
179       Request::send(recv_buf, count, dtype, dst, tag, comm);
180     }
181     mask >>= 1;
182   }
183
184   smpi_free_tmp_buffer(tmp_buf);
185   return MPI_SUCCESS;
186 }
187 } // namespace simgrid::smpi