X-Git-Url: http://info.iut-bm.univ-fcomte.fr/pub/gitweb/simgrid.git/blobdiff_plain/e553a03ec20f531e48fab905d2034551c58a683f..00e649212cc6ca64c4cb8103e4c5bb4b98d2ff3f:/src/smpi/smpi_base.cpp diff --git a/src/smpi/smpi_base.cpp b/src/smpi/smpi_base.cpp index 9bae432bef..cd9ae5b301 100644 --- a/src/smpi/smpi_base.cpp +++ b/src/smpi/smpi_base.cpp @@ -332,7 +332,7 @@ void smpi_mpi_start(MPI_Request request) // we make a copy here, as the size is modified by simix, and we may reuse the request in another receive later request->real_size=request->size; request->action = simcall_comm_irecv(SIMIX_process_self(), mailbox, request->buf, &request->real_size, &match_recv, - ! smpi_process_get_replaying()? &smpi_comm_copy_buffer_callback + ! smpi_process_get_replaying()? smpi_comm_copy_data_callback : &smpi_comm_null_copy_buffer_callback, request, -1.0); XBT_DEBUG("recv simcall posted"); @@ -422,7 +422,7 @@ void smpi_mpi_start(MPI_Request request) request->action = simcall_comm_isend(SIMIX_process_from_PID(request->src+1), mailbox, request->size, -1.0, buf, request->real_size, &match_send, &xbt_free_f, // how to free the userdata if a detached send fails - !smpi_process_get_replaying() ? &smpi_comm_copy_buffer_callback + !smpi_process_get_replaying() ? smpi_comm_copy_data_callback : &smpi_comm_null_copy_buffer_callback, request, // detach if msg size < eager/rdv switch limit request->detached); @@ -835,6 +835,11 @@ void smpi_mpi_wait(MPI_Request * request, MPI_Status * status) *request = MPI_REQUEST_NULL; } +static int sort_accumulates(MPI_Request a, MPI_Request b) +{ + return (a->tag < b->tag); +} + int smpi_mpi_waitany(int count, MPI_Request requests[], MPI_Status * status) { s_xbt_dynar_t comms; // Keep it on stack to save some extra mallocs @@ -855,15 +860,15 @@ int smpi_mpi_waitany(int count, MPI_Request requests[], MPI_Status * status) xbt_dynar_push(&comms, &requests[i]->action); map[size] = i; size++; - }else{ - //This is a finished detached request, let's return this one - size=0;//so we free the dynar but don't do the waitany call - index=i; - finish_wait(&requests[i], status);//cleanup if refcount = 0 - if (requests[i] != MPI_REQUEST_NULL && (requests[i]->flags & NON_PERSISTENT)) - requests[i]=MPI_REQUEST_NULL;//set to null - break; - } + } else { + // This is a finished detached request, let's return this one + size = 0; // so we free the dynar but don't do the waitany call + index = i; + finish_wait(&requests[i], status); // cleanup if refcount = 0 + if (requests[i] != MPI_REQUEST_NULL && (requests[i]->flags & NON_PERSISTENT)) + requests[i] = MPI_REQUEST_NULL; // set to null + break; + } } } if(size > 0) { @@ -872,11 +877,19 @@ int smpi_mpi_waitany(int count, MPI_Request requests[], MPI_Status * status) // not MPI_UNDEFINED, as this is a simix return code if (i != -1) { index = map[i]; - finish_wait(&requests[index], status); - if (requests[i] != MPI_REQUEST_NULL && (requests[i]->flags & NON_PERSISTENT)) - requests[index] = MPI_REQUEST_NULL; + //in case of an accumulate, we have to wait the end of all requests to apply the operation, ordered correctly. + if ((requests[index] == MPI_REQUEST_NULL) + || (!((requests[index]->flags & ACCUMULATE) && (requests[index]->flags & RECV)))){ + finish_wait(&requests[index], status); + if (requests[i] != MPI_REQUEST_NULL && (requests[i]->flags & NON_PERSISTENT)) + requests[index] = MPI_REQUEST_NULL; + }else{ + XBT_WARN("huu?"); + } } } + + xbt_dynar_free_data(&comms); xbt_free(map); } @@ -888,13 +901,14 @@ int smpi_mpi_waitany(int count, MPI_Request requests[], MPI_Status * status) int smpi_mpi_waitall(int count, MPI_Request requests[], MPI_Status status[]) { - int index, c; + std::vector accumulates; + int index; MPI_Status stat; MPI_Status *pstat = status == MPI_STATUSES_IGNORE ? MPI_STATUS_IGNORE : &stat; int retvalue = MPI_SUCCESS; //tag invalid requests in the set if (status != MPI_STATUSES_IGNORE) { - for (c = 0; c < count; c++) { + for (int c = 0; c < count; c++) { if (requests[c] == MPI_REQUEST_NULL || requests[c]->dst == MPI_PROC_NULL || (requests[c]->flags & PREPARED)) { smpi_empty_status(&status[c]); } else if (requests[c]->src == MPI_PROC_NULL) { @@ -903,8 +917,7 @@ int smpi_mpi_waitall(int count, MPI_Request requests[], MPI_Status status[]) } } } - for(c = 0; c < count; c++) { - + for (int c = 0; c < count; c++) { if (MC_is_active() || MC_record_replay_is_active()) { smpi_mpi_wait(&requests[c], pstat); index = c; @@ -912,8 +925,13 @@ int smpi_mpi_waitall(int count, MPI_Request requests[], MPI_Status status[]) index = smpi_mpi_waitany(count, requests, pstat); if (index == MPI_UNDEFINED) break; + + if (requests[index] != MPI_REQUEST_NULL + && (requests[index]->flags & RECV) + && (requests[index]->flags & ACCUMULATE)) + accumulates.push_back(requests[index]); if (requests[index] != MPI_REQUEST_NULL && (requests[index]->flags & NON_PERSISTENT)) - requests[index]=MPI_REQUEST_NULL; + requests[index] = MPI_REQUEST_NULL; } if (status != MPI_STATUSES_IGNORE) { status[index] = *pstat; @@ -922,6 +940,13 @@ int smpi_mpi_waitall(int count, MPI_Request requests[], MPI_Status status[]) } } + if (!accumulates.empty()) { + std::sort(accumulates.begin(), accumulates.end(), sort_accumulates); + for (auto req : accumulates) { + finish_wait(&req, status); + } + } + return retvalue; } @@ -1313,7 +1338,8 @@ void smpi_mpi_allreduce(void *sendbuf, void *recvbuf, int count, MPI_Datatype da void smpi_mpi_scan(void *sendbuf, void *recvbuf, int count, MPI_Datatype datatype, MPI_Op op, MPI_Comm comm) { int system_tag = -888; - MPI_Aint lb = 0, dataext = 0; + MPI_Aint lb = 0; + MPI_Aint dataext = 0; int rank = smpi_comm_rank(comm); int size = smpi_comm_size(comm); @@ -1372,7 +1398,8 @@ void smpi_mpi_scan(void *sendbuf, void *recvbuf, int count, MPI_Datatype datatyp void smpi_mpi_exscan(void *sendbuf, void *recvbuf, int count, MPI_Datatype datatype, MPI_Op op, MPI_Comm comm) { int system_tag = -888; - MPI_Aint lb = 0, dataext = 0; + MPI_Aint lb = 0; + MPI_Aint dataext = 0; int recvbuf_is_empty=1; int rank = smpi_comm_rank(comm); int size = smpi_comm_size(comm); @@ -1394,6 +1421,7 @@ void smpi_mpi_exscan(void *sendbuf, void *recvbuf, int count, MPI_Datatype datat } // Wait for completion of all comms. smpi_mpi_startall(size - 1, requests); + if(smpi_op_is_commute(op)){ for (int other = 0; other < size - 1; other++) { index = smpi_mpi_waitany(size - 1, requests, MPI_STATUS_IGNORE); @@ -1414,11 +1442,11 @@ void smpi_mpi_exscan(void *sendbuf, void *recvbuf, int count, MPI_Datatype datat for (int other = 0; other < size - 1; other++) { smpi_mpi_wait(&(requests[other]), MPI_STATUS_IGNORE); if(index < rank) { - if(recvbuf_is_empty){ - smpi_datatype_copy(tmpbufs[other], count, datatype, recvbuf, count, datatype); - recvbuf_is_empty=0; - } else - smpi_op_apply(op, tmpbufs[other], recvbuf, &count, &datatype); + if (recvbuf_is_empty) { + smpi_datatype_copy(tmpbufs[other], count, datatype, recvbuf, count, datatype); + recvbuf_is_empty = 0; + } else + smpi_op_apply(op, tmpbufs[other], recvbuf, &count, &datatype); } } }