Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
[sonar] Use unsigned char* for smpi buffers.
[simgrid.git] / src / smpi / colls / reduce_scatter / reduce_scatter-mpich.cpp
1 /* Copyright (c) 2013-2019. The SimGrid Team.
2  * All rights reserved.                                                     */
3
4 /* This program is free software; you can redistribute it and/or modify it
5  * under the terms of the license (GNU LGPL) which comes with this package. */
6
7 #include "../colls_private.hpp"
8 #include <algorithm>
9
10 static inline int MPIU_Mirror_permutation(unsigned int x, int bits)
11 {
12     /* a mask for the high order bits that should be copied as-is */
13     int high_mask = ~((0x1 << bits) - 1);
14     int retval = x & high_mask;
15     int i;
16
17     for (i = 0; i < bits; ++i) {
18         unsigned int bitval = (x & (0x1 << i)) >> i; /* 0x1 or 0x0 */
19         retval |= bitval << ((bits - i) - 1);
20     }
21
22     return retval;
23 }
24 namespace simgrid{
25 namespace smpi{
26
27 int Coll_reduce_scatter_mpich_pair::reduce_scatter(const void *sendbuf, void *recvbuf, const int recvcounts[],
28                               MPI_Datatype datatype, MPI_Op op, MPI_Comm comm)
29 {
30     int   rank, comm_size, i;
31     MPI_Aint extent, true_extent, true_lb;
32     unsigned char* tmp_recvbuf;
33     int mpi_errno = MPI_SUCCESS;
34     int total_count, dst, src;
35     int is_commutative;
36     comm_size = comm->size();
37     rank = comm->rank();
38
39     extent =datatype->get_extent();
40     datatype->extent(&true_lb, &true_extent);
41
42     if (op->is_commutative()) {
43         is_commutative = 1;
44     }
45
46     int* disps = new int[comm_size];
47
48     total_count = 0;
49     for (i=0; i<comm_size; i++) {
50         disps[i] = total_count;
51         total_count += recvcounts[i];
52     }
53
54     if (total_count == 0) {
55       delete[] disps;
56       return MPI_ERR_COUNT;
57     }
58
59         if (sendbuf != MPI_IN_PLACE) {
60             /* copy local data into recvbuf */
61             Datatype::copy(((char *)sendbuf+disps[rank]*extent),
62                                        recvcounts[rank], datatype, recvbuf,
63                                        recvcounts[rank], datatype);
64         }
65
66         /* allocate temporary buffer to store incoming data */
67         tmp_recvbuf = smpi_get_tmp_recvbuffer(recvcounts[rank] * std::max(true_extent, extent) + 1);
68         /* adjust for potential negative lower bound in datatype */
69         tmp_recvbuf = tmp_recvbuf - true_lb;
70
71         for (i=1; i<comm_size; i++) {
72             src = (rank - i + comm_size) % comm_size;
73             dst = (rank + i) % comm_size;
74
75             /* send the data that dst needs. recv data that this process
76                needs from src into tmp_recvbuf */
77             if (sendbuf != MPI_IN_PLACE)
78                 Request::sendrecv(((char *)sendbuf+disps[dst]*extent),
79                                              recvcounts[dst], datatype, dst,
80                                              COLL_TAG_SCATTER, tmp_recvbuf,
81                                              recvcounts[rank], datatype, src,
82                                              COLL_TAG_SCATTER, comm,
83                                              MPI_STATUS_IGNORE);
84             else
85                 Request::sendrecv(((char *)recvbuf+disps[dst]*extent),
86                                              recvcounts[dst], datatype, dst,
87                                              COLL_TAG_SCATTER, tmp_recvbuf,
88                                              recvcounts[rank], datatype, src,
89                                              COLL_TAG_SCATTER, comm,
90                                              MPI_STATUS_IGNORE);
91
92             if (is_commutative || (src < rank)) {
93                 if (sendbuf != MPI_IN_PLACE) {
94                   if (op != MPI_OP_NULL)
95                     op->apply(tmp_recvbuf, recvbuf, &recvcounts[rank], datatype);
96                 }
97                 else {
98                   if (op != MPI_OP_NULL)
99                     op->apply(tmp_recvbuf, ((char*)recvbuf + disps[rank] * extent), &recvcounts[rank], datatype);
100                   /* we can't store the result at the beginning of
101                      recvbuf right here because there is useful data
102                      there that other process/processes need. at the
103                      end, we will copy back the result to the
104                      beginning of recvbuf. */
105                 }
106             }
107             else {
108                 if (sendbuf != MPI_IN_PLACE) {
109                   if (op != MPI_OP_NULL)
110                     op->apply(recvbuf, tmp_recvbuf, &recvcounts[rank], datatype);
111                   /* copy result back into recvbuf */
112                   mpi_errno =
113                       Datatype::copy(tmp_recvbuf, recvcounts[rank], datatype, recvbuf, recvcounts[rank], datatype);
114                   if (mpi_errno)
115                     return (mpi_errno);
116                 }
117                 else {
118                   if (op != MPI_OP_NULL)
119                     op->apply(((char*)recvbuf + disps[rank] * extent), tmp_recvbuf, &recvcounts[rank], datatype);
120                   /* copy result back into recvbuf */
121                   mpi_errno = Datatype::copy(tmp_recvbuf, recvcounts[rank], datatype,
122                                              ((char*)recvbuf + disps[rank] * extent), recvcounts[rank], datatype);
123                   if (mpi_errno)
124                     return (mpi_errno);
125                 }
126             }
127         }
128
129         /* if MPI_IN_PLACE, move output data to the beginning of
130            recvbuf. already done for rank 0. */
131         if ((sendbuf == MPI_IN_PLACE) && (rank != 0)) {
132             mpi_errno = Datatype::copy(((char *)recvbuf +
133                                         disps[rank]*extent),
134                                        recvcounts[rank], datatype,
135                                        recvbuf,
136                                        recvcounts[rank], datatype );
137             if (mpi_errno) return(mpi_errno);
138         }
139
140         delete[] disps;
141         smpi_free_tmp_buffer(tmp_recvbuf);
142
143         return MPI_SUCCESS;
144 }
145
146
147 int Coll_reduce_scatter_mpich_noncomm::reduce_scatter(const void *sendbuf, void *recvbuf, const int recvcounts[],
148                               MPI_Datatype datatype, MPI_Op op, MPI_Comm comm)
149 {
150     int mpi_errno = MPI_SUCCESS;
151     int comm_size = comm->size() ;
152     int rank = comm->rank();
153     int pof2;
154     int log2_comm_size;
155     int i, k;
156     int recv_offset, send_offset;
157     int block_size, total_count, size;
158     MPI_Aint true_extent, true_lb;
159     int buf0_was_inout;
160     unsigned char* tmp_buf0;
161     unsigned char* tmp_buf1;
162     unsigned char* result_ptr;
163
164     datatype->extent(&true_lb, &true_extent);
165
166     pof2 = 1;
167     log2_comm_size = 0;
168     while (pof2 < comm_size) {
169         pof2 <<= 1;
170         ++log2_comm_size;
171     }
172
173     /* begin error checking */
174     xbt_assert(pof2 == comm_size); /* FIXME this version only works for power of 2 procs */
175
176     for (i = 0; i < (comm_size - 1); ++i) {
177         xbt_assert(recvcounts[i] == recvcounts[i+1]);
178     }
179     /* end error checking */
180
181     /* size of a block (count of datatype per block, NOT bytes per block) */
182     block_size = recvcounts[0];
183     total_count = block_size * comm_size;
184
185     tmp_buf0                     = smpi_get_tmp_sendbuffer(true_extent * total_count);
186     tmp_buf1                     = smpi_get_tmp_recvbuffer(true_extent * total_count);
187     unsigned char* tmp_buf0_save = tmp_buf0;
188     unsigned char* tmp_buf1_save = tmp_buf1;
189
190     /* adjust for potential negative lower bound in datatype */
191     tmp_buf0 = tmp_buf0 - true_lb;
192     tmp_buf1 = tmp_buf1 - true_lb;
193
194     /* Copy our send data to tmp_buf0.  We do this one block at a time and
195        permute the blocks as we go according to the mirror permutation. */
196     for (i = 0; i < comm_size; ++i) {
197       mpi_errno = Datatype::copy(
198           static_cast<const char*>(sendbuf == MPI_IN_PLACE ? recvbuf : sendbuf) + (i * true_extent * block_size), block_size,
199           datatype, tmp_buf0 + (MPIU_Mirror_permutation(i, log2_comm_size) * true_extent * block_size), block_size,
200           datatype);
201       if (mpi_errno)
202         return mpi_errno;
203     }
204     buf0_was_inout = 1;
205
206     send_offset = 0;
207     recv_offset = 0;
208     size = total_count;
209     for (k = 0; k < log2_comm_size; ++k) {
210         /* use a double-buffering scheme to avoid local copies */
211         unsigned char* incoming_data = buf0_was_inout ? tmp_buf1 : tmp_buf0;
212         unsigned char* outgoing_data = buf0_was_inout ? tmp_buf0 : tmp_buf1;
213         int peer = rank ^ (0x1 << k);
214         size /= 2;
215
216         if (rank > peer) {
217             /* we have the higher rank: send top half, recv bottom half */
218             recv_offset += size;
219         }
220         else {
221             /* we have the lower rank: recv top half, send bottom half */
222             send_offset += size;
223         }
224
225         Request::sendrecv(outgoing_data + send_offset*true_extent,
226                                      size, datatype, peer, COLL_TAG_SCATTER,
227                                      incoming_data + recv_offset*true_extent,
228                                      size, datatype, peer, COLL_TAG_SCATTER,
229                                      comm, MPI_STATUS_IGNORE);
230         /* always perform the reduction at recv_offset, the data at send_offset
231            is now our peer's responsibility */
232         if (rank > peer) {
233             /* higher ranked value so need to call op(received_data, my_data) */
234             if(op!=MPI_OP_NULL) op->apply(
235                    incoming_data + recv_offset*true_extent,
236                      outgoing_data + recv_offset*true_extent,
237                      &size, datatype );
238             /* buf0_was_inout = buf0_was_inout; */
239         }
240         else {
241             /* lower ranked value so need to call op(my_data, received_data) */
242             if (op != MPI_OP_NULL)
243               op->apply(outgoing_data + recv_offset * true_extent, incoming_data + recv_offset * true_extent, &size,
244                         datatype);
245             buf0_was_inout = not buf0_was_inout;
246         }
247
248         /* the next round of send/recv needs to happen within the block (of size
249            "size") that we just received and reduced */
250         send_offset = recv_offset;
251     }
252
253     xbt_assert(size == recvcounts[rank]);
254
255     /* copy the reduced data to the recvbuf */
256     result_ptr = (buf0_was_inout ? tmp_buf0 : tmp_buf1) + recv_offset * true_extent;
257     mpi_errno = Datatype::copy(result_ptr, size, datatype,
258                                recvbuf, size, datatype);
259     smpi_free_tmp_buffer(tmp_buf0_save);
260     smpi_free_tmp_buffer(tmp_buf1_save);
261     if (mpi_errno) return(mpi_errno);
262     return MPI_SUCCESS;
263 }
264
265
266
267 int Coll_reduce_scatter_mpich_rdb::reduce_scatter(const void *sendbuf, void *recvbuf, const int recvcounts[],
268                               MPI_Datatype datatype, MPI_Op op, MPI_Comm comm)
269 {
270     int   rank, comm_size, i;
271     MPI_Aint extent, true_extent, true_lb;
272     int mpi_errno = MPI_SUCCESS;
273     int dis[2], blklens[2], total_count, dst;
274     int mask, dst_tree_root, my_tree_root, j, k;
275     int received;
276     MPI_Datatype sendtype, recvtype;
277     int nprocs_completed, tmp_mask, tree_root, is_commutative=0;
278     comm_size = comm->size();
279     rank = comm->rank();
280
281     extent =datatype->get_extent();
282     datatype->extent(&true_lb, &true_extent);
283
284     if ((op==MPI_OP_NULL) || op->is_commutative()) {
285         is_commutative = 1;
286     }
287
288     int* disps = new int[comm_size];
289
290     total_count = 0;
291     for (i=0; i<comm_size; i++) {
292         disps[i] = total_count;
293         total_count += recvcounts[i];
294     }
295
296             /* noncommutative and (non-pof2 or block irregular), use recursive doubling. */
297
298             /* need to allocate temporary buffer to receive incoming data*/
299     unsigned char* tmp_recvbuf = smpi_get_tmp_recvbuffer(total_count * std::max(true_extent, extent));
300     /* adjust for potential negative lower bound in datatype */
301     tmp_recvbuf = tmp_recvbuf - true_lb;
302
303     /* need to allocate another temporary buffer to accumulate
304        results */
305     unsigned char* tmp_results = smpi_get_tmp_sendbuffer(total_count * std::max(true_extent, extent));
306     /* adjust for potential negative lower bound in datatype */
307     tmp_results = tmp_results - true_lb;
308
309     /* copy sendbuf into tmp_results */
310     if (sendbuf != MPI_IN_PLACE)
311       mpi_errno = Datatype::copy(sendbuf, total_count, datatype, tmp_results, total_count, datatype);
312     else
313       mpi_errno = Datatype::copy(recvbuf, total_count, datatype, tmp_results, total_count, datatype);
314
315     if (mpi_errno)
316       return (mpi_errno);
317
318     mask = 0x1;
319     i    = 0;
320     while (mask < comm_size) {
321       dst = rank ^ mask;
322
323       dst_tree_root = dst >> i;
324       dst_tree_root <<= i;
325
326       my_tree_root = rank >> i;
327       my_tree_root <<= i;
328
329       /* At step 1, processes exchange (n-n/p) amount of
330          data; at step 2, (n-2n/p) amount of data; at step 3, (n-4n/p)
331          amount of data, and so forth. We use derived datatypes for this.
332
333          At each step, a process does not need to send data
334          indexed from my_tree_root to
335          my_tree_root+mask-1. Similarly, a process won't receive
336          data indexed from dst_tree_root to dst_tree_root+mask-1. */
337
338       /* calculate sendtype */
339       blklens[0] = blklens[1] = 0;
340       for (j = 0; j < my_tree_root; j++)
341         blklens[0] += recvcounts[j];
342       for (j = my_tree_root + mask; j < comm_size; j++)
343         blklens[1] += recvcounts[j];
344
345       dis[0] = 0;
346       dis[1] = blklens[0];
347       for (j = my_tree_root; (j < my_tree_root + mask) && (j < comm_size); j++)
348         dis[1] += recvcounts[j];
349
350       mpi_errno = Datatype::create_indexed(2, blklens, dis, datatype, &sendtype);
351       if (mpi_errno)
352         return (mpi_errno);
353
354       sendtype->commit();
355
356       /* calculate recvtype */
357       blklens[0] = blklens[1] = 0;
358       for (j = 0; j < dst_tree_root && j < comm_size; j++)
359         blklens[0] += recvcounts[j];
360       for (j = dst_tree_root + mask; j < comm_size; j++)
361         blklens[1] += recvcounts[j];
362
363       dis[0] = 0;
364       dis[1] = blklens[0];
365       for (j = dst_tree_root; (j < dst_tree_root + mask) && (j < comm_size); j++)
366         dis[1] += recvcounts[j];
367
368       mpi_errno = Datatype::create_indexed(2, blklens, dis, datatype, &recvtype);
369       if (mpi_errno)
370         return (mpi_errno);
371
372       recvtype->commit();
373
374       received = 0;
375       if (dst < comm_size) {
376         /* tmp_results contains data to be sent in each step. Data is
377            received in tmp_recvbuf and then accumulated into
378            tmp_results. accumulation is done later below.   */
379
380         Request::sendrecv(tmp_results, 1, sendtype, dst, COLL_TAG_SCATTER, tmp_recvbuf, 1, recvtype, dst,
381                           COLL_TAG_SCATTER, comm, MPI_STATUS_IGNORE);
382         received = 1;
383       }
384
385       /* if some processes in this process's subtree in this step
386          did not have any destination process to communicate with
387          because of non-power-of-two, we need to send them the
388          result. We use a logarithmic recursive-halfing algorithm
389          for this. */
390
391       if (dst_tree_root + mask > comm_size) {
392         nprocs_completed = comm_size - my_tree_root - mask;
393         /* nprocs_completed is the number of processes in this
394            subtree that have all the data. Send data to others
395            in a tree fashion. First find root of current tree
396            that is being divided into two. k is the number of
397            least-significant bits in this process's rank that
398            must be zeroed out to find the rank of the root */
399         j = mask;
400         k = 0;
401         while (j) {
402           j >>= 1;
403           k++;
404         }
405         k--;
406
407         tmp_mask = mask >> 1;
408         while (tmp_mask) {
409           dst = rank ^ tmp_mask;
410
411           tree_root = rank >> k;
412           tree_root <<= k;
413
414           /* send only if this proc has data and destination
415              doesn't have data. at any step, multiple processes
416              can send if they have the data */
417           if ((dst > rank) && (rank < tree_root + nprocs_completed) && (dst >= tree_root + nprocs_completed)) {
418             /* send the current result */
419             Request::send(tmp_recvbuf, 1, recvtype, dst, COLL_TAG_SCATTER, comm);
420           }
421           /* recv only if this proc. doesn't have data and sender
422              has data */
423           else if ((dst < rank) && (dst < tree_root + nprocs_completed) && (rank >= tree_root + nprocs_completed)) {
424             Request::recv(tmp_recvbuf, 1, recvtype, dst, COLL_TAG_SCATTER, comm, MPI_STATUS_IGNORE);
425             received = 1;
426           }
427           tmp_mask >>= 1;
428           k--;
429         }
430       }
431
432       /* The following reduction is done here instead of after
433          the MPIC_Sendrecv_ft or MPIC_Recv_ft above. This is
434          because to do it above, in the noncommutative
435          case, we would need an extra temp buffer so as not to
436          overwrite temp_recvbuf, because temp_recvbuf may have
437          to be communicated to other processes in the
438          non-power-of-two case. To avoid that extra allocation,
439          we do the reduce here. */
440       if (received) {
441         if (is_commutative || (dst_tree_root < my_tree_root)) {
442           {
443             if (op != MPI_OP_NULL)
444               op->apply(tmp_recvbuf, tmp_results, &blklens[0], datatype);
445             if (op != MPI_OP_NULL)
446               op->apply(tmp_recvbuf + dis[1] * extent, tmp_results + dis[1] * extent, &blklens[1], datatype);
447           }
448         } else {
449           {
450             if (op != MPI_OP_NULL)
451               op->apply(tmp_results, tmp_recvbuf, &blklens[0], datatype);
452             if (op != MPI_OP_NULL)
453               op->apply(tmp_results + dis[1] * extent, tmp_recvbuf + dis[1] * extent, &blklens[1], datatype);
454           }
455           /* copy result back into tmp_results */
456           mpi_errno = Datatype::copy(tmp_recvbuf, 1, recvtype, tmp_results, 1, recvtype);
457           if (mpi_errno)
458             return (mpi_errno);
459         }
460       }
461
462       Datatype::unref(sendtype);
463       Datatype::unref(recvtype);
464
465       mask <<= 1;
466       i++;
467             }
468
469             /* now copy final results from tmp_results to recvbuf */
470             mpi_errno = Datatype::copy(tmp_results + disps[rank] * extent, recvcounts[rank], datatype, recvbuf,
471                                        recvcounts[rank], datatype);
472             if (mpi_errno) return(mpi_errno);
473
474     delete[] disps;
475     smpi_free_tmp_buffer(tmp_recvbuf);
476     smpi_free_tmp_buffer(tmp_results);
477     return MPI_SUCCESS;
478         }
479 }
480 }
481