Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
Merge branch 'master' of scm.gforge.inria.fr:/gitroot/simgrid/simgrid
author Martin Quinson <martin.quinson@loria.fr>
Tue, 6 Jun 2017 06:34:33 +0000 (08:34 +0200)
committer Martin Quinson <martin.quinson@loria.fr>
Tue, 6 Jun 2017 06:34:33 +0000 (08:34 +0200)
12 files changed:
examples/msg/app-chainsend/peer.c
examples/s4u/dht-chord/node.cpp
examples/s4u/dht-chord/s4u_dht-chord.hpp
examples/smpi/replay_multiple/replay_multiple.c
include/simgrid/s4u/Activity.hpp
include/simgrid/s4u/Comm.hpp
src/kernel/activity/ActivityImpl.hpp
src/s4u/s4u_activity.cpp
src/s4u/s4u_actor.cpp
src/s4u/s4u_comm.cpp
src/simix/ActorImpl.cpp
src/simix/smx_network.cpp

index f0f6fb1..63fa465 100644 (file)
@@ -19,9 +19,8 @@ void peer_init_chain(peer_t peer, message_t msg)
 static void peer_forward_msg(peer_t peer, message_t msg)
 {
   msg_task_t task = task_message_data_new(NULL, msg->data_length);
-  msg_comm_t comm = NULL;
   XBT_DEBUG("Sending (isend) from %s into mailbox %s", peer->me, peer->next);
-  comm = MSG_task_isend(task, peer->next);
+  msg_comm_t comm = MSG_task_isend(task, peer->next);
   queue_pending_connection(comm, peer->pending_sends);
 }
 
index d0ee0e9..02c37e4 100644 (file)
@@ -120,18 +120,20 @@ void Node::notifyAndQuit()
     }
   }
 
