1 /* Copyright (c) 2013-2017. The SimGrid Team.
2 * All rights reserved. */
4 /* This program is free software; you can redistribute it and/or modify it
5 * under the terms of the license (GNU LGPL) which comes with this package. */
8 * Copyright (c) 2004-2005 The Trustees of Indiana University and Indiana
9 * University Research and Technology
10 * Corporation. All rights reserved.
11 * Copyright (c) 2004-2012 The University of Tennessee and The University
12 * of Tennessee Research Foundation. All rights
14 * Copyright (c) 2004-2005 High Performance Computing Center Stuttgart,
15 * University of Stuttgart. All rights reserved.
16 * Copyright (c) 2004-2005 The Regents of the University of California.
17 * All rights reserved.
18 * Copyright (c) 2008 Sun Microsystems, Inc. All rights reserved.
19 * Copyright (c) 2009 University of Houston. All rights reserved.
21 * Additional copyrights may follow
24 #include "../colls_private.h"
25 #include "../coll_tuned_topo.h"
28 * Recursive-halving function is (*mostly*) copied from the BASIC coll module.
29 * I have removed the part which handles "large" message sizes
30 * (non-overlapping version of reduce_Scatter).
33 /* copied function (with appropriate renaming) starts here */
36 * reduce_scatter_ompi_basic_recursivehalving
38 * Function: - reduce scatter implementation using recursive-halving
40 * Accepts: - same as MPI_Reduce_scatter()
41 * Returns: - MPI_SUCCESS or error code
42 * Limitation: - Works only for commutative operations.
47 Coll_reduce_scatter_ompi_basic_recursivehalving::reduce_scatter(void *sbuf,
55 int i, rank, size, count, err = MPI_SUCCESS;
56 int tmp_size=1, remain = 0, tmp_rank, *disps = NULL;
57 ptrdiff_t true_lb, true_extent, lb, extent, buf_size;
58 char *recv_buf = NULL, *recv_buf_free = NULL;
59 char *result_buf = NULL, *result_buf_free = NULL;
65 XBT_DEBUG("coll:tuned:reduce_scatter_ompi_basic_recursivehalving, rank %d", rank);
66 if ((op != MPI_OP_NULL && not op->is_commutative()))
67 THROWF(arg_error,0, " reduce_scatter ompi_basic_recursivehalving can only be used for commutative operations! ");
69 /* Find displacements and the like */
70 disps = (int*) xbt_malloc(sizeof(int) * size);
71 if (NULL == disps) return MPI_ERR_OTHER;
74 for (i = 0; i < (size - 1); ++i) {
75 disps[i + 1] = disps[i] + rcounts[i];
77 count = disps[size - 1] + rcounts[size - 1];
79 /* short cut the trivial case */
85 /* get datatype information */
86 dtype->extent(&lb, &extent);
87 dtype->extent(&true_lb, &true_extent);
88 buf_size = true_extent + (ptrdiff_t)(count - 1) * extent;
90 /* Handle MPI_IN_PLACE */
91 if (MPI_IN_PLACE == sbuf) {
95 /* Allocate temporary receive buffer. */
96 recv_buf_free = (char*) smpi_get_tmp_recvbuffer(buf_size);
98 recv_buf = recv_buf_free - lb;
99 if (NULL == recv_buf_free) {
104 /* allocate temporary buffer for results */
105 result_buf_free = (char*) smpi_get_tmp_sendbuffer(buf_size);
107 result_buf = result_buf_free - lb;
109 /* copy local buffer into the temporary results */
110 err =Datatype::copy(sbuf, count, dtype, result_buf, count, dtype);
111 if (MPI_SUCCESS != err) goto cleanup;
113 /* figure out power of two mapping: grow until larger than
114 comm size, then go back one, to get the largest power of
115 two less than comm size */
116 while (tmp_size <= size) tmp_size <<= 1;
118 remain = size - tmp_size;
120 /* If comm size is not a power of two, have the first "remain"
121 procs with an even rank send to rank + 1, leaving a power of
122 two procs to do the rest of the algorithm */
123 if (rank < 2 * remain) {
124 if ((rank & 1) == 0) {
125 Request::send(result_buf, count, dtype, rank + 1,
126 COLL_TAG_REDUCE_SCATTER,
128 /* we don't participate from here on out */
131 Request::recv(recv_buf, count, dtype, rank - 1,
132 COLL_TAG_REDUCE_SCATTER,
133 comm, MPI_STATUS_IGNORE);
135 /* integrate their results into our temp results */
136 if(op!=MPI_OP_NULL) op->apply( recv_buf, result_buf, &count, dtype);
138 /* adjust rank to be the bottom "remain" ranks */
142 /* just need to adjust rank to show that the bottom "even
143 remain" ranks dropped out */
144 tmp_rank = rank - remain;
147 /* For ranks not kicked out by the above code, perform the
150 int *tmp_disps = NULL, *tmp_rcounts = NULL;
151 int mask, send_index, recv_index, last_index;
153 /* recalculate disps and rcounts to account for the
154 special "remainder" processes that are no longer doing
156 tmp_rcounts = (int*) xbt_malloc(tmp_size * sizeof(int));
157 if (NULL == tmp_rcounts) {
161 tmp_disps = (int*) xbt_malloc(tmp_size * sizeof(int));
162 if (NULL == tmp_disps) {
163 xbt_free(tmp_rcounts);
168 for (i = 0 ; i < tmp_size ; ++i) {
170 /* need to include old neighbor as well */
171 tmp_rcounts[i] = rcounts[i * 2 + 1] + rcounts[i * 2];
173 tmp_rcounts[i] = rcounts[i + remain];
178 for (i = 0; i < tmp_size - 1; ++i) {
179 tmp_disps[i + 1] = tmp_disps[i] + tmp_rcounts[i];
182 /* do the recursive halving communication. Don't use the
183 dimension information on the communicator because I
184 think the information is invalidated by our "shrinking"
185 of the communicator */
186 mask = tmp_size >> 1;
187 send_index = recv_index = 0;
188 last_index = tmp_size;
190 int tmp_peer, peer, send_count, recv_count;
193 tmp_peer = tmp_rank ^ mask;
194 peer = (tmp_peer < remain) ? tmp_peer * 2 + 1 : tmp_peer + remain;
196 /* figure out if we're sending, receiving, or both */
197 send_count = recv_count = 0;
198 if (tmp_rank < tmp_peer) {
199 send_index = recv_index + mask;
200 for (i = send_index ; i < last_index ; ++i) {
201 send_count += tmp_rcounts[i];
203 for (i = recv_index ; i < send_index ; ++i) {
204 recv_count += tmp_rcounts[i];
207 recv_index = send_index + mask;
208 for (i = send_index ; i < recv_index ; ++i) {
209 send_count += tmp_rcounts[i];
211 for (i = recv_index ; i < last_index ; ++i) {
212 recv_count += tmp_rcounts[i];
216 /* actual data transfer. Send from result_buf,
217 receive into recv_buf */
218 if (send_count > 0 && recv_count != 0) {
219 request=Request::irecv(recv_buf + (ptrdiff_t)tmp_disps[recv_index] * extent,
220 recv_count, dtype, peer,
221 COLL_TAG_REDUCE_SCATTER,
223 if (MPI_SUCCESS != err) {
224 xbt_free(tmp_rcounts);
229 if (recv_count > 0 && send_count != 0) {
230 Request::send(result_buf + (ptrdiff_t)tmp_disps[send_index] * extent,
231 send_count, dtype, peer,
232 COLL_TAG_REDUCE_SCATTER,
234 if (MPI_SUCCESS != err) {
235 xbt_free(tmp_rcounts);
240 if (send_count > 0 && recv_count != 0) {
241 Request::wait(&request, MPI_STATUS_IGNORE);
244 /* if we received something on this step, push it into
245 the results buffer */
246 if (recv_count > 0) {
247 if(op!=MPI_OP_NULL) op->apply(
248 recv_buf + (ptrdiff_t)tmp_disps[recv_index] * extent,
249 result_buf + (ptrdiff_t)tmp_disps[recv_index] * extent,
253 /* update for next iteration */
254 send_index = recv_index;
255 last_index = recv_index + mask;
259 /* copy local results from results buffer into real receive buffer */
260 if (0 != rcounts[rank]) {
261 err = Datatype::copy(result_buf + disps[rank] * extent,
262 rcounts[rank], dtype,
263 rbuf, rcounts[rank], dtype);
264 if (MPI_SUCCESS != err) {
265 xbt_free(tmp_rcounts);
271 xbt_free(tmp_rcounts);
275 /* Now fix up the non-power of two case, by having the odd
276 procs send the even procs the proper results */
277 if (rank < (2 * remain)) {
278 if ((rank & 1) == 0) {
280 Request::recv(rbuf, rcounts[rank], dtype, rank + 1,
281 COLL_TAG_REDUCE_SCATTER,
282 comm, MPI_STATUS_IGNORE);
285 if (rcounts[rank - 1]) {
286 Request::send(result_buf + disps[rank - 1] * extent,
287 rcounts[rank - 1], dtype, rank - 1,
288 COLL_TAG_REDUCE_SCATTER,
295 if (NULL != disps) xbt_free(disps);
296 if (NULL != recv_buf_free) smpi_free_tmp_buffer(recv_buf_free);
297 if (NULL != result_buf_free) smpi_free_tmp_buffer(result_buf_free);
302 /* copied function (with appropriate renaming) ends here */
306 * Coll_reduce_scatter_ompi_ring::reduce_scatter
308 * Function: Ring algorithm for reduce_scatter operation
309 * Accepts: Same as MPI_Reduce_scatter()
310 * Returns: MPI_SUCCESS or error code
312 * Description: Implements ring algorithm for reduce_scatter:
313 * the block sizes defined in rcounts are exchanged and
314 * updated until they reach proper destination.
315 * Algorithm requires 2 * max(rcounts) extra buffering
317 * Limitations: The algorithm DOES NOT preserve order of operations so it
318 * can be used only for commutative operations.
319 * Example on 5 nodes:
322 * [00] [10] -> [20] [30] [40]
323 * [01] [11] [21] -> [31] [41]
324 * [02] [12] [22] [32] -> [42]
325 * -> [03] [13] [23] [33] [43] --> ..
326 * [04] -> [14] [24] [34] [44]
329 * Step 0: rank r sends block (r-1) to rank (r+1) and
330 * receives block (r+1) from rank (r-1) [with wraparound].
332 * [00] [10] [10+20] -> [30] [40]
333 * [01] [11] [21] [21+31] -> [41]
334 * -> [02] [12] [22] [32] [32+42] -->..
335 * [43+03] -> [13] [23] [33] [43]
336 * [04] [04+14] -> [24] [34] [44]
340 * [00] [10] [10+20] [10+20+30] -> [40]
341 * -> [01] [11] [21] [21+31] [21+31+41] ->
342 * [32+42+02] -> [12] [22] [32] [32+42]
343 * [03] [43+03+13] -> [23] [33] [43]
344 * [04] [04+14] [04+14+24] -> [34] [44]
348 * -> [00] [10] [10+20] [10+20+30] [10+20+30+40] ->
349 * [21+31+41+01]-> [11] [21] [21+31] [21+31+41]
350 * [32+42+02] [32+42+02+12]-> [22] [32] [32+42]
351 * [03] [43+03+13] [43+03+13+23]-> [33] [43]
352 * [04] [04+14] [04+14+24] [04+14+24+34] -> [44]
356 * [10+20+30+40+00] [10] [10+20] [10+20+30] [10+20+30+40]
357 * [21+31+41+01] [21+31+41+01+11] [21] [21+31] [21+31+41]
358 * [32+42+02] [32+42+02+12] [32+42+02+12+22] [32] [32+42]
359 * [03] [43+03+13] [43+03+13+23] [43+03+13+23+33] [43]
360 * [04] [04+14] [04+14+24] [04+14+24+34] [04+14+24+34+44]
/* Ring algorithm for reduce_scatter (see the worked 5-node example in the
 * comment block above).  Blocks circulate around the ring and are reduced
 * in place until each lands on its owner; commutative ops only, per the
 * limitation stated above.
 * NOTE(review): this dump elides lines (embedded numbers jump), so some
 * statements, braces, and the final return are not visible here. */
365 Coll_reduce_scatter_ompi_ring::reduce_scatter(void *sbuf, void *rbuf, int *rcounts,
371 int ret, line, rank, size, i, k, recv_from, send_to, total_count, max_block_count;
372 int inbi, *displs = NULL;
373 char *tmpsend = NULL, *tmprecv = NULL, *accumbuf = NULL, *accumbuf_free = NULL;
/* Two staging buffers so the receive of block k+1 overlaps the reduction
 * and forwarding of block k (double buffering, indexed by inbi). */
374 char *inbuf_free[2] = {NULL, NULL}, *inbuf[2] = {NULL, NULL};
375 ptrdiff_t true_lb, true_extent, lb, extent, max_real_segsize;
376 MPI_Request reqs[2] = {NULL, NULL};
381 XBT_DEBUG( "coll:tuned:reduce_scatter_ompi_ring rank %d, size %d",
384 /* Determine the maximum number of elements per node,
385 corresponding block size, and displacements array.
387 displs = (int*) xbt_malloc(size * sizeof(int));
388 if (NULL == displs) { ret = -1; line = __LINE__; goto error_hndl; }
/* displs[] = prefix sums of rcounts; track the largest single block so the
 * staging buffers can hold any block. (displs[0] presumably set to 0 on an
 * elided line.) */
390 total_count = rcounts[0];
391 max_block_count = rcounts[0];
392 for (i = 1; i < size; i++) {
393 displs[i] = total_count;
394 total_count += rcounts[i];
395 if (max_block_count < rcounts[i]) max_block_count = rcounts[i];
398 /* Special case for size == 1 */
/* With one rank there is nothing to exchange: just copy sbuf to rbuf
 * (skipped for MPI_IN_PLACE, where rbuf already holds the data). */
400 if (MPI_IN_PLACE != sbuf) {
401 ret = Datatype::copy((char*)sbuf, total_count, dtype, (char*)rbuf, total_count, dtype);
402 if (ret < 0) { line = __LINE__; goto error_hndl; }
408 /* Allocate and initialize temporary buffers, we need:
409 - a temporary buffer to perform reduction (size total_count) since
410 rbuf can be of rcounts[rank] size.
411 - up to two temporary buffers used for communication/computation overlap.
413 dtype->extent(&lb, &extent);
414 dtype->extent(&true_lb, &true_extent);
/* Bytes needed for the largest block of this datatype. */
416 max_real_segsize = true_extent + (ptrdiff_t)(max_block_count - 1) * extent;
418 accumbuf_free = (char*)smpi_get_tmp_recvbuffer(true_extent + (ptrdiff_t)(total_count - 1) * extent);
419 if (NULL == accumbuf_free) { ret = -1; line = __LINE__; goto error_hndl; }
/* Shift by -lb so addressing via the datatype lower bound stays inside the
 * allocation (same idiom for inbuf[0]/inbuf[1] below). */
420 accumbuf = accumbuf_free - lb;
422 inbuf_free[0] = (char*)smpi_get_tmp_sendbuffer(max_real_segsize);
423 if (NULL == inbuf_free[0]) { ret = -1; line = __LINE__; goto error_hndl; }
424 inbuf[0] = inbuf_free[0] - lb;
426 inbuf_free[1] = (char*)smpi_get_tmp_sendbuffer(max_real_segsize);
427 if (NULL == inbuf_free[1]) { ret = -1; line = __LINE__; goto error_hndl; }
428 inbuf[1] = inbuf_free[1] - lb;
431 /* Handle MPI_IN_PLACE for size > 1 */
432 if (MPI_IN_PLACE == sbuf) {
/* Seed the accumulation buffer with our full local contribution. */
436 ret = Datatype::copy((char*)sbuf, total_count, dtype, accumbuf, total_count, dtype);
437 if (ret < 0) { line = __LINE__; goto error_hndl; }
439 /* Computation loop */
442 For each of the remote nodes:
443 - post irecv for block (r-2) from (r-1) with wrap around
444 - send block (r-1) to (r+1)
445 - in loop for every step k = 2 .. n
446 - post irecv for block (r - 1 + n - k) % n
447 - wait on block (r + n - k) % n to arrive
448 - compute on block (r + n - k ) % n
449 - send block (r + n - k) % n
451 - compute on block (r)
452 - copy block (r) to rbuf
453 Note that we must be careful when computing the begining of buffers and
454 for send operations and computation we must compute the exact block size.
/* Fixed ring neighbors: data flows from recv_from through us to send_to. */
456 send_to = (rank + 1) % size;
457 recv_from = (rank + size - 1) % size;
460 /* Initialize first receive from the neighbor on the left */
461 reqs[inbi]=Request::irecv(inbuf[inbi], max_block_count, dtype, recv_from,
462 COLL_TAG_REDUCE_SCATTER, comm
/* Kick off the ring: forward the left neighbor's block from accumbuf. */
464 tmpsend = accumbuf + (ptrdiff_t)displs[recv_from] * extent;
465 Request::send(tmpsend, rcounts[recv_from], dtype, send_to,
466 COLL_TAG_REDUCE_SCATTER,
469 for (k = 2; k < size; k++) {
470 const int prevblock = (rank + size - k) % size;
474 /* Post irecv for the current block */
475 reqs[inbi]=Request::irecv(inbuf[inbi], max_block_count, dtype, recv_from,
476 COLL_TAG_REDUCE_SCATTER, comm
479 /* Wait on previous block to arrive */
/* inbi ^ 0x1 flips between the two staging buffers each iteration. */
480 Request::wait(&reqs[inbi ^ 0x1], MPI_STATUS_IGNORE);
482 /* Apply operation on previous block: result goes to rbuf
483 rbuf[prevblock] = inbuf[inbi ^ 0x1] (op) rbuf[prevblock]
485 tmprecv = accumbuf + (ptrdiff_t)displs[prevblock] * extent;
486 if(op!=MPI_OP_NULL) op->apply( inbuf[inbi ^ 0x1], tmprecv, &(rcounts[prevblock]), dtype);
488 /* send previous block to send_to */
489 Request::send(tmprecv, rcounts[prevblock], dtype, send_to,
490 COLL_TAG_REDUCE_SCATTER,
494 /* Wait on the last block to arrive */
495 Request::wait(&reqs[inbi], MPI_STATUS_IGNORE);
497 /* Apply operation on the last block (my block)
498 rbuf[rank] = inbuf[inbi] (op) rbuf[rank] */
499 tmprecv = accumbuf + (ptrdiff_t)displs[rank] * extent;
500 if(op!=MPI_OP_NULL) op->apply( inbuf[inbi], tmprecv, &(rcounts[rank]), dtype);
502 /* Copy result from tmprecv to rbuf */
503 ret = Datatype::copy(tmprecv, rcounts[rank], dtype, (char*)rbuf, rcounts[rank], dtype);
504 if (ret < 0) { line = __LINE__; goto error_hndl; }
/* Success path cleanup; the matching return is not visible in this dump. */
506 if (NULL != displs) xbt_free(displs);
507 if (NULL != accumbuf_free) smpi_free_tmp_buffer(accumbuf_free);
508 if (NULL != inbuf_free[0]) smpi_free_tmp_buffer(inbuf_free[0]);
509 if (NULL != inbuf_free[1]) smpi_free_tmp_buffer(inbuf_free[1]);
/* Error handler (presumably under the `error_hndl:` label targeted by the
 * gotos above — the label line itself is elided): log, free everything. */
514 XBT_DEBUG( "%s:%4d\tRank %d Error occurred %d\n",
515 __FILE__, line, rank, ret);
516 if (NULL != displs) xbt_free(displs);
517 if (NULL != accumbuf_free) smpi_free_tmp_buffer(accumbuf_free);
518 if (NULL != inbuf_free[0]) smpi_free_tmp_buffer(inbuf_free[0]);
519 if (NULL != inbuf_free[1]) smpi_free_tmp_buffer(inbuf_free[1]);