X-Git-Url: http://info.iut-bm.univ-fcomte.fr/pub/gitweb/simgrid.git/blobdiff_plain/c6683b41cf9ecda70c1d4d75d1effc61903a894f..8bf7ffc43ad5507982e924a7f05bbb13c89965cb:/src/smpi/mpi/smpi_request.cpp diff --git a/src/smpi/mpi/smpi_request.cpp b/src/smpi/mpi/smpi_request.cpp index e62c37d2e8..64008cd40c 100644 --- a/src/smpi/mpi/smpi_request.cpp +++ b/src/smpi/mpi/smpi_request.cpp @@ -69,7 +69,6 @@ Request::Request(const void* buf, int count, MPI_Datatype datatype, aid_t src, a refcount_ = 1; else refcount_ = 0; - message_id_ = 0; init_buffer(count); this->add_f(); } @@ -186,10 +185,11 @@ bool Request::match_recv(void* a, void* b, simgrid::kernel::activity::CommImpl*) bool match = match_common(req, req, ref); if (not match || ref->comm_ == MPI_COMM_UNINITIALIZED || ref->comm_->is_smp_comm()) return match; - - if (ref->comm_->get_received_messages_count(ref->comm_->group()->rank(req->src_), - ref->comm_->group()->rank(req->dst_), req->tag_) == req->message_id_) { + auto it = std::find(req->message_id_.begin(), req->message_id_.end(), ref->comm_->get_received_messages_count(ref->comm_->group()->rank(req->src_), + ref->comm_->group()->rank(req->dst_), req->tag_)); + if (it != req->message_id_.end()) { if (((ref->flags_ & MPI_REQ_PROBE) == 0) && ((req->flags_ & MPI_REQ_PROBE) == 0)) { + req->message_id_.erase(it); XBT_DEBUG("increasing count in comm %p, which was %u from pid %ld, to pid %ld with tag %d", ref->comm_, ref->comm_->get_received_messages_count(ref->comm_->group()->rank(req->src_), ref->comm_->group()->rank(req->dst_), req->tag_), @@ -204,12 +204,12 @@ bool Request::match_recv(void* a, void* b, simgrid::kernel::activity::CommImpl*) match = false; req->flags_ &= ~MPI_REQ_MATCHED; ref->detached_sender_ = nullptr; - XBT_DEBUG("Refusing to match message, as its ID is not the one I expect. in comm %p, %u != %u, " + XBT_DEBUG("Refusing to match message, as its ID is not the one I expect. 
in comm %p, %u, " "from pid %ld to pid %ld, with tag %d", ref->comm_, ref->comm_->get_received_messages_count(ref->comm_->group()->rank(req->src_), ref->comm_->group()->rank(req->dst_), req->tag_), - req->message_id_, req->src_, req->dst_, req->tag_); + req->src_, req->dst_, req->tag_); } return match; } @@ -439,6 +439,29 @@ void Request::sendrecv(const void *sendbuf, int sendcount, MPI_Datatype sendtype } } +void Request::isendrecv(const void *sendbuf, int sendcount, MPI_Datatype sendtype,int dst, int sendtag, + void *recvbuf, int recvcount, MPI_Datatype recvtype, int src, int recvtag, + MPI_Comm comm, MPI_Request* request) +{ + aid_t source = MPI_PROC_NULL; + if (src == MPI_ANY_SOURCE) + source = MPI_ANY_SOURCE; + else if (src != MPI_PROC_NULL) + source = comm->group()->actor(src); + aid_t destination = dst != MPI_PROC_NULL ? comm->group()->actor(dst) : MPI_PROC_NULL; + + (*request) = new Request( nullptr, 0, MPI_BYTE, + src,dst, sendtag, comm, MPI_REQ_PERSISTENT|MPI_REQ_NBC); + std::vector<MPI_Request> requests; + if (aid_t myid = simgrid::s4u::this_actor::get_pid(); (destination == myid) && (source == myid)) { + Datatype::copy(sendbuf, sendcount, sendtype, recvbuf, recvcount, recvtype); + return; + } + requests.push_back(isend_init(sendbuf, sendcount, sendtype, dst, sendtag, comm)); + requests.push_back(irecv_init(recvbuf, recvcount, recvtype, src, recvtag, comm)); + (*request)->start_nbc_requests(requests); +} + void Request::start() { s4u::Mailbox* mailbox; @@ -510,7 +533,8 @@ void Request::start() process->replaying() ? 
&smpi_comm_null_copy_buffer_callback : smpi_comm_copy_data_callback, this, - -1.0}; + -1.0, + process->call_location()->get_call_location()}; observer.set_tag(tag_); action_ = kernel::actor::simcall_answered([&observer] { return kernel::activity::CommImpl::irecv(&observer); }, @@ -518,13 +542,13 @@ void Request::start() XBT_DEBUG("recv simcall posted"); } else { /* the RECV flag was not set, so this is a send */ - const simgrid::smpi::ActorExt* process = smpi_process_remote(simgrid::s4u::Actor::by_pid(dst_)); + simgrid::smpi::ActorExt* process = smpi_process_remote(simgrid::s4u::Actor::by_pid(dst_)); xbt_assert(process, "Actor pid=%ld is gone??", dst_); if (TRACE_smpi_view_internals()) TRACE_smpi_send(src_, src_, dst_, tag_, size_); this->print_request("New send"); - message_id_=comm_->get_sent_messages_count(comm_->group()->rank(src_), comm_->group()->rank(dst_), tag_); + message_id_.push_back(comm_->get_sent_messages_count(comm_->group()->rank(src_), comm_->group()->rank(dst_), tag_)); comm_->increment_sent_messages_count(comm_->group()->rank(src_), comm_->group()->rank(dst_), tag_); void* buf = buf_; @@ -609,7 +633,7 @@ void Request::start() &xbt_free_f, // how to free the userdata if a detached send fails process->replaying() ? 
&smpi_comm_null_copy_buffer_callback : smpi_comm_copy_data_callback, this, // detach if msg size < eager/rdv switch limit - detached_}; + detached_, process->call_location()->get_call_location()}; observer.set_tag(tag_); action_ = kernel::actor::simcall_answered([&observer] { return kernel::activity::CommImpl::isend(&observer); }, &observer); @@ -666,7 +690,9 @@ int Request::test(MPI_Request * request, MPI_Status * status, int* flag) { if ((*request)->action_ != nullptr && ((*request)->flags_ & MPI_REQ_CANCELLED) == 0){ try{ kernel::actor::ActorImpl* issuer = kernel::actor::ActorImpl::self(); - kernel::actor::ActivityTestSimcall observer{issuer, (*request)->action_.get()}; + simgrid::smpi::ActorExt* process = smpi_process_remote(simgrid::s4u::Actor::by_pid(issuer->get_pid())); + kernel::actor::ActivityTestSimcall observer{issuer, (*request)->action_.get(), + process->call_location()->get_call_location()}; *flag = kernel::actor::simcall_answered( [&observer] { return observer.get_activity()->test(observer.get_issuer()); }, &observer); } catch (const Exception&) { @@ -755,7 +781,8 @@ int Request::testany(int count, MPI_Request requests[], int *index, int* flag, M ssize_t i; try{ kernel::actor::ActorImpl* issuer = kernel::actor::ActorImpl::self(); - kernel::actor::ActivityTestanySimcall observer{issuer, comms}; + simgrid::smpi::ActorExt* process = smpi_process_remote(simgrid::s4u::Actor::by_pid(issuer->get_pid())); + kernel::actor::ActivityTestanySimcall observer{issuer, comms, process->call_location()->get_call_location()}; i = kernel::actor::simcall_answered( [&observer] { return kernel::activity::ActivityImpl::test_any(observer.get_issuer(), observer.get_activities()); @@ -1071,7 +1098,9 @@ int Request::wait(MPI_Request * request, MPI_Status * status) try{ // this is not a detached send kernel::actor::ActorImpl* issuer = kernel::actor::ActorImpl::self(); - kernel::actor::ActivityWaitSimcall observer{issuer, (*request)->action_.get(), -1}; + simgrid::smpi::ActorExt* 
process = smpi_process_remote(simgrid::s4u::Actor::by_pid(issuer->get_pid())); + kernel::actor::ActivityWaitSimcall observer{issuer, (*request)->action_.get(), -1, + process->call_location()->get_call_location()}; kernel::actor::simcall_blocking([issuer, &observer] { observer.get_activity()->wait_for(issuer, -1); }, &observer); } catch (const CancelException&) { @@ -1141,7 +1170,9 @@ int Request::waitany(int count, MPI_Request requests[], MPI_Status * status) ssize_t i; try{ kernel::actor::ActorImpl* issuer = kernel::actor::ActorImpl::self(); - kernel::actor::ActivityWaitanySimcall observer{issuer, comms, -1}; + simgrid::smpi::ActorExt* process = smpi_process_remote(simgrid::s4u::Actor::by_pid(issuer->get_pid())); + kernel::actor::ActivityWaitanySimcall observer{issuer, comms, -1, + process->call_location()->get_call_location()}; i = kernel::actor::simcall_blocking( [&observer] { kernel::activity::ActivityImpl::wait_any_for(observer.get_issuer(), observer.get_activities(),