Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
Various sonar cleanups
[simgrid.git] / examples / python / comm-ready / comm-ready.py
1 # Copyright (c) 2010-2023. The SimGrid Team. All rights reserved.
2 #
3 # This program is free software; you can redistribute it and/or modify it
4 # under the terms of the license (GNU LGPL) which comes with this package.
5
6 from argparse import ArgumentParser
7 from typing import List
8 import sys
9
10 from simgrid import Actor, ActivitySet, Comm, Engine, Mailbox, this_actor
11
12
13 FINALIZE_MESSAGE = "finalize"
14
15
16 def create_parser() -> ArgumentParser:
17     parser = ArgumentParser()
18     parser.add_argument(
19         '--platform',
20         type=str,
21         required=True,
22         help='path to the platform description'
23     )
24     return parser
25
26
27 def get_peer_mailbox(peer_id: int) -> Mailbox:
28     return Mailbox.by_name(f"peer-{peer_id}")
29
30
31 def peer(my_id: int, message_count: int, payload_size: int, peers_count: int):
32     my_mailbox: Mailbox = get_peer_mailbox(my_id)
33     my_mailbox.set_receiver(Actor.self())
34     pending_comms = ActivitySet()
35     # Start dispatching all messages to peers others that myself
36     for i in range(message_count):
37         for peer_id in range(peers_count):
38             if peer_id != my_id:
39                 peer_mailbox = get_peer_mailbox(peer_id)
40                 message = f"Message {i} from peer {my_id}"
41                 this_actor.info(f"Send '{message}' to '{peer_mailbox.name}'")
42                 pending_comms.push(peer_mailbox.put_async(message, payload_size))
43
44     # Start sending messages to let peers know that they should stop
45     for peer_id in range(peers_count):
46         if peer_id != my_id:
47             peer_mailbox = get_peer_mailbox(peer_id)
48             payload = str(FINALIZE_MESSAGE)
49             pending_comms.push(peer_mailbox.put_async(payload, payload_size))
50             this_actor.info(f"Send '{payload}' to '{peer_mailbox.name}'")
51     this_actor.info("Done dispatching all messages")
52
53     # Retrieve all the messages other peers have been sending to me until I receive all the corresponding "Finalize"
54     # messages
55     pending_finalize_messages = peers_count - 1
56     while pending_finalize_messages > 0:
57         if my_mailbox.ready:
58             start = Engine.clock
59             received: str = my_mailbox.get()
60             waiting_time = Engine.clock - start
61             if waiting_time > 0.0:
62                 raise AssertionError(f"Expecting the waiting time to be 0.0 because the communication was supposedly "
63                                      f"ready, but got {waiting_time} instead")
64             this_actor.info(f"I got a '{received}'.")
65             if received == FINALIZE_MESSAGE:
66                 pending_finalize_messages -= 1
67         else:
68             this_actor.info("Nothing ready to consume yet, I better sleep for a while")
69             this_actor.sleep_for(0.01)
70
71     this_actor.info("I'm done, just waiting for my peers to receive the messages before exiting")
72     pending_comms.wait_all()
73     this_actor.info("Goodbye now!")
74
75
76 def main():
77     settings = create_parser().parse_known_args()[0]
78     e = Engine(sys.argv)
79     e.load_platform(settings.platform)
80     Actor.create("peer", e.host_by_name("Tremblay"), peer, 0, 2, int(5e7), 3)
81     Actor.create("peer", e.host_by_name("Ruby"), peer, 1, 6, int(2.5e5), 3)
82     Actor.create("peer", e.host_by_name("Perl"), peer, 2, 0, int(5e7), 3)
83     e.run()
84
85
86 if __name__ == "__main__":
87     main()