Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
Add remaining Comm bindings and examples
[simgrid.git] / examples / python / comm-ready / comm-ready.py
diff --git a/examples/python/comm-ready/comm-ready.py b/examples/python/comm-ready/comm-ready.py
new file mode 100644 (file)
index 0000000..646d4d1
--- /dev/null
@@ -0,0 +1,87 @@
+# Copyright (c) 2010-2022. The SimGrid Team. All rights reserved.
+#
+# This program is free software; you can redistribute it and/or modify it
+# under the terms of the license (GNU LGPL) which comes with this package.
+
+from argparse import ArgumentParser
+from typing import List
+import sys
+
+from simgrid import Actor, Comm, Engine, Mailbox, this_actor
+
+
+FINALIZE_MESSAGE = "finalize"  # sentinel payload a peer sends to tell the others it has no more messages for them
+
+
+def create_parser() -> ArgumentParser:
+    parser = ArgumentParser()
+    parser.add_argument(
+        '--platform',
+        type=str,
+        required=True,
+        help='path to the platform description'
+    )
+    return parser
+
+
+def get_peer_mailbox(peer_id: int) -> Mailbox:
+    return Mailbox.by_name(f"peer-{peer_id}")
+
+
+def peer(my_id: int, message_count: int, payload_size: int, peers_count: int):
+    my_mailbox: Mailbox = get_peer_mailbox(my_id)
+    my_mailbox.set_receiver(Actor.self())
+    pending_comms: List[Comm] = []
+    # Start dispatching all messages to peers others that myself
+    for i in range(message_count):
+        for peer_id in range(peers_count):
+            if peer_id != my_id:
+                peer_mailbox = get_peer_mailbox(peer_id)
+                message = f"Message {i} from peer {my_id}"
+                this_actor.info(f"Send '{message}' to '{peer_mailbox.name}'")
+                pending_comms.append(peer_mailbox.put_async(message, payload_size))
+
+    # Start sending messages to let peers know that they should stop
+    for peer_id in range(peers_count):
+        if peer_id != my_id:
+            peer_mailbox = get_peer_mailbox(peer_id)
+            payload = str(FINALIZE_MESSAGE)
+            pending_comms.append(peer_mailbox.put_async(payload, payload_size))
+            this_actor.info(f"Send '{payload}' to '{peer_mailbox.name}'")
+    this_actor.info("Done dispatching all messages")
+
+    # Retrieve all the messages other peers have been sending to me until I receive all the corresponding "Finalize"
+    # messages
+    pending_finalize_messages = peers_count - 1
+    while pending_finalize_messages > 0:
+        if my_mailbox.ready:
+            start = Engine.clock
+            received: str = my_mailbox.get()
+            waiting_time = Engine.clock - start
+            if waiting_time != 0.0:
+                raise AssertionError(f"Expecting the waiting time to be 0.0 because the communication was supposedly "
+                                     f"ready, but got {waiting_time} instead")
+            this_actor.info(f"I got a '{received}'.")
+            if received == FINALIZE_MESSAGE:
+                pending_finalize_messages -= 1
+        else:
+            this_actor.info("Nothing ready to consume yet, I better sleep for a while")
+            this_actor.sleep_for(0.01)
+
+    this_actor.info("I'm done, just waiting for my peers to receive the messages before exiting")
+    Comm.wait_all(pending_comms)
+    this_actor.info("Goodbye now!")
+
+
+def main():
+    settings = create_parser().parse_known_args()[0]
+    e = Engine(sys.argv)
+    e.load_platform(settings.platform)
+    Actor.create("peer", e.host_by_name("Tremblay"), peer, 0, 2, int(5e7), 3)
+    Actor.create("peer", e.host_by_name("Ruby"), peer, 1, 6, int(2.5e5), 3)
+    Actor.create("peer", e.host_by_name("Perl"), peer, 2, 0, int(5e7), 3)
+    e.run()
+
+
+if __name__ == "__main__":  # run the example only when executed as a script, not when imported
+    main()