X-Git-Url: http://info.iut-bm.univ-fcomte.fr/pub/gitweb/simgrid.git/blobdiff_plain/3d04e86369329fc50278341b47224953f0095a2a..a784ea0b57101613a92e9fe1403d224e7ea4d501:/src/smpi/smpi_request.cpp diff --git a/src/smpi/smpi_request.cpp b/src/smpi/smpi_request.cpp index 29661e8902..e3d477bee1 100644 --- a/src/smpi/smpi_request.cpp +++ b/src/smpi/smpi_request.cpp @@ -3,17 +3,19 @@ /* This program is free software; you can redistribute it and/or modify it * under the terms of the license (GNU LGPL) which comes with this package. */ -#include -#include -#include "private.h" +#include "src/smpi/smpi_request.hpp" + #include "mc/mc.h" +#include "src/kernel/activity/CommImpl.hpp" #include "src/mc/mc_replay.h" -#include "src/simix/smx_private.h" -#include "simgrid/sg_config.h" -#include "smpi/smpi_utils.hpp" #include "src/smpi/SmpiHost.hpp" -#include -#include "src/kernel/activity/SynchroComm.hpp" +#include "src/smpi/private.h" +#include "src/smpi/smpi_comm.hpp" +#include "src/smpi/smpi_datatype.hpp" +#include "src/smpi/smpi_op.hpp" +#include "src/smpi/smpi_process.hpp" + +#include XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_request, smpi, "Logging specific to SMPI (reques)"); @@ -29,11 +31,11 @@ extern void (*smpi_comm_copy_data_callback) (smx_activity_t, void*, size_t); namespace simgrid{ namespace smpi{ -Request::Request(void *buf, int count, MPI_Datatype datatype, int src, int dst, int tag, MPI_Comm comm, unsigned flags) : buf_(buf), old_type_(datatype), src_(src), dst_(dst), tag_(tag), comm_(comm), flags_(flags) +Request::Request(void *buf, int count, MPI_Datatype datatype, int src, int dst, int tag, MPI_Comm comm, unsigned flags) : buf_(buf), old_type_(datatype), src_(src), dst_(dst), tag_(tag), comm_(comm), flags_(flags) { void *old_buf = nullptr; // FIXME Handle the case of a partial shared malloc. - if(((((flags & RECV) != 0) && ((flags & ACCUMULATE) !=0)) || (datatype->flags() & DT_FLAG_DERIVED))) { // && (!smpi_is_shared(buf_))){ + if ((((flags & RECV) != 0) && ((flags & ACCUMULATE) != 0)) || (datatype->flags() & DT_FLAG_DERIVED)) { // This part handles the problem of non-contiguous memory old_buf = buf; if (count==0){ @@ -115,7 +117,8 @@ void Request::unref(MPI_Request* request) } } -int Request::match_recv(void* a, void* b, smx_activity_t ignored) { +int Request::match_recv(void* a, void* b, simgrid::kernel::activity::CommImpl* ignored) +{ MPI_Request ref = static_cast(a); MPI_Request req = static_cast(b); XBT_DEBUG("Trying to match a recv of src %d against %d, tag %d against %d",ref->src_,req->src_, ref->tag_, req->tag_); @@ -129,7 +132,7 @@ int Request::match_recv(void* a, void* b, smx_activity_t ignored) { ref->real_src_ = req->src_; if(ref->tag_ == MPI_ANY_TAG) ref->real_tag_ = req->tag_; - if(ref->real_size_ < req->real_size_) + if(ref->real_size_ < req->real_size_) ref->truncated_ = 1; if(req->detached_==1) ref->detached_sender_=req; //tie the sender to the receiver, as it is detached and has to be freed in the receiver @@ -138,7 +141,8 @@ int Request::match_recv(void* a, void* b, smx_activity_t ignored) { }else return 0; } -int Request::match_send(void* a, void* b,smx_activity_t ignored) { +int Request::match_send(void* a, void* b, simgrid::kernel::activity::CommImpl* ignored) +{ MPI_Request ref = static_cast(a); MPI_Request req = static_cast(b); XBT_DEBUG("Trying to match a send of src %d against %d, tag %d against %d",ref->src_,req->src_, ref->tag_, req->tag_); @@ -337,19 +341,18 @@ void Request::start() if (async_small_thresh == 0 && (flags_ & RMA) == 0 ) { mailbox = process->mailbox(); - } + } else if (((flags_ & RMA) != 0) || static_cast(size_) < async_small_thresh) { //We have to check both mailboxes (because SSEND messages are sent to the large mbox). //begin with the more appropriate one : the small one. mailbox = process->mailbox_small(); XBT_DEBUG("Is there a corresponding send already posted in the small mailbox %p (in case of SSEND)?", mailbox); - smx_activity_t action = simcall_comm_iprobe(mailbox, 0, src_,tag_, &match_recv, - static_cast(this)); + smx_activity_t action = simcall_comm_iprobe(mailbox, 0, &match_recv, static_cast(this)); if (action == nullptr) { mailbox = process->mailbox(); XBT_DEBUG("No, nothing in the small mailbox test the other one : %p", mailbox); - action = simcall_comm_iprobe(mailbox, 0, src_,tag_, &match_recv, static_cast(this)); + action = simcall_comm_iprobe(mailbox, 0, &match_recv, static_cast(this)); if (action == nullptr) { XBT_DEBUG("Still nothing, switch back to the small mailbox : %p", mailbox); mailbox = process->mailbox_small(); @@ -360,7 +363,7 @@ void Request::start() } else { mailbox = process->mailbox_small(); XBT_DEBUG("Is there a corresponding send already posted the small mailbox?"); - smx_activity_t action = simcall_comm_iprobe(mailbox, 0, src_,tag_, &match_recv, static_cast(this)); + smx_activity_t action = simcall_comm_iprobe(mailbox, 0, &match_recv, static_cast(this)); if (action == nullptr) { XBT_DEBUG("No, nothing in the permanent receive mailbox"); @@ -372,9 +375,9 @@ void Request::start() // we make a copy here, as the size is modified by simix, and we may reuse the request in another receive later real_size_=size_; - action_ = simcall_comm_irecv(process->process(), mailbox, buf_, &real_size_, &match_recv, - ! process->replaying()? smpi_comm_copy_data_callback - : &smpi_comm_null_copy_buffer_callback, this, -1.0); + action_ = simcall_comm_irecv( + process->process(), mailbox, buf_, &real_size_, &match_recv, + process->replaying() ? &smpi_comm_null_copy_buffer_callback : smpi_comm_copy_data_callback, this, -1.0); XBT_DEBUG("recv simcall posted"); if (async_small_thresh != 0 || (flags_ & RMA) != 0 ) @@ -394,9 +397,9 @@ void Request::start() detached_ = 1; XBT_DEBUG("Send request %p is detached", this); refcount_++; - if(!(old_type_->flags() & DT_FLAG_DERIVED)){ + if (not(old_type_->flags() & DT_FLAG_DERIVED)) { oldbuf = buf_; - if (!process->replaying() && oldbuf != nullptr && size_!=0){ + if (not process->replaying() && oldbuf != nullptr && size_ != 0) { if((smpi_privatize_global_variables != 0) && (static_cast(buf_) >= smpi_start_data_exe) && (static_cast(buf_) < smpi_start_data_exe + smpi_size_data_exe )){ @@ -415,8 +418,8 @@ void Request::start() if (detached_ != 0 || ((flags_ & (ISEND | SSEND)) != 0)) { // issend should be treated as isend // isend and send timings may be different sleeptime = ((flags_ & ISEND) != 0) - ? simgrid::s4u::Actor::self()->host()->extension()->oisend(size_) - : simgrid::s4u::Actor::self()->host()->extension()->osend(size_); + ? simgrid::s4u::Actor::self()->getHost()->extension()->oisend(size_) + : simgrid::s4u::Actor::self()->getHost()->extension()->osend(size_); } if(sleeptime > 0.0){ @@ -431,13 +434,12 @@ void Request::start() if (async_small_thresh != 0 || (flags_ & RMA) != 0) xbt_mutex_acquire(mut); - if (!(async_small_thresh != 0 || (flags_ & RMA) !=0)) { + if (not(async_small_thresh != 0 || (flags_ & RMA) != 0)) { mailbox = process->mailbox(); } else if (((flags_ & RMA) != 0) || static_cast(size_) < async_small_thresh) { // eager mode mailbox = process->mailbox(); XBT_DEBUG("Is there a corresponding recv already posted in the large mailbox %p?", mailbox); - smx_activity_t action = simcall_comm_iprobe(mailbox, 1,dst_, tag_, &match_send, - static_cast(this)); + smx_activity_t action = simcall_comm_iprobe(mailbox, 1, &match_send, static_cast(this)); if (action == nullptr) { if ((flags_ & SSEND) == 0){ mailbox = process->mailbox_small(); @@ -445,7 +447,7 @@ void Request::start() } else { mailbox = process->mailbox_small(); XBT_DEBUG("SSEND : Is there a corresponding recv already posted in the small mailbox %p?", mailbox); - action = simcall_comm_iprobe(mailbox, 1,dst_, tag_, &match_send, static_cast(this)); + action = simcall_comm_iprobe(mailbox, 1, &match_send, static_cast(this)); if (action == nullptr) { XBT_DEBUG("No, we are first, send to large mailbox"); mailbox = process->mailbox(); @@ -461,13 +463,12 @@ void Request::start() // we make a copy here, as the size is modified by simix, and we may reuse the request in another receive later real_size_=size_; - action_ = simcall_comm_isend(SIMIX_process_from_PID(src_+1), mailbox, size_, -1.0, - buf, real_size_, &match_send, - &xbt_free_f, // how to free the userdata if a detached send fails - !process->replaying() ? smpi_comm_copy_data_callback - : &smpi_comm_null_copy_buffer_callback, this, - // detach if msg size < eager/rdv switch limit - detached_); + action_ = simcall_comm_isend( + SIMIX_process_from_PID(src_ + 1), mailbox, size_, -1.0, buf, real_size_, &match_send, + &xbt_free_f, // how to free the userdata if a detached send fails + not process->replaying() ? smpi_comm_copy_data_callback : &smpi_comm_null_copy_buffer_callback, this, + // detach if msg size < eager/rdv switch limit + detached_); XBT_DEBUG("send simcall posted"); /* FIXME: detached sends are not traceable (action_ == nullptr) */ @@ -480,7 +481,7 @@ void Request::start() void Request::startall(int count, MPI_Request * requests) { - if(requests== nullptr) + if(requests== nullptr) return; for(int i = 0; i < count; i++) { @@ -495,7 +496,7 @@ int Request::test(MPI_Request * request, MPI_Status * status) { // because the time will not normally advance when only calls to MPI_Test are made -> deadlock // multiplier to the sleeptime, to increase speed of execution, each failed test will increase it static int nsleeps = 1; - if(smpi_test_sleep > 0) + if(smpi_test_sleep > 0) simcall_process_sleep(nsleeps*smpi_test_sleep); Status::empty(status); @@ -517,24 +518,22 @@ int Request::test(MPI_Request * request, MPI_Status * status) { int Request::testsome(int incount, MPI_Request requests[], int *indices, MPI_Status status[]) { - int i; int count = 0; int count_dead = 0; MPI_Status stat; MPI_Status *pstat = status == MPI_STATUSES_IGNORE ? MPI_STATUS_IGNORE : &stat; - for(i = 0; i < incount; i++) { - if((requests[i] != MPI_REQUEST_NULL)) { - if(test(&requests[i], pstat)) { - indices[i] = 1; - count++; - if(status != MPI_STATUSES_IGNORE) { - status[i] = *pstat; - } - if ((requests[i] != MPI_REQUEST_NULL) && requests[i]->flags_ & NON_PERSISTENT) - requests[i]=MPI_REQUEST_NULL; + for (int i = 0; i < incount; i++) { + if (requests[i] != MPI_REQUEST_NULL) { + if (test(&requests[i], pstat)) { + indices[i] = 1; + count++; + if (status != MPI_STATUSES_IGNORE) + status[i] = *pstat; + if ((requests[i] != MPI_REQUEST_NULL) && (requests[i]->flags_ & NON_PERSISTENT)) + requests[i] = MPI_REQUEST_NULL; } - }else{ + } else { count_dead++; } } @@ -545,7 +544,7 @@ int Request::testsome(int incount, MPI_Request requests[], int *indices, MPI_Sta int Request::testany(int count, MPI_Request requests[], int *index, MPI_Status * status) { - std::vector comms; + std::vector comms; comms.reserve(count); int i; @@ -555,20 +554,20 @@ int Request::testany(int count, MPI_Request requests[], int *index, MPI_Status * std::vector map; /** Maps all matching comms back to their location in requests **/ for(i = 0; i < count; i++) { - if ((requests[i] != MPI_REQUEST_NULL) && requests[i]->action_ && !(requests[i]->flags_ & PREPARED)) { - comms.push_back(requests[i]->action_); - map.push_back(i); + if ((requests[i] != MPI_REQUEST_NULL) && requests[i]->action_ && not(requests[i]->flags_ & PREPARED)) { + comms.push_back(requests[i]->action_); + map.push_back(i); } } - if(!map.empty()) { + if (not map.empty()) { //multiplier to the sleeptime, to increase speed of execution, each failed testany will increase it static int nsleeps = 1; - if(smpi_test_sleep > 0) + if(smpi_test_sleep > 0) simcall_process_sleep(nsleeps*smpi_test_sleep); i = simcall_comm_testany(comms.data(), comms.size()); // The i-th element in comms matches! if (i != -1) { // -1 is not MPI_UNDEFINED but a SIMIX return code. (nothing matches) - *index = map[i]; + *index = map[i]; finish_wait(&requests[*index],status); flag = 1; nsleeps = 1; @@ -593,7 +592,7 @@ int Request::testall(int count, MPI_Request requests[], MPI_Status status[]) MPI_Status *pstat = status == MPI_STATUSES_IGNORE ? MPI_STATUS_IGNORE : &stat; int flag=1; for(int i=0; iflags_ & PREPARED)) { + if (requests[i] != MPI_REQUEST_NULL && not(requests[i]->flags_ & PREPARED)) { if (test(&requests[i], pstat)!=1){ flag=0; }else{ @@ -625,7 +624,7 @@ void Request::iprobe(int source, int tag, MPI_Comm comm, int* flag, MPI_Status* // nsleeps is a multiplier to the sleeptime, to increase speed of execution, each failed iprobe will increase it // This can speed up the execution of certain applications by an order of magnitude, such as HPL static int nsleeps = 1; - double speed = simgrid::s4u::Actor::self()->host()->speed(); + double speed = simgrid::s4u::Actor::self()->getHost()->getSpeed(); double maxrate = xbt_cfg_get_double("smpi/iprobe-cpu-usage"); MPI_Request request = new Request(nullptr, 0, MPI_CHAR, source == MPI_ANY_SOURCE ? MPI_ANY_SOURCE : comm->group()->index(source), comm->rank(), tag, comm, PERSISTENT | RECV); @@ -641,19 +640,18 @@ void Request::iprobe(int source, int tag, MPI_Comm comm, int* flag, MPI_Status* if (xbt_cfg_get_int("smpi/async-small-thresh") > 0){ mailbox = smpi_process()->mailbox_small(); XBT_DEBUG("Trying to probe the perm recv mailbox"); - request->action_ = simcall_comm_iprobe(mailbox, 0, request->src_, request->tag_, &match_recv, - static_cast(request)); + request->action_ = simcall_comm_iprobe(mailbox, 0, &match_recv, static_cast(request)); } if (request->action_ == nullptr){ mailbox = smpi_process()->mailbox(); XBT_DEBUG("trying to probe the other mailbox"); - request->action_ = simcall_comm_iprobe(mailbox, 0, request->src_,request->tag_, &match_recv, - static_cast(request)); + request->action_ = simcall_comm_iprobe(mailbox, 0, &match_recv, static_cast(request)); } if (request->action_ != nullptr){ - simgrid::kernel::activity::Comm *sync_comm = static_cast(request->action_); + simgrid::kernel::activity::CommImplPtr sync_comm = + boost::static_pointer_cast(request->action_); MPI_Request req = static_cast(sync_comm->src_data); *flag = 1; if(status != MPI_STATUS_IGNORE && (req->flags_ & PREPARED) == 0) { @@ -677,7 +675,7 @@ void Request::finish_wait(MPI_Request* request, MPI_Status * status) MPI_Request req = *request; Status::empty(status); - if(!((req->detached_ != 0) && ((req->flags_ & SEND) != 0)) && ((req->flags_ & PREPARED) == 0)){ + if (not((req->detached_ != 0) && ((req->flags_ & SEND) != 0)) && ((req->flags_ & PREPARED) == 0)) { if(status != MPI_STATUS_IGNORE) { int src = req->src_ == MPI_ANY_SOURCE ? req->real_src_ : req->src_; status->MPI_SOURCE = req->comm_->group()->rank(src); @@ -691,9 +689,10 @@ void Request::finish_wait(MPI_Request* request, MPI_Status * status) MPI_Datatype datatype = req->old_type_; // FIXME Handle the case of a partial shared malloc. - if((((req->flags_ & ACCUMULATE) != 0) || (datatype->flags() & DT_FLAG_DERIVED))){// && (!smpi_is_shared(req->old_buf_))){ + if (((req->flags_ & ACCUMULATE) != 0) || + (datatype->flags() & DT_FLAG_DERIVED)) { // && (not smpi_is_shared(req->old_buf_))){ - if (!smpi_process()->replaying()){ + if (not smpi_process()->replaying()) { if( smpi_privatize_global_variables != 0 && (static_cast(req->old_buf_) >= smpi_start_data_exe) && ((char*)req->old_buf_ < smpi_start_data_exe + smpi_size_data_exe )){ XBT_VERB("Privatization : We are unserializing to a zone in global memory Switch data segment "); @@ -723,7 +722,8 @@ void Request::finish_wait(MPI_Request* request, MPI_Status * status) } if(req->detached_sender_ != nullptr){ //integrate pseudo-timing for buffering of small messages, do not bother to execute the simcall if 0 - double sleeptime = simgrid::s4u::Actor::self()->host()->extension()->orecv(req->real_size()); + double sleeptime = + simgrid::s4u::Actor::self()->getHost()->extension()->orecv(req->real_size()); if(sleeptime > 0.0){ simcall_process_sleep(sleeptime); XBT_DEBUG("receiving size of %zu : sleep %f ", req->real_size_, sleeptime); @@ -756,21 +756,23 @@ void Request::wait(MPI_Request * request, MPI_Status * status) int Request::waitany(int count, MPI_Request requests[], MPI_Status * status) { s_xbt_dynar_t comms; // Keep it on stack to save some extra mallocs - int i; - int size = 0; int index = MPI_UNDEFINED; - int *map; if(count > 0) { + int size = 0; // Wait for a request to complete - xbt_dynar_init(&comms, sizeof(smx_activity_t), nullptr); - map = xbt_new(int, count); + xbt_dynar_init(&comms, sizeof(smx_activity_t), [](void*ptr){ + intrusive_ptr_release(*(simgrid::kernel::activity::ActivityImpl**)ptr); + }); + int *map = xbt_new(int, count); XBT_DEBUG("Wait for one of %d", count); - for(i = 0; i < count; i++) { - if (requests[i] != MPI_REQUEST_NULL && !(requests[i]->flags_ & PREPARED) && !(requests[i]->flags_ & FINISHED)) { + for(int i = 0; i < count; i++) { + if (requests[i] != MPI_REQUEST_NULL && not(requests[i]->flags_ & PREPARED) && + not(requests[i]->flags_ & FINISHED)) { if (requests[i]->action_ != nullptr) { XBT_DEBUG("Waiting any %p ", requests[i]); - xbt_dynar_push(&comms, &requests[i]->action_); + intrusive_ptr_add_ref(requests[i]->action_.get()); + xbt_dynar_push_as(&comms, simgrid::kernel::activity::ActivityImpl*, requests[i]->action_.get()); map[size] = i; size++; } else { @@ -784,15 +786,16 @@ int Request::waitany(int count, MPI_Request requests[], MPI_Status * status) } } } - if(size > 0) { - i = simcall_comm_waitany(&comms, -1); + if (size > 0) { + XBT_DEBUG("Enter waitany for %lu comms", xbt_dynar_length(&comms)); + int i = simcall_comm_waitany(&comms, -1); // not MPI_UNDEFINED, as this is a simix return code if (i != -1) { index = map[i]; //in case of an accumulate, we have to wait the end of all requests to apply the operation, ordered correctly. - if ((requests[index] == MPI_REQUEST_NULL) - || (!((requests[index]->flags_ & ACCUMULATE) && (requests[index]->flags_ & RECV)))){ + if ((requests[index] == MPI_REQUEST_NULL) || + (not((requests[index]->flags_ & ACCUMULATE) && (requests[index]->flags_ & RECV)))) { finish_wait(&requests[index],status); if (requests[i] != MPI_REQUEST_NULL && (requests[i]->flags_ & NON_PERSISTENT)) requests[index] = MPI_REQUEST_NULL; @@ -820,7 +823,7 @@ int Request::waitall(int count, MPI_Request requests[], MPI_Status status[]) std::vector accumulates; int index; MPI_Status stat; - MPI_Status *pstat = status == MPI_STATUSES_IGNORE ? MPI_STATUS_IGNORE : &stat; + MPI_Status *pstat = (status == MPI_STATUSES_IGNORE ? MPI_STATUS_IGNORE : &stat); int retvalue = MPI_SUCCESS; //tag invalid requests in the set if (status != MPI_STATUSES_IGNORE) { @@ -838,7 +841,7 @@ int Request::waitall(int count, MPI_Request requests[], MPI_Status status[]) wait(&requests[c],pstat); index = c; } else { - index = waitany(count, requests, pstat); + index = waitany(count, (MPI_Request*)requests, pstat); if (index == MPI_UNDEFINED) break; @@ -856,7 +859,7 @@ int Request::waitall(int count, MPI_Request requests[], MPI_Status status[]) } } - if (!accumulates.empty()) { + if (not accumulates.empty()) { std::sort(accumulates.begin(), accumulates.end(), sort_accumulates); for (auto req : accumulates) { finish_wait(&req, status); @@ -868,15 +871,12 @@ int Request::waitall(int count, MPI_Request requests[], MPI_Status status[]) int Request::waitsome(int incount, MPI_Request requests[], int *indices, MPI_Status status[]) { - int i; int count = 0; - int index; MPI_Status stat; MPI_Status *pstat = status == MPI_STATUSES_IGNORE ? MPI_STATUS_IGNORE : &stat; - for(i = 0; i < incount; i++) - { - index=waitany(incount, requests, pstat); + for (int i = 0; i < incount; i++) { + int index = waitany(incount, requests, pstat); if(index!=MPI_UNDEFINED){ indices[count] = index; count++; @@ -910,9 +910,10 @@ int Request::add_f() { } void Request::free_f(int id) { - char key[KEY_SIZE]; - if(id!=MPI_FORTRAN_REQUEST_NULL) + if (id != MPI_FORTRAN_REQUEST_NULL) { + char key[KEY_SIZE]; xbt_dict_remove(F2C::f2c_lookup(), get_key_id(key, id)); + } } }