1 /* Copyright (c) 2009-2017. The SimGrid Team. All rights reserved. */
3 /* This program is free software; you can redistribute it and/or modify it
4 * under the terms of the license (GNU LGPL) which comes with this package. */
8 #include <boost/range/algorithm.hpp>
12 #include "simgrid/s4u/Host.hpp"
15 #include "simgrid/s4u/Mailbox.hpp"
16 #include "src/mc/mc_replay.h"
17 #include "src/simix/smx_private.h"
18 #include "src/surf/cpu_interface.hpp"
19 #include "src/surf/surf_interface.hpp"
21 #include "src/kernel/activity/SynchroComm.hpp"
22 #include "src/surf/network_interface.hpp"
24 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(simix_network, simix, "SIMIX network-related synchronization");
26 static void SIMIX_waitany_remove_simcall_from_actions(smx_simcall_t simcall);
27 static void SIMIX_comm_copy_data(smx_activity_t comm);
28 static void SIMIX_comm_start(smx_activity_t synchro);
29 static simgrid::kernel::activity::Comm*
30 _find_matching_comm(boost::circular_buffer_space_optimized<smx_activity_t>* deque, e_smx_comm_type_t type,
31 int (*match_fun)(void*, void*, smx_activity_t), void* user_data, smx_activity_t my_synchro,
32 bool remove_matching);
35 * \brief Checks if there is a communication activity queued in a deque matching our needs
36 * \param type The type of communication we are looking for (comm_send, comm_recv)
37 * \return The communication activity if found, nullptr otherwise
39 static simgrid::kernel::activity::Comm*
40 _find_matching_comm(boost::circular_buffer_space_optimized<smx_activity_t>* deque, e_smx_comm_type_t type,
41 int (*match_fun)(void*, void*, smx_activity_t), void* this_user_data, smx_activity_t my_synchro,
44 void* other_user_data = nullptr;
46 for(auto it = deque->begin(); it != deque->end(); it++){
47 smx_activity_t synchro = *it;
48 simgrid::kernel::activity::Comm *comm = static_cast<simgrid::kernel::activity::Comm*>(synchro);
50 if (comm->type == SIMIX_COMM_SEND) {
51 other_user_data = comm->src_data;
52 } else if (comm->type == SIMIX_COMM_RECEIVE) {
53 other_user_data = comm->dst_data;
55 if (comm->type == type && (!match_fun || match_fun(this_user_data, other_user_data, synchro)) &&
56 (not comm->match_fun || comm->match_fun(other_user_data, this_user_data, my_synchro))) {
57 XBT_DEBUG("Found a matching communication synchro %p", comm);
62 comm->mbox_cpy = comm->mbox;
67 XBT_DEBUG("Sorry, communication synchro %p does not match our needs:"
68 " its type is %d but we are looking for a comm of type %d (or maybe the filtering didn't match)",
69 comm, (int)comm->type, (int)type);
71 XBT_DEBUG("No matching communication synchro found");
75 /******************************************************************************/
76 /* Communication synchros */
77 /******************************************************************************/
78 XBT_PRIVATE void simcall_HANDLER_comm_send(smx_simcall_t simcall, smx_actor_t src, smx_mailbox_t mbox,
79 double task_size, double rate,
80 void *src_buff, size_t src_buff_size,
81 int (*match_fun)(void *, void *,smx_activity_t),
82 void (*copy_data_fun)(smx_activity_t, void*, size_t),
83 void *data, double timeout){
84 smx_activity_t comm = simcall_HANDLER_comm_isend(simcall, src, mbox, task_size, rate,
85 src_buff, src_buff_size, match_fun, nullptr, copy_data_fun,
87 SIMCALL_SET_MC_VALUE(simcall, 0);
88 simcall_HANDLER_comm_wait(simcall, comm, timeout);
90 XBT_PRIVATE smx_activity_t simcall_HANDLER_comm_isend(smx_simcall_t simcall, smx_actor_t src_proc, smx_mailbox_t mbox,
91 double task_size, double rate,
92 void *src_buff, size_t src_buff_size,
93 int (*match_fun)(void *, void *,smx_activity_t),
94 void (*clean_fun)(void *), // used to free the synchro in case of problem after a detached send
95 void (*copy_data_fun)(smx_activity_t, void*, size_t),// used to copy data if not default one
96 void *data, int detached)
98 XBT_DEBUG("send from %p", mbox);
100 /* Prepare a synchro describing us, so that it gets passed to the user-provided filter of other side */
101 simgrid::kernel::activity::Comm* this_comm = new simgrid::kernel::activity::Comm(SIMIX_COMM_SEND);
103 /* Look for communication synchro matching our needs. We also provide a description of
104 * ourself so that the other side also gets a chance of choosing if it wants to match with us.
106 * If it is not found then push our communication into the rendez-vous point */
107 simgrid::kernel::activity::Comm* other_comm =
108 _find_matching_comm(&mbox->comm_queue, SIMIX_COMM_RECEIVE, match_fun, data, this_comm, /*remove_matching*/ true);
110 if (not other_comm) {
111 other_comm = this_comm;
113 if (mbox->permanent_receiver!=nullptr){
114 //this mailbox is for small messages, which have to be sent right now
115 other_comm->state = SIMIX_READY;
116 other_comm->dst_proc=mbox->permanent_receiver.get();
118 mbox->done_comm_queue.push_back(other_comm);
119 XBT_DEBUG("pushing a message into the permanent receive list %p, comm %p", mbox, &(other_comm));
122 mbox->push(this_comm);
125 XBT_DEBUG("Receive already pushed");
128 other_comm->state = SIMIX_READY;
129 other_comm->type = SIMIX_COMM_READY;
132 src_proc->comms.push_back(other_comm);
135 other_comm->detached = true;
136 other_comm->clean_fun = clean_fun;
138 other_comm->clean_fun = nullptr;
141 /* Setup the communication synchro */
142 other_comm->src_proc = src_proc;
143 other_comm->task_size = task_size;
144 other_comm->rate = rate;
145 other_comm->src_buff = src_buff;
146 other_comm->src_buff_size = src_buff_size;
147 other_comm->src_data = data;
149 other_comm->match_fun = match_fun;
150 other_comm->copy_data_fun = copy_data_fun;
153 if (MC_is_active() || MC_record_replay_is_active()) {
154 other_comm->state = SIMIX_RUNNING;
155 return (detached ? nullptr : other_comm);
158 SIMIX_comm_start(other_comm);
159 return (detached ? nullptr : other_comm);
162 XBT_PRIVATE void simcall_HANDLER_comm_recv(smx_simcall_t simcall, smx_actor_t receiver, smx_mailbox_t mbox,
163 void *dst_buff, size_t *dst_buff_size,
164 int (*match_fun)(void *, void *, smx_activity_t),
165 void (*copy_data_fun)(smx_activity_t, void*, size_t),
166 void *data, double timeout, double rate)
168 smx_activity_t comm = SIMIX_comm_irecv(receiver, mbox, dst_buff, dst_buff_size, match_fun, copy_data_fun, data, rate);
169 SIMCALL_SET_MC_VALUE(simcall, 0);
170 simcall_HANDLER_comm_wait(simcall, comm, timeout);
173 XBT_PRIVATE smx_activity_t simcall_HANDLER_comm_irecv(smx_simcall_t simcall, smx_actor_t receiver, smx_mailbox_t mbox,
174 void *dst_buff, size_t *dst_buff_size,
175 int (*match_fun)(void *, void *, smx_activity_t),
176 void (*copy_data_fun)(smx_activity_t, void*, size_t),
177 void *data, double rate)
179 return SIMIX_comm_irecv(receiver, mbox, dst_buff, dst_buff_size, match_fun, copy_data_fun, data, rate);
182 smx_activity_t SIMIX_comm_irecv(smx_actor_t dst_proc, smx_mailbox_t mbox, void *dst_buff, size_t *dst_buff_size,
183 int (*match_fun)(void *, void *, smx_activity_t),
184 void (*copy_data_fun)(smx_activity_t, void*, size_t), // used to copy data if not default one
185 void *data, double rate)
187 XBT_DEBUG("recv from %p %p", mbox, &mbox->comm_queue);
188 simgrid::kernel::activity::Comm* this_synchro = new simgrid::kernel::activity::Comm(SIMIX_COMM_RECEIVE);
190 simgrid::kernel::activity::Comm* other_comm;
191 //communication already done, get it inside the list of completed comms
192 if (mbox->permanent_receiver != nullptr && not mbox->done_comm_queue.empty()) {
194 XBT_DEBUG("We have a comm that has probably already been received, trying to match it, to skip the communication");
195 //find a match in the list of already received comms
196 other_comm = _find_matching_comm(&mbox->done_comm_queue, SIMIX_COMM_SEND, match_fun, data, this_synchro,
197 /*remove_matching*/ true);
198 //if not found, assume the receiver came first, register it to the mailbox in the classical way
199 if (not other_comm) {
200 XBT_DEBUG("We have messages in the permanent receive list, but not the one we are looking for, pushing request into list");
201 other_comm = this_synchro;
202 mbox->push(this_synchro);
204 if(other_comm->surf_comm && other_comm->remains() < 1e-12) {
205 XBT_DEBUG("comm %p has been already sent, and is finished, destroy it",other_comm);
206 other_comm->state = SIMIX_DONE;
207 other_comm->type = SIMIX_COMM_DONE;
208 other_comm->mbox = nullptr;
211 static_cast<simgrid::kernel::activity::Comm*>(this_synchro)->unref();
214 /* Prepare a comm describing us, so that it gets passed to the user-provided filter of other side */
216 /* Look for communication activity matching our needs. We also provide a description of
217 * ourself so that the other side also gets a chance of choosing if it wants to match with us.
219 * If it is not found then push our communication into the rendez-vous point */
220 other_comm = _find_matching_comm(&mbox->comm_queue, SIMIX_COMM_SEND, match_fun, data, this_synchro,
221 /*remove_matching*/ true);
223 if (not other_comm) {
224 XBT_DEBUG("Receive pushed first %zu", mbox->comm_queue.size());
225 other_comm = this_synchro;
226 mbox->push(this_synchro);
228 this_synchro->unref();
230 other_comm->state = SIMIX_READY;
231 other_comm->type = SIMIX_COMM_READY;
233 dst_proc->comms.push_back(other_comm);
236 /* Setup communication synchro */
237 other_comm->dst_proc = dst_proc;
238 other_comm->dst_buff = dst_buff;
239 other_comm->dst_buff_size = dst_buff_size;
240 other_comm->dst_data = data;
242 if (rate > -1.0 && (other_comm->rate < 0.0 || rate < other_comm->rate))
243 other_comm->rate = rate;
245 other_comm->match_fun = match_fun;
246 other_comm->copy_data_fun = copy_data_fun;
248 if (MC_is_active() || MC_record_replay_is_active()) {
249 other_comm->state = SIMIX_RUNNING;
253 SIMIX_comm_start(other_comm);
257 smx_activity_t simcall_HANDLER_comm_iprobe(smx_simcall_t simcall, smx_mailbox_t mbox,
258 int type, int src, int tag,
259 int (*match_fun)(void *, void *, smx_activity_t),
261 return SIMIX_comm_iprobe(simcall->issuer, mbox, type, src, tag, match_fun, data);
264 smx_activity_t SIMIX_comm_iprobe(smx_actor_t dst_proc, smx_mailbox_t mbox, int type, int src,
265 int tag, int (*match_fun)(void *, void *, smx_activity_t), void *data)
267 XBT_DEBUG("iprobe from %p %p", mbox, &mbox->comm_queue);
268 simgrid::kernel::activity::Comm* this_comm;
271 this_comm = new simgrid::kernel::activity::Comm(SIMIX_COMM_SEND);
272 smx_type = SIMIX_COMM_RECEIVE;
274 this_comm = new simgrid::kernel::activity::Comm(SIMIX_COMM_RECEIVE);
275 smx_type = SIMIX_COMM_SEND;
277 smx_activity_t other_synchro=nullptr;
278 if (mbox->permanent_receiver != nullptr && not mbox->done_comm_queue.empty()) {
279 XBT_DEBUG("first check in the permanent recv mailbox, to see if we already got something");
280 other_synchro = _find_matching_comm(&mbox->done_comm_queue,
281 (e_smx_comm_type_t) smx_type, match_fun, data, this_comm,/*remove_matching*/false);
283 if (not other_synchro) {
284 XBT_DEBUG("check if we have more luck in the normal mailbox");
285 other_synchro = _find_matching_comm(&mbox->comm_queue,
286 (e_smx_comm_type_t) smx_type, match_fun, data, this_comm,/*remove_matching*/false);
290 other_synchro->unref();
293 return other_synchro;
296 void simcall_HANDLER_comm_wait(smx_simcall_t simcall, smx_activity_t synchro, double timeout)
298 /* Associate this simcall to the wait synchro */
299 XBT_DEBUG("simcall_HANDLER_comm_wait, %p", synchro);
301 synchro->simcalls.push_back(simcall);
302 simcall->issuer->waiting_synchro = synchro;
304 if (MC_is_active() || MC_record_replay_is_active()) {
305 int idx = SIMCALL_GET_MC_VALUE(simcall);
307 synchro->state = SIMIX_DONE;
309 /* If we reached this point, the wait simcall must have a timeout */
310 /* Otherwise it shouldn't be enabled and executed by the MC */
314 simgrid::kernel::activity::Comm *comm = static_cast<simgrid::kernel::activity::Comm*>(synchro);
315 if (comm->src_proc == simcall->issuer)
316 comm->state = SIMIX_SRC_TIMEOUT;
318 comm->state = SIMIX_DST_TIMEOUT;
321 SIMIX_comm_finish(synchro);
325 /* If the synchro has already finish perform the error handling, */
326 /* otherwise set up a waiting timeout on the right side */
327 if (synchro->state != SIMIX_WAITING && synchro->state != SIMIX_RUNNING) {
328 SIMIX_comm_finish(synchro);
329 } else { /* if (timeout >= 0) { we need a surf sleep action even when there is no timeout, otherwise surf won't tell us when the host fails */
330 surf_action_t sleep = simcall->issuer->host->pimpl_cpu->sleep(timeout);
331 sleep->setData(synchro);
333 simgrid::kernel::activity::Comm *comm = static_cast<simgrid::kernel::activity::Comm*>(synchro);
334 if (simcall->issuer == comm->src_proc)
335 comm->src_timeout = sleep;
337 comm->dst_timeout = sleep;
341 void simcall_HANDLER_comm_test(smx_simcall_t simcall, smx_activity_t synchro)
343 simgrid::kernel::activity::Comm *comm = static_cast<simgrid::kernel::activity::Comm*>(synchro);
345 if (MC_is_active() || MC_record_replay_is_active()){
346 simcall_comm_test__set__result(simcall, comm->src_proc && comm->dst_proc);
347 if (simcall_comm_test__get__result(simcall)){
348 synchro->state = SIMIX_DONE;
349 synchro->simcalls.push_back(simcall);
350 SIMIX_comm_finish(synchro);
352 SIMIX_simcall_answer(simcall);
357 simcall_comm_test__set__result(simcall, (synchro->state != SIMIX_WAITING && synchro->state != SIMIX_RUNNING));
358 if (simcall_comm_test__get__result(simcall)) {
359 synchro->simcalls.push_back(simcall);
360 SIMIX_comm_finish(synchro);
362 SIMIX_simcall_answer(simcall);
366 void simcall_HANDLER_comm_testany(
367 smx_simcall_t simcall, simgrid::kernel::activity::ActivityImpl* comms[], size_t count)
369 // The default result is -1 -- this means, "nothing is ready".
370 // It can be changed below, but only if something matches.
371 simcall_comm_testany__set__result(simcall, -1);
373 if (MC_is_active() || MC_record_replay_is_active()){
374 int idx = SIMCALL_GET_MC_VALUE(simcall);
376 SIMIX_simcall_answer(simcall);
378 simgrid::kernel::activity::ActivityImpl* synchro = comms[idx];
379 simcall_comm_testany__set__result(simcall, idx);
380 synchro->simcalls.push_back(simcall);
381 synchro->state = SIMIX_DONE;
382 SIMIX_comm_finish(synchro);
387 for (std::size_t i = 0; i != count; ++i) {
388 simgrid::kernel::activity::ActivityImpl* synchro = comms[i];
389 if (synchro->state != SIMIX_WAITING && synchro->state != SIMIX_RUNNING) {
390 simcall_comm_testany__set__result(simcall, i);
391 synchro->simcalls.push_back(simcall);
392 SIMIX_comm_finish(synchro);
396 SIMIX_simcall_answer(simcall);
399 void simcall_HANDLER_comm_waitany(smx_simcall_t simcall, xbt_dynar_t synchros, double timeout)
401 smx_activity_t synchro;
402 unsigned int cursor = 0;
404 if (MC_is_active() || MC_record_replay_is_active()){
406 xbt_die("Timeout not implemented for waitany in the model-checker");
407 int idx = SIMCALL_GET_MC_VALUE(simcall);
408 synchro = xbt_dynar_get_as(synchros, idx, smx_activity_t);
409 synchro->simcalls.push_back(simcall);
410 simcall_comm_waitany__set__result(simcall, idx);
411 synchro->state = SIMIX_DONE;
412 SIMIX_comm_finish(synchro);
417 simcall->timer = NULL;
419 simcall->timer = SIMIX_timer_set(SIMIX_get_clock() + timeout, [simcall]() {
420 SIMIX_waitany_remove_simcall_from_actions(simcall);
421 simcall_comm_waitany__set__result(simcall, -1);
422 SIMIX_simcall_answer(simcall);
426 xbt_dynar_foreach(synchros, cursor, synchro){
427 /* associate this simcall to the the synchro */
428 synchro->simcalls.push_back(simcall);
430 /* see if the synchro is already finished */
431 if (synchro->state != SIMIX_WAITING && synchro->state != SIMIX_RUNNING){
432 SIMIX_comm_finish(synchro);
438 void SIMIX_waitany_remove_simcall_from_actions(smx_simcall_t simcall)
440 smx_activity_t synchro;
441 unsigned int cursor = 0;
442 xbt_dynar_t synchros = simcall_comm_waitany__get__comms(simcall);
444 xbt_dynar_foreach(synchros, cursor, synchro) {
445 // Remove the first occurence of simcall:
446 auto i = boost::range::find(synchro->simcalls, simcall);
447 if (i != synchro->simcalls.end())
448 synchro->simcalls.erase(i);
453 * \brief Starts the simulation of a communication synchro.
454 * \param synchro the communication synchro
456 static inline void SIMIX_comm_start(smx_activity_t synchro)
458 simgrid::kernel::activity::Comm *comm = static_cast<simgrid::kernel::activity::Comm*>(synchro);
460 /* If both the sender and the receiver are already there, start the communication */
461 if (synchro->state == SIMIX_READY) {
463 simgrid::s4u::Host* sender = comm->src_proc->host;
464 simgrid::s4u::Host* receiver = comm->dst_proc->host;
466 XBT_DEBUG("Starting communication %p from '%s' to '%s'", synchro, sender->cname(), receiver->cname());
468 comm->surf_comm = surf_network_model->communicate(sender, receiver, comm->task_size, comm->rate);
469 comm->surf_comm->setData(synchro);
470 comm->state = SIMIX_RUNNING;
472 /* If a link is failed, detect it immediately */
473 if (comm->surf_comm->getState() == simgrid::surf::Action::State::failed) {
474 XBT_DEBUG("Communication from '%s' to '%s' failed to start because of a link failure", sender->cname(),
476 comm->state = SIMIX_LINK_FAILURE;
480 /* If any of the process is suspend, create the synchro but stop its execution,
481 it will be restarted when the sender process resume */
482 if (SIMIX_process_is_suspended(comm->src_proc) || SIMIX_process_is_suspended(comm->dst_proc)) {
483 if (SIMIX_process_is_suspended(comm->src_proc))
484 XBT_DEBUG("The communication is suspended on startup because src (%s@%s) was suspended since it initiated the "
486 comm->src_proc->cname(), comm->src_proc->host->cname());
488 XBT_DEBUG("The communication is suspended on startup because dst (%s@%s) was suspended since it initiated the "
490 comm->dst_proc->cname(), comm->dst_proc->host->cname());
492 comm->surf_comm->suspend();
498 * \brief Answers the SIMIX simcalls associated to a communication synchro.
499 * \param synchro a finished communication synchro
501 void SIMIX_comm_finish(smx_activity_t synchro)
503 simgrid::kernel::activity::Comm *comm = static_cast<simgrid::kernel::activity::Comm*>(synchro);
504 unsigned int destroy_count = 0;
506 while (not synchro->simcalls.empty()) {
507 smx_simcall_t simcall = synchro->simcalls.front();
508 synchro->simcalls.pop_front();
510 /* If a waitany simcall is waiting for this synchro to finish, then remove
511 it from the other synchros in the waitany list. Afterwards, get the
512 position of the actual synchro in the waitany dynar and
513 return it as the result of the simcall */
515 if (simcall->call == SIMCALL_NONE) //FIXME: maybe a better way to handle this case
516 continue; // if process handling comm is killed
517 if (simcall->call == SIMCALL_COMM_WAITANY) {
518 SIMIX_waitany_remove_simcall_from_actions(simcall);
519 if (simcall->timer) {
520 SIMIX_timer_remove(simcall->timer);
521 simcall->timer = nullptr;
523 if (not MC_is_active() && not MC_record_replay_is_active())
524 simcall_comm_waitany__set__result(simcall, xbt_dynar_search(simcall_comm_waitany__get__comms(simcall), &synchro));
527 /* If the synchro is still in a rendez-vous point then remove from it */
529 comm->mbox->remove(synchro);
531 XBT_DEBUG("SIMIX_comm_finish: synchro state = %d", (int)synchro->state);
533 /* Check out for errors */
535 if (simcall->issuer->host->isOff()) {
536 simcall->issuer->context->iwannadie = 1;
537 SMX_EXCEPTION(simcall->issuer, host_error, 0, "Host failed");
539 switch (synchro->state) {
542 XBT_DEBUG("Communication %p complete!", synchro);
543 SIMIX_comm_copy_data(synchro);
546 case SIMIX_SRC_TIMEOUT:
547 SMX_EXCEPTION(simcall->issuer, timeout_error, 0, "Communication timeouted because of sender");
550 case SIMIX_DST_TIMEOUT:
551 SMX_EXCEPTION(simcall->issuer, timeout_error, 0, "Communication timeouted because of receiver");
554 case SIMIX_SRC_HOST_FAILURE:
555 if (simcall->issuer == comm->src_proc)
556 simcall->issuer->context->iwannadie = 1;
557 // SMX_EXCEPTION(simcall->issuer, host_error, 0, "Host failed");
559 SMX_EXCEPTION(simcall->issuer, network_error, 0, "Remote peer failed");
562 case SIMIX_DST_HOST_FAILURE:
563 if (simcall->issuer == comm->dst_proc)
564 simcall->issuer->context->iwannadie = 1;
565 // SMX_EXCEPTION(simcall->issuer, host_error, 0, "Host failed");
567 SMX_EXCEPTION(simcall->issuer, network_error, 0, "Remote peer failed");
570 case SIMIX_LINK_FAILURE:
573 "Link failure in synchro %p between '%s' and '%s': posting an exception to the issuer: %s (%p) detached:%d",
574 synchro, comm->src_proc ? comm->src_proc->host->cname() : nullptr,
575 comm->dst_proc ? comm->dst_proc->host->cname() : nullptr, simcall->issuer->cname(), simcall->issuer,
577 if (comm->src_proc == simcall->issuer) {
578 XBT_DEBUG("I'm source");
579 } else if (comm->dst_proc == simcall->issuer) {
580 XBT_DEBUG("I'm dest");
582 XBT_DEBUG("I'm neither source nor dest");
584 SMX_EXCEPTION(simcall->issuer, network_error, 0, "Link failure");
588 if (simcall->issuer == comm->dst_proc)
589 SMX_EXCEPTION(simcall->issuer, cancel_error, 0, "Communication canceled by the sender");
591 SMX_EXCEPTION(simcall->issuer, cancel_error, 0, "Communication canceled by the receiver");
595 xbt_die("Unexpected synchro state in SIMIX_comm_finish: %d", (int)synchro->state);
599 /* if there is an exception during a waitany or a testany, indicate the position of the failed communication */
600 if (simcall->issuer->exception) {
601 // In order to modify the exception we have to rethrow it:
603 std::rethrow_exception(simcall->issuer->exception);
606 if (simcall->call == SIMCALL_COMM_WAITANY) {
607 e.value = xbt_dynar_search(simcall_comm_waitany__get__comms(simcall), &synchro);
609 else if (simcall->call == SIMCALL_COMM_TESTANY) {
611 auto comms = simcall_comm_testany__get__comms(simcall);
612 auto count = simcall_comm_testany__get__count(simcall);
613 auto element = std::find(comms, comms + count, synchro);
614 if (element == comms + count)
617 e.value = element - comms;
619 simcall->issuer->exception = std::make_exception_ptr(e);
626 if (simcall->issuer->host->isOff()) {
627 simcall->issuer->context->iwannadie = 1;
630 simcall->issuer->waiting_synchro = nullptr;
631 simcall->issuer->comms.remove(synchro);
633 if(simcall->issuer == comm->src_proc){
635 comm->dst_proc->comms.remove(synchro);
637 else if(simcall->issuer == comm->dst_proc){
639 comm->src_proc->comms.remove(synchro);
642 comm->dst_proc->comms.remove(synchro);
643 comm->src_proc->comms.remove(synchro);
645 //in case of a detached comm we have an extra ref to remove, as the sender won't do it
649 SIMIX_simcall_answer(simcall);
653 while (destroy_count-- > 0)
654 static_cast<simgrid::kernel::activity::Comm*>(synchro)->unref();
657 /******************************************************************************/
658 /* SIMIX_comm_copy_data callbacks */
659 /******************************************************************************/
660 static void (*SIMIX_comm_copy_data_callback) (smx_activity_t, void*, size_t) = &SIMIX_comm_copy_pointer_callback;
662 void SIMIX_comm_set_copy_data_callback(void (*callback) (smx_activity_t, void*, size_t))
664 SIMIX_comm_copy_data_callback = callback;
667 void SIMIX_comm_copy_pointer_callback(smx_activity_t synchro, void* buff, size_t buff_size)
669 simgrid::kernel::activity::Comm *comm = static_cast<simgrid::kernel::activity::Comm*>(synchro);
671 xbt_assert((buff_size == sizeof(void *)), "Cannot copy %zu bytes: must be sizeof(void*)", buff_size);
672 *(void **) (comm->dst_buff) = buff;
675 void SIMIX_comm_copy_buffer_callback(smx_activity_t synchro, void* buff, size_t buff_size)
677 simgrid::kernel::activity::Comm *comm = static_cast<simgrid::kernel::activity::Comm*>(synchro);
679 XBT_DEBUG("Copy the data over");
680 memcpy(comm->dst_buff, buff, buff_size);
681 if (comm->detached) { // if this is a detached send, the source buffer was duplicated by SMPI sender to make the original buffer available to the application ASAP
683 comm->src_buff = nullptr;
689 * @brief Copy the communication data from the sender's buffer to the receiver's one
690 * @param synchro The communication
692 void SIMIX_comm_copy_data(smx_activity_t synchro)
694 simgrid::kernel::activity::Comm *comm = static_cast<simgrid::kernel::activity::Comm*>(synchro);
696 size_t buff_size = comm->src_buff_size;
697 /* If there is no data to copy then return */
698 if (not comm->src_buff || not comm->dst_buff || comm->copied)
701 XBT_DEBUG("Copying comm %p data from %s (%p) -> %s (%p) (%zu bytes)", comm,
702 comm->src_proc ? comm->src_proc->host->cname() : "a finished process", comm->src_buff,
703 comm->dst_proc ? comm->dst_proc->host->cname() : "a finished process", comm->dst_buff, buff_size);
705 /* Copy at most dst_buff_size bytes of the message to receiver's buffer */
706 if (comm->dst_buff_size)
707 buff_size = MIN(buff_size, *(comm->dst_buff_size));
709 /* Update the receiver's buffer size to the copied amount */
710 if (comm->dst_buff_size)
711 *comm->dst_buff_size = buff_size;
714 if(comm->copy_data_fun)
715 comm->copy_data_fun (comm, comm->src_buff, buff_size);
717 SIMIX_comm_copy_data_callback (comm, comm->src_buff, buff_size);
721 /* Set the copied flag so we copy data only once */
722 /* (this function might be called from both communication ends) */