Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
Use type aid_t for process id. in instr_smpi.
[simgrid.git] / src / smpi / mpi / smpi_request.cpp
index 14bc5ce..9bc54ed 100644 (file)
@@ -9,6 +9,8 @@
 #include "private.hpp"
 #include "simgrid/Exception.hpp"
 #include "simgrid/s4u/Exec.hpp"
+#include "simgrid/s4u/Mutex.hpp"
+#include "simgrid/s4u/ConditionVariable.hpp"
 #include "smpi_comm.hpp"
 #include "smpi_datatype.hpp"
 #include "smpi_host.hpp"
@@ -73,31 +75,29 @@ void Request::ref(){
 
 void Request::unref(MPI_Request* request)
 {
-  if((*request) != MPI_REQUEST_NULL){
-    (*request)->refcount_--;
-    if((*request)->refcount_ < 0) {
-      (*request)->print_request("wrong refcount");
-      xbt_die("Whoops, wrong refcount");
-    }
-    if((*request)->refcount_==0){
-      if ((*request)->flags_ & MPI_REQ_GENERALIZED){
-        ((*request)->generalized_funcs)->free_fn(((*request)->generalized_funcs)->extra_state);
-      }else{
-        Comm::unref((*request)->comm_);
-        Datatype::unref((*request)->old_type_);
-      }
-      if ((*request)->op_!=MPI_REPLACE && (*request)->op_!=MPI_OP_NULL)
-        Op::unref(&(*request)->op_);
+  xbt_assert(*request != MPI_REQUEST_NULL, "freeing an already free request");
 
-      (*request)->print_request("Destroying");
-      F2C::free_f((*request)->c2f());
-      delete *request;
-      *request = MPI_REQUEST_NULL;
-    }else{
-      (*request)->print_request("Decrementing");
+  (*request)->refcount_--;
+  if ((*request)->refcount_ < 0) {
+    (*request)->print_request("wrong refcount");
+    xbt_die("Whoops, wrong refcount");
+  }
+  if ((*request)->refcount_ == 0) {
+    if ((*request)->flags_ & MPI_REQ_GENERALIZED) {
+      ((*request)->generalized_funcs)->free_fn(((*request)->generalized_funcs)->extra_state);
+    } else {
+      Comm::unref((*request)->comm_);
+      Datatype::unref((*request)->old_type_);
     }
-  }else{
-    xbt_die("freeing an already free request");
+    if ((*request)->op_ != MPI_REPLACE && (*request)->op_ != MPI_OP_NULL)
+      Op::unref(&(*request)->op_);
+
+    (*request)->print_request("Destroying");
+    F2C::free_f((*request)->f2c_id());
+    delete *request;
+    *request = MPI_REQUEST_NULL;
+  } else {
+    (*request)->print_request("Decrementing");
   }
 }
 
@@ -119,9 +119,13 @@ bool Request::match_common(MPI_Request req, MPI_Request sender, MPI_Request rece
       receiver->real_src_ = sender->src_;
     if (receiver->tag_ == MPI_ANY_TAG)
       receiver->real_tag_ = sender->tag_;
-    if (receiver->real_size_ < sender->real_size_ && ((receiver->flags_ & MPI_REQ_PROBE) == 0 )){
-      XBT_DEBUG("Truncating message - should not happen: receiver size : %zu < sender size : %zu", receiver->real_size_, sender->real_size_);
-      receiver->truncated_ = true;
+    if ((receiver->flags_ & MPI_REQ_PROBE) == 0 ){
+      if (receiver->real_size_ < sender->real_size_){
+        XBT_DEBUG("Truncating message - should not happen: receiver size : %zu < sender size : %zu", receiver->real_size_, sender->real_size_);
+        receiver->truncated_ = true;
+      } else if (receiver->real_size_ > sender->real_size_){
+        receiver->real_size_=sender->real_size_;
+      }
     }
     if (sender->detached_)
       receiver->detached_sender_ = sender; // tie the sender to the receiver, as it is detached and has to be freed in
@@ -176,28 +180,28 @@ void Request::print_request(const char* message) const
 MPI_Request Request::bsend_init(const void *buf, int count, MPI_Datatype datatype, int dst, int tag, MPI_Comm comm)
 {
   return new Request(buf == MPI_BOTTOM ? nullptr : buf, count, datatype, simgrid::s4u::this_actor::get_pid(),
-                     dst != MPI_PROC_NULL ? comm->group()->actor(dst)->get_pid() : MPI_PROC_NULL, tag, comm,
+                     dst != MPI_PROC_NULL ? comm->group()->actor(dst) : MPI_PROC_NULL, tag, comm,
                      MPI_REQ_PERSISTENT | MPI_REQ_SEND | MPI_REQ_PREPARED | MPI_REQ_BSEND);
 }
 
 MPI_Request Request::send_init(const void *buf, int count, MPI_Datatype datatype, int dst, int tag, MPI_Comm comm)
 {
   return new Request(buf == MPI_BOTTOM ? nullptr : buf, count, datatype, simgrid::s4u::this_actor::get_pid(),
-                     dst != MPI_PROC_NULL ? comm->group()->actor(dst)->get_pid() : MPI_PROC_NULL, tag, comm,
+                     dst != MPI_PROC_NULL ? comm->group()->actor(dst) : MPI_PROC_NULL, tag, comm,
                      MPI_REQ_PERSISTENT | MPI_REQ_SEND | MPI_REQ_PREPARED);
 }
 
 MPI_Request Request::ssend_init(const void *buf, int count, MPI_Datatype datatype, int dst, int tag, MPI_Comm comm)
 {
   return new Request(buf == MPI_BOTTOM ? nullptr : buf, count, datatype, simgrid::s4u::this_actor::get_pid(),
-                     dst != MPI_PROC_NULL ? comm->group()->actor(dst)->get_pid() : MPI_PROC_NULL, tag, comm,
+                     dst != MPI_PROC_NULL ? comm->group()->actor(dst) : MPI_PROC_NULL, tag, comm,
                      MPI_REQ_PERSISTENT | MPI_REQ_SSEND | MPI_REQ_SEND | MPI_REQ_PREPARED);
 }
 
 MPI_Request Request::isend_init(const void *buf, int count, MPI_Datatype datatype, int dst, int tag, MPI_Comm comm)
 {
   return new Request(buf == MPI_BOTTOM ? nullptr : buf, count, datatype, simgrid::s4u::this_actor::get_pid(),
-                     dst != MPI_PROC_NULL ? comm->group()->actor(dst)->get_pid() : MPI_PROC_NULL, tag, comm,
+                     dst != MPI_PROC_NULL ? comm->group()->actor(dst) : MPI_PROC_NULL, tag, comm,
                      MPI_REQ_PERSISTENT | MPI_REQ_ISEND | MPI_REQ_SEND | MPI_REQ_PREPARED);
 }
 
@@ -207,14 +211,15 @@ MPI_Request Request::rma_send_init(const void *buf, int count, MPI_Datatype data
 {
   MPI_Request request = nullptr; /* MC needs the comm to be set to nullptr during the call */
   if(op==MPI_OP_NULL){
-    request = new Request(buf == MPI_BOTTOM ? nullptr : buf, count, datatype, comm->group()->actor(src)->get_pid(),
-                          dst != MPI_PROC_NULL ? comm->group()->actor(dst)->get_pid() : MPI_PROC_NULL, tag, comm,
+    request = new Request(buf == MPI_BOTTOM ? nullptr : buf, count, datatype, comm->group()->actor(src),
+                          dst != MPI_PROC_NULL ? comm->group()->actor(dst) : MPI_PROC_NULL, tag, comm,
                           MPI_REQ_RMA | MPI_REQ_NON_PERSISTENT | MPI_REQ_ISEND | MPI_REQ_SEND | MPI_REQ_PREPARED);
   }else{
-    request      = new Request(buf == MPI_BOTTOM ? nullptr : buf, count, datatype, comm->group()->actor(src)->get_pid(),
-                          dst != MPI_PROC_NULL ? comm->group()->actor(dst)->get_pid() : MPI_PROC_NULL, tag, comm,
+    request = new Request(buf == MPI_BOTTOM ? nullptr : buf, count, datatype, comm->group()->actor(src),
+                          dst != MPI_PROC_NULL ? comm->group()->actor(dst) : MPI_PROC_NULL, tag, comm,
                           MPI_REQ_RMA | MPI_REQ_NON_PERSISTENT | MPI_REQ_ISEND | MPI_REQ_SEND | MPI_REQ_PREPARED |
-                              MPI_REQ_ACCUMULATE, op);
+                              MPI_REQ_ACCUMULATE,
+                          op);
   }
   return request;
 }
@@ -225,7 +230,7 @@ MPI_Request Request::recv_init(void *buf, int count, MPI_Datatype datatype, int
   if (src == MPI_ANY_SOURCE)
     source = MPI_ANY_SOURCE;
   else if (src != MPI_PROC_NULL)
-    source = comm->group()->actor(src)->get_pid();
+    source = comm->group()->actor(src);
   return new Request(buf == MPI_BOTTOM ? nullptr : buf, count, datatype,
                      source,
                      simgrid::s4u::this_actor::get_pid(), tag, comm,
@@ -240,15 +245,16 @@ MPI_Request Request::rma_recv_init(void *buf, int count, MPI_Datatype datatype,
   if (src == MPI_ANY_SOURCE)
     source = MPI_ANY_SOURCE;
   else if (src != MPI_PROC_NULL)
-    source = comm->group()->actor(src)->get_pid();
+    source = comm->group()->actor(src);
   if(op==MPI_OP_NULL){
     request = new Request(buf == MPI_BOTTOM ? nullptr : buf, count, datatype, source,
-                          dst != MPI_PROC_NULL ? comm->group()->actor(dst)->get_pid() : MPI_PROC_NULL, tag, comm,
+                          dst != MPI_PROC_NULL ? comm->group()->actor(dst) : MPI_PROC_NULL, tag, comm,
                           MPI_REQ_RMA | MPI_REQ_NON_PERSISTENT | MPI_REQ_RECV | MPI_REQ_PREPARED);
   }else{
-    request      = new Request(buf == MPI_BOTTOM ? nullptr : buf, count, datatype, source,
-                          dst != MPI_PROC_NULL ? comm->group()->actor(dst)->get_pid() : MPI_PROC_NULL, tag, comm,
-                          MPI_REQ_RMA | MPI_REQ_NON_PERSISTENT | MPI_REQ_RECV | MPI_REQ_PREPARED | MPI_REQ_ACCUMULATE, op);
+    request =
+        new Request(buf == MPI_BOTTOM ? nullptr : buf, count, datatype, source,
+                    dst != MPI_PROC_NULL ? comm->group()->actor(dst) : MPI_PROC_NULL, tag, comm,
+                    MPI_REQ_RMA | MPI_REQ_NON_PERSISTENT | MPI_REQ_RECV | MPI_REQ_PREPARED | MPI_REQ_ACCUMULATE, op);
   }
   return request;
 }
@@ -259,7 +265,7 @@ MPI_Request Request::irecv_init(void *buf, int count, MPI_Datatype datatype, int
   if (src == MPI_ANY_SOURCE)
     source = MPI_ANY_SOURCE;
   else if (src != MPI_PROC_NULL)
-    source = comm->group()->actor(src)->get_pid();
+    source = comm->group()->actor(src);
   return new Request(buf == MPI_BOTTOM ? nullptr : buf, count, datatype,
                      source, simgrid::s4u::this_actor::get_pid(), tag, comm,
                      MPI_REQ_PERSISTENT | MPI_REQ_RECV | MPI_REQ_PREPARED);
@@ -269,7 +275,7 @@ MPI_Request Request::ibsend(const void *buf, int count, MPI_Datatype datatype, i
 {
   MPI_Request request = nullptr; /* MC needs the comm to be set to nullptr during the call */
   request = new Request(buf == MPI_BOTTOM ? nullptr : buf, count, datatype, simgrid::s4u::this_actor::get_pid(),
-                        dst != MPI_PROC_NULL ? comm->group()->actor(dst)->get_pid() : MPI_PROC_NULL, tag, comm,
+                        dst != MPI_PROC_NULL ? comm->group()->actor(dst) : MPI_PROC_NULL, tag, comm,
                         MPI_REQ_NON_PERSISTENT | MPI_REQ_ISEND | MPI_REQ_SEND | MPI_REQ_BSEND);
   if(dst != MPI_PROC_NULL)
     request->start();
@@ -280,7 +286,7 @@ MPI_Request Request::isend(const void *buf, int count, MPI_Datatype datatype, in
 {
   MPI_Request request = nullptr; /* MC needs the comm to be set to nullptr during the call */
   request = new Request(buf == MPI_BOTTOM ? nullptr : buf, count, datatype, simgrid::s4u::this_actor::get_pid(),
-                        dst != MPI_PROC_NULL ? comm->group()->actor(dst)->get_pid() : MPI_PROC_NULL, tag, comm,
+                        dst != MPI_PROC_NULL ? comm->group()->actor(dst) : MPI_PROC_NULL, tag, comm,
                         MPI_REQ_NON_PERSISTENT | MPI_REQ_ISEND | MPI_REQ_SEND);
   if(dst != MPI_PROC_NULL)
     request->start();
@@ -291,7 +297,7 @@ MPI_Request Request::issend(const void *buf, int count, MPI_Datatype datatype, i
 {
   MPI_Request request = nullptr; /* MC needs the comm to be set to nullptr during the call */
   request = new Request(buf == MPI_BOTTOM ? nullptr : buf, count, datatype, simgrid::s4u::this_actor::get_pid(),
-                        dst != MPI_PROC_NULL ? comm->group()->actor(dst)->get_pid() : MPI_PROC_NULL, tag, comm,
+                        dst != MPI_PROC_NULL ? comm->group()->actor(dst) : MPI_PROC_NULL, tag, comm,
                         MPI_REQ_NON_PERSISTENT | MPI_REQ_ISEND | MPI_REQ_SSEND | MPI_REQ_SEND);
   if(dst != MPI_PROC_NULL)
     request->start();
@@ -306,7 +312,7 @@ MPI_Request Request::irecv(void *buf, int count, MPI_Datatype datatype, int src,
   if (src == MPI_ANY_SOURCE)
     source = MPI_ANY_SOURCE;
   else if (src != MPI_PROC_NULL)
-    source = comm->group()->actor(src)->get_pid();
+    source = comm->group()->actor(src);
   request             = new Request(buf == MPI_BOTTOM ? nullptr : buf, count, datatype,
                         source, simgrid::s4u::this_actor::get_pid(), tag, comm, 
                         MPI_REQ_NON_PERSISTENT | MPI_REQ_RECV);
@@ -315,20 +321,21 @@ MPI_Request Request::irecv(void *buf, int count, MPI_Datatype datatype, int src,
   return request;
 }
 
-void Request::recv(void *buf, int count, MPI_Datatype datatype, int src, int tag, MPI_Comm comm, MPI_Status * status)
+int Request::recv(void *buf, int count, MPI_Datatype datatype, int src, int tag, MPI_Comm comm, MPI_Status * status)
 {
   MPI_Request request = nullptr; /* MC needs the comm to be set to nullptr during the call */
   request = irecv(buf, count, datatype, src, tag, comm);
-  wait(&request,status);
+  int retval = wait(&request,status);
   request = nullptr;
+  return retval;
 }
 
 void Request::bsend(const void *buf, int count, MPI_Datatype datatype, int dst, int tag, MPI_Comm comm)
 {
   MPI_Request request = nullptr; /* MC needs the comm to be set to nullptr during the call */
   request = new Request(buf == MPI_BOTTOM ? nullptr : buf, count, datatype, simgrid::s4u::this_actor::get_pid(),
-                        dst != MPI_PROC_NULL ? comm->group()->actor(dst)->get_pid() : MPI_PROC_NULL, 
-                        tag, comm, MPI_REQ_NON_PERSISTENT | MPI_REQ_SEND | MPI_REQ_BSEND);
+                        dst != MPI_PROC_NULL ? comm->group()->actor(dst) : MPI_PROC_NULL, tag, comm,
+                        MPI_REQ_NON_PERSISTENT | MPI_REQ_SEND | MPI_REQ_BSEND);
 
   if(dst != MPI_PROC_NULL)
    request->start();
@@ -340,8 +347,8 @@ void Request::send(const void *buf, int count, MPI_Datatype datatype, int dst, i
 {
   MPI_Request request = nullptr; /* MC needs the comm to be set to nullptr during the call */
   request = new Request(buf == MPI_BOTTOM ? nullptr : buf, count, datatype, simgrid::s4u::this_actor::get_pid(),
-                        dst != MPI_PROC_NULL ? comm->group()->actor(dst)->get_pid() : MPI_PROC_NULL, 
-                        tag, comm, MPI_REQ_NON_PERSISTENT | MPI_REQ_SEND);
+                        dst != MPI_PROC_NULL ? comm->group()->actor(dst) : MPI_PROC_NULL, tag, comm,
+                        MPI_REQ_NON_PERSISTENT | MPI_REQ_SEND);
   if(dst != MPI_PROC_NULL)
    request->start();
   wait(&request, MPI_STATUS_IGNORE);
@@ -352,7 +359,7 @@ void Request::ssend(const void *buf, int count, MPI_Datatype datatype, int dst,
 {
   MPI_Request request = nullptr; /* MC needs the comm to be set to nullptr during the call */
   request = new Request(buf == MPI_BOTTOM ? nullptr : buf, count, datatype, simgrid::s4u::this_actor::get_pid(),
-                        dst != MPI_PROC_NULL ? comm->group()->actor(dst)->get_pid() : MPI_PROC_NULL, tag, comm,
+                        dst != MPI_PROC_NULL ? comm->group()->actor(dst) : MPI_PROC_NULL, tag, comm,
                         MPI_REQ_NON_PERSISTENT | MPI_REQ_SSEND | MPI_REQ_SEND);
 
   if(dst != MPI_PROC_NULL)
@@ -369,8 +376,8 @@ void Request::sendrecv(const void *sendbuf, int sendcount, MPI_Datatype sendtype
   if (src == MPI_ANY_SOURCE)
     source = MPI_ANY_SOURCE;
   else if (src != MPI_PROC_NULL)
-    source = comm->group()->actor(src)->get_pid();
-  int destination = dst != MPI_PROC_NULL ? comm->group()->actor(dst)->get_pid() : MPI_PROC_NULL;
+    source = comm->group()->actor(src);
+  int destination = dst != MPI_PROC_NULL ? comm->group()->actor(dst) : MPI_PROC_NULL;
 
   std::array<MPI_Request, 2> requests;
   std::array<MPI_Status, 2> stats;
@@ -466,10 +473,8 @@ void Request::start()
   } else { /* the RECV flag was not set, so this is a send */
     const simgrid::smpi::ActorExt* process = smpi_process_remote(simgrid::s4u::Actor::by_pid(dst_));
     xbt_assert(process, "Actor pid=%d is gone??", dst_);
-    int rank = src_;
-    if (TRACE_smpi_view_internals()) {
-      TRACE_smpi_send(rank, rank, dst_, tag_, size_);
-    }
+    if (TRACE_smpi_view_internals())
+      TRACE_smpi_send(src_, src_, dst_, tag_, size_);
     this->print_request("New send");
 
     void* buf = buf_;
@@ -798,9 +803,9 @@ void Request::iprobe(int source, int tag, MPI_Comm comm, int* flag, MPI_Status*
   static int nsleeps = 1;
   double speed        = s4u::this_actor::get_host()->get_speed();
   double maxrate      = smpi_cfg_iprobe_cpu_usage();
-  auto request        = new Request(nullptr, 0, MPI_CHAR,
-                             source == MPI_ANY_SOURCE ? MPI_ANY_SOURCE : comm->group()->actor(source)->get_pid(),
-                             simgrid::s4u::this_actor::get_pid(), tag, comm, MPI_REQ_PERSISTENT | MPI_REQ_RECV | MPI_REQ_PROBE);
+  auto request =
+      new Request(nullptr, 0, MPI_CHAR, source == MPI_ANY_SOURCE ? MPI_ANY_SOURCE : comm->group()->actor(source),
+                  simgrid::s4u::this_actor::get_pid(), tag, comm, MPI_REQ_PERSISTENT | MPI_REQ_RECV | MPI_REQ_PROBE);
   if (smpi_iprobe_sleep > 0) {
     /** Compute the number of flops we will sleep **/
     s4u::this_actor::exec_init(/*nsleeps: See comment above */ nsleeps *
@@ -914,8 +919,8 @@ void Request::finish_wait(MPI_Request* request, MPI_Status * status)
   }
 
   if (TRACE_smpi_view_internals() && ((req->flags_ & MPI_REQ_RECV) != 0)) {
-    int rank       = simgrid::s4u::this_actor::get_pid();
-    int src_traced = (req->src_ == MPI_ANY_SOURCE ? req->real_src_ : req->src_);
+    aid_t rank       = simgrid::s4u::this_actor::get_pid();
+    aid_t src_traced = (req->src_ == MPI_ANY_SOURCE ? req->real_src_ : req->src_);
     TRACE_smpi_recv(src_traced, rank,req->tag_);
   }
   if(req->detached_sender_ != nullptr){
@@ -1025,6 +1030,9 @@ int Request::wait(MPI_Request * request, MPI_Status * status)
     ret = ((*request)->generalized_funcs)->query_fn(((*request)->generalized_funcs)->extra_state, mystatus);
   }
 
+  if ((*request)->truncated_)
+    ret = MPI_ERR_TRUNCATE;
+
   finish_wait(request, status); // may invalidate *request
   if (*request != MPI_REQUEST_NULL && (((*request)->flags_ & MPI_REQ_NON_PERSISTENT) != 0))
     *request = MPI_REQUEST_NULL;