/* Copyright (c) 2013-2014. The SimGrid Team.
 * All rights reserved.                                                     */

/* This program is free software; you can redistribute it and/or modify it
 * under the terms of the license (GNU LGPL) which comes with this package. */

/*
 * Copyright (c) 2004-2005 The Trustees of Indiana University and Indiana
 *                         University Research and Technology
 *                         Corporation.  All rights reserved.
 * Copyright (c) 2004-2012 The University of Tennessee and The University
 *                         of Tennessee Research Foundation.  All rights
 *                         reserved.
 * Copyright (c) 2004-2005 High Performance Computing Center Stuttgart,
 *                         University of Stuttgart.  All rights reserved.
 * Copyright (c) 2004-2005 The Regents of the University of California.
 *                         All rights reserved.
 * Copyright (c) 2008      Sun Microsystems, Inc.  All rights reserved.
 * Copyright (c) 2009      University of Houston. All rights reserved.
 *
 * Additional copyrights may follow
 */
#include "colls_private.h"
#include "coll_tuned_topo.h"
#include "xbt/replay.h"
/*
 * Recursive-halving function is (*mostly*) copied from the BASIC coll module.
 * I have removed the part which handles "large" message sizes
 * (non-overlapping version of reduce_scatter).
 */

/* copied function (with appropriate renaming) starts here */
/*
 * reduce_scatter_ompi_basic_recursivehalving
 *
 * Function:   - reduce scatter implementation using recursive-halving
 *               algorithm
 * Accepts:    - same as MPI_Reduce_scatter()
 * Returns:    - MPI_SUCCESS or error code
 * Limitation: - Works only for commutative operations.
 */
int
smpi_coll_tuned_reduce_scatter_ompi_basic_recursivehalving(void *sbuf,
                                                           void *rbuf,
                                                           int *rcounts,
                                                           MPI_Datatype dtype,
                                                           MPI_Op op,
                                                           MPI_Comm comm)
{
    int i, rank, size, count, err = MPI_SUCCESS;
    int tmp_size = 1, remain = 0, tmp_rank, *disps = NULL;
    ptrdiff_t true_lb, true_extent, lb, extent, buf_size;
    char *recv_buf = NULL, *recv_buf_free = NULL;
    char *result_buf = NULL, *result_buf_free = NULL;

    rank = smpi_comm_rank(comm);
    size = smpi_comm_size(comm);

    XBT_DEBUG("coll:tuned:reduce_scatter_ompi_basic_recursivehalving, rank %d", rank);
    if (!smpi_op_is_commute(op))
      THROWF(arg_error, 0,
             " reduce_scatter ompi_basic_recursivehalving can only be used for commutative operations! ");
    /* Find displacements and the like */
    disps = (int*) xbt_malloc(sizeof(int) * size);
    if (NULL == disps) return MPI_ERR_OTHER;

    disps[0] = 0;
    for (i = 0; i < (size - 1); ++i) {
        disps[i + 1] = disps[i] + rcounts[i];
    }
    count = disps[size - 1] + rcounts[size - 1];
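    /* Worked example (added comment): with size = 3 and rcounts = {2, 3, 1}
       this gives disps = {0, 2, 5} and count = 6, i.e. disps[i] is the
       element offset of rank i's block inside the full reduction vector. */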
    /* short cut the trivial case */
    if (0 == count) {
        xbt_free(disps);
        return MPI_SUCCESS;
    }
    /* get datatype information */
    smpi_datatype_extent(dtype, &lb, &extent);
    smpi_datatype_extent(dtype, &true_lb, &true_extent);
    buf_size = true_extent + (ptrdiff_t)(count - 1) * extent;
    /* Handle MPI_IN_PLACE */
    if (MPI_IN_PLACE == sbuf) {
        sbuf = rbuf;
    }
    /* Allocate temporary receive buffer. */
    if (_xbt_replay_is_active()) {
      recv_buf_free = (char*) SMPI_SHARED_MALLOC(buf_size);
    } else {
      recv_buf_free = (char*) xbt_malloc(buf_size);
    }
    recv_buf = recv_buf_free - lb;
    if (NULL == recv_buf_free) {
        err = MPI_ERR_OTHER;
        goto cleanup;
    }
    /* allocate temporary buffer for results */
    if (_xbt_replay_is_active()) {
      result_buf_free = (char*) SMPI_SHARED_MALLOC(buf_size);
    } else {
      result_buf_free = (char*) xbt_malloc(buf_size);
    }
    result_buf = result_buf_free - lb;
    if (NULL == result_buf_free) {
        err = MPI_ERR_OTHER;
        goto cleanup;
    }
    /* copy local buffer into the temporary results */
    err = smpi_datatype_copy(sbuf, count, dtype, result_buf, count, dtype);
    if (MPI_SUCCESS != err) goto cleanup;
    /* figure out power of two mapping: grow until larger than
       comm size, then go back one, to get the largest power of
       two less than comm size */
    while (tmp_size <= size) tmp_size <<= 1;
    tmp_size >>= 1;
    remain = size - tmp_size;
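    /* Illustration (added comment): for size = 13 the loop grows tmp_size
       through 1, 2, 4, 8, 16; the shift brings it back to 8 (the largest
       power of two <= size), so remain = 13 - 8 = 5 processes have to be
       folded into the power-of-two group. */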
    /* If comm size is not a power of two, have the first "remain"
       procs with an even rank send to rank + 1, leaving a power of
       two procs to do the rest of the algorithm */
    if (rank < 2 * remain) {
        if ((rank & 1) == 0) {
            smpi_mpi_send(result_buf, count, dtype, rank + 1,
                          COLL_TAG_REDUCE_SCATTER,
                          comm);
            /* we don't participate from here on out */
            tmp_rank = -1;
        } else {
            smpi_mpi_recv(recv_buf, count, dtype, rank - 1,
                          COLL_TAG_REDUCE_SCATTER,
                          comm, MPI_STATUS_IGNORE);

            /* integrate their results into our temp results */
            smpi_op_apply(op, recv_buf, result_buf, &count, &dtype);

            /* adjust rank to be the bottom "remain" ranks */
            tmp_rank = rank / 2;
        }
    } else {
        /* just need to adjust rank to show that the bottom "even
           remain" ranks dropped out */
        tmp_rank = rank - remain;
    }
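    /* Illustration (added comment): with size = 13 and remain = 5, the ranks
       below 2 * remain = 10 pair up: even ranks 0,2,4,6,8 drop out
       (tmp_rank = -1), odd ranks 1,3,5,7,9 get tmp_rank = 0..4, and ranks
       10..12 get tmp_rank = 5..7, a dense numbering of the tmp_size = 8
       survivors. */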
    /* For ranks not kicked out by the above code, perform the
       recursive halving */
    if (tmp_rank >= 0) {
        int *tmp_disps = NULL, *tmp_rcounts = NULL;
        int mask, send_index, recv_index, last_index;
        /* recalculate disps and rcounts to account for the
           special "remainder" processes that are no longer doing
           anything */
        tmp_rcounts = (int*) xbt_malloc(tmp_size * sizeof(int));
        if (NULL == tmp_rcounts) {
            err = MPI_ERR_OTHER;
            goto cleanup;
        }
        tmp_disps = (int*) xbt_malloc(tmp_size * sizeof(int));
        if (NULL == tmp_disps) {
            xbt_free(tmp_rcounts);
            err = MPI_ERR_OTHER;
            goto cleanup;
        }
        for (i = 0 ; i < tmp_size ; ++i) {
            if (i < remain) {
                /* need to include old neighbor as well */
                tmp_rcounts[i] = rcounts[i * 2 + 1] + rcounts[i * 2];
            } else {
                tmp_rcounts[i] = rcounts[i + remain];
            }
        }
        tmp_disps[0] = 0;
        for (i = 0; i < tmp_size - 1; ++i) {
            tmp_disps[i + 1] = tmp_disps[i] + tmp_rcounts[i];
        }
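        /* Continuing the size = 13 example (added comment): tmp_rcounts[0..4]
           each merge the two blocks of a collapsed pair
           (rcounts[2i] + rcounts[2i+1]), while tmp_rcounts[5..7] are
           rcounts[10..12] unchanged; tmp_disps is the matching prefix sum. */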
        /* do the recursive halving communication.  Don't use the
           dimension information on the communicator because I
           think the information is invalidated by our "shrinking"
           of the communicator */
        mask = tmp_size >> 1;
        send_index = recv_index = 0;
        last_index = tmp_size;
        while (mask > 0) {
            int tmp_peer, peer, send_count, recv_count;
            MPI_Request request;

            tmp_peer = tmp_rank ^ mask;
            peer = (tmp_peer < remain) ? tmp_peer * 2 + 1 : tmp_peer + remain;
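            /* Added note: tmp_peer is the partner in the dense
               0..tmp_size-1 numbering; peer maps it back to a real rank.
               E.g. with remain = 5, tmp_peer = 3 is real rank 7 (the odd
               survivor of pair 3), while tmp_peer = 6 is real rank 11. */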
            /* figure out if we're sending, receiving, or both */
            send_count = recv_count = 0;
            if (tmp_rank < tmp_peer) {
                send_index = recv_index + mask;
                for (i = send_index ; i < last_index ; ++i) {
                    send_count += tmp_rcounts[i];
                }
                for (i = recv_index ; i < send_index ; ++i) {
                    recv_count += tmp_rcounts[i];
                }
            } else {
                recv_index = send_index + mask;
                for (i = send_index ; i < recv_index ; ++i) {
                    send_count += tmp_rcounts[i];
                }
                for (i = recv_index ; i < last_index ; ++i) {
                    recv_count += tmp_rcounts[i];
                }
            }
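            /* Added note: the lower-ranked partner keeps the first half of
               the current [recv_index, last_index) window and sends the
               upper half; the higher-ranked partner does the opposite, so
               after the exchange each side is responsible for mask blocks. */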
            /* actual data transfer.  Send from result_buf,
               receive into recv_buf */
            if (recv_count > 0) {
                request = smpi_mpi_irecv(recv_buf + (ptrdiff_t)tmp_disps[recv_index] * extent,
                                         recv_count, dtype, peer,
                                         COLL_TAG_REDUCE_SCATTER,
                                         comm);
                if (MPI_SUCCESS != err) {
                    xbt_free(tmp_rcounts);
                    xbt_free(tmp_disps);
                    goto cleanup;
                }
            }
            if (send_count > 0) {
                smpi_mpi_send(result_buf + (ptrdiff_t)tmp_disps[send_index] * extent,
                              send_count, dtype, peer,
                              COLL_TAG_REDUCE_SCATTER,
                              comm);
                if (MPI_SUCCESS != err) {
                    xbt_free(tmp_rcounts);
                    xbt_free(tmp_disps);
                    goto cleanup;
                }
            }
            if (recv_count > 0) {
                smpi_mpi_wait(&request, MPI_STATUS_IGNORE);
            }
            /* if we received something on this step, push it into
               the results buffer */
            if (recv_count > 0) {
                smpi_op_apply(op,
                              recv_buf + (ptrdiff_t)tmp_disps[recv_index] * extent,
                              result_buf + (ptrdiff_t)tmp_disps[recv_index] * extent,
                              &recv_count, &dtype);
            }

            /* update for next iteration */
            send_index = recv_index;
            last_index = recv_index + mask;
            mask >>= 1;
        }
        /* copy local results from results buffer into real receive buffer */
        if (0 != rcounts[rank]) {
            err = smpi_datatype_copy(result_buf + disps[rank] * extent,
                                     rcounts[rank], dtype,
                                     rbuf, rcounts[rank], dtype);
            if (MPI_SUCCESS != err) {
                xbt_free(tmp_rcounts);
                xbt_free(tmp_disps);
                goto cleanup;
            }
        }

        xbt_free(tmp_rcounts);
        xbt_free(tmp_disps);
    }
    /* Now fix up the non-power of two case, by having the odd
       procs send the even procs the proper results */
    if (rank < (2 * remain)) {
        if ((rank & 1) == 0) {
            if (rcounts[rank]) {
                smpi_mpi_recv(rbuf, rcounts[rank], dtype, rank + 1,
                              COLL_TAG_REDUCE_SCATTER,
                              comm, MPI_STATUS_IGNORE);
            }
        } else {
            if (rcounts[rank - 1]) {
                smpi_mpi_send(result_buf + disps[rank - 1] * extent,
                              rcounts[rank - 1], dtype, rank - 1,
                              COLL_TAG_REDUCE_SCATTER,
                              comm);
            }
        }
    }
 cleanup:
    if (NULL != disps) xbt_free(disps);
    if (!_xbt_replay_is_active()) {
      if (NULL != recv_buf_free) xbt_free(recv_buf_free);
      if (NULL != result_buf_free) xbt_free(result_buf_free);
    } else {
      if (NULL != recv_buf_free) SMPI_SHARED_FREE(recv_buf_free);
      if (NULL != result_buf_free) SMPI_SHARED_FREE(result_buf_free);
    }
    return err;
}
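/* Added usage sketch (not part of the original module): within SMPI this
   routine is normally reached through the collective selector rather than
   called directly, but a direct call would look like the following,
   assuming a commutative operation and one equal-sized block per rank:

     int rcounts[4] = {16, 16, 16, 16};
     double sendbuf[64], recvbuf[16];
     smpi_coll_tuned_reduce_scatter_ompi_basic_recursivehalving(
         sendbuf, recvbuf, rcounts, MPI_DOUBLE, MPI_SUM, MPI_COMM_WORLD);

   Non-commutative operations are rejected up front by the THROWF check. */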
/* copied function (with appropriate renaming) ends here */
/*
 * smpi_coll_tuned_reduce_scatter_ompi_ring
 *
 * Function:       Ring algorithm for reduce_scatter operation
 * Accepts:        Same as MPI_Reduce_scatter()
 * Returns:        MPI_SUCCESS or error code
 *
 * Description:    Implements ring algorithm for reduce_scatter:
 *                 the block sizes defined in rcounts are exchanged and
 *                 updated until they reach proper destination.
 *                 Algorithm requires 2 * max(rcounts) extra buffering.
 *
 * Limitations:    The algorithm DOES NOT preserve order of operations so it
 *                 can be used only for commutative operations.
 *
 *        Example on 5 nodes (rows are blocks, columns are ranks; an arrow
 *        marks the block each rank sends in the next step):
 *
 *        Initial state
 *   #     0              1              2              3              4
 *       [00]           [10]   ->      [20]           [30]           [40]
 *       [01]           [11]           [21]  ->       [31]           [41]
 *       [02]           [12]           [22]           [32]  ->       [42]
 *  ->   [03]           [13]           [23]           [33]           [43] --> ..
 *       [04]  ->       [14]           [24]           [34]           [44]
 *
 *        COMPUTATION PHASE
 *        Step 0: rank r sends block (r-1) to rank (r+1) and
 *                receives block (r+1) from rank (r-1) [with wraparound].
 *   #     0              1              2              3              4
 *       [00]           [10]         [10+20]  ->      [30]           [40]
 *       [01]           [11]           [21]         [21+31]  ->      [41]
 *  ->   [02]           [12]           [22]           [32]         [32+42] -->..
 *     [43+03] ->       [13]           [23]           [33]           [43]
 *       [04]         [04+14]  ->      [24]           [34]           [44]
 *
 *        Step 1:
 *   #     0              1              2              3              4
 *       [00]           [10]         [10+20]       [10+20+30] ->     [40]
 *  ->   [01]           [11]           [21]          [21+31]      [21+31+41] ->
 *    [32+42+02] ->     [12]           [22]           [32]          [32+42]
 *       [03]        [43+03+13] ->     [23]           [33]           [43]
 *       [04]         [04+14]       [04+14+24]  ->    [34]           [44]
 *
 *        Step 2:
 *   #     0              1              2              3              4
 *  ->   [00]           [10]         [10+20]       [10+20+30]    [10+20+30+40] ->
 * [21+31+41+01]->      [11]           [21]          [21+31]       [21+31+41]
 *    [32+42+02]   [32+42+02+12]->     [22]           [32]          [32+42]
 *       [03]        [43+03+13]    [43+03+13+23]->    [33]           [43]
 *       [04]         [04+14]       [04+14+24]    [04+14+24+34] ->   [44]
 *
 *        Step 3:
 *   #     0              1              2              3              4
 * [10+20+30+40+00]     [10]         [10+20]       [10+20+30]    [10+20+30+40]
 * [21+31+41+01] [21+31+41+01+11]      [21]          [21+31]       [21+31+41]
 *    [32+42+02]   [32+42+02+12] [32+42+02+12+22]     [32]          [32+42]
 *       [03]        [43+03+13]    [43+03+13+23] [43+03+13+23+33]    [43]
 *       [04]         [04+14]       [04+14+24]    [04+14+24+34] [04+14+24+34+44]
 *        DONE :)
 */
int
smpi_coll_tuned_reduce_scatter_ompi_ring(void *sbuf, void *rbuf, int *rcounts,
                                         MPI_Datatype dtype,
                                         MPI_Op op,
                                         MPI_Comm comm)
{
    int ret, line, rank, size, i, k, recv_from, send_to, total_count, max_block_count;
    int inbi, *displs = NULL;
    char *tmpsend = NULL, *tmprecv = NULL, *accumbuf = NULL, *accumbuf_free = NULL;
    char *inbuf_free[2] = {NULL, NULL}, *inbuf[2] = {NULL, NULL};
    ptrdiff_t true_lb, true_extent, lb, extent, max_real_segsize;
    MPI_Request reqs[2] = {NULL, NULL};
    size = smpi_comm_size(comm);
    rank = smpi_comm_rank(comm);

    XBT_DEBUG("coll:tuned:reduce_scatter_ompi_ring rank %d, size %d",
              rank, size);
    /* Determine the maximum number of elements per node,
       corresponding block size, and displacements array.
    */
    displs = (int*) xbt_malloc(size * sizeof(int));
    if (NULL == displs) { ret = -1; line = __LINE__; goto error_hndl; }
    displs[0] = 0;
    total_count = rcounts[0];
    max_block_count = rcounts[0];
    for (i = 1; i < size; i++) {
        displs[i] = total_count;
        total_count += rcounts[i];
        if (max_block_count < rcounts[i]) max_block_count = rcounts[i];
    }
    /* Special case for size == 1 */
    if (1 == size) {
        if (MPI_IN_PLACE != sbuf) {
            ret = smpi_datatype_copy((char*)sbuf, total_count, dtype, (char*)rbuf, total_count, dtype);
            if (ret < 0) { line = __LINE__; goto error_hndl; }
        }
        xbt_free(displs);
        return MPI_SUCCESS;
    }

    /* Allocate and initialize temporary buffers. We need:
       - a temporary buffer to perform reduction (size total_count), since
         rbuf can be of rcounts[rank] size.
       - up to two temporary buffers used for communication/computation overlap.
    */
    smpi_datatype_extent(dtype, &lb, &extent);
    smpi_datatype_extent(dtype, &true_lb, &true_extent);

    max_real_segsize = true_extent + (ptrdiff_t)(max_block_count - 1) * extent;
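    /* Worked example (added comment): for MPI_DOUBLE (extent and true_extent
       both 8 bytes) and max_block_count = 16, max_real_segsize is
       8 + 15 * 8 = 128 bytes: each of the (up to) two inbufs only needs to
       hold the largest single block, not the whole vector. */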
    accumbuf_free = (char*)xbt_malloc(true_extent + (ptrdiff_t)(total_count - 1) * extent);
    if (NULL == accumbuf_free) { ret = -1; line = __LINE__; goto error_hndl; }
    accumbuf = accumbuf_free - lb;

    inbuf_free[0] = (char*)xbt_malloc(max_real_segsize);
    if (NULL == inbuf_free[0]) { ret = -1; line = __LINE__; goto error_hndl; }
    inbuf[0] = inbuf_free[0] - lb;
    if (size > 2) {
        inbuf_free[1] = (char*)xbt_malloc(max_real_segsize);
        if (NULL == inbuf_free[1]) { ret = -1; line = __LINE__; goto error_hndl; }
        inbuf[1] = inbuf_free[1] - lb;
    }
    /* Handle MPI_IN_PLACE for size > 1 */
    if (MPI_IN_PLACE == sbuf) {
        sbuf = rbuf;
    }

    ret = smpi_datatype_copy((char*)sbuf, total_count, dtype, accumbuf, total_count, dtype);
    if (ret < 0) { line = __LINE__; goto error_hndl; }
    /* Computation loop */

    /*
       For each of the remote nodes:
       - post irecv for block (r-2) from (r-1) with wrap around
       - send block (r-1) to (r+1)
       - in loop for every step k = 2 .. n
         - post irecv for block (r - 1 + n - k) % n
         - wait on block (r + n - k) % n to arrive
         - compute on block (r + n - k) % n
         - send block (r + n - k) % n
       - wait on block (r)
       - compute on block (r)
       - copy block (r) to rbuf
       Note that we must be careful when computing the beginning of buffers:
       for send operations and computation we must use the exact block size.
    */
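    /* Added illustration (size = 4, rank = 1; send_to = 2, recv_from = 0):
       pre-loop: post irecv for block 3, send own copy of block 0;
       k = 2:    post irecv for block 2; wait for block 3, reduce it, forward it;
       k = 3:    post irecv for block 1; wait for block 2, reduce it, forward it;
       epilogue: wait for block 1 (our own block), reduce it, copy it to rbuf. */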
    send_to = (rank + 1) % size;
    recv_from = (rank + size - 1) % size;

    inbi = 0;
    /* Initialize first receive from the neighbor on the left */
    reqs[inbi] = smpi_mpi_irecv(inbuf[inbi], max_block_count, dtype, recv_from,
                                COLL_TAG_REDUCE_SCATTER, comm);
    /* Initialize the first send with our own copy of block (rank - 1) */
    tmpsend = accumbuf + (ptrdiff_t)displs[recv_from] * extent;
    smpi_mpi_send(tmpsend, rcounts[recv_from], dtype, send_to,
                  COLL_TAG_REDUCE_SCATTER,
                  comm);
    for (k = 2; k < size; k++) {
        const int prevblock = (rank + size - k) % size;

        /* flip between the two receive buffers */
        inbi = inbi ^ 0x1;

        /* Post irecv for the current block */
        reqs[inbi] = smpi_mpi_irecv(inbuf[inbi], max_block_count, dtype, recv_from,
                                    COLL_TAG_REDUCE_SCATTER, comm);

        /* Wait on previous block to arrive */
        smpi_mpi_wait(&reqs[inbi ^ 0x1], MPI_STATUS_IGNORE);

        /* Apply operation on previous block: result goes to rbuf
           rbuf[prevblock] = inbuf[inbi ^ 0x1] (op) rbuf[prevblock]
        */
        tmprecv = accumbuf + (ptrdiff_t)displs[prevblock] * extent;
        smpi_op_apply(op, inbuf[inbi ^ 0x1], tmprecv, &(rcounts[prevblock]), &dtype);

        /* send previous block to send_to */
        smpi_mpi_send(tmprecv, rcounts[prevblock], dtype, send_to,
                      COLL_TAG_REDUCE_SCATTER,
                      comm);
    }
    /* Wait on the last block to arrive */
    smpi_mpi_wait(&reqs[inbi], MPI_STATUS_IGNORE);

    /* Apply operation on the last block (my block):
       rbuf[rank] = inbuf[inbi] (op) rbuf[rank] */
    tmprecv = accumbuf + (ptrdiff_t)displs[rank] * extent;
    smpi_op_apply(op, inbuf[inbi], tmprecv, &(rcounts[rank]), &dtype);

    /* Copy result from tmprecv to rbuf */
    ret = smpi_datatype_copy(tmprecv, rcounts[rank], dtype, (char*)rbuf, rcounts[rank], dtype);
    if (ret < 0) { line = __LINE__; goto error_hndl; }
    if (NULL != displs) xbt_free(displs);
    if (NULL != accumbuf_free) xbt_free(accumbuf_free);
    if (NULL != inbuf_free[0]) xbt_free(inbuf_free[0]);
    if (NULL != inbuf_free[1]) xbt_free(inbuf_free[1]);

    return MPI_SUCCESS;

 error_hndl:
    XBT_DEBUG("%s:%4d\tRank %d Error occurred %d\n",
              __FILE__, line, rank, ret);
    if (NULL != displs) xbt_free(displs);
    if (NULL != accumbuf_free) xbt_free(accumbuf_free);
    if (NULL != inbuf_free[0]) xbt_free(inbuf_free[0]);
    if (NULL != inbuf_free[1]) xbt_free(inbuf_free[1]);
    return ret;
}
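/* Added usage note (not part of the original module): this variant takes the
   usual MPI_Reduce_scatter arguments and, like the recursive-halving
   algorithm above, requires a commutative op. It is normally chosen through
   SMPI's per-collective selection machinery (assuming this SimGrid era
   exposes it as something like --cfg=smpi/reduce_scatter:ompi_ring) rather
   than being called directly. */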