Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
Move all smpi colls to cpp.
[simgrid.git] / src / smpi / colls / bcast-scatter-rdb-allgather.cpp
1 #include "colls_private.h"
2 #include "src/smpi/smpi_mpi_dt_private.h"
3
4 static int scatter_for_bcast(
5     int root,
6     MPI_Comm comm,
7     int nbytes,
8     void *tmp_buf)
9 {
10     MPI_Status status;
11     int        rank, comm_size, src, dst;
12     int        relative_rank, mask;
13     int mpi_errno = MPI_SUCCESS;
14     int scatter_size, curr_size, recv_size = 0, send_size;
15
16     comm_size = smpi_comm_size(comm);
17     rank = smpi_comm_rank(comm);
18     relative_rank = (rank >= root) ? rank - root : rank - root + comm_size;
19
20     /* use long message algorithm: binomial tree scatter followed by an allgather */
21
22     /* The scatter algorithm divides the buffer into nprocs pieces and
23        scatters them among the processes. Root gets the first piece,
24        root+1 gets the second piece, and so forth. Uses the same binomial
25        tree algorithm as above. Ceiling division
26        is used to compute the size of each piece. This means some
27        processes may not get any data. For example if bufsize = 97 and
28        nprocs = 16, ranks 15 and 16 will get 0 data. On each process, the
29        scattered data is stored at the same offset in the buffer as it is
30        on the root process. */ 
31
32     scatter_size = (nbytes + comm_size - 1)/comm_size; /* ceiling division */
33     curr_size = (rank == root) ? nbytes : 0; /* root starts with all the
34                                                 data */
35
36     mask = 0x1;
37     while (mask < comm_size)
38     {
39         if (relative_rank & mask)
40         {
41             src = rank - mask; 
42             if (src < 0) src += comm_size;
43             recv_size = nbytes - relative_rank*scatter_size;
44             /* recv_size is larger than what might actually be sent by the
45                sender. We don't need compute the exact value because MPI
46                allows you to post a larger recv.*/ 
47             if (recv_size <= 0)
48             {
49                 curr_size = 0; /* this process doesn't receive any data
50                                   because of uneven division */
51             }
52             else
53             {
54                 smpi_mpi_recv(((char *)tmp_buf +
55                                           relative_rank*scatter_size),
56                                          recv_size, MPI_BYTE, src,
57                                          COLL_TAG_BCAST, comm, &status);
58                 /* query actual size of data received */
59                 curr_size=smpi_mpi_get_count(&status, MPI_BYTE);
60             }
61             break;
62         }
63         mask <<= 1;
64     }
65
66     /* This process is responsible for all processes that have bits
67        set from the LSB upto (but not including) mask.  Because of
68        the "not including", we start by shifting mask back down
69        one. */
70
71     mask >>= 1;
72     while (mask > 0)
73     {
74         if (relative_rank + mask < comm_size)
75         {
76             send_size = curr_size - scatter_size * mask; 
77             /* mask is also the size of this process's subtree */
78
79             if (send_size > 0)
80             {
81                 dst = rank + mask;
82                 if (dst >= comm_size) dst -= comm_size;
83                 smpi_mpi_send(((char *)tmp_buf +
84                                           scatter_size*(relative_rank+mask)),
85                                          send_size, MPI_BYTE, dst,
86                                          COLL_TAG_BCAST, comm);
87                 curr_size -= send_size;
88             }
89         }
90         mask >>= 1;
91     }
92
93     return mpi_errno;
94 }
95
96 int
97 smpi_coll_tuned_bcast_scatter_rdb_allgather (
98     void *buffer, 
99     int count, 
100     MPI_Datatype datatype, 
101     int root, 
102     MPI_Comm comm)
103 {
104     MPI_Status status;
105     int rank, comm_size, dst;
106     int relative_rank, mask;
107     int mpi_errno = MPI_SUCCESS;
108     int scatter_size, curr_size, recv_size = 0;
109     int j, k, i, tmp_mask, is_contig, is_homogeneous;
110     MPI_Aint type_size = 0, nbytes = 0;
111     int relative_dst, dst_tree_root, my_tree_root, send_offset;
112     int recv_offset, tree_root, nprocs_completed, offset;
113     int position;
114     MPI_Aint true_extent, true_lb;
115     void *tmp_buf;
116
117     comm_size = smpi_comm_size(comm);
118     rank = smpi_comm_rank(comm);
119     relative_rank = (rank >= root) ? rank - root : rank - root + comm_size;
120
121     /* If there is only one process, return */
122     if (comm_size == 1) goto fn_exit;
123
124     //if (HANDLE_GET_KIND(datatype) == HANDLE_KIND_BUILTIN)
125     if(datatype->flags & DT_FLAG_CONTIGUOUS)
126         is_contig = 1;
127     else {
128         is_contig = 0;
129     }
130
131     is_homogeneous = 1;
132
133     /* MPI_Type_size() might not give the accurate size of the packed
134      * datatype for heterogeneous systems (because of padding, encoding,
135      * etc). On the other hand, MPI_Pack_size() can become very
136      * expensive, depending on the implementation, especially for
137      * heterogeneous systems. We want to use MPI_Type_size() wherever
138      * possible, and MPI_Pack_size() in other places.
139      */
140     if (is_homogeneous)
141         type_size=smpi_datatype_size(datatype);
142
143     nbytes = type_size * count;
144     if (nbytes == 0)
145         goto fn_exit; /* nothing to do */
146
147     if (is_contig && is_homogeneous)
148     {
149         /* contiguous and homogeneous. no need to pack. */
150         smpi_datatype_extent(datatype, &true_lb, &true_extent);
151
152         tmp_buf = (char *) buffer + true_lb;
153     }
154     else
155     {
156         tmp_buf=(void*)xbt_malloc(nbytes);
157
158         /* TODO: Pipeline the packing and communication */
159         position = 0;
160         if (rank == root) {
161             mpi_errno = smpi_mpi_pack(buffer, count, datatype, tmp_buf, nbytes,
162                                        &position, comm);
163             if (mpi_errno) xbt_die("crash while packing %d", mpi_errno);
164         }
165     }
166
167
168     scatter_size = (nbytes + comm_size - 1)/comm_size; /* ceiling division */
169
170     mpi_errno = scatter_for_bcast(root, comm,
171                                   nbytes, tmp_buf);
172     if (mpi_errno) {
173       xbt_die("crash while scattering %d", mpi_errno);
174     }
175
176     /* curr_size is the amount of data that this process now has stored in
177      * buffer at byte offset (relative_rank*scatter_size) */
178     curr_size = scatter_size < (nbytes - (relative_rank * scatter_size)) ? scatter_size :  (nbytes - (relative_rank * scatter_size)) ;
179     if (curr_size < 0)
180         curr_size = 0;
181
182     /* medium size allgather and pof2 comm_size. use recurive doubling. */
183
184     mask = 0x1;
185     i = 0;
186     while (mask < comm_size)
187     {
188         relative_dst = relative_rank ^ mask;
189
190         dst = (relative_dst + root) % comm_size; 
191
192         /* find offset into send and recv buffers.
193            zero out the least significant "i" bits of relative_rank and
194            relative_dst to find root of src and dst
195            subtrees. Use ranks of roots as index to send from
196            and recv into  buffer */ 
197
198         dst_tree_root = relative_dst >> i;
199         dst_tree_root <<= i;
200
201         my_tree_root = relative_rank >> i;
202         my_tree_root <<= i;
203
204         send_offset = my_tree_root * scatter_size;
205         recv_offset = dst_tree_root * scatter_size;
206
207         if (relative_dst < comm_size)
208         {
209             smpi_mpi_sendrecv(((char *)tmp_buf + send_offset),
210                                          curr_size, MPI_BYTE, dst, COLL_TAG_BCAST, 
211                                          ((char *)tmp_buf + recv_offset),
212                                          (nbytes-recv_offset < 0 ? 0 : nbytes-recv_offset), 
213                                          MPI_BYTE, dst, COLL_TAG_BCAST, comm, &status);
214             recv_size=smpi_mpi_get_count(&status, MPI_BYTE);
215             curr_size += recv_size;
216         }
217
218         /* if some processes in this process's subtree in this step
219            did not have any destination process to communicate with
220            because of non-power-of-two, we need to send them the
221            data that they would normally have received from those
222            processes. That is, the haves in this subtree must send to
223            the havenots. We use a logarithmic recursive-halfing algorithm
224            for this. */
225
226         /* This part of the code will not currently be
227            executed because we are not using recursive
228            doubling for non power of two. Mark it as experimental
229            so that it doesn't show up as red in the coverage tests. */  
230
231         /* --BEGIN EXPERIMENTAL-- */
232         if (dst_tree_root + mask > comm_size)
233         {
234             nprocs_completed = comm_size - my_tree_root - mask;
235             /* nprocs_completed is the number of processes in this
236                subtree that have all the data. Send data to others
237                in a tree fashion. First find root of current tree
238                that is being divided into two. k is the number of
239                least-significant bits in this process's rank that
240                must be zeroed out to find the rank of the root */ 
241             j = mask;
242             k = 0;
243             while (j)
244             {
245                 j >>= 1;
246                 k++;
247             }
248             k--;
249
250             offset = scatter_size * (my_tree_root + mask);
251             tmp_mask = mask >> 1;
252
253             while (tmp_mask)
254             {
255                 relative_dst = relative_rank ^ tmp_mask;
256                 dst = (relative_dst + root) % comm_size; 
257
258                 tree_root = relative_rank >> k;
259                 tree_root <<= k;
260
261                 /* send only if this proc has data and destination
262                    doesn't have data. */
263
264                 /* if (rank == 3) { 
265                    printf("rank %d, dst %d, root %d, nprocs_completed %d\n", relative_rank, relative_dst, tree_root, nprocs_completed);
266                    fflush(stdout);
267                    }*/
268
269                 if ((relative_dst > relative_rank) && 
270                     (relative_rank < tree_root + nprocs_completed)
271                     && (relative_dst >= tree_root + nprocs_completed))
272                 {
273
274                     /* printf("Rank %d, send to %d, offset %d, size %d\n", rank, dst, offset, recv_size);
275                        fflush(stdout); */
276                     smpi_mpi_send(((char *)tmp_buf + offset),
277                                              recv_size, MPI_BYTE, dst,
278                                              COLL_TAG_BCAST, comm);
279                     /* recv_size was set in the previous
280                        receive. that's the amount of data to be
281                        sent now. */
282                 }
283                 /* recv only if this proc. doesn't have data and sender
284                    has data */
285                 else if ((relative_dst < relative_rank) && 
286                          (relative_dst < tree_root + nprocs_completed) &&
287                          (relative_rank >= tree_root + nprocs_completed))
288                 {
289                     /* printf("Rank %d waiting to recv from rank %d\n",
290                        relative_rank, dst); */
291                     smpi_mpi_recv(((char *)tmp_buf + offset),
292                                              nbytes - offset, 
293                                              MPI_BYTE, dst, COLL_TAG_BCAST,
294                                              comm, &status);
295                     /* nprocs_completed is also equal to the no. of processes
296                        whose data we don't have */
297                     recv_size=smpi_mpi_get_count(&status, MPI_BYTE);
298                     curr_size += recv_size;
299                     /* printf("Rank %d, recv from %d, offset %d, size %d\n", rank, dst, offset, recv_size);
300                        fflush(stdout);*/
301                 }
302                 tmp_mask >>= 1;
303                 k--;
304             }
305         }
306         /* --END EXPERIMENTAL-- */
307
308         mask <<= 1;
309         i++;
310     }
311
312     /* check that we received as much as we expected */
313     /* recvd_size may not be accurate for packed heterogeneous data */
314     if (is_homogeneous && curr_size != nbytes) {
315       xbt_die("we didn't receive enough !");
316     }
317
318     if (!is_contig || !is_homogeneous)
319     {
320         if (rank != root)
321         {
322             position = 0;
323             mpi_errno = MPI_Unpack(tmp_buf, nbytes, &position, buffer,
324                                          count, datatype, comm);
325             if (mpi_errno) xbt_die("error when unpacking %d", mpi_errno);
326         }
327     }
328
329 fn_exit:
330 /*    xbt_free(tmp_buf);*/
331     return mpi_errno;
332 }