X-Git-Url: http://info.iut-bm.univ-fcomte.fr/pub/gitweb/simgrid.git/blobdiff_plain/48eccb2c1532e35819830ca56fad7cf89887359f..1f32679abc2f9c0cae9e27951b643a0f70d71b3a:/src/smpi/smpi_rma.cpp diff --git a/src/smpi/smpi_rma.cpp b/src/smpi/smpi_rma.cpp index 98d3ae5aef..def9c738b2 100644 --- a/src/smpi/smpi_rma.cpp +++ b/src/smpi/smpi_rma.cpp @@ -1,4 +1,3 @@ - /* Copyright (c) 2007-2015. The SimGrid Team. * All rights reserved. */ @@ -6,12 +5,11 @@ * under the terms of the license (GNU LGPL) which comes with this package. */ #include "private.h" +#include XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_rma, smpi, "Logging specific to SMPI (RMA operations)"); -#define RMA_TAG -1234 - -xbt_bar_t creation_bar = NULL; +msg_bar_t creation_bar = nullptr; typedef struct s_smpi_mpi_win{ void* base; @@ -20,8 +18,8 @@ typedef struct s_smpi_mpi_win{ MPI_Comm comm; MPI_Info info; int assert; - xbt_dynar_t requests; - xbt_bar_t bar; + std::vector *requests; + msg_bar_t bar; MPI_Win* connected_wins; char* name; int opened; @@ -45,21 +43,20 @@ MPI_Win smpi_mpi_win_create( void *base, MPI_Aint size, int disp_unit, MPI_Info if(info!=MPI_INFO_NULL) info->refcount++; win->comm = comm; - win->name = NULL; + win->name = nullptr; win->opened = 0; win->group = MPI_GROUP_NULL; - win->requests = xbt_dynar_new(sizeof(MPI_Request), NULL); + win->requests = new std::vector(); win->connected_wins = xbt_new0(MPI_Win, comm_size); win->connected_wins[rank] = win; if(rank==0){ - win->bar=xbt_barrier_init(comm_size); + win->bar = MSG_barrier_init(comm_size); } - mpi_coll_allgather_fun(&(win->connected_wins[rank]), sizeof(MPI_Win), MPI_BYTE, win->connected_wins, sizeof(MPI_Win), MPI_BYTE, comm); - mpi_coll_bcast_fun( &(win->bar), sizeof(xbt_bar_t), MPI_BYTE, 0, comm); + mpi_coll_bcast_fun(&(win->bar), sizeof(msg_bar_t), MPI_BYTE, 0, comm); mpi_coll_barrier_fun(comm); @@ -68,66 +65,68 @@ MPI_Win smpi_mpi_win_create( void *base, MPI_Aint size, int disp_unit, MPI_Info int smpi_mpi_win_free( MPI_Win* win){ //As per the standard, perform a barrier to ensure every async comm is finished - xbt_barrier_wait((*win)->bar); - xbt_dynar_free(&(*win)->requests); + MSG_barrier_wait((*win)->bar); + delete (*win)->requests; xbt_free((*win)->connected_wins); - if ((*win)->name != NULL){ + if ((*win)->name != nullptr){ xbt_free((*win)->name); } if((*win)->info!=MPI_INFO_NULL){ MPI_Info_free(&(*win)->info); } + + mpi_coll_barrier_fun((*win)->comm); + int rank=smpi_comm_rank((*win)->comm); + if(rank == 0) + MSG_barrier_destroy((*win)->bar); xbt_free(*win); *win = MPI_WIN_NULL; return MPI_SUCCESS; } void smpi_mpi_win_get_name(MPI_Win win, char* name, int* length){ - if(win->name==NULL){ + if(win->name==nullptr){ *length=0; - name=NULL; + name=nullptr; return; } *length = strlen(win->name); - strcpy(name, win->name); + strncpy(name, win->name, *length+1); } void smpi_mpi_win_get_group(MPI_Win win, MPI_Group* group){ if(win->comm != MPI_COMM_NULL){ *group = smpi_comm_group(win->comm); - smpi_group_use(*group); + } else { + *group = MPI_GROUP_NULL; } } void smpi_mpi_win_set_name(MPI_Win win, char* name){ - win->name = xbt_strdup(name);; + win->name = xbt_strdup(name); } int smpi_mpi_win_fence( int assert, MPI_Win win){ XBT_DEBUG("Entering fence"); - if(!win->opened) + if(win->opened==0) win->opened=1; if(assert != MPI_MODE_NOPRECEDE){ - xbt_barrier_wait(win->bar); + MSG_barrier_wait(win->bar); - xbt_dynar_t reqs = win->requests; - int size = xbt_dynar_length(reqs); - unsigned int cpt=0; - MPI_Request req; + std::vector *reqs = win->requests; + int size = static_cast(reqs->size()); // start all requests that have been prepared by another process - xbt_dynar_foreach(reqs, cpt, req){ - if (req->flags & PREPARED) smpi_mpi_start(req); + for(auto req: *reqs){ + if (req && (req->flags & PREPARED)) + smpi_mpi_start(req); } - MPI_Request* treqs = static_cast(xbt_dynar_to_array(reqs)); - win->requests=xbt_dynar_new(sizeof(MPI_Request), NULL); + MPI_Request* treqs = &(*reqs)[0]; smpi_mpi_waitall(size,treqs,MPI_STATUSES_IGNORE); - xbt_free(treqs); - } win->assert = assert; - xbt_barrier_wait(win->bar); + MSG_barrier_wait(win->bar); XBT_DEBUG("Leaving fence "); return MPI_SUCCESS; @@ -136,31 +135,31 @@ int smpi_mpi_win_fence( int assert, MPI_Win win){ int smpi_mpi_put( void *origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank, MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype, MPI_Win win) { - if(!win->opened)//check that post/start has been done + if(win->opened==0)//check that post/start has been done return MPI_ERR_WIN; //get receiver pointer MPI_Win recv_win = win->connected_wins[target_rank]; - void* recv_addr = (void*) ( ((char*)recv_win->base) + target_disp * recv_win->disp_unit); + void* recv_addr = static_cast ( static_cast(recv_win->base) + target_disp * recv_win->disp_unit); XBT_DEBUG("Entering MPI_Put to %d", target_rank); if(target_rank != smpi_comm_rank(win->comm)){ //prepare send_request MPI_Request sreq = smpi_rma_send_init(origin_addr, origin_count, origin_datatype, smpi_process_index(), - smpi_group_index(smpi_comm_group(win->comm),target_rank), RMA_TAG+1, win->comm, MPI_OP_NULL); + smpi_group_index(smpi_comm_group(win->comm),target_rank), SMPI_RMA_TAG+1, win->comm, MPI_OP_NULL); //prepare receiver request MPI_Request rreq = smpi_rma_recv_init(recv_addr, target_count, target_datatype, smpi_process_index(), - smpi_group_index(smpi_comm_group(win->comm),target_rank), RMA_TAG+1, recv_win->comm, MPI_OP_NULL); + smpi_group_index(smpi_comm_group(win->comm),target_rank), SMPI_RMA_TAG+1, recv_win->comm, MPI_OP_NULL); //push request to receiver's win - xbt_dynar_push_as(recv_win->requests, MPI_Request, rreq); + recv_win->requests->push_back(rreq); //start send smpi_mpi_start(sreq); //push request to sender's win - xbt_dynar_push_as(win->requests, MPI_Request, sreq); + win->requests->push_back(sreq); }else{ smpi_datatype_copy(origin_addr, origin_count, origin_datatype, recv_addr, target_count, target_datatype); } @@ -171,36 +170,36 @@ int smpi_mpi_put( void *origin_addr, int origin_count, MPI_Datatype origin_datat int smpi_mpi_get( void *origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank, MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype, MPI_Win win) { - if(!win->opened)//check that post/start has been done + if(win->opened==0)//check that post/start has been done return MPI_ERR_WIN; //get sender pointer MPI_Win send_win = win->connected_wins[target_rank]; - void* send_addr = (void*)( ((char*)send_win->base) + target_disp * send_win->disp_unit); + void* send_addr = static_cast(static_cast(send_win->base) + target_disp * send_win->disp_unit); XBT_DEBUG("Entering MPI_Get from %d", target_rank); if(target_rank != smpi_comm_rank(win->comm)){ //prepare send_request MPI_Request sreq = smpi_rma_send_init(send_addr, target_count, target_datatype, - smpi_group_index(smpi_comm_group(win->comm),target_rank), smpi_process_index(), RMA_TAG+2, send_win->comm, + smpi_group_index(smpi_comm_group(win->comm),target_rank), smpi_process_index(), SMPI_RMA_TAG+2, send_win->comm, MPI_OP_NULL); //prepare receiver request MPI_Request rreq = smpi_rma_recv_init(origin_addr, origin_count, origin_datatype, - smpi_group_index(smpi_comm_group(win->comm),target_rank), smpi_process_index(), RMA_TAG+2, win->comm, + smpi_group_index(smpi_comm_group(win->comm),target_rank), smpi_process_index(), SMPI_RMA_TAG+2, win->comm, MPI_OP_NULL); //start the send, with another process than us as sender. smpi_mpi_start(sreq); //push request to receiver's win - xbt_dynar_push_as(send_win->requests, MPI_Request, sreq); + send_win->requests->push_back(sreq); //start recv smpi_mpi_start(rreq); //push request to sender's win - xbt_dynar_push_as(win->requests, MPI_Request, rreq); + win->requests->push_back(rreq); }else{ smpi_datatype_copy(send_addr, target_count, target_datatype, origin_addr, origin_count, origin_datatype); } @@ -212,29 +211,29 @@ int smpi_mpi_get( void *origin_addr, int origin_count, MPI_Datatype origin_datat int smpi_mpi_accumulate( void *origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank, MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype, MPI_Op op, MPI_Win win) { - if(!win->opened)//check that post/start has been done + if(win->opened==0)//check that post/start has been done return MPI_ERR_WIN; //FIXME: local version //get receiver pointer MPI_Win recv_win = win->connected_wins[target_rank]; - void* recv_addr = (void*)( ((char*)recv_win->base) + target_disp * recv_win->disp_unit); + void* recv_addr = static_cast(static_cast(recv_win->base) + target_disp * recv_win->disp_unit); XBT_DEBUG("Entering MPI_Accumulate to %d", target_rank); //prepare send_request MPI_Request sreq = smpi_rma_send_init(origin_addr, origin_count, origin_datatype, - smpi_process_index(), smpi_group_index(smpi_comm_group(win->comm),target_rank), RMA_TAG+3, win->comm, op); + smpi_process_index(), smpi_group_index(smpi_comm_group(win->comm),target_rank), SMPI_RMA_TAG+3, win->comm, op); //prepare receiver request MPI_Request rreq = smpi_rma_recv_init(recv_addr, target_count, target_datatype, - smpi_process_index(), smpi_group_index(smpi_comm_group(win->comm),target_rank), RMA_TAG+3, recv_win->comm, op); + smpi_process_index(), smpi_group_index(smpi_comm_group(win->comm),target_rank), SMPI_RMA_TAG+3, recv_win->comm, op); //push request to receiver's win - xbt_dynar_push_as(recv_win->requests, MPI_Request, rreq); + recv_win->requests->push_back(rreq); //start send smpi_mpi_start(sreq); //push request to sender's win - xbt_dynar_push_as(win->requests, MPI_Request, sreq); + win->requests->push_back(sreq); return MPI_SUCCESS; } @@ -257,11 +256,10 @@ int smpi_mpi_win_start(MPI_Group group, int assert, MPI_Win win){ int size = smpi_group_size(group); MPI_Request* reqs = xbt_new0(MPI_Request, size); -// for(i=0;iopened==0) xbt_die("Complete called on already opened MPI_Win"); -// xbt_barrier_wait(win->bar); - //MPI_Comm comm = smpi_comm_new(win->group, NULL); - //mpi_coll_barrier_fun(comm); - //smpi_comm_destroy(comm); XBT_DEBUG("Entering MPI_Win_Complete"); int i=0,j=0; @@ -323,7 +317,7 @@ int smpi_mpi_win_complete(MPI_Win win){ while(j!=size){ int dst=smpi_group_index(win->group,j); if(dst!=smpi_process_index()){ - reqs[i]=smpi_mpi_send_init(NULL, 0, MPI_CHAR, dst, RMA_TAG+5, MPI_COMM_WORLD); + reqs[i]=smpi_mpi_send_init(nullptr, 0, MPI_CHAR, dst, SMPI_RMA_TAG+5, MPI_COMM_WORLD); i++; } j++; @@ -340,41 +334,35 @@ int smpi_mpi_win_complete(MPI_Win win){ //now we can finish RMA calls - xbt_dynar_t reqqs = win->requests; - size = xbt_dynar_length(reqqs); + std::vector *reqqs = win->requests; + size = static_cast(reqqs->size()); XBT_DEBUG("Win_complete - Finishing %d RMA calls", size); - unsigned int cpt=0; - MPI_Request req; // start all requests that have been prepared by another process - xbt_dynar_foreach(reqqs, cpt, req){ - if (req->flags & PREPARED) smpi_mpi_start(req); + for (auto req: *reqqs){ + if (req && (req->flags & PREPARED)) + smpi_mpi_start(req); } - MPI_Request* treqs = static_cast(xbt_dynar_to_array(reqqs)); - win->requests=xbt_dynar_new(sizeof(MPI_Request), NULL); + MPI_Request* treqs = &(*reqqs)[0]; smpi_mpi_waitall(size,treqs,MPI_STATUSES_IGNORE); - xbt_free(treqs); + reqqs->clear(); + smpi_group_unuse(win->group); win->opened--; //we're closed for business ! return MPI_SUCCESS; } int smpi_mpi_win_wait(MPI_Win win){ -// xbt_barrier_wait(win->bar); - //MPI_Comm comm = smpi_comm_new(win->group, NULL); - //mpi_coll_barrier_fun(comm); - //smpi_comm_destroy(comm); //naive, blocking implementation. XBT_DEBUG("Entering MPI_Win_Wait"); int i=0,j=0; int size = smpi_group_size(win->group); MPI_Request* reqs = xbt_new0(MPI_Request, size); -// for(i=0;igroup,j); if(src!=smpi_process_index()){ - reqs[i]=smpi_irecv_init(NULL, 0, MPI_CHAR, src,RMA_TAG+5, MPI_COMM_WORLD); + reqs[i]=smpi_irecv_init(nullptr, 0, MPI_CHAR, src,SMPI_RMA_TAG+5, MPI_COMM_WORLD); i++; } j++; @@ -388,22 +376,21 @@ int smpi_mpi_win_wait(MPI_Win win){ } xbt_free(reqs); - xbt_dynar_t reqqs = win->requests; - size = xbt_dynar_length(reqqs); + std::vector *reqqs = win->requests; + size = static_cast(reqqs->size()); XBT_DEBUG("Win_complete - Finishing %d RMA calls", size); - unsigned int cpt=0; - MPI_Request req; // start all requests that have been prepared by another process - xbt_dynar_foreach(reqqs, cpt, req){ - if (req->flags & PREPARED) smpi_mpi_start(req); + for(auto req: *reqqs){ + if (req && (req->flags & PREPARED)) + smpi_mpi_start(req); } - MPI_Request* treqs = static_cast(xbt_dynar_to_array(reqqs)); - win->requests=xbt_dynar_new(sizeof(MPI_Request), NULL); + MPI_Request* treqs = &(*reqqs)[0]; smpi_mpi_waitall(size,treqs,MPI_STATUSES_IGNORE); - xbt_free(treqs); + reqqs->clear(); + smpi_group_unuse(win->group); win->opened--; //we're opened for business ! return MPI_SUCCESS; }