MPI_Errhandler errhandler_ = _smpi_cfg_default_errhandler_is_error ? MPI_ERRORS_ARE_FATAL : MPI_ERRORS_RETURN;;
MPI_Errhandler* errhandlers_ = nullptr; //for MPI_COMM_WORLD only
+ // Per-communicator message counters. Both maps are keyed by the string that
+ // hash_message() builds from a (src, dst, tag) triple.
+ // Counter bumped by increment_sent_messages_count().
+ std::unordered_map<std::string, unsigned int> sent_messages_;
+ // Counter bumped by increment_received_messages_count().
+ std::unordered_map<std::string, unsigned int> recv_messages_;
public:
static std::unordered_map<int, smpi_key_elem> keyvals_;
static int keyval_id_;
void remove_rma_win(MPI_Win win);
void finish_rma_calls() const;
MPI_Comm split_type(int type, int key, const Info* info);
+ // Message counters, one per (src, dst, tag) triple. The callers visible in
+ // this change pass ranks obtained from this communicator's group for src/dst.
+ // Current value of the "sent" counter for the triple (0 if never incremented).
+ unsigned int get_sent_messages_count(int src, int dst, int tag);
+ // Adds one to the "sent" counter of the triple.
+ void increment_sent_messages_count(int src, int dst, int tag);
+ // Current value of the "received" counter for the triple (0 if never incremented).
+ unsigned int get_received_messages_count(int src, int dst, int tag);
+ // Adds one to the "received" counter of the triple.
+ void increment_received_messages_count(int src, int dst, int tag);
+
};
} // namespace smpi
simgrid::smpi::Info::unref(comm->info_);
if(comm->errhandlers_!=nullptr){
for (int i=0; i<comm->size(); i++)
- if (comm->errhandlers_[i]!=MPI_ERRHANDLER_NULL)
+ if (comm->errhandlers_[i]!=MPI_ERRHANDLER_NULL)
simgrid::smpi::Errhandler::unref(comm->errhandlers_[i]);
delete[] comm->errhandlers_;
} else if (comm->errhandler_ != MPI_ERRHANDLER_NULL)
}
}
+// Builds the map key identifying a message flow: "<tag>_<src>_<dst>".
+static inline std::string hash_message(int src, int dst, int tag){
+  std::string key = std::to_string(tag);
+  key += '_';
+  key += std::to_string(src);
+  key += '_';
+  key += std::to_string(dst);
+  return key;
+}
+
+// Returns how many sends were recorded for the (src, dst, tag) triple,
+// or 0 when none has been recorded yet.
+unsigned int Comm::get_sent_messages_count(int src, int dst, int tag)
+{
+  // find() instead of operator[]: a pure query must not insert a zero entry
+  // into the map for every key that is merely looked up.
+  auto it = sent_messages_.find(hash_message(src, dst, tag));
+  return it == sent_messages_.end() ? 0 : it->second;
+}
+
+// Records one more send for the (src, dst, tag) triple; the counter is
+// created at zero on first use.
+void Comm::increment_sent_messages_count(int src, int dst, int tag)
+{
+  const std::string key = hash_message(src, dst, tag);
+  unsigned int& counter = sent_messages_[key];
+  counter++;
+}
+
+// Returns how many receives were recorded for the (src, dst, tag) triple,
+// or 0 when none has been recorded yet.
+unsigned int Comm::get_received_messages_count(int src, int dst, int tag)
+{
+  // find() instead of operator[]: this getter is queried for candidate
+  // matches that may be refused, and such queries must not grow the map.
+  auto it = recv_messages_.find(hash_message(src, dst, tag));
+  return it == recv_messages_.end() ? 0 : it->second;
+}
+
+// Records one more receive for the (src, dst, tag) triple; the counter is
+// created at zero on first use.
+void Comm::increment_received_messages_count(int src, int dst, int tag)
+{
+  const std::string key = hash_message(src, dst, tag);
+  unsigned int& counter = recv_messages_[key];
+  counter++;
+}
+
} // namespace smpi
} // namespace simgrid
refcount_ = 1;
else
refcount_ = 0;
+ message_id_ = 0;
init_buffer(count);
this->add_f();
}
if (receiver->real_size_ < sender->real_size_){
XBT_DEBUG("Truncating message - should not happen: receiver size : %zu < sender size : %zu", receiver->real_size_, sender->real_size_);
receiver->truncated_ = true;
- } else if (receiver->real_size_ > sender->real_size_){
- receiver->real_size_=sender->real_size_;
}
}
//0-sized datatypes/counts should not interfere and match
{
auto ref = static_cast<MPI_Request>(a);
auto req = static_cast<MPI_Request>(b);
- return match_common(req, req, ref);
+ bool match = match_common(req, req, ref);
+ if (match && (ref->comm_ != MPI_COMM_UNINITIALIZED) && !ref->comm_->is_smp_comm()){
+ if (ref->comm_->get_received_messages_count(ref->comm_->group()->rank(req->src_), ref->comm_->group()->rank(req->dst_), req->tag_) == req->message_id_ ){
+ if (((ref->flags_ & MPI_REQ_PROBE) == 0 ) && ((req->flags_ & MPI_REQ_PROBE) == 0)){
+ XBT_DEBUG("increasing count in comm %p, which was %u from pid %ld, to pid %ld with tag %d", ref->comm_, ref->comm_->get_received_messages_count(ref->comm_->group()->rank(req->src_), ref->comm_->group()->rank(req->dst_), req->tag_), req->src_, req->dst_, req->tag_);
+ ref->comm_->increment_received_messages_count(ref->comm_->group()->rank(req->src_), ref->comm_->group()->rank(req->dst_), req->tag_);
+ if (ref->real_size_ > req->real_size_){
+ ref->real_size_=req->real_size_;
+ }
+ }
+ } else {
+ match = false;
+ req->flags_ &= ~MPI_REQ_MATCHED;
+ ref->detached_sender_=nullptr;
+ XBT_DEBUG("Refusing to match message, as its ID is not the one I expect. in comm %p, %u != %u, from pid %ld to pid %ld, with tag %d",ref->comm_, ref->comm_->get_received_messages_count(ref->comm_->group()->rank(req->src_), ref->comm_->group()->rank(req->dst_), req->tag_), req->message_id_ , req->src_, req->dst_, req->tag_);
+ }
+ }
+ return match;
}
bool Request::match_send(void* a, void* b, simgrid::kernel::activity::CommImpl*)
if (smpi_cfg_async_small_thresh() != 0 || (flags_ & MPI_REQ_RMA) != 0)
mut->lock();
+ bool is_probe = ((flags_ & MPI_REQ_PROBE) != 0);
+ flags_ |= MPI_REQ_PROBE;
+
if (smpi_cfg_async_small_thresh() == 0 && (flags_ & MPI_REQ_RMA) == 0) {
mailbox = process->mailbox();
} else if (((flags_ & MPI_REQ_RMA) != 0) || static_cast<int>(size_) < smpi_cfg_async_small_thresh()) {
mailbox = process->mailbox_small();
}
} else {
- XBT_DEBUG("yes there was something for us in the large mailbox");
+ XBT_DEBUG("yes there was something for us in the small mailbox");
}
} else {
mailbox = process->mailbox_small();
XBT_DEBUG("yes there was something for us in the small mailbox");
}
}
+ if(!is_probe)
+ flags_ &= ~MPI_REQ_PROBE;
action_ = simcall_comm_irecv(
process->get_actor()->get_impl(), mailbox->get_impl(), buf_, &real_size_, &match_recv,
TRACE_smpi_send(src_, src_, dst_, tag_, size_);
this->print_request("New send");
+ message_id_=comm_->get_sent_messages_count(comm_->group()->rank(src_), comm_->group()->rank(dst_), tag_);
+ comm_->increment_sent_messages_count(comm_->group()->rank(src_), comm_->group()->rank(dst_), tag_);
+
void* buf = buf_;
if ((flags_ & MPI_REQ_SSEND) == 0 &&
((flags_ & MPI_REQ_RMA) != 0 || (flags_ & MPI_REQ_BSEND) != 0 ||
if (not(smpi_cfg_async_small_thresh() != 0 || (flags_ & MPI_REQ_RMA) != 0)) {
mailbox = process->mailbox();
} else if (((flags_ & MPI_REQ_RMA) != 0) || static_cast<int>(size_) < smpi_cfg_async_small_thresh()) { // eager mode
+ bool is_probe = ((flags_ & MPI_REQ_PROBE) != 0);
+ flags_ |= MPI_REQ_PROBE;
+
mailbox = process->mailbox();
XBT_DEBUG("Is there a corresponding recv already posted in the large mailbox %s?", mailbox->get_cname());
simgrid::kernel::activity::ActivityImplPtr action = mailbox->iprobe(1, &match_send, static_cast<void*>(this));
} else {
XBT_DEBUG("Yes there was something for us in the large mailbox");
}
+ if(!is_probe)
+ flags_ &= ~MPI_REQ_PROBE;
} else {
mailbox = process->mailbox();
XBT_DEBUG("Send request %p is in the large mailbox %s (buf: %p)", this, mailbox->get_cname(), buf_);