1 /* Copyright (c) 2009, 2010. The SimGrid Team.
2 * All rights reserved. */
4 /* This program is free software; you can redistribute it and/or modify it
5 * under the terms of the license (GNU LGPL) which comes with this package. */
7 #include "smx_private.h"
/* NOTE(review): this file is a line-numbered listing in which blank lines,
 * most braces and some statements are missing; the comments added below
 * describe only what the visible lines show. */
/* Log category for this file, nested under the "simix" category. */
12 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(simix_network, simix,
13 "Logging specific to SIMIX (network)");
/* Registry of named rendez-vous points (mailboxes), keyed by name.
 * Created in SIMIX_network_init() and freed in SIMIX_network_exit(). */
15 static xbt_dict_t rdv_points = NULL;
/* Global communication counter. It is decremented below whenever a freshly
 * created action turns out to be unnecessary; the increment is not visible
 * in this listing. */
16 unsigned long int smx_total_comms = 0;
/* Forward declarations of file-local helpers defined further down. */
18 static void SIMIX_waitany_remove_simcall_from_actions(smx_simcall_t simcall);
19 static void SIMIX_comm_copy_data(smx_action_t comm);
20 static smx_action_t SIMIX_comm_new(e_smx_comm_type_t type);
21 static XBT_INLINE void SIMIX_rdv_push(smx_rdv_t rdv, smx_action_t comm);
22 static smx_action_t SIMIX_fifo_probe_comm(xbt_fifo_t fifo, e_smx_comm_type_t type,
23 int (*match_fun)(void *, void *,smx_action_t),
24 void *user_data, smx_action_t my_action);
25 static smx_action_t SIMIX_fifo_get_comm(xbt_fifo_t fifo, e_smx_comm_type_t type,
26 int (*match_fun)(void *, void *,smx_action_t),
27 void *user_data, smx_action_t my_action);
28 static void SIMIX_rdv_free(void *data);
/* Create the mailbox registry. SIMIX_rdv_free is registered as the free
 * function for the stored mailboxes. */
30 void SIMIX_network_init(void)
32 rdv_points = xbt_dict_new_homogeneous(SIMIX_rdv_free);
/* Release the mailbox registry. This presumably frees every registered
 * mailbox through SIMIX_rdv_free, the free function given at creation
 * (confirm against the xbt_dict semantics). */
35 void SIMIX_network_exit(void)
37 xbt_dict_free(&rdv_points);
40 /******************************************************************************/
41 /* Rendez-Vous Points */
42 /******************************************************************************/
/* Return the mailbox called `name`, creating and registering a new one if it
 * does not exist yet. A NULL name skips the lookup and yields a fresh mailbox
 * whose name stays NULL. */
44 smx_rdv_t SIMIX_rdv_create(const char *name)
46 /* two processes may have pushed the same rdv_create simcall at the same time */
47 smx_rdv_t rdv = name ? xbt_dict_get_or_null(rdv_points, name) : NULL;
/* NOTE(review): the `if (!rdv)` guard around the allocation below, any
 * registration guard for NULL names, and the final `return rdv;` are not
 * visible in this listing. */
50 rdv = xbt_new0(s_smx_rvpoint_t, 1);
51 rdv->name = name ? xbt_strdup(name) : NULL;
/* comm_fifo holds pending send/receive requests. done_comm_fifo holds eager
 * sends already handed to a permanent receiver (see SIMIX_comm_isend). */
52 rdv->comm_fifo = xbt_fifo_new();
53 rdv->done_comm_fifo = xbt_fifo_new();
54 rdv->permanent_receiver=NULL;
/* NOTE(review): `name` may be NULL here, and printing NULL through %s is
 * undefined for a standard printf. The trailing "\n" is likely redundant
 * with the logger's own line handling. */
56 XBT_DEBUG("Creating a mailbox at %p with name %s\n", rdv, name);
59 xbt_dict_set(rdv_points, rdv->name, rdv, NULL);
/* Unregister a mailbox from the registry. The registry's free function
 * (SIMIX_rdv_free) is expected to release it.
 * NOTE(review): rdv->name is NULL for anonymous mailboxes. Confirm that a
 * guard exists on the lines missing from this listing. */
64 void SIMIX_rdv_destroy(smx_rdv_t rdv)
67 xbt_dict_remove(rdv_points, rdv->name);
/* Free function for registry entries: releases a mailbox's two request fifos.
 * NOTE(review): freeing of rdv->name and of the struct itself is not visible
 * in this listing. The debug call precedes a declaration, which relies on
 * C99 mixed declarations. */
70 void SIMIX_rdv_free(void *data)
72 XBT_DEBUG("rdv free %p", data);
73 smx_rdv_t rdv = (smx_rdv_t) data;
75 xbt_fifo_free(rdv->comm_fifo);
76 xbt_fifo_free(rdv->done_comm_fifo);
/* Accessor for the mailbox registry (the body is not visible in this listing).
 * NOTE(review): `()` is an old-style declaration; `(void)` is the prototype form. */
81 xbt_dict_t SIMIX_get_rdv_points()
/* Look up a registered mailbox by name. Returns NULL if none is registered. */
86 smx_rdv_t SIMIX_rdv_get_by_name(const char *name)
88 return xbt_dict_get_or_null(rdv_points, name);
/* Count the requests queued in rdv->comm_fifo whose sending process runs on
 * `host`. The counter variable and the return statement are not visible here.
 * NOTE(review): receive requests are also pushed into comm_fifo (see
 * SIMIX_comm_irecv). Confirm their comm.src_proc cannot be NULL when
 * dereferenced below. */
