Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
6e829f86c85dee20f4743fe0ec3f519d75ba9c81
[simgrid.git] / src / smpi / colls / allreduce-smp-binomial-pipeline.c
1 #include "colls_private.h"
2 /* IMPLEMENTED BY PITCH PATARASUK 
3    Non-topoloty-specific (however, number of cores/node need to be changed) 
4    all-reduce operation designed for smp clusters
5    It uses 2-layer communication: binomial for both intra-communication 
6    inter-communication
7    The communication are done in a pipeline fashion */
8
9 /* change number of core per smp-node
10    we assume that number of core per process will be the same for all implementations */
11 #ifndef NUM_CORE
12 #define NUM_CORE 8
13 #endif
14
15 /* this is a default segment size for pipelining, 
16    but it is typically passed as a command line argument */
17 int allreduce_smp_binomial_pipeline_segment_size = 4096;
18
19 /* ** NOTE **
20    This code is modified from allreduce-smp-binomial.c by wrapping the code with pipeline effect as follow
21    for (loop over pipelength) {
22      smp-binomial main code;
23    }
24 */
25
26 /* ** NOTE **
27    Use -DMPICH2 if this code does not compile.
28    MPICH1 code also work on MPICH2 on our cluster and the performance are similar.
29    This code assume commutative and associative reduce operator (MPI_SUM, MPI_MAX, etc).
30 */
31
32 /*
33 This fucntion performs all-reduce operation as follow. ** in a pipeline fashion **
34 1) binomial_tree reduce inside each SMP node
35 2) binomial_tree reduce intra-communication between root of each SMP node
36 3) binomial_tree bcast intra-communication between root of each SMP node
37 4) binomial_tree bcast inside each SMP node
38 */
39 int smpi_coll_tuned_allreduce_smp_binomial_pipeline(void *send_buf,
40                                                     void *recv_buf, int count,
41                                                     MPI_Datatype dtype,
42                                                     MPI_Op op, MPI_Comm comm)
43 {
44   int comm_size, rank;
45   void *tmp_buf;
46   int tag = COLL_TAG_ALLREDUCE;
47   int mask, src, dst;
48   MPI_Status status;
49   int num_core = simcall_host_get_core(SIMIX_host_self());
50   // do we use the default one or the number of cores in the platform ?
51   // if the number of cores is one, the platform may be simulated with 1 node = 1 core
52   if (num_core == 1) num_core = NUM_CORE;
53
54   comm_size = smpi_comm_size(comm);
55   rank = smpi_comm_rank(comm);
56   MPI_Aint extent;
57   extent = smpi_datatype_get_extent(dtype);
58   tmp_buf = (void *) xbt_malloc(count * extent);
59
60   int intra_rank, inter_rank;
61   intra_rank = rank % num_core;
62   inter_rank = rank / num_core;
63
64   int phase;
65   int send_offset;
66   int recv_offset;
67   int pcount = allreduce_smp_binomial_pipeline_segment_size;
68   if (pcount > count) {
69     pcount = count;
70   }
71
72   /* size of processes participate in intra communications =>
73      should be equal to number of machines */
74   int inter_comm_size = (comm_size + num_core - 1) / num_core;
75
76   /* copy input buffer to output buffer */
77   smpi_mpi_sendrecv(send_buf, count, dtype, rank, tag,
78                recv_buf, count, dtype, rank, tag, comm, &status);
79
80   /* compute pipe length */
81   int pipelength;
82   pipelength = count / pcount;
83
84   /* pipelining over pipelength (+3 is because we have 4 stages:
85      reduce-intra, reduce-inter, bcast-inter, bcast-intra */
86   for (phase = 0; phase < pipelength + 3; phase++) {
87
88     /* start binomial reduce intra communication inside each SMP node */
89     if (phase < pipelength) {
90       mask = 1;
91       while (mask < num_core) {
92         if ((mask & intra_rank) == 0) {
93           src = (inter_rank * num_core) + (intra_rank | mask);
94           if (src < comm_size) {
95             recv_offset = phase * pcount * extent;
96             smpi_mpi_recv(tmp_buf, pcount, dtype, src, tag, comm, &status);
97             smpi_op_apply(op, tmp_buf, (char *)recv_buf + recv_offset, &pcount, &dtype);
98           }
99         } else {
100           send_offset = phase * pcount * extent;
101           dst = (inter_rank * num_core) + (intra_rank & (~mask));
102           smpi_mpi_send((char *)recv_buf + send_offset, pcount, dtype, dst, tag, comm);
103           break;
104         }
105         mask <<= 1;
106       }
107     }
108
109     /* start binomial reduce inter-communication between each SMP nodes: 
110        each node only have one process that can communicate to other nodes */
111     if ((phase > 0) && (phase < (pipelength + 1))) {
112       if (intra_rank == 0) {
113
114         mask = 1;
115         while (mask < inter_comm_size) {
116           if ((mask & inter_rank) == 0) {
117             src = (inter_rank | mask) * num_core;
118             if (src < comm_size) {
119               recv_offset = (phase - 1) * pcount * extent;
120               smpi_mpi_recv(tmp_buf, pcount, dtype, src, tag, comm, &status);
121               smpi_op_apply(op, tmp_buf, (char *)recv_buf + recv_offset, &pcount, &dtype);
122             }
123           } else {
124             dst = (inter_rank & (~mask)) * num_core;
125             send_offset = (phase - 1) * pcount * extent;
126             smpi_mpi_send((char *)recv_buf + send_offset, pcount, dtype, dst, tag, comm);
127             break;
128           }
129           mask <<= 1;
130         }
131       }
132     }
133
134     /* start binomial broadcast inter-communication between each SMP nodes: 
135        each node only have one process that can communicate to other nodes */
136     if ((phase > 1) && (phase < (pipelength + 2))) {
137       if (intra_rank == 0) {
138         mask = 1;
139         while (mask < inter_comm_size) {
140           if (inter_rank & mask) {
141             src = (inter_rank - mask) * num_core;
142             recv_offset = (phase - 2) * pcount * extent;
143             smpi_mpi_recv((char *)recv_buf + recv_offset, pcount, dtype, src, tag, comm,
144                      &status);
145             break;
146           }
147           mask <<= 1;
148         }
149         mask >>= 1;
150
151         while (mask > 0) {
152           if (inter_rank < inter_comm_size) {
153             dst = (inter_rank + mask) * num_core;
154             if (dst < comm_size) {
155               //printf("Node %d send to node %d when mask is %d\n", rank, dst, mask);
156               send_offset = (phase - 2) * pcount * extent;
157               smpi_mpi_send((char *)recv_buf + send_offset, pcount, dtype, dst, tag, comm);
158             }
159           }
160           mask >>= 1;
161         }
162       }
163     }
164
165     /* start binomial broadcast intra-communication inside each SMP nodes */
166     if (phase > 2) {
167       int num_core_in_current_smp = num_core;
168       if (inter_rank == (inter_comm_size - 1)) {
169         num_core_in_current_smp = comm_size - (inter_rank * num_core);
170       }
171       mask = 1;
172       while (mask < num_core_in_current_smp) {
173         if (intra_rank & mask) {
174           src = (inter_rank * num_core) + (intra_rank - mask);
175           recv_offset = (phase - 3) * pcount * extent;
176           smpi_mpi_recv((char *)recv_buf + recv_offset, pcount, dtype, src, tag, comm,
177                    &status);
178           break;
179         }
180         mask <<= 1;
181       }
182       mask >>= 1;
183
184       while (mask > 0) {
185         dst = (inter_rank * num_core) + (intra_rank + mask);
186         if (dst < comm_size) {
187           send_offset = (phase - 3) * pcount * extent;
188           smpi_mpi_send((char *)recv_buf + send_offset, pcount, dtype, dst, tag, comm);
189         }
190         mask >>= 1;
191       }
192     }
193   }                             // for phase
194
195   free(tmp_buf);
196   return MPI_SUCCESS;
197 }