/* Copyright (c) 2009-2015. The SimGrid Team.
 * All rights reserved.                                                     */

/* This program is free software; you can redistribute it and/or modify it
 * under the terms of the license (GNU LGPL) which comes with this package. */
7 #include "src/surf/surf_interface.hpp"
8 #include "src/simix/smx_private.h"
11 #include "src/mc/mc_replay.h"
13 #include "simgrid/s4u/mailbox.hpp"
15 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(simix_network, simix, "SIMIX network-related synchronization");
17 static void SIMIX_mbox_free(void *data);
18 static xbt_dict_t mailboxes = xbt_dict_new_homogeneous(SIMIX_mbox_free);
20 static void SIMIX_waitany_remove_simcall_from_actions(smx_simcall_t simcall);
21 static void SIMIX_comm_copy_data(smx_synchro_t comm);
22 static smx_synchro_t SIMIX_comm_new(e_smx_comm_type_t type);
23 static inline void SIMIX_mbox_push(smx_mailbox_t mbox, smx_synchro_t comm);
24 static smx_synchro_t SIMIX_fifo_probe_comm(xbt_fifo_t fifo, e_smx_comm_type_t type,
25 int (*match_fun)(void *, void *,smx_synchro_t),
26 void *user_data, smx_synchro_t my_synchro);
27 static smx_synchro_t SIMIX_fifo_get_comm(xbt_fifo_t fifo, e_smx_comm_type_t type,
28 int (*match_fun)(void *, void *,smx_synchro_t),
29 void *user_data, smx_synchro_t my_synchro);
30 static void SIMIX_comm_start(smx_synchro_t synchro);
32 void SIMIX_mailbox_exit(void)
34 xbt_dict_free(&mailboxes);
37 /******************************************************************************/
38 /* Rendez-Vous Points */
39 /******************************************************************************/
41 smx_mailbox_t SIMIX_mbox_create(const char *name)
43 xbt_assert(name, "Mailboxes must have a name");
44 /* two processes may have pushed the same mbox_create simcall at the same time */
45 smx_mailbox_t mbox = (smx_mailbox_t) xbt_dict_get_or_null(mailboxes, name);
48 mbox = xbt_new0(s_smx_mailbox_t, 1);
49 mbox->name = xbt_strdup(name);
50 mbox->comm_fifo = xbt_fifo_new();
51 mbox->done_comm_fifo = xbt_fifo_new();
52 mbox->permanent_receiver=NULL;
54 XBT_DEBUG("Creating a mailbox at %p with name %s", mbox, name);
55 xbt_dict_set(mailboxes, mbox->name, mbox, NULL);
60 void SIMIX_mbox_free(void *data)
62 XBT_DEBUG("mbox free %p", data);
63 smx_mailbox_t mbox = (smx_mailbox_t) data;
65 xbt_fifo_free(mbox->comm_fifo);
66 xbt_fifo_free(mbox->done_comm_fifo);
71 smx_mailbox_t SIMIX_mbox_get_by_name(const char *name)
73 return (smx_mailbox_t) xbt_dict_get_or_null(mailboxes, name);
76 int SIMIX_mbox_comm_count_by_host(smx_mailbox_t mbox, sg_host_t host)
78 smx_synchro_t comm = NULL;
79 xbt_fifo_item_t item = NULL;
82 xbt_fifo_foreach(mbox->comm_fifo, item, comm, smx_synchro_t) {
83 if (comm->comm.src_proc->host == host)
90 smx_synchro_t SIMIX_mbox_get_head(smx_mailbox_t mbox)
92 return (smx_synchro_t) xbt_fifo_get_item_content(
93 xbt_fifo_get_first_item(mbox->comm_fifo));
97 * \brief get the receiver (process associated to the mailbox)
98 * \param mbox The rendez-vous point
99 * \return process The receiving process (NULL if not set)
101 smx_process_t SIMIX_mbox_get_receiver(smx_mailbox_t mbox)
103 return mbox->permanent_receiver;
107 * \brief set the receiver of the rendez vous point to allow eager sends
108 * \param mbox The rendez-vous point
109 * \param process The receiving process
111 void SIMIX_mbox_set_receiver(smx_mailbox_t mbox, smx_process_t process)
113 mbox->permanent_receiver=process;
117 * \brief Pushes a communication synchro into a rendez-vous point
118 * \param mbox The mailbox
119 * \param comm The communication synchro
121 static inline void SIMIX_mbox_push(smx_mailbox_t mbox, smx_synchro_t comm)
123 xbt_fifo_push(mbox->comm_fifo, comm);
124 comm->comm.mbox = mbox;
128 * \brief Removes a communication synchro from a rendez-vous point
129 * \param mbox The rendez-vous point
130 * \param comm The communication synchro
132 void SIMIX_mbox_remove(smx_mailbox_t mbox, smx_synchro_t comm)
134 xbt_fifo_remove(mbox->comm_fifo, comm);
135 comm->comm.mbox = NULL;
139 * \brief Checks if there is a communication synchro queued in a fifo matching our needs
140 * \param type The type of communication we are looking for (comm_send, comm_recv)
141 * \return The communication synchro if found, NULL otherwise
143 smx_synchro_t SIMIX_fifo_get_comm(xbt_fifo_t fifo, e_smx_comm_type_t type,
144 int (*match_fun)(void *, void *,smx_synchro_t),
145 void *this_user_data, smx_synchro_t my_synchro)
147 smx_synchro_t synchro;
148 xbt_fifo_item_t item;
149 void* other_user_data = NULL;
151 xbt_fifo_foreach(fifo, item, synchro, smx_synchro_t) {
152 if (synchro->comm.type == SIMIX_COMM_SEND) {
153 other_user_data = synchro->comm.src_data;
154 } else if (synchro->comm.type == SIMIX_COMM_RECEIVE) {
155 other_user_data = synchro->comm.dst_data;
157 if (synchro->comm.type == type &&
158 (!match_fun || match_fun(this_user_data, other_user_data, synchro)) &&
159 (!synchro->comm.match_fun || synchro->comm.match_fun(other_user_data, this_user_data, my_synchro))) {
160 XBT_DEBUG("Found a matching communication synchro %p", synchro);
161 xbt_fifo_remove_item(fifo, item);
162 xbt_fifo_free_item(item);
163 synchro->comm.refcount++;
165 synchro->comm.mbox_cpy = synchro->comm.mbox;
167 synchro->comm.mbox = NULL;
170 XBT_DEBUG("Sorry, communication synchro %p does not match our needs:"
171 " its type is %d but we are looking for a comm of type %d (or maybe the filtering didn't match)",
172 synchro, (int)synchro->comm.type, (int)type);
174 XBT_DEBUG("No matching communication synchro found");
180 * \brief Checks if there is a communication synchro queued in a fifo matching our needs, but leave it there
181 * \param type The type of communication we are looking for (comm_send, comm_recv)
182 * \return The communication synchro if found, NULL otherwise
184 smx_synchro_t SIMIX_fifo_probe_comm(xbt_fifo_t fifo, e_smx_comm_type_t type,
185 int (*match_fun)(void *, void *,smx_synchro_t),
186 void *this_user_data, smx_synchro_t my_synchro)
188 smx_synchro_t synchro;
189 xbt_fifo_item_t item;
190 void* other_user_data = NULL;
192 xbt_fifo_foreach(fifo, item, synchro, smx_synchro_t) {
193 if (synchro->comm.type == SIMIX_COMM_SEND) {
194 other_user_data = synchro->comm.src_data;
195 } else if (synchro->comm.type == SIMIX_COMM_RECEIVE) {
196 other_user_data = synchro->comm.dst_data;
198 if (synchro->comm.type == type &&
199 (!match_fun || match_fun(this_user_data, other_user_data, synchro)) &&
200 (!synchro->comm.match_fun || synchro->comm.match_fun(other_user_data, this_user_data, my_synchro))) {
201 XBT_DEBUG("Found a matching communication synchro %p", synchro);
202 synchro->comm.refcount++;
206 XBT_DEBUG("Sorry, communication synchro %p does not match our needs:"
207 " its type is %d but we are looking for a comm of type %d (or maybe the filtering didn't match)",
208 synchro, (int)synchro->comm.type, (int)type);
210 XBT_DEBUG("No matching communication synchro found");
213 /******************************************************************************/
214 /* Communication synchros */
215 /******************************************************************************/
218 * \brief Creates a new communicate synchro
219 * \param type The direction of communication (comm_send, comm_recv)
220 * \return The new communicate synchro
222 smx_synchro_t SIMIX_comm_new(e_smx_comm_type_t type)
224 smx_synchro_t synchro;
226 /* alloc structures */
227 synchro = (smx_synchro_t) xbt_mallocator_get(simix_global->synchro_mallocator);
229 synchro->type = SIMIX_SYNC_COMMUNICATE;
230 synchro->state = SIMIX_WAITING;
232 /* set communication */
233 synchro->comm.type = type;
234 synchro->comm.refcount = 1;
235 synchro->comm.src_data=NULL;
236 synchro->comm.dst_data=NULL;
238 synchro->category = NULL;
240 XBT_DEBUG("Create communicate synchro %p", synchro);
246 * \brief Destroy a communicate synchro
247 * \param synchro The communicate synchro to be destroyed
249 void SIMIX_comm_destroy(smx_synchro_t synchro)
251 XBT_DEBUG("Destroy synchro %p (refcount: %d), state: %d",
252 synchro, synchro->comm.refcount, (int)synchro->state);
254 if (synchro->comm.refcount <= 0) {
255 xbt_backtrace_display_current();
256 xbt_die("The refcount of comm %p is already 0 before decreasing it. "
257 "That's a bug! If you didn't test and/or wait the same communication twice in your code, then the bug is SimGrid's...", synchro);
259 synchro->comm.refcount--;
260 if (synchro->comm.refcount > 0)
262 XBT_DEBUG("Really free communication %p; refcount is now %d", synchro,
263 synchro->comm.refcount);
265 xbt_free(synchro->name);
266 SIMIX_comm_destroy_internal_actions(synchro);
268 if (synchro->comm.detached && synchro->state != SIMIX_DONE) {
269 /* the communication has failed and was detached:
270 * we have to free the buffer */
271 if (synchro->comm.clean_fun) {
272 synchro->comm.clean_fun(synchro->comm.src_buff);
274 synchro->comm.src_buff = NULL;
277 if(synchro->comm.mbox)
278 SIMIX_mbox_remove(synchro->comm.mbox, synchro);
280 xbt_mallocator_release(simix_global->synchro_mallocator, synchro);
283 void SIMIX_comm_destroy_internal_actions(smx_synchro_t synchro)
285 if (synchro->comm.surf_comm){
286 synchro->comm.surf_comm->unref();
287 synchro->comm.surf_comm = NULL;
290 if (synchro->comm.src_timeout){
291 synchro->comm.src_timeout->unref();
292 synchro->comm.src_timeout = NULL;
295 if (synchro->comm.dst_timeout){
296 synchro->comm.dst_timeout->unref();
297 synchro->comm.dst_timeout = NULL;
301 void simcall_HANDLER_comm_send(smx_simcall_t simcall, smx_process_t src, smx_mailbox_t mbox,
302 double task_size, double rate,
303 void *src_buff, size_t src_buff_size,
304 int (*match_fun)(void *, void *,smx_synchro_t),
305 void (*copy_data_fun)(smx_synchro_t, void*, size_t),
306 void *data, double timeout){
307 smx_synchro_t comm = simcall_HANDLER_comm_isend(simcall, src, mbox, task_size, rate,
308 src_buff, src_buff_size, match_fun, NULL, copy_data_fun,
310 SIMCALL_SET_MC_VALUE(simcall, 0);
311 simcall_HANDLER_comm_wait(simcall, comm, timeout);
313 smx_synchro_t simcall_HANDLER_comm_isend(smx_simcall_t simcall, smx_process_t src_proc, smx_mailbox_t mbox,
314 double task_size, double rate,
315 void *src_buff, size_t src_buff_size,
316 int (*match_fun)(void *, void *,smx_synchro_t),
317 void (*clean_fun)(void *), // used to free the synchro in case of problem after a detached send
318 void (*copy_data_fun)(smx_synchro_t, void*, size_t),// used to copy data if not default one
319 void *data, int detached)
321 XBT_DEBUG("send from %p", mbox);
323 /* Prepare a synchro describing us, so that it gets passed to the user-provided filter of other side */
324 smx_synchro_t this_synchro = SIMIX_comm_new(SIMIX_COMM_SEND);
326 /* Look for communication synchro matching our needs. We also provide a description of
327 * ourself so that the other side also gets a chance of choosing if it wants to match with us.
329 * If it is not found then push our communication into the rendez-vous point */
330 smx_synchro_t other_synchro = SIMIX_fifo_get_comm(mbox->comm_fifo, SIMIX_COMM_RECEIVE, match_fun, data, this_synchro);
332 if (!other_synchro) {
333 other_synchro = this_synchro;
335 if (mbox->permanent_receiver!=NULL){
336 //this mailbox is for small messages, which have to be sent right now
337 other_synchro->state = SIMIX_READY;
338 other_synchro->comm.dst_proc=mbox->permanent_receiver;
339 other_synchro->comm.refcount++;
340 xbt_fifo_push(mbox->done_comm_fifo,other_synchro);
341 other_synchro->comm.mbox=mbox;
342 XBT_DEBUG("pushing a message into the permanent receive fifo %p, comm %p", mbox, &(other_synchro->comm));
345 SIMIX_mbox_push(mbox, this_synchro);
348 XBT_DEBUG("Receive already pushed");
350 SIMIX_comm_destroy(this_synchro);
352 other_synchro->state = SIMIX_READY;
353 other_synchro->comm.type = SIMIX_COMM_READY;
356 xbt_fifo_push(src_proc->comms, other_synchro);
358 /* if the communication synchro is detached then decrease the refcount
359 * by one, so it will be eliminated by the receiver's destroy call */
361 other_synchro->comm.detached = 1;
362 other_synchro->comm.refcount--;
363 other_synchro->comm.clean_fun = clean_fun;
365 other_synchro->comm.clean_fun = NULL;
368 /* Setup the communication synchro */
369 other_synchro->comm.src_proc = src_proc;
370 other_synchro->comm.task_size = task_size;
371 other_synchro->comm.rate = rate;
372 other_synchro->comm.src_buff = src_buff;
373 other_synchro->comm.src_buff_size = src_buff_size;
374 other_synchro->comm.src_data = data;
376 other_synchro->comm.match_fun = match_fun;
377 other_synchro->comm.copy_data_fun = copy_data_fun;
380 if (MC_is_active() || MC_record_replay_is_active()) {
381 other_synchro->state = SIMIX_RUNNING;
382 return (detached ? NULL : other_synchro);
385 SIMIX_comm_start(other_synchro);
386 return (detached ? NULL : other_synchro);
389 void simcall_HANDLER_comm_recv(smx_simcall_t simcall, smx_process_t receiver, smx_mailbox_t mbox,
390 void *dst_buff, size_t *dst_buff_size,
391 int (*match_fun)(void *, void *, smx_synchro_t),
392 void (*copy_data_fun)(smx_synchro_t, void*, size_t),
393 void *data, double timeout, double rate)
395 smx_synchro_t comm = SIMIX_comm_irecv(receiver, mbox, dst_buff,
396 dst_buff_size, match_fun, copy_data_fun, data, rate);
397 SIMCALL_SET_MC_VALUE(simcall, 0);
398 simcall_HANDLER_comm_wait(simcall, comm, timeout);
401 smx_synchro_t simcall_HANDLER_comm_irecv(smx_simcall_t simcall, smx_process_t receiver, smx_mailbox_t mbox,
402 void *dst_buff, size_t *dst_buff_size,
403 int (*match_fun)(void *, void *, smx_synchro_t),
404 void (*copy_data_fun)(smx_synchro_t, void*, size_t),
405 void *data, double rate)
407 return SIMIX_comm_irecv(receiver, mbox, dst_buff, dst_buff_size, match_fun, copy_data_fun, data, rate);
410 smx_synchro_t SIMIX_comm_irecv(smx_process_t dst_proc, smx_mailbox_t mbox, void *dst_buff, size_t *dst_buff_size,
411 int (*match_fun)(void *, void *, smx_synchro_t),
412 void (*copy_data_fun)(smx_synchro_t, void*, size_t), // used to copy data if not default one
413 void *data, double rate)
415 XBT_DEBUG("recv from %p %p", mbox, mbox->comm_fifo);
416 smx_synchro_t this_synchro = SIMIX_comm_new(SIMIX_COMM_RECEIVE);
418 smx_synchro_t other_synchro;
419 //communication already done, get it inside the fifo of completed comms
420 if (mbox->permanent_receiver && xbt_fifo_size(mbox->done_comm_fifo)!=0) {
422 XBT_DEBUG("We have a comm that has probably already been received, trying to match it, to skip the communication");
423 //find a match in the already received fifo
424 other_synchro = SIMIX_fifo_get_comm(mbox->done_comm_fifo, SIMIX_COMM_SEND, match_fun, data, this_synchro);
425 //if not found, assume the receiver came first, register it to the mailbox in the classical way
426 if (!other_synchro) {
427 XBT_DEBUG("We have messages in the permanent receive list, but not the one we are looking for, pushing request into fifo");
428 other_synchro = this_synchro;
429 SIMIX_mbox_push(mbox, this_synchro);
431 if(other_synchro->comm.surf_comm && SIMIX_comm_get_remains(other_synchro)==0.0) {
432 XBT_DEBUG("comm %p has been already sent, and is finished, destroy it",&(other_synchro->comm));
433 other_synchro->state = SIMIX_DONE;
434 other_synchro->comm.type = SIMIX_COMM_DONE;
435 other_synchro->comm.mbox = NULL;
437 other_synchro->comm.refcount--;
438 SIMIX_comm_destroy(this_synchro);
441 /* Prepare a synchro describing us, so that it gets passed to the user-provided filter of other side */
443 /* Look for communication synchro matching our needs. We also provide a description of
444 * ourself so that the other side also gets a chance of choosing if it wants to match with us.
446 * If it is not found then push our communication into the rendez-vous point */
447 other_synchro = SIMIX_fifo_get_comm(mbox->comm_fifo, SIMIX_COMM_SEND, match_fun, data, this_synchro);
449 if (!other_synchro) {
450 XBT_DEBUG("Receive pushed first %d", xbt_fifo_size(mbox->comm_fifo));
451 other_synchro = this_synchro;
452 SIMIX_mbox_push(mbox, this_synchro);
454 SIMIX_comm_destroy(this_synchro);
455 other_synchro->state = SIMIX_READY;
456 other_synchro->comm.type = SIMIX_COMM_READY;
457 //other_synchro->comm.refcount--;
459 xbt_fifo_push(dst_proc->comms, other_synchro);
462 /* Setup communication synchro */
463 other_synchro->comm.dst_proc = dst_proc;
464 other_synchro->comm.dst_buff = dst_buff;
465 other_synchro->comm.dst_buff_size = dst_buff_size;
466 other_synchro->comm.dst_data = data;
468 if (rate != -1.0 && (other_synchro->comm.rate == -1.0 || rate < other_synchro->comm.rate))
469 other_synchro->comm.rate = rate;
471 other_synchro->comm.match_fun = match_fun;
472 other_synchro->comm.copy_data_fun = copy_data_fun;
474 if (MC_is_active() || MC_record_replay_is_active()) {
475 other_synchro->state = SIMIX_RUNNING;
476 return other_synchro;
479 SIMIX_comm_start(other_synchro);
480 return other_synchro;
483 smx_synchro_t simcall_HANDLER_comm_iprobe(smx_simcall_t simcall, smx_mailbox_t mbox,
484 int type, int src, int tag,
485 int (*match_fun)(void *, void *, smx_synchro_t),
487 return SIMIX_comm_iprobe(simcall->issuer, mbox, type, src, tag, match_fun, data);
490 smx_synchro_t SIMIX_comm_iprobe(smx_process_t dst_proc, smx_mailbox_t mbox, int type, int src,
491 int tag, int (*match_fun)(void *, void *, smx_synchro_t), void *data)
493 XBT_DEBUG("iprobe from %p %p", mbox, mbox->comm_fifo);
494 smx_synchro_t this_synchro;
497 this_synchro=SIMIX_comm_new(SIMIX_COMM_SEND);
498 smx_type = SIMIX_COMM_RECEIVE;
500 this_synchro=SIMIX_comm_new(SIMIX_COMM_RECEIVE);
501 smx_type = SIMIX_COMM_SEND;
503 smx_synchro_t other_synchro=NULL;
504 if(mbox->permanent_receiver && xbt_fifo_size(mbox->done_comm_fifo)!=0){
505 //find a match in the already received fifo
506 XBT_DEBUG("first try in the perm recv mailbox");
508 other_synchro = SIMIX_fifo_probe_comm(
509 mbox->done_comm_fifo, (e_smx_comm_type_t) smx_type,
510 match_fun, data, this_synchro);
514 XBT_DEBUG("try in the normal mailbox");
515 other_synchro = SIMIX_fifo_probe_comm(
516 mbox->comm_fifo, (e_smx_comm_type_t) smx_type,
517 match_fun, data, this_synchro);
520 if(other_synchro)other_synchro->comm.refcount--;
522 SIMIX_comm_destroy(this_synchro);
523 return other_synchro;
526 void simcall_HANDLER_comm_wait(smx_simcall_t simcall, smx_synchro_t synchro, double timeout)
528 /* the simcall may be a wait, a send or a recv */
531 /* Associate this simcall to the wait synchro */
532 XBT_DEBUG("simcall_HANDLER_comm_wait, %p", synchro);
534 xbt_fifo_push(synchro->simcalls, simcall);
535 simcall->issuer->waiting_synchro = synchro;
537 if (MC_is_active() || MC_record_replay_is_active()) {
538 int idx = SIMCALL_GET_MC_VALUE(simcall);
540 synchro->state = SIMIX_DONE;
542 /* If we reached this point, the wait simcall must have a timeout */
543 /* Otherwise it shouldn't be enabled and executed by the MC */
547 if (synchro->comm.src_proc == simcall->issuer)
548 synchro->state = SIMIX_SRC_TIMEOUT;
550 synchro->state = SIMIX_DST_TIMEOUT;
553 SIMIX_comm_finish(synchro);
557 /* If the synchro has already finish perform the error handling, */
558 /* otherwise set up a waiting timeout on the right side */
559 if (synchro->state != SIMIX_WAITING && synchro->state != SIMIX_RUNNING) {
560 SIMIX_comm_finish(synchro);
561 } else { /* if (timeout >= 0) { we need a surf sleep action even when there is no timeout, otherwise surf won't tell us when the host fails */
562 sleep = surf_host_sleep(simcall->issuer->host, timeout);
563 sleep->setData(synchro);
565 if (simcall->issuer == synchro->comm.src_proc)
566 synchro->comm.src_timeout = sleep;
568 synchro->comm.dst_timeout = sleep;
572 void simcall_HANDLER_comm_test(smx_simcall_t simcall, smx_synchro_t synchro)
574 if(MC_is_active() || MC_record_replay_is_active()){
575 simcall_comm_test__set__result(simcall, synchro->comm.src_proc && synchro->comm.dst_proc);
576 if(simcall_comm_test__get__result(simcall)){
577 synchro->state = SIMIX_DONE;
578 xbt_fifo_push(synchro->simcalls, simcall);
579 SIMIX_comm_finish(synchro);
581 SIMIX_simcall_answer(simcall);
586 simcall_comm_test__set__result(simcall, (synchro->state != SIMIX_WAITING && synchro->state != SIMIX_RUNNING));
587 if (simcall_comm_test__get__result(simcall)) {
588 xbt_fifo_push(synchro->simcalls, simcall);
589 SIMIX_comm_finish(synchro);
591 SIMIX_simcall_answer(simcall);
595 void simcall_HANDLER_comm_testany(smx_simcall_t simcall, xbt_dynar_t synchros)
598 smx_synchro_t synchro;
599 simcall_comm_testany__set__result(simcall, -1);
601 if (MC_is_active() || MC_record_replay_is_active()){
602 int idx = SIMCALL_GET_MC_VALUE(simcall);
604 SIMIX_simcall_answer(simcall);
606 synchro = xbt_dynar_get_as(synchros, idx, smx_synchro_t);
607 simcall_comm_testany__set__result(simcall, idx);
608 xbt_fifo_push(synchro->simcalls, simcall);
609 synchro->state = SIMIX_DONE;
610 SIMIX_comm_finish(synchro);
615 xbt_dynar_foreach(simcall_comm_testany__get__comms(simcall), cursor,synchro) {
616 if (synchro->state != SIMIX_WAITING && synchro->state != SIMIX_RUNNING) {
617 simcall_comm_testany__set__result(simcall, cursor);
618 xbt_fifo_push(synchro->simcalls, simcall);
619 SIMIX_comm_finish(synchro);
623 SIMIX_simcall_answer(simcall);
626 void simcall_HANDLER_comm_waitany(smx_simcall_t simcall, xbt_dynar_t synchros)
628 smx_synchro_t synchro;
629 unsigned int cursor = 0;
631 if (MC_is_active() || MC_record_replay_is_active()){
632 int idx = SIMCALL_GET_MC_VALUE(simcall);
633 synchro = xbt_dynar_get_as(synchros, idx, smx_synchro_t);
634 xbt_fifo_push(synchro->simcalls, simcall);
635 simcall_comm_waitany__set__result(simcall, idx);
636 synchro->state = SIMIX_DONE;
637 SIMIX_comm_finish(synchro);
641 xbt_dynar_foreach(synchros, cursor, synchro){
642 /* associate this simcall to the the synchro */
643 xbt_fifo_push(synchro->simcalls, simcall);
645 /* see if the synchro is already finished */
646 if (synchro->state != SIMIX_WAITING && synchro->state != SIMIX_RUNNING){
647 SIMIX_comm_finish(synchro);
653 void SIMIX_waitany_remove_simcall_from_actions(smx_simcall_t simcall)
655 smx_synchro_t synchro;
656 unsigned int cursor = 0;
657 xbt_dynar_t synchros = simcall_comm_waitany__get__comms(simcall);
659 xbt_dynar_foreach(synchros, cursor, synchro) {
660 xbt_fifo_remove(synchro->simcalls, simcall);
665 * \brief Starts the simulation of a communication synchro.
666 * \param synchro the communication synchro
668 static inline void SIMIX_comm_start(smx_synchro_t synchro)
670 /* If both the sender and the receiver are already there, start the communication */
671 if (synchro->state == SIMIX_READY) {
673 sg_host_t sender = synchro->comm.src_proc->host;
674 sg_host_t receiver = synchro->comm.dst_proc->host;
676 XBT_DEBUG("Starting communication %p from '%s' to '%s'", synchro,
677 sg_host_get_name(sender), sg_host_get_name(receiver));
679 synchro->comm.surf_comm = surf_network_model_communicate(surf_network_model,
681 synchro->comm.task_size, synchro->comm.rate);
683 synchro->comm.surf_comm->setData(synchro);
685 synchro->state = SIMIX_RUNNING;
687 /* If a link is failed, detect it immediately */
688 if (synchro->comm.surf_comm->getState() == simgrid::surf::Action::State::failed) {
689 XBT_DEBUG("Communication from '%s' to '%s' failed to start because of a link failure",
690 sg_host_get_name(sender), sg_host_get_name(receiver));
691 synchro->state = SIMIX_LINK_FAILURE;
692 SIMIX_comm_destroy_internal_actions(synchro);
695 /* If any of the process is suspend, create the synchro but stop its execution,
696 it will be restarted when the sender process resume */
697 if (SIMIX_process_is_suspended(synchro->comm.src_proc) ||
698 SIMIX_process_is_suspended(synchro->comm.dst_proc)) {
699 /* FIXME: check what should happen with the synchro state */
701 if (SIMIX_process_is_suspended(synchro->comm.src_proc))
702 XBT_DEBUG("The communication is suspended on startup because src (%s:%s) were suspended since it initiated the communication",
703 sg_host_get_name(synchro->comm.src_proc->host), synchro->comm.src_proc->name);
705 XBT_DEBUG("The communication is suspended on startup because dst (%s:%s) were suspended since it initiated the communication",
706 sg_host_get_name(synchro->comm.dst_proc->host), synchro->comm.dst_proc->name);
708 synchro->comm.surf_comm->suspend();
715 * \brief Answers the SIMIX simcalls associated to a communication synchro.
716 * \param synchro a finished communication synchro
718 void SIMIX_comm_finish(smx_synchro_t synchro)
720 unsigned int destroy_count = 0;
721 smx_simcall_t simcall;
723 while ((simcall = (smx_simcall_t) xbt_fifo_shift(synchro->simcalls))) {
725 /* If a waitany simcall is waiting for this synchro to finish, then remove
726 it from the other synchros in the waitany list. Afterwards, get the
727 position of the actual synchro in the waitany dynar and
728 return it as the result of the simcall */
730 if (simcall->call == SIMCALL_NONE) //FIXME: maybe a better way to handle this case
731 continue; // if process handling comm is killed
732 if (simcall->call == SIMCALL_COMM_WAITANY) {
733 SIMIX_waitany_remove_simcall_from_actions(simcall);
734 if (!MC_is_active() && !MC_record_replay_is_active())
735 simcall_comm_waitany__set__result(simcall, xbt_dynar_search(simcall_comm_waitany__get__comms(simcall), &synchro));
738 /* If the synchro is still in a rendez-vous point then remove from it */
739 if (synchro->comm.mbox)
740 SIMIX_mbox_remove(synchro->comm.mbox, synchro);
742 XBT_DEBUG("SIMIX_comm_finish: synchro state = %d", (int)synchro->state);
744 /* Check out for errors */
746 if (simcall->issuer->host->isOff()) {
747 simcall->issuer->context->iwannadie = 1;
748 SMX_EXCEPTION(simcall->issuer, host_error, 0, "Host failed");
751 switch (synchro->state) {
754 XBT_DEBUG("Communication %p complete!", synchro);
755 SIMIX_comm_copy_data(synchro);
758 case SIMIX_SRC_TIMEOUT:
759 SMX_EXCEPTION(simcall->issuer, timeout_error, 0,
760 "Communication timeouted because of sender");
763 case SIMIX_DST_TIMEOUT:
764 SMX_EXCEPTION(simcall->issuer, timeout_error, 0,
765 "Communication timeouted because of receiver");
768 case SIMIX_SRC_HOST_FAILURE:
769 if (simcall->issuer == synchro->comm.src_proc)
770 simcall->issuer->context->iwannadie = 1;
771 // SMX_EXCEPTION(simcall->issuer, host_error, 0, "Host failed");
773 SMX_EXCEPTION(simcall->issuer, network_error, 0, "Remote peer failed");
776 case SIMIX_DST_HOST_FAILURE:
777 if (simcall->issuer == synchro->comm.dst_proc)
778 simcall->issuer->context->iwannadie = 1;
779 // SMX_EXCEPTION(simcall->issuer, host_error, 0, "Host failed");
781 SMX_EXCEPTION(simcall->issuer, network_error, 0, "Remote peer failed");
784 case SIMIX_LINK_FAILURE:
786 XBT_DEBUG("Link failure in synchro %p between '%s' and '%s': posting an exception to the issuer: %s (%p) detached:%d",
788 synchro->comm.src_proc ? sg_host_get_name(synchro->comm.src_proc->host) : NULL,
789 synchro->comm.dst_proc ? sg_host_get_name(synchro->comm.dst_proc->host) : NULL,
790 simcall->issuer->name, simcall->issuer, synchro->comm.detached);
791 if (synchro->comm.src_proc == simcall->issuer) {
792 XBT_DEBUG("I'm source");
793 } else if (synchro->comm.dst_proc == simcall->issuer) {
794 XBT_DEBUG("I'm dest");
796 XBT_DEBUG("I'm neither source nor dest");
798 SMX_EXCEPTION(simcall->issuer, network_error, 0, "Link failure");
802 if (simcall->issuer == synchro->comm.dst_proc)
803 SMX_EXCEPTION(simcall->issuer, cancel_error, 0,
804 "Communication canceled by the sender");
806 SMX_EXCEPTION(simcall->issuer, cancel_error, 0,
807 "Communication canceled by the receiver");
811 xbt_die("Unexpected synchro state in SIMIX_comm_finish: %d", (int)synchro->state);
814 /* if there is an exception during a waitany or a testany, indicate the position of the failed communication */
815 if (simcall->issuer->doexception) {
816 if (simcall->call == SIMCALL_COMM_WAITANY) {
817 simcall->issuer->running_ctx->exception.value = xbt_dynar_search(simcall_comm_waitany__get__comms(simcall), &synchro);
819 else if (simcall->call == SIMCALL_COMM_TESTANY) {
820 simcall->issuer->running_ctx->exception.value = xbt_dynar_search(simcall_comm_testany__get__comms(simcall), &synchro);
824 if (simcall->issuer->host->isOff()) {
825 simcall->issuer->context->iwannadie = 1;
828 simcall->issuer->waiting_synchro = NULL;
829 xbt_fifo_remove(simcall->issuer->comms, synchro);
830 if(synchro->comm.detached){
831 if(simcall->issuer == synchro->comm.src_proc){
832 if(synchro->comm.dst_proc)
833 xbt_fifo_remove(synchro->comm.dst_proc->comms, synchro);
835 if(simcall->issuer == synchro->comm.dst_proc){
836 if(synchro->comm.src_proc)
837 xbt_fifo_remove(synchro->comm.src_proc->comms, synchro);
840 SIMIX_simcall_answer(simcall);
844 while (destroy_count-- > 0)
845 SIMIX_comm_destroy(synchro);
849 * \brief This function is called when a Surf communication synchro is finished.
850 * \param synchro the corresponding Simix communication
852 void SIMIX_post_comm(smx_synchro_t synchro)
854 /* Update synchro state */
855 if (synchro->comm.src_timeout &&
856 synchro->comm.src_timeout->getState() == simgrid::surf::Action::State::done)
857 synchro->state = SIMIX_SRC_TIMEOUT;
858 else if (synchro->comm.dst_timeout &&
859 synchro->comm.dst_timeout->getState() == simgrid::surf::Action::State::done)
860 synchro->state = SIMIX_DST_TIMEOUT;
861 else if (synchro->comm.src_timeout &&
862 synchro->comm.src_timeout->getState() == simgrid::surf::Action::State::failed)
863 synchro->state = SIMIX_SRC_HOST_FAILURE;
864 else if (synchro->comm.dst_timeout &&
865 synchro->comm.dst_timeout->getState() == simgrid::surf::Action::State::failed)
866 synchro->state = SIMIX_DST_HOST_FAILURE;
867 else if (synchro->comm.surf_comm &&
868 synchro->comm.surf_comm->getState() == simgrid::surf::Action::State::failed) {
869 XBT_DEBUG("Puta madre. Surf says that the link broke");
870 synchro->state = SIMIX_LINK_FAILURE;
872 synchro->state = SIMIX_DONE;
874 XBT_DEBUG("SIMIX_post_comm: comm %p, state %d, src_proc %p, dst_proc %p, detached: %d",
875 synchro, (int)synchro->state, synchro->comm.src_proc, synchro->comm.dst_proc, synchro->comm.detached);
877 /* destroy the surf actions associated with the Simix communication */
878 SIMIX_comm_destroy_internal_actions(synchro);
880 /* if there are simcalls associated with the synchro, then answer them */
881 if (xbt_fifo_size(synchro->simcalls)) {
882 SIMIX_comm_finish(synchro);
886 void SIMIX_comm_cancel(smx_synchro_t synchro)
888 /* if the synchro is a waiting state means that it is still in a mbox */
889 /* so remove from it and delete it */
890 if (synchro->state == SIMIX_WAITING) {
891 SIMIX_mbox_remove(synchro->comm.mbox, synchro);
892 synchro->state = SIMIX_CANCELED;
894 else if (!MC_is_active() /* when running the MC there are no surf actions */
895 && !MC_record_replay_is_active()
896 && (synchro->state == SIMIX_READY || synchro->state == SIMIX_RUNNING)) {
898 synchro->comm.surf_comm->cancel();
902 void SIMIX_comm_suspend(smx_synchro_t synchro)
904 /*FIXME: shall we suspend also the timeout synchro? */
905 if (synchro->comm.surf_comm)
906 synchro->comm.surf_comm->suspend();
907 /* in the other case, the action will be suspended on creation, in SIMIX_comm_start() */
910 void SIMIX_comm_resume(smx_synchro_t synchro)
912 /*FIXME: check what happen with the timeouts */
913 if (synchro->comm.surf_comm)
914 synchro->comm.surf_comm->resume();
915 /* in the other case, the synchro were not really suspended yet, see SIMIX_comm_suspend() and SIMIX_comm_start() */
919 /************* synchro Getters **************/
922 * \brief get the amount remaining from the communication
923 * \param synchro The communication
925 double SIMIX_comm_get_remains(smx_synchro_t synchro)
933 switch (synchro->state) {
936 remains = synchro->comm.surf_comm->getRemains();
941 remains = 0; /*FIXME: check what should be returned */
945 remains = 0; /*FIXME: is this correct? */
951 e_smx_state_t SIMIX_comm_get_state(smx_synchro_t synchro)
953 return synchro->state;
957 * \brief Return the user data associated to the sender of the communication
958 * \param synchro The communication
959 * \return the user data
961 void* SIMIX_comm_get_src_data(smx_synchro_t synchro)
963 return synchro->comm.src_data;
967 * \brief Return the user data associated to the receiver of the communication
968 * \param synchro The communication
969 * \return the user data
971 void* SIMIX_comm_get_dst_data(smx_synchro_t synchro)
973 return synchro->comm.dst_data;
976 smx_process_t SIMIX_comm_get_src_proc(smx_synchro_t synchro)
978 return synchro->comm.src_proc;
981 smx_process_t SIMIX_comm_get_dst_proc(smx_synchro_t synchro)
983 return synchro->comm.dst_proc;
986 /******************************************************************************/
987 /* SIMIX_comm_copy_data callbacks */
988 /******************************************************************************/
989 static void (*SIMIX_comm_copy_data_callback) (smx_synchro_t, void*, size_t) =
990 &SIMIX_comm_copy_pointer_callback;
993 SIMIX_comm_set_copy_data_callback(void (*callback) (smx_synchro_t, void*, size_t))
995 SIMIX_comm_copy_data_callback = callback;
998 void SIMIX_comm_copy_pointer_callback(smx_synchro_t comm, void* buff, size_t buff_size)
1000 xbt_assert((buff_size == sizeof(void *)),
1001 "Cannot copy %zu bytes: must be sizeof(void*)", buff_size);
1002 *(void **) (comm->comm.dst_buff) = buff;
1005 void SIMIX_comm_copy_buffer_callback(smx_synchro_t comm, void* buff, size_t buff_size)
1007 XBT_DEBUG("Copy the data over");
1008 memcpy(comm->comm.dst_buff, buff, buff_size);
1009 if (comm->comm.detached) { // if this is a detached send, the source buffer was duplicated by SMPI sender to make the original buffer available to the application ASAP
1011 comm->comm.src_buff = NULL;
1017 * \brief Copy the communication data from the sender's buffer to the receiver's one
1018 * \param comm The communication
1020 void SIMIX_comm_copy_data(smx_synchro_t comm)
1022 size_t buff_size = comm->comm.src_buff_size;
1023 /* If there is no data to be copy then return */
1024 if (!comm->comm.src_buff || !comm->comm.dst_buff || comm->comm.copied)
1027 XBT_DEBUG("Copying comm %p data from %s (%p) -> %s (%p) (%zu bytes)",
1029 comm->comm.src_proc ? sg_host_get_name(comm->comm.src_proc->host) : "a finished process",
1030 comm->comm.src_buff,
1031 comm->comm.dst_proc ? sg_host_get_name(comm->comm.dst_proc->host) : "a finished process",
1032 comm->comm.dst_buff, buff_size);
1034 /* Copy at most dst_buff_size bytes of the message to receiver's buffer */
1035 if (comm->comm.dst_buff_size)
1036 buff_size = MIN(buff_size, *(comm->comm.dst_buff_size));
1038 /* Update the receiver's buffer size to the copied amount */
1039 if (comm->comm.dst_buff_size)
1040 *comm->comm.dst_buff_size = buff_size;
1043 if(comm->comm.copy_data_fun)
1044 comm->comm.copy_data_fun (comm, comm->comm.src_buff, buff_size);
1046 SIMIX_comm_copy_data_callback (comm, comm->comm.src_buff, buff_size);
1050 /* Set the copied flag so we copy data only once */
1051 /* (this function might be called from both communication ends) */
1052 comm->comm.copied = 1;