From: Martin Quinson Date: Sun, 17 Mar 2019 15:14:10 +0000 (+0100) Subject: Python: Add Comm.wait_any X-Git-Tag: v3_22~78 X-Git-Url: http://info.iut-bm.univ-fcomte.fr/pub/gitweb/simgrid.git/commitdiff_plain/c1194401ce4a41ff54a7591f4cf33c9ce2756978 Python: Add Comm.wait_any + the example of same name + cosmetics in the related examples, in C++ comments --- diff --git a/examples/python/CMakeLists.txt b/examples/python/CMakeLists.txt index 4c4eb11623..d58c801227 100644 --- a/examples/python/CMakeLists.txt +++ b/examples/python/CMakeLists.txt @@ -1,5 +1,5 @@ foreach(example actor-create actor-daemon actor-join actor-kill actor-migrate actor-suspend actor-yield # actor-lifetime - async-wait async-waitall + async-wait async-waitall async-waitany exec-basic) set(tesh_files ${tesh_files} ${CMAKE_CURRENT_SOURCE_DIR}/${example}/${example}.tesh) set(examples_src ${examples_src} ${CMAKE_CURRENT_SOURCE_DIR}/${example}/${example}.py) diff --git a/examples/python/async-waitall/async-waitall.py b/examples/python/async-waitall/async-waitall.py index ea7e73e0ca..46cd50c699 100644 --- a/examples/python/async-waitall/async-waitall.py +++ b/examples/python/async-waitall/async-waitall.py @@ -9,7 +9,7 @@ from simgrid import * # This example shows how to block on the completion of a set of communications. # # As for the other asynchronous examples, the sender initiate all the messages it wants to send and -# pack the resulting simgrid::s4u::CommPtr objects in a vector. All messages thus occurs concurrently. +# pack the resulting simgrid.Comm objects in a list. All messages thus occur concurrently. # # The sender then blocks until all ongoing communication terminate, using simgrid.Comm.wait_all() @@ -51,7 +51,7 @@ class Sender: this_actor.info("Done dispatching all messages") # Now that all message exchanges were initiated, wait for their completion in one single call - Comm.waitall(pending_comms) + Comm.wait_all(pending_comms) this_actor.info("Goodbye now!") diff --git a/examples/python/async-waitany/async-waitany.py b/examples/python/async-waitany/async-waitany.py new file mode 100644 index 0000000000..1bc5891a0c --- /dev/null +++ b/examples/python/async-waitany/async-waitany.py @@ -0,0 +1,99 @@ +# Copyright (c) 2010-2019. The SimGrid Team. All rights reserved. +# +# This program is free software; you can redistribute it and/or modify it +# under the terms of the license (GNU LGPL) which comes with this package. + +import sys +from simgrid import * + +# This example shows how to block on the completion of a set of communications. +# +# As for the other asynchronous examples, the sender initiate all the messages it wants to send and +# pack the resulting simgrid.Comm objects in a list. All messages thus occur concurrently. +# +# The sender then loops until there is no ongoing communication. Using wait_any() ensures that the sender +# will notice events as soon as they occur even if it does not follow the order of the container. +# +# Here, finalize messages will terminate earlier because their size is 0, so they travel faster than the +# other messages of this application. As expected, the trace shows that the finalize of worker 1 is +# processed before 'Message 5' that is sent to worker 0. + +class Sender: + def __init__(self, *args): + if len(args) != 3: + raise AssertionError("Actor sender requires 3 parameters, but got {:d}".format(len(args))) + self.messages_count = int(args[0]) # number of tasks + self.msg_size = int(args[1]) # communication cost (in bytes) + self.receivers_count = int(args[2]) # number of receivers + + def __call__(self): + # List in which we store all ongoing communications + pending_comms = [] + + # Vector of the used mailboxes + mboxes = [Mailbox.by_name("receiver-{:d}".format(i)) + for i in range(0, self.receivers_count)] + + # Start dispatching all messages to receivers, in a round robin fashion + for i in range(0, self.messages_count): + content = "Message {:d}".format(i) + mbox = mboxes[i % self.receivers_count] + + this_actor.info("Send '{:s}' to '{:s}'".format(content, str(mbox))) + + # Create a communication representing the ongoing communication, and store it in pending_comms + comm = mbox.put_async(content, self.msg_size) + pending_comms.append(comm) + + # Start sending messages to let the workers know that they should stop + for i in range(0, self.receivers_count): + mbox = mboxes[i] + this_actor.info("Send 'finalize' to '{:s}'".format(str(mbox))) + comm = mbox.put_async("finalize", 0) + pending_comms.append(comm) + + this_actor.info("Done dispatching all messages") + + # Now that all message exchanges were initiated, wait for their completion, in order of completion. + # + # This loop waits for first terminating message with wait_any() and remove it with del, until all comms are + # terminated. + # Even in this simple example, the pending comms do not terminate in the exact same order of creation. + while pending_comms: + changed_pos = Comm.wait_any(pending_comms) + del pending_comms[changed_pos] + if (changed_pos != 0): + this_actor.info("Remove the {:d}th pending comm: it terminated earlier than another comm that was initiated first.".format(changed_pos)); + + this_actor.info("Goodbye now!") + + +class Receiver: + def __init__(self, *args): + if len(args) != 1: # Receiver actor expects 1 argument: its ID + raise AssertionError( + "Actor receiver requires 1 parameter, but got {:d}".format(len(args))) + self.mbox = Mailbox.by_name("receiver-{:s}".format(args[0])) + + def __call__(self): + this_actor.info("Wait for my first message") + while True: + received = self.mbox.get() + this_actor.info("I got a '{:s}'.".format(received)) + if received == "finalize": + break # If it's a finalize message, we're done. + + +if __name__ == '__main__': + e = Engine(sys.argv) + + # Load the platform description + e.load_platform(sys.argv[1]) + + # Register the classes representing the actors + e.register_actor("sender", Sender) + e.register_actor("receiver", Receiver) + + e.load_deployment(sys.argv[2]) + + e.run() diff --git a/examples/python/async-waitany/async-waitany.tesh b/examples/python/async-waitany/async-waitany.tesh new file mode 100644 index 0000000000..4a1381a547 --- /dev/null +++ b/examples/python/async-waitany/async-waitany.tesh @@ -0,0 +1,27 @@ +#!/usr/bin/env tesh + +p Testing Comm.wait_any() + +! output sort 19 +$ ${pythoncmd:=python3} ${PYTHON_TOOL_OPTIONS:=} ${bindir:=.}/async-waitany.py ${platfdir}/small_platform.xml async-waitany_d.xml "--log=root.fmt:[%10.6r]%e(%i:%P@%h)%e%m%n" +> [ 0.000000] (1:sender@Tremblay) Send 'Message 0' to 'Mailbox(receiver-0)' +> [ 0.000000] (2:receiver@Fafard) Wait for my first message +> [ 0.000000] (3:receiver@Jupiter) Wait for my first message +> [ 0.000000] (1:sender@Tremblay) Send 'Message 1' to 'Mailbox(receiver-1)' +> [ 0.000000] (1:sender@Tremblay) Send 'Message 2' to 'Mailbox(receiver-0)' +> [ 0.000000] (1:sender@Tremblay) Send 'Message 3' to 'Mailbox(receiver-1)' +> [ 0.000000] (1:sender@Tremblay) Send 'Message 4' to 'Mailbox(receiver-0)' +> [ 0.000000] (1:sender@Tremblay) Send 'Message 5' to 'Mailbox(receiver-1)' +> [ 0.000000] (1:sender@Tremblay) Send 'finalize' to 'Mailbox(receiver-0)' +> [ 0.000000] (1:sender@Tremblay) Send 'finalize' to 'Mailbox(receiver-1)' +> [ 0.000000] (1:sender@Tremblay) Done dispatching all messages +> [ 0.158397] (2:receiver@Fafard) I got a 'Message 0'. +> [ 0.169155] (3:receiver@Jupiter) I got a 'Message 1'. +> [ 0.316794] (2:receiver@Fafard) I got a 'Message 2'. +> [ 0.338309] (3:receiver@Jupiter) I got a 'Message 3'. +> [ 0.475190] (2:receiver@Fafard) I got a 'Message 4'. +> [ 0.500898] (2:receiver@Fafard) I got a 'finalize'. +> [ 0.500898] (1:sender@Tremblay) Remove the 1th pending comm: it terminated earlier than another comm that was initiated first. +> [ 0.507464] (3:receiver@Jupiter) I got a 'Message 5'. +> [ 0.526478] (3:receiver@Jupiter) I got a 'finalize'. +> [ 0.526478] (1:sender@Tremblay) Goodbye now! diff --git a/examples/python/async-waitany/async-waitany_d.xml b/examples/python/async-waitany/async-waitany_d.xml new file mode 100644 index 0000000000..4b4161d339 --- /dev/null +++ b/examples/python/async-waitany/async-waitany_d.xml @@ -0,0 +1,17 @@ + + + + + + + + + + + + + + + + + diff --git a/examples/s4u/async-waitall/s4u-async-waitall.cpp b/examples/s4u/async-waitall/s4u-async-waitall.cpp index e3d83cfd9e..0b6f83fdf0 100644 --- a/examples/s4u/async-waitall/s4u-async-waitall.cpp +++ b/examples/s4u/async-waitall/s4u-async-waitall.cpp @@ -6,7 +6,7 @@ /* This example shows how to block on the completion of a set of communications. * * As for the other asynchronous examples, the sender initiate all the messages it wants to send and - * pack the resulting simgrid::s4u::CommPtr objects in a vector. All messages thus occurs concurrently. + * pack the resulting simgrid::s4u::CommPtr objects in a vector. All messages thus occur concurrently. * * The sender then blocks until all ongoing communication terminate, using simgrid::s4u::Comm::wait_all() * diff --git a/examples/s4u/async-waitany/s4u-async-waitany.cpp b/examples/s4u/async-waitany/s4u-async-waitany.cpp index 4d3b1fab49..ca5c0851c8 100644 --- a/examples/s4u/async-waitany/s4u-async-waitany.cpp +++ b/examples/s4u/async-waitany/s4u-async-waitany.cpp @@ -6,7 +6,7 @@ /* This example shows how to use simgrid::s4u::this_actor::wait_any() to wait for the first occurring event. * * As for the other asynchronous examples, the sender initiate all the messages it wants to send and - * pack the resulting simgrid::s4u::CommPtr objects in a vector. All messages thus occurs concurrently. + * pack the resulting simgrid::s4u::CommPtr objects in a vector. All messages thus occur concurrently. * * The sender then loops until there is no ongoing communication. Using wait_any() ensures that the sender * will notice events as soon as they occur even if it does not follow the order of the container. diff --git a/src/bindings/python/simgrid_python.cpp b/src/bindings/python/simgrid_python.cpp index 3f9119d81c..260dd8f3a2 100644 --- a/src/bindings/python/simgrid_python.cpp +++ b/src/bindings/python/simgrid_python.cpp @@ -184,9 +184,13 @@ PYBIND11_MODULE(simgrid, m) "Test whether the communication is terminated, see :cpp:func:`simgrid::s4u::Comm::test()`") .def("wait", [](simgrid::s4u::CommPtr self) { self->wait(); }, "Block until the completion of that communication, see :cpp:func:`simgrid::s4u::Comm::wait()`") - .def("waitall", [](std::vector* comms) { simgrid::s4u::Comm::wait_all(comms); }, + .def("wait_all", [](std::vector* comms) { simgrid::s4u::Comm::wait_all(comms); }, "Block until the completion of all communications in the list, see " - ":cpp:func:`simgrid::s4u::Comm::wait_all()`"); + ":cpp:func:`simgrid::s4u::Comm::wait_all()`") + .def( + "wait_any", [](std::vector* comms) { return simgrid::s4u::Comm::wait_any(comms); }, + "Block until the completion of any communication in the list and return the index of the terminated one, see " + ":cpp:func:`simgrid::s4u::Comm::wait_any()`"); /* Class Actor */ py::class_(m, "Actor",