From: degomme Date: Tue, 21 Feb 2017 22:18:13 +0000 (+0100) Subject: Protect access to remote process list of rma requests, to avoid race conditions.. X-Git-Tag: v3_15~322 X-Git-Url: http://info.iut-bm.univ-fcomte.fr/pub/gitweb/simgrid.git/commitdiff_plain/469b766cd3bb5331c1086b2e8f3bfd586fe0089c Protect access to remote process list of rma requests, to avoid race conditions.. --- diff --git a/src/smpi/smpi_rma.cpp b/src/smpi/smpi_rma.cpp index 6edbee3e47..147140c01a 100644 --- a/src/smpi/smpi_rma.cpp +++ b/src/smpi/smpi_rma.cpp @@ -19,6 +19,7 @@ typedef struct s_smpi_mpi_win{ MPI_Info info; int assert; std::vector *requests; + xbt_mutex_t mut; msg_bar_t bar; MPI_Win* connected_wins; char* name; @@ -48,6 +49,7 @@ MPI_Win smpi_mpi_win_create( void *base, MPI_Aint size, int disp_unit, MPI_Info win->opened = 0; win->group = MPI_GROUP_NULL; win->requests = new std::vector(); + win->mut=xbt_mutex_init(); win->connected_wins = xbt_new0(MPI_Win, comm_size); win->connected_wins[rank] = win; win->count = 0; @@ -67,7 +69,9 @@ MPI_Win smpi_mpi_win_create( void *base, MPI_Aint size, int disp_unit, MPI_Info int smpi_mpi_win_free( MPI_Win* win){ //As per the standard, perform a barrier to ensure every async comm is finished MSG_barrier_wait((*win)->bar); + xbt_mutex_acquire((*win)->mut); delete (*win)->requests; + xbt_mutex_release((*win)->mut); xbt_free((*win)->connected_wins); if ((*win)->name != nullptr){ xbt_free((*win)->name); @@ -80,6 +84,7 @@ int smpi_mpi_win_free( MPI_Win* win){ int rank=smpi_comm_rank((*win)->comm); if(rank == 0) MSG_barrier_destroy((*win)->bar); + xbt_mutex_destroy((*win)->mut); xbt_free(*win); *win = MPI_WIN_NULL; return MPI_SUCCESS; @@ -113,7 +118,7 @@ int smpi_mpi_win_fence( int assert, MPI_Win win){ win->opened=1; if(assert != MPI_MODE_NOPRECEDE){ MSG_barrier_wait(win->bar); - + xbt_mutex_acquire(win->mut); std::vector *reqs = win->requests; int size = static_cast(reqs->size()); // start all requests that have been prepared by another process @@ -128,6 +133,7 @@ int smpi_mpi_win_fence( int assert, MPI_Win win){ smpi_mpi_waitall(size,treqs,MPI_STATUSES_IGNORE); } win->count=0; + xbt_mutex_release(win->mut); } win->assert = assert; @@ -158,13 +164,16 @@ int smpi_mpi_put( void *origin_addr, int origin_count, MPI_Datatype origin_datat smpi_group_index(smpi_comm_group(win->comm),target_rank), SMPI_RMA_TAG+1, recv_win->comm, MPI_OP_NULL); //push request to receiver's win + xbt_mutex_acquire(recv_win->mut); recv_win->requests->push_back(rreq); - + xbt_mutex_release(recv_win->mut); //start send smpi_mpi_start(sreq); //push request to sender's win + xbt_mutex_acquire(win->mut); win->requests->push_back(sreq); + xbt_mutex_release(win->mut); }else{ smpi_datatype_copy(origin_addr, origin_count, origin_datatype, recv_addr, target_count, target_datatype); } @@ -196,15 +205,17 @@ int smpi_mpi_get( void *origin_addr, int origin_count, MPI_Datatype origin_datat //start the send, with another process than us as sender. smpi_mpi_start(sreq); - //push request to receiver's win + xbt_mutex_acquire(send_win->mut); send_win->requests->push_back(sreq); + xbt_mutex_release(send_win->mut); //start recv smpi_mpi_start(rreq); - //push request to sender's win + xbt_mutex_acquire(win->mut); win->requests->push_back(rreq); + xbt_mutex_release(win->mut); }else{ smpi_datatype_copy(send_addr, target_count, target_datatype, origin_addr, origin_count, origin_datatype); } @@ -235,12 +246,16 @@ int smpi_mpi_accumulate( void *origin_addr, int origin_count, MPI_Datatype origi win->count++; //push request to receiver's win + xbt_mutex_acquire(recv_win->mut); recv_win->requests->push_back(rreq); + xbt_mutex_release(recv_win->mut); //start send smpi_mpi_start(sreq); //push request to sender's win + xbt_mutex_acquire(win->mut); win->requests->push_back(sreq); + xbt_mutex_release(win->mut); return MPI_SUCCESS; } @@ -265,7 +280,7 @@ int smpi_mpi_win_start(MPI_Group group, int assert, MPI_Win win){ while(j!=size){ int src=smpi_group_index(group,j); - if(src!=smpi_process_index()){ + if(src!=smpi_process_index()&& src!=MPI_UNDEFINED){ reqs[i]=smpi_irecv_init(nullptr, 0, MPI_CHAR, src,SMPI_RMA_TAG+4, MPI_COMM_WORLD); i++; } @@ -292,7 +307,7 @@ int smpi_mpi_win_post(MPI_Group group, int assert, MPI_Win win){ while(j!=size){ int dst=smpi_group_index(group,j); - if(dst!=smpi_process_index()){ + if(dst!=smpi_process_index() && dst!=MPI_UNDEFINED){ reqs[i]=smpi_mpi_send_init(nullptr, 0, MPI_CHAR, dst, SMPI_RMA_TAG+4, MPI_COMM_WORLD); i++; } @@ -323,7 +338,7 @@ int smpi_mpi_win_complete(MPI_Win win){ while(j!=size){ int dst=smpi_group_index(win->group,j); - if(dst!=smpi_process_index()){ + if(dst!=smpi_process_index() && dst!=MPI_UNDEFINED){ reqs[i]=smpi_mpi_send_init(nullptr, 0, MPI_CHAR, dst, SMPI_RMA_TAG+5, MPI_COMM_WORLD); i++; } @@ -340,7 +355,7 @@ int smpi_mpi_win_complete(MPI_Win win){ xbt_free(reqs); //now we can finish RMA calls - + xbt_mutex_acquire(win->mut); std::vector *reqqs = win->requests; size = static_cast(reqqs->size()); @@ -354,6 +369,8 @@ int smpi_mpi_win_complete(MPI_Win win){ MPI_Request* treqs = &(*reqqs)[0]; smpi_mpi_waitall(size,treqs,MPI_STATUSES_IGNORE); reqqs->clear(); + xbt_mutex_release(win->mut); + smpi_group_unuse(win->group); win->opened--; //we're closed for business ! return MPI_SUCCESS; @@ -368,7 +385,7 @@ int smpi_mpi_win_wait(MPI_Win win){ while(j!=size){ int src=smpi_group_index(win->group,j); - if(src!=smpi_process_index()){ + if(src!=smpi_process_index() && src!=MPI_UNDEFINED){ reqs[i]=smpi_irecv_init(nullptr, 0, MPI_CHAR, src,SMPI_RMA_TAG+5, MPI_COMM_WORLD); i++; } @@ -382,11 +399,11 @@ int smpi_mpi_win_wait(MPI_Win win){ smpi_mpi_request_free(&reqs[i]); } xbt_free(reqs); - + xbt_mutex_acquire(win->mut); std::vector *reqqs = win->requests; size = static_cast(reqqs->size()); - XBT_DEBUG("Win_complete - Finishing %d RMA calls", size); + XBT_DEBUG("Win_wait - Finishing %d RMA calls", size); // start all requests that have been prepared by another process for(auto req: *reqqs){ @@ -397,6 +414,8 @@ int smpi_mpi_win_wait(MPI_Win win){ MPI_Request* treqs = &(*reqqs)[0]; smpi_mpi_waitall(size,treqs,MPI_STATUSES_IGNORE); reqqs->clear(); + xbt_mutex_release(win->mut); + smpi_group_unuse(win->group); win->opened--; //we're opened for business ! return MPI_SUCCESS;