Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
Added Python bindings: Comm.wait_for() and Comm.wait_any_for()
author Jean-Edouard BOULANGER <jean.edouard.boulanger@gmail.com>
Sun, 6 Mar 2022 09:19:23 +0000 (10:19 +0100)
committer Jean-Edouard BOULANGER <jean.edouard.boulanger@gmail.com>
Sun, 6 Mar 2022 09:19:23 +0000 (10:19 +0100)
ChangeLog
docs/source/app_s4u.rst
examples/README.rst
examples/python/CMakeLists.txt
examples/python/comm-waitfor/comm-waitfor.py [new file with mode: 0644]
examples/python/comm-waitfor/comm-waitfor.tesh [new file with mode: 0644]
src/bindings/python/simgrid_python.cpp

index 25be7b7..9f836d8 100644 (file)
--- a/ChangeLog
+++ b/ChangeLog
@@ -35,6 +35,10 @@ New plugin: the Chaos Monkey (killing actors at any time)
 XBT:
  - Drop xbt_dynar_shrink().
 
+Python:
+ - Added the following bindings: Comm.wait_for() and Comm.wait_any_for()
+   Example: examples/python/comm-waitfor/
+
 Fixed bugs (FG#.. -> FramaGit bugs; FG!.. -> FG merge requests)
  (FG: issues on Framagit; GH: issues on GitHub)
  - FG#57: Mc SimGrid should test whether ptrace is usable
index 2ff9d3a..35c7f24 100644 (file)
@@ -2213,8 +2213,10 @@ also start direct communications as shown below.
 
       .. automethod:: simgrid.Comm.test
       .. automethod:: simgrid.Comm.wait
+      .. automethod:: simgrid.Comm.wait_for
       .. automethod:: simgrid.Comm.wait_all
       .. automethod:: simgrid.Comm.wait_any
+      .. automethod:: simgrid.Comm.wait_any_for
 
    .. group-tab:: C
 
index dc43b35..c11029c 100644 (file)
@@ -353,6 +353,10 @@ This example is very similar to the previous one, simply adding how to declare t
 
       See also :cpp:func:`simgrid::s4u::Activity::wait_until()` and :cpp:func:`simgrid::s4u::Comm::wait_for()`.
 
+   .. example-tab:: examples/python/comm-waitfor/comm-waitfor.py
+
+      See also :py:func:`simgrid.Comm.wait_for()` and :py:func:`simgrid.Comm.wait_any_for()`
+
 Suspending communications
 ^^^^^^^^^^^^^^^^^^^^^^^^^
 
index 0ad9767..cca4bd3 100644 (file)
@@ -1,6 +1,6 @@
 foreach(example actor-create actor-daemon actor-join actor-kill actor-migrate actor-suspend actor-yield actor-lifetime
         app-masterworkers
-        comm-wait comm-waitall comm-waitany
+        comm-wait comm-waitall comm-waitany comm-waitfor
         exec-async exec-basic exec-dvfs exec-remote
         platform-profile platform-failures
         network-nonlinear clusters-multicpu io-degradation exec-cpu-nonlinear)
diff --git a/examples/python/comm-waitfor/comm-waitfor.py b/examples/python/comm-waitfor/comm-waitfor.py
new file mode 100644 (file)
index 0000000..9624a75
--- /dev/null
@@ -0,0 +1,82 @@
+# Copyright (c) 2010-2022. The SimGrid Team. All rights reserved.
+#
+# This program is free software; you can redistribute it and/or modify it
+# under the terms of the license (GNU LGPL) which comes with this package.
+
+"""
+This example shows how to wait for the completion of communications with a timeout.
+
+As in the other asynchronous examples, the sender initiates all the messages it wants to send and
+packs the resulting simgrid.Comm objects in a list. All messages are thus in flight concurrently.
+
+The sender then waits for each pending communication in turn with Comm.wait_for(), starting from the
+last one it posted. Comm.wait_for() blocks until the communication completes, or raises a
+TimeoutException once the given timeout (1.0 here) has elapsed.
+
+The final 'finalize' message has a size of 0 and tells the receiver to stop. As the trace shows, the
+receiver gets 'Message 0', 'Message 1' and 'Message 2' before it, in the order they were sent.
+"""
+
+import sys
+from simgrid import Actor, Comm, Engine, Host, Mailbox, this_actor
+
+
+FINALIZE_TAG = "finalize"
+
+
+def sender(receiver_id: str, messages_count: int, payload_size: int) -> None:
+    """Asynchronously send a batch of messages, then wait for every transfer to finish.
+
+    :param receiver_id: name of the mailbox on which the messages are posted.
+    :param messages_count: number of regular messages to send before the finalize message.
+    :param payload_size: size passed to put_async() with each regular message.
+    """
+    mailbox = Mailbox.by_name(receiver_id)
+    # Every communication started below is kept here until it has been waited for
+    in_flight = []
+
+    # Start all the regular sends without blocking on any of them
+    for index in range(messages_count):
+        message = "Message {:d}".format(index)
+        this_actor.info("Send '{:s}' to '{:s}'".format(message, receiver_id))
+        in_flight.append(mailbox.put_async(message, payload_size))
+
+    # The finalize message is sent with a size of 0
+    # NOTE(review): the log line below lacks the closing quote after the receiver name; the expected
+    # output in comm-waitfor.tesh contains the same text, so both would have to change together.
+    in_flight.append(mailbox.put_async(FINALIZE_TAG, 0))
+    this_actor.info("Send '{:s}' to '{:s}".format(FINALIZE_TAG, receiver_id))
+    this_actor.info("Done dispatching all messages")
+
+    this_actor.info("Waiting for all outstanding communications to complete")
+    # Wait for the communications from the most recently started one back to the first,
+    # giving each of them a timeout of 1.0 (presumably simulated seconds -- TODO confirm)
+    while in_flight:
+        in_flight.pop().wait_for(1.0)
+    this_actor.info("Goodbye now!")
+
+
+def receiver(identifier: str) -> None:
+    """Receive messages from the mailbox named `identifier` until the finalize message arrives.
+
+    :param identifier: name of the mailbox to listen on.
+    """
+    inbox: Mailbox = Mailbox.by_name(identifier)
+    this_actor.info("Wait for my first message")
+    message = None
+    # Every message is logged, including the finalize one that ends the loop
+    while message != FINALIZE_TAG:
+        message = inbox.get()
+        this_actor.info("I got a '{:s}'.".format(message))
+    this_actor.info("Goodbye now!")
+
+
+def main():
+    """Build the simulation (one sender, one receiver) and run it.
+
+    The platform description file is taken from the first command-line argument.
+    """
+    engine = Engine(sys.argv)
+
+    # The platform file is read from the command line once the engine exists
+    platform_file = sys.argv[1]
+    engine.load_platform(platform_file)
+
+    # Both actors use the same mailbox name
+    mailbox_name = "receiver-0"
+    sender_host = Host.by_name("Tremblay")
+    Actor.create("sender", sender_host, sender, mailbox_name, 3, int(5e7))
+    receiver_host = Host.by_name("Ruby")
+    Actor.create("receiver", receiver_host, receiver, mailbox_name)
+
+    engine.run()
+
+
+if __name__ == '__main__':
+    main()
diff --git a/examples/python/comm-waitfor/comm-waitfor.tesh b/examples/python/comm-waitfor/comm-waitfor.tesh
new file mode 100644 (file)
index 0000000..b326fcb
--- /dev/null
@@ -0,0 +1,18 @@
+#!/usr/bin/env tesh
+
+p Testing Comm.wait_for()
+
+$ ${pythoncmd:=python3} ${PYTHON_TOOL_OPTIONS:=} ${bindir:=.}/comm-waitfor.py ${platfdir}/small_platform_fatpipe.xml "--log=root.fmt:[%10.6r]%e(%i:%a@%h)%e%m%n"
+> [  0.000000] (1:sender@Tremblay) Send 'Message 0' to 'receiver-0'
+> [  0.000000] (2:receiver@Ruby) Wait for my first message
+> [  0.000000] (1:sender@Tremblay) Send 'Message 1' to 'receiver-0'
+> [  0.000000] (1:sender@Tremblay) Send 'Message 2' to 'receiver-0'
+> [  0.000000] (1:sender@Tremblay) Send 'finalize' to 'receiver-0
+> [  0.000000] (1:sender@Tremblay) Done dispatching all messages
+> [  0.000000] (1:sender@Tremblay) Waiting for all outstanding communications to complete
+> [  0.105458] (2:receiver@Ruby) I got a 'Message 0'.
+> [  0.210917] (2:receiver@Ruby) I got a 'Message 1'.
+> [  0.316375] (2:receiver@Ruby) I got a 'Message 2'.
+> [  0.318326] (2:receiver@Ruby) I got a 'finalize'.
+> [  0.318326] (2:receiver@Ruby) Goodbye now!
+> [  0.318326] (1:sender@Tremblay) Goodbye now!
index dbffb1f..3d2fa25 100644 (file)
@@ -688,6 +688,9 @@ PYBIND11_MODULE(simgrid, m)
            "Test whether the communication is terminated.")
       .def("wait", &simgrid::s4u::Comm::wait, py::call_guard<py::gil_scoped_release>(),
            "Block until the completion of that communication.")
