1 /* Copyright (c) 2009-2015. The SimGrid Team.
2 * All rights reserved. */
4 /* This program is free software; you can redistribute it and/or modify it
5 * under the terms of the license (GNU LGPL) which comes with this package. */
7 #include "src/surf/surf_interface.hpp"
8 #include "src/simix/smx_private.h"
11 #include "src/mc/mc_replay.h"
13 #include "simgrid/s4u/mailbox.hpp"
/* Declares simix_network (a sub-category of simix) as this file's default log category. */
15 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(simix_network, simix, "SIMIX network-related synchronization");
/* Declared ahead of its definition because the initializer just below needs it. */
17 static void SIMIX_mbox_free(void *data);
/* Registry of mailboxes, keyed by mailbox name (entries are added in SIMIX_mbox_create).
 * SIMIX_mbox_free is handed to the dictionary at construction, presumably as its
 * per-element free callback -- confirm against the xbt_dict API. */
18 static xbt_dict_t mailboxes = xbt_dict_new_homogeneous(SIMIX_mbox_free);
/* Forward declarations of helpers defined later in this file. */
20 static void SIMIX_waitany_remove_simcall_from_actions(smx_simcall_t simcall);
21 static void SIMIX_comm_copy_data(smx_synchro_t comm);
22 static smx_synchro_t SIMIX_comm_new(e_smx_comm_type_t type);
23 static inline void SIMIX_mbox_push(smx_mailbox_t mbox, smx_synchro_t comm);
24 static smx_synchro_t SIMIX_fifo_probe_comm(xbt_fifo_t fifo, e_smx_comm_type_t type,
25 int (*match_fun)(void *, void *,smx_synchro_t),
26 void *user_data, smx_synchro_t my_synchro);
27 static smx_synchro_t SIMIX_fifo_get_comm(xbt_fifo_t fifo, e_smx_comm_type_t type,
28 int (*match_fun)(void *, void *,smx_synchro_t),
29 void *user_data, smx_synchro_t my_synchro);
30 static void SIMIX_comm_start(smx_synchro_t synchro);
/* Tears down the mailbox registry by freeing the file-scope `mailboxes` dictionary. */
32 void SIMIX_mailbox_exit(void)
34 xbt_dict_free(&mailboxes);
37 /******************************************************************************/
38 /* Rendez-Vous Points */
39 /******************************************************************************/
/*
 * Builds the mailbox called `name` and registers it in the `mailboxes` dictionary.
 * `name` must be non-NULL (asserted). The dictionary is consulted first because,
 * as noted below, two processes may have requested the same mailbox at the same time.
 * NOTE(review): presumably the allocation below only runs when that lookup comes
 * back empty, and the (new or existing) mailbox is returned -- confirm.
 */
41 smx_mailbox_t SIMIX_mbox_create(const char *name)
43 xbt_assert(name, "Mailboxes must have a name");
44 /* two processes may have pushed the same mbox_create simcall at the same time */
45 smx_mailbox_t mbox = (smx_mailbox_t) xbt_dict_get_or_null(mailboxes, name);
/* Fresh mailbox: private copy of the name, two empty fifos, no permanent receiver. */
48 mbox = xbt_new0(s_smx_mailbox_t, 1);
49 mbox->name = xbt_strdup(name);
50 mbox->comm_fifo = xbt_fifo_new();
51 mbox->done_comm_fifo = xbt_fifo_new();
52 mbox->permanent_receiver=NULL;
54 XBT_DEBUG("Creating a mailbox at %p with name %s", mbox, name);
/* The key is the mailbox's own copy of the name, not the caller's string. */
55 xbt_dict_set(mailboxes, mbox->name, mbox, NULL);
/*
 * Callback registered with the `mailboxes` dictionary when it is created.
 * `data` is treated as a mailbox and both of its fifos are freed.
 * NOTE(review): SIMIX_mbox_create also allocates the struct and a copy of the
 * name -- confirm they are released here as well.
 */
60 void SIMIX_mbox_free(void *data)
62 XBT_DEBUG("mbox free %p", data);
63 smx_mailbox_t mbox = (smx_mailbox_t) data;
65 xbt_fifo_free(mbox->comm_fifo);
66 xbt_fifo_free(mbox->done_comm_fifo);
/* Looks `name` up in the `mailboxes` dictionary and returns whatever
 * xbt_dict_get_or_null() yields, cast to a mailbox. Nothing is created on a miss. */
71 smx_mailbox_t SIMIX_mbox_get_by_name(const char *name)
73 return (smx_mailbox_t) xbt_dict_get_or_null(mailboxes, name);
/* Returns the content of the first item of mbox->comm_fifo, cast to a synchro.
 * Only fifo getters are called here; the queue itself is not modified by this code. */
76 smx_synchro_t SIMIX_mbox_get_head(smx_mailbox_t mbox)
78 return (smx_synchro_t) xbt_fifo_get_item_content(
79 xbt_fifo_get_first_item(mbox->comm_fifo));
83 * \brief get the receiver (process associated to the mailbox)
84 * \param mbox The rendez-vous point
85 * \return process The receiving process (NULL if not set)
/* Plain accessor: reads mbox->permanent_receiver. `mbox` is dereferenced unchecked. */
87 smx_process_t SIMIX_mbox_get_receiver(smx_mailbox_t mbox)
89 return mbox->permanent_receiver;
93 * \brief set the receiver of the rendez vous point to allow eager sends
94 * \param mbox The rendez-vous point
95 * \param process The receiving process
/* Plain setter: overwrites mbox->permanent_receiver with `process`.
 * The send and receive paths in this file test that field to pick the eager-send route. */
97 void SIMIX_mbox_set_receiver(smx_mailbox_t mbox, smx_process_t process)
99 mbox->permanent_receiver=process;
103 * \brief Pushes a communication synchro into a rendez-vous point
104 * \param mbox The mailbox
105 * \param comm The communication synchro
/* Appends `comm` to mbox->comm_fifo and stores the mailbox in comm->comm.mbox
 * so the synchro knows where it is queued. */
107 static inline void SIMIX_mbox_push(smx_mailbox_t mbox, smx_synchro_t comm)
109 xbt_fifo_push(mbox->comm_fifo, comm);
110 comm->comm.mbox = mbox;
114 * \brief Removes a communication synchro from a rendez-vous point
115 * \param mbox The rendez-vous point
116 * \param comm The communication synchro
/* Counterpart of SIMIX_mbox_push: takes `comm` out of mbox->comm_fifo and clears
 * its comm.mbox back-reference. Only comm_fifo is touched, not done_comm_fifo. */
118 void SIMIX_mbox_remove(smx_mailbox_t mbox, smx_synchro_t comm)
120 xbt_fifo_remove(mbox->comm_fifo, comm);
121 comm->comm.mbox = NULL;
125 * \brief Looks for a communication synchro queued in a fifo matching our needs and, if one is found, removes it from the fifo
126 * \param type The type of communication we are looking for (comm_send, comm_recv)
127 * \return The communication synchro if found, NULL otherwise
/*
 * Walks `fifo` looking for a queued synchro of kind `type` that both sides accept:
 *  - match_fun (the caller's filter, optional) is called with
 *    (this_user_data, other_user_data, synchro);
 *  - the queued synchro's own comm.match_fun (optional) is called with
 *    (other_user_data, this_user_data, my_synchro).
 * On a match, the item is unlinked from the fifo and freed, the synchro's refcount
 * is incremented, and its mailbox pointer is moved from comm.mbox to comm.mbox_cpy.
 */
