X-Git-Url: http://info.iut-bm.univ-fcomte.fr/pub/gitweb/simgrid.git/blobdiff_plain/c9943e3f2291c2815801df17d4641eb89a52fb83..ff498d2432d650dc50282b04e3bd175a588eef8c:/src/smpi/mpi/smpi_request.cpp diff --git a/src/smpi/mpi/smpi_request.cpp b/src/smpi/mpi/smpi_request.cpp index 30c21a461e..6287d7c027 100644 --- a/src/smpi/mpi/smpi_request.cpp +++ b/src/smpi/mpi/smpi_request.cpp @@ -36,8 +36,8 @@ extern void (*smpi_comm_copy_data_callback)(simgrid::kernel::activity::CommImpl* namespace simgrid{ namespace smpi{ -Request::Request(void* buf, int count, MPI_Datatype datatype, int src, int dst, int tag, MPI_Comm comm, unsigned flags) - : buf_(buf), old_type_(datatype), src_(src), dst_(dst), tag_(tag), comm_(comm), flags_(flags) +Request::Request(void* buf, int count, MPI_Datatype datatype, int src, int dst, int tag, MPI_Comm comm, unsigned flags, MPI_Op op) + : buf_(buf), old_type_(datatype), src_(src), dst_(dst), tag_(tag), comm_(comm), flags_(flags), op_(op) { void *old_buf = nullptr; // FIXME Handle the case of a partial shared malloc. @@ -58,18 +58,19 @@ Request::Request(void* buf, int count, MPI_Datatype datatype, int src, int dst, size_ = datatype->size() * count; datatype->ref(); comm_->ref(); + if(op != MPI_REPLACE && op != MPI_OP_NULL) + op_->ref(); action_ = nullptr; - detached_ = 0; + detached_ = false; detached_sender_ = nullptr; real_src_ = 0; - truncated_ = 0; + truncated_ = false; real_size_ = 0; real_tag_ = 0; if (flags & MPI_REQ_PERSISTENT) refcount_ = 1; else refcount_ = 0; - op_ = MPI_REPLACE; cancelled_ = 0; generalized_funcs=nullptr; nbc_requests_=nullptr; @@ -95,6 +96,9 @@ void Request::unref(MPI_Request* request) Comm::unref((*request)->comm_); Datatype::unref((*request)->old_type_); } + if ((*request)->op_!=MPI_REPLACE && (*request)->op_!=MPI_OP_NULL) + Op::unref(&(*request)->op_); + (*request)->print_request("Destroying"); delete *request; *request = MPI_REQUEST_NULL; @@ -122,8 +126,8 @@ int Request::match_recv(void* a, void* b, simgrid::kernel::activity::CommImpl*) if(ref->tag_ == MPI_ANY_TAG) ref->real_tag_ = req->tag_; if(ref->real_size_ < req->real_size_) - ref->truncated_ = 1; - if(req->detached_==1) + ref->truncated_ = true; + if (req->detached_) ref->detached_sender_=req; //tie the sender to the receiver, as it is detached and has to be freed in the receiver if(req->cancelled_==0) req->cancelled_=-1;//mark as uncancellable @@ -147,8 +151,8 @@ int Request::match_send(void* a, void* b, simgrid::kernel::activity::CommImpl*) if(req->tag_ == MPI_ANY_TAG) req->real_tag_ = ref->tag_; if(req->real_size_ < ref->real_size_) - req->truncated_ = 1; - if(ref->detached_==1) + req->truncated_ = true; + if (ref->detached_) req->detached_sender_=ref; //tie the sender to the receiver, as it is detached and has to be freed in the receiver if(req->cancelled_==0) req->cancelled_=-1;//mark as uncancellable @@ -201,8 +205,7 @@ MPI_Request Request::rma_send_init(void *buf, int count, MPI_Datatype datatype, request = new Request(buf == MPI_BOTTOM ? nullptr : buf, count, datatype, comm->group()->actor(src)->get_pid(), comm->group()->actor(dst)->get_pid(), tag, comm, MPI_REQ_RMA | MPI_REQ_NON_PERSISTENT | MPI_REQ_ISEND | MPI_REQ_SEND | MPI_REQ_PREPARED | - MPI_REQ_ACCUMULATE); - request->op_ = op; + MPI_REQ_ACCUMULATE, op); } return request; } @@ -226,8 +229,7 @@ MPI_Request Request::rma_recv_init(void *buf, int count, MPI_Datatype datatype, }else{ request = new Request(buf == MPI_BOTTOM ? nullptr : buf, count, datatype, comm->group()->actor(src)->get_pid(), comm->group()->actor(dst)->get_pid(), tag, comm, - MPI_REQ_RMA | MPI_REQ_NON_PERSISTENT | MPI_REQ_RECV | MPI_REQ_PREPARED | MPI_REQ_ACCUMULATE); - request->op_ = op; + MPI_REQ_RMA | MPI_REQ_NON_PERSISTENT | MPI_REQ_RECV | MPI_REQ_PREPARED | MPI_REQ_ACCUMULATE, op); } return request; } @@ -407,7 +409,7 @@ void Request::start() ((flags_ & MPI_REQ_RMA) != 0 || static_cast(size_) < simgrid::config::get_value("smpi/send-is-detached-thresh"))) { void *oldbuf = nullptr; - detached_ = 1; + detached_ = true; XBT_DEBUG("Send request %p is detached", this); this->ref(); if (not(old_type_->flags() & DT_FLAG_DERIVED)) { @@ -428,7 +430,7 @@ void Request::start() //if we are giving back the control to the user without waiting for completion, we have to inject timings double sleeptime = 0.0; - if (detached_ != 0 || ((flags_ & (MPI_REQ_ISEND | MPI_REQ_SSEND)) != 0)) { // issend should be treated as isend + if (detached_ || ((flags_ & (MPI_REQ_ISEND | MPI_REQ_SSEND)) != 0)) { // issend should be treated as isend // isend and send timings may be different sleeptime = ((flags_ & MPI_REQ_ISEND) != 0) ? simgrid::s4u::Actor::self()->get_host()->extension()->oisend(size_) @@ -488,8 +490,8 @@ void Request::start() /* FIXME: detached sends are not traceable (action_ == nullptr) */ if (action_ != nullptr) { - std::string category = smpi_process()->get_tracing_category(); - simgrid::simix::simcall([this, category] { this->action_->set_category(category); }); + boost::static_pointer_cast(action_)->set_tracing_category( + smpi_process()->get_tracing_category()); } if (async_small_thresh != 0 || ((flags_ & MPI_REQ_RMA) != 0)) @@ -795,14 +797,13 @@ void Request::finish_wait(MPI_Request* request, MPI_Status * status) return; } - if (not((req->detached_ != 0) && ((req->flags_ & MPI_REQ_SEND) != 0)) - && ((req->flags_ & MPI_REQ_PREPARED) == 0) - && ((req->flags_ & MPI_REQ_GENERALIZED) == 0)) { + if (not(req->detached_ && ((req->flags_ & MPI_REQ_SEND) != 0)) && ((req->flags_ & MPI_REQ_PREPARED) == 0) && + ((req->flags_ & MPI_REQ_GENERALIZED) == 0)) { if(status != MPI_STATUS_IGNORE) { int src = req->src_ == MPI_ANY_SOURCE ? req->real_src_ : req->src_; status->MPI_SOURCE = req->comm_->group()->rank(src); status->MPI_TAG = req->tag_ == MPI_ANY_TAG ? req->real_tag_ : req->tag_; - status->MPI_ERROR = req->truncated_ != 0 ? MPI_ERR_TRUNCATE : MPI_SUCCESS; + status->MPI_ERROR = req->truncated_ ? MPI_ERR_TRUNCATE : MPI_SUCCESS; // this handles the case were size in receive differs from size in send status->count = req->real_size_; } @@ -865,12 +866,25 @@ int Request::wait(MPI_Request * request, MPI_Status * status) if ((*request)->nbc_requests_size_>0){ ret = waitall((*request)->nbc_requests_size_, (*request)->nbc_requests_, MPI_STATUSES_IGNORE); for (int i = 0; i < (*request)->nbc_requests_size_; i++) { + if((*request)->buf_!=nullptr && (*request)->nbc_requests_[i]!=MPI_REQUEST_NULL){//reduce case + void * buf=(*request)->nbc_requests_[i]->buf_; + if((*request)->old_type_->flags() & DT_FLAG_DERIVED) + buf=(*request)->nbc_requests_[i]->old_buf_; + if((*request)->nbc_requests_[i]->flags_ & MPI_REQ_RECV ){ + if((*request)->op_!=MPI_OP_NULL){ + int count=(*request)->size_/ (*request)->old_type_->size(); + (*request)->op_->apply(buf, (*request)->buf_, &count, (*request)->old_type_); + } + smpi_free_tmp_buffer(buf); + } + } if((*request)->nbc_requests_[i]!=MPI_REQUEST_NULL) Request::unref(&((*request)->nbc_requests_[i])); } delete[] (*request)->nbc_requests_; (*request)->nbc_requests_size_=0; unref(request); + (*request)=MPI_REQUEST_NULL; return ret; } @@ -1045,7 +1059,7 @@ int Request::waitsome(int incount, MPI_Request requests[], int *indices, MPI_Sta indices[count] = index; count++; for (int i = 0; i < incount; i++) { - if((requests[i] != MPI_REQUEST_NULL)) { + if (requests[i] != MPI_REQUEST_NULL) { test(&requests[i], pstat,&flag); if (flag==1){ indices[count] = i;