Python:
- Make the host_load plugin available from Python. See examples/python/plugin-host-load
- Mailbox::get_async() does not return a pair anymore. Use comm.get_payload() instead.
- - Comm::waitall/testany() are gone. Please use ActivitySet() instead.
+ - Comm::waitall/waitany/testany() are gone. Please use ActivitySet() instead.
- Comm::waitallfor() is gone too. Its semantics were unclear on timeout anyway.
- Io::waitany() and waitanyfor() are gone. Please use ActivitySet() instead.
import sys
-from simgrid import Engine, Actor, Comm, NetZone, Link, LinkInRoute, Mailbox, this_actor, NetworkFailureException
+from simgrid import Engine, Actor, ActivitySet, Comm, NetZone, Link, LinkInRoute, Mailbox, this_actor, NetworkFailureException
def sender(mailbox1_name: str, mailbox2_name: str) -> None:
comm2: Comm = mailbox2.put_async(666, 2)
this_actor.info("Calling wait_any..")
- pending_comms = [comm1, comm2]
+ pending_comms = ActivitySet([comm1, comm2])
try:
- index = Comm.wait_any([comm1, comm2])
- this_actor.info(f"Wait any returned index {index} (comm to {pending_comms[index].mailbox.name})")
+ comm = pending_comms.wait_any()
+ this_actor.info(f"Wait any returned a comm to {comm.mailbox.name}")  # log the mailbox of the comm returned by wait_any()
except NetworkFailureException:
this_actor.info("Sender has experienced a network failure exception, so it knows that something went wrong")
this_actor.info("Now it needs to figure out which of the two comms failed by looking at their state:")
this_actor.info(f"Waiting on a FAILED comm raises an exception: '{err}'")
this_actor.info("Wait for remaining comm, just to be nice")
- pending_comms.pop(0)
try:
- Comm.wait_any(pending_comms)
+ pending_comms.wait_all()
except Exception as e:
this_actor.warning(str(e))
def __call__(self):
mbox = Mailbox.by_name("receiver")
- pending_comms = []
+ pending_comms = ActivitySet()
this_actor.info("Wait for %d messages asynchronously" % self.msg_count)
for _ in range(self.msg_count):
comm = mbox.get_async()
- pending_comms.append(comm)
+ pending_comms.push(comm)
- while pending_comms:
- index = Comm.wait_any(pending_comms)
- msg = pending_comms[index].get_payload()
- this_actor.info("I got '%s'." % msg)
- del pending_comms[index]
+ while not pending_comms.empty():
+ comm = pending_comms.wait_any()
+ this_actor.info("I got '%s'." % comm.get_payload())
####################################################################################################
def link_nonlinear(link: Link, capacity: float, n: int) -> float:
def __call__(self):
# Activity set in which we store all pending incoming comms
- pending_comms: List[Comm] = []
+ pending_comms = ActivitySet()
this_actor.info(f"Wait for {self.messages_count} messages asynchronously")
for _ in range(self.messages_count):
- pending_comms.append(self.mailbox.get_async())
- while pending_comms:
- index = Comm.wait_any(pending_comms)
- this_actor.info(f"I got '{pending_comms[index].get_payload()}'.")
- pending_comms.pop(index)
+ pending_comms.push(self.mailbox.get_async())
+ while not pending_comms.empty():
+ comm = pending_comms.wait_any()
+ this_actor.info(f"I got '{comm.get_payload()}'.")
def main():
Comm* wait_for(double timeout) override;
#ifndef DOXYGEN
- static ssize_t wait_any(const std::vector<CommPtr>& comms) { return deprecated_wait_any_for(comms, -1); }
- static ssize_t wait_any_for(const std::vector<CommPtr>& comms, double timeout) { return deprecated_wait_any_for(comms, timeout); }
+ XBT_ATTRIB_DEPRECATED_v339("Please use ActivitySet instead") static ssize_t wait_any(const std::vector<CommPtr>& comms) { return deprecated_wait_any_for(comms, -1); }
+ XBT_ATTRIB_DEPRECATED_v339("Please use ActivitySet instead") static ssize_t wait_any_for(const std::vector<CommPtr>& comms, double timeout) { return deprecated_wait_any_for(comms, timeout); }
static ssize_t deprecated_wait_any_for(const std::vector<CommPtr>& comms, double timeout);
py::arg("to"), py::arg("simulated_size_in_bytes"),
"Do a blocking communication between two arbitrary hosts.\n\nThis initializes a communication that "
"completely bypass the mailbox and actors mechanism. There is really no limit on the hosts involved. "
- "In particular, the actor does not have to be on one of the involved hosts.")
- .def_static("wait_any", &Comm::wait_any, py::call_guard<py::gil_scoped_release>(), py::arg("comms"),
- "Block until the completion of any communication in the list and return the index of the "
- "terminated one.")
- .def_static("wait_any_for", &Comm::wait_any_for, py::call_guard<py::gil_scoped_release>(), py::arg("comms"),
- py::arg("timeout"),
- "Block until the completion of any communication in the list and return the index of the terminated "
- "one, or -1 if a timeout occurred.");
+ "In particular, the actor does not have to be on one of the involved hosts.");
/* Class Io */
py::class_<simgrid::s4u::Io, simgrid::s4u::IoPtr, Activity>(m, "Io",
return this;
}
-ssize_t Comm::test_any(const std::vector<CommPtr>& comms)
+ssize_t Comm::test_any(const std::vector<CommPtr>& comms) // XBT_ATTRIB_DEPRECATED_v339
{
std::vector<ActivityPtr> activities;
for (const auto& comm : comms)
return this;
}
-ssize_t Comm::deprecated_wait_any_for(const std::vector<CommPtr>& comms, double timeout)
+ssize_t Comm::deprecated_wait_any_for(const std::vector<CommPtr>& comms, double timeout) // XBT_ATTRIB_DEPRECATED_v339
{
std::vector<ActivityPtr> activities;
for (const auto& comm : comms)
return comms.size();
}
- double deadline = Engine::get_clock() + timeout;
- std::vector<CommPtr> waited_comm(1, nullptr);
- for (size_t i = 0; i < comms.size(); i++) {
- double wait_timeout = std::max(0.0, deadline - Engine::get_clock());
- waited_comm[0] = comms[i];
- // Using wait_any_for() here (and not wait_for) because we don't want comms to be invalidated on timeout
- if (wait_any_for(waited_comm, wait_timeout) == -1) {
- XBT_DEBUG("Timeout (%g): i = %zu", wait_timeout, i);
- return i;
- }
- }
- return comms.size();
+ ActivitySet set;
+ for (const auto& comm : comms) // bind by reference, like the sibling loops: no per-element CommPtr copy
+ set.push(comm);
+ set.wait_all_for(timeout);
+
+ return set.size();
}
} // namespace simgrid::s4u
/* **************************** Public C interface *************************** */
foreach(x actor actor-autorestart actor-suspend
activity-lifecycle
- comm-get-sender comm-pt2pt comm-fault-scenarios wait-any-for
+ comm-get-sender comm-pt2pt comm-fault-scenarios
cloud-interrupt-migration cloud-two-execs
monkey-masterworkers monkey-semaphore
concurrent_rw
## Add the tests.
## Some need to be run with all factories, some don't need tesh to run
-foreach(x actor actor-autorestart actor-suspend activity-lifecycle comm-get-sender wait-any-for
+foreach(x actor actor-autorestart actor-suspend activity-lifecycle comm-get-sender
cloud-interrupt-migration cloud-two-execs concurrent_rw dag-incomplete-simulation dependencies io-set-bw io-stream
vm-live-migration vm-suicide)
set(tesh_files ${tesh_files} ${CMAKE_CURRENT_SOURCE_DIR}/${x}/${x}.tesh)
simgrid::s4u::ActorPtr receiver = simgrid::s4u::Actor::create("receiver", all_hosts[1], []() {
assert_exit(true, 2);
int* data;
- simgrid::s4u::CommPtr comm = simgrid::s4u::Mailbox::by_name("mb")->get_async<int>(&data);
- std::vector<simgrid::s4u::CommPtr> pending_comms = {comm};
- REQUIRE_NETWORK_FAILURE(simgrid::s4u::Comm::wait_any(pending_comms));
+ simgrid::s4u::CommPtr comm = simgrid::s4u::Mailbox::by_name("mb")->get_async<int>(&data);
+ simgrid::s4u::ActivitySet pending_comms({comm});
+ REQUIRE_NETWORK_FAILURE(pending_comms.wait_any());
});
simgrid::s4u::ActorPtr sender = simgrid::s4u::Actor::create("sender", all_hosts[2], []() {
+++ /dev/null
-/* Copyright (c) 2019-2023. The SimGrid Team. All rights reserved. */
-
-/* This program is free software; you can redistribute it and/or modify it
- * under the terms of the license (GNU LGPL) which comes with this package. */
-
-#include <cstdlib>
-#include <iostream>
-#include <string>
-#include <simgrid/s4u.hpp>
-
-XBT_LOG_NEW_DEFAULT_CATEGORY(meh, "meh");
-
-static void worker()
-{
- auto* mbox = simgrid::s4u::Mailbox::by_name("meh");
- int input1 = 42;
- int input2 = 51;
-
- XBT_INFO("Sending and receiving %d and %d asynchronously", input1, input2);
-
- auto put1 = mbox->put_async(&input1, 1000 * 1000 * 500);
- auto put2 = mbox->put_async(&input2, 1000 * 1000 * 1000);
-
- int* out1;
- auto get1 = mbox->get_async<int>(&out1);
-
- int* out2;
- auto get2 = mbox->get_async<int>(&out2);
-
- XBT_INFO("All comms have started");
- std::vector<simgrid::s4u::CommPtr> comms = {put1, put2, get1, get2};
-
- while (not comms.empty()) {
- ssize_t index = simgrid::s4u::Comm::wait_any_for(comms, 0.5);
- if (index < 0)
- XBT_INFO("wait_any_for: Timeout reached");
- else {
- XBT_INFO("wait_any_for: A comm finished (index=%zd, #comms=%zu)", index, comms.size());
- comms.erase(comms.begin() + index);
- }
- }
-
- XBT_INFO("All comms have finished");
- XBT_INFO("Got %d and %d", *out1, *out2);
-}
-
-int main(int argc, char* argv[])
-
-{
- simgrid::s4u::Engine e(&argc, argv);
- e.load_platform(argv[1]);
- simgrid::s4u::Actor::create("worker", e.host_by_name("Tremblay"), worker);
- e.run();
- return 0;
-}
+++ /dev/null
-#!/usr/bin/env tesh
-
-p Testing the wait_any_for feature of S4U
-
-! output sort 19
-$ ${bindir:=.}/wait-any-for ${platfdir:=.}/small_platform.xml "--log=root.fmt:[%10.6r]%e(%i:%a@%h)%e%m%n"
-> [ 0.000000] (1:worker@Tremblay) Sending and receiving 42 and 51 asynchronously
-> [ 0.000000] (1:worker@Tremblay) All comms have started
-> [ 0.500000] (1:worker@Tremblay) wait_any_for: Timeout reached
-> [ 1.000000] (1:worker@Tremblay) wait_any_for: Timeout reached
-> [ 1.035263] (1:worker@Tremblay) wait_any_for: A comm finished (index=0, #comms=4)
-> [ 1.035263] (1:worker@Tremblay) wait_any_for: A comm finished (index=1, #comms=3)
-> [ 1.535263] (1:worker@Tremblay) wait_any_for: Timeout reached
-> [ 2.035263] (1:worker@Tremblay) wait_any_for: Timeout reached
-> [ 2.070331] (1:worker@Tremblay) wait_any_for: A comm finished (index=0, #comms=2)
-> [ 2.070331] (1:worker@Tremblay) wait_any_for: A comm finished (index=0, #comms=1)
-> [ 2.070331] (1:worker@Tremblay) All comms have finished
-> [ 2.070331] (1:worker@Tremblay) Got 42 and 51