1 #include "colls_private.h"
2 #define MPIR_REDUCE_SCATTER_TAG 222
/*
 * Mirror permutation: reverse the order of the low `bits` bits of x and copy
 * every higher-order bit through unchanged.
 *
 * Example with bits == 3: 0b001 -> 0b100, 0b110 -> 0b011, 0b1001 -> 0b1100.
 *
 * x    - value to permute
 * bits - number of low-order bits to reverse; expected to lie between 0 and
 *        the width of unsigned int.  A value <= 0 returns x unchanged.
 *
 * Returns the permuted value converted to int.
 *
 * All arithmetic is done on unsigned operands with shifts by exactly one
 * position, so no signed shift (e.g. 0x1 << 31) is ever evaluated.
 */
static inline int MPIU_Mirror_permutation(unsigned int x, int bits)
{
    unsigned int low_mask = 0u;   /* ends up with the low `bits` bits set */
    unsigned int mirrored = 0u;   /* low `bits` bits of x, in reverse order */
    unsigned int remaining = x;   /* bits of x not yet consumed */
    int i;

    for (i = 0; i < bits; ++i) {
        /* take the current lowest bit and push it in from the right, so the
           first bit consumed ends up in position bits-1 */
        mirrored = (mirrored << 1) | (remaining & 1u);
        remaining >>= 1;
        low_mask = (low_mask << 1) | 1u;
    }

    /* high-order bits are copied as-is */
    return (int)((x & ~low_mask) | mirrored);
}
/*
 * Reduce-scatter by pairwise exchange.
 *
 * In step i (1 .. comm_size-1) this rank sends to (rank + i) % comm_size the
 * block that peer is responsible for, receives from
 * (rank - i + comm_size) % comm_size a contribution to its own block into
 * tmp_recvbuf, and combines that contribution with its local data.
 *
 * sendbuf    - input vector, or MPI_IN_PLACE to take the input from recvbuf
 * recvbuf    - receives this rank's recvcounts[rank] elements
 * recvcounts - number of elements each rank ends up with
 * datatype   - element type
 * op         - reduction operator
 * comm       - communicator
 *
 * A non-zero result of smpi_datatype_copy is returned to the caller as is.
 *
 * NOTE(review): this listing appears to have lost lines.  disps, tmp_recvbuf
 * and is_commutative are used but their declarations are not visible, the
 * calls that apply op are not visible (only their argument lists are), and
 * neither the final return nor any release of the allocated buffers can be
 * seen.  Confirm against the complete file before relying on these notes.
 */
20 int smpi_coll_tuned_reduce_scatter_mpich_pair(void *sendbuf, void *recvbuf, int recvcounts[],
21 MPI_Datatype datatype, MPI_Op op, MPI_Comm comm)
23 int rank, comm_size, i;
24 MPI_Aint extent, true_extent, true_lb;
27 int mpi_errno = MPI_SUCCESS;
28 int type_size, total_count, nbytes, dst, src;
30 comm_size = smpi_comm_size(comm);
31 rank = smpi_comm_rank(comm);
33 extent =smpi_datatype_get_extent(datatype);
34 smpi_datatype_extent(datatype, &true_lb, &true_extent);
/* NOTE(review): the branch body is not visible here; presumably it records
   the answer in is_commutative, which the exchange loop below reads. */
