Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
first step towards a CommPtr
authorFrederic Suter <frederic.suter@cc.in2p3.fr>
Wed, 24 May 2017 09:35:17 +0000 (11:35 +0200)
committerFrederic Suter <frederic.suter@cc.in2p3.fr>
Wed, 24 May 2017 09:35:17 +0000 (11:35 +0200)
examples/smpi/replay_multiple/replay_multiple.tesh
include/simgrid/simix.h
src/kernel/activity/ActivityImpl.hpp
src/kernel/activity/CommImpl.hpp [new file with mode: 0644]
src/kernel/activity/SynchroComm.cpp
src/msg/msg_gos.cpp
src/simix/ActorImpl.cpp
src/simix/smx_network.cpp
tools/cmake/DefinePackages.cmake

index 72f83bb..a8816fd 100644 (file)
@@ -3,7 +3,7 @@ p Test the replay with multiple instances
 p first generate the deployment file
 $ ${srcdir:=.}/generate_multiple_deployment.sh -platform ${srcdir:=.}/../../platforms/small_platform_with_routers.xml -hostfile ${srcdir:=.}/../hostfile  ${srcdir:=.}/description_file ${srcdir:=.}/deployment.xml
 
-p This test needs maxmin/concurrency-limit=100 because it stats 64 hosts on 5 machines.
+p This test needs maxmin/concurrency-limit=100 because it starts 64 hosts on 5 machines.
 ! timeout 120
 $ ./replay_multiple description_file ${srcdir:=.}/../../platforms/small_platform_with_routers.xml ${srcdir:=.}/deployment.xml --log=smpi.:info --cfg=maxmin/concurrency-limit:100
 > [0.000000] [xbt_cfg/INFO] Configuration change: Set 'maxmin/concurrency-limit' to '100'
index 02fdab7..3f0e836 100644 (file)
@@ -226,6 +226,8 @@ XBT_PUBLIC(smx_activity_t) SIMIX_comm_get_send_match(smx_mailbox_t mbox, int (*m
 XBT_PUBLIC(int) SIMIX_comm_has_send_match(smx_mailbox_t mbox, int (*match_fun)(void*, void*), void* data);
 XBT_PUBLIC(int) SIMIX_comm_has_recv_match(smx_mailbox_t mbox, int (*match_fun)(void*, void*), void* data);
 XBT_PUBLIC(void) SIMIX_comm_finish(smx_activity_t synchro);
+XBT_PUBLIC(smx_activity_t) SIMIX_comm_ref(smx_activity_t comm);
+XBT_PUBLIC(void) SIMIX_comm_unref(smx_activity_t comm);
 
 /******************************************************************************/
 /*                            SIMIX simcalls                                  */
index 94b8590..4791484 100644 (file)
@@ -12,6 +12,7 @@
 #include <xbt/base.h>
 #include "simgrid/forward.h"
 
+#include <atomic>
 #include <simgrid/simix.hpp>
 
 namespace simgrid {
@@ -30,9 +31,27 @@ namespace activity {
     virtual void resume()=0;
     virtual void post() =0; // What to do when a simcall terminates
 
+    // boost::intrusive_ptr<Activity> support:
+    friend void intrusive_ptr_add_ref(ActivityImpl * activity)
+    {
+      // Atomic operation! Do not split in two instructions!
+      auto previous = (activity->refcount_)++;
+      xbt_assert(previous != 0);
+      (void)previous;
+    }
+
+    friend void intrusive_ptr_release(ActivityImpl * activity)
+    {
+      // Atomic operation! Do not split in two instructions!
+      auto count = --(activity->refcount_);
+      if (count == 0)
+        delete activity;
+    }
+
     void ref();
     void unref();
   private:
+    std::atomic_int_fast32_t refcount_{1};
     int refcount = 1;
   };
 }}} // namespace simgrid::kernel::activity
