From 2bd7ad0811b9be3c2038c29eee0578f544051e2d Mon Sep 17 00:00:00 2001 From: Martin Quinson Date: Mon, 10 Jul 2023 01:25:41 +0200 Subject: [PATCH] Implement ActivitySet --- ChangeLog | 3 + MANIFEST.in | 2 + .../cpp/comm-waitany/s4u-comm-waitany.cpp | 24 ++--- .../cpp/comm-waitany/s4u-comm-waitany.tesh | 1 - include/simgrid/forward.h | 14 ++- include/simgrid/s4u.hpp | 4 +- include/simgrid/s4u/Activity.hpp | 1 + include/simgrid/s4u/ActivitySet.hpp | 79 ++++++++++++++++ src/s4u/s4u_ActivitySet.cpp | 91 +++++++++++++++++++ tools/cmake/DefinePackages.cmake | 2 + 10 files changed, 200 insertions(+), 21 deletions(-) create mode 100644 include/simgrid/s4u/ActivitySet.hpp create mode 100644 src/s4u/s4u_ActivitySet.cpp diff --git a/ChangeLog b/ChangeLog index 603816a154..6e5f715a92 100644 --- a/ChangeLog +++ b/ChangeLog @@ -1,5 +1,8 @@ SimGrid (3.34.1) not released (Target: fall 2023) +S4U: + - New class ActivitySet to ease wait_any()/test_any()/wait_all() + Python: - Make the host_load plugin available from Python. See examples/python/plugin-host-load diff --git a/MANIFEST.in b/MANIFEST.in index f6767fe96c..e51cd443a7 100644 --- a/MANIFEST.in +++ b/MANIFEST.in @@ -1951,6 +1951,7 @@ include include/simgrid/plugins/ns3.hpp include include/simgrid/plugins/photovoltaic.hpp include include/simgrid/s4u.hpp include include/simgrid/s4u/Activity.hpp +include include/simgrid/s4u/ActivitySet.hpp include include/simgrid/s4u/Actor.hpp include include/simgrid/s4u/Barrier.hpp include include/simgrid/s4u/Comm.hpp @@ -2328,6 +2329,7 @@ include src/plugins/vm/VmLiveMigration.cpp include src/plugins/vm/VmLiveMigration.hpp include src/plugins/vm/dirty_page_tracking.cpp include src/s4u/s4u_Activity.cpp +include src/s4u/s4u_ActivitySet.cpp include src/s4u/s4u_Actor.cpp include src/s4u/s4u_Barrier.cpp include src/s4u/s4u_Comm.cpp diff --git a/examples/cpp/comm-waitany/s4u-comm-waitany.cpp b/examples/cpp/comm-waitany/s4u-comm-waitany.cpp index 802a4fc253..429e22b121 100644 --- a/examples/cpp/comm-waitany/s4u-comm-waitany.cpp +++ b/examples/cpp/comm-waitany/s4u-comm-waitany.cpp @@ -31,8 +31,8 @@ static void sender(unsigned int messages_count, unsigned int receivers_count, lo XBT_WARN("Sender has nothing to do. Bail out!"); return; } - /* Vector in which we store all ongoing communications */ - std::vector pending_comms; + /* Set in which we store all ongoing communications */ + sg4::ActivitySet pending_comms; /* Make a vector of the mailboxes to use */ std::vector mboxes; @@ -50,30 +50,24 @@ static void sender(unsigned int messages_count, unsigned int receivers_count, lo /* Create a communication representing the ongoing communication, and store it in pending_comms */ sg4::CommPtr comm = mboxes[i % receivers_count]->put_async(payload, msg_size); - pending_comms.push_back(comm); + pending_comms.push(comm); } /* Start sending messages to let the workers know that they should stop */ for (unsigned int i = 0; i < receivers_count; i++) { XBT_INFO("Send 'finalize' to 'receiver-%u'", i); sg4::CommPtr comm = mboxes[i]->put_async(new std::string("finalize"), 0); - pending_comms.push_back(comm); + pending_comms.push(comm); } XBT_INFO("Done dispatching all messages"); - /* Now that all message exchanges were initiated, wait for their completion, in order of termination. + /* Now that all message exchanges were initiated, wait for their completion one by one. * - * This loop waits for first terminating message with wait_any() and remove it with erase(), until all comms are - * terminated - * Even in this simple example, the pending comms do not terminate in the exact same order of creation. + * Activities are removed in order of termination. It differs from the order of creation, even if it's a bit difficult + * to see it here. */ - while (not pending_comms.empty()) { - ssize_t changed_pos = sg4::Comm::wait_any(pending_comms); - pending_comms.erase(pending_comms.begin() + changed_pos); - if (changed_pos != 0) - XBT_INFO("Remove the %zdth pending comm: it terminated earlier than another comm that was initiated first.", - changed_pos); - } + while (not pending_comms.empty()) + pending_comms.wait_any(); XBT_INFO("Goodbye now!"); } diff --git a/examples/cpp/comm-waitany/s4u-comm-waitany.tesh b/examples/cpp/comm-waitany/s4u-comm-waitany.tesh index 5fe9dc925e..f2480ad6cf 100644 --- a/examples/cpp/comm-waitany/s4u-comm-waitany.tesh +++ b/examples/cpp/comm-waitany/s4u-comm-waitany.tesh @@ -21,7 +21,6 @@ $ ${bindir:=.}/s4u-comm-waitany ${platfdir}/small_platform.xml "--log=root.fmt:[ > [ 0.338309] (3:receiver@Jupiter) I got a 'Message 3'. > [ 0.475190] (2:receiver@Fafard) I got a 'Message 4'. > [ 0.500898] (2:receiver@Fafard) I got a 'finalize'. -> [ 0.500898] (1:sender@Tremblay) Remove the 1th pending comm: it terminated earlier than another comm that was initiated first. > [ 0.507464] (3:receiver@Jupiter) I got a 'Message 5'. > [ 0.526478] (3:receiver@Jupiter) I got a 'finalize'. > [ 0.526478] (1:sender@Tremblay) Goodbye now! diff --git a/include/simgrid/forward.h b/include/simgrid/forward.h index 07e7cbcb52..6c726824c4 100644 --- a/include/simgrid/forward.h +++ b/include/simgrid/forward.h @@ -19,8 +19,14 @@ namespace s4u { class Activity; /** Smart pointer to a simgrid::s4u::Activity */ using ActivityPtr = boost::intrusive_ptr; -XBT_PUBLIC void intrusive_ptr_release(const Activity* actor); -XBT_PUBLIC void intrusive_ptr_add_ref(const Activity* actor); +XBT_PUBLIC void intrusive_ptr_release(const Activity* act); +XBT_PUBLIC void intrusive_ptr_add_ref(const Activity* act); + +class ActivitySet; +/** Smart pointer to a simgrid::s4u::Activity */ +using ActivitySetPtr = boost::intrusive_ptr; +XBT_PUBLIC void intrusive_ptr_release(const ActivitySet* as); +XBT_PUBLIC void intrusive_ptr_add_ref(const ActivitySet* as); class Actor; /** Smart pointer to a simgrid::s4u::Actor */ @@ -31,8 +37,8 @@ XBT_PUBLIC void intrusive_ptr_add_ref(const Actor* actor); class Barrier; /** Smart pointer to a simgrid::s4u::Barrier */ using BarrierPtr = boost::intrusive_ptr; -XBT_PUBLIC void intrusive_ptr_release(Barrier* m); -XBT_PUBLIC void intrusive_ptr_add_ref(Barrier* m); +XBT_PUBLIC void intrusive_ptr_release(Barrier* b); +XBT_PUBLIC void intrusive_ptr_add_ref(Barrier* b); class Comm; /** Smart pointer to a simgrid::s4u::Comm */ diff --git a/include/simgrid/s4u.hpp b/include/simgrid/s4u.hpp index 77a71df2c4..b2853229c2 100644 --- a/include/simgrid/s4u.hpp +++ b/include/simgrid/s4u.hpp @@ -22,9 +22,11 @@ #include #include #include -#include #include +#include +#include + #include #endif /* SIMGRID_S4U_S4U_H */ diff --git a/include/simgrid/s4u/Activity.hpp b/include/simgrid/s4u/Activity.hpp index b66268ef3b..9973a00854 100644 --- a/include/simgrid/s4u/Activity.hpp +++ b/include/simgrid/s4u/Activity.hpp @@ -34,6 +34,7 @@ namespace s4u { */ class XBT_PUBLIC Activity : public xbt::Extendable { #ifndef DOXYGEN + friend ActivitySet; friend Comm; friend Exec; friend Io; diff --git a/include/simgrid/s4u/ActivitySet.hpp b/include/simgrid/s4u/ActivitySet.hpp new file mode 100644 index 0000000000..4add822a65 --- /dev/null +++ b/include/simgrid/s4u/ActivitySet.hpp @@ -0,0 +1,79 @@ +/* Copyright (c) 2006-2023. The SimGrid Team. All rights reserved. */ + +/* This program is free software; you can redistribute it and/or modify it + * under the terms of the license (GNU LGPL) which comes with this package. */ + +#ifndef SIMGRID_S4U_ACTIVITYSET_HPP +#define SIMGRID_S4U_ACTIVITYSET_HPP + +#include +#include + +#include + +namespace simgrid::s4u { +/** @brief ActivitiesSet + * + * This class is a container of activities, allowing to wait for the completion of any or all activities in the set. + * This is somehow similar to the select(2) system call under UNIX, allowing you to wait for the next event about these + * activities. + */ +class XBT_PUBLIC ActivitySet : public xbt::Extendable { + std::vector + activities_; // We use a vector instead of a set to improve reproductibility accross architectures + std::vector failed_activities_; + +public: + ActivitySet() = default; + ~ActivitySet() = default; + + /** Add an activity to the set */ + void push(ActivityPtr a) { activities_.push_back(a); } + /** Remove that activity from the set (no-op if the activity is not in the set) */ + void erase(ActivityPtr a); + + /** Get the amount of activities in the set. Failed activities (if any) are not counted */ + int size() { return activities_.size(); } + /** Return whether the set is empty. Failed activities (if any) are not counted */ + int empty() { return activities_.empty(); } + + /** Wait for the completion of all activities in the set, but not longer than the provided timeout + * + * On timeout, an exception is raised, and the completed activities remain in the set. Use test_any() to retrieve + * them. + */ + void wait_all_for(double timeout); + /** Wait for the completion of all activities in the set */ + void wait_all() { wait_all_for(-1); } + /** Returns the first terminated activity if any, or ActivityPtr(nullptr) if no activity is terminated */ + ActivityPtr test_any(); + + /** Wait for the completion of one activity from the set, but not longer than the provided timeout. + * + * See wait_any() for details. + * + * @return the first terminated activity, which is automatically removed from the set. + */ + + ActivityPtr wait_any_for(double timeout); + /** Wait for the completion of one activity from the set. + * + * If an activity fails during that time, an exception is raised, and the failed exception is marked as failed in the + * set. Use get_failed_activity() to retrieve it. + * + * If more than one activity failed, the other ones are also removed from the set. Use get_failed_activity() several + * time to retrieve them all. + * + * @return the first terminated activity, which is automatically removed from the set. If more than one activity + * terminated at the same timestamp, then the other ones are still in the set. Use either test_any() or wait_any() to + * retrieve the other ones. + */ + ActivityPtr wait_any() { return wait_any_for(-1); } + + ActivityPtr get_failed_activity(); + bool has_failed_activities() { return not failed_activities_.empty(); } +}; + +}; // namespace simgrid::s4u + +#endif diff --git a/src/s4u/s4u_ActivitySet.cpp b/src/s4u/s4u_ActivitySet.cpp new file mode 100644 index 0000000000..6b628388b0 --- /dev/null +++ b/src/s4u/s4u_ActivitySet.cpp @@ -0,0 +1,91 @@ +/* Copyright (c) 2023-. The SimGrid Team. All rights reserved. */ + +/* This program is free software; you can redistribute it and/or modify it + * under the terms of the license (GNU LGPL) which comes with this package. */ + +#include "src/kernel/activity/ActivityImpl.hpp" +#include "src/kernel/actor/ActorImpl.hpp" +#include "src/kernel/actor/CommObserver.hpp" +#include +#include + +namespace simgrid::s4u { + +void ActivitySet::erase(ActivityPtr a) +{ + for (auto it = activities_.begin(); it != activities_.end(); it++) + if (*it == a) { + activities_.erase(it); + return; + } +} + +void ActivitySet::wait_all_for(double timeout) +{ + if (timeout < 0.0) { + for (const auto& act : activities_) + act->wait(); + + } else { + + double deadline = Engine::get_clock() + timeout; + for (const auto& act : activities_) + act->wait_until(deadline); + } +} + +ActivityPtr ActivitySet::test_any() +{ + std::vector act_impls(activities_.size()); + std::transform(begin(activities_), end(activities_), begin(act_impls), + [](const ActivityPtr& act) { return act->pimpl_.get(); }); + + kernel::actor::ActorImpl* issuer = kernel::actor::ActorImpl::self(); + kernel::actor::ActivityTestanySimcall observer{issuer, act_impls, "test_any"}; + ssize_t changed_pos = kernel::actor::simcall_answered( + [&observer] { + return kernel::activity::ActivityImpl::test_any(observer.get_issuer(), observer.get_activities()); + }, + &observer); + if (changed_pos == -1) + return ActivityPtr(nullptr); + + auto ret = activities_.at(changed_pos); + erase(ret); + ret->complete(Activity::State::FINISHED); + return ret; +} + +ActivityPtr ActivitySet::wait_any_for(double timeout) +{ + std::vector act_impls(activities_.size()); + std::transform(begin(activities_), end(activities_), begin(act_impls), + [](const ActivityPtr& activity) { return activity->pimpl_.get(); }); + + kernel::actor::ActorImpl* issuer = kernel::actor::ActorImpl::self(); + kernel::actor::ActivityWaitanySimcall observer{issuer, act_impls, timeout, "wait_any_for"}; + ssize_t changed_pos = kernel::actor::simcall_blocking( + [&observer] { + kernel::activity::ActivityImpl::wait_any_for(observer.get_issuer(), observer.get_activities(), + observer.get_timeout()); + }, + &observer); + xbt_assert(changed_pos != -1, + "ActivityImpl::wait_any_for is not supposed to return -1 but instead to raise exceptions"); + + auto ret = activities_.at(changed_pos); + erase(ret); + ret->complete(Activity::State::FINISHED); + return ret; +} + +ActivityPtr ActivitySet::get_failed_activity() +{ + if (failed_activities_.empty()) + return ActivityPtr(nullptr); + auto ret = failed_activities_.back(); + failed_activities_.pop_back(); + return ret; +} + +}; // namespace simgrid::s4u \ No newline at end of file diff --git a/tools/cmake/DefinePackages.cmake b/tools/cmake/DefinePackages.cmake index 20baa0b82e..a6ffe30e44 100644 --- a/tools/cmake/DefinePackages.cmake +++ b/tools/cmake/DefinePackages.cmake @@ -458,6 +458,7 @@ set(PLUGINS_SRC set(S4U_SRC src/s4u/s4u_Activity.cpp + src/s4u/s4u_ActivitySet.cpp src/s4u/s4u_Actor.cpp src/s4u/s4u_Barrier.cpp src/s4u/s4u_Comm.cpp @@ -677,6 +678,7 @@ set(headers_to_install include/simgrid/vm.h include/simgrid/zone.h include/simgrid/s4u/Activity.hpp + include/simgrid/s4u/ActivitySet.hpp include/simgrid/s4u/Actor.hpp include/simgrid/s4u/Barrier.hpp include/simgrid/s4u/Comm.hpp -- 2.20.1