Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
Update copyright lines.
[simgrid.git] / src / smpi / colls / bcast / bcast-scatter-rdb-allgather.cpp
1 /* Copyright (c) 2011-2021. The SimGrid Team. All rights reserved.          */
2
3 /* This program is free software; you can redistribute it and/or modify it
4  * under the terms of the license (GNU LGPL) which comes with this package. */
5
6 #include "../colls_private.hpp"
7 #include "smpi_status.hpp"
8
9 namespace simgrid{
10 namespace smpi{
11
12 static int scatter_for_bcast(
13     int root,
14     MPI_Comm comm,
15     int nbytes,
16     void *tmp_buf)
17 {
18     MPI_Status status;
19     int        rank, comm_size, src, dst;
20     int        relative_rank, mask;
21     int mpi_errno = MPI_SUCCESS;
22     int scatter_size, curr_size, recv_size = 0, send_size;
23
24     comm_size = comm->size();
25     rank = comm->rank();
26     relative_rank = (rank >= root) ? rank - root : rank - root + comm_size;
27
28     /* use long message algorithm: binomial tree scatter followed by an allgather */
29
30     /* The scatter algorithm divides the buffer into nprocs pieces and
31        scatters them among the processes. Root gets the first piece,
32        root+1 gets the second piece, and so forth. Uses the same binomial
33        tree algorithm as above. Ceiling division
34        is used to compute the size of each piece. This means some
35        processes may not get any data. For example if bufsize = 97 and
36        nprocs = 16, ranks 15 and 16 will get 0 data. On each process, the
37        scattered data is stored at the same offset in the buffer as it is
38        on the root process. */
39
40     scatter_size = (nbytes + comm_size - 1)/comm_size; /* ceiling division */
41     curr_size = (rank == root) ? nbytes : 0; /* root starts with all the
42                                                 data */
43
44     mask = 0x1;
45     while (mask < comm_size)
46     {
47         if (relative_rank & mask)
48         {
49             src = rank - mask;
50             if (src < 0) src += comm_size;
51             recv_size = nbytes - relative_rank*scatter_size;
52             /* recv_size is larger than what might actually be sent by the
53                sender. We don't need compute the exact value because MPI
54                allows you to post a larger recv.*/
55             if (recv_size <= 0)
56             {
57                 curr_size = 0; /* this process doesn't receive any data
58                                   because of uneven division */
59             }
60             else
61             {
62                 Request::recv(((char *)tmp_buf +
63                                           relative_rank*scatter_size),
64                                          recv_size, MPI_BYTE, src,
65                                          COLL_TAG_BCAST, comm, &status);
66                 /* query actual size of data received */
67                 curr_size=Status::get_count(&status, MPI_BYTE);
68             }
69             break;
70         }
71         mask <<= 1;
72     }
73
74     /* This process is responsible for all processes that have bits
75        set from the LSB up to (but not including) mask.  Because of
76        the "not including", we start by shifting mask back down
77        one. */
78
79     mask >>= 1;
80     while (mask > 0)
81     {
82         if (relative_rank + mask < comm_size)
83         {
84             send_size = curr_size - scatter_size * mask;
85             /* mask is also the size of this process's subtree */
86
87             if (send_size > 0)
88             {
89                 dst = rank + mask;
90                 if (dst >= comm_size) dst -= comm_size;
91                 Request::send(((char *)tmp_buf +
92                                           scatter_size*(relative_rank+mask)),
93                                          send_size, MPI_BYTE, dst,
94                                          COLL_TAG_BCAST, comm);
95                 curr_size -= send_size;
96             }
97         }
98         mask >>= 1;
99     }
100
101     return mpi_errno;
102 }
103
104
105 int bcast__scatter_rdb_allgather(
106     void *buffer,
107     int count,
108     MPI_Datatype datatype,
109     int root,
110     MPI_Comm comm)
111 {
112     MPI_Status status;
113     int rank, comm_size, dst;
114     int relative_rank, mask;
115     int mpi_errno = MPI_SUCCESS;
116     int scatter_size, curr_size, recv_size = 0;
117     int j, k, i, tmp_mask;
118     bool is_contig, is_homogeneous;
119     MPI_Aint type_size = 0, nbytes = 0;
120     int relative_dst, dst_tree_root, my_tree_root, send_offset;
121     int recv_offset, tree_root, nprocs_completed, offset;
122     int position;
123     MPI_Aint true_extent, true_lb;
124     void *tmp_buf;
125
126     comm_size = comm->size();
127     rank = comm->rank();
128     relative_rank = (rank >= root) ? rank - root : rank - root + comm_size;
129
130     /* If there is only one process, return */
131     if (comm_size == 1) goto fn_exit;
132
133     //if (HANDLE_GET_KIND(datatype) == HANDLE_KIND_BUILTIN)
134     is_contig = ((datatype->flags() & DT_FLAG_CONTIGUOUS) != 0);
135
136     is_homogeneous = true;
137
138     /* MPI_Type_size() might not give the accurate size of the packed
139      * datatype for heterogeneous systems (because of padding, encoding,
140      * etc). On the other hand, MPI_Pack_size() can become very
141      * expensive, depending on the implementation, especially for
142      * heterogeneous systems. We want to use MPI_Type_size() wherever
143      * possible, and MPI_Pack_size() in other places.
144      */
145     if (is_homogeneous)
146         type_size=datatype->size();
147
148     nbytes = type_size * count;
149     if (nbytes == 0)
150         goto fn_exit; /* nothing to do */
151
152     if (is_contig && is_homogeneous)
153     {
154         /* contiguous and homogeneous. no need to pack. */
155         datatype->extent(&true_lb, &true_extent);
156
157         tmp_buf = (char *) buffer + true_lb;
158     }
159     else
160     {
161       tmp_buf = new unsigned char[nbytes];
162
163       /* TODO: Pipeline the packing and communication */
164       position = 0;
165       if (rank == root) {
166         mpi_errno = datatype->pack(buffer, count, tmp_buf, nbytes, &position, comm);
167         if (mpi_errno)
168           xbt_die("crash while packing %d", mpi_errno);
169       }
170     }
171
172
173     scatter_size = (nbytes + comm_size - 1)/comm_size; /* ceiling division */
174
175     mpi_errno = scatter_for_bcast(root, comm,
176                                   nbytes, tmp_buf);
177     if (mpi_errno) {
178       xbt_die("crash while scattering %d", mpi_errno);
179     }
180
181     /* curr_size is the amount of data that this process now has stored in
182      * buffer at byte offset (relative_rank*scatter_size) */
183     curr_size = scatter_size < (nbytes - (relative_rank * scatter_size)) ? scatter_size :  (nbytes - (relative_rank * scatter_size)) ;
184     if (curr_size < 0)
185         curr_size = 0;
186
187     /* medium size allgather and pof2 comm_size. use recurive doubling. */
188
189     mask = 0x1;
190     i = 0;
191     while (mask < comm_size)
192     {
193         relative_dst = relative_rank ^ mask;
194
195         dst = (relative_dst + root) % comm_size;
196
197         /* find offset into send and recv buffers.
198            zero out the least significant "i" bits of relative_rank and
199            relative_dst to find root of src and dst
200            subtrees. Use ranks of roots as index to send from
201            and recv into  buffer */
202
203         dst_tree_root = relative_dst >> i;
204         dst_tree_root <<= i;
205
206         my_tree_root = relative_rank >> i;
207         my_tree_root <<= i;
208
209         send_offset = my_tree_root * scatter_size;
210         recv_offset = dst_tree_root * scatter_size;
211
212         if (relative_dst < comm_size)
213         {
214             Request::sendrecv(((char *)tmp_buf + send_offset),
215                                          curr_size, MPI_BYTE, dst, COLL_TAG_BCAST,
216                                          ((char *)tmp_buf + recv_offset),
217                                          (nbytes-recv_offset < 0 ? 0 : nbytes-recv_offset),
218                                          MPI_BYTE, dst, COLL_TAG_BCAST, comm, &status);
219             recv_size=Status::get_count(&status, MPI_BYTE);
220             curr_size += recv_size;
221         }
222
223         /* if some processes in this process's subtree in this step
224            did not have any destination process to communicate with
225            because of non-power-of-two, we need to send them the
226            data that they would normally have received from those
227            processes. That is, the haves in this subtree must send to
228            the havenots. We use a logarithmic recursive-halfing algorithm
229            for this. */
230
231         /* This part of the code will not currently be
232            executed because we are not using recursive
233            doubling for non power of two. Mark it as experimental
234            so that it doesn't show up as red in the coverage tests. */
235
236         /* --BEGIN EXPERIMENTAL-- */
237         if (dst_tree_root + mask > comm_size)
238         {
239             nprocs_completed = comm_size - my_tree_root - mask;
240             /* nprocs_completed is the number of processes in this
241                subtree that have all the data. Send data to others
242                in a tree fashion. First find root of current tree
243                that is being divided into two. k is the number of
244                least-significant bits in this process's rank that
245                must be zeroed out to find the rank of the root */
246             j = mask;
247             k = 0;
248             while (j)
249             {
250                 j >>= 1;
251                 k++;
252             }
253             k--;
254
255             offset = scatter_size * (my_tree_root + mask);
256             tmp_mask = mask >> 1;
257
258             while (tmp_mask)
259             {
260                 relative_dst = relative_rank ^ tmp_mask;
261                 dst = (relative_dst + root) % comm_size;
262
263                 tree_root = relative_rank >> k;
264                 tree_root <<= k;
265
266                 /* send only if this proc has data and destination
267                    doesn't have data. */
268
269                 /* if (rank == 3) {
270                    printf("rank %d, dst %d, root %d, nprocs_completed %d\n", relative_rank, relative_dst, tree_root, nprocs_completed);
271                    fflush(stdout);
272                    }*/
273
274                 if ((relative_dst > relative_rank) &&
275                     (relative_rank < tree_root + nprocs_completed)
276                     && (relative_dst >= tree_root + nprocs_completed))
277                 {
278
279                     /* printf("Rank %d, send to %d, offset %d, size %d\n", rank, dst, offset, recv_size);
280                        fflush(stdout); */
281                     Request::send(((char *)tmp_buf + offset),
282                                              recv_size, MPI_BYTE, dst,
283                                              COLL_TAG_BCAST, comm);
284                     /* recv_size was set in the previous
285                        receive. that's the amount of data to be
286                        sent now. */
287                 }
288                 /* recv only if this proc. doesn't have data and sender
289                    has data */
290                 else if ((relative_dst < relative_rank) &&
291                          (relative_dst < tree_root + nprocs_completed) &&
292                          (relative_rank >= tree_root + nprocs_completed))
293                 {
294                     /* printf("Rank %d waiting to recv from rank %d\n",
295                        relative_rank, dst); */
296                     Request::recv(((char *)tmp_buf + offset),
297                                              nbytes - offset,
298                                              MPI_BYTE, dst, COLL_TAG_BCAST,
299                                              comm, &status);
300                     /* nprocs_completed is also equal to the no. of processes
301                        whose data we don't have */
302                     recv_size=Status::get_count(&status, MPI_BYTE);
303                     curr_size += recv_size;
304                     /* printf("Rank %d, recv from %d, offset %d, size %d\n", rank, dst, offset, recv_size);
305                        fflush(stdout);*/
306                 }
307                 tmp_mask >>= 1;
308                 k--;
309             }
310         }
311         /* --END EXPERIMENTAL-- */
312
313         mask <<= 1;
314         i++;
315     }
316
317     /* check that we received as much as we expected */
318     /* recvd_size may not be accurate for packed heterogeneous data */
319     if (is_homogeneous && curr_size != nbytes) {
320       xbt_die("we didn't receive enough !");
321     }
322
323     if (not is_contig || not is_homogeneous) {
324       if (rank != root) {
325         position  = 0;
326         mpi_errno = MPI_Unpack(tmp_buf, nbytes, &position, buffer, count, datatype, comm);
327         if (mpi_errno)
328           xbt_die("error when unpacking %d", mpi_errno);
329       }
330     }
331
332 fn_exit:
333   /* delete[] static_cast<unsigned char*>(tmp_buf); */
334   return mpi_errno;
335 }
336
337 }
338 }