From b65f8c081fd0167701551de334cb84473eebcab9 Mon Sep 17 00:00:00 2001 From: degomme Date: Tue, 4 Apr 2017 10:05:58 +0200 Subject: [PATCH] Add MPI_Win_lock_all MPI_Win_unlock_all MPI_Win_flush MPI_Win_flush_local MPI_Win_flush_all MPI_Win_flush_local_all calls --- include/smpi/smpi.h | 12 +++++- src/smpi/smpi_mpi.cpp | 6 +++ src/smpi/smpi_pmpi.cpp | 94 ++++++++++++++++++++++++++++++++++++++++++ src/smpi/smpi_win.cpp | 84 +++++++++++++++++++++++++++++++++++++ src/smpi/smpi_win.hpp | 7 ++++ 5 files changed, 201 insertions(+), 2 deletions(-) diff --git a/include/smpi/smpi.h b/include/smpi/smpi.h index 079f565fe9..ad6c4d5f36 100644 --- a/include/smpi/smpi.h +++ b/include/smpi/smpi.h @@ -761,13 +761,21 @@ MPI_CALL(XBT_PUBLIC(int), MPI_Comm_spawn_multiple,(int count, char **array_of_co MPI_Comm comm, MPI_Comm *intercomm, int* array_of_errcodes)); MPI_CALL(XBT_PUBLIC(int), MPI_Comm_get_parent,( MPI_Comm *parent)); MPI_CALL(XBT_PUBLIC(int), MPI_Win_complete,(MPI_Win win)); -MPI_CALL(XBT_PUBLIC(int), MPI_Win_lock,(int lock_type, int rank, int assert, MPI_Win win)); + MPI_CALL(XBT_PUBLIC(int), MPI_Win_post,(MPI_Group group, int assert, MPI_Win win)); MPI_CALL(XBT_PUBLIC(int), MPI_Win_start,(MPI_Group group, int assert, MPI_Win win)); MPI_CALL(XBT_PUBLIC(int), MPI_Win_test,(MPI_Win win, int *flag)); -MPI_CALL(XBT_PUBLIC(int), MPI_Win_unlock,(int rank, MPI_Win win)); MPI_CALL(XBT_PUBLIC(int), MPI_Win_wait,(MPI_Win win)); +MPI_CALL(XBT_PUBLIC(int), MPI_Win_lock,(int lock_type, int rank, int assert, MPI_Win win)); +MPI_CALL(XBT_PUBLIC(int), MPI_Win_lock_all,(int assert, MPI_Win win)); +MPI_CALL(XBT_PUBLIC(int), MPI_Win_unlock,(int rank, MPI_Win win)); +MPI_CALL(XBT_PUBLIC(int), MPI_Win_unlock_all,(MPI_Win win)); + +MPI_CALL(XBT_PUBLIC(int), MPI_Win_flush,(int rank, MPI_Win win)); +MPI_CALL(XBT_PUBLIC(int), MPI_Win_flush_local,(int rank, MPI_Win win)); +MPI_CALL(XBT_PUBLIC(int), MPI_Win_flush_all,(MPI_Win win)); +MPI_CALL(XBT_PUBLIC(int), MPI_Win_flush_local_all,(MPI_Win win)); MPI_CALL(XBT_PUBLIC(int), MPI_File_get_errhandler , (MPI_File file, MPI_Errhandler *errhandler)); MPI_CALL(XBT_PUBLIC(int), MPI_File_set_errhandler, (MPI_File file, MPI_Errhandler errhandler)); diff --git a/src/smpi/smpi_mpi.cpp b/src/smpi/smpi_mpi.cpp index 167e1cd981..be3cecee17 100644 --- a/src/smpi/smpi_mpi.cpp +++ b/src/smpi/smpi_mpi.cpp @@ -201,6 +201,12 @@ WRAPPED_PMPI_CALL(int,MPI_Win_start,(MPI_Group group, int assert, MPI_Win win),( WRAPPED_PMPI_CALL(int,MPI_Win_wait,(MPI_Win win),(win)) WRAPPED_PMPI_CALL(int,MPI_Win_lock,(int lock_type, int rank, int assert, MPI_Win win) ,(lock_type, rank, assert, win)) WRAPPED_PMPI_CALL(int,MPI_Win_unlock,(int rank, MPI_Win win),(rank, win)) +WRAPPED_PMPI_CALL(int,MPI_Win_lock_all,(int assert, MPI_Win win) ,(assert, win)) +WRAPPED_PMPI_CALL(int,MPI_Win_unlock_all,(MPI_Win win),(win)) +WRAPPED_PMPI_CALL(int,MPI_Win_flush,(int rank, MPI_Win win),(rank, win)) +WRAPPED_PMPI_CALL(int,MPI_Win_flush_local,(int rank, MPI_Win win),(rank, win)) +WRAPPED_PMPI_CALL(int,MPI_Win_flush_all,(MPI_Win win),(win)) +WRAPPED_PMPI_CALL(int,MPI_Win_flush_local_all,(MPI_Win win),(win)) WRAPPED_PMPI_CALL(int,MPI_Win_get_attr, (MPI_Win type, int type_keyval, void *attribute_val, int* flag), (type, type_keyval, attribute_val, flag)) WRAPPED_PMPI_CALL(int,MPI_Win_set_attr, (MPI_Win type, int type_keyval, void *att), (type, type_keyval, att)) WRAPPED_PMPI_CALL(int,MPI_Win_delete_attr, (MPI_Win type, int comm_keyval), (type, comm_keyval)) diff --git a/src/smpi/smpi_pmpi.cpp b/src/smpi/smpi_pmpi.cpp index d09c579b55..5c5915263b 100644 --- a/src/smpi/smpi_pmpi.cpp +++ b/src/smpi/smpi_pmpi.cpp @@ -2861,6 +2861,100 @@ int PMPI_Win_unlock(int rank, MPI_Win win){ return retval; } +int PMPI_Win_lock_all(int assert, MPI_Win win){ + int retval = 0; + smpi_bench_end(); + if (win == MPI_WIN_NULL) { + retval = MPI_ERR_WIN; + } else { + int myrank = smpi_process()->index(); + TRACE_smpi_collective_in(myrank, -1, __FUNCTION__, nullptr); + retval = win->lock_all(assert); + TRACE_smpi_collective_out(myrank, -1, __FUNCTION__); + } + smpi_bench_begin(); + return retval; +} + +int PMPI_Win_unlock_all(MPI_Win win){ + int retval = 0; + smpi_bench_end(); + if (win == MPI_WIN_NULL) { + retval = MPI_ERR_WIN; + } else { + int myrank = smpi_process()->index(); + TRACE_smpi_collective_in(myrank, -1, __FUNCTION__, nullptr); + retval = win->unlock_all(); + TRACE_smpi_collective_out(myrank, -1, __FUNCTION__); + } + smpi_bench_begin(); + return retval; +} + +int PMPI_Win_flush(int rank, MPI_Win win){ + int retval = 0; + smpi_bench_end(); + if (win == MPI_WIN_NULL) { + retval = MPI_ERR_WIN; + } else if (rank == MPI_PROC_NULL){ + retval = MPI_SUCCESS; + } else { + int myrank = smpi_process()->index(); + TRACE_smpi_collective_in(myrank, -1, __FUNCTION__, nullptr); + retval = win->flush(rank); + TRACE_smpi_collective_out(myrank, -1, __FUNCTION__); + } + smpi_bench_begin(); + return retval; +} + +int PMPI_Win_flush_local(int rank, MPI_Win win){ + int retval = 0; + smpi_bench_end(); + if (win == MPI_WIN_NULL) { + retval = MPI_ERR_WIN; + } else if (rank == MPI_PROC_NULL){ + retval = MPI_SUCCESS; + } else { + int myrank = smpi_process()->index(); + TRACE_smpi_collective_in(myrank, -1, __FUNCTION__, nullptr); + retval = win->flush_local(rank); + TRACE_smpi_collective_out(myrank, -1, __FUNCTION__); + } + smpi_bench_begin(); + return retval; +} + +int PMPI_Win_flush_all(MPI_Win win){ + int retval = 0; + smpi_bench_end(); + if (win == MPI_WIN_NULL) { + retval = MPI_ERR_WIN; + } else { + int myrank = smpi_process()->index(); + TRACE_smpi_collective_in(myrank, -1, __FUNCTION__, nullptr); + retval = win->flush_all(); + TRACE_smpi_collective_out(myrank, -1, __FUNCTION__); + } + smpi_bench_begin(); + return retval; +} + +int PMPI_Win_flush_local_all(MPI_Win win){ + int retval = 0; + smpi_bench_end(); + if (win == MPI_WIN_NULL) { + retval = MPI_ERR_WIN; + } else { + int myrank = smpi_process()->index(); + TRACE_smpi_collective_in(myrank, -1, __FUNCTION__, nullptr); + retval = win->flush_local_all(); + TRACE_smpi_collective_out(myrank, -1, __FUNCTION__); + } + smpi_bench_begin(); + return retval; +} + int PMPI_Alloc_mem(MPI_Aint size, MPI_Info info, void *baseptr){ void *ptr = xbt_malloc(size); if(ptr==nullptr) diff --git a/src/smpi/smpi_win.cpp b/src/smpi/smpi_win.cpp index bd90414d69..fb16d16e03 100644 --- a/src/smpi/smpi_win.cpp +++ b/src/smpi/smpi_win.cpp @@ -523,6 +523,17 @@ int Win::lock(int lock_type, int rank, int assert){ return MPI_SUCCESS; } +int Win::lock_all(int assert){ + int i=0; + int retval = MPI_SUCCESS; + for (i=0; isize();i++){ + int ret = this->lock(MPI_LOCK_SHARED, i, assert); + if(ret != MPI_SUCCESS) + retval = ret; + } + return retval; +} + int Win::unlock(int rank){ if(opened_!=0) return MPI_ERR_WIN; @@ -541,6 +552,50 @@ int Win::unlock(int rank){ return MPI_SUCCESS; } +int Win::unlock_all(){ + int i=0; + int retval = MPI_SUCCESS; + for (i=0; isize();i++){ + int ret = this->unlock(i); + if(ret != MPI_SUCCESS) + retval = ret; + } + return retval; +} + +int Win::flush(int rank){ + MPI_Win target_win = connected_wins_[rank]; + int finished = finish_comms(rank); + XBT_DEBUG("Win_flush on local %d - Finished %d RMA calls", rank_, finished); + finished = target_win->finish_comms(rank_); + XBT_DEBUG("Win_flush on remote %d - Finished %d RMA calls", rank, finished); + return MPI_SUCCESS; +} + +int Win::flush_local(int rank){ + int finished = finish_comms(rank); + XBT_DEBUG("Win_flush_local for rank %d - Finished %d RMA calls", rank, finished); + return MPI_SUCCESS; +} + +int Win::flush_all(){ + int i=0; + int finished = 0; + finished = finish_comms(); + XBT_DEBUG("Win_flush_all on local - Finished %d RMA calls", finished); + for (i=0; isize();i++){ + finished = connected_wins_[i]->finish_comms(rank_); + XBT_DEBUG("Win_flush_all on %d - Finished %d RMA calls", i, finished); + } + return MPI_SUCCESS; +} + +int Win::flush_local_all(){ + int finished = finish_comms(); + XBT_DEBUG("Win_flush_local_all - Finished %d RMA calls", finished); + return MPI_SUCCESS; +} + Win* Win::f2c(int id){ return static_cast(F2C::f2c(id)); } @@ -560,6 +615,35 @@ int Win::finish_comms(){ return size; } +int Win::finish_comms(int rank){ + xbt_mutex_acquire(mut_); + //Finish own requests + std::vector *reqqs = requests_; + int size = static_cast(reqqs->size()); + if (size > 0) { + size = 0; + std::vector* myreqqs = new std::vector(); + std::vector::iterator iter = reqqs->begin(); + while (iter != reqqs->end()){ + if(((*iter)->src() == rank) || ((*iter)->dst() == rank)){ + myreqqs->push_back(*iter); + iter = reqqs->erase(iter); + size++; + } else { + ++iter; + } + } + if(size >0){ + MPI_Request* treqs = &(*myreqqs)[0]; + Request::waitall(size, treqs, MPI_STATUSES_IGNORE); + myreqqs->clear(); + delete myreqqs; + } + } + xbt_mutex_release(mut_); + return size; +} + } } diff --git a/src/smpi/smpi_win.hpp b/src/smpi/smpi_win.hpp index 5beb90caab..1d36f36c73 100644 --- a/src/smpi/smpi_win.hpp +++ b/src/smpi/smpi_win.hpp @@ -73,7 +73,14 @@ public: static Win* f2c(int id); int lock(int lock_type, int rank, int assert); int unlock(int rank); + int lock_all(int assert); + int unlock_all(); + int flush(int rank); + int flush_local(int rank); + int flush_all(); + int flush_local_all(); int finish_comms(); + int finish_comms(int rank); }; -- 2.20.1