1 /* Copyright (c) 2009, 2010. The SimGrid Team.
2 * All rights reserved. */
4 /* This program is free software; you can redistribute it and/or modify it
5 * under the terms of the license (GNU LGPL) which comes with this package. */
7 #include "smx_private.h"
12 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(simix_network, simix,
13 "Logging specific to SIMIX (network)");
/* Dictionary of rendez-vous points: created in SIMIX_network_init(), released
 * in SIMIX_network_exit(), and looked up by name in this file. */
15 static xbt_dict_t rdv_points = NULL;
/* Global communication counter. This excerpt only shows it being decremented
 * when a freshly created action turns out to be unnecessary. */
16 unsigned long int smx_total_comms = 0;
/* Forward declarations of the helpers defined further down in this file. */
18 static void SIMIX_waitany_remove_simcall_from_actions(smx_simcall_t simcall);
19 static void SIMIX_comm_copy_data(smx_action_t comm);
20 static smx_action_t SIMIX_comm_new(e_smx_comm_type_t type);
21 static XBT_INLINE void SIMIX_rdv_push(smx_rdv_t rdv, smx_action_t comm);
22 static smx_action_t SIMIX_fifo_probe_comm(xbt_fifo_t fifo, e_smx_comm_type_t type,
23 int (*match_fun)(void *, void *,smx_action_t),
24 void *user_data, smx_action_t my_action);
25 static smx_action_t SIMIX_fifo_get_comm(xbt_fifo_t fifo, e_smx_comm_type_t type,
26 int (*match_fun)(void *, void *,smx_action_t),
27 void *user_data, smx_action_t my_action);
28 static void SIMIX_rdv_free(void *data);
/* Creates the dictionary of rendez-vous points. SIMIX_rdv_free is handed to
 * the dictionary constructor (presumably as the per-entry destructor --
 * confirm against the xbt_dict API). */
30 void SIMIX_network_init(void)
32 rdv_points = xbt_dict_new_homogeneous(SIMIX_rdv_free);
/* Releases the dictionary of rendez-vous points created by
 * SIMIX_network_init(). */
35 void SIMIX_network_exit(void)
37 xbt_dict_free(&rdv_points);
40 /******************************************************************************/
41 /* Rendez-Vous Points */
42 /******************************************************************************/
/* Looks up the rendez-vous point registered under `name` in rdv_points (no
 * lookup is done when `name` is NULL). The visible code then allocates an
 * s_smx_rvpoint_t with xbt_new0, duplicates the name, creates the two fifos,
 * clears the permanent receiver and stores the point in rdv_points.
 * NOTE(review): the conditions guarding creation and registration, and the
 * return statement, are not visible in this excerpt. */
44 smx_rdv_t SIMIX_rdv_create(const char *name)
46 /* two processes may have pushed the same rdv_create simcall at the same time */
47 smx_rdv_t rdv = name ? xbt_dict_get_or_null(rdv_points, name) : NULL;
50 rdv = xbt_new0(s_smx_rvpoint_t, 1);
51 rdv->name = name ? xbt_strdup(name) : NULL;
52 rdv->comm_fifo = xbt_fifo_new();
53 rdv->done_comm_fifo = xbt_fifo_new();
54 rdv->permanent_receiver=NULL;
/* NOTE(review): `name` may be NULL here (see the ternaries above) and is
 * passed to a %s conversion -- confirm this is tolerated by the logger. */
56 XBT_DEBUG("Creating a mailbox at %p with name %s\n", rdv, name);
59 xbt_dict_set(rdv_points, rdv->name, rdv, NULL);
/* Removes the rendez-vous point from rdv_points, using its name as the key.
 * NOTE(review): any guard on rdv->name is not visible in this excerpt; the
 * memory is presumably released through the dictionary's free function
 * (SIMIX_rdv_free) -- confirm. */
64 void SIMIX_rdv_destroy(smx_rdv_t rdv)
67 xbt_dict_remove(rdv_points, rdv->name);
/* Release function for a rendez-vous point; `data` is cast to smx_rdv_t.
 * The visible code frees the pending and the done communication fifos.
 * NOTE(review): the release of the name and of the structure itself is not
 * visible in this excerpt. */
70 void SIMIX_rdv_free(void *data)
72 XBT_DEBUG("rdv free %p", data);
73 smx_rdv_t rdv = (smx_rdv_t) data;
75 xbt_fifo_free(rdv->comm_fifo);
76 xbt_fifo_free(rdv->done_comm_fifo);
/* Accessor returning an xbt_dict_t.
 * NOTE(review): the body is not visible in this excerpt; presumably it
 * returns rdv_points -- confirm. */
81 xbt_dict_t SIMIX_get_rdv_points()
/* Returns the result of looking `name` up in rdv_points with
 * xbt_dict_get_or_null(). */
86 smx_rdv_t SIMIX_rdv_get_by_name(const char *name)
88 return xbt_dict_get_or_null(rdv_points, name);
/* Walks the pending communications of `rdv` and tests, for each one, whether
 * the host of its source process is `host`.
 * NOTE(review): the counter, its increment and the return statement are not
 * visible in this excerpt; comm.src_proc is dereferenced without a visible
 * NULL check. */
