1 /* Copyright (c) 2009, 2010. The SimGrid Team.
2 * All rights reserved. */
4 /* This program is free software; you can redistribute it and/or modify it
5 * under the terms of the license (GNU LGPL) which comes with this package. */
7 #include "smx_private.h"
/* Log channel used by every XBT_DEBUG in this module. */
12 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(simix_network, simix,
13 "Logging specific to SIMIX (network)");
/* Registry of named rendez-vous points (mailboxes), keyed by rdv->name.
 * Created in SIMIX_network_init with SIMIX_rdv_free as value destructor. */
15 static xbt_dict_t rdv_points = NULL;
/* Global communication counter; isend/irecv decrement it whenever an
 * action they just created turns out to be unnecessary ("pure waste").
 * NOTE(review): the matching increment is not visible in this listing. */
16 unsigned long int smx_total_comms = 0;
/* Forward declarations of file-local helpers defined further down. */
18 static void SIMIX_waitany_remove_simcall_from_actions(smx_simcall_t simcall);
19 static void SIMIX_comm_copy_data(smx_action_t comm);
20 static smx_action_t SIMIX_comm_new(e_smx_comm_type_t type);
21 static XBT_INLINE void SIMIX_rdv_push(smx_rdv_t rdv, smx_action_t comm);
22 static smx_action_t SIMIX_fifo_get_comm(xbt_fifo_t fifo, e_smx_comm_type_t type,
23 int (*match_fun)(void *, void *,smx_action_t),
24 void *user_data, smx_action_t my_action);
25 static void SIMIX_rdv_free(void *data);
27 void SIMIX_network_init(void)
29 rdv_points = xbt_dict_new_homogeneous(SIMIX_rdv_free);
32 void SIMIX_network_exit(void)
34 xbt_dict_free(&rdv_points);
37 /******************************************************************************/
38 /* Rendez-Vous Points */
39 /******************************************************************************/
/*
 * SIMIX_rdv_create: looks up the rendez-vous point called `name` in the
 * rdv_points registry and allocates a fresh one when the lookup misses.
 * `name` may be NULL: no lookup is done and the new point has a NULL name.
 * NOTE(review): the return statement is not visible in this listing;
 * presumably the found or newly created point is returned.
 */
41 smx_rdv_t SIMIX_rdv_create(const char *name)
43 /* two processes may have pushed the same rdv_create simcall at the same time */
44 smx_rdv_t rdv = name ? xbt_dict_get_or_null(rdv_points, name) : NULL;
/* NOTE(review): listing lines 45-46 are missing; a guard such as
 * `if (!rdv) {` is expected here so an existing point is reused - confirm. */
47 rdv = xbt_new0(s_smx_rvpoint_t, 1);
/* The point owns a private copy of its name (also used as registry key). */
48 rdv->name = name ? xbt_strdup(name) : NULL;
/* Communications waiting for a matching peer. */
49 rdv->comm_fifo = xbt_fifo_new();
/* Sends already handed to a permanent receiver (see SIMIX_comm_isend). */
50 rdv->done_comm_fifo = xbt_fifo_new();
/* No permanent receiver until SIMIX_rdv_set_receiver() is called. */
51 rdv->permanent_receiver=NULL;
/* NOTE(review): `name` may be NULL here and is still passed to %s. */
53 XBT_DEBUG("Creating a mailbox at %p with name %s\n", rdv, name);
/* NOTE(review): listing lines 54-55 are missing; because rdv->name may be
 * NULL, a name check before registering is expected here - confirm. */
56 xbt_dict_set(rdv_points, rdv->name, rdv, NULL);
/*
 * SIMIX_rdv_destroy: unregisters a rendez-vous point from rdv_points.
 * The registry was created with SIMIX_rdv_free as its value destructor,
 * so presumably the point itself is released by that callback - confirm.
 */
61 void SIMIX_rdv_destroy(smx_rdv_t rdv)
/* NOTE(review): listing lines 62-63 are missing; rdv->name can be NULL
 * (see SIMIX_rdv_create), so a name guard is expected here - confirm. */
64 xbt_dict_remove(rdv_points, rdv->name);
/*
 * SIMIX_rdv_free: value destructor of the rdv_points dictionary (installed
 * in SIMIX_network_init). `data` is the smx_rdv_t being released; its two
 * fifos are freed here.
 */
67 void SIMIX_rdv_free(void *data)
69 XBT_DEBUG("rdv free %p", data);
70 smx_rdv_t rdv = (smx_rdv_t) data;
/* NOTE(review): listing line 71 is missing; the xbt_strdup'ed rdv->name
 * (SIMIX_rdv_create) must be released somewhere here - confirm. */
72 xbt_fifo_free(rdv->comm_fifo);
73 xbt_fifo_free(rdv->done_comm_fifo);
/* NOTE(review): listing lines 74-77 are missing; the xbt_new0'ed rdv
 * itself must also be released - confirm. */
