Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
94fa819b9370f61eeaed8b9cd909e45829562702
[simgrid.git] / src / smpi / smpi_base.cpp
1 /* Copyright (c) 2007-2015. The SimGrid Team.
2  * All rights reserved.                                                     */
3
4 /* This program is free software; you can redistribute it and/or modify it
5  * under the terms of the license (GNU LGPL) which comes with this package. */
6
7 #include <xbt/config.hpp>
8 #include <algorithm>
9
10 #include "private.h"
11 #include "xbt/virtu.h"
12 #include "mc/mc.h"
13 #include "src/mc/mc_replay.h"
14 #include "xbt/replay.h"
15 #include <errno.h>
16 #include "src/simix/smx_private.h"
17 #include "surf/surf.h"
18 #include "simgrid/sg_config.h"
19 #include "smpi/smpi_utils.hpp"
20 #include "colls/colls.h"
21 #include <simgrid/s4u/host.hpp>
22
23 #include "src/kernel/activity/SynchroComm.hpp"
24
25 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_base, smpi, "Logging specific to SMPI (base)");
26
27
28 static simgrid::config::Flag<double> smpi_wtime_sleep(
29   "smpi/wtime", "Minimum time to inject inside a call to MPI_Wtime", 0.0);
30 static simgrid::config::Flag<double> smpi_init_sleep(
31   "smpi/init", "Time to inject inside a call to MPI_Init", 0.0);
32
33 void smpi_mpi_init() {
34   if(smpi_init_sleep > 0) 
35     simcall_process_sleep(smpi_init_sleep);
36 }
37
38 double smpi_mpi_wtime(){
39   double time;
40   if (smpi_process_initialized() != 0 && smpi_process_finalized() == 0 && smpi_process_get_sampling() == 0) {
41     smpi_bench_end();
42     time = SIMIX_get_clock();
43     // to avoid deadlocks if used as a break condition, such as
44     //     while (MPI_Wtime(...) < time_limit) {
45     //       ....
46     //     }
47     // because the time will not normally advance when only calls to MPI_Wtime
48     // are made -> deadlock (MPI_Wtime never reaches the time limit)
49     if(smpi_wtime_sleep > 0) 
50       simcall_process_sleep(smpi_wtime_sleep);
51     smpi_bench_begin();
52   } else {
53     time = SIMIX_get_clock();
54   }
55   return time;
56 }
57
58
59 void smpi_mpi_bcast(void *buf, int count, MPI_Datatype datatype, int root, MPI_Comm comm)
60 {
61   smpi_coll_tuned_bcast_binomial_tree(buf, count, datatype, root, comm);
62 }
63
64 void smpi_mpi_barrier(MPI_Comm comm)
65 {
66   smpi_coll_tuned_barrier_ompi_basic_linear(comm);
67 }
68
69 void smpi_mpi_gather(void *sendbuf, int sendcount, MPI_Datatype sendtype,
70                      void *recvbuf, int recvcount, MPI_Datatype recvtype, int root, MPI_Comm comm)
71 {
72   int system_tag = COLL_TAG_GATHER;
73   MPI_Aint lb = 0;
74   MPI_Aint recvext = 0;
75
76   int rank = comm->rank();
77   int size = comm->size();
78   if(rank != root) {
79     // Send buffer to root
80     Request::send(sendbuf, sendcount, sendtype, root, system_tag, comm);
81   } else {
82     smpi_datatype_extent(recvtype, &lb, &recvext);
83     // Local copy from root
84     smpi_datatype_copy(sendbuf, sendcount, sendtype, static_cast<char*>(recvbuf) + root * recvcount * recvext,
85                        recvcount, recvtype);
86     // Receive buffers from senders
87     MPI_Request *requests = xbt_new(MPI_Request, size - 1);
88     int index = 0;
89     for (int src = 0; src < size; src++) {
90       if(src != root) {
91         requests[index] = Request::irecv_init(static_cast<char*>(recvbuf) + src * recvcount * recvext, recvcount, recvtype,
92                                           src, system_tag, comm);
93         index++;
94       }
95     }
96     // Wait for completion of irecv's.
97     Request::startall(size - 1, requests);
98     Request::waitall(size - 1, requests, MPI_STATUS_IGNORE);
99     for (int src = 0; src < size-1; src++) {
100       Request::unuse(&requests[src]);
101     }
102     xbt_free(requests);
103   }
104 }
105
106 void smpi_mpi_reduce_scatter(void *sendbuf, void *recvbuf, int *recvcounts, MPI_Datatype datatype, MPI_Op op,
107                              MPI_Comm comm)
108 {
109   int rank = comm->rank();
110
111   /* arbitrarily choose root as rank 0 */
112   int size = comm->size();
113   int count = 0;
114   int *displs = xbt_new(int, size);
115   for (int i = 0; i < size; i++) {
116     displs[i] = count;
117     count += recvcounts[i];
118   }
119   void *tmpbuf = static_cast<void*>(smpi_get_tmp_sendbuffer(count*smpi_datatype_get_extent(datatype)));
120
121   mpi_coll_reduce_fun(sendbuf, tmpbuf, count, datatype, op, 0, comm);
122   smpi_mpi_scatterv(tmpbuf, recvcounts, displs, datatype, recvbuf, recvcounts[rank], datatype, 0, comm);
123   xbt_free(displs);
124   smpi_free_tmp_buffer(tmpbuf);
125 }
126
127 void smpi_mpi_gatherv(void *sendbuf, int sendcount, MPI_Datatype sendtype, void *recvbuf, int *recvcounts, int *displs,
128                       MPI_Datatype recvtype, int root, MPI_Comm comm)
129 {
130   int system_tag = COLL_TAG_GATHERV;
131   MPI_Aint lb = 0;
132   MPI_Aint recvext = 0;
133
134   int rank = comm->rank();
135   int size = comm->size();
136   if (rank != root) {
137     // Send buffer to root
138     Request::send(sendbuf, sendcount, sendtype, root, system_tag, comm);
139   } else {
140     smpi_datatype_extent(recvtype, &lb, &recvext);
141     // Local copy from root
142     smpi_datatype_copy(sendbuf, sendcount, sendtype, static_cast<char*>(recvbuf) + displs[root] * recvext,
143                        recvcounts[root], recvtype);
144     // Receive buffers from senders
145     MPI_Request *requests = xbt_new(MPI_Request, size - 1);
146     int index = 0;
147     for (int src = 0; src < size; src++) {
148       if(src != root) {
149         requests[index] = Request::irecv_init(static_cast<char*>(recvbuf) + displs[src] * recvext,
150                           recvcounts[src], recvtype, src, system_tag, comm);
151         index++;
152       }
153     }
154     // Wait for completion of irecv's.
155     Request::startall(size - 1, requests);
156     Request::waitall(size - 1, requests, MPI_STATUS_IGNORE);
157     for (int src = 0; src < size-1; src++) {
158       Request::unuse(&requests[src]);
159     }
160     xbt_free(requests);
161   }
162 }
163
164 void smpi_mpi_allgather(void *sendbuf, int sendcount, MPI_Datatype sendtype,
165                         void *recvbuf,int recvcount, MPI_Datatype recvtype, MPI_Comm comm)
166 {
167   int system_tag = COLL_TAG_ALLGATHER;
168   MPI_Aint lb = 0;
169   MPI_Aint recvext = 0;
170   MPI_Request *requests;
171
172   int rank = comm->rank();
173   int size = comm->size();
174   // FIXME: check for errors
175   smpi_datatype_extent(recvtype, &lb, &recvext);
176   // Local copy from self
177   smpi_datatype_copy(sendbuf, sendcount, sendtype, static_cast<char *>(recvbuf) + rank * recvcount * recvext, recvcount,
178                      recvtype);
179   // Send/Recv buffers to/from others;
180   requests = xbt_new(MPI_Request, 2 * (size - 1));
181   int index = 0;
182   for (int other = 0; other < size; other++) {
183     if(other != rank) {
184       requests[index] = Request::isend_init(sendbuf, sendcount, sendtype, other, system_tag,comm);
185       index++;
186       requests[index] = Request::irecv_init(static_cast<char *>(recvbuf) + other * recvcount * recvext, recvcount, recvtype,
187                                         other, system_tag, comm);
188       index++;
189     }
190   }
191   // Wait for completion of all comms.
192   Request::startall(2 * (size - 1), requests);
193   Request::waitall(2 * (size - 1), requests, MPI_STATUS_IGNORE);
194   for (int other = 0; other < 2*(size-1); other++) {
195     Request::unuse(&requests[other]);
196   }
197   xbt_free(requests);
198 }
199
200 void smpi_mpi_allgatherv(void *sendbuf, int sendcount, MPI_Datatype sendtype, void *recvbuf,
201                          int *recvcounts, int *displs, MPI_Datatype recvtype, MPI_Comm comm)
202 {
203   int system_tag = COLL_TAG_ALLGATHERV;
204   MPI_Aint lb = 0;
205   MPI_Aint recvext = 0;
206
207   int rank = comm->rank();
208   int size = comm->size();
209   smpi_datatype_extent(recvtype, &lb, &recvext);
210   // Local copy from self
211   smpi_datatype_copy(sendbuf, sendcount, sendtype,
212                      static_cast<char *>(recvbuf) + displs[rank] * recvext,recvcounts[rank], recvtype);
213   // Send buffers to others;
214   MPI_Request *requests = xbt_new(MPI_Request, 2 * (size - 1));
215   int index = 0;
216   for (int other = 0; other < size; other++) {
217     if(other != rank) {
218       requests[index] =
219         Request::isend_init(sendbuf, sendcount, sendtype, other, system_tag, comm);
220       index++;
221       requests[index] = Request::irecv_init(static_cast<char *>(recvbuf) + displs[other] * recvext, recvcounts[other],
222                           recvtype, other, system_tag, comm);
223       index++;
224     }
225   }
226   // Wait for completion of all comms.
227   Request::startall(2 * (size - 1), requests);
228   Request::waitall(2 * (size - 1), requests, MPI_STATUS_IGNORE);
229   for (int other = 0; other < 2*(size-1); other++) {
230     Request::unuse(&requests[other]);
231   }
232   xbt_free(requests);
233 }
234
235 void smpi_mpi_scatter(void *sendbuf, int sendcount, MPI_Datatype sendtype,
236                       void *recvbuf, int recvcount, MPI_Datatype recvtype, int root, MPI_Comm comm)
237 {
238   int system_tag = COLL_TAG_SCATTER;
239   MPI_Aint lb = 0;
240   MPI_Aint sendext = 0;
241   MPI_Request *requests;
242
243   int rank = comm->rank();
244   int size = comm->size();
245   if(rank != root) {
246     // Recv buffer from root
247     Request::recv(recvbuf, recvcount, recvtype, root, system_tag, comm, MPI_STATUS_IGNORE);
248   } else {
249     smpi_datatype_extent(sendtype, &lb, &sendext);
250     // Local copy from root
251     if(recvbuf!=MPI_IN_PLACE){
252         smpi_datatype_copy(static_cast<char *>(sendbuf) + root * sendcount * sendext,
253                            sendcount, sendtype, recvbuf, recvcount, recvtype);
254     }
255     // Send buffers to receivers
256     requests = xbt_new(MPI_Request, size - 1);
257     int index = 0;
258     for(int dst = 0; dst < size; dst++) {
259       if(dst != root) {
260         requests[index] = Request::isend_init(static_cast<char *>(sendbuf) + dst * sendcount * sendext, sendcount, sendtype,
261                                           dst, system_tag, comm);
262         index++;
263       }
264     }
265     // Wait for completion of isend's.
266     Request::startall(size - 1, requests);
267     Request::waitall(size - 1, requests, MPI_STATUS_IGNORE);
268     for (int dst = 0; dst < size-1; dst++) {
269       Request::unuse(&requests[dst]);
270     }
271     xbt_free(requests);
272   }
273 }
274
275 void smpi_mpi_scatterv(void *sendbuf, int *sendcounts, int *displs, MPI_Datatype sendtype, void *recvbuf, int recvcount,
276                        MPI_Datatype recvtype, int root, MPI_Comm comm)
277 {
278   int system_tag = COLL_TAG_SCATTERV;
279   MPI_Aint lb = 0;
280   MPI_Aint sendext = 0;
281
282   int rank = comm->rank();
283   int size = comm->size();
284   if(rank != root) {
285     // Recv buffer from root
286     Request::recv(recvbuf, recvcount, recvtype, root, system_tag, comm, MPI_STATUS_IGNORE);
287   } else {
288     smpi_datatype_extent(sendtype, &lb, &sendext);
289     // Local copy from root
290     if(recvbuf!=MPI_IN_PLACE){
291       smpi_datatype_copy(static_cast<char *>(sendbuf) + displs[root] * sendext, sendcounts[root],
292                        sendtype, recvbuf, recvcount, recvtype);
293     }
294     // Send buffers to receivers
295     MPI_Request *requests = xbt_new(MPI_Request, size - 1);
296     int index = 0;
297     for (int dst = 0; dst < size; dst++) {
298       if (dst != root) {
299         requests[index] = Request::isend_init(static_cast<char *>(sendbuf) + displs[dst] * sendext, sendcounts[dst],
300                             sendtype, dst, system_tag, comm);
301         index++;
302       }
303     }
304     // Wait for completion of isend's.
305     Request::startall(size - 1, requests);
306     Request::waitall(size - 1, requests, MPI_STATUS_IGNORE);
307     for (int dst = 0; dst < size-1; dst++) {
308       Request::unuse(&requests[dst]);
309     }
310     xbt_free(requests);
311   }
312 }
313
314 void smpi_mpi_reduce(void *sendbuf, void *recvbuf, int count, MPI_Datatype datatype, MPI_Op op, int root,
315                      MPI_Comm comm)
316 {
317   int system_tag = COLL_TAG_REDUCE;
318   MPI_Aint lb = 0;
319   MPI_Aint dataext = 0;
320
321   char* sendtmpbuf = static_cast<char *>(sendbuf);
322
323   int rank = comm->rank();
324   int size = comm->size();
325   //non commutative case, use a working algo from openmpi
326   if(op != MPI_OP_NULL && !op->is_commutative()){
327     smpi_coll_tuned_reduce_ompi_basic_linear(sendtmpbuf, recvbuf, count, datatype, op, root, comm);
328     return;
329   }
330
331   if( sendbuf == MPI_IN_PLACE ) {
332     sendtmpbuf = static_cast<char *>(smpi_get_tmp_sendbuffer(count*smpi_datatype_get_extent(datatype)));
333     smpi_datatype_copy(recvbuf, count, datatype,sendtmpbuf, count, datatype);
334   }
335   
336   if(rank != root) {
337     // Send buffer to root
338     Request::send(sendtmpbuf, count, datatype, root, system_tag, comm);
339   } else {
340     smpi_datatype_extent(datatype, &lb, &dataext);
341     // Local copy from root
342     if (sendtmpbuf != nullptr && recvbuf != nullptr)
343       smpi_datatype_copy(sendtmpbuf, count, datatype, recvbuf, count, datatype);
344     // Receive buffers from senders
345     MPI_Request *requests = xbt_new(MPI_Request, size - 1);
346     void **tmpbufs = xbt_new(void *, size - 1);
347     int index = 0;
348     for (int src = 0; src < size; src++) {
349       if (src != root) {
350          if (!smpi_process_get_replaying())
351           tmpbufs[index] = xbt_malloc(count * dataext);
352          else
353            tmpbufs[index] = smpi_get_tmp_sendbuffer(count * dataext);
354         requests[index] =
355           Request::irecv_init(tmpbufs[index], count, datatype, src, system_tag, comm);
356         index++;
357       }
358     }
359     // Wait for completion of irecv's.
360     Request::startall(size - 1, requests);
361     for (int src = 0; src < size - 1; src++) {
362       index = Request::waitany(size - 1, requests, MPI_STATUS_IGNORE);
363       XBT_DEBUG("finished waiting any request with index %d", index);
364       if(index == MPI_UNDEFINED) {
365         break;
366       }else{
367         Request::unuse(&requests[index]);
368       }
369       if(op) /* op can be MPI_OP_NULL that does nothing */
370         if(op!=MPI_OP_NULL) op->apply( tmpbufs[index], recvbuf, &count, &datatype);
371     }
372       for(index = 0; index < size - 1; index++) {
373         smpi_free_tmp_buffer(tmpbufs[index]);
374       }
375     xbt_free(tmpbufs);
376     xbt_free(requests);
377
378   }
379   if( sendbuf == MPI_IN_PLACE ) {
380     smpi_free_tmp_buffer(sendtmpbuf);
381   }
382 }
383
384 void smpi_mpi_allreduce(void *sendbuf, void *recvbuf, int count, MPI_Datatype datatype, MPI_Op op, MPI_Comm comm)
385 {
386   smpi_mpi_reduce(sendbuf, recvbuf, count, datatype, op, 0, comm);
387   smpi_mpi_bcast(recvbuf, count, datatype, 0, comm);
388 }
389
390 void smpi_mpi_scan(void *sendbuf, void *recvbuf, int count, MPI_Datatype datatype, MPI_Op op, MPI_Comm comm)
391 {
392   int system_tag = -888;
393   MPI_Aint lb      = 0;
394   MPI_Aint dataext = 0;
395
396   int rank = comm->rank();
397   int size = comm->size();
398
399   smpi_datatype_extent(datatype, &lb, &dataext);
400
401   // Local copy from self
402   smpi_datatype_copy(sendbuf, count, datatype, recvbuf, count, datatype);
403
404   // Send/Recv buffers to/from others;
405   MPI_Request *requests = xbt_new(MPI_Request, size - 1);
406   void **tmpbufs = xbt_new(void *, rank);
407   int index = 0;
408   for (int other = 0; other < rank; other++) {
409     tmpbufs[index] = smpi_get_tmp_sendbuffer(count * dataext);
410     requests[index] = Request::irecv_init(tmpbufs[index], count, datatype, other, system_tag, comm);
411     index++;
412   }
413   for (int other = rank + 1; other < size; other++) {
414     requests[index] = Request::isend_init(sendbuf, count, datatype, other, system_tag, comm);
415     index++;
416   }
417   // Wait for completion of all comms.
418   Request::startall(size - 1, requests);
419
420   if(op != MPI_OP_NULL && op->is_commutative()){
421     for (int other = 0; other < size - 1; other++) {
422       index = Request::waitany(size - 1, requests, MPI_STATUS_IGNORE);
423       if(index == MPI_UNDEFINED) {
424         break;
425       }
426       if(index < rank) {
427         // #Request is below rank: it's a irecv
428         if(op!=MPI_OP_NULL) op->apply( tmpbufs[index], recvbuf, &count, &datatype);
429       }
430     }
431   }else{
432     //non commutative case, wait in order
433     for (int other = 0; other < size - 1; other++) {
434       Request::wait(&(requests[other]), MPI_STATUS_IGNORE);
435       if(index < rank) {
436         if(op!=MPI_OP_NULL) op->apply( tmpbufs[other], recvbuf, &count, &datatype);
437       }
438     }
439   }
440   for(index = 0; index < rank; index++) {
441     smpi_free_tmp_buffer(tmpbufs[index]);
442   }
443   for(index = 0; index < size-1; index++) {
444     Request::unuse(&requests[index]);
445   }
446   xbt_free(tmpbufs);
447   xbt_free(requests);
448 }
449
450 void smpi_mpi_exscan(void *sendbuf, void *recvbuf, int count, MPI_Datatype datatype, MPI_Op op, MPI_Comm comm)
451 {
452   int system_tag = -888;
453   MPI_Aint lb         = 0;
454   MPI_Aint dataext    = 0;
455   int recvbuf_is_empty=1;
456   int rank = comm->rank();
457   int size = comm->size();
458
459   smpi_datatype_extent(datatype, &lb, &dataext);
460
461   // Send/Recv buffers to/from others;
462   MPI_Request *requests = xbt_new(MPI_Request, size - 1);
463   void **tmpbufs = xbt_new(void *, rank);
464   int index = 0;
465   for (int other = 0; other < rank; other++) {
466     tmpbufs[index] = smpi_get_tmp_sendbuffer(count * dataext);
467     requests[index] = Request::irecv_init(tmpbufs[index], count, datatype, other, system_tag, comm);
468     index++;
469   }
470   for (int other = rank + 1; other < size; other++) {
471     requests[index] = Request::isend_init(sendbuf, count, datatype, other, system_tag, comm);
472     index++;
473   }
474   // Wait for completion of all comms.
475   Request::startall(size - 1, requests);
476
477   if(op != MPI_OP_NULL && op->is_commutative()){
478     for (int other = 0; other < size - 1; other++) {
479       index = Request::waitany(size - 1, requests, MPI_STATUS_IGNORE);
480       if(index == MPI_UNDEFINED) {
481         break;
482       }
483       if(index < rank) {
484         if(recvbuf_is_empty){
485           smpi_datatype_copy(tmpbufs[index], count, datatype, recvbuf, count, datatype);
486           recvbuf_is_empty=0;
487         } else
488           // #Request is below rank: it's a irecv
489           if(op!=MPI_OP_NULL) op->apply( tmpbufs[index], recvbuf, &count, &datatype);
490       }
491     }
492   }else{
493     //non commutative case, wait in order
494     for (int other = 0; other < size - 1; other++) {
495      Request::wait(&(requests[other]), MPI_STATUS_IGNORE);
496       if(index < rank) {
497         if (recvbuf_is_empty) {
498           smpi_datatype_copy(tmpbufs[other], count, datatype, recvbuf, count, datatype);
499           recvbuf_is_empty = 0;
500         } else
501           if(op!=MPI_OP_NULL) op->apply( tmpbufs[other], recvbuf, &count, &datatype);
502       }
503     }
504   }
505   for(index = 0; index < rank; index++) {
506     smpi_free_tmp_buffer(tmpbufs[index]);
507   }
508   for(index = 0; index < size-1; index++) {
509     Request::unuse(&requests[index]);
510   }
511   xbt_free(tmpbufs);
512   xbt_free(requests);
513 }
514
515 void smpi_empty_status(MPI_Status * status)
516 {
517   if(status != MPI_STATUS_IGNORE) {
518     status->MPI_SOURCE = MPI_ANY_SOURCE;
519     status->MPI_TAG = MPI_ANY_TAG;
520     status->MPI_ERROR = MPI_SUCCESS;
521     status->count=0;
522   }
523 }
524
525 int smpi_mpi_get_count(MPI_Status * status, MPI_Datatype datatype)
526 {
527   return status->count / smpi_datatype_size(datatype);
528 }
529
530