X-Git-Url: http://info.iut-bm.univ-fcomte.fr/pub/gitweb/simgrid.git/blobdiff_plain/4c7a3670bc8e56e4f0fcdf46389c6aedc4b7b383..0b9803aff2b8893bed56c21012d86d0a19b186a0:/src/smpi/smpi_base.cpp

diff --git a/src/smpi/smpi_base.cpp b/src/smpi/smpi_base.cpp
index cdc04e8f3d..b64113ca98 100644
--- a/src/smpi/smpi_base.cpp
+++ b/src/smpi/smpi_base.cpp
@@ -18,11 +18,15 @@
 #include "simgrid/sg_config.h"
 #include "smpi/smpi_utils.hpp"
 #include "colls/colls.h"
+#include <algorithm>
 #include "src/kernel/activity/SynchroComm.hpp"

 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_base, smpi, "Logging specific to SMPI (base)");

+extern void (*smpi_comm_copy_data_callback) (smx_activity_t, void*, size_t);
+
+
 static int match_recv(void* a, void* b, smx_activity_t ignored) {
   MPI_Request ref = static_cast<MPI_Request>(a);
   MPI_Request req = static_cast<MPI_Request>(b);
@@ -332,7 +336,7 @@ void smpi_mpi_start(MPI_Request request)
   // we make a copy here, as the size is modified by simix, and we may reuse the request in another receive later
   request->real_size=request->size;
   request->action = simcall_comm_irecv(SIMIX_process_self(), mailbox, request->buf, &request->real_size, &match_recv,
-                                       ! smpi_process_get_replaying()? &smpi_comm_copy_buffer_callback
+                                       ! smpi_process_get_replaying()? smpi_comm_copy_data_callback
                                        : &smpi_comm_null_copy_buffer_callback, request, -1.0);
   XBT_DEBUG("recv simcall posted");

@@ -422,7 +426,7 @@ void smpi_mpi_start(MPI_Request request)
     request->action = simcall_comm_isend(SIMIX_process_from_PID(request->src+1), mailbox, request->size, -1.0,
                                          buf, request->real_size, &match_send,
                                          &xbt_free_f, // how to free the userdata if a detached send fails
-                                         !smpi_process_get_replaying() ? &smpi_comm_copy_buffer_callback
+                                         !smpi_process_get_replaying() ? smpi_comm_copy_data_callback
                                          : &smpi_comm_null_copy_buffer_callback, request,
                                          // detach if msg size < eager/rdv switch limit
                                          request->detached);
@@ -775,10 +779,15 @@ void smpi_mpi_iprobe(int source, int tag, MPI_Comm comm, int* flag, MPI_Status*
   // to avoid deadlock, we have to sleep some time here, or the timer won't advance and we will only do iprobe simcalls
   // (especially when used as a break condition, such as while(MPI_Iprobe(...)) ... )
-  // multiplier to the sleeptime, to increase speed of execution, each failed iprobe will increase it
+  // nsleeps is a multiplier to the sleeptime, to increase speed of execution, each failed iprobe will increase it
+  // (This can speed up the execution of certain applications by an order of magnitude, such as HPL)
   static int nsleeps = 1;
-  if(smpi_iprobe_sleep > 0)
-    simcall_process_sleep(nsleeps*smpi_iprobe_sleep);
+  double speed = simgrid::s4u::Actor::self()->host()->speed();
+  double maxrate = xbt_cfg_get_double("smpi/iprobe-cpu-usage");
+  if (smpi_iprobe_sleep > 0) {
+    smx_activity_t iprobe_sleep = simcall_execution_start("iprobe", /* flops to execute */nsleeps*smpi_iprobe_sleep*speed*maxrate, /* priority */1.0, /* performance bound */maxrate*speed);
+    simcall_execution_wait(iprobe_sleep);
+  }

   // behave like a receive, but don't do it
   smx_mailbox_t mailbox;
@@ -835,10 +844,9 @@ void smpi_mpi_wait(MPI_Request * request, MPI_Status * status)
     *request = MPI_REQUEST_NULL;
 }

-static int sort_accumulates(const void* pa, const void* pb)
+static int sort_accumulates(MPI_Request a, MPI_Request b)
 {
-  return (*static_cast<MPI_Request*>(pa))->tag>
-         (*static_cast<MPI_Request*>(pb))->tag;
+  return (a->tag < b->tag);
 }

 int smpi_mpi_waitany(int count, MPI_Request requests[], MPI_Status * status)
@@ -861,15 +869,15 @@ int smpi_mpi_waitany(int count, MPI_Request requests[], MPI_Status * status)
       xbt_dynar_push(&comms, &requests[i]->action);
       map[size] = i;
       size++;
-      }else{
-       //This is a finished detached request, let's return this one
-       size=0;//so we free the dynar but don't do the waitany call
-       index=i;
-       finish_wait(&requests[i], status);//cleanup if refcount = 0
-       if (requests[i] != MPI_REQUEST_NULL && (requests[i]->flags & NON_PERSISTENT))
-          requests[i]=MPI_REQUEST_NULL;//set to null
-       break;
-       }
+      } else {
+        // This is a finished detached request, let's return this one
+        size = 0; // so we free the dynar but don't do the waitany call
+        index = i;
+        finish_wait(&requests[i], status); // cleanup if refcount = 0
+        if (requests[i] != MPI_REQUEST_NULL && (requests[i]->flags & NON_PERSISTENT))
+          requests[i] = MPI_REQUEST_NULL; // set to null
+        break;
+      }
     }
   }
   if(size > 0) {
@@ -902,14 +910,14 @@ int smpi_mpi_waitall(int count, MPI_Request requests[], MPI_Status status[])
 {
-  s_xbt_dynar_t accumulates;
-  int index, c;
+  std::vector<MPI_Request> accumulates;
+  int index;
   MPI_Status stat;
   MPI_Status *pstat = status == MPI_STATUSES_IGNORE ?
                       MPI_STATUS_IGNORE : &stat;
   int retvalue = MPI_SUCCESS;
   //tag invalid requests in the set
   if (status != MPI_STATUSES_IGNORE) {
-    for (c = 0; c < count; c++) {
+    for (int c = 0; c < count; c++) {
       if (requests[c] == MPI_REQUEST_NULL || requests[c]->dst == MPI_PROC_NULL || (requests[c]->flags & PREPARED)) {
         smpi_empty_status(&status[c]);
       } else if (requests[c]->src == MPI_PROC_NULL) {
@@ -918,9 +926,7 @@ int smpi_mpi_waitall(int count, MPI_Request requests[], MPI_Status status[])
       }
     }
   }
-  xbt_dynar_init(&accumulates, sizeof(MPI_Request), nullptr);
-  for(c = 0; c < count; c++) {
-
+  for (int c = 0; c < count; c++) {
     if (MC_is_active() || MC_record_replay_is_active()) {
       smpi_mpi_wait(&requests[c], pstat);
       index = c;
@@ -932,10 +938,9 @@ int smpi_mpi_waitall(int count, MPI_Request requests[], MPI_Status status[])
       if (requests[index] != MPI_REQUEST_NULL &&
          (requests[index]->flags & RECV) &&
          (requests[index]->flags & ACCUMULATE))
-        xbt_dynar_push(&accumulates, &requests[index]);
+        accumulates.push_back(requests[index]);
       if (requests[index] != MPI_REQUEST_NULL && (requests[index]->flags & NON_PERSISTENT))
-        requests[index]=MPI_REQUEST_NULL;
-
+        requests[index] = MPI_REQUEST_NULL;
     }
     if (status != MPI_STATUSES_IGNORE) {
       status[index] = *pstat;
@@ -944,15 +949,12 @@ int smpi_mpi_waitall(int count, MPI_Request requests[], MPI_Status status[])
     }
   }

-  if(!xbt_dynar_is_empty(&accumulates)){
-    xbt_dynar_sort(&accumulates, sort_accumulates);
-    MPI_Request req;
-    unsigned int cursor;
-    xbt_dynar_foreach(&accumulates, cursor, req) {
+  if (!accumulates.empty()) {
+    std::sort(accumulates.begin(), accumulates.end(), sort_accumulates);
+    for (auto req : accumulates) {
       finish_wait(&req, status);
     }
   }
-  xbt_dynar_free_data(&accumulates);

   return retvalue;
 }
@@ -1345,7 +1347,8 @@ void smpi_mpi_allreduce(void *sendbuf, void *recvbuf, int count, MPI_Datatype da
 void smpi_mpi_scan(void *sendbuf, void *recvbuf, int count, MPI_Datatype datatype, MPI_Op op, MPI_Comm comm)
 {
   int system_tag = -888;
-  MPI_Aint lb = 0, dataext = 0;
+  MPI_Aint lb = 0;
+  MPI_Aint dataext = 0;

   int rank = smpi_comm_rank(comm);
   int size = smpi_comm_size(comm);
@@ -1404,7 +1407,8 @@ void smpi_mpi_scan(void *sendbuf, void *recvbuf, int count, MPI_Datatype datatyp
 void smpi_mpi_exscan(void *sendbuf, void *recvbuf, int count, MPI_Datatype datatype, MPI_Op op, MPI_Comm comm)
 {
   int system_tag = -888;
-  MPI_Aint lb = 0, dataext = 0;
+  MPI_Aint lb = 0;
+  MPI_Aint dataext = 0;
   int recvbuf_is_empty=1;
   int rank = smpi_comm_rank(comm);
   int size = smpi_comm_size(comm);
@@ -1426,6 +1430,7 @@ void smpi_mpi_exscan(void *sendbuf, void *recvbuf, int count, MPI_Datatype datat
   }
   // Wait for completion of all comms.
   smpi_mpi_startall(size - 1, requests);
+
   if(smpi_op_is_commute(op)){
     for (int other = 0; other < size - 1; other++) {
       index = smpi_mpi_waitany(size - 1, requests, MPI_STATUS_IGNORE);
@@ -1446,11 +1451,11 @@ void smpi_mpi_exscan(void *sendbuf, void *recvbuf, int count, MPI_Datatype datat
     for (int other = 0; other < size - 1; other++) {
       smpi_mpi_wait(&(requests[other]), MPI_STATUS_IGNORE);
       if(index < rank) {
-        if(recvbuf_is_empty){
-          smpi_datatype_copy(tmpbufs[other], count, datatype, recvbuf, count, datatype);
-          recvbuf_is_empty=0;
-        } else
-          smpi_op_apply(op, tmpbufs[other], recvbuf, &count, &datatype);
+        if (recvbuf_is_empty) {
+          smpi_datatype_copy(tmpbufs[other], count, datatype, recvbuf, count, datatype);
+          recvbuf_is_empty = 0;
+        } else
+          smpi_op_apply(op, tmpbufs[other], recvbuf, &count, &datatype);
       }
     }
   }
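
Note on the smpi_mpi_start hunks: the copy routine passed to simcall_comm_irecv/simcall_comm_isend is no longer the address of a fixed function (&smpi_comm_copy_buffer_callback) but an extern function pointer, smpi_comm_copy_data_callback, which presumably lets other code install a different copy routine at runtime. A simplified, hypothetical sketch of that indirection (the real callback in this diff takes (smx_activity_t, void*, size_t)):

  // sketch: runtime-swappable copy callback, simplified signature.
  #include <cstddef>
  #include <cstdio>

  static void real_copy(void*, const void*, std::size_t n) { std::printf("copy %zu bytes\n", n); }
  static void null_copy(void*, const void*, std::size_t)   { /* replay mode: skip the copy */ }

  // A mutable function pointer, like the new extern smpi_comm_copy_data_callback,
  // lets other modules install a different routine at runtime; the old code took
  // the address of one fixed function.
  static void (*copy_data_callback)(void*, const void*, std::size_t) = real_copy;

  int main()
  {
    char buf[8];
    copy_data_callback(buf, "x", 1); // dispatches through the pointer
    copy_data_callback = null_copy;  // swapped at runtime, e.g. when replaying
    copy_data_callback(buf, "x", 1); // now a no-op
    return 0;
  }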
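
Note on the iprobe hunk: replacing simcall_process_sleep() with a bounded simcall_execution_start() makes each failed MPI_Iprobe charge CPU time on the simulated host, at the share set by smpi/iprobe-cpu-usage, instead of sleeping for free. A minimal standalone sketch of the arithmetic, with hypothetical placeholder values standing in for the configuration and host speed:

  // hedged sketch, not SimGrid code: shows why the bounded execution
  // lasts exactly as long as the sleep it replaces.
  #include <cstdio>

  int main()
  {
    double iprobe_sleep = 1e-4; // placeholder for smpi_iprobe_sleep (seconds)
    int    nsleeps      = 4;    // multiplier, grows after each failed iprobe
    double speed        = 1e9;  // placeholder host speed, in flops/s
    double maxrate      = 0.5;  // placeholder for "smpi/iprobe-cpu-usage"

    // Work injected by the new code path:
    double flops = nsleeps * iprobe_sleep * speed * maxrate;

    // The performance bound caps the execution rate at maxrate*speed, so the
    // simulated duration is flops / (maxrate*speed) == nsleeps*iprobe_sleep:
    // the same wall-clock time as the old sleep, but now consuming CPU.
    double duration = flops / (maxrate * speed);

    std::printf("duration = %g s, cpu share = %g%%\n", duration, 100 * maxrate);
    return 0;
  }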
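
Note on the sort_accumulates change: xbt_dynar_sort() takes a qsort-style three-way comparator, while std::sort() expects a strict-weak-ordering predicate, so the rewrite also switches comparator conventions (both order requests by ascending tag). A standalone sketch of the two conventions, using a hypothetical s_request struct in place of SMPI's real request type:

  // sketch: qsort vs. std::sort comparator conventions.
  #include <algorithm>
  #include <cstdio>
  #include <cstdlib>
  #include <vector>

  struct s_request { int tag; }; // hypothetical stand-in
  typedef s_request* Request;

  // qsort convention: negative / zero / positive three-way result.
  static int compare_qsort(const void* pa, const void* pb)
  {
    int ta = (*static_cast<const Request*>(pa))->tag;
    int tb = (*static_cast<const Request*>(pb))->tag;
    return (ta > tb) - (ta < tb);
  }

  // std::sort convention: true iff a must be ordered before b.
  static bool compare_std(Request a, Request b) { return a->tag < b->tag; }

  int main()
  {
    s_request r1{3}, r2{1}, r3{2};

    Request arr[] = {&r1, &r2, &r3};
    std::qsort(arr, 3, sizeof(Request), compare_qsort); // tags: 1 2 3

    std::vector<Request> vec{&r1, &r2, &r3};
    std::sort(vec.begin(), vec.end(), compare_std);     // tags: 1 2 3
    for (auto r : vec)
      std::printf("%d ", r->tag);
    std::printf("\n");
    return 0;
  }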