Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
Deprecate/remove Comm::wait_any and Comm::wait_any_for
authorMartin Quinson <martin.quinson@ens-rennes.fr>
Tue, 25 Jul 2023 18:31:02 +0000 (20:31 +0200)
committerMartin Quinson <martin.quinson@ens-rennes.fr>
Tue, 25 Jul 2023 18:31:02 +0000 (20:31 +0200)
ChangeLog
examples/python/comm-failure/comm-failure.py
examples/python/network-nonlinear/network-nonlinear.py
examples/python/platform-comm-serialize/platform-comm-serialize.py
include/simgrid/s4u/Comm.hpp
src/bindings/python/simgrid_python.cpp
src/s4u/s4u_Comm.cpp
teshsuite/s4u/CMakeLists.txt
teshsuite/s4u/activity-lifecycle/testing_comm.cpp
teshsuite/s4u/wait-any-for/wait-any-for.cpp [deleted file]
teshsuite/s4u/wait-any-for/wait-any-for.tesh [deleted file]

index 5e8851b..b35e98a 100644 (file)
--- a/ChangeLog
+++ b/ChangeLog
@@ -16,7 +16,7 @@ SMPI:
 Python:
  - Make the host_load plugin available from Python. See examples/python/plugin-host-load
  - Mailbox::get_async() does not return a pair anymore. Use comm.get_payload() instead.
- - Comm::waitall/testany() are gone. Please use ActivitySet() instead.
+ - Comm::waitall/waitany/testany() are gone. Please use ActivitySet() instead.
  - Comm::waitallfor() is gone too. Its semantic was unclear on timeout anyway.
  - Io::waitany() and waitanyfor() are gone. Please use ActivitySet() instead.
 
index c0ccb67..02cd701 100644 (file)
@@ -5,7 +5,7 @@
 
 import sys
 
-from simgrid import Engine, Actor, Comm, NetZone, Link, LinkInRoute, Mailbox, this_actor, NetworkFailureException
+from simgrid import Engine, Actor, ActivitySet, Comm, NetZone, Link, LinkInRoute, Mailbox, this_actor, NetworkFailureException
 
 
 def sender(mailbox1_name: str, mailbox2_name: str) -> None:
@@ -19,10 +19,10 @@ def sender(mailbox1_name: str, mailbox2_name: str) -> None:
     comm2: Comm = mailbox2.put_async(666, 2)
 
     this_actor.info("Calling wait_any..")
-    pending_comms = [comm1, comm2]
+    pending_comms = ActivitySet([comm1, comm2])
     try:
-        index = Comm.wait_any([comm1, comm2])
-        this_actor.info(f"Wait any returned index {index} (comm to {pending_comms[index].mailbox.name})")
+        comm = pending_comms.wait_any()
+        this_actor.info(f"Wait any returned a comm to {comm.mailbox.name})")
     except NetworkFailureException:
         this_actor.info("Sender has experienced a network failure exception, so it knows that something went wrong")
         this_actor.info("Now it needs to figure out which of the two comms failed by looking at their state:")
@@ -36,9 +36,8 @@ def sender(mailbox1_name: str, mailbox2_name: str) -> None:
         this_actor.info(f"Waiting on a FAILED comm raises an exception: '{err}'")
 
     this_actor.info("Wait for remaining comm, just to be nice")
-    pending_comms.pop(0)
     try:
-        Comm.wait_any(pending_comms)
+        pending_comms.wait_all()
     except Exception as e:
         this_actor.warning(str(e))
 
index 10cb74b..40df779 100644 (file)
@@ -50,18 +50,16 @@ class Receiver:
     def __call__(self):
         mbox = Mailbox.by_name("receiver")
 
-        pending_comms = []
+        pending_comms = ActivitySet()
 
         this_actor.info("Wait for %d messages asynchronously" % self.msg_count)
         for _ in range(self.msg_count):
             comm = mbox.get_async()
-            pending_comms.append(comm)
+            pending_comms.push(comm)
 
