associated example (examples/cpp/plugin-prodcons).
S4U:
- - New: simgrid::s4u::Comm::wait_all_for() (like Comm::Wait_all, but with a timeout).
+ - New: s4u::Comm::wait_all_for() (like s4u::Comm::wait_all, but with a timeout),
+ s4u::Io::wait_any(), s4u::Io::wait_any_for().
+ - Methods test_all/test_any/wait_all/wait_any in s4u now take their vector
+ parameter by reference, instead of a pointer.
- Fixed a bug where Activity::wait_for() killed the activity on timeout.
Explicitly cancel the activity to get back to previous behavior.
.. doxygenfunction:: simgrid::s4u::Comm::cancel
.. doxygenfunction:: simgrid::s4u::Comm::start
.. doxygenfunction:: simgrid::s4u::Comm::test
- .. doxygenfunction:: simgrid::s4u::Comm::test_any(const std::vector< CommPtr > *comms)
+ .. doxygenfunction:: simgrid::s4u::Comm::test_any(const std::vector< CommPtr >& comms)
.. doxygenfunction:: simgrid::s4u::Comm::wait
- .. doxygenfunction:: simgrid::s4u::Comm::wait_all(const std::vector< CommPtr > *comms)
- .. doxygenfunction:: simgrid::s4u::Comm::wait_any(const std::vector< CommPtr > *comms)
- .. doxygenfunction:: simgrid::s4u::Comm::wait_any_for(const std::vector< CommPtr > *comms_in, double timeout)
+ .. doxygenfunction:: simgrid::s4u::Comm::wait_all(const std::vector< CommPtr >& comms)
+ .. doxygenfunction:: simgrid::s4u::Comm::wait_all_for(const std::vector< CommPtr >& comms, double timeout)
+ .. doxygenfunction:: simgrid::s4u::Comm::wait_any(const std::vector< CommPtr >& comms)
+ .. doxygenfunction:: simgrid::s4u::Comm::wait_any_for(const std::vector< CommPtr >& comms, double timeout)
.. doxygenfunction:: simgrid::s4u::Comm::wait_for
.. group-tab:: Python
.. doxygenfunction:: simgrid::s4u::Exec::start
.. doxygenfunction:: simgrid::s4u::Exec::test
.. doxygenfunction:: simgrid::s4u::Exec::wait
- .. doxygenfunction:: simgrid::s4u::Exec::wait_any(std::vector< ExecPtr > *execs)
- .. doxygenfunction:: simgrid::s4u::Exec::wait_any_for(std::vector< ExecPtr > *execs, double timeout)
+ .. doxygenfunction:: simgrid::s4u::Exec::wait_any(const std::vector< ExecPtr >& execs)
+ .. doxygenfunction:: simgrid::s4u::Exec::wait_any_for(const std::vector< ExecPtr >& execs, double timeout)
.. doxygenfunction:: simgrid::s4u::Exec::wait_for
.. group-tab:: Python
simgrid::s4u::CommPtr comm = me->get_async<FilePiece>(&received);
pending_recvs.push_back(comm);
- int idx = simgrid::s4u::Comm::wait_any(&pending_recvs);
+ int idx = simgrid::s4u::Comm::wait_any(pending_recvs);
if (idx != -1) {
comm = pending_recvs.at(idx);
XBT_DEBUG("Peer %s got a 'SEND_DATA' message", me->get_cname());
simgrid::s4u::CommPtr comm = first->put_async(new FilePiece(), MESSAGE_SEND_DATA_HEADER_SIZE + PIECE_SIZE);
pending_sends.push_back(comm);
}
- simgrid::s4u::Comm::wait_all(&pending_sends);
+ simgrid::s4u::Comm::wait_all(pending_sends);
}
Broadcaster(int hostcount, unsigned int piece_count) : piece_count(piece_count)
p.joinChain();
p.forwardFile();
- simgrid::s4u::Comm::wait_all(&p.pending_sends);
+ simgrid::s4u::Comm::wait_all(p.pending_sends);
double end_time = simgrid::s4u::Engine::get_clock();
XBT_INFO("### %f %llu bytes (Avg %f MB/s); copy finished (simulated).", end_time - start_time, p.received_bytes,
XBT_INFO("Done dispatching all messages");
/* Now that all message exchanges were initiated, wait for their completion in one single call */
- sg4::Comm::wait_all(&pending_comms);
+ sg4::Comm::wait_all(pending_comms);
XBT_INFO("Goodbye now!");
}
}
XBT_INFO("I'm done, just waiting for my peers to receive the messages before exiting");
- sg4::Comm::wait_all(&pending_comms);
+ sg4::Comm::wait_all(pending_comms);
XBT_INFO("Goodbye now!");
}
XBT_INFO("Done dispatching all messages");
/* Now that all message exchanges were initiated, wait for their completion in one single call */
- sg4::Comm::wait_all(&pending_comms);
+ sg4::Comm::wait_all(pending_comms);
// sphinx-doc: put-end
XBT_INFO("Goodbye now!");
pending_comms.emplace_back(mbox->get_async<std::string>(pending_msgs[i].get()));
}
while (not pending_comms.empty()) {
- int index = sg4::Comm::wait_any(&pending_comms);
+ int index = sg4::Comm::wait_any(pending_comms);
std::string* msg = *pending_msgs[index];
XBT_INFO("I got '%s'.", msg->c_str());
/* cleanup memory and remove from vectors */
XBT_INFO("Done dispatching all messages");
/* Now that all message exchanges were initiated, wait for their completion in one single call */
- sg4::Comm::wait_all(&pending_comms);
+ sg4::Comm::wait_all(pending_comms);
// sphinx-doc: put-end
XBT_INFO("Goodbye now!");
* Even in this simple example, the pending comms do not terminate in the exact same order of creation.
*/
while (not pending_comms.empty()) {
- int changed_pos = sg4::Comm::wait_any(&pending_comms);
+ int changed_pos = sg4::Comm::wait_any(pending_comms);
pending_comms.erase(pending_comms.begin() + changed_pos);
if (changed_pos != 0)
XBT_INFO("Remove the %dth pending comm: it terminated earlier than another comm that was initiated first.",
std::vector<simgrid::s4u::CommPtr> comms;
for (int i = 0; i < flow_amount; i++)
comms.push_back(mailbox->put_async(bprintf("%d", i), comm_size));
- simgrid::s4u::Comm::wait_all(&comms);
+ simgrid::s4u::Comm::wait_all(comms);
}
XBT_INFO("sender done.");
}
for (int i = 0; i < flow_amount; i++)
comms.push_back(mailbox->get_async<char>(&data[i]));
- simgrid::s4u::Comm::wait_all(&comms);
+ simgrid::s4u::Comm::wait_all(comms);
for (int i = 0; i < flow_amount; i++)
xbt_free(data[i]);
}
// wait for the completion of all activities
while (not pending_execs.empty()) {
- int changed_pos = simgrid::s4u::Exec::wait_any_for(&pending_execs, -1);
+ int changed_pos = simgrid::s4u::Exec::wait_any_for(pending_execs, -1);
XBT_INFO("Exec '%s' is complete", pending_execs[changed_pos]->get_cname());
pending_execs.erase(pending_execs.begin() + changed_pos);
}
while (not pending_executions.empty()) {
int pos;
if (with_timeout)
- pos = simgrid::s4u::Exec::wait_any_for(&pending_executions, 4);
+ pos = simgrid::s4u::Exec::wait_any_for(pending_executions, 4);
else
- pos = simgrid::s4u::Exec::wait_any(&pending_executions);
+ pos = simgrid::s4u::Exec::wait_any(pending_executions);
if (pos < 0) {
XBT_INFO("Do not wait any longer for an activity");
// wait for the completion of all activities
bob_compute->wait();
while (not pending_ios.empty()) {
- int changed_pos = simgrid::s4u::Io::wait_any(&pending_ios);
+ int changed_pos = simgrid::s4u::Io::wait_any(pending_ios);
XBT_INFO("Io '%s' is complete", pending_ios[changed_pos]->get_cname());
pending_ios.erase(pending_ios.begin() + changed_pos);
}
/*! take a vector s4u::CommPtr and return when one of them is finished.
* The return value is the rank of the first finished CommPtr. */
- static int wait_any(const std::vector<CommPtr>* comms) { return wait_any_for(comms, -1); }
+ static int wait_any(const std::vector<CommPtr>& comms) { return wait_any_for(comms, -1); }
/*! Same as wait_any, but with a timeout. Return -1 if the timeout occurs.*/
- static int wait_any_for(const std::vector<CommPtr>* comms_in, double timeout);
+ static int wait_any_for(const std::vector<CommPtr>& comms, double timeout);
/*! take a vector of s4u::CommPtr and return when all of them are finished. */
- static void wait_all(const std::vector<CommPtr>* comms);
+ static void wait_all(const std::vector<CommPtr>& comms);
/*! Same as wait_all, but with a timeout. Return the number of terminated comms (less than comms.size() if the timeout
* occurs). */
- static size_t wait_all_for(const std::vector<CommPtr>* comms, double timeout);
+ static size_t wait_all_for(const std::vector<CommPtr>& comms, double timeout);
/*! take a vector s4u::CommPtr and return the rank of the first finished one (or -1 if none is done). */
- static int test_any(const std::vector<CommPtr>* comms);
+ static int test_any(const std::vector<CommPtr>& comms);
+
+ XBT_ATTRIB_DEPRECATED_v332("Please use a plain vector for parameter")
+ static int wait_any(const std::vector<CommPtr>* comms) { return wait_any_for(*comms, -1); }
+ XBT_ATTRIB_DEPRECATED_v332("Please use a plain vector for first parameter")
+ static int wait_any_for(const std::vector<CommPtr>* comms, double timeout) { return wait_any_for(*comms, timeout); }
+ XBT_ATTRIB_DEPRECATED_v332("Please use a plain vector for parameter")
+ static void wait_all(const std::vector<CommPtr>* comms) { wait_all(*comms); }
+ XBT_ATTRIB_DEPRECATED_v332("Please use a plain vector for parameter")
+ static int test_any(const std::vector<CommPtr>* comms) { return test_any(*comms); }
Comm* start() override;
Comm* wait_for(double timeout) override;
/*! take a vector of s4u::ExecPtr and return when one of them is finished.
* The return value is the rank of the first finished ExecPtr. */
- static int wait_any(std::vector<ExecPtr>* execs) { return wait_any_for(execs, -1); }
+ static int wait_any(const std::vector<ExecPtr>& execs) { return wait_any_for(execs, -1); }
/*! Same as wait_any, but with a timeout. Return -1 if the timeout occurs.*/
- static int wait_any_for(std::vector<ExecPtr>* execs, double timeout);
+ static int wait_any_for(const std::vector<ExecPtr>& execs, double timeout);
+
+ XBT_ATTRIB_DEPRECATED_v332("Please use a plain vector for parameter")
+ static int wait_any(std::vector<ExecPtr>* execs) { return wait_any_for(*execs, -1); }
+ XBT_ATTRIB_DEPRECATED_v332("Please use a plain vector for first parameter")
+ static int wait_any_for(std::vector<ExecPtr>* execs, double timeout) { return wait_any_for(*execs, timeout); }
/** @brief On sequential executions, returns the amount of flops that remain to be done; This cannot be used on
* parallel executions. */
Io* start() override;
/*! take a vector of s4u::IoPtr and return when one of them is finished.
* The return value is the rank of the first finished IoPtr. */
- static int wait_any(std::vector<IoPtr>* ios) { return wait_any_for(ios, -1); }
+ static int wait_any(const std::vector<IoPtr>& ios) { return wait_any_for(ios, -1); }
/*! Same as wait_any, but with a timeout. Return -1 if the timeout occurs.*/
- static int wait_any_for(std::vector<IoPtr>* ios, double timeout);
+ static int wait_any_for(const std::vector<IoPtr>& ios, double timeout);
double get_remaining() const override;
sg_size_t get_performed_ioops() const;
"Test whether the communication is terminated.")
.def("wait", &simgrid::s4u::Comm::wait, py::call_guard<GilScopedRelease>(),
"Block until the completion of that communication.")
- .def("wait_all", &simgrid::s4u::Comm::wait_all, py::call_guard<GilScopedRelease>(),
- "Block until the completion of all communications in the list.")
- .def("wait_any", &simgrid::s4u::Comm::wait_any, py::call_guard<GilScopedRelease>(),
- "Block until the completion of any communication in the list and return the index of the terminated one.");
+ // use py::overload_cast for wait_all/wait_any, until the overload marked XBT_ATTRIB_DEPRECATED_v332 is removed
+ .def_static("wait_all",
+ py::overload_cast<const std::vector<simgrid::s4u::CommPtr>&>(&simgrid::s4u::Comm::wait_all),
+ py::call_guard<GilScopedRelease>(), "Block until the completion of all communications in the list.")
+ .def_static(
+ "wait_any", py::overload_cast<const std::vector<simgrid::s4u::CommPtr>&>(&simgrid::s4u::Comm::wait_any),
+ py::call_guard<GilScopedRelease>(),
+ "Block until the completion of any communication in the list and return the index of the terminated one.");
/* Class Exec */
py::class_<simgrid::s4u::Exec, simgrid::s4u::ExecPtr>(m, "Exec", "Execution")
}
}
-int Comm::wait_any_for(const std::vector<CommPtr>* comms, double timeout)
+int Comm::wait_any_for(const std::vector<CommPtr>& comms, double timeout)
{
- std::vector<kernel::activity::CommImpl*> rcomms(comms->size());
- std::transform(begin(*comms), end(*comms), begin(rcomms),
+ std::vector<kernel::activity::CommImpl*> rcomms(comms.size());
+ std::transform(begin(comms), end(comms), begin(rcomms),
[](const CommPtr& comm) { return static_cast<kernel::activity::CommImpl*>(comm->pimpl_.get()); });
int changed_pos = simcall_comm_waitany(rcomms.data(), rcomms.size(), timeout);
if (changed_pos != -1)
- comms->at(changed_pos)->complete(State::FINISHED);
+ comms.at(changed_pos)->complete(State::FINISHED);
return changed_pos;
}
-void Comm::wait_all(const std::vector<CommPtr>* comms)
+void Comm::wait_all(const std::vector<CommPtr>& comms)
{
// TODO: this should be a simcall or something
- for (auto& comm : *comms)
+ for (auto& comm : comms)
comm->wait();
}
-size_t Comm::wait_all_for(const std::vector<CommPtr>* comms, double timeout)
+size_t Comm::wait_all_for(const std::vector<CommPtr>& comms, double timeout)
{
if (timeout < 0.0) {
wait_all(comms);
- return comms->size();
+ return comms.size();
}
double deadline = Engine::get_clock() + timeout;
std::vector<CommPtr> waited_comm(1, nullptr);
- for (size_t i = 0; i < comms->size(); i++) {
+ for (size_t i = 0; i < comms.size(); i++) {
double wait_timeout = std::max(0.0, deadline - Engine::get_clock());
- waited_comm[0] = (*comms)[i];
+ waited_comm[0] = comms[i];
// Using wait_any_for() here (and not wait_for) because we don't want comms to be invalidated on timeout
- if (wait_any_for(&waited_comm, wait_timeout) == -1) {
+ if (wait_any_for(waited_comm, wait_timeout) == -1) {
XBT_DEBUG("Timeout (%g): i = %zu", wait_timeout, i);
return i;
}
}
- return comms->size();
+ return comms.size();
}
CommPtr Comm::set_rate(double rate)
return this;
}
-int Comm::test_any(const std::vector<CommPtr>* comms)
+int Comm::test_any(const std::vector<CommPtr>& comms)
{
- std::vector<kernel::activity::CommImpl*> rcomms(comms->size());
- std::transform(begin(*comms), end(*comms), begin(rcomms),
+ std::vector<kernel::activity::CommImpl*> rcomms(comms.size());
+ std::transform(begin(comms), end(comms), begin(rcomms),
[](const CommPtr& comm) { return static_cast<kernel::activity::CommImpl*>(comm->pimpl_.get()); });
int changed_pos = simcall_comm_testany(rcomms.data(), rcomms.size());
if (changed_pos != -1)
- comms->at(changed_pos)->complete(State::FINISHED);
+ comms.at(changed_pos)->complete(State::FINISHED);
return changed_pos;
}
for (size_t i = 0; i < count; i++)
s4u_comms.emplace_back(comms[i], false);
- size_t pos = simgrid::s4u::Comm::wait_all_for(&s4u_comms, timeout);
+ size_t pos = simgrid::s4u::Comm::wait_all_for(s4u_comms, timeout);
for (size_t i = pos; i < count; i++)
s4u_comms[i]->add_ref();
return pos;
for (unsigned int i = 0; i < count; i++)
s4u_comms.emplace_back(comms[i], false);
- int pos = simgrid::s4u::Comm::wait_any_for(&s4u_comms, timeout);
+ int pos = simgrid::s4u::Comm::wait_any_for(s4u_comms, timeout);
for (unsigned i = 0; i < count; i++) {
if (pos != -1 && static_cast<unsigned>(pos) != i)
s4u_comms[i]->add_ref();
return this;
}
-int Exec::wait_any_for(std::vector<ExecPtr>* execs, double timeout)
+int Exec::wait_any_for(const std::vector<ExecPtr>& execs, double timeout)
{
- std::vector<kernel::activity::ExecImpl*> rexecs(execs->size());
- std::transform(begin(*execs), end(*execs), begin(rexecs),
+ std::vector<kernel::activity::ExecImpl*> rexecs(execs.size());
+ std::transform(begin(execs), end(execs), begin(rexecs),
[](const ExecPtr& exec) { return static_cast<kernel::activity::ExecImpl*>(exec->pimpl_.get()); });
kernel::actor::ActorImpl* issuer = kernel::actor::ActorImpl::self();
},
&observer);
if (changed_pos != -1)
- execs->at(changed_pos)->complete(State::FINISHED);
+ execs.at(changed_pos)->complete(State::FINISHED);
return changed_pos;
}
for (unsigned int i = 0; i < count; i++)
s4u_execs.emplace_back(execs[i], false);
- int pos = simgrid::s4u::Exec::wait_any_for(&s4u_execs, timeout);
+ int pos = simgrid::s4u::Exec::wait_any_for(s4u_execs, timeout);
for (unsigned i = 0; i < count; i++) {
if (pos != -1 && static_cast<unsigned>(pos) != i)
s4u_execs[i]->add_ref();
return this;
}
-int Io::wait_any_for(std::vector<IoPtr>* ios, double timeout)
+int Io::wait_any_for(const std::vector<IoPtr>& ios, double timeout)
{
- std::vector<kernel::activity::IoImpl*> rios(ios->size());
- std::transform(begin(*ios), end(*ios), begin(rios),
+ std::vector<kernel::activity::IoImpl*> rios(ios.size());
+ std::transform(begin(ios), end(ios), begin(rios),
[](const IoPtr& io) { return static_cast<kernel::activity::IoImpl*>(io->pimpl_.get()); });
kernel::actor::ActorImpl* issuer = kernel::actor::ActorImpl::self();
},
&observer);
if (changed_pos != -1)
- ios->at(changed_pos)->complete(State::FINISHED);
+ ios.at(changed_pos)->complete(State::FINISHED);
return changed_pos;
}
int* data;
simgrid::s4u::CommPtr comm = simgrid::s4u::Mailbox::by_name("mb")->get_async<int>(&data);
std::vector<simgrid::s4u::CommPtr> pending_comms = {comm};
- REQUIRE_NETWORK_FAILURE(simgrid::s4u::Comm::wait_any(&pending_comms));
+ REQUIRE_NETWORK_FAILURE(simgrid::s4u::Comm::wait_any(pending_comms));
});
simgrid::s4u::ActorPtr sender = simgrid::s4u::Actor::create("sender", all_hosts[2], []() {
try {
std::vector<Activity> activities = {activity};
XBT_DEBUG("calling wait_any_for(%f)", duration);
- int index = Activity::element_type::wait_any_for(&activities, duration);
+ int index = Activity::element_type::wait_any_for(activities, duration);
if (index == -1) {
XBT_DEBUG("wait_any_for() timed out");
INFO("wait_any_for() timeout should expire at expected date: " << timeout);
XBT_INFO("Done dispatching all messages");
/* Now that all message exchanges were initiated, wait for their completion in one single call */
- sg4::Comm::wait_all(&pending_comms);
+ sg4::Comm::wait_all(pending_comms);
XBT_INFO("Goodbye now!");
}
std::vector<simgrid::s4u::CommPtr> comms = {put1, put2, get1, get2};
while (not comms.empty()) {
- size_t index = simgrid::s4u::Comm::wait_all_for(&comms, 0.5);
+ size_t index = simgrid::s4u::Comm::wait_all_for(comms, 0.5);
if (index < comms.size())
XBT_INFO("wait_all_for: Timeout reached");
XBT_INFO("wait_all_for: %zu comms finished (#comms=%zu)", index, comms.size());
std::vector<simgrid::s4u::CommPtr> comms = {put1, put2, get1, get2};
while (not comms.empty()) {
- int index = simgrid::s4u::Comm::wait_any_for(&comms, 0.5);
+ int index = simgrid::s4u::Comm::wait_any_for(comms, 0.5);
if (index < 0)
XBT_INFO("wait_any_for: Timeout reached");
else {