Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
Python: Add Comm.wait_any
authorMartin Quinson <martin.quinson@ens-rennes.fr>
Sun, 17 Mar 2019 15:14:10 +0000 (16:14 +0100)
committerMartin Quinson <martin.quinson@ens-rennes.fr>
Sun, 17 Mar 2019 18:50:52 +0000 (19:50 +0100)
+ the example of same name
+ cosmetics in the related examples, in C++ comments

examples/python/CMakeLists.txt
examples/python/async-waitall/async-waitall.py
examples/python/async-waitany/async-waitany.py [new file with mode: 0644]
examples/python/async-waitany/async-waitany.tesh [new file with mode: 0644]
examples/python/async-waitany/async-waitany_d.xml [new file with mode: 0644]
examples/s4u/async-waitall/s4u-async-waitall.cpp
examples/s4u/async-waitany/s4u-async-waitany.cpp
src/bindings/python/simgrid_python.cpp

index 4c4eb11..d58c801 100644 (file)
@@ -1,5 +1,5 @@
 foreach(example actor-create actor-daemon actor-join actor-kill actor-migrate actor-suspend actor-yield # actor-lifetime
-                async-wait async-waitall
+                async-wait async-waitall async-waitany
                 exec-basic)
   set(tesh_files    ${tesh_files}   ${CMAKE_CURRENT_SOURCE_DIR}/${example}/${example}.tesh)
   set(examples_src  ${examples_src} ${CMAKE_CURRENT_SOURCE_DIR}/${example}/${example}.py)
index ea7e73e..46cd50c 100644 (file)
@@ -9,7 +9,7 @@ from simgrid import *
 # This example shows how to block on the completion of a set of communications.
 #
 # As for the other asynchronous examples, the sender initiate all the messages it wants to send and
-# pack the resulting simgrid::s4u::CommPtr objects in a vector. All messages thus occurs concurrently.
+# pack the resulting simgrid.Comm objects in a list. All messages thus occur concurrently.
 #
 # The sender then blocks until all ongoing communication terminate, using simgrid.Comm.wait_all()
 
@@ -51,7 +51,7 @@ class Sender:
         this_actor.info("Done dispatching all messages")
 
         # Now that all message exchanges were initiated, wait for their completion in one single call
-        Comm.waitall(pending_comms)
+        Comm.wait_all(pending_comms)
 
         this_actor.info("Goodbye now!")
 
