Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
Implement ActivitySet
author Martin Quinson <martin.quinson@ens-rennes.fr>
Sun, 9 Jul 2023 23:25:41 +0000 (01:25 +0200)
committer Martin Quinson <martin.quinson@ens-rennes.fr>
Sun, 9 Jul 2023 23:27:05 +0000 (01:27 +0200)
ChangeLog
MANIFEST.in
examples/cpp/comm-waitany/s4u-comm-waitany.cpp
examples/cpp/comm-waitany/s4u-comm-waitany.tesh
include/simgrid/forward.h
include/simgrid/s4u.hpp
include/simgrid/s4u/Activity.hpp
include/simgrid/s4u/ActivitySet.hpp [new file with mode: 0644]
src/s4u/s4u_ActivitySet.cpp [new file with mode: 0644]
tools/cmake/DefinePackages.cmake

index 603816a..6e5f715 100644 (file)
--- a/ChangeLog
+++ b/ChangeLog
@@ -1,5 +1,8 @@
 SimGrid (3.34.1) not released (Target: fall 2023)
 
+S4U:
+ - New class ActivitySet to ease wait_any()/test_any()/wait_all()
+
 Python:
  - Make the host_load plugin available from Python. See examples/python/plugin-host-load
 
index f6767fe..e51cd44 100644 (file)
@@ -1951,6 +1951,7 @@ include include/simgrid/plugins/ns3.hpp
 include include/simgrid/plugins/photovoltaic.hpp
 include include/simgrid/s4u.hpp
 include include/simgrid/s4u/Activity.hpp
+include include/simgrid/s4u/ActivitySet.hpp
 include include/simgrid/s4u/Actor.hpp
 include include/simgrid/s4u/Barrier.hpp
 include include/simgrid/s4u/Comm.hpp
@@ -2328,6 +2329,7 @@ include src/plugins/vm/VmLiveMigration.cpp
 include src/plugins/vm/VmLiveMigration.hpp
 include src/plugins/vm/dirty_page_tracking.cpp
 include src/s4u/s4u_Activity.cpp
+include src/s4u/s4u_ActivitySet.cpp
 include src/s4u/s4u_Actor.cpp
 include src/s4u/s4u_Barrier.cpp
 include src/s4u/s4u_Comm.cpp
index 802a4fc..429e22b 100644 (file)
@@ -31,8 +31,8 @@ static void sender(unsigned int messages_count, unsigned int receivers_count, lo
     XBT_WARN("Sender has nothing to do. Bail out!");
     return;
   }
-  /* Vector in which we store all ongoing communications */
-  std::vector<sg4::CommPtr> pending_comms;
+  /* Set in which we store all ongoing communications */
+  sg4::ActivitySet pending_comms;
 
   /* Make a vector of the mailboxes to use */
   std::vector<sg4::Mailbox*> mboxes;
@@ -50,30 +50,24 @@ static void sender(unsigned int messages_count, unsigned int receivers_count, lo
 
     /* Create a communication representing the ongoing communication, and store it in pending_comms */
     sg4::CommPtr comm = mboxes[i % receivers_count]->put_async(payload, msg_size);
-    pending_comms.push_back(comm);
+    pending_comms.push(comm);
   }
 
   /* Start sending messages to let the workers know that they should stop */
   for (unsigned int i = 0; i < receivers_count; i++) {
     XBT_INFO("Send 'finalize' to 'receiver-%u'", i);
     sg4::CommPtr comm = mboxes[i]->put_async(new std::string("finalize"), 0);
-    pending_comms.push_back(comm);
+    pending_comms.push(comm);
   }
   XBT_INFO("Done dispatching all messages");
 
-  /* Now that all message exchanges were initiated, wait for their completion, in order of termination.
+  /* Now that all message exchanges were initiated, wait for their completion one by one.
    *
-   * This loop waits for first terminating message with wait_any() and remove it with erase(), until all comms are
-   * terminated
-   * Even in this simple example, the pending comms do not terminate in the exact same order of creation.
+   * Activities are removed in order of termination. It differs from the order of creation, even if it's a bit difficult
+   * to see it here.
    */
