Python:
- Make the host_load plugin available from Python. See examples/python/plugin-host-load
- Mailbox::get_async() does not return a pair anymore. Use comm.get_payload() instead.
+ - Comm.wait_all() is gone. Please use ActivitySet.wait_all() instead.
----------------------------------------------------------------------------
void operator()() const
{
/* Activity set in which we store all ongoing communications */
- std::vector<sg4::CommPtr> pending_comms;
+ sg4::ActivitySet pending_comms;
/* Make a vector of the mailboxes to use */
std::vector<sg4::Mailbox*> mboxes;
auto* mbox = sg4::Mailbox::by_name(host->get_name());
mboxes.push_back(mbox);
sg4::CommPtr comm = mbox->put_async(payload, msg_size);
- pending_comms.push_back(comm);
+ pending_comms.push(comm);
}
XBT_INFO("Done dispatching all messages");
/* Now that all message exchanges were initiated, wait for their completion in one single call */
- sg4::Comm::wait_all(pending_comms);
+ pending_comms.wait_all();
XBT_INFO("Goodbye now!");
}
foreach(example actor-create actor-daemon actor-join actor-kill actor-migrate actor-suspend actor-yield actor-lifetime
activityset-testany activityset-waitall activityset-waitallfor activityset-waitany
app-masterworkers
- comm-wait comm-waitall comm-waitallfor comm-waitany comm-failure comm-host2host comm-pingpong
+ comm-wait comm-waitallfor comm-waitany comm-failure comm-host2host comm-pingpong
comm-ready comm-suspend comm-testany comm-throttling comm-waitallfor comm-waituntil
exec-async exec-basic exec-dvfs exec-remote exec-ptask
task-io task-simple task-switch-host task-variable-load
# Actors that are created as object will execute their __call__ method.
# So, the following constitutes the main function of the Sender actor.
def __call__(self):
- pending_comms = []
+ pending_comms = simgrid.ActivitySet()
mboxes = []
for host in self.hosts:
msg = "Hello, I'm alive and running on " + simgrid.this_actor.get_host().name
mbox = simgrid.Mailbox.by_name(host.name)
mboxes.append(mbox)
- pending_comms.append(mbox.put_async(msg, self.msg_size))
+ pending_comms.push(mbox.put_async(msg, self.msg_size))
simgrid.this_actor.info("Done dispatching all messages")
# Now that all message exchanges were initiated, wait for their completion in one single call
- simgrid.Comm.wait_all(pending_comms)
+ pending_comms.wait_all()
simgrid.this_actor.info("Goodbye now!")
from typing import List
import sys
-from simgrid import Actor, Comm, Engine, Mailbox, this_actor
+from simgrid import Actor, ActivitySet, Comm, Engine, Mailbox, this_actor
FINALIZE_MESSAGE = "finalize"
def peer(my_id: int, message_count: int, payload_size: int, peers_count: int):
my_mailbox: Mailbox = get_peer_mailbox(my_id)
my_mailbox.set_receiver(Actor.self())
- pending_comms: List[Comm] = []
+ pending_comms = ActivitySet()
# Start dispatching all messages to peers other than myself
for i in range(message_count):
for peer_id in range(peers_count):
peer_mailbox = get_peer_mailbox(peer_id)
message = f"Message {i} from peer {my_id}"
this_actor.info(f"Send '{message}' to '{peer_mailbox.name}'")
- pending_comms.append(peer_mailbox.put_async(message, payload_size))
+ pending_comms.push(peer_mailbox.put_async(message, payload_size))
# Start sending messages to let peers know that they should stop
for peer_id in range(peers_count):
if peer_id != my_id:
peer_mailbox = get_peer_mailbox(peer_id)
payload = str(FINALIZE_MESSAGE)
- pending_comms.append(peer_mailbox.put_async(payload, payload_size))
+ pending_comms.push(peer_mailbox.put_async(payload, payload_size))
this_actor.info(f"Send '{payload}' to '{peer_mailbox.name}'")
this_actor.info("Done dispatching all messages")
this_actor.sleep_for(0.01)
this_actor.info("I'm done, just waiting for my peers to receive the messages before exiting")
- Comm.wait_all(pending_comms)
+ pending_comms.wait_all()
this_actor.info("Goodbye now!")
+++ /dev/null
-# Copyright (c) 2010-2023. The SimGrid Team. All rights reserved.
-#
-# This program is free software; you can redistribute it and/or modify it
-# under the terms of the license (GNU LGPL) which comes with this package.
-
-"""
-This example shows how to block on the completion of a set of communications.
-
-As for the other asynchronous examples, the sender initiate all the messages it wants to send and
-pack the resulting simgrid.Comm objects in a list. All messages thus occur concurrently.
-
-The sender then blocks until all ongoing communication terminate, using simgrid.Comm.wait_all()
-"""
-
-import sys
-from simgrid import Actor, Comm, Engine, Host, Mailbox, this_actor
-
-
-def sender(messages_count, msg_size, receivers_count):
- # List in which we store all ongoing communications
- pending_comms = []
-
- # Vector of the used mailboxes
- mboxes = [Mailbox.by_name("receiver-{:d}".format(i))
- for i in range(0, receivers_count)]
-
- # Start dispatching all messages to receivers, in a round robin fashion
- for i in range(0, messages_count):
- content = "Message {:d}".format(i)
- mbox = mboxes[i % receivers_count]
-
- this_actor.info("Send '{:s}' to '{:s}'".format(content, str(mbox)))
-
- # Create a communication representing the ongoing communication, and store it in pending_comms
- comm = mbox.put_async(content, msg_size)
- pending_comms.append(comm)
-
- # Start sending messages to let the workers know that they should stop
- for i in range(0, receivers_count):
- mbox = mboxes[i]
- this_actor.info("Send 'finalize' to '{:s}'".format(str(mbox)))
- comm = mbox.put_async("finalize", 0)
- pending_comms.append(comm)
-
- this_actor.info("Done dispatching all messages")
-
- # Now that all message exchanges were initiated, wait for their completion in one single call
- Comm.wait_all(pending_comms)
-
- this_actor.info("Goodbye now!")
-
-
-def receiver(my_id):
- mbox = Mailbox.by_name("receiver-{:d}".format(my_id))
-
- this_actor.info("Wait for my first message")
- while True:
- received = mbox.get()
- this_actor.info("I got a '{:s}'.".format(received))
- if received == "finalize":
- break # If it's a finalize message, we're done.
-
-
-if __name__ == '__main__':
- e = Engine(sys.argv)
-
- # Load the platform description
- e.load_platform(sys.argv[1])
-
- Actor.create("sender", Host.by_name("Tremblay"), sender, 5, 1000000, 2)
- Actor.create("receiver", Host.by_name("Ruby"), receiver, 0)
- Actor.create("receiver", Host.by_name("Perl"), receiver, 1)
-
- e.run()
+++ /dev/null
-#!/usr/bin/env tesh
-
-$ ${pythoncmd:=python3} ${PYTHON_TOOL_OPTIONS:=} ${bindir:=.}/comm-waitall.py ${platfdir}/small_platform_fatpipe.xml "--log=root.fmt:[%10.6r]%e(%i:%a@%h)%e%m%n"
-> [ 0.000000] (2:receiver@Ruby) Wait for my first message
-> [ 0.000000] (3:receiver@Perl) Wait for my first message
-> [ 0.000000] (1:sender@Tremblay) Send 'Message 0' to 'Mailbox(receiver-0)'
-> [ 0.000000] (1:sender@Tremblay) Send 'Message 1' to 'Mailbox(receiver-1)'
-> [ 0.000000] (1:sender@Tremblay) Send 'Message 2' to 'Mailbox(receiver-0)'
-> [ 0.000000] (1:sender@Tremblay) Send 'Message 3' to 'Mailbox(receiver-1)'
-> [ 0.000000] (1:sender@Tremblay) Send 'Message 4' to 'Mailbox(receiver-0)'
-> [ 0.000000] (1:sender@Tremblay) Send 'finalize' to 'Mailbox(receiver-0)'
-> [ 0.000000] (1:sender@Tremblay) Send 'finalize' to 'Mailbox(receiver-1)'
-> [ 0.000000] (1:sender@Tremblay) Done dispatching all messages
-> [ 0.004022] (2:receiver@Ruby) I got a 'Message 0'.
-> [ 0.004022] (3:receiver@Perl) I got a 'Message 1'.
-> [ 0.008043] (2:receiver@Ruby) I got a 'Message 2'.
-> [ 0.008043] (3:receiver@Perl) I got a 'Message 3'.
-> [ 0.009995] (3:receiver@Perl) I got a 'finalize'.
-> [ 0.012065] (2:receiver@Ruby) I got a 'Message 4'.
-> [ 0.014016] (2:receiver@Ruby) I got a 'finalize'.
-> [ 0.014016] (1:sender@Tremblay) Goodbye now!
import functools
import sys
-from simgrid import Actor, Engine, Comm, Mailbox, NetZone, Link, LinkInRoute, this_actor
+from simgrid import Actor, ActivitySet, Engine, Comm, Mailbox, NetZone, Link, LinkInRoute, this_actor
class Sender:
"""
# Actors that are created as object will execute their __call__ method.
# So, the following constitutes the main function of the Sender actor.
def __call__(self):
- pending_comms = []
+ pending_comms = ActivitySet()
mbox = Mailbox.by_name("receiver")
for i in range(self.msg_count):
size = self.msg_size * (i + 1)
this_actor.info("Send '%s' to '%s, msg size: %d'" % (msg, mbox.name, size))
comm = mbox.put_async(msg, size)
- pending_comms.append(comm)
+ pending_comms.push(comm)
this_actor.info("Done dispatching all messages")
# Now that all message exchanges were initiated, wait for their completion in one single call
- Comm.wait_all(pending_comms)
+ pending_comms.wait_all()
this_actor.info("Goodbye now!")
from typing import List, Tuple
import sys
-from simgrid import Engine, Actor, Comm, Host, LinkInRoute, Mailbox, NetZone, this_actor
+from simgrid import Engine, Actor, ActivitySet, Comm, Host, LinkInRoute, Mailbox, NetZone, this_actor
RECEIVER_MAILBOX_NAME = "receiver"
def __call__(self) -> None:
# Activity set in which we store all ongoing communications
- pending_comms: List[Comm] = []
+ pending_comms = ActivitySet()
# Make a vector of the mailboxes to use
receiver_mailbox: Mailbox = Mailbox.by_name(RECEIVER_MAILBOX_NAME)
message_content = f"Message {i}"
this_actor.info(f"Send '{message_content}' to '{receiver_mailbox.name}'")
# Create a communication representing the ongoing communication, and store it in pending_comms
- pending_comms.append(receiver_mailbox.put_async(message_content, self.message_size))
+ pending_comms.push(receiver_mailbox.put_async(message_content, self.message_size))
this_actor.info("Done dispatching all messages")
# Now that all message exchanges were initiated, wait for their completion in one single call
- Comm.wait_all(pending_comms)
+ pending_comms.wait_all()
this_actor.info("Goodbye now!")
/*! \static Same as wait_any, but with a timeout. Return -1 if the timeout occurs.*/
static ssize_t wait_any_for(const std::vector<CommPtr>& comms, double timeout);
- /*! \static take a vector s4u::CommPtr and return when all of them is finished. */
- static void wait_all(const std::vector<CommPtr>& comms);
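/*! \static Take a vector of s4u::CommPtr and block until all of them are finished, or until the timeout occurs.
* Return the number of terminated comms (less than comms.size() if the timeout occurs).
* A negative timeout means waiting for all comms without any time limit. */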
static size_t wait_all_for(const std::vector<CommPtr>& comms, double timeout);
+
+#ifndef DOXYGEN
+ XBT_ATTRIB_DEPRECATED_v339("Please use ActivitySet instead") static void wait_all(const std::vector<CommPtr>& comms);
+#endif
};
} // namespace simgrid::s4u
"In particular, the actor does not have to be on one of the involved hosts.")
.def_static("test_any", &Comm::test_any, py::call_guard<py::gil_scoped_release>(), py::arg("comms"),
"take a vector s4u::CommPtr and return the rank of the first finished one (or -1 if none is done)")
- .def_static("wait_all", &Comm::wait_all, py::call_guard<py::gil_scoped_release>(), py::arg("comms"),
- "Block until the completion of all communications in the list.")
.def_static("wait_all_for", &Comm::wait_all_for, py::call_guard<py::gil_scoped_release>(), py::arg("comms"),
py::arg("timeout"),
"Block until the completion of all communications in the list, or raises TimeoutException after "
return changed_pos;
}
-void Comm::wait_all(const std::vector<CommPtr>& comms)
+void Comm::wait_all(const std::vector<CommPtr>& comms) // XBT_ATTRIB_DEPRECATED_v339
{
// TODO: this should be a simcall or something
for (const auto& comm : comms)
size_t Comm::wait_all_for(const std::vector<CommPtr>& comms, double timeout)
{
if (timeout < 0.0) {
- wait_all(comms);
+ for (const auto& comm : comms)
+ comm->wait();
return comms.size();
}
XBT_LOG_NEW_DEFAULT_CATEGORY(issue105, "Issue105");
static void load_generator(sg4::Mailbox* mailbox)
{
- std::vector<sg4::CommPtr> comms;
+ sg4::ActivitySet comms;
// Send the task messages
for (int i = 0; i < 100; i++) {
auto* payload = new int(i);
sg4::CommPtr comm = mailbox->put_async(payload, 1024);
- comms.push_back(comm);
+ comms.push(comm);
sg4::this_actor::sleep_for(1.0);
}
auto* payload = new int(-1);
sg4::CommPtr comm = mailbox->put_async(payload, 1024);
XBT_INFO("Sent shutdown");
- comms.push_back(comm);
+ comms.push(comm);
// Wait for all messages to be consumed before ending the simulation
- sg4::Comm::wait_all(comms);
+ comms.wait_all();
XBT_INFO("Load generator finished");
}
foreach(x actor actor-autorestart actor-suspend
activity-lifecycle
- comm-get-sender comm-pt2pt comm-fault-scenarios wait-all-for
+ comm-get-sender comm-pt2pt comm-fault-scenarios wait-all-for wait-any-for
cloud-interrupt-migration cloud-two-execs
monkey-masterworkers monkey-semaphore
concurrent_rw
void operator()() const
{
/* Activity set in which we store all ongoing communications */
- std::vector<sg4::CommPtr> pending_comms;
+ sg4::ActivitySet pending_comms;
/* Make a vector of the mailboxes to use */
std::vector<sg4::Mailbox*> mboxes;
auto* mbox = sg4::Mailbox::by_name(host->get_name());
mboxes.push_back(mbox);
sg4::CommPtr comm = mbox->put_async(payload, msg_size);
- pending_comms.push_back(comm);
+ pending_comms.push(comm);
}
XBT_INFO("Done dispatching all messages");
/* Now that all message exchanges were initiated, wait for their completion in one single call */
- sg4::Comm::wait_all(pending_comms);
+ pending_comms.wait_all();
XBT_INFO("Goodbye now!");
}