129 smx_synchro_t SIMIX_fifo_get_comm(xbt_fifo_t fifo, e_smx_comm_type_t type,
130 int (*match_fun)(void *, void *,smx_synchro_t),
131 void *this_user_data, smx_synchro_t my_synchro)
133 smx_synchro_t synchro;
134 xbt_fifo_item_t item;
135 void* other_user_data = NULL;
137 xbt_fifo_foreach(fifo, item, synchro, smx_synchro_t) {
/* The peer's user data is the src_data of a queued send, or the dst_data of a queued receive.
 * For any other comm type the variable keeps its previous value. */
138 if (synchro->comm.type == SIMIX_COMM_SEND) {
139 other_user_data = synchro->comm.src_data;
140 } else if (synchro->comm.type == SIMIX_COMM_RECEIVE) {
141 other_user_data = synchro->comm.dst_data;
/* Right kind of comm, and neither filter (when present) rejects the pairing. */
143 if (synchro->comm.type == type &&
144 (!match_fun || match_fun(this_user_data, other_user_data, synchro)) &&
145 (!synchro->comm.match_fun || synchro->comm.match_fun(other_user_data, this_user_data, my_synchro))) {
146 XBT_DEBUG("Found a matching communication synchro %p", synchro);
/* Dequeue by hand (instead of SIMIX_mbox_remove), since we already hold the fifo item. */
147 xbt_fifo_remove_item(fifo, item);
148 xbt_fifo_free_item(item);
149 synchro->comm.refcount++;
/* Keep a copy of the mailbox pointer before clearing the "still queued" marker. */
151 synchro->comm.mbox_cpy = synchro->comm.mbox;
153 synchro->comm.mbox = NULL;
156 XBT_DEBUG("Sorry, communication synchro %p does not match our needs:"
157 " its type is %d but we are looking for a comm of type %d (or maybe the filtering didn't match)",
158 synchro, (int)synchro->comm.type, (int)type);
160 XBT_DEBUG("No matching communication synchro found");
166 * \brief Checks if there is a communication synchro queued in a fifo matching our needs, but leaves it there
167 * \param type The type of communication we are looking for (comm_send, comm_recv)
168 * \return The communication synchro if found, NULL otherwise
/*
 * Non-destructive variant of SIMIX_fifo_get_comm: same walk and same two-sided
 * filtering, but a matching synchro stays in `fifo`.
 * Its refcount is still incremented here; SIMIX_comm_iprobe decrements it again.
 */
170 smx_synchro_t SIMIX_fifo_probe_comm(xbt_fifo_t fifo, e_smx_comm_type_t type,
171 int (*match_fun)(void *, void *,smx_synchro_t),
172 void *this_user_data, smx_synchro_t my_synchro)
174 smx_synchro_t synchro;
175 xbt_fifo_item_t item;
176 void* other_user_data = NULL;
178 xbt_fifo_foreach(fifo, item, synchro, smx_synchro_t) {
/* The peer's user data is the src_data of a queued send, or the dst_data of a queued receive. */
179 if (synchro->comm.type == SIMIX_COMM_SEND) {
180 other_user_data = synchro->comm.src_data;
181 } else if (synchro->comm.type == SIMIX_COMM_RECEIVE) {
182 other_user_data = synchro->comm.dst_data;
/* Right kind of comm, and neither filter (when present) rejects the pairing. */
184 if (synchro->comm.type == type &&
185 (!match_fun || match_fun(this_user_data, other_user_data, synchro)) &&
186 (!synchro->comm.match_fun || synchro->comm.match_fun(other_user_data, this_user_data, my_synchro))) {
187 XBT_DEBUG("Found a matching communication synchro %p", synchro);
188 synchro->comm.refcount++;
192 XBT_DEBUG("Sorry, communication synchro %p does not match our needs:"
193 " its type is %d but we are looking for a comm of type %d (or maybe the filtering didn't match)",
194 synchro, (int)synchro->comm.type, (int)type);
196 XBT_DEBUG("No matching communication synchro found");
199 /******************************************************************************/
200 /* Communication synchros */
201 /******************************************************************************/
204 * \brief Creates a new communicate synchro
205 * \param type The direction of communication (comm_send, comm_recv)
206 * \return The new communicate synchro
/*
 * Takes a synchro from simix_global->synchro_mallocator and sets it up as a
 * communication of the given `type`: state SIMIX_WAITING, refcount 1, no user data
 * on either side, no category.
 * NOTE(review): only the fields assigned below are set in the visible code; the
 * remaining comm fields rely on whatever state the mallocator hands back -- confirm.
 */
208 smx_synchro_t SIMIX_comm_new(e_smx_comm_type_t type)
210 smx_synchro_t synchro;
212 /* alloc structures */
213 synchro = (smx_synchro_t) xbt_mallocator_get(simix_global->synchro_mallocator);
215 synchro->type = SIMIX_SYNC_COMMUNICATE;
216 synchro->state = SIMIX_WAITING;
218 /* set communication */
219 synchro->comm.type = type;
/* The creator holds the first reference. */
220 synchro->comm.refcount = 1;
221 synchro->comm.src_data=NULL;
222 synchro->comm.dst_data=NULL;
224 synchro->category = NULL;
226 XBT_DEBUG("Create communicate synchro %p", synchro);
232 * \brief Destroy a communicate synchro
233 * \param synchro The communicate synchro to be destroyed
/*
 * Drops one reference on `synchro`. Dies, after printing a backtrace, if the
 * refcount is already <= 0 on entry.
 * The teardown sequence further down frees the name, unrefs the surf actions,
 * runs clean_fun on the source buffer of a detached comm that did not reach
 * SIMIX_DONE, removes the synchro from its mailbox if it is still queued, and
 * gives the synchro back to the mallocator.
 * NOTE(review): presumably the function returns early while references remain
 * (see the refcount > 0 test) -- confirm.
 */
235 void SIMIX_comm_destroy(smx_synchro_t synchro)
237 XBT_DEBUG("Destroy synchro %p (refcount: %d), state: %d",
238 synchro, synchro->comm.refcount, (int)synchro->state);
/* Over-release guard: a refcount that is already non-positive is treated as fatal. */
240 if (synchro->comm.refcount <= 0) {
241 xbt_backtrace_display_current();
242 xbt_die("The refcount of comm %p is already 0 before decreasing it. "
243 "That's a bug! If you didn't test and/or wait the same communication twice in your code, then the bug is SimGrid's...", synchro);
245 synchro->comm.refcount--;
246 if (synchro->comm.refcount > 0)
248 XBT_DEBUG("Really free communication %p; refcount is now %d", synchro,
249 synchro->comm.refcount);
251 xbt_free(synchro->name);
252 SIMIX_comm_destroy_internal_actions(synchro);
254 if (synchro->comm.detached && synchro->state != SIMIX_DONE) {
255 /* the communication has failed and was detached:
256 * we have to free the buffer */
257 if (synchro->comm.clean_fun) {
258 synchro->comm.clean_fun(synchro->comm.src_buff);
260 synchro->comm.src_buff = NULL;
/* A non-NULL comm.mbox means the synchro is still queued in that mailbox. */
263 if(synchro->comm.mbox)
264 SIMIX_mbox_remove(synchro->comm.mbox, synchro);
266 xbt_mallocator_release(simix_global->synchro_mallocator, synchro);
/*
 * Releases the three surf actions a communication may own (the transfer itself and
 * one timeout per side). Each is unref'ed only if set, then its pointer is cleared,
 * so calling this function twice on the same synchro is harmless.
 */
269 void SIMIX_comm_destroy_internal_actions(smx_synchro_t synchro)
271 if (synchro->comm.surf_comm){
272 synchro->comm.surf_comm->unref();
273 synchro->comm.surf_comm = NULL;
276 if (synchro->comm.src_timeout){
277 synchro->comm.src_timeout->unref();
278 synchro->comm.src_timeout = NULL;
281 if (synchro->comm.dst_timeout){
282 synchro->comm.dst_timeout->unref();
283 synchro->comm.dst_timeout = NULL;
/*
 * Blocking send: an asynchronous send immediately followed by a wait on the
 * resulting comm with `timeout`.
 * The isend is issued with a NULL clean_fun, and the simcall's MC value is set to 0
 * before delegating to the wait handler.
 */
287 void simcall_HANDLER_comm_send(smx_simcall_t simcall, smx_process_t src, smx_mailbox_t mbox,
288 double task_size, double rate,
289 void *src_buff, size_t src_buff_size,
290 int (*match_fun)(void *, void *,smx_synchro_t),
291 void (*copy_data_fun)(smx_synchro_t, void*, size_t),
292 void *data, double timeout){
293 smx_synchro_t comm = simcall_HANDLER_comm_isend(simcall, src, mbox, task_size, rate,
294 src_buff, src_buff_size, match_fun, NULL, copy_data_fun,
296 SIMCALL_SET_MC_VALUE(simcall, 0);
297 simcall_HANDLER_comm_wait(simcall, comm, timeout);
/*
 * Asynchronous send on `mbox`.
 * A descriptor synchro is created for this send and mbox->comm_fifo is searched
 * for a matching queued receive. With no match, the descriptor itself becomes the
 * communication and is queued. With a match, the descriptor is destroyed and the
 * queued receive is promoted to SIMIX_READY.
 * The sender-side fields are then filled in and the communication is started.
 * Under model checking or replay the state is forced to SIMIX_RUNNING instead.
 * Returns NULL for a detached send, the communication synchro otherwise.
 */
299 smx_synchro_t simcall_HANDLER_comm_isend(smx_simcall_t simcall, smx_process_t src_proc, smx_mailbox_t mbox,
300 double task_size, double rate,
301 void *src_buff, size_t src_buff_size,
302 int (*match_fun)(void *, void *,smx_synchro_t),
303 void (*clean_fun)(void *), // used to free the synchro in case of problem after a detached send
304 void (*copy_data_fun)(smx_synchro_t, void*, size_t),// used to copy data if not default one
305 void *data, int detached)
307 XBT_DEBUG("send from %p", mbox);
309 /* Prepare a synchro describing us, so that it gets passed to the user-provided filter of other side */
310 smx_synchro_t this_synchro = SIMIX_comm_new(SIMIX_COMM_SEND);
312 /* Look for communication synchro matching our needs. We also provide a description of
313 * ourself so that the other side also gets a chance of choosing if it wants to match with us.
315 * If it is not found then push our communication into the rendez-vous point */
316 smx_synchro_t other_synchro = SIMIX_fifo_get_comm(mbox->comm_fifo, SIMIX_COMM_RECEIVE, match_fun, data, this_synchro);
/* No receive is waiting: our own descriptor becomes the communication. */
318 if (!other_synchro) {
319 other_synchro = this_synchro;
/* Eager path: the mailbox has a permanent receiver, so the comm is marked READY right
 * away with that receiver as destination, takes an extra reference, and is parked in
 * done_comm_fifo (not comm_fifo) for a later receive to pick up. */
321 if (mbox->permanent_receiver!=NULL){
322 //this mailbox is for small messages, which have to be sent right now
323 other_synchro->state = SIMIX_READY;
324 other_synchro->comm.dst_proc=mbox->permanent_receiver;
325 other_synchro->comm.refcount++;
326 xbt_fifo_push(mbox->done_comm_fifo,other_synchro);
327 other_synchro->comm.mbox=mbox;
328 XBT_DEBUG("pushing a message into the permanent receive fifo %p, comm %p", mbox, &(other_synchro->comm));
/* Regular path: queue the send in the mailbox until a receive shows up. */
331 SIMIX_mbox_push(mbox, this_synchro);
/* A receive was already queued: our descriptor is no longer needed, and the queued
 * synchro now has both ends. */
334 XBT_DEBUG("Receive already pushed");
336 SIMIX_comm_destroy(this_synchro);
338 other_synchro->state = SIMIX_READY;
339 other_synchro->comm.type = SIMIX_COMM_READY;
/* Track the comm in the sender's list; SIMIX_comm_finish removes it again. */
342 xbt_fifo_push(src_proc->comms, other_synchro);
344 /* if the communication synchro is detached then decrease the refcount
345 * by one, so it will be eliminated by the receiver's destroy call */
347 other_synchro->comm.detached = 1;
348 other_synchro->comm.refcount--;
349 other_synchro->comm.clean_fun = clean_fun;
351 other_synchro->comm.clean_fun = NULL;
354 /* Setup the communication synchro */
355 other_synchro->comm.src_proc = src_proc;
356 other_synchro->comm.task_size = task_size;
357 other_synchro->comm.rate = rate;
358 other_synchro->comm.src_buff = src_buff;
359 other_synchro->comm.src_buff_size = src_buff_size;
360 other_synchro->comm.src_data = data;
/* The comm now carries the sender's filter and copy callback. */
362 other_synchro->comm.match_fun = match_fun;
363 other_synchro->comm.copy_data_fun = copy_data_fun;
/* Model checking / replay: no surf action is started, the state is simply forced. */
366 if (MC_is_active() || MC_record_replay_is_active()) {
367 other_synchro->state = SIMIX_RUNNING;
368 return (detached ? NULL : other_synchro);
/* Starts the transfer only if the comm is READY (see SIMIX_comm_start). */
371 SIMIX_comm_start(other_synchro);
372 return (detached ? NULL : other_synchro);
/*
 * Blocking receive: posts the receive through SIMIX_comm_irecv, sets the simcall's
 * MC value to 0, then delegates to the wait handler with `timeout`.
 */
375 void simcall_HANDLER_comm_recv(smx_simcall_t simcall, smx_process_t receiver, smx_mailbox_t mbox,
376 void *dst_buff, size_t *dst_buff_size,
377 int (*match_fun)(void *, void *, smx_synchro_t),
378 void (*copy_data_fun)(smx_synchro_t, void*, size_t),
379 void *data, double timeout, double rate)
381 smx_synchro_t comm = SIMIX_comm_irecv(receiver, mbox, dst_buff,
382 dst_buff_size, match_fun, copy_data_fun, data, rate);
383 SIMCALL_SET_MC_VALUE(simcall, 0);
384 simcall_HANDLER_comm_wait(simcall, comm, timeout);
/* Simcall entry point for an asynchronous receive: forwards every argument except
 * `simcall` to SIMIX_comm_irecv and returns its result. */
387 smx_synchro_t simcall_HANDLER_comm_irecv(smx_simcall_t simcall, smx_process_t receiver, smx_mailbox_t mbox,
388 void *dst_buff, size_t *dst_buff_size,
389 int (*match_fun)(void *, void *, smx_synchro_t),
390 void (*copy_data_fun)(smx_synchro_t, void*, size_t),
391 void *data, double rate)
393 return SIMIX_comm_irecv(receiver, mbox, dst_buff, dst_buff_size, match_fun, copy_data_fun, data, rate);
/*
 * Asynchronous receive on `mbox` on behalf of dst_proc.
 * When the mailbox has a permanent receiver and done_comm_fifo is not empty, that
 * fifo is searched for a matching send. The other search below looks in comm_fifo.
 * If a search finds nothing, the receive's own descriptor is queued in the mailbox.
 * The receiver-side fields are then filled in and the communication is started.
 * Under model checking or replay the state is forced to SIMIX_RUNNING instead.
 * Returns the synchro that represents the communication.
 */
396 smx_synchro_t SIMIX_comm_irecv(smx_process_t dst_proc, smx_mailbox_t mbox, void *dst_buff, size_t *dst_buff_size,
397 int (*match_fun)(void *, void *, smx_synchro_t),
398 void (*copy_data_fun)(smx_synchro_t, void*, size_t), // used to copy data if not default one
399 void *data, double rate)
401 XBT_DEBUG("recv from %p %p", mbox, mbox->comm_fifo);
/* Descriptor of this receive; it is what the peers' match functions get to inspect. */
402 smx_synchro_t this_synchro = SIMIX_comm_new(SIMIX_COMM_RECEIVE);
404 smx_synchro_t other_synchro;
405 //communication already done, get it inside the fifo of completed comms
406 if (mbox->permanent_receiver && xbt_fifo_size(mbox->done_comm_fifo)!=0) {
408 XBT_DEBUG("We have a comm that has probably already been received, trying to match it, to skip the communication");
409 //find a match in the already received fifo
410 other_synchro = SIMIX_fifo_get_comm(mbox->done_comm_fifo, SIMIX_COMM_SEND, match_fun, data, this_synchro);
411 //if not found, assume the receiver came first, register it to the mailbox in the classical way
412 if (!other_synchro) {
413 XBT_DEBUG("We have messages in the permanent receive list, but not the one we are looking for, pushing request into fifo");
414 other_synchro = this_synchro;
415 SIMIX_mbox_push(mbox, this_synchro);
/* An eagerly-sent comm matched. If its surf action exists and has nothing left to
 * transfer, it is marked done and detached from the mailbox. */
417 if(other_synchro->comm.surf_comm && SIMIX_comm_get_remains(other_synchro)==0.0) {
418 XBT_DEBUG("comm %p has been already sent, and is finished, destroy it",&(other_synchro->comm));
419 other_synchro->state = SIMIX_DONE;
420 other_synchro->comm.type = SIMIX_COMM_DONE;
421 other_synchro->comm.mbox = NULL;
/* Drop one reference on the matched send and discard our own descriptor. */
423 other_synchro->comm.refcount--;
424 SIMIX_comm_destroy(this_synchro);
427 /* Prepare a synchro describing us, so that it gets passed to the user-provided filter of other side */
429 /* Look for communication synchro matching our needs. We also provide a description of
430 * ourself so that the other side also gets a chance of choosing if it wants to match with us.
432 * If it is not found then push our communication into the rendez-vous point */
433 other_synchro = SIMIX_fifo_get_comm(mbox->comm_fifo, SIMIX_COMM_SEND, match_fun, data, this_synchro);
/* No send is waiting: our descriptor becomes the communication and is queued. */
435 if (!other_synchro) {
436 XBT_DEBUG("Receive pushed first %d", xbt_fifo_size(mbox->comm_fifo));
437 other_synchro = this_synchro;
438 SIMIX_mbox_push(mbox, this_synchro);
/* A send was queued: discard our descriptor; the queued synchro now has both ends. */
440 SIMIX_comm_destroy(this_synchro);
441 other_synchro->state = SIMIX_READY;
442 other_synchro->comm.type = SIMIX_COMM_READY;
443 //other_synchro->comm.refcount--;
/* Track the comm in the receiver's list; SIMIX_comm_finish removes it again. */
445 xbt_fifo_push(dst_proc->comms, other_synchro);
448 /* Setup communication synchro */
449 other_synchro->comm.dst_proc = dst_proc;
450 other_synchro->comm.dst_buff = dst_buff;
451 other_synchro->comm.dst_buff_size = dst_buff_size;
452 other_synchro->comm.dst_data = data;
/* -1.0 means "no rate requested". Otherwise the receiver's rate replaces the current
 * one when none is set yet or when the receiver's is lower. */
454 if (rate != -1.0 && (other_synchro->comm.rate == -1.0 || rate < other_synchro->comm.rate))
455 other_synchro->comm.rate = rate;
/* The receiver's filter and copy callback overwrite those stored on the comm. */
457 other_synchro->comm.match_fun = match_fun;
458 other_synchro->comm.copy_data_fun = copy_data_fun;
/* Model checking / replay: no surf action is started, the state is simply forced. */
460 if (MC_is_active() || MC_record_replay_is_active()) {
461 other_synchro->state = SIMIX_RUNNING;
462 return other_synchro;
/* Starts the transfer only if the comm is READY (see SIMIX_comm_start). */
465 SIMIX_comm_start(other_synchro);
466 return other_synchro;
/* Simcall entry point for a probe: forwards to SIMIX_comm_iprobe, using the
 * simcall's issuer as the probing process. */
469 smx_synchro_t simcall_HANDLER_comm_iprobe(smx_simcall_t simcall, smx_mailbox_t mbox,
470 int type, int src, int tag,
471 int (*match_fun)(void *, void *, smx_synchro_t),
473 return SIMIX_comm_iprobe(simcall->issuer, mbox, type, src, tag, match_fun, data);
/*
 * Checks whether `mbox` holds a communication matching `match_fun`/`data`, without
 * taking it out of the mailbox.
 * A throw-away descriptor of the opposite kind is built so that the peers' filters
 * have something to inspect. done_comm_fifo is probed when the mailbox has a
 * permanent receiver and that fifo is not empty; comm_fifo is probed as well.
 * The reference taken by SIMIX_fifo_probe_comm is given back before returning, so
 * the caller does not own a reference on the returned synchro.
 * `dst_proc`, `src` and `tag` are not used by the visible code.
 * NOTE(review): the choice between the two descriptor kinds presumably depends on
 * `type` -- confirm which value selects which.
 */
476 smx_synchro_t SIMIX_comm_iprobe(smx_process_t dst_proc, smx_mailbox_t mbox, int type, int src,
477 int tag, int (*match_fun)(void *, void *, smx_synchro_t), void *data)
479 XBT_DEBUG("iprobe from %p %p", mbox, mbox->comm_fifo);
480 smx_synchro_t this_synchro;
/* Present ourselves as a sender and look for receives... */
483 this_synchro=SIMIX_comm_new(SIMIX_COMM_SEND);
484 smx_type = SIMIX_COMM_RECEIVE;
/* ...or as a receiver and look for sends. */
486 this_synchro=SIMIX_comm_new(SIMIX_COMM_RECEIVE);
487 smx_type = SIMIX_COMM_SEND;
489 smx_synchro_t other_synchro=NULL;
490 if(mbox->permanent_receiver && xbt_fifo_size(mbox->done_comm_fifo)!=0){
491 //find a match in the already received fifo
492 XBT_DEBUG("first try in the perm recv mailbox");
494 other_synchro = SIMIX_fifo_probe_comm(
495 mbox->done_comm_fifo, (e_smx_comm_type_t) smx_type,
496 match_fun, data, this_synchro);
500 XBT_DEBUG("try in the normal mailbox");
501 other_synchro = SIMIX_fifo_probe_comm(
502 mbox->comm_fifo, (e_smx_comm_type_t) smx_type,
503 match_fun, data, this_synchro);
/* Undo the refcount increment done by SIMIX_fifo_probe_comm on a hit. */
506 if(other_synchro)other_synchro->comm.refcount--;
/* The descriptor only existed for the duration of the probe. */
508 SIMIX_comm_destroy(this_synchro);
509 return other_synchro;
/*
 * Blocks the simcall's issuer on `synchro`.
 * The simcall is appended to synchro->simcalls and the issuer's waiting_synchro is set.
 * Under model checking or replay, the state is forced (SIMIX_DONE, or a timeout
 * attributed to the side the issuer is on) and the synchro is finished on the spot.
 * Otherwise a synchro that already left WAITING/RUNNING is finished immediately,
 * while a pending one gets a surf sleep action on the issuer's host, stored as the
 * src or dst timeout depending on which side the issuer is.
 * NOTE(review): under MC, `idx` presumably selects between the DONE and the timeout
 * outcome -- confirm.
 */
512 void simcall_HANDLER_comm_wait(smx_simcall_t simcall, smx_synchro_t synchro, double timeout)
514 /* the simcall may be a wait, a send or a recv */
517 /* Associate this simcall to the wait synchro */
518 XBT_DEBUG("simcall_HANDLER_comm_wait, %p", synchro);
520 xbt_fifo_push(synchro->simcalls, simcall);
521 simcall->issuer->waiting_synchro = synchro;
523 if (MC_is_active() || MC_record_replay_is_active()) {
524 int idx = SIMCALL_GET_MC_VALUE(simcall);
526 synchro->state = SIMIX_DONE;
528 /* If we reached this point, the wait simcall must have a timeout */
529 /* Otherwise it shouldn't be enabled and executed by the MC */
/* The timeout is blamed on whichever end of the comm the issuer is. */
533 if (synchro->comm.src_proc == simcall->issuer)
534 synchro->state = SIMIX_SRC_TIMEOUT;
536 synchro->state = SIMIX_DST_TIMEOUT;
539 SIMIX_comm_finish(synchro);
543 /* If the synchro has already finished, perform the error handling, */
544 /* otherwise set up a waiting timeout on the right side */
545 if (synchro->state != SIMIX_WAITING && synchro->state != SIMIX_RUNNING) {
546 SIMIX_comm_finish(synchro);
547 } else { /* if (timeout >= 0) { we need a surf sleep action even when there is no timeout, otherwise surf won't tell us when the host fails */
548 sleep = surf_host_sleep(simcall->issuer->host, timeout);
/* Lets the surf action be traced back to this synchro. */
549 sleep->setData(synchro);
551 if (simcall->issuer == synchro->comm.src_proc)
552 synchro->comm.src_timeout = sleep;
554 synchro->comm.dst_timeout = sleep;
/*
 * Non-blocking completion test on `synchro`; the boolean outcome is stored as the
 * simcall's result.
 * Under model checking or replay, the comm counts as complete once both src_proc
 * and dst_proc are set, and its state is then forced to SIMIX_DONE.
 * Otherwise it counts as complete when its state is neither WAITING nor RUNNING.
 * A complete comm gets the simcall attached and is finished through SIMIX_comm_finish.
 * The visible code also contains direct SIMIX_simcall_answer calls.
 * NOTE(review): presumably those are the "not complete" branches -- confirm.
 */
558 void simcall_HANDLER_comm_test(smx_simcall_t simcall, smx_synchro_t synchro)
560 if(MC_is_active() || MC_record_replay_is_active()){
561 simcall_comm_test__set__result(simcall, synchro->comm.src_proc && synchro->comm.dst_proc);
562 if(simcall_comm_test__get__result(simcall)){
563 synchro->state = SIMIX_DONE;
564 xbt_fifo_push(synchro->simcalls, simcall);
565 SIMIX_comm_finish(synchro);
567 SIMIX_simcall_answer(simcall);
572 simcall_comm_test__set__result(simcall, (synchro->state != SIMIX_WAITING && synchro->state != SIMIX_RUNNING));
573 if (simcall_comm_test__get__result(simcall)) {
574 xbt_fifo_push(synchro->simcalls, simcall);
575 SIMIX_comm_finish(synchro);
577 SIMIX_simcall_answer(simcall);
/*
 * Non-blocking test over a set of comms. The simcall's result starts at -1 and is
 * overwritten with the index of a comm that gets finished here.
 * Under model checking or replay, the index comes from the simcall's MC value and
 * that comm is forced to SIMIX_DONE.
 * Otherwise the simcall's comm list is scanned for a synchro whose state is neither
 * WAITING nor RUNNING.
 * NOTE(review): presumably the scan stops at the first finished comm, and the direct
 * SIMIX_simcall_answer calls cover the "nothing finished" cases -- confirm.
 */
581 void simcall_HANDLER_comm_testany(smx_simcall_t simcall, xbt_dynar_t synchros)
584 smx_synchro_t synchro;
585 simcall_comm_testany__set__result(simcall, -1);
587 if (MC_is_active() || MC_record_replay_is_active()){
588 int idx = SIMCALL_GET_MC_VALUE(simcall);
590 SIMIX_simcall_answer(simcall);
592 synchro = xbt_dynar_get_as(synchros, idx, smx_synchro_t);
593 simcall_comm_testany__set__result(simcall, idx);
594 xbt_fifo_push(synchro->simcalls, simcall);
595 synchro->state = SIMIX_DONE;
596 SIMIX_comm_finish(synchro);
/* Regular mode: iterates the list stored in the simcall rather than the `synchros` argument. */
601 xbt_dynar_foreach(simcall_comm_testany__get__comms(simcall), cursor,synchro) {
602 if (synchro->state != SIMIX_WAITING && synchro->state != SIMIX_RUNNING) {
603 simcall_comm_testany__set__result(simcall, cursor);
604 xbt_fifo_push(synchro->simcalls, simcall);
605 SIMIX_comm_finish(synchro);
609 SIMIX_simcall_answer(simcall);
/*
 * Blocks the issuer until one comm of `synchros` completes.
 * Under model checking or replay, the comm at the index given by the simcall's MC
 * value is forced to SIMIX_DONE and finished, with that index as the result.
 * Otherwise the simcall is attached to the synchros as the loop visits them, and a
 * synchro whose state is neither WAITING nor RUNNING is finished on the spot.
 * SIMIX_comm_finish detaches a WAITANY simcall from all of its synchros.
 * NOTE(review): presumably the loop stops after finishing one synchro -- confirm.
 */
612 void simcall_HANDLER_comm_waitany(smx_simcall_t simcall, xbt_dynar_t synchros)
614 smx_synchro_t synchro;
615 unsigned int cursor = 0;
617 if (MC_is_active() || MC_record_replay_is_active()){
618 int idx = SIMCALL_GET_MC_VALUE(simcall);
619 synchro = xbt_dynar_get_as(synchros, idx, smx_synchro_t);
620 xbt_fifo_push(synchro->simcalls, simcall);
621 simcall_comm_waitany__set__result(simcall, idx);
622 synchro->state = SIMIX_DONE;
623 SIMIX_comm_finish(synchro);
627 xbt_dynar_foreach(synchros, cursor, synchro){
628 /* associate this simcall to the synchro */
629 xbt_fifo_push(synchro->simcalls, simcall);
631 /* see if the synchro is already finished */
632 if (synchro->state != SIMIX_WAITING && synchro->state != SIMIX_RUNNING){
633 SIMIX_comm_finish(synchro);
/* Detaches a waitany simcall from every synchro in its comm list by removing it
 * from each synchro's `simcalls` fifo. Called from SIMIX_comm_finish. */
639 void SIMIX_waitany_remove_simcall_from_actions(smx_simcall_t simcall)
641 smx_synchro_t synchro;
642 unsigned int cursor = 0;
643 xbt_dynar_t synchros = simcall_comm_waitany__get__comms(simcall);
645 xbt_dynar_foreach(synchros, cursor, synchro) {
646 xbt_fifo_remove(synchro->simcalls, simcall);
651 * \brief Starts the simulation of a communication synchro.
652 * \param synchro the communication synchro
/*
 * Launches the surf network action of `synchro`. Nothing happens unless the state
 * is SIMIX_READY, i.e. both ends are known.
 * The action is created between the hosts of src_proc and dst_proc with the comm's
 * task_size and rate, tagged with the synchro, and the state becomes SIMIX_RUNNING.
 * If the action is already failed, the state becomes SIMIX_LINK_FAILURE and the
 * surf actions are released.
 * If either process is suspended, the new action is suspended as well.
 * NOTE(review): the suspend call sits after the failure handling, which clears
 * surf_comm -- confirm the two paths are mutually exclusive.
 */
654 static inline void SIMIX_comm_start(smx_synchro_t synchro)
656 /* If both the sender and the receiver are already there, start the communication */
657 if (synchro->state == SIMIX_READY) {
659 sg_host_t sender = synchro->comm.src_proc->host;
660 sg_host_t receiver = synchro->comm.dst_proc->host;
662 XBT_DEBUG("Starting communication %p from '%s' to '%s'", synchro,
663 sg_host_get_name(sender), sg_host_get_name(receiver));
665 synchro->comm.surf_comm = surf_network_model_communicate(surf_network_model,
667 synchro->comm.task_size, synchro->comm.rate);
/* Lets the surf action be traced back to this synchro. */
669 synchro->comm.surf_comm->setData(synchro);
671 synchro->state = SIMIX_RUNNING;
673 /* If a link is failed, detect it immediately */
674 if (synchro->comm.surf_comm->getState() == simgrid::surf::Action::State::failed) {
675 XBT_DEBUG("Communication from '%s' to '%s' failed to start because of a link failure",
676 sg_host_get_name(sender), sg_host_get_name(receiver));
677 synchro->state = SIMIX_LINK_FAILURE;
678 SIMIX_comm_destroy_internal_actions(synchro);
681 /* If any of the processes is suspended, create the synchro but stop its execution,
682 it will be restarted when the sender process resumes */
683 if (SIMIX_process_is_suspended(synchro->comm.src_proc) ||
684 SIMIX_process_is_suspended(synchro->comm.dst_proc)) {
685 /* FIXME: check what should happen with the synchro state */
687 if (SIMIX_process_is_suspended(synchro->comm.src_proc))
688 XBT_DEBUG("The communication is suspended on startup because src (%s:%s) were suspended since it initiated the communication",
689 sg_host_get_name(synchro->comm.src_proc->host), synchro->comm.src_proc->name);
691 XBT_DEBUG("The communication is suspended on startup because dst (%s:%s) were suspended since it initiated the communication",
692 sg_host_get_name(synchro->comm.dst_proc->host), synchro->comm.dst_proc->name);
694 synchro->comm.surf_comm->suspend();
701 * \brief Answers the SIMIX simcalls associated to a communication synchro.
702 * \param synchro a finished communication synchro
/*
 * Drains synchro->simcalls. Simcalls whose call is SIMCALL_NONE are skipped. For
 * every other simcall:
 *  - a WAITANY simcall is detached from its other synchros and, outside MC/replay,
 *    gets the position of `synchro` in its comm list as result;
 *  - the synchro is removed from its mailbox if it is still queued;
 *  - the outcome is derived from the issuer's host status and synchro->state: the
 *    data is copied when the debug message reports completion, otherwise an
 *    exception is posted to the issuer and/or the issuer is flagged to die;
 *  - the synchro is removed from the issuer's comm list (and, for a detached comm,
 *    from the peer's), and the simcall is answered.
 * The trailing loop then calls SIMIX_comm_destroy `destroy_count` times.
 * NOTE(review): destroy_count starts at 0 and its increment is expected somewhere
 * in the loop -- confirm it is bumped once per answered simcall.
 */
704 void SIMIX_comm_finish(smx_synchro_t synchro)
706 unsigned int destroy_count = 0;
707 smx_simcall_t simcall;
709 while ((simcall = (smx_simcall_t) xbt_fifo_shift(synchro->simcalls))) {
711 /* If a waitany simcall is waiting for this synchro to finish, then remove
712 it from the other synchros in the waitany list. Afterwards, get the
713 position of the actual synchro in the waitany dynar and
714 return it as the result of the simcall */
716 if (simcall->call == SIMCALL_NONE) //FIXME: maybe a better way to handle this case
717 continue; // if process handling comm is killed
718 if (simcall->call == SIMCALL_COMM_WAITANY) {
719 SIMIX_waitany_remove_simcall_from_actions(simcall);
/* Under MC/replay the result was already set by the waitany handler. */
720 if (!MC_is_active() && !MC_record_replay_is_active())
721 simcall_comm_waitany__set__result(simcall, xbt_dynar_search(simcall_comm_waitany__get__comms(simcall), &synchro));
724 /* If the synchro is still in a rendez-vous point then remove from it */
725 if (synchro->comm.mbox)
726 SIMIX_mbox_remove(synchro->comm.mbox, synchro);
728 XBT_DEBUG("SIMIX_comm_finish: synchro state = %d", (int)synchro->state);
730 /* Check out for errors */
/* A failed issuer host is checked before the synchro state is examined. */
732 if (simcall->issuer->host->isOff()) {
733 simcall->issuer->context->iwannadie = 1;
734 SMX_EXCEPTION(simcall->issuer, host_error, 0, "Host failed");
737 switch (synchro->state) {
740 XBT_DEBUG("Communication %p complete!", synchro);
741 SIMIX_comm_copy_data(synchro);
744 case SIMIX_SRC_TIMEOUT:
745 SMX_EXCEPTION(simcall->issuer, timeout_error, 0,
746 "Communication timeouted because of sender");
749 case SIMIX_DST_TIMEOUT:
750 SMX_EXCEPTION(simcall->issuer, timeout_error, 0,
751 "Communication timeouted because of receiver");
/* Host failure on one side: the process on the failed side is flagged to die. The
 * "Remote peer failed" network error is presumably for the other side -- confirm. */
754 case SIMIX_SRC_HOST_FAILURE:
755 if (simcall->issuer == synchro->comm.src_proc)
756 simcall->issuer->context->iwannadie = 1;
757 // SMX_EXCEPTION(simcall->issuer, host_error, 0, "Host failed");
759 SMX_EXCEPTION(simcall->issuer, network_error, 0, "Remote peer failed");
762 case SIMIX_DST_HOST_FAILURE:
763 if (simcall->issuer == synchro->comm.dst_proc)
764 simcall->issuer->context->iwannadie = 1;
765 // SMX_EXCEPTION(simcall->issuer, host_error, 0, "Host failed");
767 SMX_EXCEPTION(simcall->issuer, network_error, 0, "Remote peer failed");
/* NOTE(review): the debug call below can pass NULL for a %s argument when src_proc
 * or dst_proc is unset; SIMIX_comm_copy_data uses a placeholder string instead. */
770 case SIMIX_LINK_FAILURE:
772 XBT_DEBUG("Link failure in synchro %p between '%s' and '%s': posting an exception to the issuer: %s (%p) detached:%d",
774 synchro->comm.src_proc ? sg_host_get_name(synchro->comm.src_proc->host) : NULL,
775 synchro->comm.dst_proc ? sg_host_get_name(synchro->comm.dst_proc->host) : NULL,
776 simcall->issuer->name, simcall->issuer, synchro->comm.detached);
777 if (synchro->comm.src_proc == simcall->issuer) {
778 XBT_DEBUG("I'm source");
779 } else if (synchro->comm.dst_proc == simcall->issuer) {
780 XBT_DEBUG("I'm dest");
782 XBT_DEBUG("I'm neither source nor dest");
784 SMX_EXCEPTION(simcall->issuer, network_error, 0, "Link failure");
/* Cancellation: the message names the other side as the one that canceled. */
788 if (simcall->issuer == synchro->comm.dst_proc)
789 SMX_EXCEPTION(simcall->issuer, cancel_error, 0,
790 "Communication canceled by the sender");
792 SMX_EXCEPTION(simcall->issuer, cancel_error, 0,
793 "Communication canceled by the receiver");
797 xbt_die("Unexpected synchro state in SIMIX_comm_finish: %d", (int)synchro->state);
800 /* if there is an exception during a waitany or a testany, indicate the position of the failed communication */
801 if (simcall->issuer->doexception) {
802 if (simcall->call == SIMCALL_COMM_WAITANY) {
803 simcall->issuer->running_ctx->exception.value = xbt_dynar_search(simcall_comm_waitany__get__comms(simcall), &synchro);
805 else if (simcall->call == SIMCALL_COMM_TESTANY) {
806 simcall->issuer->running_ctx->exception.value = xbt_dynar_search(simcall_comm_testany__get__comms(simcall), &synchro);
/* Second host check, this time without posting an exception. */
810 if (simcall->issuer->host->isOff()) {
811 simcall->issuer->context->iwannadie = 1;
/* The issuer is no longer blocked on, nor tracking, this comm. */
814 simcall->issuer->waiting_synchro = NULL;
815 xbt_fifo_remove(simcall->issuer->comms, synchro);
/* For a detached comm, also drop the synchro from the peer's comm list. */
816 if(synchro->comm.detached){
817 if(simcall->issuer == synchro->comm.src_proc){
818 if(synchro->comm.dst_proc)
819 xbt_fifo_remove(synchro->comm.dst_proc->comms, synchro);
821 if(simcall->issuer == synchro->comm.dst_proc){
822 if(synchro->comm.src_proc)
823 xbt_fifo_remove(synchro->comm.src_proc->comms, synchro);
826 SIMIX_simcall_answer(simcall);
/* Destroys are deferred until after the loop, which still reads `synchro`. */
830 while (destroy_count-- > 0)
831 SIMIX_comm_destroy(synchro);
835 * \brief This function is called when a Surf communication synchro is finished.
836 * \param synchro the corresponding Simix communication
/*
 * Translates the state of the surf actions into the final state of `synchro`.
 * The checks run in priority order:
 *   src timeout done -> dst timeout done -> src timeout failed (host failure)
 *   -> dst timeout failed (host failure) -> transfer failed (link failure),
 * and SIMIX_DONE is the remaining outcome.
 * The surf actions are then released and, if any simcall is attached, the synchro
 * is finished.
 */
838 void SIMIX_post_comm(smx_synchro_t synchro)
840 /* Update synchro state */
841 if (synchro->comm.src_timeout &&
842 synchro->comm.src_timeout->getState() == simgrid::surf::Action::State::done)
843 synchro->state = SIMIX_SRC_TIMEOUT;
844 else if (synchro->comm.dst_timeout &&
845 synchro->comm.dst_timeout->getState() == simgrid::surf::Action::State::done)
846 synchro->state = SIMIX_DST_TIMEOUT;
/* A failed sleep action is read as a failure of the host it was running on. */
847 else if (synchro->comm.src_timeout &&
848 synchro->comm.src_timeout->getState() == simgrid::surf::Action::State::failed)
849 synchro->state = SIMIX_SRC_HOST_FAILURE;
850 else if (synchro->comm.dst_timeout &&
851 synchro->comm.dst_timeout->getState() == simgrid::surf::Action::State::failed)
852 synchro->state = SIMIX_DST_HOST_FAILURE;
853 else if (synchro->comm.surf_comm &&
854 synchro->comm.surf_comm->getState() == simgrid::surf::Action::State::failed) {
855 XBT_DEBUG("Puta madre. Surf says that the link broke");
856 synchro->state = SIMIX_LINK_FAILURE;
858 synchro->state = SIMIX_DONE;
860 XBT_DEBUG("SIMIX_post_comm: comm %p, state %d, src_proc %p, dst_proc %p, detached: %d",
861 synchro, (int)synchro->state, synchro->comm.src_proc, synchro->comm.dst_proc, synchro->comm.detached);
863 /* destroy the surf actions associated with the Simix communication */
864 SIMIX_comm_destroy_internal_actions(synchro);
866 /* if there are simcalls associated with the synchro, then answer them */
867 if (xbt_fifo_size(synchro->simcalls)) {
868 SIMIX_comm_finish(synchro);
/*
 * Cancels `synchro`.
 * A comm still in SIMIX_WAITING is pulled out of its mailbox and marked SIMIX_CANCELED.
 * A READY or RUNNING comm has its surf action canceled, except under model checking
 * or replay where no surf action exists.
 * NOTE(review): surf_comm is dereferenced without a NULL check here, unlike in
 * SIMIX_comm_suspend/resume -- confirm it is always set for READY/RUNNING comms.
 */
872 void SIMIX_comm_cancel(smx_synchro_t synchro)
874 /* if the synchro is a waiting state means that it is still in a mbox */
875 /* so remove from it and delete it */
876 if (synchro->state == SIMIX_WAITING) {
877 SIMIX_mbox_remove(synchro->comm.mbox, synchro);
878 synchro->state = SIMIX_CANCELED;
880 else if (!MC_is_active() /* when running the MC there are no surf actions */
881 && !MC_record_replay_is_active()
882 && (synchro->state == SIMIX_READY || synchro->state == SIMIX_RUNNING)) {
884 synchro->comm.surf_comm->cancel();
/* Suspends the transfer's surf action if it exists; the timeout actions are left alone. */
888 void SIMIX_comm_suspend(smx_synchro_t synchro)
890 /*FIXME: shall we suspend also the timeout synchro? */
891 if (synchro->comm.surf_comm)
892 synchro->comm.surf_comm->suspend();
893 /* in the other case, the action will be suspended on creation, in SIMIX_comm_start() */
/* Resumes the transfer's surf action if it exists; the timeout actions are left alone. */
896 void SIMIX_comm_resume(smx_synchro_t synchro)
898 /*FIXME: check what happen with the timeouts */
899 if (synchro->comm.surf_comm)
900 synchro->comm.surf_comm->resume();
901 /* in the other case, the synchro were not really suspended yet, see SIMIX_comm_suspend() and SIMIX_comm_start() */
905 /************* synchro Getters **************/
908 * \brief get the amount remaining from the communication
909 * \param synchro The communication
/*
 * Computes `remains` from synchro->state: one group of states reads it from the
 * surf action's getRemains(), the other visible groups use 0.
 * NOTE(review): which states map to which branch, and the handling of a NULL
 * synchro, cannot be told from the lines below -- confirm.
 */
911 double SIMIX_comm_get_remains(smx_synchro_t synchro)
919 switch (synchro->state) {
/* surf_comm is dereferenced unchecked, so it must be set for the states in this branch. */
922 remains = synchro->comm.surf_comm->getRemains();
927 remains = 0; /*FIXME: check what should be returned */
931 remains = 0; /*FIXME: is this correct? */
/* Plain accessor for synchro->state. */
937 e_smx_state_t SIMIX_comm_get_state(smx_synchro_t synchro)
939 return synchro->state;
943 * \brief Return the user data associated to the sender of the communication
944 * \param synchro The communication
945 * \return the user data
/* Plain accessor for comm.src_data, which the isend handler fills from its `data` argument. */
947 void* SIMIX_comm_get_src_data(smx_synchro_t synchro)
949 return synchro->comm.src_data;
953 * \brief Return the user data associated to the receiver of the communication
954 * \param synchro The communication
955 * \return the user data
/* Plain accessor for comm.dst_data, which SIMIX_comm_irecv fills from its `data` argument. */
957 void* SIMIX_comm_get_dst_data(smx_synchro_t synchro)
959 return synchro->comm.dst_data;
/* Returns the sending process of the communication (comm.src_proc); it may still be
 * unset if no send has been matched yet. */
962 smx_process_t SIMIX_comm_get_src_proc(smx_synchro_t synchro)
964 return synchro->comm.src_proc;
/* Returns the receiving process of the communication (comm.dst_proc); it may still be
 * unset if no receive has been matched yet. */
967 smx_process_t SIMIX_comm_get_dst_proc(smx_synchro_t synchro)
969 return synchro->comm.dst_proc;
972 /******************************************************************************/
973 /* SIMIX_comm_copy_data callbacks */
974 /******************************************************************************/
/* Copy routine used by SIMIX_comm_copy_data when a comm carries no copy_data_fun of
 * its own. Starts out as the pointer-copy callback; replaceable through
 * SIMIX_comm_set_copy_data_callback. */
975 static void (*SIMIX_comm_copy_data_callback) (smx_synchro_t, void*, size_t) =
976 &SIMIX_comm_copy_pointer_callback;
/* Installs `callback` as the file-wide default copy routine. No NULL check is made,
 * and SIMIX_comm_copy_data calls the stored pointer unconditionally. */
979 SIMIX_comm_set_copy_data_callback(void (*callback) (smx_synchro_t, void*, size_t))
981 SIMIX_comm_copy_data_callback = callback;
/*
 * Copy-by-pointer routine: the payload is not duplicated. `buff` itself is stored
 * into the pointer-sized slot that comm.dst_buff designates.
 * Asserts that buff_size equals sizeof(void *).
 */
984 void SIMIX_comm_copy_pointer_callback(smx_synchro_t comm, void* buff, size_t buff_size)
986 xbt_assert((buff_size == sizeof(void *)),
987 "Cannot copy %zu bytes: must be sizeof(void*)", buff_size);
988 *(void **) (comm->comm.dst_buff) = buff;
/*
 * Copy-by-value routine: memcpy of buff_size bytes from `buff` into comm.dst_buff.
 * For a detached send, comm.src_buff is cleared afterwards.
 * NOTE(review): the trailing comment says the source buffer is a duplicate made by
 * the SMPI sender, so it is presumably freed before the pointer is cleared -- confirm.
 */
991 void SIMIX_comm_copy_buffer_callback(smx_synchro_t comm, void* buff, size_t buff_size)
993 XBT_DEBUG("Copy the data over");
994 memcpy(comm->comm.dst_buff, buff, buff_size);
995 if (comm->comm.detached) { // if this is a detached send, the source buffer was duplicated by SMPI sender to make the original buffer available to the application ASAP
997 comm->comm.src_buff = NULL;
1003 * \brief Copy the communication data from the sender's buffer to the receiver's one
1004 * \param comm The communication
/*
 * Transfers the payload of a completed communication to the receiver, at most once
 * per comm (guarded by comm.copied).
 * The amount copied is src_buff_size, capped by *dst_buff_size when the receiver
 * supplied a size pointer, and that pointer is updated to the amount used.
 * The copy goes through the comm's own copy_data_fun when set, and through the
 * file-wide default callback otherwise.
 * NOTE(review): the size shown in the debug message is the uncapped source size,
 * since the message is emitted before the MIN below.
 */
1006 void SIMIX_comm_copy_data(smx_synchro_t comm)
1008 size_t buff_size = comm->comm.src_buff_size;
1009 /* If there is no data to be copied then return */
1010 if (!comm->comm.src_buff || !comm->comm.dst_buff || comm->comm.copied)
1013 XBT_DEBUG("Copying comm %p data from %s (%p) -> %s (%p) (%zu bytes)",
1015 comm->comm.src_proc ? sg_host_get_name(comm->comm.src_proc->host) : "a finished process",
1016 comm->comm.src_buff,
1017 comm->comm.dst_proc ? sg_host_get_name(comm->comm.dst_proc->host) : "a finished process",
1018 comm->comm.dst_buff, buff_size);
1020 /* Copy at most dst_buff_size bytes of the message to receiver's buffer */
1021 if (comm->comm.dst_buff_size)
1022 buff_size = MIN(buff_size, *(comm->comm.dst_buff_size));
1024 /* Update the receiver's buffer size to the copied amount */
1025 if (comm->comm.dst_buff_size)
1026 *comm->comm.dst_buff_size = buff_size;
/* A per-comm copy function takes precedence over the file-wide default. */
1029 if(comm->comm.copy_data_fun)
1030 comm->comm.copy_data_fun (comm, comm->comm.src_buff, buff_size);
1032 SIMIX_comm_copy_data_callback (comm, comm->comm.src_buff, buff_size);
1036 /* Set the copied flag so we copy data only once */
1037 /* (this function might be called from both communication ends) */
1038 comm->comm.copied = 1;