Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
Merge s4u wait_any
authorGabriel Corona <gabriel.corona@loria.fr>
Fri, 22 Jul 2016 14:30:32 +0000 (16:30 +0200)
committerGabriel Corona <gabriel.corona@loria.fr>
Fri, 22 Jul 2016 14:30:32 +0000 (16:30 +0200)
12 files changed:
include/simgrid/s4u/comm.hpp
include/simgrid/simix.h
src/msg/msg_gos.cpp
src/s4u/s4u_comm.cpp
src/simix/libsmx.cpp
src/simix/popping_accessors.h
src/simix/popping_bodies.cpp
src/simix/popping_generated.cpp
src/simix/popping_private.h
src/simix/simcalls.in
src/simix/smx_network.cpp
src/smpi/smpi_base.cpp

index 2929750..891d300 100644 (file)
 #include <simgrid/s4u/Activity.hpp>
 #include <simgrid/s4u/forward.hpp>
 #include <simgrid/s4u/mailbox.hpp>
+#include <simgrid/forward.h>
+
 
 namespace simgrid {
 namespace s4u {
 
+
 /** @brief Communication async
  *
  * Represents all asynchronous communications, that you can test or wait onto.
@@ -26,6 +29,55 @@ public:
   ~Comm() override;
 
 public:
+  
+  /*! tanke a range of s4u::Comm* (last excluded) and return when one of them is finished. The return value is an iterator on the finished Comms. */
+  template<class I> static
+  I wait_any(I first, I last)
+  {
+    // Map to dynar<Synchro*>:
+    xbt_dynar_t comms = xbt_dynar_new(sizeof(simgrid::simix::Synchro*), NULL);
+    for(I iter = first; iter != last; iter++) {
+      Comm& comm = **iter;
+      if (comm.state_ == inited)
+        comm.start();
+      xbt_assert(comm.state_ == started);
+      xbt_dynar_push_as(comms, simgrid::simix::Synchro*, comm.pimpl_);
+    }
+    // Call the underlying simcall:
+    int idx = simcall_comm_waitany(comms, -1);
+    xbt_dynar_free(&comms);
+    // Not found:
+    if (idx == -1)
+      return last;
+    // Lift the index to the corresponding iterator:
+    auto res = std::next(first, idx);
+    (*res)->state_ = finished;
+    return res;
+  }
+  /*! Same as wait_any, but with a timeout. If wait_any_for return because of the timeout last is returned.*/
+  template<class I> static
+  I wait_any_for(I first, I last, double timeout)
+  {
+    // Map to dynar<Synchro*>:
+    xbt_dynar_t comms = xbt_dynar_new(sizeof(simgrid::simix::Synchro*), NULL);
+    for(I iter = first; iter != last; iter++) {
+      Comm& comm = **iter;
+      if (comm.state_ == inited)
+        comm.start();
+      xbt_assert(comm.state_ == started);
+      xbt_dynar_push_as(comms, simgrid::simix::Synchro*, comm.pimpl_);
+    }
+    // Call the underlying simcall:
+    int idx = simcall_comm_waitany(comms, timeout);
+    xbt_dynar_free(&comms);
+    // Not found:
+    if (idx == -1)
+      return last;
+    // Lift the index to the corresponding iterator:
+    auto res = std::next(first, idx);
+    (*res)->state_ = finished;
+    return res;
+  }
   /** Creates (but don't start) an async send to the mailbox @p dest */
   static Comm &send_init(Mailbox &dest);
   /** Creates and start an async send to the mailbox @p dest */
index f996f13..6b7f31b 100644 (file)
@@ -359,7 +359,7 @@ XBT_PUBLIC(smx_synchro_t) simcall_comm_iprobe(smx_mailbox_t mbox, int type, int
 XBT_PUBLIC(void) simcall_comm_cancel(smx_synchro_t comm);
 
 /* FIXME: waitany is going to be a vararg function, and should take a timeout */
-XBT_PUBLIC(unsigned int) simcall_comm_waitany(xbt_dynar_t comms);
+XBT_PUBLIC(unsigned int) simcall_comm_waitany(xbt_dynar_t comms, double timeout);
 XBT_PUBLIC(void) simcall_comm_wait(smx_synchro_t comm, double timeout);
 XBT_PUBLIC(int) simcall_comm_test(smx_synchro_t comm);
 XBT_PUBLIC(int) simcall_comm_testany(smx_synchro_t* comms, size_t count);
index 5a1f0b5..1d5ebf1 100644 (file)
@@ -627,7 +627,7 @@ int MSG_comm_waitany(xbt_dynar_t comms)
 
   msg_error_t status = MSG_OK;
   try {
-    finished_index = simcall_comm_waitany(s_comms);
+    finished_index = simcall_comm_waitany(s_comms, -1);
   }
   catch(xbt_ex& e) {
     switch (e.category) {
index 401a6c2..e10d2b0 100644 (file)
@@ -18,6 +18,8 @@ Comm::~Comm() {
 
 }
 
+
+
 s4u::Comm &Comm::send_init(s4u::Mailbox &chan) {
   s4u::Comm *res = new s4u::Comm();
   res->sender_ = SIMIX_process_self();
index 10177cc..8cf08c4 100644 (file)
@@ -765,9 +765,9 @@ void simcall_comm_cancel(smx_synchro_t synchro)
 /**
  * \ingroup simix_comm_management
  */
-unsigned int simcall_comm_waitany(xbt_dynar_t comms)
+unsigned int simcall_comm_waitany(xbt_dynar_t comms, double timeout)
 {
-  return simcall_BODY_comm_waitany(comms);
+  return simcall_BODY_comm_waitany(comms, timeout);
 }
 
 /**
index 64db9d1..dede08b 100644 (file)
@@ -657,6 +657,12 @@ static inline xbt_dynar_t simcall_comm_waitany__get__comms(smx_simcall_t simcall
 static inline void simcall_comm_waitany__set__comms(smx_simcall_t simcall, xbt_dynar_t arg) {
     simgrid::simix::marshal<xbt_dynar_t>(simcall->args[0], arg);
 }
+static inline double simcall_comm_waitany__get__timeout(smx_simcall_t simcall) {
+  return simgrid::simix::unmarshal<double>(simcall->args[1]);
+}
+static inline void simcall_comm_waitany__set__timeout(smx_simcall_t simcall, double arg) {
+    simgrid::simix::marshal<double>(simcall->args[1], arg);
+}
 static inline int simcall_comm_waitany__get__result(smx_simcall_t simcall){
     return simgrid::simix::unmarshal<int>(simcall->result);
 }
@@ -1185,7 +1191,7 @@ XBT_PRIVATE void simcall_HANDLER_comm_send(smx_simcall_t simcall, smx_process_t
 XBT_PRIVATE smx_synchro_t simcall_HANDLER_comm_isend(smx_simcall_t simcall, smx_process_t sender, smx_mailbox_t mbox, double task_size, double rate, void* src_buff, size_t src_buff_size, simix_match_func_t match_fun, simix_clean_func_t clean_fun, simix_copy_data_func_t copy_data_fun, void* data, int detached);
 XBT_PRIVATE void simcall_HANDLER_comm_recv(smx_simcall_t simcall, smx_process_t receiver, smx_mailbox_t mbox, void* dst_buff, size_t* dst_buff_size, simix_match_func_t match_fun, simix_copy_data_func_t copy_data_fun, void* data, double timeout, double rate);
 XBT_PRIVATE smx_synchro_t simcall_HANDLER_comm_irecv(smx_simcall_t simcall, smx_process_t receiver, smx_mailbox_t mbox, void* dst_buff, size_t* dst_buff_size, simix_match_func_t match_fun, simix_copy_data_func_t copy_data_fun, void* data, double rate);
-XBT_PRIVATE void simcall_HANDLER_comm_waitany(smx_simcall_t simcall, xbt_dynar_t comms);
+XBT_PRIVATE void simcall_HANDLER_comm_waitany(smx_simcall_t simcall, xbt_dynar_t comms, double timeout);
 XBT_PRIVATE void simcall_HANDLER_comm_wait(smx_simcall_t simcall, smx_synchro_t comm, double timeout);
 XBT_PRIVATE void simcall_HANDLER_comm_test(smx_simcall_t simcall, smx_synchro_t comm);
 XBT_PRIVATE void simcall_HANDLER_comm_testany(smx_simcall_t simcall, smx_synchro_t* comms, size_t count);
index f830047..1910808 100644 (file)
@@ -221,10 +221,10 @@ inline static smx_synchro_t simcall_BODY_comm_irecv(smx_process_t receiver, smx_
     return simcall<smx_synchro_t, smx_process_t, smx_mailbox_t, void*, size_t*, simix_match_func_t, simix_copy_data_func_t, void*, double>(SIMCALL_COMM_IRECV, receiver, mbox, dst_buff, dst_buff_size, match_fun, copy_data_fun, data, rate);
   }
   
-inline static int simcall_BODY_comm_waitany(xbt_dynar_t comms) {
+inline static int simcall_BODY_comm_waitany(xbt_dynar_t comms, double timeout) {
     /* Go to that function to follow the code flow through the simcall barrier */
-    if (0) simcall_HANDLER_comm_waitany(&SIMIX_process_self()->simcall, comms);
-    return simcall<int, xbt_dynar_t>(SIMCALL_COMM_WAITANY, comms);
+    if (0) simcall_HANDLER_comm_waitany(&SIMIX_process_self()->simcall, comms, timeout);
+    return simcall<int, xbt_dynar_t, double>(SIMCALL_COMM_WAITANY, comms, timeout);
   }
   
 inline static void simcall_BODY_comm_wait(smx_synchro_t comm, double timeout) {
index e5bd57b..3e13053 100644 (file)
@@ -254,7 +254,7 @@ case SIMCALL_COMM_IRECV:
       break;
 
 case SIMCALL_COMM_WAITANY:
-      simcall_HANDLER_comm_waitany(simcall, simgrid::simix::unmarshal<xbt_dynar_t>(simcall->args[0]));
+      simcall_HANDLER_comm_waitany(simcall, simgrid::simix::unmarshal<xbt_dynar_t>(simcall->args[0]), simgrid::simix::unmarshal<double>(simcall->args[1]));
       break;
 
 case SIMCALL_COMM_WAIT:
index d6a3a57..d532dbd 100644 (file)
@@ -45,6 +45,7 @@ union u_smx_scalar {
 struct s_smx_simcall {
   e_smx_simcall_t call;
   smx_process_t issuer;
+  smx_timer_t timer;
   int mc_value;
   union u_smx_scalar args[11];
   union u_smx_scalar result;
index dbef079..476dd57 100644 (file)
@@ -72,7 +72,7 @@ void          comm_send(smx_process_t sender, smx_mailbox_t mbox, double task_si
 smx_synchro_t comm_isend(smx_process_t sender, smx_mailbox_t mbox, double task_size, double rate, void* src_buff, size_t src_buff_size, simix_match_func_t match_fun, simix_clean_func_t clean_fun, simix_copy_data_func_t copy_data_fun, void* data, int detached);
 void          comm_recv(smx_process_t receiver, smx_mailbox_t mbox, void* dst_buff, size_t* dst_buff_size, simix_match_func_t match_fun, simix_copy_data_func_t copy_data_fun, void* data, double timeout, double rate) [[block]];
 smx_synchro_t comm_irecv(smx_process_t receiver, smx_mailbox_t mbox, void* dst_buff, size_t* dst_buff_size, simix_match_func_t match_fun, simix_copy_data_func_t copy_data_fun, void* data, double rate);
-int           comm_waitany(xbt_dynar_t comms) [[block]];
+int           comm_waitany(xbt_dynar_t comms, double timeout) [[block]];
 void          comm_wait(smx_synchro_t comm, double timeout) [[block]];
 int           comm_test(smx_synchro_t comm) [[block]];
 int           comm_testany(smx_synchro_t* comms, size_t count) [[block]];
index 0b47f73..25783eb 100644 (file)
@@ -478,12 +478,14 @@ void simcall_HANDLER_comm_testany(
   SIMIX_simcall_answer(simcall);
 }
 
-void simcall_HANDLER_comm_waitany(smx_simcall_t simcall, xbt_dynar_t synchros)
+void simcall_HANDLER_comm_waitany(smx_simcall_t simcall, xbt_dynar_t synchros, double timeout)
 {
   smx_synchro_t synchro;
   unsigned int cursor = 0;
 
   if (MC_is_active() || MC_record_replay_is_active()){
+    if (timeout != -1)
+      xbt_die("Timeout not implemented for waitany in the model-checker"); 
     int idx = SIMCALL_GET_MC_VALUE(simcall);
     synchro = xbt_dynar_get_as(synchros, idx, smx_synchro_t);
     synchro->simcalls.push_back(simcall);
@@ -492,7 +494,17 @@ void simcall_HANDLER_comm_waitany(smx_simcall_t simcall, xbt_dynar_t synchros)
     SIMIX_comm_finish(synchro);
     return;
   }
-
+  
+  if (timeout == -1 ){
+    simcall->timer = NULL;
+  } else {
+    simcall->timer = SIMIX_timer_set(timeout, [simcall]() {
+      SIMIX_waitany_remove_simcall_from_actions(simcall);
+      simcall_comm_waitany__set__result(simcall, -1);
+      SIMIX_simcall_answer(simcall);
+    });
+  }
+  
   xbt_dynar_foreach(synchros, cursor, synchro){
     /* associate this simcall to the the synchro */
     synchro->simcalls.push_back(simcall);
@@ -584,6 +596,10 @@ void SIMIX_comm_finish(smx_synchro_t synchro)
       continue; // if process handling comm is killed
     if (simcall->call == SIMCALL_COMM_WAITANY) {
       SIMIX_waitany_remove_simcall_from_actions(simcall);
+      if (simcall->timer) {
+        SIMIX_timer_remove(simcall->timer);
+        simcall->timer = nullptr;
+      }
       if (!MC_is_active() && !MC_record_replay_is_active())
         simcall_comm_waitany__set__result(simcall, xbt_dynar_search(simcall_comm_waitany__get__comms(simcall), &synchro));
     }
index b1389d3..d60b2bf 100644 (file)
@@ -946,7 +946,7 @@ int smpi_mpi_waitany(int count, MPI_Request requests[], MPI_Status * status)
       }
     }
     if(size > 0) {
-      i = simcall_comm_waitany(comms);
+      i = simcall_comm_waitany(comms, -1);
 
       // not MPI_UNDEFINED, as this is a simix return code
       if (i != -1) {