1 /* Copyright (c) 2007-2017. The SimGrid Team. All rights reserved. */
3 /* This program is free software; you can redistribute it and/or modify it
4 * under the terms of the license (GNU LGPL) which comes with this package. */
6 #include <xbt/config.hpp>
10 #include "xbt/virtu.h"
12 #include "src/mc/mc_replay.h"
14 #include "src/simix/smx_private.h"
15 #include "surf/surf.h"
16 #include "simgrid/sg_config.h"
17 #include "smpi/smpi_utils.hpp"
18 #include "colls/colls.h"
19 #include <simgrid/s4u/host.hpp>
21 #include "src/kernel/activity/SynchroComm.hpp"
// Dedicated log channel for the base SMPI layer (child of the "smpi" category).
23 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_base, smpi, "Logging specific to SMPI (base)");
// Simulated delay (seconds) injected into each MPI_Wtime call; lets busy-wait
// loops on MPI_Wtime make simulated time advance (default 0 = no injection).
25 static simgrid::config::Flag<double> smpi_wtime_sleep(
26 "smpi/wtime", "Minimum time to inject inside a call to MPI_Wtime", 0.0);
// Simulated delay (seconds) injected into MPI_Init to model startup cost.
27 static simgrid::config::Flag<double> smpi_init_sleep(
28 "smpi/init", "Time to inject inside a call to MPI_Init", 0.0);
// Simulation-side hook for MPI_Init: if the "smpi/init" flag is positive,
// put the calling simulated process to sleep for that long, so initialization
// has a configurable cost in simulated time.
30 void smpi_mpi_init() {
31 if(smpi_init_sleep > 0)
32 simcall_process_sleep(smpi_init_sleep);
// Simulation-side MPI_Wtime: returns the current simulated clock.
// NOTE(review): lines are elided in this view — the declaration of `time`,
// the else-branch, and the return statement are not visible here.
35 double smpi_mpi_wtime(){
// Only apply the extra bookkeeping for a fully initialized, non-finalized
// process that is not currently in a sampling section.
37 if (smpi_process_initialized() != 0 && smpi_process_finalized() == 0 && smpi_process_get_sampling() == 0) {
39 time = SIMIX_get_clock();
40 // to avoid deadlocks if used as a break condition, such as
41 // while (MPI_Wtime(...) < time_limit) {
44 // because the time will not normally advance when only calls to MPI_Wtime
45 // are made -> deadlock (MPI_Wtime never reaches the time limit)
// Inject the configurable "smpi/wtime" sleep so simulated time advances.
46 if(smpi_wtime_sleep > 0)
47 simcall_process_sleep(smpi_wtime_sleep);
// Re-read the clock after the (possible) sleep.
50 time = SIMIX_get_clock();
// Broadcast `count` elements of `datatype` from `buf` on `root` to all ranks
// of `comm`, delegating to the binomial-tree collective implementation.
55 void smpi_mpi_bcast(void *buf, int count, MPI_Datatype datatype, int root, MPI_Comm comm)
57 smpi_coll_tuned_bcast_binomial_tree(buf, count, datatype, root, comm);
// Barrier over `comm`, delegating to OpenMPI's basic linear algorithm.
60 void smpi_mpi_barrier(MPI_Comm comm)
62 smpi_coll_tuned_barrier_ompi_basic_linear(comm);
// Gather: every rank contributes `sendcount` elements; the root stores rank
// i's contribution at byte offset i*recvcount*recvext in `recvbuf`.
// Visible structure: non-root ranks do a blocking send to the root; the root
// copies its own chunk locally and collects the others via persistent irecv
// requests started and awaited together. NOTE(review): elided lines here
// presumably hold the rank==root branch, the `index` counter, and the
// src!=root guard — confirm against the full file.
65 void smpi_mpi_gather(void *sendbuf, int sendcount, MPI_Datatype sendtype,
66 void *recvbuf, int recvcount, MPI_Datatype recvtype, int root, MPI_Comm comm)
68 int system_tag = COLL_TAG_GATHER;
72 int rank = comm->rank();
73 int size = comm->size();
75 // Send buffer to root
76 Request::send(sendbuf, sendcount, sendtype, root, system_tag, comm);
// Query the receive datatype's lower bound and extent for offset arithmetic.
78 recvtype->extent(&lb, &recvext);
79 // Local copy from root
80 Datatype::copy(sendbuf, sendcount, sendtype, static_cast<char*>(recvbuf) + root * recvcount * recvext,
82 // Receive buffers from senders
// One request per non-root rank (size - 1 requests in total).
83 MPI_Request *requests = xbt_new(MPI_Request, size - 1);
85 for (int src = 0; src < size; src++) {
87 requests[index] = Request::irecv_init(static_cast<char*>(recvbuf) + src * recvcount * recvext, recvcount, recvtype,
88 src, system_tag, comm);
92 // Wait for completion of irecv's.
93 Request::startall(size - 1, requests);
94 Request::waitall(size - 1, requests, MPI_STATUS_IGNORE);
// Drop our reference on each persistent request once completed.
95 for (int src = 0; src < size-1; src++) {
96 Request::unref(&requests[src]);
// Reduce-scatter implemented as a reduce of the full buffer to rank 0,
// followed by a scatterv of the reduced result according to `recvcounts`.
// NOTE(review): the declaration/initialization of `count` and the displs[]
// fill are elided in this view — confirm against the full file.
102 void smpi_mpi_reduce_scatter(void *sendbuf, void *recvbuf, int *recvcounts, MPI_Datatype datatype, MPI_Op op,
105 int rank = comm->rank();
107 /* arbitrarily choose root as rank 0 */
108 int size = comm->size();
// Per-rank displacements for the final scatterv.
110 int *displs = xbt_new(int, size);
// Total element count = sum of all per-rank receive counts.
111 for (int i = 0; i < size; i++) {
113 count += recvcounts[i];
// Temporary buffer large enough to hold the fully reduced vector.
115 void *tmpbuf = static_cast<void*>(smpi_get_tmp_sendbuffer(count*datatype->get_extent()));
// Step 1: reduce everything onto rank 0's tmpbuf.
117 mpi_coll_reduce_fun(sendbuf, tmpbuf, count, datatype, op, 0, comm);
// Step 2: scatter each rank's slice of the reduced vector.
118 smpi_mpi_scatterv(tmpbuf, recvcounts, displs, datatype, recvbuf, recvcounts[rank], datatype, 0, comm);
120 smpi_free_tmp_buffer(tmpbuf);
// Gatherv: like gather, but rank i's contribution has its own count
// (recvcounts[i]) and is stored at byte offset displs[i]*recvext in the
// root's `recvbuf`. Non-root ranks send; the root copies its own chunk
// locally and collects the rest via persistent irecv requests.
// NOTE(review): the rank==root branch, `index` counter and src!=root guard
// appear to be elided from this view — confirm against the full file.
123 void smpi_mpi_gatherv(void *sendbuf, int sendcount, MPI_Datatype sendtype, void *recvbuf, int *recvcounts, int *displs,
124 MPI_Datatype recvtype, int root, MPI_Comm comm)
126 int system_tag = COLL_TAG_GATHERV;
128 MPI_Aint recvext = 0;
130 int rank = comm->rank();
131 int size = comm->size();
133 // Send buffer to root
134 Request::send(sendbuf, sendcount, sendtype, root, system_tag, comm);
// Receive datatype extent drives the per-rank byte offsets.
136 recvtype->extent(&lb, &recvext);
137 // Local copy from root
138 Datatype::copy(sendbuf, sendcount, sendtype, static_cast<char*>(recvbuf) + displs[root] * recvext,
139 recvcounts[root], recvtype);
140 // Receive buffers from senders
// One request per non-root rank.
141 MPI_Request *requests = xbt_new(MPI_Request, size - 1);
143 for (int src = 0; src < size; src++) {
145 requests[index] = Request::irecv_init(static_cast<char*>(recvbuf) + displs[src] * recvext,
146 recvcounts[src], recvtype, src, system_tag, comm);
150 // Wait for completion of irecv's.
151 Request::startall(size - 1, requests);
152 Request::waitall(size - 1, requests, MPI_STATUS_IGNORE);
// Release each persistent request after completion.
153 for (int src = 0; src < size-1; src++) {
154 Request::unref(&requests[src]);
// Allgather: every rank ends up with every rank's `sendcount`-element
// contribution, rank i's data at byte offset i*recvcount*recvext. Each rank
// copies its own data locally, then exchanges isend/irecv pairs with every
// other rank (2*(size-1) persistent requests started and awaited together).
// NOTE(review): the `index` counter and the other!=rank guard appear to be
// elided from this view — confirm against the full file.
160 void smpi_mpi_allgather(void *sendbuf, int sendcount, MPI_Datatype sendtype,
161 void *recvbuf,int recvcount, MPI_Datatype recvtype, MPI_Comm comm)
163 int system_tag = COLL_TAG_ALLGATHER;
165 MPI_Aint recvext = 0;
166 MPI_Request *requests;
168 int rank = comm->rank();
169 int size = comm->size();
170 // FIXME: check for errors
171 recvtype->extent(&lb, &recvext);
172 // Local copy from self
173 Datatype::copy(sendbuf, sendcount, sendtype, static_cast<char *>(recvbuf) + rank * recvcount * recvext, recvcount,
175 // Send/Recv buffers to/from others;
// One isend + one irecv per peer.
176 requests = xbt_new(MPI_Request, 2 * (size - 1));
178 for (int other = 0; other < size; other++) {
180 requests[index] = Request::isend_init(sendbuf, sendcount, sendtype, other, system_tag,comm);
182 requests[index] = Request::irecv_init(static_cast<char *>(recvbuf) + other * recvcount * recvext, recvcount, recvtype,
183 other, system_tag, comm);
187 // Wait for completion of all comms.
188 Request::startall(2 * (size - 1), requests);
189 Request::waitall(2 * (size - 1), requests, MPI_STATUS_IGNORE);
// Release every persistent request after completion.
190 for (int other = 0; other < 2*(size-1); other++) {
191 Request::unref(&requests[other]);
// Allgatherv: like allgather, but rank i contributes recvcounts[i] elements
// stored at byte offset displs[i]*recvext in everyone's `recvbuf`. Each rank
// copies its own slice locally, then exchanges isend/irecv pairs with every
// other rank. NOTE(review): the `index` counter, the other!=rank guard, and
// the assignment of the isend request appear elided in this view (line 215
// shows a bare isend_init call) — confirm against the full file.
196 void smpi_mpi_allgatherv(void *sendbuf, int sendcount, MPI_Datatype sendtype, void *recvbuf,
197 int *recvcounts, int *displs, MPI_Datatype recvtype, MPI_Comm comm)
199 int system_tag = COLL_TAG_ALLGATHERV;
201 MPI_Aint recvext = 0;
203 int rank = comm->rank();
204 int size = comm->size();
205 recvtype->extent(&lb, &recvext);
206 // Local copy from self
207 Datatype::copy(sendbuf, sendcount, sendtype,
208 static_cast<char *>(recvbuf) + displs[rank] * recvext,recvcounts[rank], recvtype);
209 // Send buffers to others;
// One isend + one irecv per peer.
210 MPI_Request *requests = xbt_new(MPI_Request, 2 * (size - 1));
212 for (int other = 0; other < size; other++) {
215 Request::isend_init(sendbuf, sendcount, sendtype, other, system_tag, comm);
217 requests[index] = Request::irecv_init(static_cast<char *>(recvbuf) + displs[other] * recvext, recvcounts[other],
218 recvtype, other, system_tag, comm);
222 // Wait for completion of all comms.
223 Request::startall(2 * (size - 1), requests);
224 Request::waitall(2 * (size - 1), requests, MPI_STATUS_IGNORE);
// Release every persistent request after completion.
225 for (int other = 0; other < 2*(size-1); other++) {
226 Request::unref(&requests[other]);
// Scatter: the root sends the dst-th `sendcount`-element chunk of `sendbuf`
// (byte offset dst*sendcount*sendext) to rank dst; non-root ranks post a
// blocking receive. The root copies its own chunk locally unless the caller
// passed MPI_IN_PLACE as recvbuf. NOTE(review): the rank==root branch,
// `index` counter and dst!=root guard appear elided in this view — confirm
// against the full file.
231 void smpi_mpi_scatter(void *sendbuf, int sendcount, MPI_Datatype sendtype,
232 void *recvbuf, int recvcount, MPI_Datatype recvtype, int root, MPI_Comm comm)
234 int system_tag = COLL_TAG_SCATTER;
236 MPI_Aint sendext = 0;
237 MPI_Request *requests;
239 int rank = comm->rank();
240 int size = comm->size();
242 // Recv buffer from root
243 Request::recv(recvbuf, recvcount, recvtype, root, system_tag, comm, MPI_STATUS_IGNORE);
// Send datatype extent drives the per-destination byte offsets.
245 sendtype->extent(&lb, &sendext);
246 // Local copy from root
// MPI_IN_PLACE on the root means its slice is already in place: skip copy.
247 if(recvbuf!=MPI_IN_PLACE){
248 Datatype::copy(static_cast<char *>(sendbuf) + root * sendcount * sendext,
249 sendcount, sendtype, recvbuf, recvcount, recvtype);
251 // Send buffers to receivers
// One persistent isend per non-root destination.
252 requests = xbt_new(MPI_Request, size - 1);
254 for(int dst = 0; dst < size; dst++) {
256 requests[index] = Request::isend_init(static_cast<char *>(sendbuf) + dst * sendcount * sendext, sendcount, sendtype,
257 dst, system_tag, comm);
261 // Wait for completion of isend's.
262 Request::startall(size - 1, requests);
263 Request::waitall(size - 1, requests, MPI_STATUS_IGNORE);
// Release each persistent request after completion.
264 for (int dst = 0; dst < size-1; dst++) {
265 Request::unref(&requests[dst]);
// Scatterv: like scatter, but the root sends sendcounts[dst] elements from
// byte offset displs[dst]*sendext to each rank dst; non-root ranks post a
// blocking receive. The root copies its own slice locally unless recvbuf is
// MPI_IN_PLACE. NOTE(review): the rank==root branch, `index` counter and
// dst!=root guard appear elided in this view — confirm against the full file.
271 void smpi_mpi_scatterv(void *sendbuf, int *sendcounts, int *displs, MPI_Datatype sendtype, void *recvbuf, int recvcount,
272 MPI_Datatype recvtype, int root, MPI_Comm comm)
274 int system_tag = COLL_TAG_SCATTERV;
276 MPI_Aint sendext = 0;
278 int rank = comm->rank();
279 int size = comm->size();
281 // Recv buffer from root
282 Request::recv(recvbuf, recvcount, recvtype, root, system_tag, comm, MPI_STATUS_IGNORE);
// Send datatype extent drives the per-destination byte offsets.
284 sendtype->extent(&lb, &sendext);
285 // Local copy from root
// MPI_IN_PLACE on the root means its slice is already in place: skip copy.
286 if(recvbuf!=MPI_IN_PLACE){
287 Datatype::copy(static_cast<char *>(sendbuf) + displs[root] * sendext, sendcounts[root],
288 sendtype, recvbuf, recvcount, recvtype);
290 // Send buffers to receivers
// One persistent isend per non-root destination.
291 MPI_Request *requests = xbt_new(MPI_Request, size - 1);
293 for (int dst = 0; dst < size; dst++) {
295 requests[index] = Request::isend_init(static_cast<char *>(sendbuf) + displs[dst] * sendext, sendcounts[dst],
296 sendtype, dst, system_tag, comm);
300 // Wait for completion of isend's.
301 Request::startall(size - 1, requests);
302 Request::waitall(size - 1, requests, MPI_STATUS_IGNORE);
// Release each persistent request after completion.
303 for (int dst = 0; dst < size-1; dst++) {
304 Request::unref(&requests[dst]);
// Reduce: combine `count` elements from every rank with `op`, result on
// `root`. Non-commutative ops are delegated to OpenMPI's basic linear
// algorithm (which applies operands in rank order). Otherwise non-root ranks
// send to the root; the root receives each contribution into a temporary
// buffer (waitany order) and folds it into recvbuf — valid only because the
// op is commutative at this point. MPI_IN_PLACE on the root is handled by
// snapshotting recvbuf into a temporary send buffer first.
// NOTE(review): the rank==root branch, `index` declaration, src!=root guard,
// early returns, and several closing braces are elided in this view —
// confirm against the full file.
310 void smpi_mpi_reduce(void *sendbuf, void *recvbuf, int count, MPI_Datatype datatype, MPI_Op op, int root,
313 int system_tag = COLL_TAG_REDUCE;
315 MPI_Aint dataext = 0;
317 char* sendtmpbuf = static_cast<char *>(sendbuf);
319 int rank = comm->rank();
320 int size = comm->size();
321 //non commutative case, use a working algo from openmpi
322 if(op != MPI_OP_NULL && !op->is_commutative()){
323 smpi_coll_tuned_reduce_ompi_basic_linear(sendtmpbuf, recvbuf, count, datatype, op, root, comm);
// MPI_IN_PLACE: the root's contribution lives in recvbuf; copy it aside so
// the fold below doesn't read and write the same buffer.
327 if( sendbuf == MPI_IN_PLACE ) {
328 sendtmpbuf = static_cast<char *>(smpi_get_tmp_sendbuffer(count*datatype->get_extent()));
329 Datatype::copy(recvbuf, count, datatype,sendtmpbuf, count, datatype);
333 // Send buffer to root
334 Request::send(sendtmpbuf, count, datatype, root, system_tag, comm);
336 datatype->extent(&lb, &dataext);
337 // Local copy from root
338 if (sendtmpbuf != nullptr && recvbuf != nullptr)
339 Datatype::copy(sendtmpbuf, count, datatype, recvbuf, count, datatype);
340 // Receive buffers from senders
// One request + one scratch buffer per non-root rank.
341 MPI_Request *requests = xbt_new(MPI_Request, size - 1);
342 void **tmpbufs = xbt_new(void *, size - 1);
344 for (int src = 0; src < size; src++) {
// Under MC replay, plain malloc would diverge between runs; use the SMPI
// temporary-buffer allocator instead.
346 if (!smpi_process_get_replaying())
347 tmpbufs[index] = xbt_malloc(count * dataext);
349 tmpbufs[index] = smpi_get_tmp_sendbuffer(count * dataext);
351 Request::irecv_init(tmpbufs[index], count, datatype, src, system_tag, comm);
355 // Wait for completion of irecv's.
356 Request::startall(size - 1, requests);
// Fold contributions in completion order (requires commutative op).
357 for (int src = 0; src < size - 1; src++) {
358 index = Request::waitany(size - 1, requests, MPI_STATUS_IGNORE);
359 XBT_DEBUG("finished waiting any request with index %d", index);
360 if(index == MPI_UNDEFINED) {
363 Request::unref(&requests[index]);
365 if (op != MPI_OP_NULL) /* op can be MPI_OP_NULL that does nothing */
366 op->apply(tmpbufs[index], recvbuf, &count, datatype);
// Release the per-sender scratch buffers.
368 for(index = 0; index < size - 1; index++) {
369 smpi_free_tmp_buffer(tmpbufs[index]);
// Release the MPI_IN_PLACE snapshot, if one was taken.
375 if( sendbuf == MPI_IN_PLACE ) {
376 smpi_free_tmp_buffer(sendtmpbuf);
// Allreduce implemented naively as reduce-to-rank-0 followed by a broadcast
// of the result from rank 0.
380 void smpi_mpi_allreduce(void *sendbuf, void *recvbuf, int count, MPI_Datatype datatype, MPI_Op op, MPI_Comm comm)
382 smpi_mpi_reduce(sendbuf, recvbuf, count, datatype, op, 0, comm);
383 smpi_mpi_bcast(recvbuf, count, datatype, 0, comm);
// Inclusive scan: rank r ends with op applied over the contributions of
// ranks 0..r. Each rank copies its own data into recvbuf, posts irecvs from
// every lower rank (into scratch buffers) and isends to every higher rank.
// Commutative ops fold in completion order (waitany); non-commutative ops
// wait on the requests in rank order. Uses a private tag (-888) rather than
// a COLL_TAG_* constant. NOTE(review): `index` declaration/increments and
// several closing braces are elided in this view — confirm against the full
// file. Line 430's `if (index < rank ...)` testing `index` (not `other`) in
// the in-order branch looks suspicious; verify against the full file.
386 void smpi_mpi_scan(void *sendbuf, void *recvbuf, int count, MPI_Datatype datatype, MPI_Op op, MPI_Comm comm)
388 int system_tag = -888;
390 MPI_Aint dataext = 0;
392 int rank = comm->rank();
393 int size = comm->size();
395 datatype->extent(&lb, &dataext);
397 // Local copy from self
398 Datatype::copy(sendbuf, count, datatype, recvbuf, count, datatype);
400 // Send/Recv buffers to/from others;
// `rank` irecvs from lower ranks + (size-1-rank) isends = size-1 requests;
// scratch buffers are only needed for the irecvs (one per lower rank).
401 MPI_Request *requests = xbt_new(MPI_Request, size - 1);
402 void **tmpbufs = xbt_new(void *, rank);
404 for (int other = 0; other < rank; other++) {
405 tmpbufs[index] = smpi_get_tmp_sendbuffer(count * dataext);
406 requests[index] = Request::irecv_init(tmpbufs[index], count, datatype, other, system_tag, comm);
409 for (int other = rank + 1; other < size; other++) {
410 requests[index] = Request::isend_init(sendbuf, count, datatype, other, system_tag, comm);
413 // Wait for completion of all comms.
414 Request::startall(size - 1, requests);
416 if(op != MPI_OP_NULL && op->is_commutative()){
// Commutative: fold contributions as they complete, in any order.
417 for (int other = 0; other < size - 1; other++) {
418 index = Request::waitany(size - 1, requests, MPI_STATUS_IGNORE);
419 if(index == MPI_UNDEFINED) {
423 // #Request is below rank: it's a irecv.
424 op->apply(tmpbufs[index], recvbuf, &count, datatype);
427 //non commutative case, wait in order
428 for (int other = 0; other < size - 1; other++) {
429 Request::wait(&(requests[other]), MPI_STATUS_IGNORE);
430 if (index < rank && op != MPI_OP_NULL)
431 op->apply(tmpbufs[other], recvbuf, &count, datatype);
// Free the lower-rank scratch buffers, then drop all requests.
434 for(index = 0; index < rank; index++) {
435 smpi_free_tmp_buffer(tmpbufs[index]);
437 for(index = 0; index < size-1; index++) {
438 Request::unref(&requests[index]);
// Exclusive scan: rank r ends with op applied over the contributions of
// ranks 0..r-1 (rank 0's recvbuf is left untouched). Same communication
// pattern as smpi_mpi_scan, but recvbuf starts "empty": the first arriving
// contribution is copied in verbatim, subsequent ones are folded with op.
// Uses the same private tag (-888) as scan. NOTE(review): `index`
// declaration/increments and several closing braces are elided in this view
// — confirm against the full file.
444 void smpi_mpi_exscan(void *sendbuf, void *recvbuf, int count, MPI_Datatype datatype, MPI_Op op, MPI_Comm comm)
446 int system_tag = -888;
448 MPI_Aint dataext = 0;
// Tracks whether recvbuf has received its first contribution yet.
449 int recvbuf_is_empty=1;
450 int rank = comm->rank();
451 int size = comm->size();
453 datatype->extent(&lb, &dataext);
455 // Send/Recv buffers to/from others;
// `rank` irecvs from lower ranks + (size-1-rank) isends = size-1 requests.
456 MPI_Request *requests = xbt_new(MPI_Request, size - 1);
457 void **tmpbufs = xbt_new(void *, rank);
459 for (int other = 0; other < rank; other++) {
460 tmpbufs[index] = smpi_get_tmp_sendbuffer(count * dataext);
461 requests[index] = Request::irecv_init(tmpbufs[index], count, datatype, other, system_tag, comm);
464 for (int other = rank + 1; other < size; other++) {
465 requests[index] = Request::isend_init(sendbuf, count, datatype, other, system_tag, comm);
468 // Wait for completion of all comms.
469 Request::startall(size - 1, requests);
471 if(op != MPI_OP_NULL && op->is_commutative()){
// Commutative: fold contributions as they complete, in any order.
472 for (int other = 0; other < size - 1; other++) {
473 index = Request::waitany(size - 1, requests, MPI_STATUS_IGNORE);
474 if(index == MPI_UNDEFINED) {
// First contribution initializes recvbuf; later ones are folded in.
478 if(recvbuf_is_empty){
479 Datatype::copy(tmpbufs[index], count, datatype, recvbuf, count, datatype);
482 // #Request is below rank: it's a irecv
483 op->apply(tmpbufs[index], recvbuf, &count, datatype);
487 //non commutative case, wait in order
488 for (int other = 0; other < size - 1; other++) {
489 Request::wait(&(requests[other]), MPI_STATUS_IGNORE);
491 if (recvbuf_is_empty) {
492 Datatype::copy(tmpbufs[other], count, datatype, recvbuf, count, datatype);
493 recvbuf_is_empty = 0;
495 if(op!=MPI_OP_NULL) op->apply( tmpbufs[other], recvbuf, &count, datatype);
// Free the lower-rank scratch buffers, then drop all requests.
499 for(index = 0; index < rank; index++) {
500 smpi_free_tmp_buffer(tmpbufs[index]);
502 for(index = 0; index < size-1; index++) {
503 Request::unref(&requests[index]);
// Fill `status` with the canonical "empty" values (MPI_ANY_SOURCE /
// MPI_ANY_TAG / MPI_SUCCESS); a no-op when the caller passed
// MPI_STATUS_IGNORE.
509 void smpi_empty_status(MPI_Status * status)
511 if(status != MPI_STATUS_IGNORE) {
512 status->MPI_SOURCE = MPI_ANY_SOURCE;
513 status->MPI_TAG = MPI_ANY_TAG;
514 status->MPI_ERROR = MPI_SUCCESS;
// MPI_Get_count: number of `datatype` elements in the message described by
// `status`, computed as the byte count divided by the datatype size.
// NOTE(review): no check for status==MPI_STATUS_IGNORE or for a byte count
// that is not a multiple of the datatype size is visible here — confirm
// callers guarantee both.
519 int smpi_mpi_get_count(MPI_Status * status, MPI_Datatype datatype)
521 return status->count / datatype->size();