1 /* Copyright (c) 2023. The SimGrid Team. All rights reserved. */
3 /* This program is free software; you can redistribute it and/or modify it
4 * under the terms of the license (GNU LGPL) which comes with this package. */
6 #include <simgrid/Exception.hpp>
7 #include <simgrid/s4u/Host.hpp>
9 #include "src/kernel/EngineImpl.hpp"
10 #include "src/kernel/activity/MessImpl.hpp"
11 #include "src/kernel/activity/MessageQueueImpl.hpp"
13 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(ker_mess, kernel, "Kernel message synchronization");
15 namespace simgrid::kernel::activity {
23 MessImpl& MessImpl::set_type(MessImplType type)
29 MessImpl& MessImpl::set_queue(MessageQueueImpl* queue)
35 MessImpl& MessImpl::set_payload(void* payload)
41 MessImpl& MessImpl::set_dst_buff(unsigned char* buff, size_t* size)
44 dst_buff_size_ = size;
48 MessImpl* MessImpl::start()
50 if (get_state() == State::READY)
51 set_state(State::DONE);
55 ActivityImplPtr MessImpl::iput(actor::MessIputSimcall* observer)
57 auto* queue = observer->get_queue();
58 XBT_DEBUG("put from message queue %p", queue);
60 /* Prepare a synchro describing us, so that it gets passed to the user-provided filter of other side */
61 MessImplPtr this_mess(new MessImpl());
62 this_mess->set_type(MessImplType::PUT);
64 /* Look for message synchro matching our needs.
66 * If it is not found then push our communication into the rendez-vous point */
67 MessImplPtr other_mess = queue->find_matching_message(MessImplType::GET);
70 other_mess = std::move(this_mess);
71 queue->push(other_mess);
73 XBT_DEBUG("Get already pushed");
74 other_mess->set_state(State::READY);
77 observer->set_message(other_mess.get());
78 observer->get_issuer()->activities_.insert(other_mess);
81 other_mess->src_actor_ = observer->get_issuer();
82 other_mess->payload_ = observer->get_payload();
88 ActivityImplPtr MessImpl::iget(actor::MessIgetSimcall* observer)
90 MessImplPtr this_synchro(new MessImpl());
91 this_synchro->set_type(MessImplType::GET);
93 auto* queue = observer->get_queue();
94 XBT_DEBUG("get from message queue %p. this_synchro=%p", queue, this_synchro.get());
96 MessImplPtr other_mess = queue->find_matching_message(MessImplType::PUT);
98 if (other_mess == nullptr) {
99 XBT_DEBUG("Get pushed first (%zu comm enqueued so far)", queue->size());
100 other_mess = std::move(this_synchro);
101 queue->push(other_mess);
103 XBT_DEBUG("Match my %p with the existing %p", this_synchro.get(), other_mess.get());
105 other_mess->set_state(State::READY);
108 observer->get_issuer()->activities_.insert(other_mess);
109 observer->set_message(other_mess.get());
112 other_mess->set_dst_buff(observer->get_dst_buff(), observer->get_dst_buff_size());
113 other_mess->dst_actor_ = observer->get_issuer();
120 void MessImpl::finish()
122 XBT_DEBUG("MessImpl::finish() comm %p, state %s, src_proc %p, dst_proc %p", this, get_state_str(),
123 src_actor_.get(), dst_actor_.get());
126 const auto& piface = static_cast<const s4u::Mess&>(*get_iface());
127 set_iface(nullptr); // reset iface to protect against multiple trigger of the on_completion signals
128 piface.fire_on_completion_for_real();
129 piface.fire_on_this_completion_for_real();
132 /* Update synchro state */
133 if (get_state() == State::RUNNING) {
134 set_state(State::DONE);
137 /* If the synchro is still in a rendez-vous point then remove from it */
139 queue_->remove(this);
141 if (get_state() == State::DONE && payload_ != nullptr)
142 *(void**)(dst_buff_) = payload_;
144 while (not simcalls_.empty()) {
145 actor::Simcall* simcall = simcalls_.front();
146 simcalls_.pop_front();
148 /* If a waitany simcall is waiting for this synchro to finish, then remove it from the other synchros in the waitany
149 * list. Afterwards, get the position of the actual synchro in the waitany list and return it as the result of the
152 if (simcall->call_ == actor::Simcall::Type::NONE) // FIXME: maybe a better way to handle this case
153 continue; // if actor handling comm is killed
155 handle_activity_waitany(simcall);
157 /* Check out for errors */
159 if (not simcall->issuer_->get_host()->is_on()) {
160 simcall->issuer_->set_wannadie();
162 // Do not answer to dying actors
163 if (not simcall->issuer_->wannadie()) {
164 set_exception(simcall->issuer_);
165 simcall->issuer_->simcall_answer();
169 simcall->issuer_->waiting_synchro_ = nullptr;
170 simcall->issuer_->activities_.erase(this);
174 } // namespace simgrid::kernel::activity