91 int SIMIX_rdv_comm_count_by_host(smx_rdv_t rdv, smx_host_t host)
93 smx_action_t comm = NULL;
94 xbt_fifo_item_t item = NULL;
97 xbt_fifo_foreach(rdv->comm_fifo, item, comm, smx_action_t) {
98 if (comm->comm.src_proc->smx_host == host)
/* Peek at the oldest queued request of the mailbox without removing it.
 * It presumably yields NULL on an empty fifo; confirm that
 * xbt_fifo_get_item_content accepts a NULL item. */
105 smx_action_t SIMIX_rdv_get_head(smx_rdv_t rdv)
107 return xbt_fifo_get_item_content(xbt_fifo_get_first_item(rdv->comm_fifo));
/* Plain accessor for the mailbox's permanent receiver. */
111 * \brief get the receiver (process associated to the mailbox)
112 * \param rdv The rendez-vous point
113 * \return process The receiving process (NULL if not set)
115 smx_process_t SIMIX_rdv_get_receiver(smx_rdv_t rdv)
117 return rdv->permanent_receiver;
/* While a permanent receiver is set, SIMIX_comm_isend delivers unmatched sends
 * eagerly into done_comm_fifo. Passing NULL restores plain rendez-vous
 * matching. */
121 * \brief set the receiver of the rendez vous point to allow eager sends
122 * \param rdv The rendez-vous point
123 * \param process The receiving process
125 void SIMIX_rdv_set_receiver(smx_rdv_t rdv , smx_process_t process)
127 rdv->permanent_receiver=process;
/* Append at the tail, so requests are matched in FIFO order. Also record the
 * owning mailbox so SIMIX_comm_finish/SIMIX_comm_cancel can later remove the
 * action from it. */
131 * \brief Pushes a communication action into a rendez-vous point
132 * \param rdv The rendez-vous point
133 * \param comm The communication action
135 static XBT_INLINE void SIMIX_rdv_push(smx_rdv_t rdv, smx_action_t comm)
137 xbt_fifo_push(rdv->comm_fifo, comm);
138 comm->comm.rdv = rdv;
/* Inverse of SIMIX_rdv_push. It also clears comm->comm.rdv so the action is
 * no longer treated as queued. */
142 * \brief Removes a communication action from a rendez-vous point
143 * \param rdv The rendez-vous point
144 * \param comm The communication action
146 XBT_INLINE void SIMIX_rdv_remove(smx_rdv_t rdv, smx_action_t comm)
148 xbt_fifo_remove(rdv->comm_fifo, comm);
149 comm->comm.rdv = NULL;
/* Scan `fifo` in order and dequeue the first candidate that:
 *  - has comm.type == type,
 *  - is accepted by our filter match_fun(this_user_data, its data, it), if any,
 *  - and accepts us through its own filter (its data, this_user_data, my_action), if any.
 * On a match the fifo item is unlinked and freed, one reference is taken for
 * the caller (refcount++), and comm.rdv is cleared.
 * NOTE(review): the declaration of `action` and the return statements are not
 * visible in this listing. */
153 * \brief Checks if there is a communication action queued in a fifo matching our needs
154 * \param type The type of communication we are looking for (comm_send, comm_recv)
155 * \return The communication action if found, NULL otherwise
157 smx_action_t SIMIX_fifo_get_comm(xbt_fifo_t fifo, e_smx_comm_type_t type,
158 int (*match_fun)(void *, void *,smx_action_t),
159 void *this_user_data, smx_action_t my_action)
162 xbt_fifo_item_t item;
163 void* other_user_data = NULL;
165 xbt_fifo_foreach(fifo, item, action, smx_action_t) {
/* The candidate's user data is its sender data or receiver data,
 * depending on its direction. */
166 if (action->comm.type == SIMIX_COMM_SEND) {
167 other_user_data = action->comm.src_data;
168 } else if (action->comm.type == SIMIX_COMM_RECEIVE) {
169 other_user_data = action->comm.dst_data;
/* The type test is evaluated first, so the filters only ever see candidates
 * of the requested type. */
171 if (action->comm.type == type &&
172 (!match_fun || match_fun(this_user_data, other_user_data, action)) &&
173 (!action->comm.match_fun || action->comm.match_fun(other_user_data, this_user_data, my_action))) {
174 XBT_DEBUG("Found a matching communication action %p", action);
175 xbt_fifo_remove_item(fifo, item);
176 xbt_fifo_free_item(item);
/* The caller now holds its own reference to the matched action. */
177 action->comm.refcount++;
178 action->comm.rdv = NULL;
181 XBT_DEBUG("Sorry, communication action %p does not match our needs:"
182 " its type is %d but we are looking for a comm of type %d (or maybe the filtering didn't match)",
183 action, (int)action->comm.type, (int)type);
185 XBT_DEBUG("No matching communication action found");
/* Same matching rules as SIMIX_fifo_get_comm, but the matching action stays
 * queued. A reference is still taken (refcount++), so the caller must drop it;
 * SIMIX_comm_iprobe does so right after probing.
 * NOTE(review): the declaration of `action` and the return statements are not
 * visible in this listing. */
191 * \brief Checks if there is a communication action queued in a fifo matching our needs, but leave it there
192 * \param type The type of communication we are looking for (comm_send, comm_recv)
193 * \return The communication action if found, NULL otherwise
195 smx_action_t SIMIX_fifo_probe_comm(xbt_fifo_t fifo, e_smx_comm_type_t type,
196 int (*match_fun)(void *, void *,smx_action_t),
197 void *this_user_data, smx_action_t my_action)
200 xbt_fifo_item_t item;
201 void* other_user_data = NULL;
203 xbt_fifo_foreach(fifo, item, action, smx_action_t) {
204 if (action->comm.type == SIMIX_COMM_SEND) {
205 other_user_data = action->comm.src_data;
206 } else if (action->comm.type == SIMIX_COMM_RECEIVE) {
207 other_user_data = action->comm.dst_data;
209 if (action->comm.type == type &&
210 (!match_fun || match_fun(this_user_data, other_user_data, action)) &&
211 (!action->comm.match_fun || action->comm.match_fun(other_user_data, this_user_data, my_action))) {
212 XBT_DEBUG("Found a matching communication action %p", action);
213 action->comm.refcount++;
217 XBT_DEBUG("Sorry, communication action %p does not match our needs:"
218 " its type is %d but we are looking for a comm of type %d (or maybe the filtering didn't match)",
219 action, (int)action->comm.type, (int)type);
221 XBT_DEBUG("No matching communication action found");
224 /******************************************************************************/
225 /* Communication Actions */
226 /******************************************************************************/
/* Allocate a communication action from the action mallocator, in state
 * SIMIX_WAITING with one reference held by the creator.
 * NOTE(review): several lines are missing from this listing (the `act`
 * declaration, an #endif, the guard around the category reset, the return).
 * Whether other comm fields of a recycled action are reset is not visible. */
229 * \brief Creates a new communicate action
230 * \param type The direction of communication (comm_send, comm_recv)
231 * \return The new communicate action
233 smx_action_t SIMIX_comm_new(e_smx_comm_type_t type)
237 /* alloc structures */
238 act = xbt_mallocator_get(simix_global->action_mallocator);
240 act->type = SIMIX_ACTION_COMMUNICATE;
241 act->state = SIMIX_WAITING;
243 /* set communication */
244 act->comm.type = type;
245 act->comm.refcount = 1;
247 #ifdef HAVE_LATENCY_BOUND_TRACKING
248 //initialize with unknown value
249 act->latency_limited = -1;
253 act->category = NULL;
256 XBT_DEBUG("Create communicate action %p", act);
/* Drop one reference to a communication action. The action is really freed
 * only when the count reaches zero. Dropping a reference that is already zero
 * is a fatal error (typically the same comm was tested/waited twice). */
263 * \brief Destroy a communicate action
264 * \param action The communicate action to be destroyed
266 void SIMIX_comm_destroy(smx_action_t action)
268 XBT_DEBUG("Destroy action %p (refcount: %d), state: %d",
269 action, action->comm.refcount, (int)action->state);
271 if (action->comm.refcount <= 0) {
272 xbt_backtrace_display_current();
273 xbt_die("The refcount of comm %p is already 0 before decreasing it. "
274 "That's a bug! If you didn't test and/or wait the same communication twice in your code, then the bug is SimGrid's...", action);
276 action->comm.refcount--;
/* Other holders remain; the early return is on a line missing from this listing. */
277 if (action->comm.refcount > 0)
279 XBT_DEBUG("Really free communication %p; refcount is now %d", action,
280 action->comm.refcount);
/* NOTE(review): SIMIX_comm_destroy_internal_actions() also refreshes this
 * value before dropping the surf action, so this call looks redundant. */
282 #ifdef HAVE_LATENCY_BOUND_TRACKING
283 action->latency_limited = SIMIX_comm_is_latency_bounded( action ) ;
286 xbt_free(action->name);
287 SIMIX_comm_destroy_internal_actions(action);
289 if (action->comm.detached && action->state != SIMIX_DONE) {
290 /* the communication has failed and was detached:
291 * we have to free the buffer */
292 if (action->comm.clean_fun) {
293 action->comm.clean_fun(action->comm.src_buff);
295 action->comm.src_buff = NULL;
/* Return the storage to the mallocator for reuse. */
298 xbt_mallocator_release(simix_global->action_mallocator, action);
/* Release the surf actions attached to a communication: the transfer itself
 * and the optional sender/receiver timeout actions. Each pointer is cleared
 * after unref, so calling this again is harmless. */
301 void SIMIX_comm_destroy_internal_actions(smx_action_t action)
303 if (action->comm.surf_comm){
/* Sample the latency-bounded flag while the surf transfer still exists. */
304 #ifdef HAVE_LATENCY_BOUND_TRACKING
305 action->latency_limited = SIMIX_comm_is_latency_bounded(action);
307 action->comm.surf_comm->model_type->action_unref(action->comm.surf_comm);
308 action->comm.surf_comm = NULL;
311 if (action->comm.src_timeout){
312 action->comm.src_timeout->model_type->action_unref(action->comm.src_timeout);
313 action->comm.src_timeout = NULL;
316 if (action->comm.dst_timeout){
317 action->comm.dst_timeout->model_type->action_unref(action->comm.dst_timeout);
318 action->comm.dst_timeout = NULL;
/* Post a send request on mailbox `rdv`.
 * If a matching receive is already queued, that receive action is reused and
 * becomes READY. Otherwise this send is queued in the mailbox, or, when the
 * mailbox has a permanent receiver, handed over eagerly through
 * done_comm_fifo. The comm is then set up and started.
 * Returns the communication action, or NULL for detached sends.
 * NOTE(review): the parameter lines declaring `data` and `detached` and
 * several control-flow lines (if/else, the guard before the RUNNING
 * assignment) are missing from this listing. */
322 smx_action_t SIMIX_comm_isend(smx_process_t src_proc, smx_rdv_t rdv,
323 double task_size, double rate,
324 void *src_buff, size_t src_buff_size,
325 int (*match_fun)(void *, void *,smx_action_t),
326 void (*clean_fun)(void *), // used to free the action in case of problem after a detached send
330 XBT_DEBUG("send from %p\n", rdv);
332 /* Prepare an action describing us, so that it gets passed to the user-provided filter of other side */
333 smx_action_t this_action = SIMIX_comm_new(SIMIX_COMM_SEND);
335 /* Look for communication action matching our needs. We also provide a description of
336 * ourself so that the other side also gets a chance of choosing if it wants to match with us.
338 * If it is not found then push our communication into the rendez-vous point */
339 smx_action_t other_action = SIMIX_fifo_get_comm(rdv->comm_fifo, SIMIX_COMM_RECEIVE, match_fun, data, this_action);
/* No matching receive: our own action becomes the communication. */
342 other_action = this_action;
/* Eager path: with a permanent receiver the send is marked READY towards
 * that process and stored in done_comm_fifo with an extra reference. */
344 if (rdv->permanent_receiver!=NULL){
345 //this mailbox is for small messages, which have to be sent right now
346 other_action->state = SIMIX_READY;
347 other_action->comm.dst_proc=rdv->permanent_receiver;
348 other_action->comm.refcount++;
/* NOTE(review): comm.rdv is assigned twice (here and two lines below). */
349 other_action->comm.rdv = rdv;
350 xbt_fifo_push(rdv->done_comm_fifo,other_action);
351 other_action->comm.rdv=rdv;
352 XBT_DEBUG("pushing a message into the permanent receive fifo %p, comm %p \n", rdv, &(other_action->comm));
355 SIMIX_rdv_push(rdv, this_action);
/* A receive was already waiting: drop our temporary action and reuse the
 * receiver's one, which is now READY. */
358 XBT_DEBUG("Receive already pushed\n");
360 SIMIX_comm_destroy(this_action);
361 --smx_total_comms; // this creation was a pure waste
363 other_action->state = SIMIX_READY;
364 other_action->comm.type = SIMIX_COMM_READY;
/* Track the comm in the sender's list of pending communications. */
367 xbt_fifo_push(src_proc->comms, other_action);
369 /* if the communication action is detached then decrease the refcount
370 * by one, so it will be eliminated by the receiver's destroy call */
372 other_action->comm.detached = 1;
373 other_action->comm.refcount--;
374 other_action->comm.clean_fun = clean_fun;
376 other_action->comm.clean_fun = NULL;
379 /* Setup the communication action */
380 other_action->comm.src_proc = src_proc;
381 other_action->comm.task_size = task_size;
382 other_action->comm.rate = rate;
383 other_action->comm.src_buff = src_buff;
384 other_action->comm.src_buff_size = src_buff_size;
385 other_action->comm.src_data = data;
387 other_action->comm.match_fun = match_fun;
/* NOTE(review): the guard for this assignment (presumably model-checker
 * mode) and its early return are on lines missing from this listing. */
390 other_action->state = SIMIX_RUNNING;
394 SIMIX_comm_start(other_action);
395 return (detached ? NULL : other_action);
/* Post a receive request on mailbox `rdv`.
 * When the mailbox has a permanent receiver and eager sends are waiting in
 * done_comm_fifo, a matching one is looked for there first. Otherwise a
 * pending send is matched in comm_fifo, or this request is queued.
 * NOTE(review): the if/else lines structuring these paths and the return
 * statement are missing from this listing. The commented-out code in this
 * function should be removed once the permanent-receive logic is settled. */
398 smx_action_t SIMIX_comm_irecv(smx_process_t dst_proc, smx_rdv_t rdv,
399 void *dst_buff, size_t *dst_buff_size,
400 int (*match_fun)(void *, void *, smx_action_t), void *data)
402 XBT_DEBUG("recv from %p %p\n", rdv, rdv->comm_fifo);
403 smx_action_t this_action = SIMIX_comm_new(SIMIX_COMM_RECEIVE);
405 smx_action_t other_action;
406 //communication already done, get it inside the fifo of completed comms
407 //permanent receive v1
408 //int already_received=0;
409 if(rdv->permanent_receiver && xbt_fifo_size(rdv->done_comm_fifo)!=0){
411 XBT_DEBUG("We have a comm that has probably already been received, trying to match it, to skip the communication\n");
412 //find a match in the already received fifo
413 other_action = SIMIX_fifo_get_comm(rdv->done_comm_fifo, SIMIX_COMM_SEND, match_fun, data, this_action);
414 //if not found, assume the receiver came first, register it to the mailbox in the classical way
416 XBT_DEBUG("We have messages in the permanent receive list, but not the one we are looking for, pushing request into fifo\n");
417 other_action = this_action;
418 SIMIX_rdv_push(rdv, this_action);
/* Found an eager send: if its surf transfer has no remaining amount, the
 * data already arrived and the comm is marked DONE right away. */
420 if(other_action->comm.surf_comm && SIMIX_comm_get_remains(other_action)==0.0)
422 XBT_DEBUG("comm %p has been already sent, and is finished, destroy it\n",&(other_action->comm));
423 other_action->state = SIMIX_DONE;
424 other_action->comm.type = SIMIX_COMM_DONE;
425 other_action->comm.rdv = NULL;
426 //SIMIX_comm_destroy(this_action);
427 //--smx_total_comms; // this creation was a pure waste
428 //already_received=1;
429 //other_action->comm.refcount--;
431 XBT_DEBUG("Not yet finished, we have to wait %d\n", xbt_fifo_size(rdv->comm_fifo));
/* The matched send is used instead of our temporary action. */
433 // other_action->comm.refcount--;
434 SIMIX_comm_destroy(this_action);
435 --smx_total_comms; // this creation was a pure waste
438 /* Prepare an action describing us, so that it gets passed to the user-provided filter of other side */
440 /* Look for communication action matching our needs. We also provide a description of
441 * ourself so that the other side also gets a chance of choosing if it wants to match with us.
443 * If it is not found then push our communication into the rendez-vous point */
444 other_action = SIMIX_fifo_get_comm(rdv->comm_fifo, SIMIX_COMM_SEND, match_fun, data, this_action);
447 XBT_DEBUG("Receive pushed first %d\n", xbt_fifo_size(rdv->comm_fifo));
448 other_action = this_action;
449 SIMIX_rdv_push(rdv, this_action);
/* A send was already waiting: drop our temporary action and reuse the
 * sender's one, which is now READY. */
451 SIMIX_comm_destroy(this_action);
452 --smx_total_comms; // this creation was a pure waste
453 other_action->state = SIMIX_READY;
454 other_action->comm.type = SIMIX_COMM_READY;
455 // other_action->comm.refcount--;
/* Track the comm in the receiver's list of pending communications. */
457 xbt_fifo_push(dst_proc->comms, other_action);
460 /* Setup communication action */
461 other_action->comm.dst_proc = dst_proc;
462 other_action->comm.dst_buff = dst_buff;
463 other_action->comm.dst_buff_size = dst_buff_size;
464 other_action->comm.dst_data = data;
466 other_action->comm.match_fun = match_fun;
469 /*if(already_received)//do the actual copy, because the first one after the comm didn't have all the info
470 SIMIX_comm_copy_data(other_action);*/
/* NOTE(review): the guard for this assignment (presumably model-checker
 * mode) is on a line missing from this listing. */
474 other_action->state = SIMIX_RUNNING;
478 SIMIX_comm_start(other_action);
/* Check, without consuming anything, whether a send matching
 * (match_fun, data) is available on `rdv`. done_comm_fifo is probed when the
 * mailbox has a permanent receiver and holds eager sends; otherwise comm_fifo
 * is probed (the `else` line is missing from this listing).
 * The probe's extra reference is dropped immediately, so an action handed
 * back (presumably `other_action`, since the return is not visible) is NOT
 * owned by the caller.
 * NOTE(review): `src` and `tag` are unused in the visible lines. */
484 smx_action_t SIMIX_comm_iprobe(smx_process_t dst_proc, smx_rdv_t rdv, int src,
485 int tag, int (*match_fun)(void *, void *, smx_action_t), void *data)
487 XBT_DEBUG("iprobe from %p %p\n", rdv, rdv->comm_fifo);
/* Temporary receive action, only used as "my_action" by the peer's filter. */
488 smx_action_t this_action = SIMIX_comm_new(SIMIX_COMM_RECEIVE);
490 smx_action_t other_action;
491 if(rdv->permanent_receiver && xbt_fifo_size(rdv->done_comm_fifo)!=0){
492 //find a match in the already received fifo
493 other_action = SIMIX_fifo_probe_comm(rdv->done_comm_fifo, SIMIX_COMM_SEND, match_fun, data, this_action);
495 other_action = SIMIX_fifo_probe_comm(rdv->comm_fifo, SIMIX_COMM_SEND, match_fun, data, this_action);
/* Undo the reference taken by SIMIX_fifo_probe_comm. */
497 if(other_action)other_action->comm.refcount--;
499 SIMIX_comm_destroy(this_action);
/* Handle a wait (or blocking send/recv) simcall on `action`.
 * The simcall is registered on the action and the issuer records the action
 * it waits for. In the model-checker branch (its guard lines are missing from
 * this listing) the outcome is forced: DONE, or a timeout attributed to the
 * issuer's side, selected through `idx`. Otherwise an already-finished comm is
 * answered immediately; else a surf sleep of `timeout` on the issuer's host is
 * attached as that side's timeout action. */
504 void SIMIX_pre_comm_wait(smx_simcall_t simcall, smx_action_t action, double timeout, int idx)
507 /* the simcall may be a wait, a send or a recv */
510 /* Associate this simcall to the wait action */
511 XBT_DEBUG("SIMIX_pre_comm_wait, %p", action);
513 xbt_fifo_push(action->simcalls, simcall);
514 simcall->issuer->waiting_action = action;
518 action->state = SIMIX_DONE;
520 /* If we reached this point, the wait simcall must have a timeout */
521 /* Otherwise it shouldn't be enabled and executed by the MC */
525 if (action->comm.src_proc == simcall->issuer)
526 action->state = SIMIX_SRC_TIMEOUT;
528 action->state = SIMIX_DST_TIMEOUT;
531 SIMIX_comm_finish(action);
535 /* If the action has already finish perform the error handling, */
536 /* otherwise set up a waiting timeout on the right side */
537 if (action->state != SIMIX_WAITING && action->state != SIMIX_RUNNING) {
538 SIMIX_comm_finish(action);
539 } else { /* if (timeout >= 0) { we need a surf sleep action even when there is no timeout, otherwise surf won't tell us when the host fails */
540 sleep = surf_workstation_model->extension.workstation.sleep(simcall->issuer->smx_host->host, timeout);
541 surf_workstation_model->action_data_set(sleep, action);
543 if (simcall->issuer == action->comm.src_proc)
544 action->comm.src_timeout = sleep;
546 action->comm.dst_timeout = sleep;
/* Handle a non-blocking test simcall.
 * In the model-checker branch (guard lines missing from this listing) the
 * comm counts as complete once both endpoints are known. Otherwise it is
 * complete when its state is neither WAITING nor RUNNING. On completion the
 * simcall is registered and SIMIX_comm_finish answers it. Otherwise the
 * simcall is answered directly (the `else` lines are not visible). */
550 void SIMIX_pre_comm_test(smx_simcall_t simcall)
552 smx_action_t action = simcall->comm_test.comm;
555 simcall->comm_test.result = action->comm.src_proc && action->comm.dst_proc;
556 if(simcall->comm_test.result){
557 action->state = SIMIX_DONE;
558 xbt_fifo_push(action->simcalls, simcall);
559 SIMIX_comm_finish(action);
561 SIMIX_simcall_answer(simcall);
566 simcall->comm_test.result = (action->state != SIMIX_WAITING && action->state != SIMIX_RUNNING);
567 if (simcall->comm_test.result) {
568 xbt_fifo_push(action->simcalls, simcall);
569 SIMIX_comm_finish(action);
571 SIMIX_simcall_answer(simcall);
/* Handle a testany simcall over the dynar of comms; result defaults to -1
 * (none finished).
 * In the model-checker branch (guard lines missing from this listing) the
 * comm at `idx` is forced to DONE and reported. Otherwise the first comm
 * whose state is neither WAITING nor RUNNING is reported through
 * SIMIX_comm_finish. If none is found, the simcall is answered with -1. */
575 void SIMIX_pre_comm_testany(smx_simcall_t simcall, int idx)
579 xbt_dynar_t actions = simcall->comm_testany.comms;
580 simcall->comm_testany.result = -1;
584 SIMIX_simcall_answer(simcall);
586 action = xbt_dynar_get_as(actions, idx, smx_action_t);
587 simcall->comm_testany.result = idx;
588 xbt_fifo_push(action->simcalls, simcall);
589 action->state = SIMIX_DONE;
590 SIMIX_comm_finish(action);
595 xbt_dynar_foreach(simcall->comm_testany.comms,cursor,action) {
596 if (action->state != SIMIX_WAITING && action->state != SIMIX_RUNNING) {
597 simcall->comm_testany.result = cursor;
598 xbt_fifo_push(action->simcalls, simcall);
599 SIMIX_comm_finish(action);
603 SIMIX_simcall_answer(simcall);
/* Handle a waitany simcall over the dynar of comms.
 * In the model-checker branch (guard lines missing from this listing) the
 * comm at `idx` is forced to DONE. Otherwise the simcall is registered on
 * every comm, and any comm already finished is completed right away.
 * SIMIX_comm_finish then unregisters the simcall from the others. */
606 void SIMIX_pre_comm_waitany(smx_simcall_t simcall, int idx)
609 unsigned int cursor = 0;
610 xbt_dynar_t actions = simcall->comm_waitany.comms;
613 action = xbt_dynar_get_as(actions, idx, smx_action_t);
614 xbt_fifo_push(action->simcalls, simcall);
615 simcall->comm_waitany.result = idx;
616 action->state = SIMIX_DONE;
617 SIMIX_comm_finish(action);
621 xbt_dynar_foreach(actions, cursor, action){
622 /* associate this simcall to the action */
623 xbt_fifo_push(action->simcalls, simcall);
625 /* see if the action is already finished */
626 if (action->state != SIMIX_WAITING && action->state != SIMIX_RUNNING){
627 SIMIX_comm_finish(action);
/* Unregister a waitany simcall from every comm it was waiting on, so only the
 * comm that completed answers it. */
633 void SIMIX_waitany_remove_simcall_from_actions(smx_simcall_t simcall)
636 unsigned int cursor = 0;
637 xbt_dynar_t actions = simcall->comm_waitany.comms;
639 xbt_dynar_foreach(actions, cursor, action) {
640 xbt_fifo_remove(action->simcalls, simcall);
/* Create the surf transfer for a READY comm (sender host -> receiver host,
 * task_size at `rate`) and move it to RUNNING.
 * A transfer that surf reports as FAILED at creation becomes
 * SIMIX_LINK_FAILURE and its surf actions are released. If either endpoint
 * is suspended, the surf transfer is created but suspended immediately.
 * Comms that are not READY are left untouched. */
645 * \brief Starts the simulation of a communication action.
646 * \param action the communication action
648 XBT_INLINE void SIMIX_comm_start(smx_action_t action)
650 /* If both the sender and the receiver are already there, start the communication */
651 if (action->state == SIMIX_READY) {
653 smx_host_t sender = action->comm.src_proc->smx_host;
654 smx_host_t receiver = action->comm.dst_proc->smx_host;
656 XBT_DEBUG("Starting communication %p from '%s' to '%s'", action,
657 SIMIX_host_get_name(sender), SIMIX_host_get_name(receiver));
659 action->comm.surf_comm = surf_workstation_model->extension.workstation.
660 communicate(sender->host, receiver->host, action->comm.task_size, action->comm.rate);
/* Link the surf action back to this SIMIX action. */
662 surf_workstation_model->action_data_set(action->comm.surf_comm, action);
664 action->state = SIMIX_RUNNING;
666 /* If a link is failed, detect it immediately */
667 if (surf_workstation_model->action_state_get(action->comm.surf_comm) == SURF_ACTION_FAILED) {
668 XBT_DEBUG("Communication from '%s' to '%s' failed to start because of a link failure",
669 SIMIX_host_get_name(sender), SIMIX_host_get_name(receiver));
670 action->state = SIMIX_LINK_FAILURE;
671 SIMIX_comm_destroy_internal_actions(action);
674 /* If any of the process is suspend, create the action but stop its execution,
675 it will be restarted when the sender process resume */
676 if (SIMIX_process_is_suspended(action->comm.src_proc) ||
677 SIMIX_process_is_suspended(action->comm.dst_proc)) {
678 /* FIXME: check what should happen with the action state */
680 if (SIMIX_process_is_suspended(action->comm.src_proc))
681 XBT_DEBUG("The communication is suspended on startup because src (%s:%s) were suspended since it initiated the communication",
682 SIMIX_host_get_name(action->comm.src_proc->smx_host), action->comm.src_proc->name);
684 XBT_DEBUG("The communication is suspended on startup because dst (%s:%s) were suspended since it initiated the communication",
685 SIMIX_host_get_name(action->comm.dst_proc->smx_host), action->comm.dst_proc->name);
687 surf_workstation_model->suspend(action->comm.surf_comm);
/* Answer every simcall registered on a finished comm, translating its final
 * state into data delivery (DONE) or an exception for the issuer
 * (timeout, host failure, link failure, cancellation).
 * Once all simcalls are answered, SIMIX_comm_destroy is called destroy_count
 * times (the increment of destroy_count is on a line missing from this
 * listing, presumably once per answered simcall — confirm).
 * NOTE(review): the `case SIMIX_DONE:`/`case SIMIX_CANCELED:`/`default:`
 * labels, `break`s and some `else` lines are not visible in this listing. */
694 * \brief Answers the SIMIX simcalls associated to a communication action.
695 * \param action a finished communication action
697 void SIMIX_comm_finish(smx_action_t action)
699 unsigned int destroy_count = 0;
700 smx_simcall_t simcall;
702 while ((simcall = xbt_fifo_shift(action->simcalls))) {
704 /* If a waitany simcall is waiting for this action to finish, then remove
705 it from the other actions in the waitany list. Afterwards, get the
706 position of the actual action in the waitany dynar and
707 return it as the result of the simcall */
708 if (simcall->call == SIMCALL_COMM_WAITANY) {
709 SIMIX_waitany_remove_simcall_from_actions(simcall);
711 simcall->comm_waitany.result = xbt_dynar_search(simcall->comm_waitany.comms, &action);
714 /* If the action is still in a rendez-vous point then remove from it */
715 if (action->comm.rdv)
716 SIMIX_rdv_remove(action->comm.rdv, action);
718 XBT_DEBUG("SIMIX_comm_finish: action state = %d", (int)action->state);
720 /* Check out for errors */
721 switch (action->state) {
/* Successful completion: deliver the payload. */
724 XBT_DEBUG("Communication %p complete!", action);
725 SIMIX_comm_copy_data(action);
728 case SIMIX_SRC_TIMEOUT:
729 SMX_EXCEPTION(simcall->issuer, timeout_error, 0,
730 "Communication timeouted because of sender");
733 case SIMIX_DST_TIMEOUT:
734 SMX_EXCEPTION(simcall->issuer, timeout_error, 0,
735 "Communication timeouted because of receiver");
/* Host failures: the process on the failed host is marked to die; the
 * other side presumably gets a network_error (the `else` is not visible). */
738 case SIMIX_SRC_HOST_FAILURE:
739 if (simcall->issuer == action->comm.src_proc)
740 simcall->issuer->context->iwannadie = 1;
741 // SMX_EXCEPTION(simcall->issuer, host_error, 0, "Host failed");
743 SMX_EXCEPTION(simcall->issuer, network_error, 0, "Remote peer failed");
746 case SIMIX_DST_HOST_FAILURE:
747 if (simcall->issuer == action->comm.dst_proc)
748 simcall->issuer->context->iwannadie = 1;
749 // SMX_EXCEPTION(simcall->issuer, host_error, 0, "Host failed");
751 SMX_EXCEPTION(simcall->issuer, network_error, 0, "Remote peer failed");
/* NOTE(review): the format below has six conversions but only five
 * arguments are visible (the `action,` line is missing from this listing),
 * and NULL is passed to %s when a process is gone. */
754 case SIMIX_LINK_FAILURE:
755 XBT_DEBUG("Link failure in action %p between '%s' and '%s': posting an exception to the issuer: %s (%p) detached:%d",
757 action->comm.src_proc ? action->comm.src_proc->smx_host->name : NULL,
758 action->comm.dst_proc ? action->comm.dst_proc->smx_host->name : NULL,
759 simcall->issuer->name, simcall->issuer, action->comm.detached);
760 if (action->comm.src_proc == simcall->issuer) {
761 XBT_DEBUG("I'm source");
762 } else if (action->comm.dst_proc == simcall->issuer) {
763 XBT_DEBUG("I'm dest");
765 XBT_DEBUG("I'm neither source nor dest");
767 SMX_EXCEPTION(simcall->issuer, network_error, 0, "Link failure");
/* Cancellation: each side is told that the other one canceled. */
771 if (simcall->issuer == action->comm.dst_proc)
772 SMX_EXCEPTION(simcall->issuer, cancel_error, 0,
773 "Communication canceled by the sender");
775 SMX_EXCEPTION(simcall->issuer, cancel_error, 0,
776 "Communication canceled by the receiver");
780 xbt_die("Unexpected action state in SIMIX_comm_finish: %d", (int)action->state);
783 /* if there is an exception during a waitany or a testany, indicate the position of the failed communication */
784 if (simcall->issuer->doexception) {
785 if (simcall->call == SIMCALL_COMM_WAITANY) {
786 simcall->issuer->running_ctx->exception.value = xbt_dynar_search(simcall->comm_waitany.comms, &action);
788 else if (simcall->call == SIMCALL_COMM_TESTANY) {
789 simcall->issuer->running_ctx->exception.value = xbt_dynar_search(simcall->comm_testany.comms, &action);
/* An issuer whose host is no longer ON is marked to die. */
793 if (surf_workstation_model->extension.
794 workstation.get_state(simcall->issuer->smx_host->host) != SURF_RESOURCE_ON) {
795 simcall->issuer->context->iwannadie = 1;
798 simcall->issuer->waiting_action = NULL;
799 xbt_fifo_remove(simcall->issuer->comms, action);
800 SIMIX_simcall_answer(simcall);
/* Release the references accumulated while answering the simcalls. */
804 while (destroy_count-- > 0)
805 SIMIX_comm_destroy(action);
/* Map the surf outcome onto the SIMIX state. Priority order: sender timeout
 * done, receiver timeout done, sender-side sleep failed (sender host
 * failure), receiver-side sleep failed (receiver host failure), transfer
 * failed (link failure), otherwise DONE. Then release the surf actions, drop
 * the comm from both processes' pending lists, and answer any waiting
 * simcalls.
 * NOTE(review): any handling after the last visible line (e.g. for detached
 * comms) is not visible in this listing. */
809 * \brief This function is called when a Surf communication action is finished.
810 * \param action the corresponding Simix communication
812 void SIMIX_post_comm(smx_action_t action)
814 /* Update action state */
815 if (action->comm.src_timeout &&
816 surf_workstation_model->action_state_get(action->comm.src_timeout) == SURF_ACTION_DONE)
817 action->state = SIMIX_SRC_TIMEOUT;
818 else if (action->comm.dst_timeout &&
819 surf_workstation_model->action_state_get(action->comm.dst_timeout) == SURF_ACTION_DONE)
820 action->state = SIMIX_DST_TIMEOUT;
821 else if (action->comm.src_timeout &&
822 surf_workstation_model->action_state_get(action->comm.src_timeout) == SURF_ACTION_FAILED)
823 action->state = SIMIX_SRC_HOST_FAILURE;
824 else if (action->comm.dst_timeout &&
825 surf_workstation_model->action_state_get(action->comm.dst_timeout) == SURF_ACTION_FAILED)
826 action->state = SIMIX_DST_HOST_FAILURE;
827 else if (action->comm.surf_comm &&
828 surf_workstation_model->action_state_get(action->comm.surf_comm) == SURF_ACTION_FAILED) {
829 XBT_DEBUG("Puta madre. Surf says that the link broke");
830 action->state = SIMIX_LINK_FAILURE;
832 action->state = SIMIX_DONE;
834 XBT_DEBUG("SIMIX_post_comm: comm %p, state %d, src_proc %p, dst_proc %p, detached: %d",
835 action, (int)action->state, action->comm.src_proc, action->comm.dst_proc, action->comm.detached);
837 /* destroy the surf actions associated with the Simix communication */
838 SIMIX_comm_destroy_internal_actions(action);
840 /* remove the communication action from the list of pending communications
841 * of both processes (if they still exist) */
842 if (action->comm.src_proc) {
843 xbt_fifo_remove(action->comm.src_proc->comms, action);
845 if (action->comm.dst_proc) {
846 xbt_fifo_remove(action->comm.dst_proc->comms, action);
849 /* if there are simcalls associated with the action, then answer them */
850 if (xbt_fifo_size(action->simcalls)) {
851 SIMIX_comm_finish(action);
/* Cancel a communication. A still-queued comm (WAITING) is removed from its
 * mailbox and marked CANCELED. A READY or RUNNING comm has its surf transfer
 * canceled, except under the model checker, which has no surf actions. */
855 void SIMIX_comm_cancel(smx_action_t action)
857 /* if the action is a waiting state means that it is still in a rdv */
858 /* so remove from it and delete it */
859 if (action->state == SIMIX_WAITING) {
860 SIMIX_rdv_remove(action->comm.rdv, action);
861 action->state = SIMIX_CANCELED;
863 else if (!MC_IS_ENABLED /* when running the MC there are no surf actions */
864 && (action->state == SIMIX_READY || action->state == SIMIX_RUNNING)) {
866 surf_workstation_model->action_cancel(action->comm.surf_comm);
/* Suspend the surf transfer of a comm, if it already exists. */
870 void SIMIX_comm_suspend(smx_action_t action)
872 /*FIXME: shall we suspend also the timeout actions? */
873 if (action->comm.surf_comm)
874 surf_workstation_model->suspend(action->comm.surf_comm);
875 /* in the other case, the action will be suspended on creation, in SIMIX_comm_start() */
/* Resume the surf transfer of a comm, if it already exists. */
878 void SIMIX_comm_resume(smx_action_t action)
880 /*FIXME: check what happen with the timeouts */
881 if (action->comm.surf_comm)
882 surf_workstation_model->resume(action->comm.surf_comm);
883 /* in the other case, the action were not really suspended yet, see SIMIX_comm_suspend() and SIMIX_comm_start() */
887 /************* Action Getters **************/
/* Remaining amount of a comm: queried from surf for the state(s) that own a
 * surf transfer (presumably RUNNING), and 0 in the other visible branches.
 * NOTE(review): the `remains` declaration, the case labels and the return
 * statement are not visible in this listing. */
890 * \brief get the amount remaining from the communication
891 * \param action The communication
893 double SIMIX_comm_get_remains(smx_action_t action)
901 switch (action->state) {
904 remains = surf_workstation_model->get_remains(action->comm.surf_comm);
909 remains = 0; /*FIXME: check what should be returned */
913 remains = 0; /*FIXME: is this correct? */
/* Current SIMIX state of the communication. */
919 e_smx_state_t SIMIX_comm_get_state(smx_action_t action)
921 return action->state;
/* Returns the `data` pointer given to SIMIX_comm_isend. */
925 * \brief Return the user data associated to the sender of the communication
926 * \param action The communication
927 * \return the user data
929 void* SIMIX_comm_get_src_data(smx_action_t action)
931 return action->comm.src_data;
/* Returns the `data` pointer given to SIMIX_comm_irecv. */
935 * \brief Return the user data associated to the receiver of the communication
936 * \param action The communication
937 * \return the user data
939 void* SIMIX_comm_get_dst_data(smx_action_t action)
941 return action->comm.dst_data;
/* Sending process of the communication (as recorded in comm.src_proc). */
944 smx_process_t SIMIX_comm_get_src_proc(smx_action_t action)
946 return action->comm.src_proc;
/* Receiving process of the communication (as recorded in comm.dst_proc). */
949 smx_process_t SIMIX_comm_get_dst_proc(smx_action_t action)
951 return action->comm.dst_proc;
/* Refresh the cached latency-bounded flag from the surf transfer while one
 * exists, then return the cached value (-1, "unknown", as set by
 * SIMIX_comm_new, if never refreshed).
 * NOTE(review): the matching #endif and a few body lines are missing from
 * this listing. */
954 #ifdef HAVE_LATENCY_BOUND_TRACKING
956 * \brief verify if communication is latency bounded
957 * \param comm The communication
959 XBT_INLINE int SIMIX_comm_is_latency_bounded(smx_action_t action)
964 if (action->comm.surf_comm){
965 XBT_DEBUG("Getting latency limited for surf_action (%p)", action->comm.surf_comm);
966 action->latency_limited = surf_workstation_model->get_latency_limited(action->comm.surf_comm);
967 XBT_DEBUG("Action limited is %d", action->latency_limited);
969 return action->latency_limited;
973 /******************************************************************************/
974 /* SIMIX_comm_copy_data callbacks */
975 /******************************************************************************/
/* Copy strategy invoked by SIMIX_comm_copy_data. It defaults to passing the
 * payload pointer itself (SIMIX_comm_copy_pointer_callback). */
976 static void (*SIMIX_comm_copy_data_callback) (smx_action_t, void*, size_t) =
977 &SIMIX_comm_copy_pointer_callback;
/* Install the copy strategy used by SIMIX_comm_copy_data, for instance
 * SIMIX_comm_copy_buffer_callback for byte copies.
 * NOTE(review): the return-type line is missing from this listing. */
980 SIMIX_comm_set_copy_data_callback(void (*callback) (smx_action_t, void*, size_t))
982 SIMIX_comm_copy_data_callback = callback;
/* Pointer-passing copy: stores the sender's `buff` pointer into the slot that
 * dst_buff points to. The payload size must be exactly sizeof(void *). */
985 void SIMIX_comm_copy_pointer_callback(smx_action_t comm, void* buff, size_t buff_size)
987 xbt_assert((buff_size == sizeof(void *)),
988 "Cannot copy %zu bytes: must be sizeof(void*)", buff_size);
989 *(void **) (comm->comm.dst_buff) = buff;
/* Byte copy: copies buff_size bytes from `buff` into dst_buff. For detached
 * sends the source buffer is a private duplicate (per the inline comment) and
 * src_buff is reset afterwards.
 * NOTE(review): the line releasing that duplicate is not visible in this listing. */
992 void SIMIX_comm_copy_buffer_callback(smx_action_t comm, void* buff, size_t buff_size)
994 XBT_DEBUG("Copy the data over");
995 memcpy(comm->comm.dst_buff, buff, buff_size);
996 if (comm->comm.detached) { // if this is a detached send, the source buffer was duplicated by SMPI sender to make the original buffer available to the application ASAP
998 comm->comm.src_buff = NULL;
/* Deliver the payload once per comm. It does nothing when either buffer is
 * missing or the data was already copied (the early return is on a missing
 * line). At most *dst_buff_size bytes are copied, the copied size is written
 * back to *dst_buff_size, the installed copy callback is invoked, and the
 * comm is flagged as copied.
 * NOTE(review): the debug format below has six conversions but only five
 * arguments are visible (the `comm,` line is missing from this listing). */
1004 * \brief Copy the communication data from the sender's buffer to the receiver's one
1005 * \param comm The communication
1007 void SIMIX_comm_copy_data(smx_action_t comm)
1009 size_t buff_size = comm->comm.src_buff_size;
1010 /* If there is no data to be copy then return */
1011 if (!comm->comm.src_buff || !comm->comm.dst_buff || comm->comm.copied)
1014 XBT_DEBUG("Copying comm %p data from %s (%p) -> %s (%p) (%zu bytes)",
1016 comm->comm.src_proc ? comm->comm.src_proc->smx_host->name : "a finished process",
1017 comm->comm.src_buff,
1018 comm->comm.dst_proc ? comm->comm.dst_proc->smx_host->name : "a finished process",
1019 comm->comm.dst_buff, buff_size);
1021 /* Copy at most dst_buff_size bytes of the message to receiver's buffer */
1022 if (comm->comm.dst_buff_size)
1023 buff_size = MIN(buff_size, *(comm->comm.dst_buff_size));
1025 /* Update the receiver's buffer size to the copied amount */
1026 if (comm->comm.dst_buff_size)
1027 *comm->comm.dst_buff_size = buff_size;
1030 SIMIX_comm_copy_data_callback (comm, comm->comm.src_buff, buff_size);
1032 /* Set the copied flag so we copy data only once */
1033 /* (this function might be called from both communication ends) */
1034 comm->comm.copied = 1;