Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
try to get the refcounting right on waitany simcalls
authorMartin Quinson <martin.quinson@loria.fr>
Wed, 14 Jun 2017 19:35:03 +0000 (21:35 +0200)
committerMartin Quinson <martin.quinson@loria.fr>
Wed, 14 Jun 2017 19:35:03 +0000 (21:35 +0200)
include/simgrid/s4u/Comm.hpp
src/msg/msg_gos.cpp
src/simix/smx_network.cpp
src/smpi/smpi_request.cpp

index 45db6ec..40f6a01 100644 (file)
@@ -31,13 +31,16 @@ public:
   template <class I> static I wait_any(I first, I last)
   {
     // Map to dynar<Synchro*>:
-    xbt_dynar_t comms = xbt_dynar_new(sizeof(simgrid::kernel::activity::ActivityImpl*), NULL);
+    xbt_dynar_t comms = xbt_dynar_new(sizeof(simgrid::kernel::activity::ActivityImpl*), [](void*ptr){
+      intrusive_ptr_release((simgrid::kernel::activity::ActivityImpl*)ptr);
+    });
     for (I iter = first; iter != last; iter++) {
       Comm& comm = **iter;
       if (comm.state_ == inited)
         comm.start();
       xbt_assert(comm.state_ == started);
-      xbt_dynar_push_as(comms, simgrid::kernel::activity::ActivityImpl*, comm.pimpl_);
+      intrusive_ptr_add_ref(comm.pimpl_.get());
+      xbt_dynar_push_as(comms, simgrid::kernel::activity::ActivityImpl*, comm.pimpl_.get());
     }
     // Call the underlying simcall:
     int idx = simcall_comm_waitany(comms, -1);
@@ -54,13 +57,16 @@ public:
   template <class I> static I wait_any_for(I first, I last, double timeout)
   {
     // Map to dynar<Synchro*>:
-    xbt_dynar_t comms = xbt_dynar_new(sizeof(simgrid::kernel::activity::ActivityImpl*), NULL);
+    xbt_dynar_t comms = xbt_dynar_new(sizeof(simgrid::kernel::activity::ActivityImpl*), [](void*ptr){
+      intrusive_ptr_release((simgrid::kernel::activity::ActivityImpl*)ptr);
+    });
     for (I iter = first; iter != last; iter++) {
       Comm& comm = **iter;
       if (comm.state_ == inited)
         comm.start();
       xbt_assert(comm.state_ == started);
-      xbt_dynar_push_as(comms, simgrid::kernel::activity::ActivityImpl*, comm.pimpl_);
+      intrusive_ptr_add_ref(comm.pimpl_.get());
+      xbt_dynar_push_as(comms, simgrid::kernel::activity::ActivityImpl*, comm.pimpl_.get());
     }
     // Call the underlying simcall:
     int idx = simcall_comm_waitany(comms, timeout);
index 4ec5694..98d5871 100644 (file)
@@ -634,11 +634,14 @@ int MSG_comm_waitany(xbt_dynar_t comms)
   int finished_index = -1;
 
   /* create the equivalent dynar with SIMIX objects */
-  xbt_dynar_t s_comms = xbt_dynar_new(sizeof(smx_activity_t), nullptr);
+  xbt_dynar_t s_comms = xbt_dynar_new(sizeof(smx_activity_t), [](void*ptr){
+    intrusive_ptr_release((simgrid::kernel::activity::ActivityImpl*)ptr);
+  });
   msg_comm_t comm;
   unsigned int cursor;
   xbt_dynar_foreach(comms, cursor, comm) {
-    xbt_dynar_push(s_comms, &comm->s_comm);
+    intrusive_ptr_add_ref(comm->s_comm.get());
+    xbt_dynar_push_as(s_comms, simgrid::kernel::activity::ActivityImpl*, comm->s_comm.get());
   }
 
   msg_error_t status = MSG_OK;
index 7ea1ff4..080b391 100644 (file)
@@ -391,14 +391,11 @@ void simcall_HANDLER_comm_testany(smx_simcall_t simcall, simgrid::kernel::activi
 
 void simcall_HANDLER_comm_waitany(smx_simcall_t simcall, xbt_dynar_t synchros, double timeout)
 {
-  smx_activity_t synchro;
-  unsigned int cursor = 0;
-
   if (MC_is_active() || MC_record_replay_is_active()){
     if (timeout > 0.0)
       xbt_die("Timeout not implemented for waitany in the model-checker");
     int idx = SIMCALL_GET_MC_VALUE(simcall);
-    synchro = xbt_dynar_get_as(synchros, idx, smx_activity_t);
+    smx_activity_t synchro = xbt_dynar_get_as(synchros, idx, smx_activity_t);
     synchro->simcalls.push_back(simcall);
     simcall_comm_waitany__set__result(simcall, idx);
     synchro->state = SIMIX_DONE;
@@ -416,7 +413,10 @@ void simcall_HANDLER_comm_waitany(smx_simcall_t simcall, xbt_dynar_t synchros, d
     });
   }
 
-  xbt_dynar_foreach(synchros, cursor, synchro){
+  unsigned int cursor;
+  simgrid::kernel::activity::ActivityImpl* ptr;
+  xbt_dynar_foreach(synchros, cursor, ptr){
+    smx_activity_t synchro = simgrid::kernel::activity::ActivityImplPtr(ptr);
     /* associate this simcall to the the synchro */
     synchro->simcalls.push_back(simcall);
 
@@ -430,11 +430,13 @@ void simcall_HANDLER_comm_waitany(smx_simcall_t simcall, xbt_dynar_t synchros, d
 
 void SIMIX_waitany_remove_simcall_from_actions(smx_simcall_t simcall)
 {
-  smx_activity_t synchro;
   unsigned int cursor = 0;
   xbt_dynar_t synchros = simcall_comm_waitany__get__comms(simcall);
 
-  xbt_dynar_foreach(synchros, cursor, synchro) {
+  simgrid::kernel::activity::ActivityImpl* ptr;
+  xbt_dynar_foreach(synchros, cursor, ptr){
+    smx_activity_t synchro = simgrid::kernel::activity::ActivityImplPtr(ptr);
+
     // Remove the first occurence of simcall:
     auto i = boost::range::find(synchro->simcalls, simcall);
     if (i !=  synchro->simcalls.end())
index a20b4af..6001f75 100644 (file)
@@ -530,7 +530,7 @@ int Request::testsome(int incount, MPI_Request requests[], int *indices, MPI_Sta
         count++;
         if (status != MPI_STATUSES_IGNORE)
           status[i] = *pstat;
-        if ((requests[i] != MPI_REQUEST_NULL) && requests[i]->flags_ & NON_PERSISTENT)
+        if ((requests[i] != MPI_REQUEST_NULL) && (requests[i]->flags_ & NON_PERSISTENT))
           requests[i] = MPI_REQUEST_NULL;
       }
     } else {
@@ -757,22 +757,23 @@ void Request::wait(MPI_Request * request, MPI_Status * status)
 int Request::waitany(int count, MPI_Request requests[], MPI_Status * status)
 {
   s_xbt_dynar_t comms; // Keep it on stack to save some extra mallocs
-  int i;
   int size = 0;
   int index = MPI_UNDEFINED;
-  int *map;
 
   if(count > 0) {
     // Wait for a request to complete
-    xbt_dynar_init(&comms, sizeof(smx_activity_t), nullptr);
-    map = xbt_new(int, count);
+    xbt_dynar_init(&comms, sizeof(smx_activity_t), [](void*ptr){
+      intrusive_ptr_release((simgrid::kernel::activity::ActivityImpl*)ptr);
+    });
+    int *map = xbt_new(int, count);
     XBT_DEBUG("Wait for one of %d", count);
-    for(i = 0; i < count; i++) {
+    for(int i = 0; i < count; i++) {
       if (requests[i] != MPI_REQUEST_NULL && not(requests[i]->flags_ & PREPARED) &&
           not(requests[i]->flags_ & FINISHED)) {
         if (requests[i]->action_ != nullptr) {
           XBT_DEBUG("Waiting any %p ", requests[i]);
-          xbt_dynar_push(&comms, &requests[i]->action_);
+          intrusive_ptr_add_ref(requests[i]->action_.get());
+          xbt_dynar_push_as(&comms, simgrid::kernel::activity::ActivityImpl*, requests[i]->action_.get());
           map[size] = i;
           size++;
         } else {
@@ -786,8 +787,9 @@ int Request::waitany(int count, MPI_Request requests[], MPI_Status * status)
         }
       }
     }
-    if(size > 0) {
-      i = simcall_comm_waitany(&comms, -1);
+    if (size > 0) {
+      XBT_DEBUG("Enter waitany for %lu comms", xbt_dynar_length(&comms));
+      int i = simcall_comm_waitany(&comms, -1);
 
       // not MPI_UNDEFINED, as this is a simix return code
       if (i != -1) {
@@ -822,7 +824,7 @@ int Request::waitall(int count, MPI_Request requests[], MPI_Status status[])
   std::vector<MPI_Request> accumulates;
   int index;
   MPI_Status stat;
-  MPI_Status *pstat = status == MPI_STATUSES_IGNORE ? MPI_STATUS_IGNORE : &stat;
+  MPI_Status *pstat = (status == MPI_STATUSES_IGNORE ? MPI_STATUS_IGNORE : &stat);
   int retvalue = MPI_SUCCESS;
   //tag invalid requests in the set
   if (status != MPI_STATUSES_IGNORE) {
@@ -840,7 +842,7 @@ int Request::waitall(int count, MPI_Request requests[], MPI_Status status[])
       wait(&requests[c],pstat);
       index = c;
     } else {
-      index = waitany(count, requests, pstat);
+      index = waitany(count, (MPI_Request*)requests, pstat);
       if (index == MPI_UNDEFINED)
         break;