Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
Deprecate Comm::wait_all(). Remove it in python
authorMartin Quinson <martin.quinson@ens-rennes.fr>
Mon, 24 Jul 2023 22:30:54 +0000 (00:30 +0200)
committerMartin Quinson <martin.quinson@ens-rennes.fr>
Mon, 24 Jul 2023 22:30:54 +0000 (00:30 +0200)
15 files changed:
ChangeLog
examples/cpp/clusters-multicpu/s4u-clusters-multicpu.cpp
examples/python/CMakeLists.txt
examples/python/clusters-multicpu/clusters-multicpu.py
examples/python/comm-ready/comm-ready.py
examples/python/comm-waitall/comm-waitall.py [deleted file]
examples/python/comm-waitall/comm-waitall.tesh [deleted file]
examples/python/network-nonlinear/network-nonlinear.py
examples/python/platform-comm-serialize/platform-comm-serialize.py
include/simgrid/s4u/Comm.hpp
src/bindings/python/simgrid_python.cpp
src/s4u/s4u_Comm.cpp
teshsuite/models/issue105/issue105.cpp
teshsuite/s4u/CMakeLists.txt
teshsuite/s4u/seal-platform/seal-platform.cpp

index 158d75b..2074ed7 100644 (file)
--- a/ChangeLog
+++ b/ChangeLog
@@ -15,6 +15,7 @@ SMPI:
 Python:
  - Make the host_load plugin available from Python. See examples/python/plugin-host-load
  - Mailbox::get_async() does not return a pair anymore. Use comm.get_payload() instead.
+ - Comm::waitall() is gone. Please use ActivitySet() instead.
 
 ----------------------------------------------------------------------------
 
index b637a23..2a98162 100644 (file)
@@ -24,7 +24,7 @@ public:
   void operator()() const
   {
     /* Vector in which we store all ongoing communications */
-    std::vector<sg4::CommPtr> pending_comms;
+    sg4::ActivitySet pending_comms;
 
     /* Make a vector of the mailboxes to use */
     std::vector<sg4::Mailbox*> mboxes;
@@ -40,13 +40,13 @@ public:
       auto* mbox = sg4::Mailbox::by_name(host->get_name());
       mboxes.push_back(mbox);
       sg4::CommPtr comm = mbox->put_async(payload, msg_size);
-      pending_comms.push_back(comm);
+      pending_comms.push(comm);
     }
 
     XBT_INFO("Done dispatching all messages");
 
     /* Now that all message exchanges were initiated, wait for their completion in one single call */
-    sg4::Comm::wait_all(pending_comms);
+    pending_comms.wait_all();
 
     XBT_INFO("Goodbye now!");
   }
