X-Git-Url: http://info.iut-bm.univ-fcomte.fr/pub/gitweb/simgrid.git/blobdiff_plain/9c9f17b4ac25fe6fcf026724a7d3cc4978b775ed..43368eb677c67eac96bffc546feba6497ad775ec:/src/smpi/smpi_request.cpp diff --git a/src/smpi/smpi_request.cpp b/src/smpi/smpi_request.cpp index aa1bf22d07..1838d94d1a 100644 --- a/src/smpi/smpi_request.cpp +++ b/src/smpi/smpi_request.cpp @@ -3,16 +3,18 @@ /* This program is free software; you can redistribute it and/or modify it * under the terms of the license (GNU LGPL) which comes with this package. */ -#include -#include -#include "private.h" #include "mc/mc.h" #include "src/mc/mc_replay.h" -#include "src/simix/smx_private.h" -#include "simgrid/sg_config.h" -#include "smpi/smpi_utils.hpp" -#include +#include "src/smpi/SmpiHost.hpp" #include "src/kernel/activity/SynchroComm.hpp" +#include "src/smpi/private.h" +#include "src/smpi/smpi_comm.hpp" +#include "src/smpi/smpi_datatype.hpp" +#include "src/smpi/smpi_op.hpp" +#include "src/smpi/smpi_process.hpp" +#include "src/smpi/smpi_request.hpp" + +#include XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_request, smpi, "Logging specific to SMPI (reques)"); @@ -21,99 +23,27 @@ static simgrid::config::Flag smpi_iprobe_sleep( static simgrid::config::Flag smpi_test_sleep( "smpi/test", "Minimum time to inject inside a call to MPI_Test", 1e-4); -std::vector smpi_os_values; -std::vector smpi_or_values; std::vector smpi_ois_values; extern void (*smpi_comm_copy_data_callback) (smx_activity_t, void*, size_t); -static double smpi_os(size_t size) -{ - if (smpi_os_values.empty()) { - smpi_os_values = parse_factor(xbt_cfg_get_string("smpi/os")); - } - double current=smpi_os_values.empty()?0.0:smpi_os_values[0].values[0]+smpi_os_values[0].values[1]*size; - // Iterate over all the sections that were specified and find the right - // value. (fact.factor represents the interval sizes; we want to find the - // section that has fact.factor <= size and no other such fact.factor <= size) - // Note: parse_factor() (used before) already sorts the vector we iterate over! - for (auto& fact : smpi_os_values) { - if (size <= fact.factor) { // Values already too large, use the previously computed value of current! - XBT_DEBUG("os : %zu <= %zu return %.10f", size, fact.factor, current); - return current; - }else{ - // If the next section is too large, the current section must be used. - // Hence, save the cost, as we might have to use it. - current = fact.values[0]+fact.values[1]*size; - } - } - XBT_DEBUG("Searching for smpi/os: %zu is larger than the largest boundary, return %.10f", size, current); - - return current; -} - -static double smpi_ois(size_t size) -{ - if (smpi_ois_values.empty()) { - smpi_ois_values = parse_factor(xbt_cfg_get_string("smpi/ois")); - } - double current=smpi_ois_values.empty()?0.0:smpi_ois_values[0].values[0]+smpi_ois_values[0].values[1]*size; - // Iterate over all the sections that were specified and find the right value. (fact.factor represents the interval - // sizes; we want to find the section that has fact.factor <= size and no other such fact.factor <= size) - // Note: parse_factor() (used before) already sorts the vector we iterate over! - for (auto& fact : smpi_ois_values) { - if (size <= fact.factor) { // Values already too large, use the previously computed value of current! - XBT_DEBUG("ois : %zu <= %zu return %.10f", size, fact.factor, current); - return current; - }else{ - // If the next section is too large, the current section must be used. - // Hence, save the cost, as we might have to use it. - current = fact.values[0]+fact.values[1]*size; - } - } - XBT_DEBUG("Searching for smpi/ois: %zu is larger than the largest boundary, return %.10f", size, current); - - return current; -} - -static double smpi_or(size_t size) -{ - if (smpi_or_values.empty()) { - smpi_or_values = parse_factor(xbt_cfg_get_string("smpi/or")); - } - - double current=smpi_or_values.empty()?0.0:smpi_or_values.front().values[0]+smpi_or_values.front().values[1]*size; - - // Iterate over all the sections that were specified and find the right value. (fact.factor represents the interval - // sizes; we want to find the section that has fact.factor <= size and no other such fact.factor <= size) - // Note: parse_factor() (used before) already sorts the vector we iterate over! - for (auto fact : smpi_or_values) { - if (size <= fact.factor) { // Values already too large, use the previously computed value of current! - XBT_DEBUG("or : %zu <= %zu return %.10f", size, fact.factor, current); - return current; - } else { - // If the next section is too large, the current section must be used. - // Hence, save the cost, as we might have to use it. - current=fact.values[0]+fact.values[1]*size; - } - } - XBT_DEBUG("smpi_or: %zu is larger than largest boundary, return %.10f", size, current); - - return current; -} - namespace simgrid{ namespace smpi{ Request::Request(void *buf, int count, MPI_Datatype datatype, int src, int dst, int tag, MPI_Comm comm, unsigned flags) : buf_(buf), old_type_(datatype), src_(src), dst_(dst), tag_(tag), comm_(comm), flags_(flags) { void *old_buf = nullptr; - if((((flags & RECV) != 0) && ((flags & ACCUMULATE) !=0)) || (datatype->flags() & DT_FLAG_DERIVED)){ +// FIXME Handle the case of a partial shared malloc. + if ((((flags & RECV) != 0) && ((flags & ACCUMULATE) != 0)) || (datatype->flags() & DT_FLAG_DERIVED)) { // This part handles the problem of non-contiguous memory old_buf = buf; - buf_ = count==0 ? nullptr : xbt_malloc(count*datatype->size()); - if ((datatype->flags() & DT_FLAG_DERIVED) && ((flags & SEND) != 0)) { - datatype->serialize(old_buf, buf_, count); + if (count==0){ + buf_ = nullptr; + }else { + buf_ = xbt_malloc(count*datatype->size()); + if ((datatype->flags() & DT_FLAG_DERIVED) && ((flags & SEND) != 0)) { + datatype->serialize(old_buf, buf_, count); + } } } // This part handles the problem of non-contiguous memory (for the unserialisation at the reception) @@ -398,42 +328,44 @@ void Request::start() if ((flags_ & RECV) != 0) { this->print_request("New recv"); + simgrid::smpi::Process* process = smpi_process_remote(dst_); + int async_small_thresh = xbt_cfg_get_int("smpi/async-small-thresh"); - xbt_mutex_t mut = smpi_process()->mailboxes_mutex(); + xbt_mutex_t mut = process->mailboxes_mutex(); if (async_small_thresh != 0 || (flags_ & RMA) != 0) xbt_mutex_acquire(mut); if (async_small_thresh == 0 && (flags_ & RMA) == 0 ) { - mailbox = smpi_process()->mailbox(); + mailbox = process->mailbox(); } else if (((flags_ & RMA) != 0) || static_cast(size_) < async_small_thresh) { //We have to check both mailboxes (because SSEND messages are sent to the large mbox). //begin with the more appropriate one : the small one. - mailbox = smpi_process()->mailbox_small(); + mailbox = process->mailbox_small(); XBT_DEBUG("Is there a corresponding send already posted in the small mailbox %p (in case of SSEND)?", mailbox); smx_activity_t action = simcall_comm_iprobe(mailbox, 0, src_,tag_, &match_recv, static_cast(this)); if (action == nullptr) { - mailbox = smpi_process()->mailbox(); + mailbox = process->mailbox(); XBT_DEBUG("No, nothing in the small mailbox test the other one : %p", mailbox); action = simcall_comm_iprobe(mailbox, 0, src_,tag_, &match_recv, static_cast(this)); if (action == nullptr) { XBT_DEBUG("Still nothing, switch back to the small mailbox : %p", mailbox); - mailbox = smpi_process()->mailbox_small(); + mailbox = process->mailbox_small(); } } else { XBT_DEBUG("yes there was something for us in the large mailbox"); } } else { - mailbox = smpi_process()->mailbox_small(); + mailbox = process->mailbox_small(); XBT_DEBUG("Is there a corresponding send already posted the small mailbox?"); smx_activity_t action = simcall_comm_iprobe(mailbox, 0, src_,tag_, &match_recv, static_cast(this)); if (action == nullptr) { XBT_DEBUG("No, nothing in the permanent receive mailbox"); - mailbox = smpi_process()->mailbox(); + mailbox = process->mailbox(); } else { XBT_DEBUG("yes there was something for us in the small mailbox"); } @@ -441,19 +373,18 @@ void Request::start() // we make a copy here, as the size is modified by simix, and we may reuse the request in another receive later real_size_=size_; - action_ = simcall_comm_irecv(SIMIX_process_self(), mailbox, buf_, &real_size_, &match_recv, - ! smpi_process()->replaying()? smpi_comm_copy_data_callback + action_ = simcall_comm_irecv(process->process(), mailbox, buf_, &real_size_, &match_recv, + ! process->replaying()? smpi_comm_copy_data_callback : &smpi_comm_null_copy_buffer_callback, this, -1.0); XBT_DEBUG("recv simcall posted"); if (async_small_thresh != 0 || (flags_ & RMA) != 0 ) xbt_mutex_release(mut); } else { /* the RECV flag was not set, so this is a send */ - int receiver = dst_; - + simgrid::smpi::Process* process = smpi_process_remote(dst_); int rank = src_; if (TRACE_smpi_view_internals()) { - TRACE_smpi_send(rank, rank, receiver, tag_, size_); + TRACE_smpi_send(rank, rank, dst_, tag_, size_); } this->print_request("New send"); @@ -466,7 +397,7 @@ void Request::start() refcount_++; if(!(old_type_->flags() & DT_FLAG_DERIVED)){ oldbuf = buf_; - if (!smpi_process()->replaying() && oldbuf != nullptr && size_!=0){ + if (!process->replaying() && oldbuf != nullptr && size_!=0){ if((smpi_privatize_global_variables != 0) && (static_cast(buf_) >= smpi_start_data_exe) && (static_cast(buf_) < smpi_start_data_exe + smpi_size_data_exe )){ @@ -482,9 +413,11 @@ void Request::start() //if we are giving back the control to the user without waiting for completion, we have to inject timings double sleeptime = 0.0; - if(detached_ != 0 || ((flags_ & (ISEND|SSEND)) != 0)){// issend should be treated as isend - //isend and send timings may be different - sleeptime = ((flags_ & ISEND) != 0) ? smpi_ois(size_) : smpi_os(size_); + if (detached_ != 0 || ((flags_ & (ISEND | SSEND)) != 0)) { // issend should be treated as isend + // isend and send timings may be different + sleeptime = ((flags_ & ISEND) != 0) + ? simgrid::s4u::Actor::self()->host()->extension()->oisend(size_) + : simgrid::s4u::Actor::self()->host()->extension()->osend(size_); } if(sleeptime > 0.0){ @@ -494,36 +427,36 @@ void Request::start() int async_small_thresh = xbt_cfg_get_int("smpi/async-small-thresh"); - xbt_mutex_t mut=smpi_process_remote(receiver)->mailboxes_mutex(); + xbt_mutex_t mut=process->mailboxes_mutex(); if (async_small_thresh != 0 || (flags_ & RMA) != 0) xbt_mutex_acquire(mut); if (!(async_small_thresh != 0 || (flags_ & RMA) !=0)) { - mailbox = smpi_process_remote(receiver)->mailbox(); + mailbox = process->mailbox(); } else if (((flags_ & RMA) != 0) || static_cast(size_) < async_small_thresh) { // eager mode - mailbox = smpi_process_remote(receiver)->mailbox(); + mailbox = process->mailbox(); XBT_DEBUG("Is there a corresponding recv already posted in the large mailbox %p?", mailbox); smx_activity_t action = simcall_comm_iprobe(mailbox, 1,dst_, tag_, &match_send, static_cast(this)); if (action == nullptr) { if ((flags_ & SSEND) == 0){ - mailbox = smpi_process_remote(receiver)->mailbox_small(); + mailbox = process->mailbox_small(); XBT_DEBUG("No, nothing in the large mailbox, message is to be sent on the small one %p", mailbox); } else { - mailbox = smpi_process_remote(receiver)->mailbox_small(); + mailbox = process->mailbox_small(); XBT_DEBUG("SSEND : Is there a corresponding recv already posted in the small mailbox %p?", mailbox); action = simcall_comm_iprobe(mailbox, 1,dst_, tag_, &match_send, static_cast(this)); if (action == nullptr) { XBT_DEBUG("No, we are first, send to large mailbox"); - mailbox = smpi_process_remote(receiver)->mailbox(); + mailbox = process->mailbox(); } } } else { XBT_DEBUG("Yes there was something for us in the large mailbox"); } } else { - mailbox = smpi_process_remote(receiver)->mailbox(); + mailbox = process->mailbox(); XBT_DEBUG("Send request %p is in the large mailbox %p (buf: %p)",mailbox, this,buf_); } @@ -532,7 +465,7 @@ void Request::start() action_ = simcall_comm_isend(SIMIX_process_from_PID(src_+1), mailbox, size_, -1.0, buf, real_size_, &match_send, &xbt_free_f, // how to free the userdata if a detached send fails - !smpi_process()->replaying() ? smpi_comm_copy_data_callback + !process->replaying() ? smpi_comm_copy_data_callback : &smpi_comm_null_copy_buffer_callback, this, // detach if msg size < eager/rdv switch limit detached_); @@ -585,24 +518,22 @@ int Request::test(MPI_Request * request, MPI_Status * status) { int Request::testsome(int incount, MPI_Request requests[], int *indices, MPI_Status status[]) { - int i; int count = 0; int count_dead = 0; MPI_Status stat; MPI_Status *pstat = status == MPI_STATUSES_IGNORE ? MPI_STATUS_IGNORE : &stat; - for(i = 0; i < incount; i++) { - if((requests[i] != MPI_REQUEST_NULL)) { - if(test(&requests[i], pstat)) { - indices[i] = 1; - count++; - if(status != MPI_STATUSES_IGNORE) { - status[i] = *pstat; - } - if ((requests[i] != MPI_REQUEST_NULL) && requests[i]->flags_ & NON_PERSISTENT) - requests[i]=MPI_REQUEST_NULL; + for (int i = 0; i < incount; i++) { + if (requests[i] != MPI_REQUEST_NULL) { + if (test(&requests[i], pstat)) { + indices[i] = 1; + count++; + if (status != MPI_STATUSES_IGNORE) + status[i] = *pstat; + if ((requests[i] != MPI_REQUEST_NULL) && requests[i]->flags_ & NON_PERSISTENT) + requests[i] = MPI_REQUEST_NULL; } - }else{ + } else { count_dead++; } } @@ -758,7 +689,10 @@ void Request::finish_wait(MPI_Request* request, MPI_Status * status) req->print_request("Finishing"); MPI_Datatype datatype = req->old_type_; - if(((req->flags_ & ACCUMULATE) != 0) || (datatype->flags() & DT_FLAG_DERIVED)){ +// FIXME Handle the case of a partial shared malloc. + if (((req->flags_ & ACCUMULATE) != 0) || + (datatype->flags() & DT_FLAG_DERIVED)) { // && (!smpi_is_shared(req->old_buf_))){ + if (!smpi_process()->replaying()){ if( smpi_privatize_global_variables != 0 && (static_cast(req->old_buf_) >= smpi_start_data_exe) && ((char*)req->old_buf_ < smpi_start_data_exe + smpi_size_data_exe )){ @@ -773,8 +707,10 @@ void Request::finish_wait(MPI_Request* request, MPI_Status * status) datatype->unserialize(req->buf_, req->old_buf_, req->real_size_/datatype->size() , req->op_); xbt_free(req->buf_); }else if(req->flags_ & RECV){//apply op on contiguous buffer for accumulate - int n =req->real_size_/datatype->size(); - req->op_->apply(req->buf_, req->old_buf_, &n, datatype); + if(datatype->size()!=0){ + int n =req->real_size_/datatype->size(); + req->op_->apply(req->buf_, req->old_buf_, &n, datatype); + } xbt_free(req->buf_); } } @@ -787,7 +723,7 @@ void Request::finish_wait(MPI_Request* request, MPI_Status * status) } if(req->detached_sender_ != nullptr){ //integrate pseudo-timing for buffering of small messages, do not bother to execute the simcall if 0 - double sleeptime = smpi_or(req->real_size_); + double sleeptime = simgrid::s4u::Actor::self()->host()->extension()->orecv(req->real_size()); if(sleeptime > 0.0){ simcall_process_sleep(sleeptime); XBT_DEBUG("receiving size of %zu : sleep %f ", req->real_size_, sleeptime);