Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
Merge branch 'issue105' into 'master'
[simgrid.git] / src / s4u / s4u_Comm.cpp
1 /* Copyright (c) 2006-2022. The SimGrid Team. All rights reserved.          */
2
3 /* This program is free software; you can redistribute it and/or modify it
4  * under the terms of the license (GNU LGPL) which comes with this package. */
5
6 #include <cmath>
7 #include <simgrid/Exception.hpp>
8 #include <simgrid/comm.h>
9 #include <simgrid/s4u/Comm.hpp>
10 #include <simgrid/s4u/Engine.hpp>
11 #include <simgrid/s4u/Mailbox.hpp>
12
13 #include "mc/mc.h"
14 #include "src/kernel/activity/CommImpl.hpp"
15 #include "src/kernel/actor/ActorImpl.hpp"
16 #include "src/kernel/actor/SimcallObserver.hpp"
17 #include "src/mc/mc_replay.hpp"
18
19 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(s4u_comm, s4u_activity, "S4U asynchronous communications");
20
21 namespace simgrid {
22 namespace s4u {
23 xbt::signal<void(Comm const&)> Comm::on_send;
24 xbt::signal<void(Comm const&)> Comm::on_recv;
25
26 CommPtr Comm::set_copy_data_callback(const std::function<void(kernel::activity::CommImpl*, void*, size_t)>& callback)
27 {
28   copy_data_function_ = callback;
29   return this;
30 }
31
32 void Comm::copy_buffer_callback(kernel::activity::CommImpl* comm, void* buff, size_t buff_size)
33 {
34   XBT_DEBUG("Copy the data over");
35   memcpy(comm->dst_buff_, buff, buff_size);
36   if (comm->is_detached()) { // if this is a detached send, the source buffer was duplicated by SMPI sender to make the
37                              // original buffer available to the application ASAP
38     xbt_free(buff);
39     comm->src_buff_ = nullptr;
40   }
41 }
42
43 void Comm::copy_pointer_callback(kernel::activity::CommImpl* comm, void* buff, size_t buff_size)
44 {
45   xbt_assert((buff_size == sizeof(void*)), "Cannot copy %zu bytes: must be sizeof(void*)", buff_size);
46   *(void**)(comm->dst_buff_) = buff;
47 }
48
49 Comm::~Comm()
50 {
51   if (state_ == State::STARTED && not detached_ &&
52       (pimpl_ == nullptr || pimpl_->get_state() == kernel::activity::State::RUNNING)) {
53     XBT_INFO("Comm %p freed before its completion. Did you forget to detach it? (state: %s)", this, get_state_str());
54     if (pimpl_ != nullptr)
55       XBT_INFO("pimpl_->state: %s", pimpl_->get_state_str());
56     else
57       XBT_INFO("pimpl_ is null");
58     xbt_backtrace_display_current();
59   }
60 }
61
62 void Comm::send(kernel::actor::ActorImpl* sender, const Mailbox* mbox, double task_size, double rate, void* src_buff,
63                 size_t src_buff_size,
64                 const std::function<bool(void*, void*, simgrid::kernel::activity::CommImpl*)>& match_fun,
65                 const std::function<void(simgrid::kernel::activity::CommImpl*, void*, size_t)>& copy_data_fun,
66                 void* data, double timeout)
67 {
68   /* checking for infinite values */
69   xbt_assert(std::isfinite(task_size), "task_size is not finite!");
70   xbt_assert(std::isfinite(rate), "rate is not finite!");
71   xbt_assert(std::isfinite(timeout), "timeout is not finite!");
72
73   xbt_assert(mbox, "No rendez-vous point defined for send");
74
75   if (MC_is_active() || MC_record_replay_is_active()) {
76     /* the model-checker wants two separate simcalls, and wants comm to be nullptr during the simcall */
77     simgrid::kernel::activity::ActivityImplPtr comm = nullptr;
78
79     simgrid::kernel::actor::CommIsendSimcall send_observer{
80         sender,  mbox->get_impl(), task_size, rate, static_cast<unsigned char*>(src_buff), src_buff_size, match_fun,
81         nullptr, copy_data_fun,    data,      false};
82     comm = simgrid::kernel::actor::simcall_answered(
83         [&send_observer] { return simgrid::kernel::activity::CommImpl::isend(&send_observer); }, &send_observer);
84
85     simgrid::kernel::actor::ActivityWaitSimcall wait_observer{sender, comm.get(), timeout};
86     if (simgrid::kernel::actor::simcall_blocking(
87             [&wait_observer] {
88               wait_observer.get_activity()->wait_for(wait_observer.get_issuer(), wait_observer.get_timeout());
89             },
90             &wait_observer)) {
91       throw simgrid::TimeoutException(XBT_THROW_POINT, "Timeouted");
92     }
93     comm = nullptr;
94   } else {
95     simgrid::kernel::actor::CommIsendSimcall observer(sender, mbox->get_impl(), task_size, rate,
96                                                       static_cast<unsigned char*>(src_buff), src_buff_size, match_fun,
97                                                       nullptr, copy_data_fun, data, false);
98     simgrid::kernel::actor::simcall_blocking([&observer, timeout] {
99       simgrid::kernel::activity::ActivityImplPtr comm = simgrid::kernel::activity::CommImpl::isend(&observer);
100       comm->wait_for(observer.get_issuer(), timeout);
101     });
102   }
103 }
104
105 void Comm::recv(kernel::actor::ActorImpl* receiver, const Mailbox* mbox, void* dst_buff, size_t* dst_buff_size,
106                 const std::function<bool(void*, void*, simgrid::kernel::activity::CommImpl*)>& match_fun,
107                 const std::function<void(simgrid::kernel::activity::CommImpl*, void*, size_t)>& copy_data_fun,
108                 void* data, double timeout, double rate)
109 {
110   xbt_assert(std::isfinite(timeout), "timeout is not finite!");
111   xbt_assert(mbox, "No rendez-vous point defined for recv");
112
113   if (MC_is_active() || MC_record_replay_is_active()) {
114     /* the model-checker wants two separate simcalls, and wants comm to be nullptr during the simcall */
115     simgrid::kernel::activity::ActivityImplPtr comm = nullptr;
116
117     simgrid::kernel::actor::CommIrecvSimcall observer{receiver,
118                                                       mbox->get_impl(),
119                                                       static_cast<unsigned char*>(dst_buff),
120                                                       dst_buff_size,
121                                                       match_fun,
122                                                       copy_data_fun,
123                                                       data,
124                                                       rate};
125     comm = simgrid::kernel::actor::simcall_answered(
126         [&observer] { return simgrid::kernel::activity::CommImpl::irecv(&observer); }, &observer);
127
128     simgrid::kernel::actor::ActivityWaitSimcall wait_observer{receiver, comm.get(), timeout};
129     if (simgrid::kernel::actor::simcall_blocking(
130             [&wait_observer] {
131               wait_observer.get_activity()->wait_for(wait_observer.get_issuer(), wait_observer.get_timeout());
132             },
133             &wait_observer)) {
134       throw simgrid::TimeoutException(XBT_THROW_POINT, "Timeouted");
135     }
136     comm = nullptr;
137   } else {
138     simgrid::kernel::actor::CommIrecvSimcall observer(receiver, mbox->get_impl(), static_cast<unsigned char*>(dst_buff),
139                                                       dst_buff_size, match_fun, copy_data_fun, data, rate);
140     simgrid::kernel::actor::simcall_blocking([&observer, timeout] {
141       simgrid::kernel::activity::ActivityImplPtr comm = simgrid::kernel::activity::CommImpl::irecv(&observer);
142       comm->wait_for(observer.get_issuer(), timeout);
143     });
144   }
145 }
146
147 CommPtr Comm::sendto_init()
148 {
149   CommPtr res(new Comm());
150   res->pimpl_ = kernel::activity::CommImplPtr(new kernel::activity::CommImpl());
151   boost::static_pointer_cast<kernel::activity::CommImpl>(res->pimpl_)->detach();
152   res->sender_ = kernel::actor::ActorImpl::self();
153   return res;
154 }
155
156 CommPtr Comm::sendto_init(Host* from, Host* to)
157 {
158   auto res = Comm::sendto_init()->set_source(from)->set_destination(to);
159   res->set_state(State::STARTING);
160   return res;
161 }
162
163 CommPtr Comm::sendto_async(Host* from, Host* to, uint64_t simulated_size_in_bytes)
164 {
165   return Comm::sendto_init()->set_payload_size(simulated_size_in_bytes)->set_source(from)->set_destination(to);
166 }
167
168 void Comm::sendto(Host* from, Host* to, uint64_t simulated_size_in_bytes)
169 {
170   sendto_async(from, to, simulated_size_in_bytes)->wait();
171 }
172
173 CommPtr Comm::set_source(Host* from)
174 {
175   xbt_assert(state_ == State::INITED || state_ == State::STARTING,
176              "Cannot change the source of a Comm once it's started (state: %s)", to_c_str(state_));
177   boost::static_pointer_cast<kernel::activity::CommImpl>(pimpl_)->set_source(from);
178   // Setting 'source' may allow to start the activity, let's try
179   if (state_ == State::STARTING && remains_ <= 0)
180     XBT_DEBUG("This communication has a payload size of 0 byte. It cannot start yet");
181   else
182     vetoable_start();
183
184   return this;
185 }
186 Host* Comm::get_source() const
187 {
188   return pimpl_ ? boost::static_pointer_cast<kernel::activity::CommImpl>(pimpl_)->get_source() : nullptr;
189 }
190
191 CommPtr Comm::set_destination(Host* to)
192 {
193   xbt_assert(state_ == State::INITED || state_ == State::STARTING,
194              "Cannot change the destination of a Comm once it's started (state: %s)", to_c_str(state_));
195   boost::static_pointer_cast<kernel::activity::CommImpl>(pimpl_)->set_destination(to);
196   // Setting 'destination' may allow to start the activity, let's try
197   if (state_ == State::STARTING && remains_ <= 0)
198     XBT_DEBUG("This communication has a payload size of 0 byte. It cannot start yet");
199   else
200     vetoable_start();
201
202   return this;
203 }
204
205 Host* Comm::get_destination() const
206 {
207   return pimpl_ ? boost::static_pointer_cast<kernel::activity::CommImpl>(pimpl_)->get_destination() : nullptr;
208 }
209
210 CommPtr Comm::set_rate(double rate)
211 {
212   xbt_assert(state_ == State::INITED, "You cannot use %s() once your communication started (not implemented)",
213              __FUNCTION__);
214   rate_ = rate;
215   return this;
216 }
217
218 CommPtr Comm::set_mailbox(Mailbox* mailbox)
219 {
220   xbt_assert(state_ == State::INITED, "You cannot use %s() once your communication started (not implemented)",
221              __FUNCTION__);
222   mailbox_ = mailbox;
223   return this;
224 }
225
226 CommPtr Comm::set_src_data(void* buff)
227 {
228   xbt_assert(state_ == State::INITED, "You cannot use %s() once your communication started (not implemented)",
229              __FUNCTION__);
230   xbt_assert(dst_buff_ == nullptr, "Cannot set the src and dst buffers at the same time");
231   src_buff_ = buff;
232   return this;
233 }
234
235 CommPtr Comm::set_src_data_size(size_t size)
236 {
237   xbt_assert(state_ == State::INITED, "You cannot use %s() once your communication started (not implemented)",
238              __FUNCTION__);
239   src_buff_size_ = size;
240   return this;
241 }
242
243 CommPtr Comm::set_src_data(void* buff, size_t size)
244 {
245   xbt_assert(state_ == State::INITED, "You cannot use %s() once your communication started (not implemented)",
246              __FUNCTION__);
247
248   xbt_assert(dst_buff_ == nullptr, "Cannot set the src and dst buffers at the same time");
249   src_buff_      = buff;
250   src_buff_size_ = size;
251   return this;
252 }
253
254 CommPtr Comm::set_dst_data(void** buff)
255 {
256   xbt_assert(state_ == State::INITED, "You cannot use %s() once your communication started (not implemented)",
257              __FUNCTION__);
258   xbt_assert(src_buff_ == nullptr, "Cannot set the src and dst buffers at the same time");
259   dst_buff_ = buff;
260   return this;
261 }
262
263 CommPtr Comm::set_dst_data(void** buff, size_t size)
264 {
265   xbt_assert(state_ == State::INITED, "You cannot use %s() once your communication started (not implemented)",
266              __FUNCTION__);
267
268   xbt_assert(src_buff_ == nullptr, "Cannot set the src and dst buffers at the same time");
269   dst_buff_      = buff;
270   dst_buff_size_ = size;
271   return this;
272 }
273
274 CommPtr Comm::set_payload_size(uint64_t bytes)
275 {
276   Activity::set_remaining(bytes);
277   if (pimpl_) {
278     boost::static_pointer_cast<kernel::activity::CommImpl>(pimpl_)->set_size(bytes);
279   }
280   return this;
281 }
282
283 Actor* Comm::get_sender() const
284 {
285   kernel::actor::ActorImplPtr sender = nullptr;
286   if (pimpl_)
287     sender = boost::static_pointer_cast<kernel::activity::CommImpl>(pimpl_)->src_actor_;
288   return sender ? sender->get_ciface() : nullptr;
289 }
290
291 bool Comm::is_assigned() const
292 {
293   return (pimpl_ && boost::static_pointer_cast<kernel::activity::CommImpl>(pimpl_)->is_assigned()) ||
294          mailbox_ != nullptr;
295 }
296
297 Comm* Comm::start()
298 {
299   xbt_assert(get_state() == State::INITED || get_state() == State::STARTING,
300              "You cannot use %s() once your communication started (not implemented)", __FUNCTION__);
301   if (get_source() != nullptr || get_destination() != nullptr) {
302     xbt_assert(is_assigned(), "When either from_ or to_ is specified, both must be.");
303     xbt_assert(src_buff_ == nullptr && dst_buff_ == nullptr,
304                "Direct host-to-host communications cannot carry any data.");
305     XBT_DEBUG("host-to-host Comm. Pimpl already created and set, just start it.");
306     kernel::actor::simcall_answered([this] {
307       pimpl_->set_state(kernel::activity::State::READY);
308       boost::static_pointer_cast<kernel::activity::CommImpl>(pimpl_)->start();
309     });
310   } else if (src_buff_ != nullptr) { // Sender side
311     on_send(*this);
312     kernel::actor::CommIsendSimcall observer{sender_,
313                                              mailbox_->get_impl(),
314                                              remains_,
315                                              rate_,
316                                              static_cast<unsigned char*>(src_buff_),
317                                              src_buff_size_,
318                                              match_fun_,
319                                              clean_fun_,
320                                              copy_data_function_,
321                                              get_data<void>(),
322                                              detached_};
323     pimpl_ = kernel::actor::simcall_answered([&observer] { return kernel::activity::CommImpl::isend(&observer); },
324                                              &observer);
325   } else if (dst_buff_ != nullptr) { // Receiver side
326     xbt_assert(not detached_, "Receive cannot be detached");
327     on_recv(*this);
328     kernel::actor::CommIrecvSimcall observer{receiver_,
329                                              mailbox_->get_impl(),
330                                              static_cast<unsigned char*>(dst_buff_),
331                                              &dst_buff_size_,
332                                              match_fun_,
333                                              copy_data_function_,
334                                              get_data<void>(),
335                                              rate_};
336     pimpl_ = kernel::actor::simcall_answered([&observer] { return kernel::activity::CommImpl::irecv(&observer); },
337                                              &observer);
338   } else {
339     xbt_die("Cannot start a communication before specifying whether we are the sender or the receiver");
340   }
341
342   if (suspended_)
343     pimpl_->suspend();
344
345   if (not detached_) {
346     pimpl_->set_iface(this);
347     pimpl_->set_actor(sender_);
348   }
349
350   state_ = State::STARTED;
351   return this;
352 }
353
354 Comm* Comm::detach()
355 {
356   xbt_assert(state_ == State::INITED || state_ == State::STARTING,
357              "You cannot use %s() once your communication is %s (not implemented)", __FUNCTION__, get_state_str());
358   xbt_assert(dst_buff_ == nullptr && dst_buff_size_ == 0, "You can only detach sends, not recvs");
359   detached_ = true;
360   vetoable_start();
361   return this;
362 }
363
364 ssize_t Comm::test_any(const std::vector<CommPtr>& comms)
365 {
366   std::vector<ActivityPtr> activities;
367   for (const auto& comm : comms)
368     activities.push_back(boost::dynamic_pointer_cast<Activity>(comm));
369   return Activity::test_any(activities);
370 }
371
372 /** @brief Block the calling actor until the communication is finished, or until timeout
373  *
374  * On timeout, an exception is thrown and the communication is invalidated.
375  *
376  * @param timeout the amount of seconds to wait for the comm termination.
377  *                Negative values denote infinite wait times. 0 as a timeout returns immediately. */
378 Comm* Comm::wait_for(double timeout)
379 {
380   XBT_DEBUG("Calling Comm::wait_for with state %s", get_state_str());
381   kernel::actor::ActorImpl* issuer = nullptr;
382   switch (state_) {
383     case State::FINISHED:
384       break;
385     case State::FAILED:
386       throw NetworkFailureException(XBT_THROW_POINT, "Cannot wait for a failed communication");
387     case State::INITED:
388     case State::STARTING: // It's not started yet. Do it in one simcall if it's a regular communication
389       if (get_source() != nullptr || get_destination() != nullptr) {
390         return vetoable_start()->wait_for(timeout); // In the case of host2host comm, do it in two simcalls
391       } else if (src_buff_ != nullptr) {
392         on_send(*this);
393         send(sender_, mailbox_, remains_, rate_, src_buff_, src_buff_size_, match_fun_, copy_data_function_,
394              get_data<void>(), timeout);
395
396       } else { // Receiver
397         on_recv(*this);
398         recv(receiver_, mailbox_, dst_buff_, &dst_buff_size_, match_fun_, copy_data_function_, get_data<void>(),
399              timeout, rate_);
400       }
401       break;
402     case State::STARTED:
403       try {
404         issuer = kernel::actor::ActorImpl::self();
405         kernel::actor::ActivityWaitSimcall observer{issuer, pimpl_.get(), timeout};
406         if (kernel::actor::simcall_blocking(
407                 [&observer] { observer.get_activity()->wait_for(observer.get_issuer(), observer.get_timeout()); },
408                 &observer)) {
409           throw TimeoutException(XBT_THROW_POINT, "Timeouted");
410         }
411       } catch (const NetworkFailureException& e) {
412         issuer->simcall_.observer_ = nullptr; // Comm failed on network failure, reset the observer to nullptr
413         complete(State::FAILED);
414         e.rethrow_nested(XBT_THROW_POINT, boost::core::demangle(typeid(e).name()) + " raised in kernel mode.");
415       }
416       break;
417
418     case State::CANCELED:
419       throw CancelException(XBT_THROW_POINT, "Communication canceled");
420
421     default:
422       THROW_IMPOSSIBLE;
423   }
424   complete(State::FINISHED);
425   return this;
426 }
427
428 ssize_t Comm::wait_any_for(const std::vector<CommPtr>& comms, double timeout)
429 {
430   std::vector<ActivityPtr> activities;
431   for (const auto& comm : comms)
432     activities.push_back(boost::dynamic_pointer_cast<Activity>(comm));
433   ssize_t changed_pos;
434   try {
435     changed_pos = Activity::wait_any_for(activities, timeout);
436   } catch (const NetworkFailureException& e) {
437     changed_pos = -1;
438     for (auto c : comms) {
439       if (c->pimpl_->get_state() == kernel::activity::State::FAILED) {
440         c->complete(State::FAILED);
441       }
442     }
443     e.rethrow_nested(XBT_THROW_POINT, boost::core::demangle(typeid(e).name()) + " raised in kernel mode.");
444   }
445   return changed_pos;
446 }
447
448 void Comm::wait_all(const std::vector<CommPtr>& comms)
449 {
450   // TODO: this should be a simcall or something
451   for (auto& comm : comms)
452     comm->wait();
453 }
454
455 size_t Comm::wait_all_for(const std::vector<CommPtr>& comms, double timeout)
456 {
457   if (timeout < 0.0) {
458     wait_all(comms);
459     return comms.size();
460   }
461
462   double deadline = Engine::get_clock() + timeout;
463   std::vector<CommPtr> waited_comm(1, nullptr);
464   for (size_t i = 0; i < comms.size(); i++) {
465     double wait_timeout = std::max(0.0, deadline - Engine::get_clock());
466     waited_comm[0]      = comms[i];
467     // Using wait_any_for() here (and not wait_for) because we don't want comms to be invalidated on timeout
468     if (wait_any_for(waited_comm, wait_timeout) == -1) {
469       XBT_DEBUG("Timeout (%g): i = %zu", wait_timeout, i);
470       return i;
471     }
472   }
473   return comms.size();
474 }
475 } // namespace s4u
476 } // namespace simgrid
477 /* **************************** Public C interface *************************** */
478 void sg_comm_detach(sg_comm_t comm, void (*clean_function)(void*))
479 {
480   comm->detach(clean_function);
481   comm->unref();
482 }
483 void sg_comm_unref(sg_comm_t comm)
484 {
485   comm->unref();
486 }
487 int sg_comm_test(sg_comm_t comm)
488 {
489   bool finished = comm->test();
490   if (finished)
491     comm->unref();
492   return finished;
493 }
494
495 sg_error_t sg_comm_wait(sg_comm_t comm)
496 {
497   return sg_comm_wait_for(comm, -1);
498 }
499
500 sg_error_t sg_comm_wait_for(sg_comm_t comm, double timeout)
501 {
502   sg_error_t status = SG_OK;
503
504   simgrid::s4u::CommPtr s4u_comm(comm, false);
505   try {
506     s4u_comm->wait_for(timeout);
507   } catch (const simgrid::TimeoutException&) {
508     status = SG_ERROR_TIMEOUT;
509   } catch (const simgrid::CancelException&) {
510     status = SG_ERROR_CANCELED;
511   } catch (const simgrid::NetworkFailureException&) {
512     status = SG_ERROR_NETWORK;
513   }
514   return status;
515 }
516
517 void sg_comm_wait_all(sg_comm_t* comms, size_t count)
518 {
519   sg_comm_wait_all_for(comms, count, -1);
520 }
521
522 size_t sg_comm_wait_all_for(sg_comm_t* comms, size_t count, double timeout)
523 {
524   std::vector<simgrid::s4u::CommPtr> s4u_comms;
525   for (size_t i = 0; i < count; i++)
526     s4u_comms.emplace_back(comms[i], false);
527
528   size_t pos = simgrid::s4u::Comm::wait_all_for(s4u_comms, timeout);
529   for (size_t i = pos; i < count; i++)
530     s4u_comms[i]->add_ref();
531   return pos;
532 }
533
534 ssize_t sg_comm_wait_any(sg_comm_t* comms, size_t count)
535 {
536   return sg_comm_wait_any_for(comms, count, -1);
537 }
538
539 ssize_t sg_comm_wait_any_for(sg_comm_t* comms, size_t count, double timeout)
540 {
541   std::vector<simgrid::s4u::CommPtr> s4u_comms;
542   for (size_t i = 0; i < count; i++)
543     s4u_comms.emplace_back(comms[i], false);
544
545   ssize_t pos = simgrid::s4u::Comm::wait_any_for(s4u_comms, timeout);
546   for (size_t i = 0; i < count; i++) {
547     if (pos != -1 && static_cast<size_t>(pos) != i)
548       s4u_comms[i]->add_ref();
549   }
550   return pos;
551 }