1 /* Copyright (c) 2009-2016. The SimGrid Team. All rights reserved. */
3 /* This program is free software; you can redistribute it and/or modify it
4 * under the terms of the license (GNU LGPL) which comes with this package. */
6 #include "src/surf/surf_interface.hpp"
7 #include "src/simix/smx_private.h"
10 #include "src/mc/mc_replay.h"
12 #include "simgrid/s4u/mailbox.hpp"
14 #include "src/simix/SynchroComm.hpp"
// Logging channel for this file, registered as a subcategory of the "simix" channel.
16 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(simix_network, simix, "SIMIX network-related synchronization");
// Global registry of mailboxes keyed by mailbox name (filled by SIMIX_mbox_create).
// SIMIX_mbox_free is handed to xbt_dict_new_homogeneous as the callback that releases stored
// mailboxes. The dict is built during static initialization and freed by SIMIX_mailbox_exit.
18 static void SIMIX_mbox_free(void *data);
19 static xbt_dict_t mailboxes = xbt_dict_new_homogeneous(SIMIX_mbox_free);
// Forward declarations of helpers defined later in this file.
21 static void SIMIX_waitany_remove_simcall_from_actions(smx_simcall_t simcall);
22 static void SIMIX_comm_copy_data(smx_synchro_t comm);
23 static inline void SIMIX_mbox_push(smx_mailbox_t mbox, smx_synchro_t comm);
24 static smx_synchro_t _find_matching_comm(std::deque<smx_synchro_t> *deque, e_smx_comm_type_t type,
25 int (*match_fun)(void *, void *,smx_synchro_t), void *user_data, smx_synchro_t my_synchro, bool remove_matching);
26 static void SIMIX_comm_start(smx_synchro_t synchro);
28 void SIMIX_mailbox_exit(void)
30 xbt_dict_free(&mailboxes);
33 /******************************************************************************/
34 /* Rendez-Vous Points */
35 /******************************************************************************/
// Returns the mailbox registered under `name`. The registry is queried first because two
// processes may issue the same mbox_create simcall concurrently (see comment below). A newly
// built mailbox owns a copy of `name`, which is also used as its registry key, and an empty
// comm_queue. done_comm_queue stays null until SIMIX_mbox_set_receiver allocates it.
// NOTE(review): the guard that skips creation when the lookup succeeds and the final
// `return mbox;` are not visible in this excerpt — confirm against the full file.
37 smx_mailbox_t SIMIX_mbox_create(const char *name)
39 xbt_assert(name, "Mailboxes must have a name");
40 /* two processes may have pushed the same mbox_create simcall at the same time */
41 smx_mailbox_t mbox = (smx_mailbox_t) xbt_dict_get_or_null(mailboxes, name);
44 mbox = xbt_new0(s_smx_mailbox_t, 1);
45 mbox->name = xbt_strdup(name);
46 mbox->comm_queue = new std::deque<smx_synchro_t>();
47 mbox->done_comm_queue = nullptr; // Allocated on need only
48 mbox->permanent_receiver=NULL;
50 XBT_DEBUG("Creating a mailbox at %p with name %s", mbox, name);
51 xbt_dict_set(mailboxes, mbox->name, mbox, NULL);
// Value-free callback of the `mailboxes` registry. `data` is a stored smx_mailbox_t; the
// visible code deletes both of its queues (deleting a still-null done_comm_queue is a no-op).
// The communications referenced by those queues are not released here.
// NOTE(review): freeing mbox->name and the mailbox struct itself is not visible in this
// excerpt — confirm against the full file.
56 void SIMIX_mbox_free(void *data)
58 XBT_DEBUG("mbox free %p", data);
59 smx_mailbox_t mbox = (smx_mailbox_t) data;
61 delete mbox->comm_queue;
62 delete mbox->done_comm_queue;
67 smx_mailbox_t SIMIX_mbox_get_by_name(const char *name)
69 return (smx_mailbox_t) xbt_dict_get_or_null(mailboxes, name);
72 smx_synchro_t SIMIX_mbox_get_head(smx_mailbox_t mbox)
74 return mbox->comm_queue->empty()? nullptr:mbox->comm_queue->front();
78 * \brief set the receiver of the rendez vous point to allow eager sends
79 * \param mbox The rendez-vous point
80 * \param process The receiving process
82 void SIMIX_mbox_set_receiver(smx_mailbox_t mbox, smx_process_t process)
84 mbox->permanent_receiver=process;
85 if (mbox->done_comm_queue == nullptr)
86 mbox->done_comm_queue = new std::deque<smx_synchro_t>();
90 * \brief Pushes a communication synchro into a rendez-vous point
91 * \param mbox The mailbox
92 * \param comm The communication synchro
// NOTE(review): the doxygen `\param comm` above documents the parameter named `synchro`.
// Appends the communication at the tail of mbox->comm_queue. _find_matching_comm scans from
// the head, so queued comms are matched in arrival order. `synchro` is static_cast to
// simgrid::simix::Comm without any check, so it must really be a Comm.
// NOTE(review): whether comm->mbox is also set to `mbox` here is not visible in this excerpt
// (SIMIX_comm_finish and SIMIX_comm_destroy rely on comm->mbox) — confirm.
94 static inline void SIMIX_mbox_push(smx_mailbox_t mbox, smx_synchro_t synchro)
96 simgrid::simix::Comm *comm = static_cast<simgrid::simix::Comm*>(synchro);
98 mbox->comm_queue->push_back(comm);
103 * \brief Removes a communication synchro from a rendez-vous point
104 * \param mbox The rendez-vous point
105 * \param comm The communication synchro
// NOTE(review): the doxygen `\param comm` above documents the parameter named `synchro`.
// Scans mbox->comm_queue linearly, erases the entry holding the comm, and aborts the
// simulation with xbt_die when the comm is not queued in this mailbox.
// NOTE(review): the comparison against `comm`, the exit right after the erase (required,
// since deque::erase invalidates `it`) and any reset of comm->mbox are on lines not visible
// in this excerpt — confirm.
107 void SIMIX_mbox_remove(smx_mailbox_t mbox, smx_synchro_t synchro)
109 simgrid::simix::Comm *comm = static_cast<simgrid::simix::Comm*>(synchro);
112 for (auto it = mbox->comm_queue->begin(); it != mbox->comm_queue->end(); it++)
114 mbox->comm_queue->erase(it);
117 xbt_die("Cannot remove this comm that is not part of the mailbox");
121 * \brief Checks if there is a communication synchro queued in a deque matching our needs
122 * \param type The type of communication we are looking for (comm_send, comm_recv)
123 * \return The communication synchro if found, NULL otherwise
// Scans `deque` from head to tail and stops at the first queued comm that satisfies all of:
//  - its type equals `type`;
//  - the caller's match_fun (if any) accepts (this_user_data, peer data, candidate);
//  - the candidate's own match_fun (if any) accepts (peer data, this_user_data, my_synchro).
// Both sides therefore get to filter. "Peer data" is the candidate's src_data for a send and
// its dst_data for a receive. `my_synchro` is the caller's own description of itself, given
// to the candidate's filter. `remove_matching` controls whether the match is taken out of
// the deque; SIMIX_comm_iprobe passes false to only peek.
125 static smx_synchro_t _find_matching_comm(std::deque<smx_synchro_t> *deque, e_smx_comm_type_t type,
126 int (*match_fun)(void *, void *,smx_synchro_t), void *this_user_data, smx_synchro_t my_synchro, bool remove_matching)
// Keeps its previous value for comms that are neither sends nor receives; such comms fail the
// type test below for the send/receive types the callers in this file pass.
128 void* other_user_data = NULL;
130 for(auto it = deque->begin(); it != deque->end(); it++){
131 smx_synchro_t synchro = *it;
132 simgrid::simix::Comm *comm = static_cast<simgrid::simix::Comm*>(synchro);
134 if (comm->type == SIMIX_COMM_SEND) {
135 other_user_data = comm->src_data;
136 } else if (comm->type == SIMIX_COMM_RECEIVE) {
137 other_user_data = comm->dst_data;
139 if (comm->type == type &&
140 (! match_fun || match_fun(this_user_data, other_user_data, synchro)) &&
141 (!comm->match_fun || comm->match_fun(other_user_data, this_user_data, my_synchro))) {
142 XBT_DEBUG("Found a matching communication synchro %p", comm);
// NOTE(review): the erase (when remove_matching), any refcount/mbox bookkeeping and the return
// of the match are on lines not visible here. Callers decrement refcount on the result
// (SIMIX_comm_iprobe, SIMIX_comm_irecv), which suggests a match takes a reference — confirm.
// mbox_cpy keeps the mailbox the comm was matched from, presumably for the model checker.
147 comm->mbox_cpy = comm->mbox;
// Not a match: log it and keep scanning.
152 XBT_DEBUG("Sorry, communication synchro %p does not match our needs:"
153 " its type is %d but we are looking for a comm of type %d (or maybe the filtering didn't match)",
154 comm, (int)comm->type, (int)type);
// Deque exhausted without a match (the NULL return is on a line not visible here).
156 XBT_DEBUG("No matching communication synchro found");
160 /******************************************************************************/
161 /* Communication synchros */
162 /******************************************************************************/
165 * \brief Destroy a communicate synchro
166 * \param synchro The communicate synchro to be destroyed
// Releases one reference on the communication. The teardown below is meant to run only once
// no reference is left. Calling this when refcount is already <= 0 prints a backtrace and
// aborts with xbt_die; per its message, this catches test()/wait() called twice on one comm.
// Teardown:
//  - release the surf actions;
//  - for a detached comm that did not reach SIMIX_DONE, hand src_buff to clean_fun (when
//    set) and forget it;
//  - unlink the comm from its mailbox.
// NOTE(review): the decrement itself, the early return under `refcount > 0`, the guard before
// SIMIX_mbox_remove and the final deallocation are on lines not visible in this excerpt.
168 void SIMIX_comm_destroy(smx_synchro_t synchro)
170 simgrid::simix::Comm *comm = static_cast<simgrid::simix::Comm*>(synchro);
172 XBT_DEBUG("Destroy synchro %p (refcount: %d), state: %d", comm, comm->refcount, (int)comm->state);
174 if (comm->refcount <= 0) {
175 xbt_backtrace_display_current();
176 xbt_die("This comm has a negative refcount! You must not call test() or wait() more than once on a given communication.");
179 if (comm->refcount > 0)
181 XBT_DEBUG("Really free communication %p; refcount is now %d", comm, comm->refcount);
183 SIMIX_comm_destroy_internal_actions(synchro);
185 if (comm->detached && comm->state != SIMIX_DONE) {
186 /* the communication has failed and was detached:
187 * we have to free the buffer */
188 if (comm->clean_fun) {
189 comm->clean_fun(comm->src_buff);
191 comm->src_buff = NULL;
195 SIMIX_mbox_remove(comm->mbox, comm);
200 void SIMIX_comm_destroy_internal_actions(smx_synchro_t synchro)
202 simgrid::simix::Comm *comm = static_cast<simgrid::simix::Comm*>(synchro);
203 if (comm->surf_comm){
204 comm->surf_comm->unref();
205 comm->surf_comm = NULL;
208 if (comm->src_timeout){
209 comm->src_timeout->unref();
210 comm->src_timeout = NULL;
213 if (comm->dst_timeout){
214 comm->dst_timeout->unref();
215 comm->dst_timeout = NULL;
// Blocking send handler, done in three steps on the same simcall:
//  1. post the send through simcall_HANDLER_comm_isend, passing NULL as clean_fun;
//  2. reset the simcall's MC value to 0;
//  3. block in simcall_HANDLER_comm_wait on the returned comm with the given timeout.
// NOTE(review): the trailing isend arguments (user data and detached flag) are on a line not
// visible in this excerpt.
219 void simcall_HANDLER_comm_send(smx_simcall_t simcall, smx_process_t src, smx_mailbox_t mbox,
220 double task_size, double rate,
221 void *src_buff, size_t src_buff_size,
222 int (*match_fun)(void *, void *,smx_synchro_t),
223 void (*copy_data_fun)(smx_synchro_t, void*, size_t),
224 void *data, double timeout){
225 smx_synchro_t comm = simcall_HANDLER_comm_isend(simcall, src, mbox, task_size, rate,
226 src_buff, src_buff_size, match_fun, NULL, copy_data_fun,
228 SIMCALL_SET_MC_VALUE(simcall, 0);
229 simcall_HANDLER_comm_wait(simcall, comm, timeout);
// Non-blocking send handler. It first looks for a matching receive already queued on `mbox`.
// If there is none:
//  - with a permanent receiver, the send is delivered eagerly: it becomes READY, is bound to
//    that receiver and is parked in done_comm_queue;
//  - otherwise it is queued in comm_queue until a receiver shows up.
// If a receive matched, the temporary send description is destroyed and the matched comm
// becomes READY. In every case the comm is then:
//  - recorded in src_proc->comms and filled with the sender-side fields;
//  - started, or, under MC / record-replay, only marked RUNNING.
// Returns NULL for a detached send, the comm otherwise.
231 smx_synchro_t simcall_HANDLER_comm_isend(smx_simcall_t simcall, smx_process_t src_proc, smx_mailbox_t mbox,
232 double task_size, double rate,
233 void *src_buff, size_t src_buff_size,
234 int (*match_fun)(void *, void *,smx_synchro_t),
235 void (*clean_fun)(void *), // used to free the synchro in case of problem after a detached send
236 void (*copy_data_fun)(smx_synchro_t, void*, size_t),// used to copy data if not default one
237 void *data, int detached)
239 XBT_DEBUG("send from %p", mbox);
241 /* Prepare a synchro describing us, so that it gets passed to the user-provided filter of other side */
242 smx_synchro_t this_synchro = new simgrid::simix::Comm(SIMIX_COMM_SEND);
244 /* Look for communication synchro matching our needs. We also provide a description of
245 * ourself so that the other side also gets a chance of choosing if it wants to match with us.
247 * If it is not found then push our communication into the rendez-vous point */
248 smx_synchro_t other_synchro =
249 _find_matching_comm(mbox->comm_queue, SIMIX_COMM_RECEIVE, match_fun, data, this_synchro, /*remove_matching*/true);
250 simgrid::simix::Comm *other_comm = static_cast<simgrid::simix::Comm*>(other_synchro);
253 if (!other_synchro) {
254 other_synchro = this_synchro;
255 other_comm = static_cast<simgrid::simix::Comm*>(other_synchro);
257 if (mbox->permanent_receiver!=NULL){
258 //this mailbox is for small messages, which have to be sent right now
259 other_synchro->state = SIMIX_READY;
260 other_comm->dst_proc=mbox->permanent_receiver;
261 other_comm->refcount++;
262 mbox->done_comm_queue->push_back(other_synchro);
263 other_comm->mbox=mbox;
// NOTE(review): `&(other_comm)` logs the address of the local pointer variable, not the comm;
// `other_comm` was presumably intended.
264 XBT_DEBUG("pushing a message into the permanent receive fifo %p, comm %p", mbox, &(other_comm));
267 SIMIX_mbox_push(mbox, this_synchro);
270 XBT_DEBUG("Receive already pushed");
272 SIMIX_comm_destroy(this_synchro);
274 other_comm->state = SIMIX_READY;
275 other_comm->type = SIMIX_COMM_READY;
278 xbt_fifo_push(src_proc->comms, other_synchro);
280 /* if the communication synchro is detached then decrease the refcount
281 * by one, so it will be eliminated by the receiver's destroy call */
// The `if (detached)` / `else` lines around the next statements are not visible in this excerpt.
283 other_comm->detached = 1;
284 other_comm->refcount--;
285 other_comm->clean_fun = clean_fun;
287 other_comm->clean_fun = NULL;
290 /* Setup the communication synchro */
291 other_comm->src_proc = src_proc;
292 other_comm->task_size = task_size;
// NOTE(review): the sender's rate overwrites the comm's rate unconditionally. When a receive
// was posted first, SIMIX_comm_irecv may already have stored a tighter receiver-side limit here,
// and it is lost — looks unintended; confirm.
293 other_comm->rate = rate;
294 other_comm->src_buff = src_buff;
295 other_comm->src_buff_size = src_buff_size;
296 other_comm->src_data = data;
298 other_comm->match_fun = match_fun;
299 other_comm->copy_data_fun = copy_data_fun;
// Under the model checker or record/replay, the transfer is not simulated: only the state changes.
302 if (MC_is_active() || MC_record_replay_is_active()) {
303 other_comm->state = SIMIX_RUNNING;
304 return (detached ? NULL : other_comm);
307 SIMIX_comm_start(other_comm);
308 return (detached ? NULL : other_comm);
311 void simcall_HANDLER_comm_recv(smx_simcall_t simcall, smx_process_t receiver, smx_mailbox_t mbox,
312 void *dst_buff, size_t *dst_buff_size,
313 int (*match_fun)(void *, void *, smx_synchro_t),
314 void (*copy_data_fun)(smx_synchro_t, void*, size_t),
315 void *data, double timeout, double rate)
317 smx_synchro_t comm = SIMIX_comm_irecv(receiver, mbox, dst_buff, dst_buff_size, match_fun, copy_data_fun, data, rate);
318 SIMCALL_SET_MC_VALUE(simcall, 0);
319 simcall_HANDLER_comm_wait(simcall, comm, timeout);
322 smx_synchro_t simcall_HANDLER_comm_irecv(smx_simcall_t simcall, smx_process_t receiver, smx_mailbox_t mbox,
323 void *dst_buff, size_t *dst_buff_size,
324 int (*match_fun)(void *, void *, smx_synchro_t),
325 void (*copy_data_fun)(smx_synchro_t, void*, size_t),
326 void *data, double rate)
328 return SIMIX_comm_irecv(receiver, mbox, dst_buff, dst_buff_size, match_fun, copy_data_fun, data, rate);
// Posts a receive on `mbox` for dst_proc and returns the communication to wait on.
// With a permanent receiver and a non-empty done_comm_queue:
//  - a matching eager send is taken from that queue;
//  - when none matches, this receive is queued in comm_queue instead.
// Otherwise:
//  - comm_queue is searched for a matching send;
//  - on a match, the temporary receive description is destroyed and the matched comm becomes
//    READY; without one, this receive is queued.
// The resulting comm then gets the receiver-side fields and is started, or only marked RUNNING
// under MC / record-replay. A rate of -1.0 is treated as "unset": the tighter of the two sides'
// rates is kept.
// NOTE(review): the else-branches and closing braces separating these cases are not visible in
// this excerpt — confirm which statements belong to which branch.
331 smx_synchro_t SIMIX_comm_irecv(smx_process_t dst_proc, smx_mailbox_t mbox, void *dst_buff, size_t *dst_buff_size,
332 int (*match_fun)(void *, void *, smx_synchro_t),
333 void (*copy_data_fun)(smx_synchro_t, void*, size_t), // used to copy data if not default one
334 void *data, double rate)
336 XBT_DEBUG("recv from %p %p", mbox, mbox->comm_queue);
337 smx_synchro_t this_synchro = new simgrid::simix::Comm(SIMIX_COMM_RECEIVE);
339 smx_synchro_t other_synchro;
340 //communication already done, get it inside the fifo of completed comms
341 if (mbox->permanent_receiver && ! mbox->done_comm_queue->empty()) {
343 XBT_DEBUG("We have a comm that has probably already been received, trying to match it, to skip the communication");
344 //find a match in the already received fifo
345 other_synchro = _find_matching_comm(mbox->done_comm_queue, SIMIX_COMM_SEND, match_fun, data, this_synchro,/*remove_matching*/true);
346 //if not found, assume the receiver came first, register it to the mailbox in the classical way
347 if (!other_synchro) {
348 XBT_DEBUG("We have messages in the permanent receive list, but not the one we are looking for, pushing request into fifo");
349 other_synchro = this_synchro;
350 SIMIX_mbox_push(mbox, this_synchro);
352 simgrid::simix::Comm *other_comm = static_cast<simgrid::simix::Comm*>(other_synchro);
// An eager send whose network transfer has no remaining work is already complete: mark it DONE
// and detach it from the mailbox.
354 if(other_comm->surf_comm && other_comm->remains()==0.0) {
355 XBT_DEBUG("comm %p has been already sent, and is finished, destroy it",other_comm);
356 other_comm->state = SIMIX_DONE;
357 other_comm->type = SIMIX_COMM_DONE;
358 other_comm->mbox = NULL;
// Presumably balances the reference taken when _find_matching_comm returned this comm — confirm.
360 other_comm->refcount--;
361 SIMIX_comm_destroy(this_synchro);
364 /* Prepare a synchro describing us, so that it gets passed to the user-provided filter of other side */
366 /* Look for communication synchro matching our needs. We also provide a description of
367 * ourself so that the other side also gets a chance of choosing if it wants to match with us.
369 * If it is not found then push our communication into the rendez-vous point */
370 other_synchro = _find_matching_comm(mbox->comm_queue, SIMIX_COMM_SEND, match_fun, data, this_synchro,/*remove_matching*/true);
372 if (!other_synchro) {
373 XBT_DEBUG("Receive pushed first %zu", mbox->comm_queue->size());
374 other_synchro = this_synchro;
375 SIMIX_mbox_push(mbox, this_synchro);
377 SIMIX_comm_destroy(this_synchro);
378 simgrid::simix::Comm *other_comm = static_cast<simgrid::simix::Comm*>(other_synchro);
380 other_comm->state = SIMIX_READY;
381 other_comm->type = SIMIX_COMM_READY;
383 xbt_fifo_push(dst_proc->comms, other_synchro);
386 /* Setup communication synchro */
387 simgrid::simix::Comm *other_comm = static_cast<simgrid::simix::Comm*>(other_synchro);
388 other_comm->dst_proc = dst_proc;
389 other_comm->dst_buff = dst_buff;
390 other_comm->dst_buff_size = dst_buff_size;
391 other_comm->dst_data = data;
// Keep the tighter rate: ours wins when it is set and the comm's is unset (-1.0) or larger.
393 if (rate != -1.0 && (other_comm->rate == -1.0 || rate < other_comm->rate))
394 other_comm->rate = rate;
// NOTE(review): these overwrite the sender's match_fun / copy_data_fun on a comm matched from
// the queue (whichever side arrives second wins), so a sender-supplied copy_data_fun is lost
// if the receiver passes a different one — confirm this is intended.
396 other_comm->match_fun = match_fun;
397 other_comm->copy_data_fun = copy_data_fun;
// Under the model checker or record/replay, the transfer is not simulated: only the state changes.
399 if (MC_is_active() || MC_record_replay_is_active()) {
400 other_synchro->state = SIMIX_RUNNING;
401 return other_synchro;
404 SIMIX_comm_start(other_synchro);
405 return other_synchro;
// Iprobe handler: forwards to SIMIX_comm_iprobe, using the issuing process as dst_proc.
// NOTE(review): the last parameter line (declaring `data`, forwarded below) is not visible in
// this excerpt.
408 smx_synchro_t simcall_HANDLER_comm_iprobe(smx_simcall_t simcall, smx_mailbox_t mbox,
409 int type, int src, int tag,
410 int (*match_fun)(void *, void *, smx_synchro_t),
412 return SIMIX_comm_iprobe(simcall->issuer, mbox, type, src, tag, match_fun, data);
// Non-destructive probe: looks for a queued comm matching `match_fun`/`data` without removing
// it (remove_matching=false) and returns it, or NULL when nothing matches.
// Search order:
//  1. done_comm_queue, when the mailbox has a permanent receiver;
//  2. comm_queue.
// A temporary comm describing the prober is built for the other side's filter and destroyed
// before returning. `dst_proc`, `src` and `tag` are not used by the visible lines.
415 smx_synchro_t SIMIX_comm_iprobe(smx_process_t dst_proc, smx_mailbox_t mbox, int type, int src,
416 int tag, int (*match_fun)(void *, void *, smx_synchro_t), void *data)
418 XBT_DEBUG("iprobe from %p %p", mbox, mbox->comm_queue);
419 smx_synchro_t this_synchro;
// Depending on `type` (the test and the declaration of smx_type are not visible here), the
// prober poses either as a sender looking for receives or as a receiver looking for sends.
422 this_synchro = new simgrid::simix::Comm(SIMIX_COMM_SEND);
423 smx_type = SIMIX_COMM_RECEIVE;
425 this_synchro = new simgrid::simix::Comm(SIMIX_COMM_RECEIVE);
426 smx_type = SIMIX_COMM_SEND;
428 smx_synchro_t other_synchro=NULL;
429 if(mbox->permanent_receiver && ! mbox->done_comm_queue->empty()){
430 XBT_DEBUG("first check in the permanent recv mailbox, to see if we already got something");
432 _find_matching_comm(mbox->done_comm_queue, (e_smx_comm_type_t) smx_type, match_fun, data, this_synchro,/*remove_matching*/false);
435 XBT_DEBUG("check if we have more luck in the normal mailbox");
436 other_synchro = _find_matching_comm(mbox->comm_queue, (e_smx_comm_type_t) smx_type, match_fun, data, this_synchro,/*remove_matching*/false);
// A probe must not keep the comm alive: drop the reference presumably taken by
// _find_matching_comm on a match. The `if (other_synchro)` guard is not visible here — confirm.
440 simgrid::simix::Comm *other_comm = static_cast<simgrid::simix::Comm*>(other_synchro);
441 other_comm->refcount--;
444 SIMIX_comm_destroy(this_synchro);
445 return other_synchro;
// Wait handler, also reached from blocking send/recv. It registers the simcall on the comm
// and records the comm as the issuer's waiting_synchro. Then:
//  - under MC / record-replay, the comm is completed immediately as DONE or timed out,
//    according to the simcall's MC value;
//  - otherwise an already-finished comm is handled right away;
//  - otherwise a surf sleep action of length `timeout` is armed on the issuer's host and stored
//    as the comm's src_timeout or dst_timeout, depending on which side issued the wait.
// NOTE(review): the declaration of `sleep`, the idx test and the MC early return are on lines not
// visible in this excerpt.
448 void simcall_HANDLER_comm_wait(smx_simcall_t simcall, smx_synchro_t synchro, double timeout)
450 /* the simcall may be a wait, a send or a recv */
453 /* Associate this simcall to the wait synchro */
454 XBT_DEBUG("simcall_HANDLER_comm_wait, %p", synchro);
456 xbt_fifo_push(synchro->simcalls, simcall);
457 simcall->issuer->waiting_synchro = synchro;
459 if (MC_is_active() || MC_record_replay_is_active()) {
460 int idx = SIMCALL_GET_MC_VALUE(simcall);
// Presumably the idx == 0 transition: the communication completed.
462 synchro->state = SIMIX_DONE;
464 /* If we reached this point, the wait simcall must have a timeout */
465 /* Otherwise it shouldn't be enabled and executed by the MC */
469 simgrid::simix::Comm *comm = static_cast<simgrid::simix::Comm*>(synchro);
470 if (comm->src_proc == simcall->issuer)
471 comm->state = SIMIX_SRC_TIMEOUT;
473 comm->state = SIMIX_DST_TIMEOUT;
476 SIMIX_comm_finish(synchro);
480 /* If the synchro has already finished perform the error handling, */
481 /* otherwise set up a waiting timeout on the right side */
482 if (synchro->state != SIMIX_WAITING && synchro->state != SIMIX_RUNNING) {
483 SIMIX_comm_finish(synchro);
484 } else { /* if (timeout >= 0) { we need a surf sleep action even when there is no timeout, otherwise surf won't tell us when the host fails */
485 sleep = surf_host_sleep(simcall->issuer->host, timeout);
486 sleep->setData(synchro);
488 simgrid::simix::Comm *comm = static_cast<simgrid::simix::Comm*>(synchro);
489 if (simcall->issuer == comm->src_proc)
490 comm->src_timeout = sleep;
492 comm->dst_timeout = sleep;
// Test handler: sets the simcall result to whether the comm is finished.
// Under MC / record-replay:
//  - "finished" means both src_proc and dst_proc are set;
//  - a finished comm is forced to SIMIX_DONE and the simcall is answered through
//    SIMIX_comm_finish.
// Otherwise:
//  - "finished" means the state is neither SIMIX_WAITING nor SIMIX_RUNNING;
//  - in that case the simcall is queued on the comm and answered by SIMIX_comm_finish;
//  - if not finished, the simcall is answered directly.
// NOTE(review): the else/return lines separating these paths are not visible in this excerpt.
496 void simcall_HANDLER_comm_test(smx_simcall_t simcall, smx_synchro_t synchro)
498 simgrid::simix::Comm *comm = static_cast<simgrid::simix::Comm*>(synchro);
500 if (MC_is_active() || MC_record_replay_is_active()){
501 simcall_comm_test__set__result(simcall, comm->src_proc && comm->dst_proc);
502 if (simcall_comm_test__get__result(simcall)){
503 synchro->state = SIMIX_DONE;
504 xbt_fifo_push(synchro->simcalls, simcall);
505 SIMIX_comm_finish(synchro);
507 SIMIX_simcall_answer(simcall);
512 simcall_comm_test__set__result(simcall, (synchro->state != SIMIX_WAITING && synchro->state != SIMIX_RUNNING));
513 if (simcall_comm_test__get__result(simcall)) {
514 xbt_fifo_push(synchro->simcalls, simcall);
515 SIMIX_comm_finish(synchro);
517 SIMIX_simcall_answer(simcall);
// Testany handler: the result is the index of a finished comm in the set, or -1 (the initial
// value) when none is finished.
// Under MC / record-replay, the index comes from the simcall's MC value. The simcall is either
// answered directly (presumably for idx == -1), or the chosen comm is forced to SIMIX_DONE and
// finished.
// Otherwise, the first comm in dynar order whose state is neither SIMIX_WAITING nor
// SIMIX_RUNNING is selected, and the simcall is queued on it and answered by SIMIX_comm_finish.
// NOTE(review): the loop walks simcall_comm_testany__get__comms(simcall) while the MC path
// reads the `synchros` parameter; presumably the same dynar — confirm. The cursor declaration,
// the idx test and the early returns are on lines not visible in this excerpt.
521 void simcall_HANDLER_comm_testany(smx_simcall_t simcall, xbt_dynar_t synchros)
524 smx_synchro_t synchro;
525 simcall_comm_testany__set__result(simcall, -1);
527 if (MC_is_active() || MC_record_replay_is_active()){
528 int idx = SIMCALL_GET_MC_VALUE(simcall);
530 SIMIX_simcall_answer(simcall);
532 synchro = xbt_dynar_get_as(synchros, idx, smx_synchro_t);
533 simcall_comm_testany__set__result(simcall, idx);
534 xbt_fifo_push(synchro->simcalls, simcall);
535 synchro->state = SIMIX_DONE;
536 SIMIX_comm_finish(synchro);
541 xbt_dynar_foreach(simcall_comm_testany__get__comms(simcall), cursor,synchro) {
542 if (synchro->state != SIMIX_WAITING && synchro->state != SIMIX_RUNNING) {
543 simcall_comm_testany__set__result(simcall, cursor);
544 xbt_fifo_push(synchro->simcalls, simcall);
545 SIMIX_comm_finish(synchro);
549 SIMIX_simcall_answer(simcall);
// Waitany handler.
// Under MC / record-replay, the comm at the index given by the simcall's MC value is forced to
// SIMIX_DONE and finished.
// Otherwise, the simcall is registered on the comms in dynar order. When an already-finished
// comm (neither WAITING nor RUNNING) is met, the simcall is answered through SIMIX_comm_finish.
// SIMIX_comm_finish then unregisters the waitany from the other comms via
// SIMIX_waitany_remove_simcall_from_actions.
// NOTE(review): the MC early return and the `break` after finishing are on lines not visible
// in this excerpt.
552 void simcall_HANDLER_comm_waitany(smx_simcall_t simcall, xbt_dynar_t synchros)
554 smx_synchro_t synchro;
555 unsigned int cursor = 0;
557 if (MC_is_active() || MC_record_replay_is_active()){
558 int idx = SIMCALL_GET_MC_VALUE(simcall);
559 synchro = xbt_dynar_get_as(synchros, idx, smx_synchro_t);
560 xbt_fifo_push(synchro->simcalls, simcall);
561 simcall_comm_waitany__set__result(simcall, idx);
562 synchro->state = SIMIX_DONE;
563 SIMIX_comm_finish(synchro);
567 xbt_dynar_foreach(synchros, cursor, synchro){
568 /* associate this simcall to the synchro */
569 xbt_fifo_push(synchro->simcalls, simcall);
571 /* see if the synchro is already finished */
572 if (synchro->state != SIMIX_WAITING && synchro->state != SIMIX_RUNNING){
573 SIMIX_comm_finish(synchro);
579 void SIMIX_waitany_remove_simcall_from_actions(smx_simcall_t simcall)
581 smx_synchro_t synchro;
582 unsigned int cursor = 0;
583 xbt_dynar_t synchros = simcall_comm_waitany__get__comms(simcall);
585 xbt_dynar_foreach(synchros, cursor, synchro)
586 xbt_fifo_remove(synchro->simcalls, simcall);
590 * \brief Starts the simulation of a communication synchro.
591 * \param synchro the communication synchro
// Acts only on a comm in SIMIX_READY state, i.e. both sides known. It then:
//  - creates the surf network action from src_proc's host to dst_proc's host with the comm's
//    task_size and rate;
//  - attaches the comm to that action as its data and moves the comm to SIMIX_RUNNING.
// A link failure reported at creation turns the comm into SIMIX_LINK_FAILURE and releases its
// surf actions. If either endpoint process is suspended, the network action is suspended too.
593 static inline void SIMIX_comm_start(smx_synchro_t synchro)
595 simgrid::simix::Comm *comm = static_cast<simgrid::simix::Comm*>(synchro);
597 /* If both the sender and the receiver are already there, start the communication */
598 if (synchro->state == SIMIX_READY) {
600 sg_host_t sender = comm->src_proc->host;
601 sg_host_t receiver = comm->dst_proc->host;
603 XBT_DEBUG("Starting communication %p from '%s' to '%s'", synchro, sg_host_get_name(sender), sg_host_get_name(receiver));
605 comm->surf_comm = surf_network_model_communicate(surf_network_model, sender, receiver, comm->task_size, comm->rate);
606 comm->surf_comm->setData(synchro);
607 comm->state = SIMIX_RUNNING;
609 /* If a link is failed, detect it immediately */
610 if (comm->surf_comm->getState() == simgrid::surf::Action::State::failed) {
611 XBT_DEBUG("Communication from '%s' to '%s' failed to start because of a link failure",
612 sg_host_get_name(sender), sg_host_get_name(receiver));
613 comm->state = SIMIX_LINK_FAILURE;
614 SIMIX_comm_destroy_internal_actions(synchro);
// NOTE(review): SIMIX_comm_destroy_internal_actions has just set comm->surf_comm to NULL, and
// no return is visible after it in this excerpt. If there is none, a suspended endpoint on a
// link that failed at start makes the surf_comm->suspend() call below dereference NULL — confirm.
617 /* If any of the process is suspend, create the synchro but stop its execution,
618 it will be restarted when the sender process resume */
619 if (SIMIX_process_is_suspended(comm->src_proc) ||
620 SIMIX_process_is_suspended(comm->dst_proc)) {
621 /* FIXME: check what should happen with the synchro state */
623 if (SIMIX_process_is_suspended(comm->src_proc))
624 XBT_DEBUG("The communication is suspended on startup because src (%s:%s) were suspended since it initiated the communication",
625 sg_host_get_name(comm->src_proc->host), comm->src_proc->name);
627 XBT_DEBUG("The communication is suspended on startup because dst (%s:%s) were suspended since it initiated the communication",
628 sg_host_get_name(comm->dst_proc->host), comm->dst_proc->name);
630 comm->surf_comm->suspend();
636 * \brief Answers the SIMIX simcalls associated to a communication synchro.
637 * \param synchro a finished communication synchro
// Drains synchro->simcalls in queue order. For each simcall:
//  - simcalls whose call is SIMCALL_NONE are skipped (per the FIXME, their issuer was killed);
//  - a waitany is unregistered from its other comms and, outside MC/replay, gets the index of
//    this comm in its dynar as result;
//  - the comm is unlinked from its mailbox;
//  - according to synchro->state, either the payload is copied (success path) or an exception
//    is raised in the issuer; for waitany/testany the exception value is this comm's index;
//  - the comm is removed from the issuer's comms list and from its peer's;
//  - the simcall is answered.
// Finally SIMIX_comm_destroy is called destroy_count times.
// NOTE(review): several lines are not visible in this excerpt — the guard around
// SIMIX_mbox_remove, the case labels of the success and cancellation paths, the `break`s,
// the place where destroy_count is incremented, and the checks guarding the peer fifo
// removals. Confirm the exact control flow against the full file.
639 void SIMIX_comm_finish(smx_synchro_t synchro)
641 simgrid::simix::Comm *comm = static_cast<simgrid::simix::Comm*>(synchro);
642 unsigned int destroy_count = 0;
643 smx_simcall_t simcall;
645 while ((simcall = (smx_simcall_t) xbt_fifo_shift(synchro->simcalls))) {
647 /* If a waitany simcall is waiting for this synchro to finish, then remove
648 it from the other synchros in the waitany list. Afterwards, get the
649 position of the actual synchro in the waitany dynar and
650 return it as the result of the simcall */
652 if (simcall->call == SIMCALL_NONE) //FIXME: maybe a better way to handle this case
653 continue; // if process handling comm is killed
654 if (simcall->call == SIMCALL_COMM_WAITANY) {
655 SIMIX_waitany_remove_simcall_from_actions(simcall);
656 if (!MC_is_active() && !MC_record_replay_is_active())
657 simcall_comm_waitany__set__result(simcall, xbt_dynar_search(simcall_comm_waitany__get__comms(simcall), &synchro));
660 /* If the synchro is still in a rendez-vous point then remove from it */
662 SIMIX_mbox_remove(comm->mbox, synchro);
664 XBT_DEBUG("SIMIX_comm_finish: synchro state = %d", (int)synchro->state);
666 /* Check out for errors */
// A dead issuer host takes precedence: the issuer is flagged to die and gets a host_error.
668 if (simcall->issuer->host->isOff()) {
669 simcall->issuer->context->iwannadie = 1;
670 SMX_EXCEPTION(simcall->issuer, host_error, 0, "Host failed");
673 switch (synchro->state) {
// Success path (case label not visible here): deliver the payload to the receiver.
676 XBT_DEBUG("Communication %p complete!", synchro);
677 SIMIX_comm_copy_data(synchro);
680 case SIMIX_SRC_TIMEOUT:
681 SMX_EXCEPTION(simcall->issuer, timeout_error, 0, "Communication timeouted because of sender");
684 case SIMIX_DST_TIMEOUT:
685 SMX_EXCEPTION(simcall->issuer, timeout_error, 0, "Communication timeouted because of receiver");
// Host failure on one side: the process on the failed side is flagged to die; every issuer
// gets a network_error.
688 case SIMIX_SRC_HOST_FAILURE:
689 if (simcall->issuer == comm->src_proc)
690 simcall->issuer->context->iwannadie = 1;
691 // SMX_EXCEPTION(simcall->issuer, host_error, 0, "Host failed");
693 SMX_EXCEPTION(simcall->issuer, network_error, 0, "Remote peer failed");
696 case SIMIX_DST_HOST_FAILURE:
697 if (simcall->issuer == comm->dst_proc)
698 simcall->issuer->context->iwannadie = 1;
699 // SMX_EXCEPTION(simcall->issuer, host_error, 0, "Host failed");
701 SMX_EXCEPTION(simcall->issuer, network_error, 0, "Remote peer failed");
704 case SIMIX_LINK_FAILURE:
706 XBT_DEBUG("Link failure in synchro %p between '%s' and '%s': posting an exception to the issuer: %s (%p) detached:%d",
708 comm->src_proc ? sg_host_get_name(comm->src_proc->host) : NULL,
709 comm->dst_proc ? sg_host_get_name(comm->dst_proc->host) : NULL,
710 simcall->issuer->name, simcall->issuer, comm->detached);
711 if (comm->src_proc == simcall->issuer) {
712 XBT_DEBUG("I'm source");
713 } else if (comm->dst_proc == simcall->issuer) {
714 XBT_DEBUG("I'm dest");
716 XBT_DEBUG("I'm neither source nor dest");
718 SMX_EXCEPTION(simcall->issuer, network_error, 0, "Link failure");
// Cancellation path (case label not visible here): the message names the side that did not
// issue this simcall.
722 if (simcall->issuer == comm->dst_proc)
723 SMX_EXCEPTION(simcall->issuer, cancel_error, 0, "Communication canceled by the sender");
725 SMX_EXCEPTION(simcall->issuer, cancel_error, 0, "Communication canceled by the receiver");
729 xbt_die("Unexpected synchro state in SIMIX_comm_finish: %d", (int)synchro->state);
732 /* if there is an exception during a waitany or a testany, indicate the position of the failed communication */
733 if (simcall->issuer->doexception) {
734 if (simcall->call == SIMCALL_COMM_WAITANY) {
735 simcall->issuer->running_ctx->exception.value = xbt_dynar_search(simcall_comm_waitany__get__comms(simcall), &synchro);
737 else if (simcall->call == SIMCALL_COMM_TESTANY) {
738 simcall->issuer->running_ctx->exception.value = xbt_dynar_search(simcall_comm_testany__get__comms(simcall), &synchro);
742 if (simcall->issuer->host->isOff()) {
743 simcall->issuer->context->iwannadie = 1;
// The issuer no longer waits on this comm; forget it on both endpoints' comm lists.
746 simcall->issuer->waiting_synchro = NULL;
747 xbt_fifo_remove(simcall->issuer->comms, synchro);
749 if(simcall->issuer == comm->src_proc){
751 xbt_fifo_remove(comm->dst_proc->comms, synchro);
753 if(simcall->issuer == comm->dst_proc){
755 xbt_fifo_remove(comm->src_proc->comms, synchro);
758 SIMIX_simcall_answer(simcall);
762 while (destroy_count-- > 0)
763 SIMIX_comm_destroy(synchro);
767 * \brief This function is called when a Surf communication synchro is finished.
768 * \param synchro the corresponding Simix communication
770 void SIMIX_post_comm(smx_synchro_t synchro)
772 simgrid::simix::Comm *comm = static_cast<simgrid::simix::Comm*>(synchro);
774 /* Update synchro state */
775 if (comm->src_timeout &&
776 comm->src_timeout->getState() == simgrid::surf::Action::State::done)
777 synchro->state = SIMIX_SRC_TIMEOUT;
778 else if (comm->dst_timeout &&
779 comm->dst_timeout->getState() == simgrid::surf::Action::State::done)
780 synchro->state = SIMIX_DST_TIMEOUT;
781 else if (comm->src_timeout &&
782 comm->src_timeout->getState() == simgrid::surf::Action::State::failed)
783 synchro->state = SIMIX_SRC_HOST_FAILURE;
784 else if (comm->dst_timeout &&
785 comm->dst_timeout->getState() == simgrid::surf::Action::State::failed)
786 synchro->state = SIMIX_DST_HOST_FAILURE;
787 else if (comm->surf_comm &&
788 comm->surf_comm->getState() == simgrid::surf::Action::State::failed) {
789 synchro->state = SIMIX_LINK_FAILURE;
791 synchro->state = SIMIX_DONE;
793 XBT_DEBUG("SIMIX_post_comm: comm %p, state %d, src_proc %p, dst_proc %p, detached: %d",
794 comm, (int)comm->state, comm->src_proc, comm->dst_proc, comm->detached);
796 /* destroy the surf actions associated with the Simix communication */
797 SIMIX_comm_destroy_internal_actions(comm);
799 /* if there are simcalls associated with the synchro, then answer them */
800 if (xbt_fifo_size(synchro->simcalls)) {
801 SIMIX_comm_finish(comm);
805 /************* synchro Getters **************/
808 * \brief Return the user data associated to the sender of the communication
809 * \param synchro The communication
810 * \return the user data
812 void* SIMIX_comm_get_src_data(smx_synchro_t synchro)
814 simgrid::simix::Comm *comm = static_cast<simgrid::simix::Comm*>(synchro);
816 return comm->src_data;
820 * \brief Return the user data associated to the receiver of the communication
821 * \param synchro The communication
822 * \return the user data
824 void* SIMIX_comm_get_dst_data(smx_synchro_t synchro)
826 simgrid::simix::Comm *comm = static_cast<simgrid::simix::Comm*>(synchro);
828 return comm->dst_data;
831 smx_process_t SIMIX_comm_get_src_proc(smx_synchro_t synchro)
833 simgrid::simix::Comm *comm = static_cast<simgrid::simix::Comm*>(synchro);
835 return comm->src_proc;
838 smx_process_t SIMIX_comm_get_dst_proc(smx_synchro_t synchro)
840 simgrid::simix::Comm *comm = static_cast<simgrid::simix::Comm*>(synchro);
842 return comm->dst_proc;
845 /******************************************************************************/
846 /* SIMIX_comm_copy_data callbacks */
847 /******************************************************************************/
// Global copy routine that SIMIX_comm_copy_data uses for comms without their own
// copy_data_fun. It starts as the pointer-copy callback and can be replaced through
// SIMIX_comm_set_copy_data_callback.
848 static void (*SIMIX_comm_copy_data_callback) (smx_synchro_t, void*, size_t) = &SIMIX_comm_copy_pointer_callback;
850 void SIMIX_comm_set_copy_data_callback(void (*callback) (smx_synchro_t, void*, size_t))
852 SIMIX_comm_copy_data_callback = callback;
855 void SIMIX_comm_copy_pointer_callback(smx_synchro_t synchro, void* buff, size_t buff_size)
857 simgrid::simix::Comm *comm = static_cast<simgrid::simix::Comm*>(synchro);
859 xbt_assert((buff_size == sizeof(void *)), "Cannot copy %zu bytes: must be sizeof(void*)", buff_size);
860 *(void **) (comm->dst_buff) = buff;
// Copy callback for raw buffers: memcpy's buff_size bytes from `buff` into the receiver's
// dst_buff. For a detached comm, the visible code then forgets src_buff; per the comment on
// that line, the SMPI sender had duplicated the buffer.
// NOTE(review): the statement releasing that duplicated buffer (if any) and the closing braces
// are on lines not visible in this excerpt — confirm ownership of `buff` in the detached case.
863 void SIMIX_comm_copy_buffer_callback(smx_synchro_t synchro, void* buff, size_t buff_size)
865 simgrid::simix::Comm *comm = static_cast<simgrid::simix::Comm*>(synchro);
867 XBT_DEBUG("Copy the data over");
868 memcpy(comm->dst_buff, buff, buff_size);
869 if (comm->detached) { // if this is a detached send, the source buffer was duplicated by SMPI sender to make the original buffer available to the application ASAP
871 comm->src_buff = NULL;
877 * \brief Copy the communication data from the sender's buffer to the receiver's one
878 * \param comm The communication
// Copies the payload at most once. It returns early when either buffer is missing or `copied`
// is already set. The number of bytes copied is src_buff_size, capped by *dst_buff_size when the
// receiver supplied a size pointer; that pointer is then updated to the amount actually copied.
// The comm's own copy_data_fun is used when set, the global SIMIX_comm_copy_data_callback
// otherwise.
// NOTE(review): this definition continues past the end of this excerpt (where `copied` is set).
880 void SIMIX_comm_copy_data(smx_synchro_t synchro)
882 simgrid::simix::Comm *comm = static_cast<simgrid::simix::Comm*>(synchro);
884 size_t buff_size = comm->src_buff_size;
885 /* If there is no data to copy then return */
886 if (!comm->src_buff || !comm->dst_buff || comm->copied)
889 XBT_DEBUG("Copying comm %p data from %s (%p) -> %s (%p) (%zu bytes)",
891 comm->src_proc ? sg_host_get_name(comm->src_proc->host) : "a finished process",
893 comm->dst_proc ? sg_host_get_name(comm->dst_proc->host) : "a finished process",
894 comm->dst_buff, buff_size);
896 /* Copy at most dst_buff_size bytes of the message to receiver's buffer */
897 if (comm->dst_buff_size)
898 buff_size = MIN(buff_size, *(comm->dst_buff_size));
900 /* Update the receiver's buffer size to the copied amount */
901 if (comm->dst_buff_size)
902 *comm->dst_buff_size = buff_size;
905 if(comm->copy_data_fun)
906 comm->copy_data_fun (comm, comm->src_buff, buff_size);
908 SIMIX_comm_copy_data_callback (comm, comm->src_buff, buff_size);
912 /* Set the copied flag so we copy data only once */
913 /* (this function might be called from both communication ends) */