-        while pending_comms:
-            index = Comm.wait_any(pending_comms)
-            msg = pending_comms[index].get_payload()
-            this_actor.info("I got '%s'." % msg)
-            del pending_comms[index]
+        while not pending_comms.empty():
+            comm = pending_comms.wait_any()
+            this_actor.info("I got '%s'." % comm.get_payload())
 
 ####################################################################################################
 def link_nonlinear(link: Link, capacity: float, n: int) -> float:
index dd289c2..f221f49 100644 (file)
@@ -44,14 +44,13 @@ class Receiver(object):
 
     def __call__(self):
         # List in which we store all incoming msgs
-        pending_comms: List[Comm] = []
+        pending_comms = ActivitySet()
         this_actor.info(f"Wait for {self.messages_count} messages asynchronously")
         for _ in range(self.messages_count):
-            pending_comms.append(self.mailbox.get_async())
-        while pending_comms:
-            index = Comm.wait_any(pending_comms)
-            this_actor.info(f"I got '{pending_comms[index].get_payload()}'.")
-            pending_comms.pop(index)
+            pending_comms.push(self.mailbox.get_async())
+        while not pending_comms.empty():
+            comm = pending_comms.wait_any()
+            this_actor.info(f"I got '{comm.get_payload()}'.")
 
 
 def main():
index f730cf0..cf4cf5a 100644 (file)
@@ -185,8 +185,8 @@ public:
   Comm* wait_for(double timeout) override;
 
 #ifndef DOXYGEN
-  static ssize_t wait_any(const std::vector<CommPtr>& comms) { return deprecated_wait_any_for(comms, -1); }
-  static ssize_t wait_any_for(const std::vector<CommPtr>& comms, double timeout) { return deprecated_wait_any_for(comms, timeout); }
+  XBT_ATTRIB_DEPRECATED_v339("Please use ActivitySet instead") static ssize_t wait_any(const std::vector<CommPtr>& comms) { return deprecated_wait_any_for(comms, -1); }
+  XBT_ATTRIB_DEPRECATED_v339("Please use ActivitySet instead") static ssize_t wait_any_for(const std::vector<CommPtr>& comms, double timeout) { return deprecated_wait_any_for(comms, timeout); }
 
   static ssize_t deprecated_wait_any_for(const std::vector<CommPtr>& comms, double timeout);
 
index d59e9c5..792ae4b 100644 (file)
@@ -689,14 +689,7 @@ PYBIND11_MODULE(simgrid, m)
                   py::arg("to"), py::arg("simulated_size_in_bytes"),
                   "Do a blocking communication between two arbitrary hosts.\n\nThis initializes a communication that "
                   "completely bypass the mailbox and actors mechanism. There is really no limit on the hosts involved. "
-                  "In particular, the actor does not have to be on one of the involved hosts.")
-      .def_static("wait_any", &Comm::wait_any, py::call_guard<py::gil_scoped_release>(), py::arg("comms"),
-                  "Block until the completion of any communication in the list and return the index of the "
-                  "terminated one.")
-      .def_static("wait_any_for", &Comm::wait_any_for, py::call_guard<py::gil_scoped_release>(), py::arg("comms"),
-                  py::arg("timeout"),
-                  "Block until the completion of any communication in the list and return the index of the terminated "
-                  "one, or -1 if a timeout occurred.");
+                  "In particular, the actor does not have to be on one of the involved hosts.");
 
   /* Class Io */
   py::class_<simgrid::s4u::Io, simgrid::s4u::IoPtr, Activity>(m, "Io",
index 64e9ea3..3ddc1d6 100644 (file)
@@ -395,7 +395,7 @@ Comm* Comm::detach()
   return this;
 }
 
-ssize_t Comm::test_any(const std::vector<CommPtr>& comms)
+ssize_t Comm::test_any(const std::vector<CommPtr>& comms) // XBT_ATTRIB_DEPRECATED_v339
 {
   std::vector<ActivityPtr> activities;
   for (const auto& comm : comms)
@@ -461,7 +461,7 @@ Comm* Comm::wait_for(double timeout)
   return this;
 }
 
