/* Copyright (c) 2009, 2010. The SimGrid Team.
 * All rights reserved. */

/* This program is free software; you can redistribute it and/or modify it
 * under the terms of the license (GNU LGPL) which comes with this package. */
#include "smx_private.h"
#include "xbt/log.h"
#include "mc/mc.h"
12 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(simix_network, simix,
13 "Logging specific to SIMIX (network)");
15 static xbt_dict_t rdv_points = NULL;
16 unsigned long int smx_total_comms = 0;
18 static void SIMIX_waitany_remove_simcall_from_actions(smx_simcall_t simcall);
19 static void SIMIX_comm_copy_data(smx_action_t comm);
20 static smx_action_t SIMIX_comm_new(e_smx_comm_type_t type);
21 static XBT_INLINE void SIMIX_rdv_push(smx_rdv_t rdv, smx_action_t comm);
22 static smx_action_t SIMIX_fifo_get_comm(xbt_fifo_t fifo, e_smx_comm_type_t type,
23 int (*match_fun)(void *, void *,smx_action_t),
24 void *user_data, smx_action_t my_action);
25 static void SIMIX_rdv_free(void *data);
27 void SIMIX_network_init(void)
29 rdv_points = xbt_dict_new_homogeneous(SIMIX_rdv_free);
32 void SIMIX_network_exit(void)
34 xbt_dict_free(&rdv_points);
37 /******************************************************************************/
38 /* Rendez-Vous Points */
39 /******************************************************************************/
41 smx_rdv_t SIMIX_rdv_create(const char *name)
43 /* two processes may have pushed the same rdv_create simcall at the same time */
44 smx_rdv_t rdv = name ? xbt_dict_get_or_null(rdv_points, name) : NULL;
47 rdv = xbt_new0(s_smx_rvpoint_t, 1);
48 rdv->name = name ? xbt_strdup(name) : NULL;
49 rdv->comm_fifo = xbt_fifo_new();
50 rdv->done_comm_fifo = xbt_fifo_new();
51 rdv->permanent_receiver=NULL;
53 XBT_DEBUG("Creating a mailbox at %p with name %s\n", rdv, name);
56 xbt_dict_set(rdv_points, rdv->name, rdv, NULL);
61 void SIMIX_rdv_destroy(smx_rdv_t rdv)
64 xbt_dict_remove(rdv_points, rdv->name);
67 void SIMIX_rdv_free(void *data)
69 XBT_DEBUG("rdv free %p", data);
70 smx_rdv_t rdv = (smx_rdv_t) data;
72 xbt_fifo_free(rdv->comm_fifo);
73 xbt_fifo_free(rdv->done_comm_fifo);
78 xbt_dict_t SIMIX_get_rdv_points()
83 smx_rdv_t SIMIX_rdv_get_by_name(const char *name)
85 return xbt_dict_get_or_null(rdv_points, name);
88 int SIMIX_rdv_comm_count_by_host(smx_rdv_t rdv, smx_host_t host)
90 smx_action_t comm = NULL;
91 xbt_fifo_item_t item = NULL;
94 xbt_fifo_foreach(rdv->comm_fifo, item, comm, smx_action_t) {
95 if (comm->comm.src_proc->smx_host == host)
102 smx_action_t SIMIX_rdv_get_head(smx_rdv_t rdv)
104 return xbt_fifo_get_item_content(xbt_fifo_get_first_item(rdv->comm_fifo));
108 * \brief get the receiver (process associated to the mailbox)
109 * \param rdv The rendez-vous point
110 * \return process The receiving process (NULL if not set)
112 smx_process_t SIMIX_rdv_get_receiver(smx_rdv_t rdv)
114 return rdv->permanent_receiver;
118 * \brief set the receiver of the rendez vous point to allow eager sends
119 * \param rdv The rendez-vous point
120 * \param process The receiving process
122 void SIMIX_rdv_set_receiver(smx_rdv_t rdv , smx_process_t process)
124 rdv->permanent_receiver=process;
128 * \brief Pushes a communication action into a rendez-vous point
129 * \param rdv The rendez-vous point
130 * \param comm The communication action
132 static XBT_INLINE void SIMIX_rdv_push(smx_rdv_t rdv, smx_action_t comm)
134 xbt_fifo_push(rdv->comm_fifo, comm);
135 comm->comm.rdv = rdv;
139 * \brief Removes a communication action from a rendez-vous point
140 * \param rdv The rendez-vous point
141 * \param comm The communication action
143 XBT_INLINE void SIMIX_rdv_remove(smx_rdv_t rdv, smx_action_t comm)
145 xbt_fifo_remove(rdv->comm_fifo, comm);
146 comm->comm.rdv = NULL;
150 * \brief Checks if there is a communication action queued in a fifo matching our needs
151 * \param type The type of communication we are looking for (comm_send, comm_recv)
152 * \return The communication action if found, NULL otherwise
154 smx_action_t SIMIX_fifo_get_comm(xbt_fifo_t fifo, e_smx_comm_type_t type,
155 int (*match_fun)(void *, void *,smx_action_t),
156 void *this_user_data, smx_action_t my_action)
159 xbt_fifo_item_t item;
160 void* other_user_data = NULL;
162 xbt_fifo_foreach(fifo, item, action, smx_action_t) {
163 if (action->comm.type == SIMIX_COMM_SEND) {
164 other_user_data = action->comm.src_data;
165 } else if (action->comm.type == SIMIX_COMM_RECEIVE) {
166 other_user_data = action->comm.dst_data;
168 if (action->comm.type == type &&
169 (!match_fun || match_fun(this_user_data, other_user_data, action)) &&
170 (!action->comm.match_fun || action->comm.match_fun(other_user_data, this_user_data, my_action))) {
171 XBT_DEBUG("Found a matching communication action %p", action);
172 xbt_fifo_remove_item(fifo, item);
173 xbt_fifo_free_item(item);
174 action->comm.refcount++;
175 action->comm.rdv = NULL;
178 XBT_DEBUG("Sorry, communication action %p does not match our needs:"
179 " its type is %d but we are looking for a comm of type %d (or maybe the filtering didn't match)",
180 action, (int)action->comm.type, (int)type);
182 XBT_DEBUG("No matching communication action found");
188 * \brief Checks if there is a communication action queued in a fifo matching our needs, but leave it there
189 * \param type The type of communication we are looking for (comm_send, comm_recv)
190 * \return The communication action if found, NULL otherwise
192 smx_action_t SIMIX_fifo_probe_comm(xbt_fifo_t fifo, e_smx_comm_type_t type,
193 int (*match_fun)(void *, void *,smx_action_t),
194 void *this_user_data, smx_action_t my_action)
197 xbt_fifo_item_t item;
198 void* other_user_data = NULL;
200 xbt_fifo_foreach(fifo, item, action, smx_action_t) {
201 if (action->comm.type == SIMIX_COMM_SEND) {
202 other_user_data = action->comm.src_data;
203 } else if (action->comm.type == SIMIX_COMM_RECEIVE) {
204 other_user_data = action->comm.dst_data;
206 if (action->comm.type == type &&
207 (!match_fun || match_fun(this_user_data, other_user_data, action)) &&
208 (!action->comm.match_fun || action->comm.match_fun(other_user_data, this_user_data, my_action))) {
209 XBT_DEBUG("Found a matching communication action %p", action);
210 action->comm.refcount++;
214 XBT_DEBUG("Sorry, communication action %p does not match our needs:"
215 " its type is %d but we are looking for a comm of type %d (or maybe the filtering didn't match)",
216 action, (int)action->comm.type, (int)type);
218 XBT_DEBUG("No matching communication action found");
221 /******************************************************************************/
222 /* Communication Actions */
223 /******************************************************************************/
226 * \brief Creates a new communicate action
227 * \param type The direction of communication (comm_send, comm_recv)
228 * \return The new communicate action
230 smx_action_t SIMIX_comm_new(e_smx_comm_type_t type)
234 /* alloc structures */
235 act = xbt_mallocator_get(simix_global->action_mallocator);
237 act->type = SIMIX_ACTION_COMMUNICATE;
238 act->state = SIMIX_WAITING;
240 /* set communication */
241 act->comm.type = type;
242 act->comm.refcount = 1;
244 #ifdef HAVE_LATENCY_BOUND_TRACKING
245 //initialize with unknown value
246 act->latency_limited = -1;
250 act->category = NULL;
253 XBT_DEBUG("Create communicate action %p", act);
260 * \brief Destroy a communicate action
261 * \param action The communicate action to be destroyed
263 void SIMIX_comm_destroy(smx_action_t action)
265 XBT_DEBUG("Destroy action %p (refcount: %d), state: %d",
266 action, action->comm.refcount, (int)action->state);
268 if (action->comm.refcount <= 0) {
269 xbt_backtrace_display_current();
270 xbt_die("The refcount of comm %p is already 0 before decreasing it. "
271 "That's a bug! If you didn't test and/or wait the same communication twice in your code, then the bug is SimGrid's...", action);
273 action->comm.refcount--;
274 if (action->comm.refcount > 0)
276 XBT_DEBUG("Really free communication %p; refcount is now %d", action,
277 action->comm.refcount);
279 #ifdef HAVE_LATENCY_BOUND_TRACKING
280 action->latency_limited = SIMIX_comm_is_latency_bounded( action ) ;
283 xbt_free(action->name);
284 SIMIX_comm_destroy_internal_actions(action);
286 if (action->comm.detached && action->state != SIMIX_DONE) {
287 /* the communication has failed and was detached:
288 * we have to free the buffer */
289 if (action->comm.clean_fun) {
290 action->comm.clean_fun(action->comm.src_buff);
292 action->comm.src_buff = NULL;
295 xbt_mallocator_release(simix_global->action_mallocator, action);
298 void SIMIX_comm_destroy_internal_actions(smx_action_t action)
300 if (action->comm.surf_comm){
301 #ifdef HAVE_LATENCY_BOUND_TRACKING
302 action->latency_limited = SIMIX_comm_is_latency_bounded(action);
304 action->comm.surf_comm->model_type->action_unref(action->comm.surf_comm);
305 action->comm.surf_comm = NULL;
308 if (action->comm.src_timeout){
309 action->comm.src_timeout->model_type->action_unref(action->comm.src_timeout);
310 action->comm.src_timeout = NULL;
313 if (action->comm.dst_timeout){
314 action->comm.dst_timeout->model_type->action_unref(action->comm.dst_timeout);
315 action->comm.dst_timeout = NULL;
319 smx_action_t SIMIX_comm_isend(smx_process_t src_proc, smx_rdv_t rdv,
320 double task_size, double rate,
321 void *src_buff, size_t src_buff_size,
322 int (*match_fun)(void *, void *,smx_action_t),
323 void (*clean_fun)(void *), // used to free the action in case of problem after a detached send
327 XBT_DEBUG("send from %p\n", rdv);
329 /* Prepare an action describing us, so that it gets passed to the user-provided filter of other side */
330 smx_action_t this_action = SIMIX_comm_new(SIMIX_COMM_SEND);
332 /* Look for communication action matching our needs. We also provide a description of
333 * ourself so that the other side also gets a chance of choosing if it wants to match with us.
335 * If it is not found then push our communication into the rendez-vous point */
336 smx_action_t other_action = SIMIX_fifo_get_comm(rdv->comm_fifo, SIMIX_COMM_RECEIVE, match_fun, data, this_action);
339 other_action = this_action;
341 if (rdv->permanent_receiver!=NULL){
342 //this mailbox is for small messages, which have to be sent right now
343 other_action->state = SIMIX_READY;
344 other_action->comm.dst_proc=rdv->permanent_receiver;
345 other_action->comm.refcount++;
346 other_action->comm.rdv = rdv;
347 xbt_fifo_push(rdv->done_comm_fifo,other_action);
348 other_action->comm.rdv=rdv;
349 XBT_DEBUG("pushing a message into the permanent receive fifo %p, comm %p \n", rdv, &(other_action->comm));
352 SIMIX_rdv_push(rdv, this_action);
355 XBT_DEBUG("Receive already pushed\n");
357 SIMIX_comm_destroy(this_action);
358 --smx_total_comms; // this creation was a pure waste
360 other_action->state = SIMIX_READY;
361 other_action->comm.type = SIMIX_COMM_READY;
364 xbt_fifo_push(src_proc->comms, other_action);
366 /* if the communication action is detached then decrease the refcount
367 * by one, so it will be eliminated by the receiver's destroy call */
369 other_action->comm.detached = 1;
370 other_action->comm.refcount--;
371 other_action->comm.clean_fun = clean_fun;
373 other_action->comm.clean_fun = NULL;
376 /* Setup the communication action */
377 other_action->comm.src_proc = src_proc;
378 other_action->comm.task_size = task_size;
379 other_action->comm.rate = rate;
380 other_action->comm.src_buff = src_buff;
381 other_action->comm.src_buff_size = src_buff_size;
382 other_action->comm.src_data = data;
384 other_action->comm.match_fun = match_fun;
387 other_action->state = SIMIX_RUNNING;
391 SIMIX_comm_start(other_action);
392 return (detached ? NULL : other_action);
395 smx_action_t SIMIX_comm_irecv(smx_process_t dst_proc, smx_rdv_t rdv,
396 void *dst_buff, size_t *dst_buff_size,
397 int (*match_fun)(void *, void *, smx_action_t), void *data)
399 XBT_DEBUG("recv from %p %p\n", rdv, rdv->comm_fifo);
400 smx_action_t this_action = SIMIX_comm_new(SIMIX_COMM_RECEIVE);
402 smx_action_t other_action;
403 //communication already done, get it inside the fifo of completed comms
404 //permanent receive v1
405 //int already_received=0;
406 if(rdv->permanent_receiver && xbt_fifo_size(rdv->done_comm_fifo)!=0){
408 XBT_DEBUG("We have a comm that has probably already been received, trying to match it, to skip the communication\n");
409 //find a match in the already received fifo
410 other_action = SIMIX_fifo_get_comm(rdv->done_comm_fifo, SIMIX_COMM_SEND, match_fun, data, this_action);
411 //if not found, assume the receiver came first, register it to the mailbox in the classical way
413 XBT_DEBUG("We have messages in the permanent receive list, but not the one we are looking for, pushing request into fifo\n");
414 other_action = this_action;
415 SIMIX_rdv_push(rdv, this_action);
417 if(other_action->comm.surf_comm && SIMIX_comm_get_remains(other_action)==0.0)
419 XBT_DEBUG("comm %p has been already sent, and is finished, destroy it\n",&(other_action->comm));
420 other_action->state = SIMIX_DONE;
421 other_action->comm.type = SIMIX_COMM_DONE;
422 other_action->comm.rdv = NULL;
423 //SIMIX_comm_destroy(this_action);
424 //--smx_total_comms; // this creation was a pure waste
425 //already_received=1;
426 //other_action->comm.refcount--;
428 XBT_DEBUG("Not yet finished, we have to wait %d\n", xbt_fifo_size(rdv->comm_fifo));
430 // other_action->comm.refcount--;
431 SIMIX_comm_destroy(this_action);
432 --smx_total_comms; // this creation was a pure waste
435 /* Prepare an action describing us, so that it gets passed to the user-provided filter of other side */
437 /* Look for communication action matching our needs. We also provide a description of
438 * ourself so that the other side also gets a chance of choosing if it wants to match with us.
440 * If it is not found then push our communication into the rendez-vous point */
441 other_action = SIMIX_fifo_get_comm(rdv->comm_fifo, SIMIX_COMM_SEND, match_fun, data, this_action);
444 XBT_DEBUG("Receive pushed first %d\n", xbt_fifo_size(rdv->comm_fifo));
445 other_action = this_action;
446 SIMIX_rdv_push(rdv, this_action);
448 SIMIX_comm_destroy(this_action);
449 --smx_total_comms; // this creation was a pure waste
450 other_action->state = SIMIX_READY;
451 other_action->comm.type = SIMIX_COMM_READY;
452 // other_action->comm.refcount--;
454 xbt_fifo_push(dst_proc->comms, other_action);
457 /* Setup communication action */
458 other_action->comm.dst_proc = dst_proc;
459 other_action->comm.dst_buff = dst_buff;
460 other_action->comm.dst_buff_size = dst_buff_size;
461 other_action->comm.dst_data = data;
463 other_action->comm.match_fun = match_fun;
466 /*if(already_received)//do the actual copy, because the first one after the comm didn't have all the info
467 SIMIX_comm_copy_data(other_action);*/
471 other_action->state = SIMIX_RUNNING;
475 SIMIX_comm_start(other_action);
481 smx_action_t SIMIX_comm_iprobe(smx_process_t dst_proc, smx_rdv_t rdv, int src,
482 int tag, int (*match_fun)(void *, void *, smx_action_t), void *data)
484 XBT_DEBUG("iprobe from %p %p\n", rdv, rdv->comm_fifo);
485 smx_action_t this_action = SIMIX_comm_new(SIMIX_COMM_RECEIVE);
487 smx_action_t other_action;
488 if(rdv->permanent_receiver && xbt_fifo_size(rdv->done_comm_fifo)!=0){
489 //find a match in the already received fifo
490 other_action = SIMIX_fifo_probe_comm(rdv->done_comm_fifo, SIMIX_COMM_SEND, match_fun, data, this_action);
492 other_action = SIMIX_fifo_probe_comm(rdv->comm_fifo, SIMIX_COMM_SEND, match_fun, data, this_action);
494 if(other_action)other_action->comm.refcount--;
496 SIMIX_comm_destroy(this_action);
501 void SIMIX_pre_comm_wait(smx_simcall_t simcall, smx_action_t action, double timeout, int idx)
504 /* the simcall may be a wait, a send or a recv */
507 /* Associate this simcall to the wait action */
508 XBT_DEBUG("SIMIX_pre_comm_wait, %p", action);
510 xbt_fifo_push(action->simcalls, simcall);
511 simcall->issuer->waiting_action = action;
515 action->state = SIMIX_DONE;
517 /* If we reached this point, the wait simcall must have a timeout */
518 /* Otherwise it shouldn't be enabled and executed by the MC */
522 if (action->comm.src_proc == simcall->issuer)
523 action->state = SIMIX_SRC_TIMEOUT;
525 action->state = SIMIX_DST_TIMEOUT;
528 SIMIX_comm_finish(action);
532 /* If the action has already finish perform the error handling, */
533 /* otherwise set up a waiting timeout on the right side */
534 if (action->state != SIMIX_WAITING && action->state != SIMIX_RUNNING) {
535 SIMIX_comm_finish(action);
536 } else { /* if (timeout >= 0) { we need a surf sleep action even when there is no timeout, otherwise surf won't tell us when the host fails */
537 sleep = surf_workstation_model->extension.workstation.sleep(simcall->issuer->smx_host->host, timeout);
538 surf_workstation_model->action_data_set(sleep, action);
540 if (simcall->issuer == action->comm.src_proc)
541 action->comm.src_timeout = sleep;
543 action->comm.dst_timeout = sleep;
547 void SIMIX_pre_comm_test(smx_simcall_t simcall)
549 smx_action_t action = simcall->comm_test.comm;
552 simcall->comm_test.result = action->comm.src_proc && action->comm.dst_proc;
553 if(simcall->comm_test.result){
554 action->state = SIMIX_DONE;
555 xbt_fifo_push(action->simcalls, simcall);
556 SIMIX_comm_finish(action);
558 SIMIX_simcall_answer(simcall);
563 simcall->comm_test.result = (action->state != SIMIX_WAITING && action->state != SIMIX_RUNNING);
564 if (simcall->comm_test.result) {
565 xbt_fifo_push(action->simcalls, simcall);
566 SIMIX_comm_finish(action);
568 SIMIX_simcall_answer(simcall);
572 void SIMIX_pre_comm_testany(smx_simcall_t simcall, int idx)
576 xbt_dynar_t actions = simcall->comm_testany.comms;
577 simcall->comm_testany.result = -1;
581 SIMIX_simcall_answer(simcall);
583 action = xbt_dynar_get_as(actions, idx, smx_action_t);
584 simcall->comm_testany.result = idx;
585 xbt_fifo_push(action->simcalls, simcall);
586 action->state = SIMIX_DONE;
587 SIMIX_comm_finish(action);
592 xbt_dynar_foreach(simcall->comm_testany.comms,cursor,action) {
593 if (action->state != SIMIX_WAITING && action->state != SIMIX_RUNNING) {
594 simcall->comm_testany.result = cursor;
595 xbt_fifo_push(action->simcalls, simcall);
596 SIMIX_comm_finish(action);
600 SIMIX_simcall_answer(simcall);
603 void SIMIX_pre_comm_waitany(smx_simcall_t simcall, int idx)
606 unsigned int cursor = 0;
607 xbt_dynar_t actions = simcall->comm_waitany.comms;
610 action = xbt_dynar_get_as(actions, idx, smx_action_t);
611 xbt_fifo_push(action->simcalls, simcall);
612 simcall->comm_waitany.result = idx;
613 action->state = SIMIX_DONE;
614 SIMIX_comm_finish(action);
618 xbt_dynar_foreach(actions, cursor, action){
619 /* associate this simcall to the the action */
620 xbt_fifo_push(action->simcalls, simcall);
622 /* see if the action is already finished */
623 if (action->state != SIMIX_WAITING && action->state != SIMIX_RUNNING){
624 SIMIX_comm_finish(action);
630 void SIMIX_waitany_remove_simcall_from_actions(smx_simcall_t simcall)
633 unsigned int cursor = 0;
634 xbt_dynar_t actions = simcall->comm_waitany.comms;
636 xbt_dynar_foreach(actions, cursor, action) {
637 xbt_fifo_remove(action->simcalls, simcall);
642 * \brief Starts the simulation of a communication action.
643 * \param action the communication action
645 XBT_INLINE void SIMIX_comm_start(smx_action_t action)
647 /* If both the sender and the receiver are already there, start the communication */
648 if (action->state == SIMIX_READY) {
650 smx_host_t sender = action->comm.src_proc->smx_host;
651 smx_host_t receiver = action->comm.dst_proc->smx_host;
653 XBT_DEBUG("Starting communication %p from '%s' to '%s'", action,
654 SIMIX_host_get_name(sender), SIMIX_host_get_name(receiver));
656 action->comm.surf_comm = surf_workstation_model->extension.workstation.
657 communicate(sender->host, receiver->host, action->comm.task_size, action->comm.rate);
659 surf_workstation_model->action_data_set(action->comm.surf_comm, action);
661 action->state = SIMIX_RUNNING;
663 /* If a link is failed, detect it immediately */
664 if (surf_workstation_model->action_state_get(action->comm.surf_comm) == SURF_ACTION_FAILED) {
665 XBT_DEBUG("Communication from '%s' to '%s' failed to start because of a link failure",
666 SIMIX_host_get_name(sender), SIMIX_host_get_name(receiver));
667 action->state = SIMIX_LINK_FAILURE;
668 SIMIX_comm_destroy_internal_actions(action);
671 /* If any of the process is suspend, create the action but stop its execution,
672 it will be restarted when the sender process resume */
673 if (SIMIX_process_is_suspended(action->comm.src_proc) ||
674 SIMIX_process_is_suspended(action->comm.dst_proc)) {
675 /* FIXME: check what should happen with the action state */
677 if (SIMIX_process_is_suspended(action->comm.src_proc))
678 XBT_DEBUG("The communication is suspended on startup because src (%s:%s) were suspended since it initiated the communication",
679 SIMIX_host_get_name(action->comm.src_proc->smx_host), action->comm.src_proc->name);
681 XBT_DEBUG("The communication is suspended on startup because dst (%s:%s) were suspended since it initiated the communication",
682 SIMIX_host_get_name(action->comm.dst_proc->smx_host), action->comm.dst_proc->name);
684 surf_workstation_model->suspend(action->comm.surf_comm);
691 * \brief Answers the SIMIX simcalls associated to a communication action.
692 * \param action a finished communication action
694 void SIMIX_comm_finish(smx_action_t action)
696 unsigned int destroy_count = 0;
697 smx_simcall_t simcall;
699 while ((simcall = xbt_fifo_shift(action->simcalls))) {
701 /* If a waitany simcall is waiting for this action to finish, then remove
702 it from the other actions in the waitany list. Afterwards, get the
703 position of the actual action in the waitany dynar and
704 return it as the result of the simcall */
705 if (simcall->call == SIMCALL_COMM_WAITANY) {
706 SIMIX_waitany_remove_simcall_from_actions(simcall);
708 simcall->comm_waitany.result = xbt_dynar_search(simcall->comm_waitany.comms, &action);
711 /* If the action is still in a rendez-vous point then remove from it */
712 if (action->comm.rdv)
713 SIMIX_rdv_remove(action->comm.rdv, action);
715 XBT_DEBUG("SIMIX_comm_finish: action state = %d", (int)action->state);
717 /* Check out for errors */
718 switch (action->state) {
721 XBT_DEBUG("Communication %p complete!", action);
722 SIMIX_comm_copy_data(action);
725 case SIMIX_SRC_TIMEOUT:
726 SMX_EXCEPTION(simcall->issuer, timeout_error, 0,
727 "Communication timeouted because of sender");
730 case SIMIX_DST_TIMEOUT:
731 SMX_EXCEPTION(simcall->issuer, timeout_error, 0,
732 "Communication timeouted because of receiver");
735 case SIMIX_SRC_HOST_FAILURE:
736 if (simcall->issuer == action->comm.src_proc)
737 simcall->issuer->context->iwannadie = 1;
738 // SMX_EXCEPTION(simcall->issuer, host_error, 0, "Host failed");
740 SMX_EXCEPTION(simcall->issuer, network_error, 0, "Remote peer failed");
743 case SIMIX_DST_HOST_FAILURE:
744 if (simcall->issuer == action->comm.dst_proc)
745 simcall->issuer->context->iwannadie = 1;
746 // SMX_EXCEPTION(simcall->issuer, host_error, 0, "Host failed");
748 SMX_EXCEPTION(simcall->issuer, network_error, 0, "Remote peer failed");
751 case SIMIX_LINK_FAILURE:
752 XBT_DEBUG("Link failure in action %p between '%s' and '%s': posting an exception to the issuer: %s (%p) detached:%d",
754 action->comm.src_proc ? action->comm.src_proc->smx_host->name : NULL,
755 action->comm.dst_proc ? action->comm.dst_proc->smx_host->name : NULL,
756 simcall->issuer->name, simcall->issuer, action->comm.detached);
757 if (action->comm.src_proc == simcall->issuer) {
758 XBT_DEBUG("I'm source");
759 } else if (action->comm.dst_proc == simcall->issuer) {
760 XBT_DEBUG("I'm dest");
762 XBT_DEBUG("I'm neither source nor dest");
764 SMX_EXCEPTION(simcall->issuer, network_error, 0, "Link failure");
768 if (simcall->issuer == action->comm.dst_proc)
769 SMX_EXCEPTION(simcall->issuer, cancel_error, 0,
770 "Communication canceled by the sender");
772 SMX_EXCEPTION(simcall->issuer, cancel_error, 0,
773 "Communication canceled by the receiver");
777 xbt_die("Unexpected action state in SIMIX_comm_finish: %d", (int)action->state);
780 /* if there is an exception during a waitany or a testany, indicate the position of the failed communication */
781 if (simcall->issuer->doexception) {
782 if (simcall->call == SIMCALL_COMM_WAITANY) {
783 simcall->issuer->running_ctx->exception.value = xbt_dynar_search(simcall->comm_waitany.comms, &action);
785 else if (simcall->call == SIMCALL_COMM_TESTANY) {
786 simcall->issuer->running_ctx->exception.value = xbt_dynar_search(simcall->comm_testany.comms, &action);
790 if (surf_workstation_model->extension.
791 workstation.get_state(simcall->issuer->smx_host->host) != SURF_RESOURCE_ON) {
792 simcall->issuer->context->iwannadie = 1;
795 simcall->issuer->waiting_action = NULL;
796 xbt_fifo_remove(simcall->issuer->comms, action);
797 SIMIX_simcall_answer(simcall);
801 while (destroy_count-- > 0)
802 SIMIX_comm_destroy(action);
806 * \brief This function is called when a Surf communication action is finished.
807 * \param action the corresponding Simix communication
809 void SIMIX_post_comm(smx_action_t action)
811 /* Update action state */
812 if (action->comm.src_timeout &&
813 surf_workstation_model->action_state_get(action->comm.src_timeout) == SURF_ACTION_DONE)
814 action->state = SIMIX_SRC_TIMEOUT;
815 else if (action->comm.dst_timeout &&
816 surf_workstation_model->action_state_get(action->comm.dst_timeout) == SURF_ACTION_DONE)
817 action->state = SIMIX_DST_TIMEOUT;
818 else if (action->comm.src_timeout &&
819 surf_workstation_model->action_state_get(action->comm.src_timeout) == SURF_ACTION_FAILED)
820 action->state = SIMIX_SRC_HOST_FAILURE;
821 else if (action->comm.dst_timeout &&
822 surf_workstation_model->action_state_get(action->comm.dst_timeout) == SURF_ACTION_FAILED)
823 action->state = SIMIX_DST_HOST_FAILURE;
824 else if (action->comm.surf_comm &&
825 surf_workstation_model->action_state_get(action->comm.surf_comm) == SURF_ACTION_FAILED) {
826 XBT_DEBUG("Puta madre. Surf says that the link broke");
827 action->state = SIMIX_LINK_FAILURE;
829 action->state = SIMIX_DONE;
831 XBT_DEBUG("SIMIX_post_comm: comm %p, state %d, src_proc %p, dst_proc %p, detached: %d",
832 action, (int)action->state, action->comm.src_proc, action->comm.dst_proc, action->comm.detached);
834 /* destroy the surf actions associated with the Simix communication */
835 SIMIX_comm_destroy_internal_actions(action);
837 /* remove the communication action from the list of pending communications
838 * of both processes (if they still exist) */
839 if (action->comm.src_proc) {
840 xbt_fifo_remove(action->comm.src_proc->comms, action);
842 if (action->comm.dst_proc) {
843 xbt_fifo_remove(action->comm.dst_proc->comms, action);
846 /* if there are simcalls associated with the action, then answer them */
847 if (xbt_fifo_size(action->simcalls)) {
848 SIMIX_comm_finish(action);
852 void SIMIX_comm_cancel(smx_action_t action)
854 /* if the action is a waiting state means that it is still in a rdv */
855 /* so remove from it and delete it */
856 if (action->state == SIMIX_WAITING) {
857 SIMIX_rdv_remove(action->comm.rdv, action);
858 action->state = SIMIX_CANCELED;
860 else if (!MC_IS_ENABLED /* when running the MC there are no surf actions */
861 && (action->state == SIMIX_READY || action->state == SIMIX_RUNNING)) {
863 surf_workstation_model->action_cancel(action->comm.surf_comm);
867 void SIMIX_comm_suspend(smx_action_t action)
869 /*FIXME: shall we suspend also the timeout actions? */
870 if (action->comm.surf_comm)
871 surf_workstation_model->suspend(action->comm.surf_comm);
872 /* in the other case, the action will be suspended on creation, in SIMIX_comm_start() */
875 void SIMIX_comm_resume(smx_action_t action)
877 /*FIXME: check what happen with the timeouts */
878 if (action->comm.surf_comm)
879 surf_workstation_model->resume(action->comm.surf_comm);
880 /* in the other case, the action were not really suspended yet, see SIMIX_comm_suspend() and SIMIX_comm_start() */
884 /************* Action Getters **************/
887 * \brief get the amount remaining from the communication
888 * \param action The communication
890 double SIMIX_comm_get_remains(smx_action_t action)
898 switch (action->state) {
901 remains = surf_workstation_model->get_remains(action->comm.surf_comm);
906 remains = 0; /*FIXME: check what should be returned */
910 remains = 0; /*FIXME: is this correct? */
916 e_smx_state_t SIMIX_comm_get_state(smx_action_t action)
918 return action->state;
922 * \brief Return the user data associated to the sender of the communication
923 * \param action The communication
924 * \return the user data
926 void* SIMIX_comm_get_src_data(smx_action_t action)
928 return action->comm.src_data;
932 * \brief Return the user data associated to the receiver of the communication
933 * \param action The communication
934 * \return the user data
936 void* SIMIX_comm_get_dst_data(smx_action_t action)
938 return action->comm.dst_data;
941 smx_process_t SIMIX_comm_get_src_proc(smx_action_t action)
943 return action->comm.src_proc;
946 smx_process_t SIMIX_comm_get_dst_proc(smx_action_t action)
948 return action->comm.dst_proc;
#ifdef HAVE_LATENCY_BOUND_TRACKING
/**
 * \brief verify if communication is latency bounded
 *
 * While a surf transfer exists, the flag is refreshed from the network model.
 * Afterwards, the last cached value is returned (-1 meaning "unknown", as set
 * by SIMIX_comm_new()). A NULL action yields 0.
 * \param action The communication
 */
XBT_INLINE int SIMIX_comm_is_latency_bounded(smx_action_t action)
{
  if (!action)
    return 0;

  if (action->comm.surf_comm) {
    XBT_DEBUG("Getting latency limited for surf_action (%p)", action->comm.surf_comm);
    action->latency_limited = surf_workstation_model->get_latency_limited(action->comm.surf_comm);
    XBT_DEBUG("Action limited is %d", action->latency_limited);
  }
  return action->latency_limited;
}
#endif
970 /******************************************************************************/
971 /* SIMIX_comm_copy_data callbacks */
972 /******************************************************************************/
973 static void (*SIMIX_comm_copy_data_callback) (smx_action_t, void*, size_t) =
974 &SIMIX_comm_copy_pointer_callback;
977 SIMIX_comm_set_copy_data_callback(void (*callback) (smx_action_t, void*, size_t))
979 SIMIX_comm_copy_data_callback = callback;
982 void SIMIX_comm_copy_pointer_callback(smx_action_t comm, void* buff, size_t buff_size)
984 xbt_assert((buff_size == sizeof(void *)),
985 "Cannot copy %zu bytes: must be sizeof(void*)", buff_size);
986 *(void **) (comm->comm.dst_buff) = buff;
989 void SIMIX_comm_copy_buffer_callback(smx_action_t comm, void* buff, size_t buff_size)
991 XBT_DEBUG("Copy the data over");
992 memcpy(comm->comm.dst_buff, buff, buff_size);
993 if (comm->comm.detached) { // if this is a detached send, the source buffer was duplicated by SMPI sender to make the original buffer available to the application ASAP
995 comm->comm.src_buff = NULL;
1001 * \brief Copy the communication data from the sender's buffer to the receiver's one
1002 * \param comm The communication
1004 void SIMIX_comm_copy_data(smx_action_t comm)
1006 size_t buff_size = comm->comm.src_buff_size;
1007 /* If there is no data to be copy then return */
1008 if (!comm->comm.src_buff || !comm->comm.dst_buff || comm->comm.copied)
1011 XBT_DEBUG("Copying comm %p data from %s (%p) -> %s (%p) (%zu bytes)",
1013 comm->comm.src_proc ? comm->comm.src_proc->smx_host->name : "a finished process",
1014 comm->comm.src_buff,
1015 comm->comm.dst_proc ? comm->comm.dst_proc->smx_host->name : "a finished process",
1016 comm->comm.dst_buff, buff_size);
1018 /* Copy at most dst_buff_size bytes of the message to receiver's buffer */
1019 if (comm->comm.dst_buff_size)
1020 buff_size = MIN(buff_size, *(comm->comm.dst_buff_size));
1022 /* Update the receiver's buffer size to the copied amount */
1023 if (comm->comm.dst_buff_size)
1024 *comm->comm.dst_buff_size = buff_size;
1027 SIMIX_comm_copy_data_callback (comm, comm->comm.src_buff, buff_size);
1029 /* Set the copied flag so we copy data only once */
1030 /* (this function might be called from both communication ends) */
1031 comm->comm.copied = 1;