Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
Merge branch 'fix-comm-signal' into 'master'
authorMartin Quinson <martin.quinson@ens-rennes.fr>
Sat, 15 Jul 2023 09:12:17 +0000 (09:12 +0000)
committerMartin Quinson <martin.quinson@ens-rennes.fr>
Sat, 15 Jul 2023 09:12:17 +0000 (09:12 +0000)
update comm status BEFORE sending signals

See merge request simgrid/simgrid!167

29 files changed:
ChangeLog
MANIFEST.in
examples/cpp/CMakeLists.txt
examples/cpp/activityset-testany/s4u-activityset-testany.cpp [moved from examples/cpp/activity-testany/s4u-activity-testany.cpp with 68% similarity]
examples/cpp/activityset-testany/s4u-activityset-testany.tesh [moved from examples/cpp/activity-testany/s4u-activity-testany.tesh with 88% similarity]
examples/cpp/activityset-waitall/s4u-activityset-waitall.cpp [new file with mode: 0644]
examples/cpp/activityset-waitall/s4u-activityset-waitall.tesh [new file with mode: 0644]
examples/cpp/activityset-waitallfor/s4u-activityset-waitallfor.cpp [moved from examples/cpp/activity-waitany/s4u-activity-waitany.cpp with 67% similarity]
examples/cpp/activityset-waitallfor/s4u-activityset-waitallfor.tesh [new file with mode: 0644]
examples/cpp/activityset-waitany/s4u-activityset-waitany.cpp [new file with mode: 0644]
examples/cpp/activityset-waitany/s4u-activityset-waitany.tesh [moved from examples/cpp/activity-waitany/s4u-activity-waitany.tesh with 75% similarity]
examples/cpp/comm-testany/s4u-comm-testany.cpp [deleted file]
examples/cpp/comm-testany/s4u-comm-testany.tesh [deleted file]
examples/cpp/comm-waitall/s4u-comm-waitall.cpp [deleted file]
examples/cpp/comm-waitall/s4u-comm-waitall.tesh [deleted file]
examples/cpp/comm-waitany/s4u-comm-waitany.cpp [deleted file]
examples/cpp/comm-waitany/s4u-comm-waitany.tesh [deleted file]
examples/cpp/exec-waitany/s4u-exec-waitany.cpp [deleted file]
examples/cpp/exec-waitany/s4u-exec-waitany.tesh [deleted file]
examples/python/plugin-host-load/plugin-host-load.py
include/simgrid/forward.h
include/simgrid/s4u.hpp
include/simgrid/s4u/Activity.hpp
include/simgrid/s4u/ActivitySet.hpp [new file with mode: 0644]
src/kernel/actor/CommObserver.hpp
src/mc/api/strategy/UniformStrategy.hpp
src/s4u/s4u_ActivitySet.cpp [new file with mode: 0644]
tools/cmake/DefinePackages.cmake
tools/jenkins/build.sh

index 603816a..6e5f715 100644 (file)
--- a/ChangeLog
+++ b/ChangeLog
@@ -1,5 +1,8 @@
 SimGrid (3.34.1) not released (Target: fall 2023)
 
+S4U:
+ - New class ActivitySet to ease wait_any()/test_any()/wait_all()
+
 Python:
  - Make the host_load plugin available from Python. See examples/python/plugin-host-load
 
index f6767fe..3896bde 100644 (file)
@@ -120,10 +120,14 @@ include examples/c/plugin-host-load/plugin-host-load.c
 include examples/c/plugin-host-load/plugin-host-load.tesh
 include examples/c/synchro-semaphore/synchro-semaphore.c
 include examples/c/synchro-semaphore/synchro-semaphore.tesh
-include examples/cpp/activity-testany/s4u-activity-testany.cpp
-include examples/cpp/activity-testany/s4u-activity-testany.tesh
-include examples/cpp/activity-waitany/s4u-activity-waitany.cpp
-include examples/cpp/activity-waitany/s4u-activity-waitany.tesh
+include examples/cpp/activityset-testany/s4u-activityset-testany.cpp
+include examples/cpp/activityset-testany/s4u-activityset-testany.tesh
+include examples/cpp/activityset-waitall/s4u-activityset-waitall.cpp
+include examples/cpp/activityset-waitall/s4u-activityset-waitall.tesh
+include examples/cpp/activityset-waitallfor/s4u-activityset-waitallfor.cpp
+include examples/cpp/activityset-waitallfor/s4u-activityset-waitallfor.tesh
+include examples/cpp/activityset-waitany/s4u-activityset-waitany.cpp
+include examples/cpp/activityset-waitany/s4u-activityset-waitany.tesh
 include examples/cpp/actor-create/s4u-actor-create.cpp
 include examples/cpp/actor-create/s4u-actor-create.tesh
 include examples/cpp/actor-create/s4u-actor-create_d.xml
@@ -190,16 +194,10 @@ include examples/cpp/comm-ready/s4u-comm-ready.cpp
 include examples/cpp/comm-ready/s4u-comm-ready.tesh
 include examples/cpp/comm-suspend/s4u-comm-suspend.cpp
 include examples/cpp/comm-suspend/s4u-comm-suspend.tesh
-include examples/cpp/comm-testany/s4u-comm-testany.cpp
-include examples/cpp/comm-testany/s4u-comm-testany.tesh
 include examples/cpp/comm-throttling/s4u-comm-throttling.cpp
 include examples/cpp/comm-throttling/s4u-comm-throttling.tesh
 include examples/cpp/comm-wait/s4u-comm-wait.cpp
 include examples/cpp/comm-wait/s4u-comm-wait.tesh
-include examples/cpp/comm-waitall/s4u-comm-waitall.cpp
-include examples/cpp/comm-waitall/s4u-comm-waitall.tesh
-include examples/cpp/comm-waitany/s4u-comm-waitany.cpp
-include examples/cpp/comm-waitany/s4u-comm-waitany.tesh
 include examples/cpp/comm-waituntil/s4u-comm-waituntil.cpp
 include examples/cpp/comm-waituntil/s4u-comm-waituntil.tesh
 include examples/cpp/dag-comm/s4u-dag-comm.cpp
@@ -291,8 +289,6 @@ include examples/cpp/exec-threads/s4u-exec-threads.cpp
 include examples/cpp/exec-threads/s4u-exec-threads.tesh
 include examples/cpp/exec-unassigned/s4u-exec-unassigned.cpp
 include examples/cpp/exec-unassigned/s4u-exec-unassigned.tesh
-include examples/cpp/exec-waitany/s4u-exec-waitany.cpp
-include examples/cpp/exec-waitany/s4u-exec-waitany.tesh
 include examples/cpp/exec-waitfor/s4u-exec-waitfor.cpp
 include examples/cpp/exec-waitfor/s4u-exec-waitfor.tesh
 include examples/cpp/io-async/s4u-io-async.cpp
@@ -1951,6 +1947,7 @@ include include/simgrid/plugins/ns3.hpp
 include include/simgrid/plugins/photovoltaic.hpp
 include include/simgrid/s4u.hpp
 include include/simgrid/s4u/Activity.hpp
+include include/simgrid/s4u/ActivitySet.hpp
 include include/simgrid/s4u/Actor.hpp
 include include/simgrid/s4u/Barrier.hpp
 include include/simgrid/s4u/Comm.hpp
