/* selector for collective algorithms based on mpich decision logic */

/* Copyright (c) 2009-2020. The SimGrid Team.
 * All rights reserved. */

/* This program is free software; you can redistribute it and/or modify it
 * under the terms of the license (GNU LGPL) which comes with this package. */

#include "colls_private.hpp"

#include <memory> // std::unique_ptr, used by scatter__mpich below

/* This is the default implementation of allreduce. The algorithm is:

   Algorithm: MPI_Allreduce

   For the heterogeneous case, we call MPI_Reduce followed by MPI_Bcast
   in order to meet the requirement that all processes must have the
   same result. For the homogeneous case, we use the following algorithms.

   For long messages and for builtin ops and if count >= pof2 (where
   pof2 is the nearest power-of-two less than or equal to the number
   of processes), we use Rabenseifner's algorithm (see
   http://www.hlrs.de/mpi/myreduce.html).
   This algorithm implements the allreduce in two steps: first a
   reduce-scatter, followed by an allgather. A recursive-halving
   algorithm (beginning with processes that are distance 1 apart) is
   used for the reduce-scatter, and a recursive doubling
   algorithm is used for the allgather. The non-power-of-two case is
   handled by dropping to the nearest lower power-of-two: the first
   few even-numbered processes send their data to their right neighbors
   (rank+1), and the reduce-scatter and allgather happen among the remaining
   power-of-two processes. At the end, the first few even-numbered
   processes get the result from their right neighbors.

   For the power-of-two case, the cost for the reduce-scatter is
   lgp.alpha + n.((p-1)/p).beta + n.((p-1)/p).gamma. The cost for the
   allgather is lgp.alpha + n.((p-1)/p).beta. Therefore, the
   total cost is:
   Cost = 2.lgp.alpha + 2.n.((p-1)/p).beta + n.((p-1)/p).gamma

   For the non-power-of-two case,
   Cost = (2.floor(lgp)+2).alpha + (2.((p-1)/p) + 2).n.beta + n.(1+(p-1)/p).gamma

   For short messages, for user-defined ops, and for count < pof2,
   we use a recursive doubling algorithm (similar to the one in
   MPI_Allgather). We use this algorithm in the case of user-defined ops
   because in this case derived datatypes are allowed, and the user
   could pass basic datatypes on one process and derived on another as
   long as the type maps are the same. Breaking up derived datatypes
   to do the reduce-scatter is tricky.

   Cost = lgp.alpha + n.lgp.beta + n.lgp.gamma

   Possible improvements:

   End Algorithm: MPI_Allreduce
*/

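/* Illustrative sketch (not used by the selector below): the two cost models
 * above written out as plain functions, so the long/short-message trade-off
 * can be evaluated for given alpha (latency), beta (per-byte transfer) and
 * gamma (per-byte reduction) values. The helper names are hypothetical and
 * exist only for this example. */
static inline int log2_floor(int p)
{
  int lg = 0;
  while (p > 1) {
    p >>= 1;
    lg++;
  }
  return lg;
}

static inline double allreduce_rabenseifner_cost(int p, double n, double alpha, double beta, double gamma)
{
  // power-of-two case: 2.lgp.alpha + 2.n.((p-1)/p).beta + n.((p-1)/p).gamma
  double frac = (p - 1.0) / p;
  return 2.0 * log2_floor(p) * alpha + 2.0 * n * frac * beta + n * frac * gamma;
}

static inline double allreduce_rdb_cost(int p, double n, double alpha, double beta, double gamma)
{
  // recursive doubling: lgp.alpha + n.lgp.beta + n.lgp.gamma
  int lgp = log2_floor(p);
  return lgp * alpha + n * lgp * beta + n * lgp * gamma;
}
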
int allreduce__mpich(const void *sbuf, void *rbuf, int count,
                     MPI_Datatype dtype, MPI_Op op, MPI_Comm comm)
{
    size_t dsize, block_dsize;
    int comm_size = comm->size();
    const size_t large_message = 2048; //MPIR_PARAM_ALLREDUCE_SHORT_MSG_SIZE

    dsize = dtype->size();
    block_dsize = dsize * count;

    /* MPICH uses SMP algorithms for all commutative ops now */
    if (not comm->is_smp_comm()) {
      if (comm->get_leaders_comm() == MPI_COMM_NULL) {
        comm->init_smp();
      }
      if (op->is_commutative())
        return allreduce__mvapich2_two_level(sbuf, rbuf, count, dtype, op, comm);
    }

    /* find nearest power-of-two less than or equal to comm_size */
    int pof2 = 1;
    while (pof2 <= comm_size) pof2 <<= 1;
    pof2 >>= 1;

    if (block_dsize > large_message && count >= pof2 && (op == MPI_OP_NULL || op->is_commutative())) {
      // long messages: Rabenseifner (reduce-scatter followed by allgather)
      return allreduce__rab_rdb(sbuf, rbuf, count, dtype, op, comm);
    }
    // for short ones and count < pof2: recursive doubling
    return allreduce__rdb(sbuf, rbuf, count, dtype, op, comm);
}

/* This is the default implementation of alltoall. The algorithm is:

   Algorithm: MPI_Alltoall

   We use four algorithms for alltoall. For short messages and
   (comm_size >= 8), we use the algorithm by Jehoshua Bruck et al,
   IEEE TPDS, Nov. 1997. It is a store-and-forward algorithm that
   takes lgp steps. Because of the extra communication, the bandwidth
   requirement is (n/2).lgp.beta.

   Cost = lgp.alpha + (n/2).lgp.beta

   where n is the total amount of data a process needs to send to all
   other processes.

   For medium size messages and (short messages for comm_size < 8), we
   use an algorithm that posts all irecvs and isends and then does a
   waitall. We scatter the order of sources and destinations among the
   processes, so that all processes don't try to send/recv to/from the
   same process at the same time.

   *** Modification: We post only a small number of isends and irecvs
   at a time and wait on them as suggested by Tony Ladd. ***
   *** See comments below about an additional modification that
   we may want to consider ***

   For long messages and power-of-two number of processes, we use a
   pairwise exchange algorithm, which takes p-1 steps. We
   calculate the pairs by using an exclusive-or algorithm:
           for (i=1; i<comm_size; i++)
               dest = rank ^ i;
   This algorithm doesn't work if the number of processes is not a power of
   two. For a non-power-of-two number of processes, we use an
   algorithm in which, in step i, each process receives from (rank-i)
   and sends to (rank+i).

   Cost = (p-1).alpha + n.beta

   where n is the total amount of data a process needs to send to all
   other processes.

   Possible improvements:

   End Algorithm: MPI_Alltoall
*/

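/* Illustrative sketch (not part of the selector): how the two long-message
 * schedules described above pick communication peers at step i (1 <= i < p).
 * The helper name is hypothetical. */
static inline void alltoall_long_message_peers(int rank, int comm_size, int i, int* src, int* dst)
{
  if ((comm_size & (comm_size - 1)) == 0) {
    // power-of-two: pairwise exchange, send to and receive from the same partner
    *src = *dst = rank ^ i;
  } else {
    // non-power-of-two: at step i, receive from (rank-i) and send to (rank+i), with wrap-around
    *src = (rank - i + comm_size) % comm_size;
    *dst = (rank + i) % comm_size;
  }
}
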
int alltoall__mpich(const void *sbuf, int scount,
                    MPI_Datatype sdtype,
                    void* rbuf, int rcount,
                    MPI_Datatype rdtype,
                    MPI_Comm comm)
{
    int communicator_size;
    size_t dsize, block_dsize;
    communicator_size = comm->size();

    unsigned int short_size  = 256;
    unsigned int medium_size = 32768;
    // short messages and comm_size >= 8 -> bruck

    // medium size messages and (short messages for comm_size < 8) ->
    // an algorithm that posts all irecvs and isends and then does a waitall

    // long messages and power-of-two number of processes ->
    // pairwise exchange algorithm

    // non-power-of-two number of processes -> an algorithm in which,
    // in step i, each process receives from (rank-i) and sends to (rank+i)

    dsize = sdtype->size();
    block_dsize = dsize * scount;

    if ((block_dsize < short_size) && (communicator_size >= 8)) {
        return alltoall__bruck(sbuf, scount, sdtype,
                               rbuf, rcount, rdtype,
                               comm);
    } else if (block_dsize < medium_size) {
        return alltoall__mvapich2_scatter_dest(sbuf, scount, sdtype,
                                               rbuf, rcount, rdtype,
                                               comm);
    } else if (communicator_size % 2) {
        return alltoall__pair(sbuf, scount, sdtype,
                              rbuf, rcount, rdtype,
                              comm);
    }
    return alltoall__ring(sbuf, scount, sdtype,
                          rbuf, rcount, rdtype,
                          comm);
}

int alltoallv__mpich(const void *sbuf, const int *scounts, const int *sdisps,
                     MPI_Datatype sdtype,
                     void *rbuf, const int *rcounts, const int *rdisps,
                     MPI_Datatype rdtype,
                     MPI_Comm comm)
{
    /* For starters, just keep the original algorithm. */
    return alltoallv__bruck(sbuf, scounts, sdisps, sdtype,
                            rbuf, rcounts, rdisps, rdtype,
                            comm);
}

int barrier__mpich(MPI_Comm comm)
{
    return barrier__ompi_bruck(comm);
}

/* This is the default implementation of broadcast. The algorithm is:

   Algorithm: MPI_Bcast

   For short messages, we use a binomial tree algorithm.
   Cost = lgp.alpha + n.lgp.beta

   For long messages, we do a scatter followed by an allgather.
   We first scatter the buffer using a binomial tree algorithm. This costs
   lgp.alpha + n.((p-1)/p).beta
   If the datatype is contiguous and the communicator is homogeneous,
   we treat the data as bytes and divide (scatter) it among processes
   by using ceiling division. For the noncontiguous or heterogeneous
   cases, we first pack the data into a temporary buffer by using
   MPI_Pack, scatter it as bytes, and unpack it after the allgather.

   For the allgather, we use a recursive doubling algorithm for
   medium-size messages and power-of-two number of processes. This
   takes lgp steps. In each step pairs of processes exchange all the
   data they have (we take care of non-power-of-two situations). This
   costs approximately lgp.alpha + n.((p-1)/p).beta. (Approximately
   because it may be slightly more in the non-power-of-two case, but
   it's still a logarithmic algorithm.) Therefore, for long messages
   Total Cost = 2.lgp.alpha + 2.n.((p-1)/p).beta

   Note that this algorithm has twice the latency of the tree algorithm
   we use for short messages, but requires lower bandwidth: 2.n.beta
   versus n.lgp.beta. Therefore, for long messages and when lgp > 2,
   this algorithm will perform better.

   For long messages and for medium-size messages and non-power-of-two
   processes, we use a ring algorithm for the allgather, which
   takes p-1 steps, because it performs better than recursive doubling.
   Total Cost = (lgp+p-1).alpha + 2.n.((p-1)/p).beta

   Possible improvements:
   For clusters of SMPs, we may want to do something differently to
   take advantage of shared memory on each node.

   End Algorithm: MPI_Bcast
*/

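/* Illustrative sketch (not used by the decision function below): the two
 * cost models above as plain functions, so the short/long-message trade-off
 * can be compared for a given alpha (latency) and beta (per-byte transfer
 * time). Function names are hypothetical. */
static inline double bcast_binomial_cost(int p, double n, double alpha, double beta)
{
  int lgp = 0;
  while ((1 << (lgp + 1)) <= p)
    lgp++; // lgp = floor(log2(p))
  return lgp * alpha + n * lgp * beta; // lgp.alpha + n.lgp.beta
}

static inline double bcast_scatter_allgather_cost(int p, double n, double alpha, double beta)
{
  int lgp = 0;
  while ((1 << (lgp + 1)) <= p)
    lgp++;
  // scatter + recursive-doubling allgather: 2.lgp.alpha + 2.n.((p-1)/p).beta
  return 2.0 * lgp * alpha + 2.0 * n * ((p - 1.0) / p) * beta;
}
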
int bcast__mpich(void *buff, int count,
                 MPI_Datatype datatype, int root,
                 MPI_Comm comm)
{
    /* Decision function based on MX results for
       messages up to 36MB and communicator sizes up to 64 nodes */
    const size_t small_message_size = 12288;
    const size_t intermediate_message_size = 524288;

    int communicator_size;
    size_t message_size, dsize;

    if (not comm->is_smp_comm()) {
      if (comm->get_leaders_comm() == MPI_COMM_NULL) {
        comm->init_smp();
      }
      if (comm->is_uniform())
        return bcast__SMP_binomial(buff, count, datatype, root, comm);
    }

    communicator_size = comm->size();

    /* else we need data size for decision function */
    dsize = datatype->size();
    message_size = dsize * (unsigned long)count; /* needed for decision */

    /* Handle messages of small and intermediate size, and
       single-element broadcasts */
    if ((message_size < small_message_size) || (communicator_size <= 8)) {
      /* Binomial without segmentation */
      return bcast__binomial_tree(buff, count, datatype, root, comm);
    } else if (message_size < intermediate_message_size && !(communicator_size % 2)) {
      // SplittedBinary with 1KB segments
      return bcast__scatter_rdb_allgather(buff, count, datatype, root, comm);
    }
    // Handle large message sizes
    return bcast__scatter_LR_allgather(buff, count, datatype, root, comm);
}

/* This is the default implementation of reduce. The algorithm is:

   Algorithm: MPI_Reduce

   For long messages and for builtin ops and if count >= pof2 (where
   pof2 is the nearest power-of-two less than or equal to the number
   of processes), we use Rabenseifner's algorithm (see
   http://www.hlrs.de/organization/par/services/models/mpi/myreduce.html).
   This algorithm implements the reduce in two steps: first a
   reduce-scatter, followed by a gather to the root. A
   recursive-halving algorithm (beginning with processes that are
   distance 1 apart) is used for the reduce-scatter, and a binomial tree
   algorithm is used for the gather. The non-power-of-two case is
   handled by dropping to the nearest lower power-of-two: the first
   few odd-numbered processes send their data to their left neighbors
   (rank-1), and the reduce-scatter happens among the remaining
   power-of-two processes. If the root is one of the excluded
   processes, then after the reduce-scatter, rank 0 sends its result to
   the root and exits; the root now acts as rank 0 in the binomial tree
   algorithm for gather.

   For the power-of-two case, the cost for the reduce-scatter is
   lgp.alpha + n.((p-1)/p).beta + n.((p-1)/p).gamma. The cost for the
   gather to root is lgp.alpha + n.((p-1)/p).beta. Therefore, the
   total cost is:
   Cost = 2.lgp.alpha + 2.n.((p-1)/p).beta + n.((p-1)/p).gamma

   For the non-power-of-two case, assuming the root is not one of the
   odd-numbered processes that get excluded in the reduce-scatter,
   Cost = (2.floor(lgp)+1).alpha + (2.((p-1)/p) + 1).n.beta +
          n.(1+(p-1)/p).gamma

   For short messages, for user-defined ops, and for count < pof2,
   we use a binomial tree algorithm.

   Cost = lgp.alpha + n.lgp.beta + n.lgp.gamma

   We use the binomial tree algorithm in the case of user-defined ops
   because in this case derived datatypes are allowed, and the user
   could pass basic datatypes on one process and derived on another as
   long as the type maps are the same. Breaking up derived datatypes
   to do the reduce-scatter is tricky.

   FIXME: Per the MPI-2.1 standard this case is not possible. We
   should be able to use the reduce-scatter/gather approach as long as
   count >= pof2. [goodell@ 2009-01-21]

   Possible improvements:

   End Algorithm: MPI_Reduce
*/

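/* Illustrative sketch (not called anywhere in this file): how the
 * non-power-of-two case described above remaps ranks before the
 * reduce-scatter. Among the first 2*rem ranks (rem = p - pof2), odd ranks
 * hand their data to the left neighbor (rank-1) and sit out until the final
 * gather; the survivors are renumbered 0..pof2-1. The helper name and the
 * "-1 means excluded" convention are made up for this example. */
static inline int reduce_scatter_phase_newrank(int rank, int comm_size, int pof2)
{
  int rem = comm_size - pof2;
  if (rank < 2 * rem) {
    if (rank % 2 != 0)
      return -1;        // odd rank: sends its data to (rank-1), excluded from the reduce-scatter
    else
      return rank / 2;  // even rank: absorbs (rank+1)'s data and keeps participating
  }
  return rank - rem;    // ranks >= 2*rem keep participating, shifted down by rem
}
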
int reduce__mpich(const void *sendbuf, void *recvbuf,
                  int count, MPI_Datatype datatype,
                  MPI_Op op, int root, MPI_Comm comm)
{
    int communicator_size = 0;
    size_t message_size, dsize;

    if (not comm->is_smp_comm()) {
      if (comm->get_leaders_comm() == MPI_COMM_NULL) {
        comm->init_smp();
      }
      if (op->is_commutative())
        return reduce__mvapich2_two_level(sendbuf, recvbuf, count, datatype, op, root, comm);
    }

    communicator_size = comm->size();

    /* need data size for decision function */
    dsize = datatype->size();
    message_size = dsize * count; /* needed for decision */

    /* find nearest power-of-two less than or equal to communicator_size */
    int pof2 = 1;
    while (pof2 <= communicator_size) pof2 <<= 1;
    pof2 >>= 1;

    if ((count < pof2) || (message_size < 2048) || (op != MPI_OP_NULL && not op->is_commutative())) {
      return reduce__binomial(sendbuf, recvbuf, count, datatype, op, root, comm);
    }
    return reduce__scatter_gather(sendbuf, recvbuf, count, datatype, op, root, comm);
}

/* This is the default implementation of reduce_scatter. The algorithm is:

   Algorithm: MPI_Reduce_scatter

   If the operation is commutative, for short and medium-size
   messages, we use a recursive-halving
   algorithm in which the first p/2 processes send the second n/2 data
   to their counterparts in the other half and receive the first n/2
   data from them. This procedure continues recursively, halving the
   data communicated at each step, for a total of lgp steps. If the
   number of processes is not a power-of-two, we convert it to the
   nearest lower power-of-two by having the first few even-numbered
   processes send their data to the neighboring odd-numbered process
   at (rank+1). Those odd-numbered processes compute the result for
   their left neighbor as well in the recursive halving algorithm, and
   then at the end send the result back to the processes that didn't
   participate.
   Therefore, if p is a power-of-two,
   Cost = lgp.alpha + n.((p-1)/p).beta + n.((p-1)/p).gamma
   If p is not a power-of-two,
   Cost = (floor(lgp)+2).alpha + n.(1+(p-1+n)/p).beta + n.(1+(p-1)/p).gamma
   The above cost in the non power-of-two case is approximate because
   there is some imbalance in the amount of work each process does
   because some processes do the work of their neighbors as well.

   For commutative operations and very long messages, we use
   a pairwise exchange algorithm similar to
   the one used in MPI_Alltoall. At step i, each process sends n/p
   amount of data to (rank+i) and receives n/p amount of data from
   (rank-i).
   Cost = (p-1).alpha + n.((p-1)/p).beta + n.((p-1)/p).gamma

   If the operation is not commutative, we do the following:

   We use a recursive doubling algorithm, which
   takes lgp steps. At step 1, processes exchange (n-n/p) amount of
   data; at step 2, (n-2n/p) amount of data; at step 3, (n-4n/p)
   amount of data, and so forth.

   Cost = lgp.alpha + n.(lgp-(p-1)/p).beta + n.(lgp-(p-1)/p).gamma

   Possible improvements:

   End Algorithm: MPI_Reduce_scatter
*/

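/* Illustrative sketch (not used by the decision code): total data volume a
 * process sends during the recursive-halving phase described above, for a
 * power-of-two p. The amount halves at each of the lgp steps
 * (n/2 + n/4 + ... + n/p), which sums to n.((p-1)/p) -- the beta term of the
 * cost formula. The function name is hypothetical. */
static inline double recursive_halving_bytes_sent(double n, int p)
{
  double total = 0.0;
  for (int half = 2; half <= p; half *= 2)
    total += n / half; // step 1 sends n/2, step 2 sends n/4, ..., last step sends n/p
  return total;        // equals n.((p-1)/p) when p is a power of two
}
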
int reduce_scatter__mpich(const void *sbuf, void *rbuf,
                          const int *rcounts,
                          MPI_Datatype dtype,
                          MPI_Op op,
                          MPI_Comm comm)
{
    int comm_size, i;
    size_t total_message_size;

    if (sbuf == rbuf) sbuf = MPI_IN_PLACE; // restore MPI_IN_PLACE as these algorithms handle it

    XBT_DEBUG("Coll_reduce_scatter_mpich::reduce");

    comm_size = comm->size();
    // We need data size for decision function
    total_message_size = 0;
    for (i = 0; i < comm_size; i++) {
      total_message_size += rcounts[i];
    }

    if ((op == MPI_OP_NULL || op->is_commutative()) && total_message_size > 524288) {
      return reduce_scatter__mpich_pair(sbuf, rbuf, rcounts, dtype, op, comm);
    } else if (op != MPI_OP_NULL && not op->is_commutative()) {
      bool is_block_regular = true;
      for (i = 0; i < (comm_size - 1); ++i) {
        if (rcounts[i] != rcounts[i + 1]) {
          is_block_regular = false;
          break;
        }
      }

      /* slightly retask pof2 to mean pof2 equal or greater, not always greater as it is above */
      int pof2 = 1;
      while (pof2 < comm_size)
        pof2 <<= 1;

      if (pof2 == comm_size && is_block_regular) {
        /* noncommutative, pof2 size, and block regular */
        return reduce_scatter__mpich_noncomm(sbuf, rbuf, rcounts, dtype, op, comm);
      }

      return reduce_scatter__mpich_rdb(sbuf, rbuf, rcounts, dtype, op, comm);
    }
    return reduce_scatter__mpich_rdb(sbuf, rbuf, rcounts, dtype, op, comm);
}

/* This is the default implementation of allgather. The algorithm is:

   Algorithm: MPI_Allgather

   For short messages and non-power-of-two no. of processes, we use
   the algorithm from the Jehoshua Bruck et al IEEE TPDS Nov 97
   paper. It is a variant of the dissemination algorithm for
   barrier. It takes ceiling(lg p) steps.

   Cost = lgp.alpha + n.((p-1)/p).beta
   where n is total size of data gathered on each process.

   For short or medium-size messages and power-of-two no. of
   processes, we use the recursive doubling algorithm.

   Cost = lgp.alpha + n.((p-1)/p).beta

   TODO: On TCP, we may want to use recursive doubling instead of the Bruck
   algorithm in all cases because of the pairwise-exchange property of
   recursive doubling (see Benson et al paper in Euro PVM/MPI
   2003).

   It is interesting to note that either of the above algorithms for
   MPI_Allgather has the same cost as the tree algorithm for MPI_Gather!

   For long messages or medium-size messages and non-power-of-two
   no. of processes, we use a ring algorithm. In the first step, each
   process i sends its contribution to process i+1 and receives
   the contribution from process i-1 (with wrap-around). From the
   second step onwards, each process i forwards to process i+1 the
   data it received from process i-1 in the previous step. This takes
   a total of p-1 steps.

   Cost = (p-1).alpha + n.((p-1)/p).beta

   We use this algorithm instead of recursive doubling for long
   messages because we find that this communication pattern (nearest
   neighbor) performs twice as fast as recursive doubling for long
   messages (on Myrinet and IBM SP).

   Possible improvements:

   End Algorithm: MPI_Allgather
*/

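/* Illustrative sketch (not used by the selector): which block moves at each
 * of the p-1 steps of the ring allgather described above, assuming block i
 * is the contribution of process i. Process `rank` always sends to rank+1
 * and receives from rank-1; at step 1 it sends its own block, afterwards it
 * forwards the block it received at the previous step. Helper names are
 * hypothetical. */
static inline int ring_allgather_send_block(int rank, int p, int step /* 1..p-1 */)
{
  return (rank - step + 1 + p) % p;
}

static inline int ring_allgather_recv_block(int rank, int p, int step /* 1..p-1 */)
{
  return (rank - step + p) % p;
}
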
int allgather__mpich(const void *sbuf, int scount,
                     MPI_Datatype sdtype,
                     void* rbuf, int rcount,
                     MPI_Datatype rdtype,
                     MPI_Comm comm)
{
    int communicator_size, pow2_size;
    size_t dsize, total_dsize;

    communicator_size = comm->size();

    /* Determine complete data size */
    dsize = sdtype->size();
    total_dsize = dsize * scount * communicator_size;

    /* pow2_size: smallest power of two >= communicator_size */
    for (pow2_size = 1; pow2_size < communicator_size; pow2_size <<= 1)
      ; /* empty */

    /* Decision as in MPICH-2,
       presented in Thakur et al., "Optimization of Collective Communication
       Operations in MPICH", International Journal of High Performance Computing
       Applications, Vol. 19, No. 1, 49-66 (2005):
       - for power-of-two processes and small and medium size messages
         (up to 512KB), use recursive doubling,
       - for non-power-of-two processes and small messages (up to 80KB), use bruck,
       - for everything else, use ring. */
    if ((pow2_size == communicator_size) && (total_dsize < 524288)) {
      return allgather__rdb(sbuf, scount, sdtype, rbuf, rcount, rdtype, comm);
    } else if (total_dsize <= 81920) {
      return allgather__bruck(sbuf, scount, sdtype, rbuf, rcount, rdtype, comm);
    }
    return allgather__ring(sbuf, scount, sdtype, rbuf, rcount, rdtype, comm);
}

/* This is the default implementation of allgatherv. The algorithm is:

   Algorithm: MPI_Allgatherv

   For short messages and non-power-of-two no. of processes, we use
   the algorithm from the Jehoshua Bruck et al IEEE TPDS Nov 97
   paper. It is a variant of the dissemination algorithm for
   barrier. It takes ceiling(lg p) steps.

   Cost = lgp.alpha + n.((p-1)/p).beta
   where n is total size of data gathered on each process.

   For short or medium-size messages and power-of-two no. of
   processes, we use the recursive doubling algorithm.

   Cost = lgp.alpha + n.((p-1)/p).beta

   TODO: On TCP, we may want to use recursive doubling instead of the Bruck
   algorithm in all cases because of the pairwise-exchange property of
   recursive doubling (see Benson et al paper in Euro PVM/MPI
   2003).

   For long messages or medium-size messages and non-power-of-two
   no. of processes, we use a ring algorithm. In the first step, each
   process i sends its contribution to process i+1 and receives
   the contribution from process i-1 (with wrap-around). From the
   second step onwards, each process i forwards to process i+1 the
   data it received from process i-1 in the previous step. This takes
   a total of p-1 steps.

   Cost = (p-1).alpha + n.((p-1)/p).beta

   Possible improvements:

   End Algorithm: MPI_Allgatherv
*/

int allgatherv__mpich(const void *sbuf, int scount,
                      MPI_Datatype sdtype,
                      void* rbuf, const int *rcounts,
                      const int *rdispls,
                      MPI_Datatype rdtype,
                      MPI_Comm comm)
{
    int communicator_size, pow2_size, i;
    size_t total_dsize;

    communicator_size = comm->size();

    /* Determine complete data size */
    total_dsize = 0;
    for (i = 0; i < communicator_size; i++)
      total_dsize += rcounts[i];
    if (total_dsize == 0)
      return MPI_SUCCESS;

    /* pow2_size: smallest power of two >= communicator_size */
    for (pow2_size = 1; pow2_size < communicator_size; pow2_size <<= 1)
      ; /* empty */

    if ((pow2_size == communicator_size) && (total_dsize < 524288)) {
      return allgatherv__mpich_rdb(sbuf, scount, sdtype, rbuf, rcounts, rdispls, rdtype, comm);
    } else if (total_dsize <= 81920) {
      return allgatherv__ompi_bruck(sbuf, scount, sdtype, rbuf, rcounts, rdispls, rdtype, comm);
    }
    return allgatherv__mpich_ring(sbuf, scount, sdtype, rbuf, rcounts, rdispls, rdtype, comm);
}

/* This is the default implementation of gather. The algorithm is:

   Algorithm: MPI_Gather

   We use a binomial tree algorithm for both short and long
   messages. At nodes other than leaf nodes we need to allocate a
   temporary buffer to store the incoming message. If the root is not
   rank 0, for very small messages, we pack it into a temporary
   contiguous buffer and reorder it to be placed in the right
   order. For small (but not very small) messages, we use a derived
   datatype to unpack the incoming data into non-contiguous buffers in
   the right order. In the heterogeneous case we first pack the
   buffers by using MPI_Pack and then do the gather.

   Cost = lgp.alpha + n.((p-1)/p).beta
   where n is the total size of the data gathered at the root.

   Possible improvements:

   End Algorithm: MPI_Gather
*/

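/* Illustrative sketch (not called by the selector): the parent of a process
 * in the binomial gather tree described above, using ranks relative to the
 * root (relative_rank = (rank - root + p) % p). A process forwards its
 * accumulated data as soon as its lowest set bit is reached; clearing that
 * bit gives the parent. The helper name is hypothetical. */
static inline int binomial_gather_parent(int relative_rank, int comm_size)
{
  for (int mask = 1; mask < comm_size; mask <<= 1) {
    if (relative_rank & mask)
      return relative_rank - mask; // parent in relative numbering
  }
  return -1; // relative rank 0 is the root and has no parent
}
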
int gather__mpich(const void *sbuf, int scount,
                  MPI_Datatype sdtype,
                  void* rbuf, int rcount,
                  MPI_Datatype rdtype,
                  int root, MPI_Comm comm)
{
    return gather__ompi_binomial(sbuf, scount, sdtype,
                                 rbuf, rcount, rdtype,
                                 root, comm);
}

/* This is the default implementation of scatter. The algorithm is:

   Algorithm: MPI_Scatter

   We use a binomial tree algorithm for both short and
   long messages. At nodes other than leaf nodes we need to allocate
   a temporary buffer to store the incoming message. If the root is
   not rank 0, we reorder the sendbuf in order of relative ranks by
   copying it into a temporary buffer, so that all the sends from the
   root are contiguous and in the right order. In the heterogeneous
   case, we first pack the buffer by using MPI_Pack and then do the
   scatter.

   Cost = lgp.alpha + n.((p-1)/p).beta
   where n is the total size of the data to be scattered from the root.

   Possible improvements:

   End Algorithm: MPI_Scatter
*/

int scatter__mpich(const void *sbuf, int scount,
                   MPI_Datatype sdtype,
                   void* rbuf, int rcount,
                   MPI_Datatype rdtype,
                   int root, MPI_Comm comm)
{
  std::unique_ptr<unsigned char[]> tmp_buf;
  if (comm->rank() != root) {
    // sbuf/scount/sdtype are only meaningful at the root; hand the underlying
    // binomial implementation a valid dummy buffer and matching count/type.
    tmp_buf.reset(new unsigned char[rcount * rdtype->get_extent()]);
    sbuf   = tmp_buf.get();
    scount = rcount;
    sdtype = rdtype;
  }
  return scatter__ompi_binomial(sbuf, scount, sdtype, rbuf, rcount, rdtype, root, comm);
}