1 /* Copyright (c) 2009-2016. The SimGrid Team. All rights reserved. */
3 /* This program is free software; you can redistribute it and/or modify it
4 * under the terms of the license (GNU LGPL) which comes with this package. */
8 #include <boost/range/algorithm.hpp>
12 #include <simgrid/s4u/host.hpp>
15 #include "simgrid/s4u/Mailbox.hpp"
16 #include "src/mc/mc_replay.h"
17 #include "src/simix/smx_private.h"
18 #include "src/surf/cpu_interface.hpp"
19 #include "src/surf/surf_interface.hpp"
23 #include "src/kernel/activity/SynchroComm.hpp"
24 #include "src/surf/network_interface.hpp"
/* Logging sub-category used by every XBT_DEBUG call of this file. */
26 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(simix_network, simix, "SIMIX network-related synchronization");
/* Callback handed to xbt_dict_new_homogeneous() when the mailbox registry is built
 * (presumably invoked on each stored mailbox when it leaves the dictionary -- confirm). */
28 static void SIMIX_mbox_free(void *data);
/* File-wide registry of mailboxes; entries are stored and looked up by mailbox name. */
29 static xbt_dict_t mailboxes = xbt_dict_new_homogeneous(SIMIX_mbox_free);
/* Forward declarations of helpers defined further down in this file. */
31 static void SIMIX_waitany_remove_simcall_from_actions(smx_simcall_t simcall);
32 static void SIMIX_comm_copy_data(smx_activity_t comm);
33 static inline void SIMIX_mbox_push(smx_mailbox_t mbox, smx_activity_t comm);
34 static smx_activity_t _find_matching_comm(boost::circular_buffer_space_optimized<smx_activity_t> *deque, e_smx_comm_type_t type,
35 int (*match_fun)(void *, void *,smx_activity_t), void *user_data, smx_activity_t my_synchro, bool remove_matching);
36 static void SIMIX_comm_start(smx_activity_t synchro);
/* Tears down the file-wide mailbox registry by handing it to xbt_dict_free(). */
38 void SIMIX_mailbox_exit()
40 xbt_dict_free(&mailboxes);
43 /******************************************************************************/
44 /* Rendez-Vous Points */
45 /******************************************************************************/
/* Looks `name` up in the mailbox registry and registers a freshly allocated
 * simgrid::simix::Mailbox under the mailbox's own name.
 * \param name mailbox name; asserted to be non-null
 * NOTE(review): presumably the allocation/registration only runs when the lookup found
 * nothing, and the resulting mailbox is what gets returned -- confirm. */
47 smx_mailbox_t SIMIX_mbox_create(const char *name)
49 xbt_assert(name, "Mailboxes must have a name");
50 /* two processes may have pushed the same mbox_create simcall at the same time */
51 smx_mailbox_t mbox = static_cast<smx_mailbox_t>(xbt_dict_get_or_null(mailboxes, name));
53 mbox = new simgrid::simix::Mailbox(name);
54 XBT_DEBUG("Creating a mailbox at %p with name %s", mbox, name);
/* The key is the name held by the mailbox object, not the caller's `name` pointer. */
55 xbt_dict_set(mailboxes, mbox->name, mbox, nullptr);
/* Callback registered on the mailbox dictionary: receives a stored value as an untyped
 * pointer, logs it and casts it back to smx_mailbox_t.
 * \param data the dictionary value, i.e. a mailbox stored by SIMIX_mbox_create */
60 void SIMIX_mbox_free(void *data)
62 XBT_DEBUG("mbox free %p", data);
63 smx_mailbox_t mbox = static_cast<smx_mailbox_t>(data);
/* Returns whatever xbt_dict_get_or_null() yields for `name` in the mailbox registry,
 * cast to smx_mailbox_t. No mailbox is created by this function.
 * \param name name of the mailbox to look up */
67 smx_mailbox_t SIMIX_mbox_get_by_name(const char *name)
69 return static_cast<smx_mailbox_t>(xbt_dict_get_or_null(mailboxes, name));
73 * \brief set the receiver of the rendez vous point to allow eager sends
74 * \param mbox The rendez-vous point
75 * \param process The receiving process
/* Stores `process` as the permanent receiver of `mbox`. The send and receive paths of this
 * file test mbox->permanent_receiver to decide whether to go through done_comm_queue. */
77 void SIMIX_mbox_set_receiver(smx_mailbox_t mbox, smx_actor_t process)
79 mbox->permanent_receiver = process;
83 * \brief Pushes a communication synchro into a rendez-vous point
84 * \param mbox The mailbox
85 * \param synchro The communication synchro
/* Appends `synchro`, viewed as a simgrid::kernel::activity::Comm, at the back of
 * mbox->comm_queue. */
87 static inline void SIMIX_mbox_push(smx_mailbox_t mbox, smx_activity_t synchro)
89 simgrid::kernel::activity::Comm *comm = static_cast<simgrid::kernel::activity::Comm*>(synchro);
90 mbox->comm_queue.push_back(comm);
95 * \brief Removes a communication synchro from a rendez-vous point
96 * \param mbox The rendez-vous point
97 * \param synchro The communication synchro
/* Detaches `synchro` from `mbox`: clears the comm's mailbox pointer, then walks
 * mbox->comm_queue and erases an entry from it. The xbt_die() below reports a comm
 * that is not part of the mailbox. */
