1 #include "colls_private.h"
2 /* IMPLEMENTED BY PITCH PATARASUK
3 Non-topology-specific (however, the number of cores per node needs to be changed)
4 all-reduce operation designed for SMP clusters
5 It uses 2-layer communication: binomial for intra-communication
6 and rdb for inter-communication*/
8 /* change the number of cores per SMP node
9 we assume that the number of cores per node will be the same for all implementations */
15 Use -DMPICH2 if this code does not compile.
16 MPICH1 code also works on MPICH2 on our cluster and the performance is similar.
17 This code assumes a commutative and associative reduce operator (MPI_SUM, MPI_MAX, etc.).
20 //#include <star-reduction.c>
23 This function performs the all-reduce operation as follows.
24 1) binomial_tree reduce inside each SMP node
25 2) Recursive doubling inter-communication between the roots of the SMP nodes
26 3) binomial_tree bcast inside each SMP node
28 int smpi_coll_tuned_allreduce_smp_rdb(void *send_buf, void *recv_buf, int count,
29 MPI_Datatype dtype, MPI_Op op,
34 int tag = COLL_TAG_ALLREDUCE;
37 int num_core = NUM_CORE;
39 #ifdef MPICH2_REDUCTION
40 MPI_User_function * uop = MPIR_Op_table[op % 16 - 1];
42 MPI_User_function *uop;
43 struct MPIR_OP *op_ptr;
44 op_ptr = MPIR_ToPointer(op);
48 comm_size = smpi_comm_size(comm);
49 rank = smpi_comm_rank(comm);
51 extent = smpi_datatype_get_extent(dtype);
52 tmp_buf = (void *) xbt_malloc(count * extent);
54 /* compute intra and inter ranking */
55 int intra_rank, inter_rank;
56 intra_rank = rank % num_core;
57 inter_rank = rank / num_core;
59 /* size of processes participate in intra communications =>
60 should be equal to number of machines */
61 int inter_comm_size = (comm_size + num_core - 1) / num_core;
63 /* copy input buffer to output buffer */
64 smpi_mpi_sendrecv(send_buf, count, dtype, rank, tag,
65 recv_buf, count, dtype, rank, tag, comm, &status);
67 /* start binomial reduce intra communication inside each SMP node */
69 while (mask < num_core) {
70 if ((mask & intra_rank) == 0) {
71 src = (inter_rank * num_core) + (intra_rank | mask);
72 if (src < comm_size) {
73 smpi_mpi_recv(tmp_buf, count, dtype, src, tag, comm, &status);
74 smpi_op_apply(op, tmp_buf, recv_buf, &count, &dtype);
77 dst = (inter_rank * num_core) + (intra_rank & (~mask));
78 smpi_mpi_send(recv_buf, count, dtype, dst, tag, comm);
82 } /* end binomial reduce intra-communication */
85 /* start rdb (recursive doubling) all-reduce inter-communication
86 between each SMP nodes : each node only have one process that can communicate
88 if (intra_rank == 0) {
90 /* find nearest power-of-two less than or equal to inter_comm_size */
91 int pof2, rem, newrank, newdst;
93 while (pof2 <= inter_comm_size)
96 rem = inter_comm_size - pof2;
98 /* In the non-power-of-two case, all even-numbered
99 processes of rank < 2*rem send their data to
100 (rank+1). These even-numbered processes no longer
101 participate in the algorithm until the very end.
103 if (inter_rank < 2 * rem) {
104 if (inter_rank % 2 == 0) {
105 dst = rank + num_core;
106 smpi_mpi_send(recv_buf, count, dtype, dst, tag, comm);
109 src = rank - num_core;
110 smpi_mpi_recv(tmp_buf, count, dtype, src, tag, comm, &status);
111 smpi_op_apply(op, tmp_buf, recv_buf, &count, &dtype);
112 newrank = inter_rank / 2;
115 newrank = inter_rank - rem;
118 /* example inter-communication RDB rank change algorithm
119 0,4,8,12..36 <= true rank (assume 4 core per SMP)
120 0123 4567 89 <= inter_rank
121 1 3 4567 89 (1,3 got data from 0,2 : 0,2 will be idle until the end)
123 0 1 2345 67 => newrank
128 while (mask < pof2) {
129 newdst = newrank ^ mask;
130 /* find real rank of dest */
131 dst = (newdst < rem) ? newdst * 2 + 1 : newdst + rem;
134 /* exchange data in rdb manner */
135 smpi_mpi_sendrecv(recv_buf, count, dtype, dst, tag, tmp_buf, count, dtype,
136 dst, tag, comm, &status);
137 smpi_op_apply(op, tmp_buf, recv_buf, &count, &dtype);
143 left-over processes (all even ranks: < 2 * rem) get the result
145 if (inter_rank < 2 * rem) {
146 if (inter_rank % 2) {
147 smpi_mpi_send(recv_buf, count, dtype, rank - num_core, tag, comm);
149 smpi_mpi_recv(recv_buf, count, dtype, rank + num_core, tag, comm, &status);
154 /* start binomial broadcast intra-communication inside each SMP nodes */
155 int num_core_in_current_smp = num_core;
156 if (inter_rank == (inter_comm_size - 1)) {
157 num_core_in_current_smp = comm_size - (inter_rank * num_core);
160 while (mask < num_core_in_current_smp) {
161 if (intra_rank & mask) {
162 src = (inter_rank * num_core) + (intra_rank - mask);
163 smpi_mpi_recv(recv_buf, count, dtype, src, tag, comm, &status);
171 dst = (inter_rank * num_core) + (intra_rank + mask);
172 if (dst < comm_size) {
173 smpi_mpi_send(recv_buf, count, dtype, dst, tag, comm);