Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
autopep8 --max-line-length 120 -i --aggressive `find -name '*.py'`
[simgrid.git] / examples / python / async-waitany / async-waitany.py
1 # Copyright (c) 2010-2019. The SimGrid Team. All rights reserved.
2 #
3 # This program is free software; you can redistribute it and/or modify it
4 # under the terms of the license (GNU LGPL) which comes with this package.
5
6 import sys
7 from simgrid import *
8
9 # This example shows how to block on the completion of a set of communications.
10 #
11 # As for the other asynchronous examples, the sender initiate all the messages it wants to send and
12 # pack the resulting simgrid.Comm objects in a list. All messages thus occur concurrently.
13 #
14 # The sender then loops until there is no ongoing communication. Using wait_any() ensures that the sender
15 # will notice events as soon as they occur even if it does not follow the order of the container.
16 #
17 # Here, finalize messages will terminate earlier because their size is 0, so they travel faster than the
18 # other messages of this application.  As expected, the trace shows that the finalize of worker 1 is
19 # processed before 'Message 5' that is sent to worker 0.
20
21
22 class Sender:
23     def __init__(self, *args):
24         if len(args) != 3:
25             raise AssertionError("Actor sender requires 3 parameters, but got {:d}".format(len(args)))
26         self.messages_count = int(args[0])  # number of tasks
27         self.msg_size = int(args[1])  # communication cost (in bytes)
28         self.receivers_count = int(args[2])  # number of receivers
29
30     def __call__(self):
31         # List in which we store all ongoing communications
32         pending_comms = []
33
34         # Vector of the used mailboxes
35         mboxes = [Mailbox.by_name("receiver-{:d}".format(i))
36                   for i in range(0, self.receivers_count)]
37
38         # Start dispatching all messages to receivers, in a round robin fashion
39         for i in range(0, self.messages_count):
40             content = "Message {:d}".format(i)
41             mbox = mboxes[i % self.receivers_count]
42
43             this_actor.info("Send '{:s}' to '{:s}'".format(content, str(mbox)))
44
45             # Create a communication representing the ongoing communication, and store it in pending_comms
46             comm = mbox.put_async(content, self.msg_size)
47             pending_comms.append(comm)
48
49         # Start sending messages to let the workers know that they should stop
50         for i in range(0, self.receivers_count):
51             mbox = mboxes[i]
52             this_actor.info("Send 'finalize' to '{:s}'".format(str(mbox)))
53             comm = mbox.put_async("finalize", 0)
54             pending_comms.append(comm)
55
56         this_actor.info("Done dispatching all messages")
57
58         # Now that all message exchanges were initiated, wait for their completion, in order of completion.
59         #
60         # This loop waits for first terminating message with wait_any() and remove it with del, until all comms are
61         # terminated.
62         # Even in this simple example, the pending comms do not terminate in the exact same order of creation.
63         while pending_comms:
64             changed_pos = Comm.wait_any(pending_comms)
65             del pending_comms[changed_pos]
66             if (changed_pos != 0):
67                 this_actor.info(
68                     "Remove the {:d}th pending comm: it terminated earlier than another comm that was initiated first.".format(changed_pos))
69
70         this_actor.info("Goodbye now!")
71
72
73 class Receiver:
74     def __init__(self, *args):
75         if len(args) != 1:  # Receiver actor expects 1 argument: its ID
76             raise AssertionError(
77                 "Actor receiver requires 1 parameter, but got {:d}".format(len(args)))
78         self.mbox = Mailbox.by_name("receiver-{:s}".format(args[0]))
79
80     def __call__(self):
81         this_actor.info("Wait for my first message")
82         while True:
83             received = self.mbox.get()
84             this_actor.info("I got a '{:s}'.".format(received))
85             if received == "finalize":
86                 break  # If it's a finalize message, we're done.
87
88
89 if __name__ == '__main__':
90     e = Engine(sys.argv)
91
92     # Load the platform description
93     e.load_platform(sys.argv[1])
94
95     # Register the classes representing the actors
96     e.register_actor("sender", Sender)
97     e.register_actor("receiver", Receiver)
98
99     e.load_deployment(sys.argv[2])
100
101     e.run()