1 /* Copyright (c) 2009, 2010. The SimGrid Team.
2 * All rights reserved. */
4 /* This program is free software; you can redistribute it and/or modify it
5 * under the terms of the license (GNU LGPL) which comes with this package. */
7 #include "smx_private.h"
/* Create the "simix_network" logging category under "simix"; presumably it
 * becomes the default category of the XBT_DEBUG calls below -- TODO confirm
 * against the xbt log macros. */
12 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(simix_network, simix,
13 "Logging specific to SIMIX (network)");
/* Rendez-vous points indexed by name; allocated by SIMIX_network_init() and
 * released by SIMIX_network_exit(). */
15 static xbt_dict_t rdv_points = NULL;
/* Communication counter. The code visible in this file only decrements it
 * when a freshly created comm is discarded; presumably it is incremented on
 * creation -- TODO confirm. */
16 unsigned long int smx_total_comms = 0;
/* Forward declarations of helpers defined later in this file. */
18 static void SIMIX_waitany_remove_simcall_from_actions(smx_simcall_t simcall);
19 static void SIMIX_comm_copy_data(smx_action_t comm);
20 static smx_action_t SIMIX_comm_new(e_smx_comm_type_t type);
21 static XBT_INLINE void SIMIX_rdv_push(smx_rdv_t rdv, smx_action_t comm);
22 static smx_action_t SIMIX_fifo_get_comm(xbt_fifo_t fifo, e_smx_comm_type_t type,
23 int (*match_fun)(void *, void *,smx_action_t),
24 void *user_data, smx_action_t my_action);
25 static void SIMIX_rdv_free(void *data);
/* Allocates the global rdv_points dictionary, handing SIMIX_rdv_free to
 * xbt_dict_new_homogeneous() (presumably as the destructor applied to every
 * stored rendez-vous point -- confirm against the xbt_dict API). */
27 void SIMIX_network_init(void)
29 rdv_points = xbt_dict_new_homogeneous(SIMIX_rdv_free);
/* Releases the global rdv_points dictionary created by SIMIX_network_init(). */
32 void SIMIX_network_exit(void)
34 xbt_dict_free(&rdv_points);
37 /******************************************************************************/
38 /* Rendez-Vous Points */
39 /******************************************************************************/
/* Looks up, and otherwise builds, the rendez-vous point called \a name.
 *
 * \param name key of the rendez-vous point in rdv_points, or NULL for an
 *             anonymous one (no lookup is done and rdv->name stays NULL)
 *
 * A new rendez-vous point gets two empty fifos (comm_fifo for pending
 * communications, done_comm_fifo for the ones pushed to a permanent
 * receiver) and no permanent receiver.
 *
 * NOTE(review): this listing has gaps (embedded line numbers 45-46, 52,
 * 54-55 and 57-60 are not visible), so the guards around the allocation and
 * around xbt_dict_set(), as well as the return statement, cannot be checked
 * from here.
 */
41 smx_rdv_t SIMIX_rdv_create(const char *name)
43 /* two processes may have pushed the same rdv_create simcall at the same time */
44 smx_rdv_t rdv = name ? xbt_dict_get_or_null(rdv_points, name) : NULL;
47 rdv = xbt_new0(s_smx_rvpoint_t, 1);
48 rdv->name = name ? xbt_strdup(name) : NULL;
49 rdv->comm_fifo = xbt_fifo_new();
50 rdv->done_comm_fifo = xbt_fifo_new();
51 rdv->permanent_receiver=NULL;
/* name may be NULL (see the two conditionals above): never hand NULL to a %s conversion */
53 XBT_DEBUG("Creating a mailbox at %p with name %s\n", rdv, name ? name : "(null)");
56 xbt_dict_set(rdv_points, rdv->name, rdv, NULL);
/* Removes \a rdv from the rdv_points dictionary, using its name as key.
 * NOTE(review): embedded line 63 is not visible in this listing; whether the
 * removal is guarded against an anonymous rdv (name == NULL) cannot be told
 * from here. Presumably the dictionary destructor (SIMIX_rdv_free) does the
 * actual freeing -- confirm. */
61 void SIMIX_rdv_destroy(smx_rdv_t rdv)
64 xbt_dict_remove(rdv_points, rdv->name);
/* Destructor registered on rdv_points in SIMIX_network_init().
 * \param data the rendez-vous point, received as an untyped pointer
 * The visible lines free both fifos of the rendez-vous point.
 * NOTE(review): embedded lines 71 and 74 are not visible in this listing;
 * presumably they release the name and the structure itself -- confirm. */
67 void SIMIX_rdv_free(void *data)
69 XBT_DEBUG("rdv free %p", data);
70 smx_rdv_t rdv = (smx_rdv_t) data;
72 xbt_fifo_free(rdv->comm_fifo);
73 xbt_fifo_free(rdv->done_comm_fifo);
/* Accessor returning an xbt_dict_t.
 * NOTE(review): the body is not visible in this listing; judging by the name
 * it presumably returns the file-scope rdv_points dictionary -- confirm. */
78 xbt_dict_t SIMIX_get_rdv_points()
/* Looks \a name up in rdv_points.
 * \return the rendez-vous point registered under \a name, or NULL when
 *         there is none (xbt_dict_get_or_null) */
83 smx_rdv_t SIMIX_rdv_get_by_name(const char *name)
85 return xbt_dict_get_or_null(rdv_points, name);
/* Walks the pending communications of \a rdv and selects those whose sender
 * process runs on \a host.
 * \param rdv  the rendez-vous point whose comm_fifo is scanned
 * \param host the host the sender must be located on
 * NOTE(review): the counter declaration, its increment and the return
 * statement are on lines not visible in this listing (embedded 92, 96-99);
 * presumably the function returns the number of selected communications. */
