Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
Another step toward working CommPtr. chord example is broken ATM
authorMartin Quinson <martin.quinson@loria.fr>
Fri, 26 May 2017 20:29:58 +0000 (22:29 +0200)
committerMartin Quinson <martin.quinson@loria.fr>
Fri, 26 May 2017 20:29:58 +0000 (22:29 +0200)
15 files changed:
examples/s4u/dht-chord/node.cpp
examples/s4u/dht-chord/s4u_dht-chord.hpp
include/simgrid/forward.h
include/simgrid/s4u.hpp
include/simgrid/s4u/Activity.hpp
include/simgrid/s4u/Actor.hpp
include/simgrid/s4u/Comm.hpp
include/simgrid/s4u/forward.hpp
src/kernel/activity/ActivityImpl.cpp
src/kernel/activity/ActivityImpl.hpp
src/s4u/s4u_actor.cpp
src/s4u/s4u_comm.cpp
src/simix/ActorImpl.cpp
src/simix/smx_network.cpp
teshsuite/s4u/listen_async/listen_async.cpp

index f9ed22a..d0ee0e9 100644 (file)
@@ -229,10 +229,10 @@ void Node::checkPredecessor()
   // receive the answer
   XBT_DEBUG("Sent 'Predecessor Alive' request to %d, waiting for the answer on my mailbox '%s'", pred_id_,
             message->answer_to->name());
   // receive the answer
   XBT_DEBUG("Sent 'Predecessor Alive' request to %d, waiting for the answer on my mailbox '%s'", pred_id_,
             message->answer_to->name());
-  simgrid::s4u::Comm& comm = simgrid::s4u::this_actor::irecv(return_mailbox, &data);
+  simgrid::s4u::CommPtr comm = simgrid::s4u::this_actor::irecv(return_mailbox, &data);
 
   try {
 
   try {
-    comm.wait(timeout);
+    comm->wait(timeout);
     XBT_DEBUG("Received the answer to my 'Predecessor Alive': my predecessor %d is alive", pred_id_);
     delete message;
   } catch (xbt_ex& e) {
     XBT_DEBUG("Received the answer to my 'Predecessor Alive': my predecessor %d is alive", pred_id_);
     delete message;
   } catch (xbt_ex& e) {
@@ -274,10 +274,10 @@ int Node::remoteGetPredecessor(int ask_to)
   // receive the answer
   XBT_DEBUG("Sent 'Get Predecessor' request to %d, waiting for the answer on my mailbox '%s'", ask_to,
             message->answer_to->name());
   // receive the answer
   XBT_DEBUG("Sent 'Get Predecessor' request to %d, waiting for the answer on my mailbox '%s'", ask_to,
             message->answer_to->name());
-  simgrid::s4u::Comm& comm = simgrid::s4u::this_actor::irecv(return_mailbox, &data);
+  simgrid::s4u::CommPtr comm = simgrid::s4u::this_actor::irecv(return_mailbox, &data);
 
   try {
 
   try {
-    comm.wait(timeout);
+    comm->wait(timeout);
     ChordMessage* answer = static_cast<ChordMessage*>(data);
     XBT_DEBUG("Received the answer to my 'Get Predecessor' request: the predecessor of node %d is %d", ask_to,
               answer->answer_id);
     ChordMessage* answer = static_cast<ChordMessage*>(data);
     XBT_DEBUG("Received the answer to my 'Get Predecessor' request: the predecessor of node %d is %d", ask_to,
               answer->answer_id);
@@ -348,10 +348,10 @@ int Node::remoteFindSuccessor(int ask_to, int id)
   }
   // receive the answer
   XBT_DEBUG("Sent a 'Find Successor' request to %d for key %d, waiting for the answer", ask_to, id);
   }
   // receive the answer
   XBT_DEBUG("Sent a 'Find Successor' request to %d for key %d, waiting for the answer", ask_to, id);
-  simgrid::s4u::Comm& comm = simgrid::s4u::this_actor::irecv(return_mailbox, &data);
+  simgrid::s4u::CommPtr comm = simgrid::s4u::this_actor::irecv(return_mailbox, &data);
 
   try {
 
   try {
-    comm.wait(timeout);
+    comm->wait(timeout);
     ChordMessage* answer = static_cast<ChordMessage*>(data);
     XBT_DEBUG("Received the answer to my 'Find Successor' request for id %d: the successor of key %d is %d",
               answer->request_id, id_, answer->answer_id);
     ChordMessage* answer = static_cast<ChordMessage*>(data);
     XBT_DEBUG("Received the answer to my 'Find Successor' request for id %d: the successor of key %d is %d",
               answer->request_id, id_, answer->answer_id);
@@ -387,15 +387,7 @@ void Node::remoteNotify(int notify_id, int predecessor_candidate_id)
   // send a "Notify" request to notify_id
   XBT_DEBUG("Sending a 'Notify' request to %d", notify_id);
   simgrid::s4u::MailboxPtr mailbox = simgrid::s4u::Mailbox::byName(std::to_string(notify_id));
   // send a "Notify" request to notify_id
   XBT_DEBUG("Sending a 'Notify' request to %d", notify_id);
   simgrid::s4u::MailboxPtr mailbox = simgrid::s4u::Mailbox::byName(std::to_string(notify_id));
-  try {
-    // TODO make it a dsend
-    simgrid::s4u::this_actor::isend(mailbox, message, 10);
-  } catch (xbt_ex& e) {
-    if (e.category == timeout_error) {
-      XBT_DEBUG("Send of 'Notify' failed due to an expired timeout on receiver side");
-      delete message;
-    }
-  }
+  simgrid::s4u::this_actor::dsend(mailbox, message, 10);
 }
 
 /* This function is called periodically. It checks the immediate successor of the current node. */
 }
 
 /* This function is called periodically. It checks the immediate successor of the current node. */
@@ -437,28 +429,14 @@ void Node::handleMessage(ChordMessage* message)
       message->answer_id = fingers_[0];
       XBT_DEBUG("Sending back a 'Find Successor Answer' to %s (mailbox %s): the successor of %d is %d",
           message->issuer_host_name.c_str(), message->answer_to->name(), message->request_id, message->answer_id);
       message->answer_id = fingers_[0];
       XBT_DEBUG("Sending back a 'Find Successor Answer' to %s (mailbox %s): the successor of %d is %d",
           message->issuer_host_name.c_str(), message->answer_to->name(), message->request_id, message->answer_id);
-      // TODO Replace by dsend
-      try {
-        simgrid::s4u::this_actor::isend(message->answer_to, message, 10);
-      } catch(xbt_ex& e) {
-        if (e.category == timeout_error) {
-          XBT_DEBUG("Send of 'Find Successor Answer' failed due du an expired timeout on receiver side");
-        }
-      }
+      simgrid::s4u::this_actor::dsend(message->answer_to, message, 10);
     } else {
       // otherwise, forward the request to the closest preceding finger in my table
       int closest = closestPrecedingFinger(message->request_id);
       XBT_DEBUG("Forwarding the 'Find Successor' request for id %d to my closest preceding finger %d",
           message->request_id, closest);
       simgrid::s4u::MailboxPtr mailbox = simgrid::s4u::Mailbox::byName(std::to_string(closest));
     } else {
       // otherwise, forward the request to the closest preceding finger in my table
       int closest = closestPrecedingFinger(message->request_id);
       XBT_DEBUG("Forwarding the 'Find Successor' request for id %d to my closest preceding finger %d",
           message->request_id, closest);
       simgrid::s4u::MailboxPtr mailbox = simgrid::s4u::Mailbox::byName(std::to_string(closest));
-      //TODO make it a dsend
-      try{
-        simgrid::s4u::this_actor::isend(mailbox, message, 10);
-      } catch (xbt_ex& e) {
-        if (e.category == timeout_error) {
-          XBT_DEBUG("Forward of 'Find Successor' failed due du an expired timeout on receiver side");
-        }
-      }
+      simgrid::s4u::this_actor::dsend(mailbox, message, 10);
     }
     break;
 
     }
     break;
 
