AND — Algorithmique Numérique Distribuée

Public GIT Repository
smpi: fix issue with message IDs. In case of persistent request reused multiple times...
[simgrid.git] / src / smpi / mpi / smpi_request.cpp
index e62c37d..86130de 100644 (file)
@@ -69,7 +69,6 @@ Request::Request(const void* buf, int count, MPI_Datatype datatype, aid_t src, a
     refcount_ = 1;
   else
     refcount_ = 0;
-  message_id_ = 0;
   init_buffer(count);
   this->add_f();
 }
@@ -186,10 +185,11 @@ bool Request::match_recv(void* a, void* b, simgrid::kernel::activity::CommImpl*)
   bool match = match_common(req, req, ref);
   if (not match || ref->comm_ == MPI_COMM_UNINITIALIZED || ref->comm_->is_smp_comm())
     return match;
-
-  if (ref->comm_->get_received_messages_count(ref->comm_->group()->rank(req->src_),
-                                              ref->comm_->group()->rank(req->dst_), req->tag_) == req->message_id_) {
+  auto it = std::find(req->message_id_.begin(), req->message_id_.end(), ref->comm_->get_received_messages_count(ref->comm_->group()->rank(req->src_),
+                                              ref->comm_->group()->rank(req->dst_), req->tag_));
+  if (it != req->message_id_.end()) {
     if (((ref->flags_ & MPI_REQ_PROBE) == 0) && ((req->flags_ & MPI_REQ_PROBE) == 0)) {
+      req->message_id_.erase(it);
       XBT_DEBUG("increasing count in comm %p, which was %u from pid %ld, to pid %ld with tag %d", ref->comm_,
                 ref->comm_->get_received_messages_count(ref->comm_->group()->rank(req->src_),
                                                         ref->comm_->group()->rank(req->dst_), req->tag_),
@@ -204,12 +204,12 @@ bool Request::match_recv(void* a, void* b, simgrid::kernel::activity::CommImpl*)
     match = false;
     req->flags_ &= ~MPI_REQ_MATCHED;
     ref->detached_sender_ = nullptr;
-    XBT_DEBUG("Refusing to match message, as its ID is not the one I expect. in comm %p, %u != %u, "
+    XBT_DEBUG("Refusing to match message, as its ID is not the one I expect. in comm %p, %u, "
               "from pid %ld to pid %ld, with tag %d",
               ref->comm_,
               ref->comm_->get_received_messages_count(ref->comm_->group()->rank(req->src_),
                                                       ref->comm_->group()->rank(req->dst_), req->tag_),
-              req->message_id_, req->src_, req->dst_, req->tag_);
+              req->src_, req->dst_, req->tag_);
   }
   return match;
 }
@@ -510,7 +510,8 @@ void Request::start()
                                              process->replaying() ? &smpi_comm_null_copy_buffer_callback
                                                                   : smpi_comm_copy_data_callback,
                                              this,
-                                             -1.0};
+                                             -1.0,
+                                             process->call_location()->get_call_location()};
     observer.set_tag(tag_);
 
     action_ = kernel::actor::simcall_answered([&observer] { return kernel::activity::CommImpl::irecv(&observer); },
@@ -518,13 +519,13 @@ void Request::start()
 
     XBT_DEBUG("recv simcall posted");
   } else { /* the RECV flag was not set, so this is a send */
-    const simgrid::smpi::ActorExt* process = smpi_process_remote(simgrid::s4u::Actor::by_pid(dst_));
+    simgrid::smpi::ActorExt* process = smpi_process_remote(simgrid::s4u::Actor::by_pid(dst_));
     xbt_assert(process, "Actor pid=%ld is gone??", dst_);
     if (TRACE_smpi_view_internals())
       TRACE_smpi_send(src_, src_, dst_, tag_, size_);
     this->print_request("New send");
 
-    message_id_=comm_->get_sent_messages_count(comm_->group()->rank(src_), comm_->group()->rank(dst_), tag_);
+    message_id_.push_back(comm_->get_sent_messages_count(comm_->group()->rank(src_), comm_->group()->rank(dst_), tag_));
     comm_->increment_sent_messages_count(comm_->group()->rank(src_), comm_->group()->rank(dst_), tag_);
 
     void* buf = buf_;
@@ -609,7 +610,7 @@ void Request::start()
         &xbt_free_f, // how to free the userdata if a detached send fails
         process->replaying() ? &smpi_comm_null_copy_buffer_callback : smpi_comm_copy_data_callback, this,
         // detach if msg size < eager/rdv switch limit
-        detached_};
+        detached_, process->call_location()->get_call_location()};
     observer.set_tag(tag_);
     action_ = kernel::actor::simcall_answered([&observer] { return kernel::activity::CommImpl::isend(&observer); },
                                               &observer);
