From f707404e382a0c8d914c6b324cf05eb0ee896351 Mon Sep 17 00:00:00 2001 From: Augustin Degomme Date: Fri, 5 Dec 2014 14:51:37 +0100 Subject: [PATCH] Add MPI_Win_post, MPI_Win_start, MPI_Win_complete, and MPI_Win_wait support. This is the second (out of 3) of the classic MPI RMA synchronization methods. This version is naive and may not be what real MPI lib do, as the standard lets the implementer chose the behavior of theses calls. --- src/smpi/instr_smpi.c | 6 +- src/smpi/private.h | 9 ++ src/smpi/smpi_f77.c | 17 ++++ src/smpi/smpi_pmpi.c | 94 ++++++++++++++++---- src/smpi/smpi_rma.c | 198 +++++++++++++++++++++++++++++++++++++++++- 5 files changed, 306 insertions(+), 18 deletions(-) diff --git a/src/smpi/instr_smpi.c b/src/smpi/instr_smpi.c index bda2b95ed7..1560ae5d07 100644 --- a/src/smpi/instr_smpi.c +++ b/src/smpi/instr_smpi.c @@ -53,7 +53,11 @@ static const char *smpi_colors[] ={ "put", "0.3 1 0", "get", "0 1 0.3", "accumulate", "1 0.3 0", - "fence", "1 0 0.3", + "win_fence", "1 0 0.3", + "win_post", "1 0 0.8", + "win_wait", "1 0.8 0", + "win_start", "0.8 0 1", + "win_complete", "0.8 1 0", NULL, NULL, }; diff --git a/src/smpi/private.h b/src/smpi/private.h index 13f2e76189..bc36a1cf6e 100644 --- a/src/smpi/private.h +++ b/src/smpi/private.h @@ -398,6 +398,11 @@ void smpi_mpi_win_set_name(MPI_Win win, char* name); int smpi_mpi_win_fence( int assert, MPI_Win win); +int smpi_mpi_win_post(MPI_Group group, int assert, MPI_Win win); +int smpi_mpi_win_start(MPI_Group group, int assert, MPI_Win win); +int smpi_mpi_win_complete(MPI_Win win); +int smpi_mpi_win_wait(MPI_Win win); + int smpi_mpi_get( void *origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank, MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype, MPI_Win win); int smpi_mpi_put( void *origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank, @@ -558,6 +563,10 @@ void mpi_win_free_( int* win, int* ierr); void mpi_win_create_( int *base, MPI_Aint* size, int* disp_unit, int* info, int* comm, int *win, int* ierr); void mpi_win_set_name_ (int* win, char * name, int* ierr, int size); void mpi_win_get_name_ (int* win, char * name, int* len, int* ierr); +void mpi_win_post_(int* group, int assert, int* win, int* ierr); +void mpi_win_start_(int* group, int assert, int* win, int* ierr); +void mpi_win_complete_(int* win, int* ierr); +void mpi_win_wait_(int* win, int* ierr); void mpi_info_create_( int *info, int* ierr); void mpi_info_set_( int *info, char *key, char *value, int* ierr, unsigned int keylen, unsigned int valuelen); void mpi_info_free_(int* info, int* ierr); diff --git a/src/smpi/smpi_f77.c b/src/smpi/smpi_f77.c index a832f33a37..b044802389 100644 --- a/src/smpi/smpi_f77.c +++ b/src/smpi/smpi_f77.c @@ -748,6 +748,22 @@ void mpi_win_create_( int *base, MPI_Aint* size, int* disp_unit, int* info, int* } } +void mpi_win_post_(int* group, int assert, int* win, int* ierr){ + *ierr = MPI_Win_post(smpi_group_f2c(*group), assert, smpi_win_f2c(*win)); +} + +void mpi_win_start_(int* group, int assert, int* win, int* ierr){ + *ierr = MPI_Win_start(smpi_group_f2c(*group), assert, smpi_win_f2c(*win)); +} + +void mpi_win_complete_(int* win, int* ierr){ + *ierr = MPI_Win_complete(smpi_win_f2c(*win)); +} + +void mpi_win_wait_(int* win, int* ierr){ + *ierr = MPI_Win_wait(smpi_win_f2c(*win)); +} + void mpi_win_set_name_ (int* win, char * name, int* ierr, int size){ //handle trailing blanks while(name[size-1]==' ')size--; @@ -1729,3 +1745,4 @@ void mpi_comm_get_parent_ ( int* parent, int* ierr){ *parent = smpi_comm_c2f(tmp); } } + diff --git a/src/smpi/smpi_pmpi.c b/src/smpi/smpi_pmpi.c index bbd04baaae..7c060285c1 100644 --- a/src/smpi/smpi_pmpi.c +++ b/src/smpi/smpi_pmpi.c @@ -3013,6 +3013,85 @@ int PMPI_Accumulate( void *origin_addr, int origin_count, MPI_Datatype origin_da } +int PMPI_Win_post(MPI_Group group, int assert, MPI_Win win){ + int retval = 0; + smpi_bench_end(); + if (win == MPI_WIN_NULL) { + retval = MPI_ERR_WIN; + } else if (group==MPI_GROUP_NULL){ + retval = MPI_ERR_GROUP; + } +#ifdef HAVE_TRACING + int rank = smpi_process_index(); + TRACE_smpi_collective_in(rank, -1, __FUNCTION__, NULL); +#endif + retval = smpi_mpi_win_post(group,assert,win); +#ifdef HAVE_TRACING + TRACE_smpi_collective_out(rank, -1, __FUNCTION__); +#endif + smpi_bench_begin(); + return retval; +} + +int PMPI_Win_start(MPI_Group group, int assert, MPI_Win win){ + int retval = 0; + smpi_bench_end(); + if (win == MPI_WIN_NULL) { + retval = MPI_ERR_WIN; + } else if (group==MPI_GROUP_NULL){ + retval = MPI_ERR_GROUP; + } + +#ifdef HAVE_TRACING + int rank = smpi_process_index(); + TRACE_smpi_collective_in(rank, -1, __FUNCTION__, NULL); +#endif + retval = smpi_mpi_win_start(group,assert,win); +#ifdef HAVE_TRACING + TRACE_smpi_collective_out(rank, -1, __FUNCTION__); +#endif + smpi_bench_begin(); + return retval; +} + + +int PMPI_Win_complete(MPI_Win win){ + int retval = 0; + smpi_bench_end(); + if (win == MPI_WIN_NULL) { + retval = MPI_ERR_WIN; + } + +#ifdef HAVE_TRACING + int rank = smpi_process_index(); + TRACE_smpi_collective_in(rank, -1, __FUNCTION__, NULL); +#endif + retval = smpi_mpi_win_complete(win); +#ifdef HAVE_TRACING + TRACE_smpi_collective_out(rank, -1, __FUNCTION__); +#endif + smpi_bench_begin(); + return retval; +} + +int PMPI_Win_wait(MPI_Win win){ + int retval = 0; + smpi_bench_end(); + if (win == MPI_WIN_NULL) { + retval = MPI_ERR_WIN; + } +#ifdef HAVE_TRACING + int rank = smpi_process_index(); + TRACE_smpi_collective_in(rank, -1, __FUNCTION__, NULL); +#endif + retval = smpi_mpi_win_wait(win); +#ifdef HAVE_TRACING + TRACE_smpi_collective_out(rank, -1, __FUNCTION__); +#endif + smpi_bench_begin(); + return retval; +} + int PMPI_Alloc_mem(MPI_Aint size, MPI_Info info, void *baseptr){ void *ptr = xbt_malloc(size); if(!ptr) @@ -3664,22 +3743,10 @@ int PMPI_Comm_get_parent( MPI_Comm *parent){ NOT_YET_IMPLEMENTED } -int PMPI_Win_complete(MPI_Win win){ - NOT_YET_IMPLEMENTED -} - int PMPI_Win_lock(int lock_type, int rank, int assert, MPI_Win win) { NOT_YET_IMPLEMENTED } -int PMPI_Win_post(MPI_Group group, int assert, MPI_Win win){ - NOT_YET_IMPLEMENTED -} - -int PMPI_Win_start(MPI_Group group, int assert, MPI_Win win){ - NOT_YET_IMPLEMENTED -} - int PMPI_Win_test(MPI_Win win, int *flag){ NOT_YET_IMPLEMENTED } @@ -3688,6 +3755,3 @@ int PMPI_Win_unlock(int rank, MPI_Win win){ NOT_YET_IMPLEMENTED } -int PMPI_Win_wait(MPI_Win win){ - NOT_YET_IMPLEMENTED -} diff --git a/src/smpi/smpi_rma.c b/src/smpi/smpi_rma.c index 356ab585e0..b35bc1a13c 100644 --- a/src/smpi/smpi_rma.c +++ b/src/smpi/smpi_rma.c @@ -24,6 +24,8 @@ typedef struct s_smpi_mpi_win{ xbt_bar_t bar; MPI_Win* connected_wins; char* name; + int opened; + MPI_Group group; } s_smpi_mpi_win_t; @@ -45,6 +47,8 @@ MPI_Win smpi_mpi_win_create( void *base, MPI_Aint size, int disp_unit, MPI_Info info->refcount++; win->comm = comm; win->name = NULL; + win->opened = 0; + win->group = MPI_GROUP_NULL; win->requests = xbt_dynar_new(sizeof(MPI_Request), NULL); win->connected_wins = xbt_malloc0(comm_size*sizeof(MPI_Win)); win->connected_wins[rank] = win; @@ -100,8 +104,10 @@ void smpi_mpi_win_get_name(MPI_Win win, char* name, int* length){ } void smpi_mpi_win_get_group(MPI_Win win, MPI_Group* group){ - if(win->comm != MPI_COMM_NULL) + if(win->comm != MPI_COMM_NULL){ *group = smpi_comm_group(win->comm); + smpi_group_use(*group); + } } void smpi_mpi_win_set_name(MPI_Win win, char* name){ @@ -112,7 +118,8 @@ void smpi_mpi_win_set_name(MPI_Win win, char* name){ int smpi_mpi_win_fence( int assert, MPI_Win win){ XBT_DEBUG("Entering fence"); - + if(!win->opened) + win->opened=1; if(assert != MPI_MODE_NOPRECEDE){ xbt_barrier_wait(win->bar); @@ -142,6 +149,9 @@ int smpi_mpi_win_fence( int assert, MPI_Win win){ int smpi_mpi_put( void *origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank, MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype, MPI_Win win) { + + if(!win->opened)//check that post/start has been done + return MPI_ERR_WIN; //get receiver pointer MPI_Win recv_win = win->connected_wins[target_rank]; @@ -178,6 +188,8 @@ int smpi_mpi_put( void *origin_addr, int origin_count, MPI_Datatype origin_datat int smpi_mpi_get( void *origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank, MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype, MPI_Win win) { + if(!win->opened)//check that post/start has been done + return MPI_ERR_WIN; //get sender pointer MPI_Win send_win = win->connected_wins[target_rank]; @@ -218,6 +230,8 @@ int smpi_mpi_get( void *origin_addr, int origin_count, MPI_Datatype origin_datat int smpi_mpi_accumulate( void *origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank, MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype, MPI_Op op, MPI_Win win) { + if(!win->opened)//check that post/start has been done + return MPI_ERR_WIN; //FIXME: local version //get receiver pointer MPI_Win recv_win = win->connected_wins[target_rank]; @@ -249,3 +263,183 @@ int smpi_mpi_accumulate( void *origin_addr, int origin_count, MPI_Datatype origi return MPI_SUCCESS; } +int smpi_mpi_win_start(MPI_Group group, int assert, MPI_Win win){ + /* From MPI forum advices + The call to MPI_WIN_COMPLETE does not return until the put call has completed at + the origin; and the target window will be accessed by the put operation only + after the call to MPI_WIN_START has matched a call to MPI_WIN_POST by the target + process. This still leaves much choice to implementors. The call to + MPI_WIN_START can block until the matching call to MPI_WIN_POST occurs at all + target processes. One can also have implementations where the call to + MPI_WIN_START is nonblocking, but the call to MPI_PUT blocks until the matching + call to MPI_WIN_POST occurred; or implementations where the first two calls are + nonblocking, but the call to MPI_WIN_COMPLETE blocks until the call to + MPI_WIN_POST occurred; or even implementations where all three calls can + complete before any target process called MPI_WIN_POST --- the data put must be + buffered, in this last case, so as to allow the put to complete at the origin + ahead of its completion at the target. However, once the call to MPI_WIN_POST is + issued, the sequence above must complete, without further dependencies. + */ + + //naive, blocking implementation. + int i=0,j=0; + int size = smpi_group_size(group); + MPI_Request* reqs = xbt_new0(MPI_Request, size); + +// for(i=0;iopened++; //we're open for business ! + win->group=group; + smpi_group_use(group); + return MPI_SUCCESS; +} + +int smpi_mpi_win_post(MPI_Group group, int assert, MPI_Win win){ + //let's make a synchronous send here + int i=0,j=0; + int size = smpi_group_size(group); + MPI_Request* reqs = xbt_new0(MPI_Request, size); + + while(j!=size){ + int dst=smpi_group_index(group,j); + if(dst!=smpi_process_index()){ + reqs[i]=smpi_mpi_send_init(NULL, 0, MPI_CHAR, dst, + RMA_TAG+4, MPI_COMM_WORLD); + i++; + } + j++; + } + size=i; + + smpi_mpi_startall(size, reqs); + smpi_mpi_waitall(size, reqs, MPI_STATUSES_IGNORE); + for(i=0;iopened++; //we're open for business ! + win->group=group; + smpi_group_use(group); + return MPI_SUCCESS; +} + +int smpi_mpi_win_complete(MPI_Win win){ + if(win->opened==0) + xbt_die("Complete called on already opened MPI_Win"); +// xbt_barrier_wait(win->bar); + //MPI_Comm comm = smpi_comm_new(win->group, NULL); + //mpi_coll_barrier_fun(comm); + //smpi_comm_destroy(comm); + + XBT_DEBUG("Entering MPI_Win_Complete"); + int i=0,j=0; + int size = smpi_group_size(win->group); + MPI_Request* reqs = xbt_new0(MPI_Request, size); + + while(j!=size){ + int dst=smpi_group_index(win->group,j); + if(dst!=smpi_process_index()){ + reqs[i]=smpi_mpi_send_init(NULL, 0, MPI_CHAR, dst, + RMA_TAG+5, MPI_COMM_WORLD); + i++; + } + j++; + } + size=i; + XBT_DEBUG("Win_complete - Sending sync messages to %d processes", size); + smpi_mpi_startall(size, reqs); + smpi_mpi_waitall(size, reqs, MPI_STATUSES_IGNORE); + + for(i=0;irequests; + size = xbt_dynar_length(reqqs); + + XBT_DEBUG("Win_complete - Finishing %d RMA calls", size); + unsigned int cpt=0; + MPI_Request req; + // start all requests that have been prepared by another process + xbt_dynar_foreach(reqqs, cpt, req){ + if (req->flags & PREPARED) smpi_mpi_start(req); + } + + MPI_Request* treqs = xbt_dynar_to_array(reqqs); + smpi_mpi_waitall(size,treqs,MPI_STATUSES_IGNORE); + xbt_free(treqs); + win->requests=xbt_dynar_new(sizeof(MPI_Request), NULL); + win->opened--; //we're closed for business ! + return MPI_SUCCESS; +} + + + +int smpi_mpi_win_wait(MPI_Win win){ +// xbt_barrier_wait(win->bar); + //MPI_Comm comm = smpi_comm_new(win->group, NULL); + //mpi_coll_barrier_fun(comm); + //smpi_comm_destroy(comm); + //naive, blocking implementation. + XBT_DEBUG("Entering MPI_Win_Wait"); + int i=0,j=0; + int size = smpi_group_size(win->group); + MPI_Request* reqs = xbt_new0(MPI_Request, size); + +// for(i=0;igroup,j); + if(src!=smpi_process_index()){ + reqs[i]=smpi_irecv_init(NULL, 0, MPI_CHAR, src,RMA_TAG+5, MPI_COMM_WORLD); + i++; + } + j++; + } + size=i; + XBT_DEBUG("Win_wait - Receiving sync messages from %d processes", size); + smpi_mpi_startall(size, reqs); + smpi_mpi_waitall(size, reqs, MPI_STATUSES_IGNORE); + for(i=0;irequests; + size = xbt_dynar_length(reqqs); + + XBT_DEBUG("Win_complete - Finishing %d RMA calls", size); + + unsigned int cpt=0; + MPI_Request req; + // start all requests that have been prepared by another process + xbt_dynar_foreach(reqqs, cpt, req){ + if (req->flags & PREPARED) smpi_mpi_start(req); + } + + MPI_Request* treqs = xbt_dynar_to_array(reqqs); + smpi_mpi_waitall(size,treqs,MPI_STATUSES_IGNORE); + xbt_free(treqs); + win->requests=xbt_dynar_new(sizeof(MPI_Request), NULL); + win->opened--; //we're opened for business ! + return MPI_SUCCESS; +} + + + -- 2.20.1