@@ -2328,6 +2325,7 @@ include src/plugins/vm/VmLiveMigration.cpp
 include src/plugins/vm/VmLiveMigration.hpp
 include src/plugins/vm/dirty_page_tracking.cpp
 include src/s4u/s4u_Activity.cpp
+include src/s4u/s4u_ActivitySet.cpp
 include src/s4u/s4u_Actor.cpp
 include src/s4u/s4u_Barrier.cpp
 include src/s4u/s4u_Comm.cpp
index 66187a8..e1e5390 100644 (file)
@@ -153,19 +153,19 @@ endif()
 
 # Deal with each example
 
-foreach (example activity-testany activity-waitany
+foreach (example activityset-testany activityset-waitany activityset-waitall activityset-waitallfor
                  actor-create actor-daemon actor-exiting actor-join actor-kill
                  actor-lifetime actor-migrate actor-suspend actor-yield actor-stacksize
                  app-bittorrent app-chainsend app-token-ring
                  battery-degradation battery-simple battery-energy
-                 comm-pingpong comm-ready comm-suspend comm-testany comm-wait comm-waitany comm-waitall comm-waituntil
+                 comm-pingpong comm-ready comm-suspend comm-wait comm-waituntil
                  comm-dependent comm-host2host comm-failure comm-throttling
                  cloud-capping cloud-migration cloud-simple
                  dag-comm dag-from-json-simple dag-from-dax-simple dag-from-dax dag-from-dot-simple dag-from-dot dag-failure dag-io dag-scheduling dag-simple dag-tuto
                  dht-chord dht-kademlia
                  energy-exec energy-boot energy-link energy-vm energy-exec-ptask energy-wifi
                  engine-filtering engine-run-partial
-                 exec-async exec-basic exec-dvfs exec-remote exec-waitany exec-waitfor exec-dependent exec-unassigned
+                 exec-async exec-basic exec-dvfs exec-remote exec-waitfor exec-dependent exec-unassigned
                  exec-ptask-multicore exec-ptask-multicore-latency exec-cpu-nonlinear exec-cpu-factors exec-failure exec-threads
                  maestro-set
                  mc-bugged1 mc-bugged1-liveness mc-bugged2 mc-bugged2-liveness mc-centralized-mutex mc-electric-fence mc-failing-assert
@@ -22,26 +22,25 @@ static void bob()
   auto comm = mbox->get_async(&payload);
   auto io   = disk->read_async(3e8);
 
-  std::vector<sg4::ActivityPtr> pending_activities = {boost::dynamic_pointer_cast<sg4::Activity>(exec),
-                                                      boost::dynamic_pointer_cast<sg4::Activity>(comm),
-                                                      boost::dynamic_pointer_cast<sg4::Activity>(io)};
+  sg4::ActivitySet pending_activities;
+  pending_activities.push(exec);
+  pending_activities.push(comm);
+  pending_activities.push(io);
 
   XBT_INFO("Sleep_for a while");
   sg4::this_actor::sleep_for(1);
 
   XBT_INFO("Test for completed activities");
   while (not pending_activities.empty()) {
-    ssize_t changed_pos = sg4::Activity::test_any(pending_activities);
-    if (changed_pos != -1) {
-      auto* completed_one = pending_activities[changed_pos].get();
-      if (dynamic_cast<sg4::Comm*>(completed_one))
+    auto completed_one = pending_activities.test_any();
+    if (completed_one != nullptr) {
+      if (boost::dynamic_pointer_cast<sg4::Comm>(completed_one))
         XBT_INFO("Completed a Comm");
-      if (dynamic_cast<sg4::Exec*>(completed_one))
+      if (boost::dynamic_pointer_cast<sg4::Exec>(completed_one))
         XBT_INFO("Completed an Exec");
-      if (dynamic_cast<sg4::Io*>(completed_one))
+      if (boost::dynamic_pointer_cast<sg4::Io>(completed_one))
         XBT_INFO("Completed an I/O");
-      pending_activities.erase(pending_activities.begin() + changed_pos);
-    } else { // nothing matches, wait for a little bit
+    } else {
       XBT_INFO("Nothing matches, test again in 0.5s");
       sg4::this_actor::sleep_for(.5);
     }
@@ -1,6 +1,6 @@
 #!/usr/bin/env tesh
 
-$ ${bindir:=.}/s4u-activity-testany ${platfdir}/hosts_with_disks.xml "--log=root.fmt:[%4.2r]%e[%5a]%e%m%n"
+$ ${bindir:=.}/s4u-activityset-testany ${platfdir}/hosts_with_disks.xml "--log=root.fmt:[%4.2r]%e[%5a]%e%m%n"
 > [0.00] [alice] Send 'Message'
 > [0.00] [  bob] Create my asynchronous activities
 > [0.00] [  bob] Sleep_for a while
diff --git a/examples/cpp/activityset-waitall/s4u-activityset-waitall.cpp b/examples/cpp/activityset-waitall/s4u-activityset-waitall.cpp
new file mode 100644 (file)
index 0000000..176e50f
--- /dev/null
@@ -0,0 +1,55 @@
+/* Copyright (c) 2010-2023. The SimGrid Team. All rights reserved.          */
+
+/* This program is free software; you can redistribute it and/or modify it
+ * under the terms of the license (GNU LGPL) which comes with this package. */
+
+#include "simgrid/s4u.hpp"
+#include <cstdlib>
+#include <iostream>
+#include <string>
+namespace sg4 = simgrid::s4u;
+
+XBT_LOG_NEW_DEFAULT_CATEGORY(s4u_activity_waittany, "Messages specific for this s4u example");
+
+static void bob()
+{
+  sg4::Mailbox* mbox    = sg4::Mailbox::by_name("mbox");
+  const sg4::Disk* disk = sg4::Host::current()->get_disks().front();
+  std::string* payload;
+
+  XBT_INFO("Create my asynchronous activities");
+  auto exec = sg4::this_actor::exec_async(5e9);
+  auto comm = mbox->get_async(&payload);
+  auto io   = disk->read_async(3e8);
+
+  sg4::ActivitySet pending_activities({boost::dynamic_pointer_cast<sg4::Activity>(exec),
+                                       boost::dynamic_pointer_cast<sg4::Activity>(comm),
+                                       boost::dynamic_pointer_cast<sg4::Activity>(io)});
+
+  XBT_INFO("Wait for asynchrounous activities to complete, all in one shot.");
+  pending_activities.wait_all();
+
+  XBT_INFO("All activities are completed.");
+  delete payload;
+}
+
+static void alice()
+{
+  auto* payload = new std::string("Message");
+  XBT_INFO("Send '%s'", payload->c_str());
+  sg4::Mailbox::by_name("mbox")->put(payload, 6e8);
+}
+
+int main(int argc, char* argv[])
+{
+  sg4::Engine e(&argc, argv);
+
+  e.load_platform(argv[1]);
+
+  sg4::Actor::create("bob", e.host_by_name("bob"), bob);
+  sg4::Actor::create("alice", e.host_by_name("alice"), alice);
+
+  e.run();
+
+  return 0;
+}
diff --git a/examples/cpp/activityset-waitall/s4u-activityset-waitall.tesh b/examples/cpp/activityset-waitall/s4u-activityset-waitall.tesh
new file mode 100644 (file)
index 0000000..6f01429
--- /dev/null
@@ -0,0 +1,7 @@
+#!/usr/bin/env tesh
+
+$ ${bindir:=.}/s4u-activityset-waitall ${platfdir}/hosts_with_disks.xml "--log=root.fmt:[%7.6r]%e[%5a]%e%m%n"
+> [0.000000] [alice] Send 'Message'
+> [0.000000] [  bob] Create my asynchronous activities
+> [0.000000] [  bob] Wait for asynchrounous activities to complete, all in one shot.
+> [5.197828] [  bob] All activities are completed.
@@ -22,22 +22,22 @@ static void bob()
   auto comm = mbox->get_async(&payload);
   auto io   = disk->read_async(3e8);
 
-  std::vector<sg4::ActivityPtr> pending_activities = {boost::dynamic_pointer_cast<sg4::Activity>(exec),
-                                                      boost::dynamic_pointer_cast<sg4::Activity>(comm),
-                                                      boost::dynamic_pointer_cast<sg4::Activity>(io)};
+  sg4::ActivitySet pending_activities({exec, comm, io});
 
   XBT_INFO("Wait for asynchrounous activities to complete");
   while (not pending_activities.empty()) {
-    ssize_t changed_pos = sg4::Activity::wait_any(pending_activities);
-    if (changed_pos != -1) {
-      auto* completed_one = pending_activities[changed_pos].get();
-      if (dynamic_cast<sg4::Comm*>(completed_one))
+    try {
+      pending_activities.wait_all_for(1);
+    } catch (simgrid::TimeoutException& e) {
+      XBT_INFO("Not all activities are terminated yet.");
+    }
+    while (auto completed_one = pending_activities.test_any()) {
+      if (boost::dynamic_pointer_cast<sg4::Comm>(completed_one))
         XBT_INFO("Completed a Comm");
-      if (dynamic_cast<sg4::Exec*>(completed_one))
+      if (boost::dynamic_pointer_cast<sg4::Exec>(completed_one))
         XBT_INFO("Completed an Exec");
-      if (dynamic_cast<sg4::Io*>(completed_one))
+      if (boost::dynamic_pointer_cast<sg4::Io>(completed_one))
         XBT_INFO("Completed an I/O");
-      pending_activities.erase(pending_activities.begin() + changed_pos);
     }
   }
   XBT_INFO("Last activity is complete");
diff --git a/examples/cpp/activityset-waitallfor/s4u-activityset-waitallfor.tesh b/examples/cpp/activityset-waitallfor/s4u-activityset-waitallfor.tesh
new file mode 100644 (file)
index 0000000..d0c1016
--- /dev/null
@@ -0,0 +1,15 @@
+#!/usr/bin/env tesh
+
+$ ${bindir:=.}/s4u-activityset-waitallfor ${platfdir}/hosts_with_disks.xml "--log=root.fmt:[%7.6r]%e[%5a]%e%m%n"
+> [0.000000] [alice] Send 'Message'
+> [0.000000] [  bob] Create my asynchronous activities
+> [0.000000] [  bob] Wait for asynchrounous activities to complete
+> [1.000000] [  bob] Not all activities are terminated yet.
+> [2.000000] [  bob] Not all activities are terminated yet.
+> [3.000000] [  bob] Not all activities are terminated yet.
+> [3.000000] [  bob] Completed an I/O
+> [4.000000] [  bob] Not all activities are terminated yet.
+> [5.000000] [  bob] Not all activities are terminated yet.
+> [5.000000] [  bob] Completed an Exec
+> [5.197828] [  bob] Completed a Comm
+> [5.197828] [  bob] Last activity is complete
diff --git a/examples/cpp/activityset-waitany/s4u-activityset-waitany.cpp b/examples/cpp/activityset-waitany/s4u-activityset-waitany.cpp
new file mode 100644 (file)
index 0000000..1878a13
--- /dev/null
@@ -0,0 +1,64 @@
+/* Copyright (c) 2010-2023. The SimGrid Team. All rights reserved.          */
+
+/* This program is free software; you can redistribute it and/or modify it
+ * under the terms of the license (GNU LGPL) which comes with this package. */
+
+#include "simgrid/s4u.hpp"
+#include <cstdlib>
+#include <iostream>
+#include <string>
+namespace sg4 = simgrid::s4u;
+
+XBT_LOG_NEW_DEFAULT_CATEGORY(s4u_activity_waittany, "Messages specific for this s4u example");
+
+static void bob()
+{
+  sg4::Mailbox* mbox    = sg4::Mailbox::by_name("mbox");
+  const sg4::Disk* disk = sg4::Host::current()->get_disks().front();
+  std::string* payload;
+
+  XBT_INFO("Create my asynchronous activities");
+  auto exec = sg4::this_actor::exec_async(5e9);
+  auto comm = mbox->get_async(&payload);
+  auto io   = disk->read_async(3e8);
+
+  sg4::ActivitySet pending_activities({boost::dynamic_pointer_cast<sg4::Activity>(exec),
+                                       boost::dynamic_pointer_cast<sg4::Activity>(comm),
+                                       boost::dynamic_pointer_cast<sg4::Activity>(io)});
+
+  XBT_INFO("Wait for asynchrounous activities to complete");
+  while (not pending_activities.empty()) {
+    auto completed_one = pending_activities.wait_any();
+    if (completed_one != nullptr) {
+      if (boost::dynamic_pointer_cast<sg4::Comm>(completed_one))
+        XBT_INFO("Completed a Comm");
+      if (boost::dynamic_pointer_cast<sg4::Exec>(completed_one))
+        XBT_INFO("Completed an Exec");
+      if (boost::dynamic_pointer_cast<sg4::Io>(completed_one))
+        XBT_INFO("Completed an I/O");
+    }
+  }
+  XBT_INFO("Last activity is complete");
+  delete payload;
+}
+
+static void alice()
+{
+  auto* payload = new std::string("Message");
+  XBT_INFO("Send '%s'", payload->c_str());
+  sg4::Mailbox::by_name("mbox")->put(payload, 6e8);
+}
+
+int main(int argc, char* argv[])
+{
+  sg4::Engine e(&argc, argv);
+
+  e.load_platform(argv[1]);
+
+  sg4::Actor::create("bob", e.host_by_name("bob"), bob);
+  sg4::Actor::create("alice", e.host_by_name("alice"), alice);
+
+  e.run();
+
+  return 0;
+}
@@ -1,6 +1,6 @@
 #!/usr/bin/env tesh
 
-$ ${bindir:=.}/s4u-activity-waitany ${platfdir}/hosts_with_disks.xml "--log=root.fmt:[%7.6r]%e[%5a]%e%m%n"
+$ ${bindir:=.}/s4u-activityset-waitany ${platfdir}/hosts_with_disks.xml "--log=root.fmt:[%7.6r]%e[%5a]%e%m%n"
 > [0.000000] [alice] Send 'Message'
 > [0.000000] [  bob] Create my asynchronous activities
 > [0.000000] [  bob] Wait for asynchrounous activities to complete
diff --git a/examples/cpp/comm-testany/s4u-comm-testany.cpp b/examples/cpp/comm-testany/s4u-comm-testany.cpp
deleted file mode 100644 (file)
index f06cb16..0000000
+++ /dev/null
@@ -1,73 +0,0 @@
-/* Copyright (c) 2010-2023. The SimGrid Team. All rights reserved.          */
-
-/* This program is free software; you can redistribute it and/or modify it
- * under the terms of the license (GNU LGPL) which comes with this package. */
-
-#include "simgrid/s4u.hpp"
-#include <cstdlib>
-#include <iostream>
-#include <string>
-namespace sg4 = simgrid::s4u;
-
-XBT_LOG_NEW_DEFAULT_CATEGORY(s4u_comm_testany, "Messages specific for this s4u example");
-
-static void rank0()
-{
-  sg4::Mailbox* mbox = sg4::Mailbox::by_name("rank0");
-  std::string* msg1;
-  std::string* msg2;
-  std::string* msg3;
-
-  XBT_INFO("Post my asynchronous receives");
-  auto comm1                              = mbox->get_async(&msg1);
-  auto comm2                              = mbox->get_async(&msg2);
-  auto comm3                              = mbox->get_async(&msg3);
-  std::vector<sg4::CommPtr> pending_comms = {comm1, comm2, comm3};
-
-  XBT_INFO("Send some data to rank-1");
-  for (int i = 0; i < 3; i++)
-    sg4::Mailbox::by_name("rank1")->put(new int(i), 1);
-
-  XBT_INFO("Test for completed comms");
-  while (not pending_comms.empty()) {
-    ssize_t flag = sg4::Comm::test_any(pending_comms);
-    if (flag != -1) {
-      pending_comms.erase(pending_comms.begin() + flag);
-      XBT_INFO("Remove a pending comm.");
-    } else // nothing matches, wait for a little bit
-      sg4::this_actor::sleep_for(0.1);
-  }
-  XBT_INFO("Last comm is complete");
-  delete msg1;
-  delete msg2;
-  delete msg3;
-}
-
-static void rank1()
-{
-  sg4::Mailbox* rank0_mbox = sg4::Mailbox::by_name("rank0");
-  sg4::Mailbox* rank1_mbox = sg4::Mailbox::by_name("rank1");
-
-  for (int i = 0; i < 3; i++) {
-    auto res = rank1_mbox->get_unique<int>();
-    XBT_INFO("Received %d", *res);
-    std::string msg_content = "Message " + std::to_string(i);
-    auto* payload           = new std::string(msg_content);
-    XBT_INFO("Send '%s'", msg_content.c_str());
-    rank0_mbox->put(payload, 1e6);
-  }
-}
-
-int main(int argc, char* argv[])
-{
-  sg4::Engine e(&argc, argv);
-
-  e.load_platform(argv[1]);
-
-  sg4::Actor::create("rank-0", e.host_by_name("Tremblay"), rank0);
-  sg4::Actor::create("rank-1", e.host_by_name("Fafard"), rank1);
-
-  e.run();
-
-  return 0;
-}
diff --git a/examples/cpp/comm-testany/s4u-comm-testany.tesh b/examples/cpp/comm-testany/s4u-comm-testany.tesh
deleted file mode 100644 (file)
index 0f19916..0000000
+++ /dev/null
@@ -1,16 +0,0 @@
-#!/usr/bin/env tesh
-
-$ ${bindir:=.}/s4u-comm-testany ${platfdir}/small_platform.xml "--log=root.fmt:[%10.6r]%e[%8h]%e[%a]%e%m%n"
-> [  0.000000] [Tremblay] [rank-0] Post my asynchronous receives
-> [  0.000000] [Tremblay] [rank-0] Send some data to rank-1
-> [  0.025708] [  Fafard] [rank-1] Received 0
-> [  0.025708] [  Fafard] [rank-1] Send 'Message 0'
-> [  0.209813] [  Fafard] [rank-1] Received 1
-> [  0.209813] [  Fafard] [rank-1] Send 'Message 1'
-> [  0.393918] [Tremblay] [rank-0] Test for completed comms
-> [  0.393918] [  Fafard] [rank-1] Received 2
-> [  0.393918] [  Fafard] [rank-1] Send 'Message 2'
-> [  0.393918] [Tremblay] [rank-0] Remove a pending comm.
-> [  0.393918] [Tremblay] [rank-0] Remove a pending comm.
-> [  0.593918] [Tremblay] [rank-0] Remove a pending comm.
-> [  0.593918] [Tremblay] [rank-0] Last comm is complete
diff --git a/examples/cpp/comm-waitall/s4u-comm-waitall.cpp b/examples/cpp/comm-waitall/s4u-comm-waitall.cpp
deleted file mode 100644 (file)
index 5c3f863..0000000
+++ /dev/null
@@ -1,94 +0,0 @@
-/* Copyright (c) 2010-2023. The SimGrid Team. All rights reserved.          */
-
-/* This program is free software; you can redistribute it and/or modify it
- * under the terms of the license (GNU LGPL) which comes with this package. */
-
-/* This example shows how to block on the completion of a set of communications.
- *
- * As for the other asynchronous examples, the sender initiate all the messages it wants to send and
- * pack the resulting simgrid::s4u::CommPtr objects in a vector. All messages thus occur concurrently.
- *
- * The sender then blocks until all ongoing communication terminate, using simgrid::s4u::Comm::wait_all()
- *
- */
-
-#include "simgrid/s4u.hpp"
-#include <cstdlib>
-#include <iostream>
-#include <string>
-namespace sg4 = simgrid::s4u;
-
-XBT_LOG_NEW_DEFAULT_CATEGORY(s4u_async_waitall, "Messages specific for this s4u example");
-
-static void sender(unsigned int messages_count, unsigned int receivers_count, long msg_size)
-{
-  if (messages_count == 0 || receivers_count == 0) {
-    XBT_WARN("Sender has nothing to do. Bail out!");
-    return;
-  }
-  // sphinx-doc: init-begin (this line helps the doc to build; ignore it)
-  /* Vector in which we store all ongoing communications */
-  std::vector<sg4::CommPtr> pending_comms;
-
-  /* Make a vector of the mailboxes to use */
-  std::vector<sg4::Mailbox*> mboxes;
-  for (unsigned int i = 0; i < receivers_count; i++)
-    mboxes.push_back(sg4::Mailbox::by_name("receiver-" + std::to_string(i)));
-  // sphinx-doc: init-end
-
-  /* Start dispatching all messages to receivers, in a round robin fashion */
-  for (unsigned int i = 0; i < messages_count; i++) {
-    std::string msg_content = "Message " + std::to_string(i);
-    // Copy the data we send: the 'msg_content' variable is not a stable storage location.
-    // It will be destroyed when this actor leaves the loop, ie before the receiver gets it
-    auto* payload = new std::string(msg_content);
-
-    XBT_INFO("Send '%s' to '%s'", msg_content.c_str(), mboxes[i % receivers_count]->get_cname());
-
-    /* Create a communication representing the ongoing communication, and store it in pending_comms */
-    sg4::CommPtr comm = mboxes[i % receivers_count]->put_async(payload, msg_size);
-    pending_comms.push_back(comm);
-  }
-
-  /* Start sending messages to let the workers know that they should stop */ // sphinx-doc: put-begin
-  for (unsigned int i = 0; i < receivers_count; i++) {
-    XBT_INFO("Send 'finalize' to 'receiver-%u'", i);
-    sg4::CommPtr comm = mboxes[i]->put_async(new std::string("finalize"), 0);
-    pending_comms.push_back(comm);
-  }
-  XBT_INFO("Done dispatching all messages");
-
-  /* Now that all message exchanges were initiated, wait for their completion in one single call */
-  sg4::Comm::wait_all(pending_comms);
-  // sphinx-doc: put-end
-
-  XBT_INFO("Goodbye now!");
-}
-
-/* Receiver actor expects 1 argument: its ID */
-static void receiver(int id)
-{
-  sg4::Mailbox* mbox = sg4::Mailbox::by_name("receiver-" + std::to_string(id));
-  XBT_INFO("Wait for my first message");
-  for (bool cont = true; cont;) {
-    auto received = mbox->get_unique<std::string>();
-    XBT_INFO("I got a '%s'.", received->c_str());
-    cont = (*received != "finalize"); // If it's a finalize message, we're done
-    // Receiving the message was all we were supposed to do
-  }
-}
-
-int main(int argc, char* argv[])
-{
-  sg4::Engine e(&argc, argv);
-
-  e.load_platform(argv[1]);
-
-  sg4::Actor::create("sender", e.host_by_name("Tremblay"), sender, 5, 2, 1e6);
-  sg4::Actor::create("receiver", e.host_by_name("Ruby"), receiver, 0);
-  sg4::Actor::create("receiver", e.host_by_name("Perl"), receiver, 1);
-
-  e.run();
-
-  return 0;
-}
diff --git a/examples/cpp/comm-waitall/s4u-comm-waitall.tesh b/examples/cpp/comm-waitall/s4u-comm-waitall.tesh
deleted file mode 100644 (file)
index cdf7365..0000000
+++ /dev/null
@@ -1,21 +0,0 @@
-#!/usr/bin/env tesh
-
-$ ${bindir:=.}/s4u-comm-waitall ${platfdir}/small_platform_fatpipe.xml "--log=root.fmt:[%10.6r]%e(%i:%a@%h)%e%m%n"
-> [  0.000000] (2:receiver@Ruby) Wait for my first message
-> [  0.000000] (3:receiver@Perl) Wait for my first message
-> [  0.000000] (1:sender@Tremblay) Send 'Message 0' to 'receiver-0'
-> [  0.000000] (1:sender@Tremblay) Send 'Message 1' to 'receiver-1'
-> [  0.000000] (1:sender@Tremblay) Send 'Message 2' to 'receiver-0'
-> [  0.000000] (1:sender@Tremblay) Send 'Message 3' to 'receiver-1'
-> [  0.000000] (1:sender@Tremblay) Send 'Message 4' to 'receiver-0'
-> [  0.000000] (1:sender@Tremblay) Send 'finalize' to 'receiver-0'
-> [  0.000000] (1:sender@Tremblay) Send 'finalize' to 'receiver-1'
-> [  0.000000] (1:sender@Tremblay) Done dispatching all messages
-> [  0.004022] (2:receiver@Ruby) I got a 'Message 0'.
-> [  0.004022] (3:receiver@Perl) I got a 'Message 1'.
-> [  0.008043] (2:receiver@Ruby) I got a 'Message 2'.
-> [  0.008043] (3:receiver@Perl) I got a 'Message 3'.
-> [  0.009995] (3:receiver@Perl) I got a 'finalize'.
-> [  0.012065] (2:receiver@Ruby) I got a 'Message 4'.
-> [  0.014016] (2:receiver@Ruby) I got a 'finalize'.
-> [  0.014016] (1:sender@Tremblay) Goodbye now!
diff --git a/examples/cpp/comm-waitany/s4u-comm-waitany.cpp b/examples/cpp/comm-waitany/s4u-comm-waitany.cpp
deleted file mode 100644 (file)
index 802a4fc..0000000
+++ /dev/null
@@ -1,107 +0,0 @@
-/* Copyright (c) 2010-2023. The SimGrid Team. All rights reserved.          */
-
-/* This program is free software; you can redistribute it and/or modify it
- * under the terms of the license (GNU LGPL) which comes with this package. */
-
-/* This example shows how to use simgrid::s4u::this_actor::wait_any() to wait for the first occurring event.
- *
- * As for the other asynchronous examples, the sender initiate all the messages it wants to send and
- * pack the resulting simgrid::s4u::CommPtr objects in a vector. All messages thus occur concurrently.
- *
- * The sender then loops until there is no ongoing communication. Using wait_any() ensures that the sender
- * will notice events as soon as they occur even if it does not follow the order of the container.
- *
- * Here, finalize messages will terminate earlier because their size is 0, so they travel faster than the
- * other messages of this application.  As expected, the trace shows that the finalize of worker 1 is
- * processed before 'Message 5' that is sent to worker 0.
- *
- */
-
-#include "simgrid/s4u.hpp"
-#include <cstdlib>
-#include <iostream>
-#include <string>
-namespace sg4 = simgrid::s4u;
-
-XBT_LOG_NEW_DEFAULT_CATEGORY(s4u_comm_waitall, "Messages specific for this s4u example");
-
-static void sender(unsigned int messages_count, unsigned int receivers_count, long msg_size)
-{
-  if (messages_count == 0 || receivers_count == 0) {
-    XBT_WARN("Sender has nothing to do. Bail out!");
-    return;
-  }
-  /* Vector in which we store all ongoing communications */
-  std::vector<sg4::CommPtr> pending_comms;
-
-  /* Make a vector of the mailboxes to use */
-  std::vector<sg4::Mailbox*> mboxes;
-  for (unsigned int i = 0; i < receivers_count; i++)
-    mboxes.push_back(sg4::Mailbox::by_name("receiver-" + std::to_string(i)));
-
-  /* Start dispatching all messages to receivers, in a round robin fashion */
-  for (unsigned int i = 0; i < messages_count; i++) {
-    std::string msg_content = "Message " + std::to_string(i);
-    // Copy the data we send: the 'msg_content' variable is not a stable storage location.
-    // It will be destroyed when this actor leaves the loop, ie before the receiver gets it
-    auto* payload = new std::string(msg_content);
-
-    XBT_INFO("Send '%s' to '%s'", msg_content.c_str(), mboxes[i % receivers_count]->get_cname());
-
-    /* Create a communication representing the ongoing communication, and store it in pending_comms */
-    sg4::CommPtr comm = mboxes[i % receivers_count]->put_async(payload, msg_size);
-    pending_comms.push_back(comm);
-  }
-
-  /* Start sending messages to let the workers know that they should stop */
-  for (unsigned int i = 0; i < receivers_count; i++) {
-    XBT_INFO("Send 'finalize' to 'receiver-%u'", i);
-    sg4::CommPtr comm = mboxes[i]->put_async(new std::string("finalize"), 0);
-    pending_comms.push_back(comm);
-  }
-  XBT_INFO("Done dispatching all messages");
-
-  /* Now that all message exchanges were initiated, wait for their completion, in order of termination.
-   *
-   * This loop waits for first terminating message with wait_any() and remove it with erase(), until all comms are
-   * terminated
-   * Even in this simple example, the pending comms do not terminate in the exact same order of creation.
-   */
-  while (not pending_comms.empty()) {
-    ssize_t changed_pos = sg4::Comm::wait_any(pending_comms);
-    pending_comms.erase(pending_comms.begin() + changed_pos);
-    if (changed_pos != 0)
-      XBT_INFO("Remove the %zdth pending comm: it terminated earlier than another comm that was initiated first.",
-               changed_pos);
-  }
-
-  XBT_INFO("Goodbye now!");
-}
-
-/* Receiver actor expects 1 argument: its ID */
-static void receiver(int id)
-{
-  sg4::Mailbox* mbox = sg4::Mailbox::by_name("receiver-" + std::to_string(id));
-  XBT_INFO("Wait for my first message");
-  for (bool cont = true; cont;) {
-    auto received = mbox->get_unique<std::string>();
-    XBT_INFO("I got a '%s'.", received->c_str());
-    cont = (*received != "finalize"); // If it's a finalize message, we're done
-    // Receiving the message was all we were supposed to do
-  }
-}
-
-int main(int argc, char* argv[])
-{
-  sg4::Engine e(&argc, argv);
-
-  e.load_platform(argv[1]);
-
-  sg4::Actor::create("sender", e.host_by_name("Tremblay"), sender, 6, 2, 1e6);
-  sg4::Actor::create("receiver", e.host_by_name("Fafard"), receiver, 0);
-  sg4::Actor::create("receiver", e.host_by_name("Jupiter"), receiver, 1);
-
-  e.run();
-
-  return 0;
-}
diff --git a/examples/cpp/comm-waitany/s4u-comm-waitany.tesh b/examples/cpp/comm-waitany/s4u-comm-waitany.tesh
deleted file mode 100644 (file)
index 5fe9dc9..0000000
+++ /dev/null
@@ -1,27 +0,0 @@
-#!/usr/bin/env tesh
-
-p Testing this_actor->wait_any()
-
-! output sort 19
-$ ${bindir:=.}/s4u-comm-waitany ${platfdir}/small_platform.xml "--log=root.fmt:[%10.6r]%e(%i:%a@%h)%e%m%n"
-> [  0.000000] (1:sender@Tremblay) Send 'Message 0' to 'receiver-0'
-> [  0.000000] (2:receiver@Fafard) Wait for my first message
-> [  0.000000] (3:receiver@Jupiter) Wait for my first message
-> [  0.000000] (1:sender@Tremblay) Send 'Message 1' to 'receiver-1'
-> [  0.000000] (1:sender@Tremblay) Send 'Message 2' to 'receiver-0'
-> [  0.000000] (1:sender@Tremblay) Send 'Message 3' to 'receiver-1'
-> [  0.000000] (1:sender@Tremblay) Send 'Message 4' to 'receiver-0'
-> [  0.000000] (1:sender@Tremblay) Send 'Message 5' to 'receiver-1'
-> [  0.000000] (1:sender@Tremblay) Send 'finalize' to 'receiver-0'
-> [  0.000000] (1:sender@Tremblay) Send 'finalize' to 'receiver-1'
-> [  0.000000] (1:sender@Tremblay) Done dispatching all messages
-> [  0.158397] (2:receiver@Fafard) I got a 'Message 0'.
-> [  0.169155] (3:receiver@Jupiter) I got a 'Message 1'.
-> [  0.316794] (2:receiver@Fafard) I got a 'Message 2'.
-> [  0.338309] (3:receiver@Jupiter) I got a 'Message 3'.
-> [  0.475190] (2:receiver@Fafard) I got a 'Message 4'.
-> [  0.500898] (2:receiver@Fafard) I got a 'finalize'.
-> [  0.500898] (1:sender@Tremblay) Remove the 1th pending comm: it terminated earlier than another comm that was initiated first.
-> [  0.507464] (3:receiver@Jupiter) I got a 'Message 5'.
-> [  0.526478] (3:receiver@Jupiter) I got a 'finalize'.
-> [  0.526478] (1:sender@Tremblay) Goodbye now!
diff --git a/examples/cpp/exec-waitany/s4u-exec-waitany.cpp b/examples/cpp/exec-waitany/s4u-exec-waitany.cpp
deleted file mode 100644 (file)
index c30dd31..0000000
+++ /dev/null
@@ -1,63 +0,0 @@
-/* Copyright (c) 2019-2023. The SimGrid Team. All rights reserved.          */
-
-/* This program is free software; you can redistribute it and/or modify it
- * under the terms of the license (GNU LGPL) which comes with this package. */
-
-#include "simgrid/s4u.hpp"
-#include <cstdlib>
-#include <iostream>
-#include <string>
-
-XBT_LOG_NEW_DEFAULT_CATEGORY(s4u_exec_waitany, "Messages specific for this s4u example");
-namespace sg4 = simgrid::s4u;
-
-static void worker(bool with_timeout)
-{
-  /* Vector in which we store all pending executions*/
-  std::vector<sg4::ExecPtr> pending_executions;
-
-  for (int i = 0; i < 3; i++) {
-    std::string name = "Exec-" + std::to_string(i);
-    double amount    = (6 * (i % 2) + i + 1) * sg4::this_actor::get_host()->get_speed();
-
-    sg4::ExecPtr exec = sg4::this_actor::exec_init(amount)->set_name(name);
-    pending_executions.push_back(exec);
-    exec->start();
-
-    XBT_INFO("Activity %s has started for %.0f seconds", name.c_str(),
-             amount / sg4::this_actor::get_host()->get_speed());
-  }
-
-  /* Now that executions were initiated, wait for their completion, in order of termination.
-   *
-   * This loop waits for first terminating execution with wait_any() and remove it with erase(), until all execs are
-   * terminated.
-   */
-  while (not pending_executions.empty()) {
-    ssize_t pos;
-    if (with_timeout)
-      pos = sg4::Exec::wait_any_for(pending_executions, 4);
-    else
-      pos = sg4::Exec::wait_any(pending_executions);
-
-    if (pos < 0) {
-      XBT_INFO("Do not wait any longer for an activity");
-      pending_executions.clear();
-    } else {
-      XBT_INFO("Activity '%s' (at position %zd) is complete", pending_executions[pos]->get_cname(), pos);
-      pending_executions.erase(pending_executions.begin() + pos);
-    }
-    XBT_INFO("%zu activities remain pending", pending_executions.size());
-  }
-}
-
-int main(int argc, char* argv[])
-{
-  sg4::Engine e(&argc, argv);
-  e.load_platform(argv[1]);
-  sg4::Actor::create("worker", e.host_by_name("Tremblay"), worker, false);
-  sg4::Actor::create("worker_timeout", e.host_by_name("Tremblay"), worker, true);
-  e.run();
-
-  return 0;
-}
diff --git a/examples/cpp/exec-waitany/s4u-exec-waitany.tesh b/examples/cpp/exec-waitany/s4u-exec-waitany.tesh
deleted file mode 100644 (file)
index 072cab5..0000000
+++ /dev/null
@@ -1,22 +0,0 @@
-#!/usr/bin/env tesh
-
-! output sort 19
-$ ${bindir:=.}/s4u-exec-waitany ${platfdir}/multicore_machine.xml "--log=root.fmt:[%10.6r]%e[%14P]%e%m%n"
-> [  0.000000] [        worker] Activity Exec-0 has started for 1 seconds
-> [  0.000000] [worker_timeout] Activity Exec-0 has started for 1 seconds
-> [  0.000000] [        worker] Activity Exec-1 has started for 8 seconds
-> [  0.000000] [worker_timeout] Activity Exec-1 has started for 8 seconds
-> [  0.000000] [        worker] Activity Exec-2 has started for 3 seconds
-> [  0.000000] [worker_timeout] Activity Exec-2 has started for 3 seconds
-> [  1.000000] [worker_timeout] Activity 'Exec-0' (at position 0) is complete
-> [  1.000000] [worker_timeout] 2 activities remain pending
-> [  1.000000] [        worker] Activity 'Exec-0' (at position 0) is complete
-> [  1.000000] [        worker] 2 activities remain pending
-> [  3.000000] [worker_timeout] Activity 'Exec-2' (at position 1) is complete
-> [  3.000000] [worker_timeout] 1 activities remain pending
-> [  3.000000] [        worker] Activity 'Exec-2' (at position 1) is complete
-> [  3.000000] [        worker] 1 activities remain pending
-> [  7.000000] [worker_timeout] Do not wait any longer for an activity
-> [  7.000000] [worker_timeout] 0 activities remain pending
-> [  8.000000] [        worker] Activity 'Exec-1' (at position 0) is complete
-> [  8.000000] [        worker] 0 activities remain pending
index 9fce743..71d3ac8 100644 (file)
@@ -48,7 +48,7 @@ far: {host.computed_flops:.0E}, average load as reported by the HostLoad plugin:
   this_actor.info(f'Run an activity of {100E6:.0E} flops')
   this_actor.execute(100E6)
   this_actor.info(f'Done working on my activity; this took {Engine.clock - start}s; current peak speed: {host.speed:.0E} flop/s; number of flops computed so far: {host.computed_flops:.0E}')
-  Engine
+
   start = Engine.clock
   this_actor.info("========= Requesting a reset of the computation and load counters")
   host.reset_load()
index 07e7cbc..6c72682 100644 (file)
@@ -19,8 +19,14 @@ namespace s4u {
 class Activity;
 /** Smart pointer to a simgrid::s4u::Activity */
 using ActivityPtr = boost::intrusive_ptr<Activity>;
-XBT_PUBLIC void intrusive_ptr_release(const Activity* actor);
-XBT_PUBLIC void intrusive_ptr_add_ref(const Activity* actor);
+XBT_PUBLIC void intrusive_ptr_release(const Activity* act);
+XBT_PUBLIC void intrusive_ptr_add_ref(const Activity* act);
+
+class ActivitySet;
+/** Smart pointer to a simgrid::s4u::Activity */
+using ActivitySetPtr = boost::intrusive_ptr<ActivitySet>;
+XBT_PUBLIC void intrusive_ptr_release(const ActivitySet* as);
+XBT_PUBLIC void intrusive_ptr_add_ref(const ActivitySet* as);
 
 class Actor;
 /** Smart pointer to a simgrid::s4u::Actor */
@@ -31,8 +37,8 @@ XBT_PUBLIC void intrusive_ptr_add_ref(const Actor* actor);
 class Barrier;
 /** Smart pointer to a simgrid::s4u::Barrier */
 using BarrierPtr = boost::intrusive_ptr<Barrier>;
-XBT_PUBLIC void intrusive_ptr_release(Barrier* m);
-XBT_PUBLIC void intrusive_ptr_add_ref(Barrier* m);
+XBT_PUBLIC void intrusive_ptr_release(Barrier* b);
+XBT_PUBLIC void intrusive_ptr_add_ref(Barrier* b);
 
 class Comm;
 /** Smart pointer to a simgrid::s4u::Comm */
index 77a71df..b285322 100644 (file)
 #include <simgrid/s4u/Mutex.hpp>
 #include <simgrid/s4u/NetZone.hpp>
 #include <simgrid/s4u/Semaphore.hpp>
-#include <simgrid/s4u/Task.hpp>
 #include <simgrid/s4u/VirtualMachine.hpp>
 
+#include <simgrid/s4u/ActivitySet.hpp>
+#include <simgrid/s4u/Task.hpp>
+
 #include <simgrid/Exception.hpp>
 
 #endif /* SIMGRID_S4U_S4U_H */
index b66268e..9973a00 100644 (file)
@@ -34,6 +34,7 @@ namespace s4u {
  */
 class XBT_PUBLIC Activity : public xbt::Extendable<Activity> {
 #ifndef DOXYGEN
+  friend ActivitySet;
   friend Comm;
   friend Exec;
   friend Io;
diff --git a/include/simgrid/s4u/ActivitySet.hpp b/include/simgrid/s4u/ActivitySet.hpp
new file mode 100644 (file)
index 0000000..86942a3
--- /dev/null
@@ -0,0 +1,86 @@
+/* Copyright (c) 2006-2023. The SimGrid Team. All rights reserved.          */
+
+/* This program is free software; you can redistribute it and/or modify it
+ * under the terms of the license (GNU LGPL) which comes with this package. */
+
+#ifndef SIMGRID_S4U_ACTIVITYSET_HPP
+#define SIMGRID_S4U_ACTIVITYSET_HPP
+
+#include <simgrid/forward.h>
+#include <simgrid/s4u/Activity.hpp>
+
+#include <vector>
+
+namespace simgrid {
+
+extern template class XBT_PUBLIC xbt::Extendable<s4u::ActivitySet>;
+
+namespace s4u {
+/** @brief ActivitiesSet
+ *
+ * This class is a container of activities, allowing to wait for the completion of any or all activities in the set.
+ * This is somehow similar to the select(2) system call under UNIX, allowing you to wait for the next event about these
+ * activities.
+ */
+class XBT_PUBLIC ActivitySet : public xbt::Extendable<ActivitySet> {
+  std::vector<ActivityPtr>
+      activities_; // We use a vector instead of a set to improve reproductibility accross architectures
+  std::vector<ActivityPtr> failed_activities_;
+
+public:
+  ActivitySet()  = default;
+  ActivitySet(const std::vector<ActivityPtr> init) : activities_(init) {}
+  ~ActivitySet() = default;
+
+  /** Add an activity to the set */
+  void push(ActivityPtr a) { activities_.push_back(a); }
+  /** Remove that activity from the set (no-op if the activity is not in the set) */
+  void erase(ActivityPtr a);
+
+  /** Get the amount of activities in the set. Failed activities (if any) are not counted */
+  int size() { return activities_.size(); }
+  /** Return whether the set is empty. Failed activities (if any) are not counted */
+  int empty() { return activities_.empty(); }
+
+  /** Wait for the completion of all activities in the set, but not longer than the provided timeout
+   *
+   * On timeout, an exception is raised.
+   *
+   * In any case, the completed activities remain in the set. Use test_any() to retrieve them.
+   */
+  void wait_all_for(double timeout);
+  /** Wait for the completion of all activities in the set. The set is NOT emptied afterward. */
+  void wait_all() { wait_all_for(-1); }
+  /** Returns the first terminated activity if any, or ActivityPtr(nullptr) if no activity is terminated */
+  ActivityPtr test_any();
+
+  /** Wait for the completion of one activity from the set, but not longer than the provided timeout.
+   *
+   *  See wait_any() for details.
+   *
+   * @return the first terminated activity, which is automatically removed from the set.
+   */
+
+  ActivityPtr wait_any_for(double timeout);
+  /** Wait for the completion of one activity from the set.
+   *
+   * If an activity fails during that time, an exception is raised, and the failed exception is marked as failed in the
+   * set. Use get_failed_activity() to retrieve it.
+   *
+   * If more than one activity failed, the other ones are also removed from the set. Use get_failed_activity() several
+   * time to retrieve them all.
+   *
+   * @return the first terminated activity, which is automatically removed from the set. If more than one activity
+   * terminated at the same timestamp, then the other ones are still in the set. Use either test_any() or wait_any() to
+   * retrieve the other ones.
+   */
+  ActivityPtr wait_any() { return wait_any_for(-1); }
+
+  ActivityPtr get_failed_activity();
+  bool has_failed_activities() { return not failed_activities_.empty(); }
+};
+
+} // namespace s4u
+} // namespace simgrid
+
+#endif
index c8e2c8c..ab03651 100644 (file)
@@ -110,7 +110,7 @@ public:
       const std::function<void(void*)>& clean_fun, // used to free the synchro in case of problem after a detached send
       const std::function<void(activity::CommImpl*, void*, size_t)>&
           copy_data_fun, // used to copy data if not default one
-      void* payload, bool detached, std::string fun_call)
+      void* payload, bool detached, std::string_view fun_call)
       : SimcallObserver(actor)
       , mbox_(mbox)
       , payload_size_(payload_size)
@@ -160,7 +160,7 @@ public:
   CommIrecvSimcall(ActorImpl* actor, activity::MailboxImpl* mbox, unsigned char* dst_buff, size_t* dst_buff_size,
                    const std::function<bool(void*, void*, activity::CommImpl*)>& match_fun,
                    const std::function<void(activity::CommImpl*, void*, size_t)>& copy_data_fun, void* payload,
-                   double rate, std::string fun_call)
+                   double rate, std::string_view fun_call)
       : SimcallObserver(actor)
       , mbox_(mbox)
       , dst_buff_(dst_buff)
index 0b665a8..472c571 100644 (file)
@@ -48,7 +48,7 @@ public:
        chosen = xbt::random::uniform_int(0, possibilities-1);
 
     for (auto const& [aid, actor] : actors_to_run_) {
-        if (((not actor.is_todo()) && must_be_todo) || actor.is_done() || (not actor.is_enabled()))
+      if (((not actor.is_todo()) && must_be_todo) || actor.is_done() || (not actor.is_enabled()))
         continue;
       if (chosen == 0) {
         return std::make_pair(aid, valuation.at(aid));
diff --git a/src/s4u/s4u_ActivitySet.cpp b/src/s4u/s4u_ActivitySet.cpp
new file mode 100644 (file)
index 0000000..4d70420
--- /dev/null
@@ -0,0 +1,98 @@
+/* Copyright (c) 2023-. The SimGrid Team. All rights reserved.          */
+
+/* This program is free software; you can redistribute it and/or modify it
+ * under the terms of the license (GNU LGPL) which comes with this package. */
+
+#include "src/kernel/activity/ActivityImpl.hpp"
+#include "src/kernel/actor/ActorImpl.hpp"
+#include "src/kernel/actor/CommObserver.hpp"
+#include <simgrid/s4u/ActivitySet.hpp>
+#include <simgrid/s4u/Engine.hpp>
+
+XBT_LOG_NEW_DEFAULT_SUBCATEGORY(s4u_activityset, s4u_activity, "S4U set of activities");
+
+namespace simgrid {
+
+template class xbt::Extendable<s4u::ActivitySet>;
+
+namespace s4u {
+
+void ActivitySet::erase(ActivityPtr a)
+{
+  for (auto it = activities_.begin(); it != activities_.end(); it++)
+    if (*it == a) {
+      activities_.erase(it);
+      return;
+    }
+}
+
+void ActivitySet::wait_all_for(double timeout)
+{
+  if (timeout < 0.0) {
+    for (const auto& act : activities_)
+      act->wait();
+
+  } else {
+
+    double deadline = Engine::get_clock() + timeout;
+    for (const auto& act : activities_)
+      act->wait_until(deadline);
+  }
+}
+
+ActivityPtr ActivitySet::test_any()
+{
+  std::vector<kernel::activity::ActivityImpl*> act_impls(activities_.size());
+  std::transform(begin(activities_), end(activities_), begin(act_impls),
+                 [](const ActivityPtr& act) { return act->pimpl_.get(); });
+
+  kernel::actor::ActorImpl* issuer = kernel::actor::ActorImpl::self();
+  kernel::actor::ActivityTestanySimcall observer{issuer, act_impls, "test_any"};
+  ssize_t changed_pos = kernel::actor::simcall_answered(
+      [&observer] {
+        return kernel::activity::ActivityImpl::test_any(observer.get_issuer(), observer.get_activities());
+      },
+      &observer);
+  if (changed_pos == -1)
+    return ActivityPtr(nullptr);
+
+  auto ret = activities_.at(changed_pos);
+  erase(ret);
+  ret->complete(Activity::State::FINISHED);
+  return ret;
+}
+
+ActivityPtr ActivitySet::wait_any_for(double timeout)
+{
+  std::vector<kernel::activity::ActivityImpl*> act_impls(activities_.size());
+  std::transform(begin(activities_), end(activities_), begin(act_impls),
+                 [](const ActivityPtr& activity) { return activity->pimpl_.get(); });
+
+  kernel::actor::ActorImpl* issuer = kernel::actor::ActorImpl::self();
+  kernel::actor::ActivityWaitanySimcall observer{issuer, act_impls, timeout, "wait_any_for"};
+  ssize_t changed_pos = kernel::actor::simcall_blocking(
+      [&observer] {
+        kernel::activity::ActivityImpl::wait_any_for(observer.get_issuer(), observer.get_activities(),
+                                                     observer.get_timeout());
+      },
+      &observer);
+  xbt_assert(changed_pos != -1,
+             "ActivityImpl::wait_any_for is not supposed to return -1 but instead to raise exceptions");
+
+  auto ret = activities_.at(changed_pos);
+  erase(ret);
+  ret->complete(Activity::State::FINISHED);
+  return ret;
+}
+
+ActivityPtr ActivitySet::get_failed_activity()
+{
+  if (failed_activities_.empty())
+    return ActivityPtr(nullptr);
+  auto ret = failed_activities_.back();
+  failed_activities_.pop_back();
+  return ret;
+}
+
+} // namespace s4u
+} // namespace simgrid
index 20baa0b..a6ffe30 100644 (file)
@@ -458,6 +458,7 @@ set(PLUGINS_SRC
 
 set(S4U_SRC
   src/s4u/s4u_Activity.cpp
+  src/s4u/s4u_ActivitySet.cpp
   src/s4u/s4u_Actor.cpp
   src/s4u/s4u_Barrier.cpp
   src/s4u/s4u_Comm.cpp
@@ -677,6 +678,7 @@ set(headers_to_install
   include/simgrid/vm.h
   include/simgrid/zone.h
   include/simgrid/s4u/Activity.hpp
+  include/simgrid/s4u/ActivitySet.hpp
   include/simgrid/s4u/Actor.hpp
   include/simgrid/s4u/Barrier.hpp
   include/simgrid/s4u/Comm.hpp
index 9c8fc76..a9d9d64 100755 (executable)
@@ -2,6 +2,13 @@
 
 # This script is used by various build projects on Jenkins
 
+case "$JENKINS_HOME" in
+*-qualif)
+  echo "Build skipped on $JENKINS_HOME."
+  exit 0
+  ;;
+esac
+
 # See https://ci.inria.fr/simgrid/job/SimGrid/configure
 # See https://ci.inria.fr/simgrid/job/Simgrid-Windows/configure