diff --git a/src/kernel/activity/CommImpl.hpp b/src/kernel/activity/CommImpl.hpp
new file mode 100644 (file)
index 0000000..d1e1949
--- /dev/null
@@ -0,0 +1,31 @@
+/* Copyright (c) 2007-2016. The SimGrid Team. All rights reserved.          */
+
+/* This program is free software; you can redistribute it and/or modify it
+ * under the terms of the license (GNU LGPL) which comes with this package. */
+
+#ifndef SRC_KERNEL_ACTIVITY_COMMIMPL_HPP_
+#define SRC_KERNEL_ACTIVITY_COMMIMPL_HPP_
+
+#include "simgrid/forward.h"
+
+#include <atomic>
+#include <simgrid/simix.hpp>
+
+namespace simgrid {
+namespace kernel {
+namespace activity {
+
+XBT_PUBLIC_CLASS CommImpl : ActivityImpl
+{
+public:
+  CommImpl() : piface_(this){};
+  ~CommImpl() = default;
+
+  using Ptr = boost::intrusive_ptr<ActivityImpl>;
+  simgrid::s4u::Comm piface_; // Our interface
+};
+}
+}
+}
+
+#endif /* SRC_KERNEL_ACTIVITY_COMMIMPL_HPP_ */
index 1949b21..9cb4c37 100644 (file)
@@ -16,7 +16,7 @@ simgrid::kernel::activity::Comm::Comm(e_smx_comm_type_t _type) : type(_type)
   state = SIMIX_WAITING;
   src_data=nullptr;
   dst_data=nullptr;
-
+  intrusive_ptr_add_ref(this);
   XBT_DEBUG("Create communicate synchro %p", this);
 }
 
@@ -37,6 +37,7 @@ simgrid::kernel::activity::Comm::~Comm()
   if(mbox)
     mbox->remove(this);
 }
