Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
Merge pull request #181 from bcamus/master
[simgrid.git] / include / simgrid / s4u / Comm.hpp
index e2db122..697f259 100644 (file)
 #include <simgrid/forward.h>
 #include <simgrid/s4u/Activity.hpp>
 #include <simgrid/s4u/forward.hpp>
-
 namespace simgrid {
 namespace s4u {
-
 /** @brief Communication async
  *
  * Represents all asynchronous communications, that you can test or wait onto.
@@ -23,20 +21,27 @@ XBT_PUBLIC_CLASS Comm : public Activity
 {
   Comm() : Activity() {}
 public:
-  virtual ~Comm() = default;
+  friend void intrusive_ptr_release(simgrid::s4u::Comm * c);
+  friend void intrusive_ptr_add_ref(simgrid::s4u::Comm * c);
 
-  /*! take a range of s4u::Comm* (last excluded) and return when one of them is finished. The return value is an
+  virtual ~Comm();
+
+  /*! take a range of s4u::CommPtr (last excluded) and return when one of them is finished. The return value is an
    * iterator on the finished Comms. */
   template <class I> static I wait_any(I first, I last)
   {
     // Map to dynar<Synchro*>:
-    xbt_dynar_t comms = xbt_dynar_new(sizeof(simgrid::kernel::activity::ActivityImpl*), NULL);
+    xbt_dynar_t comms = xbt_dynar_new(sizeof(simgrid::kernel::activity::ActivityImpl*), [](void*ptr){
+      intrusive_ptr_release(*(simgrid::kernel::activity::ActivityImpl**)ptr);
+    });
     for (I iter = first; iter != last; iter++) {
-      Comm& comm = **iter;
-      if (comm.state_ == inited)
-        comm.start();
-      xbt_assert(comm.state_ == started);
-      xbt_dynar_push_as(comms, simgrid::kernel::activity::ActivityImpl*, comm.pimpl_);
+      CommPtr comm = *iter;
+      if (comm->state_ == inited)
+        comm->start();
+      xbt_assert(comm->state_ == started);
+      simgrid::kernel::activity::ActivityImpl* ptr = comm->pimpl_.get();
+      intrusive_ptr_add_ref(ptr);
+      xbt_dynar_push_as(comms, simgrid::kernel::activity::ActivityImpl*, ptr);
     }
     // Call the underlying simcall:
     int idx = simcall_comm_waitany(comms, -1);
@@ -49,17 +54,21 @@ public:
     (*res)->state_ = finished;
     return res;
   }
-  /*! Same as wait_any, but with a timeout. If wait_any_for return because of the timeout last is returned.*/
+  /*! Same as wait_any, but with a timeout. If the timeout occurs, parameter last is returned.*/
   template <class I> static I wait_any_for(I first, I last, double timeout)
   {
     // Map to dynar<Synchro*>:
-    xbt_dynar_t comms = xbt_dynar_new(sizeof(simgrid::kernel::activity::ActivityImpl*), NULL);
+    xbt_dynar_t comms = xbt_dynar_new(sizeof(simgrid::kernel::activity::ActivityImpl*), [](void*ptr){
+      intrusive_ptr_release(*(simgrid::kernel::activity::ActivityImpl**)ptr);
+    });
     for (I iter = first; iter != last; iter++) {
-      Comm& comm = **iter;
-      if (comm.state_ == inited)
-        comm.start();
-      xbt_assert(comm.state_ == started);
-      xbt_dynar_push_as(comms, simgrid::kernel::activity::ActivityImpl*, comm.pimpl_);
+      CommPtr comm = *iter;
+      if (comm->state_ == inited)
+        comm->start();
+      xbt_assert(comm->state_ == started);
+      simgrid::kernel::activity::ActivityImpl* ptr = comm->pimpl_.get();
+      intrusive_ptr_add_ref(ptr);
+      xbt_dynar_push_as(comms, simgrid::kernel::activity::ActivityImpl*, ptr);
     }
     // Call the underlying simcall:
     int idx = simcall_comm_waitany(comms, timeout);
@@ -73,13 +82,16 @@ public:
     return res;
   }
   /** Creates (but don't start) an async send to the mailbox @p dest */
-  static Comm& send_init(MailboxPtr dest);
+  static CommPtr send_init(MailboxPtr dest);
   /** Creates and start an async send to the mailbox @p dest */
-  static Comm& send_async(MailboxPtr dest, void* data, int simulatedByteAmount);
+  static CommPtr send_async(MailboxPtr dest, void* data, int simulatedByteAmount);
   /** Creates (but don't start) an async recv onto the mailbox @p from */
-  static Comm& recv_init(MailboxPtr from);
+  static CommPtr recv_init(MailboxPtr from);
   /** Creates and start an async recv to the mailbox @p from */
-  static Comm& recv_async(MailboxPtr from, void** data);
+  static CommPtr recv_async(MailboxPtr from, void** data);
+  /** Creates and start a detached send to the mailbox @p dest
+   *  TODO: make it possible to detach an already created comm */
+  static void send_detached(MailboxPtr dest, void* data, int simulatedSize);
 
   void start() override;
   void wait() override;
@@ -103,6 +115,7 @@ public:
   size_t getDstDataSize();
 
   bool test();
+  void cancel();
 
 private:
   double rate_        = -1;
@@ -120,6 +133,8 @@ private:
   smx_actor_t sender_   = nullptr;
   smx_actor_t receiver_ = nullptr;
   MailboxPtr mailbox_   = nullptr;
+
+  std::atomic_int_fast32_t refcount_{0};
 };
 }
 } // namespace simgrid::s4u