Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
Yet another dlopen merge
[simgrid.git] / src / smpi / smpi_request.cpp
index d868863..2ff10a2 100644 (file)
@@ -1,5 +1,4 @@
-/* Copyright (c) 2007-2015. The SimGrid Team.
- * All rights reserved.                                                     */
+/* Copyright (c) 2007-2017. The SimGrid Team. All rights reserved.          */
 
 /* This program is free software; you can redistribute it and/or modify it
  * under the terms of the license (GNU LGPL) which comes with this package. */
@@ -109,12 +108,16 @@ namespace smpi{
 Request::Request(void *buf, int count, MPI_Datatype datatype, int src, int dst, int tag, MPI_Comm comm, unsigned flags) : buf_(buf), old_type_(datatype), src_(src), dst_(dst), tag_(tag), comm_(comm), flags_(flags) 
 {
   void *old_buf = nullptr;
-  if((((flags & RECV) != 0) && ((flags & ACCUMULATE) !=0)) || (datatype->flags() & DT_FLAG_DERIVED)){
+  if(((((flags & RECV) != 0) && ((flags & ACCUMULATE) !=0)) || (datatype->flags() & DT_FLAG_DERIVED)) && (!smpi_is_shared(buf_))){
     // This part handles the problem of non-contiguous memory
     old_buf = buf;
-    buf_ = count==0 ? nullptr : xbt_malloc(count*datatype->size());
-    if ((datatype->flags() & DT_FLAG_DERIVED) && ((flags & SEND) != 0)) {
-      datatype->serialize(old_buf, buf_, count);
+    if (count==0){
+      buf_ = nullptr;
+    }else {
+      buf_ = xbt_malloc(count*datatype->size());
+      if ((datatype->flags() & DT_FLAG_DERIVED) && ((flags & SEND) != 0)) {
+        datatype->serialize(old_buf, buf_, count);
+      }
     }
   }
   // This part handles the problem of non-contiguous memory (for the unserialisation at the reception)
@@ -244,19 +247,19 @@ void Request::print_request(const char *message)
 MPI_Request Request::send_init(void *buf, int count, MPI_Datatype datatype, int dst, int tag, MPI_Comm comm)
 {
 
-  return new Request(buf==MPI_BOTTOM ? nullptr : buf, count, datatype, smpi_process_index(),
+  return new Request(buf==MPI_BOTTOM ? nullptr : buf, count, datatype, smpi_process()->index(),
                           comm->group()->index(dst), tag, comm, PERSISTENT | SEND | PREPARED);
 }
 
 MPI_Request Request::ssend_init(void *buf, int count, MPI_Datatype datatype, int dst, int tag, MPI_Comm comm)
 {
-  return new Request(buf==MPI_BOTTOM ? nullptr : buf, count, datatype, smpi_process_index(),
+  return new Request(buf==MPI_BOTTOM ? nullptr : buf, count, datatype, smpi_process()->index(),
                         comm->group()->index(dst), tag, comm, PERSISTENT | SSEND | SEND | PREPARED);
 }
 
 MPI_Request Request::isend_init(void *buf, int count, MPI_Datatype datatype, int dst, int tag, MPI_Comm comm)
 {
-  return new Request(buf==MPI_BOTTOM ? nullptr : buf , count, datatype, smpi_process_index(),
+  return new Request(buf==MPI_BOTTOM ? nullptr : buf , count, datatype, smpi_process()->index(),
                           comm->group()->index(dst), tag,comm, PERSISTENT | ISEND | SEND | PREPARED);
 }
 
@@ -280,7 +283,7 @@ MPI_Request Request::recv_init(void *buf, int count, MPI_Datatype datatype, int
 {
   return new Request(buf==MPI_BOTTOM ? nullptr : buf, count, datatype,
                           src == MPI_ANY_SOURCE ? MPI_ANY_SOURCE : comm->group()->index(src),
-                          smpi_process_index(), tag, comm, PERSISTENT | RECV | PREPARED);
+                          smpi_process()->index(), tag, comm, PERSISTENT | RECV | PREPARED);
 }
 
 MPI_Request Request::rma_recv_init(void *buf, int count, MPI_Datatype datatype, int src, int dst, int tag, MPI_Comm comm,
@@ -301,14 +304,14 @@ MPI_Request Request::rma_recv_init(void *buf, int count, MPI_Datatype datatype,
 MPI_Request Request::irecv_init(void *buf, int count, MPI_Datatype datatype, int src, int tag, MPI_Comm comm)
 {
   return new Request(buf==MPI_BOTTOM ? nullptr : buf, count, datatype, src == MPI_ANY_SOURCE ? MPI_ANY_SOURCE :
-                          comm->group()->index(src), smpi_process_index(), tag,
+                          comm->group()->index(src), smpi_process()->index(), tag,
                           comm, PERSISTENT | RECV | PREPARED);
 }
 
 MPI_Request Request::isend(void *buf, int count, MPI_Datatype datatype, int dst, int tag, MPI_Comm comm)
 {
   MPI_Request request = nullptr; /* MC needs the comm to be set to nullptr during the call */
-  request =  new Request(buf==MPI_BOTTOM ? nullptr : buf, count, datatype, smpi_process_index(),
+  request =  new Request(buf==MPI_BOTTOM ? nullptr : buf, count, datatype, smpi_process()->index(),
                            comm->group()->index(dst), tag, comm, NON_PERSISTENT | ISEND | SEND);
   request->start();
   return request;
@@ -317,7 +320,7 @@ MPI_Request Request::isend(void *buf, int count, MPI_Datatype datatype, int dst,
 MPI_Request Request::issend(void *buf, int count, MPI_Datatype datatype, int dst, int tag, MPI_Comm comm)
 {
   MPI_Request request = nullptr; /* MC needs the comm to be set to nullptr during the call */
-  request = new Request(buf==MPI_BOTTOM ? nullptr : buf, count, datatype, smpi_process_index(),
+  request = new Request(buf==MPI_BOTTOM ? nullptr : buf, count, datatype, smpi_process()->index(),
                         comm->group()->index(dst), tag,comm, NON_PERSISTENT | ISEND | SSEND | SEND);
   request->start();
   return request;
@@ -328,7 +331,7 @@ MPI_Request Request::irecv(void *buf, int count, MPI_Datatype datatype, int src,
 {
   MPI_Request request = nullptr; /* MC needs the comm to be set to nullptr during the call */
   request = new Request(buf==MPI_BOTTOM ? nullptr : buf, count, datatype, src == MPI_ANY_SOURCE ? MPI_ANY_SOURCE :
-                          comm->group()->index(src), smpi_process_index(), tag, comm,
+                          comm->group()->index(src), smpi_process()->index(), tag, comm,
                           NON_PERSISTENT | RECV);
   request->start();
   return request;
@@ -345,7 +348,7 @@ void Request::recv(void *buf, int count, MPI_Datatype datatype, int src, int tag
 void Request::send(void *buf, int count, MPI_Datatype datatype, int dst, int tag, MPI_Comm comm)
 {
   MPI_Request request = nullptr; /* MC needs the comm to be set to nullptr during the call */
-  request = new Request(buf==MPI_BOTTOM ? nullptr : buf, count, datatype, smpi_process_index(),
+  request = new Request(buf==MPI_BOTTOM ? nullptr : buf, count, datatype, smpi_process()->index(),
                           comm->group()->index(dst), tag, comm, NON_PERSISTENT | SEND);
 
   request->start();
@@ -356,7 +359,7 @@ void Request::send(void *buf, int count, MPI_Datatype datatype, int dst, int tag
 void Request::ssend(void *buf, int count, MPI_Datatype datatype, int dst, int tag, MPI_Comm comm)
 {
   MPI_Request request = nullptr; /* MC needs the comm to be set to nullptr during the call */
-  request = new Request(buf==MPI_BOTTOM ? nullptr : buf, count, datatype, smpi_process_index(),
+  request = new Request(buf==MPI_BOTTOM ? nullptr : buf, count, datatype, smpi_process()->index(),
                           comm->group()->index(dst), tag, comm, NON_PERSISTENT | SSEND | SEND);
 
   request->start();
@@ -370,7 +373,7 @@ void Request::sendrecv(void *sendbuf, int sendcount, MPI_Datatype sendtype,int d
 {
   MPI_Request requests[2];
   MPI_Status stats[2];
-  int myid=smpi_process_index();
+  int myid=smpi_process()->index();
   if ((comm->group()->index(dst) == myid) && (comm->group()->index(src) == myid)){
       Datatype::copy(sendbuf, sendcount, sendtype, recvbuf, recvcount, recvtype);
       return;
@@ -399,42 +402,44 @@ void Request::start()
   if ((flags_ & RECV) != 0) {
     this->print_request("New recv");
 
+    simgrid::smpi::Process* process = smpi_process_remote(dst_);
+
     int async_small_thresh = xbt_cfg_get_int("smpi/async-small-thresh");
 
-    xbt_mutex_t mut = smpi_process_mailboxes_mutex();
+    xbt_mutex_t mut = process->mailboxes_mutex();
     if (async_small_thresh != 0 || (flags_ & RMA) != 0)
       xbt_mutex_acquire(mut);
 
     if (async_small_thresh == 0 && (flags_ & RMA) == 0 ) {
-      mailbox = smpi_process_mailbox();
+      mailbox = process->mailbox();
     } 
     else if (((flags_ & RMA) != 0) || static_cast<int>(size_) < async_small_thresh) {
       //We have to check both mailboxes (because SSEND messages are sent to the large mbox).
       //begin with the more appropriate one : the small one.
-      mailbox = smpi_process_mailbox_small();
+      mailbox = process->mailbox_small();
       XBT_DEBUG("Is there a corresponding send already posted in the small mailbox %p (in case of SSEND)?", mailbox);
       smx_activity_t action = simcall_comm_iprobe(mailbox, 0, src_,tag_, &match_recv,
                                                   static_cast<void*>(this));
 
       if (action == nullptr) {
-        mailbox = smpi_process_mailbox();
+        mailbox = process->mailbox();
         XBT_DEBUG("No, nothing in the small mailbox test the other one : %p", mailbox);
         action = simcall_comm_iprobe(mailbox, 0, src_,tag_, &match_recv, static_cast<void*>(this));
         if (action == nullptr) {
           XBT_DEBUG("Still nothing, switch back to the small mailbox : %p", mailbox);
-          mailbox = smpi_process_mailbox_small();
+          mailbox = process->mailbox_small();
         }
       } else {
         XBT_DEBUG("yes there was something for us in the large mailbox");
       }
     } else {
-      mailbox = smpi_process_mailbox_small();
+      mailbox = process->mailbox_small();
       XBT_DEBUG("Is there a corresponding send already posted the small mailbox?");
       smx_activity_t action = simcall_comm_iprobe(mailbox, 0, src_,tag_, &match_recv, static_cast<void*>(this));
 
       if (action == nullptr) {
         XBT_DEBUG("No, nothing in the permanent receive mailbox");
-        mailbox = smpi_process_mailbox();
+        mailbox = process->mailbox();
       } else {
         XBT_DEBUG("yes there was something for us in the small mailbox");
       }
@@ -442,19 +447,18 @@ void Request::start()
 
     // we make a copy here, as the size is modified by simix, and we may reuse the request in another receive later
     real_size_=size_;
-    action_ = simcall_comm_irecv(SIMIX_process_self(), mailbox, buf_, &real_size_, &match_recv,
-                                         ! smpi_process_get_replaying()? smpi_comm_copy_data_callback
+    action_ = simcall_comm_irecv(process->process(), mailbox, buf_, &real_size_, &match_recv,
+                                         ! process->replaying()? smpi_comm_copy_data_callback
                                          : &smpi_comm_null_copy_buffer_callback, this, -1.0);
     XBT_DEBUG("recv simcall posted");
 
     if (async_small_thresh != 0 || (flags_ & RMA) != 0 )
       xbt_mutex_release(mut);
   } else { /* the RECV flag was not set, so this is a send */
-    int receiver = dst_;
-
+    simgrid::smpi::Process* process = smpi_process_remote(dst_);
     int rank = src_;
     if (TRACE_smpi_view_internals()) {
-      TRACE_smpi_send(rank, rank, receiver, tag_, size_);
+      TRACE_smpi_send(rank, rank, dst_, tag_, size_);
     }
     this->print_request("New send");
 
@@ -467,8 +471,8 @@ void Request::start()
       refcount_++;
       if(!(old_type_->flags() & DT_FLAG_DERIVED)){
         oldbuf = buf_;
-        if (!smpi_process_get_replaying() && oldbuf != nullptr && size_!=0){
-          if((smpi_privatize_global_variables != 0)
+        if (!process->replaying() && oldbuf != nullptr && size_!=0){
+          if((smpi_privatize_global_variables == SMPI_PRIVATIZE_MMAP)
             && (static_cast<char*>(buf_) >= smpi_start_data_exe)
             && (static_cast<char*>(buf_) < smpi_start_data_exe + smpi_size_data_exe )){
             XBT_DEBUG("Privatization : We are sending from a zone inside global memory. Switch data segment ");
@@ -495,36 +499,36 @@ void Request::start()
 
     int async_small_thresh = xbt_cfg_get_int("smpi/async-small-thresh");
 
-    xbt_mutex_t mut=smpi_process_remote_mailboxes_mutex(receiver);
+    xbt_mutex_t mut=process->mailboxes_mutex();
 
     if (async_small_thresh != 0 || (flags_ & RMA) != 0)
       xbt_mutex_acquire(mut);
 
     if (!(async_small_thresh != 0 || (flags_ & RMA) !=0)) {
-      mailbox = smpi_process_remote_mailbox(receiver);
+      mailbox = process->mailbox();
     } else if (((flags_ & RMA) != 0) || static_cast<int>(size_) < async_small_thresh) { // eager mode
-      mailbox = smpi_process_remote_mailbox(receiver);
+      mailbox = process->mailbox();
       XBT_DEBUG("Is there a corresponding recv already posted in the large mailbox %p?", mailbox);
       smx_activity_t action = simcall_comm_iprobe(mailbox, 1,dst_, tag_, &match_send,
                                                   static_cast<void*>(this));
       if (action == nullptr) {
         if ((flags_ & SSEND) == 0){
-          mailbox = smpi_process_remote_mailbox_small(receiver);
+          mailbox = process->mailbox_small();
           XBT_DEBUG("No, nothing in the large mailbox, message is to be sent on the small one %p", mailbox);
         } else {
-          mailbox = smpi_process_remote_mailbox_small(receiver);
+          mailbox = process->mailbox_small();
           XBT_DEBUG("SSEND : Is there a corresponding recv already posted in the small mailbox %p?", mailbox);
           action = simcall_comm_iprobe(mailbox, 1,dst_, tag_, &match_send, static_cast<void*>(this));
           if (action == nullptr) {
             XBT_DEBUG("No, we are first, send to large mailbox");
-            mailbox = smpi_process_remote_mailbox(receiver);
+            mailbox = process->mailbox();
           }
         }
       } else {
         XBT_DEBUG("Yes there was something for us in the large mailbox");
       }
     } else {
-      mailbox = smpi_process_remote_mailbox(receiver);
+      mailbox = process->mailbox();
       XBT_DEBUG("Send request %p is in the large mailbox %p (buf: %p)",mailbox, this,buf_);
     }
 
@@ -533,7 +537,7 @@ void Request::start()
     action_ = simcall_comm_isend(SIMIX_process_from_PID(src_+1), mailbox, size_, -1.0,
                                          buf, real_size_, &match_send,
                          &xbt_free_f, // how to free the userdata if a detached send fails
-                         !smpi_process_get_replaying() ? smpi_comm_copy_data_callback
+                         !process->replaying() ? smpi_comm_copy_data_callback
                          : &smpi_comm_null_copy_buffer_callback, this,
                          // detach if msg size < eager/rdv switch limit
                          detached_);
@@ -567,7 +571,7 @@ int Request::test(MPI_Request * request, MPI_Status * status) {
   if(smpi_test_sleep > 0)  
     simcall_process_sleep(nsleeps*smpi_test_sleep);
 
-  smpi_empty_status(status);
+  Status::empty(status);
   int flag = 1;
   if (((*request)->flags_ & PREPARED) == 0) {
     if ((*request)->action_ != nullptr)
@@ -650,7 +654,7 @@ int Request::testany(int count, MPI_Request requests[], int *index, MPI_Status *
   } else {
       //all requests are null or inactive, return true
       flag = 1;
-      smpi_empty_status(status);
+      Status::empty(status);
   }
 
   return flag;
@@ -669,7 +673,7 @@ int Request::testall(int count, MPI_Request requests[], MPI_Status status[])
           requests[i]=MPI_REQUEST_NULL;
       }
     }else{
-      smpi_empty_status(pstat);
+      Status::empty(pstat);
     }
     if(status != MPI_STATUSES_IGNORE) {
       status[i] = *pstat;
@@ -708,14 +712,14 @@ void Request::iprobe(int source, int tag, MPI_Comm comm, int* flag, MPI_Status*
   request->print_request("New iprobe");
   // We have to test both mailboxes as we don't know if we will receive one one or another
   if (xbt_cfg_get_int("smpi/async-small-thresh") > 0){
-      mailbox = smpi_process_mailbox_small();
+      mailbox = smpi_process()->mailbox_small();
       XBT_DEBUG("Trying to probe the perm recv mailbox");
       request->action_ = simcall_comm_iprobe(mailbox, 0, request->src_, request->tag_, &match_recv,
                                             static_cast<void*>(request));
   }
 
   if (request->action_ == nullptr){
-    mailbox = smpi_process_mailbox();
+    mailbox = smpi_process()->mailbox();
     XBT_DEBUG("trying to probe the other mailbox");
     request->action_ = simcall_comm_iprobe(mailbox, 0, request->src_,request->tag_, &match_recv,
                                           static_cast<void*>(request));
@@ -744,7 +748,7 @@ void Request::iprobe(int source, int tag, MPI_Comm comm, int* flag, MPI_Status*
 void Request::finish_wait(MPI_Request* request, MPI_Status * status)
 {
   MPI_Request req = *request;
-  smpi_empty_status(status);
+  Status::empty(status);
 
   if(!((req->detached_ != 0) && ((req->flags_ & SEND) != 0)) && ((req->flags_ & PREPARED) == 0)){
     if(status != MPI_STATUS_IGNORE) {
@@ -759,12 +763,13 @@ void Request::finish_wait(MPI_Request* request, MPI_Status * status)
     req->print_request("Finishing");
     MPI_Datatype datatype = req->old_type_;
 
-    if(((req->flags_ & ACCUMULATE) != 0) || (datatype->flags() & DT_FLAG_DERIVED)){
-      if (!smpi_process_get_replaying()){
-        if( smpi_privatize_global_variables != 0 && (static_cast<char*>(req->old_buf_) >= smpi_start_data_exe)
+    if((((req->flags_ & ACCUMULATE) != 0) || (datatype->flags() & DT_FLAG_DERIVED)) && (!smpi_is_shared(req->old_buf_))){
+
+      if (!smpi_process()->replaying()){
+        if( smpi_privatize_global_variables == SMPI_PRIVATIZE_MMAP && (static_cast<char*>(req->old_buf_) >= smpi_start_data_exe)
             && ((char*)req->old_buf_ < smpi_start_data_exe + smpi_size_data_exe )){
             XBT_VERB("Privatization : We are unserializing to a zone in global memory  Switch data segment ");
-            smpi_switch_data_segment(smpi_process_index());
+            smpi_switch_data_segment(smpi_process()->index());
         }
       }
 
@@ -774,15 +779,17 @@ void Request::finish_wait(MPI_Request* request, MPI_Status * status)
           datatype->unserialize(req->buf_, req->old_buf_, req->real_size_/datatype->size() , req->op_);
         xbt_free(req->buf_);
       }else if(req->flags_ & RECV){//apply op on contiguous buffer for accumulate
-          int n =req->real_size_/datatype->size();
-          req->op_->apply(req->buf_, req->old_buf_, &n, datatype);
+          if(datatype->size()!=0){
+            int n =req->real_size_/datatype->size();
+            req->op_->apply(req->buf_, req->old_buf_, &n, datatype);
+          }
           xbt_free(req->buf_);
       }
     }
   }
 
   if (TRACE_smpi_view_internals() && ((req->flags_ & RECV) != 0)){
-    int rank = smpi_process_index();
+    int rank = smpi_process()->index();
     int src_traced = (req->src_ == MPI_ANY_SOURCE ? req->real_src_ : req->src_);
     TRACE_smpi_recv(rank, src_traced, rank,req->tag_);
   }
@@ -805,7 +812,7 @@ void Request::wait(MPI_Request * request, MPI_Status * status)
 {
   (*request)->print_request("Waiting");
   if ((*request)->flags_ & PREPARED) {
-    smpi_empty_status(status);
+    Status::empty(status);
     return;
   }
 
@@ -861,8 +868,6 @@ int Request::waitany(int count, MPI_Request requests[], MPI_Status * status)
           finish_wait(&requests[index],status);
           if (requests[i] != MPI_REQUEST_NULL && (requests[i]->flags_ & NON_PERSISTENT))
             requests[index] = MPI_REQUEST_NULL;
-        }else{
-            XBT_WARN("huu?");
         }
       }
     }
@@ -872,14 +877,14 @@ int Request::waitany(int count, MPI_Request requests[], MPI_Status * status)
   }
 
   if (index==MPI_UNDEFINED)
-    smpi_empty_status(status);
+    Status::empty(status);
 
   return index;
 }
 
 static int sort_accumulates(MPI_Request a, MPI_Request b)
 {
-  return (a->tag() < b->tag());
+  return (a->tag() > b->tag());
 }
 
 int Request::waitall(int count, MPI_Request requests[], MPI_Status status[])
@@ -893,9 +898,9 @@ int Request::waitall(int count, MPI_Request requests[], MPI_Status status[])
   if (status != MPI_STATUSES_IGNORE) {
     for (int c = 0; c < count; c++) {
       if (requests[c] == MPI_REQUEST_NULL || requests[c]->dst_ == MPI_PROC_NULL || (requests[c]->flags_ & PREPARED)) {
-        smpi_empty_status(&status[c]);
+        Status::empty(&status[c]);
       } else if (requests[c]->src_ == MPI_PROC_NULL) {
-        smpi_empty_status(&status[c]);
+        Status::empty(&status[c]);
         status[c].MPI_SOURCE = MPI_PROC_NULL;
       }
     }