-ssize_t Comm::deprecated_wait_any_for(const std::vector<CommPtr>& comms, double timeout)
+ssize_t Comm::deprecated_wait_any_for(const std::vector<CommPtr>& comms, double timeout) // XBT_ATTRIB_DEPRECATED_v339
 {
   std::vector<ActivityPtr> activities;
   for (const auto& comm : comms)
@@ -496,18 +496,12 @@ size_t Comm::wait_all_for(const std::vector<CommPtr>& comms, double timeout) //
     return comms.size();
   }
 
-  double deadline = Engine::get_clock() + timeout;
-  std::vector<CommPtr> waited_comm(1, nullptr);
-  for (size_t i = 0; i < comms.size(); i++) {
-    double wait_timeout = std::max(0.0, deadline - Engine::get_clock());
-    waited_comm[0]      = comms[i];
-    // Using wait_any_for() here (and not wait_for) because we don't want comms to be invalidated on timeout
-    if (wait_any_for(waited_comm, wait_timeout) == -1) {
-      XBT_DEBUG("Timeout (%g): i = %zu", wait_timeout, i);
-      return i;
-    }
-  }
-  return comms.size();
+  ActivitySet set;
+  for (auto comm : comms)
+    set.push(comm);
+  set.wait_all_for(timeout);
+
+  return set.size();
 }
 } // namespace simgrid::s4u
 /* **************************** Public C interface *************************** */