88 int SIMIX_rdv_comm_count_by_host(smx_rdv_t rdv, smx_host_t host)
90 smx_action_t comm = NULL;
91 xbt_fifo_item_t item = NULL;
94 xbt_fifo_foreach(rdv->comm_fifo, item, comm, smx_action_t) {
/* a queued receive has no sender yet (SIMIX_comm_irecv pushes the comm without ever setting src_proc), so skip it instead of dereferencing NULL */
95 if (comm->comm.src_proc && comm->comm.src_proc->smx_host == host)
/* Returns the content of the first item of rdv->comm_fifo, i.e. the
 * communication at the head of the pending queue, without removing it. */
102 smx_action_t SIMIX_rdv_get_head(smx_rdv_t rdv)
104 return xbt_fifo_get_item_content(xbt_fifo_get_first_item(rdv->comm_fifo));
108 * \brief get the receiver (process associated to the mailbox)
109 * \param rdv The rendez-vous point
110 * \return process The receiving process (NULL if not set)
/* Plain getter for rdv->permanent_receiver (NULL after SIMIX_rdv_create()
 * until SIMIX_rdv_set_receiver() stores a process). */
112 smx_process_t SIMIX_rdv_get_receiver(smx_rdv_t rdv)
114 return rdv->permanent_receiver;
118 * \brief set the receiver of the rendez vous point to allow eager sends
119 * \param rdv The rendez-vous point
120 * \param process The receiving process
/* Stores \a process as rdv->permanent_receiver. SIMIX_comm_isend() tests this
 * field to decide whether an unmatched send goes to done_comm_fifo. */
122 void SIMIX_rdv_set_receiver(smx_rdv_t rdv , smx_process_t process)
124 rdv->permanent_receiver=process;
128 * \brief Pushes a communication action into a rendez-vous point
129 * \param rdv The rendez-vous point
130 * \param comm The communication action
/* Appends \a comm to rdv->comm_fifo and records \a rdv in comm->comm.rdv so
 * the communication knows which rendez-vous point it is queued in. */
132 static XBT_INLINE void SIMIX_rdv_push(smx_rdv_t rdv, smx_action_t comm)
134 xbt_fifo_push(rdv->comm_fifo, comm);
135 comm->comm.rdv = rdv;
139 * \brief Removes a communication action from a rendez-vous point
140 * \param rdv The rendez-vous point
141 * \param comm The communication action
/* Takes \a comm out of rdv->comm_fifo and clears its back-pointer
 * (comm->comm.rdv), undoing SIMIX_rdv_push(). Only comm_fifo is touched;
 * done_comm_fifo is left alone. */
143 XBT_INLINE void SIMIX_rdv_remove(smx_rdv_t rdv, smx_action_t comm)
145 xbt_fifo_remove(rdv->comm_fifo, comm);
146 comm->comm.rdv = NULL;
150 * \brief Checks if there is a communication action queued in a fifo matching our needs
151 * \param type The type of communication we are looking for (comm_send, comm_recv)
152 * \return The communication action if found, NULL otherwise
/* Scans \a fifo for the first queued communication of kind \a type that both
 * sides accept.
 * \param fifo           queue of communications to search
 * \param type           kind of queued communication wanted
 * \param match_fun      caller's filter, may be NULL (accept everything)
 * \param this_user_data caller's user data, passed to both filters
 * \param my_action      caller's own action, shown to the queued side's filter
 * On a match the item is unlinked from \a fifo and freed, the action's
 * refcount is incremented and its rdv back-pointer is cleared.
 * NOTE(review): the declaration of `action` and the return statements are on
 * lines not visible in this listing (embedded 158, 176, 183); presumably the
 * matching action is returned, and NULL when nothing matched. */
154 smx_action_t SIMIX_fifo_get_comm(xbt_fifo_t fifo, e_smx_comm_type_t type,
155 int (*match_fun)(void *, void *,smx_action_t),
156 void *this_user_data, smx_action_t my_action)
159 xbt_fifo_item_t item;
160 void* other_user_data = NULL;
162 xbt_fifo_foreach(fifo, item, action, smx_action_t) {
/* pick the user data of the queued side: the sender's for a send, the receiver's for a receive */
163 if (action->comm.type == SIMIX_COMM_SEND) {
164 other_user_data = action->comm.src_data;
165 } else if (action->comm.type == SIMIX_COMM_RECEIVE) {
166 other_user_data = action->comm.dst_data;
/* both filters must agree: ours is called with (ours, theirs, their action), theirs with (theirs, ours, our action); an absent filter accepts */
168 if (action->comm.type == type &&
169 (!match_fun || match_fun(this_user_data, other_user_data, action)) &&
170 (!action->comm.match_fun || action->comm.match_fun(other_user_data, this_user_data, my_action))) {
171 XBT_DEBUG("Found a matching communication action %p", action);
172 xbt_fifo_remove_item(fifo, item);
173 xbt_fifo_free_item(item);
/* the caller now holds a reference on the matched action, which is no longer queued anywhere */
174 action->comm.refcount++;
175 action->comm.rdv = NULL;
178 XBT_DEBUG("Sorry, communication action %p does not match our needs:"
179 " its type is %d but we are looking for a comm of type %d (or maybe the filtering didn't match)",
180 action, (int)action->comm.type, (int)type);
182 XBT_DEBUG("No matching communication action found");
187 /******************************************************************************/
188 /* Communication Actions */
189 /******************************************************************************/
192 * \brief Creates a new communicate action
193 * \param type The direction of communication (comm_send, comm_recv)
194 * \return The new communicate action
/* Takes an action from simix_global->action_mallocator and initialises it as
 * a communication of kind \a type: action type SIMIX_ACTION_COMMUNICATE,
 * state SIMIX_WAITING, refcount 1.
 * NOTE(review): several lines are not visible in this listing (embedded
 * 197-199, 213-215, 217-218, 220-224): the declaration of `act`, the
 * preprocessor lines around the `category` assignment, and the return
 * statement. smx_total_comms is presumably updated here as well -- confirm. */
196 smx_action_t SIMIX_comm_new(e_smx_comm_type_t type)
200 /* alloc structures */
201 act = xbt_mallocator_get(simix_global->action_mallocator);
203 act->type = SIMIX_ACTION_COMMUNICATE;
204 act->state = SIMIX_WAITING;
206 /* set communication */
207 act->comm.type = type;
/* one reference, owned by whoever created the communication */
208 act->comm.refcount = 1;
210 #ifdef HAVE_LATENCY_BOUND_TRACKING
211 //initialize with unknown value
212 act->latency_limited = -1;
216 act->category = NULL;
219 XBT_DEBUG("Create communicate action %p", act);
226 * \brief Destroy a communicate action
227 * \param action The communicate action to be destroyed
/* Drops one reference on \a action and frees it when that was the last one.
 * Dies (after printing a backtrace) when the refcount is already <= 0.
 * When really freeing: releases the name and the surf sub-actions, runs the
 * clean_fun on the source buffer of a detached communication that did not
 * reach SIMIX_DONE, then gives the action back to the mallocator.
 * NOTE(review): the line following the `refcount > 0` test (embedded 241) is
 * not visible in this listing; presumably it is an early return. */
229 void SIMIX_comm_destroy(smx_action_t action)
231 XBT_DEBUG("Destroy action %p (refcount: %d), state: %d",
232 action, action->comm.refcount, (int)action->state);
234 if (action->comm.refcount <= 0) {
235 xbt_backtrace_display_current();
236 xbt_die("The refcount of comm %p is already 0 before decreasing it. "
237 "That's a bug! If you didn't test and/or wait the same communication twice in your code, then the bug is SimGrid's...", action);
239 action->comm.refcount--;
240 if (action->comm.refcount > 0)
242 XBT_DEBUG("Really free communication %p; refcount is now %d", action,
243 action->comm.refcount);
245 #ifdef HAVE_LATENCY_BOUND_TRACKING
246 action->latency_limited = SIMIX_comm_is_latency_bounded( action ) ;
249 xbt_free(action->name);
250 SIMIX_comm_destroy_internal_actions(action);
252 if (action->comm.detached && action->state != SIMIX_DONE) {
253 /* the communication has failed and was detached:
254 * we have to free the buffer */
255 if (action->comm.clean_fun) {
256 action->comm.clean_fun(action->comm.src_buff);
258 action->comm.src_buff = NULL;
261 xbt_mallocator_release(simix_global->action_mallocator, action);
/* Unreferences the surf actions attached to \a action (the transfer itself
 * and the sender-side and receiver-side timeouts) and resets the
 * corresponding pointers to NULL, so the function can be called again
 * safely. With HAVE_LATENCY_BOUND_TRACKING, the latency-limited flag is
 * sampled before the transfer action goes away. */
264 void SIMIX_comm_destroy_internal_actions(smx_action_t action)
266 if (action->comm.surf_comm){
267 #ifdef HAVE_LATENCY_BOUND_TRACKING
268 action->latency_limited = SIMIX_comm_is_latency_bounded(action);
270 action->comm.surf_comm->model_type->action_unref(action->comm.surf_comm);
271 action->comm.surf_comm = NULL;
274 if (action->comm.src_timeout){
275 action->comm.src_timeout->model_type->action_unref(action->comm.src_timeout);
276 action->comm.src_timeout = NULL;
279 if (action->comm.dst_timeout){
280 action->comm.dst_timeout->model_type->action_unref(action->comm.dst_timeout);
281 action->comm.dst_timeout = NULL;
/* Posts an asynchronous send on \a rdv on behalf of \a src_proc.
 * \param src_proc      sending process
 * \param rdv           rendez-vous point the message goes through
 * \param task_size     stored in comm.task_size and handed to surf in SIMIX_comm_start()
 * \param rate          stored in comm.rate and handed to surf in SIMIX_comm_start()
 * \param src_buff      sender's buffer
 * \param src_buff_size size of \a src_buff
 * \param match_fun     sender's filter on candidate receives (may be NULL)
 * \param clean_fun     callback stored only for detached sends
 * \return the communication action, or NULL for a detached send
 * The function first looks for a queued receive; without one, the send is
 * either queued in comm_fifo or, when the mailbox has a permanent receiver,
 * made ready at once and stored in done_comm_fifo.
 * NOTE(review): the end of the parameter list (`data`, `detached`) and most
 * if/else lines are not visible in this listing (embedded 290-292, 303-304,
 * 306, 316-317, 319-320, 322, 334, 338, 351-352, ...), so the exact
 * branching below cannot be verified from here. */
285 smx_action_t SIMIX_comm_isend(smx_process_t src_proc, smx_rdv_t rdv,
286 double task_size, double rate,
287 void *src_buff, size_t src_buff_size,
288 int (*match_fun)(void *, void *,smx_action_t),
289 void (*clean_fun)(void *), // called on src_buff when a detached comm is destroyed before completing (see SIMIX_comm_destroy)
293 XBT_DEBUG("send from %p\n", rdv);
295 /* Prepare an action describing us, so that it gets passed to the user-provided filter of other side */
296 smx_action_t this_action = SIMIX_comm_new(SIMIX_COMM_SEND);
298 /* Look for communication action matching our needs. We also provide a description of
299 * ourself so that the other side also gets a chance of choosing if it wants to match with us.
301 * If it is not found then push our communication into the rendez-vous point */
302 smx_action_t other_action = SIMIX_fifo_get_comm(rdv->comm_fifo, SIMIX_COMM_RECEIVE, match_fun, data, this_action);
/* presumably the no-match branch: our own action becomes the one to set up */
305 other_action = this_action;
307 if (rdv->permanent_receiver!=NULL){
308 //this mailbox is for small messages, which have to be sent right now
309 other_action->state = SIMIX_READY;
310 other_action->comm.dst_proc=rdv->permanent_receiver;
311 other_action->comm.refcount++;
312 other_action->comm.rdv = rdv;
313 xbt_fifo_push(rdv->done_comm_fifo,other_action);
/* NOTE(review): same assignment as embedded line 312 above; redundant */
314 other_action->comm.rdv=rdv;
315 XBT_DEBUG("pushing a message into the permanent receive fifo %p, comm %p \n", rdv, &(other_action->comm));
318 SIMIX_rdv_push(rdv, this_action);
321 XBT_DEBUG("Receive already pushed\n");
/* a receive was waiting: our placeholder action is not needed */
323 SIMIX_comm_destroy(this_action);
324 --smx_total_comms; // this creation was a pure waste
326 other_action->state = SIMIX_READY;
327 other_action->comm.type = SIMIX_COMM_READY;
/* register the communication in the sender's list of pending comms */
330 xbt_fifo_push(src_proc->comms, other_action);
332 /* if the communication action is detached then decrease the refcount
333 * by one, so it will be eliminated by the receiver's destroy call */
335 other_action->comm.detached = 1;
336 other_action->comm.refcount--;
337 other_action->comm.clean_fun = clean_fun;
339 other_action->comm.clean_fun = NULL;
342 /* Setup the communication action */
343 other_action->comm.src_proc = src_proc;
344 other_action->comm.task_size = task_size;
345 other_action->comm.rate = rate;
346 other_action->comm.src_buff = src_buff;
347 other_action->comm.src_buff_size = src_buff_size;
348 other_action->comm.src_data = data;
350 other_action->comm.match_fun = match_fun;
/* NOTE(review): the condition guarding this assignment is not visible; presumably model-checker only -- confirm */
353 other_action->state = SIMIX_RUNNING;
357 SIMIX_comm_start(other_action);
358 return (detached ? NULL : other_action);
/* Posts an asynchronous receive on \a rdv on behalf of \a dst_proc.
 * \param dst_proc      receiving process
 * \param rdv           rendez-vous point to receive from
 * \param dst_buff      receiver's buffer
 * \param dst_buff_size in/out pointer to the size of \a dst_buff (see SIMIX_comm_copy_data)
 * \param match_fun     receiver's filter on candidate sends (may be NULL)
 * \param data          receiver's user data, stored in comm.dst_data
 * When the mailbox has a permanent receiver and done_comm_fifo is not
 * empty, a matching send is first searched there; otherwise the regular
 * comm_fifo is searched. Without any match the receive is queued in
 * comm_fifo.
 * NOTE(review): many if/else/brace lines and the return statement are not
 * visible in this listing (embedded 373, 378, 382, 384, 393, 395, 399-400,
 * 408-409, 413, 434-435, 441-443, ...), so the exact branching below cannot
 * be verified from here. */
361 smx_action_t SIMIX_comm_irecv(smx_process_t dst_proc, smx_rdv_t rdv,
362 void *dst_buff, size_t *dst_buff_size,
363 int (*match_fun)(void *, void *, smx_action_t), void *data)
365 XBT_DEBUG("recv from %p %p\n", rdv, rdv->comm_fifo);
366 smx_action_t this_action = SIMIX_comm_new(SIMIX_COMM_RECEIVE);
368 smx_action_t other_action;
369 //communication already done, get it inside the fifo of completed comms
370 //permanent receive v1
371 //int already_received=0;
372 if(rdv->permanent_receiver && xbt_fifo_size(rdv->done_comm_fifo)!=0){
374 XBT_DEBUG("We have a comm that has probably already been received, trying to match it, to skip the communication\n");
375 //find a match in the already received fifo
376 other_action = SIMIX_fifo_get_comm(rdv->done_comm_fifo, SIMIX_COMM_SEND, match_fun, data, this_action);
377 //if not found, assume the receiver came first, register it to the mailbox in the classical way
379 XBT_DEBUG("We have messages in the permanent receive list, but not the one we are looking for, pushing request into fifo\n");
380 other_action = this_action;
381 SIMIX_rdv_push(rdv, this_action);
/* a matching send was found: it is complete when it has a surf action with nothing left to transfer */
383 if(other_action->comm.surf_comm && SIMIX_comm_get_remains(other_action)==0.0)
385 XBT_DEBUG("comm %p has been already sent, and is finished, destroy it\n",&(other_action->comm));
386 other_action->state = SIMIX_DONE;
387 other_action->comm.type = SIMIX_COMM_DONE;
388 other_action->comm.rdv = NULL;
389 SIMIX_comm_destroy(this_action);
390 --smx_total_comms; // this creation was a pure waste
391 //already_received=1;
392 other_action->comm.refcount--;
394 XBT_DEBUG("Not yet finished, we have to wait %d\n", xbt_fifo_size(rdv->comm_fifo));
396 other_action->comm.refcount--;
397 SIMIX_comm_destroy(this_action);
398 --smx_total_comms; // this creation was a pure waste
401 /* Prepare an action describing us, so that it gets passed to the user-provided filter of other side */
403 /* Look for communication action matching our needs. We also provide a description of
404 * ourself so that the other side also gets a chance of choosing if it wants to match with us.
406 * If it is not found then push our communication into the rendez-vous point */
407 other_action = SIMIX_fifo_get_comm(rdv->comm_fifo, SIMIX_COMM_SEND, match_fun, data, this_action);
410 XBT_DEBUG("Receive pushed first %d\n", xbt_fifo_size(rdv->comm_fifo));
411 other_action = this_action;
412 SIMIX_rdv_push(rdv, this_action);
/* a send was waiting: our placeholder action is not needed */
414 SIMIX_comm_destroy(this_action);
415 --smx_total_comms; // this creation was a pure waste
416 other_action->state = SIMIX_READY;
417 other_action->comm.type = SIMIX_COMM_READY;
/* register the communication in the receiver's list of pending comms */
419 xbt_fifo_push(dst_proc->comms, other_action);
422 /* Setup communication action */
423 other_action->comm.dst_proc = dst_proc;
424 other_action->comm.dst_buff = dst_buff;
425 other_action->comm.dst_buff_size = dst_buff_size;
426 other_action->comm.dst_data = data;
428 other_action->comm.match_fun = match_fun;
431 /*if(already_received)//do the actual copy, because the first one after the comm didn't have all the info
432 SIMIX_comm_copy_data(other_action);*/
/* NOTE(review): the condition guarding this assignment is not visible; presumably model-checker only -- confirm */
436 other_action->state = SIMIX_RUNNING;
440 SIMIX_comm_start(other_action);
/* Handler of the wait simcall on \a action.
 * \param simcall the simcall being served; it is queued on the action
 * \param action  the communication waited for
 * \param timeout duration handed to the surf sleep action created below
 * \param idx     not used by the lines visible here
 * The simcall is attached to the action and the issuer is marked as waiting
 * on it. If the action is neither WAITING nor RUNNING it is finished
 * immediately; otherwise a surf sleep action is started on the issuer's host
 * and stored as the timeout of the issuer's side.
 * NOTE(review): the declaration of `sleep` and the branch containing the
 * forced SIMIX_DONE / timeout states (presumably the model-checker path) are
 * delimited by lines not visible in this listing (embedded 446-447, 449-450,
 * 456-458, 460, 463-465, 468, 470-471, 473-475). */
445 void SIMIX_pre_comm_wait(smx_simcall_t simcall, smx_action_t action, double timeout, int idx)
448 /* the simcall may be a wait, a send or a recv */
451 /* Associate this simcall to the wait action */
452 XBT_DEBUG("SIMIX_pre_comm_wait, %p", action);
454 xbt_fifo_push(action->simcalls, simcall);
455 simcall->issuer->waiting_action = action;
459 action->state = SIMIX_DONE;
461 /* If we reached this point, the wait simcall must have a timeout */
462 /* Otherwise it shouldn't be enabled and executed by the MC */
/* the timeout is attributed to the side the issuer is on */
466 if (action->comm.src_proc == simcall->issuer)
467 action->state = SIMIX_SRC_TIMEOUT;
469 action->state = SIMIX_DST_TIMEOUT;
472 SIMIX_comm_finish(action);
476 /* If the action has already finished, perform the error handling, */
477 /* otherwise set up a waiting timeout on the right side */
478 if (action->state != SIMIX_WAITING && action->state != SIMIX_RUNNING) {
479 SIMIX_comm_finish(action);
480 } else { /* if (timeout >= 0) { we need a surf sleep action even when there is no timeout, otherwise surf won't tell us when the host fails */
481 sleep = surf_workstation_model->extension.workstation.sleep(simcall->issuer->smx_host->host, timeout);
/* link the sleep action back to the communication */
482 surf_workstation_model->action_data_set(sleep, action);
484 if (simcall->issuer == action->comm.src_proc)
485 action->comm.src_timeout = sleep;
487 action->comm.dst_timeout = sleep;
/* Handler of the test simcall: stores in simcall->comm_test.result whether
 * the communication is over, and either finishes the action (which answers
 * the simcall) or answers the simcall directly.
 * Two variants are visible: the first considers the communication over as
 * soon as both endpoints are known and forces SIMIX_DONE; the second relies
 * on the action state being neither WAITING nor RUNNING.
 * NOTE(review): the lines selecting between the two variants and the else
 * lines are not visible in this listing (embedded 494-495, 501, 503-506,
 * 511); the first variant is presumably the model-checker path -- confirm. */
491 void SIMIX_pre_comm_test(smx_simcall_t simcall)
493 smx_action_t action = simcall->comm_test.comm;
496 simcall->comm_test.result = action->comm.src_proc && action->comm.dst_proc;
497 if(simcall->comm_test.result){
498 action->state = SIMIX_DONE;
499 xbt_fifo_push(action->simcalls, simcall);
500 SIMIX_comm_finish(action);
502 SIMIX_simcall_answer(simcall);
507 simcall->comm_test.result = (action->state != SIMIX_WAITING && action->state != SIMIX_RUNNING);
508 if (simcall->comm_test.result) {
509 xbt_fifo_push(action->simcalls, simcall);
510 SIMIX_comm_finish(action);
512 SIMIX_simcall_answer(simcall);
/* Handler of the testany simcall over the dynar simcall->comm_testany.comms.
 * The result defaults to -1. In the regular path, the first communication
 * that is neither WAITING nor RUNNING has its position stored as result and
 * is finished; the simcall is answered directly otherwise.
 * \param idx position of the communication to force to SIMIX_DONE in the
 *            first variant below
 * NOTE(review): the lines selecting between the idx-driven variant
 * (presumably the model-checker path) and the regular one, the local
 * declarations and the early exits are not visible in this listing (embedded
 * 517-519, 522-524, 526, 532-535, 541-543). */
516 void SIMIX_pre_comm_testany(smx_simcall_t simcall, int idx)
520 xbt_dynar_t actions = simcall->comm_testany.comms;
521 simcall->comm_testany.result = -1;
525 SIMIX_simcall_answer(simcall);
527 action = xbt_dynar_get_as(actions, idx, smx_action_t);
528 simcall->comm_testany.result = idx;
529 xbt_fifo_push(action->simcalls, simcall);
530 action->state = SIMIX_DONE;
531 SIMIX_comm_finish(action);
536 xbt_dynar_foreach(simcall->comm_testany.comms,cursor,action) {
537 if (action->state != SIMIX_WAITING && action->state != SIMIX_RUNNING) {
538 simcall->comm_testany.result = cursor;
539 xbt_fifo_push(action->simcalls, simcall);
540 SIMIX_comm_finish(action);
544 SIMIX_simcall_answer(simcall);
/* Handler of the waitany simcall over the dynar simcall->comm_waitany.comms.
 * In the regular path the simcall is attached to every communication of the
 * dynar, and a communication that is already neither WAITING nor RUNNING is
 * finished on the spot.
 * \param idx position of the communication to force to SIMIX_DONE in the
 *            first variant below
 * NOTE(review): the lines selecting between the idx-driven variant
 * (presumably the model-checker path) and the regular one, the declaration
 * of `action`, and whatever follows SIMIX_comm_finish() inside the loop are
 * not visible in this listing (embedded 548-549, 552-553, 559-561, 569-572). */
547 void SIMIX_pre_comm_waitany(smx_simcall_t simcall, int idx)
550 unsigned int cursor = 0;
551 xbt_dynar_t actions = simcall->comm_waitany.comms;
554 action = xbt_dynar_get_as(actions, idx, smx_action_t);
555 xbt_fifo_push(action->simcalls, simcall);
556 simcall->comm_waitany.result = idx;
557 action->state = SIMIX_DONE;
558 SIMIX_comm_finish(action);
562 xbt_dynar_foreach(actions, cursor, action){
563 /* associate this simcall to the action */
564 xbt_fifo_push(action->simcalls, simcall);
566 /* see if the action is already finished */
567 if (action->state != SIMIX_WAITING && action->state != SIMIX_RUNNING){
568 SIMIX_comm_finish(action);
/* Detaches a waitany \a simcall from every communication it was attached to
 * by SIMIX_pre_comm_waitany(): the simcall is removed from the simcalls fifo
 * of each action of simcall->comm_waitany.comms.
 * NOTE(review): the declaration of `action` is on a line not visible in this
 * listing (embedded 576). */
574 void SIMIX_waitany_remove_simcall_from_actions(smx_simcall_t simcall)
577 unsigned int cursor = 0;
578 xbt_dynar_t actions = simcall->comm_waitany.comms;
580 xbt_dynar_foreach(actions, cursor, action) {
581 xbt_fifo_remove(action->simcalls, simcall);
586 * \brief Starts the simulation of a communication action.
587 * \param action the communication action
/* Does nothing unless \a action is SIMIX_READY. Otherwise it creates the surf
 * communication between the sender's and the receiver's hosts, attaches the
 * SIMIX action to it and moves the action to SIMIX_RUNNING. A surf action
 * that is already failed turns the state into SIMIX_LINK_FAILURE and the
 * surf sub-actions are destroyed; a suspended endpoint gets the surf action
 * suspended right away.
 * NOTE(review): closing braces and else lines are not visible in this listing
 * (embedded 613-614, 620, 624, 627, 629-632); in particular, whether the
 * suspend check is skipped after a link failure (where surf_comm has been
 * reset to NULL) cannot be told from here. */
589 XBT_INLINE void SIMIX_comm_start(smx_action_t action)
591 /* If both the sender and the receiver are already there, start the communication */
592 if (action->state == SIMIX_READY) {
594 smx_host_t sender = action->comm.src_proc->smx_host;
595 smx_host_t receiver = action->comm.dst_proc->smx_host;
597 XBT_DEBUG("Starting communication %p from '%s' to '%s'", action,
598 SIMIX_host_get_name(sender), SIMIX_host_get_name(receiver));
600 action->comm.surf_comm = surf_workstation_model->extension.workstation.
601 communicate(sender->host, receiver->host, action->comm.task_size, action->comm.rate);
/* link the surf action back to the SIMIX communication */
603 surf_workstation_model->action_data_set(action->comm.surf_comm, action);
605 action->state = SIMIX_RUNNING;
607 /* If a link is failed, detect it immediately */
608 if (surf_workstation_model->action_state_get(action->comm.surf_comm) == SURF_ACTION_FAILED) {
609 XBT_DEBUG("Communication from '%s' to '%s' failed to start because of a link failure",
610 SIMIX_host_get_name(sender), SIMIX_host_get_name(receiver));
611 action->state = SIMIX_LINK_FAILURE;
612 SIMIX_comm_destroy_internal_actions(action);
615 /* If either process is suspended, create the action but stop its execution;
616 it will be restarted when the sender process resumes */
617 if (SIMIX_process_is_suspended(action->comm.src_proc) ||
618 SIMIX_process_is_suspended(action->comm.dst_proc)) {
619 /* FIXME: check what should happen with the action state */
621 if (SIMIX_process_is_suspended(action->comm.src_proc))
622 XBT_DEBUG("The communication is suspended on startup because src (%s:%s) were suspended since it initiated the communication",
623 SIMIX_host_get_name(action->comm.src_proc->smx_host), action->comm.src_proc->name);
625 XBT_DEBUG("The communication is suspended on startup because dst (%s:%s) were suspended since it initiated the communication",
626 SIMIX_host_get_name(action->comm.dst_proc->smx_host), action->comm.dst_proc->name);
628 surf_workstation_model->suspend(action->comm.surf_comm);
635 * \brief Answers the SIMIX simcalls associated to a communication action.
636 * \param action a finished communication action
/* Answers every simcall queued on \a action, one by one. For each simcall:
 * a waitany simcall is detached from the other communications it watched and
 * gets the position of this action as result; the action is removed from its
 * rendez-vous point if still queued; the action state is translated into
 * either a data copy or an exception posted to the issuer; the issuer is
 * flagged to die when its host is not SURF_RESOURCE_ON; finally the issuer
 * stops waiting on the action and the simcall is answered.
 * After the loop, SIMIX_comm_destroy() is called destroy_count times.
 * NOTE(review): the case labels of SIMIX_DONE / SIMIX_CANCELED / default, the
 * break and else lines, and the line updating destroy_count are not visible
 * in this listing (embedded 663-664, 667-668, 672-673, 683, 691, 710-711,
 * 715, 719-720, 742-744, ...); the visible lines never increment
 * destroy_count. */
638 void SIMIX_comm_finish(smx_action_t action)
640 unsigned int destroy_count = 0;
641 smx_simcall_t simcall;
643 while ((simcall = xbt_fifo_shift(action->simcalls))) {
645 /* If a waitany simcall is waiting for this action to finish, then remove
646 it from the other actions in the waitany list. Afterwards, get the
647 position of the actual action in the waitany dynar and
648 return it as the result of the simcall */
649 if (simcall->call == SIMCALL_COMM_WAITANY) {
650 SIMIX_waitany_remove_simcall_from_actions(simcall);
652 simcall->comm_waitany.result = xbt_dynar_search(simcall->comm_waitany.comms, &action);
655 /* If the action is still in a rendez-vous point then remove from it */
656 if (action->comm.rdv)
657 SIMIX_rdv_remove(action->comm.rdv, action);
659 XBT_DEBUG("SIMIX_comm_finish: action state = %d", (int)action->state);
661 /* Check out for errors */
662 switch (action->state) {
665 XBT_DEBUG("Communication %p complete!", action);
666 SIMIX_comm_copy_data(action);
669 case SIMIX_SRC_TIMEOUT:
670 SMX_EXCEPTION(simcall->issuer, timeout_error, 0,
671 "Communication timeouted because of sender");
674 case SIMIX_DST_TIMEOUT:
675 SMX_EXCEPTION(simcall->issuer, timeout_error, 0,
676 "Communication timeouted because of receiver");
/* host failures: the process on the failed side is marked to die, the line visible after it reports a failed peer */
679 case SIMIX_SRC_HOST_FAILURE:
680 if (simcall->issuer == action->comm.src_proc)
681 simcall->issuer->context->iwannadie = 1;
682 // SMX_EXCEPTION(simcall->issuer, host_error, 0, "Host failed");
684 SMX_EXCEPTION(simcall->issuer, network_error, 0, "Remote peer failed");
687 case SIMIX_DST_HOST_FAILURE:
688 if (simcall->issuer == action->comm.dst_proc)
689 simcall->issuer->context->iwannadie = 1;
690 // SMX_EXCEPTION(simcall->issuer, host_error, 0, "Host failed");
692 SMX_EXCEPTION(simcall->issuer, network_error, 0, "Remote peer failed");
695 case SIMIX_LINK_FAILURE:
696 XBT_DEBUG("Link failure in action %p between '%s' and '%s': posting an exception to the issuer: %s (%p) detached:%d",
698 action->comm.src_proc ? action->comm.src_proc->smx_host->name : NULL,
699 action->comm.dst_proc ? action->comm.dst_proc->smx_host->name : NULL,
700 simcall->issuer->name, simcall->issuer, action->comm.detached);
701 if (action->comm.src_proc == simcall->issuer) {
702 XBT_DEBUG("I'm source");
703 } else if (action->comm.dst_proc == simcall->issuer) {
704 XBT_DEBUG("I'm dest");
706 XBT_DEBUG("I'm neither source nor dest");
708 SMX_EXCEPTION(simcall->issuer, network_error, 0, "Link failure");
/* presumably the SIMIX_CANCELED case: the message names the side that did not issue this simcall */
712 if (simcall->issuer == action->comm.dst_proc)
713 SMX_EXCEPTION(simcall->issuer, cancel_error, 0,
714 "Communication canceled by the sender");
716 SMX_EXCEPTION(simcall->issuer, cancel_error, 0,
717 "Communication canceled by the receiver");
721 xbt_die("Unexpected action state in SIMIX_comm_finish: %d", (int)action->state);
724 /* if there is an exception during a waitany or a testany, indicate the position of the failed communication */
725 if (simcall->issuer->doexception) {
726 if (simcall->call == SIMCALL_COMM_WAITANY) {
727 simcall->issuer->running_ctx->exception.value = xbt_dynar_search(simcall->comm_waitany.comms, &action);
729 else if (simcall->call == SIMCALL_COMM_TESTANY) {
730 simcall->issuer->running_ctx->exception.value = xbt_dynar_search(simcall->comm_testany.comms, &action);
/* an issuer whose host is not up is marked to die */
734 if (surf_workstation_model->extension.
735 workstation.get_state(simcall->issuer->smx_host->host) != SURF_RESOURCE_ON) {
736 simcall->issuer->context->iwannadie = 1;
739 simcall->issuer->waiting_action = NULL;
740 xbt_fifo_remove(simcall->issuer->comms, action);
741 SIMIX_simcall_answer(simcall);
745 while (destroy_count-- > 0)
746 SIMIX_comm_destroy(action);
750 * \brief This function is called when a Surf communication action is finished.
751 * \param action the corresponding Simix communication
/* Derives the final state of \a action from its surf sub-actions, destroys
 * those sub-actions, removes the communication from the pending lists of
 * both endpoints, and answers the attached simcalls if there are any.
 * The tests are ordered: a finished timeout (sender first, then receiver)
 * wins over a failed timeout action (reported as host failure), which wins
 * over a failed transfer (link failure); SIMIX_DONE is the remaining state.
 * NOTE(review): the `else` line before the SIMIX_DONE assignment and the
 * closing braces are not visible in this listing (embedded 772, 774, ...). */
753 void SIMIX_post_comm(smx_action_t action)
755 /* Update action state */
756 if (action->comm.src_timeout &&
757 surf_workstation_model->action_state_get(action->comm.src_timeout) == SURF_ACTION_DONE)
758 action->state = SIMIX_SRC_TIMEOUT;
759 else if (action->comm.dst_timeout &&
760 surf_workstation_model->action_state_get(action->comm.dst_timeout) == SURF_ACTION_DONE)
761 action->state = SIMIX_DST_TIMEOUT;
762 else if (action->comm.src_timeout &&
763 surf_workstation_model->action_state_get(action->comm.src_timeout) == SURF_ACTION_FAILED)
764 action->state = SIMIX_SRC_HOST_FAILURE;
765 else if (action->comm.dst_timeout &&
766 surf_workstation_model->action_state_get(action->comm.dst_timeout) == SURF_ACTION_FAILED)
767 action->state = SIMIX_DST_HOST_FAILURE;
768 else if (action->comm.surf_comm &&
769 surf_workstation_model->action_state_get(action->comm.surf_comm) == SURF_ACTION_FAILED) {
770 XBT_DEBUG("Puta madre. Surf says that the link broke");
771 action->state = SIMIX_LINK_FAILURE;
773 action->state = SIMIX_DONE;
775 XBT_DEBUG("SIMIX_post_comm: comm %p, state %d, src_proc %p, dst_proc %p, detached: %d",
776 action, (int)action->state, action->comm.src_proc, action->comm.dst_proc, action->comm.detached);
778 /* destroy the surf actions associated with the Simix communication */
779 SIMIX_comm_destroy_internal_actions(action);
781 /* remove the communication action from the list of pending communications
782 * of both processes (if they still exist) */
783 if (action->comm.src_proc) {
784 xbt_fifo_remove(action->comm.src_proc->comms, action);
786 if (action->comm.dst_proc) {
787 xbt_fifo_remove(action->comm.dst_proc->comms, action);
790 /* if there are simcalls associated with the action, then answer them */
791 if (xbt_fifo_size(action->simcalls)) {
792 SIMIX_comm_finish(action);
/* Cancels \a action. A communication still SIMIX_WAITING is taken out of its
 * rendez-vous point and marked SIMIX_CANCELED; a READY or RUNNING one has
 * its surf transfer cancelled, unless the model checker is enabled.
 * Any other state is left untouched by the visible lines. */
796 void SIMIX_comm_cancel(smx_action_t action)
798 /* if the action is a waiting state means that it is still in a rdv */
799 /* so remove from it and delete it */
800 if (action->state == SIMIX_WAITING) {
801 SIMIX_rdv_remove(action->comm.rdv, action);
802 action->state = SIMIX_CANCELED;
804 else if (!MC_IS_ENABLED /* when running the MC there are no surf actions */
805 && (action->state == SIMIX_READY || action->state == SIMIX_RUNNING)) {
807 surf_workstation_model->action_cancel(action->comm.surf_comm);
/* Suspends the surf transfer of \a action when it already exists; the
 * timeout actions are not touched. */
811 void SIMIX_comm_suspend(smx_action_t action)
813 /*FIXME: shall we suspend also the timeout actions? */
814 if (action->comm.surf_comm)
815 surf_workstation_model->suspend(action->comm.surf_comm);
816 /* in the other case, the action will be suspended on creation, in SIMIX_comm_start() */
/* Resumes the surf transfer of \a action when it exists; the timeout actions
 * are not touched. */
819 void SIMIX_comm_resume(smx_action_t action)
821 /*FIXME: check what happens with the timeouts */
822 if (action->comm.surf_comm)
823 surf_workstation_model->resume(action->comm.surf_comm);
824 /* in the other case, the action was not really suspended yet, see SIMIX_comm_suspend() and SIMIX_comm_start() */
828 /************* Action Getters **************/
831 * \brief get the amount remaining from the communication
832 * \param action The communication
/* Computes the remaining amount of \a action from its state: one branch asks
 * surf for the remains of the transfer, the two others yield 0.
 * NOTE(review): the case labels, the declaration of `remains`, any guard
 * before the switch and the return statement are not visible in this listing
 * (embedded 835-841, 843-844, 846-849, 851-853, 855-858), so which states
 * map to which branch cannot be told from here. */
834 double SIMIX_comm_get_remains(smx_action_t action)
842 switch (action->state) {
845 remains = surf_workstation_model->get_remains(action->comm.surf_comm);
850 remains = 0; /*FIXME: check what should be returned */
854 remains = 0; /*FIXME: is this correct? */
/* Plain getter for the current state of \a action. */
860 e_smx_state_t SIMIX_comm_get_state(smx_action_t action)
862 return action->state;
866 * \brief Return the user data associated to the sender of the communication
867 * \param action The communication
868 * \return the user data
/* Plain getter for comm.src_data, the value SIMIX_comm_isend() stores from
 * its `data` argument. */
870 void* SIMIX_comm_get_src_data(smx_action_t action)
872 return action->comm.src_data;
876 * \brief Return the user data associated to the receiver of the communication
877 * \param action The communication
878 * \return the user data
/* Plain getter for comm.dst_data, the value SIMIX_comm_irecv() stores from
 * its `data` argument. */
880 void* SIMIX_comm_get_dst_data(smx_action_t action)
882 return action->comm.dst_data;
/* Returns the sending process of \a action; other code in this file tests
 * this field against NULL before using it, so callers should too. */
885 smx_process_t SIMIX_comm_get_src_proc(smx_action_t action)
887 return action->comm.src_proc;
/* Returns the receiving process of \a action; other code in this file tests
 * this field against NULL before using it, so callers should too. */
890 smx_process_t SIMIX_comm_get_dst_proc(smx_action_t action)
892 return action->comm.dst_proc;
895 #ifdef HAVE_LATENCY_BOUND_TRACKING
897 * \brief verify if communication is latency bounded
898 * \param comm The communication
/* Refreshes action->latency_limited from surf while the transfer action
 * still exists, then returns the (possibly cached) value; SIMIX_comm_new()
 * initialises it to -1 as "unknown".
 * NOTE(review): embedded lines 901-904 are not visible in this listing; a
 * guard on \a action may sit there -- confirm. */
900 XBT_INLINE int SIMIX_comm_is_latency_bounded(smx_action_t action)
905 if (action->comm.surf_comm){
906 XBT_DEBUG("Getting latency limited for surf_action (%p)", action->comm.surf_comm);
907 action->latency_limited = surf_workstation_model->get_latency_limited(action->comm.surf_comm);
908 XBT_DEBUG("Action limited is %d", action->latency_limited);
910 return action->latency_limited;
914 /******************************************************************************/
915 /* SIMIX_comm_copy_data callbacks */
916 /******************************************************************************/
/* Function used by SIMIX_comm_copy_data() to move the payload; defaults to
 * the pointer-copy callback and can be replaced through
 * SIMIX_comm_set_copy_data_callback(). */
917 static void (*SIMIX_comm_copy_data_callback) (smx_action_t, void*, size_t) =
918 &SIMIX_comm_copy_pointer_callback;
/* Installs \a callback as the function used to copy the payload of every
 * subsequent communication (it replaces the file-wide callback pointer).
 * NOTE(review): the return type is on a line not visible in this listing
 * (embedded 920). */
921 SIMIX_comm_set_copy_data_callback(void (*callback) (smx_action_t, void*, size_t))
923 SIMIX_comm_copy_data_callback = callback;
/* Copy callback that transfers the payload by reference: the address \a buff
 * itself is written into the receiver's buffer, which is therefore treated
 * as a `void *` slot. Asserts that \a buff_size equals sizeof(void *). */
926 void SIMIX_comm_copy_pointer_callback(smx_action_t comm, void* buff, size_t buff_size)
928 xbt_assert((buff_size == sizeof(void *)),
929 "Cannot copy %zu bytes: must be sizeof(void*)", buff_size);
930 *(void **) (comm->comm.dst_buff) = buff;
/* Copy callback that transfers the payload by value: \a buff_size bytes are
 * copied from \a buff into the receiver's buffer. For a detached send the
 * source buffer pointer of the communication is then reset to NULL.
 * NOTE(review): embedded line 938 is not visible in this listing; presumably
 * it frees the duplicated source buffer before the pointer is cleared --
 * confirm. */
933 void SIMIX_comm_copy_buffer_callback(smx_action_t comm, void* buff, size_t buff_size)
935 XBT_DEBUG("Copy the data over");
936 memcpy(comm->comm.dst_buff, buff, buff_size);
937 if (comm->comm.detached) { // if this is a detached send, the source buffer was duplicated by SMPI sender to make the original buffer available to the application ASAP
939 comm->comm.src_buff = NULL;
945 * \brief Copy the communication data from the sender's buffer to the receiver's one
946 * \param comm The communication
/* Transfers the payload of \a comm through the installed copy callback, at
 * most once per communication (the `copied` flag is set afterwards).
 * Candidates for skipping are a missing source buffer, a missing destination
 * buffer, or a copy already done. When the receiver supplied a size pointer,
 * the amount is capped to that size and the pointed value is updated to the
 * amount actually copied.
 * NOTE(review): the line following the skip test (embedded 953, presumably a
 * return) and two arguments of the debug call (embedded 956, 958) are not
 * visible in this listing. */
948 void SIMIX_comm_copy_data(smx_action_t comm)
950 size_t buff_size = comm->comm.src_buff_size;
951 /* If there is no data to be copy then return */
952 if (!comm->comm.src_buff || !comm->comm.dst_buff || comm->comm.copied)
955 XBT_DEBUG("Copying comm %p data from %s (%p) -> %s (%p) (%zu bytes)",
957 comm->comm.src_proc ? comm->comm.src_proc->smx_host->name : "a finished process",
959 comm->comm.dst_proc ? comm->comm.dst_proc->smx_host->name : "a finished process",
960 comm->comm.dst_buff, buff_size);
962 /* Copy at most dst_buff_size bytes of the message to receiver's buffer */
963 if (comm->comm.dst_buff_size)
964 buff_size = MIN(buff_size, *(comm->comm.dst_buff_size));
966 /* Update the receiver's buffer size to the copied amount */
967 if (comm->comm.dst_buff_size)
968 *comm->comm.dst_buff_size = buff_size;
971 SIMIX_comm_copy_data_callback (comm, comm->comm.src_buff, buff_size);
973 /* Set the copied flag so we copy data only once */
974 /* (this function might be called from both communication ends) */
975 comm->comm.copied = 1;