X-Git-Url: http://info.iut-bm.univ-fcomte.fr/pub/gitweb/simgrid.git/blobdiff_plain/a86c635da5973707a1aca4d5301b813093b39ad3..93797e97a5cd7517c65e1c22c208816a784f4c61:/src/smpi/smpi_request.cpp diff --git a/src/smpi/smpi_request.cpp b/src/smpi/smpi_request.cpp index 733ae20b76..6001f75315 100644 --- a/src/smpi/smpi_request.cpp +++ b/src/smpi/smpi_request.cpp @@ -3,16 +3,17 @@ /* This program is free software; you can redistribute it and/or modify it * under the terms of the license (GNU LGPL) which comes with this package. */ +#include "src/smpi/smpi_request.hpp" + #include "mc/mc.h" +#include "src/kernel/activity/CommImpl.hpp" #include "src/mc/mc_replay.h" #include "src/smpi/SmpiHost.hpp" -#include "src/kernel/activity/SynchroComm.hpp" #include "src/smpi/private.h" #include "src/smpi/smpi_comm.hpp" #include "src/smpi/smpi_datatype.hpp" #include "src/smpi/smpi_op.hpp" #include "src/smpi/smpi_process.hpp" -#include "src/smpi/smpi_request.hpp" #include @@ -30,7 +31,7 @@ extern void (*smpi_comm_copy_data_callback) (smx_activity_t, void*, size_t); namespace simgrid{ namespace smpi{ -Request::Request(void *buf, int count, MPI_Datatype datatype, int src, int dst, int tag, MPI_Comm comm, unsigned flags) : buf_(buf), old_type_(datatype), src_(src), dst_(dst), tag_(tag), comm_(comm), flags_(flags) +Request::Request(void *buf, int count, MPI_Datatype datatype, int src, int dst, int tag, MPI_Comm comm, unsigned flags) : buf_(buf), old_type_(datatype), src_(src), dst_(dst), tag_(tag), comm_(comm), flags_(flags) { void *old_buf = nullptr; // FIXME Handle the case of a partial shared malloc. @@ -130,7 +131,7 @@ int Request::match_recv(void* a, void* b, smx_activity_t ignored) { ref->real_src_ = req->src_; if(ref->tag_ == MPI_ANY_TAG) ref->real_tag_ = req->tag_; - if(ref->real_size_ < req->real_size_) + if(ref->real_size_ < req->real_size_) ref->truncated_ = 1; if(req->detached_==1) ref->detached_sender_=req; //tie the sender to the receiver, as it is detached and has to be freed in the receiver @@ -338,7 +339,7 @@ void Request::start() if (async_small_thresh == 0 && (flags_ & RMA) == 0 ) { mailbox = process->mailbox(); - } + } else if (((flags_ & RMA) != 0) || static_cast(size_) < async_small_thresh) { //We have to check both mailboxes (because SSEND messages are sent to the large mbox). //begin with the more appropriate one : the small one. @@ -480,7 +481,7 @@ void Request::start() void Request::startall(int count, MPI_Request * requests) { - if(requests== nullptr) + if(requests== nullptr) return; for(int i = 0; i < count; i++) { @@ -495,7 +496,7 @@ int Request::test(MPI_Request * request, MPI_Status * status) { // because the time will not normally advance when only calls to MPI_Test are made -> deadlock // multiplier to the sleeptime, to increase speed of execution, each failed test will increase it static int nsleeps = 1; - if(smpi_test_sleep > 0) + if(smpi_test_sleep > 0) simcall_process_sleep(nsleeps*smpi_test_sleep); Status::empty(status); @@ -529,7 +530,7 @@ int Request::testsome(int incount, MPI_Request requests[], int *indices, MPI_Sta count++; if (status != MPI_STATUSES_IGNORE) status[i] = *pstat; - if ((requests[i] != MPI_REQUEST_NULL) && requests[i]->flags_ & NON_PERSISTENT) + if ((requests[i] != MPI_REQUEST_NULL) && (requests[i]->flags_ & NON_PERSISTENT)) requests[i] = MPI_REQUEST_NULL; } } else { @@ -543,7 +544,7 @@ int Request::testsome(int incount, MPI_Request requests[], int *indices, MPI_Sta int Request::testany(int count, MPI_Request requests[], int *index, MPI_Status * status) { - std::vector comms; + std::vector comms; comms.reserve(count); int i; @@ -561,12 +562,12 @@ int Request::testany(int count, MPI_Request requests[], int *index, MPI_Status * if (not map.empty()) { //multiplier to the sleeptime, to increase speed of execution, each failed testany will increase it static int nsleeps = 1; - if(smpi_test_sleep > 0) + if(smpi_test_sleep > 0) simcall_process_sleep(nsleeps*smpi_test_sleep); i = simcall_comm_testany(comms.data(), comms.size()); // The i-th element in comms matches! if (i != -1) { // -1 is not MPI_UNDEFINED but a SIMIX return code. (nothing matches) - *index = map[i]; + *index = map[i]; finish_wait(&requests[*index],status); flag = 1; nsleeps = 1; @@ -651,7 +652,8 @@ void Request::iprobe(int source, int tag, MPI_Comm comm, int* flag, MPI_Status* } if (request->action_ != nullptr){ - simgrid::kernel::activity::Comm *sync_comm = static_cast(request->action_); + simgrid::kernel::activity::CommImplPtr sync_comm = + boost::static_pointer_cast(request->action_); MPI_Request req = static_cast(sync_comm->src_data); *flag = 1; if(status != MPI_STATUS_IGNORE && (req->flags_ & PREPARED) == 0) { @@ -755,22 +757,23 @@ void Request::wait(MPI_Request * request, MPI_Status * status) int Request::waitany(int count, MPI_Request requests[], MPI_Status * status) { s_xbt_dynar_t comms; // Keep it on stack to save some extra mallocs - int i; int size = 0; int index = MPI_UNDEFINED; - int *map; if(count > 0) { // Wait for a request to complete - xbt_dynar_init(&comms, sizeof(smx_activity_t), nullptr); - map = xbt_new(int, count); + xbt_dynar_init(&comms, sizeof(smx_activity_t), [](void*ptr){ + intrusive_ptr_release((simgrid::kernel::activity::ActivityImpl*)ptr); + }); + int *map = xbt_new(int, count); XBT_DEBUG("Wait for one of %d", count); - for(i = 0; i < count; i++) { + for(int i = 0; i < count; i++) { if (requests[i] != MPI_REQUEST_NULL && not(requests[i]->flags_ & PREPARED) && not(requests[i]->flags_ & FINISHED)) { if (requests[i]->action_ != nullptr) { XBT_DEBUG("Waiting any %p ", requests[i]); - xbt_dynar_push(&comms, &requests[i]->action_); + intrusive_ptr_add_ref(requests[i]->action_.get()); + xbt_dynar_push_as(&comms, simgrid::kernel::activity::ActivityImpl*, requests[i]->action_.get()); map[size] = i; size++; } else { @@ -784,8 +787,9 @@ int Request::waitany(int count, MPI_Request requests[], MPI_Status * status) } } } - if(size > 0) { - i = simcall_comm_waitany(&comms, -1); + if (size > 0) { + XBT_DEBUG("Enter waitany for %lu comms", xbt_dynar_length(&comms)); + int i = simcall_comm_waitany(&comms, -1); // not MPI_UNDEFINED, as this is a simix return code if (i != -1) { @@ -820,7 +824,7 @@ int Request::waitall(int count, MPI_Request requests[], MPI_Status status[]) std::vector accumulates; int index; MPI_Status stat; - MPI_Status *pstat = status == MPI_STATUSES_IGNORE ? MPI_STATUS_IGNORE : &stat; + MPI_Status *pstat = (status == MPI_STATUSES_IGNORE ? MPI_STATUS_IGNORE : &stat); int retvalue = MPI_SUCCESS; //tag invalid requests in the set if (status != MPI_STATUSES_IGNORE) { @@ -838,7 +842,7 @@ int Request::waitall(int count, MPI_Request requests[], MPI_Status status[]) wait(&requests[c],pstat); index = c; } else { - index = waitany(count, requests, pstat); + index = waitany(count, (MPI_Request*)requests, pstat); if (index == MPI_UNDEFINED) break;