Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
bd9906b1f2e44200f1b42dd934b3ded719348fcb
[simgrid.git] / src / smpi / colls / reduce / reduce-arrival-pattern-aware.cpp
1 /* Copyright (c) 2013-2019. The SimGrid Team.
2  * All rights reserved.                                                     */
3
4 /* This program is free software; you can redistribute it and/or modify it
5  * under the terms of the license (GNU LGPL) which comes with this package. */
6
7 #include "../colls_private.hpp"
8 //#include <star-reduction.c>
9
10 int reduce_arrival_pattern_aware_segment_size_in_byte = 8192;
11
12 #ifndef HEADER_SIZE
13 #define HEADER_SIZE 1024
14 #endif
15
16 #ifndef MAX_NODE
17 #define MAX_NODE 1024
18 #endif
19 namespace simgrid{
20 namespace smpi{
21 /* Non-topology-specific pipelined linear-reduce function */
22 int Coll_reduce_arrival_pattern_aware::reduce(const void *buf, void *rbuf,
23                                                  int count,
24                                                  MPI_Datatype datatype,
25                                                  MPI_Op op, int root,
26                                                  MPI_Comm comm)
27 {
28   int rank = comm->rank();
29   int tag = -COLL_TAG_REDUCE;
30   MPI_Status status;
31   MPI_Request request;
32
33   MPI_Status temp_status_array[MAX_NODE];
34
35   int size = comm->size();
36   int i;
37
38   int sent_count;
39   int header_index;
40   int flag_array[MAX_NODE];
41   int already_received[MAX_NODE];
42
43   int header_buf[HEADER_SIZE];
44   char temp_buf[MAX_NODE];
45
46   MPI_Aint extent, lb;
47   datatype->extent(&lb, &extent);
48
49   /* source and destination */
50   int to, from;
51
52   /* segment is segment size in number of elements (not bytes) */
53   int segment = reduce_arrival_pattern_aware_segment_size_in_byte / extent;
54
55   /* pipeline length */
56   int pipe_length = count / segment;
57
58   /* use for buffer offset for sending and receiving data = segment size in byte */
59   int increment = segment * extent;
60
61   /* if the input size is not divisible by segment size =>
62      the small remainder will be done with native implementation */
63   int remainder = count % segment;
64
65
66   /* value == 0 means root has not send data (or header) to the node yet */
67   for (i = 0; i < MAX_NODE; i++) {
68     already_received[i] = 0;
69   }
70
71   unsigned char* tmp_buf = smpi_get_tmp_sendbuffer(count * extent);
72
73   Request::sendrecv(buf, count, datatype, rank, tag, rbuf, count, datatype, rank,
74                tag, comm, &status);
75
76
77
78   /* when a message is smaller than a block size => no pipeline */
79   if (count <= segment) {
80
81     if (rank == 0) {
82       sent_count = 0;
83
84       while (sent_count < (size - 1)) {
85
86         for (i = 1; i < size; i++) {
87           if (already_received[i] == 0) {
88             Request::iprobe(i, MPI_ANY_TAG, comm, &flag_array[i],
89                              MPI_STATUSES_IGNORE);
90             simcall_process_sleep(0.0001);
91             }
92         }
93
94         header_index = 0;
95         /* recv 1-byte message */
96         for (i = 0; i < size; i++) {
97           if (i == rank)
98             continue;
99
100           /* 1-byte message arrive */
101           if ((flag_array[i] == 1) && (already_received[i] == 0)) {
102             Request::recv(temp_buf, 1, MPI_CHAR, i, tag, comm, &status);
103             header_buf[header_index] = i;
104             header_index++;
105             sent_count++;
106
107
108             //printf("root send to %d recv from %d : data = ",to,from);
109             /*
110                for (i=0;i<=header_index;i++) {
111                printf("%d ",header_buf[i]);
112                }
113                printf("\n");
114              */
115             /* will receive in the next step */
116             already_received[i] = 1;
117           }
118         }
119
120         /* send header followed by receive and reduce data */
121         if (header_index != 0) {
122           header_buf[header_index] = -1;
123           to = header_buf[0];
124           from = header_buf[header_index - 1];
125
126           Request::send(header_buf, HEADER_SIZE, MPI_INT, to, tag, comm);
127           Request::recv(tmp_buf, count, datatype, from, tag, comm, &status);
128           if(op!=MPI_OP_NULL) op->apply( tmp_buf, rbuf, &count, datatype);
129         }
130       }                         /* while loop */
131     }
132
133     /* root */
134     /* non-root */
135     else {
136
137       /* send 1-byte message to root */
138       Request::send(temp_buf, 1, MPI_CHAR, 0, tag, comm);
139
140       /* wait for header and data, forward when required */
141       Request::recv(header_buf, HEADER_SIZE, MPI_INT, MPI_ANY_SOURCE, tag, comm,
142                &status);
143       //      Request::recv(buf,count,datatype,MPI_ANY_SOURCE,tag,comm,&status);
144
145       /* search for where it is */
146       int myordering = 0;
147       while (rank != header_buf[myordering]) {
148         myordering++;
149       }
150
151       /* forward header */
152       if (header_buf[myordering + 1] != -1) {
153           Request::send(header_buf, HEADER_SIZE, MPI_INT, header_buf[myordering + 1],
154                  tag, comm);
155       }
156       //printf("node %d ordering %d\n",rank,myordering);
157
158       /* receive, reduce, and forward data */
159
160       /* send only */
161       if (myordering == 0) {
162         if (header_buf[myordering + 1] == -1) {
163           to = 0;
164         } else {
165           to = header_buf[myordering + 1];
166         }
167         Request::send(rbuf, count, datatype, to, tag, comm);
168       }
169
170       /* recv, reduce, send */
171       else {
172         if (header_buf[myordering + 1] == -1) {
173           to = 0;
174         } else {
175           to = header_buf[myordering + 1];
176         }
177         from = header_buf[myordering - 1];
178         Request::recv(tmp_buf, count, datatype, from, tag, comm, &status);
179         if(op!=MPI_OP_NULL) op->apply( tmp_buf, rbuf, &count, datatype);
180         Request::send(rbuf, count, datatype, to, tag, comm);
181       }
182     }                           /* non-root */
183   }
184   /* pipeline bcast */
185   else {
186     //    printf("node %d start\n",rank);
187
188     MPI_Request* send_request_array = new MPI_Request[size + pipe_length];
189     MPI_Request* recv_request_array = new MPI_Request[size + pipe_length];
190     MPI_Status* send_status_array   = new MPI_Status[size + pipe_length];
191     MPI_Status* recv_status_array   = new MPI_Status[size + pipe_length];
192
193     if (rank == 0) {
194       sent_count = 0;
195
196       int will_send[MAX_NODE];
197       for (i = 0; i < MAX_NODE; i++)
198         will_send[i] = 0;
199
200       /* loop until all data are received (sent) */
201       while (sent_count < (size - 1)) {
202         int k;
203         for (k = 0; k < 1; k++) {
204           for (i = 1; i < size; i++) {
205             //if (i == rank)
206             //continue;
207             if ((already_received[i] == 0) && (will_send[i] == 0)) {
208                 Request::iprobe(i, MPI_ANY_TAG, comm, &flag_array[i],
209                          &temp_status_array[i]);
210               if (flag_array[i] == 1) {
211                 will_send[i] = 1;
212                 Request::recv(&temp_buf[i], 1, MPI_CHAR, i, tag, comm,
213                          &status);
214                 //printf("recv from %d\n",i);
215                 i = 1;
216               }
217             }
218           }
219         }                       /* end of probing */
220
221         header_index = 0;
222
223         /* recv 1-byte message */
224         for (i = 1; i < size; i++) {
225           //if (i==rank)
226           //continue;
227           /* message arrived in this round (put in the header) */
228           if ((will_send[i] == 1) && (already_received[i] == 0)) {
229             header_buf[header_index] = i;
230             header_index++;
231             sent_count++;
232
233             /* will send in the next step */
234             already_received[i] = 1;
235           }
236         }
237
238         /* send header followed by data */
239         if (header_index != 0) {
240           header_buf[header_index] = -1;
241           to = header_buf[0];
242
243           /* send header */
244           Request::send(header_buf, HEADER_SIZE, MPI_INT, to, tag, comm);
245
246           /* recv data - pipeline */
247           from = header_buf[header_index - 1];
248           for (i = 0; i < pipe_length; i++) {
249             Request::recv(tmp_buf + (i * increment), segment, datatype, from, tag,
250                      comm, &status);
251             if(op!=MPI_OP_NULL) op->apply( tmp_buf + (i * increment),
252                            (char *)rbuf + (i * increment), &segment, datatype);
253           }
254         }
255       }                         /* while loop (sent_count < size-1 ) */
256     }
257
258     /* root */
259     /* none root */
260     else {
261       /* send 1-byte message to root */
262       Request::send(temp_buf, 1, MPI_CHAR, 0, tag, comm);
263
264
265       /* wait for header forward when required */
266       request=Request::irecv(header_buf, HEADER_SIZE, MPI_INT, MPI_ANY_SOURCE, tag, comm);
267       Request::wait(&request, MPI_STATUS_IGNORE);
268
269       /* search for where it is */
270       int myordering = 0;
271
272       while (rank != header_buf[myordering]) {
273         myordering++;
274       }
275
276       /* send header when required */
277       if (header_buf[myordering + 1] != -1) {
278           Request::send(header_buf, HEADER_SIZE, MPI_INT, header_buf[myordering + 1],
279                  tag, comm);
280       }
281
282       /* (receive, reduce), and send data */
283       if (header_buf[myordering + 1] == -1) {
284         to = 0;
285       } else {
286         to = header_buf[myordering + 1];
287       }
288
289       /* send only */
290       if (myordering == 0) {
291         for (i = 0; i < pipe_length; i++) {
292             send_request_array[i]= Request::isend((char *)rbuf + (i * increment), segment, datatype, to, tag, comm);
293         }
294         Request::waitall((pipe_length), send_request_array, send_status_array);
295       }
296
297       /* receive, reduce, and send */
298       else {
299         from = header_buf[myordering - 1];
300         for (i = 0; i < pipe_length; i++) {
301           recv_request_array[i]=Request::irecv(tmp_buf + (i * increment), segment, datatype, from, tag, comm);
302         }
303         for (i = 0; i < pipe_length; i++) {
304           Request::wait(&recv_request_array[i], MPI_STATUS_IGNORE);
305           if(op!=MPI_OP_NULL) op->apply( tmp_buf + (i * increment), (char *)rbuf + (i * increment),
306                          &segment, datatype);
307           send_request_array[i]=Request::isend((char *)rbuf + (i * increment), segment, datatype, to, tag, comm);
308         }
309         Request::waitall((pipe_length), send_request_array, send_status_array);
310       }
311     }                           /* non-root */
312
313     delete[] send_request_array;
314     delete[] recv_request_array;
315     delete[] send_status_array;
316     delete[] recv_status_array;
317
318     //printf("node %d done\n",rank);
319   }                             /* end pipeline */
320
321
322   /* if root is not zero send root after finished
323      this can be modified to make it faster by using logical src, dst.
324    */
325   if (root != 0) {
326     if (rank == 0) {
327       Request::send(rbuf, count, datatype, root, tag, comm);
328     } else if (rank == root) {
329       Request::recv(rbuf, count, datatype, 0, tag, comm, &status);
330     }
331   }
332
333
334   /* when count is not divisible by block size, use default BCAST for the remainder */
335   if ((remainder != 0) && (count > segment)) {
336     Coll_reduce_default::reduce((char*)buf + (pipe_length * increment), (char*)rbuf + (pipe_length * increment),
337                                 remainder, datatype, op, root, comm);
338   }
339
340   smpi_free_tmp_buffer(tmp_buf);
341
342   return MPI_SUCCESS;
343 }
344 }
345 }