1 /* Copyright (c) 2009-2016. The SimGrid Team. All rights reserved. */
3 /* This program is free software; you can redistribute it and/or modify it
4 * under the terms of the license (GNU LGPL) which comes with this package. */
6 #include <boost/range/algorithm.hpp>
10 #include <simgrid/s4u/host.hpp>
12 #include "src/surf/surf_interface.hpp"
13 #include "src/simix/smx_private.h"
16 #include "src/mc/mc_replay.h"
18 #include "simgrid/s4u/mailbox.hpp"
20 #include "src/simix/SynchroComm.hpp"
/* Declares simix_network as this file's default XBT log subcategory, under the simix category. */
22 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(simix_network, simix, "SIMIX network-related synchronization");
/* Declared ahead of its definition because the dictionary initializer right below takes it as argument. */
24 static void SIMIX_mbox_free(void *data);
/* Global mailbox registry; the functions below look entries up and store them by mailbox name.
 * NOTE(review): SIMIX_mbox_free is presumably invoked by the dictionary on each entry it discards --
 * confirm against the xbt_dict_new_homogeneous documentation. */
25 static xbt_dict_t mailboxes = xbt_dict_new_homogeneous(SIMIX_mbox_free);
/* Prototypes of the file-local helpers that are used before their definition. */
27 static void SIMIX_waitany_remove_simcall_from_actions(smx_simcall_t simcall);
28 static void SIMIX_comm_copy_data(smx_synchro_t comm);
29 static inline void SIMIX_mbox_push(smx_mailbox_t mbox, smx_synchro_t comm);
30 static smx_synchro_t _find_matching_comm(std::deque<smx_synchro_t> *deque, e_smx_comm_type_t type,
31 int (*match_fun)(void *, void *,smx_synchro_t), void *user_data, smx_synchro_t my_synchro, bool remove_matching);
32 static void SIMIX_comm_start(smx_synchro_t synchro);
/* Tears down the global mailbox registry by handing the `mailboxes` dictionary to xbt_dict_free. */
34 void SIMIX_mailbox_exit(void)
36 xbt_dict_free(&mailboxes);
39 /******************************************************************************/
40 /* Rendez-Vous Points */
41 /******************************************************************************/
/*
 * Creates the mailbox called `name` and registers it in the global `mailboxes` dictionary.
 * The visible lines first look `name` up, then build a new s_smx_mailbox_t that owns a private copy of
 * the name and an empty comm_queue; done_comm_queue and permanent_receiver both start as nullptr.
 * A null `name` trips the xbt_assert.
 * NOTE(review): some lines of this function are not visible here (between the lookup and the
 * allocation, and after the registration). Presumably the allocation only runs when the lookup found
 * nothing and the mailbox is returned at the end -- confirm against the full file.
 */
43 smx_mailbox_t SIMIX_mbox_create(const char *name)
45 xbt_assert(name, "Mailboxes must have a name");
46 /* two processes may have pushed the same mbox_create simcall at the same time */
47 smx_mailbox_t mbox = (smx_mailbox_t) xbt_dict_get_or_null(mailboxes, name);
50 mbox = new s_smx_mailbox_t();
/* The mailbox keeps its own copy of the name; that copy is also the dictionary key used below. */
51 mbox->name = xbt_strdup(name);
52 mbox->comm_queue = new std::deque<smx_synchro_t>();
53 mbox->done_comm_queue = nullptr; // Allocated on need only
54 mbox->permanent_receiver=nullptr;
56 XBT_DEBUG("Creating a mailbox at %p with name %s", mbox, name);
57 xbt_dict_set(mailboxes, mbox->name, mbox, nullptr);
/*
 * Release callback for one mailbox (it is the function given to the `mailboxes` dictionary).
 * `data` is reinterpreted as an smx_mailbox_t without any check, then both communication deques are
 * deleted.
 * NOTE(review): only the two deques are released in the visible lines. Confirm that mbox->name
 * (allocated with xbt_strdup in SIMIX_mbox_create) and the mailbox object itself are released in the
 * lines that are not shown here.
 */
62 void SIMIX_mbox_free(void *data)
64 XBT_DEBUG("mbox free %p", data);
65 smx_mailbox_t mbox = (smx_mailbox_t) data;
67 delete mbox->comm_queue;
/* done_comm_queue is allocated lazily and may still be nullptr; deleting a null pointer is a no-op. */
68 delete mbox->done_comm_queue;
/* Returns whatever xbt_dict_get_or_null yields for `name` in the global registry, cast to
 * smx_mailbox_t (a null result, as the callee's name indicates, when no such mailbox is registered). */
72 smx_mailbox_t SIMIX_mbox_get_by_name(const char *name)
74 return (smx_mailbox_t) xbt_dict_get_or_null(mailboxes, name);
78 * \brief Sets the receiver of the rendez-vous point to allow eager sends
79 * \param mbox The rendez-vous point
80 * \param process The receiving process
/* Stores `process` as the mailbox's permanent receiver and makes sure done_comm_queue exists. */
82 void SIMIX_mbox_set_receiver(smx_mailbox_t mbox, smx_process_t process)
84 mbox->permanent_receiver=process;
/* done_comm_queue starts as nullptr in SIMIX_mbox_create; it is allocated here once and then reused. */
85 if (mbox->done_comm_queue == nullptr)
86 mbox->done_comm_queue = new std::deque<smx_synchro_t>();
90 * \brief Pushes a communication synchro into a rendez-vous point
91 * \param mbox The mailbox
92 * \param synchro The communication synchro
/*
 * Appends `synchro`, viewed as a simgrid::simix::Comm, at the back of mbox->comm_queue.
 * NOTE(review): one line between the cast and the push_back is not visible. SIMIX_mbox_remove clears
 * comm->mbox, so presumably the missing line sets comm->mbox to `mbox` -- confirm.
 */
