Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
don't use old fashioned simcall when you don't have to
[simgrid.git] / src / s4u / s4u_Comm.cpp
1 /* Copyright (c) 2006-2019. The SimGrid Team. All rights reserved.          */
2
3 /* This program is free software; you can redistribute it and/or modify it
4  * under the terms of the license (GNU LGPL) which comes with this package. */
5
6 #include "src/msg/msg_private.hpp"
7 #include "src/simix/ActorImpl.hpp"
8 #include "xbt/log.h"
9
10 #include "simgrid/Exception.hpp"
11 #include "simgrid/s4u/Comm.hpp"
12 #include "simgrid/s4u/Mailbox.hpp"
13
14 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(s4u_comm, s4u_activity, "S4U asynchronous communications");
15
16 namespace simgrid {
17 namespace s4u {
18 simgrid::xbt::signal<void(simgrid::s4u::ActorPtr)> s4u::Comm::on_sender_start;
19 simgrid::xbt::signal<void(simgrid::s4u::ActorPtr)> s4u::Comm::on_receiver_start;
20 simgrid::xbt::signal<void(simgrid::s4u::ActorPtr)> s4u::Comm::on_completion;
21
22 Comm::~Comm()
23 {
24   if (state_ == State::STARTED && not detached_ && (pimpl_ == nullptr || pimpl_->state_ == SIMIX_RUNNING)) {
25     XBT_INFO("Comm %p freed before its completion. Detached: %d, State: %d", this, detached_, (int)state_);
26     if (pimpl_ != nullptr)
27       XBT_INFO("pimpl_->state: %d", pimpl_->state_);
28     else
29       XBT_INFO("pimpl_ is null");
30     xbt_backtrace_display_current();
31   }
32 }
33
34 int Comm::wait_any_for(std::vector<CommPtr>* comms, double timeout)
35 {
36   std::unique_ptr<simgrid::kernel::activity::CommImpl* []> rcomms(
37       new simgrid::kernel::activity::CommImpl*[comms->size()]);
38   std::transform(begin(*comms), end(*comms), rcomms.get(), [](const CommPtr& comm) {
39     return static_cast<simgrid::kernel::activity::CommImpl*>(comm->pimpl_.get());
40   });
41   return simcall_comm_waitany(rcomms.get(), comms->size(), timeout);
42 }
43
44 void Comm::wait_all(std::vector<CommPtr>* comms)
45 {
46   // TODO: this should be a simcall or something
47   // TODO: we are missing a version with timeout
48   for (CommPtr comm : *comms)
49     comm->wait();
50 }
51
52 CommPtr Comm::set_rate(double rate)
53 {
54   xbt_assert(state_ == State::INITED, "You cannot use %s() once your communication started (not implemented)",
55              __FUNCTION__);
56   rate_ = rate;
57   return this;
58 }
59
60 CommPtr Comm::set_src_data(void* buff)
61 {
62   xbt_assert(state_ == State::INITED, "You cannot use %s() once your communication started (not implemented)",
63              __FUNCTION__);
64   xbt_assert(dst_buff_ == nullptr, "Cannot set the src and dst buffers at the same time");
65   src_buff_ = buff;
66   return this;
67 }
68
69 CommPtr Comm::set_src_data_size(size_t size)
70 {
71   xbt_assert(state_ == State::INITED, "You cannot use %s() once your communication started (not implemented)",
72              __FUNCTION__);
73   src_buff_size_ = size;
74   return this;
75 }
76
77 CommPtr Comm::set_src_data(void* buff, size_t size)
78 {
79   xbt_assert(state_ == State::INITED, "You cannot use %s() once your communication started (not implemented)",
80              __FUNCTION__);
81
82   xbt_assert(dst_buff_ == nullptr, "Cannot set the src and dst buffers at the same time");
83   src_buff_      = buff;
84   src_buff_size_ = size;
85   return this;
86 }
87 CommPtr Comm::set_dst_data(void** buff)
88 {
89   xbt_assert(state_ == State::INITED, "You cannot use %s() once your communication started (not implemented)",
90              __FUNCTION__);
91   xbt_assert(src_buff_ == nullptr, "Cannot set the src and dst buffers at the same time");
92   dst_buff_ = buff;
93   return this;
94 }
95
96 size_t Comm::get_dst_data_size()
97 {
98   xbt_assert(state_ == State::FINISHED, "You cannot use %s before your communication terminated", __FUNCTION__);
99   return dst_buff_size_;
100 }
101 CommPtr Comm::set_dst_data(void** buff, size_t size)
102 {
103   xbt_assert(state_ == State::INITED, "You cannot use %s() once your communication started (not implemented)",
104              __FUNCTION__);
105
106   xbt_assert(src_buff_ == nullptr, "Cannot set the src and dst buffers at the same time");
107   dst_buff_      = buff;
108   dst_buff_size_ = size;
109   return this;
110 }
111
112 Comm* Comm::start()
113 {
114   xbt_assert(state_ == State::INITED, "You cannot use %s() once your communication started (not implemented)",
115              __FUNCTION__);
116
117   if (src_buff_ != nullptr) { // Sender side
118     on_sender_start(Actor::self());
119     pimpl_ = simcall_comm_isend(sender_, mailbox_->get_impl(), remains_, rate_, src_buff_, src_buff_size_, match_fun_,
120                                 clean_fun_, copy_data_function_, user_data_, detached_);
121   } else if (dst_buff_ != nullptr) { // Receiver side
122     xbt_assert(not detached_, "Receive cannot be detached");
123     on_receiver_start(Actor::self());
124     pimpl_ = simcall_comm_irecv(receiver_, mailbox_->get_impl(), dst_buff_, &dst_buff_size_, match_fun_,
125                                 copy_data_function_, user_data_, rate_);
126
127   } else {
128     xbt_die("Cannot start a communication before specifying whether we are the sender or the receiver");
129   }
130   state_ = State::STARTED;
131   return this;
132 }
133
134 /** @brief Block the calling actor until the communication is finished */
135 Comm* Comm::wait()
136 {
137   return this->wait_for(-1);
138 }
139
140 /** @brief Block the calling actor until the communication is finished, or until timeout
141  *
142  * On timeout, an exception is thrown.
143  *
144  * @param timeout the amount of seconds to wait for the comm termination.
145  *                Negative values denote infinite wait times. 0 as a timeout returns immediately. */
146 Comm* Comm::wait_for(double timeout)
147 {
148   switch (state_) {
149     case State::FINISHED:
150       break;
151
152     case State::INITED: // It's not started yet. Do it in one simcall
153       if (src_buff_ != nullptr) {
154         on_sender_start(Actor::self());
155         simcall_comm_send(sender_, mailbox_->get_impl(), remains_, rate_, src_buff_, src_buff_size_, match_fun_,
156                           copy_data_function_, user_data_, timeout);
157
158       } else { // Receiver
159         on_receiver_start(Actor::self());
160         simcall_comm_recv(receiver_, mailbox_->get_impl(), dst_buff_, &dst_buff_size_, match_fun_, copy_data_function_,
161                           user_data_, timeout, rate_);
162       }
163       state_ = State::FINISHED;
164       break;
165
166     case State::STARTED:
167       simcall_comm_wait(pimpl_, timeout);
168       on_completion(Actor::self());
169       state_ = State::FINISHED;
170       break;
171
172     case State::CANCELED:
173       throw CancelException(XBT_THROW_POINT, "Communication canceled");
174
175     default:
176       THROW_IMPOSSIBLE;
177   }
178   return this;
179 }
180 int Comm::test_any(std::vector<CommPtr>* comms)
181 {
182   std::unique_ptr<simgrid::kernel::activity::CommImpl* []> rcomms(
183       new simgrid::kernel::activity::CommImpl*[comms->size()]);
184   std::transform(begin(*comms), end(*comms), rcomms.get(), [](const CommPtr& comm) {
185     return static_cast<simgrid::kernel::activity::CommImpl*>(comm->pimpl_.get());
186   });
187   return simcall_comm_testany(rcomms.get(), comms->size());
188 }
189
190 Comm* Comm::detach()
191 {
192   xbt_assert(state_ == State::INITED, "You cannot use %s() once your communication started (not implemented)",
193              __FUNCTION__);
194   xbt_assert(src_buff_ != nullptr && src_buff_size_ != 0, "You can only detach sends, not recvs");
195   detached_ = true;
196   return start();
197 }
198
199 Comm* Comm::cancel()
200 {
201   simgrid::simix::simcall([this] {
202     if (pimpl_)
203       boost::static_pointer_cast<kernel::activity::CommImpl>(pimpl_)->cancel();
204   });
205   state_ = State::CANCELED;
206   return this;
207 }
208
209 bool Comm::test()
210 {
211   xbt_assert(state_ == State::INITED || state_ == State::STARTED || state_ == State::FINISHED);
212
213   if (state_ == State::FINISHED)
214     return true;
215
216   if (state_ == State::INITED)
217     this->start();
218
219   if (simcall_comm_test(pimpl_)) {
220     state_ = State::FINISHED;
221     return true;
222   }
223   return false;
224 }
225
226 MailboxPtr Comm::get_mailbox()
227 {
228   return mailbox_;
229 }
230
231 ActorPtr Comm::get_sender()
232 {
233   return sender_ ? sender_->iface() : nullptr;
234 }
235
236 void intrusive_ptr_release(simgrid::s4u::Comm* c)
237 {
238   if (c->refcount_.fetch_sub(1, std::memory_order_release) == 1) {
239     std::atomic_thread_fence(std::memory_order_acquire);
240     delete c;
241   }
242 }
243 void intrusive_ptr_add_ref(simgrid::s4u::Comm* c)
244 {
245   c->refcount_.fetch_add(1, std::memory_order_relaxed);
246 }
247 } // namespace s4u
248 } // namespace simgrid