SimGrid (3.34.1) not released (Target: fall 2023)
+S4U:
+ - New class ActivitySet to ease wait_any()/test_any()/wait_all()
+
Python:
- Make the host_load plugin available from Python. See examples/python/plugin-host-load
include include/simgrid/plugins/photovoltaic.hpp
include include/simgrid/s4u.hpp
include include/simgrid/s4u/Activity.hpp
+include include/simgrid/s4u/ActivitySet.hpp
include include/simgrid/s4u/Actor.hpp
include include/simgrid/s4u/Barrier.hpp
include include/simgrid/s4u/Comm.hpp
include src/plugins/vm/VmLiveMigration.hpp
include src/plugins/vm/dirty_page_tracking.cpp
include src/s4u/s4u_Activity.cpp
+include src/s4u/s4u_ActivitySet.cpp
include src/s4u/s4u_Actor.cpp
include src/s4u/s4u_Barrier.cpp
include src/s4u/s4u_Comm.cpp
XBT_WARN("Sender has nothing to do. Bail out!");
return;
}
- /* Vector in which we store all ongoing communications */
- std::vector<sg4::CommPtr> pending_comms;
+ /* Set in which we store all ongoing communications */
+ sg4::ActivitySet pending_comms;
/* Make a vector of the mailboxes to use */
std::vector<sg4::Mailbox*> mboxes;
/* Create a communication representing the ongoing communication, and store it in pending_comms */
sg4::CommPtr comm = mboxes[i % receivers_count]->put_async(payload, msg_size);
- pending_comms.push_back(comm);
+ pending_comms.push(comm);
}
/* Start sending messages to let the workers know that they should stop */
for (unsigned int i = 0; i < receivers_count; i++) {
XBT_INFO("Send 'finalize' to 'receiver-%u'", i);
sg4::CommPtr comm = mboxes[i]->put_async(new std::string("finalize"), 0);
- pending_comms.push_back(comm);
+ pending_comms.push(comm);
}
XBT_INFO("Done dispatching all messages");
- /* Now that all message exchanges were initiated, wait for their completion, in order of termination.
+ /* Now that all message exchanges were initiated, wait for their completion one by one.
*
- * This loop waits for first terminating message with wait_any() and remove it with erase(), until all comms are
- * terminated
- * Even in this simple example, the pending comms do not terminate in the exact same order of creation.
+ * Activities are removed in order of termination. It differs from the order of creation, even if it's a bit difficult
+ * to see it here.
*/
- while (not pending_comms.empty()) {
- ssize_t changed_pos = sg4::Comm::wait_any(pending_comms);
- pending_comms.erase(pending_comms.begin() + changed_pos);
- if (changed_pos != 0)
- XBT_INFO("Remove the %zdth pending comm: it terminated earlier than another comm that was initiated first.",
- changed_pos);
- }
+ while (not pending_comms.empty())
+ pending_comms.wait_any();
XBT_INFO("Goodbye now!");
}
> [ 0.338309] (3:receiver@Jupiter) I got a 'Message 3'.
> [ 0.475190] (2:receiver@Fafard) I got a 'Message 4'.
> [ 0.500898] (2:receiver@Fafard) I got a 'finalize'.
-> [ 0.500898] (1:sender@Tremblay) Remove the 1th pending comm: it terminated earlier than another comm that was initiated first.
> [ 0.507464] (3:receiver@Jupiter) I got a 'Message 5'.
> [ 0.526478] (3:receiver@Jupiter) I got a 'finalize'.
> [ 0.526478] (1:sender@Tremblay) Goodbye now!
class Activity;
/** Smart pointer to a simgrid::s4u::Activity */
using ActivityPtr = boost::intrusive_ptr<Activity>;
-XBT_PUBLIC void intrusive_ptr_release(const Activity* actor);
-XBT_PUBLIC void intrusive_ptr_add_ref(const Activity* actor);
+XBT_PUBLIC void intrusive_ptr_release(const Activity* act);
+XBT_PUBLIC void intrusive_ptr_add_ref(const Activity* act);
+
+class ActivitySet;
+/** Smart pointer to a simgrid::s4u::Activity */
+using ActivitySetPtr = boost::intrusive_ptr<ActivitySet>;
+XBT_PUBLIC void intrusive_ptr_release(const ActivitySet* as);
+XBT_PUBLIC void intrusive_ptr_add_ref(const ActivitySet* as);
class Actor;
/** Smart pointer to a simgrid::s4u::Actor */
class Barrier;
/** Smart pointer to a simgrid::s4u::Barrier */
using BarrierPtr = boost::intrusive_ptr<Barrier>;
-XBT_PUBLIC void intrusive_ptr_release(Barrier* m);
-XBT_PUBLIC void intrusive_ptr_add_ref(Barrier* m);
+XBT_PUBLIC void intrusive_ptr_release(Barrier* b);
+XBT_PUBLIC void intrusive_ptr_add_ref(Barrier* b);
class Comm;
/** Smart pointer to a simgrid::s4u::Comm */
#include <simgrid/s4u/Mutex.hpp>
#include <simgrid/s4u/NetZone.hpp>
#include <simgrid/s4u/Semaphore.hpp>
-#include <simgrid/s4u/Task.hpp>
#include <simgrid/s4u/VirtualMachine.hpp>
+#include <simgrid/s4u/ActivitySet.hpp>
+#include <simgrid/s4u/Task.hpp>
+
#include <simgrid/Exception.hpp>
#endif /* SIMGRID_S4U_S4U_H */
*/
class XBT_PUBLIC Activity : public xbt::Extendable<Activity> {
#ifndef DOXYGEN
+ friend ActivitySet;
friend Comm;
friend Exec;
friend Io;
--- /dev/null
+/* Copyright (c) 2006-2023. The SimGrid Team. All rights reserved. */
+
+/* This program is free software; you can redistribute it and/or modify it
+ * under the terms of the license (GNU LGPL) which comes with this package. */
+
+#ifndef SIMGRID_S4U_ACTIVITYSET_HPP
+#define SIMGRID_S4U_ACTIVITYSET_HPP
+
+#include <simgrid/forward.h>
+#include <simgrid/s4u/Activity.hpp>
+
+#include <vector>
+
+namespace simgrid::s4u {
+/** @brief ActivitiesSet
+ *
+ * This class is a container of activities, allowing to wait for the completion of any or all activities in the set.
+ * This is somehow similar to the select(2) system call under UNIX, allowing you to wait for the next event about these
+ * activities.
+ */
+class XBT_PUBLIC ActivitySet : public xbt::Extendable<ActivitySet> {
+ std::vector<ActivityPtr>
+ activities_; // We use a vector instead of a set to improve reproductibility accross architectures
+ std::vector<ActivityPtr> failed_activities_;
+
+public:
+ ActivitySet() = default;
+ ~ActivitySet() = default;
+
+ /** Add an activity to the set */
+ void push(ActivityPtr a) { activities_.push_back(a); }
+ /** Remove that activity from the set (no-op if the activity is not in the set) */
+ void erase(ActivityPtr a);
+
+ /** Get the amount of activities in the set. Failed activities (if any) are not counted */
+ int size() { return activities_.size(); }
+ /** Return whether the set is empty. Failed activities (if any) are not counted */
+ int empty() { return activities_.empty(); }
+
+ /** Wait for the completion of all activities in the set, but not longer than the provided timeout
+ *
+ * On timeout, an exception is raised, and the completed activities remain in the set. Use test_any() to retrieve
+ * them.
+ */
+ void wait_all_for(double timeout);
+ /** Wait for the completion of all activities in the set */
+ void wait_all() { wait_all_for(-1); }
+ /** Returns the first terminated activity if any, or ActivityPtr(nullptr) if no activity is terminated */
+ ActivityPtr test_any();
+
+ /** Wait for the completion of one activity from the set, but not longer than the provided timeout.
+ *
+ * See wait_any() for details.
+ *
+ * @return the first terminated activity, which is automatically removed from the set.
+ */
+
+ ActivityPtr wait_any_for(double timeout);
+ /** Wait for the completion of one activity from the set.
+ *
+ * If an activity fails during that time, an exception is raised, and the failed exception is marked as failed in the
+ * set. Use get_failed_activity() to retrieve it.
+ *
+ * If more than one activity failed, the other ones are also removed from the set. Use get_failed_activity() several
+ * time to retrieve them all.
+ *
+ * @return the first terminated activity, which is automatically removed from the set. If more than one activity
+ * terminated at the same timestamp, then the other ones are still in the set. Use either test_any() or wait_any() to
+ * retrieve the other ones.
+ */
+ ActivityPtr wait_any() { return wait_any_for(-1); }
+
+ ActivityPtr get_failed_activity();
+ bool has_failed_activities() { return not failed_activities_.empty(); }
+};
+
+}; // namespace simgrid::s4u
+
+#endif
--- /dev/null
+/* Copyright (c) 2023-. The SimGrid Team. All rights reserved. */
+
+/* This program is free software; you can redistribute it and/or modify it
+ * under the terms of the license (GNU LGPL) which comes with this package. */
+
+#include "src/kernel/activity/ActivityImpl.hpp"
+#include "src/kernel/actor/ActorImpl.hpp"
+#include "src/kernel/actor/CommObserver.hpp"
+#include <simgrid/s4u/ActivitySet.hpp>
+#include <simgrid/s4u/Engine.hpp>
+
+namespace simgrid::s4u {
+
+void ActivitySet::erase(ActivityPtr a)
+{
+ for (auto it = activities_.begin(); it != activities_.end(); it++)
+ if (*it == a) {
+ activities_.erase(it);
+ return;
+ }
+}
+
+void ActivitySet::wait_all_for(double timeout)
+{
+ if (timeout < 0.0) {
+ for (const auto& act : activities_)
+ act->wait();
+
+ } else {
+
+ double deadline = Engine::get_clock() + timeout;
+ for (const auto& act : activities_)
+ act->wait_until(deadline);
+ }
+}
+
+ActivityPtr ActivitySet::test_any()
+{
+ std::vector<kernel::activity::ActivityImpl*> act_impls(activities_.size());
+ std::transform(begin(activities_), end(activities_), begin(act_impls),
+ [](const ActivityPtr& act) { return act->pimpl_.get(); });
+
+ kernel::actor::ActorImpl* issuer = kernel::actor::ActorImpl::self();
+ kernel::actor::ActivityTestanySimcall observer{issuer, act_impls, "test_any"};
+ ssize_t changed_pos = kernel::actor::simcall_answered(
+ [&observer] {
+ return kernel::activity::ActivityImpl::test_any(observer.get_issuer(), observer.get_activities());
+ },
+ &observer);
+ if (changed_pos == -1)
+ return ActivityPtr(nullptr);
+
+ auto ret = activities_.at(changed_pos);
+ erase(ret);
+ ret->complete(Activity::State::FINISHED);
+ return ret;
+}
+
+ActivityPtr ActivitySet::wait_any_for(double timeout)
+{
+ std::vector<kernel::activity::ActivityImpl*> act_impls(activities_.size());
+ std::transform(begin(activities_), end(activities_), begin(act_impls),
+ [](const ActivityPtr& activity) { return activity->pimpl_.get(); });
+
+ kernel::actor::ActorImpl* issuer = kernel::actor::ActorImpl::self();
+ kernel::actor::ActivityWaitanySimcall observer{issuer, act_impls, timeout, "wait_any_for"};
+ ssize_t changed_pos = kernel::actor::simcall_blocking(
+ [&observer] {
+ kernel::activity::ActivityImpl::wait_any_for(observer.get_issuer(), observer.get_activities(),
+ observer.get_timeout());
+ },
+ &observer);
+ xbt_assert(changed_pos != -1,
+ "ActivityImpl::wait_any_for is not supposed to return -1 but instead to raise exceptions");
+
+ auto ret = activities_.at(changed_pos);
+ erase(ret);
+ ret->complete(Activity::State::FINISHED);
+ return ret;
+}
+
+ActivityPtr ActivitySet::get_failed_activity()
+{
+ if (failed_activities_.empty())
+ return ActivityPtr(nullptr);
+ auto ret = failed_activities_.back();
+ failed_activities_.pop_back();
+ return ret;
+}
+
+}; // namespace simgrid::s4u
\ No newline at end of file
set(S4U_SRC
src/s4u/s4u_Activity.cpp
+ src/s4u/s4u_ActivitySet.cpp
src/s4u/s4u_Actor.cpp
src/s4u/s4u_Barrier.cpp
src/s4u/s4u_Comm.cpp
include/simgrid/vm.h
include/simgrid/zone.h
include/simgrid/s4u/Activity.hpp
+ include/simgrid/s4u/ActivitySet.hpp
include/simgrid/s4u/Actor.hpp
include/simgrid/s4u/Barrier.hpp
include/simgrid/s4u/Comm.hpp