1 /* Copyright (c) 2013-2014. The SimGrid Team.
2 * All rights reserved. */
4 /* This program is free software; you can redistribute it and/or modify it
5 * under the terms of the license (GNU LGPL) which comes with this package. */
8 * Copyright (c) 2004-2005 The Trustees of Indiana University and Indiana
9 * University Research and Technology
10 * Corporation. All rights reserved.
11 * Copyright (c) 2004-2012 The University of Tennessee and The University
12 * of Tennessee Research Foundation. All rights
14 * Copyright (c) 2004-2005 High Performance Computing Center Stuttgart,
15 * University of Stuttgart. All rights reserved.
16 * Copyright (c) 2004-2005 The Regents of the University of California.
17 * All rights reserved.
18 * Copyright (c) 2008 Sun Microsystems, Inc. All rights reserved.
19 * Copyright (c) 2009 University of Houston. All rights reserved.
21 * Additional copyrights may follow
24 #include "colls_private.h"
25 #include "coll_tuned_topo.h"
28 * Recursive-halving function is (*mostly*) copied from the BASIC coll module.
29 * I have removed the part which handles "large" message sizes
30 * (non-overlapping version of reduce_scatter).
33 /* copied function (with appropriate renaming) starts here */
36 * reduce_scatter_ompi_basic_recursivehalving
38 * Function: - reduce scatter implementation using recursive-halving
40 * Accepts: - same as MPI_Reduce_scatter()
41 * Returns: - MPI_SUCCESS or error code
42 * Limitation: - Works only for commutative operations.
/*
 * reduce_scatter via recursive halving (commutative operations only).
 *
 * Non-power-of-two ranks are first folded onto a power-of-two subset
 * (even low ranks send their data to their odd neighbour and drop out);
 * the surviving subset then repeatedly exchanges and reduces halves of
 * the buffer; finally the folded-out ranks receive their result block
 * back from their neighbour.
 *
 * NOTE(review): this listing embeds the original file's own line numbers
 * at the start of every line and elides many lines (the rest of the
 * parameter list, closing braces, the trivial-case shortcut, the
 * 'cleanup' label, several error paths). The text below is not complete
 * as shown — confirm each elided spot against the full file before
 * changing anything.
 */
