1 /* Copyright (c) 2009-2016. The SimGrid Team. All rights reserved. */
3 /* This program is free software; you can redistribute it and/or modify it
4 * under the terms of the license (GNU LGPL) which comes with this package. */
6 #include "src/surf/surf_interface.hpp"
7 #include "src/simix/smx_private.h"
10 #include "src/mc/mc_replay.h"
12 #include "simgrid/s4u/mailbox.hpp"
14 #include "src/simix/SynchroComm.hpp"
/* Declares the simix_network log sub-category as a child of the simix category. */
16 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(simix_network, simix, "SIMIX network-related synchronization");
/* Callback handed to the mailbox dictionary below (presumably run on each stored mailbox when the dictionary
 * releases it -- confirm against the xbt_dict API). */
18 static void SIMIX_mbox_free(void *data);
/* Global registry of mailboxes, indexed by mailbox name. */
19 static xbt_dict_t mailboxes = xbt_dict_new_homogeneous(SIMIX_mbox_free);
/* Forward declarations of file-local helpers that are defined further down in this file. */
21 static void SIMIX_waitany_remove_simcall_from_actions(smx_simcall_t simcall);
22 static void SIMIX_comm_copy_data(smx_synchro_t comm);
23 static inline void SIMIX_mbox_push(smx_mailbox_t mbox, smx_synchro_t comm);
24 static smx_synchro_t _find_matching_comm(std::deque<smx_synchro_t> *deque, e_smx_comm_type_t type,
25 int (*match_fun)(void *, void *,smx_synchro_t), void *user_data, smx_synchro_t my_synchro, bool remove_matching);
26 static void SIMIX_comm_start(smx_synchro_t synchro);
/**
 * \brief Tears down the global mailbox dictionary by handing it to xbt_dict_free().
 */
28 void SIMIX_mailbox_exit(void)
30 xbt_dict_free(&mailboxes);
33 /******************************************************************************/
34 /* Rendez-Vous Points */
35 /******************************************************************************/
/**
 * \brief Mailbox factory: looks \a name up in the global dictionary and builds a fresh mailbox.
 *
 * A fresh mailbox gets a private copy of the name, an empty comm_queue, no done_comm_queue and no permanent
 * receiver, and is then stored in the dictionary under its own name.
 *
 * \param name mailbox name; asserted to be non-NULL
 *
 * NOTE(review): the embedded numbering skips 42-43 and 52-55, so the condition guarding the allocation and the
 * return statement are not visible in this listing; presumably the allocation only runs when the lookup found
 * nothing, and the mailbox is returned -- confirm against the full file.
 */
37 smx_mailbox_t SIMIX_mbox_create(const char *name)
39 xbt_assert(name, "Mailboxes must have a name");
40 /* two processes may have pushed the same mbox_create simcall at the same time */
41 smx_mailbox_t mbox = (smx_mailbox_t) xbt_dict_get_or_null(mailboxes, name);
/* zero-initialized allocation; the fields that matter are then set explicitly */
44 mbox = xbt_new0(s_smx_mailbox_t, 1);
45 mbox->name = xbt_strdup(name);
46 mbox->comm_queue = new std::deque<smx_synchro_t>();
47 mbox->done_comm_queue = nullptr; // Allocated on need only
48 mbox->permanent_receiver=NULL;
50 XBT_DEBUG("Creating a mailbox at %p with name %s", mbox, name);
/* the dictionary key is the mailbox's own copy of the name */
51 xbt_dict_set(mailboxes, mbox->name, mbox, NULL);
/**
 * \brief Deletes the two communication queues of a mailbox.
 *
 * \param data the mailbox, received as an untyped pointer and cast back to smx_mailbox_t
 *
 * NOTE(review): embedded lines 60 and 63-65 are not shown in this listing; any further cleanup (e.g. of the name or
 * of the mailbox structure itself) would live there -- confirm against the full file.
 */
56 void SIMIX_mbox_free(void *data)
58 XBT_DEBUG("mbox free %p", data);
59 smx_mailbox_t mbox = (smx_mailbox_t) data;
61 delete mbox->comm_queue;
/* done_comm_queue is only allocated on need, so it may still be nullptr here; delete accepts a null pointer */
62 delete mbox->done_comm_queue;
/**
 * \brief Looks a mailbox up by name in the global dictionary, without creating it.
 *
 * \param name the mailbox name used as dictionary key
 * \return whatever xbt_dict_get_or_null() yields for that key, cast to smx_mailbox_t (presumably NULL when no such
 *         mailbox was registered -- confirm against the xbt_dict API)
 */
67 smx_mailbox_t SIMIX_mbox_get_by_name(const char *name)
69 return (smx_mailbox_t) xbt_dict_get_or_null(mailboxes, name);
73 * \brief set the receiver of the rendez vous point to allow eager sends
74 * \param mbox The rendez-vous point
75 * \param process The receiving process
/* Records `process` as the permanent receiver of `mbox`. The queue of already-sent communications
 * (done_comm_queue) is allocated here on first use and kept on later calls. */
77 void SIMIX_mbox_set_receiver(smx_mailbox_t mbox, smx_process_t process)
79 mbox->permanent_receiver=process;
/* lazy allocation: a mailbox starts out with done_comm_queue == nullptr */
80 if (mbox->done_comm_queue == nullptr)
81 mbox->done_comm_queue = new std::deque<smx_synchro_t>();
85 * \brief Pushes a communication synchro into a rendez-vous point
86 * \param mbox The mailbox
87 * \param synchro The communication synchro
/* Casts the synchro to its Comm type and appends it at the back of the mailbox's pending queue (comm_queue).
 * NOTE(review): embedded line 92 is not shown in this listing, so any bookkeeping done between the cast and the
 * push cannot be described from here. */
89 static inline void SIMIX_mbox_push(smx_mailbox_t mbox, smx_synchro_t synchro)
91 simgrid::simix::Comm *comm = static_cast<simgrid::simix::Comm*>(synchro);
93 mbox->comm_queue->push_back(comm);
98 * \brief Removes a communication synchro from a rendez-vous point
99 * \param mbox The rendez-vous point
100 * \param synchro The communication synchro
/* Walks the mailbox's comm_queue and erases an entry from it; aborts through xbt_die() with the message below
 * when the comm is not part of the mailbox.
 * NOTE(review): the test selecting the entry to erase (embedded line 108) and the statements following the erase
 * (embedded lines 110-111) are not shown in this listing; presumably the entry equal to `comm` is erased and the
 * function returns right away, so that the invalidated iterator is never advanced -- confirm against the full
 * file. */
