Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
Deprecate Comm::wait_all(). Remove it in python
[simgrid.git] / examples / python / platform-comm-serialize / platform-comm-serialize.py
1 # Copyright (c) 2010-2023. The SimGrid Team. All rights reserved.
2 #
3 # This program is free software; you can redistribute it and/or modify it
4 # under the terms of the license (GNU LGPL) which comes with this package.
5
6 from typing import List, Tuple
7 import sys
8
9 from simgrid import Engine, Actor, ActivitySet, Comm, Host, LinkInRoute, Mailbox, NetZone, this_actor
10
11
12 RECEIVER_MAILBOX_NAME = "receiver"
13
14
15 class Sender(object):
16     def __init__(self, message_size: int, messages_count: int):
17         self.message_size = message_size
18         self.messages_count = messages_count
19
20     def __call__(self) -> None:
21         # List in which we store all ongoing communications
22         pending_comms = ActivitySet()
23
24         # Make a vector of the mailboxes to use
25         receiver_mailbox: Mailbox = Mailbox.by_name(RECEIVER_MAILBOX_NAME)
26         for i in range(self.messages_count):
27             message_content = f"Message {i}"
28             this_actor.info(f"Send '{message_content}' to '{receiver_mailbox.name}'")
29             # Create a communication representing the ongoing communication, and store it in pending_comms
30             pending_comms.push(receiver_mailbox.put_async(message_content, self.message_size))
31
32         this_actor.info("Done dispatching all messages")
33
34         # Now that all message exchanges were initiated, wait for their completion in one single call
35         pending_comms.wait_all()
36
37         this_actor.info("Goodbye now!")
38
39
40 class Receiver(object):
41     def __init__(self, messages_count: int):
42         self.mailbox: Mailbox = Mailbox.by_name(RECEIVER_MAILBOX_NAME)
43         self.messages_count = messages_count
44
45     def __call__(self):
46         # List in which we store all incoming msgs
47         pending_comms: List[Comm] = []
48         this_actor.info(f"Wait for {self.messages_count} messages asynchronously")
49         for _ in range(self.messages_count):
50             pending_comms.append(self.mailbox.get_async())
51         while pending_comms:
52             index = Comm.wait_any(pending_comms)
53             this_actor.info(f"I got '{pending_comms[index].get_payload()}'.")
54             pending_comms.pop(index)
55
56
57 def main():
58     e = Engine(sys.argv)
59     # Creates the platform
60     #  ________                 __________
61     # | Sender |===============| Receiver |
62     # |________|    Link1      |__________|
63     #
64     zone: NetZone = NetZone.create_full_zone("Zone1")
65     sender_host: Host = zone.create_host("sender", 1).seal()
66     receiver_host: Host = zone.create_host("receiver", 1).seal()
67
68     # create split-duplex link1 (UP/DOWN), limiting the number of concurrent flows in it for 2
69     link = zone.create_split_duplex_link("link1", 10e9).set_latency(10e-6).set_concurrency_limit(2).seal()
70
71     # create routes between nodes
72     zone.add_route(sender_host, receiver_host, [link])
73     zone.seal()
74
75     # create actors Sender/Receiver
76     messages_count = 10
77     Actor.create("receiver", receiver_host, Receiver(messages_count=messages_count))
78     Actor.create("sender", sender_host, Sender(messages_count=messages_count, message_size=int(1e6)))
79
80     e.run()
81
82
83 if __name__ == "__main__":
84     main()