Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
[s4u] add Comm::wait_any
authoradfaure <adrien.faure2@gmail.com>
Thu, 21 Jul 2016 14:12:48 +0000 (16:12 +0200)
committeradfaure <adrien.faure2@gmail.com>
Fri, 22 Jul 2016 13:11:41 +0000 (15:11 +0200)
include/simgrid/s4u/comm.hpp
src/s4u/s4u_comm.cpp
src/simix/smx_network.cpp

index 2929750..891d300 100644 (file)
 #include <simgrid/s4u/Activity.hpp>
 #include <simgrid/s4u/forward.hpp>
 #include <simgrid/s4u/mailbox.hpp>
+#include <simgrid/forward.h>
+
 
 namespace simgrid {
 namespace s4u {
 
+
 /** @brief Communication async
  *
  * Represents all asynchronous communications, that you can test or wait onto.
@@ -26,6 +29,55 @@ public:
   ~Comm() override;
 
 public:
+  
+  /*! tanke a range of s4u::Comm* (last excluded) and return when one of them is finished. The return value is an iterator on the finished Comms. */
+  template<class I> static
+  I wait_any(I first, I last)
+  {
+    // Map to dynar<Synchro*>:
+    xbt_dynar_t comms = xbt_dynar_new(sizeof(simgrid::simix::Synchro*), NULL);
+    for(I iter = first; iter != last; iter++) {
+      Comm& comm = **iter;
+      if (comm.state_ == inited)
+        comm.start();
+      xbt_assert(comm.state_ == started);
+      xbt_dynar_push_as(comms, simgrid::simix::Synchro*, comm.pimpl_);
+    }
+    // Call the underlying simcall:
+    int idx = simcall_comm_waitany(comms, -1);
+    xbt_dynar_free(&comms);
+    // Not found:
+    if (idx == -1)
+      return last;
+    // Lift the index to the corresponding iterator:
+    auto res = std::next(first, idx);
+    (*res)->state_ = finished;
+    return res;
+  }
+  /*! Same as wait_any, but with a timeout. If wait_any_for return because of the timeout last is returned.*/
+  template<class I> static
+  I wait_any_for(I first, I last, double timeout)
+  {
+    // Map to dynar<Synchro*>:
+    xbt_dynar_t comms = xbt_dynar_new(sizeof(simgrid::simix::Synchro*), NULL);
+    for(I iter = first; iter != last; iter++) {
+      Comm& comm = **iter;
+      if (comm.state_ == inited)
+        comm.start();
+      xbt_assert(comm.state_ == started);
+      xbt_dynar_push_as(comms, simgrid::simix::Synchro*, comm.pimpl_);
+    }
+    // Call the underlying simcall:
+    int idx = simcall_comm_waitany(comms, timeout);
+    xbt_dynar_free(&comms);
+    // Not found:
+    if (idx == -1)
+      return last;
+    // Lift the index to the corresponding iterator:
+    auto res = std::next(first, idx);
+    (*res)->state_ = finished;
+    return res;
+  }
   /** Creates (but don't start) an async send to the mailbox @p dest */
   static Comm &send_init(Mailbox &dest);
   /** Creates and start an async send to the mailbox @p dest */
index 401a6c2..e10d2b0 100644 (file)
@@ -18,6 +18,8 @@ Comm::~Comm() {
 
 }
 
+
+
 s4u::Comm &Comm::send_init(s4u::Mailbox &chan) {
   s4u::Comm *res = new s4u::Comm();
   res->sender_ = SIMIX_process_self();
index 9810cbf..dd86baa 100644 (file)
@@ -594,8 +594,10 @@ void SIMIX_comm_finish(smx_synchro_t synchro)
       continue; // if process handling comm is killed
     if (simcall->call == SIMCALL_COMM_WAITANY) {
       SIMIX_waitany_remove_simcall_from_actions(simcall);
-      if (simcall->timer) 
+      if (simcall->timer) {
         SIMIX_timer_remove(simcall->timer);
+        simcall->timer = nullptr;
+      }
       if (!MC_is_active() && !MC_record_replay_is_active())
         simcall_comm_waitany__set__result(simcall, xbt_dynar_search(simcall_comm_waitany__get__comms(simcall), &synchro));
     }