Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
Python bindings of the ActivitySet, and add one example
author Martin Quinson <martin.quinson@ens-rennes.fr>
Fri, 21 Jul 2023 16:56:39 +0000 (18:56 +0200)
committer Martin Quinson <martin.quinson@ens-rennes.fr>
Fri, 21 Jul 2023 16:59:51 +0000 (18:59 +0200)
examples/python/CMakeLists.txt
examples/python/activityset-testany/activityset-testany.py [new file with mode: 0644]
examples/python/activityset-testany/activityset-testany.tesh [new file with mode: 0644]
include/simgrid/s4u/ActivitySet.hpp
src/bindings/python/simgrid_python.cpp

index 555cfc1..e54901c 100644 (file)
@@ -1,4 +1,5 @@
 foreach(example actor-create actor-daemon actor-join actor-kill actor-migrate actor-suspend actor-yield actor-lifetime
+        activityset-testany
         app-masterworkers
         comm-wait comm-waitall comm-waitallfor comm-waitany comm-failure comm-host2host comm-pingpong
         comm-ready comm-suspend comm-testany comm-throttling comm-waitallfor comm-waituntil
diff --git a/examples/python/activityset-testany/activityset-testany.py b/examples/python/activityset-testany/activityset-testany.py
new file mode 100644 (file)
index 0000000..34f1e9a
--- /dev/null
@@ -0,0 +1,57 @@
+# Copyright (c) 2017-2023. The SimGrid Team. All rights reserved.
+#
+# This program is free software; you can redistribute it and/or modify it
+# under the terms of the license (GNU LGPL) which comes with this package.
+
+"""
+Usage: activityset-testany.py platform_file [other parameters]
+"""
+
+import sys
+from simgrid import Actor, ActivitySet, Engine, Comm, Exec, Io, Host, Mailbox, this_actor
+
+def bob():
+  """Start an Exec, a Comm reception and a disk read, then poll them through an ActivitySet.
+
+  The set is tested with test_any() until it is empty; when nothing is completed yet,
+  the actor sleeps 0.5s before testing again.
+  """
+  mbox = Mailbox.by_name("mbox")
+  disk = Host.current().get_disks()[0]
+
+  this_actor.info("Create my asynchronous activities")
+  computation = this_actor.exec_async(5e9) # Not named 'exec': that would shadow the Python builtin
+  comm, payload = mbox.get_async() # The payload is not read by this example
+  io = disk.read_async(300000000)
+
+  pending_activities = ActivitySet([computation, comm])
+  pending_activities.push(io) # Activities can be pushed after creation, too
+  this_actor.info("Sleep_for a while")
+  this_actor.sleep_for(1)
+
+  this_actor.info("Test for completed activities")
+  while not pending_activities.empty():
+    completed_one = pending_activities.test_any()
+    if completed_one is None: # Identity test: the idiomatic way to detect None
+      this_actor.info("Nothing matches, test again in 0.5s")
+      this_actor.sleep_for(.5)
+    elif isinstance(completed_one, Comm):
+      this_actor.info("Completed a Comm")
+    elif isinstance(completed_one, Exec):
+      this_actor.info("Completed an Exec")
+    elif isinstance(completed_one, Io):
+      this_actor.info("Completed an I/O")
+
+  this_actor.info("Last activity is complete")
+
+def alice():
+  """Announce the emission, then put 'Message' (600000000 bytes) on the mailbox named "mbox"."""
+  announcement = "Send 'Message'"
+  this_actor.info(announcement)
+  mbox = Mailbox.by_name("mbox")
+  mbox.put("Message", 600000000)
+
+if __name__ == '__main__':
+  engine = Engine(sys.argv)
+  engine.set_log_control("root.fmt:[%4.2r]%e[%5a]%e%m%n")
+
+  # The platform description is the first command-line argument
+  platform_file = sys.argv[1]
+  engine.load_platform(platform_file)
+
+  # Each actor runs on the host bearing its own name; bob is created first, then alice
+  for actor_name, actor_code in (("bob", bob), ("alice", alice)):
+    Actor.create(actor_name, Host.by_name(actor_name), actor_code)
+
+  engine.run()
diff --git a/examples/python/activityset-testany/activityset-testany.tesh b/examples/python/activityset-testany/activityset-testany.tesh
new file mode 100644 (file)
index 0000000..5177a16
--- /dev/null
@@ -0,0 +1,20 @@
+#!/usr/bin/env tesh
+
+$ ${pythoncmd:=python3} ${PYTHON_TOOL_OPTIONS:=} ${srcdir:=.}/activityset-testany.py ${platfdir}/hosts_with_disks.xml
+> [0.00] [alice] Send 'Message'
+> [0.00] [  bob] Create my asynchronous activities
+> [0.00] [  bob] Sleep_for a while
+> [1.00] [  bob] Test for completed activities
+> [1.00] [  bob] Nothing matches, test again in 0.5s
+> [1.50] [  bob] Nothing matches, test again in 0.5s
+> [2.00] [  bob] Nothing matches, test again in 0.5s
+> [2.50] [  bob] Nothing matches, test again in 0.5s
+> [3.00] [  bob] Completed an I/O
+> [3.00] [  bob] Nothing matches, test again in 0.5s
+> [3.50] [  bob] Nothing matches, test again in 0.5s
+> [4.00] [  bob] Nothing matches, test again in 0.5s
+> [4.50] [  bob] Nothing matches, test again in 0.5s
+> [5.00] [  bob] Completed an Exec
+> [5.00] [  bob] Nothing matches, test again in 0.5s
+> [5.50] [  bob] Completed a Comm
+> [5.50] [  bob] Last activity is complete
index 86942a3..b23ea6d 100644 (file)
@@ -23,8 +23,8 @@ namespace s4u {
  * activities.
  */
 class XBT_PUBLIC ActivitySet : public xbt::Extendable<ActivitySet> {
-  std::vector<ActivityPtr>
-      activities_; // We use a vector instead of a set to improve reproductibility accross architectures
+  std::atomic_int_fast32_t refcount_{1};
+  std::vector<ActivityPtr> activities_; // Use vectors, not sets for better reproducibility across architectures
   std::vector<ActivityPtr> failed_activities_;
 
 public:
@@ -78,6 +78,19 @@ public:
 
   ActivityPtr get_failed_activity();
   bool has_failed_activities() { return not failed_activities_.empty(); }
+
+  // boost::intrusive_ptr<ActivitySet> support:
+  /** Add one reference to the set, asserting that its counter was not already at zero. */
+  friend void intrusive_ptr_add_ref(ActivitySet* as)
+  {
+    // 'previous' is only read by the assertion below (presumably why it is marked as possibly unused)
+    XBT_ATTRIB_UNUSED auto previous = as->refcount_.fetch_add(1);
+    xbt_assert(previous != 0);
+  }
+
+  /** Drop one reference to the set, and delete the set when that reference was the last one. */
+  friend void intrusive_ptr_release(ActivitySet* as)
+  {
+    const auto previous = as->refcount_.fetch_sub(1);
+    if (previous == 1) // The counter just reached zero
+      delete as;
+  }
 };
 
 } // namespace s4u
index c431875..a8e9ce7 100644 (file)
@@ -12,6 +12,7 @@
 #include "simgrid/kernel/routing/NetPoint.hpp"
 #include "simgrid/plugins/load.h"
 #include <simgrid/Exception.hpp>
+#include <simgrid/s4u/ActivitySet.hpp>
 #include <simgrid/s4u/Actor.hpp>
 #include <simgrid/s4u/Barrier.hpp>
 #include <simgrid/s4u/Comm.hpp>
 #include <vector>
 
 namespace py = pybind11;
+using simgrid::s4u::Activity;
+using simgrid::s4u::ActivityPtr;
+using simgrid::s4u::ActivitySet;
+using simgrid::s4u::ActivitySetPtr;
 using simgrid::s4u::Actor;
 using simgrid::s4u::ActorPtr;
 using simgrid::s4u::Barrier;
@@ -223,7 +228,8 @@ PYBIND11_MODULE(simgrid, m)
               }
             });
           },
