Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
Move collective algorithms to separate folders
[simgrid.git] / src / smpi / smpi_base.cpp
1 /* Copyright (c) 2007-2017. The SimGrid Team. All rights reserved.          */
2
3 /* This program is free software; you can redistribute it and/or modify it
4  * under the terms of the license (GNU LGPL) which comes with this package. */
5
6 #include <xbt/config.hpp>
7 #include <algorithm>
8
9 #include "private.h"
10 #include "xbt/virtu.h"
11 #include "mc/mc.h"
12 #include "src/mc/mc_replay.h"
13 #include <errno.h>
14 #include "src/simix/smx_private.h"
15 #include "surf/surf.h"
16 #include "simgrid/sg_config.h"
17 #include "smpi/smpi_utils.hpp"
18 #include "colls/colls.h"
19 #include <simgrid/s4u/host.hpp>
20
21 #include "src/kernel/activity/SynchroComm.hpp"
22
23 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_base, smpi, "Logging specific to SMPI (base)");
24
25 static simgrid::config::Flag<double> smpi_wtime_sleep(
26   "smpi/wtime", "Minimum time to inject inside a call to MPI_Wtime", 0.0);
27 static simgrid::config::Flag<double> smpi_init_sleep(
28   "smpi/init", "Time to inject inside a call to MPI_Init", 0.0);
29
30 void smpi_mpi_init() {
31   if(smpi_init_sleep > 0) 
32     simcall_process_sleep(smpi_init_sleep);
33 }
34
35 double smpi_mpi_wtime(){
36   double time;
37   if (smpi_process_initialized() != 0 && smpi_process_finalized() == 0 && smpi_process_get_sampling() == 0) {
38     smpi_bench_end();
39     time = SIMIX_get_clock();
40     // to avoid deadlocks if used as a break condition, such as
41     //     while (MPI_Wtime(...) < time_limit) {
42     //       ....
43     //     }
44     // because the time will not normally advance when only calls to MPI_Wtime
45     // are made -> deadlock (MPI_Wtime never reaches the time limit)
46     if(smpi_wtime_sleep > 0) 
47       simcall_process_sleep(smpi_wtime_sleep);
48     smpi_bench_begin();
49   } else {
50     time = SIMIX_get_clock();
51   }
52   return time;
53 }
54
55 void smpi_mpi_bcast(void *buf, int count, MPI_Datatype datatype, int root, MPI_Comm comm)
56 {
57   smpi_coll_tuned_bcast_binomial_tree(buf, count, datatype, root, comm);
58 }
59
60 void smpi_mpi_barrier(MPI_Comm comm)
61 {
62   smpi_coll_tuned_barrier_ompi_basic_linear(comm);
63 }
64
65 void smpi_mpi_gather(void *sendbuf, int sendcount, MPI_Datatype sendtype,
66                      void *recvbuf, int recvcount, MPI_Datatype recvtype, int root, MPI_Comm comm)
67 {
68   int system_tag = COLL_TAG_GATHER;
69   MPI_Aint lb = 0;
70   MPI_Aint recvext = 0;
71
72   int rank = comm->rank();
73   int size = comm->size();
74   if(rank != root) {
75     // Send buffer to root
76     Request::send(sendbuf, sendcount, sendtype, root, system_tag, comm);
77   } else {
78     recvtype->extent(&lb, &recvext);
79     // Local copy from root
80     Datatype::copy(sendbuf, sendcount, sendtype, static_cast<char*>(recvbuf) + root * recvcount * recvext,
81                        recvcount, recvtype);
82     // Receive buffers from senders
83     MPI_Request *requests = xbt_new(MPI_Request, size - 1);
84     int index = 0;
85     for (int src = 0; src < size; src++) {
86       if(src != root) {
87         requests[index] = Request::irecv_init(static_cast<char*>(recvbuf) + src * recvcount * recvext, recvcount, recvtype,
88                                           src, system_tag, comm);
89         index++;
90       }
91     }
92     // Wait for completion of irecv's.
93     Request::startall(size - 1, requests);
94     Request::waitall(size - 1, requests, MPI_STATUS_IGNORE);
95     for (int src = 0; src < size-1; src++) {
96       Request::unref(&requests[src]);
97     }
98     xbt_free(requests);
99   }
100 }
101
102 void smpi_mpi_reduce_scatter(void *sendbuf, void *recvbuf, int *recvcounts, MPI_Datatype datatype, MPI_Op op,
103                              MPI_Comm comm)
104 {
105   int rank = comm->rank();
106
107   /* arbitrarily choose root as rank 0 */
108   int size = comm->size();
109   int count = 0;
110   int *displs = xbt_new(int, size);
111   for (int i = 0; i < size; i++) {
112     displs[i] = count;
113     count += recvcounts[i];
114   }
115   void *tmpbuf = static_cast<void*>(smpi_get_tmp_sendbuffer(count*datatype->get_extent()));
116
117   mpi_coll_reduce_fun(sendbuf, tmpbuf, count, datatype, op, 0, comm);
118   smpi_mpi_scatterv(tmpbuf, recvcounts, displs, datatype, recvbuf, recvcounts[rank], datatype, 0, comm);
119   xbt_free(displs);
120   smpi_free_tmp_buffer(tmpbuf);
121 }
122
123 void smpi_mpi_gatherv(void *sendbuf, int sendcount, MPI_Datatype sendtype, void *recvbuf, int *recvcounts, int *displs,
124                       MPI_Datatype recvtype, int root, MPI_Comm comm)
125 {
126   int system_tag = COLL_TAG_GATHERV;
127   MPI_Aint lb = 0;
128   MPI_Aint recvext = 0;
129
130   int rank = comm->rank();
131   int size = comm->size();
132   if (rank != root) {
133     // Send buffer to root
134     Request::send(sendbuf, sendcount, sendtype, root, system_tag, comm);
135   } else {
136     recvtype->extent(&lb, &recvext);
137     // Local copy from root
138     Datatype::copy(sendbuf, sendcount, sendtype, static_cast<char*>(recvbuf) + displs[root] * recvext,
139                        recvcounts[root], recvtype);
140     // Receive buffers from senders
141     MPI_Request *requests = xbt_new(MPI_Request, size - 1);
142     int index = 0;
143     for (int src = 0; src < size; src++) {
144       if(src != root) {
145         requests[index] = Request::irecv_init(static_cast<char*>(recvbuf) + displs[src] * recvext,
146                           recvcounts[src], recvtype, src, system_tag, comm);
147         index++;
148       }
149     }
150     // Wait for completion of irecv's.
151     Request::startall(size - 1, requests);
152     Request::waitall(size - 1, requests, MPI_STATUS_IGNORE);
153     for (int src = 0; src < size-1; src++) {
154       Request::unref(&requests[src]);
155     }
156     xbt_free(requests);
157   }
158 }
159
160 void smpi_mpi_allgather(void *sendbuf, int sendcount, MPI_Datatype sendtype,
161                         void *recvbuf,int recvcount, MPI_Datatype recvtype, MPI_Comm comm)
162 {
163   int system_tag = COLL_TAG_ALLGATHER;
164   MPI_Aint lb = 0;
165   MPI_Aint recvext = 0;
166   MPI_Request *requests;
167
168   int rank = comm->rank();
169   int size = comm->size();
170   // FIXME: check for errors
171   recvtype->extent(&lb, &recvext);
172   // Local copy from self
173   Datatype::copy(sendbuf, sendcount, sendtype, static_cast<char *>(recvbuf) + rank * recvcount * recvext, recvcount,
174                      recvtype);
175   // Send/Recv buffers to/from others;
176   requests = xbt_new(MPI_Request, 2 * (size - 1));
177   int index = 0;
178   for (int other = 0; other < size; other++) {
179     if(other != rank) {
180       requests[index] = Request::isend_init(sendbuf, sendcount, sendtype, other, system_tag,comm);
181       index++;
182       requests[index] = Request::irecv_init(static_cast<char *>(recvbuf) + other * recvcount * recvext, recvcount, recvtype,
183                                         other, system_tag, comm);
184       index++;
185     }
186   }
187   // Wait for completion of all comms.
188   Request::startall(2 * (size - 1), requests);
189   Request::waitall(2 * (size - 1), requests, MPI_STATUS_IGNORE);
190   for (int other = 0; other < 2*(size-1); other++) {
191     Request::unref(&requests[other]);
192   }
193   xbt_free(requests);
194 }
195
196 void smpi_mpi_allgatherv(void *sendbuf, int sendcount, MPI_Datatype sendtype, void *recvbuf,
197                          int *recvcounts, int *displs, MPI_Datatype recvtype, MPI_Comm comm)
198 {
199   int system_tag = COLL_TAG_ALLGATHERV;
200   MPI_Aint lb = 0;
201   MPI_Aint recvext = 0;
202
203   int rank = comm->rank();
204   int size = comm->size();
205   recvtype->extent(&lb, &recvext);
206   // Local copy from self
207   Datatype::copy(sendbuf, sendcount, sendtype,
208                      static_cast<char *>(recvbuf) + displs[rank] * recvext,recvcounts[rank], recvtype);
209   // Send buffers to others;
210   MPI_Request *requests = xbt_new(MPI_Request, 2 * (size - 1));
211   int index = 0;
212   for (int other = 0; other < size; other++) {
213     if(other != rank) {
214       requests[index] =
215         Request::isend_init(sendbuf, sendcount, sendtype, other, system_tag, comm);
216       index++;
217       requests[index] = Request::irecv_init(static_cast<char *>(recvbuf) + displs[other] * recvext, recvcounts[other],
218                           recvtype, other, system_tag, comm);
219       index++;
220     }
221   }
222   // Wait for completion of all comms.
223   Request::startall(2 * (size - 1), requests);
224   Request::waitall(2 * (size - 1), requests, MPI_STATUS_IGNORE);
225   for (int other = 0; other < 2*(size-1); other++) {
226     Request::unref(&requests[other]);
227   }
228   xbt_free(requests);
229 }
230
231 void smpi_mpi_scatter(void *sendbuf, int sendcount, MPI_Datatype sendtype,
232                       void *recvbuf, int recvcount, MPI_Datatype recvtype, int root, MPI_Comm comm)
233 {
234   int system_tag = COLL_TAG_SCATTER;
235   MPI_Aint lb = 0;
236   MPI_Aint sendext = 0;
237   MPI_Request *requests;
238
239   int rank = comm->rank();
240   int size = comm->size();
241   if(rank != root) {
242     // Recv buffer from root
243     Request::recv(recvbuf, recvcount, recvtype, root, system_tag, comm, MPI_STATUS_IGNORE);
244   } else {
245     sendtype->extent(&lb, &sendext);
246     // Local copy from root
247     if(recvbuf!=MPI_IN_PLACE){
248         Datatype::copy(static_cast<char *>(sendbuf) + root * sendcount * sendext,
249                            sendcount, sendtype, recvbuf, recvcount, recvtype);
250     }
251     // Send buffers to receivers
252     requests = xbt_new(MPI_Request, size - 1);
253     int index = 0;
254     for(int dst = 0; dst < size; dst++) {
255       if(dst != root) {
256         requests[index] = Request::isend_init(static_cast<char *>(sendbuf) + dst * sendcount * sendext, sendcount, sendtype,
257                                           dst, system_tag, comm);
258         index++;
259       }
260     }
261     // Wait for completion of isend's.
262     Request::startall(size - 1, requests);
263     Request::waitall(size - 1, requests, MPI_STATUS_IGNORE);
264     for (int dst = 0; dst < size-1; dst++) {
265       Request::unref(&requests[dst]);
266     }
267     xbt_free(requests);
268   }
269 }
270
271 void smpi_mpi_scatterv(void *sendbuf, int *sendcounts, int *displs, MPI_Datatype sendtype, void *recvbuf, int recvcount,
272                        MPI_Datatype recvtype, int root, MPI_Comm comm)
273 {
274   int system_tag = COLL_TAG_SCATTERV;
275   MPI_Aint lb = 0;
276   MPI_Aint sendext = 0;
277
278   int rank = comm->rank();
279   int size = comm->size();
280   if(rank != root) {
281     // Recv buffer from root
282     Request::recv(recvbuf, recvcount, recvtype, root, system_tag, comm, MPI_STATUS_IGNORE);
283   } else {
284     sendtype->extent(&lb, &sendext);
285     // Local copy from root
286     if(recvbuf!=MPI_IN_PLACE){
287       Datatype::copy(static_cast<char *>(sendbuf) + displs[root] * sendext, sendcounts[root],
288                        sendtype, recvbuf, recvcount, recvtype);
289     }
290     // Send buffers to receivers
291     MPI_Request *requests = xbt_new(MPI_Request, size - 1);
292     int index = 0;
293     for (int dst = 0; dst < size; dst++) {
294       if (dst != root) {
295         requests[index] = Request::isend_init(static_cast<char *>(sendbuf) + displs[dst] * sendext, sendcounts[dst],
296                             sendtype, dst, system_tag, comm);
297         index++;
298       }
299     }
300     // Wait for completion of isend's.
301     Request::startall(size - 1, requests);
302     Request::waitall(size - 1, requests, MPI_STATUS_IGNORE);
303     for (int dst = 0; dst < size-1; dst++) {
304       Request::unref(&requests[dst]);
305     }
306     xbt_free(requests);
307   }
308 }
309
310 void smpi_mpi_reduce(void *sendbuf, void *recvbuf, int count, MPI_Datatype datatype, MPI_Op op, int root,
311                      MPI_Comm comm)
312 {
313   int system_tag = COLL_TAG_REDUCE;
314   MPI_Aint lb = 0;
315   MPI_Aint dataext = 0;
316
317   char* sendtmpbuf = static_cast<char *>(sendbuf);
318
319   int rank = comm->rank();
320   int size = comm->size();
321   //non commutative case, use a working algo from openmpi
322   if(op != MPI_OP_NULL && !op->is_commutative()){
323     smpi_coll_tuned_reduce_ompi_basic_linear(sendtmpbuf, recvbuf, count, datatype, op, root, comm);
324     return;
325   }
326
327   if( sendbuf == MPI_IN_PLACE ) {
328     sendtmpbuf = static_cast<char *>(smpi_get_tmp_sendbuffer(count*datatype->get_extent()));
329     Datatype::copy(recvbuf, count, datatype,sendtmpbuf, count, datatype);
330   }
331   
332   if(rank != root) {
333     // Send buffer to root
334     Request::send(sendtmpbuf, count, datatype, root, system_tag, comm);
335   } else {
336     datatype->extent(&lb, &dataext);
337     // Local copy from root
338     if (sendtmpbuf != nullptr && recvbuf != nullptr)
339       Datatype::copy(sendtmpbuf, count, datatype, recvbuf, count, datatype);
340     // Receive buffers from senders
341     MPI_Request *requests = xbt_new(MPI_Request, size - 1);
342     void **tmpbufs = xbt_new(void *, size - 1);
343     int index = 0;
344     for (int src = 0; src < size; src++) {
345       if (src != root) {
346          if (!smpi_process_get_replaying())
347           tmpbufs[index] = xbt_malloc(count * dataext);
348          else
349            tmpbufs[index] = smpi_get_tmp_sendbuffer(count * dataext);
350         requests[index] =
351           Request::irecv_init(tmpbufs[index], count, datatype, src, system_tag, comm);
352         index++;
353       }
354     }
355     // Wait for completion of irecv's.
356     Request::startall(size - 1, requests);
357     for (int src = 0; src < size - 1; src++) {
358       index = Request::waitany(size - 1, requests, MPI_STATUS_IGNORE);
359       XBT_DEBUG("finished waiting any request with index %d", index);
360       if(index == MPI_UNDEFINED) {
361         break;
362       }else{
363         Request::unref(&requests[index]);
364       }
365       if (op != MPI_OP_NULL) /* op can be MPI_OP_NULL that does nothing */
366         op->apply(tmpbufs[index], recvbuf, &count, datatype);
367     }
368       for(index = 0; index < size - 1; index++) {
369         smpi_free_tmp_buffer(tmpbufs[index]);
370       }
371     xbt_free(tmpbufs);
372     xbt_free(requests);
373
374   }
375   if( sendbuf == MPI_IN_PLACE ) {
376     smpi_free_tmp_buffer(sendtmpbuf);
377   }
378 }
379
380 void smpi_mpi_allreduce(void *sendbuf, void *recvbuf, int count, MPI_Datatype datatype, MPI_Op op, MPI_Comm comm)
381 {
382   smpi_mpi_reduce(sendbuf, recvbuf, count, datatype, op, 0, comm);
383   smpi_mpi_bcast(recvbuf, count, datatype, 0, comm);
384 }
385
386 void smpi_mpi_scan(void *sendbuf, void *recvbuf, int count, MPI_Datatype datatype, MPI_Op op, MPI_Comm comm)
387 {
388   int system_tag = -888;
389   MPI_Aint lb      = 0;
390   MPI_Aint dataext = 0;
391
392   int rank = comm->rank();
393   int size = comm->size();
394
395   datatype->extent(&lb, &dataext);
396
397   // Local copy from self
398   Datatype::copy(sendbuf, count, datatype, recvbuf, count, datatype);
399
400   // Send/Recv buffers to/from others;
401   MPI_Request *requests = xbt_new(MPI_Request, size - 1);
402   void **tmpbufs = xbt_new(void *, rank);
403   int index = 0;
404   for (int other = 0; other < rank; other++) {
405     tmpbufs[index] = smpi_get_tmp_sendbuffer(count * dataext);
406     requests[index] = Request::irecv_init(tmpbufs[index], count, datatype, other, system_tag, comm);
407     index++;
408   }
409   for (int other = rank + 1; other < size; other++) {
410     requests[index] = Request::isend_init(sendbuf, count, datatype, other, system_tag, comm);
411     index++;
412   }
413   // Wait for completion of all comms.
414   Request::startall(size - 1, requests);
415
416   if(op != MPI_OP_NULL && op->is_commutative()){
417     for (int other = 0; other < size - 1; other++) {
418       index = Request::waitany(size - 1, requests, MPI_STATUS_IGNORE);
419       if(index == MPI_UNDEFINED) {
420         break;
421       }
422       if (index < rank)
423         // #Request is below rank: it's a irecv.
424         op->apply(tmpbufs[index], recvbuf, &count, datatype);
425     }
426   }else{
427     //non commutative case, wait in order
428     for (int other = 0; other < size - 1; other++) {
429       Request::wait(&(requests[other]), MPI_STATUS_IGNORE);
430       if (index < rank && op != MPI_OP_NULL)
431         op->apply(tmpbufs[other], recvbuf, &count, datatype);
432     }
433   }
434   for(index = 0; index < rank; index++) {
435     smpi_free_tmp_buffer(tmpbufs[index]);
436   }
437   for(index = 0; index < size-1; index++) {
438     Request::unref(&requests[index]);
439   }
440   xbt_free(tmpbufs);
441   xbt_free(requests);
442 }
443
444 void smpi_mpi_exscan(void *sendbuf, void *recvbuf, int count, MPI_Datatype datatype, MPI_Op op, MPI_Comm comm)
445 {
446   int system_tag = -888;
447   MPI_Aint lb         = 0;
448   MPI_Aint dataext    = 0;
449   int recvbuf_is_empty=1;
450   int rank = comm->rank();
451   int size = comm->size();
452
453   datatype->extent(&lb, &dataext);
454
455   // Send/Recv buffers to/from others;
456   MPI_Request *requests = xbt_new(MPI_Request, size - 1);
457   void **tmpbufs = xbt_new(void *, rank);
458   int index = 0;
459   for (int other = 0; other < rank; other++) {
460     tmpbufs[index] = smpi_get_tmp_sendbuffer(count * dataext);
461     requests[index] = Request::irecv_init(tmpbufs[index], count, datatype, other, system_tag, comm);
462     index++;
463   }
464   for (int other = rank + 1; other < size; other++) {
465     requests[index] = Request::isend_init(sendbuf, count, datatype, other, system_tag, comm);
466     index++;
467   }
468   // Wait for completion of all comms.
469   Request::startall(size - 1, requests);
470
471   if(op != MPI_OP_NULL && op->is_commutative()){
472     for (int other = 0; other < size - 1; other++) {
473       index = Request::waitany(size - 1, requests, MPI_STATUS_IGNORE);
474       if(index == MPI_UNDEFINED) {
475         break;
476       }
477       if(index < rank) {
478         if(recvbuf_is_empty){
479           Datatype::copy(tmpbufs[index], count, datatype, recvbuf, count, datatype);
480           recvbuf_is_empty=0;
481         } else
482           // #Request is below rank: it's a irecv
483           op->apply(tmpbufs[index], recvbuf, &count, datatype);
484       }
485     }
486   }else{
487     //non commutative case, wait in order
488     for (int other = 0; other < size - 1; other++) {
489      Request::wait(&(requests[other]), MPI_STATUS_IGNORE);
490       if(index < rank) {
491         if (recvbuf_is_empty) {
492           Datatype::copy(tmpbufs[other], count, datatype, recvbuf, count, datatype);
493           recvbuf_is_empty = 0;
494         } else
495           if(op!=MPI_OP_NULL) op->apply( tmpbufs[other], recvbuf, &count, datatype);
496       }
497     }
498   }
499   for(index = 0; index < rank; index++) {
500     smpi_free_tmp_buffer(tmpbufs[index]);
501   }
502   for(index = 0; index < size-1; index++) {
503     Request::unref(&requests[index]);
504   }
505   xbt_free(tmpbufs);
506   xbt_free(requests);
507 }
508
509 void smpi_empty_status(MPI_Status * status)
510 {
511   if(status != MPI_STATUS_IGNORE) {
512     status->MPI_SOURCE = MPI_ANY_SOURCE;
513     status->MPI_TAG = MPI_ANY_TAG;
514     status->MPI_ERROR = MPI_SUCCESS;
515     status->count=0;
516   }
517 }
518
519 int smpi_mpi_get_count(MPI_Status * status, MPI_Datatype datatype)
520 {
521   return status->count / datatype->size();
522 }