1 /* Copyright (c) 2009-2016. The SimGrid Team. All rights reserved. */
3 /* This program is free software; you can redistribute it and/or modify it
4 * under the terms of the license (GNU LGPL) which comes with this package. */
6 #include "src/surf/surf_interface.hpp"
7 #include "src/simix/smx_private.h"
10 #include "src/mc/mc_replay.h"
12 #include "simgrid/s4u/mailbox.hpp"
14 #include "src/simix/SynchroComm.hpp"
// Debug/log channel for this file: a "simix_network" subcategory of the "simix" category.
16 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(simix_network, simix, "SIMIX network-related synchronization");
// Global registry of mailboxes, keyed by name. SIMIX_mbox_free is handed to
// xbt_dict_new_homogeneous() as the function releasing stored mailboxes. The dictionary is
// built during dynamic static initialization and freed by SIMIX_mailbox_exit().
18 static void SIMIX_mbox_free(void *data);
19 static xbt_dict_t mailboxes = xbt_dict_new_homogeneous(SIMIX_mbox_free);
// File-local helpers, defined further down in this file.
21 static void SIMIX_waitany_remove_simcall_from_actions(smx_simcall_t simcall);
22 static void SIMIX_comm_copy_data(smx_synchro_t comm);
23 static inline void SIMIX_mbox_push(smx_mailbox_t mbox, smx_synchro_t comm);
24 static smx_synchro_t _find_matching_comm(std::deque<smx_synchro_t> *deque, e_smx_comm_type_t type,
25 int (*match_fun)(void *, void *,smx_synchro_t), void *user_data, smx_synchro_t my_synchro, bool remove_matching);
26 static void SIMIX_comm_start(smx_synchro_t synchro);
28 void SIMIX_mailbox_exit(void)
30 xbt_dict_free(&mailboxes);
33 /******************************************************************************/
34 /* Rendez-Vous Points */
35 /******************************************************************************/
/* Look up the mailbox registered under `name`, creating and registering a new one when needed.
 * The lookup comes first because two processes may issue the same create simcall.
 * NOTE(review): source lines 42-43 and 52-55 are missing from this listing. The guard that skips
 * creation when the lookup succeeds, and the final `return mbox;`, are presumably there -- verify
 * against the full file. */
37 smx_mailbox_t SIMIX_mbox_create(const char *name)
39 xbt_assert(name, "Mailboxes must have a name");
40 /* two processes may have pushed the same mbox_create simcall at the same time */
41 smx_mailbox_t mbox = (smx_mailbox_t) xbt_dict_get_or_null(mailboxes, name);
/* New mailbox: zero-initialized, owns a copy of `name`, starts with an empty pending queue.
 * The done-communication queue is only allocated by SIMIX_mbox_set_receiver(). */
44 mbox = xbt_new0(s_smx_mailbox_t, 1);
45 mbox->name = xbt_strdup(name);
46 mbox->comm_queue = new std::deque<smx_synchro_t>();
47 mbox->done_comm_queue = nullptr; // Allocated on need only
48 mbox->permanent_receiver=NULL;
50 XBT_DEBUG("Creating a mailbox at %p with name %s", mbox, name);
/* Register the new mailbox in the global dictionary under its name. */
51 xbt_dict_set(mailboxes, mbox->name, mbox, NULL);
/* Free function given to the `mailboxes` dictionary: releases a mailbox's communication queues.
 * Deleting a null done_comm_queue is a no-op, so mailboxes that never got a permanent receiver
 * are handled as well.
 * NOTE(review): source lines 60 and 63-66 are missing from this listing. The release of
 * mbox->name (xbt_strdup'ed in SIMIX_mbox_create) and of the mailbox struct itself is not
 * visible here -- verify. */
