Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
9f77c7da8e029288196af7fd459296e4951f561e
[simgrid.git] / src / kernel / activity / MessImpl.cpp
1 /* Copyright (c) 2023. The SimGrid Team. All rights reserved.          */
2
3 /* This program is free software; you can redistribute it and/or modify it
4  * under the terms of the license (GNU LGPL) which comes with this package. */
5
6 #include <simgrid/Exception.hpp>
7 #include <simgrid/s4u/Host.hpp>
8
9 #include "src/kernel/EngineImpl.hpp"
10 #include "src/kernel/activity/MessImpl.hpp"
11 #include "src/kernel/activity/MessageQueueImpl.hpp"
12
13 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(ker_mess, kernel, "Kernel message synchronization");
14
15 namespace simgrid::kernel::activity {
16
17 MessImpl::~MessImpl()
18 {
19   if (queue_)
20     queue_->remove(this);
21 }
22
23 MessImpl& MessImpl::set_type(MessImplType type)
24 {
25   type_ = type;
26   return *this;
27 }
28
29 MessImpl& MessImpl::set_queue(MessageQueueImpl* queue)
30 {
31   queue_ = queue;
32   return *this;
33 }
34
35 MessImpl& MessImpl::set_payload(void* payload)
36 {
37   payload_ = payload;
38   return *this;
39 }
40
41 MessImpl& MessImpl::set_dst_buff(unsigned char* buff, size_t* size)
42 {
43   dst_buff_      = buff;
44   dst_buff_size_ = size;
45   return *this;
46 }
47
48 MessImpl* MessImpl::start()
49 {
50   if (get_state() == State::READY)
51     set_state(State::DONE);
52   return this;
53 }
54
55 ActivityImplPtr MessImpl::iput(actor::MessIputSimcall* observer)
56 {
57   auto* queue = observer->get_queue();
58   XBT_DEBUG("put from message queue %p", queue);
59
60   /* Prepare a synchro describing us, so that it gets passed to the user-provided filter of other side */
61   MessImplPtr this_mess(new MessImpl());
62   this_mess->set_type(MessImplType::PUT);
63
64   /* Look for message synchro matching our needs.
65    *
66    * If it is not found then push our communication into the rendez-vous point */
67   MessImplPtr other_mess = queue->find_matching_message(MessImplType::GET);
68
69   if (not other_mess) {
70     other_mess = std::move(this_mess);
71     queue->push(other_mess);
72   } else {
73     XBT_DEBUG("Get already pushed");
74     other_mess->set_state(State::READY);
75   }
76
77   observer->set_message(other_mess.get());
78   observer->get_issuer()->activities_.insert(other_mess);
79
80   /* Setup synchro */
81   other_mess->src_actor_ = observer->get_issuer();
82   other_mess->payload_ = observer->get_payload();
83   other_mess->start();
84
85   return other_mess;
86 }
87
88 ActivityImplPtr MessImpl::iget(actor::MessIgetSimcall* observer)
89 {
90   MessImplPtr this_synchro(new MessImpl());
91   this_synchro->set_type(MessImplType::GET);
92
93   auto* queue = observer->get_queue();
94   XBT_DEBUG("get from message queue %p. this_synchro=%p", queue, this_synchro.get());
95
96   MessImplPtr other_mess = queue->find_matching_message(MessImplType::PUT);
97
98   if (other_mess == nullptr) {
99     XBT_DEBUG("Get pushed first (%zu comm enqueued so far)", queue->size());
100     other_mess = std::move(this_synchro);
101     queue->push(other_mess);
102   } else {
103     XBT_DEBUG("Match my %p with the existing %p", this_synchro.get(), other_mess.get());
104
105     other_mess->set_state(State::READY);
106   }
107
108   observer->get_issuer()->activities_.insert(other_mess);
109   observer->set_message(other_mess.get());
110
111   /* Setup synchro */
112   other_mess->set_dst_buff(observer->get_dst_buff(), observer->get_dst_buff_size());
113   other_mess->dst_actor_ = observer->get_issuer();
114
115   other_mess->start();
116
117   return other_mess;
118 }
119
120 void MessImpl::finish()
121 {
122   XBT_DEBUG("MessImpl::finish() comm %p, state %s, src_proc %p, dst_proc %p", this, get_state_str(),
123             src_actor_.get(), dst_actor_.get());
124
125   if (get_iface()) {
126     const auto& piface = static_cast<const s4u::Mess&>(*get_iface());
127     set_iface(nullptr); // reset iface to protect against multiple trigger of the on_completion signals
128     piface.fire_on_completion_for_real();
129     piface.fire_on_this_completion_for_real();
130   }
131
132   /* Update synchro state */
133   if (get_state() == State::RUNNING) {
134     set_state(State::DONE);
135   }
136
137   /* If the synchro is still in a rendez-vous point then remove from it */
138   if (queue_)
139     queue_->remove(this);
140
141   if (get_state() == State::DONE && payload_ != nullptr)
142     *(void**)(dst_buff_) = payload_;
143
144   while (not simcalls_.empty()) {
145     actor::Simcall* simcall = simcalls_.front();
146     simcalls_.pop_front();
147
148     /* If a waitany simcall is waiting for this synchro to finish, then remove it from the other synchros in the waitany
149      * list. Afterwards, get the position of the actual synchro in the waitany list and return it as the result of the
150      * simcall */
151
152     if (simcall->call_ == actor::Simcall::Type::NONE) // FIXME: maybe a better way to handle this case
153       continue;                                       // if actor handling comm is killed
154
155     handle_activity_waitany(simcall);
156
157     /* Check out for errors */
158
159     if (not simcall->issuer_->get_host()->is_on()) {
160       simcall->issuer_->set_wannadie();
161     } else {
162       // Do not answer to dying actors
163       if (not simcall->issuer_->wannadie()) {
164         set_exception(simcall->issuer_);
165         simcall->issuer_->simcall_answer();
166       }
167     }
168
169     simcall->issuer_->waiting_synchro_ = nullptr;
170     simcall->issuer_->activities_.erase(this);
171   }
172 }
173
174 } // namespace simgrid::kernel::activity