-          "Registers the main function of an actor");
+          "Registers the main function of an actor")
+      .def("set_log_control", [](Engine*, const std::string& settings) { xbt_log_control_set(settings.c_str()); });
 
   /* Class Netzone */
   py::class_<simgrid::s4u::NetZone, std::unique_ptr<simgrid::s4u::NetZone, py::nodelete>> netzone(
@@ -646,8 +652,11 @@ PYBIND11_MODULE(simgrid, m)
           "get", [](const PyGetAsync* self) { return py::reinterpret_steal<py::object>(*(self->get())); },
           "Get python object after async communication in receiver side");
 
+  /* Class Activity: declared as the parent of the Comm, Io and Exec bindings below */
+  py::class_<Activity, ActivityPtr>(m, "Activity", "Activity. See the C++ documentation for details.");
+
   /* Class Comm */
-  py::class_<Comm, CommPtr>(m, "Comm", "Communication. See the C++ documentation for details.")
+  py::class_<Comm, CommPtr, Activity>(m, "Comm", "Communication. See the C++ documentation for details.")
       .def_property_readonly("dst_data_size", &Comm::get_dst_data_size, py::call_guard<py::gil_scoped_release>(),
                              "Retrieve the size of the received data.")
       .def_property_readonly("mailbox", &Comm::get_mailbox, py::call_guard<py::gil_scoped_release>(),
@@ -714,7 +723,8 @@ PYBIND11_MODULE(simgrid, m)
                   "one, or -1 if a timeout occurred.");
 
   /* Class Io */
