1 /* Copyright (c) 2009-2016. The SimGrid Team. All rights reserved. */
3 /* This program is free software; you can redistribute it and/or modify it
4 * under the terms of the license (GNU LGPL) which comes with this package. */
6 #include "src/surf/surf_interface.hpp"
7 #include "src/simix/smx_private.h"
10 #include "src/mc/mc_replay.h"
12 #include "simgrid/s4u/mailbox.hpp"
14 #include "src/simix/SynchroComm.hpp"
// Declares the simix_network log sub-category (child of simix) as the default category of this file.
16 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(simix_network, simix, "SIMIX network-related synchronization");
// Global dictionary of mailboxes; entries are stored under the mailbox name (see SIMIX_mbox_create).
// SIMIX_mbox_free is handed to the dictionary at creation - presumably as the per-entry
// free function (confirm against the xbt_dict API).
18 static void SIMIX_mbox_free(void *data);
19 static xbt_dict_t mailboxes = xbt_dict_new_homogeneous(SIMIX_mbox_free);
// Forward declarations of helpers that are defined later in this file.
21 static void SIMIX_waitany_remove_simcall_from_actions(smx_simcall_t simcall);
22 static void SIMIX_comm_copy_data(smx_synchro_t comm);
23 static inline void SIMIX_mbox_push(smx_mailbox_t mbox, smx_synchro_t comm);
24 static smx_synchro_t _find_matching_comm(std::deque<smx_synchro_t> *deque, e_smx_comm_type_t type,
25 int (*match_fun)(void *, void *,smx_synchro_t), void *user_data, smx_synchro_t my_synchro, bool remove_matching);
26 static void SIMIX_comm_start(smx_synchro_t synchro);
// Module teardown: frees the global mailboxes dictionary through xbt_dict_free.
28 void SIMIX_mailbox_exit(void)
30 xbt_dict_free(&mailboxes);
33 /******************************************************************************/
34 /* Rendez-Vous Points */
35 /******************************************************************************/
// Looks up the mailbox registered under `name` and sets up a new one in the global dictionary.
// \param name mailbox name; must be non-null (checked by xbt_assert)
// NOTE(review): the condition guarding the allocation and the return statement are not among
// the lines visible here; the remark about concurrent simcalls below suggests that an existing
// mailbox is reused instead of being recreated - confirm.
37 smx_mailbox_t SIMIX_mbox_create(const char *name)
39 xbt_assert(name, "Mailboxes must have a name");
40 /* two processes may have pushed the same mbox_create simcall at the same time */
41 smx_mailbox_t mbox = (smx_mailbox_t) xbt_dict_get_or_null(mailboxes, name);
// New mailbox: own copy of the name, empty queue of pending comms, no done queue, no permanent receiver.
44 mbox = xbt_new0(s_smx_mailbox_t, 1);
45 mbox->name = xbt_strdup(name);
46 mbox->comm_queue = new std::deque<smx_synchro_t>();
47 mbox->done_comm_queue = nullptr; // Allocated on need only
48 mbox->permanent_receiver=NULL;
50 XBT_DEBUG("Creating a mailbox at %p with name %s", mbox, name);
// The mailbox is registered using its own copy of the name as the key.
51 xbt_dict_set(mailboxes, mbox->name, mbox, NULL);
// Free callback given to the mailboxes dictionary.
// \param data the mailbox to release, passed as an untyped pointer and cast to smx_mailbox_t
// Both deques are deleted; done_comm_queue may still be null, which delete accepts.
// NOTE(review): the release of the name and of the struct itself is not visible in this listing.
56 void SIMIX_mbox_free(void *data)
58 XBT_DEBUG("mbox free %p", data);
59 smx_mailbox_t mbox = (smx_mailbox_t) data;
61 delete mbox->comm_queue;
62 delete mbox->done_comm_queue;
// Returns the result of looking `name` up in the global mailboxes dictionary, cast to smx_mailbox_t.
// Judging by the name of xbt_dict_get_or_null, an unknown name yields NULL (confirm against the xbt_dict API).
67 smx_mailbox_t SIMIX_mbox_get_by_name(const char *name)
69 return (smx_mailbox_t) xbt_dict_get_or_null(mailboxes, name);
73 * \brief set the receiver of the rendez-vous point to allow eager sends
74 * \param mbox The rendez-vous point
75 * \param process The receiving process
// Records `process` as the permanent receiver of `mbox`.
// The done_comm_queue is allocated here on first use, since mailbox creation leaves it null.
77 void SIMIX_mbox_set_receiver(smx_mailbox_t mbox, smx_process_t process)
79 mbox->permanent_receiver=process;
80 if (mbox->done_comm_queue == nullptr)
81 mbox->done_comm_queue = new std::deque<smx_synchro_t>();
85 * \brief Pushes a communication synchro into a rendez-vous point
86 * \param mbox The mailbox
87 * \param synchro The communication synchro
// Appends `synchro` (viewed as a simgrid::simix::Comm) at the back of mbox->comm_queue.
// NOTE(review): line 92 of the original file is not visible here; it may update the comm as well.
89 static inline void SIMIX_mbox_push(smx_mailbox_t mbox, smx_synchro_t synchro)
91 simgrid::simix::Comm *comm = static_cast<simgrid::simix::Comm*>(synchro);
93 mbox->comm_queue->push_back(comm);
98 * \brief Removes a communication synchro from a rendez-vous point
99 * \param mbox The rendez-vous point
100 * \param synchro The communication synchro
// Walks mbox->comm_queue and erases one entry from it; if the walk completes without
// leaving the function, the program is aborted through xbt_die with the message below.
// NOTE(review): the test selecting the entry to erase and the exit after the erase are not
// visible in this listing - presumably the entry equal to `comm` is erased and the function
// returns right away (erase invalidates the iterator, so the loop must not continue).
102 void SIMIX_mbox_remove(smx_mailbox_t mbox, smx_synchro_t synchro)
104 simgrid::simix::Comm *comm = static_cast<simgrid::simix::Comm*>(synchro);
107 for (auto it = mbox->comm_queue->begin(); it != mbox->comm_queue->end(); it++)
109 mbox->comm_queue->erase(it);
112 xbt_die("Cannot remove this comm that is not part of the mailbox");
116 * \brief Checks if there is a communication synchro queued in a deque matching our needs
117 * \param type The type of communication we are looking for (comm_send, comm_recv)
118 * \return The communication synchro if found, NULL otherwise
// Searches `deque` front to back for a queued comm compatible with the caller.
// \param deque           queue of comms to scan
// \param type            type the queued comm must have (SIMIX_COMM_SEND or SIMIX_COMM_RECEIVE)
// \param match_fun       optional filter of the caller, called as (this_user_data, other_user_data, queued comm)
// \param this_user_data  user data of the caller, passed to both filters
// \param my_synchro      synchro describing the caller, passed to the filter of the queued comm
// \param remove_matching NOTE(review): its use is not visible in this listing - presumably the
//                        matching comm is erased from the deque when true
// A queued comm matches when its type equals `type` and both filters (ours and its own, each
// only if set) return non-zero. On a match, comm->mbox_cpy receives comm->mbox.
120 static smx_synchro_t _find_matching_comm(std::deque<smx_synchro_t> *deque, e_smx_comm_type_t type,
121 int (*match_fun)(void *, void *,smx_synchro_t), void *this_user_data, smx_synchro_t my_synchro, bool remove_matching)
123 void* other_user_data = NULL;
125 for(auto it = deque->begin(); it != deque->end(); it++){
126 smx_synchro_t synchro = *it;
127 simgrid::simix::Comm *comm = static_cast<simgrid::simix::Comm*>(synchro);
// The user data of the queued comm lives on the side that posted it: src_data for a send, dst_data for a receive.
129 if (comm->type == SIMIX_COMM_SEND) {
130 other_user_data = comm->src_data;
131 } else if (comm->type == SIMIX_COMM_RECEIVE) {
132 other_user_data = comm->dst_data;
134 if (comm->type == type &&
135 (! match_fun || match_fun(this_user_data, other_user_data, synchro)) &&
136 (!comm->match_fun || comm->match_fun(other_user_data, this_user_data, my_synchro))) {
137 XBT_DEBUG("Found a matching communication synchro %p", comm);
142 comm->mbox_cpy = comm->mbox;
147 XBT_DEBUG("Sorry, communication synchro %p does not match our needs:"
148 " its type is %d but we are looking for a comm of type %d (or maybe the filtering didn't match)",
149 comm, (int)comm->type, (int)type);
151 XBT_DEBUG("No matching communication synchro found");
155 /******************************************************************************/
156 /* Communication synchros */
157 /******************************************************************************/
// Handler of the blocking send simcall: posts the send through simcall_HANDLER_comm_isend
// (clean_fun is passed as NULL), sets the MC value of the simcall to 0, then waits on the
// resulting comm with `timeout` through simcall_HANDLER_comm_wait.
// NOTE(review): the last arguments of the isend call (original line 166) are not visible here.
158 XBT_PRIVATE void simcall_HANDLER_comm_send(smx_simcall_t simcall, smx_process_t src, smx_mailbox_t mbox,
159 double task_size, double rate,
160 void *src_buff, size_t src_buff_size,
161 int (*match_fun)(void *, void *,smx_synchro_t),
162 void (*copy_data_fun)(smx_synchro_t, void*, size_t),
163 void *data, double timeout){
164 smx_synchro_t comm = simcall_HANDLER_comm_isend(simcall, src, mbox, task_size, rate,
165 src_buff, src_buff_size, match_fun, NULL, copy_data_fun,
167 SIMCALL_SET_MC_VALUE(simcall, 0);
168 simcall_HANDLER_comm_wait(simcall, comm, timeout);
// Handler of the asynchronous send simcall.
// A SEND comm describing the caller is created, then mbox->comm_queue is searched for a
// pending RECEIVE accepted by both match functions (remove_matching is true).
//  - No match: our own comm is used. With a permanent receiver on the mailbox it is marked
//    SIMIX_READY, bound to that receiver and appended to mbox->done_comm_queue; the call to
//    SIMIX_mbox_push covers the other case (the branch keywords are not visible in this listing).
//  - Match: our comm is unref-ed and the matched one becomes SIMIX_READY / SIMIX_COMM_READY.
// The selected comm is appended to src_proc->comms and filled with the sender-side fields.
// Under model checking or record/replay the comm is only marked SIMIX_RUNNING; otherwise
// SIMIX_comm_start is called.
// \return NULL when `detached` is non-zero, the selected comm otherwise
170 XBT_PRIVATE smx_synchro_t simcall_HANDLER_comm_isend(smx_simcall_t simcall, smx_process_t src_proc, smx_mailbox_t mbox,
171 double task_size, double rate,
172 void *src_buff, size_t src_buff_size,
173 int (*match_fun)(void *, void *,smx_synchro_t),
174 void (*clean_fun)(void *), // used to free the synchro in case of problem after a detached send
175 void (*copy_data_fun)(smx_synchro_t, void*, size_t),// used to copy data if not default one
176 void *data, int detached)
178 XBT_DEBUG("send from %p", mbox);
180 /* Prepare a synchro describing us, so that it gets passed to the user-provided filter of other side */
181 simgrid::simix::Comm* this_synchro = new simgrid::simix::Comm(SIMIX_COMM_SEND);
183 /* Look for communication synchro matching our needs. We also provide a description of
184 * ourself so that the other side also gets a chance of choosing if it wants to match with us.
186 * If it is not found then push our communication into the rendez-vous point */
187 smx_synchro_t other_synchro =
188 _find_matching_comm(mbox->comm_queue, SIMIX_COMM_RECEIVE, match_fun, data, this_synchro, /*remove_matching*/true);
189 simgrid::simix::Comm *other_comm = static_cast<simgrid::simix::Comm*>(other_synchro);
192 if (!other_synchro) {
193 other_synchro = this_synchro;
194 other_comm = static_cast<simgrid::simix::Comm*>(other_synchro);
196 if (mbox->permanent_receiver!=NULL){
197 //this mailbox is for small messages, which have to be sent right now
198 other_synchro->state = SIMIX_READY;
199 other_comm->dst_proc=mbox->permanent_receiver;
201 mbox->done_comm_queue->push_back(other_synchro);
202 other_comm->mbox=mbox;
// NOTE(review): the last argument below is the address of the local pointer variable, not the comm it points to.
203 XBT_DEBUG("pushing a message into the permanent receive fifo %p, comm %p", mbox, &(other_comm));
206 SIMIX_mbox_push(mbox, this_synchro);
209 XBT_DEBUG("Receive already pushed");
210 this_synchro->unref();
212 other_comm->state = SIMIX_READY;
213 other_comm->type = SIMIX_COMM_READY;
216 xbt_fifo_push(src_proc->comms, other_synchro);
// Detached handling: the condition selecting between the two clean_fun assignments is not visible in this listing.
220 other_comm->detached = true;
221 other_comm->clean_fun = clean_fun;
223 other_comm->clean_fun = NULL;
226 /* Setup the communication synchro */
227 other_comm->src_proc = src_proc;
228 other_comm->task_size = task_size;
229 other_comm->rate = rate;
230 other_comm->src_buff = src_buff;
231 other_comm->src_buff_size = src_buff_size;
232 other_comm->src_data = data;
234 other_comm->match_fun = match_fun;
235 other_comm->copy_data_fun = copy_data_fun;
// With the model checker or record/replay active, no surf communication is started here.
238 if (MC_is_active() || MC_record_replay_is_active()) {
239 other_comm->state = SIMIX_RUNNING;
240 return (detached ? NULL : other_comm);
243 SIMIX_comm_start(other_comm);
244 return (detached ? NULL : other_comm);
// Handler of the blocking receive simcall: posts the receive through SIMIX_comm_irecv,
// sets the MC value of the simcall to 0, then waits on the resulting comm with `timeout`
// through simcall_HANDLER_comm_wait.
247 XBT_PRIVATE void simcall_HANDLER_comm_recv(smx_simcall_t simcall, smx_process_t receiver, smx_mailbox_t mbox,
248 void *dst_buff, size_t *dst_buff_size,
249 int (*match_fun)(void *, void *, smx_synchro_t),
250 void (*copy_data_fun)(smx_synchro_t, void*, size_t),
251 void *data, double timeout, double rate)
253 smx_synchro_t comm = SIMIX_comm_irecv(receiver, mbox, dst_buff, dst_buff_size, match_fun, copy_data_fun, data, rate);
254 SIMCALL_SET_MC_VALUE(simcall, 0);
255 simcall_HANDLER_comm_wait(simcall, comm, timeout);
// Handler of the asynchronous receive simcall: forwards every argument except `simcall`
// to SIMIX_comm_irecv and returns its result.
258 XBT_PRIVATE smx_synchro_t simcall_HANDLER_comm_irecv(smx_simcall_t simcall, smx_process_t receiver, smx_mailbox_t mbox,
259 void *dst_buff, size_t *dst_buff_size,
260 int (*match_fun)(void *, void *, smx_synchro_t),
261 void (*copy_data_fun)(smx_synchro_t, void*, size_t),
262 void *data, double rate)
264 return SIMIX_comm_irecv(receiver, mbox, dst_buff, dst_buff_size, match_fun, copy_data_fun, data, rate);
// Posts an asynchronous receive on `mbox` for process `dst_proc`.
// A RECEIVE comm describing the caller is created first. Two paths follow:
//  - mailbox with a permanent receiver and a non-empty done_comm_queue: a matching SEND is
//    searched in that queue; when none is found our comm is pushed on the mailbox, otherwise
//    a matched comm whose surf action has nothing left to transfer is marked done and
//    detached from the mailbox, and our own comm is unref-ed;
//  - otherwise: a matching SEND is searched in mbox->comm_queue; when none is found our comm
//    is pushed on the mailbox, otherwise ours is unref-ed and the match becomes READY.
// The selected comm is then filled with the receiver-side fields. Under model checking or
// record/replay it is only marked SIMIX_RUNNING; otherwise SIMIX_comm_start is called.
// NOTE(review): the else keywords and closing braces separating these paths are not visible
// in this listing; the description above follows the debug messages and comments.
// \return the comm selected for this receive (ours or the matched one)
267 smx_synchro_t SIMIX_comm_irecv(smx_process_t dst_proc, smx_mailbox_t mbox, void *dst_buff, size_t *dst_buff_size,
268 int (*match_fun)(void *, void *, smx_synchro_t),
269 void (*copy_data_fun)(smx_synchro_t, void*, size_t), // used to copy data if not default one
270 void *data, double rate)
272 XBT_DEBUG("recv from %p %p", mbox, mbox->comm_queue);
273 simgrid::simix::Comm* this_synchro = new simgrid::simix::Comm(SIMIX_COMM_RECEIVE);
275 smx_synchro_t other_synchro;
276 //communication already done, get it inside the fifo of completed comms
277 if (mbox->permanent_receiver && ! mbox->done_comm_queue->empty()) {
279 XBT_DEBUG("We have a comm that has probably already been received, trying to match it, to skip the communication");
280 //find a match in the already received fifo
281 other_synchro = _find_matching_comm(mbox->done_comm_queue, SIMIX_COMM_SEND, match_fun, data, this_synchro,/*remove_matching*/true);
282 //if not found, assume the receiver came first, register it to the mailbox in the classical way
283 if (!other_synchro) {
284 XBT_DEBUG("We have messages in the permanent receive list, but not the one we are looking for, pushing request into fifo");
285 other_synchro = this_synchro;
286 SIMIX_mbox_push(mbox, this_synchro);
288 simgrid::simix::Comm *other_comm = static_cast<simgrid::simix::Comm*>(other_synchro);
290 if(other_comm->surf_comm && other_comm->remains()==0.0) {
291 XBT_DEBUG("comm %p has been already sent, and is finished, destroy it",other_comm);
292 other_comm->state = SIMIX_DONE;
293 other_comm->type = SIMIX_COMM_DONE;
294 other_comm->mbox = NULL;
297 static_cast<simgrid::simix::Comm*>(this_synchro)->unref();
300 /* Prepare a synchro describing us, so that it gets passed to the user-provided filter of other side */
302 /* Look for communication synchro matching our needs. We also provide a description of
303 * ourself so that the other side also gets a chance of choosing if it wants to match with us.
305 * If it is not found then push our communication into the rendez-vous point */
306 other_synchro = _find_matching_comm(mbox->comm_queue, SIMIX_COMM_SEND, match_fun, data, this_synchro,/*remove_matching*/true);
308 if (!other_synchro) {
309 XBT_DEBUG("Receive pushed first %zu", mbox->comm_queue->size());
310 other_synchro = this_synchro;
311 SIMIX_mbox_push(mbox, this_synchro);
313 this_synchro->unref();
314 simgrid::simix::Comm *other_comm = static_cast<simgrid::simix::Comm*>(other_synchro);
316 other_comm->state = SIMIX_READY;
317 other_comm->type = SIMIX_COMM_READY;
319 xbt_fifo_push(dst_proc->comms, other_synchro);
322 /* Setup communication synchro */
323 simgrid::simix::Comm *other_comm = static_cast<simgrid::simix::Comm*>(other_synchro);
324 other_comm->dst_proc = dst_proc;
325 other_comm->dst_buff = dst_buff;
326 other_comm->dst_buff_size = dst_buff_size;
327 other_comm->dst_data = data;
// A rate of -1.0 is treated as unset: the receiver rate is applied when the comm has no
// rate yet or when the receiver rate is the lower of the two.
329 if (rate != -1.0 && (other_comm->rate == -1.0 || rate < other_comm->rate))
330 other_comm->rate = rate;
332 other_comm->match_fun = match_fun;
333 other_comm->copy_data_fun = copy_data_fun;
// With the model checker or record/replay active, no surf communication is started here.
335 if (MC_is_active() || MC_record_replay_is_active()) {
336 other_synchro->state = SIMIX_RUNNING;
337 return other_synchro;
340 SIMIX_comm_start(other_synchro);
341 return other_synchro;
// Handler of the iprobe simcall: calls SIMIX_comm_iprobe on behalf of simcall->issuer and
// returns its result.
// NOTE(review): the end of the parameter list (original line 347, which must declare `data`)
// is not visible in this listing.
344 smx_synchro_t simcall_HANDLER_comm_iprobe(smx_simcall_t simcall, smx_mailbox_t mbox,
345 int type, int src, int tag,
346 int (*match_fun)(void *, void *, smx_synchro_t),
348 return SIMIX_comm_iprobe(simcall->issuer, mbox, type, src, tag, match_fun, data);
// Probes `mbox` for a comm matching the request without removing it (remove_matching is false).
// A temporary comm describing the caller is created (SEND looking for a RECEIVE, or RECEIVE
// looking for a SEND). The done_comm_queue is examined first when the mailbox has a permanent
// receiver and that queue is not empty, then the regular comm_queue.
// NOTE(review): several lines are not visible in this listing - the declaration of smx_type,
// the test on `type` choosing the direction, the conditions around the second lookup and
// around the unref below, and the release of this_comm. `dst_proc`, `src` and `tag` are not
// used by any visible line.
// \return the comm found, or NULL (the initial value of other_synchro) - confirm
351 smx_synchro_t SIMIX_comm_iprobe(smx_process_t dst_proc, smx_mailbox_t mbox, int type, int src,
352 int tag, int (*match_fun)(void *, void *, smx_synchro_t), void *data)
354 XBT_DEBUG("iprobe from %p %p", mbox, mbox->comm_queue);
355 simgrid::simix::Comm* this_comm;
358 this_comm = new simgrid::simix::Comm(SIMIX_COMM_SEND);
359 smx_type = SIMIX_COMM_RECEIVE;
361 this_comm = new simgrid::simix::Comm(SIMIX_COMM_RECEIVE);
362 smx_type = SIMIX_COMM_SEND;
364 smx_synchro_t other_synchro=NULL;
365 if(mbox->permanent_receiver && ! mbox->done_comm_queue->empty()){
366 XBT_DEBUG("first check in the permanent recv mailbox, to see if we already got something");
368 _find_matching_comm(mbox->done_comm_queue, (e_smx_comm_type_t) smx_type, match_fun, data, this_comm,/*remove_matching*/false);
371 XBT_DEBUG("check if we have more luck in the normal mailbox");
372 other_synchro = _find_matching_comm(mbox->comm_queue, (e_smx_comm_type_t) smx_type, match_fun, data, this_comm,/*remove_matching*/false);
376 other_synchro->unref();
379 return other_synchro;
// Handler of the wait simcall on a communication.
// The simcall is queued on the synchro and recorded as what the issuer is waiting for.
//  - Model checking / replay: the MC value of the simcall selects the outcome; the visible
//    lines either mark the synchro SIMIX_DONE or flag a timeout on the issuer side
//    (SIMIX_SRC_TIMEOUT when the issuer is the sender, SIMIX_DST_TIMEOUT otherwise), then
//    finish the comm. The test on idx is not visible in this listing.
//  - Normal run: a synchro that is neither WAITING nor RUNNING is finished right away;
//    otherwise a surf sleep action of `timeout` is created on the host of the issuer and
//    stored as src_timeout or dst_timeout depending on which side the issuer is.
382 void simcall_HANDLER_comm_wait(smx_simcall_t simcall, smx_synchro_t synchro, double timeout)
384 /* Associate this simcall to the wait synchro */
385 XBT_DEBUG("simcall_HANDLER_comm_wait, %p", synchro);
387 xbt_fifo_push(synchro->simcalls, simcall);
388 simcall->issuer->waiting_synchro = synchro;
390 if (MC_is_active() || MC_record_replay_is_active()) {
391 int idx = SIMCALL_GET_MC_VALUE(simcall);
393 synchro->state = SIMIX_DONE;
395 /* If we reached this point, the wait simcall must have a timeout */
396 /* Otherwise it shouldn't be enabled and executed by the MC */
400 simgrid::simix::Comm *comm = static_cast<simgrid::simix::Comm*>(synchro);
401 if (comm->src_proc == simcall->issuer)
402 comm->state = SIMIX_SRC_TIMEOUT;
404 comm->state = SIMIX_DST_TIMEOUT;
407 SIMIX_comm_finish(synchro);
411 /* If the synchro has already finished, perform the error handling, */
412 /* otherwise set up a waiting timeout on the right side */
413 if (synchro->state != SIMIX_WAITING && synchro->state != SIMIX_RUNNING) {
414 SIMIX_comm_finish(synchro);
415 } else { /* if (timeout >= 0) { we need a surf sleep action even when there is no timeout, otherwise surf won't tell us when the host fails */
416 surf_action_t sleep = surf_host_sleep(simcall->issuer->host, timeout);
417 sleep->setData(synchro);
419 simgrid::simix::Comm *comm = static_cast<simgrid::simix::Comm*>(synchro);
420 if (simcall->issuer == comm->src_proc)
421 comm->src_timeout = sleep;
423 comm->dst_timeout = sleep;
// Handler of the test simcall: stores in the simcall result whether the comm is over.
//  - Model checking / replay: the result is true when both src_proc and dst_proc are set;
//    in that case the synchro is forced to SIMIX_DONE before being finished.
//  - Normal run: the result is true when the state is neither WAITING nor RUNNING.
// When the result is true the simcall is queued on the synchro and SIMIX_comm_finish is
// called; the other visible path answers the simcall directly (the else keywords are not
// visible in this listing).
427 void simcall_HANDLER_comm_test(smx_simcall_t simcall, smx_synchro_t synchro)
429 simgrid::simix::Comm *comm = static_cast<simgrid::simix::Comm*>(synchro);
431 if (MC_is_active() || MC_record_replay_is_active()){
432 simcall_comm_test__set__result(simcall, comm->src_proc && comm->dst_proc);
433 if (simcall_comm_test__get__result(simcall)){
434 synchro->state = SIMIX_DONE;
435 xbt_fifo_push(synchro->simcalls, simcall);
436 SIMIX_comm_finish(synchro);
438 SIMIX_simcall_answer(simcall);
443 simcall_comm_test__set__result(simcall, (synchro->state != SIMIX_WAITING && synchro->state != SIMIX_RUNNING));
444 if (simcall_comm_test__get__result(simcall)) {
445 xbt_fifo_push(synchro->simcalls, simcall);
446 SIMIX_comm_finish(synchro);
448 SIMIX_simcall_answer(simcall);
// Handler of the testany simcall. The result is preset to -1.
//  - Model checking / replay: the MC value of the simcall is read as an index; the visible
//    lines either answer the simcall as is, or take the synchro at that index, store the
//    index as result, force the synchro to SIMIX_DONE and finish it (the test on idx is not
//    visible in this listing).
//  - Normal run: the comms of the simcall are scanned; for a synchro that is neither WAITING
//    nor RUNNING the result is set to its position and the synchro is finished. The final
//    answer call covers the case where nothing was finished - confirm, since the early exit
//    after SIMIX_comm_finish is not visible here.
// NOTE(review): the declaration of `cursor` is not visible in this listing.
452 void simcall_HANDLER_comm_testany(smx_simcall_t simcall, xbt_dynar_t synchros)
455 smx_synchro_t synchro;
456 simcall_comm_testany__set__result(simcall, -1);
458 if (MC_is_active() || MC_record_replay_is_active()){
459 int idx = SIMCALL_GET_MC_VALUE(simcall);
461 SIMIX_simcall_answer(simcall);
463 synchro = xbt_dynar_get_as(synchros, idx, smx_synchro_t);
464 simcall_comm_testany__set__result(simcall, idx);
465 xbt_fifo_push(synchro->simcalls, simcall);
466 synchro->state = SIMIX_DONE;
467 SIMIX_comm_finish(synchro);
472 xbt_dynar_foreach(simcall_comm_testany__get__comms(simcall), cursor,synchro) {
473 if (synchro->state != SIMIX_WAITING && synchro->state != SIMIX_RUNNING) {
474 simcall_comm_testany__set__result(simcall, cursor);
475 xbt_fifo_push(synchro->simcalls, simcall);
476 SIMIX_comm_finish(synchro);
480 SIMIX_simcall_answer(simcall);
// Handler of the waitany simcall.
//  - Model checking / replay: the MC value of the simcall is the index of the synchro to
//    complete; the simcall is queued on it, the index is stored as result, and the synchro is
//    forced to SIMIX_DONE and finished.
//  - Normal run: the simcall is queued on every synchro of `synchros`; a synchro that is
//    neither WAITING nor RUNNING is finished on the spot.
// NOTE(review): the exits after each SIMIX_comm_finish call are not visible in this listing.
483 void simcall_HANDLER_comm_waitany(smx_simcall_t simcall, xbt_dynar_t synchros)
485 smx_synchro_t synchro;
486 unsigned int cursor = 0;
488 if (MC_is_active() || MC_record_replay_is_active()){
489 int idx = SIMCALL_GET_MC_VALUE(simcall);
490 synchro = xbt_dynar_get_as(synchros, idx, smx_synchro_t);
491 xbt_fifo_push(synchro->simcalls, simcall);
492 simcall_comm_waitany__set__result(simcall, idx);
493 synchro->state = SIMIX_DONE;
494 SIMIX_comm_finish(synchro);
498 xbt_dynar_foreach(synchros, cursor, synchro){
499 /* associate this simcall to the synchro */
500 xbt_fifo_push(synchro->simcalls, simcall);
502 /* see if the synchro is already finished */
503 if (synchro->state != SIMIX_WAITING && synchro->state != SIMIX_RUNNING){
504 SIMIX_comm_finish(synchro);
// Removes `simcall` from the simcalls list of every synchro listed in the comms dynar of
// this waitany simcall.
510 void SIMIX_waitany_remove_simcall_from_actions(smx_simcall_t simcall)
512 smx_synchro_t synchro;
513 unsigned int cursor = 0;
514 xbt_dynar_t synchros = simcall_comm_waitany__get__comms(simcall);
516 xbt_dynar_foreach(synchros, cursor, synchro)
517 xbt_fifo_remove(synchro->simcalls, simcall);
521 * \brief Starts the simulation of a communication synchro.
522 * \param synchro the communication synchro
// Starts the surf communication behind `synchro` once it is in the SIMIX_READY state.
// The surf action is created between the hosts of src_proc and dst_proc with the task size
// and rate stored in the comm, and the comm becomes SIMIX_RUNNING. If the action is already
// in the failed state the comm is switched to SIMIX_LINK_FAILURE. If either process is
// suspended, the surf action is suspended as well.
// NOTE(review): what follows the link-failure assignment (original lines 545-547) and the
// else keyword between the two suspension messages are not visible in this listing.
524 static inline void SIMIX_comm_start(smx_synchro_t synchro)
526 simgrid::simix::Comm *comm = static_cast<simgrid::simix::Comm*>(synchro);
528 /* If both the sender and the receiver are already there, start the communication */
529 if (synchro->state == SIMIX_READY) {
531 sg_host_t sender = comm->src_proc->host;
532 sg_host_t receiver = comm->dst_proc->host;
534 XBT_DEBUG("Starting communication %p from '%s' to '%s'", synchro, sg_host_get_name(sender), sg_host_get_name(receiver));
536 comm->surf_comm = surf_network_model_communicate(surf_network_model, sender, receiver, comm->task_size, comm->rate);
537 comm->surf_comm->setData(synchro);
538 comm->state = SIMIX_RUNNING;
540 /* If a link is failed, detect it immediately */
541 if (comm->surf_comm->getState() == simgrid::surf::Action::State::failed) {
542 XBT_DEBUG("Communication from '%s' to '%s' failed to start because of a link failure",
543 sg_host_get_name(sender), sg_host_get_name(receiver));
544 comm->state = SIMIX_LINK_FAILURE;
548 /* If any of the process is suspend, create the synchro but stop its execution,
549 it will be restarted when the sender process resume */
550 if (SIMIX_process_is_suspended(comm->src_proc) || SIMIX_process_is_suspended(comm->dst_proc)) {
551 if (SIMIX_process_is_suspended(comm->src_proc))
552 XBT_DEBUG("The communication is suspended on startup because src (%s@%s) was suspended since it initiated the communication",
553 comm->src_proc->name, sg_host_get_name(comm->src_proc->host));
555 XBT_DEBUG("The communication is suspended on startup because dst (%s@%s) was suspended since it initiated the communication",
556 comm->dst_proc->name, sg_host_get_name(comm->dst_proc->host));
558 comm->surf_comm->suspend();
564 * \brief Answers the SIMIX simcalls associated to a communication synchro.
565 * \param synchro a finished communication synchro
// Answers every simcall queued on `synchro` and reports the outcome encoded in synchro->state.
// For each simcall shifted from synchro->simcalls:
//  - a simcall whose call is SIMCALL_NONE is skipped;
//  - a waitany simcall is first unregistered from the synchros it listed and, outside model
//    checking / replay, its result is set to the position of this synchro in its comms dynar;
//  - the comm is removed from its mailbox (the guard on comm->mbox is not visible in this listing);
//  - if the host of the issuer is off, the issuer is flagged to die and gets a host_error;
//    otherwise the state selects between copying the data and posting a timeout, network or
//    cancel exception to the issuer (several case labels and break statements are not visible);
//  - on exception during a waitany or testany, the exception value receives the position of
//    the failed synchro;
//  - the synchro is removed from the comms list of the issuer and of the peer process, and
//    the simcall is answered.
// Finally the comm is unref-ed destroy_count times.
// NOTE(review): the statements incrementing destroy_count are not visible in this listing.
567 void SIMIX_comm_finish(smx_synchro_t synchro)
569 simgrid::simix::Comm *comm = static_cast<simgrid::simix::Comm*>(synchro);
570 unsigned int destroy_count = 0;
571 smx_simcall_t simcall;
573 while ((simcall = (smx_simcall_t) xbt_fifo_shift(synchro->simcalls))) {
575 /* If a waitany simcall is waiting for this synchro to finish, then remove
576 it from the other synchros in the waitany list. Afterwards, get the
577 position of the actual synchro in the waitany dynar and
578 return it as the result of the simcall */
580 if (simcall->call == SIMCALL_NONE) //FIXME: maybe a better way to handle this case
581 continue; // if process handling comm is killed
582 if (simcall->call == SIMCALL_COMM_WAITANY) {
583 SIMIX_waitany_remove_simcall_from_actions(simcall);
584 if (!MC_is_active() && !MC_record_replay_is_active())
585 simcall_comm_waitany__set__result(simcall, xbt_dynar_search(simcall_comm_waitany__get__comms(simcall), &synchro));
588 /* If the synchro is still in a rendez-vous point then remove from it */
590 SIMIX_mbox_remove(comm->mbox, synchro);
592 XBT_DEBUG("SIMIX_comm_finish: synchro state = %d", (int)synchro->state);
594 /* Check out for errors */
596 if (simcall->issuer->host->isOff()) {
597 simcall->issuer->context->iwannadie = 1;
598 SMX_EXCEPTION(simcall->issuer, host_error, 0, "Host failed");
601 switch (synchro->state) {
604 XBT_DEBUG("Communication %p complete!", synchro);
605 SIMIX_comm_copy_data(synchro);
608 case SIMIX_SRC_TIMEOUT:
609 SMX_EXCEPTION(simcall->issuer, timeout_error, 0, "Communication timeouted because of sender");
612 case SIMIX_DST_TIMEOUT:
613 SMX_EXCEPTION(simcall->issuer, timeout_error, 0, "Communication timeouted because of receiver");
// Host failures: the process located on the failed side is flagged to die; the network_error
// presumably goes to the other side (the else keyword is not visible in this listing).
616 case SIMIX_SRC_HOST_FAILURE:
617 if (simcall->issuer == comm->src_proc)
618 simcall->issuer->context->iwannadie = 1;
619 // SMX_EXCEPTION(simcall->issuer, host_error, 0, "Host failed");
621 SMX_EXCEPTION(simcall->issuer, network_error, 0, "Remote peer failed");
624 case SIMIX_DST_HOST_FAILURE:
625 if (simcall->issuer == comm->dst_proc)
626 simcall->issuer->context->iwannadie = 1;
627 // SMX_EXCEPTION(simcall->issuer, host_error, 0, "Host failed");
629 SMX_EXCEPTION(simcall->issuer, network_error, 0, "Remote peer failed");
632 case SIMIX_LINK_FAILURE:
// NOTE(review): the two host-name arguments below are NULL when the process is unset, yet
// they feed %s conversions; the first argument line (original line 635) is not visible here.
634 XBT_DEBUG("Link failure in synchro %p between '%s' and '%s': posting an exception to the issuer: %s (%p) detached:%d",
636 comm->src_proc ? sg_host_get_name(comm->src_proc->host) : NULL,
637 comm->dst_proc ? sg_host_get_name(comm->dst_proc->host) : NULL,
638 simcall->issuer->name, simcall->issuer, comm->detached);
639 if (comm->src_proc == simcall->issuer) {
640 XBT_DEBUG("I'm source");
641 } else if (comm->dst_proc == simcall->issuer) {
642 XBT_DEBUG("I'm dest");
644 XBT_DEBUG("I'm neither source nor dest");
646 SMX_EXCEPTION(simcall->issuer, network_error, 0, "Link failure");
// Cancellation: the message names the opposite side as the one that canceled.
650 if (simcall->issuer == comm->dst_proc)
651 SMX_EXCEPTION(simcall->issuer, cancel_error, 0, "Communication canceled by the sender");
653 SMX_EXCEPTION(simcall->issuer, cancel_error, 0, "Communication canceled by the receiver");
657 xbt_die("Unexpected synchro state in SIMIX_comm_finish: %d", (int)synchro->state);
660 /* if there is an exception during a waitany or a testany, indicate the position of the failed communication */
661 if (simcall->issuer->doexception) {
662 if (simcall->call == SIMCALL_COMM_WAITANY) {
663 simcall->issuer->running_ctx->exception.value = xbt_dynar_search(simcall_comm_waitany__get__comms(simcall), &synchro);
665 else if (simcall->call == SIMCALL_COMM_TESTANY) {
666 simcall->issuer->running_ctx->exception.value = xbt_dynar_search(simcall_comm_testany__get__comms(simcall), &synchro);
670 if (simcall->issuer->host->isOff()) {
671 simcall->issuer->context->iwannadie = 1;
// Bookkeeping: the issuer no longer waits on this synchro, and both ends drop it from their comms lists.
674 simcall->issuer->waiting_synchro = NULL;
675 xbt_fifo_remove(simcall->issuer->comms, synchro);
677 if(simcall->issuer == comm->src_proc){
679 xbt_fifo_remove(comm->dst_proc->comms, synchro);
681 if(simcall->issuer == comm->dst_proc){
683 xbt_fifo_remove(comm->src_proc->comms, synchro);
684 //in case of a detached comm we have an extra ref to remove, as the sender won't do it
688 SIMIX_simcall_answer(simcall);
692 while (destroy_count-- > 0)
693 static_cast<simgrid::simix::Comm*>(synchro)->unref();
696 /******************************************************************************/
697 /* SIMIX_comm_copy_data callbacks */
698 /******************************************************************************/
// File-wide default used by SIMIX_comm_copy_data when a comm has no copy_data_fun of its own.
// It starts as SIMIX_comm_copy_pointer_callback.
699 static void (*SIMIX_comm_copy_data_callback) (smx_synchro_t, void*, size_t) = &SIMIX_comm_copy_pointer_callback;
// Replaces the file-wide default copy callback with `callback` (no null check is performed here).
701 void SIMIX_comm_set_copy_data_callback(void (*callback) (smx_synchro_t, void*, size_t))
703 SIMIX_comm_copy_data_callback = callback;
// Copy callback that transfers the pointer itself rather than the pointed data:
// `buff` is stored into the location designated by comm->dst_buff.
// buff_size must be exactly sizeof(void*), which is asserted.
706 void SIMIX_comm_copy_pointer_callback(smx_synchro_t synchro, void* buff, size_t buff_size)
708 simgrid::simix::Comm *comm = static_cast<simgrid::simix::Comm*>(synchro);
710 xbt_assert((buff_size == sizeof(void *)), "Cannot copy %zu bytes: must be sizeof(void*)", buff_size);
711 *(void **) (comm->dst_buff) = buff;
// Copy callback that copies buff_size bytes from `buff` into comm->dst_buff with memcpy.
// For a detached comm, comm->src_buff is reset to NULL afterwards.
// NOTE(review): original line 721 is not visible in this listing; given the trailing comment
// below it presumably releases the duplicated source buffer - confirm.
714 void SIMIX_comm_copy_buffer_callback(smx_synchro_t synchro, void* buff, size_t buff_size)
716 simgrid::simix::Comm *comm = static_cast<simgrid::simix::Comm*>(synchro);
718 XBT_DEBUG("Copy the data over");
719 memcpy(comm->dst_buff, buff, buff_size);
720 if (comm->detached) { // if this is a detached send, the source buffer was duplicated by SMPI sender to make the original buffer available to the application ASAP
722 comm->src_buff = NULL;
728 * \brief Copy the communication data from the sender's buffer to the receiver's one
729 * \param synchro The communication
// Copies the payload of a finished communication from the sender buffer to the receiver buffer.
// Nothing is copied when either buffer is missing or when comm->copied is already set.
// The amount copied is src_buff_size, capped by *dst_buff_size when dst_buff_size is set;
// *dst_buff_size is then updated to the amount actually used. The copy itself is done by the
// copy_data_fun of the comm when set, and by the file-wide default callback otherwise.
// NOTE(review): the early return, the else keyword between the two copy calls, a possible
// guard on buff_size and the statement setting the copied flag are not visible in this listing.
731 void SIMIX_comm_copy_data(smx_synchro_t synchro)
733 simgrid::simix::Comm *comm = static_cast<simgrid::simix::Comm*>(synchro);
735 size_t buff_size = comm->src_buff_size;
736 /* If there is no data to copy then return */
737 if (!comm->src_buff || !comm->dst_buff || comm->copied)
740 XBT_DEBUG("Copying comm %p data from %s (%p) -> %s (%p) (%zu bytes)",
742 comm->src_proc ? sg_host_get_name(comm->src_proc->host) : "a finished process",
744 comm->dst_proc ? sg_host_get_name(comm->dst_proc->host) : "a finished process",
745 comm->dst_buff, buff_size);
747 /* Copy at most dst_buff_size bytes of the message to receiver's buffer */
748 if (comm->dst_buff_size)
749 buff_size = MIN(buff_size, *(comm->dst_buff_size));
751 /* Update the receiver's buffer size to the copied amount */
752 if (comm->dst_buff_size)
753 *comm->dst_buff_size = buff_size;
756 if(comm->copy_data_fun)
757 comm->copy_data_fun (comm, comm->src_buff, buff_size);
759 SIMIX_comm_copy_data_callback (comm, comm->src_buff, buff_size);
763 /* Set the copied flag so we copy data only once */
764 /* (this function might be called from both communication ends) */