56 void SIMIX_mbox_free(void *data)
58 XBT_DEBUG("mbox free %p", data);
59 smx_mailbox_t mbox = (smx_mailbox_t) data;
61 delete mbox->comm_queue;
62 delete mbox->done_comm_queue;
67 smx_mailbox_t SIMIX_mbox_get_by_name(const char *name)
69 return (smx_mailbox_t) xbt_dict_get_or_null(mailboxes, name);
72 smx_synchro_t SIMIX_mbox_get_head(smx_mailbox_t mbox)
74 return mbox->comm_queue->empty()? nullptr:mbox->comm_queue->front();
78 * \brief get the receiver (process associated to the mailbox)
79 * \param mbox The rendez-vous point
80 * \return process The receiving process (NULL if not set)
82 smx_process_t SIMIX_mbox_get_receiver(smx_mailbox_t mbox)
84 return mbox->permanent_receiver;
88 * \brief set the receiver of the rendez vous point to allow eager sends
89 * \param mbox The rendez-vous point
90 * \param process The receiving process
92 void SIMIX_mbox_set_receiver(smx_mailbox_t mbox, smx_process_t process)
94 mbox->permanent_receiver=process;
95 if (mbox->done_comm_queue == nullptr)
96 mbox->done_comm_queue = new std::deque<smx_synchro_t>();
100 * \brief Pushes a communication synchro into a rendez-vous point
101 * \param mbox The mailbox
102 * \param comm The communication synchro
// Appends the synchro, viewed as a simgrid::simix::Comm, at the back of the mailbox's pending
// queue (which _find_matching_comm scans from the front).
// NOTE(review): the parameter is named `synchro`, not `comm` as the \param line says.
// Source lines 105, 107 and 109+ are not shown. Check whether the comm's `mbox` field (later
// read by SIMIX_comm_finish/SIMIX_comm_destroy before calling SIMIX_mbox_remove) is set there.
104 static inline void SIMIX_mbox_push(smx_mailbox_t mbox, smx_synchro_t synchro)
106 simgrid::simix::Comm *comm = static_cast<simgrid::simix::Comm*>(synchro);
108 mbox->comm_queue->push_back(comm);
113 * \brief Removes a communication synchro from a rendez-vous point
114 * \param mbox The rendez-vous point
115 * \param comm The communication synchro
// Intended behavior: linearly search mbox->comm_queue for the synchro, erase it, and abort with
// xbt_die when it is not queued.
// NOTE(review): source lines 120-121, 123 and 125-126 are missing from this listing. As shown,
// the loop erases unconditionally and then increments `it` after erase() has invalidated it.
// The full file presumably compares `*it` with `comm` and returns right after erase() -- verify.
117 void SIMIX_mbox_remove(smx_mailbox_t mbox, smx_synchro_t synchro)
119 simgrid::simix::Comm *comm = static_cast<simgrid::simix::Comm*>(synchro);
122 for (auto it = mbox->comm_queue->begin(); it != mbox->comm_queue->end(); it++)
124 mbox->comm_queue->erase(it);
127 xbt_die("Cannot remove this comm that is not part of the mailbox");
131 * \brief Checks if there is a communication synchro queued in a deque matching our needs
132 * \param type The type of communication we are looking for (comm_send, comm_recv)
133 * \return The communication synchro if found, NULL otherwise
// Scans `deque` from front to back. A candidate matches when its type equals `type` and BOTH
// filters accept the pairing:
//  - our match_fun sees (our data, their data, candidate);
//  - the candidate's own match_fun sees (their data, our data, my_synchro).
// A null filter accepts everything.
// NOTE(review): source lines 153-156, 158-161 and 167-169 are not shown. The handling of
// `remove_matching`, any refcount change on the match, and the return statements live there.
135 static smx_synchro_t _find_matching_comm(std::deque<smx_synchro_t> *deque, e_smx_comm_type_t type,
136 int (*match_fun)(void *, void *,smx_synchro_t), void *this_user_data, smx_synchro_t my_synchro, bool remove_matching)
138 void* other_user_data = NULL;
140 for(auto it = deque->begin(); it != deque->end(); it++){
141 smx_synchro_t synchro = *it;
142 simgrid::simix::Comm *comm = static_cast<simgrid::simix::Comm*>(synchro);
// The candidate's user data is the sender's data for a SEND and the receiver's data for a RECEIVE.
144 if (comm->type == SIMIX_COMM_SEND) {
145 other_user_data = comm->src_data;
146 } else if (comm->type == SIMIX_COMM_RECEIVE) {
147 other_user_data = comm->dst_data;
149 if (comm->type == type &&
150 (! match_fun || match_fun(this_user_data, other_user_data, synchro)) &&
151 (!comm->match_fun || comm->match_fun(other_user_data, this_user_data, my_synchro))) {
152 XBT_DEBUG("Found a matching communication synchro %p", comm);
// Keep a copy of the candidate's mailbox pointer in mbox_cpy (presumably model-checker support,
// with comm->mbox being cleared on a line not shown -- confirm).
157 comm->mbox_cpy = comm->mbox;
162 XBT_DEBUG("Sorry, communication synchro %p does not match our needs:"
163 " its type is %d but we are looking for a comm of type %d (or maybe the filtering didn't match)",
164 comm, (int)comm->type, (int)type);
166 XBT_DEBUG("No matching communication synchro found");
170 /******************************************************************************/
171 /* Communication synchros */
172 /******************************************************************************/
175 * \brief Destroy a communicate synchro
176 * \param synchro The communicate synchro to be destroyed
// Intended behavior: drop one reference on the communication; the real cleanup (surf actions,
// buffer of a failed detached send, mailbox removal) only runs once no reference is left.
// NOTE(review): source lines 187-190 are not shown: the decrement, and the early `return` guarded
// by the `refcount > 0` test. Source lines 202-209 are not shown either: the guard before
// SIMIX_mbox_remove and the final deallocation of the comm. Verify against the full file.
178 void SIMIX_comm_destroy(smx_synchro_t synchro)
180 simgrid::simix::Comm *comm = static_cast<simgrid::simix::Comm*>(synchro);
182 XBT_DEBUG("Destroy synchro %p (refcount: %d), state: %d", comm, comm->refcount, (int)comm->state);
// Destroying a comm whose refcount is already <= 0 is a caller bug (e.g. a double test/wait): abort.
184 if (comm->refcount <= 0) {
185 xbt_backtrace_display_current();
186 xbt_die("This comm has a negative refcount! You must not call test() or wait() more than once on a given communication.");
189 if (comm->refcount > 0)
191 XBT_DEBUG("Really free communication %p; refcount is now %d", comm, comm->refcount);
193 SIMIX_comm_destroy_internal_actions(synchro);
195 if (comm->detached && comm->state != SIMIX_DONE) {
196 /* the communication has failed and was detached:
197 * we have to free the buffer */
198 if (comm->clean_fun) {
199 comm->clean_fun(comm->src_buff);
201 comm->src_buff = NULL;
// SIMIX_mbox_remove() aborts if the comm is not queued in that mailbox.
205 SIMIX_mbox_remove(comm->mbox, comm);
210 void SIMIX_comm_destroy_internal_actions(smx_synchro_t synchro)
212 simgrid::simix::Comm *comm = static_cast<simgrid::simix::Comm*>(synchro);
213 if (comm->surf_comm){
214 comm->surf_comm->unref();
215 comm->surf_comm = NULL;
218 if (comm->src_timeout){
219 comm->src_timeout->unref();
220 comm->src_timeout = NULL;
223 if (comm->dst_timeout){
224 comm->dst_timeout->unref();
225 comm->dst_timeout = NULL;
// Blocking send handler: post the send through simcall_HANDLER_comm_isend() with no clean_fun,
// then handle the same simcall as a wait on the resulting comm with the given timeout.
// NOTE(review): source line 237 (the remaining isend arguments) is missing from this listing.
229 void simcall_HANDLER_comm_send(smx_simcall_t simcall, smx_process_t src, smx_mailbox_t mbox,
230 double task_size, double rate,
231 void *src_buff, size_t src_buff_size,
232 int (*match_fun)(void *, void *,smx_synchro_t),
233 void (*copy_data_fun)(smx_synchro_t, void*, size_t),
234 void *data, double timeout){
235 smx_synchro_t comm = simcall_HANDLER_comm_isend(simcall, src, mbox, task_size, rate,
236 src_buff, src_buff_size, match_fun, NULL, copy_data_fun,
// Reset the simcall's MC value: simcall_HANDLER_comm_wait reads it when model checking/replay is active.
238 SIMCALL_SET_MC_VALUE(simcall, 0);
239 simcall_HANDLER_comm_wait(simcall, comm, timeout);
// Handler of the asynchronous send simcall. It first tries to pair the send with a receive already
// waiting in `mbox`. Otherwise it enqueues the send, or, when the mailbox has a permanent receiver,
// completes it eagerly into the done queue. It then fills the sender-side fields and starts the
// transfer. Returns the comm, or NULL for a detached send.
// NOTE(review): many source lines are missing from this listing (e.g. 261-262, 266, 275-276,
// 278-279, 286-287, 292, 296, 298-299, 307, 310-311, 315-316). They include the `else` branches
// and the `if (detached)` test, so the control flow below is incomplete as shown.
241 smx_synchro_t simcall_HANDLER_comm_isend(smx_simcall_t simcall, smx_process_t src_proc, smx_mailbox_t mbox,
242 double task_size, double rate,
243 void *src_buff, size_t src_buff_size,
244 int (*match_fun)(void *, void *,smx_synchro_t),
245 void (*clean_fun)(void *), // used to free the synchro in case of problem after a detached send
246 void (*copy_data_fun)(smx_synchro_t, void*, size_t),// used to copy data if not default one
247 void *data, int detached)
249 XBT_DEBUG("send from %p", mbox);
251 /* Prepare a synchro describing us, so that it gets passed to the user-provided filter of other side */
252 smx_synchro_t this_synchro = new simgrid::simix::Comm(SIMIX_COMM_SEND);
254 /* Look for communication synchro matching our needs. We also provide a description of
255 * ourself so that the other side also gets a chance of choosing if it wants to match with us.
257 * If it is not found then push our communication into the rendez-vous point */
258 smx_synchro_t other_synchro =
259 _find_matching_comm(mbox->comm_queue, SIMIX_COMM_RECEIVE, match_fun, data, this_synchro, /*remove_matching*/true);
260 simgrid::simix::Comm *other_comm = static_cast<simgrid::simix::Comm*>(other_synchro);
263 if (!other_synchro) {
264 other_synchro = this_synchro;
265 other_comm = static_cast<simgrid::simix::Comm*>(other_synchro);
267 if (mbox->permanent_receiver!=NULL){
268 //this mailbox is for small messages, which have to be sent right now
// Eager path: the comm is READY with the permanent receiver as destination, gets an extra reference,
// and is parked in the done queue (allocated by SIMIX_mbox_set_receiver) for a later receive.
269 other_synchro->state = SIMIX_READY;
270 other_comm->dst_proc=mbox->permanent_receiver;
271 other_comm->refcount++;
272 mbox->done_comm_queue->push_back(other_synchro);
273 other_comm->mbox=mbox;
// NOTE(review): this logs &(other_comm), the address of the local pointer variable, not the comm
// itself; `other_comm` was probably intended.
274 XBT_DEBUG("pushing a message into the permanent receive fifo %p, comm %p", mbox, &(other_comm));
// Regular path (no permanent receiver): wait in the mailbox for a matching receive.
277 SIMIX_mbox_push(mbox, this_synchro);
// A matching receive was found: drop our temporary description and mark the pair ready.
280 XBT_DEBUG("Receive already pushed");
282 SIMIX_comm_destroy(this_synchro);
284 other_comm->state = SIMIX_READY;
285 other_comm->type = SIMIX_COMM_READY;
// Record the comm in the sender's list of ongoing communications (SIMIX_comm_finish removes it).
288 xbt_fifo_push(src_proc->comms, other_synchro);
290 /* if the communication synchro is detached then decrease the refcount
291 * by one, so it will be eliminated by the receiver's destroy call */
293 other_comm->detached = 1;
294 other_comm->refcount--;
295 other_comm->clean_fun = clean_fun;
297 other_comm->clean_fun = NULL;
300 /* Setup the communication synchro */
301 other_comm->src_proc = src_proc;
302 other_comm->task_size = task_size;
303 other_comm->rate = rate;
304 other_comm->src_buff = src_buff;
305 other_comm->src_buff_size = src_buff_size;
306 other_comm->src_data = data;
308 other_comm->match_fun = match_fun;
309 other_comm->copy_data_fun = copy_data_fun;
// Under the model checker or record/replay no surf action is created; the comm is only marked RUNNING.
312 if (MC_is_active() || MC_record_replay_is_active()) {
313 other_comm->state = SIMIX_RUNNING;
314 return (detached ? NULL : other_comm);
// SIMIX_comm_start only creates the network action if the comm is READY.
317 SIMIX_comm_start(other_comm);
318 return (detached ? NULL : other_comm);
321 void simcall_HANDLER_comm_recv(smx_simcall_t simcall, smx_process_t receiver, smx_mailbox_t mbox,
322 void *dst_buff, size_t *dst_buff_size,
323 int (*match_fun)(void *, void *, smx_synchro_t),
324 void (*copy_data_fun)(smx_synchro_t, void*, size_t),
325 void *data, double timeout, double rate)
327 smx_synchro_t comm = SIMIX_comm_irecv(receiver, mbox, dst_buff, dst_buff_size, match_fun, copy_data_fun, data, rate);
328 SIMCALL_SET_MC_VALUE(simcall, 0);
329 simcall_HANDLER_comm_wait(simcall, comm, timeout);
332 smx_synchro_t simcall_HANDLER_comm_irecv(smx_simcall_t simcall, smx_process_t receiver, smx_mailbox_t mbox,
333 void *dst_buff, size_t *dst_buff_size,
334 int (*match_fun)(void *, void *, smx_synchro_t),
335 void (*copy_data_fun)(smx_synchro_t, void*, size_t),
336 void *data, double rate)
338 return SIMIX_comm_irecv(receiver, mbox, dst_buff, dst_buff_size, match_fun, copy_data_fun, data, rate);
// Posts a receive on `mbox` for dst_proc and returns the comm it ends up using. Visible paths:
//  (1) a mailbox with a permanent receiver and a non-empty done queue: search that queue first
//      for an already-sent matching comm;
//  (2) otherwise: search the regular queue for a waiting send;
//  (3) no match in either case: enqueue our own RECEIVE comm.
// The chosen comm then gets the receiver-side fields and is started unless MC/replay is active.
// NOTE(review): source lines 352, 361, 363, 369, 372-373, 375, 378, 381, 386, 389, 392, 394-395,
// 402, 405, 408, 412-413 and 416-417 are missing (the `else` branches and closing braces). Check
// the grouping of the statements below against the full file.
341 smx_synchro_t SIMIX_comm_irecv(smx_process_t dst_proc, smx_mailbox_t mbox, void *dst_buff, size_t *dst_buff_size,
342 int (*match_fun)(void *, void *, smx_synchro_t),
343 void (*copy_data_fun)(smx_synchro_t, void*, size_t), // used to copy data if not default one
344 void *data, double rate)
346 XBT_DEBUG("recv from %p %p", mbox, mbox->comm_queue);
347 smx_synchro_t this_synchro = new simgrid::simix::Comm(SIMIX_COMM_RECEIVE);
349 smx_synchro_t other_synchro;
350 //communication already done, get it inside the fifo of completed comms
351 if (mbox->permanent_receiver && ! mbox->done_comm_queue->empty()) {
353 XBT_DEBUG("We have a comm that has probably already been received, trying to match it, to skip the communication");
354 //find a match in the already received fifo
355 other_synchro = _find_matching_comm(mbox->done_comm_queue, SIMIX_COMM_SEND, match_fun, data, this_synchro,/*remove_matching*/true);
356 //if not found, assume the receiver came first, register it to the mailbox in the classical way
357 if (!other_synchro) {
358 XBT_DEBUG("We have messages in the permanent receive list, but not the one we are looking for, pushing request into fifo");
359 other_synchro = this_synchro;
360 SIMIX_mbox_push(mbox, this_synchro);
362 simgrid::simix::Comm *other_comm = static_cast<simgrid::simix::Comm*>(other_synchro);
// The eager send already transferred everything (remaining amount 0): mark it DONE and detach
// it from the mailbox.
364 if(other_comm->surf_comm && SIMIX_comm_get_remains(other_comm)==0.0) {
365 XBT_DEBUG("comm %p has been already sent, and is finished, destroy it",other_comm);
366 other_comm->state = SIMIX_DONE;
367 other_comm->type = SIMIX_COMM_DONE;
368 other_comm->mbox = NULL;
// Release one reference on the matched comm, and our temporary RECEIVE description.
370 other_comm->refcount--;
371 SIMIX_comm_destroy(this_synchro);
374 /* Prepare a synchro describing us, so that it gets passed to the user-provided filter of other side */
376 /* Look for communication synchro matching our needs. We also provide a description of
377 * ourself so that the other side also gets a chance of choosing if it wants to match with us.
379 * If it is not found then push our communication into the rendez-vous point */
380 other_synchro = _find_matching_comm(mbox->comm_queue, SIMIX_COMM_SEND, match_fun, data, this_synchro,/*remove_matching*/true);
382 if (!other_synchro) {
383 XBT_DEBUG("Receive pushed first %zu", mbox->comm_queue->size());
384 other_synchro = this_synchro;
385 SIMIX_mbox_push(mbox, this_synchro);
// A waiting send matched: drop our temporary description and mark the pair ready.
387 SIMIX_comm_destroy(this_synchro);
388 simgrid::simix::Comm *other_comm = static_cast<simgrid::simix::Comm*>(other_synchro);
390 other_comm->state = SIMIX_READY;
391 other_comm->type = SIMIX_COMM_READY;
// Record the comm in the receiver's list of ongoing communications (SIMIX_comm_finish removes it).
393 xbt_fifo_push(dst_proc->comms, other_synchro);
396 /* Setup communication synchro */
397 simgrid::simix::Comm *other_comm = static_cast<simgrid::simix::Comm*>(other_synchro);
398 other_comm->dst_proc = dst_proc;
399 other_comm->dst_buff = dst_buff;
400 other_comm->dst_buff_size = dst_buff_size;
401 other_comm->dst_data = data;
// A rate of -1.0 is treated as "unset": otherwise keep the smaller of the two sides' rates.
403 if (rate != -1.0 && (other_comm->rate == -1.0 || rate < other_comm->rate))
404 other_comm->rate = rate;
// NOTE(review): this overwrites any match_fun/copy_data_fun the sender stored at isend time.
406 other_comm->match_fun = match_fun;
407 other_comm->copy_data_fun = copy_data_fun;
// Under MC/replay no surf action is created; the comm is only marked RUNNING.
409 if (MC_is_active() || MC_record_replay_is_active()) {
410 other_synchro->state = SIMIX_RUNNING;
411 return other_synchro;
414 SIMIX_comm_start(other_synchro);
415 return other_synchro;
// Simcall handler for iprobe: forwards to SIMIX_comm_iprobe on behalf of the issuing process.
// NOTE(review): the last parameter line (source line 421, presumably declaring `data`) is missing
// from this listing.
418 smx_synchro_t simcall_HANDLER_comm_iprobe(smx_simcall_t simcall, smx_mailbox_t mbox,
419 int type, int src, int tag,
420 int (*match_fun)(void *, void *, smx_synchro_t),
422 return SIMIX_comm_iprobe(simcall->issuer, mbox, type, src, tag, match_fun, data);
// Non-destructive probe. It searches with remove_matching=false, first in the done queue of a mailbox
// with a permanent receiver, then in the regular queue, for a comm accepted by match_fun/data.
// It returns the matching synchro. `src` and `tag` are not used by any of the visible lines.
// NOTE(review): source lines 427, 430-431, 434, 437, 441, 443-444, 447-449 and 452-453 are missing.
// They hold the test on `type` that selects the branch, the declaration of smx_type, the
// assignment of the done-queue search result (as shown, L442's result is discarded), and the null
// checks around the refcount adjustment.
425 smx_synchro_t SIMIX_comm_iprobe(smx_process_t dst_proc, smx_mailbox_t mbox, int type, int src,
426 int tag, int (*match_fun)(void *, void *, smx_synchro_t), void *data)
428 XBT_DEBUG("iprobe from %p %p", mbox, mbox->comm_queue);
429 smx_synchro_t this_synchro;
// Temporary description of ourselves: we look for comms of the opposite type.
432 this_synchro = new simgrid::simix::Comm(SIMIX_COMM_SEND);
433 smx_type = SIMIX_COMM_RECEIVE;
435 this_synchro = new simgrid::simix::Comm(SIMIX_COMM_RECEIVE);
436 smx_type = SIMIX_COMM_SEND;
438 smx_synchro_t other_synchro=NULL;
439 if(mbox->permanent_receiver && ! mbox->done_comm_queue->empty()){
440 XBT_DEBUG("first check in the permanent recv mailbox, to see if we already got something");
442 _find_matching_comm(mbox->done_comm_queue, (e_smx_comm_type_t) smx_type, match_fun, data, this_synchro,/*remove_matching*/false);
445 XBT_DEBUG("check if we have more luck in the normal mailbox");
446 other_synchro = _find_matching_comm(mbox->comm_queue, (e_smx_comm_type_t) smx_type, match_fun, data, this_synchro,/*remove_matching*/false);
// Presumably undoes a reference taken by _find_matching_comm on the match (on lines not shown);
// a probe must not keep the comm alive.
450 simgrid::simix::Comm *other_comm = static_cast<simgrid::simix::Comm*>(other_synchro);
451 other_comm->refcount--;
// The temporary description is no longer needed.
454 SIMIX_comm_destroy(this_synchro);
455 return other_synchro;
// Handler of the wait simcall, also used by the blocking send/recv handlers. It attaches the simcall
// to the synchro, then either finishes the synchro right away or arms a surf sleep action that acts
// as the timeout of the issuer's side. Under MC/replay, the simcall's MC value decides the outcome.
// NOTE(review): source lines 461-462, 465, 468, 471, 473, 476-478, 482, 484-485, 487-489, 497 and
// 503-505 are missing: the declaration of `sleep`, the idx test, the `else` branches and the returns.
458 void simcall_HANDLER_comm_wait(smx_simcall_t simcall, smx_synchro_t synchro, double timeout)
460 /* the simcall may be a wait, a send or a recv */
463 /* Associate this simcall to the wait synchro */
464 XBT_DEBUG("simcall_HANDLER_comm_wait, %p", synchro);
466 xbt_fifo_push(synchro->simcalls, simcall);
467 simcall->issuer->waiting_synchro = synchro;
469 if (MC_is_active() || MC_record_replay_is_active()) {
470 int idx = SIMCALL_GET_MC_VALUE(simcall);
472 synchro->state = SIMIX_DONE;
474 /* If we reached this point, the wait simcall must have a timeout */
475 /* Otherwise it shouldn't be enabled and executed by the MC */
// Timeout outcome: it is attributed to the side the issuer is on.
479 simgrid::simix::Comm *comm = static_cast<simgrid::simix::Comm*>(synchro);
480 if (comm->src_proc == simcall->issuer)
481 comm->state = SIMIX_SRC_TIMEOUT;
483 comm->state = SIMIX_DST_TIMEOUT;
486 SIMIX_comm_finish(synchro);
490 /* If the synchro has already finished, perform the error handling, */
491 /* otherwise set up a waiting timeout on the right side */
492 if (synchro->state != SIMIX_WAITING && synchro->state != SIMIX_RUNNING) {
493 SIMIX_comm_finish(synchro);
494 } else { /* if (timeout >= 0) { we need a surf sleep action even when there is no timeout, otherwise surf won't tell us when the host fails */
495 sleep = surf_host_sleep(simcall->issuer->host, timeout);
496 sleep->setData(synchro);
// Store the sleep action as the timeout of the issuer's side of the comm.
498 simgrid::simix::Comm *comm = static_cast<simgrid::simix::Comm*>(synchro);
499 if (simcall->issuer == comm->src_proc)
500 comm->src_timeout = sleep;
502 comm->dst_timeout = sleep;
// Handler of the test simcall. The result is true when the comm is no longer WAITING/RUNNING; under
// MC/replay it is true when both endpoints are known. On a positive test the simcall is attached and
// answered through SIMIX_comm_finish; otherwise it is presumably answered directly.
// NOTE(review): source lines 516, 518-521 and 526 are missing (presumably the `else` branches and
// the early return of the MC branch). As shown, the MC branch falls through into the regular one.
506 void simcall_HANDLER_comm_test(smx_simcall_t simcall, smx_synchro_t synchro)
508 simgrid::simix::Comm *comm = static_cast<simgrid::simix::Comm*>(synchro);
510 if (MC_is_active() || MC_record_replay_is_active()){
511 simcall_comm_test__set__result(simcall, comm->src_proc && comm->dst_proc);
512 if (simcall_comm_test__get__result(simcall)){
513 synchro->state = SIMIX_DONE;
514 xbt_fifo_push(synchro->simcalls, simcall);
515 SIMIX_comm_finish(synchro);
517 SIMIX_simcall_answer(simcall);
522 simcall_comm_test__set__result(simcall, (synchro->state != SIMIX_WAITING && synchro->state != SIMIX_RUNNING));
523 if (simcall_comm_test__get__result(simcall)) {
524 xbt_fifo_push(synchro->simcalls, simcall);
525 SIMIX_comm_finish(synchro);
527 SIMIX_simcall_answer(simcall);
// Handler of testany. The result starts at -1 and becomes the index of a comm that is no longer
// WAITING/RUNNING; that comm is finished with this simcall attached. Under MC/replay, the
// simcall's MC value selects the index.
// NOTE(review): source lines 532-533, 536, 539, 541, 547-550 and 556-558 are missing: the
// declaration of `cursor`, the test guarding the early answer at L540, the returns, and the
// loop exit. The loop iterates simcall_comm_testany__get__comms(simcall) rather than the
// `synchros` parameter -- presumably the same dynar; confirm.
531 void simcall_HANDLER_comm_testany(smx_simcall_t simcall, xbt_dynar_t synchros)
534 smx_synchro_t synchro;
535 simcall_comm_testany__set__result(simcall, -1);
537 if (MC_is_active() || MC_record_replay_is_active()){
538 int idx = SIMCALL_GET_MC_VALUE(simcall);
540 SIMIX_simcall_answer(simcall);
542 synchro = xbt_dynar_get_as(synchros, idx, smx_synchro_t);
543 simcall_comm_testany__set__result(simcall, idx);
544 xbt_fifo_push(synchro->simcalls, simcall);
545 synchro->state = SIMIX_DONE;
546 SIMIX_comm_finish(synchro);
551 xbt_dynar_foreach(simcall_comm_testany__get__comms(simcall), cursor,synchro) {
552 if (synchro->state != SIMIX_WAITING && synchro->state != SIMIX_RUNNING) {
553 simcall_comm_testany__set__result(simcall, cursor);
554 xbt_fifo_push(synchro->simcalls, simcall);
555 SIMIX_comm_finish(synchro);
559 SIMIX_simcall_answer(simcall);
// Handler of waitany: attaches the simcall to every comm in `synchros` and finishes a comm that is
// already over. SIMIX_comm_finish then detaches the simcall from the other comms. Under MC/replay,
// the simcall's MC value chooses which comm completes.
// NOTE(review): source lines 566, 574-576, 580 and 584-588 are missing (the early return of the MC
// branch, and presumably a `return` after SIMIX_comm_finish in the loop).
562 void simcall_HANDLER_comm_waitany(smx_simcall_t simcall, xbt_dynar_t synchros)
564 smx_synchro_t synchro;
565 unsigned int cursor = 0;
567 if (MC_is_active() || MC_record_replay_is_active()){
568 int idx = SIMCALL_GET_MC_VALUE(simcall);
569 synchro = xbt_dynar_get_as(synchros, idx, smx_synchro_t);
570 xbt_fifo_push(synchro->simcalls, simcall);
571 simcall_comm_waitany__set__result(simcall, idx);
572 synchro->state = SIMIX_DONE;
573 SIMIX_comm_finish(synchro);
577 xbt_dynar_foreach(synchros, cursor, synchro){
578 /* associate this simcall to the synchro */
579 xbt_fifo_push(synchro->simcalls, simcall);
581 /* see if the synchro is already finished */
582 if (synchro->state != SIMIX_WAITING && synchro->state != SIMIX_RUNNING){
583 SIMIX_comm_finish(synchro);
589 void SIMIX_waitany_remove_simcall_from_actions(smx_simcall_t simcall)
591 smx_synchro_t synchro;
592 unsigned int cursor = 0;
593 xbt_dynar_t synchros = simcall_comm_waitany__get__comms(simcall);
595 xbt_dynar_foreach(synchros, cursor, synchro)
596 xbt_fifo_remove(synchro->simcalls, simcall);
600 * \brief Starts the simulation of a communication synchro.
601 * \param synchro the communication synchro
// Only a READY comm (both endpoints known) gets a surf network action here; other states are left alone.
603 static inline void SIMIX_comm_start(smx_synchro_t synchro)
605 simgrid::simix::Comm *comm = static_cast<simgrid::simix::Comm*>(synchro);
607 /* If both the sender and the receiver are already there, start the communication */
608 if (synchro->state == SIMIX_READY) {
610 sg_host_t sender = comm->src_proc->host;
611 sg_host_t receiver = comm->dst_proc->host;
613 XBT_DEBUG("Starting communication %p from '%s' to '%s'", synchro, sg_host_get_name(sender), sg_host_get_name(receiver));
// Create the network transfer with the comm's size and rate, and attach the comm as the action's data.
615 comm->surf_comm = surf_network_model_communicate(surf_network_model, sender, receiver, comm->task_size, comm->rate);
616 comm->surf_comm->setData(synchro);
617 comm->state = SIMIX_RUNNING;
619 /* If a link is failed, detect it immediately */
620 if (comm->surf_comm->getState() == simgrid::surf::Action::State::failed) {
621 XBT_DEBUG("Communication from '%s' to '%s' failed to start because of a link failure",
622 sg_host_get_name(sender), sg_host_get_name(receiver));
623 comm->state = SIMIX_LINK_FAILURE;
// This releases the surf actions and nulls comm->surf_comm.
624 SIMIX_comm_destroy_internal_actions(synchro);
627 /* If either process is suspended, create the synchro but stop its execution;
628 it will be restarted when the sender process resumes */
629 if (SIMIX_process_is_suspended(comm->src_proc) ||
630 SIMIX_process_is_suspended(comm->dst_proc)) {
631 /* FIXME: check what should happen with the synchro state */
633 if (SIMIX_process_is_suspended(comm->src_proc))
634 XBT_DEBUG("The communication is suspended on startup because src (%s:%s) were suspended since it initiated the communication",
635 sg_host_get_name(comm->src_proc->host), comm->src_proc->name);
637 XBT_DEBUG("The communication is suspended on startup because dst (%s:%s) were suspended since it initiated the communication",
638 sg_host_get_name(comm->dst_proc->host), comm->dst_proc->name);
// NOTE(review): after a link failure, surf_comm was nulled by SIMIX_comm_destroy_internal_actions.
// If an endpoint is also suspended, this call dereferences a null pointer unless the missing lines
// (625-626, 632, 636, 639) guard it -- verify. Line 636 presumably holds the `else` of the
// src/dst debug messages.
640 comm->surf_comm->suspend();
646 * \brief Answers the SIMIX simcalls associated to a communication synchro.
647 * \param synchro a finished communication synchro
// For each simcall attached to the synchro:
//  - detach waitany simcalls from their other comms;
//  - take the comm out of its mailbox;
//  - copy the data, or raise the exception matching the final state;
//  - drop the comm from the processes' comm lists;
//  - answer the simcall.
// destroy_count then drives the SIMIX_comm_destroy calls at the end.
// NOTE(review): many source lines are missing here. They include the guard before SIMIX_mbox_remove,
// the `case SIMIX_DONE:` / `case SIMIX_CANCELED:` / `default:` labels, the `break`s, several `else`s
// and the increment of destroy_count. The switch below cannot be read as-is.
649 void SIMIX_comm_finish(smx_synchro_t synchro)
651 simgrid::simix::Comm *comm = static_cast<simgrid::simix::Comm*>(synchro);
652 unsigned int destroy_count = 0;
653 smx_simcall_t simcall;
655 while ((simcall = (smx_simcall_t) xbt_fifo_shift(synchro->simcalls))) {
657 /* If a waitany simcall is waiting for this synchro to finish, then remove
658 it from the other synchros in the waitany list. Afterwards, get the
659 position of the actual synchro in the waitany dynar and
660 return it as the result of the simcall */
662 if (simcall->call == SIMCALL_NONE) //FIXME: maybe a better way to handle this case
663 continue; // if process handling comm is killed
664 if (simcall->call == SIMCALL_COMM_WAITANY) {
665 SIMIX_waitany_remove_simcall_from_actions(simcall);
666 if (!MC_is_active() && !MC_record_replay_is_active())
667 simcall_comm_waitany__set__result(simcall, xbt_dynar_search(simcall_comm_waitany__get__comms(simcall), &synchro));
670 /* If the synchro is still in a rendez-vous point then remove from it */
672 SIMIX_mbox_remove(comm->mbox, synchro);
674 XBT_DEBUG("SIMIX_comm_finish: synchro state = %d", (int)synchro->state);
676 /* Check out for errors */
// An issuer whose host is off is marked to die and receives a host_error.
678 if (simcall->issuer->host->isOff()) {
679 simcall->issuer->context->iwannadie = 1;
680 SMX_EXCEPTION(simcall->issuer, host_error, 0, "Host failed");
683 switch (synchro->state) {
686 XBT_DEBUG("Communication %p complete!", synchro);
687 SIMIX_comm_copy_data(synchro);
690 case SIMIX_SRC_TIMEOUT:
691 SMX_EXCEPTION(simcall->issuer, timeout_error, 0, "Communication timeouted because of sender");
694 case SIMIX_DST_TIMEOUT:
695 SMX_EXCEPTION(simcall->issuer, timeout_error, 0, "Communication timeouted because of receiver");
// Host failures: a process on the failed side is marked to die; the network_error below is
// presumably reserved for the peer (the `else` is on a missing line).
698 case SIMIX_SRC_HOST_FAILURE:
699 if (simcall->issuer == comm->src_proc)
700 simcall->issuer->context->iwannadie = 1;
701 // SMX_EXCEPTION(simcall->issuer, host_error, 0, "Host failed");
703 SMX_EXCEPTION(simcall->issuer, network_error, 0, "Remote peer failed");
706 case SIMIX_DST_HOST_FAILURE:
707 if (simcall->issuer == comm->dst_proc)
708 simcall->issuer->context->iwannadie = 1;
709 // SMX_EXCEPTION(simcall->issuer, host_error, 0, "Host failed");
711 SMX_EXCEPTION(simcall->issuer, network_error, 0, "Remote peer failed");
714 case SIMIX_LINK_FAILURE:
// NOTE(review): the argument for the first %p (source line 717) is missing from this listing.
// When src_proc/dst_proc is null, NULL is passed for a %s conversion.
716 XBT_DEBUG("Link failure in synchro %p between '%s' and '%s': posting an exception to the issuer: %s (%p) detached:%d",
718 comm->src_proc ? sg_host_get_name(comm->src_proc->host) : NULL,
719 comm->dst_proc ? sg_host_get_name(comm->dst_proc->host) : NULL,
720 simcall->issuer->name, simcall->issuer, comm->detached);
721 if (comm->src_proc == simcall->issuer) {
722 XBT_DEBUG("I'm source");
723 } else if (comm->dst_proc == simcall->issuer) {
724 XBT_DEBUG("I'm dest");
726 XBT_DEBUG("I'm neither source nor dest");
728 SMX_EXCEPTION(simcall->issuer, network_error, 0, "Link failure");
// Cancellation: the message names the side that cancelled, from the issuer's point of view.
732 if (simcall->issuer == comm->dst_proc)
733 SMX_EXCEPTION(simcall->issuer, cancel_error, 0, "Communication canceled by the sender");
735 SMX_EXCEPTION(simcall->issuer, cancel_error, 0, "Communication canceled by the receiver");
739 xbt_die("Unexpected synchro state in SIMIX_comm_finish: %d", (int)synchro->state);
742 /* if there is an exception during a waitany or a testany, indicate the position of the failed communication */
743 if (simcall->issuer->doexception) {
744 if (simcall->call == SIMCALL_COMM_WAITANY) {
745 simcall->issuer->running_ctx->exception.value = xbt_dynar_search(simcall_comm_waitany__get__comms(simcall), &synchro);
747 else if (simcall->call == SIMCALL_COMM_TESTANY) {
748 simcall->issuer->running_ctx->exception.value = xbt_dynar_search(simcall_comm_testany__get__comms(simcall), &synchro);
752 if (simcall->issuer->host->isOff()) {
753 simcall->issuer->context->iwannadie = 1;
// The issuer no longer waits on this comm; drop it from the issuer's and the peer's comm lists.
756 simcall->issuer->waiting_synchro = NULL;
757 xbt_fifo_remove(simcall->issuer->comms, synchro);
759 if(simcall->issuer == comm->src_proc){
761 xbt_fifo_remove(comm->dst_proc->comms, synchro);
763 if(simcall->issuer == comm->dst_proc){
765 xbt_fifo_remove(comm->src_proc->comms, synchro);
768 SIMIX_simcall_answer(simcall);
// One SIMIX_comm_destroy per counted simcall (the increment of destroy_count is on a line not shown).
772 while (destroy_count-- > 0)
773 SIMIX_comm_destroy(synchro);
777 * \brief This function is called when a Surf communication synchro is finished.
778 * \param synchro the corresponding Simix communication
// Derives the final state from the surf actions, in this priority order:
//  1. a completed timeout sleep (source side first): SRC_TIMEOUT / DST_TIMEOUT;
//  2. a failed timeout sleep (source side first): SRC_HOST_FAILURE / DST_HOST_FAILURE;
//  3. a failed network action: LINK_FAILURE;
//  4. otherwise: DONE.
// Then releases the surf actions and answers any attached simcalls.
// NOTE(review): source line 800 (presumably the `} else` before the DONE assignment) is missing.
// As shown, the state would always end up SIMIX_DONE.
780 void SIMIX_post_comm(smx_synchro_t synchro)
782 simgrid::simix::Comm *comm = static_cast<simgrid::simix::Comm*>(synchro);
784 /* Update synchro state */
785 if (comm->src_timeout &&
786 comm->src_timeout->getState() == simgrid::surf::Action::State::done)
787 synchro->state = SIMIX_SRC_TIMEOUT;
788 else if (comm->dst_timeout &&
789 comm->dst_timeout->getState() == simgrid::surf::Action::State::done)
790 synchro->state = SIMIX_DST_TIMEOUT;
791 else if (comm->src_timeout &&
792 comm->src_timeout->getState() == simgrid::surf::Action::State::failed)
793 synchro->state = SIMIX_SRC_HOST_FAILURE;
794 else if (comm->dst_timeout &&
795 comm->dst_timeout->getState() == simgrid::surf::Action::State::failed)
796 synchro->state = SIMIX_DST_HOST_FAILURE;
797 else if (comm->surf_comm &&
798 comm->surf_comm->getState() == simgrid::surf::Action::State::failed) {
799 synchro->state = SIMIX_LINK_FAILURE;
801 synchro->state = SIMIX_DONE;
803 XBT_DEBUG("SIMIX_post_comm: comm %p, state %d, src_proc %p, dst_proc %p, detached: %d",
804 comm, (int)comm->state, comm->src_proc, comm->dst_proc, comm->detached);
806 /* destroy the surf actions associated with the Simix communication */
807 SIMIX_comm_destroy_internal_actions(comm);
809 /* if there are simcalls associated with the synchro, then answer them */
810 if (xbt_fifo_size(synchro->simcalls)) {
811 SIMIX_comm_finish(comm);
815 /************* synchro Getters **************/
818 * \brief get the amount remaining from the communication
819 * \param synchro The communication
// Reports the surf action's remaining amount in one state (presumably SIMIX_RUNNING) and 0 in
// the other visible cases, both flagged FIXME in the original.
// NOTE(review): most of this function (source lines 822-824, 826-827, 829-830, 832-835, 837-839
// and 841-844) is missing from this listing: the declaration of `remains`, the case labels, the
// breaks and the return. Verify against the full file.
821 double SIMIX_comm_get_remains(smx_synchro_t synchro)
825 simgrid::simix::Comm *comm = static_cast<simgrid::simix::Comm*>(synchro);
828 switch (synchro->state) {
831 remains = comm->surf_comm->getRemains();
836 remains = 0; /*FIXME: check what should be returned */
840 remains = 0; /*FIXME: is this correct? */
847 * \brief Return the user data associated to the sender of the communication
848 * \param synchro The communication
849 * \return the user data
851 void* SIMIX_comm_get_src_data(smx_synchro_t synchro)
853 simgrid::simix::Comm *comm = static_cast<simgrid::simix::Comm*>(synchro);
855 return comm->src_data;
859 * \brief Return the user data associated to the receiver of the communication
860 * \param synchro The communication
861 * \return the user data
863 void* SIMIX_comm_get_dst_data(smx_synchro_t synchro)
865 simgrid::simix::Comm *comm = static_cast<simgrid::simix::Comm*>(synchro);
867 return comm->dst_data;
870 smx_process_t SIMIX_comm_get_src_proc(smx_synchro_t synchro)
872 simgrid::simix::Comm *comm = static_cast<simgrid::simix::Comm*>(synchro);
874 return comm->src_proc;
877 smx_process_t SIMIX_comm_get_dst_proc(smx_synchro_t synchro)
879 simgrid::simix::Comm *comm = static_cast<simgrid::simix::Comm*>(synchro);
881 return comm->dst_proc;
884 /******************************************************************************/
885 /* SIMIX_comm_copy_data callbacks */
886 /******************************************************************************/
887 static void (*SIMIX_comm_copy_data_callback) (smx_synchro_t, void*, size_t) = &SIMIX_comm_copy_pointer_callback;
889 void SIMIX_comm_set_copy_data_callback(void (*callback) (smx_synchro_t, void*, size_t))
891 SIMIX_comm_copy_data_callback = callback;
894 void SIMIX_comm_copy_pointer_callback(smx_synchro_t synchro, void* buff, size_t buff_size)
896 simgrid::simix::Comm *comm = static_cast<simgrid::simix::Comm*>(synchro);
898 xbt_assert((buff_size == sizeof(void *)), "Cannot copy %zu bytes: must be sizeof(void*)", buff_size);
899 *(void **) (comm->dst_buff) = buff;
// Copy callback that duplicates the payload bytes into the receiver's buffer with memcpy.
// buff_size has already been clamped by SIMIX_comm_copy_data. For detached sends, the comm's
// src_buff is cleared afterwards.
// NOTE(review): source line 909 is missing from this listing. Given the comment on line 908,
// it presumably frees the sender-side duplicate (`buff`) -- verify.
902 void SIMIX_comm_copy_buffer_callback(smx_synchro_t synchro, void* buff, size_t buff_size)
904 simgrid::simix::Comm *comm = static_cast<simgrid::simix::Comm*>(synchro);
906 XBT_DEBUG("Copy the data over");
907 memcpy(comm->dst_buff, buff, buff_size);
908 if (comm->detached) { // if this is a detached send, the source buffer was duplicated by SMPI sender to make the original buffer available to the application ASAP
910 comm->src_buff = NULL;
916 * \brief Copy the communication data from the sender's buffer to the receiver's one
917 * \param comm The communication
919 void SIMIX_comm_copy_data(smx_synchro_t synchro)
921 simgrid::simix::Comm *comm = static_cast<simgrid::simix::Comm*>(synchro);
923 size_t buff_size = comm->src_buff_size;
924 /* If there is no data to copy then return */
925 if (!comm->src_buff || !comm->dst_buff || comm->copied)
928 XBT_DEBUG("Copying comm %p data from %s (%p) -> %s (%p) (%zu bytes)",
930 comm->src_proc ? sg_host_get_name(comm->src_proc->host) : "a finished process",
932 comm->dst_proc ? sg_host_get_name(comm->dst_proc->host) : "a finished process",
933 comm->dst_buff, buff_size);
935 /* Copy at most dst_buff_size bytes of the message to receiver's buffer */
936 if (comm->dst_buff_size)
937 buff_size = MIN(buff_size, *(comm->dst_buff_size));
939 /* Update the receiver's buffer size to the copied amount */
940 if (comm->dst_buff_size)
941 *comm->dst_buff_size = buff_size;
944 if(comm->copy_data_fun)
945 comm->copy_data_fun (comm, comm->src_buff, buff_size);
947 SIMIX_comm_copy_data_callback (comm, comm->src_buff, buff_size);
951 /* Set the copied flag so we copy data only once */
952 /* (this function might be called from both communication ends) */