From 7752623c80751cf41b04ab007c50bd579bcf50e0 Mon Sep 17 00:00:00 2001 From: Frederic Suter Date: Thu, 30 Jan 2020 11:53:13 +0100 Subject: [PATCH] dependencies support for Comm (needs testing) --- examples/s4u/async-ready/s4u-async-ready.tesh | 2 +- include/simgrid/s4u/Activity.hpp | 1 - src/s4u/s4u_Comm.cpp | 29 ++++++++++++++----- src/s4u/s4u_Mailbox.cpp | 9 ++++-- 4 files changed, 28 insertions(+), 13 deletions(-) diff --git a/examples/s4u/async-ready/s4u-async-ready.tesh b/examples/s4u/async-ready/s4u-async-ready.tesh index d5d259156b..9b6d9a8e69 100644 --- a/examples/s4u/async-ready/s4u-async-ready.tesh +++ b/examples/s4u/async-ready/s4u-async-ready.tesh @@ -86,6 +86,6 @@ $ ${bindir:=.}/s4u-async-ready ${platfdir}/small_platform_fatpipe.xml s4u-async- > [ 0.110000] (3:peer@Perl) I got a 'Message 5 from peer 1'. > [ 0.110000] (3:peer@Perl) I got a 'finalize'. > [ 0.110000] (3:peer@Perl) I'm done, just waiting for my peers to receive the messages before exiting -> [ 0.110000] (3:peer@Perl) Goodbye now! > [ 0.110000] (1:peer@Tremblay) Goodbye now! > [ 0.110000] (2:peer@Ruby) Goodbye now! +> [ 0.110000] (3:peer@Perl) Goodbye now! diff --git a/include/simgrid/s4u/Activity.hpp b/include/simgrid/s4u/Activity.hpp index 1c87d16e4d..87fb4f61e0 100644 --- a/include/simgrid/s4u/Activity.hpp +++ b/include/simgrid/s4u/Activity.hpp @@ -105,7 +105,6 @@ public: { state_ = State::STARTING; if (not has_dependencies()) { - state_ = State::STARTED; XBT_CDEBUG(s4u_activity, "All dependencies are solved, let's start '%s'", get_cname()); start(); } diff --git a/src/s4u/s4u_Comm.cpp b/src/s4u/s4u_Comm.cpp index b9a88112df..8ccb8628d5 100644 --- a/src/s4u/s4u_Comm.cpp +++ b/src/s4u/s4u_Comm.cpp @@ -36,7 +36,10 @@ int Comm::wait_any_for(const std::vector* comms, double timeout) std::unique_ptr rcomms(new kernel::activity::CommImpl*[comms->size()]); std::transform(begin(*comms), end(*comms), rcomms.get(), [](const CommPtr& comm) { return static_cast(comm->pimpl_.get()); }); - return simcall_comm_waitany(rcomms.get(), comms->size(), timeout); + int changed_pos = simcall_comm_waitany(rcomms.get(), comms->size(), timeout); + if (changed_pos != -1) + comms->at(changed_pos)->release_dependencies(); + return changed_pos; } void Comm::wait_all(const std::vector* comms) @@ -82,6 +85,7 @@ CommPtr Comm::set_src_data(void* buff, size_t size) src_buff_size_ = size; return this; } + CommPtr Comm::set_dst_data(void** buff) { xbt_assert(state_ == State::INITED, "You cannot use %s() once your communication started (not implemented)", @@ -116,8 +120,8 @@ CommPtr Comm::set_tracing_category(const std::string& category) Comm* Comm::start() { - xbt_assert(get_state() == State::INITED, "You cannot use %s() once your communication started (not implemented)", - __FUNCTION__); + xbt_assert(get_state() == State::INITED || get_state() == State::STARTING, + "You cannot use %s() once your communication started (not implemented)", __FUNCTION__); if (src_buff_ != nullptr) { // Sender side on_sender_start(*Actor::self()); @@ -154,7 +158,8 @@ Comm* Comm::wait_for(double timeout) case State::FINISHED: break; - case State::INITED: // It's not started yet. Do it in one simcall + case State::INITED: + case State::STARTING: // It's not started yet. Do it in one simcall if (src_buff_ != nullptr) { on_sender_start(*Actor::self()); simcall_comm_send(sender_, mailbox_->get_impl(), remains_, rate_, src_buff_, src_buff_size_, match_fun_, @@ -166,12 +171,14 @@ Comm* Comm::wait_for(double timeout) get_user_data(), timeout, rate_); } state_ = State::FINISHED; + this->release_dependencies(); break; case State::STARTED: simcall_comm_wait(pimpl_, timeout); on_completion(*Actor::self()); state_ = State::FINISHED; + this->release_dependencies(); break; case State::CANCELED: @@ -182,12 +189,16 @@ Comm* Comm::wait_for(double timeout) } return this; } + int Comm::test_any(const std::vector* comms) { std::unique_ptr rcomms(new kernel::activity::CommImpl*[comms->size()]); std::transform(begin(*comms), end(*comms), rcomms.get(), [](const CommPtr& comm) { return static_cast(comm->pimpl_.get()); }); - return simcall_comm_testany(rcomms.get(), comms->size()); + int changed_pos = simcall_comm_testany(rcomms.get(), comms->size()); + if (changed_pos != -1) + comms->at(changed_pos)->release_dependencies(); + return changed_pos; } Comm* Comm::detach() @@ -196,7 +207,8 @@ Comm* Comm::detach() __FUNCTION__); xbt_assert(src_buff_ != nullptr && src_buff_size_ != 0, "You can only detach sends, not recvs"); detached_ = true; - return start(); + vetoable_start(); + return this; } Comm* Comm::cancel() @@ -216,11 +228,12 @@ bool Comm::test() if (state_ == State::FINISHED) return true; - if (state_ == State::INITED) - this->start(); + if (state_ == State::INITED || state_ == State::STARTING) + this->vetoable_start(); if (simcall_comm_test(pimpl_)) { state_ = State::FINISHED; + this->release_dependencies(); return true; } return false; diff --git a/src/s4u/s4u_Mailbox.cpp b/src/s4u/s4u_Mailbox.cpp index 3fd04ac0cc..810fd7f231 100644 --- a/src/s4u/s4u_Mailbox.cpp +++ b/src/s4u/s4u_Mailbox.cpp @@ -94,7 +94,7 @@ s4u::CommPtr Mailbox::put_async(void* payload, uint64_t simulated_size_in_bytes) xbt_assert(payload != nullptr, "You cannot send nullptr"); s4u::CommPtr res = put_init(payload, simulated_size_in_bytes); - res->start(); + res->vetoable_start(); return res; } void Mailbox::put(void* payload, uint64_t simulated_size_in_bytes) @@ -104,6 +104,7 @@ void Mailbox::put(void* payload, uint64_t simulated_size_in_bytes) CommPtr c = put_init(); c->set_remaining(simulated_size_in_bytes); c->set_src_data(payload); + c->vetoable_start(); c->wait(); } /** Blocking send with timeout */ @@ -114,7 +115,7 @@ void Mailbox::put(void* payload, uint64_t simulated_size_in_bytes, double timeou CommPtr c = put_init(); c->set_remaining(simulated_size_in_bytes); c->set_src_data(payload); - // c->start() is optional. + c->vetoable_start(); c->wait_for(timeout); } @@ -129,7 +130,7 @@ s4u::CommPtr Mailbox::get_async(void** data) { s4u::CommPtr res = get_init(); res->set_dst_data(data, sizeof(*data)); - res->start(); + res->vetoable_start(); return res; } @@ -138,6 +139,7 @@ void* Mailbox::get() void* res = nullptr; CommPtr c = get_init(); c->set_dst_data(&res, sizeof(res)); + c->vetoable_start(); c->wait(); return res; } @@ -146,6 +148,7 @@ void* Mailbox::get(double timeout) void* res = nullptr; CommPtr c = get_init(); c->set_dst_data(&res, sizeof(res)); + c->vetoable_start(); c->wait_for(timeout); return res; } -- 2.20.1