#include "src/kernel/activity/CommImpl.hpp"
#include "src/kernel/activity/MailboxImpl.hpp"
+#include "src/kernel/actor/SimcallObserver.hpp"
#include "src/kernel/context/Context.hpp"
#include "src/kernel/resource/CpuImpl.hpp"
#include "src/kernel/resource/LinkImpl.hpp"
}
ActivityImplPtr
-CommImpl::isend(actor::ActorImpl* src_proc, MailboxImpl* mbox, double task_size, double rate, unsigned char* src_buff,
+CommImpl::isend(actor::ActorImpl* sender, MailboxImpl* mbox, double task_size, double rate, unsigned char* src_buff,
size_t src_buff_size, bool (*match_fun)(void*, void*, CommImpl*),
void (*clean_fun)(void*), // used to free the synchro in case of problem after a detached send
void (*copy_data_fun)(CommImpl*, void*, size_t), // used to copy data if not default one
other_comm->clean_fun = clean_fun;
} else {
other_comm->clean_fun = nullptr;
- src_proc->activities_.emplace_back(other_comm);
+ sender->activities_.emplace_back(other_comm);
}
/* Setup the communication synchro */
- other_comm->src_actor_ = src_proc;
+ other_comm->src_actor_ = sender;
other_comm->src_data_ = data;
(*other_comm).set_src_buff(src_buff, src_buff_size).set_size(task_size).set_rate(rate);
else
other_comm->start();
+ if (auto* observer = dynamic_cast<actor::CommIsendSimcall*>(sender->simcall_.observer_)) {
+ observer->set_result(detached ? nullptr : other_comm);
+ sender->simcall_answer();
+ }
+
return (detached ? nullptr : other_comm);
}
return other_comm;
}
other_comm->start();
+
+ if (auto* observer = dynamic_cast<actor::CommIrecvSimcall*>(receiver->simcall_.observer_)) {
+ observer->set_result(other_comm);
+ receiver->simcall_answer();
+ }
+
return other_comm;
}
bool CommImpl::test(actor::ActorImpl* issuer)
} else if (src_buff_ != nullptr) { // Sender side
on_send(*this);
- pimpl_ = simcall_comm_isend(sender_, mailbox_->get_impl(), remains_, rate_, src_buff_, src_buff_size_, match_fun_,
- clean_fun_, copy_data_function_, get_data<void>(), detached_);
+ kernel::actor::CommIsendSimcall observer{sender_,
+ mailbox_->get_impl(),
+ static_cast<size_t>(remains_),
+ rate_,
+ static_cast<unsigned char*>(src_buff_),
+ src_buff_size_,
+ match_fun_,
+ clean_fun_,
+ copy_data_function_,
+ get_data<void>(),
+ detached_};
+ pimpl_ = kernel::actor::simcall_blocking(
+ [&observer] {
+ return kernel::activity::CommImpl::isend(
+ observer.get_issuer(), observer.get_mailbox(), observer.get_payload_size(), observer.get_rate(),
+ observer.get_src_buff(), observer.get_src_buff_size(), observer.match_fun_, observer.clean_fun_,
+ observer.copy_data_fun_, observer.get_payload(), observer.is_detached());
+ },
+ &observer);
} else if (dst_buff_ != nullptr) { // Receiver side
xbt_assert(not detached_, "Receive cannot be detached");
on_recv(*this);
- pimpl_ = simcall_comm_irecv(receiver_, mailbox_->get_impl(), dst_buff_, &dst_buff_size_, match_fun_,
- copy_data_function_, get_data<void>(), rate_);
+ kernel::actor::CommIrecvSimcall observer{receiver_,
+ mailbox_->get_impl(),
+ static_cast<unsigned char*>(dst_buff_),
+ &dst_buff_size_,
+ match_fun_,
+ copy_data_function_,
+ get_data<void>(),
+ rate_};
+ pimpl_ = kernel::actor::simcall_blocking(
+ [&observer] {
+ return kernel::activity::CommImpl::irecv(
+ observer.get_issuer(), observer.get_mailbox(), observer.get_dst_buff(), observer.get_dst_buff_size(),
+ observer.match_fun_, observer.copy_data_fun_, observer.get_payload(), observer.get_rate());
+ },
+ &observer);
} else {
xbt_die("Cannot start a communication before specifying whether we are the sender or the receiver");
}
}
if(!is_probe)
flags_ &= ~MPI_REQ_PROBE;
-
action_ = simcall_comm_irecv(
process->get_actor()->get_impl(), mailbox->get_impl(), buf_, &real_size_, &match_recv,
process->replaying() ? &smpi_comm_null_copy_buffer_callback : smpi_comm_copy_data_callback, this, -1.0);
+
+ // kernel::actor::CommIrecvSimcall observer{process->get_actor()->get_impl(),
+ // mailbox->get_impl(),
+ // static_cast<unsigned char*>(buf_),
+ // &real_size_,
+ // &match_recv,
+ // process->replaying() ? &smpi_comm_null_copy_buffer_callback
+ // : smpi_comm_copy_data_callback,
+ // this,
+ // -1.0};
+ //
+ // action_ = kernel::actor::simcall_blocking(
+ // [&observer] {
+ // return kernel::activity::CommImpl::irecv(
+ // observer.get_issuer(), observer.get_mailbox(), observer.get_dst_buff(),
+ // observer.get_dst_buff_size(), observer.match_fun_, observer.copy_data_fun_, observer.get_payload(),
+ // observer.get_rate());
+ // },
+ // &observer);
+
XBT_DEBUG("recv simcall posted");
if (smpi_cfg_async_small_thresh() != 0 || (flags_ & MPI_REQ_RMA) != 0)
}
size_t payload_size_ = size_ + 16;//MPI enveloppe size (tag+dest+communicator)
- action_ = simcall_comm_isend(
- simgrid::kernel::actor::ActorImpl::by_pid(src_), mailbox->get_impl(), payload_size_, -1.0, buf, real_size_,
- &match_send,
+ kernel::actor::CommIsendSimcall observer{
+ simgrid::kernel::actor::ActorImpl::by_pid(src_), mailbox->get_impl(), payload_size_, -1,
+ static_cast<unsigned char*>(buf), real_size_, &match_send,
&xbt_free_f, // how to free the userdata if a detached send fails
process->replaying() ? &smpi_comm_null_copy_buffer_callback : smpi_comm_copy_data_callback, this,
// detach if msg size < eager/rdv switch limit
- detached_);
+ detached_};
+ action_ = kernel::actor::simcall_blocking(
+ [&observer] {
+ return kernel::activity::CommImpl::isend(
+ observer.get_issuer(), observer.get_mailbox(), observer.get_payload_size(), observer.get_rate(),
+ observer.get_src_buff(), observer.get_src_buff_size(), observer.match_fun_, observer.clean_fun_,
+ observer.copy_data_fun_, observer.get_payload(), observer.is_detached());
+ },
+ &observer);
XBT_DEBUG("send simcall posted");
/* FIXME: detached sends are not traceable (action_ == nullptr) */