Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
cosmetic cleanups in S4U
[simgrid.git] / src / s4u / s4u_Comm.cpp
1 /* Copyright (c) 2006-2019. The SimGrid Team. All rights reserved.          */
2
3 /* This program is free software; you can redistribute it and/or modify it
4  * under the terms of the license (GNU LGPL) which comes with this package. */
5
6 #include "src/msg/msg_private.hpp"
7 #include "xbt/log.h"
8
9 #include "simgrid/Exception.hpp"
10 #include "simgrid/s4u/Comm.hpp"
11 #include "simgrid/s4u/Mailbox.hpp"
12
13 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(s4u_comm, s4u_activity, "S4U asynchronous communications");
14
15 namespace simgrid {
16 namespace s4u {
17 xbt::signal<void(ActorPtr)> Comm::on_sender_start;
18 xbt::signal<void(ActorPtr)> Comm::on_receiver_start;
19 xbt::signal<void(ActorPtr)> Comm::on_completion;
20
21 Comm::~Comm()
22 {
23   if (state_ == State::STARTED && not detached_ && (pimpl_ == nullptr || pimpl_->state_ == SIMIX_RUNNING)) {
24     XBT_INFO("Comm %p freed before its completion. Detached: %d, State: %d", this, detached_, (int)state_);
25     if (pimpl_ != nullptr)
26       XBT_INFO("pimpl_->state: %d", pimpl_->state_);
27     else
28       XBT_INFO("pimpl_ is null");
29     xbt_backtrace_display_current();
30   }
31 }
32
33 int Comm::wait_any_for(std::vector<CommPtr>* comms, double timeout)
34 {
35   std::unique_ptr<kernel::activity::CommImpl* []> rcomms(new kernel::activity::CommImpl*[comms->size()]);
36   std::transform(begin(*comms), end(*comms), rcomms.get(),
37                  [](const CommPtr& comm) { return static_cast<kernel::activity::CommImpl*>(comm->pimpl_.get()); });
38   return simcall_comm_waitany(rcomms.get(), comms->size(), timeout);
39 }
40
41 void Comm::wait_all(std::vector<CommPtr>* comms)
42 {
43   // TODO: this should be a simcall or something
44   // TODO: we are missing a version with timeout
45   for (CommPtr comm : *comms)
46     comm->wait();
47 }
48
49 CommPtr Comm::set_rate(double rate)
50 {
51   xbt_assert(state_ == State::INITED, "You cannot use %s() once your communication started (not implemented)",
52              __FUNCTION__);
53   rate_ = rate;
54   return this;
55 }
56
57 CommPtr Comm::set_src_data(void* buff)
58 {
59   xbt_assert(state_ == State::INITED, "You cannot use %s() once your communication started (not implemented)",
60              __FUNCTION__);
61   xbt_assert(dst_buff_ == nullptr, "Cannot set the src and dst buffers at the same time");
62   src_buff_ = buff;
63   return this;
64 }
65
66 CommPtr Comm::set_src_data_size(size_t size)
67 {
68   xbt_assert(state_ == State::INITED, "You cannot use %s() once your communication started (not implemented)",
69              __FUNCTION__);
70   src_buff_size_ = size;
71   return this;
72 }
73
74 CommPtr Comm::set_src_data(void* buff, size_t size)
75 {
76   xbt_assert(state_ == State::INITED, "You cannot use %s() once your communication started (not implemented)",
77              __FUNCTION__);
78
79   xbt_assert(dst_buff_ == nullptr, "Cannot set the src and dst buffers at the same time");
80   src_buff_      = buff;
81   src_buff_size_ = size;
82   return this;
83 }
84 CommPtr Comm::set_dst_data(void** buff)
85 {
86   xbt_assert(state_ == State::INITED, "You cannot use %s() once your communication started (not implemented)",
87              __FUNCTION__);
88   xbt_assert(src_buff_ == nullptr, "Cannot set the src and dst buffers at the same time");
89   dst_buff_ = buff;
90   return this;
91 }
92
93 size_t Comm::get_dst_data_size()
94 {
95   xbt_assert(state_ == State::FINISHED, "You cannot use %s before your communication terminated", __FUNCTION__);
96   return dst_buff_size_;
97 }
98 CommPtr Comm::set_dst_data(void** buff, size_t size)
99 {
100   xbt_assert(state_ == State::INITED, "You cannot use %s() once your communication started (not implemented)",
101              __FUNCTION__);
102
103   xbt_assert(src_buff_ == nullptr, "Cannot set the src and dst buffers at the same time");
104   dst_buff_      = buff;
105   dst_buff_size_ = size;
106   return this;
107 }
108
109 Comm* Comm::start()
110 {
111   xbt_assert(state_ == State::INITED, "You cannot use %s() once your communication started (not implemented)",
112              __FUNCTION__);
113
114   if (src_buff_ != nullptr) { // Sender side
115     on_sender_start(Actor::self());
116     pimpl_ = simcall_comm_isend(sender_, mailbox_->get_impl(), remains_, rate_, src_buff_, src_buff_size_, match_fun_,
117                                 clean_fun_, copy_data_function_, user_data_, detached_);
118   } else if (dst_buff_ != nullptr) { // Receiver side
119     xbt_assert(not detached_, "Receive cannot be detached");
120     on_receiver_start(Actor::self());
121     pimpl_ = simcall_comm_irecv(receiver_, mailbox_->get_impl(), dst_buff_, &dst_buff_size_, match_fun_,
122                                 copy_data_function_, user_data_, rate_);
123
124   } else {
125     xbt_die("Cannot start a communication before specifying whether we are the sender or the receiver");
126   }
127   state_ = State::STARTED;
128   return this;
129 }
130
131 /** @brief Block the calling actor until the communication is finished */
132 Comm* Comm::wait()
133 {
134   return this->wait_for(-1);
135 }
136
137 /** @brief Block the calling actor until the communication is finished, or until timeout
138  *
139  * On timeout, an exception is thrown.
140  *
141  * @param timeout the amount of seconds to wait for the comm termination.
142  *                Negative values denote infinite wait times. 0 as a timeout returns immediately. */
143 Comm* Comm::wait_for(double timeout)
144 {
145   switch (state_) {
146     case State::FINISHED:
147       break;
148
149     case State::INITED: // It's not started yet. Do it in one simcall
150       if (src_buff_ != nullptr) {
151         on_sender_start(Actor::self());
152         simcall_comm_send(sender_, mailbox_->get_impl(), remains_, rate_, src_buff_, src_buff_size_, match_fun_,
153                           copy_data_function_, user_data_, timeout);
154
155       } else { // Receiver
156         on_receiver_start(Actor::self());
157         simcall_comm_recv(receiver_, mailbox_->get_impl(), dst_buff_, &dst_buff_size_, match_fun_, copy_data_function_,
158                           user_data_, timeout, rate_);
159       }
160       state_ = State::FINISHED;
161       break;
162
163     case State::STARTED:
164       simcall_comm_wait(pimpl_, timeout);
165       on_completion(Actor::self());
166       state_ = State::FINISHED;
167       break;
168
169     case State::CANCELED:
170       throw CancelException(XBT_THROW_POINT, "Communication canceled");
171
172     default:
173       THROW_IMPOSSIBLE;
174   }
175   return this;
176 }
177 int Comm::test_any(std::vector<CommPtr>* comms)
178 {
179   std::unique_ptr<kernel::activity::CommImpl* []> rcomms(new kernel::activity::CommImpl*[comms->size()]);
180   std::transform(begin(*comms), end(*comms), rcomms.get(),
181                  [](const CommPtr& comm) { return static_cast<kernel::activity::CommImpl*>(comm->pimpl_.get()); });
182   return simcall_comm_testany(rcomms.get(), comms->size());
183 }
184
185 Comm* Comm::detach()
186 {
187   xbt_assert(state_ == State::INITED, "You cannot use %s() once your communication started (not implemented)",
188              __FUNCTION__);
189   xbt_assert(src_buff_ != nullptr && src_buff_size_ != 0, "You can only detach sends, not recvs");
190   detached_ = true;
191   return start();
192 }
193
194 Comm* Comm::cancel()
195 {
196   simix::simcall([this] {
197     if (pimpl_)
198       boost::static_pointer_cast<kernel::activity::CommImpl>(pimpl_)->cancel();
199   });
200   state_ = State::CANCELED;
201   return this;
202 }
203
204 bool Comm::test()
205 {
206   xbt_assert(state_ == State::INITED || state_ == State::STARTED || state_ == State::FINISHED);
207
208   if (state_ == State::FINISHED)
209     return true;
210
211   if (state_ == State::INITED)
212     this->start();
213
214   if (simcall_comm_test(pimpl_)) {
215     state_ = State::FINISHED;
216     return true;
217   }
218   return false;
219 }
220
221 Mailbox* Comm::get_mailbox()
222 {
223   return mailbox_;
224 }
225
226 ActorPtr Comm::get_sender()
227 {
228   return sender_ ? sender_->iface() : nullptr;
229 }
230
231 void intrusive_ptr_release(simgrid::s4u::Comm* c)
232 {
233   if (c->refcount_.fetch_sub(1, std::memory_order_release) == 1) {
234     std::atomic_thread_fence(std::memory_order_acquire);
235     delete c;
236   }
237 }
238 void intrusive_ptr_add_ref(simgrid::s4u::Comm* c)
239 {
240   c->refcount_.fetch_add(1, std::memory_order_relaxed);
241 }
242 } // namespace s4u
243 } // namespace simgrid