@@ -468,14 +446,7 @@ void Node::handleMessage(ChordMessage* message)
     message->answer_id = pred_id_;
     XBT_DEBUG("Sending back a 'Get Predecessor Answer' to %s via mailbox '%s': my predecessor is %d",
         message->issuer_host_name.c_str(), message->answer_to->name(), message->answer_id);
     message->answer_id = pred_id_;
     XBT_DEBUG("Sending back a 'Get Predecessor Answer' to %s via mailbox '%s': my predecessor is %d",
         message->issuer_host_name.c_str(), message->answer_to->name(), message->answer_id);
-    //TODO make it a dsend
-    try{
-      simgrid::s4u::this_actor::isend(message->answer_to, message, 10);
-    } catch (xbt_ex& e) {
-      if (e.category == timeout_error) {
-        XBT_DEBUG("Send of 'Get Predecessor Answer' failed due du an expired timeout on receiver side");
-      }
-    }
+    simgrid::s4u::this_actor::dsend(message->answer_to, message, 10);
     break;
 
   case NOTIFY:
     break;
 
   case NOTIFY:
@@ -513,14 +484,7 @@ void Node::handleMessage(ChordMessage* message)
     message->type = PREDECESSOR_ALIVE_ANSWER;
     XBT_DEBUG("Sending back a 'Predecessor Alive Answer' to %s (mailbox %s)",
         message->issuer_host_name.c_str(), message->answer_to->name());
     message->type = PREDECESSOR_ALIVE_ANSWER;
     XBT_DEBUG("Sending back a 'Predecessor Alive Answer' to %s (mailbox %s)",
         message->issuer_host_name.c_str(), message->answer_to->name());
-    //TODO Make it a dsend
-    try{
-      simgrid::s4u::this_actor::isend(message->answer_to, message, 10);
-    } catch (xbt_ex& e) {
-      if (e.category == timeout_error) {
-        XBT_DEBUG("Send of 'Predecessor Alive' failed due du an expired timeout on receiver side");
-      }
-    }
+    simgrid::s4u::this_actor::dsend(message->answer_to, message, 10);
     break;
 
   default:
     break;
 
   default:
index c3a3a53..60f3d6b 100644 (file)
@@ -120,7 +120,6 @@ public:
 
     if (not joined)
       return;
 
     if (not joined)
       return;
-    ChordMessage* message              = nullptr;
     void* data                         = nullptr;
     double now                         = simgrid::s4u::Engine::getClock();
     double next_stabilize_date         = start_time_ + PERIODIC_STABILIZE_DELAY;
     void* data                         = nullptr;
     double now                         = simgrid::s4u::Engine::getClock();
     double next_stabilize_date         = start_time_ + PERIODIC_STABILIZE_DELAY;
