namespace simgrid{
namespace smpi{
-Request::Request(const void* buf, int count, MPI_Datatype datatype, int src, int dst, int tag, MPI_Comm comm,
+Request::Request(const void* buf, int count, MPI_Datatype datatype, aid_t src, aid_t dst, int tag, MPI_Comm comm,
unsigned flags, MPI_Op op)
: buf_(const_cast<void*>(buf))
- , old_type_(datatype)
+ , type_(datatype)
, size_(datatype->size() * count)
, src_(src)
, dst_(dst)
refcount_ = 1;
else
refcount_ = 0;
- nbc_requests_=nullptr;
- nbc_requests_size_=0;
init_buffer(count);
this->add_f();
}
((*request)->generalized_funcs)->free_fn(((*request)->generalized_funcs)->extra_state);
} else {
Comm::unref((*request)->comm_);
- Datatype::unref((*request)->old_type_);
+ Datatype::unref((*request)->type_);
}
if ((*request)->op_ != MPI_REPLACE && (*request)->op_ != MPI_OP_NULL)
Op::unref(&(*request)->op_);
{
xbt_assert(sender, "Cannot match against null sender");
xbt_assert(receiver, "Cannot match against null receiver");
- XBT_DEBUG("Trying to match %s of sender src %d against %d, tag %d against %d, id %d against %d",
+ XBT_DEBUG("Trying to match %s of sender src %ld against %ld, tag %d against %d, id %d against %d",
(req == receiver ? "send" : "recv"), sender->src_, receiver->src_, sender->tag_, receiver->tag_,
sender->comm_->id(), receiver->comm_->id());
void *old_buf = nullptr;
// FIXME Handle the case of a partial shared malloc.
// This part handles the problem of non-contiguous memory (for the unserialization at the reception)
- if ((((flags_ & MPI_REQ_RECV) != 0) && ((flags_ & MPI_REQ_ACCUMULATE) != 0)) || (old_type_->flags() & DT_FLAG_DERIVED)) {
+ if ((((flags_ & MPI_REQ_RECV) != 0) && ((flags_ & MPI_REQ_ACCUMULATE) != 0)) || (type_->flags() & DT_FLAG_DERIVED)) {
// This part handles the problem of non-contiguous memory
old_buf = buf_;
if (count==0){
buf_ = nullptr;
}else {
- buf_ = xbt_malloc(count*old_type_->size());
- if ((old_type_->flags() & DT_FLAG_DERIVED) && ((flags_ & MPI_REQ_SEND) != 0)) {
- old_type_->serialize(old_buf, buf_, count);
+ buf_ = xbt_malloc(count*type_->size());
+ if ((type_->flags() & DT_FLAG_DERIVED) && ((flags_ & MPI_REQ_SEND) != 0)) {
+ type_->serialize(old_buf, buf_, count);
}
}
}
+// Log this request's main fields (buffer, size, src/dst, tag, flags) at
+// verbose level, prefixed by the caller-supplied 'message'.
void Request::print_request(const char* message) const
{
- XBT_VERB("%s request %p [buf = %p, size = %zu, src = %d, dst = %d, tag = %d, flags = %x]",
- message, this, buf_, size_, src_, dst_, tag_, flags_);
+ XBT_VERB("%s request %p [buf = %p, size = %zu, src = %ld, dst = %ld, tag = %d, flags = %x]", message, this, buf_,
+ size_, src_, dst_, tag_, flags_);
}
/* factories, to hide the internal flags from the caller */
MPI_REQ_PERSISTENT | MPI_REQ_ISEND | MPI_REQ_SEND | MPI_REQ_PREPARED);
}
-
MPI_Request Request::rma_send_init(const void *buf, int count, MPI_Datatype datatype, int src, int dst, int tag, MPI_Comm comm,
MPI_Op op)
{
MPI_Request Request::recv_init(void *buf, int count, MPI_Datatype datatype, int src, int tag, MPI_Comm comm)
{
- int source = MPI_PROC_NULL;
+ aid_t source = MPI_PROC_NULL;
if (src == MPI_ANY_SOURCE)
source = MPI_ANY_SOURCE;
else if (src != MPI_PROC_NULL)
MPI_Op op)
{
MPI_Request request = nullptr; /* MC needs the comm to be set to nullptr during the call */
- int source = MPI_PROC_NULL;
+ aid_t source = MPI_PROC_NULL;
if (src == MPI_ANY_SOURCE)
source = MPI_ANY_SOURCE;
else if (src != MPI_PROC_NULL)
MPI_Request Request::irecv_init(void *buf, int count, MPI_Datatype datatype, int src, int tag, MPI_Comm comm)
{
- int source = MPI_PROC_NULL;
+ aid_t source = MPI_PROC_NULL;
if (src == MPI_ANY_SOURCE)
source = MPI_ANY_SOURCE;
else if (src != MPI_PROC_NULL)
return request;
}
-
MPI_Request Request::irecv(void *buf, int count, MPI_Datatype datatype, int src, int tag, MPI_Comm comm)
{
MPI_Request request = nullptr; /* MC needs the comm to be set to nullptr during the call */
- int source = MPI_PROC_NULL;
+ aid_t source = MPI_PROC_NULL;
if (src == MPI_ANY_SOURCE)
source = MPI_ANY_SOURCE;
else if (src != MPI_PROC_NULL)
void *recvbuf, int recvcount, MPI_Datatype recvtype, int src, int recvtag,
MPI_Comm comm, MPI_Status * status)
{
- int source = MPI_PROC_NULL;
+ aid_t source = MPI_PROC_NULL;
if (src == MPI_ANY_SOURCE)
source = MPI_ANY_SOURCE;
else if (src != MPI_PROC_NULL)
source = comm->group()->actor(src);
- int destination = dst != MPI_PROC_NULL ? comm->group()->actor(dst) : MPI_PROC_NULL;
+ aid_t destination = dst != MPI_PROC_NULL ? comm->group()->actor(dst) : MPI_PROC_NULL;
std::array<MPI_Request, 2> requests;
std::array<MPI_Status, 2> stats;
- int myid = simgrid::s4u::this_actor::get_pid();
+ aid_t myid = simgrid::s4u::this_actor::get_pid();
if ((destination == myid) && (source == myid)) {
Datatype::copy(sendbuf, sendcount, sendtype, recvbuf, recvcount, recvtype);
if (status != MPI_STATUS_IGNORE) {
//reinitialize temporary buffer for persistent requests
if(real_size_ > 0 && flags_ & MPI_REQ_FINISHED){
buf_ = old_buf_;
- init_buffer(real_size_/old_type_->size());
+ init_buffer(real_size_/type_->size());
}
flags_ &= ~MPI_REQ_PREPARED;
flags_ &= ~MPI_REQ_FINISHED;
mut->unlock();
} else { /* the RECV flag was not set, so this is a send */
const simgrid::smpi::ActorExt* process = smpi_process_remote(simgrid::s4u::Actor::by_pid(dst_));
- xbt_assert(process, "Actor pid=%d is gone??", dst_);
+ xbt_assert(process, "Actor pid=%ld is gone??", dst_);
if (TRACE_smpi_view_internals())
TRACE_smpi_send(src_, src_, dst_, tag_, size_);
this->print_request("New send");
detached_ = true;
XBT_DEBUG("Send request %p is detached", this);
this->ref();
- if (not(old_type_->flags() & DT_FLAG_DERIVED)) {
+ if (not(type_->flags() & DT_FLAG_DERIVED)) {
oldbuf = buf_;
if (not process->replaying() && oldbuf != nullptr && size_ != 0) {
if ((smpi_cfg_privatization() != SmpiPrivStrategies::NONE) &&
static int nsleeps = 1;
int ret = MPI_SUCCESS;
-
- // Are we testing a request meant for non blocking collectives ?
- // If so, test all the subrequests.
- if ((*request)->nbc_requests_size_>0){
- ret = testall((*request)->nbc_requests_size_, (*request)->nbc_requests_, flag, MPI_STATUSES_IGNORE);
- if(*flag){
- delete[] (*request)->nbc_requests_;
- (*request)->nbc_requests_size_=0;
- unref(request);
- }
- return ret;
- }
-
+
if(smpi_test_sleep > 0)
simgrid::s4u::this_actor::sleep_for(nsleeps * smpi_test_sleep);
Status::empty(status);
*flag = 1;
+
+ if ((*request)->flags_ & MPI_REQ_NBC){
+ *flag = finish_nbc_requests(request, 1);
+ }
+
if (((*request)->flags_ & (MPI_REQ_PREPARED | MPI_REQ_FINISHED)) == 0) {
if ((*request)->action_ != nullptr && ((*request)->flags_ & MPI_REQ_CANCELLED) == 0){
try{
return ret;
}
}
- if (((*request)->flags_ & MPI_REQ_GENERALIZED) && !((*request)->flags_ & MPI_REQ_COMPLETE))
+ if (((*request)->flags_ & MPI_REQ_GENERALIZED) && not((*request)->flags_ & MPI_REQ_COMPLETE))
*flag=0;
if (*flag) {
finish_wait(request, status); // may invalidate *request
if (i != -1) { // -1 is not MPI_UNDEFINED but a SIMIX return code. (nothing matches)
*index = map[i];
- if (requests[*index] != MPI_REQUEST_NULL &&
- (requests[*index]->flags_ & MPI_REQ_GENERALIZED)
- && !(requests[*index]->flags_ & MPI_REQ_COMPLETE)) {
+ if (requests[*index] != MPI_REQUEST_NULL && (requests[*index]->flags_ & MPI_REQ_GENERALIZED) &&
+ not(requests[*index]->flags_ & MPI_REQ_COMPLETE)) {
*flag=0;
} else {
finish_wait(&requests[*index],status);
ret=(requests[*index]->generalized_funcs)->query_fn((requests[*index]->generalized_funcs)->extra_state, mystatus);
}
- if (requests[*index] != MPI_REQUEST_NULL && (requests[*index]->flags_ & MPI_REQ_NON_PERSISTENT))
+ if (requests[*index] != MPI_REQUEST_NULL && requests[*index]->flags_ & MPI_REQ_NBC){
+ *flag = finish_nbc_requests(&requests[*index] , 1);
+ }
+
+ if (requests[*index] != MPI_REQUEST_NULL && (requests[*index]->flags_ & MPI_REQ_NON_PERSISTENT))
requests[*index] = MPI_REQUEST_NULL;
XBT_DEBUG("Testany - returning with index %d", *index);
*flag=1;
int ret = test(&requests[i], pstat, &flag);
if (flag){
flag=0;
- requests[i]=MPI_REQUEST_NULL;
}else{
*outflag=0;
}
xbt_assert(request == MPI_REQUEST_NULL);
}
+// Complete the sub-requests attached to a non-blocking collective request.
+// If 'test' is 0, block (waitall) on every sub-request; otherwise only poll
+// them with testall. Returns 1 when all sub-requests are finished, 0
+// otherwise -- 0 is only possible in the 'test' case, since the waitall
+// branch leaves 'flag' at its initial value of 1.
+// On completion: for the reduce case (parent buf_ non-null), data received by
+// each receive sub-request is folded into the parent buffer with op_ and the
+// temporary buffer is freed; every sub-request is then unreferenced and the
+// sub-request list is cleared.
+int Request::finish_nbc_requests(MPI_Request* request, int test){
+  int flag = 1;
+  int ret = 0;
+  if(test == 0)
+    ret = waitall((*request)->nbc_requests_.size(), (*request)->nbc_requests_.data(), MPI_STATUSES_IGNORE);
+  else{
+    ret = testall((*request)->nbc_requests_.size(), (*request)->nbc_requests_.data(), &flag, MPI_STATUSES_IGNORE);
+  }
+  if(ret!=MPI_SUCCESS)
+    xbt_die("Failure when waiting on non blocking collective sub-requests");
+  if(flag == 1){
+    XBT_DEBUG("Finishing non blocking collective request with %zu sub-requests", (*request)->nbc_requests_.size());
+    for(auto& req: (*request)->nbc_requests_){
+      if((*request)->buf_!=nullptr && req!=MPI_REQUEST_NULL){//reduce case
+        void * buf=req->buf_;
+        // For derived datatypes, presumably the received payload lives in the
+        // unserialization buffer kept in old_buf_ -- TODO confirm against
+        // Request::init_buffer.
+        if((*request)->type_->flags() & DT_FLAG_DERIVED)
+          buf=req->old_buf_;
+        if(req->flags_ & MPI_REQ_RECV ){
+          if((*request)->op_!=MPI_OP_NULL){
+            // Fold the received chunk into the parent request's user buffer.
+            int count=(*request)->size_/ (*request)->type_->size();
+            (*request)->op_->apply(buf, (*request)->buf_, &count, (*request)->type_);
+          }
+          smpi_free_tmp_buffer(static_cast<unsigned char*>(buf));
+        }
+      }
+      if(req!=MPI_REQUEST_NULL)
+        Request::unref(&req);
+    }
+    (*request)->nbc_requests_.clear();
+  }
+  return flag;
+}
+
void Request::finish_wait(MPI_Request* request, MPI_Status * status)
{
MPI_Request req = *request;
Status::empty(status);
status->MPI_SOURCE = MPI_PROC_NULL;
} else {
- int src = req->src_ == MPI_ANY_SOURCE ? req->real_src_ : req->src_;
+ aid_t src = req->src_ == MPI_ANY_SOURCE ? req->real_src_ : req->src_;
status->MPI_SOURCE = req->comm_->group()->rank(src);
status->MPI_TAG = req->tag_ == MPI_ANY_TAG ? req->real_tag_ : req->tag_;
status->MPI_ERROR = req->truncated_ ? MPI_ERR_TRUNCATE : MPI_SUCCESS;
//detached send will be finished at the other end
if (not(req->detached_ && ((req->flags_ & MPI_REQ_SEND) != 0))) {
req->print_request("Finishing");
- MPI_Datatype datatype = req->old_type_;
+ MPI_Datatype datatype = req->type_;
// FIXME Handle the case of a partial shared malloc.
if (((req->flags_ & MPI_REQ_ACCUMULATE) != 0) ||
}
if (TRACE_smpi_view_internals() && ((req->flags_ & MPI_REQ_RECV) != 0)) {
- int rank = simgrid::s4u::this_actor::get_pid();
- int src_traced = (req->src_ == MPI_ANY_SOURCE ? req->real_src_ : req->src_);
+ aid_t rank = simgrid::s4u::this_actor::get_pid();
+ aid_t src_traced = (req->src_ == MPI_ANY_SOURCE ? req->real_src_ : req->src_);
TRACE_smpi_recv(src_traced, rank,req->tag_);
}
if(req->detached_sender_ != nullptr){
(*request)=MPI_REQUEST_NULL;
return ret;
}
- // Are we waiting on a request meant for non blocking collectives ?
- // If so, wait for all the subrequests.
- if ((*request)->nbc_requests_size_>0){
- ret = waitall((*request)->nbc_requests_size_, (*request)->nbc_requests_, MPI_STATUSES_IGNORE);
- for (int i = 0; i < (*request)->nbc_requests_size_; i++) {
- if((*request)->buf_!=nullptr && (*request)->nbc_requests_[i]!=MPI_REQUEST_NULL){//reduce case
- void * buf=(*request)->nbc_requests_[i]->buf_;
- if((*request)->old_type_->flags() & DT_FLAG_DERIVED)
- buf=(*request)->nbc_requests_[i]->old_buf_;
- if((*request)->nbc_requests_[i]->flags_ & MPI_REQ_RECV ){
- if((*request)->op_!=MPI_OP_NULL){
- int count=(*request)->size_/ (*request)->old_type_->size();
- (*request)->op_->apply(buf, (*request)->buf_, &count, (*request)->old_type_);
- }
- smpi_free_tmp_buffer(static_cast<unsigned char*>(buf));
- }
- }
- if((*request)->nbc_requests_[i]!=MPI_REQUEST_NULL)
- Request::unref(&((*request)->nbc_requests_[i]));
- }
- delete[] (*request)->nbc_requests_;
- (*request)->nbc_requests_size_=0;
- unref(request);
- (*request)=MPI_REQUEST_NULL;
- return ret;
- }
(*request)->print_request("Waiting");
if ((*request)->flags_ & (MPI_REQ_PREPARED | MPI_REQ_FINISHED)) {
try{
// this is not a detached send
simcall_comm_wait((*request)->action_.get(), -1.0);
- } catch (const Exception&) {
+ } catch (const CancelException&) {
XBT_VERB("Request cancelled");
}
}
if ((*request)->flags_ & MPI_REQ_GENERALIZED) {
- if(!((*request)->flags_ & MPI_REQ_COMPLETE)){
+ if (not((*request)->flags_ & MPI_REQ_COMPLETE)) {
((*request)->generalized_funcs)->mutex->lock();
((*request)->generalized_funcs)->cond->wait(((*request)->generalized_funcs)->mutex);
((*request)->generalized_funcs)->mutex->unlock();
if ((*request)->truncated_)
ret = MPI_ERR_TRUNCATE;
+ if ((*request)->flags_ & MPI_REQ_NBC)
+ finish_nbc_requests(request, 0);
+
finish_wait(request, status); // may invalidate *request
if (*request != MPI_REQUEST_NULL && (((*request)->flags_ & MPI_REQ_NON_PERSISTENT) != 0))
*request = MPI_REQUEST_NULL;
// This is a finished detached request, let's return this one
comms.clear(); // don't do the waitany call afterwards
index = i;
+ if (requests[index] != MPI_REQUEST_NULL && (requests[index])->flags_ & MPI_REQ_NBC)
+ finish_nbc_requests(&requests[index], 0);
finish_wait(&requests[i], status); // cleanup if refcount = 0
if (requests[i] != MPI_REQUEST_NULL && (requests[i]->flags_ & MPI_REQ_NON_PERSISTENT))
requests[i] = MPI_REQUEST_NULL; // set to null
int i;
try{
i = simcall_comm_waitany(comms.data(), comms.size(), -1);
- } catch (const Exception&) {
+ } catch (const CancelException&) {
XBT_INFO("request cancelled");
i = -1;
}
}
}
+
if (index==MPI_UNDEFINED)
Status::empty(status);
if(*flag)
return MPI_SUCCESS;
}
- if (req != MPI_REQUEST_NULL &&
- (req->flags_ & MPI_REQ_GENERALIZED)
- && !(req->flags_ & MPI_REQ_COMPLETE)) {
- *flag=0;
+ if (req != MPI_REQUEST_NULL && (req->flags_ & MPI_REQ_GENERALIZED) && not(req->flags_ & MPI_REQ_COMPLETE)) {
+ *flag = 0;
return MPI_SUCCESS;
}
*flag=1;
if(req != MPI_REQUEST_NULL &&
status != MPI_STATUS_IGNORE) {
- int src = req->src_ == MPI_ANY_SOURCE ? req->real_src_ : req->src_;
+ aid_t src = req->src_ == MPI_ANY_SOURCE ? req->real_src_ : req->src_;
status->MPI_SOURCE = req->comm_->group()->rank(src);
status->MPI_TAG = req->tag_ == MPI_ANY_TAG ? req->real_tag_ : req->tag_;
status->MPI_ERROR = req->truncated_ ? MPI_ERR_TRUNCATE : MPI_SUCCESS;
int Request::grequest_complete(MPI_Request request)
{
- if ((!(request->flags_ & MPI_REQ_GENERALIZED)) || request->generalized_funcs->mutex == nullptr)
+ if ((not(request->flags_ & MPI_REQ_GENERALIZED)) || request->generalized_funcs->mutex == nullptr)
return MPI_ERR_REQUEST;
request->generalized_funcs->mutex->lock();
request->flags_ |= MPI_REQ_COMPLETE; // in case wait would be called after complete
return MPI_SUCCESS;
}
-void Request::set_nbc_requests(MPI_Request* reqs, int size){
-  nbc_requests_size_ = size;
-  if (size > 0) {
+// Record the sub-requests of a non-blocking collective on this request and
+// start them all at once; an empty vector leaves the request untouched.
+void Request::start_nbc_requests(std::vector<MPI_Request> reqs){
+  if (not reqs.empty()) {
     nbc_requests_ = reqs;
-  } else {
-    delete[] reqs;
-    nbc_requests_ = nullptr;
+    Request::startall(reqs.size(), reqs.data());
  }
}
-int Request::get_nbc_requests_size() const
-{
-  return nbc_requests_size_;
-}
-
-MPI_Request* Request::get_nbc_requests() const
+// Accessor: return a copy of the sub-requests attached to this non-blocking
+// collective request (empty once finish_nbc_requests has completed them).
+std::vector<MPI_Request> Request::get_nbc_requests() const
{
return nbc_requests_;
}