1 /* Copyright (c) 2009-2016. The SimGrid Team. All rights reserved. */
3 /* This program is free software; you can redistribute it and/or modify it
4 * under the terms of the license (GNU LGPL) which comes with this package. */
6 #include "src/surf/surf_interface.hpp"
7 #include "src/simix/smx_private.h"
10 #include "src/mc/mc_replay.h"
12 #include "simgrid/s4u/mailbox.hpp"
14 #include "src/simix/SynchroComm.hpp"
// Logging sub-category (simix_network, child of simix) declared as the default one for this file.
16 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(simix_network, simix, "SIMIX network-related synchronization");
// Forward declaration: SIMIX_mbox_free is defined below and is handed to the dictionary constructor
// on the next line (presumably as the per-value destructor -- confirm against the xbt_dict API).
18 static void SIMIX_mbox_free(void *data);
// File-wide registry of mailboxes; entries are stored and looked up by mailbox name
// (see SIMIX_mbox_create and SIMIX_mbox_get_by_name).
19 static xbt_dict_t mailboxes = xbt_dict_new_homogeneous(SIMIX_mbox_free);
// Forward declarations of helpers defined further down in this file.
21 static void SIMIX_waitany_remove_simcall_from_actions(smx_simcall_t simcall);
22 static void SIMIX_comm_copy_data(smx_synchro_t comm);
23 static inline void SIMIX_mbox_push(smx_mailbox_t mbox, smx_synchro_t comm);
24 static smx_synchro_t _find_matching_comm(std::deque<smx_synchro_t> *deque, e_smx_comm_type_t type,
25 int (*match_fun)(void *, void *,smx_synchro_t), void *user_data, smx_synchro_t my_synchro, bool remove_matching);
26 static void SIMIX_comm_start(smx_synchro_t synchro);
28 void SIMIX_mailbox_exit(void)
30 xbt_dict_free(&mailboxes);
33 /******************************************************************************/
34 /* Rendez-Vous Points */
35 /******************************************************************************/
// Creates the mailbox called `name` and registers it in the `mailboxes` dictionary.
// `name` must be non-NULL (asserted below).
// NOTE(review): an existing mailbox is looked up first, but the guard that would skip the
// allocation when the lookup succeeds, and the final return statement, are not visible in
// this excerpt -- confirm against the complete file.
37 smx_mailbox_t SIMIX_mbox_create(const char *name)
39 xbt_assert(name, "Mailboxes must have a name");
40 /* two processes may have pushed the same mbox_create simcall at the same time */
41 smx_mailbox_t mbox = (smx_mailbox_t) xbt_dict_get_or_null(mailboxes, name);
// Build a fresh mailbox: zero-initialized struct, private copy of the name, empty pending queue.
44 mbox = xbt_new0(s_smx_mailbox_t, 1);
45 mbox->name = xbt_strdup(name);
46 mbox->comm_queue = new std::deque<smx_synchro_t>();
47 mbox->done_comm_queue = nullptr; // Allocated on need only
48 mbox->permanent_receiver=NULL;
50 XBT_DEBUG("Creating a mailbox at %p with name %s", mbox, name);
// Register under the mailbox's own copy of the name.
51 xbt_dict_set(mailboxes, mbox->name, mbox, NULL);
// Releases the resources of a mailbox; `data` is the mailbox passed as an untyped pointer.
// This is the function given to xbt_dict_new_homogeneous() when `mailboxes` is created.
// NOTE(review): the lines releasing the name and the struct itself are not visible in this
// excerpt -- confirm against the complete file.
56 void SIMIX_mbox_free(void *data)
58 XBT_DEBUG("mbox free %p", data);
59 smx_mailbox_t mbox = (smx_mailbox_t) data;
61 delete mbox->comm_queue;
// done_comm_queue is only allocated by SIMIX_mbox_set_receiver, so it may still be nullptr here;
// deleting a null pointer is a no-op in C++.
62 delete mbox->done_comm_queue;
67 smx_mailbox_t SIMIX_mbox_get_by_name(const char *name)
69 return (smx_mailbox_t) xbt_dict_get_or_null(mailboxes, name);
73 * \brief set the receiver of the rendez vous point to allow eager sends
74 * \param mbox The rendez-vous point
75 * \param process The receiving process
77 void SIMIX_mbox_set_receiver(smx_mailbox_t mbox, smx_process_t process)
79 mbox->permanent_receiver=process;
80 if (mbox->done_comm_queue == nullptr)
81 mbox->done_comm_queue = new std::deque<smx_synchro_t>();
/**
 * \brief Pushes a communication synchro into a rendez-vous point
 * \param mbox The mailbox
 * \param synchro The communication synchro
 */
// Appends the communication behind `synchro` to the back of the mailbox's pending queue.
// NOTE(review): other code in this file reads comm->mbox to know which mailbox a comm sits in;
// the statement recording that link is not visible in this excerpt -- confirm it follows the push.
89 static inline void SIMIX_mbox_push(smx_mailbox_t mbox, smx_synchro_t synchro)
91 simgrid::simix::Comm *comm = static_cast<simgrid::simix::Comm*>(synchro);
93 mbox->comm_queue->push_back(comm);
/**
 * \brief Removes a communication synchro from a rendez-vous point
 * \param mbox The rendez-vous point
 * \param synchro The communication synchro
 */
