X-Git-Url: http://info.iut-bm.univ-fcomte.fr/pub/gitweb/simgrid.git/blobdiff_plain/1687df79d61a9418bba830bbd0ab7de16e457090..0fabb12fc92ad7f1d47ec977e2d7e4eb36f16977:/src/smpi/smpi_rma.cpp diff --git a/src/smpi/smpi_rma.cpp b/src/smpi/smpi_rma.cpp index 6edbee3e47..42de98449f 100644 --- a/src/smpi/smpi_rma.cpp +++ b/src/smpi/smpi_rma.cpp @@ -6,11 +6,10 @@ #include "private.h" #include +#include "src/smpi/smpi_group.hpp" XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_rma, smpi, "Logging specific to SMPI (RMA operations)"); -msg_bar_t creation_bar = nullptr; - typedef struct s_smpi_mpi_win{ void* base; MPI_Aint size; @@ -19,6 +18,7 @@ typedef struct s_smpi_mpi_win{ MPI_Info info; int assert; std::vector *requests; + xbt_mutex_t mut; msg_bar_t bar; MPI_Win* connected_wins; char* name; @@ -29,13 +29,11 @@ typedef struct s_smpi_mpi_win{ MPI_Win smpi_mpi_win_create( void *base, MPI_Aint size, int disp_unit, MPI_Info info, MPI_Comm comm){ - MPI_Win win; - int comm_size = smpi_comm_size(comm); - int rank=smpi_comm_rank(comm); + int rank = smpi_comm_rank(comm); XBT_DEBUG("Creating window"); - win = xbt_new(s_smpi_mpi_win_t, 1); + MPI_Win win = xbt_new(s_smpi_mpi_win_t, 1); win->base = base; win->size = size; win->disp_unit = disp_unit; @@ -48,6 +46,7 @@ MPI_Win smpi_mpi_win_create( void *base, MPI_Aint size, int disp_unit, MPI_Info win->opened = 0; win->group = MPI_GROUP_NULL; win->requests = new std::vector(); + win->mut=xbt_mutex_init(); win->connected_wins = xbt_new0(MPI_Win, comm_size); win->connected_wins[rank] = win; win->count = 0; @@ -67,7 +66,9 @@ MPI_Win smpi_mpi_win_create( void *base, MPI_Aint size, int disp_unit, MPI_Info int smpi_mpi_win_free( MPI_Win* win){ //As per the standard, perform a barrier to ensure every async comm is finished MSG_barrier_wait((*win)->bar); + xbt_mutex_acquire((*win)->mut); delete (*win)->requests; + xbt_mutex_release((*win)->mut); xbt_free((*win)->connected_wins); if ((*win)->name != nullptr){ xbt_free((*win)->name); @@ -80,6 +81,7 @@ int smpi_mpi_win_free( MPI_Win* win){ int rank=smpi_comm_rank((*win)->comm); if(rank == 0) MSG_barrier_destroy((*win)->bar); + xbt_mutex_destroy((*win)->mut); xbt_free(*win); *win = MPI_WIN_NULL; return MPI_SUCCESS; @@ -107,32 +109,39 @@ void smpi_mpi_win_set_name(MPI_Win win, char* name){ win->name = xbt_strdup(name); } -int smpi_mpi_win_fence( int assert, MPI_Win win){ +int smpi_mpi_win_fence(int assert, MPI_Win win) +{ XBT_DEBUG("Entering fence"); - if(win->opened==0) + if (win->opened == 0) win->opened=1; - if(assert != MPI_MODE_NOPRECEDE){ + if (assert != MPI_MODE_NOPRECEDE) { + // This is not the first fence => finalize what came before MSG_barrier_wait(win->bar); - + xbt_mutex_acquire(win->mut); + // This (simulated) mutex ensures that no process pushes to the vector of requests during the waitall. + // Without this, the vector could get redimensionned when another process pushes. + // This would result in the array used by smpi_mpi_waitall() to be invalidated. + // Another solution would be to copy the data and cleanup the vector *before* smpi_mpi_waitall std::vector *reqs = win->requests; int size = static_cast(reqs->size()); // start all requests that have been prepared by another process - if(size>0){ - for(auto req: *reqs){ - if (req && (req->flags & PREPARED)) - smpi_mpi_start(req); - } + if (size > 0) { + for (const auto& req : *reqs) { + if (req && (req->flags & PREPARED)) + smpi_mpi_start(req); + } - MPI_Request* treqs = &(*reqs)[0]; + MPI_Request* treqs = &(*reqs)[0]; - smpi_mpi_waitall(size,treqs,MPI_STATUSES_IGNORE); + smpi_mpi_waitall(size, treqs, MPI_STATUSES_IGNORE); } win->count=0; + xbt_mutex_release(win->mut); } win->assert = assert; MSG_barrier_wait(win->bar); - XBT_DEBUG("Leaving fence "); + XBT_DEBUG("Leaving fence"); return MPI_SUCCESS; } @@ -151,20 +160,23 @@ int smpi_mpi_put( void *origin_addr, int origin_count, MPI_Datatype origin_datat if(target_rank != smpi_comm_rank(win->comm)){ //prepare send_request MPI_Request sreq = smpi_rma_send_init(origin_addr, origin_count, origin_datatype, smpi_process_index(), - smpi_group_index(smpi_comm_group(win->comm),target_rank), SMPI_RMA_TAG+1, win->comm, MPI_OP_NULL); + smpi_comm_group(win->comm)->index(target_rank), SMPI_RMA_TAG+1, win->comm, MPI_OP_NULL); //prepare receiver request MPI_Request rreq = smpi_rma_recv_init(recv_addr, target_count, target_datatype, smpi_process_index(), - smpi_group_index(smpi_comm_group(win->comm),target_rank), SMPI_RMA_TAG+1, recv_win->comm, MPI_OP_NULL); + smpi_comm_group(win->comm)->index(target_rank), SMPI_RMA_TAG+1, recv_win->comm, MPI_OP_NULL); //push request to receiver's win + xbt_mutex_acquire(recv_win->mut); recv_win->requests->push_back(rreq); - + xbt_mutex_release(recv_win->mut); //start send smpi_mpi_start(sreq); //push request to sender's win + xbt_mutex_acquire(win->mut); win->requests->push_back(sreq); + xbt_mutex_release(win->mut); }else{ smpi_datatype_copy(origin_addr, origin_count, origin_datatype, recv_addr, target_count, target_datatype); } @@ -186,25 +198,27 @@ int smpi_mpi_get( void *origin_addr, int origin_count, MPI_Datatype origin_datat if(target_rank != smpi_comm_rank(win->comm)){ //prepare send_request MPI_Request sreq = smpi_rma_send_init(send_addr, target_count, target_datatype, - smpi_group_index(smpi_comm_group(win->comm),target_rank), smpi_process_index(), SMPI_RMA_TAG+2, send_win->comm, + smpi_comm_group(win->comm)->index(target_rank), smpi_process_index(), SMPI_RMA_TAG+2, send_win->comm, MPI_OP_NULL); //prepare receiver request MPI_Request rreq = smpi_rma_recv_init(origin_addr, origin_count, origin_datatype, - smpi_group_index(smpi_comm_group(win->comm),target_rank), smpi_process_index(), SMPI_RMA_TAG+2, win->comm, + smpi_comm_group(win->comm)->index(target_rank), smpi_process_index(), SMPI_RMA_TAG+2, win->comm, MPI_OP_NULL); //start the send, with another process than us as sender. smpi_mpi_start(sreq); - //push request to receiver's win + xbt_mutex_acquire(send_win->mut); send_win->requests->push_back(sreq); + xbt_mutex_release(send_win->mut); //start recv smpi_mpi_start(rreq); - //push request to sender's win + xbt_mutex_acquire(win->mut); win->requests->push_back(rreq); + xbt_mutex_release(win->mut); }else{ smpi_datatype_copy(send_addr, target_count, target_datatype, origin_addr, origin_count, origin_datatype); } @@ -227,20 +241,24 @@ int smpi_mpi_accumulate( void *origin_addr, int origin_count, MPI_Datatype origi //As the tag will be used for ordering of the operations, add count to it //prepare send_request MPI_Request sreq = smpi_rma_send_init(origin_addr, origin_count, origin_datatype, - smpi_process_index(), smpi_group_index(smpi_comm_group(win->comm),target_rank), SMPI_RMA_TAG+3+win->count, win->comm, op); + smpi_process_index(), smpi_comm_group(win->comm)->index(target_rank), SMPI_RMA_TAG+3+win->count, win->comm, op); //prepare receiver request MPI_Request rreq = smpi_rma_recv_init(recv_addr, target_count, target_datatype, - smpi_process_index(), smpi_group_index(smpi_comm_group(win->comm),target_rank), SMPI_RMA_TAG+3+win->count, recv_win->comm, op); + smpi_process_index(), smpi_comm_group(win->comm)->index(target_rank), SMPI_RMA_TAG+3+win->count, recv_win->comm, op); win->count++; //push request to receiver's win + xbt_mutex_acquire(recv_win->mut); recv_win->requests->push_back(rreq); + xbt_mutex_release(recv_win->mut); //start send smpi_mpi_start(sreq); //push request to sender's win + xbt_mutex_acquire(win->mut); win->requests->push_back(sreq); + xbt_mutex_release(win->mut); return MPI_SUCCESS; } @@ -259,17 +277,18 @@ int smpi_mpi_win_start(MPI_Group group, int assert, MPI_Win win){ must complete, without further dependencies. */ //naive, blocking implementation. - int i=0,j=0; - int size = smpi_group_size(group); - MPI_Request* reqs = xbt_new0(MPI_Request, size); - - while(j!=size){ - int src=smpi_group_index(group,j); - if(src!=smpi_process_index()){ - reqs[i]=smpi_irecv_init(nullptr, 0, MPI_CHAR, src,SMPI_RMA_TAG+4, MPI_COMM_WORLD); - i++; - } - j++; + int i = 0; + int j = 0; + int size = group->size(); + MPI_Request* reqs = xbt_new0(MPI_Request, size); + + while (j != size) { + int src = group->index(j); + if (src != smpi_process_index() && src != MPI_UNDEFINED) { + reqs[i] = smpi_irecv_init(nullptr, 0, MPI_CHAR, src, SMPI_RMA_TAG + 4, MPI_COMM_WORLD); + i++; + } + j++; } size=i; smpi_mpi_startall(size, reqs); @@ -280,19 +299,20 @@ int smpi_mpi_win_start(MPI_Group group, int assert, MPI_Win win){ xbt_free(reqs); win->opened++; //we're open for business ! win->group=group; - smpi_group_use(group); + group->use(); return MPI_SUCCESS; } int smpi_mpi_win_post(MPI_Group group, int assert, MPI_Win win){ //let's make a synchronous send here - int i=0,j=0; - int size = smpi_group_size(group); + int i = 0; + int j = 0; + int size = group->size(); MPI_Request* reqs = xbt_new0(MPI_Request, size); while(j!=size){ - int dst=smpi_group_index(group,j); - if(dst!=smpi_process_index()){ + int dst=group->index(j); + if(dst!=smpi_process_index() && dst!=MPI_UNDEFINED){ reqs[i]=smpi_mpi_send_init(nullptr, 0, MPI_CHAR, dst, SMPI_RMA_TAG+4, MPI_COMM_WORLD); i++; } @@ -308,7 +328,7 @@ int smpi_mpi_win_post(MPI_Group group, int assert, MPI_Win win){ xbt_free(reqs); win->opened++; //we're open for business ! win->group=group; - smpi_group_use(group); + group->use(); return MPI_SUCCESS; } @@ -317,13 +337,14 @@ int smpi_mpi_win_complete(MPI_Win win){ xbt_die("Complete called on already opened MPI_Win"); XBT_DEBUG("Entering MPI_Win_Complete"); - int i=0,j=0; - int size = smpi_group_size(win->group); + int i = 0; + int j = 0; + int size = win->group->size(); MPI_Request* reqs = xbt_new0(MPI_Request, size); while(j!=size){ - int dst=smpi_group_index(win->group,j); - if(dst!=smpi_process_index()){ + int dst=win->group->index(j); + if(dst!=smpi_process_index() && dst!=MPI_UNDEFINED){ reqs[i]=smpi_mpi_send_init(nullptr, 0, MPI_CHAR, dst, SMPI_RMA_TAG+5, MPI_COMM_WORLD); i++; } @@ -340,21 +361,25 @@ int smpi_mpi_win_complete(MPI_Win win){ xbt_free(reqs); //now we can finish RMA calls - + xbt_mutex_acquire(win->mut); std::vector *reqqs = win->requests; size = static_cast(reqqs->size()); XBT_DEBUG("Win_complete - Finishing %d RMA calls", size); - // start all requests that have been prepared by another process - for (auto req: *reqqs){ - if (req && (req->flags & PREPARED)) - smpi_mpi_start(req); + if (size > 0) { + // start all requests that have been prepared by another process + for (const auto& req : *reqqs) { + if (req && (req->flags & PREPARED)) + smpi_mpi_start(req); + } + + MPI_Request* treqs = &(*reqqs)[0]; + smpi_mpi_waitall(size, treqs, MPI_STATUSES_IGNORE); + reqqs->clear(); } + xbt_mutex_release(win->mut); - MPI_Request* treqs = &(*reqqs)[0]; - smpi_mpi_waitall(size,treqs,MPI_STATUSES_IGNORE); - reqqs->clear(); - smpi_group_unuse(win->group); + win->group->unuse(); win->opened--; //we're closed for business ! return MPI_SUCCESS; } @@ -363,12 +388,12 @@ int smpi_mpi_win_wait(MPI_Win win){ //naive, blocking implementation. XBT_DEBUG("Entering MPI_Win_Wait"); int i=0,j=0; - int size = smpi_group_size(win->group); + int size = win->group->size(); MPI_Request* reqs = xbt_new0(MPI_Request, size); while(j!=size){ - int src=smpi_group_index(win->group,j); - if(src!=smpi_process_index()){ + int src=win->group->index(j); + if(src!=smpi_process_index() && src!=MPI_UNDEFINED){ reqs[i]=smpi_irecv_init(nullptr, 0, MPI_CHAR, src,SMPI_RMA_TAG+5, MPI_COMM_WORLD); i++; } @@ -382,22 +407,25 @@ int smpi_mpi_win_wait(MPI_Win win){ smpi_mpi_request_free(&reqs[i]); } xbt_free(reqs); - + xbt_mutex_acquire(win->mut); std::vector *reqqs = win->requests; size = static_cast(reqqs->size()); - XBT_DEBUG("Win_complete - Finishing %d RMA calls", size); + XBT_DEBUG("Win_wait - Finishing %d RMA calls", size); + if (size > 0) { + // start all requests that have been prepared by another process + for (const auto& req : *reqqs) { + if (req && (req->flags & PREPARED)) + smpi_mpi_start(req); + } - // start all requests that have been prepared by another process - for(auto req: *reqqs){ - if (req && (req->flags & PREPARED)) - smpi_mpi_start(req); + MPI_Request* treqs = &(*reqqs)[0]; + smpi_mpi_waitall(size, treqs, MPI_STATUSES_IGNORE); + reqqs->clear(); } + xbt_mutex_release(win->mut); - MPI_Request* treqs = &(*reqqs)[0]; - smpi_mpi_waitall(size,treqs,MPI_STATUSES_IGNORE); - reqqs->clear(); - smpi_group_unuse(win->group); + win->group->unuse(); win->opened--; //we're opened for business ! return MPI_SUCCESS; }