Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
Merge branch 'master' into clean_events
[simgrid.git] / src / simix / smx_network.cpp
index d709632..1fbcb08 100644 (file)
@@ -58,7 +58,7 @@ _find_matching_comm(boost::circular_buffer_space_optimized<smx_activity_t>* dequ
       XBT_DEBUG("Found a matching communication synchro %p", comm);
       if (remove_matching)
         deque->erase(it);
-      comm = static_cast<simgrid::kernel::activity::CommImpl*>(SIMIX_comm_ref(comm));
+      comm->ref();
 #if SIMGRID_HAVE_MC
       comm->mbox_cpy = comm->mbox;
 #endif
@@ -115,20 +115,20 @@ XBT_PRIVATE smx_activity_t simcall_HANDLER_comm_isend(smx_simcall_t simcall, smx
       //this mailbox is for small messages, which have to be sent right now
       other_comm->state   = SIMIX_READY;
       other_comm->dst_proc=mbox->permanent_receiver.get();
-      other_comm          = static_cast<simgrid::kernel::activity::CommImpl*>(SIMIX_comm_ref(other_comm));
+      other_comm->ref();
       mbox->done_comm_queue.push_back(other_comm);
-      XBT_DEBUG("pushing a message into the permanent receive list %p, comm %p", mbox, &(other_comm));
+      XBT_DEBUG("pushing a message into the permanent receive list %p, comm %p", mbox, other_comm);
 
     }else{
       mbox->push(this_comm);
     }
   } else {
     XBT_DEBUG("Receive already pushed");
-    SIMIX_comm_unref(this_comm);
+    this_comm->unref();
+    this_comm->unref();
 
     other_comm->state = SIMIX_READY;
     other_comm->type = SIMIX_COMM_READY;
-
   }
   src_proc->comms.push_back(other_comm);
 
