1 /* Copyright (c) 2013-2014. The SimGrid Team.
2 * All rights reserved. */
4 /* This program is free software; you can redistribute it and/or modify it
5 * under the terms of the license (GNU LGPL) which comes with this package. */
8 * Copyright (c) 2004-2005 The Trustees of Indiana University and Indiana
9 * University Research and Technology
10 * Corporation. All rights reserved.
11 * Copyright (c) 2004-2012 The University of Tennessee and The University
12 * of Tennessee Research Foundation. All rights
14 * Copyright (c) 2004-2005 High Performance Computing Center Stuttgart,
15 * University of Stuttgart. All rights reserved.
16 * Copyright (c) 2004-2005 The Regents of the University of California.
17 * All rights reserved.
18 * Copyright (c) 2008 Sun Microsystems, Inc. All rights reserved.
19 * Copyright (c) 2009 University of Houston. All rights reserved.
21 * Additional copyrights may follow
24 #include "colls_private.h"
25 #include "coll_tuned_topo.h"
26 #include "xbt/replay.h"
29 * Recursive-halving function is (*mostly*) copied from the BASIC coll module.
30 * I have removed the part which handles "large" message sizes
31 * (non-overlapping version of reduce_scatter).
34 /* copied function (with appropriate renaming) starts here */
37 * reduce_scatter_ompi_basic_recursivehalving
39 * Function: - reduce scatter implementation using recursive-halving
41 * Accepts: - same as MPI_Reduce_scatter()
42 * Returns: - MPI_SUCCESS or error code
43 * Limitation: - Works only for commutative operations.
/*
 * Recursive-halving reduce_scatter (ported from Open MPI's basic coll
 * module).  Only valid for commutative operations -- enforced below
 * via smpi_op_is_commute()/THROWF.
 *
 * NOTE(review): this chunk of the file appears truncated.  The
 * parameter list after "void *sbuf," is missing, several closing
 * braces, else-branches and statements are absent, and each line is
 * prefixed with a stale line number.  The comments below describe only
 * what the visible code demonstrably does; do not treat this block as
 * compilable as-is.
 */