36 if (smpi_op_is_commute(op)) {
40 disps = (int*)xbt_malloc( comm_size * sizeof(int));
/* disps[i] is the element offset of rank i's block, i.e. the running sum of
   the preceding recvcounts; total_count ends up as the sum of all of them. */
43 for (i=0; i<comm_size; i++) {
44 disps[i] = total_count;
45 total_count += recvcounts[i];
/* Every count is zero, so there is nothing to exchange (the handling itself
   is not visible in this listing). */
48 if (total_count == 0) {
/* NOTE(review): nbytes is computed below but is not read on any line visible
   in this listing. */
52 type_size= smpi_datatype_size(datatype);
53 nbytes = total_count * type_size;
56 if (sendbuf != MPI_IN_PLACE) {
57 /* copy local data into recvbuf */
58 smpi_datatype_copy(((char *)sendbuf+disps[rank]*extent),
59 recvcounts[rank], datatype, recvbuf,
60 recvcounts[rank], datatype);
63 /* allocate temporary buffer to store incoming data */
64 tmp_recvbuf = (void*)xbt_malloc(recvcounts[rank]*(max(true_extent,extent))+1);
65 /* adjust for potential negative lower bound in datatype */
66 tmp_recvbuf = (void *)((char*)tmp_recvbuf - true_lb);
/* NOTE(review): from here on tmp_recvbuf no longer holds the address returned
   by xbt_malloc, and no release of tmp_recvbuf or disps is visible in this
   listing - check for a leak.  The "+1" in the size presumably keeps the
   request non-zero when recvcounts[rank] is 0 - confirm. */
68 for (i=1; i<comm_size; i++) {
/* Step i: receive from the rank i positions below, send to the rank i
   positions above (both modulo comm_size). */
69 src = (rank - i + comm_size) % comm_size;
70 dst = (rank + i) % comm_size;
72 /* send the data that dst needs. recv data that this process
73 needs from src into tmp_recvbuf */
74 if (sendbuf != MPI_IN_PLACE)
75 smpi_mpi_sendrecv(((char *)sendbuf+disps[dst]*extent),
76 recvcounts[dst], datatype, dst,
77 MPIR_REDUCE_SCATTER_TAG, tmp_recvbuf,
78 recvcounts[rank], datatype, src,
79 MPIR_REDUCE_SCATTER_TAG, comm,
/* MPI_IN_PLACE variant: the outgoing block is read from recvbuf instead of
   sendbuf. */
82 smpi_mpi_sendrecv(((char *)recvbuf+disps[dst]*extent),
83 recvcounts[dst], datatype, dst,
84 MPIR_REDUCE_SCATTER_TAG, tmp_recvbuf,
85 recvcounts[rank], datatype, src,
86 MPIR_REDUCE_SCATTER_TAG, comm,
/* Operand order depends on commutativity and on whether the sender has a
   lower rank: in this branch the received data (tmp_recvbuf) is listed first
   and the local data second. */
89 if (is_commutative || (src < rank)) {
90 if (sendbuf != MPI_IN_PLACE) {
92 tmp_recvbuf, recvbuf, &recvcounts[rank],
97 tmp_recvbuf, ((char *)recvbuf+disps[rank]*extent),
98 &recvcounts[rank], &datatype);
99 /* we can't store the result at the beginning of
100 recvbuf right here because there is useful data
101 there that other process/processes need. at the
102 end, we will copy back the result to the
103 beginning of recvbuf. */
/* Otherwise the local data is listed first and tmp_recvbuf second; the
   result is then copied from tmp_recvbuf back to where the local data lives. */
107 if (sendbuf != MPI_IN_PLACE) {
109 recvbuf, tmp_recvbuf, &recvcounts[rank], &datatype);
110 /* copy result back into recvbuf */
111 mpi_errno = smpi_datatype_copy(tmp_recvbuf, recvcounts[rank],
113 recvcounts[rank], datatype);
114 if (mpi_errno) return(mpi_errno);
118 ((char *)recvbuf+disps[rank]*extent),
119 tmp_recvbuf, &recvcounts[rank], &datatype);
120 /* copy result back into recvbuf */
121 mpi_errno = smpi_datatype_copy(tmp_recvbuf, recvcounts[rank],
125 recvcounts[rank], datatype);
126 if (mpi_errno) return(mpi_errno);
131 /* if MPI_IN_PLACE, move output data to the beginning of
132 recvbuf. already done for rank 0. */
133 if ((sendbuf == MPI_IN_PLACE) && (rank != 0)) {
134 mpi_errno = smpi_datatype_copy(((char *)recvbuf +
136 recvcounts[rank], datatype,
138 recvcounts[rank], datatype );
139 if (mpi_errno) return(mpi_errno);
/*
 * Reduce-scatter variant that keeps track of operand order, so it can be
 * used with non-commutative operators.
 *
 * Restrictions enforced with xbt_assert: the communicator size must be a
 * power of two and every entry of recvcounts must be equal.
 *
 * The input blocks are first copied into tmp_buf0 in mirror-permuted order
 * (see MPIU_Mirror_permutation).  Then, for k = 0 .. log2_comm_size-1, this
 * rank exchanges data with peer = rank ^ (1 << k) and reduces at recv_offset.
 * Two scratch buffers alternate between the "incoming" and "outgoing" role;
 * buf0_was_inout records which one currently holds the partial result.
 *
 * sendbuf    - input vector, or MPI_IN_PLACE to take the input from recvbuf
 * recvbuf    - receives `size` reduced elements
 * recvcounts - number of elements per rank (all equal, see above)
 * datatype, op, comm - element type, reduction operator, communicator
 *
 * A non-zero result of smpi_datatype_copy is returned to the caller as is.
 *
 * NOTE(review): this listing appears to have lost lines.  Several locals
 * (pof2, log2_comm_size, i, k, tmp_buf0, tmp_buf1, buf0_was_inout,
 * result_ptr) are used without a visible declaration, the code that sets
 * size, send_offset and recv_offset in each round is not visible, nor are the
 * calls that apply op, the final return, or any release of the scratch
 * buffers.  Confirm against the complete file.
 */
146 int smpi_coll_tuned_reduce_scatter_mpich_noncomm(void *sendbuf, void *recvbuf, int recvcounts[],
147 MPI_Datatype datatype, MPI_Op op, MPI_Comm comm)
149 int mpi_errno = MPI_SUCCESS;
150 int comm_size = smpi_comm_size(comm) ;
151 int rank = smpi_comm_rank(comm);
155 int recv_offset, send_offset;
156 int block_size, total_count, size;
157 MPI_Aint true_extent, true_lb;
163 smpi_datatype_extent(datatype, &true_lb, &true_extent);
/* NOTE(review): loop body not visible; presumably pof2 and log2_comm_size
   are grown here until pof2 >= comm_size. */
167 while (pof2 < comm_size) {
172 /* begin error checking */
173 xbt_assert(pof2 == comm_size); /* FIXME this version only works for power of 2 procs */
175 for (i = 0; i < (comm_size - 1); ++i) {
/* all ranks must be assigned the same number of elements */
176 xbt_assert(recvcounts[i] == recvcounts[i+1]);
178 /* end error checking */
180 /* size of a block (count of datatype per block, NOT bytes per block) */
181 block_size = recvcounts[0];
182 total_count = block_size * comm_size;
/* Two scratch buffers, each sized for all comm_size blocks. */
184 tmp_buf0=( void *)xbt_malloc( true_extent * total_count);
185 tmp_buf1=( void *)xbt_malloc( true_extent * total_count);
186 /* adjust for potential negative lower bound in datatype */
187 tmp_buf0 = (void *)((char*)tmp_buf0 - true_lb);
188 tmp_buf1 = (void *)((char*)tmp_buf1 - true_lb);
/* NOTE(review): after this adjustment the addresses returned by xbt_malloc
   are no longer held in tmp_buf0/tmp_buf1, and no release is visible in this
   listing - check for a leak, including on the early returns below. */
190 /* Copy our send data to tmp_buf0. We do this one block at a time and
191 permute the blocks as we go according to the mirror permutation. */
192 for (i = 0; i < comm_size; ++i) {
193 mpi_errno = smpi_datatype_copy((char *)(sendbuf == MPI_IN_PLACE ? recvbuf : sendbuf) + (i * true_extent * block_size), block_size, datatype,
194 (char *)tmp_buf0 + (MPIU_Mirror_permutation(i, log2_comm_size) * true_extent * block_size), block_size, datatype);
195 if (mpi_errno) return(mpi_errno);
202 for (k = 0; k < log2_comm_size; ++k) {
203 /* use a double-buffering scheme to avoid local copies */
204 char *incoming_data = (buf0_was_inout ? tmp_buf1 : tmp_buf0);
205 char *outgoing_data = (buf0_was_inout ? tmp_buf0 : tmp_buf1);
/* the partner's rank differs from ours in bit k only */
206 int peer = rank ^ (0x1 << k);
210 /* we have the higher rank: send top half, recv bottom half */
214 /* we have the lower rank: recv top half, send bottom half */
218 smpi_mpi_sendrecv(outgoing_data + send_offset*true_extent,
219 size, datatype, peer, MPIR_REDUCE_SCATTER_TAG,
220 incoming_data + recv_offset*true_extent,
221 size, datatype, peer, MPIR_REDUCE_SCATTER_TAG,
222 comm, MPI_STATUS_IGNORE);
223 /* always perform the reduction at recv_offset, the data at send_offset
224 is now our peer's responsibility */
226 /* higher ranked value so need to call op(received_data, my_data) */
228 incoming_data + recv_offset*true_extent,
229 outgoing_data + recv_offset*true_extent,
/* self-assignment: the flag keeps its value in this branch, i.e. the two
   buffers keep their roles for the next round */
231 buf0_was_inout = buf0_was_inout;
234 /* lower ranked value so need to call op(my_data, received_data) */
236 outgoing_data + recv_offset*true_extent,
237 incoming_data + recv_offset*true_extent,
/* here the two buffers swap roles for the next round */
239 buf0_was_inout = !buf0_was_inout;
242 /* the next round of send/recv needs to happen within the block (of size
243 "size") that we just received and reduced */
244 send_offset = recv_offset;
/* sanity check once all rounds are done; size is updated on lines that are
   not visible in this listing */
247 xbt_assert(size == recvcounts[rank]);
249 /* copy the reduced data to the recvbuf */
250 result_ptr = (char *)(buf0_was_inout ? tmp_buf0 : tmp_buf1) + recv_offset * true_extent;
251 mpi_errno = smpi_datatype_copy(result_ptr, size, datatype,
252 recvbuf, size, datatype);
253 if (mpi_errno) return(mpi_errno);
/*
 * Reduce-scatter by recursive doubling.
 *
 * Every rank keeps a full total_count-element working vector in tmp_results.
 * In each round of the while loop it exchanges part of that vector with a
 * partner dst, using two-block indexed datatypes (sendtype / recvtype) that
 * leave out the blocks described in the in-line comment further down, and
 * then combines the received data with tmp_results.  At the end only this
 * rank's own block, recvcounts[rank] elements at offset disps[rank], is
 * copied into recvbuf.
 *
 * sendbuf    - input vector, or MPI_IN_PLACE to take the input from recvbuf
 * recvbuf    - receives this rank's recvcounts[rank] reduced elements
 * recvcounts - number of elements each rank ends up with
 * datatype, op, comm - element type, reduction operator, communicator
 *
 * A non-zero result of smpi_datatype_copy or smpi_datatype_indexed is
 * returned to the caller as is.
 *
 * NOTE(review): this listing appears to have lost lines.  disps is used
 * without a visible declaration; the initialisation and update of mask and i,
 * the computation of dst, the starting values of dis[], the calls that apply
 * op, the final return and any release of disps, tmp_recvbuf and tmp_results
 * are not visible.  Three comment openers below also lack their closing line
 * in this listing.  Confirm against the complete file.
 */
259 int smpi_coll_tuned_reduce_scatter_mpich_rdb(void *sendbuf, void *recvbuf, int recvcounts[],
260 MPI_Datatype datatype, MPI_Op op, MPI_Comm comm)
262 int rank, comm_size, i;
263 MPI_Aint extent, true_extent, true_lb;
265 void *tmp_recvbuf, *tmp_results;
266 int mpi_errno = MPI_SUCCESS;
267 int type_size, dis[2], blklens[2], total_count, nbytes, dst;
268 int mask, dst_tree_root, my_tree_root, j, k;
270 MPI_Datatype sendtype, recvtype;
271 int nprocs_completed, tmp_mask, tree_root, is_commutative;
272 comm_size = smpi_comm_size(comm);
273 rank = smpi_comm_rank(comm);
275 extent =smpi_datatype_get_extent(datatype);
276 smpi_datatype_extent(datatype, &true_lb, &true_extent);
/* NOTE(review): the branch body is not visible here; presumably it records
   the answer in is_commutative, which is read before the reduction below. */
278 if (smpi_op_is_commute(op)) {
282 disps = (int*)xbt_malloc( comm_size * sizeof(int));
/* disps[i] is the element offset of rank i's block, i.e. the running sum of
   the preceding recvcounts; total_count ends up as the sum of all of them. */
285 for (i=0; i<comm_size; i++) {
286 disps[i] = total_count;
287 total_count += recvcounts[i];
290 type_size= smpi_datatype_size(datatype);
291 nbytes = total_count * type_size;
/* NOTE(review): nbytes is not read on any line visible in this listing. */
294 /* noncommutative and (non-pof2 or block irregular), use recursive doubling. */
296 /* need to allocate temporary buffer to receive incoming data*/
297 tmp_recvbuf= (void *) xbt_malloc( total_count*(max(true_extent,extent)));
298 /* adjust for potential negative lower bound in datatype */
299 tmp_recvbuf = (void *)((char*)tmp_recvbuf - true_lb);
301 /* need to allocate another temporary buffer to accumulate
303 tmp_results = (void *)xbt_malloc( total_count*(max(true_extent,extent)));
304 /* adjust for potential negative lower bound in datatype */
305 tmp_results = (void *)((char*)tmp_results - true_lb);
307 /* copy sendbuf into tmp_results */
308 if (sendbuf != MPI_IN_PLACE)
309 mpi_errno = smpi_datatype_copy(sendbuf, total_count, datatype,
310 tmp_results, total_count, datatype);
312 mpi_errno = smpi_datatype_copy(recvbuf, total_count, datatype,
313 tmp_results, total_count, datatype);
315 if (mpi_errno) return(mpi_errno);
/* One round per value of mask.  NOTE(review): the updates of mask and i and
   the computation of dst are not visible in this listing. */
319 while (mask < comm_size) {
322 dst_tree_root = dst >> i;
325 my_tree_root = rank >> i;
328 /* At step 1, processes exchange (n-n/p) amount of
329 data; at step 2, (n-2n/p) amount of data; at step 3, (n-4n/p)
330 amount of data, and so forth. We use derived datatypes for this.
332 At each step, a process does not need to send data
333 indexed from my_tree_root to
334 my_tree_root+mask-1. Similarly, a process won't receive
335 data indexed from dst_tree_root to dst_tree_root+mask-1. */
337 /* calculate sendtype */
338 blklens[0] = blklens[1] = 0;
/* blklens[0]: elements of all blocks below my_tree_root;
   blklens[1]: elements of all blocks from my_tree_root+mask upward. */
339 for (j=0; j<my_tree_root; j++)
340 blklens[0] += recvcounts[j];
341 for (j=my_tree_root+mask; j<comm_size; j++)
342 blklens[1] += recvcounts[j];
/* dis[1] accumulates the element count of the skipped range
   [my_tree_root, my_tree_root+mask), clipped to comm_size; its starting
   value is set on a line not visible here. */
346 for (j=my_tree_root; (j<my_tree_root+mask) && (j<comm_size); j++)
347 dis[1] += recvcounts[j];
349 mpi_errno = smpi_datatype_indexed(2, blklens, dis, datatype, &sendtype);
350 if (mpi_errno) return(mpi_errno);
352 smpi_datatype_commit(&sendtype);
354 /* calculate recvtype */
/* same construction as for sendtype, but centred on dst_tree_root */
355 blklens[0] = blklens[1] = 0;
356 for (j=0; j<dst_tree_root && j<comm_size; j++)
357 blklens[0] += recvcounts[j];
358 for (j=dst_tree_root+mask; j<comm_size; j++)
359 blklens[1] += recvcounts[j];
363 for (j=dst_tree_root; (j<dst_tree_root+mask) && (j<comm_size); j++)
364 dis[1] += recvcounts[j];
366 mpi_errno = smpi_datatype_indexed(2, blklens, dis, datatype, &recvtype);
367 if (mpi_errno) return(mpi_errno);
369 smpi_datatype_commit(&recvtype);
/* only exchange when the partner rank actually exists in the communicator */
372 if (dst < comm_size) {
373 /* tmp_results contains data to be sent in each step. Data is
374 received in tmp_recvbuf and then accumulated into
375 tmp_results. accumulation is done later below. */
377 smpi_mpi_sendrecv(tmp_results, 1, sendtype, dst,
378 MPIR_REDUCE_SCATTER_TAG,
379 tmp_recvbuf, 1, recvtype, dst,
380 MPIR_REDUCE_SCATTER_TAG, comm,
385 /* if some processes in this process's subtree in this step
386 did not have any destination process to communicate with
387 because of non-power-of-two, we need to send them the
388 result. We use a logarithmic recursive-halfing algorithm
391 if (dst_tree_root + mask > comm_size) {
392 nprocs_completed = comm_size - my_tree_root - mask;
393 /* nprocs_completed is the number of processes in this
394 subtree that have all the data. Send data to others
395 in a tree fashion. First find root of current tree
396 that is being divided into two. k is the number of
397 least-significant bits in this process's rank that
398 must be zeroed out to find the rank of the root */
407 tmp_mask = mask >> 1;
409 dst = rank ^ tmp_mask;
411 tree_root = rank >> k;
414 /* send only if this proc has data and destination
415 doesn't have data. at any step, multiple processes
416 can send if they have the data */
418 (rank < tree_root + nprocs_completed)
419 && (dst >= tree_root + nprocs_completed)) {
420 /* send the current result */
421 smpi_mpi_send(tmp_recvbuf, 1, recvtype,
422 dst, MPIR_REDUCE_SCATTER_TAG,
425 /* recv only if this proc. doesn't have data and sender
427 else if ((dst < rank) &&
428 (dst < tree_root + nprocs_completed) &&
429 (rank >= tree_root + nprocs_completed)) {
430 smpi_mpi_recv(tmp_recvbuf, 1, recvtype, dst,
431 MPIR_REDUCE_SCATTER_TAG,
432 comm, MPI_STATUS_IGNORE);
440 /* The following reduction is done here instead of after
441 the MPIC_Sendrecv_ft or MPIC_Recv_ft above. This is
442 because to do it above, in the noncommutative
443 case, we would need an extra temp buffer so as not to
444 overwrite temp_recvbuf, because temp_recvbuf may have
445 to be communicated to other processes in the
446 non-power-of-two case. To avoid that extra allocation,
447 we do the reduce here. */
/* Received data is listed first when op is commutative or the partner's
   subtree root is lower than ours; otherwise the local data is listed first
   and the result is copied from tmp_recvbuf back into tmp_results.  Both
   pieces of the two-block layout are handled: the first at offset 0 with
   blklens[0] elements, the second at element offset dis[1] with blklens[1]. */
449 if (is_commutative || (dst_tree_root < my_tree_root)) {
452 tmp_recvbuf, tmp_results, &blklens[0],
455 ((char *)tmp_recvbuf + dis[1]*extent),
456 ((char *)tmp_results + dis[1]*extent),
457 &blklens[1], &datatype);
463 tmp_results, tmp_recvbuf, &blklens[0],
466 ((char *)tmp_results + dis[1]*extent),
467 ((char *)tmp_recvbuf + dis[1]*extent),
468 &blklens[1], &datatype);
470 /* copy result back into tmp_results */
471 mpi_errno = smpi_datatype_copy(tmp_recvbuf, 1, recvtype,
472 tmp_results, 1, recvtype);
473 if (mpi_errno) return(mpi_errno);
/* NOTE(review): both releases below are commented out, so the sendtype and
   recvtype created in each round are not freed in the visible code. */
477 //smpi_datatype_free(&sendtype);
478 //smpi_datatype_free(&recvtype);
484 /* now copy final results from tmp_results to recvbuf */
485 mpi_errno = smpi_datatype_copy(((char *)tmp_results+disps[rank]*extent),
486 recvcounts[rank], datatype, recvbuf,
487 recvcounts[rank], datatype);
488 if (mpi_errno) return(mpi_errno);