Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
Update copyright lines with new year.
[simgrid.git] / src / smpi / colls / bcast / bcast-scatter-rdb-allgather.cpp
1 /* Copyright (c) 2011-2019. The SimGrid Team. All rights reserved.          */
2
3 /* This program is free software; you can redistribute it and/or modify it
4  * under the terms of the license (GNU LGPL) which comes with this package. */
5
6 #include "../colls_private.hpp"
7 #include "smpi_status.hpp"
8
9 namespace simgrid{
10 namespace smpi{
11
12 static int scatter_for_bcast(
13     int root,
14     MPI_Comm comm,
15     int nbytes,
16     void *tmp_buf)
17 {
18     MPI_Status status;
19     int        rank, comm_size, src, dst;
20     int        relative_rank, mask;
21     int mpi_errno = MPI_SUCCESS;
22     int scatter_size, curr_size, recv_size = 0, send_size;
23
24     comm_size = comm->size();
25     rank = comm->rank();
26     relative_rank = (rank >= root) ? rank - root : rank - root + comm_size;
27
28     /* use long message algorithm: binomial tree scatter followed by an allgather */
29
30     /* The scatter algorithm divides the buffer into nprocs pieces and
31        scatters them among the processes. Root gets the first piece,
32        root+1 gets the second piece, and so forth. Uses the same binomial
33        tree algorithm as above. Ceiling division
34        is used to compute the size of each piece. This means some
35        processes may not get any data. For example if bufsize = 97 and
36        nprocs = 16, ranks 15 and 16 will get 0 data. On each process, the
37        scattered data is stored at the same offset in the buffer as it is
38        on the root process. */
39
40     scatter_size = (nbytes + comm_size - 1)/comm_size; /* ceiling division */
41     curr_size = (rank == root) ? nbytes : 0; /* root starts with all the
42                                                 data */
43
44     mask = 0x1;
45     while (mask < comm_size)
46     {
47         if (relative_rank & mask)
48         {
49             src = rank - mask;
50             if (src < 0) src += comm_size;
51             recv_size = nbytes - relative_rank*scatter_size;
52             /* recv_size is larger than what might actually be sent by the
53                sender. We don't need compute the exact value because MPI
54                allows you to post a larger recv.*/
55             if (recv_size <= 0)
56             {
57                 curr_size = 0; /* this process doesn't receive any data
58                                   because of uneven division */
59             }
60             else
61             {
62                 Request::recv(((char *)tmp_buf +
63                                           relative_rank*scatter_size),
64                                          recv_size, MPI_BYTE, src,
65                                          COLL_TAG_BCAST, comm, &status);
66                 /* query actual size of data received */
67                 curr_size=Status::get_count(&status, MPI_BYTE);
68             }
69             break;
70         }
71         mask <<= 1;
72     }
73
74     /* This process is responsible for all processes that have bits
75        set from the LSB upto (but not including) mask.  Because of
76        the "not including", we start by shifting mask back down
77        one. */
78
79     mask >>= 1;
80     while (mask > 0)
81     {
82         if (relative_rank + mask < comm_size)
83         {
84             send_size = curr_size - scatter_size * mask;
85             /* mask is also the size of this process's subtree */
86
87             if (send_size > 0)
88             {
89                 dst = rank + mask;
90                 if (dst >= comm_size) dst -= comm_size;
91                 Request::send(((char *)tmp_buf +
92                                           scatter_size*(relative_rank+mask)),
93                                          send_size, MPI_BYTE, dst,
94                                          COLL_TAG_BCAST, comm);
95                 curr_size -= send_size;
96             }
97         }
98         mask >>= 1;
99     }
100
101     return mpi_errno;
102 }
103
104
105 int
106 Coll_bcast_scatter_rdb_allgather::bcast (
107     void *buffer,
108     int count,
109     MPI_Datatype datatype,
110     int root,
111     MPI_Comm comm)
112 {
113     MPI_Status status;
114     int rank, comm_size, dst;
115     int relative_rank, mask;
116     int mpi_errno = MPI_SUCCESS;
117     int scatter_size, curr_size, recv_size = 0;
118     int j, k, i, tmp_mask, is_contig, is_homogeneous;
119     MPI_Aint type_size = 0, nbytes = 0;
120     int relative_dst, dst_tree_root, my_tree_root, send_offset;
121     int recv_offset, tree_root, nprocs_completed, offset;
122     int position;
123     MPI_Aint true_extent, true_lb;
124     void *tmp_buf;
125
126     comm_size = comm->size();
127     rank = comm->rank();
128     relative_rank = (rank >= root) ? rank - root : rank - root + comm_size;
129
130     /* If there is only one process, return */
131     if (comm_size == 1) goto fn_exit;
132
133     //if (HANDLE_GET_KIND(datatype) == HANDLE_KIND_BUILTIN)
134     if(datatype->flags() & DT_FLAG_CONTIGUOUS)
135         is_contig = 1;
136     else {
137         is_contig = 0;
138     }
139
140     is_homogeneous = 1;
141
142     /* MPI_Type_size() might not give the accurate size of the packed
143      * datatype for heterogeneous systems (because of padding, encoding,
144      * etc). On the other hand, MPI_Pack_size() can become very
145      * expensive, depending on the implementation, especially for
146      * heterogeneous systems. We want to use MPI_Type_size() wherever
147      * possible, and MPI_Pack_size() in other places.
148      */
149     if (is_homogeneous)
150         type_size=datatype->size();
151
152     nbytes = type_size * count;
153     if (nbytes == 0)
154         goto fn_exit; /* nothing to do */
155
156     if (is_contig && is_homogeneous)
157     {
158         /* contiguous and homogeneous. no need to pack. */
159         datatype->extent(&true_lb, &true_extent);
160
161         tmp_buf = (char *) buffer + true_lb;
162     }
163     else
164     {
165         tmp_buf=(void*)xbt_malloc(nbytes);
166
167         /* TODO: Pipeline the packing and communication */
168         position = 0;
169         if (rank == root) {
170             mpi_errno = datatype->pack(buffer, count, tmp_buf, nbytes,
171                                        &position, comm);
172             if (mpi_errno) xbt_die("crash while packing %d", mpi_errno);
173         }
174     }
175
176
177     scatter_size = (nbytes + comm_size - 1)/comm_size; /* ceiling division */
178
179     mpi_errno = scatter_for_bcast(root, comm,
180                                   nbytes, tmp_buf);
181     if (mpi_errno) {
182       xbt_die("crash while scattering %d", mpi_errno);
183     }
184
185     /* curr_size is the amount of data that this process now has stored in
186      * buffer at byte offset (relative_rank*scatter_size) */
187     curr_size = scatter_size < (nbytes - (relative_rank * scatter_size)) ? scatter_size :  (nbytes - (relative_rank * scatter_size)) ;
188     if (curr_size < 0)
189         curr_size = 0;
190
191     /* medium size allgather and pof2 comm_size. use recurive doubling. */
192
193     mask = 0x1;
194     i = 0;
195     while (mask < comm_size)
196     {
197         relative_dst = relative_rank ^ mask;
198
199         dst = (relative_dst + root) % comm_size;
200
201         /* find offset into send and recv buffers.
202            zero out the least significant "i" bits of relative_rank and
203            relative_dst to find root of src and dst
204            subtrees. Use ranks of roots as index to send from
205            and recv into  buffer */
206
207         dst_tree_root = relative_dst >> i;
208         dst_tree_root <<= i;
209
210         my_tree_root = relative_rank >> i;
211         my_tree_root <<= i;
212
213         send_offset = my_tree_root * scatter_size;
214         recv_offset = dst_tree_root * scatter_size;
215
216         if (relative_dst < comm_size)
217         {
218             Request::sendrecv(((char *)tmp_buf + send_offset),
219                                          curr_size, MPI_BYTE, dst, COLL_TAG_BCAST,
220                                          ((char *)tmp_buf + recv_offset),
221                                          (nbytes-recv_offset < 0 ? 0 : nbytes-recv_offset),
222                                          MPI_BYTE, dst, COLL_TAG_BCAST, comm, &status);
223             recv_size=Status::get_count(&status, MPI_BYTE);
224             curr_size += recv_size;
225         }
226
227         /* if some processes in this process's subtree in this step
228            did not have any destination process to communicate with
229            because of non-power-of-two, we need to send them the
230            data that they would normally have received from those
231            processes. That is, the haves in this subtree must send to
232            the havenots. We use a logarithmic recursive-halfing algorithm
233            for this. */
234
235         /* This part of the code will not currently be
236            executed because we are not using recursive
237            doubling for non power of two. Mark it as experimental
238            so that it doesn't show up as red in the coverage tests. */
239
240         /* --BEGIN EXPERIMENTAL-- */
241         if (dst_tree_root + mask > comm_size)
242         {
243             nprocs_completed = comm_size - my_tree_root - mask;
244             /* nprocs_completed is the number of processes in this
245                subtree that have all the data. Send data to others
246                in a tree fashion. First find root of current tree
247                that is being divided into two. k is the number of
248                least-significant bits in this process's rank that
249                must be zeroed out to find the rank of the root */
250             j = mask;
251             k = 0;
252             while (j)
253             {
254                 j >>= 1;
255                 k++;
256             }
257             k--;
258
259             offset = scatter_size * (my_tree_root + mask);
260             tmp_mask = mask >> 1;
261
262             while (tmp_mask)
263             {
264                 relative_dst = relative_rank ^ tmp_mask;
265                 dst = (relative_dst + root) % comm_size;
266
267                 tree_root = relative_rank >> k;
268                 tree_root <<= k;
269
270                 /* send only if this proc has data and destination
271                    doesn't have data. */
272
273                 /* if (rank == 3) {
274                    printf("rank %d, dst %d, root %d, nprocs_completed %d\n", relative_rank, relative_dst, tree_root, nprocs_completed);
275                    fflush(stdout);
276                    }*/
277
278                 if ((relative_dst > relative_rank) &&
279                     (relative_rank < tree_root + nprocs_completed)
280                     && (relative_dst >= tree_root + nprocs_completed))
281                 {
282
283                     /* printf("Rank %d, send to %d, offset %d, size %d\n", rank, dst, offset, recv_size);
284                        fflush(stdout); */
285                     Request::send(((char *)tmp_buf + offset),
286                                              recv_size, MPI_BYTE, dst,
287                                              COLL_TAG_BCAST, comm);
288                     /* recv_size was set in the previous
289                        receive. that's the amount of data to be
290                        sent now. */
291                 }
292                 /* recv only if this proc. doesn't have data and sender
293                    has data */
294                 else if ((relative_dst < relative_rank) &&
295                          (relative_dst < tree_root + nprocs_completed) &&
296                          (relative_rank >= tree_root + nprocs_completed))
297                 {
298                     /* printf("Rank %d waiting to recv from rank %d\n",
299                        relative_rank, dst); */
300                     Request::recv(((char *)tmp_buf + offset),
301                                              nbytes - offset,
302                                              MPI_BYTE, dst, COLL_TAG_BCAST,
303                                              comm, &status);
304                     /* nprocs_completed is also equal to the no. of processes
305                        whose data we don't have */
306                     recv_size=Status::get_count(&status, MPI_BYTE);
307                     curr_size += recv_size;
308                     /* printf("Rank %d, recv from %d, offset %d, size %d\n", rank, dst, offset, recv_size);
309                        fflush(stdout);*/
310                 }
311                 tmp_mask >>= 1;
312                 k--;
313             }
314         }
315         /* --END EXPERIMENTAL-- */
316
317         mask <<= 1;
318         i++;
319     }
320
321     /* check that we received as much as we expected */
322     /* recvd_size may not be accurate for packed heterogeneous data */
323     if (is_homogeneous && curr_size != nbytes) {
324       xbt_die("we didn't receive enough !");
325     }
326
327     if (not is_contig || not is_homogeneous) {
328       if (rank != root) {
329         position  = 0;
330         mpi_errno = MPI_Unpack(tmp_buf, nbytes, &position, buffer, count, datatype, comm);
331         if (mpi_errno)
332           xbt_die("error when unpacking %d", mpi_errno);
333       }
334     }
335
336 fn_exit:
337 /*    xbt_free(tmp_buf);*/
338     return mpi_errno;
339 }
340
341 }
342 }