Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
Add new entry in Release_Notes.
[simgrid.git] / src / smpi / colls / bcast / bcast-scatter-rdb-allgather.cpp
1 /* Copyright (c) 2011-2023. The SimGrid Team. All rights reserved.          */
2
3 /* This program is free software; you can redistribute it and/or modify it
4  * under the terms of the license (GNU LGPL) which comes with this package. */
5
6 #include "../colls_private.hpp"
7 #include "smpi_status.hpp"
8
9 namespace simgrid::smpi {
10
11 static int scatter_for_bcast(
12     int root,
13     MPI_Comm comm,
14     int nbytes,
15     void *tmp_buf)
16 {
17     MPI_Status status;
18     int        rank, comm_size, src, dst;
19     int        relative_rank, mask;
20     int mpi_errno = MPI_SUCCESS;
21     int scatter_size, curr_size, recv_size = 0, send_size;
22
23     comm_size = comm->size();
24     rank = comm->rank();
25     relative_rank = (rank >= root) ? rank - root : rank - root + comm_size;
26
27     /* use long message algorithm: binomial tree scatter followed by an allgather */
28
29     /* The scatter algorithm divides the buffer into nprocs pieces and
30        scatters them among the processes. Root gets the first piece,
31        root+1 gets the second piece, and so forth. Uses the same binomial
32        tree algorithm as above. Ceiling division
33        is used to compute the size of each piece. This means some
34        processes may not get any data. For example if bufsize = 97 and
35        nprocs = 16, ranks 15 and 16 will get 0 data. On each process, the
36        scattered data is stored at the same offset in the buffer as it is
37        on the root process. */
38
39     scatter_size = (nbytes + comm_size - 1)/comm_size; /* ceiling division */
40     curr_size = (rank == root) ? nbytes : 0; /* root starts with all the
41                                                 data */
42
43     mask = 0x1;
44     while (mask < comm_size)
45     {
46         if (relative_rank & mask)
47         {
48             src = rank - mask;
49             if (src < 0) src += comm_size;
50             recv_size = nbytes - relative_rank*scatter_size;
51             /* recv_size is larger than what might actually be sent by the
52                sender. We don't need compute the exact value because MPI
53                allows you to post a larger recv.*/
54             if (recv_size <= 0)
55             {
56                 curr_size = 0; /* this process doesn't receive any data
57                                   because of uneven division */
58             }
59             else
60             {
61                 Request::recv(((char *)tmp_buf +
62                                           relative_rank*scatter_size),
63                                          recv_size, MPI_BYTE, src,
64                                          COLL_TAG_BCAST, comm, &status);
65                 /* query actual size of data received */
66                 curr_size=Status::get_count(&status, MPI_BYTE);
67             }
68             break;
69         }
70         mask <<= 1;
71     }
72
73     /* This process is responsible for all processes that have bits
74        set from the LSB up to (but not including) mask.  Because of
75        the "not including", we start by shifting mask back down
76        one. */
77
78     mask >>= 1;
79     while (mask > 0)
80     {
81         if (relative_rank + mask < comm_size)
82         {
83             send_size = curr_size - scatter_size * mask;
84             /* mask is also the size of this process's subtree */
85
86             if (send_size > 0)
87             {
88                 dst = rank + mask;
89                 if (dst >= comm_size) dst -= comm_size;
90                 Request::send(((char *)tmp_buf +
91                                           scatter_size*(relative_rank+mask)),
92                                          send_size, MPI_BYTE, dst,
93                                          COLL_TAG_BCAST, comm);
94                 curr_size -= send_size;
95             }
96         }
97         mask >>= 1;
98     }
99
100     return mpi_errno;
101 }
102
103
104 int bcast__scatter_rdb_allgather(
105     void *buffer,
106     int count,
107     MPI_Datatype datatype,
108     int root,
109     MPI_Comm comm)
110 {
111     MPI_Status status;
112     int rank, comm_size, dst;
113     int relative_rank, mask;
114     int mpi_errno = MPI_SUCCESS;
115     int scatter_size, curr_size, recv_size = 0;
116     int j, k, i, tmp_mask;
117     bool is_contig, is_homogeneous;
118     MPI_Aint type_size = 0, nbytes = 0;
119     int relative_dst, dst_tree_root, my_tree_root, send_offset;
120     int recv_offset, tree_root, nprocs_completed, offset;
121     int position;
122     MPI_Aint true_extent, true_lb;
123     void *tmp_buf;
124
125     comm_size = comm->size();
126     rank = comm->rank();
127     relative_rank = (rank >= root) ? rank - root : rank - root + comm_size;
128
129     /* If there is only one process, return */
130     if (comm_size == 1) goto fn_exit;
131
132     //if (HANDLE_GET_KIND(datatype) == HANDLE_KIND_BUILTIN)
133     is_contig = ((datatype->flags() & DT_FLAG_CONTIGUOUS) != 0);
134
135     is_homogeneous = true;
136
137     /* MPI_Type_size() might not give the accurate size of the packed
138      * datatype for heterogeneous systems (because of padding, encoding,
139      * etc). On the other hand, MPI_Pack_size() can become very
140      * expensive, depending on the implementation, especially for
141      * heterogeneous systems. We want to use MPI_Type_size() wherever
142      * possible, and MPI_Pack_size() in other places.
143      */
144     if (is_homogeneous)
145         type_size=datatype->size();
146
147     nbytes = type_size * count;
148     if (nbytes == 0)
149         goto fn_exit; /* nothing to do */
150
151     if (is_contig && is_homogeneous)
152     {
153         /* contiguous and homogeneous. no need to pack. */
154         datatype->extent(&true_lb, &true_extent);
155
156         tmp_buf = (char *) buffer + true_lb;
157     }
158     else
159     {
160       tmp_buf = new unsigned char[nbytes];
161
162       /* TODO: Pipeline the packing and communication */
163       position = 0;
164       if (rank == root) {
165         mpi_errno = datatype->pack(buffer, count, tmp_buf, nbytes, &position, comm);
166         xbt_assert(mpi_errno == 0, "crash while packing %d", mpi_errno);
167       }
168     }
169
170
171     scatter_size = (nbytes + comm_size - 1)/comm_size; /* ceiling division */
172
173     mpi_errno = scatter_for_bcast(root, comm,
174                                   nbytes, tmp_buf);
175     xbt_assert(mpi_errno == 0, "crash while scattering %d", mpi_errno);
176
177     /* curr_size is the amount of data that this process now has stored in
178      * buffer at byte offset (relative_rank*scatter_size) */
179     curr_size = scatter_size < (nbytes - (relative_rank * scatter_size)) ? scatter_size :  (nbytes - (relative_rank * scatter_size)) ;
180     if (curr_size < 0)
181         curr_size = 0;
182
183     /* medium size allgather and pof2 comm_size. use recurive doubling. */
184
185     mask = 0x1;
186     i = 0;
187     while (mask < comm_size)
188     {
189         relative_dst = relative_rank ^ mask;
190
191         dst = (relative_dst + root) % comm_size;
192
193         /* find offset into send and recv buffers.
194            zero out the least significant "i" bits of relative_rank and
195            relative_dst to find root of src and dst
196            subtrees. Use ranks of roots as index to send from
197            and recv into  buffer */
198
199         dst_tree_root = relative_dst >> i;
200         dst_tree_root <<= i;
201
202         my_tree_root = relative_rank >> i;
203         my_tree_root <<= i;
204
205         send_offset = my_tree_root * scatter_size;
206         recv_offset = dst_tree_root * scatter_size;
207
208         if (relative_dst < comm_size)
209         {
210             Request::sendrecv(((char *)tmp_buf + send_offset),
211                                          curr_size, MPI_BYTE, dst, COLL_TAG_BCAST,
212                                          ((char *)tmp_buf + recv_offset),
213                                          (nbytes-recv_offset < 0 ? 0 : nbytes-recv_offset),
214                                          MPI_BYTE, dst, COLL_TAG_BCAST, comm, &status);
215             recv_size=Status::get_count(&status, MPI_BYTE);
216             curr_size += recv_size;
217         }
218
219         /* if some processes in this process's subtree in this step
220            did not have any destination process to communicate with
221            because of non-power-of-two, we need to send them the
222            data that they would normally have received from those
223            processes. That is, the haves in this subtree must send to
224            the havenots. We use a logarithmic recursive-halfing algorithm
225            for this. */
226
227         /* This part of the code will not currently be
228            executed because we are not using recursive
229            doubling for non power of two. Mark it as experimental
230            so that it doesn't show up as red in the coverage tests. */
231
232         /* --BEGIN EXPERIMENTAL-- */
233         if (dst_tree_root + mask > comm_size)
234         {
235             nprocs_completed = comm_size - my_tree_root - mask;
236             /* nprocs_completed is the number of processes in this
237                subtree that have all the data. Send data to others
238                in a tree fashion. First find root of current tree
239                that is being divided into two. k is the number of
240                least-significant bits in this process's rank that
241                must be zeroed out to find the rank of the root */
242             j = mask;
243             k = 0;
244             while (j)
245             {
246                 j >>= 1;
247                 k++;
248             }
249             k--;
250
251             offset = scatter_size * (my_tree_root + mask);
252             tmp_mask = mask >> 1;
253
254             while (tmp_mask)
255             {
256                 relative_dst = relative_rank ^ tmp_mask;
257                 dst = (relative_dst + root) % comm_size;
258
259                 tree_root = relative_rank >> k;
260                 tree_root <<= k;
261
262                 /* send only if this proc has data and destination
263                    doesn't have data. */
264
265                 /* if (rank == 3) {
266                    printf("rank %d, dst %d, root %d, nprocs_completed %d\n", relative_rank, relative_dst, tree_root, nprocs_completed);
267                    fflush(stdout);
268                    }*/
269
270                 if ((relative_dst > relative_rank) &&
271                     (relative_rank < tree_root + nprocs_completed)
272                     && (relative_dst >= tree_root + nprocs_completed))
273                 {
274
275                     /* printf("Rank %d, send to %d, offset %d, size %d\n", rank, dst, offset, recv_size);
276                        fflush(stdout); */
277                     Request::send(((char *)tmp_buf + offset),
278                                              recv_size, MPI_BYTE, dst,
279                                              COLL_TAG_BCAST, comm);
280                     /* recv_size was set in the previous
281                        receive. that's the amount of data to be
282                        sent now. */
283                 }
284                 /* recv only if this proc. doesn't have data and sender
285                    has data */
286                 else if ((relative_dst < relative_rank) &&
287                          (relative_dst < tree_root + nprocs_completed) &&
288                          (relative_rank >= tree_root + nprocs_completed))
289                 {
290                     /* printf("Rank %d waiting to recv from rank %d\n",
291                        relative_rank, dst); */
292                     Request::recv(((char *)tmp_buf + offset),
293                                              nbytes - offset,
294                                              MPI_BYTE, dst, COLL_TAG_BCAST,
295                                              comm, &status);
296                     /* nprocs_completed is also equal to the no. of processes
297                        whose data we don't have */
298                     recv_size=Status::get_count(&status, MPI_BYTE);
299                     curr_size += recv_size;
300                     /* printf("Rank %d, recv from %d, offset %d, size %d\n", rank, dst, offset, recv_size);
301                        fflush(stdout);*/
302                 }
303                 tmp_mask >>= 1;
304                 k--;
305             }
306         }
307         /* --END EXPERIMENTAL-- */
308
309         mask <<= 1;
310         i++;
311     }
312
313     /* check that we received as much as we expected */
314     /* recvd_size may not be accurate for packed heterogeneous data */
315     xbt_assert(not is_homogeneous || curr_size == nbytes, "we didn't receive enough !");
316
317     if (not is_contig || not is_homogeneous) {
318       if (rank != root) {
319         position  = 0;
320         mpi_errno = MPI_Unpack(tmp_buf, nbytes, &position, buffer, count, datatype, comm);
321         xbt_assert(mpi_errno == 0, "error when unpacking %d", mpi_errno);
322       }
323     }
324
325 fn_exit:
326   /* delete[] static_cast<unsigned char*>(tmp_buf); */
327   return mpi_errno;
328 }
329
330 } // namespace simgrid::smpi