SimGrid (3.31.1) NOT RELEASED YET (v3.32 expected June 21. 2022, 09:13 UTC)
+Python:
+ - Added the following bindings / examples:
+ - Comm (now 100% covers the C++ interface):
+ - Comm.dst_data_size, Comm.mailbox, Comm.sender, Comm.start_time, Comm.finish_time
+ - Comm.state_str [examples: examples/python/comm-failure/, examples/python/comm-host2host/]
+ - Comm.remaining [examples: examples/python/comm-host2host/, examples/python/comm-suspend/]
+ - Comm.set_payload_size [example: examples/python/comm-host2host/]
+ - Comm.set_rate [example: examples/python/comm-throttling/]
+ - Comm.sendto, Comm.sendto_init, Comm.sendto_async [example: examples/python/comm-host2host/]
+ - Comm.start, Comm.suspend, Comm.resume [example: examples/python/comm-host2host/]
+ - Comm.test_any [example: examples/python/comm-testany/]
+ - Comm.wait_until [example: examples/python/comm-waituntil/]
+ - Engine:
+ - Engine.host_by_name [example: examples/python/comm-host2host/]
+ - Engine.mailbox_by_name_or_create [example: examples/python/comm-pingpong/]
+ - Mailbox: Mailbox.ready [example: examples/python/comm-ready/]
+
----------------------------------------------------------------------------
SimGrid (3.31) March 22. 2022.
include examples/python/app-masterworkers/app-masterworkers.tesh
include examples/python/clusters-multicpu/clusters-multicpu.py
include examples/python/clusters-multicpu/clusters-multicpu.tesh
+include examples/python/comm-failure/comm-failure.py
+include examples/python/comm-failure/comm-failure.tesh
+include examples/python/comm-host2host/comm-host2host.py
+include examples/python/comm-host2host/comm-host2host.tesh
+include examples/python/comm-pingpong/comm-pingpong.py
+include examples/python/comm-pingpong/comm-pingpong.tesh
+include examples/python/comm-ready/comm-ready.py
+include examples/python/comm-ready/comm-ready.tesh
+include examples/python/comm-serialize/comm-serialize.py
+include examples/python/comm-serialize/comm-serialize.tesh
+include examples/python/comm-suspend/comm-suspend.py
+include examples/python/comm-suspend/comm-suspend.tesh
+include examples/python/comm-testany/comm-testany.py
+include examples/python/comm-testany/comm-testany.tesh
+include examples/python/comm-throttling/comm-throttling.py
+include examples/python/comm-throttling/comm-throttling.tesh
include examples/python/comm-wait/comm-wait.py
include examples/python/comm-wait/comm-wait.tesh
include examples/python/comm-waitall/comm-waitall.py
include examples/python/comm-waitany/comm-waitany.tesh
include examples/python/comm-waitfor/comm-waitfor.py
include examples/python/comm-waitfor/comm-waitfor.tesh
+include examples/python/comm-waituntil/comm-waituntil.py
+include examples/python/comm-waituntil/comm-waituntil.tesh
include examples/python/exec-async/exec-async.py
include examples/python/exec-async/exec-async.tesh
include examples/python/exec-basic/exec-basic.py
.. group-tab:: Python
.. autoattribute:: simgrid.Engine.all_hosts
+ .. automethod:: simgrid.Engine.host_by_name
.. group-tab:: C
.. automethod:: simgrid.Mailbox.get
.. automethod:: simgrid.Mailbox.get_async
+ .. autoattribute:: simgrid.Mailbox.ready
.. group-tab:: C
.. group-tab:: Python
+ .. autoattribute:: simgrid.Comm.dst_data_size
+ .. autoattribute:: simgrid.Comm.mailbox
+ .. autoattribute:: simgrid.Comm.sender
+ .. autoattribute:: simgrid.Comm.state_str
+ .. automethod:: simgrid.Comm.detach
+ .. automethod:: simgrid.Comm.set_payload_size
+ .. automethod:: simgrid.Comm.set_rate
.. automethod:: simgrid.Comm.detach
Life cycle
.. doxygenfunction:: simgrid::s4u::Comm::wait_any(const std::vector< CommPtr >& comms)
.. doxygenfunction:: simgrid::s4u::Comm::wait_any_for(const std::vector< CommPtr >& comms, double timeout)
.. doxygenfunction:: simgrid::s4u::Comm::wait_for
+ .. doxygenfunction:: simgrid::s4u::Comm::wait_until
.. group-tab:: Python
+ .. automethod:: simgrid.Comm.sendto
+ .. automethod:: simgrid.Comm.sendto_init
+ .. automethod:: simgrid.Comm.sendto_async
+
+ .. automethod:: simgrid.Comm.cancel
+ .. automethod:: simgrid.Comm.start
.. automethod:: simgrid.Comm.test
+ .. automethod:: simgrid.Comm.test_any
.. automethod:: simgrid.Comm.wait
.. automethod:: simgrid.Comm.wait_for
.. automethod:: simgrid.Comm.wait_all
.. automethod:: simgrid.Comm.wait_all_for
.. automethod:: simgrid.Comm.wait_any
.. automethod:: simgrid.Comm.wait_any_for
+ .. automethod:: simgrid.Comm.wait_until
.. group-tab:: C
.. example-tab:: examples/cpp/comm-pingpong/s4u-comm-pingpong.cpp
+ .. example-tab:: examples/python/comm-pingpong/comm-pingpong.py
+
.. example-tab:: examples/c/comm-pingpong/comm-pingpong.c
:cpp:func:`simgrid::s4u::Activity::resume()` and
:cpp:func:`simgrid::s4u::Activity::is_suspended()`.
+ .. example-tab:: examples/python/comm-suspend/comm-suspend.py
+
+ See also :py:func:`simgrid.Comm.suspend()` and
+ :py:func:`simgrid.Comm.resume()`.
+
Waiting for all communications in a set
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
See also :cpp:func:`simgrid::s4u::Comm::test_any()`.
+ .. example-tab:: examples/python/comm-testany/comm-testany.py
+
+ See also :py:func:`simgrid.Comm.test_any()`.
.. _s4u_ex_execution:
foreach(example actor-create actor-daemon actor-join actor-kill actor-migrate actor-suspend actor-yield actor-lifetime
app-masterworkers
- comm-wait comm-waitall comm-waitallfor comm-waitany comm-waitfor
+ comm-wait comm-waitall comm-waitallfor comm-waitany comm-waitfor comm-failure comm-host2host comm-pingpong
+ comm-ready comm-serialize comm-suspend comm-testany comm-throttling comm-waituntil
exec-async exec-basic exec-dvfs exec-remote
platform-profile platform-failures
network-nonlinear clusters-multicpu io-degradation exec-cpu-nonlinear
--- /dev/null
+# Copyright (c) 2010-2022. The SimGrid Team. All rights reserved.
+#
+# This program is free software; you can redistribute it and/or modify it
+# under the terms of the license (GNU LGPL) which comes with this package.
+
+import sys
+
+from simgrid import Engine, Actor, Comm, NetZone, Link, LinkInRoute, Mailbox, this_actor, NetworkFailureException
+
+
+def sender(mailbox1_name: str, mailbox2_name: str) -> None:
+ """Start two asynchronous sends and wait on them, logging any network failure.
+
+ :param mailbox1_name: name of the mailbox targeted by the first communication
+ :param mailbox2_name: name of the mailbox targeted by the second communication
+ """
+ mailbox1: Mailbox = Mailbox.by_name(mailbox1_name)
+ mailbox2: Mailbox = Mailbox.by_name(mailbox2_name)
+
+ this_actor.info(f"Initiating asynchronous send to {mailbox1.name}")
+ comm1: Comm = mailbox1.put_async(666, 5)
+
+ this_actor.info(f"Initiating asynchronous send to {mailbox2.name}")
+ comm2: Comm = mailbox2.put_async(666, 2)
+
+ this_actor.info(f"Calling wait_any..")
+ pending_comms = [comm1, comm2]
+ try:
+ # The returned index is used on the next line to look up the matching communication
+ index = Comm.wait_any([comm1, comm2])
+ this_actor.info(f"Wait any returned index {index} (comm to {pending_comms[index].mailbox.name})")
+ except NetworkFailureException:
+ this_actor.info(f"Sender has experienced a network failure exception, so it knows that something went wrong")
+ this_actor.info(f"Now it needs to figure out which of the two comms failed by looking at their state")
+
+ this_actor.info(f"Comm to {comm1.mailbox.name} has state: {comm1.state_str}")
+ this_actor.info(f"Comm to {comm2.mailbox.name} has state: {comm2.state_str}")
+
+ try:
+ comm1.wait()
+ except NetworkFailureException:
+ this_actor.info(f"Waiting on a FAILED comm raises an exception")
+
+ this_actor.info("Wait for remaining comm, just to be nice")
+ # Drop comm1 (first element) from the list, so that only comm2 is waited for
+ pending_comms.pop(0)
+ try:
+ Comm.wait_any(pending_comms)
+ except Exception as e:
+ # Any error raised by this last wait is only reported as a warning
+ this_actor.warning(str(e))
+
+
+def receiver(mailbox_name: str) -> None:
+ """Do one blocking get() on the named mailbox and log whether it succeeded or hit a network failure.
+
+ :param mailbox_name: name of the mailbox to receive from
+ """
+ mailbox: Mailbox = Mailbox.by_name(mailbox_name)
+ this_actor.info(f"Receiver posting a receive ({mailbox_name})...")
+ try:
+ # The received payload is not used: only the success or failure of the reception matters here
+ mailbox.get()
+ this_actor.info(f"Receiver has received successfully ({mailbox_name})!")
+ except NetworkFailureException:
+ this_actor.info(f"Receiver has experience a network failure exception ({mailbox_name})")
+
+
+def link_killer(link_name: str) -> None:
+ """Sleep for 10 seconds, then turn off the link with the given name.
+
+ :param link_name: name of the link to look up and turn off
+ """
+ link_to_kill = Link.by_name(link_name)
+ this_actor.info("sleeping 10 seconds...")
+ this_actor.sleep_for(10.0)
+ this_actor.info(f"turning off link {link_to_kill.name}")
+ link_to_kill.turn_off()
+ this_actor.info("link killed. exiting")
+
+
+def main():
+ """Build a three-host platform in code, deploy the four actors, and run the simulation."""
+ e = Engine(sys.argv)
+ zone: NetZone = NetZone.create_full_zone("AS0")
+ host1 = zone.create_host("Host1", "1f")
+ host2 = zone.create_host("Host2", "1f")
+ host3 = zone.create_host("Host3", "1f")
+
+ # One dedicated link per destination host
+ link_to_2 = LinkInRoute(zone.create_link("link_to_2", "1bps").seal())
+ link_to_3 = LinkInRoute(zone.create_link("link_to_3", "1bps").seal())
+
+ # Host1 reaches Host2 through link_to_2 only, and Host3 through link_to_3 only.
+ # NOTE(review): the trailing False is presumably the "symmetrical" flag of add_route -- confirm
+ zone.add_route(host1.netpoint, host2.netpoint, None, None, [link_to_2], False)
+ zone.add_route(host1.netpoint, host3.netpoint, None, None, [link_to_3], False)
+ zone.seal()
+
+ # The sender targets both mailboxes; the receivers and the link killer are daemonized
+ Actor.create("Sender", host1, sender, "mailbox2", "mailbox3")
+ Actor.create("Receiver-1", host2, receiver, "mailbox2").daemonize()
+ Actor.create("Receiver-2", host3, receiver, "mailbox3").daemonize()
+ Actor.create("LinkKiller", host2, link_killer, "link_to_2").daemonize()
+
+ e.run()
+
+
+if __name__ == "__main__":
+ main()
--- /dev/null
+#!/usr/bin/env tesh
+
+$ ${pythoncmd:=python3} ${PYTHON_TOOL_OPTIONS:=} ${bindir:=.}/comm-failure.py "--log=root.fmt:[%10.6r]%e(%i:%a@%h)%e%m%n"
+>[ 0.000000] (4:LinkKiller@Host2) sleeping 10 seconds...
+>[ 0.000000] (2:Receiver-1@Host2) Receiver posting a receive (mailbox2)...
+>[ 0.000000] (3:Receiver-2@Host3) Receiver posting a receive (mailbox3)...
+>[ 0.000000] (1:Sender@Host1) Initiating asynchronous send to mailbox2
+>[ 0.000000] (1:Sender@Host1) Initiating asynchronous send to mailbox3
+>[ 0.000000] (1:Sender@Host1) Calling wait_any..
+>[ 10.000000] (4:LinkKiller@Host2) turning off link link_to_2
+>[ 10.000000] (4:LinkKiller@Host2) link killed. exiting
+>[ 10.000000] (2:Receiver-1@Host2) Receiver has experience a network failure exception (mailbox2)
+>[ 10.000000] (1:Sender@Host1) Sender has experienced a network failure exception, so it knows that something went wrong
+>[ 10.000000] (1:Sender@Host1) Now it needs to figure out which of the two comms failed by looking at their state
+>[ 10.000000] (1:Sender@Host1) Comm to mailbox2 has state: FAILED
+>[ 10.000000] (1:Sender@Host1) Comm to mailbox3 has state: STARTED
+>[ 10.000000] (1:Sender@Host1) Waiting on a FAILED comm raises an exception
+>[ 10.000000] (1:Sender@Host1) Wait for remaining comm, just to be nice
+>[ 16.494845] (3:Receiver-2@Host3) Receiver has received successfully (mailbox3)!
--- /dev/null
+# Copyright (c) 2010-2022. The SimGrid Team. All rights reserved.
+#
+# This program is free software; you can redistribute it and/or modify it
+# under the terms of the license (GNU LGPL) which comes with this package.
+
+"""
+This simple example demonstrates the Comm.sendto_init() and Comm.sendto_async() functions,
+that can be used to create a direct communication from one host to another without
+relying on the mailbox mechanism.
+
+There is not much to say, actually: The _init variant creates the communication and
+leaves it unstarted (in case you want to modify this communication before it starts),
+while the _async variant creates and starts it. In both cases, you need to wait() it.
+
+It is mostly useful when you want to have a centralized simulation of your settings,
+with a central actor declaring all communications occurring on your distributed system.
+"""
+
+from argparse import ArgumentParser
+import sys
+
+from simgrid import Actor, Comm, Engine, Host, this_actor
+
+
+def create_parser() -> ArgumentParser:
+ """Build the command-line parser of this example: a single, mandatory --platform option.
+
+ :return: the configured ArgumentParser
+ """
+ parser = ArgumentParser()
+ parser.add_argument(
+ '--platform',
+ type=str,
+ required=True,
+ help='path to the platform description'
+ )
+ return parser
+
+
+def sender(h1: Host, h2: Host, h3: Host, h4: Host):
+ """Create host-to-host communications from a single actor and log their progress.
+
+ :param h1: source host of the c12, noise and c14 communications
+ :param h2: destination host of the c12 and noise communications
+ :param h3: source host of the c34 communication
+ :param h4: destination host of the c34 and c14 communications
+ """
+ this_actor.info(f"Send c12 with sendto_async({h1.name} -> {h2.name}),"
+ f" and c34 with sendto_init({h3.name} -> {h4.name})")
+ c12: Comm = Comm.sendto_async(h1, h2, int(1.5e7))
+ # c34 is only initialized here: its size is set afterwards and it is started explicitly further down
+ c34: Comm = Comm.sendto_init(h3, h4)
+ c34.set_payload_size(int(1e7))
+
+ # You can also detach() communications that you never plan to test() or wait().
+ # Here we create a communication that only slows down the other ones
+ noise: Comm = Comm.sendto_init(h1, h2)
+ noise.set_payload_size(10000)
+ noise.detach()
+
+ # The same pair of facts (state and remaining bytes of c12 and c34) is logged after each step below
+ this_actor.info(f"After creation, c12 is {c12.state_str} (remaining: {c12.remaining:.2e} bytes);"
+ f" c34 is {c34.state_str} (remaining: {c34.remaining:.2e} bytes)")
+ this_actor.sleep_for(1)
+ this_actor.info(f"One sec later, c12 is {c12.state_str} (remaining: {c12.remaining:.2e} bytes);"
+ f" c34 is {c34.state_str} (remaining: {c34.remaining:.2e} bytes)")
+ c34.start()
+ this_actor.info(f"After c34.start(), c12 is {c12.state_str} (remaining: {c12.remaining:.2e} bytes);"
+ f" c34 is {c34.state_str} (remaining: {c34.remaining:.2e} bytes)")
+ c12.wait()
+ this_actor.info(f"After c12.wait(), c12 is {c12.state_str} (remaining: {c12.remaining:.2e} bytes);"
+ f" c34 is {c34.state_str} (remaining: {c34.remaining:.2e} bytes)")
+ c34.wait()
+ this_actor.info(f"After c34.wait(), c12 is {c12.state_str} (remaining: {c12.remaining:.2e} bytes);"
+ f" c34 is {c34.state_str} (remaining: {c34.remaining:.2e} bytes)")
+
+ # As usual, you don't have to explicitly start communications that were just init()ed.
+ # The wait() will start it automatically.
+ c14: Comm = Comm.sendto_init(h1, h4)
+ c14.set_payload_size(100).wait()
+
+
+def main():
+ """Load the platform given on the command line, start the sender actor, and run the simulation."""
+ # parse_known_args() returns (namespace, leftovers): options unknown to this parser are not an error here
+ settings = create_parser().parse_known_args()[0]
+ # The full command line is still handed over to the engine
+ e = Engine(sys.argv)
+ e.load_platform(settings.platform)
+ # The actor runs on Boivin, but the hosts it receives as arguments are the communication endpoints
+ Actor.create(
+ "sender", e.host_by_name("Boivin"), sender,
+ e.host_by_name("Tremblay"), # h1
+ e.host_by_name("Jupiter"), # h2
+ e.host_by_name("Fafard"), # h3
+ e.host_by_name("Ginette") # h4
+ )
+ e.run()
+ this_actor.info(f"Total simulation time: {e.clock:.3f}")
+
+
+if __name__ == "__main__":
+ main()
--- /dev/null
+#!/usr/bin/env tesh
+
+$ ${pythoncmd:=python3} ${PYTHON_TOOL_OPTIONS:=} ${bindir:=.}/comm-host2host.py --platform ${platfdir}/small_platform.xml "--log=root.fmt:[%10.6r]%e(%i:%a@%h)%e%m%n"
+>[ 0.000000] (1:sender@Boivin) Send c12 with sendto_async(Tremblay -> Jupiter), and c34 with sendto_init(Fafard -> Ginette)
+>[ 0.000000] (1:sender@Boivin) After creation, c12 is STARTED (remaining: 1.50e+07 bytes); c34 is STARTING (remaining: 1.00e+07 bytes)
+>[ 1.000000] (1:sender@Boivin) One sec later, c12 is STARTED (remaining: 8.48e+06 bytes); c34 is STARTING (remaining: 1.00e+07 bytes)
+>[ 1.000000] (1:sender@Boivin) After c34.start(), c12 is STARTED (remaining: 8.48e+06 bytes); c34 is STARTED (remaining: 1.00e+07 bytes)
+>[ 2.272621] (1:sender@Boivin) After c12.wait(), c12 is FINISHED (remaining: 0.00e+00 bytes); c34 is STARTED (remaining: 5.33e+05 bytes)
+>[ 2.343278] (1:sender@Boivin) After c34.wait(), c12 is FINISHED (remaining: 0.00e+00 bytes); c34 is FINISHED (remaining: 0.00e+00 bytes)
+>[ 2.359841] (0:maestro@) Total simulation time: 2.360
--- /dev/null
+# Copyright (c) 2010-2022. The SimGrid Team. All rights reserved.
+#
+# This program is free software; you can redistribute it and/or modify it
+# under the terms of the license (GNU LGPL) which comes with this package.
+
+from argparse import ArgumentParser
+import sys
+
+from simgrid import Engine, Actor, Mailbox, this_actor
+
+
+def create_parser() -> ArgumentParser:
+ """Build the command-line parser of this example: a single, mandatory --platform option.
+
+ :return: the configured ArgumentParser
+ """
+ parser = ArgumentParser()
+ parser.add_argument(
+ '--platform',
+ type=str,
+ required=True,
+ help='path to the platform description'
+ )
+ return parser
+
+
+def pinger(mailbox_in: Mailbox, mailbox_out: Mailbox):
+ """Send a small ping on mailbox_out, then wait for the answer on mailbox_in and log how long it took.
+
+ :param mailbox_in: mailbox on which the pong is received
+ :param mailbox_out: mailbox on which the ping is sent
+ """
+ this_actor.info(f"Ping from mailbox {mailbox_in.name} to mailbox {mailbox_out.name}")
+
+ # Do the ping with a 1-Byte payload (latency bound) ...
+ # The value sent is the clock at sending time
+ payload = Engine.clock
+ mailbox_out.put(payload, 1)
+
+ # ... then wait for the (large) pong
+ # The received value is subtracted from the current clock to get the communication time
+ sender_time: float = mailbox_in.get()
+ communication_time = Engine.clock - sender_time
+ this_actor.info("Payload received : large communication (bandwidth bound)")
+ this_actor.info(f"Pong time (bandwidth bound): {communication_time:.3f}")
+
+
+def ponger(mailbox_in: Mailbox, mailbox_out: Mailbox):
+ """Receive the ping on mailbox_in, log how long it took, then send a large pong on mailbox_out.
+
+ :param mailbox_in: mailbox on which the ping is received
+ :param mailbox_out: mailbox on which the pong is sent
+ """
+ this_actor.info(f"Pong from mailbox {mailbox_in.name} to mailbox {mailbox_out.name}")
+
+ # Receive the (small) ping first ...
+ # The received value is subtracted from the current clock to get the communication time
+ sender_time: float = mailbox_in.get()
+ communication_time = Engine.clock - sender_time
+ this_actor.info("Payload received : small communication (latency bound)")
+ this_actor.info(f"Ping time (latency bound) {communication_time:.3f}")
+
+ # ... Then send a 1GB pong back (bandwidth bound)
+ # As for the ping, the value sent is the clock at sending time
+ payload = Engine.clock
+ this_actor.info(f"payload = {payload:.3f}")
+ mailbox_out.put(payload, int(1e9))
+
+
+def main():
+ """Load the platform, create the two mailboxes and the pinger/ponger actors, and run the simulation."""
+ # parse_known_args() returns (namespace, leftovers): options unknown to this parser are not an error here
+ settings = create_parser().parse_known_args()[0]
+ e = Engine(sys.argv)
+ e.load_platform(settings.platform)
+
+ mb1: Mailbox = e.mailbox_by_name_or_create("Mailbox 1")
+ mb2: Mailbox = e.mailbox_by_name_or_create("Mailbox 2")
+
+ # The mailboxes are swapped between the two actors: what one uses as input, the other uses as output
+ Actor.create("pinger", e.host_by_name("Tremblay"), pinger, mb1, mb2)
+ Actor.create("ponger", e.host_by_name("Jupiter"), ponger, mb2, mb1)
+
+ e.run()
+
+ this_actor.info(f"Total simulation time: {e.clock:.3f}")
+
+
+if __name__ == "__main__":
+ main()
--- /dev/null
+#!/usr/bin/env tesh
+
+p Testing with default compound
+
+$ ${pythoncmd:=python3} ${PYTHON_TOOL_OPTIONS:=} ${bindir:=.}/comm-pingpong.py --platform ${platfdir}/small_platform.xml "--log=root.fmt:[%10.6r]%e(%i:%a@%h)%e%m%n"
+>[ 0.000000] (1:pinger@Tremblay) Ping from mailbox Mailbox 1 to mailbox Mailbox 2
+>[ 0.000000] (2:ponger@Jupiter) Pong from mailbox Mailbox 2 to mailbox Mailbox 1
+>[ 0.019014] (2:ponger@Jupiter) Payload received : small communication (latency bound)
+>[ 0.019014] (2:ponger@Jupiter) Ping time (latency bound) 0.019
+>[ 0.019014] (2:ponger@Jupiter) payload = 0.019
+>[150.178356] (1:pinger@Tremblay) Payload received : large communication (bandwidth bound)
+>[150.178356] (1:pinger@Tremblay) Pong time (bandwidth bound): 150.159
+>[150.178356] (0:maestro@) Total simulation time: 150.178
+
+p Testing with default compound Full network optimization
+
+$ ${pythoncmd:=python3} ${PYTHON_TOOL_OPTIONS:=} ${bindir:=.}/comm-pingpong.py --platform ${platfdir}/small_platform.xml "--cfg=network/optim:Full" "--log=root.fmt:[%10.6r]%e(%i:%a@%h)%e%m%n"
+>[ 0.000000] (0:maestro@) Configuration change: Set 'network/optim' to 'Full'
+>[ 0.000000] (1:pinger@Tremblay) Ping from mailbox Mailbox 1 to mailbox Mailbox 2
+>[ 0.000000] (2:ponger@Jupiter) Pong from mailbox Mailbox 2 to mailbox Mailbox 1
+>[ 0.019014] (2:ponger@Jupiter) Payload received : small communication (latency bound)
+>[ 0.019014] (2:ponger@Jupiter) Ping time (latency bound) 0.019
+>[ 0.019014] (2:ponger@Jupiter) payload = 0.019
+>[150.178356] (1:pinger@Tremblay) Payload received : large communication (bandwidth bound)
+>[150.178356] (1:pinger@Tremblay) Pong time (bandwidth bound): 150.159
+>[150.178356] (0:maestro@) Total simulation time: 150.178
--- /dev/null
+# Copyright (c) 2010-2022. The SimGrid Team. All rights reserved.
+#
+# This program is free software; you can redistribute it and/or modify it
+# under the terms of the license (GNU LGPL) which comes with this package.
+
+from argparse import ArgumentParser
+from typing import List
+import sys
+
+from simgrid import Actor, Comm, Engine, Mailbox, this_actor
+
+
+FINALIZE_MESSAGE = "finalize"
+
+
+def create_parser() -> ArgumentParser:
+ """Build the command-line parser of this example: a single, mandatory --platform option.
+
+ :return: the configured ArgumentParser
+ """
+ parser = ArgumentParser()
+ parser.add_argument(
+ '--platform',
+ type=str,
+ required=True,
+ help='path to the platform description'
+ )
+ return parser
+
+
+def get_peer_mailbox(peer_id: int) -> Mailbox:
+ """Return the mailbox named "peer-<peer_id>".
+
+ :param peer_id: identifier of the peer whose mailbox is wanted
+ """
+ return Mailbox.by_name(f"peer-{peer_id}")
+
+
+def peer(my_id: int, message_count: int, payload_size: int, peers_count: int):
+ """Send messages to every other peer, then poll the own mailbox until all other peers have finalized.
+
+ :param my_id: identifier of this peer; its mailbox is the one returned by get_peer_mailbox(my_id)
+ :param message_count: number of regular messages sent to each other peer
+ :param payload_size: size passed to put_async() for every message, finalize messages included
+ :param peers_count: total number of peers, this one included
+ :raises AssertionError: if a get() on a mailbox reported as ready takes a non-zero simulated time
+ """
+ my_mailbox: Mailbox = get_peer_mailbox(my_id)
+ my_mailbox.set_receiver(Actor.self())
+ pending_comms: List[Comm] = []
+ # Start dispatching all messages to all peers other than myself
+ for i in range(message_count):
+ for peer_id in range(peers_count):
+ if peer_id != my_id:
+ peer_mailbox = get_peer_mailbox(peer_id)
+ message = f"Message {i} from peer {my_id}"
+ this_actor.info(f"Send '{message}' to '{peer_mailbox.name}'")
+ pending_comms.append(peer_mailbox.put_async(message, payload_size))
+
+ # Start sending messages to let peers know that they should stop
+ for peer_id in range(peers_count):
+ if peer_id != my_id:
+ peer_mailbox = get_peer_mailbox(peer_id)
+ payload = str(FINALIZE_MESSAGE)
+ pending_comms.append(peer_mailbox.put_async(payload, payload_size))
+ this_actor.info(f"Send '{payload}' to '{peer_mailbox.name}'")
+ this_actor.info("Done dispatching all messages")
+
+ # Retrieve all the messages other peers have been sending to me until I receive all the corresponding "Finalize"
+ # messages
+ # One finalize message is expected from each of the other peers
+ pending_finalize_messages = peers_count - 1
+ while pending_finalize_messages > 0:
+ if my_mailbox.ready:
+ # Measure the simulated time spent in get(): it is expected to be exactly 0 on a ready mailbox
+ start = Engine.clock
+ received: str = my_mailbox.get()
+ waiting_time = Engine.clock - start
+ if waiting_time != 0.0:
+ raise AssertionError(f"Expecting the waiting time to be 0.0 because the communication was supposedly "
+ f"ready, but got {waiting_time} instead")
+ this_actor.info(f"I got a '{received}'.")
+ if received == FINALIZE_MESSAGE:
+ pending_finalize_messages -= 1
+ else:
+ this_actor.info("Nothing ready to consume yet, I better sleep for a while")
+ this_actor.sleep_for(0.01)
+
+ this_actor.info("I'm done, just waiting for my peers to receive the messages before exiting")
+ # Wait for the completion of every send started above
+ Comm.wait_all(pending_comms)
+ this_actor.info("Goodbye now!")
+
+
+def main():
+ """Load the platform given on the command line, start three peer actors, and run the simulation."""
+ # parse_known_args() returns (namespace, leftovers): options unknown to this parser are not an error here
+ settings = create_parser().parse_known_args()[0]
+ e = Engine(sys.argv)
+ e.load_platform(settings.platform)
+ # Trailing arguments of each actor are, in order: my_id, message_count, payload_size, peers_count
+ Actor.create("peer", e.host_by_name("Tremblay"), peer, 0, 2, int(5e7), 3)
+ Actor.create("peer", e.host_by_name("Ruby"), peer, 1, 6, int(2.5e5), 3)
+ Actor.create("peer", e.host_by_name("Perl"), peer, 2, 0, int(5e7), 3)
+ e.run()
+
+
+if __name__ == "__main__":
+ main()
--- /dev/null
+#!/usr/bin/env tesh
+
+p Peer sending and receiving
+
+$ ${pythoncmd:=python3} ${PYTHON_TOOL_OPTIONS:=} ${bindir:=.}/comm-ready.py --platform ${platfdir}/small_platform_fatpipe.xml "--log=root.fmt:[%10.6r]%e(%i:%a@%h)%e%m%n"
+>[ 0.000000] (1:peer@Tremblay) Send 'Message 0 from peer 0' to 'peer-1'
+>[ 0.000000] (2:peer@Ruby) Send 'Message 0 from peer 1' to 'peer-0'
+>[ 0.000000] (3:peer@Perl) Send 'finalize' to 'peer-0'
+>[ 0.000000] (1:peer@Tremblay) Send 'Message 0 from peer 0' to 'peer-2'
+>[ 0.000000] (2:peer@Ruby) Send 'Message 0 from peer 1' to 'peer-2'
+>[ 0.000000] (3:peer@Perl) Send 'finalize' to 'peer-1'
+>[ 0.000000] (3:peer@Perl) Done dispatching all messages
+>[ 0.000000] (3:peer@Perl) Nothing ready to consume yet, I better sleep for a while
+>[ 0.000000] (1:peer@Tremblay) Send 'Message 1 from peer 0' to 'peer-1'
+>[ 0.000000] (2:peer@Ruby) Send 'Message 1 from peer 1' to 'peer-0'
+>[ 0.000000] (1:peer@Tremblay) Send 'Message 1 from peer 0' to 'peer-2'
+>[ 0.000000] (2:peer@Ruby) Send 'Message 1 from peer 1' to 'peer-2'
+>[ 0.000000] (2:peer@Ruby) Send 'Message 2 from peer 1' to 'peer-0'
+>[ 0.000000] (1:peer@Tremblay) Send 'finalize' to 'peer-1'
+>[ 0.000000] (2:peer@Ruby) Send 'Message 2 from peer 1' to 'peer-2'
+>[ 0.000000] (1:peer@Tremblay) Send 'finalize' to 'peer-2'
+>[ 0.000000] (1:peer@Tremblay) Done dispatching all messages
+>[ 0.000000] (1:peer@Tremblay) Nothing ready to consume yet, I better sleep for a while
+>[ 0.000000] (2:peer@Ruby) Send 'Message 3 from peer 1' to 'peer-0'
+>[ 0.000000] (2:peer@Ruby) Send 'Message 3 from peer 1' to 'peer-2'
+>[ 0.000000] (2:peer@Ruby) Send 'Message 4 from peer 1' to 'peer-0'
+>[ 0.000000] (2:peer@Ruby) Send 'Message 4 from peer 1' to 'peer-2'
+>[ 0.000000] (2:peer@Ruby) Send 'Message 5 from peer 1' to 'peer-0'
+>[ 0.000000] (2:peer@Ruby) Send 'Message 5 from peer 1' to 'peer-2'
+>[ 0.000000] (2:peer@Ruby) Send 'finalize' to 'peer-0'
+>[ 0.000000] (2:peer@Ruby) Send 'finalize' to 'peer-2'
+>[ 0.000000] (2:peer@Ruby) Done dispatching all messages
+>[ 0.000000] (2:peer@Ruby) Nothing ready to consume yet, I better sleep for a while
+>[ 0.010000] (2:peer@Ruby) Nothing ready to consume yet, I better sleep for a while
+>[ 0.010000] (3:peer@Perl) Nothing ready to consume yet, I better sleep for a while
+>[ 0.010000] (1:peer@Tremblay) I got a 'Message 0 from peer 1'.
+>[ 0.010000] (1:peer@Tremblay) Nothing ready to consume yet, I better sleep for a while
+>[ 0.020000] (1:peer@Tremblay) Nothing ready to consume yet, I better sleep for a while
+>[ 0.020000] (3:peer@Perl) Nothing ready to consume yet, I better sleep for a while
+>[ 0.020000] (2:peer@Ruby) Nothing ready to consume yet, I better sleep for a while
+>[ 0.030000] (2:peer@Ruby) Nothing ready to consume yet, I better sleep for a while
+>[ 0.030000] (3:peer@Perl) Nothing ready to consume yet, I better sleep for a while
+>[ 0.030000] (1:peer@Tremblay) Nothing ready to consume yet, I better sleep for a while
+>[ 0.040000] (1:peer@Tremblay) Nothing ready to consume yet, I better sleep for a while
+>[ 0.040000] (3:peer@Perl) Nothing ready to consume yet, I better sleep for a while
+>[ 0.040000] (2:peer@Ruby) Nothing ready to consume yet, I better sleep for a while
+>[ 0.050000] (2:peer@Ruby) Nothing ready to consume yet, I better sleep for a while
+>[ 0.050000] (3:peer@Perl) Nothing ready to consume yet, I better sleep for a while
+>[ 0.050000] (1:peer@Tremblay) Nothing ready to consume yet, I better sleep for a while
+>[ 0.060000] (1:peer@Tremblay) Nothing ready to consume yet, I better sleep for a while
+>[ 0.060000] (3:peer@Perl) Nothing ready to consume yet, I better sleep for a while
+>[ 0.060000] (2:peer@Ruby) Nothing ready to consume yet, I better sleep for a while
+>[ 0.070000] (2:peer@Ruby) Nothing ready to consume yet, I better sleep for a while
+>[ 0.070000] (3:peer@Perl) Nothing ready to consume yet, I better sleep for a while
+>[ 0.070000] (1:peer@Tremblay) Nothing ready to consume yet, I better sleep for a while
+>[ 0.080000] (1:peer@Tremblay) Nothing ready to consume yet, I better sleep for a while
+>[ 0.080000] (3:peer@Perl) Nothing ready to consume yet, I better sleep for a while
+>[ 0.080000] (2:peer@Ruby) Nothing ready to consume yet, I better sleep for a while
+>[ 0.090000] (2:peer@Ruby) Nothing ready to consume yet, I better sleep for a while
+>[ 0.090000] (3:peer@Perl) Nothing ready to consume yet, I better sleep for a while
+>[ 0.090000] (1:peer@Tremblay) Nothing ready to consume yet, I better sleep for a while
+>[ 0.100000] (1:peer@Tremblay) Nothing ready to consume yet, I better sleep for a while
+>[ 0.100000] (3:peer@Perl) Nothing ready to consume yet, I better sleep for a while
+>[ 0.100000] (2:peer@Ruby) Nothing ready to consume yet, I better sleep for a while
+>[ 0.110000] (2:peer@Ruby) I got a 'Message 0 from peer 0'.
+>[ 0.110000] (3:peer@Perl) I got a 'Message 0 from peer 0'.
+>[ 0.110000] (1:peer@Tremblay) I got a 'finalize'.
+>[ 0.110000] (2:peer@Ruby) I got a 'finalize'.
+>[ 0.110000] (3:peer@Perl) I got a 'Message 0 from peer 1'.
+>[ 0.110000] (1:peer@Tremblay) I got a 'Message 1 from peer 1'.
+>[ 0.110000] (2:peer@Ruby) I got a 'Message 1 from peer 0'.
+>[ 0.110000] (3:peer@Perl) I got a 'Message 1 from peer 0'.
+>[ 0.110000] (1:peer@Tremblay) I got a 'Message 2 from peer 1'.
+>[ 0.110000] (2:peer@Ruby) I got a 'finalize'.
+>[ 0.110000] (2:peer@Ruby) I'm done, just waiting for my peers to receive the messages before exiting
+>[ 0.110000] (3:peer@Perl) I got a 'Message 1 from peer 1'.
+>[ 0.110000] (1:peer@Tremblay) I got a 'Message 3 from peer 1'.
+>[ 0.110000] (3:peer@Perl) I got a 'finalize'.
+>[ 0.110000] (1:peer@Tremblay) I got a 'Message 4 from peer 1'.
+>[ 0.110000] (3:peer@Perl) I got a 'Message 2 from peer 1'.
+>[ 0.110000] (1:peer@Tremblay) I got a 'Message 5 from peer 1'.
+>[ 0.110000] (3:peer@Perl) I got a 'Message 3 from peer 1'.
+>[ 0.110000] (1:peer@Tremblay) I got a 'finalize'.
+>[ 0.110000] (1:peer@Tremblay) I'm done, just waiting for my peers to receive the messages before exiting
+>[ 0.110000] (3:peer@Perl) I got a 'Message 4 from peer 1'.
+>[ 0.110000] (3:peer@Perl) I got a 'Message 5 from peer 1'.
+>[ 0.110000] (3:peer@Perl) I got a 'finalize'.
+>[ 0.110000] (3:peer@Perl) I'm done, just waiting for my peers to receive the messages before exiting
+>[ 0.110000] (1:peer@Tremblay) Goodbye now!
+>[ 0.110000] (2:peer@Ruby) Goodbye now!
+>[ 0.110000] (3:peer@Perl) Goodbye now!
--- /dev/null
+# Copyright (c) 2010-2022. The SimGrid Team. All rights reserved.
+#
+# This program is free software; you can redistribute it and/or modify it
+# under the terms of the license (GNU LGPL) which comes with this package.
+
+from typing import List, Tuple
+import sys
+
+from simgrid import Engine, Actor, Comm, Host, LinkInRoute, Mailbox, NetZone, this_actor, PyGetAsync
+
+
+RECEIVER_MAILBOX_NAME = "receiver"
+
+
+class Sender(object):
+ """Actor code sending messages_count messages of message_size each to the receiver mailbox."""
+
+ def __init__(self, message_size: int, messages_count: int):
+ """Store the parameters used when the instance is called.
+
+ :param message_size: size passed to put_async() for each message
+ :param messages_count: number of messages to send
+ """
+ self.message_size = message_size
+ self.messages_count = messages_count
+
+ def __call__(self) -> None:
+ """Start all the sends asynchronously, then wait for all of them at once."""
+ # List in which we store all ongoing communications
+ pending_comms: List[Comm] = []
+
+ # Look up the single mailbox on which every message is sent
+ receiver_mailbox: Mailbox = Mailbox.by_name(RECEIVER_MAILBOX_NAME)
+ for i in range(self.messages_count):
+ message_content = f"Message {i}"
+ this_actor.info(f"Send '{message_content}' to '{receiver_mailbox.name}'")
+ # Create a communication representing the ongoing communication, and store it in pending_comms
+ pending_comms.append(receiver_mailbox.put_async(message_content, self.message_size))
+
+ this_actor.info("Done dispatching all messages")
+
+ # Now that all message exchanges were initiated, wait for their completion in one single call
+ Comm.wait_all(pending_comms)
+
+ this_actor.info("Goodbye now!")
+
+
+class Receiver(object):
+ """Actor code posting messages_count asynchronous receptions and logging each message as it arrives."""
+
+ def __init__(self, messages_count: int):
+ """Look up the receiver mailbox and store the number of messages to wait for.
+
+ :param messages_count: number of asynchronous receptions to post
+ """
+ self.mailbox: Mailbox = Mailbox.by_name(RECEIVER_MAILBOX_NAME)
+ self.messages_count = messages_count
+
+ def __call__(self):
+ """Post all the receptions up front, then consume them one by one with wait_any()."""
+ # List in which we store all incoming msgs
+ pending_comms: List[Tuple[Comm, PyGetAsync]] = []
+ this_actor.info(f"Wait for {self.messages_count} messages asynchronously")
+ for i in range(self.messages_count):
+ pending_comms.append(self.mailbox.get_async())
+ while pending_comms:
+ # wait_any() only takes the Comm half of each pair; the index it returns maps back to pending_comms
+ index = Comm.wait_any([comm for (comm, _) in pending_comms])
+ _, async_data = pending_comms[index]
+ this_actor.info(f"I got '{async_data.get()}'.")
+ # Remove the consumed entry so that the loop ends once every reception has been handled
+ pending_comms.pop(index)
+
+
+def main():
+ """Build a two-host platform in code, deploy one receiver and one sender, and run the simulation."""
+ e = Engine(sys.argv)
+ # Creates the platform
+ # ________ __________
+ # | Sender |===============| Receiver |
+ # |________| Link1 |__________|
+ #
+ zone: NetZone = NetZone.create_full_zone("Zone1")
+ sender_host: Host = zone.create_host("sender", 1).seal()
+ receiver_host: Host = zone.create_host("receiver", 1).seal()
+
+ # create split-duplex link1 (UP/DOWN), limiting the number of concurrent flows in it to 2
+ link = zone.create_split_duplex_link("link1", 10e9).set_latency(10e-6).set_concurrency_limit(2).seal()
+
+ # create routes between nodes
+ # NOTE(review): the trailing True is presumably the "symmetrical" flag of add_route -- confirm
+ zone.add_route(
+ sender_host.netpoint,
+ receiver_host.netpoint,
+ None,
+ None,
+ [LinkInRoute(link, LinkInRoute.UP)],
+ True
+ )
+ zone.seal()
+
+ # create actors Sender/Receiver
+ # Both actors are given the same message count
+ messages_count = 10
+ Actor.create("receiver", receiver_host, Receiver(messages_count=messages_count))
+ Actor.create("sender", sender_host, Sender(messages_count=messages_count, message_size=int(1e6)))
+
+ e.run()
+
+
+if __name__ == "__main__":
+ main()
--- /dev/null
+#!/usr/bin/env tesh
+
+$ ${pythoncmd:=python3} ${PYTHON_TOOL_OPTIONS:=} ${bindir:=.}/comm-serialize.py "--log=root.fmt:[%10.6r]%e(%i:%a@%h)%e%m%n"
+>[ 0.000000] (1:receiver@receiver) Wait for 10 messages asynchronously
+>[ 0.000000] (2:sender@sender) Send 'Message 0' to 'receiver'
+>[ 0.000000] (2:sender@sender) Send 'Message 1' to 'receiver'
+>[ 0.000000] (2:sender@sender) Send 'Message 2' to 'receiver'
+>[ 0.000000] (2:sender@sender) Send 'Message 3' to 'receiver'
+>[ 0.000000] (2:sender@sender) Send 'Message 4' to 'receiver'
+>[ 0.000000] (2:sender@sender) Send 'Message 5' to 'receiver'
+>[ 0.000000] (2:sender@sender) Send 'Message 6' to 'receiver'
+>[ 0.000000] (2:sender@sender) Send 'Message 7' to 'receiver'
+>[ 0.000000] (2:sender@sender) Send 'Message 8' to 'receiver'
+>[ 0.000000] (2:sender@sender) Send 'Message 9' to 'receiver'
+>[ 0.000000] (2:sender@sender) Done dispatching all messages
+>[ 0.000336] (1:receiver@receiver) I got 'Message 1'.
+>[ 0.000336] (1:receiver@receiver) I got 'Message 0'.
+>[ 0.000542] (1:receiver@receiver) I got 'Message 3'.
+>[ 0.000542] (1:receiver@receiver) I got 'Message 2'.
+>[ 0.000749] (1:receiver@receiver) I got 'Message 5'.
+>[ 0.000749] (1:receiver@receiver) I got 'Message 4'.
+>[ 0.000955] (1:receiver@receiver) I got 'Message 7'.
+>[ 0.000955] (1:receiver@receiver) I got 'Message 6'.
+>[ 0.001161] (1:receiver@receiver) I got 'Message 9'.
+>[ 0.001161] (1:receiver@receiver) I got 'Message 8'.
+>[ 0.001161] (2:sender@sender) Goodbye now!
--- /dev/null
+# Copyright (c) 2010-2022. The SimGrid Team. All rights reserved.
+#
+# This program is free software; you can redistribute it and/or modify it
+# under the terms of the license (GNU LGPL) which comes with this package.
+
+""" This example shows how to suspend and resume an asynchronous communication.
+"""
+
+from argparse import ArgumentParser
+import sys
+
+from simgrid import Actor, Comm, Engine, Mailbox, this_actor
+
+
+def create_parser() -> ArgumentParser:
+ # Build the command-line parser of this example: a single mandatory --platform option
+ # holding the path to the platform description.
+ parser = ArgumentParser()
+ parser.add_argument(
+ '--platform',
+ type=str,
+ required=True,
+ help='path to the platform description'
+ )
+ return parser
+
+
+def sender():
+ # Sender actor: prepares one message for the "receiver" mailbox, starts it after one simulated second,
+ # then suspends and resumes it, logging the remaining amount of bytes (comm.remaining) at each step.
+ mailbox: Mailbox = Mailbox.by_name("receiver")
+ payload = "Sent message"
+
+ # Create a communication representing the upcoming transfer; it is only initialized here (put_init)
+ # and is explicitly started later with comm.start()
+ simulated_size_in_bytes = 13194230
+ comm: Comm = mailbox.put_init(payload, simulated_size_in_bytes)
+ # NOTE(review): the message below announces a suspension, but comm.suspend() is not called before the
+ # sleep; the comm is simply not started yet -- confirm this is intended.
+ this_actor.info(f"Suspend the communication before it starts (remaining: {comm.remaining:.0f} bytes)"
+ f" and wait a second.")
+ this_actor.sleep_for(1)
+ this_actor.info(f"Now, start the communication (remaining: {comm.remaining:.0f} bytes) and wait another second.")
+ comm.start()
+ this_actor.sleep_for(1)
+ this_actor.info(f"There is still {comm.remaining:.0f} bytes to transfer in this communication."
+ " Suspend it for one second.")
+ comm.suspend()
+ # NOTE(review): no sleep_for() separates suspend() from resume(), so despite the "for one second"
+ # wording above, no simulated time elapses while the comm is suspended -- confirm.
+ this_actor.info(f"Now there is {comm.remaining:.0f} bytes to transfer. Resume it and wait for its completion.")
+ comm.resume()
+ comm.wait()
+ this_actor.info(f"There is {comm.remaining:.0f} bytes to transfer after the communication completion.")
+ this_actor.info(f"Suspending a completed activity is a no-op.")
+ comm.suspend()
+
+
+def receiver():
+ # Receiver actor: blocks on the "receiver" mailbox until one message arrives, then logs its content.
+ mailbox: Mailbox = Mailbox.by_name("receiver")
+ this_actor.info("Wait for the message.")
+ received: str = mailbox.get()
+ this_actor.info(f"I got '{received}'.")
+
+
+def main():
+ # Entry point of the comm-suspend example: loads the platform given by --platform,
+ # starts the sender on host "Tremblay" and the receiver on host "Jupiter", then runs the simulation.
+ # parse_known_args() keeps only the options declared in create_parser(); the full sys.argv is still
+ # handed to the Engine.
+ settings = create_parser().parse_known_args()[0]
+ e = Engine(sys.argv)
+ e.load_platform(settings.platform)
+ Actor.create("sender", e.host_by_name("Tremblay"), sender)
+ Actor.create("receiver", e.host_by_name("Jupiter"), receiver)
+ e.run()
+
+
+if __name__ == "__main__":
+ main()
--- /dev/null
+#!/usr/bin/env tesh
+
+$ ${pythoncmd:=python3} ${PYTHON_TOOL_OPTIONS:=} ${bindir:=.}/comm-suspend.py --platform ${platfdir}/small_platform.xml "--log=root.fmt:[%10.6r]%e(%i:%a@%h)%e%m%n"
+> [ 0.000000] (1:sender@Tremblay) Suspend the communication before it starts (remaining: 13194230 bytes) and wait a second.
+> [ 0.000000] (2:receiver@Jupiter) Wait for the message.
+> [ 1.000000] (1:sender@Tremblay) Now, start the communication (remaining: 13194230 bytes) and wait another second.
+> [ 2.000000] (1:sender@Tremblay) There is still 6660438 bytes to transfer in this communication. Suspend it for one second.
+> [ 2.000000] (1:sender@Tremblay) Now there is 6660438 bytes to transfer. Resume it and wait for its completion.
+> [ 3.000000] (2:receiver@Jupiter) I got 'Sent message'.
+> [ 3.000000] (1:sender@Tremblay) There is 0 bytes to transfer after the communication completion.
+> [ 3.000000] (1:sender@Tremblay) Suspending a completed activity is a no-op.
--- /dev/null
+# Copyright (c) 2010-2022. The SimGrid Team. All rights reserved.
+#
+# This program is free software; you can redistribute it and/or modify it
+# under the terms of the license (GNU LGPL) which comes with this package.
+
+from argparse import ArgumentParser
+from typing import List
+import sys
+
+from simgrid import Engine, Actor, Comm, Mailbox, this_actor
+
+
+def create_parser() -> ArgumentParser:
+ # Build the command-line parser of this example: a single mandatory --platform option
+ # holding the path to the platform description.
+ parser = ArgumentParser()
+ parser.add_argument(
+ '--platform',
+ type=str,
+ required=True,
+ help='path to the platform description'
+ )
+ return parser
+
+
+def rank0():
+ # rank0 actor: posts three asynchronous receives on mailbox "rank0", sends three small messages to
+ # mailbox "rank1", then polls the pending receives with Comm.test_any() until all of them are done.
+ rank0_mailbox: Mailbox = Mailbox.by_name("rank0")
+ this_actor.info("Post my asynchronous receives")
+ # get_async() returns a (comm, payload wrapper) pair; the wrappers a1..a3 are not read in this example
+ comm1, a1 = rank0_mailbox.get_async()
+ comm2, a2 = rank0_mailbox.get_async()
+ comm3, a3 = rank0_mailbox.get_async()
+ pending_comms: List[Comm] = [comm1, comm2, comm3]
+
+ this_actor.info("Send some data to rank-1")
+ rank1_mailbox: Mailbox = Mailbox.by_name("rank1")
+ for i in range(3):
+ rank1_mailbox.put(i, 1)
+
+ this_actor.info("Test for completed comms")
+ while pending_comms:
+ # test_any() gives the index of a finished comm in the list, or -1 if none is done yet
+ flag = Comm.test_any(pending_comms)
+ if flag != -1:
+ pending_comms.pop(flag)
+ this_actor.info("Remove a pending comm.")
+ else:
+ # Nothing matches, wait for a little bit
+ this_actor.sleep_for(0.1)
+ this_actor.info("Last comm is complete")
+
+
+def rank1():
+ # rank1 actor: three times in a row, receives one value on mailbox "rank1", logs it,
+ # and answers with a "Message <i>" string sent to mailbox "rank0".
+ rank0_mailbox: Mailbox = Mailbox.by_name("rank0")
+ rank1_mailbox: Mailbox = Mailbox.by_name("rank1")
+ for i in range(3):
+ data: int = rank1_mailbox.get()
+ this_actor.info(f"Received {data}")
+ msg_content = f"Message {i}"
+ this_actor.info(f"Send '{msg_content}'")
+ rank0_mailbox.put(msg_content, int(1e6))
+
+
+def main():
+ # Entry point of the comm-testany example: loads the platform given by --platform,
+ # starts rank0 on host "Tremblay" and rank1 on host "Fafard", then runs the simulation.
+ # parse_known_args() keeps only the options declared in create_parser(); the full sys.argv is still
+ # handed to the Engine.
+ settings = create_parser().parse_known_args()[0]
+ e = Engine(sys.argv)
+ e.load_platform(settings.platform)
+
+ Actor.create("rank0", e.host_by_name("Tremblay"), rank0)
+ Actor.create("rank1", e.host_by_name("Fafard"), rank1)
+
+ e.run()
+
+
+if __name__ == "__main__":
+ main()
--- /dev/null
+#!/usr/bin/env tesh
+
+$ ${pythoncmd:=python3} ${PYTHON_TOOL_OPTIONS:=} ${bindir:=.}/comm-testany.py --platform ${platfdir}/small_platform.xml "--log=root.fmt:[%10.6r]%e(%i:%a@%h)%e%m%n"
+>[ 0.000000] (1:rank0@Tremblay) Post my asynchronous receives
+>[ 0.000000] (1:rank0@Tremblay) Send some data to rank-1
+>[ 0.025708] (2:rank1@Fafard) Received 0
+>[ 0.025708] (2:rank1@Fafard) Send 'Message 0'
+>[ 0.209813] (2:rank1@Fafard) Received 1
+>[ 0.209813] (2:rank1@Fafard) Send 'Message 1'
+>[ 0.393918] (1:rank0@Tremblay) Test for completed comms
+>[ 0.393918] (2:rank1@Fafard) Received 2
+>[ 0.393918] (2:rank1@Fafard) Send 'Message 2'
+>[ 0.393918] (1:rank0@Tremblay) Remove a pending comm.
+>[ 0.393918] (1:rank0@Tremblay) Remove a pending comm.
+>[ 0.593918] (1:rank0@Tremblay) Remove a pending comm.
+>[ 0.593918] (1:rank0@Tremblay) Last comm is complete
--- /dev/null
+# Copyright (c) 2010-2022. The SimGrid Team. All rights reserved.
+#
+# This program is free software; you can redistribute it and/or modify it
+# under the terms of the license (GNU LGPL) which comes with this package.
+
+from argparse import ArgumentParser
+import sys
+
+from simgrid import Engine, Actor, Comm, Mailbox, this_actor
+
+
+def create_parser() -> ArgumentParser:
+ # Build the command-line parser of this example: a single mandatory --platform option
+ # holding the path to the platform description.
+ parser = ArgumentParser()
+ parser.add_argument(
+ '--platform',
+ type=str,
+ required=True,
+ help='path to the platform description'
+ )
+ return parser
+
+
+def sender(mailbox: Mailbox):
+ # Sender actor: sends two payloads of the same simulated size on `mailbox`, the first one with a plain
+ # blocking put() and the second one through a Comm whose rate is capped with set_rate().
+ # Each payload is the value of Engine.clock read just before sending, which the receiver subtracts
+ # from its own clock to compute the communication time.
+ this_actor.info("Send at full bandwidth")
+
+ # First send a 2.5e8 Bytes payload at full bandwidth (1.25e8 Bps)
+ payload = Engine.clock
+ mailbox.put(payload, int(2.5e8))
+
+ this_actor.info("Throttle the bandwidth at the Comm level")
+ # ... then send it again but throttle the Comm
+ payload = Engine.clock
+ # get a handler on the comm first
+ comm: Comm = mailbox.put_init(payload, int(2.5e8))
+
+ # Let's throttle the communication: set the rate of the comm to half the nominal bandwidth of the link,
+ # i.e., 1.25e8 / 2. This second communication will thus take approximately twice as long as the first one
+ comm.set_rate(int(1.25e8 / 2)).wait()
+
+
+def receiver(mailbox: Mailbox):
+ # Receiver actor: receives the two payloads from `mailbox`; each payload is the sender's clock value,
+ # so subtracting it from the current Engine.clock gives the duration that gets logged.
+ # Receive the first payload sent at full bandwidth
+ sender_time = mailbox.get()
+ communication_time = Engine.clock - sender_time
+ this_actor.info(f"Payload received (full bandwidth) in {communication_time} seconds")
+
+ # ... Then receive the second payload sent with a throttled Comm
+ sender_time = mailbox.get()
+ communication_time = Engine.clock - sender_time
+ this_actor.info(f"Payload received (throttled) in {communication_time} seconds")
+
+
+def main():
+ # Entry point of the comm-throttling example: loads the platform given by --platform, creates (or finds)
+ # the mailbox named "Mailbox" and hands it to both actors, then runs the simulation.
+ # parse_known_args() keeps only the options declared in create_parser(); the full sys.argv is still
+ # handed to the Engine.
+ settings = create_parser().parse_known_args()[0]
+ e = Engine(sys.argv)
+ e.load_platform(settings.platform)
+
+ mailbox = e.mailbox_by_name_or_create("Mailbox")
+
+ Actor.create("sender", e.host_by_name("node-0.simgrid.org"), sender, mailbox)
+ Actor.create("receiver", e.host_by_name("node-1.simgrid.org"), receiver, mailbox)
+
+ e.run()
+
+
+if __name__ == "__main__":
+ main()
--- /dev/null
+#!/usr/bin/env tesh
+
+$ ${pythoncmd:=python3} ${PYTHON_TOOL_OPTIONS:=} ${bindir:=.}/comm-throttling.py --platform ${platfdir}/cluster_backbone.xml "--log=root.fmt:[%10.6r]%e(%i:%a@%h)%e%m%n"
+>[ 0.000000] (1:sender@node-0.simgrid.org) Send at full bandwidth
+>[ 2.069662] (1:sender@node-0.simgrid.org) Throttle the bandwidth at the Comm level
+>[ 2.069662] (2:receiver@node-1.simgrid.org) Payload received (full bandwidth) in 2.0696616701030925 seconds
+>[ 6.077468] (2:receiver@node-1.simgrid.org) Payload received (throttled) in 4.007806 seconds
parser.add_argument(
'--platform',
type=str,
+ required=True,
help='path to the platform description'
)
parser.add_argument(
--- /dev/null
+# Copyright (c) 2010-2022. The SimGrid Team. All rights reserved.
+#
+# This program is free software; you can redistribute it and/or modify it
+# under the terms of the license (GNU LGPL) which comes with this package.
+
+""" This example shows how to bound the wait for an asynchronous communication with Comm.wait_until().
+"""
+
+from argparse import ArgumentParser
+from typing import List
+import sys
+
+from simgrid import Actor, Comm, Engine, Mailbox, this_actor
+
+
+FINALIZE_MESSAGE = "finalize"
+
+
+def create_parser() -> ArgumentParser:
+ # Build the command-line parser of this example: a single mandatory --platform option
+ # holding the path to the platform description.
+ parser = ArgumentParser()
+ parser.add_argument(
+ '--platform',
+ type=str,
+ required=True,
+ help='path to the platform description'
+ )
+ return parser
+
+
+def sender(receiver_mailbox: Mailbox, messages_count: int, payload_size: int):
+ # Sender actor: asynchronously posts `messages_count` messages of `payload_size` simulated bytes on
+ # `receiver_mailbox`, followed by a zero-sized FINALIZE_MESSAGE, then waits for every pending comm
+ # with a deadline set one simulated second after the moment each wait begins.
+ pending_comms: List[Comm] = []
+ # Start dispatching all messages to the receiver
+ for i in range(messages_count):
+ payload = f"Message {i}"
+ this_actor.info(f"Send '{payload}' to '{receiver_mailbox.name}'")
+ # Create a communication representing the ongoing communication
+ comm = receiver_mailbox.put_async(payload, payload_size)
+ # Add this comm to the vector of all known comms
+ pending_comms.append(comm)
+
+ # Start the finalize signal to the receiver
+ final_comm = receiver_mailbox.put_async(FINALIZE_MESSAGE, 0)
+ pending_comms.append(final_comm)
+ this_actor.info(f"Send '{FINALIZE_MESSAGE}' to '{receiver_mailbox.name}'")
+ this_actor.info("Done dispatching all messages")
+
+ # Now that all message exchanges were initiated, wait for their completion, in reverse order of creation
+ # (the last comm of the list is waited for and popped first)
+ while pending_comms:
+ comm = pending_comms[-1]
+ comm.wait_until(Engine.clock + 1)
+ pending_comms.pop() # remove it from the list
+ this_actor.info("Goodbye now!")
+
+
+def receiver(mailbox: Mailbox):
+ # Receiver actor: keeps receiving messages from `mailbox` and logging them until it gets
+ # FINALIZE_MESSAGE, which ends the loop.
+ this_actor.info("Wait for my first message")
+ finalized = False
+ while not finalized:
+ received: str = mailbox.get()
+ this_actor.info(f"I got a '{received}'.")
+ # If it's a finalize message, we're done.
+ if received == FINALIZE_MESSAGE:
+ finalized = True
+
+
+def main():
+ # Entry point of the comm-waituntil example: loads the platform given by --platform, then starts the
+ # sender on host "Tremblay" (3 messages of int(5e7) simulated bytes) and the receiver on host "Ruby",
+ # both sharing the "receiver" mailbox, and runs the simulation.
+ # parse_known_args() keeps only the options declared in create_parser(); the full sys.argv is still
+ # handed to the Engine.
+ settings = create_parser().parse_known_args()[0]
+ e = Engine(sys.argv)
+ e.load_platform(settings.platform)
+ receiver_mailbox: Mailbox = Mailbox.by_name("receiver")
+ Actor.create("sender", e.host_by_name("Tremblay"), sender, receiver_mailbox, 3, int(5e7))
+ Actor.create("receiver", e.host_by_name("Ruby"), receiver, receiver_mailbox)
+ e.run()
+
+
+if __name__ == "__main__":
+ main()
--- /dev/null
+#!/usr/bin/env tesh
+
+$ ${pythoncmd:=python3} ${PYTHON_TOOL_OPTIONS:=} ${bindir:=.}/comm-waituntil.py --platform ${platfdir}/small_platform_fatpipe.xml "--log=root.fmt:[%10.6r]%e(%i:%a@%h)%e%m%n"
+>[ 0.000000] (1:sender@Tremblay) Send 'Message 0' to 'receiver'
+>[ 0.000000] (2:receiver@Ruby) Wait for my first message
+>[ 0.000000] (1:sender@Tremblay) Send 'Message 1' to 'receiver'
+>[ 0.000000] (1:sender@Tremblay) Send 'Message 2' to 'receiver'
+>[ 0.000000] (1:sender@Tremblay) Send 'finalize' to 'receiver'
+>[ 0.000000] (1:sender@Tremblay) Done dispatching all messages
+>[ 0.105458] (2:receiver@Ruby) I got a 'Message 0'.
+>[ 0.210917] (2:receiver@Ruby) I got a 'Message 1'.
+>[ 0.316375] (2:receiver@Ruby) I got a 'Message 2'.
+>[ 0.318326] (2:receiver@Ruby) I got a 'finalize'.
+>[ 0.318326] (1:sender@Tremblay) Goodbye now!
using simgrid::s4u::ActorPtr;
using simgrid::s4u::Barrier;
using simgrid::s4u::BarrierPtr;
+using simgrid::s4u::Comm;
+using simgrid::s4u::CommPtr;
using simgrid::s4u::Engine;
using simgrid::s4u::Host;
using simgrid::s4u::Link;
"get_all_hosts() is deprecated and will be dropped after v3.33, use all_hosts instead.", 1);
return self.attr("all_hosts");
})
+ .def("host_by_name", &Engine::host_by_name_or_null, py::call_guard<py::gil_scoped_release>(),
+ "Retrieve a host by its name, or None if it does not exist in the platform.")
.def_property_readonly("all_hosts", &Engine::get_all_hosts, "Returns the list of all hosts found in the platform")
.def("get_all_links",
[](py::object self) // XBT_ATTRIB_DEPRECATED_v334
.def("netzone_by_name", &Engine::netzone_by_name_or_null)
.def("load_platform", &Engine::load_platform, "Load a platform file describing the environment")
.def("load_deployment", &Engine::load_deployment, "Load a deployment file and launch the actors that it contains")
+ .def("mailbox_by_name_or_create", &Engine::mailbox_by_name_or_create,
+ py::call_guard<py::gil_scoped_release>(),
+ py::arg("name"),
+ "Find a mailbox from its name or create one if it does not exist")
.def("run", &Engine::run, py::call_guard<py::gil_scoped_release>(), "Run the simulation until its end")
.def("run_until", py::overload_cast<double>(&Engine::run_until, py::const_),
py::call_guard<py::gil_scoped_release>(), "Run the simulation until the given date",
return std::string(self->get_name().c_str()); // Convert from xbt::string because of MC
},
"The name of that mailbox (read-only property).")
+ .def_property_readonly("ready", &Mailbox::ready, py::call_guard<py::gil_scoped_release>(),
+ "Check if there is a communication ready to be consumed from a mailbox.")
.def(
"put",
[](Mailbox* self, py::object data, int size, double timeout) {
py::call_guard<py::gil_scoped_release>(), "Blocking data reception")
.def(
"get_async",
- [](Mailbox* self) -> std::tuple<simgrid::s4u::CommPtr, PyGetAsync> {
+ [](Mailbox* self) -> std::tuple<CommPtr, PyGetAsync> {
PyGetAsync wrap;
auto comm = self->get_async(wrap.get());
return std::make_tuple(std::move(comm), std::move(wrap));
"Get python object after async communication in receiver side");
/* Class Comm */
- py::class_<simgrid::s4u::Comm, simgrid::s4u::CommPtr>(m, "Comm",
- "Communication. See the C++ documentation for details.")
- .def("test", &simgrid::s4u::Comm::test, py::call_guard<py::gil_scoped_release>(),
+ py::class_<Comm, CommPtr>(m, "Comm", "Communication. See the C++ documentation for details.")
+ .def_property_readonly("dst_data_size", &Comm::get_dst_data_size,
+ py::call_guard<py::gil_scoped_release>(),
+ "Retrieve the size of the received data.")
+ .def_property_readonly("mailbox", &Comm::get_mailbox,
+ py::call_guard<py::gil_scoped_release>(),
+ "Retrieve the mailbox on which this comm acts.")
+ .def_property_readonly("sender", &Comm::get_sender,
+ py::call_guard<py::gil_scoped_release>())
+ .def_property_readonly("state_str", [](Comm* self){ return std::string(self->get_state_str()); },
+ py::call_guard<py::gil_scoped_release>(),
+ "Retrieve the Comm state as string")
+ .def_property_readonly("remaining", &Comm::get_remaining,
+ py::call_guard<py::gil_scoped_release>(),
+ "Remaining amount of work that this Comm entails")
+ .def_property_readonly("start_time", &Comm::get_start_time,
+ py::call_guard<py::gil_scoped_release>(),
+ "Time at which this Comm started")
+ .def_property_readonly("finish_time", &Comm::get_finish_time,
+ py::call_guard<py::gil_scoped_release>(),
+ "Time at which this Comm finished")
+ .def("set_payload_size", &Comm::set_payload_size, py::call_guard<py::gil_scoped_release>(),
+ py::arg("bytes"),
+ "Specify the amount of bytes which exchange should be simulated.")
+ .def("set_rate", &Comm::set_rate, py::call_guard<py::gil_scoped_release>(),
+ py::arg("rate"),
+ "Sets the maximal communication rate (in byte/sec). Must be done before start")
+ .def("cancel", [](Comm* self){ return self->cancel(); },
+ py::call_guard<py::gil_scoped_release>(), py::return_value_policy::reference_internal,
+ "Cancel the activity.")
+ .def("start", [](Comm* self){ return self->start(); },
+ py::call_guard<py::gil_scoped_release>(), py::return_value_policy::reference_internal,
+ "Starts a previously created activity. This function is optional: you can call wait() even if you didn't "
+ "call start()")
+ .def("suspend", [](Comm* self){ return self->suspend(); },
+ py::call_guard<py::gil_scoped_release>(), py::return_value_policy::reference_internal,
+ "Suspend the activity.")
+ .def("resume", [](Comm* self){ return self->resume(); },
+ py::call_guard<py::gil_scoped_release>(), py::return_value_policy::reference_internal,
+ "Resume the activity.")
+ .def("test", &Comm::test, py::call_guard<py::gil_scoped_release>(),
"Test whether the communication is terminated.")
- .def("wait", &simgrid::s4u::Comm::wait, py::call_guard<py::gil_scoped_release>(),
+ .def("wait", &Comm::wait, py::call_guard<py::gil_scoped_release>(),
"Block until the completion of that communication.")
- .def("wait_for", &simgrid::s4u::Comm::wait_for,
+ .def("wait_for", &Comm::wait_for, py::call_guard<py::gil_scoped_release>(),
py::arg("timeout"),
- py::call_guard<py::gil_scoped_release>(),
"Block until the completion of that communication, or raises TimeoutException after the specified timeout.")
- .def("detach", [](simgrid::s4u::Comm* self) {
- return self->detach();
- },
+ .def("wait_until", &Comm::wait_until, py::call_guard<py::gil_scoped_release>(),
+ py::arg("time_limit"),
+ "Block until the completion of that communication, or raises TimeoutException after the specified time.")
+ .def("detach", [](Comm* self) { return self->detach(); },
py::return_value_policy::reference_internal,
py::call_guard<py::gil_scoped_release>(),
"Start the comm, and ignore its result. It can be completely forgotten after that.")
- .def_static(
- "wait_all", &simgrid::s4u::Comm::wait_all,
- py::arg("comms"),
- py::call_guard<py::gil_scoped_release>(),
- "Block until the completion of all communications in the list.")
- .def_static("wait_all_for", &simgrid::s4u::Comm::wait_all_for,
- py::arg("comms"), py::arg("timeout"),
+ .def_static("sendto", &Comm::sendto, py::call_guard<py::gil_scoped_release>(),
+ py::arg("from"), py::arg("to"), py::arg("simulated_size_in_bytes"),
+ "Do a blocking communication between two arbitrary hosts.")
+ .def_static("sendto_init", py::overload_cast<Host*, Host*>(&Comm::sendto_init),
+ py::call_guard<py::gil_scoped_release>(),
+ py::arg("from"), py::arg("to"),
+ "Creates a communication between the two given hosts, bypassing the mailbox mechanism.")
+ // Asynchronous counterpart of Comm.sendto: the user-visible docstring must not describe it as blocking.
+ .def_static("sendto_async", &Comm::sendto_async, py::call_guard<py::gil_scoped_release>(),
+ py::arg("from"), py::arg("to"), py::arg("simulated_size_in_bytes"),
+ "Do an asynchronous communication between two arbitrary hosts.\n\nThis initializes a communication that "
+ "completely bypass the mailbox and actors mechanism. There is really no limit on the hosts involved. "
+ "In particular, the actor does not have to be on one of the involved hosts.")
+ .def_static("test_any", &Comm::test_any,
py::call_guard<py::gil_scoped_release>(),
+ py::arg("comms"),
+ "take a vector s4u::CommPtr and return the rank of the first finished one (or -1 if none is done)")
+ .def_static("wait_all", &Comm::wait_all, py::call_guard<py::gil_scoped_release>(),
+ py::arg("comms"),
+ "Block until the completion of all communications in the list.")
+ .def_static("wait_all_for", &Comm::wait_all_for, py::call_guard<py::gil_scoped_release>(),
+ py::arg("comms"), py::arg("timeout"),
"Block until the completion of all communications in the list, or raises TimeoutException after "
"the specified timeout.")
- .def_static(
- "wait_any", &simgrid::s4u::Comm::wait_any,
- py::arg("comms"),
- py::call_guard<py::gil_scoped_release>(),
- "Block until the completion of any communication in the list and return the index of the terminated one.")
- .def_static(
- "wait_any_for",
- &simgrid::s4u::Comm::wait_any_for,
- py::arg("comms"), py::arg("timeout"),
- py::call_guard<py::gil_scoped_release>(),
- "Block until the completion of any communication in the list and return the index of the terminated "
- "one, or -1 if a timeout occurred."
- );
+ .def_static("wait_any", &Comm::wait_any,
+ py::call_guard<py::gil_scoped_release>(),
+ py::arg("comms"),
+ "Block until the completion of any communication in the list and return the index of the "
+ "terminated one.")
+ .def_static("wait_any_for", &Comm::wait_any_for,
+ py::call_guard<py::gil_scoped_release>(),
+ py::arg("comms"), py::arg("timeout"),
+ "Block until the completion of any communication in the list and return the index of the terminated "
+ "one, or -1 if a timeout occurred.");
/* Class Io */
py::class_<simgrid::s4u::Io, simgrid::s4u::IoPtr>(m, "Io", "I/O activities. See the C++ documentation for details.")
return link == pimpl->links_.end() ? nullptr : link->second->get_iface();
}
-/** @brief Find a mailox from its name or create one if it does not exist) */
+/** @brief Find a mailbox from its name or create one if it does not exist */
Mailbox* Engine::mailbox_by_name_or_create(const std::string& name) const
{
/* two actors may have pushed the same mbox_create simcall at the same time */