1 /* Copyright (c) 2009-2014. The SimGrid Team.
2 * All rights reserved. */
4 /* This program is free software; you can redistribute it and/or modify it
5 * under the terms of the license (GNU LGPL) which comes with this package. */
7 #include "smx_private.h"
11 #include "smpi/private.h"
/* Log category for the messages emitted by this file, registered as a
 * subcategory of "simix". */
14 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(simix_network, simix,
15 "SIMIX network-related synchronization");
/* Global dictionary of named rendez-vous points (key: name, value: smx_rdv_t).
 * Created by SIMIX_network_init() and freed by SIMIX_network_exit(). */
17 static xbt_dict_t rdv_points = NULL;
/* Exported communication counter. SIMIX_comm_isend() and SIMIX_comm_irecv()
 * decrement it when they discard a synchro they have just created.
 * NOTE(review): the matching increment is not visible in this listing
 * (presumably in SIMIX_comm_new) — confirm. */
18 XBT_EXPORT_NO_IMPORT(unsigned long int) smx_total_comms = 0;
/* Forward declarations of helpers used before their definitions. A function
 * first declared `static` here keeps internal linkage even where its later
 * definition omits the `static` keyword (C11 6.2.2p4-5). */
20 static void SIMIX_waitany_remove_simcall_from_actions(smx_simcall_t simcall);
21 static void SIMIX_comm_copy_data(smx_synchro_t comm);
22 static smx_synchro_t SIMIX_comm_new(e_smx_comm_type_t type);
23 static XBT_INLINE void SIMIX_rdv_push(smx_rdv_t rdv, smx_synchro_t comm);
24 static smx_synchro_t SIMIX_fifo_probe_comm(xbt_fifo_t fifo, e_smx_comm_type_t type,
25 int (*match_fun)(void *, void *,smx_synchro_t),
26 void *user_data, smx_synchro_t my_synchro);
27 static smx_synchro_t SIMIX_fifo_get_comm(xbt_fifo_t fifo, e_smx_comm_type_t type,
28 int (*match_fun)(void *, void *,smx_synchro_t),
29 void *user_data, smx_synchro_t my_synchro);
30 static void SIMIX_rdv_free(void *data);
31 static void SIMIX_comm_start(smx_synchro_t synchro);
/* Creates the global rendez-vous point dictionary. SIMIX_rdv_free is
 * registered as the value free function of this homogeneous dict. */
33 void SIMIX_network_init(void)
35 rdv_points = xbt_dict_new_homogeneous(SIMIX_rdv_free);
/* Frees the global rendez-vous point dictionary. Presumably xbt also calls
 * SIMIX_rdv_free on every rendez-vous point still stored in it — confirm. */
38 void SIMIX_network_exit(void)
40 xbt_dict_free(&rdv_points);
43 /******************************************************************************/
44 /* Rendez-Vous Points */
45 /******************************************************************************/
/* Simcall handler for rdv_create: forwards to SIMIX_rdv_create().
 * The `simcall` argument is unused. */
