Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
9cf489b424d07c9c4fbb9fd444878dc636a1e63e
[simgrid.git] / src / smpi / colls / bcast / bcast-scatter-rdb-allgather.cpp
1 /* Copyright (c) 2011-2019. The SimGrid Team. All rights reserved.          */
2
3 /* This program is free software; you can redistribute it and/or modify it
4  * under the terms of the license (GNU LGPL) which comes with this package. */
5
6 #include "../colls_private.hpp"
7 #include "smpi_status.hpp"
8
9 namespace simgrid{
10 namespace smpi{
11
12 static int scatter_for_bcast(
13     int root,
14     MPI_Comm comm,
15     int nbytes,
16     void *tmp_buf)
17 {
18     MPI_Status status;
19     int        rank, comm_size, src, dst;
20     int        relative_rank, mask;
21     int mpi_errno = MPI_SUCCESS;
22     int scatter_size, curr_size, recv_size = 0, send_size;
23
24     comm_size = comm->size();
25     rank = comm->rank();
26     relative_rank = (rank >= root) ? rank - root : rank - root + comm_size;
27
28     /* use long message algorithm: binomial tree scatter followed by an allgather */
29
30     /* The scatter algorithm divides the buffer into nprocs pieces and
31        scatters them among the processes. Root gets the first piece,
32        root+1 gets the second piece, and so forth. Uses the same binomial
33        tree algorithm as above. Ceiling division
34        is used to compute the size of each piece. This means some
35        processes may not get any data. For example if bufsize = 97 and
36        nprocs = 16, ranks 15 and 16 will get 0 data. On each process, the
37        scattered data is stored at the same offset in the buffer as it is
38        on the root process. */
39
40     scatter_size = (nbytes + comm_size - 1)/comm_size; /* ceiling division */
41     curr_size = (rank == root) ? nbytes : 0; /* root starts with all the
42                                                 data */
43
44     mask = 0x1;
45     while (mask < comm_size)
46     {
47         if (relative_rank & mask)
48         {
49             src = rank - mask;
50             if (src < 0) src += comm_size;
51             recv_size = nbytes - relative_rank*scatter_size;
52             /* recv_size is larger than what might actually be sent by the
53                sender. We don't need compute the exact value because MPI
54                allows you to post a larger recv.*/
55             if (recv_size <= 0)
56             {
57                 curr_size = 0; /* this process doesn't receive any data
58                                   because of uneven division */
59             }
60             else
61             {
62                 Request::recv(((char *)tmp_buf +
63                                           relative_rank*scatter_size),
64                                          recv_size, MPI_BYTE, src,
65                                          COLL_TAG_BCAST, comm, &status);
66                 /* query actual size of data received */
67                 curr_size=Status::get_count(&status, MPI_BYTE);
68             }
69             break;
70         }
71         mask <<= 1;
72     }
73
74     /* This process is responsible for all processes that have bits
75        set from the LSB upto (but not including) mask.  Because of
76        the "not including", we start by shifting mask back down
77        one. */
78
79     mask >>= 1;
80     while (mask > 0)
81     {
82         if (relative_rank + mask < comm_size)
83         {
84             send_size = curr_size - scatter_size * mask;
85             /* mask is also the size of this process's subtree */
86
87             if (send_size > 0)
88             {
89                 dst = rank + mask;
90                 if (dst >= comm_size) dst -= comm_size;
91                 Request::send(((char *)tmp_buf +
92                                           scatter_size*(relative_rank+mask)),
93                                          send_size, MPI_BYTE, dst,
94                                          COLL_TAG_BCAST, comm);
95                 curr_size -= send_size;
96             }
97         }
98         mask >>= 1;
99     }
100
101     return mpi_errno;
102 }
103
104
105 int bcast__scatter_rdb_allgather(
106     void *buffer,
107     int count,
108     MPI_Datatype datatype,
109     int root,
110     MPI_Comm comm)
111 {
112     MPI_Status status;
113     int rank, comm_size, dst;
114     int relative_rank, mask;
115     int mpi_errno = MPI_SUCCESS;
116     int scatter_size, curr_size, recv_size = 0;
117     int j, k, i, tmp_mask, is_contig, is_homogeneous;
118     MPI_Aint type_size = 0, nbytes = 0;
119     int relative_dst, dst_tree_root, my_tree_root, send_offset;
120     int recv_offset, tree_root, nprocs_completed, offset;
121     int position;
122     MPI_Aint true_extent, true_lb;
123     void *tmp_buf;
124
125     comm_size = comm->size();
126     rank = comm->rank();
127     relative_rank = (rank >= root) ? rank - root : rank - root + comm_size;
128
129     /* If there is only one process, return */
130     if (comm_size == 1) goto fn_exit;
131
132     //if (HANDLE_GET_KIND(datatype) == HANDLE_KIND_BUILTIN)
133     if(datatype->flags() & DT_FLAG_CONTIGUOUS)
134         is_contig = 1;
135     else {
136         is_contig = 0;
137     }
138
139     is_homogeneous = 1;
140
141     /* MPI_Type_size() might not give the accurate size of the packed
142      * datatype for heterogeneous systems (because of padding, encoding,
143      * etc). On the other hand, MPI_Pack_size() can become very
144      * expensive, depending on the implementation, especially for
145      * heterogeneous systems. We want to use MPI_Type_size() wherever
146      * possible, and MPI_Pack_size() in other places.
147      */
148     if (is_homogeneous)
149         type_size=datatype->size();
150
151     nbytes = type_size * count;
152     if (nbytes == 0)
153         goto fn_exit; /* nothing to do */
154
155     if (is_contig && is_homogeneous)
156     {
157         /* contiguous and homogeneous. no need to pack. */
158         datatype->extent(&true_lb, &true_extent);
159
160         tmp_buf = (char *) buffer + true_lb;
161     }
162     else
163     {
164       tmp_buf = new unsigned char[nbytes];
165
166       /* TODO: Pipeline the packing and communication */
167       position = 0;
168       if (rank == root) {
169         mpi_errno = datatype->pack(buffer, count, tmp_buf, nbytes, &position, comm);
170         if (mpi_errno)
171           xbt_die("crash while packing %d", mpi_errno);
172       }
173     }
174
175
176     scatter_size = (nbytes + comm_size - 1)/comm_size; /* ceiling division */
177
178     mpi_errno = scatter_for_bcast(root, comm,
179                                   nbytes, tmp_buf);
180     if (mpi_errno) {
181       xbt_die("crash while scattering %d", mpi_errno);
182     }
183
184     /* curr_size is the amount of data that this process now has stored in
185      * buffer at byte offset (relative_rank*scatter_size) */
186     curr_size = scatter_size < (nbytes - (relative_rank * scatter_size)) ? scatter_size :  (nbytes - (relative_rank * scatter_size)) ;
187     if (curr_size < 0)
188         curr_size = 0;
189
190     /* medium size allgather and pof2 comm_size. use recurive doubling. */
191
192     mask = 0x1;
193     i = 0;
194     while (mask < comm_size)
195     {
196         relative_dst = relative_rank ^ mask;
197
198         dst = (relative_dst + root) % comm_size;
199
200         /* find offset into send and recv buffers.
201            zero out the least significant "i" bits of relative_rank and
202            relative_dst to find root of src and dst
203            subtrees. Use ranks of roots as index to send from
204            and recv into  buffer */
205
206         dst_tree_root = relative_dst >> i;
207         dst_tree_root <<= i;
208
209         my_tree_root = relative_rank >> i;
210         my_tree_root <<= i;
211
212         send_offset = my_tree_root * scatter_size;
213         recv_offset = dst_tree_root * scatter_size;
214
215         if (relative_dst < comm_size)
216         {
217             Request::sendrecv(((char *)tmp_buf + send_offset),
218                                          curr_size, MPI_BYTE, dst, COLL_TAG_BCAST,
219                                          ((char *)tmp_buf + recv_offset),
220                                          (nbytes-recv_offset < 0 ? 0 : nbytes-recv_offset),
221                                          MPI_BYTE, dst, COLL_TAG_BCAST, comm, &status);
222             recv_size=Status::get_count(&status, MPI_BYTE);
223             curr_size += recv_size;
224         }
225
226         /* if some processes in this process's subtree in this step
227            did not have any destination process to communicate with
228            because of non-power-of-two, we need to send them the
229            data that they would normally have received from those
230            processes. That is, the haves in this subtree must send to
231            the havenots. We use a logarithmic recursive-halfing algorithm
232            for this. */
233
234         /* This part of the code will not currently be
235            executed because we are not using recursive
236            doubling for non power of two. Mark it as experimental
237            so that it doesn't show up as red in the coverage tests. */
238
239         /* --BEGIN EXPERIMENTAL-- */
240         if (dst_tree_root + mask > comm_size)
241         {
242             nprocs_completed = comm_size - my_tree_root - mask;
243             /* nprocs_completed is the number of processes in this
244                subtree that have all the data. Send data to others
245                in a tree fashion. First find root of current tree
246                that is being divided into two. k is the number of
247                least-significant bits in this process's rank that
248                must be zeroed out to find the rank of the root */
249             j = mask;
250             k = 0;
251             while (j)
252             {
253                 j >>= 1;
254                 k++;
255             }
256             k--;
257
258             offset = scatter_size * (my_tree_root + mask);
259             tmp_mask = mask >> 1;
260
261             while (tmp_mask)
262             {
263                 relative_dst = relative_rank ^ tmp_mask;
264                 dst = (relative_dst + root) % comm_size;
265
266                 tree_root = relative_rank >> k;
267                 tree_root <<= k;
268
269                 /* send only if this proc has data and destination
270                    doesn't have data. */
271
272                 /* if (rank == 3) {
273                    printf("rank %d, dst %d, root %d, nprocs_completed %d\n", relative_rank, relative_dst, tree_root, nprocs_completed);
274                    fflush(stdout);
275                    }*/
276
277                 if ((relative_dst > relative_rank) &&
278                     (relative_rank < tree_root + nprocs_completed)
279                     && (relative_dst >= tree_root + nprocs_completed))
280                 {
281
282                     /* printf("Rank %d, send to %d, offset %d, size %d\n", rank, dst, offset, recv_size);
283                        fflush(stdout); */
284                     Request::send(((char *)tmp_buf + offset),
285                                              recv_size, MPI_BYTE, dst,
286                                              COLL_TAG_BCAST, comm);
287                     /* recv_size was set in the previous
288                        receive. that's the amount of data to be
289                        sent now. */
290                 }
291                 /* recv only if this proc. doesn't have data and sender
292                    has data */
293                 else if ((relative_dst < relative_rank) &&
294                          (relative_dst < tree_root + nprocs_completed) &&
295                          (relative_rank >= tree_root + nprocs_completed))
296                 {
297                     /* printf("Rank %d waiting to recv from rank %d\n",
298                        relative_rank, dst); */
299                     Request::recv(((char *)tmp_buf + offset),
300                                              nbytes - offset,
301                                              MPI_BYTE, dst, COLL_TAG_BCAST,
302                                              comm, &status);
303                     /* nprocs_completed is also equal to the no. of processes
304                        whose data we don't have */
305                     recv_size=Status::get_count(&status, MPI_BYTE);
306                     curr_size += recv_size;
307                     /* printf("Rank %d, recv from %d, offset %d, size %d\n", rank, dst, offset, recv_size);
308                        fflush(stdout);*/
309                 }
310                 tmp_mask >>= 1;
311                 k--;
312             }
313         }
314         /* --END EXPERIMENTAL-- */
315
316         mask <<= 1;
317         i++;
318     }
319
320     /* check that we received as much as we expected */
321     /* recvd_size may not be accurate for packed heterogeneous data */
322     if (is_homogeneous && curr_size != nbytes) {
323       xbt_die("we didn't receive enough !");
324     }
325
326     if (not is_contig || not is_homogeneous) {
327       if (rank != root) {
328         position  = 0;
329         mpi_errno = MPI_Unpack(tmp_buf, nbytes, &position, buffer, count, datatype, comm);
330         if (mpi_errno)
331           xbt_die("error when unpacking %d", mpi_errno);
332       }
333     }
334
335 fn_exit:
336   /* delete[] static_cast<unsigned char*>(tmp_buf); */
337   return mpi_errno;
338 }
339
340 }
341 }