-  // send the SUCCESSOR_LEAVING to our predecessor
-  ChordMessage* succ_msg = new ChordMessage(SUCCESSOR_LEAVING);
-  succ_msg->request_id   = fingers_[0];
-  succ_msg->answer_to    = mailbox_;
-  XBT_DEBUG("Sending a 'SUCCESSOR_LEAVING' to my predecessor %d", pred_id_);
-
-  try {
-    simgrid::s4u::this_actor::send(simgrid::s4u::Mailbox::byName(std::to_string(pred_id_)), succ_msg, 10, timeout);
-  } catch (xbt_ex& e) {
-    if (e.category == timeout_error) {
-      XBT_DEBUG("Timeout expired when sending a 'SUCCESSOR_LEAVING' to my predecessor %d", pred_id_);
-      delete succ_msg;
+  if (pred_id_ != -1) {
+    // send the SUCCESSOR_LEAVING to our predecessor (only if I have one)
+    ChordMessage* succ_msg = new ChordMessage(SUCCESSOR_LEAVING);
+    succ_msg->request_id   = fingers_[0];
+    succ_msg->answer_to    = mailbox_;
+    XBT_DEBUG("Sending a 'SUCCESSOR_LEAVING' to my predecessor %d", pred_id_);
+
+    try {
+      simgrid::s4u::this_actor::send(simgrid::s4u::Mailbox::byName(std::to_string(pred_id_)), succ_msg, 10, timeout);
+    } catch (xbt_ex& e) {
+      if (e.category == timeout_error) {
+        XBT_DEBUG("Timeout expired when sending a 'SUCCESSOR_LEAVING' to my predecessor %d", pred_id_);
+        delete succ_msg;
+      }
     }
   }
 }
@@ -234,13 +236,13 @@ void Node::checkPredecessor()
   try {
     comm->wait(timeout);
     XBT_DEBUG("Received the answer to my 'Predecessor Alive': my predecessor %d is alive", pred_id_);
-    delete message;
   } catch (xbt_ex& e) {
     if (e.category == timeout_error) {
       XBT_DEBUG("Failed to receive the answer to my 'Predecessor Alive' request");
       pred_id_ = -1;
     }
   }
+  delete message;
 }
 
 /* Asks its predecessor to a remote node
index 60f3d6b..a415bf9 100644 (file)
@@ -126,10 +126,10 @@ public:
     double next_fix_fingers_date       = start_time_ + PERIODIC_FIX_FINGERS_DELAY;
     double next_check_predecessor_date = start_time_ + PERIODIC_CHECK_PREDECESSOR_DELAY;
     double next_lookup_date            = start_time_ + PERIODIC_LOOKUP_DELAY;
-
+    simgrid::s4u::CommPtr comm_receive = nullptr;
     while ((now < (start_time_ + deadline_)) && now < MAX_SIMULATION_TIME) {
-      data                               = nullptr;
-      simgrid::s4u::CommPtr comm_receive = simgrid::s4u::this_actor::irecv(mailbox_, &data);
+      if (comm_receive == nullptr)
+        comm_receive = simgrid::s4u::this_actor::irecv(mailbox_, &data);
       while ((now < (start_time_ + deadline_)) && now < MAX_SIMULATION_TIME && not comm_receive->test()) {
         // no task was received: make some periodic calls
         if (now >= next_stabilize_date) {
@@ -154,8 +154,8 @@ public:
       if (data != nullptr) {
         ChordMessage* message = static_cast<ChordMessage*>(data);
         handleMessage(message);
-      } else {
-        comm_receive->cancel();
+        comm_receive = nullptr;
+        data         = nullptr;
       }
       now = simgrid::s4u::Engine::getClock();
     }
index 4c064df..a7cfeb5 100644 (file)
@@ -31,7 +31,6 @@ int main(int argc, char *argv[]){
     xbt_die("Cannot open %s", argv[1]);
   char *line = NULL;
   size_t n   = 0;
-  int instance_size = 0;
   const char* instance_id = NULL;
   while (xbt_getline(&line, &n, fp) != -1 ){
     xbt_dynar_t elems = xbt_str_split_quoted_in_place(line);
@@ -41,7 +40,7 @@ int main(int argc, char *argv[]){
 
     const char** line_char= xbt_dynar_to_array(elems);
     instance_id = line_char[0];
-    instance_size = xbt_str_parse_int(line_char[2], "Invalid size: %s");
+    int instance_size     = xbt_str_parse_int(line_char[2], "Invalid size: %s");
 
     XBT_INFO("Initializing instance %s of size %d", instance_id, instance_size);
     SMPI_app_instance_register(instance_id, smpi_replay,instance_size);
index b85daaa..aaa9e38 100644 (file)
@@ -24,8 +24,8 @@ XBT_PUBLIC_CLASS Activity {
   friend void intrusive_ptr_add_ref(Comm * c);
 
 protected:
-  Activity();
-  virtual ~Activity();
+  Activity()  = default;
+  ~Activity() = default;
 
 public:
   Activity(Activity const&) = delete;
index d889b61..45db6ec 100644 (file)
@@ -11,7 +11,6 @@
 #include <simgrid/forward.h>
 #include <simgrid/s4u/Activity.hpp>
 #include <simgrid/s4u/forward.hpp>
-
 namespace simgrid {
 namespace s4u {
 /** @brief Communication async
index 1a28a5f..1496c41 100644 (file)
@@ -35,7 +35,8 @@ namespace activity {
     void ref();
     /** @brief Reduces the refcount */
     void unref();
-    // boost::intrusive_ptr<Activity> support:
+
+     // boost::intrusive_ptr<Activity> support:
     friend void intrusive_ptr_add_ref(ActivityImpl * activity);
     friend void intrusive_ptr_release(ActivityImpl * activity);
 
