1 /* Copyright (c) 2009, 2010. The SimGrid Team.
2 * All rights reserved. */
4 /* This program is free software; you can redistribute it and/or modify it
5 * under the terms of the license (GNU LGPL) which comes with this package. */
7 #include "smx_private.h"
12 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(simix_network, simix,
13 "Logging specific to SIMIX (network)");
15 static xbt_dict_t rdv_points = NULL;
16 XBT_IMPORT_NO_EXPORT(unsigned long int) smx_total_comms = 0;
18 static void SIMIX_waitany_remove_simcall_from_actions(smx_simcall_t simcall);
19 static void SIMIX_comm_copy_data(smx_action_t comm);
20 static smx_action_t SIMIX_comm_new(e_smx_comm_type_t type);
21 static XBT_INLINE void SIMIX_rdv_push(smx_rdv_t rdv, smx_action_t comm);
22 static smx_action_t SIMIX_fifo_probe_comm(xbt_fifo_t fifo, e_smx_comm_type_t type,
23 int (*match_fun)(void *, void *,smx_action_t),
24 void *user_data, smx_action_t my_action);
25 static smx_action_t SIMIX_fifo_get_comm(xbt_fifo_t fifo, e_smx_comm_type_t type,
26 int (*match_fun)(void *, void *,smx_action_t),
27 void *user_data, smx_action_t my_action);
28 static void SIMIX_rdv_free(void *data);
30 void SIMIX_network_init(void)
32 rdv_points = xbt_dict_new_homogeneous(SIMIX_rdv_free);
34 MC_ignore(&smx_total_comms, sizeof(smx_total_comms));
37 void SIMIX_network_exit(void)
39 xbt_dict_free(&rdv_points);
42 /******************************************************************************/
43 /* Rendez-Vous Points */
44 /******************************************************************************/
46 smx_rdv_t SIMIX_rdv_create(const char *name)
48 /* two processes may have pushed the same rdv_create simcall at the same time */
49 smx_rdv_t rdv = name ? xbt_dict_get_or_null(rdv_points, name) : NULL;
52 rdv = xbt_new0(s_smx_rvpoint_t, 1);
53 rdv->name = name ? xbt_strdup(name) : NULL;
54 rdv->comm_fifo = xbt_fifo_new();
55 rdv->done_comm_fifo = xbt_fifo_new();
56 rdv->permanent_receiver=NULL;
58 XBT_DEBUG("Creating a mailbox at %p with name %s\n", rdv, name);
61 xbt_dict_set(rdv_points, rdv->name, rdv, NULL);
66 void SIMIX_rdv_destroy(smx_rdv_t rdv)
69 xbt_dict_remove(rdv_points, rdv->name);
72 void SIMIX_rdv_free(void *data)
74 XBT_DEBUG("rdv free %p", data);
75 smx_rdv_t rdv = (smx_rdv_t) data;
77 xbt_fifo_free(rdv->comm_fifo);
78 xbt_fifo_free(rdv->done_comm_fifo);
83 xbt_dict_t SIMIX_get_rdv_points()
88 smx_rdv_t SIMIX_rdv_get_by_name(const char *name)
90 return xbt_dict_get_or_null(rdv_points, name);
93 int SIMIX_rdv_comm_count_by_host(smx_rdv_t rdv, smx_host_t host)
95 smx_action_t comm = NULL;
96 xbt_fifo_item_t item = NULL;
99 xbt_fifo_foreach(rdv->comm_fifo, item, comm, smx_action_t) {
100 if (comm->comm.src_proc->smx_host == host)
107 smx_action_t SIMIX_rdv_get_head(smx_rdv_t rdv)
109 return xbt_fifo_get_item_content(xbt_fifo_get_first_item(rdv->comm_fifo));
113 * \brief get the receiver (process associated to the mailbox)
114 * \param rdv The rendez-vous point
115 * \return process The receiving process (NULL if not set)
117 smx_process_t SIMIX_rdv_get_receiver(smx_rdv_t rdv)
119 return rdv->permanent_receiver;
123 * \brief set the receiver of the rendez vous point to allow eager sends
124 * \param rdv The rendez-vous point
125 * \param process The receiving process
127 void SIMIX_rdv_set_receiver(smx_rdv_t rdv , smx_process_t process)
129 rdv->permanent_receiver=process;
133 * \brief Pushes a communication action into a rendez-vous point
134 * \param rdv The rendez-vous point
135 * \param comm The communication action
137 static XBT_INLINE void SIMIX_rdv_push(smx_rdv_t rdv, smx_action_t comm)
139 xbt_fifo_push(rdv->comm_fifo, comm);
140 comm->comm.rdv = rdv;
144 * \brief Removes a communication action from a rendez-vous point
145 * \param rdv The rendez-vous point
146 * \param comm The communication action
148 XBT_INLINE void SIMIX_rdv_remove(smx_rdv_t rdv, smx_action_t comm)
150 xbt_fifo_remove(rdv->comm_fifo, comm);
151 comm->comm.rdv = NULL;
155 * \brief Checks if there is a communication action queued in a fifo matching our needs
156 * \param type The type of communication we are looking for (comm_send, comm_recv)
157 * \return The communication action if found, NULL otherwise
159 smx_action_t SIMIX_fifo_get_comm(xbt_fifo_t fifo, e_smx_comm_type_t type,
160 int (*match_fun)(void *, void *,smx_action_t),
161 void *this_user_data, smx_action_t my_action)
164 xbt_fifo_item_t item;
165 void* other_user_data = NULL;
167 xbt_fifo_foreach(fifo, item, action, smx_action_t) {
168 if (action->comm.type == SIMIX_COMM_SEND) {
169 other_user_data = action->comm.src_data;
170 } else if (action->comm.type == SIMIX_COMM_RECEIVE) {
171 other_user_data = action->comm.dst_data;
173 if (action->comm.type == type &&
174 (!match_fun || match_fun(this_user_data, other_user_data, action)) &&
175 (!action->comm.match_fun || action->comm.match_fun(other_user_data, this_user_data, my_action))) {
176 XBT_DEBUG("Found a matching communication action %p", action);
177 xbt_fifo_remove_item(fifo, item);
178 xbt_fifo_free_item(item);
179 action->comm.refcount++;
180 action->comm.rdv = NULL;
183 XBT_DEBUG("Sorry, communication action %p does not match our needs:"
184 " its type is %d but we are looking for a comm of type %d (or maybe the filtering didn't match)",
185 action, (int)action->comm.type, (int)type);
187 XBT_DEBUG("No matching communication action found");
193 * \brief Checks if there is a communication action queued in a fifo matching our needs, but leave it there
194 * \param type The type of communication we are looking for (comm_send, comm_recv)
195 * \return The communication action if found, NULL otherwise
197 smx_action_t SIMIX_fifo_probe_comm(xbt_fifo_t fifo, e_smx_comm_type_t type,
198 int (*match_fun)(void *, void *,smx_action_t),
199 void *this_user_data, smx_action_t my_action)
202 xbt_fifo_item_t item;
203 void* other_user_data = NULL;
205 xbt_fifo_foreach(fifo, item, action, smx_action_t) {
206 if (action->comm.type == SIMIX_COMM_SEND) {
207 other_user_data = action->comm.src_data;
208 } else if (action->comm.type == SIMIX_COMM_RECEIVE) {
209 other_user_data = action->comm.dst_data;
211 if (action->comm.type == type &&
212 (!match_fun || match_fun(this_user_data, other_user_data, action)) &&
213 (!action->comm.match_fun || action->comm.match_fun(other_user_data, this_user_data, my_action))) {
214 XBT_DEBUG("Found a matching communication action %p", action);
215 action->comm.refcount++;
219 XBT_DEBUG("Sorry, communication action %p does not match our needs:"
220 " its type is %d but we are looking for a comm of type %d (or maybe the filtering didn't match)",
221 action, (int)action->comm.type, (int)type);
223 XBT_DEBUG("No matching communication action found");
226 /******************************************************************************/
227 /* Communication Actions */
228 /******************************************************************************/
231 * \brief Creates a new communicate action
232 * \param type The direction of communication (comm_send, comm_recv)
233 * \return The new communicate action
235 smx_action_t SIMIX_comm_new(e_smx_comm_type_t type)
239 /* alloc structures */
240 act = xbt_mallocator_get(simix_global->action_mallocator);
242 act->type = SIMIX_ACTION_COMMUNICATE;
243 act->state = SIMIX_WAITING;
245 /* set communication */
246 act->comm.type = type;
247 act->comm.refcount = 1;
249 #ifdef HAVE_LATENCY_BOUND_TRACKING
250 //initialize with unknown value
251 act->latency_limited = -1;
255 act->category = NULL;
258 XBT_DEBUG("Create communicate action %p", act);
265 * \brief Destroy a communicate action
266 * \param action The communicate action to be destroyed
268 void SIMIX_comm_destroy(smx_action_t action)
270 XBT_DEBUG("Destroy action %p (refcount: %d), state: %d",
271 action, action->comm.refcount, (int)action->state);
273 if (action->comm.refcount <= 0) {
274 xbt_backtrace_display_current();
275 xbt_die("The refcount of comm %p is already 0 before decreasing it. "
276 "That's a bug! If you didn't test and/or wait the same communication twice in your code, then the bug is SimGrid's...", action);
278 action->comm.refcount--;
279 if (action->comm.refcount > 0)
281 XBT_DEBUG("Really free communication %p; refcount is now %d", action,
282 action->comm.refcount);
284 #ifdef HAVE_LATENCY_BOUND_TRACKING
285 action->latency_limited = SIMIX_comm_is_latency_bounded( action ) ;
288 xbt_free(action->name);
289 SIMIX_comm_destroy_internal_actions(action);
291 if (action->comm.detached && action->state != SIMIX_DONE) {
292 /* the communication has failed and was detached:
293 * we have to free the buffer */
294 if (action->comm.clean_fun) {
295 action->comm.clean_fun(action->comm.src_buff);
297 action->comm.src_buff = NULL;
300 xbt_mallocator_release(simix_global->action_mallocator, action);
303 void SIMIX_comm_destroy_internal_actions(smx_action_t action)
305 if (action->comm.surf_comm){
306 #ifdef HAVE_LATENCY_BOUND_TRACKING
307 action->latency_limited = SIMIX_comm_is_latency_bounded(action);
309 action->comm.surf_comm->model_type->action_unref(action->comm.surf_comm);
310 action->comm.surf_comm = NULL;
313 if (action->comm.src_timeout){
314 action->comm.src_timeout->model_type->action_unref(action->comm.src_timeout);
315 action->comm.src_timeout = NULL;
318 if (action->comm.dst_timeout){
319 action->comm.dst_timeout->model_type->action_unref(action->comm.dst_timeout);
320 action->comm.dst_timeout = NULL;
324 smx_action_t SIMIX_comm_isend(smx_process_t src_proc, smx_rdv_t rdv,
325 double task_size, double rate,
326 void *src_buff, size_t src_buff_size,
327 int (*match_fun)(void *, void *,smx_action_t),
328 void (*clean_fun)(void *), // used to free the action in case of problem after a detached send
332 XBT_DEBUG("send from %p\n", rdv);
334 /* Prepare an action describing us, so that it gets passed to the user-provided filter of other side */
335 smx_action_t this_action = SIMIX_comm_new(SIMIX_COMM_SEND);
337 /* Look for communication action matching our needs. We also provide a description of
338 * ourself so that the other side also gets a chance of choosing if it wants to match with us.
340 * If it is not found then push our communication into the rendez-vous point */
341 smx_action_t other_action = SIMIX_fifo_get_comm(rdv->comm_fifo, SIMIX_COMM_RECEIVE, match_fun, data, this_action);
344 other_action = this_action;
346 if (rdv->permanent_receiver!=NULL){
347 //this mailbox is for small messages, which have to be sent right now
348 other_action->state = SIMIX_READY;
349 other_action->comm.dst_proc=rdv->permanent_receiver;
350 other_action->comm.refcount++;
351 other_action->comm.rdv = rdv;
352 xbt_fifo_push(rdv->done_comm_fifo,other_action);
353 other_action->comm.rdv=rdv;
354 XBT_DEBUG("pushing a message into the permanent receive fifo %p, comm %p \n", rdv, &(other_action->comm));
357 SIMIX_rdv_push(rdv, this_action);
360 XBT_DEBUG("Receive already pushed\n");
362 SIMIX_comm_destroy(this_action);
363 --smx_total_comms; // this creation was a pure waste
365 other_action->state = SIMIX_READY;
366 other_action->comm.type = SIMIX_COMM_READY;
369 xbt_fifo_push(src_proc->comms, other_action);
371 /* if the communication action is detached then decrease the refcount
372 * by one, so it will be eliminated by the receiver's destroy call */
374 other_action->comm.detached = 1;
375 other_action->comm.refcount--;
376 other_action->comm.clean_fun = clean_fun;
378 other_action->comm.clean_fun = NULL;
381 /* Setup the communication action */
382 other_action->comm.src_proc = src_proc;
383 other_action->comm.task_size = task_size;
384 other_action->comm.rate = rate;
385 other_action->comm.src_buff = src_buff;
386 other_action->comm.src_buff_size = src_buff_size;
387 other_action->comm.src_data = data;
389 other_action->comm.match_fun = match_fun;
391 if (MC_is_active()) {
392 other_action->state = SIMIX_RUNNING;
396 SIMIX_comm_start(other_action);
397 return (detached ? NULL : other_action);
/**
 * \brief Posts an asynchronous receive on a rendez-vous point.
 *
 * If the mailbox has a permanent receiver and sends were already pushed to
 * its done_comm_fifo, a matching one is taken from there. Otherwise a send
 * queued in comm_fifo is matched, or our own receive action is queued.
 *
 * \param dst_proc the receiving process
 * \param rdv the rendez-vous point
 * \param dst_buff where the payload gets copied
 * \param dst_buff_size capacity of dst_buff (may be NULL); SIMIX_comm_copy_data
 *        overwrites it with the number of bytes actually copied
 * \param match_fun optional filter deciding whether a queued send suits us
 * \param data user data handed to match_fun and stored as dst_data
 * \return the communication action
 */
smx_action_t SIMIX_comm_irecv(smx_process_t dst_proc, smx_rdv_t rdv,
                              void *dst_buff, size_t *dst_buff_size,
                              int (*match_fun)(void *, void *, smx_action_t), void *data)
{
  XBT_DEBUG("recv from %p %p\n", rdv, rdv->comm_fifo);
  /* action describing us, shown to the user-provided filter of the other side */
  smx_action_t this_action = SIMIX_comm_new(SIMIX_COMM_RECEIVE);

  smx_action_t other_action;
  //communication already done, get it inside the fifo of completed comms
  //permanent receive v1
  //int already_received=0;
  if(rdv->permanent_receiver && xbt_fifo_size(rdv->done_comm_fifo)!=0){

    XBT_DEBUG("We have a comm that has probably already been received, trying to match it, to skip the communication\n");
    //find a match in the already received fifo
    other_action = SIMIX_fifo_get_comm(rdv->done_comm_fifo, SIMIX_COMM_SEND, match_fun, data, this_action);
    //if not found, assume the receiver came first, register it to the mailbox in the classical way
    if (!other_action)  {
      XBT_DEBUG("We have messages in the permanent receive list, but not the one we are looking for, pushing request into fifo\n");
      other_action = this_action;
      SIMIX_rdv_push(rdv, this_action);
    }else{
      /* the eager send already has a surf action: if nothing remains to
       * transfer, mark it done so SIMIX_comm_start below leaves it alone */
      if(other_action->comm.surf_comm && SIMIX_comm_get_remains(other_action)==0.0)
      {
        XBT_DEBUG("comm %p has been already sent, and is finished, destroy it\n",&(other_action->comm));
        other_action->state = SIMIX_DONE;
        other_action->comm.type = SIMIX_COMM_DONE;
        other_action->comm.rdv = NULL;
        //SIMIX_comm_destroy(this_action);
        //--smx_total_comms; // this creation was a pure waste
        //already_received=1;
        //other_action->comm.refcount--;
      }else{
        XBT_DEBUG("Not yet finished, we have to wait %d\n", xbt_fifo_size(rdv->comm_fifo));
      }
     // other_action->comm.refcount--;
      /* our own receive action was only needed for matching */
      SIMIX_comm_destroy(this_action);
      --smx_total_comms; // this creation was a pure waste
    }
  }else{
    /* Prepare an action describing us, so that it gets passed to the user-provided filter of other side */

    /* Look for communication action matching our needs. We also provide a description of
     * ourself so that the other side also gets a chance of choosing if it wants to match with us.
     *
     * If it is not found then push our communication into the rendez-vous point */
    other_action = SIMIX_fifo_get_comm(rdv->comm_fifo, SIMIX_COMM_SEND, match_fun, data, this_action);

    if (!other_action) {
      XBT_DEBUG("Receive pushed first %d\n", xbt_fifo_size(rdv->comm_fifo));
      other_action = this_action;
      SIMIX_rdv_push(rdv, this_action);
    } else {
      SIMIX_comm_destroy(this_action);
      --smx_total_comms; // this creation was a pure waste
      /* both ends are now known: the transfer can start */
      other_action->state = SIMIX_READY;
      other_action->comm.type = SIMIX_COMM_READY;
      // other_action->comm.refcount--;
    }
    xbt_fifo_push(dst_proc->comms, other_action);
  }

  /* Setup communication action */
  other_action->comm.dst_proc = dst_proc;
  other_action->comm.dst_buff = dst_buff;
  other_action->comm.dst_buff_size = dst_buff_size;
  other_action->comm.dst_data = data;

  other_action->comm.match_fun = match_fun;


  /*if(already_received)//do the actual copy, because the first one after the comm didn't have all the info
    SIMIX_comm_copy_data(other_action);*/


  /* the model checker does not simulate the transfer through surf */
  if (MC_is_active()) {
    other_action->state = SIMIX_RUNNING;
    return other_action;
  }

  SIMIX_comm_start(other_action);
  // }
  return other_action;
}
486 smx_action_t SIMIX_comm_iprobe(smx_process_t dst_proc, smx_rdv_t rdv, int src,
487 int tag, int (*match_fun)(void *, void *, smx_action_t), void *data)
489 XBT_DEBUG("iprobe from %p %p\n", rdv, rdv->comm_fifo);
490 smx_action_t this_action = SIMIX_comm_new(SIMIX_COMM_RECEIVE);
492 smx_action_t other_action=NULL;
493 if(rdv->permanent_receiver && xbt_fifo_size(rdv->done_comm_fifo)!=0){
494 //find a match in the already received fifo
495 XBT_DEBUG("first try in the perm recv mailbox \n");
497 other_action = SIMIX_fifo_probe_comm(rdv->done_comm_fifo, SIMIX_COMM_SEND, match_fun, data, this_action);
501 XBT_DEBUG("second try in the other mailbox");
502 other_action = SIMIX_fifo_probe_comm(rdv->comm_fifo, SIMIX_COMM_SEND, match_fun, data, this_action);
505 if(other_action)other_action->comm.refcount--;
507 SIMIX_comm_destroy(this_action);
512 void SIMIX_pre_comm_wait(smx_simcall_t simcall, smx_action_t action, double timeout, int idx)
515 /* the simcall may be a wait, a send or a recv */
518 /* Associate this simcall to the wait action */
519 XBT_DEBUG("SIMIX_pre_comm_wait, %p", action);
521 xbt_fifo_push(action->simcalls, simcall);
522 simcall->issuer->waiting_action = action;
524 if (MC_is_active()) {
526 action->state = SIMIX_DONE;
528 /* If we reached this point, the wait simcall must have a timeout */
529 /* Otherwise it shouldn't be enabled and executed by the MC */
533 if (action->comm.src_proc == simcall->issuer)
534 action->state = SIMIX_SRC_TIMEOUT;
536 action->state = SIMIX_DST_TIMEOUT;
539 SIMIX_comm_finish(action);
543 /* If the action has already finish perform the error handling, */
544 /* otherwise set up a waiting timeout on the right side */
545 if (action->state != SIMIX_WAITING && action->state != SIMIX_RUNNING) {
546 SIMIX_comm_finish(action);
547 } else { /* if (timeout >= 0) { we need a surf sleep action even when there is no timeout, otherwise surf won't tell us when the host fails */
548 sleep = surf_workstation_model->extension.workstation.sleep(simcall->issuer->smx_host->host, timeout);
549 surf_workstation_model->action_data_set(sleep, action);
551 if (simcall->issuer == action->comm.src_proc)
552 action->comm.src_timeout = sleep;
554 action->comm.dst_timeout = sleep;
558 void SIMIX_pre_comm_test(smx_simcall_t simcall)
560 smx_action_t action = simcall->comm_test.comm;
563 simcall->comm_test.result = action->comm.src_proc && action->comm.dst_proc;
564 if(simcall->comm_test.result){
565 action->state = SIMIX_DONE;
566 xbt_fifo_push(action->simcalls, simcall);
567 SIMIX_comm_finish(action);
569 SIMIX_simcall_answer(simcall);
574 simcall->comm_test.result = (action->state != SIMIX_WAITING && action->state != SIMIX_RUNNING);
575 if (simcall->comm_test.result) {
576 xbt_fifo_push(action->simcalls, simcall);
577 SIMIX_comm_finish(action);
579 SIMIX_simcall_answer(simcall);
583 void SIMIX_pre_comm_testany(smx_simcall_t simcall, int idx)
587 xbt_dynar_t actions = simcall->comm_testany.comms;
588 simcall->comm_testany.result = -1;
592 SIMIX_simcall_answer(simcall);
594 action = xbt_dynar_get_as(actions, idx, smx_action_t);
595 simcall->comm_testany.result = idx;
596 xbt_fifo_push(action->simcalls, simcall);
597 action->state = SIMIX_DONE;
598 SIMIX_comm_finish(action);
603 xbt_dynar_foreach(simcall->comm_testany.comms,cursor,action) {
604 if (action->state != SIMIX_WAITING && action->state != SIMIX_RUNNING) {
605 simcall->comm_testany.result = cursor;
606 xbt_fifo_push(action->simcalls, simcall);
607 SIMIX_comm_finish(action);
611 SIMIX_simcall_answer(simcall);
614 void SIMIX_pre_comm_waitany(smx_simcall_t simcall, int idx)
617 unsigned int cursor = 0;
618 xbt_dynar_t actions = simcall->comm_waitany.comms;
621 action = xbt_dynar_get_as(actions, idx, smx_action_t);
622 xbt_fifo_push(action->simcalls, simcall);
623 simcall->comm_waitany.result = idx;
624 action->state = SIMIX_DONE;
625 SIMIX_comm_finish(action);
629 xbt_dynar_foreach(actions, cursor, action){
630 /* associate this simcall to the the action */
631 xbt_fifo_push(action->simcalls, simcall);
633 /* see if the action is already finished */
634 if (action->state != SIMIX_WAITING && action->state != SIMIX_RUNNING){
635 SIMIX_comm_finish(action);
641 void SIMIX_waitany_remove_simcall_from_actions(smx_simcall_t simcall)
644 unsigned int cursor = 0;
645 xbt_dynar_t actions = simcall->comm_waitany.comms;
647 xbt_dynar_foreach(actions, cursor, action) {
648 xbt_fifo_remove(action->simcalls, simcall);
653 * \brief Starts the simulation of a communication action.
654 * \param action the communication action
656 XBT_INLINE void SIMIX_comm_start(smx_action_t action)
658 /* If both the sender and the receiver are already there, start the communication */
659 if (action->state == SIMIX_READY) {
661 smx_host_t sender = action->comm.src_proc->smx_host;
662 smx_host_t receiver = action->comm.dst_proc->smx_host;
664 XBT_DEBUG("Starting communication %p from '%s' to '%s'", action,
665 SIMIX_host_get_name(sender), SIMIX_host_get_name(receiver));
667 action->comm.surf_comm = surf_workstation_model->extension.workstation.
668 communicate(sender->host, receiver->host, action->comm.task_size, action->comm.rate);
670 surf_workstation_model->action_data_set(action->comm.surf_comm, action);
672 action->state = SIMIX_RUNNING;
674 /* If a link is failed, detect it immediately */
675 if (surf_workstation_model->action_state_get(action->comm.surf_comm) == SURF_ACTION_FAILED) {
676 XBT_DEBUG("Communication from '%s' to '%s' failed to start because of a link failure",
677 SIMIX_host_get_name(sender), SIMIX_host_get_name(receiver));
678 action->state = SIMIX_LINK_FAILURE;
679 SIMIX_comm_destroy_internal_actions(action);
682 /* If any of the process is suspend, create the action but stop its execution,
683 it will be restarted when the sender process resume */
684 if (SIMIX_process_is_suspended(action->comm.src_proc) ||
685 SIMIX_process_is_suspended(action->comm.dst_proc)) {
686 /* FIXME: check what should happen with the action state */
688 if (SIMIX_process_is_suspended(action->comm.src_proc))
689 XBT_DEBUG("The communication is suspended on startup because src (%s:%s) were suspended since it initiated the communication",
690 SIMIX_host_get_name(action->comm.src_proc->smx_host), action->comm.src_proc->name);
692 XBT_DEBUG("The communication is suspended on startup because dst (%s:%s) were suspended since it initiated the communication",
693 SIMIX_host_get_name(action->comm.dst_proc->smx_host), action->comm.dst_proc->name);
695 surf_workstation_model->suspend(action->comm.surf_comm);
702 * \brief Answers the SIMIX simcalls associated to a communication action.
703 * \param action a finished communication action
705 void SIMIX_comm_finish(smx_action_t action)
707 unsigned int destroy_count = 0;
708 smx_simcall_t simcall;
710 while ((simcall = xbt_fifo_shift(action->simcalls))) {
712 /* If a waitany simcall is waiting for this action to finish, then remove
713 it from the other actions in the waitany list. Afterwards, get the
714 position of the actual action in the waitany dynar and
715 return it as the result of the simcall */
716 if (simcall->call == SIMCALL_COMM_WAITANY) {
717 SIMIX_waitany_remove_simcall_from_actions(simcall);
719 simcall->comm_waitany.result = xbt_dynar_search(simcall->comm_waitany.comms, &action);
722 /* If the action is still in a rendez-vous point then remove from it */
723 if (action->comm.rdv)
724 SIMIX_rdv_remove(action->comm.rdv, action);
726 XBT_DEBUG("SIMIX_comm_finish: action state = %d", (int)action->state);
728 /* Check out for errors */
729 switch (action->state) {
732 XBT_DEBUG("Communication %p complete!", action);
733 SIMIX_comm_copy_data(action);
736 case SIMIX_SRC_TIMEOUT:
737 SMX_EXCEPTION(simcall->issuer, timeout_error, 0,
738 "Communication timeouted because of sender");
741 case SIMIX_DST_TIMEOUT:
742 SMX_EXCEPTION(simcall->issuer, timeout_error, 0,
743 "Communication timeouted because of receiver");
746 case SIMIX_SRC_HOST_FAILURE:
747 if (simcall->issuer == action->comm.src_proc)
748 simcall->issuer->context->iwannadie = 1;
749 // SMX_EXCEPTION(simcall->issuer, host_error, 0, "Host failed");
751 SMX_EXCEPTION(simcall->issuer, network_error, 0, "Remote peer failed");
754 case SIMIX_DST_HOST_FAILURE:
755 if (simcall->issuer == action->comm.dst_proc)
756 simcall->issuer->context->iwannadie = 1;
757 // SMX_EXCEPTION(simcall->issuer, host_error, 0, "Host failed");
759 SMX_EXCEPTION(simcall->issuer, network_error, 0, "Remote peer failed");
762 case SIMIX_LINK_FAILURE:
763 XBT_DEBUG("Link failure in action %p between '%s' and '%s': posting an exception to the issuer: %s (%p) detached:%d",
765 action->comm.src_proc ? action->comm.src_proc->smx_host->name : NULL,
766 action->comm.dst_proc ? action->comm.dst_proc->smx_host->name : NULL,
767 simcall->issuer->name, simcall->issuer, action->comm.detached);
768 if (action->comm.src_proc == simcall->issuer) {
769 XBT_DEBUG("I'm source");
770 } else if (action->comm.dst_proc == simcall->issuer) {
771 XBT_DEBUG("I'm dest");
773 XBT_DEBUG("I'm neither source nor dest");
775 SMX_EXCEPTION(simcall->issuer, network_error, 0, "Link failure");
779 if (simcall->issuer == action->comm.dst_proc)
780 SMX_EXCEPTION(simcall->issuer, cancel_error, 0,
781 "Communication canceled by the sender");
783 SMX_EXCEPTION(simcall->issuer, cancel_error, 0,
784 "Communication canceled by the receiver");
788 xbt_die("Unexpected action state in SIMIX_comm_finish: %d", (int)action->state);
791 /* if there is an exception during a waitany or a testany, indicate the position of the failed communication */
792 if (simcall->issuer->doexception) {
793 if (simcall->call == SIMCALL_COMM_WAITANY) {
794 simcall->issuer->running_ctx->exception.value = xbt_dynar_search(simcall->comm_waitany.comms, &action);
796 else if (simcall->call == SIMCALL_COMM_TESTANY) {
797 simcall->issuer->running_ctx->exception.value = xbt_dynar_search(simcall->comm_testany.comms, &action);
801 if (surf_workstation_model->extension.
802 workstation.get_state(simcall->issuer->smx_host->host) != SURF_RESOURCE_ON) {
803 simcall->issuer->context->iwannadie = 1;
806 simcall->issuer->waiting_action = NULL;
807 xbt_fifo_remove(simcall->issuer->comms, action);
808 SIMIX_simcall_answer(simcall);
812 while (destroy_count-- > 0)
813 SIMIX_comm_destroy(action);
817 * \brief This function is called when a Surf communication action is finished.
818 * \param action the corresponding Simix communication
820 void SIMIX_post_comm(smx_action_t action)
822 /* Update action state */
823 if (action->comm.src_timeout &&
824 surf_workstation_model->action_state_get(action->comm.src_timeout) == SURF_ACTION_DONE)
825 action->state = SIMIX_SRC_TIMEOUT;
826 else if (action->comm.dst_timeout &&
827 surf_workstation_model->action_state_get(action->comm.dst_timeout) == SURF_ACTION_DONE)
828 action->state = SIMIX_DST_TIMEOUT;
829 else if (action->comm.src_timeout &&
830 surf_workstation_model->action_state_get(action->comm.src_timeout) == SURF_ACTION_FAILED)
831 action->state = SIMIX_SRC_HOST_FAILURE;
832 else if (action->comm.dst_timeout &&
833 surf_workstation_model->action_state_get(action->comm.dst_timeout) == SURF_ACTION_FAILED)
834 action->state = SIMIX_DST_HOST_FAILURE;
835 else if (action->comm.surf_comm &&
836 surf_workstation_model->action_state_get(action->comm.surf_comm) == SURF_ACTION_FAILED) {
837 XBT_DEBUG("Puta madre. Surf says that the link broke");
838 action->state = SIMIX_LINK_FAILURE;
840 action->state = SIMIX_DONE;
842 XBT_DEBUG("SIMIX_post_comm: comm %p, state %d, src_proc %p, dst_proc %p, detached: %d",
843 action, (int)action->state, action->comm.src_proc, action->comm.dst_proc, action->comm.detached);
845 /* destroy the surf actions associated with the Simix communication */
846 SIMIX_comm_destroy_internal_actions(action);
848 /* remove the communication action from the list of pending communications
849 * of both processes (if they still exist) */
850 if (action->comm.src_proc) {
851 xbt_fifo_remove(action->comm.src_proc->comms, action);
853 if (action->comm.dst_proc) {
854 xbt_fifo_remove(action->comm.dst_proc->comms, action);
857 /* if there are simcalls associated with the action, then answer them */
858 if (xbt_fifo_size(action->simcalls)) {
859 SIMIX_comm_finish(action);
863 void SIMIX_comm_cancel(smx_action_t action)
865 /* if the action is a waiting state means that it is still in a rdv */
866 /* so remove from it and delete it */
867 if (action->state == SIMIX_WAITING) {
868 SIMIX_rdv_remove(action->comm.rdv, action);
869 action->state = SIMIX_CANCELED;
871 else if (!MC_is_active() /* when running the MC there are no surf actions */
872 && (action->state == SIMIX_READY || action->state == SIMIX_RUNNING)) {
874 surf_workstation_model->action_cancel(action->comm.surf_comm);
878 void SIMIX_comm_suspend(smx_action_t action)
880 /*FIXME: shall we suspend also the timeout actions? */
881 if (action->comm.surf_comm)
882 surf_workstation_model->suspend(action->comm.surf_comm);
883 /* in the other case, the action will be suspended on creation, in SIMIX_comm_start() */
886 void SIMIX_comm_resume(smx_action_t action)
888 /*FIXME: check what happen with the timeouts */
889 if (action->comm.surf_comm)
890 surf_workstation_model->resume(action->comm.surf_comm);
891 /* in the other case, the action were not really suspended yet, see SIMIX_comm_suspend() and SIMIX_comm_start() */
895 /************* Action Getters **************/
898 * \brief get the amount remaining from the communication
899 * \param action The communication
901 double SIMIX_comm_get_remains(smx_action_t action)
909 switch (action->state) {
912 remains = surf_workstation_model->get_remains(action->comm.surf_comm);
917 remains = 0; /*FIXME: check what should be returned */
921 remains = 0; /*FIXME: is this correct? */
927 e_smx_state_t SIMIX_comm_get_state(smx_action_t action)
929 return action->state;
933 * \brief Return the user data associated to the sender of the communication
934 * \param action The communication
935 * \return the user data
937 void* SIMIX_comm_get_src_data(smx_action_t action)
939 return action->comm.src_data;
943 * \brief Return the user data associated to the receiver of the communication
944 * \param action The communication
945 * \return the user data
947 void* SIMIX_comm_get_dst_data(smx_action_t action)
949 return action->comm.dst_data;
952 smx_process_t SIMIX_comm_get_src_proc(smx_action_t action)
954 return action->comm.src_proc;
957 smx_process_t SIMIX_comm_get_dst_proc(smx_action_t action)
959 return action->comm.dst_proc;
#ifdef HAVE_LATENCY_BOUND_TRACKING
/**
 * \brief Tells whether a communication is latency bounded.
 *
 * While a surf action exists, its value is refreshed from surf and cached in
 * action->latency_limited; otherwise the cached value is returned.
 *
 * \param action The communication (may be NULL)
 * \return the latency-limited flag, or 0 for a NULL action
 */
XBT_INLINE int SIMIX_comm_is_latency_bounded(smx_action_t action)
{
  if (!action)
    return 0;

  surf_action_t transfer = action->comm.surf_comm;
  if (transfer) {
    XBT_DEBUG("Getting latency limited for surf_action (%p)", transfer);
    action->latency_limited = surf_workstation_model->get_latency_limited(transfer);
    XBT_DEBUG("Action limited is %d", action->latency_limited);
  }
  return action->latency_limited;
}
#endif
981 /******************************************************************************/
982 /* SIMIX_comm_copy_data callbacks */
983 /******************************************************************************/
984 static void (*SIMIX_comm_copy_data_callback) (smx_action_t, void*, size_t) =
985 &SIMIX_comm_copy_pointer_callback;
988 SIMIX_comm_set_copy_data_callback(void (*callback) (smx_action_t, void*, size_t))
990 SIMIX_comm_copy_data_callback = callback;
993 void SIMIX_comm_copy_pointer_callback(smx_action_t comm, void* buff, size_t buff_size)
995 xbt_assert((buff_size == sizeof(void *)),
996 "Cannot copy %zu bytes: must be sizeof(void*)", buff_size);
997 *(void **) (comm->comm.dst_buff) = buff;
1000 void SIMIX_comm_copy_buffer_callback(smx_action_t comm, void* buff, size_t buff_size)
1002 XBT_DEBUG("Copy the data over");
1003 memcpy(comm->comm.dst_buff, buff, buff_size);
1004 if (comm->comm.detached) { // if this is a detached send, the source buffer was duplicated by SMPI sender to make the original buffer available to the application ASAP
1006 comm->comm.src_buff = NULL;
1012 * \brief Copy the communication data from the sender's buffer to the receiver's one
1013 * \param comm The communication
1015 void SIMIX_comm_copy_data(smx_action_t comm)
1017 size_t buff_size = comm->comm.src_buff_size;
1018 /* If there is no data to be copy then return */
1019 if (!comm->comm.src_buff || !comm->comm.dst_buff || comm->comm.copied)
1022 XBT_DEBUG("Copying comm %p data from %s (%p) -> %s (%p) (%zu bytes)",
1024 comm->comm.src_proc ? comm->comm.src_proc->smx_host->name : "a finished process",
1025 comm->comm.src_buff,
1026 comm->comm.dst_proc ? comm->comm.dst_proc->smx_host->name : "a finished process",
1027 comm->comm.dst_buff, buff_size);
1029 /* Copy at most dst_buff_size bytes of the message to receiver's buffer */
1030 if (comm->comm.dst_buff_size)
1031 buff_size = MIN(buff_size, *(comm->comm.dst_buff_size));
1033 /* Update the receiver's buffer size to the copied amount */
1034 if (comm->comm.dst_buff_size)
1035 *comm->comm.dst_buff_size = buff_size;
1038 SIMIX_comm_copy_data_callback (comm, comm->comm.src_buff, buff_size);
1040 /* Set the copied flag so we copy data only once */
1041 /* (this function might be called from both communication ends) */
1042 comm->comm.copied = 1;