Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
66a4a97b78ec125338b267e71b11b62e0a69410a
[simgrid.git] / examples / python / network-nonlinear / network-nonlinear.py
1 # Copyright (c) 2006-2022. The SimGrid Team. All rights reserved.
2 #
3 # This program is free software; you can redistribute it and/or modify it
4 # under the terms of the license (GNU LGPL) which comes with this package.
5
6 # This example shows how to simulate a non-linear resource sharing for
7 # network links.
8
9 from simgrid import Actor, Engine, Comm, Mailbox, NetZone, Link, LinkInRoute, this_actor
10 import sys
11 import functools
12
13 class Sender:
14   """
15   Send a series of messages to mailbox "receiver"
16   """
17   def __init__(self, msg_count: int, msg_size=int(1e6)):
18     self.msg_count = msg_count
19     self.msg_size = msg_size
20
21   # Actors that are created as object will execute their __call__ method.
22   # So, the following constitutes the main function of the Sender actor.
23   def __call__(self):
24     pending_comms = []
25     mbox = Mailbox.by_name("receiver")
26
27     for i in range(self.msg_count):
28       msg = "Message " + str(i)
29       size = self.msg_size * (i + 1)
30       this_actor.info("Send '%s' to '%s, msg size: %d'" % (msg, mbox.name, size))
31       comm = mbox.put_async(msg, size)
32       pending_comms.append(comm)
33
34     this_actor.info("Done dispatching all messages")
35
36     # Now that all message exchanges were initiated, wait for their completion in one single call
37     Comm.wait_all(pending_comms)
38
39     this_actor.info("Goodbye now!")
40
41 class Receiver:
42   """
43   Receiver actor: wait for N messages on the mailbox "receiver"
44   """
45
46   def __init__(self, msg_count=10):
47     self.msg_count = msg_count
48
49   def __call__(self):
50     mbox = Mailbox.by_name("receiver")
51
52     pending_msgs = []
53     pending_comms = []
54
55     this_actor.info("Wait for %d messages asynchronously" % self.msg_count)
56     for _ in range(self.msg_count):
57       comm, data = mbox.get_async()
58       pending_comms.append(comm)
59       pending_msgs.append(data)
60
61     while len(pending_comms) > 0:
62       index = Comm.wait_any(pending_comms)
63       msg = pending_msgs[index].get()
64       this_actor.info("I got '%s'." % msg)
65       del pending_comms[index]
66       del pending_msgs[index]
67
68 ####################################################################################################
69 def link_nonlinear(link: Link, capacity: float, n: int) -> float:
70   """
71   Non-linear resource sharing for links
72
73   Note that the callback is called twice in this example:
74    1) link UP: with the number of active flows (from 9 to 1)
75    2) link DOWN: with 0 active flows. A crosstraffic communication is happing
76    in the down link, but it's not considered as an active flow.
77   """
78   # emulates a degradation in link according to the number of flows
79   # you probably want something more complex than that and based on real
80   # experiments
81   capacity = min(capacity, capacity * (1.0 - (n - 1) / 10.0))
82   this_actor.info("Link %s, %d active communications, new capacity %f" % (link.name, n, capacity))
83   return capacity
84
85 def load_platform():
86   """
87   Create a simple 2-hosts platform */
88    ________                 __________
89   | Sender |===============| Receiver |
90   |________|    Link1      |__________|
91
92   """
93   zone = NetZone.create_full_zone("Zone1")
94   sender = zone.create_host("sender", 1).seal()
95   receiver = zone.create_host("receiver", 1).seal()
96
97   link = zone.create_split_duplex_link("link1", 1e6)
98   # setting same callbacks (could be different) for link UP/DOWN in split-duplex link
99   link.get_link_up().set_sharing_policy(
100       Link.SharingPolicy.NONLINEAR,
101       functools.partial(link_nonlinear, link.get_link_up()))
102   link.get_link_down().set_sharing_policy(
103       Link.SharingPolicy.NONLINEAR,
104       functools.partial(link_nonlinear, link.get_link_down()))
105   link.set_latency(10e-6).seal()
106
107   # create routes between nodes
108   zone.add_route(sender.get_netpoint(), receiver.get_netpoint(), None, None,
109                  [LinkInRoute(link, LinkInRoute.Direction.UP)], True)
110   zone.seal()
111
112   # create actors Sender/Receiver
113   Actor.create("receiver", receiver, Receiver(9))
114   Actor.create("sender", sender, Sender(9))
115
116 ###################################################################################################
117
118 if __name__ == '__main__':
119   e = Engine(sys.argv)
120
121   # create platform
122   load_platform()
123
124   # runs the simulation
125   e.run()
126
127   # explicitly deleting Engine object to avoid segfault during cleanup phase.
128   # During Engine destruction, the cleanup of std::function linked to link_non_linear callback is called.
129   # If we let the cleanup by itself, it fails trying on its destruction because the python main program
130   # has already freed its variables
131   del(e)