Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
Add new entry in Release_Notes.
[simgrid.git] / src / smpi / colls / reduce / reduce-arrival-pattern-aware.cpp
1 /* Copyright (c) 2013-2023. The SimGrid Team.
2  * All rights reserved.                                                     */
3
4 /* This program is free software; you can redistribute it and/or modify it
5  * under the terms of the license (GNU LGPL) which comes with this package. */
6
7 #include "../colls_private.hpp"
8 //#include <star-reduction.c>
9
10 int reduce_arrival_pattern_aware_segment_size_in_byte = 8192;
11
12 #ifndef HEADER_SIZE
13 #define HEADER_SIZE 1024
14 #endif
15
16 #ifndef MAX_NODE
17 #define MAX_NODE 1024
18 #endif
19 namespace simgrid::smpi {
20 /* Non-topology-specific pipelined linear-reduce function */
21 int reduce__arrival_pattern_aware(const void *buf, void *rbuf,
22                                   int count,
23                                   MPI_Datatype datatype,
24                                   MPI_Op op, int root,
25                                   MPI_Comm comm)
26 {
27   int rank = comm->rank();
28   int tag = -COLL_TAG_REDUCE;
29   MPI_Status status;
30   MPI_Request request;
31
32   MPI_Status temp_status_array[MAX_NODE];
33
34   int size = comm->size();
35   int i;
36
37   int sent_count;
38   int header_index;
39   int flag_array[MAX_NODE];
40   int already_received[MAX_NODE];
41
42   int header_buf[HEADER_SIZE];
43   char temp_buf[MAX_NODE];
44
45   MPI_Aint extent, lb;
46   datatype->extent(&lb, &extent);
47
48   /* source and destination */
49   int to, from;
50
51   /* segment is segment size in number of elements (not bytes) */
52   int segment = reduce_arrival_pattern_aware_segment_size_in_byte / extent;
53
54   /* pipeline length */
55   int pipe_length = count / segment;
56
57   /* use for buffer offset for sending and receiving data = segment size in byte */
58   int increment = segment * extent;
59
60   /* if the input size is not divisible by segment size =>
61      the small remainder will be done with native implementation */
62   int remainder = count % segment;
63
64
65   /* value == 0 means root has not send data (or header) to the node yet */
66   for (i = 0; i < MAX_NODE; i++) {
67     already_received[i] = 0;
68   }
69
70   unsigned char* tmp_buf = smpi_get_tmp_sendbuffer(count * extent);
71
72   Request::sendrecv(buf, count, datatype, rank, tag, rbuf, count, datatype, rank,
73                tag, comm, &status);
74
75
76
77   /* when a message is smaller than a block size => no pipeline */
78   if (count <= segment) {
79
80     if (rank == 0) {
81       sent_count = 0;
82
83       while (sent_count < (size - 1)) {
84
85         for (i = 1; i < size; i++) {
86           if (already_received[i] == 0) {
87             Request::iprobe(i, MPI_ANY_TAG, comm, &flag_array[i], MPI_STATUSES_IGNORE);
88             simgrid::s4u::this_actor::sleep_for(0.0001);
89           }
90             }
91
92         header_index = 0;
93         /* recv 1-byte message */
94         for (i = 0; i < size; i++) {
95           if (i == rank)
96             continue;
97
98           /* 1-byte message arrive */
99           if ((flag_array[i] == 1) && (already_received[i] == 0)) {
100             Request::recv(temp_buf, 1, MPI_CHAR, i, tag, comm, &status);
101             header_buf[header_index] = i;
102             header_index++;
103             sent_count++;
104
105
106             //printf("root send to %d recv from %d : data = ",to,from);
107             /*
108                for (i=0;i<=header_index;i++) {
109                printf("%d ",header_buf[i]);
110                }
111                printf("\n");
112              */
113             /* will receive in the next step */
114             already_received[i] = 1;
115           }
116         }
117
118         /* send header followed by receive and reduce data */
119         if (header_index != 0) {
120           header_buf[header_index] = -1;
121           to = header_buf[0];
122           from = header_buf[header_index - 1];
123
124           Request::send(header_buf, HEADER_SIZE, MPI_INT, to, tag, comm);
125           Request::recv(tmp_buf, count, datatype, from, tag, comm, &status);
126           if(op!=MPI_OP_NULL) op->apply( tmp_buf, rbuf, &count, datatype);
127         }
128       }                         /* while loop */
129     }
130
131     /* root */
132     /* non-root */
133     else {
134
135       /* send 1-byte message to root */
136       Request::send(temp_buf, 1, MPI_CHAR, 0, tag, comm);
137
138       /* wait for header and data, forward when required */
139       Request::recv(header_buf, HEADER_SIZE, MPI_INT, MPI_ANY_SOURCE, tag, comm,
140                &status);
141       //      Request::recv(buf,count,datatype,MPI_ANY_SOURCE,tag,comm,&status);
142
143       /* search for where it is */
144       int myordering = 0;
145       while (rank != header_buf[myordering]) {
146         myordering++;
147       }
148
149       /* forward header */
150       if (header_buf[myordering + 1] != -1) {
151           Request::send(header_buf, HEADER_SIZE, MPI_INT, header_buf[myordering + 1],
152                  tag, comm);
153       }
154       //printf("node %d ordering %d\n",rank,myordering);
155
156       /* receive, reduce, and forward data */
157
158       /* send only */
159       if (myordering == 0) {
160         if (header_buf[myordering + 1] == -1) {
161           to = 0;
162         } else {
163           to = header_buf[myordering + 1];
164         }
165         Request::send(rbuf, count, datatype, to, tag, comm);
166       }
167
168       /* recv, reduce, send */
169       else {
170         if (header_buf[myordering + 1] == -1) {
171           to = 0;
172         } else {
173           to = header_buf[myordering + 1];
174         }
175         from = header_buf[myordering - 1];
176         Request::recv(tmp_buf, count, datatype, from, tag, comm, &status);
177         if(op!=MPI_OP_NULL) op->apply( tmp_buf, rbuf, &count, datatype);
178         Request::send(rbuf, count, datatype, to, tag, comm);
179       }
180     }                           /* non-root */
181   }
182   /* pipeline bcast */
183   else {
184     //    printf("node %d start\n",rank);
185
186     auto* send_request_array = new MPI_Request[size + pipe_length];
187     auto* recv_request_array = new MPI_Request[size + pipe_length];
188     auto* send_status_array  = new MPI_Status[size + pipe_length];
189     auto* recv_status_array  = new MPI_Status[size + pipe_length];
190
191     if (rank == 0) {
192       sent_count = 0;
193
194       int will_send[MAX_NODE];
195       for (i = 0; i < MAX_NODE; i++)
196         will_send[i] = 0;
197
198       /* loop until all data are received (sent) */
199       while (sent_count < (size - 1)) {
200         int k;
201         for (k = 0; k < 1; k++) {
202           for (i = 1; i < size; i++) {
203             //if (i == rank)
204             //continue;
205             if ((already_received[i] == 0) && (will_send[i] == 0)) {
206                 Request::iprobe(i, MPI_ANY_TAG, comm, &flag_array[i],
207                          &temp_status_array[i]);
208               if (flag_array[i] == 1) {
209                 will_send[i] = 1;
210                 Request::recv(&temp_buf[i], 1, MPI_CHAR, i, tag, comm,
211                          &status);
212                 //printf("recv from %d\n",i);
213                 i = 1;
214               }
215             }
216           }
217         }                       /* end of probing */
218
219         header_index = 0;
220
221         /* recv 1-byte message */
222         for (i = 1; i < size; i++) {
223           //if (i==rank)
224           //continue;
225           /* message arrived in this round (put in the header) */
226           if ((will_send[i] == 1) && (already_received[i] == 0)) {
227             header_buf[header_index] = i;
228             header_index++;
229             sent_count++;
230
231             /* will send in the next step */
232             already_received[i] = 1;
233           }
234         }
235
236         /* send header followed by data */
237         if (header_index != 0) {
238           header_buf[header_index] = -1;
239           to = header_buf[0];
240
241           /* send header */
242           Request::send(header_buf, HEADER_SIZE, MPI_INT, to, tag, comm);
243
244           /* recv data - pipeline */
245           from = header_buf[header_index - 1];
246           for (i = 0; i < pipe_length; i++) {
247             Request::recv(tmp_buf + (i * increment), segment, datatype, from, tag,
248                      comm, &status);
249             if(op!=MPI_OP_NULL) op->apply( tmp_buf + (i * increment),
250                            (char *)rbuf + (i * increment), &segment, datatype);
251           }
252         }
253       }                         /* while loop (sent_count < size-1 ) */
254     }
255
256     /* root */
257     /* none root */
258     else {
259       /* send 1-byte message to root */
260       Request::send(temp_buf, 1, MPI_CHAR, 0, tag, comm);
261
262
263       /* wait for header forward when required */
264       request=Request::irecv(header_buf, HEADER_SIZE, MPI_INT, MPI_ANY_SOURCE, tag, comm);
265       Request::wait(&request, MPI_STATUS_IGNORE);
266
267       /* search for where it is */
268       int myordering = 0;
269
270       while (rank != header_buf[myordering]) {
271         myordering++;
272       }
273
274       /* send header when required */
275       if (header_buf[myordering + 1] != -1) {
276           Request::send(header_buf, HEADER_SIZE, MPI_INT, header_buf[myordering + 1],
277                  tag, comm);
278       }
279
280       /* (receive, reduce), and send data */
281       if (header_buf[myordering + 1] == -1) {
282         to = 0;
283       } else {
284         to = header_buf[myordering + 1];
285       }
286
287       /* send only */
288       if (myordering == 0) {
289         for (i = 0; i < pipe_length; i++) {
290             send_request_array[i]= Request::isend((char *)rbuf + (i * increment), segment, datatype, to, tag, comm);
291         }
292         Request::waitall((pipe_length), send_request_array, send_status_array);
293       }
294
295       /* receive, reduce, and send */
296       else {
297         from = header_buf[myordering - 1];
298         for (i = 0; i < pipe_length; i++) {
299           recv_request_array[i]=Request::irecv(tmp_buf + (i * increment), segment, datatype, from, tag, comm);
300         }
301         for (i = 0; i < pipe_length; i++) {
302           Request::wait(&recv_request_array[i], MPI_STATUS_IGNORE);
303           if(op!=MPI_OP_NULL) op->apply( tmp_buf + (i * increment), (char *)rbuf + (i * increment),
304                          &segment, datatype);
305           send_request_array[i]=Request::isend((char *)rbuf + (i * increment), segment, datatype, to, tag, comm);
306         }
307         Request::waitall((pipe_length), send_request_array, send_status_array);
308       }
309     }                           /* non-root */
310
311     delete[] send_request_array;
312     delete[] recv_request_array;
313     delete[] send_status_array;
314     delete[] recv_status_array;
315
316     //printf("node %d done\n",rank);
317   }                             /* end pipeline */
318
319
320   /* if root is not zero send root after finished
321      this can be modified to make it faster by using logical src, dst.
322    */
323   if (root != 0) {
324     if (rank == 0) {
325       Request::send(rbuf, count, datatype, root, tag, comm);
326     } else if (rank == root) {
327       Request::recv(rbuf, count, datatype, 0, tag, comm, &status);
328     }
329   }
330
331
332   /* when count is not divisible by block size, use default BCAST for the remainder */
333   if ((remainder != 0) && (count > segment)) {
334     reduce__default((char*)buf + (pipe_length * increment), (char*)rbuf + (pipe_length * increment),
335                     remainder, datatype, op, root, comm);
336   }
337
338   smpi_free_tmp_buffer(tmp_buf);
339
340   return MPI_SUCCESS;
341 }
342 } // namespace simgrid::smpi