Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
Update copyright lines.
[simgrid.git] / src / smpi / colls / reduce / reduce-arrival-pattern-aware.cpp
1 /* Copyright (c) 2013-2021. The SimGrid Team.
2  * All rights reserved.                                                     */
3
4 /* This program is free software; you can redistribute it and/or modify it
5  * under the terms of the license (GNU LGPL) which comes with this package. */
6
7 #include "../colls_private.hpp"
8 //#include <star-reduction.c>
9
10 int reduce_arrival_pattern_aware_segment_size_in_byte = 8192;
11
12 #ifndef HEADER_SIZE
13 #define HEADER_SIZE 1024
14 #endif
15
16 #ifndef MAX_NODE
17 #define MAX_NODE 1024
18 #endif
19 namespace simgrid{
20 namespace smpi{
21 /* Non-topology-specific pipelined linear-reduce function */
22 int reduce__arrival_pattern_aware(const void *buf, void *rbuf,
23                                   int count,
24                                   MPI_Datatype datatype,
25                                   MPI_Op op, int root,
26                                   MPI_Comm comm)
27 {
28   int rank = comm->rank();
29   int tag = -COLL_TAG_REDUCE;
30   MPI_Status status;
31   MPI_Request request;
32
33   MPI_Status temp_status_array[MAX_NODE];
34
35   int size = comm->size();
36   int i;
37
38   int sent_count;
39   int header_index;
40   int flag_array[MAX_NODE];
41   int already_received[MAX_NODE];
42
43   int header_buf[HEADER_SIZE];
44   char temp_buf[MAX_NODE];
45
46   MPI_Aint extent, lb;
47   datatype->extent(&lb, &extent);
48
49   /* source and destination */
50   int to, from;
51
52   /* segment is segment size in number of elements (not bytes) */
53   int segment = reduce_arrival_pattern_aware_segment_size_in_byte / extent;
54
55   /* pipeline length */
56   int pipe_length = count / segment;
57
58   /* use for buffer offset for sending and receiving data = segment size in byte */
59   int increment = segment * extent;
60
61   /* if the input size is not divisible by segment size =>
62      the small remainder will be done with native implementation */
63   int remainder = count % segment;
64
65
66   /* value == 0 means root has not send data (or header) to the node yet */
67   for (i = 0; i < MAX_NODE; i++) {
68     already_received[i] = 0;
69   }
70
71   unsigned char* tmp_buf = smpi_get_tmp_sendbuffer(count * extent);
72
73   Request::sendrecv(buf, count, datatype, rank, tag, rbuf, count, datatype, rank,
74                tag, comm, &status);
75
76
77
78   /* when a message is smaller than a block size => no pipeline */
79   if (count <= segment) {
80
81     if (rank == 0) {
82       sent_count = 0;
83
84       while (sent_count < (size - 1)) {
85
86         for (i = 1; i < size; i++) {
87           if (already_received[i] == 0) {
88             Request::iprobe(i, MPI_ANY_TAG, comm, &flag_array[i], MPI_STATUSES_IGNORE);
89             simgrid::s4u::this_actor::sleep_for(0.0001);
90           }
91             }
92
93         header_index = 0;
94         /* recv 1-byte message */
95         for (i = 0; i < size; i++) {
96           if (i == rank)
97             continue;
98
99           /* 1-byte message arrive */
100           if ((flag_array[i] == 1) && (already_received[i] == 0)) {
101             Request::recv(temp_buf, 1, MPI_CHAR, i, tag, comm, &status);
102             header_buf[header_index] = i;
103             header_index++;
104             sent_count++;
105
106
107             //printf("root send to %d recv from %d : data = ",to,from);
108             /*
109                for (i=0;i<=header_index;i++) {
110                printf("%d ",header_buf[i]);
111                }
112                printf("\n");
113              */
114             /* will receive in the next step */
115             already_received[i] = 1;
116           }
117         }
118
119         /* send header followed by receive and reduce data */
120         if (header_index != 0) {
121           header_buf[header_index] = -1;
122           to = header_buf[0];
123           from = header_buf[header_index - 1];
124
125           Request::send(header_buf, HEADER_SIZE, MPI_INT, to, tag, comm);
126           Request::recv(tmp_buf, count, datatype, from, tag, comm, &status);
127           if(op!=MPI_OP_NULL) op->apply( tmp_buf, rbuf, &count, datatype);
128         }
129       }                         /* while loop */
130     }
131
132     /* root */
133     /* non-root */
134     else {
135
136       /* send 1-byte message to root */
137       Request::send(temp_buf, 1, MPI_CHAR, 0, tag, comm);
138
139       /* wait for header and data, forward when required */
140       Request::recv(header_buf, HEADER_SIZE, MPI_INT, MPI_ANY_SOURCE, tag, comm,
141                &status);
142       //      Request::recv(buf,count,datatype,MPI_ANY_SOURCE,tag,comm,&status);
143
144       /* search for where it is */
145       int myordering = 0;
146       while (rank != header_buf[myordering]) {
147         myordering++;
148       }
149
150       /* forward header */
151       if (header_buf[myordering + 1] != -1) {
152           Request::send(header_buf, HEADER_SIZE, MPI_INT, header_buf[myordering + 1],
153                  tag, comm);
154       }
155       //printf("node %d ordering %d\n",rank,myordering);
156
157       /* receive, reduce, and forward data */
158
159       /* send only */
160       if (myordering == 0) {
161         if (header_buf[myordering + 1] == -1) {
162           to = 0;
163         } else {
164           to = header_buf[myordering + 1];
165         }
166         Request::send(rbuf, count, datatype, to, tag, comm);
167       }
168
169       /* recv, reduce, send */
170       else {
171         if (header_buf[myordering + 1] == -1) {
172           to = 0;
173         } else {
174           to = header_buf[myordering + 1];
175         }
176         from = header_buf[myordering - 1];
177         Request::recv(tmp_buf, count, datatype, from, tag, comm, &status);
178         if(op!=MPI_OP_NULL) op->apply( tmp_buf, rbuf, &count, datatype);
179         Request::send(rbuf, count, datatype, to, tag, comm);
180       }
181     }                           /* non-root */
182   }
183   /* pipeline bcast */
184   else {
185     //    printf("node %d start\n",rank);
186
187     auto* send_request_array = new MPI_Request[size + pipe_length];
188     auto* recv_request_array = new MPI_Request[size + pipe_length];
189     auto* send_status_array  = new MPI_Status[size + pipe_length];
190     auto* recv_status_array  = new MPI_Status[size + pipe_length];
191
192     if (rank == 0) {
193       sent_count = 0;
194
195       int will_send[MAX_NODE];
196       for (i = 0; i < MAX_NODE; i++)
197         will_send[i] = 0;
198
199       /* loop until all data are received (sent) */
200       while (sent_count < (size - 1)) {
201         int k;
202         for (k = 0; k < 1; k++) {
203           for (i = 1; i < size; i++) {
204             //if (i == rank)
205             //continue;
206             if ((already_received[i] == 0) && (will_send[i] == 0)) {
207                 Request::iprobe(i, MPI_ANY_TAG, comm, &flag_array[i],
208                          &temp_status_array[i]);
209               if (flag_array[i] == 1) {
210                 will_send[i] = 1;
211                 Request::recv(&temp_buf[i], 1, MPI_CHAR, i, tag, comm,
212                          &status);
213                 //printf("recv from %d\n",i);
214                 i = 1;
215               }
216             }
217           }
218         }                       /* end of probing */
219
220         header_index = 0;
221
222         /* recv 1-byte message */
223         for (i = 1; i < size; i++) {
224           //if (i==rank)
225           //continue;
226           /* message arrived in this round (put in the header) */
227           if ((will_send[i] == 1) && (already_received[i] == 0)) {
228             header_buf[header_index] = i;
229             header_index++;
230             sent_count++;
231
232             /* will send in the next step */
233             already_received[i] = 1;
234           }
235         }
236
237         /* send header followed by data */
238         if (header_index != 0) {
239           header_buf[header_index] = -1;
240           to = header_buf[0];
241
242           /* send header */
243           Request::send(header_buf, HEADER_SIZE, MPI_INT, to, tag, comm);
244
245           /* recv data - pipeline */
246           from = header_buf[header_index - 1];
247           for (i = 0; i < pipe_length; i++) {
248             Request::recv(tmp_buf + (i * increment), segment, datatype, from, tag,
249                      comm, &status);
250             if(op!=MPI_OP_NULL) op->apply( tmp_buf + (i * increment),
251                            (char *)rbuf + (i * increment), &segment, datatype);
252           }
253         }
254       }                         /* while loop (sent_count < size-1 ) */
255     }
256
257     /* root */
258     /* none root */
259     else {
260       /* send 1-byte message to root */
261       Request::send(temp_buf, 1, MPI_CHAR, 0, tag, comm);
262
263
264       /* wait for header forward when required */
265       request=Request::irecv(header_buf, HEADER_SIZE, MPI_INT, MPI_ANY_SOURCE, tag, comm);
266       Request::wait(&request, MPI_STATUS_IGNORE);
267
268       /* search for where it is */
269       int myordering = 0;
270
271       while (rank != header_buf[myordering]) {
272         myordering++;
273       }
274
275       /* send header when required */
276       if (header_buf[myordering + 1] != -1) {
277           Request::send(header_buf, HEADER_SIZE, MPI_INT, header_buf[myordering + 1],
278                  tag, comm);
279       }
280
281       /* (receive, reduce), and send data */
282       if (header_buf[myordering + 1] == -1) {
283         to = 0;
284       } else {
285         to = header_buf[myordering + 1];
286       }
287
288       /* send only */
289       if (myordering == 0) {
290         for (i = 0; i < pipe_length; i++) {
291             send_request_array[i]= Request::isend((char *)rbuf + (i * increment), segment, datatype, to, tag, comm);
292         }
293         Request::waitall((pipe_length), send_request_array, send_status_array);
294       }
295
296       /* receive, reduce, and send */
297       else {
298         from = header_buf[myordering - 1];
299         for (i = 0; i < pipe_length; i++) {
300           recv_request_array[i]=Request::irecv(tmp_buf + (i * increment), segment, datatype, from, tag, comm);
301         }
302         for (i = 0; i < pipe_length; i++) {
303           Request::wait(&recv_request_array[i], MPI_STATUS_IGNORE);
304           if(op!=MPI_OP_NULL) op->apply( tmp_buf + (i * increment), (char *)rbuf + (i * increment),
305                          &segment, datatype);
306           send_request_array[i]=Request::isend((char *)rbuf + (i * increment), segment, datatype, to, tag, comm);
307         }
308         Request::waitall((pipe_length), send_request_array, send_status_array);
309       }
310     }                           /* non-root */
311
312     delete[] send_request_array;
313     delete[] recv_request_array;
314     delete[] send_status_array;
315     delete[] recv_status_array;
316
317     //printf("node %d done\n",rank);
318   }                             /* end pipeline */
319
320
321   /* if root is not zero send root after finished
322      this can be modified to make it faster by using logical src, dst.
323    */
324   if (root != 0) {
325     if (rank == 0) {
326       Request::send(rbuf, count, datatype, root, tag, comm);
327     } else if (rank == root) {
328       Request::recv(rbuf, count, datatype, 0, tag, comm, &status);
329     }
330   }
331
332
333   /* when count is not divisible by block size, use default BCAST for the remainder */
334   if ((remainder != 0) && (count > segment)) {
335     reduce__default((char*)buf + (pipe_length * increment), (char*)rbuf + (pipe_length * increment),
336                     remainder, datatype, op, root, comm);
337   }
338
339   smpi_free_tmp_buffer(tmp_buf);
340
341   return MPI_SUCCESS;
342 }
343 }
344 }