From 4c7a3670bc8e56e4f0fcdf46389c6aedc4b7b383 Mon Sep 17 00:00:00 2001
From: degomme
Date: Sun, 12 Feb 2017 01:21:11 +0100
Subject: [PATCH] Add a way to ensure MPI_Accumulate operations are applied
 in order, as per the MPI standard

---
 src/smpi/smpi_base.cpp | 37 ++++++++++++++++++++++++++++++++++---
 src/smpi/smpi_rma.cpp  | 12 ++++++++----
 2 files changed, 42 insertions(+), 7 deletions(-)

diff --git a/src/smpi/smpi_base.cpp b/src/smpi/smpi_base.cpp
index ece561ebd8..cdc04e8f3d 100644
--- a/src/smpi/smpi_base.cpp
+++ b/src/smpi/smpi_base.cpp
@@ -835,6 +835,12 @@ void smpi_mpi_wait(MPI_Request * request, MPI_Status * status)
   *request = MPI_REQUEST_NULL;
 }
 
+static int sort_accumulates(const void* pa, const void* pb)
+{
+  int ta = (*static_cast<const MPI_Request*>(pa))->tag, tb = (*static_cast<const MPI_Request*>(pb))->tag;
+  return (ta > tb) - (ta < tb); // proper three-way result for a qsort-style comparator
+}
+
 int smpi_mpi_waitany(int count, MPI_Request requests[], MPI_Status * status)
 {
   s_xbt_dynar_t comms; // Keep it on stack to save some extra mallocs
@@ -872,11 +878,18 @@ int smpi_mpi_waitany(int count, MPI_Request requests[], MPI_Status * status)
     // not MPI_UNDEFINED, as this is a simix return code
     if (i != -1) {
       index = map[i];
-      finish_wait(&requests[index], status);
-      if (requests[i] != MPI_REQUEST_NULL && (requests[i]->flags & NON_PERSISTENT))
-        requests[index] = MPI_REQUEST_NULL;
+      // For an accumulate, we must wait for all requests to complete so the operations can be applied in order.
+      if ((requests[index] == MPI_REQUEST_NULL) ||
+          !((requests[index]->flags & ACCUMULATE) && (requests[index]->flags & RECV))) {
+        finish_wait(&requests[index], status);
+        if (requests[index] != MPI_REQUEST_NULL && (requests[index]->flags & NON_PERSISTENT))
+          requests[index] = MPI_REQUEST_NULL;
+      } else {
+        XBT_WARN("ACCUMULATE receive request left unfinished; it must be applied after all requests complete, in order");
+      }
     }
   }
+  xbt_dynar_free_data(&comms);
 
   xbt_free(map);
 }
@@ -889,6 +902,7 @@ int smpi_mpi_waitany(int count, MPI_Request requests[], MPI_Status * status)
 
 int smpi_mpi_waitall(int count, MPI_Request requests[], MPI_Status status[])
 {
+  s_xbt_dynar_t accumulates;
   int index, c;
   MPI_Status stat;
   MPI_Status *pstat = status == MPI_STATUSES_IGNORE ? MPI_STATUS_IGNORE : &stat;
@@ -904,6 +918,7 @@ int smpi_mpi_waitall(int count, MPI_Request requests[], MPI_Status status[])
       }
     }
   }
+  xbt_dynar_init(&accumulates, sizeof(MPI_Request), nullptr);
 
   for(c = 0; c < count; c++) {
     if (MC_is_active() || MC_record_replay_is_active()) {
@@ -913,8 +928,14 @@
       index = smpi_mpi_waitany(count, requests, pstat);
       if (index == MPI_UNDEFINED)
         break;
+
+      if (requests[index] != MPI_REQUEST_NULL
+          && (requests[index]->flags & RECV)
+          && (requests[index]->flags & ACCUMULATE))
+        xbt_dynar_push(&accumulates, &requests[index]);
       if (requests[index] != MPI_REQUEST_NULL && (requests[index]->flags & NON_PERSISTENT))
         requests[index]=MPI_REQUEST_NULL;
+
     }
     if (status != MPI_STATUSES_IGNORE) {
       status[index] = *pstat;
@@ -923,6 +944,16 @@
     }
   }
+  if (!xbt_dynar_is_empty(&accumulates)) {
+    xbt_dynar_sort(&accumulates, sort_accumulates);
+    MPI_Request req;
+    unsigned int cursor;
+    xbt_dynar_foreach(&accumulates, cursor, req) {
+      finish_wait(&req, pstat);
+    }
+  }
+  xbt_dynar_free_data(&accumulates);
+
   return retvalue;
 }
 
diff --git a/src/smpi/smpi_rma.cpp b/src/smpi/smpi_rma.cpp
index def9c738b2..68d8507b8d 100644
--- a/src/smpi/smpi_rma.cpp
+++ b/src/smpi/smpi_rma.cpp
@@ -24,6 +24,7 @@ typedef struct s_smpi_mpi_win{
   char* name;
   int opened;
   MPI_Group group;
+  int count; // for ordering the accumulate operations
 } s_smpi_mpi_win_t;
 
@@ -49,7 +50,7 @@ MPI_Win smpi_mpi_win_create( void *base, MPI_Aint size, int disp_unit, MPI_Info
   win->requests = new std::vector<MPI_Request>();
   win->connected_wins = xbt_new0(MPI_Win, comm_size);
   win->connected_wins[rank] = win;
-
+  win->count = 0;
   if(rank==0){
     win->bar = MSG_barrier_init(comm_size);
   }
@@ -123,6 +124,7 @@ int smpi_mpi_win_fence( int assert, MPI_Win win){
     MPI_Request* treqs = &(*reqs)[0];
 
     smpi_mpi_waitall(size,treqs,MPI_STATUSES_IGNORE);
+    win->count = 0;
   }
   win->assert = assert;
@@ -219,14 +221,16 @@ int smpi_mpi_accumulate( void *origin_addr, int origin_count, MPI_Datatype origi
   void* recv_addr = static_cast<void*>(static_cast<char*>(recv_win->base) + target_disp * recv_win->disp_unit);
 
   XBT_DEBUG("Entering MPI_Accumulate to %d", target_rank);
-
+  // As the tag will be used to order the operations, add the window's running count to it.
   //prepare send_request
   MPI_Request sreq = smpi_rma_send_init(origin_addr, origin_count, origin_datatype,
-      smpi_process_index(), smpi_group_index(smpi_comm_group(win->comm),target_rank), SMPI_RMA_TAG+3, win->comm, op);
+      smpi_process_index(), smpi_group_index(smpi_comm_group(win->comm),target_rank), SMPI_RMA_TAG+3+win->count, win->comm, op);
 
   //prepare receiver request
   MPI_Request rreq = smpi_rma_recv_init(recv_addr, target_count, target_datatype,
-      smpi_process_index(), smpi_group_index(smpi_comm_group(win->comm),target_rank), SMPI_RMA_TAG+3, recv_win->comm, op);
+      smpi_process_index(), smpi_group_index(smpi_comm_group(win->comm),target_rank), SMPI_RMA_TAG+3+win->count, recv_win->comm, op);
+
+  win->count++;
   //push request to receiver's win
   recv_win->requests->push_back(rreq);
   //start send
-- 
2.20.1
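
For context, a minimal user-level reproducer of the semantics this patch
enforces; it is hypothetical and not part of the change. The MPI standard
requires that accumulates issued by one origin to the same target location
be applied in program order, and MPI_REPLACE makes any reordering
observable: rank 0 below must always end up with buf == 2.

/* Hypothetical reproducer (not in this patch); run with at least 2 ranks. */
#include <mpi.h>
#include <stdio.h>

int main(int argc, char* argv[])
{
  MPI_Init(&argc, &argv);
  int rank;
  MPI_Comm_rank(MPI_COMM_WORLD, &rank);

  int buf = 0;
  MPI_Win win;
  MPI_Win_create(&buf, sizeof(int), sizeof(int), MPI_INFO_NULL, MPI_COMM_WORLD, &win);

  MPI_Win_fence(0, win);
  if (rank == 1) {
    int one = 1;
    int two = 2;
    /* Two accumulates to the same target location in one epoch:
     * the standard mandates that the second one wins. */
    MPI_Accumulate(&one, 1, MPI_INT, 0, 0, 1, MPI_INT, MPI_REPLACE, win);
    MPI_Accumulate(&two, 1, MPI_INT, 0, 0, 1, MPI_INT, MPI_REPLACE, win);
  }
  MPI_Win_fence(0, win);

  if (rank == 0)
    printf("buf = %d (expected 2)\n", buf);

  MPI_Win_free(&win);
  MPI_Finalize();
  return 0;
}

With the patch applied, each accumulate on a window gets a distinct tag
(SMPI_RMA_TAG+3+win->count); at the closing fence, smpi_mpi_waitall defers
the receive side of accumulates, sorts them by tag, and applies them last,
in issue order.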