Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
Use functions from ActorImpl when available.
[simgrid.git] / src / smpi / mpi / smpi_request.cpp
index fd86a4e..43d50fc 100644 (file)
@@ -61,7 +61,6 @@ Request::Request(const void* buf, int count, MPI_Datatype datatype, int src, int
     refcount_ = 1;
   else
     refcount_ = 0;
-  cancelled_ = 0;
   nbc_requests_=nullptr;
   nbc_requests_size_=0;
   init_buffer(count);
@@ -127,8 +126,7 @@ bool Request::match_common(MPI_Request req, MPI_Request sender, MPI_Request rece
     if (sender->detached_)
       receiver->detached_sender_ = sender; // tie the sender to the receiver, as it is detached and has to be freed in
                                            // the receiver
-    if (req->cancelled_ == 0)
-      req->cancelled_ = -1; // mark as uncancelable
+    req->flags_ |= MPI_REQ_MATCHED; // mark as impossible to cancel anymore
     XBT_DEBUG("match succeeded");
     return true;
   }
@@ -178,28 +176,28 @@ void Request::print_request(const char* message) const
 MPI_Request Request::bsend_init(const void *buf, int count, MPI_Datatype datatype, int dst, int tag, MPI_Comm comm)
 {
   return new Request(buf == MPI_BOTTOM ? nullptr : buf, count, datatype, simgrid::s4u::this_actor::get_pid(),
-                     comm->group()->actor(dst)->get_pid(), tag, comm,
+                     dst != MPI_PROC_NULL ? comm->group()->actor(dst)->get_pid() : MPI_PROC_NULL, tag, comm,
                      MPI_REQ_PERSISTENT | MPI_REQ_SEND | MPI_REQ_PREPARED | MPI_REQ_BSEND);
 }
 
 MPI_Request Request::send_init(const void *buf, int count, MPI_Datatype datatype, int dst, int tag, MPI_Comm comm)
 {
   return new Request(buf == MPI_BOTTOM ? nullptr : buf, count, datatype, simgrid::s4u::this_actor::get_pid(),
-                     comm->group()->actor(dst)->get_pid(), tag, comm,
+                     dst != MPI_PROC_NULL ? comm->group()->actor(dst)->get_pid() : MPI_PROC_NULL, tag, comm,
                      MPI_REQ_PERSISTENT | MPI_REQ_SEND | MPI_REQ_PREPARED);
 }
 
 MPI_Request Request::ssend_init(const void *buf, int count, MPI_Datatype datatype, int dst, int tag, MPI_Comm comm)
 {
   return new Request(buf == MPI_BOTTOM ? nullptr : buf, count, datatype, simgrid::s4u::this_actor::get_pid(),
-                     comm->group()->actor(dst)->get_pid(), tag, comm,
+                     dst != MPI_PROC_NULL ? comm->group()->actor(dst)->get_pid() : MPI_PROC_NULL, tag, comm,
                      MPI_REQ_PERSISTENT | MPI_REQ_SSEND | MPI_REQ_SEND | MPI_REQ_PREPARED);
 }
 
 MPI_Request Request::isend_init(const void *buf, int count, MPI_Datatype datatype, int dst, int tag, MPI_Comm comm)
 {
   return new Request(buf == MPI_BOTTOM ? nullptr : buf, count, datatype, simgrid::s4u::this_actor::get_pid(),
-                     comm->group()->actor(dst)->get_pid(), tag, comm,
+                     dst != MPI_PROC_NULL ? comm->group()->actor(dst)->get_pid() : MPI_PROC_NULL, tag, comm,
                      MPI_REQ_PERSISTENT | MPI_REQ_ISEND | MPI_REQ_SEND | MPI_REQ_PREPARED);
 }
 
@@ -210,11 +208,11 @@ MPI_Request Request::rma_send_init(const void *buf, int count, MPI_Datatype data
   MPI_Request request = nullptr; /* MC needs the comm to be set to nullptr during the call */
   if(op==MPI_OP_NULL){
     request = new Request(buf == MPI_BOTTOM ? nullptr : buf, count, datatype, comm->group()->actor(src)->get_pid(),
-                          comm->group()->actor(dst)->get_pid(), tag, comm,
+                          dst != MPI_PROC_NULL ? comm->group()->actor(dst)->get_pid() : MPI_PROC_NULL, tag, comm,
                           MPI_REQ_RMA | MPI_REQ_NON_PERSISTENT | MPI_REQ_ISEND | MPI_REQ_SEND | MPI_REQ_PREPARED);
   }else{
     request      = new Request(buf == MPI_BOTTOM ? nullptr : buf, count, datatype, comm->group()->actor(src)->get_pid(),
-                          comm->group()->actor(dst)->get_pid(), tag, comm,
+                          dst != MPI_PROC_NULL ? comm->group()->actor(dst)->get_pid() : MPI_PROC_NULL, tag, comm,
                           MPI_REQ_RMA | MPI_REQ_NON_PERSISTENT | MPI_REQ_ISEND | MPI_REQ_SEND | MPI_REQ_PREPARED |
                               MPI_REQ_ACCUMULATE, op);
   }
@@ -223,8 +221,13 @@ MPI_Request Request::rma_send_init(const void *buf, int count, MPI_Datatype data
 
 MPI_Request Request::recv_init(void *buf, int count, MPI_Datatype datatype, int src, int tag, MPI_Comm comm)
 {
+  int source = MPI_PROC_NULL;
+  if (src == MPI_ANY_SOURCE)
+    source = MPI_ANY_SOURCE;
+  else if (src != MPI_PROC_NULL)
+    source = comm->group()->actor(src)->get_pid();
   return new Request(buf == MPI_BOTTOM ? nullptr : buf, count, datatype,
-                     src == MPI_ANY_SOURCE ? MPI_ANY_SOURCE : comm->group()->actor(src)->get_pid(),
+                     source,
                      simgrid::s4u::this_actor::get_pid(), tag, comm,
                      MPI_REQ_PERSISTENT | MPI_REQ_RECV | MPI_REQ_PREPARED);
 }
@@ -233,13 +236,18 @@ MPI_Request Request::rma_recv_init(void *buf, int count, MPI_Datatype datatype,
                                MPI_Op op)
 {
   MPI_Request request = nullptr; /* MC needs the comm to be set to nullptr during the call */
+  int source = MPI_PROC_NULL;
+  if (src == MPI_ANY_SOURCE)
+    source = MPI_ANY_SOURCE;
+  else if (src != MPI_PROC_NULL)
+    source = comm->group()->actor(src)->get_pid();
   if(op==MPI_OP_NULL){
-    request = new Request(buf == MPI_BOTTOM ? nullptr : buf, count, datatype, comm->group()->actor(src)->get_pid(),
-                          comm->group()->actor(dst)->get_pid(), tag, comm,
+    request = new Request(buf == MPI_BOTTOM ? nullptr : buf, count, datatype, source,
+                          dst != MPI_PROC_NULL ? comm->group()->actor(dst)->get_pid() : MPI_PROC_NULL, tag, comm,
                           MPI_REQ_RMA | MPI_REQ_NON_PERSISTENT | MPI_REQ_RECV | MPI_REQ_PREPARED);
   }else{
-    request      = new Request(buf == MPI_BOTTOM ? nullptr : buf, count, datatype, comm->group()->actor(src)->get_pid(),
-                          comm->group()->actor(dst)->get_pid(), tag, comm,
+    request      = new Request(buf == MPI_BOTTOM ? nullptr : buf, count, datatype, source,
+                          dst != MPI_PROC_NULL ? comm->group()->actor(dst)->get_pid() : MPI_PROC_NULL, tag, comm,
                           MPI_REQ_RMA | MPI_REQ_NON_PERSISTENT | MPI_REQ_RECV | MPI_REQ_PREPARED | MPI_REQ_ACCUMULATE, op);
   }
   return request;
@@ -247,9 +255,13 @@ MPI_Request Request::rma_recv_init(void *buf, int count, MPI_Datatype datatype,
 
 MPI_Request Request::irecv_init(void *buf, int count, MPI_Datatype datatype, int src, int tag, MPI_Comm comm)
 {
+  int source = MPI_PROC_NULL;
+  if (src == MPI_ANY_SOURCE)
+    source = MPI_ANY_SOURCE;
+  else if (src != MPI_PROC_NULL)
+    source = comm->group()->actor(src)->get_pid();
   return new Request(buf == MPI_BOTTOM ? nullptr : buf, count, datatype,
-                     src == MPI_ANY_SOURCE ? MPI_ANY_SOURCE : comm->group()->actor(src)->get_pid(),
-                     simgrid::s4u::this_actor::get_pid(), tag, comm,
+                     source, simgrid::s4u::this_actor::get_pid(), tag, comm,
                      MPI_REQ_PERSISTENT | MPI_REQ_RECV | MPI_REQ_PREPARED);
 }
 
@@ -257,9 +269,10 @@ MPI_Request Request::ibsend(const void *buf, int count, MPI_Datatype datatype, i
 {
   MPI_Request request = nullptr; /* MC needs the comm to be set to nullptr during the call */
   request = new Request(buf == MPI_BOTTOM ? nullptr : buf, count, datatype, simgrid::s4u::this_actor::get_pid(),
-                        comm->group()->actor(dst)->get_pid(), tag, comm,
+                        dst != MPI_PROC_NULL ? comm->group()->actor(dst)->get_pid() : MPI_PROC_NULL, tag, comm,
                         MPI_REQ_NON_PERSISTENT | MPI_REQ_ISEND | MPI_REQ_SEND | MPI_REQ_BSEND);
-  request->start();
+  if(dst != MPI_PROC_NULL)
+    request->start();
   return request;
 }
 
@@ -267,9 +280,10 @@ MPI_Request Request::isend(const void *buf, int count, MPI_Datatype datatype, in
 {
   MPI_Request request = nullptr; /* MC needs the comm to be set to nullptr during the call */
   request = new Request(buf == MPI_BOTTOM ? nullptr : buf, count, datatype, simgrid::s4u::this_actor::get_pid(),
-                        comm->group()->actor(dst)->get_pid(), tag, comm,
+                        dst != MPI_PROC_NULL ? comm->group()->actor(dst)->get_pid() : MPI_PROC_NULL, tag, comm,
                         MPI_REQ_NON_PERSISTENT | MPI_REQ_ISEND | MPI_REQ_SEND);
-  request->start();
+  if(dst != MPI_PROC_NULL)
+    request->start();
   return request;
 }
 
@@ -277,9 +291,10 @@ MPI_Request Request::issend(const void *buf, int count, MPI_Datatype datatype, i
 {
   MPI_Request request = nullptr; /* MC needs the comm to be set to nullptr during the call */
   request = new Request(buf == MPI_BOTTOM ? nullptr : buf, count, datatype, simgrid::s4u::this_actor::get_pid(),
-                        comm->group()->actor(dst)->get_pid(), tag, comm,
+                        dst != MPI_PROC_NULL ? comm->group()->actor(dst)->get_pid() : MPI_PROC_NULL, tag, comm,
                         MPI_REQ_NON_PERSISTENT | MPI_REQ_ISEND | MPI_REQ_SSEND | MPI_REQ_SEND);
-  request->start();
+  if(dst != MPI_PROC_NULL)
+    request->start();
   return request;
 }
 
@@ -287,10 +302,16 @@ MPI_Request Request::issend(const void *buf, int count, MPI_Datatype datatype, i
 MPI_Request Request::irecv(void *buf, int count, MPI_Datatype datatype, int src, int tag, MPI_Comm comm)
 {
   MPI_Request request = nullptr; /* MC needs the comm to be set to nullptr during the call */
+  int source = MPI_PROC_NULL;
+  if (src == MPI_ANY_SOURCE)
+    source = MPI_ANY_SOURCE;
+  else if (src != MPI_PROC_NULL)
+    source = comm->group()->actor(src)->get_pid();
   request             = new Request(buf == MPI_BOTTOM ? nullptr : buf, count, datatype,
-                        src == MPI_ANY_SOURCE ? MPI_ANY_SOURCE : comm->group()->actor(src)->get_pid(),
-                        simgrid::s4u::this_actor::get_pid(), tag, comm, MPI_REQ_NON_PERSISTENT | MPI_REQ_RECV);
-  request->start();
+                        source, simgrid::s4u::this_actor::get_pid(), tag, comm, 
+                        MPI_REQ_NON_PERSISTENT | MPI_REQ_RECV);
+  if(src != MPI_PROC_NULL)
+    request->start();
   return request;
 }
 
@@ -306,9 +327,11 @@ void Request::bsend(const void *buf, int count, MPI_Datatype datatype, int dst,
 {
   MPI_Request request = nullptr; /* MC needs the comm to be set to nullptr during the call */
   request = new Request(buf == MPI_BOTTOM ? nullptr : buf, count, datatype, simgrid::s4u::this_actor::get_pid(),
-                        comm->group()->actor(dst)->get_pid(), tag, comm, MPI_REQ_NON_PERSISTENT | MPI_REQ_SEND | MPI_REQ_BSEND);
+                        dst != MPI_PROC_NULL ? comm->group()->actor(dst)->get_pid() : MPI_PROC_NULL, 
+                        tag, comm, MPI_REQ_NON_PERSISTENT | MPI_REQ_SEND | MPI_REQ_BSEND);
 
-  request->start();
+  if(dst != MPI_PROC_NULL)
+   request->start();
   wait(&request, MPI_STATUS_IGNORE);
   request = nullptr;
 }
@@ -317,9 +340,10 @@ void Request::send(const void *buf, int count, MPI_Datatype datatype, int dst, i
 {
   MPI_Request request = nullptr; /* MC needs the comm to be set to nullptr during the call */
   request = new Request(buf == MPI_BOTTOM ? nullptr : buf, count, datatype, simgrid::s4u::this_actor::get_pid(),
-                        comm->group()->actor(dst)->get_pid(), tag, comm, MPI_REQ_NON_PERSISTENT | MPI_REQ_SEND);
-
-  request->start();
+                        dst != MPI_PROC_NULL ? comm->group()->actor(dst)->get_pid() : MPI_PROC_NULL, 
+                        tag, comm, MPI_REQ_NON_PERSISTENT | MPI_REQ_SEND);
+  if(dst != MPI_PROC_NULL)
+   request->start();
   wait(&request, MPI_STATUS_IGNORE);
   request = nullptr;
 }
@@ -328,10 +352,11 @@ void Request::ssend(const void *buf, int count, MPI_Datatype datatype, int dst,
 {
   MPI_Request request = nullptr; /* MC needs the comm to be set to nullptr during the call */
   request = new Request(buf == MPI_BOTTOM ? nullptr : buf, count, datatype, simgrid::s4u::this_actor::get_pid(),
-                        comm->group()->actor(dst)->get_pid(), tag, comm,
+                        dst != MPI_PROC_NULL ? comm->group()->actor(dst)->get_pid() : MPI_PROC_NULL, tag, comm,
                         MPI_REQ_NON_PERSISTENT | MPI_REQ_SSEND | MPI_REQ_SEND);
 
-  request->start();
+  if(dst != MPI_PROC_NULL)
+   request->start();
   wait(&request,MPI_STATUS_IGNORE);
   request = nullptr;
 }
@@ -340,13 +365,20 @@ void Request::sendrecv(const void *sendbuf, int sendcount, MPI_Datatype sendtype
                        void *recvbuf, int recvcount, MPI_Datatype recvtype, int src, int recvtag,
                        MPI_Comm comm, MPI_Status * status)
 {
+  int source = MPI_PROC_NULL;
+  if (src == MPI_ANY_SOURCE)
+    source = MPI_ANY_SOURCE;
+  else if (src != MPI_PROC_NULL)
+    source = comm->group()->actor(src)->get_pid();
+  int destination = dst != MPI_PROC_NULL ? comm->group()->actor(dst)->get_pid() : MPI_PROC_NULL;
+
   std::array<MPI_Request, 2> requests;
   std::array<MPI_Status, 2> stats;
   int myid = simgrid::s4u::this_actor::get_pid();
-  if ((comm->group()->actor(dst)->get_pid() == myid) && (comm->group()->actor(src)->get_pid() == myid)) {
+  if ((destination == myid) && (source == myid)) {
     Datatype::copy(sendbuf, sendcount, sendtype, recvbuf, recvcount, recvtype);
     if (status != MPI_STATUS_IGNORE) {
-      status->MPI_SOURCE = src;
+      status->MPI_SOURCE = source;
       status->MPI_TAG    = recvtag;
       status->MPI_ERROR  = MPI_SUCCESS;
       status->count      = sendcount * sendtype->size();
@@ -515,8 +547,9 @@ void Request::start()
     }
 
     size_t payload_size_ = size_ + 16;//MPI enveloppe size (tag+dest+communicator)
-    action_   = simcall_comm_isend(
-        simgrid::s4u::Actor::by_pid(src_)->get_impl(), mailbox->get_impl(), payload_size_, -1.0, buf, real_size_, &match_send,
+    action_              = simcall_comm_isend(
+        simgrid::kernel::actor::ActorImpl::by_pid(src_), mailbox->get_impl(), payload_size_, -1.0, buf, real_size_,
+        &match_send,
         &xbt_free_f, // how to free the userdata if a detached send fails
         process->replaying() ? &smpi_comm_null_copy_buffer_callback : smpi_comm_copy_data_callback, this,
         // detach if msg size < eager/rdv switch limit
@@ -540,14 +573,14 @@ void Request::startall(int count, MPI_Request * requests)
     return;
 
   for(int i = 0; i < count; i++) {
-    requests[i]->start();
+    if(requests[i]->src_ != MPI_PROC_NULL && requests[i]->dst_ != MPI_PROC_NULL)
+      requests[i]->start();
   }
 }
 
 void Request::cancel()
 {
-  if(cancelled_!=-1)
-    cancelled_=1;
+  this->flags_ |= MPI_REQ_CANCELLED;
   if (this->action_ != nullptr)
     (boost::static_pointer_cast<simgrid::kernel::activity::CommImpl>(this->action_))->cancel();
 }
@@ -581,7 +614,7 @@ int Request::test(MPI_Request * request, MPI_Status * status, int* flag) {
   Status::empty(status);
   *flag = 1;
   if (((*request)->flags_ & (MPI_REQ_PREPARED | MPI_REQ_FINISHED)) == 0) {
-    if ((*request)->action_ != nullptr && (*request)->cancelled_ != 1){
+    if ((*request)->action_ != nullptr && ((*request)->flags_ & MPI_REQ_CANCELLED) == 0){
       try{
         *flag = simcall_comm_test((*request)->action_.get());
       } catch (const Exception&) {
@@ -824,8 +857,7 @@ void Request::finish_wait(MPI_Request* request, MPI_Status * status)
 {
   MPI_Request req = *request;
   Status::empty(status);
-  
-  if (req->cancelled_==1){
+  if((req->flags_ & MPI_REQ_CANCELLED) != 0 && (req->flags_ & MPI_REQ_MATCHED) == 0) {
     if (status!=MPI_STATUS_IGNORE)
       status->cancelled=1;
     if(req->detached_sender_ != nullptr)
@@ -835,11 +867,16 @@ void Request::finish_wait(MPI_Request* request, MPI_Status * status)
   }
 
   if ((req->flags_ & (MPI_REQ_PREPARED | MPI_REQ_GENERALIZED | MPI_REQ_FINISHED)) == 0) {
-    if(status != MPI_STATUS_IGNORE) {
-      int src = req->src_ == MPI_ANY_SOURCE ? req->real_src_ : req->src_;
-      status->MPI_SOURCE = req->comm_->group()->rank(src);
-      status->MPI_TAG = req->tag_ == MPI_ANY_TAG ? req->real_tag_ : req->tag_;
-      status->MPI_ERROR  = req->truncated_ ? MPI_ERR_TRUNCATE : MPI_SUCCESS;
+    if (status != MPI_STATUS_IGNORE) {
+      if (req->src_== MPI_PROC_NULL || req->dst_== MPI_PROC_NULL){
+        Status::empty(status);
+        status->MPI_SOURCE = MPI_PROC_NULL;
+      } else {
+        int src = req->src_ == MPI_ANY_SOURCE ? req->real_src_ : req->src_;
+        status->MPI_SOURCE = req->comm_->group()->rank(src);
+        status->MPI_TAG = req->tag_ == MPI_ANY_TAG ? req->real_tag_ : req->tag_;
+        status->MPI_ERROR  = req->truncated_ ? MPI_ERR_TRUNCATE : MPI_SUCCESS;
+      }
       // this handles the case were size in receive differs from size in send
       status->count = req->real_size_;
     }
@@ -910,7 +947,8 @@ void Request::finish_wait(MPI_Request* request, MPI_Status * status)
       simgrid::smpi::Errhandler::unref(err);
     MC_assert(not MC_is_active()); /* Only fail in MC mode */
   }
-  unref(request);
+  if(req->src_ != MPI_PROC_NULL && req->dst_ != MPI_PROC_NULL)
+    unref(request);
 }
 
 int Request::wait(MPI_Request * request, MPI_Status * status)
@@ -919,6 +957,15 @@ int Request::wait(MPI_Request * request, MPI_Status * status)
   xbt_assert(*request != MPI_REQUEST_NULL);
 
   int ret=MPI_SUCCESS;
+
+  if((*request)->src_ == MPI_PROC_NULL || (*request)->dst_ == MPI_PROC_NULL){
+    if (status != MPI_STATUS_IGNORE) {
+      Status::empty(status);
+      status->MPI_SOURCE = MPI_PROC_NULL;
+    }
+    (*request)=MPI_REQUEST_NULL;
+    return ret;
+  }
   // Are we waiting on a request meant for non blocking collectives ?
   // If so, wait for all the subrequests.
   if ((*request)->nbc_requests_size_>0){