Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
mv ActorImpl where it belongs
[simgrid.git] / src / smpi / mpi / smpi_request.cpp
index f0f5184..8826c7c 100644 (file)
@@ -15,7 +15,6 @@
 #include "smpi_op.hpp"
 #include "src/kernel/activity/CommImpl.hpp"
 #include "src/mc/mc_replay.hpp"
-#include "src/simix/ActorImpl.hpp"
 #include "src/smpi/include/smpi_actor.hpp"
 #include "xbt/config.hpp"
 
@@ -30,7 +29,7 @@ static simgrid::config::Flag<double> smpi_test_sleep(
 
 std::vector<s_smpi_factor_t> smpi_ois_values;
 
-extern void (*smpi_comm_copy_data_callback) (smx_activity_t, void*, size_t);
+extern void (*smpi_comm_copy_data_callback)(simgrid::kernel::activity::CommImpl*, void*, size_t);
 
 namespace simgrid{
 namespace smpi{
@@ -375,9 +374,9 @@ void Request::start()
 
     int async_small_thresh = simgrid::config::get_value<int>("smpi/async-small-thresh");
 
-    xbt_mutex_t mut = process->mailboxes_mutex();
+    simgrid::s4u::MutexPtr mut = process->mailboxes_mutex();
     if (async_small_thresh != 0 || (flags_ & MPI_REQ_RMA) != 0)
-      xbt_mutex_acquire(mut);
+      mut->lock();
 
     if (async_small_thresh == 0 && (flags_ & MPI_REQ_RMA) == 0) {
       mailbox = process->mailbox();
@@ -421,7 +420,7 @@ void Request::start()
     XBT_DEBUG("recv simcall posted");
 
     if (async_small_thresh != 0 || (flags_ & MPI_REQ_RMA) != 0)
-      xbt_mutex_release(mut);
+      mut->unlock();
   } else { /* the RECV flag was not set, so this is a send */
     simgrid::smpi::ActorExt* process = smpi_process_remote(simgrid::s4u::Actor::by_pid(dst_));
     int rank = src_;
@@ -470,10 +469,10 @@ void Request::start()
 
     int async_small_thresh = simgrid::config::get_value<int>("smpi/async-small-thresh");
 
-    xbt_mutex_t mut=process->mailboxes_mutex();
+    simgrid::s4u::MutexPtr mut = process->mailboxes_mutex();
 
     if (async_small_thresh != 0 || (flags_ & MPI_REQ_RMA) != 0)
-      xbt_mutex_acquire(mut);
+      mut->lock();
 
     if (not(async_small_thresh != 0 || (flags_ & MPI_REQ_RMA) != 0)) {
       mailbox = process->mailbox();
@@ -516,12 +515,12 @@ void Request::start()
 
     /* FIXME: detached sends are not traceable (action_ == nullptr) */
     if (action_ != nullptr) {
-      std::string category = TRACE_internal_smpi_get_category();
+      std::string category = smpi_process()->get_tracing_category();
       simgrid::simix::simcall([this, category] { this->action_->set_category(category); });
     }
 
     if (async_small_thresh != 0 || ((flags_ & MPI_REQ_RMA) != 0))
-      xbt_mutex_release(mut);
+      mut->unlock();
   }
 }
 
@@ -603,7 +602,7 @@ int Request::testsome(int incount, MPI_Request requests[], int *indices, MPI_Sta
 
 int Request::testany(int count, MPI_Request requests[], int *index, MPI_Status * status)
 {
-  std::vector<simgrid::kernel::activity::ActivityImplPtr> comms;
+  std::vector<simgrid::kernel::activity::CommImpl*> comms;
   comms.reserve(count);
 
   int i;
@@ -614,7 +613,7 @@ int Request::testany(int count, MPI_Request requests[], int *index, MPI_Status *
   std::vector<int> map; /** Maps all matching comms back to their location in requests **/
   for(i = 0; i < count; i++) {
     if ((requests[i] != MPI_REQUEST_NULL) && requests[i]->action_ && not(requests[i]->flags_ & MPI_REQ_PREPARED)) {
-      comms.push_back(requests[i]->action_);
+      comms.push_back(static_cast<simgrid::kernel::activity::CommImpl*>(requests[i]->action_.get()));
       map.push_back(i);
     }
   }
@@ -842,29 +841,24 @@ void Request::wait(MPI_Request * request, MPI_Status * status)
 
 int Request::waitany(int count, MPI_Request requests[], MPI_Status * status)
 {
-  s_xbt_dynar_t comms; // Keep it on stack to save some extra mallocs
+  std::vector<simgrid::kernel::activity::CommImpl*> comms;
+  comms.reserve(count);
   int index = MPI_UNDEFINED;
 
   if(count > 0) {
-    int size = 0;
     // Wait for a request to complete
-    xbt_dynar_init(&comms, sizeof(smx_activity_t), [](void*ptr){
-      intrusive_ptr_release(*(simgrid::kernel::activity::ActivityImpl**)ptr);
-    });
-    int *map = xbt_new(int, count);
+    std::vector<int> map;
     XBT_DEBUG("Wait for one of %d", count);
     for(int i = 0; i < count; i++) {
       if (requests[i] != MPI_REQUEST_NULL && not(requests[i]->flags_ & MPI_REQ_PREPARED) &&
           not(requests[i]->flags_ & MPI_REQ_FINISHED)) {
         if (requests[i]->action_ != nullptr) {
           XBT_DEBUG("Waiting any %p ", requests[i]);
-          intrusive_ptr_add_ref(requests[i]->action_.get());
-          xbt_dynar_push_as(&comms, simgrid::kernel::activity::ActivityImpl*, requests[i]->action_.get());
-          map[size] = i;
-          size++;
+          comms.push_back(static_cast<simgrid::kernel::activity::CommImpl*>(requests[i]->action_.get()));
+          map.push_back(i);
         } else {
           // This is a finished detached request, let's return this one
-          size  = 0; // so we free the dynar but don't do the waitany call
+          comms.clear(); // so we free don't do the waitany call
           index = i;
           finish_wait(&requests[i], status); // cleanup if refcount = 0
           if (requests[i] != MPI_REQUEST_NULL && (requests[i]->flags_ & MPI_REQ_NON_PERSISTENT))
@@ -873,12 +867,12 @@ int Request::waitany(int count, MPI_Request requests[], MPI_Status * status)
         }
       }
     }
-    if (size > 0) {
-      XBT_DEBUG("Enter waitany for %lu comms", xbt_dynar_length(&comms));
+    if (not comms.empty()) {
+      XBT_DEBUG("Enter waitany for %zu comms", comms.size());
       int i=MPI_UNDEFINED;
       try{
         // this is not a detached send
-        i = simcall_comm_waitany(&comms, -1);
+        i = simcall_comm_waitany(comms.data(), comms.size(), -1);
       }catch (xbt_ex& e) {
       XBT_INFO("request %d cancelled ",i);
         return i;
@@ -896,9 +890,6 @@ int Request::waitany(int count, MPI_Request requests[], MPI_Status * status)
         }
       }
     }
-
-    xbt_dynar_free_data(&comms);
-    xbt_free(map);
   }
 
   if (index==MPI_UNDEFINED)