99 void SIMIX_mbox_remove(smx_mailbox_t mbox, smx_activity_t synchro)
101 simgrid::kernel::activity::Comm *comm = static_cast<simgrid::kernel::activity::Comm*>(synchro);
103 comm->mbox = nullptr;
104 for (auto it = mbox->comm_queue.begin(); it != mbox->comm_queue.end(); it++)
/* NOTE(review): erasing while iterating leaves `it` unusable; presumably the function
 * returns right after the erase -- confirm. */
106 mbox->comm_queue. erase(it);
109 xbt_die("Cannot remove the comm %p that is not part of the mailbox %s",comm, mbox->name);
113 * \brief Checks if there is a communication synchro queued in a deque matching our needs
114 * \param type The type of communication we are looking for (comm_send, comm_recv)
115 * \return The communication synchro if found, nullptr otherwise
/* Scans `deque` front to back for a queued communication compatible with the caller.
 * \param deque queue of pending communications to search
 * \param type communication type the candidate must have
 * \param match_fun optional caller-side filter, called as match_fun(this_user_data, other_user_data, candidate)
 * \param this_user_data caller-side data forwarded to both filters
 * \param my_synchro descriptor of the caller, handed to the candidate's own filter
 * \param remove_matching presumably whether a matched comm is taken out of `deque` -- confirm
 *        (the send/receive paths pass true, the probe path passes false)
 * A candidate is accepted only when its type equals `type` and every filter that is set
 * (the caller's and the candidate's own) returns non-zero. */
