1 /* Copyright (c) 2009-2016. The SimGrid Team. All rights reserved. */
3 /* This program is free software; you can redistribute it and/or modify it
4 * under the terms of the license (GNU LGPL) which comes with this package. */
6 #include <boost/range/algorithm.hpp>
10 #include <simgrid/s4u/host.hpp>
12 #include "src/surf/surf_interface.hpp"
13 #include "src/simix/smx_private.h"
16 #include "src/mc/mc_replay.h"
18 #include "simgrid/s4u/mailbox.hpp"
20 #include "src/simix/SynchroComm.hpp"
/* Default logging sub-category for this file: simix_network, declared under simix. */
22 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(simix_network, simix, "SIMIX network-related synchronization");
/* Passed to xbt_dict_new_homogeneous() just below -- presumably run on each stored mailbox when an entry or
 * the whole dictionary is freed (confirm against the xbt_dict API). */
24 static void SIMIX_mbox_free(void *data);
/* File-wide mailbox registry; SIMIX_mbox_create() and SIMIX_mbox_get_by_name() look mailboxes up in it by name. */
25 static xbt_dict_t mailboxes = xbt_dict_new_homogeneous(SIMIX_mbox_free);
/* Forward declarations of file-local (static) helpers defined further down. */
27 static void SIMIX_waitany_remove_simcall_from_actions(smx_simcall_t simcall);
28 static void SIMIX_comm_copy_data(smx_synchro_t comm);
29 static inline void SIMIX_mbox_push(smx_mailbox_t mbox, smx_synchro_t comm);
30 static smx_synchro_t _find_matching_comm(std::deque<smx_synchro_t> *deque, e_smx_comm_type_t type,
31 int (*match_fun)(void *, void *,smx_synchro_t), void *user_data, smx_synchro_t my_synchro, bool remove_matching);
32 static void SIMIX_comm_start(smx_synchro_t synchro);
34 void SIMIX_mailbox_exit(void)
36 xbt_dict_free(&mailboxes);
39 /******************************************************************************/
40 /* Rendez-Vous Points */
41 /******************************************************************************/
/* Returns the mailbox registered under `name` in the `mailboxes` dictionary, allocating and registering a
 * new simgrid::simix::Mailbox for that name when needed. `name` must not be null (asserted).
 * NOTE(review): this listing shows neither the braces, the test that guards the creation, nor the final
 * return statement -- presumably `if (!mbox) { ... } return mbox;`; confirm against the upstream file. */
43 smx_mailbox_t SIMIX_mbox_create(const char *name)
45 xbt_assert(name, "Mailboxes must have a name");
46 /* two processes may have pushed the same mbox_create simcall at the same time */
47 smx_mailbox_t mbox = (smx_mailbox_t) xbt_dict_get_or_null(mailboxes, name);
49 mbox = new simgrid::simix::Mailbox(name);
50 XBT_DEBUG("Creating a mailbox at %p with name %s", mbox, name);
/* The new mailbox is stored under its own name field. */
51 xbt_dict_set(mailboxes, mbox->name, mbox, nullptr);
/* Callback handed to xbt_dict_new_homogeneous() for the `mailboxes` dictionary; `data` is a stored mailbox
 * received as an untyped pointer.
 * NOTE(review): only the trace and the cast are visible here; the statement that actually releases `mbox`
 * is presumably among the lines missing from this listing -- confirm against the upstream file. */
56 void SIMIX_mbox_free(void *data)
58 XBT_DEBUG("mbox free %p", data);
59 smx_mailbox_t mbox = (smx_mailbox_t) data;
63 smx_mailbox_t SIMIX_mbox_get_by_name(const char *name)
65 return (smx_mailbox_t) xbt_dict_get_or_null(mailboxes, name);
69 * \brief set the receiver of the rendez vous point to allow eager sends
70 * \param mbox The rendez-vous point
71 * \param process The receiving process
73 void SIMIX_mbox_set_receiver(smx_mailbox_t mbox, smx_process_t process)
75 mbox->permanent_receiver = process;
79 * \brief Pushes a communication synchro into a rendez-vous point
80 * \param mbox The mailbox
81 * \param synchro The communication synchro
/* Appends `synchro`, viewed as a simgrid::simix::Comm, at the back of mbox->comm_queue.
 * NOTE(review): braces (and possibly other short lines) are absent from this listing. */
83 static inline void SIMIX_mbox_push(smx_mailbox_t mbox, smx_synchro_t synchro)
85 simgrid::simix::Comm *comm = static_cast<simgrid::simix::Comm*>(synchro);
86 mbox->comm_queue.push_back(comm);
91 * \brief Removes a communication synchro from a rendez-vous point
92 * \param mbox The rendez-vous point
93 * \param synchro The communication synchro
/* Takes `synchro` out of mbox->comm_queue. The xbt_die() message indicates that asking to remove a comm
 * that is not queued in this mailbox is treated as a fatal error.
 * NOTE(review): as listed, the loop has no visible match test before the erase; the comparison of `*it`
 * against `comm` and the early exit after erasing are presumably on lines missing from this listing. */
95 void SIMIX_mbox_remove(smx_mailbox_t mbox, smx_synchro_t synchro)
97 simgrid::simix::Comm *comm = static_cast<simgrid::simix::Comm*>(synchro);
100 for (auto it = mbox->comm_queue.begin(); it != mbox->comm_queue.end(); it++)
102 mbox->comm_queue. erase(it);
105 xbt_die("Cannot remove this comm that is not part of the mailbox");
109 * \brief Checks if there is a communication synchro queued in a deque matching our needs
110 * \param type The type of communication we are looking for (comm_send, comm_recv)
111 * \return The communication synchro if found, nullptr otherwise
/* Walks `deque` from front to back looking for a queued comm that
 *   - has the requested `type`,
 *   - is accepted by the caller's filter: match_fun(this_user_data, other_user_data, synchro), and
 *   - accepts the caller through its own filter: comm->match_fun(other_user_data, this_user_data, my_synchro).
 * A null filter on either side accepts everything.
 * NOTE(review): `remove_matching` is not used on any visible line and no return statement is visible either;
 * presumably the missing lines erase the match from the deque when requested and return it (nullptr when
 * nothing matches, as the header comment of this function says) -- confirm against the upstream file. */
