1 # Copyright (c) 2010-2022. The SimGrid Team. All rights reserved.
3 # This program is free software; you can redistribute it and/or modify it
4 # under the terms of the license (GNU LGPL) which comes with this package.
6 from typing import List, Tuple
9 from simgrid import Engine, Actor, Comm, Host, LinkInRoute, Mailbox, NetZone, this_actor, PyGetAsync
# Name of the mailbox used as the rendezvous point: the sending side puts
# messages on it and the Receiver actor reads from it (both resolve it with
# Mailbox.by_name()).
RECEIVER_MAILBOX_NAME = "receiver"
class Sender(object):
    """Actor body that sends a batch of messages to the receiver mailbox.

    An instance is a callable: the simulation runs ``__call__`` as the
    actor's code once the instance has been handed to ``Actor.create``.
    """

    def __init__(self, message_size: int, messages_count: int):
        """Store the parameters of the batch to send.

        Args:
            message_size: Simulated size passed to ``Mailbox.put_async`` for
                every message.
            messages_count: Number of messages to dispatch.
        """
        self.message_size = message_size
        self.messages_count = messages_count

    def __call__(self) -> None:
        """Start all sends asynchronously, then wait for all of them."""
        # List in which we store all ongoing communications
        pending_comms: List[Comm] = []

        # Look up the single mailbox every message is sent to
        receiver_mailbox: Mailbox = Mailbox.by_name(RECEIVER_MAILBOX_NAME)
        for i in range(self.messages_count):
            message_content = f"Message {i}"
            this_actor.info(f"Send '{message_content}' to '{receiver_mailbox.name}'")
            # put_async() returns immediately; keep the Comm handle so that
            # its completion can be awaited later on.
            pending_comms.append(receiver_mailbox.put_async(message_content, self.message_size))

        this_actor.info("Done dispatching all messages")

        # Now that all message exchanges were initiated, wait for their
        # completion in one single call
        Comm.wait_all(pending_comms)

        this_actor.info("Goodbye now!")
class Receiver(object):
    """Actor body that receives a fixed number of messages from the mailbox.

    An instance is a callable: the simulation runs ``__call__`` as the
    actor's code once the instance has been handed to ``Actor.create``.
    """

    def __init__(self, messages_count: int):
        """Resolve the receiving mailbox and remember how many messages to expect.

        Args:
            messages_count: Number of asynchronous receptions to post.
        """
        self.mailbox: Mailbox = Mailbox.by_name(RECEIVER_MAILBOX_NAME)
        self.messages_count = messages_count

    def __call__(self) -> None:
        """Post every reception up front, then handle them as they complete."""
        this_actor.info(f"Wait for {self.messages_count} messages asynchronously")

        # One (Comm, PyGetAsync) pair per expected message: the Comm is what
        # we wait on, the PyGetAsync gives access to the received payload.
        pending_comms: List[Tuple[Comm, PyGetAsync]] = [
            self.mailbox.get_async() for _ in range(self.messages_count)
        ]

        # Drain the list: wait_any() returns the index of a finished
        # communication, which is removed before waiting again so that each
        # message is reported exactly once.
        while pending_comms:
            index = Comm.wait_any([comm for (comm, _) in pending_comms])
            _, async_data = pending_comms.pop(index)
            this_actor.info(f"I got '{async_data.get()}'.")
# Creates the platform: one full zone with two hosts joined by a single link.
#
# | Sender |===============| Receiver |
# |________|     Link1     |__________|
#
zone: NetZone = NetZone.create_full_zone("Zone1")
sender_host: Host = zone.create_host("sender", 1).seal()
receiver_host: Host = zone.create_host("receiver", 1).seal()

# Create split-duplex link1 (UP/DOWN), limiting the number of concurrent flows on it to 2
link = zone.create_split_duplex_link("link1", 10e9).set_latency(10e-6).set_concurrency_limit(2).seal()

# Create routes between nodes
# NOTE(review): the two lines below look like arguments of a route-creation
# call whose opening and closing lines are not visible here -- confirm
# against the complete file before relying on them.
receiver_host.netpoint,
[LinkInRoute(link, LinkInRoute.UP)],

# Create actors Sender/Receiver
# NOTE(review): messages_count is not assigned in the visible lines --
# presumably defined just above; verify.
Actor.create("receiver", receiver_host, Receiver(messages_count=messages_count))
Actor.create("sender", sender_host, Sender(messages_count=messages_count, message_size=int(1e6)))
91 if __name__ == "__main__":