Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
remove warning
[simgrid.git] / src / smpi / colls / allreduce-smp-binomial-pipeline.c
1 #include "colls_private.h"
2 /* IMPLEMENTED BY PITCH PATARASUK 
3    Non-topoloty-specific (however, number of cores/node need to be changed) 
4    all-reduce operation designed for smp clusters
5    It uses 2-layer communication: binomial for both intra-communication 
6    inter-communication
7    The communication are done in a pipeline fashion */
8
9 /* change number of core per smp-node
10    we assume that number of core per process will be the same for all implementations */
11 #ifndef NUM_CORE
12 #define NUM_CORE 8
13 #endif
14
15 /* this is a default segment size for pipelining, 
16    but it is typically passed as a command line argument */
17 int allreduce_smp_binomial_pipeline_segment_size = 4096;
18
19 /* ** NOTE **
20    This code is modified from allreduce-smp-binomial.c by wrapping the code with pipeline effect as follow
21    for (loop over pipelength) {
22      smp-binomial main code;
23    }
24 */
25
26 /* ** NOTE **
27    Use -DMPICH2 if this code does not compile.
28    MPICH1 code also work on MPICH2 on our cluster and the performance are similar.
29    This code assume commutative and associative reduce operator (MPI_SUM, MPI_MAX, etc).
30 */
31
32 /*
33 This fucntion performs all-reduce operation as follow. ** in a pipeline fashion **
34 1) binomial_tree reduce inside each SMP node
35 2) binomial_tree reduce intra-communication between root of each SMP node
36 3) binomial_tree bcast intra-communication between root of each SMP node
37 4) binomial_tree bcast inside each SMP node
38 */
39 int smpi_coll_tuned_allreduce_smp_binomial_pipeline(void *send_buf,
40                                                     void *recv_buf, int count,
41                                                     MPI_Datatype dtype,
42                                                     MPI_Op op, MPI_Comm comm)
43 {
44   int comm_size, rank;
45   void *tmp_buf;
46   int tag = 50;
47   int mask, src, dst;
48   MPI_Status status;
49   int num_core = NUM_CORE;
50
51   comm_size = smpi_comm_size(comm);
52   rank = smpi_comm_rank(comm);
53   MPI_Aint extent;
54   extent = smpi_datatype_get_extent(dtype);
55   tmp_buf = (void *) xbt_malloc(count * extent);
56
57   int intra_rank, inter_rank;
58   intra_rank = rank % num_core;
59   inter_rank = rank / num_core;
60
61   int phase;
62   int send_offset;
63   int recv_offset;
64   int pcount = allreduce_smp_binomial_pipeline_segment_size;
65   if (pcount > count) {
66     pcount = count;
67   }
68
69   /* size of processes participate in intra communications =>
70      should be equal to number of machines */
71   int inter_comm_size = (comm_size + num_core - 1) / num_core;
72
73   /* copy input buffer to output buffer */
74   smpi_mpi_sendrecv(send_buf, count, dtype, rank, tag,
75                recv_buf, count, dtype, rank, tag, comm, &status);
76
77   /* compute pipe length */
78   int pipelength;
79   pipelength = count / pcount;
80
81   /* pipelining over pipelength (+3 is because we have 4 stages:
82      reduce-intra, reduce-inter, bcast-inter, bcast-intra */
83   for (phase = 0; phase < pipelength + 3; phase++) {
84
85     /* start binomial reduce intra communication inside each SMP node */
86     if (phase < pipelength) {
87       mask = 1;
88       while (mask < num_core) {
89         if ((mask & intra_rank) == 0) {
90           src = (inter_rank * num_core) + (intra_rank | mask);
91           if (src < comm_size) {
92             recv_offset = phase * pcount * extent;
93             smpi_mpi_recv(tmp_buf, pcount, dtype, src, tag, comm, &status);
94             smpi_op_apply(op, tmp_buf, (char *)recv_buf + recv_offset, &pcount, &dtype);
95           }
96         } else {
97           send_offset = phase * pcount * extent;
98           dst = (inter_rank * num_core) + (intra_rank & (~mask));
99           smpi_mpi_send((char *)recv_buf + send_offset, pcount, dtype, dst, tag, comm);
100           break;
101         }
102         mask <<= 1;
103       }
104     }
105
106     /* start binomial reduce inter-communication between each SMP nodes: 
107        each node only have one process that can communicate to other nodes */
108     if ((phase > 0) && (phase < (pipelength + 1))) {
109       if (intra_rank == 0) {
110
111         mask = 1;
112         while (mask < inter_comm_size) {
113           if ((mask & inter_rank) == 0) {
114             src = (inter_rank | mask) * num_core;
115             if (src < comm_size) {
116               recv_offset = (phase - 1) * pcount * extent;
117               smpi_mpi_recv(tmp_buf, pcount, dtype, src, tag, comm, &status);
118               smpi_op_apply(op, tmp_buf, (char *)recv_buf + recv_offset, &pcount, &dtype);
119             }
120           } else {
121             dst = (inter_rank & (~mask)) * num_core;
122             send_offset = (phase - 1) * pcount * extent;
123             smpi_mpi_send((char *)recv_buf + send_offset, pcount, dtype, dst, tag, comm);
124             break;
125           }
126           mask <<= 1;
127         }
128       }
129     }
130
131     /* start binomial broadcast inter-communication between each SMP nodes: 
132        each node only have one process that can communicate to other nodes */
133     if ((phase > 1) && (phase < (pipelength + 2))) {
134       if (intra_rank == 0) {
135         mask = 1;
136         while (mask < inter_comm_size) {
137           if (inter_rank & mask) {
138             src = (inter_rank - mask) * num_core;
139             recv_offset = (phase - 2) * pcount * extent;
140             smpi_mpi_recv((char *)recv_buf + recv_offset, pcount, dtype, src, tag, comm,
141                      &status);
142             break;
143           }
144           mask <<= 1;
145         }
146         mask >>= 1;
147
148         while (mask > 0) {
149           if (inter_rank < inter_comm_size) {
150             dst = (inter_rank + mask) * num_core;
151             if (dst < comm_size) {
152               //printf("Node %d send to node %d when mask is %d\n", rank, dst, mask);
153               send_offset = (phase - 2) * pcount * extent;
154               smpi_mpi_send((char *)recv_buf + send_offset, pcount, dtype, dst, tag, comm);
155             }
156           }
157           mask >>= 1;
158         }
159       }
160     }
161
162     /* start binomial broadcast intra-communication inside each SMP nodes */
163     if (phase > 2) {
164       int num_core_in_current_smp = num_core;
165       if (inter_rank == (inter_comm_size - 1)) {
166         num_core_in_current_smp = comm_size - (inter_rank * num_core);
167       }
168       mask = 1;
169       while (mask < num_core_in_current_smp) {
170         if (intra_rank & mask) {
171           src = (inter_rank * num_core) + (intra_rank - mask);
172           recv_offset = (phase - 3) * pcount * extent;
173           smpi_mpi_recv((char *)recv_buf + recv_offset, pcount, dtype, src, tag, comm,
174                    &status);
175           break;
176         }
177         mask <<= 1;
178       }
179       mask >>= 1;
180
181       while (mask > 0) {
182         dst = (inter_rank * num_core) + (intra_rank + mask);
183         if (dst < comm_size) {
184           send_offset = (phase - 3) * pcount * extent;
185           smpi_mpi_send((char *)recv_buf + send_offset, pcount, dtype, dst, tag, comm);
186         }
187         mask >>= 1;
188       }
189     }
190   }                             // for phase
191
192   free(tmp_buf);
193   return MPI_SUCCESS;
194 }