// Removes the communication behind `synchro` from the pending queue of `mbox`.
// Aborts through xbt_die() when the comm is not part of that queue.
102 void SIMIX_mbox_remove(smx_mailbox_t mbox, smx_synchro_t synchro)
104 simgrid::simix::Comm *comm = static_cast<simgrid::simix::Comm*>(synchro);
// Linear scan of the pending queue.
// NOTE(review): the test selecting the matching element and the early exit after the erase are
// not visible in this excerpt; erase() invalidates `it`, so the loop must not continue after it.
107 for (auto it = mbox->comm_queue->begin(); it != mbox->comm_queue->end(); it++)
109 mbox->comm_queue->erase(it);
// Only meant to be reached when no element was erased.
112 xbt_die("Cannot remove this comm that is not part of the mailbox");
/**
 * \brief Checks if there is a communication synchro queued in a deque matching our needs
 * \param type The type of communication we are looking for (comm_send, comm_recv)
 * \return The communication synchro if found, NULL otherwise
 */
// Scans `deque` front to back for a queued communication of kind `type` that both sides accept.
//  - match_fun / this_user_data: the caller's filter (may be NULL) and the data handed to it
//  - my_synchro: synchro describing the caller, passed to the queued comm's own filter
//  - remove_matching: presumably controls whether the match is taken out of the deque; the lines
//    using it are not visible in this excerpt -- confirm against the complete file
120 static smx_synchro_t _find_matching_comm(std::deque<smx_synchro_t> *deque, e_smx_comm_type_t type,
121 int (*match_fun)(void *, void *,smx_synchro_t), void *this_user_data, smx_synchro_t my_synchro, bool remove_matching)
123 void* other_user_data = NULL;
125 for(auto it = deque->begin(); it != deque->end(); it++){
126 smx_synchro_t synchro = *it;
127 simgrid::simix::Comm *comm = static_cast<simgrid::simix::Comm*>(synchro);
// The user data of the queued comm lives on its own side: src_data for a send, dst_data for a receive.
129 if (comm->type == SIMIX_COMM_SEND) {
130 other_user_data = comm->src_data;
131 } else if (comm->type == SIMIX_COMM_RECEIVE) {
132 other_user_data = comm->dst_data;
// A match requires the right kind AND acceptance by both filters; an absent filter accepts everything.
134 if (comm->type == type &&
135 (! match_fun || match_fun(this_user_data, other_user_data, synchro)) &&
136 (!comm->match_fun || comm->match_fun(other_user_data, this_user_data, my_synchro))) {
137 XBT_DEBUG("Found a matching communication synchro %p", comm);
// Keep a copy of the comm's mailbox pointer in mbox_cpy.
142 comm->mbox_cpy = comm->mbox;
147 XBT_DEBUG("Sorry, communication synchro %p does not match our needs:"
148 " its type is %d but we are looking for a comm of type %d (or maybe the filtering didn't match)",
149 comm, (int)comm->type, (int)type);
151 XBT_DEBUG("No matching communication synchro found");
155 /******************************************************************************/
156 /* Communication synchros */
157 /******************************************************************************/
// Handler of the blocking send simcall: posts the send through simcall_HANDLER_comm_isend()
// (with a NULL clean_fun), then waits on the resulting synchro on behalf of the same simcall.
// NOTE(review): the last line of the isend argument list is not visible in this excerpt.
158 void simcall_HANDLER_comm_send(smx_simcall_t simcall, smx_process_t src, smx_mailbox_t mbox,
159 double task_size, double rate,
160 void *src_buff, size_t src_buff_size,
161 int (*match_fun)(void *, void *,smx_synchro_t),
162 void (*copy_data_fun)(smx_synchro_t, void*, size_t),
163 void *data, double timeout){
164 smx_synchro_t comm = simcall_HANDLER_comm_isend(simcall, src, mbox, task_size, rate,
165 src_buff, src_buff_size, match_fun, NULL, copy_data_fun,
167 SIMCALL_SET_MC_VALUE(simcall, 0);
// The wait handler either finishes the comm right away or arms the timeout.
168 simcall_HANDLER_comm_wait(simcall, comm, timeout);
// Handler of the asynchronous send simcall.
// Looks in the mailbox for a pending receive that matches; when there is none, the freshly
// created send synchro is queued instead. The sender-side fields of the retained synchro are
// then filled in and the communication is started (or only marked running under MC/replay).
// Returns the retained synchro, or NULL when `detached` is set.
// NOTE(review): several short lines (else branches, the `detached` test, closing braces) are not
// visible in this excerpt, so the exact branch boundaries must be checked in the complete file.
170 smx_synchro_t simcall_HANDLER_comm_isend(smx_simcall_t simcall, smx_process_t src_proc, smx_mailbox_t mbox,
171 double task_size, double rate,
172 void *src_buff, size_t src_buff_size,
173 int (*match_fun)(void *, void *,smx_synchro_t),
174 void (*clean_fun)(void *), // used to free the synchro in case of problem after a detached send
175 void (*copy_data_fun)(smx_synchro_t, void*, size_t),// used to copy data if not default one
176 void *data, int detached)
178 XBT_DEBUG("send from %p", mbox);
180 /* Prepare a synchro describing us, so that it gets passed to the user-provided filter of other side */
181 simgrid::simix::Comm* this_synchro = new simgrid::simix::Comm(SIMIX_COMM_SEND);
183 /* Look for communication synchro matching our needs. We also provide a description of
184 * ourself so that the other side also gets a chance of choosing if it wants to match with us.
186 * If it is not found then push our communication into the rendez-vous point */
187 smx_synchro_t other_synchro =
188 _find_matching_comm(mbox->comm_queue, SIMIX_COMM_RECEIVE, match_fun, data, this_synchro, /*remove_matching*/true);
189 simgrid::simix::Comm *other_comm = static_cast<simgrid::simix::Comm*>(other_synchro);
// No receive is waiting: from here on our own synchro is the one being set up.
192 if (!other_synchro) {
193 other_synchro = this_synchro;
194 other_comm = static_cast<simgrid::simix::Comm*>(other_synchro);
// With a permanent receiver the send is made ready at once and parked in the done queue.
196 if (mbox->permanent_receiver!=NULL){
197 //this mailbox is for small messages, which have to be sent right now
198 other_synchro->state = SIMIX_READY;
199 other_comm->dst_proc=mbox->permanent_receiver;
201 mbox->done_comm_queue->push_back(other_synchro);
202 other_comm->mbox=mbox;
// NOTE(review): &(other_comm) is the address of the local pointer variable, not of the comm itself.
203 XBT_DEBUG("pushing a message into the permanent receive fifo %p, comm %p", mbox, &(other_comm));
// Queue the send in the regular pending queue.
206 SIMIX_mbox_push(mbox, this_synchro);
// A matching receive was found: our own synchro is released and the receiver's one is reused.
209 XBT_DEBUG("Receive already pushed");
210 this_synchro->unref();
212 other_comm->state = SIMIX_READY;
213 other_comm->type = SIMIX_COMM_READY;
// Record the synchro in the sender's list of comms.
216 xbt_fifo_push(src_proc->comms, other_synchro);
218 /* if the communication synchro is detached then decrease the refcount
219 * by one, so it will be eliminated by the receiver's destroy call */
221 other_comm->detached = true;
223 other_comm->clean_fun = clean_fun;
225 other_comm->clean_fun = NULL;
228 /* Setup the communication synchro */
229 other_comm->src_proc = src_proc;
230 other_comm->task_size = task_size;
231 other_comm->rate = rate;
232 other_comm->src_buff = src_buff;
233 other_comm->src_buff_size = src_buff_size;
234 other_comm->src_data = data;
236 other_comm->match_fun = match_fun;
237 other_comm->copy_data_fun = copy_data_fun;
// Under model checking or record/replay, SIMIX_comm_start() is bypassed: the comm is only flagged running.
240 if (MC_is_active() || MC_record_replay_is_active()) {
241 other_comm->state = SIMIX_RUNNING;
242 return (detached ? NULL : other_comm);
245 SIMIX_comm_start(other_comm);
246 return (detached ? NULL : other_comm);
249 void simcall_HANDLER_comm_recv(smx_simcall_t simcall, smx_process_t receiver, smx_mailbox_t mbox,
250 void *dst_buff, size_t *dst_buff_size,
251 int (*match_fun)(void *, void *, smx_synchro_t),
252 void (*copy_data_fun)(smx_synchro_t, void*, size_t),
253 void *data, double timeout, double rate)
255 smx_synchro_t comm = SIMIX_comm_irecv(receiver, mbox, dst_buff, dst_buff_size, match_fun, copy_data_fun, data, rate);
256 SIMCALL_SET_MC_VALUE(simcall, 0);
257 simcall_HANDLER_comm_wait(simcall, comm, timeout);
260 smx_synchro_t simcall_HANDLER_comm_irecv(smx_simcall_t simcall, smx_process_t receiver, smx_mailbox_t mbox,
261 void *dst_buff, size_t *dst_buff_size,
262 int (*match_fun)(void *, void *, smx_synchro_t),
263 void (*copy_data_fun)(smx_synchro_t, void*, size_t),
264 void *data, double rate)
266 return SIMIX_comm_irecv(receiver, mbox, dst_buff, dst_buff_size, match_fun, copy_data_fun, data, rate);
// Posts an asynchronous receive on `mbox` for process `dst_proc`.
// Tries to pair the receive with a send: first in the done queue when the mailbox has a
// permanent receiver and that queue is not empty, otherwise in the regular pending queue.
// When no send matches, the new receive synchro is queued. The receiver-side fields of the
// retained synchro are then filled in and the communication is started (or only marked running
// under MC/replay). Returns the retained synchro.
// NOTE(review): several short lines (else branches, closing braces) are not visible in this
// excerpt, so the exact branch boundaries must be checked in the complete file.
269 smx_synchro_t SIMIX_comm_irecv(smx_process_t dst_proc, smx_mailbox_t mbox, void *dst_buff, size_t *dst_buff_size,
270 int (*match_fun)(void *, void *, smx_synchro_t),
271 void (*copy_data_fun)(smx_synchro_t, void*, size_t), // used to copy data if not default one
272 void *data, double rate)
274 XBT_DEBUG("recv from %p %p", mbox, mbox->comm_queue);
// Synchro describing this receive; it is either queued or released below.
275 simgrid::simix::Comm* this_synchro = new simgrid::simix::Comm(SIMIX_COMM_RECEIVE);
277 smx_synchro_t other_synchro;
278 //communication already done, get it inside the fifo of completed comms
279 if (mbox->permanent_receiver && ! mbox->done_comm_queue->empty()) {
281 XBT_DEBUG("We have a comm that has probably already been received, trying to match it, to skip the communication");
282 //find a match in the already received fifo
283 other_synchro = _find_matching_comm(mbox->done_comm_queue, SIMIX_COMM_SEND, match_fun, data, this_synchro,/*remove_matching*/true);
284 //if not found, assume the receiver came first, register it to the mailbox in the classical way
285 if (!other_synchro) {
286 XBT_DEBUG("We have messages in the permanent receive list, but not the one we are looking for, pushing request into fifo");
287 other_synchro = this_synchro;
288 SIMIX_mbox_push(mbox, this_synchro);
290 simgrid::simix::Comm *other_comm = static_cast<simgrid::simix::Comm*>(other_synchro);
// A send whose surf action exists and has nothing left to transfer is flagged as done.
292 if(other_comm->surf_comm && other_comm->remains()==0.0) {
293 XBT_DEBUG("comm %p has been already sent, and is finished, destroy it",other_comm);
294 other_comm->state = SIMIX_DONE;
295 other_comm->type = SIMIX_COMM_DONE;
296 other_comm->mbox = NULL;
// Our own receive synchro is released here.
299 static_cast<simgrid::simix::Comm*>(this_synchro)->unref();
302 /* Prepare a synchro describing us, so that it gets passed to the user-provided filter of other side */
304 /* Look for communication synchro matching our needs. We also provide a description of
305 * ourself so that the other side also gets a chance of choosing if it wants to match with us.
307 * If it is not found then push our communication into the rendez-vous point */
308 other_synchro = _find_matching_comm(mbox->comm_queue, SIMIX_COMM_SEND, match_fun, data, this_synchro,/*remove_matching*/true);
310 if (!other_synchro) {
311 XBT_DEBUG("Receive pushed first %zu", mbox->comm_queue->size());
312 other_synchro = this_synchro;
313 SIMIX_mbox_push(mbox, this_synchro);
// A pending send matched: release our synchro and mark the sender's one ready.
315 this_synchro->unref();
316 simgrid::simix::Comm *other_comm = static_cast<simgrid::simix::Comm*>(other_synchro);
318 other_comm->state = SIMIX_READY;
319 other_comm->type = SIMIX_COMM_READY;
// Record the synchro in the receiver's list of comms.
321 xbt_fifo_push(dst_proc->comms, other_synchro);
324 /* Setup communication synchro */
325 simgrid::simix::Comm *other_comm = static_cast<simgrid::simix::Comm*>(other_synchro);
326 other_comm->dst_proc = dst_proc;
327 other_comm->dst_buff = dst_buff;
328 other_comm->dst_buff_size = dst_buff_size;
329 other_comm->dst_data = data;
// -1.0 stands for "no rate given": the receiver's rate is applied when the comm has none yet,
// or when it is lower than the rate already set on the comm.
331 if (rate != -1.0 && (other_comm->rate == -1.0 || rate < other_comm->rate))
332 other_comm->rate = rate;
334 other_comm->match_fun = match_fun;
335 other_comm->copy_data_fun = copy_data_fun;
// Under model checking or record/replay, SIMIX_comm_start() is bypassed: the comm is only flagged running.
337 if (MC_is_active() || MC_record_replay_is_active()) {
338 other_synchro->state = SIMIX_RUNNING;
339 return other_synchro;
342 SIMIX_comm_start(other_synchro);
343 return other_synchro;
// Handler of the iprobe simcall: forwards to SIMIX_comm_iprobe() on behalf of the simcall issuer.
// NOTE(review): the end of the parameter list (the line declaring `data`) is not visible in this excerpt.
346 smx_synchro_t simcall_HANDLER_comm_iprobe(smx_simcall_t simcall, smx_mailbox_t mbox,
347 int type, int src, int tag,
348 int (*match_fun)(void *, void *, smx_synchro_t),
350 return SIMIX_comm_iprobe(simcall->issuer, mbox, type, src, tag, match_fun, data);
// Probes `mbox` for a queued communication that would match, without removing it
// (both searches pass remove_matching=false). Returns the match, or NULL when nothing matches.
// `dst_proc`, `src` and `tag` are not referenced by any line visible in this excerpt.
// NOTE(review): the declaration of smx_type, the test on `type` choosing between the two
// set-ups below, and the conditions around the second search and the unref are not visible here.
353 smx_synchro_t SIMIX_comm_iprobe(smx_process_t dst_proc, smx_mailbox_t mbox, int type, int src,
354 int tag, int (*match_fun)(void *, void *, smx_synchro_t), void *data)
356 XBT_DEBUG("iprobe from %p %p", mbox, mbox->comm_queue);
357 simgrid::simix::Comm* this_comm;
// Either we describe ourselves as a sender and look for a receive...
360 this_comm = new simgrid::simix::Comm(SIMIX_COMM_SEND);
361 smx_type = SIMIX_COMM_RECEIVE;
// ...or as a receiver looking for a send.
363 this_comm = new simgrid::simix::Comm(SIMIX_COMM_RECEIVE);
364 smx_type = SIMIX_COMM_SEND;
366 smx_synchro_t other_synchro=NULL;
367 if(mbox->permanent_receiver && ! mbox->done_comm_queue->empty()){
368 XBT_DEBUG("first check in the permanent recv mailbox, to see if we already got something");
370 _find_matching_comm(mbox->done_comm_queue, (e_smx_comm_type_t) smx_type, match_fun, data, this_comm,/*remove_matching*/false);
373 XBT_DEBUG("check if we have more luck in the normal mailbox");
374 other_synchro = _find_matching_comm(mbox->comm_queue, (e_smx_comm_type_t) smx_type, match_fun, data, this_comm,/*remove_matching*/false);
378 other_synchro->unref();
381 return other_synchro;
// Handler of the wait simcall: registers `simcall` on `synchro` and records the synchro as the
// one its issuer is waiting on. A synchro that is neither waiting nor running is finished at
// once; otherwise a sleep action of `timeout` is created on the issuer's host and stored on the
// issuer's side of the comm (src_timeout or dst_timeout).
// NOTE(review): in the MC/replay branch, the test on `idx` separating the "done" case from the
// timeout case, the else lines and the return are not visible in this excerpt.
384 void simcall_HANDLER_comm_wait(smx_simcall_t simcall, smx_synchro_t synchro, double timeout)
386 /* Associate this simcall to the wait synchro */
387 XBT_DEBUG("simcall_HANDLER_comm_wait, %p", synchro);
389 xbt_fifo_push(synchro->simcalls, simcall);
390 simcall->issuer->waiting_synchro = synchro;
// Model checking / replay: the outcome is driven by the MC value stored in the simcall.
392 if (MC_is_active() || MC_record_replay_is_active()) {
393 int idx = SIMCALL_GET_MC_VALUE(simcall);
395 synchro->state = SIMIX_DONE;
397 /* If we reached this point, the wait simcall must have a timeout */
398 /* Otherwise it shouldn't be enabled and executed by the MC */
// The timeout is attributed to whichever side issued the wait.
402 simgrid::simix::Comm *comm = static_cast<simgrid::simix::Comm*>(synchro);
403 if (comm->src_proc == simcall->issuer)
404 comm->state = SIMIX_SRC_TIMEOUT;
406 comm->state = SIMIX_DST_TIMEOUT;
409 SIMIX_comm_finish(synchro);
413 /* If the synchro has already finish perform the error handling, */
414 /* otherwise set up a waiting timeout on the right side */
415 if (synchro->state != SIMIX_WAITING && synchro->state != SIMIX_RUNNING) {
416 SIMIX_comm_finish(synchro);
417 } else { /* if (timeout >= 0) { we need a surf sleep action even when there is no timeout, otherwise surf won't tell us when the host fails */
418 surf_action_t sleep = surf_host_sleep(simcall->issuer->host, timeout);
// Attach the synchro to the sleep action as its data.
419 sleep->setData(synchro);
421 simgrid::simix::Comm *comm = static_cast<simgrid::simix::Comm*>(synchro);
422 if (simcall->issuer == comm->src_proc)
423 comm->src_timeout = sleep;
425 comm->dst_timeout = sleep;
// Handler of the test simcall: stores in the simcall result whether `synchro` is over.
// When it is, the simcall is registered on the synchro and SIMIX_comm_finish() is called;
// otherwise the simcall is answered directly.
// NOTE(review): the else lines and the return closing the MC/replay branch are not visible here.
429 void simcall_HANDLER_comm_test(smx_simcall_t simcall, smx_synchro_t synchro)
431 simgrid::simix::Comm *comm = static_cast<simgrid::simix::Comm*>(synchro);
// Model checking / replay: the comm counts as complete as soon as both endpoints are known.
433 if (MC_is_active() || MC_record_replay_is_active()){
434 simcall_comm_test__set__result(simcall, comm->src_proc && comm->dst_proc);
435 if (simcall_comm_test__get__result(simcall)){
436 synchro->state = SIMIX_DONE;
437 xbt_fifo_push(synchro->simcalls, simcall);
438 SIMIX_comm_finish(synchro);
440 SIMIX_simcall_answer(simcall);
// Regular execution: the comm is over when it is neither waiting nor running.
445 simcall_comm_test__set__result(simcall, (synchro->state != SIMIX_WAITING && synchro->state != SIMIX_RUNNING));
446 if (simcall_comm_test__get__result(simcall)) {
447 xbt_fifo_push(synchro->simcalls, simcall);
448 SIMIX_comm_finish(synchro);
450 SIMIX_simcall_answer(simcall);
// Handler of the testany simcall: the result starts at -1 and is replaced by the index of a
// finished synchro when one is found; that synchro is then finished on behalf of the simcall.
// NOTE(review): the declaration of `cursor`, the test on `idx` in the MC/replay branch, and the
// return statements ending each successful path are not visible in this excerpt.
454 void simcall_HANDLER_comm_testany(smx_simcall_t simcall, xbt_dynar_t synchros)
457 smx_synchro_t synchro;
458 simcall_comm_testany__set__result(simcall, -1);
// Model checking / replay: the index to report comes from the MC value of the simcall.
460 if (MC_is_active() || MC_record_replay_is_active()){
461 int idx = SIMCALL_GET_MC_VALUE(simcall);
463 SIMIX_simcall_answer(simcall);
465 synchro = xbt_dynar_get_as(synchros, idx, smx_synchro_t);
466 simcall_comm_testany__set__result(simcall, idx);
467 xbt_fifo_push(synchro->simcalls, simcall);
468 synchro->state = SIMIX_DONE;
469 SIMIX_comm_finish(synchro);
// Regular execution: look for a synchro that is neither waiting nor running.
474 xbt_dynar_foreach(simcall_comm_testany__get__comms(simcall), cursor,synchro) {
475 if (synchro->state != SIMIX_WAITING && synchro->state != SIMIX_RUNNING) {
476 simcall_comm_testany__set__result(simcall, cursor);
477 xbt_fifo_push(synchro->simcalls, simcall);
478 SIMIX_comm_finish(synchro);
// Answer the simcall directly (result still -1 when nothing was finished).
482 SIMIX_simcall_answer(simcall);
// Handler of the waitany simcall: registers `simcall` on the synchros of `synchros` and
// finishes one that is already over.
// NOTE(review): the return ending the MC/replay branch and the lines ending the loop after a
// finished synchro is found are not visible in this excerpt.
485 void simcall_HANDLER_comm_waitany(smx_simcall_t simcall, xbt_dynar_t synchros)
487 smx_synchro_t synchro;
488 unsigned int cursor = 0;
// Model checking / replay: the synchro to complete is selected by the MC value of the simcall.
490 if (MC_is_active() || MC_record_replay_is_active()){
491 int idx = SIMCALL_GET_MC_VALUE(simcall);
492 synchro = xbt_dynar_get_as(synchros, idx, smx_synchro_t);
493 xbt_fifo_push(synchro->simcalls, simcall);
494 simcall_comm_waitany__set__result(simcall, idx);
495 synchro->state = SIMIX_DONE;
496 SIMIX_comm_finish(synchro);
500 xbt_dynar_foreach(synchros, cursor, synchro){
501 /* associate this simcall to the the synchro */
502 xbt_fifo_push(synchro->simcalls, simcall);
504 /* see if the synchro is already finished */
505 if (synchro->state != SIMIX_WAITING && synchro->state != SIMIX_RUNNING){
506 SIMIX_comm_finish(synchro);
512 void SIMIX_waitany_remove_simcall_from_actions(smx_simcall_t simcall)
514 smx_synchro_t synchro;
515 unsigned int cursor = 0;
516 xbt_dynar_t synchros = simcall_comm_waitany__get__comms(simcall);
518 xbt_dynar_foreach(synchros, cursor, synchro)
519 xbt_fifo_remove(synchro->simcalls, simcall);
/**
 * \brief Starts the simulation of a communication synchro.
 * \param synchro the communication synchro
 */
// Starts the network action of a communication whose state is SIMIX_READY; nothing visible
// in this excerpt handles any other state.
// NOTE(review): the closing braces and some short lines (e.g. the else between the two
// "suspended" messages) are not visible in this excerpt.
526 static inline void SIMIX_comm_start(smx_synchro_t synchro)
528 simgrid::simix::Comm *comm = static_cast<simgrid::simix::Comm*>(synchro);
530 /* If both the sender and the receiver are already there, start the communication */
531 if (synchro->state == SIMIX_READY) {
533 sg_host_t sender = comm->src_proc->host;
534 sg_host_t receiver = comm->dst_proc->host;
536 XBT_DEBUG("Starting communication %p from '%s' to '%s'", synchro, sg_host_get_name(sender), sg_host_get_name(receiver));
// Create the surf network action between the two hosts and attach the synchro to it as its data.
538 comm->surf_comm = surf_network_model_communicate(surf_network_model, sender, receiver, comm->task_size, comm->rate);
539 comm->surf_comm->setData(synchro);
540 comm->state = SIMIX_RUNNING;
542 /* If a link is failed, detect it immediately */
543 if (comm->surf_comm->getState() == simgrid::surf::Action::State::failed) {
544 XBT_DEBUG("Communication from '%s' to '%s' failed to start because of a link failure",
545 sg_host_get_name(sender), sg_host_get_name(receiver));
546 comm->state = SIMIX_LINK_FAILURE;
550 /* If any of the process is suspend, create the synchro but stop its execution,
551 it will be restarted when the sender process resume */
552 if (SIMIX_process_is_suspended(comm->src_proc) ||
553 SIMIX_process_is_suspended(comm->dst_proc)) {
554 /* FIXME: check what should happen with the synchro state */
556 if (SIMIX_process_is_suspended(comm->src_proc))
557 XBT_DEBUG("The communication is suspended on startup because src (%s:%s) were suspended since it initiated the communication",
558 sg_host_get_name(comm->src_proc->host), comm->src_proc->name);
560 XBT_DEBUG("The communication is suspended on startup because dst (%s:%s) were suspended since it initiated the communication",
561 sg_host_get_name(comm->dst_proc->host), comm->dst_proc->name);
// Suspend the network action that was just created.
563 comm->surf_comm->suspend();
/**
 * \brief Answers the SIMIX simcalls associated to a communication synchro.
 * \param synchro a finished communication synchro
 */
// Answers every simcall registered on `synchro`. For each one: detaches a waitany from its
// other synchros, removes the comm from its mailbox, turns the synchro state into either a
// data copy (on success) or an exception for the issuer, unlinks the synchro from the
// processes' comm lists, and finally answers the simcall.
// NOTE(review): many short lines (case labels, break, else, the guards around the mailbox
// removal and the peer-list removals, the destroy_count increments) are not visible in this
// excerpt; the comments below only describe the visible statements.
572 void SIMIX_comm_finish(smx_synchro_t synchro)
574 simgrid::simix::Comm *comm = static_cast<simgrid::simix::Comm*>(synchro);
// Number of unref() calls to perform on the synchro once all simcalls are answered (see the final loop).
575 unsigned int destroy_count = 0;
576 smx_simcall_t simcall;
// Pop the registered simcalls one by one until the fifo is empty.
578 while ((simcall = (smx_simcall_t) xbt_fifo_shift(synchro->simcalls))) {
580 /* If a waitany simcall is waiting for this synchro to finish, then remove
581 it from the other synchros in the waitany list. Afterwards, get the
582 position of the actual synchro in the waitany dynar and
583 return it as the result of the simcall */
585 if (simcall->call == SIMCALL_NONE) //FIXME: maybe a better way to handle this case
586 continue; // if process handling comm is killed
587 if (simcall->call == SIMCALL_COMM_WAITANY) {
588 SIMIX_waitany_remove_simcall_from_actions(simcall);
// Outside MC/replay, the waitany result is the position of this synchro in the waited list.
589 if (!MC_is_active() && !MC_record_replay_is_active())
590 simcall_comm_waitany__set__result(simcall, xbt_dynar_search(simcall_comm_waitany__get__comms(simcall), &synchro));
593 /* If the synchro is still in a rendez-vous point then remove from it */
595 SIMIX_mbox_remove(comm->mbox, synchro);
597 XBT_DEBUG("SIMIX_comm_finish: synchro state = %d", (int)synchro->state);
599 /* Check out for errors */
// An issuer whose host is off is flagged to die and gets a host_error exception.
601 if (simcall->issuer->host->isOff()) {
602 simcall->issuer->context->iwannadie = 1;
603 SMX_EXCEPTION(simcall->issuer, host_error, 0, "Host failed");
// Otherwise the outcome depends on the final state of the synchro.
606 switch (synchro->state) {
609 XBT_DEBUG("Communication %p complete!", synchro);
610 SIMIX_comm_copy_data(synchro);
613 case SIMIX_SRC_TIMEOUT:
614 SMX_EXCEPTION(simcall->issuer, timeout_error, 0, "Communication timeouted because of sender");
617 case SIMIX_DST_TIMEOUT:
618 SMX_EXCEPTION(simcall->issuer, timeout_error, 0, "Communication timeouted because of receiver");
// Host failure on the sender side: the sender itself is flagged to die.
621 case SIMIX_SRC_HOST_FAILURE:
622 if (simcall->issuer == comm->src_proc)
623 simcall->issuer->context->iwannadie = 1;
624 // SMX_EXCEPTION(simcall->issuer, host_error, 0, "Host failed");
626 SMX_EXCEPTION(simcall->issuer, network_error, 0, "Remote peer failed");
// Host failure on the receiver side: the receiver itself is flagged to die.
629 case SIMIX_DST_HOST_FAILURE:
630 if (simcall->issuer == comm->dst_proc)
631 simcall->issuer->context->iwannadie = 1;
632 // SMX_EXCEPTION(simcall->issuer, host_error, 0, "Host failed");
634 SMX_EXCEPTION(simcall->issuer, network_error, 0, "Remote peer failed");
637 case SIMIX_LINK_FAILURE:
639 XBT_DEBUG("Link failure in synchro %p between '%s' and '%s': posting an exception to the issuer: %s (%p) detached:%d",
641 comm->src_proc ? sg_host_get_name(comm->src_proc->host) : NULL,
642 comm->dst_proc ? sg_host_get_name(comm->dst_proc->host) : NULL,
643 simcall->issuer->name, simcall->issuer, comm->detached);
644 if (comm->src_proc == simcall->issuer) {
645 XBT_DEBUG("I'm source");
646 } else if (comm->dst_proc == simcall->issuer) {
647 XBT_DEBUG("I'm dest");
649 XBT_DEBUG("I'm neither source nor dest");
651 SMX_EXCEPTION(simcall->issuer, network_error, 0, "Link failure");
// Cancellation: the message names the peer as the side that canceled.
655 if (simcall->issuer == comm->dst_proc)
656 SMX_EXCEPTION(simcall->issuer, cancel_error, 0, "Communication canceled by the sender");
658 SMX_EXCEPTION(simcall->issuer, cancel_error, 0, "Communication canceled by the receiver");
662 xbt_die("Unexpected synchro state in SIMIX_comm_finish: %d", (int)synchro->state);
665 /* if there is an exception during a waitany or a testany, indicate the position of the failed communication */
666 if (simcall->issuer->doexception) {
667 if (simcall->call == SIMCALL_COMM_WAITANY) {
668 simcall->issuer->running_ctx->exception.value = xbt_dynar_search(simcall_comm_waitany__get__comms(simcall), &synchro);
670 else if (simcall->call == SIMCALL_COMM_TESTANY) {
671 simcall->issuer->running_ctx->exception.value = xbt_dynar_search(simcall_comm_testany__get__comms(simcall), &synchro);
675 if (simcall->issuer->host->isOff()) {
676 simcall->issuer->context->iwannadie = 1;
// The issuer no longer waits on this synchro; drop it from the issuer's list of comms.
679 simcall->issuer->waiting_synchro = NULL;
680 xbt_fifo_remove(simcall->issuer->comms, synchro);
// Also drop the synchro from the peer's list of comms.
682 if(simcall->issuer == comm->src_proc){
684 xbt_fifo_remove(comm->dst_proc->comms, synchro);
686 if(simcall->issuer == comm->dst_proc){
688 xbt_fifo_remove(comm->src_proc->comms, synchro);
691 SIMIX_simcall_answer(simcall);
// Release as many references on the synchro as were counted in destroy_count.
695 while (destroy_count-- > 0)
696 static_cast<simgrid::simix::Comm*>(synchro)->unref();
/**
 * \brief This function is called when a Surf communication synchro is finished.
 * \param synchro the corresponding Simix communication
 */
// Derives the final state of a communication from the state of its surf actions, then answers
// the pending simcalls, if any, through SIMIX_comm_finish().
// NOTE(review): the else introducing the SIMIX_DONE fallback and the call that destroys the
// surf actions (announced by the comment below) are not visible in this excerpt.
703 void SIMIX_post_comm(smx_synchro_t synchro)
705 simgrid::simix::Comm *comm = static_cast<simgrid::simix::Comm*>(synchro);
707 /* Update synchro state */
// The checks are ordered: a finished timeout action is reported before a failed one, and
// either of them before a failed network action; the first condition that holds wins.
708 if (comm->src_timeout &&
709 comm->src_timeout->getState() == simgrid::surf::Action::State::done)
710 synchro->state = SIMIX_SRC_TIMEOUT;
711 else if (comm->dst_timeout &&
712 comm->dst_timeout->getState() == simgrid::surf::Action::State::done)
713 synchro->state = SIMIX_DST_TIMEOUT;
714 else if (comm->src_timeout &&
715 comm->src_timeout->getState() == simgrid::surf::Action::State::failed)
716 synchro->state = SIMIX_SRC_HOST_FAILURE;
717 else if (comm->dst_timeout &&
718 comm->dst_timeout->getState() == simgrid::surf::Action::State::failed)
719 synchro->state = SIMIX_DST_HOST_FAILURE;
720 else if (comm->surf_comm &&
721 comm->surf_comm->getState() == simgrid::surf::Action::State::failed) {
722 synchro->state = SIMIX_LINK_FAILURE;
724 synchro->state = SIMIX_DONE;
726 XBT_DEBUG("SIMIX_post_comm: comm %p, state %d, src_proc %p, dst_proc %p, detached: %d",
727 comm, (int)comm->state, comm->src_proc, comm->dst_proc, comm->detached);
729 /* destroy the surf actions associated with the Simix communication */
732 /* if there are simcalls associated with the synchro, then answer them */
733 if (xbt_fifo_size(synchro->simcalls)) {
734 SIMIX_comm_finish(comm);
738 /******************************************************************************/
739 /* SIMIX_comm_copy_data callbacks */
740 /******************************************************************************/
// File-wide data-copy callback. It starts out as SIMIX_comm_copy_pointer_callback and can be
// replaced through SIMIX_comm_set_copy_data_callback().
741 static void (*SIMIX_comm_copy_data_callback) (smx_synchro_t, void*, size_t) = &SIMIX_comm_copy_pointer_callback;
743 void SIMIX_comm_set_copy_data_callback(void (*callback) (smx_synchro_t, void*, size_t))
745 SIMIX_comm_copy_data_callback = callback;
748 void SIMIX_comm_copy_pointer_callback(smx_synchro_t synchro, void* buff, size_t buff_size)
750 simgrid::simix::Comm *comm = static_cast<simgrid::simix::Comm*>(synchro);
752 xbt_assert((buff_size == sizeof(void *)), "Cannot copy %zu bytes: must be sizeof(void*)", buff_size);
753 *(void **) (comm->dst_buff) = buff;
// Data-copy callback that copies `buff_size` bytes from `buff` into the destination buffer of
// the communication. For a detached send, the comm's source buffer pointer is then cleared.
// NOTE(review): one line inside the `if (comm->detached)` block is not visible in this excerpt
// (presumably the release of the duplicated source buffer -- confirm against the complete file).
756 void SIMIX_comm_copy_buffer_callback(smx_synchro_t synchro, void* buff, size_t buff_size)
758 simgrid::simix::Comm *comm = static_cast<simgrid::simix::Comm*>(synchro);
760 XBT_DEBUG("Copy the data over");
761 memcpy(comm->dst_buff, buff, buff_size);
762 if (comm->detached) { // if this is a detached send, the source buffer was duplicated by SMPI sender to make the original buffer available to the application ASAP
764 comm->src_buff = NULL;
/**
 * \brief Copy the communication data from the sender's buffer to the receiver's one
 * \param synchro The communication
 */
773 void SIMIX_comm_copy_data(smx_synchro_t synchro)
775 simgrid::simix::Comm *comm = static_cast<simgrid::simix::Comm*>(synchro);
777 size_t buff_size = comm->src_buff_size;
778 /* If there is no data to copy then return */
779 if (!comm->src_buff || !comm->dst_buff || comm->copied)
782 XBT_DEBUG("Copying comm %p data from %s (%p) -> %s (%p) (%zu bytes)",
784 comm->src_proc ? sg_host_get_name(comm->src_proc->host) : "a finished process",
786 comm->dst_proc ? sg_host_get_name(comm->dst_proc->host) : "a finished process",
787 comm->dst_buff, buff_size);
789 /* Copy at most dst_buff_size bytes of the message to receiver's buffer */
790 if (comm->dst_buff_size)
791 buff_size = MIN(buff_size, *(comm->dst_buff_size));
793 /* Update the receiver's buffer size to the copied amount */
794 if (comm->dst_buff_size)
795 *comm->dst_buff_size = buff_size;
798 if(comm->copy_data_fun)
799 comm->copy_data_fun (comm, comm->src_buff, buff_size);
801 SIMIX_comm_copy_data_callback (comm, comm->src_buff, buff_size);
805 /* Set the copied flag so we copy data only once */
806 /* (this function might be called from both communication ends) */