1 /* Copyright (c) 2013-2014. The SimGrid Team.
2 * All rights reserved. */
4 /* This program is free software; you can redistribute it and/or modify it
5 * under the terms of the license (GNU LGPL) which comes with this package. */
8 * Copyright (c) 2004-2005 The Trustees of Indiana University and Indiana
9 * University Research and Technology
10 * Corporation. All rights reserved.
11 * Copyright (c) 2004-2012 The University of Tennessee and The University
12 * of Tennessee Research Foundation. All rights
14 * Copyright (c) 2004-2005 High Performance Computing Center Stuttgart,
15 * University of Stuttgart. All rights reserved.
16 * Copyright (c) 2004-2005 The Regents of the University of California.
17 * All rights reserved.
18 * Copyright (c) 2008 Sun Microsystems, Inc. All rights reserved.
19 * Copyright (c) 2009 University of Houston. All rights reserved.
21 * Additional copyrights may follow
24 #include "colls_private.h"
25 #include "coll_tuned_topo.h"
26 #include "xbt/replay.h"
29 * Recursive-halving function is (*mostly*) copied from the BASIC coll module.
30 * I have removed the part which handles "large" message sizes
31  * (non-overlapping version of reduce_scatter).
34 /* copied function (with appropriate renaming) starts here */
37 * reduce_scatter_ompi_basic_recursivehalving
39 * Function: - reduce scatter implementation using recursive-halving
41 * Accepts: - same as MPI_Reduce_scatter()
42 * Returns: - MPI_SUCCESS or error code
43 * Limitation: - Works only for commutative operations.
46 smpi_coll_tuned_reduce_scatter_ompi_basic_recursivehalving(void *sbuf,
54 int i, rank, size, count, err = MPI_SUCCESS;
55 int tmp_size=1, remain = 0, tmp_rank, *disps = NULL;
56 ptrdiff_t true_lb, true_extent, lb, extent, buf_size;
57 char *recv_buf = NULL, *recv_buf_free = NULL;
58 char *result_buf = NULL, *result_buf_free = NULL;
61 rank = smpi_comm_rank(comm);
62 size = smpi_comm_size(comm);
64 XBT_DEBUG("coll:tuned:reduce_scatter_ompi_basic_recursivehalving, rank %d", rank);
65 if(!smpi_op_is_commute(op))
66 THROWF(arg_error,0, " reduce_scatter ompi_basic_recursivehalving can only be used for commutative operations! ");
68 /* Find displacements and the like */
69 disps = (int*) xbt_malloc(sizeof(int) * size);
70 if (NULL == disps) return MPI_ERR_OTHER;
73 for (i = 0; i < (size - 1); ++i) {
74 disps[i + 1] = disps[i] + rcounts[i];
76 count = disps[size - 1] + rcounts[size - 1];
78 /* short cut the trivial case */
84 /* get datatype information */
85 smpi_datatype_extent(dtype, &lb, &extent);
86 smpi_datatype_extent(dtype, &true_lb, &true_extent);
87 buf_size = true_extent + (ptrdiff_t)(count - 1) * extent;
89 /* Handle MPI_IN_PLACE */
90 if (MPI_IN_PLACE == sbuf) {
94 /* Allocate temporary receive buffer. */
95 if(_xbt_replay_is_active()){
96 recv_buf_free = (char*) SMPI_SHARED_MALLOC(buf_size);
98 recv_buf_free = (char*) xbt_malloc(buf_size);
100 recv_buf = recv_buf_free - lb;
101 if (NULL == recv_buf_free) {
106 /* allocate temporary buffer for results */
107 if(_xbt_replay_is_active()){
108 result_buf_free = (char*) SMPI_SHARED_MALLOC(buf_size);
110 result_buf_free = (char*) xbt_malloc(buf_size);
112 result_buf = result_buf_free - lb;
114 /* copy local buffer into the temporary results */
115 err =smpi_datatype_copy(sbuf, count, dtype, result_buf, count, dtype);
116 if (MPI_SUCCESS != err) goto cleanup;
118 /* figure out power of two mapping: grow until larger than
119 comm size, then go back one, to get the largest power of
120 two less than comm size */
121 while (tmp_size <= size) tmp_size <<= 1;
123 remain = size - tmp_size;
125 /* If comm size is not a power of two, have the first "remain"
126 procs with an even rank send to rank + 1, leaving a power of
127 two procs to do the rest of the algorithm */
128 if (rank < 2 * remain) {
129 if ((rank & 1) == 0) {
130 smpi_mpi_send(result_buf, count, dtype, rank + 1,
131 COLL_TAG_REDUCE_SCATTER,
133 /* we don't participate from here on out */
136 smpi_mpi_recv(recv_buf, count, dtype, rank - 1,
137 COLL_TAG_REDUCE_SCATTER,
138 comm, MPI_STATUS_IGNORE);
140 /* integrate their results into our temp results */
141 smpi_op_apply(op, recv_buf, result_buf, &count, &dtype);
143 /* adjust rank to be the bottom "remain" ranks */
147 /* just need to adjust rank to show that the bottom "even
148 remain" ranks dropped out */
149 tmp_rank = rank - remain;
152 /* For ranks not kicked out by the above code, perform the
155 int *tmp_disps = NULL, *tmp_rcounts = NULL;
156 int mask, send_index, recv_index, last_index;
158 /* recalculate disps and rcounts to account for the
159 special "remainder" processes that are no longer doing
161 tmp_rcounts = (int*) xbt_malloc(tmp_size * sizeof(int));
162 if (NULL == tmp_rcounts) {
166 tmp_disps = (int*) xbt_malloc(tmp_size * sizeof(int));
167 if (NULL == tmp_disps) {
168 xbt_free(tmp_rcounts);
173 for (i = 0 ; i < tmp_size ; ++i) {
175 /* need to include old neighbor as well */
176 tmp_rcounts[i] = rcounts[i * 2 + 1] + rcounts[i * 2];
178 tmp_rcounts[i] = rcounts[i + remain];
183 for (i = 0; i < tmp_size - 1; ++i) {
184 tmp_disps[i + 1] = tmp_disps[i] + tmp_rcounts[i];
187 /* do the recursive halving communication. Don't use the
188 dimension information on the communicator because I
189 think the information is invalidated by our "shrinking"
190 of the communicator */
191 mask = tmp_size >> 1;
192 send_index = recv_index = 0;
193 last_index = tmp_size;
195 int tmp_peer, peer, send_count, recv_count;
198 tmp_peer = tmp_rank ^ mask;
199 peer = (tmp_peer < remain) ? tmp_peer * 2 + 1 : tmp_peer + remain;
201 /* figure out if we're sending, receiving, or both */
202 send_count = recv_count = 0;
203 if (tmp_rank < tmp_peer) {
204 send_index = recv_index + mask;
205 for (i = send_index ; i < last_index ; ++i) {
206 send_count += tmp_rcounts[i];
208 for (i = recv_index ; i < send_index ; ++i) {
209 recv_count += tmp_rcounts[i];
212 recv_index = send_index + mask;
213 for (i = send_index ; i < recv_index ; ++i) {
214 send_count += tmp_rcounts[i];
216 for (i = recv_index ; i < last_index ; ++i) {
217 recv_count += tmp_rcounts[i];
221 /* actual data transfer. Send from result_buf,
222 receive into recv_buf */
223 if (send_count > 0 && recv_count != 0) {
224 request=smpi_mpi_irecv(recv_buf + (ptrdiff_t)tmp_disps[recv_index] * extent,
225 recv_count, dtype, peer,
226 COLL_TAG_REDUCE_SCATTER,
228 if (MPI_SUCCESS != err) {
229 xbt_free(tmp_rcounts);
234 if (recv_count > 0 && send_count != 0) {
235 smpi_mpi_send(result_buf + (ptrdiff_t)tmp_disps[send_index] * extent,
236 send_count, dtype, peer,
237 COLL_TAG_REDUCE_SCATTER,
239 if (MPI_SUCCESS != err) {
240 xbt_free(tmp_rcounts);
245 if (send_count > 0 && recv_count != 0) {
246 smpi_mpi_wait(&request, MPI_STATUS_IGNORE);
249 /* if we received something on this step, push it into
250 the results buffer */
251 if (recv_count > 0) {
253 recv_buf + (ptrdiff_t)tmp_disps[recv_index] * extent,
254 result_buf + (ptrdiff_t)tmp_disps[recv_index] * extent,
255 &recv_count, &dtype);
258 /* update for next iteration */
259 send_index = recv_index;
260 last_index = recv_index + mask;
264 /* copy local results from results buffer into real receive buffer */
265 if (0 != rcounts[rank]) {
266 err = smpi_datatype_copy(result_buf + disps[rank] * extent,
267 rcounts[rank], dtype,
268 rbuf, rcounts[rank], dtype);
269 if (MPI_SUCCESS != err) {
270 xbt_free(tmp_rcounts);
276 xbt_free(tmp_rcounts);
280 /* Now fix up the non-power of two case, by having the odd
281 procs send the even procs the proper results */
282 if (rank < (2 * remain)) {
283 if ((rank & 1) == 0) {
285 smpi_mpi_recv(rbuf, rcounts[rank], dtype, rank + 1,
286 COLL_TAG_REDUCE_SCATTER,
287 comm, MPI_STATUS_IGNORE);
290 if (rcounts[rank - 1]) {
291 smpi_mpi_send(result_buf + disps[rank - 1] * extent,
292 rcounts[rank - 1], dtype, rank - 1,
293 COLL_TAG_REDUCE_SCATTER,
300 if (NULL != disps) xbt_free(disps);
302 if (!_xbt_replay_is_active()){
303 if (NULL != recv_buf_free) xbt_free(recv_buf_free);
304 if (NULL != result_buf_free) xbt_free(result_buf_free);
306 if (NULL != recv_buf_free) SMPI_SHARED_FREE(recv_buf_free);
307 if (NULL != result_buf_free) SMPI_SHARED_FREE(result_buf_free);
312 /* copied function (with appropriate renaming) ends here */
316 * smpi_coll_tuned_reduce_scatter_ompi_ring
318 * Function: Ring algorithm for reduce_scatter operation
319 * Accepts: Same as MPI_Reduce_scatter()
320 * Returns: MPI_SUCCESS or error code
322 * Description: Implements ring algorithm for reduce_scatter:
323 * the block sizes defined in rcounts are exchanged and
324  *            updated until they reach proper destination.
325 * Algorithm requires 2 * max(rcounts) extra buffering
327 * Limitations: The algorithm DOES NOT preserve order of operations so it
328 * can be used only for commutative operations.
329 * Example on 5 nodes:
332 * [00] [10] -> [20] [30] [40]
333 * [01] [11] [21] -> [31] [41]
334 * [02] [12] [22] [32] -> [42]
335 * -> [03] [13] [23] [33] [43] --> ..
336 * [04] -> [14] [24] [34] [44]
339 * Step 0: rank r sends block (r-1) to rank (r+1) and
340 * receives block (r+1) from rank (r-1) [with wraparound].
342 * [00] [10] [10+20] -> [30] [40]
343 * [01] [11] [21] [21+31] -> [41]
344 * -> [02] [12] [22] [32] [32+42] -->..
345 * [43+03] -> [13] [23] [33] [43]
346 * [04] [04+14] -> [24] [34] [44]
350 * [00] [10] [10+20] [10+20+30] -> [40]
351 * -> [01] [11] [21] [21+31] [21+31+41] ->
352 * [32+42+02] -> [12] [22] [32] [32+42]
353 * [03] [43+03+13] -> [23] [33] [43]
354 * [04] [04+14] [04+14+24] -> [34] [44]
358 * -> [00] [10] [10+20] [10+20+30] [10+20+30+40] ->
359 * [21+31+41+01]-> [11] [21] [21+31] [21+31+41]
360 * [32+42+02] [32+42+02+12]-> [22] [32] [32+42]
361 * [03] [43+03+13] [43+03+13+23]-> [33] [43]
362 * [04] [04+14] [04+14+24] [04+14+24+34] -> [44]
366 * [10+20+30+40+00] [10] [10+20] [10+20+30] [10+20+30+40]
367 * [21+31+41+01] [21+31+41+01+11] [21] [21+31] [21+31+41]
368 * [32+42+02] [32+42+02+12] [32+42+02+12+22] [32] [32+42]
369 * [03] [43+03+13] [43+03+13+23] [43+03+13+23+33] [43]
370 * [04] [04+14] [04+14+24] [04+14+24+34] [04+14+24+34+44]
/* Ring reduce_scatter: every rank reduces into a full-size temporary buffer
 * (accumbuf) while blocks travel around the ring; the two inbuf[] buffers let
 * the receive of step k overlap with the reduce+send of step k-1.
 * See the large comment block above for a worked 5-node example.
 * Like the recursive-halving variant, order of operations is not preserved,
 * so this is only valid for commutative reduction operations. */
375 smpi_coll_tuned_reduce_scatter_ompi_ring(void *sbuf, void *rbuf, int *rcounts,
381     int ret, line, rank, size, i, k, recv_from, send_to, total_count, max_block_count;
382     int inbi, *displs = NULL;
383     char *tmpsend = NULL, *tmprecv = NULL, *accumbuf = NULL, *accumbuf_free = NULL;
384     char *inbuf_free[2] = {NULL, NULL}, *inbuf[2] = {NULL, NULL};
385     ptrdiff_t true_lb, true_extent, lb, extent, max_real_segsize;
386     MPI_Request reqs[2] = {NULL, NULL};
388     size = smpi_comm_size(comm);
389     rank = smpi_comm_rank(comm);
391     XBT_DEBUG( "coll:tuned:reduce_scatter_ompi_ring rank %d, size %d",
394     /* Determine the maximum number of elements per node,
395        corresponding block size, and displacements array.
397     displs = (int*) xbt_malloc(size * sizeof(int));
398     if (NULL == displs) { ret = -1; line = __LINE__; goto error_hndl; }
400     total_count = rcounts[0];
401     max_block_count = rcounts[0];
402     for (i = 1; i < size; i++) {
403         displs[i] = total_count;
404         total_count += rcounts[i];
405         if (max_block_count < rcounts[i]) max_block_count = rcounts[i];
408     /* Special case for size == 1 */
    /* single process: reduce_scatter degenerates into a local copy */
410         if (MPI_IN_PLACE != sbuf) {
411             ret = smpi_datatype_copy((char*)sbuf, total_count, dtype, (char*)rbuf, total_count, dtype);
412             if (ret < 0) { line = __LINE__; goto error_hndl; }
418     /* Allocate and initialize temporary buffers, we need:
419        - a temporary buffer to perform reduction (size total_count) since
420        rbuf can be of rcounts[rank] size.
421        - up to two temporary buffers used for communication/computation overlap.
423     smpi_datatype_extent(dtype, &lb, &extent);
424     smpi_datatype_extent(dtype, &true_lb, &true_extent);
    /* size of the largest single block, used for both staging buffers */
426     max_real_segsize = true_extent + (ptrdiff_t)(max_block_count - 1) * extent;
428     accumbuf_free = (char*)xbt_malloc(true_extent + (ptrdiff_t)(total_count - 1) * extent);
429     if (NULL == accumbuf_free) { ret = -1; line = __LINE__; goto error_hndl; }
    /* shift by lb so addressing is relative to the datatype lower bound */
430     accumbuf = accumbuf_free - lb;
432     inbuf_free[0] = (char*)xbt_malloc(max_real_segsize);
433     if (NULL == inbuf_free[0]) { ret = -1; line = __LINE__; goto error_hndl; }
434     inbuf[0] = inbuf_free[0] - lb;
436     inbuf_free[1] = (char*)xbt_malloc(max_real_segsize);
437     if (NULL == inbuf_free[1]) { ret = -1; line = __LINE__; goto error_hndl; }
438     inbuf[1] = inbuf_free[1] - lb;
441     /* Handle MPI_IN_PLACE for size > 1 */
442     if (MPI_IN_PLACE == sbuf) {
446         ret = smpi_datatype_copy((char*)sbuf, total_count, dtype, accumbuf, total_count, dtype);
447         if (ret < 0) { line = __LINE__; goto error_hndl; }
449     /* Computation loop */
452        For each of the remote nodes:
453        - post irecv for block (r-2) from (r-1) with wrap around
454        - send block (r-1) to (r+1)
455        - in loop for every step k = 2 .. n
456        - post irecv for block (r - 1 + n - k) % n
457        - wait on block (r + n - k) % n to arrive
458        - compute on block (r + n - k ) % n
459        - send block (r + n - k) % n
461        - compute on block (r)
462        - copy block (r) to rbuf
463        Note that we must be careful when computing the begining of buffers and
464        for send operations and computation we must compute the exact block size.
    /* ring neighbours: send to the right, receive from the left (wraparound) */
466     send_to = (rank + 1) % size;
467     recv_from = (rank + size - 1) % size;
470     /* Initialize first receive from the neighbor on the left */
471     reqs[inbi]=smpi_mpi_irecv(inbuf[inbi], max_block_count, dtype, recv_from,
472                               COLL_TAG_REDUCE_SCATTER, comm
474     tmpsend = accumbuf + (ptrdiff_t)displs[recv_from] * extent;
475     smpi_mpi_send(tmpsend, rcounts[recv_from], dtype, send_to,
476                             COLL_TAG_REDUCE_SCATTER,
479     for (k = 2; k < size; k++) {
        /* block that finished travelling on the previous step */
480         const int prevblock = (rank + size - k) % size;
484         /* Post irecv for the current block */
485         reqs[inbi]=smpi_mpi_irecv(inbuf[inbi], max_block_count, dtype, recv_from,
486                                   COLL_TAG_REDUCE_SCATTER, comm
489         /* Wait on previous block to arrive */
490         smpi_mpi_wait(&reqs[inbi ^ 0x1], MPI_STATUS_IGNORE);
492         /* Apply operation on previous block: result goes to rbuf
493            rbuf[prevblock] = inbuf[inbi ^ 0x1] (op) rbuf[prevblock]
495         tmprecv = accumbuf + (ptrdiff_t)displs[prevblock] * extent;
496         smpi_op_apply(op, inbuf[inbi ^ 0x1], tmprecv, &(rcounts[prevblock]), &dtype);
498         /* send previous block to send_to */
499         smpi_mpi_send(tmprecv, rcounts[prevblock], dtype, send_to,
500                                 COLL_TAG_REDUCE_SCATTER,
504     /* Wait on the last block to arrive */
505     smpi_mpi_wait(&reqs[inbi], MPI_STATUS_IGNORE);
507     /* Apply operation on the last block (my block)
508        rbuf[rank] = inbuf[inbi] (op) rbuf[rank] */
509     tmprecv = accumbuf + (ptrdiff_t)displs[rank] * extent;
510     smpi_op_apply(op, inbuf[inbi], tmprecv, &(rcounts[rank]), &dtype);
512     /* Copy result from tmprecv to rbuf */
513     ret = smpi_datatype_copy(tmprecv, rcounts[rank], dtype, (char*)rbuf, rcounts[rank], dtype);
514     if (ret < 0) { line = __LINE__; goto error_hndl; }
    /* success path: release all temporaries */
516     if (NULL != displs) xbt_free(displs);
517     if (NULL != accumbuf_free) xbt_free(accumbuf_free);
518     if (NULL != inbuf_free[0]) xbt_free(inbuf_free[0]);
519     if (NULL != inbuf_free[1]) xbt_free(inbuf_free[1]);
    /* error_hndl: log where the failure happened, then release everything
       that was allocated (allocations are NULL-initialized above, so the
       guards are safe whichever allocation failed) */
524     XBT_DEBUG( "%s:%4d\tRank %d Error occurred %d\n",
525                  __FILE__, line, rank, ret);
526     if (NULL != displs) xbt_free(displs);
527     if (NULL != accumbuf_free) xbt_free(accumbuf_free);
528     if (NULL != inbuf_free[0]) xbt_free(inbuf_free[0]);
529     if (NULL != inbuf_free[1]) xbt_free(inbuf_free[1]);