@@ -129,9 +128,9 @@ public:
     double next_lookup_date            = start_time_ + PERIODIC_LOOKUP_DELAY;
 
     while ((now < (start_time_ + deadline_)) && now < MAX_SIMULATION_TIME) {
     double next_lookup_date            = start_time_ + PERIODIC_LOOKUP_DELAY;
 
     while ((now < (start_time_ + deadline_)) && now < MAX_SIMULATION_TIME) {
-      data                             = nullptr;
-      simgrid::s4u::Comm& comm_receive = simgrid::s4u::this_actor::irecv(mailbox_, &data);
-      while ((now < (start_time_ + deadline_)) && now < MAX_SIMULATION_TIME && not comm_receive.test()) {
+      data                               = nullptr;
+      simgrid::s4u::CommPtr comm_receive = simgrid::s4u::this_actor::irecv(mailbox_, &data);
+      while ((now < (start_time_ + deadline_)) && now < MAX_SIMULATION_TIME && not comm_receive->test()) {
         // no task was received: make some periodic calls
         if (now >= next_stabilize_date) {
           stabilize();
         // no task was received: make some periodic calls
         if (now >= next_stabilize_date) {
           stabilize();
@@ -153,8 +152,10 @@ public:
       }
 
       if (data != nullptr) {
       }
 
       if (data != nullptr) {
-        message = static_cast<ChordMessage*>(data);
+        ChordMessage* message = static_cast<ChordMessage*>(data);
         handleMessage(message);
         handleMessage(message);
+      } else {
+        comm_receive->cancel();
       }
       now = simgrid::s4u::Engine::getClock();
     }
       }
       now = simgrid::s4u::Engine::getClock();
     }
index 8ae6a89..9515436 100644 (file)
@@ -9,19 +9,24 @@
 
 #ifdef __cplusplus
 
 
 #ifdef __cplusplus
 
+#include "xbt/base.h"
 #include <boost/intrusive_ptr.hpp>
 
 namespace simgrid {
 #include <boost/intrusive_ptr.hpp>
 
 namespace simgrid {
-  namespace s4u {
-  class Actor;
-  class Host;
-  class Link;
-  class Mailbox;
-  class NetZone;
-  }
-  namespace kernel {
-     namespace activity {
-       class ActivityImpl;
+namespace s4u {
+class Actor;
+class Comm;
+class Host;
+class Link;
+class Mailbox;
+class NetZone;
+
+XBT_PUBLIC(void) intrusive_ptr_release(Comm* c);
+XBT_PUBLIC(void) intrusive_ptr_add_ref(Comm* c);
+}
+namespace kernel {
+namespace activity {
+class ActivityImpl;
      }
      namespace routing {
      class NetPoint;
      }
      namespace routing {
      class NetPoint;
index fe0ec58..ee32b44 100644 (file)
@@ -8,6 +8,7 @@
 
 #include <simgrid/s4u/Activity.hpp>
 #include <simgrid/s4u/Actor.hpp>
 
 #include <simgrid/s4u/Activity.hpp>
 #include <simgrid/s4u/Actor.hpp>
+#include <simgrid/s4u/Comm.hpp>
 #include <simgrid/s4u/Engine.hpp>
 #include <simgrid/s4u/Host.hpp>
 #include <simgrid/s4u/Link.hpp>
 #include <simgrid/s4u/Engine.hpp>
 #include <simgrid/s4u/Host.hpp>
 #include <simgrid/s4u/Link.hpp>
index 1cdd79e..1a6004a 100644 (file)
@@ -15,9 +15,7 @@
 #include <simgrid/s4u/forward.hpp>
 #include <simgrid/forward.h>
 
 #include <simgrid/s4u/forward.hpp>
 #include <simgrid/forward.h>
 
-typedef enum {
-  inited, started, finished
-} e_s4u_activity_state_t;
+typedef enum { inited = 0, started = 1, canceled = 2, errored, finished } e_s4u_activity_state_t;
 
 namespace simgrid {
 namespace s4u {
 
 namespace simgrid {
 namespace s4u {
@@ -28,6 +26,9 @@ namespace s4u {
  */
 XBT_PUBLIC_CLASS Activity {
   friend Comm;
  */
 XBT_PUBLIC_CLASS Activity {
   friend Comm;
+  friend void intrusive_ptr_release(Comm * c);
+  friend void intrusive_ptr_add_ref(Comm * c);
+
 protected:
   Activity();
   virtual ~Activity();
 protected:
   Activity();
   virtual ~Activity();
index 6eabdf5..4914e5e 100644 (file)
@@ -310,7 +310,7 @@ namespace this_actor {
    * See \ref Comm for the full communication API (including non blocking communications).
    */
   XBT_PUBLIC(void*) recv(MailboxPtr chan);
    * See \ref Comm for the full communication API (including non blocking communications).
    */
   XBT_PUBLIC(void*) recv(MailboxPtr chan);
-  XBT_PUBLIC(Comm&) irecv(MailboxPtr chan, void** data);
+  XBT_PUBLIC(CommPtr) irecv(MailboxPtr chan, void** data);
 
   /** Block the actor until it delivers a message of the given simulated size to the given mailbox
    *
 
   /** Block the actor until it delivers a message of the given simulated size to the given mailbox
    *
@@ -319,7 +319,8 @@ namespace this_actor {
   XBT_PUBLIC(void) send(MailboxPtr chan, void* payload, double simulatedSize);
   XBT_PUBLIC(void) send(MailboxPtr chan, void* payload, double simulatedSize, double timeout);
 
   XBT_PUBLIC(void) send(MailboxPtr chan, void* payload, double simulatedSize);
   XBT_PUBLIC(void) send(MailboxPtr chan, void* payload, double simulatedSize, double timeout);
 
-  XBT_PUBLIC(Comm&) isend(MailboxPtr chan, void* payload, double simulatedSize);
+  XBT_PUBLIC(CommPtr) isend(MailboxPtr chan, void* payload, double simulatedSize);
+  XBT_PUBLIC(void) dsend(MailboxPtr chan, void* payload, double simulatedSize);
 
   /** @brief Returns the actor ID of the current actor (same as pid). */
   XBT_PUBLIC(aid_t) pid();
 
   /** @brief Returns the actor ID of the current actor (same as pid). */
   XBT_PUBLIC(aid_t) pid();
index e2db122..d889b61 100644 (file)
@@ -14,7 +14,6 @@
 
 namespace simgrid {
 namespace s4u {
 
 namespace simgrid {
 namespace s4u {
-
 /** @brief Communication async
  *
  * Represents all asynchronous communications, that you can test or wait onto.
 /** @brief Communication async
  *
  * Represents all asynchronous communications, that you can test or wait onto.
@@ -23,7 +22,10 @@ XBT_PUBLIC_CLASS Comm : public Activity
 {
   Comm() : Activity() {}
 public:
 {
   Comm() : Activity() {}
 public:
-  virtual ~Comm() = default;
+  friend void intrusive_ptr_release(simgrid::s4u::Comm * c);
+  friend void intrusive_ptr_add_ref(simgrid::s4u::Comm * c);
+
+  virtual ~Comm();
 
   /*! take a range of s4u::Comm* (last excluded) and return when one of them is finished. The return value is an
    * iterator on the finished Comms. */
 
   /*! take a range of s4u::Comm* (last excluded) and return when one of them is finished. The return value is an
    * iterator on the finished Comms. */
@@ -73,13 +75,16 @@ public:
     return res;
   }
   /** Creates (but don't start) an async send to the mailbox @p dest */
     return res;
   }
   /** Creates (but don't start) an async send to the mailbox @p dest */
-  static Comm& send_init(MailboxPtr dest);
+  static CommPtr send_init(MailboxPtr dest);
   /** Creates and start an async send to the mailbox @p dest */
   /** Creates and start an async send to the mailbox @p dest */
-  static Comm& send_async(MailboxPtr dest, void* data, int simulatedByteAmount);
+  static CommPtr send_async(MailboxPtr dest, void* data, int simulatedByteAmount);
   /** Creates (but don't start) an async recv onto the mailbox @p from */
   /** Creates (but don't start) an async recv onto the mailbox @p from */
-  static Comm& recv_init(MailboxPtr from);
+  static CommPtr recv_init(MailboxPtr from);
   /** Creates and start an async recv to the mailbox @p from */
   /** Creates and start an async recv to the mailbox @p from */
-  static Comm& recv_async(MailboxPtr from, void** data);
+  static CommPtr recv_async(MailboxPtr from, void** data);
+  /** Creates and start a detached send to the mailbox @p dest
+   *  TODO: make it possible to detach an already created comm */
+  static void send_detached(MailboxPtr dest, void* data, int simulatedSize);
 
   void start() override;
   void wait() override;
 
   void start() override;
   void wait() override;
@@ -103,6 +108,7 @@ public:
   size_t getDstDataSize();
 
   bool test();
   size_t getDstDataSize();
 
   bool test();
+  void cancel();
 
 private:
   double rate_        = -1;
 
 private:
   double rate_        = -1;
@@ -120,6 +126,8 @@ private:
   smx_actor_t sender_   = nullptr;
   smx_actor_t receiver_ = nullptr;
   MailboxPtr mailbox_   = nullptr;
   smx_actor_t sender_   = nullptr;
   smx_actor_t receiver_ = nullptr;
   MailboxPtr mailbox_   = nullptr;
+
+  std::atomic_int_fast32_t refcount_{0};
 };
 }
 } // namespace simgrid::s4u
 };
 }
 } // namespace simgrid::s4u
index 8341632..10c8d85 100644 (file)
@@ -16,6 +16,7 @@ using ActorPtr = boost::intrusive_ptr<Actor>;
 
 class Activity;
 class Comm;
 
 class Activity;
 class Comm;
+using CommPtr = boost::intrusive_ptr<Comm>;
 class Engine;
 class Host;
 class Mailbox;
 class Engine;
 class Host;
 class Mailbox;
index e672384..08e2a63 100644 (file)
@@ -13,13 +13,15 @@ void simgrid::kernel::activity::ActivityImpl::ref()
   refcount++;
 }
 
   refcount++;
 }
 
-void simgrid::kernel::activity::ActivityImpl::unref()
+bool simgrid::kernel::activity::ActivityImpl::unref()
 {
   xbt_assert(refcount > 0,
       "This activity has a negative refcount! You can only call test() or wait() once per activity.");
 
   refcount--;
   if (refcount>0)
 {
   xbt_assert(refcount > 0,
       "This activity has a negative refcount! You can only call test() or wait() once per activity.");
 
   refcount--;
   if (refcount>0)
-    return;
+    return false;
   delete this;
   delete this;
+
+  return true;
 }
 }
index 4791484..13c284a 100644 (file)
@@ -48,8 +48,11 @@ namespace activity {
         delete activity;
     }
 
         delete activity;
     }
 
+    /** @brief Increase the refcount */
     void ref();
     void ref();
-    void unref();
+    /** @brief Reduce the refcount; returns true if the object was destroyed */
+    bool unref();
+
   private:
     std::atomic_int_fast32_t refcount_{1};
     int refcount = 1;
   private:
     std::atomic_int_fast32_t refcount_{1};
     int refcount = 1;
index 1338b82..010a9d8 100644 (file)
@@ -187,36 +187,40 @@ e_smx_state_t execute(double flops) {
 
 void* recv(MailboxPtr chan) {
   void *res = nullptr;
 
 void* recv(MailboxPtr chan) {
   void *res = nullptr;
-  Comm& c = Comm::recv_init(chan);
-  c.setDstData(&res,sizeof(res));
-  c.wait();
+  CommPtr c = Comm::recv_init(chan);
+  c->setDstData(&res, sizeof(res));
+  c->wait();
   return res;
 }
 
 void send(MailboxPtr chan, void* payload, double simulatedSize)
 {
   return res;
 }
 
 void send(MailboxPtr chan, void* payload, double simulatedSize)
 {
-  Comm& c = Comm::send_init(chan);
-  c.setRemains(simulatedSize);
-  c.setSrcData(payload);
-  // c.start() is optional.
-  c.wait();
+  CommPtr c = Comm::send_init(chan);
+  c->setRemains(simulatedSize);
+  c->setSrcData(payload);
+  // c->start() is optional.
+  c->wait();
 }
 
 void send(MailboxPtr chan, void* payload, double simulatedSize, double timeout)
 {
 }
 
 void send(MailboxPtr chan, void* payload, double simulatedSize, double timeout)
 {
-  Comm& c = Comm::send_init(chan);
-  c.setRemains(simulatedSize);
-  c.setSrcData(payload);
-  // c.start() is optional.
-  c.wait(timeout);
+  CommPtr c = Comm::send_init(chan);
+  c->setRemains(simulatedSize);
+  c->setSrcData(payload);
+  // c->start() is optional.
+  c->wait(timeout);
 }
 
 }
 
-Comm& isend(MailboxPtr chan, void* payload, double simulatedSize)
+CommPtr isend(MailboxPtr chan, void* payload, double simulatedSize)
 {
   return Comm::send_async(chan, payload, simulatedSize);
 }
 {
   return Comm::send_async(chan, payload, simulatedSize);
 }
+void dsend(MailboxPtr chan, void* payload, double simulatedSize)
+{
+  Comm::send_detached(chan, payload, simulatedSize);
+}
 
 
-Comm& irecv(MailboxPtr chan, void** data)
+CommPtr irecv(MailboxPtr chan, void** data)
 {
   return Comm::recv_async(chan, data);
 }
 {
   return Comm::recv_async(chan, data);
 }
index 9ecedf2..da54cdd 100644 (file)
@@ -13,19 +13,34 @@ XBT_LOG_NEW_DEFAULT_SUBCATEGORY(s4u_comm,s4u_activity,"S4U asynchronous communic
 
 namespace simgrid {
 namespace s4u {
 
 namespace simgrid {
 namespace s4u {
+Comm::~Comm()
+{
+  if (state_ == started && not detached_ && (pimpl_ == nullptr || pimpl_->state == SIMIX_RUNNING)) {
+    XBT_INFO("Comm %p freed before its completion. Detached: %d, State: %d", this, detached_, state_);
+    if (pimpl_ != nullptr)
+      XBT_INFO("pimpl_->state: %d", pimpl_->state);
+    else
+      XBT_INFO("pimpl_ is null");
+    xbt_backtrace_display_current();
+  }
+  if (pimpl_)
+    pimpl_->unref();
+}
 
 
-s4u::Comm &Comm::send_init(s4u::MailboxPtr chan) {
-  s4u::Comm *res = new s4u::Comm();
+s4u::CommPtr Comm::send_init(s4u::MailboxPtr chan)
+{
+  CommPtr res   = CommPtr(new s4u::Comm());
   res->sender_ = SIMIX_process_self();
   res->mailbox_ = chan;
   res->sender_ = SIMIX_process_self();
   res->mailbox_ = chan;
-  return *res;
+  return res;
 }
 
 }
 
-s4u::Comm &Comm::recv_init(s4u::MailboxPtr chan) {
-  s4u::Comm *res = new s4u::Comm();
+s4u::CommPtr Comm::recv_init(s4u::MailboxPtr chan)
+{
+  CommPtr res    = CommPtr(new s4u::Comm());
   res->receiver_ = SIMIX_process_self();
   res->mailbox_ = chan;
   res->receiver_ = SIMIX_process_self();
   res->mailbox_ = chan;
-  return *res;
+  return res;
 }
 
 void Comm::setRate(double rate) {
 }
 
 void Comm::setRate(double rate) {
@@ -82,6 +97,11 @@ void Comm::start() {
   } else {
     xbt_die("Cannot start a communication before specifying whether we are the sender or the receiver");
   }
   } else {
     xbt_die("Cannot start a communication before specifying whether we are the sender or the receiver");
   }
+  while (refcount_ > 1) { // Pass all the refcounts we had to the underlying pimpl since we are delegating the
+                          // refcounting to it afterward
+    refcount_--;
+    pimpl_->ref();
+  }
   state_ = started;
 }
 void Comm::wait() {
   state_ = started;
 }
 void Comm::wait() {
@@ -89,7 +109,7 @@ void Comm::wait() {
 
   if (state_ == started)
     simcall_comm_wait(pimpl_, -1/*timeout*/);
 
   if (state_ == started)
     simcall_comm_wait(pimpl_, -1/*timeout*/);
-  else {// p_state == inited. Save a simcall and do directly a blocking send/recv
+  else { // state_ == inited. Save a simcall and do directly a blocking send/recv
     if (srcBuff_ != nullptr) {
       simcall_comm_send(sender_, mailbox_->getImpl(), remains_, rate_,
           srcBuff_, srcBuffSize_,
     if (srcBuff_ != nullptr) {
       simcall_comm_send(sender_, mailbox_->getImpl(), remains_, rate_,
           srcBuff_, srcBuffSize_,
@@ -102,7 +122,6 @@ void Comm::wait() {
     }
   }
   state_ = finished;
     }
   }
   state_ = finished;
-  delete this;
 }
 void Comm::wait(double timeout) {
   xbt_assert(state_ == started || state_ == inited);
 }
 void Comm::wait(double timeout) {
   xbt_assert(state_ == started || state_ == inited);
@@ -125,25 +144,40 @@ void Comm::wait(double timeout) {
         userData_, timeout, rate_);
   }
   state_ = finished;
         userData_, timeout, rate_);
   }
   state_ = finished;
-  delete this;
 }
 
 }
 
-s4u::Comm &Comm::send_async(MailboxPtr dest, void *data, int simulatedSize) {
-  s4u::Comm &res = s4u::Comm::send_init(dest);
-  res.setRemains(simulatedSize);
-  res.srcBuff_ = data;
-  res.srcBuffSize_ = sizeof(void*);
-  res.start();
+void Comm::send_detached(MailboxPtr dest, void* data, int simulatedSize)
+{
+  s4u::CommPtr res = CommPtr(s4u::Comm::send_init(dest));
+  res->setRemains(simulatedSize);
+  res->srcBuff_     = data;
+  res->srcBuffSize_ = sizeof(void*);
+  res->detached_    = true;
+  res->start();
+}
+s4u::CommPtr Comm::send_async(MailboxPtr dest, void* data, int simulatedSize)
+{
+  s4u::CommPtr res = CommPtr(s4u::Comm::send_init(dest));
+  res->setRemains(simulatedSize);
+  res->srcBuff_     = data;
+  res->srcBuffSize_ = sizeof(void*);
+  res->start();
   return res;
 }
 
   return res;
 }
 
-s4u::Comm &Comm::recv_async(MailboxPtr dest, void **data) {
-  s4u::Comm &res = s4u::Comm::recv_init(dest);
-  res.setDstData(data, sizeof(*data));
-  res.start();
+s4u::CommPtr Comm::recv_async(MailboxPtr dest, void** data)
+{
+  s4u::CommPtr res = CommPtr(s4u::Comm::recv_init(dest));
+  res->setDstData(data, sizeof(*data));
+  res->start();
   return res;
 }
 
   return res;
 }
 
+void Comm::cancel()
+{
+  simgrid::kernel::activity::Comm* commPimpl = static_cast<simgrid::kernel::activity::Comm*>(pimpl_);
+  commPimpl->cancel();
+}
 bool Comm::test() {
   xbt_assert(state_ == inited || state_ == started || state_ == finished);
   
 bool Comm::test() {
   xbt_assert(state_ == inited || state_ == started || state_ == finished);
   
@@ -156,11 +190,30 @@ bool Comm::test() {
   
   if(simcall_comm_test(pimpl_)){
     state_ = finished;
   
   if(simcall_comm_test(pimpl_)){
     state_ = finished;
-    delete this;
     return true;
   }
   return false;
 }
 
     return true;
   }
   return false;
 }
 
+void intrusive_ptr_release(simgrid::s4u::Comm* c)
+{
+  if (c->pimpl_ != nullptr) {
+    if (c->pimpl_->unref()) {
+      c->pimpl_ = nullptr;
+      delete c;
+    }
+  } else if (c->refcount_.fetch_sub(1, std::memory_order_release) == 1) {
+    std::atomic_thread_fence(std::memory_order_acquire);
+    delete c;
+  }
+}
+void intrusive_ptr_add_ref(simgrid::s4u::Comm* c)
+{
+  if (c->pimpl_ != nullptr) {
+    c->pimpl_->ref();
+  } else {
+    c->refcount_.fetch_add(1, std::memory_order_relaxed);
+  }
 }
 }
 }
 }
+} // namespaces
index dca4a3b..eda8077 100644 (file)
@@ -121,7 +121,7 @@ void SIMIX_process_cleanup(smx_actor_t process)
         /* the comm will be freed right now, remove it from the sender */
         comm->src_proc->comms.remove(comm);
       }
         /* the comm will be freed right now, remove it from the sender */
         comm->src_proc->comms.remove(comm);
       }
-      SIMIX_comm_unref(comm);
+      // SIMIX_comm_unref(comm);
     } else {
       xbt_die("Communication synchro %p is in my list but I'm not the sender nor the receiver", synchro);
     }
     } else {
       xbt_die("Communication synchro %p is in my list but I'm not the sender nor the receiver", synchro);
     }
index 6233044..209902c 100644 (file)
@@ -12,6 +12,7 @@
 #include "simgrid/s4u/Host.hpp"
 
 #include "mc/mc.h"
 #include "simgrid/s4u/Host.hpp"
 
 #include "mc/mc.h"
+#include "simgrid/s4u/Activity.hpp"
 #include "simgrid/s4u/Mailbox.hpp"
 #include "src/mc/mc_replay.h"
 #include "src/simix/smx_private.h"
 #include "simgrid/s4u/Mailbox.hpp"
 #include "src/mc/mc_replay.h"
 #include "src/simix/smx_private.h"
@@ -184,8 +185,8 @@ smx_activity_t SIMIX_comm_irecv(smx_actor_t dst_proc, smx_mailbox_t mbox, void *
     void (*copy_data_fun)(smx_activity_t, void*, size_t), // used to copy data if not default one
     void *data, double rate)
 {
     void (*copy_data_fun)(smx_activity_t, void*, size_t), // used to copy data if not default one
     void *data, double rate)
 {
-  XBT_DEBUG("recv from %p %p", mbox, &mbox->comm_queue);
   simgrid::kernel::activity::Comm* this_synchro = new simgrid::kernel::activity::Comm(SIMIX_COMM_RECEIVE);
   simgrid::kernel::activity::Comm* this_synchro = new simgrid::kernel::activity::Comm(SIMIX_COMM_RECEIVE);
+  XBT_DEBUG("recv from %p %p. this_synchro=%p", mbox, &mbox->comm_queue, this_synchro);
 
   simgrid::kernel::activity::Comm* other_comm;
   //communication already done, get it inside the list of completed comms
 
   simgrid::kernel::activity::Comm* other_comm;
   //communication already done, get it inside the list of completed comms
@@ -536,63 +537,62 @@ void SIMIX_comm_finish(smx_activity_t synchro)
       simcall->issuer->context->iwannadie = 1;
       SMX_EXCEPTION(simcall->issuer, host_error, 0, "Host failed");
     } else {
       simcall->issuer->context->iwannadie = 1;
       SMX_EXCEPTION(simcall->issuer, host_error, 0, "Host failed");
     } else {
-      switch (synchro->state) {
-
-      case SIMIX_DONE:
-        XBT_DEBUG("Communication %p complete!", synchro);
-        SIMIX_comm_copy_data(synchro);
-        break;
-
-      case SIMIX_SRC_TIMEOUT:
-        SMX_EXCEPTION(simcall->issuer, timeout_error, 0, "Communication timeouted because of sender");
-        break;
-
-      case SIMIX_DST_TIMEOUT:
-        SMX_EXCEPTION(simcall->issuer, timeout_error, 0, "Communication timeouted because of receiver");
-        break;
-
-      case SIMIX_SRC_HOST_FAILURE:
-        if (simcall->issuer == comm->src_proc)
-          simcall->issuer->context->iwannadie = 1;
-  //          SMX_EXCEPTION(simcall->issuer, host_error, 0, "Host failed");
-        else
-          SMX_EXCEPTION(simcall->issuer, network_error, 0, "Remote peer failed");
-        break;
-
-      case SIMIX_DST_HOST_FAILURE:
-        if (simcall->issuer == comm->dst_proc)
-          simcall->issuer->context->iwannadie = 1;
-  //          SMX_EXCEPTION(simcall->issuer, host_error, 0, "Host failed");
-        else
-          SMX_EXCEPTION(simcall->issuer, network_error, 0, "Remote peer failed");
-        break;
-
-      case SIMIX_LINK_FAILURE:
-
-        XBT_DEBUG(
-            "Link failure in synchro %p between '%s' and '%s': posting an exception to the issuer: %s (%p) detached:%d",
-            synchro, comm->src_proc ? comm->src_proc->host->cname() : nullptr,
-            comm->dst_proc ? comm->dst_proc->host->cname() : nullptr, simcall->issuer->cname(), simcall->issuer,
-            comm->detached);
-        if (comm->src_proc == simcall->issuer) {
-          XBT_DEBUG("I'm source");
-        } else if (comm->dst_proc == simcall->issuer) {
-          XBT_DEBUG("I'm dest");
-        } else {
-          XBT_DEBUG("I'm neither source nor dest");
-        }
-        SMX_EXCEPTION(simcall->issuer, network_error, 0, "Link failure");
-        break;
-
-      case SIMIX_CANCELED:
-        if (simcall->issuer == comm->dst_proc)
-          SMX_EXCEPTION(simcall->issuer, cancel_error, 0, "Communication canceled by the sender");
-        else
-          SMX_EXCEPTION(simcall->issuer, cancel_error, 0, "Communication canceled by the receiver");
-        break;
-
-      default:
-        xbt_die("Unexpected synchro state in SIMIX_comm_finish: %d", (int)synchro->state);
+      switch (comm->state) {
+
+        case SIMIX_DONE:
+          XBT_DEBUG("Communication %p complete!", synchro);
+          SIMIX_comm_copy_data(synchro);
+          break;
+
+        case SIMIX_SRC_TIMEOUT:
+          SMX_EXCEPTION(simcall->issuer, timeout_error, 0, "Communication timeouted because of sender");
+          break;
+
+        case SIMIX_DST_TIMEOUT:
+          SMX_EXCEPTION(simcall->issuer, timeout_error, 0, "Communication timeouted because of receiver");
+          break;
+
+        case SIMIX_SRC_HOST_FAILURE:
+          if (simcall->issuer == comm->src_proc)
+            simcall->issuer->context->iwannadie = 1;
+          //          SMX_EXCEPTION(simcall->issuer, host_error, 0, "Host failed");
+          else
+            SMX_EXCEPTION(simcall->issuer, network_error, 0, "Remote peer failed");
+          break;
+
+        case SIMIX_DST_HOST_FAILURE:
+          if (simcall->issuer == comm->dst_proc)
+            simcall->issuer->context->iwannadie = 1;
+          //          SMX_EXCEPTION(simcall->issuer, host_error, 0, "Host failed");
+          else
+            SMX_EXCEPTION(simcall->issuer, network_error, 0, "Remote peer failed");
+          break;
+
+        case SIMIX_LINK_FAILURE:
+          XBT_DEBUG("Link failure in synchro %p between '%s' and '%s': posting an exception to the issuer: %s (%p) "
+                    "detached:%d",
+                    synchro, comm->src_proc ? comm->src_proc->host->cname() : nullptr,
+                    comm->dst_proc ? comm->dst_proc->host->cname() : nullptr, simcall->issuer->cname(), simcall->issuer,
+                    comm->detached);
+          if (comm->src_proc == simcall->issuer) {
+            XBT_DEBUG("I'm source");
+          } else if (comm->dst_proc == simcall->issuer) {
+            XBT_DEBUG("I'm dest");
+          } else {
+            XBT_DEBUG("I'm neither source nor dest");
+          }
+          SMX_EXCEPTION(simcall->issuer, network_error, 0, "Link failure");
+          break;
+
+        case SIMIX_CANCELED:
+          if (simcall->issuer == comm->dst_proc)
+            SMX_EXCEPTION(simcall->issuer, cancel_error, 0, "Communication canceled by the sender");
+          else
+            SMX_EXCEPTION(simcall->issuer, cancel_error, 0, "Communication canceled by the receiver");
+          break;
+
+        default:
+          xbt_die("Unexpected synchro state in SIMIX_comm_finish: %d", (int)synchro->state);
       }
     }
 
       }
     }
 
index 9cae4a2..3b3b923 100644 (file)
@@ -17,7 +17,7 @@ static void server()
 {
   simgrid::s4u::MailboxPtr mailbox = simgrid::s4u::Mailbox::byName("mailbox");
 
 {
   simgrid::s4u::MailboxPtr mailbox = simgrid::s4u::Mailbox::byName("mailbox");
 
-  simgrid::s4u::this_actor::isend(mailbox, xbt_strdup("Some data"), 0);
+  simgrid::s4u::CommPtr sendComm = simgrid::s4u::this_actor::isend(mailbox, xbt_strdup("Some data"), 0);
 
   xbt_assert(mailbox->listen()); // True (1)
   XBT_INFO("Task listen works on regular mailboxes");
 
   xbt_assert(mailbox->listen()); // True (1)
   XBT_INFO("Task listen works on regular mailboxes");
@@ -26,11 +26,12 @@ static void server()
   xbt_assert(not strcmp("Some data", res), "Data received: %s", res);
   XBT_INFO("Data successfully received from regular mailbox");
   xbt_free(res);
   xbt_assert(not strcmp("Some data", res), "Data received: %s", res);
   XBT_INFO("Data successfully received from regular mailbox");
   xbt_free(res);
+  sendComm->wait();
 
   simgrid::s4u::MailboxPtr mailbox2 = simgrid::s4u::Mailbox::byName("mailbox2");
   mailbox2->setReceiver(simgrid::s4u::Actor::self());
 
 
   simgrid::s4u::MailboxPtr mailbox2 = simgrid::s4u::Mailbox::byName("mailbox2");
   mailbox2->setReceiver(simgrid::s4u::Actor::self());
 
-  simgrid::s4u::this_actor::isend(mailbox2, xbt_strdup("More data"), 0);
+  simgrid::s4u::this_actor::dsend(mailbox2, xbt_strdup("More data"), 0);
 
   xbt_assert(mailbox2->listen()); // used to break.
   XBT_INFO("Task listen works on asynchronous mailboxes");
 
   xbt_assert(mailbox2->listen()); // used to break.
   XBT_INFO("Task listen works on asynchronous mailboxes");
@@ -40,6 +41,7 @@ static void server()
   xbt_free(res);
 
   XBT_INFO("Data successfully received from asynchronous mailbox");
   xbt_free(res);
 
   XBT_INFO("Data successfully received from asynchronous mailbox");
+  XBT_DEBUG("comm:%p", sendComm);
 }
 
 int main(int argc, char* argv[])
 }
 
 int main(int argc, char* argv[])