X-Git-Url: http://info.iut-bm.univ-fcomte.fr/pub/gitweb/simgrid.git/blobdiff_plain/dccf1b41e9c7b5a696f01abceaa2779fe65f154f..36fa571a13985879dc627c70ecc2340af606aa42:/src/smpi/smpi_base.cpp

diff --git a/src/smpi/smpi_base.cpp b/src/smpi/smpi_base.cpp
index b7af99977a..aeb6422897 100644
--- a/src/smpi/smpi_base.cpp
+++ b/src/smpi/smpi_base.cpp
@@ -816,8 +816,6 @@ void smpi_mpi_iprobe(int source, int tag, MPI_Comm comm, int* flag, MPI_Status*
       nsleeps++;
   }
   smpi_mpi_request_free(&request);
-
-  return;
 }
 
 void smpi_mpi_wait(MPI_Request * request, MPI_Status * status)
@@ -837,9 +835,14 @@ void smpi_mpi_wait(MPI_Request * request, MPI_Status * status)
     *request = MPI_REQUEST_NULL;
 }
 
+static int sort_accumulates(MPI_Request a, MPI_Request b)
+{
+  return (a->tag < b->tag); // comparator handed to std::sort in smpi_mpi_waitall: ascending tag order
+}
+
 int smpi_mpi_waitany(int count, MPI_Request requests[], MPI_Status * status)
 {
-  xbt_dynar_t comms;
+  s_xbt_dynar_t comms; // Keep it on stack to save some extra mallocs
   int i;
   int size = 0;
   int index = MPI_UNDEFINED;
@@ -847,40 +850,47 @@ int smpi_mpi_waitany(int count, MPI_Request requests[], MPI_Status * status)
 
   if(count > 0) {
     // Wait for a request to complete
-    comms = xbt_dynar_new(sizeof(smx_activity_t), nullptr);
+    xbt_dynar_init(&comms, sizeof(smx_activity_t), nullptr);
     map = xbt_new(int, count);
     XBT_DEBUG("Wait for one of %d", count);
     for(i = 0; i < count; i++) {
       if (requests[i] != MPI_REQUEST_NULL && !(requests[i]->flags & PREPARED) &&
           !(requests[i]->flags & FINISHED)) {
         if (requests[i]->action != nullptr) {
           XBT_DEBUG("Waiting any %p ", requests[i]);
-          xbt_dynar_push(comms, &requests[i]->action);
+          xbt_dynar_push(&comms, &requests[i]->action);
           map[size] = i;
           size++;
-        }else{
-         //This is a finished detached request, let's return this one
-         size=0;//so we free the dynar but don't do the waitany call
-         index=i;
-         finish_wait(&requests[i], status);//cleanup if refcount = 0
-         if (requests[i] != MPI_REQUEST_NULL && (requests[i]->flags & NON_PERSISTENT))
-           requests[i]=MPI_REQUEST_NULL;//set to null
-         break;
-        }
+        } else {
+          // This is a finished detached request, let's return this one
+          size = 0; // so we free the dynar but don't do the waitany call
+          index = i;
+          finish_wait(&requests[i], status); // cleanup if refcount = 0
+          if (requests[i] != MPI_REQUEST_NULL && (requests[i]->flags & NON_PERSISTENT))
+            requests[i] = MPI_REQUEST_NULL; // set to null
+          break;
+        }
       }
     }
     if(size > 0) {
-      i = simcall_comm_waitany(comms, -1);
+      i = simcall_comm_waitany(&comms, -1);
       // not MPI_UNDEFINED, as this is a simix return code
       if (i != -1) {
         index = map[i];
-        finish_wait(&requests[index], status);
-        if (requests[i] != MPI_REQUEST_NULL && (requests[i]->flags & NON_PERSISTENT))
-          requests[index] = MPI_REQUEST_NULL;
+        //in case of an accumulate, we have to wait the end of all requests to apply the operation, ordered correctly.
+        if ((requests[index] == MPI_REQUEST_NULL)
+            || (!((requests[index]->flags & ACCUMULATE) && (requests[index]->flags & RECV)))){
+          finish_wait(&requests[index], status);
+          if (requests[index] != MPI_REQUEST_NULL && (requests[index]->flags & NON_PERSISTENT))
+            requests[index] = MPI_REQUEST_NULL; // test requests[index]: i indexes comms/map, not requests[]
+        }else{ // RECV + ACCUMULATE request: finish_wait is not called in this branch
+          XBT_WARN("huu?");
+        }
       }
     }
+
+    xbt_dynar_free_data(&comms);
     xbt_free(map);
-    xbt_dynar_free(&comms);
   }
 
   if (index==MPI_UNDEFINED)
@@ -891,13 +901,14 @@ int smpi_mpi_waitany(int count, MPI_Request requests[], MPI_Status * status)
 
 int smpi_mpi_waitall(int count, MPI_Request requests[], MPI_Status status[])
 {
-  int index, c;
+  std::vector<MPI_Request> accumulates; // RECV + ACCUMULATE requests seen in the loop; finish_wait'ed after it
+  int index;
   MPI_Status stat;
   MPI_Status *pstat = status == MPI_STATUSES_IGNORE ? MPI_STATUS_IGNORE : &stat;
   int retvalue = MPI_SUCCESS;
   //tag invalid requests in the set
   if (status != MPI_STATUSES_IGNORE) {
-    for (c = 0; c < count; c++) {
+    for (int c = 0; c < count; c++) {
       if (requests[c] == MPI_REQUEST_NULL || requests[c]->dst == MPI_PROC_NULL || (requests[c]->flags & PREPARED)) {
         smpi_empty_status(&status[c]);
       } else if (requests[c]->src == MPI_PROC_NULL) {
@@ -906,8 +917,7 @@ int smpi_mpi_waitall(int count, MPI_Request requests[], MPI_Status status[])
       }
     }
   }
-  for(c = 0; c < count; c++) {
-
+  for (int c = 0; c < count; c++) {
     if (MC_is_active() || MC_record_replay_is_active()) {
       smpi_mpi_wait(&requests[c], pstat);
       index = c;
@@ -915,8 +925,13 @@ int smpi_mpi_waitall(int count, MPI_Request requests[], MPI_Status status[])
       index = smpi_mpi_waitany(count, requests, pstat);
       if (index == MPI_UNDEFINED)
         break;
+
+      if (requests[index] != MPI_REQUEST_NULL
+          && (requests[index]->flags & RECV)
+          && (requests[index]->flags & ACCUMULATE))
+        accumulates.push_back(requests[index]); // keep the handle: the slot may be reset to MPI_REQUEST_NULL just below
       if (requests[index] != MPI_REQUEST_NULL && (requests[index]->flags & NON_PERSISTENT))
-        requests[index]=MPI_REQUEST_NULL;
+        requests[index] = MPI_REQUEST_NULL;
     }
     if (status != MPI_STATUSES_IGNORE) {
       status[index] = *pstat;
@@ -925,6 +940,13 @@ int smpi_mpi_waitall(int count, MPI_Request requests[], MPI_Status status[])
     }
   }
 
+  if (!accumulates.empty()) {
+    std::sort(accumulates.begin(), accumulates.end(), sort_accumulates);
+    for (auto req : accumulates) {
+      finish_wait(&req, status); // NOTE(review): gets the whole status[] argument (possibly MPI_STATUSES_IGNORE) - confirm intended
+    }
+  }
+
   return retvalue;
 }
 