Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
Add support for MPI Errhandlers in Comm, File, Win.
[simgrid.git] / src / smpi / mpi / smpi_win.cpp
index c0abd4d..6ba1b1c 100644 (file)
@@ -1,4 +1,4 @@
-/* Copyright (c) 2007-2018. The SimGrid Team. All rights reserved.          */
+/* Copyright (c) 2007-2019. The SimGrid Team. All rights reserved.          */
 
 /* This program is free software; you can redistribute it and/or modify it
  * under the terms of the license (GNU LGPL) which comes with this package. */
@@ -16,7 +16,6 @@
 
 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_rma, smpi, "Logging specific to SMPI (RMA operations)");
 
-using simgrid::s4u::Actor;
 
 namespace simgrid{
 namespace smpi{
@@ -33,24 +32,24 @@ Win::Win(void *base, MPI_Aint size, int disp_unit, MPI_Info info, MPI_Comm comm,
   opened_                = 0;
   group_                 = MPI_GROUP_NULL;
   requests_              = new std::vector<MPI_Request>();
-  mut_                   = xbt_mutex_init();
-  lock_mut_              = xbt_mutex_init();
-  atomic_mut_            = xbt_mutex_init();
+  mut_                   = s4u::Mutex::create();
+  lock_mut_              = s4u::Mutex::create();
+  atomic_mut_            = s4u::Mutex::create();
   connected_wins_        = new MPI_Win[comm_size];
   connected_wins_[rank_] = this;
   count_                 = 0;
   if(rank_==0){
-    bar_ = new simgrid::s4u::Barrier(comm_size);
+    bar_ = new s4u::Barrier(comm_size);
   }
   mode_=0;
-
+  errhandler_=MPI_ERRORS_RETURN;
   comm->add_rma_win(this);
   comm->ref();
 
   Colls::allgather(&(connected_wins_[rank_]), sizeof(MPI_Win), MPI_BYTE, connected_wins_, sizeof(MPI_Win),
                          MPI_BYTE, comm);
 
-  Colls::bcast(&(bar_), sizeof(simgrid::s4u::Barrier*), MPI_BYTE, 0, comm);
+  Colls::bcast(&(bar_), sizeof(s4u::Barrier*), MPI_BYTE, 0, comm);
 
   Colls::barrier(comm);
 }
@@ -78,9 +77,6 @@ Win::~Win(){
   
   if (rank_ == 0)
     delete bar_;
-  xbt_mutex_destroy(mut_);
-  xbt_mutex_destroy(lock_mut_);
-  xbt_mutex_destroy(atomic_mut_);
 
   if(allocated_ !=0)
     xbt_free(base_);
@@ -88,7 +84,8 @@ Win::~Win(){
   cleanup_attr<Win>();
 }
 
-int Win::attach (void *base, MPI_Aint size){
+int Win::attach(void* /*base*/, MPI_Aint size)
+{
   if (not(base_ == MPI_BOTTOM || base_ == 0))
     return MPI_ERR_ARG;
   base_=0;//actually the address will be given in the RMA calls, as being the disp.
@@ -96,7 +93,8 @@ int Win::attach (void *base, MPI_Aint size){
   return MPI_SUCCESS;
 }
 
-int Win::detach (void *base){
+int Win::detach(const void* /*base*/)
+{
   base_=MPI_BOTTOM;
   size_=-1;
   return MPI_SUCCESS;
@@ -153,7 +151,7 @@ void Win::set_info(MPI_Info info){
   info_=info;
 }
 
-void Win::set_name(char* name){
+void Win::set_name(const char* name){
   name_ = xbt_strdup(name);
 }
 
@@ -165,7 +163,7 @@ int Win::fence(int assert)
   if (assert != MPI_MODE_NOPRECEDE) {
     // This is not the first fence => finalize what came before
     bar_->wait();
-    xbt_mutex_acquire(mut_);
+    mut_->lock();
     // This (simulated) mutex ensures that no process pushes to the vector of requests during the waitall.
     // Without this, the vector could get redimensionned when another process pushes.
     // This would result in the array used by Request::waitall() to be invalidated.
@@ -178,7 +176,7 @@ int Win::fence(int assert)
       Request::waitall(size, treqs, MPI_STATUSES_IGNORE);
     }
     count_=0;
-    xbt_mutex_release(mut_);
+    mut_->unlock();
   }
 
   if(assert==MPI_MODE_NOSUCCEED)//there should be no ops after this one, tell we are closed.
@@ -191,7 +189,7 @@ int Win::fence(int assert)
   return MPI_SUCCESS;
 }
 
-int Win::put( void *origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank,
+int Win::put(const void *origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank,
               MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype, MPI_Request* request)
 {
   //get receiver pointer
@@ -231,16 +229,16 @@ int Win::put( void *origin_addr, int origin_count, MPI_Datatype origin_datatype,
     if(request!=nullptr){
       *request=sreq;
     }else{
-      xbt_mutex_acquire(mut_);
+      mut_->lock();
       requests_->push_back(sreq);
-      xbt_mutex_release(mut_);
+      mut_->unlock();
     }
 
     //push request to receiver's win
-    xbt_mutex_acquire(recv_win->mut_);
+    recv_win->mut_->lock();
     recv_win->requests_->push_back(rreq);
     rreq->start();
-    xbt_mutex_release(recv_win->mut_);
+    recv_win->mut_->unlock();
 
   }else{
     XBT_DEBUG("Entering MPI_Put from myself to myself, rank %d", target_rank);
@@ -288,9 +286,9 @@ int Win::get( void *origin_addr, int origin_count, MPI_Datatype origin_datatype,
     //start the send, with another process than us as sender.
     sreq->start();
     //push request to receiver's win
-    xbt_mutex_acquire(send_win->mut_);
+    send_win->mut_->lock();
     send_win->requests_->push_back(sreq);
-    xbt_mutex_release(send_win->mut_);
+    send_win->mut_->unlock();
 
     //start recv
     rreq->start();
@@ -298,9 +296,9 @@ int Win::get( void *origin_addr, int origin_count, MPI_Datatype origin_datatype,
     if(request!=nullptr){
       *request=rreq;
     }else{
-      xbt_mutex_acquire(mut_);
+      mut_->lock();
       requests_->push_back(rreq);
-      xbt_mutex_release(mut_);
+      mut_->unlock();
     }
 
   }else{
@@ -313,7 +311,7 @@ int Win::get( void *origin_addr, int origin_count, MPI_Datatype origin_datatype,
 }
 
 
-int Win::accumulate( void *origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank,
+int Win::accumulate(const void *origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank,
               MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype, MPI_Op op, MPI_Request* request)
 {
   XBT_DEBUG("Entering MPI_Win_Accumulate");
@@ -351,27 +349,27 @@ int Win::accumulate( void *origin_addr, int origin_count, MPI_Datatype origin_da
   // start send
   sreq->start();
   // push request to receiver's win
-  xbt_mutex_acquire(recv_win->mut_);
+  recv_win->mut_->lock();
   recv_win->requests_->push_back(rreq);
   rreq->start();
-  xbt_mutex_release(recv_win->mut_);
+  recv_win->mut_->unlock();
 
   if (request != nullptr) {
     *request = sreq;
   } else {
-    xbt_mutex_acquire(mut_);
+    mut_->lock();
     requests_->push_back(sreq);
-    xbt_mutex_release(mut_);
+    mut_->unlock();
   }
 
   XBT_DEBUG("Leaving MPI_Win_Accumulate");
   return MPI_SUCCESS;
 }
 
-int Win::get_accumulate( void *origin_addr, int origin_count, MPI_Datatype origin_datatype, void *result_addr,
-              int result_count, MPI_Datatype result_datatype, int target_rank, MPI_Aint target_disp, int target_count,
-              MPI_Datatype target_datatype, MPI_Op op, MPI_Request* request){
-
+int Win::get_accumulate(const void* origin_addr, int origin_count, MPI_Datatype origin_datatype, void* result_addr,
+                        int result_count, MPI_Datatype result_datatype, int target_rank, MPI_Aint target_disp,
+                        int target_count, MPI_Datatype target_datatype, MPI_Op op, MPI_Request*)
+{
   //get sender pointer
   MPI_Win send_win = connected_wins_[target_rank];
 
@@ -391,7 +389,7 @@ int Win::get_accumulate( void *origin_addr, int origin_count, MPI_Datatype origi
   XBT_DEBUG("Entering MPI_Get_accumulate from %d", target_rank);
   //need to be sure ops are correctly ordered, so finish request here ? slow.
   MPI_Request req;
-  xbt_mutex_acquire(send_win->atomic_mut_);
+  send_win->atomic_mut_->lock();
   get(result_addr, result_count, result_datatype, target_rank,
               target_disp, target_count, target_datatype, &req);
   if (req != MPI_REQUEST_NULL)
@@ -401,12 +399,12 @@ int Win::get_accumulate( void *origin_addr, int origin_count, MPI_Datatype origi
               target_disp, target_count, target_datatype, op, &req);
   if (req != MPI_REQUEST_NULL)
     Request::wait(&req, MPI_STATUS_IGNORE);
-  xbt_mutex_release(send_win->atomic_mut_);
+  send_win->atomic_mut_->unlock();
   return MPI_SUCCESS;
 
 }
 
-int Win::compare_and_swap(void *origin_addr, void *compare_addr,
+int Win::compare_and_swap(const void *origin_addr, void *compare_addr,
         void *result_addr, MPI_Datatype datatype, int target_rank,
         MPI_Aint target_disp){
   //get sender pointer
@@ -424,7 +422,7 @@ int Win::compare_and_swap(void *origin_addr, void *compare_addr,
 
   XBT_DEBUG("Entering MPI_Compare_and_swap with %d", target_rank);
   MPI_Request req = MPI_REQUEST_NULL;
-  xbt_mutex_acquire(send_win->atomic_mut_);
+  send_win->atomic_mut_->lock();
   get(result_addr, 1, datatype, target_rank,
               target_disp, 1, datatype, &req);
   if (req != MPI_REQUEST_NULL)
@@ -433,11 +431,12 @@ int Win::compare_and_swap(void *origin_addr, void *compare_addr,
     put(origin_addr, 1, datatype, target_rank,
               target_disp, 1, datatype);
   }
-  xbt_mutex_release(send_win->atomic_mut_);
+  send_win->atomic_mut_->unlock();
   return MPI_SUCCESS;
 }
 
-int Win::start(MPI_Group group, int assert){
+int Win::start(MPI_Group group, int /*assert*/)
+{
   /* From MPI forum advices
   The call to MPI_WIN_COMPLETE does not return until the put call has completed at the origin; and the target window
   will be accessed by the put operation only after the call to MPI_WIN_START has matched a call to MPI_WIN_POST by
@@ -479,7 +478,8 @@ int Win::start(MPI_Group group, int assert){
   return MPI_SUCCESS;
 }
 
-int Win::post(MPI_Group group, int assert){
+int Win::post(MPI_Group group, int /*assert*/)
+{
   //let's make a synchronous send here
   int i             = 0;
   int j             = 0;
@@ -578,14 +578,15 @@ int Win::wait(){
   return MPI_SUCCESS;
 }
 
-int Win::lock(int lock_type, int rank, int assert){
+int Win::lock(int lock_type, int rank, int /*assert*/)
+{
   MPI_Win target_win = connected_wins_[rank];
 
   if ((lock_type == MPI_LOCK_EXCLUSIVE && target_win->mode_ != MPI_LOCK_SHARED)|| target_win->mode_ == MPI_LOCK_EXCLUSIVE){
-    xbt_mutex_acquire(target_win->lock_mut_);
+    target_win->lock_mut_->lock();
     target_win->mode_+= lock_type;//add the lock_type to differentiate case when we are switching from EXCLUSIVE to SHARED (no release needed in the unlock)
     if(lock_type == MPI_LOCK_SHARED){//the window used to be exclusive, it's now shared.
-      xbt_mutex_release(target_win->lock_mut_);
+      target_win->lock_mut_->unlock();
    }
   } else if (not(target_win->mode_ == MPI_LOCK_SHARED && lock_type == MPI_LOCK_EXCLUSIVE))
     target_win->mode_ += lock_type; // don't set to exclusive if it's already shared
@@ -616,7 +617,7 @@ int Win::unlock(int rank){
   target_win->mode_= 0;
   target_win->lockers_.remove(comm_->rank());
   if (target_mode==MPI_LOCK_EXCLUSIVE){
-    xbt_mutex_release(target_win->lock_mut_);
+    target_win->lock_mut_->unlock();
   }
 
   int finished = finish_comms(rank);
@@ -673,7 +674,7 @@ Win* Win::f2c(int id){
 }
 
 int Win::finish_comms(){
-  xbt_mutex_acquire(mut_);
+  mut_->lock();
   //Finish own requests
   std::vector<MPI_Request> *reqqs = requests_;
   int size = static_cast<int>(reqqs->size());
@@ -682,12 +683,12 @@ int Win::finish_comms(){
     Request::waitall(size, treqs, MPI_STATUSES_IGNORE);
     reqqs->clear();
   }
-  xbt_mutex_release(mut_);
+  mut_->unlock();
   return size;
 }
 
 int Win::finish_comms(int rank){
-  xbt_mutex_acquire(mut_);
+  mut_->lock();
   //Finish own requests
   std::vector<MPI_Request> *reqqs = requests_;
   int size = static_cast<int>(reqqs->size());
@@ -715,7 +716,7 @@ int Win::finish_comms(int rank){
       myreqqs.clear();
     }
   }
-  xbt_mutex_release(mut_);
+  mut_->unlock();
   return size;
 }
 
@@ -736,5 +737,16 @@ int Win::shared_query(int rank, MPI_Aint* size, int* disp_unit, void* baseptr)
   }
   return MPI_SUCCESS;
 }
+
+MPI_Errhandler Win::errhandler(){
+  return errhandler_;
+}
+
+void Win::set_errhandler(MPI_Errhandler errhandler){
+  errhandler_=errhandler;
+  if(errhandler_!= MPI_ERRHANDLER_NULL)
+    errhandler->ref();
+}
+
 }
 }