Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
[SMPI] Updated/elaborated on comment about iprobes
[simgrid.git] / src / smpi / smpi_base.cpp
index 1448c8e..b64113c 100644 (file)
@@ -18,6 +18,7 @@
 #include "simgrid/sg_config.h"
 #include "smpi/smpi_utils.hpp"
 #include "colls/colls.h"
+#include <simgrid/s4u/host.hpp>
 
 #include "src/kernel/activity/SynchroComm.hpp"
 
@@ -778,10 +779,15 @@ void smpi_mpi_iprobe(int source, int tag, MPI_Comm comm, int* flag, MPI_Status*
 
   // to avoid deadlock, we have to sleep some time here, or the timer won't advance and we will only do iprobe simcalls
   // (especially when used as a break condition, such as while(MPI_Iprobe(...)) ... )
-  // multiplier to the sleeptime, to increase speed of execution, each failed iprobe will increase it
+  // nsleeps is a multiplier to the sleeptime, to increase speed of execution, each failed iprobe will increase it
+  // (This can speed up the execution of certain applications by an order of magnitude, such as HPL)
   static int nsleeps = 1;
-  if(smpi_iprobe_sleep > 0)  
-    simcall_process_sleep(nsleeps*smpi_iprobe_sleep);
+  double speed       = simgrid::s4u::Actor::self()->host()->speed();
+  double maxrate = xbt_cfg_get_double("smpi/iprobe-cpu-usage");
+  if (smpi_iprobe_sleep > 0) {
+    smx_activity_t iprobe_sleep = simcall_execution_start("iprobe", /* flops to executek*/nsleeps*smpi_iprobe_sleep*speed*maxrate, /* priority */1.0, /* performance bound */maxrate*speed);
+    simcall_execution_wait(iprobe_sleep);
+  }
   // behave like a receive, but don't do it
   smx_mailbox_t mailbox;
 
@@ -1341,7 +1347,8 @@ void smpi_mpi_allreduce(void *sendbuf, void *recvbuf, int count, MPI_Datatype da
 void smpi_mpi_scan(void *sendbuf, void *recvbuf, int count, MPI_Datatype datatype, MPI_Op op, MPI_Comm comm)
 {
   int system_tag = -888;
-  MPI_Aint lb = 0, dataext = 0;
+  MPI_Aint lb      = 0;
+  MPI_Aint dataext = 0;
 
   int rank = smpi_comm_rank(comm);
   int size = smpi_comm_size(comm);
@@ -1400,7 +1407,8 @@ void smpi_mpi_scan(void *sendbuf, void *recvbuf, int count, MPI_Datatype datatyp
 void smpi_mpi_exscan(void *sendbuf, void *recvbuf, int count, MPI_Datatype datatype, MPI_Op op, MPI_Comm comm)
 {
   int system_tag = -888;
-  MPI_Aint lb = 0, dataext = 0;
+  MPI_Aint lb         = 0;
+  MPI_Aint dataext    = 0;
   int recvbuf_is_empty=1;
   int rank = smpi_comm_rank(comm);
   int size = smpi_comm_size(comm);
@@ -1422,6 +1430,7 @@ void smpi_mpi_exscan(void *sendbuf, void *recvbuf, int count, MPI_Datatype datat
   }
   // Wait for completion of all comms.
   smpi_mpi_startall(size - 1, requests);
+
   if(smpi_op_is_commute(op)){
     for (int other = 0; other < size - 1; other++) {
       index = smpi_mpi_waitany(size - 1, requests, MPI_STATUS_IGNORE);
@@ -1442,11 +1451,11 @@ void smpi_mpi_exscan(void *sendbuf, void *recvbuf, int count, MPI_Datatype datat
     for (int other = 0; other < size - 1; other++) {
       smpi_mpi_wait(&(requests[other]), MPI_STATUS_IGNORE);
       if(index < rank) {
-          if(recvbuf_is_empty){
-            smpi_datatype_copy(tmpbufs[other], count, datatype, recvbuf, count, datatype);
-            recvbuf_is_empty=0;
-          } else
-            smpi_op_apply(op, tmpbufs[other], recvbuf, &count, &datatype);
+        if (recvbuf_is_empty) {
+          smpi_datatype_copy(tmpbufs[other], count, datatype, recvbuf, count, datatype);
+          recvbuf_is_empty = 0;
+        } else
+          smpi_op_apply(op, tmpbufs[other], recvbuf, &count, &datatype);
       }
     }
   }