1 /* Copyright (c) 2009-2016. The SimGrid Team. All rights reserved. */
3 /* This program is free software; you can redistribute it and/or modify it
4 * under the terms of the license (GNU LGPL) which comes with this package. */
6 #include "src/surf/surf_interface.hpp"
7 #include "src/simix/smx_private.h"
10 #include "src/mc/mc_replay.h"
12 #include "simgrid/s4u/mailbox.hpp"
14 #include "src/simix/SynchroComm.hpp"
// Log channel for this file: sub-category "simix_network" of the "simix" category.
16 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(simix_network, simix, "SIMIX network-related synchronization");
// Global mailbox dictionary. SIMIX_mbox_free is handed to xbt_dict_new_homogeneous,
// presumably as the routine that releases each stored mailbox -- TODO confirm against the xbt_dict API.
18 static void SIMIX_mbox_free(void *data);
19 static xbt_dict_t mailboxes = xbt_dict_new_homogeneous(SIMIX_mbox_free);
// Forward declarations of helpers that are defined later in this file.
21 static void SIMIX_waitany_remove_simcall_from_actions(smx_simcall_t simcall);
22 static void SIMIX_comm_copy_data(smx_synchro_t comm);
23 static inline void SIMIX_mbox_push(smx_mailbox_t mbox, smx_synchro_t comm);
24 static smx_synchro_t _find_matching_comm(std::deque<smx_synchro_t> *deque, e_smx_comm_type_t type,
25 int (*match_fun)(void *, void *,smx_synchro_t), void *user_data, smx_synchro_t my_synchro, bool remove_matching);
26 static void SIMIX_comm_start(smx_synchro_t synchro);
// Tears down the global mailbox dictionary by passing its address to xbt_dict_free.
28 void SIMIX_mailbox_exit(void)
30 xbt_dict_free(&mailboxes);
33 /******************************************************************************/
34 /* Rendez-Vous Points */
35 /******************************************************************************/
// Sets up the mailbox called `name`: looks it up in `mailboxes` and initialises a record for it.
// `name` must not be null (enforced by xbt_assert).
37 smx_mailbox_t SIMIX_mbox_create(const char *name)
39 xbt_assert(name, "Mailboxes must have a name");
40 /* two processes may have pushed the same mbox_create simcall at the same time */
41 smx_mailbox_t mbox = (smx_mailbox_t) xbt_dict_get_or_null(mailboxes, name);
// New record: a duplicate of the name (xbt_strdup), an empty queue of pending comms,
// no done-comm queue yet and no permanent receiver.
44 mbox = xbt_new0(s_smx_mailbox_t, 1);
45 mbox->name = xbt_strdup(name);
46 mbox->comm_queue = new std::deque<smx_synchro_t>();
47 mbox->done_comm_queue = nullptr; // Allocated on need only
48 mbox->permanent_receiver=NULL;
50 XBT_DEBUG("Creating a mailbox at %p with name %s", mbox, name);
// Registered under the mailbox's own copy of the name rather than the caller's pointer.
51 xbt_dict_set(mailboxes, mbox->name, mbox, NULL);
// Releases the queues owned by a mailbox. This is the routine given to xbt_dict_new_homogeneous
// when `mailboxes` is created; `data` is cast to smx_mailbox_t.
56 void SIMIX_mbox_free(void *data)
58 XBT_DEBUG("mbox free %p", data);
59 smx_mailbox_t mbox = (smx_mailbox_t) data;
61 delete mbox->comm_queue;
// done_comm_queue is only allocated on demand; deleting a null pointer is a no-op in C++.
62 delete mbox->done_comm_queue;
// Looks `name` up in the global `mailboxes` dictionary and returns the stored entry cast to
// smx_mailbox_t (null for an unknown name, going by xbt_dict_get_or_null's name -- TODO confirm).
67 smx_mailbox_t SIMIX_mbox_get_by_name(const char *name)
69 return (smx_mailbox_t) xbt_dict_get_or_null(mailboxes, name);
73 * \brief set the receiver of the rendez vous point to allow eager sends
74 * \param mbox The rendez-vous point
75 * \param process The receiving process
// Records `process` as the mailbox's permanent receiver and makes sure the
// done_comm_queue exists (it is created here the first time it is needed).
77 void SIMIX_mbox_set_receiver(smx_mailbox_t mbox, smx_process_t process)
79 mbox->permanent_receiver=process;
80 if (mbox->done_comm_queue == nullptr)
81 mbox->done_comm_queue = new std::deque<smx_synchro_t>();
85 * \brief Pushes a communication synchro into a rendez-vous point
86 * \param mbox The mailbox
87 * \param synchro The communication synchro
// Appends `synchro`, viewed as a simgrid::simix::Comm, to the back of mbox->comm_queue.
89 static inline void SIMIX_mbox_push(smx_mailbox_t mbox, smx_synchro_t synchro)
91 simgrid::simix::Comm *comm = static_cast<simgrid::simix::Comm*>(synchro);
93 mbox->comm_queue->push_back(comm);
98 * \brief Removes a communication synchro from a rendez-vous point
99 * \param mbox The rendez-vous point
100 * \param synchro The communication synchro
// Walks mbox->comm_queue to erase the entry for `synchro`; xbt_die reports a comm that
// is not part of the mailbox.
// NOTE(review): the test that selects the entry to erase is not visible here -- confirm
// that the loop stops right after erase(), since erase invalidates `it`.
102 void SIMIX_mbox_remove(smx_mailbox_t mbox, smx_synchro_t synchro)
104 simgrid::simix::Comm *comm = static_cast<simgrid::simix::Comm*>(synchro);
107 for (auto it = mbox->comm_queue->begin(); it != mbox->comm_queue->end(); it++)
109 mbox->comm_queue->erase(it);
112 xbt_die("Cannot remove this comm that is not part of the mailbox");
116 * \brief Checks if there is a communication synchro queued in a deque matching our needs
117 * \param type The type of communication we are looking for (comm_send, comm_recv)
118 * \return The communication synchro if found, NULL otherwise
// Scans `deque` front to back for a queued comm of kind `type` that both sides accept.
// match_fun / this_user_data belong to the caller; my_synchro is what the queued comm's
// own filter receives as its third argument.
120 static smx_synchro_t _find_matching_comm(std::deque<smx_synchro_t> *deque, e_smx_comm_type_t type,
121 int (*match_fun)(void *, void *,smx_synchro_t), void *this_user_data, smx_synchro_t my_synchro, bool remove_matching)
123 void* other_user_data = NULL;
125 for(auto it = deque->begin(); it != deque->end(); it++){
126 smx_synchro_t synchro = *it;
127 simgrid::simix::Comm *comm = static_cast<simgrid::simix::Comm*>(synchro);
// The queued side's user data is src_data for a send and dst_data for a receive.
129 if (comm->type == SIMIX_COMM_SEND) {
130 other_user_data = comm->src_data;
131 } else if (comm->type == SIMIX_COMM_RECEIVE) {
132 other_user_data = comm->dst_data;
// A null filter accepts everything. Each filter is called with its own side's data first,
// the other side's data second and the other side's synchro last.
134 if (comm->type == type &&
135 (! match_fun || match_fun(this_user_data, other_user_data, synchro)) &&
136 (!comm->match_fun || comm->match_fun(other_user_data, this_user_data, my_synchro))) {
137 XBT_DEBUG("Found a matching communication synchro %p", comm);
// Keeps a second copy of the comm's mailbox pointer in mbox_cpy.
142 comm->mbox_cpy = comm->mbox;
147 XBT_DEBUG("Sorry, communication synchro %p does not match our needs:"
148 " its type is %d but we are looking for a comm of type %d (or maybe the filtering didn't match)",
149 comm, (int)comm->type, (int)type);
151 XBT_DEBUG("No matching communication synchro found");
155 /******************************************************************************/
156 /* Communication synchros */
157 /******************************************************************************/
160 * \brief Destroy a communication synchro
161 * \param synchro The communication synchro to be destroyed
// Reference-counted teardown of a comm: a non-positive refcount on entry is treated as a
// usage error (backtrace + xbt_die); the release steps below are logged as the "real" free.
163 void SIMIX_comm_destroy(smx_synchro_t synchro)
165 simgrid::simix::Comm *comm = static_cast<simgrid::simix::Comm*>(synchro);
167 XBT_DEBUG("Destroy synchro %p (refcount: %d), state: %d", comm, comm->refcount, (int)comm->state);
169 if (comm->refcount <= 0) {
170 xbt_backtrace_display_current();
171 xbt_die("This comm has a negative refcount! You must not call test() or wait() more than once on a given communication.");
// NOTE(review): what happens while references remain is not visible here -- presumably an early return.
174 if (comm->refcount > 0)
176 XBT_DEBUG("Really free communication %p; refcount is now %d", comm, comm->refcount);
180 if (comm->detached && comm->state != SIMIX_DONE) {
181 /* the communication has failed and was detached:
182 * we have to free the buffer */
183 if (comm->clean_fun) {
184 comm->clean_fun(comm->src_buff);
186 comm->src_buff = NULL;
// Takes the comm out of its mailbox queue (SIMIX_mbox_remove dies if it is not queued there).
190 SIMIX_mbox_remove(comm->mbox, comm);
// Blocking send: posts the send through simcall_HANDLER_comm_isend (NULL is passed in the
// clean_fun position), stores 0 as the simcall's MC value, then waits on the comm with `timeout`.
195 void simcall_HANDLER_comm_send(smx_simcall_t simcall, smx_process_t src, smx_mailbox_t mbox,
196 double task_size, double rate,
197 void *src_buff, size_t src_buff_size,
198 int (*match_fun)(void *, void *,smx_synchro_t),
199 void (*copy_data_fun)(smx_synchro_t, void*, size_t),
200 void *data, double timeout){
201 smx_synchro_t comm = simcall_HANDLER_comm_isend(simcall, src, mbox, task_size, rate,
202 src_buff, src_buff_size, match_fun, NULL, copy_data_fun,
204 SIMCALL_SET_MC_VALUE(simcall, 0);
205 simcall_HANDLER_comm_wait(simcall, comm, timeout);
// Asynchronous send on `mbox`. Builds a SIMIX_COMM_SEND synchro, tries to pair it with a
// queued receive, fills in the sender-side fields of the resulting comm and starts it.
// Returns the comm, or NULL when `detached` is set.
207 smx_synchro_t simcall_HANDLER_comm_isend(smx_simcall_t simcall, smx_process_t src_proc, smx_mailbox_t mbox,
208 double task_size, double rate,
209 void *src_buff, size_t src_buff_size,
210 int (*match_fun)(void *, void *,smx_synchro_t),
211 void (*clean_fun)(void *), // used to free the synchro in case of problem after a detached send
212 void (*copy_data_fun)(smx_synchro_t, void*, size_t),// used to copy data if not default one
213 void *data, int detached)
215 XBT_DEBUG("send from %p", mbox);
217 /* Prepare a synchro describing us, so that it gets passed to the user-provided filter of other side */
218 smx_synchro_t this_synchro = new simgrid::simix::Comm(SIMIX_COMM_SEND);
220 /* Look for communication synchro matching our needs. We also provide a description of
221 * ourself so that the other side also gets a chance of choosing if it wants to match with us.
223 * If it is not found then push our communication into the rendez-vous point */
224 smx_synchro_t other_synchro =
225 _find_matching_comm(mbox->comm_queue, SIMIX_COMM_RECEIVE, match_fun, data, this_synchro, /*remove_matching*/true);
226 simgrid::simix::Comm *other_comm = static_cast<simgrid::simix::Comm*>(other_synchro);
// No receive was waiting: our own synchro becomes the comm that the rest of the function sets up.
229 if (!other_synchro) {
230 other_synchro = this_synchro;
231 other_comm = static_cast<simgrid::simix::Comm*>(other_synchro);
233 if (mbox->permanent_receiver!=NULL){
234 //this mailbox is for small messages, which have to be sent right now
235 other_synchro->state = SIMIX_READY;
236 other_comm->dst_proc=mbox->permanent_receiver;
237 other_comm->refcount++;
238 mbox->done_comm_queue->push_back(other_synchro);
239 other_comm->mbox=mbox;
// NOTE(review): &(other_comm) is the address of the local pointer variable, not of the comm
// itself -- the log line probably meant to print other_comm.
240 XBT_DEBUG("pushing a message into the permanent receive fifo %p, comm %p", mbox, &(other_comm));
243 SIMIX_mbox_push(mbox, this_synchro);
// A receive was already queued: our temporary synchro is released and the receiver's comm is reused.
246 XBT_DEBUG("Receive already pushed");
248 SIMIX_comm_destroy(this_synchro);
250 other_comm->state = SIMIX_READY;
251 other_comm->type = SIMIX_COMM_READY;
// The comm is also recorded in the sending process's own list of comms.
254 xbt_fifo_push(src_proc->comms, other_synchro);
256 /* if the communication synchro is detached then decrease the refcount
257 * by one, so it will be eliminated by the receiver's destroy call */
259 other_comm->detached = 1;
260 other_comm->refcount--;
261 other_comm->clean_fun = clean_fun;
263 other_comm->clean_fun = NULL;
266 /* Setup the communication synchro */
267 other_comm->src_proc = src_proc;
268 other_comm->task_size = task_size;
269 other_comm->rate = rate;
270 other_comm->src_buff = src_buff;
271 other_comm->src_buff_size = src_buff_size;
272 other_comm->src_data = data;
274 other_comm->match_fun = match_fun;
275 other_comm->copy_data_fun = copy_data_fun;
// With the model checker or record/replay active, the comm is only marked running and
// SIMIX_comm_start is not called.
278 if (MC_is_active() || MC_record_replay_is_active()) {
279 other_comm->state = SIMIX_RUNNING;
280 return (detached ? NULL : other_comm);
283 SIMIX_comm_start(other_comm);
284 return (detached ? NULL : other_comm);
// Blocking receive: posts the receive through SIMIX_comm_irecv, stores 0 as the simcall's
// MC value, then waits on the resulting comm with `timeout`.
287 void simcall_HANDLER_comm_recv(smx_simcall_t simcall, smx_process_t receiver, smx_mailbox_t mbox,
288 void *dst_buff, size_t *dst_buff_size,
289 int (*match_fun)(void *, void *, smx_synchro_t),
290 void (*copy_data_fun)(smx_synchro_t, void*, size_t),
291 void *data, double timeout, double rate)
293 smx_synchro_t comm = SIMIX_comm_irecv(receiver, mbox, dst_buff, dst_buff_size, match_fun, copy_data_fun, data, rate);
294 SIMCALL_SET_MC_VALUE(simcall, 0);
295 simcall_HANDLER_comm_wait(simcall, comm, timeout);
// Simcall entry point for an asynchronous receive: forwards every argument except
// `simcall` to SIMIX_comm_irecv and returns its result.
298 smx_synchro_t simcall_HANDLER_comm_irecv(smx_simcall_t simcall, smx_process_t receiver, smx_mailbox_t mbox,
299 void *dst_buff, size_t *dst_buff_size,
300 int (*match_fun)(void *, void *, smx_synchro_t),
301 void (*copy_data_fun)(smx_synchro_t, void*, size_t),
302 void *data, double rate)
304 return SIMIX_comm_irecv(receiver, mbox, dst_buff, dst_buff_size, match_fun, copy_data_fun, data, rate);
// Asynchronous receive on `mbox`. Builds a SIMIX_COMM_RECEIVE synchro, tries to pair it with
// a send (first among the already-delivered comms of a permanent-receiver mailbox, otherwise
// in the regular queue), fills in the receiver-side fields and starts the comm.
// Returns the comm that represents the exchange.
307 smx_synchro_t SIMIX_comm_irecv(smx_process_t dst_proc, smx_mailbox_t mbox, void *dst_buff, size_t *dst_buff_size,
308 int (*match_fun)(void *, void *, smx_synchro_t),
309 void (*copy_data_fun)(smx_synchro_t, void*, size_t), // used to copy data if not default one
310 void *data, double rate)
312 XBT_DEBUG("recv from %p %p", mbox, mbox->comm_queue);
313 smx_synchro_t this_synchro = new simgrid::simix::Comm(SIMIX_COMM_RECEIVE);
315 smx_synchro_t other_synchro;
316 //communication already done, get it inside the fifo of completed comms
317 if (mbox->permanent_receiver && ! mbox->done_comm_queue->empty()) {
319 XBT_DEBUG("We have a comm that has probably already been received, trying to match it, to skip the communication");
320 //find a match in the already received fifo
321 other_synchro = _find_matching_comm(mbox->done_comm_queue, SIMIX_COMM_SEND, match_fun, data, this_synchro,/*remove_matching*/true);
322 //if not found, assume the receiver came first, register it to the mailbox in the classical way
323 if (!other_synchro) {
324 XBT_DEBUG("We have messages in the permanent receive list, but not the one we are looking for, pushing request into fifo");
325 other_synchro = this_synchro;
326 SIMIX_mbox_push(mbox, this_synchro);
328 simgrid::simix::Comm *other_comm = static_cast<simgrid::simix::Comm*>(other_synchro);
// A matched send whose surf action has nothing left to transfer is marked done and detached from the mailbox.
330 if(other_comm->surf_comm && other_comm->remains()==0.0) {
331 XBT_DEBUG("comm %p has been already sent, and is finished, destroy it",other_comm);
332 other_comm->state = SIMIX_DONE;
333 other_comm->type = SIMIX_COMM_DONE;
334 other_comm->mbox = NULL;
// Gives back one reference on the matched comm (isend took an extra one before queueing it
// in done_comm_queue) and releases our temporary synchro.
336 other_comm->refcount--;
337 SIMIX_comm_destroy(this_synchro);
340 /* Prepare a synchro describing us, so that it gets passed to the user-provided filter of other side */
342 /* Look for communication synchro matching our needs. We also provide a description of
343 * ourself so that the other side also gets a chance of choosing if it wants to match with us.
345 * If it is not found then push our communication into the rendez-vous point */
346 other_synchro = _find_matching_comm(mbox->comm_queue, SIMIX_COMM_SEND, match_fun, data, this_synchro,/*remove_matching*/true);
348 if (!other_synchro) {
349 XBT_DEBUG("Receive pushed first %zu", mbox->comm_queue->size());
350 other_synchro = this_synchro;
351 SIMIX_mbox_push(mbox, this_synchro);
// A send was already queued: our temporary synchro is released and the sender's comm becomes ready.
353 SIMIX_comm_destroy(this_synchro);
354 simgrid::simix::Comm *other_comm = static_cast<simgrid::simix::Comm*>(other_synchro);
356 other_comm->state = SIMIX_READY;
357 other_comm->type = SIMIX_COMM_READY;
// The comm is also recorded in the receiving process's own list of comms.
359 xbt_fifo_push(dst_proc->comms, other_synchro);
362 /* Setup communication synchro */
363 simgrid::simix::Comm *other_comm = static_cast<simgrid::simix::Comm*>(other_synchro);
364 other_comm->dst_proc = dst_proc;
365 other_comm->dst_buff = dst_buff;
366 other_comm->dst_buff_size = dst_buff_size;
367 other_comm->dst_data = data;
// -1.0 means "no rate given"; otherwise the smaller of the two sides' rates is kept.
369 if (rate != -1.0 && (other_comm->rate == -1.0 || rate < other_comm->rate))
370 other_comm->rate = rate;
372 other_comm->match_fun = match_fun;
373 other_comm->copy_data_fun = copy_data_fun;
// With the model checker or record/replay active, the comm is only marked running and
// SIMIX_comm_start is not called.
375 if (MC_is_active() || MC_record_replay_is_active()) {
376 other_synchro->state = SIMIX_RUNNING;
377 return other_synchro;
380 SIMIX_comm_start(other_synchro);
381 return other_synchro;
// Simcall entry point for a probe: delegates to SIMIX_comm_iprobe on behalf of the simcall's issuer.
384 smx_synchro_t simcall_HANDLER_comm_iprobe(smx_simcall_t simcall, smx_mailbox_t mbox,
385 int type, int src, int tag,
386 int (*match_fun)(void *, void *, smx_synchro_t),
388 return SIMIX_comm_iprobe(simcall->issuer, mbox, type, src, tag, match_fun, data);
// Non-destructive lookup: builds a throw-away synchro, searches the mailbox queues for a comm
// of the opposite kind without removing it, then releases the throw-away synchro.
// Returns the comm found, or NULL (the initial value of other_synchro) when nothing matches.
391 smx_synchro_t SIMIX_comm_iprobe(smx_process_t dst_proc, smx_mailbox_t mbox, int type, int src,
392 int tag, int (*match_fun)(void *, void *, smx_synchro_t), void *data)
394 XBT_DEBUG("iprobe from %p %p", mbox, mbox->comm_queue);
395 smx_synchro_t this_synchro;
// The probe synchro and the kind being searched for are always opposite (send <-> receive).
398 this_synchro = new simgrid::simix::Comm(SIMIX_COMM_SEND);
399 smx_type = SIMIX_COMM_RECEIVE;
401 this_synchro = new simgrid::simix::Comm(SIMIX_COMM_RECEIVE);
402 smx_type = SIMIX_COMM_SEND;
404 smx_synchro_t other_synchro=NULL;
405 if(mbox->permanent_receiver && ! mbox->done_comm_queue->empty()){
406 XBT_DEBUG("first check in the permanent recv mailbox, to see if we already got something");
408 _find_matching_comm(mbox->done_comm_queue, (e_smx_comm_type_t) smx_type, match_fun, data, this_synchro,/*remove_matching*/false);
411 XBT_DEBUG("check if we have more luck in the normal mailbox");
412 other_synchro = _find_matching_comm(mbox->comm_queue, (e_smx_comm_type_t) smx_type, match_fun, data, this_synchro,/*remove_matching*/false);
416 simgrid::simix::Comm *other_comm = static_cast<simgrid::simix::Comm*>(other_synchro);
417 other_comm->refcount--;
420 SIMIX_comm_destroy(this_synchro);
421 return other_synchro;
// Blocks the issuer of `simcall` on `synchro`. The simcall is queued on the synchro; a comm
// that is no longer waiting/running is finished at once, otherwise a surf sleep of `timeout`
// is created on the issuer's host and stored as the src or dst timeout of the comm.
424 void simcall_HANDLER_comm_wait(smx_simcall_t simcall, smx_synchro_t synchro, double timeout)
426 /* Associate this simcall to the wait synchro */
427 XBT_DEBUG("simcall_HANDLER_comm_wait, %p", synchro);
429 xbt_fifo_push(synchro->simcalls, simcall);
430 simcall->issuer->waiting_synchro = synchro;
// Model-checker / replay path: the outcome is dictated by the simcall's MC value instead of surf.
432 if (MC_is_active() || MC_record_replay_is_active()) {
433 int idx = SIMCALL_GET_MC_VALUE(simcall);
435 synchro->state = SIMIX_DONE;
437 /* If we reached this point, the wait simcall must have a timeout */
438 /* Otherwise it shouldn't be enabled and executed by the MC */
442 simgrid::simix::Comm *comm = static_cast<simgrid::simix::Comm*>(synchro);
// The timeout is attributed to whichever side issued the wait.
443 if (comm->src_proc == simcall->issuer)
444 comm->state = SIMIX_SRC_TIMEOUT;
446 comm->state = SIMIX_DST_TIMEOUT;
449 SIMIX_comm_finish(synchro);
453 /* If the synchro has already finished, perform the error handling, */
454 /* otherwise set up a waiting timeout on the right side */
455 if (synchro->state != SIMIX_WAITING && synchro->state != SIMIX_RUNNING) {
456 SIMIX_comm_finish(synchro);
457 } else { /* if (timeout >= 0) { we need a surf sleep action even when there is no timeout, otherwise surf won't tell us when the host fails */
458 surf_action_t sleep = surf_host_sleep(simcall->issuer->host, timeout);
459 sleep->setData(synchro);
461 simgrid::simix::Comm *comm = static_cast<simgrid::simix::Comm*>(synchro);
462 if (simcall->issuer == comm->src_proc)
463 comm->src_timeout = sleep;
465 comm->dst_timeout = sleep;
// Non-blocking test of `synchro`. The boolean result is stored in the simcall; a positive
// test queues the simcall on the synchro and finishes the comm, otherwise the simcall is
// answered directly.
469 void simcall_HANDLER_comm_test(smx_simcall_t simcall, smx_synchro_t synchro)
471 simgrid::simix::Comm *comm = static_cast<simgrid::simix::Comm*>(synchro);
// Model-checker / replay path: the comm counts as complete as soon as both peers are known.
473 if (MC_is_active() || MC_record_replay_is_active()){
474 simcall_comm_test__set__result(simcall, comm->src_proc && comm->dst_proc);
475 if (simcall_comm_test__get__result(simcall)){
476 synchro->state = SIMIX_DONE;
477 xbt_fifo_push(synchro->simcalls, simcall);
478 SIMIX_comm_finish(synchro);
480 SIMIX_simcall_answer(simcall);
// Regular path: complete means the state is neither SIMIX_WAITING nor SIMIX_RUNNING.
485 simcall_comm_test__set__result(simcall, (synchro->state != SIMIX_WAITING && synchro->state != SIMIX_RUNNING));
486 if (simcall_comm_test__get__result(simcall)) {
487 xbt_fifo_push(synchro->simcalls, simcall);
488 SIMIX_comm_finish(synchro);
490 SIMIX_simcall_answer(simcall);
// Non-blocking test over a set of comms. The result defaults to -1 and is replaced by the
// position of a comm that is finished; when none is, the simcall is simply answered.
494 void simcall_HANDLER_comm_testany(smx_simcall_t simcall, xbt_dynar_t synchros)
497 smx_synchro_t synchro;
498 simcall_comm_testany__set__result(simcall, -1);
// Model-checker / replay path: the index to report comes from the simcall's MC value.
500 if (MC_is_active() || MC_record_replay_is_active()){
501 int idx = SIMCALL_GET_MC_VALUE(simcall);
503 SIMIX_simcall_answer(simcall);
505 synchro = xbt_dynar_get_as(synchros, idx, smx_synchro_t);
506 simcall_comm_testany__set__result(simcall, idx);
507 xbt_fifo_push(synchro->simcalls, simcall);
508 synchro->state = SIMIX_DONE;
509 SIMIX_comm_finish(synchro);
// Regular path: look for a comm whose state is neither SIMIX_WAITING nor SIMIX_RUNNING.
514 xbt_dynar_foreach(simcall_comm_testany__get__comms(simcall), cursor,synchro) {
515 if (synchro->state != SIMIX_WAITING && synchro->state != SIMIX_RUNNING) {
516 simcall_comm_testany__set__result(simcall, cursor);
517 xbt_fifo_push(synchro->simcalls, simcall);
518 SIMIX_comm_finish(synchro);
522 SIMIX_simcall_answer(simcall);
// Blocking wait over a set of comms: the simcall is queued on the comms in `synchros`, and a
// comm that is already out of the waiting/running states is finished right away.
525 void simcall_HANDLER_comm_waitany(smx_simcall_t simcall, xbt_dynar_t synchros)
527 smx_synchro_t synchro;
528 unsigned int cursor = 0;
// Model-checker / replay path: the comm to complete is selected by the simcall's MC value.
530 if (MC_is_active() || MC_record_replay_is_active()){
531 int idx = SIMCALL_GET_MC_VALUE(simcall);
532 synchro = xbt_dynar_get_as(synchros, idx, smx_synchro_t);
533 xbt_fifo_push(synchro->simcalls, simcall);
534 simcall_comm_waitany__set__result(simcall, idx);
535 synchro->state = SIMIX_DONE;
536 SIMIX_comm_finish(synchro);
540 xbt_dynar_foreach(synchros, cursor, synchro){
541 /* associate this simcall to the synchro */
542 xbt_fifo_push(synchro->simcalls, simcall);
544 /* see if the synchro is already finished */
545 if (synchro->state != SIMIX_WAITING && synchro->state != SIMIX_RUNNING){
546 SIMIX_comm_finish(synchro);
// Undoes the registration done by a waitany: removes `simcall` from the simcalls list of
// every comm stored in the simcall's waitany dynar.
552 void SIMIX_waitany_remove_simcall_from_actions(smx_simcall_t simcall)
554 smx_synchro_t synchro;
555 unsigned int cursor = 0;
556 xbt_dynar_t synchros = simcall_comm_waitany__get__comms(simcall);
558 xbt_dynar_foreach(synchros, cursor, synchro)
559 xbt_fifo_remove(synchro->simcalls, simcall);
563 * \brief Starts the simulation of a communication synchro.
564 * \param synchro the communication synchro
// Launches the surf network action for a comm whose two peers are known (state SIMIX_READY):
// the action runs between the hosts of src_proc and dst_proc with the comm's task_size and
// rate, and the comm moves to SIMIX_RUNNING (or SIMIX_LINK_FAILURE if the action is already failed).
566 static inline void SIMIX_comm_start(smx_synchro_t synchro)
568 simgrid::simix::Comm *comm = static_cast<simgrid::simix::Comm*>(synchro);
570 /* If both the sender and the receiver are already there, start the communication */
571 if (synchro->state == SIMIX_READY) {
573 sg_host_t sender = comm->src_proc->host;
574 sg_host_t receiver = comm->dst_proc->host;
576 XBT_DEBUG("Starting communication %p from '%s' to '%s'", synchro, sg_host_get_name(sender), sg_host_get_name(receiver));
578 comm->surf_comm = surf_network_model_communicate(surf_network_model, sender, receiver, comm->task_size, comm->rate);
// The surf action carries a back-pointer to the synchro as its data.
579 comm->surf_comm->setData(synchro);
580 comm->state = SIMIX_RUNNING;
582 /* If a link is failed, detect it immediately */
583 if (comm->surf_comm->getState() == simgrid::surf::Action::State::failed) {
584 XBT_DEBUG("Communication from '%s' to '%s' failed to start because of a link failure",
585 sg_host_get_name(sender), sg_host_get_name(receiver));
586 comm->state = SIMIX_LINK_FAILURE;
590 /* If any of the processes is suspended, create the synchro but stop its execution;
591 it will be restarted when the sender process resumes */
592 if (SIMIX_process_is_suspended(comm->src_proc) ||
593 SIMIX_process_is_suspended(comm->dst_proc)) {
594 /* FIXME: check what should happen with the synchro state */
596 if (SIMIX_process_is_suspended(comm->src_proc))
597 XBT_DEBUG("The communication is suspended on startup because src (%s:%s) were suspended since it initiated the communication",
598 sg_host_get_name(comm->src_proc->host), comm->src_proc->name);
600 XBT_DEBUG("The communication is suspended on startup because dst (%s:%s) were suspended since it initiated the communication",
601 sg_host_get_name(comm->dst_proc->host), comm->dst_proc->name);
603 comm->surf_comm->suspend();
609 * \brief Answers the SIMIX simcalls associated to a communication synchro.
610 * \param synchro a finished communication synchro
// Answers every simcall queued on `synchro`. For each one it turns the comm's final state
// into either a data copy (success) or an exception posted to the issuer, unlinks the comm
// from the processes' comm lists, answers the simcall, and finally calls SIMIX_comm_destroy
// `destroy_count` times.
612 void SIMIX_comm_finish(smx_synchro_t synchro)
614 simgrid::simix::Comm *comm = static_cast<simgrid::simix::Comm*>(synchro);
615 unsigned int destroy_count = 0;
616 smx_simcall_t simcall;
// Simcalls are popped one at a time until the list is empty.
618 while ((simcall = (smx_simcall_t) xbt_fifo_shift(synchro->simcalls))) {
620 /* If a waitany simcall is waiting for this synchro to finish, then remove
621 it from the other synchros in the waitany list. Afterwards, get the
622 position of the actual synchro in the waitany dynar and
623 return it as the result of the simcall */
625 if (simcall->call == SIMCALL_NONE) //FIXME: maybe a better way to handle this case
626 continue; // if process handling comm is killed
627 if (simcall->call == SIMCALL_COMM_WAITANY) {
628 SIMIX_waitany_remove_simcall_from_actions(simcall);
629 if (!MC_is_active() && !MC_record_replay_is_active())
630 simcall_comm_waitany__set__result(simcall, xbt_dynar_search(simcall_comm_waitany__get__comms(simcall), &synchro));
633 /* If the synchro is still in a rendez-vous point then remove from it */
635 SIMIX_mbox_remove(comm->mbox, synchro);
637 XBT_DEBUG("SIMIX_comm_finish: synchro state = %d", (int)synchro->state);
639 /* Check out for errors */
// An issuer whose host is off is flagged to die and gets a host_error.
641 if (simcall->issuer->host->isOff()) {
642 simcall->issuer->context->iwannadie = 1;
643 SMX_EXCEPTION(simcall->issuer, host_error, 0, "Host failed");
// Otherwise the outcome depends on the comm's final state.
646 switch (synchro->state) {
649 XBT_DEBUG("Communication %p complete!", synchro);
650 SIMIX_comm_copy_data(synchro);
653 case SIMIX_SRC_TIMEOUT:
654 SMX_EXCEPTION(simcall->issuer, timeout_error, 0, "Communication timeouted because of sender");
657 case SIMIX_DST_TIMEOUT:
658 SMX_EXCEPTION(simcall->issuer, timeout_error, 0, "Communication timeouted because of receiver");
// Host failures: the process on the failed side is flagged to die; the network_error
// message is worded for the remote peer.
661 case SIMIX_SRC_HOST_FAILURE:
662 if (simcall->issuer == comm->src_proc)
663 simcall->issuer->context->iwannadie = 1;
664 // SMX_EXCEPTION(simcall->issuer, host_error, 0, "Host failed");
666 SMX_EXCEPTION(simcall->issuer, network_error, 0, "Remote peer failed");
669 case SIMIX_DST_HOST_FAILURE:
670 if (simcall->issuer == comm->dst_proc)
671 simcall->issuer->context->iwannadie = 1;
672 // SMX_EXCEPTION(simcall->issuer, host_error, 0, "Host failed");
674 SMX_EXCEPTION(simcall->issuer, network_error, 0, "Remote peer failed");
677 case SIMIX_LINK_FAILURE:
// NOTE(review): when src_proc or dst_proc is null, NULL is passed for a '%s' conversion below;
// confirm the logging backend tolerates that.
679 XBT_DEBUG("Link failure in synchro %p between '%s' and '%s': posting an exception to the issuer: %s (%p) detached:%d",
681 comm->src_proc ? sg_host_get_name(comm->src_proc->host) : NULL,
682 comm->dst_proc ? sg_host_get_name(comm->dst_proc->host) : NULL,
683 simcall->issuer->name, simcall->issuer, comm->detached);
684 if (comm->src_proc == simcall->issuer) {
685 XBT_DEBUG("I'm source");
686 } else if (comm->dst_proc == simcall->issuer) {
687 XBT_DEBUG("I'm dest");
689 XBT_DEBUG("I'm neither source nor dest");
691 SMX_EXCEPTION(simcall->issuer, network_error, 0, "Link failure");
// Cancellation: the message names the opposite side as the one that cancelled.
695 if (simcall->issuer == comm->dst_proc)
696 SMX_EXCEPTION(simcall->issuer, cancel_error, 0, "Communication canceled by the sender");
698 SMX_EXCEPTION(simcall->issuer, cancel_error, 0, "Communication canceled by the receiver");
702 xbt_die("Unexpected synchro state in SIMIX_comm_finish: %d", (int)synchro->state);
705 /* if there is an exception during a waitany or a testany, indicate the position of the failed communication */
706 if (simcall->issuer->doexception) {
707 if (simcall->call == SIMCALL_COMM_WAITANY) {
708 simcall->issuer->running_ctx->exception.value = xbt_dynar_search(simcall_comm_waitany__get__comms(simcall), &synchro);
710 else if (simcall->call == SIMCALL_COMM_TESTANY) {
711 simcall->issuer->running_ctx->exception.value = xbt_dynar_search(simcall_comm_testany__get__comms(simcall), &synchro);
715 if (simcall->issuer->host->isOff()) {
716 simcall->issuer->context->iwannadie = 1;
// The issuer no longer waits on this comm; the comm is dropped from the issuer's list
// and from the peer's list as well.
719 simcall->issuer->waiting_synchro = NULL;
720 xbt_fifo_remove(simcall->issuer->comms, synchro);
722 if(simcall->issuer == comm->src_proc){
724 xbt_fifo_remove(comm->dst_proc->comms, synchro);
726 if(simcall->issuer == comm->dst_proc){
728 xbt_fifo_remove(comm->src_proc->comms, synchro);
731 SIMIX_simcall_answer(simcall);
// Destruction is deferred until every simcall has been answered.
735 while (destroy_count-- > 0)
736 SIMIX_comm_destroy(synchro);
740 * \brief This function is called when a Surf communication synchro is finished.
741 * \param synchro the corresponding Simix communication
// Derives the comm's final state from its surf actions, checked in priority order:
// a completed timeout (src, then dst), a failed timeout action (reported as a host failure),
// a failed network action (link failure), and otherwise SIMIX_DONE.
// Pending simcalls, if any, are then answered through SIMIX_comm_finish.
743 void SIMIX_post_comm(smx_synchro_t synchro)
745 simgrid::simix::Comm *comm = static_cast<simgrid::simix::Comm*>(synchro);
747 /* Update synchro state */
748 if (comm->src_timeout &&
749 comm->src_timeout->getState() == simgrid::surf::Action::State::done)
750 synchro->state = SIMIX_SRC_TIMEOUT;
751 else if (comm->dst_timeout &&
752 comm->dst_timeout->getState() == simgrid::surf::Action::State::done)
753 synchro->state = SIMIX_DST_TIMEOUT;
754 else if (comm->src_timeout &&
755 comm->src_timeout->getState() == simgrid::surf::Action::State::failed)
756 synchro->state = SIMIX_SRC_HOST_FAILURE;
757 else if (comm->dst_timeout &&
758 comm->dst_timeout->getState() == simgrid::surf::Action::State::failed)
759 synchro->state = SIMIX_DST_HOST_FAILURE;
760 else if (comm->surf_comm &&
761 comm->surf_comm->getState() == simgrid::surf::Action::State::failed) {
762 synchro->state = SIMIX_LINK_FAILURE;
764 synchro->state = SIMIX_DONE;
766 XBT_DEBUG("SIMIX_post_comm: comm %p, state %d, src_proc %p, dst_proc %p, detached: %d",
767 comm, (int)comm->state, comm->src_proc, comm->dst_proc, comm->detached);
769 /* destroy the surf actions associated with the Simix communication */
772 /* if there are simcalls associated with the synchro, then answer them */
773 if (xbt_fifo_size(synchro->simcalls)) {
774 SIMIX_comm_finish(comm);
778 /************* synchro Getters **************/
781 * \brief Return the user data associated to the sender of the communication
782 * \param synchro The communication
783 * \return the user data
// Accessor: views `synchro` as a simgrid::simix::Comm and returns its src_data field.
785 void* SIMIX_comm_get_src_data(smx_synchro_t synchro)
787 simgrid::simix::Comm *comm = static_cast<simgrid::simix::Comm*>(synchro);
789 return comm->src_data;
793 * \brief Return the user data associated to the receiver of the communication
794 * \param synchro The communication
795 * \return the user data
// Accessor: views `synchro` as a simgrid::simix::Comm and returns its dst_data field.
797 void* SIMIX_comm_get_dst_data(smx_synchro_t synchro)
799 simgrid::simix::Comm *comm = static_cast<simgrid::simix::Comm*>(synchro);
801 return comm->dst_data;
// Accessor: returns the sending process (src_proc) recorded in the comm behind `synchro`.
804 smx_process_t SIMIX_comm_get_src_proc(smx_synchro_t synchro)
806 simgrid::simix::Comm *comm = static_cast<simgrid::simix::Comm*>(synchro);
808 return comm->src_proc;
// Accessor: returns the receiving process (dst_proc) recorded in the comm behind `synchro`.
811 smx_process_t SIMIX_comm_get_dst_proc(smx_synchro_t synchro)
813 simgrid::simix::Comm *comm = static_cast<simgrid::simix::Comm*>(synchro);
815 return comm->dst_proc;
818 /******************************************************************************/
819 /* SIMIX_comm_copy_data callbacks */
820 /******************************************************************************/
// File-wide default copy routine, used by SIMIX_comm_copy_data for comms that carry no
// copy_data_fun of their own. Starts out as the pointer-copy callback.
821 static void (*SIMIX_comm_copy_data_callback) (smx_synchro_t, void*, size_t) = &SIMIX_comm_copy_pointer_callback;
// Replaces the file-wide default copy routine with `callback` (stored as is, no null check).
823 void SIMIX_comm_set_copy_data_callback(void (*callback) (smx_synchro_t, void*, size_t))
825 SIMIX_comm_copy_data_callback = callback;
// Copy routine that transfers the pointer itself: the receiver's buffer is treated as a
// `void*` slot and receives `buff`. `buff_size` must therefore equal sizeof(void*).
828 void SIMIX_comm_copy_pointer_callback(smx_synchro_t synchro, void* buff, size_t buff_size)
830 simgrid::simix::Comm *comm = static_cast<simgrid::simix::Comm*>(synchro);
832 xbt_assert((buff_size == sizeof(void *)), "Cannot copy %zu bytes: must be sizeof(void*)", buff_size);
833 *(void **) (comm->dst_buff) = buff;
// Copy routine that transfers the payload bytes: memcpy of `buff_size` bytes from `buff`
// into the receiver's buffer. For a detached send the comm's src_buff pointer is cleared afterwards.
836 void SIMIX_comm_copy_buffer_callback(smx_synchro_t synchro, void* buff, size_t buff_size)
838 simgrid::simix::Comm *comm = static_cast<simgrid::simix::Comm*>(synchro);
840 XBT_DEBUG("Copy the data over");
841 memcpy(comm->dst_buff, buff, buff_size);
842 if (comm->detached) { // if this is a detached send, the source buffer was duplicated by SMPI sender to make the original buffer available to the application ASAP
844 comm->src_buff = NULL;
850 * \brief Copy the communication data from the sender's buffer to the receiver's one
851 * \param synchro The communication
853 void SIMIX_comm_copy_data(smx_synchro_t synchro)
855 simgrid::simix::Comm *comm = static_cast<simgrid::simix::Comm*>(synchro);
857 size_t buff_size = comm->src_buff_size;
858 /* If there is no data to copy then return */
859 if (!comm->src_buff || !comm->dst_buff || comm->copied)
862 XBT_DEBUG("Copying comm %p data from %s (%p) -> %s (%p) (%zu bytes)",
864 comm->src_proc ? sg_host_get_name(comm->src_proc->host) : "a finished process",
866 comm->dst_proc ? sg_host_get_name(comm->dst_proc->host) : "a finished process",
867 comm->dst_buff, buff_size);
869 /* Copy at most dst_buff_size bytes of the message to receiver's buffer */
870 if (comm->dst_buff_size)
871 buff_size = MIN(buff_size, *(comm->dst_buff_size));
873 /* Update the receiver's buffer size to the copied amount */
874 if (comm->dst_buff_size)
875 *comm->dst_buff_size = buff_size;
878 if(comm->copy_data_fun)
879 comm->copy_data_fun (comm, comm->src_buff, buff_size);
881 SIMIX_comm_copy_data_callback (comm, comm->src_buff, buff_size);
885 /* Set the copied flag so we copy data only once */
886 /* (this function might be called from both communication ends) */