1 /* Copyright (c) 2009-2015. The SimGrid Team.
2 * All rights reserved. */
4 /* This program is free software; you can redistribute it and/or modify it
5 * under the terms of the license (GNU LGPL) which comes with this package. */
7 #include "src/surf/surf_interface.hpp"
8 #include "src/simix/smx_private.h"
11 #include "src/mc/mc_replay.h"
13 #include "simgrid/s4u/mailbox.hpp"
// Logging category for every XBT_DEBUG call in this file.
15 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(simix_network, simix, "SIMIX network-related synchronization");
// Global registry of all mailboxes, keyed by name: filled by SIMIX_mbox_create,
// queried by SIMIX_mbox_get_by_name and freed by SIMIX_mailbox_exit.
// SIMIX_mbox_free is handed to xbt_dict_new_homogeneous (presumably as the
// per-value free function), which is why it is declared before the dictionary.
17 static void SIMIX_mbox_free(void *data);
18 static xbt_dict_t mailboxes = xbt_dict_new_homogeneous(SIMIX_mbox_free);
// Forward declarations of file-local helpers defined further below.
20 static void SIMIX_waitany_remove_simcall_from_actions(smx_simcall_t simcall);
21 static void SIMIX_comm_copy_data(smx_synchro_t comm);
22 static smx_synchro_t SIMIX_comm_new(e_smx_comm_type_t type);
23 static inline void SIMIX_mbox_push(smx_mailbox_t mbox, smx_synchro_t comm);
24 static smx_synchro_t SIMIX_fifo_probe_comm(xbt_fifo_t fifo, e_smx_comm_type_t type,
25 int (*match_fun)(void *, void *,smx_synchro_t),
26 void *user_data, smx_synchro_t my_synchro);
27 static smx_synchro_t SIMIX_deque_get_filtered(std::deque<smx_synchro_t> *deque, e_smx_comm_type_t type,
28 int (*match_fun)(void *, void *,smx_synchro_t),
29 void *user_data, smx_synchro_t my_synchro);
30 static void SIMIX_comm_start(smx_synchro_t synchro);
32 void SIMIX_mailbox_exit(void)
34 xbt_dict_free(&mailboxes);
37 /******************************************************************************/
38 /* Rendez-Vous Points */
39 /******************************************************************************/
// Looks `name` up in the global `mailboxes` dictionary. The visible lines then
// allocate a new mailbox and register it under that name. The new mailbox has
// an empty comm_queue, no done_comm_queue (allocated on demand by
// SIMIX_mbox_set_receiver) and no permanent receiver. The dictionary key is the
// mailbox's own copy of the name (mbox->name).
// NOTE(review): the line numbers embedded in this listing skip 46-47 and 56-58.
// As listed, the lookup result is overwritten unconditionally and nothing is
// returned. Presumably an `if (!mbox) {` guard and a final `return mbox;` were
// dropped -- confirm against the upstream file.
41 smx_mailbox_t SIMIX_mbox_create(const char *name)
43 xbt_assert(name, "Mailboxes must have a name");
44 /* two processes may have pushed the same mbox_create simcall at the same time */
45 smx_mailbox_t mbox = (smx_mailbox_t) xbt_dict_get_or_null(mailboxes, name);
48 mbox = xbt_new0(s_smx_mailbox_t, 1);
49 mbox->name = xbt_strdup(name);
50 mbox->comm_queue = new std::deque<smx_synchro_t>();
51 mbox->done_comm_queue = nullptr; // Allocated on need only
52 mbox->permanent_receiver=NULL;
54 XBT_DEBUG("Creating a mailbox at %p with name %s", mbox, name);
// Keyed by the mailbox-owned copy of the name, not by the caller's string.
55 xbt_dict_set(mailboxes, mbox->name, mbox, NULL);
// Free function for the values of the `mailboxes` dictionary (it is passed to
// xbt_dict_new_homogeneous). Deletes the two communication deques created by
// SIMIX_mbox_create / SIMIX_mbox_set_receiver. done_comm_queue may still be
// nullptr here; `delete` on a null pointer is a no-op.
// NOTE(review): the embedded line numbers skip 64 and 67-70. Nothing visible
// releases mbox->name (xbt_strdup'ed in SIMIX_mbox_create) or the mailbox
// itself (xbt_new0'ed there). Presumably the dropped lines held the matching
// xbt_free calls -- confirm, otherwise every mailbox leaks.
60 void SIMIX_mbox_free(void *data)
62 XBT_DEBUG("mbox free %p", data);
63 smx_mailbox_t mbox = (smx_mailbox_t) data;
65 delete mbox->comm_queue;
66 delete mbox->done_comm_queue;
71 smx_mailbox_t SIMIX_mbox_get_by_name(const char *name)
73 return (smx_mailbox_t) xbt_dict_get_or_null(mailboxes, name);
76 smx_synchro_t SIMIX_mbox_get_head(smx_mailbox_t mbox)
78 return mbox->comm_queue->front();
82 * \brief get the receiver (process associated to the mailbox)
83 * \param mbox The rendez-vous point
84 * \return process The receiving process (NULL if not set)
86 smx_process_t SIMIX_mbox_get_receiver(smx_mailbox_t mbox)
88 return mbox->permanent_receiver;
92 * \brief set the receiver of the rendez vous point to allow eager sends
93 * \param mbox The rendez-vous point
94 * \param process The receiving process
96 void SIMIX_mbox_set_receiver(smx_mailbox_t mbox, smx_process_t process)
98 mbox->permanent_receiver=process;
99 if (mbox->done_comm_queue == nullptr)
100 mbox->done_comm_queue = new std::deque<smx_synchro_t>();
104 * \brief Pushes a communication synchro into a rendez-vous point
105 * \param mbox The mailbox
106 * \param comm The communication synchro
108 static inline void SIMIX_mbox_push(smx_mailbox_t mbox, smx_synchro_t comm)
110 mbox->comm_queue->push_back(comm);
111 comm->comm.mbox = mbox;
115 * \brief Removes a communication synchro from a rendez-vous point
116 * \param mbox The rendez-vous point
117 * \param comm The communication synchro
// Also clears comm->comm.mbox, the back-pointer set by SIMIX_mbox_push.
// NOTE(review): the embedded line numbers skip 122 and 124-126. As listed, the
// loop erases at every position while advancing `it`, an iterator that
// std::deque::erase has just invalidated. Presumably an `if (*it == comm)` test
// and a `break`/`return` after the erase were dropped -- confirm against upstream.
119 void SIMIX_mbox_remove(smx_mailbox_t mbox, smx_synchro_t comm)
121 for (auto it = mbox->comm_queue->begin(); it != mbox->comm_queue->end(); it++)
123 mbox->comm_queue->erase(it);
127 comm->comm.mbox = NULL;
131 * \brief Checks if there is a communication synchro queued in a deque matching our needs
132 * \param type The type of communication we are looking for (comm_send, comm_recv)
133 * \return The communication synchro if found, NULL otherwise
// Matching rules: a queued synchro matches when all of the following hold:
//   - its comm.type equals `type`;
//   - our match_fun (if any) accepts (this_user_data, other_user_data, candidate);
//   - the candidate's own comm.match_fun (if any) accepts
//     (other_user_data, this_user_data, my_synchro).
// other_user_data is the candidate's src_data for a send and its dst_data for a
// receive; for any other comm type it keeps its previous value.
// On a match, the visible code bumps the candidate's refcount, saves its
// mailbox in comm.mbox_cpy and clears comm.mbox.
// NOTE(review): the embedded line numbers skip 134-135 (the end of this comment
// and, presumably, the `static smx_synchro_t` return type), 153, 155, 157,
// 159-160 and 164-170. No visible line removes the match from `deque` or
// returns a value. Presumably an erase and the `return synchro;` /
// `return NULL;` statements were dropped. The fifo variant below is documented
// as "leave it there", which suggests this one removes the match -- confirm
// against upstream.
136 SIMIX_deque_get_filtered(std::deque<smx_synchro_t> *deque, e_smx_comm_type_t type,
137 int (*match_fun)(void *, void *,smx_synchro_t),
138 void *this_user_data, smx_synchro_t my_synchro)
140 void* other_user_data = NULL;
142 for(auto it = deque->begin(); it != deque->end(); it++){
143 smx_synchro_t synchro = *it;
144 if (synchro->comm.type == SIMIX_COMM_SEND) {
145 other_user_data = synchro->comm.src_data;
146 } else if (synchro->comm.type == SIMIX_COMM_RECEIVE) {
147 other_user_data = synchro->comm.dst_data;
// Both sides' filters must agree for the candidate to match.
149 if (synchro->comm.type == type &&
150 (!match_fun || match_fun(this_user_data, other_user_data, synchro)) &&
151 (!synchro->comm.match_fun || synchro->comm.match_fun(other_user_data, this_user_data, my_synchro))) {
152 XBT_DEBUG("Found a matching communication synchro %p", synchro);
// Take an extra reference on the matched synchro.
154 synchro->comm.refcount++;
156 synchro->comm.mbox_cpy = synchro->comm.mbox;
158 synchro->comm.mbox = NULL;
161 XBT_DEBUG("Sorry, communication synchro %p does not match our needs:"
162 " its type is %d but we are looking for a comm of type %d (or maybe the filtering didn't match)",
163 synchro, (int)synchro->comm.type, (int)type);
165 XBT_DEBUG("No matching communication synchro found");
171 * \brief Checks if there is a communication synchro queued in a fifo matching our needs, but leave it there
172 * \param type The type of communication we are looking for (comm_send, comm_recv)
173 * \return The communication synchro if found, NULL otherwise
// Same matching rules as SIMIX_deque_get_filtered, applied to an xbt_fifo.
// On a match, the visible code only increments the candidate's refcount; the
// mailbox fields and the fifo content are left untouched.
// NOTE(review): the embedded line numbers skip 174, 178, 182, 188, 194-196, 200
// and 202-203, and no `return` is visible. Presumably `return synchro;` and
// `return NULL;` were dropped. No caller of this function appears in this listing.
175 smx_synchro_t SIMIX_fifo_probe_comm(xbt_fifo_t fifo, e_smx_comm_type_t type,
176 int (*match_fun)(void *, void *,smx_synchro_t),
177 void *this_user_data, smx_synchro_t my_synchro)
179 smx_synchro_t synchro;
180 xbt_fifo_item_t item;
181 void* other_user_data = NULL;
183 xbt_fifo_foreach(fifo, item, synchro, smx_synchro_t) {
184 if (synchro->comm.type == SIMIX_COMM_SEND) {
185 other_user_data = synchro->comm.src_data;
186 } else if (synchro->comm.type == SIMIX_COMM_RECEIVE) {
187 other_user_data = synchro->comm.dst_data;
189 if (synchro->comm.type == type &&
190 (!match_fun || match_fun(this_user_data, other_user_data, synchro)) &&
191 (!synchro->comm.match_fun || synchro->comm.match_fun(other_user_data, this_user_data, my_synchro))) {
192 XBT_DEBUG("Found a matching communication synchro %p", synchro);
193 synchro->comm.refcount++;
197 XBT_DEBUG("Sorry, communication synchro %p does not match our needs:"
198 " its type is %d but we are looking for a comm of type %d (or maybe the filtering didn't match)",
199 synchro, (int)synchro->comm.type, (int)type);
201 XBT_DEBUG("No matching communication synchro found");
204 /******************************************************************************/
205 /* Communication synchros */
206 /******************************************************************************/
209 * \brief Creates a new communicate synchro
210 * \param type The direction of communication (comm_send, comm_recv)
211 * \return The new communicate synchro
// The synchro is taken from simix_global->synchro_mallocator, so it is
// presumably a recycled object. The visible lines set its kind, the WAITING
// state, the comm type, a refcount of 1, and null src/dst user data and category.
// NOTE(review): the embedded line numbers skip 232-234, and no
// `return synchro;` is visible -- presumably dropped from this listing.
213 smx_synchro_t SIMIX_comm_new(e_smx_comm_type_t type)
215 smx_synchro_t synchro;
217 /* alloc structures */
218 synchro = (smx_synchro_t) xbt_mallocator_get(simix_global->synchro_mallocator);
220 synchro->type = SIMIX_SYNC_COMMUNICATE;
221 synchro->state = SIMIX_WAITING;
223 /* set communication */
224 synchro->comm.type = type;
225 synchro->comm.refcount = 1;
226 synchro->comm.src_data=NULL;
227 synchro->comm.dst_data=NULL;
229 synchro->category = NULL;
231 XBT_DEBUG("Create communicate synchro %p", synchro);
237 * \brief Destroy a communicate synchro
238 * \param synchro The communicate synchro to be destroyed
// Releases one reference. A refcount that is already <= 0 is reported with a
// backtrace and xbt_die (e.g. testing/waiting the same comm twice).
// Once no reference is left:
//   - the name is freed and the surf actions are released;
//   - a detached comm that did not end in SIMIX_DONE has its source buffer
//     handed to clean_fun (when set);
//   - the synchro is unlinked from its mailbox (if still queued) and returned
//     to the mallocator.
240 void SIMIX_comm_destroy(smx_synchro_t synchro)
242 XBT_DEBUG("Destroy synchro %p (refcount: %d), state: %d",
243 synchro, synchro->comm.refcount, (int)synchro->state);
245 if (synchro->comm.refcount <= 0) {
246 xbt_backtrace_display_current();
247 xbt_die("The refcount of comm %p is already 0 before decreasing it. "
248 "That's a bug! If you didn't test and/or wait the same communication twice in your code, then the bug is SimGrid's...", synchro);
250 synchro->comm.refcount--;
// NOTE(review): the embedded line numbers skip 252. As listed, this test only
// guards the "Really free" debug message, and the synchro would be freed while
// references remain. Presumably a `return;` was dropped here, with the debug
// message following it -- confirm against upstream.
251 if (synchro->comm.refcount > 0)
253 XBT_DEBUG("Really free communication %p; refcount is now %d", synchro,
254 synchro->comm.refcount);
256 xbt_free(synchro->name);
257 SIMIX_comm_destroy_internal_actions(synchro);
259 if (synchro->comm.detached && synchro->state != SIMIX_DONE) {
260 /* the communication has failed and was detached:
261 * we have to free the buffer */
262 if (synchro->comm.clean_fun) {
263 synchro->comm.clean_fun(synchro->comm.src_buff);
265 synchro->comm.src_buff = NULL;
// Unlink from the mailbox if the comm never got matched.
268 if(synchro->comm.mbox)
269 SIMIX_mbox_remove(synchro->comm.mbox, synchro);
271 xbt_mallocator_release(simix_global->synchro_mallocator, synchro);
274 void SIMIX_comm_destroy_internal_actions(smx_synchro_t synchro)
276 if (synchro->comm.surf_comm){
277 synchro->comm.surf_comm->unref();
278 synchro->comm.surf_comm = NULL;
281 if (synchro->comm.src_timeout){
282 synchro->comm.src_timeout->unref();
283 synchro->comm.src_timeout = NULL;
286 if (synchro->comm.dst_timeout){
287 synchro->comm.dst_timeout->unref();
288 synchro->comm.dst_timeout = NULL;
// Blocking send. It posts the send through simcall_HANDLER_comm_isend (with no
// clean_fun), then waits for it with simcall_HANDLER_comm_wait and the given
// timeout. The simcall's MC value is set to 0 before waiting.
// NOTE(review): the embedded line numbers skip 300, which held the end of the
// isend argument list (`data` and `detached`). It is presumably `data, 0);`,
// since the returned comm is waited on -- confirm against upstream.
292 void simcall_HANDLER_comm_send(smx_simcall_t simcall, smx_process_t src, smx_mailbox_t mbox,
293 double task_size, double rate,
294 void *src_buff, size_t src_buff_size,
295 int (*match_fun)(void *, void *,smx_synchro_t),
296 void (*copy_data_fun)(smx_synchro_t, void*, size_t),
297 void *data, double timeout){
298 smx_synchro_t comm = simcall_HANDLER_comm_isend(simcall, src, mbox, task_size, rate,
299 src_buff, src_buff_size, match_fun, NULL, copy_data_fun,
301 SIMCALL_SET_MC_VALUE(simcall, 0);
302 simcall_HANDLER_comm_wait(simcall, comm, timeout);
// Posts a send on `mbox`. A matching pending receive is first looked up in
// comm_queue. Without one, our own synchro is used instead:
//   - if the mailbox has a permanent receiver, the synchro is marked READY for
//     that receiver and parked in done_comm_queue (eager send);
//   - presumably otherwise, it is queued in the mailbox.
// The comm is then recorded in src_proc->comms and filled with the sender-side
// parameters. Outside MC/record-replay it is started with SIMIX_comm_start.
// Returns NULL for a detached send, the communication synchro otherwise.
// NOTE(review): the embedded line numbers skip many short lines, among them
// 334-335, 337-338, 351, 355 and 357. These presumably held the `else` arms and
// the `if (detached)` test. As listed, lines 352-356 would mark every send as
// detached and then reset clean_fun to NULL. Confirm the branch structure
// against upstream before relying on this listing.
304 smx_synchro_t simcall_HANDLER_comm_isend(smx_simcall_t simcall, smx_process_t src_proc, smx_mailbox_t mbox,
305 double task_size, double rate,
306 void *src_buff, size_t src_buff_size,
307 int (*match_fun)(void *, void *,smx_synchro_t),
308 void (*clean_fun)(void *), // used to free the synchro in case of problem after a detached send
309 void (*copy_data_fun)(smx_synchro_t, void*, size_t),// used to copy data if not default one
310 void *data, int detached)
312 XBT_DEBUG("send from %p", mbox);
314 /* Prepare a synchro describing us, so that it gets passed to the user-provided filter of other side */
315 smx_synchro_t this_synchro = SIMIX_comm_new(SIMIX_COMM_SEND);
317 /* Look for communication synchro matching our needs. We also provide a description of
318 * ourself so that the other side also gets a chance of choosing if it wants to match with us.
320 * If it is not found then push our communication into the rendez-vous point */
321 smx_synchro_t other_synchro = SIMIX_deque_get_filtered(mbox->comm_queue, SIMIX_COMM_RECEIVE, match_fun, data, this_synchro);
323 if (!other_synchro) {
324 other_synchro = this_synchro;
326 if (mbox->permanent_receiver!=NULL){
327 //this mailbox is for small messages, which have to be sent right now
328 other_synchro->state = SIMIX_READY;
329 other_synchro->comm.dst_proc=mbox->permanent_receiver;
330 other_synchro->comm.refcount++;
331 mbox->done_comm_queue->push_back(other_synchro);
332 other_synchro->comm.mbox=mbox;
333 XBT_DEBUG("pushing a message into the permanent receive fifo %p, comm %p", mbox, &(other_synchro->comm));
336 SIMIX_mbox_push(mbox, this_synchro);
// Presumably the "matching receive found" branch: our temporary synchro is
// dropped and the matched receive becomes READY.
339 XBT_DEBUG("Receive already pushed");
341 SIMIX_comm_destroy(this_synchro);
343 other_synchro->state = SIMIX_READY;
344 other_synchro->comm.type = SIMIX_COMM_READY;
347 xbt_fifo_push(src_proc->comms, other_synchro);
349 /* if the communication synchro is detached then decrease the refcount
350 * by one, so it will be eliminated by the receiver's destroy call */
352 other_synchro->comm.detached = 1;
353 other_synchro->comm.refcount--;
354 other_synchro->comm.clean_fun = clean_fun;
356 other_synchro->comm.clean_fun = NULL;
359 /* Setup the communication synchro */
360 other_synchro->comm.src_proc = src_proc;
361 other_synchro->comm.task_size = task_size;
362 other_synchro->comm.rate = rate;
363 other_synchro->comm.src_buff = src_buff;
364 other_synchro->comm.src_buff_size = src_buff_size;
365 other_synchro->comm.src_data = data;
367 other_synchro->comm.match_fun = match_fun;
368 other_synchro->comm.copy_data_fun = copy_data_fun;
// Under MC or record/replay no surf action is created: only mark it RUNNING.
371 if (MC_is_active() || MC_record_replay_is_active()) {
372 other_synchro->state = SIMIX_RUNNING;
373 return (detached ? NULL : other_synchro);
376 SIMIX_comm_start(other_synchro);
377 return (detached ? NULL : other_synchro);
380 void simcall_HANDLER_comm_recv(smx_simcall_t simcall, smx_process_t receiver, smx_mailbox_t mbox,
381 void *dst_buff, size_t *dst_buff_size,
382 int (*match_fun)(void *, void *, smx_synchro_t),
383 void (*copy_data_fun)(smx_synchro_t, void*, size_t),
384 void *data, double timeout, double rate)
386 smx_synchro_t comm = SIMIX_comm_irecv(receiver, mbox, dst_buff,
387 dst_buff_size, match_fun, copy_data_fun, data, rate);
388 SIMCALL_SET_MC_VALUE(simcall, 0);
389 simcall_HANDLER_comm_wait(simcall, comm, timeout);
392 smx_synchro_t simcall_HANDLER_comm_irecv(smx_simcall_t simcall, smx_process_t receiver, smx_mailbox_t mbox,
393 void *dst_buff, size_t *dst_buff_size,
394 int (*match_fun)(void *, void *, smx_synchro_t),
395 void (*copy_data_fun)(smx_synchro_t, void*, size_t),
396 void *data, double rate)
398 return SIMIX_comm_irecv(receiver, mbox, dst_buff, dst_buff_size, match_fun, copy_data_fun, data, rate);
// Posts a receive on `mbox` for dst_proc.
//   - When the mailbox has a permanent receiver and done_comm_queue is
//     non-empty, an eagerly sent message is first looked up there. If it is
//     found and its surf action has no remaining amount, it is marked DONE and
//     our temporary synchro is destroyed.
//   - Otherwise a pending send is looked up in comm_queue, and our own synchro
//     is queued when none matches.
// The resulting comm is recorded in dst_proc->comms and filled with the
// receiver-side parameters. Outside MC/record-replay it is started with
// SIMIX_comm_start. Always returns the communication synchro.
// NOTE(review): the embedded line numbers skip, among others, 421, 427,
// 430-431, 433, 436, 439, 444, 449 and 451-452. These presumably held the
// `else` arms separating the done_comm_queue and comm_queue paths. Confirm the
// branch structure against upstream before relying on this listing.
401 smx_synchro_t SIMIX_comm_irecv(smx_process_t dst_proc, smx_mailbox_t mbox, void *dst_buff, size_t *dst_buff_size,
402 int (*match_fun)(void *, void *, smx_synchro_t),
403 void (*copy_data_fun)(smx_synchro_t, void*, size_t), // used to copy data if not default one
404 void *data, double rate)
406 XBT_DEBUG("recv from %p %p", mbox, mbox->comm_queue);
407 smx_synchro_t this_synchro = SIMIX_comm_new(SIMIX_COMM_RECEIVE);
409 smx_synchro_t other_synchro;
410 //communication already done, get it inside the fifo of completed comms
411 if (mbox->permanent_receiver && ! mbox->done_comm_queue->empty()) {
413 XBT_DEBUG("We have a comm that has probably already been received, trying to match it, to skip the communication");
414 //find a match in the already received fifo
415 other_synchro = SIMIX_deque_get_filtered(mbox->done_comm_queue, SIMIX_COMM_SEND, match_fun, data, this_synchro);
416 //if not found, assume the receiver came first, register it to the mailbox in the classical way
417 if (!other_synchro) {
418 XBT_DEBUG("We have messages in the permanent receive list, but not the one we are looking for, pushing request into fifo");
419 other_synchro = this_synchro;
420 SIMIX_mbox_push(mbox, this_synchro);
// SIMIX_comm_get_remains is only consulted when a surf action exists.
422 if(other_synchro->comm.surf_comm && SIMIX_comm_get_remains(other_synchro)==0.0) {
423 XBT_DEBUG("comm %p has been already sent, and is finished, destroy it",&(other_synchro->comm));
424 other_synchro->state = SIMIX_DONE;
425 other_synchro->comm.type = SIMIX_COMM_DONE;
426 other_synchro->comm.mbox = NULL;
428 other_synchro->comm.refcount--;
429 SIMIX_comm_destroy(this_synchro);
432 /* Prepare a synchro describing us, so that it gets passed to the user-provided filter of other side */
434 /* Look for communication synchro matching our needs. We also provide a description of
435 * ourself so that the other side also gets a chance of choosing if it wants to match with us.
437 * If it is not found then push our communication into the rendez-vous point */
438 other_synchro = SIMIX_deque_get_filtered(mbox->comm_queue, SIMIX_COMM_SEND, match_fun, data, this_synchro);
440 if (!other_synchro) {
441 XBT_DEBUG("Receive pushed first %lu", mbox->comm_queue->size());
442 other_synchro = this_synchro;
443 SIMIX_mbox_push(mbox, this_synchro);
445 SIMIX_comm_destroy(this_synchro);
446 other_synchro->state = SIMIX_READY;
447 other_synchro->comm.type = SIMIX_COMM_READY;
448 //other_synchro->comm.refcount--;
450 xbt_fifo_push(dst_proc->comms, other_synchro);
453 /* Setup communication synchro */
454 other_synchro->comm.dst_proc = dst_proc;
455 other_synchro->comm.dst_buff = dst_buff;
456 other_synchro->comm.dst_buff_size = dst_buff_size;
457 other_synchro->comm.dst_data = data;
// Keep the most restrictive rate; -1.0 is treated as "no rate given".
459 if (rate != -1.0 && (other_synchro->comm.rate == -1.0 || rate < other_synchro->comm.rate))
460 other_synchro->comm.rate = rate;
462 other_synchro->comm.match_fun = match_fun;
463 other_synchro->comm.copy_data_fun = copy_data_fun;
// Under MC or record/replay no surf action is created: only mark it RUNNING.
465 if (MC_is_active() || MC_record_replay_is_active()) {
466 other_synchro->state = SIMIX_RUNNING;
467 return other_synchro;
470 SIMIX_comm_start(other_synchro);
471 return other_synchro;
// Simcall handler for iprobe: forwards to SIMIX_comm_iprobe, with the issuing
// process as dst_proc.
// NOTE(review): the embedded line numbers skip 477. `data` is forwarded below
// but is not in the visible parameter list, so that line presumably declared a
// trailing `void *data)` parameter -- confirm against upstream.
474 smx_synchro_t simcall_HANDLER_comm_iprobe(smx_simcall_t simcall, smx_mailbox_t mbox,
475 int type, int src, int tag,
476 int (*match_fun)(void *, void *, smx_synchro_t),
478 return SIMIX_comm_iprobe(simcall->issuer, mbox, type, src, tag, match_fun, data);
// Non-blocking probe. It builds a temporary synchro describing us (a send or a
// receive, depending on `type`) and looks for a counterpart of the opposite
// kind:
//   - first in done_comm_queue, only when the mailbox has a permanent receiver
//     and that queue is non-empty;
//   - then in comm_queue.
// The extra reference taken by SIMIX_deque_get_filtered on a match is dropped,
// the temporary synchro is destroyed, and the match (or NULL) is returned.
// dst_proc, src and tag are not used by the visible code.
// NOTE(review): the embedded line numbers skip 486-487, 490, 493, 498,
// 500-502, 505-506 and 508. The declaration of smx_type, the `type` test
// selecting between the two SIMIX_comm_new calls, and whatever guards the
// second lookup are not visible. Also, SIMIX_deque_get_filtered clears
// comm.mbox on a match (and may dequeue it), so this "probe" may not be
// side-effect free -- verify.
481 smx_synchro_t SIMIX_comm_iprobe(smx_process_t dst_proc, smx_mailbox_t mbox, int type, int src,
482 int tag, int (*match_fun)(void *, void *, smx_synchro_t), void *data)
484 XBT_DEBUG("iprobe from %p %p", mbox, mbox->comm_queue);
485 smx_synchro_t this_synchro;
488 this_synchro=SIMIX_comm_new(SIMIX_COMM_SEND);
489 smx_type = SIMIX_COMM_RECEIVE;
491 this_synchro=SIMIX_comm_new(SIMIX_COMM_RECEIVE);
492 smx_type = SIMIX_COMM_SEND;
494 smx_synchro_t other_synchro=NULL;
495 if(mbox->permanent_receiver && ! mbox->done_comm_queue->empty()){
496 //find a match in the already received fifo
497 XBT_DEBUG("first check in the perm recv mailbox");
499 other_synchro = SIMIX_deque_get_filtered(mbox->done_comm_queue, (e_smx_comm_type_t) smx_type, match_fun, data, this_synchro);
503 XBT_DEBUG("try in the normal mailbox");
504 other_synchro = SIMIX_deque_get_filtered(mbox->comm_queue, (e_smx_comm_type_t) smx_type, match_fun, data, this_synchro);
// Give back the reference SIMIX_deque_get_filtered took on the match.
507 if(other_synchro)other_synchro->comm.refcount--;
509 SIMIX_comm_destroy(this_synchro);
510 return other_synchro;
// Attaches `simcall` to `synchro` and makes it the issuer's waiting synchro.
//   - Under MC/record-replay, the outcome is forced from the simcall's MC value
//     (DONE, or a timeout on the issuer's side) and SIMIX_comm_finish answers
//     right away.
//   - Otherwise, an already finished comm is answered immediately.
//   - A pending comm gets a surf sleep action of `timeout` on the issuer's
//     host, stored as src_timeout or dst_timeout depending on the issuer's
//     side. This lets SIMIX_post_comm detect timeouts and host failures.
// NOTE(review): the embedded line numbers skip 516-517 (presumably the
// declaration of `sleep`), 526, 528, 531-533, 536, 538-539, 541-543, 551, 554
// and 556-558. The test on `idx`, the `else` arms and any early return after
// the MC branch are not visible -- confirm against upstream.
513 void simcall_HANDLER_comm_wait(smx_simcall_t simcall, smx_synchro_t synchro, double timeout)
515 /* the simcall may be a wait, a send or a recv */
518 /* Associate this simcall to the wait synchro */
519 XBT_DEBUG("simcall_HANDLER_comm_wait, %p", synchro);
521 xbt_fifo_push(synchro->simcalls, simcall);
522 simcall->issuer->waiting_synchro = synchro;
524 if (MC_is_active() || MC_record_replay_is_active()) {
525 int idx = SIMCALL_GET_MC_VALUE(simcall);
527 synchro->state = SIMIX_DONE;
529 /* If we reached this point, the wait simcall must have a timeout */
530 /* Otherwise it shouldn't be enabled and executed by the MC */
534 if (synchro->comm.src_proc == simcall->issuer)
535 synchro->state = SIMIX_SRC_TIMEOUT;
537 synchro->state = SIMIX_DST_TIMEOUT;
540 SIMIX_comm_finish(synchro);
544 /* If the synchro has already finished, perform the error handling, */
545 /* otherwise set up a waiting timeout on the right side */
546 if (synchro->state != SIMIX_WAITING && synchro->state != SIMIX_RUNNING) {
547 SIMIX_comm_finish(synchro);
548 } else { /* if (timeout >= 0) { we need a surf sleep action even when there is no timeout, otherwise surf won't tell us when the host fails */
549 sleep = surf_host_sleep(simcall->issuer->host, timeout);
550 sleep->setData(synchro);
552 if (simcall->issuer == synchro->comm.src_proc)
553 synchro->comm.src_timeout = sleep;
555 synchro->comm.dst_timeout = sleep;
// Answers whether `synchro` has completed.
//   - Under MC/record-replay, the result is "both src_proc and dst_proc are
//     set", and a positive answer also forces the state to DONE.
//   - Otherwise, the result is "state is neither WAITING nor RUNNING".
// On a positive result the simcall is attached to the synchro and answered by
// SIMIX_comm_finish. The visible SIMIX_simcall_answer calls presumably cover
// the negative result.
// NOTE(review): the embedded line numbers skip 567, 569-572, 577 and 579-581.
// As listed, the MC branch falls through into the regular test and the simcall
// could be answered twice. Presumably `else` arms and a `return;` were dropped.
559 void simcall_HANDLER_comm_test(smx_simcall_t simcall, smx_synchro_t synchro)
561 if(MC_is_active() || MC_record_replay_is_active()){
562 simcall_comm_test__set__result(simcall, synchro->comm.src_proc && synchro->comm.dst_proc);
563 if(simcall_comm_test__get__result(simcall)){
564 synchro->state = SIMIX_DONE;
565 xbt_fifo_push(synchro->simcalls, simcall);
566 SIMIX_comm_finish(synchro);
568 SIMIX_simcall_answer(simcall);
573 simcall_comm_test__set__result(simcall, (synchro->state != SIMIX_WAITING && synchro->state != SIMIX_RUNNING));
574 if (simcall_comm_test__get__result(simcall)) {
575 xbt_fifo_push(synchro->simcalls, simcall);
576 SIMIX_comm_finish(synchro);
578 SIMIX_simcall_answer(simcall);
// Answers which communication of `synchros` (if any) has completed. The result
// index starts at -1 (none).
//   - Under MC/record-replay, the simcall's MC value selects the index directly.
//   - Otherwise, the first comm that is neither WAITING nor RUNNING is reported
//     and answered through SIMIX_comm_finish.
// NOTE(review): the embedded line numbers skip 583-584 (presumably the
// declaration of `cursor`), 587, 590, 592, 598-601 and 607-609. Not visible:
// the test deciding between the immediate SIMIX_simcall_answer and the DONE
// path, the loop exit after a match, and the guard around the final answer.
582 void simcall_HANDLER_comm_testany(smx_simcall_t simcall, xbt_dynar_t synchros)
585 smx_synchro_t synchro;
586 simcall_comm_testany__set__result(simcall, -1);
588 if (MC_is_active() || MC_record_replay_is_active()){
589 int idx = SIMCALL_GET_MC_VALUE(simcall);
591 SIMIX_simcall_answer(simcall);
593 synchro = xbt_dynar_get_as(synchros, idx, smx_synchro_t);
594 simcall_comm_testany__set__result(simcall, idx);
595 xbt_fifo_push(synchro->simcalls, simcall);
596 synchro->state = SIMIX_DONE;
597 SIMIX_comm_finish(synchro);
602 xbt_dynar_foreach(simcall_comm_testany__get__comms(simcall), cursor,synchro) {
603 if (synchro->state != SIMIX_WAITING && synchro->state != SIMIX_RUNNING) {
604 simcall_comm_testany__set__result(simcall, cursor);
605 xbt_fifo_push(synchro->simcalls, simcall);
606 SIMIX_comm_finish(synchro);
610 SIMIX_simcall_answer(simcall);
// Waits for any communication of `synchros`.
//   - Under MC/record-replay, the simcall's MC value picks the comm, which is
//     forced to DONE and answered right away.
//   - Otherwise, the simcall is attached to every comm. The first one found
//     already finished is answered through SIMIX_comm_finish, which detaches
//     the simcall from the others via SIMIX_waitany_remove_simcall_from_actions.
// NOTE(review): the embedded line numbers skip 614, 617, 625-627, 631 and
// 635-639. The early returns after the MC branch and after a finished comm
// are not visible -- confirm against upstream.
613 void simcall_HANDLER_comm_waitany(smx_simcall_t simcall, xbt_dynar_t synchros)
615 smx_synchro_t synchro;
616 unsigned int cursor = 0;
618 if (MC_is_active() || MC_record_replay_is_active()){
619 int idx = SIMCALL_GET_MC_VALUE(simcall);
620 synchro = xbt_dynar_get_as(synchros, idx, smx_synchro_t);
621 xbt_fifo_push(synchro->simcalls, simcall);
622 simcall_comm_waitany__set__result(simcall, idx);
623 synchro->state = SIMIX_DONE;
624 SIMIX_comm_finish(synchro);
628 xbt_dynar_foreach(synchros, cursor, synchro){
629 /* associate this simcall to the synchro */
630 xbt_fifo_push(synchro->simcalls, simcall);
632 /* see if the synchro is already finished */
633 if (synchro->state != SIMIX_WAITING && synchro->state != SIMIX_RUNNING){
634 SIMIX_comm_finish(synchro);
640 void SIMIX_waitany_remove_simcall_from_actions(smx_simcall_t simcall)
642 smx_synchro_t synchro;
643 unsigned int cursor = 0;
644 xbt_dynar_t synchros = simcall_comm_waitany__get__comms(simcall);
646 xbt_dynar_foreach(synchros, cursor, synchro) {
647 xbt_fifo_remove(synchro->simcalls, simcall);
652 * \brief Starts the simulation of a communication synchro.
653 * \param synchro the communication synchro
// Only READY synchros are started:
//   - a surf network action is created between the sender's and the receiver's
//     hosts, and the synchro becomes RUNNING;
//   - a communication that fails immediately (link failure) is marked
//     SIMIX_LINK_FAILURE and its surf actions are released;
//   - if either endpoint is suspended, the new surf action is suspended right away.
// NOTE(review): the embedded line numbers skip 667 (presumably the `sender,
// receiver,` arguments of surf_network_model_communicate), 680-681 and 691.
// SIMIX_comm_destroy_internal_actions resets surf_comm to NULL, and the
// suspend path below would then dereference it. An early `return;` was
// therefore presumably dropped at 680-681 -- confirm against upstream.
655 static inline void SIMIX_comm_start(smx_synchro_t synchro)
657 /* If both the sender and the receiver are already there, start the communication */
658 if (synchro->state == SIMIX_READY) {
660 sg_host_t sender = synchro->comm.src_proc->host;
661 sg_host_t receiver = synchro->comm.dst_proc->host;
663 XBT_DEBUG("Starting communication %p from '%s' to '%s'", synchro,
664 sg_host_get_name(sender), sg_host_get_name(receiver));
666 synchro->comm.surf_comm = surf_network_model_communicate(surf_network_model,
668 synchro->comm.task_size, synchro->comm.rate);
670 synchro->comm.surf_comm->setData(synchro);
672 synchro->state = SIMIX_RUNNING;
674 /* If a link is failed, detect it immediately */
675 if (synchro->comm.surf_comm->getState() == simgrid::surf::Action::State::failed) {
676 XBT_DEBUG("Communication from '%s' to '%s' failed to start because of a link failure",
677 sg_host_get_name(sender), sg_host_get_name(receiver));
678 synchro->state = SIMIX_LINK_FAILURE;
679 SIMIX_comm_destroy_internal_actions(synchro);
682 /* If any of the processes is suspended, create the synchro but stop its execution,
683 it will be restarted when the sender process resumes */
684 if (SIMIX_process_is_suspended(synchro->comm.src_proc) ||
685 SIMIX_process_is_suspended(synchro->comm.dst_proc)) {
686 /* FIXME: check what should happen with the synchro state */
688 if (SIMIX_process_is_suspended(synchro->comm.src_proc))
689 XBT_DEBUG("The communication is suspended on startup because src (%s:%s) were suspended since it initiated the communication",
690 sg_host_get_name(synchro->comm.src_proc->host), synchro->comm.src_proc->name);
692 XBT_DEBUG("The communication is suspended on startup because dst (%s:%s) were suspended since it initiated the communication",
693 sg_host_get_name(synchro->comm.dst_proc->host), synchro->comm.dst_proc->name);
695 synchro->comm.surf_comm->suspend();
702 * \brief Answers the SIMIX simcalls associated to a communication synchro.
703 * \param synchro a finished communication synchro
// For every simcall attached to the synchro, this:
//   1. skips simcalls whose handling process was killed (SIMCALL_NONE);
//   2. detaches waitany simcalls from their other comms;
//   3. unlinks the synchro from its mailbox;
//   4. translates the final state into the simcall outcome (data copy on
//      success, SMX_EXCEPTION otherwise);
//   5. records the failing index for waitany/testany exceptions;
//   6. removes the synchro from the issuer's comms list (and, for detached
//      comms, from the peer's list);
//   7. answers the simcall.
// Finally the synchro is destroyed destroy_count times.
// NOTE(review): the embedded line numbers skip many short lines here.
// Presumably among them are the `case SIMIX_DONE:` / `case SIMIX_CANCELED:` /
// `default:` labels, the `break;` statements, the `else` arms, and a
// `destroy_count++`. As listed, destroy_count is never incremented -- confirm
// against upstream.
705 void SIMIX_comm_finish(smx_synchro_t synchro)
707 unsigned int destroy_count = 0;
708 smx_simcall_t simcall;
710 while ((simcall = (smx_simcall_t) xbt_fifo_shift(synchro->simcalls))) {
712 /* If a waitany simcall is waiting for this synchro to finish, then remove
713 it from the other synchros in the waitany list. Afterwards, get the
714 position of the actual synchro in the waitany dynar and
715 return it as the result of the simcall */
717 if (simcall->call == SIMCALL_NONE) //FIXME: maybe a better way to handle this case
718 continue; // if process handling comm is killed
719 if (simcall->call == SIMCALL_COMM_WAITANY) {
720 SIMIX_waitany_remove_simcall_from_actions(simcall);
721 if (!MC_is_active() && !MC_record_replay_is_active())
722 simcall_comm_waitany__set__result(simcall, xbt_dynar_search(simcall_comm_waitany__get__comms(simcall), &synchro));
725 /* If the synchro is still in a rendez-vous point then remove from it */
726 if (synchro->comm.mbox)
727 SIMIX_mbox_remove(synchro->comm.mbox, synchro);
729 XBT_DEBUG("SIMIX_comm_finish: synchro state = %d", (int)synchro->state);
731 /* Check out for errors */
733 if (simcall->issuer->host->isOff()) {
734 simcall->issuer->context->iwannadie = 1;
735 SMX_EXCEPTION(simcall->issuer, host_error, 0, "Host failed");
// Map the final synchro state onto the simcall outcome.
738 switch (synchro->state) {
741 XBT_DEBUG("Communication %p complete!", synchro);
742 SIMIX_comm_copy_data(synchro);
745 case SIMIX_SRC_TIMEOUT:
746 SMX_EXCEPTION(simcall->issuer, timeout_error, 0,
747 "Communication timeouted because of sender");
750 case SIMIX_DST_TIMEOUT:
751 SMX_EXCEPTION(simcall->issuer, timeout_error, 0,
752 "Communication timeouted because of receiver");
755 case SIMIX_SRC_HOST_FAILURE:
756 if (simcall->issuer == synchro->comm.src_proc)
757 simcall->issuer->context->iwannadie = 1;
758 // SMX_EXCEPTION(simcall->issuer, host_error, 0, "Host failed");
760 SMX_EXCEPTION(simcall->issuer, network_error, 0, "Remote peer failed");
763 case SIMIX_DST_HOST_FAILURE:
764 if (simcall->issuer == synchro->comm.dst_proc)
765 simcall->issuer->context->iwannadie = 1;
766 // SMX_EXCEPTION(simcall->issuer, host_error, 0, "Host failed");
768 SMX_EXCEPTION(simcall->issuer, network_error, 0, "Remote peer failed");
771 case SIMIX_LINK_FAILURE:
773 XBT_DEBUG("Link failure in synchro %p between '%s' and '%s': posting an exception to the issuer: %s (%p) detached:%d",
775 synchro->comm.src_proc ? sg_host_get_name(synchro->comm.src_proc->host) : NULL,
776 synchro->comm.dst_proc ? sg_host_get_name(synchro->comm.dst_proc->host) : NULL,
777 simcall->issuer->name, simcall->issuer, synchro->comm.detached);
778 if (synchro->comm.src_proc == simcall->issuer) {
779 XBT_DEBUG("I'm source");
780 } else if (synchro->comm.dst_proc == simcall->issuer) {
781 XBT_DEBUG("I'm dest");
783 XBT_DEBUG("I'm neither source nor dest");
785 SMX_EXCEPTION(simcall->issuer, network_error, 0, "Link failure");
789 if (simcall->issuer == synchro->comm.dst_proc)
790 SMX_EXCEPTION(simcall->issuer, cancel_error, 0,
791 "Communication canceled by the sender");
793 SMX_EXCEPTION(simcall->issuer, cancel_error, 0,
794 "Communication canceled by the receiver");
798 xbt_die("Unexpected synchro state in SIMIX_comm_finish: %d", (int)synchro->state);
801 /* if there is an exception during a waitany or a testany, indicate the position of the failed communication */
802 if (simcall->issuer->doexception) {
803 if (simcall->call == SIMCALL_COMM_WAITANY) {
804 simcall->issuer->running_ctx->exception.value = xbt_dynar_search(simcall_comm_waitany__get__comms(simcall), &synchro);
806 else if (simcall->call == SIMCALL_COMM_TESTANY) {
807 simcall->issuer->running_ctx->exception.value = xbt_dynar_search(simcall_comm_testany__get__comms(simcall), &synchro);
811 if (simcall->issuer->host->isOff()) {
812 simcall->issuer->context->iwannadie = 1;
// The issuer no longer waits on this synchro: unlink it from the comms lists.
815 simcall->issuer->waiting_synchro = NULL;
816 xbt_fifo_remove(simcall->issuer->comms, synchro);
817 if(synchro->comm.detached){
818 if(simcall->issuer == synchro->comm.src_proc){
819 if(synchro->comm.dst_proc)
820 xbt_fifo_remove(synchro->comm.dst_proc->comms, synchro);
822 if(simcall->issuer == synchro->comm.dst_proc){
823 if(synchro->comm.src_proc)
824 xbt_fifo_remove(synchro->comm.src_proc->comms, synchro);
827 SIMIX_simcall_answer(simcall);
// Drop the references owed for the simcalls answered above (presumably one per
// answered simcall; see the NOTE about destroy_count).
831 while (destroy_count-- > 0)
832 SIMIX_comm_destroy(synchro);
836 * \brief This function is called when a Surf communication synchro is finished.
837 * \param synchro the corresponding Simix communication
839 void SIMIX_post_comm(smx_synchro_t synchro)
841 /* Update synchro state */
842 if (synchro->comm.src_timeout &&
843 synchro->comm.src_timeout->getState() == simgrid::surf::Action::State::done)
844 synchro->state = SIMIX_SRC_TIMEOUT;
845 else if (synchro->comm.dst_timeout &&
846 synchro->comm.dst_timeout->getState() == simgrid::surf::Action::State::done)
847 synchro->state = SIMIX_DST_TIMEOUT;
848 else if (synchro->comm.src_timeout &&
849 synchro->comm.src_timeout->getState() == simgrid::surf::Action::State::failed)
850 synchro->state = SIMIX_SRC_HOST_FAILURE;
851 else if (synchro->comm.dst_timeout &&
852 synchro->comm.dst_timeout->getState() == simgrid::surf::Action::State::failed)
853 synchro->state = SIMIX_DST_HOST_FAILURE;
854 else if (synchro->comm.surf_comm &&
855 synchro->comm.surf_comm->getState() == simgrid::surf::Action::State::failed) {
856 XBT_DEBUG("Puta madre. Surf says that the link broke");
857 synchro->state = SIMIX_LINK_FAILURE;
859 synchro->state = SIMIX_DONE;
861 XBT_DEBUG("SIMIX_post_comm: comm %p, state %d, src_proc %p, dst_proc %p, detached: %d",
862 synchro, (int)synchro->state, synchro->comm.src_proc, synchro->comm.dst_proc, synchro->comm.detached);
864 /* destroy the surf actions associated with the Simix communication */
865 SIMIX_comm_destroy_internal_actions(synchro);
867 /* if there are simcalls associated with the synchro, then answer them */
868 if (xbt_fifo_size(synchro->simcalls)) {
869 SIMIX_comm_finish(synchro);
873 void SIMIX_comm_cancel(smx_synchro_t synchro)
875 /* if the synchro is a waiting state means that it is still in a mbox */
876 /* so remove from it and delete it */
877 if (synchro->state == SIMIX_WAITING) {
878 SIMIX_mbox_remove(synchro->comm.mbox, synchro);
879 synchro->state = SIMIX_CANCELED;
881 else if (!MC_is_active() /* when running the MC there are no surf actions */
882 && !MC_record_replay_is_active()
883 && (synchro->state == SIMIX_READY || synchro->state == SIMIX_RUNNING)) {
885 synchro->comm.surf_comm->cancel();
889 void SIMIX_comm_suspend(smx_synchro_t synchro)
891 /*FIXME: shall we suspend also the timeout synchro? */
892 if (synchro->comm.surf_comm)
893 synchro->comm.surf_comm->suspend();
894 /* in the other case, the action will be suspended on creation, in SIMIX_comm_start() */
897 void SIMIX_comm_resume(smx_synchro_t synchro)
899 /*FIXME: check what happen with the timeouts */
900 if (synchro->comm.surf_comm)
901 synchro->comm.surf_comm->resume();
902 /* in the other case, the synchro were not really suspended yet, see SIMIX_comm_suspend() and SIMIX_comm_start() */
906 /************* synchro Getters **************/
909 * \brief get the amount remaining from the communication
910 * \param synchro The communication
// In one case of the state switch this reads the remaining amount from the
// surf network action; in the others it reports 0 (both marked FIXME).
// SIMIX_comm_irecv checks comm.surf_comm before calling this function.
// NOTE(review): the embedded line numbers skip 911, 913-919, 921-922, 924-927,
// 929-931 and 933-936. The declaration of `remains`, any NULL check, the case
// labels and the final `return remains;` are not visible here.
912 double SIMIX_comm_get_remains(smx_synchro_t synchro)
920 switch (synchro->state) {
923 remains = synchro->comm.surf_comm->getRemains();
928 remains = 0; /*FIXME: check what should be returned */
932 remains = 0; /*FIXME: is this correct? */
938 e_smx_state_t SIMIX_comm_get_state(smx_synchro_t synchro)
940 return synchro->state;
944 * \brief Return the user data associated to the sender of the communication
945 * \param synchro The communication
946 * \return the user data
948 void* SIMIX_comm_get_src_data(smx_synchro_t synchro)
950 return synchro->comm.src_data;
954 * \brief Return the user data associated to the receiver of the communication
955 * \param synchro The communication
956 * \return the user data
958 void* SIMIX_comm_get_dst_data(smx_synchro_t synchro)
960 return synchro->comm.dst_data;
963 smx_process_t SIMIX_comm_get_src_proc(smx_synchro_t synchro)
965 return synchro->comm.src_proc;
968 smx_process_t SIMIX_comm_get_dst_proc(smx_synchro_t synchro)
970 return synchro->comm.dst_proc;
973 /******************************************************************************/
974 /* SIMIX_comm_copy_data callbacks */
975 /******************************************************************************/
// Process-wide default data-copy callback, initially the pointer-copy callback.
// In SIMIX_comm_copy_data it is intended as the fallback when a comm has no
// copy_data_fun of its own.
976 static void (*SIMIX_comm_copy_data_callback) (smx_synchro_t, void*, size_t) =
977 &SIMIX_comm_copy_pointer_callback;
// Replaces the default data-copy callback for every later copy.
// NOTE(review): the embedded line numbers skip 978-979, 981 and 983. Line 979
// presumably carried the `void` return type of this function.
980 SIMIX_comm_set_copy_data_callback(void (*callback) (smx_synchro_t, void*, size_t))
982 SIMIX_comm_copy_data_callback = callback;
985 void SIMIX_comm_copy_pointer_callback(smx_synchro_t comm, void* buff, size_t buff_size)
987 xbt_assert((buff_size == sizeof(void *)),
988 "Cannot copy %zu bytes: must be sizeof(void*)", buff_size);
989 *(void **) (comm->comm.dst_buff) = buff;
// Data-copy callback that memcpy's buff_size bytes from `buff` into the
// receiver's dst_buff. For a detached comm the source pointer is cleared
// afterwards, because (per the comment below) the sender passed a duplicate.
// NOTE(review): the embedded line numbers skip 993, 997 and 999-1000. Nothing
// visible releases `buff` before src_buff is reset. Presumably line 997 held an
// `xbt_free(buff);`; otherwise the duplicated buffer leaks -- confirm.
992 void SIMIX_comm_copy_buffer_callback(smx_synchro_t comm, void* buff, size_t buff_size)
994 XBT_DEBUG("Copy the data over");
995 memcpy(comm->comm.dst_buff, buff, buff_size);
996 if (comm->comm.detached) { // if this is a detached send, the source buffer was duplicated by SMPI sender to make the original buffer available to the application ASAP
998 comm->comm.src_buff = NULL;
1004 * \brief Copy the communication data from the sender's buffer to the receiver's one
1005 * \param comm The communication
// What the visible code does:
//   - copies at most *dst_buff_size bytes (when a size pointer is given) and
//     writes the copied size back through dst_buff_size;
//   - uses the comm's own copy_data_fun if set, with the global
//     SIMIX_comm_copy_data_callback as the intended fallback;
//   - marks the comm as copied, so a second call is a no-op.
// NOTE(review): the embedded line numbers skip 1006, 1008, 1012-1013, 1015,
// 1020, 1024, 1028-1029, 1032 and 1034-1036. As listed:
//   - the early-exit test guards the debug message instead of a `return;`;
//   - the debug message lacks its first (`comm`) argument;
//   - both copy functions run.
// Presumably `return;`, `comm,` and an `else` were dropped -- confirm against
// upstream.
1007 void SIMIX_comm_copy_data(smx_synchro_t comm)
1009 size_t buff_size = comm->comm.src_buff_size;
1010 /* If there is no data to be copied then return */
1011 if (!comm->comm.src_buff || !comm->comm.dst_buff || comm->comm.copied)
1014 XBT_DEBUG("Copying comm %p data from %s (%p) -> %s (%p) (%zu bytes)",
1016 comm->comm.src_proc ? sg_host_get_name(comm->comm.src_proc->host) : "a finished process",
1017 comm->comm.src_buff,
1018 comm->comm.dst_proc ? sg_host_get_name(comm->comm.dst_proc->host) : "a finished process",
1019 comm->comm.dst_buff, buff_size);
1021 /* Copy at most dst_buff_size bytes of the message to receiver's buffer */
1022 if (comm->comm.dst_buff_size)
1023 buff_size = MIN(buff_size, *(comm->comm.dst_buff_size));
1025 /* Update the receiver's buffer size to the copied amount */
1026 if (comm->comm.dst_buff_size)
1027 *comm->comm.dst_buff_size = buff_size;
// A per-comm copy function takes precedence over the global default callback.
1030 if(comm->comm.copy_data_fun)
1031 comm->comm.copy_data_fun (comm, comm->comm.src_buff, buff_size);
1033 SIMIX_comm_copy_data_callback (comm, comm->comm.src_buff, buff_size);
1037 /* Set the copied flag so we copy data only once */
1038 /* (this function might be called from both communication ends) */
1039 comm->comm.copied = 1;