102 void SIMIX_mbox_remove(smx_mailbox_t mbox, smx_synchro_t synchro)
104 simgrid::simix::Comm *comm = static_cast<simgrid::simix::Comm*>(synchro);
107 for (auto it = mbox->comm_queue->begin(); it != mbox->comm_queue->end(); it++)
109 mbox->comm_queue->erase(it);
112 xbt_die("Cannot remove this comm that is not part of the mailbox");
116 * \brief Checks if there is a communication synchro queued in a deque matching our needs
117 * \param type The type of communication we are looking for (comm_send, comm_recv)
118 * \return The communication synchro if found, NULL otherwise
/* Scans `deque` front to back for a queued communication that can be paired with the caller.
 *
 * A queued comm is accepted when all three conditions hold:
 *  - its type equals `type`;
 *  - the caller's `match_fun` is NULL or returns non-zero for (this_user_data, other_user_data, queued synchro);
 *  - the queued comm's own match_fun is NULL or returns non-zero for (other_user_data, this_user_data, my_synchro).
 *
 * `this_user_data` is the caller's user data and `my_synchro` the synchro describing the caller, which is what the
 * other side's filter gets to inspect.
 *
 * NOTE(review): embedded lines 138-146 and 152-153 are not shown in this listing, so the use of `remove_matching`
 * and the return statements are not visible; the surrounding documentation states that the matching synchro is
 * returned, NULL otherwise. */
