1 /* Copyright (c) 2009-2017. The SimGrid Team. All rights reserved. */
3 /* This program is free software; you can redistribute it and/or modify it
4 * under the terms of the license (GNU LGPL) which comes with this package. */
8 #include <boost/range/algorithm.hpp>
12 #include "simgrid/s4u/Host.hpp"
15 #include "simgrid/s4u/Mailbox.hpp"
16 #include "src/mc/mc_replay.h"
17 #include "src/simix/smx_private.h"
18 #include "src/surf/cpu_interface.hpp"
19 #include "src/surf/surf_interface.hpp"
21 #include "src/kernel/activity/SynchroComm.hpp"
22 #include "src/surf/network_interface.hpp"
// Declares the log subcategory simix_network (second macro argument: simix) as the default category of this file;
// the XBT_DEBUG calls below log into it.
24 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(simix_network, simix, "SIMIX network-related synchronization");
// Forward declarations of the file-local helpers that are defined further down in this file.
26 static void SIMIX_waitany_remove_simcall_from_actions(smx_simcall_t simcall);
27 static void SIMIX_comm_copy_data(smx_activity_t comm);
28 static void SIMIX_comm_start(smx_activity_t synchro);
// Queue lookup helper shared by the send, receive and probe paths.
29 static simgrid::kernel::activity::Comm*
30 _find_matching_comm(boost::circular_buffer_space_optimized<smx_activity_t>* deque, e_smx_comm_type_t type,
31 int (*match_fun)(void*, void*, smx_activity_t), void* user_data, smx_activity_t my_synchro,
32 bool remove_matching);
// Scans a mailbox queue for a queued communication that can be paired with the caller's request.
// A candidate qualifies when its type equals `type` and when both the caller's match_fun and the candidate's own
// match_fun accept the pairing (a null filter accepts everything).
// NOTE(review): how remove_matching is applied and what the return paths look like is not visible here -- confirm.
35 * \brief Checks if there is a communication activity queued in a deque matching our needs
36 * \param type The type of communication we are looking for (comm_send, comm_recv)
37 * \return The communication activity if found, nullptr otherwise
39 static simgrid::kernel::activity::Comm*
40 _find_matching_comm(boost::circular_buffer_space_optimized<smx_activity_t>* deque, e_smx_comm_type_t type,
41 int (*match_fun)(void*, void*, smx_activity_t), void* this_user_data, smx_activity_t my_synchro,
44 void* other_user_data = nullptr;
// Examine the queued comms in order, starting from the front of the deque.
46 for(auto it = deque->begin(); it != deque->end(); it++){
47 smx_activity_t synchro = *it;
48 simgrid::kernel::activity::Comm *comm = static_cast<simgrid::kernel::activity::Comm*>(synchro);
// Take the user data of the side that posted the queued comm: src_data for a send, dst_data for a receive.
50 if (comm->type == SIMIX_COMM_SEND) {
51 other_user_data = comm->src_data;
52 } else if (comm->type == SIMIX_COMM_RECEIVE) {
53 other_user_data = comm->dst_data;
// Each filter is called with its own side's user data first and the peer's data second; our filter receives the
// queued comm, the queued comm's filter receives my_synchro.
55 if (comm->type == type &&
56 (! match_fun || match_fun(this_user_data, other_user_data, synchro)) &&
57 (!comm->match_fun || comm->match_fun(other_user_data, this_user_data, my_synchro))) {
58 XBT_DEBUG("Found a matching communication synchro %p", comm);
// Keep a copy of the mailbox pointer in mbox_cpy.
63 comm->mbox_cpy = comm->mbox;
// Debug trace for a candidate that was rejected.
68 XBT_DEBUG("Sorry, communication synchro %p does not match our needs:"
69 " its type is %d but we are looking for a comm of type %d (or maybe the filtering didn't match)",
70 comm, (int)comm->type, (int)type);
72 XBT_DEBUG("No matching communication synchro found");
76 /******************************************************************************/
77 /* Communication synchros */
78 /******************************************************************************/
// Handler of the blocking send simcall.
// Posts the send through simcall_HANDLER_comm_isend (passing nullptr where that handler expects clean_fun), sets the
// MC value of the simcall to 0, then waits on the resulting comm with the given timeout.
79 XBT_PRIVATE void simcall_HANDLER_comm_send(smx_simcall_t simcall, smx_actor_t src, smx_mailbox_t mbox,
80 double task_size, double rate,
81 void *src_buff, size_t src_buff_size,
82 int (*match_fun)(void *, void *,smx_activity_t),
83 void (*copy_data_fun)(smx_activity_t, void*, size_t),
84 void *data, double timeout){
85 smx_activity_t comm = simcall_HANDLER_comm_isend(simcall, src, mbox, task_size, rate,
86 src_buff, src_buff_size, match_fun, nullptr, copy_data_fun,
88 SIMCALL_SET_MC_VALUE(simcall, 0);
89 simcall_HANDLER_comm_wait(simcall, comm, timeout);
// Handler of the asynchronous send simcall.
// Builds a Comm of type SIMIX_COMM_SEND describing the sender, searches mbox->comm_queue for a queued receive that
// matches, fills in the sender-side fields of the selected comm and hands it to SIMIX_comm_start. Under model
// checking or record/replay the comm is only marked SIMIX_RUNNING.
// Returns nullptr when `detached` is set, the comm otherwise.
91 XBT_PRIVATE smx_activity_t simcall_HANDLER_comm_isend(smx_simcall_t simcall, smx_actor_t src_proc, smx_mailbox_t mbox,
92 double task_size, double rate,
93 void *src_buff, size_t src_buff_size,
94 int (*match_fun)(void *, void *,smx_activity_t),
95 void (*clean_fun)(void *), // used to free the synchro in case of problem after a detached send
96 void (*copy_data_fun)(smx_activity_t, void*, size_t),// used to copy data if not default one
97 void *data, int detached)
99 XBT_DEBUG("send from %p", mbox);
101 /* Prepare a synchro describing us, so that it gets passed to the user-provided filter of other side */
102 simgrid::kernel::activity::Comm* this_comm = new simgrid::kernel::activity::Comm(SIMIX_COMM_SEND);
104 /* Look for communication synchro matching our needs. We also provide a description of
105 * ourself so that the other side also gets a chance of choosing if it wants to match with us.
107 * If it is not found then push our communication into the rendez-vous point */
108 simgrid::kernel::activity::Comm* other_comm =
109 _find_matching_comm(&mbox->comm_queue, SIMIX_COMM_RECEIVE, match_fun, data, this_comm, /*remove_matching*/ true);
// NOTE(review): presumably the fallback taken when no matching receive was queued -- the guard is not visible here.
112 other_comm = this_comm;
114 if (mbox->permanent_receiver!=nullptr){
115 //this mailbox is for small messages, which have to be sent right now
116 other_comm->state = SIMIX_READY;
117 other_comm->dst_proc=mbox->permanent_receiver.get();
119 mbox->done_comm_queue.push_back(other_comm);
120 XBT_DEBUG("pushing a message into the permanent receive list %p, comm %p", mbox, &(other_comm));
// Queue our own comm in the mailbox.
123 mbox->push(this_comm);
126 XBT_DEBUG("Receive already pushed");
// A receive was already posted: flag the comm as ready, both in its state and in its type.
129 other_comm->state = SIMIX_READY;
130 other_comm->type = SIMIX_COMM_READY;
// Register the comm in the sender's comms list.
133 src_proc->comms.push_back(other_comm);
// One path marks the comm detached and stores clean_fun on it, the other clears clean_fun
// (NOTE(review): presumably selected by `detached` -- the if/else is not visible here).
136 other_comm->detached = true;
137 other_comm->clean_fun = clean_fun;
139 other_comm->clean_fun = nullptr;
142 /* Setup the communication synchro */
143 other_comm->src_proc = src_proc;
144 other_comm->task_size = task_size;
145 other_comm->rate = rate;
146 other_comm->src_buff = src_buff;
147 other_comm->src_buff_size = src_buff_size;
148 other_comm->src_data = data;
150 other_comm->match_fun = match_fun;
151 other_comm->copy_data_fun = copy_data_fun;
// Under model checking or record/replay the comm is not passed to SIMIX_comm_start: it is marked running and
// returned directly.
154 if (MC_is_active() || MC_record_replay_is_active()) {
155 other_comm->state = SIMIX_RUNNING;
156 return (detached ? nullptr : other_comm);
159 SIMIX_comm_start(other_comm);
160 return (detached ? nullptr : other_comm);
// Handler of the blocking receive simcall.
// Posts the receive through SIMIX_comm_irecv, sets the MC value of the simcall to 0, then waits on the resulting
// comm with the given timeout.
163 XBT_PRIVATE void simcall_HANDLER_comm_recv(smx_simcall_t simcall, smx_actor_t receiver, smx_mailbox_t mbox,
164 void *dst_buff, size_t *dst_buff_size,
165 int (*match_fun)(void *, void *, smx_activity_t),
166 void (*copy_data_fun)(smx_activity_t, void*, size_t),
167 void *data, double timeout, double rate)
169 smx_activity_t comm = SIMIX_comm_irecv(receiver, mbox, dst_buff, dst_buff_size, match_fun, copy_data_fun, data, rate);
170 SIMCALL_SET_MC_VALUE(simcall, 0);
171 simcall_HANDLER_comm_wait(simcall, comm, timeout);
// Handler of the asynchronous receive simcall: forwards every argument except the simcall itself to
// SIMIX_comm_irecv and returns its result.
174 XBT_PRIVATE smx_activity_t simcall_HANDLER_comm_irecv(smx_simcall_t simcall, smx_actor_t receiver, smx_mailbox_t mbox,
175 void *dst_buff, size_t *dst_buff_size,
176 int (*match_fun)(void *, void *, smx_activity_t),
177 void (*copy_data_fun)(smx_activity_t, void*, size_t),
178 void *data, double rate)
180 return SIMIX_comm_irecv(receiver, mbox, dst_buff, dst_buff_size, match_fun, copy_data_fun, data, rate);
// Posts an asynchronous receive for dst_proc on mbox.
// When the mailbox has a permanent receiver and done_comm_queue is not empty, a matching send is first searched in
// done_comm_queue; the other lookup targets comm_queue. The receiver-side fields of the selected comm are then
// filled in and the comm is handed to SIMIX_comm_start (or only marked SIMIX_RUNNING under MC/replay).
// NOTE(review): the return statement is not visible here -- confirm which comm is returned.
183 smx_activity_t SIMIX_comm_irecv(smx_actor_t dst_proc, smx_mailbox_t mbox, void *dst_buff, size_t *dst_buff_size,
184 int (*match_fun)(void *, void *, smx_activity_t),
185 void (*copy_data_fun)(smx_activity_t, void*, size_t), // used to copy data if not default one
186 void *data, double rate)
188 XBT_DEBUG("recv from %p %p", mbox, &mbox->comm_queue);
// Comm describing the receiver; it is what the peer's filter gets to inspect during the lookups below.
189 simgrid::kernel::activity::Comm* this_synchro = new simgrid::kernel::activity::Comm(SIMIX_COMM_RECEIVE);
191 simgrid::kernel::activity::Comm* other_comm;
192 //communication already done, get it inside the list of completed comms
193 if (mbox->permanent_receiver != nullptr && ! mbox->done_comm_queue.empty()) {
195 XBT_DEBUG("We have a comm that has probably already been received, trying to match it, to skip the communication");
196 //find a match in the list of already received comms
197 other_comm = _find_matching_comm(&mbox->done_comm_queue, SIMIX_COMM_SEND, match_fun, data, this_synchro,
198 /*remove_matching*/ true);
199 //if not found, assume the receiver came first, register it to the mailbox in the classical way
201 XBT_DEBUG("We have messages in the permanent receive list, but not the one we are looking for, pushing request into list");
202 other_comm = this_synchro;
203 mbox->push(this_synchro);
// A comm that already has a surf action with (almost) nothing left to transfer is marked done and detached from
// its mailbox.
205 if(other_comm->surf_comm && other_comm->remains() < 1e-12) {
206 XBT_DEBUG("comm %p has been already sent, and is finished, destroy it",other_comm);
207 other_comm->state = SIMIX_DONE;
208 other_comm->type = SIMIX_COMM_DONE;
209 other_comm->mbox = nullptr;
// Release our reference on the receiver-side descriptor.
212 static_cast<simgrid::kernel::activity::Comm*>(this_synchro)->unref();
215 /* Prepare a comm describing us, so that it gets passed to the user-provided filter of other side */
217 /* Look for communication activity matching our needs. We also provide a description of
218 * ourself so that the other side also gets a chance of choosing if it wants to match with us.
220 * If it is not found then push our communication into the rendez-vous point */
221 other_comm = _find_matching_comm(&mbox->comm_queue, SIMIX_COMM_SEND, match_fun, data, this_synchro,
222 /*remove_matching*/ true);
225 XBT_DEBUG("Receive pushed first %zu", mbox->comm_queue.size());
226 other_comm = this_synchro;
227 mbox->push(this_synchro);
229 this_synchro->unref();
// Flag the comm as ready, both in its state and in its type.
231 other_comm->state = SIMIX_READY;
232 other_comm->type = SIMIX_COMM_READY;
// Register the comm in the receiver's comms list.
234 dst_proc->comms.push_back(other_comm);
237 /* Setup communication synchro */
238 other_comm->dst_proc = dst_proc;
239 other_comm->dst_buff = dst_buff;
240 other_comm->dst_buff_size = dst_buff_size;
241 other_comm->dst_data = data;
// The receiver's rate applies when it is set (> -1.0) and the comm either has no rate yet (< 0.0) or a higher one.
243 if (rate > -1.0 && (other_comm->rate < 0.0 || rate < other_comm->rate))
244 other_comm->rate = rate;
246 other_comm->match_fun = match_fun;
247 other_comm->copy_data_fun = copy_data_fun;
// Under model checking or record/replay the comm is only marked running.
249 if (MC_is_active() || MC_record_replay_is_active()) {
250 other_comm->state = SIMIX_RUNNING;
254 SIMIX_comm_start(other_comm);
// Handler of the iprobe simcall: forwards to SIMIX_comm_iprobe, passing the issuer of the simcall as dst_proc.
258 smx_activity_t simcall_HANDLER_comm_iprobe(smx_simcall_t simcall, smx_mailbox_t mbox,
259 int type, int src, int tag,
260 int (*match_fun)(void *, void *, smx_activity_t),
262 return SIMIX_comm_iprobe(simcall->issuer, mbox, type, src, tag, match_fun, data);
// Probes mbox for a communication accepted by the filters, without removing it from the queues: both lookups pass
// remove_matching = false. done_comm_queue is consulted when the mailbox has a permanent receiver and that queue is
// not empty; comm_queue is the other lookup.
// Returns the comm found by the lookups; other_synchro starts out as nullptr.
265 smx_activity_t SIMIX_comm_iprobe(smx_actor_t dst_proc, smx_mailbox_t mbox, int type, int src,
266 int tag, int (*match_fun)(void *, void *, smx_activity_t), void *data)
268 XBT_DEBUG("iprobe from %p %p", mbox, &mbox->comm_queue);
269 simgrid::kernel::activity::Comm* this_comm;
// this_comm describes the prober and always gets the opposite type of the comm being looked for (smx_type).
272 this_comm = new simgrid::kernel::activity::Comm(SIMIX_COMM_SEND);
273 smx_type = SIMIX_COMM_RECEIVE;
275 this_comm = new simgrid::kernel::activity::Comm(SIMIX_COMM_RECEIVE);
276 smx_type = SIMIX_COMM_SEND;
278 smx_activity_t other_synchro=nullptr;
279 if (mbox->permanent_receiver != nullptr && !mbox->done_comm_queue.empty()) {
280 XBT_DEBUG("first check in the permanent recv mailbox, to see if we already got something");
281 other_synchro = _find_matching_comm(&mbox->done_comm_queue,
282 (e_smx_comm_type_t) smx_type, match_fun, data, this_comm,/*remove_matching*/false);
285 XBT_DEBUG("check if we have more luck in the normal mailbox");
286 other_synchro = _find_matching_comm(&mbox->comm_queue,
287 (e_smx_comm_type_t) smx_type, match_fun, data, this_comm,/*remove_matching*/false);
// NOTE(review): unref() right before the pointer is returned -- presumably balances a reference taken during the
// lookup; confirm.
291 other_synchro->unref();
294 return other_synchro;
// Handler of the wait simcall.
// Attaches the simcall to the comm and records the comm as what the issuer is waiting on. Under model checking or
// record/replay the outcome is derived from the MC value of the simcall. In the regular case, a comm that is neither
// waiting nor running is finished right away; otherwise a sleep action lasting `timeout` is created on the issuer's
// host and stored as the source-side or destination-side timeout of the comm.
297 void simcall_HANDLER_comm_wait(smx_simcall_t simcall, smx_activity_t synchro, double timeout)
299 /* Associate this simcall to the wait synchro */
300 XBT_DEBUG("simcall_HANDLER_comm_wait, %p", synchro);
302 synchro->simcalls.push_back(simcall);
303 simcall->issuer->waiting_synchro = synchro;
305 if (MC_is_active() || MC_record_replay_is_active()) {
306 int idx = SIMCALL_GET_MC_VALUE(simcall);
308 synchro->state = SIMIX_DONE;
310 /* If we reached this point, the wait simcall must have a timeout */
311 /* Otherwise it shouldn't be enabled and executed by the MC */
315 simgrid::kernel::activity::Comm *comm = static_cast<simgrid::kernel::activity::Comm*>(synchro);
// The timeout is attributed to the side the issuer is on.
316 if (comm->src_proc == simcall->issuer)
317 comm->state = SIMIX_SRC_TIMEOUT;
319 comm->state = SIMIX_DST_TIMEOUT;
322 SIMIX_comm_finish(synchro);
326 /* If the synchro has already finished, perform the error handling, */
327 /* otherwise set up a waiting timeout on the right side */
328 if (synchro->state != SIMIX_WAITING && synchro->state != SIMIX_RUNNING) {
329 SIMIX_comm_finish(synchro);
330 } else { /* if (timeout >= 0) { we need a surf sleep action even when there is no timeout, otherwise surf won't tell us when the host fails */
331 surf_action_t sleep = simcall->issuer->host->pimpl_cpu->sleep(timeout);
332 sleep->setData(synchro);
334 simgrid::kernel::activity::Comm *comm = static_cast<simgrid::kernel::activity::Comm*>(synchro);
// Remember the sleep action on the side (source or destination) the issuer is on.
335 if (simcall->issuer == comm->src_proc)
336 comm->src_timeout = sleep;
338 comm->dst_timeout = sleep;
// Handler of the test simcall: stores in the simcall result whether the comm counts as finished.
// When the result is true, the simcall is attached to the comm and SIMIX_comm_finish is called; SIMIX_simcall_answer
// is the other way the simcall gets answered (NOTE(review): presumably the else branch -- not visible here).
342 void simcall_HANDLER_comm_test(smx_simcall_t simcall, smx_activity_t synchro)
344 simgrid::kernel::activity::Comm *comm = static_cast<simgrid::kernel::activity::Comm*>(synchro);
// Under model checking or record/replay the test succeeds as soon as both the source and the destination
// processes of the comm are set; the comm is then forced to SIMIX_DONE.
346 if (MC_is_active() || MC_record_replay_is_active()){
347 simcall_comm_test__set__result(simcall, comm->src_proc && comm->dst_proc);
348 if (simcall_comm_test__get__result(simcall)){
349 synchro->state = SIMIX_DONE;
350 synchro->simcalls.push_back(simcall);
351 SIMIX_comm_finish(synchro);
353 SIMIX_simcall_answer(simcall);
// Regular case: the test succeeds when the comm is neither waiting nor running.
358 simcall_comm_test__set__result(simcall, (synchro->state != SIMIX_WAITING && synchro->state != SIMIX_RUNNING));
359 if (simcall_comm_test__get__result(simcall)) {
360 synchro->simcalls.push_back(simcall);
361 SIMIX_comm_finish(synchro);
363 SIMIX_simcall_answer(simcall);
// Handler of the testany simcall over the `count` activities of `comms`.
// The result defaults to -1 and is replaced by the index of a comm that is neither waiting nor running; that comm is
// then finished on behalf of this simcall. Under model checking or record/replay the index is taken from the MC
// value of the simcall and the selected comm is forced to SIMIX_DONE.
367 void simcall_HANDLER_comm_testany(
368 smx_simcall_t simcall, simgrid::kernel::activity::ActivityImpl* comms[], size_t count)
370 // The default result is -1 -- this means, "nothing is ready".
371 // It can be changed below, but only if something matches.
372 simcall_comm_testany__set__result(simcall, -1);
374 if (MC_is_active() || MC_record_replay_is_active()){
375 int idx = SIMCALL_GET_MC_VALUE(simcall);
377 SIMIX_simcall_answer(simcall);
379 simgrid::kernel::activity::ActivityImpl* synchro = comms[idx];
380 simcall_comm_testany__set__result(simcall, idx);
381 synchro->simcalls.push_back(simcall);
382 synchro->state = SIMIX_DONE;
383 SIMIX_comm_finish(synchro);
// Regular case: look through the comms in index order for one that is neither waiting nor running.
388 for (std::size_t i = 0; i != count; ++i) {
389 simgrid::kernel::activity::ActivityImpl* synchro = comms[i];
390 if (synchro->state != SIMIX_WAITING && synchro->state != SIMIX_RUNNING) {
391 simcall_comm_testany__set__result(simcall, i);
392 synchro->simcalls.push_back(simcall);
393 SIMIX_comm_finish(synchro);
397 SIMIX_simcall_answer(simcall);
// Handler of the waitany simcall over the activities stored in the `synchros` dynar.
// Under model checking or record/replay, the comm at the index given by the MC value is marked done and finished
// (timeouts are rejected with xbt_die in that mode). In the regular case the simcall is attached to every comm and
// any comm that is neither waiting nor running is finished at once; a timer may be armed whose callback detaches the
// simcall from all comms and answers it with result -1.
400 void simcall_HANDLER_comm_waitany(smx_simcall_t simcall, xbt_dynar_t synchros, double timeout)
402 smx_activity_t synchro;
403 unsigned int cursor = 0;
405 if (MC_is_active() || MC_record_replay_is_active()){
407 xbt_die("Timeout not implemented for waitany in the model-checker");
408 int idx = SIMCALL_GET_MC_VALUE(simcall);
409 synchro = xbt_dynar_get_as(synchros, idx, smx_activity_t);
410 synchro->simcalls.push_back(simcall);
411 simcall_comm_waitany__set__result(simcall, idx);
412 synchro->state = SIMIX_DONE;
413 SIMIX_comm_finish(synchro);
// NOTE(review): presumably the no-timeout case; the condition choosing between the two timer assignments is not
// visible here.
418 simcall->timer = NULL;
// Timer set to fire at the current clock + timeout: detach the simcall from every comm, report -1 and answer.
420 simcall->timer = SIMIX_timer_set(SIMIX_get_clock() + timeout, [simcall]() {
421 SIMIX_waitany_remove_simcall_from_actions(simcall);
422 simcall_comm_waitany__set__result(simcall, -1);
423 SIMIX_simcall_answer(simcall);
427 xbt_dynar_foreach(synchros, cursor, synchro){
428 /* associate this simcall to the synchro */
429 synchro->simcalls.push_back(simcall);
431 /* see if the synchro is already finished */
432 if (synchro->state != SIMIX_WAITING && synchro->state != SIMIX_RUNNING){
433 SIMIX_comm_finish(synchro);
// Detaches `simcall` from the comms of its waitany set: for each activity of the dynar returned by
// simcall_comm_waitany__get__comms, the first entry equal to `simcall` in the activity's simcalls list is erased.
// Activities that do not reference the simcall are left untouched.
439 void SIMIX_waitany_remove_simcall_from_actions(smx_simcall_t simcall)
441 smx_activity_t synchro;
442 unsigned int cursor = 0;
443 xbt_dynar_t synchros = simcall_comm_waitany__get__comms(simcall);
445 xbt_dynar_foreach(synchros, cursor, synchro) {
446 // Remove the first occurrence of simcall:
447 auto i = boost::range::find(synchro->simcalls, simcall);
448 if (i != synchro->simcalls.end())
449 synchro->simcalls.erase(i);
// Starts the network transfer of a comm whose state is SIMIX_READY: creates the surf action between the sender's and
// the receiver's hosts, marks the comm SIMIX_RUNNING, switches it to SIMIX_LINK_FAILURE when the action is already in
// the failed state, and suspends the action when either process is suspended.
454 * \brief Starts the simulation of a communication synchro.
455 * \param synchro the communication synchro
457 static inline void SIMIX_comm_start(smx_activity_t synchro)
459 simgrid::kernel::activity::Comm *comm = static_cast<simgrid::kernel::activity::Comm*>(synchro);
461 /* If both the sender and the receiver are already there, start the communication */
462 if (synchro->state == SIMIX_READY) {
464 simgrid::s4u::Host* sender = comm->src_proc->host;
465 simgrid::s4u::Host* receiver = comm->dst_proc->host;
467 XBT_DEBUG("Starting communication %p from '%s' to '%s'", synchro, sender->cname(), receiver->cname());
// Create the surf network action and attach the comm to it as its data.
469 comm->surf_comm = surf_network_model->communicate(sender, receiver, comm->task_size, comm->rate);
470 comm->surf_comm->setData(synchro);
471 comm->state = SIMIX_RUNNING;
473 /* If a link is failed, detect it immediately */
474 if (comm->surf_comm->getState() == simgrid::surf::Action::State::failed) {
475 XBT_DEBUG("Communication from '%s' to '%s' failed to start because of a link failure", sender->cname(),
477 comm->state = SIMIX_LINK_FAILURE;
481 /* If any of the process is suspend, create the synchro but stop its execution,
482 it will be restarted when the sender process resume */
483 if (SIMIX_process_is_suspended(comm->src_proc) || SIMIX_process_is_suspended(comm->dst_proc)) {
// The debug trace names whichever side is suspended.
484 if (SIMIX_process_is_suspended(comm->src_proc))
485 XBT_DEBUG("The communication is suspended on startup because src (%s@%s) was suspended since it initiated the "
487 comm->src_proc->cname(), comm->src_proc->host->cname());
489 XBT_DEBUG("The communication is suspended on startup because dst (%s@%s) was suspended since it initiated the "
491 comm->dst_proc->cname(), comm->dst_proc->host->cname());
493 comm->surf_comm->suspend();
// Answers every simcall attached to the comm, taking them one at a time from the front of synchro->simcalls.
// For each simcall: waitany bookkeeping, removal of the comm from its mailbox, translation of the comm's final state
// into an exception on the issuer where applicable, cleanup of the processes' comms lists, and finally
// SIMIX_simcall_answer. The references counted in destroy_count are released once the loop is over.
499 * \brief Answers the SIMIX simcalls associated to a communication synchro.
500 * \param synchro a finished communication synchro
502 void SIMIX_comm_finish(smx_activity_t synchro)
504 simgrid::kernel::activity::Comm *comm = static_cast<simgrid::kernel::activity::Comm*>(synchro);
505 unsigned int destroy_count = 0;
507 while (!synchro->simcalls.empty()) {
508 smx_simcall_t simcall = synchro->simcalls.front();
509 synchro->simcalls.pop_front();
511 /* If a waitany simcall is waiting for this synchro to finish, then remove
512 it from the other synchros in the waitany list. Afterwards, get the
513 position of the actual synchro in the waitany dynar and
514 return it as the result of the simcall */
516 if (simcall->call == SIMCALL_NONE) //FIXME: maybe a better way to handle this case
517 continue; // if process handling comm is killed
518 if (simcall->call == SIMCALL_COMM_WAITANY) {
519 SIMIX_waitany_remove_simcall_from_actions(simcall);
// Cancel the pending waitany timer, if any.
520 if (simcall->timer) {
521 SIMIX_timer_remove(simcall->timer);
522 simcall->timer = nullptr;
// Outside model checking and replay, the waitany result is the position of this comm in the simcall's dynar.
524 if (!MC_is_active() && !MC_record_replay_is_active())
525 simcall_comm_waitany__set__result(simcall, xbt_dynar_search(simcall_comm_waitany__get__comms(simcall), &synchro));
528 /* If the synchro is still in a rendez-vous point then remove from it */
530 comm->mbox->remove(synchro);
532 XBT_DEBUG("SIMIX_comm_finish: synchro state = %d", (int)synchro->state);
534 /* Check out for errors */
// An issuer whose host is off is flagged to die and gets a host_error exception.
536 if (simcall->issuer->host->isOff()) {
537 simcall->issuer->context->iwannadie = 1;
538 SMX_EXCEPTION(simcall->issuer, host_error, 0, "Host failed");
// Dispatch on the final state of the comm.
540 switch (synchro->state) {
543 XBT_DEBUG("Communication %p complete!", synchro);
544 SIMIX_comm_copy_data(synchro);
547 case SIMIX_SRC_TIMEOUT:
548 SMX_EXCEPTION(simcall->issuer, timeout_error, 0, "Communication timeouted because of sender");
551 case SIMIX_DST_TIMEOUT:
552 SMX_EXCEPTION(simcall->issuer, timeout_error, 0, "Communication timeouted because of receiver");
// Host failure on the source side: an issuer that is the source is flagged to die; the network_error below is
// presumably for the other end (NOTE(review): the else is not visible here).
555 case SIMIX_SRC_HOST_FAILURE:
556 if (simcall->issuer == comm->src_proc)
557 simcall->issuer->context->iwannadie = 1;
558 // SMX_EXCEPTION(simcall->issuer, host_error, 0, "Host failed");
560 SMX_EXCEPTION(simcall->issuer, network_error, 0, "Remote peer failed");
// Same handling, mirrored for a failure on the destination side.
563 case SIMIX_DST_HOST_FAILURE:
564 if (simcall->issuer == comm->dst_proc)
565 simcall->issuer->context->iwannadie = 1;
566 // SMX_EXCEPTION(simcall->issuer, host_error, 0, "Host failed");
568 SMX_EXCEPTION(simcall->issuer, network_error, 0, "Remote peer failed");
571 case SIMIX_LINK_FAILURE:
574 "Link failure in synchro %p between '%s' and '%s': posting an exception to the issuer: %s (%p) detached:%d",
575 synchro, comm->src_proc ? comm->src_proc->host->cname() : nullptr,
576 comm->dst_proc ? comm->dst_proc->host->cname() : nullptr, simcall->issuer->cname(), simcall->issuer,
578 if (comm->src_proc == simcall->issuer) {
579 XBT_DEBUG("I'm source");
580 } else if (comm->dst_proc == simcall->issuer) {
581 XBT_DEBUG("I'm dest");
583 XBT_DEBUG("I'm neither source nor dest");
585 SMX_EXCEPTION(simcall->issuer, network_error, 0, "Link failure");
// Cancellation: the message names the peer, i.e. the end that is not the issuer.
589 if (simcall->issuer == comm->dst_proc)
590 SMX_EXCEPTION(simcall->issuer, cancel_error, 0, "Communication canceled by the sender");
592 SMX_EXCEPTION(simcall->issuer, cancel_error, 0, "Communication canceled by the receiver");
596 xbt_die("Unexpected synchro state in SIMIX_comm_finish: %d", (int)synchro->state);
600 /* if there is an exception during a waitany or a testany, indicate the position of the failed communication */
601 if (simcall->issuer->exception) {
602 // In order to modify the exception we have to rethrow it:
604 std::rethrow_exception(simcall->issuer->exception);
607 if (simcall->call == SIMCALL_COMM_WAITANY) {
608 e.value = xbt_dynar_search(simcall_comm_waitany__get__comms(simcall), &synchro);
610 else if (simcall->call == SIMCALL_COMM_TESTANY) {
// Locate this comm in the testany array so that its index can be stored in the exception.
612 auto comms = simcall_comm_testany__get__comms(simcall);
613 auto count = simcall_comm_testany__get__count(simcall);
614 auto element = std::find(comms, comms + count, synchro);
615 if (element == comms + count)
618 e.value = element - comms;
// Store the modified exception back on the issuer.
620 simcall->issuer->exception = std::make_exception_ptr(e);
// Check the issuer's host again: flag the issuer to die when the host is off.
627 if (simcall->issuer->host->isOff()) {
628 simcall->issuer->context->iwannadie = 1;
// The issuer no longer waits on this comm and no longer lists it.
631 simcall->issuer->waiting_synchro = nullptr;
632 simcall->issuer->comms.remove(synchro);
// Also drop the comm from the comms list of the peer, or of both ends when the issuer is neither of them.
634 if(simcall->issuer == comm->src_proc){
636 comm->dst_proc->comms.remove(synchro);
638 else if(simcall->issuer == comm->dst_proc){
640 comm->src_proc->comms.remove(synchro);
643 comm->dst_proc->comms.remove(synchro);
644 comm->src_proc->comms.remove(synchro);
646 //in case of a detached comm we have an extra ref to remove, as the sender won't do it
650 SIMIX_simcall_answer(simcall);
// Release one reference per unit accumulated in destroy_count.
654 while (destroy_count-- > 0)
655 static_cast<simgrid::kernel::activity::Comm*>(synchro)->unref();
658 /******************************************************************************/
659 /* SIMIX_comm_copy_data callbacks */
660 /******************************************************************************/
// File-wide copy callback invoked by SIMIX_comm_copy_data; it initially points to the pointer-copy callback and can
// be replaced through SIMIX_comm_set_copy_data_callback.
661 static void (*SIMIX_comm_copy_data_callback) (smx_activity_t, void*, size_t) = &SIMIX_comm_copy_pointer_callback;
// Replaces the file-wide copy callback with `callback`. No check is made on the value: it is stored as given.
663 void SIMIX_comm_set_copy_data_callback(void (*callback) (smx_activity_t, void*, size_t))
665 SIMIX_comm_copy_data_callback = callback;
// Copy callback that transfers a pointer rather than a payload: asserts that buff_size equals sizeof(void*), then
// stores `buff` itself into the location that the comm's dst_buff points to.
668 void SIMIX_comm_copy_pointer_callback(smx_activity_t synchro, void* buff, size_t buff_size)
670 simgrid::kernel::activity::Comm *comm = static_cast<simgrid::kernel::activity::Comm*>(synchro);
672 xbt_assert((buff_size == sizeof(void *)), "Cannot copy %zu bytes: must be sizeof(void*)", buff_size);
673 *(void **) (comm->dst_buff) = buff;
// Copy callback that copies buff_size bytes from `buff` into the comm's dst_buff with memcpy. For a detached comm,
// the comm's src_buff is reset to nullptr afterwards.
676 void SIMIX_comm_copy_buffer_callback(smx_activity_t synchro, void* buff, size_t buff_size)
678 simgrid::kernel::activity::Comm *comm = static_cast<simgrid::kernel::activity::Comm*>(synchro);
680 XBT_DEBUG("Copy the data over");
681 memcpy(comm->dst_buff, buff, buff_size);
682 if (comm->detached) { // if this is a detached send, the source buffer was duplicated by SMPI sender to make the original buffer available to the application ASAP
684 comm->src_buff = nullptr;
// Transfers the payload of a comm from the sender's buffer to the receiver's buffer.
// The guard below covers a missing buffer on either side and a comm already flagged as copied. The number of bytes
// is src_buff_size, capped by *dst_buff_size when the receiver provided one; the capped value is written back to
// *dst_buff_size. The copy itself goes through the comm's copy_data_fun or the file-wide callback.
690 * @brief Copy the communication data from the sender's buffer to the receiver's one
691 * @param synchro The communication
693 void SIMIX_comm_copy_data(smx_activity_t synchro)
695 simgrid::kernel::activity::Comm *comm = static_cast<simgrid::kernel::activity::Comm*>(synchro);
697 size_t buff_size = comm->src_buff_size;
698 /* If there is no data to copy then return */
699 if (!comm->src_buff || !comm->dst_buff || comm->copied)
702 XBT_DEBUG("Copying comm %p data from %s (%p) -> %s (%p) (%zu bytes)", comm,
703 comm->src_proc ? comm->src_proc->host->cname() : "a finished process", comm->src_buff,
704 comm->dst_proc ? comm->dst_proc->host->cname() : "a finished process", comm->dst_buff, buff_size);
706 /* Copy at most dst_buff_size bytes of the message to receiver's buffer */
707 if (comm->dst_buff_size)
708 buff_size = MIN(buff_size, *(comm->dst_buff_size));
710 /* Update the receiver's buffer size to the copied amount */
711 if (comm->dst_buff_size)
712 *comm->dst_buff_size = buff_size;
// A copy function attached to the comm is used when there is one; the file-wide callback is the alternative
// (NOTE(review): presumably the else branch -- not visible here).
715 if(comm->copy_data_fun)
716 comm->copy_data_fun (comm, comm->src_buff, buff_size);
718 SIMIX_comm_copy_data_callback (comm, comm->src_buff, buff_size);
722 /* Set the copied flag so we copy data only once */
723 /* (this function might be called from both communication ends) */