+      .def("wait_for", &simgrid::s4u::Comm::wait_for,
+           py::call_guard<py::gil_scoped_release>(),
+           "Block until the completion of that communication, or raises TimeoutException after the specified timeout.")
       // use py::overload_cast for wait_all/wait_any, until the overload marked XBT_ATTRIB_DEPRECATED_v332 is removed
       .def_static(
           "wait_all", py::overload_cast<const std::vector<simgrid::s4u::CommPtr>&>(&simgrid::s4u::Comm::wait_all),
@@ -695,7 +698,14 @@ PYBIND11_MODULE(simgrid, m)
       .def_static(
           "wait_any", py::overload_cast<const std::vector<simgrid::s4u::CommPtr>&>(&simgrid::s4u::Comm::wait_any),
           py::call_guard<py::gil_scoped_release>(),
-          "Block until the completion of any communication in the list and return the index of the terminated one.");
+          "Block until the completion of any communication in the list and return the index of the terminated one.")
+      .def_static(
+          "wait_any_for",
+          py::overload_cast<const std::vector<simgrid::s4u::CommPtr>&, double>(&simgrid::s4u::Comm::wait_any_for),
+          py::call_guard<py::gil_scoped_release>(),
+          "Block until the completion of any communication in the list and return the index of the terminated "
+          "one, or -1 if a timeout occurred."
+      );
 
   /* Class Io */
   py::class_<simgrid::s4u::Io, simgrid::s4u::IoPtr>(m, "Io", "I/O activities. See the C++ documentation for details.")