XBT:
- Drop xbt_dynar_shrink().
+Python:
+ - Added the following bindings: Comm.wait_for() and Comm.wait_any_for()
+ Example: examples/python/comm-waitfor/
+
Fixed bugs (FG#.. -> FramaGit bugs; FG!.. -> FG merge requests)
(FG: issues on Framagit; GH: issues on GitHub)
- FG#57: Mc SimGrid should test whether ptrace is usable
.. automethod:: simgrid.Comm.test
.. automethod:: simgrid.Comm.wait
+ .. automethod:: simgrid.Comm.wait_for
.. automethod:: simgrid.Comm.wait_all
.. automethod:: simgrid.Comm.wait_any
+ .. automethod:: simgrid.Comm.wait_any_for
.. group-tab:: C
See also :cpp:func:`simgrid::s4u::Activity::wait_until()` and :cpp:func:`simgrid::s4u::Comm::wait_for()`.
+ .. example-tab:: examples/python/comm-waitfor/comm-waitfor.py
+
+ See also :py:func:`simgrid.Comm.wait_for()` and :py:func:`simgrid.Comm.wait_any_for()`
+
Suspending communications
^^^^^^^^^^^^^^^^^^^^^^^^^
foreach(example actor-create actor-daemon actor-join actor-kill actor-migrate actor-suspend actor-yield actor-lifetime
app-masterworkers
- comm-wait comm-waitall comm-waitany
+ comm-wait comm-waitall comm-waitany comm-waitfor
exec-async exec-basic exec-dvfs exec-remote
platform-profile platform-failures
network-nonlinear clusters-multicpu io-degradation exec-cpu-nonlinear)
--- /dev/null
+# Copyright (c) 2010-2022. The SimGrid Team. All rights reserved.
+#
+# This program is free software; you can redistribute it and/or modify it
+# under the terms of the license (GNU LGPL) which comes with this package.
+
+"""
+This example shows how to block on the completion of a set of communications.
+
+As for the other asynchronous examples, the sender initiate all the messages it wants to send and
+pack the resulting simgrid.Comm objects in a list. All messages thus occur concurrently.
+
+The sender then loops until there is no ongoing communication. Using wait_any() ensures that the sender
+will notice events as soon as they occur even if it does not follow the order of the container.
+
+Here, finalize messages will terminate earlier because their size is 0, so they travel faster than the
+other messages of this application. As expected, the trace shows that the finalize of worker 1 is
+processed before 'Message 5' that is sent to worker 0.
+"""
+
+import sys
+from simgrid import Actor, Comm, Engine, Host, Mailbox, this_actor
+
+
+FINALIZE_TAG = "finalize"
+
+
+def sender(receiver_id: str, messages_count: int, payload_size: int) -> None:
+ # List in which we store all ongoing communications
+ pending_comms = []
+ mbox = Mailbox.by_name(receiver_id)
+
+ # Asynchronously send `messages_count` message(s) to the receiver
+ for i in range(0, messages_count):
+ payload = "Message {:d}".format(i)
+ this_actor.info("Send '{:s}' to '{:s}'".format(payload, receiver_id))
+
+ # Create a communication representing the ongoing communication, and store it in pending_comms
+ comm = mbox.put_async(payload, payload_size)
+ pending_comms.append(comm)
+
+ # Send the final message to the receiver
+ payload = FINALIZE_TAG
+ final_payload_size = 0
+ final_comm = mbox.put_async(payload, final_payload_size)
+ pending_comms.append(final_comm)
+ this_actor.info("Send '{:s}' to '{:s}".format(payload, receiver_id))
+ this_actor.info("Done dispatching all messages")
+
+ this_actor.info("Waiting for all outstanding communications to complete")
+ while pending_comms:
+ current_comm: Comm = pending_comms[-1]
+ current_comm.wait_for(1.0)
+ pending_comms.pop()
+ this_actor.info("Goodbye now!")
+
+
+def receiver(identifier: str) -> None:
+ mbox: Mailbox = Mailbox.by_name(identifier)
+ this_actor.info("Wait for my first message")
+ while True:
+ received = mbox.get()
+ this_actor.info("I got a '{:s}'.".format(received))
+ if received == FINALIZE_TAG:
+ break
+ this_actor.info("Goodbye now!")
+
+
+def main():
+ e = Engine(sys.argv)
+
+ # Load the platform description
+ e.load_platform(sys.argv[1])
+
+ receiver_id = "receiver-0"
+ Actor.create("sender", Host.by_name("Tremblay"), sender, receiver_id, 3, int(5e7))
+ Actor.create("receiver", Host.by_name("Ruby"), receiver, receiver_id)
+
+ e.run()
+
+
+if __name__ == '__main__':
+ main()
--- /dev/null
+#!/usr/bin/env tesh
+
+p Testing Comm.wait_any()
+
+$ ${pythoncmd:=python3} ${PYTHON_TOOL_OPTIONS:=} ${bindir:=.}/comm-waitfor.py ${platfdir}/small_platform_fatpipe.xml "--log=root.fmt:[%10.6r]%e(%i:%a@%h)%e%m%n"
+> [ 0.000000] (1:sender@Tremblay) Send 'Message 0' to 'receiver-0'
+> [ 0.000000] (2:receiver@Ruby) Wait for my first message
+> [ 0.000000] (1:sender@Tremblay) Send 'Message 1' to 'receiver-0'
+> [ 0.000000] (1:sender@Tremblay) Send 'Message 2' to 'receiver-0'
+> [ 0.000000] (1:sender@Tremblay) Send 'finalize' to 'receiver-0
+> [ 0.000000] (1:sender@Tremblay) Done dispatching all messages
+> [ 0.000000] (1:sender@Tremblay) Waiting for all outstanding communications to complete
+> [ 0.105458] (2:receiver@Ruby) I got a 'Message 0'.
+> [ 0.210917] (2:receiver@Ruby) I got a 'Message 1'.
+> [ 0.316375] (2:receiver@Ruby) I got a 'Message 2'.
+> [ 0.318326] (2:receiver@Ruby) I got a 'finalize'.
+> [ 0.318326] (2:receiver@Ruby) Goodbye now!
+> [ 0.318326] (1:sender@Tremblay) Goodbye now!
"Test whether the communication is terminated.")
.def("wait", &simgrid::s4u::Comm::wait, py::call_guard<py::gil_scoped_release>(),
"Block until the completion of that communication.")
+ .def("wait_for", &simgrid::s4u::Comm::wait_for,
+ py::call_guard<py::gil_scoped_release>(),
+ "Block until the completion of that communication, or raises TimeoutException after the specified timeout.")
// use py::overload_cast for wait_all/wait_any, until the overload marked XBT_ATTRIB_DEPRECATED_v332 is removed
.def_static(
"wait_all", py::overload_cast<const std::vector<simgrid::s4u::CommPtr>&>(&simgrid::s4u::Comm::wait_all),
.def_static(
"wait_any", py::overload_cast<const std::vector<simgrid::s4u::CommPtr>&>(&simgrid::s4u::Comm::wait_any),
py::call_guard<py::gil_scoped_release>(),
- "Block until the completion of any communication in the list and return the index of the terminated one.");
+ "Block until the completion of any communication in the list and return the index of the terminated one.")
+ .def_static(
+ "wait_any_for",
+ py::overload_cast<const std::vector<simgrid::s4u::CommPtr>&, double>(&simgrid::s4u::Comm::wait_any_for),
+ py::call_guard<py::gil_scoped_release>(),
+ "Block until the completion of any communication in the list and return the index of the terminated "
+ "one, or -1 if a timeout occurred."
+ );
/* Class Io */
py::class_<simgrid::s4u::Io, simgrid::s4u::IoPtr>(m, "Io", "I/O activities. See the C++ documentation for details.")