1 /* Copyright (c) 2009-2016. The SimGrid Team. All rights reserved. */
3 /* This program is free software; you can redistribute it and/or modify it
4 * under the terms of the license (GNU LGPL) which comes with this package. */
8 #include <boost/range/algorithm.hpp>
12 #include <simgrid/s4u/host.hpp>
14 #include "src/surf/surf_interface.hpp"
15 #include "src/simix/smx_private.h"
18 #include "src/mc/mc_replay.h"
20 #include "simgrid/s4u/Mailbox.hpp"
22 #include "src/kernel/activity/SynchroComm.hpp"
23 #include "src/surf/network_interface.hpp"
25 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(simix_network, simix, "SIMIX network-related synchronization");
27 static void SIMIX_mbox_free(void *data);
28 static xbt_dict_t mailboxes = xbt_dict_new_homogeneous(SIMIX_mbox_free);
30 static void SIMIX_waitany_remove_simcall_from_actions(smx_simcall_t simcall);
31 static void SIMIX_comm_copy_data(smx_activity_t comm);
32 static inline void SIMIX_mbox_push(smx_mailbox_t mbox, smx_activity_t comm);
33 static smx_activity_t _find_matching_comm(boost::circular_buffer_space_optimized<smx_activity_t> *deque, e_smx_comm_type_t type,
34 int (*match_fun)(void *, void *,smx_activity_t), void *user_data, smx_activity_t my_synchro, bool remove_matching);
35 static void SIMIX_comm_start(smx_activity_t synchro);
37 void SIMIX_mailbox_exit()
39 xbt_dict_free(&mailboxes);
42 /******************************************************************************/
43 /* Rendez-Vous Points */
44 /******************************************************************************/
46 smx_mailbox_t SIMIX_mbox_create(const char *name)
48 xbt_assert(name, "Mailboxes must have a name");
49 /* two processes may have pushed the same mbox_create simcall at the same time */
50 smx_mailbox_t mbox = static_cast<smx_mailbox_t>(xbt_dict_get_or_null(mailboxes, name));
52 mbox = new simgrid::simix::Mailbox(name);
53 XBT_DEBUG("Creating a mailbox at %p with name %s", mbox, name);
54 xbt_dict_set(mailboxes, mbox->name, mbox, nullptr);
59 void SIMIX_mbox_free(void *data)
61 XBT_DEBUG("mbox free %p", data);
62 smx_mailbox_t mbox = static_cast<smx_mailbox_t>(data);
66 smx_mailbox_t SIMIX_mbox_get_by_name(const char *name)
68 return static_cast<smx_mailbox_t>(xbt_dict_get_or_null(mailboxes, name));
72 * \brief set the receiver of the rendez vous point to allow eager sends
73 * \param mbox The rendez-vous point
74 * \param process The receiving process
76 void SIMIX_mbox_set_receiver(smx_mailbox_t mbox, smx_actor_t process)
78 mbox->permanent_receiver = process;
82 * \brief Pushes a communication synchro into a rendez-vous point
83 * \param mbox The mailbox
84 * \param synchro The communication synchro
86 static inline void SIMIX_mbox_push(smx_mailbox_t mbox, smx_activity_t synchro)
88 simgrid::kernel::activity::Comm *comm = static_cast<simgrid::kernel::activity::Comm*>(synchro);
89 mbox->comm_queue.push_back(comm);
94 * \brief Removes a communication synchro from a rendez-vous point
95 * \param mbox The rendez-vous point
96 * \param synchro The communication synchro
98 void SIMIX_mbox_remove(smx_mailbox_t mbox, smx_activity_t synchro)
100 simgrid::kernel::activity::Comm *comm = static_cast<simgrid::kernel::activity::Comm*>(synchro);
102 comm->mbox = nullptr;
103 for (auto it = mbox->comm_queue.begin(); it != mbox->comm_queue.end(); it++)
105 mbox->comm_queue. erase(it);
108 xbt_die("Cannot remove the comm %p that is not part of the mailbox %s",comm, mbox->name);
112 * \brief Checks if there is a communication synchro queued in a deque matching our needs
113 * \param type The type of communication we are looking for (comm_send, comm_recv)
114 * \return The communication synchro if found, nullptr otherwise
116 static smx_activity_t _find_matching_comm(boost::circular_buffer_space_optimized<smx_activity_t> *deque, e_smx_comm_type_t type,
117 int (*match_fun)(void *, void *,smx_activity_t), void *this_user_data, smx_activity_t my_synchro, bool remove_matching)
119 void* other_user_data = nullptr;
121 for(auto it = deque->begin(); it != deque->end(); it++){
122 smx_activity_t synchro = *it;
123 simgrid::kernel::activity::Comm *comm = static_cast<simgrid::kernel::activity::Comm*>(synchro);
125 if (comm->type == SIMIX_COMM_SEND) {
126 other_user_data = comm->src_data;
127 } else if (comm->type == SIMIX_COMM_RECEIVE) {
128 other_user_data = comm->dst_data;
130 if (comm->type == type &&
131 (! match_fun || match_fun(this_user_data, other_user_data, synchro)) &&
132 (!comm->match_fun || comm->match_fun(other_user_data, this_user_data, my_synchro))) {
133 XBT_DEBUG("Found a matching communication synchro %p", comm);
138 comm->mbox_cpy = comm->mbox;
140 comm->mbox = nullptr;
143 XBT_DEBUG("Sorry, communication synchro %p does not match our needs:"
144 " its type is %d but we are looking for a comm of type %d (or maybe the filtering didn't match)",
145 comm, (int)comm->type, (int)type);
147 XBT_DEBUG("No matching communication synchro found");
151 /******************************************************************************/
152 /* Communication synchros */
153 /******************************************************************************/
154 XBT_PRIVATE void simcall_HANDLER_comm_send(smx_simcall_t simcall, smx_actor_t src, smx_mailbox_t mbox,
155 double task_size, double rate,
156 void *src_buff, size_t src_buff_size,
157 int (*match_fun)(void *, void *,smx_activity_t),
158 void (*copy_data_fun)(smx_activity_t, void*, size_t),
159 void *data, double timeout){
160 smx_activity_t comm = simcall_HANDLER_comm_isend(simcall, src, mbox, task_size, rate,
161 src_buff, src_buff_size, match_fun, nullptr, copy_data_fun,
163 SIMCALL_SET_MC_VALUE(simcall, 0);
164 simcall_HANDLER_comm_wait(simcall, comm, timeout);
166 XBT_PRIVATE smx_activity_t simcall_HANDLER_comm_isend(smx_simcall_t simcall, smx_actor_t src_proc, smx_mailbox_t mbox,
167 double task_size, double rate,
168 void *src_buff, size_t src_buff_size,
169 int (*match_fun)(void *, void *,smx_activity_t),
170 void (*clean_fun)(void *), // used to free the synchro in case of problem after a detached send
171 void (*copy_data_fun)(smx_activity_t, void*, size_t),// used to copy data if not default one
172 void *data, int detached)
174 XBT_DEBUG("send from %p", mbox);
176 /* Prepare a synchro describing us, so that it gets passed to the user-provided filter of other side */
177 simgrid::kernel::activity::Comm* this_synchro = new simgrid::kernel::activity::Comm(SIMIX_COMM_SEND);
179 /* Look for communication synchro matching our needs. We also provide a description of
180 * ourself so that the other side also gets a chance of choosing if it wants to match with us.
182 * If it is not found then push our communication into the rendez-vous point */
183 smx_activity_t other_synchro =
184 _find_matching_comm(&mbox->comm_queue, SIMIX_COMM_RECEIVE, match_fun, data, this_synchro, /*remove_matching*/true);
185 simgrid::kernel::activity::Comm *other_comm = static_cast<simgrid::kernel::activity::Comm*>(other_synchro);
188 if (!other_synchro) {
189 other_synchro = this_synchro;
190 other_comm = static_cast<simgrid::kernel::activity::Comm*>(other_synchro);
192 if (mbox->permanent_receiver!=nullptr){
193 //this mailbox is for small messages, which have to be sent right now
194 other_synchro->state = SIMIX_READY;
195 other_comm->dst_proc=mbox->permanent_receiver.get();
197 mbox->done_comm_queue.push_back(other_synchro);
198 XBT_DEBUG("pushing a message into the permanent receive fifo %p, comm %p", mbox, &(other_comm));
201 SIMIX_mbox_push(mbox, this_synchro);
204 XBT_DEBUG("Receive already pushed");
205 this_synchro->unref();
207 other_comm->state = SIMIX_READY;
208 other_comm->type = SIMIX_COMM_READY;
211 xbt_fifo_push(src_proc->comms, other_synchro);
215 other_comm->detached = true;
216 other_comm->clean_fun = clean_fun;
218 other_comm->clean_fun = nullptr;
221 /* Setup the communication synchro */
222 other_comm->src_proc = src_proc;
223 other_comm->task_size = task_size;
224 other_comm->rate = rate;
225 other_comm->src_buff = src_buff;
226 other_comm->src_buff_size = src_buff_size;
227 other_comm->src_data = data;
229 other_comm->match_fun = match_fun;
230 other_comm->copy_data_fun = copy_data_fun;
233 if (MC_is_active() || MC_record_replay_is_active()) {
234 other_comm->state = SIMIX_RUNNING;
235 return (detached ? nullptr : other_comm);
238 SIMIX_comm_start(other_comm);
239 return (detached ? nullptr : other_comm);
242 XBT_PRIVATE void simcall_HANDLER_comm_recv(smx_simcall_t simcall, smx_actor_t receiver, smx_mailbox_t mbox,
243 void *dst_buff, size_t *dst_buff_size,
244 int (*match_fun)(void *, void *, smx_activity_t),
245 void (*copy_data_fun)(smx_activity_t, void*, size_t),
246 void *data, double timeout, double rate)
248 smx_activity_t comm = SIMIX_comm_irecv(receiver, mbox, dst_buff, dst_buff_size, match_fun, copy_data_fun, data, rate);
249 SIMCALL_SET_MC_VALUE(simcall, 0);
250 simcall_HANDLER_comm_wait(simcall, comm, timeout);
253 XBT_PRIVATE smx_activity_t simcall_HANDLER_comm_irecv(smx_simcall_t simcall, smx_actor_t receiver, smx_mailbox_t mbox,
254 void *dst_buff, size_t *dst_buff_size,
255 int (*match_fun)(void *, void *, smx_activity_t),
256 void (*copy_data_fun)(smx_activity_t, void*, size_t),
257 void *data, double rate)
259 return SIMIX_comm_irecv(receiver, mbox, dst_buff, dst_buff_size, match_fun, copy_data_fun, data, rate);
262 smx_activity_t SIMIX_comm_irecv(smx_actor_t dst_proc, smx_mailbox_t mbox, void *dst_buff, size_t *dst_buff_size,
263 int (*match_fun)(void *, void *, smx_activity_t),
264 void (*copy_data_fun)(smx_activity_t, void*, size_t), // used to copy data if not default one
265 void *data, double rate)
267 XBT_DEBUG("recv from %p %p", mbox, &mbox->comm_queue);
268 simgrid::kernel::activity::Comm* this_synchro = new simgrid::kernel::activity::Comm(SIMIX_COMM_RECEIVE);
270 smx_activity_t other_synchro;
271 //communication already done, get it inside the fifo of completed comms
272 if (mbox->permanent_receiver != nullptr && ! mbox->done_comm_queue.empty()) {
274 XBT_DEBUG("We have a comm that has probably already been received, trying to match it, to skip the communication");
275 //find a match in the already received fifo
276 other_synchro = _find_matching_comm(&mbox->done_comm_queue, SIMIX_COMM_SEND, match_fun, data, this_synchro,/*remove_matching*/true);
277 //if not found, assume the receiver came first, register it to the mailbox in the classical way
278 if (!other_synchro) {
279 XBT_DEBUG("We have messages in the permanent receive list, but not the one we are looking for, pushing request into fifo");
280 other_synchro = this_synchro;
281 SIMIX_mbox_push(mbox, this_synchro);
283 simgrid::kernel::activity::Comm *other_comm = static_cast<simgrid::kernel::activity::Comm*>(other_synchro);
285 if(other_comm->surf_comm && other_comm->remains() < 1e-12) {
286 XBT_DEBUG("comm %p has been already sent, and is finished, destroy it",other_comm);
287 other_comm->state = SIMIX_DONE;
288 other_comm->type = SIMIX_COMM_DONE;
289 other_comm->mbox = nullptr;
292 static_cast<simgrid::kernel::activity::Comm*>(this_synchro)->unref();
295 /* Prepare a synchro describing us, so that it gets passed to the user-provided filter of other side */
297 /* Look for communication synchro matching our needs. We also provide a description of
298 * ourself so that the other side also gets a chance of choosing if it wants to match with us.
300 * If it is not found then push our communication into the rendez-vous point */
301 other_synchro = _find_matching_comm(&mbox->comm_queue, SIMIX_COMM_SEND, match_fun, data, this_synchro,/*remove_matching*/true);
303 if (!other_synchro) {
304 XBT_DEBUG("Receive pushed first %zu", mbox->comm_queue.size());
305 other_synchro = this_synchro;
306 SIMIX_mbox_push(mbox, this_synchro);
308 this_synchro->unref();
309 simgrid::kernel::activity::Comm *other_comm = static_cast<simgrid::kernel::activity::Comm*>(other_synchro);
311 other_comm->state = SIMIX_READY;
312 other_comm->type = SIMIX_COMM_READY;
314 xbt_fifo_push(dst_proc->comms, other_synchro);
317 /* Setup communication synchro */
318 simgrid::kernel::activity::Comm *other_comm = static_cast<simgrid::kernel::activity::Comm*>(other_synchro);
319 other_comm->dst_proc = dst_proc;
320 other_comm->dst_buff = dst_buff;
321 other_comm->dst_buff_size = dst_buff_size;
322 other_comm->dst_data = data;
324 if (rate > -1.0 && (other_comm->rate < 0.0 || rate < other_comm->rate))
325 other_comm->rate = rate;
327 other_comm->match_fun = match_fun;
328 other_comm->copy_data_fun = copy_data_fun;
330 if (MC_is_active() || MC_record_replay_is_active()) {
331 other_synchro->state = SIMIX_RUNNING;
332 return other_synchro;
335 SIMIX_comm_start(other_synchro);
336 return other_synchro;
339 smx_activity_t simcall_HANDLER_comm_iprobe(smx_simcall_t simcall, smx_mailbox_t mbox,
340 int type, int src, int tag,
341 int (*match_fun)(void *, void *, smx_activity_t),
343 return SIMIX_comm_iprobe(simcall->issuer, mbox, type, src, tag, match_fun, data);
346 smx_activity_t SIMIX_comm_iprobe(smx_actor_t dst_proc, smx_mailbox_t mbox, int type, int src,
347 int tag, int (*match_fun)(void *, void *, smx_activity_t), void *data)
349 XBT_DEBUG("iprobe from %p %p", mbox, &mbox->comm_queue);
350 simgrid::kernel::activity::Comm* this_comm;
353 this_comm = new simgrid::kernel::activity::Comm(SIMIX_COMM_SEND);
354 smx_type = SIMIX_COMM_RECEIVE;
356 this_comm = new simgrid::kernel::activity::Comm(SIMIX_COMM_RECEIVE);
357 smx_type = SIMIX_COMM_SEND;
359 smx_activity_t other_synchro=nullptr;
360 if (mbox->permanent_receiver != nullptr && !mbox->done_comm_queue.empty()) {
361 XBT_DEBUG("first check in the permanent recv mailbox, to see if we already got something");
362 other_synchro = _find_matching_comm(&mbox->done_comm_queue,
363 (e_smx_comm_type_t) smx_type, match_fun, data, this_comm,/*remove_matching*/false);
366 XBT_DEBUG("check if we have more luck in the normal mailbox");
367 other_synchro = _find_matching_comm(&mbox->comm_queue,
368 (e_smx_comm_type_t) smx_type, match_fun, data, this_comm,/*remove_matching*/false);
372 other_synchro->unref();
375 return other_synchro;
// Handler of the comm_wait simcall: registers `simcall` on `synchro` and records `synchro` as the
// issuer's waiting synchro.
// Under the model checker or replay, the MC value (idx) selects the outcome: either the comm is marked
// SIMIX_DONE, or the wait ends in a timeout attributed to the issuer's side (SRC_ or DST_TIMEOUT). In both
// cases SIMIX_comm_finish answers the simcall.
// NOTE(review): the exact test on idx is not visible in this listing — presumably idx == 0 means "done"; confirm.
// Otherwise, a comm that is neither waiting nor running is finished immediately. A pending comm gets a
// surf sleep action of `timeout` on the issuer's host, stored as the issuer-side timeout of the comm.
378 void simcall_HANDLER_comm_wait(smx_simcall_t simcall, smx_activity_t synchro, double timeout)
380 /* Associate this simcall to the wait synchro */
381 XBT_DEBUG("simcall_HANDLER_comm_wait, %p", synchro);
383 synchro->simcalls.push_back(simcall);
384 simcall->issuer->waiting_synchro = synchro;
386 if (MC_is_active() || MC_record_replay_is_active()) {
387 int idx = SIMCALL_GET_MC_VALUE(simcall);
389 synchro->state = SIMIX_DONE;
391 /* If we reached this point, the wait simcall must have a timeout */
392 /* Otherwise it shouldn't be enabled and executed by the MC */
// The timeout is charged to whichever end of the comm issued this wait.
396 simgrid::kernel::activity::Comm *comm = static_cast<simgrid::kernel::activity::Comm*>(synchro);
397 if (comm->src_proc == simcall->issuer)
398 comm->state = SIMIX_SRC_TIMEOUT;
400 comm->state = SIMIX_DST_TIMEOUT;
403 SIMIX_comm_finish(synchro);
407 /* If the synchro has already finish perform the error handling, */
408 /* otherwise set up a waiting timeout on the right side */
409 if (synchro->state != SIMIX_WAITING && synchro->state != SIMIX_RUNNING) {
410 SIMIX_comm_finish(synchro);
411 } else { /* if (timeout >= 0) { we need a surf sleep action even when there is no timeout, otherwise surf won't tell us when the host fails */
// The sleep action carries the synchro as data, so its completion can be routed back to this comm.
412 surf_action_t sleep = surf_host_sleep(simcall->issuer->host, timeout);
413 sleep->setData(synchro);
415 simgrid::kernel::activity::Comm *comm = static_cast<simgrid::kernel::activity::Comm*>(synchro);
416 if (simcall->issuer == comm->src_proc)
417 comm->src_timeout = sleep;
419 comm->dst_timeout = sleep;
423 void simcall_HANDLER_comm_test(smx_simcall_t simcall, smx_activity_t synchro)
425 simgrid::kernel::activity::Comm *comm = static_cast<simgrid::kernel::activity::Comm*>(synchro);
427 if (MC_is_active() || MC_record_replay_is_active()){
428 simcall_comm_test__set__result(simcall, comm->src_proc && comm->dst_proc);
429 if (simcall_comm_test__get__result(simcall)){
430 synchro->state = SIMIX_DONE;
431 synchro->simcalls.push_back(simcall);
432 SIMIX_comm_finish(synchro);
434 SIMIX_simcall_answer(simcall);
439 simcall_comm_test__set__result(simcall, (synchro->state != SIMIX_WAITING && synchro->state != SIMIX_RUNNING));
440 if (simcall_comm_test__get__result(simcall)) {
441 synchro->simcalls.push_back(simcall);
442 SIMIX_comm_finish(synchro);
444 SIMIX_simcall_answer(simcall);
448 void simcall_HANDLER_comm_testany(
449 smx_simcall_t simcall, simgrid::kernel::activity::ActivityImpl* comms[], size_t count)
451 // The default result is -1 -- this means, "nothing is ready".
452 // It can be changed below, but only if something matches.
453 simcall_comm_testany__set__result(simcall, -1);
455 if (MC_is_active() || MC_record_replay_is_active()){
456 int idx = SIMCALL_GET_MC_VALUE(simcall);
458 SIMIX_simcall_answer(simcall);
460 simgrid::kernel::activity::ActivityImpl* synchro = comms[idx];
461 simcall_comm_testany__set__result(simcall, idx);
462 synchro->simcalls.push_back(simcall);
463 synchro->state = SIMIX_DONE;
464 SIMIX_comm_finish(synchro);
469 for (std::size_t i = 0; i != count; ++i) {
470 simgrid::kernel::activity::ActivityImpl* synchro = comms[i];
471 if (synchro->state != SIMIX_WAITING && synchro->state != SIMIX_RUNNING) {
472 simcall_comm_testany__set__result(simcall, i);
473 synchro->simcalls.push_back(simcall);
474 SIMIX_comm_finish(synchro);
478 SIMIX_simcall_answer(simcall);
// Handler of the comm_waitany simcall over the dynar `synchros`.
// Under the model checker or replay, timeouts are rejected with xbt_die. The MC value picks the index of the
// comm that completes: it gets the simcall, is marked SIMIX_DONE and finished, and its index is the result.
// Otherwise a timer may be armed. When it fires, it detaches the simcall from all comms, sets the result to
// -1 and answers. The simcall is then attached to every comm, and the first comm found already
// terminated (neither waiting nor running) is finished.
481 void simcall_HANDLER_comm_waitany(smx_simcall_t simcall, xbt_dynar_t synchros, double timeout)
483 smx_activity_t synchro;
484 unsigned int cursor = 0;
486 if (MC_is_active() || MC_record_replay_is_active()){
488 xbt_die("Timeout not implemented for waitany in the model-checker");
489 int idx = SIMCALL_GET_MC_VALUE(simcall);
490 synchro = xbt_dynar_get_as(synchros, idx, smx_activity_t);
491 synchro->simcalls.push_back(simcall);
492 simcall_comm_waitany__set__result(simcall, idx);
493 synchro->state = SIMIX_DONE;
494 SIMIX_comm_finish(synchro);
// NOTE(review): prefer nullptr over NULL here, as everywhere else in this file.
499 simcall->timer = NULL;
// Timer expiry: withdraw from every comm and report "nothing completed" (-1).
501 simcall->timer = SIMIX_timer_set(SIMIX_get_clock() + timeout, [simcall]() {
502 SIMIX_waitany_remove_simcall_from_actions(simcall);
503 simcall_comm_waitany__set__result(simcall, -1);
504 SIMIX_simcall_answer(simcall);
508 xbt_dynar_foreach(synchros, cursor, synchro){
509 /* associate this simcall to the the synchro */
510 synchro->simcalls.push_back(simcall);
512 /* see if the synchro is already finished */
513 if (synchro->state != SIMIX_WAITING && synchro->state != SIMIX_RUNNING){
514 SIMIX_comm_finish(synchro);
520 void SIMIX_waitany_remove_simcall_from_actions(smx_simcall_t simcall)
522 smx_activity_t synchro;
523 unsigned int cursor = 0;
524 xbt_dynar_t synchros = simcall_comm_waitany__get__comms(simcall);
526 xbt_dynar_foreach(synchros, cursor, synchro) {
527 // Remove the first occurence of simcall:
528 auto i = boost::range::find(synchro->simcalls, simcall);
529 if (i != synchro->simcalls.end())
530 synchro->simcalls.erase(i);
535 * \brief Starts the simulation of a communication synchro.
536 * \param synchro the communication synchro
// Only a SIMIX_READY comm (both peers known) is started. A network action of task_size bytes at the comm's
// rate is created from the sender's host to the receiver's host, and the comm becomes SIMIX_RUNNING.
// If the action is already failed, the comm becomes SIMIX_LINK_FAILURE.
// If either peer is suspended, the network action is suspended as well.
538 static inline void SIMIX_comm_start(smx_activity_t synchro)
540 simgrid::kernel::activity::Comm *comm = static_cast<simgrid::kernel::activity::Comm*>(synchro);
542 /* If both the sender and the receiver are already there, start the communication */
543 if (synchro->state == SIMIX_READY) {
545 simgrid::s4u::Host* sender = comm->src_proc->host;
546 simgrid::s4u::Host* receiver = comm->dst_proc->host;
548 XBT_DEBUG("Starting communication %p from '%s' to '%s'", synchro, sender->cname(), receiver->cname());
550 comm->surf_comm = surf_network_model->communicate(sender, receiver, comm->task_size, comm->rate);
// The synchro is attached to the action as its data.
551 comm->surf_comm->setData(synchro);
552 comm->state = SIMIX_RUNNING;
554 /* If a link is failed, detect it immediately */
555 if (comm->surf_comm->getState() == simgrid::surf::Action::State::failed) {
556 XBT_DEBUG("Communication from '%s' to '%s' failed to start because of a link failure", sender->cname(),
558 comm->state = SIMIX_LINK_FAILURE;
// NOTE(review): part of the failure branch is not visible in this listing. If it releases surf_comm, the
// suspend() below would dereference it when a peer is suspended — confirm.
562 /* If any of the process is suspend, create the synchro but stop its execution,
563 it will be restarted when the sender process resume */
564 if (SIMIX_process_is_suspended(comm->src_proc) || SIMIX_process_is_suspended(comm->dst_proc)) {
565 if (SIMIX_process_is_suspended(comm->src_proc))
566 XBT_DEBUG("The communication is suspended on startup because src (%s@%s) was suspended since it initiated the "
568 comm->src_proc->cname(), comm->src_proc->host->cname());
570 XBT_DEBUG("The communication is suspended on startup because dst (%s@%s) was suspended since it initiated the "
572 comm->dst_proc->cname(), comm->dst_proc->host->cname());
574 comm->surf_comm->suspend();
580 * \brief Answers the SIMIX simcalls associated to a communication synchro.
581 * \param synchro a finished communication synchro
// Pops every simcall registered on the synchro and answers it. Depending on the synchro state, it either
// copies the payload (SIMIX_DONE) or posts an exception to the issuer. destroy_count counts how many
// references are dropped (unref) once all simcalls have been answered.
583 void SIMIX_comm_finish(smx_activity_t synchro)
585 simgrid::kernel::activity::Comm *comm = static_cast<simgrid::kernel::activity::Comm*>(synchro);
586 unsigned int destroy_count = 0;
588 while (!synchro->simcalls.empty()) {
589 smx_simcall_t simcall = synchro->simcalls.front();
590 synchro->simcalls.pop_front();
592 /* If a waitany simcall is waiting for this synchro to finish, then remove
593 it from the other synchros in the waitany list. Afterwards, get the
594 position of the actual synchro in the waitany dynar and
595 return it as the result of the simcall */
597 if (simcall->call == SIMCALL_NONE) //FIXME: maybe a better way to handle this case
598 continue; // if process handling comm is killed
599 if (simcall->call == SIMCALL_COMM_WAITANY) {
600 SIMIX_waitany_remove_simcall_from_actions(simcall);
// Cancel the waitany timeout timer, if one was armed.
601 if (simcall->timer) {
602 SIMIX_timer_remove(simcall->timer);
603 simcall->timer = nullptr;
605 if (!MC_is_active() && !MC_record_replay_is_active())
606 simcall_comm_waitany__set__result(simcall, xbt_dynar_search(simcall_comm_waitany__get__comms(simcall), &synchro));
609 /* If the synchro is still in a rendez-vous point then remove from it */
611 SIMIX_mbox_remove(comm->mbox, synchro);
613 XBT_DEBUG("SIMIX_comm_finish: synchro state = %d", (int)synchro->state);
615 /* Check out for errors */
// A dead issuer host takes precedence over the synchro state.
617 if (simcall->issuer->host->isOff()) {
618 simcall->issuer->context->iwannadie = 1;
619 SMX_EXCEPTION(simcall->issuer, host_error, 0, "Host failed");
621 switch (synchro->state) {
624 XBT_DEBUG("Communication %p complete!", synchro);
625 SIMIX_comm_copy_data(synchro);
628 case SIMIX_SRC_TIMEOUT:
629 SMX_EXCEPTION(simcall->issuer, timeout_error, 0, "Communication timeouted because of sender");
632 case SIMIX_DST_TIMEOUT:
633 SMX_EXCEPTION(simcall->issuer, timeout_error, 0, "Communication timeouted because of receiver");
// Host failures: the actor living on the failed host is killed, and the remote peer gets a network_error.
636 case SIMIX_SRC_HOST_FAILURE:
637 if (simcall->issuer == comm->src_proc)
638 simcall->issuer->context->iwannadie = 1;
639 // SMX_EXCEPTION(simcall->issuer, host_error, 0, "Host failed");
641 SMX_EXCEPTION(simcall->issuer, network_error, 0, "Remote peer failed");
644 case SIMIX_DST_HOST_FAILURE:
645 if (simcall->issuer == comm->dst_proc)
646 simcall->issuer->context->iwannadie = 1;
647 // SMX_EXCEPTION(simcall->issuer, host_error, 0, "Host failed");
649 SMX_EXCEPTION(simcall->issuer, network_error, 0, "Remote peer failed");
652 case SIMIX_LINK_FAILURE:
// NOTE(review): nullptr is passed for a %s conversion when a peer is gone; that is undefined behavior for
// printf-style formatting — consider a placeholder string like SIMIX_comm_copy_data uses.
655 "Link failure in synchro %p between '%s' and '%s': posting an exception to the issuer: %s (%p) detached:%d",
656 synchro, comm->src_proc ? comm->src_proc->host->cname() : nullptr,
657 comm->dst_proc ? comm->dst_proc->host->cname() : nullptr, simcall->issuer->cname(), simcall->issuer,
659 if (comm->src_proc == simcall->issuer) {
660 XBT_DEBUG("I'm source");
661 } else if (comm->dst_proc == simcall->issuer) {
662 XBT_DEBUG("I'm dest");
664 XBT_DEBUG("I'm neither source nor dest");
666 SMX_EXCEPTION(simcall->issuer, network_error, 0, "Link failure");
// Cancellation: the message names the peer of the issuer.
670 if (simcall->issuer == comm->dst_proc)
671 SMX_EXCEPTION(simcall->issuer, cancel_error, 0, "Communication canceled by the sender");
673 SMX_EXCEPTION(simcall->issuer, cancel_error, 0, "Communication canceled by the receiver");
677 xbt_die("Unexpected synchro state in SIMIX_comm_finish: %d", (int)synchro->state);
681 /* if there is an exception during a waitany or a testany, indicate the position of the failed communication */
682 if (simcall->issuer->exception) {
683 // In order to modify the exception we have to rethrow it:
685 std::rethrow_exception(simcall->issuer->exception);
688 if (simcall->call == SIMCALL_COMM_WAITANY) {
689 e.value = xbt_dynar_search(simcall_comm_waitany__get__comms(simcall), &synchro);
691 else if (simcall->call == SIMCALL_COMM_TESTANY) {
// Position of this synchro in the testany array.
693 auto comms = simcall_comm_testany__get__comms(simcall);
694 auto count = simcall_comm_testany__get__count(simcall);
695 auto element = std::find(comms, comms + count, synchro);
696 if (element == comms + count)
699 e.value = element - comms;
701 simcall->issuer->exception = std::make_exception_ptr(e);
708 if (simcall->issuer->host->isOff()) {
709 simcall->issuer->context->iwannadie = 1;
// The issuer no longer waits on this synchro: unlink it from the issuer's comm list, and from the peer's
// list when the issuer is one of the two ends.
712 simcall->issuer->waiting_synchro = nullptr;
713 xbt_fifo_remove(simcall->issuer->comms, synchro);
715 if(simcall->issuer == comm->src_proc){
717 xbt_fifo_remove(comm->dst_proc->comms, synchro);
719 if(simcall->issuer == comm->dst_proc){
721 xbt_fifo_remove(comm->src_proc->comms, synchro);
722 //in case of a detached comm we have an extra ref to remove, as the sender won't do it
726 SIMIX_simcall_answer(simcall);
// Drop the references collected above, once no simcall needs the synchro any more.
730 while (destroy_count-- > 0)
731 static_cast<simgrid::kernel::activity::Comm*>(synchro)->unref();
734 /******************************************************************************/
735 /* SIMIX_comm_copy_data callbacks */
736 /******************************************************************************/
737 static void (*SIMIX_comm_copy_data_callback) (smx_activity_t, void*, size_t) = &SIMIX_comm_copy_pointer_callback;
739 void SIMIX_comm_set_copy_data_callback(void (*callback) (smx_activity_t, void*, size_t))
741 SIMIX_comm_copy_data_callback = callback;
744 void SIMIX_comm_copy_pointer_callback(smx_activity_t synchro, void* buff, size_t buff_size)
746 simgrid::kernel::activity::Comm *comm = static_cast<simgrid::kernel::activity::Comm*>(synchro);
748 xbt_assert((buff_size == sizeof(void *)), "Cannot copy %zu bytes: must be sizeof(void*)", buff_size);
749 *(void **) (comm->dst_buff) = buff;
752 void SIMIX_comm_copy_buffer_callback(smx_activity_t synchro, void* buff, size_t buff_size)
754 simgrid::kernel::activity::Comm *comm = static_cast<simgrid::kernel::activity::Comm*>(synchro);
756 XBT_DEBUG("Copy the data over");
757 memcpy(comm->dst_buff, buff, buff_size);
758 if (comm->detached) { // if this is a detached send, the source buffer was duplicated by SMPI sender to make the original buffer available to the application ASAP
760 comm->src_buff = nullptr;
766 * \brief Copy the communication data from the sender's buffer to the receiver's one
767 * \param comm The communication
// NOTE(review): the parameter is actually named `synchro` (cast to a Comm below).
// Nothing is copied when either buffer is missing or when the data was already copied. Otherwise at most
// *dst_buff_size bytes are copied (when a size pointer is given), and that size is updated to the copied
// amount. The comm's own copy_data_fun is used if set, the global default copy callback otherwise.
769 void SIMIX_comm_copy_data(smx_activity_t synchro)
771 simgrid::kernel::activity::Comm *comm = static_cast<simgrid::kernel::activity::Comm*>(synchro);
773 size_t buff_size = comm->src_buff_size;
774 /* If there is no data to copy then return */
775 if (!comm->src_buff || !comm->dst_buff || comm->copied)
778 XBT_DEBUG("Copying comm %p data from %s (%p) -> %s (%p) (%zu bytes)", comm,
779 comm->src_proc ? comm->src_proc->host->cname() : "a finished process", comm->src_buff,
780 comm->dst_proc ? comm->dst_proc->host->cname() : "a finished process", comm->dst_buff, buff_size);
782 /* Copy at most dst_buff_size bytes of the message to receiver's buffer */
783 if (comm->dst_buff_size)
784 buff_size = MIN(buff_size, *(comm->dst_buff_size));
786 /* Update the receiver's buffer size to the copied amount */
787 if (comm->dst_buff_size)
788 *comm->dst_buff_size = buff_size;
791 if(comm->copy_data_fun)
792 comm->copy_data_fun (comm, comm->src_buff, buff_size);
794 SIMIX_comm_copy_data_callback (comm, comm->src_buff, buff_size);
798 /* Set the copied flag so we copy data only once */
799 /* (this function might be called from both communication ends) */