1 /* Copyright (c) 2007-2017. The SimGrid Team. All rights reserved. */
3 /* This program is free software; you can redistribute it and/or modify it
4 * under the terms of the license (GNU LGPL) which comes with this package. */
6 #include <xbt/config.hpp>
10 #include "xbt/virtu.h"
12 #include "src/mc/mc_replay.h"
13 #include "xbt/replay.h"
15 #include "src/simix/smx_private.h"
16 #include "surf/surf.h"
17 #include "simgrid/sg_config.h"
18 #include "smpi/smpi_utils.hpp"
19 #include "colls/colls.h"
20 #include <simgrid/s4u/host.hpp>
22 #include "src/kernel/activity/SynchroComm.hpp"
// Declares the default log subcategory smpi_base (under smpi) for this file.
24 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_base, smpi, "Logging specific to SMPI (base)");
// Configuration flag "smpi/wtime" (double, default 0.0): when positive, smpi_mpi_wtime()
// passes it to simcall_process_sleep().
27 static simgrid::config::Flag<double> smpi_wtime_sleep(
28 "smpi/wtime", "Minimum time to inject inside a call to MPI_Wtime", 0.0);
// Configuration flag "smpi/init" (double, default 0.0): when positive, smpi_mpi_init()
// passes it to simcall_process_sleep().
29 static simgrid::config::Flag<double> smpi_init_sleep(
30 "smpi/init", "Time to inject inside a call to MPI_Init", 0.0);
// Applies the "smpi/init" delay: when smpi_init_sleep is positive, calls
// simcall_process_sleep() with that value. No other action is visible here.
32 void smpi_mpi_init() {
33 if(smpi_init_sleep > 0)
34 simcall_process_sleep(smpi_init_sleep);
// Produces the value for MPI_Wtime from the simulation clock (SIMIX_get_clock()).
// For a process that is initialized, not finalized and not sampling, the "smpi/wtime"
// delay is additionally injected after the clock has been read.
37 double smpi_mpi_wtime(){
39 if (smpi_process_initialized() != 0 && smpi_process_finalized() == 0 && smpi_process_get_sampling() == 0) {
41 time = SIMIX_get_clock();
42 // to avoid deadlocks if used as a break condition, such as
43 // while (MPI_Wtime(...) < time_limit) {
46 // because the time will not normally advance when only calls to MPI_Wtime
47 // are made -> deadlock (MPI_Wtime never reaches the time limit)
48 if(smpi_wtime_sleep > 0)
49 simcall_process_sleep(smpi_wtime_sleep);
// NOTE(review): presumably the fallback for processes that fail the condition above;
// the branch keyword is not visible here -- confirm.
52 time = SIMIX_get_clock();
// Broadcast entry point: forwards all of its arguments, unchanged and in order, to
// smpi_coll_tuned_bcast_binomial_tree().
58 void smpi_mpi_bcast(void *buf, int count, MPI_Datatype datatype, int root, MPI_Comm comm)
60 smpi_coll_tuned_bcast_binomial_tree(buf, count, datatype, root, comm);
// Barrier entry point: forwards comm to smpi_coll_tuned_barrier_ompi_basic_linear().
63 void smpi_mpi_barrier(MPI_Comm comm)
65 smpi_coll_tuned_barrier_ompi_basic_linear(comm);
// Gather: each rank's sendbuf contribution ends up in recvbuf, the one from rank r at
// byte offset r * recvcount * recvext, where recvext is the extent of recvtype.
// All point-to-point traffic uses the COLL_TAG_GATHER tag.
// NOTE(review): the test separating the "send to root" path from the "receive on root"
// path is not visible in this listing -- presumably rank != root; confirm.
68 void smpi_mpi_gather(void *sendbuf, int sendcount, MPI_Datatype sendtype,
69 void *recvbuf, int recvcount, MPI_Datatype recvtype, int root, MPI_Comm comm)
71 int system_tag = COLL_TAG_GATHER;
75 int rank = comm->rank();
76 int size = comm->size();
78 // Send buffer to root
79 Request::send(sendbuf, sendcount, sendtype, root, system_tag, comm);
// Fills recvext, which scales the per-rank offsets into recvbuf below.
81 recvtype->extent(&lb, &recvext);
82 // Local copy from root
83 Datatype::copy(sendbuf, sendcount, sendtype, static_cast<char*>(recvbuf) + root * recvcount * recvext,
85 // Receive buffers from senders
// size - 1 request slots are allocated for the receives prepared in the loop below.
86 MPI_Request *requests = xbt_new(MPI_Request, size - 1);
88 for (int src = 0; src < size; src++) {
90 requests[index] = Request::irecv_init(static_cast<char*>(recvbuf) + src * recvcount * recvext, recvcount, recvtype,
91 src, system_tag, comm);
95 // Wait for completion of irecv's.
96 Request::startall(size - 1, requests);
97 Request::waitall(size - 1, requests, MPI_STATUS_IGNORE);
// Unreference every receive request once waitall() has returned.
98 for (int src = 0; src < size-1; src++) {
99 Request::unref(&requests[src]);
// Reduce-scatter built from two collectives, both rooted at rank 0: mpi_coll_reduce_fun()
// reduces `count` elements (the sum of recvcounts) into a temporary buffer, then
// smpi_mpi_scatterv() hands recvcounts[rank] elements from that buffer to each rank.
105 void smpi_mpi_reduce_scatter(void *sendbuf, void *recvbuf, int *recvcounts, MPI_Datatype datatype, MPI_Op op,
108 int rank = comm->rank();
110 /* arbitrarily choose root as rank 0 */
111 int size = comm->size();
// displs is handed to smpi_mpi_scatterv() as the displacement array.
// NOTE(review): the line that fills displs is not visible in this listing -- confirm.
113 int *displs = xbt_new(int, size);
114 for (int i = 0; i < size; i++) {
// Accumulate the total number of elements to reduce.
116 count += recvcounts[i];
// Temporary buffer of count * datatype->get_extent() for the reduction result.
118 void *tmpbuf = static_cast<void*>(smpi_get_tmp_sendbuffer(count*datatype->get_extent()));
120 mpi_coll_reduce_fun(sendbuf, tmpbuf, count, datatype, op, 0, comm);
121 smpi_mpi_scatterv(tmpbuf, recvcounts, displs, datatype, recvbuf, recvcounts[rank], datatype, 0, comm);
123 smpi_free_tmp_buffer(tmpbuf);
// Gatherv: like a gather, but the contribution of rank r is received as recvcounts[r]
// elements of recvtype at byte offset displs[r] * recvext in recvbuf, where recvext is
// the extent of recvtype. All point-to-point traffic uses the COLL_TAG_GATHERV tag.
// NOTE(review): the test separating the "send to root" path from the "receive on root"
// path is not visible in this listing -- presumably rank != root; confirm.
126 void smpi_mpi_gatherv(void *sendbuf, int sendcount, MPI_Datatype sendtype, void *recvbuf, int *recvcounts, int *displs,
127 MPI_Datatype recvtype, int root, MPI_Comm comm)
129 int system_tag = COLL_TAG_GATHERV;
131 MPI_Aint recvext = 0;
133 int rank = comm->rank();
134 int size = comm->size();
136 // Send buffer to root
137 Request::send(sendbuf, sendcount, sendtype, root, system_tag, comm);
// Fills recvext, which converts the element displacements into byte offsets below.
139 recvtype->extent(&lb, &recvext);
140 // Local copy from root
141 Datatype::copy(sendbuf, sendcount, sendtype, static_cast<char*>(recvbuf) + displs[root] * recvext,
142 recvcounts[root], recvtype);
143 // Receive buffers from senders
// size - 1 request slots are allocated for the receives prepared in the loop below.
144 MPI_Request *requests = xbt_new(MPI_Request, size - 1);
146 for (int src = 0; src < size; src++) {
148 requests[index] = Request::irecv_init(static_cast<char*>(recvbuf) + displs[src] * recvext,
149 recvcounts[src], recvtype, src, system_tag, comm);
153 // Wait for completion of irecv's.
154 Request::startall(size - 1, requests);
155 Request::waitall(size - 1, requests, MPI_STATUS_IGNORE);
// Unreference every receive request once waitall() has returned.
156 for (int src = 0; src < size-1; src++) {
157 Request::unref(&requests[src]);
// Allgather: sendbuf is copied locally into recvbuf at the slot of the local rank, and for
// each `other` rank handled in the loop a send of sendbuf and a receive into that rank's
// slot are prepared. The slot of rank r starts at byte offset r * recvcount * recvext,
// recvext being the extent of recvtype. Messages use the COLL_TAG_ALLGATHER tag.
163 void smpi_mpi_allgather(void *sendbuf, int sendcount, MPI_Datatype sendtype,
164 void *recvbuf,int recvcount, MPI_Datatype recvtype, MPI_Comm comm)
166 int system_tag = COLL_TAG_ALLGATHER;
168 MPI_Aint recvext = 0;
169 MPI_Request *requests;
171 int rank = comm->rank();
172 int size = comm->size();
173 // FIXME: check for errors
174 recvtype->extent(&lb, &recvext);
175 // Local copy from self
176 Datatype::copy(sendbuf, sendcount, sendtype, static_cast<char *>(recvbuf) + rank * recvcount * recvext, recvcount,
178 // Send/Recv buffers to/from others;
// Room for 2 * (size - 1) requests: the loop prepares one send and one receive per peer.
179 requests = xbt_new(MPI_Request, 2 * (size - 1));
181 for (int other = 0; other < size; other++) {
183 requests[index] = Request::isend_init(sendbuf, sendcount, sendtype, other, system_tag,comm);
185 requests[index] = Request::irecv_init(static_cast<char *>(recvbuf) + other * recvcount * recvext, recvcount, recvtype,
186 other, system_tag, comm);
190 // Wait for completion of all comms.
191 Request::startall(2 * (size - 1), requests);
192 Request::waitall(2 * (size - 1), requests, MPI_STATUS_IGNORE);
// Unreference every request once waitall() has returned.
193 for (int other = 0; other < 2*(size-1); other++) {
194 Request::unref(&requests[other]);
// Allgatherv: sendbuf is copied locally into recvbuf at the slot of the local rank, and for
// each `other` rank handled in the loop a send of sendbuf and a receive into that rank's
// slot are prepared. The slot of rank r holds recvcounts[r] elements of recvtype and starts
// at byte offset displs[r] * recvext, recvext being the extent of recvtype.
// Messages use the COLL_TAG_ALLGATHERV tag.
199 void smpi_mpi_allgatherv(void *sendbuf, int sendcount, MPI_Datatype sendtype, void *recvbuf,
200 int *recvcounts, int *displs, MPI_Datatype recvtype, MPI_Comm comm)
202 int system_tag = COLL_TAG_ALLGATHERV;
204 MPI_Aint recvext = 0;
206 int rank = comm->rank();
207 int size = comm->size();
208 recvtype->extent(&lb, &recvext);
209 // Local copy from self
210 Datatype::copy(sendbuf, sendcount, sendtype,
211 static_cast<char *>(recvbuf) + displs[rank] * recvext,recvcounts[rank], recvtype);
212 // Send buffers to others;
// Room for 2 * (size - 1) requests: the loop prepares one send and one receive per peer.
213 MPI_Request *requests = xbt_new(MPI_Request, 2 * (size - 1));
215 for (int other = 0; other < size; other++) {
// NOTE(review): the destination of this call's result is not visible in this listing;
// presumably requests[index], as for the irecv_init below -- confirm.
218 Request::isend_init(sendbuf, sendcount, sendtype, other, system_tag, comm);
220 requests[index] = Request::irecv_init(static_cast<char *>(recvbuf) + displs[other] * recvext, recvcounts[other],
221 recvtype, other, system_tag, comm);
225 // Wait for completion of all comms.
226 Request::startall(2 * (size - 1), requests);
227 Request::waitall(2 * (size - 1), requests, MPI_STATUS_IGNORE);
// Unreference every request once waitall() has returned.
228 for (int other = 0; other < 2*(size-1); other++) {
229 Request::unref(&requests[other]);
// Scatter: the slice of sendbuf destined for rank r starts at byte offset
// r * sendcount * sendext (sendext being the extent of sendtype) and is sent to r with the
// COLL_TAG_SCATTER tag; receivers obtain their slice from root with Request::recv().
// The slice of root itself is copied locally unless recvbuf is MPI_IN_PLACE.
// NOTE(review): the test separating the "receive from root" path from the "send on root"
// path is not visible in this listing -- presumably rank != root; confirm.
234 void smpi_mpi_scatter(void *sendbuf, int sendcount, MPI_Datatype sendtype,
235 void *recvbuf, int recvcount, MPI_Datatype recvtype, int root, MPI_Comm comm)
237 int system_tag = COLL_TAG_SCATTER;
239 MPI_Aint sendext = 0;
240 MPI_Request *requests;
242 int rank = comm->rank();
243 int size = comm->size();
245 // Recv buffer from root
246 Request::recv(recvbuf, recvcount, recvtype, root, system_tag, comm, MPI_STATUS_IGNORE);
// Fills sendext, which scales the per-rank offsets into sendbuf below.
248 sendtype->extent(&lb, &sendext);
249 // Local copy from root
250 if(recvbuf!=MPI_IN_PLACE){
251 Datatype::copy(static_cast<char *>(sendbuf) + root * sendcount * sendext,
252 sendcount, sendtype, recvbuf, recvcount, recvtype);
254 // Send buffers to receivers
// size - 1 request slots are allocated for the sends prepared in the loop below.
255 requests = xbt_new(MPI_Request, size - 1);
257 for(int dst = 0; dst < size; dst++) {
259 requests[index] = Request::isend_init(static_cast<char *>(sendbuf) + dst * sendcount * sendext, sendcount, sendtype,
260 dst, system_tag, comm);
264 // Wait for completion of isend's.
265 Request::startall(size - 1, requests);
266 Request::waitall(size - 1, requests, MPI_STATUS_IGNORE);
// Unreference every send request once waitall() has returned.
267 for (int dst = 0; dst < size-1; dst++) {
268 Request::unref(&requests[dst]);
// Scatterv: the slice of sendbuf destined for rank r holds sendcounts[r] elements of
// sendtype, starts at byte offset displs[r] * sendext (sendext being the extent of sendtype)
// and is sent to r with the COLL_TAG_SCATTERV tag; receivers obtain their slice from root
// with Request::recv(). The slice of root itself is copied locally unless recvbuf is
// MPI_IN_PLACE.
// NOTE(review): the test separating the "receive from root" path from the "send on root"
// path is not visible in this listing -- presumably rank != root; confirm.
274 void smpi_mpi_scatterv(void *sendbuf, int *sendcounts, int *displs, MPI_Datatype sendtype, void *recvbuf, int recvcount,
275 MPI_Datatype recvtype, int root, MPI_Comm comm)
277 int system_tag = COLL_TAG_SCATTERV;
279 MPI_Aint sendext = 0;
281 int rank = comm->rank();
282 int size = comm->size();
284 // Recv buffer from root
285 Request::recv(recvbuf, recvcount, recvtype, root, system_tag, comm, MPI_STATUS_IGNORE);
// Fills sendext, which converts the element displacements into byte offsets below.
287 sendtype->extent(&lb, &sendext);
288 // Local copy from root
289 if(recvbuf!=MPI_IN_PLACE){
290 Datatype::copy(static_cast<char *>(sendbuf) + displs[root] * sendext, sendcounts[root],
291 sendtype, recvbuf, recvcount, recvtype);
293 // Send buffers to receivers
// size - 1 request slots are allocated for the sends prepared in the loop below.
294 MPI_Request *requests = xbt_new(MPI_Request, size - 1);
296 for (int dst = 0; dst < size; dst++) {
298 requests[index] = Request::isend_init(static_cast<char *>(sendbuf) + displs[dst] * sendext, sendcounts[dst],
299 sendtype, dst, system_tag, comm);
303 // Wait for completion of isend's.
304 Request::startall(size - 1, requests);
305 Request::waitall(size - 1, requests, MPI_STATUS_IGNORE);
// Unreference every send request once waitall() has returned.
306 for (int dst = 0; dst < size-1; dst++) {
307 Request::unref(&requests[dst]);
// Reduce: combines `count` elements of `datatype` contributed by the ranks of comm into
// recvbuf using op; contributions travel under the COLL_TAG_REDUCE tag.
// - A non-null, non-commutative op is handed to smpi_coll_tuned_reduce_ompi_basic_linear().
// - When sendbuf is MPI_IN_PLACE, the content of recvbuf is first copied into a temporary
//   send buffer, which is released at the end of the function.
// - Received contributions are applied to recvbuf in completion order (waitany).
// NOTE(review): the test separating the "send to root" path from the "receive on root"
// path is not visible in this listing -- presumably rank != root; confirm.
313 void smpi_mpi_reduce(void *sendbuf, void *recvbuf, int count, MPI_Datatype datatype, MPI_Op op, int root,
316 int system_tag = COLL_TAG_REDUCE;
318 MPI_Aint dataext = 0;
// Buffer actually sent/copied; replaced by a temporary when sendbuf is MPI_IN_PLACE.
320 char* sendtmpbuf = static_cast<char *>(sendbuf);
322 int rank = comm->rank();
323 int size = comm->size();
324 //non commutative case, use a working algo from openmpi
325 if(op != MPI_OP_NULL && !op->is_commutative()){
326 smpi_coll_tuned_reduce_ompi_basic_linear(sendtmpbuf, recvbuf, count, datatype, op, root, comm);
// In-place case: the local contribution lives in recvbuf, so stage a copy of it.
330 if( sendbuf == MPI_IN_PLACE ) {
331 sendtmpbuf = static_cast<char *>(smpi_get_tmp_sendbuffer(count*datatype->get_extent()));
332 Datatype::copy(recvbuf, count, datatype,sendtmpbuf, count, datatype);
336 // Send buffer to root
337 Request::send(sendtmpbuf, count, datatype, root, system_tag, comm);
// Fills dataext, used to size the per-sender staging buffers below.
339 datatype->extent(&lb, &dataext);
340 // Local copy from root
341 if (sendtmpbuf != nullptr && recvbuf != nullptr)
342 Datatype::copy(sendtmpbuf, count, datatype, recvbuf, count, datatype);
343 // Receive buffers from senders
// size - 1 requests and size - 1 staging buffers are allocated for the receives below.
344 MPI_Request *requests = xbt_new(MPI_Request, size - 1);
345 void **tmpbufs = xbt_new(void *, size - 1);
347 for (int src = 0; src < size; src++) {
// Staging buffer source: xbt_malloc() when not replaying, smpi_get_tmp_sendbuffer() otherwise
// (NOTE(review): the `else` keyword is not visible in this listing -- confirm).
349 if (!smpi_process_get_replaying())
350 tmpbufs[index] = xbt_malloc(count * dataext);
352 tmpbufs[index] = smpi_get_tmp_sendbuffer(count * dataext);
// NOTE(review): the destination of this call's result is not visible in this listing;
// presumably requests[index] -- confirm.
354 Request::irecv_init(tmpbufs[index], count, datatype, src, system_tag, comm);
358 // Wait for completion of irecv's.
359 Request::startall(size - 1, requests);
// Handle the receives one by one, in whatever order waitany() reports them.
360 for (int src = 0; src < size - 1; src++) {
361 index = Request::waitany(size - 1, requests, MPI_STATUS_IGNORE);
362 XBT_DEBUG("finished waiting any request with index %d", index);
363 if(index == MPI_UNDEFINED) {
366 Request::unref(&requests[index]);
368 if (op != MPI_OP_NULL) /* op can be MPI_OP_NULL that does nothing */
369 op->apply(tmpbufs[index], recvbuf, &count, datatype);
// Release every staging buffer.
371 for(index = 0; index < size - 1; index++) {
372 smpi_free_tmp_buffer(tmpbufs[index]);
// Release the temporary created for the MPI_IN_PLACE case.
378 if( sendbuf == MPI_IN_PLACE ) {
379 smpi_free_tmp_buffer(sendtmpbuf);
// Allreduce implemented as two collectives rooted at rank 0: smpi_mpi_reduce() into recvbuf,
// then smpi_mpi_bcast() of recvbuf.
383 void smpi_mpi_allreduce(void *sendbuf, void *recvbuf, int count, MPI_Datatype datatype, MPI_Op op, MPI_Comm comm)
385 smpi_mpi_reduce(sendbuf, recvbuf, count, datatype, op, 0, comm);
386 smpi_mpi_bcast(recvbuf, count, datatype, 0, comm);
// Scan: recvbuf starts as a copy of sendbuf; contributions received from the ranks below
// the local one are then combined into it with op. The local sendbuf is sent to every
// rank above the local one. All messages use the tag -888.
// With a non-null commutative op, contributions are applied in completion order (waitany);
// otherwise the requests are waited for in array order.
389 void smpi_mpi_scan(void *sendbuf, void *recvbuf, int count, MPI_Datatype datatype, MPI_Op op, MPI_Comm comm)
391 int system_tag = -888;
393 MPI_Aint dataext = 0;
395 int rank = comm->rank();
396 int size = comm->size();
// Fills dataext, used to size the staging buffers below.
398 datatype->extent(&lb, &dataext);
400 // Local copy from self
401 Datatype::copy(sendbuf, count, datatype, recvbuf, count, datatype);
403 // Send/Recv buffers to/from others;
// size - 1 requests in total; one staging buffer per lower rank (`rank` of them).
404 MPI_Request *requests = xbt_new(MPI_Request, size - 1);
405 void **tmpbufs = xbt_new(void *, rank);
// Receives from the lower ranks, each into its own staging buffer.
407 for (int other = 0; other < rank; other++) {
408 tmpbufs[index] = smpi_get_tmp_sendbuffer(count * dataext);
409 requests[index] = Request::irecv_init(tmpbufs[index], count, datatype, other, system_tag, comm);
// Sends of the local contribution to the higher ranks.
412 for (int other = rank + 1; other < size; other++) {
413 requests[index] = Request::isend_init(sendbuf, count, datatype, other, system_tag, comm);
416 // Wait for completion of all comms.
417 Request::startall(size - 1, requests);
419 if(op != MPI_OP_NULL && op->is_commutative()){
420 for (int other = 0; other < size - 1; other++) {
421 index = Request::waitany(size - 1, requests, MPI_STATUS_IGNORE);
422 if(index == MPI_UNDEFINED) {
426 // #Request is below rank: it's a irecv.
427 op->apply(tmpbufs[index], recvbuf, &count, datatype);
430 //non commutative case, wait in order
431 for (int other = 0; other < size - 1; other++) {
432 Request::wait(&(requests[other]), MPI_STATUS_IGNORE);
// NOTE(review): this guard tests `index`, which no visible line of this loop updates,
// while the buffer applied is tmpbufs[other] and tmpbufs has only `rank` entries.
// `other < rank` may have been intended -- confirm against the full source.
433 if (index < rank && op != MPI_OP_NULL)
434 op->apply(tmpbufs[other], recvbuf, &count, datatype);
// Release the staging buffers, then unreference every request.
437 for(index = 0; index < rank; index++) {
438 smpi_free_tmp_buffer(tmpbufs[index]);
440 for(index = 0; index < size-1; index++) {
441 Request::unref(&requests[index]);
// Exscan: recvbuf is built from contributions received from the ranks below the local one.
// While recvbuf_is_empty is set a contribution is copied into recvbuf; op->apply() combines
// contributions into recvbuf. The local sendbuf is sent to every rank above the local one.
// All messages use the tag -888.
// With a non-null commutative op, contributions are handled in completion order (waitany);
// otherwise the requests are waited for in array order.
447 void smpi_mpi_exscan(void *sendbuf, void *recvbuf, int count, MPI_Datatype datatype, MPI_Op op, MPI_Comm comm)
449 int system_tag = -888;
451 MPI_Aint dataext = 0;
// Set until a first contribution has been stored in recvbuf.
452 int recvbuf_is_empty=1;
453 int rank = comm->rank();
454 int size = comm->size();
// Fills dataext, used to size the staging buffers below.
456 datatype->extent(&lb, &dataext);
458 // Send/Recv buffers to/from others;
// size - 1 requests in total; one staging buffer per lower rank (`rank` of them).
459 MPI_Request *requests = xbt_new(MPI_Request, size - 1);
460 void **tmpbufs = xbt_new(void *, rank);
// Receives from the lower ranks, each into its own staging buffer.
462 for (int other = 0; other < rank; other++) {
463 tmpbufs[index] = smpi_get_tmp_sendbuffer(count * dataext);
464 requests[index] = Request::irecv_init(tmpbufs[index], count, datatype, other, system_tag, comm);
// Sends of the local contribution to the higher ranks.
467 for (int other = rank + 1; other < size; other++) {
468 requests[index] = Request::isend_init(sendbuf, count, datatype, other, system_tag, comm);
471 // Wait for completion of all comms.
472 Request::startall(size - 1, requests);
474 if(op != MPI_OP_NULL && op->is_commutative()){
475 for (int other = 0; other < size - 1; other++) {
476 index = Request::waitany(size - 1, requests, MPI_STATUS_IGNORE);
477 if(index == MPI_UNDEFINED) {
481 if(recvbuf_is_empty){
482 Datatype::copy(tmpbufs[index], count, datatype, recvbuf, count, datatype);
485 // #Request is below rank: it's a irecv
486 if(op!=MPI_OP_NULL) op->apply( tmpbufs[index], recvbuf, &count, datatype);
490 //non commutative case, wait in order
491 for (int other = 0; other < size - 1; other++) {
492 Request::wait(&(requests[other]), MPI_STATUS_IGNORE);
// NOTE(review): tmpbufs has only `rank` entries while `other` runs up to size - 2; the
// guard restricting this to receive requests is not visible in this listing -- confirm.
494 if (recvbuf_is_empty) {
495 Datatype::copy(tmpbufs[other], count, datatype, recvbuf, count, datatype);
496 recvbuf_is_empty = 0;
498 if(op!=MPI_OP_NULL) op->apply( tmpbufs[other], recvbuf, &count, datatype);
// Release the staging buffers, then unreference every request.
502 for(index = 0; index < rank; index++) {
503 smpi_free_tmp_buffer(tmpbufs[index]);
505 for(index = 0; index < size-1; index++) {
506 Request::unref(&requests[index]);
// Resets a status object: unless status is MPI_STATUS_IGNORE, its MPI_SOURCE, MPI_TAG and
// MPI_ERROR fields are set to MPI_ANY_SOURCE, MPI_ANY_TAG and MPI_SUCCESS respectively.
512 void smpi_empty_status(MPI_Status * status)
514 if(status != MPI_STATUS_IGNORE) {
515 status->MPI_SOURCE = MPI_ANY_SOURCE;
516 status->MPI_TAG = MPI_ANY_TAG;
517 status->MPI_ERROR = MPI_SUCCESS;
// Returns status->count divided by datatype->size().
// NOTE(review): no guard against datatype->size() being 0 or status being MPI_STATUS_IGNORE
// is visible here -- confirm that callers rule both out.
522 int smpi_mpi_get_count(MPI_Status * status, MPI_Datatype datatype)
524 return status->count / datatype->size();