1 /* Copyright (c) 2009-2013. The SimGrid Team.
2 * All rights reserved. */
4 /* This program is free software; you can redistribute it and/or modify it
5 * under the terms of the license (GNU LGPL) which comes with this package. */
7 #include "smx_private.h"
/* Log category for this file (a subcategory of "simix"). */
12 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(simix_network, simix,
13 "Logging specific to SIMIX (network)");
/* Dictionary of the named rendez-vous points, keyed by name. Created by
 * SIMIX_network_init with SIMIX_rdv_free as value destructor, freed by
 * SIMIX_network_exit. */
15 static xbt_dict_t rdv_points = NULL;
/* Global communication counter. It is decremented whenever a freshly created
 * action turns out to be useless ("pure waste").
 * NOTE(review): the matching increment is not visible in this listing —
 * presumably in SIMIX_comm_new; confirm. */
16 XBT_EXPORT_NO_IMPORT(unsigned long int) smx_total_comms = 0;
/* Forward declarations of file-local helpers. */
18 static void SIMIX_waitany_remove_simcall_from_actions(smx_simcall_t simcall);
19 static void SIMIX_comm_copy_data(smx_action_t comm);
20 static smx_action_t SIMIX_comm_new(e_smx_comm_type_t type);
21 static XBT_INLINE void SIMIX_rdv_push(smx_rdv_t rdv, smx_action_t comm);
22 static smx_action_t SIMIX_fifo_probe_comm(xbt_fifo_t fifo, e_smx_comm_type_t type,
23 int (*match_fun)(void *, void *,smx_action_t),
24 void *user_data, smx_action_t my_action);
25 static smx_action_t SIMIX_fifo_get_comm(xbt_fifo_t fifo, e_smx_comm_type_t type,
26 int (*match_fun)(void *, void *,smx_action_t),
27 void *user_data, smx_action_t my_action);
28 static void SIMIX_rdv_free(void *data);
29 static void SIMIX_comm_start(smx_action_t action);
31 void SIMIX_network_init(void)
33 rdv_points = xbt_dict_new_homogeneous(SIMIX_rdv_free);
36 void SIMIX_network_exit(void)
38 xbt_dict_free(&rdv_points);
41 /******************************************************************************/
42 /* Rendez-Vous Points */
43 /******************************************************************************/
45 smx_rdv_t SIMIX_pre_rdv_create(smx_simcall_t simcall, const char *name){
46 return SIMIX_rdv_create(name);
48 smx_rdv_t SIMIX_rdv_create(const char *name)
50 /* two processes may have pushed the same rdv_create simcall at the same time */
51 smx_rdv_t rdv = name ? xbt_dict_get_or_null(rdv_points, name) : NULL;
54 rdv = xbt_new0(s_smx_rvpoint_t, 1);
55 rdv->name = name ? xbt_strdup(name) : NULL;
56 rdv->comm_fifo = xbt_fifo_new();
57 rdv->done_comm_fifo = xbt_fifo_new();
58 rdv->permanent_receiver=NULL;
60 XBT_DEBUG("Creating a mailbox at %p with name %s\n", rdv, name);
63 xbt_dict_set(rdv_points, rdv->name, rdv, NULL);
68 void SIMIX_pre_rdv_destroy(smx_simcall_t simcall, smx_rdv_t rdv){
69 return SIMIX_rdv_destroy(rdv);
71 void SIMIX_rdv_destroy(smx_rdv_t rdv)
74 xbt_dict_remove(rdv_points, rdv->name);
77 void SIMIX_rdv_free(void *data)
79 XBT_DEBUG("rdv free %p", data);
80 smx_rdv_t rdv = (smx_rdv_t) data;
82 xbt_fifo_free(rdv->comm_fifo);
83 xbt_fifo_free(rdv->done_comm_fifo);
88 xbt_dict_t SIMIX_get_rdv_points()
93 smx_rdv_t SIMIX_pre_rdv_get_by_name(smx_simcall_t simcall, const char *name){
94 return SIMIX_rdv_get_by_name(name);
96 smx_rdv_t SIMIX_rdv_get_by_name(const char *name)
98 return xbt_dict_get_or_null(rdv_points, name);
101 int SIMIX_pre_rdv_comm_count_by_host(smx_simcall_t simcall, smx_rdv_t rdv, smx_host_t host){
102 return SIMIX_rdv_comm_count_by_host(rdv, host);
104 int SIMIX_rdv_comm_count_by_host(smx_rdv_t rdv, smx_host_t host)
106 smx_action_t comm = NULL;
107 xbt_fifo_item_t item = NULL;
110 xbt_fifo_foreach(rdv->comm_fifo, item, comm, smx_action_t) {
111 if (comm->comm.src_proc->smx_host == host)
118 smx_action_t SIMIX_pre_rdv_get_head(smx_simcall_t simcall, smx_rdv_t rdv){
119 return SIMIX_rdv_get_head(rdv);
121 smx_action_t SIMIX_rdv_get_head(smx_rdv_t rdv)
123 return xbt_fifo_get_item_content(xbt_fifo_get_first_item(rdv->comm_fifo));
126 smx_process_t SIMIX_pre_rdv_get_receiver(smx_simcall_t simcall, smx_rdv_t rdv){
127 return SIMIX_rdv_get_receiver(rdv);
130 * \brief get the receiver (process associated to the mailbox)
131 * \param rdv The rendez-vous point
132 * \return process The receiving process (NULL if not set)
134 smx_process_t SIMIX_rdv_get_receiver(smx_rdv_t rdv)
136 return rdv->permanent_receiver;
139 void SIMIX_pre_rdv_set_receiver(smx_simcall_t simcall, smx_rdv_t rdv,
140 smx_process_t process){
141 SIMIX_rdv_set_receiver(rdv, process);
144 * \brief set the receiver of the rendez vous point to allow eager sends
145 * \param rdv The rendez-vous point
146 * \param process The receiving process
148 void SIMIX_rdv_set_receiver(smx_rdv_t rdv, smx_process_t process)
150 rdv->permanent_receiver=process;
154 * \brief Pushes a communication action into a rendez-vous point
155 * \param rdv The rendez-vous point
156 * \param comm The communication action
158 static XBT_INLINE void SIMIX_rdv_push(smx_rdv_t rdv, smx_action_t comm)
160 xbt_fifo_push(rdv->comm_fifo, comm);
161 comm->comm.rdv = rdv;
165 * \brief Removes a communication action from a rendez-vous point
166 * \param rdv The rendez-vous point
167 * \param comm The communication action
169 XBT_INLINE void SIMIX_rdv_remove(smx_rdv_t rdv, smx_action_t comm)
171 xbt_fifo_remove(rdv->comm_fifo, comm);
172 comm->comm.rdv = NULL;
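/**
 * \brief Removes and returns the first communication action queued in a fifo that matches our needs
 * \param fifo The queue to search
 * \param type The type of communication we are looking for (SIMIX_COMM_SEND or SIMIX_COMM_RECEIVE)
 * \param match_fun Our filter on the other side's user data (NULL accepts anything)
 * \param this_user_data Our user data, handed to the other side's filter
 * \param my_action An action describing us, handed to the other side's filter
 * \return The communication action if found, NULL otherwise
 */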
/* Scans `fifo` in order and dequeues the first action of type `type` that both
 * sides accept. Our match_fun is applied to the peer's user data. The peer's
 * own comm.match_fun is applied to ours, with `my_action` describing us.
 * On a match, the action's refcount is incremented and its comm.rdv cleared.
 * NOTE(review): several lines of this function are missing from this listing
 * (declaration of `action`, closing braces, preprocessor lines, the return
 * statements), so only the visible statements are described here. */
180 smx_action_t SIMIX_fifo_get_comm(xbt_fifo_t fifo, e_smx_comm_type_t type,
181 int (*match_fun)(void *, void *,smx_action_t),
182 void *this_user_data, smx_action_t my_action)
185 xbt_fifo_item_t item;
186 void* other_user_data = NULL;
188 xbt_fifo_foreach(fifo, item, action, smx_action_t) {
/* A send carries its user data in src_data, a receive in dst_data. */
189 if (action->comm.type == SIMIX_COMM_SEND) {
190 other_user_data = action->comm.src_data;
191 } else if (action->comm.type == SIMIX_COMM_RECEIVE) {
192 other_user_data = action->comm.dst_data;
/* Type must match; a NULL filter on either side accepts everything. */
194 if (action->comm.type == type &&
195 (!match_fun || match_fun(this_user_data, other_user_data, action)) &&
196 (!action->comm.match_fun || action->comm.match_fun(other_user_data, this_user_data, my_action))) {
197 XBT_DEBUG("Found a matching communication action %p", action);
/* Unlink the item from the fifo and release the item (not the action). */
198 xbt_fifo_remove_item(fifo, item);
199 xbt_fifo_free_item(item);
200 action->comm.refcount++;
/* Keep a copy of the mailbox, then mark the action as no longer queued. */
202 action->comm.rdv_cpy = action->comm.rdv;
204 action->comm.rdv = NULL;
207 XBT_DEBUG("Sorry, communication action %p does not match our needs:"
208 " its type is %d but we are looking for a comm of type %d (or maybe the filtering didn't match)",
209 action, (int)action->comm.type, (int)type);
211 XBT_DEBUG("No matching communication action found");
/**
 * \brief Checks whether a communication action matching our needs is queued in a fifo, leaving it there
 * \param type The type of communication we are looking for (SIMIX_COMM_SEND or SIMIX_COMM_RECEIVE)
 * \return The communication action if found, NULL otherwise
 */
/* Same matching rules as SIMIX_fifo_get_comm (type check plus both sides'
 * filters), but the matching action is left in `fifo`. Only its refcount is
 * incremented.
 * TODO(review): the matching loop duplicates SIMIX_fifo_get_comm and could be
 * shared. NOTE(review): lines are missing from this listing (declaration of
 * `action`, braces, return statements). */
221 smx_action_t SIMIX_fifo_probe_comm(xbt_fifo_t fifo, e_smx_comm_type_t type,
222 int (*match_fun)(void *, void *,smx_action_t),
223 void *this_user_data, smx_action_t my_action)
226 xbt_fifo_item_t item;
227 void* other_user_data = NULL;
229 xbt_fifo_foreach(fifo, item, action, smx_action_t) {
/* A send carries its user data in src_data, a receive in dst_data. */
230 if (action->comm.type == SIMIX_COMM_SEND) {
231 other_user_data = action->comm.src_data;
232 } else if (action->comm.type == SIMIX_COMM_RECEIVE) {
233 other_user_data = action->comm.dst_data;
235 if (action->comm.type == type &&
236 (!match_fun || match_fun(this_user_data, other_user_data, action)) &&
237 (!action->comm.match_fun || action->comm.match_fun(other_user_data, this_user_data, my_action))) {
238 XBT_DEBUG("Found a matching communication action %p", action);
/* The action stays queued; only a reference is taken. */
239 action->comm.refcount++;
243 XBT_DEBUG("Sorry, communication action %p does not match our needs:"
244 " its type is %d but we are looking for a comm of type %d (or maybe the filtering didn't match)",
245 action, (int)action->comm.type, (int)type);
247 XBT_DEBUG("No matching communication action found");
250 /******************************************************************************/
251 /* Communication Actions */
252 /******************************************************************************/
/**
 * \brief Creates a new communication action
 * \param type The direction of communication (SIMIX_COMM_SEND or SIMIX_COMM_RECEIVE)
 * \return The new communication action
 */
/* Takes an action object from the global action mallocator and initialises it
 * as a communication of the given direction: state SIMIX_WAITING, refcount 1,
 * no user data on either side.
 * NOTE(review): fields not assigned here (processes, buffers, surf actions,
 * detached flag, ...) keep whatever the mallocator returns — presumably a
 * zeroed object; confirm. Lines are missing from this listing (local
 * declaration, preprocessor lines, the return statement). */
259 smx_action_t SIMIX_comm_new(e_smx_comm_type_t type)
263 /* alloc structures */
264 act = xbt_mallocator_get(simix_global->action_mallocator);
266 act->type = SIMIX_ACTION_COMMUNICATE;
267 act->state = SIMIX_WAITING;
269 /* set communication */
270 act->comm.type = type;
/* A single SIMIX_comm_destroy() call releases a freshly created action. */
271 act->comm.refcount = 1;
272 act->comm.src_data=NULL;
273 act->comm.dst_data=NULL;
276 #ifdef HAVE_LATENCY_BOUND_TRACKING
277 //initialize with unknown value
278 act->latency_limited = -1;
282 act->category = NULL;
285 XBT_DEBUG("Create communicate action %p", act);
291 void SIMIX_pre_comm_destroy(smx_simcall_t simcall, smx_action_t action){
292 SIMIX_comm_destroy(action);
/**
 * \brief Destroys a communication action (drops one reference; frees it when none remain)
 * \param action The communication action to be destroyed
 */
/* Drops one reference to a communication action. It dies if the refcount is
 * already 0, which means a double destroy. Once the action is really freed, it
 * releases the action's name and surf actions. For a detached communication
 * that did not end in SIMIX_DONE, it frees the source buffer with clean_fun.
 * It then unqueues the action from its mailbox and returns it to the
 * mallocator.
 * NOTE(review): lines are missing from this listing (braces, the statement
 * guarded by `refcount > 0`, #endif, and presumably a guard on comm.rdv before
 * SIMIX_rdv_remove) — confirm against the full source. */
298 void SIMIX_comm_destroy(smx_action_t action)
300 XBT_DEBUG("Destroy action %p (refcount: %d), state: %d",
301 action, action->comm.refcount, (int)action->state);
303 if (action->comm.refcount <= 0) {
304 xbt_backtrace_display_current();
305 xbt_die("The refcount of comm %p is already 0 before decreasing it. "
306 "That's a bug! If you didn't test and/or wait the same communication twice in your code, then the bug is SimGrid's...", action);
308 action->comm.refcount--;
/* NOTE(review): presumably an early return while other references remain. */
309 if (action->comm.refcount > 0)
311 XBT_DEBUG("Really free communication %p; refcount is now %d", action,
312 action->comm.refcount);
314 #ifdef HAVE_LATENCY_BOUND_TRACKING
315 action->latency_limited = SIMIX_comm_is_latency_bounded( action ) ;
318 xbt_free(action->name);
319 SIMIX_comm_destroy_internal_actions(action);
321 if (action->comm.detached && action->state != SIMIX_DONE) {
322 /* the communication has failed and was detached:
323 * we have to free the buffer */
324 if (action->comm.clean_fun) {
325 action->comm.clean_fun(action->comm.src_buff);
327 action->comm.src_buff = NULL;
/* Unqueue the action from the mailbox it still sits in. */
331 SIMIX_rdv_remove(action->comm.rdv, action);
/* Give the memory back to the mallocator for reuse. */
333 xbt_mallocator_release(simix_global->action_mallocator, action);
/* Releases the surf actions attached to a SIMIX communication: the transfer
 * itself (surf_comm) and the sleep actions used as source/destination
 * timeouts. Each pointer is reset to NULL after being unreferenced, so calling
 * this function again is harmless.
 * NOTE(review): braces and the #endif are missing from this listing. */
336 void SIMIX_comm_destroy_internal_actions(smx_action_t action)
338 if (action->comm.surf_comm){
339 #ifdef HAVE_LATENCY_BOUND_TRACKING
/* Record whether the transfer was latency bound before dropping it. */
340 action->latency_limited = SIMIX_comm_is_latency_bounded(action);
342 action->comm.surf_comm->model_type->action_unref(action->comm.surf_comm);
343 action->comm.surf_comm = NULL;
346 if (action->comm.src_timeout){
347 action->comm.src_timeout->model_type->action_unref(action->comm.src_timeout);
348 action->comm.src_timeout = NULL;
351 if (action->comm.dst_timeout){
352 action->comm.dst_timeout->model_type->action_unref(action->comm.dst_timeout);
353 action->comm.dst_timeout = NULL;
/* Simcall handler for a blocking send: posts a SIMIX_comm_isend with no
 * clean_fun on behalf of the issuer, then waits for the resulting action with
 * the given timeout.
 * NOTE(review): the last arguments of the SIMIX_comm_isend call (presumably
 * `data` and the detached flag) and the closing brace are missing from this
 * listing. */
357 void SIMIX_pre_comm_send(smx_simcall_t simcall, smx_rdv_t rdv,
358 double task_size, double rate,
359 void *src_buff, size_t src_buff_size,
360 int (*match_fun)(void *, void *,smx_action_t),
361 void *data, double timeout){
362 smx_action_t comm = SIMIX_comm_isend(simcall->issuer, rdv, task_size, rate,
363 src_buff, src_buff_size, match_fun, NULL,
365 simcall->mc_value = 0;
366 SIMIX_pre_comm_wait(simcall, comm, timeout);
368 smx_action_t SIMIX_pre_comm_isend(smx_simcall_t simcall, smx_rdv_t rdv,
369 double task_size, double rate,
370 void *src_buff, size_t src_buff_size,
371 int (*match_fun)(void *, void *,smx_action_t),
372 void (*clean_fun)(void *),
373 void *data, int detached){
374 return SIMIX_comm_isend(simcall->issuer, rdv, task_size, rate, src_buff,
375 src_buff_size, match_fun, clean_fun, data, detached);
/* Posts an asynchronous send from `src_proc` into `rdv`.
 * - If a matching receive is queued, our freshly created description action is
 *   destroyed and the receive action becomes the communication (READY).
 * - Otherwise our own action becomes the communication. When the mailbox has a
 *   permanent receiver, the action is made READY at once, addressed to that
 *   receiver and stored in done_comm_fifo. Otherwise it is queued in the mailbox.
 * The action is then filled with the sender-side fields, recorded in
 * src_proc->comms and started. It returns NULL for detached sends and the
 * action otherwise.
 * NOTE(review): lines are missing from this listing: the remaining parameters
 * (presumably `void *data, int detached`), the opening brace, the if/else lines
 * selecting the branches described above, the `if (detached)` line, and the
 * tail of the MC branch. */
378 smx_action_t SIMIX_comm_isend(smx_process_t src_proc, smx_rdv_t rdv,
379 double task_size, double rate,
380 void *src_buff, size_t src_buff_size,
381 int (*match_fun)(void *, void *,smx_action_t),
382 void (*clean_fun)(void *), // used to free the action in case of problem after a detached send
386 XBT_DEBUG("send from %p\n", rdv);
388 /* Prepare an action describing us, so that it gets passed to the user-provided filter of other side */
389 smx_action_t this_action = SIMIX_comm_new(SIMIX_COMM_SEND);
391 /* Look for communication action matching our needs. We also provide a description of
392 * ourself so that the other side also gets a chance of choosing if it wants to match with us.
394 * If it is not found then push our communication into the rendez-vous point */
395 smx_action_t other_action = SIMIX_fifo_get_comm(rdv->comm_fifo, SIMIX_COMM_RECEIVE, match_fun, data, this_action);
/* (Presumably no queued receive matched:) our own action becomes the communication. */
398 other_action = this_action;
400 if (rdv->permanent_receiver!=NULL){
401 //this mailbox is for small messages, which have to be sent right now
402 other_action->state = SIMIX_READY;
403 other_action->comm.dst_proc=rdv->permanent_receiver;
/* Extra reference held by the done_comm_fifo entry. */
404 other_action->comm.refcount++;
405 xbt_fifo_push(rdv->done_comm_fifo,other_action);
406 other_action->comm.rdv=rdv;
407 XBT_DEBUG("pushing a message into the permanent receive fifo %p, comm %p \n", rdv, &(other_action->comm));
410 SIMIX_rdv_push(rdv, this_action);
/* (Presumably a queued receive matched:) discard our description and use theirs. */
413 XBT_DEBUG("Receive already pushed\n");
415 SIMIX_comm_destroy(this_action);
416 --smx_total_comms; // this creation was a pure waste
418 other_action->state = SIMIX_READY;
419 other_action->comm.type = SIMIX_COMM_READY;
422 xbt_fifo_push(src_proc->comms, other_action);
424 /* if the communication action is detached then decrease the refcount
425 * by one, so it will be eliminated by the receiver's destroy call */
427 other_action->comm.detached = 1;
428 other_action->comm.refcount--;
429 other_action->comm.clean_fun = clean_fun;
431 other_action->comm.clean_fun = NULL;
434 /* Setup the communication action */
435 other_action->comm.src_proc = src_proc;
436 other_action->comm.task_size = task_size;
437 other_action->comm.rate = rate;
438 other_action->comm.src_buff = src_buff;
439 other_action->comm.src_buff_size = src_buff_size;
440 other_action->comm.src_data = data;
442 other_action->comm.match_fun = match_fun;
/* Under model checking the action is marked RUNNING here.
 * NOTE(review): the rest of this branch is missing from this listing. */
444 if (MC_is_active()) {
445 other_action->state = SIMIX_RUNNING;
449 SIMIX_comm_start(other_action);
450 return (detached ? NULL : other_action);
453 void SIMIX_pre_comm_recv(smx_simcall_t simcall, smx_rdv_t rdv,
454 void *dst_buff, size_t *dst_buff_size,
455 int (*match_fun)(void *, void *, smx_action_t),
456 void *data, double timeout){
457 smx_action_t comm = SIMIX_comm_irecv(simcall->issuer, rdv, dst_buff,
458 dst_buff_size, match_fun, data);
459 simcall->mc_value = 0;
460 SIMIX_pre_comm_wait(simcall, comm, timeout);
463 void SIMIX_pre_comm_recv_bounded(smx_simcall_t simcall, smx_rdv_t rdv,
464 void *dst_buff, size_t *dst_buff_size,
465 int (*match_fun)(void *, void *, smx_action_t),
466 void *data, double timeout, double rate){
467 smx_action_t comm = SIMIX_comm_irecv_bounded(simcall->issuer, rdv, dst_buff,
468 dst_buff_size, match_fun, data, rate);
469 simcall->mc_value = 0;
470 SIMIX_pre_comm_wait(simcall, comm, timeout);
/* Simcall handler for an asynchronous receive: forwards to SIMIX_comm_irecv
 * with the simcall issuer as the receiving process.
 * NOTE(review): the last parameter line (presumably `void *data){`), the end of
 * the forwarded argument list and the closing brace are missing from this
 * listing. */
473 smx_action_t SIMIX_pre_comm_irecv(smx_simcall_t simcall, smx_rdv_t rdv,
474 void *dst_buff, size_t *dst_buff_size,
475 int (*match_fun)(void *, void *, smx_action_t),
477 return SIMIX_comm_irecv(simcall->issuer, rdv, dst_buff, dst_buff_size,
/* Posts an asynchronous receive for `dst_proc` on `rdv`.
 * - If the mailbox has a permanent receiver and done_comm_fifo is not empty, a
 *   matching eager send is looked for there first. If it is found and its
 *   transfer has no remaining work, it is marked SIMIX_DONE.
 * - Otherwise a matching queued send is taken from comm_fifo, or our own
 *   receive action is queued in the mailbox.
 * The resulting action is pushed onto dst_proc->comms, filled with the
 * receiver-side fields and started.
 * TODO(review): SIMIX_comm_irecv_bounded is a copy of this function that only
 * adds a rate clamp; the two should share one implementation.
 * NOTE(review): many lines are missing from this listing (opening brace, the
 * if/else lines selecting the branches above, braces, the return statement),
 * so branch boundaries cannot be confirmed from here. */
481 smx_action_t SIMIX_comm_irecv(smx_process_t dst_proc, smx_rdv_t rdv,
482 void *dst_buff, size_t *dst_buff_size,
483 int (*match_fun)(void *, void *, smx_action_t), void *data)
485 XBT_DEBUG("recv from %p %p\n", rdv, rdv->comm_fifo);
486 smx_action_t this_action = SIMIX_comm_new(SIMIX_COMM_RECEIVE);
488 smx_action_t other_action;
489 //communication already done, get it inside the fifo of completed comms
490 //permanent receive v1
491 //int already_received=0;
492 if(rdv->permanent_receiver && xbt_fifo_size(rdv->done_comm_fifo)!=0){
494 XBT_DEBUG("We have a comm that has probably already been received, trying to match it, to skip the communication\n");
495 //find a match in the already received fifo
496 other_action = SIMIX_fifo_get_comm(rdv->done_comm_fifo, SIMIX_COMM_SEND, match_fun, data, this_action);
497 //if not found, assume the receiver came first, register it to the mailbox in the classical way
499 XBT_DEBUG("We have messages in the permanent receive list, but not the one we are looking for, pushing request into fifo\n");
500 other_action = this_action;
501 SIMIX_rdv_push(rdv, this_action);
/* An eager send whose surf transfer has no remaining work is already complete. */
503 if(other_action->comm.surf_comm && SIMIX_comm_get_remains(other_action)==0.0)
505 XBT_DEBUG("comm %p has been already sent, and is finished, destroy it\n",&(other_action->comm));
506 other_action->state = SIMIX_DONE;
507 other_action->comm.type = SIMIX_COMM_DONE;
508 other_action->comm.rdv = NULL;
509 //SIMIX_comm_destroy(this_action);
510 //--smx_total_comms; // this creation was a pure waste
511 //already_received=1;
512 //other_action->comm.refcount--;
514 XBT_DEBUG("Not yet finished, we have to wait %d\n", xbt_fifo_size(rdv->comm_fifo));
/* Drop the reference held by the done_comm_fifo entry (taken in SIMIX_comm_isend). */
516 other_action->comm.refcount--;
517 SIMIX_comm_destroy(this_action);
518 --smx_total_comms; // this creation was a pure waste
521 /* Prepare an action describing us, so that it gets passed to the user-provided filter of other side */
523 /* Look for communication action matching our needs. We also provide a description of
524 * ourself so that the other side also gets a chance of choosing if it wants to match with us.
526 * If it is not found then push our communication into the rendez-vous point */
527 other_action = SIMIX_fifo_get_comm(rdv->comm_fifo, SIMIX_COMM_SEND, match_fun, data, this_action);
530 XBT_DEBUG("Receive pushed first %d\n", xbt_fifo_size(rdv->comm_fifo));
531 other_action = this_action;
532 SIMIX_rdv_push(rdv, this_action);
534 SIMIX_comm_destroy(this_action);
535 --smx_total_comms; // this creation was a pure waste
536 other_action->state = SIMIX_READY;
537 other_action->comm.type = SIMIX_COMM_READY;
538 //other_action->comm.refcount--;
540 xbt_fifo_push(dst_proc->comms, other_action);
543 /* Setup communication action */
544 other_action->comm.dst_proc = dst_proc;
545 other_action->comm.dst_buff = dst_buff;
546 other_action->comm.dst_buff_size = dst_buff_size;
547 other_action->comm.dst_data = data;
549 other_action->comm.match_fun = match_fun;
552 /*if(already_received)//do the actual copy, because the first one after the comm didn't have all the info
553 SIMIX_comm_copy_data(other_action);*/
/* NOTE(review): the rest of the MC branch is missing from this listing. */
556 if (MC_is_active()) {
557 other_action->state = SIMIX_RUNNING;
561 SIMIX_comm_start(other_action);
566 smx_action_t SIMIX_pre_comm_irecv_bounded(smx_simcall_t simcall, smx_rdv_t rdv,
567 void *dst_buff, size_t *dst_buff_size,
568 int (*match_fun)(void *, void *, smx_action_t),
569 void *data, double rate){
570 return SIMIX_comm_irecv_bounded(simcall->issuer, rdv, dst_buff, dst_buff_size,
571 match_fun, data, rate);
/* Same as SIMIX_comm_irecv, but it also caps the transfer rate: `rate` replaces
 * the action's rate when it is lower, or when the action's rate is -1.0.
 * TODO(review): this body duplicates SIMIX_comm_irecv line for line apart
 * from that clamp; the two should share one implementation.
 * NOTE(review): many lines are missing from this listing (opening brace,
 * if/else lines, braces, the return statement). */
574 smx_action_t SIMIX_comm_irecv_bounded(smx_process_t dst_proc, smx_rdv_t rdv,
575 void *dst_buff, size_t *dst_buff_size,
576 int (*match_fun)(void *, void *, smx_action_t), void *data, double rate)
578 XBT_DEBUG("recv from %p %p\n", rdv, rdv->comm_fifo);
579 smx_action_t this_action = SIMIX_comm_new(SIMIX_COMM_RECEIVE);
581 smx_action_t other_action;
582 //communication already done, get it inside the fifo of completed comms
583 //permanent receive v1
584 //int already_received=0;
585 if(rdv->permanent_receiver && xbt_fifo_size(rdv->done_comm_fifo)!=0){
587 XBT_DEBUG("We have a comm that has probably already been received, trying to match it, to skip the communication\n");
588 //find a match in the already received fifo
589 other_action = SIMIX_fifo_get_comm(rdv->done_comm_fifo, SIMIX_COMM_SEND, match_fun, data, this_action);
590 //if not found, assume the receiver came first, register it to the mailbox in the classical way
592 XBT_DEBUG("We have messages in the permanent receive list, but not the one we are looking for, pushing request into fifo\n");
593 other_action = this_action;
594 SIMIX_rdv_push(rdv, this_action);
/* An eager send whose surf transfer has no remaining work is already complete. */
596 if(other_action->comm.surf_comm && SIMIX_comm_get_remains(other_action)==0.0)
598 XBT_DEBUG("comm %p has been already sent, and is finished, destroy it\n",&(other_action->comm));
599 other_action->state = SIMIX_DONE;
600 other_action->comm.type = SIMIX_COMM_DONE;
601 other_action->comm.rdv = NULL;
602 //SIMIX_comm_destroy(this_action);
603 //--smx_total_comms; // this creation was a pure waste
604 //already_received=1;
605 //other_action->comm.refcount--;
607 XBT_DEBUG("Not yet finished, we have to wait %d\n", xbt_fifo_size(rdv->comm_fifo));
/* Drop the reference held by the done_comm_fifo entry (taken in SIMIX_comm_isend). */
609 other_action->comm.refcount--;
610 SIMIX_comm_destroy(this_action);
611 --smx_total_comms; // this creation was a pure waste
614 /* Prepare an action describing us, so that it gets passed to the user-provided filter of other side */
616 /* Look for communication action matching our needs. We also provide a description of
617 * ourself so that the other side also gets a chance of choosing if it wants to match with us.
619 * If it is not found then push our communication into the rendez-vous point */
620 other_action = SIMIX_fifo_get_comm(rdv->comm_fifo, SIMIX_COMM_SEND, match_fun, data, this_action);
623 XBT_DEBUG("Receive pushed first %d\n", xbt_fifo_size(rdv->comm_fifo));
624 other_action = this_action;
625 SIMIX_rdv_push(rdv, this_action);
627 SIMIX_comm_destroy(this_action);
628 --smx_total_comms; // this creation was a pure waste
629 other_action->state = SIMIX_READY;
630 other_action->comm.type = SIMIX_COMM_READY;
631 //other_action->comm.refcount--;
633 xbt_fifo_push(dst_proc->comms, other_action);
636 /* Setup communication action */
637 other_action->comm.dst_proc = dst_proc;
638 other_action->comm.dst_buff = dst_buff;
639 other_action->comm.dst_buff_size = dst_buff_size;
640 other_action->comm.dst_data = data;
/* Keep the lower of the sender's and the receiver's rates; -1.0 means "no rate set". */
642 if (rate < other_action->comm.rate || other_action->comm.rate == -1.0)
643 other_action->comm.rate = rate;
645 other_action->comm.match_fun = match_fun;
648 /*if(already_received)//do the actual copy, because the first one after the comm didn't have all the info
649 SIMIX_comm_copy_data(other_action);*/
/* NOTE(review): the rest of the MC branch is missing from this listing. */
652 if (MC_is_active()) {
653 other_action->state = SIMIX_RUNNING;
657 SIMIX_comm_start(other_action);
/* Simcall handler for a non-blocking probe: forwards to SIMIX_comm_iprobe with
 * the simcall issuer as the probing process.
 * NOTE(review): the parameter lines declaring `src`, `tag` and `data`, and the
 * closing brace, are missing from this listing. */
662 smx_action_t SIMIX_pre_comm_iprobe(smx_simcall_t simcall, smx_rdv_t rdv,
664 int (*match_fun)(void *, void *, smx_action_t),
666 return SIMIX_comm_iprobe(simcall->issuer, rdv, src, tag, match_fun, data);
/* Looks for a queued send matching `match_fun`/`data` without consuming it.
 * The first lookup is in done_comm_fifo, when the mailbox has a permanent
 * receiver and that fifo is not empty. The second is in comm_fifo, presumably
 * only when the first found nothing (the guarding line is missing from this
 * listing). The temporary description action is destroyed before returning.
 * NOTE(review): the reference taken by SIMIX_fifo_probe_comm is dropped
 * immediately, so the returned action is not protected by a reference of its
 * own. `src` and `tag` are not used in the visible lines. */
669 smx_action_t SIMIX_comm_iprobe(smx_process_t dst_proc, smx_rdv_t rdv, int src,
670 int tag, int (*match_fun)(void *, void *, smx_action_t), void *data)
672 XBT_DEBUG("iprobe from %p %p\n", rdv, rdv->comm_fifo);
673 smx_action_t this_action = SIMIX_comm_new(SIMIX_COMM_RECEIVE);
675 smx_action_t other_action=NULL;
676 if(rdv->permanent_receiver && xbt_fifo_size(rdv->done_comm_fifo)!=0){
677 //find a match in the already received fifo
678 XBT_DEBUG("first try in the perm recv mailbox \n");
680 other_action = SIMIX_fifo_probe_comm(rdv->done_comm_fifo, SIMIX_COMM_SEND, match_fun, data, this_action);
684 XBT_DEBUG("second try in the other mailbox");
685 other_action = SIMIX_fifo_probe_comm(rdv->comm_fifo, SIMIX_COMM_SEND, match_fun, data, this_action);
/* Undo the refcount increment performed by SIMIX_fifo_probe_comm. */
688 if(other_action)other_action->comm.refcount--;
690 SIMIX_comm_destroy(this_action);
/* Blocks the issuer on `action`. The simcall is attached to the action and
 * recorded as the issuer's waiting_action.
 * - Under model checking, mc_value selects the outcome: the action is either
 *   marked done or timed out on the issuer's side, then finished at once.
 * - Otherwise an already finished action is finished immediately. Else a surf
 *   sleep action of `timeout` is created on the issuer's host and stored as
 *   the src or dst timeout, depending on which side the issuer is.
 * NOTE(review): lines are missing from this listing (opening brace, the
 * declaration of `sleep`, the MC branch conditions and returns, braces). */
695 void SIMIX_pre_comm_wait(smx_simcall_t simcall, smx_action_t action, double timeout)
697 /* the simcall may be a wait, a send or a recv */
700 /* Associate this simcall to the wait action */
701 XBT_DEBUG("SIMIX_pre_comm_wait, %p", action);
703 xbt_fifo_push(action->simcalls, simcall);
704 simcall->issuer->waiting_action = action;
706 if (MC_is_active()) {
707 int idx = simcall->mc_value;
709 action->state = SIMIX_DONE;
711 /* If we reached this point, the wait simcall must have a timeout */
712 /* Otherwise it shouldn't be enabled and executed by the MC */
716 if (action->comm.src_proc == simcall->issuer)
717 action->state = SIMIX_SRC_TIMEOUT;
719 action->state = SIMIX_DST_TIMEOUT;
722 SIMIX_comm_finish(action);
726 /* If the action has already finish perform the error handling, */
727 /* otherwise set up a waiting timeout on the right side */
728 if (action->state != SIMIX_WAITING && action->state != SIMIX_RUNNING) {
729 SIMIX_comm_finish(action);
730 } else { /* if (timeout >= 0) { we need a surf sleep action even when there is no timeout, otherwise surf won't tell us when the host fails */
731 sleep = surf_workstation_model->extension.workstation.sleep(simcall->issuer->smx_host, timeout);
/* Link the sleep action back to this communication. */
732 surf_workstation_model->action_data_set(sleep, action);
734 if (simcall->issuer == action->comm.src_proc)
735 action->comm.src_timeout = sleep;
737 action->comm.dst_timeout = sleep;
/* Non-blocking completion test. The result is set through the simcall and the
 * simcall is always answered.
 * - First part (presumably the model-checking branch; its condition line is
 *   missing from this listing): the test succeeds when both peers are known,
 *   in which case the action is forced to SIMIX_DONE and finished.
 * - Second part: the test succeeds when the action is neither WAITING nor
 *   RUNNING, in which case it is finished.
 * NOTE(review): braces, returns and the branch conditions are missing here. */
741 void SIMIX_pre_comm_test(smx_simcall_t simcall, smx_action_t action)
744 simcall_comm_test__set__result(simcall, action->comm.src_proc && action->comm.dst_proc);
745 if(simcall_comm_test__get__result(simcall)){
746 action->state = SIMIX_DONE;
747 xbt_fifo_push(action->simcalls, simcall);
748 SIMIX_comm_finish(action);
750 SIMIX_simcall_answer(simcall);
755 simcall_comm_test__set__result(simcall, (action->state != SIMIX_WAITING && action->state != SIMIX_RUNNING));
756 if (simcall_comm_test__get__result(simcall)) {
757 xbt_fifo_push(action->simcalls, simcall);
758 SIMIX_comm_finish(action);
760 SIMIX_simcall_answer(simcall);
/* Non-blocking test over a set of actions. The result starts at -1 (nothing
 * finished).
 * - Under model checking, mc_value gives the index of the action to complete,
 *   or (presumably, for some sentinel value) an immediate answer.
 * - Otherwise the result is the index of a finished action, which is finished
 *   and answered.
 * NOTE(review): declarations, branch conditions, the loop's `break`/`return`
 * and braces are missing from this listing, so whether the first or the last
 * finished action wins cannot be confirmed. */
764 void SIMIX_pre_comm_testany(smx_simcall_t simcall, xbt_dynar_t actions)
768 simcall_comm_testany__set__result(simcall, -1);
771 int idx = simcall->mc_value;
773 SIMIX_simcall_answer(simcall);
775 action = xbt_dynar_get_as(actions, idx, smx_action_t);
776 simcall_comm_testany__set__result(simcall, idx);
777 xbt_fifo_push(action->simcalls, simcall);
778 action->state = SIMIX_DONE;
779 SIMIX_comm_finish(action);
784 xbt_dynar_foreach(simcall_comm_testany__get__comms(simcall), cursor,action) {
785 if (action->state != SIMIX_WAITING && action->state != SIMIX_RUNNING) {
786 simcall_comm_testany__set__result(simcall, cursor);
787 xbt_fifo_push(action->simcalls, simcall);
788 SIMIX_comm_finish(action);
792 SIMIX_simcall_answer(simcall);
/* Blocks the issuer until one action of `actions` finishes.
 * - Under model checking, mc_value gives the index of the action that
 *   completes, and that action is finished at once.
 * - Otherwise the simcall is attached to every action, and the first
 *   already-finished one is finished immediately. SIMIX_comm_finish then
 *   detaches the simcall from the other actions.
 * NOTE(review): declarations, the MC branch condition/return, the loop exit
 * and braces are missing from this listing. */
795 void SIMIX_pre_comm_waitany(smx_simcall_t simcall, xbt_dynar_t actions)
798 unsigned int cursor = 0;
801 int idx = simcall->mc_value;
802 action = xbt_dynar_get_as(actions, idx, smx_action_t);
803 xbt_fifo_push(action->simcalls, simcall);
804 simcall_comm_waitany__set__result(simcall, idx);
805 action->state = SIMIX_DONE;
806 SIMIX_comm_finish(action);
810 xbt_dynar_foreach(actions, cursor, action){
811 /* associate this simcall to the action */
812 xbt_fifo_push(action->simcalls, simcall);
814 /* see if the action is already finished */
815 if (action->state != SIMIX_WAITING && action->state != SIMIX_RUNNING){
816 SIMIX_comm_finish(action);
822 void SIMIX_waitany_remove_simcall_from_actions(smx_simcall_t simcall)
825 unsigned int cursor = 0;
826 xbt_dynar_t actions = simcall_comm_waitany__get__comms(simcall);
828 xbt_dynar_foreach(actions, cursor, action) {
829 xbt_fifo_remove(action->simcalls, simcall);
/**
 * \brief Starts the simulation of a communication action once both peers are known (state SIMIX_READY).
 * \param action the communication action
 */
/* Does nothing unless the action is SIMIX_READY. Otherwise it creates the surf
 * transfer between the sender's and the receiver's hosts, links it back to
 * the action and marks the action RUNNING.
 * - If surf reports the transfer as already failed, the action becomes
 *   SIMIX_LINK_FAILURE and its surf actions are released.
 * - If either peer is suspended, the surf transfer is suspended.
 * NOTE(review): braces are missing from this listing. If the suspension check
 * runs after a link failure, surf_comm is NULL at that point (reset by
 * SIMIX_comm_destroy_internal_actions); confirm that suspend() cannot receive
 * NULL. */
837 static XBT_INLINE void SIMIX_comm_start(smx_action_t action)
839 /* If both the sender and the receiver are already there, start the communication */
840 if (action->state == SIMIX_READY) {
842 smx_host_t sender = action->comm.src_proc->smx_host;
843 smx_host_t receiver = action->comm.dst_proc->smx_host;
845 XBT_DEBUG("Starting communication %p from '%s' to '%s'", action,
846 SIMIX_host_get_name(sender), SIMIX_host_get_name(receiver));
848 action->comm.surf_comm = surf_workstation_model->extension.workstation.
849 communicate(sender, receiver, action->comm.task_size, action->comm.rate);
851 surf_workstation_model->action_data_set(action->comm.surf_comm, action);
853 action->state = SIMIX_RUNNING;
855 /* If a link has failed, detect it immediately */
856 if (surf_workstation_model->action_state_get(action->comm.surf_comm) == SURF_ACTION_FAILED) {
857 XBT_DEBUG("Communication from '%s' to '%s' failed to start because of a link failure",
858 SIMIX_host_get_name(sender), SIMIX_host_get_name(receiver));
859 action->state = SIMIX_LINK_FAILURE;
860 SIMIX_comm_destroy_internal_actions(action);
863 /* If either process is suspended, create the action but stop its execution;
864 it is expected to be restarted when the process resumes */
865 if (SIMIX_process_is_suspended(action->comm.src_proc) ||
866 SIMIX_process_is_suspended(action->comm.dst_proc)) {
867 /* FIXME: check what should happen with the action state */
869 if (SIMIX_process_is_suspended(action->comm.src_proc))
870 XBT_DEBUG("The communication is suspended on startup because src (%s:%s) were suspended since it initiated the communication",
871 SIMIX_host_get_name(action->comm.src_proc->smx_host), action->comm.src_proc->name);
873 XBT_DEBUG("The communication is suspended on startup because dst (%s:%s) were suspended since it initiated the communication",
874 SIMIX_host_get_name(action->comm.dst_proc->smx_host), action->comm.dst_proc->name);
876 surf_workstation_model->suspend(action->comm.surf_comm);
/**
 * \brief Answers the SIMIX simcalls associated to a communication action.
 * \param action a finished communication action
 */
/* Answers every simcall waiting on `action`, one at a time (shifted from
 * action->simcalls). For each simcall it:
 * - detaches a waitany from its other actions and sets its result index;
 * - unqueues the action from its mailbox if it is still queued;
 * - copies the data on success, or raises the matching exception in the
 *   issuer (timeout, host failure, link failure, cancel);
 * - for waitany/testany, stores the failed action's index in the exception;
 * - marks the issuer to die if its host is no longer on;
 * - removes the action from the issuer's comms (and, for detached
 *   communications, from the peer's comms);
 * - answers the simcall.
 * Finally SIMIX_comm_destroy is called destroy_count times.
 * NOTE(review): lines are missing from this listing: some `case` labels
 * (including the success case), `break`/`else` lines, braces, the increment
 * of destroy_count, and the line between the two waitany statements
 * (presumably a model-checking guard). */
886 void SIMIX_comm_finish(smx_action_t action)
888 unsigned int destroy_count = 0;
889 smx_simcall_t simcall;
891 while ((simcall = xbt_fifo_shift(action->simcalls))) {
893 /* If a waitany simcall is waiting for this action to finish, then remove
894 it from the other actions in the waitany list. Afterwards, get the
895 position of the actual action in the waitany dynar and
896 return it as the result of the simcall */
897 if (simcall->call == SIMCALL_COMM_WAITANY) {
898 SIMIX_waitany_remove_simcall_from_actions(simcall);
900 simcall_comm_waitany__set__result(simcall, xbt_dynar_search(simcall_comm_waitany__get__comms(simcall), &action));
903 /* If the action is still in a rendez-vous point then remove from it */
904 if (action->comm.rdv)
905 SIMIX_rdv_remove(action->comm.rdv, action);
907 XBT_DEBUG("SIMIX_comm_finish: action state = %d", (int)action->state);
909 /* Check out for errors */
910 switch (action->state) {
913 XBT_DEBUG("Communication %p complete!", action);
914 SIMIX_comm_copy_data(action);
917 case SIMIX_SRC_TIMEOUT:
918 SMX_EXCEPTION(simcall->issuer, timeout_error, 0,
919 "Communication timeouted because of sender");
922 case SIMIX_DST_TIMEOUT:
923 SMX_EXCEPTION(simcall->issuer, timeout_error, 0,
924 "Communication timeouted because of receiver");
/* A failed host kills its own process; the peer gets a network_error. */
927 case SIMIX_SRC_HOST_FAILURE:
928 if (simcall->issuer == action->comm.src_proc)
929 simcall->issuer->context->iwannadie = 1;
930 // SMX_EXCEPTION(simcall->issuer, host_error, 0, "Host failed");
932 SMX_EXCEPTION(simcall->issuer, network_error, 0, "Remote peer failed");
935 case SIMIX_DST_HOST_FAILURE:
936 if (simcall->issuer == action->comm.dst_proc)
937 simcall->issuer->context->iwannadie = 1;
938 // SMX_EXCEPTION(simcall->issuer, host_error, 0, "Host failed");
940 SMX_EXCEPTION(simcall->issuer, network_error, 0, "Remote peer failed");
/* NOTE(review): when a peer is missing, NULL is passed to a %s conversion below. */
943 case SIMIX_LINK_FAILURE:
944 XBT_DEBUG("Link failure in action %p between '%s' and '%s': posting an exception to the issuer: %s (%p) detached:%d",
946 action->comm.src_proc ? sg_host_name(action->comm.src_proc->smx_host) : NULL,
947 action->comm.dst_proc ? sg_host_name(action->comm.dst_proc->smx_host) : NULL,
948 simcall->issuer->name, simcall->issuer, action->comm.detached);
949 if (action->comm.src_proc == simcall->issuer) {
950 XBT_DEBUG("I'm source");
951 } else if (action->comm.dst_proc == simcall->issuer) {
952 XBT_DEBUG("I'm dest");
954 XBT_DEBUG("I'm neither source nor dest");
956 SMX_EXCEPTION(simcall->issuer, network_error, 0, "Link failure");
960 if (simcall->issuer == action->comm.dst_proc)
961 SMX_EXCEPTION(simcall->issuer, cancel_error, 0,
962 "Communication canceled by the sender");
964 SMX_EXCEPTION(simcall->issuer, cancel_error, 0,
965 "Communication canceled by the receiver");
969 xbt_die("Unexpected action state in SIMIX_comm_finish: %d", (int)action->state);
972 /* if there is an exception during a waitany or a testany, indicate the position of the failed communication */
973 if (simcall->issuer->doexception) {
974 if (simcall->call == SIMCALL_COMM_WAITANY) {
975 simcall->issuer->running_ctx->exception.value = xbt_dynar_search(simcall_comm_waitany__get__comms(simcall), &action);
977 else if (simcall->call == SIMCALL_COMM_TESTANY) {
978 simcall->issuer->running_ctx->exception.value = xbt_dynar_search(simcall_comm_testany__get__comms(simcall), &action);
/* If the issuer's host is no longer on, the issuer must die. */
982 if (surf_workstation_model->extension.
983 workstation.get_state(simcall->issuer->smx_host) != SURF_RESOURCE_ON) {
984 simcall->issuer->context->iwannadie = 1;
987 simcall->issuer->waiting_action = NULL;
988 xbt_fifo_remove(simcall->issuer->comms, action);
/* A detached communication must also vanish from the other peer's comms. */
989 if(action->comm.detached){
990 if(simcall->issuer == action->comm.src_proc){
991 if(action->comm.dst_proc)
992 xbt_fifo_remove(action->comm.dst_proc->comms, action);
994 if(simcall->issuer == action->comm.dst_proc){
995 if(action->comm.src_proc)
996 xbt_fifo_remove(action->comm.src_proc->comms, action);
999 SIMIX_simcall_answer(simcall);
1003 while (destroy_count-- > 0)
1004 SIMIX_comm_destroy(action);
1008 * \brief This function is called when a Surf communication action is finished.
1009 * \param action the corresponding Simix communication
1011 void SIMIX_post_comm(smx_action_t action)
1013 /* Update action state */
1014 if (action->comm.src_timeout &&
1015 surf_workstation_model->action_state_get(action->comm.src_timeout) == SURF_ACTION_DONE)
1016 action->state = SIMIX_SRC_TIMEOUT;
1017 else if (action->comm.dst_timeout &&
1018 surf_workstation_model->action_state_get(action->comm.dst_timeout) == SURF_ACTION_DONE)
1019 action->state = SIMIX_DST_TIMEOUT;
1020 else if (action->comm.src_timeout &&
1021 surf_workstation_model->action_state_get(action->comm.src_timeout) == SURF_ACTION_FAILED)
1022 action->state = SIMIX_SRC_HOST_FAILURE;
1023 else if (action->comm.dst_timeout &&
1024 surf_workstation_model->action_state_get(action->comm.dst_timeout) == SURF_ACTION_FAILED)
1025 action->state = SIMIX_DST_HOST_FAILURE;
1026 else if (action->comm.surf_comm &&
1027 surf_workstation_model->action_state_get(action->comm.surf_comm) == SURF_ACTION_FAILED) {
1028 XBT_DEBUG("Puta madre. Surf says that the link broke");
1029 action->state = SIMIX_LINK_FAILURE;
1031 action->state = SIMIX_DONE;
1033 XBT_DEBUG("SIMIX_post_comm: comm %p, state %d, src_proc %p, dst_proc %p, detached: %d",
1034 action, (int)action->state, action->comm.src_proc, action->comm.dst_proc, action->comm.detached);
1036 /* destroy the surf actions associated with the Simix communication */
1037 SIMIX_comm_destroy_internal_actions(action);
1039 /* remove the communication action from the list of pending communications
1040 * of both processes (if they still exist) */
1041 if (action->comm.src_proc) {
1042 xbt_fifo_remove(action->comm.src_proc->comms, action);
1044 if (action->comm.dst_proc) {
1045 xbt_fifo_remove(action->comm.dst_proc->comms, action);
1048 /* if there are simcalls associated with the action, then answer them */
1049 if (xbt_fifo_size(action->simcalls)) {
1050 SIMIX_comm_finish(action);
1054 void SIMIX_pre_comm_cancel(smx_simcall_t simcall, smx_action_t action){
1055 SIMIX_comm_cancel(action);
1057 void SIMIX_comm_cancel(smx_action_t action)
1059 /* if the action is a waiting state means that it is still in a rdv */
1060 /* so remove from it and delete it */
1061 if (action->state == SIMIX_WAITING) {
1062 SIMIX_rdv_remove(action->comm.rdv, action);
1063 action->state = SIMIX_CANCELED;
1065 else if (!MC_is_active() /* when running the MC there are no surf actions */
1066 && (action->state == SIMIX_READY || action->state == SIMIX_RUNNING)) {
1068 surf_workstation_model->action_cancel(action->comm.surf_comm);
1072 void SIMIX_comm_suspend(smx_action_t action)
1074 /*FIXME: shall we suspend also the timeout actions? */
1075 if (action->comm.surf_comm)
1076 surf_workstation_model->suspend(action->comm.surf_comm);
1077 /* in the other case, the action will be suspended on creation, in SIMIX_comm_start() */
1080 void SIMIX_comm_resume(smx_action_t action)
1082 /*FIXME: check what happen with the timeouts */
1083 if (action->comm.surf_comm)
1084 surf_workstation_model->resume(action->comm.surf_comm);
1085 /* in the other case, the action were not really suspended yet, see SIMIX_comm_suspend() and SIMIX_comm_start() */
1089 /************* Action Getters **************/
1091 double SIMIX_pre_comm_get_remains(smx_simcall_t simcall, smx_action_t action){
1092 return SIMIX_comm_get_remains(action);
1095 * \brief get the amount remaining from the communication
1096 * \param action The communication
1098 double SIMIX_comm_get_remains(smx_action_t action)
1106 switch (action->state) {
1109 remains = surf_workstation_model->get_remains(action->comm.surf_comm);
1114 remains = 0; /*FIXME: check what should be returned */
1118 remains = 0; /*FIXME: is this correct? */
1124 e_smx_state_t SIMIX_pre_comm_get_state(smx_simcall_t simcall, smx_action_t action){
1125 return SIMIX_comm_get_state(action);
1127 e_smx_state_t SIMIX_comm_get_state(smx_action_t action)
1129 return action->state;
1132 void* SIMIX_pre_comm_get_src_data(smx_simcall_t simcall, smx_action_t action){
1133 return SIMIX_comm_get_src_data(action);
1136 * \brief Return the user data associated to the sender of the communication
1137 * \param action The communication
1138 * \return the user data
1140 void* SIMIX_comm_get_src_data(smx_action_t action)
1142 return action->comm.src_data;
1145 void* SIMIX_pre_comm_get_dst_data(smx_simcall_t simcall, smx_action_t action){
1146 return SIMIX_comm_get_dst_data(action);
1149 * \brief Return the user data associated to the receiver of the communication
1150 * \param action The communication
1151 * \return the user data
1153 void* SIMIX_comm_get_dst_data(smx_action_t action)
1155 return action->comm.dst_data;
1158 smx_process_t SIMIX_pre_comm_get_src_proc(smx_simcall_t simcall, smx_action_t action){
1159 return SIMIX_comm_get_src_proc(action);
1161 smx_process_t SIMIX_comm_get_src_proc(smx_action_t action)
1163 return action->comm.src_proc;
1166 smx_process_t SIMIX_pre_comm_get_dst_proc(smx_simcall_t simcall, smx_action_t action){
1167 return SIMIX_comm_get_dst_proc(action);
1169 smx_process_t SIMIX_comm_get_dst_proc(smx_action_t action)
1171 return action->comm.dst_proc;
#ifdef HAVE_LATENCY_BOUND_TRACKING
/* Simcall handler: whether the given communication is latency bounded. */
int SIMIX_pre_comm_is_latency_bounded(smx_simcall_t simcall, smx_action_t action)
{
  (void) simcall;
  return SIMIX_comm_is_latency_bounded(action);
}

/**
 * \brief verify if communication is latency bounded
 *
 * While the communication still owns a Surf action, the cached
 * \c latency_limited flag is refreshed from it; the cached value is then
 * returned (so it stays available after the Surf action is gone).
 * \param action The communication (NULL yields 0)
 * \return the latency_limited flag of the communication
 */
XBT_INLINE int SIMIX_comm_is_latency_bounded(smx_action_t action)
{
  if (!action)
    return 0;

  if (action->comm.surf_comm) {
    XBT_DEBUG("Getting latency limited for surf_action (%p)", action->comm.surf_comm);
    action->latency_limited = surf_workstation_model->get_latency_limited(action->comm.surf_comm);
    XBT_DEBUG("Action limited is %d", action->latency_limited);
  }
  return action->latency_limited;
}
#endif /* HAVE_LATENCY_BOUND_TRACKING */
1198 /******************************************************************************/
1199 /* SIMIX_comm_copy_data callbacks */
1200 /******************************************************************************/
1201 static void (*SIMIX_comm_copy_data_callback) (smx_action_t, void*, size_t) =
1202 &SIMIX_comm_copy_pointer_callback;
1205 SIMIX_comm_set_copy_data_callback(void (*callback) (smx_action_t, void*, size_t))
1207 SIMIX_comm_copy_data_callback = callback;
1210 void SIMIX_comm_copy_pointer_callback(smx_action_t comm, void* buff, size_t buff_size)
1212 xbt_assert((buff_size == sizeof(void *)),
1213 "Cannot copy %zu bytes: must be sizeof(void*)", buff_size);
1214 *(void **) (comm->comm.dst_buff) = buff;
1217 void SIMIX_comm_copy_buffer_callback(smx_action_t comm, void* buff, size_t buff_size)
1219 XBT_DEBUG("Copy the data over");
1220 memcpy(comm->comm.dst_buff, buff, buff_size);
1221 if (comm->comm.detached) { // if this is a detached send, the source buffer was duplicated by SMPI sender to make the original buffer available to the application ASAP
1223 comm->comm.src_buff = NULL;
1229 * \brief Copy the communication data from the sender's buffer to the receiver's one
1230 * \param comm The communication
1232 void SIMIX_comm_copy_data(smx_action_t comm)
1234 size_t buff_size = comm->comm.src_buff_size;
1235 /* If there is no data to be copy then return */
1236 if (!comm->comm.src_buff || !comm->comm.dst_buff || comm->comm.copied)
1239 XBT_DEBUG("Copying comm %p data from %s (%p) -> %s (%p) (%zu bytes)",
1241 comm->comm.src_proc ? sg_host_name(comm->comm.src_proc->smx_host) : "a finished process",
1242 comm->comm.src_buff,
1243 comm->comm.dst_proc ? sg_host_name(comm->comm.dst_proc->smx_host) : "a finished process",
1244 comm->comm.dst_buff, buff_size);
1246 /* Copy at most dst_buff_size bytes of the message to receiver's buffer */
1247 if (comm->comm.dst_buff_size)
1248 buff_size = MIN(buff_size, *(comm->comm.dst_buff_size));
1250 /* Update the receiver's buffer size to the copied amount */
1251 if (comm->comm.dst_buff_size)
1252 *comm->comm.dst_buff_size = buff_size;
1255 SIMIX_comm_copy_data_callback (comm, comm->comm.src_buff, buff_size);
1257 /* Set the copied flag so we copy data only once */
1258 /* (this function might be called from both communication ends) */
1259 comm->comm.copied = 1;