Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
Fix a doc error about actors (Tutorial_algorithms)
[simgrid.git] / src / s4u / s4u_Comm.cpp
1 /* Copyright (c) 2006-2019. The SimGrid Team. All rights reserved.          */
2
3 /* This program is free software; you can redistribute it and/or modify it
4  * under the terms of the license (GNU LGPL) which comes with this package. */
5
6 #include "src/msg/msg_private.hpp"
7 #include "xbt/log.h"
8
9 #include "simgrid/Exception.hpp"
10 #include "simgrid/s4u/Comm.hpp"
11 #include "simgrid/s4u/Mailbox.hpp"
12
13 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(s4u_comm, s4u_activity, "S4U asynchronous communications");
14
15 namespace simgrid {
16 namespace s4u {
17 xbt::signal<void(Actor const&)> Comm::on_sender_start;
18 xbt::signal<void(Actor const&)> Comm::on_receiver_start;
19 xbt::signal<void(Actor const&)> Comm::on_completion;
20
21 Comm::~Comm()
22 {
23   if (state_ == State::STARTED && not detached_ && (pimpl_ == nullptr || pimpl_->state_ == SIMIX_RUNNING)) {
24     XBT_INFO("Comm %p freed before its completion. Detached: %d, State: %d", this, detached_, (int)state_);
25     if (pimpl_ != nullptr)
26       XBT_INFO("pimpl_->state: %d", pimpl_->state_);
27     else
28       XBT_INFO("pimpl_ is null");
29     xbt_backtrace_display_current();
30   }
31 }
32
33 int Comm::wait_any_for(std::vector<CommPtr>* comms, double timeout)
34 {
35   std::unique_ptr<kernel::activity::CommImpl* []> rcomms(new kernel::activity::CommImpl*[comms->size()]);
36   std::transform(begin(*comms), end(*comms), rcomms.get(),
37                  [](const CommPtr& comm) { return static_cast<kernel::activity::CommImpl*>(comm->pimpl_.get()); });
38   return simcall_comm_waitany(rcomms.get(), comms->size(), timeout);
39 }
40
41 void Comm::wait_all(std::vector<CommPtr>* comms)
42 {
43   // TODO: this should be a simcall or something
44   // TODO: we are missing a version with timeout
45   for (CommPtr comm : *comms)
46     comm->wait();
47 }
48
49 CommPtr Comm::set_rate(double rate)
50 {
51   xbt_assert(state_ == State::INITED, "You cannot use %s() once your communication started (not implemented)",
52              __FUNCTION__);
53   rate_ = rate;
54   return this;
55 }
56
57 CommPtr Comm::set_src_data(void* buff)
58 {
59   xbt_assert(state_ == State::INITED, "You cannot use %s() once your communication started (not implemented)",
60              __FUNCTION__);
61   xbt_assert(dst_buff_ == nullptr, "Cannot set the src and dst buffers at the same time");
62   src_buff_ = buff;
63   return this;
64 }
65
66 CommPtr Comm::set_src_data_size(size_t size)
67 {
68   xbt_assert(state_ == State::INITED, "You cannot use %s() once your communication started (not implemented)",
69              __FUNCTION__);
70   src_buff_size_ = size;
71   return this;
72 }
73
74 CommPtr Comm::set_src_data(void* buff, size_t size)
75 {
76   xbt_assert(state_ == State::INITED, "You cannot use %s() once your communication started (not implemented)",
77              __FUNCTION__);
78
79   xbt_assert(dst_buff_ == nullptr, "Cannot set the src and dst buffers at the same time");
80   src_buff_      = buff;
81   src_buff_size_ = size;
82   return this;
83 }
84 CommPtr Comm::set_dst_data(void** buff)
85 {
86   xbt_assert(state_ == State::INITED, "You cannot use %s() once your communication started (not implemented)",
87              __FUNCTION__);
88   xbt_assert(src_buff_ == nullptr, "Cannot set the src and dst buffers at the same time");
89   dst_buff_ = buff;
90   return this;
91 }
92
93 size_t Comm::get_dst_data_size()
94 {
95   xbt_assert(state_ == State::FINISHED, "You cannot use %s before your communication terminated", __FUNCTION__);
96   return dst_buff_size_;
97 }
98 CommPtr Comm::set_dst_data(void** buff, size_t size)
99 {
100   xbt_assert(state_ == State::INITED, "You cannot use %s() once your communication started (not implemented)",
101              __FUNCTION__);
102
103   xbt_assert(src_buff_ == nullptr, "Cannot set the src and dst buffers at the same time");
104   dst_buff_      = buff;
105   dst_buff_size_ = size;
106   return this;
107 }
108
109 CommPtr Comm::set_tracing_category(const std::string& category)
110 {
111   xbt_assert(state_ == State::INITED, "Cannot change the tracing category of an exec after its start");
112   tracing_category_ = category;
113   return this;
114 }
115
116 Comm* Comm::start()
117 {
118   xbt_assert(state_ == State::INITED, "You cannot use %s() once your communication started (not implemented)",
119              __FUNCTION__);
120
121   if (src_buff_ != nullptr) { // Sender side
122     on_sender_start(*Actor::self());
123     pimpl_ = simcall_comm_isend(sender_, mailbox_->get_impl(), remains_, rate_, src_buff_, src_buff_size_, match_fun_,
124                                 clean_fun_, copy_data_function_, user_data_, detached_);
125   } else if (dst_buff_ != nullptr) { // Receiver side
126     xbt_assert(not detached_, "Receive cannot be detached");
127     on_receiver_start(*Actor::self());
128     pimpl_ = simcall_comm_irecv(receiver_, mailbox_->get_impl(), dst_buff_, &dst_buff_size_, match_fun_,
129                                 copy_data_function_, user_data_, rate_);
130
131   } else {
132     xbt_die("Cannot start a communication before specifying whether we are the sender or the receiver");
133   }
134   state_ = State::STARTED;
135   return this;
136 }
137
138 /** @brief Block the calling actor until the communication is finished */
139 Comm* Comm::wait()
140 {
141   return this->wait_for(-1);
142 }
143
144 /** @brief Block the calling actor until the communication is finished, or until timeout
145  *
146  * On timeout, an exception is thrown and the communication is invalidated.
147  *
148  * @param timeout the amount of seconds to wait for the comm termination.
149  *                Negative values denote infinite wait times. 0 as a timeout returns immediately. */
150 Comm* Comm::wait_for(double timeout)
151 {
152   switch (state_) {
153     case State::FINISHED:
154       break;
155
156     case State::INITED: // It's not started yet. Do it in one simcall
157       if (src_buff_ != nullptr) {
158         on_sender_start(*Actor::self());
159         simcall_comm_send(sender_, mailbox_->get_impl(), remains_, rate_, src_buff_, src_buff_size_, match_fun_,
160                           copy_data_function_, user_data_, timeout);
161
162       } else { // Receiver
163         on_receiver_start(*Actor::self());
164         simcall_comm_recv(receiver_, mailbox_->get_impl(), dst_buff_, &dst_buff_size_, match_fun_, copy_data_function_,
165                           user_data_, timeout, rate_);
166       }
167       state_ = State::FINISHED;
168       break;
169
170     case State::STARTED:
171       simcall_comm_wait(pimpl_, timeout);
172       on_completion(*Actor::self());
173       state_ = State::FINISHED;
174       break;
175
176     case State::CANCELED:
177       throw CancelException(XBT_THROW_POINT, "Communication canceled");
178
179     default:
180       THROW_IMPOSSIBLE;
181   }
182   return this;
183 }
184 int Comm::test_any(std::vector<CommPtr>* comms)
185 {
186   std::unique_ptr<kernel::activity::CommImpl* []> rcomms(new kernel::activity::CommImpl*[comms->size()]);
187   std::transform(begin(*comms), end(*comms), rcomms.get(),
188                  [](const CommPtr& comm) { return static_cast<kernel::activity::CommImpl*>(comm->pimpl_.get()); });
189   return simcall_comm_testany(rcomms.get(), comms->size());
190 }
191
192 Comm* Comm::detach()
193 {
194   xbt_assert(state_ == State::INITED, "You cannot use %s() once your communication started (not implemented)",
195              __FUNCTION__);
196   xbt_assert(src_buff_ != nullptr && src_buff_size_ != 0, "You can only detach sends, not recvs");
197   detached_ = true;
198   return start();
199 }
200
201 Comm* Comm::cancel()
202 {
203   kernel::actor::simcall([this] {
204     if (pimpl_)
205       boost::static_pointer_cast<kernel::activity::CommImpl>(pimpl_)->cancel();
206   });
207   state_ = State::CANCELED;
208   return this;
209 }
210
211 bool Comm::test()
212 {
213   xbt_assert(state_ == State::INITED || state_ == State::STARTED || state_ == State::FINISHED);
214
215   if (state_ == State::FINISHED)
216     return true;
217
218   if (state_ == State::INITED)
219     this->start();
220
221   if (simcall_comm_test(pimpl_)) {
222     state_ = State::FINISHED;
223     return true;
224   }
225   return false;
226 }
227
228 Mailbox* Comm::get_mailbox()
229 {
230   return mailbox_;
231 }
232
233 Actor* Comm::get_sender()
234 {
235   return sender_ ? sender_->ciface() : nullptr;
236 }
237
238 void intrusive_ptr_release(simgrid::s4u::Comm* c)
239 {
240   if (c->refcount_.fetch_sub(1, std::memory_order_release) == 1) {
241     std::atomic_thread_fence(std::memory_order_acquire);
242     delete c;
243   }
244 }
245 void intrusive_ptr_add_ref(simgrid::s4u::Comm* c)
246 {
247   c->refcount_.fetch_add(1, std::memory_order_relaxed);
248 }
249 } // namespace s4u
250 } // namespace simgrid