1 /* Copyright (c) 2007-2015. The SimGrid Team.
2 * All rights reserved. */
4 /* This program is free software; you can redistribute it and/or modify it
5 * under the terms of the license (GNU LGPL) which comes with this package. */
7 #include <xbt/config.hpp>
11 #include "xbt/virtu.h"
13 #include "src/mc/mc_replay.h"
14 #include "xbt/replay.h"
16 #include "src/simix/smx_private.h"
17 #include "surf/surf.h"
18 #include "simgrid/sg_config.h"
19 #include "smpi/smpi_utils.hpp"
20 #include "colls/colls.h"
21 #include <simgrid/s4u/host.hpp>
23 #include "src/kernel/activity/SynchroComm.hpp"
// Default XBT logging category for this file, declared as a subcategory of
// `smpi` (presumably the category the XBT_DEBUG call in smpi_mpi_reduce logs to).
25 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_base, smpi, "Logging specific to SMPI (base)");
// Configuration flag "smpi/wtime" (default 0.0). When it is > 0, smpi_mpi_wtime()
// makes the calling process sleep for this amount, so that loops polling
// MPI_Wtime let simulated time advance. Units presumably simulated seconds --
// TODO confirm.
28 static simgrid::config::Flag<double> smpi_wtime_sleep(
29 "smpi/wtime", "Minimum time to inject inside a call to MPI_Wtime", 0.0);
// Configuration flag "smpi/init" (default 0.0). When it is > 0, smpi_mpi_init()
// makes the calling process sleep for this amount (same units as "smpi/wtime").
30 static simgrid::config::Flag<double> smpi_init_sleep(
31 "smpi/init", "Time to inject inside a call to MPI_Init", 0.0);
// Delay injection for MPI_Init (per the "smpi/init" flag description): if the
// configured value is positive, the calling simulated process sleeps for that
// long through simcall_process_sleep(); otherwise this is a no-op.
// NOTE(review): this listing jumps from original line 35 to 38, so the closing
// brace of this function is not visible here -- confirm against the full source.
33 void smpi_mpi_init() {
34 if(smpi_init_sleep > 0)
35 simcall_process_sleep(smpi_init_sleep);
// Backend of MPI_Wtime: reads the simulated clock with SIMIX_get_clock().
// When the calling process is initialized, not yet finalized and not in a
// sampling phase, it reads the clock and then sleeps for "smpi/wtime" if that
// value is > 0. The sleep lets busy-wait loops on MPI_Wtime make progress.
// NOTE(review): several original lines are missing from this listing: the
// declaration of `time` (line 39), lines 41, 45-46 and 51-52, and the return
// at the end. The second clock read (line 53) is assumed to be the path taken
// when the condition is false (presumably an `else`) -- confirm against the
// full source.
38 double smpi_mpi_wtime(){
40 if (smpi_process_initialized() != 0 && smpi_process_finalized() == 0 && smpi_process_get_sampling() == 0) {
42 time = SIMIX_get_clock();
43 // to avoid deadlocks if used as a break condition, such as
44 // while (MPI_Wtime(...) < time_limit) {
//   (the body and closing brace of this example are not shown in this listing)
47 // because the time will not normally advance when only calls to MPI_Wtime
48 // are made -> deadlock (MPI_Wtime never reaches the time limit)
49 if(smpi_wtime_sleep > 0)
50 simcall_process_sleep(smpi_wtime_sleep);
// Path without delay injection (see NOTE above).
53 time = SIMIX_get_clock();
// Broadcast entry point (presumably backing MPI_Bcast). It forwards all
// arguments unchanged to smpi_coll_tuned_bcast_binomial_tree().
// NOTE(review): the braces of this function are not visible in this listing.
59 void smpi_mpi_bcast(void *buf, int count, MPI_Datatype datatype, int root, MPI_Comm comm)
61 smpi_coll_tuned_bcast_binomial_tree(buf, count, datatype, root, comm);
// Barrier entry point (presumably backing MPI_Barrier). It forwards the
// communicator to smpi_coll_tuned_barrier_ompi_basic_linear().
// NOTE(review): the braces of this function are not visible in this listing.
64 void smpi_mpi_barrier(MPI_Comm comm)
66 smpi_coll_tuned_barrier_ompi_basic_linear(comm);
// Flat gather onto `root`, using tag COLL_TAG_GATHER.
// - Every non-root rank sends sendcount elements of sendtype to the root.
// - The root copies its own contribution into slot `root` of recvbuf. It then
//   posts one receive per other rank (size - 1 requests). Rank src's data
//   lands at byte offset src * recvcount * extent(recvtype).
// NOTE(review): this listing omits original lines:
// - the declarations of `lb` and `index`;
// - the rank/root test that separates the two roles;
// - the `src != root` filter and index increment in the loop;
// - the tail of the Datatype::copy call (line 85);
// - any xbt_free(requests).
// Confirm against the full source. Unlike smpi_mpi_scatter, no MPI_IN_PLACE
// check is visible before the local copy; presumably the caller handles it.
69 void smpi_mpi_gather(void *sendbuf, int sendcount, MPI_Datatype sendtype,
70 void *recvbuf, int recvcount, MPI_Datatype recvtype, int root, MPI_Comm comm)
72 int system_tag = COLL_TAG_GATHER;
76 int rank = comm->rank();
77 int size = comm->size();
79 // Send buffer to root
80 Request::send(sendbuf, sendcount, sendtype, root, system_tag, comm);
// Root side: slot offsets below are multiples of the receive type's extent.
82 recvtype->extent(&lb, &recvext);
83 // Local copy from root
84 Datatype::copy(sendbuf, sendcount, sendtype, static_cast<char*>(recvbuf) + root * recvcount * recvext,
86 // Receive buffers from senders
// One request per rank other than the root.
87 MPI_Request *requests = xbt_new(MPI_Request, size - 1);
89 for (int src = 0; src < size; src++) {
91 requests[index] = Request::irecv_init(static_cast<char*>(recvbuf) + src * recvcount * recvext, recvcount, recvtype,
92 src, system_tag, comm);
96 // Wait for completion of irecv's.
97 Request::startall(size - 1, requests);
98 Request::waitall(size - 1, requests, MPI_STATUS_IGNORE);
// Release every request created by irecv_init above.
99 for (int src = 0; src < size-1; src++) {
100 Request::unuse(&requests[src]);
// Reduce-scatter built from two collectives:
// 1. The whole vector (sum of all recvcounts[i] elements) is reduced onto
//    rank 0 into a temporary buffer, using mpi_coll_reduce_fun().
// 2. Rank 0 scatters it with smpi_mpi_scatterv(). Rank i receives
//    recvcounts[i] elements taken from displacement displs[i].
// NOTE(review): this listing omits original lines:
// - the rest of the signature (line 107, presumably `MPI_Comm comm`);
// - the initialisation of `count`;
// - the assignment of displs[i] inside the loop (line 116, presumably the
//   running prefix sum of recvcounts);
// - any xbt_free(displs).
// Confirm against the full source.
106 void smpi_mpi_reduce_scatter(void *sendbuf, void *recvbuf, int *recvcounts, MPI_Datatype datatype, MPI_Op op,
109 int rank = comm->rank();
111 /* arbitrarily choose root as rank 0 */
112 int size = comm->size();
// Per-rank displacements (in elements) into the reduced temporary buffer.
114 int *displs = xbt_new(int, size);
115 for (int i = 0; i < size; i++) {
117 count += recvcounts[i];
// Temporary buffer large enough for the full reduced vector.
119 void *tmpbuf = static_cast<void*>(smpi_get_tmp_sendbuffer(count*datatype->get_extent()));
121 mpi_coll_reduce_fun(sendbuf, tmpbuf, count, datatype, op, 0, comm);
122 smpi_mpi_scatterv(tmpbuf, recvcounts, displs, datatype, recvbuf, recvcounts[rank], datatype, 0, comm);
124 smpi_free_tmp_buffer(tmpbuf);
// Flat gatherv onto `root`, using tag COLL_TAG_GATHERV.
// - Non-root ranks send sendcount elements of sendtype to the root.
// - The root copies its own contribution to byte offset
//   displs[root] * extent(recvtype) of recvbuf, as recvcounts[root] elements.
//   It then posts one receive per other rank (size - 1 requests). Rank src
//   delivers recvcounts[src] elements at displs[src] * extent(recvtype).
// NOTE(review): this listing omits original lines:
// - the declarations of `lb` and `index`;
// - the rank/root test;
// - the `src != root` filter and index increments;
// - any xbt_free(requests).
// Confirm against the full source. No MPI_IN_PLACE check is visible before
// the local copy; presumably the caller handles it.
127 void smpi_mpi_gatherv(void *sendbuf, int sendcount, MPI_Datatype sendtype, void *recvbuf, int *recvcounts, int *displs,
128 MPI_Datatype recvtype, int root, MPI_Comm comm)
130 int system_tag = COLL_TAG_GATHERV;
132 MPI_Aint recvext = 0;
134 int rank = comm->rank();
135 int size = comm->size();
137 // Send buffer to root
138 Request::send(sendbuf, sendcount, sendtype, root, system_tag, comm);
// Root side: displacements are scaled by the receive type's extent.
140 recvtype->extent(&lb, &recvext);
141 // Local copy from root
142 Datatype::copy(sendbuf, sendcount, sendtype, static_cast<char*>(recvbuf) + displs[root] * recvext,
143 recvcounts[root], recvtype);
144 // Receive buffers from senders
145 MPI_Request *requests = xbt_new(MPI_Request, size - 1);
147 for (int src = 0; src < size; src++) {
149 requests[index] = Request::irecv_init(static_cast<char*>(recvbuf) + displs[src] * recvext,
150 recvcounts[src], recvtype, src, system_tag, comm);
154 // Wait for completion of irecv's.
155 Request::startall(size - 1, requests);
156 Request::waitall(size - 1, requests, MPI_STATUS_IGNORE);
// Release every request created by irecv_init above.
157 for (int src = 0; src < size-1; src++) {
158 Request::unuse(&requests[src]);
// Flat allgather, using tag COLL_TAG_ALLGATHER.
// - Each rank copies its own contribution into slot `rank` of recvbuf.
// - It then exchanges data directly with every other rank: one isend of
//   sendbuf and one irecv into slot `other`, for 2 * (size - 1) requests in
//   total.
// Slot `other` starts at byte offset other * recvcount * extent(recvtype).
// NOTE(review): this listing omits original lines:
// - the declarations of `lb` and `index`;
// - the tail of the Datatype::copy call (line 178);
// - the `other != rank` filter and index increments;
// - any xbt_free(requests).
// Confirm against the full source. No MPI_IN_PLACE check is visible before
// the local copy.
164 void smpi_mpi_allgather(void *sendbuf, int sendcount, MPI_Datatype sendtype,
165 void *recvbuf,int recvcount, MPI_Datatype recvtype, MPI_Comm comm)
167 int system_tag = COLL_TAG_ALLGATHER;
169 MPI_Aint recvext = 0;
170 MPI_Request *requests;
172 int rank = comm->rank();
173 int size = comm->size();
174 // FIXME: check for errors
175 recvtype->extent(&lb, &recvext);
176 // Local copy from self
177 Datatype::copy(sendbuf, sendcount, sendtype, static_cast<char *>(recvbuf) + rank * recvcount * recvext, recvcount,
179 // Send/Recv buffers to/from others;
// One send and one receive per peer.
180 requests = xbt_new(MPI_Request, 2 * (size - 1));
182 for (int other = 0; other < size; other++) {
184 requests[index] = Request::isend_init(sendbuf, sendcount, sendtype, other, system_tag,comm);
186 requests[index] = Request::irecv_init(static_cast<char *>(recvbuf) + other * recvcount * recvext, recvcount, recvtype,
187 other, system_tag, comm);
191 // Wait for completion of all comms.
192 Request::startall(2 * (size - 1), requests);
193 Request::waitall(2 * (size - 1), requests, MPI_STATUS_IGNORE);
// Release every send and receive request.
194 for (int other = 0; other < 2*(size-1); other++) {
195 Request::unuse(&requests[other]);
// Flat allgatherv, using tag COLL_TAG_ALLGATHERV.
// - Each rank copies its own contribution to byte offset
//   displs[rank] * extent(recvtype) of recvbuf, as recvcounts[rank] elements.
// - It then sends sendbuf to, and receives from, every other rank
//   (2 * (size - 1) requests in total).
// Rank `other`'s data lands at displs[other] * extent(recvtype), with
// recvcounts[other] elements.
// NOTE(review): this listing omits original lines:
// - the declarations of `lb` and `index`;
// - the `other != rank` filter and index increments;
// - the start of the isend statement (line 218, presumably
//   `requests[index] =`);
// - any xbt_free(requests).
// Confirm against the full source. No MPI_IN_PLACE check is visible before
// the local copy.
200 void smpi_mpi_allgatherv(void *sendbuf, int sendcount, MPI_Datatype sendtype, void *recvbuf,
201 int *recvcounts, int *displs, MPI_Datatype recvtype, MPI_Comm comm)
203 int system_tag = COLL_TAG_ALLGATHERV;
205 MPI_Aint recvext = 0;
207 int rank = comm->rank();
208 int size = comm->size();
209 recvtype->extent(&lb, &recvext);
210 // Local copy from self
211 Datatype::copy(sendbuf, sendcount, sendtype,
212 static_cast<char *>(recvbuf) + displs[rank] * recvext,recvcounts[rank], recvtype);
213 // Send buffers to others;
// One send and one receive per peer.
214 MPI_Request *requests = xbt_new(MPI_Request, 2 * (size - 1));
216 for (int other = 0; other < size; other++) {
// NOTE(review): the left-hand side storing this send request is on a line
// missing from this listing.
219 Request::isend_init(sendbuf, sendcount, sendtype, other, system_tag, comm);
221 requests[index] = Request::irecv_init(static_cast<char *>(recvbuf) + displs[other] * recvext, recvcounts[other],
222 recvtype, other, system_tag, comm);
226 // Wait for completion of all comms.
227 Request::startall(2 * (size - 1), requests);
228 Request::waitall(2 * (size - 1), requests, MPI_STATUS_IGNORE);
// Release every send and receive request.
229 for (int other = 0; other < 2*(size-1); other++) {
230 Request::unuse(&requests[other]);
// Flat scatter from `root`, using tag COLL_TAG_SCATTER.
// - Non-root ranks receive recvcount elements of recvtype from the root.
// - The root copies its own slot into recvbuf, unless recvbuf is MPI_IN_PLACE.
//   Its slot is at byte offset root * sendcount * extent(sendtype).
// - The root then sends slot `dst` (offset dst * sendcount * extent(sendtype))
//   to each other rank, using size - 1 send requests.
// NOTE(review): this listing omits original lines:
// - the declarations of `lb` and `index`;
// - the rank/root test;
// - the closing brace of the MPI_IN_PLACE test;
// - the `dst != root` filter and index increments;
// - any xbt_free(requests).
// Confirm against the full source.
235 void smpi_mpi_scatter(void *sendbuf, int sendcount, MPI_Datatype sendtype,
236 void *recvbuf, int recvcount, MPI_Datatype recvtype, int root, MPI_Comm comm)
238 int system_tag = COLL_TAG_SCATTER;
240 MPI_Aint sendext = 0;
241 MPI_Request *requests;
243 int rank = comm->rank();
244 int size = comm->size();
246 // Recv buffer from root
247 Request::recv(recvbuf, recvcount, recvtype, root, system_tag, comm, MPI_STATUS_IGNORE);
// Root side: slot offsets are multiples of the send type's extent.
249 sendtype->extent(&lb, &sendext);
250 // Local copy from root
251 if(recvbuf!=MPI_IN_PLACE){
252 Datatype::copy(static_cast<char *>(sendbuf) + root * sendcount * sendext,
253 sendcount, sendtype, recvbuf, recvcount, recvtype);
255 // Send buffers to receivers
256 requests = xbt_new(MPI_Request, size - 1);
258 for(int dst = 0; dst < size; dst++) {
260 requests[index] = Request::isend_init(static_cast<char *>(sendbuf) + dst * sendcount * sendext, sendcount, sendtype,
261 dst, system_tag, comm);
265 // Wait for completion of isend's.
266 Request::startall(size - 1, requests);
267 Request::waitall(size - 1, requests, MPI_STATUS_IGNORE);
// Release every request created by isend_init above.
268 for (int dst = 0; dst < size-1; dst++) {
269 Request::unuse(&requests[dst]);
// Flat scatterv from `root`, using tag COLL_TAG_SCATTERV.
// - Non-root ranks receive recvcount elements of recvtype from the root.
// - The root copies sendcounts[root] elements from byte offset
//   displs[root] * extent(sendtype) into recvbuf, unless recvbuf is
//   MPI_IN_PLACE.
// - The root then sends sendcounts[dst] elements from
//   displs[dst] * extent(sendtype) to each other rank (size - 1 requests).
// NOTE(review): this listing omits original lines:
// - the declarations of `lb` and `index`;
// - the rank/root test;
// - the closing brace of the MPI_IN_PLACE test;
// - the `dst != root` filter and index increments;
// - any xbt_free(requests).
// Confirm against the full source.
275 void smpi_mpi_scatterv(void *sendbuf, int *sendcounts, int *displs, MPI_Datatype sendtype, void *recvbuf, int recvcount,
276 MPI_Datatype recvtype, int root, MPI_Comm comm)
278 int system_tag = COLL_TAG_SCATTERV;
280 MPI_Aint sendext = 0;
282 int rank = comm->rank();
283 int size = comm->size();
285 // Recv buffer from root
286 Request::recv(recvbuf, recvcount, recvtype, root, system_tag, comm, MPI_STATUS_IGNORE);
// Root side: displacements are scaled by the send type's extent.
288 sendtype->extent(&lb, &sendext);
289 // Local copy from root
290 if(recvbuf!=MPI_IN_PLACE){
291 Datatype::copy(static_cast<char *>(sendbuf) + displs[root] * sendext, sendcounts[root],
292 sendtype, recvbuf, recvcount, recvtype);
294 // Send buffers to receivers
295 MPI_Request *requests = xbt_new(MPI_Request, size - 1);
297 for (int dst = 0; dst < size; dst++) {
299 requests[index] = Request::isend_init(static_cast<char *>(sendbuf) + displs[dst] * sendext, sendcounts[dst],
300 sendtype, dst, system_tag, comm);
304 // Wait for completion of isend's.
305 Request::startall(size - 1, requests);
306 Request::waitall(size - 1, requests, MPI_STATUS_IGNORE);
// Release every request created by isend_init above.
307 for (int dst = 0; dst < size-1; dst++) {
308 Request::unuse(&requests[dst]);
// Reduce onto `root`, using tag COLL_TAG_REDUCE.
// - Non-commutative ops (op != MPI_OP_NULL) are handed to
//   smpi_coll_tuned_reduce_ompi_basic_linear(). That is presumably followed
//   by an early return on a line missing from this listing.
// - If sendbuf is MPI_IN_PLACE, the input is read from recvbuf into a
//   temporary buffer, which is released at the end.
// - Non-root ranks send their input to the root.
// - The root copies its own input into recvbuf and receives every other
//   rank's input into a temporary buffer. It folds each contribution into
//   recvbuf with op->apply() in completion order (waitany). That is only
//   order-independent because non-commutative ops were diverted above.
// NOTE(review): this listing omits original lines:
// - the rest of the signature (presumably `MPI_Comm comm`);
// - the declarations of `lb` and `index`;
// - the rank/root test and the `src != root` filter;
// - the `else` between the two tmpbufs allocations;
// - the body of the MPI_UNDEFINED test;
// - the frees of requests/tmpbufs.
// Confirm against the full source.
314 void smpi_mpi_reduce(void *sendbuf, void *recvbuf, int count, MPI_Datatype datatype, MPI_Op op, int root,
317 int system_tag = COLL_TAG_REDUCE;
319 MPI_Aint dataext = 0;
321 char* sendtmpbuf = static_cast<char *>(sendbuf);
323 int rank = comm->rank();
324 int size = comm->size();
325 //non commutative case, use a working algo from openmpi
326 if(op != MPI_OP_NULL && !op->is_commutative()){
327 smpi_coll_tuned_reduce_ompi_basic_linear(sendtmpbuf, recvbuf, count, datatype, op, root, comm);
// In-place: take this rank's input from recvbuf, through a temporary copy.
331 if( sendbuf == MPI_IN_PLACE ) {
332 sendtmpbuf = static_cast<char *>(smpi_get_tmp_sendbuffer(count*datatype->get_extent()));
333 Datatype::copy(recvbuf, count, datatype,sendtmpbuf, count, datatype);
337 // Send buffer to root
338 Request::send(sendtmpbuf, count, datatype, root, system_tag, comm);
// Root side: the element extent sizes the temporary receive buffers.
340 datatype->extent(&lb, &dataext);
341 // Local copy from root
342 if (sendtmpbuf != nullptr && recvbuf != nullptr)
343 Datatype::copy(sendtmpbuf, count, datatype, recvbuf, count, datatype);
344 // Receive buffers from senders
345 MPI_Request *requests = xbt_new(MPI_Request, size - 1);
346 void **tmpbufs = xbt_new(void *, size - 1);
348 for (int src = 0; src < size; src++) {
// Buffers are xbt_malloc'd normally, and taken from smpi_get_tmp_sendbuffer
// while replaying. The `else` between the two lines is not visible here.
350 if (!smpi_process_get_replaying())
351 tmpbufs[index] = xbt_malloc(count * dataext);
353 tmpbufs[index] = smpi_get_tmp_sendbuffer(count * dataext);
// NOTE(review): the left-hand side storing this request (presumably
// `requests[index] =`) is on a line missing from this listing.
355 Request::irecv_init(tmpbufs[index], count, datatype, src, system_tag, comm);
359 // Wait for completion of irecv's.
360 Request::startall(size - 1, requests);
// Fold each contribution into recvbuf as soon as its receive completes.
361 for (int src = 0; src < size - 1; src++) {
362 index = Request::waitany(size - 1, requests, MPI_STATUS_IGNORE);
363 XBT_DEBUG("finished waiting any request with index %d", index);
364 if(index == MPI_UNDEFINED) {
367 Request::unuse(&requests[index]);
// NOTE(review): the outer `if(op)` and the inner `op!=MPI_OP_NULL` test
// overlap. If MPI_OP_NULL is a null pointer, one of them is redundant --
// confirm before simplifying.
369 if(op) /* op can be MPI_OP_NULL that does nothing */
370 if(op!=MPI_OP_NULL) op->apply( tmpbufs[index], recvbuf, &count, datatype);
// Release the temporary receive buffers.
// NOTE(review): buffers obtained with xbt_malloc (non-replay path) are
// released with smpi_free_tmp_buffer. Confirm that this helper frees
// non-replay allocations, otherwise they leak.
372 for(index = 0; index < size - 1; index++) {
373 smpi_free_tmp_buffer(tmpbufs[index]);
// Release the temporary copy made for MPI_IN_PLACE.
379 if( sendbuf == MPI_IN_PLACE ) {
380 smpi_free_tmp_buffer(sendtmpbuf);
// Allreduce composed of two collectives from this file:
// 1. smpi_mpi_reduce() onto rank 0;
// 2. smpi_mpi_bcast() of recvbuf from rank 0.
// Every rank ends with the reduced value in recvbuf.
// NOTE(review): the braces of this function are not visible in this listing.
384 void smpi_mpi_allreduce(void *sendbuf, void *recvbuf, int count, MPI_Datatype datatype, MPI_Op op, MPI_Comm comm)
386 smpi_mpi_reduce(sendbuf, recvbuf, count, datatype, op, 0, comm);
387 smpi_mpi_bcast(recvbuf, count, datatype, 0, comm);
// Inclusive prefix reduction (presumably backing MPI_Scan): rank r ends with
// the combination of the inputs of ranks 0..r in recvbuf.
// Flat algorithm:
// - recvbuf starts as this rank's own input.
// - The rank receives every lower rank's input into a temporary buffer.
// - It sends its own input to every higher rank (size - 1 requests in total).
// Commutative ops are folded in completion order (waitany). Otherwise,
// including MPI_OP_NULL, requests are waited in index order.
// NOTE(review): the tag is the literal -888 rather than a COLL_TAG_*
// constant like the other collectives here (smpi_mpi_exscan uses the same
// literal).
// NOTE(review): tmpbufs has only `rank` entries, but both apply sites index
// it with values up to size - 2. They must be guarded by `< rank` tests,
// which sit on lines missing from this listing (425-426, 435).
// NOTE(review): if Op::apply follows MPI's user-function convention
// (inoutvec = invec op inoutvec), the in-order branch computes
// x[r-1] op (... op (x[0] op x[r])) rather than x[0] op ... op x[r].
// For a non-commutative op those differ. Confirm Op::apply's operand order.
// NOTE(review): also missing here:
// - the declarations of `lb` and `index`;
// - the index increments;
// - the MPI_UNDEFINED body and the `} else {`;
// - the frees of requests/tmpbufs.
390 void smpi_mpi_scan(void *sendbuf, void *recvbuf, int count, MPI_Datatype datatype, MPI_Op op, MPI_Comm comm)
392 int system_tag = -888;
394 MPI_Aint dataext = 0;
396 int rank = comm->rank();
397 int size = comm->size();
399 datatype->extent(&lb, &dataext);
401 // Local copy from self
402 Datatype::copy(sendbuf, count, datatype, recvbuf, count, datatype);
404 // Send/Recv buffers to/from others;
// Requests for receives from lower ranks come first, then sends to higher ranks.
405 MPI_Request *requests = xbt_new(MPI_Request, size - 1);
406 void **tmpbufs = xbt_new(void *, rank);
408 for (int other = 0; other < rank; other++) {
409 tmpbufs[index] = smpi_get_tmp_sendbuffer(count * dataext);
410 requests[index] = Request::irecv_init(tmpbufs[index], count, datatype, other, system_tag, comm);
413 for (int other = rank + 1; other < size; other++) {
414 requests[index] = Request::isend_init(sendbuf, count, datatype, other, system_tag, comm);
417 // Wait for completion of all comms.
418 Request::startall(size - 1, requests);
// Commutative op: fold contributions in whatever order they complete.
420 if(op != MPI_OP_NULL && op->is_commutative()){
421 for (int other = 0; other < size - 1; other++) {
422 index = Request::waitany(size - 1, requests, MPI_STATUS_IGNORE);
423 if(index == MPI_UNDEFINED) {
427 // #Request is below rank: it's a irecv
428 if(op!=MPI_OP_NULL) op->apply( tmpbufs[index], recvbuf, &count, datatype);
432 //non commutative case, wait in order
433 for (int other = 0; other < size - 1; other++) {
434 Request::wait(&(requests[other]), MPI_STATUS_IGNORE);
436 if(op!=MPI_OP_NULL) op->apply( tmpbufs[other], recvbuf, &count, datatype);
// Release the temporary receive buffers, then every request.
440 for(index = 0; index < rank; index++) {
441 smpi_free_tmp_buffer(tmpbufs[index]);
443 for(index = 0; index < size-1; index++) {
444 Request::unuse(&requests[index]);
// Exclusive prefix reduction (presumably backing MPI_Exscan): rank r ends with
// the combination of the inputs of ranks 0..r-1 in recvbuf. Rank 0 receives
// nothing, so its recvbuf is presumably left untouched.
// The exchange pattern is the same as smpi_mpi_scan, but recvbuf does not
// start from this rank's own input:
// - the first received contribution is copied in (tracked by
//   recvbuf_is_empty);
// - later contributions are folded with op->apply().
// NOTE(review): the tag is the literal -888 rather than a COLL_TAG_* constant.
// NOTE(review): tmpbufs has only `rank` entries, so the apply/copy sites must
// be guarded by `< rank` tests on lines missing from this listing.
// NOTE(review): in the commutative branch, the reset of recvbuf_is_empty and
// the `else` before the apply (lines 486-487) are not visible here.
// NOTE(review): if Op::apply computes inoutvec = invec op inoutvec (MPI
// convention), the in-order branch yields x[r-1] op (... op x[0]) rather
// than x[0] op ... op x[r-1]. Confirm for non-commutative ops.
// NOTE(review): also missing here:
// - the declarations of `lb` and `index`;
// - the index increments;
// - the MPI_UNDEFINED body and the `} else {`;
// - the frees of requests/tmpbufs.
450 void smpi_mpi_exscan(void *sendbuf, void *recvbuf, int count, MPI_Datatype datatype, MPI_Op op, MPI_Comm comm)
452 int system_tag = -888;
454 MPI_Aint dataext = 0;
455 int recvbuf_is_empty=1;
456 int rank = comm->rank();
457 int size = comm->size();
459 datatype->extent(&lb, &dataext);
461 // Send/Recv buffers to/from others;
// Requests for receives from lower ranks come first, then sends to higher ranks.
462 MPI_Request *requests = xbt_new(MPI_Request, size - 1);
463 void **tmpbufs = xbt_new(void *, rank);
465 for (int other = 0; other < rank; other++) {
466 tmpbufs[index] = smpi_get_tmp_sendbuffer(count * dataext);
467 requests[index] = Request::irecv_init(tmpbufs[index], count, datatype, other, system_tag, comm);
470 for (int other = rank + 1; other < size; other++) {
471 requests[index] = Request::isend_init(sendbuf, count, datatype, other, system_tag, comm);
474 // Wait for completion of all comms.
475 Request::startall(size - 1, requests);
// Commutative op: fold contributions in whatever order they complete.
477 if(op != MPI_OP_NULL && op->is_commutative()){
478 for (int other = 0; other < size - 1; other++) {
479 index = Request::waitany(size - 1, requests, MPI_STATUS_IGNORE);
480 if(index == MPI_UNDEFINED) {
484 if(recvbuf_is_empty){
485 Datatype::copy(tmpbufs[index], count, datatype, recvbuf, count, datatype);
488 // #Request is below rank: it's a irecv
489 if(op!=MPI_OP_NULL) op->apply( tmpbufs[index], recvbuf, &count, datatype);
493 //non commutative case, wait in order
494 for (int other = 0; other < size - 1; other++) {
495 Request::wait(&(requests[other]), MPI_STATUS_IGNORE);
// The first contribution seeds recvbuf; later ones are combined into it.
497 if (recvbuf_is_empty) {
498 Datatype::copy(tmpbufs[other], count, datatype, recvbuf, count, datatype);
499 recvbuf_is_empty = 0;
501 if(op!=MPI_OP_NULL) op->apply( tmpbufs[other], recvbuf, &count, datatype);
// Release the temporary receive buffers, then every request.
505 for(index = 0; index < rank; index++) {
506 smpi_free_tmp_buffer(tmpbufs[index]);
508 for(index = 0; index < size-1; index++) {
509 Request::unuse(&requests[index]);
// Resets a status object to an "empty" state. If status is MPI_STATUS_IGNORE,
// nothing is written. Otherwise it sets:
// - the source to MPI_ANY_SOURCE;
// - the tag to MPI_ANY_TAG;
// - the error to MPI_SUCCESS.
// NOTE(review): original line 521 inside the if-block, and the function's
// braces, are missing from this listing. Line 521 may reset the count field --
// confirm against the full source.
515 void smpi_empty_status(MPI_Status * status)
517 if(status != MPI_STATUS_IGNORE) {
518 status->MPI_SOURCE = MPI_ANY_SOURCE;
519 status->MPI_TAG = MPI_ANY_TAG;
520 status->MPI_ERROR = MPI_SUCCESS;
// Number of `datatype` elements described by `status`: status->count divided
// by datatype->size(). status->count is presumably a byte count -- confirm.
// Integer division truncates, so a partial trailing element is silently
// dropped.
// NOTE(review): a zero-size datatype divides by zero, and status is
// dereferenced without the MPI_STATUS_IGNORE check that smpi_empty_status
// performs. The MPI standard specifies MPI_GET_COUNT returning 0 for
// zero-size datatypes, and MPI_UNDEFINED when the byte count is not a
// multiple of the type size. Confirm whether callers rely on the current
// behavior before changing it.
// NOTE(review): the braces of this function are not visible in this listing.
525 int smpi_mpi_get_count(MPI_Status * status, MPI_Datatype datatype)
527 return status->count / datatype->size();