Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
74cefadfa53e0898e45460967b6c7f461b90f7fc
[simgrid.git] / src / simix / smx_network.c
1 /* Copyright (c) 2009, 2010. The SimGrid Team.
2  * All rights reserved.                                                     */
3
4 /* This program is free software; you can redistribute it and/or modify it
5  * under the terms of the license (GNU LGPL) which comes with this package. */
6
7 #include "smx_private.h"
8 #include "xbt/log.h"
9 #include "mc/mc.h"
10 #include "xbt/dict.h"
11
12 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(simix_network, simix,
13                                 "Logging specific to SIMIX (network)");
14
15 static xbt_dict_t rdv_points = NULL;
16 unsigned long int smx_total_comms = 0;
17
18 static void SIMIX_waitany_remove_simcall_from_actions(smx_simcall_t simcall);
19 static void SIMIX_comm_copy_data(smx_action_t comm);
20 static smx_action_t SIMIX_comm_new(e_smx_comm_type_t type);
21 static XBT_INLINE void SIMIX_rdv_push(smx_rdv_t rdv, smx_action_t comm);
22 static smx_action_t SIMIX_rdv_get_comm(smx_rdv_t rdv, e_smx_comm_type_t type,
23                                           int (*match_fun)(void *, void *), void *);
24 static void SIMIX_rdv_free(void *data);
25
26 void SIMIX_network_init(void)
27 {
28   rdv_points = xbt_dict_new_homogeneous(SIMIX_rdv_free);
29 }
30
31 void SIMIX_network_exit(void)
32 {
33   xbt_dict_free(&rdv_points);
34 }
35
36 /******************************************************************************/
37 /*                           Rendez-Vous Points                               */
38 /******************************************************************************/
39
40 smx_rdv_t SIMIX_rdv_create(const char *name)
41 {
42   /* two processes may have pushed the same rdv_create simcall at the same time */
43   smx_rdv_t rdv = name ? xbt_dict_get_or_null(rdv_points, name) : NULL;
44
45   if (!rdv) {
46     rdv = xbt_new0(s_smx_rvpoint_t, 1);
47     rdv->name = name ? xbt_strdup(name) : NULL;
48     rdv->comm_fifo = xbt_fifo_new();
49
50     if (rdv->name)
51       xbt_dict_set(rdv_points, rdv->name, rdv, NULL);
52   }
53   return rdv;
54 }
55
56 void SIMIX_rdv_destroy(smx_rdv_t rdv)
57 {
58   if (rdv->name)
59     xbt_dict_remove(rdv_points, rdv->name);
60 }
61
62 void SIMIX_rdv_free(void *data)
63 {
64   smx_rdv_t rdv = (smx_rdv_t) data;
65   xbt_free(rdv->name);
66   xbt_fifo_free(rdv->comm_fifo);
67   xbt_free(rdv);  
68 }
69
70 xbt_dict_t SIMIX_get_rdv_points()
71 {
72   return rdv_points;
73 }
74
75 smx_rdv_t SIMIX_rdv_get_by_name(const char *name)
76 {
77   return xbt_dict_get_or_null(rdv_points, name);
78 }
79
80 int SIMIX_rdv_comm_count_by_host(smx_rdv_t rdv, smx_host_t host)
81 {
82   smx_action_t comm = NULL;
83   xbt_fifo_item_t item = NULL;
84   int count = 0;
85
86   xbt_fifo_foreach(rdv->comm_fifo, item, comm, smx_action_t) {
87     if (comm->comm.src_proc->smx_host == host)
88       count++;
89   }
90
91   return count;
92 }
93
94 smx_action_t SIMIX_rdv_get_head(smx_rdv_t rdv)
95 {
96   return xbt_fifo_get_item_content(xbt_fifo_get_first_item(rdv->comm_fifo));
97 }
98
99 /**
100  *  \brief Pushes a communication action into a rendez-vous point
101  *  \param rdv The rendez-vous point
102  *  \param comm The communication action
103  */
104 static XBT_INLINE void SIMIX_rdv_push(smx_rdv_t rdv, smx_action_t comm)
105 {
106   xbt_fifo_push(rdv->comm_fifo, comm);
107   comm->comm.rdv = rdv;
108 }
109
110 /**
111  *  \brief Removes a communication action from a rendez-vous point
112  *  \param rdv The rendez-vous point
113  *  \param comm The communication action
114  */
115 XBT_INLINE void SIMIX_rdv_remove(smx_rdv_t rdv, smx_action_t comm)
116 {
117   xbt_fifo_remove(rdv->comm_fifo, comm);
118   comm->comm.rdv = NULL;
119 }
120
121 /**
122  *  \brief Wrapper to SIMIX_rdv_get_comm
123  */
124 smx_action_t SIMIX_comm_get_send_match(smx_rdv_t rdv, int (*match_fun)(void*, void*), void* data) {
125    return SIMIX_rdv_get_comm(rdv, SIMIX_COMM_SEND, match_fun, data);
126 }
127
128 /**
129  *  \brief Checks if there is a communication action queued in a rendez-vous matching our needs
130  *  \param type The type of communication we are looking for (comm_send, comm_recv)
131  *  \return The communication action if found, NULL otherwise
132  */
133 smx_action_t SIMIX_rdv_get_comm(smx_rdv_t rdv, e_smx_comm_type_t type,
134                                    int (*match_fun)(void *, void *), void *data)
135 {
136   // FIXME rewrite this function by using SIMIX_rdv_has_send/recv_match
137   smx_action_t action;
138   xbt_fifo_item_t item;
139   void* comm_data = NULL;
140
141   xbt_fifo_foreach(rdv->comm_fifo, item, action, smx_action_t) {
142     if (action->comm.type == SIMIX_COMM_SEND) {
143       comm_data = action->comm.src_data;
144     } else if (action->comm.type == SIMIX_COMM_RECEIVE) {
145       comm_data = action->comm.dst_data;
146     }
147     if (action->comm.type == type && (!match_fun || match_fun(data, comm_data))) {
148       XBT_DEBUG("Found a matching communication action %p", action);
149       xbt_fifo_remove_item(rdv->comm_fifo, item);
150       xbt_fifo_free_item(item);
151       action->comm.refcount++;
152       action->comm.rdv = NULL;
153       return action;
154     }
155     XBT_DEBUG("Sorry, communication action %p does not match our needs:"
156            " its type is %d but we are looking for a comm of type %d",
157            action, action->comm.type, type);
158   }
159   XBT_DEBUG("No matching communication action found");
160   return NULL;
161 }
162
163 /**
164  *  \brief Checks if there is a send communication action
165  *  queued in a rendez-vous matching our needs.
166  *  \return 1 if found, 0 otherwise
167  */
168 int SIMIX_comm_has_send_match(smx_rdv_t rdv, int (*match_fun)(void*, void*), void* data) {
169
170   smx_action_t action;
171   xbt_fifo_item_t item;
172
173   xbt_fifo_foreach(rdv->comm_fifo, item, action, smx_action_t){
174     if (action->comm.type == SIMIX_COMM_SEND
175         && (!match_fun || match_fun(data, action->comm.src_data))) {
176       XBT_DEBUG("Found a matching communication action %p", action);
177       return 1;
178     }
179   }
180   XBT_DEBUG("No matching communication action found");
181   return 0;
182 }
183
184 /**
185  *  \brief Checks if there is a recv communication action
186  *  queued in a rendez-vous matching our needs.
187  *  \return 1 if found, 0 otherwise
188  */
189 int SIMIX_comm_has_recv_match(smx_rdv_t rdv, int (*match_fun)(void*, void*), void* data) {
190
191   smx_action_t action;
192   xbt_fifo_item_t item;
193
194   xbt_fifo_foreach(rdv->comm_fifo, item, action, smx_action_t) {
195     if (action->comm.type == SIMIX_COMM_RECEIVE
196         && (!match_fun || match_fun(data, action->comm.dst_data))) {
197       XBT_DEBUG("Found a matching communication action %p", action);
198       return 1;
199     }
200   }
201   XBT_DEBUG("No matching communication action found");
202   return 0;
203 }
204
205 /******************************************************************************/
206 /*                            Comunication Actions                            */
207 /******************************************************************************/
208
209 /**
210  *  \brief Creates a new comunicate action
211  *  \param type The direction of communication (comm_send, comm_recv)
212  *  \return The new comunicate action
213  */
214 smx_action_t SIMIX_comm_new(e_smx_comm_type_t type)
215 {
216   smx_action_t act;
217
218   /* alloc structures */
219   act = xbt_mallocator_get(simix_global->action_mallocator);
220
221   act->type = SIMIX_ACTION_COMMUNICATE;
222   act->state = SIMIX_WAITING;
223
224   /* set communication */
225   act->comm.type = type;
226   act->comm.refcount = 1;
227
228 #ifdef HAVE_LATENCY_BOUND_TRACKING
229   //initialize with unknown value
230   act->latency_limited = -1;
231 #endif
232
233 #ifdef HAVE_TRACING
234   act->category = NULL;
235 #endif
236
237   XBT_DEBUG("Create communicate action %p", act);
238   ++smx_total_comms;
239
240   return act;
241 }
242
243 /**
244  *  \brief Destroy a communicate action
245  *  \param action The communicate action to be destroyed
246  */
247 void SIMIX_comm_destroy(smx_action_t action)
248 {
249   XBT_DEBUG("Destroy action %p (refcount: %d), state: %d",
250       action, action->comm.refcount, action->state);
251
252   if (action->comm.refcount <= 0) {
253         xbt_backtrace_display_current();
254     xbt_die("the refcount of comm %p is already 0 before decreasing it. "
255             "That's a bug!", action);
256   }
257   action->comm.refcount--;
258   if (action->comm.refcount > 0)
259     return;
260   XBT_DEBUG("Really free communication %p; refcount is now %d", action,
261         action->comm.refcount);
262
263 #ifdef HAVE_LATENCY_BOUND_TRACKING
264     action->latency_limited = SIMIX_comm_is_latency_bounded( action ) ;
265 #endif
266
267   xbt_free(action->name);
268   SIMIX_comm_destroy_internal_actions(action);
269
270   if (action->comm.detached && action->state != SIMIX_DONE) {
271     /* the communication has failed and was detached:
272      * we have to free the buffer */
273     if (action->comm.clean_fun) {
274       action->comm.clean_fun(action->comm.src_buff);
275     }
276     action->comm.src_buff = NULL;
277   }
278
279   xbt_mallocator_release(simix_global->action_mallocator, action);
280 }
281
282 void SIMIX_comm_destroy_internal_actions(smx_action_t action)
283 {
284   if (action->comm.surf_comm){
285 #ifdef HAVE_LATENCY_BOUND_TRACKING
286     action->latency_limited = SIMIX_comm_is_latency_bounded(action);
287 #endif
288     action->comm.surf_comm->model_type->action_unref(action->comm.surf_comm);
289     action->comm.surf_comm = NULL;
290   }
291
292   if (action->comm.src_timeout){
293     action->comm.src_timeout->model_type->action_unref(action->comm.src_timeout);
294     action->comm.src_timeout = NULL;
295   }
296
297   if (action->comm.dst_timeout){
298     action->comm.dst_timeout->model_type->action_unref(action->comm.dst_timeout);
299     action->comm.dst_timeout = NULL;
300   }
301 }
302
303 smx_action_t SIMIX_comm_isend(smx_process_t src_proc, smx_rdv_t rdv,
304                               double task_size, double rate,
305                               void *src_buff, size_t src_buff_size,
306                               int (*match_fun)(void *, void *),
307                               void (*clean_fun)(void *), // used to free the action in case of problem after a detached send
308                               void *data,
309                               int detached)
310 {
311   smx_action_t action;
312
313   /* Look for communication action matching our needs.
314      If it is not found then create it and push it into the rendez-vous point */
315   action = SIMIX_rdv_get_comm(rdv, SIMIX_COMM_RECEIVE, match_fun, data);
316
317   if (!action) {
318     action = SIMIX_comm_new(SIMIX_COMM_SEND);
319     SIMIX_rdv_push(rdv, action);
320   } else {
321     action->state = SIMIX_READY;
322     action->comm.type = SIMIX_COMM_READY;
323   }
324   xbt_fifo_push(src_proc->comms, action);
325
326   /* if the communication action is detached then decrease the refcount
327    * by one, so it will be eliminated by the receiver's destroy call */
328   if (detached) {
329     action->comm.detached = 1;
330     action->comm.refcount--;
331     action->comm.clean_fun = clean_fun;
332   } else {
333     action->comm.clean_fun = NULL;
334   }
335
336   /* Setup the communication action */
337   action->comm.src_proc = src_proc;
338   action->comm.task_size = task_size;
339   action->comm.rate = rate;
340   action->comm.src_buff = src_buff;
341   action->comm.src_buff_size = src_buff_size;
342   action->comm.src_data = data;
343
344   if (MC_IS_ENABLED) {
345     action->state = SIMIX_RUNNING;
346     return action;
347   }
348
349   SIMIX_comm_start(action);
350   return (detached ? NULL : action);
351 }
352
353 smx_action_t SIMIX_comm_irecv(smx_process_t dst_proc, smx_rdv_t rdv,
354                       void *dst_buff, size_t *dst_buff_size,
355                       int (*match_fun)(void *, void *), void *data)
356 {
357   smx_action_t action;
358
359   /* Look for communication action matching our needs.
360    * If it is not found then create it and push it into the rendez-vous point
361    */
362   action = SIMIX_rdv_get_comm(rdv, SIMIX_COMM_SEND, match_fun, data);
363
364   if (!action) {
365     action = SIMIX_comm_new(SIMIX_COMM_RECEIVE);
366     SIMIX_rdv_push(rdv, action);
367   } else {
368     action->state = SIMIX_READY;
369     action->comm.type = SIMIX_COMM_READY;
370   }
371   xbt_fifo_push(dst_proc->comms, action);
372
373   /* Setup communication action */
374   action->comm.dst_proc = dst_proc;
375   action->comm.dst_buff = dst_buff;
376   action->comm.dst_buff_size = dst_buff_size;
377   action->comm.dst_data = data;
378
379   if (MC_IS_ENABLED) {
380     action->state = SIMIX_RUNNING;
381     return action;
382   }
383
384   SIMIX_comm_start(action);
385   return action;
386 }
387
388 void SIMIX_pre_comm_wait(smx_simcall_t simcall, smx_action_t action, double timeout, int idx)
389 {
390
391   /* the simcall may be a wait, a send or a recv */
392   surf_action_t sleep;
393
394   /* Associate this simcall to the wait action */
395   xbt_fifo_push(action->simcalls, simcall);
396   simcall->issuer->waiting_action = action;
397
398   if (MC_IS_ENABLED) {
399     if (idx == 0) {
400       action->state = SIMIX_DONE;
401     } else {
402       /* If we reached this point, the wait simcall must have a timeout */
403       /* Otherwise it shouldn't be enabled and executed by the MC */
404       if (timeout == -1)
405         THROW_IMPOSSIBLE;
406
407       if (action->comm.src_proc == simcall->issuer)
408         action->state = SIMIX_SRC_TIMEOUT;
409       else
410         action->state = SIMIX_DST_TIMEOUT;
411     }
412
413     SIMIX_comm_finish(action);
414     return;
415   }
416
417   /* If the action has already finish perform the error handling, */
418   /* otherwise set up a waiting timeout on the right side         */
419   if (action->state != SIMIX_WAITING && action->state != SIMIX_RUNNING) {
420     SIMIX_comm_finish(action);
421   } else { /* if (timeout >= 0) { we need a surf sleep action even when there is no timeout, otherwise surf won't tell us when the host fails */
422     sleep = surf_workstation_model->extension.workstation.sleep(simcall->issuer->smx_host->host, timeout);
423     surf_workstation_model->action_data_set(sleep, action);
424
425     if (simcall->issuer == action->comm.src_proc)
426       action->comm.src_timeout = sleep;
427     else
428       action->comm.dst_timeout = sleep;
429   }
430 }
431
432 void SIMIX_pre_comm_test(smx_simcall_t simcall)
433 {
434   smx_action_t action = simcall->comm_test.comm;
435
436   if(MC_IS_ENABLED){
437     simcall->comm_test.result = action->comm.src_proc && action->comm.dst_proc;
438     if(simcall->comm_test.result){
439       action->state = SIMIX_DONE;
440       xbt_fifo_push(action->simcalls, simcall);
441       SIMIX_comm_finish(action);
442     }else{
443       SIMIX_simcall_answer(simcall);
444     }
445     return;
446   }
447
448   simcall->comm_test.result = (action->state != SIMIX_WAITING && action->state != SIMIX_RUNNING);
449   if (simcall->comm_test.result) {
450     xbt_fifo_push(action->simcalls, simcall);
451     SIMIX_comm_finish(action);
452   } else {
453     SIMIX_simcall_answer(simcall);
454   }
455 }
456
457 void SIMIX_pre_comm_testany(smx_simcall_t simcall, int idx)
458 {
459   unsigned int cursor;
460   smx_action_t action;
461   xbt_dynar_t actions = simcall->comm_testany.comms;
462   simcall->comm_testany.result = -1;
463
464   if (MC_IS_ENABLED){
465     if(idx == -1){
466       SIMIX_simcall_answer(simcall);
467     }else{
468       action = xbt_dynar_get_as(actions, idx, smx_action_t);
469       simcall->comm_testany.result = idx;
470       xbt_fifo_push(action->simcalls, simcall);
471       action->state = SIMIX_DONE;
472       SIMIX_comm_finish(action);
473     }
474     return;
475   }
476
477   xbt_dynar_foreach(simcall->comm_testany.comms,cursor,action) {
478     if (action->state != SIMIX_WAITING && action->state != SIMIX_RUNNING) {
479       simcall->comm_testany.result = cursor;
480       xbt_fifo_push(action->simcalls, simcall);
481       SIMIX_comm_finish(action);
482       return;
483     }
484   }
485   SIMIX_simcall_answer(simcall);
486 }
487
488 void SIMIX_pre_comm_waitany(smx_simcall_t simcall, int idx)
489 {
490   smx_action_t action;
491   unsigned int cursor = 0;
492   xbt_dynar_t actions = simcall->comm_waitany.comms;
493
494   if (MC_IS_ENABLED){
495     action = xbt_dynar_get_as(actions, idx, smx_action_t);
496     xbt_fifo_push(action->simcalls, simcall);
497     simcall->comm_waitany.result = idx;
498     action->state = SIMIX_DONE;
499     SIMIX_comm_finish(action);
500     return;
501   }
502
503   xbt_dynar_foreach(actions, cursor, action){
504     /* associate this simcall to the the action */
505     xbt_fifo_push(action->simcalls, simcall);
506
507     /* see if the action is already finished */
508     if (action->state != SIMIX_WAITING && action->state != SIMIX_RUNNING){
509       SIMIX_comm_finish(action);
510       break;
511     }
512   }
513 }
514
515 void SIMIX_waitany_remove_simcall_from_actions(smx_simcall_t simcall)
516 {
517   smx_action_t action;
518   unsigned int cursor = 0;
519   xbt_dynar_t actions = simcall->comm_waitany.comms;
520
521   xbt_dynar_foreach(actions, cursor, action) {
522     xbt_fifo_remove(action->simcalls, simcall);
523   }
524 }
525
526 /**
527  *  \brief Starts the simulation of a communication action.
528  *  \param action the communication action
529  */
530 XBT_INLINE void SIMIX_comm_start(smx_action_t action)
531 {
532   /* If both the sender and the receiver are already there, start the communication */
533   if (action->state == SIMIX_READY) {
534
535     smx_host_t sender = action->comm.src_proc->smx_host;
536     smx_host_t receiver = action->comm.dst_proc->smx_host;
537
538     XBT_DEBUG("Starting communication %p from '%s' to '%s'", action,
539            SIMIX_host_get_name(sender), SIMIX_host_get_name(receiver));
540
541     action->comm.surf_comm = surf_workstation_model->extension.workstation.
542         communicate(sender->host, receiver->host, action->comm.task_size, action->comm.rate);
543
544     surf_workstation_model->action_data_set(action->comm.surf_comm, action);
545
546     action->state = SIMIX_RUNNING;
547
548     /* If a link is failed, detect it immediately */
549     if (surf_workstation_model->action_state_get(action->comm.surf_comm) == SURF_ACTION_FAILED) {
550       XBT_DEBUG("Communication from '%s' to '%s' failed to start because of a link failure",
551           SIMIX_host_get_name(sender), SIMIX_host_get_name(receiver));
552       action->state = SIMIX_LINK_FAILURE;
553       SIMIX_comm_destroy_internal_actions(action);
554     }
555
556     /* If any of the process is suspend, create the action but stop its execution,
557        it will be restarted when the sender process resume */
558     if (SIMIX_process_is_suspended(action->comm.src_proc) ||
559         SIMIX_process_is_suspended(action->comm.dst_proc)) {
560       /* FIXME: check what should happen with the action state */
561       surf_workstation_model->suspend(action->comm.surf_comm);
562     }
563   }
564 }
565
566 /**
567  * \brief Answers the SIMIX simcalls associated to a communication action.
568  * \param action a finished communication action
569  */
570 void SIMIX_comm_finish(smx_action_t action)
571 {
572   unsigned int destroy_count = 0;
573   smx_simcall_t simcall;
574
575   while ((simcall = xbt_fifo_shift(action->simcalls))) {
576
577     /* If a waitany simcall is waiting for this action to finish, then remove
578        it from the other actions in the waitany list. Afterwards, get the
579        position of the actual action in the waitany dynar and
580        return it as the result of the simcall */
581     if (simcall->call == SIMCALL_COMM_WAITANY) {
582       SIMIX_waitany_remove_simcall_from_actions(simcall);
583       if (!MC_IS_ENABLED)
584         simcall->comm_waitany.result = xbt_dynar_search(simcall->comm_waitany.comms, &action);
585     }
586
587     /* If the action is still in a rendez-vous point then remove from it */
588     if (action->comm.rdv)
589       SIMIX_rdv_remove(action->comm.rdv, action);
590
591     XBT_DEBUG("SIMIX_comm_finish: action state = %d", action->state);
592
593     /* Check out for errors */
594     switch (action->state) {
595
596       case SIMIX_DONE:
597         XBT_DEBUG("Communication %p complete!", action);
598         SIMIX_comm_copy_data(action);
599         break;
600
601       case SIMIX_SRC_TIMEOUT:
602         SMX_EXCEPTION(simcall->issuer, timeout_error, 0,
603                   "Communication timeouted because of sender");
604         break;
605
606       case SIMIX_DST_TIMEOUT:
607         SMX_EXCEPTION(simcall->issuer, timeout_error, 0,
608                   "Communication timeouted because of receiver");
609         break;
610
611       case SIMIX_SRC_HOST_FAILURE:
612         if (simcall->issuer == action->comm.src_proc)
613           SMX_EXCEPTION(simcall->issuer, host_error, 0, "Host failed");
614         else
615           SMX_EXCEPTION(simcall->issuer, network_error, 0, "Remote peer failed");
616         break;
617
618       case SIMIX_DST_HOST_FAILURE:
619         if (simcall->issuer == action->comm.dst_proc)
620           SMX_EXCEPTION(simcall->issuer, host_error, 0, "Host failed");
621         else
622           SMX_EXCEPTION(simcall->issuer, network_error, 0, "Remote peer failed");
623         break;
624
625       case SIMIX_LINK_FAILURE:
626         XBT_DEBUG("Link failure in action %p between '%s' and '%s': posting an exception to the issuer: %s (%p) detached:%d",
627             action,
628             action->comm.src_proc ? action->comm.src_proc->smx_host->name : NULL,
629             action->comm.dst_proc ? action->comm.dst_proc->smx_host->name : NULL,
630             simcall->issuer->name, simcall->issuer, action->comm.detached);
631         if (action->comm.src_proc == simcall->issuer) {
632           XBT_DEBUG("I'm source");
633         } else if (action->comm.dst_proc == simcall->issuer) {
634           XBT_DEBUG("I'm dest");
635         } else {
636           XBT_DEBUG("I'm neither source nor dest");
637         }
638         SMX_EXCEPTION(simcall->issuer, network_error, 0, "Link failure");
639         break;
640
641       case SIMIX_CANCELED:
642         if (simcall->issuer == action->comm.dst_proc)
643           SMX_EXCEPTION(simcall->issuer, cancel_error, 0,
644                     "Communication canceled by the sender");
645         else
646           SMX_EXCEPTION(simcall->issuer, cancel_error, 0,
647                     "Communication canceled by the receiver");
648         break;
649
650       default:
651         xbt_die("Unexpected action state in SIMIX_comm_finish: %d", action->state);
652     }
653
654     /* if there is an exception during a waitany or a testany, indicate the position of the failed communication */
655     if (simcall->issuer->doexception) {
656       if (simcall->call == SIMCALL_COMM_WAITANY) {
657         simcall->issuer->running_ctx->exception.value = xbt_dynar_search(simcall->comm_waitany.comms, &action);
658       }
659       else if (simcall->call == SIMCALL_COMM_TESTANY) {
660         simcall->issuer->running_ctx->exception.value = xbt_dynar_search(simcall->comm_testany.comms, &action);
661       }
662     }
663
664     simcall->issuer->waiting_action = NULL;
665     xbt_fifo_remove(simcall->issuer->comms, action);
666     SIMIX_simcall_answer(simcall);
667     destroy_count++;
668   }
669
670   while (destroy_count-- > 0)
671     SIMIX_comm_destroy(action);
672 }
673
674 /**
675  * \brief This function is called when a Surf communication action is finished.
676  * \param action the corresponding Simix communication
677  */
678 void SIMIX_post_comm(smx_action_t action)
679 {
680   /* Update action state */
681   if (action->comm.src_timeout &&
682      surf_workstation_model->action_state_get(action->comm.src_timeout) == SURF_ACTION_DONE)
683      action->state = SIMIX_SRC_TIMEOUT;
684   else if (action->comm.dst_timeout &&
685           surf_workstation_model->action_state_get(action->comm.dst_timeout) == SURF_ACTION_DONE)
686      action->state = SIMIX_DST_TIMEOUT;
687   else if (action->comm.src_timeout &&
688           surf_workstation_model->action_state_get(action->comm.src_timeout) == SURF_ACTION_FAILED)
689      action->state = SIMIX_SRC_HOST_FAILURE;
690   else if (action->comm.dst_timeout &&
691           surf_workstation_model->action_state_get(action->comm.dst_timeout) == SURF_ACTION_FAILED)
692      action->state = SIMIX_DST_HOST_FAILURE;
693   else if (action->comm.surf_comm &&
694           surf_workstation_model->action_state_get(action->comm.surf_comm) == SURF_ACTION_FAILED) {
695           XBT_DEBUG("Puta madre. Surf says that the link broke");
696      action->state = SIMIX_LINK_FAILURE;
697   } else
698     action->state = SIMIX_DONE;
699
700   XBT_DEBUG("SIMIX_post_comm: comm %p, state %d, src_proc %p, dst_proc %p, detached: %d",
701       action, action->state, action->comm.src_proc, action->comm.dst_proc, action->comm.detached);
702
703   /* destroy the surf actions associated with the Simix communication */
704   SIMIX_comm_destroy_internal_actions(action);
705
706   /* remove the communication action from the list of pending communications
707    * of both processes (if they still exist) */
708   if (action->comm.src_proc) {
709     xbt_fifo_remove(action->comm.src_proc->comms, action);
710   }
711   if (action->comm.dst_proc) {
712     xbt_fifo_remove(action->comm.dst_proc->comms, action);
713   }
714
715   /* if there are simcalls associated with the action, then answer them */
716   if (xbt_fifo_size(action->simcalls)) {
717     SIMIX_comm_finish(action);
718   }
719 }
720
721 void SIMIX_comm_cancel(smx_action_t action)
722 {
723   /* if the action is a waiting state means that it is still in a rdv */
724   /* so remove from it and delete it */
725   if (action->state == SIMIX_WAITING) {
726     SIMIX_rdv_remove(action->comm.rdv, action);
727     action->state = SIMIX_CANCELED;
728   }
729   else if (!MC_IS_ENABLED /* when running the MC there are no surf actions */
730       && (action->state == SIMIX_READY || action->state == SIMIX_RUNNING)) {
731
732     surf_workstation_model->action_cancel(action->comm.surf_comm);
733   }
734 }
735
736 void SIMIX_comm_suspend(smx_action_t action)
737 {
738   /*FIXME: shall we suspend also the timeout actions? */
739   surf_workstation_model->suspend(action->comm.surf_comm);
740 }
741
742 void SIMIX_comm_resume(smx_action_t action)
743 {
744   /*FIXME: check what happen with the timeouts */
745   surf_workstation_model->resume(action->comm.surf_comm);
746 }
747
748
749 /************* Action Getters **************/
750
751 /**
752  *  \brief get the amount remaining from the communication
753  *  \param action The communication
754  */
755 double SIMIX_comm_get_remains(smx_action_t action)
756 {
757   double remains;
758
759   if(!action){
760       return 0;
761   }
762
763   switch (action->state) {
764
765     case SIMIX_RUNNING:
766       remains = surf_workstation_model->get_remains(action->comm.surf_comm);
767       break;
768
769     case SIMIX_WAITING:
770     case SIMIX_READY:
771       remains = 0; /*FIXME: check what should be returned */
772       break;
773
774     default:
775       remains = 0; /*FIXME: is this correct? */
776       break;
777   }
778   return remains;
779 }
780
781 e_smx_state_t SIMIX_comm_get_state(smx_action_t action)
782 {
783   return action->state;
784 }
785
786 /**
787  *  \brief Return the user data associated to the sender of the communication
788  *  \param action The communication
789  *  \return the user data
790  */
791 void* SIMIX_comm_get_src_data(smx_action_t action)
792 {
793   return action->comm.src_data;
794 }
795
796 /**
797  *  \brief Return the user data associated to the receiver of the communication
798  *  \param action The communication
799  *  \return the user data
800  */
801 void* SIMIX_comm_get_dst_data(smx_action_t action)
802 {
803   return action->comm.dst_data;
804 }
805
806 smx_process_t SIMIX_comm_get_src_proc(smx_action_t action)
807 {
808   return action->comm.src_proc;
809 }
810
811 smx_process_t SIMIX_comm_get_dst_proc(smx_action_t action)
812 {
813   return action->comm.dst_proc;
814 }
815
816 #ifdef HAVE_LATENCY_BOUND_TRACKING
817 /**
818  *  \brief verify if communication is latency bounded
819  *  \param comm The communication
820  */
821 XBT_INLINE int SIMIX_comm_is_latency_bounded(smx_action_t action)
822 {
823   if(!action){
824       return 0;
825   }
826   if (action->comm.surf_comm){
827       XBT_DEBUG("Getting latency limited for surf_action (%p)", action->comm.surf_comm);
828       action->latency_limited = surf_workstation_model->get_latency_limited(action->comm.surf_comm);
829       XBT_DEBUG("Action limited is %d", action->latency_limited);
830   }
831   return action->latency_limited;
832 }
833 #endif
834
835 /******************************************************************************/
836 /*                    SIMIX_comm_copy_data callbacks                       */
837 /******************************************************************************/
838 static void (*SIMIX_comm_copy_data_callback) (smx_action_t, void*, size_t) =
839     &SIMIX_comm_copy_pointer_callback;
840
841 void
842 SIMIX_comm_set_copy_data_callback(void (*callback) (smx_action_t, void*, size_t))
843 {
844   SIMIX_comm_copy_data_callback = callback;
845 }
846
847 void SIMIX_comm_copy_pointer_callback(smx_action_t comm, void* buff, size_t buff_size)
848 {
849   xbt_assert((buff_size == sizeof(void *)),
850               "Cannot copy %zu bytes: must be sizeof(void*)", buff_size);
851   *(void **) (comm->comm.dst_buff) = buff;
852 }
853
854 void SIMIX_comm_copy_buffer_callback(smx_action_t comm, void* buff, size_t buff_size)
855 {
856   XBT_DEBUG("Copy the data over");
857   memcpy(comm->comm.dst_buff, buff, buff_size);
858 }
859
860 void smpi_comm_copy_data_callback(smx_action_t comm, void* buff, size_t buff_size)
861 {
862   XBT_DEBUG("Copy the data over");
863   memcpy(comm->comm.dst_buff, buff, buff_size);
864   if (comm->comm.detached) { // if this is a detached send, the source buffer was duplicated by SMPI sender to make the original buffer available to the application ASAP
865     xbt_free(buff);
866     comm->comm.src_buff = NULL;
867   }
868 }
869
870 /**
871  *  \brief Copy the communication data from the sender's buffer to the receiver's one
872  *  \param comm The communication
873  */
874 void SIMIX_comm_copy_data(smx_action_t comm)
875 {
876   size_t buff_size = comm->comm.src_buff_size;
877   /* If there is no data to be copy then return */
878   if (!comm->comm.src_buff || !comm->comm.dst_buff || comm->comm.copied == 1)
879     return;
880
881   XBT_DEBUG("Copying comm %p data from %s (%p) -> %s (%p) (%zu bytes)",
882          comm,
883          comm->comm.src_proc ? comm->comm.src_proc->smx_host->name : "a finished process",
884          comm->comm.src_buff,
885          comm->comm.dst_proc ? comm->comm.dst_proc->smx_host->name : "a finished process",
886          comm->comm.dst_buff, buff_size);
887
888   /* Copy at most dst_buff_size bytes of the message to receiver's buffer */
889   if (comm->comm.dst_buff_size)
890     buff_size = MIN(buff_size, *(comm->comm.dst_buff_size));
891
892   /* Update the receiver's buffer size to the copied amount */
893   if (comm->comm.dst_buff_size)
894     *comm->comm.dst_buff_size = buff_size;
895
896   if (buff_size > 0)
897     SIMIX_comm_copy_data_callback (comm, comm->comm.src_buff, buff_size);
898
899   /* Set the copied flag so we copy data only once */
900   /* (this function might be called from both communication ends) */
901   comm->comm.copied = 1;
902 }