78 xbt_dict_t SIMIX_get_rdv_points()
83 smx_rdv_t SIMIX_rdv_get_by_name(const char *name)
85 return xbt_dict_get_or_null(rdv_points, name);
88 int SIMIX_rdv_comm_count_by_host(smx_rdv_t rdv, smx_host_t host)
90 smx_action_t comm = NULL;
91 xbt_fifo_item_t item = NULL;
94 xbt_fifo_foreach(rdv->comm_fifo, item, comm, smx_action_t) {
95 if (comm->comm.src_proc->smx_host == host)
102 smx_action_t SIMIX_rdv_get_head(smx_rdv_t rdv)
104 return xbt_fifo_get_item_content(xbt_fifo_get_first_item(rdv->comm_fifo));
108 * \brief get the receiver (process associated to the mailbox)
109 * \param rdv The rendez-vous point
110 * \return process The receiving process (NULL if not set)
112 smx_process_t SIMIX_rdv_get_receiver(smx_rdv_t rdv)
114 return rdv->permanent_receiver;
118 * \brief set the receiver of the rendez vous point to allow eager sends
119 * \param rdv The rendez-vous point
120 * \param process The receiving process
122 void SIMIX_rdv_set_receiver(smx_rdv_t rdv , smx_process_t process)
124 rdv->permanent_receiver=process;
128 * \brief Pushes a communication action into a rendez-vous point
129 * \param rdv The rendez-vous point
130 * \param comm The communication action
132 static XBT_INLINE void SIMIX_rdv_push(smx_rdv_t rdv, smx_action_t comm)
134 xbt_fifo_push(rdv->comm_fifo, comm);
135 comm->comm.rdv = rdv;
139 * \brief Removes a communication action from a rendez-vous point
140 * \param rdv The rendez-vous point
141 * \param comm The communication action
143 XBT_INLINE void SIMIX_rdv_remove(smx_rdv_t rdv, smx_action_t comm)
145 xbt_fifo_remove(rdv->comm_fifo, comm);
146 comm->comm.rdv = NULL;
/*
 * SIMIX_fifo_get_comm: scans `fifo` front to back for the first action of
 * kind `type` accepted by both sides' optional filters. On a match, the
 * item is unlinked and freed, the action's refcount is incremented and its
 * rdv pointer is cleared.
 *   fifo           queue to scan (a mailbox comm_fifo or done_comm_fifo)
 *   match_fun      caller's filter, may be NULL; called as
 *                  match_fun(this_user_data, other_user_data, candidate)
 *   this_user_data caller's user data, given to both filters
 *   my_action      caller's own action, given to the candidate's filter
 */
150 * \brief Checks if there is a communication action queued in a fifo matching our needs
151 * \param type The type of communication we are looking for (comm_send, comm_recv)
152 * \return The communication action if found, NULL otherwise
154 smx_action_t SIMIX_fifo_get_comm(xbt_fifo_t fifo, e_smx_comm_type_t type,
155 int (*match_fun)(void *, void *,smx_action_t),
156 void *this_user_data, smx_action_t my_action)
/* NOTE(review): listing line 158 (declaration of `action`) is not visible. */
159 xbt_fifo_item_t item;
160 void* other_user_data = NULL;
162 xbt_fifo_foreach(fifo, item, action, smx_action_t) {
/* The candidate's user data lives on its own side: src_data for a send,
 * dst_data for a receive. */
163 if (action->comm.type == SIMIX_COMM_SEND) {
164 other_user_data = action->comm.src_data;
165 } else if (action->comm.type == SIMIX_COMM_RECEIVE) {
166 other_user_data = action->comm.dst_data;
/* Both filters must agree: ours sees (ours, theirs, candidate) and the
 * candidate's sees (theirs, ours, my_action). */
168 if (action->comm.type == type &&
169 (!match_fun || match_fun(this_user_data, other_user_data, action)) &&
170 (!action->comm.match_fun || action->comm.match_fun(other_user_data, this_user_data, my_action))) {
171 XBT_DEBUG("Found a matching communication action %p", action);
/* The item is freed here, so the loop must not advance from it afterwards.
 * NOTE(review): the return at listing line 176 is not visible. */
172 xbt_fifo_remove_item(fifo, item);
173 xbt_fifo_free_item(item);
/* Take an extra reference on the matched action for the caller. */
174 action->comm.refcount++;
175 action->comm.rdv = NULL;
178 XBT_DEBUG("Sorry, communication action %p does not match our needs:"
179 " its type is %d but we are looking for a comm of type %d (or maybe the filtering didn't match)",
180 action, (int)action->comm.type, (int)type);
182 XBT_DEBUG("No matching communication action found");
187 /******************************************************************************/
188 /* Communication Actions */
189 /******************************************************************************/
/*
 * SIMIX_comm_new: takes an action from simix_global->action_mallocator and
 * initializes it as a communication of direction `type`, in state
 * SIMIX_WAITING, holding one reference (refcount = 1).
 */
192 * \brief Creates a new communicate action
193 * \param type The direction of communication (comm_send, comm_recv)
194 * \return The new communicate action
196 smx_action_t SIMIX_comm_new(e_smx_comm_type_t type)
200 /* alloc structures */
201 act = xbt_mallocator_get(simix_global->action_mallocator);
203 act->type = SIMIX_ACTION_COMMUNICATE;
204 act->state = SIMIX_WAITING;
206 /* set communication */
207 act->comm.type = type;
208 act->comm.refcount = 1;
/* -1 means "not known yet"; SIMIX_comm_is_latency_bounded fills it in. */
210 #ifdef HAVE_LATENCY_BOUND_TRACKING
211 //initialize with unknown value
212 act->latency_limited = -1;
/* NOTE(review): listing lines 213-215 are missing (the #endif above and
 * presumably a tracing #ifdef around the next line) - confirm. */
216 act->category = NULL;
219 XBT_DEBUG("Create communicate action %p", act);
/* NOTE(review): listing lines 220-222 are missing. isend/irecv decrement
 * smx_total_comms for actions created "for nothing", which implies this
 * function increments it; the `return act;` is also not visible - confirm. */
/*
 * SIMIX_comm_destroy: drops one reference to a communication action. Once
 * the count reaches zero, it:
 *   - frees the action's name;
 *   - releases its surf actions;
 *   - for a detached comm that did not end in SIMIX_DONE, releases the
 *     source buffer through clean_fun;
 *   - returns the action to the mallocator.
 * Dies if the refcount is already zero (double destroy).
 */
226 * \brief Destroy a communicate action
227 * \param action The communicate action to be destroyed
229 void SIMIX_comm_destroy(smx_action_t action)
231 XBT_DEBUG("Destroy action %p (refcount: %d), state: %d",
232 action, action->comm.refcount, (int)action->state);
/* A non-positive count here means this action was already destroyed. */
234 if (action->comm.refcount <= 0) {
235 xbt_backtrace_display_current();
236 xbt_die("The refcount of comm %p is already 0 before decreasing it. "
237 "That's a bug! If you didn't test and/or wait the same communication twice in your code, then the bug is SimGrid's...", action);
239 action->comm.refcount--;
/* NOTE(review): listing line 241 (presumably an early `return;` while
 * references remain) is not visible. */
240 if (action->comm.refcount > 0)
242 XBT_DEBUG("Really free communication %p; refcount is now %d", action,
243 action->comm.refcount);
/* NOTE(review): SIMIX_comm_destroy_internal_actions below also recomputes
 * latency_limited when a surf comm exists; this call may be redundant. */
245 #ifdef HAVE_LATENCY_BOUND_TRACKING
246 action->latency_limited = SIMIX_comm_is_latency_bounded( action ) ;
249 xbt_free(action->name);
250 SIMIX_comm_destroy_internal_actions(action);
/* A detached sender no longer owns its buffer, so it is released here when
 * the transfer did not complete. If clean_fun is NULL the buffer pointer
 * is dropped without being freed. */
252 if (action->comm.detached && action->state != SIMIX_DONE) {
253 /* the communication has failed and was detached:
254 * we have to free the buffer */
255 if (action->comm.clean_fun) {
256 action->comm.clean_fun(action->comm.src_buff);
258 action->comm.src_buff = NULL;
261 xbt_mallocator_release(simix_global->action_mallocator, action);
264 void SIMIX_comm_destroy_internal_actions(smx_action_t action)
266 if (action->comm.surf_comm){
267 #ifdef HAVE_LATENCY_BOUND_TRACKING
268 action->latency_limited = SIMIX_comm_is_latency_bounded(action);
270 action->comm.surf_comm->model_type->action_unref(action->comm.surf_comm);
271 action->comm.surf_comm = NULL;
274 if (action->comm.src_timeout){
275 action->comm.src_timeout->model_type->action_unref(action->comm.src_timeout);
276 action->comm.src_timeout = NULL;
279 if (action->comm.dst_timeout){
280 action->comm.dst_timeout->model_type->action_unref(action->comm.dst_timeout);
281 action->comm.dst_timeout = NULL;
/*
 * SIMIX_comm_isend: posts a send on `rdv`.
 *   - If a compatible receive is already queued, that receive becomes the
 *     communication and the freshly created send action is discarded.
 *   - Otherwise, if the mailbox has a permanent receiver, the send is
 *     handed straight to it (queued in done_comm_fifo).
 *   - Otherwise the send is queued in the mailbox.
 * In every case the sender-side fields are then filled in and the transfer
 * is started. A detached send gives up its reference and returns NULL.
 */
285 smx_action_t SIMIX_comm_isend(smx_process_t src_proc, smx_rdv_t rdv,
286 double task_size, double rate,
287 void *src_buff, size_t src_buff_size,
288 int (*match_fun)(void *, void *,smx_action_t),
/* NOTE(review): clean_fun is applied to src_buff (see SIMIX_comm_destroy),
 * not to the action itself as the trailing comment says. */
289 void (*clean_fun)(void *), // used to free the action in case of problem after a detached send
/* NOTE(review): listing lines 290-292 are missing; the `data` and
 * `detached` parameters used below are declared there - confirm. */
293 XBT_DEBUG("send from %p\n", rdv);
295 /* Prepare an action describing us, so that it gets passed to the user-provided filter of other side */
296 smx_action_t this_action = SIMIX_comm_new(SIMIX_COMM_SEND);
298 /* Look for communication action matching our needs. We also provide a description of
299 * ourself so that the other side also gets a chance of choosing if it wants to match with us.
301 * If it is not found then push our communication into the rendez-vous point */
302 smx_action_t other_action = SIMIX_fifo_get_comm(rdv->comm_fifo, SIMIX_COMM_RECEIVE, match_fun, data, this_action);
/* NOTE(review): listing lines 303-304 are missing; presumably
 * `if (!other_action) {` (no receiver queued yet). */
305 other_action = this_action;
/* Eager path: hand the send to the permanent receiver right away. The
 * extra reference is presumably the one later dropped by the receiver side
 * (see SIMIX_comm_irecv). */
307 if (rdv->permanent_receiver!=NULL){
308 //this mailbox is for small messages, which have to be sent right now
309 other_action->state = SIMIX_READY;
310 other_action->comm.dst_proc=rdv->permanent_receiver;
311 other_action->comm.refcount++;
312 other_action->comm.rdv = rdv;
313 xbt_fifo_push(rdv->done_comm_fifo,other_action);
/* NOTE(review): redundant with the assignment two lines above. */
314 other_action->comm.rdv=rdv;
315 XBT_DEBUG("pushing a message into the permanent receive fifo %p, comm %p \n", rdv, &(other_action->comm));
/* Regular path: wait in the mailbox for a matching receive. */
318 SIMIX_rdv_push(rdv, this_action);
/* A receive was already waiting: reuse it and drop our own action. */
321 XBT_DEBUG("Receive already pushed\n");
323 SIMIX_comm_destroy(this_action);
324 --smx_total_comms; // this creation was a pure waste
326 other_action->state = SIMIX_READY;
327 other_action->comm.type = SIMIX_COMM_READY;
/* Track the communication in the sender's list of pending comms. */
330 xbt_fifo_push(src_proc->comms, other_action);
332 /* if the communication action is detached then decrease the refcount
333 * by one, so it will be eliminated by the receiver's destroy call */
335 other_action->comm.detached = 1;
336 other_action->comm.refcount--;
337 other_action->comm.clean_fun = clean_fun;
339 other_action->comm.clean_fun = NULL;
342 /* Setup the communication action */
343 other_action->comm.src_proc = src_proc;
344 other_action->comm.task_size = task_size;
345 other_action->comm.rate = rate;
346 other_action->comm.src_buff = src_buff;
347 other_action->comm.src_buff_size = src_buff_size;
348 other_action->comm.src_data = data;
350 other_action->comm.match_fun = match_fun;
/* NOTE(review): listing lines 351-356 are missing; presumably a
 * model-checker-only branch (the MC runs without surf actions, cf.
 * SIMIX_comm_cancel) that marks the comm RUNNING and returns early. */
353 other_action->state = SIMIX_RUNNING;
357 SIMIX_comm_start(other_action);
358 return (detached ? NULL : other_action);
/*
 * SIMIX_comm_irecv: posts a receive on `rdv`.
 *   - If the mailbox has a permanent receiver and already-delivered sends,
 *     a matching delivered send is tried first.
 *   - Otherwise a queued send is matched from comm_fifo; failing that, the
 *     receive is queued in the mailbox.
 * The receiver-side fields (dst_proc, dst_buff, dst_buff_size, dst_data,
 * match_fun) are then filled in and the transfer is started.
 * NOTE(review): many branch lines (if/else, closing braces, the final
 * return) are missing from this listing; the branch structure described
 * below is inferred from the visible statements - confirm against the
 * real source.
 */
361 smx_action_t SIMIX_comm_irecv(smx_process_t dst_proc, smx_rdv_t rdv,
362 void *dst_buff, size_t *dst_buff_size,
363 int (*match_fun)(void *, void *, smx_action_t), void *data)
365 XBT_DEBUG("recv from %p %p\n", rdv, rdv->comm_fifo);
366 smx_action_t this_action = SIMIX_comm_new(SIMIX_COMM_RECEIVE);
368 smx_action_t other_action;
369 //communication already done, get it inside the fifo of completed comms
370 //permanent receive v1
371 //int already_received=0;
/* Permanent-receiver path: sends may already have been delivered eagerly
 * by SIMIX_comm_isend into done_comm_fifo. */
372 if(rdv->permanent_receiver && xbt_fifo_size(rdv->done_comm_fifo)!=0){
374 XBT_DEBUG("We have a comm that has probably already been received, trying to match it, to skip the communication\n");
375 //find a match in the already received fifo
376 other_action = SIMIX_fifo_get_comm(rdv->done_comm_fifo, SIMIX_COMM_SEND, match_fun, data, this_action);
377 //if not found, assume the receiver came first, register it to the mailbox in the classical way
379 XBT_DEBUG("We have messages in the permanent receive list, but not the one we are looking for, pushing request into fifo\n");
380 other_action = this_action;
381 SIMIX_rdv_push(rdv, this_action);
/* Matched an eagerly sent comm whose surf transfer has nothing left: mark
 * it done and drop the extra reference taken by SIMIX_fifo_get_comm. */
383 if(other_action->comm.surf_comm && SIMIX_comm_get_remains(other_action)==0.0)
385 XBT_DEBUG("comm %p has been already sent, and is finished, destroy it\n",&(other_action->comm));
386 other_action->state = SIMIX_DONE;
387 other_action->comm.type = SIMIX_COMM_DONE;
/* NOTE(review): SIMIX_fifo_get_comm already cleared comm.rdv. */
388 other_action->comm.rdv = NULL;
389 SIMIX_comm_destroy(this_action);
390 --smx_total_comms; // this creation was a pure waste
391 //already_received=1;
392 other_action->comm.refcount--;
/* Matched, but the transfer is still in flight. */
394 XBT_DEBUG("Not yet finished, we have to wait %d\n", xbt_fifo_size(rdv->comm_fifo));
396 other_action->comm.refcount--;
397 SIMIX_comm_destroy(this_action);
398 --smx_total_comms; // this creation was a pure waste
/* Regular path: match against sends queued in the mailbox. */
401 /* Prepare an action describing us, so that it gets passed to the user-provided filter of other side */
403 /* Look for communication action matching our needs. We also provide a description of
404 * ourself so that the other side also gets a chance of choosing if it wants to match with us.
406 * If it is not found then push our communication into the rendez-vous point */
407 other_action = SIMIX_fifo_get_comm(rdv->comm_fifo, SIMIX_COMM_SEND, match_fun, data, this_action);
410 XBT_DEBUG("Receive pushed first %d\n", xbt_fifo_size(rdv->comm_fifo));
411 other_action = this_action;
412 SIMIX_rdv_push(rdv, this_action);
/* A send was already waiting: reuse it and drop our own action. */
414 SIMIX_comm_destroy(this_action);
415 --smx_total_comms; // this creation was a pure waste
416 other_action->state = SIMIX_READY;
417 other_action->comm.type = SIMIX_COMM_READY;
418 xbt_fifo_push(dst_proc->comms, other_action);
424 /* Setup communication action */
425 other_action->comm.dst_proc = dst_proc;
426 other_action->comm.dst_buff = dst_buff;
427 other_action->comm.dst_buff_size = dst_buff_size;
428 other_action->comm.dst_data = data;
430 other_action->comm.match_fun = match_fun;
433 /*if(already_received)//do the actual copy, because the first one after the comm didn't have all the info
434 SIMIX_comm_copy_data(other_action);*/
/* NOTE(review): as in isend, lines around here are missing; presumably a
 * model-checker-only early return around the RUNNING assignment. */
438 other_action->state = SIMIX_RUNNING;
442 SIMIX_comm_start(other_action);
/*
 * SIMIX_pre_comm_wait: handles a wait (or blocking send/recv) simcall on
 * `action`. The simcall is attached to the action and recorded as the
 * issuer's waiting action.
 *   - Under the model checker, the outcome (done or timeout) is forced.
 *   - Otherwise, an already-terminated comm is finished immediately; a
 *     pending one gets a surf sleep action of `timeout` on the issuer's
 *     host, stored as that side's timeout.
 * NOTE(review): listing lines 458-460, 462, 465-467 and 470-477 are
 * missing; the MC/idx branching described here is inferred from the
 * visible comments - confirm.
 * NOTE(review): the local used below is named `sleep`, which shadows the
 * POSIX sleep() function within this scope.
 */
447 void SIMIX_pre_comm_wait(smx_simcall_t simcall, smx_action_t action, double timeout, int idx)
450 /* the simcall may be a wait, a send or a recv */
453 /* Associate this simcall to the wait action */
454 XBT_DEBUG("SIMIX_pre_comm_wait, %p", action);
456 xbt_fifo_push(action->simcalls, simcall);
457 simcall->issuer->waiting_action = action;
461 action->state = SIMIX_DONE;
463 /* If we reached this point, the wait simcall must have a timeout */
464 /* Otherwise it shouldn't be enabled and executed by the MC */
/* The timeout is charged to whichever side issued the wait. */
468 if (action->comm.src_proc == simcall->issuer)
469 action->state = SIMIX_SRC_TIMEOUT;
471 action->state = SIMIX_DST_TIMEOUT;
474 SIMIX_comm_finish(action);
478 /* If the action has already finish perform the error handling, */
479 /* otherwise set up a waiting timeout on the right side */
480 if (action->state != SIMIX_WAITING && action->state != SIMIX_RUNNING) {
481 SIMIX_comm_finish(action);
482 } else { /* if (timeout >= 0) { we need a surf sleep action even when there is no timeout, otherwise surf won't tell us when the host fails */
483 sleep = surf_workstation_model->extension.workstation.sleep(simcall->issuer->smx_host->host, timeout);
/* Lets SIMIX_post_comm find the comm when the sleep ends or fails. */
484 surf_workstation_model->action_data_set(sleep, action);
486 if (simcall->issuer == action->comm.src_proc)
487 action->comm.src_timeout = sleep;
489 action->comm.dst_timeout = sleep;
/*
 * SIMIX_pre_comm_test: non-blocking completion check for one comm.
 * comm_test.result is set to whether the comm is finished. A finished comm
 * is run through SIMIX_comm_finish, which answers the simcall itself.
 * Otherwise the simcall is answered here.
 * NOTE(review): listing lines 496-497 and 503-508 are missing; the first
 * branch (both endpoints present => done) is presumably the
 * model-checker path - confirm.
 */
493 void SIMIX_pre_comm_test(smx_simcall_t simcall)
495 smx_action_t action = simcall->comm_test.comm;
498 simcall->comm_test.result = action->comm.src_proc && action->comm.dst_proc;
499 if(simcall->comm_test.result){
500 action->state = SIMIX_DONE;
501 xbt_fifo_push(action->simcalls, simcall);
502 SIMIX_comm_finish(action);
504 SIMIX_simcall_answer(simcall);
/* Regular path: anything past WAITING/RUNNING counts as finished
 * (success or error). */
509 simcall->comm_test.result = (action->state != SIMIX_WAITING && action->state != SIMIX_RUNNING);
510 if (simcall->comm_test.result) {
511 xbt_fifo_push(action->simcalls, simcall);
512 SIMIX_comm_finish(action);
514 SIMIX_simcall_answer(simcall);
/*
 * SIMIX_pre_comm_testany: non-blocking check over a dynar of comms.
 * comm_testany.result is the index of the first finished comm, or -1 if
 * none. A finished comm is completed through SIMIX_comm_finish, which
 * answers the simcall.
 * NOTE(review): listing lines 520-521, 524-526, 528 and 534-545 are
 * missing (declarations of cursor/action and the MC/idx branching).
 * `idx` appears to be a model-checker-chosen index, with the first
 * visible answer presumably for the "none ready" choice - confirm.
 */
518 void SIMIX_pre_comm_testany(smx_simcall_t simcall, int idx)
522 xbt_dynar_t actions = simcall->comm_testany.comms;
523 simcall->comm_testany.result = -1;
527 SIMIX_simcall_answer(simcall);
529 action = xbt_dynar_get_as(actions, idx, smx_action_t);
530 simcall->comm_testany.result = idx;
531 xbt_fifo_push(action->simcalls, simcall);
532 action->state = SIMIX_DONE;
533 SIMIX_comm_finish(action);
/* Regular path: report the first comm that is no longer pending. */
538 xbt_dynar_foreach(simcall->comm_testany.comms,cursor,action) {
539 if (action->state != SIMIX_WAITING && action->state != SIMIX_RUNNING) {
540 simcall->comm_testany.result = cursor;
541 xbt_fifo_push(action->simcalls, simcall);
542 SIMIX_comm_finish(action);
546 SIMIX_simcall_answer(simcall);
/*
 * SIMIX_pre_comm_waitany: blocking wait on a dynar of comms. The simcall is
 * attached to every comm; the first one to finish answers it, and
 * SIMIX_comm_finish detaches it from the others (through
 * SIMIX_waitany_remove_simcall_from_actions).
 * NOTE(review): listing lines 551 (declaration of `action`), 554-555,
 * 561-563 and 571-575 are missing; the first visible branch looks like the
 * model-checker path using the chosen `idx` - confirm.
 */
549 void SIMIX_pre_comm_waitany(smx_simcall_t simcall, int idx)
552 unsigned int cursor = 0;
553 xbt_dynar_t actions = simcall->comm_waitany.comms;
556 action = xbt_dynar_get_as(actions, idx, smx_action_t);
557 xbt_fifo_push(action->simcalls, simcall);
558 simcall->comm_waitany.result = idx;
559 action->state = SIMIX_DONE;
560 SIMIX_comm_finish(action);
564 xbt_dynar_foreach(actions, cursor, action){
565 /* associate this simcall to the the action */
566 xbt_fifo_push(action->simcalls, simcall);
568 /* see if the action is already finished */
/* SIMIX_comm_finish detaches the simcall from every comm, so the loop is
 * expected to stop right after this call (hidden line 571) - confirm. */
569 if (action->state != SIMIX_WAITING && action->state != SIMIX_RUNNING){
570 SIMIX_comm_finish(action);
576 void SIMIX_waitany_remove_simcall_from_actions(smx_simcall_t simcall)
579 unsigned int cursor = 0;
580 xbt_dynar_t actions = simcall->comm_waitany.comms;
582 xbt_dynar_foreach(actions, cursor, action) {
583 xbt_fifo_remove(action->simcalls, simcall);
/*
 * SIMIX_comm_start: for a READY comm (both endpoints known), creates the
 * surf network action between the two hosts and moves the comm to
 * SIMIX_RUNNING. Comms in any other state are left untouched.
 *   - A transfer that fails at creation (broken link) is marked
 *     SIMIX_LINK_FAILURE and its surf actions are released.
 *   - If either endpoint process is suspended, the transfer is created
 *     suspended.
 * NOTE(review): after a link failure, SIMIX_comm_destroy_internal_actions
 * sets surf_comm to NULL. If an endpoint is also suspended, suspend() below
 * then receives NULL - confirm whether that case needs a guard.
 */
588 * \brief Starts the simulation of a communication action.
589 * \param action the communication action
591 XBT_INLINE void SIMIX_comm_start(smx_action_t action)
593 /* If both the sender and the receiver are already there, start the communication */
594 if (action->state == SIMIX_READY) {
596 smx_host_t sender = action->comm.src_proc->smx_host;
597 smx_host_t receiver = action->comm.dst_proc->smx_host;
599 XBT_DEBUG("Starting communication %p from '%s' to '%s'", action,
600 SIMIX_host_get_name(sender), SIMIX_host_get_name(receiver));
602 action->comm.surf_comm = surf_workstation_model->extension.workstation.
603 communicate(sender->host, receiver->host, action->comm.task_size, action->comm.rate);
/* Back-pointer so SIMIX_post_comm can find this comm from the surf action. */
605 surf_workstation_model->action_data_set(action->comm.surf_comm, action);
607 action->state = SIMIX_RUNNING;
609 /* If a link is failed, detect it immediately */
610 if (surf_workstation_model->action_state_get(action->comm.surf_comm) == SURF_ACTION_FAILED) {
611 XBT_DEBUG("Communication from '%s' to '%s' failed to start because of a link failure",
612 SIMIX_host_get_name(sender), SIMIX_host_get_name(receiver));
613 action->state = SIMIX_LINK_FAILURE;
614 SIMIX_comm_destroy_internal_actions(action);
617 /* If any of the process is suspend, create the action but stop its execution,
618 it will be restarted when the sender process resume */
619 if (SIMIX_process_is_suspended(action->comm.src_proc) ||
620 SIMIX_process_is_suspended(action->comm.dst_proc)) {
621 /* FIXME: check what should happen with the action state */
623 if (SIMIX_process_is_suspended(action->comm.src_proc))
624 XBT_DEBUG("The communication is suspended on startup because src (%s:%s) were suspended since it initiated the communication",
625 SIMIX_host_get_name(action->comm.src_proc->smx_host), action->comm.src_proc->name);
627 XBT_DEBUG("The communication is suspended on startup because dst (%s:%s) were suspended since it initiated the communication",
628 SIMIX_host_get_name(action->comm.dst_proc->smx_host), action->comm.dst_proc->name);
630 surf_workstation_model->suspend(action->comm.surf_comm);
/*
 * SIMIX_comm_finish: answers, one by one, every simcall attached to a
 * terminated comm. For each simcall, the final state is translated into
 * either a data copy (success) or an exception on the issuer. The comm is
 * removed from its mailbox and from the issuer's pending list, and the
 * simcall is answered. Finally the comm is destroyed destroy_count times.
 * NOTE(review): the `case SIMIX_DONE:`, `case SIMIX_CANCELED:`, `default:`,
 * `break;`/`else` lines and the `destroy_count` increment (listing lines
 * 744-746) are not visible in this listing - confirm.
 */
637 * \brief Answers the SIMIX simcalls associated to a communication action.
638 * \param action a finished communication action
640 void SIMIX_comm_finish(smx_action_t action)
642 unsigned int destroy_count = 0;
643 smx_simcall_t simcall;
645 while ((simcall = xbt_fifo_shift(action->simcalls))) {
647 /* If a waitany simcall is waiting for this action to finish, then remove
648 it from the other actions in the waitany list. Afterwards, get the
649 position of the actual action in the waitany dynar and
650 return it as the result of the simcall */
651 if (simcall->call == SIMCALL_COMM_WAITANY) {
652 SIMIX_waitany_remove_simcall_from_actions(simcall);
654 simcall->comm_waitany.result = xbt_dynar_search(simcall->comm_waitany.comms, &action);
657 /* If the action is still in a rendez-vous point then remove from it */
658 if (action->comm.rdv)
659 SIMIX_rdv_remove(action->comm.rdv, action);
661 XBT_DEBUG("SIMIX_comm_finish: action state = %d", (int)action->state);
663 /* Check out for errors */
664 switch (action->state) {
/* Success: move the payload into the receiver's buffer. */
667 XBT_DEBUG("Communication %p complete!", action);
668 SIMIX_comm_copy_data(action);
671 case SIMIX_SRC_TIMEOUT:
672 SMX_EXCEPTION(simcall->issuer, timeout_error, 0,
673 "Communication timeouted because of sender");
676 case SIMIX_DST_TIMEOUT:
677 SMX_EXCEPTION(simcall->issuer, timeout_error, 0,
678 "Communication timeouted because of receiver");
/* A process on the failed host is killed; its peer gets a network error. */
681 case SIMIX_SRC_HOST_FAILURE:
682 if (simcall->issuer == action->comm.src_proc)
683 simcall->issuer->context->iwannadie = 1;
684 // SMX_EXCEPTION(simcall->issuer, host_error, 0, "Host failed");
686 SMX_EXCEPTION(simcall->issuer, network_error, 0, "Remote peer failed");
689 case SIMIX_DST_HOST_FAILURE:
690 if (simcall->issuer == action->comm.dst_proc)
691 simcall->issuer->context->iwannadie = 1;
692 // SMX_EXCEPTION(simcall->issuer, host_error, 0, "Host failed");
694 SMX_EXCEPTION(simcall->issuer, network_error, 0, "Remote peer failed");
/* NOTE(review): a NULL host name is passed to %s when an endpoint is
 * missing; SIMIX_comm_copy_data uses a placeholder string instead. */
697 case SIMIX_LINK_FAILURE:
698 XBT_DEBUG("Link failure in action %p between '%s' and '%s': posting an exception to the issuer: %s (%p) detached:%d",
700 action->comm.src_proc ? action->comm.src_proc->smx_host->name : NULL,
701 action->comm.dst_proc ? action->comm.dst_proc->smx_host->name : NULL,
702 simcall->issuer->name, simcall->issuer, action->comm.detached);
703 if (action->comm.src_proc == simcall->issuer) {
704 XBT_DEBUG("I'm source");
705 } else if (action->comm.dst_proc == simcall->issuer) {
706 XBT_DEBUG("I'm dest");
708 XBT_DEBUG("I'm neither source nor dest");
710 SMX_EXCEPTION(simcall->issuer, network_error, 0, "Link failure");
/* Cancellation: the message names the peer, i.e. the side that is not the
 * issuer. */
714 if (simcall->issuer == action->comm.dst_proc)
715 SMX_EXCEPTION(simcall->issuer, cancel_error, 0,
716 "Communication canceled by the sender");
718 SMX_EXCEPTION(simcall->issuer, cancel_error, 0,
719 "Communication canceled by the receiver");
723 xbt_die("Unexpected action state in SIMIX_comm_finish: %d", (int)action->state);
726 /* if there is an exception during a waitany or a testany, indicate the position of the failed communication */
727 if (simcall->issuer->doexception) {
728 if (simcall->call == SIMCALL_COMM_WAITANY) {
729 simcall->issuer->running_ctx->exception.value = xbt_dynar_search(simcall->comm_waitany.comms, &action);
731 else if (simcall->call == SIMCALL_COMM_TESTANY) {
732 simcall->issuer->running_ctx->exception.value = xbt_dynar_search(simcall->comm_testany.comms, &action);
/* An issuer whose own host is no longer ON is marked to die. */
736 if (surf_workstation_model->extension.
737 workstation.get_state(simcall->issuer->smx_host->host) != SURF_RESOURCE_ON) {
738 simcall->issuer->context->iwannadie = 1;
741 simcall->issuer->waiting_action = NULL;
742 xbt_fifo_remove(simcall->issuer->comms, action);
743 SIMIX_simcall_answer(simcall);
/* Destruction is deferred until all simcalls have been answered, because
 * the loop above still reads `action`. */
747 while (destroy_count-- > 0)
748 SIMIX_comm_destroy(action);
/*
 * SIMIX_post_comm: called when one of the surf actions of a comm (the
 * transfer or a timeout sleep) ends. It derives the comm's final state,
 * releases the surf actions, unlinks the comm from both endpoints' pending
 * lists and answers any attached simcalls.
 * The checks run in a fixed priority order; the first matching one wins:
 *   1. src timeout sleep DONE      -> SIMIX_SRC_TIMEOUT
 *   2. dst timeout sleep DONE      -> SIMIX_DST_TIMEOUT
 *   3. src timeout sleep FAILED    -> SIMIX_SRC_HOST_FAILURE
 *   4. dst timeout sleep FAILED    -> SIMIX_DST_HOST_FAILURE
 *   5. transfer FAILED             -> SIMIX_LINK_FAILURE
 *   6. none of the above           -> SIMIX_DONE
 */
752 * \brief This function is called when a Surf communication action is finished.
753 * \param action the corresponding Simix communication
755 void SIMIX_post_comm(smx_action_t action)
757 /* Update action state */
758 if (action->comm.src_timeout &&
759 surf_workstation_model->action_state_get(action->comm.src_timeout) == SURF_ACTION_DONE)
760 action->state = SIMIX_SRC_TIMEOUT;
761 else if (action->comm.dst_timeout &&
762 surf_workstation_model->action_state_get(action->comm.dst_timeout) == SURF_ACTION_DONE)
763 action->state = SIMIX_DST_TIMEOUT;
764 else if (action->comm.src_timeout &&
765 surf_workstation_model->action_state_get(action->comm.src_timeout) == SURF_ACTION_FAILED)
766 action->state = SIMIX_SRC_HOST_FAILURE;
767 else if (action->comm.dst_timeout &&
768 surf_workstation_model->action_state_get(action->comm.dst_timeout) == SURF_ACTION_FAILED)
769 action->state = SIMIX_DST_HOST_FAILURE;
770 else if (action->comm.surf_comm &&
771 surf_workstation_model->action_state_get(action->comm.surf_comm) == SURF_ACTION_FAILED) {
772 XBT_DEBUG("Puta madre. Surf says that the link broke");
773 action->state = SIMIX_LINK_FAILURE;
775 action->state = SIMIX_DONE;
777 XBT_DEBUG("SIMIX_post_comm: comm %p, state %d, src_proc %p, dst_proc %p, detached: %d",
778 action, (int)action->state, action->comm.src_proc, action->comm.dst_proc, action->comm.detached);
780 /* destroy the surf actions associated with the Simix communication */
781 SIMIX_comm_destroy_internal_actions(action);
783 /* remove the communication action from the list of pending communications
784 * of both processes (if they still exist) */
785 if (action->comm.src_proc) {
786 xbt_fifo_remove(action->comm.src_proc->comms, action);
788 if (action->comm.dst_proc) {
789 xbt_fifo_remove(action->comm.dst_proc->comms, action);
792 /* if there are simcalls associated with the action, then answer them */
793 if (xbt_fifo_size(action->simcalls)) {
794 SIMIX_comm_finish(action);
798 void SIMIX_comm_cancel(smx_action_t action)
800 /* if the action is a waiting state means that it is still in a rdv */
801 /* so remove from it and delete it */
802 if (action->state == SIMIX_WAITING) {
803 SIMIX_rdv_remove(action->comm.rdv, action);
804 action->state = SIMIX_CANCELED;
806 else if (!MC_IS_ENABLED /* when running the MC there are no surf actions */
807 && (action->state == SIMIX_READY || action->state == SIMIX_RUNNING)) {
809 surf_workstation_model->action_cancel(action->comm.surf_comm);
813 void SIMIX_comm_suspend(smx_action_t action)
815 /*FIXME: shall we suspend also the timeout actions? */
816 if (action->comm.surf_comm)
817 surf_workstation_model->suspend(action->comm.surf_comm);
818 /* in the other case, the action will be suspended on creation, in SIMIX_comm_start() */
821 void SIMIX_comm_resume(smx_action_t action)
823 /*FIXME: check what happen with the timeouts */
824 if (action->comm.surf_comm)
825 surf_workstation_model->resume(action->comm.surf_comm);
826 /* in the other case, the action were not really suspended yet, see SIMIX_comm_suspend() and SIMIX_comm_start() */
830 /************* Action Getters **************/
/*
 * SIMIX_comm_get_remains: amount of data still to transfer.
 *   - Running comms query the surf transfer.
 *   - Every other state reports 0.
 * NOTE(review): the declaration of `remains`, the case labels, the breaks
 * and the final return (listing lines 837-843, 845-846, 848-851, 853-855,
 * 857-861) are not visible. The mapping of states to the two "0" branches
 * cannot be checked here.
 */
833 * \brief get the amount remaining from the communication
834 * \param action The communication
836 double SIMIX_comm_get_remains(smx_action_t action)
844 switch (action->state) {
847 remains = surf_workstation_model->get_remains(action->comm.surf_comm);
852 remains = 0; /*FIXME: check what should be returned */
856 remains = 0; /*FIXME: is this correct? */
862 e_smx_state_t SIMIX_comm_get_state(smx_action_t action)
864 return action->state;
868 * \brief Return the user data associated to the sender of the communication
869 * \param action The communication
870 * \return the user data
872 void* SIMIX_comm_get_src_data(smx_action_t action)
874 return action->comm.src_data;
878 * \brief Return the user data associated to the receiver of the communication
879 * \param action The communication
880 * \return the user data
882 void* SIMIX_comm_get_dst_data(smx_action_t action)
884 return action->comm.dst_data;
887 smx_process_t SIMIX_comm_get_src_proc(smx_action_t action)
889 return action->comm.src_proc;
892 smx_process_t SIMIX_comm_get_dst_proc(smx_action_t action)
894 return action->comm.dst_proc;
/*
 * SIMIX_comm_is_latency_bounded: refreshes action->latency_limited from the
 * surf transfer when one exists, then returns the cached value. Without a
 * surf transfer it returns whatever was cached last; SIMIX_comm_new sets
 * -1, meaning unknown.
 * NOTE(review): the doc block below names the parameter `comm`, but the
 * parameter is `action`. Listing lines 903-906 (presumably a NULL-action
 * guard) and the closing #endif are not visible.
 */
897 #ifdef HAVE_LATENCY_BOUND_TRACKING
899 * \brief verify if communication is latency bounded
900 * \param comm The communication
902 XBT_INLINE int SIMIX_comm_is_latency_bounded(smx_action_t action)
907 if (action->comm.surf_comm){
908 XBT_DEBUG("Getting latency limited for surf_action (%p)", action->comm.surf_comm);
909 action->latency_limited = surf_workstation_model->get_latency_limited(action->comm.surf_comm);
910 XBT_DEBUG("Action limited is %d", action->latency_limited);
912 return action->latency_limited;
916 /******************************************************************************/
917 /* SIMIX_comm_copy_data callbacks */
918 /******************************************************************************/
919 static void (*SIMIX_comm_copy_data_callback) (smx_action_t, void*, size_t) =
920 &SIMIX_comm_copy_pointer_callback;
923 SIMIX_comm_set_copy_data_callback(void (*callback) (smx_action_t, void*, size_t))
925 SIMIX_comm_copy_data_callback = callback;
928 void SIMIX_comm_copy_pointer_callback(smx_action_t comm, void* buff, size_t buff_size)
930 xbt_assert((buff_size == sizeof(void *)),
931 "Cannot copy %zu bytes: must be sizeof(void*)", buff_size);
932 *(void **) (comm->comm.dst_buff) = buff;
/*
 * SIMIX_comm_copy_buffer_callback: payload-copy hook that copies
 * `buff_size` raw bytes from `buff` into the receiver's dst_buff.
 * For detached sends, the comm's src_buff pointer is then cleared.
 * NOTE(review): listing line 940 is missing; per the trailing comment the
 * duplicated source buffer is presumably released there - confirm.
 */
935 void SIMIX_comm_copy_buffer_callback(smx_action_t comm, void* buff, size_t buff_size)
937 XBT_DEBUG("Copy the data over");
938 memcpy(comm->comm.dst_buff, buff, buff_size);
939 if (comm->comm.detached) { // if this is a detached send, the source buffer was duplicated by SMPI sender to make the original buffer available to the application ASAP
941 comm->comm.src_buff = NULL;
/*
 * SIMIX_comm_copy_data: copies a message payload through the configured
 * copy hook. At most *dst_buff_size bytes are copied, and
 * *dst_buff_size is updated to the amount actually copied.
 * Nothing happens if either buffer is missing or the comm was already
 * copied. The `copied` flag makes repeated calls from both ends harmless.
 * NOTE(review): listing lines 955 (early return), 958 and 960 (the comm
 * and src_buff arguments for the %p conversions) and 971-972 (possibly a
 * guard before the callback) are not visible - confirm.
 */
947 * \brief Copy the communication data from the sender's buffer to the receiver's one
948 * \param comm The communication
950 void SIMIX_comm_copy_data(smx_action_t comm)
952 size_t buff_size = comm->comm.src_buff_size;
953 /* If there is no data to be copy then return */
954 if (!comm->comm.src_buff || !comm->comm.dst_buff || comm->comm.copied)
957 XBT_DEBUG("Copying comm %p data from %s (%p) -> %s (%p) (%zu bytes)",
959 comm->comm.src_proc ? comm->comm.src_proc->smx_host->name : "a finished process",
961 comm->comm.dst_proc ? comm->comm.dst_proc->smx_host->name : "a finished process",
962 comm->comm.dst_buff, buff_size);
964 /* Copy at most dst_buff_size bytes of the message to receiver's buffer */
965 if (comm->comm.dst_buff_size)
966 buff_size = MIN(buff_size, *(comm->comm.dst_buff_size));
968 /* Update the receiver's buffer size to the copied amount */
969 if (comm->comm.dst_buff_size)
970 *comm->comm.dst_buff_size = buff_size;
973 SIMIX_comm_copy_data_callback (comm, comm->comm.src_buff, buff_size);
975 /* Set the copied flag so we copy data only once */
976 /* (this function might be called from both communication ends) */
977 comm->comm.copied = 1;