Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
Remove superfluous simcall.
[simgrid.git] / examples / python / async-wait / async-wait.py
1 # Copyright (c) 2010-2019. The SimGrid Team. All rights reserved.
2 #
3 # This program is free software; you can redistribute it and/or modify it
4 # under the terms of the license (GNU LGPL) which comes with this package.
5
6 import sys
7 from simgrid import *
8
9 # This example shows how to use simgrid::s4u::this_actor::wait() to wait for a given communication.
10 #
11 # As for the other asynchronous examples, the sender initiate all the messages it wants to send and
12 # pack the resulting simgrid::s4u::CommPtr objects in a vector. All messages thus occurs concurrently.
13 #
14 # The sender then loops until there is no ongoing communication.
15
16
17 class Sender:
18     def __init__(self, *args):
19         if len(args) != 3:
20             raise AssertionError(
21                 "Actor sender requires 3 parameters, but got {:d}".format(len(args)))
22         self.messages_count = int(args[0])  # number of tasks
23         self.msg_size = int(args[1])  # communication cost (in bytes)
24         self.receivers_count = int(args[2])  # number of receivers
25
26     def __call__(self):
27         # List in which we store all ongoing communications
28         pending_comms = []
29
30         # Vector of the used mailboxes
31         mboxes = [Mailbox.by_name("receiver-{:d}".format(i)) for i in range(0, self.receivers_count)]
32
33         # Start dispatching all messages to receivers, in a round robin fashion
34         for i in range(0, self.messages_count):
35             content = "Message {:d}".format(i)
36             mbox = mboxes[i % self.receivers_count]
37
38             this_actor.info("Send '{:s}' to '{:s}'".format(content, str(mbox)))
39
40             # Create a communication representing the ongoing communication, and store it in pending_comms
41             comm = mbox.put_async(content, self.msg_size)
42             pending_comms.append(comm)
43
44         # Start sending messages to let the workers know that they should stop
45         for i in range(0, self.receivers_count):
46             mbox = mboxes[i]
47             this_actor.info("Send 'finalize' to '{:s}'".format(str(mbox)))
48             comm = mbox.put_async("finalize", 0)
49             pending_comms.append(comm)
50
51         this_actor.info("Done dispatching all messages")
52
53         # Now that all message exchanges were initiated, wait for their completion, in order of creation.
54         for comm in pending_comms:
55             comm.wait()
56         this_actor.info("Goodbye now!")
57
58
59 class Receiver:
60     def __init__(self, *args):
61         if len(args) != 1:  # Receiver actor expects 1 argument: its ID
62             raise AssertionError("Actor receiver requires 1 parameter, but got {:d}".format(len(args)))
63         self.id = int(args[0])
64
65     def __call__(self):
66         # FIXME: It should be ok to initialize self.mbox from __init__, but it's currently failing on the OS X Jenkins slave.
67         self.mbox = Mailbox.by_name("receiver-{:d}".format(self.id))
68         this_actor.info("Wait for my first message")
69         while True:
70             received = self.mbox.get()
71             this_actor.info("I got a '{:s}'.".format(received))
72             if received == "finalize":
73                 break  # If it's a finalize message, we're done.
74
75
76 if __name__ == '__main__':
77     e = Engine(sys.argv)
78
79     e.load_platform(sys.argv[1])             # Load the platform description
80
81     # Register the classes representing the actors
82     e.register_actor("sender", Sender)
83     e.register_actor("receiver", Receiver)
84
85     e.load_deployment(sys.argv[2])
86
87     e.run()