Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
Also deprecate Activity::waitany/waitall/testany
authorMartin Quinson <martin.quinson@ens-rennes.fr>
Tue, 25 Jul 2023 18:55:25 +0000 (20:55 +0200)
committerMartin Quinson <martin.quinson@ens-rennes.fr>
Tue, 25 Jul 2023 18:55:25 +0000 (20:55 +0200)
include/simgrid/s4u/Activity.hpp
include/simgrid/s4u/Exec.hpp
include/simgrid/s4u/Io.hpp
src/s4u/s4u_Activity.cpp
src/s4u/s4u_Comm.cpp

index 9973a00..ef51321 100644 (file)
@@ -158,8 +158,6 @@ public:
   virtual Activity* do_start() = 0;
   /** Tests whether the given activity is terminated yet. */
   virtual bool test();
-  /*! take a vector s4u::ActivityPtr and return the rank of the first finished one (or -1 if none is done). */
-  static ssize_t test_any(const std::vector<ActivityPtr>& activities);
 
   /** Blocks the current actor until the activity is terminated */
   Activity* wait() { return wait_for(-1.0); }
@@ -169,11 +167,6 @@ public:
   /** Blocks the current actor until the activity is terminated, or until the time limit is reached\n
    * Raises: timeout exception. */
   void wait_until(double time_limit);
-  /*! take a vector of s4u::ActivityPtr and return when one of them is finished.
-   * The return value is the rank of the first finished ActivityPtr. */
-  static ssize_t wait_any(const std::vector<ActivityPtr>& activities) { return wait_any_for(activities, -1); }
-  /*! Same as wait_any, but with a timeout. If the timeout occurs, parameter last is returned.*/
-  static ssize_t wait_any_for(const std::vector<ActivityPtr>& activities, double timeout);
 
   /** Cancel that activity */
   Activity* cancel();
@@ -205,6 +198,9 @@ public:
   kernel::activity::ActivityImpl* get_impl() const { return pimpl_.get(); }
 
 #ifndef DOXYGEN
+  static ssize_t deprecated_wait_any_for(const std::vector<ActivityPtr>& activities, double timeout); // XBT_ATTRIB_DEPRECATED_v339
+  XBT_ATTRIB_DEPRECATED_v339("Please use ActivitySet instead") static ssize_t test_any(const std::vector<ActivityPtr>& activities);
+
   friend void intrusive_ptr_release(Activity* a)
   {
     if (a->refcount_.fetch_sub(1, std::memory_order_release) == 1) {
@@ -283,16 +279,13 @@ public:
    *  dependency or no resource assigned) */
   void on_this_veto_cb(const std::function<void(AnyActivity&)>& cb) { on_this_veto.connect(cb); }
 
-  XBT_ATTRIB_DEPRECATED_v338("Please use on_suspend_cb() instead") static void on_suspended_cb(
-      const std::function<void(Activity const&)>& cb)
-  {
-    on_suspend.connect(cb);
-  }
-  XBT_ATTRIB_DEPRECATED_v338("Please use on_resume_cb() instead") static void on_resumed_cb(
-      const std::function<void(Activity const&)>& cb)
-  {
-    on_resume.connect(cb);
-  }
+#ifndef DOXYGEN
+  XBT_ATTRIB_DEPRECATED_v338("Please use on_suspend_cb() instead") static void on_suspended_cb(const std::function<void(Activity const&)>& cb) { on_suspend.connect(cb); }
+  XBT_ATTRIB_DEPRECATED_v338("Please use on_resume_cb() instead") static void on_resumed_cb(const std::function<void(Activity const&)>& cb) { on_resume.connect(cb); }
+
+  XBT_ATTRIB_DEPRECATED_v339("Please use ActivitySet instead") static ssize_t wait_any(const std::vector<ActivityPtr>& activities) { return deprecated_wait_any_for(activities, -1); }
+  XBT_ATTRIB_DEPRECATED_v339("Please use ActivitySet instead") static ssize_t wait_any_for(const std::vector<ActivityPtr>& activities, double timeout) { return deprecated_wait_any_for(activities, timeout); }
+#endif
 
   AnyActivity* add_successor(ActivityPtr a)
   {
index fc92aaa..8167b87 100644 (file)
@@ -77,19 +77,12 @@ public:
   bool is_assigned() const override;
 
 #ifndef DOXYGEN
-  static ssize_t deprecated_wait_any_for(const std::vector<ExecPtr>& execs,
-                                         double timeout); // XBT_ATTRIB_DEPRECATED_v339
+  static ssize_t deprecated_wait_any_for(const std::vector<ExecPtr>& execs, double timeout); // XBT_ATTRIB_DEPRECATED_v339
 
   XBT_ATTRIB_DEPRECATED_v339("Please use ActivitySet instead") static ssize_t
-      wait_any(const std::vector<ExecPtr>& execs)
-  {
-    return deprecated_wait_any_for(execs, -1);
-  }
+      wait_any(const std::vector<ExecPtr>& execs) { return deprecated_wait_any_for(execs, -1); }
   XBT_ATTRIB_DEPRECATED_v339("Please use ActivitySet instead") static ssize_t
-      wait_any_for(const std::vector<ExecPtr>& execs, double timeout)
-  {
-    return deprecated_wait_any_for(execs, timeout);
-  }
+      wait_any_for(const std::vector<ExecPtr>& execs, double timeout) { return deprecated_wait_any_for(execs, timeout); }
 #endif
 };
 
index 961ffd6..6dc9a3f 100644 (file)
@@ -39,9 +39,7 @@ public:
   XBT_ATTRIB_DEPRECATED_v339("Please use ActivitySet instead") 
   static ssize_t wait_any(const std::vector<IoPtr>& ios) { return deprecated_wait_any_for(ios, -1); }
   XBT_ATTRIB_DEPRECATED_v339("Please use ActivitySet instead") 
-  static ssize_t wait_any_for(const std::vector<IoPtr>& ios, double timeout) { 
-    return deprecated_wait_any_for(ios, timeout); 
-  }
+  static ssize_t wait_any_for(const std::vector<IoPtr>& ios, double timeout) { return deprecated_wait_any_for(ios, timeout); }
 #endif
 
   double get_remaining() const override;
index 65fd067..445364f 100644 (file)
@@ -107,7 +107,7 @@ ssize_t Activity::test_any(const std::vector<ActivityPtr>& activities)
   return changed_pos;
 }
 
