Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
smpi: fix issue with message IDs. In case of persistent request reused multiple times...
[simgrid.git] / src / smpi / mpi / smpi_request.cpp
index 6b58173..86130de 100644 (file)
@@ -69,7 +69,6 @@ Request::Request(const void* buf, int count, MPI_Datatype datatype, aid_t src, a
     refcount_ = 1;
   else
     refcount_ = 0;
-  message_id_ = 0;
   init_buffer(count);
   this->add_f();
 }
@@ -181,15 +180,16 @@ void Request::init_buffer(int count){
 
 bool Request::match_recv(void* a, void* b, simgrid::kernel::activity::CommImpl*)
 {
-  auto ref = static_cast<MPI_Request>(a);
-  auto req = static_cast<MPI_Request>(b);
+  auto* ref  = static_cast<MPI_Request>(a);
+  auto* req  = static_cast<MPI_Request>(b);
   bool match = match_common(req, req, ref);
   if (not match || ref->comm_ == MPI_COMM_UNINITIALIZED || ref->comm_->is_smp_comm())
     return match;
-
-  if (ref->comm_->get_received_messages_count(ref->comm_->group()->rank(req->src_),
-                                              ref->comm_->group()->rank(req->dst_), req->tag_) == req->message_id_) {
+  auto it = std::find(req->message_id_.begin(), req->message_id_.end(), ref->comm_->get_received_messages_count(ref->comm_->group()->rank(req->src_),
+                                              ref->comm_->group()->rank(req->dst_), req->tag_));
+  if (it != req->message_id_.end()) {
     if (((ref->flags_ & MPI_REQ_PROBE) == 0) && ((req->flags_ & MPI_REQ_PROBE) == 0)) {
+      req->message_id_.erase(it);
       XBT_DEBUG("increasing count in comm %p, which was %u from pid %ld, to pid %ld with tag %d", ref->comm_,
                 ref->comm_->get_received_messages_count(ref->comm_->group()->rank(req->src_),
                                                         ref->comm_->group()->rank(req->dst_), req->tag_),
@@ -204,20 +204,20 @@ bool Request::match_recv(void* a, void* b, simgrid::kernel::activity::CommImpl*)
     match = false;
     req->flags_ &= ~MPI_REQ_MATCHED;
     ref->detached_sender_ = nullptr;
-    XBT_DEBUG("Refusing to match message, as its ID is not the one I expect. in comm %p, %u != %u, "
+    XBT_DEBUG("Refusing to match message, as its ID is not the one I expect. in comm %p, %u, "
               "from pid %ld to pid %ld, with tag %d",
               ref->comm_,
               ref->comm_->get_received_messages_count(ref->comm_->group()->rank(req->src_),
                                                       ref->comm_->group()->rank(req->dst_), req->tag_),
-              req->message_id_, req->src_, req->dst_, req->tag_);
+              req->src_, req->dst_, req->tag_);
   }
   return match;
 }
 
 bool Request::match_send(void* a, void* b, simgrid::kernel::activity::CommImpl*)
 {
-  auto ref = static_cast<MPI_Request>(a);
-  auto req = static_cast<MPI_Request>(b);
+  auto* ref = static_cast<MPI_Request>(a);
+  auto* req = static_cast<MPI_Request>(b);
   return match_common(req, ref, req);
 }
 
@@ -323,9 +323,9 @@ MPI_Request Request::irecv_init(void *buf, int count, MPI_Datatype datatype, int
 
 MPI_Request Request::ibsend(const void *buf, int count, MPI_Datatype datatype, int dst, int tag, MPI_Comm comm)
 {
-  auto request = new Request(buf == MPI_BOTTOM ? nullptr : buf, count, datatype, simgrid::s4u::this_actor::get_pid(),
-                             dst != MPI_PROC_NULL ? comm->group()->actor(dst) : MPI_PROC_NULL, tag, comm,
-                             MPI_REQ_NON_PERSISTENT | MPI_REQ_ISEND | MPI_REQ_SEND | MPI_REQ_BSEND);
+  auto* request = new Request(buf == MPI_BOTTOM ? nullptr : buf, count, datatype, simgrid::s4u::this_actor::get_pid(),
+                              dst != MPI_PROC_NULL ? comm->group()->actor(dst) : MPI_PROC_NULL, tag, comm,
+                              MPI_REQ_NON_PERSISTENT | MPI_REQ_ISEND | MPI_REQ_SEND | MPI_REQ_BSEND);
   if(dst != MPI_PROC_NULL)
     request->start();
   return request;
@@ -333,9 +333,9 @@ MPI_Request Request::ibsend(const void *buf, int count, MPI_Datatype datatype, i
 
 MPI_Request Request::isend(const void *buf, int count, MPI_Datatype datatype, int dst, int tag, MPI_Comm comm)
 {
-  auto request = new Request(buf == MPI_BOTTOM ? nullptr : buf, count, datatype, simgrid::s4u::this_actor::get_pid(),
-                             dst != MPI_PROC_NULL ? comm->group()->actor(dst) : MPI_PROC_NULL, tag, comm,
-                             MPI_REQ_NON_PERSISTENT | MPI_REQ_ISEND | MPI_REQ_SEND);
+  auto* request = new Request(buf == MPI_BOTTOM ? nullptr : buf, count, datatype, simgrid::s4u::this_actor::get_pid(),
+                              dst != MPI_PROC_NULL ? comm->group()->actor(dst) : MPI_PROC_NULL, tag, comm,
+                              MPI_REQ_NON_PERSISTENT | MPI_REQ_ISEND | MPI_REQ_SEND);
   if(dst != MPI_PROC_NULL)
     request->start();
   return request;
@@ -343,9 +343,9 @@ MPI_Request Request::isend(const void *buf, int count, MPI_Datatype datatype, in
 
 MPI_Request Request::issend(const void *buf, int count, MPI_Datatype datatype, int dst, int tag, MPI_Comm comm)
 {
-  auto request = new Request(buf == MPI_BOTTOM ? nullptr : buf, count, datatype, simgrid::s4u::this_actor::get_pid(),
-                             dst != MPI_PROC_NULL ? comm->group()->actor(dst) : MPI_PROC_NULL, tag, comm,
-                             MPI_REQ_NON_PERSISTENT | MPI_REQ_ISEND | MPI_REQ_SSEND | MPI_REQ_SEND);
+  auto* request = new Request(buf == MPI_BOTTOM ? nullptr : buf, count, datatype, simgrid::s4u::this_actor::get_pid(),
+                              dst != MPI_PROC_NULL ? comm->group()->actor(dst) : MPI_PROC_NULL, tag, comm,
+                              MPI_REQ_NON_PERSISTENT | MPI_REQ_ISEND | MPI_REQ_SSEND | MPI_REQ_SEND);
   if(dst != MPI_PROC_NULL)
     request->start();
   return request;
@@ -358,8 +358,8 @@ MPI_Request Request::irecv(void *buf, int count, MPI_Datatype datatype, int src,
     source = MPI_ANY_SOURCE;
   else if (src != MPI_PROC_NULL)
     source = comm->group()->actor(src);
-  auto request = new Request(buf == MPI_BOTTOM ? nullptr : buf, count, datatype, source,
-                             simgrid::s4u::this_actor::get_pid(), tag, comm, MPI_REQ_NON_PERSISTENT | MPI_REQ_RECV);
+  auto* request = new Request(buf == MPI_BOTTOM ? nullptr : buf, count, datatype, source,
+                              simgrid::s4u::this_actor::get_pid(), tag, comm, MPI_REQ_NON_PERSISTENT | MPI_REQ_RECV);
   if(src != MPI_PROC_NULL)
     request->start();
   return request;
@@ -374,9 +374,9 @@ int Request::recv(void *buf, int count, MPI_Datatype datatype, int src, int tag,
 
 void Request::bsend(const void *buf, int count, MPI_Datatype datatype, int dst, int tag, MPI_Comm comm)
 {
-  auto request = new Request(buf == MPI_BOTTOM ? nullptr : buf, count, datatype, simgrid::s4u::this_actor::get_pid(),
-                             dst != MPI_PROC_NULL ? comm->group()->actor(dst) : MPI_PROC_NULL, tag, comm,
-                             MPI_REQ_NON_PERSISTENT | MPI_REQ_SEND | MPI_REQ_BSEND);
+  auto* request = new Request(buf == MPI_BOTTOM ? nullptr : buf, count, datatype, simgrid::s4u::this_actor::get_pid(),
+                              dst != MPI_PROC_NULL ? comm->group()->actor(dst) : MPI_PROC_NULL, tag, comm,
+                              MPI_REQ_NON_PERSISTENT | MPI_REQ_SEND | MPI_REQ_BSEND);
 
   if(dst != MPI_PROC_NULL)
    request->start();
@@ -385,9 +385,9 @@ void Request::bsend(const void *buf, int count, MPI_Datatype datatype, int dst,
 
 void Request::send(const void *buf, int count, MPI_Datatype datatype, int dst, int tag, MPI_Comm comm)
 {
-  auto request = new Request(buf == MPI_BOTTOM ? nullptr : buf, count, datatype, simgrid::s4u::this_actor::get_pid(),
-                             dst != MPI_PROC_NULL ? comm->group()->actor(dst) : MPI_PROC_NULL, tag, comm,
-                             MPI_REQ_NON_PERSISTENT | MPI_REQ_SEND);
+  auto* request = new Request(buf == MPI_BOTTOM ? nullptr : buf, count, datatype, simgrid::s4u::this_actor::get_pid(),
+                              dst != MPI_PROC_NULL ? comm->group()->actor(dst) : MPI_PROC_NULL, tag, comm,
+                              MPI_REQ_NON_PERSISTENT | MPI_REQ_SEND);
   if(dst != MPI_PROC_NULL)
    request->start();
   wait(&request, MPI_STATUS_IGNORE);
@@ -395,9 +395,9 @@ void Request::send(const void *buf, int count, MPI_Datatype datatype, int dst, i
 
 void Request::ssend(const void *buf, int count, MPI_Datatype datatype, int dst, int tag, MPI_Comm comm)
 {
-  auto request = new Request(buf == MPI_BOTTOM ? nullptr : buf, count, datatype, simgrid::s4u::this_actor::get_pid(),
-                             dst != MPI_PROC_NULL ? comm->group()->actor(dst) : MPI_PROC_NULL, tag, comm,
-                             MPI_REQ_NON_PERSISTENT | MPI_REQ_SSEND | MPI_REQ_SEND);
+  auto* request = new Request(buf == MPI_BOTTOM ? nullptr : buf, count, datatype, simgrid::s4u::this_actor::get_pid(),
+                              dst != MPI_PROC_NULL ? comm->group()->actor(dst) : MPI_PROC_NULL, tag, comm,
+                              MPI_REQ_NON_PERSISTENT | MPI_REQ_SSEND | MPI_REQ_SEND);
 
   if(dst != MPI_PROC_NULL)
    request->start();
@@ -510,7 +510,8 @@ void Request::start()
                                              process->replaying() ? &smpi_comm_null_copy_buffer_callback
                                                                   : smpi_comm_copy_data_callback,
                                              this,
-                                             -1.0};
+                                             -1.0,
+                                             process->call_location()->get_call_location()};
     observer.set_tag(tag_);
 
     action_ = kernel::actor::simcall_answered([&observer] { return kernel::activity::CommImpl::irecv(&observer); },
@@ -518,13 +519,13 @@ void Request::start()
 
     XBT_DEBUG("recv simcall posted");
   } else { /* the RECV flag was not set, so this is a send */
-    const simgrid::smpi::ActorExt* process = smpi_process_remote(simgrid::s4u::Actor::by_pid(dst_));
+    simgrid::smpi::ActorExt* process = smpi_process_remote(simgrid::s4u::Actor::by_pid(dst_));
     xbt_assert(process, "Actor pid=%ld is gone??", dst_);
     if (TRACE_smpi_view_internals())
       TRACE_smpi_send(src_, src_, dst_, tag_, size_);
     this->print_request("New send");
 
-    message_id_=comm_->get_sent_messages_count(comm_->group()->rank(src_), comm_->group()->rank(dst_), tag_);
+    message_id_.push_back(comm_->get_sent_messages_count(comm_->group()->rank(src_), comm_->group()->rank(dst_), tag_));
     comm_->increment_sent_messages_count(comm_->group()->rank(src_), comm_->group()->rank(dst_), tag_);
 
     void* buf = buf_;
@@ -609,7 +610,7 @@ void Request::start()
         &xbt_free_f, // how to free the userdata if a detached send fails
         process->replaying() ? &smpi_comm_null_copy_buffer_callback : smpi_comm_copy_data_callback, this,
         // detach if msg size < eager/rdv switch limit
-        detached_};
+        detached_, process->call_location()->get_call_location()};
     observer.set_tag(tag_);
     action_ = kernel::actor::simcall_answered([&observer] { return kernel::activity::CommImpl::isend(&observer); },
                                               &observer);
@@ -666,7 +667,9 @@ int Request::test(MPI_Request * request, MPI_Status * status, int* flag) {
     if ((*request)->action_ != nullptr && ((*request)->flags_ & MPI_REQ_CANCELLED) == 0){
       try{
         kernel::actor::ActorImpl* issuer = kernel::actor::ActorImpl::self();
-        kernel::actor::ActivityTestSimcall observer{issuer, (*request)->action_.get()};
+        simgrid::smpi::ActorExt* process = smpi_process_remote(simgrid::s4u::Actor::by_pid(issuer->get_pid()));
+        kernel::actor::ActivityTestSimcall observer{issuer, (*request)->action_.get(),
+                                                    process->call_location()->get_call_location()};
         *flag = kernel::actor::simcall_answered(
             [&observer] { return observer.get_activity()->test(observer.get_issuer()); }, &observer);
       } catch (const Exception&) {
@@ -755,7 +758,8 @@ int Request::testany(int count, MPI_Request requests[], int *index, int* flag, M
     ssize_t i;
     try{
       kernel::actor::ActorImpl* issuer = kernel::actor::ActorImpl::self();
-      kernel::actor::ActivityTestanySimcall observer{issuer, comms};
+      simgrid::smpi::ActorExt* process = smpi_process_remote(simgrid::s4u::Actor::by_pid(issuer->get_pid()));
+      kernel::actor::ActivityTestanySimcall observer{issuer, comms, process->call_location()->get_call_location()};
       i = kernel::actor::simcall_answered(
           [&observer] {
             return kernel::activity::ActivityImpl::test_any(observer.get_issuer(), observer.get_activities());
@@ -857,7 +861,7 @@ void Request::iprobe(int source, int tag, MPI_Comm comm, int* flag, MPI_Status*
   static int nsleeps = 1;
   double speed        = s4u::this_actor::get_host()->get_speed();
   double maxrate      = smpi_cfg_iprobe_cpu_usage();
-  auto request =
+  auto* request =
       new Request(nullptr, 0, MPI_CHAR, source == MPI_ANY_SOURCE ? MPI_ANY_SOURCE : comm->group()->actor(source),
                   simgrid::s4u::this_actor::get_pid(), tag, comm, MPI_REQ_PERSISTENT | MPI_REQ_RECV | MPI_REQ_PROBE);
   if (smpi_iprobe_sleep > 0) {
@@ -1071,7 +1075,9 @@ int Request::wait(MPI_Request * request, MPI_Status * status)
       try{
         // this is not a detached send
         kernel::actor::ActorImpl* issuer = kernel::actor::ActorImpl::self();
-        kernel::actor::ActivityWaitSimcall observer{issuer, (*request)->action_.get(), -1};
+        simgrid::smpi::ActorExt* process = smpi_process_remote(simgrid::s4u::Actor::by_pid(issuer->get_pid()));
+        kernel::actor::ActivityWaitSimcall observer{issuer, (*request)->action_.get(), -1,
+                                                    process->call_location()->get_call_location()};
         kernel::actor::simcall_blocking([issuer, &observer] { observer.get_activity()->wait_for(issuer, -1); },
                                         &observer);
       } catch (const CancelException&) {
@@ -1141,7 +1147,9 @@ int Request::waitany(int count, MPI_Request requests[], MPI_Status * status)
       ssize_t i;
       try{
         kernel::actor::ActorImpl* issuer = kernel::actor::ActorImpl::self();
-        kernel::actor::ActivityWaitanySimcall observer{issuer, comms, -1};
+        simgrid::smpi::ActorExt* process = smpi_process_remote(simgrid::s4u::Actor::by_pid(issuer->get_pid()));
+        kernel::actor::ActivityWaitanySimcall observer{issuer, comms, -1,
+                                                       process->call_location()->get_call_location()};
         i = kernel::actor::simcall_blocking(
             [&observer] {
               kernel::activity::ActivityImpl::wait_any_for(observer.get_issuer(), observer.get_activities(),