#include "colls_private.h"
/* IMPLEMENTED BY PITCH PATARASUK
   Non-topology-specific (however, the number of cores per node needs to be set)
   all-reduce operation designed for SMP clusters.
   It uses 2-layer communication: binomial trees for both the intra-node
   and the inter-node communication.
   The communication is done in a pipelined fashion. */
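
/* Illustrative pipeline schedule, assuming pipelength = 4: each segment
   s0..s3 flows through the four stages in consecutive phases, so the
   stages of different segments overlap in time:

     phase:         0    1    2    3    4    5    6
     reduce-intra   s0   s1   s2   s3
     reduce-inter        s0   s1   s2   s3
     bcast-inter              s0   s1   s2   s3
     bcast-intra                   s0   s1   s2   s3
*/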

/* Change the number of cores per SMP node here; we assume the number of
   cores per node is the same across all nodes and implementations. */
#ifndef NUM_CORE
#define NUM_CORE 8
#endif

/* This is a default segment size for pipelining,
   but it is typically passed as a command line argument. */
int allreduce_smp_binomial_pipeline_segment_size = 4096;
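
/* Note that the segment size is a count of datatype elements, not bytes:
   with the default of 4096 elements, each pipelined message carries
   32 KiB when dtype is MPI_DOUBLE. */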

/* ** NOTE **
   This code is modified from allreduce-smp-binomial.c by wrapping the code
   with a pipeline effect, as follows:
   for (loop over pipelength) {
     smp-binomial main code;
   }
*/

/* ** NOTE **
   Use -DMPICH2 if this code does not compile.
   MPICH1 code also works on MPICH2 on our cluster, and the performance is
   similar.
   This code assumes a commutative and associative reduce operator
   (MPI_SUM, MPI_MAX, etc.).
*/

#ifndef MPICH2
extern void *MPIR_ToPointer();

struct MPIR_OP {
  MPI_User_function *op;
  int commute, permanent;
};

#else
extern MPI_User_function *MPIR_Op_table[];
#endif

/*
  This function performs an all-reduce operation as follows, in a pipelined
  fashion:
  1) binomial-tree reduce inside each SMP node
  2) binomial-tree reduce between the roots of the SMP nodes (inter-node)
  3) binomial-tree bcast between the roots of the SMP nodes (inter-node)
  4) binomial-tree bcast inside each SMP node
*/
int smpi_coll_tuned_allreduce_smp_binomial_pipeline(void *send_buf,
                                                    void *recv_buf, int count,
                                                    MPI_Datatype dtype,
                                                    MPI_Op op, MPI_Comm comm)
{
  int comm_size, rank;
  void *tmp_buf;
  int tag = COLL_TAG_ALLREDUCE;
  int mask, src, dst;
  MPI_Status status;
  int num_core = NUM_CORE;

  /* resolve the user reduce function behind the MPI_Op handle */
  MPI_User_function *uop;
#ifndef MPICH2
  struct MPIR_OP *op_ptr = MPIR_ToPointer(op);
  uop = (MPI_User_function *) op_ptr->op;
#else
  uop = MPIR_Op_table[op % 16 - 1];
#endif

  comm_size = smpi_comm_size(comm);
  rank = smpi_comm_rank(comm);

  MPI_Aint extent;
  extent = smpi_datatype_get_extent(dtype);
  tmp_buf = (void *) xbt_malloc(count * extent);

  int intra_rank, inter_rank;
  intra_rank = rank % num_core;
  inter_rank = rank / num_core;
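
  /* Example: with num_core = 4, rank 13 becomes intra_rank = 1 on SMP
     node inter_rank = 3; ranks 0, 4, 8, ... act as the node roots. */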

  int phase;
  int send_offset;
  int recv_offset;
  int pcount = allreduce_smp_binomial_pipeline_segment_size;
  if (pcount > count) {
    pcount = count;
  }

  /* number of processes taking part in the inter-node communication =>
     should be equal to the number of machines */
  int inter_comm_size = (comm_size + num_core - 1) / num_core;
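
  /* Example: comm_size = 10 with num_core = 4 gives inter_comm_size = 3,
     with only 2 processes on the last node; the intra-node broadcast
     below accounts for this via num_core_in_current_smp. */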

  /* copy input buffer to output buffer */
  smpi_mpi_sendrecv(send_buf, count, dtype, rank, tag,
                    recv_buf, count, dtype, rank, tag, comm, &status);
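
  /* The self send/recv above amounts to a local copy of the full input
     into recv_buf, which then serves as the working buffer for every
     pipeline segment. */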

  /* compute pipe length */
  int pipelength;
  pipelength = count / pcount;
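
  /* Note: the division truncates, so a remainder of count % pcount
     elements is left unprocessed by the phases below. */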

  /* pipelining over pipelength (+3 because there are 4 stages:
     reduce-intra, reduce-inter, bcast-inter, bcast-intra) */
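  /* Stage windows: in phase p, reduce-intra handles segment p while
     p < pipelength; the later stages lag by 1, 2, and 3 phases and thus
     operate on segments p-1, p-2, and p-3, which is where the
     (phase - k) * pcount * extent offsets below come from. */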
  for (phase = 0; phase < pipelength + 3; phase++) {

    /* start binomial reduce intra communication inside each SMP node */
    if (phase < pipelength) {
      mask = 1;
      while (mask < num_core) {
        if ((mask & intra_rank) == 0) {
          /* receive a partial segment from the partner and reduce it in */
          src = (inter_rank * num_core) + (intra_rank | mask);
          if (src < comm_size) {
            recv_offset = phase * pcount * extent;
            smpi_mpi_recv(tmp_buf, pcount, dtype, src, tag, comm, &status);
            (*uop) (tmp_buf, (char *) recv_buf + recv_offset, &pcount, &dtype);
          }
        } else {
          /* send the partial segment up the tree, then drop out */
          send_offset = phase * pcount * extent;
          dst = (inter_rank * num_core) + (intra_rank & (~mask));
          smpi_mpi_send((char *) recv_buf + send_offset, pcount, dtype, dst, tag, comm);
          break;
        }
        mask <<= 1;
      }
    }
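
    /* In the intra-node binomial reduce above, a rank whose intra_rank
       has the current mask bit set sends its partial segment to the
       partner obtained by clearing that bit and then leaves the loop;
       intra_rank 0 ends up holding the node-local reduction. */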

    /* start binomial reduce inter-communication between SMP nodes:
       only the root process of each node (intra_rank == 0) takes part */
    if ((phase > 0) && (phase < (pipelength + 1))) {
      if (intra_rank == 0) {
        mask = 1;
        while (mask < inter_comm_size) {
          if ((mask & inter_rank) == 0) {
            src = (inter_rank | mask) * num_core;
            if (src < comm_size) {
              recv_offset = (phase - 1) * pcount * extent;
              smpi_mpi_recv(tmp_buf, pcount, dtype, src, tag, comm, &status);
              (*uop) (tmp_buf, (char *) recv_buf + recv_offset, &pcount, &dtype);
            }
          } else {
            dst = (inter_rank & (~mask)) * num_core;
            send_offset = (phase - 1) * pcount * extent;
            smpi_mpi_send((char *) recv_buf + send_offset, pcount, dtype, dst, tag, comm);
            break;
          }
          mask <<= 1;
        }
      }
    }
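
    /* After the inter-node reduce of phase p, the global root (rank 0)
       holds the fully reduced segment p-1, ready to be broadcast back. */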

    /* start binomial broadcast inter-communication between SMP nodes:
       only the root process of each node (intra_rank == 0) takes part */
    if ((phase > 1) && (phase < (pipelength + 2))) {
      if (intra_rank == 0) {
        mask = 1;
        while (mask < inter_comm_size) {
          if (inter_rank & mask) {
            src = (inter_rank - mask) * num_core;
            recv_offset = (phase - 2) * pcount * extent;
            smpi_mpi_recv((char *) recv_buf + recv_offset, pcount, dtype, src, tag, comm,
                          &status);
            break;
          }
          mask <<= 1;
        }
        mask >>= 1;

        while (mask > 0) {
          if (inter_rank < inter_comm_size) {
            dst = (inter_rank + mask) * num_core;
            if (dst < comm_size) {
              send_offset = (phase - 2) * pcount * extent;
              smpi_mpi_send((char *) recv_buf + send_offset, pcount, dtype, dst, tag, comm);
            }
          }
          mask >>= 1;
        }
      }
    }
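
    /* The inter-node broadcast mirrors the reduce: a root receives its
       segment once from the partner below it, then walks the mask back
       down, forwarding the segment to the partners above it. */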

    /* start binomial broadcast intra-communication inside each SMP node */
    if (phase > 2) {
      /* the last SMP node may hold fewer than num_core processes */
      int num_core_in_current_smp = num_core;
      if (inter_rank == (inter_comm_size - 1)) {
        num_core_in_current_smp = comm_size - (inter_rank * num_core);
      }
      mask = 1;
      while (mask < num_core_in_current_smp) {
        if (intra_rank & mask) {
          src = (inter_rank * num_core) + (intra_rank - mask);
          recv_offset = (phase - 3) * pcount * extent;
          smpi_mpi_recv((char *) recv_buf + recv_offset, pcount, dtype, src, tag, comm,
                        &status);
          break;
        }
        mask <<= 1;
      }
      mask >>= 1;

      while (mask > 0) {
        dst = (inter_rank * num_core) + (intra_rank + mask);
        if (dst < comm_size) {
          send_offset = (phase - 3) * pcount * extent;
          smpi_mpi_send((char *) recv_buf + send_offset, pcount, dtype, dst, tag, comm);
        }
        mask >>= 1;
      }
    }
  }                             /* for phase */

  free(tmp_buf);
  return MPI_SUCCESS;
}