1 /* Copyright (c) 2009-2016. The SimGrid Team. All rights reserved. */
3 /* This program is free software; you can redistribute it and/or modify it
4 * under the terms of the license (GNU LGPL) which comes with this package. */
6 #include "src/surf/surf_interface.hpp"
7 #include "src/simix/smx_private.h"
10 #include "src/mc/mc_replay.h"
12 #include "simgrid/s4u/mailbox.hpp"
14 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(simix_network, simix, "SIMIX network-related synchronization");
16 static void SIMIX_mbox_free(void *data);
17 static xbt_dict_t mailboxes = xbt_dict_new_homogeneous(SIMIX_mbox_free);
19 static void SIMIX_waitany_remove_simcall_from_actions(smx_simcall_t simcall);
20 static void SIMIX_comm_copy_data(smx_synchro_t comm);
21 static smx_synchro_t SIMIX_comm_new(e_smx_comm_type_t type);
22 static inline void SIMIX_mbox_push(smx_mailbox_t mbox, smx_synchro_t comm);
23 static smx_synchro_t _find_matching_comm(std::deque<smx_synchro_t> *deque, e_smx_comm_type_t type,
24 int (*match_fun)(void *, void *,smx_synchro_t), void *user_data, smx_synchro_t my_synchro, bool remove_matching);
25 static void SIMIX_comm_start(smx_synchro_t synchro);
27 void SIMIX_mailbox_exit(void)
29 xbt_dict_free(&mailboxes);
32 /******************************************************************************/
33 /* Rendez-Vous Points */
34 /******************************************************************************/
36 smx_mailbox_t SIMIX_mbox_create(const char *name)
38 xbt_assert(name, "Mailboxes must have a name");
39 /* two processes may have pushed the same mbox_create simcall at the same time */
40 smx_mailbox_t mbox = (smx_mailbox_t) xbt_dict_get_or_null(mailboxes, name);
43 mbox = xbt_new0(s_smx_mailbox_t, 1);
44 mbox->name = xbt_strdup(name);
45 mbox->comm_queue = new std::deque<smx_synchro_t>();
46 mbox->done_comm_queue = nullptr; // Allocated on need only
47 mbox->permanent_receiver=NULL;
49 XBT_DEBUG("Creating a mailbox at %p with name %s", mbox, name);
50 xbt_dict_set(mailboxes, mbox->name, mbox, NULL);
// Element free function of the `mailboxes` dict: releases one mailbox and the queues it owns.
// \param data the mailbox (smx_mailbox_t) stored in the dict
// NOTE(review): confirm that mbox->name (xbt_strdup'ed in SIMIX_mbox_create) and mbox itself
// (xbt_new0'ed there) are also released by this function.
55 void SIMIX_mbox_free(void *data)
57 XBT_DEBUG("mbox free %p", data);
58 smx_mailbox_t mbox = (smx_mailbox_t) data;
// Both queues come from `new`; done_comm_queue may still be nullptr, for which delete is a no-op
60 delete mbox->comm_queue;
61 delete mbox->done_comm_queue;
66 smx_mailbox_t SIMIX_mbox_get_by_name(const char *name)
68 return (smx_mailbox_t) xbt_dict_get_or_null(mailboxes, name);
71 smx_synchro_t SIMIX_mbox_get_head(smx_mailbox_t mbox)
73 return mbox->comm_queue->empty()? nullptr:mbox->comm_queue->front();
77 * \brief get the receiver (process associated to the mailbox)
78 * \param mbox The rendez-vous point
79 * \return process The receiving process (NULL if not set)
81 smx_process_t SIMIX_mbox_get_receiver(smx_mailbox_t mbox)
83 return mbox->permanent_receiver;
87 * \brief set the receiver of the rendez vous point to allow eager sends
88 * \param mbox The rendez-vous point
89 * \param process The receiving process
91 void SIMIX_mbox_set_receiver(smx_mailbox_t mbox, smx_process_t process)
93 mbox->permanent_receiver=process;
94 if (mbox->done_comm_queue == nullptr)
95 mbox->done_comm_queue = new std::deque<smx_synchro_t>();
99 * \brief Pushes a communication synchro into a rendez-vous point
100 * \param mbox The mailbox
101 * \param comm The communication synchro
103 static inline void SIMIX_mbox_push(smx_mailbox_t mbox, smx_synchro_t comm)
105 mbox->comm_queue->push_back(comm);
106 comm->comm.mbox = mbox;
110 * \brief Removes a communication synchro from a rendez-vous point
111 * \param mbox The rendez-vous point
112 * \param comm The communication synchro
114 void SIMIX_mbox_remove(smx_mailbox_t mbox, smx_synchro_t comm)
116 comm->comm.mbox = NULL;
117 for (auto it = mbox->comm_queue->begin(); it != mbox->comm_queue->end(); it++)
119 mbox->comm_queue->erase(it);
122 xbt_die("Cannot remove this comm that is not part of the mailbox");
126 * \brief Checks if there is a communication synchro queued in a deque matching our needs
127 * \param type The type of communication we are looking for (comm_send, comm_recv)
128 * \return The communication synchro if found, NULL otherwise
// Other parameters: deque is the queue to scan; match_fun is our own filter (may be NULL);
// this_user_data is our user data, handed to both filters; my_synchro describes ourself to the
// other side's filter; remove_matching presumably selects whether the match is erased from deque
// (callers pass false only from SIMIX_comm_iprobe).
130 static smx_synchro_t _find_matching_comm(std::deque<smx_synchro_t> *deque, e_smx_comm_type_t type,
131 int (*match_fun)(void *, void *,smx_synchro_t), void *this_user_data, smx_synchro_t my_synchro, bool remove_matching)
// User data of the queued synchro, taken from the side (sender or receiver) that posted it
133 void* other_user_data = NULL;
135 for(auto it = deque->begin(); it != deque->end(); it++){
136 smx_synchro_t synchro = *it;
137 if (synchro->comm.type == SIMIX_COMM_SEND) {
138 other_user_data = synchro->comm.src_data;
139 } else if (synchro->comm.type == SIMIX_COMM_RECEIVE) {
140 other_user_data = synchro->comm.dst_data;
// A match needs the right direction and the approval of both filters: ours gets
// (our data, their data, their synchro), theirs gets (their data, our data, my_synchro).
142 if (synchro->comm.type == type &&
143 (!match_fun || match_fun(this_user_data, other_user_data, synchro)) &&
144 (!synchro->comm.match_fun || synchro->comm.match_fun(other_user_data, this_user_data, my_synchro))) {
145 XBT_DEBUG("Found a matching communication synchro %p", synchro);
// The caller now holds an extra reference on the returned synchro (SIMIX_comm_iprobe gives it back)
148 synchro->comm.refcount++;
// mbox_cpy keeps the former mailbox before it is cleared, presumably for the model checker
150 synchro->comm.mbox_cpy = synchro->comm.mbox;
// NOTE(review): if this reset is not guarded by remove_matching, a probed (non-removed) synchro stays in
// the queue with a NULL mbox, and the `if (comm.mbox)` removal paths later skip it — confirm intended.
152 synchro->comm.mbox = NULL;
155 XBT_DEBUG("Sorry, communication synchro %p does not match our needs:"
156 " its type is %d but we are looking for a comm of type %d (or maybe the filtering didn't match)",
157 synchro, (int)synchro->comm.type, (int)type);
// Reached when no queued synchro matched (NULL is then returned, per the \return note)
159 XBT_DEBUG("No matching communication synchro found");
163 /******************************************************************************/
164 /* Communication synchros */
165 /******************************************************************************/
168 * \brief Creates a new communicate synchro
169 * \param type The direction of communication (comm_send, comm_recv)
170 * \return The new communicate synchro
172 smx_synchro_t SIMIX_comm_new(e_smx_comm_type_t type)
174 smx_synchro_t synchro;
176 /* alloc structures */
177 synchro = (smx_synchro_t) xbt_mallocator_get(simix_global->synchro_mallocator);
179 synchro->type = SIMIX_SYNC_COMMUNICATE;
180 synchro->state = SIMIX_WAITING;
182 /* set communication */
183 synchro->comm.type = type;
184 synchro->comm.refcount = 1;
185 synchro->comm.src_data=NULL;
186 synchro->comm.dst_data=NULL;
188 synchro->category = NULL;
190 XBT_DEBUG("Create communicate synchro %p", synchro);
196 * \brief Destroy a communicate synchro
197 * \param synchro The communicate synchro to be destroyed
199 void SIMIX_comm_destroy(smx_synchro_t synchro)
201 XBT_DEBUG("Destroy synchro %p (refcount: %d), state: %d",
202 synchro, synchro->comm.refcount, (int)synchro->state);
204 if (synchro->comm.refcount <= 0) {
205 xbt_backtrace_display_current();
206 xbt_die("The refcount of comm %p is already 0 before decreasing it. "
207 "That's a bug! If you didn't test and/or wait the same communication twice in your code, then the bug is SimGrid's...", synchro);
209 synchro->comm.refcount--;
210 if (synchro->comm.refcount > 0)
212 XBT_DEBUG("Really free communication %p; refcount is now %d", synchro,
213 synchro->comm.refcount);
215 xbt_free(synchro->name);
216 SIMIX_comm_destroy_internal_actions(synchro);
218 if (synchro->comm.detached && synchro->state != SIMIX_DONE) {
219 /* the communication has failed and was detached:
220 * we have to free the buffer */
221 if (synchro->comm.clean_fun) {
222 synchro->comm.clean_fun(synchro->comm.src_buff);
224 synchro->comm.src_buff = NULL;
227 if(synchro->comm.mbox)
228 SIMIX_mbox_remove(synchro->comm.mbox, synchro);
230 xbt_mallocator_release(simix_global->synchro_mallocator, synchro);
233 void SIMIX_comm_destroy_internal_actions(smx_synchro_t synchro)
235 if (synchro->comm.surf_comm){
236 synchro->comm.surf_comm->unref();
237 synchro->comm.surf_comm = NULL;
240 if (synchro->comm.src_timeout){
241 synchro->comm.src_timeout->unref();
242 synchro->comm.src_timeout = NULL;
245 if (synchro->comm.dst_timeout){
246 synchro->comm.dst_timeout->unref();
247 synchro->comm.dst_timeout = NULL;
251 void simcall_HANDLER_comm_send(smx_simcall_t simcall, smx_process_t src, smx_mailbox_t mbox,
252 double task_size, double rate,
253 void *src_buff, size_t src_buff_size,
254 int (*match_fun)(void *, void *,smx_synchro_t),
255 void (*copy_data_fun)(smx_synchro_t, void*, size_t),
256 void *data, double timeout){
257 smx_synchro_t comm = simcall_HANDLER_comm_isend(simcall, src, mbox, task_size, rate,
258 src_buff, src_buff_size, match_fun, NULL, copy_data_fun,
260 SIMCALL_SET_MC_VALUE(simcall, 0);
261 simcall_HANDLER_comm_wait(simcall, comm, timeout);
// Posts a (possibly detached) send on mbox. If a matching receive is already queued there, the two sides
// are paired and the transfer is started. Otherwise our own synchro becomes the communication: it is
// queued in the mailbox, or, when the mailbox has a permanent receiver, handed over eagerly as READY
// through done_comm_queue.
// Returns the communication synchro, or NULL when the send is detached.
263 smx_synchro_t simcall_HANDLER_comm_isend(smx_simcall_t simcall, smx_process_t src_proc, smx_mailbox_t mbox,
264 double task_size, double rate,
265 void *src_buff, size_t src_buff_size,
266 int (*match_fun)(void *, void *,smx_synchro_t),
267 void (*clean_fun)(void *), // used to free the synchro in case of problem after a detached send
268 void (*copy_data_fun)(smx_synchro_t, void*, size_t),// used to copy data if not default one
269 void *data, int detached)
271 XBT_DEBUG("send from %p", mbox);
273 /* Prepare a synchro describing us, so that it gets passed to the user-provided filter of other side */
274 smx_synchro_t this_synchro = SIMIX_comm_new(SIMIX_COMM_SEND);
276 /* Look for communication synchro matching our needs. We also provide a description of
277 * ourself so that the other side also gets a chance of choosing if it wants to match with us.
279 * If it is not found then push our communication into the rendez-vous point */
280 smx_synchro_t other_synchro =
281 _find_matching_comm(mbox->comm_queue, SIMIX_COMM_RECEIVE, match_fun, data, this_synchro, /*remove_matching*/true);
// No matching receive yet: our own synchro becomes the communication
283 if (!other_synchro) {
284 other_synchro = this_synchro;
286 if (mbox->permanent_receiver!=NULL){
287 //this mailbox is for small messages, which have to be sent right now
288 other_synchro->state = SIMIX_READY;
289 other_synchro->comm.dst_proc=mbox->permanent_receiver;
// Extra reference, presumably held on behalf of the done_comm_queue entry
290 other_synchro->comm.refcount++;
291 mbox->done_comm_queue->push_back(other_synchro);
292 other_synchro->comm.mbox=mbox;
293 XBT_DEBUG("pushing a message into the permanent receive fifo %p, comm %p", mbox, &(other_synchro->comm));
296 SIMIX_mbox_push(mbox, this_synchro);
// A matching receive was found (it left the mailbox and gained a reference): our placeholder is dropped
299 XBT_DEBUG("Receive already pushed");
301 SIMIX_comm_destroy(this_synchro);
303 other_synchro->state = SIMIX_READY;
304 other_synchro->comm.type = SIMIX_COMM_READY;
// Track the communication in the sender's list of ongoing comms
307 xbt_fifo_push(src_proc->comms, other_synchro);
309 /* if the communication synchro is detached then decrease the refcount
310 * by one, so it will be eliminated by the receiver's destroy call */
312 other_synchro->comm.detached = 1;
313 other_synchro->comm.refcount--;
314 other_synchro->comm.clean_fun = clean_fun;
316 other_synchro->comm.clean_fun = NULL;
319 /* Setup the communication synchro */
320 other_synchro->comm.src_proc = src_proc;
321 other_synchro->comm.task_size = task_size;
322 other_synchro->comm.rate = rate;
323 other_synchro->comm.src_buff = src_buff;
324 other_synchro->comm.src_buff_size = src_buff_size;
325 other_synchro->comm.src_data = data;
327 other_synchro->comm.match_fun = match_fun;
328 other_synchro->comm.copy_data_fun = copy_data_fun;
// Under the model checker or record/replay there are no surf actions: only flag the comm as running
331 if (MC_is_active() || MC_record_replay_is_active()) {
332 other_synchro->state = SIMIX_RUNNING;
333 return (detached ? NULL : other_synchro);
// SIMIX_comm_start only launches the transfer if the comm is SIMIX_READY (both sides known)
336 SIMIX_comm_start(other_synchro);
337 return (detached ? NULL : other_synchro);
340 void simcall_HANDLER_comm_recv(smx_simcall_t simcall, smx_process_t receiver, smx_mailbox_t mbox,
341 void *dst_buff, size_t *dst_buff_size,
342 int (*match_fun)(void *, void *, smx_synchro_t),
343 void (*copy_data_fun)(smx_synchro_t, void*, size_t),
344 void *data, double timeout, double rate)
346 smx_synchro_t comm = SIMIX_comm_irecv(receiver, mbox, dst_buff,
347 dst_buff_size, match_fun, copy_data_fun, data, rate);
348 SIMCALL_SET_MC_VALUE(simcall, 0);
349 simcall_HANDLER_comm_wait(simcall, comm, timeout);
352 smx_synchro_t simcall_HANDLER_comm_irecv(smx_simcall_t simcall, smx_process_t receiver, smx_mailbox_t mbox,
353 void *dst_buff, size_t *dst_buff_size,
354 int (*match_fun)(void *, void *, smx_synchro_t),
355 void (*copy_data_fun)(smx_synchro_t, void*, size_t),
356 void *data, double rate)
358 return SIMIX_comm_irecv(receiver, mbox, dst_buff, dst_buff_size, match_fun, copy_data_fun, data, rate);
// Posts a receive on mbox and returns the communication synchro describing it.
// When the mailbox has a permanent receiver and done_comm_queue is non-empty, a matching send is
// looked up there first (it may even be finished already). Otherwise a matching send is looked up in
// the regular queue. Without any match, our own synchro is queued in the mailbox.
361 smx_synchro_t SIMIX_comm_irecv(smx_process_t dst_proc, smx_mailbox_t mbox, void *dst_buff, size_t *dst_buff_size,
362 int (*match_fun)(void *, void *, smx_synchro_t),
363 void (*copy_data_fun)(smx_synchro_t, void*, size_t), // used to copy data if not default one
364 void *data, double rate)
366 XBT_DEBUG("recv from %p %p", mbox, mbox->comm_queue);
367 smx_synchro_t this_synchro = SIMIX_comm_new(SIMIX_COMM_RECEIVE);
369 smx_synchro_t other_synchro;
370 //communication already done, get it inside the fifo of completed comms
371 if (mbox->permanent_receiver && ! mbox->done_comm_queue->empty()) {
373 XBT_DEBUG("We have a comm that has probably already been received, trying to match it, to skip the communication");
374 //find a match in the already received fifo
375 other_synchro = _find_matching_comm(mbox->done_comm_queue, SIMIX_COMM_SEND, match_fun, data, this_synchro,/*remove_matching*/true);
376 //if not found, assume the receiver came first, register it to the mailbox in the classical way
377 if (!other_synchro) {
378 XBT_DEBUG("We have messages in the permanent receive list, but not the one we are looking for, pushing request into fifo");
379 other_synchro = this_synchro;
380 SIMIX_mbox_push(mbox, this_synchro);
// An eagerly-sent comm whose transfer has nothing left (remains == 0.0) is already complete:
// mark it done and detach it from the mailbox
382 if(other_synchro->comm.surf_comm && SIMIX_comm_get_remains(other_synchro)==0.0) {
383 XBT_DEBUG("comm %p has been already sent, and is finished, destroy it",&(other_synchro->comm));
384 other_synchro->state = SIMIX_DONE;
385 other_synchro->comm.type = SIMIX_COMM_DONE;
386 other_synchro->comm.mbox = NULL;
// presumably balances a reference taken earlier (by _find_matching_comm and/or the eager send); confirm
388 other_synchro->comm.refcount--;
389 SIMIX_comm_destroy(this_synchro);
392 /* Prepare a synchro describing us, so that it gets passed to the user-provided filter of other side */
394 /* Look for communication synchro matching our needs. We also provide a description of
395 * ourself so that the other side also gets a chance of choosing if it wants to match with us.
397 * If it is not found then push our communication into the rendez-vous point */
398 other_synchro = _find_matching_comm(mbox->comm_queue, SIMIX_COMM_SEND, match_fun, data, this_synchro,/*remove_matching*/true);
400 if (!other_synchro) {
401 XBT_DEBUG("Receive pushed first %zu", mbox->comm_queue->size());
402 other_synchro = this_synchro;
403 SIMIX_mbox_push(mbox, this_synchro);
// A queued send matched: our placeholder is dropped and the pair becomes ready to start
405 SIMIX_comm_destroy(this_synchro);
406 other_synchro->state = SIMIX_READY;
407 other_synchro->comm.type = SIMIX_COMM_READY;
408 //other_synchro->comm.refcount--;
// Track the communication in the receiver's list of ongoing comms
410 xbt_fifo_push(dst_proc->comms, other_synchro);
413 /* Setup communication synchro */
414 other_synchro->comm.dst_proc = dst_proc;
415 other_synchro->comm.dst_buff = dst_buff;
416 other_synchro->comm.dst_buff_size = dst_buff_size;
417 other_synchro->comm.dst_data = data;
// -1.0 acts as "no rate set": the comm keeps the smaller of the sender's and receiver's rates
419 if (rate != -1.0 && (other_synchro->comm.rate == -1.0 || rate < other_synchro->comm.rate))
420 other_synchro->comm.rate = rate;
422 other_synchro->comm.match_fun = match_fun;
423 other_synchro->comm.copy_data_fun = copy_data_fun;
// Under the model checker or record/replay there are no surf actions: only flag the comm as running
425 if (MC_is_active() || MC_record_replay_is_active()) {
426 other_synchro->state = SIMIX_RUNNING;
427 return other_synchro;
430 SIMIX_comm_start(other_synchro);
431 return other_synchro;
434 smx_synchro_t simcall_HANDLER_comm_iprobe(smx_simcall_t simcall, smx_mailbox_t mbox,
435 int type, int src, int tag,
436 int (*match_fun)(void *, void *, smx_synchro_t),
438 return SIMIX_comm_iprobe(simcall->issuer, mbox, type, src, tag, match_fun, data);
// Non-consuming probe of mbox: looks for a matching queued comm of the opposite direction, first in
// done_comm_queue (only when the mailbox has a permanent receiver) and then in the regular queue.
// The match is not removed from its queue (remove_matching is false). Returns it, or NULL.
// NOTE(review): src and tag seem unused here; matching is left to match_fun — confirm.
441 smx_synchro_t SIMIX_comm_iprobe(smx_process_t dst_proc, smx_mailbox_t mbox, int type, int src,
442 int tag, int (*match_fun)(void *, void *, smx_synchro_t), void *data)
444 XBT_DEBUG("iprobe from %p %p", mbox, mbox->comm_queue);
445 smx_synchro_t this_synchro;
// Our placeholder has the opposite direction of the comms we look for (the choice depends on `type`)
448 this_synchro=SIMIX_comm_new(SIMIX_COMM_SEND);
449 smx_type = SIMIX_COMM_RECEIVE;
451 this_synchro=SIMIX_comm_new(SIMIX_COMM_RECEIVE);
452 smx_type = SIMIX_COMM_SEND;
454 smx_synchro_t other_synchro=NULL;
455 if(mbox->permanent_receiver && ! mbox->done_comm_queue->empty()){
456 XBT_DEBUG("first check in the permanent recv mailbox, to see if we already got something");
458 _find_matching_comm(mbox->done_comm_queue, (e_smx_comm_type_t) smx_type, match_fun, data, this_synchro,/*remove_matching*/false);
461 XBT_DEBUG("check if we have more luck in the normal mailbox");
463 _find_matching_comm(mbox->comm_queue, (e_smx_comm_type_t) smx_type, match_fun, data, this_synchro,/*remove_matching*/false);
// Give back the reference _find_matching_comm took: a probe does not keep the synchro
466 other_synchro->comm.refcount--;
// The placeholder only served to describe ourself to the other side's filter
468 SIMIX_comm_destroy(this_synchro);
469 return other_synchro;
// Handles a wait simcall (also reached from the blocking send/recv handlers). The simcall is attached
// to the comm. If the comm is already over, it is answered right away through SIMIX_comm_finish.
// Otherwise a surf sleep action is armed on the issuer's host and stored as this side's timeout.
472 void simcall_HANDLER_comm_wait(smx_simcall_t simcall, smx_synchro_t synchro, double timeout)
474 /* the simcall may be a wait, a send or a recv */
477 /* Associate this simcall to the wait synchro */
478 XBT_DEBUG("simcall_HANDLER_comm_wait, %p", synchro);
480 xbt_fifo_push(synchro->simcalls, simcall);
481 simcall->issuer->waiting_synchro = synchro;
// Under MC / record-replay the outcome is the value chosen by the checker
// (presumably 0 = the comm completed, anything else = a timeout)
483 if (MC_is_active() || MC_record_replay_is_active()) {
484 int idx = SIMCALL_GET_MC_VALUE(simcall);
486 synchro->state = SIMIX_DONE;
488 /* If we reached this point, the wait simcall must have a timeout */
489 /* Otherwise it shouldn't be enabled and executed by the MC */
// The timeout is attributed to whichever side issued this wait
493 if (synchro->comm.src_proc == simcall->issuer)
494 synchro->state = SIMIX_SRC_TIMEOUT;
496 synchro->state = SIMIX_DST_TIMEOUT;
499 SIMIX_comm_finish(synchro);
503 /* If the synchro has already finished, perform the error handling; */
504 /* otherwise set up a waiting timeout on the right side */
505 if (synchro->state != SIMIX_WAITING && synchro->state != SIMIX_RUNNING) {
506 SIMIX_comm_finish(synchro);
507 } else { /* if (timeout >= 0) { we need a surf sleep action even when there is no timeout, otherwise surf won't tell us when the host fails */
// The sleep action carries the synchro as its data. SIMIX_post_comm later reads src_timeout/dst_timeout:
// a done timeout means a timeout, a failed one means a host failure.
508 sleep = surf_host_sleep(simcall->issuer->host, timeout);
509 sleep->setData(synchro);
511 if (simcall->issuer == synchro->comm.src_proc)
512 synchro->comm.src_timeout = sleep;
514 synchro->comm.dst_timeout = sleep;
// Non-blocking test of a comm. The result is true once the comm has left the WAITING/RUNNING states
// (under MC / record-replay: once both sender and receiver are attached). In that case the simcall is
// attached to the comm and answered by SIMIX_comm_finish. Otherwise it is answered directly.
518 void simcall_HANDLER_comm_test(smx_simcall_t simcall, smx_synchro_t synchro)
// Under MC there are no surf actions, so completion is decided by both sides being present
520 if(MC_is_active() || MC_record_replay_is_active()){
521 simcall_comm_test__set__result(simcall, synchro->comm.src_proc && synchro->comm.dst_proc);
522 if(simcall_comm_test__get__result(simcall)){
523 synchro->state = SIMIX_DONE;
524 xbt_fifo_push(synchro->simcalls, simcall);
525 SIMIX_comm_finish(synchro);
527 SIMIX_simcall_answer(simcall);
532 simcall_comm_test__set__result(simcall, (synchro->state != SIMIX_WAITING && synchro->state != SIMIX_RUNNING));
533 if (simcall_comm_test__get__result(simcall)) {
534 xbt_fifo_push(synchro->simcalls, simcall);
535 SIMIX_comm_finish(synchro);
537 SIMIX_simcall_answer(simcall);
// Non-blocking test over several comms. The result stays -1 unless some comm is no longer
// WAITING/RUNNING; then the result is that comm's index and SIMIX_comm_finish answers the simcall.
// Under MC / record-replay the index is the one chosen by the checker.
541 void simcall_HANDLER_comm_testany(smx_simcall_t simcall, xbt_dynar_t synchros)
544 smx_synchro_t synchro;
545 simcall_comm_testany__set__result(simcall, -1);
547 if (MC_is_active() || MC_record_replay_is_active()){
548 int idx = SIMCALL_GET_MC_VALUE(simcall);
// presumably the checker's "nothing ready" choice: answer at once, keeping the -1 result
550 SIMIX_simcall_answer(simcall);
// otherwise the chosen comm is forced to SIMIX_DONE and finished
552 synchro = xbt_dynar_get_as(synchros, idx, smx_synchro_t);
553 simcall_comm_testany__set__result(simcall, idx);
554 xbt_fifo_push(synchro->simcalls, simcall);
555 synchro->state = SIMIX_DONE;
556 SIMIX_comm_finish(synchro);
// NOTE(review): this loop walks the simcall's own comms dynar, while the MC branch uses the `synchros`
// argument; presumably the same dynar — confirm.
561 xbt_dynar_foreach(simcall_comm_testany__get__comms(simcall), cursor,synchro) {
562 if (synchro->state != SIMIX_WAITING && synchro->state != SIMIX_RUNNING) {
563 simcall_comm_testany__set__result(simcall, cursor);
564 xbt_fifo_push(synchro->simcalls, simcall);
565 SIMIX_comm_finish(synchro);
// presumably reached only when no comm was ready: answer with the -1 result
569 SIMIX_simcall_answer(simcall);
// Blocking wait on several comms: the simcall is attached to each of them, and a comm found already
// finished is completed right away. Otherwise SIMIX_comm_finish answers the simcall when one of the
// comms ends, and detaches it from the others via SIMIX_waitany_remove_simcall_from_actions.
572 void simcall_HANDLER_comm_waitany(smx_simcall_t simcall, xbt_dynar_t synchros)
574 smx_synchro_t synchro;
575 unsigned int cursor = 0;
// Under MC / record-replay the checker picks which comm completes
577 if (MC_is_active() || MC_record_replay_is_active()){
578 int idx = SIMCALL_GET_MC_VALUE(simcall);
579 synchro = xbt_dynar_get_as(synchros, idx, smx_synchro_t);
580 xbt_fifo_push(synchro->simcalls, simcall);
581 simcall_comm_waitany__set__result(simcall, idx);
582 synchro->state = SIMIX_DONE;
583 SIMIX_comm_finish(synchro);
587 xbt_dynar_foreach(synchros, cursor, synchro){
588 /* associate this simcall to the synchro */
589 xbt_fifo_push(synchro->simcalls, simcall);
591 /* see if the synchro is already finished */
592 if (synchro->state != SIMIX_WAITING && synchro->state != SIMIX_RUNNING){
593 SIMIX_comm_finish(synchro);
599 void SIMIX_waitany_remove_simcall_from_actions(smx_simcall_t simcall)
601 smx_synchro_t synchro;
602 unsigned int cursor = 0;
603 xbt_dynar_t synchros = simcall_comm_waitany__get__comms(simcall);
605 xbt_dynar_foreach(synchros, cursor, synchro) {
606 xbt_fifo_remove(synchro->simcalls, simcall);
611 * \brief Starts the simulation of a communication synchro.
612 * \param synchro the communication synchro
614 static inline void SIMIX_comm_start(smx_synchro_t synchro)
616 /* If both the sender and the receiver are already there, start the communication */
617 if (synchro->state == SIMIX_READY) {
619 sg_host_t sender = synchro->comm.src_proc->host;
620 sg_host_t receiver = synchro->comm.dst_proc->host;
622 XBT_DEBUG("Starting communication %p from '%s' to '%s'", synchro,
623 sg_host_get_name(sender), sg_host_get_name(receiver));
625 synchro->comm.surf_comm = surf_network_model_communicate(surf_network_model,
627 synchro->comm.task_size, synchro->comm.rate);
629 synchro->comm.surf_comm->setData(synchro);
631 synchro->state = SIMIX_RUNNING;
633 /* If a link is failed, detect it immediately */
634 if (synchro->comm.surf_comm->getState() == simgrid::surf::Action::State::failed) {
635 XBT_DEBUG("Communication from '%s' to '%s' failed to start because of a link failure",
636 sg_host_get_name(sender), sg_host_get_name(receiver));
637 synchro->state = SIMIX_LINK_FAILURE;
638 SIMIX_comm_destroy_internal_actions(synchro);
641 /* If any of the process is suspend, create the synchro but stop its execution,
642 it will be restarted when the sender process resume */
643 if (SIMIX_process_is_suspended(synchro->comm.src_proc) ||
644 SIMIX_process_is_suspended(synchro->comm.dst_proc)) {
645 /* FIXME: check what should happen with the synchro state */
647 if (SIMIX_process_is_suspended(synchro->comm.src_proc))
648 XBT_DEBUG("The communication is suspended on startup because src (%s:%s) were suspended since it initiated the communication",
649 sg_host_get_name(synchro->comm.src_proc->host), synchro->comm.src_proc->name);
651 XBT_DEBUG("The communication is suspended on startup because dst (%s:%s) were suspended since it initiated the communication",
652 sg_host_get_name(synchro->comm.dst_proc->host), synchro->comm.dst_proc->name);
654 synchro->comm.surf_comm->suspend();
661 * \brief Answers the SIMIX simcalls associated to a communication synchro.
662 * \param synchro a finished communication synchro
// For every attached simcall, SIMIX_comm_finish does the following:
//  - waitany bookkeeping;
//  - removal from the mailbox;
//  - data copy on success, or an exception matching the failure state (with the failing index for
//    waitany/testany);
//  - detaching the comm from the processes' comm lists;
//  - answering the simcall.
// Reference drops are deferred until after the loop (destroy_count, presumably incremented once per
// answered simcall in the loop).
664 void SIMIX_comm_finish(smx_synchro_t synchro)
666 unsigned int destroy_count = 0;
667 smx_simcall_t simcall;
669 while ((simcall = (smx_simcall_t) xbt_fifo_shift(synchro->simcalls))) {
671 /* If a waitany simcall is waiting for this synchro to finish, then remove
672 it from the other synchros in the waitany list. Afterwards, get the
673 position of the actual synchro in the waitany dynar and
674 return it as the result of the simcall */
676 if (simcall->call == SIMCALL_NONE) //FIXME: maybe a better way to handle this case
677 continue; // if process handling comm is killed
678 if (simcall->call == SIMCALL_COMM_WAITANY) {
679 SIMIX_waitany_remove_simcall_from_actions(simcall);
680 if (!MC_is_active() && !MC_record_replay_is_active())
681 simcall_comm_waitany__set__result(simcall, xbt_dynar_search(simcall_comm_waitany__get__comms(simcall), &synchro));
684 /* If the synchro is still in a rendez-vous point then remove from it */
685 if (synchro->comm.mbox)
686 SIMIX_mbox_remove(synchro->comm.mbox, synchro);
688 XBT_DEBUG("SIMIX_comm_finish: synchro state = %d", (int)synchro->state);
690 /* Check out for errors */
// The issuer's own host is down: it must die, whatever the comm state
692 if (simcall->issuer->host->isOff()) {
693 simcall->issuer->context->iwannadie = 1;
694 SMX_EXCEPTION(simcall->issuer, host_error, 0, "Host failed");
// Translate the final comm state into a data copy (success) or an exception for the issuer
697 switch (synchro->state) {
700 XBT_DEBUG("Communication %p complete!", synchro);
701 SIMIX_comm_copy_data(synchro);
704 case SIMIX_SRC_TIMEOUT:
705 SMX_EXCEPTION(simcall->issuer, timeout_error, 0,
706 "Communication timeouted because of sender");
709 case SIMIX_DST_TIMEOUT:
710 SMX_EXCEPTION(simcall->issuer, timeout_error, 0,
711 "Communication timeouted because of receiver");
// On a host failure, the process living on the failed host dies; the peer gets a network error
714 case SIMIX_SRC_HOST_FAILURE:
715 if (simcall->issuer == synchro->comm.src_proc)
716 simcall->issuer->context->iwannadie = 1;
717 // SMX_EXCEPTION(simcall->issuer, host_error, 0, "Host failed");
719 SMX_EXCEPTION(simcall->issuer, network_error, 0, "Remote peer failed");
722 case SIMIX_DST_HOST_FAILURE:
723 if (simcall->issuer == synchro->comm.dst_proc)
724 simcall->issuer->context->iwannadie = 1;
725 // SMX_EXCEPTION(simcall->issuer, host_error, 0, "Host failed");
727 SMX_EXCEPTION(simcall->issuer, network_error, 0, "Remote peer failed");
730 case SIMIX_LINK_FAILURE:
732 XBT_DEBUG("Link failure in synchro %p between '%s' and '%s': posting an exception to the issuer: %s (%p) detached:%d",
734 synchro->comm.src_proc ? sg_host_get_name(synchro->comm.src_proc->host) : NULL,
735 synchro->comm.dst_proc ? sg_host_get_name(synchro->comm.dst_proc->host) : NULL,
736 simcall->issuer->name, simcall->issuer, synchro->comm.detached);
737 if (synchro->comm.src_proc == simcall->issuer) {
738 XBT_DEBUG("I'm source");
739 } else if (synchro->comm.dst_proc == simcall->issuer) {
740 XBT_DEBUG("I'm dest");
742 XBT_DEBUG("I'm neither source nor dest");
744 SMX_EXCEPTION(simcall->issuer, network_error, 0, "Link failure");
// Cancellation: the issuer is told which side cancelled
748 if (simcall->issuer == synchro->comm.dst_proc)
749 SMX_EXCEPTION(simcall->issuer, cancel_error, 0,
750 "Communication canceled by the sender");
752 SMX_EXCEPTION(simcall->issuer, cancel_error, 0,
753 "Communication canceled by the receiver");
757 xbt_die("Unexpected synchro state in SIMIX_comm_finish: %d", (int)synchro->state);
760 /* if there is an exception during a waitany or a testany, indicate the position of the failed communication */
761 if (simcall->issuer->doexception) {
762 if (simcall->call == SIMCALL_COMM_WAITANY) {
763 simcall->issuer->running_ctx->exception.value = xbt_dynar_search(simcall_comm_waitany__get__comms(simcall), &synchro);
765 else if (simcall->call == SIMCALL_COMM_TESTANY) {
766 simcall->issuer->running_ctx->exception.value = xbt_dynar_search(simcall_comm_testany__get__comms(simcall), &synchro);
770 if (simcall->issuer->host->isOff()) {
771 simcall->issuer->context->iwannadie = 1;
// The issuer no longer waits on this comm; a detached comm is also dropped from the peer's comm list
774 simcall->issuer->waiting_synchro = NULL;
775 xbt_fifo_remove(simcall->issuer->comms, synchro);
776 if(synchro->comm.detached){
777 if(simcall->issuer == synchro->comm.src_proc){
778 if(synchro->comm.dst_proc)
779 xbt_fifo_remove(synchro->comm.dst_proc->comms, synchro);
781 if(simcall->issuer == synchro->comm.dst_proc){
782 if(synchro->comm.src_proc)
783 xbt_fifo_remove(synchro->comm.src_proc->comms, synchro);
786 SIMIX_simcall_answer(simcall);
// Deferred reference drops, so that synchro stays valid while the simcalls above are processed
790 while (destroy_count-- > 0)
791 SIMIX_comm_destroy(synchro);
795 * \brief This function is called when a Surf communication synchro is finished.
796 * \param synchro the corresponding Simix communication
798 void SIMIX_post_comm(smx_synchro_t synchro)
800 /* Update synchro state */
801 if (synchro->comm.src_timeout &&
802 synchro->comm.src_timeout->getState() == simgrid::surf::Action::State::done)
803 synchro->state = SIMIX_SRC_TIMEOUT;
804 else if (synchro->comm.dst_timeout &&
805 synchro->comm.dst_timeout->getState() == simgrid::surf::Action::State::done)
806 synchro->state = SIMIX_DST_TIMEOUT;
807 else if (synchro->comm.src_timeout &&
808 synchro->comm.src_timeout->getState() == simgrid::surf::Action::State::failed)
809 synchro->state = SIMIX_SRC_HOST_FAILURE;
810 else if (synchro->comm.dst_timeout &&
811 synchro->comm.dst_timeout->getState() == simgrid::surf::Action::State::failed)
812 synchro->state = SIMIX_DST_HOST_FAILURE;
813 else if (synchro->comm.surf_comm &&
814 synchro->comm.surf_comm->getState() == simgrid::surf::Action::State::failed) {
815 XBT_DEBUG("Puta madre. Surf says that the link broke");
816 synchro->state = SIMIX_LINK_FAILURE;
818 synchro->state = SIMIX_DONE;
820 XBT_DEBUG("SIMIX_post_comm: comm %p, state %d, src_proc %p, dst_proc %p, detached: %d",
821 synchro, (int)synchro->state, synchro->comm.src_proc, synchro->comm.dst_proc, synchro->comm.detached);
823 /* destroy the surf actions associated with the Simix communication */
824 SIMIX_comm_destroy_internal_actions(synchro);
826 /* if there are simcalls associated with the synchro, then answer them */
827 if (xbt_fifo_size(synchro->simcalls)) {
828 SIMIX_comm_finish(synchro);
832 void SIMIX_comm_cancel(smx_synchro_t synchro)
834 /* if the synchro is a waiting state means that it is still in a mbox */
835 /* so remove from it and delete it */
836 if (synchro->state == SIMIX_WAITING) {
837 SIMIX_mbox_remove(synchro->comm.mbox, synchro);
838 synchro->state = SIMIX_CANCELED;
840 else if (!MC_is_active() /* when running the MC there are no surf actions */
841 && !MC_record_replay_is_active()
842 && (synchro->state == SIMIX_READY || synchro->state == SIMIX_RUNNING)) {
844 synchro->comm.surf_comm->cancel();
848 void SIMIX_comm_suspend(smx_synchro_t synchro)
850 /*FIXME: shall we suspend also the timeout synchro? */
851 if (synchro->comm.surf_comm)
852 synchro->comm.surf_comm->suspend();
853 /* in the other case, the action will be suspended on creation, in SIMIX_comm_start() */
856 void SIMIX_comm_resume(smx_synchro_t synchro)
858 /*FIXME: check what happen with the timeouts */
859 if (synchro->comm.surf_comm)
860 synchro->comm.surf_comm->resume();
861 /* in the other case, the synchro were not really suspended yet, see SIMIX_comm_suspend() and SIMIX_comm_start() */
865 /************* synchro Getters **************/
868 * \brief get the amount remaining from the communication
869 * \param synchro The communication
// Only an in-progress comm has a surf action to query; the other states report 0 (see the FIXMEs).
// NOTE(review): SIMIX_comm_irecv compares this value to 0.0 to spot eagerly-sent comms that already completed.
871 double SIMIX_comm_get_remains(smx_synchro_t synchro)
877 switch (synchro->state) {
// presumably the SIMIX_RUNNING case: ask surf how much is left to transfer
880 remains = synchro->comm.surf_comm->getRemains();
885 remains = 0; /*FIXME: check what should be returned */
889 remains = 0; /*FIXME: is this correct? */
895 e_smx_state_t SIMIX_comm_get_state(smx_synchro_t synchro)
897 return synchro->state;
901 * \brief Return the user data associated to the sender of the communication
902 * \param synchro The communication
903 * \return the user data
905 void* SIMIX_comm_get_src_data(smx_synchro_t synchro)
907 return synchro->comm.src_data;
911 * \brief Return the user data associated to the receiver of the communication
912 * \param synchro The communication
913 * \return the user data
915 void* SIMIX_comm_get_dst_data(smx_synchro_t synchro)
917 return synchro->comm.dst_data;
920 smx_process_t SIMIX_comm_get_src_proc(smx_synchro_t synchro)
922 return synchro->comm.src_proc;
925 smx_process_t SIMIX_comm_get_dst_proc(smx_synchro_t synchro)
927 return synchro->comm.dst_proc;
930 /******************************************************************************/
931 /* SIMIX_comm_copy_data callbacks */
932 /******************************************************************************/
933 static void (*SIMIX_comm_copy_data_callback) (smx_synchro_t, void*, size_t) =
934 &SIMIX_comm_copy_pointer_callback;
937 SIMIX_comm_set_copy_data_callback(void (*callback) (smx_synchro_t, void*, size_t))
939 SIMIX_comm_copy_data_callback = callback;
942 void SIMIX_comm_copy_pointer_callback(smx_synchro_t comm, void* buff, size_t buff_size)
944 xbt_assert((buff_size == sizeof(void *)), "Cannot copy %zu bytes: must be sizeof(void*)", buff_size);
945 *(void **) (comm->comm.dst_buff) = buff;
// Copy callback for real buffers: memcpy's buff_size bytes of buff into the receiver's dst_buff.
// For a detached send the source buffer is a private duplicate, and src_buff is cleared afterwards.
// NOTE(review): confirm that the duplicate itself is released here as well.
948 void SIMIX_comm_copy_buffer_callback(smx_synchro_t comm, void* buff, size_t buff_size)
950 XBT_DEBUG("Copy the data over");
951 memcpy(comm->comm.dst_buff, buff, buff_size);
952 if (comm->comm.detached) { // if this is a detached send, the source buffer was duplicated by SMPI sender to make the original buffer available to the application ASAP
954 comm->comm.src_buff = NULL;
960 * \brief Copy the communication data from the sender's buffer to the receiver's one
961 * \param comm The communication
// At most *dst_buff_size bytes are copied and *dst_buff_size is updated to the copied amount.
// The copy happens only once per comm (the copied flag), even though both ends may call this.
963 void SIMIX_comm_copy_data(smx_synchro_t comm)
965 size_t buff_size = comm->comm.src_buff_size;
966 /* If there is no data to be copied, then return */
967 if (!comm->comm.src_buff || !comm->comm.dst_buff || comm->comm.copied)
970 XBT_DEBUG("Copying comm %p data from %s (%p) -> %s (%p) (%zu bytes)",
972 comm->comm.src_proc ? sg_host_get_name(comm->comm.src_proc->host) : "a finished process",
974 comm->comm.dst_proc ? sg_host_get_name(comm->comm.dst_proc->host) : "a finished process",
975 comm->comm.dst_buff, buff_size);
977 /* Copy at most dst_buff_size bytes of the message to receiver's buffer */
978 if (comm->comm.dst_buff_size)
979 buff_size = MIN(buff_size, *(comm->comm.dst_buff_size));
981 /* Update the receiver's buffer size to the copied amount */
982 if (comm->comm.dst_buff_size)
983 *comm->comm.dst_buff_size = buff_size;
// A per-comm copy_data_fun takes precedence; otherwise the global SIMIX_comm_copy_data_callback is used
986 if(comm->comm.copy_data_fun)
987 comm->comm.copy_data_fun (comm, comm->comm.src_buff, buff_size);
989 SIMIX_comm_copy_data_callback (comm, comm->comm.src_buff, buff_size);
993 /* Set the copied flag so we copy data only once */
994 /* (this function might be called from both communication ends) */
995 comm->comm.copied = 1;