1 /* Copyright (c) 2009, 2010. The SimGrid Team.
2 * All rights reserved. */
4 /* This program is free software; you can redistribute it and/or modify it
5 * under the terms of the license (GNU LGPL) which comes with this package. */
/* Log category of this file: simix_network, declared under simix. */
12 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(simix_network, simix,
13 "Logging specific to SIMIX (network)");
/* Dictionary of the rendez-vous points. SIMIX_rdv_create() stores rdvs in it
 * under their name; it is created by SIMIX_network_init() and released by
 * SIMIX_network_exit(). */
15 static xbt_dict_t rdv_points = NULL;
/* Global communication counter.
 * NOTE(review): none of the lines visible in this listing updates it;
 * confirm where it is incremented. */
16 unsigned long int smx_total_comms = 0;
/* Forward declarations of helpers that are defined further down in this file. */
18 static void SIMIX_waitany_req_remove_from_actions(smx_req_t req);
19 static void SIMIX_comm_copy_data(smx_action_t comm);
20 static smx_action_t SIMIX_comm_new(e_smx_comm_type_t type);
21 static XBT_INLINE void SIMIX_rdv_push(smx_rdv_t rdv, smx_action_t comm);
22 static smx_action_t SIMIX_rdv_get_request(smx_rdv_t rdv, e_smx_comm_type_t type,
23 int (*match_fun)(void *, void *), void *);
24 static void SIMIX_rdv_free(void *data);
/**
 * \brief Creates the dictionary holding the rendez-vous points.
 *
 * SIMIX_rdv_free is handed to the dictionary constructor; presumably it is
 * the function the dictionary calls to release a stored rdv (confirm against
 * the xbt_dict documentation).
 */
26 void SIMIX_network_init(void)
28 rdv_points = xbt_dict_new_homogeneous(SIMIX_rdv_free);
/**
 * \brief Releases the dictionary of rendez-vous points created by
 *        SIMIX_network_init().
 */
31 void SIMIX_network_exit(void)
33 xbt_dict_free(&rdv_points);
36 /******************************************************************************/
37 /* Rendez-Vous Points */
38 /******************************************************************************/
/**
 * \brief Looks up the rendez-vous point called \a name and allocates one.
 * \param name The name of the rendez-vous point; when NULL no lookup is done
 *             and the new rdv gets a NULL name
 *
 * A new rdv gets its own copy of the name and an empty communication fifo,
 * and is stored in rdv_points under its name.
 *
 * NOTE(review): the statements between the lookup and the allocation, the
 * guard in front of xbt_dict_set() and the return statement are not visible
 * in this listing. Presumably the allocation only happens when the lookup
 * fails, only named rdvs are registered, and the rdv is returned; confirm.
 */
40 smx_rdv_t SIMIX_rdv_create(const char *name)
42 /* two processes may have pushed the same rdv_create request at the same time */
43 smx_rdv_t rdv = name ? xbt_dict_get_or_null(rdv_points, name) : NULL;
46 rdv = xbt_new0(s_smx_rvpoint_t, 1);
47 rdv->name = name ? xbt_strdup(name) : NULL;
48 rdv->comm_fifo = xbt_fifo_new();
51 xbt_dict_set(rdv_points, rdv->name, rdv, NULL);
/**
 * \brief Removes a rendez-vous point from the rdv_points dictionary.
 * \param rdv The rendez-vous point to remove (looked up by its name)
 *
 * NOTE(review): the rdv is not freed here directly; presumably the
 * dictionary releases it through SIMIX_rdv_free on removal. SIMIX_rdv_create
 * can leave the name NULL, and any guard for that case is not visible in
 * this listing; confirm both points.
 */
56 void SIMIX_rdv_destroy(smx_rdv_t rdv)
59 xbt_dict_remove(rdv_points, rdv->name);
/**
 * \brief Releases the resources of a rendez-vous point.
 * \param data The rendez-vous point, passed as an untyped pointer because
 *             this function is given to xbt_dict_new_homogeneous()
 *
 * NOTE(review): only the release of the communication fifo is visible in
 * this listing; the release of the name and of the structure itself is
 * presumably on hidden lines. Confirm.
 */
62 void SIMIX_rdv_free(void *data)
64 smx_rdv_t rdv = (smx_rdv_t) data;
66 xbt_fifo_free(rdv->comm_fifo);
/**
 * \brief Retrieves a rendez-vous point from its name.
 * \param name The name to look up in rdv_points
 * \return The rendez-vous point stored under \a name, or NULL if there is none
 */
70 smx_rdv_t SIMIX_rdv_get_by_name(const char *name)
72 return xbt_dict_get_or_null(rdv_points, name);
/**
 * \brief Inspects the communications queued in a rendez-vous point whose
 *        sender runs on a given host.
 * \param rdv The rendez-vous point
 * \param host The host compared against the host of each sender process
 *
 * NOTE(review): the counter declaration, its increment and the return
 * statement are not visible in this listing; presumably the function returns
 * the number of matching communications. The loop dereferences src_proc of
 * every queued comm, so it assumes each of them has a sender set. Confirm
 * this for queued receive requests.
 */
