--- /dev/null
+# Copyright (c) 2017-2023. The SimGrid Team. All rights reserved.
+#
+# This program is free software; you can redistribute it and/or modify it
+# under the terms of the license (GNU LGPL) which comes with this package.
+
+"""
+Usage: activityset-testany.py platform_file [other parameters]
+"""
+
+import sys
+from simgrid import Actor, ActivitySet, Engine, Comm, Exec, Io, Host, Mailbox, this_actor
+
+def bob():
+    """Actor that starts three asynchronous activities and polls them until all are done.
+
+    An execution, a reception on mailbox "mbox" and a disk read are started
+    asynchronously and gathered in an ActivitySet. The set is then polled with
+    test_any() until it is empty, logging the kind of each completed activity.
+    """
+    mbox = Mailbox.by_name("mbox")
+    disk = Host.current().get_disks()[0]
+
+    this_actor.info("Create my asynchronous activities")
+    # Named exec_activity (not exec) to avoid shadowing the Python builtin
+    exec_activity = this_actor.exec_async(5e9)
+    comm, payload = mbox.get_async()  # payload is not read in this example
+    io = disk.read_async(300000000)
+
+    pending_activities = ActivitySet([exec_activity, comm])
+    pending_activities.push(io) # Activities can be pushed after creation, too
+
+    this_actor.info("Sleep_for a while")
+    this_actor.sleep_for(1)
+
+    this_actor.info("Test for completed activities")
+    while not pending_activities.empty():
+        completed_one = pending_activities.test_any()
+        # Identity test: None is a singleton, and 'is' cannot be hijacked by a custom __eq__
+        if completed_one is None:
+            this_actor.info("Nothing matches, test again in 0.5s")
+            this_actor.sleep_for(.5)
+        elif isinstance(completed_one, Comm):
+            this_actor.info("Completed a Comm")
+        elif isinstance(completed_one, Exec):
+            this_actor.info("Completed an Exec")
+        elif isinstance(completed_one, Io):
+            this_actor.info("Completed an I/O")
+
+    this_actor.info("Last activity is complete")
+
+def alice():
+    """Actor that logs its intent, then puts the string "Message" on mailbox "mbox".
+
+    The second argument of put() is 600000000 (presumably the simulated payload size).
+    """
+    this_actor.info("Send 'Message'")
+    mbox = Mailbox.by_name("mbox")
+    mbox.put("Message", 600000000)
+
+if __name__ == '__main__':
+    # The platform file is mandatory: show the usage text instead of failing
+    # later with a bare IndexError on sys.argv[1].
+    if len(sys.argv) < 2:
+        sys.exit(__doc__)
+
+    e = Engine(sys.argv)
+    e.set_log_control("root.fmt:[%4.2r]%e[%5a]%e%m%n")
+
+    # Load the platform description
+    e.load_platform(sys.argv[1])
+
+    # One actor per host; each host is looked up by name in the loaded platform
+    Actor.create("bob", Host.by_name("bob"), bob)
+    Actor.create("alice", Host.by_name("alice"), alice)
+
+    e.run()
#include "simgrid/kernel/routing/NetPoint.hpp"
#include "simgrid/plugins/load.h"
#include <simgrid/Exception.hpp>
+#include <simgrid/s4u/ActivitySet.hpp>
#include <simgrid/s4u/Actor.hpp>
#include <simgrid/s4u/Barrier.hpp>
#include <simgrid/s4u/Comm.hpp>
#include <vector>
namespace py = pybind11;
+using simgrid::s4u::Activity;
+using simgrid::s4u::ActivityPtr;
+using simgrid::s4u::ActivitySet;
+using simgrid::s4u::ActivitySetPtr;
using simgrid::s4u::Actor;
using simgrid::s4u::ActorPtr;
using simgrid::s4u::Barrier;
}
});
},
- "Registers the main function of an actor");
+ "Registers the main function of an actor")
+ .def("set_log_control", [](Engine*, const std::string& settings) { xbt_log_control_set(settings.c_str()); });
/* Class Netzone */
py::class_<simgrid::s4u::NetZone, std::unique_ptr<simgrid::s4u::NetZone, py::nodelete>> netzone(
"get", [](const PyGetAsync* self) { return py::reinterpret_steal<py::object>(*(self->get())); },
"Get python object after async communication in receiver side");
+  /* Class Activity */
+  // Exposed as "Activity" so that it can be used as the base class of the Comm, Io and Exec bindings.
+  py::class_<Activity, ActivityPtr>(m, "Activity", "Activity. See the C++ documentation for details.");
+
/* Class Comm */
- py::class_<Comm, CommPtr>(m, "Comm", "Communication. See the C++ documentation for details.")
+ py::class_<Comm, CommPtr, Activity>(m, "Comm", "Communication. See the C++ documentation for details.")
.def_property_readonly("dst_data_size", &Comm::get_dst_data_size, py::call_guard<py::gil_scoped_release>(),
"Retrieve the size of the received data.")
.def_property_readonly("mailbox", &Comm::get_mailbox, py::call_guard<py::gil_scoped_release>(),
"one, or -1 if a timeout occurred.");
/* Class Io */
- py::class_<simgrid::s4u::Io, simgrid::s4u::IoPtr>(m, "Io", "I/O activities. See the C++ documentation for details.")
+ py::class_<simgrid::s4u::Io, simgrid::s4u::IoPtr, Activity>(m, "Io",
+ "I/O activities. See the C++ documentation for details.")
.def("test", &simgrid::s4u::Io::test, py::call_guard<py::gil_scoped_release>(),
"Test whether the I/O is terminated.")
.def("wait", &simgrid::s4u::Io::wait, py::call_guard<py::gil_scoped_release>(),
"Block until the completion of any I/O in the list and return the index of the terminated one.");
/* Class Exec */
- py::class_<simgrid::s4u::Exec, simgrid::s4u::ExecPtr>(m, "Exec", "Execution. See the C++ documentation for details.")
+ py::class_<simgrid::s4u::Exec, simgrid::s4u::ExecPtr, Activity>(m, "Exec",
+ "Execution. See the C++ documentation for details.")
.def_property_readonly("remaining", &simgrid::s4u::Exec::get_remaining, py::call_guard<py::gil_scoped_release>(),
"Amount of flops that remain to be computed until completion (read-only property).")
.def_property_readonly("remaining_ratio", &simgrid::s4u::Exec::get_remaining_ratio,
.def(
"__repr__", [](const IoTaskPtr io) { return "IoTask(" + io->get_name() + ")"; },
"Textual representation of the IoTask");
+
+  /* Class ActivitySet */
+  py::class_<ActivitySet, ActivitySetPtr>(m, "ActivitySet", "ActivitySet. See the C++ documentation for details.")
+      // Build a set from a Python list of activities. The set is owned by its smart pointer from the start,
+      // so it is released if push() throws.
+      .def(py::init([](const std::vector<simgrid::s4u::ActivityPtr>& activities) {
+             ActivitySetPtr ret(new ActivitySet());
+             for (const auto& activity : activities)
+               ret->push(activity);
+             return ret;
+           }),
+           "Create an ActivitySet containing the provided activities")
+      .def(py::init([]() { return ActivitySetPtr(new ActivitySet()); }), "Create an empty ActivitySet")
+
+      .def("push", &ActivitySet::push, py::call_guard<py::gil_scoped_release>(), py::arg("activity"),
+           "Add an activity to the set")
+      .def("erase", &ActivitySet::erase, py::call_guard<py::gil_scoped_release>(), py::arg("activity"),
+           "Remove that activity from the set")
+      .def_property_readonly("size", &ActivitySet::size, "Count of activities in the set")
+      .def("empty", &ActivitySet::empty, "Returns whether the set is empty")
+      .def("has_failed_activities", &ActivitySet::has_failed_activities,
+           "Returns whether there is any failed activities")
+      .def("get_failed_activity", &ActivitySet::get_failed_activity, "Returns a failed activity from the set, or None")
+
+      .def("wait_all_for", &ActivitySet::wait_all_for, py::call_guard<py::gil_scoped_release>(), py::arg("timeout"),
+           "Wait for the completion of all activities in the set, but not longer than the provided timeout")
+      .def("wait_all", &ActivitySet::wait_all, py::call_guard<py::gil_scoped_release>(),
+           "Wait for the completion of all activities in the set, endlessly")
+      .def("test_any", &ActivitySet::test_any, py::call_guard<py::gil_scoped_release>(),
+           "Returns the first terminated activity if any, or None if no activity is terminated")
+      .def("wait_any_for", &ActivitySet::wait_any_for, py::call_guard<py::gil_scoped_release>(), py::arg("timeout"),
+           "Wait for the completion of one activity in the set, but not longer than the provided timeout")
+      .def("wait_any", &ActivitySet::wait_any, py::call_guard<py::gil_scoped_release>(),
+           "Wait for the completion of one activity in the set, endlessly")
+
+      // The representation does not depend on the content of the set, hence the unnamed parameter.
+      .def(
+          "__repr__", [](const ActivitySetPtr /* self */) { return "ActivitySet([...])"; },
+          "Textual representation of the ActivitySet");
}