91 int SIMIX_rdv_comm_count_by_host(smx_rdv_t rdv, smx_host_t host)
93 smx_action_t comm = NULL;
94 xbt_fifo_item_t item = NULL;
97 xbt_fifo_foreach(rdv->comm_fifo, item, comm, smx_action_t) {
98 if (comm->comm.src_proc->smx_host == host)
/* Returns the content of the first item of the pending communication fifo of
 * `rdv`, without removing it. */
105 smx_action_t SIMIX_rdv_get_head(smx_rdv_t rdv)
107 return xbt_fifo_get_item_content(xbt_fifo_get_first_item(rdv->comm_fifo));
111 * \brief get the receiver (process associated to the mailbox)
112 * \param rdv The rendez-vous point
113 * \return process The receiving process (NULL if not set)
/* Plain getter for the permanent_receiver field of `rdv`. */
115 smx_process_t SIMIX_rdv_get_receiver(smx_rdv_t rdv)
117 return rdv->permanent_receiver;
121 * \brief set the receiver of the rendez vous point to allow eager sends
122 * \param rdv The rendez-vous point
123 * \param process The receiving process
/* Plain setter for the permanent_receiver field of `rdv`. SIMIX_comm_isend()
 * and SIMIX_comm_irecv() test this field to decide whether done_comm_fifo is
 * used. */
125 void SIMIX_rdv_set_receiver(smx_rdv_t rdv , smx_process_t process)
127 rdv->permanent_receiver=process;
131 * \brief Pushes a communication action into a rendez-vous point
132 * \param rdv The rendez-vous point
133 * \param comm The communication action
/* Appends `comm` to the pending fifo of `rdv` and records `rdv` as the
 * rendez-vous point the communication currently sits in. */
135 static XBT_INLINE void SIMIX_rdv_push(smx_rdv_t rdv, smx_action_t comm)
137 xbt_fifo_push(rdv->comm_fifo, comm);
138 comm->comm.rdv = rdv;
142 * \brief Removes a communication action from a rendez-vous point
143 * \param rdv The rendez-vous point
144 * \param comm The communication action
/* Takes `comm` out of the pending fifo of `rdv` and clears the back pointer
 * from the communication to its rendez-vous point. Only comm_fifo is touched
 * here, not done_comm_fifo. */
146 XBT_INLINE void SIMIX_rdv_remove(smx_rdv_t rdv, smx_action_t comm)
148 xbt_fifo_remove(rdv->comm_fifo, comm);
149 comm->comm.rdv = NULL;
153 * \brief Checks if there is a communication action queued in a fifo matching our needs
154 * \param type The type of communication we are looking for (comm_send, comm_recv)
155 * \return The communication action if found, NULL otherwise
/* Scans `fifo` for a queued action of the requested `type` that is accepted
 * by both filters: the caller's match_fun (if any) and the queued action's
 * own comm.match_fun (if any). On a match the item is unlinked from the fifo
 * and freed, the action's refcount is incremented and its rdv pointer is
 * cleared.
 * NOTE(review): the declaration of `action` and the return statements are
 * not visible in this excerpt. */
157 smx_action_t SIMIX_fifo_get_comm(xbt_fifo_t fifo, e_smx_comm_type_t type,
158 int (*match_fun)(void *, void *,smx_action_t),
159 void *this_user_data, smx_action_t my_action)
162 xbt_fifo_item_t item;
163 void* other_user_data = NULL;
165 xbt_fifo_foreach(fifo, item, action, smx_action_t) {
/* Pick the user data of the side that queued this action. For any other
 * comm type other_user_data keeps its previous value. */
166 if (action->comm.type == SIMIX_COMM_SEND) {
167 other_user_data = action->comm.src_data;
168 } else if (action->comm.type == SIMIX_COMM_RECEIVE) {
169 other_user_data = action->comm.dst_data;
/* Both filters get (own data, peer data, peer action); an absent filter
 * accepts everything. */
171 if (action->comm.type == type &&
172 (!match_fun || match_fun(this_user_data, other_user_data, action)) &&
173 (!action->comm.match_fun || action->comm.match_fun(other_user_data, this_user_data, my_action))) {
174 XBT_DEBUG("Found a matching communication action %p", action);
175 xbt_fifo_remove_item(fifo, item);
176 xbt_fifo_free_item(item);
177 action->comm.refcount++;
178 action->comm.rdv = NULL;
181 XBT_DEBUG("Sorry, communication action %p does not match our needs:"
182 " its type is %d but we are looking for a comm of type %d (or maybe the filtering didn't match)",
183 action, (int)action->comm.type, (int)type);
185 XBT_DEBUG("No matching communication action found");
191 * \brief Checks if there is a communication action queued in a fifo matching our needs, but leave it there
192 * \param type The type of communication we are looking for (comm_send, comm_recv)
193 * \return The communication action if found, NULL otherwise
/* Same matching rules as SIMIX_fifo_get_comm(), but a matching action is left
 * in `fifo`: only its refcount is incremented (SIMIX_comm_iprobe() decrements
 * it again).
 * NOTE(review): the declaration of `action` and the return statements are
 * not visible in this excerpt. */
195 smx_action_t SIMIX_fifo_probe_comm(xbt_fifo_t fifo, e_smx_comm_type_t type,
196 int (*match_fun)(void *, void *,smx_action_t),
197 void *this_user_data, smx_action_t my_action)
200 xbt_fifo_item_t item;
201 void* other_user_data = NULL;
203 xbt_fifo_foreach(fifo, item, action, smx_action_t) {
/* Pick the user data of the side that queued this action. */
204 if (action->comm.type == SIMIX_COMM_SEND) {
205 other_user_data = action->comm.src_data;
206 } else if (action->comm.type == SIMIX_COMM_RECEIVE) {
207 other_user_data = action->comm.dst_data;
/* An absent filter accepts everything. */
209 if (action->comm.type == type &&
210 (!match_fun || match_fun(this_user_data, other_user_data, action)) &&
211 (!action->comm.match_fun || action->comm.match_fun(other_user_data, this_user_data, my_action))) {
212 XBT_DEBUG("Found a matching communication action %p", action);
213 action->comm.refcount++;
217 XBT_DEBUG("Sorry, communication action %p does not match our needs:"
218 " its type is %d but we are looking for a comm of type %d (or maybe the filtering didn't match)",
219 action, (int)action->comm.type, (int)type);
221 XBT_DEBUG("No matching communication action found");
224 /******************************************************************************/
225 /* Communication Actions */
226 /******************************************************************************/
229 * \brief Creates a new communicate action
230 * \param type The direction of communication (comm_send, comm_recv)
231 * \return The new communicate action
/* Obtains an action from simix_global->action_mallocator and initialises it
 * as a communication of the given `type`, in state SIMIX_WAITING and with a
 * refcount of 1.
 * NOTE(review): the declaration of `act`, the #endif/#ifdef lines around the
 * conditional fields and the return statement are not visible in this
 * excerpt. */
233 smx_action_t SIMIX_comm_new(e_smx_comm_type_t type)
237 /* alloc structures */
238 act = xbt_mallocator_get(simix_global->action_mallocator);
240 act->type = SIMIX_ACTION_COMMUNICATE;
241 act->state = SIMIX_WAITING;
243 /* set communication */
244 act->comm.type = type;
245 act->comm.refcount = 1;
247 #ifdef HAVE_LATENCY_BOUND_TRACKING
248 //initialize with unknown value
249 act->latency_limited = -1;
253 act->category = NULL;
256 XBT_DEBUG("Create communicate action %p", act);
263 * \brief Destroy a communicate action
264 * \param action The communicate action to be destroyed
/* Drops one reference on `action`. Dies with a backtrace if the refcount is
 * already <= 0. The rest of the visible code frees the name, releases the
 * surf actions, runs clean_fun on src_buff for a detached communication that
 * did not reach SIMIX_DONE, and gives the action back to the mallocator.
 * NOTE(review): the statement guarded by `refcount > 0` (presumably an early
 * return) and the closing braces are not visible in this excerpt. */
266 void SIMIX_comm_destroy(smx_action_t action)
268 XBT_DEBUG("Destroy action %p (refcount: %d), state: %d",
269 action, action->comm.refcount, (int)action->state);
271 if (action->comm.refcount <= 0) {
272 xbt_backtrace_display_current();
273 xbt_die("The refcount of comm %p is already 0 before decreasing it. "
274 "That's a bug! If you didn't test and/or wait the same communication twice in your code, then the bug is SimGrid's...", action);
276 action->comm.refcount--;
277 if (action->comm.refcount > 0)
279 XBT_DEBUG("Really free communication %p; refcount is now %d", action,
280 action->comm.refcount);
282 #ifdef HAVE_LATENCY_BOUND_TRACKING
283 action->latency_limited = SIMIX_comm_is_latency_bounded( action ) ;
286 xbt_free(action->name);
287 SIMIX_comm_destroy_internal_actions(action);
289 if (action->comm.detached && action->state != SIMIX_DONE) {
290 /* the communication has failed and was detached:
291 * we have to free the buffer */
292 if (action->comm.clean_fun) {
293 action->comm.clean_fun(action->comm.src_buff);
295 action->comm.src_buff = NULL;
298 xbt_mallocator_release(simix_global->action_mallocator, action);
/* Releases the surf actions attached to `action`: the data transfer
 * (surf_comm) and the two timeout actions. Each one that is set is unref'ed
 * through its model and its pointer is reset to NULL, so the function can be
 * called again on the same action. */
301 void SIMIX_comm_destroy_internal_actions(smx_action_t action)
303 if (action->comm.surf_comm){
304 #ifdef HAVE_LATENCY_BOUND_TRACKING
/* Record the latency-bound flag before the surf action goes away. */
305 action->latency_limited = SIMIX_comm_is_latency_bounded(action);
307 action->comm.surf_comm->model_type->action_unref(action->comm.surf_comm);
308 action->comm.surf_comm = NULL;
311 if (action->comm.src_timeout){
312 action->comm.src_timeout->model_type->action_unref(action->comm.src_timeout);
313 action->comm.src_timeout = NULL;
316 if (action->comm.dst_timeout){
317 action->comm.dst_timeout->model_type->action_unref(action->comm.dst_timeout);
318 action->comm.dst_timeout = NULL;
/* Posts an asynchronous send on `rdv`. A SEND action describing the caller is
 * created and a matching RECEIVE is looked up in rdv->comm_fifo. The visible
 * code then either uses the new action (queued in done_comm_fifo when the
 * mailbox has a permanent receiver, in comm_fifo otherwise) or reuses the
 * receive found, destroying the new action. The sender side fields are then
 * filled in and SIMIX_comm_start() is called.
 * Returns NULL for a detached send, the communication action otherwise.
 * NOTE(review): the end of the parameter list (`data` and `detached` are used
 * below) and the if/else lines selecting between the branches are not
 * visible in this excerpt. */
322 smx_action_t SIMIX_comm_isend(smx_process_t src_proc, smx_rdv_t rdv,
323 double task_size, double rate,
324 void *src_buff, size_t src_buff_size,
325 int (*match_fun)(void *, void *,smx_action_t),
326 void (*clean_fun)(void *), // used to free the action in case of problem after a detached send
330 XBT_DEBUG("send from %p\n", rdv);
332 /* Prepare an action describing us, so that it gets passed to the user-provided filter of other side */
333 smx_action_t this_action = SIMIX_comm_new(SIMIX_COMM_SEND);
335 /* Look for communication action matching our needs. We also provide a description of
336 * ourself so that the other side also gets a chance of choosing if it wants to match with us.
338 * If it is not found then push our communication into the rendez-vous point */
339 smx_action_t other_action = SIMIX_fifo_get_comm(rdv->comm_fifo, SIMIX_COMM_RECEIVE, match_fun, data, this_action);
342 other_action = this_action;
344 if (rdv->permanent_receiver!=NULL){
345 //this mailbox is for small messages, which have to be sent right now
346 other_action->state = SIMIX_READY;
347 other_action->comm.dst_proc=rdv->permanent_receiver;
/* Extra reference for the entry stored in done_comm_fifo. */
348 other_action->comm.refcount++;
349 other_action->comm.rdv = rdv;
350 xbt_fifo_push(rdv->done_comm_fifo,other_action);
351 other_action->comm.rdv=rdv;
352 XBT_DEBUG("pushing a message into the permanent receive fifo %p, comm %p \n", rdv, &(other_action->comm));
355 SIMIX_rdv_push(rdv, this_action);
358 XBT_DEBUG("Receive already pushed\n");
360 SIMIX_comm_destroy(this_action);
361 --smx_total_comms; // this creation was a pure waste
363 other_action->state = SIMIX_READY;
364 other_action->comm.type = SIMIX_COMM_READY;
/* Track the communication in the sender's list of pending comms. */
367 xbt_fifo_push(src_proc->comms, other_action);
369 /* if the communication action is detached then decrease the refcount
370 * by one, so it will be eliminated by the receiver's destroy call */
372 other_action->comm.detached = 1;
373 other_action->comm.refcount--;
374 other_action->comm.clean_fun = clean_fun;
376 other_action->comm.clean_fun = NULL;
379 /* Setup the communication action */
380 other_action->comm.src_proc = src_proc;
381 other_action->comm.task_size = task_size;
382 other_action->comm.rate = rate;
383 other_action->comm.src_buff = src_buff;
384 other_action->comm.src_buff_size = src_buff_size;
385 other_action->comm.src_data = data;
387 other_action->comm.match_fun = match_fun;
/* NOTE(review): the condition guarding this assignment is not visible in
 * this excerpt. */
390 other_action->state = SIMIX_RUNNING;
394 SIMIX_comm_start(other_action);
395 return (detached ? NULL : other_action);
/* Posts an asynchronous receive on `rdv`. A RECEIVE action describing the
 * caller is created. When the mailbox has a permanent receiver and
 * done_comm_fifo is not empty, a matching SEND is first searched there;
 * the visible code also searches rdv->comm_fifo. When no send matches, the
 * new action is pushed on the rendez-vous point; otherwise the new action is
 * destroyed and the send found is used. The receiver side fields are then
 * filled in and SIMIX_comm_start() is called.
 * NOTE(review): the if/else lines selecting between the branches, the
 * closing braces and the return statement are not visible in this excerpt. */
398 smx_action_t SIMIX_comm_irecv(smx_process_t dst_proc, smx_rdv_t rdv,
399 void *dst_buff, size_t *dst_buff_size,
400 int (*match_fun)(void *, void *, smx_action_t), void *data)
402 XBT_DEBUG("recv from %p %p\n", rdv, rdv->comm_fifo);
403 smx_action_t this_action = SIMIX_comm_new(SIMIX_COMM_RECEIVE);
405 smx_action_t other_action;
406 //communication already done, get it inside the fifo of completed comms
407 //permanent receive v1
408 //int already_received=0;
409 if(rdv->permanent_receiver && xbt_fifo_size(rdv->done_comm_fifo)!=0){
411 XBT_DEBUG("We have a comm that has probably already been received, trying to match it, to skip the communication\n");
412 //find a match in the already received fifo
413 other_action = SIMIX_fifo_get_comm(rdv->done_comm_fifo, SIMIX_COMM_SEND, match_fun, data, this_action);
414 //if not found, assume the receiver came first, register it to the mailbox in the classical way
416 XBT_DEBUG("We have messages in the permanent receive list, but not the one we are looking for, pushing request into fifo\n");
417 other_action = this_action;
418 SIMIX_rdv_push(rdv, this_action);
/* A surf transfer that exists and has nothing left to send is finished. */
420 if(other_action->comm.surf_comm && SIMIX_comm_get_remains(other_action)==0.0)
422 XBT_DEBUG("comm %p has been already sent, and is finished, destroy it\n",&(other_action->comm));
423 other_action->state = SIMIX_DONE;
424 other_action->comm.type = SIMIX_COMM_DONE;
425 other_action->comm.rdv = NULL;
426 //SIMIX_comm_destroy(this_action);
427 //--smx_total_comms; // this creation was a pure waste
428 //already_received=1;
429 //other_action->comm.refcount--;
431 XBT_DEBUG("Not yet finished, we have to wait %d\n", xbt_fifo_size(rdv->comm_fifo));
433 // other_action->comm.refcount--;
434 SIMIX_comm_destroy(this_action);
435 --smx_total_comms; // this creation was a pure waste
438 /* Prepare an action describing us, so that it gets passed to the user-provided filter of other side */
440 /* Look for communication action matching our needs. We also provide a description of
441 * ourself so that the other side also gets a chance of choosing if it wants to match with us.
443 * If it is not found then push our communication into the rendez-vous point */
444 other_action = SIMIX_fifo_get_comm(rdv->comm_fifo, SIMIX_COMM_SEND, match_fun, data, this_action);
447 XBT_DEBUG("Receive pushed first %d\n", xbt_fifo_size(rdv->comm_fifo));
448 other_action = this_action;
449 SIMIX_rdv_push(rdv, this_action);
451 SIMIX_comm_destroy(this_action);
452 --smx_total_comms; // this creation was a pure waste
453 other_action->state = SIMIX_READY;
454 other_action->comm.type = SIMIX_COMM_READY;
455 // other_action->comm.refcount--;
/* Track the communication in the receiver's list of pending comms. */
457 xbt_fifo_push(dst_proc->comms, other_action);
460 /* Setup communication action */
461 other_action->comm.dst_proc = dst_proc;
462 other_action->comm.dst_buff = dst_buff;
463 other_action->comm.dst_buff_size = dst_buff_size;
464 other_action->comm.dst_data = data;
466 other_action->comm.match_fun = match_fun;
469 /*if(already_received)//do the actual copy, because the first one after the comm didn't have all the info
470 SIMIX_comm_copy_data(other_action);*/
/* NOTE(review): the condition guarding this assignment is not visible in
 * this excerpt. */
474 other_action->state = SIMIX_RUNNING;
478 SIMIX_comm_start(other_action);
/* Probes `rdv` for a SEND matching the given filter without consuming it.
 * A temporary RECEIVE action is created to describe the caller and destroyed
 * before leaving. done_comm_fifo is probed when the mailbox has a permanent
 * receiver and that fifo is not empty; comm_fifo is probed as well. The
 * reference taken by SIMIX_fifo_probe_comm() on a match is dropped again.
 * The `dst_proc`, `src` and `tag` parameters are not used by the visible
 * lines.
 * NOTE(review): the condition guarding the second probe and the return
 * statement are not visible in this excerpt. */
484 smx_action_t SIMIX_comm_iprobe(smx_process_t dst_proc, smx_rdv_t rdv, int src,
485 int tag, int (*match_fun)(void *, void *, smx_action_t), void *data)
487 XBT_DEBUG("iprobe from %p %p\n", rdv, rdv->comm_fifo);
488 smx_action_t this_action = SIMIX_comm_new(SIMIX_COMM_RECEIVE);
490 smx_action_t other_action=NULL;
491 if(rdv->permanent_receiver && xbt_fifo_size(rdv->done_comm_fifo)!=0){
492 //find a match in the already received fifo
493 XBT_DEBUG("first try in the perm recv mailbox \n");
495 other_action = SIMIX_fifo_probe_comm(rdv->done_comm_fifo, SIMIX_COMM_SEND, match_fun, data, this_action);
499 XBT_DEBUG("second try in the other mailbox");
500 other_action = SIMIX_fifo_probe_comm(rdv->comm_fifo, SIMIX_COMM_SEND, match_fun, data, this_action);
503 if(other_action)other_action->comm.refcount--;
505 SIMIX_comm_destroy(this_action);
/* Handles a wait simcall on `action`: the simcall is queued on the action and
 * the action is recorded as what the issuer is waiting for. The visible code
 * contains two paths. One sets the action state directly (SIMIX_DONE, or a
 * source/destination timeout depending on which side the issuer is) and
 * finishes the communication. The other finishes an action that is neither
 * WAITING nor RUNNING, or else creates a surf sleep action on the issuer's
 * host and stores it as the timeout of the issuer's side.
 * NOTE(review): the conditions selecting between these paths (they involve
 * `idx` and presumably the model checker), the declaration of `sleep` and
 * the closing braces are not visible in this excerpt. */
510 void SIMIX_pre_comm_wait(smx_simcall_t simcall, smx_action_t action, double timeout, int idx)
513 /* the simcall may be a wait, a send or a recv */
516 /* Associate this simcall to the wait action */
517 XBT_DEBUG("SIMIX_pre_comm_wait, %p", action);
519 xbt_fifo_push(action->simcalls, simcall);
520 simcall->issuer->waiting_action = action;
524 action->state = SIMIX_DONE;
526 /* If we reached this point, the wait simcall must have a timeout */
527 /* Otherwise it shouldn't be enabled and executed by the MC */
531 if (action->comm.src_proc == simcall->issuer)
532 action->state = SIMIX_SRC_TIMEOUT;
534 action->state = SIMIX_DST_TIMEOUT;
537 SIMIX_comm_finish(action);
541 /* If the action has already finished, perform the error handling; */
542 /* otherwise set up a waiting timeout on the right side */
543 if (action->state != SIMIX_WAITING && action->state != SIMIX_RUNNING) {
544 SIMIX_comm_finish(action);
545 } else { /* if (timeout >= 0) { we need a surf sleep action even when there is no timeout, otherwise surf won't tell us when the host fails */
546 sleep = surf_workstation_model->extension.workstation.sleep(simcall->issuer->smx_host->host, timeout);
/* Link the sleep action back to the communication. */
547 surf_workstation_model->action_data_set(sleep, action);
549 if (simcall->issuer == action->comm.src_proc)
550 action->comm.src_timeout = sleep;
552 action->comm.dst_timeout = sleep;
/* Handles a test simcall on simcall->comm_test.comm. Two variants are
 * visible. In the first, the result is whether both the source and the
 * destination process are set, and a positive result marks the action
 * SIMIX_DONE. In the second, the result is whether the action is neither
 * WAITING nor RUNNING. In both, a positive result queues the simcall and
 * finishes the communication, and SIMIX_simcall_answer() is called.
 * NOTE(review): the condition selecting the variant and the else lines in
 * front of SIMIX_simcall_answer() are not visible in this excerpt. */
556 void SIMIX_pre_comm_test(smx_simcall_t simcall)
558 smx_action_t action = simcall->comm_test.comm;
561 simcall->comm_test.result = action->comm.src_proc && action->comm.dst_proc;
562 if(simcall->comm_test.result){
563 action->state = SIMIX_DONE;
564 xbt_fifo_push(action->simcalls, simcall);
565 SIMIX_comm_finish(action);
567 SIMIX_simcall_answer(simcall);
572 simcall->comm_test.result = (action->state != SIMIX_WAITING && action->state != SIMIX_RUNNING);
573 if (simcall->comm_test.result) {
574 xbt_fifo_push(action->simcalls, simcall);
575 SIMIX_comm_finish(action);
577 SIMIX_simcall_answer(simcall);
/* Handles a testany simcall over simcall->comm_testany.comms. The result
 * starts at -1. Two variants are visible. In the first, the action at index
 * `idx` is marked SIMIX_DONE and finished, with `idx` as the result. In the
 * second, the dynar is scanned for an action that is neither WAITING nor
 * RUNNING; its position becomes the result and it is finished.
 * NOTE(review): the conditions selecting the variant, the declarations of
 * `action` and `cursor`, and the lines ending the scan and the function are
 * not visible in this excerpt. */
581 void SIMIX_pre_comm_testany(smx_simcall_t simcall, int idx)
585 xbt_dynar_t actions = simcall->comm_testany.comms;
586 simcall->comm_testany.result = -1;
590 SIMIX_simcall_answer(simcall);
592 action = xbt_dynar_get_as(actions, idx, smx_action_t);
593 simcall->comm_testany.result = idx;
594 xbt_fifo_push(action->simcalls, simcall);
595 action->state = SIMIX_DONE;
596 SIMIX_comm_finish(action);
601 xbt_dynar_foreach(simcall->comm_testany.comms,cursor,action) {
602 if (action->state != SIMIX_WAITING && action->state != SIMIX_RUNNING) {
603 simcall->comm_testany.result = cursor;
604 xbt_fifo_push(action->simcalls, simcall);
605 SIMIX_comm_finish(action);
609 SIMIX_simcall_answer(simcall);
/* Handles a waitany simcall over simcall->comm_waitany.comms. Two variants
 * are visible. In the first, the action at index `idx` is marked SIMIX_DONE
 * and finished, with `idx` as the result. In the second, the simcall is
 * queued on each action visited, and an action found neither WAITING nor
 * RUNNING is finished.
 * NOTE(review): the condition selecting the variant, the declaration of
 * `action` and the lines ending the loop are not visible in this excerpt. */
612 void SIMIX_pre_comm_waitany(smx_simcall_t simcall, int idx)
615 unsigned int cursor = 0;
616 xbt_dynar_t actions = simcall->comm_waitany.comms;
619 action = xbt_dynar_get_as(actions, idx, smx_action_t);
620 xbt_fifo_push(action->simcalls, simcall);
621 simcall->comm_waitany.result = idx;
622 action->state = SIMIX_DONE;
623 SIMIX_comm_finish(action);
627 xbt_dynar_foreach(actions, cursor, action){
628 /* associate this simcall to the action */
629 xbt_fifo_push(action->simcalls, simcall);
631 /* see if the action is already finished */
632 if (action->state != SIMIX_WAITING && action->state != SIMIX_RUNNING){
633 SIMIX_comm_finish(action);
/* Removes `simcall` from the simcall fifo of every action listed in
 * simcall->comm_waitany.comms. Called from SIMIX_comm_finish() for a waitany
 * simcall.
 * NOTE(review): the declaration of `action` is not visible in this excerpt. */
639 void SIMIX_waitany_remove_simcall_from_actions(smx_simcall_t simcall)
642 unsigned int cursor = 0;
643 xbt_dynar_t actions = simcall->comm_waitany.comms;
645 xbt_dynar_foreach(actions, cursor, action) {
646 xbt_fifo_remove(action->simcalls, simcall);
651 * \brief Starts the simulation of a communication action.
652 * \param action the communication action
/* Starts the surf transfer of `action` when it is in state SIMIX_READY: a
 * surf communicate action is created between the hosts of the two processes,
 * linked back to `action`, and the state becomes SIMIX_RUNNING. A surf action
 * that is already failed turns the state into SIMIX_LINK_FAILURE and
 * releases the surf actions. The transfer is suspended right away when
 * either process is suspended.
 * NOTE(review): the else line between the two suspension messages and the
 * closing braces are not visible in this excerpt. */
654 XBT_INLINE void SIMIX_comm_start(smx_action_t action)
656 /* If both the sender and the receiver are already there, start the communication */
657 if (action->state == SIMIX_READY) {
659 smx_host_t sender = action->comm.src_proc->smx_host;
660 smx_host_t receiver = action->comm.dst_proc->smx_host;
662 XBT_DEBUG("Starting communication %p from '%s' to '%s'", action,
663 SIMIX_host_get_name(sender), SIMIX_host_get_name(receiver));
665 action->comm.surf_comm = surf_workstation_model->extension.workstation.
666 communicate(sender->host, receiver->host, action->comm.task_size, action->comm.rate);
/* Link the surf action back to the communication. */
668 surf_workstation_model->action_data_set(action->comm.surf_comm, action);
670 action->state = SIMIX_RUNNING;
672 /* If a link is failed, detect it immediately */
673 if (surf_workstation_model->action_state_get(action->comm.surf_comm) == SURF_ACTION_FAILED) {
674 XBT_DEBUG("Communication from '%s' to '%s' failed to start because of a link failure",
675 SIMIX_host_get_name(sender), SIMIX_host_get_name(receiver));
676 action->state = SIMIX_LINK_FAILURE;
677 SIMIX_comm_destroy_internal_actions(action);
680 /* If either process is suspended, create the action but stop its execution;
681 it will be restarted when the sender process resumes */
/* NOTE(review): on the link-failure path above surf_comm has been reset to
 * NULL by SIMIX_comm_destroy_internal_actions(); confirm that the suspend
 * call below is not reached with a NULL action in that case. */
682 if (SIMIX_process_is_suspended(action->comm.src_proc) ||
683 SIMIX_process_is_suspended(action->comm.dst_proc)) {
684 /* FIXME: check what should happen with the action state */
686 if (SIMIX_process_is_suspended(action->comm.src_proc))
687 XBT_DEBUG("The communication is suspended on startup because src (%s:%s) were suspended since it initiated the communication",
688 SIMIX_host_get_name(action->comm.src_proc->smx_host), action->comm.src_proc->name);
690 XBT_DEBUG("The communication is suspended on startup because dst (%s:%s) were suspended since it initiated the communication",
691 SIMIX_host_get_name(action->comm.dst_proc->smx_host), action->comm.dst_proc->name);
693 surf_workstation_model->suspend(action->comm.surf_comm);
700 * \brief Answers the SIMIX simcalls associated to a communication action.
701 * \param action a finished communication action
/* Answers every simcall queued on `action`. For each simcall:
 *  - a waitany simcall is removed from the other actions of its list, and
 *    its result is set to the position of `action` in that list;
 *  - the action is removed from its rendez-vous point if still in one;
 *  - the action state is dispatched on: data is copied on completion, and
 *    the failure states raise an exception on the issuer or flag it to die;
 *  - for waitany/testany, a pending exception gets the position of the
 *    failed communication as its value;
 *  - the issuer is flagged to die when its host is not ON;
 *  - the issuer's waiting_action is cleared, the action is removed from the
 *    issuer's comms and the simcall is answered.
 * Finally SIMIX_comm_destroy() is called destroy_count times.
 * NOTE(review): several case labels (at least those in front of the
 * completion, cancel and xbt_die branches), the break statements, the else
 * lines and the place where destroy_count is incremented are not visible in
 * this excerpt. */
703 void SIMIX_comm_finish(smx_action_t action)
705 unsigned int destroy_count = 0;
706 smx_simcall_t simcall;
708 while ((simcall = xbt_fifo_shift(action->simcalls))) {
710 /* If a waitany simcall is waiting for this action to finish, then remove
711 it from the other actions in the waitany list. Afterwards, get the
712 position of the actual action in the waitany dynar and
713 return it as the result of the simcall */
714 if (simcall->call == SIMCALL_COMM_WAITANY) {
715 SIMIX_waitany_remove_simcall_from_actions(simcall);
717 simcall->comm_waitany.result = xbt_dynar_search(simcall->comm_waitany.comms, &action);
720 /* If the action is still in a rendez-vous point then remove from it */
721 if (action->comm.rdv)
722 SIMIX_rdv_remove(action->comm.rdv, action);
724 XBT_DEBUG("SIMIX_comm_finish: action state = %d", (int)action->state);
726 /* Check out for errors */
727 switch (action->state) {
730 XBT_DEBUG("Communication %p complete!", action);
731 SIMIX_comm_copy_data(action);
734 case SIMIX_SRC_TIMEOUT:
735 SMX_EXCEPTION(simcall->issuer, timeout_error, 0,
736 "Communication timeouted because of sender");
739 case SIMIX_DST_TIMEOUT:
740 SMX_EXCEPTION(simcall->issuer, timeout_error, 0,
741 "Communication timeouted because of receiver");
/* Host failures: the process on the failed side is flagged to die; the
 * remote-peer exception is presumably raised for the other side only (the
 * else line is not visible). */
744 case SIMIX_SRC_HOST_FAILURE:
745 if (simcall->issuer == action->comm.src_proc)
746 simcall->issuer->context->iwannadie = 1;
747 // SMX_EXCEPTION(simcall->issuer, host_error, 0, "Host failed");
749 SMX_EXCEPTION(simcall->issuer, network_error, 0, "Remote peer failed");
752 case SIMIX_DST_HOST_FAILURE:
753 if (simcall->issuer == action->comm.dst_proc)
754 simcall->issuer->context->iwannadie = 1;
755 // SMX_EXCEPTION(simcall->issuer, host_error, 0, "Host failed");
757 SMX_EXCEPTION(simcall->issuer, network_error, 0, "Remote peer failed");
760 case SIMIX_LINK_FAILURE:
761 XBT_DEBUG("Link failure in action %p between '%s' and '%s': posting an exception to the issuer: %s (%p) detached:%d",
763 action->comm.src_proc ? action->comm.src_proc->smx_host->name : NULL,
764 action->comm.dst_proc ? action->comm.dst_proc->smx_host->name : NULL,
765 simcall->issuer->name, simcall->issuer, action->comm.detached);
766 if (action->comm.src_proc == simcall->issuer) {
767 XBT_DEBUG("I'm source");
768 } else if (action->comm.dst_proc == simcall->issuer) {
769 XBT_DEBUG("I'm dest");
771 XBT_DEBUG("I'm neither source nor dest");
773 SMX_EXCEPTION(simcall->issuer, network_error, 0, "Link failure");
777 if (simcall->issuer == action->comm.dst_proc)
778 SMX_EXCEPTION(simcall->issuer, cancel_error, 0,
779 "Communication canceled by the sender");
781 SMX_EXCEPTION(simcall->issuer, cancel_error, 0,
782 "Communication canceled by the receiver");
786 xbt_die("Unexpected action state in SIMIX_comm_finish: %d", (int)action->state);
789 /* if there is an exception during a waitany or a testany, indicate the position of the failed communication */
790 if (simcall->issuer->doexception) {
791 if (simcall->call == SIMCALL_COMM_WAITANY) {
792 simcall->issuer->running_ctx->exception.value = xbt_dynar_search(simcall->comm_waitany.comms, &action);
794 else if (simcall->call == SIMCALL_COMM_TESTANY) {
795 simcall->issuer->running_ctx->exception.value = xbt_dynar_search(simcall->comm_testany.comms, &action);
/* An issuer whose own host is not ON is flagged to die. */
799 if (surf_workstation_model->extension.
800 workstation.get_state(simcall->issuer->smx_host->host) != SURF_RESOURCE_ON) {
801 simcall->issuer->context->iwannadie = 1;
804 simcall->issuer->waiting_action = NULL;
805 xbt_fifo_remove(simcall->issuer->comms, action);
806 SIMIX_simcall_answer(simcall);
810 while (destroy_count-- > 0)
811 SIMIX_comm_destroy(action);
815 * \brief This function is called when a Surf communication action is finished.
816 * \param action the corresponding Simix communication
/* Called when a surf action attached to `action` terminates. The new state
 * is derived from the surf actions, tested in this order: source timeout
 * done, destination timeout done, source timeout failed (source host
 * failure), destination timeout failed (destination host failure), transfer
 * failed (link failure). The surf actions are then released, the
 * communication is removed from the pending lists of both processes when
 * they are set, and queued simcalls are answered via SIMIX_comm_finish().
 * NOTE(review): the else line in front of the SIMIX_DONE assignment is not
 * visible in this excerpt; presumably DONE is the fallback state. */
818 void SIMIX_post_comm(smx_action_t action)
820 /* Update action state */
821 if (action->comm.src_timeout &&
822 surf_workstation_model->action_state_get(action->comm.src_timeout) == SURF_ACTION_DONE)
823 action->state = SIMIX_SRC_TIMEOUT;
824 else if (action->comm.dst_timeout &&
825 surf_workstation_model->action_state_get(action->comm.dst_timeout) == SURF_ACTION_DONE)
826 action->state = SIMIX_DST_TIMEOUT;
827 else if (action->comm.src_timeout &&
828 surf_workstation_model->action_state_get(action->comm.src_timeout) == SURF_ACTION_FAILED)
829 action->state = SIMIX_SRC_HOST_FAILURE;
830 else if (action->comm.dst_timeout &&
831 surf_workstation_model->action_state_get(action->comm.dst_timeout) == SURF_ACTION_FAILED)
832 action->state = SIMIX_DST_HOST_FAILURE;
833 else if (action->comm.surf_comm &&
834 surf_workstation_model->action_state_get(action->comm.surf_comm) == SURF_ACTION_FAILED) {
835 XBT_DEBUG("Puta madre. Surf says that the link broke");
836 action->state = SIMIX_LINK_FAILURE;
838 action->state = SIMIX_DONE;
840 XBT_DEBUG("SIMIX_post_comm: comm %p, state %d, src_proc %p, dst_proc %p, detached: %d",
841 action, (int)action->state, action->comm.src_proc, action->comm.dst_proc, action->comm.detached);
843 /* destroy the surf actions associated with the Simix communication */
844 SIMIX_comm_destroy_internal_actions(action);
846 /* remove the communication action from the list of pending communications
847 * of both processes (if they still exist) */
848 if (action->comm.src_proc) {
849 xbt_fifo_remove(action->comm.src_proc->comms, action);
851 if (action->comm.dst_proc) {
852 xbt_fifo_remove(action->comm.dst_proc->comms, action);
855 /* if there are simcalls associated with the action, then answer them */
856 if (xbt_fifo_size(action->simcalls)) {
857 SIMIX_comm_finish(action);
/* Cancels `action`. A communication still in state SIMIX_WAITING is removed
 * from its rendez-vous point and marked SIMIX_CANCELED. A READY or RUNNING
 * one has its surf transfer cancelled, unless the model checker is enabled.
 * Other states are left untouched by the visible code. */
861 void SIMIX_comm_cancel(smx_action_t action)
863 /* if the action is in a waiting state, it is still in a rdv, */
864 /* so remove it from there and mark it as canceled */
865 if (action->state == SIMIX_WAITING) {
866 SIMIX_rdv_remove(action->comm.rdv, action);
867 action->state = SIMIX_CANCELED;
869 else if (!MC_IS_ENABLED /* when running the MC there are no surf actions */
870 && (action->state == SIMIX_READY || action->state == SIMIX_RUNNING)) {
872 surf_workstation_model->action_cancel(action->comm.surf_comm);
/* Suspends the surf transfer of `action` if one exists; the timeout actions
 * are not touched. */
876 void SIMIX_comm_suspend(smx_action_t action)
878 /*FIXME: shall we suspend also the timeout actions? */
879 if (action->comm.surf_comm)
880 surf_workstation_model->suspend(action->comm.surf_comm);
881 /* in the other case, the action will be suspended on creation, in SIMIX_comm_start() */
/* Resumes the surf transfer of `action` if one exists; the timeout actions
 * are not touched. */
884 void SIMIX_comm_resume(smx_action_t action)
886 /*FIXME: check what happens with the timeouts */
887 if (action->comm.surf_comm)
888 surf_workstation_model->resume(action->comm.surf_comm);
889 /* in the other case, the action was not really suspended yet, see SIMIX_comm_suspend() and SIMIX_comm_start() */
893 /************* Action Getters **************/
896 * \brief get the amount remaining from the communication
897 * \param action The communication
/* Computes the remaining amount of `action` from its state: one branch asks
 * the surf model about surf_comm, the other visible branches use 0.
 * NOTE(review): the declaration of `remains`, every case label and the
 * return statement are not visible in this excerpt, so the mapping from
 * states to branches cannot be confirmed from here. */
899 double SIMIX_comm_get_remains(smx_action_t action)
907 switch (action->state) {
910 remains = surf_workstation_model->get_remains(action->comm.surf_comm);
915 remains = 0; /*FIXME: check what should be returned */
919 remains = 0; /*FIXME: is this correct? */
/* Plain getter for the state of `action`. */
925 e_smx_state_t SIMIX_comm_get_state(smx_action_t action)
927 return action->state;
931 * \brief Return the user data associated to the sender of the communication
932 * \param action The communication
933 * \return the user data
/* Plain getter for comm.src_data, the value stored by SIMIX_comm_isend(). */
935 void* SIMIX_comm_get_src_data(smx_action_t action)
937 return action->comm.src_data;
941 * \brief Return the user data associated to the receiver of the communication
942 * \param action The communication
943 * \return the user data
/* Plain getter for comm.dst_data, the value stored by SIMIX_comm_irecv(). */
945 void* SIMIX_comm_get_dst_data(smx_action_t action)
947 return action->comm.dst_data;
/* Plain getter for the sending process of `action`. */
950 smx_process_t SIMIX_comm_get_src_proc(smx_action_t action)
952 return action->comm.src_proc;
/* Plain getter for the receiving process of `action`. */
955 smx_process_t SIMIX_comm_get_dst_proc(smx_action_t action)
957 return action->comm.dst_proc;
960 #ifdef HAVE_LATENCY_BOUND_TRACKING
962 * \brief verify if communication is latency bounded
963 * \param comm The communication
/* Returns action->latency_limited. When a surf transfer is attached, the
 * field is first refreshed from the surf model.
 * NOTE(review): lines between the signature and the surf_comm test are not
 * visible in this excerpt. */
965 XBT_INLINE int SIMIX_comm_is_latency_bounded(smx_action_t action)
970 if (action->comm.surf_comm){
971 XBT_DEBUG("Getting latency limited for surf_action (%p)", action->comm.surf_comm);
972 action->latency_limited = surf_workstation_model->get_latency_limited(action->comm.surf_comm);
973 XBT_DEBUG("Action limited is %d", action->latency_limited);
975 return action->latency_limited;
979 /******************************************************************************/
980 /* SIMIX_comm_copy_data callbacks */
981 /******************************************************************************/
/* Callback used by SIMIX_comm_copy_data() to transfer the payload; it
 * defaults to the pointer-copy callback and can be replaced with
 * SIMIX_comm_set_copy_data_callback(). */
982 static void (*SIMIX_comm_copy_data_callback) (smx_action_t, void*, size_t) =
983 &SIMIX_comm_copy_pointer_callback;
/* Installs `callback` as the function used to copy communication data.
 * NOTE(review): the line carrying the return type is not visible in this
 * excerpt. */
986 SIMIX_comm_set_copy_data_callback(void (*callback) (smx_action_t, void*, size_t))
988 SIMIX_comm_copy_data_callback = callback;
/* Copy callback that transfers only a pointer: it asserts that `buff_size`
 * equals sizeof(void *) and stores `buff` itself into the location
 * designated by comm.dst_buff. */
991 void SIMIX_comm_copy_pointer_callback(smx_action_t comm, void* buff, size_t buff_size)
993 xbt_assert((buff_size == sizeof(void *)),
994 "Cannot copy %zu bytes: must be sizeof(void*)", buff_size);
995 *(void **) (comm->comm.dst_buff) = buff;
/* Copy callback that copies `buff_size` bytes from `buff` into comm.dst_buff
 * with memcpy. For a detached communication, src_buff is reset to NULL
 * afterwards.
 * NOTE(review): a line between the detached test and the reset is not
 * visible in this excerpt; presumably it frees the duplicated buffer. */
998 void SIMIX_comm_copy_buffer_callback(smx_action_t comm, void* buff, size_t buff_size)
1000 XBT_DEBUG("Copy the data over");
1001 memcpy(comm->comm.dst_buff, buff, buff_size);
1002 if (comm->comm.detached) { // if this is a detached send, the source buffer was duplicated by SMPI sender to make the original buffer available to the application ASAP
1004 comm->comm.src_buff = NULL;
1010 * \brief Copy the communication data from the sender's buffer to the receiver's one
1011 * \param comm The communication
/* Transfers the payload of `comm` through SIMIX_comm_copy_data_callback. The
 * size starts from src_buff_size and, when the receiver supplied
 * dst_buff_size, is capped to the pointed value, which is then overwritten
 * with the amount used. The `copied` flag is set at the end so that the copy
 * is done only once.
 * NOTE(review): the statement guarded by the first test (presumably an early
 * return) and any condition around the callback invocation are not visible
 * in this excerpt. */
1013 void SIMIX_comm_copy_data(smx_action_t comm)
1015 size_t buff_size = comm->comm.src_buff_size;
1016 /* If there is no data to be copied then return */
1017 if (!comm->comm.src_buff || !comm->comm.dst_buff || comm->comm.copied)
1020 XBT_DEBUG("Copying comm %p data from %s (%p) -> %s (%p) (%zu bytes)",
1022 comm->comm.src_proc ? comm->comm.src_proc->smx_host->name : "a finished process",
1023 comm->comm.src_buff,
1024 comm->comm.dst_proc ? comm->comm.dst_proc->smx_host->name : "a finished process",
1025 comm->comm.dst_buff, buff_size);
1027 /* Copy at most dst_buff_size bytes of the message to receiver's buffer */
1028 if (comm->comm.dst_buff_size)
1029 buff_size = MIN(buff_size, *(comm->comm.dst_buff_size));
1031 /* Update the receiver's buffer size to the copied amount */
1032 if (comm->comm.dst_buff_size)
1033 *comm->comm.dst_buff_size = buff_size;
1036 SIMIX_comm_copy_data_callback (comm, comm->comm.src_buff, buff_size);
1038 /* Set the copied flag so we copy data only once */
1039 /* (this function might be called from both communication ends) */
1040 comm->comm.copied = 1;