}
}
+/** @brief Block on a set of communications through a single simcall_comm_waitany() call.
+ *
+ * Every communication still in the `inited` state is started first; each one must then be in the
+ * `started` state (asserted).
+ *
+ * @param comms_in communications to wait on; the vector itself is left untouched
+ * @param timeout  forwarded unchanged to simcall_comm_waitany()
+ * @return the value returned by simcall_comm_waitany() (presumably the index, within @a comms_in, of the
+ *         communication that terminated -- TODO confirm against the simcall documentation)
+ */
+int Comm::wait_any_for(std::vector<CommPtr>* comms_in, double timeout)
+{
+  using simgrid::kernel::activity::ActivityImpl;
+
+  // The simcall takes a dynar of raw ActivityImpl pointers. Each stored pointer carries one extra
+  // reference (taken below), which the dynar's element destructor gives back.
+  xbt_dynar_t comms = xbt_dynar_new(sizeof(ActivityImpl*), [](void* ptr) {
+    intrusive_ptr_release(*static_cast<ActivityImpl**>(ptr));
+  });
+
+  int idx;
+  try {
+    for (auto const& comm : *comms_in) {
+      if (comm->state_ == Activity::State::inited)
+        comm->start();
+      xbt_assert(comm->state_ == Activity::State::started);
+      ActivityImpl* ptr = comm->pimpl_.get();
+      intrusive_ptr_add_ref(ptr);
+      xbt_dynar_push_as(comms, ActivityImpl*, ptr);
+    }
+    // Call the underlying simcall:
+    idx = simcall_comm_waitany(comms, timeout);
+  } catch (...) {
+    // Do not leak the dynar (and the references it holds) when starting or waiting throws.
+    xbt_dynar_free(&comms);
+    throw;
+  }
+  xbt_dynar_free(&comms);
+  return idx;
+}
+
+/** @brief Wait for every communication of @a comms, one after the other, in container order.
+ *
+ * This simply calls wait() on each element in turn; there is no timeout variant yet.
+ *
+ * @param comms communications to wait on
+ */
+void Comm::wait_all(std::vector<CommPtr>* comms)
+{
+  // TODO: this should be a simcall or something
+  // TODO: we are missing a version with timeout
+  // Iterate by const reference: copying the smart pointer on every iteration only costs a
+  // useless reference-count round trip.
+  for (auto const& comm : *comms) {
+    comm->wait();
+  }
+}
+
Activity* Comm::set_rate(double rate)
{
xbt_assert(state_ == State::inited);
Activity* Comm::set_src_data(void* buff)
{
xbt_assert(state_ == State::inited);
- xbt_assert(dstBuff_ == nullptr, "Cannot set the src and dst buffers at the same time");
- srcBuff_ = buff;
+ xbt_assert(dst_buff_ == nullptr, "Cannot set the src and dst buffers at the same time"); // refused once a destination buffer has been set on this Comm
+ src_buff_ = buff; // records the source buffer only; the size is left as it was
return this;
}
Activity* Comm::set_src_data_size(size_t size)
{
xbt_assert(state_ == State::inited);
- srcBuffSize_ = size;
+ src_buff_size_ = size; // size of the source buffer (presumably in bytes -- TODO confirm); only settable before the Comm is started
return this;
}
Activity* Comm::set_src_data(void* buff, size_t size)
{
xbt_assert(state_ == State::inited);
- xbt_assert(dstBuff_ == nullptr, "Cannot set the src and dst buffers at the same time");
- srcBuff_ = buff;
- srcBuffSize_ = size;
+ xbt_assert(dst_buff_ == nullptr, "Cannot set the src and dst buffers at the same time"); // refused once a destination buffer has been set on this Comm
+ src_buff_ = buff; // source buffer ...
+ src_buff_size_ = size; // ... and its size, recorded together in one call
return this;
}
Activity* Comm::set_dst_data(void** buff)
{
xbt_assert(state_ == State::inited);
- xbt_assert(srcBuff_ == nullptr, "Cannot set the src and dst buffers at the same time");
- dstBuff_ = buff;
+ xbt_assert(src_buff_ == nullptr, "Cannot set the src and dst buffers at the same time"); // refused once a source buffer has been set on this Comm
+ dst_buff_ = buff; // records the destination buffer only; the size is left as it was
return this;
}
size_t Comm::get_dst_data_size()
{
xbt_assert(state_ == State::finished);
- return dstBuffSize_;
+ return dst_buff_size_; // only readable once the Comm is finished (asserted above)
}
Activity* Comm::set_dst_data(void** buff, size_t size)
{
xbt_assert(state_ == State::inited);
- xbt_assert(srcBuff_ == nullptr, "Cannot set the src and dst buffers at the same time");
- dstBuff_ = buff;
- dstBuffSize_ = size;
+ xbt_assert(src_buff_ == nullptr, "Cannot set the src and dst buffers at the same time"); // refused once a source buffer has been set on this Comm
+ dst_buff_ = buff; // destination buffer ...
+ dst_buff_size_ = size; // ... and its size, recorded together in one call
return this;
}
{
xbt_assert(state_ == State::inited);
- if (srcBuff_ != nullptr) { // Sender side
- pimpl_ = simcall_comm_isend(sender_, mailbox_->get_impl(), remains_, rate_, srcBuff_, srcBuffSize_, matchFunction_,
- cleanFunction_, copyDataFunction_, user_data_, detached_);
- } else if (dstBuff_ != nullptr) { // Receiver side
+ if (src_buff_ != nullptr) { // Sender side
+ pimpl_ = simcall_comm_isend(sender_, mailbox_->get_impl(), remains_, rate_, src_buff_, src_buff_size_, match_fun_,
+ clean_fun_, copy_data_function_, user_data_, detached_);
+ } else if (dst_buff_ != nullptr) { // Receiver side
xbt_assert(not detached_, "Receive cannot be detached");
- pimpl_ = simcall_comm_irecv(receiver_, mailbox_->get_impl(), dstBuff_, &dstBuffSize_, matchFunction_,
- copyDataFunction_, user_data_, rate_);
+ pimpl_ = simcall_comm_irecv(receiver_, mailbox_->get_impl(), dst_buff_, &dst_buff_size_, match_fun_,
+ copy_data_function_, user_data_, rate_);
} else {
xbt_die("Cannot start a communication before specifying whether we are the sender or the receiver");
return this;
case State::inited: // It's not started yet. Do it in one simcall
- if (srcBuff_ != nullptr) {
- simcall_comm_send(sender_, mailbox_->get_impl(), remains_, rate_, srcBuff_, srcBuffSize_, matchFunction_,
- copyDataFunction_, user_data_, timeout);
+ if (src_buff_ != nullptr) {
+ simcall_comm_send(sender_, mailbox_->get_impl(), remains_, rate_, src_buff_, src_buff_size_, match_fun_,
+ copy_data_function_, user_data_, timeout);
} else { // Receiver
- simcall_comm_recv(receiver_, mailbox_->get_impl(), dstBuff_, &dstBuffSize_, matchFunction_, copyDataFunction_,
+ simcall_comm_recv(receiver_, mailbox_->get_impl(), dst_buff_, &dst_buff_size_, match_fun_, copy_data_function_,
user_data_, timeout, rate_);
}
state_ = State::finished;
Activity* Comm::detach()
{
xbt_assert(state_ == State::inited, "You cannot detach communications once they are started (not implemented).");
- xbt_assert(srcBuff_ != nullptr && srcBuffSize_ != 0, "You can only detach sends, not recvs");
+ xbt_assert(src_buff_ != nullptr && src_buff_size_ != 0, "You can only detach sends, not recvs"); // requires a non-null source buffer with a non-zero size, i.e. the send side must be configured
detached_ = true;
return start();
}