X-Git-Url: http://info.iut-bm.univ-fcomte.fr/pub/gitweb/simgrid.git/blobdiff_plain/babcb606fcfb6572ea85211e9540d7dbdd380214..d53d00d608a60a6f05e77ea7b7cd5c4e544d7ab1:/src/smpi/smpi_base.cpp diff --git a/src/smpi/smpi_base.cpp b/src/smpi/smpi_base.cpp index aeb6422897..0ec0ca0fe1 100644 --- a/src/smpi/smpi_base.cpp +++ b/src/smpi/smpi_base.cpp @@ -18,11 +18,15 @@ #include "simgrid/sg_config.h" #include "smpi/smpi_utils.hpp" #include "colls/colls.h" +#include #include "src/kernel/activity/SynchroComm.hpp" XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_base, smpi, "Logging specific to SMPI (base)"); +extern void (*smpi_comm_copy_data_callback) (smx_activity_t, void*, size_t); + + static int match_recv(void* a, void* b, smx_activity_t ignored) { MPI_Request ref = static_cast(a); MPI_Request req = static_cast(b); @@ -332,7 +336,7 @@ void smpi_mpi_start(MPI_Request request) // we make a copy here, as the size is modified by simix, and we may reuse the request in another receive later request->real_size=request->size; request->action = simcall_comm_irecv(SIMIX_process_self(), mailbox, request->buf, &request->real_size, &match_recv, - ! smpi_process_get_replaying()? &smpi_comm_copy_buffer_callback + ! smpi_process_get_replaying()? smpi_comm_copy_data_callback : &smpi_comm_null_copy_buffer_callback, request, -1.0); XBT_DEBUG("recv simcall posted"); @@ -422,7 +426,7 @@ void smpi_mpi_start(MPI_Request request) request->action = simcall_comm_isend(SIMIX_process_from_PID(request->src+1), mailbox, request->size, -1.0, buf, request->real_size, &match_send, &xbt_free_f, // how to free the userdata if a detached send fails - !smpi_process_get_replaying() ? &smpi_comm_copy_buffer_callback + !smpi_process_get_replaying() ? smpi_comm_copy_data_callback : &smpi_comm_null_copy_buffer_callback, request, // detach if msg size < eager/rdv switch limit request->detached); @@ -777,8 +781,12 @@ void smpi_mpi_iprobe(int source, int tag, MPI_Comm comm, int* flag, MPI_Status* // (especially when used as a break condition, such as while(MPI_Iprobe(...)) ... ) // multiplier to the sleeptime, to increase speed of execution, each failed iprobe will increase it static int nsleeps = 1; - if(smpi_iprobe_sleep > 0) - simcall_process_sleep(nsleeps*smpi_iprobe_sleep); + double speed = simgrid::s4u::Actor::self()->host()->speed(); + double maxrate = xbt_cfg_get_double("smpi/iprobe-cpu-usage"); + if (smpi_iprobe_sleep > 0) { + smx_activity_t iprobe_sleep = simcall_execution_start("iprobe", /* flops to executek*/nsleeps*smpi_iprobe_sleep*speed*maxrate, /* priority */1.0, /* performance bound */maxrate*speed); + simcall_execution_wait(iprobe_sleep); + } // behave like a receive, but don't do it smx_mailbox_t mailbox; @@ -1338,7 +1346,8 @@ void smpi_mpi_allreduce(void *sendbuf, void *recvbuf, int count, MPI_Datatype da void smpi_mpi_scan(void *sendbuf, void *recvbuf, int count, MPI_Datatype datatype, MPI_Op op, MPI_Comm comm) { int system_tag = -888; - MPI_Aint lb = 0, dataext = 0; + MPI_Aint lb = 0; + MPI_Aint dataext = 0; int rank = smpi_comm_rank(comm); int size = smpi_comm_size(comm); @@ -1397,7 +1406,8 @@ void smpi_mpi_scan(void *sendbuf, void *recvbuf, int count, MPI_Datatype datatyp void smpi_mpi_exscan(void *sendbuf, void *recvbuf, int count, MPI_Datatype datatype, MPI_Op op, MPI_Comm comm) { int system_tag = -888; - MPI_Aint lb = 0, dataext = 0; + MPI_Aint lb = 0; + MPI_Aint dataext = 0; int recvbuf_is_empty=1; int rank = smpi_comm_rank(comm); int size = smpi_comm_size(comm); @@ -1419,6 +1429,7 @@ void smpi_mpi_exscan(void *sendbuf, void *recvbuf, int count, MPI_Datatype datat } // Wait for completion of all comms. smpi_mpi_startall(size - 1, requests); + if(smpi_op_is_commute(op)){ for (int other = 0; other < size - 1; other++) { index = smpi_mpi_waitany(size - 1, requests, MPI_STATUS_IGNORE); @@ -1439,11 +1450,11 @@ void smpi_mpi_exscan(void *sendbuf, void *recvbuf, int count, MPI_Datatype datat for (int other = 0; other < size - 1; other++) { smpi_mpi_wait(&(requests[other]), MPI_STATUS_IGNORE); if(index < rank) { - if(recvbuf_is_empty){ - smpi_datatype_copy(tmpbufs[other], count, datatype, recvbuf, count, datatype); - recvbuf_is_empty=0; - } else - smpi_op_apply(op, tmpbufs[other], recvbuf, &count, &datatype); + if (recvbuf_is_empty) { + smpi_datatype_copy(tmpbufs[other], count, datatype, recvbuf, count, datatype); + recvbuf_is_empty = 0; + } else + smpi_op_apply(op, tmpbufs[other], recvbuf, &count, &datatype); } } }