@@ -666,7 +667,9 @@ int Request::test(MPI_Request * request, MPI_Status * status, int* flag) {
     if ((*request)->action_ != nullptr && ((*request)->flags_ & MPI_REQ_CANCELLED) == 0){
       try{
         kernel::actor::ActorImpl* issuer = kernel::actor::ActorImpl::self();
-        kernel::actor::ActivityTestSimcall observer{issuer, (*request)->action_.get()};
+        simgrid::smpi::ActorExt* process = smpi_process_remote(simgrid::s4u::Actor::by_pid(issuer->get_pid()));
+        kernel::actor::ActivityTestSimcall observer{issuer, (*request)->action_.get(),
+                                                    process->call_location()->get_call_location()};
         *flag = kernel::actor::simcall_answered(
             [&observer] { return observer.get_activity()->test(observer.get_issuer()); }, &observer);
       } catch (const Exception&) {
@@ -755,7 +758,8 @@ int Request::testany(int count, MPI_Request requests[], int *index, int* flag, M
     ssize_t i;
     try{
       kernel::actor::ActorImpl* issuer = kernel::actor::ActorImpl::self();
-      kernel::actor::ActivityTestanySimcall observer{issuer, comms};
+      simgrid::smpi::ActorExt* process = smpi_process_remote(simgrid::s4u::Actor::by_pid(issuer->get_pid()));
+      kernel::actor::ActivityTestanySimcall observer{issuer, comms, process->call_location()->get_call_location()};
       i = kernel::actor::simcall_answered(
           [&observer] {
             return kernel::activity::ActivityImpl::test_any(observer.get_issuer(), observer.get_activities());
@@ -1071,7 +1075,9 @@ int Request::wait(MPI_Request * request, MPI_Status * status)
       try{
         // this is not a detached send
         kernel::actor::ActorImpl* issuer = kernel::actor::ActorImpl::self();
-        kernel::actor::ActivityWaitSimcall observer{issuer, (*request)->action_.get(), -1};
+        simgrid::smpi::ActorExt* process = smpi_process_remote(simgrid::s4u::Actor::by_pid(issuer->get_pid()));
+        kernel::actor::ActivityWaitSimcall observer{issuer, (*request)->action_.get(), -1,
+                                                    process->call_location()->get_call_location()};
         kernel::actor::simcall_blocking([issuer, &observer] { observer.get_activity()->wait_for(issuer, -1); },
                                         &observer);
       } catch (const CancelException&) {
@@ -1141,7 +1147,9 @@ int Request::waitany(int count, MPI_Request requests[], MPI_Status * status)
       ssize_t i;
       try{
         kernel::actor::ActorImpl* issuer = kernel::actor::ActorImpl::self();
-        kernel::actor::ActivityWaitanySimcall observer{issuer, comms, -1};
+        simgrid::smpi::ActorExt* process = smpi_process_remote(simgrid::s4u::Actor::by_pid(issuer->get_pid()));
+        kernel::actor::ActivityWaitanySimcall observer{issuer, comms, -1,
+                                                       process->call_location()->get_call_location()};
         i = kernel::actor::simcall_blocking(
             [&observer] {
               kernel::activity::ActivityImpl::wait_any_for(observer.get_issuer(), observer.get_activities(),