Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
8f0b20260ec645d4d6f417f06b89ce27ff6cea09
[simgrid.git] / src / smpi / colls / reduce / reduce-arrival-pattern-aware.cpp
1 /* Copyright (c) 2013-2019. The SimGrid Team.
2  * All rights reserved.                                                     */
3
4 /* This program is free software; you can redistribute it and/or modify it
5  * under the terms of the license (GNU LGPL) which comes with this package. */
6
7 #include "../colls_private.hpp"
8 //#include <star-reduction.c>
9
10 int reduce_arrival_pattern_aware_segment_size_in_byte = 8192;
11
12 #ifndef HEADER_SIZE
13 #define HEADER_SIZE 1024
14 #endif
15
16 #ifndef MAX_NODE
17 #define MAX_NODE 1024
18 #endif
19 namespace simgrid{
20 namespace smpi{
21 /* Non-topology-specific pipelined linear-reduce function */
22 int Coll_reduce_arrival_pattern_aware::reduce(const void *buf, void *rbuf,
23                                                  int count,
24                                                  MPI_Datatype datatype,
25                                                  MPI_Op op, int root,
26                                                  MPI_Comm comm)
27 {
28   int rank = comm->rank();
29   int tag = -COLL_TAG_REDUCE;
30   MPI_Status status;
31   MPI_Request request;
32
33   MPI_Status temp_status_array[MAX_NODE];
34
35   int size = comm->size();
36   int i;
37
38   int sent_count;
39   int header_index;
40   int flag_array[MAX_NODE];
41   int already_received[MAX_NODE];
42
43   int header_buf[HEADER_SIZE];
44   char temp_buf[MAX_NODE];
45
46   MPI_Aint extent, lb;
47   datatype->extent(&lb, &extent);
48
49   /* source and destination */
50   int to, from;
51
52   /* segment is segment size in number of elements (not bytes) */
53   int segment = reduce_arrival_pattern_aware_segment_size_in_byte / extent;
54
55   /* pipeline length */
56   int pipe_length = count / segment;
57
58   /* use for buffer offset for sending and receiving data = segment size in byte */
59   int increment = segment * extent;
60
61   /* if the input size is not divisible by segment size =>
62      the small remainder will be done with native implementation */
63   int remainder = count % segment;
64
65
66   /* value == 0 means root has not send data (or header) to the node yet */
67   for (i = 0; i < MAX_NODE; i++) {
68     already_received[i] = 0;
69   }
70
71   char *tmp_buf;
72   tmp_buf = (char *) smpi_get_tmp_sendbuffer(count * extent);
73
74   Request::sendrecv(buf, count, datatype, rank, tag, rbuf, count, datatype, rank,
75                tag, comm, &status);
76
77
78
79   /* when a message is smaller than a block size => no pipeline */
80   if (count <= segment) {
81
82     if (rank == 0) {
83       sent_count = 0;
84
85       while (sent_count < (size - 1)) {
86
87         for (i = 1; i < size; i++) {
88           if (already_received[i] == 0) {
89             Request::iprobe(i, MPI_ANY_TAG, comm, &flag_array[i],
90                              MPI_STATUSES_IGNORE);
91             simcall_process_sleep(0.0001);
92             }
93         }
94
95         header_index = 0;
96         /* recv 1-byte message */
97         for (i = 0; i < size; i++) {
98           if (i == rank)
99             continue;
100
101           /* 1-byte message arrive */
102           if ((flag_array[i] == 1) && (already_received[i] == 0)) {
103             Request::recv(temp_buf, 1, MPI_CHAR, i, tag, comm, &status);
104             header_buf[header_index] = i;
105             header_index++;
106             sent_count++;
107
108
109             //printf("root send to %d recv from %d : data = ",to,from);
110             /*
111                for (i=0;i<=header_index;i++) {
112                printf("%d ",header_buf[i]);
113                }
114                printf("\n");
115              */
116             /* will receive in the next step */
117             already_received[i] = 1;
118           }
119         }
120
121         /* send header followed by receive and reduce data */
122         if (header_index != 0) {
123           header_buf[header_index] = -1;
124           to = header_buf[0];
125           from = header_buf[header_index - 1];
126
127           Request::send(header_buf, HEADER_SIZE, MPI_INT, to, tag, comm);
128           Request::recv(tmp_buf, count, datatype, from, tag, comm, &status);
129           if(op!=MPI_OP_NULL) op->apply( tmp_buf, rbuf, &count, datatype);
130         }
131       }                         /* while loop */
132     }
133
134     /* root */
135     /* non-root */
136     else {
137
138       /* send 1-byte message to root */
139       Request::send(temp_buf, 1, MPI_CHAR, 0, tag, comm);
140
141       /* wait for header and data, forward when required */
142       Request::recv(header_buf, HEADER_SIZE, MPI_INT, MPI_ANY_SOURCE, tag, comm,
143                &status);
144       //      Request::recv(buf,count,datatype,MPI_ANY_SOURCE,tag,comm,&status);
145
146       /* search for where it is */
147       int myordering = 0;
148       while (rank != header_buf[myordering]) {
149         myordering++;
150       }
151
152       /* forward header */
153       if (header_buf[myordering + 1] != -1) {
154           Request::send(header_buf, HEADER_SIZE, MPI_INT, header_buf[myordering + 1],
155                  tag, comm);
156       }
157       //printf("node %d ordering %d\n",rank,myordering);
158
159       /* receive, reduce, and forward data */
160
161       /* send only */
162       if (myordering == 0) {
163         if (header_buf[myordering + 1] == -1) {
164           to = 0;
165         } else {
166           to = header_buf[myordering + 1];
167         }
168         Request::send(rbuf, count, datatype, to, tag, comm);
169       }
170
171       /* recv, reduce, send */
172       else {
173         if (header_buf[myordering + 1] == -1) {
174           to = 0;
175         } else {
176           to = header_buf[myordering + 1];
177         }
178         from = header_buf[myordering - 1];
179         Request::recv(tmp_buf, count, datatype, from, tag, comm, &status);
180         if(op!=MPI_OP_NULL) op->apply( tmp_buf, rbuf, &count, datatype);
181         Request::send(rbuf, count, datatype, to, tag, comm);
182       }
183     }                           /* non-root */
184   }
185   /* pipeline bcast */
186   else {
187     //    printf("node %d start\n",rank);
188
189     MPI_Request* send_request_array = new MPI_Request[size + pipe_length];
190     MPI_Request* recv_request_array = new MPI_Request[size + pipe_length];
191     MPI_Status* send_status_array   = new MPI_Status[size + pipe_length];
192     MPI_Status* recv_status_array   = new MPI_Status[size + pipe_length];
193
194     if (rank == 0) {
195       sent_count = 0;
196
197       int will_send[MAX_NODE];
198       for (i = 0; i < MAX_NODE; i++)
199         will_send[i] = 0;
200
201       /* loop until all data are received (sent) */
202       while (sent_count < (size - 1)) {
203         int k;
204         for (k = 0; k < 1; k++) {
205           for (i = 1; i < size; i++) {
206             //if (i == rank)
207             //continue;
208             if ((already_received[i] == 0) && (will_send[i] == 0)) {
209                 Request::iprobe(i, MPI_ANY_TAG, comm, &flag_array[i],
210                          &temp_status_array[i]);
211               if (flag_array[i] == 1) {
212                 will_send[i] = 1;
213                 Request::recv(&temp_buf[i], 1, MPI_CHAR, i, tag, comm,
214                          &status);
215                 //printf("recv from %d\n",i);
216                 i = 1;
217               }
218             }
219           }
220         }                       /* end of probing */
221
222         header_index = 0;
223
224         /* recv 1-byte message */
225         for (i = 1; i < size; i++) {
226           //if (i==rank)
227           //continue;
228           /* message arrived in this round (put in the header) */
229           if ((will_send[i] == 1) && (already_received[i] == 0)) {
230             header_buf[header_index] = i;
231             header_index++;
232             sent_count++;
233
234             /* will send in the next step */
235             already_received[i] = 1;
236           }
237         }
238
239         /* send header followed by data */
240         if (header_index != 0) {
241           header_buf[header_index] = -1;
242           to = header_buf[0];
243
244           /* send header */
245           Request::send(header_buf, HEADER_SIZE, MPI_INT, to, tag, comm);
246
247           /* recv data - pipeline */
248           from = header_buf[header_index - 1];
249           for (i = 0; i < pipe_length; i++) {
250             Request::recv(tmp_buf + (i * increment), segment, datatype, from, tag,
251                      comm, &status);
252             if(op!=MPI_OP_NULL) op->apply( tmp_buf + (i * increment),
253                            (char *)rbuf + (i * increment), &segment, datatype);
254           }
255         }
256       }                         /* while loop (sent_count < size-1 ) */
257     }
258
259     /* root */
260     /* none root */
261     else {
262       /* send 1-byte message to root */
263       Request::send(temp_buf, 1, MPI_CHAR, 0, tag, comm);
264
265
266       /* wait for header forward when required */
267       request=Request::irecv(header_buf, HEADER_SIZE, MPI_INT, MPI_ANY_SOURCE, tag, comm);
268       Request::wait(&request, MPI_STATUS_IGNORE);
269
270       /* search for where it is */
271       int myordering = 0;
272
273       while (rank != header_buf[myordering]) {
274         myordering++;
275       }
276
277       /* send header when required */
278       if (header_buf[myordering + 1] != -1) {
279           Request::send(header_buf, HEADER_SIZE, MPI_INT, header_buf[myordering + 1],
280                  tag, comm);
281       }
282
283       /* (receive, reduce), and send data */
284       if (header_buf[myordering + 1] == -1) {
285         to = 0;
286       } else {
287         to = header_buf[myordering + 1];
288       }
289
290       /* send only */
291       if (myordering == 0) {
292         for (i = 0; i < pipe_length; i++) {
293             send_request_array[i]= Request::isend((char *)rbuf + (i * increment), segment, datatype, to, tag, comm);
294         }
295         Request::waitall((pipe_length), send_request_array, send_status_array);
296       }
297
298       /* receive, reduce, and send */
299       else {
300         from = header_buf[myordering - 1];
301         for (i = 0; i < pipe_length; i++) {
302           recv_request_array[i]=Request::irecv(tmp_buf + (i * increment), segment, datatype, from, tag, comm);
303         }
304         for (i = 0; i < pipe_length; i++) {
305           Request::wait(&recv_request_array[i], MPI_STATUS_IGNORE);
306           if(op!=MPI_OP_NULL) op->apply( tmp_buf + (i * increment), (char *)rbuf + (i * increment),
307                          &segment, datatype);
308           send_request_array[i]=Request::isend((char *)rbuf + (i * increment), segment, datatype, to, tag, comm);
309         }
310         Request::waitall((pipe_length), send_request_array, send_status_array);
311       }
312     }                           /* non-root */
313
314     delete[] send_request_array;
315     delete[] recv_request_array;
316     delete[] send_status_array;
317     delete[] recv_status_array;
318
319     //printf("node %d done\n",rank);
320   }                             /* end pipeline */
321
322
323   /* if root is not zero send root after finished
324      this can be modified to make it faster by using logical src, dst.
325    */
326   if (root != 0) {
327     if (rank == 0) {
328       Request::send(rbuf, count, datatype, root, tag, comm);
329     } else if (rank == root) {
330       Request::recv(rbuf, count, datatype, 0, tag, comm, &status);
331     }
332   }
333
334
335   /* when count is not divisible by block size, use default BCAST for the remainder */
336   if ((remainder != 0) && (count > segment)) {
337     Coll_reduce_default::reduce((char*)buf + (pipe_length * increment), (char*)rbuf + (pipe_length * increment),
338                                 remainder, datatype, op, root, comm);
339   }
340
341   smpi_free_tmp_buffer(tmp_buf);
342
343   return MPI_SUCCESS;
344 }
345 }
346 }