94 static inline void SIMIX_mbox_push(smx_mailbox_t mbox, smx_synchro_t synchro)
96 simgrid::simix::Comm *comm = static_cast<simgrid::simix::Comm*>(synchro);
98 mbox->comm_queue->push_back(comm);
103 * \brief Removes a communication synchro from a rendez-vous point
104 * \param mbox The rendez-vous point
105 * \param synchro The communication synchro
/*
 * Detaches `synchro` from `mbox`: the comm's mailbox pointer is cleared first, then comm_queue is
 * walked from begin() to end() looking for the entry to erase. Reaching the end of the function
 * aborts through xbt_die.
 * NOTE(review): the test guarding the erase and whatever follows it are not visible in this listing.
 * Erasing from a std::deque invalidates the loop iterator, so the loop is only correct if the
 * function leaves right after the erase -- confirm against the full file.
 */
107 void SIMIX_mbox_remove(smx_mailbox_t mbox, smx_synchro_t synchro)
109 simgrid::simix::Comm *comm = static_cast<simgrid::simix::Comm*>(synchro);
111 comm->mbox = nullptr;
112 for (auto it = mbox->comm_queue->begin(); it != mbox->comm_queue->end(); it++)
114 mbox->comm_queue->erase(it);
117 xbt_die("Cannot remove this comm that is not part of the mailbox");
121 * \brief Checks if there is a communication synchro queued in a deque matching our needs
122 * \param type The type of communication we are looking for (comm_send, comm_recv)
123 * \return The communication synchro if found, nullptr otherwise
/*
 * Walks `deque` front to back looking for a comm of kind `type` that both sides agree on.
 *  - match_fun / this_user_data: the caller's filter and user data; a null match_fun accepts anything.
 *  - my_synchro: the caller's own synchro, handed to the candidate's filter (comm->match_fun), which is
 *    likewise skipped when null.
 *  - remove_matching: NOTE(review) its use is not visible in this listing; judging by the name, a match
 *    is presumably erased from `deque` when it is true -- confirm.
 * On a match the visible lines save comm->mbox into comm->mbox_cpy and clear comm->mbox.
 * NOTE(review): the return statements are not visible here; the function's return type and the
 * existing \return note suggest it yields the matching synchro or nullptr.
 */
125 static smx_synchro_t _find_matching_comm(std::deque<smx_synchro_t> *deque, e_smx_comm_type_t type,
126 int (*match_fun)(void *, void *,smx_synchro_t), void *this_user_data, smx_synchro_t my_synchro, bool remove_matching)
/* Declared outside the loop: unless a line not shown here resets it, a candidate that is neither a
 * SEND nor a RECEIVE leaves the value of the previous iteration in place. */