-  while (not pending_comms.empty()) {
-    ssize_t changed_pos = sg4::Comm::wait_any(pending_comms);
-    pending_comms.erase(pending_comms.begin() + changed_pos);
-    if (changed_pos != 0)
-      XBT_INFO("Remove the %zdth pending comm: it terminated earlier than another comm that was initiated first.",
-               changed_pos);
-  }
+  while (not pending_comms.empty())
+    pending_comms.wait_any();
 
   XBT_INFO("Goodbye now!");
 }
index 5fe9dc9..f2480ad 100644 (file)
@@ -21,7 +21,6 @@ $ ${bindir:=.}/s4u-comm-waitany ${platfdir}/small_platform.xml "--log=root.fmt:[
 > [  0.338309] (3:receiver@Jupiter) I got a 'Message 3'.
 > [  0.475190] (2:receiver@Fafard) I got a 'Message 4'.
 > [  0.500898] (2:receiver@Fafard) I got a 'finalize'.
-> [  0.500898] (1:sender@Tremblay) Remove the 1th pending comm: it terminated earlier than another comm that was initiated first.
 > [  0.507464] (3:receiver@Jupiter) I got a 'Message 5'.
 > [  0.526478] (3:receiver@Jupiter) I got a 'finalize'.
 > [  0.526478] (1:sender@Tremblay) Goodbye now!
