Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
Change some calls to get_cname to calls to get_name.
[simgrid.git] / src / kernel / activity / CommImpl.cpp
1 /* Copyright (c) 2007-2019. The SimGrid Team. All rights reserved.          */
2
3 /* This program is free software; you can redistribute it and/or modify it
4  * under the terms of the license (GNU LGPL) which comes with this package. */
5
6 #include "src/kernel/activity/CommImpl.hpp"
7 #include "simgrid/kernel/resource/Action.hpp"
8 #include "simgrid/modelchecker.h"
9 #include "simgrid/s4u/Host.hpp"
10 #include "src/kernel/activity/MailboxImpl.hpp"
11 #include "src/mc/mc_replay.hpp"
12 #include "src/simix/smx_network_private.hpp"
13 #include "src/surf/network_interface.hpp"
14 #include "src/surf/surf_interface.hpp"
15
16 XBT_LOG_EXTERNAL_DEFAULT_CATEGORY(simix_network);
17
18 /******************************************************************************/
19 /*                    SIMIX_comm_copy_data callbacks                       */
20 /******************************************************************************/
21 static void (*SIMIX_comm_copy_data_callback)(smx_activity_t, void*, size_t) = &SIMIX_comm_copy_pointer_callback;
22
23 void SIMIX_comm_set_copy_data_callback(void (*callback)(smx_activity_t, void*, size_t))
24 {
25   SIMIX_comm_copy_data_callback = callback;
26 }
27
28 void SIMIX_comm_copy_pointer_callback(smx_activity_t synchro, void* buff, size_t buff_size)
29 {
30   simgrid::kernel::activity::CommImplPtr comm =
31       boost::static_pointer_cast<simgrid::kernel::activity::CommImpl>(synchro);
32
33   xbt_assert((buff_size == sizeof(void*)), "Cannot copy %zu bytes: must be sizeof(void*)", buff_size);
34   *(void**)(comm->dst_buff_) = buff;
35 }
36
37 namespace simgrid {
38 namespace kernel {
39 namespace activity {
40
41 CommImpl::CommImpl(e_smx_comm_type_t _type) : type(_type)
42 {
43   state_   = SIMIX_WAITING;
44   src_data_ = nullptr;
45   dst_data_ = nullptr;
46   XBT_DEBUG("Create comm activity %p", this);
47 }
48
49 CommImpl::~CommImpl()
50 {
51   XBT_DEBUG("Really free communication %p", this);
52
53   cleanupSurf();
54
55   if (detached && state_ != SIMIX_DONE) {
56     /* the communication has failed and was detached:
57      * we have to free the buffer */
58     if (clean_fun)
59       clean_fun(src_buff_);
60     src_buff_ = nullptr;
61   }
62
63   if (mbox)
64     mbox->remove(this);
65 }
66
67 /**  @brief Starts the simulation of a communication synchro. */
68 void CommImpl::start()
69 {
70   /* If both the sender and the receiver are already there, start the communication */
71   if (state_ == SIMIX_READY) {
72
73     s4u::Host* sender   = src_actor_->host_;
74     s4u::Host* receiver = dst_actor_->host_;
75
76     surf_action_ = surf_network_model->communicate(sender, receiver, task_size_, rate_);
77     surf_action_->set_data(this);
78     state_ = SIMIX_RUNNING;
79
80     XBT_DEBUG("Starting communication %p from '%s' to '%s' (surf_action: %p)", this, sender->get_cname(),
81               receiver->get_cname(), surf_action_);
82
83     /* If a link is failed, detect it immediately */
84     if (surf_action_->get_state() == resource::Action::State::FAILED) {
85       XBT_DEBUG("Communication from '%s' to '%s' failed to start because of a link failure", sender->get_cname(),
86                 receiver->get_cname());
87       state_ = SIMIX_LINK_FAILURE;
88       cleanupSurf();
89     }
90
91     /* If any of the process is suspended, create the synchro but stop its execution,
92        it will be restarted when the sender process resume */
93     if (src_actor_->is_suspended() || dst_actor_->is_suspended()) {
94       if (src_actor_->is_suspended())
95         XBT_DEBUG("The communication is suspended on startup because src (%s@%s) was suspended since it initiated the "
96                   "communication",
97                   src_actor_->get_cname(), src_actor_->host_->get_cname());
98       else
99         XBT_DEBUG("The communication is suspended on startup because dst (%s@%s) was suspended since it initiated the "
100                   "communication",
101                   dst_actor_->get_cname(), dst_actor_->host_->get_cname());
102
103       surf_action_->suspend();
104     }
105   }
106 }
107
108 /** @brief Copy the communication data from the sender's buffer to the receiver's one  */
109 void CommImpl::copy_data()
110 {
111   size_t buff_size = src_buff_size_;
112   /* If there is no data to copy then return */
113   if (not src_buff_ || not dst_buff_ || copied)
114     return;
115
116   XBT_DEBUG("Copying comm %p data from %s (%p) -> %s (%p) (%zu bytes)", this,
117             src_actor_ ? src_actor_->host_->get_cname() : "a finished process", src_buff_,
118             dst_actor_ ? dst_actor_->host_->get_cname() : "a finished process", dst_buff_, buff_size);
119
120   /* Copy at most dst_buff_size bytes of the message to receiver's buffer */
121   if (dst_buff_size_)
122     buff_size = std::min(buff_size, *(dst_buff_size_));
123
124   /* Update the receiver's buffer size to the copied amount */
125   if (dst_buff_size_)
126     *dst_buff_size_ = buff_size;
127
128   if (buff_size > 0) {
129     if (copy_data_fun)
130       copy_data_fun(this, src_buff_, buff_size);
131     else
132       SIMIX_comm_copy_data_callback(this, src_buff_, buff_size);
133   }
134
135   /* Set the copied flag so we copy data only once */
136   /* (this function might be called from both communication ends) */
137   copied = true;
138 }
139
140 void CommImpl::suspend()
141 {
142   /* FIXME: shall we suspend also the timeout synchro? */
143   if (surf_action_)
144     surf_action_->suspend();
145   /* if not created yet, the action will be suspended on creation, in CommImpl::start() */
146 }
147
148 void CommImpl::resume()
149 {
150   /*FIXME: check what happen with the timeouts */
151   if (surf_action_)
152     surf_action_->resume();
153   /* in the other case, the synchro were not really suspended yet, see CommImpl::suspend() and CommImpl::start() */
154 }
155
156 void CommImpl::cancel()
157 {
158   /* if the synchro is a waiting state means that it is still in a mbox so remove from it and delete it */
159   if (state_ == SIMIX_WAITING) {
160     mbox->remove(this);
161     state_ = SIMIX_CANCELED;
162   } else if (not MC_is_active() /* when running the MC there are no surf actions */
163              && not MC_record_replay_is_active() && (state_ == SIMIX_READY || state_ == SIMIX_RUNNING)) {
164     surf_action_->cancel();
165   }
166 }
167
168 /**  @brief get the amount remaining from the communication */
169 double CommImpl::remains()
170 {
171   return surf_action_->get_remains();
172 }
173
174 /** @brief This is part of the cleanup process, probably an internal command */
175 void CommImpl::cleanupSurf()
176 {
177   if (surf_action_) {
178     surf_action_->unref();
179     surf_action_ = nullptr;
180   }
181
182   if (src_timeout_) {
183     src_timeout_->unref();
184     src_timeout_ = nullptr;
185   }
186
187   if (dst_timeout_) {
188     dst_timeout_->unref();
189     dst_timeout_ = nullptr;
190   }
191 }
192
193 void CommImpl::post()
194 {
195   /* Update synchro state */
196   if (src_timeout_ && src_timeout_->get_state() == resource::Action::State::FINISHED)
197     state_ = SIMIX_SRC_TIMEOUT;
198   else if (dst_timeout_ && dst_timeout_->get_state() == resource::Action::State::FINISHED)
199     state_ = SIMIX_DST_TIMEOUT;
200   else if (src_timeout_ && src_timeout_->get_state() == resource::Action::State::FAILED)
201     state_ = SIMIX_SRC_HOST_FAILURE;
202   else if (dst_timeout_ && dst_timeout_->get_state() == resource::Action::State::FAILED)
203     state_ = SIMIX_DST_HOST_FAILURE;
204   else if (surf_action_ && surf_action_->get_state() == resource::Action::State::FAILED) {
205     state_ = SIMIX_LINK_FAILURE;
206   } else
207     state_ = SIMIX_DONE;
208
209   XBT_DEBUG("SIMIX_post_comm: comm %p, state %d, src_proc %p, dst_proc %p, detached: %d", this, (int)state_,
210             src_actor_.get(), dst_actor_.get(), detached);
211
212   /* destroy the surf actions associated with the Simix communication */
213   cleanupSurf();
214
215   /* if there are simcalls associated with the synchro, then answer them */
216   if (not simcalls_.empty()) {
217     SIMIX_comm_finish(this);
218   }
219 }
220
221 } // namespace activity
222 } // namespace kernel
223 } // namespace simgrid