113 static smx_synchro_t _find_matching_comm(std::deque<smx_synchro_t> *deque, e_smx_comm_type_t type,
114 int (*match_fun)(void *, void *,smx_synchro_t), void *this_user_data, smx_synchro_t my_synchro, bool remove_matching)
116 void* other_user_data = nullptr;
118 for(auto it = deque->begin(); it != deque->end(); it++){
119 smx_synchro_t synchro = *it;
120 simgrid::simix::Comm *comm = static_cast<simgrid::simix::Comm*>(synchro);
/* The "other side" user data is the one attached by whoever queued the comm: src_data for a send,
 * dst_data for a receive. */
122 if (comm->type == SIMIX_COMM_SEND) {
123 other_user_data = comm->src_data;
124 } else if (comm->type == SIMIX_COMM_RECEIVE) {
125 other_user_data = comm->dst_data;
127 if (comm->type == type &&
128 (! match_fun || match_fun(this_user_data, other_user_data, synchro)) &&
129 (!comm->match_fun || comm->match_fun(other_user_data, this_user_data, my_synchro))) {
130 XBT_DEBUG("Found a matching communication synchro %p", comm);
/* The mailbox pointer is saved in mbox_cpy before being cleared on the matched comm. */
135 comm->mbox_cpy = comm->mbox;
137 comm->mbox = nullptr;
140 XBT_DEBUG("Sorry, communication synchro %p does not match our needs:"
141 " its type is %d but we are looking for a comm of type %d (or maybe the filtering didn't match)",
142 comm, (int)comm->type, (int)type);
144 XBT_DEBUG("No matching communication synchro found");
148 /******************************************************************************/
149 /* Communication synchros */
150 /******************************************************************************/
/* Blocking send handler: posts the send through simcall_HANDLER_comm_isend() (with a null clean_fun), sets the
 * simcall's MC value to 0, then waits on the resulting comm with simcall_HANDLER_comm_wait() and `timeout`.
 * NOTE(review): the end of the isend argument list (what follows copy_data_fun) is not visible in this
 * listing; given the isend signature it is presumably `data` and a zero `detached` flag -- confirm. */
151 XBT_PRIVATE void simcall_HANDLER_comm_send(smx_simcall_t simcall, smx_process_t src, smx_mailbox_t mbox,
152 double task_size, double rate,
153 void *src_buff, size_t src_buff_size,
154 int (*match_fun)(void *, void *,smx_synchro_t),
155 void (*copy_data_fun)(smx_synchro_t, void*, size_t),
156 void *data, double timeout){
157 smx_synchro_t comm = simcall_HANDLER_comm_isend(simcall, src, mbox, task_size, rate,
158 src_buff, src_buff_size, match_fun, nullptr, copy_data_fun,
/* The wait handler reads this value back with SIMCALL_GET_MC_VALUE when the model checker or replay is active. */
160 SIMCALL_SET_MC_VALUE(simcall, 0);
161 simcall_HANDLER_comm_wait(simcall, comm, timeout);
/* Non-blocking send handler.
 *
 * Builds a SEND comm describing the caller and looks in mbox->comm_queue for a queued RECEIVE comm that
 * matches it (removing the match from the queue). The comm that is kept -- the matched one or, failing
 * that, our own -- then receives the sender-side settings (process, size, rate, buffer, user data, filters)
 * and is started through SIMIX_comm_start(), except under the model checker / record-replay where its state
 * is simply forced to SIMIX_RUNNING.
 *
 * Returns the comm, or nullptr when `detached` is non-zero.
 *
 * NOTE(review): braces, `else` lines and some short conditions are absent from this listing, so the branch
 * structure described in the comments below is inferred from the debug messages and must be confirmed
 * against the upstream file. */
163 XBT_PRIVATE smx_synchro_t simcall_HANDLER_comm_isend(smx_simcall_t simcall, smx_process_t src_proc, smx_mailbox_t mbox,
164 double task_size, double rate,
165 void *src_buff, size_t src_buff_size,
166 int (*match_fun)(void *, void *,smx_synchro_t),
167 void (*clean_fun)(void *), // used to free the synchro in case of problem after a detached send
168 void (*copy_data_fun)(smx_synchro_t, void*, size_t),// used to copy data if not default one
169 void *data, int detached)
171 XBT_DEBUG("send from %p", mbox);
173 /* Prepare a synchro describing us, so that it gets passed to the user-provided filter of other side */
174 simgrid::simix::Comm* this_synchro = new simgrid::simix::Comm(SIMIX_COMM_SEND);
176 /* Look for communication synchro matching our needs. We also provide a description of
177 * ourself so that the other side also gets a chance of choosing if it wants to match with us.
179 * If it is not found then push our communication into the rendez-vous point */
180 smx_synchro_t other_synchro =
181 _find_matching_comm(&mbox->comm_queue, SIMIX_COMM_RECEIVE, match_fun, data, this_synchro, /*remove_matching*/true);
182 simgrid::simix::Comm *other_comm = static_cast<simgrid::simix::Comm*>(other_synchro);
/* No matching receive is queued: our own descriptor becomes the comm we keep working on. */
185 if (!other_synchro) {
186 other_synchro = this_synchro;
187 other_comm = static_cast<simgrid::simix::Comm*>(other_synchro);
/* Mailbox with a permanent receiver: the comm is marked READY with that receiver as destination and is
 * stored in done_comm_queue. */
189 if (mbox->permanent_receiver!=nullptr){
190 //this mailbox is for small messages, which have to be sent right now
191 other_synchro->state = SIMIX_READY;
192 other_comm->dst_proc=mbox->permanent_receiver.get();
194 mbox->done_comm_queue.push_back(other_synchro);
195 other_comm->mbox=mbox;
/* NOTE(review): `&(other_comm)` is the address of the local pointer variable, not of the comm, so the value
 * logged for "comm %p" is not the comm's address -- `other_comm` was probably intended. */
196 XBT_DEBUG("pushing a message into the permanent receive fifo %p, comm %p", mbox, &(other_comm));
/* Presumably the branch without permanent receiver (its `else` line is not visible): regular queueing. */
199 SIMIX_mbox_push(mbox, this_synchro);
/* Presumably the branch where a matching receive was found: our own descriptor is released and the matched
 * comm becomes READY. */
202 XBT_DEBUG("Receive already pushed");
203 this_synchro->unref();
205 other_comm->state = SIMIX_READY;
206 other_comm->type = SIMIX_COMM_READY;
/* The comm is recorded in the sender's list of comms. */
209 xbt_fifo_push(src_proc->comms, other_synchro);
/* NOTE(review): the condition choosing between the next two groups of assignments is not visible; given the
 * field names it is presumably `if (detached)` / `else`. */
213 other_comm->detached = true;
214 other_comm->clean_fun = clean_fun;
216 other_comm->clean_fun = nullptr;
219 /* Setup the communication synchro */
220 other_comm->src_proc = src_proc;
221 other_comm->task_size = task_size;
222 other_comm->rate = rate;
223 other_comm->src_buff = src_buff;
224 other_comm->src_buff_size = src_buff_size;
225 other_comm->src_data = data;
227 other_comm->match_fun = match_fun;
228 other_comm->copy_data_fun = copy_data_fun;
/* Under the model checker or record/replay, SIMIX_comm_start() is bypassed: the state is forced to
 * SIMIX_RUNNING and the handler returns right away. */
231 if (MC_is_active() || MC_record_replay_is_active()) {
232 other_comm->state = SIMIX_RUNNING;
233 return (detached ? nullptr : other_comm);
236 SIMIX_comm_start(other_comm);
237 return (detached ? nullptr : other_comm);
240 XBT_PRIVATE void simcall_HANDLER_comm_recv(smx_simcall_t simcall, smx_process_t receiver, smx_mailbox_t mbox,
241 void *dst_buff, size_t *dst_buff_size,
242 int (*match_fun)(void *, void *, smx_synchro_t),
243 void (*copy_data_fun)(smx_synchro_t, void*, size_t),
244 void *data, double timeout, double rate)
246 smx_synchro_t comm = SIMIX_comm_irecv(receiver, mbox, dst_buff, dst_buff_size, match_fun, copy_data_fun, data, rate);
247 SIMCALL_SET_MC_VALUE(simcall, 0);
248 simcall_HANDLER_comm_wait(simcall, comm, timeout);
251 XBT_PRIVATE smx_synchro_t simcall_HANDLER_comm_irecv(smx_simcall_t simcall, smx_process_t receiver, smx_mailbox_t mbox,
252 void *dst_buff, size_t *dst_buff_size,
253 int (*match_fun)(void *, void *, smx_synchro_t),
254 void (*copy_data_fun)(smx_synchro_t, void*, size_t),
255 void *data, double rate)
257 return SIMIX_comm_irecv(receiver, mbox, dst_buff, dst_buff_size, match_fun, copy_data_fun, data, rate);
/* Posts a non-blocking receive for `dst_proc` on `mbox`.
 *
 * Builds a RECEIVE comm describing the caller, then tries to pair it with a queued SEND comm: first in
 * mbox->done_comm_queue when the mailbox has a permanent receiver and that queue is not empty, otherwise in
 * mbox->comm_queue. When nothing matches, our own comm is queued in the mailbox. The comm that is kept then
 * receives the receiver-side settings (process, buffer, user data, rate, filters) and is started through
 * SIMIX_comm_start(), except under the model checker / record-replay where its state is forced to
 * SIMIX_RUNNING. The kept comm is returned.
 *
 * NOTE(review): braces and `else` lines are absent from this listing (which is why `other_comm` appears to be
 * declared three times); the branch structure described below is inferred from the comments and debug
 * messages and must be confirmed against the upstream file. */
260 smx_synchro_t SIMIX_comm_irecv(smx_process_t dst_proc, smx_mailbox_t mbox, void *dst_buff, size_t *dst_buff_size,
261 int (*match_fun)(void *, void *, smx_synchro_t),
262 void (*copy_data_fun)(smx_synchro_t, void*, size_t), // used to copy data if not default one
263 void *data, double rate)
265 XBT_DEBUG("recv from %p %p", mbox, &mbox->comm_queue);
266 simgrid::simix::Comm* this_synchro = new simgrid::simix::Comm(SIMIX_COMM_RECEIVE);
268 smx_synchro_t other_synchro;
269 //communication already done, get it inside the fifo of completed comms
270 if (mbox->permanent_receiver != nullptr && ! mbox->done_comm_queue.empty()) {
272 XBT_DEBUG("We have a comm that has probably already been received, trying to match it, to skip the communication");
273 //find a match in the already received fifo
274 other_synchro = _find_matching_comm(&mbox->done_comm_queue, SIMIX_COMM_SEND, match_fun, data, this_synchro,/*remove_matching*/true);
275 //if not found, assume the receiver came first, register it to the mailbox in the classical way
276 if (!other_synchro) {
277 XBT_DEBUG("We have messages in the permanent receive list, but not the one we are looking for, pushing request into fifo");
278 other_synchro = this_synchro;
279 SIMIX_mbox_push(mbox, this_synchro);
/* Presumably the branch where an eagerly sent comm was found in done_comm_queue. */
281 simgrid::simix::Comm *other_comm = static_cast<simgrid::simix::Comm*>(other_synchro);
/* A comm that already has a surf action with nothing left to transfer is flagged as done and detached from
 * its mailbox. */
283 if(other_comm->surf_comm && other_comm->remains()==0.0) {
284 XBT_DEBUG("comm %p has been already sent, and is finished, destroy it",other_comm);
285 other_comm->state = SIMIX_DONE;
286 other_comm->type = SIMIX_COMM_DONE;
287 other_comm->mbox = nullptr;
/* Our own descriptor is released here. */
290 static_cast<simgrid::simix::Comm*>(this_synchro)->unref();
293 /* Prepare a synchro describing us, so that it gets passed to the user-provided filter of other side */
295 /* Look for communication synchro matching our needs. We also provide a description of
296 * ourself so that the other side also gets a chance of choosing if it wants to match with us.
298 * If it is not found then push our communication into the rendez-vous point */
299 other_synchro = _find_matching_comm(&mbox->comm_queue, SIMIX_COMM_SEND, match_fun, data, this_synchro,/*remove_matching*/true);
301 if (!other_synchro) {
302 XBT_DEBUG("Receive pushed first %zu", mbox->comm_queue.size());
303 other_synchro = this_synchro;
304 SIMIX_mbox_push(mbox, this_synchro);
/* Presumably the branch where a queued send matched: our descriptor is released and the matched comm
 * becomes READY. */
306 this_synchro->unref();
307 simgrid::simix::Comm *other_comm = static_cast<simgrid::simix::Comm*>(other_synchro);
309 other_comm->state = SIMIX_READY;
310 other_comm->type = SIMIX_COMM_READY;
/* The comm is recorded in the receiver's list of comms. */
312 xbt_fifo_push(dst_proc->comms, other_synchro);
315 /* Setup communication synchro */
316 simgrid::simix::Comm *other_comm = static_cast<simgrid::simix::Comm*>(other_synchro);
317 other_comm->dst_proc = dst_proc;
318 other_comm->dst_buff = dst_buff;
319 other_comm->dst_buff_size = dst_buff_size;
320 other_comm->dst_data = data;
/* The receiver's rate replaces the comm's rate only when it is not -1.0 and the comm's rate is either -1.0
 * or larger (-1.0 presumably meaning "no rate set"). */
322 if (rate != -1.0 && (other_comm->rate == -1.0 || rate < other_comm->rate))
323 other_comm->rate = rate;
325 other_comm->match_fun = match_fun;
326 other_comm->copy_data_fun = copy_data_fun;
/* Under the model checker or record/replay, SIMIX_comm_start() is bypassed and the state is forced to
 * SIMIX_RUNNING. */
328 if (MC_is_active() || MC_record_replay_is_active()) {
329 other_synchro->state = SIMIX_RUNNING;
330 return other_synchro;
333 SIMIX_comm_start(other_synchro);
334 return other_synchro;
/* Probe handler: forwards to SIMIX_comm_iprobe() on behalf of the simcall's issuer.
 * NOTE(review): the end of the parameter list is not visible in this listing; the `data` argument used in
 * the call below is presumably declared there. */
337 smx_synchro_t simcall_HANDLER_comm_iprobe(smx_simcall_t simcall, smx_mailbox_t mbox,
338 int type, int src, int tag,
339 int (*match_fun)(void *, void *, smx_synchro_t),
341 return SIMIX_comm_iprobe(simcall->issuer, mbox, type, src, tag, match_fun, data);
/* Checks whether `mbox` currently holds a comm that would match a communication of the probed kind, without
 * taking it out of the mailbox (both lookups pass remove_matching=false).
 *
 * A temporary comm describing the prober is created (SEND when looking for a RECEIVE, RECEIVE when looking
 * for a SEND). When the mailbox has a permanent receiver and a non-empty done_comm_queue, that queue is
 * searched; mbox->comm_queue is searched as well. The synchro found (or nullptr) is returned.
 *
 * NOTE(review): the declaration of `smx_type`, the test on `type` selecting between the two allocations, and
 * the conditions around the second lookup and the unref() are not visible in this listing. `dst_proc`, `src`
 * and `tag` are not used on any visible line. */
344 smx_synchro_t SIMIX_comm_iprobe(smx_process_t dst_proc, smx_mailbox_t mbox, int type, int src,
345 int tag, int (*match_fun)(void *, void *, smx_synchro_t), void *data)
347 XBT_DEBUG("iprobe from %p %p", mbox, &mbox->comm_queue);
348 simgrid::simix::Comm* this_comm;
351 this_comm = new simgrid::simix::Comm(SIMIX_COMM_SEND);
352 smx_type = SIMIX_COMM_RECEIVE;
354 this_comm = new simgrid::simix::Comm(SIMIX_COMM_RECEIVE);
355 smx_type = SIMIX_COMM_SEND;
357 smx_synchro_t other_synchro=nullptr;
358 if (mbox->permanent_receiver != nullptr && !mbox->done_comm_queue.empty()) {
359 XBT_DEBUG("first check in the permanent recv mailbox, to see if we already got something");
360 other_synchro = _find_matching_comm(&mbox->done_comm_queue,
361 (e_smx_comm_type_t) smx_type, match_fun, data, this_comm,/*remove_matching*/false);
364 XBT_DEBUG("check if we have more luck in the normal mailbox");
365 other_synchro = _find_matching_comm(&mbox->comm_queue,
366 (e_smx_comm_type_t) smx_type, match_fun, data, this_comm,/*remove_matching*/false);
/* The reference on the found synchro is dropped before returning it -- presumably only when something was
 * found (the guard is not visible). */
370 other_synchro->unref();
373 return other_synchro;
/* Wait handler: attaches `simcall` to `synchro` and records the synchro as the one its issuer is waiting on.
 *
 * Under the model checker / record-replay the outcome is decided from the simcall's MC value: the visible
 * lines either mark the synchro SIMIX_DONE or flag a timeout on the issuer's side (SRC or DST), then call
 * SIMIX_comm_finish().
 * Otherwise, a synchro that is neither WAITING nor RUNNING is finished immediately; else a sleep action of
 * `timeout` is created on the issuer's host and stored as src_timeout or dst_timeout, depending on which end
 * of the comm the issuer is.
 *
 * NOTE(review): the tests on `idx` and `timeout`, the `else` lines and the return ending the MC branch are
 * not visible in this listing. */
376 void simcall_HANDLER_comm_wait(smx_simcall_t simcall, smx_synchro_t synchro, double timeout)
378 /* Associate this simcall to the wait synchro */
379 XBT_DEBUG("simcall_HANDLER_comm_wait, %p", synchro);
381 synchro->simcalls.push_back(simcall);
382 simcall->issuer->waiting_synchro = synchro;
384 if (MC_is_active() || MC_record_replay_is_active()) {
/* Value stored earlier with SIMCALL_SET_MC_VALUE (the blocking send/recv handlers set it to 0). */
385 int idx = SIMCALL_GET_MC_VALUE(simcall);
387 synchro->state = SIMIX_DONE;
389 /* If we reached this point, the wait simcall must have a timeout */
390 /* Otherwise it shouldn't be enabled and executed by the MC */
394 simgrid::simix::Comm *comm = static_cast<simgrid::simix::Comm*>(synchro);
395 if (comm->src_proc == simcall->issuer)
396 comm->state = SIMIX_SRC_TIMEOUT;
398 comm->state = SIMIX_DST_TIMEOUT;
401 SIMIX_comm_finish(synchro);
405 /* If the synchro has already finished, perform the error handling, */
406 /* otherwise set up a waiting timeout on the right side */
407 if (synchro->state != SIMIX_WAITING && synchro->state != SIMIX_RUNNING) {
408 SIMIX_comm_finish(synchro);
409 } else { /* if (timeout >= 0) { we need a surf sleep action even when there is no timeout, otherwise surf won't tell us when the host fails */
410 surf_action_t sleep = surf_host_sleep(simcall->issuer->host, timeout);
411 sleep->setData(synchro);
413 simgrid::simix::Comm *comm = static_cast<simgrid::simix::Comm*>(synchro);
414 if (simcall->issuer == comm->src_proc)
415 comm->src_timeout = sleep;
417 comm->dst_timeout = sleep;
/* Test handler: stores in the simcall result whether `synchro` is finished.
 *
 * Under the model checker / record-replay, "finished" means that both ends (src_proc and dst_proc) are set;
 * the synchro is then marked SIMIX_DONE. Otherwise it means that the state is neither WAITING nor RUNNING.
 * In both modes a positive result attaches the simcall to the synchro and calls SIMIX_comm_finish(), while
 * the other visible path answers the simcall directly.
 *
 * NOTE(review): the `else` lines and the return ending the MC branch are not visible in this listing. */
421 void simcall_HANDLER_comm_test(smx_simcall_t simcall, smx_synchro_t synchro)
423 simgrid::simix::Comm *comm = static_cast<simgrid::simix::Comm*>(synchro);
425 if (MC_is_active() || MC_record_replay_is_active()){
426 simcall_comm_test__set__result(simcall, comm->src_proc && comm->dst_proc);
427 if (simcall_comm_test__get__result(simcall)){
428 synchro->state = SIMIX_DONE;
429 synchro->simcalls.push_back(simcall);
430 SIMIX_comm_finish(synchro);
432 SIMIX_simcall_answer(simcall);
437 simcall_comm_test__set__result(simcall, (synchro->state != SIMIX_WAITING && synchro->state != SIMIX_RUNNING));
438 if (simcall_comm_test__get__result(simcall)) {
439 synchro->simcalls.push_back(simcall);
440 SIMIX_comm_finish(synchro);
442 SIMIX_simcall_answer(simcall);
/* Test-any handler: the simcall result is the index, within the tested comms, of a finished one, or -1 when
 * none is finished.
 *
 * Under the model checker / record-replay the index comes from the simcall's MC value; the selected synchro
 * is marked SIMIX_DONE and finished. Otherwise the comms are scanned in order and a synchro that is neither
 * WAITING nor RUNNING gets the simcall attached and is finished.
 *
 * NOTE(review): the declaration of `cursor`, the test on `idx`, the `else` lines and the returns ending the
 * branches are not visible in this listing. */
446 void simcall_HANDLER_comm_testany(smx_simcall_t simcall, xbt_dynar_t synchros)
449 smx_synchro_t synchro;
450 // The default result is -1 -- this means, "nothing is ready".
451 // It can be changed below, but only if something matches.
452 simcall_comm_testany__set__result(simcall, -1);
454 if (MC_is_active() || MC_record_replay_is_active()){
455 int idx = SIMCALL_GET_MC_VALUE(simcall);
457 SIMIX_simcall_answer(simcall);
459 synchro = xbt_dynar_get_as(synchros, idx, smx_synchro_t);
460 simcall_comm_testany__set__result(simcall, idx);
461 synchro->simcalls.push_back(simcall);
462 synchro->state = SIMIX_DONE;
463 SIMIX_comm_finish(synchro);
468 xbt_dynar_foreach(simcall_comm_testany__get__comms(simcall), cursor,synchro) {
469 if (synchro->state != SIMIX_WAITING && synchro->state != SIMIX_RUNNING) {
470 simcall_comm_testany__set__result(simcall, cursor);
471 synchro->simcalls.push_back(simcall);
472 SIMIX_comm_finish(synchro);
/* Answer issued on the path where the scan found nothing finished (result left at -1), presumably. */
476 SIMIX_simcall_answer(simcall);
/* Wait-any handler: blocks the simcall until one of `synchros` finishes.
 *
 * Under the model checker / record-replay, the synchro at the index given by the simcall's MC value is marked
 * SIMIX_DONE and finished, that index being the result; timeouts are not supported there (xbt_die).
 * Otherwise a timer may be armed: when it fires, the simcall is detached from every synchro, its result is
 * set to -1 and it is answered. The simcall is then attached to each synchro, and a synchro that is neither
 * WAITING nor RUNNING is finished on the spot.
 *
 * NOTE(review): the tests on `timeout` (guarding the xbt_die and choosing between the null timer and
 * SIMIX_timer_set), the end of the lambda, and the statements leaving the MC branch / the loop are not
 * visible in this listing. */
479 void simcall_HANDLER_comm_waitany(smx_simcall_t simcall, xbt_dynar_t synchros, double timeout)
481 smx_synchro_t synchro;
482 unsigned int cursor = 0;
484 if (MC_is_active() || MC_record_replay_is_active()){
486 xbt_die("Timeout not implemented for waitany in the model-checker");
487 int idx = SIMCALL_GET_MC_VALUE(simcall);
488 synchro = xbt_dynar_get_as(synchros, idx, smx_synchro_t);
489 synchro->simcalls.push_back(simcall);
490 simcall_comm_waitany__set__result(simcall, idx);
491 synchro->state = SIMIX_DONE;
492 SIMIX_comm_finish(synchro);
497 simcall->timer = NULL;
/* The lambda captures `simcall` by value (a pointer-like handle) and runs when the timer expires. */
499 simcall->timer = SIMIX_timer_set(timeout, [simcall]() {
500 SIMIX_waitany_remove_simcall_from_actions(simcall);
501 simcall_comm_waitany__set__result(simcall, -1);
502 SIMIX_simcall_answer(simcall);
506 xbt_dynar_foreach(synchros, cursor, synchro){
507 /* associate this simcall to the synchro */
508 synchro->simcalls.push_back(simcall);
510 /* see if the synchro is already finished */
511 if (synchro->state != SIMIX_WAITING && synchro->state != SIMIX_RUNNING){
512 SIMIX_comm_finish(synchro);
/* Detaches a wait-any simcall from every synchro it was waiting on: for each comm of the simcall's wait-any
 * list, the first occurrence of `simcall` in that synchro's `simcalls` container is erased, if present. */
518 void SIMIX_waitany_remove_simcall_from_actions(smx_simcall_t simcall)
520 smx_synchro_t synchro;
521 unsigned int cursor = 0;
522 xbt_dynar_t synchros = simcall_comm_waitany__get__comms(simcall);
524 xbt_dynar_foreach(synchros, cursor, synchro) {
525 // Remove the first occurrence of simcall:
526 auto i = boost::range::find(synchro->simcalls, simcall);
527 if (i != synchro->simcalls.end())
528 synchro->simcalls.erase(i);
533 * \brief Starts the simulation of a communication synchro.
534 * \param synchro the communication synchro
/* Starts the network transfer of a comm whose state is SIMIX_READY; in any other state nothing on the visible
 * lines is executed.
 *
 * A surf communication action is created between the hosts of the sender and of the receiver, with the comm's
 * size and rate, and the comm becomes SIMIX_RUNNING. If the action is already in the failed state, the comm is
 * flagged SIMIX_LINK_FAILURE instead. If either process is suspended, the surf action is suspended right
 * after its creation.
 *
 * NOTE(review): braces and the `else` between the two "suspended on startup" traces are absent from this
 * listing. */
536 static inline void SIMIX_comm_start(smx_synchro_t synchro)
538 simgrid::simix::Comm *comm = static_cast<simgrid::simix::Comm*>(synchro);
540 /* If both the sender and the receiver are already there, start the communication */
541 if (synchro->state == SIMIX_READY) {
543 sg_host_t sender = comm->src_proc->host;
544 sg_host_t receiver = comm->dst_proc->host;
546 XBT_DEBUG("Starting communication %p from '%s' to '%s'", synchro, sg_host_get_name(sender), sg_host_get_name(receiver));
548 comm->surf_comm = surf_network_model_communicate(surf_network_model, sender, receiver, comm->task_size, comm->rate);
/* The action points back to the synchro, presumably so that surf events can be routed to it. */
549 comm->surf_comm->setData(synchro);
550 comm->state = SIMIX_RUNNING;
552 /* If a link is failed, detect it immediately */
553 if (comm->surf_comm->getState() == simgrid::surf::Action::State::failed) {
554 XBT_DEBUG("Communication from '%s' to '%s' failed to start because of a link failure",
555 sg_host_get_name(sender), sg_host_get_name(receiver));
556 comm->state = SIMIX_LINK_FAILURE;
560 /* If any of the process is suspend, create the synchro but stop its execution,
561 it will be restarted when the sender process resume */
562 if (SIMIX_process_is_suspended(comm->src_proc) || SIMIX_process_is_suspended(comm->dst_proc)) {
563 if (SIMIX_process_is_suspended(comm->src_proc))
564 XBT_DEBUG("The communication is suspended on startup because src (%s@%s) was suspended since it initiated the communication",
565 comm->src_proc->name.c_str(), sg_host_get_name(comm->src_proc->host));
567 XBT_DEBUG("The communication is suspended on startup because dst (%s@%s) was suspended since it initiated the communication",
568 comm->dst_proc->name.c_str(), sg_host_get_name(comm->dst_proc->host));
570 comm->surf_comm->suspend();
576 * \brief Answers the SIMIX simcalls associated to a communication synchro.
577 * \param synchro a finished communication synchro
/* Answers every simcall attached to a finished comm.
 *
 * The simcalls are popped one by one from synchro->simcalls. For each of them:
 *  - simcalls of kind SIMCALL_NONE are skipped;
 *  - a wait-any simcall is first detached from its other synchros, its timer (if any) is removed, and outside
 *    of the model checker / record-replay its result is set to the position of this synchro in its list;
 *  - the comm is removed from its mailbox;
 *  - depending on the issuer's host and on the synchro state, data is copied (SIMIX_comm_copy_data) or an
 *    exception is posted to the issuer (timeout, network, cancel) or the issuer is marked to die;
 *  - for wait-any / test-any, a posted exception gets the position of the failed comm as its value;
 *  - the issuer stops waiting on the synchro, the synchro leaves the comm lists of the processes, and the
 *    simcall is answered.
 * Finally the comm is unref'ed `destroy_count` times.
 *
 * NOTE(review): this listing lacks the braces, the `case`/`break`/`default` lines for several states, the
 * try/catch around the rethrow, the guards before SIMIX_mbox_remove and the xbt_fifo_remove calls, and the
 * statements incrementing `destroy_count`; the summary above follows the visible lines and comments only and
 * must be confirmed against the upstream file. */
579 void SIMIX_comm_finish(smx_synchro_t synchro)
581 simgrid::simix::Comm *comm = static_cast<simgrid::simix::Comm*>(synchro);
582 unsigned int destroy_count = 0;
584 while (!synchro->simcalls.empty()) {
585 smx_simcall_t simcall = synchro->simcalls.front();
586 synchro->simcalls.pop_front();
588 /* If a waitany simcall is waiting for this synchro to finish, then remove
589 it from the other synchros in the waitany list. Afterwards, get the
590 position of the actual synchro in the waitany dynar and
591 return it as the result of the simcall */
593 if (simcall->call == SIMCALL_NONE) //FIXME: maybe a better way to handle this case
594 continue; // if process handling comm is killed
595 if (simcall->call == SIMCALL_COMM_WAITANY) {
596 SIMIX_waitany_remove_simcall_from_actions(simcall);
597 if (simcall->timer) {
598 SIMIX_timer_remove(simcall->timer);
599 simcall->timer = nullptr;
/* Under MC / replay the wait-any handler has already stored the result index itself. */
601 if (!MC_is_active() && !MC_record_replay_is_active())
602 simcall_comm_waitany__set__result(simcall, xbt_dynar_search(simcall_comm_waitany__get__comms(simcall), &synchro));
605 /* If the synchro is still in a rendez-vous point then remove from it */
607 SIMIX_mbox_remove(comm->mbox, synchro);
609 XBT_DEBUG("SIMIX_comm_finish: synchro state = %d", (int)synchro->state);
611 /* Check out for errors */
/* An issuer whose host is off is marked to die and gets a host_error exception. */
613 if (simcall->issuer->host->isOff()) {
614 simcall->issuer->context->iwannadie = 1;
615 SMX_EXCEPTION(simcall->issuer, host_error, 0, "Host failed");
617 switch (synchro->state) {
620 XBT_DEBUG("Communication %p complete!", synchro);
621 SIMIX_comm_copy_data(synchro);
624 case SIMIX_SRC_TIMEOUT:
625 SMX_EXCEPTION(simcall->issuer, timeout_error, 0, "Communication timeouted because of sender");
628 case SIMIX_DST_TIMEOUT:
629 SMX_EXCEPTION(simcall->issuer, timeout_error, 0, "Communication timeouted because of receiver");
/* Host failure on one end: the process on the failed end is marked to die; the other visible statement
 * posts a network_error ("Remote peer failed"), presumably to the surviving end (the `else` is not visible). */
632 case SIMIX_SRC_HOST_FAILURE:
633 if (simcall->issuer == comm->src_proc)
634 simcall->issuer->context->iwannadie = 1;
635 // SMX_EXCEPTION(simcall->issuer, host_error, 0, "Host failed");
637 SMX_EXCEPTION(simcall->issuer, network_error, 0, "Remote peer failed");
640 case SIMIX_DST_HOST_FAILURE:
641 if (simcall->issuer == comm->dst_proc)
642 simcall->issuer->context->iwannadie = 1;
643 // SMX_EXCEPTION(simcall->issuer, host_error, 0, "Host failed");
645 SMX_EXCEPTION(simcall->issuer, network_error, 0, "Remote peer failed");
648 case SIMIX_LINK_FAILURE:
/* NOTE(review): when src_proc or dst_proc is null, nullptr is what gets passed for a '%s' conversion below;
 * SIMIX_comm_copy_data() passes a placeholder string in the same situation -- consider doing the same here. */
650 XBT_DEBUG("Link failure in synchro %p between '%s' and '%s': posting an exception to the issuer: %s (%p) detached:%d",
652 comm->src_proc ? sg_host_get_name(comm->src_proc->host) : nullptr,
653 comm->dst_proc ? sg_host_get_name(comm->dst_proc->host) : nullptr,
654 simcall->issuer->name.c_str(), simcall->issuer, comm->detached);
655 if (comm->src_proc == simcall->issuer) {
656 XBT_DEBUG("I'm source");
657 } else if (comm->dst_proc == simcall->issuer) {
658 XBT_DEBUG("I'm dest");
660 XBT_DEBUG("I'm neither source nor dest");
662 SMX_EXCEPTION(simcall->issuer, network_error, 0, "Link failure");
/* Cancellation (its `case` label is not visible): the message names the opposite end as the canceller. */
666 if (simcall->issuer == comm->dst_proc)
667 SMX_EXCEPTION(simcall->issuer, cancel_error, 0, "Communication canceled by the sender");
669 SMX_EXCEPTION(simcall->issuer, cancel_error, 0, "Communication canceled by the receiver");
673 xbt_die("Unexpected synchro state in SIMIX_comm_finish: %d", (int)synchro->state);
677 /* if there is an exception during a waitany or a testany, indicate the position of the failed communication */
678 if (simcall->issuer->exception) {
679 // In order to modify the exception we have to rethrow it:
681 std::rethrow_exception(simcall->issuer->exception);
/* `e` is presumably the exception caught by a handler whose `catch` line is not visible. */
684 if (simcall->call == SIMCALL_COMM_WAITANY) {
685 e.value = xbt_dynar_search(simcall_comm_waitany__get__comms(simcall), &synchro);
687 else if (simcall->call == SIMCALL_COMM_TESTANY) {
688 e.value = xbt_dynar_search(simcall_comm_testany__get__comms(simcall), &synchro);
690 simcall->issuer->exception = std::make_exception_ptr(e);
697 if (simcall->issuer->host->isOff()) {
698 simcall->issuer->context->iwannadie = 1;
/* The issuer no longer waits on this synchro, and the synchro leaves the comm lists of the processes. */
701 simcall->issuer->waiting_synchro = nullptr;
702 xbt_fifo_remove(simcall->issuer->comms, synchro);
704 if(simcall->issuer == comm->src_proc){
706 xbt_fifo_remove(comm->dst_proc->comms, synchro);
708 if(simcall->issuer == comm->dst_proc){
710 xbt_fifo_remove(comm->src_proc->comms, synchro);
711 //in case of a detached comm we have an extra ref to remove, as the sender won't do it
715 SIMIX_simcall_answer(simcall);
/* Drops the references counted in destroy_count (its increments are not visible in this listing). */
719 while (destroy_count-- > 0)
720 static_cast<simgrid::simix::Comm*>(synchro)->unref();
723 /******************************************************************************/
724 /* SIMIX_comm_copy_data callbacks */
725 /******************************************************************************/
/* Copy callback used by SIMIX_comm_copy_data() for comms that carry no copy_data_fun of their own. It starts
 * as the pointer-copy callback and can be replaced through SIMIX_comm_set_copy_data_callback(). */
726 static void (*SIMIX_comm_copy_data_callback) (smx_synchro_t, void*, size_t) = &SIMIX_comm_copy_pointer_callback;
728 void SIMIX_comm_set_copy_data_callback(void (*callback) (smx_synchro_t, void*, size_t))
730 SIMIX_comm_copy_data_callback = callback;
/* Copy callback that transfers a pointer rather than the pointed data: `buff` itself is written into the
 * location designated by comm->dst_buff. `buff_size` must equal sizeof(void*) (asserted). */
733 void SIMIX_comm_copy_pointer_callback(smx_synchro_t synchro, void* buff, size_t buff_size)
735 simgrid::simix::Comm *comm = static_cast<simgrid::simix::Comm*>(synchro);
737 xbt_assert((buff_size == sizeof(void *)), "Cannot copy %zu bytes: must be sizeof(void*)", buff_size);
/* dst_buff is treated as the address of a void* slot owned by the receiver. */
738 *(void **) (comm->dst_buff) = buff;
/* Copy callback that transfers the data itself: `buff_size` bytes are copied from `buff` into comm->dst_buff
 * with memcpy. For a detached comm, comm->src_buff is then reset to nullptr.
 * NOTE(review): a line is missing from this listing inside the detached branch, just before src_buff is
 * cleared; given the comment about the duplicated source buffer it presumably frees that copy -- confirm. */
741 void SIMIX_comm_copy_buffer_callback(smx_synchro_t synchro, void* buff, size_t buff_size)
743 simgrid::simix::Comm *comm = static_cast<simgrid::simix::Comm*>(synchro);
745 XBT_DEBUG("Copy the data over");
746 memcpy(comm->dst_buff, buff, buff_size);
747 if (comm->detached) { // if this is a detached send, the source buffer was duplicated by SMPI sender to make the original buffer available to the application ASAP
749 comm->src_buff = nullptr;
755 * \brief Copy the communication data from the sender's buffer to the receiver's one
756 * \param comm The communication
/* Copies the payload of a comm from the sender's buffer to the receiver's one.
 *
 * Nothing is copied when either buffer is null or when comm->copied is already set. The amount copied is
 * src_buff_size, capped by *dst_buff_size when the receiver provided a size; that size is then updated to the
 * amount copied. The copy itself is delegated to comm->copy_data_fun when set, otherwise to the file-level
 * SIMIX_comm_copy_data_callback.
 *
 * NOTE(review): the early return, some arguments of the debug trace, the `else` before the default callback
 * and the statement announced by the last comment (setting the copied flag) are not visible in this listing;
 * the function may also continue past the last line shown. */
758 void SIMIX_comm_copy_data(smx_synchro_t synchro)
760 simgrid::simix::Comm *comm = static_cast<simgrid::simix::Comm*>(synchro);
762 size_t buff_size = comm->src_buff_size;
763 /* If there is no data to copy then return */
764 if (!comm->src_buff || !comm->dst_buff || comm->copied)
767 XBT_DEBUG("Copying comm %p data from %s (%p) -> %s (%p) (%zu bytes)",
769 comm->src_proc ? sg_host_get_name(comm->src_proc->host) : "a finished process",
771 comm->dst_proc ? sg_host_get_name(comm->dst_proc->host) : "a finished process",
772 comm->dst_buff, buff_size);
774 /* Copy at most dst_buff_size bytes of the message to receiver's buffer */
775 if (comm->dst_buff_size)
776 buff_size = MIN(buff_size, *(comm->dst_buff_size));
778 /* Update the receiver's buffer size to the copied amount */
779 if (comm->dst_buff_size)
780 *comm->dst_buff_size = buff_size;
/* A per-comm copy function takes precedence over the file-level default callback. */
783 if(comm->copy_data_fun)
784 comm->copy_data_fun (comm, comm->src_buff, buff_size);
786 SIMIX_comm_copy_data_callback (comm, comm->src_buff, buff_size);
790 /* Set the copied flag so we copy data only once */
791 /* (this function might be called from both communication ends) */