X-Git-Url: http://info.iut-bm.univ-fcomte.fr/pub/gitweb/simgrid.git/blobdiff_plain/6a908b79ea45f85f305620c09375b72483b7eee9..039807fd1fb02afb72365fe19a6ad620d1bbf4c7:/src/smpi/mpi/smpi_request.cpp diff --git a/src/smpi/mpi/smpi_request.cpp b/src/smpi/mpi/smpi_request.cpp index 6b58173182..86130def39 100644 --- a/src/smpi/mpi/smpi_request.cpp +++ b/src/smpi/mpi/smpi_request.cpp @@ -69,7 +69,6 @@ Request::Request(const void* buf, int count, MPI_Datatype datatype, aid_t src, a refcount_ = 1; else refcount_ = 0; - message_id_ = 0; init_buffer(count); this->add_f(); } @@ -181,15 +180,16 @@ void Request::init_buffer(int count){ bool Request::match_recv(void* a, void* b, simgrid::kernel::activity::CommImpl*) { - auto ref = static_cast(a); - auto req = static_cast(b); + auto* ref = static_cast(a); + auto* req = static_cast(b); bool match = match_common(req, req, ref); if (not match || ref->comm_ == MPI_COMM_UNINITIALIZED || ref->comm_->is_smp_comm()) return match; - - if (ref->comm_->get_received_messages_count(ref->comm_->group()->rank(req->src_), - ref->comm_->group()->rank(req->dst_), req->tag_) == req->message_id_) { + auto it = std::find(req->message_id_.begin(), req->message_id_.end(), ref->comm_->get_received_messages_count(ref->comm_->group()->rank(req->src_), + ref->comm_->group()->rank(req->dst_), req->tag_)); + if (it != req->message_id_.end()) { if (((ref->flags_ & MPI_REQ_PROBE) == 0) && ((req->flags_ & MPI_REQ_PROBE) == 0)) { + req->message_id_.erase(it); XBT_DEBUG("increasing count in comm %p, which was %u from pid %ld, to pid %ld with tag %d", ref->comm_, ref->comm_->get_received_messages_count(ref->comm_->group()->rank(req->src_), ref->comm_->group()->rank(req->dst_), req->tag_), @@ -204,20 +204,20 @@ bool Request::match_recv(void* a, void* b, simgrid::kernel::activity::CommImpl*) match = false; req->flags_ &= ~MPI_REQ_MATCHED; ref->detached_sender_ = nullptr; - XBT_DEBUG("Refusing to match message, as its ID is not the one I expect. in comm %p, %u != %u, " + XBT_DEBUG("Refusing to match message, as its ID is not the one I expect. in comm %p, %u, " "from pid %ld to pid %ld, with tag %d", ref->comm_, ref->comm_->get_received_messages_count(ref->comm_->group()->rank(req->src_), ref->comm_->group()->rank(req->dst_), req->tag_), - req->message_id_, req->src_, req->dst_, req->tag_); + req->src_, req->dst_, req->tag_); } return match; } bool Request::match_send(void* a, void* b, simgrid::kernel::activity::CommImpl*) { - auto ref = static_cast(a); - auto req = static_cast(b); + auto* ref = static_cast(a); + auto* req = static_cast(b); return match_common(req, ref, req); } @@ -323,9 +323,9 @@ MPI_Request Request::irecv_init(void *buf, int count, MPI_Datatype datatype, int MPI_Request Request::ibsend(const void *buf, int count, MPI_Datatype datatype, int dst, int tag, MPI_Comm comm) { - auto request = new Request(buf == MPI_BOTTOM ? nullptr : buf, count, datatype, simgrid::s4u::this_actor::get_pid(), - dst != MPI_PROC_NULL ? comm->group()->actor(dst) : MPI_PROC_NULL, tag, comm, - MPI_REQ_NON_PERSISTENT | MPI_REQ_ISEND | MPI_REQ_SEND | MPI_REQ_BSEND); + auto* request = new Request(buf == MPI_BOTTOM ? nullptr : buf, count, datatype, simgrid::s4u::this_actor::get_pid(), + dst != MPI_PROC_NULL ? comm->group()->actor(dst) : MPI_PROC_NULL, tag, comm, + MPI_REQ_NON_PERSISTENT | MPI_REQ_ISEND | MPI_REQ_SEND | MPI_REQ_BSEND); if(dst != MPI_PROC_NULL) request->start(); return request; @@ -333,9 +333,9 @@ MPI_Request Request::ibsend(const void *buf, int count, MPI_Datatype datatype, i MPI_Request Request::isend(const void *buf, int count, MPI_Datatype datatype, int dst, int tag, MPI_Comm comm) { - auto request = new Request(buf == MPI_BOTTOM ? nullptr : buf, count, datatype, simgrid::s4u::this_actor::get_pid(), - dst != MPI_PROC_NULL ? comm->group()->actor(dst) : MPI_PROC_NULL, tag, comm, - MPI_REQ_NON_PERSISTENT | MPI_REQ_ISEND | MPI_REQ_SEND); + auto* request = new Request(buf == MPI_BOTTOM ? nullptr : buf, count, datatype, simgrid::s4u::this_actor::get_pid(), + dst != MPI_PROC_NULL ? comm->group()->actor(dst) : MPI_PROC_NULL, tag, comm, + MPI_REQ_NON_PERSISTENT | MPI_REQ_ISEND | MPI_REQ_SEND); if(dst != MPI_PROC_NULL) request->start(); return request; @@ -343,9 +343,9 @@ MPI_Request Request::isend(const void *buf, int count, MPI_Datatype datatype, in MPI_Request Request::issend(const void *buf, int count, MPI_Datatype datatype, int dst, int tag, MPI_Comm comm) { - auto request = new Request(buf == MPI_BOTTOM ? nullptr : buf, count, datatype, simgrid::s4u::this_actor::get_pid(), - dst != MPI_PROC_NULL ? comm->group()->actor(dst) : MPI_PROC_NULL, tag, comm, - MPI_REQ_NON_PERSISTENT | MPI_REQ_ISEND | MPI_REQ_SSEND | MPI_REQ_SEND); + auto* request = new Request(buf == MPI_BOTTOM ? nullptr : buf, count, datatype, simgrid::s4u::this_actor::get_pid(), + dst != MPI_PROC_NULL ? comm->group()->actor(dst) : MPI_PROC_NULL, tag, comm, + MPI_REQ_NON_PERSISTENT | MPI_REQ_ISEND | MPI_REQ_SSEND | MPI_REQ_SEND); if(dst != MPI_PROC_NULL) request->start(); return request; @@ -358,8 +358,8 @@ MPI_Request Request::irecv(void *buf, int count, MPI_Datatype datatype, int src, source = MPI_ANY_SOURCE; else if (src != MPI_PROC_NULL) source = comm->group()->actor(src); - auto request = new Request(buf == MPI_BOTTOM ? nullptr : buf, count, datatype, source, - simgrid::s4u::this_actor::get_pid(), tag, comm, MPI_REQ_NON_PERSISTENT | MPI_REQ_RECV); + auto* request = new Request(buf == MPI_BOTTOM ? nullptr : buf, count, datatype, source, + simgrid::s4u::this_actor::get_pid(), tag, comm, MPI_REQ_NON_PERSISTENT | MPI_REQ_RECV); if(src != MPI_PROC_NULL) request->start(); return request; @@ -374,9 +374,9 @@ int Request::recv(void *buf, int count, MPI_Datatype datatype, int src, int tag, void Request::bsend(const void *buf, int count, MPI_Datatype datatype, int dst, int tag, MPI_Comm comm) { - auto request = new Request(buf == MPI_BOTTOM ? nullptr : buf, count, datatype, simgrid::s4u::this_actor::get_pid(), - dst != MPI_PROC_NULL ? comm->group()->actor(dst) : MPI_PROC_NULL, tag, comm, - MPI_REQ_NON_PERSISTENT | MPI_REQ_SEND | MPI_REQ_BSEND); + auto* request = new Request(buf == MPI_BOTTOM ? nullptr : buf, count, datatype, simgrid::s4u::this_actor::get_pid(), + dst != MPI_PROC_NULL ? comm->group()->actor(dst) : MPI_PROC_NULL, tag, comm, + MPI_REQ_NON_PERSISTENT | MPI_REQ_SEND | MPI_REQ_BSEND); if(dst != MPI_PROC_NULL) request->start(); @@ -385,9 +385,9 @@ void Request::bsend(const void *buf, int count, MPI_Datatype datatype, int dst, void Request::send(const void *buf, int count, MPI_Datatype datatype, int dst, int tag, MPI_Comm comm) { - auto request = new Request(buf == MPI_BOTTOM ? nullptr : buf, count, datatype, simgrid::s4u::this_actor::get_pid(), - dst != MPI_PROC_NULL ? comm->group()->actor(dst) : MPI_PROC_NULL, tag, comm, - MPI_REQ_NON_PERSISTENT | MPI_REQ_SEND); + auto* request = new Request(buf == MPI_BOTTOM ? nullptr : buf, count, datatype, simgrid::s4u::this_actor::get_pid(), + dst != MPI_PROC_NULL ? comm->group()->actor(dst) : MPI_PROC_NULL, tag, comm, + MPI_REQ_NON_PERSISTENT | MPI_REQ_SEND); if(dst != MPI_PROC_NULL) request->start(); wait(&request, MPI_STATUS_IGNORE); @@ -395,9 +395,9 @@ void Request::send(const void *buf, int count, MPI_Datatype datatype, int dst, i void Request::ssend(const void *buf, int count, MPI_Datatype datatype, int dst, int tag, MPI_Comm comm) { - auto request = new Request(buf == MPI_BOTTOM ? nullptr : buf, count, datatype, simgrid::s4u::this_actor::get_pid(), - dst != MPI_PROC_NULL ? comm->group()->actor(dst) : MPI_PROC_NULL, tag, comm, - MPI_REQ_NON_PERSISTENT | MPI_REQ_SSEND | MPI_REQ_SEND); + auto* request = new Request(buf == MPI_BOTTOM ? nullptr : buf, count, datatype, simgrid::s4u::this_actor::get_pid(), + dst != MPI_PROC_NULL ? comm->group()->actor(dst) : MPI_PROC_NULL, tag, comm, + MPI_REQ_NON_PERSISTENT | MPI_REQ_SSEND | MPI_REQ_SEND); if(dst != MPI_PROC_NULL) request->start(); @@ -510,7 +510,8 @@ void Request::start() process->replaying() ? &smpi_comm_null_copy_buffer_callback : smpi_comm_copy_data_callback, this, - -1.0}; + -1.0, + process->call_location()->get_call_location()}; observer.set_tag(tag_); action_ = kernel::actor::simcall_answered([&observer] { return kernel::activity::CommImpl::irecv(&observer); }, @@ -518,13 +519,13 @@ void Request::start() XBT_DEBUG("recv simcall posted"); } else { /* the RECV flag was not set, so this is a send */ - const simgrid::smpi::ActorExt* process = smpi_process_remote(simgrid::s4u::Actor::by_pid(dst_)); + simgrid::smpi::ActorExt* process = smpi_process_remote(simgrid::s4u::Actor::by_pid(dst_)); xbt_assert(process, "Actor pid=%ld is gone??", dst_); if (TRACE_smpi_view_internals()) TRACE_smpi_send(src_, src_, dst_, tag_, size_); this->print_request("New send"); - message_id_=comm_->get_sent_messages_count(comm_->group()->rank(src_), comm_->group()->rank(dst_), tag_); + message_id_.push_back(comm_->get_sent_messages_count(comm_->group()->rank(src_), comm_->group()->rank(dst_), tag_)); comm_->increment_sent_messages_count(comm_->group()->rank(src_), comm_->group()->rank(dst_), tag_); void* buf = buf_; @@ -609,7 +610,7 @@ void Request::start() &xbt_free_f, // how to free the userdata if a detached send fails process->replaying() ? &smpi_comm_null_copy_buffer_callback : smpi_comm_copy_data_callback, this, // detach if msg size < eager/rdv switch limit - detached_}; + detached_, process->call_location()->get_call_location()}; observer.set_tag(tag_); action_ = kernel::actor::simcall_answered([&observer] { return kernel::activity::CommImpl::isend(&observer); }, &observer); @@ -666,7 +667,9 @@ int Request::test(MPI_Request * request, MPI_Status * status, int* flag) { if ((*request)->action_ != nullptr && ((*request)->flags_ & MPI_REQ_CANCELLED) == 0){ try{ kernel::actor::ActorImpl* issuer = kernel::actor::ActorImpl::self(); - kernel::actor::ActivityTestSimcall observer{issuer, (*request)->action_.get()}; + simgrid::smpi::ActorExt* process = smpi_process_remote(simgrid::s4u::Actor::by_pid(issuer->get_pid())); + kernel::actor::ActivityTestSimcall observer{issuer, (*request)->action_.get(), + process->call_location()->get_call_location()}; *flag = kernel::actor::simcall_answered( [&observer] { return observer.get_activity()->test(observer.get_issuer()); }, &observer); } catch (const Exception&) { @@ -755,7 +758,8 @@ int Request::testany(int count, MPI_Request requests[], int *index, int* flag, M ssize_t i; try{ kernel::actor::ActorImpl* issuer = kernel::actor::ActorImpl::self(); - kernel::actor::ActivityTestanySimcall observer{issuer, comms}; + simgrid::smpi::ActorExt* process = smpi_process_remote(simgrid::s4u::Actor::by_pid(issuer->get_pid())); + kernel::actor::ActivityTestanySimcall observer{issuer, comms, process->call_location()->get_call_location()}; i = kernel::actor::simcall_answered( [&observer] { return kernel::activity::ActivityImpl::test_any(observer.get_issuer(), observer.get_activities()); @@ -857,7 +861,7 @@ void Request::iprobe(int source, int tag, MPI_Comm comm, int* flag, MPI_Status* static int nsleeps = 1; double speed = s4u::this_actor::get_host()->get_speed(); double maxrate = smpi_cfg_iprobe_cpu_usage(); - auto request = + auto* request = new Request(nullptr, 0, MPI_CHAR, source == MPI_ANY_SOURCE ? MPI_ANY_SOURCE : comm->group()->actor(source), simgrid::s4u::this_actor::get_pid(), tag, comm, MPI_REQ_PERSISTENT | MPI_REQ_RECV | MPI_REQ_PROBE); if (smpi_iprobe_sleep > 0) { @@ -1071,7 +1075,9 @@ int Request::wait(MPI_Request * request, MPI_Status * status) try{ // this is not a detached send kernel::actor::ActorImpl* issuer = kernel::actor::ActorImpl::self(); - kernel::actor::ActivityWaitSimcall observer{issuer, (*request)->action_.get(), -1}; + simgrid::smpi::ActorExt* process = smpi_process_remote(simgrid::s4u::Actor::by_pid(issuer->get_pid())); + kernel::actor::ActivityWaitSimcall observer{issuer, (*request)->action_.get(), -1, + process->call_location()->get_call_location()}; kernel::actor::simcall_blocking([issuer, &observer] { observer.get_activity()->wait_for(issuer, -1); }, &observer); } catch (const CancelException&) { @@ -1141,7 +1147,9 @@ int Request::waitany(int count, MPI_Request requests[], MPI_Status * status) ssize_t i; try{ kernel::actor::ActorImpl* issuer = kernel::actor::ActorImpl::self(); - kernel::actor::ActivityWaitanySimcall observer{issuer, comms, -1}; + simgrid::smpi::ActorExt* process = smpi_process_remote(simgrid::s4u::Actor::by_pid(issuer->get_pid())); + kernel::actor::ActivityWaitanySimcall observer{issuer, comms, -1, + process->call_location()->get_call_location()}; i = kernel::actor::simcall_blocking( [&observer] { kernel::activity::ActivityImpl::wait_any_for(observer.get_issuer(), observer.get_activities(),