-  py::class_<simgrid::s4u::Io, simgrid::s4u::IoPtr>(m, "Io", "I/O activities. See the C++ documentation for details.")
+  py::class_<simgrid::s4u::Io, simgrid::s4u::IoPtr, Activity>(m, "Io",
+                                                              "I/O activities. See the C++ documentation for details.")
       .def("test", &simgrid::s4u::Io::test, py::call_guard<py::gil_scoped_release>(),
            "Test whether the I/O is terminated.")
       .def("wait", &simgrid::s4u::Io::wait, py::call_guard<py::gil_scoped_release>(),
@@ -726,7 +736,8 @@ PYBIND11_MODULE(simgrid, m)
                   "Block until the completion of any I/O in the list and return the index of the terminated one.");
 
   /* Class Exec */
-  py::class_<simgrid::s4u::Exec, simgrid::s4u::ExecPtr>(m, "Exec", "Execution. See the C++ documentation for details.")
+  py::class_<simgrid::s4u::Exec, simgrid::s4u::ExecPtr, Activity>(m, "Exec",
+                                                                  "Execution. See the C++ documentation for details.")
       .def_property_readonly("remaining", &simgrid::s4u::Exec::get_remaining, py::call_guard<py::gil_scoped_release>(),
                              "Amount of flops that remain to be computed until completion (read-only property).")
       .def_property_readonly("remaining_ratio", &simgrid::s4u::Exec::get_remaining_ratio,
@@ -946,4 +957,41 @@ PYBIND11_MODULE(simgrid, m)
       .def(
           "__repr__", [](const IoTaskPtr io) { return "IoTask(" + io->get_name() + ")"; },
           "Textual representation of the IoTask");
+
+  /* Class ActivitySet */
+  py::class_<ActivitySet, ActivitySetPtr>(m, "ActivitySet", "ActivitySet. See the C++ documentation for details.")
+      .def(py::init([](const std::vector<simgrid::s4u::ActivityPtr>& activities) {
+             // An ActivitySet is born with a reference count of 1: adopt that reference (add_ref=false) instead of
+             // adding a second one, or the count would never get back to zero and the set would be leaked.
+             // Owning the set before filling it also avoids leaking it if a push fails.
+             ActivitySetPtr ret(new ActivitySet(), false);
+             for (const auto& a : activities)
+               ret->push(a);
+             return ret;
+           }),
+           py::arg("activities"), "Build an ActivitySet containing the provided activities")
+      .def(py::init([]() { return ActivitySetPtr(new ActivitySet(), false); }), "Build an empty ActivitySet")
+
+      .def("push", &ActivitySet::push, py::call_guard<py::gil_scoped_release>(), py::arg("activity"),
+           "Add an activity to the set")
+      .def("erase", &ActivitySet::erase, py::call_guard<py::gil_scoped_release>(), py::arg("activity"),
+           "Remove that activity from the set")
+      .def_property_readonly("size", &ActivitySet::size, "Count of activities in the set")
+      .def("empty", &ActivitySet::empty, "Returns whether the set is empty")
+      .def("has_failed_activities", &ActivitySet::has_failed_activities,
+           "Returns whether there is any failed activities")
+      .def("get_failed_activity", &ActivitySet::get_failed_activity, "Returns a failed activity from the set, or None")
+
+      .def("wait_all_for", &ActivitySet::wait_all_for, py::call_guard<py::gil_scoped_release>(), py::arg("timeout"),
+           "Wait for the completion of all activities in the set, but not longer than the provided timeout")
+      .def("wait_all", &ActivitySet::wait_all, py::call_guard<py::gil_scoped_release>(),
+           "Wait for the completion of all activities in the set, endlessly")
+      .def("test_any", &ActivitySet::test_any, py::call_guard<py::gil_scoped_release>(),
+           "Returns the first terminated activity if any, or None if no activity is terminated")
+      .def("wait_any_for", &ActivitySet::wait_any_for, py::call_guard<py::gil_scoped_release>(), py::arg("timeout"),
+           "Wait for the completion of one activity in the set, but not longer than the provided timeout")
+      .def("wait_any", &ActivitySet::wait_any, py::call_guard<py::gil_scoped_release>(),
+           "Wait for the completion of one activity in the set, endlessly")
+
+      .def(
+          "__repr__", [](const ActivitySetPtr as) { return "ActivitySet([...])"; },
+          "Textual representation of the ActivitySet");
 }