46 smpi_coll_tuned_reduce_scatter_ompi_basic_recursivehalving(void *sbuf,
54 int i, rank, size, count, err = MPI_SUCCESS;
55 int tmp_size=1, remain = 0, tmp_rank, *disps = NULL;
56 ptrdiff_t true_lb, true_extent, lb, extent, buf_size;
57 char *recv_buf = NULL, *recv_buf_free = NULL;
58 char *result_buf = NULL, *result_buf_free = NULL;
/* Identify this process within the communicator. */
61 rank = smpi_comm_rank(comm);
62 size = smpi_comm_size(comm);
64 XBT_DEBUG("coll:tuned:reduce_scatter_ompi_basic_recursivehalving, rank %d", rank);
/* Algorithm reorders reductions, so reject non-commutative ops. */
65 if(!smpi_op_is_commute(op))
66 THROWF(arg_error,0, " reduce_scatter ompi_basic_recursivehalving can only be used for commutative operations! ");
68 /* Find displacements and the like */
/* disps[i] = element offset of rank i's block (prefix sums of rcounts,
   per the loop below); disps[0] presumably set to 0 on a missing line. */
69 disps = (int*) xbt_malloc(sizeof(int) * size);
70 if (NULL == disps) return MPI_ERR_OTHER;
73 for (i = 0; i < (size - 1); ++i) {
74 disps[i + 1] = disps[i] + rcounts[i];
/* Total number of elements across all blocks. */
76 count = disps[size - 1] + rcounts[size - 1];
78 /* short cut the trivial case */
84 /* get datatype information */
85 smpi_datatype_extent(dtype, &lb, &extent);
86 smpi_datatype_extent(dtype, &true_lb, &true_extent);
/* Bytes needed to hold `count` elements of this datatype. */
87 buf_size = true_extent + (ptrdiff_t)(count - 1) * extent;
89 /* Handle MPI_IN_PLACE */
90 if (MPI_IN_PLACE == sbuf) {
94 /* Allocate temporary receive buffer. */
95 recv_buf_free = (char*) smpi_get_tmp_recvbuffer(buf_size);
/* Offset by lb -- presumably to align indexing with the datatype's
   lower bound; TODO(review) confirm against smpi_datatype_extent. */
97 recv_buf = recv_buf_free - lb;
98 if (NULL == recv_buf_free) {
103 /* allocate temporary buffer for results */
104 result_buf_free = (char*) smpi_get_tmp_sendbuffer(buf_size);
106 result_buf = result_buf_free - lb;
108 /* copy local buffer into the temporary results */
109 err =smpi_datatype_copy(sbuf, count, dtype, result_buf, count, dtype);
110 if (MPI_SUCCESS != err) goto cleanup;
112 /* figure out power of two mapping: grow until larger than
113 comm size, then go back one, to get the largest power of
114 two less than comm size */
/* NOTE(review): the "go back one" step (a `tmp_size >>= 1;`) is not
   visible here -- as shown, `remain` below would be negative.  The
   shrink statement is presumably on one of the missing lines. */
115 while (tmp_size <= size) tmp_size <<= 1;
117 remain = size - tmp_size;
119 /* If comm size is not a power of two, have the first "remain"
120 procs with an even rank send to rank + 1, leaving a power of
121 two procs to do the rest of the algorithm */
122 if (rank < 2 * remain) {
123 if ((rank & 1) == 0) {
/* Even rank in the remainder region: hand full buffer to rank+1 ... */
124 smpi_mpi_send(result_buf, count, dtype, rank + 1,
125 COLL_TAG_REDUCE_SCATTER,
127 /* we don't participate from here on out */
/* ... odd rank: absorb the even neighbor's contribution (the `else`
   keyword/brace lies on a missing line). */
130 smpi_mpi_recv(recv_buf, count, dtype, rank - 1,
131 COLL_TAG_REDUCE_SCATTER,
132 comm, MPI_STATUS_IGNORE);
134 /* integrate their results into our temp results */
135 smpi_op_apply(op, recv_buf, result_buf, &count, &dtype);
137 /* adjust rank to be the bottom "remain" ranks */
141 /* just need to adjust rank to show that the bottom "even
142 remain" ranks dropped out */
143 tmp_rank = rank - remain;
146 /* For ranks not kicked out by the above code, perform the
149 int *tmp_disps = NULL, *tmp_rcounts = NULL;
150 int mask, send_index, recv_index, last_index;
152 /* recalculate disps and rcounts to account for the
153 special "remainder" processes that are no longer doing
/* Per-surviving-rank counts for the shrunken power-of-two group. */
155 tmp_rcounts = (int*) xbt_malloc(tmp_size * sizeof(int));
156 if (NULL == tmp_rcounts) {
160 tmp_disps = (int*) xbt_malloc(tmp_size * sizeof(int));
161 if (NULL == tmp_disps) {
162 xbt_free(tmp_rcounts);
167 for (i = 0 ; i < tmp_size ; ++i) {
169 /* need to include old neighbor as well */
/* Merged ranks own two original blocks; plain survivors keep one
   (the branch condition separating these two cases is missing). */
170 tmp_rcounts[i] = rcounts[i * 2 + 1] + rcounts[i * 2];
172 tmp_rcounts[i] = rcounts[i + remain];
/* Prefix sums -> displacements within the shrunken group. */
177 for (i = 0; i < tmp_size - 1; ++i) {
178 tmp_disps[i + 1] = tmp_disps[i] + tmp_rcounts[i];
181 /* do the recursive halving communication. Don't use the
182 dimension information on the communicator because I
183 think the information is invalidated by our "shrinking"
184 of the communicator */
185 mask = tmp_size >> 1;
186 send_index = recv_index = 0;
187 last_index = tmp_size;
/* Halving loop body (the `while (mask > 0)` header itself is on a
   missing line, per the mask-driven logic below). */
189 int tmp_peer, peer, send_count, recv_count;
192 tmp_peer = tmp_rank ^ mask;
/* Map the shrunken-group peer back to a real communicator rank. */
193 peer = (tmp_peer < remain) ? tmp_peer * 2 + 1 : tmp_peer + remain;
195 /* figure out if we're sending, receiving, or both */
196 send_count = recv_count = 0;
197 if (tmp_rank < tmp_peer) {
/* Lower half keeps [recv_index, send_index), ships the top half. */
198 send_index = recv_index + mask;
199 for (i = send_index ; i < last_index ; ++i) {
200 send_count += tmp_rcounts[i];
202 for (i = recv_index ; i < send_index ; ++i) {
203 recv_count += tmp_rcounts[i];
/* Upper half: mirror image (its `else` is on a missing line). */
206 recv_index = send_index + mask;
207 for (i = send_index ; i < recv_index ; ++i) {
208 send_count += tmp_rcounts[i];
210 for (i = recv_index ; i < last_index ; ++i) {
211 recv_count += tmp_rcounts[i];
215 /* actual data transfer. Send from result_buf,
216 receive into recv_buf */
/* NOTE(review): guards below look swapped/merged relative to the
   usual pattern (irecv should depend on recv_count, send on
   send_count) -- likely an artifact of the dropped lines; verify
   against the upstream Open MPI implementation. */
217 if (send_count > 0 && recv_count != 0) {
218 request=smpi_mpi_irecv(recv_buf + (ptrdiff_t)tmp_disps[recv_index] * extent,
219 recv_count, dtype, peer,
220 COLL_TAG_REDUCE_SCATTER,
222 if (MPI_SUCCESS != err) {
223 xbt_free(tmp_rcounts);
228 if (recv_count > 0 && send_count != 0) {
229 smpi_mpi_send(result_buf + (ptrdiff_t)tmp_disps[send_index] * extent,
230 send_count, dtype, peer,
231 COLL_TAG_REDUCE_SCATTER,
233 if (MPI_SUCCESS != err) {
234 xbt_free(tmp_rcounts);
239 if (send_count > 0 && recv_count != 0) {
240 smpi_mpi_wait(&request, MPI_STATUS_IGNORE);
243 /* if we received something on this step, push it into
244 the results buffer */
245 if (recv_count > 0) {
/* smpi_op_apply call (its first line is missing from view). */
247 recv_buf + (ptrdiff_t)tmp_disps[recv_index] * extent,
248 result_buf + (ptrdiff_t)tmp_disps[recv_index] * extent,
249 &recv_count, &dtype);
252 /* update for next iteration */
253 send_index = recv_index;
254 last_index = recv_index + mask;
258 /* copy local results from results buffer into real receive buffer */
259 if (0 != rcounts[rank]) {
260 err = smpi_datatype_copy(result_buf + disps[rank] * extent,
261 rcounts[rank], dtype,
262 rbuf, rcounts[rank], dtype);
263 if (MPI_SUCCESS != err) {
264 xbt_free(tmp_rcounts);
270 xbt_free(tmp_rcounts);
274 /* Now fix up the non-power of two case, by having the odd
275 procs send the even procs the proper results */
276 if (rank < (2 * remain)) {
277 if ((rank & 1) == 0) {
/* Even (dropped-out) rank receives its final block from rank+1. */
279 smpi_mpi_recv(rbuf, rcounts[rank], dtype, rank + 1,
280 COLL_TAG_REDUCE_SCATTER,
281 comm, MPI_STATUS_IGNORE);
/* Odd rank returns the even neighbor's reduced block (else on a
   missing line). */
284 if (rcounts[rank - 1]) {
285 smpi_mpi_send(result_buf + disps[rank - 1] * extent,
286 rcounts[rank - 1], dtype, rank - 1,
287 COLL_TAG_REDUCE_SCATTER,
/* cleanup: release all temporaries (the `cleanup:` label targeted by
   the goto above sits on a missing line). */
294 if (NULL != disps) xbt_free(disps);
295 if (NULL != recv_buf_free) smpi_free_tmp_buffer(recv_buf_free);
296 if (NULL != result_buf_free) smpi_free_tmp_buffer(result_buf_free);
301 /* copied function (with appropriate renaming) ends here */
305 * smpi_coll_tuned_reduce_scatter_ompi_ring
307 * Function: Ring algorithm for reduce_scatter operation
308 * Accepts: Same as MPI_Reduce_scatter()
309 * Returns: MPI_SUCCESS or error code
311 * Description: Implements ring algorithm for reduce_scatter:
312 * the block sizes defined in rcounts are exchanged and
313 * updated until they reach proper destination.
314 * Algorithm requires 2 * max(rcounts) extra buffering
316 * Limitations: The algorithm DOES NOT preserve order of operations so it
317 * can be used only for commutative operations.
318 * Example on 5 nodes:
321 * [00] [10] -> [20] [30] [40]
322 * [01] [11] [21] -> [31] [41]
323 * [02] [12] [22] [32] -> [42]
324 * -> [03] [13] [23] [33] [43] --> ..
325 * [04] -> [14] [24] [34] [44]
328 * Step 0: rank r sends block (r-1) to rank (r+1) and
329 * receives block (r+1) from rank (r-1) [with wraparound].
331 * [00] [10] [10+20] -> [30] [40]
332 * [01] [11] [21] [21+31] -> [41]
333 * -> [02] [12] [22] [32] [32+42] -->..
334 * [43+03] -> [13] [23] [33] [43]
335 * [04] [04+14] -> [24] [34] [44]
339 * [00] [10] [10+20] [10+20+30] -> [40]
340 * -> [01] [11] [21] [21+31] [21+31+41] ->
341 * [32+42+02] -> [12] [22] [32] [32+42]
342 * [03] [43+03+13] -> [23] [33] [43]
343 * [04] [04+14] [04+14+24] -> [34] [44]
347 * -> [00] [10] [10+20] [10+20+30] [10+20+30+40] ->
348 * [21+31+41+01]-> [11] [21] [21+31] [21+31+41]
349 * [32+42+02] [32+42+02+12]-> [22] [32] [32+42]
350 * [03] [43+03+13] [43+03+13+23]-> [33] [43]
351 * [04] [04+14] [04+14+24] [04+14+24+34] -> [44]
355 * [10+20+30+40+00] [10] [10+20] [10+20+30] [10+20+30+40]
356 * [21+31+41+01] [21+31+41+01+11] [21] [21+31] [21+31+41]
357 * [32+42+02] [32+42+02+12] [32+42+02+12+22] [32] [32+42]
358 * [03] [43+03+13] [43+03+13+23] [43+03+13+23+33] [43]
359 * [04] [04+14] [04+14+24] [04+14+24+34] [04+14+24+34+44]
/*
 * Ring reduce_scatter (ported from Open MPI): blocks circulate around
 * the ring, accumulating partial reductions, until each block lands on
 * its owner.  Commutative operations only (per the header comment
 * above this function).
 *
 * NOTE(review): this chunk appears truncated -- the tail of the
 * parameter list, several braces, `inbi`'s initialization and the
 * `error_hndl:` label are not visible, and each line carries a stale
 * line-number prefix.  Comments below describe only the visible code.
 */
364 smpi_coll_tuned_reduce_scatter_ompi_ring(void *sbuf, void *rbuf, int *rcounts,
370 int ret, line, rank, size, i, k, recv_from, send_to, total_count, max_block_count;
371 int inbi, *displs = NULL;
372 char *tmpsend = NULL, *tmprecv = NULL, *accumbuf = NULL, *accumbuf_free = NULL;
/* Double buffering: inbuf[0]/inbuf[1] alternate as landing zones so
   communication can overlap computation. */
373 char *inbuf_free[2] = {NULL, NULL}, *inbuf[2] = {NULL, NULL};
374 ptrdiff_t true_lb, true_extent, lb, extent, max_real_segsize;
375 MPI_Request reqs[2] = {NULL, NULL};
377 size = smpi_comm_size(comm);
378 rank = smpi_comm_rank(comm);
380 XBT_DEBUG( "coll:tuned:reduce_scatter_ompi_ring rank %d, size %d",
383 /* Determine the maximum number of elements per node,
384 corresponding block size, and displacements array.
386 displs = (int*) xbt_malloc(size * sizeof(int));
387 if (NULL == displs) { ret = -1; line = __LINE__; goto error_hndl; }
/* displs[0] presumably initialized to 0 on a missing line; the loop
   below fills displs[1..size-1] as prefix sums of rcounts. */
389 total_count = rcounts[0];
390 max_block_count = rcounts[0];
391 for (i = 1; i < size; i++) {
392 displs[i] = total_count;
393 total_count += rcounts[i];
394 if (max_block_count < rcounts[i]) max_block_count = rcounts[i];
397 /* Special case for size == 1 */
/* Single process: reduce_scatter degenerates to a local copy. */
399 if (MPI_IN_PLACE != sbuf) {
400 ret = smpi_datatype_copy((char*)sbuf, total_count, dtype, (char*)rbuf, total_count, dtype);
401 if (ret < 0) { line = __LINE__; goto error_hndl; }
407 /* Allocate and initialize temporary buffers, we need:
408 - a temporary buffer to perform reduction (size total_count) since
409 rbuf can be of rcounts[rank] size.
410 - up to two temporary buffers used for communication/computation overlap.
412 smpi_datatype_extent(dtype, &lb, &extent);
413 smpi_datatype_extent(dtype, &true_lb, &true_extent);
/* Largest single block, in bytes -- sizes each staging buffer. */
415 max_real_segsize = true_extent + (ptrdiff_t)(max_block_count - 1) * extent;
417 accumbuf_free = (char*)smpi_get_tmp_recvbuffer(true_extent + (ptrdiff_t)(total_count - 1) * extent);
418 if (NULL == accumbuf_free) { ret = -1; line = __LINE__; goto error_hndl; }
/* -lb offset presumably aligns with the datatype's lower bound;
   TODO(review) confirm against smpi_datatype_extent semantics. */
419 accumbuf = accumbuf_free - lb;
421 inbuf_free[0] = (char*)smpi_get_tmp_sendbuffer(max_real_segsize);
422 if (NULL == inbuf_free[0]) { ret = -1; line = __LINE__; goto error_hndl; }
423 inbuf[0] = inbuf_free[0] - lb;
425 inbuf_free[1] = (char*)smpi_get_tmp_sendbuffer(max_real_segsize);
426 if (NULL == inbuf_free[1]) { ret = -1; line = __LINE__; goto error_hndl; }
427 inbuf[1] = inbuf_free[1] - lb;
430 /* Handle MPI_IN_PLACE for size > 1 */
431 if (MPI_IN_PLACE == sbuf) {
/* Seed the accumulation buffer with the local contribution (the
   MPI_IN_PLACE branch body above is on missing lines). */
435 ret = smpi_datatype_copy((char*)sbuf, total_count, dtype, accumbuf, total_count, dtype);
436 if (ret < 0) { line = __LINE__; goto error_hndl; }
438 /* Computation loop */
441 For each of the remote nodes:
442 - post irecv for block (r-2) from (r-1) with wrap around
443 - send block (r-1) to (r+1)
444 - in loop for every step k = 2 .. n
445 - post irecv for block (r - 1 + n - k) % n
446 - wait on block (r + n - k) % n to arrive
447 - compute on block (r + n - k ) % n
448 - send block (r + n - k) % n
450 - compute on block (r)
451 - copy block (r) to rbuf
452 Note that we must be careful when computing the beginning of buffers and
453 for send operations and computation we must compute the exact block size.
/* Ring neighbors with wraparound. */
455 send_to = (rank + 1) % size;
456 recv_from = (rank + size - 1) % size;
459 /* Initialize first receive from the neighbor on the left */
/* NOTE(review): inbi is read here but its initialization (inbi = 0 in
   the upstream code) is not visible -- presumably on a missing line. */
460 reqs[inbi]=smpi_mpi_irecv(inbuf[inbi], max_block_count, dtype, recv_from,
461 COLL_TAG_REDUCE_SCATTER, comm
/* Kick off the pipeline: forward block (rank-1) to the right. */
463 tmpsend = accumbuf + (ptrdiff_t)displs[recv_from] * extent;
464 smpi_mpi_send(tmpsend, rcounts[recv_from], dtype, send_to,
465 COLL_TAG_REDUCE_SCATTER,
/* Steady state: at step k, reduce+forward block (rank+size-k)%size
   while the next block arrives into the other staging buffer. */
468 for (k = 2; k < size; k++) {
469 const int prevblock = (rank + size - k) % size;
473 /* Post irecv for the current block */
474 reqs[inbi]=smpi_mpi_irecv(inbuf[inbi], max_block_count, dtype, recv_from,
475 COLL_TAG_REDUCE_SCATTER, comm
478 /* Wait on previous block to arrive */
479 smpi_mpi_wait(&reqs[inbi ^ 0x1], MPI_STATUS_IGNORE);
481 /* Apply operation on previous block: result goes to rbuf
482 rbuf[prevblock] = inbuf[inbi ^ 0x1] (op) rbuf[prevblock]
484 tmprecv = accumbuf + (ptrdiff_t)displs[prevblock] * extent;
485 smpi_op_apply(op, inbuf[inbi ^ 0x1], tmprecv, &(rcounts[prevblock]), &dtype);
487 /* send previous block to send_to */
488 smpi_mpi_send(tmprecv, rcounts[prevblock], dtype, send_to,
489 COLL_TAG_REDUCE_SCATTER,
493 /* Wait on the last block to arrive */
494 smpi_mpi_wait(&reqs[inbi], MPI_STATUS_IGNORE);
496 /* Apply operation on the last block (my block)
497 rbuf[rank] = inbuf[inbi] (op) rbuf[rank] */
498 tmprecv = accumbuf + (ptrdiff_t)displs[rank] * extent;
499 smpi_op_apply(op, inbuf[inbi], tmprecv, &(rcounts[rank]), &dtype);
501 /* Copy result from tmprecv to rbuf */
502 ret = smpi_datatype_copy(tmprecv, rcounts[rank], dtype, (char*)rbuf, rcounts[rank], dtype);
503 if (ret < 0) { line = __LINE__; goto error_hndl; }
/* Success path: release everything (the return statement is on a
   missing line). */
505 if (NULL != displs) xbt_free(displs);
506 if (NULL != accumbuf_free) smpi_free_tmp_buffer(accumbuf_free);
507 if (NULL != inbuf_free[0]) smpi_free_tmp_buffer(inbuf_free[0]);
508 if (NULL != inbuf_free[1]) smpi_free_tmp_buffer(inbuf_free[1]);
/* Error path -- target of the goto error_hndl jumps above; the
   `error_hndl:` label itself is not visible in this chunk. */
513 XBT_DEBUG( "%s:%4d\tRank %d Error occurred %d\n",
514 __FILE__, line, rank, ret);
515 if (NULL != displs) xbt_free(displs);
516 if (NULL != accumbuf_free) smpi_free_tmp_buffer(accumbuf_free);
517 if (NULL != inbuf_free[0]) smpi_free_tmp_buffer(inbuf_free[0]);
518 if (NULL != inbuf_free[1]) smpi_free_tmp_buffer(inbuf_free[1]);