From: Martin Quinson Date: Sun, 17 Mar 2019 10:03:43 +0000 (+0100) Subject: python: add Comm.waitall X-Git-Tag: v3_22~79 X-Git-Url: http://info.iut-bm.univ-fcomte.fr/pub/gitweb/simgrid.git/commitdiff_plain/cbc2112805562213866f62f194adb0a443d9cc99 python: add Comm.waitall --- diff --git a/examples/python/CMakeLists.txt b/examples/python/CMakeLists.txt index 1d75808c6e..4c4eb11623 100644 --- a/examples/python/CMakeLists.txt +++ b/examples/python/CMakeLists.txt @@ -1,5 +1,5 @@ foreach(example actor-create actor-daemon actor-join actor-kill actor-migrate actor-suspend actor-yield # actor-lifetime - async-wait + async-wait async-waitall exec-basic) set(tesh_files ${tesh_files} ${CMAKE_CURRENT_SOURCE_DIR}/${example}/${example}.tesh) set(examples_src ${examples_src} ${CMAKE_CURRENT_SOURCE_DIR}/${example}/${example}.py) diff --git a/examples/python/async-wait/async-wait.py b/examples/python/async-wait/async-wait.py index 0051e8a79c..198f779c60 100644 --- a/examples/python/async-wait/async-wait.py +++ b/examples/python/async-wait/async-wait.py @@ -17,7 +17,7 @@ class Sender: def __init__(self, *args): if len(args) != 3: raise AssertionError( - "Actor sender requires 4 parameters, but got {:d}".format(len(args))) + "Actor sender requires 3 parameters, but got {:d}".format(len(args))) self.messages_count = int(args[0]) # number of tasks self.msg_size = int(args[1]) # communication cost (in bytes) self.receivers_count = int(args[2]) # number of receivers diff --git a/examples/python/async-waitall/async-waitall.py b/examples/python/async-waitall/async-waitall.py new file mode 100644 index 0000000000..ea7e73e0ca --- /dev/null +++ b/examples/python/async-waitall/async-waitall.py @@ -0,0 +1,87 @@ +# Copyright (c) 2010-2019. The SimGrid Team. All rights reserved. +# +# This program is free software; you can redistribute it and/or modify it +# under the terms of the license (GNU LGPL) which comes with this package. + +import sys +from simgrid import * + +# This example shows how to block on the completion of a set of communications. +# +# As for the other asynchronous examples, the sender initiate all the messages it wants to send and +# pack the resulting simgrid::s4u::CommPtr objects in a vector. All messages thus occurs concurrently. +# +# The sender then blocks until all ongoing communication terminate, using simgrid.Comm.wait_all() + + +class Sender: + def __init__(self, *args): + if len(args) != 3: + raise AssertionError("Actor sender requires 3 parameters, but got {:d}".format(len(args))) + self.messages_count = int(args[0]) # number of tasks + self.msg_size = int(args[1]) # communication cost (in bytes) + self.receivers_count = int(args[2]) # number of receivers + + def __call__(self): + # List in which we store all ongoing communications + pending_comms = [] + + # Vector of the used mailboxes + mboxes = [Mailbox.by_name("receiver-{:d}".format(i)) + for i in range(0, self.receivers_count)] + + # Start dispatching all messages to receivers, in a round robin fashion + for i in range(0, self.messages_count): + content = "Message {:d}".format(i) + mbox = mboxes[i % self.receivers_count] + + this_actor.info("Send '{:s}' to '{:s}'".format(content, str(mbox))) + + # Create a communication representing the ongoing communication, and store it in pending_comms + comm = mbox.put_async(content, self.msg_size) + pending_comms.append(comm) + + # Start sending messages to let the workers know that they should stop + for i in range(0, self.receivers_count): + mbox = mboxes[i] + this_actor.info("Send 'finalize' to '{:s}'".format(str(mbox))) + comm = mbox.put_async("finalize", 0) + pending_comms.append(comm) + + this_actor.info("Done dispatching all messages") + + # Now that all message exchanges were initiated, wait for their completion in one single call + Comm.waitall(pending_comms) + + this_actor.info("Goodbye now!") + + +class Receiver: + def __init__(self, *args): + if len(args) != 1: # Receiver actor expects 1 argument: its ID + raise AssertionError( + "Actor receiver requires 1 parameter, but got {:d}".format(len(args))) + self.mbox = Mailbox.by_name("receiver-{:s}".format(args[0])) + + def __call__(self): + this_actor.info("Wait for my first message") + while True: + received = self.mbox.get() + this_actor.info("I got a '{:s}'.".format(received)) + if received == "finalize": + break # If it's a finalize message, we're done. + + +if __name__ == '__main__': + e = Engine(sys.argv) + + # Load the platform description + e.load_platform(sys.argv[1]) + + # Register the classes representing the actors + e.register_actor("sender", Sender) + e.register_actor("receiver", Receiver) + + e.load_deployment(sys.argv[2]) + + e.run() diff --git a/examples/python/async-waitall/async-waitall.tesh b/examples/python/async-waitall/async-waitall.tesh new file mode 100644 index 0000000000..5a6670471b --- /dev/null +++ b/examples/python/async-waitall/async-waitall.tesh @@ -0,0 +1,21 @@ +#!/usr/bin/env tesh + +$ ${pythoncmd:=python3} ${PYTHON_TOOL_OPTIONS:=} ${bindir:=.}/async-waitall.py ${platfdir}/small_platform_fatpipe.xml async-waitall_d.xml "--log=root.fmt:[%10.6r]%e(%i:%P@%h)%e%m%n" +> [ 0.000000] (1:sender@Tremblay) Send 'Message 0' to 'Mailbox(receiver-0)' +> [ 0.000000] (2:receiver@Ruby) Wait for my first message +> [ 0.000000] (3:receiver@Perl) Wait for my first message +> [ 0.000000] (1:sender@Tremblay) Send 'Message 1' to 'Mailbox(receiver-1)' +> [ 0.000000] (1:sender@Tremblay) Send 'Message 2' to 'Mailbox(receiver-0)' +> [ 0.000000] (1:sender@Tremblay) Send 'Message 3' to 'Mailbox(receiver-1)' +> [ 0.000000] (1:sender@Tremblay) Send 'Message 4' to 'Mailbox(receiver-0)' +> [ 0.000000] (1:sender@Tremblay) Send 'finalize' to 'Mailbox(receiver-0)' +> [ 0.000000] (1:sender@Tremblay) Send 'finalize' to 'Mailbox(receiver-1)' +> [ 0.000000] (1:sender@Tremblay) Done dispatching all messages +> [ 0.004022] (2:receiver@Ruby) I got a 'Message 0'. +> [ 0.004022] (3:receiver@Perl) I got a 'Message 1'. +> [ 0.008043] (2:receiver@Ruby) I got a 'Message 2'. +> [ 0.008043] (3:receiver@Perl) I got a 'Message 3'. +> [ 0.009995] (3:receiver@Perl) I got a 'finalize'. +> [ 0.012065] (2:receiver@Ruby) I got a 'Message 4'. +> [ 0.014016] (2:receiver@Ruby) I got a 'finalize'. +> [ 0.014016] (1:sender@Tremblay) Goodbye now! diff --git a/examples/python/async-waitall/async-waitall_d.xml b/examples/python/async-waitall/async-waitall_d.xml new file mode 100644 index 0000000000..8e51efffe5 --- /dev/null +++ b/examples/python/async-waitall/async-waitall_d.xml @@ -0,0 +1,17 @@ + + + + + + + + + + + + + + + + + diff --git a/src/bindings/python/simgrid_python.cpp b/src/bindings/python/simgrid_python.cpp index 710f8f889e..3f9119d81c 100644 --- a/src/bindings/python/simgrid_python.cpp +++ b/src/bindings/python/simgrid_python.cpp @@ -178,13 +178,15 @@ PYBIND11_MODULE(simgrid, m) }, "Blocking data reception, see :cpp:func:`void* simgrid::s4u::Mailbox::get()`"); /* Class Comm */ - py::class_(m, "Comm", "Communication, see :ref:`class s4u::Comm `") - .def("test", [](simgrid::s4u::CommPtr self) { - return self->test(); - }, "Test whether the communication is terminated, see :cpp:func:`simgrid::s4u::Comm::test()`") - .def("wait", [](simgrid::s4u::CommPtr self) { - self->wait(); - }, "Block until the completion of that communication, see :cpp:func:`simgrid::s4u::Comm::wait()`"); + py::class_(m, "Comm", + "Communication, see :ref:`class s4u::Comm `") + .def("test", [](simgrid::s4u::CommPtr self) { return self->test(); }, + "Test whether the communication is terminated, see :cpp:func:`simgrid::s4u::Comm::test()`") + .def("wait", [](simgrid::s4u::CommPtr self) { self->wait(); }, + "Block until the completion of that communication, see :cpp:func:`simgrid::s4u::Comm::wait()`") + .def("waitall", [](std::vector* comms) { simgrid::s4u::Comm::wait_all(comms); }, + "Block until the completion of all communications in the list, see " + ":cpp:func:`simgrid::s4u::Comm::wait_all()`"); /* Class Actor */ py::class_(m, "Actor",