Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
please sonar on recent SMPI++ code
[simgrid.git] / src / smpi / smpi_base.cpp
1 /* Copyright (c) 2007-2017. The SimGrid Team. All rights reserved.          */
2
3 /* This program is free software; you can redistribute it and/or modify it
4  * under the terms of the license (GNU LGPL) which comes with this package. */
5
6 #include <xbt/config.hpp>
7 #include <algorithm>
8
9 #include "private.h"
10 #include "xbt/virtu.h"
11 #include "mc/mc.h"
12 #include "src/mc/mc_replay.h"
13 #include "xbt/replay.h"
14 #include <errno.h>
15 #include "src/simix/smx_private.h"
16 #include "surf/surf.h"
17 #include "simgrid/sg_config.h"
18 #include "smpi/smpi_utils.hpp"
19 #include "colls/colls.h"
20 #include <simgrid/s4u/host.hpp>
21
22 #include "src/kernel/activity/SynchroComm.hpp"
23
24 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_base, smpi, "Logging specific to SMPI (base)");
25
26
27 static simgrid::config::Flag<double> smpi_wtime_sleep(
28   "smpi/wtime", "Minimum time to inject inside a call to MPI_Wtime", 0.0);
29 static simgrid::config::Flag<double> smpi_init_sleep(
30   "smpi/init", "Time to inject inside a call to MPI_Init", 0.0);
31
32 void smpi_mpi_init() {
33   if(smpi_init_sleep > 0) 
34     simcall_process_sleep(smpi_init_sleep);
35 }
36
37 double smpi_mpi_wtime(){
38   double time;
39   if (smpi_process_initialized() != 0 && smpi_process_finalized() == 0 && smpi_process_get_sampling() == 0) {
40     smpi_bench_end();
41     time = SIMIX_get_clock();
42     // to avoid deadlocks if used as a break condition, such as
43     //     while (MPI_Wtime(...) < time_limit) {
44     //       ....
45     //     }
46     // because the time will not normally advance when only calls to MPI_Wtime
47     // are made -> deadlock (MPI_Wtime never reaches the time limit)
48     if(smpi_wtime_sleep > 0) 
49       simcall_process_sleep(smpi_wtime_sleep);
50     smpi_bench_begin();
51   } else {
52     time = SIMIX_get_clock();
53   }
54   return time;
55 }
56
57
58 void smpi_mpi_bcast(void *buf, int count, MPI_Datatype datatype, int root, MPI_Comm comm)
59 {
60   smpi_coll_tuned_bcast_binomial_tree(buf, count, datatype, root, comm);
61 }
62
63 void smpi_mpi_barrier(MPI_Comm comm)
64 {
65   smpi_coll_tuned_barrier_ompi_basic_linear(comm);
66 }
67
68 void smpi_mpi_gather(void *sendbuf, int sendcount, MPI_Datatype sendtype,
69                      void *recvbuf, int recvcount, MPI_Datatype recvtype, int root, MPI_Comm comm)
70 {
71   int system_tag = COLL_TAG_GATHER;
72   MPI_Aint lb = 0;
73   MPI_Aint recvext = 0;
74
75   int rank = comm->rank();
76   int size = comm->size();
77   if(rank != root) {
78     // Send buffer to root
79     Request::send(sendbuf, sendcount, sendtype, root, system_tag, comm);
80   } else {
81     recvtype->extent(&lb, &recvext);
82     // Local copy from root
83     Datatype::copy(sendbuf, sendcount, sendtype, static_cast<char*>(recvbuf) + root * recvcount * recvext,
84                        recvcount, recvtype);
85     // Receive buffers from senders
86     MPI_Request *requests = xbt_new(MPI_Request, size - 1);
87     int index = 0;
88     for (int src = 0; src < size; src++) {
89       if(src != root) {
90         requests[index] = Request::irecv_init(static_cast<char*>(recvbuf) + src * recvcount * recvext, recvcount, recvtype,
91                                           src, system_tag, comm);
92         index++;
93       }
94     }
95     // Wait for completion of irecv's.
96     Request::startall(size - 1, requests);
97     Request::waitall(size - 1, requests, MPI_STATUS_IGNORE);
98     for (int src = 0; src < size-1; src++) {
99       Request::unref(&requests[src]);
100     }
101     xbt_free(requests);
102   }
103 }
104
105 void smpi_mpi_reduce_scatter(void *sendbuf, void *recvbuf, int *recvcounts, MPI_Datatype datatype, MPI_Op op,
106                              MPI_Comm comm)
107 {
108   int rank = comm->rank();
109
110   /* arbitrarily choose root as rank 0 */
111   int size = comm->size();
112   int count = 0;
113   int *displs = xbt_new(int, size);
114   for (int i = 0; i < size; i++) {
115     displs[i] = count;
116     count += recvcounts[i];
117   }
118   void *tmpbuf = static_cast<void*>(smpi_get_tmp_sendbuffer(count*datatype->get_extent()));
119
120   mpi_coll_reduce_fun(sendbuf, tmpbuf, count, datatype, op, 0, comm);
121   smpi_mpi_scatterv(tmpbuf, recvcounts, displs, datatype, recvbuf, recvcounts[rank], datatype, 0, comm);
122   xbt_free(displs);
123   smpi_free_tmp_buffer(tmpbuf);
124 }
125
126 void smpi_mpi_gatherv(void *sendbuf, int sendcount, MPI_Datatype sendtype, void *recvbuf, int *recvcounts, int *displs,
127                       MPI_Datatype recvtype, int root, MPI_Comm comm)
128 {
129   int system_tag = COLL_TAG_GATHERV;
130   MPI_Aint lb = 0;
131   MPI_Aint recvext = 0;
132
133   int rank = comm->rank();
134   int size = comm->size();
135   if (rank != root) {
136     // Send buffer to root
137     Request::send(sendbuf, sendcount, sendtype, root, system_tag, comm);
138   } else {
139     recvtype->extent(&lb, &recvext);
140     // Local copy from root
141     Datatype::copy(sendbuf, sendcount, sendtype, static_cast<char*>(recvbuf) + displs[root] * recvext,
142                        recvcounts[root], recvtype);
143     // Receive buffers from senders
144     MPI_Request *requests = xbt_new(MPI_Request, size - 1);
145     int index = 0;
146     for (int src = 0; src < size; src++) {
147       if(src != root) {
148         requests[index] = Request::irecv_init(static_cast<char*>(recvbuf) + displs[src] * recvext,
149                           recvcounts[src], recvtype, src, system_tag, comm);
150         index++;
151       }
152     }
153     // Wait for completion of irecv's.
154     Request::startall(size - 1, requests);
155     Request::waitall(size - 1, requests, MPI_STATUS_IGNORE);
156     for (int src = 0; src < size-1; src++) {
157       Request::unref(&requests[src]);
158     }
159     xbt_free(requests);
160   }
161 }
162
163 void smpi_mpi_allgather(void *sendbuf, int sendcount, MPI_Datatype sendtype,
164                         void *recvbuf,int recvcount, MPI_Datatype recvtype, MPI_Comm comm)
165 {
166   int system_tag = COLL_TAG_ALLGATHER;
167   MPI_Aint lb = 0;
168   MPI_Aint recvext = 0;
169   MPI_Request *requests;
170
171   int rank = comm->rank();
172   int size = comm->size();
173   // FIXME: check for errors
174   recvtype->extent(&lb, &recvext);
175   // Local copy from self
176   Datatype::copy(sendbuf, sendcount, sendtype, static_cast<char *>(recvbuf) + rank * recvcount * recvext, recvcount,
177                      recvtype);
178   // Send/Recv buffers to/from others;
179   requests = xbt_new(MPI_Request, 2 * (size - 1));
180   int index = 0;
181   for (int other = 0; other < size; other++) {
182     if(other != rank) {
183       requests[index] = Request::isend_init(sendbuf, sendcount, sendtype, other, system_tag,comm);
184       index++;
185       requests[index] = Request::irecv_init(static_cast<char *>(recvbuf) + other * recvcount * recvext, recvcount, recvtype,
186                                         other, system_tag, comm);
187       index++;
188     }
189   }
190   // Wait for completion of all comms.
191   Request::startall(2 * (size - 1), requests);
192   Request::waitall(2 * (size - 1), requests, MPI_STATUS_IGNORE);
193   for (int other = 0; other < 2*(size-1); other++) {
194     Request::unref(&requests[other]);
195   }
196   xbt_free(requests);
197 }
198
199 void smpi_mpi_allgatherv(void *sendbuf, int sendcount, MPI_Datatype sendtype, void *recvbuf,
200                          int *recvcounts, int *displs, MPI_Datatype recvtype, MPI_Comm comm)
201 {
202   int system_tag = COLL_TAG_ALLGATHERV;
203   MPI_Aint lb = 0;
204   MPI_Aint recvext = 0;
205
206   int rank = comm->rank();
207   int size = comm->size();
208   recvtype->extent(&lb, &recvext);
209   // Local copy from self
210   Datatype::copy(sendbuf, sendcount, sendtype,
211                      static_cast<char *>(recvbuf) + displs[rank] * recvext,recvcounts[rank], recvtype);
212   // Send buffers to others;
213   MPI_Request *requests = xbt_new(MPI_Request, 2 * (size - 1));
214   int index = 0;
215   for (int other = 0; other < size; other++) {
216     if(other != rank) {
217       requests[index] =
218         Request::isend_init(sendbuf, sendcount, sendtype, other, system_tag, comm);
219       index++;
220       requests[index] = Request::irecv_init(static_cast<char *>(recvbuf) + displs[other] * recvext, recvcounts[other],
221                           recvtype, other, system_tag, comm);
222       index++;
223     }
224   }
225   // Wait for completion of all comms.
226   Request::startall(2 * (size - 1), requests);
227   Request::waitall(2 * (size - 1), requests, MPI_STATUS_IGNORE);
228   for (int other = 0; other < 2*(size-1); other++) {
229     Request::unref(&requests[other]);
230   }
231   xbt_free(requests);
232 }
233
234 void smpi_mpi_scatter(void *sendbuf, int sendcount, MPI_Datatype sendtype,
235                       void *recvbuf, int recvcount, MPI_Datatype recvtype, int root, MPI_Comm comm)
236 {
237   int system_tag = COLL_TAG_SCATTER;
238   MPI_Aint lb = 0;
239   MPI_Aint sendext = 0;
240   MPI_Request *requests;
241
242   int rank = comm->rank();
243   int size = comm->size();
244   if(rank != root) {
245     // Recv buffer from root
246     Request::recv(recvbuf, recvcount, recvtype, root, system_tag, comm, MPI_STATUS_IGNORE);
247   } else {
248     sendtype->extent(&lb, &sendext);
249     // Local copy from root
250     if(recvbuf!=MPI_IN_PLACE){
251         Datatype::copy(static_cast<char *>(sendbuf) + root * sendcount * sendext,
252                            sendcount, sendtype, recvbuf, recvcount, recvtype);
253     }
254     // Send buffers to receivers
255     requests = xbt_new(MPI_Request, size - 1);
256     int index = 0;
257     for(int dst = 0; dst < size; dst++) {
258       if(dst != root) {
259         requests[index] = Request::isend_init(static_cast<char *>(sendbuf) + dst * sendcount * sendext, sendcount, sendtype,
260                                           dst, system_tag, comm);
261         index++;
262       }
263     }
264     // Wait for completion of isend's.
265     Request::startall(size - 1, requests);
266     Request::waitall(size - 1, requests, MPI_STATUS_IGNORE);
267     for (int dst = 0; dst < size-1; dst++) {
268       Request::unref(&requests[dst]);
269     }
270     xbt_free(requests);
271   }
272 }
273
274 void smpi_mpi_scatterv(void *sendbuf, int *sendcounts, int *displs, MPI_Datatype sendtype, void *recvbuf, int recvcount,
275                        MPI_Datatype recvtype, int root, MPI_Comm comm)
276 {
277   int system_tag = COLL_TAG_SCATTERV;
278   MPI_Aint lb = 0;
279   MPI_Aint sendext = 0;
280
281   int rank = comm->rank();
282   int size = comm->size();
283   if(rank != root) {
284     // Recv buffer from root
285     Request::recv(recvbuf, recvcount, recvtype, root, system_tag, comm, MPI_STATUS_IGNORE);
286   } else {
287     sendtype->extent(&lb, &sendext);
288     // Local copy from root
289     if(recvbuf!=MPI_IN_PLACE){
290       Datatype::copy(static_cast<char *>(sendbuf) + displs[root] * sendext, sendcounts[root],
291                        sendtype, recvbuf, recvcount, recvtype);
292     }
293     // Send buffers to receivers
294     MPI_Request *requests = xbt_new(MPI_Request, size - 1);
295     int index = 0;
296     for (int dst = 0; dst < size; dst++) {
297       if (dst != root) {
298         requests[index] = Request::isend_init(static_cast<char *>(sendbuf) + displs[dst] * sendext, sendcounts[dst],
299                             sendtype, dst, system_tag, comm);
300         index++;
301       }
302     }
303     // Wait for completion of isend's.
304     Request::startall(size - 1, requests);
305     Request::waitall(size - 1, requests, MPI_STATUS_IGNORE);
306     for (int dst = 0; dst < size-1; dst++) {
307       Request::unref(&requests[dst]);
308     }
309     xbt_free(requests);
310   }
311 }
312
313 void smpi_mpi_reduce(void *sendbuf, void *recvbuf, int count, MPI_Datatype datatype, MPI_Op op, int root,
314                      MPI_Comm comm)
315 {
316   int system_tag = COLL_TAG_REDUCE;
317   MPI_Aint lb = 0;
318   MPI_Aint dataext = 0;
319
320   char* sendtmpbuf = static_cast<char *>(sendbuf);
321
322   int rank = comm->rank();
323   int size = comm->size();
324   //non commutative case, use a working algo from openmpi
325   if(op != MPI_OP_NULL && !op->is_commutative()){
326     smpi_coll_tuned_reduce_ompi_basic_linear(sendtmpbuf, recvbuf, count, datatype, op, root, comm);
327     return;
328   }
329
330   if( sendbuf == MPI_IN_PLACE ) {
331     sendtmpbuf = static_cast<char *>(smpi_get_tmp_sendbuffer(count*datatype->get_extent()));
332     Datatype::copy(recvbuf, count, datatype,sendtmpbuf, count, datatype);
333   }
334   
335   if(rank != root) {
336     // Send buffer to root
337     Request::send(sendtmpbuf, count, datatype, root, system_tag, comm);
338   } else {
339     datatype->extent(&lb, &dataext);
340     // Local copy from root
341     if (sendtmpbuf != nullptr && recvbuf != nullptr)
342       Datatype::copy(sendtmpbuf, count, datatype, recvbuf, count, datatype);
343     // Receive buffers from senders
344     MPI_Request *requests = xbt_new(MPI_Request, size - 1);
345     void **tmpbufs = xbt_new(void *, size - 1);
346     int index = 0;
347     for (int src = 0; src < size; src++) {
348       if (src != root) {
349          if (!smpi_process_get_replaying())
350           tmpbufs[index] = xbt_malloc(count * dataext);
351          else
352            tmpbufs[index] = smpi_get_tmp_sendbuffer(count * dataext);
353         requests[index] =
354           Request::irecv_init(tmpbufs[index], count, datatype, src, system_tag, comm);
355         index++;
356       }
357     }
358     // Wait for completion of irecv's.
359     Request::startall(size - 1, requests);
360     for (int src = 0; src < size - 1; src++) {
361       index = Request::waitany(size - 1, requests, MPI_STATUS_IGNORE);
362       XBT_DEBUG("finished waiting any request with index %d", index);
363       if(index == MPI_UNDEFINED) {
364         break;
365       }else{
366         Request::unref(&requests[index]);
367       }
368       if (op != MPI_OP_NULL) /* op can be MPI_OP_NULL that does nothing */
369         op->apply(tmpbufs[index], recvbuf, &count, datatype);
370     }
371       for(index = 0; index < size - 1; index++) {
372         smpi_free_tmp_buffer(tmpbufs[index]);
373       }
374     xbt_free(tmpbufs);
375     xbt_free(requests);
376
377   }
378   if( sendbuf == MPI_IN_PLACE ) {
379     smpi_free_tmp_buffer(sendtmpbuf);
380   }
381 }
382
383 void smpi_mpi_allreduce(void *sendbuf, void *recvbuf, int count, MPI_Datatype datatype, MPI_Op op, MPI_Comm comm)
384 {
385   smpi_mpi_reduce(sendbuf, recvbuf, count, datatype, op, 0, comm);
386   smpi_mpi_bcast(recvbuf, count, datatype, 0, comm);
387 }
388
389 void smpi_mpi_scan(void *sendbuf, void *recvbuf, int count, MPI_Datatype datatype, MPI_Op op, MPI_Comm comm)
390 {
391   int system_tag = -888;
392   MPI_Aint lb      = 0;
393   MPI_Aint dataext = 0;
394
395   int rank = comm->rank();
396   int size = comm->size();
397
398   datatype->extent(&lb, &dataext);
399
400   // Local copy from self
401   Datatype::copy(sendbuf, count, datatype, recvbuf, count, datatype);
402
403   // Send/Recv buffers to/from others;
404   MPI_Request *requests = xbt_new(MPI_Request, size - 1);
405   void **tmpbufs = xbt_new(void *, rank);
406   int index = 0;
407   for (int other = 0; other < rank; other++) {
408     tmpbufs[index] = smpi_get_tmp_sendbuffer(count * dataext);
409     requests[index] = Request::irecv_init(tmpbufs[index], count, datatype, other, system_tag, comm);
410     index++;
411   }
412   for (int other = rank + 1; other < size; other++) {
413     requests[index] = Request::isend_init(sendbuf, count, datatype, other, system_tag, comm);
414     index++;
415   }
416   // Wait for completion of all comms.
417   Request::startall(size - 1, requests);
418
419   if(op != MPI_OP_NULL && op->is_commutative()){
420     for (int other = 0; other < size - 1; other++) {
421       index = Request::waitany(size - 1, requests, MPI_STATUS_IGNORE);
422       if(index == MPI_UNDEFINED) {
423         break;
424       }
425       if (index < rank)
426         // #Request is below rank: it's a irecv.
427         op->apply(tmpbufs[index], recvbuf, &count, datatype);
428     }
429   }else{
430     //non commutative case, wait in order
431     for (int other = 0; other < size - 1; other++) {
432       Request::wait(&(requests[other]), MPI_STATUS_IGNORE);
433       if (index < rank && op != MPI_OP_NULL)
434         op->apply(tmpbufs[other], recvbuf, &count, datatype);
435     }
436   }
437   for(index = 0; index < rank; index++) {
438     smpi_free_tmp_buffer(tmpbufs[index]);
439   }
440   for(index = 0; index < size-1; index++) {
441     Request::unref(&requests[index]);
442   }
443   xbt_free(tmpbufs);
444   xbt_free(requests);
445 }
446
447 void smpi_mpi_exscan(void *sendbuf, void *recvbuf, int count, MPI_Datatype datatype, MPI_Op op, MPI_Comm comm)
448 {
449   int system_tag = -888;
450   MPI_Aint lb         = 0;
451   MPI_Aint dataext    = 0;
452   int recvbuf_is_empty=1;
453   int rank = comm->rank();
454   int size = comm->size();
455
456   datatype->extent(&lb, &dataext);
457
458   // Send/Recv buffers to/from others;
459   MPI_Request *requests = xbt_new(MPI_Request, size - 1);
460   void **tmpbufs = xbt_new(void *, rank);
461   int index = 0;
462   for (int other = 0; other < rank; other++) {
463     tmpbufs[index] = smpi_get_tmp_sendbuffer(count * dataext);
464     requests[index] = Request::irecv_init(tmpbufs[index], count, datatype, other, system_tag, comm);
465     index++;
466   }
467   for (int other = rank + 1; other < size; other++) {
468     requests[index] = Request::isend_init(sendbuf, count, datatype, other, system_tag, comm);
469     index++;
470   }
471   // Wait for completion of all comms.
472   Request::startall(size - 1, requests);
473
474   if(op != MPI_OP_NULL && op->is_commutative()){
475     for (int other = 0; other < size - 1; other++) {
476       index = Request::waitany(size - 1, requests, MPI_STATUS_IGNORE);
477       if(index == MPI_UNDEFINED) {
478         break;
479       }
480       if(index < rank) {
481         if(recvbuf_is_empty){
482           Datatype::copy(tmpbufs[index], count, datatype, recvbuf, count, datatype);
483           recvbuf_is_empty=0;
484         } else
485           // #Request is below rank: it's a irecv
486           if(op!=MPI_OP_NULL) op->apply( tmpbufs[index], recvbuf, &count, datatype);
487       }
488     }
489   }else{
490     //non commutative case, wait in order
491     for (int other = 0; other < size - 1; other++) {
492      Request::wait(&(requests[other]), MPI_STATUS_IGNORE);
493       if(index < rank) {
494         if (recvbuf_is_empty) {
495           Datatype::copy(tmpbufs[other], count, datatype, recvbuf, count, datatype);
496           recvbuf_is_empty = 0;
497         } else
498           if(op!=MPI_OP_NULL) op->apply( tmpbufs[other], recvbuf, &count, datatype);
499       }
500     }
501   }
502   for(index = 0; index < rank; index++) {
503     smpi_free_tmp_buffer(tmpbufs[index]);
504   }
505   for(index = 0; index < size-1; index++) {
506     Request::unref(&requests[index]);
507   }
508   xbt_free(tmpbufs);
509   xbt_free(requests);
510 }
511
512 void smpi_empty_status(MPI_Status * status)
513 {
514   if(status != MPI_STATUS_IGNORE) {
515     status->MPI_SOURCE = MPI_ANY_SOURCE;
516     status->MPI_TAG = MPI_ANY_TAG;
517     status->MPI_ERROR = MPI_SUCCESS;
518     status->count=0;
519   }
520 }
521
522 int smpi_mpi_get_count(MPI_Status * status, MPI_Datatype datatype)
523 {
524   return status->count / datatype->size();
525 }
526
527