47 smx_rdv_t simcall_HANDLER_rdv_create(smx_simcall_t simcall, const char *name){
48 return SIMIX_rdv_create(name);
/* Returns the rendez-vous point registered under `name`, or builds a new one.
 * A new point has its own copy of the name, empty comm_fifo and
 * done_comm_fifo, no permanent receiver, and is stored in rdv_points.
 * A NULL `name` skips the lookup and yields an anonymous point.
 * NOTE(review): the `if (!rdv)` guard around the allocation, any guard on the
 * xbt_dict_set() call and the final return are on lines missing from this
 * listing — confirm. Also, `name` may be NULL when it reaches the "%s" of the
 * debug message below. */
50 smx_rdv_t SIMIX_rdv_create(const char *name)
52 /* two processes may have pushed the same rdv_create simcall at the same time */
53 smx_rdv_t rdv = name ? xbt_dict_get_or_null(rdv_points, name) : NULL;
56 rdv = xbt_new0(s_smx_rvpoint_t, 1);
57 rdv->name = name ? xbt_strdup(name) : NULL;
58 rdv->comm_fifo = xbt_fifo_new();
59 rdv->done_comm_fifo = xbt_fifo_new();
60 rdv->permanent_receiver=NULL;
62 XBT_DEBUG("Creating a mailbox at %p with name %s", rdv, name);
/* Register the point under its (copied) name so later lookups find it. */
65 xbt_dict_set(rdv_points, rdv->name, rdv, NULL);
70 void simcall_HANDLER_rdv_destroy(smx_simcall_t simcall, smx_rdv_t rdv){
71 return SIMIX_rdv_destroy(rdv);
/* Removes `rdv` from the global rendez-vous dictionary, using its name as key.
 * rdv_points is created with SIMIX_rdv_free as its value free function
 * (SIMIX_network_init), so this removal is presumably what releases `rdv`.
 * NOTE(review): a line between the signature and the removal is missing from
 * this listing, possibly a NULL-name guard. SIMIX_rdv_create can build
 * anonymous points (name == NULL); confirm how those are released. */
73 void SIMIX_rdv_destroy(smx_rdv_t rdv)
76 xbt_dict_remove(rdv_points, rdv->name);
/* Value free function of rdv_points: releases both communication fifos of
 * the rendez-vous point `data`.
 * NOTE(review): freeing of rdv->name and of the rdv structure itself is not
 * visible in this listing — confirm both are released. */
79 void SIMIX_rdv_free(void *data)
81 XBT_DEBUG("rdv free %p", data);
82 smx_rdv_t rdv = (smx_rdv_t) data;
84 xbt_fifo_free(rdv->comm_fifo);
85 xbt_fifo_free(rdv->done_comm_fifo);
/* Accessor for the global rendez-vous point dictionary.
 * NOTE(review): the body is not visible in this listing; presumably it
 * returns rdv_points. The empty `()` parameter list is not a prototype in C
 * before C23; `(void)` would be the prototype form. */
90 xbt_dict_t SIMIX_get_rdv_points()
/* Looks up the rendez-vous point registered under `name`. Returns NULL when
 * none exists (xbt_dict_get_or_null). Unlike SIMIX_rdv_create(), it never
 * creates a point. */
95 smx_rdv_t SIMIX_rdv_get_by_name(const char *name)
97 return xbt_dict_get_or_null(rdv_points, name);
100 unsigned int simcall_HANDLER_rdv_comm_count_by_host(smx_simcall_t simcall, smx_rdv_t rdv, smx_host_t host){
101 return SIMIX_rdv_comm_count_by_host(rdv, host);
103 int SIMIX_rdv_comm_count_by_host(smx_rdv_t rdv, smx_host_t host)
105 smx_synchro_t comm = NULL;
106 xbt_fifo_item_t item = NULL;
109 xbt_fifo_foreach(rdv->comm_fifo, item, comm, smx_synchro_t) {
110 if (comm->comm.src_proc->smx_host == host)
/* Simcall handler for rdv_get_head: forwards to SIMIX_rdv_get_head().
 * The `simcall` argument is unused. */
117 smx_synchro_t simcall_HANDLER_rdv_get_head(smx_simcall_t simcall, smx_rdv_t rdv){
118 return SIMIX_rdv_get_head(rdv);
/* Returns the oldest synchro queued in rdv->comm_fifo without removing it.
 * NOTE(review): relies on xbt_fifo_get_item_content() accepting the NULL
 * item returned for an empty fifo (and then returning NULL) — confirm. */
120 smx_synchro_t SIMIX_rdv_get_head(smx_rdv_t rdv)
122 return xbt_fifo_get_item_content(xbt_fifo_get_first_item(rdv->comm_fifo));
/* Simcall handler for rdv_get_receiver: forwards to SIMIX_rdv_get_receiver().
 * The `simcall` argument is unused. */
125 smx_process_t simcall_HANDLER_rdv_get_receiver(smx_simcall_t simcall, smx_rdv_t rdv){
126 return SIMIX_rdv_get_receiver(rdv);
129 * \brief get the receiver (process associated to the mailbox)
130 * \param rdv The rendez-vous point
131 * \return process The receiving process (NULL if not set)
/* Plain read of rdv->permanent_receiver; no side effects. */
133 smx_process_t SIMIX_rdv_get_receiver(smx_rdv_t rdv)
135 return rdv->permanent_receiver;
/* Simcall handler for rdv_set_receiver: forwards to SIMIX_rdv_set_receiver().
 * The `simcall` argument is unused. */
138 void simcall_HANDLER_rdv_set_receiver(smx_simcall_t simcall, smx_rdv_t rdv,
139 smx_process_t process){
140 SIMIX_rdv_set_receiver(rdv, process);
143 * \brief set the receiver of the rendez vous point to allow eager sends
144 * \param rdv The rendez-vous point
145 * \param process The receiving process
/* With a non-NULL receiver, SIMIX_comm_isend() does not queue a send in
 * comm_fifo when no receive is waiting. It marks the send READY for this
 * process and pushes it into done_comm_fifo. Passing NULL turns this off. */
147 void SIMIX_rdv_set_receiver(smx_rdv_t rdv, smx_process_t process)
149 rdv->permanent_receiver=process;
153 * \brief Pushes a communication synchro into a rendez-vous point
154 * \param rdv The rendez-vous point
155 * \param comm The communication synchro
/* Appends `comm` at the tail of rdv->comm_fifo, so matching stays in FIFO
 * order. Records `rdv` in the synchro so that SIMIX_rdv_remove() can later
 * unlink it (for example from SIMIX_comm_destroy or SIMIX_comm_finish). */
157 static XBT_INLINE void SIMIX_rdv_push(smx_rdv_t rdv, smx_synchro_t comm)
159 xbt_fifo_push(rdv->comm_fifo, comm);
160 comm->comm.rdv = rdv;
164 * \brief Removes a communication synchro from a rendez-vous point
165 * \param rdv The rendez-vous point
166 * \param comm The communication synchro
/* Unlinks `comm` from rdv->comm_fifo and clears its back-pointer. Only
 * comm_fifo is touched; done_comm_fifo is left unchanged. `rdv` must be
 * non-NULL because it is dereferenced unconditionally. */
168 XBT_INLINE void SIMIX_rdv_remove(smx_rdv_t rdv, smx_synchro_t comm)
170 xbt_fifo_remove(rdv->comm_fifo, comm);
171 comm->comm.rdv = NULL;
175 * \brief Checks if there is a communication synchro queued in a fifo matching our needs
176 * \param type The type of communication we are looking for (comm_send, comm_recv)
177 * \return The communication synchro if found, NULL otherwise
/* Scans `fifo` from head to tail and takes the first synchro that meets all
 * of these conditions:
 *  - its comm.type equals `type`;
 *  - our `match_fun` (if non-NULL) accepts it, called as
 *    (this_user_data, candidate's user data, candidate);
 *  - the candidate's own comm.match_fun (if non-NULL) accepts us, called as
 *    (candidate's user data, this_user_data, my_synchro).
 * A candidate's user data is comm.src_data for a SEND and comm.dst_data for
 * a RECEIVE. The match is unlinked from `fifo` and its refcount is
 * incremented, so the caller owns one more reference. */
179 smx_synchro_t SIMIX_fifo_get_comm(xbt_fifo_t fifo, e_smx_comm_type_t type,
180 int (*match_fun)(void *, void *,smx_synchro_t),
181 void *this_user_data, smx_synchro_t my_synchro)
183 smx_synchro_t synchro;
184 xbt_fifo_item_t item;
185 void* other_user_data = NULL;
187 xbt_fifo_foreach(fifo, item, synchro, smx_synchro_t) {
188 if (synchro->comm.type == SIMIX_COMM_SEND) {
189 other_user_data = synchro->comm.src_data;
190 } else if (synchro->comm.type == SIMIX_COMM_RECEIVE) {
191 other_user_data = synchro->comm.dst_data;
/* For other types other_user_data keeps a stale value. This is harmless:
 * the type test short-circuits before any match function reads it. */
193 if (synchro->comm.type == type &&
194 (!match_fun || match_fun(this_user_data, other_user_data, synchro)) &&
195 (!synchro->comm.match_fun || synchro->comm.match_fun(other_user_data, this_user_data, my_synchro))) {
196 XBT_DEBUG("Found a matching communication synchro %p", synchro);
197 xbt_fifo_remove_item(fifo, item);
198 xbt_fifo_free_item(item);
199 synchro->comm.refcount++;
/* The synchro is no longer queued, so its rdv back-pointer is cleared; a
 * copy is kept in rdv_cpy. NOTE(review): the lines around this copy are
 * missing from this listing (presumably conditional compilation), as is the
 * `return synchro;` — confirm. */
201 synchro->comm.rdv_cpy = synchro->comm.rdv;
203 synchro->comm.rdv = NULL;
206 XBT_DEBUG("Sorry, communication synchro %p does not match our needs:"
207 " its type is %d but we are looking for a comm of type %d (or maybe the filtering didn't match)",
208 synchro, (int)synchro->comm.type, (int)type);
210 XBT_DEBUG("No matching communication synchro found");
216 * \brief Checks if there is a communication synchro queued in a fifo matching our needs, but leave it there
217 * \param type The type of communication we are looking for (comm_send, comm_recv)
218 * \return The communication synchro if found, NULL otherwise
/* Same matching rules as SIMIX_fifo_get_comm(): type equality plus both
 * optional match functions. The match stays in `fifo` and keeps its rdv
 * link. Its refcount is still incremented; SIMIX_comm_iprobe() drops that
 * extra reference again after the lookup.
 * NOTE(review): the `return synchro;` / `return NULL;` lines are missing
 * from this listing — confirm. */
220 smx_synchro_t SIMIX_fifo_probe_comm(xbt_fifo_t fifo, e_smx_comm_type_t type,
221 int (*match_fun)(void *, void *,smx_synchro_t),
222 void *this_user_data, smx_synchro_t my_synchro)
224 smx_synchro_t synchro;
225 xbt_fifo_item_t item;
226 void* other_user_data = NULL;
228 xbt_fifo_foreach(fifo, item, synchro, smx_synchro_t) {
229 if (synchro->comm.type == SIMIX_COMM_SEND) {
230 other_user_data = synchro->comm.src_data;
231 } else if (synchro->comm.type == SIMIX_COMM_RECEIVE) {
232 other_user_data = synchro->comm.dst_data;
234 if (synchro->comm.type == type &&
235 (!match_fun || match_fun(this_user_data, other_user_data, synchro)) &&
236 (!synchro->comm.match_fun || synchro->comm.match_fun(other_user_data, this_user_data, my_synchro))) {
237 XBT_DEBUG("Found a matching communication synchro %p", synchro);
238 synchro->comm.refcount++;
242 XBT_DEBUG("Sorry, communication synchro %p does not match our needs:"
243 " its type is %d but we are looking for a comm of type %d (or maybe the filtering didn't match)",
244 synchro, (int)synchro->comm.type, (int)type);
246 XBT_DEBUG("No matching communication synchro found");
249 /******************************************************************************/
250 /* Communication synchros */
251 /******************************************************************************/
254 * \brief Creates a new communicate synchro
255 * \param type The direction of communication (comm_send, comm_recv)
256 * \return The new communicate synchro
/* Takes a synchro from the global mallocator and initializes it as a
 * communication in direction `type`. The visible settings are: state
 * SIMIX_WAITING, refcount 1 (owned by the caller), NULL src_data/dst_data,
 * NULL category, and latency_limited = -1 ("unknown") when latency tracking
 * is compiled in.
 * NOTE(review): other comm fields (src_proc, dst_proc, rdv, surf_comm,
 * detached, ...) are not set here. Callers test them for NULL, so they are
 * presumably cleared by the mallocator's reset function — confirm. The
 * return statement and the #endif lines are missing from this listing. */
258 smx_synchro_t SIMIX_comm_new(e_smx_comm_type_t type)
260 smx_synchro_t synchro;
262 /* alloc structures */
263 synchro = xbt_mallocator_get(simix_global->synchro_mallocator);
265 synchro->type = SIMIX_SYNC_COMMUNICATE;
266 synchro->state = SIMIX_WAITING;
268 /* set communication */
269 synchro->comm.type = type;
270 synchro->comm.refcount = 1;
271 synchro->comm.src_data=NULL;
272 synchro->comm.dst_data=NULL;
275 #ifdef HAVE_LATENCY_BOUND_TRACKING
276 //initialize with unknown value
277 synchro->latency_limited = -1;
281 synchro->category = NULL;
284 XBT_DEBUG("Create communicate synchro %p", synchro);
291 * \brief Destroy a communicate synchro
292 * \param synchro The communicate synchro to be destroyed
/* Drops one reference to `synchro`. Aborts (xbt_die) if the refcount is
 * already <= 0, which indicates a double test/wait or a SimGrid bug. When
 * the last reference goes, it does the following:
 *  - frees the synchro name and its surf actions;
 *  - for a detached communication that did not reach SIMIX_DONE, passes
 *    the sender's buffer to clean_fun (if any);
 *  - unlinks the synchro from its rendez-vous point if still queued;
 *  - returns it to the mallocator.
 * NOTE(review): the early `return` after the refcount test and the #endif
 * are on lines missing from this listing — confirm. */
294 void SIMIX_comm_destroy(smx_synchro_t synchro)
296 XBT_DEBUG("Destroy synchro %p (refcount: %d), state: %d",
297 synchro, synchro->comm.refcount, (int)synchro->state);
299 if (synchro->comm.refcount <= 0) {
300 xbt_backtrace_display_current();
301 xbt_die("The refcount of comm %p is already 0 before decreasing it. "
302 "That's a bug! If you didn't test and/or wait the same communication twice in your code, then the bug is SimGrid's...", synchro);
304 synchro->comm.refcount--;
/* Other holders remain: keep the synchro alive. */
305 if (synchro->comm.refcount > 0)
307 XBT_DEBUG("Really free communication %p; refcount is now %d", synchro,
308 synchro->comm.refcount);
310 #ifdef HAVE_LATENCY_BOUND_TRACKING
311 synchro->latency_limited = SIMIX_comm_is_latency_bounded( synchro ) ;
314 xbt_free(synchro->name);
315 SIMIX_comm_destroy_internal_actions(synchro);
317 if (synchro->comm.detached && synchro->state != SIMIX_DONE) {
318 /* the communication has failed and was detached:
319 * we have to free the buffer */
320 if (synchro->comm.clean_fun) {
321 synchro->comm.clean_fun(synchro->comm.src_buff);
323 synchro->comm.src_buff = NULL;
326 if(synchro->comm.rdv)
327 SIMIX_rdv_remove(synchro->comm.rdv, synchro);
329 xbt_mallocator_release(simix_global->synchro_mallocator, synchro);
/* Releases the surf actions attached to a communication: the transfer
 * action (surf_comm) and the sender/receiver sleep actions used for
 * timeouts (src_timeout, dst_timeout). Each pointer is cleared after
 * surf_action_unref(), so calling this twice is safe. */
332 void SIMIX_comm_destroy_internal_actions(smx_synchro_t synchro)
334 if (synchro->comm.surf_comm){
335 #ifdef HAVE_LATENCY_BOUND_TRACKING
/* Record the latency-bound status while the transfer action still exists. */
336 synchro->latency_limited = SIMIX_comm_is_latency_bounded(synchro);
338 surf_action_unref(synchro->comm.surf_comm);
339 synchro->comm.surf_comm = NULL;
342 if (synchro->comm.src_timeout){
343 surf_action_unref(synchro->comm.src_timeout);
344 synchro->comm.src_timeout = NULL;
347 if (synchro->comm.dst_timeout){
348 surf_action_unref(synchro->comm.dst_timeout);
349 synchro->comm.dst_timeout = NULL;
/* Simcall handler for a blocking send. Posts the send with
 * SIMIX_comm_isend() (no clean_fun), then handles the same simcall as a
 * wait on the resulting synchro with `timeout`.
 * The MC value is reset to 0 because simcall_HANDLER_comm_wait() reads it
 * when the model checker is active.
 * NOTE(review): the tail of the isend argument list (after copy_data_fun)
 * is on a line missing from this listing; presumably `data` and a
 * non-detached flag — confirm. */
353 void simcall_HANDLER_comm_send(smx_simcall_t simcall, smx_process_t src, smx_rdv_t rdv,
354 double task_size, double rate,
355 void *src_buff, size_t src_buff_size,
356 int (*match_fun)(void *, void *,smx_synchro_t),
357 void (*copy_data_fun)(smx_synchro_t, void*, size_t),
358 void *data, double timeout){
359 smx_synchro_t comm = SIMIX_comm_isend(src, rdv, task_size, rate,
360 src_buff, src_buff_size, match_fun, NULL, copy_data_fun,
362 SIMCALL_SET_MC_VALUE(simcall, 0);
363 simcall_HANDLER_comm_wait(simcall, comm, timeout);
/* Simcall handler for an asynchronous send: forwards every argument to
 * SIMIX_comm_isend(). The sender is the explicit `src` argument, not
 * simcall->issuer; `simcall` itself is unused. Returns NULL for detached
 * sends (see SIMIX_comm_isend). */
365 smx_synchro_t simcall_HANDLER_comm_isend(smx_simcall_t simcall, smx_process_t src, smx_rdv_t rdv,
366 double task_size, double rate,
367 void *src_buff, size_t src_buff_size,
368 int (*match_fun)(void *, void *,smx_synchro_t),
369 void (*clean_fun)(void *),
370 void (*copy_data_fun)(smx_synchro_t, void*, size_t),
371 void *data, int detached){
372 return SIMIX_comm_isend(src, rdv, task_size, rate, src_buff,
373 src_buff_size, match_fun, clean_fun, copy_data_fun, data, detached);
/**
 * Posts a send of src_buff (src_buff_size bytes, simulated size task_size,
 * at `rate`) from src_proc on `rdv`.
 *  - If a queued receive is accepted by both match functions
 *    (SIMIX_fifo_get_comm), it is reused: the freshly created SEND synchro
 *    is destroyed and the receive is marked READY.
 *  - Otherwise our SEND synchro is used. With a permanent receiver it is
 *    made READY immediately and pushed into done_comm_fifo (eager send).
 *    Without one it is queued in comm_fifo to wait for a receiver.
 * The synchro is appended to src_proc->comms and filled with the sender
 * side fields. When `detached`, one reference is dropped (the receiver's
 * destroy call frees it) and clean_fun is kept so SIMIX_comm_destroy() can
 * free the buffer on failure. NULL is then returned instead of the synchro.
 * Under the model checker the synchro is only marked RUNNING; otherwise
 * SIMIX_comm_start() is called.
 * NOTE(review): the last parameters (`data`, `detached`) and the
 * `else` / `if (detached)` lines are missing from this listing; the branch
 * structure above is inferred from the visible statements — confirm.
 */
376 smx_synchro_t SIMIX_comm_isend(smx_process_t src_proc, smx_rdv_t rdv,
377 double task_size, double rate,
378 void *src_buff, size_t src_buff_size,
379 int (*match_fun)(void *, void *,smx_synchro_t),
380 void (*clean_fun)(void *), // used to free the synchro in case of problem after a detached send
381 void (*copy_data_fun)(smx_synchro_t, void*, size_t), // used to copy data if not default one
385 XBT_DEBUG("send from %p", rdv);
387 /* Prepare a synchro describing us, so that it gets passed to the user-provided filter of other side */
388 smx_synchro_t this_synchro = SIMIX_comm_new(SIMIX_COMM_SEND);
390 /* Look for communication synchro matching our needs. We also provide a description of
391 * ourself so that the other side also gets a chance of choosing if it wants to match with us.
393 * If it is not found then push our communication into the rendez-vous point */
394 smx_synchro_t other_synchro = SIMIX_fifo_get_comm(rdv->comm_fifo, SIMIX_COMM_RECEIVE, match_fun, data, this_synchro);
396 if (!other_synchro) {
397 other_synchro = this_synchro;
399 if (rdv->permanent_receiver!=NULL){
400 //this mailbox is for small messages, which have to be sent right now
401 other_synchro->state = SIMIX_READY;
402 other_synchro->comm.dst_proc=rdv->permanent_receiver;
/* Extra reference held on behalf of done_comm_fifo. */
403 other_synchro->comm.refcount++;
404 xbt_fifo_push(rdv->done_comm_fifo,other_synchro);
405 other_synchro->comm.rdv=rdv;
406 XBT_DEBUG("pushing a message into the permanent receive fifo %p, comm %p", rdv, &(other_synchro->comm));
409 SIMIX_rdv_push(rdv, this_synchro);
412 XBT_DEBUG("Receive already pushed");
414 SIMIX_comm_destroy(this_synchro);
415 --smx_total_comms; // this creation was a pure waste
417 other_synchro->state = SIMIX_READY;
418 other_synchro->comm.type = SIMIX_COMM_READY;
421 xbt_fifo_push(src_proc->comms, other_synchro);
423 /* if the communication synchro is detached then decrease the refcount
424 * by one, so it will be eliminated by the receiver's destroy call */
426 other_synchro->comm.detached = 1;
427 other_synchro->comm.refcount--;
428 other_synchro->comm.clean_fun = clean_fun;
430 other_synchro->comm.clean_fun = NULL;
433 /* Setup the communication synchro */
434 other_synchro->comm.src_proc = src_proc;
435 other_synchro->comm.task_size = task_size;
436 other_synchro->comm.rate = rate;
437 other_synchro->comm.src_buff = src_buff;
438 other_synchro->comm.src_buff_size = src_buff_size;
439 other_synchro->comm.src_data = data;
441 other_synchro->comm.match_fun = match_fun;
442 other_synchro->comm.copy_data_fun = copy_data_fun;
445 if (MC_is_active()) {
446 other_synchro->state = SIMIX_RUNNING;
447 return (detached ? NULL : other_synchro);
450 SIMIX_comm_start(other_synchro);
451 return (detached ? NULL : other_synchro);
/* Simcall handler for a blocking receive. Posts the receive for
 * simcall->issuer with SIMIX_comm_irecv(), then handles the same simcall as
 * a wait on the resulting synchro with `timeout`. The MC value is reset to
 * 0 because simcall_HANDLER_comm_wait() reads it under the model checker. */
454 void simcall_HANDLER_comm_recv(smx_simcall_t simcall, smx_rdv_t rdv,
455 void *dst_buff, size_t *dst_buff_size,
456 int (*match_fun)(void *, void *, smx_synchro_t),
457 void (*copy_data_fun)(smx_synchro_t, void*, size_t),
458 void *data, double timeout, double rate)
460 smx_synchro_t comm = SIMIX_comm_irecv(simcall->issuer, rdv, dst_buff,
461 dst_buff_size, match_fun, copy_data_fun, data, rate);
462 SIMCALL_SET_MC_VALUE(simcall, 0);
463 simcall_HANDLER_comm_wait(simcall, comm, timeout);
/* Simcall handler for an asynchronous receive: forwards to
 * SIMIX_comm_irecv() with the issuing process as receiver and returns the
 * resulting synchro. */
466 smx_synchro_t simcall_HANDLER_comm_irecv(smx_simcall_t simcall, smx_rdv_t rdv,
467 void *dst_buff, size_t *dst_buff_size,
468 int (*match_fun)(void *, void *, smx_synchro_t),
469 void (*copy_data_fun)(smx_synchro_t, void*, size_t),
470 void *data, double rate)
472 return SIMIX_comm_irecv(simcall->issuer, rdv, dst_buff, dst_buff_size,
473 match_fun, copy_data_fun, data, rate);
/**
 * Posts a receive into dst_buff on `rdv` for process dst_proc and returns
 * the resulting communication synchro.
 *  - If the rendez-vous point has a permanent receiver and done_comm_fifo is
 *    not empty, a matching send is looked up there first. If the send has a
 *    surf action with nothing remaining, it is marked DONE and detached from
 *    the rendez-vous point. If no match is found, this RECEIVE synchro is
 *    queued in comm_fifo.
 *  - Otherwise a matching send is taken from comm_fifo, or this RECEIVE
 *    synchro is queued there.
 * When an existing synchro is reused, the freshly created one is destroyed
 * and smx_total_comms is decremented. The synchro is then appended to
 * dst_proc->comms and filled with the receiver-side fields. The receiver's
 * `rate` replaces the stored one when that is -1.0 or larger. Under the
 * model checker the synchro is only marked RUNNING; otherwise
 * SIMIX_comm_start() is called.
 * NOTE(review): many control-flow lines (else branches, the first half of
 * the rate condition, closing braces) are missing from this listing. The
 * description above is inferred from the visible statements — confirm.
 */
476 smx_synchro_t SIMIX_comm_irecv(smx_process_t dst_proc, smx_rdv_t rdv,
477 void *dst_buff, size_t *dst_buff_size,
478 int (*match_fun)(void *, void *, smx_synchro_t),
479 void (*copy_data_fun)(smx_synchro_t, void*, size_t), // used to copy data if not default one
480 void *data, double rate)
482 XBT_DEBUG("recv from %p %p", rdv, rdv->comm_fifo);
483 smx_synchro_t this_synchro = SIMIX_comm_new(SIMIX_COMM_RECEIVE);
485 smx_synchro_t other_synchro;
486 //communication already done, get it inside the fifo of completed comms
487 //permanent receive v1
488 //int already_received=0;
489 if(rdv->permanent_receiver && xbt_fifo_size(rdv->done_comm_fifo)!=0){
491 XBT_DEBUG("We have a comm that has probably already been received, trying to match it, to skip the communication");
492 //find a match in the already received fifo
493 other_synchro = SIMIX_fifo_get_comm(rdv->done_comm_fifo, SIMIX_COMM_SEND, match_fun, data, this_synchro);
494 //if not found, assume the receiver came first, register it to the mailbox in the classical way
495 if (!other_synchro) {
496 XBT_DEBUG("We have messages in the permanent receive list, but not the one we are looking for, pushing request into fifo");
497 other_synchro = this_synchro;
498 SIMIX_rdv_push(rdv, this_synchro);
500 if(other_synchro->comm.surf_comm && SIMIX_comm_get_remains(other_synchro)==0.0)
502 XBT_DEBUG("comm %p has been already sent, and is finished, destroy it",&(other_synchro->comm));
503 other_synchro->state = SIMIX_DONE;
504 other_synchro->comm.type = SIMIX_COMM_DONE;
505 other_synchro->comm.rdv = NULL;
507 XBT_DEBUG("Not yet finished, we have to wait %d", xbt_fifo_size(rdv->comm_fifo));
/* NOTE(review): presumably releases the reference that SIMIX_comm_isend took
 * when it pushed the send into done_comm_fifo — confirm. */
509 other_synchro->comm.refcount--;
510 SIMIX_comm_destroy(this_synchro);
511 --smx_total_comms; // this creation was a pure waste
514 /* Prepare a synchro describing us, so that it gets passed to the user-provided filter of other side */
516 /* Look for communication synchro matching our needs. We also provide a description of
517 * ourself so that the other side also gets a chance of choosing if it wants to match with us.
519 * If it is not found then push our communication into the rendez-vous point */
520 other_synchro = SIMIX_fifo_get_comm(rdv->comm_fifo, SIMIX_COMM_SEND, match_fun, data, this_synchro);
522 if (!other_synchro) {
523 XBT_DEBUG("Receive pushed first %d", xbt_fifo_size(rdv->comm_fifo));
524 other_synchro = this_synchro;
525 SIMIX_rdv_push(rdv, this_synchro);
527 SIMIX_comm_destroy(this_synchro);
528 --smx_total_comms; // this creation was a pure waste
529 other_synchro->state = SIMIX_READY;
530 other_synchro->comm.type = SIMIX_COMM_READY;
531 //other_synchro->comm.refcount--;
533 xbt_fifo_push(dst_proc->comms, other_synchro);
536 /* Setup communication synchro */
537 other_synchro->comm.dst_proc = dst_proc;
538 other_synchro->comm.dst_buff = dst_buff;
539 other_synchro->comm.dst_buff_size = dst_buff_size;
540 other_synchro->comm.dst_data = data;
543 (other_synchro->comm.rate == -1.0 || rate < other_synchro->comm.rate))
544 other_synchro->comm.rate = rate;
546 other_synchro->comm.match_fun = match_fun;
547 other_synchro->comm.copy_data_fun = copy_data_fun;
550 /*if(already_received)//do the actual copy, because the first one after the comm didn't have all the info
551 SIMIX_comm_copy_data(other_synchro);*/
554 if (MC_is_active()) {
555 other_synchro->state = SIMIX_RUNNING;
556 return other_synchro;
559 SIMIX_comm_start(other_synchro);
561 return other_synchro;
/* Simcall handler for iprobe: forwards to SIMIX_comm_iprobe() with the
 * issuing process. NOTE(review): the final `void *data){` parameter line is
 * missing from this listing. */
564 smx_synchro_t simcall_HANDLER_comm_iprobe(smx_simcall_t simcall, smx_rdv_t rdv,
565 int type, int src, int tag,
566 int (*match_fun)(void *, void *, smx_synchro_t),
568 return SIMIX_comm_iprobe(simcall->issuer, rdv, type, src, tag, match_fun, data);
/* Non-destructively checks whether `rdv` holds a communication matching
 * `match_fun`/`data`. Builds a temporary synchro of our direction so the
 * other side's match function can inspect us, and looks for the opposite
 * direction. The lookup uses done_comm_fifo first when the rendez-vous
 * point has a permanent receiver and that fifo is non-empty, then comm_fifo.
 * The reference added by SIMIX_fifo_probe_comm() is dropped again and the
 * temporary synchro destroyed, so the returned synchro (or NULL) is not
 * owned by the caller.
 * NOTE(review): the declaration of smx_type, the test on `type` that picks
 * the direction, and the condition guarding the comm_fifo lookup are on
 * lines missing from this listing. dst_proc, src and tag are not used by
 * any visible line — confirm. */
571 smx_synchro_t SIMIX_comm_iprobe(smx_process_t dst_proc, smx_rdv_t rdv, int type, int src,
572 int tag, int (*match_fun)(void *, void *, smx_synchro_t), void *data)
574 XBT_DEBUG("iprobe from %p %p", rdv, rdv->comm_fifo);
575 smx_synchro_t this_synchro;
578 this_synchro=SIMIX_comm_new(SIMIX_COMM_SEND);
579 smx_type = SIMIX_COMM_RECEIVE;
581 this_synchro=SIMIX_comm_new(SIMIX_COMM_RECEIVE);
582 smx_type = SIMIX_COMM_SEND;
584 smx_synchro_t other_synchro=NULL;
585 if(rdv->permanent_receiver && xbt_fifo_size(rdv->done_comm_fifo)!=0){
586 //find a match in the already received fifo
587 XBT_DEBUG("first try in the perm recv mailbox");
589 other_synchro = SIMIX_fifo_probe_comm(rdv->done_comm_fifo, smx_type, match_fun, data, this_synchro);
593 XBT_DEBUG("try in the normal mailbox");
594 other_synchro = SIMIX_fifo_probe_comm(rdv->comm_fifo, smx_type, match_fun, data, this_synchro);
/* Undo the refcount increment done by SIMIX_fifo_probe_comm(). */
597 if(other_synchro)other_synchro->comm.refcount--;
599 SIMIX_comm_destroy(this_synchro);
601 return other_synchro;
/* Handles a wait (also used for blocking send/recv) on `synchro`. It queues
 * the simcall on synchro->simcalls and records the synchro as the issuer's
 * waiting_synchro.
 *  - Under the model checker, the MC value selects the outcome: DONE, or a
 *    timeout on the issuer's side, then SIMIX_comm_finish() answers at once.
 *  - Otherwise a communication that is neither WAITING nor RUNNING is
 *    finished immediately.
 *  - Else a surf sleep action of `timeout` is created on the issuer's host
 *    and stored as src_timeout or dst_timeout. SIMIX_post_comm() later
 *    interprets it: a completed sleep means timeout, a failed one means
 *    host failure.
 * NOTE(review): the declaration of `sleep`, the branch on the MC value and
 * the `else` lines are missing from this listing — confirm. */
604 void simcall_HANDLER_comm_wait(smx_simcall_t simcall, smx_synchro_t synchro, double timeout)
606 /* the simcall may be a wait, a send or a recv */
609 /* Associate this simcall to the wait synchro */
610 XBT_DEBUG("simcall_HANDLER_comm_wait, %p", synchro);
612 xbt_fifo_push(synchro->simcalls, simcall);
613 simcall->issuer->waiting_synchro = synchro;
615 if (MC_is_active()) {
616 int idx = SIMCALL_GET_MC_VALUE(simcall);
618 synchro->state = SIMIX_DONE;
620 /* If we reached this point, the wait simcall must have a timeout */
621 /* Otherwise it shouldn't be enabled and executed by the MC */
625 if (synchro->comm.src_proc == simcall->issuer)
626 synchro->state = SIMIX_SRC_TIMEOUT;
628 synchro->state = SIMIX_DST_TIMEOUT;
631 SIMIX_comm_finish(synchro);
635 /* If the synchro has already finish perform the error handling, */
636 /* otherwise set up a waiting timeout on the right side */
637 if (synchro->state != SIMIX_WAITING && synchro->state != SIMIX_RUNNING) {
638 SIMIX_comm_finish(synchro);
639 } else { /* if (timeout >= 0) { we need a surf sleep action even when there is no timeout, otherwise surf won't tell us when the host fails */
640 sleep = surf_workstation_sleep(simcall->issuer->smx_host, timeout);
641 surf_action_set_data(sleep, synchro);
643 if (simcall->issuer == synchro->comm.src_proc)
644 synchro->comm.src_timeout = sleep;
646 synchro->comm.dst_timeout = sleep;
/* Handles a non-blocking test of `synchro`.
 *  - Under the model checker (first part), the result is true when both
 *    endpoints are known. The synchro is then forced to DONE and finished;
 *    otherwise the simcall is answered directly.
 *  - Otherwise the result is true once the synchro is neither WAITING nor
 *    RUNNING. In that case SIMIX_comm_finish() answers the simcall;
 *    otherwise it is answered immediately with a false result.
 * NOTE(review): the MC_is_active() test, the `return`s and the `else` lines
 * are missing from this listing — confirm. */
650 void simcall_HANDLER_comm_test(smx_simcall_t simcall, smx_synchro_t synchro)
653 simcall_comm_test__set__result(simcall, synchro->comm.src_proc && synchro->comm.dst_proc);
654 if(simcall_comm_test__get__result(simcall)){
655 synchro->state = SIMIX_DONE;
656 xbt_fifo_push(synchro->simcalls, simcall);
657 SIMIX_comm_finish(synchro);
659 SIMIX_simcall_answer(simcall);
664 simcall_comm_test__set__result(simcall, (synchro->state != SIMIX_WAITING && synchro->state != SIMIX_RUNNING));
665 if (simcall_comm_test__get__result(simcall)) {
666 xbt_fifo_push(synchro->simcalls, simcall);
667 SIMIX_comm_finish(synchro);
669 SIMIX_simcall_answer(simcall);
/* Handles testany over `synchros`. The result starts at -1 (nothing
 * finished).
 *  - Under the model checker, the MC value selects one index. That synchro
 *    is forced to DONE and finished, reporting the index.
 *  - Otherwise the first synchro that is neither WAITING nor RUNNING is
 *    reported and finished; if none is, the simcall is answered with -1.
 * NOTE(review): the MC branch reads the `synchros` parameter while the
 * normal branch iterates simcall_comm_testany__get__comms(simcall);
 * presumably the same dynar. The declaration of `cursor`, the special-case
 * test on idx and the early returns are missing from this listing —
 * confirm. */
673 void simcall_HANDLER_comm_testany(smx_simcall_t simcall, xbt_dynar_t synchros)
676 smx_synchro_t synchro;
677 simcall_comm_testany__set__result(simcall, -1);
680 int idx = SIMCALL_GET_MC_VALUE(simcall);
682 SIMIX_simcall_answer(simcall);
684 synchro = xbt_dynar_get_as(synchros, idx, smx_synchro_t);
685 simcall_comm_testany__set__result(simcall, idx);
686 xbt_fifo_push(synchro->simcalls, simcall);
687 synchro->state = SIMIX_DONE;
688 SIMIX_comm_finish(synchro);
693 xbt_dynar_foreach(simcall_comm_testany__get__comms(simcall), cursor,synchro) {
694 if (synchro->state != SIMIX_WAITING && synchro->state != SIMIX_RUNNING) {
695 simcall_comm_testany__set__result(simcall, cursor);
696 xbt_fifo_push(synchro->simcalls, simcall);
697 SIMIX_comm_finish(synchro);
701 SIMIX_simcall_answer(simcall);
/* Handles waitany over `synchros`.
 *  - Under the model checker, the MC value selects the synchro that
 *    completes. It is forced to DONE and finished with that index as result.
 *  - Otherwise the simcall is queued on every synchro of the set. Any synchro
 *    already neither WAITING nor RUNNING is finished right away.
 * SIMIX_comm_finish() later removes the simcall from the remaining synchros
 * (SIMIX_waitany_remove_simcall_from_actions) and computes the index.
 * NOTE(review): the MC_is_active() test and the return/break lines are
 * missing from this listing — confirm. */
704 void simcall_HANDLER_comm_waitany(smx_simcall_t simcall, xbt_dynar_t synchros)
706 smx_synchro_t synchro;
707 unsigned int cursor = 0;
710 int idx = SIMCALL_GET_MC_VALUE(simcall);
711 synchro = xbt_dynar_get_as(synchros, idx, smx_synchro_t);
712 xbt_fifo_push(synchro->simcalls, simcall);
713 simcall_comm_waitany__set__result(simcall, idx);
714 synchro->state = SIMIX_DONE;
715 SIMIX_comm_finish(synchro);
719 xbt_dynar_foreach(synchros, cursor, synchro){
720 /* associate this simcall to the the synchro */
721 xbt_fifo_push(synchro->simcalls, simcall);
723 /* see if the synchro is already finished */
724 if (synchro->state != SIMIX_WAITING && synchro->state != SIMIX_RUNNING){
725 SIMIX_comm_finish(synchro);
/* Detaches a waitany simcall from every synchro it was queued on, so that
 * only the synchro that completed answers it. */
731 void SIMIX_waitany_remove_simcall_from_actions(smx_simcall_t simcall)
733 smx_synchro_t synchro;
734 unsigned int cursor = 0;
735 xbt_dynar_t synchros = simcall_comm_waitany__get__comms(simcall);
737 xbt_dynar_foreach(synchros, cursor, synchro) {
738 xbt_fifo_remove(synchro->simcalls, simcall);
743 * \brief Starts the simulation of a communication synchro.
744 * \param synchro the communication synchro
/* Only acts on READY synchros, i.e. both endpoints are known. It creates
 * the surf transfer action between the two processes' hosts and marks the
 * synchro RUNNING. If surf reports the action as already failed, the state
 * becomes SIMIX_LINK_FAILURE and the surf actions are released. If either
 * endpoint process is suspended, the new transfer action is suspended
 * immediately.
 * NOTE(review): the `sender, receiver,` arguments of the communicate call
 * and the braces/else lines are missing from this listing. It is unclear
 * from here whether the suspension check can run after a link failure has
 * cleared surf_comm — confirm. */
746 static XBT_INLINE void SIMIX_comm_start(smx_synchro_t synchro)
748 /* If both the sender and the receiver are already there, start the communication */
749 if (synchro->state == SIMIX_READY) {
751 smx_host_t sender = synchro->comm.src_proc->smx_host;
752 smx_host_t receiver = synchro->comm.dst_proc->smx_host;
754 XBT_DEBUG("Starting communication %p from '%s' to '%s'", synchro,
755 SIMIX_host_get_name(sender), SIMIX_host_get_name(receiver));
757 synchro->comm.surf_comm = surf_workstation_model_communicate(surf_workstation_model,
759 synchro->comm.task_size, synchro->comm.rate);
761 surf_action_set_data(synchro->comm.surf_comm, synchro);
763 synchro->state = SIMIX_RUNNING;
765 /* If a link is failed, detect it immediately */
766 if (surf_action_get_state(synchro->comm.surf_comm) == SURF_ACTION_FAILED) {
767 XBT_DEBUG("Communication from '%s' to '%s' failed to start because of a link failure",
768 SIMIX_host_get_name(sender), SIMIX_host_get_name(receiver));
769 synchro->state = SIMIX_LINK_FAILURE;
770 SIMIX_comm_destroy_internal_actions(synchro);
773 /* If any of the process is suspend, create the synchro but stop its execution,
774 it will be restarted when the sender process resume */
775 if (SIMIX_process_is_suspended(synchro->comm.src_proc) ||
776 SIMIX_process_is_suspended(synchro->comm.dst_proc)) {
777 /* FIXME: check what should happen with the synchro state */
779 if (SIMIX_process_is_suspended(synchro->comm.src_proc))
780 XBT_DEBUG("The communication is suspended on startup because src (%s:%s) were suspended since it initiated the communication",
781 SIMIX_host_get_name(synchro->comm.src_proc->smx_host), synchro->comm.src_proc->name);
783 XBT_DEBUG("The communication is suspended on startup because dst (%s:%s) were suspended since it initiated the communication",
784 SIMIX_host_get_name(synchro->comm.dst_proc->smx_host), synchro->comm.dst_proc->name);
786 surf_action_suspend(synchro->comm.surf_comm);
793 * \brief Answers the SIMIX simcalls associated to a communication synchro.
794 * \param synchro a finished communication synchro
/* Pops every simcall queued on synchro->simcalls and answers it according
 * to the synchro's final state:
 *  - completed: the payload is copied with SIMIX_comm_copy_data();
 *  - SRC/DST_TIMEOUT: timeout_error;
 *  - SRC/DST_HOST_FAILURE: the issuer on the failed side is marked to die,
 *    any other issuer gets network_error;
 *  - LINK_FAILURE: network_error;
 *  - canceled: cancel_error, worded after the side that canceled;
 *  - any other state: xbt_die().
 * For waitany/testany, the index of the synchro is stored as the result or
 * as the exception value. Simcalls of killed processes (SIMCALL_NONE) are
 * skipped. Issuers whose host is no longer ON are marked to die. The
 * synchro is unlinked from the issuer's comms list, and also from the
 * peer's list when detached. Finally SIMIX_comm_destroy() runs
 * destroy_count times.
 * NOTE(review): the SIMIX_DONE / cancel case labels, several `else`/`break`
 * lines and the `destroy_count` increment are missing from this listing;
 * presumably one increment per popped simcall — confirm. */
796 void SIMIX_comm_finish(smx_synchro_t synchro)
798 unsigned int destroy_count = 0;
799 smx_simcall_t simcall;
802 while ((simcall = xbt_fifo_shift(synchro->simcalls))) {
804 /* If a waitany simcall is waiting for this synchro to finish, then remove
805 it from the other synchros in the waitany list. Afterwards, get the
806 position of the actual synchro in the waitany dynar and
807 return it as the result of the simcall */
809 if (simcall->call == SIMCALL_NONE) //FIXME: maybe a better way to handle this case
810 continue; // if process handling comm is killed
811 if (simcall->call == SIMCALL_COMM_WAITANY) {
812 SIMIX_waitany_remove_simcall_from_actions(simcall);
814 simcall_comm_waitany__set__result(simcall, xbt_dynar_search(simcall_comm_waitany__get__comms(simcall), &synchro));
817 /* If the synchro is still in a rendez-vous point then remove from it */
818 if (synchro->comm.rdv)
819 SIMIX_rdv_remove(synchro->comm.rdv, synchro);
821 XBT_DEBUG("SIMIX_comm_finish: synchro state = %d", (int)synchro->state);
823 /* Check out for errors */
824 switch (synchro->state) {
827 XBT_DEBUG("Communication %p complete!", synchro);
828 SIMIX_comm_copy_data(synchro);
831 case SIMIX_SRC_TIMEOUT:
832 SMX_EXCEPTION(simcall->issuer, timeout_error, 0,
833 "Communication timeouted because of sender");
836 case SIMIX_DST_TIMEOUT:
837 SMX_EXCEPTION(simcall->issuer, timeout_error, 0,
838 "Communication timeouted because of receiver");
841 case SIMIX_SRC_HOST_FAILURE:
842 if (simcall->issuer == synchro->comm.src_proc)
843 simcall->issuer->context->iwannadie = 1;
844 // SMX_EXCEPTION(simcall->issuer, host_error, 0, "Host failed");
846 SMX_EXCEPTION(simcall->issuer, network_error, 0, "Remote peer failed");
849 case SIMIX_DST_HOST_FAILURE:
850 if (simcall->issuer == synchro->comm.dst_proc)
851 simcall->issuer->context->iwannadie = 1;
852 // SMX_EXCEPTION(simcall->issuer, host_error, 0, "Host failed");
854 SMX_EXCEPTION(simcall->issuer, network_error, 0, "Remote peer failed");
857 case SIMIX_LINK_FAILURE:
858 XBT_DEBUG("Link failure in synchro %p between '%s' and '%s': posting an exception to the issuer: %s (%p) detached:%d",
860 synchro->comm.src_proc ? sg_host_name(synchro->comm.src_proc->smx_host) : NULL,
861 synchro->comm.dst_proc ? sg_host_name(synchro->comm.dst_proc->smx_host) : NULL,
862 simcall->issuer->name, simcall->issuer, synchro->comm.detached);
863 if (synchro->comm.src_proc == simcall->issuer) {
864 XBT_DEBUG("I'm source");
865 } else if (synchro->comm.dst_proc == simcall->issuer) {
866 XBT_DEBUG("I'm dest");
868 XBT_DEBUG("I'm neither source nor dest");
870 SMX_EXCEPTION(simcall->issuer, network_error, 0, "Link failure");
874 if (simcall->issuer == synchro->comm.dst_proc)
875 SMX_EXCEPTION(simcall->issuer, cancel_error, 0,
876 "Communication canceled by the sender");
878 SMX_EXCEPTION(simcall->issuer, cancel_error, 0,
879 "Communication canceled by the receiver");
883 xbt_die("Unexpected synchro state in SIMIX_comm_finish: %d", (int)synchro->state);
886 /* if there is an exception during a waitany or a testany, indicate the position of the failed communication */
887 if (simcall->issuer->doexception) {
888 if (simcall->call == SIMCALL_COMM_WAITANY) {
889 simcall->issuer->running_ctx->exception.value = xbt_dynar_search(simcall_comm_waitany__get__comms(simcall), &synchro);
891 else if (simcall->call == SIMCALL_COMM_TESTANY) {
892 simcall->issuer->running_ctx->exception.value = xbt_dynar_search(simcall_comm_testany__get__comms(simcall), &synchro);
/* An issuer whose host went down must not resume normally. */
896 if (surf_resource_get_state(surf_workstation_resource_priv(simcall->issuer->smx_host)) != SURF_RESOURCE_ON) {
897 simcall->issuer->context->iwannadie = 1;
900 simcall->issuer->waiting_synchro = NULL;
901 xbt_fifo_remove(simcall->issuer->comms, synchro);
902 if(synchro->comm.detached){
903 if(simcall->issuer == synchro->comm.src_proc){
904 if(synchro->comm.dst_proc)
905 xbt_fifo_remove(synchro->comm.dst_proc->comms, synchro);
907 if(simcall->issuer == synchro->comm.dst_proc){
908 if(synchro->comm.src_proc)
909 xbt_fifo_remove(synchro->comm.src_proc->comms, synchro);
912 SIMIX_simcall_answer(simcall);
/* Destroy only after the loop: SIMIX_comm_destroy() may free the synchro. */
916 while (destroy_count-- > 0)
917 SIMIX_comm_destroy(synchro);
921 * \brief This function is called when a Surf communication synchro is finished.
922 * \param synchro the corresponding Simix communication
/* Derives the final SIMIX state from the surf actions, checked in this
 * priority order:
 *  1. a completed sender sleep gives SRC_TIMEOUT;
 *  2. a completed receiver sleep gives DST_TIMEOUT;
 *  3. a failed sender sleep gives SRC_HOST_FAILURE;
 *  4. a failed receiver sleep gives DST_HOST_FAILURE;
 *  5. a failed transfer gives LINK_FAILURE;
 *  6. otherwise the state is DONE.
 * Then it releases all surf actions and, if simcalls are waiting, answers
 * them through SIMIX_comm_finish(). */
924 void SIMIX_post_comm(smx_synchro_t synchro)
926 /* Update synchro state */
927 if (synchro->comm.src_timeout &&
928 surf_action_get_state(synchro->comm.src_timeout) == SURF_ACTION_DONE)
929 synchro->state = SIMIX_SRC_TIMEOUT;
930 else if (synchro->comm.dst_timeout &&
931 surf_action_get_state(synchro->comm.dst_timeout) == SURF_ACTION_DONE)
932 synchro->state = SIMIX_DST_TIMEOUT;
933 else if (synchro->comm.src_timeout &&
934 surf_action_get_state(synchro->comm.src_timeout) == SURF_ACTION_FAILED)
935 synchro->state = SIMIX_SRC_HOST_FAILURE;
936 else if (synchro->comm.dst_timeout &&
937 surf_action_get_state(synchro->comm.dst_timeout) == SURF_ACTION_FAILED)
938 synchro->state = SIMIX_DST_HOST_FAILURE;
939 else if (synchro->comm.surf_comm &&
940 surf_action_get_state(synchro->comm.surf_comm) == SURF_ACTION_FAILED) {
941 XBT_DEBUG("Puta madre. Surf says that the link broke");
942 synchro->state = SIMIX_LINK_FAILURE;
944 synchro->state = SIMIX_DONE;
946 XBT_DEBUG("SIMIX_post_comm: comm %p, state %d, src_proc %p, dst_proc %p, detached: %d",
947 synchro, (int)synchro->state, synchro->comm.src_proc, synchro->comm.dst_proc, synchro->comm.detached);
949 /* destroy the surf actions associated with the Simix communication */
950 SIMIX_comm_destroy_internal_actions(synchro);
952 /* if there are simcalls associated with the synchro, then answer them */
953 if (xbt_fifo_size(synchro->simcalls)) {
954 SIMIX_comm_finish(synchro);
/* Simcall handler for comm_cancel: forwards to SIMIX_comm_cancel().
 * The `simcall` argument is unused. */
958 void simcall_HANDLER_comm_cancel(smx_simcall_t simcall, smx_synchro_t synchro){
959 SIMIX_comm_cancel(synchro);
/* Cancels a communication.
 *  - A WAITING synchro is still queued in its rendez-vous point: it is
 *    unlinked there and marked SIMIX_CANCELED directly. This assumes
 *    comm.rdv is set, since SIMIX_rdv_remove() dereferences it.
 *  - A READY or RUNNING synchro has its surf transfer canceled (not under
 *    the model checker). The resulting state is left to surf's completion
 *    path. */
961 void SIMIX_comm_cancel(smx_synchro_t synchro)
963 /* if the synchro is a waiting state means that it is still in a rdv */
964 /* so remove from it and delete it */
965 if (synchro->state == SIMIX_WAITING) {
966 SIMIX_rdv_remove(synchro->comm.rdv, synchro);
967 synchro->state = SIMIX_CANCELED;
969 else if (!MC_is_active() /* when running the MC there are no surf actions */
970 && (synchro->state == SIMIX_READY || synchro->state == SIMIX_RUNNING)) {
972 surf_action_cancel(synchro->comm.surf_comm);
976 void SIMIX_comm_suspend(smx_synchro_t synchro)
978 /*FIXME: shall we suspend also the timeout synchro? */
979 if (synchro->comm.surf_comm)
980 surf_action_suspend(synchro->comm.surf_comm);
981 /* in the other case, the action will be suspended on creation, in SIMIX_comm_start() */
984 void SIMIX_comm_resume(smx_synchro_t synchro)
986 /*FIXME: check what happen with the timeouts */
987 if (synchro->comm.surf_comm)
988 surf_action_resume(synchro->comm.surf_comm);
989 /* in the other case, the synchro were not really suspended yet, see SIMIX_comm_suspend() and SIMIX_comm_start() */
993 /************* synchro Getters **************/
995 double simcall_HANDLER_comm_get_remains(smx_simcall_t simcall, smx_synchro_t synchro){
996 return SIMIX_comm_get_remains(synchro);
/* SIMIX_comm_get_remains(): switches on synchro->state.
 * - One branch stores surf_action_get_remains(synchro->comm.surf_comm) in `remains`.
 * - The two other visible branches store 0; both are marked FIXME.
 * NOTE(review): this listing elides the declaration of `remains`, the case
 * labels, the breaks and the final return (original lines 1003-1009 and the
 * gaps between the statements shown below). Confirm which states reach which
 * branch against the full source. */
999 * \brief get the amount remaining from the communication
1000 * \param synchro The communication
1002 double SIMIX_comm_get_remains(smx_synchro_t synchro)
1010 switch (synchro->state) {
/* Only branch that queries surf; presumably the running state -- confirm. */
1013 remains = surf_action_get_remains(synchro->comm.surf_comm);
1018 remains = 0; /*FIXME: check what should be returned */
/* Presumably the default branch (its label is elided) -- confirm. */
1022 remains = 0; /*FIXME: is this correct? */
1028 e_smx_state_t simcall_HANDLER_comm_get_state(smx_simcall_t simcall, smx_synchro_t synchro){
1029 return SIMIX_comm_get_state(synchro);
1031 e_smx_state_t SIMIX_comm_get_state(smx_synchro_t synchro)
1033 return synchro->state;
1036 void* simcall_HANDLER_comm_get_src_data(smx_simcall_t simcall, smx_synchro_t synchro){
1037 return SIMIX_comm_get_src_data(synchro);
1040 * \brief Return the user data associated to the sender of the communication
1041 * \param synchro The communication
1042 * \return the user data
1044 void* SIMIX_comm_get_src_data(smx_synchro_t synchro)
1046 return synchro->comm.src_data;
1049 void* simcall_HANDLER_comm_get_dst_data(smx_simcall_t simcall, smx_synchro_t synchro){
1050 return SIMIX_comm_get_dst_data(synchro);
1053 * \brief Return the user data associated to the receiver of the communication
1054 * \param synchro The communication
1055 * \return the user data
1057 void* SIMIX_comm_get_dst_data(smx_synchro_t synchro)
1059 return synchro->comm.dst_data;
1062 smx_process_t simcall_HANDLER_comm_get_src_proc(smx_simcall_t simcall, smx_synchro_t synchro){
1063 return SIMIX_comm_get_src_proc(synchro);
1065 smx_process_t SIMIX_comm_get_src_proc(smx_synchro_t synchro)
1067 return synchro->comm.src_proc;
1070 smx_process_t simcall_HANDLER_comm_get_dst_proc(smx_simcall_t simcall, smx_synchro_t synchro){
1071 return SIMIX_comm_get_dst_proc(synchro);
1073 smx_process_t SIMIX_comm_get_dst_proc(smx_synchro_t synchro)
1075 return synchro->comm.dst_proc;
1078 #ifdef HAVE_LATENCY_BOUND_TRACKING
/* Simcall handler: forwards to SIMIX_comm_is_latency_bounded(); `simcall` is not used. */
1079 int simcall_HANDLER_comm_is_latency_bounded(smx_simcall_t simcall, smx_synchro_t synchro)
1081 return SIMIX_comm_is_latency_bounded(synchro);
1085 * \brief verify if communication is latency bounded
1086 * \param comm The communication
/* Note: the parameter is actually named `synchro`, not `comm`.
 * Returns synchro->latency_limited. When the comm has a surf action, that
 * field is first refreshed from surf_network_action_get_latency_limited().
 * Otherwise whatever value the field already holds is returned as-is.
 * NOTE(review): original lines 1089-1092 are elided from this listing, as is
 * the #endif closing HAVE_LATENCY_BOUND_TRACKING. Confirm both against the
 * full source, e.g. whether a NULL synchro is handled before the check below. */
1088 int SIMIX_comm_is_latency_bounded(smx_synchro_t synchro)
1093 if (synchro->comm.surf_comm){
1094 XBT_DEBUG("Getting latency limited for surf_action (%p)", synchro->comm.surf_comm);
1095 synchro->latency_limited = surf_network_action_get_latency_limited(synchro->comm.surf_comm);
1096 XBT_DEBUG("synchro limited is %d", synchro->latency_limited);
1098 return synchro->latency_limited;
1102 /******************************************************************************/
1103 /* SIMIX_comm_copy_data callbacks */
1104 /******************************************************************************/
1105 static void (*SIMIX_comm_copy_data_callback) (smx_synchro_t, void*, size_t) =
1106 &SIMIX_comm_copy_pointer_callback;
1109 SIMIX_comm_set_copy_data_callback(void (*callback) (smx_synchro_t, void*, size_t))
1111 SIMIX_comm_copy_data_callback = callback;
1114 void SIMIX_comm_copy_pointer_callback(smx_synchro_t comm, void* buff, size_t buff_size)
1116 xbt_assert((buff_size == sizeof(void *)),
1117 "Cannot copy %zu bytes: must be sizeof(void*)", buff_size);
1118 *(void **) (comm->comm.dst_buff) = buff;
/* Copy callback that duplicates bytes: memcpy()s buff_size bytes from `buff`
 * into comm->comm.dst_buff. The two regions must not overlap.
 * For a detached comm it then clears comm->comm.src_buff.
 * NOTE(review): original line 1126 is elided from this listing. Given the
 * comment about the SMPI-duplicated source buffer, it presumably releases
 * `buff` -- confirm. Otherwise that duplicate leaks once src_buff is cleared. */
1121 void SIMIX_comm_copy_buffer_callback(smx_synchro_t comm, void* buff, size_t buff_size)
1123 XBT_DEBUG("Copy the data over");
1124 memcpy(comm->comm.dst_buff, buff, buff_size);
1125 if (comm->comm.detached) { // if this is a detached send, the source buffer was duplicated by SMPI sender to make the original buffer available to the application ASAP
/* Presumably cleared so the duplicated buffer is not referenced again. */
1127 comm->comm.src_buff = NULL;
1133 * \brief Copy the communication data from the sender's buffer to the receiver's one
1134 * \param comm The communication
/* Skips the copy when either buffer is missing or the data was already copied.
 * Otherwise it:
 *   - clamps the byte count to *dst_buff_size when the receiver supplied a
 *     size pointer, and writes the clamped count back through that pointer;
 *   - passes the source buffer to the copy function;
 *   - marks the comm as copied, so a call from the other end does not copy
 *     again. */
1136 void SIMIX_comm_copy_data(smx_synchro_t comm)
1138 size_t buff_size = comm->comm.src_buff_size;
1139 /* If there is no data to be copy then return */
1140 if (!comm->comm.src_buff || !comm->comm.dst_buff || comm->comm.copied)
/* NOTE(review): the statement guarded above (original line 1141) is elided
 * from this listing; the comment on line 1139 says it returns. */
1143 XBT_DEBUG("Copying comm %p data from %s (%p) -> %s (%p) (%zu bytes)",
/* Original line 1144 (elided) presumably supplies the argument for the first %p. */
1145 comm->comm.src_proc ? sg_host_name(comm->comm.src_proc->smx_host) : "a finished process",
1146 comm->comm.src_buff,
1147 comm->comm.dst_proc ? sg_host_name(comm->comm.dst_proc->smx_host) : "a finished process",
1148 comm->comm.dst_buff, buff_size);
1150 /* Copy at most dst_buff_size bytes of the message to receiver's buffer */
1151 if (comm->comm.dst_buff_size)
1152 buff_size = MIN(buff_size, *(comm->comm.dst_buff_size));
1154 /* Update the receiver's buffer size to the copied amount */
1155 if (comm->comm.dst_buff_size)
1156 *comm->comm.dst_buff_size = buff_size;
/* NOTE(review): original lines 1157-1158 are elided. Confirm whether they
 * guard this dispatch (e.g. on buff_size). Note that
 * SIMIX_comm_copy_pointer_callback() asserts buff_size == sizeof(void *). */
/* A per-comm copy_data_fun is used when set. The elided line 1161 presumably
 * holds the `else` that makes the global SIMIX_comm_copy_data_callback the
 * fallback -- confirm. */
1159 if(comm->comm.copy_data_fun)
1160 comm->comm.copy_data_fun (comm, comm->comm.src_buff, buff_size);
1162 SIMIX_comm_copy_data_callback (comm, comm->comm.src_buff, buff_size);
1166 /* Set the copied flag so we copy data only once */
1167 /* (this function might be called from both communication ends) */
1168 comm->comm.copied = 1;