From 04e0b504663b64f6aab0a018caf9c7175d2a778d Mon Sep 17 00:00:00 2001 From: Jean-Edouard BOULANGER Date: Sun, 6 Mar 2022 10:19:23 +0100 Subject: [PATCH] Added Python bindings: Comm.wait_for() and Comm.wait_any_for() --- ChangeLog | 4 + docs/source/app_s4u.rst | 2 + examples/README.rst | 4 + examples/python/CMakeLists.txt | 2 +- examples/python/comm-waitfor/comm-waitfor.py | 82 +++++++++++++++++++ .../python/comm-waitfor/comm-waitfor.tesh | 18 ++++ src/bindings/python/simgrid_python.cpp | 12 ++- 7 files changed, 122 insertions(+), 2 deletions(-) create mode 100644 examples/python/comm-waitfor/comm-waitfor.py create mode 100644 examples/python/comm-waitfor/comm-waitfor.tesh diff --git a/ChangeLog b/ChangeLog index 25be7b7112..9f836d88aa 100644 --- a/ChangeLog +++ b/ChangeLog @@ -35,6 +35,10 @@ New plugin: the Chaos Monkey (killing actors at any time) XBT: - Drop xbt_dynar_shrink(). +Python: + - Added the following bindings: Comm.wait_for() and Comm.wait_any_for() + Example: examples/python/comm-waitfor/ + Fixed bugs (FG#.. -> FramaGit bugs; FG!.. -> FG merge requests) (FG: issues on Framagit; GH: issues on GitHub) - FG#57: Mc SimGrid should test whether ptrace is usable diff --git a/docs/source/app_s4u.rst b/docs/source/app_s4u.rst index 2ff9d3a139..35c7f24c5c 100644 --- a/docs/source/app_s4u.rst +++ b/docs/source/app_s4u.rst @@ -2213,8 +2213,10 @@ also start direct communications as shown below. .. automethod:: simgrid.Comm.test .. automethod:: simgrid.Comm.wait + .. automethod:: simgrid.Comm.wait_for .. automethod:: simgrid.Comm.wait_all .. automethod:: simgrid.Comm.wait_any + .. automethod:: simgrid.Comm.wait_any_for .. group-tab:: C diff --git a/examples/README.rst b/examples/README.rst index dc43b35653..c11029c590 100644 --- a/examples/README.rst +++ b/examples/README.rst @@ -353,6 +353,10 @@ This example is very similar to the previous one, simply adding how to declare t See also :cpp:func:`simgrid::s4u::Activity::wait_until()` and :cpp:func:`simgrid::s4u::Comm::wait_for()`. + .. example-tab:: examples/python/comm-waitfor/comm-waitfor.py + + See also :py:func:`simgrid.Comm.wait_for()` and :py:func:`simgrid.Comm.wait_any_for()` + Suspending communications ^^^^^^^^^^^^^^^^^^^^^^^^^ diff --git a/examples/python/CMakeLists.txt b/examples/python/CMakeLists.txt index 0ad9767c70..cca4bd3226 100644 --- a/examples/python/CMakeLists.txt +++ b/examples/python/CMakeLists.txt @@ -1,6 +1,6 @@ foreach(example actor-create actor-daemon actor-join actor-kill actor-migrate actor-suspend actor-yield actor-lifetime app-masterworkers - comm-wait comm-waitall comm-waitany + comm-wait comm-waitall comm-waitany comm-waitfor exec-async exec-basic exec-dvfs exec-remote platform-profile platform-failures network-nonlinear clusters-multicpu io-degradation exec-cpu-nonlinear) diff --git a/examples/python/comm-waitfor/comm-waitfor.py b/examples/python/comm-waitfor/comm-waitfor.py new file mode 100644 index 0000000000..9624a75351 --- /dev/null +++ b/examples/python/comm-waitfor/comm-waitfor.py @@ -0,0 +1,82 @@ +# Copyright (c) 2010-2022. The SimGrid Team. All rights reserved. +# +# This program is free software; you can redistribute it and/or modify it +# under the terms of the license (GNU LGPL) which comes with this package. + +""" +This example shows how to block on the completion of a set of communications. + +As for the other asynchronous examples, the sender initiate all the messages it wants to send and +pack the resulting simgrid.Comm objects in a list. All messages thus occur concurrently. + +The sender then loops until there is no ongoing communication. Using wait_any() ensures that the sender +will notice events as soon as they occur even if it does not follow the order of the container. + +Here, finalize messages will terminate earlier because their size is 0, so they travel faster than the +other messages of this application. As expected, the trace shows that the finalize of worker 1 is +processed before 'Message 5' that is sent to worker 0. +""" + +import sys +from simgrid import Actor, Comm, Engine, Host, Mailbox, this_actor + + +FINALIZE_TAG = "finalize" + + +def sender(receiver_id: str, messages_count: int, payload_size: int) -> None: + # List in which we store all ongoing communications + pending_comms = [] + mbox = Mailbox.by_name(receiver_id) + + # Asynchronously send `messages_count` message(s) to the receiver + for i in range(0, messages_count): + payload = "Message {:d}".format(i) + this_actor.info("Send '{:s}' to '{:s}'".format(payload, receiver_id)) + + # Create a communication representing the ongoing communication, and store it in pending_comms + comm = mbox.put_async(payload, payload_size) + pending_comms.append(comm) + + # Send the final message to the receiver + payload = FINALIZE_TAG + final_payload_size = 0 + final_comm = mbox.put_async(payload, final_payload_size) + pending_comms.append(final_comm) + this_actor.info("Send '{:s}' to '{:s}".format(payload, receiver_id)) + this_actor.info("Done dispatching all messages") + + this_actor.info("Waiting for all outstanding communications to complete") + while pending_comms: + current_comm: Comm = pending_comms[-1] + current_comm.wait_for(1.0) + pending_comms.pop() + this_actor.info("Goodbye now!") + + +def receiver(identifier: str) -> None: + mbox: Mailbox = Mailbox.by_name(identifier) + this_actor.info("Wait for my first message") + while True: + received = mbox.get() + this_actor.info("I got a '{:s}'.".format(received)) + if received == FINALIZE_TAG: + break + this_actor.info("Goodbye now!") + + +def main(): + e = Engine(sys.argv) + + # Load the platform description + e.load_platform(sys.argv[1]) + + receiver_id = "receiver-0" + Actor.create("sender", Host.by_name("Tremblay"), sender, receiver_id, 3, int(5e7)) + Actor.create("receiver", Host.by_name("Ruby"), receiver, receiver_id) + + e.run() + + +if __name__ == '__main__': + main() diff --git a/examples/python/comm-waitfor/comm-waitfor.tesh b/examples/python/comm-waitfor/comm-waitfor.tesh new file mode 100644 index 0000000000..b326fcb7db --- /dev/null +++ b/examples/python/comm-waitfor/comm-waitfor.tesh @@ -0,0 +1,18 @@ +#!/usr/bin/env tesh + +p Testing Comm.wait_any() + +$ ${pythoncmd:=python3} ${PYTHON_TOOL_OPTIONS:=} ${bindir:=.}/comm-waitfor.py ${platfdir}/small_platform_fatpipe.xml "--log=root.fmt:[%10.6r]%e(%i:%a@%h)%e%m%n" +> [ 0.000000] (1:sender@Tremblay) Send 'Message 0' to 'receiver-0' +> [ 0.000000] (2:receiver@Ruby) Wait for my first message +> [ 0.000000] (1:sender@Tremblay) Send 'Message 1' to 'receiver-0' +> [ 0.000000] (1:sender@Tremblay) Send 'Message 2' to 'receiver-0' +> [ 0.000000] (1:sender@Tremblay) Send 'finalize' to 'receiver-0 +> [ 0.000000] (1:sender@Tremblay) Done dispatching all messages +> [ 0.000000] (1:sender@Tremblay) Waiting for all outstanding communications to complete +> [ 0.105458] (2:receiver@Ruby) I got a 'Message 0'. +> [ 0.210917] (2:receiver@Ruby) I got a 'Message 1'. +> [ 0.316375] (2:receiver@Ruby) I got a 'Message 2'. +> [ 0.318326] (2:receiver@Ruby) I got a 'finalize'. +> [ 0.318326] (2:receiver@Ruby) Goodbye now! +> [ 0.318326] (1:sender@Tremblay) Goodbye now! diff --git a/src/bindings/python/simgrid_python.cpp b/src/bindings/python/simgrid_python.cpp index dbffb1fcee..3d2fa256bb 100644 --- a/src/bindings/python/simgrid_python.cpp +++ b/src/bindings/python/simgrid_python.cpp @@ -688,6 +688,9 @@ PYBIND11_MODULE(simgrid, m) "Test whether the communication is terminated.") .def("wait", &simgrid::s4u::Comm::wait, py::call_guard(), "Block until the completion of that communication.") + .def("wait_for", &simgrid::s4u::Comm::wait_for, + py::call_guard(), + "Block until the completion of that communication, or raises TimeoutException after the specified timeout.") // use py::overload_cast for wait_all/wait_any, until the overload marked XBT_ATTRIB_DEPRECATED_v332 is removed .def_static( "wait_all", py::overload_cast&>(&simgrid::s4u::Comm::wait_all), @@ -695,7 +698,14 @@ PYBIND11_MODULE(simgrid, m) .def_static( "wait_any", py::overload_cast&>(&simgrid::s4u::Comm::wait_any), py::call_guard(), - "Block until the completion of any communication in the list and return the index of the terminated one."); + "Block until the completion of any communication in the list and return the index of the terminated one.") + .def_static( + "wait_any_for", + py::overload_cast&, double>(&simgrid::s4u::Comm::wait_any_for), + py::call_guard(), + "Block until the completion of any communication in the list and return the index of the terminated " + "one, or -1 if a timeout occurred." + ); /* Class Io */ py::class_(m, "Io", "I/O activities. See the C++ documentation for details.") -- 2.20.1