Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
sanitize the OOP of kernel::profile
[simgrid.git] / src / smpi / colls / reduce / reduce-arrival-pattern-aware.cpp
1 /* Copyright (c) 2013-2019. The SimGrid Team.
2  * All rights reserved.                                                     */
3
4 /* This program is free software; you can redistribute it and/or modify it
5  * under the terms of the license (GNU LGPL) which comes with this package. */
6
7 #include "../colls_private.hpp"
8 //#include <star-reduction.c>
9
10 int reduce_arrival_pattern_aware_segment_size_in_byte = 8192;
11
12 #ifndef HEADER_SIZE
13 #define HEADER_SIZE 1024
14 #endif
15
16 #ifndef MAX_NODE
17 #define MAX_NODE 1024
18 #endif
19 namespace simgrid{
20 namespace smpi{
21 /* Non-topology-specific pipelined linear-reduce function */
22 int Coll_reduce_arrival_pattern_aware::reduce(void *buf, void *rbuf,
23                                                  int count,
24                                                  MPI_Datatype datatype,
25                                                  MPI_Op op, int root,
26                                                  MPI_Comm comm)
27 {
28   int rank = comm->rank();
29   int tag = -COLL_TAG_REDUCE;
30   MPI_Status status;
31   MPI_Request request;
32   MPI_Request *send_request_array;
33   MPI_Request *recv_request_array;
34   MPI_Status *send_status_array;
35   MPI_Status *recv_status_array;
36
37   MPI_Status temp_status_array[MAX_NODE];
38
39   int size = comm->size();
40   int i;
41
42   int sent_count;
43   int header_index;
44   int flag_array[MAX_NODE];
45   int already_received[MAX_NODE];
46
47   int header_buf[HEADER_SIZE];
48   char temp_buf[MAX_NODE];
49
50   MPI_Aint extent, lb;
51   datatype->extent(&lb, &extent);
52
53   /* source and destination */
54   int to, from;
55
56   /* segment is segment size in number of elements (not bytes) */
57   int segment = reduce_arrival_pattern_aware_segment_size_in_byte / extent;
58
59   /* pipeline length */
60   int pipe_length = count / segment;
61
62   /* use for buffer offset for sending and receiving data = segment size in byte */
63   int increment = segment * extent;
64
65   /* if the input size is not divisible by segment size =>
66      the small remainder will be done with native implementation */
67   int remainder = count % segment;
68
69
70   /* value == 0 means root has not send data (or header) to the node yet */
71   for (i = 0; i < MAX_NODE; i++) {
72     already_received[i] = 0;
73   }
74
75   char *tmp_buf;
76   tmp_buf = (char *) smpi_get_tmp_sendbuffer(count * extent);
77
78   Request::sendrecv(buf, count, datatype, rank, tag, rbuf, count, datatype, rank,
79                tag, comm, &status);
80
81
82
83   /* when a message is smaller than a block size => no pipeline */
84   if (count <= segment) {
85
86     if (rank == 0) {
87       sent_count = 0;
88
89       while (sent_count < (size - 1)) {
90
91         for (i = 1; i < size; i++) {
92           if (already_received[i] == 0) {
93             Request::iprobe(i, MPI_ANY_TAG, comm, &flag_array[i],
94                              MPI_STATUSES_IGNORE);
95             simcall_process_sleep(0.0001);
96             }
97         }
98
99         header_index = 0;
100         /* recv 1-byte message */
101         for (i = 0; i < size; i++) {
102           if (i == rank)
103             continue;
104
105           /* 1-byte message arrive */
106           if ((flag_array[i] == 1) && (already_received[i] == 0)) {
107             Request::recv(temp_buf, 1, MPI_CHAR, i, tag, comm, &status);
108             header_buf[header_index] = i;
109             header_index++;
110             sent_count++;
111
112
113             //printf("root send to %d recv from %d : data = ",to,from);
114             /*
115                for (i=0;i<=header_index;i++) {
116                printf("%d ",header_buf[i]);
117                }
118                printf("\n");
119              */
120             /* will receive in the next step */
121             already_received[i] = 1;
122           }
123         }
124
125         /* send header followed by receive and reduce data */
126         if (header_index != 0) {
127           header_buf[header_index] = -1;
128           to = header_buf[0];
129           from = header_buf[header_index - 1];
130
131           Request::send(header_buf, HEADER_SIZE, MPI_INT, to, tag, comm);
132           Request::recv(tmp_buf, count, datatype, from, tag, comm, &status);
133           if(op!=MPI_OP_NULL) op->apply( tmp_buf, rbuf, &count, datatype);
134         }
135       }                         /* while loop */
136     }
137
138     /* root */
139     /* non-root */
140     else {
141
142       /* send 1-byte message to root */
143       Request::send(temp_buf, 1, MPI_CHAR, 0, tag, comm);
144
145       /* wait for header and data, forward when required */
146       Request::recv(header_buf, HEADER_SIZE, MPI_INT, MPI_ANY_SOURCE, tag, comm,
147                &status);
148       //      Request::recv(buf,count,datatype,MPI_ANY_SOURCE,tag,comm,&status);
149
150       /* search for where it is */
151       int myordering = 0;
152       while (rank != header_buf[myordering]) {
153         myordering++;
154       }
155
156       /* forward header */
157       if (header_buf[myordering + 1] != -1) {
158           Request::send(header_buf, HEADER_SIZE, MPI_INT, header_buf[myordering + 1],
159                  tag, comm);
160       }
161       //printf("node %d ordering %d\n",rank,myordering);
162
163       /* receive, reduce, and forward data */
164
165       /* send only */
166       if (myordering == 0) {
167         if (header_buf[myordering + 1] == -1) {
168           to = 0;
169         } else {
170           to = header_buf[myordering + 1];
171         }
172         Request::send(rbuf, count, datatype, to, tag, comm);
173       }
174
175       /* recv, reduce, send */
176       else {
177         if (header_buf[myordering + 1] == -1) {
178           to = 0;
179         } else {
180           to = header_buf[myordering + 1];
181         }
182         from = header_buf[myordering - 1];
183         Request::recv(tmp_buf, count, datatype, from, tag, comm, &status);
184         if(op!=MPI_OP_NULL) op->apply( tmp_buf, rbuf, &count, datatype);
185         Request::send(rbuf, count, datatype, to, tag, comm);
186       }
187     }                           /* non-root */
188   }
189   /* pipeline bcast */
190   else {
191     //    printf("node %d start\n",rank);
192
193     send_request_array =
194         (MPI_Request *) xbt_malloc((size + pipe_length) * sizeof(MPI_Request));
195     recv_request_array =
196         (MPI_Request *) xbt_malloc((size + pipe_length) * sizeof(MPI_Request));
197     send_status_array =
198         (MPI_Status *) xbt_malloc((size + pipe_length) * sizeof(MPI_Status));
199     recv_status_array =
200         (MPI_Status *) xbt_malloc((size + pipe_length) * sizeof(MPI_Status));
201
202     if (rank == 0) {
203       sent_count = 0;
204
205       int will_send[MAX_NODE];
206       for (i = 0; i < MAX_NODE; i++)
207         will_send[i] = 0;
208
209       /* loop until all data are received (sent) */
210       while (sent_count < (size - 1)) {
211         int k;
212         for (k = 0; k < 1; k++) {
213           for (i = 1; i < size; i++) {
214             //if (i == rank)
215             //continue;
216             if ((already_received[i] == 0) && (will_send[i] == 0)) {
217                 Request::iprobe(i, MPI_ANY_TAG, comm, &flag_array[i],
218                          &temp_status_array[i]);
219               if (flag_array[i] == 1) {
220                 will_send[i] = 1;
221                 Request::recv(&temp_buf[i], 1, MPI_CHAR, i, tag, comm,
222                          &status);
223                 //printf("recv from %d\n",i);
224                 i = 1;
225               }
226             }
227           }
228         }                       /* end of probing */
229
230         header_index = 0;
231
232         /* recv 1-byte message */
233         for (i = 1; i < size; i++) {
234           //if (i==rank)
235           //continue;
236           /* message arrived in this round (put in the header) */
237           if ((will_send[i] == 1) && (already_received[i] == 0)) {
238             header_buf[header_index] = i;
239             header_index++;
240             sent_count++;
241
242             /* will send in the next step */
243             already_received[i] = 1;
244           }
245         }
246
247         /* send header followed by data */
248         if (header_index != 0) {
249           header_buf[header_index] = -1;
250           to = header_buf[0];
251
252           /* send header */
253           Request::send(header_buf, HEADER_SIZE, MPI_INT, to, tag, comm);
254
255           /* recv data - pipeline */
256           from = header_buf[header_index - 1];
257           for (i = 0; i < pipe_length; i++) {
258             Request::recv(tmp_buf + (i * increment), segment, datatype, from, tag,
259                      comm, &status);
260             if(op!=MPI_OP_NULL) op->apply( tmp_buf + (i * increment),
261                            (char *)rbuf + (i * increment), &segment, datatype);
262           }
263         }
264       }                         /* while loop (sent_count < size-1 ) */
265     }
266
267     /* root */
268     /* none root */
269     else {
270       /* send 1-byte message to root */
271       Request::send(temp_buf, 1, MPI_CHAR, 0, tag, comm);
272
273
274       /* wait for header forward when required */
275       request=Request::irecv(header_buf, HEADER_SIZE, MPI_INT, MPI_ANY_SOURCE, tag, comm);
276       Request::wait(&request, MPI_STATUS_IGNORE);
277
278       /* search for where it is */
279       int myordering = 0;
280
281       while (rank != header_buf[myordering]) {
282         myordering++;
283       }
284
285       /* send header when required */
286       if (header_buf[myordering + 1] != -1) {
287           Request::send(header_buf, HEADER_SIZE, MPI_INT, header_buf[myordering + 1],
288                  tag, comm);
289       }
290
291       /* (receive, reduce), and send data */
292       if (header_buf[myordering + 1] == -1) {
293         to = 0;
294       } else {
295         to = header_buf[myordering + 1];
296       }
297
298       /* send only */
299       if (myordering == 0) {
300         for (i = 0; i < pipe_length; i++) {
301             send_request_array[i]= Request::isend((char *)rbuf + (i * increment), segment, datatype, to, tag, comm);
302         }
303         Request::waitall((pipe_length), send_request_array, send_status_array);
304       }
305
306       /* receive, reduce, and send */
307       else {
308         from = header_buf[myordering - 1];
309         for (i = 0; i < pipe_length; i++) {
310           recv_request_array[i]=Request::irecv(tmp_buf + (i * increment), segment, datatype, from, tag, comm);
311         }
312         for (i = 0; i < pipe_length; i++) {
313           Request::wait(&recv_request_array[i], MPI_STATUS_IGNORE);
314           if(op!=MPI_OP_NULL) op->apply( tmp_buf + (i * increment), (char *)rbuf + (i * increment),
315                          &segment, datatype);
316           send_request_array[i]=Request::isend((char *)rbuf + (i * increment), segment, datatype, to, tag, comm);
317         }
318         Request::waitall((pipe_length), send_request_array, send_status_array);
319       }
320     }                           /* non-root */
321
322
323
324
325     free(send_request_array);
326     free(recv_request_array);
327     free(send_status_array);
328     free(recv_status_array);
329
330     //printf("node %d done\n",rank);
331   }                             /* end pipeline */
332
333
334   /* if root is not zero send root after finished
335      this can be modified to make it faster by using logical src, dst.
336    */
337   if (root != 0) {
338     if (rank == 0) {
339       Request::send(rbuf, count, datatype, root, tag, comm);
340     } else if (rank == root) {
341       Request::recv(rbuf, count, datatype, 0, tag, comm, &status);
342     }
343   }
344
345
346   /* when count is not divisible by block size, use default BCAST for the remainder */
347   if ((remainder != 0) && (count > segment)) {
348     Coll_reduce_default::reduce((char*)buf + (pipe_length * increment), (char*)rbuf + (pipe_length * increment),
349                                 remainder, datatype, op, root, comm);
350   }
351
352   smpi_free_tmp_buffer(tmp_buf);
353
354   return MPI_SUCCESS;
355 }
356 }
357 }