@@ -192,6 +192,7 @@ smx_activity_t SIMIX_comm_irecv(smx_actor_t dst_proc, smx_mailbox_t mbox, void *
   //communication already done, get it inside the list of completed comms
   if (mbox->permanent_receiver != nullptr && not mbox->done_comm_queue.empty()) {
 
+    this_synchro->unref();
     XBT_DEBUG("We have a comm that has probably already been received, trying to match it, to skip the communication");
     //find a match in the list of already received comms
     other_comm = _find_matching_comm(&mbox->done_comm_queue, SIMIX_COMM_SEND, match_fun, data, this_synchro,
@@ -202,14 +203,15 @@ smx_activity_t SIMIX_comm_irecv(smx_actor_t dst_proc, smx_mailbox_t mbox, void *
       other_comm = this_synchro;
       mbox->push(this_synchro);
     } else {
-      if(other_comm->surf_comm && other_comm->remains() < 1e-12) {
+      if (other_comm->surf_comm && other_comm->remains() < 1e-12) {
         XBT_DEBUG("comm %p has been already sent, and is finished, destroy it",other_comm);
         other_comm->state = SIMIX_DONE;
         other_comm->type = SIMIX_COMM_DONE;
         other_comm->mbox = nullptr;
+        other_comm->unref();
       }
-      SIMIX_comm_unref(other_comm);
-      SIMIX_comm_unref(this_synchro);
+      other_comm->unref();
+      this_synchro->unref();
     }
   } else {
     /* Prepare a comm describing us, so that it gets passed to the user-provided filter of other side */
@@ -226,11 +228,14 @@ smx_activity_t SIMIX_comm_irecv(smx_actor_t dst_proc, smx_mailbox_t mbox, void *
       other_comm = this_synchro;
       mbox->push(this_synchro);
     } else {
-      SIMIX_comm_unref(this_synchro);
+      XBT_DEBUG("Match my %p with the existing %p", this_synchro, other_comm);
+
       other_comm = static_cast<simgrid::kernel::activity::CommImpl*>(other_comm);
 
       other_comm->state = SIMIX_READY;
       other_comm->type = SIMIX_COMM_READY;
+      this_synchro->unref();
+      this_synchro->unref();
     }
     dst_proc->comms.push_back(other_comm);
   }
@@ -275,7 +280,7 @@ smx_activity_t SIMIX_comm_iprobe(smx_actor_t dst_proc, smx_mailbox_t mbox, int t
   } else{
     this_comm = new simgrid::kernel::activity::CommImpl(SIMIX_COMM_RECEIVE);
     smx_type = SIMIX_COMM_SEND;
-  } 
+  }
   smx_activity_t other_synchro=nullptr;
   if (mbox->permanent_receiver != nullptr && not mbox->done_comm_queue.empty()) {
     XBT_DEBUG("first check in the permanent recv mailbox, to see if we already got something");
@@ -289,9 +294,9 @@ smx_activity_t SIMIX_comm_iprobe(smx_actor_t dst_proc, smx_mailbox_t mbox, int t
   }
 
   if(other_synchro)
-    SIMIX_comm_unref(other_synchro);
+    other_synchro->unref();
 
-  SIMIX_comm_unref(this_comm);
+  this_comm->unref();
   return other_synchro;
 }
 
@@ -405,7 +410,7 @@ void simcall_HANDLER_comm_waitany(smx_simcall_t simcall, xbt_dynar_t synchros, d
 
   if (MC_is_active() || MC_record_replay_is_active()){
     if (timeout > 0.0)
-      xbt_die("Timeout not implemented for waitany in the model-checker"); 
+      xbt_die("Timeout not implemented for waitany in the model-checker");
     int idx = SIMCALL_GET_MC_VALUE(simcall);
     synchro = xbt_dynar_get_as(synchros, idx, smx_activity_t);
     synchro->simcalls.push_back(simcall);
@@ -414,7 +419,7 @@ void simcall_HANDLER_comm_waitany(smx_simcall_t simcall, xbt_dynar_t synchros, d
     SIMIX_comm_finish(synchro);
     return;
   }
-  
+
   if (timeout < 0.0){
     simcall->timer = NULL;
   } else {
@@ -424,7 +429,7 @@ void simcall_HANDLER_comm_waitany(smx_simcall_t simcall, xbt_dynar_t synchros, d
       SIMIX_simcall_answer(simcall);
     });
   }
-  
+
   xbt_dynar_foreach(synchros, cursor, synchro){
     /* associate this simcall to the the synchro */
     synchro->simcalls.push_back(simcall);
@@ -465,12 +470,13 @@ static inline void SIMIX_comm_start(smx_activity_t synchro)
     simgrid::s4u::Host* sender   = comm->src_proc->host;
     simgrid::s4u::Host* receiver = comm->dst_proc->host;
 
-    XBT_DEBUG("Starting communication %p from '%s' to '%s'", synchro, sender->cname(), receiver->cname());
-
     comm->surf_comm = surf_network_model->communicate(sender, receiver, comm->task_size, comm->rate);
     comm->surf_comm->setData(synchro);
     comm->state = SIMIX_RUNNING;
 
+    XBT_DEBUG("Starting communication %p from '%s' to '%s' (surf_action: %p)", synchro, sender->cname(),
+              receiver->cname(), comm->surf_comm);
+
     /* If a link is failed, detect it immediately */
     if (comm->surf_comm->getState() == simgrid::surf::Action::State::failed) {
       XBT_DEBUG("Communication from '%s' to '%s' failed to start because of a link failure", sender->cname(),
@@ -527,7 +533,7 @@ void SIMIX_comm_finish(smx_activity_t synchro)
 
     /* If the synchro is still in a rendez-vous point then remove from it */
     if (comm->mbox)
-      comm->mbox->remove(synchro);
+      comm->mbox->remove(comm);
 
     XBT_DEBUG("SIMIX_comm_finish: synchro state = %d", (int)synchro->state);
 
@@ -714,18 +720,3 @@ void SIMIX_comm_copy_data(smx_activity_t synchro)
   /* (this function might be called from both communication ends) */
   comm->copied = 1;
 }
-
-/** Increase the refcount for this comm */
-smx_activity_t SIMIX_comm_ref(smx_activity_t comm)
-{
-  if (comm != nullptr)
-    intrusive_ptr_add_ref(comm);
-  return comm;
-}
-
-/** Decrease the refcount for this comm */
-void SIMIX_comm_unref(smx_activity_t comm)
-{
-  if (comm != nullptr)
-    intrusive_ptr_release(comm);
-}