index 80f46da..a21866a 100644 (file)
@@ -6,7 +6,7 @@ endforeach()
 
 foreach(x actor actor-autorestart actor-suspend
         activity-lifecycle
-        comm-get-sender comm-pt2pt comm-fault-scenarios wait-any-for
+        comm-get-sender comm-pt2pt comm-fault-scenarios
         cloud-interrupt-migration cloud-two-execs
        monkey-masterworkers monkey-semaphore
         concurrent_rw
@@ -39,7 +39,7 @@ set_property(TARGET activity-lifecycle APPEND PROPERTY INCLUDE_DIRECTORIES "${IN
 
 ## Add the tests.
 ## Some need to be run with all factories, some don't need tesh to run
-foreach(x actor actor-autorestart actor-suspend activity-lifecycle comm-get-sender wait-any-for
+foreach(x actor actor-autorestart actor-suspend activity-lifecycle comm-get-sender
         cloud-interrupt-migration cloud-two-execs concurrent_rw dag-incomplete-simulation dependencies io-set-bw io-stream
              vm-live-migration vm-suicide)
   set(tesh_files    ${tesh_files}    ${CMAKE_CURRENT_SOURCE_DIR}/${x}/${x}.tesh)
index f485fc8..f00b25b 100644 (file)
@@ -312,9 +312,9 @@ TEST_CASE("Activity lifecycle: comm activities")
     simgrid::s4u::ActorPtr receiver = simgrid::s4u::Actor::create("receiver", all_hosts[1], []() {
       assert_exit(true, 2);
       int* data;
-      simgrid::s4u::CommPtr comm                       = simgrid::s4u::Mailbox::by_name("mb")->get_async<int>(&data);
-      std::vector<simgrid::s4u::CommPtr> pending_comms = {comm};
-      REQUIRE_NETWORK_FAILURE(simgrid::s4u::Comm::wait_any(pending_comms));
+      simgrid::s4u::CommPtr comm = simgrid::s4u::Mailbox::by_name("mb")->get_async<int>(&data);
+      simgrid::s4u::ActivitySet pending_comms({comm});
+      REQUIRE_NETWORK_FAILURE(pending_comms.wait_any());
     });
 
     simgrid::s4u::ActorPtr sender = simgrid::s4u::Actor::create("sender", all_hosts[2], []() {
diff --git a/teshsuite/s4u/wait-any-for/wait-any-for.cpp b/teshsuite/s4u/wait-any-for/wait-any-for.cpp
deleted file mode 100644 (file)
index 8eae452..0000000
+++ /dev/null
@@ -1,55 +0,0 @@
-/* Copyright (c) 2019-2023. The SimGrid Team. All rights reserved.          */
-
-/* This program is free software; you can redistribute it and/or modify it
- * under the terms of the license (GNU LGPL) which comes with this package. */
-
-#include <cstdlib>
-#include <iostream>
-#include <string>
-#include <simgrid/s4u.hpp>
-
-XBT_LOG_NEW_DEFAULT_CATEGORY(meh, "meh");
-
-static void worker()
-{
-  auto* mbox = simgrid::s4u::Mailbox::by_name("meh");
-  int input1 = 42;
-  int input2 = 51;
-
-  XBT_INFO("Sending and receiving %d and %d asynchronously", input1, input2);
-
-  auto put1 = mbox->put_async(&input1, 1000 * 1000 * 500);
-  auto put2 = mbox->put_async(&input2, 1000 * 1000 * 1000);
-
-  int* out1;
-  auto get1 = mbox->get_async<int>(&out1);
-
-  int* out2;
-  auto get2 = mbox->get_async<int>(&out2);
-
-  XBT_INFO("All comms have started");
-  std::vector<simgrid::s4u::CommPtr> comms = {put1, put2, get1, get2};
-
-  while (not comms.empty()) {
-    ssize_t index = simgrid::s4u::Comm::wait_any_for(comms, 0.5);
-    if (index < 0)
-      XBT_INFO("wait_any_for: Timeout reached");
-    else {
-      XBT_INFO("wait_any_for: A comm finished (index=%zd, #comms=%zu)", index, comms.size());
-      comms.erase(comms.begin() + index);
-    }
-  }
-
-  XBT_INFO("All comms have finished");
-  XBT_INFO("Got %d and %d", *out1, *out2);
-}
-
-int main(int argc, char* argv[])
-
-{
-  simgrid::s4u::Engine e(&argc, argv);
-  e.load_platform(argv[1]);
-  simgrid::s4u::Actor::create("worker", e.host_by_name("Tremblay"), worker);
-  e.run();
-  return 0;
-}
diff --git a/teshsuite/s4u/wait-any-for/wait-any-for.tesh b/teshsuite/s4u/wait-any-for/wait-any-for.tesh
deleted file mode 100644 (file)
index 87ae347..0000000
+++ /dev/null
@@ -1,18 +0,0 @@
-#!/usr/bin/env tesh
-
-p Testing the wait_any_for feature of S4U
-
-! output sort 19
-$ ${bindir:=.}/wait-any-for ${platfdir:=.}/small_platform.xml "--log=root.fmt:[%10.6r]%e(%i:%a@%h)%e%m%n"
-> [  0.000000] (1:worker@Tremblay) Sending and receiving 42 and 51 asynchronously
-> [  0.000000] (1:worker@Tremblay) All comms have started
-> [  0.500000] (1:worker@Tremblay) wait_any_for: Timeout reached
-> [  1.000000] (1:worker@Tremblay) wait_any_for: Timeout reached
-> [  1.035263] (1:worker@Tremblay) wait_any_for: A comm finished (index=0, #comms=4)
-> [  1.035263] (1:worker@Tremblay) wait_any_for: A comm finished (index=1, #comms=3)
-> [  1.535263] (1:worker@Tremblay) wait_any_for: Timeout reached
-> [  2.035263] (1:worker@Tremblay) wait_any_for: Timeout reached
-> [  2.070331] (1:worker@Tremblay) wait_any_for: A comm finished (index=0, #comms=2)
-> [  2.070331] (1:worker@Tremblay) wait_any_for: A comm finished (index=0, #comms=1)
-> [  2.070331] (1:worker@Tremblay) All comms have finished
-> [  2.070331] (1:worker@Tremblay) Got 42 and 51