Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
rework MessImpl
[simgrid.git] / src / kernel / activity / MessImpl.cpp
1 /* Copyright (c) 2023. The SimGrid Team. All rights reserved.          */
2
3 /* This program is free software; you can redistribute it and/or modify it
4  * under the terms of the license (GNU LGPL) which comes with this package. */
5
6 #include <simgrid/Exception.hpp>
7 #include <simgrid/s4u/Host.hpp>
8
9 #include "src/kernel/EngineImpl.hpp"
10 #include "src/kernel/activity/MessImpl.hpp"
11 #include "src/kernel/activity/MessageQueueImpl.hpp"
12
13 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(ker_mess, kernel, "Kernel message synchronization");
14
15 namespace simgrid::kernel::activity {
16
17 MessImpl::~MessImpl()
18 {
19   if (queue_)
20     queue_->remove(this);
21 }
22
23 MessImpl& MessImpl::set_type(MessImplType type)
24 {
25   type_ = type;
26   return *this;
27 }
28
29 MessImpl& MessImpl::set_queue(MessageQueueImpl* queue)
30 {
31   queue_ = queue;
32   return *this;
33 }
34
35 MessImpl& MessImpl::set_payload(void* payload)
36 {
37   payload_ = payload;
38   return *this;
39 }
40
41 MessImpl& MessImpl::set_dst_buff(unsigned char* buff, size_t* size)
42 {
43   dst_buff_      = buff;
44   dst_buff_size_ = size;
45   return *this;
46 }
47
48 MessImpl* MessImpl::start()
49 {
50   if (get_state() == State::READY) {
51     XBT_DEBUG("Starting message exchange %p from '%s' to '%s' (state: %s)", this, src_actor_->get_host()->get_cname(),
52               dst_actor_->get_host()->get_cname(), get_state_str());
53     set_state(State::RUNNING);
54     finish();
55   }
56   return this;
57 }
58
59 ActivityImplPtr MessImpl::iput(actor::MessIputSimcall* observer)
60 {
61   auto* queue = observer->get_queue();
62   XBT_DEBUG("put from message queue %p", queue);
63
64   /* Prepare a synchro describing us, so that it gets passed to the user-provided filter of other side */
65   MessImplPtr this_mess(new MessImpl());
66   this_mess->set_type(MessImplType::PUT);
67
68   /* Look for message synchro matching our needs.
69    *
70    * If it is not found then push our communication into the rendez-vous point */
71   MessImplPtr other_mess = queue->find_matching_message(MessImplType::GET);
72
73   if (not other_mess) {
74     other_mess = std::move(this_mess);
75     queue->push(other_mess);
76   } else {
77     XBT_DEBUG("Get already pushed");
78     other_mess->set_state(State::READY);
79   }
80
81   observer->set_message(other_mess.get());
82   observer->get_issuer()->activities_.insert(other_mess);
83
84   /* Setup synchro */
85   other_mess->src_actor_ = observer->get_issuer();
86   other_mess->payload_ = observer->get_payload();
87   other_mess->start();
88
89   return other_mess;
90 }
91
92 ActivityImplPtr MessImpl::iget(actor::MessIgetSimcall* observer)
93 {
94   MessImplPtr this_mess(new MessImpl());
95   this_mess->set_type(MessImplType::GET);
96
97   auto* queue = observer->get_queue();
98   XBT_DEBUG("get from message queue %p. this_synchro=%p", queue, this_mess.get());
99
100   MessImplPtr other_mess = queue->find_matching_message(MessImplType::PUT);
101
102   if (other_mess == nullptr) {
103     XBT_DEBUG("Get pushed first (%zu comm enqueued so far)", queue->size());
104     other_mess = std::move(this_mess);
105     queue->push(other_mess);
106   } else {
107     XBT_DEBUG("Match my %p with the existing %p", this_mess.get(), other_mess.get());
108
109     other_mess->set_state(State::READY);
110   }
111
112   observer->get_issuer()->activities_.insert(other_mess);
113   observer->set_message(other_mess.get());
114
115   /* Setup synchro */
116   other_mess->set_dst_buff(observer->get_dst_buff(), observer->get_dst_buff_size());
117   other_mess->dst_actor_ = observer->get_issuer();
118
119   other_mess->start();
120
121   return other_mess;
122 }
123
124 void MessImpl::wait_for(actor::ActorImpl* issuer, double timeout)
125 {
126   XBT_DEBUG("MessImpl::wait_for(%g), %p, state %s", timeout, this, get_state_str());
127
128   /* Associate this simcall to the wait synchro */
129   register_simcall(&issuer->simcall_);
130   ActivityImpl::wait_for(issuer, timeout);
131 }
132
133 void MessImpl::finish()
134 {
135   XBT_DEBUG("MessImpl::finish() comm %p, state %s, src_proc %p, dst_proc %p", this, get_state_str(),
136             src_actor_.get(), dst_actor_.get());
137
138   if (get_iface()) {
139     const auto& piface = static_cast<const s4u::Mess&>(*get_iface());
140     set_iface(nullptr); // reset iface to protect against multiple trigger of the on_completion signals
141     piface.fire_on_completion_for_real();
142     piface.fire_on_this_completion_for_real();
143   }
144
145   /* Update synchro state */
146   if (get_state() == State::RUNNING) {
147     set_state(State::DONE);
148   }
149
150   /* If the synchro is still in a rendez-vous point then remove from it */
151   if (queue_)
152     queue_->remove(this);
153
154   if (get_state() == State::DONE && payload_ != nullptr)
155     *(void**)(dst_buff_) = payload_;
156
157   while (not simcalls_.empty()) {
158     actor::Simcall* simcall = simcalls_.front();
159     simcalls_.pop_front();
160
161     /* If a waitany simcall is waiting for this synchro to finish, then remove it from the other synchros in the waitany
162      * list. Afterwards, get the position of the actual synchro in the waitany list and return it as the result of the
163      * simcall */
164
165     if (simcall->call_ == actor::Simcall::Type::NONE) // FIXME: maybe a better way to handle this case
166       continue;                                       // if actor handling comm is killed
167
168     handle_activity_waitany(simcall);
169
170     /* Check out for errors */
171
172     if (not simcall->issuer_->get_host()->is_on()) {
173       simcall->issuer_->set_wannadie();
174     } else {
175       // Do not answer to dying actors
176       if (not simcall->issuer_->wannadie()) {
177         set_exception(simcall->issuer_);
178         simcall->issuer_->simcall_answer();
179       }
180     }
181
182     simcall->issuer_->waiting_synchro_ = nullptr;
183     simcall->issuer_->activities_.erase(this);
184   }
185 }
186
187 } // namespace simgrid::kernel::activity