45 smpi_coll_tuned_reduce_scatter_ompi_basic_recursivehalving(void *sbuf,
/* Locals: err carries MPI status codes through the (elided) cleanup path;
   disps / recv_buf / result_buf are temporaries freed at the bottom. */
53 int i, rank, size, count, err = MPI_SUCCESS;
54 int tmp_size=1, remain = 0, tmp_rank, *disps = NULL;
55 ptrdiff_t true_lb, true_extent, lb, extent, buf_size;
56 char *recv_buf = NULL, *recv_buf_free = NULL;
57 char *result_buf = NULL, *result_buf_free = NULL;
60 rank = smpi_comm_rank(comm);
61 size = smpi_comm_size(comm);
63 XBT_DEBUG("coll:tuned:reduce_scatter_ompi_basic_recursivehalving, rank %d", rank);
/* The algorithm reorders partial reductions, so it is only valid for
   commutative operations: refuse anything else up front. */
64 if(!smpi_op_is_commute(op))
65 THROWF(arg_error,0, " reduce_scatter ompi_basic_recursivehalving can only be used for commutative operations! ");
67 /* Find displacements and the like */
68 disps = (int*) xbt_malloc(sizeof(int) * size);
69 if (NULL == disps) return MPI_ERR_OTHER;
/* Prefix-sum of rcounts into disps; 'count' is the total element count.
   NOTE(review): disps[0] = 0 is presumably set on an elided line — the
   loop below reads disps[i] before writing disps[i + 1]. */
72 for (i = 0; i < (size - 1); ++i) {
73 disps[i + 1] = disps[i] + rcounts[i];
75 count = disps[size - 1] + rcounts[size - 1];
77 /* short cut the trivial case */
83 /* get datatype information */
84 smpi_datatype_extent(dtype, &lb, &extent);
85 smpi_datatype_extent(dtype, &true_lb, &true_extent);
/* Space for 'count' elements of the datatype, accounting for true extent
   of the last element. */
86 buf_size = true_extent + (ptrdiff_t)(count - 1) * extent;
88 /* Handle MPI_IN_PLACE */
89 if (MPI_IN_PLACE == sbuf) {
93 /* Allocate temporary receive buffer. */
94 recv_buf_free = (char*) xbt_malloc(buf_size);
/* Shift by -lb so datatype displacements index from the buffer start.
   NOTE(review): the NULL check below inspects recv_buf_free *after* the
   -lb adjustment was computed; harmless, but verify against full file. */
95 recv_buf = recv_buf_free - lb;
96 if (NULL == recv_buf_free) {
101 /* allocate temporary buffer for results */
102 result_buf_free = (char*) xbt_malloc(buf_size);
103 result_buf = result_buf_free - lb;
105 /* copy local buffer into the temporary results */
106 err =smpi_datatype_copy(sbuf, count, dtype, result_buf, count, dtype);
107 if (MPI_SUCCESS != err) goto cleanup;
109 /* figure out power of two mapping: grow until larger than
110 comm size, then go back one, to get the largest power of
111 two less than comm size */
/* NOTE(review): after this loop tmp_size is the smallest power of two
   STRICTLY GREATER than size; the step back down (tmp_size >>= 1)
   described by the comment above appears to be on an elided line —
   without it, remain = size - tmp_size would be negative. Confirm. */
112 while (tmp_size <= size) tmp_size <<= 1;
114 remain = size - tmp_size;
116 /* If comm size is not a power of two, have the first "remain"
117 procs with an even rank send to rank + 1, leaving a power of
118 two procs to do the rest of the algorithm */
119 if (rank < 2 * remain) {
120 if ((rank & 1) == 0) {
121 smpi_mpi_send(result_buf, count, dtype, rank + 1,
122 COLL_TAG_REDUCE_SCATTER,
124 /* we don't participate from here on out */
/* Odd ranks in the fold-in zone absorb their even neighbour's data. */
127 smpi_mpi_recv(recv_buf, count, dtype, rank - 1,
128 COLL_TAG_REDUCE_SCATTER,
129 comm, MPI_STATUS_IGNORE);
131 /* integrate their results into our temp results */
132 smpi_op_apply(op, recv_buf, result_buf, &count, &dtype);
134 /* adjust rank to be the bottom "remain" ranks */
138 /* just need to adjust rank to show that the bottom "even
139 remain" ranks dropped out */
140 tmp_rank = rank - remain;
143 /* For ranks not kicked out by the above code, perform the
146 int *tmp_disps = NULL, *tmp_rcounts = NULL;
147 int mask, send_index, recv_index, last_index;
149 /* recalculate disps and rcounts to account for the
150 special "remainder" processes that are no longer doing
152 tmp_rcounts = (int*) xbt_malloc(tmp_size * sizeof(int));
153 if (NULL == tmp_rcounts) {
157 tmp_disps = (int*) xbt_malloc(tmp_size * sizeof(int));
158 if (NULL == tmp_disps) {
159 xbt_free(tmp_rcounts);
/* Compacted counts: survivors of the fold-in own two original blocks. */
164 for (i = 0 ; i < tmp_size ; ++i) {
166 /* need to include old neighbor as well */
167 tmp_rcounts[i] = rcounts[i * 2 + 1] + rcounts[i * 2];
169 tmp_rcounts[i] = rcounts[i + remain];
/* NOTE(review): tmp_disps[0] = 0 presumably set on an elided line. */
174 for (i = 0; i < tmp_size - 1; ++i) {
175 tmp_disps[i + 1] = tmp_disps[i] + tmp_rcounts[i];
178 /* do the recursive halving communication. Don't use the
179 dimension information on the communicator because I
180 think the information is invalidated by our "shrinking"
181 of the communicator */
182 mask = tmp_size >> 1;
183 send_index = recv_index = 0;
184 last_index = tmp_size;
186 int tmp_peer, peer, send_count, recv_count;
/* Partner in the compacted power-of-two space; map it back to a real
   rank — compacted ranks below 'remain' are the odd survivors of the
   fold-in above. */
189 tmp_peer = tmp_rank ^ mask;
190 peer = (tmp_peer < remain) ? tmp_peer * 2 + 1 : tmp_peer + remain;
192 /* figure out if we're sending, receiving, or both */
193 send_count = recv_count = 0;
194 if (tmp_rank < tmp_peer) {
195 send_index = recv_index + mask;
196 for (i = send_index ; i < last_index ; ++i) {
197 send_count += tmp_rcounts[i];
199 for (i = recv_index ; i < send_index ; ++i) {
200 recv_count += tmp_rcounts[i];
203 recv_index = send_index + mask;
204 for (i = send_index ; i < recv_index ; ++i) {
205 send_count += tmp_rcounts[i];
207 for (i = recv_index ; i < last_index ; ++i) {
208 recv_count += tmp_rcounts[i];
212 /* actual data transfer. Send from result_buf,
213 receive into recv_buf */
/* Both guards below require BOTH counts non-zero (assuming rcounts are
   non-negative — TODO confirm): data must flow in each direction on this
   step. The irecv here is matched by the wait further down, which is
   gated on the same condition. */
214 if (send_count > 0 && recv_count != 0) {
215 request=smpi_mpi_irecv(recv_buf + (ptrdiff_t)tmp_disps[recv_index] * extent,
216 recv_count, dtype, peer,
217 COLL_TAG_REDUCE_SCATTER,
219 if (MPI_SUCCESS != err) {
220 xbt_free(tmp_rcounts);
225 if (recv_count > 0 && send_count != 0) {
226 smpi_mpi_send(result_buf + (ptrdiff_t)tmp_disps[send_index] * extent,
227 send_count, dtype, peer,
228 COLL_TAG_REDUCE_SCATTER,
230 if (MPI_SUCCESS != err) {
231 xbt_free(tmp_rcounts);
236 if (send_count > 0 && recv_count != 0) {
237 smpi_mpi_wait(&request, MPI_STATUS_IGNORE);
240 /* if we received something on this step, push it into
241 the results buffer */
242 if (recv_count > 0) {
244 recv_buf + (ptrdiff_t)tmp_disps[recv_index] * extent,
245 result_buf + (ptrdiff_t)tmp_disps[recv_index] * extent,
246 &recv_count, &dtype);
249 /* update for next iteration */
250 send_index = recv_index;
251 last_index = recv_index + mask;
255 /* copy local results from results buffer into real receive buffer */
256 if (0 != rcounts[rank]) {
257 err = smpi_datatype_copy(result_buf + disps[rank] * extent,
258 rcounts[rank], dtype,
259 rbuf, rcounts[rank], dtype);
260 if (MPI_SUCCESS != err) {
261 xbt_free(tmp_rcounts);
267 xbt_free(tmp_rcounts);
271 /* Now fix up the non-power of two case, by having the odd
272 procs send the even procs the proper results */
273 if (rank < (2 * remain)) {
274 if ((rank & 1) == 0) {
276 smpi_mpi_recv(rbuf, rcounts[rank], dtype, rank + 1,
277 COLL_TAG_REDUCE_SCATTER,
278 comm, MPI_STATUS_IGNORE);
281 if (rcounts[rank - 1]) {
282 smpi_mpi_send(result_buf + disps[rank - 1] * extent,
283 rcounts[rank - 1], dtype, rank - 1,
284 COLL_TAG_REDUCE_SCATTER,
/* Cleanup: the 'cleanup' label targeted by the gotos above is on an
   elided line; release the temporaries on both success and error paths. */
291 if (NULL != disps) xbt_free(disps);
292 if (NULL != recv_buf_free) xbt_free(recv_buf_free);
293 if (NULL != result_buf_free) xbt_free(result_buf_free);
298 /* copied function (with appropriate renaming) ends here */
302 * smpi_coll_tuned_reduce_scatter_ompi_ring
304 * Function: Ring algorithm for reduce_scatter operation
305 * Accepts: Same as MPI_Reduce_scatter()
306 * Returns: MPI_SUCCESS or error code
308 * Description: Implements ring algorithm for reduce_scatter:
309 * the block sizes defined in rcounts are exchanged and
310 * updated until they reach proper destination.
311 * Algorithm requires 2 * max(rcounts) extra buffering
313 * Limitations: The algorithm DOES NOT preserve order of operations so it
314 * can be used only for commutative operations.
315 * Example on 5 nodes:
318 * [00] [10] -> [20] [30] [40]
319 * [01] [11] [21] -> [31] [41]
320 * [02] [12] [22] [32] -> [42]
321 * -> [03] [13] [23] [33] [43] --> ..
322 * [04] -> [14] [24] [34] [44]
325 * Step 0: rank r sends block (r-1) to rank (r+1) and
326 * receives block (r+1) from rank (r-1) [with wraparound].
328 * [00] [10] [10+20] -> [30] [40]
329 * [01] [11] [21] [21+31] -> [41]
330 * -> [02] [12] [22] [32] [32+42] -->..
331 * [43+03] -> [13] [23] [33] [43]
332 * [04] [04+14] -> [24] [34] [44]
336 * [00] [10] [10+20] [10+20+30] -> [40]
337 * -> [01] [11] [21] [21+31] [21+31+41] ->
338 * [32+42+02] -> [12] [22] [32] [32+42]
339 * [03] [43+03+13] -> [23] [33] [43]
340 * [04] [04+14] [04+14+24] -> [34] [44]
344 * -> [00] [10] [10+20] [10+20+30] [10+20+30+40] ->
345 * [21+31+41+01]-> [11] [21] [21+31] [21+31+41]
346 * [32+42+02] [32+42+02+12]-> [22] [32] [32+42]
347 * [03] [43+03+13] [43+03+13+23]-> [33] [43]
348 * [04] [04+14] [04+14+24] [04+14+24+34] -> [44]
352 * [10+20+30+40+00] [10] [10+20] [10+20+30] [10+20+30+40]
353 * [21+31+41+01] [21+31+41+01+11] [21] [21+31] [21+31+41]
354 * [32+42+02] [32+42+02+12] [32+42+02+12+22] [32] [32+42]
355 * [03] [43+03+13] [43+03+13+23] [43+03+13+23+33] [43]
356 * [04] [04+14] [04+14+24] [04+14+24+34] [04+14+24+34+44]
/*
 * reduce_scatter via a ring (commutative operations only — see the
 * limitation note in the comment block above).
 *
 * Each rank forwards partial blocks around the ring, folding its own
 * contribution in as blocks pass by; after size-1 steps each rank holds
 * the fully reduced block it owns (see the worked 5-node example above).
 * Two alternating bounce buffers (inbuf[0]/inbuf[1]) overlap the irecv of
 * one block with computation on the previous one.
 *
 * NOTE(review): this listing embeds the original file's line numbers and
 * elides lines (remaining parameters, closing braces, the size==1 early
 * return, the initialisation and flipping of 'inbi', the 'error_hndl'
 * label, the final return). Confirm each elided spot against the full
 * file before editing.
 */
361 smpi_coll_tuned_reduce_scatter_ompi_ring(void *sbuf, void *rbuf, int *rcounts,
/* Locals: 'line' records __LINE__ for the error report; reqs[] holds the
   two in-flight receive requests for the double-buffering scheme. */
367 int ret, line, rank, size, i, k, recv_from, send_to, total_count, max_block_count;
368 int inbi, *displs = NULL;
369 char *tmpsend = NULL, *tmprecv = NULL, *accumbuf = NULL, *accumbuf_free = NULL;
370 char *inbuf_free[2] = {NULL, NULL}, *inbuf[2] = {NULL, NULL};
371 ptrdiff_t true_lb, true_extent, lb, extent, max_real_segsize;
372 MPI_Request reqs[2] = {NULL, NULL};
374 size = smpi_comm_size(comm);
375 rank = smpi_comm_rank(comm);
377 XBT_DEBUG( "coll:tuned:reduce_scatter_ompi_ring rank %d, size %d",
380 /* Determine the maximum number of elements per node,
381 corresponding block size, and displacements array.
383 displs = (int*) xbt_malloc(size * sizeof(int));
384 if (NULL == displs) { ret = -1; line = __LINE__; goto error_hndl; }
/* Prefix-sum rcounts into displs; track the largest single block so one
   bounce buffer fits any incoming block.
   NOTE(review): displs[0] = 0 is presumably set on an elided line — the
   loop below starts writing at displs[1]. */
386 total_count = rcounts[0];
387 max_block_count = rcounts[0];
388 for (i = 1; i < size; i++) {
389 displs[i] = total_count;
390 total_count += rcounts[i];
391 if (max_block_count < rcounts[i]) max_block_count = rcounts[i];
394 /* Special case for size == 1 */
396 if (MPI_IN_PLACE != sbuf) {
397 ret = smpi_datatype_copy((char*)sbuf, total_count, dtype, (char*)rbuf, total_count, dtype);
398 if (ret < 0) { line = __LINE__; goto error_hndl; }
404 /* Allocate and initialize temporary buffers, we need:
405 - a temporary buffer to perform reduction (size total_count) since
406 rbuf can be of rcounts[rank] size.
407 - up to two temporary buffers used for communication/computation overlap.
409 smpi_datatype_extent(dtype, &lb, &extent);
410 smpi_datatype_extent(dtype, &true_lb, &true_extent);
412 max_real_segsize = true_extent + (ptrdiff_t)(max_block_count - 1) * extent;
414 accumbuf_free = (char*)xbt_malloc(true_extent + (ptrdiff_t)(total_count - 1) * extent);
415 if (NULL == accumbuf_free) { ret = -1; line = __LINE__; goto error_hndl; }
/* Shift by -lb so datatype displacements index from the allocation start
   (same adjustment for both bounce buffers below). */
416 accumbuf = accumbuf_free - lb;
418 inbuf_free[0] = (char*)xbt_malloc(max_real_segsize);
419 if (NULL == inbuf_free[0]) { ret = -1; line = __LINE__; goto error_hndl; }
420 inbuf[0] = inbuf_free[0] - lb;
422 inbuf_free[1] = (char*)xbt_malloc(max_real_segsize);
423 if (NULL == inbuf_free[1]) { ret = -1; line = __LINE__; goto error_hndl; }
424 inbuf[1] = inbuf_free[1] - lb;
427 /* Handle MPI_IN_PLACE for size > 1 */
428 if (MPI_IN_PLACE == sbuf) {
/* All reduction work happens in accumbuf (rbuf may only be
   rcounts[rank] elements long); seed it with our send buffer. */
432 ret = smpi_datatype_copy((char*)sbuf, total_count, dtype, accumbuf, total_count, dtype);
433 if (ret < 0) { line = __LINE__; goto error_hndl; }
435 /* Computation loop */
438 For each of the remote nodes:
439 - post irecv for block (r-2) from (r-1) with wrap around
440 - send block (r-1) to (r+1)
441 - in loop for every step k = 2 .. n
442 - post irecv for block (r - 1 + n - k) % n
443 - wait on block (r + n - k) % n to arrive
444 - compute on block (r + n - k ) % n
445 - send block (r + n - k) % n
447 - compute on block (r)
448 - copy block (r) to rbuf
449 Note that we must be careful when computing the beginning of buffers and
450 for send operations and computation we must compute the exact block size.
452 send_to = (rank + 1) % size;
453 recv_from = (rank + size - 1) % size;
456 /* Initialize first receive from the neighbor on the left */
/* NOTE(review): 'inbi' is read here before any visible assignment — its
   initialisation (inbi = 0 upstream) must be on an elided line. */
457 reqs[inbi]=smpi_mpi_irecv(inbuf[inbi], max_block_count, dtype, recv_from,
458 COLL_TAG_REDUCE_SCATTER, comm
460 tmpsend = accumbuf + (ptrdiff_t)displs[recv_from] * extent;
461 smpi_mpi_send(tmpsend, rcounts[recv_from], dtype, send_to,
462 COLL_TAG_REDUCE_SCATTER,
/* Steady state: receive into one bounce buffer while reducing/forwarding
   the block that arrived in the other.
   NOTE(review): the buffer flip (inbi ^= 0x1) at the top of each
   iteration appears to be on an elided line — the ^ 0x1 indexing below
   depends on it. */
465 for (k = 2; k < size; k++) {
466 const int prevblock = (rank + size - k) % size;
470 /* Post irecv for the current block */
471 reqs[inbi]=smpi_mpi_irecv(inbuf[inbi], max_block_count, dtype, recv_from,
472 COLL_TAG_REDUCE_SCATTER, comm
475 /* Wait on previous block to arrive */
476 smpi_mpi_wait(&reqs[inbi ^ 0x1], MPI_STATUS_IGNORE);
478 /* Apply operation on previous block: result goes to rbuf
479 rbuf[prevblock] = inbuf[inbi ^ 0x1] (op) rbuf[prevblock]
481 tmprecv = accumbuf + (ptrdiff_t)displs[prevblock] * extent;
482 smpi_op_apply(op, inbuf[inbi ^ 0x1], tmprecv, &(rcounts[prevblock]), &dtype);
484 /* send previous block to send_to */
485 smpi_mpi_send(tmprecv, rcounts[prevblock], dtype, send_to,
486 COLL_TAG_REDUCE_SCATTER,
490 /* Wait on the last block to arrive */
491 smpi_mpi_wait(&reqs[inbi], MPI_STATUS_IGNORE);
493 /* Apply operation on the last block (my block)
494 rbuf[rank] = inbuf[inbi] (op) rbuf[rank] */
495 tmprecv = accumbuf + (ptrdiff_t)displs[rank] * extent;
496 smpi_op_apply(op, inbuf[inbi], tmprecv, &(rcounts[rank]), &dtype);
498 /* Copy result from tmprecv to rbuf */
499 ret = smpi_datatype_copy(tmprecv, rcounts[rank], dtype, (char*)rbuf, rcounts[rank], dtype);
500 if (ret < 0) { line = __LINE__; goto error_hndl; }
/* Success path: release temporaries (return statement elided below). */
502 if (NULL != displs) xbt_free(displs);
503 if (NULL != accumbuf_free) xbt_free(accumbuf_free);
504 if (NULL != inbuf_free[0]) xbt_free(inbuf_free[0]);
505 if (NULL != inbuf_free[1]) xbt_free(inbuf_free[1]);
/* error_hndl: (label on an elided line) — report where we failed, then
   release whatever was allocated before the failure. */
510 XBT_DEBUG( "%s:%4d\tRank %d Error occurred %d\n",
511 __FILE__, line, rank, ret);
512 if (NULL != displs) xbt_free(displs);
513 if (NULL != accumbuf_free) xbt_free(accumbuf_free);
514 if (NULL != inbuf_free[0]) xbt_free(inbuf_free[0]);
515 if (NULL != inbuf_free[1]) xbt_free(inbuf_free[1]);