+
 void simgrid::kernel::activity::Comm::suspend()
 {
   /* FIXME: shall we suspend also the timeout synchro? */
@@ -61,6 +62,7 @@ void simgrid::kernel::activity::Comm::cancel()
   if (state == SIMIX_WAITING) {
     mbox->remove(this);
     state = SIMIX_CANCELED;
+    SIMIX_comm_unref(this);
   } else if (not MC_is_active() /* when running the MC there are no surf actions */
              && not MC_record_replay_is_active() && (state == SIMIX_READY || state == SIMIX_RUNNING)) {
 
@@ -120,6 +122,8 @@ void simgrid::kernel::activity::Comm::post()
   cleanupSurf();
 
   /* if there are simcalls associated with the synchro, then answer them */
-  if (not simcalls.empty())
+  if (not simcalls.empty()) {
     SIMIX_comm_finish(this);
+    SIMIX_comm_unref(this);
+  }
 }
index 7bbb303..8e921e3 100644 (file)
@@ -270,6 +270,7 @@ msg_error_t MSG_task_receive_ext_bounded(msg_task_t * task, const char *alias, d
     simcall_comm_recv(MSG_process_self()->getImpl(), mailbox->getImpl(), task, nullptr, nullptr, nullptr, nullptr, timeout, rate);
     XBT_DEBUG("Got task %s from %s",(*task)->name,mailbox->name());
     (*task)->simdata->setNotUsed();
+    SIMIX_comm_unref((*task)->simdata->comm);
   }
   catch (xbt_ex& e) {
     switch (e.category) {
@@ -489,6 +490,7 @@ int MSG_comm_test(msg_comm_t comm)
     if (finished && comm->task_received != nullptr) {
       /* I am the receiver */
       (*comm->task_received)->simdata->setNotUsed();
+      SIMIX_comm_unref(comm->s_comm);
     }
   }
   catch (xbt_ex& e) {
@@ -556,6 +558,7 @@ int MSG_comm_testany(xbt_dynar_t comms)
     if (status == MSG_OK && comm->task_received != nullptr) {
       /* I am the receiver */
       (*comm->task_received)->simdata->setNotUsed();
+      SIMIX_comm_unref(comm->s_comm);
     }
   }
 
@@ -584,6 +587,7 @@ msg_error_t MSG_comm_wait(msg_comm_t comm, double timeout)
 {
   try {
     simcall_comm_wait(comm->s_comm, timeout);
+    SIMIX_comm_unref(comm->s_comm);
 
     if (comm->task_received != nullptr) {
       /* I am the receiver */
@@ -668,6 +672,7 @@ int MSG_comm_waitany(xbt_dynar_t comms)
   if (comm->task_received != nullptr) {
     /* I am the receiver */
     (*comm->task_received)->simdata->setNotUsed();
+    SIMIX_comm_unref(comm->s_comm);
   }
 
   return finished_index;
@@ -795,6 +800,7 @@ msg_error_t MSG_task_send_with_timeout(msg_task_t task, const char *alias, doubl
       simcall_set_category(comm, task->category);
      t_simdata->comm = static_cast<simgrid::kernel::activity::Comm*>(comm);
      simcall_comm_wait(comm, timeout);
+     SIMIX_comm_unref(comm);
   }
   catch (xbt_ex& e) {
     switch (e.category) {
index 758d26d..dca4a3b 100644 (file)
@@ -111,10 +111,8 @@ void SIMIX_process_cleanup(smx_actor_t process)
       if (comm->detached)
         XBT_DEBUG("Don't destroy it since it's a detached comm and I'm the sender");
       else
-        comm->unref();
-
-    }
-    else if (comm->dst_proc == process){
+        SIMIX_comm_unref(comm);
+    } else if (comm->dst_proc == process) {
       XBT_DEBUG("Found an unfinished recv comm %p, state %d, src = %p, dst = %p",
           comm, (int)comm->state, comm->src_proc, comm->dst_proc);
       comm->dst_proc = nullptr;
@@ -123,8 +121,7 @@ void SIMIX_process_cleanup(smx_actor_t process)
         /* the comm will be freed right now, remove it from the sender */
         comm->src_proc->comms.remove(comm);
       }
-      
-      comm->unref();
+      SIMIX_comm_unref(comm);
     } else {
       xbt_die("Communication synchro %p is in my list but I'm not the sender nor the receiver", synchro);
     }
@@ -440,16 +437,11 @@ void SIMIX_process_kill(smx_actor_t process, smx_actor_t issuer) {
     } else if (comm != nullptr) {
       process->comms.remove(process->waiting_synchro);
       comm->cancel();
-
       // Remove first occurrence of &process->simcall:
-      auto i = boost::range::find(
-        process->waiting_synchro->simcalls,
-        &process->simcall);
+      auto i = boost::range::find(process->waiting_synchro->simcalls, &process->simcall);
       if (i != process->waiting_synchro->simcalls.end())
         process->waiting_synchro->simcalls.remove(&process->simcall);
-
-      comm->unref();
-
+      SIMIX_comm_unref(comm);
     } else if (sleep != nullptr) {
       SIMIX_process_sleep_destroy(process->waiting_synchro);
 
index ef153ce..f542074 100644 (file)
@@ -45,7 +45,7 @@ _find_matching_comm(boost::circular_buffer_space_optimized<smx_activity_t>* dequ
 
   for(auto it = deque->begin(); it != deque->end(); it++){
     smx_activity_t synchro = *it;
-    simgrid::kernel::activity::Comm *comm = static_cast<simgrid::kernel::activity::Comm*>(synchro);
+    simgrid::kernel::activity::Commcomm = static_cast<simgrid::kernel::activity::Comm*>(synchro);
 
     if (comm->type == SIMIX_COMM_SEND) {
       other_user_data = comm->src_data;
@@ -57,7 +57,7 @@ _find_matching_comm(boost::circular_buffer_space_optimized<smx_activity_t>* dequ
       XBT_DEBUG("Found a matching communication synchro %p", comm);
       if (remove_matching)
         deque->erase(it);
-      comm->ref();
+      comm = static_cast<simgrid::kernel::activity::Comm*>(SIMIX_comm_ref(comm));
 #if SIMGRID_HAVE_MC
       comm->mbox_cpy = comm->mbox;
 #endif
@@ -114,7 +114,7 @@ XBT_PRIVATE smx_activity_t simcall_HANDLER_comm_isend(smx_simcall_t simcall, smx
       //this mailbox is for small messages, which have to be sent right now
       other_comm->state   = SIMIX_READY;
       other_comm->dst_proc=mbox->permanent_receiver.get();
-      other_comm->ref();
+      other_comm          = static_cast<simgrid::kernel::activity::Comm*>(SIMIX_comm_ref(other_comm));
       mbox->done_comm_queue.push_back(other_comm);
       XBT_DEBUG("pushing a message into the permanent receive list %p, comm %p", mbox, &(other_comm));
 
@@ -123,7 +123,7 @@ XBT_PRIVATE smx_activity_t simcall_HANDLER_comm_isend(smx_simcall_t simcall, smx
     }
   } else {
     XBT_DEBUG("Receive already pushed");
-    this_comm->unref();
+    SIMIX_comm_unref(this_comm);
 
     other_comm->state = SIMIX_READY;
     other_comm->type = SIMIX_COMM_READY;
@@ -207,8 +207,8 @@ smx_activity_t SIMIX_comm_irecv(smx_actor_t dst_proc, smx_mailbox_t mbox, void *
         other_comm->type = SIMIX_COMM_DONE;
         other_comm->mbox = nullptr;
       }
-      other_comm->unref();
-      static_cast<simgrid::kernel::activity::Comm*>(this_synchro)->unref();
+      SIMIX_comm_unref(other_comm);
+      SIMIX_comm_unref(this_synchro);
     }
   } else {
     /* Prepare a comm describing us, so that it gets passed to the user-provided filter of other side */
@@ -225,7 +225,8 @@ smx_activity_t SIMIX_comm_irecv(smx_actor_t dst_proc, smx_mailbox_t mbox, void *
       other_comm = this_synchro;
       mbox->push(this_synchro);
     } else {
-      this_synchro->unref();
+      SIMIX_comm_unref(this_synchro);
+      other_comm = static_cast<simgrid::kernel::activity::Comm*>(other_comm);
 
       other_comm->state = SIMIX_READY;
       other_comm->type = SIMIX_COMM_READY;
@@ -287,9 +288,9 @@ smx_activity_t SIMIX_comm_iprobe(smx_actor_t dst_proc, smx_mailbox_t mbox, int t
   }
 
   if(other_synchro)
-    other_synchro->unref();
+    SIMIX_comm_unref(other_synchro);
 
-  this_comm->unref();
+  SIMIX_comm_unref(this_comm);
   return other_synchro;
 }
 
@@ -501,16 +502,14 @@ static inline void SIMIX_comm_start(smx_activity_t synchro)
 void SIMIX_comm_finish(smx_activity_t synchro)
 {
   simgrid::kernel::activity::Comm *comm = static_cast<simgrid::kernel::activity::Comm*>(synchro);
-  unsigned int destroy_count = 0;
 
   while (not synchro->simcalls.empty()) {
     smx_simcall_t simcall = synchro->simcalls.front();
     synchro->simcalls.pop_front();
 
-    /* If a waitany simcall is waiting for this synchro to finish, then remove
-       it from the other synchros in the waitany list. Afterwards, get the
-       position of the actual synchro in the waitany dynar and
-       return it as the result of the simcall */
+    /* If a waitany simcall is waiting for this synchro to finish, then remove it from the other synchros in the waitany
+     * list. Afterwards, get the position of the actual synchro in the waitany dynar and return it as the result of the
+     * simcall */
 
     if (simcall->call == SIMCALL_NONE) //FIXME: maybe a better way to handle this case
       continue; // if process handling comm is killed
@@ -521,7 +520,8 @@ void SIMIX_comm_finish(smx_activity_t synchro)
         simcall->timer = nullptr;
       }
       if (not MC_is_active() && not MC_record_replay_is_active())
-        simcall_comm_waitany__set__result(simcall, xbt_dynar_search(simcall_comm_waitany__get__comms(simcall), &synchro));
+        simcall_comm_waitany__set__result(simcall,
+                                          xbt_dynar_search(simcall_comm_waitany__get__comms(simcall), &synchro));
     }
 
     /* If the synchro is still in a rendez-vous point then remove from it */
@@ -642,16 +642,10 @@ void SIMIX_comm_finish(smx_activity_t synchro)
         comm->dst_proc->comms.remove(synchro);
         comm->src_proc->comms.remove(synchro);
       }
-      //in case of a detached comm we have an extra ref to remove, as the sender won't do it
-      destroy_count++;
     }
 
     SIMIX_simcall_answer(simcall);
-    destroy_count++;
   }
-
-  while (destroy_count-- > 0)
-    static_cast<simgrid::kernel::activity::Comm*>(synchro)->unref();
 }
 
 /******************************************************************************/
@@ -684,7 +678,6 @@ void SIMIX_comm_copy_buffer_callback(smx_activity_t synchro, void* buff, size_t
   }
 }
 
-
 /**
  *  @brief Copy the communication data from the sender's buffer to the receiver's one
  *  @param synchro The communication
@@ -717,8 +710,22 @@ void SIMIX_comm_copy_data(smx_activity_t synchro)
         SIMIX_comm_copy_data_callback (comm, comm->src_buff, buff_size);
   }
 
-
   /* Set the copied flag so we copy data only once */
   /* (this function might be called from both communication ends) */
   comm->copied = 1;
 }
+
+/** Increase the refcount for this comm */
+smx_activity_t SIMIX_comm_ref(smx_activity_t comm)
+{
+  if (comm != nullptr)
+    intrusive_ptr_add_ref(comm);
+  return comm;
+}
+
+/** Decrease the refcount for this comm */
+void SIMIX_comm_unref(smx_activity_t comm)
+{
+  if (comm != nullptr)
+    intrusive_ptr_release(comm);
+}
index 71961df..17f25b1 100644 (file)
@@ -31,6 +31,7 @@ set(EXTRA_DIST
   src/simix/smx_private.h
   src/simix/smx_synchro_private.h
   src/kernel/activity/ActivityImpl.hpp
+  src/kernel/activity/CommImpl.hpp
   src/kernel/activity/SynchroComm.hpp
   src/kernel/activity/SynchroExec.hpp
   src/kernel/activity/SynchroIo.hpp