SimGrid (3.34.1) not released (Target: fall 2023)
+S4U:
+ - New class ActivitySet to ease wait_any()/test_any()/wait_all()
+
Python:
- Make the host_load plugin available from Python. See examples/python/plugin-host-load
include examples/c/plugin-host-load/plugin-host-load.tesh
include examples/c/synchro-semaphore/synchro-semaphore.c
include examples/c/synchro-semaphore/synchro-semaphore.tesh
-include examples/cpp/activity-testany/s4u-activity-testany.cpp
-include examples/cpp/activity-testany/s4u-activity-testany.tesh
-include examples/cpp/activity-waitany/s4u-activity-waitany.cpp
-include examples/cpp/activity-waitany/s4u-activity-waitany.tesh
+include examples/cpp/activityset-testany/s4u-activityset-testany.cpp
+include examples/cpp/activityset-testany/s4u-activityset-testany.tesh
+include examples/cpp/activityset-waitall/s4u-activityset-waitall.cpp
+include examples/cpp/activityset-waitall/s4u-activityset-waitall.tesh
+include examples/cpp/activityset-waitallfor/s4u-activityset-waitallfor.cpp
+include examples/cpp/activityset-waitallfor/s4u-activityset-waitallfor.tesh
+include examples/cpp/activityset-waitany/s4u-activityset-waitany.cpp
+include examples/cpp/activityset-waitany/s4u-activityset-waitany.tesh
include examples/cpp/actor-create/s4u-actor-create.cpp
include examples/cpp/actor-create/s4u-actor-create.tesh
include examples/cpp/actor-create/s4u-actor-create_d.xml
include examples/cpp/comm-ready/s4u-comm-ready.tesh
include examples/cpp/comm-suspend/s4u-comm-suspend.cpp
include examples/cpp/comm-suspend/s4u-comm-suspend.tesh
-include examples/cpp/comm-testany/s4u-comm-testany.cpp
-include examples/cpp/comm-testany/s4u-comm-testany.tesh
include examples/cpp/comm-throttling/s4u-comm-throttling.cpp
include examples/cpp/comm-throttling/s4u-comm-throttling.tesh
include examples/cpp/comm-wait/s4u-comm-wait.cpp
include examples/cpp/comm-wait/s4u-comm-wait.tesh
-include examples/cpp/comm-waitall/s4u-comm-waitall.cpp
-include examples/cpp/comm-waitall/s4u-comm-waitall.tesh
-include examples/cpp/comm-waitany/s4u-comm-waitany.cpp
-include examples/cpp/comm-waitany/s4u-comm-waitany.tesh
include examples/cpp/comm-waituntil/s4u-comm-waituntil.cpp
include examples/cpp/comm-waituntil/s4u-comm-waituntil.tesh
include examples/cpp/dag-comm/s4u-dag-comm.cpp
include examples/cpp/exec-threads/s4u-exec-threads.tesh
include examples/cpp/exec-unassigned/s4u-exec-unassigned.cpp
include examples/cpp/exec-unassigned/s4u-exec-unassigned.tesh
-include examples/cpp/exec-waitany/s4u-exec-waitany.cpp
-include examples/cpp/exec-waitany/s4u-exec-waitany.tesh
include examples/cpp/exec-waitfor/s4u-exec-waitfor.cpp
include examples/cpp/exec-waitfor/s4u-exec-waitfor.tesh
include examples/cpp/io-async/s4u-io-async.cpp
include include/simgrid/plugins/photovoltaic.hpp
include include/simgrid/s4u.hpp
include include/simgrid/s4u/Activity.hpp
+include include/simgrid/s4u/ActivitySet.hpp
include include/simgrid/s4u/Actor.hpp
include include/simgrid/s4u/Barrier.hpp
include include/simgrid/s4u/Comm.hpp
include src/plugins/vm/VmLiveMigration.hpp
include src/plugins/vm/dirty_page_tracking.cpp
include src/s4u/s4u_Activity.cpp
+include src/s4u/s4u_ActivitySet.cpp
include src/s4u/s4u_Actor.cpp
include src/s4u/s4u_Barrier.cpp
include src/s4u/s4u_Comm.cpp
# Deal with each example
-foreach (example activity-testany activity-waitany
+foreach (example activityset-testany activityset-waitany activityset-waitall activityset-waitallfor
actor-create actor-daemon actor-exiting actor-join actor-kill
actor-lifetime actor-migrate actor-suspend actor-yield actor-stacksize
app-bittorrent app-chainsend app-token-ring
battery-degradation battery-simple battery-energy
- comm-pingpong comm-ready comm-suspend comm-testany comm-wait comm-waitany comm-waitall comm-waituntil
+ comm-pingpong comm-ready comm-suspend comm-wait comm-waituntil
comm-dependent comm-host2host comm-failure comm-throttling
cloud-capping cloud-migration cloud-simple
dag-comm dag-from-json-simple dag-from-dax-simple dag-from-dax dag-from-dot-simple dag-from-dot dag-failure dag-io dag-scheduling dag-simple dag-tuto
dht-chord dht-kademlia
energy-exec energy-boot energy-link energy-vm energy-exec-ptask energy-wifi
engine-filtering engine-run-partial
- exec-async exec-basic exec-dvfs exec-remote exec-waitany exec-waitfor exec-dependent exec-unassigned
+ exec-async exec-basic exec-dvfs exec-remote exec-waitfor exec-dependent exec-unassigned
exec-ptask-multicore exec-ptask-multicore-latency exec-cpu-nonlinear exec-cpu-factors exec-failure exec-threads
maestro-set
mc-bugged1 mc-bugged1-liveness mc-bugged2 mc-bugged2-liveness mc-centralized-mutex mc-electric-fence mc-failing-assert
auto comm = mbox->get_async(&payload);
auto io = disk->read_async(3e8);
- std::vector<sg4::ActivityPtr> pending_activities = {boost::dynamic_pointer_cast<sg4::Activity>(exec),
- boost::dynamic_pointer_cast<sg4::Activity>(comm),
- boost::dynamic_pointer_cast<sg4::Activity>(io)};
+ sg4::ActivitySet pending_activities;
+ pending_activities.push(exec);
+ pending_activities.push(comm);
+ pending_activities.push(io);
XBT_INFO("Sleep_for a while");
sg4::this_actor::sleep_for(1);
XBT_INFO("Test for completed activities");
while (not pending_activities.empty()) {
- ssize_t changed_pos = sg4::Activity::test_any(pending_activities);
- if (changed_pos != -1) {
- auto* completed_one = pending_activities[changed_pos].get();
- if (dynamic_cast<sg4::Comm*>(completed_one))
+ auto completed_one = pending_activities.test_any();
+ if (completed_one != nullptr) {
+ if (boost::dynamic_pointer_cast<sg4::Comm>(completed_one))
XBT_INFO("Completed a Comm");
- if (dynamic_cast<sg4::Exec*>(completed_one))
+ if (boost::dynamic_pointer_cast<sg4::Exec>(completed_one))
XBT_INFO("Completed an Exec");
- if (dynamic_cast<sg4::Io*>(completed_one))
+ if (boost::dynamic_pointer_cast<sg4::Io>(completed_one))
XBT_INFO("Completed an I/O");
- pending_activities.erase(pending_activities.begin() + changed_pos);
- } else { // nothing matches, wait for a little bit
+ } else {
XBT_INFO("Nothing matches, test again in 0.5s");
sg4::this_actor::sleep_for(.5);
}
#!/usr/bin/env tesh
-$ ${bindir:=.}/s4u-activity-testany ${platfdir}/hosts_with_disks.xml "--log=root.fmt:[%4.2r]%e[%5a]%e%m%n"
+$ ${bindir:=.}/s4u-activityset-testany ${platfdir}/hosts_with_disks.xml "--log=root.fmt:[%4.2r]%e[%5a]%e%m%n"
> [0.00] [alice] Send 'Message'
> [0.00] [ bob] Create my asynchronous activities
> [0.00] [ bob] Sleep_for a while
--- /dev/null
+/* Copyright (c) 2010-2023. The SimGrid Team. All rights reserved. */
+
+/* This program is free software; you can redistribute it and/or modify it
+ * under the terms of the license (GNU LGPL) which comes with this package. */
+
+#include "simgrid/s4u.hpp"
+#include <cstdlib>
+#include <iostream>
+#include <string>
+namespace sg4 = simgrid::s4u;
+
+XBT_LOG_NEW_DEFAULT_CATEGORY(s4u_activityset_waitall, "Messages specific for this s4u example");
+
+/* Bob starts one activity of each kind (an execution, a message reception and a disk read), gathers them in an
+ * ActivitySet, and blocks until all of them are over with one single wait_all() call. */
+static void bob()
+{
+  sg4::Mailbox* mbox    = sg4::Mailbox::by_name("mbox");
+  const sg4::Disk* disk = sg4::Host::current()->get_disks().front();
+  std::string* payload  = nullptr; // set by the asynchronous get; null-initialized so that the delete below is safe
+
+  XBT_INFO("Create my asynchronous activities");
+  auto exec = sg4::this_actor::exec_async(5e9);
+  auto comm = mbox->get_async(&payload);
+  auto io   = disk->read_async(3e8);
+
+  // The typed smart pointers convert implicitly to ActivityPtr, so no explicit cast is needed to build the set
+  sg4::ActivitySet pending_activities({exec, comm, io});
+
+  XBT_INFO("Wait for asynchrounous activities to complete, all in one shot.");
+  pending_activities.wait_all();
+
+  XBT_INFO("All activities are completed.");
+  delete payload; // the message was allocated by alice; the receiver is in charge of freeing it
+}
+
+/* Alice allocates a message and sends it on the "mbox" mailbox with a blocking put.
+ * The receiving side is the one that deletes the message. */
+static void alice()
+{
+  auto* message = new std::string("Message");
+  XBT_INFO("Send '%s'", message->c_str());
+
+  sg4::Mailbox* destination = sg4::Mailbox::by_name("mbox");
+  destination->put(message, 6e8);
+}
+
+/* Entry point: load the platform file given as first argument, start both actors and run the simulation. */
+int main(int argc, char* argv[])
+{
+  sg4::Engine e(&argc, argv);
+  // Checked after the Engine creation, which is handed argc/argv first: fail with a usage message instead of
+  // giving a null argv[1] to load_platform()
+  xbt_assert(argc > 1, "Usage: %s platform_file\n", argv[0]);
+
+  e.load_platform(argv[1]);
+
+  sg4::Actor::create("bob", e.host_by_name("bob"), bob);
+  sg4::Actor::create("alice", e.host_by_name("alice"), alice);
+
+  e.run();
+
+  return 0;
+}
--- /dev/null
+#!/usr/bin/env tesh
+
+$ ${bindir:=.}/s4u-activityset-waitall ${platfdir}/hosts_with_disks.xml "--log=root.fmt:[%7.6r]%e[%5a]%e%m%n"
+> [0.000000] [alice] Send 'Message'
+> [0.000000] [ bob] Create my asynchronous activities
+> [0.000000] [ bob] Wait for asynchrounous activities to complete, all in one shot.
+> [5.197828] [ bob] All activities are completed.
auto comm = mbox->get_async(&payload);
auto io = disk->read_async(3e8);
- std::vector<sg4::ActivityPtr> pending_activities = {boost::dynamic_pointer_cast<sg4::Activity>(exec),
- boost::dynamic_pointer_cast<sg4::Activity>(comm),
- boost::dynamic_pointer_cast<sg4::Activity>(io)};
+ sg4::ActivitySet pending_activities({exec, comm, io});
XBT_INFO("Wait for asynchrounous activities to complete");
while (not pending_activities.empty()) {
- ssize_t changed_pos = sg4::Activity::wait_any(pending_activities);
- if (changed_pos != -1) {
- auto* completed_one = pending_activities[changed_pos].get();
- if (dynamic_cast<sg4::Comm*>(completed_one))
+ try {
+ pending_activities.wait_all_for(1);
+ } catch (simgrid::TimeoutException& e) {
+ XBT_INFO("Not all activities are terminated yet.");
+ }
+ while (auto completed_one = pending_activities.test_any()) {
+ if (boost::dynamic_pointer_cast<sg4::Comm>(completed_one))
XBT_INFO("Completed a Comm");
- if (dynamic_cast<sg4::Exec*>(completed_one))
+ if (boost::dynamic_pointer_cast<sg4::Exec>(completed_one))
XBT_INFO("Completed an Exec");
- if (dynamic_cast<sg4::Io*>(completed_one))
+ if (boost::dynamic_pointer_cast<sg4::Io>(completed_one))
XBT_INFO("Completed an I/O");
- pending_activities.erase(pending_activities.begin() + changed_pos);
}
}
XBT_INFO("Last activity is complete");
--- /dev/null
+#!/usr/bin/env tesh
+
+$ ${bindir:=.}/s4u-activityset-waitallfor ${platfdir}/hosts_with_disks.xml "--log=root.fmt:[%7.6r]%e[%5a]%e%m%n"
+> [0.000000] [alice] Send 'Message'
+> [0.000000] [ bob] Create my asynchronous activities
+> [0.000000] [ bob] Wait for asynchrounous activities to complete
+> [1.000000] [ bob] Not all activities are terminated yet.
+> [2.000000] [ bob] Not all activities are terminated yet.
+> [3.000000] [ bob] Not all activities are terminated yet.
+> [3.000000] [ bob] Completed an I/O
+> [4.000000] [ bob] Not all activities are terminated yet.
+> [5.000000] [ bob] Not all activities are terminated yet.
+> [5.000000] [ bob] Completed an Exec
+> [5.197828] [ bob] Completed a Comm
+> [5.197828] [ bob] Last activity is complete
--- /dev/null
+/* Copyright (c) 2010-2023. The SimGrid Team. All rights reserved. */
+
+/* This program is free software; you can redistribute it and/or modify it
+ * under the terms of the license (GNU LGPL) which comes with this package. */
+
+#include "simgrid/s4u.hpp"
+#include <cstdlib>
+#include <iostream>
+#include <string>
+namespace sg4 = simgrid::s4u;
+
+XBT_LOG_NEW_DEFAULT_CATEGORY(s4u_activityset_waitany, "Messages specific for this s4u example");
+
+/* Bob starts one activity of each kind (an execution, a message reception and a disk read), gathers them in an
+ * ActivitySet, and then repeatedly calls wait_any() to report each activity it returns, until the set is empty. */
+static void bob()
+{
+  sg4::Mailbox* mbox    = sg4::Mailbox::by_name("mbox");
+  const sg4::Disk* disk = sg4::Host::current()->get_disks().front();
+  std::string* payload  = nullptr; // set by the asynchronous get; null-initialized so that the delete below is safe
+
+  XBT_INFO("Create my asynchronous activities");
+  auto exec = sg4::this_actor::exec_async(5e9);
+  auto comm = mbox->get_async(&payload);
+  auto io   = disk->read_async(3e8);
+
+  // The typed smart pointers convert implicitly to ActivityPtr, so no explicit cast is needed to build the set
+  sg4::ActivitySet pending_activities({exec, comm, io});
+
+  XBT_INFO("Wait for asynchrounous activities to complete");
+  while (not pending_activities.empty()) {
+    auto completed_one = pending_activities.wait_any();
+    if (completed_one != nullptr) {
+      // Find out the concrete kind of the returned activity to report it
+      if (boost::dynamic_pointer_cast<sg4::Comm>(completed_one))
+        XBT_INFO("Completed a Comm");
+      if (boost::dynamic_pointer_cast<sg4::Exec>(completed_one))
+        XBT_INFO("Completed an Exec");
+      if (boost::dynamic_pointer_cast<sg4::Io>(completed_one))
+        XBT_INFO("Completed an I/O");
+    }
+  }
+  XBT_INFO("Last activity is complete");
+  delete payload; // the message was allocated by alice; the receiver is in charge of freeing it
+}
+
+/* Alice allocates a message and sends it on the "mbox" mailbox with a blocking put.
+ * The receiving side is the one that deletes the message. */
+static void alice()
+{
+  auto* message = new std::string("Message");
+  XBT_INFO("Send '%s'", message->c_str());
+
+  sg4::Mailbox* destination = sg4::Mailbox::by_name("mbox");
+  destination->put(message, 6e8);
+}
+
+/* Entry point: load the platform file given as first argument, start both actors and run the simulation. */
+int main(int argc, char* argv[])
+{
+  sg4::Engine e(&argc, argv);
+  // Checked after the Engine creation, which is handed argc/argv first: fail with a usage message instead of
+  // giving a null argv[1] to load_platform()
+  xbt_assert(argc > 1, "Usage: %s platform_file\n", argv[0]);
+
+  e.load_platform(argv[1]);
+
+  sg4::Actor::create("bob", e.host_by_name("bob"), bob);
+  sg4::Actor::create("alice", e.host_by_name("alice"), alice);
+
+  e.run();
+
+  return 0;
+}
#!/usr/bin/env tesh
-$ ${bindir:=.}/s4u-activity-waitany ${platfdir}/hosts_with_disks.xml "--log=root.fmt:[%7.6r]%e[%5a]%e%m%n"
+$ ${bindir:=.}/s4u-activityset-waitany ${platfdir}/hosts_with_disks.xml "--log=root.fmt:[%7.6r]%e[%5a]%e%m%n"
> [0.000000] [alice] Send 'Message'
> [0.000000] [ bob] Create my asynchronous activities
> [0.000000] [ bob] Wait for asynchrounous activities to complete
+++ /dev/null
-/* Copyright (c) 2010-2023. The SimGrid Team. All rights reserved. */
-
-/* This program is free software; you can redistribute it and/or modify it
- * under the terms of the license (GNU LGPL) which comes with this package. */
-
-#include "simgrid/s4u.hpp"
-#include <cstdlib>
-#include <iostream>
-#include <string>
-namespace sg4 = simgrid::s4u;
-
-XBT_LOG_NEW_DEFAULT_CATEGORY(s4u_comm_testany, "Messages specific for this s4u example");
-
-static void rank0()
-{
- sg4::Mailbox* mbox = sg4::Mailbox::by_name("rank0");
- std::string* msg1;
- std::string* msg2;
- std::string* msg3;
-
- XBT_INFO("Post my asynchronous receives");
- auto comm1 = mbox->get_async(&msg1);
- auto comm2 = mbox->get_async(&msg2);
- auto comm3 = mbox->get_async(&msg3);
- std::vector<sg4::CommPtr> pending_comms = {comm1, comm2, comm3};
-
- XBT_INFO("Send some data to rank-1");
- for (int i = 0; i < 3; i++)
- sg4::Mailbox::by_name("rank1")->put(new int(i), 1);
-
- XBT_INFO("Test for completed comms");
- while (not pending_comms.empty()) {
- ssize_t flag = sg4::Comm::test_any(pending_comms);
- if (flag != -1) {
- pending_comms.erase(pending_comms.begin() + flag);
- XBT_INFO("Remove a pending comm.");
- } else // nothing matches, wait for a little bit
- sg4::this_actor::sleep_for(0.1);
- }
- XBT_INFO("Last comm is complete");
- delete msg1;
- delete msg2;
- delete msg3;
-}
-
-static void rank1()
-{
- sg4::Mailbox* rank0_mbox = sg4::Mailbox::by_name("rank0");
- sg4::Mailbox* rank1_mbox = sg4::Mailbox::by_name("rank1");
-
- for (int i = 0; i < 3; i++) {
- auto res = rank1_mbox->get_unique<int>();
- XBT_INFO("Received %d", *res);
- std::string msg_content = "Message " + std::to_string(i);
- auto* payload = new std::string(msg_content);
- XBT_INFO("Send '%s'", msg_content.c_str());
- rank0_mbox->put(payload, 1e6);
- }
-}
-
-int main(int argc, char* argv[])
-{
- sg4::Engine e(&argc, argv);
-
- e.load_platform(argv[1]);
-
- sg4::Actor::create("rank-0", e.host_by_name("Tremblay"), rank0);
- sg4::Actor::create("rank-1", e.host_by_name("Fafard"), rank1);
-
- e.run();
-
- return 0;
-}
+++ /dev/null
-#!/usr/bin/env tesh
-
-$ ${bindir:=.}/s4u-comm-testany ${platfdir}/small_platform.xml "--log=root.fmt:[%10.6r]%e[%8h]%e[%a]%e%m%n"
-> [ 0.000000] [Tremblay] [rank-0] Post my asynchronous receives
-> [ 0.000000] [Tremblay] [rank-0] Send some data to rank-1
-> [ 0.025708] [ Fafard] [rank-1] Received 0
-> [ 0.025708] [ Fafard] [rank-1] Send 'Message 0'
-> [ 0.209813] [ Fafard] [rank-1] Received 1
-> [ 0.209813] [ Fafard] [rank-1] Send 'Message 1'
-> [ 0.393918] [Tremblay] [rank-0] Test for completed comms
-> [ 0.393918] [ Fafard] [rank-1] Received 2
-> [ 0.393918] [ Fafard] [rank-1] Send 'Message 2'
-> [ 0.393918] [Tremblay] [rank-0] Remove a pending comm.
-> [ 0.393918] [Tremblay] [rank-0] Remove a pending comm.
-> [ 0.593918] [Tremblay] [rank-0] Remove a pending comm.
-> [ 0.593918] [Tremblay] [rank-0] Last comm is complete
+++ /dev/null
-/* Copyright (c) 2010-2023. The SimGrid Team. All rights reserved. */
-
-/* This program is free software; you can redistribute it and/or modify it
- * under the terms of the license (GNU LGPL) which comes with this package. */
-
-/* This example shows how to block on the completion of a set of communications.
- *
- * As for the other asynchronous examples, the sender initiate all the messages it wants to send and
- * pack the resulting simgrid::s4u::CommPtr objects in a vector. All messages thus occur concurrently.
- *
- * The sender then blocks until all ongoing communication terminate, using simgrid::s4u::Comm::wait_all()
- *
- */
-
-#include "simgrid/s4u.hpp"
-#include <cstdlib>
-#include <iostream>
-#include <string>
-namespace sg4 = simgrid::s4u;
-
-XBT_LOG_NEW_DEFAULT_CATEGORY(s4u_async_waitall, "Messages specific for this s4u example");
-
-static void sender(unsigned int messages_count, unsigned int receivers_count, long msg_size)
-{
- if (messages_count == 0 || receivers_count == 0) {
- XBT_WARN("Sender has nothing to do. Bail out!");
- return;
- }
- // sphinx-doc: init-begin (this line helps the doc to build; ignore it)
- /* Vector in which we store all ongoing communications */
- std::vector<sg4::CommPtr> pending_comms;
-
- /* Make a vector of the mailboxes to use */
- std::vector<sg4::Mailbox*> mboxes;
- for (unsigned int i = 0; i < receivers_count; i++)
- mboxes.push_back(sg4::Mailbox::by_name("receiver-" + std::to_string(i)));
- // sphinx-doc: init-end
-
- /* Start dispatching all messages to receivers, in a round robin fashion */
- for (unsigned int i = 0; i < messages_count; i++) {
- std::string msg_content = "Message " + std::to_string(i);
- // Copy the data we send: the 'msg_content' variable is not a stable storage location.
- // It will be destroyed when this actor leaves the loop, ie before the receiver gets it
- auto* payload = new std::string(msg_content);
-
- XBT_INFO("Send '%s' to '%s'", msg_content.c_str(), mboxes[i % receivers_count]->get_cname());
-
- /* Create a communication representing the ongoing communication, and store it in pending_comms */
- sg4::CommPtr comm = mboxes[i % receivers_count]->put_async(payload, msg_size);
- pending_comms.push_back(comm);
- }
-
- /* Start sending messages to let the workers know that they should stop */ // sphinx-doc: put-begin
- for (unsigned int i = 0; i < receivers_count; i++) {
- XBT_INFO("Send 'finalize' to 'receiver-%u'", i);
- sg4::CommPtr comm = mboxes[i]->put_async(new std::string("finalize"), 0);
- pending_comms.push_back(comm);
- }
- XBT_INFO("Done dispatching all messages");
-
- /* Now that all message exchanges were initiated, wait for their completion in one single call */
- sg4::Comm::wait_all(pending_comms);
- // sphinx-doc: put-end
-
- XBT_INFO("Goodbye now!");
-}
-
-/* Receiver actor expects 1 argument: its ID */
-static void receiver(int id)
-{
- sg4::Mailbox* mbox = sg4::Mailbox::by_name("receiver-" + std::to_string(id));
- XBT_INFO("Wait for my first message");
- for (bool cont = true; cont;) {
- auto received = mbox->get_unique<std::string>();
- XBT_INFO("I got a '%s'.", received->c_str());
- cont = (*received != "finalize"); // If it's a finalize message, we're done
- // Receiving the message was all we were supposed to do
- }
-}
-
-int main(int argc, char* argv[])
-{
- sg4::Engine e(&argc, argv);
-
- e.load_platform(argv[1]);
-
- sg4::Actor::create("sender", e.host_by_name("Tremblay"), sender, 5, 2, 1e6);
- sg4::Actor::create("receiver", e.host_by_name("Ruby"), receiver, 0);
- sg4::Actor::create("receiver", e.host_by_name("Perl"), receiver, 1);
-
- e.run();
-
- return 0;
-}
+++ /dev/null
-#!/usr/bin/env tesh
-
-$ ${bindir:=.}/s4u-comm-waitall ${platfdir}/small_platform_fatpipe.xml "--log=root.fmt:[%10.6r]%e(%i:%a@%h)%e%m%n"
-> [ 0.000000] (2:receiver@Ruby) Wait for my first message
-> [ 0.000000] (3:receiver@Perl) Wait for my first message
-> [ 0.000000] (1:sender@Tremblay) Send 'Message 0' to 'receiver-0'
-> [ 0.000000] (1:sender@Tremblay) Send 'Message 1' to 'receiver-1'
-> [ 0.000000] (1:sender@Tremblay) Send 'Message 2' to 'receiver-0'
-> [ 0.000000] (1:sender@Tremblay) Send 'Message 3' to 'receiver-1'
-> [ 0.000000] (1:sender@Tremblay) Send 'Message 4' to 'receiver-0'
-> [ 0.000000] (1:sender@Tremblay) Send 'finalize' to 'receiver-0'
-> [ 0.000000] (1:sender@Tremblay) Send 'finalize' to 'receiver-1'
-> [ 0.000000] (1:sender@Tremblay) Done dispatching all messages
-> [ 0.004022] (2:receiver@Ruby) I got a 'Message 0'.
-> [ 0.004022] (3:receiver@Perl) I got a 'Message 1'.
-> [ 0.008043] (2:receiver@Ruby) I got a 'Message 2'.
-> [ 0.008043] (3:receiver@Perl) I got a 'Message 3'.
-> [ 0.009995] (3:receiver@Perl) I got a 'finalize'.
-> [ 0.012065] (2:receiver@Ruby) I got a 'Message 4'.
-> [ 0.014016] (2:receiver@Ruby) I got a 'finalize'.
-> [ 0.014016] (1:sender@Tremblay) Goodbye now!
+++ /dev/null
-/* Copyright (c) 2010-2023. The SimGrid Team. All rights reserved. */
-
-/* This program is free software; you can redistribute it and/or modify it
- * under the terms of the license (GNU LGPL) which comes with this package. */
-
-/* This example shows how to use simgrid::s4u::this_actor::wait_any() to wait for the first occurring event.
- *
- * As for the other asynchronous examples, the sender initiate all the messages it wants to send and
- * pack the resulting simgrid::s4u::CommPtr objects in a vector. All messages thus occur concurrently.
- *
- * The sender then loops until there is no ongoing communication. Using wait_any() ensures that the sender
- * will notice events as soon as they occur even if it does not follow the order of the container.
- *
- * Here, finalize messages will terminate earlier because their size is 0, so they travel faster than the
- * other messages of this application. As expected, the trace shows that the finalize of worker 1 is
- * processed before 'Message 5' that is sent to worker 0.
- *
- */
-
-#include "simgrid/s4u.hpp"
-#include <cstdlib>
-#include <iostream>
-#include <string>
-namespace sg4 = simgrid::s4u;
-
-XBT_LOG_NEW_DEFAULT_CATEGORY(s4u_comm_waitall, "Messages specific for this s4u example");
-
-static void sender(unsigned int messages_count, unsigned int receivers_count, long msg_size)
-{
- if (messages_count == 0 || receivers_count == 0) {
- XBT_WARN("Sender has nothing to do. Bail out!");
- return;
- }
- /* Vector in which we store all ongoing communications */
- std::vector<sg4::CommPtr> pending_comms;
-
- /* Make a vector of the mailboxes to use */
- std::vector<sg4::Mailbox*> mboxes;
- for (unsigned int i = 0; i < receivers_count; i++)
- mboxes.push_back(sg4::Mailbox::by_name("receiver-" + std::to_string(i)));
-
- /* Start dispatching all messages to receivers, in a round robin fashion */
- for (unsigned int i = 0; i < messages_count; i++) {
- std::string msg_content = "Message " + std::to_string(i);
- // Copy the data we send: the 'msg_content' variable is not a stable storage location.
- // It will be destroyed when this actor leaves the loop, ie before the receiver gets it
- auto* payload = new std::string(msg_content);
-
- XBT_INFO("Send '%s' to '%s'", msg_content.c_str(), mboxes[i % receivers_count]->get_cname());
-
- /* Create a communication representing the ongoing communication, and store it in pending_comms */
- sg4::CommPtr comm = mboxes[i % receivers_count]->put_async(payload, msg_size);
- pending_comms.push_back(comm);
- }
-
- /* Start sending messages to let the workers know that they should stop */
- for (unsigned int i = 0; i < receivers_count; i++) {
- XBT_INFO("Send 'finalize' to 'receiver-%u'", i);
- sg4::CommPtr comm = mboxes[i]->put_async(new std::string("finalize"), 0);
- pending_comms.push_back(comm);
- }
- XBT_INFO("Done dispatching all messages");
-
- /* Now that all message exchanges were initiated, wait for their completion, in order of termination.
- *
- * This loop waits for first terminating message with wait_any() and remove it with erase(), until all comms are
- * terminated
- * Even in this simple example, the pending comms do not terminate in the exact same order of creation.
- */
- while (not pending_comms.empty()) {
- ssize_t changed_pos = sg4::Comm::wait_any(pending_comms);
- pending_comms.erase(pending_comms.begin() + changed_pos);
- if (changed_pos != 0)
- XBT_INFO("Remove the %zdth pending comm: it terminated earlier than another comm that was initiated first.",
- changed_pos);
- }
-
- XBT_INFO("Goodbye now!");
-}
-
-/* Receiver actor expects 1 argument: its ID */
-static void receiver(int id)
-{
- sg4::Mailbox* mbox = sg4::Mailbox::by_name("receiver-" + std::to_string(id));
- XBT_INFO("Wait for my first message");
- for (bool cont = true; cont;) {
- auto received = mbox->get_unique<std::string>();
- XBT_INFO("I got a '%s'.", received->c_str());
- cont = (*received != "finalize"); // If it's a finalize message, we're done
- // Receiving the message was all we were supposed to do
- }
-}
-
-int main(int argc, char* argv[])
-{
- sg4::Engine e(&argc, argv);
-
- e.load_platform(argv[1]);
-
- sg4::Actor::create("sender", e.host_by_name("Tremblay"), sender, 6, 2, 1e6);
- sg4::Actor::create("receiver", e.host_by_name("Fafard"), receiver, 0);
- sg4::Actor::create("receiver", e.host_by_name("Jupiter"), receiver, 1);
-
- e.run();
-
- return 0;
-}
+++ /dev/null
-#!/usr/bin/env tesh
-
-p Testing this_actor->wait_any()
-
-! output sort 19
-$ ${bindir:=.}/s4u-comm-waitany ${platfdir}/small_platform.xml "--log=root.fmt:[%10.6r]%e(%i:%a@%h)%e%m%n"
-> [ 0.000000] (1:sender@Tremblay) Send 'Message 0' to 'receiver-0'
-> [ 0.000000] (2:receiver@Fafard) Wait for my first message
-> [ 0.000000] (3:receiver@Jupiter) Wait for my first message
-> [ 0.000000] (1:sender@Tremblay) Send 'Message 1' to 'receiver-1'
-> [ 0.000000] (1:sender@Tremblay) Send 'Message 2' to 'receiver-0'
-> [ 0.000000] (1:sender@Tremblay) Send 'Message 3' to 'receiver-1'
-> [ 0.000000] (1:sender@Tremblay) Send 'Message 4' to 'receiver-0'
-> [ 0.000000] (1:sender@Tremblay) Send 'Message 5' to 'receiver-1'
-> [ 0.000000] (1:sender@Tremblay) Send 'finalize' to 'receiver-0'
-> [ 0.000000] (1:sender@Tremblay) Send 'finalize' to 'receiver-1'
-> [ 0.000000] (1:sender@Tremblay) Done dispatching all messages
-> [ 0.158397] (2:receiver@Fafard) I got a 'Message 0'.
-> [ 0.169155] (3:receiver@Jupiter) I got a 'Message 1'.
-> [ 0.316794] (2:receiver@Fafard) I got a 'Message 2'.
-> [ 0.338309] (3:receiver@Jupiter) I got a 'Message 3'.
-> [ 0.475190] (2:receiver@Fafard) I got a 'Message 4'.
-> [ 0.500898] (2:receiver@Fafard) I got a 'finalize'.
-> [ 0.500898] (1:sender@Tremblay) Remove the 1th pending comm: it terminated earlier than another comm that was initiated first.
-> [ 0.507464] (3:receiver@Jupiter) I got a 'Message 5'.
-> [ 0.526478] (3:receiver@Jupiter) I got a 'finalize'.
-> [ 0.526478] (1:sender@Tremblay) Goodbye now!
+++ /dev/null
-/* Copyright (c) 2019-2023. The SimGrid Team. All rights reserved. */
-
-/* This program is free software; you can redistribute it and/or modify it
- * under the terms of the license (GNU LGPL) which comes with this package. */
-
-#include "simgrid/s4u.hpp"
-#include <cstdlib>
-#include <iostream>
-#include <string>
-
-XBT_LOG_NEW_DEFAULT_CATEGORY(s4u_exec_waitany, "Messages specific for this s4u example");
-namespace sg4 = simgrid::s4u;
-
-static void worker(bool with_timeout)
-{
- /* Vector in which we store all pending executions*/
- std::vector<sg4::ExecPtr> pending_executions;
-
- for (int i = 0; i < 3; i++) {
- std::string name = "Exec-" + std::to_string(i);
- double amount = (6 * (i % 2) + i + 1) * sg4::this_actor::get_host()->get_speed();
-
- sg4::ExecPtr exec = sg4::this_actor::exec_init(amount)->set_name(name);
- pending_executions.push_back(exec);
- exec->start();
-
- XBT_INFO("Activity %s has started for %.0f seconds", name.c_str(),
- amount / sg4::this_actor::get_host()->get_speed());
- }
-
- /* Now that executions were initiated, wait for their completion, in order of termination.
- *
- * This loop waits for first terminating execution with wait_any() and remove it with erase(), until all execs are
- * terminated.
- */
- while (not pending_executions.empty()) {
- ssize_t pos;
- if (with_timeout)
- pos = sg4::Exec::wait_any_for(pending_executions, 4);
- else
- pos = sg4::Exec::wait_any(pending_executions);
-
- if (pos < 0) {
- XBT_INFO("Do not wait any longer for an activity");
- pending_executions.clear();
- } else {
- XBT_INFO("Activity '%s' (at position %zd) is complete", pending_executions[pos]->get_cname(), pos);
- pending_executions.erase(pending_executions.begin() + pos);
- }
- XBT_INFO("%zu activities remain pending", pending_executions.size());
- }
-}
-
-int main(int argc, char* argv[])
-{
- sg4::Engine e(&argc, argv);
- e.load_platform(argv[1]);
- sg4::Actor::create("worker", e.host_by_name("Tremblay"), worker, false);
- sg4::Actor::create("worker_timeout", e.host_by_name("Tremblay"), worker, true);
- e.run();
-
- return 0;
-}
+++ /dev/null
-#!/usr/bin/env tesh
-
-! output sort 19
-$ ${bindir:=.}/s4u-exec-waitany ${platfdir}/multicore_machine.xml "--log=root.fmt:[%10.6r]%e[%14P]%e%m%n"
-> [ 0.000000] [ worker] Activity Exec-0 has started for 1 seconds
-> [ 0.000000] [worker_timeout] Activity Exec-0 has started for 1 seconds
-> [ 0.000000] [ worker] Activity Exec-1 has started for 8 seconds
-> [ 0.000000] [worker_timeout] Activity Exec-1 has started for 8 seconds
-> [ 0.000000] [ worker] Activity Exec-2 has started for 3 seconds
-> [ 0.000000] [worker_timeout] Activity Exec-2 has started for 3 seconds
-> [ 1.000000] [worker_timeout] Activity 'Exec-0' (at position 0) is complete
-> [ 1.000000] [worker_timeout] 2 activities remain pending
-> [ 1.000000] [ worker] Activity 'Exec-0' (at position 0) is complete
-> [ 1.000000] [ worker] 2 activities remain pending
-> [ 3.000000] [worker_timeout] Activity 'Exec-2' (at position 1) is complete
-> [ 3.000000] [worker_timeout] 1 activities remain pending
-> [ 3.000000] [ worker] Activity 'Exec-2' (at position 1) is complete
-> [ 3.000000] [ worker] 1 activities remain pending
-> [ 7.000000] [worker_timeout] Do not wait any longer for an activity
-> [ 7.000000] [worker_timeout] 0 activities remain pending
-> [ 8.000000] [ worker] Activity 'Exec-1' (at position 0) is complete
-> [ 8.000000] [ worker] 0 activities remain pending
class Activity;
/** Smart pointer to a simgrid::s4u::Activity */
using ActivityPtr = boost::intrusive_ptr<Activity>;
-XBT_PUBLIC void intrusive_ptr_release(const Activity* actor);
-XBT_PUBLIC void intrusive_ptr_add_ref(const Activity* actor);
+XBT_PUBLIC void intrusive_ptr_release(const Activity* act);
+XBT_PUBLIC void intrusive_ptr_add_ref(const Activity* act);
+
+class ActivitySet;
+/** Smart pointer to a simgrid::s4u::ActivitySet */
+using ActivitySetPtr = boost::intrusive_ptr<ActivitySet>;
+XBT_PUBLIC void intrusive_ptr_release(const ActivitySet* as);
+XBT_PUBLIC void intrusive_ptr_add_ref(const ActivitySet* as);
class Actor;
/** Smart pointer to a simgrid::s4u::Actor */
class Barrier;
/** Smart pointer to a simgrid::s4u::Barrier */
using BarrierPtr = boost::intrusive_ptr<Barrier>;
-XBT_PUBLIC void intrusive_ptr_release(Barrier* m);
-XBT_PUBLIC void intrusive_ptr_add_ref(Barrier* m);
+XBT_PUBLIC void intrusive_ptr_release(Barrier* b);
+XBT_PUBLIC void intrusive_ptr_add_ref(Barrier* b);
class Comm;
/** Smart pointer to a simgrid::s4u::Comm */
#include <simgrid/s4u/Mutex.hpp>
#include <simgrid/s4u/NetZone.hpp>
#include <simgrid/s4u/Semaphore.hpp>
-#include <simgrid/s4u/Task.hpp>
#include <simgrid/s4u/VirtualMachine.hpp>
+#include <simgrid/s4u/ActivitySet.hpp>
+#include <simgrid/s4u/Task.hpp>
+
#include <simgrid/Exception.hpp>
#endif /* SIMGRID_S4U_S4U_H */
*/
class XBT_PUBLIC Activity : public xbt::Extendable<Activity> {
#ifndef DOXYGEN
+ friend ActivitySet;
friend Comm;
friend Exec;
friend Io;
--- /dev/null
+/* Copyright (c) 2006-2023. The SimGrid Team. All rights reserved. */
+
+/* This program is free software; you can redistribute it and/or modify it
+ * under the terms of the license (GNU LGPL) which comes with this package. */
+
+#ifndef SIMGRID_S4U_ACTIVITYSET_HPP
+#define SIMGRID_S4U_ACTIVITYSET_HPP
+
+#include <simgrid/forward.h>
+#include <simgrid/s4u/Activity.hpp>
+
+#include <vector>
+
+namespace simgrid::s4u {
+/** @brief ActivitySet
+ *
+ * This class is a container of activities, allowing to wait for the completion of any or all activities in the set.
+ * This is somehow similar to the select(2) system call under UNIX, allowing you to wait for the next event about these
+ * activities.
+ */
+class XBT_PUBLIC ActivitySet : public xbt::Extendable<ActivitySet> {
+  // We use vectors instead of sets to improve reproducibility across architectures
+  std::vector<ActivityPtr> activities_;
+  std::vector<ActivityPtr> failed_activities_;
+
+public:
+  ActivitySet() = default;
+  /** Build a set that initially contains a copy of the provided activities, in the same order */
+  ActivitySet(const std::vector<ActivityPtr>& init) : activities_(init) {}
+  ~ActivitySet() = default;
+
+  /** Add an activity to the set */
+  void push(ActivityPtr a) { activities_.push_back(a); }
+  /** Remove that activity from the set (no-op if the activity is not in the set) */
+  void erase(ActivityPtr a);
+
+  /** Get the amount of activities in the set. Failed activities (if any) are not counted */
+  int size() const { return static_cast<int>(activities_.size()); }
+  /** Return whether the set is empty. Failed activities (if any) are not counted */
+  bool empty() const { return activities_.empty(); }
+
+  /** Wait for the completion of all activities in the set, but not longer than the provided timeout
+   *
+   * A negative timeout means that there is no time limit.
+   *
+   * On timeout, an exception is raised.
+   *
+   * In any case, the completed activities remain in the set. Use test_any() to retrieve them.
+   */
+  void wait_all_for(double timeout);
+  /** Wait for the completion of all activities in the set. The set is NOT emptied afterward. */
+  void wait_all() { wait_all_for(-1); }
+  /** Returns the first terminated activity if any, or ActivityPtr(nullptr) if no activity is terminated.
+   *
+   * The returned activity (if any) is removed from the set.
+   */
+  ActivityPtr test_any();
+
+  /** Wait for the completion of one activity from the set, but not longer than the provided timeout.
+   *
+   * See wait_any() for details.
+   *
+   * @return the first terminated activity, which is automatically removed from the set.
+   */
+  ActivityPtr wait_any_for(double timeout);
+  /** Wait for the completion of one activity from the set.
+   *
+   * If an activity fails during that time, an exception is raised, and the failed activity is marked as failed in the
+   * set. Use get_failed_activity() to retrieve it.
+   *
+   * If more than one activity failed, the other ones are also removed from the set. Use get_failed_activity() several
+   * times to retrieve them all.
+   *
+   * @return the first terminated activity, which is automatically removed from the set. If more than one activity
+   * terminated at the same timestamp, then the other ones are still in the set. Use either test_any() or wait_any() to
+   * retrieve the other ones.
+   */
+  ActivityPtr wait_any() { return wait_any_for(-1); }
+
+  /** Retrieve one of the activities marked as failed and forget about it, or ActivityPtr(nullptr) if there is none */
+  ActivityPtr get_failed_activity();
+  /** Return whether some failed activities can still be retrieved with get_failed_activity() */
+  bool has_failed_activities() const { return not failed_activities_.empty(); }
+};
+
+}; // namespace simgrid::s4u
+
+#endif
--- /dev/null
+/* Copyright (c) 2023-. The SimGrid Team. All rights reserved. */
+
+/* This program is free software; you can redistribute it and/or modify it
+ * under the terms of the license (GNU LGPL) which comes with this package. */
+
+#include "src/kernel/activity/ActivityImpl.hpp"
+#include "src/kernel/actor/ActorImpl.hpp"
+#include "src/kernel/actor/CommObserver.hpp"
+#include <simgrid/s4u/ActivitySet.hpp>
+#include <simgrid/s4u/Engine.hpp>
+
+XBT_LOG_NEW_DEFAULT_SUBCATEGORY(s4u_activityset, s4u_activity, "S4U set of activities");
+
+namespace simgrid::s4u {
+
+/* Remove the first occurrence of the given activity from the set; do nothing if it is not in there */
+void ActivitySet::erase(ActivityPtr a)
+{
+  auto pos = activities_.begin();
+  // Linear scan up to the first matching element (or the end of the vector)
+  while (pos != activities_.end() && !(*pos == a))
+    ++pos;
+
+  if (pos != activities_.end())
+    activities_.erase(pos);
+}
+
+/* Wait for every activity of the set in turn. A negative timeout means that no time limit applies;
+ * otherwise all the waits share a single deadline computed once from the current simulated clock. */
+void ActivitySet::wait_all_for(double timeout)
+{
+  if (timeout < 0.0) {
+    // Unbounded case: simply block on each activity
+    for (const auto& activity : activities_)
+      activity->wait();
+    return;
+  }
+
+  // Bounded case: the deadline is absolute, so the time spent on earlier activities is not granted again to later ones
+  const double deadline = Engine::get_clock() + timeout;
+  for (const auto& activity : activities_)
+    activity->wait_until(deadline);
+}
+
+/* Non-blocking check: hand the kernel-side implementations of all activities of the set over to the kernel through an
+ * answered simcall. If the kernel reports a position, the matching activity is removed from the set, marked as
+ * FINISHED and returned. Otherwise (position -1), a null pointer is returned. */
+ActivityPtr ActivitySet::test_any()
+{
+  // Collect the kernel-level counterparts, in the same order as in activities_
+  std::vector<kernel::activity::ActivityImpl*> impls;
+  impls.reserve(activities_.size());
+  for (const ActivityPtr& activity : activities_)
+    impls.push_back(activity->pimpl_.get());
+
+  kernel::actor::ActorImpl* self = kernel::actor::ActorImpl::self();
+  kernel::actor::ActivityTestanySimcall observer{self, impls, "test_any"};
+  ssize_t finished_idx = kernel::actor::simcall_answered(
+      [&observer] {
+        return kernel::activity::ActivityImpl::test_any(observer.get_issuer(), observer.get_activities());
+      },
+      &observer);
+
+  if (finished_idx != -1) {
+    // The reported position indexes activities_, since impls was built in the same order
+    auto done = activities_.at(finished_idx);
+    erase(done);
+    done->complete(Activity::State::FINISHED);
+    return done;
+  }
+  return ActivityPtr(nullptr);
+}
+
+/* Blocking wait: hand the kernel-side implementations of all activities of the set over to the kernel through a
+ * blocking simcall, along with the timeout. The activity at the reported position is removed from the set, marked as
+ * FINISHED and returned. */
+ActivityPtr ActivitySet::wait_any_for(double timeout)
+{
+  // Collect the kernel-level counterparts, in the same order as in activities_
+  std::vector<kernel::activity::ActivityImpl*> impls;
+  impls.reserve(activities_.size());
+  for (const ActivityPtr& activity : activities_)
+    impls.push_back(activity->pimpl_.get());
+
+  kernel::actor::ActorImpl* self = kernel::actor::ActorImpl::self();
+  kernel::actor::ActivityWaitanySimcall observer{self, impls, timeout, "wait_any_for"};
+  ssize_t finished_idx = kernel::actor::simcall_blocking(
+      [&observer] {
+        kernel::activity::ActivityImpl::wait_any_for(observer.get_issuer(), observer.get_activities(),
+                                                     observer.get_timeout());
+      },
+      &observer);
+  xbt_assert(finished_idx != -1,
+             "ActivityImpl::wait_any_for is not supposed to return -1 but instead to raise exceptions");
+
+  // The reported position indexes activities_, since impls was built in the same order
+  auto done = activities_.at(finished_idx);
+  erase(done);
+  done->complete(Activity::State::FINISHED);
+  return done;
+}
+
+/* Pop the most recently recorded failed activity, or return a null pointer when none is recorded */
+ActivityPtr ActivitySet::get_failed_activity()
+{
+  ActivityPtr last_failed(nullptr);
+  if (not failed_activities_.empty()) {
+    last_failed = failed_activities_.back();
+    failed_activities_.pop_back();
+  }
+  return last_failed;
+}
+
+}; // namespace simgrid::s4u
\ No newline at end of file
set(S4U_SRC
src/s4u/s4u_Activity.cpp
+ src/s4u/s4u_ActivitySet.cpp
src/s4u/s4u_Actor.cpp
src/s4u/s4u_Barrier.cpp
src/s4u/s4u_Comm.cpp
include/simgrid/vm.h
include/simgrid/zone.h
include/simgrid/s4u/Activity.hpp
+ include/simgrid/s4u/ActivitySet.hpp
include/simgrid/s4u/Actor.hpp
include/simgrid/s4u/Barrier.hpp
include/simgrid/s4u/Comm.hpp