From 45ae0195c6ad8232415288c5da07b4bdc23a31b4 Mon Sep 17 00:00:00 2001 From: Martin Quinson Date: Fri, 26 May 2017 22:29:58 +0200 Subject: [PATCH] Another step toward working CommPtr. chord example is broken ATM --- examples/s4u/dht-chord/node.cpp | 58 ++-------- examples/s4u/dht-chord/s4u_dht-chord.hpp | 11 +- include/simgrid/forward.h | 25 +++-- include/simgrid/s4u.hpp | 1 + include/simgrid/s4u/Activity.hpp | 7 +- include/simgrid/s4u/Actor.hpp | 5 +- include/simgrid/s4u/Comm.hpp | 20 +++- include/simgrid/s4u/forward.hpp | 1 + src/kernel/activity/ActivityImpl.cpp | 6 +- src/kernel/activity/ActivityImpl.hpp | 5 +- src/s4u/s4u_actor.cpp | 34 +++--- src/s4u/s4u_comm.cpp | 93 ++++++++++++---- src/simix/ActorImpl.cpp | 2 +- src/simix/smx_network.cpp | 116 ++++++++++---------- teshsuite/s4u/listen_async/listen_async.cpp | 6 +- 15 files changed, 218 insertions(+), 172 deletions(-) diff --git a/examples/s4u/dht-chord/node.cpp b/examples/s4u/dht-chord/node.cpp index f9ed22a7e8..d0ee0e92a2 100644 --- a/examples/s4u/dht-chord/node.cpp +++ b/examples/s4u/dht-chord/node.cpp @@ -229,10 +229,10 @@ void Node::checkPredecessor() // receive the answer XBT_DEBUG("Sent 'Predecessor Alive' request to %d, waiting for the answer on my mailbox '%s'", pred_id_, message->answer_to->name()); - simgrid::s4u::Comm& comm = simgrid::s4u::this_actor::irecv(return_mailbox, &data); + simgrid::s4u::CommPtr comm = simgrid::s4u::this_actor::irecv(return_mailbox, &data); try { - comm.wait(timeout); + comm->wait(timeout); XBT_DEBUG("Received the answer to my 'Predecessor Alive': my predecessor %d is alive", pred_id_); delete message; } catch (xbt_ex& e) { @@ -274,10 +274,10 @@ int Node::remoteGetPredecessor(int ask_to) // receive the answer XBT_DEBUG("Sent 'Get Predecessor' request to %d, waiting for the answer on my mailbox '%s'", ask_to, message->answer_to->name()); - simgrid::s4u::Comm& comm = simgrid::s4u::this_actor::irecv(return_mailbox, &data); + simgrid::s4u::CommPtr comm = simgrid::s4u::this_actor::irecv(return_mailbox, &data); try { - comm.wait(timeout); + comm->wait(timeout); ChordMessage* answer = static_cast(data); XBT_DEBUG("Received the answer to my 'Get Predecessor' request: the predecessor of node %d is %d", ask_to, answer->answer_id); @@ -348,10 +348,10 @@ int Node::remoteFindSuccessor(int ask_to, int id) } // receive the answer XBT_DEBUG("Sent a 'Find Successor' request to %d for key %d, waiting for the answer", ask_to, id); - simgrid::s4u::Comm& comm = simgrid::s4u::this_actor::irecv(return_mailbox, &data); + simgrid::s4u::CommPtr comm = simgrid::s4u::this_actor::irecv(return_mailbox, &data); try { - comm.wait(timeout); + comm->wait(timeout); ChordMessage* answer = static_cast(data); XBT_DEBUG("Received the answer to my 'Find Successor' request for id %d: the successor of key %d is %d", answer->request_id, id_, answer->answer_id); @@ -387,15 +387,7 @@ void Node::remoteNotify(int notify_id, int predecessor_candidate_id) // send a "Notify" request to notify_id XBT_DEBUG("Sending a 'Notify' request to %d", notify_id); simgrid::s4u::MailboxPtr mailbox = simgrid::s4u::Mailbox::byName(std::to_string(notify_id)); - try { - // TODO make it a dsend - simgrid::s4u::this_actor::isend(mailbox, message, 10); - } catch (xbt_ex& e) { - if (e.category == timeout_error) { - XBT_DEBUG("Send of 'Notify' failed due to an expired timeout on receiver side"); - delete message; - } - } + simgrid::s4u::this_actor::dsend(mailbox, message, 10); } /* This function is called periodically. It checks the immediate successor of the current node. */ @@ -437,28 +429,14 @@ void Node::handleMessage(ChordMessage* message) message->answer_id = fingers_[0]; XBT_DEBUG("Sending back a 'Find Successor Answer' to %s (mailbox %s): the successor of %d is %d", message->issuer_host_name.c_str(), message->answer_to->name(), message->request_id, message->answer_id); - // TODO Replace by dsend - try { - simgrid::s4u::this_actor::isend(message->answer_to, message, 10); - } catch(xbt_ex& e) { - if (e.category == timeout_error) { - XBT_DEBUG("Send of 'Find Successor Answer' failed due du an expired timeout on receiver side"); - } - } + simgrid::s4u::this_actor::dsend(message->answer_to, message, 10); } else { // otherwise, forward the request to the closest preceding finger in my table int closest = closestPrecedingFinger(message->request_id); XBT_DEBUG("Forwarding the 'Find Successor' request for id %d to my closest preceding finger %d", message->request_id, closest); simgrid::s4u::MailboxPtr mailbox = simgrid::s4u::Mailbox::byName(std::to_string(closest)); - //TODO make it a dsend - try{ - simgrid::s4u::this_actor::isend(mailbox, message, 10); - } catch (xbt_ex& e) { - if (e.category == timeout_error) { - XBT_DEBUG("Forward of 'Find Successor' failed due du an expired timeout on receiver side"); - } - } + simgrid::s4u::this_actor::dsend(mailbox, message, 10); } break; @@ -468,14 +446,7 @@ void Node::handleMessage(ChordMessage* message) message->answer_id = pred_id_; XBT_DEBUG("Sending back a 'Get Predecessor Answer' to %s via mailbox '%s': my predecessor is %d", message->issuer_host_name.c_str(), message->answer_to->name(), message->answer_id); - //TODO make it a dsend - try{ - simgrid::s4u::this_actor::isend(message->answer_to, message, 10); - } catch (xbt_ex& e) { - if (e.category == timeout_error) { - XBT_DEBUG("Send of 'Get Predecessor Answer' failed due du an expired timeout on receiver side"); - } - } + simgrid::s4u::this_actor::dsend(message->answer_to, message, 10); break; case NOTIFY: @@ -513,14 +484,7 @@ void Node::handleMessage(ChordMessage* message) message->type = PREDECESSOR_ALIVE_ANSWER; XBT_DEBUG("Sending back a 'Predecessor Alive Answer' to %s (mailbox %s)", message->issuer_host_name.c_str(), message->answer_to->name()); - //TODO Make it a dsend - try{ - simgrid::s4u::this_actor::isend(message->answer_to, message, 10); - } catch (xbt_ex& e) { - if (e.category == timeout_error) { - XBT_DEBUG("Send of 'Predecessor Alive' failed due du an expired timeout on receiver side"); - } - } + simgrid::s4u::this_actor::dsend(message->answer_to, message, 10); break; default: diff --git a/examples/s4u/dht-chord/s4u_dht-chord.hpp b/examples/s4u/dht-chord/s4u_dht-chord.hpp index c3a3a53f31..60f3d6b964 100644 --- a/examples/s4u/dht-chord/s4u_dht-chord.hpp +++ b/examples/s4u/dht-chord/s4u_dht-chord.hpp @@ -120,7 +120,6 @@ public: if (not joined) return; - ChordMessage* message = nullptr; void* data = nullptr; double now = simgrid::s4u::Engine::getClock(); double next_stabilize_date = start_time_ + PERIODIC_STABILIZE_DELAY; @@ -129,9 +128,9 @@ public: double next_lookup_date = start_time_ + PERIODIC_LOOKUP_DELAY; while ((now < (start_time_ + deadline_)) && now < MAX_SIMULATION_TIME) { - data = nullptr; - simgrid::s4u::Comm& comm_receive = simgrid::s4u::this_actor::irecv(mailbox_, &data); - while ((now < (start_time_ + deadline_)) && now < MAX_SIMULATION_TIME && not comm_receive.test()) { + data = nullptr; + simgrid::s4u::CommPtr comm_receive = simgrid::s4u::this_actor::irecv(mailbox_, &data); + while ((now < (start_time_ + deadline_)) && now < MAX_SIMULATION_TIME && not comm_receive->test()) { // no task was received: make some periodic calls if (now >= next_stabilize_date) { stabilize(); @@ -153,8 +152,10 @@ public: } if (data != nullptr) { - message = static_cast(data); + ChordMessage* message = static_cast(data); handleMessage(message); + } else { + comm_receive->cancel(); } now = simgrid::s4u::Engine::getClock(); } diff --git a/include/simgrid/forward.h b/include/simgrid/forward.h index 8ae6a89cd4..9515436c61 100644 --- a/include/simgrid/forward.h +++ b/include/simgrid/forward.h @@ -9,19 +9,24 @@ #ifdef __cplusplus +#include "xbt/base.h" #include namespace simgrid { - namespace s4u { - class Actor; - class Host; - class Link; - class Mailbox; - class NetZone; - } - namespace kernel { - namespace activity { - class ActivityImpl; +namespace s4u { +class Actor; +class Comm; +class Host; +class Link; +class Mailbox; +class NetZone; + +XBT_PUBLIC(void) intrusive_ptr_release(Comm* c); +XBT_PUBLIC(void) intrusive_ptr_add_ref(Comm* c); +} +namespace kernel { +namespace activity { +class ActivityImpl; } namespace routing { class NetPoint; diff --git a/include/simgrid/s4u.hpp b/include/simgrid/s4u.hpp index fe0ec582ac..ee32b446d5 100644 --- a/include/simgrid/s4u.hpp +++ b/include/simgrid/s4u.hpp @@ -8,6 +8,7 @@ #include #include +#include #include #include #include diff --git a/include/simgrid/s4u/Activity.hpp b/include/simgrid/s4u/Activity.hpp index 1cdd79eab0..1a6004a623 100644 --- a/include/simgrid/s4u/Activity.hpp +++ b/include/simgrid/s4u/Activity.hpp @@ -15,9 +15,7 @@ #include #include -typedef enum { - inited, started, finished -} e_s4u_activity_state_t; +typedef enum { inited = 0, started = 1, canceled = 2, errored, finished } e_s4u_activity_state_t; namespace simgrid { namespace s4u { @@ -28,6 +26,9 @@ namespace s4u { */ XBT_PUBLIC_CLASS Activity { friend Comm; + friend void intrusive_ptr_release(Comm * c); + friend void intrusive_ptr_add_ref(Comm * c); + protected: Activity(); virtual ~Activity(); diff --git a/include/simgrid/s4u/Actor.hpp b/include/simgrid/s4u/Actor.hpp index 6eabdf592c..4914e5eb29 100644 --- a/include/simgrid/s4u/Actor.hpp +++ b/include/simgrid/s4u/Actor.hpp @@ -310,7 +310,7 @@ namespace this_actor { * See \ref Comm for the full communication API (including non blocking communications). */ XBT_PUBLIC(void*) recv(MailboxPtr chan); - XBT_PUBLIC(Comm&) irecv(MailboxPtr chan, void** data); + XBT_PUBLIC(CommPtr) irecv(MailboxPtr chan, void** data); /** Block the actor until it delivers a message of the given simulated size to the given mailbox * @@ -319,7 +319,8 @@ namespace this_actor { XBT_PUBLIC(void) send(MailboxPtr chan, void* payload, double simulatedSize); XBT_PUBLIC(void) send(MailboxPtr chan, void* payload, double simulatedSize, double timeout); - XBT_PUBLIC(Comm&) isend(MailboxPtr chan, void* payload, double simulatedSize); + XBT_PUBLIC(CommPtr) isend(MailboxPtr chan, void* payload, double simulatedSize); + XBT_PUBLIC(void) dsend(MailboxPtr chan, void* payload, double simulatedSize); /** @brief Returns the actor ID of the current actor (same as pid). */ XBT_PUBLIC(aid_t) pid(); diff --git a/include/simgrid/s4u/Comm.hpp b/include/simgrid/s4u/Comm.hpp index e2db122157..d889b61251 100644 --- a/include/simgrid/s4u/Comm.hpp +++ b/include/simgrid/s4u/Comm.hpp @@ -14,7 +14,6 @@ namespace simgrid { namespace s4u { - /** @brief Communication async * * Represents all asynchronous communications, that you can test or wait onto. @@ -23,7 +22,10 @@ XBT_PUBLIC_CLASS Comm : public Activity { Comm() : Activity() {} public: - virtual ~Comm() = default; + friend void intrusive_ptr_release(simgrid::s4u::Comm * c); + friend void intrusive_ptr_add_ref(simgrid::s4u::Comm * c); + + virtual ~Comm(); /*! take a range of s4u::Comm* (last excluded) and return when one of them is finished. The return value is an * iterator on the finished Comms. */ @@ -73,13 +75,16 @@ public: return res; } /** Creates (but don't start) an async send to the mailbox @p dest */ - static Comm& send_init(MailboxPtr dest); + static CommPtr send_init(MailboxPtr dest); /** Creates and start an async send to the mailbox @p dest */ - static Comm& send_async(MailboxPtr dest, void* data, int simulatedByteAmount); + static CommPtr send_async(MailboxPtr dest, void* data, int simulatedByteAmount); /** Creates (but don't start) an async recv onto the mailbox @p from */ - static Comm& recv_init(MailboxPtr from); + static CommPtr recv_init(MailboxPtr from); /** Creates and start an async recv to the mailbox @p from */ - static Comm& recv_async(MailboxPtr from, void** data); + static CommPtr recv_async(MailboxPtr from, void** data); + /** Creates and start a detached send to the mailbox @p dest + * TODO: make it possible to detach an already created comm */ + static void send_detached(MailboxPtr dest, void* data, int simulatedSize); void start() override; void wait() override; @@ -103,6 +108,7 @@ public: size_t getDstDataSize(); bool test(); + void cancel(); private: double rate_ = -1; @@ -120,6 +126,8 @@ private: smx_actor_t sender_ = nullptr; smx_actor_t receiver_ = nullptr; MailboxPtr mailbox_ = nullptr; + + std::atomic_int_fast32_t refcount_{0}; }; } } // namespace simgrid::s4u diff --git a/include/simgrid/s4u/forward.hpp b/include/simgrid/s4u/forward.hpp index 834163240c..10c8d856bb 100644 --- a/include/simgrid/s4u/forward.hpp +++ b/include/simgrid/s4u/forward.hpp @@ -16,6 +16,7 @@ using ActorPtr = boost::intrusive_ptr; class Activity; class Comm; +using CommPtr = boost::intrusive_ptr; class Engine; class Host; class Mailbox; diff --git a/src/kernel/activity/ActivityImpl.cpp b/src/kernel/activity/ActivityImpl.cpp index e6723849ac..08e2a63924 100644 --- a/src/kernel/activity/ActivityImpl.cpp +++ b/src/kernel/activity/ActivityImpl.cpp @@ -13,13 +13,15 @@ void simgrid::kernel::activity::ActivityImpl::ref() refcount++; } -void simgrid::kernel::activity::ActivityImpl::unref() +bool simgrid::kernel::activity::ActivityImpl::unref() { xbt_assert(refcount > 0, "This activity has a negative refcount! You can only call test() or wait() once per activity."); refcount--; if (refcount>0) - return; + return false; delete this; + + return true; } diff --git a/src/kernel/activity/ActivityImpl.hpp b/src/kernel/activity/ActivityImpl.hpp index 4791484309..13c284a285 100644 --- a/src/kernel/activity/ActivityImpl.hpp +++ b/src/kernel/activity/ActivityImpl.hpp @@ -48,8 +48,11 @@ namespace activity { delete activity; } + /** @brief Increase the refcount */ void ref(); - void unref(); + /** @brief Reduce the refcount; returns true if the object was destroyed */ + bool unref(); + private: std::atomic_int_fast32_t refcount_{1}; int refcount = 1; diff --git a/src/s4u/s4u_actor.cpp b/src/s4u/s4u_actor.cpp index 1338b825ac..010a9d801d 100644 --- a/src/s4u/s4u_actor.cpp +++ b/src/s4u/s4u_actor.cpp @@ -187,36 +187,40 @@ e_smx_state_t execute(double flops) { void* recv(MailboxPtr chan) { void *res = nullptr; - Comm& c = Comm::recv_init(chan); - c.setDstData(&res,sizeof(res)); - c.wait(); + CommPtr c = Comm::recv_init(chan); + c->setDstData(&res, sizeof(res)); + c->wait(); return res; } void send(MailboxPtr chan, void* payload, double simulatedSize) { - Comm& c = Comm::send_init(chan); - c.setRemains(simulatedSize); - c.setSrcData(payload); - // c.start() is optional. - c.wait(); + CommPtr c = Comm::send_init(chan); + c->setRemains(simulatedSize); + c->setSrcData(payload); + // c->start() is optional. + c->wait(); } void send(MailboxPtr chan, void* payload, double simulatedSize, double timeout) { - Comm& c = Comm::send_init(chan); - c.setRemains(simulatedSize); - c.setSrcData(payload); - // c.start() is optional. - c.wait(timeout); + CommPtr c = Comm::send_init(chan); + c->setRemains(simulatedSize); + c->setSrcData(payload); + // c->start() is optional. + c->wait(timeout); } -Comm& isend(MailboxPtr chan, void* payload, double simulatedSize) +CommPtr isend(MailboxPtr chan, void* payload, double simulatedSize) { return Comm::send_async(chan, payload, simulatedSize); } +void dsend(MailboxPtr chan, void* payload, double simulatedSize) +{ + Comm::send_detached(chan, payload, simulatedSize); +} -Comm& irecv(MailboxPtr chan, void** data) +CommPtr irecv(MailboxPtr chan, void** data) { return Comm::recv_async(chan, data); } diff --git a/src/s4u/s4u_comm.cpp b/src/s4u/s4u_comm.cpp index 9ecedf2a27..da54cdd23a 100644 --- a/src/s4u/s4u_comm.cpp +++ b/src/s4u/s4u_comm.cpp @@ -13,19 +13,34 @@ XBT_LOG_NEW_DEFAULT_SUBCATEGORY(s4u_comm,s4u_activity,"S4U asynchronous communic namespace simgrid { namespace s4u { +Comm::~Comm() +{ + if (state_ == started && not detached_ && (pimpl_ == nullptr || pimpl_->state == SIMIX_RUNNING)) { + XBT_INFO("Comm %p freed before its completion. Detached: %d, State: %d", this, detached_, state_); + if (pimpl_ != nullptr) + XBT_INFO("pimpl_->state: %d", pimpl_->state); + else + XBT_INFO("pimpl_ is null"); + xbt_backtrace_display_current(); + } + if (pimpl_) + pimpl_->unref(); +} -s4u::Comm &Comm::send_init(s4u::MailboxPtr chan) { - s4u::Comm *res = new s4u::Comm(); +s4u::CommPtr Comm::send_init(s4u::MailboxPtr chan) +{ + CommPtr res = CommPtr(new s4u::Comm()); res->sender_ = SIMIX_process_self(); res->mailbox_ = chan; - return *res; + return res; } -s4u::Comm &Comm::recv_init(s4u::MailboxPtr chan) { - s4u::Comm *res = new s4u::Comm(); +s4u::CommPtr Comm::recv_init(s4u::MailboxPtr chan) +{ + CommPtr res = CommPtr(new s4u::Comm()); res->receiver_ = SIMIX_process_self(); res->mailbox_ = chan; - return *res; + return res; } void Comm::setRate(double rate) { @@ -82,6 +97,11 @@ void Comm::start() { } else { xbt_die("Cannot start a communication before specifying whether we are the sender or the receiver"); } + while (refcount_ > 1) { // Pass all the refcounts we had to the underlying pimpl since we are delegating the + // refcounting to it afterward + refcount_--; + pimpl_->ref(); + } state_ = started; } void Comm::wait() { @@ -89,7 +109,7 @@ void Comm::wait() { if (state_ == started) simcall_comm_wait(pimpl_, -1/*timeout*/); - else {// p_state == inited. Save a simcall and do directly a blocking send/recv + else { // state_ == inited. Save a simcall and do directly a blocking send/recv if (srcBuff_ != nullptr) { simcall_comm_send(sender_, mailbox_->getImpl(), remains_, rate_, srcBuff_, srcBuffSize_, @@ -102,7 +122,6 @@ void Comm::wait() { } } state_ = finished; - delete this; } void Comm::wait(double timeout) { xbt_assert(state_ == started || state_ == inited); @@ -125,25 +144,40 @@ void Comm::wait(double timeout) { userData_, timeout, rate_); } state_ = finished; - delete this; } -s4u::Comm &Comm::send_async(MailboxPtr dest, void *data, int simulatedSize) { - s4u::Comm &res = s4u::Comm::send_init(dest); - res.setRemains(simulatedSize); - res.srcBuff_ = data; - res.srcBuffSize_ = sizeof(void*); - res.start(); +void Comm::send_detached(MailboxPtr dest, void* data, int simulatedSize) +{ + s4u::CommPtr res = CommPtr(s4u::Comm::send_init(dest)); + res->setRemains(simulatedSize); + res->srcBuff_ = data; + res->srcBuffSize_ = sizeof(void*); + res->detached_ = true; + res->start(); +} +s4u::CommPtr Comm::send_async(MailboxPtr dest, void* data, int simulatedSize) +{ + s4u::CommPtr res = CommPtr(s4u::Comm::send_init(dest)); + res->setRemains(simulatedSize); + res->srcBuff_ = data; + res->srcBuffSize_ = sizeof(void*); + res->start(); return res; } -s4u::Comm &Comm::recv_async(MailboxPtr dest, void **data) { - s4u::Comm &res = s4u::Comm::recv_init(dest); - res.setDstData(data, sizeof(*data)); - res.start(); +s4u::CommPtr Comm::recv_async(MailboxPtr dest, void** data) +{ + s4u::CommPtr res = CommPtr(s4u::Comm::recv_init(dest)); + res->setDstData(data, sizeof(*data)); + res->start(); return res; } +void Comm::cancel() +{ + simgrid::kernel::activity::Comm* commPimpl = static_cast(pimpl_); + commPimpl->cancel(); +} bool Comm::test() { xbt_assert(state_ == inited || state_ == started || state_ == finished); @@ -156,11 +190,30 @@ bool Comm::test() { if(simcall_comm_test(pimpl_)){ state_ = finished; - delete this; return true; } return false; } +void intrusive_ptr_release(simgrid::s4u::Comm* c) +{ + if (c->pimpl_ != nullptr) { + if (c->pimpl_->unref()) { + c->pimpl_ = nullptr; + delete c; + } + } else if (c->refcount_.fetch_sub(1, std::memory_order_release) == 1) { + std::atomic_thread_fence(std::memory_order_acquire); + delete c; + } +} +void intrusive_ptr_add_ref(simgrid::s4u::Comm* c) +{ + if (c->pimpl_ != nullptr) { + c->pimpl_->ref(); + } else { + c->refcount_.fetch_add(1, std::memory_order_relaxed); + } } } +} // namespaces diff --git a/src/simix/ActorImpl.cpp b/src/simix/ActorImpl.cpp index dca4a3b11c..eda80779b2 100644 --- a/src/simix/ActorImpl.cpp +++ b/src/simix/ActorImpl.cpp @@ -121,7 +121,7 @@ void SIMIX_process_cleanup(smx_actor_t process) /* the comm will be freed right now, remove it from the sender */ comm->src_proc->comms.remove(comm); } - SIMIX_comm_unref(comm); + // SIMIX_comm_unref(comm); } else { xbt_die("Communication synchro %p is in my list but I'm not the sender nor the receiver", synchro); } diff --git a/src/simix/smx_network.cpp b/src/simix/smx_network.cpp index 6233044326..209902ce24 100644 --- a/src/simix/smx_network.cpp +++ b/src/simix/smx_network.cpp @@ -12,6 +12,7 @@ #include "simgrid/s4u/Host.hpp" #include "mc/mc.h" +#include "simgrid/s4u/Activity.hpp" #include "simgrid/s4u/Mailbox.hpp" #include "src/mc/mc_replay.h" #include "src/simix/smx_private.h" @@ -184,8 +185,8 @@ smx_activity_t SIMIX_comm_irecv(smx_actor_t dst_proc, smx_mailbox_t mbox, void * void (*copy_data_fun)(smx_activity_t, void*, size_t), // used to copy data if not default one void *data, double rate) { - XBT_DEBUG("recv from %p %p", mbox, &mbox->comm_queue); simgrid::kernel::activity::Comm* this_synchro = new simgrid::kernel::activity::Comm(SIMIX_COMM_RECEIVE); + XBT_DEBUG("recv from %p %p. this_synchro=%p", mbox, &mbox->comm_queue, this_synchro); simgrid::kernel::activity::Comm* other_comm; //communication already done, get it inside the list of completed comms @@ -536,63 +537,62 @@ void SIMIX_comm_finish(smx_activity_t synchro) simcall->issuer->context->iwannadie = 1; SMX_EXCEPTION(simcall->issuer, host_error, 0, "Host failed"); } else { - switch (synchro->state) { - - case SIMIX_DONE: - XBT_DEBUG("Communication %p complete!", synchro); - SIMIX_comm_copy_data(synchro); - break; - - case SIMIX_SRC_TIMEOUT: - SMX_EXCEPTION(simcall->issuer, timeout_error, 0, "Communication timeouted because of sender"); - break; - - case SIMIX_DST_TIMEOUT: - SMX_EXCEPTION(simcall->issuer, timeout_error, 0, "Communication timeouted because of receiver"); - break; - - case SIMIX_SRC_HOST_FAILURE: - if (simcall->issuer == comm->src_proc) - simcall->issuer->context->iwannadie = 1; - // SMX_EXCEPTION(simcall->issuer, host_error, 0, "Host failed"); - else - SMX_EXCEPTION(simcall->issuer, network_error, 0, "Remote peer failed"); - break; - - case SIMIX_DST_HOST_FAILURE: - if (simcall->issuer == comm->dst_proc) - simcall->issuer->context->iwannadie = 1; - // SMX_EXCEPTION(simcall->issuer, host_error, 0, "Host failed"); - else - SMX_EXCEPTION(simcall->issuer, network_error, 0, "Remote peer failed"); - break; - - case SIMIX_LINK_FAILURE: - - XBT_DEBUG( - "Link failure in synchro %p between '%s' and '%s': posting an exception to the issuer: %s (%p) detached:%d", - synchro, comm->src_proc ? comm->src_proc->host->cname() : nullptr, - comm->dst_proc ? comm->dst_proc->host->cname() : nullptr, simcall->issuer->cname(), simcall->issuer, - comm->detached); - if (comm->src_proc == simcall->issuer) { - XBT_DEBUG("I'm source"); - } else if (comm->dst_proc == simcall->issuer) { - XBT_DEBUG("I'm dest"); - } else { - XBT_DEBUG("I'm neither source nor dest"); - } - SMX_EXCEPTION(simcall->issuer, network_error, 0, "Link failure"); - break; - - case SIMIX_CANCELED: - if (simcall->issuer == comm->dst_proc) - SMX_EXCEPTION(simcall->issuer, cancel_error, 0, "Communication canceled by the sender"); - else - SMX_EXCEPTION(simcall->issuer, cancel_error, 0, "Communication canceled by the receiver"); - break; - - default: - xbt_die("Unexpected synchro state in SIMIX_comm_finish: %d", (int)synchro->state); + switch (comm->state) { + + case SIMIX_DONE: + XBT_DEBUG("Communication %p complete!", synchro); + SIMIX_comm_copy_data(synchro); + break; + + case SIMIX_SRC_TIMEOUT: + SMX_EXCEPTION(simcall->issuer, timeout_error, 0, "Communication timeouted because of sender"); + break; + + case SIMIX_DST_TIMEOUT: + SMX_EXCEPTION(simcall->issuer, timeout_error, 0, "Communication timeouted because of receiver"); + break; + + case SIMIX_SRC_HOST_FAILURE: + if (simcall->issuer == comm->src_proc) + simcall->issuer->context->iwannadie = 1; + // SMX_EXCEPTION(simcall->issuer, host_error, 0, "Host failed"); + else + SMX_EXCEPTION(simcall->issuer, network_error, 0, "Remote peer failed"); + break; + + case SIMIX_DST_HOST_FAILURE: + if (simcall->issuer == comm->dst_proc) + simcall->issuer->context->iwannadie = 1; + // SMX_EXCEPTION(simcall->issuer, host_error, 0, "Host failed"); + else + SMX_EXCEPTION(simcall->issuer, network_error, 0, "Remote peer failed"); + break; + + case SIMIX_LINK_FAILURE: + XBT_DEBUG("Link failure in synchro %p between '%s' and '%s': posting an exception to the issuer: %s (%p) " + "detached:%d", + synchro, comm->src_proc ? comm->src_proc->host->cname() : nullptr, + comm->dst_proc ? comm->dst_proc->host->cname() : nullptr, simcall->issuer->cname(), simcall->issuer, + comm->detached); + if (comm->src_proc == simcall->issuer) { + XBT_DEBUG("I'm source"); + } else if (comm->dst_proc == simcall->issuer) { + XBT_DEBUG("I'm dest"); + } else { + XBT_DEBUG("I'm neither source nor dest"); + } + SMX_EXCEPTION(simcall->issuer, network_error, 0, "Link failure"); + break; + + case SIMIX_CANCELED: + if (simcall->issuer == comm->dst_proc) + SMX_EXCEPTION(simcall->issuer, cancel_error, 0, "Communication canceled by the sender"); + else + SMX_EXCEPTION(simcall->issuer, cancel_error, 0, "Communication canceled by the receiver"); + break; + + default: + xbt_die("Unexpected synchro state in SIMIX_comm_finish: %d", (int)synchro->state); } } diff --git a/teshsuite/s4u/listen_async/listen_async.cpp b/teshsuite/s4u/listen_async/listen_async.cpp index 9cae4a2e17..3b3b923d45 100644 --- a/teshsuite/s4u/listen_async/listen_async.cpp +++ b/teshsuite/s4u/listen_async/listen_async.cpp @@ -17,7 +17,7 @@ static void server() { simgrid::s4u::MailboxPtr mailbox = simgrid::s4u::Mailbox::byName("mailbox"); - simgrid::s4u::this_actor::isend(mailbox, xbt_strdup("Some data"), 0); + simgrid::s4u::CommPtr sendComm = simgrid::s4u::this_actor::isend(mailbox, xbt_strdup("Some data"), 0); xbt_assert(mailbox->listen()); // True (1) XBT_INFO("Task listen works on regular mailboxes"); @@ -26,11 +26,12 @@ static void server() xbt_assert(not strcmp("Some data", res), "Data received: %s", res); XBT_INFO("Data successfully received from regular mailbox"); xbt_free(res); + sendComm->wait(); simgrid::s4u::MailboxPtr mailbox2 = simgrid::s4u::Mailbox::byName("mailbox2"); mailbox2->setReceiver(simgrid::s4u::Actor::self()); - simgrid::s4u::this_actor::isend(mailbox2, xbt_strdup("More data"), 0); + simgrid::s4u::this_actor::dsend(mailbox2, xbt_strdup("More data"), 0); xbt_assert(mailbox2->listen()); // used to break. XBT_INFO("Task listen works on asynchronous mailboxes"); @@ -40,6 +41,7 @@ static void server() xbt_free(res); XBT_INFO("Data successfully received from asynchronous mailbox"); + XBT_DEBUG("comm:%p", sendComm); } int main(int argc, char* argv[]) -- 2.20.1