128 void* other_user_data = nullptr;
130 for(auto it = deque->begin(); it != deque->end(); it++){
131 smx_synchro_t synchro = *it;
132 simgrid::simix::Comm *comm = static_cast<simgrid::simix::Comm*>(synchro);
/* The candidate's user data sits on the source side for a send and on the destination side for a receive. */
134 if (comm->type == SIMIX_COMM_SEND) {
135 other_user_data = comm->src_data;
136 } else if (comm->type == SIMIX_COMM_RECEIVE) {
137 other_user_data = comm->dst_data;
/* Three conditions must hold: right kind, our filter accepts the candidate, its filter accepts us. */
139 if (comm->type == type &&
140 (! match_fun || match_fun(this_user_data, other_user_data, synchro)) &&
141 (!comm->match_fun || comm->match_fun(other_user_data, this_user_data, my_synchro))) {
142 XBT_DEBUG("Found a matching communication synchro %p", comm);
/* Keep a copy of the mailbox pointer before detaching the comm from its mailbox. */
147 comm->mbox_cpy = comm->mbox;
149 comm->mbox = nullptr;
152 XBT_DEBUG("Sorry, communication synchro %p does not match our needs:"
153 " its type is %d but we are looking for a comm of type %d (or maybe the filtering didn't match)",
154 comm, (int)comm->type, (int)type);
156 XBT_DEBUG("No matching communication synchro found");
160 /******************************************************************************/
161 /* Communication synchros */
162 /******************************************************************************/
/*
 * Blocking send: posts the send through simcall_HANDLER_comm_isend (passing nullptr where that
 * function expects clean_fun), stores 0 as the simcall's MC value, then waits on the resulting comm
 * with `timeout` through simcall_HANDLER_comm_wait.
 * NOTE(review): the last arguments of the isend call are on a line that is not visible here.
 */
163 XBT_PRIVATE void simcall_HANDLER_comm_send(smx_simcall_t simcall, smx_process_t src, smx_mailbox_t mbox,
164 double task_size, double rate,
165 void *src_buff, size_t src_buff_size,
166 int (*match_fun)(void *, void *,smx_synchro_t),
167 void (*copy_data_fun)(smx_synchro_t, void*, size_t),
168 void *data, double timeout){
169 smx_synchro_t comm = simcall_HANDLER_comm_isend(simcall, src, mbox, task_size, rate,
170 src_buff, src_buff_size, match_fun, nullptr, copy_data_fun,
172 SIMCALL_SET_MC_VALUE(simcall, 0);
173 simcall_HANDLER_comm_wait(simcall, comm, timeout);
/*
 * Non-blocking send of `src_buff` on `mbox` on behalf of `src_proc`.
 * A fresh Comm of type SIMIX_COMM_SEND describes this side; _find_matching_comm then searches
 * mbox->comm_queue for a pending SIMIX_COMM_RECEIVE (asking for the match to be removed).
 *  - No match: our own synchro becomes the communication. With a permanent receiver it is marked
 *    SIMIX_READY, given that receiver as dst_proc and queued in done_comm_queue; the visible code also
 *    contains a SIMIX_mbox_push of our synchro into the regular queue.
 *  - Match: our synchro is unref'ed and the matched comm is marked SIMIX_READY / SIMIX_COMM_READY.
 * The sender-side fields of the comm are then filled in and the comm is started with SIMIX_comm_start,
 * except under MC / record-replay where it is only marked SIMIX_RUNNING.
 * Returns nullptr when `detached` is non-zero, the comm otherwise.
 * NOTE(review): the `else` lines and some conditions (e.g. around the detached / clean_fun
 * assignments) are not visible in this listing; the branch structure above is inferred from the
 * visible statements and debug messages -- confirm against the full file.
 */
175 XBT_PRIVATE smx_synchro_t simcall_HANDLER_comm_isend(smx_simcall_t simcall, smx_process_t src_proc, smx_mailbox_t mbox,
176 double task_size, double rate,
177 void *src_buff, size_t src_buff_size,
178 int (*match_fun)(void *, void *,smx_synchro_t),
179 void (*clean_fun)(void *), // used to free the synchro in case of problem after a detached send
180 void (*copy_data_fun)(smx_synchro_t, void*, size_t),// used to copy data if not default one
181 void *data, int detached)
183 XBT_DEBUG("send from %p", mbox);
185 /* Prepare a synchro describing us, so that it gets passed to the user-provided filter of other side */
186 simgrid::simix::Comm* this_synchro = new simgrid::simix::Comm(SIMIX_COMM_SEND);
188 /* Look for communication synchro matching our needs. We also provide a description of
189 * ourself so that the other side also gets a chance of choosing if it wants to match with us.
191 * If it is not found then push our communication into the rendez-vous point */
192 smx_synchro_t other_synchro =
193 _find_matching_comm(mbox->comm_queue, SIMIX_COMM_RECEIVE, match_fun, data, this_synchro, /*remove_matching*/true);
194 simgrid::simix::Comm *other_comm = static_cast<simgrid::simix::Comm*>(other_synchro);
/* No receive was waiting: from here on "other" designates our own synchro. */
197 if (!other_synchro) {
198 other_synchro = this_synchro;
199 other_comm = static_cast<simgrid::simix::Comm*>(other_synchro);
201 if (mbox->permanent_receiver!=nullptr){
202 //this mailbox is for small messages, which have to be sent right now
203 other_synchro->state = SIMIX_READY;
204 other_comm->dst_proc=mbox->permanent_receiver.get();
206 mbox->done_comm_queue->push_back(other_synchro);
207 other_comm->mbox=mbox;
/* NOTE(review): &(other_comm) is the address of the local pointer variable, not the comm it points to;
 * the value printed for "comm %p" is probably not the one intended. */
208 XBT_DEBUG("pushing a message into the permanent receive fifo %p, comm %p", mbox, &(other_comm));
211 SIMIX_mbox_push(mbox, this_synchro);
/* A receive was already posted: our descriptive synchro is no longer needed. */
214 XBT_DEBUG("Receive already pushed");
215 this_synchro->unref();
217 other_comm->state = SIMIX_READY;
218 other_comm->type = SIMIX_COMM_READY;
/* Track the comm in the sender's list of communications. */
221 xbt_fifo_push(src_proc->comms, other_synchro);
/* NOTE(review): the condition selecting between the next two groups is not visible; presumably
 * clean_fun is only kept for detached sends -- confirm. */
225 other_comm->detached = true;
226 other_comm->clean_fun = clean_fun;
228 other_comm->clean_fun = nullptr;
231 /* Setup the communication synchro */
232 other_comm->src_proc = src_proc;
233 other_comm->task_size = task_size;
234 other_comm->rate = rate;
235 other_comm->src_buff = src_buff;
236 other_comm->src_buff_size = src_buff_size;
237 other_comm->src_data = data;
239 other_comm->match_fun = match_fun;
240 other_comm->copy_data_fun = copy_data_fun;
/* Under model checking or record-replay the comm is flagged as running without calling SIMIX_comm_start. */
243 if (MC_is_active() || MC_record_replay_is_active()) {
244 other_comm->state = SIMIX_RUNNING;
245 return (detached ? nullptr : other_comm);
248 SIMIX_comm_start(other_comm);
249 return (detached ? nullptr : other_comm);
/* Blocking receive: posts the receive through SIMIX_comm_irecv, stores 0 as the simcall's MC value,
 * then waits on the resulting comm with `timeout` through simcall_HANDLER_comm_wait. */
252 XBT_PRIVATE void simcall_HANDLER_comm_recv(smx_simcall_t simcall, smx_process_t receiver, smx_mailbox_t mbox,
253 void *dst_buff, size_t *dst_buff_size,
254 int (*match_fun)(void *, void *, smx_synchro_t),
255 void (*copy_data_fun)(smx_synchro_t, void*, size_t),
256 void *data, double timeout, double rate)
258 smx_synchro_t comm = SIMIX_comm_irecv(receiver, mbox, dst_buff, dst_buff_size, match_fun, copy_data_fun, data, rate);
259 SIMCALL_SET_MC_VALUE(simcall, 0);
260 simcall_HANDLER_comm_wait(simcall, comm, timeout);
/* Simcall entry point for a non-blocking receive: forwards every argument except `simcall`
 * (unused in the visible code) to SIMIX_comm_irecv and returns its result. */
263 XBT_PRIVATE smx_synchro_t simcall_HANDLER_comm_irecv(smx_simcall_t simcall, smx_process_t receiver, smx_mailbox_t mbox,
264 void *dst_buff, size_t *dst_buff_size,
265 int (*match_fun)(void *, void *, smx_synchro_t),
266 void (*copy_data_fun)(smx_synchro_t, void*, size_t),
267 void *data, double rate)
269 return SIMIX_comm_irecv(receiver, mbox, dst_buff, dst_buff_size, match_fun, copy_data_fun, data, rate);
/*
 * Non-blocking receive on `mbox` on behalf of `dst_proc`.
 * A fresh Comm of type SIMIX_COMM_RECEIVE describes this side. Two lookup paths are visible:
 *  - the mailbox has a permanent receiver and a non-empty done_comm_queue: a matching SEND is searched
 *    there first; if none matches, our synchro is pushed into the regular queue. A match whose
 *    surf_comm exists and has nothing remaining is flagged SIMIX_DONE / SIMIX_COMM_DONE and detached
 *    from the mailbox;
 *  - otherwise a matching SEND is searched in comm_queue; without a match our synchro is pushed, with
 *    one our synchro is unref'ed and the match is flagged SIMIX_READY / SIMIX_COMM_READY.
 * The receiver-side fields are then filled in and the comm is started with SIMIX_comm_start, except
 * under MC / record-replay where it is only marked SIMIX_RUNNING. The comm is returned in both cases.
 * NOTE(review): the `else` lines separating these branches are not visible in this listing; the
 * structure above is inferred from the visible statements and comments -- confirm against the full file.
 */
272 smx_synchro_t SIMIX_comm_irecv(smx_process_t dst_proc, smx_mailbox_t mbox, void *dst_buff, size_t *dst_buff_size,
273 int (*match_fun)(void *, void *, smx_synchro_t),
274 void (*copy_data_fun)(smx_synchro_t, void*, size_t), // used to copy data if not default one
275 void *data, double rate)
277 XBT_DEBUG("recv from %p %p", mbox, mbox->comm_queue);
278 simgrid::simix::Comm* this_synchro = new simgrid::simix::Comm(SIMIX_COMM_RECEIVE);
280 smx_synchro_t other_synchro;
281 //communication already done, get it inside the fifo of completed comms
282 if (mbox->permanent_receiver && ! mbox->done_comm_queue->empty()) {
284 XBT_DEBUG("We have a comm that has probably already been received, trying to match it, to skip the communication");
285 //find a match in the already received fifo
286 other_synchro = _find_matching_comm(mbox->done_comm_queue, SIMIX_COMM_SEND, match_fun, data, this_synchro,/*remove_matching*/true);
287 //if not found, assume the receiver came first, register it to the mailbox in the classical way
288 if (!other_synchro) {
289 XBT_DEBUG("We have messages in the permanent receive list, but not the one we are looking for, pushing request into fifo");
290 other_synchro = this_synchro;
291 SIMIX_mbox_push(mbox, this_synchro);
293 simgrid::simix::Comm *other_comm = static_cast<simgrid::simix::Comm*>(other_synchro);
/* The eager send already has a surf action with nothing left to transfer: mark the comm as done. */
295 if(other_comm->surf_comm && other_comm->remains()==0.0) {
296 XBT_DEBUG("comm %p has been already sent, and is finished, destroy it",other_comm);
297 other_comm->state = SIMIX_DONE;
298 other_comm->type = SIMIX_COMM_DONE;
299 other_comm->mbox = nullptr;
/* this_synchro is already a Comm pointer, so the cast on the next line changes nothing. */
302 static_cast<simgrid::simix::Comm*>(this_synchro)->unref();
305 /* Prepare a synchro describing us, so that it gets passed to the user-provided filter of other side */
307 /* Look for communication synchro matching our needs. We also provide a description of
308 * ourself so that the other side also gets a chance of choosing if it wants to match with us.
310 * If it is not found then push our communication into the rendez-vous point */
311 other_synchro = _find_matching_comm(mbox->comm_queue, SIMIX_COMM_SEND, match_fun, data, this_synchro,/*remove_matching*/true);
313 if (!other_synchro) {
314 XBT_DEBUG("Receive pushed first %zu", mbox->comm_queue->size());
315 other_synchro = this_synchro;
316 SIMIX_mbox_push(mbox, this_synchro);
318 this_synchro->unref();
319 simgrid::simix::Comm *other_comm = static_cast<simgrid::simix::Comm*>(other_synchro);
321 other_comm->state = SIMIX_READY;
322 other_comm->type = SIMIX_COMM_READY;
/* Track the comm in the receiver's list of communications. */
324 xbt_fifo_push(dst_proc->comms, other_synchro);
327 /* Setup communication synchro */
328 simgrid::simix::Comm *other_comm = static_cast<simgrid::simix::Comm*>(other_synchro);
329 other_comm->dst_proc = dst_proc;
330 other_comm->dst_buff = dst_buff;
331 other_comm->dst_buff_size = dst_buff_size;
332 other_comm->dst_data = data;
/* -1.0 stands for "no rate requested": the receiver's rate only replaces the comm's rate when the comm
 * has none or when the receiver's is lower. */
334 if (rate != -1.0 && (other_comm->rate == -1.0 || rate < other_comm->rate))
335 other_comm->rate = rate;
337 other_comm->match_fun = match_fun;
338 other_comm->copy_data_fun = copy_data_fun;
/* Under model checking or record-replay the comm is flagged as running without calling SIMIX_comm_start. */
340 if (MC_is_active() || MC_record_replay_is_active()) {
341 other_synchro->state = SIMIX_RUNNING;
342 return other_synchro;
345 SIMIX_comm_start(other_synchro);
346 return other_synchro;
/* Simcall entry point for a probe: forwards to SIMIX_comm_iprobe, using the simcall's issuer as the
 * probing process.
 * NOTE(review): the end of the parameter list (the declaration of `data`) is on a line not visible here. */
349 smx_synchro_t simcall_HANDLER_comm_iprobe(smx_simcall_t simcall, smx_mailbox_t mbox,
350 int type, int src, int tag,
351 int (*match_fun)(void *, void *, smx_synchro_t),
353 return SIMIX_comm_iprobe(simcall->issuer, mbox, type, src, tag, match_fun, data);
/*
 * Probes `mbox` for a pending communication without consuming it: both _find_matching_comm calls pass
 * remove_matching=false.
 * A temporary Comm of one kind is built and the opposite kind is searched for (SEND probes for a
 * RECEIVE and vice versa). done_comm_queue is consulted first when the mailbox has a permanent
 * receiver and that queue is not empty, then the regular comm_queue.
 * Returns the synchro found, or nullptr (the initial value of other_synchro).
 * NOTE(review): the test on `type` choosing the kind, the declaration of smx_type, the conditions
 * around the second lookup and around the final unref(), and the release of this_comm are not visible
 * in this listing. dst_proc, src and tag are not used in the visible lines.
 */
356 smx_synchro_t SIMIX_comm_iprobe(smx_process_t dst_proc, smx_mailbox_t mbox, int type, int src,
357 int tag, int (*match_fun)(void *, void *, smx_synchro_t), void *data)
359 XBT_DEBUG("iprobe from %p %p", mbox, mbox->comm_queue);
360 simgrid::simix::Comm* this_comm;
363 this_comm = new simgrid::simix::Comm(SIMIX_COMM_SEND);
364 smx_type = SIMIX_COMM_RECEIVE;
366 this_comm = new simgrid::simix::Comm(SIMIX_COMM_RECEIVE);
367 smx_type = SIMIX_COMM_SEND;
369 smx_synchro_t other_synchro=nullptr;
370 if(mbox->permanent_receiver && ! mbox->done_comm_queue->empty()){
371 XBT_DEBUG("first check in the permanent recv mailbox, to see if we already got something");
373 _find_matching_comm(mbox->done_comm_queue, (e_smx_comm_type_t) smx_type, match_fun, data, this_comm,/*remove_matching*/false);
376 XBT_DEBUG("check if we have more luck in the normal mailbox");
377 other_synchro = _find_matching_comm(mbox->comm_queue, (e_smx_comm_type_t) smx_type, match_fun, data, this_comm,/*remove_matching*/false);
381 other_synchro->unref();
384 return other_synchro;
/*
 * Makes the issuer of `simcall` wait for `synchro`.
 * The simcall is queued on the synchro and the synchro is recorded as what the issuer waits for.
 *  - Under MC / record-replay the outcome is decided from the simcall's MC value: the visible lines
 *    either mark the synchro SIMIX_DONE or mark it as timed out on the issuer's side (source or
 *    destination), and then call SIMIX_comm_finish.
 *  - Otherwise, a synchro that is neither WAITING nor RUNNING is finished right away; else a surf sleep
 *    action lasting `timeout` is created on the issuer's host and stored as src_timeout or dst_timeout
 *    depending on which side the issuer is.
 * NOTE(review): the tests on idx, the `else` lines and the return ending the MC branch are not visible
 * in this listing -- confirm the exact branch structure against the full file.
 */
387 void simcall_HANDLER_comm_wait(smx_simcall_t simcall, smx_synchro_t synchro, double timeout)
389 /* Associate this simcall to the wait synchro */
390 XBT_DEBUG("simcall_HANDLER_comm_wait, %p", synchro);
392 synchro->simcalls.push_back(simcall);
393 simcall->issuer->waiting_synchro = synchro;
395 if (MC_is_active() || MC_record_replay_is_active()) {
396 int idx = SIMCALL_GET_MC_VALUE(simcall);
398 synchro->state = SIMIX_DONE;
400 /* If we reached this point, the wait simcall must have a timeout */
401 /* Otherwise it shouldn't be enabled and executed by the MC */
405 simgrid::simix::Comm *comm = static_cast<simgrid::simix::Comm*>(synchro);
/* The timeout is attributed to whichever side issued the wait. */
406 if (comm->src_proc == simcall->issuer)
407 comm->state = SIMIX_SRC_TIMEOUT;
409 comm->state = SIMIX_DST_TIMEOUT;
412 SIMIX_comm_finish(synchro);
416 /* If the synchro has already finished, perform the error handling, */
417 /* otherwise set up a waiting timeout on the right side */
418 if (synchro->state != SIMIX_WAITING && synchro->state != SIMIX_RUNNING) {
419 SIMIX_comm_finish(synchro);
420 } else { /* if (timeout >= 0) { we need a surf sleep action even when there is no timeout, otherwise surf won't tell us when the host fails */
421 surf_action_t sleep = surf_host_sleep(simcall->issuer->host, timeout);
/* The sleep action points back to the synchro it guards. */
422 sleep->setData(synchro);
424 simgrid::simix::Comm *comm = static_cast<simgrid::simix::Comm*>(synchro);
425 if (simcall->issuer == comm->src_proc)
426 comm->src_timeout = sleep;
428 comm->dst_timeout = sleep;
/*
 * Tests whether `synchro` is finished and stores the answer as the simcall's result.
 *  - Under MC / record-replay the result is "both src_proc and dst_proc are set"; when true the synchro
 *    is forced to SIMIX_DONE.
 *  - Otherwise the result is "state is neither SIMIX_WAITING nor SIMIX_RUNNING".
 * In both modes a positive result queues the simcall on the synchro and calls SIMIX_comm_finish, while
 * the visible SIMIX_simcall_answer calls reply to the simcall directly.
 * NOTE(review): the `else` lines and the return ending the MC branch are not visible in this listing;
 * presumably SIMIX_simcall_answer is only reached on a negative result -- confirm.
 */
432 void simcall_HANDLER_comm_test(smx_simcall_t simcall, smx_synchro_t synchro)
434 simgrid::simix::Comm *comm = static_cast<simgrid::simix::Comm*>(synchro);
436 if (MC_is_active() || MC_record_replay_is_active()){
437 simcall_comm_test__set__result(simcall, comm->src_proc && comm->dst_proc);
438 if (simcall_comm_test__get__result(simcall)){
439 synchro->state = SIMIX_DONE;
440 synchro->simcalls.push_back(simcall);
441 SIMIX_comm_finish(synchro);
443 SIMIX_simcall_answer(simcall);
448 simcall_comm_test__set__result(simcall, (synchro->state != SIMIX_WAITING && synchro->state != SIMIX_RUNNING));
449 if (simcall_comm_test__get__result(simcall)) {
450 synchro->simcalls.push_back(simcall);
451 SIMIX_comm_finish(synchro);
453 SIMIX_simcall_answer(simcall);
/*
 * Tests a set of synchros and stores in the simcall's result the index of one that is finished, or -1
 * (the default set first) when nothing is ready.
 *  - Under MC / record-replay the index comes from the simcall's MC value; the chosen synchro is forced
 *    to SIMIX_DONE and finished.
 *  - Otherwise the comms attached to the simcall are scanned; for a synchro that is neither WAITING nor
 *    RUNNING the result is set to its cursor position and SIMIX_comm_finish is called.
 * NOTE(review): the declaration of `cursor`, the test on idx, and the returns / `else` lines are not
 * visible in this listing; presumably the scan stops at the first finished synchro and the final
 * SIMIX_simcall_answer is only reached when none is -- confirm.
 * The non-MC scan reads the list through simcall_comm_testany__get__comms(simcall) rather than through
 * the `synchros` parameter.
 */
457 void simcall_HANDLER_comm_testany(smx_simcall_t simcall, xbt_dynar_t synchros)
460 smx_synchro_t synchro;
461 // The default result is -1 -- this means, "nothing is ready".
462 // It can be changed below, but only if something matches.
463 simcall_comm_testany__set__result(simcall, -1);
465 if (MC_is_active() || MC_record_replay_is_active()){
466 int idx = SIMCALL_GET_MC_VALUE(simcall);
468 SIMIX_simcall_answer(simcall);
470 synchro = xbt_dynar_get_as(synchros, idx, smx_synchro_t);
471 simcall_comm_testany__set__result(simcall, idx);
472 synchro->simcalls.push_back(simcall);
473 synchro->state = SIMIX_DONE;
474 SIMIX_comm_finish(synchro);
479 xbt_dynar_foreach(simcall_comm_testany__get__comms(simcall), cursor,synchro) {
480 if (synchro->state != SIMIX_WAITING && synchro->state != SIMIX_RUNNING) {
481 simcall_comm_testany__set__result(simcall, cursor);
482 synchro->simcalls.push_back(simcall);
483 SIMIX_comm_finish(synchro);
487 SIMIX_simcall_answer(simcall);
/*
 * Makes the issuer wait until one synchro of `synchros` finishes.
 *  - Under MC / record-replay the index comes from the simcall's MC value: the simcall is queued on
 *    that synchro, the index is stored as result, the synchro is forced to SIMIX_DONE and finished.
 *  - Otherwise the simcall is queued on every synchro visited by the loop, and SIMIX_comm_finish is
 *    called for a synchro that is neither WAITING nor RUNNING.
 * NOTE(review): the return ending the MC branch and whatever follows the SIMIX_comm_finish call inside
 * the loop are not visible in this listing; presumably the loop stops at the first finished synchro --
 * confirm.
 */
490 void simcall_HANDLER_comm_waitany(smx_simcall_t simcall, xbt_dynar_t synchros)
492 smx_synchro_t synchro;
493 unsigned int cursor = 0;
495 if (MC_is_active() || MC_record_replay_is_active()){
496 int idx = SIMCALL_GET_MC_VALUE(simcall);
497 synchro = xbt_dynar_get_as(synchros, idx, smx_synchro_t);
498 synchro->simcalls.push_back(simcall);
499 simcall_comm_waitany__set__result(simcall, idx);
500 synchro->state = SIMIX_DONE;
501 SIMIX_comm_finish(synchro);
505 xbt_dynar_foreach(synchros, cursor, synchro){
506 /* associate this simcall to the synchro */
507 synchro->simcalls.push_back(simcall);
509 /* see if the synchro is already finished */
510 if (synchro->state != SIMIX_WAITING && synchro->state != SIMIX_RUNNING){
511 SIMIX_comm_finish(synchro);
/* Unregisters a waitany `simcall` from its synchros: for each comm attached to the simcall, the first
 * occurrence of the simcall in that synchro's `simcalls` list is erased, if there is one. */
517 void SIMIX_waitany_remove_simcall_from_actions(smx_simcall_t simcall)
519 smx_synchro_t synchro;
520 unsigned int cursor = 0;
521 xbt_dynar_t synchros = simcall_comm_waitany__get__comms(simcall);
523 xbt_dynar_foreach(synchros, cursor, synchro) {
524 // Remove the first occurrence of simcall:
525 auto i = boost::range::find(synchro->simcalls, simcall);
/* find returns end() when the simcall is not registered on this synchro; nothing to erase then. */
526 if (i != synchro->simcalls.end())
527 synchro->simcalls.erase(i);
532 * \brief Starts the simulation of a communication synchro.
533 * \param synchro the communication synchro
/*
 * Starts the network transfer of a comm whose state is SIMIX_READY; any other state is left untouched
 * by the visible code.
 * A surf network action is created between the sender's and the receiver's hosts with the comm's
 * task_size and rate, it is linked back to the synchro, and the comm becomes SIMIX_RUNNING. An action
 * that is already in the failed state turns the comm into SIMIX_LINK_FAILURE.
 * If either process is suspended, the freshly created action is suspended as well.
 * NOTE(review): closing braces and `else` lines are not visible in this listing, so the exact nesting
 * of the last two checks is inferred -- confirm against the full file.
 */
535 static inline void SIMIX_comm_start(smx_synchro_t synchro)
537 simgrid::simix::Comm *comm = static_cast<simgrid::simix::Comm*>(synchro);
539 /* If both the sender and the receiver are already there, start the communication */
540 if (synchro->state == SIMIX_READY) {
542 sg_host_t sender = comm->src_proc->host;
543 sg_host_t receiver = comm->dst_proc->host;
545 XBT_DEBUG("Starting communication %p from '%s' to '%s'", synchro, sg_host_get_name(sender), sg_host_get_name(receiver));
547 comm->surf_comm = surf_network_model_communicate(surf_network_model, sender, receiver, comm->task_size, comm->rate);
/* The surf action points back to the synchro it simulates. */
548 comm->surf_comm->setData(synchro);
549 comm->state = SIMIX_RUNNING;
551 /* If a link is failed, detect it immediately */
552 if (comm->surf_comm->getState() == simgrid::surf::Action::State::failed) {
553 XBT_DEBUG("Communication from '%s' to '%s' failed to start because of a link failure",
554 sg_host_get_name(sender), sg_host_get_name(receiver));
555 comm->state = SIMIX_LINK_FAILURE;
559 /* If any of the processes is suspended, create the synchro but stop its execution,
560 it will be restarted when the sender process resumes */
561 if (SIMIX_process_is_suspended(comm->src_proc) || SIMIX_process_is_suspended(comm->dst_proc)) {
562 if (SIMIX_process_is_suspended(comm->src_proc))
563 XBT_DEBUG("The communication is suspended on startup because src (%s@%s) was suspended since it initiated the communication",
564 comm->src_proc->name.c_str(), sg_host_get_name(comm->src_proc->host));
566 XBT_DEBUG("The communication is suspended on startup because dst (%s@%s) was suspended since it initiated the communication",
567 comm->dst_proc->name.c_str(), sg_host_get_name(comm->dst_proc->host));
569 comm->surf_comm->suspend();
575 * \brief Answers the SIMIX simcalls associated to a communication synchro.
576 * \param synchro a finished communication synchro
/*
 * Answers every simcall queued on `synchro`, turning the synchro's final state into either a data copy
 * or an exception posted to the issuer.
 * For each simcall popped from the front of synchro->simcalls, the visible code:
 *  - skips it when its call is SIMCALL_NONE;
 *  - for a waitany, unregisters the simcall from the other synchros and, outside MC / record-replay,
 *    stores the position of this synchro in the waitany list as the result;
 *  - removes the comm from its mailbox;
 *  - when the issuer's host is off, flags the issuer to die and posts a host_error; the switch on
 *    synchro->state handles the remaining outcomes (data copy, timeouts, peer / host failures, link
 *    failure, cancellation) and dies on an unexpected state;
 *  - when an exception is pending on the issuer, rethrows it so that its `value` field can be set to the
 *    position of this synchro in the waitany / testany list;
 *  - clears the issuer's waiting_synchro, removes the synchro from the comms fifos and answers the
 *    simcall.
 * Finally unref() is called on the comm destroy_count times.
 * NOTE(review): many lines of this function are not visible in this listing (several `case` labels,
 * `break`s, `else` lines, the try / catch around the rethrow, the tests guarding the mailbox removal
 * and the fifo removals, and every update of destroy_count). The outline above only covers what the
 * visible statements and comments establish -- confirm details against the full file.
 */
578 void SIMIX_comm_finish(smx_synchro_t synchro)
580 simgrid::simix::Comm *comm = static_cast<simgrid::simix::Comm*>(synchro);
581 unsigned int destroy_count = 0;
583 while (!synchro->simcalls.empty()) {
584 smx_simcall_t simcall = synchro->simcalls.front();
585 synchro->simcalls.pop_front();
587 /* If a waitany simcall is waiting for this synchro to finish, then remove
588 it from the other synchros in the waitany list. Afterwards, get the
589 position of the actual synchro in the waitany dynar and
590 return it as the result of the simcall */
592 if (simcall->call == SIMCALL_NONE) //FIXME: maybe a better way to handle this case
593 continue; // if process handling comm is killed
594 if (simcall->call == SIMCALL_COMM_WAITANY) {
595 SIMIX_waitany_remove_simcall_from_actions(simcall);
/* Under MC / record-replay the result was already stored by simcall_HANDLER_comm_waitany. */
596 if (!MC_is_active() && !MC_record_replay_is_active())
597 simcall_comm_waitany__set__result(simcall, xbt_dynar_search(simcall_comm_waitany__get__comms(simcall), &synchro));
600 /* If the synchro is still in a rendez-vous point then remove from it */
602 SIMIX_mbox_remove(comm->mbox, synchro);
604 XBT_DEBUG("SIMIX_comm_finish: synchro state = %d", (int)synchro->state);
606 /* Check out for errors */
608 if (simcall->issuer->host->isOff()) {
609 simcall->issuer->context->iwannadie = 1;
610 SMX_EXCEPTION(simcall->issuer, host_error, 0, "Host failed");
612 switch (synchro->state) {
615 XBT_DEBUG("Communication %p complete!", synchro);
616 SIMIX_comm_copy_data(synchro);
619 case SIMIX_SRC_TIMEOUT:
620 SMX_EXCEPTION(simcall->issuer, timeout_error, 0, "Communication timeouted because of sender");
623 case SIMIX_DST_TIMEOUT:
624 SMX_EXCEPTION(simcall->issuer, timeout_error, 0, "Communication timeouted because of receiver");
/* A source host failure kills the issuer when it is the sender; the visible alternative posts a
 * network_error ("Remote peer failed"). */
627 case SIMIX_SRC_HOST_FAILURE:
628 if (simcall->issuer == comm->src_proc)
629 simcall->issuer->context->iwannadie = 1;
630 // SMX_EXCEPTION(simcall->issuer, host_error, 0, "Host failed");
632 SMX_EXCEPTION(simcall->issuer, network_error, 0, "Remote peer failed");
/* Mirror of the previous case, seen from the receiver's side. */
635 case SIMIX_DST_HOST_FAILURE:
636 if (simcall->issuer == comm->dst_proc)
637 simcall->issuer->context->iwannadie = 1;
638 // SMX_EXCEPTION(simcall->issuer, host_error, 0, "Host failed");
640 SMX_EXCEPTION(simcall->issuer, network_error, 0, "Remote peer failed");
643 case SIMIX_LINK_FAILURE:
645 XBT_DEBUG("Link failure in synchro %p between '%s' and '%s': posting an exception to the issuer: %s (%p) detached:%d",
647 comm->src_proc ? sg_host_get_name(comm->src_proc->host) : nullptr,
648 comm->dst_proc ? sg_host_get_name(comm->dst_proc->host) : nullptr,
649 simcall->issuer->name.c_str(), simcall->issuer, comm->detached);
650 if (comm->src_proc == simcall->issuer) {
651 XBT_DEBUG("I'm source");
652 } else if (comm->dst_proc == simcall->issuer) {
653 XBT_DEBUG("I'm dest");
655 XBT_DEBUG("I'm neither source nor dest");
657 SMX_EXCEPTION(simcall->issuer, network_error, 0, "Link failure");
/* Cancellation: the message blames the peer of whoever issued the simcall. */
661 if (simcall->issuer == comm->dst_proc)
662 SMX_EXCEPTION(simcall->issuer, cancel_error, 0, "Communication canceled by the sender");
664 SMX_EXCEPTION(simcall->issuer, cancel_error, 0, "Communication canceled by the receiver");
668 xbt_die("Unexpected synchro state in SIMIX_comm_finish: %d", (int)synchro->state);
672 /* if there is an exception during a waitany or a testany, indicate the position of the failed communication */
673 if (simcall->issuer->exception) {
674 // In order to modify the exception we have to rethrow it:
676 std::rethrow_exception(simcall->issuer->exception);
679 if (simcall->call == SIMCALL_COMM_WAITANY) {
680 e.value = xbt_dynar_search(simcall_comm_waitany__get__comms(simcall), &synchro);
682 else if (simcall->call == SIMCALL_COMM_TESTANY) {
683 e.value = xbt_dynar_search(simcall_comm_testany__get__comms(simcall), &synchro);
/* Store the modified copy back as the issuer's pending exception. */
685 simcall->issuer->exception = std::make_exception_ptr(e);
692 if (simcall->issuer->host->isOff()) {
693 simcall->issuer->context->iwannadie = 1;
696 simcall->issuer->waiting_synchro = nullptr;
697 xbt_fifo_remove(simcall->issuer->comms, synchro);
/* The synchro is also dropped from the peer's list of communications. */
699 if(simcall->issuer == comm->src_proc){
701 xbt_fifo_remove(comm->dst_proc->comms, synchro);
703 if(simcall->issuer == comm->dst_proc){
705 xbt_fifo_remove(comm->src_proc->comms, synchro);
706 //in case of a detached comm we have an extra ref to remove, as the sender won't do it
710 SIMIX_simcall_answer(simcall);
/* The unref() calls are issued only after every simcall has been handled. */
714 while (destroy_count-- > 0)
715 static_cast<simgrid::simix::Comm*>(synchro)->unref();
718 /******************************************************************************/
719 /* SIMIX_comm_copy_data callbacks */
720 /******************************************************************************/
/* File-wide default copy callback, used by SIMIX_comm_copy_data when a comm has no copy_data_fun of its
 * own. It initially points to SIMIX_comm_copy_pointer_callback and can be replaced through
 * SIMIX_comm_set_copy_data_callback. */
721 static void (*SIMIX_comm_copy_data_callback) (smx_synchro_t, void*, size_t) = &SIMIX_comm_copy_pointer_callback;
/* Replaces the file-wide default copy callback with `callback`; the pointer is stored as is, without
 * any null check. */
723 void SIMIX_comm_set_copy_data_callback(void (*callback) (smx_synchro_t, void*, size_t))
725 SIMIX_comm_copy_data_callback = callback;
/* Copy callback that transfers a pointer rather than a payload: `buff` itself is written into the
 * location designated by comm->dst_buff. buff_size must therefore equal sizeof(void*), which the
 * assertion enforces. */
728 void SIMIX_comm_copy_pointer_callback(smx_synchro_t synchro, void* buff, size_t buff_size)
730 simgrid::simix::Comm *comm = static_cast<simgrid::simix::Comm*>(synchro);
732 xbt_assert((buff_size == sizeof(void *)), "Cannot copy %zu bytes: must be sizeof(void*)", buff_size);
/* dst_buff is treated as the address of a void* slot that receives the sender's pointer. */
733 *(void **) (comm->dst_buff) = buff;
/*
 * Copy callback that transfers the payload: buff_size bytes are memcpy'ed from `buff` into
 * comm->dst_buff. For a detached comm, comm->src_buff is reset to nullptr afterwards.
 * NOTE(review): one line between the detached test and the reset is not visible in this listing. The
 * comment on the test says the source buffer is a duplicate made by the SMPI sender, so that duplicate
 * is presumably released there -- confirm.
 */
736 void SIMIX_comm_copy_buffer_callback(smx_synchro_t synchro, void* buff, size_t buff_size)
738 simgrid::simix::Comm *comm = static_cast<simgrid::simix::Comm*>(synchro);
740 XBT_DEBUG("Copy the data over");
741 memcpy(comm->dst_buff, buff, buff_size);
742 if (comm->detached) { // if this is a detached send, the source buffer was duplicated by SMPI sender to make the original buffer available to the application ASAP
744 comm->src_buff = nullptr;
750 * \brief Copy the communication data from the sender's buffer to the receiver's one
751 * \param synchro The communication
753 void SIMIX_comm_copy_data(smx_synchro_t synchro)
755 simgrid::simix::Comm *comm = static_cast<simgrid::simix::Comm*>(synchro);
757 size_t buff_size = comm->src_buff_size;
758 /* If there is no data to copy then return */
759 if (!comm->src_buff || !comm->dst_buff || comm->copied)
762 XBT_DEBUG("Copying comm %p data from %s (%p) -> %s (%p) (%zu bytes)",
764 comm->src_proc ? sg_host_get_name(comm->src_proc->host) : "a finished process",
766 comm->dst_proc ? sg_host_get_name(comm->dst_proc->host) : "a finished process",
767 comm->dst_buff, buff_size);
769 /* Copy at most dst_buff_size bytes of the message to receiver's buffer */
770 if (comm->dst_buff_size)
771 buff_size = MIN(buff_size, *(comm->dst_buff_size));
773 /* Update the receiver's buffer size to the copied amount */
774 if (comm->dst_buff_size)
775 *comm->dst_buff_size = buff_size;
778 if(comm->copy_data_fun)
779 comm->copy_data_fun (comm, comm->src_buff, buff_size);
781 SIMIX_comm_copy_data_callback (comm, comm->src_buff, buff_size);
785 /* Set the copied flag so we copy data only once */
786 /* (this function might be called from both communication ends) */