120 static smx_synchro_t _find_matching_comm(std::deque<smx_synchro_t> *deque, e_smx_comm_type_t type,
121 int (*match_fun)(void *, void *,smx_synchro_t), void *this_user_data, smx_synchro_t my_synchro, bool remove_matching)
123 void* other_user_data = NULL;
125 for(auto it = deque->begin(); it != deque->end(); it++){
126 smx_synchro_t synchro = *it;
127 simgrid::simix::Comm *comm = static_cast<simgrid::simix::Comm*>(synchro);
/* the user data of the queued comm lives on its sender side or on its receiver side, depending on its type;
 * for any other type other_user_data keeps the value of the previous iteration (NULL initially) */
129 if (comm->type == SIMIX_COMM_SEND) {
130 other_user_data = comm->src_data;
131 } else if (comm->type == SIMIX_COMM_RECEIVE) {
132 other_user_data = comm->dst_data;
/* both filters must agree: ours is asked about the queued comm, the queued comm's is asked about us */
134 if (comm->type == type &&
135 (! match_fun || match_fun(this_user_data, other_user_data, synchro)) &&
136 (!comm->match_fun || comm->match_fun(other_user_data, this_user_data, my_synchro))) {
137 XBT_DEBUG("Found a matching communication synchro %p", comm);
/* keep a copy of the mailbox pointer in mbox_cpy */
142 comm->mbox_cpy = comm->mbox;
147 XBT_DEBUG("Sorry, communication synchro %p does not match our needs:"
148 " its type is %d but we are looking for a comm of type %d (or maybe the filtering didn't match)",
149 comm, (int)comm->type, (int)type);
151 XBT_DEBUG("No matching communication synchro found");
155 /******************************************************************************/
156 /* Communication synchros */
157 /******************************************************************************/
160 * \brief Destroy a communication synchro
161 * \param synchro The communication synchro to be destroyed
/* Releases one reference on a communication and tears it down once nobody holds it anymore.
 *
 * Visible steps: abort (with a backtrace) when the refcount is already <= 0 on entry; test whether references
 * remain; release the surf actions; for a detached comm that did not reach SIMIX_DONE, run clean_fun (when set) on
 * the source buffer and forget that buffer; remove the comm from its mailbox.
 *
 * NOTE(review): embedded lines 172-173, 175, 177, 187-189 and 191-193 are not shown in this listing. Presumably
 * the refcount is decremented before the `> 0` test, that test returns early, the mailbox removal is guarded and
 * the object is finally deleted -- confirm against the full file. */
163 void SIMIX_comm_destroy(smx_synchro_t synchro)
165 simgrid::simix::Comm *comm = static_cast<simgrid::simix::Comm*>(synchro);
167 XBT_DEBUG("Destroy synchro %p (refcount: %d), state: %d", comm, comm->refcount, (int)comm->state);
/* NOTE(review): the message below says "negative" although the test also fires for a refcount of exactly 0 */
169 if (comm->refcount <= 0) {
170 xbt_backtrace_display_current();
171 xbt_die("This comm has a negative refcount! You must not call test() or wait() more than once on a given communication.");
174 if (comm->refcount > 0)
176 XBT_DEBUG("Really free communication %p; refcount is now %d", comm, comm->refcount);
178 SIMIX_comm_destroy_internal_actions(synchro);
180 if (comm->detached && comm->state != SIMIX_DONE) {
181 /* the communication has failed and was detached:
182 * we have to free the buffer */
183 if (comm->clean_fun) {
184 comm->clean_fun(comm->src_buff);
186 comm->src_buff = NULL;
190 SIMIX_mbox_remove(comm->mbox, comm);
/* Drops the surf-level actions attached to a communication: the network action (surf_comm) and the two timeout
 * actions (src_timeout, dst_timeout). Each one that is set gets unref()'ed and its pointer is reset to NULL, which
 * makes a second call on the same comm a no-op for that action. */
195 void SIMIX_comm_destroy_internal_actions(smx_synchro_t synchro)
197 simgrid::simix::Comm *comm = static_cast<simgrid::simix::Comm*>(synchro);
/* network transfer action */
198 if (comm->surf_comm){
199 comm->surf_comm->unref();
200 comm->surf_comm = NULL;
/* sleep action installed on the sender side by a wait */
203 if (comm->src_timeout){
204 comm->src_timeout->unref();
205 comm->src_timeout = NULL;
/* sleep action installed on the receiver side by a wait */
208 if (comm->dst_timeout){
209 comm->dst_timeout->unref();
210 comm->dst_timeout = NULL;
/* Blocking send: posts the send through simcall_HANDLER_comm_isend() (with no clean_fun), sets the simcall's MC
 * value to 0 and then waits on the resulting synchro through simcall_HANDLER_comm_wait() with `timeout`.
 * NOTE(review): the tail of the isend argument list (embedded line 222) is not shown in this listing. */
214 void simcall_HANDLER_comm_send(smx_simcall_t simcall, smx_process_t src, smx_mailbox_t mbox,
215 double task_size, double rate,
216 void *src_buff, size_t src_buff_size,
217 int (*match_fun)(void *, void *,smx_synchro_t),
218 void (*copy_data_fun)(smx_synchro_t, void*, size_t),
219 void *data, double timeout){
220 smx_synchro_t comm = simcall_HANDLER_comm_isend(simcall, src, mbox, task_size, rate,
221 src_buff, src_buff_size, match_fun, NULL, copy_data_fun,
223 SIMCALL_SET_MC_VALUE(simcall, 0);
224 simcall_HANDLER_comm_wait(simcall, comm, timeout);
/* Non-blocking send.
 *
 * Builds a SEND synchro describing the caller and looks in the mailbox's comm_queue for a queued RECEIVE that
 * both sides' filters accept (the match is taken out of the queue). Without a match the send's own synchro is
 * used: on a mailbox with a permanent receiver it is marked SIMIX_READY and stored in done_comm_queue, and
 * SIMIX_mbox_push() puts a synchro into comm_queue. With a match, the temporary synchro is destroyed and the
 * matched comm becomes READY. The retained comm is then registered in the sender's comm list, filled with the
 * sender-side fields and started.
 *
 * \return NULL for a detached send, the communication synchro otherwise
 *
 * NOTE(review): several branch lines (else/closing lines, the `detached` test around embedded lines 277-282) are
 * not shown in this listing, so the exact nesting of the statements below must be confirmed against the full
 * file. */
226 smx_synchro_t simcall_HANDLER_comm_isend(smx_simcall_t simcall, smx_process_t src_proc, smx_mailbox_t mbox,
227 double task_size, double rate,
228 void *src_buff, size_t src_buff_size,
229 int (*match_fun)(void *, void *,smx_synchro_t),
230 void (*clean_fun)(void *), // used to free the synchro in case of problem after a detached send
231 void (*copy_data_fun)(smx_synchro_t, void*, size_t),// used to copy data if not default one
232 void *data, int detached)
234 XBT_DEBUG("send from %p", mbox);
236 /* Prepare a synchro describing us, so that it gets passed to the user-provided filter of other side */
237 smx_synchro_t this_synchro = new simgrid::simix::Comm(SIMIX_COMM_SEND);
239 /* Look for communication synchro matching our needs. We also provide a description of
240 * ourself so that the other side also gets a chance of choosing if it wants to match with us.
242 * If it is not found then push our communication into the rendez-vous point */
243 smx_synchro_t other_synchro =
244 _find_matching_comm(mbox->comm_queue, SIMIX_COMM_RECEIVE, match_fun, data, this_synchro, /*remove_matching*/true);
245 simgrid::simix::Comm *other_comm = static_cast<simgrid::simix::Comm*>(other_synchro);
/* no receiver is waiting yet: the send's own synchro becomes the communication */
248 if (!other_synchro) {
249 other_synchro = this_synchro;
250 other_comm = static_cast<simgrid::simix::Comm*>(other_synchro);
252 if (mbox->permanent_receiver!=NULL){
253 //this mailbox is for small messages, which have to be sent right now
254 other_synchro->state = SIMIX_READY;
255 other_comm->dst_proc=mbox->permanent_receiver;
/* extra reference taken while the comm sits in done_comm_queue */
256 other_comm->refcount++;
257 mbox->done_comm_queue->push_back(other_synchro);
258 other_comm->mbox=mbox;
/* NOTE(review): the second %p receives the address of the local pointer variable, not the comm it points to */
259 XBT_DEBUG("pushing a message into the permanent receive fifo %p, comm %p", mbox, &(other_comm));
262 SIMIX_mbox_push(mbox, this_synchro);
265 XBT_DEBUG("Receive already pushed");
/* a receive was found: the temporary synchro describing us is released */
267 SIMIX_comm_destroy(this_synchro);
269 other_comm->state = SIMIX_READY;
270 other_comm->type = SIMIX_COMM_READY;
273 xbt_fifo_push(src_proc->comms, other_synchro);
275 /* if the communication synchro is detached then decrease the refcount
276 * by one, so it will be eliminated by the receiver's destroy call */
278 other_comm->detached = 1;
279 other_comm->refcount--;
280 other_comm->clean_fun = clean_fun;
282 other_comm->clean_fun = NULL;
285 /* Setup the communication synchro */
286 other_comm->src_proc = src_proc;
287 other_comm->task_size = task_size;
288 other_comm->rate = rate;
289 other_comm->src_buff = src_buff;
290 other_comm->src_buff_size = src_buff_size;
291 other_comm->src_data = data;
293 other_comm->match_fun = match_fun;
294 other_comm->copy_data_fun = copy_data_fun;
/* under the model checker or record/replay the comm is flagged RUNNING and SIMIX_comm_start() is not called */
297 if (MC_is_active() || MC_record_replay_is_active()) {
298 other_comm->state = SIMIX_RUNNING;
299 return (detached ? NULL : other_comm);
302 SIMIX_comm_start(other_comm);
303 return (detached ? NULL : other_comm);
/* Blocking receive: posts the receive through SIMIX_comm_irecv(), sets the simcall's MC value to 0 and then waits
 * on the resulting synchro through simcall_HANDLER_comm_wait() with `timeout`. */
306 void simcall_HANDLER_comm_recv(smx_simcall_t simcall, smx_process_t receiver, smx_mailbox_t mbox,
307 void *dst_buff, size_t *dst_buff_size,
308 int (*match_fun)(void *, void *, smx_synchro_t),
309 void (*copy_data_fun)(smx_synchro_t, void*, size_t),
310 void *data, double timeout, double rate)
312 smx_synchro_t comm = SIMIX_comm_irecv(receiver, mbox, dst_buff, dst_buff_size, match_fun, copy_data_fun, data, rate);
313 SIMCALL_SET_MC_VALUE(simcall, 0);
314 simcall_HANDLER_comm_wait(simcall, comm, timeout);
/* Simcall entry point of the non-blocking receive: forwards every argument except `simcall` to SIMIX_comm_irecv()
 * and returns its result unchanged. */
317 smx_synchro_t simcall_HANDLER_comm_irecv(smx_simcall_t simcall, smx_process_t receiver, smx_mailbox_t mbox,
318 void *dst_buff, size_t *dst_buff_size,
319 int (*match_fun)(void *, void *, smx_synchro_t),
320 void (*copy_data_fun)(smx_synchro_t, void*, size_t),
321 void *data, double rate)
323 return SIMIX_comm_irecv(receiver, mbox, dst_buff, dst_buff_size, match_fun, copy_data_fun, data, rate);
/* Non-blocking receive.
 *
 * Builds a RECEIVE synchro describing the caller, then looks for a SEND to pair with:
 *  - when the mailbox has a permanent receiver and done_comm_queue is not empty, the search runs on
 *    done_comm_queue (sends that were posted eagerly); a match whose surf action exists and has nothing left to
 *    transfer is marked DONE and detached from the mailbox;
 *  - the regular comm_queue is searched as well (presumably in the alternative branch -- the branch lines are not
 *    shown in this listing).
 * When nothing matches, the receive's own synchro is pushed into the mailbox. The retained comm is then filled with
 * the receiver-side fields and started.
 *
 * \return the communication synchro (the matched send, or the freshly queued receive)
 *
 * NOTE(review): else/closing lines are missing from this listing (e.g. embedded lines 346, 348, 354, 357-358, 371,
 * 374, 377, 379-380), so the exact nesting of the statements below must be confirmed against the full file. */
326 smx_synchro_t SIMIX_comm_irecv(smx_process_t dst_proc, smx_mailbox_t mbox, void *dst_buff, size_t *dst_buff_size,
327 int (*match_fun)(void *, void *, smx_synchro_t),
328 void (*copy_data_fun)(smx_synchro_t, void*, size_t), // used to copy data if not default one
329 void *data, double rate)
331 XBT_DEBUG("recv from %p %p", mbox, mbox->comm_queue);
332 smx_synchro_t this_synchro = new simgrid::simix::Comm(SIMIX_COMM_RECEIVE);
334 smx_synchro_t other_synchro;
335 //communication already done, get it inside the fifo of completed comms
336 if (mbox->permanent_receiver && ! mbox->done_comm_queue->empty()) {
338 XBT_DEBUG("We have a comm that has probably already been received, trying to match it, to skip the communication");
339 //find a match in the already received fifo
340 other_synchro = _find_matching_comm(mbox->done_comm_queue, SIMIX_COMM_SEND, match_fun, data, this_synchro,/*remove_matching*/true);
341 //if not found, assume the receiver came first, register it to the mailbox in the classical way
342 if (!other_synchro) {
343 XBT_DEBUG("We have messages in the permanent receive list, but not the one we are looking for, pushing request into fifo");
344 other_synchro = this_synchro;
345 SIMIX_mbox_push(mbox, this_synchro);
347 simgrid::simix::Comm *other_comm = static_cast<simgrid::simix::Comm*>(other_synchro);
/* the eager send already has a surf action with nothing left to transfer */
349 if(other_comm->surf_comm && other_comm->remains()==0.0) {
350 XBT_DEBUG("comm %p has been already sent, and is finished, destroy it",other_comm);
351 other_comm->state = SIMIX_DONE;
352 other_comm->type = SIMIX_COMM_DONE;
353 other_comm->mbox = NULL;
355 other_comm->refcount--;
356 SIMIX_comm_destroy(this_synchro);
359 /* Prepare a synchro describing us, so that it gets passed to the user-provided filter of other side */
361 /* Look for communication synchro matching our needs. We also provide a description of
362 * ourself so that the other side also gets a chance of choosing if it wants to match with us.
364 * If it is not found then push our communication into the rendez-vous point */
365 other_synchro = _find_matching_comm(mbox->comm_queue, SIMIX_COMM_SEND, match_fun, data, this_synchro,/*remove_matching*/true);
367 if (!other_synchro) {
368 XBT_DEBUG("Receive pushed first %zu", mbox->comm_queue->size());
369 other_synchro = this_synchro;
370 SIMIX_mbox_push(mbox, this_synchro);
372 SIMIX_comm_destroy(this_synchro);
373 simgrid::simix::Comm *other_comm = static_cast<simgrid::simix::Comm*>(other_synchro);
375 other_comm->state = SIMIX_READY;
376 other_comm->type = SIMIX_COMM_READY;
378 xbt_fifo_push(dst_proc->comms, other_synchro);
381 /* Setup communication synchro */
382 simgrid::simix::Comm *other_comm = static_cast<simgrid::simix::Comm*>(other_synchro);
383 other_comm->dst_proc = dst_proc;
384 other_comm->dst_buff = dst_buff;
385 other_comm->dst_buff_size = dst_buff_size;
386 other_comm->dst_data = data;
/* -1.0 stands for "no rate given": the receiver's rate is applied when the comm has none yet or when it is lower */
388 if (rate != -1.0 && (other_comm->rate == -1.0 || rate < other_comm->rate))
389 other_comm->rate = rate;
391 other_comm->match_fun = match_fun;
392 other_comm->copy_data_fun = copy_data_fun;
/* under the model checker or record/replay the comm is flagged RUNNING and SIMIX_comm_start() is not called */
394 if (MC_is_active() || MC_record_replay_is_active()) {
395 other_synchro->state = SIMIX_RUNNING;
396 return other_synchro;
399 SIMIX_comm_start(other_synchro);
400 return other_synchro;
/* Simcall entry point of the probe: forwards to SIMIX_comm_iprobe() with the simcall's issuer as the probing
 * process and returns its result unchanged.
 * NOTE(review): the end of the parameter list (embedded line 406, which must declare `data`) is not shown in this
 * listing. */
403 smx_synchro_t simcall_HANDLER_comm_iprobe(smx_simcall_t simcall, smx_mailbox_t mbox,
404 int type, int src, int tag,
405 int (*match_fun)(void *, void *, smx_synchro_t),
407 return SIMIX_comm_iprobe(simcall->issuer, mbox, type, src, tag, match_fun, data);
/* Checks whether a communication matching the caller's filter is queued in `mbox`, without consuming it.
 *
 * A throw-away synchro describing the prober is built with the direction opposite to the kind of communication
 * being looked for (a SEND-typed probe looks for SIMIX_COMM_RECEIVE, a RECEIVE-typed one for SIMIX_COMM_SEND). Both
 * done_comm_queue (only when the mailbox has a permanent receiver and that queue is not empty) and comm_queue are
 * searched with remove_matching=false, so a match stays queued. The throw-away synchro is destroyed before
 * returning.
 *
 * \return the content of other_synchro after the searches (initialized to NULL)
 *
 * NOTE(review): the lines selecting the direction from `type` (embedded 415-416, 419), the assignment target of the
 * first search (426) and the guards around the second search and the refcount decrement (428-429, 432-434) are not
 * shown in this listing -- confirm against the full file. `src` and `tag` are not used on any visible line. */
410 smx_synchro_t SIMIX_comm_iprobe(smx_process_t dst_proc, smx_mailbox_t mbox, int type, int src,
411 int tag, int (*match_fun)(void *, void *, smx_synchro_t), void *data)
413 XBT_DEBUG("iprobe from %p %p", mbox, mbox->comm_queue);
414 smx_synchro_t this_synchro;
417 this_synchro = new simgrid::simix::Comm(SIMIX_COMM_SEND);
418 smx_type = SIMIX_COMM_RECEIVE;
420 this_synchro = new simgrid::simix::Comm(SIMIX_COMM_RECEIVE);
421 smx_type = SIMIX_COMM_SEND;
423 smx_synchro_t other_synchro=NULL;
424 if(mbox->permanent_receiver && ! mbox->done_comm_queue->empty()){
425 XBT_DEBUG("first check in the permanent recv mailbox, to see if we already got something");
427 _find_matching_comm(mbox->done_comm_queue, (e_smx_comm_type_t) smx_type, match_fun, data, this_synchro,/*remove_matching*/false);
430 XBT_DEBUG("check if we have more luck in the normal mailbox");
431 other_synchro = _find_matching_comm(mbox->comm_queue, (e_smx_comm_type_t) smx_type, match_fun, data, this_synchro,/*remove_matching*/false);
435 simgrid::simix::Comm *other_comm = static_cast<simgrid::simix::Comm*>(other_synchro);
436 other_comm->refcount--;
439 SIMIX_comm_destroy(this_synchro);
440 return other_synchro;
/* Blocks the issuer of `simcall` on `synchro`.
 *
 * The simcall is appended to the synchro's simcall list and the synchro becomes the issuer's waiting_synchro.
 *  - Under the model checker or record/replay, the outcome is taken from the simcall's MC value: the comm is
 *    either flagged DONE or flagged as timed out on the issuer's side (SRC_TIMEOUT when the issuer is the sender,
 *    DST_TIMEOUT otherwise), then finished at once.
 *  - Otherwise, a synchro that is neither WAITING nor RUNNING is finished immediately; else a surf sleep action
 *    lasting `timeout` is created on the issuer's host and stored as the timeout of the issuer's side.
 *
 * NOTE(review): the tests on `idx`, the declaration of `sleep` and the else/return lines (embedded 446-447, 456,
 * 458, 461-463, 467, 470, 472-474, 482, 486) are not shown in this listing -- confirm against the full file. */
443 void simcall_HANDLER_comm_wait(smx_simcall_t simcall, smx_synchro_t synchro, double timeout)
445 /* the simcall may be a wait, a send or a recv */
448 /* Associate this simcall to the wait synchro */
449 XBT_DEBUG("simcall_HANDLER_comm_wait, %p", synchro);
451 xbt_fifo_push(synchro->simcalls, simcall);
452 simcall->issuer->waiting_synchro = synchro;
454 if (MC_is_active() || MC_record_replay_is_active()) {
455 int idx = SIMCALL_GET_MC_VALUE(simcall);
457 synchro->state = SIMIX_DONE;
459 /* If we reached this point, the wait simcall must have a timeout */
460 /* Otherwise it shouldn't be enabled and executed by the MC */
464 simgrid::simix::Comm *comm = static_cast<simgrid::simix::Comm*>(synchro);
465 if (comm->src_proc == simcall->issuer)
466 comm->state = SIMIX_SRC_TIMEOUT;
468 comm->state = SIMIX_DST_TIMEOUT;
471 SIMIX_comm_finish(synchro);
475 /* If the synchro has already finished, perform the error handling; */
476 /* otherwise set up a waiting timeout on the right side */
477 if (synchro->state != SIMIX_WAITING && synchro->state != SIMIX_RUNNING) {
478 SIMIX_comm_finish(synchro);
479 } else { /* if (timeout >= 0) { we need a surf sleep action even when there is no timeout, otherwise surf won't tell us when the host fails */
480 sleep = surf_host_sleep(simcall->issuer->host, timeout);
/* the sleep action points back to the synchro it guards */
481 sleep->setData(synchro);
483 simgrid::simix::Comm *comm = static_cast<simgrid::simix::Comm*>(synchro);
484 if (simcall->issuer == comm->src_proc)
485 comm->src_timeout = sleep;
487 comm->dst_timeout = sleep;
/* Non-blocking completion test of a communication; the boolean answer is stored as the simcall's result.
 *
 *  - Under the model checker or record/replay, the comm counts as complete as soon as both its src_proc and
 *    dst_proc are set; it is then flagged DONE and finished.
 *  - Otherwise, the comm counts as complete when its state is neither WAITING nor RUNNING; it is then finished.
 * When the test succeeds the simcall is queued on the synchro before SIMIX_comm_finish() is called; the other
 * visible path answers the simcall directly.
 *
 * NOTE(review): the else/return lines (embedded 501, 503-506, 511, 513) are not shown in this listing; presumably
 * SIMIX_simcall_answer() sits in the else branch of each test -- confirm against the full file. */
491 void simcall_HANDLER_comm_test(smx_simcall_t simcall, smx_synchro_t synchro)
493 simgrid::simix::Comm *comm = static_cast<simgrid::simix::Comm*>(synchro);
495 if (MC_is_active() || MC_record_replay_is_active()){
496 simcall_comm_test__set__result(simcall, comm->src_proc && comm->dst_proc);
497 if (simcall_comm_test__get__result(simcall)){
498 synchro->state = SIMIX_DONE;
499 xbt_fifo_push(synchro->simcalls, simcall);
500 SIMIX_comm_finish(synchro);
502 SIMIX_simcall_answer(simcall);
507 simcall_comm_test__set__result(simcall, (synchro->state != SIMIX_WAITING && synchro->state != SIMIX_RUNNING));
508 if (simcall_comm_test__get__result(simcall)) {
509 xbt_fifo_push(synchro->simcalls, simcall);
510 SIMIX_comm_finish(synchro);
512 SIMIX_simcall_answer(simcall);
/* Non-blocking completion test over a set of communications. The simcall's result starts at -1 and is set to the
 * position, within the set, of the communication that gets finished.
 *
 *  - Under the model checker or record/replay, the position comes from the simcall's MC value; the selected comm
 *    is flagged DONE and finished.
 *  - Otherwise, the comms are scanned in order for one whose state is neither WAITING nor RUNNING; that one is
 *    finished.
 *
 * NOTE(review): the declaration of `cursor`, the test on `idx` and the else/return lines (embedded 517-518, 524,
 * 526, 532-535, 541-543) are not shown in this listing; presumably the scan stops at the first finished comm and
 * the final SIMIX_simcall_answer() is only reached when none was finished -- confirm against the full file. */
516 void simcall_HANDLER_comm_testany(smx_simcall_t simcall, xbt_dynar_t synchros)
519 smx_synchro_t synchro;
520 simcall_comm_testany__set__result(simcall, -1);
522 if (MC_is_active() || MC_record_replay_is_active()){
523 int idx = SIMCALL_GET_MC_VALUE(simcall);
525 SIMIX_simcall_answer(simcall);
527 synchro = xbt_dynar_get_as(synchros, idx, smx_synchro_t);
528 simcall_comm_testany__set__result(simcall, idx);
529 xbt_fifo_push(synchro->simcalls, simcall);
530 synchro->state = SIMIX_DONE;
531 SIMIX_comm_finish(synchro);
536 xbt_dynar_foreach(simcall_comm_testany__get__comms(simcall), cursor,synchro) {
537 if (synchro->state != SIMIX_WAITING && synchro->state != SIMIX_RUNNING) {
538 simcall_comm_testany__set__result(simcall, cursor);
539 xbt_fifo_push(synchro->simcalls, simcall);
540 SIMIX_comm_finish(synchro);
544 SIMIX_simcall_answer(simcall);
/* Blocks the issuer until one communication of the set completes.
 *
 *  - Under the model checker or record/replay, the comm at the position given by the simcall's MC value is chosen:
 *    the simcall is queued on it, that position becomes the simcall's result, the comm is flagged DONE and
 *    finished.
 *  - Otherwise, the simcall is queued on the comms of the set in order, and a comm found to be neither WAITING
 *    nor RUNNING is finished on the spot.
 *
 * NOTE(review): the return/break lines (embedded 559-561, 569-572) are not shown in this listing; presumably the
 * scan stops after the first finished comm -- confirm against the full file. */
547 void simcall_HANDLER_comm_waitany(smx_simcall_t simcall, xbt_dynar_t synchros)
549 smx_synchro_t synchro;
550 unsigned int cursor = 0;
552 if (MC_is_active() || MC_record_replay_is_active()){
553 int idx = SIMCALL_GET_MC_VALUE(simcall);
554 synchro = xbt_dynar_get_as(synchros, idx, smx_synchro_t);
555 xbt_fifo_push(synchro->simcalls, simcall);
556 simcall_comm_waitany__set__result(simcall, idx);
557 synchro->state = SIMIX_DONE;
558 SIMIX_comm_finish(synchro);
562 xbt_dynar_foreach(synchros, cursor, synchro){
563 /* associate this simcall to the synchro */
564 xbt_fifo_push(synchro->simcalls, simcall);
566 /* see if the synchro is already finished */
567 if (synchro->state != SIMIX_WAITING && synchro->state != SIMIX_RUNNING){
568 SIMIX_comm_finish(synchro);
/* Unregisters a waitany simcall from every communication it was waiting on: the comms are read back from the
 * simcall's own arguments, and the simcall is removed from the simcall list of each of them. */
574 void SIMIX_waitany_remove_simcall_from_actions(smx_simcall_t simcall)
576 smx_synchro_t synchro;
577 unsigned int cursor = 0;
578 xbt_dynar_t synchros = simcall_comm_waitany__get__comms(simcall);
580 xbt_dynar_foreach(synchros, cursor, synchro)
581 xbt_fifo_remove(synchro->simcalls, simcall);
585 * \brief Starts the simulation of a communication synchro.
586 * \param synchro the communication synchro
/* Launches the network transfer of a communication whose state is SIMIX_READY; other states are left untouched.
 *
 * A surf network action is created between the hosts of the sender and of the receiver, using the comm's task_size
 * and rate, and the comm becomes RUNNING. When that action is already in the failed state the comm is flagged
 * LINK_FAILURE and its surf actions are released. When the sender or the receiver process is suspended, the surf
 * action is suspended right away.
 *
 * NOTE(review): else/closing lines (embedded 610-611, 617, 621, 624, 626-629) are not shown in this listing;
 * presumably the suspension handling only runs when the action did not fail (surf_comm is reset to NULL on the
 * failure path) -- confirm against the full file. */
588 static inline void SIMIX_comm_start(smx_synchro_t synchro)
590 simgrid::simix::Comm *comm = static_cast<simgrid::simix::Comm*>(synchro);
592 /* If both the sender and the receiver are already there, start the communication */
593 if (synchro->state == SIMIX_READY) {
595 sg_host_t sender = comm->src_proc->host;
596 sg_host_t receiver = comm->dst_proc->host;
598 XBT_DEBUG("Starting communication %p from '%s' to '%s'", synchro, sg_host_get_name(sender), sg_host_get_name(receiver));
600 comm->surf_comm = surf_network_model_communicate(surf_network_model, sender, receiver, comm->task_size, comm->rate);
/* the surf action points back to the synchro it implements */
601 comm->surf_comm->setData(synchro);
602 comm->state = SIMIX_RUNNING;
604 /* If a link is failed, detect it immediately */
605 if (comm->surf_comm->getState() == simgrid::surf::Action::State::failed) {
606 XBT_DEBUG("Communication from '%s' to '%s' failed to start because of a link failure",
607 sg_host_get_name(sender), sg_host_get_name(receiver));
608 comm->state = SIMIX_LINK_FAILURE;
609 SIMIX_comm_destroy_internal_actions(synchro);
612 /* If any of the processes is suspended, create the synchro but stop its execution,
613 it will be restarted when the sender process resumes */
614 if (SIMIX_process_is_suspended(comm->src_proc) ||
615 SIMIX_process_is_suspended(comm->dst_proc)) {
616 /* FIXME: check what should happen with the synchro state */
618 if (SIMIX_process_is_suspended(comm->src_proc))
619 XBT_DEBUG("The communication is suspended on startup because src (%s:%s) were suspended since it initiated the communication",
620 sg_host_get_name(comm->src_proc->host), comm->src_proc->name);
622 XBT_DEBUG("The communication is suspended on startup because dst (%s:%s) were suspended since it initiated the communication",
623 sg_host_get_name(comm->dst_proc->host), comm->dst_proc->name);
625 comm->surf_comm->suspend();
631 * \brief Answers the SIMIX simcalls associated to a communication synchro.
632 * \param synchro a finished communication synchro
/* Answers every simcall queued on a communication that has reached a final state.
 *
 * Simcalls are taken off the synchro's list one at a time. For each of them:
 *  - simcalls whose call is SIMCALL_NONE are skipped;
 *  - a waitany simcall is first unregistered from the other comms it was waiting on and, outside the model
 *    checker / replay, gets the position of this comm in its set as result;
 *  - the comm is removed from its mailbox;
 *  - the outcome is reported to the issuer: data is copied on SIMIX_DONE, the other states post a timeout_error,
 *    network_error or cancel_error exception, or mark the issuer for termination (iwannadie) when its own host
 *    failed; an unknown state aborts;
 *  - when an exception was posted for a waitany/testany simcall, the position of this comm in the set is stored in
 *    the exception's value;
 *  - the comm is unlinked from the issuer (waiting_synchro, comms list) and from the peer's comms list, and the
 *    simcall is answered.
 * Finally SIMIX_comm_destroy() is called destroy_count times.
 *
 * NOTE(review): many branch lines are not shown in this listing -- the guards on comm->mbox and on the peer
 * processes, the else keywords, the case labels for SIMIX_DONE / the canceled state / default, the break
 * statements and the increment of destroy_count. The nesting described above is therefore partly presumed and must
 * be confirmed against the full file. */
634 void SIMIX_comm_finish(smx_synchro_t synchro)
636 simgrid::simix::Comm *comm = static_cast<simgrid::simix::Comm*>(synchro);
637 unsigned int destroy_count = 0;
638 smx_simcall_t simcall;
640 while ((simcall = (smx_simcall_t) xbt_fifo_shift(synchro->simcalls))) {
642 /* If a waitany simcall is waiting for this synchro to finish, then remove
643 it from the other synchros in the waitany list. Afterwards, get the
644 position of the actual synchro in the waitany dynar and
645 return it as the result of the simcall */
647 if (simcall->call == SIMCALL_NONE) //FIXME: maybe a better way to handle this case
648 continue; // if process handling comm is killed
649 if (simcall->call == SIMCALL_COMM_WAITANY) {
650 SIMIX_waitany_remove_simcall_from_actions(simcall);
/* under MC / replay the result was already set by the waitany handler */
651 if (!MC_is_active() && !MC_record_replay_is_active())
652 simcall_comm_waitany__set__result(simcall, xbt_dynar_search(simcall_comm_waitany__get__comms(simcall), &synchro));
655 /* If the synchro is still in a rendez-vous point then remove from it */
657 SIMIX_mbox_remove(comm->mbox, synchro);
659 XBT_DEBUG("SIMIX_comm_finish: synchro state = %d", (int)synchro->state);
661 /* Check out for errors */
/* an issuer whose host is off is marked for termination, whatever the state of the comm */
663 if (simcall->issuer->host->isOff()) {
664 simcall->issuer->context->iwannadie = 1;
665 SMX_EXCEPTION(simcall->issuer, host_error, 0, "Host failed");
668 switch (synchro->state) {
671 XBT_DEBUG("Communication %p complete!", synchro);
672 SIMIX_comm_copy_data(synchro);
675 case SIMIX_SRC_TIMEOUT:
676 SMX_EXCEPTION(simcall->issuer, timeout_error, 0, "Communication timeouted because of sender");
679 case SIMIX_DST_TIMEOUT:
680 SMX_EXCEPTION(simcall->issuer, timeout_error, 0, "Communication timeouted because of receiver");
/* host failure on the sender side: the sender itself dies, the other end presumably gets a network error */
683 case SIMIX_SRC_HOST_FAILURE:
684 if (simcall->issuer == comm->src_proc)
685 simcall->issuer->context->iwannadie = 1;
686 // SMX_EXCEPTION(simcall->issuer, host_error, 0, "Host failed");
688 SMX_EXCEPTION(simcall->issuer, network_error, 0, "Remote peer failed");
/* host failure on the receiver side: mirror image of the previous case */
691 case SIMIX_DST_HOST_FAILURE:
692 if (simcall->issuer == comm->dst_proc)
693 simcall->issuer->context->iwannadie = 1;
694 // SMX_EXCEPTION(simcall->issuer, host_error, 0, "Host failed");
696 SMX_EXCEPTION(simcall->issuer, network_error, 0, "Remote peer failed");
699 case SIMIX_LINK_FAILURE:
701 XBT_DEBUG("Link failure in synchro %p between '%s' and '%s': posting an exception to the issuer: %s (%p) detached:%d",
703 comm->src_proc ? sg_host_get_name(comm->src_proc->host) : NULL,
704 comm->dst_proc ? sg_host_get_name(comm->dst_proc->host) : NULL,
705 simcall->issuer->name, simcall->issuer, comm->detached);
706 if (comm->src_proc == simcall->issuer) {
707 XBT_DEBUG("I'm source");
708 } else if (comm->dst_proc == simcall->issuer) {
709 XBT_DEBUG("I'm dest");
711 XBT_DEBUG("I'm neither source nor dest");
713 SMX_EXCEPTION(simcall->issuer, network_error, 0, "Link failure");
/* cancellation: the message names the other end as the one that canceled */
717 if (simcall->issuer == comm->dst_proc)
718 SMX_EXCEPTION(simcall->issuer, cancel_error, 0, "Communication canceled by the sender");
720 SMX_EXCEPTION(simcall->issuer, cancel_error, 0, "Communication canceled by the receiver");
724 xbt_die("Unexpected synchro state in SIMIX_comm_finish: %d", (int)synchro->state);
727 /* if there is an exception during a waitany or a testany, indicate the position of the failed communication */
728 if (simcall->issuer->doexception) {
729 if (simcall->call == SIMCALL_COMM_WAITANY) {
730 simcall->issuer->running_ctx->exception.value = xbt_dynar_search(simcall_comm_waitany__get__comms(simcall), &synchro);
732 else if (simcall->call == SIMCALL_COMM_TESTANY) {
733 simcall->issuer->running_ctx->exception.value = xbt_dynar_search(simcall_comm_testany__get__comms(simcall), &synchro);
737 if (simcall->issuer->host->isOff()) {
738 simcall->issuer->context->iwannadie = 1;
/* the issuer no longer waits on this comm; drop the comm from the comm lists of both ends */
741 simcall->issuer->waiting_synchro = NULL;
742 xbt_fifo_remove(simcall->issuer->comms, synchro);
744 if(simcall->issuer == comm->src_proc){
746 xbt_fifo_remove(comm->dst_proc->comms, synchro);
748 if(simcall->issuer == comm->dst_proc){
750 xbt_fifo_remove(comm->src_proc->comms, synchro);
753 SIMIX_simcall_answer(simcall);
/* the destroy calls are deferred until every simcall was answered, since they may free the comm */
757 while (destroy_count-- > 0)
758 SIMIX_comm_destroy(synchro);
762 * \brief This function is called when a Surf communication synchro is finished.
763 * \param synchro the corresponding Simix communication
/* Translates the state of the surf actions of a communication into the state of the synchro.
 *
 * The checks are ordered, the first one that applies wins:
 *   sender-side timeout action done      -> SIMIX_SRC_TIMEOUT
 *   receiver-side timeout action done    -> SIMIX_DST_TIMEOUT
 *   sender-side timeout action failed    -> SIMIX_SRC_HOST_FAILURE
 *   receiver-side timeout action failed  -> SIMIX_DST_HOST_FAILURE
 *   network action failed                -> SIMIX_LINK_FAILURE
 *   SIMIX_DONE is the remaining assignment (presumably the final else -- embedded line 785 is not shown in this
 *   listing; confirm against the full file).
 * The surf actions are then released and, when simcalls are queued on the synchro, they are answered through
 * SIMIX_comm_finish(). */
765 void SIMIX_post_comm(smx_synchro_t synchro)
767 simgrid::simix::Comm *comm = static_cast<simgrid::simix::Comm*>(synchro);
769 /* Update synchro state */
770 if (comm->src_timeout &&
771 comm->src_timeout->getState() == simgrid::surf::Action::State::done)
772 synchro->state = SIMIX_SRC_TIMEOUT;
773 else if (comm->dst_timeout &&
774 comm->dst_timeout->getState() == simgrid::surf::Action::State::done)
775 synchro->state = SIMIX_DST_TIMEOUT;
776 else if (comm->src_timeout &&
777 comm->src_timeout->getState() == simgrid::surf::Action::State::failed)
778 synchro->state = SIMIX_SRC_HOST_FAILURE;
779 else if (comm->dst_timeout &&
780 comm->dst_timeout->getState() == simgrid::surf::Action::State::failed)
781 synchro->state = SIMIX_DST_HOST_FAILURE;
782 else if (comm->surf_comm &&
783 comm->surf_comm->getState() == simgrid::surf::Action::State::failed) {
784 synchro->state = SIMIX_LINK_FAILURE;
786 synchro->state = SIMIX_DONE;
788 XBT_DEBUG("SIMIX_post_comm: comm %p, state %d, src_proc %p, dst_proc %p, detached: %d",
789 comm, (int)comm->state, comm->src_proc, comm->dst_proc, comm->detached);
791 /* destroy the surf actions associated with the Simix communication */
792 SIMIX_comm_destroy_internal_actions(comm);
794 /* if there are simcalls associated with the synchro, then answer them */
795 if (xbt_fifo_size(synchro->simcalls)) {
796 SIMIX_comm_finish(comm);
800 /************* synchro Getters **************/
803 * \brief Return the user data associated to the sender of the communication
804 * \param synchro The communication
805 * \return the user data
/* Accessor: returns the src_data field of the communication (the user data stored by the sending side). */
807 void* SIMIX_comm_get_src_data(smx_synchro_t synchro)
809 simgrid::simix::Comm *comm = static_cast<simgrid::simix::Comm*>(synchro);
811 return comm->src_data;
815 * \brief Return the user data associated to the receiver of the communication
816 * \param synchro The communication
817 * \return the user data
/* Accessor: returns the dst_data field of the communication (the user data stored by the receiving side). */
819 void* SIMIX_comm_get_dst_data(smx_synchro_t synchro)
821 simgrid::simix::Comm *comm = static_cast<simgrid::simix::Comm*>(synchro);
823 return comm->dst_data;
/**
 * \brief Return the sending process of the communication
 * \param synchro The communication
 * \return the content of its src_proc field
 */
826 smx_process_t SIMIX_comm_get_src_proc(smx_synchro_t synchro)
828 simgrid::simix::Comm *comm = static_cast<simgrid::simix::Comm*>(synchro);
830 return comm->src_proc;
/**
 * \brief Return the receiving process of the communication
 * \param synchro The communication
 * \return the content of its dst_proc field
 */
833 smx_process_t SIMIX_comm_get_dst_proc(smx_synchro_t synchro)
835 simgrid::simix::Comm *comm = static_cast<simgrid::simix::Comm*>(synchro);
837 return comm->dst_proc;
840 /******************************************************************************/
841 /* SIMIX_comm_copy_data callbacks */
842 /******************************************************************************/
/* File-wide default used to copy the payload of a communication; it starts out as the pointer-copy callback. */
843 static void (*SIMIX_comm_copy_data_callback) (smx_synchro_t, void*, size_t) = &SIMIX_comm_copy_pointer_callback;
/**
 * \brief Replace the default data-copy callback
 * \param callback The function that becomes the new default; it is stored as is, without any check
 */
845 void SIMIX_comm_set_copy_data_callback(void (*callback) (smx_synchro_t, void*, size_t))
847 SIMIX_comm_copy_data_callback = callback;
/**
 * \brief Copy callback that transfers the pointer itself rather than the pointed data
 *
 * The receiver's buffer is treated as a `void*` slot in which \a buff is stored.
 *
 * \param synchro The communication, whose dst_buff receives the pointer
 * \param buff The pointer to hand over
 * \param buff_size Must be sizeof(void*); anything else trips the assertion
 */
850 void SIMIX_comm_copy_pointer_callback(smx_synchro_t synchro, void* buff, size_t buff_size)
852 simgrid::simix::Comm *comm = static_cast<simgrid::simix::Comm*>(synchro);
854 xbt_assert((buff_size == sizeof(void *)), "Cannot copy %zu bytes: must be sizeof(void*)", buff_size);
855 *(void **) (comm->dst_buff) = buff;
/**
 * \brief Copy callback that duplicates the payload bytes into the receiver's buffer
 *
 * \param synchro The communication, whose dst_buff is the destination of the copy
 * \param buff The source bytes
 * \param buff_size Number of bytes to copy
 *
 * For a detached communication the comm's src_buff pointer is reset to NULL afterwards.
 * NOTE(review): embedded line 865 is not shown in this listing; presumably the duplicated source buffer is released
 * there before the pointer is cleared -- confirm against the full file.
 */
858 void SIMIX_comm_copy_buffer_callback(smx_synchro_t synchro, void* buff, size_t buff_size)
860 simgrid::simix::Comm *comm = static_cast<simgrid::simix::Comm*>(synchro);
862 XBT_DEBUG("Copy the data over");
863 memcpy(comm->dst_buff, buff, buff_size);
864 if (comm->detached) { // if this is a detached send, the source buffer was duplicated by SMPI sender to make the original buffer available to the application ASAP
866 comm->src_buff = NULL;
872 * \brief Copy the communication data from the sender's buffer to the receiver's one
873 * \param synchro The communication
875 void SIMIX_comm_copy_data(smx_synchro_t synchro)
877 simgrid::simix::Comm *comm = static_cast<simgrid::simix::Comm*>(synchro);
879 size_t buff_size = comm->src_buff_size;
880 /* If there is no data to copy then return */
881 if (!comm->src_buff || !comm->dst_buff || comm->copied)
884 XBT_DEBUG("Copying comm %p data from %s (%p) -> %s (%p) (%zu bytes)",
886 comm->src_proc ? sg_host_get_name(comm->src_proc->host) : "a finished process",
888 comm->dst_proc ? sg_host_get_name(comm->dst_proc->host) : "a finished process",
889 comm->dst_buff, buff_size);
891 /* Copy at most dst_buff_size bytes of the message to receiver's buffer */
892 if (comm->dst_buff_size)
893 buff_size = MIN(buff_size, *(comm->dst_buff_size));
895 /* Update the receiver's buffer size to the copied amount */
896 if (comm->dst_buff_size)
897 *comm->dst_buff_size = buff_size;
900 if(comm->copy_data_fun)
901 comm->copy_data_fun (comm, comm->src_buff, buff_size);
903 SIMIX_comm_copy_data_callback (comm, comm->src_buff, buff_size);
907 /* Set the copied flag so we copy data only once */
908 /* (this function might be called from both communication ends) */