Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
adapt two collectives of starmpi to avoid timing issues, by using only smpi calls...
[simgrid.git] / src / smpi / colls / allreduce-smp-binomial-pipeline.c
1 #include "colls.h"
2 /* IMPLEMENTED BY PITCH PATARASUK 
3    Non-topoloty-specific (however, number of cores/node need to be changed) 
4    all-reduce operation designed for smp clusters
5    It uses 2-layer communication: binomial for both intra-communication 
6    inter-communication
7    The communication are done in a pipeline fashion */
8
9 /* change number of core per smp-node
10    we assume that number of core per process will be the same for all implementations */
11 #ifndef NUM_CORE
12 #define NUM_CORE 8
13 #endif
14
15 /* this is a default segment size for pipelining, 
16    but it is typically passed as a command line argument */
17 int allreduce_smp_binomial_pipeline_segment_size = 4096;
18
19 /* ** NOTE **
20    This code is modified from allreduce-smp-binomial.c by wrapping the code with pipeline effect as follow
21    for (loop over pipelength) {
22      smp-binomial main code;
23    }
24 */
25
26 /* ** NOTE **
27    Use -DMPICH2 if this code does not compile.
28    MPICH1 code also work on MPICH2 on our cluster and the performance are similar.
29    This code assume commutative and associative reduce operator (MPI_SUM, MPI_MAX, etc).
30 */
31
32 #ifndef MPICH2
33 extern void *MPIR_ToPointer();
34
35 struct MPIR_OP {
36   MPI_User_function *op;
37   int commute, permanent;
38 };
39
40 #else
41 extern MPI_User_function *MPIR_Op_table[];
42 #endif
43
44 /*
45 This fucntion performs all-reduce operation as follow. ** in a pipeline fashion **
46 1) binomial_tree reduce inside each SMP node
47 2) binomial_tree reduce intra-communication between root of each SMP node
48 3) binomial_tree bcast intra-communication between root of each SMP node
49 4) binomial_tree bcast inside each SMP node
50 */
51 int smpi_coll_tuned_allreduce_smp_binomial_pipeline(void *send_buf,
52                                                     void *recv_buf, int count,
53                                                     MPI_Datatype dtype,
54                                                     MPI_Op op, MPI_Comm comm)
55 {
56   int comm_size, rank;
57   void *tmp_buf;
58   int tag = 50;
59   int mask, src, dst;
60   MPI_Status status;
61   int num_core = NUM_CORE;
62
63   MPI_User_function *uop;
64 #ifndef MPICH2
65   struct MPIR_OP *op_ptr = MPIR_ToPointer(op);
66   uop = (MPI_User_function *) op_ptr->op;
67 #else
68   uop = MPIR_Op_table[op % 16 - 1];
69 #endif
70
71   MPI_Comm_size(comm, &comm_size);
72   MPI_Comm_rank(comm, &rank);
73   MPI_Aint extent;
74   MPI_Type_extent(dtype, &extent);
75   tmp_buf = (void *) malloc(count * extent);
76
77   int intra_rank, inter_rank;
78   intra_rank = rank % num_core;
79   inter_rank = rank / num_core;
80
81   int phase;
82   int send_offset;
83   int recv_offset;
84   int pcount = allreduce_smp_binomial_pipeline_segment_size;
85   if (pcount > count) {
86     pcount = count;
87   }
88
89   /* size of processes participate in intra communications =>
90      should be equal to number of machines */
91   int inter_comm_size = (comm_size + num_core - 1) / num_core;
92
93   /* copy input buffer to output buffer */
94   MPI_Sendrecv(send_buf, count, dtype, rank, tag,
95                recv_buf, count, dtype, rank, tag, comm, &status);
96
97   /* compute pipe length */
98   int pipelength;
99   pipelength = count / pcount;
100
101   /* pipelining over pipelength (+3 is because we have 4 stages:
102      reduce-intra, reduce-inter, bcast-inter, bcast-intra */
103   for (phase = 0; phase < pipelength + 3; phase++) {
104
105     /* start binomial reduce intra communication inside each SMP node */
106     if (phase < pipelength) {
107       mask = 1;
108       while (mask < num_core) {
109         if ((mask & intra_rank) == 0) {
110           src = (inter_rank * num_core) + (intra_rank | mask);
111           if (src < comm_size) {
112             recv_offset = phase * pcount * extent;
113             MPI_Recv(tmp_buf, pcount, dtype, src, tag, comm, &status);
114             (*uop) (tmp_buf, (char *)recv_buf + recv_offset, &pcount, &dtype);
115           }
116         } else {
117           send_offset = phase * pcount * extent;
118           dst = (inter_rank * num_core) + (intra_rank & (~mask));
119           MPI_Send((char *)recv_buf + send_offset, pcount, dtype, dst, tag, comm);
120           break;
121         }
122         mask <<= 1;
123       }
124     }
125
126     /* start binomial reduce inter-communication between each SMP nodes: 
127        each node only have one process that can communicate to other nodes */
128     if ((phase > 0) && (phase < (pipelength + 1))) {
129       if (intra_rank == 0) {
130
131         mask = 1;
132         while (mask < inter_comm_size) {
133           if ((mask & inter_rank) == 0) {
134             src = (inter_rank | mask) * num_core;
135             if (src < comm_size) {
136               recv_offset = (phase - 1) * pcount * extent;
137               MPI_Recv(tmp_buf, pcount, dtype, src, tag, comm, &status);
138               (*uop) (tmp_buf, (char *)recv_buf + recv_offset, &pcount, &dtype);
139             }
140           } else {
141             dst = (inter_rank & (~mask)) * num_core;
142             send_offset = (phase - 1) * pcount * extent;
143             MPI_Send((char *)recv_buf + send_offset, pcount, dtype, dst, tag, comm);
144             break;
145           }
146           mask <<= 1;
147         }
148       }
149     }
150
151     /* start binomial broadcast inter-communication between each SMP nodes: 
152        each node only have one process that can communicate to other nodes */
153     if ((phase > 1) && (phase < (pipelength + 2))) {
154       if (intra_rank == 0) {
155         mask = 1;
156         while (mask < inter_comm_size) {
157           if (inter_rank & mask) {
158             src = (inter_rank - mask) * num_core;
159             recv_offset = (phase - 2) * pcount * extent;
160             MPI_Recv((char *)recv_buf + recv_offset, pcount, dtype, src, tag, comm,
161                      &status);
162             break;
163           }
164           mask <<= 1;
165         }
166         mask >>= 1;
167
168         while (mask > 0) {
169           if (inter_rank < inter_comm_size) {
170             dst = (inter_rank + mask) * num_core;
171             if (dst < comm_size) {
172               //printf("Node %d send to node %d when mask is %d\n", rank, dst, mask);
173               send_offset = (phase - 2) * pcount * extent;
174               MPI_Send((char *)recv_buf + send_offset, pcount, dtype, dst, tag, comm);
175             }
176           }
177           mask >>= 1;
178         }
179       }
180     }
181
182     /* start binomial broadcast intra-communication inside each SMP nodes */
183     if (phase > 2) {
184       int num_core_in_current_smp = num_core;
185       if (inter_rank == (inter_comm_size - 1)) {
186         num_core_in_current_smp = comm_size - (inter_rank * num_core);
187       }
188       mask = 1;
189       while (mask < num_core_in_current_smp) {
190         if (intra_rank & mask) {
191           src = (inter_rank * num_core) + (intra_rank - mask);
192           recv_offset = (phase - 3) * pcount * extent;
193           MPI_Recv((char *)recv_buf + recv_offset, pcount, dtype, src, tag, comm,
194                    &status);
195           break;
196         }
197         mask <<= 1;
198       }
199       mask >>= 1;
200
201       while (mask > 0) {
202         dst = (inter_rank * num_core) + (intra_rank + mask);
203         if (dst < comm_size) {
204           send_offset = (phase - 3) * pcount * extent;
205           MPI_Send((char *)recv_buf + send_offset, pcount, dtype, dst, tag, comm);
206         }
207         mask >>= 1;
208       }
209     }
210   }                             // for phase
211
212   free(tmp_buf);
213   return MPI_SUCCESS;
214 }