Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
Little Sonar things.
[simgrid.git] / examples / python / comm-serialize / comm-serialize.py
1 # Copyright (c) 2010-2022. The SimGrid Team. All rights reserved.
2 #
3 # This program is free software; you can redistribute it and/or modify it
4 # under the terms of the license (GNU LGPL) which comes with this package.
5
6 from typing import List, Tuple
7 import sys
8
9 from simgrid import Engine, Actor, Comm, Host, LinkInRoute, Mailbox, NetZone, this_actor, PyGetAsync
10
11
# Name of the single mailbox shared by both actors: Sender looks it up to put
# its messages, Receiver looks it up to get them.
RECEIVER_MAILBOX_NAME = "receiver"
13
14
class Sender:
    """Actor that starts a batch of asynchronous sends and then waits for all of them.

    Every message is put on the mailbox named RECEIVER_MAILBOX_NAME.
    """

    def __init__(self, message_size: int, messages_count: int):
        # Size given to put_async() for each message
        self.message_size = message_size
        # Number of messages to send
        self.messages_count = messages_count

    def __call__(self) -> None:
        """Initiate every send, then wait for the whole batch in one call."""
        # All messages go to the same, single mailbox
        mailbox: Mailbox = Mailbox.by_name(RECEIVER_MAILBOX_NAME)

        # Communications that were started and still have to be waited for
        in_flight: List[Comm] = []
        for index in range(self.messages_count):
            payload = f"Message {index}"
            this_actor.info(f"Send '{payload}' to '{mailbox.name}'")
            # Keep the communication returned by put_async() so it can be waited on later
            comm = mailbox.put_async(payload, self.message_size)
            in_flight.append(comm)

        this_actor.info("Done dispatching all messages")

        # Every exchange has been initiated: wait for the completion of all of them at once
        Comm.wait_all(in_flight)

        this_actor.info("Goodbye now!")
38
39
class Receiver:
    """Actor that posts all of its asynchronous receives up front, then handles each one as it completes."""

    def __init__(self, messages_count: int):
        # Number of messages this actor expects
        self.messages_count = messages_count
        # Mailbox the messages are read from
        self.mailbox: Mailbox = Mailbox.by_name(RECEIVER_MAILBOX_NAME)

    def __call__(self) -> None:
        """Post one get_async() per expected message and log each payload once its communication is done."""
        this_actor.info(f"Wait for {self.messages_count} messages asynchronously")

        # One (communication, data handle) pair per expected message
        outstanding: List[Tuple[Comm, PyGetAsync]] = [
            self.mailbox.get_async() for _ in range(self.messages_count)
        ]

        while outstanding:
            # wait_any() gives the position, within the list it was handed, of a finished communication
            comms = [comm for comm, _ in outstanding]
            finished = Comm.wait_any(comms)
            # Remove the finished pair so it is not waited on again
            _, handle = outstanding.pop(finished)
            this_actor.info(f"I got '{handle.get()}'.")
56
57
def main():
    """Build a two-host platform joined by one link, start both actors and run the simulation."""
    engine = Engine(sys.argv)

    # Platform layout:
    #  ________                 __________
    # | Sender |===============| Receiver |
    # |________|    Link1      |__________|
    #
    zone: NetZone = NetZone.create_full_zone("Zone1")
    src: Host = zone.create_host("sender", 1).seal()
    dst: Host = zone.create_host("receiver", 1).seal()

    # Split-duplex link (UP/DOWN) whose concurrency limit is set to 2
    link = (
        zone.create_split_duplex_link("link1", 10e9)
        .set_latency(10e-6)
        .set_concurrency_limit(2)
        .seal()
    )

    # Route between the two hosts, using the UP direction of the link.
    # NOTE(review): the two None arguments and the trailing True are passed positionally;
    # presumably they are the gateways and a "symmetrical" flag - confirm against add_route().
    src_point = src.netpoint
    dst_point = dst.netpoint
    route_links = [LinkInRoute(link, LinkInRoute.UP)]
    zone.add_route(src_point, dst_point, None, None, route_links, True)
    zone.seal()

    # Both actors are given the same message count
    total_messages = 10
    Actor.create("receiver", dst, Receiver(messages_count=total_messages))
    Actor.create("sender", src, Sender(messages_count=total_messages, message_size=int(1e6)))

    engine.run()
89
90
# Run the example only when this file is executed as a script, not when it is imported.
if __name__ == "__main__":
    main()