From c5a48995c0e24c9ae38c3d14203388523c565a5b Mon Sep 17 00:00:00 2001 From: degomme Date: Mon, 20 Mar 2017 11:49:27 +0100 Subject: [PATCH] Simple MPI_Win_lock and MPI_Win_unlock implementation. Also allow calls to MPI_Barrier to finalize pending RMA calls. --- src/smpi/smpi_comm.cpp | 18 +++++ src/smpi/smpi_comm.hpp | 8 ++- src/smpi/smpi_mpi.cpp | 4 +- src/smpi/smpi_pmpi.cpp | 41 +++++++++++ src/smpi/smpi_request.cpp | 2 - src/smpi/smpi_win.cpp | 146 +++++++++++++++++++++++++++----------- src/smpi/smpi_win.hpp | 8 +++ 7 files changed, 180 insertions(+), 47 deletions(-) diff --git a/src/smpi/smpi_comm.cpp b/src/smpi/smpi_comm.cpp index 36c03e6361..97997d46fb 100644 --- a/src/smpi/smpi_comm.cpp +++ b/src/smpi/smpi_comm.cpp @@ -505,6 +505,24 @@ int Comm::add_f() { } +void Comm::add_rma_win(MPI_Win win){ + rma_wins_.push_back(win); +} + +void Comm::remove_rma_win(MPI_Win win){ + rma_wins_.remove(win); +} + +void Comm::finish_rma_calls(){ + for(auto it : rma_wins_){ + if(it->rank()==this->rank()){//is it ours (for MPI_COMM_WORLD)? + int finished = it->finish_comms(); + XBT_DEBUG("Barrier for rank %d - Finished %d RMA calls",this->rank(), finished); + } + } +} + + } } diff --git a/src/smpi/smpi_comm.hpp b/src/smpi/smpi_comm.hpp index c0b0c204ac..a2a2a60c1e 100644 --- a/src/smpi/smpi_comm.hpp +++ b/src/smpi/smpi_comm.hpp @@ -8,7 +8,7 @@ #define SMPI_COMM_HPP_INCLUDED #include "private.h" - +#include namespace simgrid{ namespace smpi{ @@ -28,6 +28,8 @@ class Comm : public F2C, public Keyval{ int* non_uniform_map_; //set if smp nodes have a different number of processes allocated int is_blocked_;// are ranks allocated on the same smp node contiguous ? + std::list rma_wins_; // attached windows for synchronization. + public: static std::unordered_map keyvals_; static int keyval_id_; @@ -63,6 +65,10 @@ class Comm : public F2C, public Keyval{ static int keyval_free(int* keyval); static void keyval_cleanup(); + void add_rma_win(MPI_Win win); + void remove_rma_win(MPI_Win win); + void finish_rma_calls(); + }; } diff --git a/src/smpi/smpi_mpi.cpp b/src/smpi/smpi_mpi.cpp index 67c315c056..e610f99ba7 100644 --- a/src/smpi/smpi_mpi.cpp +++ b/src/smpi/smpi_mpi.cpp @@ -190,6 +190,8 @@ WRAPPED_PMPI_CALL(int,MPI_Win_post,(MPI_Group group, int assert, MPI_Win win),(g WRAPPED_PMPI_CALL(int,MPI_Win_set_name,(MPI_Win win, char * name),(win, name)) WRAPPED_PMPI_CALL(int,MPI_Win_start,(MPI_Group group, int assert, MPI_Win win),(group, assert, win)) WRAPPED_PMPI_CALL(int,MPI_Win_wait,(MPI_Win win),(win)) +WRAPPED_PMPI_CALL(int,MPI_Win_lock,(int lock_type, int rank, int assert, MPI_Win win) ,(lock_type, rank, assert, win)) +WRAPPED_PMPI_CALL(int,MPI_Win_unlock,(int rank, MPI_Win win),(rank, win)) WRAPPED_PMPI_CALL(int,MPI_Win_get_attr, (MPI_Win type, int type_keyval, void *attribute_val, int* flag), (type, type_keyval, attribute_val, flag)) WRAPPED_PMPI_CALL(int,MPI_Win_set_attr, (MPI_Win type, int type_keyval, void *att), (type, type_keyval, att)) WRAPPED_PMPI_CALL(int,MPI_Win_delete_attr, (MPI_Win type, int comm_keyval), (type, comm_keyval)) @@ -316,10 +318,8 @@ UNIMPLEMENTED_WRAPPED_PMPI_CALL(int,MPI_Type_get_envelope,( MPI_Datatype datatyp UNIMPLEMENTED_WRAPPED_PMPI_CALL(int,MPI_Type_match_size,(int typeclass,int size,MPI_Datatype *datatype),(typeclass,size,datatype)) UNIMPLEMENTED_WRAPPED_PMPI_CALL(int,MPI_Unpack_external,(char *datarep, void *inbuf, MPI_Aint insize, MPI_Aint *position, void *outbuf, int outcount, MPI_Datatype datatype),( datarep, inbuf, insize, position, outbuf, outcount, datatype)) UNIMPLEMENTED_WRAPPED_PMPI_CALL(int,MPI_Unpublish_name,( char *service_name, MPI_Info info, char *port_name),( service_name, info, port_name)) -UNIMPLEMENTED_WRAPPED_PMPI_CALL(int,MPI_Win_lock,(int lock_type, int rank, int assert, MPI_Win win) ,(lock_type, rank, assert, win)) UNIMPLEMENTED_WRAPPED_PMPI_CALL(int,MPI_Win_set_errhandler,(MPI_Win win, MPI_Errhandler errhandler) ,(win, errhandler)) UNIMPLEMENTED_WRAPPED_PMPI_CALL(int,MPI_Win_test,(MPI_Win win, int *flag),(win, flag)) -UNIMPLEMENTED_WRAPPED_PMPI_CALL(int,MPI_Win_unlock,(int rank, MPI_Win win),(rank, win)) UNIMPLEMENTED_WRAPPED_PMPI_CALL(MPI_Errhandler, MPI_Errhandler_f2c,(MPI_Fint errhandler),(errhandler)) UNIMPLEMENTED_WRAPPED_PMPI_CALL(MPI_Fint, MPI_Errhandler_c2f,(MPI_Errhandler errhandler),(errhandler)) diff --git a/src/smpi/smpi_pmpi.cpp b/src/smpi/smpi_pmpi.cpp index 2baf7c1b0d..a265c3f8b4 100644 --- a/src/smpi/smpi_pmpi.cpp +++ b/src/smpi/smpi_pmpi.cpp @@ -1410,6 +1410,10 @@ int PMPI_Barrier(MPI_Comm comm) TRACE_smpi_collective_in(rank, -1, __FUNCTION__, extra); Colls::barrier(comm); + + //Barrier can be used to synchronize RMA calls. Finish all requests from comm before. + comm->finish_rma_calls(); + retval = MPI_SUCCESS; TRACE_smpi_collective_out(rank, -1, __FUNCTION__); @@ -2680,6 +2684,43 @@ int PMPI_Win_wait(MPI_Win win){ return retval; } +int PMPI_Win_lock(int lock_type, int rank, int assert, MPI_Win win){ + int retval = 0; + smpi_bench_end(); + if (win == MPI_WIN_NULL) { + retval = MPI_ERR_WIN; + } else if (lock_type != MPI_LOCK_EXCLUSIVE && + lock_type != MPI_LOCK_SHARED) { + retval = MPI_ERR_LOCKTYPE; + } else if (rank == MPI_PROC_NULL){ + retval = MPI_SUCCESS; + } else { + int myrank = smpi_process()->index(); + TRACE_smpi_collective_in(myrank, -1, __FUNCTION__, nullptr); + retval = win->lock(lock_type,rank,assert); + TRACE_smpi_collective_out(myrank, -1, __FUNCTION__); + } + smpi_bench_begin(); + return retval; +} + +int PMPI_Win_unlock(int rank, MPI_Win win){ + int retval = 0; + smpi_bench_end(); + if (win == MPI_WIN_NULL) { + retval = MPI_ERR_WIN; + } else if (rank == MPI_PROC_NULL){ + retval = MPI_SUCCESS; + } else { + int myrank = smpi_process()->index(); + TRACE_smpi_collective_in(myrank, -1, __FUNCTION__, nullptr); + retval = win->unlock(rank); + TRACE_smpi_collective_out(myrank, -1, __FUNCTION__); + } + smpi_bench_begin(); + return retval; +} + int PMPI_Alloc_mem(MPI_Aint size, MPI_Info info, void *baseptr){ void *ptr = xbt_malloc(size); if(ptr==nullptr) diff --git a/src/smpi/smpi_request.cpp b/src/smpi/smpi_request.cpp index a96b63d5b1..397a8cc640 100644 --- a/src/smpi/smpi_request.cpp +++ b/src/smpi/smpi_request.cpp @@ -860,8 +860,6 @@ int Request::waitany(int count, MPI_Request requests[], MPI_Status * status) finish_wait(&requests[index],status); if (requests[i] != MPI_REQUEST_NULL && (requests[i]->flags_ & NON_PERSISTENT)) requests[index] = MPI_REQUEST_NULL; - }else{ - XBT_WARN("huu?"); } } } diff --git a/src/smpi/smpi_win.cpp b/src/smpi/smpi_win.cpp index 29caa9f544..9333f02732 100644 --- a/src/smpi/smpi_win.cpp +++ b/src/smpi/smpi_win.cpp @@ -16,7 +16,7 @@ int Win::keyval_id_=0; Win::Win(void *base, MPI_Aint size, int disp_unit, MPI_Info info, MPI_Comm comm): base_(base), size_(size), disp_unit_(disp_unit), assert_(0), info_(info), comm_(comm){ int comm_size = comm->size(); - int rank = comm->rank(); + rank_ = comm->rank(); XBT_DEBUG("Creating window"); if(info!=MPI_INFO_NULL) info->ref(); @@ -25,13 +25,17 @@ Win::Win(void *base, MPI_Aint size, int disp_unit, MPI_Info info, MPI_Comm comm) group_ = MPI_GROUP_NULL; requests_ = new std::vector(); mut_=xbt_mutex_init(); + lock_mut_=xbt_mutex_init(); connected_wins_ = new MPI_Win[comm_size]; - connected_wins_[rank] = this; + connected_wins_[rank_] = this; count_ = 0; - if(rank==0){ + if(rank_==0){ bar_ = MSG_barrier_init(comm_size); } - Colls::allgather(&(connected_wins_[rank]), sizeof(MPI_Win), MPI_BYTE, connected_wins_, sizeof(MPI_Win), + + comm->add_rma_win(this); + + Colls::allgather(&(connected_wins_[rank_]), sizeof(MPI_Win), MPI_BYTE, connected_wins_, sizeof(MPI_Win), MPI_BYTE, comm); Colls::bcast(&(bar_), sizeof(msg_bar_t), MPI_BYTE, 0, comm); @@ -42,9 +46,11 @@ Win::Win(void *base, MPI_Aint size, int disp_unit, MPI_Info info, MPI_Comm comm) Win::~Win(){ //As per the standard, perform a barrier to ensure every async comm is finished MSG_barrier_wait(bar_); - xbt_mutex_acquire(mut_); + + int finished = finish_comms(); + XBT_DEBUG("Win destructor - Finished %d RMA calls", finished); + delete requests_; - xbt_mutex_release(mut_); delete[] connected_wins_; if (name_ != nullptr){ xbt_free(name_); @@ -53,11 +59,14 @@ Win::~Win(){ MPI_Info_free(&info_); } + comm_->remove_rma_win(this); + Colls::barrier(comm_); int rank=comm_->rank(); if(rank == 0) MSG_barrier_destroy(bar_); xbt_mutex_destroy(mut_); + xbt_mutex_destroy(lock_mut_); cleanup_attr(); } @@ -80,6 +89,10 @@ void Win::get_group(MPI_Group* group){ } } +int Win::rank(){ + return rank_; +} + MPI_Aint Win::size(){ return size_; } @@ -137,11 +150,19 @@ int Win::fence(int assert) int Win::put( void *origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank, MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype) { - if(opened_==0)//check that post/start has been done - return MPI_ERR_WIN; //get receiver pointer MPI_Win recv_win = connected_wins_[target_rank]; + if(opened_==0){//check that post/start has been done + // no fence or start .. lock ok ? + int locked=0; + for(auto it : recv_win->lockers_) + if (it == comm_->rank()) + locked = 1; + if(locked != 1) + return MPI_ERR_WIN; + } + if(target_count*target_datatype->get_extent()>recv_win->size_) return MPI_ERR_ARG; @@ -178,11 +199,19 @@ int Win::put( void *origin_addr, int origin_count, MPI_Datatype origin_datatype, int Win::get( void *origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank, MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype) { - if(opened_==0)//check that post/start has been done - return MPI_ERR_WIN; //get sender pointer MPI_Win send_win = connected_wins_[target_rank]; + if(opened_==0){//check that post/start has been done + // no fence or start .. lock ok ? + int locked=0; + for(auto it : send_win->lockers_) + if (it == comm_->rank()) + locked = 1; + if(locked != 1) + return MPI_ERR_WIN; + } + if(target_count*target_datatype->get_extent()>send_win->size_) return MPI_ERR_ARG; @@ -224,12 +253,21 @@ int Win::get( void *origin_addr, int origin_count, MPI_Datatype origin_datatype, int Win::accumulate( void *origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank, MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype, MPI_Op op) { - if(opened_==0)//check that post/start has been done - return MPI_ERR_WIN; - //FIXME: local version + //get receiver pointer MPI_Win recv_win = connected_wins_[target_rank]; + if(opened_==0){//check that post/start has been done + // no fence or start .. lock ok ? + int locked=0; + for(auto it : recv_win->lockers_) + if (it == comm_->rank()) + locked = 1; + if(locked != 1) + return MPI_ERR_WIN; + } + //FIXME: local version + if(target_count*target_datatype->get_extent()>recv_win->size_) return MPI_ERR_ARG; @@ -357,24 +395,8 @@ int Win::complete(){ } xbt_free(reqs); - //now we can finish RMA calls - xbt_mutex_acquire(mut_); - std::vector *reqqs = requests_; - size = static_cast(reqqs->size()); - - XBT_DEBUG("Win_complete - Finishing %d RMA calls", size); - if (size > 0) { - // start all requests that have been prepared by another process - for (const auto& req : *reqqs) { - if (req && (req->flags() & PREPARED)) - req->start(); - } - - MPI_Request* treqs = &(*reqqs)[0]; - Request::waitall(size, treqs, MPI_STATUSES_IGNORE); - reqqs->clear(); - } - xbt_mutex_release(mut_); + int finished = finish_comms(); + XBT_DEBUG("Win_complete - Finished %d RMA calls", finished); Group::unref(group_); opened_--; //we're closed for business ! @@ -404,12 +426,57 @@ int Win::wait(){ Request::unref(&reqs[i]); } xbt_free(reqs); - xbt_mutex_acquire(mut_); - std::vector *reqqs = requests_; - size = static_cast(reqqs->size()); + int finished = finish_comms(); + XBT_DEBUG("Win_wait - Finished %d RMA calls", finished); + + Group::unref(group_); + opened_--; //we're opened for business ! + return MPI_SUCCESS; +} + +int Win::lock(int lock_type, int rank, int assert){ + MPI_Win target_win = connected_wins_[rank]; + + int finished = finish_comms(); + XBT_DEBUG("Win_lock - Finished %d RMA calls", finished); + + //window already locked, we have to wait + if (lock_type == MPI_LOCK_EXCLUSIVE) + xbt_mutex_acquire(target_win->lock_mut_); + + xbt_mutex_acquire(target_win->mut_); + target_win->lockers_.push_back(comm_->rank()); + xbt_mutex_release(target_win->mut_); - XBT_DEBUG("Win_wait - Finishing %d RMA calls", size); + return MPI_SUCCESS; +} + +int Win::unlock(int rank){ + MPI_Win target_win = connected_wins_[rank]; + + int finished = finish_comms(); + XBT_DEBUG("Win_unlock - Finished %d RMA calls", finished); + + xbt_mutex_acquire(target_win->mut_); + target_win->lockers_.remove(comm_->rank()); + xbt_mutex_release(target_win->mut_); + + xbt_mutex_try_acquire(target_win->lock_mut_); + xbt_mutex_release(target_win->lock_mut_); + return MPI_SUCCESS; +} + +Win* Win::f2c(int id){ + return static_cast(F2C::f2c(id)); +} + + +int Win::finish_comms(){ + //Finish own requests + std::vector *reqqs = requests_; + int size = static_cast(reqqs->size()); if (size > 0) { + xbt_mutex_acquire(mut_); // start all requests that have been prepared by another process for (const auto& req : *reqqs) { if (req && (req->flags() & PREPARED)) @@ -419,17 +486,12 @@ int Win::wait(){ MPI_Request* treqs = &(*reqqs)[0]; Request::waitall(size, treqs, MPI_STATUSES_IGNORE); reqqs->clear(); + xbt_mutex_release(mut_); } - xbt_mutex_release(mut_); - Group::unref(group_); - opened_--; //we're opened for business ! - return MPI_SUCCESS; + return size; } -Win* Win::f2c(int id){ - return static_cast(F2C::f2c(id)); -} } } diff --git a/src/smpi/smpi_win.hpp b/src/smpi/smpi_win.hpp index 8185d5c37e..fbf009dc90 100644 --- a/src/smpi/smpi_win.hpp +++ b/src/smpi/smpi_win.hpp @@ -9,6 +9,7 @@ #include "private.h" #include +#include namespace simgrid{ namespace smpi{ @@ -29,6 +30,9 @@ class Win : public F2C, public Keyval { int opened_; MPI_Group group_; int count_; //for ordering the accs + xbt_mutex_t lock_mut_; + std::list lockers_; + int rank_; // to identify owner for barriers in MPI_COMM_WORLD public: static std::unordered_map keyvals_; @@ -39,6 +43,7 @@ public: void get_name( char* name, int* length); void get_group( MPI_Group* group); void set_name( char* name); + int rank(); int start(MPI_Group group, int assert); int post(MPI_Group group, int assert); int complete(); @@ -54,6 +59,9 @@ public: int accumulate( void *origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank, MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype, MPI_Op op); static Win* f2c(int id); + int lock(int lock_type, int rank, int assert); + int unlock(int rank); + int finish_comms(); }; -- 2.20.1