1 /* Copyright (c) 2009-2017. The SimGrid Team. All rights reserved. */
3 /* This program is free software; you can redistribute it and/or modify it
4 * under the terms of the license (GNU LGPL) which comes with this package. */
8 #include <boost/range/algorithm.hpp>
12 #include <simgrid/s4u/host.hpp>
15 #include "simgrid/s4u/Mailbox.hpp"
16 #include "src/mc/mc_replay.h"
17 #include "src/simix/smx_private.h"
18 #include "src/surf/cpu_interface.hpp"
19 #include "src/surf/surf_interface.hpp"
23 #include "src/kernel/activity/SynchroComm.hpp"
24 #include "src/surf/network_interface.hpp"
// Log category (simix_network, declared under simix) used by the XBT_DEBUG traces of this file.
26 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(simix_network, simix, "SIMIX network-related synchronization");
// Forward declarations of helpers that are used in this file before their definition.
28 static void SIMIX_waitany_remove_simcall_from_actions(smx_simcall_t simcall);
29 static void SIMIX_comm_copy_data(smx_activity_t comm);
30 static void SIMIX_comm_start(smx_activity_t synchro);
31 static simgrid::kernel::activity::Comm*
32 _find_matching_comm(boost::circular_buffer_space_optimized<smx_activity_t>* deque, e_smx_comm_type_t type,
33 int (*match_fun)(void*, void*, smx_activity_t), void* user_data, smx_activity_t my_synchro,
34 bool remove_matching);
37 * \brief Checks if there is a communication activity queued in a deque matching our needs
38 * \param type The type of communication we are looking for (comm_send, comm_recv)
39 * \return The communication activity if found, nullptr otherwise
// Looks through `deque`, in order, for a queued comm that has the requested `type` and that is accepted by
// both filters: the caller's `match_fun` and the queued comm's own `comm->match_fun`.
// `this_user_data` is the caller's user data; `my_synchro` is the caller's comm, handed to the other side's filter.
41 static simgrid::kernel::activity::Comm*
42 _find_matching_comm(boost::circular_buffer_space_optimized<smx_activity_t>* deque, e_smx_comm_type_t type,
43 int (*match_fun)(void*, void*, smx_activity_t), void* this_user_data, smx_activity_t my_synchro,
46 void* other_user_data = nullptr;
48 for(auto it = deque->begin(); it != deque->end(); it++){
49 smx_activity_t synchro = *it;
50 simgrid::kernel::activity::Comm *comm = static_cast<simgrid::kernel::activity::Comm*>(synchro);
// The user data shown to the filters is the one of the side that posted the queued comm:
// src_data for a queued send, dst_data for a queued receive.
52 if (comm->type == SIMIX_COMM_SEND) {
53 other_user_data = comm->src_data;
54 } else if (comm->type == SIMIX_COMM_RECEIVE) {
55 other_user_data = comm->dst_data;
// A null filter accepts everything. Each filter receives its own side's user data first, the other
// side's user data second and the other side's synchro last.
57 if (comm->type == type &&
58 (! match_fun || match_fun(this_user_data, other_user_data, synchro)) &&
59 (!comm->match_fun || comm->match_fun(other_user_data, this_user_data, my_synchro))) {
60 XBT_DEBUG("Found a matching communication synchro %p", comm);
// Keep a copy of the mailbox pointer in mbox_cpy.
// NOTE(review): presumably because comm->mbox is reset once the comm leaves the mailbox -- confirm.
65 comm->mbox_cpy = comm->mbox;
70 XBT_DEBUG("Sorry, communication synchro %p does not match our needs:"
71 " its type is %d but we are looking for a comm of type %d (or maybe the filtering didn't match)",
72 comm, (int)comm->type, (int)type);
74 XBT_DEBUG("No matching communication synchro found");
78 /******************************************************************************/
79 /* Communication synchros */
80 /******************************************************************************/
// Blocking send handler: posts the send through simcall_HANDLER_comm_isend (clean_fun is passed as nullptr)
// and then hands the resulting comm to simcall_HANDLER_comm_wait together with `timeout`.
81 XBT_PRIVATE void simcall_HANDLER_comm_send(smx_simcall_t simcall, smx_actor_t src, smx_mailbox_t mbox,
82 double task_size, double rate,
83 void *src_buff, size_t src_buff_size,
84 int (*match_fun)(void *, void *,smx_activity_t),
85 void (*copy_data_fun)(smx_activity_t, void*, size_t),
86 void *data, double timeout){
87 smx_activity_t comm = simcall_HANDLER_comm_isend(simcall, src, mbox, task_size, rate,
88 src_buff, src_buff_size, match_fun, nullptr, copy_data_fun,
// simcall_HANDLER_comm_wait reads this value back through SIMCALL_GET_MC_VALUE when the model checker
// or a record/replay session is active.
90 SIMCALL_SET_MC_VALUE(simcall, 0);
91 simcall_HANDLER_comm_wait(simcall, comm, timeout);
// Non-blocking send handler. Builds a SIMIX_COMM_SEND descriptor, tries to pair it with a receive queued in
// `mbox`, fills in the sender side of the resulting comm and starts it (under the model checker or a
// record/replay session the comm is only flagged SIMIX_RUNNING).
// Returns nullptr for a detached send and the comm otherwise (see the two return statements).
93 XBT_PRIVATE smx_activity_t simcall_HANDLER_comm_isend(smx_simcall_t simcall, smx_actor_t src_proc, smx_mailbox_t mbox,
94 double task_size, double rate,
95 void *src_buff, size_t src_buff_size,
96 int (*match_fun)(void *, void *,smx_activity_t),
97 void (*clean_fun)(void *), // used to free the synchro in case of problem after a detached send
98 void (*copy_data_fun)(smx_activity_t, void*, size_t),// used to copy data if not default one
99 void *data, int detached)
101 XBT_DEBUG("send from %p", mbox);
103 /* Prepare a synchro describing us, so that it gets passed to the user-provided filter of other side */
104 simgrid::kernel::activity::Comm* this_comm = new simgrid::kernel::activity::Comm(SIMIX_COMM_SEND);
106 /* Look for communication synchro matching our needs. We also provide a description of
107 * ourself so that the other side also gets a chance of choosing if it wants to match with us.
109 * If it is not found then push our communication into the rendez-vous point */
110 simgrid::kernel::activity::Comm* other_comm =
111 _find_matching_comm(&mbox->comm_queue, SIMIX_COMM_RECEIVE, match_fun, data, this_comm, /*remove_matching*/ true);
// Fallback described in the comment above: no receive matched, so we keep working on our own descriptor.
114 other_comm = this_comm;
116 if (mbox->permanent_receiver!=nullptr){
117 //this mailbox is for small messages, which have to be sent right now
118 other_comm->state = SIMIX_READY;
119 other_comm->dst_proc=mbox->permanent_receiver.get();
121 mbox->done_comm_queue.push_back(other_comm);
// NOTE(review): `&(other_comm)` is the address of the local pointer variable, not of the comm itself;
// the trace probably meant to print `other_comm`.
122 XBT_DEBUG("pushing a message into the permanent receive list %p, comm %p", mbox, &(other_comm));
125 mbox->push(this_comm);
128 XBT_DEBUG("Receive already pushed");
131 other_comm->state = SIMIX_READY;
132 other_comm->type = SIMIX_COMM_READY;
// Record the comm in the sender's own list of comms.
135 src_proc->comms.push_back(other_comm);
// clean_fun is only stored together with the detached flag; the other assignment resets it to nullptr
// (presumably the two branches of a test on `detached`).
138 other_comm->detached = true;
139 other_comm->clean_fun = clean_fun;
141 other_comm->clean_fun = nullptr;
144 /* Setup the communication synchro */
145 other_comm->src_proc = src_proc;
146 other_comm->task_size = task_size;
147 other_comm->rate = rate;
148 other_comm->src_buff = src_buff;
149 other_comm->src_buff_size = src_buff_size;
150 other_comm->src_data = data;
// When other_comm is a matched receive, these two assignments replace the receiver's filter and copy
// function with the sender's ones.
152 other_comm->match_fun = match_fun;
153 other_comm->copy_data_fun = copy_data_fun;
// Under the model checker or a record/replay session, SIMIX_comm_start is not called: the comm is
// directly marked as running.
156 if (MC_is_active() || MC_record_replay_is_active()) {
157 other_comm->state = SIMIX_RUNNING;
158 return (detached ? nullptr : other_comm);
161 SIMIX_comm_start(other_comm);
162 return (detached ? nullptr : other_comm);
// Blocking receive handler: posts the receive through SIMIX_comm_irecv and then hands the resulting comm
// to simcall_HANDLER_comm_wait together with `timeout`.
165 XBT_PRIVATE void simcall_HANDLER_comm_recv(smx_simcall_t simcall, smx_actor_t receiver, smx_mailbox_t mbox,
166 void *dst_buff, size_t *dst_buff_size,
167 int (*match_fun)(void *, void *, smx_activity_t),
168 void (*copy_data_fun)(smx_activity_t, void*, size_t),
169 void *data, double timeout, double rate)
171 smx_activity_t comm = SIMIX_comm_irecv(receiver, mbox, dst_buff, dst_buff_size, match_fun, copy_data_fun, data, rate);
// simcall_HANDLER_comm_wait reads this value back through SIMCALL_GET_MC_VALUE when the model checker
// or a record/replay session is active.
172 SIMCALL_SET_MC_VALUE(simcall, 0);
173 simcall_HANDLER_comm_wait(simcall, comm, timeout);
// Non-blocking receive handler: forwards every argument except `simcall` to SIMIX_comm_irecv and returns
// its result unchanged.
176 XBT_PRIVATE smx_activity_t simcall_HANDLER_comm_irecv(smx_simcall_t simcall, smx_actor_t receiver, smx_mailbox_t mbox,
177 void *dst_buff, size_t *dst_buff_size,
178 int (*match_fun)(void *, void *, smx_activity_t),
179 void (*copy_data_fun)(smx_activity_t, void*, size_t),
180 void *data, double rate)
182 return SIMIX_comm_irecv(receiver, mbox, dst_buff, dst_buff_size, match_fun, copy_data_fun, data, rate);
// Posts a receive on `mbox` for `dst_proc`. Builds a SIMIX_COMM_RECEIVE descriptor and tries to pair it with
// a send: first in the queue of already-done comms when the mailbox has a permanent receiver and that queue
// is not empty, otherwise in the regular comm queue. The receiver side of the selected comm is then filled in
// and the comm is started (or only flagged SIMIX_RUNNING under the model checker / record-replay).
// NOTE(review): callers use the result as the comm to wait on; the return statements are not shown here.
185 smx_activity_t SIMIX_comm_irecv(smx_actor_t dst_proc, smx_mailbox_t mbox, void *dst_buff, size_t *dst_buff_size,
186 int (*match_fun)(void *, void *, smx_activity_t),
187 void (*copy_data_fun)(smx_activity_t, void*, size_t), // used to copy data if not default one
188 void *data, double rate)
190 XBT_DEBUG("recv from %p %p", mbox, &mbox->comm_queue);
191 simgrid::kernel::activity::Comm* this_synchro = new simgrid::kernel::activity::Comm(SIMIX_COMM_RECEIVE);
193 simgrid::kernel::activity::Comm* other_comm;
194 //communication already done, get it inside the list of completed comms
195 if (mbox->permanent_receiver != nullptr && ! mbox->done_comm_queue.empty()) {
197 XBT_DEBUG("We have a comm that has probably already been received, trying to match it, to skip the communication");
198 //find a match in the list of already received comms
199 other_comm = _find_matching_comm(&mbox->done_comm_queue, SIMIX_COMM_SEND, match_fun, data, this_synchro,
200 /*remove_matching*/ true);
201 //if not found, assume the receiver came first, register it to the mailbox in the classical way
203 XBT_DEBUG("We have messages in the permanent receive list, but not the one we are looking for, pushing request into list");
204 other_comm = this_synchro;
205 mbox->push(this_synchro);
// A matched send that already has a surf action with (almost) nothing left to transfer is flagged as
// done and detached from its mailbox.
207 if(other_comm->surf_comm && other_comm->remains() < 1e-12) {
208 XBT_DEBUG("comm %p has been already sent, and is finished, destroy it",other_comm);
209 other_comm->state = SIMIX_DONE;
210 other_comm->type = SIMIX_COMM_DONE;
211 other_comm->mbox = nullptr;
// Drop our reference on the temporary descriptor (this_synchro is already a Comm*, so the cast is a no-op).
214 static_cast<simgrid::kernel::activity::Comm*>(this_synchro)->unref();
217 /* Prepare a comm describing us, so that it gets passed to the user-provided filter of other side */
219 /* Look for communication activity matching our needs. We also provide a description of
220 * ourself so that the other side also gets a chance of choosing if it wants to match with us.
222 * If it is not found then push our communication into the rendez-vous point */
223 other_comm = _find_matching_comm(&mbox->comm_queue, SIMIX_COMM_SEND, match_fun, data, this_synchro,
224 /*remove_matching*/ true);
227 XBT_DEBUG("Receive pushed first %zu", mbox->comm_queue.size());
228 other_comm = this_synchro;
229 mbox->push(this_synchro);
231 this_synchro->unref();
233 other_comm->state = SIMIX_READY;
234 other_comm->type = SIMIX_COMM_READY;
// Record the comm in the receiver's own list of comms.
236 dst_proc->comms.push_back(other_comm);
239 /* Setup communication synchro */
240 other_comm->dst_proc = dst_proc;
241 other_comm->dst_buff = dst_buff;
242 other_comm->dst_buff_size = dst_buff_size;
243 other_comm->dst_data = data;
// Apply the receiver's rate only when it is set (> -1.0) and the comm either has a negative rate or a
// larger one, i.e. the lower of the two limits wins.
245 if (rate > -1.0 && (other_comm->rate < 0.0 || rate < other_comm->rate))
246 other_comm->rate = rate;
// When other_comm is a matched send, these two assignments replace the sender's filter and copy
// function with the receiver's ones.
248 other_comm->match_fun = match_fun;
249 other_comm->copy_data_fun = copy_data_fun;
251 if (MC_is_active() || MC_record_replay_is_active()) {
252 other_comm->state = SIMIX_RUNNING;
256 SIMIX_comm_start(other_comm);
// Probe handler: delegates to SIMIX_comm_iprobe, using the issuer of the simcall as probing process.
260 smx_activity_t simcall_HANDLER_comm_iprobe(smx_simcall_t simcall, smx_mailbox_t mbox,
261 int type, int src, int tag,
262 int (*match_fun)(void *, void *, smx_activity_t),
264 return SIMIX_comm_iprobe(simcall->issuer, mbox, type, src, tag, match_fun, data);
// Checks whether `mbox` holds a comm that would match, without taking it out of the queues: both lookups
// below pass remove_matching=false. Returns the comm found, or nullptr (the initial value of other_synchro).
// In the code shown here, dst_proc, src and tag are not used.
267 smx_activity_t SIMIX_comm_iprobe(smx_actor_t dst_proc, smx_mailbox_t mbox, int type, int src,
268 int tag, int (*match_fun)(void *, void *, smx_activity_t), void *data)
270 XBT_DEBUG("iprobe from %p %p", mbox, &mbox->comm_queue);
271 simgrid::kernel::activity::Comm* this_comm;
// Build a temporary descriptor for our side and remember in smx_type the opposite kind of comm to look for
// (presumably selected from `type` -- confirm).
274 this_comm = new simgrid::kernel::activity::Comm(SIMIX_COMM_SEND);
275 smx_type = SIMIX_COMM_RECEIVE;
277 this_comm = new simgrid::kernel::activity::Comm(SIMIX_COMM_RECEIVE);
278 smx_type = SIMIX_COMM_SEND;
280 smx_activity_t other_synchro=nullptr;
281 if (mbox->permanent_receiver != nullptr && !mbox->done_comm_queue.empty()) {
282 XBT_DEBUG("first check in the permanent recv mailbox, to see if we already got something");
283 other_synchro = _find_matching_comm(&mbox->done_comm_queue,
284 (e_smx_comm_type_t) smx_type, match_fun, data, this_comm,/*remove_matching*/false);
287 XBT_DEBUG("check if we have more luck in the normal mailbox");
288 other_synchro = _find_matching_comm(&mbox->comm_queue,
289 (e_smx_comm_type_t) smx_type, match_fun, data, this_comm,/*remove_matching*/false);
// NOTE(review): the match is unref'ed right before being returned -- presumably to undo a reference taken
// by the lookup; confirm the ownership contract with the callers.
293 other_synchro->unref();
296 return other_synchro;
// Wait handler: attaches `simcall` to `synchro` and records the synchro as the one its issuer waits for.
// If the comm is neither waiting nor running it is finished right away; otherwise a surf sleep action of
// `timeout` is created on the issuer's host and stored on the issuer's side of the comm.
// Under the model checker / record-replay the outcome is driven by the simcall's MC value instead.
299 void simcall_HANDLER_comm_wait(smx_simcall_t simcall, smx_activity_t synchro, double timeout)
301 /* Associate this simcall to the wait synchro */
302 XBT_DEBUG("simcall_HANDLER_comm_wait, %p", synchro);
304 synchro->simcalls.push_back(simcall);
305 simcall->issuer->waiting_synchro = synchro;
307 if (MC_is_active() || MC_record_replay_is_active()) {
// idx is the value stored with SIMCALL_SET_MC_VALUE (the blocking send/recv handlers store 0).
308 int idx = SIMCALL_GET_MC_VALUE(simcall);
310 synchro->state = SIMIX_DONE;
312 /* If we reached this point, the wait simcall must have a timeout */
313 /* Otherwise it shouldn't be enabled and executed by the MC */
// The timeout is attributed to the side of the comm the issuer is on.
317 simgrid::kernel::activity::Comm *comm = static_cast<simgrid::kernel::activity::Comm*>(synchro);
318 if (comm->src_proc == simcall->issuer)
319 comm->state = SIMIX_SRC_TIMEOUT;
321 comm->state = SIMIX_DST_TIMEOUT;
324 SIMIX_comm_finish(synchro);
328 /* If the synchro has already finished, perform the error handling, */
329 /* otherwise set up a waiting timeout on the right side */
330 if (synchro->state != SIMIX_WAITING && synchro->state != SIMIX_RUNNING) {
331 SIMIX_comm_finish(synchro);
332 } else { /* if (timeout >= 0) { we need a surf sleep action even when there is no timeout, otherwise surf won't tell us when the host fails */
333 surf_action_t sleep = simcall->issuer->host->pimpl_cpu->sleep(timeout);
// Attach the synchro to the sleep action as its data.
334 sleep->setData(synchro);
336 simgrid::kernel::activity::Comm *comm = static_cast<simgrid::kernel::activity::Comm*>(synchro);
337 if (simcall->issuer == comm->src_proc)
338 comm->src_timeout = sleep;
340 comm->dst_timeout = sleep;
// Test handler: stores in the simcall result whether `synchro` is over. When it is, the simcall is attached
// to the synchro and SIMIX_comm_finish is called; the plain SIMIX_simcall_answer calls are presumably the
// "not over yet" branches.
344 void simcall_HANDLER_comm_test(smx_simcall_t simcall, smx_activity_t synchro)
346 simgrid::kernel::activity::Comm *comm = static_cast<simgrid::kernel::activity::Comm*>(synchro);
348 if (MC_is_active() || MC_record_replay_is_active()){
// Under the model checker / record-replay, the test succeeds as soon as both ends of the comm are known.
349 simcall_comm_test__set__result(simcall, comm->src_proc && comm->dst_proc);
350 if (simcall_comm_test__get__result(simcall)){
351 synchro->state = SIMIX_DONE;
352 synchro->simcalls.push_back(simcall);
353 SIMIX_comm_finish(synchro);
355 SIMIX_simcall_answer(simcall);
// Regular case: the test succeeds when the comm is neither waiting nor running.
360 simcall_comm_test__set__result(simcall, (synchro->state != SIMIX_WAITING && synchro->state != SIMIX_RUNNING));
361 if (simcall_comm_test__get__result(simcall)) {
362 synchro->simcalls.push_back(simcall);
363 SIMIX_comm_finish(synchro);
365 SIMIX_simcall_answer(simcall);
// Test-any handler: stores in the simcall result the index in `comms` of a comm that is over, or leaves the
// default -1 when none is. `count` is the number of entries of `comms`.
369 void simcall_HANDLER_comm_testany(
370 smx_simcall_t simcall, simgrid::kernel::activity::ActivityImpl* comms[], size_t count)
372 // The default result is -1 -- this means, "nothing is ready".
373 // It can be changed below, but only if something matches.
374 simcall_comm_testany__set__result(simcall, -1);
376 if (MC_is_active() || MC_record_replay_is_active()){
// Under the model checker / record-replay, the index of the comm to complete is the simcall's MC value.
// NOTE(review): the plain answer below is presumably the branch taken when that index designates no comm.
377 int idx = SIMCALL_GET_MC_VALUE(simcall);
379 SIMIX_simcall_answer(simcall);
381 simgrid::kernel::activity::ActivityImpl* synchro = comms[idx];
382 simcall_comm_testany__set__result(simcall, idx);
383 synchro->simcalls.push_back(simcall);
384 synchro->state = SIMIX_DONE;
385 SIMIX_comm_finish(synchro);
// Regular case: scan the comms in order for one that is neither waiting nor running.
390 for (std::size_t i = 0; i != count; ++i) {
391 simgrid::kernel::activity::ActivityImpl* synchro = comms[i];
392 if (synchro->state != SIMIX_WAITING && synchro->state != SIMIX_RUNNING) {
393 simcall_comm_testany__set__result(simcall, i);
394 synchro->simcalls.push_back(simcall);
395 SIMIX_comm_finish(synchro);
399 SIMIX_simcall_answer(simcall);
// Wait-any handler: attaches `simcall` to every synchro of the `synchros` dynar and finishes the ones that
// are already over. A timer can be armed so that the simcall is answered with -1 once `timeout` has elapsed.
402 void simcall_HANDLER_comm_waitany(smx_simcall_t simcall, xbt_dynar_t synchros, double timeout)
404 smx_activity_t synchro;
405 unsigned int cursor = 0;
407 if (MC_is_active() || MC_record_replay_is_active()){
// Timeouts are not supported in this mode (presumably guarded by a test on `timeout`); the comm to
// complete is designated by the simcall's MC value.
409 xbt_die("Timeout not implemented for waitany in the model-checker");
410 int idx = SIMCALL_GET_MC_VALUE(simcall);
411 synchro = xbt_dynar_get_as(synchros, idx, smx_activity_t);
412 synchro->simcalls.push_back(simcall);
413 simcall_comm_waitany__set__result(simcall, idx);
414 synchro->state = SIMIX_DONE;
415 SIMIX_comm_finish(synchro);
// NOTE(review): NULL here while the rest of the file uses nullptr.
420 simcall->timer = NULL;
// When the timer fires: detach the simcall from every synchro, report -1 and answer the simcall.
422 simcall->timer = SIMIX_timer_set(SIMIX_get_clock() + timeout, [simcall]() {
423 SIMIX_waitany_remove_simcall_from_actions(simcall);
424 simcall_comm_waitany__set__result(simcall, -1);
425 SIMIX_simcall_answer(simcall);
429 xbt_dynar_foreach(synchros, cursor, synchro){
430 /* associate this simcall to the synchro */
431 synchro->simcalls.push_back(simcall);
433 /* see if the synchro is already finished */
434 if (synchro->state != SIMIX_WAITING && synchro->state != SIMIX_RUNNING){
435 SIMIX_comm_finish(synchro);
// Detaches a waitany `simcall` from the synchros it was attached to: for each synchro of the simcall's
// comms dynar, one occurrence of the simcall is erased from that synchro's simcalls list.
441 void SIMIX_waitany_remove_simcall_from_actions(smx_simcall_t simcall)
443 smx_activity_t synchro;
444 unsigned int cursor = 0;
445 xbt_dynar_t synchros = simcall_comm_waitany__get__comms(simcall);
447 xbt_dynar_foreach(synchros, cursor, synchro) {
448 // Remove the first occurrence of simcall:
449 auto i = boost::range::find(synchro->simcalls, simcall);
450 if (i != synchro->simcalls.end())
451 synchro->simcalls.erase(i);
456 * \brief Starts the simulation of a communication synchro.
457 * \param synchro the communication synchro
// Creates the surf network action of a comm that is in state SIMIX_READY and marks the comm as running;
// a comm in any other state is not started by the code shown here.
459 static inline void SIMIX_comm_start(smx_activity_t synchro)
461 simgrid::kernel::activity::Comm *comm = static_cast<simgrid::kernel::activity::Comm*>(synchro);
463 /* If both the sender and the receiver are already there, start the communication */
464 if (synchro->state == SIMIX_READY) {
466 simgrid::s4u::Host* sender = comm->src_proc->host;
467 simgrid::s4u::Host* receiver = comm->dst_proc->host;
469 XBT_DEBUG("Starting communication %p from '%s' to '%s'", synchro, sender->cname(), receiver->cname());
// The transfer goes from the sender's host to the receiver's host, with the comm's task_size and rate;
// the synchro is attached to the surf action as its data.
471 comm->surf_comm = surf_network_model->communicate(sender, receiver, comm->task_size, comm->rate);
472 comm->surf_comm->setData(synchro);
473 comm->state = SIMIX_RUNNING;
475 /* If a link is failed, detect it immediately */
476 if (comm->surf_comm->getState() == simgrid::surf::Action::State::failed) {
477 XBT_DEBUG("Communication from '%s' to '%s' failed to start because of a link failure", sender->cname(),
479 comm->state = SIMIX_LINK_FAILURE;
483 /* If any of the processes is suspended, create the synchro but stop its execution,
484 it will be restarted when the sender process resumes */
485 if (SIMIX_process_is_suspended(comm->src_proc) || SIMIX_process_is_suspended(comm->dst_proc)) {
486 if (SIMIX_process_is_suspended(comm->src_proc))
487 XBT_DEBUG("The communication is suspended on startup because src (%s@%s) was suspended since it initiated the "
489 comm->src_proc->cname(), comm->src_proc->host->cname());
491 XBT_DEBUG("The communication is suspended on startup because dst (%s@%s) was suspended since it initiated the "
493 comm->dst_proc->cname(), comm->dst_proc->host->cname());
495 comm->surf_comm->suspend();
501 * \brief Answers the SIMIX simcalls associated to a communication synchro.
502 * \param synchro a finished communication synchro
// Answers, one after the other, every simcall attached to `synchro`. For each of them: waitany bookkeeping,
// removal of the comm from its mailbox, translation of the comm state into either a data copy or an
// exception posted to the issuer, clean-up of the comm lists of the processes involved, and finally
// SIMIX_simcall_answer.
504 void SIMIX_comm_finish(smx_activity_t synchro)
506 simgrid::kernel::activity::Comm *comm = static_cast<simgrid::kernel::activity::Comm*>(synchro);
507 unsigned int destroy_count = 0;
509 while (!synchro->simcalls.empty()) {
510 smx_simcall_t simcall = synchro->simcalls.front();
511 synchro->simcalls.pop_front();
513 /* If a waitany simcall is waiting for this synchro to finish, then remove
514 it from the other synchros in the waitany list. Afterwards, get the
515 position of the actual synchro in the waitany dynar and
516 return it as the result of the simcall */
518 if (simcall->call == SIMCALL_NONE) //FIXME: maybe a better way to handle this case
519 continue; // if process handling comm is killed
520 if (simcall->call == SIMCALL_COMM_WAITANY) {
521 SIMIX_waitany_remove_simcall_from_actions(simcall);
// Disarm the waitany timer, if one was set.
522 if (simcall->timer) {
523 SIMIX_timer_remove(simcall->timer);
524 simcall->timer = nullptr;
// The waitany result is only computed here outside of the model checker / record-replay (in that mode,
// simcall_HANDLER_comm_waitany has already stored it).
526 if (!MC_is_active() && !MC_record_replay_is_active())
527 simcall_comm_waitany__set__result(simcall, xbt_dynar_search(simcall_comm_waitany__get__comms(simcall), &synchro));
530 /* If the synchro is still in a rendez-vous point then remove from it */
532 comm->mbox->remove(synchro);
534 XBT_DEBUG("SIMIX_comm_finish: synchro state = %d", (int)synchro->state);
536 /* Check out for errors */
// An issuer whose host is off is flagged to die and gets a host_error.
538 if (simcall->issuer->host->isOff()) {
539 simcall->issuer->context->iwannadie = 1;
540 SMX_EXCEPTION(simcall->issuer, host_error, 0, "Host failed");
542 switch (synchro->state) {
545 XBT_DEBUG("Communication %p complete!", synchro);
546 SIMIX_comm_copy_data(synchro);
549 case SIMIX_SRC_TIMEOUT:
550 SMX_EXCEPTION(simcall->issuer, timeout_error, 0, "Communication timeouted because of sender");
553 case SIMIX_DST_TIMEOUT:
554 SMX_EXCEPTION(simcall->issuer, timeout_error, 0, "Communication timeouted because of receiver");
// Host failures: the issuer located on the failed side is flagged to die; the network_error is
// presumably for the issuer on the other side.
557 case SIMIX_SRC_HOST_FAILURE:
558 if (simcall->issuer == comm->src_proc)
559 simcall->issuer->context->iwannadie = 1;
560 // SMX_EXCEPTION(simcall->issuer, host_error, 0, "Host failed");
562 SMX_EXCEPTION(simcall->issuer, network_error, 0, "Remote peer failed");
565 case SIMIX_DST_HOST_FAILURE:
566 if (simcall->issuer == comm->dst_proc)
567 simcall->issuer->context->iwannadie = 1;
568 // SMX_EXCEPTION(simcall->issuer, host_error, 0, "Host failed");
570 SMX_EXCEPTION(simcall->issuer, network_error, 0, "Remote peer failed");
573 case SIMIX_LINK_FAILURE:
// NOTE(review): when src_proc or dst_proc is null, nullptr is passed for a %s conversion below;
// a placeholder string would be safer.
576 "Link failure in synchro %p between '%s' and '%s': posting an exception to the issuer: %s (%p) detached:%d",
577 synchro, comm->src_proc ? comm->src_proc->host->cname() : nullptr,
578 comm->dst_proc ? comm->dst_proc->host->cname() : nullptr, simcall->issuer->cname(), simcall->issuer,
580 if (comm->src_proc == simcall->issuer) {
581 XBT_DEBUG("I'm source");
582 } else if (comm->dst_proc == simcall->issuer) {
583 XBT_DEBUG("I'm dest");
585 XBT_DEBUG("I'm neither source nor dest");
587 SMX_EXCEPTION(simcall->issuer, network_error, 0, "Link failure");
// Cancellation: the message names the peer of the issuer as the one who canceled.
591 if (simcall->issuer == comm->dst_proc)
592 SMX_EXCEPTION(simcall->issuer, cancel_error, 0, "Communication canceled by the sender");
594 SMX_EXCEPTION(simcall->issuer, cancel_error, 0, "Communication canceled by the receiver");
598 xbt_die("Unexpected synchro state in SIMIX_comm_finish: %d", (int)synchro->state);
602 /* if there is an exception during a waitany or a testany, indicate the position of the failed communication */
603 if (simcall->issuer->exception) {
604 // In order to modify the exception we have to rethrow it:
606 std::rethrow_exception(simcall->issuer->exception);
// e.value receives the position of this synchro in the comms of the waitany / testany simcall, and the
// modified exception is stored back on the issuer.
609 if (simcall->call == SIMCALL_COMM_WAITANY) {
610 e.value = xbt_dynar_search(simcall_comm_waitany__get__comms(simcall), &synchro);
612 else if (simcall->call == SIMCALL_COMM_TESTANY) {
614 auto comms = simcall_comm_testany__get__comms(simcall);
615 auto count = simcall_comm_testany__get__count(simcall);
616 auto element = std::find(comms, comms + count, synchro);
617 if (element == comms + count)
620 e.value = element - comms;
622 simcall->issuer->exception = std::make_exception_ptr(e);
629 if (simcall->issuer->host->isOff()) {
630 simcall->issuer->context->iwannadie = 1;
// The issuer no longer waits on this synchro; remove the comm from the comm lists of the processes involved.
633 simcall->issuer->waiting_synchro = nullptr;
634 simcall->issuer->comms.remove(synchro);
636 if(simcall->issuer == comm->src_proc){
638 comm->dst_proc->comms.remove(synchro);
640 else if(simcall->issuer == comm->dst_proc){
642 comm->src_proc->comms.remove(synchro);
645 comm->dst_proc->comms.remove(synchro);
646 comm->src_proc->comms.remove(synchro);
648 //in case of a detached comm we have an extra ref to remove, as the sender won't do it
652 SIMIX_simcall_answer(simcall);
// Release the references counted in destroy_count (cf. the detached-comm remark above).
656 while (destroy_count-- > 0)
657 static_cast<simgrid::kernel::activity::Comm*>(synchro)->unref();
660 /******************************************************************************/
661 /* SIMIX_comm_copy_data callbacks */
662 /******************************************************************************/
// File-wide copy callback used by SIMIX_comm_copy_data when a comm has no copy_data_fun of its own.
// Initialized to the pointer-copy callback; changed through SIMIX_comm_set_copy_data_callback.
663 static void (*SIMIX_comm_copy_data_callback) (smx_activity_t, void*, size_t) = &SIMIX_comm_copy_pointer_callback;
// Replaces the file-wide copy callback with `callback` (stored as is, without any null check).
665 void SIMIX_comm_set_copy_data_callback(void (*callback) (smx_activity_t, void*, size_t))
667 SIMIX_comm_copy_data_callback = callback;
// Copy callback that transfers the pointer itself rather than the pointed data: `buff` is stored into the
// void* slot designated by the comm's dst_buff. `buff_size` must therefore be sizeof(void*).
670 void SIMIX_comm_copy_pointer_callback(smx_activity_t synchro, void* buff, size_t buff_size)
672 simgrid::kernel::activity::Comm *comm = static_cast<simgrid::kernel::activity::Comm*>(synchro);
674 xbt_assert((buff_size == sizeof(void *)), "Cannot copy %zu bytes: must be sizeof(void*)", buff_size);
675 *(void **) (comm->dst_buff) = buff;
// Copy callback that copies `buff_size` bytes from `buff` into the comm's dst_buff with memcpy.
// NOTE(review): the capacity of dst_buff is not checked here; SIMIX_comm_copy_data clamps the size to
// *dst_buff_size before calling the copy function, but only when dst_buff_size is set.
678 void SIMIX_comm_copy_buffer_callback(smx_activity_t synchro, void* buff, size_t buff_size)
680 simgrid::kernel::activity::Comm *comm = static_cast<simgrid::kernel::activity::Comm*>(synchro);
682 XBT_DEBUG("Copy the data over");
683 memcpy(comm->dst_buff, buff, buff_size);
684 if (comm->detached) { // if this is a detached send, the source buffer was duplicated by SMPI sender to make the original buffer available to the application ASAP
// The comm forgets its source buffer once the data of a detached send has been copied.
686 comm->src_buff = nullptr;
692 * @brief Copy the communication data from the sender's buffer to the receiver's one
693 * @param synchro The communication
// Transfers the payload of a comm from src_buff to dst_buff through a copy function. Nothing is copied when
// either buffer is missing or when the comm is already flagged as copied.
695 void SIMIX_comm_copy_data(smx_activity_t synchro)
697 simgrid::kernel::activity::Comm *comm = static_cast<simgrid::kernel::activity::Comm*>(synchro);
// Start from the size announced by the sender.
699 size_t buff_size = comm->src_buff_size;
700 /* If there is no data to copy then return */
701 if (!comm->src_buff || !comm->dst_buff || comm->copied)
704 XBT_DEBUG("Copying comm %p data from %s (%p) -> %s (%p) (%zu bytes)", comm,
705 comm->src_proc ? comm->src_proc->host->cname() : "a finished process", comm->src_buff,
706 comm->dst_proc ? comm->dst_proc->host->cname() : "a finished process", comm->dst_buff, buff_size);
708 /* Copy at most dst_buff_size bytes of the message to receiver's buffer */
709 if (comm->dst_buff_size)
710 buff_size = MIN(buff_size, *(comm->dst_buff_size));
712 /* Update the receiver's buffer size to the copied amount */
713 if (comm->dst_buff_size)
714 *comm->dst_buff_size = buff_size;
// The comm's own copy function is used when it has one; the file-wide SIMIX_comm_copy_data_callback is
// presumably the fallback branch.
717 if(comm->copy_data_fun)
718 comm->copy_data_fun (comm, comm->src_buff, buff_size);
720 SIMIX_comm_copy_data_callback (comm, comm->src_buff, buff_size);
724 /* Set the copied flag so we copy data only once */
725 /* (this function might be called from both communication ends) */