1 /* Copyright (c) 2009-2016. The SimGrid Team. All rights reserved. */
3 /* This program is free software; you can redistribute it and/or modify it
4 * under the terms of the license (GNU LGPL) which comes with this package. */
6 #include "src/surf/surf_interface.hpp"
7 #include "src/simix/smx_private.h"
10 #include "src/mc/mc_replay.h"
12 #include "simgrid/s4u/mailbox.hpp"
// Log channel used by every XBT_DEBUG in this file: subcategory "simix_network" of "simix".
14 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(simix_network, simix, "SIMIX network-related synchronization");
// Global registry of mailboxes, keyed by name (filled by SIMIX_mbox_create, read by
// SIMIX_mbox_get_by_name). It is built when the program starts (non-constant initializer of a
// static variable) with SIMIX_mbox_free as its element callback, presumably invoked by the dict
// on each stored mailbox when the dict is freed in SIMIX_mailbox_exit().
16 static void SIMIX_mbox_free(void *data);
17 static xbt_dict_t mailboxes = xbt_dict_new_homogeneous(SIMIX_mbox_free);
// Forward declarations of helpers defined further down in this file.
19 static void SIMIX_waitany_remove_simcall_from_actions(smx_simcall_t simcall);
20 static void SIMIX_comm_copy_data(smx_synchro_t comm);
21 static smx_synchro_t SIMIX_comm_new(e_smx_comm_type_t type);
22 static inline void SIMIX_mbox_push(smx_mailbox_t mbox, smx_synchro_t comm);
23 static smx_synchro_t _extract_matching_comm(std::deque<smx_synchro_t> *deque, e_smx_comm_type_t type,
24 int (*match_fun)(void *, void *,smx_synchro_t),
25 void *user_data, smx_synchro_t my_synchro);
26 static void SIMIX_comm_start(smx_synchro_t synchro);
28 void SIMIX_mailbox_exit(void)
30 xbt_dict_free(&mailboxes);
33 /******************************************************************************/
34 /* Rendez-Vous Points */
35 /******************************************************************************/
// Returns the mailbox registered under `name`, creating and registering it when needed.
// The name is mandatory (asserted). A newly created mailbox owns an xbt_strdup'ed copy of the
// name (also used as its key in `mailboxes`), an empty comm_queue, no done_comm_queue (that one
// is allocated later by SIMIX_mbox_set_receiver) and no permanent receiver.
// NOTE(review): the guard that reuses an already-registered mailbox (implied by the lookup and
// the comment below, presumably `if (!mbox)`) and the final `return mbox;` are not part of this
// listing; without the guard an existing mailbox would be replaced — confirm.
37 smx_mailbox_t SIMIX_mbox_create(const char *name)
39 xbt_assert(name, "Mailboxes must have a name");
40 /* two processes may have pushed the same mbox_create simcall at the same time */
41 smx_mailbox_t mbox = (smx_mailbox_t) xbt_dict_get_or_null(mailboxes, name);
44 mbox = xbt_new0(s_smx_mailbox_t, 1);
45 mbox->name = xbt_strdup(name);
46 mbox->comm_queue = new std::deque<smx_synchro_t>();
47 mbox->done_comm_queue = nullptr; // Allocated on need only
48 mbox->permanent_receiver=NULL;
50 XBT_DEBUG("Creating a mailbox at %p with name %s", mbox, name);
51 xbt_dict_set(mailboxes, mbox->name, mbox, NULL);
// Element callback of the `mailboxes` dict: releases one mailbox.
// `data` is the smx_mailbox_t stored in the dict. Both queues are deleted here (deleting a
// still-null done_comm_queue is a no-op). The comm synchros that may still be queued are not
// destroyed by the visible lines.
// NOTE(review): releasing mbox->name and the mailbox struct itself (allocated with xbt_strdup /
// xbt_new0 in SIMIX_mbox_create) is not part of this listing — confirm it happens.
56 void SIMIX_mbox_free(void *data)
58 XBT_DEBUG("mbox free %p", data);
59 smx_mailbox_t mbox = (smx_mailbox_t) data;
61 delete mbox->comm_queue;
62 delete mbox->done_comm_queue;
67 smx_mailbox_t SIMIX_mbox_get_by_name(const char *name)
69 return (smx_mailbox_t) xbt_dict_get_or_null(mailboxes, name);
72 smx_synchro_t SIMIX_mbox_get_head(smx_mailbox_t mbox)
74 return mbox->comm_queue->front();
78 * \brief get the receiver (process associated to the mailbox)
79 * \param mbox The rendez-vous point
80 * \return process The receiving process (NULL if not set)
82 smx_process_t SIMIX_mbox_get_receiver(smx_mailbox_t mbox)
84 return mbox->permanent_receiver;
88 * \brief set the receiver of the rendez vous point to allow eager sends
89 * \param mbox The rendez-vous point
90 * \param process The receiving process
92 void SIMIX_mbox_set_receiver(smx_mailbox_t mbox, smx_process_t process)
94 mbox->permanent_receiver=process;
95 if (mbox->done_comm_queue == nullptr)
96 mbox->done_comm_queue = new std::deque<smx_synchro_t>();
100 * \brief Pushes a communication synchro into a rendez-vous point
101 * \param mbox The mailbox
102 * \param comm The communication synchro
104 static inline void SIMIX_mbox_push(smx_mailbox_t mbox, smx_synchro_t comm)
106 mbox->comm_queue->push_back(comm);
107 comm->comm.mbox = mbox;
111 * \brief Removes a communication synchro from a rendez-vous point
112 * \param mbox The rendez-vous point
113 * \param comm The communication synchro
// Clears comm->comm.mbox first, then scans mbox->comm_queue for `comm` and erases it. If the scan
// ends without finding it, the process aborts through xbt_die().
// NOTE(review): the `*it == comm` test and the exit right after erase() are not part of this
// listing. erase() invalidates `it`, so the loop must stop immediately after it — confirm.
115 void SIMIX_mbox_remove(smx_mailbox_t mbox, smx_synchro_t comm)
117 comm->comm.mbox = NULL;
118 for (auto it = mbox->comm_queue->begin(); it != mbox->comm_queue->end(); it++)
120 mbox->comm_queue->erase(it);
123 xbt_die("Cannot remove this comm that is not part of the mailbox");
127 * \brief Checks if there is a communication synchro queued in a deque matching our needs
128 * \param type The type of communication we are looking for (SIMIX_COMM_SEND or SIMIX_COMM_RECEIVE)
129 * \return The communication synchro if found, NULL otherwise
// Other parameters:
//   deque          - the queue to scan, from front to back
//   match_fun      - optional filter of the caller, called as
//                    match_fun(this_user_data, candidate_user_data, candidate)
//   this_user_data - the caller's user data, passed to both filters
//   my_synchro     - synchro describing the caller, passed to the candidate's own comm.match_fun
// A candidate matches when its comm.type equals `type` and both filters (when set) accept it, so
// either side can veto the pairing. On a match the candidate gains one reference, its mailbox is
// saved in comm.mbox_cpy and its comm.mbox is cleared.
// NOTE(review): the removal of the match from `deque` and the return statements are not part of
// this listing; the function name suggests the match is taken out of the queue — confirm.
131 static smx_synchro_t _extract_matching_comm(std::deque<smx_synchro_t> *deque, e_smx_comm_type_t type,
132 int (*match_fun)(void *, void *,smx_synchro_t), void *this_user_data, smx_synchro_t my_synchro)
134 void* other_user_data = NULL;
136 for(auto it = deque->begin(); it != deque->end(); it++){
137 smx_synchro_t synchro = *it;
// User data registered by the candidate: src_data for a send, dst_data for a receive.
138 if (synchro->comm.type == SIMIX_COMM_SEND) {
139 other_user_data = synchro->comm.src_data;
140 } else if (synchro->comm.type == SIMIX_COMM_RECEIVE) {
141 other_user_data = synchro->comm.dst_data;
143 if (synchro->comm.type == type &&
144 (!match_fun || match_fun(this_user_data, other_user_data, synchro)) &&
145 (!synchro->comm.match_fun || synchro->comm.match_fun(other_user_data, this_user_data, my_synchro))) {
146 XBT_DEBUG("Found a matching communication synchro %p", synchro);
148 synchro->comm.refcount++;
150 synchro->comm.mbox_cpy = synchro->comm.mbox;
152 synchro->comm.mbox = NULL;
155 XBT_DEBUG("Sorry, communication synchro %p does not match our needs:"
156 " its type is %d but we are looking for a comm of type %d (or maybe the filtering didn't match)",
157 synchro, (int)synchro->comm.type, (int)type);
159 XBT_DEBUG("No matching communication synchro found");
163 /******************************************************************************/
164 /* Communication synchros */
165 /******************************************************************************/
168 * \brief Creates a new communication synchro
169 * \param type The direction of communication (SIMIX_COMM_SEND or SIMIX_COMM_RECEIVE)
170 * \return The new communication synchro
// The synchro is taken from simix_global->synchro_mallocator and initialized as a
// SIMIX_SYNC_COMMUNICATE synchro in state SIMIX_WAITING, with comm.type = `type`, a refcount of 1,
// and NULL src_data / dst_data / category.
// NOTE(review): the other comm fields (mbox, processes, buffers, surf actions, detached, copied,
// ...) are not set here; presumably the mallocator's reset callback clears them — confirm.
// The final `return synchro;` is not part of this listing.
172 smx_synchro_t SIMIX_comm_new(e_smx_comm_type_t type)
174 smx_synchro_t synchro;
176 /* alloc structures */
177 synchro = (smx_synchro_t) xbt_mallocator_get(simix_global->synchro_mallocator);
179 synchro->type = SIMIX_SYNC_COMMUNICATE;
180 synchro->state = SIMIX_WAITING;
182 /* set communication */
183 synchro->comm.type = type;
184 synchro->comm.refcount = 1;
185 synchro->comm.src_data=NULL;
186 synchro->comm.dst_data=NULL;
188 synchro->category = NULL;
190 XBT_DEBUG("Create communicate synchro %p", synchro);
196 * \brief Release one reference on a communication synchro, destroying it once unreferenced
197 * \param synchro The communication synchro to release
// Aborts (after displaying a backtrace) if the refcount is already <= 0, i.e. the comm was
// released once too often. Nothing more happens while references remain; the early exit after
// the `refcount > 0` test is not part of this listing — confirm.
// On the last reference: frees the name and releases the surf actions. For a detached comm that
// did not end in SIMIX_DONE, it calls clean_fun (when set) on src_buff and clears src_buff.
// It then unlinks the comm from its mailbox if still queued, and hands the synchro back to
// the mallocator.
199 void SIMIX_comm_destroy(smx_synchro_t synchro)
201 XBT_DEBUG("Destroy synchro %p (refcount: %d), state: %d",
202 synchro, synchro->comm.refcount, (int)synchro->state);
204 if (synchro->comm.refcount <= 0) {
205 xbt_backtrace_display_current();
206 xbt_die("The refcount of comm %p is already 0 before decreasing it. "
207 "That's a bug! If you didn't test and/or wait the same communication twice in your code, then the bug is SimGrid's...", synchro);
209 synchro->comm.refcount--;
210 if (synchro->comm.refcount > 0)
212 XBT_DEBUG("Really free communication %p; refcount is now %d", synchro,
213 synchro->comm.refcount);
215 xbt_free(synchro->name);
216 SIMIX_comm_destroy_internal_actions(synchro);
218 if (synchro->comm.detached && synchro->state != SIMIX_DONE) {
219 /* the communication has failed and was detached:
220 * we have to free the buffer */
221 if (synchro->comm.clean_fun) {
222 synchro->comm.clean_fun(synchro->comm.src_buff);
224 synchro->comm.src_buff = NULL;
227 if(synchro->comm.mbox)
228 SIMIX_mbox_remove(synchro->comm.mbox, synchro);
230 xbt_mallocator_release(simix_global->synchro_mallocator, synchro);
233 void SIMIX_comm_destroy_internal_actions(smx_synchro_t synchro)
235 if (synchro->comm.surf_comm){
236 synchro->comm.surf_comm->unref();
237 synchro->comm.surf_comm = NULL;
240 if (synchro->comm.src_timeout){
241 synchro->comm.src_timeout->unref();
242 synchro->comm.src_timeout = NULL;
245 if (synchro->comm.dst_timeout){
246 synchro->comm.dst_timeout->unref();
247 synchro->comm.dst_timeout = NULL;
// Blocking send. Posts the communication with simcall_HANDLER_comm_isend() (no clean_fun), resets
// the simcall's MC value to 0, then blocks the issuer in simcall_HANDLER_comm_wait() with `timeout`.
// NOTE(review): the trailing arguments of the isend call are not part of this listing. isend
// returns NULL for a detached send, so this path must pass detached = 0 for the wait to receive
// a real synchro — confirm.
251 void simcall_HANDLER_comm_send(smx_simcall_t simcall, smx_process_t src, smx_mailbox_t mbox,
252 double task_size, double rate,
253 void *src_buff, size_t src_buff_size,
254 int (*match_fun)(void *, void *,smx_synchro_t),
255 void (*copy_data_fun)(smx_synchro_t, void*, size_t),
256 void *data, double timeout){
257 smx_synchro_t comm = simcall_HANDLER_comm_isend(simcall, src, mbox, task_size, rate,
258 src_buff, src_buff_size, match_fun, NULL, copy_data_fun,
260 SIMCALL_SET_MC_VALUE(simcall, 0);
261 simcall_HANDLER_comm_wait(simcall, comm, timeout);
// Non-blocking send. Creates a SEND synchro describing the sender, then looks for a matching
// RECEIVE already queued in mbox->comm_queue:
//  - no match: our own synchro is used. If the mailbox has a permanent receiver, the synchro is
//    marked READY for that receiver, gets an extra reference and is appended to done_comm_queue
//    (eager send). Otherwise (presumably — the `else` is not part of this listing) it is queued
//    in the mailbox until a receiver shows up;
//  - match: our temporary synchro is destroyed and the receiver's one becomes READY.
// The resulting synchro is recorded in src_proc->comms and receives the sender-side fields.
// For a detached send, its refcount is decremented and clean_fun is kept, so that
// SIMIX_comm_destroy() can release the buffer if the comm fails; otherwise clean_fun is cleared.
// Under MC / record-replay the comm is only flagged RUNNING (no surf action); otherwise
// SIMIX_comm_start() is attempted. Returns NULL for a detached send, the comm synchro otherwise.
263 smx_synchro_t simcall_HANDLER_comm_isend(smx_simcall_t simcall, smx_process_t src_proc, smx_mailbox_t mbox,
264 double task_size, double rate,
265 void *src_buff, size_t src_buff_size,
266 int (*match_fun)(void *, void *,smx_synchro_t),
267 void (*clean_fun)(void *), // used to free the source buffer in case of problem after a detached send
268 void (*copy_data_fun)(smx_synchro_t, void*, size_t),// used to copy data if not default one
269 void *data, int detached)
271 XBT_DEBUG("send from %p", mbox);
273 /* Prepare a synchro describing us, so that it gets passed to the user-provided filter of other side */
274 smx_synchro_t this_synchro = SIMIX_comm_new(SIMIX_COMM_SEND);
276 /* Look for communication synchro matching our needs. We also provide a description of
277 * ourselves so that the other side also gets a chance of choosing if it wants to match with us.
279 * If it is not found then push our communication into the rendez-vous point */
280 smx_synchro_t other_synchro = _extract_matching_comm(mbox->comm_queue, SIMIX_COMM_RECEIVE, match_fun, data, this_synchro);
282 if (!other_synchro) {
283 other_synchro = this_synchro;
285 if (mbox->permanent_receiver!=NULL){
286 //this mailbox is for small messages, which have to be sent right now
287 other_synchro->state = SIMIX_READY;
288 other_synchro->comm.dst_proc=mbox->permanent_receiver;
// Extra reference for the done_comm_queue entry, presumably dropped once a receiver collects
// the comm (see SIMIX_comm_irecv) — confirm.
289 other_synchro->comm.refcount++;
290 mbox->done_comm_queue->push_back(other_synchro);
291 other_synchro->comm.mbox=mbox;
292 XBT_DEBUG("pushing a message into the permanent receive fifo %p, comm %p", mbox, &(other_synchro->comm));
295 SIMIX_mbox_push(mbox, this_synchro);
298 XBT_DEBUG("Receive already pushed");
300 SIMIX_comm_destroy(this_synchro);
302 other_synchro->state = SIMIX_READY;
303 other_synchro->comm.type = SIMIX_COMM_READY;
306 xbt_fifo_push(src_proc->comms, other_synchro);
308 /* if the communication synchro is detached then decrease the refcount
309 * by one, so it will be eliminated by the receiver's destroy call */
311 other_synchro->comm.detached = 1;
312 other_synchro->comm.refcount--;
313 other_synchro->comm.clean_fun = clean_fun;
315 other_synchro->comm.clean_fun = NULL;
318 /* Setup the communication synchro */
319 other_synchro->comm.src_proc = src_proc;
320 other_synchro->comm.task_size = task_size;
321 other_synchro->comm.rate = rate;
322 other_synchro->comm.src_buff = src_buff;
323 other_synchro->comm.src_buff_size = src_buff_size;
324 other_synchro->comm.src_data = data;
326 other_synchro->comm.match_fun = match_fun;
327 other_synchro->comm.copy_data_fun = copy_data_fun;
330 if (MC_is_active() || MC_record_replay_is_active()) {
331 other_synchro->state = SIMIX_RUNNING;
332 return (detached ? NULL : other_synchro);
335 SIMIX_comm_start(other_synchro);
336 return (detached ? NULL : other_synchro);
339 void simcall_HANDLER_comm_recv(smx_simcall_t simcall, smx_process_t receiver, smx_mailbox_t mbox,
340 void *dst_buff, size_t *dst_buff_size,
341 int (*match_fun)(void *, void *, smx_synchro_t),
342 void (*copy_data_fun)(smx_synchro_t, void*, size_t),
343 void *data, double timeout, double rate)
345 smx_synchro_t comm = SIMIX_comm_irecv(receiver, mbox, dst_buff,
346 dst_buff_size, match_fun, copy_data_fun, data, rate);
347 SIMCALL_SET_MC_VALUE(simcall, 0);
348 simcall_HANDLER_comm_wait(simcall, comm, timeout);
351 smx_synchro_t simcall_HANDLER_comm_irecv(smx_simcall_t simcall, smx_process_t receiver, smx_mailbox_t mbox,
352 void *dst_buff, size_t *dst_buff_size,
353 int (*match_fun)(void *, void *, smx_synchro_t),
354 void (*copy_data_fun)(smx_synchro_t, void*, size_t),
355 void *data, double rate)
357 return SIMIX_comm_irecv(receiver, mbox, dst_buff, dst_buff_size, match_fun, copy_data_fun, data, rate);
// Non-blocking receive. Creates a RECEIVE synchro describing the receiver, then:
//  - when the mailbox has a permanent receiver and done_comm_queue is not empty, looks first for a
//    matching SEND among the eagerly sent comms. If none matches, our synchro is queued in the
//    mailbox. If the match has a surf action with nothing left to transfer, it is marked DONE,
//    detached from its mailbox and loses one reference, and our temporary synchro is destroyed;
//  - otherwise looks for a matching SEND in mbox->comm_queue. If none is found our synchro is
//    queued; else our temporary synchro is destroyed and the sender's one becomes READY.
// The resulting synchro is recorded in dst_proc->comms and receives the receiver-side fields.
// The rate becomes the smaller of both sides' rates, -1.0 standing for "unset".
// Under MC / record-replay the comm is only flagged RUNNING; otherwise SIMIX_comm_start() is
// attempted. Returns the comm synchro.
// NOTE(review): the else branches and closing braces are not part of this listing, so the exact
// nesting described above is inferred — confirm against the full file.
360 smx_synchro_t SIMIX_comm_irecv(smx_process_t dst_proc, smx_mailbox_t mbox, void *dst_buff, size_t *dst_buff_size,
361 int (*match_fun)(void *, void *, smx_synchro_t),
362 void (*copy_data_fun)(smx_synchro_t, void*, size_t), // used to copy data if not default one
363 void *data, double rate)
365 XBT_DEBUG("recv from %p %p", mbox, mbox->comm_queue);
366 smx_synchro_t this_synchro = SIMIX_comm_new(SIMIX_COMM_RECEIVE);
368 smx_synchro_t other_synchro;
369 //communication already done, get it inside the fifo of completed comms
370 if (mbox->permanent_receiver && ! mbox->done_comm_queue->empty()) {
372 XBT_DEBUG("We have a comm that has probably already been received, trying to match it, to skip the communication");
373 //find a match in the already received fifo
374 other_synchro = _extract_matching_comm(mbox->done_comm_queue, SIMIX_COMM_SEND, match_fun, data, this_synchro);
375 //if not found, assume the receiver came first, register it to the mailbox in the classical way
376 if (!other_synchro) {
377 XBT_DEBUG("We have messages in the permanent receive list, but not the one we are looking for, pushing request into fifo");
378 other_synchro = this_synchro;
379 SIMIX_mbox_push(mbox, this_synchro);
381 if(other_synchro->comm.surf_comm && SIMIX_comm_get_remains(other_synchro)==0.0) {
382 XBT_DEBUG("comm %p has been already sent, and is finished, destroy it",&(other_synchro->comm));
383 other_synchro->state = SIMIX_DONE;
384 other_synchro->comm.type = SIMIX_COMM_DONE;
385 other_synchro->comm.mbox = NULL;
387 other_synchro->comm.refcount--;
388 SIMIX_comm_destroy(this_synchro);
391 /* Prepare a synchro describing us, so that it gets passed to the user-provided filter of other side */
393 /* Look for communication synchro matching our needs. We also provide a description of
394 * ourselves so that the other side also gets a chance of choosing if it wants to match with us.
396 * If it is not found then push our communication into the rendez-vous point */
397 other_synchro = _extract_matching_comm(mbox->comm_queue, SIMIX_COMM_SEND, match_fun, data, this_synchro);
399 if (!other_synchro) {
400 XBT_DEBUG("Receive pushed first %zu", mbox->comm_queue->size());
401 other_synchro = this_synchro;
402 SIMIX_mbox_push(mbox, this_synchro);
404 SIMIX_comm_destroy(this_synchro);
405 other_synchro->state = SIMIX_READY;
406 other_synchro->comm.type = SIMIX_COMM_READY;
407 //other_synchro->comm.refcount--;
409 xbt_fifo_push(dst_proc->comms, other_synchro);
412 /* Setup communication synchro */
413 other_synchro->comm.dst_proc = dst_proc;
414 other_synchro->comm.dst_buff = dst_buff;
415 other_synchro->comm.dst_buff_size = dst_buff_size;
416 other_synchro->comm.dst_data = data;
418 if (rate != -1.0 && (other_synchro->comm.rate == -1.0 || rate < other_synchro->comm.rate))
419 other_synchro->comm.rate = rate;
// NOTE(review): simcall_HANDLER_comm_isend assigns these same two fields, so whichever side
// arrives last wins (e.g. a receiver's NULL copy_data_fun replaces the sender's) — confirm this
// is intended.
421 other_synchro->comm.match_fun = match_fun;
422 other_synchro->comm.copy_data_fun = copy_data_fun;
424 if (MC_is_active() || MC_record_replay_is_active()) {
425 other_synchro->state = SIMIX_RUNNING;
426 return other_synchro;
429 SIMIX_comm_start(other_synchro);
430 return other_synchro;
// Simcall handler for a non-blocking probe: forwards to SIMIX_comm_iprobe() on behalf of the
// issuing process. NOTE(review): the last parameter line (which declares `data`) is not part of
// this listing.
433 smx_synchro_t simcall_HANDLER_comm_iprobe(smx_simcall_t simcall, smx_mailbox_t mbox,
434 int type, int src, int tag,
435 int (*match_fun)(void *, void *, smx_synchro_t),
437 return SIMIX_comm_iprobe(simcall->issuer, mbox, type, src, tag, match_fun, data);
// Checks whether a comm matching match_fun/data is available in `mbox`. A temporary synchro of the
// opposite direction describes the caller; `type` selects which one (the test itself is not part
// of this listing). When the mailbox has a permanent receiver, done_comm_queue is searched first,
// then (presumably only when nothing was found) comm_queue. The reference taken by
// _extract_matching_comm is dropped again (presumably only on a match), the temporary synchro is
// destroyed, and the match (or NULL) is returned.
// `dst_proc`, `src` and `tag` are not used by the lines visible here.
// NOTE(review): _extract_matching_comm clears the match's comm.mbox, and its name suggests it also
// removes the match from the queue. If so, probing consumes the message and a later receive can
// no longer find it; a probe should peek without extracting — confirm.
440 smx_synchro_t SIMIX_comm_iprobe(smx_process_t dst_proc, smx_mailbox_t mbox, int type, int src,
441 int tag, int (*match_fun)(void *, void *, smx_synchro_t), void *data)
443 XBT_DEBUG("iprobe from %p %p", mbox, mbox->comm_queue);
444 smx_synchro_t this_synchro;
447 this_synchro=SIMIX_comm_new(SIMIX_COMM_SEND);
448 smx_type = SIMIX_COMM_RECEIVE;
450 this_synchro=SIMIX_comm_new(SIMIX_COMM_RECEIVE);
451 smx_type = SIMIX_COMM_SEND;
453 smx_synchro_t other_synchro=NULL;
454 if(mbox->permanent_receiver && ! mbox->done_comm_queue->empty()){
455 XBT_DEBUG("first check in the permanent recv mailbox, to see if we already got something");
456 other_synchro = _extract_matching_comm(mbox->done_comm_queue, (e_smx_comm_type_t) smx_type, match_fun, data, this_synchro);
459 XBT_DEBUG("check if we have more luck in the normal mailbox");
460 other_synchro = _extract_matching_comm(mbox->comm_queue, (e_smx_comm_type_t) smx_type, match_fun, data, this_synchro);
463 other_synchro->comm.refcount--;
465 SIMIX_comm_destroy(this_synchro);
466 return other_synchro;
// Blocks simcall->issuer on a comm synchro. The simcall is attached to synchro->simcalls and the
// synchro becomes the issuer's waiting_synchro.
// Under MC / record-replay the outcome depends on the simcall's MC value. One branch (presumably
// idx == 0) marks the comm DONE. The other turns it into a timeout on the issuer's side:
// SRC_TIMEOUT if the issuer is the sender, DST_TIMEOUT otherwise. SIMIX_comm_finish() then
// answers the simcall.
// Outside MC: a comm that already left the WAITING/RUNNING states is finished right away.
// Otherwise a surf sleep of `timeout` on the issuer's host is stored as src_timeout or
// dst_timeout, depending on which side the issuer is; as the inline comment explains, this
// sleep is created even without a timeout so that host failures get reported.
469 void simcall_HANDLER_comm_wait(smx_simcall_t simcall, smx_synchro_t synchro, double timeout)
471 /* the simcall may be a wait, a send or a recv */
474 /* Associate this simcall to the wait synchro */
475 XBT_DEBUG("simcall_HANDLER_comm_wait, %p", synchro);
477 xbt_fifo_push(synchro->simcalls, simcall);
478 simcall->issuer->waiting_synchro = synchro;
480 if (MC_is_active() || MC_record_replay_is_active()) {
481 int idx = SIMCALL_GET_MC_VALUE(simcall);
483 synchro->state = SIMIX_DONE;
485 /* If we reached this point, the wait simcall must have a timeout */
486 /* Otherwise it shouldn't be enabled and executed by the MC */
490 if (synchro->comm.src_proc == simcall->issuer)
491 synchro->state = SIMIX_SRC_TIMEOUT;
493 synchro->state = SIMIX_DST_TIMEOUT;
496 SIMIX_comm_finish(synchro);
500 /* If the synchro has already finished, perform the error handling, */
501 /* otherwise set up a waiting timeout on the right side */
502 if (synchro->state != SIMIX_WAITING && synchro->state != SIMIX_RUNNING) {
503 SIMIX_comm_finish(synchro);
504 } else { /* if (timeout >= 0) { we need a surf sleep action even when there is no timeout, otherwise surf won't tell us when the host fails */
505 sleep = surf_host_sleep(simcall->issuer->host, timeout);
506 sleep->setData(synchro);
508 if (simcall->issuer == synchro->comm.src_proc)
509 synchro->comm.src_timeout = sleep;
511 synchro->comm.dst_timeout = sleep;
// Non-blocking completion test on a comm synchro; the boolean result is stored in the simcall.
// Under MC / record-replay the comm counts as complete as soon as both src_proc and dst_proc are
// known. It is then forced to DONE and finished, and SIMIX_comm_finish() answers the simcall.
// Otherwise the comm is complete once it has left the WAITING/RUNNING states. A complete comm is
// finished; for an incomplete one the simcall is answered directly.
// NOTE(review): the else branches and the early return separating these paths are not part of
// this listing.
515 void simcall_HANDLER_comm_test(smx_simcall_t simcall, smx_synchro_t synchro)
517 if(MC_is_active() || MC_record_replay_is_active()){
518 simcall_comm_test__set__result(simcall, synchro->comm.src_proc && synchro->comm.dst_proc);
519 if(simcall_comm_test__get__result(simcall)){
520 synchro->state = SIMIX_DONE;
521 xbt_fifo_push(synchro->simcalls, simcall);
522 SIMIX_comm_finish(synchro);
524 SIMIX_simcall_answer(simcall);
529 simcall_comm_test__set__result(simcall, (synchro->state != SIMIX_WAITING && synchro->state != SIMIX_RUNNING));
530 if (simcall_comm_test__get__result(simcall)) {
531 xbt_fifo_push(synchro->simcalls, simcall);
532 SIMIX_comm_finish(synchro);
534 SIMIX_simcall_answer(simcall);
// Non-blocking test over a set of comms. The result is the index of a completed comm, or -1.
// Under MC / record-replay the index comes from the simcall's MC value. Presumably -1 means
// "none" and the simcall is answered directly; otherwise that comm is forced to DONE and finished.
// Otherwise the comms of simcall_comm_testany__get__comms(simcall) (presumably the same dynar as
// `synchros`) are scanned in order. The first one that has left the WAITING/RUNNING states is
// reported and finished; the loop exit after that is not part of this listing. When none is
// complete, the simcall is answered with -1.
538 void simcall_HANDLER_comm_testany(smx_simcall_t simcall, xbt_dynar_t synchros)
541 smx_synchro_t synchro;
542 simcall_comm_testany__set__result(simcall, -1);
544 if (MC_is_active() || MC_record_replay_is_active()){
545 int idx = SIMCALL_GET_MC_VALUE(simcall);
547 SIMIX_simcall_answer(simcall);
549 synchro = xbt_dynar_get_as(synchros, idx, smx_synchro_t);
550 simcall_comm_testany__set__result(simcall, idx);
551 xbt_fifo_push(synchro->simcalls, simcall);
552 synchro->state = SIMIX_DONE;
553 SIMIX_comm_finish(synchro);
558 xbt_dynar_foreach(simcall_comm_testany__get__comms(simcall), cursor,synchro) {
559 if (synchro->state != SIMIX_WAITING && synchro->state != SIMIX_RUNNING) {
560 simcall_comm_testany__set__result(simcall, cursor);
561 xbt_fifo_push(synchro->simcalls, simcall);
562 SIMIX_comm_finish(synchro);
566 SIMIX_simcall_answer(simcall);
// Blocks the issuer until one comm of `synchros` completes.
// Under MC / record-replay the chosen index comes from the simcall's MC value. That comm is
// forced to DONE, recorded as the result and finished.
// Otherwise the simcall is attached to every comm of the set in order. A comm that has already
// left the WAITING/RUNNING states is finished immediately; SIMIX_comm_finish() then detaches the
// simcall from the other comms through SIMIX_waitany_remove_simcall_from_actions().
569 void simcall_HANDLER_comm_waitany(smx_simcall_t simcall, xbt_dynar_t synchros)
571 smx_synchro_t synchro;
572 unsigned int cursor = 0;
574 if (MC_is_active() || MC_record_replay_is_active()){
575 int idx = SIMCALL_GET_MC_VALUE(simcall);
576 synchro = xbt_dynar_get_as(synchros, idx, smx_synchro_t);
577 xbt_fifo_push(synchro->simcalls, simcall);
578 simcall_comm_waitany__set__result(simcall, idx);
579 synchro->state = SIMIX_DONE;
580 SIMIX_comm_finish(synchro);
584 xbt_dynar_foreach(synchros, cursor, synchro){
585 /* associate this simcall to the synchro */
586 xbt_fifo_push(synchro->simcalls, simcall);
588 /* see if the synchro is already finished */
589 if (synchro->state != SIMIX_WAITING && synchro->state != SIMIX_RUNNING){
590 SIMIX_comm_finish(synchro);
596 void SIMIX_waitany_remove_simcall_from_actions(smx_simcall_t simcall)
598 smx_synchro_t synchro;
599 unsigned int cursor = 0;
600 xbt_dynar_t synchros = simcall_comm_waitany__get__comms(simcall);
602 xbt_dynar_foreach(synchros, cursor, synchro) {
603 xbt_fifo_remove(synchro->simcalls, simcall);
608 * \brief Starts the simulation of a communication synchro.
609 * \param synchro the communication synchro
// Only comms in state SIMIX_READY (both sides known) are started; other states are left untouched.
// A surf network action is created from the sender's host to the receiver's host with the comm's
// task_size and rate (the host arguments sit on a line that is not part of this listing), and the
// comm becomes RUNNING. If surf reports the action as already failed, the comm is flagged
// LINK_FAILURE and its surf actions are released. If either endpoint process is suspended, the
// new action is suspended too.
611 static inline void SIMIX_comm_start(smx_synchro_t synchro)
613 /* If both the sender and the receiver are already there, start the communication */
614 if (synchro->state == SIMIX_READY) {
616 sg_host_t sender = synchro->comm.src_proc->host;
617 sg_host_t receiver = synchro->comm.dst_proc->host;
619 XBT_DEBUG("Starting communication %p from '%s' to '%s'", synchro,
620 sg_host_get_name(sender), sg_host_get_name(receiver));
622 synchro->comm.surf_comm = surf_network_model_communicate(surf_network_model,
624 synchro->comm.task_size, synchro->comm.rate);
626 synchro->comm.surf_comm->setData(synchro);
628 synchro->state = SIMIX_RUNNING;
630 /* If a link is failed, detect it immediately */
631 if (synchro->comm.surf_comm->getState() == simgrid::surf::Action::State::failed) {
632 XBT_DEBUG("Communication from '%s' to '%s' failed to start because of a link failure",
633 sg_host_get_name(sender), sg_host_get_name(receiver));
634 synchro->state = SIMIX_LINK_FAILURE;
635 SIMIX_comm_destroy_internal_actions(synchro);
// NOTE(review): after a link failure, SIMIX_comm_destroy_internal_actions() has reset
// comm.surf_comm to NULL. If one endpoint is also suspended, the suspend() call below would then
// dereference NULL; that block likely needs to be skipped when the start failed — confirm.
638 /* If any of the processes is suspended, create the synchro but stop its execution,
639 it will be restarted when the sender process resumes */
640 if (SIMIX_process_is_suspended(synchro->comm.src_proc) ||
641 SIMIX_process_is_suspended(synchro->comm.dst_proc)) {
642 /* FIXME: check what should happen with the synchro state */
644 if (SIMIX_process_is_suspended(synchro->comm.src_proc))
645 XBT_DEBUG("The communication is suspended on startup because src (%s:%s) were suspended since it initiated the communication",
646 sg_host_get_name(synchro->comm.src_proc->host), synchro->comm.src_proc->name);
648 XBT_DEBUG("The communication is suspended on startup because dst (%s:%s) were suspended since it initiated the communication",
649 sg_host_get_name(synchro->comm.dst_proc->host), synchro->comm.dst_proc->name);
651 synchro->comm.surf_comm->suspend();
658 * \brief Answers the SIMIX simcalls associated to a communication synchro.
659 * \param synchro a finished communication synchro
// Pops the simcalls attached to the comm one by one. Entries whose call is SIMCALL_NONE are
// skipped (see the FIXME below). For each remaining simcall:
//  - a waitany simcall is detached from the other comms of its set and, outside MC, gets this
//    comm's position as its result;
//  - the comm is unlinked from its mailbox if it is still queued;
//  - if the issuer's host is off, the issuer is flagged to die and gets a host_error;
//  - otherwise (presumably — the `else` is not part of this listing) the comm state is turned into
//    the outcome: the data is copied on completion, or a timeout_error / network_error /
//    cancel_error is raised in the issuer. For a host failure, an issuer on the failed side is
//    also flagged to die. An unexpected state aborts;
//  - for a waitany/testany that raised, the exception value is set to this comm's index;
//  - the issuer stops waiting, and the comm is dropped from its comms list. For a detached comm
//    it is also dropped from the peer's list. The simcall is then answered.
// Finally the comm is destroyed destroy_count times, presumably once per answered simcall (the
// increment is not part of this listing) — confirm.
661 void SIMIX_comm_finish(smx_synchro_t synchro)
663 unsigned int destroy_count = 0;
664 smx_simcall_t simcall;
666 while ((simcall = (smx_simcall_t) xbt_fifo_shift(synchro->simcalls))) {
668 /* If a waitany simcall is waiting for this synchro to finish, then remove
669 it from the other synchros in the waitany list. Afterwards, get the
670 position of the actual synchro in the waitany dynar and
671 return it as the result of the simcall */
673 if (simcall->call == SIMCALL_NONE) //FIXME: maybe a better way to handle this case
674 continue; // if process handling comm is killed
675 if (simcall->call == SIMCALL_COMM_WAITANY) {
676 SIMIX_waitany_remove_simcall_from_actions(simcall);
677 if (!MC_is_active() && !MC_record_replay_is_active())
678 simcall_comm_waitany__set__result(simcall, xbt_dynar_search(simcall_comm_waitany__get__comms(simcall), &synchro));
681 /* If the synchro is still in a rendez-vous point then remove from it */
682 if (synchro->comm.mbox)
683 SIMIX_mbox_remove(synchro->comm.mbox, synchro);
685 XBT_DEBUG("SIMIX_comm_finish: synchro state = %d", (int)synchro->state);
687 /* Check out for errors */
689 if (simcall->issuer->host->isOff()) {
690 simcall->issuer->context->iwannadie = 1;
691 SMX_EXCEPTION(simcall->issuer, host_error, 0, "Host failed");
694 switch (synchro->state) {
697 XBT_DEBUG("Communication %p complete!", synchro);
698 SIMIX_comm_copy_data(synchro);
701 case SIMIX_SRC_TIMEOUT:
702 SMX_EXCEPTION(simcall->issuer, timeout_error, 0,
703 "Communication timeouted because of sender");
706 case SIMIX_DST_TIMEOUT:
707 SMX_EXCEPTION(simcall->issuer, timeout_error, 0,
708 "Communication timeouted because of receiver");
711 case SIMIX_SRC_HOST_FAILURE:
712 if (simcall->issuer == synchro->comm.src_proc)
713 simcall->issuer->context->iwannadie = 1;
714 // SMX_EXCEPTION(simcall->issuer, host_error, 0, "Host failed");
716 SMX_EXCEPTION(simcall->issuer, network_error, 0, "Remote peer failed");
719 case SIMIX_DST_HOST_FAILURE:
720 if (simcall->issuer == synchro->comm.dst_proc)
721 simcall->issuer->context->iwannadie = 1;
722 // SMX_EXCEPTION(simcall->issuer, host_error, 0, "Host failed");
724 SMX_EXCEPTION(simcall->issuer, network_error, 0, "Remote peer failed");
727 case SIMIX_LINK_FAILURE:
729 XBT_DEBUG("Link failure in synchro %p between '%s' and '%s': posting an exception to the issuer: %s (%p) detached:%d",
731 synchro->comm.src_proc ? sg_host_get_name(synchro->comm.src_proc->host) : NULL,
732 synchro->comm.dst_proc ? sg_host_get_name(synchro->comm.dst_proc->host) : NULL,
733 simcall->issuer->name, simcall->issuer, synchro->comm.detached);
734 if (synchro->comm.src_proc == simcall->issuer) {
735 XBT_DEBUG("I'm source");
736 } else if (synchro->comm.dst_proc == simcall->issuer) {
737 XBT_DEBUG("I'm dest");
739 XBT_DEBUG("I'm neither source nor dest");
741 SMX_EXCEPTION(simcall->issuer, network_error, 0, "Link failure");
745 if (simcall->issuer == synchro->comm.dst_proc)
746 SMX_EXCEPTION(simcall->issuer, cancel_error, 0,
747 "Communication canceled by the sender");
749 SMX_EXCEPTION(simcall->issuer, cancel_error, 0,
750 "Communication canceled by the receiver");
754 xbt_die("Unexpected synchro state in SIMIX_comm_finish: %d", (int)synchro->state);
757 /* if there is an exception during a waitany or a testany, indicate the position of the failed communication */
758 if (simcall->issuer->doexception) {
759 if (simcall->call == SIMCALL_COMM_WAITANY) {
760 simcall->issuer->running_ctx->exception.value = xbt_dynar_search(simcall_comm_waitany__get__comms(simcall), &synchro);
762 else if (simcall->call == SIMCALL_COMM_TESTANY) {
763 simcall->issuer->running_ctx->exception.value = xbt_dynar_search(simcall_comm_testany__get__comms(simcall), &synchro);
767 if (simcall->issuer->host->isOff()) {
768 simcall->issuer->context->iwannadie = 1;
771 simcall->issuer->waiting_synchro = NULL;
772 xbt_fifo_remove(simcall->issuer->comms, synchro);
773 if(synchro->comm.detached){
774 if(simcall->issuer == synchro->comm.src_proc){
775 if(synchro->comm.dst_proc)
776 xbt_fifo_remove(synchro->comm.dst_proc->comms, synchro);
778 if(simcall->issuer == synchro->comm.dst_proc){
779 if(synchro->comm.src_proc)
780 xbt_fifo_remove(synchro->comm.src_proc->comms, synchro);
783 SIMIX_simcall_answer(simcall);
787 while (destroy_count-- > 0)
788 SIMIX_comm_destroy(synchro);
792 * \brief This function is called when a Surf communication synchro is finished.
793 * \param synchro the corresponding Simix communication
// Derives the comm's outcome from its surf actions, checked in this priority order:
//   sender timeout done     -> SIMIX_SRC_TIMEOUT
//   receiver timeout done   -> SIMIX_DST_TIMEOUT
//   sender timeout failed   -> SIMIX_SRC_HOST_FAILURE
//   receiver timeout failed -> SIMIX_DST_HOST_FAILURE
//   network action failed   -> SIMIX_LINK_FAILURE
//   anything else           -> SIMIX_DONE (the `else` is not part of this listing)
// The surf actions are then released. If simcalls are waiting on the comm, SIMIX_comm_finish()
// answers them.
// NOTE(review): the LINK_FAILURE debug message uses profanity; consider rewording it.
795 void SIMIX_post_comm(smx_synchro_t synchro)
797 /* Update synchro state */
798 if (synchro->comm.src_timeout &&
799 synchro->comm.src_timeout->getState() == simgrid::surf::Action::State::done)
800 synchro->state = SIMIX_SRC_TIMEOUT;
801 else if (synchro->comm.dst_timeout &&
802 synchro->comm.dst_timeout->getState() == simgrid::surf::Action::State::done)
803 synchro->state = SIMIX_DST_TIMEOUT;
804 else if (synchro->comm.src_timeout &&
805 synchro->comm.src_timeout->getState() == simgrid::surf::Action::State::failed)
806 synchro->state = SIMIX_SRC_HOST_FAILURE;
807 else if (synchro->comm.dst_timeout &&
808 synchro->comm.dst_timeout->getState() == simgrid::surf::Action::State::failed)
809 synchro->state = SIMIX_DST_HOST_FAILURE;
810 else if (synchro->comm.surf_comm &&
811 synchro->comm.surf_comm->getState() == simgrid::surf::Action::State::failed) {
812 XBT_DEBUG("Puta madre. Surf says that the link broke");
813 synchro->state = SIMIX_LINK_FAILURE;
815 synchro->state = SIMIX_DONE;
817 XBT_DEBUG("SIMIX_post_comm: comm %p, state %d, src_proc %p, dst_proc %p, detached: %d",
818 synchro, (int)synchro->state, synchro->comm.src_proc, synchro->comm.dst_proc, synchro->comm.detached);
820 /* destroy the surf actions associated with the Simix communication */
821 SIMIX_comm_destroy_internal_actions(synchro);
823 /* if there are simcalls associated with the synchro, then answer them */
824 if (xbt_fifo_size(synchro->simcalls)) {
825 SIMIX_comm_finish(synchro);
829 void SIMIX_comm_cancel(smx_synchro_t synchro)
831 /* if the synchro is a waiting state means that it is still in a mbox */
832 /* so remove from it and delete it */
833 if (synchro->state == SIMIX_WAITING) {
834 SIMIX_mbox_remove(synchro->comm.mbox, synchro);
835 synchro->state = SIMIX_CANCELED;
837 else if (!MC_is_active() /* when running the MC there are no surf actions */
838 && !MC_record_replay_is_active()
839 && (synchro->state == SIMIX_READY || synchro->state == SIMIX_RUNNING)) {
841 synchro->comm.surf_comm->cancel();
845 void SIMIX_comm_suspend(smx_synchro_t synchro)
847 /*FIXME: shall we suspend also the timeout synchro? */
848 if (synchro->comm.surf_comm)
849 synchro->comm.surf_comm->suspend();
850 /* in the other case, the action will be suspended on creation, in SIMIX_comm_start() */
853 void SIMIX_comm_resume(smx_synchro_t synchro)
855 /*FIXME: check what happen with the timeouts */
856 if (synchro->comm.surf_comm)
857 synchro->comm.surf_comm->resume();
858 /* in the other case, the synchro were not really suspended yet, see SIMIX_comm_suspend() and SIMIX_comm_start() */
862 /************* synchro Getters **************/
865 * \brief get the amount remaining from the communication
866 * \param synchro The communication
// Reads the remaining amount from the surf network action in one case (presumably
// SIMIX_RUNNING) and reports 0 in the other cases (see the FIXMEs).
// NOTE(review): the case labels, any NULL-synchro check and the return statement are not part of
// this listing.
868 double SIMIX_comm_get_remains(smx_synchro_t synchro)
874 switch (synchro->state) {
877 remains = synchro->comm.surf_comm->getRemains();
882 remains = 0; /*FIXME: check what should be returned */
886 remains = 0; /*FIXME: is this correct? */
892 e_smx_state_t SIMIX_comm_get_state(smx_synchro_t synchro)
894 return synchro->state;
898 * \brief Return the user data associated to the sender of the communication
899 * \param synchro The communication
900 * \return the user data
902 void* SIMIX_comm_get_src_data(smx_synchro_t synchro)
904 return synchro->comm.src_data;
908 * \brief Return the user data associated to the receiver of the communication
909 * \param synchro The communication
910 * \return the user data
912 void* SIMIX_comm_get_dst_data(smx_synchro_t synchro)
914 return synchro->comm.dst_data;
917 smx_process_t SIMIX_comm_get_src_proc(smx_synchro_t synchro)
919 return synchro->comm.src_proc;
922 smx_process_t SIMIX_comm_get_dst_proc(smx_synchro_t synchro)
924 return synchro->comm.dst_proc;
927 /******************************************************************************/
928 /* SIMIX_comm_copy_data callbacks */
929 /******************************************************************************/
930 static void (*SIMIX_comm_copy_data_callback) (smx_synchro_t, void*, size_t) =
931 &SIMIX_comm_copy_pointer_callback;
// Replaces the file-wide routine that SIMIX_comm_copy_data() uses for comms without their own
// copy_data_fun. NOTE(review): the return type of this definition is not part of this listing.
934 SIMIX_comm_set_copy_data_callback(void (*callback) (smx_synchro_t, void*, size_t))
936 SIMIX_comm_copy_data_callback = callback;
939 void SIMIX_comm_copy_pointer_callback(smx_synchro_t comm, void* buff, size_t buff_size)
941 xbt_assert((buff_size == sizeof(void *)), "Cannot copy %zu bytes: must be sizeof(void*)", buff_size);
942 *(void **) (comm->comm.dst_buff) = buff;
// Copy callback for raw buffers: copies buff_size bytes from `buff` into the receiver's dst_buff.
// For a detached comm, the source buffer is a private duplicate (see the inline comment), and
// src_buff is reset to NULL afterwards.
// NOTE(review): the statement that presumably releases that duplicate is not part of this
// listing — confirm.
945 void SIMIX_comm_copy_buffer_callback(smx_synchro_t comm, void* buff, size_t buff_size)
947 XBT_DEBUG("Copy the data over");
948 memcpy(comm->comm.dst_buff, buff, buff_size);
949 if (comm->comm.detached) { // if this is a detached send, the source buffer was duplicated by SMPI sender to make the original buffer available to the application ASAP
951 comm->comm.src_buff = NULL;
957 * \brief Copy the communication data from the sender's buffer to the receiver's one
958 * \param comm The communication
// Nothing happens when either buffer is missing or the data was already copied. At most
// *dst_buff_size bytes are copied (when a size pointer was given), and that size is updated to
// the amount actually copied. The comm's own copy_data_fun is used when set, otherwise
// (presumably — the `else` is not part of this listing) the file-wide
// SIMIX_comm_copy_data_callback. The `copied` flag is then set so the copy happens only once.
960 void SIMIX_comm_copy_data(smx_synchro_t comm)
962 size_t buff_size = comm->comm.src_buff_size;
963 /* If there is no data to be copied, then return */
964 if (!comm->comm.src_buff || !comm->comm.dst_buff || comm->comm.copied)
967 XBT_DEBUG("Copying comm %p data from %s (%p) -> %s (%p) (%zu bytes)",
969 comm->comm.src_proc ? sg_host_get_name(comm->comm.src_proc->host) : "a finished process",
971 comm->comm.dst_proc ? sg_host_get_name(comm->comm.dst_proc->host) : "a finished process",
972 comm->comm.dst_buff, buff_size);
974 /* Copy at most dst_buff_size bytes of the message to receiver's buffer */
975 if (comm->comm.dst_buff_size)
976 buff_size = MIN(buff_size, *(comm->comm.dst_buff_size));
978 /* Update the receiver's buffer size to the copied amount */
979 if (comm->comm.dst_buff_size)
980 *comm->comm.dst_buff_size = buff_size;
983 if(comm->comm.copy_data_fun)
984 comm->comm.copy_data_fun (comm, comm->comm.src_buff, buff_size);
986 SIMIX_comm_copy_data_callback (comm, comm->comm.src_buff, buff_size);
990 /* Set the copied flag so we copy data only once */
991 /* (this function might be called from both communication ends) */
992 comm->comm.copied = 1;