index 265268b..c27e830 100644 (file)
@@ -1,7 +1,7 @@
 foreach(example actor-create actor-daemon actor-join actor-kill actor-migrate actor-suspend actor-yield actor-lifetime
         activityset-testany activityset-waitall activityset-waitallfor activityset-waitany
         app-masterworkers
-        comm-wait comm-waitall comm-waitallfor comm-waitany comm-failure comm-host2host comm-pingpong
+        comm-wait comm-waitallfor comm-waitany comm-failure comm-host2host comm-pingpong
         comm-ready comm-suspend comm-testany comm-throttling comm-waitallfor comm-waituntil
         exec-async exec-basic exec-dvfs exec-remote exec-ptask
         task-io task-simple task-switch-host task-variable-load
index 79ee80c..b48cb38 100644 (file)
@@ -28,19 +28,19 @@ class Sender:
     # Actors that are created as object will execute their __call__ method.
     # So, the following constitutes the main function of the Sender actor.
     def __call__(self):
-        pending_comms = []
+        pending_comms = simgrid.ActivitySet()
         mboxes = []
 
         for host in self.hosts:
             msg = "Hello, I'm alive and running on " + simgrid.this_actor.get_host().name
             mbox = simgrid.Mailbox.by_name(host.name)
             mboxes.append(mbox)
-            pending_comms.append(mbox.put_async(msg, self.msg_size))
+            pending_comms.push(mbox.put_async(msg, self.msg_size))
 
         simgrid.this_actor.info("Done dispatching all messages")
 
         # Now that all message exchanges were initiated, wait for their completion in one single call
-        simgrid.Comm.wait_all(pending_comms)
+        pending_comms.wait_all()
 
         simgrid.this_actor.info("Goodbye now!")
 
index fa5f91e..f874bd9 100644 (file)
@@ -7,7 +7,7 @@ from argparse import ArgumentParser
 from typing import List
 import sys
 
-from simgrid import Actor, Comm, Engine, Mailbox, this_actor
+from simgrid import Actor, ActivitySet, Comm, Engine, Mailbox, this_actor
 
 
 FINALIZE_MESSAGE = "finalize"
@@ -31,7 +31,7 @@ def get_peer_mailbox(peer_id: int) -> Mailbox:
 def peer(my_id: int, message_count: int, payload_size: int, peers_count: int):
     my_mailbox: Mailbox = get_peer_mailbox(my_id)
     my_mailbox.set_receiver(Actor.self())
-    pending_comms: List[Comm] = []
+    pending_comms = ActivitySet()
     # Start dispatching all messages to peers others that myself
     for i in range(message_count):
         for peer_id in range(peers_count):
@@ -39,14 +39,14 @@ def peer(my_id: int, message_count: int, payload_size: int, peers_count: int):
                 peer_mailbox = get_peer_mailbox(peer_id)
                 message = f"Message {i} from peer {my_id}"
                 this_actor.info(f"Send '{message}' to '{peer_mailbox.name}'")
-                pending_comms.append(peer_mailbox.put_async(message, payload_size))
+                pending_comms.push(peer_mailbox.put_async(message, payload_size))
 
     # Start sending messages to let peers know that they should stop
     for peer_id in range(peers_count):
         if peer_id != my_id:
             peer_mailbox = get_peer_mailbox(peer_id)
             payload = str(FINALIZE_MESSAGE)
-            pending_comms.append(peer_mailbox.put_async(payload, payload_size))
+            pending_comms.push(peer_mailbox.put_async(payload, payload_size))
             this_actor.info(f"Send '{payload}' to '{peer_mailbox.name}'")
     this_actor.info("Done dispatching all messages")
 
@@ -69,7 +69,7 @@ def peer(my_id: int, message_count: int, payload_size: int, peers_count: int):
             this_actor.sleep_for(0.01)
 
     this_actor.info("I'm done, just waiting for my peers to receive the messages before exiting")
-    Comm.wait_all(pending_comms)
+    pending_comms.wait_all()
     this_actor.info("Goodbye now!")
 
 
diff --git a/examples/python/comm-waitall/comm-waitall.py b/examples/python/comm-waitall/comm-waitall.py
deleted file mode 100644 (file)
index 68dacca..0000000
+++ /dev/null
@@ -1,74 +0,0 @@
-# Copyright (c) 2010-2023. The SimGrid Team. All rights reserved.
-#
-# This program is free software; you can redistribute it and/or modify it
-# under the terms of the license (GNU LGPL) which comes with this package.
-
-"""
-This example shows how to block on the completion of a set of communications.
-
-As for the other asynchronous examples, the sender initiate all the messages it wants to send and
-pack the resulting simgrid.Comm objects in a list. All messages thus occur concurrently.
-
-The sender then blocks until all ongoing communication terminate, using simgrid.Comm.wait_all()
-"""
-
-import sys
-from simgrid import Actor, Comm, Engine, Host, Mailbox, this_actor
-
-
-def sender(messages_count, msg_size, receivers_count):
-    # List in which we store all ongoing communications
-    pending_comms = []
-
-    # Vector of the used mailboxes
-    mboxes = [Mailbox.by_name("receiver-{:d}".format(i))
-              for i in range(0, receivers_count)]
-
-    # Start dispatching all messages to receivers, in a round robin fashion
-    for i in range(0, messages_count):
-        content = "Message {:d}".format(i)
-        mbox = mboxes[i % receivers_count]
-
-        this_actor.info("Send '{:s}' to '{:s}'".format(content, str(mbox)))
-
-        # Create a communication representing the ongoing communication, and store it in pending_comms
-        comm = mbox.put_async(content, msg_size)
-        pending_comms.append(comm)
-
-    # Start sending messages to let the workers know that they should stop
-    for i in range(0, receivers_count):
-        mbox = mboxes[i]
-        this_actor.info("Send 'finalize' to '{:s}'".format(str(mbox)))
-        comm = mbox.put_async("finalize", 0)
-        pending_comms.append(comm)
-
-    this_actor.info("Done dispatching all messages")
-
-    # Now that all message exchanges were initiated, wait for their completion in one single call
-    Comm.wait_all(pending_comms)
-
-    this_actor.info("Goodbye now!")
-
-
-def receiver(my_id):
-    mbox = Mailbox.by_name("receiver-{:d}".format(my_id))
-
-    this_actor.info("Wait for my first message")
-    while True:
-        received = mbox.get()
-        this_actor.info("I got a '{:s}'.".format(received))
-        if received == "finalize":
-            break  # If it's a finalize message, we're done.
-
-
-if __name__ == '__main__':
-    e = Engine(sys.argv)
-
-    # Load the platform description
-    e.load_platform(sys.argv[1])
-
-    Actor.create("sender", Host.by_name("Tremblay"), sender, 5, 1000000, 2)
-    Actor.create("receiver", Host.by_name("Ruby"), receiver, 0)
-    Actor.create("receiver", Host.by_name("Perl"), receiver, 1)
-
-    e.run()
diff --git a/examples/python/comm-waitall/comm-waitall.tesh b/examples/python/comm-waitall/comm-waitall.tesh
deleted file mode 100644 (file)
index 32ca46c..0000000
+++ /dev/null
@@ -1,21 +0,0 @@
-#!/usr/bin/env tesh
-
-$ ${pythoncmd:=python3} ${PYTHON_TOOL_OPTIONS:=} ${bindir:=.}/comm-waitall.py ${platfdir}/small_platform_fatpipe.xml "--log=root.fmt:[%10.6r]%e(%i:%a@%h)%e%m%n"
-> [  0.000000] (2:receiver@Ruby) Wait for my first message
-> [  0.000000] (3:receiver@Perl) Wait for my first message
-> [  0.000000] (1:sender@Tremblay) Send 'Message 0' to 'Mailbox(receiver-0)'
-> [  0.000000] (1:sender@Tremblay) Send 'Message 1' to 'Mailbox(receiver-1)'
-> [  0.000000] (1:sender@Tremblay) Send 'Message 2' to 'Mailbox(receiver-0)'
-> [  0.000000] (1:sender@Tremblay) Send 'Message 3' to 'Mailbox(receiver-1)'
-> [  0.000000] (1:sender@Tremblay) Send 'Message 4' to 'Mailbox(receiver-0)'
-> [  0.000000] (1:sender@Tremblay) Send 'finalize' to 'Mailbox(receiver-0)'
-> [  0.000000] (1:sender@Tremblay) Send 'finalize' to 'Mailbox(receiver-1)'
-> [  0.000000] (1:sender@Tremblay) Done dispatching all messages
-> [  0.004022] (2:receiver@Ruby) I got a 'Message 0'.
-> [  0.004022] (3:receiver@Perl) I got a 'Message 1'.
-> [  0.008043] (2:receiver@Ruby) I got a 'Message 2'.
-> [  0.008043] (3:receiver@Perl) I got a 'Message 3'.
-> [  0.009995] (3:receiver@Perl) I got a 'finalize'.
-> [  0.012065] (2:receiver@Ruby) I got a 'Message 4'.
-> [  0.014016] (2:receiver@Ruby) I got a 'finalize'.
-> [  0.014016] (1:sender@Tremblay) Goodbye now!
index 96b0981..10cb74b 100644 (file)
@@ -9,7 +9,7 @@ This example shows how to simulate a non-linear resource sharing for network lin
 
 import functools
 import sys
-from simgrid import Actor, Engine, Comm, Mailbox, NetZone, Link, LinkInRoute, this_actor
+from simgrid import Actor, ActivitySet, Engine, Comm, Mailbox, NetZone, Link, LinkInRoute, this_actor
 
 class Sender:
     """
@@ -22,7 +22,7 @@ class Sender:
     # Actors that are created as object will execute their __call__ method.
     # So, the following constitutes the main function of the Sender actor.
     def __call__(self):
-        pending_comms = []
+        pending_comms = ActivitySet()
         mbox = Mailbox.by_name("receiver")
 
         for i in range(self.msg_count):
@@ -30,12 +30,12 @@ class Sender:
             size = self.msg_size * (i + 1)
             this_actor.info("Send '%s' to '%s, msg size: %d'" % (msg, mbox.name, size))
             comm = mbox.put_async(msg, size)
-            pending_comms.append(comm)
+            pending_comms.push(comm)
 
         this_actor.info("Done dispatching all messages")
 
         # Now that all message exchanges were initiated, wait for their completion in one single call
-        Comm.wait_all(pending_comms)
+        pending_comms.wait_all()
 
         this_actor.info("Goodbye now!")
 
index eaa880c..dd289c2 100644 (file)
@@ -6,7 +6,7 @@
 from typing import List, Tuple
 import sys
 
-from simgrid import Engine, Actor, Comm, Host, LinkInRoute, Mailbox, NetZone, this_actor
+from simgrid import Engine, Actor, ActivitySet, Comm, Host, LinkInRoute, Mailbox, NetZone, this_actor
 
 
 RECEIVER_MAILBOX_NAME = "receiver"
@@ -19,7 +19,7 @@ class Sender(object):
 
     def __call__(self) -> None:
         # List in which we store all ongoing communications
-        pending_comms: List[Comm] = []
+        pending_comms = ActivitySet()
 
         # Make a vector of the mailboxes to use
         receiver_mailbox: Mailbox = Mailbox.by_name(RECEIVER_MAILBOX_NAME)
@@ -27,12 +27,12 @@ class Sender(object):
             message_content = f"Message {i}"
             this_actor.info(f"Send '{message_content}' to '{receiver_mailbox.name}'")
             # Create a communication representing the ongoing communication, and store it in pending_comms
-            pending_comms.append(receiver_mailbox.put_async(message_content, self.message_size))
+            pending_comms.push(receiver_mailbox.put_async(message_content, self.message_size))
 
         this_actor.info("Done dispatching all messages")
 
         # Now that all message exchanges were initiated, wait for their completion in one single call
-        Comm.wait_all(pending_comms)
+        pending_comms.wait_all()
 
         this_actor.info("Goodbye now!")
 
index c8a0156..b9dcadc 100644 (file)
@@ -193,11 +193,13 @@ public:
   /*! \static Same as wait_any, but with a timeout. Return -1 if the timeout occurs.*/
   static ssize_t wait_any_for(const std::vector<CommPtr>& comms, double timeout);
 
-  /*! \static take a vector s4u::CommPtr and return when all of them is finished. */
-  static void wait_all(const std::vector<CommPtr>& comms);
   /*! \static Same as wait_all, but with a timeout. Return the number of terminated comm (less than comms.size() if
    *  the timeout occurs). */
   static size_t wait_all_for(const std::vector<CommPtr>& comms, double timeout);
+
+#ifndef DOXYGEN
+  XBT_ATTRIB_DEPRECATED_v339("Please use ActivitySet instead") static void wait_all(const std::vector<CommPtr>& comms);
+#endif
 };
 } // namespace simgrid::s4u
 
index 04f0a16..9d2ee6c 100644 (file)
@@ -692,8 +692,6 @@ PYBIND11_MODULE(simgrid, m)
                   "In particular, the actor does not have to be on one of the involved hosts.")
       .def_static("test_any", &Comm::test_any, py::call_guard<py::gil_scoped_release>(), py::arg("comms"),
                   "take a vector s4u::CommPtr and return the rank of the first finished one (or -1 if none is done)")
-      .def_static("wait_all", &Comm::wait_all, py::call_guard<py::gil_scoped_release>(), py::arg("comms"),
-                  "Block until the completion of all communications in the list.")
       .def_static("wait_all_for", &Comm::wait_all_for, py::call_guard<py::gil_scoped_release>(), py::arg("comms"),
                   py::arg("timeout"),
                   "Block until the completion of all communications in the list, or raises TimeoutException after "
index 404321a..25d2430 100644 (file)
@@ -481,7 +481,7 @@ ssize_t Comm::wait_any_for(const std::vector<CommPtr>& comms, double timeout)
   return changed_pos;
 }
 
-void Comm::wait_all(const std::vector<CommPtr>& comms)
+void Comm::wait_all(const std::vector<CommPtr>& comms) // XBT_ATTRIB_DEPRECATED_v339
 {
   // TODO: this should be a simcall or something
   for (const auto& comm : comms)
@@ -491,7 +491,8 @@ void Comm::wait_all(const std::vector<CommPtr>& comms)
 size_t Comm::wait_all_for(const std::vector<CommPtr>& comms, double timeout)
 {
   if (timeout < 0.0) {
-    wait_all(comms);
+    for (const auto& comm : comms)
+      comm->wait();
     return comms.size();
   }
 
index 940248f..c9df586 100644 (file)
@@ -15,13 +15,13 @@ namespace sg4 = simgrid::s4u;
 XBT_LOG_NEW_DEFAULT_CATEGORY(issue105, "Issue105");
 static void load_generator(sg4::Mailbox* mailbox)
 {
-  std::vector<sg4::CommPtr> comms;
+  sg4::ActivitySet comms;
 
   // Send the task messages
   for (int i = 0; i < 100; i++) {
     auto* payload     = new int(i);
     sg4::CommPtr comm = mailbox->put_async(payload, 1024);
-    comms.push_back(comm);
+    comms.push(comm);
     sg4::this_actor::sleep_for(1.0);
   }
 
@@ -29,10 +29,10 @@ static void load_generator(sg4::Mailbox* mailbox)
   auto* payload     = new int(-1);
   sg4::CommPtr comm = mailbox->put_async(payload, 1024);
   XBT_INFO("Sent shutdown");
-  comms.push_back(comm);
+  comms.push(comm);
 
   // Wait for all messages to be consumed before ending the simulation
-  sg4::Comm::wait_all(comms);
+  comms.wait_all();
   XBT_INFO("Load generator finished");
 }
 
index 0ea50b2..56d9e61 100644 (file)
@@ -6,7 +6,7 @@ endforeach()
 
 foreach(x actor actor-autorestart actor-suspend
         activity-lifecycle
-        comm-get-sender comm-pt2pt comm-fault-scenarios wait-all-for
+        comm-get-sender comm-pt2pt comm-fault-scenarios wait-all-for wait-any-for
         cloud-interrupt-migration cloud-two-execs
        monkey-masterworkers monkey-semaphore
         concurrent_rw
index 6d49d7d..3b7d79a 100644 (file)
@@ -17,7 +17,7 @@ public:
   void operator()() const
   {
     /* Vector in which we store all ongoing communications */
-    std::vector<sg4::CommPtr> pending_comms;
+    sg4::ActivitySet pending_comms;
     /* Make a vector of the mailboxes to use */
     std::vector<sg4::Mailbox*> mboxes;
 
@@ -28,13 +28,13 @@ public:
       auto* mbox = sg4::Mailbox::by_name(host->get_name());
       mboxes.push_back(mbox);
       sg4::CommPtr comm = mbox->put_async(payload, msg_size);
-      pending_comms.push_back(comm);
+      pending_comms.push(comm);
     }
 
     XBT_INFO("Done dispatching all messages");
 
     /* Now that all message exchanges were initiated, wait for their completion in one single call */
-    sg4::Comm::wait_all(pending_comms);
+    pending_comms.wait_all();
 
     XBT_INFO("Goodbye now!");
   }