1 #include "colls_private.h"
2 #include "src/smpi/smpi_mpi_dt_private.h"
/* Phase 1 of the long-message broadcast: binomial-tree scatter.
 * The root's buffer (nbytes total, already packed into tmp_buf) is divided
 * into comm_size pieces of scatter_size bytes each (ceiling division); after
 * this routine each process holds its piece of the data at byte offset
 * relative_rank*scatter_size in tmp_buf, ready for the allgather phase.
 * NOTE(review): this chunk is elided — the parameter list, the
 * initialization of `mask`, several braces/else-arms and the function
 * epilogue are not visible here; comments below describe only the
 * visible lines. */
4 static int scatter_for_bcast(
11 int rank, comm_size, src, dst;
12 int relative_rank, mask;
13 int mpi_errno = MPI_SUCCESS;
14 int scatter_size, curr_size, recv_size = 0, send_size;
16 comm_size = smpi_comm_size(comm);
17 rank = smpi_comm_rank(comm);
/* Renumber ranks so the broadcast root becomes rank 0 of the virtual tree. */
18 relative_rank = (rank >= root) ? rank - root : rank - root + comm_size;
20 /* use long message algorithm: binomial tree scatter followed by an allgather */
22 /* The scatter algorithm divides the buffer into nprocs pieces and
23 scatters them among the processes. Root gets the first piece,
24 root+1 gets the second piece, and so forth. Uses the same binomial
25 tree algorithm as above. Ceiling division
26 is used to compute the size of each piece. This means some
27 processes may not get any data. For example if bufsize = 97 and
28 nprocs = 16, ranks 15 and 16 will get 0 data. On each process, the
29 scattered data is stored at the same offset in the buffer as it is
30 on the root process. */
32 scatter_size = (nbytes + comm_size - 1)/comm_size; /* ceiling division */
/* Only the root starts with data; everyone else accumulates it below. */
33 curr_size = (rank == root) ? nbytes : 0; /* root starts with all the
/* Walk up the binomial tree; presumably `mask` starts at 0x1 on an
 * elided line — TODO confirm against the full source. */
37 while (mask < comm_size)
/* A set bit at `mask` means this process receives in this round ... */
39 if (relative_rank & mask)
/* ... from its parent (relative rank with that bit cleared), mapped back
 * to a real rank; `src` computed on an elided line. */
42 if (src < 0) src += comm_size;
/* Upper bound on what the parent can send for our subtree; may exceed
 * the actual transmitted amount, which MPI allows for a recv. */
43 recv_size = nbytes - relative_rank*scatter_size;
44 /* recv_size is larger than what might actually be sent by the
45 sender. We don't need compute the exact value because MPI
46 allows you to post a larger recv.*/
49 curr_size = 0; /* this process doesn't receive any data
50 because of uneven division */
54 smpi_mpi_recv(((char *)tmp_buf +
55 relative_rank*scatter_size),
56 recv_size, MPI_BYTE, src,
57 COLL_TAG_BCAST, comm, &status);
58 /* query actual size of data received */
/* The true byte count comes from the status, not from recv_size. */
59 curr_size=smpi_mpi_get_count(&status, MPI_BYTE);
66 /* This process is responsible for all processes that have bits
67 set from the LSB upto (but not including) mask. Because of
68 the "not including", we start by shifting mask back down
/* Sender side: forward the child's half of our data if the child exists. */
74 if (relative_rank + mask < comm_size)
/* Bytes destined for the child's subtree: everything we hold beyond
 * the first `mask` pieces (mask == size of our own subtree). */
76 send_size = curr_size - scatter_size * mask;
77 /* mask is also the size of this process's subtree */
/* Map the child's relative rank back to a real rank (dst computed on an
 * elided line; wrap-around handled here). */
82 if (dst >= comm_size) dst -= comm_size;
83 smpi_mpi_send(((char *)tmp_buf +
84 scatter_size*(relative_rank+mask)),
85 send_size, MPI_BYTE, dst,
86 COLL_TAG_BCAST, comm);
/* We keep only the part not handed to the child. */
87 curr_size -= send_size;
/* Broadcast for long messages: binomial-tree scatter (scatter_for_bcast)
 * followed by a recursive-doubling allgather over MPI_BYTE chunks.
 * Non-contiguous or heterogeneous data is packed into a temporary buffer
 * first and unpacked at the end.
 * NOTE(review): this chunk is elided — the return type/parameter lines,
 * the initializations of `mask`/`i`/`is_contig`/`is_homogeneous`, many
 * braces and the fn_exit label are not visible; comments describe only
 * the visible lines. */
smpi_coll_tuned_bcast_scatter_rdb_allgather (
100 MPI_Datatype datatype,
105 int rank, comm_size, dst;
106 int relative_rank, mask;
107 int mpi_errno = MPI_SUCCESS;
108 int scatter_size, curr_size, recv_size = 0;
109 int j, k, i, tmp_mask, is_contig, is_homogeneous;
110 MPI_Aint type_size = 0, nbytes = 0;
111 int relative_dst, dst_tree_root, my_tree_root, send_offset;
112 int recv_offset, tree_root, nprocs_completed, offset;
114 MPI_Aint true_extent, true_lb;
117 comm_size = smpi_comm_size(comm);
118 rank = smpi_comm_rank(comm);
/* Renumber ranks so the root becomes relative rank 0. */
119 relative_rank = (rank >= root) ? rank - root : rank - root + comm_size;
121 /* If there is only one process, return */
122 if (comm_size == 1) goto fn_exit;
124 //if (HANDLE_GET_KIND(datatype) == HANDLE_KIND_BUILTIN)
/* Contiguity decided from the SMPI datatype flags rather than the MPICH
 * handle kind (original test kept above, commented out). */
125 if(datatype->flags & DT_FLAG_CONTIGUOUS)
133 /* MPI_Type_size() might not give the accurate size of the packed
134 * datatype for heterogeneous systems (because of padding, encoding,
135 * etc). On the other hand, MPI_Pack_size() can become very
136 * expensive, depending on the implementation, especially for
137 * heterogeneous systems. We want to use MPI_Type_size() wherever
138 * possible, and MPI_Pack_size() in other places.
141 type_size=smpi_datatype_size(datatype);
143 nbytes = type_size * count;
/* Presumably guarded by nbytes == 0 on an elided line — TODO confirm. */
145 goto fn_exit; /* nothing to do */
147 if (is_contig && is_homogeneous)
149 /* contiguous and homogeneous. no need to pack. */
150 smpi_datatype_extent(datatype, &true_lb, &true_extent);
/* Operate directly on the user buffer, offset by the true lower bound. */
152 tmp_buf = (char *) buffer + true_lb;
/* Non-contiguous/heterogeneous path: pack into a scratch buffer. */
156 tmp_buf=(void*)xbt_malloc(nbytes);
158 /* TODO: Pipeline the packing and communication */
/* Only the root has data to pack (call site partially elided). */
161 mpi_errno = smpi_mpi_pack(buffer, count, datatype, tmp_buf, nbytes,
163 if (mpi_errno) xbt_die("crash while packing %d", mpi_errno);
168 scatter_size = (nbytes + comm_size - 1)/comm_size; /* ceiling division */
/* Phase 1: binomial-tree scatter of tmp_buf across the communicator. */
170 mpi_errno = scatter_for_bcast(root, comm,
173 xbt_die("crash while scattering %d", mpi_errno);
176 /* curr_size is the amount of data that this process now has stored in
177 * buffer at byte offset (relative_rank*scatter_size) */
/* Last pieces may be short (or empty) because of the ceiling division. */
178 curr_size = scatter_size < (nbytes - (relative_rank * scatter_size)) ? scatter_size : (nbytes - (relative_rank * scatter_size)) ;
182 /* medium size allgather and pof2 comm_size. use recurive doubling. */
/* Phase 2: recursive doubling — `mask` and `i` presumably initialized on
 * elided lines (mask = 0x1, i = 0 in the classic algorithm) — TODO confirm. */
186 while (mask < comm_size)
188 relative_dst = relative_rank ^ mask;
190 dst = (relative_dst + root) % comm_size;
192 /* find offset into send and recv buffers.
193 zero out the least significant "i" bits of relative_rank and
194 relative_dst to find root of src and dst
195 subtrees. Use ranks of roots as index to send from
196 and recv into buffer */
198 dst_tree_root = relative_dst >> i;
201 my_tree_root = relative_rank >> i;
204 send_offset = my_tree_root * scatter_size;
205 recv_offset = dst_tree_root * scatter_size;
/* Partner exists only for power-of-two positions. */
207 if (relative_dst < comm_size)
209 smpi_mpi_sendrecv(((char *)tmp_buf + send_offset),
210 curr_size, MPI_BYTE, dst, COLL_TAG_BCAST,
211 ((char *)tmp_buf + recv_offset),
/* Clamp the posted recv length so it never goes negative past nbytes. */
212 (nbytes-recv_offset < 0 ? 0 : nbytes-recv_offset),
213 MPI_BYTE, dst, COLL_TAG_BCAST, comm, &status);
214 recv_size=smpi_mpi_get_count(&status, MPI_BYTE);
215 curr_size += recv_size;
218 /* if some processes in this process's subtree in this step
219 did not have any destination process to communicate with
220 because of non-power-of-two, we need to send them the
221 data that they would normally have received from those
222 processes. That is, the haves in this subtree must send to
223 the havenots. We use a logarithmic recursive-halfing algorithm
226 /* This part of the code will not currently be
227 executed because we are not using recursive
228 doubling for non power of two. Mark it as experimental
229 so that it doesn't show up as red in the coverage tests. */
231 /* --BEGIN EXPERIMENTAL-- */
232 if (dst_tree_root + mask > comm_size)
234 nprocs_completed = comm_size - my_tree_root - mask;
235 /* nprocs_completed is the number of processes in this
236 subtree that have all the data. Send data to others
237 in a tree fashion. First find root of current tree
238 that is being divided into two. k is the number of
239 least-significant bits in this process's rank that
240 must be zeroed out to find the rank of the root */
250 offset = scatter_size * (my_tree_root + mask);
251 tmp_mask = mask >> 1;
255 relative_dst = relative_rank ^ tmp_mask;
256 dst = (relative_dst + root) % comm_size;
258 tree_root = relative_rank >> k;
261 /* send only if this proc has data and destination
262 doesn't have data. */
/* NOTE(review): unconditional debug printf left in library code; inside
 * the experimental (currently unreachable, per comment above) branch,
 * but should probably be removed or demoted to a debug log. */
265 printf("rank %d, dst %d, root %d, nprocs_completed %d\n", relative_rank, relative_dst, tree_root, nprocs_completed);
269 if ((relative_dst > relative_rank) &&
270 (relative_rank < tree_root + nprocs_completed)
271 && (relative_dst >= tree_root + nprocs_completed))
274 /* printf("Rank %d, send to %d, offset %d, size %d\n", rank, dst, offset, recv_size);
276 smpi_mpi_send(((char *)tmp_buf + offset),
277 recv_size, MPI_BYTE, dst,
278 COLL_TAG_BCAST, comm);
279 /* recv_size was set in the previous
280 receive. that's the amount of data to be
283 /* recv only if this proc. doesn't have data and sender
285 else if ((relative_dst < relative_rank) &&
286 (relative_dst < tree_root + nprocs_completed) &&
287 (relative_rank >= tree_root + nprocs_completed))
289 /* printf("Rank %d waiting to recv from rank %d\n",
290 relative_rank, dst); */
291 smpi_mpi_recv(((char *)tmp_buf + offset),
293 MPI_BYTE, dst, COLL_TAG_BCAST,
295 /* nprocs_completed is also equal to the no. of processes
296 whose data we don't have */
297 recv_size=smpi_mpi_get_count(&status, MPI_BYTE);
298 curr_size += recv_size;
299 /* printf("Rank %d, recv from %d, offset %d, size %d\n", rank, dst, offset, recv_size);
306 /* --END EXPERIMENTAL-- */
312 /* check that we received as much as we expected */
313 /* recvd_size may not be accurate for packed heterogeneous data */
314 if (is_homogeneous && curr_size != nbytes) {
315 xbt_die("we didn't receive enough !");
/* Undo the initial pack for the non-contiguous/heterogeneous path. */
318 if (!is_contig || !is_homogeneous)
323 mpi_errno = MPI_Unpack(tmp_buf, nbytes, &position, buffer,
324 count, datatype, comm);
325 if (mpi_errno) xbt_die("error when unpacking %d", mpi_errno);
/* NOTE(review): scratch-buffer free is commented out — if the
 * !is_contig path really allocates via xbt_malloc above, this leaks;
 * check the elided cleanup lines before re-enabling. */
330 /* xbt_free(tmp_buf);*/