Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
git checkout hypervisor
[simgrid.git] / src / smpi / colls / reduce-arrival-pattern-aware.c
1 #include "colls.h"
2 //#include <star-reduction.c>
3
4 int reduce_arrival_pattern_aware_segment_size_in_byte = 8192;
5
6 #ifndef HEADER_SIZE
7 #define HEADER_SIZE 1024
8 #endif
9
10 #ifndef MAX_NODE
11 #define MAX_NODE 1024
12 #endif
13
14 /* Non-topology-specific pipelined linear-reduce function */
15 int smpi_coll_tuned_reduce_arrival_pattern_aware(void *buf, void *rbuf,
16                                                  int count,
17                                                  MPI_Datatype datatype,
18                                                  MPI_Op op, int root,
19                                                  MPI_Comm comm)
20 {
21   int rank;
22   MPI_Comm_rank(comm, &rank);
23
24   int tag = 50;
25   MPI_Status status;
26   MPI_Request request;
27   MPI_Request *send_request_array;
28   MPI_Request *recv_request_array;
29   MPI_Status *send_status_array;
30   MPI_Status *recv_status_array;
31
32   MPI_Status temp_status_array[MAX_NODE];
33
34   int size;
35   int i;
36
37   int sent_count;
38   int header_index;
39   int flag_array[MAX_NODE];
40   int already_received[MAX_NODE];
41
42   int header_buf[HEADER_SIZE];
43   char temp_buf[MAX_NODE];
44
45   MPI_Aint extent;
46   MPI_Type_extent(datatype, &extent);
47
48   /* source and destination */
49   int to, from;
50
51   MPI_Comm_rank(MPI_COMM_WORLD, &rank);
52   MPI_Comm_size(MPI_COMM_WORLD, &size);
53
54
55   /* segment is segment size in number of elements (not bytes) */
56   int segment = reduce_arrival_pattern_aware_segment_size_in_byte / extent;
57
58   /* pipeline length */
59   int pipe_length = count / segment;
60
61   /* use for buffer offset for sending and receiving data = segment size in byte */
62   int increment = segment * extent;
63
64   /* if the input size is not divisible by segment size => 
65      the small remainder will be done with native implementation */
66   int remainder = count % segment;
67
68
69   /* value == 0 means root has not send data (or header) to the node yet */
70   for (i = 0; i < MAX_NODE; i++) {
71     already_received[i] = 0;
72   }
73
74   char *tmp_buf;
75   tmp_buf = (char *) malloc(count * extent);
76
77   MPI_Sendrecv(buf, count, datatype, rank, tag, rbuf, count, datatype, rank,
78                tag, comm, &status);
79
80
81
82   /* when a message is smaller than a block size => no pipeline */
83   if (count <= segment) {
84
85     if (rank == 0) {
86       sent_count = 0;
87
88       while (sent_count < (size - 1)) {
89
90         for (i = 1; i < size; i++) {
91           if (already_received[i] == 0)
92             MPI_Iprobe(i, MPI_ANY_TAG, MPI_COMM_WORLD, &flag_array[i],
93                        MPI_STATUSES_IGNORE);
94         }
95
96         header_index = 0;
97         /* recv 1-byte message */
98         for (i = 0; i < size; i++) {
99           if (i == rank)
100             continue;
101
102           /* 1-byte message arrive */
103           if ((flag_array[i] == 1) && (already_received[i] == 0)) {
104             MPI_Recv(temp_buf, 1, MPI_CHAR, i, tag, MPI_COMM_WORLD, &status);
105             header_buf[header_index] = i;
106             header_index++;
107             sent_count++;
108
109
110             //printf("root send to %d recv from %d : data = ",to,from);
111             /*
112                for (i=0;i<=header_index;i++) {
113                printf("%d ",header_buf[i]);
114                }
115                printf("\n");
116              */
117             /* will receive in the next step */
118             already_received[i] = 1;
119           }
120         }
121
122         /* send header followed by receive and reduce data */
123         if (header_index != 0) {
124           header_buf[header_index] = -1;
125           to = header_buf[0];
126           from = header_buf[header_index - 1];
127
128           MPI_Send(header_buf, HEADER_SIZE, MPI_INT, to, tag, comm);
129           MPI_Recv(tmp_buf, count, datatype, from, tag, comm, &status);
130           star_reduction(op, tmp_buf, rbuf, &count, &datatype);
131         }
132       }                         /* while loop */
133     }
134
135     /* root */
136     /* non-root */
137     else {
138
139       /* send 1-byte message to root */
140       MPI_Send(temp_buf, 1, MPI_CHAR, 0, tag, comm);
141
142       /* wait for header and data, forward when required */
143       MPI_Recv(header_buf, HEADER_SIZE, MPI_INT, MPI_ANY_SOURCE, tag, comm,
144                &status);
145       //      MPI_Recv(buf,count,datatype,MPI_ANY_SOURCE,tag,comm,&status);
146
147       /* search for where it is */
148       int myordering = 0;
149       while (rank != header_buf[myordering]) {
150         myordering++;
151       }
152
153       /* forward header */
154       if (header_buf[myordering + 1] != -1) {
155         MPI_Send(header_buf, HEADER_SIZE, MPI_INT, header_buf[myordering + 1],
156                  tag, comm);
157       }
158       //printf("node %d ordering %d\n",rank,myordering);
159
160       /* receive, reduce, and forward data */
161
162       /* send only */
163       if (myordering == 0) {
164         if (header_buf[myordering + 1] == -1) {
165           to = 0;
166         } else {
167           to = header_buf[myordering + 1];
168         }
169         MPI_Send(rbuf, count, datatype, to, tag, comm);
170       }
171
172       /* recv, reduce, send */
173       else {
174         if (header_buf[myordering + 1] == -1) {
175           to = 0;
176         } else {
177           to = header_buf[myordering + 1];
178         }
179         from = header_buf[myordering - 1];
180         MPI_Recv(tmp_buf, count, datatype, header_buf[myordering - 1], tag,
181                  comm, &status);
182         star_reduction(op, tmp_buf, rbuf, &count, &datatype);
183         MPI_Send(rbuf, count, datatype, to, tag, comm);
184       }
185     }                           /* non-root */
186   }
187   /* pipeline bcast */
188   else {
189     //    printf("node %d start\n",rank);
190
191     send_request_array =
192         (MPI_Request *) malloc((size + pipe_length) * sizeof(MPI_Request));
193     recv_request_array =
194         (MPI_Request *) malloc((size + pipe_length) * sizeof(MPI_Request));
195     send_status_array =
196         (MPI_Status *) malloc((size + pipe_length) * sizeof(MPI_Status));
197     recv_status_array =
198         (MPI_Status *) malloc((size + pipe_length) * sizeof(MPI_Status));
199
200     if (rank == 0) {
201       sent_count = 0;
202
203       int will_send[MAX_NODE];
204       for (i = 0; i < MAX_NODE; i++)
205         will_send[i] = 0;
206
207       /* loop until all data are received (sent) */
208       while (sent_count < (size - 1)) {
209         int k;
210         for (k = 0; k < 1; k++) {
211           for (i = 1; i < size; i++) {
212             //if (i == rank)
213             //continue;
214             if ((already_received[i] == 0) && (will_send[i] == 0)) {
215               MPI_Iprobe(i, MPI_ANY_TAG, MPI_COMM_WORLD, &flag_array[i],
216                          &temp_status_array[i]);
217               if (flag_array[i] == 1) {
218                 will_send[i] = 1;
219                 MPI_Recv(&temp_buf[i], 1, MPI_CHAR, i, tag, MPI_COMM_WORLD,
220                          &status);
221                 //printf("recv from %d\n",i);
222                 i = 1;
223               }
224             }
225           }
226         }                       /* end of probing */
227
228         header_index = 0;
229
230         /* recv 1-byte message */
231         for (i = 1; i < size; i++) {
232           //if (i==rank)
233           //continue;
234           /* message arrived in this round (put in the header) */
235           if ((will_send[i] == 1) && (already_received[i] == 0)) {
236             header_buf[header_index] = i;
237             header_index++;
238             sent_count++;
239
240             /* will send in the next step */
241             already_received[i] = 1;
242           }
243         }
244
245         /* send header followed by data */
246         if (header_index != 0) {
247           header_buf[header_index] = -1;
248           to = header_buf[0];
249
250           /* send header */
251           MPI_Send(header_buf, HEADER_SIZE, MPI_INT, to, tag, comm);
252
253           /* recv data - pipeline */
254           from = header_buf[header_index - 1];
255           for (i = 0; i < pipe_length; i++) {
256             MPI_Recv(tmp_buf + (i * increment), segment, datatype, from, tag,
257                      comm, &status);
258             star_reduction(op, tmp_buf + (i * increment),
259                            (char *)rbuf + (i * increment), &segment, &datatype);
260           }
261         }
262       }                         /* while loop (sent_count < size-1 ) */
263     }
264
265     /* root */
266     /* none root */
267     else {
268       /* send 1-byte message to root */
269       MPI_Send(temp_buf, 1, MPI_CHAR, 0, tag, comm);
270
271
272       /* wait for header forward when required */
273       MPI_Irecv(header_buf, HEADER_SIZE, MPI_INT, MPI_ANY_SOURCE, tag, comm,
274                 &request);
275       MPI_Wait(&request, MPI_STATUS_IGNORE);
276
277       /* search for where it is */
278       int myordering = 0;
279
280       while (rank != header_buf[myordering]) {
281         myordering++;
282       }
283
284       /* send header when required */
285       if (header_buf[myordering + 1] != -1) {
286         MPI_Send(header_buf, HEADER_SIZE, MPI_INT, header_buf[myordering + 1],
287                  tag, comm);
288       }
289
290       /* (receive, reduce), and send data */
291       if (header_buf[myordering + 1] == -1) {
292         to = 0;
293       } else {
294         to = header_buf[myordering + 1];
295       }
296
297       /* send only */
298       if (myordering == 0) {
299         for (i = 0; i < pipe_length; i++) {
300           MPI_Isend((char *)rbuf + (i * increment), segment, datatype, to, tag, comm,
301                     &send_request_array[i]);
302         }
303         MPI_Waitall((pipe_length), send_request_array, send_status_array);
304       }
305
306       /* receive, reduce, and send */
307       else {
308         from = header_buf[myordering - 1];
309         for (i = 0; i < pipe_length; i++) {
310           MPI_Irecv(tmp_buf + (i * increment), segment, datatype, from, tag,
311                     comm, &recv_request_array[i]);
312         }
313         for (i = 0; i < pipe_length; i++) {
314           MPI_Wait(&recv_request_array[i], MPI_STATUS_IGNORE);
315           star_reduction(op, tmp_buf + (i * increment), (char *)rbuf + (i * increment),
316                          &segment, &datatype);
317           MPI_Isend((char *)rbuf + (i * increment), segment, datatype, to, tag, comm,
318                     &send_request_array[i]);
319         }
320         MPI_Waitall((pipe_length), send_request_array, send_status_array);
321       }
322     }                           /* non-root */
323
324
325
326
327     free(send_request_array);
328     free(recv_request_array);
329     free(send_status_array);
330     free(recv_status_array);
331
332     //printf("node %d done\n",rank);
333   }                             /* end pipeline */
334
335
336   /* if root is not zero send root after finished
337      this can be modified to make it faster by using logical src, dst.
338    */
339   if (root != 0) {
340     if (rank == 0) {
341       MPI_Send(rbuf, count, datatype, root, tag, comm);
342     } else if (rank == root) {
343       MPI_Recv(rbuf, count, datatype, 0, tag, comm, &status);
344     }
345   }
346
347
348   /* when count is not divisible by block size, use default BCAST for the remainder */
349   if ((remainder != 0) && (count > segment)) {
350     MPI_Reduce((char *)buf + (pipe_length * increment),
351                (char *)rbuf + (pipe_length * increment), remainder, datatype, op, root,
352                comm);
353   }
354
355   free(tmp_buf);
356
357   return MPI_SUCCESS;
358 }