index 07e7cbc..6c72682 100644 (file)
@@ -19,8 +19,14 @@ namespace s4u {
 class Activity;
 /** Smart pointer to a simgrid::s4u::Activity */
 using ActivityPtr = boost::intrusive_ptr<Activity>;
-XBT_PUBLIC void intrusive_ptr_release(const Activity* actor);
-XBT_PUBLIC void intrusive_ptr_add_ref(const Activity* actor);
+XBT_PUBLIC void intrusive_ptr_release(const Activity* act);
+XBT_PUBLIC void intrusive_ptr_add_ref(const Activity* act);
+
+class ActivitySet;
+/** Smart pointer to a simgrid::s4u::ActivitySet */
+using ActivitySetPtr = boost::intrusive_ptr<ActivitySet>;
+XBT_PUBLIC void intrusive_ptr_release(const ActivitySet* as);
+XBT_PUBLIC void intrusive_ptr_add_ref(const ActivitySet* as);
 
 class Actor;
 /** Smart pointer to a simgrid::s4u::Actor */
@@ -31,8 +37,8 @@ XBT_PUBLIC void intrusive_ptr_add_ref(const Actor* actor);
 class Barrier;
 /** Smart pointer to a simgrid::s4u::Barrier */
 using BarrierPtr = boost::intrusive_ptr<Barrier>;
-XBT_PUBLIC void intrusive_ptr_release(Barrier* m);
-XBT_PUBLIC void intrusive_ptr_add_ref(Barrier* m);
+XBT_PUBLIC void intrusive_ptr_release(Barrier* b);
+XBT_PUBLIC void intrusive_ptr_add_ref(Barrier* b);
 
 class Comm;
 /** Smart pointer to a simgrid::s4u::Comm */
index 77a71df..b285322 100644 (file)
 #include <simgrid/s4u/Mutex.hpp>
 #include <simgrid/s4u/NetZone.hpp>
 #include <simgrid/s4u/Semaphore.hpp>
-#include <simgrid/s4u/Task.hpp>
 #include <simgrid/s4u/VirtualMachine.hpp>
 
+#include <simgrid/s4u/ActivitySet.hpp>
+#include <simgrid/s4u/Task.hpp>
+
 #include <simgrid/Exception.hpp>
 
 #endif /* SIMGRID_S4U_S4U_H */
index b66268e..9973a00 100644 (file)
@@ -34,6 +34,7 @@ namespace s4u {
  */
 class XBT_PUBLIC Activity : public xbt::Extendable<Activity> {
 #ifndef DOXYGEN
+  friend ActivitySet;
   friend Comm;
   friend Exec;
   friend Io;
diff --git a/include/simgrid/s4u/ActivitySet.hpp b/include/simgrid/s4u/ActivitySet.hpp
new file mode 100644 (file)
index 0000000..4add822
--- /dev/null
@@ -0,0 +1,79 @@
+/* Copyright (c) 2006-2023. The SimGrid Team. All rights reserved.          */
+
+/* This program is free software; you can redistribute it and/or modify it
+ * under the terms of the license (GNU LGPL) which comes with this package. */
+
+#ifndef SIMGRID_S4U_ACTIVITYSET_HPP
+#define SIMGRID_S4U_ACTIVITYSET_HPP
+
+#include <simgrid/forward.h>
+#include <simgrid/s4u/Activity.hpp>
+
+#include <vector>
+
+namespace simgrid::s4u {
+/** @brief ActivitySet
+ *
+ * This class is a container of activities, allowing you to wait for the completion of any or all activities in the set.
+ * This is somewhat similar to the select(2) system call under UNIX, allowing you to wait for the next event about these
+ * activities.
+ */
+class XBT_PUBLIC ActivitySet : public xbt::Extendable<ActivitySet> {
+  std::vector<ActivityPtr>
+      activities_; // We use a vector instead of a set to improve reproducibility across architectures
+  std::vector<ActivityPtr> failed_activities_;
+
+public:
+  ActivitySet()  = default;
+  ~ActivitySet() = default;
+
+  /** Add an activity to the set */
+  void push(ActivityPtr a) { activities_.push_back(a); }
+  /** Remove that activity from the set (no-op if the activity is not in the set) */
+  void erase(ActivityPtr a);
+
+  /** Get the number of activities in the set. Failed activities (if any) are not counted */
+  int size() { return activities_.size(); }
+  /** Return whether the set is empty. Failed activities (if any) are not counted */
+  int empty() { return activities_.empty(); }
+
+  /** Wait for the completion of all activities in the set, but not longer than the provided timeout
+   *
+   * On timeout, an exception is raised, and the completed activities remain in the set. Use test_any() to retrieve
+   * them.
+   */
+  void wait_all_for(double timeout);
+  /** Wait for the completion of all activities in the set */
+  void wait_all() { wait_all_for(-1); }
+  /** Returns the first terminated activity if any, or ActivityPtr(nullptr) if no activity is terminated */
+  ActivityPtr test_any();
+
+  /** Wait for the completion of one activity from the set, but not longer than the provided timeout.
+   *
+   *  See wait_any() for details.
+   *
+   * @return the first terminated activity, which is automatically removed from the set.
+   */
+
+  ActivityPtr wait_any_for(double timeout);
+  /** Wait for the completion of one activity from the set.
+   *
+   * If an activity fails during that time, an exception is raised, and the failed activity is marked as failed in the
+   * set. Use get_failed_activity() to retrieve it.
+   *
+   * If more than one activity failed, the other ones are also removed from the set. Use get_failed_activity() several
+   * times to retrieve them all.
+   *
+   * @return the first terminated activity, which is automatically removed from the set. If more than one activity
+   * terminated at the same timestamp, then the other ones are still in the set. Use either test_any() or wait_any() to
+   * retrieve the other ones.
+   */
+  ActivityPtr wait_any() { return wait_any_for(-1); }
+
+  ActivityPtr get_failed_activity();
+  bool has_failed_activities() { return not failed_activities_.empty(); }
+};
+
+}; // namespace simgrid::s4u
+
+#endif
diff --git a/src/s4u/s4u_ActivitySet.cpp b/src/s4u/s4u_ActivitySet.cpp
new file mode 100644 (file)
index 0000000..6b62838
--- /dev/null
@@ -0,0 +1,91 @@
+/* Copyright (c) 2023-. The SimGrid Team. All rights reserved.          */
+
+/* This program is free software; you can redistribute it and/or modify it
+ * under the terms of the license (GNU LGPL) which comes with this package. */
+
+#include "src/kernel/activity/ActivityImpl.hpp"
+#include "src/kernel/actor/ActorImpl.hpp"
+#include "src/kernel/actor/CommObserver.hpp"
+#include <simgrid/s4u/ActivitySet.hpp>
+#include <simgrid/s4u/Engine.hpp>
+
+namespace simgrid::s4u {
+
+void ActivitySet::erase(ActivityPtr a)
+{
+  for (auto it = activities_.begin(); it != activities_.end(); it++)
+    if (*it == a) {
+      activities_.erase(it);
+      return;
+    }
+}
+
+void ActivitySet::wait_all_for(double timeout)
+{
+  if (timeout < 0.0) {
+    for (const auto& act : activities_)
+      act->wait();
+
+  } else {
+
+    double deadline = Engine::get_clock() + timeout;
+    for (const auto& act : activities_)
+      act->wait_until(deadline);
+  }
+}
+
+ActivityPtr ActivitySet::test_any()
+{
+  std::vector<kernel::activity::ActivityImpl*> act_impls(activities_.size());
+  std::transform(begin(activities_), end(activities_), begin(act_impls),
+                 [](const ActivityPtr& act) { return act->pimpl_.get(); });
+
+  kernel::actor::ActorImpl* issuer = kernel::actor::ActorImpl::self();
+  kernel::actor::ActivityTestanySimcall observer{issuer, act_impls, "test_any"};
+  ssize_t changed_pos = kernel::actor::simcall_answered(
+      [&observer] {
+        return kernel::activity::ActivityImpl::test_any(observer.get_issuer(), observer.get_activities());
+      },
+      &observer);
+  if (changed_pos == -1)
+    return ActivityPtr(nullptr);
+
+  auto ret = activities_.at(changed_pos);
+  erase(ret);
+  ret->complete(Activity::State::FINISHED);
+  return ret;
+}
+
+ActivityPtr ActivitySet::wait_any_for(double timeout)
+{
+  std::vector<kernel::activity::ActivityImpl*> act_impls(activities_.size());
+  std::transform(begin(activities_), end(activities_), begin(act_impls),
+                 [](const ActivityPtr& activity) { return activity->pimpl_.get(); });
+
+  kernel::actor::ActorImpl* issuer = kernel::actor::ActorImpl::self();
+  kernel::actor::ActivityWaitanySimcall observer{issuer, act_impls, timeout, "wait_any_for"};
+  ssize_t changed_pos = kernel::actor::simcall_blocking(
+      [&observer] {
+        kernel::activity::ActivityImpl::wait_any_for(observer.get_issuer(), observer.get_activities(),
+                                                     observer.get_timeout());
+      },
+      &observer);
+  xbt_assert(changed_pos != -1,
+             "ActivityImpl::wait_any_for is not supposed to return -1 but instead to raise exceptions");
+
+  auto ret = activities_.at(changed_pos);
+  erase(ret);
+  ret->complete(Activity::State::FINISHED);
+  return ret;
+}
+
+ActivityPtr ActivitySet::get_failed_activity()
+{
+  if (failed_activities_.empty())
+    return ActivityPtr(nullptr);
+  auto ret = failed_activities_.back();
+  failed_activities_.pop_back();
+  return ret;
+}
+
+}; // namespace simgrid::s4u
\ No newline at end of file
index 20baa0b..a6ffe30 100644 (file)
@@ -458,6 +458,7 @@ set(PLUGINS_SRC
 
 set(S4U_SRC
   src/s4u/s4u_Activity.cpp
+  src/s4u/s4u_ActivitySet.cpp
   src/s4u/s4u_Actor.cpp
   src/s4u/s4u_Barrier.cpp
   src/s4u/s4u_Comm.cpp
@@ -677,6 +678,7 @@ set(headers_to_install
   include/simgrid/vm.h
   include/simgrid/zone.h
   include/simgrid/s4u/Activity.hpp
+  include/simgrid/s4u/ActivitySet.hpp
   include/simgrid/s4u/Actor.hpp
   include/simgrid/s4u/Barrier.hpp
   include/simgrid/s4u/Comm.hpp