diff --git a/examples/python/async-waitany/async-waitany.py b/examples/python/async-waitany/async-waitany.py
new file mode 100644 (file)
index 0000000..1bc5891
--- /dev/null
@@ -0,0 +1,99 @@
+# Copyright (c) 2010-2019. The SimGrid Team. All rights reserved.
+#
+# This program is free software; you can redistribute it and/or modify it
+# under the terms of the license (GNU LGPL) which comes with this package.
+
+import sys
+from simgrid import *
+
+# This example shows how to block on the completion of a set of communications.
+#
+# As for the other asynchronous examples, the sender initiate all the messages it wants to send and
+# pack the resulting simgrid.Comm objects in a list. All messages thus occur concurrently.
+#
+# The sender then loops until there is no ongoing communication. Using wait_any() ensures that the sender
+# will notice events as soon as they occur even if it does not follow the order of the container.
+#
+# Here, finalize messages will terminate earlier because their size is 0, so they travel faster than the
+# other messages of this application.  As expected, the trace shows that the finalize of worker 1 is
+# processed before 'Message 5' that is sent to worker 0.
+
+class Sender:
+    def __init__(self, *args):
+        if len(args) != 3:
+            raise AssertionError("Actor sender requires 3 parameters, but got {:d}".format(len(args)))
+        self.messages_count = int(args[0])  # number of tasks
+        self.msg_size = int(args[1])  # communication cost (in bytes)
+        self.receivers_count = int(args[2])  # number of receivers
+
+    def __call__(self):
+        # List in which we store all ongoing communications
+        pending_comms = []
+
+        # Vector of the used mailboxes
+        mboxes = [Mailbox.by_name("receiver-{:d}".format(i))
+                  for i in range(0, self.receivers_count)]
+
+        # Start dispatching all messages to receivers, in a round robin fashion
+        for i in range(0, self.messages_count):
+            content = "Message {:d}".format(i)
+            mbox = mboxes[i % self.receivers_count]
+
+            this_actor.info("Send '{:s}' to '{:s}'".format(content, str(mbox)))
+
+            # Create a communication representing the ongoing communication, and store it in pending_comms
+            comm = mbox.put_async(content, self.msg_size)
+            pending_comms.append(comm)
+
+        # Start sending messages to let the workers know that they should stop
+        for i in range(0, self.receivers_count):
+            mbox = mboxes[i]
+            this_actor.info("Send 'finalize' to '{:s}'".format(str(mbox)))
+            comm = mbox.put_async("finalize", 0)
+            pending_comms.append(comm)
+
+        this_actor.info("Done dispatching all messages")
+
+        # Now that all message exchanges were initiated, wait for their completion, in order of completion.
+        #
+        # This loop waits for first terminating message with wait_any() and remove it with del, until all comms are
+        # terminated.
+        # Even in this simple example, the pending comms do not terminate in the exact same order of creation.
+        while pending_comms:
+          changed_pos = Comm.wait_any(pending_comms)
+          del pending_comms[changed_pos]
+          if (changed_pos != 0):
+            this_actor.info("Remove the {:d}th pending comm: it terminated earlier than another comm that was initiated first.".format(changed_pos));
+
+        this_actor.info("Goodbye now!")
+
+
+class Receiver:
+    def __init__(self, *args):
+        if len(args) != 1:  # Receiver actor expects 1 argument: its ID
+            raise AssertionError(
+                "Actor receiver requires 1 parameter, but got {:d}".format(len(args)))
+        self.mbox = Mailbox.by_name("receiver-{:s}".format(args[0]))
+
+    def __call__(self):
+        this_actor.info("Wait for my first message")
+        while True:
+            received = self.mbox.get()
+            this_actor.info("I got a '{:s}'.".format(received))
+            if received == "finalize":
+                break  # If it's a finalize message, we're done.
+
+
+if __name__ == '__main__':
+    e = Engine(sys.argv)
+
+    # Load the platform description
+    e.load_platform(sys.argv[1])
+
+    # Register the classes representing the actors
+    e.register_actor("sender", Sender)
+    e.register_actor("receiver", Receiver)
+
+    e.load_deployment(sys.argv[2])
+
+    e.run()
diff --git a/examples/python/async-waitany/async-waitany.tesh b/examples/python/async-waitany/async-waitany.tesh
new file mode 100644 (file)
index 0000000..4a1381a
--- /dev/null
@@ -0,0 +1,27 @@
+#!/usr/bin/env tesh
+
+p Testing Comm.wait_any()
+
+! output sort 19
+$ ${pythoncmd:=python3} ${PYTHON_TOOL_OPTIONS:=} ${bindir:=.}/async-waitany.py ${platfdir}/small_platform.xml async-waitany_d.xml "--log=root.fmt:[%10.6r]%e(%i:%P@%h)%e%m%n"
+> [  0.000000] (1:sender@Tremblay) Send 'Message 0' to 'Mailbox(receiver-0)'
+> [  0.000000] (2:receiver@Fafard) Wait for my first message
+> [  0.000000] (3:receiver@Jupiter) Wait for my first message
+> [  0.000000] (1:sender@Tremblay) Send 'Message 1' to 'Mailbox(receiver-1)'
+> [  0.000000] (1:sender@Tremblay) Send 'Message 2' to 'Mailbox(receiver-0)'
+> [  0.000000] (1:sender@Tremblay) Send 'Message 3' to 'Mailbox(receiver-1)'
+> [  0.000000] (1:sender@Tremblay) Send 'Message 4' to 'Mailbox(receiver-0)'
+> [  0.000000] (1:sender@Tremblay) Send 'Message 5' to 'Mailbox(receiver-1)'
+> [  0.000000] (1:sender@Tremblay) Send 'finalize' to 'Mailbox(receiver-0)'
+> [  0.000000] (1:sender@Tremblay) Send 'finalize' to 'Mailbox(receiver-1)'
+> [  0.000000] (1:sender@Tremblay) Done dispatching all messages
+> [  0.158397] (2:receiver@Fafard) I got a 'Message 0'.
+> [  0.169155] (3:receiver@Jupiter) I got a 'Message 1'.
+> [  0.316794] (2:receiver@Fafard) I got a 'Message 2'.
+> [  0.338309] (3:receiver@Jupiter) I got a 'Message 3'.
+> [  0.475190] (2:receiver@Fafard) I got a 'Message 4'.
+> [  0.500898] (2:receiver@Fafard) I got a 'finalize'.
+> [  0.500898] (1:sender@Tremblay) Remove the 1th pending comm: it terminated earlier than another comm that was initiated first.
+> [  0.507464] (3:receiver@Jupiter) I got a 'Message 5'.
+> [  0.526478] (3:receiver@Jupiter) I got a 'finalize'.
+> [  0.526478] (1:sender@Tremblay) Goodbye now!
diff --git a/examples/python/async-waitany/async-waitany_d.xml b/examples/python/async-waitany/async-waitany_d.xml
new file mode 100644 (file)
index 0000000..4b4161d
--- /dev/null
@@ -0,0 +1,17 @@
+<?xml version='1.0'?>
+<!DOCTYPE platform SYSTEM "https://simgrid.org/simgrid.dtd">
+<platform version="4.1">
+  <!-- The master actor (with some arguments) -->
+  <actor host="Tremblay" function="sender">
+    <argument value="6"/>         <!-- Number of messages -->
+    <argument value="1000000"/>   <!-- Size of messages -->
+    <argument value="2"/>         <!-- Number of receivers -->
+  </actor>
+  <!-- The receiver processes -->
+  <actor host="Fafard" function="receiver">
+    <argument value="0"/>       <!-- My name -->
+  </actor>
+  <actor host="Jupiter" function="receiver">
+    <argument value="1"/>       <!-- My name -->
+  </actor>
+</platform>
index e3d83cf..0b6f83f 100644 (file)
@@ -6,7 +6,7 @@
 /* This example shows how to block on the completion of a set of communications.
  *
  * As for the other asynchronous examples, the sender initiate all the messages it wants to send and
- * pack the resulting simgrid::s4u::CommPtr objects in a vector. All messages thus occurs concurrently.
+ * pack the resulting simgrid::s4u::CommPtr objects in a vector. All messages thus occur concurrently.
  *
  * The sender then blocks until all ongoing communication terminate, using simgrid::s4u::Comm::wait_all()
  *
index 4d3b1fa..ca5c085 100644 (file)
@@ -6,7 +6,7 @@
 /* This example shows how to use simgrid::s4u::this_actor::wait_any() to wait for the first occurring event.
  *
  * As for the other asynchronous examples, the sender initiate all the messages it wants to send and
- * pack the resulting simgrid::s4u::CommPtr objects in a vector. All messages thus occurs concurrently.
+ * pack the resulting simgrid::s4u::CommPtr objects in a vector. All messages thus occur concurrently.
  *
  * The sender then loops until there is no ongoing communication. Using wait_any() ensures that the sender
  * will notice events as soon as they occur even if it does not follow the order of the container.
index 3f9119d..260dd8f 100644 (file)
@@ -184,9 +184,13 @@ PYBIND11_MODULE(simgrid, m)
            "Test whether the communication is terminated, see :cpp:func:`simgrid::s4u::Comm::test()`")
       .def("wait", [](simgrid::s4u::CommPtr self) { self->wait(); },
            "Block until the completion of that communication, see :cpp:func:`simgrid::s4u::Comm::wait()`")
-      .def("waitall", [](std::vector<simgrid::s4u::CommPtr>* comms) { simgrid::s4u::Comm::wait_all(comms); },
+      .def("wait_all", [](std::vector<simgrid::s4u::CommPtr>* comms) { simgrid::s4u::Comm::wait_all(comms); },
            "Block until the completion of all communications in the list, see "
-           ":cpp:func:`simgrid::s4u::Comm::wait_all()`");
+           ":cpp:func:`simgrid::s4u::Comm::wait_all()`")
+      .def(
+          "wait_any", [](std::vector<simgrid::s4u::CommPtr>* comms) { return simgrid::s4u::Comm::wait_any(comms); },
+          "Block until the completion of any communication in the list and return the index of the terminated one, see "
+          ":cpp:func:`simgrid::s4u::Comm::wait_any()`");
 
   /* Class Actor */
   py::class_<simgrid::s4u::Actor, ActorPtr>(m, "Actor",