-ssize_t Activity::wait_any_for(const std::vector<ActivityPtr>& activities, double timeout)
+ssize_t Activity::deprecated_wait_any_for(const std::vector<ActivityPtr>& activities, double timeout) // XBT_ATTRIB_DEPRECATED_v339
 {
   std::vector<kernel::activity::ActivityImpl*> ractivities(activities.size());
   std::transform(begin(activities), end(activities), begin(ractivities),
index 3ddc1d6..ab9fefb 100644 (file)
@@ -397,10 +397,19 @@ Comm* Comm::detach()
 
 ssize_t Comm::test_any(const std::vector<CommPtr>& comms) // XBT_ATTRIB_DEPRECATED_v339
 {
-  std::vector<ActivityPtr> activities;
-  for (const auto& comm : comms)
-    activities.push_back(boost::dynamic_pointer_cast<Activity>(comm));
-  return Activity::test_any(activities);
+  std::vector<kernel::activity::ActivityImpl*> ractivities(comms.size());
+  std::transform(begin(comms), end(comms), begin(ractivities), [](const CommPtr& act) { return act->pimpl_.get(); });
+
+  kernel::actor::ActorImpl* issuer = kernel::actor::ActorImpl::self();
+  kernel::actor::ActivityTestanySimcall observer{issuer, ractivities, "test_any"};
+  ssize_t changed_pos = kernel::actor::simcall_answered(
+      [&observer] {
+        return kernel::activity::ActivityImpl::test_any(observer.get_issuer(), observer.get_activities());
+      },
+      &observer);
+  if (changed_pos != -1)
+    comms.at(changed_pos)->complete(State::FINISHED);
+  return changed_pos;
 }
 
 /** @brief Block the calling actor until the communication is finished, or until timeout
@@ -463,22 +472,27 @@ Comm* Comm::wait_for(double timeout)
 
 ssize_t Comm::deprecated_wait_any_for(const std::vector<CommPtr>& comms, double timeout) // XBT_ATTRIB_DEPRECATED_v339
 {
-  std::vector<ActivityPtr> activities;
+  if (comms.empty())
+    return -1;
+  ActivitySet set;
   for (const auto& comm : comms)
-    activities.push_back(boost::dynamic_pointer_cast<Activity>(comm));
-  ssize_t changed_pos;
+    set.push(comm);
   try {
-    changed_pos = Activity::wait_any_for(activities, timeout);
+    auto* ret = set.wait_any_for(timeout).get();
+    for (size_t i = 0; i < comms.size(); i++)
+      if (comms[i].get() == ret)
+        return i;
+
+  } catch (TimeoutException& e) {
+    return -1;
   } catch (const NetworkFailureException& e) {
-    changed_pos = -1;
-    for (auto c : comms) {
-      if (c->pimpl_->get_state() == kernel::activity::State::FAILED) {
+    for (auto c : comms)
+      if (c->pimpl_->get_state() == kernel::activity::State::FAILED)
         c->complete(State::FAILED);
-      }
-    }
+
     e.rethrow_nested(XBT_THROW_POINT, boost::core::demangle(typeid(e).name()) + " raised in kernel mode.");
   }
-  return changed_pos;
+  return -1;
 }
 
 void Comm::wait_all(const std::vector<CommPtr>& comms) // XBT_ATTRIB_DEPRECATED_v339