Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
Move all smpi colls to cpp.
[simgrid.git] / src / smpi / smpi_base.cpp
index 1448c8e..0ec0ca0 100644 (file)
@@ -18,6 +18,7 @@
 #include "simgrid/sg_config.h"
 #include "smpi/smpi_utils.hpp"
 #include "colls/colls.h"
+#include <simgrid/s4u/host.hpp>
 
 #include "src/kernel/activity/SynchroComm.hpp"
 
@@ -780,8 +781,12 @@ void smpi_mpi_iprobe(int source, int tag, MPI_Comm comm, int* flag, MPI_Status*
   // (especially when used as a break condition, such as while(MPI_Iprobe(...)) ... )
   // multiplier to the sleeptime, to increase speed of execution, each failed iprobe will increase it
   static int nsleeps = 1;
-  if(smpi_iprobe_sleep > 0)  
-    simcall_process_sleep(nsleeps*smpi_iprobe_sleep);
+  double speed       = simgrid::s4u::Actor::self()->host()->speed();
+  double maxrate = xbt_cfg_get_double("smpi/iprobe-cpu-usage");
+  if (smpi_iprobe_sleep > 0) {
+    smx_activity_t iprobe_sleep = simcall_execution_start("iprobe", /* flops to executek*/nsleeps*smpi_iprobe_sleep*speed*maxrate, /* priority */1.0, /* performance bound */maxrate*speed);
+    simcall_execution_wait(iprobe_sleep);
+  }
   // behave like a receive, but don't do it
   smx_mailbox_t mailbox;
 
@@ -1341,7 +1346,8 @@ void smpi_mpi_allreduce(void *sendbuf, void *recvbuf, int count, MPI_Datatype da
 void smpi_mpi_scan(void *sendbuf, void *recvbuf, int count, MPI_Datatype datatype, MPI_Op op, MPI_Comm comm)
 {
   int system_tag = -888;
-  MPI_Aint lb = 0, dataext = 0;
+  MPI_Aint lb      = 0;
+  MPI_Aint dataext = 0;
 
   int rank = smpi_comm_rank(comm);
   int size = smpi_comm_size(comm);
@@ -1400,7 +1406,8 @@ void smpi_mpi_scan(void *sendbuf, void *recvbuf, int count, MPI_Datatype datatyp
 void smpi_mpi_exscan(void *sendbuf, void *recvbuf, int count, MPI_Datatype datatype, MPI_Op op, MPI_Comm comm)
 {
   int system_tag = -888;
-  MPI_Aint lb = 0, dataext = 0;
+  MPI_Aint lb         = 0;
+  MPI_Aint dataext    = 0;
   int recvbuf_is_empty=1;
   int rank = smpi_comm_rank(comm);
   int size = smpi_comm_size(comm);
@@ -1422,6 +1429,7 @@ void smpi_mpi_exscan(void *sendbuf, void *recvbuf, int count, MPI_Datatype datat
   }
   // Wait for completion of all comms.
   smpi_mpi_startall(size - 1, requests);
+
   if(smpi_op_is_commute(op)){
     for (int other = 0; other < size - 1; other++) {
       index = smpi_mpi_waitany(size - 1, requests, MPI_STATUS_IGNORE);
@@ -1442,11 +1450,11 @@ void smpi_mpi_exscan(void *sendbuf, void *recvbuf, int count, MPI_Datatype datat
     for (int other = 0; other < size - 1; other++) {
       smpi_mpi_wait(&(requests[other]), MPI_STATUS_IGNORE);
       if(index < rank) {
-          if(recvbuf_is_empty){
-            smpi_datatype_copy(tmpbufs[other], count, datatype, recvbuf, count, datatype);
-            recvbuf_is_empty=0;
-          } else
-            smpi_op_apply(op, tmpbufs[other], recvbuf, &count, &datatype);
+        if (recvbuf_is_empty) {
+          smpi_datatype_copy(tmpbufs[other], count, datatype, recvbuf, count, datatype);
+          recvbuf_is_empty = 0;
+        } else
+          smpi_op_apply(op, tmpbufs[other], recvbuf, &count, &datatype);
       }
     }
   }