X-Git-Url: http://info.iut-bm.univ-fcomte.fr/pub/gitweb/simgrid.git/blobdiff_plain/7a40637a7db0cf339c5a09ba5594a004a5304a3c..845590d7b4f0790c34191697d1b6c28d50d1ccbd:/src/smpi/mpi/smpi_request.cpp diff --git a/src/smpi/mpi/smpi_request.cpp b/src/smpi/mpi/smpi_request.cpp index 451a57c489..5cf3a4990b 100644 --- a/src/smpi/mpi/smpi_request.cpp +++ b/src/smpi/mpi/smpi_request.cpp @@ -36,8 +36,8 @@ extern void (*smpi_comm_copy_data_callback)(simgrid::kernel::activity::CommImpl* namespace simgrid{ namespace smpi{ -Request::Request(void* buf, int count, MPI_Datatype datatype, int src, int dst, int tag, MPI_Comm comm, unsigned flags) - : buf_(buf), old_type_(datatype), src_(src), dst_(dst), tag_(tag), comm_(comm), flags_(flags) +Request::Request(void* buf, int count, MPI_Datatype datatype, int src, int dst, int tag, MPI_Comm comm, unsigned flags, MPI_Op op) + : buf_(buf), old_type_(datatype), src_(src), dst_(dst), tag_(tag), comm_(comm), flags_(flags), op_(op) { void *old_buf = nullptr; // FIXME Handle the case of a partial shared malloc. @@ -58,6 +58,8 @@ Request::Request(void* buf, int count, MPI_Datatype datatype, int src, int dst, size_ = datatype->size() * count; datatype->ref(); comm_->ref(); + if(op != MPI_REPLACE && op != MPI_OP_NULL) + op_->ref(); action_ = nullptr; detached_ = 0; detached_sender_ = nullptr; @@ -69,7 +71,6 @@ Request::Request(void* buf, int count, MPI_Datatype datatype, int src, int dst, refcount_ = 1; else refcount_ = 0; - op_ = MPI_REPLACE; cancelled_ = 0; generalized_funcs=nullptr; nbc_requests_=nullptr; @@ -95,6 +96,9 @@ void Request::unref(MPI_Request* request) Comm::unref((*request)->comm_); Datatype::unref((*request)->old_type_); } + if ((*request)->op_!=MPI_REPLACE && (*request)->op_!=MPI_OP_NULL) + Op::unref(&(*request)->op_); + (*request)->print_request("Destroying"); delete *request; *request = MPI_REQUEST_NULL; @@ -201,8 +205,7 @@ MPI_Request Request::rma_send_init(void *buf, int count, MPI_Datatype datatype, request = new Request(buf == MPI_BOTTOM ? nullptr : buf, count, datatype, comm->group()->actor(src)->get_pid(), comm->group()->actor(dst)->get_pid(), tag, comm, MPI_REQ_RMA | MPI_REQ_NON_PERSISTENT | MPI_REQ_ISEND | MPI_REQ_SEND | MPI_REQ_PREPARED | - MPI_REQ_ACCUMULATE); - request->op_ = op; + MPI_REQ_ACCUMULATE, op); } return request; } @@ -226,8 +229,7 @@ MPI_Request Request::rma_recv_init(void *buf, int count, MPI_Datatype datatype, }else{ request = new Request(buf == MPI_BOTTOM ? nullptr : buf, count, datatype, comm->group()->actor(src)->get_pid(), comm->group()->actor(dst)->get_pid(), tag, comm, - MPI_REQ_RMA | MPI_REQ_NON_PERSISTENT | MPI_REQ_RECV | MPI_REQ_PREPARED | MPI_REQ_ACCUMULATE); - request->op_ = op; + MPI_REQ_RMA | MPI_REQ_NON_PERSISTENT | MPI_REQ_RECV | MPI_REQ_PREPARED | MPI_REQ_ACCUMULATE, op); } return request; } @@ -805,8 +807,6 @@ void Request::finish_wait(MPI_Request* request, MPI_Status * status) status->MPI_ERROR = req->truncated_ != 0 ? MPI_ERR_TRUNCATE : MPI_SUCCESS; // this handles the case were size in receive differs from size in send status->count = req->real_size_; -// int flag; -// Request::get_status(req,&flag,status); } req->print_request("Finishing"); @@ -867,6 +867,16 @@ int Request::wait(MPI_Request * request, MPI_Status * status) if ((*request)->nbc_requests_size_>0){ ret = waitall((*request)->nbc_requests_size_, (*request)->nbc_requests_, MPI_STATUSES_IGNORE); for (int i = 0; i < (*request)->nbc_requests_size_; i++) { + if((*request)->buf_!=nullptr && (*request)->nbc_requests_[i]!=MPI_REQUEST_NULL){//reduce case + void * buf=(*request)->nbc_requests_[i]->buf_; + if((*request)->old_type_->flags() & DT_FLAG_DERIVED) + buf=(*request)->nbc_requests_[i]->old_buf_; + if((*request)->op_!=MPI_OP_NULL){ + int count=(*request)->size_/ (*request)->old_type_->size(); + (*request)->op_->apply(buf, (*request)->buf_, &count, (*request)->old_type_); + } + smpi_free_tmp_buffer(buf); + } if((*request)->nbc_requests_[i]!=MPI_REQUEST_NULL) Request::unref(&((*request)->nbc_requests_[i])); } @@ -1095,7 +1105,7 @@ int Request::get_status(MPI_Request req, int* flag, MPI_Status * status){ if(req != MPI_REQUEST_NULL && req->action_ != nullptr) { req->iprobe(req->src_, req->tag_, req->comm_, flag, status); - if(flag) + if(*flag) return MPI_SUCCESS; } if (req != MPI_REQUEST_NULL && @@ -1106,7 +1116,8 @@ int Request::get_status(MPI_Request req, int* flag, MPI_Status * status){ } *flag=1; - if(status != MPI_STATUS_IGNORE) { + if(req != MPI_REQUEST_NULL && + status != MPI_STATUS_IGNORE) { int src = req->src_ == MPI_ANY_SOURCE ? req->real_src_ : req->src_; status->MPI_SOURCE = req->comm_->group()->rank(src); status->MPI_TAG = req->tag_ == MPI_ANY_TAG ? req->real_tag_ : req->tag_;