1 /* Copyright (c) 2009-2016. The SimGrid Team. All rights reserved. */
3 /* This program is free software; you can redistribute it and/or modify it
4 * under the terms of the license (GNU LGPL) which comes with this package. */
8 #include <boost/range/algorithm.hpp>
12 #include <simgrid/s4u/host.hpp>
14 #include "src/surf/surf_interface.hpp"
15 #include "src/simix/smx_private.h"
18 #include "src/mc/mc_replay.h"
20 #include "simgrid/s4u/Mailbox.hpp"
22 #include "src/kernel/activity/SynchroComm.hpp"
23 #include "src/surf/network_interface.hpp"
// Log category for this file: the XBT_DEBUG calls below are emitted under "simix_network".
25 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(simix_network, simix, "SIMIX network-related synchronization");
// Forward declarations of file-local helpers that are defined further down in this file.
27 static void SIMIX_mbox_free(void *data);
// Registry of every mailbox, keyed by name (filled by SIMIX_mbox_create, queried by
// SIMIX_mbox_get_by_name, released by SIMIX_mailbox_exit). SIMIX_mbox_free is handed to the
// dictionary, presumably as the destructor applied to each stored value -- TODO confirm xbt_dict semantics.
// The dictionary is created during static initialization of this translation unit.
28 static xbt_dict_t mailboxes = xbt_dict_new_homogeneous(SIMIX_mbox_free);
30 static void SIMIX_waitany_remove_simcall_from_actions(smx_simcall_t simcall);
31 static void SIMIX_comm_copy_data(smx_activity_t comm);
32 static inline void SIMIX_mbox_push(smx_mailbox_t mbox, smx_activity_t comm);
33 static smx_activity_t _find_matching_comm(std::deque<smx_activity_t> *deque, e_smx_comm_type_t type,
34 int (*match_fun)(void *, void *,smx_activity_t), void *user_data, smx_activity_t my_synchro, bool remove_matching);
35 static void SIMIX_comm_start(smx_activity_t synchro);
37 void SIMIX_mailbox_exit()
39 xbt_dict_free(&mailboxes);
42 /******************************************************************************/
43 /* Rendez-Vous Points */
44 /******************************************************************************/
46 smx_mailbox_t SIMIX_mbox_create(const char *name)
48 xbt_assert(name, "Mailboxes must have a name");
49 /* two processes may have pushed the same mbox_create simcall at the same time */
50 smx_mailbox_t mbox = static_cast<smx_mailbox_t>(xbt_dict_get_or_null(mailboxes, name));
52 mbox = new simgrid::simix::Mailbox(name);
53 XBT_DEBUG("Creating a mailbox at %p with name %s", mbox, name);
54 xbt_dict_set(mailboxes, mbox->name, mbox, nullptr);
59 void SIMIX_mbox_free(void *data)
61 XBT_DEBUG("mbox free %p", data);
62 smx_mailbox_t mbox = static_cast<smx_mailbox_t>(data);
66 smx_mailbox_t SIMIX_mbox_get_by_name(const char *name)
68 return static_cast<smx_mailbox_t>(xbt_dict_get_or_null(mailboxes, name));
72 * \brief set the receiver of the rendez vous point to allow eager sends
73 * \param mbox The rendez-vous point
74 * \param process The receiving process
76 void SIMIX_mbox_set_receiver(smx_mailbox_t mbox, smx_actor_t process)
78 mbox->permanent_receiver = process;
82 * \brief Pushes a communication synchro into a rendez-vous point
83 * \param mbox The mailbox
84 * \param synchro The communication synchro
86 static inline void SIMIX_mbox_push(smx_mailbox_t mbox, smx_activity_t synchro)
88 simgrid::kernel::activity::Comm *comm = static_cast<simgrid::kernel::activity::Comm*>(synchro);
89 mbox->comm_queue.push_back(comm);
94 * \brief Removes a communication synchro from a rendez-vous point
95 * \param mbox The rendez-vous point
96 * \param synchro The communication synchro
98 void SIMIX_mbox_remove(smx_mailbox_t mbox, smx_activity_t synchro)
100 simgrid::kernel::activity::Comm *comm = static_cast<simgrid::kernel::activity::Comm*>(synchro);
102 comm->mbox = nullptr;
103 for (auto it = mbox->comm_queue.begin(); it != mbox->comm_queue.end(); it++)
105 mbox->comm_queue. erase(it);
108 xbt_die("Cannot remove the comm %p that is not part of the mailbox %s",comm, mbox->name);
// NOTE(review): the three lines below are the body of a Doxygen comment whose `/**` opener and `*/`
// closer are absent from this file.
112 * \brief Checks if there is a communication synchro queued in a deque matching our needs
113 * \param type The type of communication we are looking for (comm_send, comm_recv)
114 * \return The communication synchro if found, nullptr otherwise
// Scans `deque` from front to back. A queued comm matches when all three hold:
//   - its type equals `type`;
//   - the caller's filter `match_fun` (if any) accepts it: match_fun(this_user_data, other_user_data, synchro);
//   - the queued comm's own filter (if any) accepts the caller: comm->match_fun(other_user_data,
//     this_user_data, my_synchro).
// On a match, comm->mbox is saved into comm->mbox_cpy and then cleared.
// NOTE(review): several lines of this function are not present in this file, including the use of
// `remove_matching` and the return statements, so that part of the contract is not documented here.
116 static smx_activity_t _find_matching_comm(std::deque<smx_activity_t> *deque, e_smx_comm_type_t type,
117 int (*match_fun)(void *, void *,smx_activity_t), void *this_user_data, smx_activity_t my_synchro, bool remove_matching)
119 void* other_user_data = nullptr;
121 for(auto it = deque->begin(); it != deque->end(); it++){
122 smx_activity_t synchro = *it;
123 simgrid::kernel::activity::Comm *comm = static_cast<simgrid::kernel::activity::Comm*>(synchro);
// Pick the user data attached by the queued comm's owner: src_data for a pending send, dst_data for a
// pending receive. Other comm types keep the previous value, but they are then rejected by the
// `comm->type == type` test before any filter is called.
125 if (comm->type == SIMIX_COMM_SEND) {
126 other_user_data = comm->src_data;
127 } else if (comm->type == SIMIX_COMM_RECEIVE) {
128 other_user_data = comm->dst_data;
// Both sides get a chance to reject the pairing; the filter arguments are mirrored.
130 if (comm->type == type &&
131 (! match_fun || match_fun(this_user_data, other_user_data, synchro)) &&
132 (!comm->match_fun || comm->match_fun(other_user_data, this_user_data, my_synchro))) {
133 XBT_DEBUG("Found a matching communication synchro %p", comm);
// Remember the mailbox (consulted by the model checker, per the field name -- TODO confirm) before detaching the comm from it.
138 comm->mbox_cpy = comm->mbox;
140 comm->mbox = nullptr;
143 XBT_DEBUG("Sorry, communication synchro %p does not match our needs:"
144 " its type is %d but we are looking for a comm of type %d (or maybe the filtering didn't match)",
145 comm, (int)comm->type, (int)type);
147 XBT_DEBUG("No matching communication synchro found");
151 /******************************************************************************/
152 /* Communication synchros */
153 /******************************************************************************/
154 XBT_PRIVATE void simcall_HANDLER_comm_send(smx_simcall_t simcall, smx_actor_t src, smx_mailbox_t mbox,
155 double task_size, double rate,
156 void *src_buff, size_t src_buff_size,
157 int (*match_fun)(void *, void *,smx_activity_t),
158 void (*copy_data_fun)(smx_activity_t, void*, size_t),
159 void *data, double timeout){
160 smx_activity_t comm = simcall_HANDLER_comm_isend(simcall, src, mbox, task_size, rate,
161 src_buff, src_buff_size, match_fun, nullptr, copy_data_fun,
163 SIMCALL_SET_MC_VALUE(simcall, 0);
164 simcall_HANDLER_comm_wait(simcall, comm, timeout);
// Handler of the asynchronous send simcall.
// 1. Builds a SEND comm describing the caller (this_synchro) so that receivers' filters can inspect it.
// 2. Looks for a matching RECEIVE already queued on `mbox`. If there is none, our own comm is used:
//    - on a mailbox with a permanent receiver (eager mode) it is marked SIMIX_READY, addressed to that
//      receiver and appended to done_comm_queue;
//    - otherwise it is pushed on the mailbox.
//    If a receive matched, our descriptor is released and the matched comm becomes SIMIX_READY.
// 3. Records the selected comm in src_proc->comms and fills in its sender-side fields (detached/clean_fun,
//    size, rate, buffers, data, filters).
// 4. Under model checking / record-replay the comm is only marked SIMIX_RUNNING; otherwise
//    SIMIX_comm_start is called.
// Returns the selected comm, or nullptr when `detached` is nonzero.
// NOTE(review): some lines are not present in this file (e.g. the else branch headers around the
// permanent-receiver case and the `detached` test guarding the detached/clean_fun assignments).
166 XBT_PRIVATE smx_activity_t simcall_HANDLER_comm_isend(smx_simcall_t simcall, smx_actor_t src_proc, smx_mailbox_t mbox,
167 double task_size, double rate,
168 void *src_buff, size_t src_buff_size,
169 int (*match_fun)(void *, void *,smx_activity_t),
170 void (*clean_fun)(void *), // used to free the synchro in case of problem after a detached send
171 void (*copy_data_fun)(smx_activity_t, void*, size_t),// used to copy data if not default one
172 void *data, int detached)
174 XBT_DEBUG("send from %p", mbox);
176 /* Prepare a synchro describing us, so that it gets passed to the user-provided filter of other side */
177 simgrid::kernel::activity::Comm* this_synchro = new simgrid::kernel::activity::Comm(SIMIX_COMM_SEND);
179 /* Look for communication synchro matching our needs. We also provide a description of
180 * ourself so that the other side also gets a chance of choosing if it wants to match with us.
182 * If it is not found then push our communication into the rendez-vous point */
183 smx_activity_t other_synchro =
184 _find_matching_comm(&mbox->comm_queue, SIMIX_COMM_RECEIVE, match_fun, data, this_synchro, /*remove_matching*/true);
185 simgrid::kernel::activity::Comm *other_comm = static_cast<simgrid::kernel::activity::Comm*>(other_synchro);
// No receive is waiting: our own comm becomes the one we set up below.
188 if (!other_synchro) {
189 other_synchro = this_synchro;
190 other_comm = static_cast<simgrid::kernel::activity::Comm*>(other_synchro);
192 if (mbox->permanent_receiver!=nullptr){
193 //this mailbox is for small messages, which have to be sent right now
194 other_synchro->state = SIMIX_READY;
195 other_comm->dst_proc=mbox->permanent_receiver.get();
197 mbox->done_comm_queue.push_back(other_synchro);
// NOTE(review): `&(other_comm)` logs the address of the local pointer variable, not the comm itself;
// `other_comm` was probably intended.
198 XBT_DEBUG("pushing a message into the permanent receive fifo %p, comm %p", mbox, &(other_comm));
201 SIMIX_mbox_push(mbox, this_synchro);
// A receive was already queued: our temporary descriptor is no longer needed.
204 XBT_DEBUG("Receive already pushed");
205 this_synchro->unref();
207 other_comm->state = SIMIX_READY;
208 other_comm->type = SIMIX_COMM_READY;
// Track the comm in the sender's list of ongoing comms.
211 xbt_fifo_push(src_proc->comms, other_synchro);
215 other_comm->detached = true;
216 other_comm->clean_fun = clean_fun;
218 other_comm->clean_fun = nullptr;
221 /* Setup the communication synchro */
222 other_comm->src_proc = src_proc;
223 other_comm->task_size = task_size;
224 other_comm->rate = rate;
225 other_comm->src_buff = src_buff;
226 other_comm->src_buff_size = src_buff_size;
227 other_comm->src_data = data;
229 other_comm->match_fun = match_fun;
230 other_comm->copy_data_fun = copy_data_fun;
// Under model checking / replay the transfer is not simulated here; the comm is only flagged as running.
233 if (MC_is_active() || MC_record_replay_is_active()) {
234 other_comm->state = SIMIX_RUNNING;
235 return (detached ? nullptr : other_comm);
238 SIMIX_comm_start(other_comm);
239 return (detached ? nullptr : other_comm);
242 XBT_PRIVATE void simcall_HANDLER_comm_recv(smx_simcall_t simcall, smx_actor_t receiver, smx_mailbox_t mbox,
243 void *dst_buff, size_t *dst_buff_size,
244 int (*match_fun)(void *, void *, smx_activity_t),
245 void (*copy_data_fun)(smx_activity_t, void*, size_t),
246 void *data, double timeout, double rate)
248 smx_activity_t comm = SIMIX_comm_irecv(receiver, mbox, dst_buff, dst_buff_size, match_fun, copy_data_fun, data, rate);
249 SIMCALL_SET_MC_VALUE(simcall, 0);
250 simcall_HANDLER_comm_wait(simcall, comm, timeout);
253 XBT_PRIVATE smx_activity_t simcall_HANDLER_comm_irecv(smx_simcall_t simcall, smx_actor_t receiver, smx_mailbox_t mbox,
254 void *dst_buff, size_t *dst_buff_size,
255 int (*match_fun)(void *, void *, smx_activity_t),
256 void (*copy_data_fun)(smx_activity_t, void*, size_t),
257 void *data, double rate)
259 return SIMIX_comm_irecv(receiver, mbox, dst_buff, dst_buff_size, match_fun, copy_data_fun, data, rate);
// Receiver-side counterpart of simcall_HANDLER_comm_isend.
// - Builds a RECEIVE comm describing the caller (this_synchro).
// - If the mailbox has a permanent receiver and completed sends are waiting in done_comm_queue, a match
//   is searched there first. If none matches, our comm is pushed on the mailbox. If the matched comm's
//   network action has (practically) nothing left to transfer, it is marked SIMIX_DONE.
// - Otherwise a matching SEND is searched in comm_queue. If none matches, our comm is pushed on the
//   mailbox; if one matches, our descriptor is released and the matched comm becomes SIMIX_READY and is
//   recorded in dst_proc->comms.
// - The selected comm then receives the receiver-side fields, may have its rate lowered, and is started
//   (or only flagged SIMIX_RUNNING under model checking / record-replay).
// Returns the selected comm.
// NOTE(review): several lines are not present in this file (e.g. the `else` separating the two search
// paths and some closing braces), so the exact branch structure cannot be fully confirmed from here.
262 smx_activity_t SIMIX_comm_irecv(smx_actor_t dst_proc, smx_mailbox_t mbox, void *dst_buff, size_t *dst_buff_size,
263 int (*match_fun)(void *, void *, smx_activity_t),
264 void (*copy_data_fun)(smx_activity_t, void*, size_t), // used to copy data if not default one
265 void *data, double rate)
267 XBT_DEBUG("recv from %p %p", mbox, &mbox->comm_queue);
268 simgrid::kernel::activity::Comm* this_synchro = new simgrid::kernel::activity::Comm(SIMIX_COMM_RECEIVE);
270 smx_activity_t other_synchro;
271 //communication already done, get it inside the fifo of completed comms
272 if (mbox->permanent_receiver != nullptr && ! mbox->done_comm_queue.empty()) {
274 XBT_DEBUG("We have a comm that has probably already been received, trying to match it, to skip the communication");
275 //find a match in the already received fifo
276 other_synchro = _find_matching_comm(&mbox->done_comm_queue, SIMIX_COMM_SEND, match_fun, data, this_synchro,/*remove_matching*/true);
277 //if not found, assume the receiver came first, register it to the mailbox in the classical way
278 if (!other_synchro) {
279 XBT_DEBUG("We have messages in the permanent receive list, but not the one we are looking for, pushing request into fifo");
280 other_synchro = this_synchro;
281 SIMIX_mbox_push(mbox, this_synchro);
283 simgrid::kernel::activity::Comm *other_comm = static_cast<simgrid::kernel::activity::Comm*>(other_synchro);
// 1e-12 is the tolerance under which the remaining amount of the network action is treated as zero.
285 if(other_comm->surf_comm && other_comm->remains() < 1e-12) {
286 XBT_DEBUG("comm %p has been already sent, and is finished, destroy it",other_comm);
287 other_comm->state = SIMIX_DONE;
288 other_comm->type = SIMIX_COMM_DONE;
289 other_comm->mbox = nullptr;
// Drop the reference held through our temporary descriptor (the cast is redundant: it is already a Comm*).
292 static_cast<simgrid::kernel::activity::Comm*>(this_synchro)->unref();
295 /* Prepare a synchro describing us, so that it gets passed to the user-provided filter of other side */
297 /* Look for communication synchro matching our needs. We also provide a description of
298 * ourself so that the other side also gets a chance of choosing if it wants to match with us.
300 * If it is not found then push our communication into the rendez-vous point */
301 other_synchro = _find_matching_comm(&mbox->comm_queue, SIMIX_COMM_SEND, match_fun, data, this_synchro,/*remove_matching*/true);
303 if (!other_synchro) {
304 XBT_DEBUG("Receive pushed first %zu", mbox->comm_queue.size());
305 other_synchro = this_synchro;
306 SIMIX_mbox_push(mbox, this_synchro);
// A send was already queued: our temporary descriptor is no longer needed.
308 this_synchro->unref();
309 simgrid::kernel::activity::Comm *other_comm = static_cast<simgrid::kernel::activity::Comm*>(other_synchro);
311 other_comm->state = SIMIX_READY;
312 other_comm->type = SIMIX_COMM_READY;
// Track the comm in the receiver's list of ongoing comms.
314 xbt_fifo_push(dst_proc->comms, other_synchro);
317 /* Setup communication synchro */
318 simgrid::kernel::activity::Comm *other_comm = static_cast<simgrid::kernel::activity::Comm*>(other_synchro);
319 other_comm->dst_proc = dst_proc;
320 other_comm->dst_buff = dst_buff;
321 other_comm->dst_buff_size = dst_buff_size;
322 other_comm->dst_data = data;
// A receiver-supplied rate (> -1.0) only ever lowers the transfer rate: it applies when the sender set
// no rate (negative) or a higher one.
324 if (rate > -1.0 && (other_comm->rate < 0.0 || rate < other_comm->rate))
325 other_comm->rate = rate;
327 other_comm->match_fun = match_fun;
328 other_comm->copy_data_fun = copy_data_fun;
// Under model checking / replay the transfer is not simulated here; the comm is only flagged as running.
330 if (MC_is_active() || MC_record_replay_is_active()) {
331 other_synchro->state = SIMIX_RUNNING;
332 return other_synchro;
335 SIMIX_comm_start(other_synchro);
336 return other_synchro;
339 smx_activity_t simcall_HANDLER_comm_iprobe(smx_simcall_t simcall, smx_mailbox_t mbox,
340 int type, int src, int tag,
341 int (*match_fun)(void *, void *, smx_activity_t),
343 return SIMIX_comm_iprobe(simcall->issuer, mbox, type, src, tag, match_fun, data);
// Non-destructive probe of `mbox`.
// Builds a temporary comm describing the caller, either a SEND (looking for RECEIVEs) or a RECEIVE
// (looking for SENDs), depending on `type`. NOTE(review): the test on `type` and the declaration of
// `smx_type` are not present in this file.
// Searches done_comm_queue first when the mailbox has a permanent receiver, then comm_queue, always with
// remove_matching=false so the comm stays queued. Returns the match, or nullptr.
// NOTE(review): `src` and `tag` are not used by the visible lines. The matched comm is unref'ed before it
// is returned, so the caller only gets a non-owning pointer -- confirm another reference keeps it alive.
// No release of `this_comm` is visible here either.
346 smx_activity_t SIMIX_comm_iprobe(smx_actor_t dst_proc, smx_mailbox_t mbox, int type, int src,
347 int tag, int (*match_fun)(void *, void *, smx_activity_t), void *data)
349 XBT_DEBUG("iprobe from %p %p", mbox, &mbox->comm_queue);
350 simgrid::kernel::activity::Comm* this_comm;
353 this_comm = new simgrid::kernel::activity::Comm(SIMIX_COMM_SEND);
354 smx_type = SIMIX_COMM_RECEIVE;
356 this_comm = new simgrid::kernel::activity::Comm(SIMIX_COMM_RECEIVE);
357 smx_type = SIMIX_COMM_SEND;
359 smx_activity_t other_synchro=nullptr;
360 if (mbox->permanent_receiver != nullptr && !mbox->done_comm_queue.empty()) {
361 XBT_DEBUG("first check in the permanent recv mailbox, to see if we already got something");
362 other_synchro = _find_matching_comm(&mbox->done_comm_queue,
363 (e_smx_comm_type_t) smx_type, match_fun, data, this_comm,/*remove_matching*/false);
366 XBT_DEBUG("check if we have more luck in the normal mailbox");
367 other_synchro = _find_matching_comm(&mbox->comm_queue,
368 (e_smx_comm_type_t) smx_type, match_fun, data, this_comm,/*remove_matching*/false);
372 other_synchro->unref();
375 return other_synchro;
// Handler of the blocking wait simcall: attaches `simcall` to `synchro` and records `synchro` as the
// issuer's waiting_synchro.
// - Under model checking / record-replay, the MC-chosen value (SIMCALL_GET_MC_VALUE) presumably selects
//   the outcome. Either the comm is marked SIMIX_DONE, or it is marked as timed out on the issuer's side
//   (SIMIX_SRC_TIMEOUT if the issuer is the sender, SIMIX_DST_TIMEOUT otherwise). SIMIX_comm_finish then
//   answers the simcall. NOTE(review): the lines testing `idx` are not present in this file.
// - Otherwise, a comm that is neither WAITING nor RUNNING is finished immediately. A pending comm gets a
//   surf sleep action of `timeout` on the issuer's host, tagged with the comm and stored as src_timeout or
//   dst_timeout depending on the issuer's side.
378 void simcall_HANDLER_comm_wait(smx_simcall_t simcall, smx_activity_t synchro, double timeout)
380 /* Associate this simcall to the wait synchro */
381 XBT_DEBUG("simcall_HANDLER_comm_wait, %p", synchro);
383 synchro->simcalls.push_back(simcall);
384 simcall->issuer->waiting_synchro = synchro;
386 if (MC_is_active() || MC_record_replay_is_active()) {
387 int idx = SIMCALL_GET_MC_VALUE(simcall);
389 synchro->state = SIMIX_DONE;
391 /* If we reached this point, the wait simcall must have a timeout */
392 /* Otherwise it shouldn't be enabled and executed by the MC */
396 simgrid::kernel::activity::Comm *comm = static_cast<simgrid::kernel::activity::Comm*>(synchro);
397 if (comm->src_proc == simcall->issuer)
398 comm->state = SIMIX_SRC_TIMEOUT;
400 comm->state = SIMIX_DST_TIMEOUT;
403 SIMIX_comm_finish(synchro);
407 /* If the synchro has already finished, perform the error handling, */
408 /* otherwise set up a waiting timeout on the right side */
409 if (synchro->state != SIMIX_WAITING && synchro->state != SIMIX_RUNNING) {
410 SIMIX_comm_finish(synchro);
411 } else { /* if (timeout >= 0) { we need a surf sleep action even when there is no timeout, otherwise surf won't tell us when the host fails */
412 surf_action_t sleep = surf_host_sleep(simcall->issuer->host, timeout);
413 sleep->setData(synchro);
415 simgrid::kernel::activity::Comm *comm = static_cast<simgrid::kernel::activity::Comm*>(synchro);
416 if (simcall->issuer == comm->src_proc)
417 comm->src_timeout = sleep;
419 comm->dst_timeout = sleep;
// Handler of the non-blocking test simcall on a single comm.
// Under model checking / record-replay, the result is "both src_proc and dst_proc are set", and a positive
// result forces the comm to SIMIX_DONE. Otherwise the result is "state is neither SIMIX_WAITING nor
// SIMIX_RUNNING". On a positive result the simcall is attached to the comm and answered through
// SIMIX_comm_finish; SIMIX_simcall_answer answers it directly.
// NOTE(review): the `else` / `return` lines separating these paths are not present in this file.
423 void simcall_HANDLER_comm_test(smx_simcall_t simcall, smx_activity_t synchro)
425 simgrid::kernel::activity::Comm *comm = static_cast<simgrid::kernel::activity::Comm*>(synchro);
427 if (MC_is_active() || MC_record_replay_is_active()){
428 simcall_comm_test__set__result(simcall, comm->src_proc && comm->dst_proc);
429 if (simcall_comm_test__get__result(simcall)){
430 synchro->state = SIMIX_DONE;
431 synchro->simcalls.push_back(simcall);
432 SIMIX_comm_finish(synchro);
434 SIMIX_simcall_answer(simcall);
439 simcall_comm_test__set__result(simcall, (synchro->state != SIMIX_WAITING && synchro->state != SIMIX_RUNNING));
440 if (simcall_comm_test__get__result(simcall)) {
441 synchro->simcalls.push_back(simcall);
442 SIMIX_comm_finish(synchro);
444 SIMIX_simcall_answer(simcall);
// Handler of the testany simcall over `count` comms.
// The result defaults to -1 ("nothing is ready").
// - Under model checking / record-replay, the MC-chosen index selects the comm: that comm is forced to
//   SIMIX_DONE, its index becomes the result, and SIMIX_comm_finish answers the simcall. The simcall is
//   answered directly in the remaining case; NOTE(review): its guarding condition is not present here.
// - Otherwise the comms are scanned in order. A comm that is neither WAITING nor RUNNING has its index
//   recorded, gets the simcall attached, and is finished. SIMIX_simcall_answer answers the simcall
//   directly when nothing is ready.
// NOTE(review): the early `return` statements that prevent answering the simcall twice are not present in
// this file.
448 void simcall_HANDLER_comm_testany(
449 smx_simcall_t simcall, simgrid::kernel::activity::ActivityImpl* comms[], size_t count)
451 // The default result is -1 -- this means, "nothing is ready".
452 // It can be changed below, but only if something matches.
453 simcall_comm_testany__set__result(simcall, -1);
455 if (MC_is_active() || MC_record_replay_is_active()){
456 int idx = SIMCALL_GET_MC_VALUE(simcall);
458 SIMIX_simcall_answer(simcall);
460 simgrid::kernel::activity::ActivityImpl* synchro = comms[idx];
461 simcall_comm_testany__set__result(simcall, idx);
462 synchro->simcalls.push_back(simcall);
463 synchro->state = SIMIX_DONE;
464 SIMIX_comm_finish(synchro);
469 for (std::size_t i = 0; i != count; ++i) {
470 simgrid::kernel::activity::ActivityImpl* synchro = comms[i];
471 if (synchro->state != SIMIX_WAITING && synchro->state != SIMIX_RUNNING) {
472 simcall_comm_testany__set__result(simcall, i);
473 synchro->simcalls.push_back(simcall);
474 SIMIX_comm_finish(synchro);
478 SIMIX_simcall_answer(simcall);
// Handler of the waitany simcall over the comms in `synchros`.
// - Under model checking / record-replay, waitany with a timeout is unsupported (xbt_die). Otherwise the
//   MC-chosen index selects a comm, which gets the simcall, is forced to SIMIX_DONE and is finished.
//   NOTE(review): the condition guarding the xbt_die is not present in this file.
// - Otherwise an optional timer is armed. When it fires, it detaches the simcall from every comm and
//   answers with result -1. The simcall is then attached to every comm, and any comm already past
//   WAITING/RUNNING is finished right away. SIMIX_comm_finish cancels the timer and detaches the
//   simcall from the other comms.
// NOTE(review): `NULL` at 499 could be `nullptr`, matching the rest of the file.
481 void simcall_HANDLER_comm_waitany(smx_simcall_t simcall, xbt_dynar_t synchros, double timeout)
483 smx_activity_t synchro;
484 unsigned int cursor = 0;
486 if (MC_is_active() || MC_record_replay_is_active()){
488 xbt_die("Timeout not implemented for waitany in the model-checker");
489 int idx = SIMCALL_GET_MC_VALUE(simcall);
490 synchro = xbt_dynar_get_as(synchros, idx, smx_activity_t);
491 synchro->simcalls.push_back(simcall);
492 simcall_comm_waitany__set__result(simcall, idx);
493 synchro->state = SIMIX_DONE;
494 SIMIX_comm_finish(synchro);
499 simcall->timer = NULL;
// The lambda captures the simcall pointer by value; it runs when the timeout expires.
501 simcall->timer = SIMIX_timer_set(SIMIX_get_clock() + timeout, [simcall]() {
502 SIMIX_waitany_remove_simcall_from_actions(simcall);
503 simcall_comm_waitany__set__result(simcall, -1);
504 SIMIX_simcall_answer(simcall);
508 xbt_dynar_foreach(synchros, cursor, synchro){
509 /* associate this simcall to the synchro */
510 synchro->simcalls.push_back(simcall);
512 /* see if the synchro is already finished */
513 if (synchro->state != SIMIX_WAITING && synchro->state != SIMIX_RUNNING){
514 SIMIX_comm_finish(synchro);
520 void SIMIX_waitany_remove_simcall_from_actions(smx_simcall_t simcall)
522 smx_activity_t synchro;
523 unsigned int cursor = 0;
524 xbt_dynar_t synchros = simcall_comm_waitany__get__comms(simcall);
526 xbt_dynar_foreach(synchros, cursor, synchro) {
527 // Remove the first occurence of simcall:
528 auto i = boost::range::find(synchro->simcalls, simcall);
529 if (i != synchro->simcalls.end())
530 synchro->simcalls.erase(i);
// NOTE(review): the two lines below are the body of a Doxygen comment whose delimiters are absent from
// this file.
535 * \brief Starts the simulation of a communication synchro.
536 * \param synchro the communication synchro
// Only acts on a SIMIX_READY comm (both endpoints known). It creates the surf network action from the
// sender's host to the receiver's host with the comm's size and rate, tags it with the comm, and moves
// the comm to SIMIX_RUNNING. An action that is already failed turns the comm into SIMIX_LINK_FAILURE. If
// either endpoint process is suspended, the network action is suspended right away.
// NOTE(review): some lines (e.g. the `else` between the two debug messages) are not present in this file.
538 static inline void SIMIX_comm_start(smx_activity_t synchro)
540 simgrid::kernel::activity::Comm *comm = static_cast<simgrid::kernel::activity::Comm*>(synchro);
542 /* If both the sender and the receiver are already there, start the communication */
543 if (synchro->state == SIMIX_READY) {
545 simgrid::s4u::Host* sender = comm->src_proc->host;
546 simgrid::s4u::Host* receiver = comm->dst_proc->host;
548 XBT_DEBUG("Starting communication %p from '%s' to '%s'", synchro, sg_host_get_name(sender), sg_host_get_name(receiver));
550 comm->surf_comm = surf_network_model->communicate(sender, receiver, comm->task_size, comm->rate);
551 comm->surf_comm->setData(synchro);
552 comm->state = SIMIX_RUNNING;
554 /* If a link is failed, detect it immediately */
555 if (comm->surf_comm->getState() == simgrid::surf::Action::State::failed) {
556 XBT_DEBUG("Communication from '%s' to '%s' failed to start because of a link failure",
557 sg_host_get_name(sender), sg_host_get_name(receiver));
558 comm->state = SIMIX_LINK_FAILURE;
562 /* If either process is suspended, create the synchro but stop its execution,
563 it will be restarted when the sender process resumes */
564 if (SIMIX_process_is_suspended(comm->src_proc) || SIMIX_process_is_suspended(comm->dst_proc)) {
565 if (SIMIX_process_is_suspended(comm->src_proc))
566 XBT_DEBUG("The communication is suspended on startup because src (%s@%s) was suspended since it initiated the communication",
567 comm->src_proc->name.c_str(), sg_host_get_name(comm->src_proc->host));
569 XBT_DEBUG("The communication is suspended on startup because dst (%s@%s) was suspended since it initiated the communication",
570 comm->dst_proc->name.c_str(), sg_host_get_name(comm->dst_proc->host));
572 comm->surf_comm->suspend();
578 * \brief Answers the SIMIX simcalls associated to a communication synchro.
579 * \param synchro a finished communication synchro
// (The two lines above belong to a Doxygen comment whose delimiters are absent from this file.)
// Pops and answers every simcall attached to `synchro`:
// - simcalls whose call is SIMCALL_NONE are skipped (their process was killed);
// - for waitany, the simcall is detached from the other comms, its timer is cancelled, and (outside MC)
//   the result becomes the comm's position in the waitany list;
// - the comm is removed from its mailbox if it is still queued;
// - if the issuer's host is off, the issuer is marked to die and gets a host_error. Otherwise the comm
//   state is mapped to data copy (on success) or to a timeout/network/cancel exception;
// - for waitany/testany, an exception's `value` is set to the failing comm's index;
// - the comm is dropped from the issuer's (and the peer's) comm list, and the simcall is answered.
// Afterwards the comm is unref'ed `destroy_count` times.
// NOTE(review): several lines are not present in this file (including the `case SIMIX_DONE:` label, the
// `break`s, the mailbox test before SIMIX_mbox_remove, the try/catch around the rethrow, and the
// `destroy_count` increments).
581 void SIMIX_comm_finish(smx_activity_t synchro)
583 simgrid::kernel::activity::Comm *comm = static_cast<simgrid::kernel::activity::Comm*>(synchro);
584 unsigned int destroy_count = 0;
586 while (!synchro->simcalls.empty()) {
587 smx_simcall_t simcall = synchro->simcalls.front();
588 synchro->simcalls.pop_front();
590 /* If a waitany simcall is waiting for this synchro to finish, then remove
591 it from the other synchros in the waitany list. Afterwards, get the
592 position of the actual synchro in the waitany dynar and
593 return it as the result of the simcall */
595 if (simcall->call == SIMCALL_NONE) //FIXME: maybe a better way to handle this case
596 continue; // if process handling comm is killed
597 if (simcall->call == SIMCALL_COMM_WAITANY) {
598 SIMIX_waitany_remove_simcall_from_actions(simcall);
599 if (simcall->timer) {
600 SIMIX_timer_remove(simcall->timer);
601 simcall->timer = nullptr;
// Under MC / replay the waitany result was already chosen by the model checker.
603 if (!MC_is_active() && !MC_record_replay_is_active())
604 simcall_comm_waitany__set__result(simcall, xbt_dynar_search(simcall_comm_waitany__get__comms(simcall), &synchro));
607 /* If the synchro is still in a rendez-vous point then remove from it */
609 SIMIX_mbox_remove(comm->mbox, synchro);
611 XBT_DEBUG("SIMIX_comm_finish: synchro state = %d", (int)synchro->state);
613 /* Check out for errors */
615 if (simcall->issuer->host->isOff()) {
616 simcall->issuer->context->iwannadie = 1;
617 SMX_EXCEPTION(simcall->issuer, host_error, 0, "Host failed");
619 switch (synchro->state) {
622 XBT_DEBUG("Communication %p complete!", synchro);
623 SIMIX_comm_copy_data(synchro);
626 case SIMIX_SRC_TIMEOUT:
627 SMX_EXCEPTION(simcall->issuer, timeout_error, 0, "Communication timeouted because of sender");
630 case SIMIX_DST_TIMEOUT:
631 SMX_EXCEPTION(simcall->issuer, timeout_error, 0, "Communication timeouted because of receiver");
// A failed host kills the process running on it; the surviving peer gets a network_error.
634 case SIMIX_SRC_HOST_FAILURE:
635 if (simcall->issuer == comm->src_proc)
636 simcall->issuer->context->iwannadie = 1;
637 // SMX_EXCEPTION(simcall->issuer, host_error, 0, "Host failed");
639 SMX_EXCEPTION(simcall->issuer, network_error, 0, "Remote peer failed");
642 case SIMIX_DST_HOST_FAILURE:
643 if (simcall->issuer == comm->dst_proc)
644 simcall->issuer->context->iwannadie = 1;
645 // SMX_EXCEPTION(simcall->issuer, host_error, 0, "Host failed");
647 SMX_EXCEPTION(simcall->issuer, network_error, 0, "Remote peer failed");
650 case SIMIX_LINK_FAILURE:
// NOTE(review): when src_proc/dst_proc is null, nullptr is passed for a '%s' conversion. Assuming XBT_DEBUG
// is printf-style, that is undefined behaviour (glibc prints "(null)"); a literal placeholder string
// would be safer.
652 XBT_DEBUG("Link failure in synchro %p between '%s' and '%s': posting an exception to the issuer: %s (%p) detached:%d",
654 comm->src_proc ? sg_host_get_name(comm->src_proc->host) : nullptr,
655 comm->dst_proc ? sg_host_get_name(comm->dst_proc->host) : nullptr,
656 simcall->issuer->name.c_str(), simcall->issuer, comm->detached);
657 if (comm->src_proc == simcall->issuer) {
658 XBT_DEBUG("I'm source");
659 } else if (comm->dst_proc == simcall->issuer) {
660 XBT_DEBUG("I'm dest");
662 XBT_DEBUG("I'm neither source nor dest");
664 SMX_EXCEPTION(simcall->issuer, network_error, 0, "Link failure");
// Cancellation: the message names the side that cancelled, i.e. the peer of the issuer.
668 if (simcall->issuer == comm->dst_proc)
669 SMX_EXCEPTION(simcall->issuer, cancel_error, 0, "Communication canceled by the sender");
671 SMX_EXCEPTION(simcall->issuer, cancel_error, 0, "Communication canceled by the receiver");
675 xbt_die("Unexpected synchro state in SIMIX_comm_finish: %d", (int)synchro->state);
679 /* if there is an exception during a waitany or a testany, indicate the position of the failed communication */
680 if (simcall->issuer->exception) {
681 // In order to modify the exception we have to rethrow it:
683 std::rethrow_exception(simcall->issuer->exception);
686 if (simcall->call == SIMCALL_COMM_WAITANY) {
687 e.value = xbt_dynar_search(simcall_comm_waitany__get__comms(simcall), &synchro);
689 else if (simcall->call == SIMCALL_COMM_TESTANY) {
691 auto comms = simcall_comm_testany__get__comms(simcall);
692 auto count = simcall_comm_testany__get__count(simcall);
693 auto element = std::find(comms, comms + count, synchro);
694 if (element == comms + count)
697 e.value = element - comms;
699 simcall->issuer->exception = std::make_exception_ptr(e);
706 if (simcall->issuer->host->isOff()) {
707 simcall->issuer->context->iwannadie = 1;
// The issuer no longer waits on this comm; drop it from the issuer's list and from the peer's list.
710 simcall->issuer->waiting_synchro = nullptr;
711 xbt_fifo_remove(simcall->issuer->comms, synchro);
713 if(simcall->issuer == comm->src_proc){
715 xbt_fifo_remove(comm->dst_proc->comms, synchro);
717 if(simcall->issuer == comm->dst_proc){
719 xbt_fifo_remove(comm->src_proc->comms, synchro);
720 //in case of a detached comm we have an extra ref to remove, as the sender won't do it
724 SIMIX_simcall_answer(simcall);
// Release the references accumulated above only after every simcall has been answered.
728 while (destroy_count-- > 0)
729 static_cast<simgrid::kernel::activity::Comm*>(synchro)->unref();
732 /******************************************************************************/
733 /* SIMIX_comm_copy_data callbacks */
734 /******************************************************************************/
// Global payload-copy hook, called by SIMIX_comm_copy_data (presumably when the comm has no
// copy_data_fun of its own). It defaults to copying a single pointer; replace it with
// SIMIX_comm_set_copy_data_callback.
735 static void (*SIMIX_comm_copy_data_callback) (smx_activity_t, void*, size_t) = &SIMIX_comm_copy_pointer_callback;
737 void SIMIX_comm_set_copy_data_callback(void (*callback) (smx_activity_t, void*, size_t))
739 SIMIX_comm_copy_data_callback = callback;
742 void SIMIX_comm_copy_pointer_callback(smx_activity_t synchro, void* buff, size_t buff_size)
744 simgrid::kernel::activity::Comm *comm = static_cast<simgrid::kernel::activity::Comm*>(synchro);
746 xbt_assert((buff_size == sizeof(void *)), "Cannot copy %zu bytes: must be sizeof(void*)", buff_size);
747 *(void **) (comm->dst_buff) = buff;
750 void SIMIX_comm_copy_buffer_callback(smx_activity_t synchro, void* buff, size_t buff_size)
752 simgrid::kernel::activity::Comm *comm = static_cast<simgrid::kernel::activity::Comm*>(synchro);
754 XBT_DEBUG("Copy the data over");
755 memcpy(comm->dst_buff, buff, buff_size);
756 if (comm->detached) { // if this is a detached send, the source buffer was duplicated by SMPI sender to make the original buffer available to the application ASAP
758 comm->src_buff = nullptr;
764 * \brief Copy the communication data from the sender's buffer to the receiver's one
765 * \param comm The communication
767 void SIMIX_comm_copy_data(smx_activity_t synchro)
769 simgrid::kernel::activity::Comm *comm = static_cast<simgrid::kernel::activity::Comm*>(synchro);
771 size_t buff_size = comm->src_buff_size;
772 /* If there is no data to copy then return */
773 if (!comm->src_buff || !comm->dst_buff || comm->copied)
776 XBT_DEBUG("Copying comm %p data from %s (%p) -> %s (%p) (%zu bytes)",
778 comm->src_proc ? sg_host_get_name(comm->src_proc->host) : "a finished process",
780 comm->dst_proc ? sg_host_get_name(comm->dst_proc->host) : "a finished process",
781 comm->dst_buff, buff_size);
783 /* Copy at most dst_buff_size bytes of the message to receiver's buffer */
784 if (comm->dst_buff_size)
785 buff_size = MIN(buff_size, *(comm->dst_buff_size));
787 /* Update the receiver's buffer size to the copied amount */
788 if (comm->dst_buff_size)
789 *comm->dst_buff_size = buff_size;
792 if(comm->copy_data_fun)
793 comm->copy_data_fun (comm, comm->src_buff, buff_size);
795 SIMIX_comm_copy_data_callback (comm, comm->src_buff, buff_size);
799 /* Set the copied flag so we copy data only once */
800 /* (this function might be called from both communication ends) */