117 static smx_activity_t _find_matching_comm(boost::circular_buffer_space_optimized<smx_activity_t> *deque, e_smx_comm_type_t type,
118 int (*match_fun)(void *, void *,smx_activity_t), void *this_user_data, smx_activity_t my_synchro, bool remove_matching)
120 void* other_user_data = nullptr;
122 for(auto it = deque->begin(); it != deque->end(); it++){
123 smx_activity_t synchro = *it;
124 simgrid::kernel::activity::Comm *comm = static_cast<simgrid::kernel::activity::Comm*>(synchro);
/* Pick the user data registered by the candidate's own side (sender or receiver). */
126 if (comm->type == SIMIX_COMM_SEND) {
127 other_user_data = comm->src_data;
128 } else if (comm->type == SIMIX_COMM_RECEIVE) {
129 other_user_data = comm->dst_data;
/* The candidate's filter is called with the two user-data arguments swapped. */
131 if (comm->type == type &&
132 (! match_fun || match_fun(this_user_data, other_user_data, synchro)) &&
133 (!comm->match_fun || comm->match_fun(other_user_data, this_user_data, my_synchro))) {
134 XBT_DEBUG("Found a matching communication synchro %p", comm);
/* Keep a copy of the mailbox pointer in mbox_cpy before detaching the comm from it. */
139 comm->mbox_cpy = comm->mbox;
141 comm->mbox = nullptr;
144 XBT_DEBUG("Sorry, communication synchro %p does not match our needs:"
145 " its type is %d but we are looking for a comm of type %d (or maybe the filtering didn't match)",
146 comm, (int)comm->type, (int)type);
148 XBT_DEBUG("No matching communication synchro found");
152 /******************************************************************************/
153 /* Communication synchros */
154 /******************************************************************************/
/* Blocking send: posts the send through simcall_HANDLER_comm_isend() (with no clean_fun),
 * sets the simcall's MC value to 0, then waits on the resulting comm with `timeout`
 * through simcall_HANDLER_comm_wait(). */
155 XBT_PRIVATE void simcall_HANDLER_comm_send(smx_simcall_t simcall, smx_actor_t src, smx_mailbox_t mbox,
156 double task_size, double rate,
157 void *src_buff, size_t src_buff_size,
158 int (*match_fun)(void *, void *,smx_activity_t),
159 void (*copy_data_fun)(smx_activity_t, void*, size_t),
160 void *data, double timeout){
161 smx_activity_t comm = simcall_HANDLER_comm_isend(simcall, src, mbox, task_size, rate,
162 src_buff, src_buff_size, match_fun, nullptr, copy_data_fun,
164 SIMCALL_SET_MC_VALUE(simcall, 0);
165 simcall_HANDLER_comm_wait(simcall, comm, timeout);
/* Posts an asynchronous send on `mbox` on behalf of `src_proc`.
 * A pending receive is searched in mbox->comm_queue; when none matches, the freshly created
 * send descriptor becomes the communication and is queued (in done_comm_queue when the mailbox
 * has a permanent receiver). The sender-side fields (src_proc, task_size, rate, src_buff,
 * src_buff_size, src_data, match_fun, copy_data_fun) are then filled in.
 * \return nullptr for a detached send, the communication otherwise */
167 XBT_PRIVATE smx_activity_t simcall_HANDLER_comm_isend(smx_simcall_t simcall, smx_actor_t src_proc, smx_mailbox_t mbox,
168 double task_size, double rate,
169 void *src_buff, size_t src_buff_size,
170 int (*match_fun)(void *, void *,smx_activity_t),
171 void (*clean_fun)(void *), // used to free the synchro in case of problem after a detached send
172 void (*copy_data_fun)(smx_activity_t, void*, size_t),// used to copy data if not default one
173 void *data, int detached)
175 XBT_DEBUG("send from %p", mbox);
177 /* Prepare a synchro describing us, so that it gets passed to the user-provided filter of other side */
178 simgrid::kernel::activity::Comm* this_synchro = new simgrid::kernel::activity::Comm(SIMIX_COMM_SEND);
180 /* Look for communication synchro matching our needs. We also provide a description of
181 * ourself so that the other side also gets a chance of choosing if it wants to match with us.
183 * If it is not found then push our communication into the rendez-vous point */
184 smx_activity_t other_synchro =
185 _find_matching_comm(&mbox->comm_queue, SIMIX_COMM_RECEIVE, match_fun, data, this_synchro, /*remove_matching*/true);
186 simgrid::kernel::activity::Comm *other_comm = static_cast<simgrid::kernel::activity::Comm*>(other_synchro);
/* No receive matched: the descriptor created above becomes the communication itself. */
189 if (!other_synchro) {
190 other_synchro = this_synchro;
191 other_comm = static_cast<simgrid::kernel::activity::Comm*>(other_synchro);
193 if (mbox->permanent_receiver!=nullptr){
194 //this mailbox is for small messages, which have to be sent right now
195 other_synchro->state = SIMIX_READY;
196 other_comm->dst_proc=mbox->permanent_receiver.get();
198 mbox->done_comm_queue.push_back(other_synchro);
/* NOTE(review): the second %p receives &(other_comm), i.e. the address of the local pointer
 * variable rather than the comm it points to -- confirm whether other_comm was intended. */
199 XBT_DEBUG("pushing a message into the permanent receive fifo %p, comm %p", mbox, &(other_comm));
202 SIMIX_mbox_push(mbox, this_synchro);
205 XBT_DEBUG("Receive already pushed");
/* A queued receive matched: drop our own descriptor and reuse the receiver's comm. */
206 this_synchro->unref();
208 other_comm->state = SIMIX_READY;
209 other_comm->type = SIMIX_COMM_READY;
/* Record the communication in the sender's own list of comms. */
212 xbt_fifo_push(src_proc->comms, other_synchro);
/* clean_fun is kept only together with the detached flag; the other branch resets it
 * (presumably selected by the `detached` parameter -- confirm). */
216 other_comm->detached = true;
217 other_comm->clean_fun = clean_fun;
219 other_comm->clean_fun = nullptr;
222 /* Setup the communication synchro */
223 other_comm->src_proc = src_proc;
224 other_comm->task_size = task_size;
225 other_comm->rate = rate;
226 other_comm->src_buff = src_buff;
227 other_comm->src_buff_size = src_buff_size;
228 other_comm->src_data = data;
230 other_comm->match_fun = match_fun;
231 other_comm->copy_data_fun = copy_data_fun;
/* Under model checking or record/replay the comm is flagged RUNNING and returned
 * without going through SIMIX_comm_start(). */
234 if (MC_is_active() || MC_record_replay_is_active()) {
235 other_comm->state = SIMIX_RUNNING;
236 return (detached ? nullptr : other_comm);
239 SIMIX_comm_start(other_comm);
240 return (detached ? nullptr : other_comm);
/* Blocking receive: posts the receive through SIMIX_comm_irecv(), sets the simcall's
 * MC value to 0, then waits on the resulting comm with `timeout` through
 * simcall_HANDLER_comm_wait(). */
243 XBT_PRIVATE void simcall_HANDLER_comm_recv(smx_simcall_t simcall, smx_actor_t receiver, smx_mailbox_t mbox,
244 void *dst_buff, size_t *dst_buff_size,
245 int (*match_fun)(void *, void *, smx_activity_t),
246 void (*copy_data_fun)(smx_activity_t, void*, size_t),
247 void *data, double timeout, double rate)
249 smx_activity_t comm = SIMIX_comm_irecv(receiver, mbox, dst_buff, dst_buff_size, match_fun, copy_data_fun, data, rate);
250 SIMCALL_SET_MC_VALUE(simcall, 0);
251 simcall_HANDLER_comm_wait(simcall, comm, timeout);
/* Simcall entry point for an asynchronous receive: forwards every argument except
 * `simcall` to SIMIX_comm_irecv() and returns its result. */
254 XBT_PRIVATE smx_activity_t simcall_HANDLER_comm_irecv(smx_simcall_t simcall, smx_actor_t receiver, smx_mailbox_t mbox,
255 void *dst_buff, size_t *dst_buff_size,
256 int (*match_fun)(void *, void *, smx_activity_t),
257 void (*copy_data_fun)(smx_activity_t, void*, size_t),
258 void *data, double rate)
260 return SIMIX_comm_irecv(receiver, mbox, dst_buff, dst_buff_size, match_fun, copy_data_fun, data, rate);
/* Posts an asynchronous receive on `mbox` on behalf of `dst_proc`.
 * When the mailbox has a permanent receiver and done_comm_queue is not empty, a matching send
 * is searched there; the other path searches comm_queue for a pending send. When nothing
 * matches, the receive descriptor itself is pushed into the mailbox.
 * The receiver-side fields (dst_proc, dst_buff, dst_buff_size, dst_data, match_fun,
 * copy_data_fun) are then filled in, and the comm's rate is replaced by `rate` when `rate`
 * is above -1 and either the comm has a negative rate or `rate` is smaller.
 * \return the communication synchro now carrying this receive */
263 smx_activity_t SIMIX_comm_irecv(smx_actor_t dst_proc, smx_mailbox_t mbox, void *dst_buff, size_t *dst_buff_size,
264 int (*match_fun)(void *, void *, smx_activity_t),
265 void (*copy_data_fun)(smx_activity_t, void*, size_t), // used to copy data if not default one
266 void *data, double rate)
268 XBT_DEBUG("recv from %p %p", mbox, &mbox->comm_queue);
269 simgrid::kernel::activity::Comm* this_synchro = new simgrid::kernel::activity::Comm(SIMIX_COMM_RECEIVE);
271 smx_activity_t other_synchro;
272 //communication already done, get it inside the fifo of completed comms
273 if (mbox->permanent_receiver != nullptr && ! mbox->done_comm_queue.empty()) {
275 XBT_DEBUG("We have a comm that has probably already been received, trying to match it, to skip the communication");
276 //find a match in the already received fifo
277 other_synchro = _find_matching_comm(&mbox->done_comm_queue, SIMIX_COMM_SEND, match_fun, data, this_synchro,/*remove_matching*/true);
278 //if not found, assume the receiver came first, register it to the mailbox in the classical way
279 if (!other_synchro) {
280 XBT_DEBUG("We have messages in the permanent receive list, but not the one we are looking for, pushing request into fifo");
281 other_synchro = this_synchro;
282 SIMIX_mbox_push(mbox, this_synchro);
284 simgrid::kernel::activity::Comm *other_comm = static_cast<simgrid::kernel::activity::Comm*>(other_synchro);
/* A surf action exists and (almost) nothing remains to transfer: mark the comm as done. */
286 if(other_comm->surf_comm && other_comm->remains() < 1e-12) {
287 XBT_DEBUG("comm %p has been already sent, and is finished, destroy it",other_comm);
288 other_comm->state = SIMIX_DONE;
289 other_comm->type = SIMIX_COMM_DONE;
290 other_comm->mbox = nullptr;
/* The matched send is used from here on, so our own descriptor is released. */
293 static_cast<simgrid::kernel::activity::Comm*>(this_synchro)->unref();
296 /* Prepare a synchro describing us, so that it gets passed to the user-provided filter of other side */
298 /* Look for communication synchro matching our needs. We also provide a description of
299 * ourself so that the other side also gets a chance of choosing if it wants to match with us.
301 * If it is not found then push our communication into the rendez-vous point */
302 other_synchro = _find_matching_comm(&mbox->comm_queue, SIMIX_COMM_SEND, match_fun, data, this_synchro,/*remove_matching*/true);
304 if (!other_synchro) {
305 XBT_DEBUG("Receive pushed first %zu", mbox->comm_queue.size());
306 other_synchro = this_synchro;
307 SIMIX_mbox_push(mbox, this_synchro);
/* A queued send matched: release our descriptor and mark the sender's comm as ready. */
309 this_synchro->unref();
310 simgrid::kernel::activity::Comm *other_comm = static_cast<simgrid::kernel::activity::Comm*>(other_synchro);
312 other_comm->state = SIMIX_READY;
313 other_comm->type = SIMIX_COMM_READY;
/* Record the communication in the receiver's own list of comms. */
315 xbt_fifo_push(dst_proc->comms, other_synchro);
318 /* Setup communication synchro */
319 simgrid::kernel::activity::Comm *other_comm = static_cast<simgrid::kernel::activity::Comm*>(other_synchro);
320 other_comm->dst_proc = dst_proc;
321 other_comm->dst_buff = dst_buff;
322 other_comm->dst_buff_size = dst_buff_size;
323 other_comm->dst_data = data;
325 if (rate > -1.0 && (other_comm->rate < 0.0 || rate < other_comm->rate))
326 other_comm->rate = rate;
328 other_comm->match_fun = match_fun;
329 other_comm->copy_data_fun = copy_data_fun;
/* Under model checking or record/replay the comm is flagged RUNNING and returned
 * without going through SIMIX_comm_start(). */
331 if (MC_is_active() || MC_record_replay_is_active()) {
332 other_synchro->state = SIMIX_RUNNING;
333 return other_synchro;
336 SIMIX_comm_start(other_synchro);
337 return other_synchro;
/* Simcall entry point for a probe: forwards to SIMIX_comm_iprobe() with the simcall's
 * issuer as the probing process and returns its result. */
340 smx_activity_t simcall_HANDLER_comm_iprobe(smx_simcall_t simcall, smx_mailbox_t mbox,
341 int type, int src, int tag,
342 int (*match_fun)(void *, void *, smx_activity_t),
344 return SIMIX_comm_iprobe(simcall->issuer, mbox, type, src, tag, match_fun, data);
/* Probes `mbox` for a pending communication without consuming it (both searches pass
 * remove_matching=false). A temporary Comm describing the prober is built with the type
 * opposite to the one being looked for. When the mailbox has a permanent receiver and a
 * non-empty done_comm_queue, that queue is checked first; comm_queue is the other place searched.
 * \return the communication found by the search, as stored in other_synchro */
347 smx_activity_t SIMIX_comm_iprobe(smx_actor_t dst_proc, smx_mailbox_t mbox, int type, int src,
348 int tag, int (*match_fun)(void *, void *, smx_activity_t), void *data)
350 XBT_DEBUG("iprobe from %p %p", mbox, &mbox->comm_queue);
351 simgrid::kernel::activity::Comm* this_comm;
/* Probing as a sender means looking for receives, and the other way round. */
354 this_comm = new simgrid::kernel::activity::Comm(SIMIX_COMM_SEND);
355 smx_type = SIMIX_COMM_RECEIVE;
357 this_comm = new simgrid::kernel::activity::Comm(SIMIX_COMM_RECEIVE);
358 smx_type = SIMIX_COMM_SEND;
360 smx_activity_t other_synchro=nullptr;
361 if (mbox->permanent_receiver != nullptr && !mbox->done_comm_queue.empty()) {
362 XBT_DEBUG("first check in the permanent recv mailbox, to see if we already got something");
363 other_synchro = _find_matching_comm(&mbox->done_comm_queue,
364 (e_smx_comm_type_t) smx_type, match_fun, data, this_comm,/*remove_matching*/false);
367 XBT_DEBUG("check if we have more luck in the normal mailbox");
368 other_synchro = _find_matching_comm(&mbox->comm_queue,
369 (e_smx_comm_type_t) smx_type, match_fun, data, this_comm,/*remove_matching*/false);
/* NOTE(review): presumably guarded by a null check on other_synchro -- confirm. */
373 other_synchro->unref();
376 return other_synchro;
/* Makes the issuer of `simcall` wait on `synchro`.
 * The simcall is queued on the synchro and the issuer's waiting_synchro is set.
 * Under model checking or record/replay the outcome is decided on the spot from the value read
 * off the simcall: the synchro is set to DONE, or to a SRC/DST timeout depending on whether the
 * issuer is the comm's source, and then finished.
 * Otherwise a synchro that is neither WAITING nor RUNNING is finished immediately; else a sleep
 * action of `timeout` is created on the issuer's host CPU and stored as the comm's src_timeout
 * or dst_timeout. */
379 void simcall_HANDLER_comm_wait(smx_simcall_t simcall, smx_activity_t synchro, double timeout)
381 /* Associate this simcall to the wait synchro */
382 XBT_DEBUG("simcall_HANDLER_comm_wait, %p", synchro);
384 synchro->simcalls.push_back(simcall);
385 simcall->issuer->waiting_synchro = synchro;
387 if (MC_is_active() || MC_record_replay_is_active()) {
388 int idx = SIMCALL_GET_MC_VALUE(simcall);
390 synchro->state = SIMIX_DONE;
392 /* If we reached this point, the wait simcall must have a timeout */
393 /* Otherwise it shouldn't be enabled and executed by the MC */
397 simgrid::kernel::activity::Comm *comm = static_cast<simgrid::kernel::activity::Comm*>(synchro);
398 if (comm->src_proc == simcall->issuer)
399 comm->state = SIMIX_SRC_TIMEOUT;
401 comm->state = SIMIX_DST_TIMEOUT;
404 SIMIX_comm_finish(synchro);
408 /* If the synchro has already finished, perform the error handling, */
409 /* otherwise set up a waiting timeout on the right side */
410 if (synchro->state != SIMIX_WAITING && synchro->state != SIMIX_RUNNING) {
411 SIMIX_comm_finish(synchro);
412 } else { /* if (timeout >= 0) { we need a surf sleep action even when there is no timeout, otherwise surf won't tell us when the host fails */
413 surf_action_t sleep = simcall->issuer->host->pimpl_cpu->sleep(timeout);
414 sleep->setData(synchro);
/* Attach the sleep action to the side of the comm the issuer is on. */
416 simgrid::kernel::activity::Comm *comm = static_cast<simgrid::kernel::activity::Comm*>(synchro);
417 if (simcall->issuer == comm->src_proc)
418 comm->src_timeout = sleep;
420 comm->dst_timeout = sleep;
/* Non-blocking test of `synchro`.
 * The simcall result is set to whether the comm counts as complete: under model checking or
 * record/replay that means both src_proc and dst_proc are set; otherwise it means the state is
 * neither WAITING nor RUNNING. On a positive result the simcall is queued on the synchro and
 * SIMIX_comm_finish() is called; SIMIX_simcall_answer() is the call made on the other branch. */
424 void simcall_HANDLER_comm_test(smx_simcall_t simcall, smx_activity_t synchro)
426 simgrid::kernel::activity::Comm *comm = static_cast<simgrid::kernel::activity::Comm*>(synchro);
428 if (MC_is_active() || MC_record_replay_is_active()){
429 simcall_comm_test__set__result(simcall, comm->src_proc && comm->dst_proc);
430 if (simcall_comm_test__get__result(simcall)){
/* In this mode the comm is forced to DONE before being finished. */
431 synchro->state = SIMIX_DONE;
432 synchro->simcalls.push_back(simcall);
433 SIMIX_comm_finish(synchro);
435 SIMIX_simcall_answer(simcall);
440 simcall_comm_test__set__result(simcall, (synchro->state != SIMIX_WAITING && synchro->state != SIMIX_RUNNING));
441 if (simcall_comm_test__get__result(simcall)) {
442 synchro->simcalls.push_back(simcall);
443 SIMIX_comm_finish(synchro);
445 SIMIX_simcall_answer(simcall);
/* Non-blocking test over the `count` activities of `comms`.
 * The result defaults to -1. Under model checking or record/replay the index comes from the
 * simcall's MC value: comms[idx] is forced to DONE, finished, and idx becomes the result.
 * Otherwise the array is scanned in order and a comm whose state is neither WAITING nor RUNNING
 * gets its index stored as the result and is finished through SIMIX_comm_finish(). */
449 void simcall_HANDLER_comm_testany(
450 smx_simcall_t simcall, simgrid::kernel::activity::ActivityImpl* comms[], size_t count)
452 // The default result is -1 -- this means, "nothing is ready".
453 // It can be changed below, but only if something matches.
454 simcall_comm_testany__set__result(simcall, -1);
456 if (MC_is_active() || MC_record_replay_is_active()){
457 int idx = SIMCALL_GET_MC_VALUE(simcall);
459 SIMIX_simcall_answer(simcall);
461 simgrid::kernel::activity::ActivityImpl* synchro = comms[idx];
462 simcall_comm_testany__set__result(simcall, idx);
463 synchro->simcalls.push_back(simcall);
464 synchro->state = SIMIX_DONE;
465 SIMIX_comm_finish(synchro);
470 for (std::size_t i = 0; i != count; ++i) {
471 simgrid::kernel::activity::ActivityImpl* synchro = comms[i];
472 if (synchro->state != SIMIX_WAITING && synchro->state != SIMIX_RUNNING) {
473 simcall_comm_testany__set__result(simcall, i);
474 synchro->simcalls.push_back(simcall);
475 SIMIX_comm_finish(synchro);
/* Reached with the default -1 result still in place when the scan found nothing
 * (presumably the scan leaves the function on its first hit -- confirm). */
479 SIMIX_simcall_answer(simcall);
/* Makes the issuer of `simcall` wait until one of the activities in `synchros` completes.
 * Under model checking or record/replay timeouts are rejected (see the xbt_die message); the
 * activity at the index read from the simcall's MC value is forced to DONE and finished, and
 * that index becomes the result.
 * Otherwise simcall->timer is either cleared or armed at SIMIX_get_clock() + timeout; when the
 * timer fires, the simcall is withdrawn from every activity, its result is set to -1 and it is
 * answered. The simcall is then queued on every activity, and any activity whose state is
 * neither WAITING nor RUNNING is finished right away. */
482 void simcall_HANDLER_comm_waitany(smx_simcall_t simcall, xbt_dynar_t synchros, double timeout)
484 smx_activity_t synchro;
485 unsigned int cursor = 0;
487 if (MC_is_active() || MC_record_replay_is_active()){
489 xbt_die("Timeout not implemented for waitany in the model-checker");
490 int idx = SIMCALL_GET_MC_VALUE(simcall);
491 synchro = xbt_dynar_get_as(synchros, idx, smx_activity_t);
492 synchro->simcalls.push_back(simcall);
493 simcall_comm_waitany__set__result(simcall, idx);
494 synchro->state = SIMIX_DONE;
495 SIMIX_comm_finish(synchro);
/* NOTE(review): NULL is used here while the rest of the file uses nullptr. */
500 simcall->timer = NULL;
/* The lambda captures `simcall` by value (a pointer) and runs when the timer expires. */
502 simcall->timer = SIMIX_timer_set(SIMIX_get_clock() + timeout, [simcall]() {
503 SIMIX_waitany_remove_simcall_from_actions(simcall);
504 simcall_comm_waitany__set__result(simcall, -1);
505 SIMIX_simcall_answer(simcall);
509 xbt_dynar_foreach(synchros, cursor, synchro){
510 /* associate this simcall to the synchro */
511 synchro->simcalls.push_back(simcall);
513 /* see if the synchro is already finished */
514 if (synchro->state != SIMIX_WAITING && synchro->state != SIMIX_RUNNING){
515 SIMIX_comm_finish(synchro);
/* Withdraws a waitany `simcall` from the activities it was queued on: for every activity in
 * the simcall's comms dynar, the first entry equal to `simcall` in its simcalls list is erased
 * (activities that do not hold it are left untouched). */
521 void SIMIX_waitany_remove_simcall_from_actions(smx_simcall_t simcall)
523 smx_activity_t synchro;
524 unsigned int cursor = 0;
525 xbt_dynar_t synchros = simcall_comm_waitany__get__comms(simcall);
527 xbt_dynar_foreach(synchros, cursor, synchro) {
528 // Remove the first occurrence of simcall:
529 auto i = boost::range::find(synchro->simcalls, simcall);
530 if (i != synchro->simcalls.end())
531 synchro->simcalls.erase(i);
536 * \brief Starts the simulation of a communication synchro.
537 * \param synchro the communication synchro
/* Starts the network transfer of `synchro` once it is in the READY state: a surf communication
 * is created between the hosts of src_proc and dst_proc with the comm's task_size and rate,
 * and the comm is flagged RUNNING. A surf action that is already in the failed state turns the
 * comm into SIMIX_LINK_FAILURE; a suspended source or destination process gets the surf action
 * suspended right away. */
539 static inline void SIMIX_comm_start(smx_activity_t synchro)
541 simgrid::kernel::activity::Comm *comm = static_cast<simgrid::kernel::activity::Comm*>(synchro);
543 /* If both the sender and the receiver are already there, start the communication */
544 if (synchro->state == SIMIX_READY) {
546 simgrid::s4u::Host* sender = comm->src_proc->host;
547 simgrid::s4u::Host* receiver = comm->dst_proc->host;
549 XBT_DEBUG("Starting communication %p from '%s' to '%s'", synchro, sender->cname(), receiver->cname());
551 comm->surf_comm = surf_network_model->communicate(sender, receiver, comm->task_size, comm->rate);
/* Link the surf action back to this synchro. */
552 comm->surf_comm->setData(synchro);
553 comm->state = SIMIX_RUNNING;
555 /* If a link is failed, detect it immediately */
556 if (comm->surf_comm->getState() == simgrid::surf::Action::State::failed) {
557 XBT_DEBUG("Communication from '%s' to '%s' failed to start because of a link failure", sender->cname(),
559 comm->state = SIMIX_LINK_FAILURE;
563 /* If any of the processes is suspended, create the synchro but stop its execution,
564 it will be restarted when the sender process resumes */
565 if (SIMIX_process_is_suspended(comm->src_proc) || SIMIX_process_is_suspended(comm->dst_proc)) {
566 if (SIMIX_process_is_suspended(comm->src_proc))
567 XBT_DEBUG("The communication is suspended on startup because src (%s@%s) was suspended since it initiated the "
569 comm->src_proc->cname(), comm->src_proc->host->cname());
571 XBT_DEBUG("The communication is suspended on startup because dst (%s@%s) was suspended since it initiated the "
573 comm->dst_proc->cname(), comm->dst_proc->host->cname());
575 comm->surf_comm->suspend();
581 * \brief Answers the SIMIX simcalls associated to a communication synchro.
582 * \param synchro a finished communication synchro
/* Drains synchro->simcalls. For each pending simcall this:
 *  - skips simcalls whose call is SIMCALL_NONE;
 *  - for a waitany, withdraws the simcall from the other activities, cancels its timer and
 *    (outside model checking / replay) stores this synchro's position in the waitany dynar as result;
 *  - removes the synchro from its mailbox through SIMIX_mbox_remove();
 *  - translates the synchro state into either a data copy (SIMIX_comm_copy_data) or an
 *    exception posted on the issuer, and marks issuers on a failed host with iwannadie;
 *  - drops the synchro from the comm lists of the actors involved and answers the simcall.
 * Finally the comm is unref()'d destroy_count times. */
584 void SIMIX_comm_finish(smx_activity_t synchro)
586 simgrid::kernel::activity::Comm *comm = static_cast<simgrid::kernel::activity::Comm*>(synchro);
587 unsigned int destroy_count = 0;
589 while (!synchro->simcalls.empty()) {
590 smx_simcall_t simcall = synchro->simcalls.front();
591 synchro->simcalls.pop_front();
593 /* If a waitany simcall is waiting for this synchro to finish, then remove
594 it from the other synchros in the waitany list. Afterwards, get the
595 position of the actual synchro in the waitany dynar and
596 return it as the result of the simcall */
598 if (simcall->call == SIMCALL_NONE) //FIXME: maybe a better way to handle this case
599 continue; // if process handling comm is killed
600 if (simcall->call == SIMCALL_COMM_WAITANY) {
601 SIMIX_waitany_remove_simcall_from_actions(simcall);
602 if (simcall->timer) {
603 SIMIX_timer_remove(simcall->timer);
604 simcall->timer = nullptr;
606 if (!MC_is_active() && !MC_record_replay_is_active())
607 simcall_comm_waitany__set__result(simcall, xbt_dynar_search(simcall_comm_waitany__get__comms(simcall), &synchro));
610 /* If the synchro is still in a rendez-vous point then remove from it */
612 SIMIX_mbox_remove(comm->mbox, synchro);
614 XBT_DEBUG("SIMIX_comm_finish: synchro state = %d", (int)synchro->state);
616 /* Check out for errors */
618 if (simcall->issuer->host->isOff()) {
619 simcall->issuer->context->iwannadie = 1;
620 SMX_EXCEPTION(simcall->issuer, host_error, 0, "Host failed");
/* One case per final state of the communication. */
622 switch (synchro->state) {
625 XBT_DEBUG("Communication %p complete!", synchro);
626 SIMIX_comm_copy_data(synchro);
629 case SIMIX_SRC_TIMEOUT:
630 SMX_EXCEPTION(simcall->issuer, timeout_error, 0, "Communication timeouted because of sender");
633 case SIMIX_DST_TIMEOUT:
634 SMX_EXCEPTION(simcall->issuer, timeout_error, 0, "Communication timeouted because of receiver");
/* Host failures: the issuer on the failed side is flagged iwannadie, while
 * "Remote peer failed" is the network_error raised on the other branch. */
637 case SIMIX_SRC_HOST_FAILURE:
638 if (simcall->issuer == comm->src_proc)
639 simcall->issuer->context->iwannadie = 1;
640 // SMX_EXCEPTION(simcall->issuer, host_error, 0, "Host failed");
642 SMX_EXCEPTION(simcall->issuer, network_error, 0, "Remote peer failed");
645 case SIMIX_DST_HOST_FAILURE:
646 if (simcall->issuer == comm->dst_proc)
647 simcall->issuer->context->iwannadie = 1;
648 // SMX_EXCEPTION(simcall->issuer, host_error, 0, "Host failed");
650 SMX_EXCEPTION(simcall->issuer, network_error, 0, "Remote peer failed");
653 case SIMIX_LINK_FAILURE:
656 "Link failure in synchro %p between '%s' and '%s': posting an exception to the issuer: %s (%p) detached:%d",
657 synchro, comm->src_proc ? comm->src_proc->host->cname() : nullptr,
658 comm->dst_proc ? comm->dst_proc->host->cname() : nullptr, simcall->issuer->cname(), simcall->issuer,
660 if (comm->src_proc == simcall->issuer) {
661 XBT_DEBUG("I'm source");
662 } else if (comm->dst_proc == simcall->issuer) {
663 XBT_DEBUG("I'm dest");
665 XBT_DEBUG("I'm neither source nor dest");
667 SMX_EXCEPTION(simcall->issuer, network_error, 0, "Link failure");
/* Cancellation: the message names the opposite side as the one that canceled. */
671 if (simcall->issuer == comm->dst_proc)
672 SMX_EXCEPTION(simcall->issuer, cancel_error, 0, "Communication canceled by the sender");
674 SMX_EXCEPTION(simcall->issuer, cancel_error, 0, "Communication canceled by the receiver");
678 xbt_die("Unexpected synchro state in SIMIX_comm_finish: %d", (int)synchro->state);
682 /* if there is an exception during a waitany or a testany, indicate the position of the failed communication */
683 if (simcall->issuer->exception) {
684 // In order to modify the exception we have to rethrow it:
686 std::rethrow_exception(simcall->issuer->exception);
/* e.value receives the index of this synchro in the waitany dynar or the testany array,
 * then the modified exception is stored back on the issuer. */
689 if (simcall->call == SIMCALL_COMM_WAITANY) {
690 e.value = xbt_dynar_search(simcall_comm_waitany__get__comms(simcall), &synchro);
692 else if (simcall->call == SIMCALL_COMM_TESTANY) {
694 auto comms = simcall_comm_testany__get__comms(simcall);
695 auto count = simcall_comm_testany__get__count(simcall);
696 auto element = std::find(comms, comms + count, synchro);
697 if (element == comms + count)
700 e.value = element - comms;
702 simcall->issuer->exception = std::make_exception_ptr(e);
709 if (simcall->issuer->host->isOff()) {
710 simcall->issuer->context->iwannadie = 1;
/* The issuer no longer waits on this synchro; drop it from the comm lists of both ends. */
713 simcall->issuer->waiting_synchro = nullptr;
714 xbt_fifo_remove(simcall->issuer->comms, synchro);
716 if(simcall->issuer == comm->src_proc){
718 xbt_fifo_remove(comm->dst_proc->comms, synchro);
720 if(simcall->issuer == comm->dst_proc){
722 xbt_fifo_remove(comm->src_proc->comms, synchro);
723 //in case of a detached comm we have an extra ref to remove, as the sender won't do it
727 SIMIX_simcall_answer(simcall);
731 while (destroy_count-- > 0)
732 static_cast<simgrid::kernel::activity::Comm*>(synchro)->unref();
735 /******************************************************************************/
736 /* SIMIX_comm_copy_data callbacks */
737 /******************************************************************************/
/* File-wide default used to copy the payload of a communication that has no copy_data_fun
 * of its own; initially SIMIX_comm_copy_pointer_callback. */
738 static void (*SIMIX_comm_copy_data_callback) (smx_activity_t, void*, size_t) = &SIMIX_comm_copy_pointer_callback;
/* Replaces the file-wide default copy callback with `callback`. */
740 void SIMIX_comm_set_copy_data_callback(void (*callback) (smx_activity_t, void*, size_t))
742 SIMIX_comm_copy_data_callback = callback;
/* Copy callback that transfers a pointer rather than the pointed-to data: `buff` itself is
 * written into the location designated by comm->dst_buff.
 * \param synchro the communication whose destination buffer receives the pointer
 * \param buff the pointer value to hand over
 * \param buff_size asserted to be exactly sizeof(void*) */
745 void SIMIX_comm_copy_pointer_callback(smx_activity_t synchro, void* buff, size_t buff_size)
747 simgrid::kernel::activity::Comm *comm = static_cast<simgrid::kernel::activity::Comm*>(synchro);
749 xbt_assert((buff_size == sizeof(void *)), "Cannot copy %zu bytes: must be sizeof(void*)", buff_size);
750 *(void **) (comm->dst_buff) = buff;
/* Copy callback that transfers the data itself: `buff_size` bytes are memcpy'd from `buff`
 * into comm->dst_buff. For a detached comm, src_buff is reset to nullptr afterwards.
 * \param synchro the communication whose destination buffer is filled
 * \param buff source bytes
 * \param buff_size number of bytes to copy */
753 void SIMIX_comm_copy_buffer_callback(smx_activity_t synchro, void* buff, size_t buff_size)
755 simgrid::kernel::activity::Comm *comm = static_cast<simgrid::kernel::activity::Comm*>(synchro);
757 XBT_DEBUG("Copy the data over");
758 memcpy(comm->dst_buff, buff, buff_size);
759 if (comm->detached) { // if this is a detached send, the source buffer was duplicated by SMPI sender to make the original buffer available to the application ASAP
761 comm->src_buff = nullptr;
767 * @brief Copy the communication data from the sender's buffer to the receiver's one
768 * @param synchro The communication
770 void SIMIX_comm_copy_data(smx_activity_t synchro)
772 simgrid::kernel::activity::Comm *comm = static_cast<simgrid::kernel::activity::Comm*>(synchro);
774 size_t buff_size = comm->src_buff_size;
775 /* If there is no data to copy then return */
776 if (!comm->src_buff || !comm->dst_buff || comm->copied)
779 XBT_DEBUG("Copying comm %p data from %s (%p) -> %s (%p) (%zu bytes)", comm,
780 comm->src_proc ? comm->src_proc->host->cname() : "a finished process", comm->src_buff,
781 comm->dst_proc ? comm->dst_proc->host->cname() : "a finished process", comm->dst_buff, buff_size);
783 /* Copy at most dst_buff_size bytes of the message to receiver's buffer */
784 if (comm->dst_buff_size)
785 buff_size = MIN(buff_size, *(comm->dst_buff_size));
787 /* Update the receiver's buffer size to the copied amount */
788 if (comm->dst_buff_size)
789 *comm->dst_buff_size = buff_size;
792 if(comm->copy_data_fun)
793 comm->copy_data_fun (comm, comm->src_buff, buff_size);
795 SIMIX_comm_copy_data_callback (comm, comm->src_buff, buff_size);
799 /* Set the copied flag so we copy data only once */
800 /* (this function might be called from both communication ends) */