Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
Add new entry in Release_Notes.
[simgrid.git] / src / s4u / s4u_Mess.cpp
1 /* Copyright (c) 2023. The SimGrid Team. All rights reserved.          */
2
3 /* This program is free software; you can redistribute it and/or modify it
4  * under the terms of the license (GNU LGPL) which comes with this package. */
5
6 #include <cmath>
7 #include <simgrid/Exception.hpp>
8 #include <simgrid/s4u/ActivitySet.hpp>
9 #include <simgrid/s4u/Mess.hpp>
10 #include <simgrid/s4u/Engine.hpp>
11 #include <simgrid/s4u/MessageQueue.hpp>
12
13 #include "src/kernel/activity/MessImpl.hpp"
14 #include "src/kernel/actor/ActorImpl.hpp"
15 #include "src/kernel/actor/SimcallObserver.hpp"
16
// Declare the "s4u_mess" logging subcategory, attached to the s4u_activity category.
// NOTE(review): presumably this is the category used by the XBT_DEBUG() calls of this file -- confirm.
XBT_LOG_NEW_DEFAULT_SUBCATEGORY(s4u_mess, s4u_activity, "S4U asynchronous messaging");
18
19 namespace simgrid::s4u {
/* Definitions of the class-wide signals of Mess.
 * on_send is fired in this file right before a put request is posted to the kernel,
 * and on_recv right before a get request is posted. */
xbt::signal<void(Mess const&)> Mess::on_send;
xbt::signal<void(Mess const&)> Mess::on_recv;
22
23 MessPtr Mess::set_queue(MessageQueue* queue)
24 {
25   queue_ = queue;
26   return this;
27 }
28
29 MessPtr Mess::set_payload(void* payload)
30 {
31   payload_ = payload;
32   return this;
33 }
34
35 MessPtr Mess::set_dst_data(void** buff, size_t size)
36 {
37   xbt_assert(state_ == State::INITED, "You cannot use %s() once your communication started (not implemented)",
38              __func__);
39
40   dst_buff_      = buff;
41   dst_buff_size_ = size;
42   return this;
43 }
44
45 Actor* Mess::get_sender() const
46 {
47   kernel::actor::ActorImplPtr sender = nullptr;
48   if (pimpl_)
49     sender = boost::static_pointer_cast<kernel::activity::MessImpl>(pimpl_)->src_actor_;
50   return sender ? sender->get_ciface() : nullptr;
51 }
52
53 Actor* Mess::get_receiver() const
54 {
55   kernel::actor::ActorImplPtr receiver = nullptr;
56   if (pimpl_)
57     receiver = boost::static_pointer_cast<kernel::activity::MessImpl>(pimpl_)->dst_actor_;
58   return receiver ? receiver->get_ciface() : nullptr;
59 }
60
61 Mess* Mess::do_start()
62 {
63   xbt_assert(get_state() == State::INITED || get_state() == State::STARTING,
64              "You cannot use %s() once your message exchange has started (not implemented)", __func__);
65
66   auto myself = kernel::actor::ActorImpl::self();
67   if (myself == sender_) {
68     on_send(*this);
69     on_this_send(*this);
70     kernel::actor::MessIputSimcall observer{sender_, queue_->get_impl(), get_payload()};
71     pimpl_ = kernel::actor::simcall_answered([&observer] { return kernel::activity::MessImpl::iput(&observer); },
72                                              &observer);
73   } else if (myself == receiver_) {
74     on_recv(*this);
75     on_this_recv(*this);
76     kernel::actor::MessIgetSimcall observer{receiver_,
77                                             queue_->get_impl(),
78                                             static_cast<unsigned char*>(dst_buff_),
79                                             &dst_buff_size_,
80                                             get_payload()};
81     pimpl_ = kernel::actor::simcall_answered([&observer] { return kernel::activity::MessImpl::iget(&observer); },
82                                              &observer);
83   } else {
84     xbt_die("Cannot start a message exchange before specifying whether we are the sender or the receiver");
85   }
86
87   pimpl_->set_iface(this);
88   pimpl_->set_actor(sender_);
89   // Only throw the signal when both sides are here and the status is READY
90   if (pimpl_->get_state() != kernel::activity::State::WAITING) {
91     fire_on_start();
92     fire_on_this_start();
93   }
94   state_ = State::STARTED;
95   return this;
96 }
97
98 Mess* Mess::wait_for(double timeout)
99 {
100   XBT_DEBUG("Calling Mess::wait_for with state %s", get_state_str());
101   kernel::actor::ActorImpl* issuer = nullptr;
102   switch (state_) {
103     case State::FINISHED:
104       break;
105     case State::FAILED:
106       throw NetworkFailureException(XBT_THROW_POINT, "Cannot wait for a failed communication");
107     case State::INITED:
108     case State::STARTING:
109       if (get_payload() != nullptr) {
110         on_send(*this);
111         on_this_send(*this);
112         kernel::actor::MessIputSimcall observer{sender_, queue_->get_impl(), get_payload()};
113         pimpl_ = kernel::actor::simcall_answered([&observer] { return kernel::activity::MessImpl::iput(&observer); },
114                                                  &observer);
115       } else { // Receiver
116         on_recv(*this);
117         on_this_recv(*this);
118         kernel::actor::MessIgetSimcall observer{receiver_,
119                                                 queue_->get_impl(),
120                                                 static_cast<unsigned char*>(dst_buff_),
121                                                 &dst_buff_size_,
122                                                 get_payload()};
123         pimpl_ = kernel::actor::simcall_answered([&observer] { return kernel::activity::MessImpl::iget(&observer); },
124                                                  &observer);
125       }
126       break;
127     case State::STARTED:
128       try {
129         issuer = kernel::actor::ActorImpl::self();
130         kernel::actor::ActivityWaitSimcall observer{issuer, pimpl_.get(), timeout, "Wait"};
131         if (kernel::actor::simcall_blocking(
132                 [&observer] { observer.get_activity()->wait_for(observer.get_issuer(), observer.get_timeout()); },
133                 &observer)) {
134           throw TimeoutException(XBT_THROW_POINT, "Timeouted");
135         }
136       } catch (const NetworkFailureException& e) {
137         issuer->simcall_.observer_ = nullptr; // Comm failed on network failure, reset the observer to nullptr
138         complete(State::FAILED);
139         e.rethrow_nested(XBT_THROW_POINT, boost::core::demangle(typeid(e).name()) + " raised in kernel mode.");
140       }
141       break;
142
143     case State::CANCELED:
144       throw CancelException(XBT_THROW_POINT, "Message canceled");
145
146     default:
147       THROW_IMPOSSIBLE;
148   }
149   complete(State::FINISHED);
150   return this;
151 }
152
153 } // namespace simgrid::s4u