75 int SIMIX_rdv_comm_count_by_host(smx_rdv_t rdv, smx_host_t host)
77 smx_action_t comm = NULL;
78 xbt_fifo_item_t item = NULL;
81 xbt_fifo_foreach(rdv->comm_fifo, item, comm, smx_action_t) {
82 if (comm->comm.src_proc->smx_host == host)
/**
 * \brief Returns the content of the first item of the communication fifo of
 *        a rendez-vous point, without removing it.
 * \param rdv The rendez-vous point
 */
89 smx_action_t SIMIX_rdv_get_head(smx_rdv_t rdv)
91 return xbt_fifo_get_item_content(xbt_fifo_get_first_item(rdv->comm_fifo));
95 * \brief Push a communication request into a rendez-vous point
96 * \param rdv The rendez-vous point
97 * \param comm The communication request
/* Queues comm in the fifo of rdv and records rdv in the communication, so
 * that the communication knows which rendez-vous point holds it. */
99 static XBT_INLINE void SIMIX_rdv_push(smx_rdv_t rdv, smx_action_t comm)
101 xbt_fifo_push(rdv->comm_fifo, comm);
102 comm->comm.rdv = rdv;
106 * \brief Remove a communication request from a rendez-vous point
107 * \param rdv The rendez-vous point
108 * \param comm The communication request
/* Takes comm out of the fifo of rdv and clears the back-reference that
 * SIMIX_rdv_push() stored in the communication. */
110 XBT_INLINE void SIMIX_rdv_remove(smx_rdv_t rdv, smx_action_t comm)
112 xbt_fifo_remove(rdv->comm_fifo, comm);
113 comm->comm.rdv = NULL;
117 * \brief Wrapper to SIMIX_rdv_get_request
/* Looks for a queued send request (SIMIX_COMM_SEND) in rdv that satisfies
 * match_fun. Because it delegates to SIMIX_rdv_get_request(), a match is
 * also removed from the rendez-vous point and gets its refcount increased. */
119 smx_action_t SIMIX_comm_get_send_match(smx_rdv_t rdv, int (*match_fun)(void*, void*), void* data) {
120 return SIMIX_rdv_get_request(rdv, SIMIX_COMM_SEND, match_fun, data);
124 * \brief Checks if there is a communication action queued in a rendez-vous matching our needs
125 * \param type The type of communication we are looking for (comm_send, comm_recv)
126 * \return The communication action if found, NULL otherwise
/* Scans the fifo of rdv for a queued communication of the requested type.
 *
 * rdv       the rendez-vous point to scan
 * type      the type of queued communication that is wanted
 * match_fun optional filter; when not NULL it is called with data as first
 *           argument and the user data of the queued communication as
 *           second argument, and must return non-zero to accept it
 * data      user data of the caller, handed to match_fun
 *
 * NOTE(review): the declaration of the loop variable and the return
 * statements are not visible in this listing; presumably the matching
 * action is returned, and NULL when nothing matches. Confirm.
 */
128 smx_action_t SIMIX_rdv_get_request(smx_rdv_t rdv, e_smx_comm_type_t type,
129 int (*match_fun)(void *, void *), void *data)
131 // FIXME rewrite this function by using SIMIX_rdv_has_send/recv_match
133 xbt_fifo_item_t item;
134 void* req_data = NULL;
136 xbt_fifo_foreach(rdv->comm_fifo, item, action, smx_action_t) {
/* pick the user data of the side that queued this communication */
137 if (action->comm.type == SIMIX_COMM_SEND) {
138 req_data = action->comm.src_data;
139 } else if (action->comm.type == SIMIX_COMM_RECEIVE) {
140 req_data = action->comm.dst_data;
142 if (action->comm.type == type && (!match_fun || match_fun(data, req_data))) {
143 XBT_DEBUG("Found a matching communication action %p", action);
/* take the match out of the rdv; the caller gets its own reference */
144 xbt_fifo_remove_item(rdv->comm_fifo, item);
145 xbt_fifo_free_item(item);
146 action->comm.refcount++;
147 action->comm.rdv = NULL;
150 XBT_DEBUG("Sorry, communication action %p does not match our needs:"
151 " its type is %d but we are looking for a comm of type %d",
152 action, action->comm.type, type);
154 XBT_DEBUG("No matching communication action found");
159 * \brief Checks if there is a send communication action
160 * queued in a rendez-vous matching our needs.
161 * \return 1 if found, 0 otherwise
/* Read-only counterpart of SIMIX_comm_get_send_match(): the fifo of rdv is
 * scanned for a SIMIX_COMM_SEND request accepted by match_fun (called with
 * data and the src_data of the queued request; a NULL match_fun accepts
 * everything). No visible line modifies the fifo or the communication.
 *
 * NOTE(review): the declaration of the loop variable and the return
 * statements are not visible in this listing. */
163 int SIMIX_comm_has_send_match(smx_rdv_t rdv, int (*match_fun)(void*, void*), void* data) {
166 xbt_fifo_item_t item;
168 xbt_fifo_foreach(rdv->comm_fifo, item, action, smx_action_t){
169 if (action->comm.type == SIMIX_COMM_SEND
170 && (!match_fun || match_fun(data, action->comm.src_data))) {
171 XBT_DEBUG("Found a matching communication action %p", action);
175 XBT_DEBUG("No matching communication action found");
180 * \brief Checks if there is a recv communication action
181 * queued in a rendez-vous matching our needs.
182 * \return 1 if found, 0 otherwise
/* Same scan as SIMIX_comm_has_send_match(), but for SIMIX_COMM_RECEIVE
 * requests: match_fun (when not NULL) is called with data and the dst_data
 * of the queued request. No visible line modifies the fifo or the
 * communication.
 *
 * NOTE(review): the declaration of the loop variable and the return
 * statements are not visible in this listing. */
184 int SIMIX_comm_has_recv_match(smx_rdv_t rdv, int (*match_fun)(void*, void*), void* data) {
187 xbt_fifo_item_t item;
189 xbt_fifo_foreach(rdv->comm_fifo, item, action, smx_action_t) {
190 if (action->comm.type == SIMIX_COMM_RECEIVE
191 && (!match_fun || match_fun(data, action->comm.dst_data))) {
192 XBT_DEBUG("Found a matching communication action %p", action);
196 XBT_DEBUG("No matching communication action found");
200 /******************************************************************************/
201 /* Communication Actions */
202 /******************************************************************************/
205 * \brief Creates a new communication action
206 * \param type The type of request (comm_send, comm_recv)
207 * \return The new communication action
/* Takes an action from the action mallocator of simix_global and sets it up
 * as a communication of the given type: state SIMIX_WAITING and a refcount
 * of 1 (the reference of the creator).
 *
 * NOTE(review): several lines are not visible in this listing (declaration
 * of act, the preprocessor lines around latency_limited and category, the
 * return statement). Fields not assigned on visible lines keep whatever the
 * mallocator provides; confirm that they are reset elsewhere. */
209 smx_action_t SIMIX_comm_new(e_smx_comm_type_t type)
213 /* alloc structures */
214 act = xbt_mallocator_get(simix_global->action_mallocator);
216 act->type = SIMIX_ACTION_COMMUNICATE;
217 act->state = SIMIX_WAITING;
219 /* set communication */
220 act->comm.type = type;
221 act->comm.refcount = 1;
223 #ifdef HAVE_LATENCY_BOUND_TRACKING
224 //initialize with unknown value
225 act->latency_limited = -1;
229 act->category = NULL;
232 XBT_DEBUG("Create communicate action %p", act);
239 * \brief Destroy a communicate action
240 * \param action The communicate action to be destroyed
/* Drops one reference to a communication action and gives it back to the
 * action mallocator.
 *
 * Calling this on an action whose refcount is already 0 or less is treated
 * as a bug: a backtrace is displayed and the program dies.
 *
 * NOTE(review): the statement following the refcount test on original line
 * 253 is not visible in this listing; presumably the function returns there
 * while references remain, so that the cleanup below only runs for the last
 * reference. Confirm. */
242 void SIMIX_comm_destroy(smx_action_t action)
244 XBT_DEBUG("Destroy action %p (refcount: %d), state: %d",
245 action, action->comm.refcount, action->state);
247 if (action->comm.refcount <= 0) {
248 xbt_backtrace_display_current();
249 xbt_die("the refcount of comm %p is already 0 before decreasing it. "
250 "That's a bug!", action);
252 action->comm.refcount--;
253 if (action->comm.refcount > 0)
255 XBT_DEBUG("Really free communication %p; refcount is now %d", action,
256 action->comm.refcount);
258 #ifdef HAVE_LATENCY_BOUND_TRACKING
/* record the value while the surf action may still exist: it is released
 * by SIMIX_comm_destroy_internal_actions() below */
259 action->latency_limited = SIMIX_comm_is_latency_bounded( action ) ;
262 xbt_free(action->name);
263 SIMIX_comm_destroy_internal_actions(action);
265 if (action->comm.detached && action->state != SIMIX_DONE) {
266 /* the communication has failed and was detached:
267 * we have to free the buffer */
/* NOTE(review): clean_fun is called without a NULL check; SIMIX_comm_isend
 * stores whatever callback its caller provided. Confirm detached sends
 * always provide one. */
268 action->comm.clean_fun(action->comm.src_buff);
269 action->comm.src_buff = NULL;
272 xbt_mallocator_release(simix_global->action_mallocator, action);
/**
 * \brief Releases the surf actions attached to a communication.
 * \param action The communication action
 *
 * The network action (surf_comm) and the two timeout actions (src_timeout,
 * dst_timeout) are each unreferenced through their model when set, and the
 * corresponding field is reset to NULL, so calling this function again is
 * harmless.
 */
275 void SIMIX_comm_destroy_internal_actions(smx_action_t action)
277 if (action->comm.surf_comm){
278 #ifdef HAVE_LATENCY_BOUND_TRACKING
/* keep the latency information before the surf action goes away */
279 action->latency_limited = SIMIX_comm_is_latency_bounded(action);
281 action->comm.surf_comm->model_type->action_unref(action->comm.surf_comm);
282 action->comm.surf_comm = NULL;
285 if (action->comm.src_timeout){
286 action->comm.src_timeout->model_type->action_unref(action->comm.src_timeout);
287 action->comm.src_timeout = NULL;
290 if (action->comm.dst_timeout){
291 action->comm.dst_timeout->model_type->action_unref(action->comm.dst_timeout);
292 action->comm.dst_timeout = NULL;
/**
 * \brief Posts an asynchronous send on a rendez-vous point.
 * \param src_proc The sending process; the action is added to its comms list
 * \param rdv The rendez-vous point
 * \param task_size Stored in the action; given to surf by SIMIX_comm_start()
 * \param rate Stored in the action; given to surf by SIMIX_comm_start()
 * \param src_buff The buffer of the sender
 * \param src_buff_size The size of \a src_buff
 * \param match_fun Optional filter used to select a queued receive request
 * \param clean_fun Kept in the action for detached sends only; called by
 *        SIMIX_comm_destroy() on \a src_buff when a detached comm is
 *        destroyed without being SIMIX_DONE
 * \return NULL for a detached send, the communication action otherwise
 *
 * NOTE(review): the last parameters (data, detached), the declaration of
 * action and every if/else line are not visible in this listing. Presumably
 * a new SIMIX_COMM_SEND action is created and queued only when no matching
 * receive was found, and the READY assignments belong to the other branch.
 * Confirm against the full source.
 */
296 smx_action_t SIMIX_comm_isend(smx_process_t src_proc, smx_rdv_t rdv,
297 double task_size, double rate,
298 void *src_buff, size_t src_buff_size,
299 int (*match_fun)(void *, void *),
300 void (*clean_fun)(void *), // used to free the action in case of problem after a detached send
306 /* Look for communication request matching our needs.
307 If it is not found then create it and push it into the rendez-vous point */
308 action = SIMIX_rdv_get_request(rdv, SIMIX_COMM_RECEIVE, match_fun, data);
311 action = SIMIX_comm_new(SIMIX_COMM_SEND);
312 SIMIX_rdv_push(rdv, action);
314 action->state = SIMIX_READY;
315 action->comm.type = SIMIX_COMM_READY;
317 xbt_fifo_push(src_proc->comms, action);
319 /* if the communication action is detached then decrease the refcount
320 * by one, so it will be eliminated by the receiver's destroy call */
322 action->comm.detached = 1;
323 action->comm.refcount--;
324 action->comm.clean_fun = clean_fun;
326 action->comm.clean_fun = NULL;
329 /* Setup the communication request */
330 action->comm.src_proc = src_proc;
331 action->comm.task_size = task_size;
332 action->comm.rate = rate;
333 action->comm.src_buff = src_buff;
334 action->comm.src_buff_size = src_buff_size;
335 action->comm.src_data = data;
/* NOTE(review): the condition guarding the next assignment is hidden;
 * presumably it only applies when the model checker is enabled. */
338 action->state = SIMIX_RUNNING;
342 SIMIX_comm_start(action);
343 return (detached ? NULL : action);
/**
 * \brief Posts an asynchronous receive on a rendez-vous point.
 * \param dst_proc The receiving process; the action is added to its comms list
 * \param rdv The rendez-vous point
 * \param dst_buff The buffer of the receiver
 * \param dst_buff_size Pointer to the size of \a dst_buff; stored in the
 *        action and updated by SIMIX_comm_copy_data() with the copied amount
 * \param match_fun Optional filter used to select a queued send request
 * \param data User data of the receiver, stored as dst_data
 *
 * Mirror image of SIMIX_comm_isend(): a queued SIMIX_COMM_SEND request is
 * looked for, and the receiver side of the action is filled in.
 *
 * NOTE(review): the declaration of action, every if/else line and the
 * return statement are not visible in this listing; presumably a new
 * SIMIX_COMM_RECEIVE action is created and queued only when no matching
 * send was found, and the action is returned. Confirm.
 */
346 smx_action_t SIMIX_comm_irecv(smx_process_t dst_proc, smx_rdv_t rdv,
347 void *dst_buff, size_t *dst_buff_size,
348 int (*match_fun)(void *, void *), void *data)
352 /* Look for communication request matching our needs.
353 * If it is not found then create it and push it into the rendez-vous point
355 action = SIMIX_rdv_get_request(rdv, SIMIX_COMM_SEND, match_fun, data);
358 action = SIMIX_comm_new(SIMIX_COMM_RECEIVE);
359 SIMIX_rdv_push(rdv, action);
361 action->state = SIMIX_READY;
362 action->comm.type = SIMIX_COMM_READY;
364 xbt_fifo_push(dst_proc->comms, action);
366 /* Setup communication request */
367 action->comm.dst_proc = dst_proc;
368 action->comm.dst_buff = dst_buff;
369 action->comm.dst_buff_size = dst_buff_size;
370 action->comm.dst_data = data;
/* NOTE(review): the condition guarding the next assignment is hidden;
 * presumably it only applies when the model checker is enabled. */
373 action->state = SIMIX_RUNNING;
377 SIMIX_comm_start(action);
/**
 * \brief Handles a request that waits for a communication.
 * \param req The request; it is attached to the action and the action
 *        becomes the waiting_action of the issuer
 * \param action The communication waited for
 * \param timeout Duration given to the surf sleep action created for the
 *        issuer when the communication is not finished yet
 * \param idx NOTE(review): only used on lines hidden from this listing;
 *        presumably the transition chosen by the model checker. Confirm.
 *
 * NOTE(review): the conditions selecting the first part of the body
 * (direct state assignments followed by SIMIX_comm_finish) are hidden;
 * the surrounding comments suggest it is the model-checker path.
 */
381 void SIMIX_pre_comm_wait(smx_req_t req, smx_action_t action, double timeout, int idx)
384 /* the request may be a wait, a send or a recv */
387 /* Associate this request to the action */
388 xbt_fifo_push(action->request_list, req);
389 req->issuer->waiting_action = action;
393 action->state = SIMIX_DONE;
395 /* If we reached this point, the wait request must have a timeout */
396 /* Otherwise it shouldn't be enabled and executed by the MC */
/* the timeout is attributed to the side that issued the wait */
400 if (action->comm.src_proc == req->issuer)
401 action->state = SIMIX_SRC_TIMEOUT;
403 action->state = SIMIX_DST_TIMEOUT;
406 SIMIX_comm_finish(action);
410 /* If the action has already finished, perform the error handling; */
411 /* otherwise set up a waiting timeout on the right side */
412 if (action->state != SIMIX_WAITING && action->state != SIMIX_RUNNING) {
413 SIMIX_comm_finish(action);
414 } else { /* if (timeout >= 0) { we need a surf sleep action even when there is no timeout, otherwise surf won't tell us when the host fails */
415 sleep = surf_workstation_model->extension.workstation.sleep(req->issuer->smx_host->host, timeout);
/* link the sleep back to the communication; SIMIX_post_comm() later reads
 * the state of the sleep through src_timeout / dst_timeout */
416 surf_workstation_model->action_data_set(sleep, action);
418 if (req->issuer == action->comm.src_proc)
419 action->comm.src_timeout = sleep;
421 action->comm.dst_timeout = sleep;
/**
 * \brief Handles a request that tests whether a communication is finished.
 * \param req The test request; the tested action is req->comm_test.comm and
 *        the answer is written to req->comm_test.result
 *
 * In both visible variants, a positive result attaches the request to the
 * action and lets SIMIX_comm_finish() answer it; otherwise the request is
 * answered right away.
 *
 * NOTE(review): the condition choosing between the two variants and the
 * else/return lines are hidden. The first variant (result computed from the
 * presence of both processes) is presumably the model-checker path. Confirm.
 */
425 void SIMIX_pre_comm_test(smx_req_t req)
427 smx_action_t action = req->comm_test.comm;
/* variant 1: the comm counts as finished once both peers are known */
430 req->comm_test.result = action->comm.src_proc && action->comm.dst_proc;
431 if(req->comm_test.result){
432 action->state = SIMIX_DONE;
433 xbt_fifo_push(action->request_list, req);
434 SIMIX_comm_finish(action);
436 SIMIX_request_answer(req);
/* variant 2: finished means neither waiting nor running */
441 req->comm_test.result = (action->state != SIMIX_WAITING && action->state != SIMIX_RUNNING);
442 if (req->comm_test.result) {
443 xbt_fifo_push(action->request_list, req);
444 SIMIX_comm_finish(action);
446 SIMIX_request_answer(req);
/**
 * \brief Handles a request that tests a set of communications.
 * \param req The request; the tested actions are req->comm_testany.comms and
 *        the index of a finished one is written to req->comm_testany.result,
 *        which starts at -1
 * \param idx Index used by the first visible variant to pick the action
 *
 * NOTE(review): the conditions on idx and on the execution mode, the
 * declarations of the loop variables and the return lines are hidden.
 * Presumably the first variant is the model-checker path, and the loop of
 * the second variant stops at the first finished action. Confirm.
 */
450 void SIMIX_pre_comm_testany(smx_req_t req, int idx)
454 xbt_dynar_t actions = req->comm_testany.comms;
455 req->comm_testany.result = -1;
459 SIMIX_request_answer(req);
461 action = xbt_dynar_get_as(actions, idx, smx_action_t);
462 req->comm_testany.result = idx;
463 xbt_fifo_push(action->request_list, req);
464 action->state = SIMIX_DONE;
465 SIMIX_comm_finish(action);
/* regular variant: look for an action that is neither waiting nor running */
470 xbt_dynar_foreach(req->comm_testany.comms,cursor,action) {
471 if (action->state != SIMIX_WAITING && action->state != SIMIX_RUNNING) {
472 req->comm_testany.result = cursor;
473 xbt_fifo_push(action->request_list, req);
474 SIMIX_comm_finish(action);
478 SIMIX_request_answer(req);
/**
 * \brief Handles a request that waits for any communication of a set.
 * \param req The request; the set is req->comm_waitany.comms
 * \param idx Index used by the first visible variant to pick the action
 *
 * In the regular variant the request is attached to every action of the
 * set; SIMIX_comm_finish() detaches it from the other actions once one of
 * them finishes.
 *
 * NOTE(review): the condition choosing between the two variants, the
 * declaration of action and the break/return lines are hidden. Presumably
 * the first variant is the model-checker path and the loop stops after the
 * first finished action. Confirm.
 */
481 void SIMIX_pre_comm_waitany(smx_req_t req, int idx)
484 unsigned int cursor = 0;
485 xbt_dynar_t actions = req->comm_waitany.comms;
488 action = xbt_dynar_get_as(actions, idx, smx_action_t);
489 xbt_fifo_push(action->request_list, req);
490 req->comm_waitany.result = idx;
491 action->state = SIMIX_DONE;
492 SIMIX_comm_finish(action);
496 xbt_dynar_foreach(actions, cursor, action){
497 /* associate this request to the action */
498 xbt_fifo_push(action->request_list, req);
500 /* see if the action is already finished */
501 if (action->state != SIMIX_WAITING && action->state != SIMIX_RUNNING){
502 SIMIX_comm_finish(action);
/**
 * \brief Detaches a waitany request from every communication it waits for.
 * \param req The waitany request; it is removed from the request list of
 *        each action of req->comm_waitany.comms
 */
508 void SIMIX_waitany_req_remove_from_actions(smx_req_t req)
511 unsigned int cursor = 0;
512 xbt_dynar_t actions = req->comm_waitany.comms;
514 xbt_dynar_foreach(actions, cursor, action){
515 xbt_fifo_remove(action->request_list, req);
520 * \brief Start the simulation of a communication request
521 * \param action The communication action
/* Does nothing visible unless the action is in state SIMIX_READY, which
 * SIMIX_comm_isend() / SIMIX_comm_irecv() set when they pair a request with
 * a queued counterpart. For a ready action, a surf communication is created
 * between the hosts of the two processes with the stored task size and rate,
 * the surf action is linked back to the SIMIX action, and the state becomes
 * SIMIX_RUNNING.
 *
 * NOTE(review): closing braces and any else line are hidden, so the exact
 * nesting of the failure and suspension checks cannot be confirmed here. */
524 XBT_INLINE void SIMIX_comm_start(smx_action_t action)
526 /* If both the sender and the receiver are already there, start the communication */
527 if (action->state == SIMIX_READY) {
529 smx_host_t sender = action->comm.src_proc->smx_host;
530 smx_host_t receiver = action->comm.dst_proc->smx_host;
532 XBT_DEBUG("Starting communication %p from '%s' to '%s'", action,
533 SIMIX_host_get_name(sender), SIMIX_host_get_name(receiver));
535 action->comm.surf_comm = surf_workstation_model->extension.workstation.
536 communicate(sender->host, receiver->host, action->comm.task_size, action->comm.rate);
538 surf_workstation_model->action_data_set(action->comm.surf_comm, action);
540 action->state = SIMIX_RUNNING;
542 /* If a link is failed, detect it immediately */
543 if (surf_workstation_model->action_state_get(action->comm.surf_comm) == SURF_ACTION_FAILED) {
544 XBT_DEBUG("Communication from '%s' to '%s' failed to start because of a link failure",
545 SIMIX_host_get_name(sender), SIMIX_host_get_name(receiver));
546 action->state = SIMIX_LINK_FAILURE;
/* this resets surf_comm to NULL */
547 SIMIX_comm_destroy_internal_actions(action);
550 /* If any of the processes is suspended, create the action but stop its execution;
551 it will be restarted when the sender process resumes */
552 if (SIMIX_process_is_suspended(action->comm.src_proc) ||
553 SIMIX_process_is_suspended(action->comm.dst_proc)) {
554 /* FIXME: check what should happen with the action state */
/* NOTE(review): if this check is reached after the link-failure branch
 * above, surf_comm is NULL here; confirm against the hidden braces. */
555 surf_workstation_model->suspend(action->comm.surf_comm);
561 * \brief Answers the SIMIX requests associated to a communication action.
562 * \param action a finished communication action
/* Pops every request attached to the action and answers it according to the
 * final state of the communication.
 *
 * For each request: a waitany request is first detached from the other
 * actions of its set and receives the position of this action as result;
 * the action is taken out of its rendez-vous point if it is still queued
 * there; then the state decides between copying the data and raising an
 * exception in the context of the issuer (doexception is set so that the
 * issuer sees it). Finally the action is removed from the comms list of the
 * issuer and the request is answered.
 *
 * NOTE(review): the declaration of req, the TRY lines, several case labels,
 * the break statements and the line updating destroy_count are hidden from
 * this listing. destroy_count is presumably increased once per answered
 * request so that the final loop drops one reference per request. Confirm
 * against the full source. */
564 void SIMIX_comm_finish(smx_action_t action)
566 volatile unsigned int destroy_count = 0;
569 while ((req = xbt_fifo_shift(action->request_list))) {
571 /* If a waitany request is waiting for this action to finish, then remove
572 it from the other actions in the waitany list. Afterwards, get the
573 position of the actual action in the waitany request's actions dynar and
574 return it as the result of the call */
575 if (req->call == REQ_COMM_WAITANY) {
576 SIMIX_waitany_req_remove_from_actions(req);
578 req->comm_waitany.result = xbt_dynar_search(req->comm_waitany.comms, &action);
581 /* If the action is still in a rendez-vous point then remove from it */
582 if (action->comm.rdv)
583 SIMIX_rdv_remove(action->comm.rdv, action);
585 XBT_DEBUG("SIMIX_comm_finish: action state = %d", action->state);
587 /* Check out for errors */
588 switch (action->state) {
/* success path (the case label is hidden): hand the data to the receiver */
591 XBT_DEBUG("Communication %p complete!", action);
592 SIMIX_comm_copy_data(action);
595 case SIMIX_SRC_TIMEOUT:
597 THROWF(timeout_error, 0, "Communication timeouted because of sender");
599 CATCH(req->issuer->running_ctx->exception) {
600 req->issuer->doexception = 1;
604 case SIMIX_DST_TIMEOUT:
606 THROWF(timeout_error, 0, "Communication timeouted because of receiver");
608 CATCH(req->issuer->running_ctx->exception) {
609 req->issuer->doexception = 1;
/* host failures: the issuer gets host_error when it runs on the failed
 * side, network_error when the failure is on the remote side */
613 case SIMIX_SRC_HOST_FAILURE:
615 if (req->issuer == action->comm.src_proc)
616 THROWF(host_error, 0, "Host failed");
618 THROWF(network_error, 0, "Remote peer failed");
620 CATCH(req->issuer->running_ctx->exception) {
621 req->issuer->doexception = 1;
625 case SIMIX_DST_HOST_FAILURE:
627 if (req->issuer == action->comm.dst_proc)
628 THROWF(host_error, 0, "Host failed");
630 THROWF(network_error, 0, "Remote peer failed");
632 CATCH(req->issuer->running_ctx->exception) {
633 req->issuer->doexception = 1;
637 case SIMIX_LINK_FAILURE:
639 XBT_DEBUG("Link failure in action %p between '%s' and '%s': posting an exception to the issuer: %s (%p) detached:%d",
641 action->comm.src_proc ? action->comm.src_proc->smx_host->name : NULL,
642 action->comm.dst_proc ? action->comm.dst_proc->smx_host->name : NULL,
643 req->issuer->name, req->issuer, action->comm.detached);
644 if (action->comm.src_proc == req->issuer) {
645 XBT_DEBUG("I'm source");
646 } else if (action->comm.dst_proc == req->issuer) {
647 XBT_DEBUG("I'm dest");
649 XBT_DEBUG("I'm neither source nor dest");
651 THROWF(network_error, 0, "Link failure");
653 CATCH(req->issuer->running_ctx->exception) {
654 req->issuer->doexception = 1;
/* cancellation (the case label is hidden): the message names the peer */
660 if (req->issuer == action->comm.dst_proc) {
661 THROWF(cancel_error, 0, "Communication canceled by the sender");
664 THROWF(cancel_error, 0, "Communication canceled by the receiver");
667 CATCH(req->issuer->running_ctx->exception) {
668 req->issuer->doexception = 1;
673 xbt_die("Unexpected action state in SIMIX_comm_finish: %d", action->state);
676 /* if there is an exception during a waitany or a testany, indicate the position of the failed communication */
677 if (req->issuer->doexception) {
678 if (req->call == REQ_COMM_WAITANY) {
679 req->issuer->running_ctx->exception.value = xbt_dynar_search(req->comm_waitany.comms, &action);
681 else if (req->call == REQ_COMM_TESTANY) {
682 req->issuer->running_ctx->exception.value = xbt_dynar_search(req->comm_testany.comms, &action);
686 req->issuer->waiting_action = NULL;
687 xbt_fifo_remove(req->issuer->comms, action);
688 SIMIX_request_answer(req);
/* drop the references counted in destroy_count; this may release the action */
692 while (destroy_count-- > 0)
693 SIMIX_comm_destroy(action);
697 * \brief This function is called when a Surf communication action is finished.
698 * \param action the corresponding Simix communication
/* Derives the final SIMIX state from the surf actions attached to the
 * communication, releases those surf actions, and answers the pending
 * requests.
 *
 * The checks are ordered, so the first one that applies wins: a finished
 * timeout on either side, then a failed timeout action on either side
 * (reported as a host failure), then a failed network action (link
 * failure).
 *
 * NOTE(review): the else line in front of the SIMIX_DONE assignment is
 * hidden; presumably DONE is the fallback when no check applies. */
700 void SIMIX_post_comm(smx_action_t action)
702 /* Update action state */
703 if (action->comm.src_timeout &&
704 surf_workstation_model->action_state_get(action->comm.src_timeout) == SURF_ACTION_DONE)
705 action->state = SIMIX_SRC_TIMEOUT;
706 else if (action->comm.dst_timeout &&
707 surf_workstation_model->action_state_get(action->comm.dst_timeout) == SURF_ACTION_DONE)
708 action->state = SIMIX_DST_TIMEOUT;
709 else if (action->comm.src_timeout &&
710 surf_workstation_model->action_state_get(action->comm.src_timeout) == SURF_ACTION_FAILED)
711 action->state = SIMIX_SRC_HOST_FAILURE;
712 else if (action->comm.dst_timeout &&
713 surf_workstation_model->action_state_get(action->comm.dst_timeout) == SURF_ACTION_FAILED)
714 action->state = SIMIX_DST_HOST_FAILURE;
715 else if (action->comm.surf_comm &&
716 surf_workstation_model->action_state_get(action->comm.surf_comm) == SURF_ACTION_FAILED) {
717 XBT_DEBUG("Puta madre. Surf says that the link broke");
718 action->state = SIMIX_LINK_FAILURE;
720 action->state = SIMIX_DONE;
722 XBT_DEBUG("SIMIX_post_comm: comm %p, state %d, src_proc %p, dst_proc %p, detached: %d",
723 action, action->state, action->comm.src_proc, action->comm.dst_proc, action->comm.detached);
725 /* destroy the surf actions associated with the Simix communication */
726 SIMIX_comm_destroy_internal_actions(action);
728 /* remove the communication action from the list of pending communications
729 * of both processes (if they still exist) */
730 if (action->comm.src_proc) {
731 xbt_fifo_remove(action->comm.src_proc->comms, action);
733 if (action->comm.dst_proc) {
734 xbt_fifo_remove(action->comm.dst_proc->comms, action);
737 /* if there are requests associated with the action, then answer them */
738 if (xbt_fifo_size(action->request_list)) {
739 SIMIX_comm_finish(action);
/**
 * \brief Cancels a communication.
 * \param action The communication action
 *
 * A communication still in state SIMIX_WAITING is removed from its
 * rendez-vous point and marked SIMIX_CANCELED. A ready or running one has
 * its surf network action cancelled instead, except when the model checker
 * is enabled. Other states are left untouched by the visible lines.
 */
743 void SIMIX_comm_cancel(smx_action_t action)
745 /* if the action is in a waiting state, it is still in a rdv, */
746 /* so remove it from there and mark it as canceled */
747 if (action->state == SIMIX_WAITING) {
748 SIMIX_rdv_remove(action->comm.rdv, action);
749 action->state = SIMIX_CANCELED;
751 else if (!MC_IS_ENABLED /* when running the MC there are no surf actions */
752 && (action->state == SIMIX_READY || action->state == SIMIX_RUNNING)) {
754 surf_workstation_model->action_cancel(action->comm.surf_comm);
/**
 * \brief Suspends the surf network action of a communication.
 * \param action The communication action
 *
 * NOTE(review): surf_comm is passed on without a visible NULL check, yet it
 * is only set by SIMIX_comm_start(); confirm callers never suspend a
 * communication that has not started.
 */
758 void SIMIX_comm_suspend(smx_action_t action)
760 /*FIXME: shall we suspend also the timeout actions? */
761 surf_workstation_model->suspend(action->comm.surf_comm);
/**
 * \brief Resumes the surf network action of a communication.
 * \param action The communication action
 *
 * NOTE(review): as in SIMIX_comm_suspend(), surf_comm is used without a
 * visible NULL check.
 */
764 void SIMIX_comm_resume(smx_action_t action)
766 /*FIXME: check what happens with the timeouts */
767 surf_workstation_model->resume(action->comm.surf_comm);
771 /************* Action Getters **************/
774 * \brief get the amount remaining from the communication
775 * \param action The communication
/* The value is taken from the surf network action in one branch of the
 * switch and is 0 in the two other visible branches.
 *
 * NOTE(review): the declaration of remains, every case label and the return
 * statement are hidden from this listing, so which states map to which
 * branch cannot be told from here. */
777 double SIMIX_comm_get_remains(smx_action_t action)
785 switch (action->state) {
788 remains = surf_workstation_model->get_remains(action->comm.surf_comm);
793 remains = 0; /*FIXME: check what should be returned */
797 remains = 0; /*FIXME: is this correct? */
/**
 * \brief Returns the current state of a communication.
 * \param action The communication
 * \return The value of the state field of the action
 */
803 e_smx_state_t SIMIX_comm_get_state(smx_action_t action)
805 return action->state;
809 * \brief Return the user data associated to the sender of the communication
810 * \param action The communication
811 * \return the user data
/* Plain accessor: src_data is the pointer stored by SIMIX_comm_isend(). */
813 void* SIMIX_comm_get_src_data(smx_action_t action)
815 return action->comm.src_data;
819 * \brief Return the user data associated to the receiver of the communication
820 * \param action The communication
821 * \return the user data
/* Plain accessor: dst_data is the pointer stored by SIMIX_comm_irecv(). */
823 void* SIMIX_comm_get_dst_data(smx_action_t action)
825 return action->comm.dst_data;
/**
 * \brief Returns the sender process of a communication.
 * \param action The communication
 * \return The src_proc field; it is set by SIMIX_comm_isend(), and other
 *         code in this file checks it against NULL before use
 */
828 smx_process_t SIMIX_comm_get_src_proc(smx_action_t action)
830 return action->comm.src_proc;
/**
 * \brief Returns the receiver process of a communication.
 * \param action The communication
 * \return The dst_proc field; it is set by SIMIX_comm_irecv(), and other
 *         code in this file checks it against NULL before use
 */
833 smx_process_t SIMIX_comm_get_dst_proc(smx_action_t action)
835 return action->comm.dst_proc;
838 #ifdef HAVE_LATENCY_BOUND_TRACKING
840 * \brief verify if communication is latency bounded
841 * \param comm The communication
/* Returns the latency_limited field of the action. While the surf network
 * action still exists, the field is first refreshed from surf; afterwards
 * the last recorded value is returned (SIMIX_comm_new() initialises it to
 * -1).
 *
 * NOTE(review): lines between the signature and the test are hidden from
 * this listing; an early exit may exist there. */
843 XBT_INLINE int SIMIX_comm_is_latency_bounded(smx_action_t action)
848 if (action->comm.surf_comm){
849 XBT_DEBUG("Getting latency limited for surf_action (%p)", action->comm.surf_comm);
850 action->latency_limited = surf_workstation_model->get_latency_limited(action->comm.surf_comm);
851 XBT_DEBUG("Action limited is %d", action->latency_limited);
853 return action->latency_limited;
857 /******************************************************************************/
858 /* SIMIX_comm_copy_data callbacks */
859 /******************************************************************************/
/* Callback used by SIMIX_comm_copy_data() to transfer the payload. It
 * receives the communication, the source buffer and the number of bytes to
 * copy. The default copies a single pointer; it can be replaced with
 * SIMIX_comm_set_copy_data_callback(). */
860 static void (*SIMIX_comm_copy_data_callback) (smx_action_t, void*, size_t) =
861 &SIMIX_comm_copy_pointer_callback;
/* Replaces the callback used by SIMIX_comm_copy_data(). The new callback is
 * stored as is; no visible line checks it against NULL.
 * NOTE(review): the line carrying the return type is hidden from this
 * listing. */
864 SIMIX_comm_set_copy_data_callback(void (*callback) (smx_action_t, void*, size_t))
866 SIMIX_comm_copy_data_callback = callback;
/**
 * \brief Copy callback that transmits the address of the source buffer.
 * \param comm The communication; its dst_buff is treated as the location of
 *        a pointer
 * \param buff The source buffer; its address (not its content) is stored
 * \param buff_size Must be sizeof(void *), otherwise the assertion fails
 */
869 void SIMIX_comm_copy_pointer_callback(smx_action_t comm, void* buff, size_t buff_size)
871 xbt_assert((buff_size == sizeof(void *)),
872 "Cannot copy %zu bytes: must be sizeof(void*)", buff_size);
873 *(void **) (comm->comm.dst_buff) = buff;
/**
 * \brief Copy callback that duplicates the content of the source buffer.
 * \param comm The communication; the bytes are written to its dst_buff
 * \param buff The source buffer
 * \param buff_size The number of bytes copied with memcpy (so the two
 *        buffers must not overlap)
 */
876 void SIMIX_comm_copy_buffer_callback(smx_action_t comm, void* buff, size_t buff_size)
878 XBT_DEBUG("Copy the data over");
879 memcpy(comm->comm.dst_buff, buff, buff_size);
/**
 * \brief Copy callback for SMPI: copies the content of the source buffer
 *        and, for detached sends, forgets the source buffer afterwards.
 * \param comm The communication; the bytes are written to its dst_buff
 * \param buff The source buffer
 * \param buff_size The number of bytes copied with memcpy
 *
 * NOTE(review): one line inside the detached branch is hidden from this
 * listing; given the existing comment, it presumably frees the duplicated
 * buffer before the pointer is reset. Confirm.
 */
882 void smpi_comm_copy_data_callback(smx_action_t comm, void* buff, size_t buff_size)
884 XBT_DEBUG("Copy the data over");
885 memcpy(comm->comm.dst_buff, buff, buff_size);
886 if (comm->comm.detached) { // if this is a detached send, the source buffer was duplicated by SMPI sender to make the original buffer available to the application ASAP
888 comm->comm.src_buff = NULL;
893 * \brief Copy the communication data from the sender's buffer to the receiver's one
894 * \param comm The communication
/* Transfers the payload through SIMIX_comm_copy_data_callback, at most once
 * per communication: the copied flag is set afterwards, and it is tested
 * together with the presence of both buffers before anything is done.
 *
 * The amount copied is src_buff_size, capped by the capacity announced by
 * the receiver when dst_buff_size is set; that capacity is then overwritten
 * with the amount actually copied.
 *
 * NOTE(review): the statement following the initial test is hidden from
 * this listing; presumably it returns. Two arguments of the debug message
 * are hidden as well. */
896 void SIMIX_comm_copy_data(smx_action_t comm)
898 size_t buff_size = comm->comm.src_buff_size;
899 /* If there is no data to be copied then return */
900 if (!comm->comm.src_buff || !comm->comm.dst_buff || comm->comm.copied == 1)
903 XBT_DEBUG("Copying comm %p data from %s (%p) -> %s (%p) (%zu bytes)",
905 comm->comm.src_proc ? comm->comm.src_proc->smx_host->name : "a finished process",
907 comm->comm.dst_proc ? comm->comm.dst_proc->smx_host->name : "a finished process",
908 comm->comm.dst_buff, buff_size);
910 /* Copy at most dst_buff_size bytes of the message to receiver's buffer */
911 if (comm->comm.dst_buff_size)
912 buff_size = MIN(buff_size, *(comm->comm.dst_buff_size));
914 /* Update the receiver's buffer size to the copied amount */
915 if (comm->comm.dst_buff_size)
916 *comm->comm.dst_buff_size = buff_size;
919 SIMIX_comm_copy_data_callback (comm, comm->comm.src_buff, buff_size);
921 /* Set the copied flag so we copy data only once */
922 /* (this function might be called from both communication ends) */
923 comm->comm.copied = 1;