From: Frederic Suter
Date: Fri, 22 Feb 2019 20:32:54 +0000 (+0100)
Subject: do not use xbt_mutex_t in SMPI
X-Git-Tag: v3_22~261
X-Git-Url: http://info.iut-bm.univ-fcomte.fr/pub/gitweb/simgrid.git/commitdiff_plain/7013392b1a0a3aed2219fa352e81826f8657fbe2

do not use xbt_mutex_t in SMPI
---

diff --git a/include/simgrid/s4u/Mutex.hpp b/include/simgrid/s4u/Mutex.hpp
index 0689f52cd7..06889d8c27 100644
--- a/include/simgrid/s4u/Mutex.hpp
+++ b/include/simgrid/s4u/Mutex.hpp
@@ -32,7 +32,7 @@ class XBT_PUBLIC Mutex {
   simgrid::kernel::activity::MutexImpl* const pimpl_;
 
   explicit Mutex(simgrid::kernel::activity::MutexImpl* mutex) : pimpl_(mutex) {}
-
+  ~Mutex();
   /* refcounting */
   friend XBT_PUBLIC void intrusive_ptr_add_ref(Mutex* mutex);
   friend XBT_PUBLIC void intrusive_ptr_release(Mutex* mutex);
@@ -46,7 +46,6 @@ public:
 
   /** Constructs a new mutex */
   static MutexPtr create();
-
   void lock();
   void unlock();
   bool try_lock();
diff --git a/src/s4u/s4u_Mutex.cpp b/src/s4u/s4u_Mutex.cpp
index b46c2986bf..88a271b0e5 100644
--- a/src/s4u/s4u_Mutex.cpp
+++ b/src/s4u/s4u_Mutex.cpp
@@ -9,6 +9,10 @@
 namespace simgrid {
 namespace s4u {
 
+Mutex::~Mutex()
+{
+  SIMIX_mutex_unref(pimpl_);
+}
 /** @brief Blocks the calling actor until the mutex can be obtained */
 void Mutex::lock()
 {
diff --git a/src/smpi/include/smpi_actor.hpp b/src/smpi/include/smpi_actor.hpp
index b8d1fcc1a3..7389c39cbc 100644
--- a/src/smpi/include/smpi_actor.hpp
+++ b/src/smpi/include/smpi_actor.hpp
@@ -17,9 +17,9 @@ namespace smpi {
 class ActorExt {
 private:
   double simulated_ = 0 /* Used to time with simulated_start/elapsed */;
-  simgrid::s4u::MailboxPtr mailbox_;
-  simgrid::s4u::MailboxPtr mailbox_small_;
-  xbt_mutex_t mailboxes_mutex_;
+  s4u::MailboxPtr mailbox_;
+  s4u::MailboxPtr mailbox_small_;
+  s4u::MutexPtr mailboxes_mutex_;
   xbt_os_timer_t timer_;
   MPI_Comm comm_self_ = MPI_COMM_NULL;
   MPI_Comm comm_intra_ = MPI_COMM_NULL;
@@ -28,9 +28,9 @@ private:
   int sampling_ = 0; /* inside an SMPI_SAMPLE_ block? */
   std::string instance_id_;
   bool replaying_ = false; /* is the process replaying a trace */
-  simgrid::s4u::Barrier* finalization_barrier_;
+  s4u::Barrier* finalization_barrier_;
   smpi_trace_call_location_t trace_call_loc_;
-  simgrid::s4u::ActorPtr actor_ = nullptr;
+  s4u::ActorPtr actor_ = nullptr;
   smpi_privatization_region_t privatized_region_ = nullptr;
   int optind = 0; /*for getopt replacement */
 #if HAVE_PAPI
@@ -54,7 +54,7 @@ public:
   smpi_privatization_region_t privatized_region();
   s4u::MailboxPtr mailbox() { return mailbox_; }
   s4u::MailboxPtr mailbox_small() { return mailbox_small_; }
-  xbt_mutex_t mailboxes_mutex();
+  s4u::MutexPtr mailboxes_mutex();
 #if HAVE_PAPI
   int papi_event_set();
   papi_counter_t& papi_counters();
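
The two s4u files above define the API that the rest of this commit migrates to: a refcounted s4u::MutexPtr whose create()/lock()/unlock() calls replace the xbt_mutex_init/acquire/release/destroy quartet. A minimal sketch of the replacement pattern, assuming an initialized SimGrid engine; bump() and its counter are hypothetical, only the Mutex calls come from this diff:

    #include <simgrid/s4u/Mutex.hpp>

    // Hypothetical helper run by a simulated actor.
    void bump(int* shared_counter)
    {
      static simgrid::s4u::MutexPtr mut = simgrid::s4u::Mutex::create(); // was: xbt_mutex_init()
      mut->lock();   // was: xbt_mutex_acquire(mut)
      (*shared_counter)++;
      mut->unlock(); // was: xbt_mutex_release(mut)
      // No xbt_mutex_destroy() counterpart: when the last MutexPtr reference
      // dies, intrusive_ptr_release() ends up in Mutex::~Mutex(), which
      // unrefs the kernel-side MutexImpl (SIMIX_mutex_unref above).
    }
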
diff --git a/src/smpi/include/smpi_win.hpp b/src/smpi/include/smpi_win.hpp
index 1202ae69fa..ade60756c8 100644
--- a/src/smpi/include/smpi_win.hpp
+++ b/src/smpi/include/smpi_win.hpp
@@ -27,15 +27,15 @@ class Win : public F2C, public Keyval {
   MPI_Info info_;
   MPI_Comm comm_;
   std::vector<MPI_Request> *requests_;
-  xbt_mutex_t mut_;
-  simgrid::s4u::Barrier* bar_;
+  s4u::MutexPtr mut_;
+  s4u::Barrier* bar_;
   MPI_Win* connected_wins_;
   char* name_;
   int opened_;
   MPI_Group group_;
   int count_; //for ordering the accs
-  xbt_mutex_t lock_mut_;
-  xbt_mutex_t atomic_mut_;
+  s4u::MutexPtr lock_mut_;
+  s4u::MutexPtr atomic_mut_;
   std::list<int> lockers_;
   int rank_; // to identify owner for barriers in MPI_COMM_WORLD
   int mode_; // exclusive or shared lock
diff --git a/src/smpi/internals/smpi_actor.cpp b/src/smpi/internals/smpi_actor.cpp
index 30327d64bb..e0aef7d1e5 100644
--- a/src/smpi/internals/smpi_actor.cpp
+++ b/src/smpi/internals/smpi_actor.cpp
@@ -25,9 +25,9 @@ using simgrid::s4u::ActorPtr;
 ActorExt::ActorExt(ActorPtr actor, simgrid::s4u::Barrier* finalization_barrier)
     : finalization_barrier_(finalization_barrier), actor_(actor)
 {
-  mailbox_         = simgrid::s4u::Mailbox::by_name("SMPI-" + std::to_string(actor_->get_pid()));
-  mailbox_small_   = simgrid::s4u::Mailbox::by_name("small-" + std::to_string(actor_->get_pid()));
-  mailboxes_mutex_ = xbt_mutex_init();
+  mailbox_         = s4u::Mailbox::by_name("SMPI-" + std::to_string(actor_->get_pid()));
+  mailbox_small_   = s4u::Mailbox::by_name("small-" + std::to_string(actor_->get_pid()));
+  mailboxes_mutex_ = s4u::Mutex::create();
   timer_           = xbt_os_timer_new();
   state_           = SmpiProcessState::UNINITIALIZED;
   if (MC_is_active())
@@ -58,7 +58,6 @@ ActorExt::~ActorExt()
   if (comm_intra_ != MPI_COMM_NULL)
     simgrid::smpi::Comm::destroy(comm_intra_);
   xbt_os_timer_free(timer_);
-  xbt_mutex_destroy(mailboxes_mutex_);
 }
 
 void ActorExt::set_data(const char* instance_id)
@@ -155,7 +154,7 @@ MPI_Comm ActorExt::comm_world()
   return comm_world_ == nullptr ? MPI_COMM_NULL : *comm_world_;
 }
 
-xbt_mutex_t ActorExt::mailboxes_mutex()
+s4u::MutexPtr ActorExt::mailboxes_mutex()
 {
   return mailboxes_mutex_;
 }
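
With mailboxes_mutex_ now an s4u::MutexPtr, ~ActorExt() above loses its explicit xbt_mutex_destroy() call. A hedged sketch of why that is safe; Widget is hypothetical and the engine is assumed to be initialized, but the lifetime semantics come from the Mutex destructor in this diff:

    #include <simgrid/s4u/Mutex.hpp>

    class Widget {
      simgrid::s4u::MutexPtr mutex_ = simgrid::s4u::Mutex::create();
    public:
      void touch()
      {
        mutex_->lock();
        // ... critical section on Widget state ...
        mutex_->unlock();
      }
      // No user-written destructor: destroying mutex_ drops one reference,
      // and the kernel-side MutexImpl is freed once nobody else holds one.
    };
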
diff --git a/src/smpi/mpi/smpi_request.cpp b/src/smpi/mpi/smpi_request.cpp
index b9ba84893e..3a01a1631a 100644
--- a/src/smpi/mpi/smpi_request.cpp
+++ b/src/smpi/mpi/smpi_request.cpp
@@ -375,9 +375,9 @@ void Request::start()
 
     int async_small_thresh = simgrid::config::get_value<int>("smpi/async-small-thresh");
 
-    xbt_mutex_t mut = process->mailboxes_mutex();
+    simgrid::s4u::MutexPtr mut = process->mailboxes_mutex();
     if (async_small_thresh != 0 || (flags_ & MPI_REQ_RMA) != 0)
-      xbt_mutex_acquire(mut);
+      mut->lock();
 
     if (async_small_thresh == 0 && (flags_ & MPI_REQ_RMA) == 0) {
       mailbox = process->mailbox();
@@ -421,7 +421,7 @@ void Request::start()
       XBT_DEBUG("recv simcall posted");
 
       if (async_small_thresh != 0 || (flags_ & MPI_REQ_RMA) != 0)
-        xbt_mutex_release(mut);
+        mut->unlock();
   } else { /* the RECV flag was not set, so this is a send */
     simgrid::smpi::ActorExt* process = smpi_process_remote(simgrid::s4u::Actor::by_pid(dst_));
     int rank = src_;
@@ -470,10 +470,10 @@ void Request::start()
 
     int async_small_thresh = simgrid::config::get_value<int>("smpi/async-small-thresh");
 
-    xbt_mutex_t mut=process->mailboxes_mutex();
+    simgrid::s4u::MutexPtr mut = process->mailboxes_mutex();
 
     if (async_small_thresh != 0 || (flags_ & MPI_REQ_RMA) != 0)
-      xbt_mutex_acquire(mut);
+      mut->lock();
 
     if (not(async_small_thresh != 0 || (flags_ & MPI_REQ_RMA) != 0)) {
       mailbox = process->mailbox();
@@ -521,7 +521,7 @@ void Request::start()
     }
 
     if (async_small_thresh != 0 || ((flags_ & MPI_REQ_RMA) != 0))
-      xbt_mutex_release(mut);
+      mut->unlock();
   }
 }
 
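
A side note rather than something this commit does: since s4u::Mutex now exposes public lock()/unlock()/try_lock(), it models the C++ BasicLockable concept, so conditional critical sections like the ones in Request::start() could in principle use scope-bound guards. A sketch under that assumption; guarded_section() is hypothetical:

    #include <mutex>
    #include <simgrid/s4u/Mutex.hpp>

    void guarded_section(simgrid::s4u::MutexPtr mut)
    {
      std::lock_guard<simgrid::s4u::Mutex> guard(*mut); // unlocks on every exit path
      // ... touch the mailboxes ...
    }
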
diff --git a/src/smpi/mpi/smpi_win.cpp b/src/smpi/mpi/smpi_win.cpp
index cefc4c567a..86059ca339 100644
--- a/src/smpi/mpi/smpi_win.cpp
+++ b/src/smpi/mpi/smpi_win.cpp
@@ -33,9 +33,9 @@ Win::Win(void *base, MPI_Aint size, int disp_unit, MPI_Info info, MPI_Comm comm,
   opened_ = 0;
   group_ = MPI_GROUP_NULL;
   requests_ = new std::vector<MPI_Request>();
-  mut_ = xbt_mutex_init();
-  lock_mut_ = xbt_mutex_init();
-  atomic_mut_ = xbt_mutex_init();
+  mut_ = s4u::Mutex::create();
+  lock_mut_ = s4u::Mutex::create();
+  atomic_mut_ = s4u::Mutex::create();
   connected_wins_ = new MPI_Win[comm_size];
   connected_wins_[rank_] = this;
   count_ = 0;
@@ -78,9 +78,6 @@ Win::~Win(){
   if (rank_ == 0)
     delete bar_;
 
-  xbt_mutex_destroy(mut_);
-  xbt_mutex_destroy(lock_mut_);
-  xbt_mutex_destroy(atomic_mut_);
 
   if(allocated_ !=0)
     xbt_free(base_);
@@ -165,7 +162,7 @@ int Win::fence(int assert)
   if (assert != MPI_MODE_NOPRECEDE) {
     // This is not the first fence => finalize what came before
     bar_->wait();
-    xbt_mutex_acquire(mut_);
+    mut_->lock();
     // This (simulated) mutex ensures that no process pushes to the vector of requests during the waitall.
     // Without this, the vector could get redimensionned when another process pushes.
     // This would result in the array used by Request::waitall() to be invalidated.
@@ -178,7 +175,7 @@ int Win::fence(int assert)
       Request::waitall(size, treqs, MPI_STATUSES_IGNORE);
     }
     count_=0;
-    xbt_mutex_release(mut_);
+    mut_->unlock();
   }
 
   if(assert==MPI_MODE_NOSUCCEED)//there should be no ops after this one, tell we are closed.
@@ -231,16 +228,16 @@ int Win::put( void *origin_addr, int origin_count, MPI_Datatype origin_datatype,
     if(request!=nullptr){
       *request=sreq;
     }else{
-      xbt_mutex_acquire(mut_);
+      mut_->lock();
       requests_->push_back(sreq);
-      xbt_mutex_release(mut_);
+      mut_->unlock();
     }
 
     //push request to receiver's win
-    xbt_mutex_acquire(recv_win->mut_);
+    recv_win->mut_->lock();
     recv_win->requests_->push_back(rreq);
     rreq->start();
-    xbt_mutex_release(recv_win->mut_);
+    recv_win->mut_->unlock();
 
   }else{
     XBT_DEBUG("Entering MPI_Put from myself to myself, rank %d", target_rank);
@@ -288,9 +285,9 @@ int Win::get( void *origin_addr, int origin_count, MPI_Datatype origin_datatype,
     //start the send, with another process than us as sender.
     sreq->start();
     //push request to receiver's win
-    xbt_mutex_acquire(send_win->mut_);
+    send_win->mut_->lock();
     send_win->requests_->push_back(sreq);
-    xbt_mutex_release(send_win->mut_);
+    send_win->mut_->unlock();
 
     //start recv
     rreq->start();
@@ -298,9 +295,9 @@ int Win::get( void *origin_addr, int origin_count, MPI_Datatype origin_datatype,
     if(request!=nullptr){
       *request=rreq;
     }else{
-      xbt_mutex_acquire(mut_);
+      mut_->lock();
       requests_->push_back(rreq);
-      xbt_mutex_release(mut_);
+      mut_->unlock();
     }
 
   }else{
@@ -351,17 +348,17 @@ int Win::accumulate( void *origin_addr, int origin_count, MPI_Datatype origin_da
     // start send
     sreq->start();
     // push request to receiver's win
-    xbt_mutex_acquire(recv_win->mut_);
+    recv_win->mut_->lock();
     recv_win->requests_->push_back(rreq);
     rreq->start();
-    xbt_mutex_release(recv_win->mut_);
+    recv_win->mut_->unlock();
 
     if (request != nullptr) {
       *request = sreq;
     } else {
-      xbt_mutex_acquire(mut_);
+      mut_->lock();
       requests_->push_back(sreq);
-      xbt_mutex_release(mut_);
+      mut_->unlock();
     }
 
   XBT_DEBUG("Leaving MPI_Win_Accumulate");
@@ -391,7 +388,7 @@ int Win::get_accumulate( void *origin_addr, int origin_count, MPI_Datatype origi
     XBT_DEBUG("Entering MPI_Get_accumulate from %d", target_rank);
     //need to be sure ops are correctly ordered, so finish request here ? slow.
     MPI_Request req;
-    xbt_mutex_acquire(send_win->atomic_mut_);
+    send_win->atomic_mut_->lock();
     get(result_addr, result_count, result_datatype, target_rank,
         target_disp, target_count, target_datatype, &req);
     if (req != MPI_REQUEST_NULL)
@@ -401,7 +398,7 @@ int Win::get_accumulate( void *origin_addr, int origin_count, MPI_Datatype origi
         target_disp, target_count, target_datatype, op, &req);
     if (req != MPI_REQUEST_NULL)
       Request::wait(&req, MPI_STATUS_IGNORE);
-    xbt_mutex_release(send_win->atomic_mut_);
+    send_win->atomic_mut_->unlock();
 
     return MPI_SUCCESS;
   }
@@ -424,7 +421,7 @@ int Win::compare_and_swap(void *origin_addr, void *compare_addr,
 
     XBT_DEBUG("Entering MPI_Compare_and_swap with %d", target_rank);
     MPI_Request req = MPI_REQUEST_NULL;
-    xbt_mutex_acquire(send_win->atomic_mut_);
+    send_win->atomic_mut_->lock();
     get(result_addr, 1, datatype, target_rank,
         target_disp, 1, datatype, &req);
     if (req != MPI_REQUEST_NULL)
@@ -433,7 +430,7 @@ int Win::compare_and_swap(void *origin_addr, void *compare_addr,
       put(origin_addr, 1, datatype, target_rank,
           target_disp, 1, datatype);
     }
-    xbt_mutex_release(send_win->atomic_mut_);
+    send_win->atomic_mut_->unlock();
 
     return MPI_SUCCESS;
   }
@@ -582,10 +579,10 @@ int Win::lock(int lock_type, int rank, int assert){
   MPI_Win target_win = connected_wins_[rank];
 
   if ((lock_type == MPI_LOCK_EXCLUSIVE && target_win->mode_ != MPI_LOCK_SHARED)|| target_win->mode_ == MPI_LOCK_EXCLUSIVE){
-    xbt_mutex_acquire(target_win->lock_mut_);
+    target_win->lock_mut_->lock();
     target_win->mode_+= lock_type;//add the lock_type to differentiate case when we are switching from EXCLUSIVE to SHARED (no release needed in the unlock)
     if(lock_type == MPI_LOCK_SHARED){//the window used to be exclusive, it's now shared.
-      xbt_mutex_release(target_win->lock_mut_);
+      target_win->lock_mut_->unlock();
     }
   } else if (not(target_win->mode_ == MPI_LOCK_SHARED && lock_type == MPI_LOCK_EXCLUSIVE))
     target_win->mode_ += lock_type; // don't set to exclusive if it's already shared
@@ -616,7 +613,7 @@ int Win::unlock(int rank){
   target_win->mode_= 0;
   target_win->lockers_.remove(comm_->rank());
   if (target_mode==MPI_LOCK_EXCLUSIVE){
-    xbt_mutex_release(target_win->lock_mut_);
+    target_win->lock_mut_->unlock();
   }
 
   int finished = finish_comms(rank);
@@ -673,7 +670,7 @@ Win* Win::f2c(int id){
 }
 
 int Win::finish_comms(){
-  xbt_mutex_acquire(mut_);
+  mut_->lock();
   //Finish own requests
   std::vector<MPI_Request> *reqqs = requests_;
   int size = static_cast<int>(reqqs->size());
@@ -682,12 +679,12 @@ int Win::finish_comms(){
     Request::waitall(size, treqs, MPI_STATUSES_IGNORE);
     reqqs->clear();
   }
-  xbt_mutex_release(mut_);
+  mut_->unlock();
   return size;
 }
 
 int Win::finish_comms(int rank){
-  xbt_mutex_acquire(mut_);
+  mut_->lock();
   //Finish own requests
   std::vector<MPI_Request> *reqqs = requests_;
   int size = static_cast<int>(reqqs->size());
@@ -715,7 +712,7 @@ int Win::finish_comms(int rank){
       myreqqs.clear();
     }
   }
-  xbt_mutex_release(mut_);
+  mut_->unlock();
   return size;
 }
 
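
The atomic_mut_ conversions in get_accumulate() and compare_and_swap() above keep the same design: one per-window mutex turns a get-then-accumulate sequence into a single atomic read-modify-write on the target. A reduced sketch of that pattern; fetch_and_add() and its raw int target are hypothetical stand-ins for the windowed versions:

    #include <simgrid/s4u/Mutex.hpp>

    int fetch_and_add(simgrid::s4u::MutexPtr atomic_mut, int* target)
    {
      atomic_mut->lock();  // serialize concurrent read-modify-writes
      int old = *target;   // the "get" half
      *target = old + 1;   // the "accumulate" half
      atomic_mut->unlock();
      return old;          // old value, as MPI_Get_accumulate reports
    }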