index 4e8caaa..157010d 100644 (file)
@@ -15,13 +15,6 @@ XBT_LOG_NEW_DEFAULT_SUBCATEGORY(s4u_activity,s4u,"S4U activities");
 namespace simgrid {
 namespace s4u {
 
-Activity::Activity() {
-
-}
-Activity::~Activity() {
-
-}
-
 void Activity::setRemains(double remains) {
   xbt_assert(state_ == inited, "Cannot change the remaining amount of work once the Activity is started");
   remains_ = remains;
index 010a9d8..37d935f 100644 (file)
@@ -215,6 +215,7 @@ CommPtr isend(MailboxPtr chan, void* payload, double simulatedSize)
 {
   return Comm::send_async(chan, payload, simulatedSize);
 }
+
 void dsend(MailboxPtr chan, void* payload, double simulatedSize)
 {
   Comm::send_detached(chan, payload, simulatedSize);
index d8c635b..547e39c 100644 (file)
@@ -117,13 +117,17 @@ void Comm::wait() {
     }
   }
   state_ = finished;
+  if (pimpl_)
+    pimpl_->unref();
 }
+
 void Comm::wait(double timeout) {
   xbt_assert(state_ == started || state_ == inited);
 
   if (state_ == started) {
     simcall_comm_wait(pimpl_, timeout);
     state_ = finished;
+    pimpl_->unref();
     return;
   }
 
@@ -139,6 +143,8 @@ void Comm::wait(double timeout) {
         userData_, timeout, rate_);
   }
   state_ = finished;
+  if (pimpl_)
+    pimpl_->unref();
 }
 
 void Comm::send_detached(MailboxPtr dest, void* data, int simulatedSize)
@@ -150,6 +156,7 @@ void Comm::send_detached(MailboxPtr dest, void* data, int simulatedSize)
   res->detached_    = true;
   res->start();
 }
+
 s4u::CommPtr Comm::send_async(MailboxPtr dest, void* data, int simulatedSize)
 {
   s4u::CommPtr res = CommPtr(s4u::Comm::send_init(dest));
@@ -173,6 +180,7 @@ void Comm::cancel()
   simgrid::kernel::activity::CommImpl* commPimpl = static_cast<simgrid::kernel::activity::CommImpl*>(pimpl_);
   commPimpl->cancel();
 }
+
 bool Comm::test() {
   xbt_assert(state_ == inited || state_ == started || state_ == finished);
   
@@ -185,6 +193,7 @@ bool Comm::test() {
   
   if(simcall_comm_test(pimpl_)){
     state_ = finished;
+    pimpl_->unref();
     return true;
   }
   return false;
index ad1998e..2c59aab 100644 (file)
@@ -81,15 +81,14 @@ int SIMIX_process_has_pending_comms(smx_actor_t process) {
  */
 void SIMIX_process_cleanup(smx_actor_t process)
 {
-  XBT_DEBUG("Cleanup process %s (%p), waiting synchro %p",
-      process->name.c_str(), process, process->waiting_synchro);
+  XBT_DEBUG("Cleanup process %s (%p), waiting synchro %p", process->name.c_str(), process, process->waiting_synchro);
 
   process->finished = true;
   SIMIX_process_on_exit_runall(process);
 
   /* Unregister from the kill timer if any */
   if (process->kill_timer != nullptr)
-      SIMIX_timer_remove(process->kill_timer);
+    SIMIX_timer_remove(process->kill_timer);
 
   xbt_os_mutex_acquire(simix_global->mutex);
 
@@ -100,7 +99,6 @@ void SIMIX_process_cleanup(smx_actor_t process)
 
     /* make sure no one will finish the comm after this process is destroyed,
      * because src_proc or dst_proc would be an invalid pointer */
-    comm->cancel();
 
     if (comm->src_proc == process) {
       XBT_DEBUG("Found an unfinished send comm %p (detached = %d), state %d, src = %p, dst = %p",
@@ -126,6 +124,7 @@ void SIMIX_process_cleanup(smx_actor_t process)
     }
     process->comms.pop_front();
     synchro = static_cast<smx_activity_t>(process->comms.front());
+    comm->cancel();
   }
 
   XBT_DEBUG("%p should not be run anymore",process);
index b9e5ced..528ceda 100644 (file)
@@ -129,7 +129,6 @@ XBT_PRIVATE smx_activity_t simcall_HANDLER_comm_isend(smx_simcall_t simcall, smx
 
     other_comm->state = SIMIX_READY;
     other_comm->type = SIMIX_COMM_READY;
-
   }
   src_proc->comms.push_back(other_comm);