1 /* Copyright (c) 2009-2016. The SimGrid Team. All rights reserved. */
3 /* This program is free software; you can redistribute it and/or modify it
4 * under the terms of the license (GNU LGPL) which comes with this package. */
6 #include <boost/range/algorithm.hpp>
10 #include "src/surf/surf_interface.hpp"
11 #include "src/simix/smx_private.h"
14 #include "src/mc/mc_replay.h"
16 #include "simgrid/s4u/mailbox.hpp"
18 #include "src/simix/SynchroComm.hpp"
// Logging sub-category used by the XBT_DEBUG calls of this file.
20 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(simix_network, simix, "SIMIX network-related synchronization");
// Declared ahead of its definition because the dictionary below is constructed with it.
22 static void SIMIX_mbox_free(void *data);
// Global registry of mailboxes; entries are stored and looked up by mailbox name.
// NOTE(review): SIMIX_mbox_free is presumably the per-entry destructor called by the dict -- confirm against the xbt_dict API.
23 static xbt_dict_t mailboxes = xbt_dict_new_homogeneous(SIMIX_mbox_free);
// Forward declarations of the helpers defined further down in this file.
25 static void SIMIX_waitany_remove_simcall_from_actions(smx_simcall_t simcall);
26 static void SIMIX_comm_copy_data(smx_synchro_t comm);
27 static inline void SIMIX_mbox_push(smx_mailbox_t mbox, smx_synchro_t comm);
28 static smx_synchro_t _find_matching_comm(std::deque<smx_synchro_t> *deque, e_smx_comm_type_t type,
29 int (*match_fun)(void *, void *,smx_synchro_t), void *user_data, smx_synchro_t my_synchro, bool remove_matching);
30 static void SIMIX_comm_start(smx_synchro_t synchro);
// Module teardown: frees the global mailbox dictionary.
32 void SIMIX_mailbox_exit(void)
34 xbt_dict_free(&mailboxes);
37 /******************************************************************************/
38 /* Rendez-Vous Points */
39 /******************************************************************************/
// Creates the mailbox called `name` and registers it in the global dictionary.
// `name` must be non-null (asserted). A new mailbox starts with an empty comm_queue,
// no done_comm_queue and no permanent receiver.
// NOTE(review): the guard around the allocation and the return statement are not visible in this
// listing; presumably the mailbox is only allocated when the lookup finds none, and it is returned -- confirm.
41 smx_mailbox_t SIMIX_mbox_create(const char *name)
43 xbt_assert(name, "Mailboxes must have a name");
44 /* two processes may have pushed the same mbox_create simcall at the same time */
45 smx_mailbox_t mbox = (smx_mailbox_t) xbt_dict_get_or_null(mailboxes, name);
48 mbox = new s_smx_mailbox_t();
// The mailbox keeps its own copy of the name; that copy is also the dictionary key below.
49 mbox->name = xbt_strdup(name);
50 mbox->comm_queue = new std::deque<smx_synchro_t>();
51 mbox->done_comm_queue = nullptr; // Allocated on need only
52 mbox->permanent_receiver=nullptr;
54 XBT_DEBUG("Creating a mailbox at %p with name %s", mbox, name);
55 xbt_dict_set(mailboxes, mbox->name, mbox, nullptr);
// Releases a mailbox; `data` is the smx_mailbox_t stored in the mailbox dictionary
// (this function is the one passed to xbt_dict_new_homogeneous).
// NOTE(review): only the queue deletions are visible here; releasing the name and the
// mailbox object itself presumably happens on lines missing from this listing -- confirm.
60 void SIMIX_mbox_free(void *data)
62 XBT_DEBUG("mbox free %p", data);
63 smx_mailbox_t mbox = (smx_mailbox_t) data;
65 delete mbox->comm_queue;
// done_comm_queue stays nullptr unless a permanent receiver was set; deleting nullptr is a no-op.
66 delete mbox->done_comm_queue;
// Returns the mailbox registered under `name`, or whatever null value
// xbt_dict_get_or_null yields when no such entry exists.
70 smx_mailbox_t SIMIX_mbox_get_by_name(const char *name)
72 return (smx_mailbox_t) xbt_dict_get_or_null(mailboxes, name);
76 * \brief set the receiver of the rendez vous point to allow eager sends
77 * \param mbox The rendez-vous point
78 * \param process The receiving process
// Records `process` as the permanent receiver of `mbox`, and allocates the queue of
// already-sent comms (done_comm_queue) the first time this is called on the mailbox.
80 void SIMIX_mbox_set_receiver(smx_mailbox_t mbox, smx_process_t process)
82 mbox->permanent_receiver=process;
83 if (mbox->done_comm_queue == nullptr)
84 mbox->done_comm_queue = new std::deque<smx_synchro_t>();
88 * \brief Pushes a communication synchro into a rendez-vous point
89 * \param mbox The mailbox
90 * \param synchro The communication synchro
// Appends `synchro` (which must be a simgrid::simix::Comm) at the back of mbox->comm_queue.
// NOTE(review): a line is missing from this listing between the cast and the push;
// presumably it records the mailbox in the comm -- confirm.
92 static inline void SIMIX_mbox_push(smx_mailbox_t mbox, smx_synchro_t synchro)
94 simgrid::simix::Comm *comm = static_cast<simgrid::simix::Comm*>(synchro);
96 mbox->comm_queue->push_back(comm);
101 * \brief Removes a communication synchro from a rendez-vous point
102 * \param mbox The rendez-vous point
103 * \param synchro The communication synchro
// Detaches `synchro` from `mbox`: clears the comm's mailbox pointer, then walks
// comm_queue to erase its entry.
// NOTE(review): the comparison selecting the element to erase and the early exit after the
// erase are not visible in this listing; presumably xbt_die is only reached when nothing matched.
105 void SIMIX_mbox_remove(smx_mailbox_t mbox, smx_synchro_t synchro)
107 simgrid::simix::Comm *comm = static_cast<simgrid::simix::Comm*>(synchro);
109 comm->mbox = nullptr;
110 for (auto it = mbox->comm_queue->begin(); it != mbox->comm_queue->end(); it++)
112 mbox->comm_queue->erase(it);
115 xbt_die("Cannot remove this comm that is not part of the mailbox");
119 * \brief Checks if there is a communication synchro queued in a deque matching our needs
120 * \param type The type of communication we are looking for (comm_send, comm_recv)
121 * \return The communication synchro if found, nullptr otherwise
// Scans `deque` front to back for a queued comm of direction `type` that is accepted by both
// filters: the caller's `match_fun` and the queued comm's own match_fun. `my_synchro` is the
// description of the caller that is handed to the queued comm's filter.
// NOTE(review): `remove_matching` is not used on any visible line, and the statements that
// return the match are missing from this listing; presumably the match is optionally erased
// from the deque and returned, and nullptr is returned when nothing matches -- confirm.
123 static smx_synchro_t _find_matching_comm(std::deque<smx_synchro_t> *deque, e_smx_comm_type_t type,
124 int (*match_fun)(void *, void *,smx_synchro_t), void *this_user_data, smx_synchro_t my_synchro, bool remove_matching)
126 void* other_user_data = nullptr;
128 for(auto it = deque->begin(); it != deque->end(); it++){
129 smx_synchro_t synchro = *it;
130 simgrid::simix::Comm *comm = static_cast<simgrid::simix::Comm*>(synchro);
// The user data of the queued comm is the sender's for a send, the receiver's for a receive.
132 if (comm->type == SIMIX_COMM_SEND) {
133 other_user_data = comm->src_data;
134 } else if (comm->type == SIMIX_COMM_RECEIVE) {
135 other_user_data = comm->dst_data;
// A null filter accepts everything; otherwise each side's filter gets its own data first, the peer's second.
137 if (comm->type == type &&
138 (! match_fun || match_fun(this_user_data, other_user_data, synchro)) &&
139 (!comm->match_fun || comm->match_fun(other_user_data, this_user_data, my_synchro))) {
140 XBT_DEBUG("Found a matching communication synchro %p", comm);
// Keep a copy of the mailbox pointer before it is cleared just below.
145 comm->mbox_cpy = comm->mbox;
147 comm->mbox = nullptr;
150 XBT_DEBUG("Sorry, communication synchro %p does not match our needs:"
151 " its type is %d but we are looking for a comm of type %d (or maybe the filtering didn't match)",
152 comm, (int)comm->type, (int)type);
154 XBT_DEBUG("No matching communication synchro found");
158 /******************************************************************************/
159 /* Communication synchros */
160 /******************************************************************************/
// Handler of the blocking-send simcall: posts the send through simcall_HANDLER_comm_isend
// (with no clean_fun) and then waits on the resulting comm with `timeout`.
// NOTE(review): the last arguments of the isend call are on a line missing from this listing.
161 XBT_PRIVATE void simcall_HANDLER_comm_send(smx_simcall_t simcall, smx_process_t src, smx_mailbox_t mbox,
162 double task_size, double rate,
163 void *src_buff, size_t src_buff_size,
164 int (*match_fun)(void *, void *,smx_synchro_t),
165 void (*copy_data_fun)(smx_synchro_t, void*, size_t),
166 void *data, double timeout){
167 smx_synchro_t comm = simcall_HANDLER_comm_isend(simcall, src, mbox, task_size, rate,
168 src_buff, src_buff_size, match_fun, nullptr, copy_data_fun,
// Set the simcall's MC value to 0 before waiting; simcall_HANDLER_comm_wait reads it back when MC/replay is active.
170 SIMCALL_SET_MC_VALUE(simcall, 0);
171 simcall_HANDLER_comm_wait(simcall, comm, timeout);
// Handler of the asynchronous-send simcall.
// Creates a SEND comm, tries to pair it with a RECEIVE already queued in mbox->comm_queue and
// otherwise queues it; the retained comm is then filled with the sender-side fields and handed
// to SIMIX_comm_start (skipped when the model checker or record/replay is active).
// Returns nullptr when `detached` is non-zero, the retained comm otherwise.
// NOTE(review): several lines of this function (else branches, the `detached` test, closing
// braces) are missing from this listing, so the exact nesting of the statements cannot be confirmed here.
173 XBT_PRIVATE smx_synchro_t simcall_HANDLER_comm_isend(smx_simcall_t simcall, smx_process_t src_proc, smx_mailbox_t mbox,
174 double task_size, double rate,
175 void *src_buff, size_t src_buff_size,
176 int (*match_fun)(void *, void *,smx_synchro_t),
177 void (*clean_fun)(void *), // used to free the synchro in case of problem after a detached send
178 void (*copy_data_fun)(smx_synchro_t, void*, size_t),// used to copy data if not default one
179 void *data, int detached)
181 XBT_DEBUG("send from %p", mbox);
183 /* Prepare a synchro describing us, so that it gets passed to the user-provided filter of other side */
184 simgrid::simix::Comm* this_synchro = new simgrid::simix::Comm(SIMIX_COMM_SEND);
186 /* Look for communication synchro matching our needs. We also provide a description of
187 * ourself so that the other side also gets a chance of choosing if it wants to match with us.
189 * If it is not found then push our communication into the rendez-vous point */
190 smx_synchro_t other_synchro =
191 _find_matching_comm(mbox->comm_queue, SIMIX_COMM_RECEIVE, match_fun, data, this_synchro, /*remove_matching*/true);
192 simgrid::simix::Comm *other_comm = static_cast<simgrid::simix::Comm*>(other_synchro);
// No receive is waiting: the comm we just created becomes the one describing this communication.
195 if (!other_synchro) {
196 other_synchro = this_synchro;
197 other_comm = static_cast<simgrid::simix::Comm*>(other_synchro);
199 if (mbox->permanent_receiver!=nullptr){
200 //this mailbox is for small messages, which have to be sent right now
201 other_synchro->state = SIMIX_READY;
202 other_comm->dst_proc=mbox->permanent_receiver.get();
// Stored in done_comm_queue, where SIMIX_comm_irecv and SIMIX_comm_iprobe look first.
204 mbox->done_comm_queue->push_back(other_synchro);
205 other_comm->mbox=mbox;
206 XBT_DEBUG("pushing a message into the permanent receive fifo %p, comm %p", mbox, &(other_comm));
// Queue the send in the regular comm_queue (presumably the no-permanent-receiver branch -- confirm).
209 SIMIX_mbox_push(mbox, this_synchro);
// A matching receive was found: drop our reference on the comm created above and mark the receiver's comm ready.
212 XBT_DEBUG("Receive already pushed");
213 this_synchro->unref();
215 other_comm->state = SIMIX_READY;
216 other_comm->type = SIMIX_COMM_READY;
// Add the retained comm to the sender's list of comms.
219 xbt_fifo_push(src_proc->comms, other_synchro);
// NOTE(review): the condition choosing between the two clean_fun assignments is not visible; presumably it tests `detached`.
223 other_comm->detached = true;
224 other_comm->clean_fun = clean_fun;
226 other_comm->clean_fun = nullptr;
229 /* Setup the communication synchro */
230 other_comm->src_proc = src_proc;
231 other_comm->task_size = task_size;
232 other_comm->rate = rate;
233 other_comm->src_buff = src_buff;
234 other_comm->src_buff_size = src_buff_size;
235 other_comm->src_data = data;
237 other_comm->match_fun = match_fun;
238 other_comm->copy_data_fun = copy_data_fun;
// With the model checker or record/replay active, the comm is only marked RUNNING and returned before SIMIX_comm_start.
241 if (MC_is_active() || MC_record_replay_is_active()) {
242 other_comm->state = SIMIX_RUNNING;
243 return (detached ? nullptr : other_comm);
246 SIMIX_comm_start(other_comm);
247 return (detached ? nullptr : other_comm);
// Handler of the blocking-receive simcall: posts the receive through SIMIX_comm_irecv and
// then waits on the resulting comm with `timeout`.
250 XBT_PRIVATE void simcall_HANDLER_comm_recv(smx_simcall_t simcall, smx_process_t receiver, smx_mailbox_t mbox,
251 void *dst_buff, size_t *dst_buff_size,
252 int (*match_fun)(void *, void *, smx_synchro_t),
253 void (*copy_data_fun)(smx_synchro_t, void*, size_t),
254 void *data, double timeout, double rate)
256 smx_synchro_t comm = SIMIX_comm_irecv(receiver, mbox, dst_buff, dst_buff_size, match_fun, copy_data_fun, data, rate);
// Set the simcall's MC value to 0 before waiting; simcall_HANDLER_comm_wait reads it back when MC/replay is active.
257 SIMCALL_SET_MC_VALUE(simcall, 0);
258 simcall_HANDLER_comm_wait(simcall, comm, timeout);
// Handler of the asynchronous-receive simcall: forwards every argument except `simcall`
// to SIMIX_comm_irecv and returns its result.
261 XBT_PRIVATE smx_synchro_t simcall_HANDLER_comm_irecv(smx_simcall_t simcall, smx_process_t receiver, smx_mailbox_t mbox,
262 void *dst_buff, size_t *dst_buff_size,
263 int (*match_fun)(void *, void *, smx_synchro_t),
264 void (*copy_data_fun)(smx_synchro_t, void*, size_t),
265 void *data, double rate)
267 return SIMIX_comm_irecv(receiver, mbox, dst_buff, dst_buff_size, match_fun, copy_data_fun, data, rate);
// Posts an asynchronous receive for `dst_proc` on `mbox`.
// Creates a RECEIVE comm and tries to pair it with a queued SEND: first in done_comm_queue
// when the mailbox has a permanent receiver and that queue is not empty, otherwise in
// comm_queue. If nothing matches, the new comm is queued in comm_queue. The retained comm
// then gets the receiver-side fields and is handed to SIMIX_comm_start (skipped when the
// model checker or record/replay is active). Returns the retained comm.
// NOTE(review): several else branches and closing braces are missing from this listing, so
// the exact nesting of the statements below cannot be confirmed here.
270 smx_synchro_t SIMIX_comm_irecv(smx_process_t dst_proc, smx_mailbox_t mbox, void *dst_buff, size_t *dst_buff_size,
271 int (*match_fun)(void *, void *, smx_synchro_t),
272 void (*copy_data_fun)(smx_synchro_t, void*, size_t), // used to copy data if not default one
273 void *data, double rate)
275 XBT_DEBUG("recv from %p %p", mbox, mbox->comm_queue);
276 simgrid::simix::Comm* this_synchro = new simgrid::simix::Comm(SIMIX_COMM_RECEIVE);
278 smx_synchro_t other_synchro;
279 //communication already done, get it inside the fifo of completed comms
280 if (mbox->permanent_receiver && ! mbox->done_comm_queue->empty()) {
282 XBT_DEBUG("We have a comm that has probably already been received, trying to match it, to skip the communication");
283 //find a match in the already received fifo
284 other_synchro = _find_matching_comm(mbox->done_comm_queue, SIMIX_COMM_SEND, match_fun, data, this_synchro,/*remove_matching*/true);
285 //if not found, assume the receiver came first, register it to the mailbox in the classical way
286 if (!other_synchro) {
287 XBT_DEBUG("We have messages in the permanent receive list, but not the one we are looking for, pushing request into fifo");
288 other_synchro = this_synchro;
289 SIMIX_mbox_push(mbox, this_synchro);
291 simgrid::simix::Comm *other_comm = static_cast<simgrid::simix::Comm*>(other_synchro);
// A surf action that exists and has nothing left to transfer means the eager send already completed.
293 if(other_comm->surf_comm && other_comm->remains()==0.0) {
294 XBT_DEBUG("comm %p has been already sent, and is finished, destroy it",other_comm);
295 other_comm->state = SIMIX_DONE;
296 other_comm->type = SIMIX_COMM_DONE;
297 other_comm->mbox = nullptr;
// Drop our reference on the comm created above (presumably only when a match was found -- confirm).
300 static_cast<simgrid::simix::Comm*>(this_synchro)->unref();
303 /* Prepare a synchro describing us, so that it gets passed to the user-provided filter of other side */
305 /* Look for communication synchro matching our needs. We also provide a description of
306 * ourself so that the other side also gets a chance of choosing if it wants to match with us.
308 * If it is not found then push our communication into the rendez-vous point */
309 other_synchro = _find_matching_comm(mbox->comm_queue, SIMIX_COMM_SEND, match_fun, data, this_synchro,/*remove_matching*/true);
311 if (!other_synchro) {
312 XBT_DEBUG("Receive pushed first %zu", mbox->comm_queue->size());
313 other_synchro = this_synchro;
314 SIMIX_mbox_push(mbox, this_synchro);
// A matching send was found (presumably the else branch): drop our reference on the new comm and mark the sender's comm ready.
316 this_synchro->unref();
317 simgrid::simix::Comm *other_comm = static_cast<simgrid::simix::Comm*>(other_synchro);
319 other_comm->state = SIMIX_READY;
320 other_comm->type = SIMIX_COMM_READY;
// Add the retained comm to the receiver's list of comms.
322 xbt_fifo_push(dst_proc->comms, other_synchro);
325 /* Setup communication synchro */
326 simgrid::simix::Comm *other_comm = static_cast<simgrid::simix::Comm*>(other_synchro);
327 other_comm->dst_proc = dst_proc;
328 other_comm->dst_buff = dst_buff;
329 other_comm->dst_buff_size = dst_buff_size;
330 other_comm->dst_data = data;
// -1.0 is treated as "no rate set": our rate is applied only if it is set and the comm has none or a larger one.
332 if (rate != -1.0 && (other_comm->rate == -1.0 || rate < other_comm->rate))
333 other_comm->rate = rate;
335 other_comm->match_fun = match_fun;
336 other_comm->copy_data_fun = copy_data_fun;
// With the model checker or record/replay active, the comm is only marked RUNNING and returned before SIMIX_comm_start.
338 if (MC_is_active() || MC_record_replay_is_active()) {
339 other_synchro->state = SIMIX_RUNNING;
340 return other_synchro;
343 SIMIX_comm_start(other_synchro);
344 return other_synchro;
// Handler of the iprobe simcall: delegates to SIMIX_comm_iprobe on behalf of the simcall issuer.
// NOTE(review): the end of the parameter list (the `data` parameter used below) is on a line missing from this listing.
347 smx_synchro_t simcall_HANDLER_comm_iprobe(smx_simcall_t simcall, smx_mailbox_t mbox,
348 int type, int src, int tag,
349 int (*match_fun)(void *, void *, smx_synchro_t),
351 return SIMIX_comm_iprobe(simcall->issuer, mbox, type, src, tag, match_fun, data);
// Probes `mbox` for a queued comm matching `match_fun`/`data` without removing it from its
// queue (remove_matching is false in both lookups). A temporary comm describing the prober
// is built with the direction opposite to the one searched for. Returns the comm found, if any.
// NOTE(review): the declaration of smx_type, the test choosing between the two directions
// (presumably on `type`) and the conditions around the second lookup and the unref are
// missing from this listing -- confirm against the full file.
354 smx_synchro_t SIMIX_comm_iprobe(smx_process_t dst_proc, smx_mailbox_t mbox, int type, int src,
355 int tag, int (*match_fun)(void *, void *, smx_synchro_t), void *data)
357 XBT_DEBUG("iprobe from %p %p", mbox, mbox->comm_queue);
358 simgrid::simix::Comm* this_comm;
361 this_comm = new simgrid::simix::Comm(SIMIX_COMM_SEND);
362 smx_type = SIMIX_COMM_RECEIVE;
364 this_comm = new simgrid::simix::Comm(SIMIX_COMM_RECEIVE);
365 smx_type = SIMIX_COMM_SEND;
367 smx_synchro_t other_synchro=nullptr;
368 if(mbox->permanent_receiver && ! mbox->done_comm_queue->empty()){
369 XBT_DEBUG("first check in the permanent recv mailbox, to see if we already got something");
371 _find_matching_comm(mbox->done_comm_queue, (e_smx_comm_type_t) smx_type, match_fun, data, this_comm,/*remove_matching*/false);
374 XBT_DEBUG("check if we have more luck in the normal mailbox");
375 other_synchro = _find_matching_comm(mbox->comm_queue, (e_smx_comm_type_t) smx_type, match_fun, data, this_comm,/*remove_matching*/false);
// NOTE(review): presumably releases a reference taken by _find_matching_comm on the match -- confirm.
379 other_synchro->unref();
382 return other_synchro;
// Handler of the wait simcall: registers `simcall` as a waiter of `synchro`, then either
// finishes the comm immediately when it is neither WAITING nor RUNNING, or arms a surf
// sleep action of `timeout` on the issuer's host, stored as the comm's source- or
// destination-side timeout.
// With the model checker or record/replay active, the outcome is taken from the simcall's MC value instead.
// NOTE(review): the tests on `idx` and several else keywords are missing from this listing.
385 void simcall_HANDLER_comm_wait(smx_simcall_t simcall, smx_synchro_t synchro, double timeout)
387 /* Associate this simcall to the wait synchro */
388 XBT_DEBUG("simcall_HANDLER_comm_wait, %p", synchro);
390 synchro->simcalls.push_back(simcall);
391 simcall->issuer->waiting_synchro = synchro;
393 if (MC_is_active() || MC_record_replay_is_active()) {
394 int idx = SIMCALL_GET_MC_VALUE(simcall);
396 synchro->state = SIMIX_DONE;
398 /* If we reached this point, the wait simcall must have a timeout */
399 /* Otherwise it shouldn't be enabled and executed by the MC */
// The timeout is attributed to the side that issued the wait.
403 simgrid::simix::Comm *comm = static_cast<simgrid::simix::Comm*>(synchro);
404 if (comm->src_proc == simcall->issuer)
405 comm->state = SIMIX_SRC_TIMEOUT;
407 comm->state = SIMIX_DST_TIMEOUT;
410 SIMIX_comm_finish(synchro);
414 /* If the synchro has already finished, perform the error handling, */
415 /* otherwise set up a waiting timeout on the right side */
416 if (synchro->state != SIMIX_WAITING && synchro->state != SIMIX_RUNNING) {
417 SIMIX_comm_finish(synchro);
418 } else { /* if (timeout >= 0) { we need a surf sleep action even when there is no timeout, otherwise surf won't tell us when the host fails */
419 surf_action_t sleep = surf_host_sleep(simcall->issuer->host, timeout);
420 sleep->setData(synchro);
// Remember the sleep action on the side (source or destination) that issued the wait.
422 simgrid::simix::Comm *comm = static_cast<simgrid::simix::Comm*>(synchro);
423 if (simcall->issuer == comm->src_proc)
424 comm->src_timeout = sleep;
426 comm->dst_timeout = sleep;
// Handler of the test simcall: stores in the simcall result whether `synchro` is over.
// When it is, the simcall is attached to the comm and SIMIX_comm_finish is called;
// SIMIX_simcall_answer is presumably called in the other case (the else lines are missing
// from this listing -- confirm).
430 void simcall_HANDLER_comm_test(smx_simcall_t simcall, smx_synchro_t synchro)
432 simgrid::simix::Comm *comm = static_cast<simgrid::simix::Comm*>(synchro);
// Under the model checker or record/replay, a comm counts as over as soon as both peers are known.
434 if (MC_is_active() || MC_record_replay_is_active()){
435 simcall_comm_test__set__result(simcall, comm->src_proc && comm->dst_proc);
436 if (simcall_comm_test__get__result(simcall)){
437 synchro->state = SIMIX_DONE;
438 synchro->simcalls.push_back(simcall);
439 SIMIX_comm_finish(synchro);
441 SIMIX_simcall_answer(simcall);
// Regular execution: the comm is over when it is neither WAITING nor RUNNING.
446 simcall_comm_test__set__result(simcall, (synchro->state != SIMIX_WAITING && synchro->state != SIMIX_RUNNING));
447 if (simcall_comm_test__get__result(simcall)) {
448 synchro->simcalls.push_back(simcall);
449 SIMIX_comm_finish(synchro);
451 SIMIX_simcall_answer(simcall);
// Handler of the testany simcall: the simcall result is the index in `synchros` of a comm
// that is over, or -1 when none is.
// NOTE(review): the declaration of `cursor`, the test on `idx` and the early exits after
// SIMIX_comm_finish are missing from this listing -- confirm against the full file.
455 void simcall_HANDLER_comm_testany(smx_simcall_t simcall, xbt_dynar_t synchros)
458 smx_synchro_t synchro;
459 // The default result is -1 -- this means, "nothing is ready".
460 // It can be changed below, but only if something matches.
461 simcall_comm_testany__set__result(simcall, -1);
// Under the model checker or record/replay, the comm to report is given by the simcall's MC value.
463 if (MC_is_active() || MC_record_replay_is_active()){
464 int idx = SIMCALL_GET_MC_VALUE(simcall);
466 SIMIX_simcall_answer(simcall);
468 synchro = xbt_dynar_get_as(synchros, idx, smx_synchro_t);
469 simcall_comm_testany__set__result(simcall, idx);
470 synchro->simcalls.push_back(simcall);
471 synchro->state = SIMIX_DONE;
472 SIMIX_comm_finish(synchro);
// Regular execution: report a comm that is neither WAITING nor RUNNING and finish it.
477 xbt_dynar_foreach(simcall_comm_testany__get__comms(simcall), cursor,synchro) {
478 if (synchro->state != SIMIX_WAITING && synchro->state != SIMIX_RUNNING) {
479 simcall_comm_testany__set__result(simcall, cursor);
480 synchro->simcalls.push_back(simcall);
481 SIMIX_comm_finish(synchro);
485 SIMIX_simcall_answer(simcall);
// Handler of the waitany simcall: attaches `simcall` to every comm of `synchros` and calls
// SIMIX_comm_finish on a comm that is already neither WAITING nor RUNNING.
// With the model checker or record/replay active, the comm at the index given by the
// simcall's MC value is marked DONE and finished instead.
// NOTE(review): the early exits (after the MC branch and inside the loop) are missing from this listing.
488 void simcall_HANDLER_comm_waitany(smx_simcall_t simcall, xbt_dynar_t synchros)
490 smx_synchro_t synchro;
491 unsigned int cursor = 0;
493 if (MC_is_active() || MC_record_replay_is_active()){
494 int idx = SIMCALL_GET_MC_VALUE(simcall);
495 synchro = xbt_dynar_get_as(synchros, idx, smx_synchro_t);
496 synchro->simcalls.push_back(simcall);
497 simcall_comm_waitany__set__result(simcall, idx);
498 synchro->state = SIMIX_DONE;
499 SIMIX_comm_finish(synchro);
503 xbt_dynar_foreach(synchros, cursor, synchro){
504 /* associate this simcall to the synchro */
505 synchro->simcalls.push_back(simcall);
507 /* see if the synchro is already finished */
508 if (synchro->state != SIMIX_WAITING && synchro->state != SIMIX_RUNNING){
509 SIMIX_comm_finish(synchro);
// Undoes the registration done by the waitany handler: for every comm of the waitany
// simcall, removes the first occurrence of `simcall` from that comm's list of waiters.
515 void SIMIX_waitany_remove_simcall_from_actions(smx_simcall_t simcall)
517 smx_synchro_t synchro;
518 unsigned int cursor = 0;
519 xbt_dynar_t synchros = simcall_comm_waitany__get__comms(simcall);
521 xbt_dynar_foreach(synchros, cursor, synchro) {
522 // Remove the first occurrence of simcall:
523 auto i = boost::range::find(synchro->simcalls, simcall);
524 if (i != synchro->simcalls.end())
525 synchro->simcalls.erase(i);
530 * \brief Starts the simulation of a communication synchro.
531 * \param synchro the communication synchro
// Starts the network transfer of `synchro` if it is in the READY state; the visible
// statements do nothing for other states. Creates the surf communication action between the
// two hosts, marks the comm RUNNING, switches it to LINK_FAILURE if the action is already
// failed, and suspends the action if either process is suspended.
// NOTE(review): closing braces and an else keyword are missing from this listing, so the
// nesting of the last two checks cannot be confirmed here.
533 static inline void SIMIX_comm_start(smx_synchro_t synchro)
535 simgrid::simix::Comm *comm = static_cast<simgrid::simix::Comm*>(synchro);
537 /* If both the sender and the receiver are already there, start the communication */
538 if (synchro->state == SIMIX_READY) {
540 sg_host_t sender = comm->src_proc->host;
541 sg_host_t receiver = comm->dst_proc->host;
543 XBT_DEBUG("Starting communication %p from '%s' to '%s'", synchro, sg_host_get_name(sender), sg_host_get_name(receiver));
545 comm->surf_comm = surf_network_model_communicate(surf_network_model, sender, receiver, comm->task_size, comm->rate);
// Attach the comm to the surf action as its user data.
546 comm->surf_comm->setData(synchro);
547 comm->state = SIMIX_RUNNING;
549 /* If a link is failed, detect it immediately */
550 if (comm->surf_comm->getState() == simgrid::surf::Action::State::failed) {
551 XBT_DEBUG("Communication from '%s' to '%s' failed to start because of a link failure",
552 sg_host_get_name(sender), sg_host_get_name(receiver));
553 comm->state = SIMIX_LINK_FAILURE;
557 /* If any of the processes is suspended, create the synchro but stop its execution;
558 it will be restarted when the sender process resumes */
559 if (SIMIX_process_is_suspended(comm->src_proc) || SIMIX_process_is_suspended(comm->dst_proc)) {
560 if (SIMIX_process_is_suspended(comm->src_proc))
561 XBT_DEBUG("The communication is suspended on startup because src (%s@%s) was suspended since it initiated the communication",
562 comm->src_proc->name.c_str(), sg_host_get_name(comm->src_proc->host));
564 XBT_DEBUG("The communication is suspended on startup because dst (%s@%s) was suspended since it initiated the communication",
565 comm->dst_proc->name.c_str(), sg_host_get_name(comm->dst_proc->host));
567 comm->surf_comm->suspend();
573 * \brief Answers the SIMIX simcalls associated to a communication synchro.
574 * \param synchro a finished communication synchro
// Answers every simcall waiting on the finished comm `synchro`.
// For each waiter, in order: unregister a waitany simcall from its other comms and set its
// result index, detach the comm from its mailbox, turn the comm's final state into either a
// data copy or an exception posted to the issuer, patch the exception value with the
// comm's index for waitany/testany, clean the per-process comm lists, and answer the simcall.
// Finally drops `destroy_count` references on the comm.
// NOTE(review): many lines are missing from this listing (case labels, breaks, else keywords,
// try/catch lines, the increments of destroy_count and several guards), so the exact control
// flow between the statements below cannot be confirmed here.
576 void SIMIX_comm_finish(smx_synchro_t synchro)
578 simgrid::simix::Comm *comm = static_cast<simgrid::simix::Comm*>(synchro);
579 unsigned int destroy_count = 0;
581 while (!synchro->simcalls.empty()) {
582 smx_simcall_t simcall = synchro->simcalls.front();
583 synchro->simcalls.pop_front();
585 /* If a waitany simcall is waiting for this synchro to finish, then remove
586 it from the other synchros in the waitany list. Afterwards, get the
587 position of the actual synchro in the waitany dynar and
588 return it as the result of the simcall */
590 if (simcall->call == SIMCALL_NONE) //FIXME: maybe a better way to handle this case
591 continue; // if process handling comm is killed
592 if (simcall->call == SIMCALL_COMM_WAITANY) {
593 SIMIX_waitany_remove_simcall_from_actions(simcall);
// Under MC/replay the result index was already set by the waitany handler.
594 if (!MC_is_active() && !MC_record_replay_is_active())
595 simcall_comm_waitany__set__result(simcall, xbt_dynar_search(simcall_comm_waitany__get__comms(simcall), &synchro));
598 /* If the synchro is still in a rendez-vous point then remove from it */
600 SIMIX_mbox_remove(comm->mbox, synchro);
602 XBT_DEBUG("SIMIX_comm_finish: synchro state = %d", (int)synchro->state);
604 /* Check out for errors */
// An issuer whose host is off is flagged to die and gets a host_error exception.
606 if (simcall->issuer->host->isOff()) {
607 simcall->issuer->context->iwannadie = 1;
608 SMX_EXCEPTION(simcall->issuer, host_error, 0, "Host failed");
// Otherwise (presumably an else branch) the outcome depends on the comm's final state.
610 switch (synchro->state) {
613 XBT_DEBUG("Communication %p complete!", synchro);
614 SIMIX_comm_copy_data(synchro);
617 case SIMIX_SRC_TIMEOUT:
618 SMX_EXCEPTION(simcall->issuer, timeout_error, 0, "Communication timeouted because of sender");
621 case SIMIX_DST_TIMEOUT:
622 SMX_EXCEPTION(simcall->issuer, timeout_error, 0, "Communication timeouted because of receiver");
// Host failures: the issuer that is the failed side is flagged to die; a network_error is posted (presumably to the other side only).
625 case SIMIX_SRC_HOST_FAILURE:
626 if (simcall->issuer == comm->src_proc)
627 simcall->issuer->context->iwannadie = 1;
628 // SMX_EXCEPTION(simcall->issuer, host_error, 0, "Host failed");
630 SMX_EXCEPTION(simcall->issuer, network_error, 0, "Remote peer failed");
633 case SIMIX_DST_HOST_FAILURE:
634 if (simcall->issuer == comm->dst_proc)
635 simcall->issuer->context->iwannadie = 1;
636 // SMX_EXCEPTION(simcall->issuer, host_error, 0, "Host failed");
638 SMX_EXCEPTION(simcall->issuer, network_error, 0, "Remote peer failed");
641 case SIMIX_LINK_FAILURE:
643 XBT_DEBUG("Link failure in synchro %p between '%s' and '%s': posting an exception to the issuer: %s (%p) detached:%d",
645 comm->src_proc ? sg_host_get_name(comm->src_proc->host) : nullptr,
646 comm->dst_proc ? sg_host_get_name(comm->dst_proc->host) : nullptr,
647 simcall->issuer->name.c_str(), simcall->issuer, comm->detached);
648 if (comm->src_proc == simcall->issuer) {
649 XBT_DEBUG("I'm source");
650 } else if (comm->dst_proc == simcall->issuer) {
651 XBT_DEBUG("I'm dest");
653 XBT_DEBUG("I'm neither source nor dest");
655 SMX_EXCEPTION(simcall->issuer, network_error, 0, "Link failure");
// Cancellation (case label missing from this listing): the message names the peer as the one that cancelled.
659 if (simcall->issuer == comm->dst_proc)
660 SMX_EXCEPTION(simcall->issuer, cancel_error, 0, "Communication canceled by the sender");
662 SMX_EXCEPTION(simcall->issuer, cancel_error, 0, "Communication canceled by the receiver");
666 xbt_die("Unexpected synchro state in SIMIX_comm_finish: %d", (int)synchro->state);
670 /* if there is an exception during a waitany or a testany, indicate the position of the failed communication */
671 if (simcall->issuer->exception) {
672 // In order to modify the exception we have to rethrow it:
674 std::rethrow_exception(simcall->issuer->exception);
// `e` is the caught exception (the catch line is missing from this listing); its value becomes the comm's index in the simcall's dynar.
677 if (simcall->call == SIMCALL_COMM_WAITANY) {
678 e.value = xbt_dynar_search(simcall_comm_waitany__get__comms(simcall), &synchro);
680 else if (simcall->call == SIMCALL_COMM_TESTANY) {
681 e.value = xbt_dynar_search(simcall_comm_testany__get__comms(simcall), &synchro);
683 simcall->issuer->exception = std::make_exception_ptr(e);
690 if (simcall->issuer->host->isOff()) {
691 simcall->issuer->context->iwannadie = 1;
// The issuer no longer waits on this comm; remove the comm from its list, and from the peer's list below.
694 simcall->issuer->waiting_synchro = nullptr;
695 xbt_fifo_remove(simcall->issuer->comms, synchro);
697 if(simcall->issuer == comm->src_proc){
699 xbt_fifo_remove(comm->dst_proc->comms, synchro);
701 if(simcall->issuer == comm->dst_proc){
703 xbt_fifo_remove(comm->src_proc->comms, synchro);
704 //in case of a detached comm we have an extra ref to remove, as the sender won't do it
708 SIMIX_simcall_answer(simcall);
// Release the references counted in destroy_count (its increments are on lines missing from this listing).
712 while (destroy_count-- > 0)
713 static_cast<simgrid::simix::Comm*>(synchro)->unref();
716 /******************************************************************************/
717 /* SIMIX_comm_copy_data callbacks */
718 /******************************************************************************/
// Data-copy callback used by default for comms; initially the pointer-copy callback.
719 static void (*SIMIX_comm_copy_data_callback) (smx_synchro_t, void*, size_t) = &SIMIX_comm_copy_pointer_callback;
// Replaces the default data-copy callback with `callback`.
721 void SIMIX_comm_set_copy_data_callback(void (*callback) (smx_synchro_t, void*, size_t))
723 SIMIX_comm_copy_data_callback = callback;
// Copy callback that transfers the pointer itself, not the pointed data: stores `buff` into
// the pointer-sized slot designated by the comm's dst_buff.
// `buff_size` must be exactly sizeof(void*) (asserted).
726 void SIMIX_comm_copy_pointer_callback(smx_synchro_t synchro, void* buff, size_t buff_size)
728 simgrid::simix::Comm *comm = static_cast<simgrid::simix::Comm*>(synchro);
730 xbt_assert((buff_size == sizeof(void *)), "Cannot copy %zu bytes: must be sizeof(void*)", buff_size);
731 *(void **) (comm->dst_buff) = buff;
// Copy callback that copies `buff_size` bytes from `buff` into the comm's dst_buff.
// For a detached comm, src_buff is cleared afterwards.
// NOTE(review): a line is missing from this listing just before src_buff is cleared;
// presumably it frees the duplicated source buffer -- confirm.
734 void SIMIX_comm_copy_buffer_callback(smx_synchro_t synchro, void* buff, size_t buff_size)
736 simgrid::simix::Comm *comm = static_cast<simgrid::simix::Comm*>(synchro);
738 XBT_DEBUG("Copy the data over");
739 memcpy(comm->dst_buff, buff, buff_size);
740 if (comm->detached) { // if this is a detached send, the source buffer was duplicated by SMPI sender to make the original buffer available to the application ASAP
742 comm->src_buff = nullptr;
748 * \brief Copy the communication data from the sender's buffer to the receiver's one
749 * \param comm The communication
751 void SIMIX_comm_copy_data(smx_synchro_t synchro)
753 simgrid::simix::Comm *comm = static_cast<simgrid::simix::Comm*>(synchro);
755 size_t buff_size = comm->src_buff_size;
756 /* If there is no data to copy then return */
757 if (!comm->src_buff || !comm->dst_buff || comm->copied)
760 XBT_DEBUG("Copying comm %p data from %s (%p) -> %s (%p) (%zu bytes)",
762 comm->src_proc ? sg_host_get_name(comm->src_proc->host) : "a finished process",
764 comm->dst_proc ? sg_host_get_name(comm->dst_proc->host) : "a finished process",
765 comm->dst_buff, buff_size);
767 /* Copy at most dst_buff_size bytes of the message to receiver's buffer */
768 if (comm->dst_buff_size)
769 buff_size = MIN(buff_size, *(comm->dst_buff_size));
771 /* Update the receiver's buffer size to the copied amount */
772 if (comm->dst_buff_size)
773 *comm->dst_buff_size = buff_size;
776 if(comm->copy_data_fun)
777 comm->copy_data_fun (comm, comm->src_buff, buff_size);
779 SIMIX_comm_copy_data_callback (comm, comm->src_buff, buff_size);
783 /* Set the copied flag so we copy data only once */
784 /* (this function might be called from both communication ends) */