Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
do not use xbt_mutex_t in SMPI
authorFrederic Suter <frederic.suter@cc.in2p3.fr>
Fri, 22 Feb 2019 20:32:54 +0000 (21:32 +0100)
committerFrederic Suter <frederic.suter@cc.in2p3.fr>
Fri, 22 Feb 2019 20:32:54 +0000 (21:32 +0100)
include/simgrid/s4u/Mutex.hpp
src/s4u/s4u_Mutex.cpp
src/smpi/include/smpi_actor.hpp
src/smpi/include/smpi_win.hpp
src/smpi/internals/smpi_actor.cpp
src/smpi/mpi/smpi_request.cpp
src/smpi/mpi/smpi_win.cpp

index 0689f52..06889d8 100644 (file)
@@ -32,7 +32,7 @@ class XBT_PUBLIC Mutex {
 
   simgrid::kernel::activity::MutexImpl* const pimpl_;
   explicit Mutex(simgrid::kernel::activity::MutexImpl* mutex) : pimpl_(mutex) {}
-
+  ~Mutex();
   /* refcounting */
   friend XBT_PUBLIC void intrusive_ptr_add_ref(Mutex* mutex);
   friend XBT_PUBLIC void intrusive_ptr_release(Mutex* mutex);
@@ -46,7 +46,6 @@ public:
 
   /** Constructs a new mutex */
   static MutexPtr create();
-
   void lock();
   void unlock();
   bool try_lock();
index b46c298..88a271b 100644 (file)
@@ -9,6 +9,10 @@
 namespace simgrid {
 namespace s4u {
 
+Mutex::~Mutex()
+{
+  SIMIX_mutex_unref(pimpl_);
+}
 /** @brief Blocks the calling actor until the mutex can be obtained */
 void Mutex::lock()
 {
index b8d1fcc..7389c39 100644 (file)
@@ -17,9 +17,9 @@ namespace smpi {
 class ActorExt {
 private:
   double simulated_ = 0 /* Used to time with simulated_start/elapsed */;
-  simgrid::s4u::MailboxPtr mailbox_;
-  simgrid::s4u::MailboxPtr mailbox_small_;
-  xbt_mutex_t mailboxes_mutex_;
+  s4u::MailboxPtr mailbox_;
+  s4u::MailboxPtr mailbox_small_;
+  s4u::MutexPtr mailboxes_mutex_;
   xbt_os_timer_t timer_;
   MPI_Comm comm_self_   = MPI_COMM_NULL;
   MPI_Comm comm_intra_  = MPI_COMM_NULL;
@@ -28,9 +28,9 @@ private:
   int sampling_ = 0; /* inside an SMPI_SAMPLE_ block? */
   std::string instance_id_;
   bool replaying_ = false; /* is the process replaying a trace */
-  simgrid::s4u::Barrier* finalization_barrier_;
+  s4u::Barrier* finalization_barrier_;
   smpi_trace_call_location_t trace_call_loc_;
-  simgrid::s4u::ActorPtr actor_                  = nullptr;
+  s4u::ActorPtr actor_                           = nullptr;
   smpi_privatization_region_t privatized_region_ = nullptr;
   int optind                                     = 0; /*for getopt replacement */
 #if HAVE_PAPI
@@ -54,7 +54,7 @@ public:
   smpi_privatization_region_t privatized_region();
   s4u::MailboxPtr mailbox() { return mailbox_; }
   s4u::MailboxPtr mailbox_small() { return mailbox_small_; }
-  xbt_mutex_t mailboxes_mutex();
+  s4u::MutexPtr mailboxes_mutex();
 #if HAVE_PAPI
   int papi_event_set();
   papi_counter_t& papi_counters();
index 1202ae6..ade6075 100644 (file)
@@ -27,15 +27,15 @@ class Win : public F2C, public Keyval {
   MPI_Info info_;
   MPI_Comm comm_;
   std::vector<MPI_Request> *requests_;
-  xbt_mutex_t mut_;
-  simgrid::s4u::Barrier* bar_;
+  s4u::MutexPtr mut_;
+  s4u::Barrier* bar_;
   MPI_Win* connected_wins_;
   char* name_;
   int opened_;
   MPI_Group group_;
   int count_; //for ordering the accs
-  xbt_mutex_t lock_mut_;
-  xbt_mutex_t atomic_mut_;
+  s4u::MutexPtr lock_mut_;
+  s4u::MutexPtr atomic_mut_;
   std::list<int> lockers_;
   int rank_; // to identify owner for barriers in MPI_COMM_WORLD
   int mode_; // exclusive or shared lock
index 30327d6..e0aef7d 100644 (file)
@@ -25,9 +25,9 @@ using simgrid::s4u::ActorPtr;
 ActorExt::ActorExt(ActorPtr actor, simgrid::s4u::Barrier* finalization_barrier)
     : finalization_barrier_(finalization_barrier), actor_(actor)
 {
-  mailbox_         = simgrid::s4u::Mailbox::by_name("SMPI-" + std::to_string(actor_->get_pid()));
-  mailbox_small_   = simgrid::s4u::Mailbox::by_name("small-" + std::to_string(actor_->get_pid()));
-  mailboxes_mutex_ = xbt_mutex_init();
+  mailbox_         = s4u::Mailbox::by_name("SMPI-" + std::to_string(actor_->get_pid()));
+  mailbox_small_   = s4u::Mailbox::by_name("small-" + std::to_string(actor_->get_pid()));
+  mailboxes_mutex_ = s4u::Mutex::create();
   timer_           = xbt_os_timer_new();
   state_           = SmpiProcessState::UNINITIALIZED;
   if (MC_is_active())
@@ -58,7 +58,6 @@ ActorExt::~ActorExt()
   if (comm_intra_ != MPI_COMM_NULL)
     simgrid::smpi::Comm::destroy(comm_intra_);
   xbt_os_timer_free(timer_);
-  xbt_mutex_destroy(mailboxes_mutex_);
 }
 
 void ActorExt::set_data(const char* instance_id)
@@ -155,7 +154,7 @@ MPI_Comm ActorExt::comm_world()
   return comm_world_ == nullptr ? MPI_COMM_NULL : *comm_world_;
 }
 
-xbt_mutex_t ActorExt::mailboxes_mutex()
+s4u::MutexPtr ActorExt::mailboxes_mutex()
 {
   return mailboxes_mutex_;
 }
index b9ba848..3a01a16 100644 (file)
@@ -375,9 +375,9 @@ void Request::start()
 
     int async_small_thresh = simgrid::config::get_value<int>("smpi/async-small-thresh");
 
-    xbt_mutex_t mut = process->mailboxes_mutex();
+    simgrid::s4u::MutexPtr mut = process->mailboxes_mutex();
     if (async_small_thresh != 0 || (flags_ & MPI_REQ_RMA) != 0)
-      xbt_mutex_acquire(mut);
+      mut->lock();
 
     if (async_small_thresh == 0 && (flags_ & MPI_REQ_RMA) == 0) {
       mailbox = process->mailbox();
@@ -421,7 +421,7 @@ void Request::start()
     XBT_DEBUG("recv simcall posted");
 
     if (async_small_thresh != 0 || (flags_ & MPI_REQ_RMA) != 0)
-      xbt_mutex_release(mut);
+      mut->unlock();
   } else { /* the RECV flag was not set, so this is a send */
     simgrid::smpi::ActorExt* process = smpi_process_remote(simgrid::s4u::Actor::by_pid(dst_));
     int rank = src_;
@@ -470,10 +470,10 @@ void Request::start()
 
     int async_small_thresh = simgrid::config::get_value<int>("smpi/async-small-thresh");
 
-    xbt_mutex_t mut=process->mailboxes_mutex();
+    simgrid::s4u::MutexPtr mut = process->mailboxes_mutex();
 
     if (async_small_thresh != 0 || (flags_ & MPI_REQ_RMA) != 0)
-      xbt_mutex_acquire(mut);
+      mut->lock();
 
     if (not(async_small_thresh != 0 || (flags_ & MPI_REQ_RMA) != 0)) {
       mailbox = process->mailbox();
@@ -521,7 +521,7 @@ void Request::start()
     }
 
     if (async_small_thresh != 0 || ((flags_ & MPI_REQ_RMA) != 0))
-      xbt_mutex_release(mut);
+      mut->unlock();
   }
 }
 
index cefc4c5..86059ca 100644 (file)
@@ -33,9 +33,9 @@ Win::Win(void *base, MPI_Aint size, int disp_unit, MPI_Info info, MPI_Comm comm,
   opened_                = 0;
   group_                 = MPI_GROUP_NULL;
   requests_              = new std::vector<MPI_Request>();
-  mut_                   = xbt_mutex_init();
-  lock_mut_              = xbt_mutex_init();
-  atomic_mut_            = xbt_mutex_init();
+  mut_                   = s4u::Mutex::create();
+  lock_mut_              = s4u::Mutex::create();
+  atomic_mut_            = s4u::Mutex::create();
   connected_wins_        = new MPI_Win[comm_size];
   connected_wins_[rank_] = this;
   count_                 = 0;
@@ -78,9 +78,6 @@ Win::~Win(){
   
   if (rank_ == 0)
     delete bar_;
-  xbt_mutex_destroy(mut_);
-  xbt_mutex_destroy(lock_mut_);
-  xbt_mutex_destroy(atomic_mut_);
 
   if(allocated_ !=0)
     xbt_free(base_);
@@ -165,7 +162,7 @@ int Win::fence(int assert)
   if (assert != MPI_MODE_NOPRECEDE) {
     // This is not the first fence => finalize what came before
     bar_->wait();
-    xbt_mutex_acquire(mut_);
+    mut_->lock();
     // This (simulated) mutex ensures that no process pushes to the vector of requests during the waitall.
     // Without this, the vector could get redimensionned when another process pushes.
     // This would result in the array used by Request::waitall() to be invalidated.
@@ -178,7 +175,7 @@ int Win::fence(int assert)
       Request::waitall(size, treqs, MPI_STATUSES_IGNORE);
     }
     count_=0;
-    xbt_mutex_release(mut_);
+    mut_->unlock();
   }
 
   if(assert==MPI_MODE_NOSUCCEED)//there should be no ops after this one, tell we are closed.
@@ -231,16 +228,16 @@ int Win::put( void *origin_addr, int origin_count, MPI_Datatype origin_datatype,
     if(request!=nullptr){
       *request=sreq;
     }else{
-      xbt_mutex_acquire(mut_);
+      mut_->lock();
       requests_->push_back(sreq);
-      xbt_mutex_release(mut_);
+      mut_->unlock();
     }
 
     //push request to receiver's win
-    xbt_mutex_acquire(recv_win->mut_);
+    recv_win->mut_->lock();
     recv_win->requests_->push_back(rreq);
     rreq->start();
-    xbt_mutex_release(recv_win->mut_);
+    recv_win->mut_->unlock();
 
   }else{
     XBT_DEBUG("Entering MPI_Put from myself to myself, rank %d", target_rank);
@@ -288,9 +285,9 @@ int Win::get( void *origin_addr, int origin_count, MPI_Datatype origin_datatype,
     //start the send, with another process than us as sender.
     sreq->start();
     //push request to receiver's win
-    xbt_mutex_acquire(send_win->mut_);
+    send_win->mut_->lock();
     send_win->requests_->push_back(sreq);
-    xbt_mutex_release(send_win->mut_);
+    send_win->mut_->unlock();
 
     //start recv
     rreq->start();
@@ -298,9 +295,9 @@ int Win::get( void *origin_addr, int origin_count, MPI_Datatype origin_datatype,
     if(request!=nullptr){
       *request=rreq;
     }else{
-      xbt_mutex_acquire(mut_);
+      mut_->lock();
       requests_->push_back(rreq);
-      xbt_mutex_release(mut_);
+      mut_->unlock();
     }
 
   }else{
@@ -351,17 +348,17 @@ int Win::accumulate( void *origin_addr, int origin_count, MPI_Datatype origin_da
   // start send
   sreq->start();
   // push request to receiver's win
-  xbt_mutex_acquire(recv_win->mut_);
+  recv_win->mut_->lock();
   recv_win->requests_->push_back(rreq);
   rreq->start();
-  xbt_mutex_release(recv_win->mut_);
+  recv_win->mut_->unlock();
 
   if (request != nullptr) {
     *request = sreq;
   } else {
-    xbt_mutex_acquire(mut_);
+    mut_->lock();
     requests_->push_back(sreq);
-    xbt_mutex_release(mut_);
+    mut_->unlock();
   }
 
   XBT_DEBUG("Leaving MPI_Win_Accumulate");
@@ -391,7 +388,7 @@ int Win::get_accumulate( void *origin_addr, int origin_count, MPI_Datatype origi
   XBT_DEBUG("Entering MPI_Get_accumulate from %d", target_rank);
   //need to be sure ops are correctly ordered, so finish request here ? slow.
   MPI_Request req;
-  xbt_mutex_acquire(send_win->atomic_mut_);
+  send_win->atomic_mut_->lock();
   get(result_addr, result_count, result_datatype, target_rank,
               target_disp, target_count, target_datatype, &req);
   if (req != MPI_REQUEST_NULL)
@@ -401,7 +398,7 @@ int Win::get_accumulate( void *origin_addr, int origin_count, MPI_Datatype origi
               target_disp, target_count, target_datatype, op, &req);
   if (req != MPI_REQUEST_NULL)
     Request::wait(&req, MPI_STATUS_IGNORE);
-  xbt_mutex_release(send_win->atomic_mut_);
+  send_win->atomic_mut_->unlock();
   return MPI_SUCCESS;
 
 }
@@ -424,7 +421,7 @@ int Win::compare_and_swap(void *origin_addr, void *compare_addr,
 
   XBT_DEBUG("Entering MPI_Compare_and_swap with %d", target_rank);
   MPI_Request req = MPI_REQUEST_NULL;
-  xbt_mutex_acquire(send_win->atomic_mut_);
+  send_win->atomic_mut_->lock();
   get(result_addr, 1, datatype, target_rank,
               target_disp, 1, datatype, &req);
   if (req != MPI_REQUEST_NULL)
@@ -433,7 +430,7 @@ int Win::compare_and_swap(void *origin_addr, void *compare_addr,
     put(origin_addr, 1, datatype, target_rank,
               target_disp, 1, datatype);
   }
-  xbt_mutex_release(send_win->atomic_mut_);
+  send_win->atomic_mut_->unlock();
   return MPI_SUCCESS;
 }
 
@@ -582,10 +579,10 @@ int Win::lock(int lock_type, int rank, int assert){
   MPI_Win target_win = connected_wins_[rank];
 
   if ((lock_type == MPI_LOCK_EXCLUSIVE && target_win->mode_ != MPI_LOCK_SHARED)|| target_win->mode_ == MPI_LOCK_EXCLUSIVE){
-    xbt_mutex_acquire(target_win->lock_mut_);
+    target_win->lock_mut_->lock();
     target_win->mode_+= lock_type;//add the lock_type to differentiate case when we are switching from EXCLUSIVE to SHARED (no release needed in the unlock)
     if(lock_type == MPI_LOCK_SHARED){//the window used to be exclusive, it's now shared.
-      xbt_mutex_release(target_win->lock_mut_);
+      target_win->lock_mut_->unlock();
    }
   } else if (not(target_win->mode_ == MPI_LOCK_SHARED && lock_type == MPI_LOCK_EXCLUSIVE))
     target_win->mode_ += lock_type; // don't set to exclusive if it's already shared
@@ -616,7 +613,7 @@ int Win::unlock(int rank){
   target_win->mode_= 0;
   target_win->lockers_.remove(comm_->rank());
   if (target_mode==MPI_LOCK_EXCLUSIVE){
-    xbt_mutex_release(target_win->lock_mut_);
+    target_win->lock_mut_->unlock();
   }
 
   int finished = finish_comms(rank);
@@ -673,7 +670,7 @@ Win* Win::f2c(int id){
 }
 
 int Win::finish_comms(){
-  xbt_mutex_acquire(mut_);
+  mut_->lock();
   //Finish own requests
   std::vector<MPI_Request> *reqqs = requests_;
   int size = static_cast<int>(reqqs->size());
@@ -682,12 +679,12 @@ int Win::finish_comms(){
     Request::waitall(size, treqs, MPI_STATUSES_IGNORE);
     reqqs->clear();
   }
-  xbt_mutex_release(mut_);
+  mut_->unlock();
   return size;
 }
 
 int Win::finish_comms(int rank){
-  xbt_mutex_acquire(mut_);
+  mut_->lock();
   //Finish own requests
   std::vector<MPI_Request> *reqqs = requests_;
   int size = static_cast<int>(reqqs->size());
@@ -715,7 +712,7 @@ int Win::finish_comms(int rank){
       myreqqs.clear();
     }
   }
-  xbt_mutex_release(mut_);
+  mut_->unlock();
   return size;
 }