Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
merge conflict resolved
[simgrid.git] / src / simix / smx_network.c
1 /* Copyright (c) 2009, 2010. The SimGrid Team.
2  * All rights reserved.                                                     */
3
4 /* This program is free software; you can redistribute it and/or modify it
5  * under the terms of the license (GNU LGPL) which comes with this package. */
6
7 #include "smx_private.h"
8 #include "xbt/log.h"
9 #include "mc/mc.h"
10 #include "xbt/dict.h"
11
12 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(simix_network, simix,
13                                 "Logging specific to SIMIX (network)");
14
15 static xbt_dict_t rdv_points = NULL;
16 unsigned long int smx_total_comms = 0;
17
18 static void SIMIX_waitany_remove_simcall_from_actions(smx_simcall_t simcall);
19 static void SIMIX_comm_copy_data(smx_action_t comm);
20 static smx_action_t SIMIX_comm_new(e_smx_comm_type_t type);
21 static XBT_INLINE void SIMIX_rdv_push(smx_rdv_t rdv, smx_action_t comm);
22 static smx_action_t SIMIX_rdv_get_comm(smx_rdv_t rdv, e_smx_comm_type_t type,
23                                           int (*match_fun)(void *, void *), void *);
24 static void SIMIX_rdv_free(void *data);
25
26 void SIMIX_network_init(void)
27 {
28   rdv_points = xbt_dict_new_homogeneous(SIMIX_rdv_free);
29 }
30
31 void SIMIX_network_exit(void)
32 {
33   xbt_dict_free(&rdv_points);
34 }
35
36 /******************************************************************************/
37 /*                           Rendez-Vous Points                               */
38 /******************************************************************************/
39
40 smx_rdv_t SIMIX_rdv_create(const char *name)
41 {
42   /* two processes may have pushed the same rdv_create simcall at the same time */
43   smx_rdv_t rdv = name ? xbt_dict_get_or_null(rdv_points, name) : NULL;
44
45   if (!rdv) {
46     rdv = xbt_new0(s_smx_rvpoint_t, 1);
47     rdv->name = name ? xbt_strdup(name) : NULL;
48     rdv->comm_fifo = xbt_fifo_new();
49
50     if (rdv->name)
51       xbt_dict_set(rdv_points, rdv->name, rdv, NULL);
52   }
53   return rdv;
54 }
55
56 void SIMIX_rdv_destroy(smx_rdv_t rdv)
57 {
58   if (rdv->name)
59     xbt_dict_remove(rdv_points, rdv->name);
60 }
61
62 void SIMIX_rdv_free(void *data)
63 {
64   smx_rdv_t rdv = (smx_rdv_t) data;
65   xbt_free(rdv->name);
66   xbt_fifo_free(rdv->comm_fifo);
67   xbt_free(rdv);  
68 }
69
70 xbt_dict_t SIMIX_get_rdv_points()
71 {
72   return rdv_points;
73 }
74
75 smx_rdv_t SIMIX_rdv_get_by_name(const char *name)
76 {
77   return xbt_dict_get_or_null(rdv_points, name);
78 }
79
80 int SIMIX_rdv_comm_count_by_host(smx_rdv_t rdv, smx_host_t host)
81 {
82   smx_action_t comm = NULL;
83   xbt_fifo_item_t item = NULL;
84   int count = 0;
85
86   xbt_fifo_foreach(rdv->comm_fifo, item, comm, smx_action_t) {
87     if (comm->comm.src_proc->smx_host == host)
88       count++;
89   }
90
91   return count;
92 }
93
94 smx_action_t SIMIX_rdv_get_head(smx_rdv_t rdv)
95 {
96   return xbt_fifo_get_item_content(xbt_fifo_get_first_item(rdv->comm_fifo));
97 }
98
99 /**
100  *  \brief Pushes a communication action into a rendez-vous point
101  *  \param rdv The rendez-vous point
102  *  \param comm The communication action
103  */
104 static XBT_INLINE void SIMIX_rdv_push(smx_rdv_t rdv, smx_action_t comm)
105 {
106   xbt_fifo_push(rdv->comm_fifo, comm);
107   comm->comm.rdv = rdv;
108 }
109
110 /**
111  *  \brief Removes a communication action from a rendez-vous point
112  *  \param rdv The rendez-vous point
113  *  \param comm The communication action
114  */
115 XBT_INLINE void SIMIX_rdv_remove(smx_rdv_t rdv, smx_action_t comm)
116 {
117   xbt_fifo_remove(rdv->comm_fifo, comm);
118   comm->comm.rdv = NULL;
119 }
120
121 /**
122  *  \brief Wrapper to SIMIX_rdv_get_comm
123  */
124 smx_action_t SIMIX_comm_get_send_match(smx_rdv_t rdv, int (*match_fun)(void*, void*), void* data) {
125    return SIMIX_rdv_get_comm(rdv, SIMIX_COMM_SEND, match_fun, data);
126 }
127
128 /**
129  *  \brief Checks if there is a communication action queued in a rendez-vous matching our needs
130  *  \param type The type of communication we are looking for (comm_send, comm_recv)
131  *  \return The communication action if found, NULL otherwise
132  */
133 smx_action_t SIMIX_rdv_get_comm(smx_rdv_t rdv, e_smx_comm_type_t type,
134                                    int (*match_fun)(void *, void *), void *data)
135 {
136   // FIXME rewrite this function by using SIMIX_rdv_has_send/recv_match
137   smx_action_t action;
138   xbt_fifo_item_t item;
139   void* comm_data = NULL;
140
141   xbt_fifo_foreach(rdv->comm_fifo, item, action, smx_action_t) {
142     if (action->comm.type == SIMIX_COMM_SEND) {
143       comm_data = action->comm.src_data;
144     } else if (action->comm.type == SIMIX_COMM_RECEIVE) {
145       comm_data = action->comm.dst_data;
146     }
147     if (action->comm.type == type && (!match_fun || match_fun(data, comm_data))) {
148       XBT_DEBUG("Found a matching communication action %p", action);
149       xbt_fifo_remove_item(rdv->comm_fifo, item);
150       xbt_fifo_free_item(item);
151       action->comm.refcount++;
152       action->comm.rdv = NULL;
153       return action;
154     }
155     XBT_DEBUG("Sorry, communication action %p does not match our needs:"
156            " its type is %d but we are looking for a comm of type %d",
157            action, action->comm.type, type);
158   }
159   XBT_DEBUG("No matching communication action found");
160   return NULL;
161 }
162
163 /**
164  *  \brief Checks if there is a send communication action
165  *  queued in a rendez-vous matching our needs.
166  *  \return 1 if found, 0 otherwise
167  */
168 int SIMIX_comm_has_send_match(smx_rdv_t rdv, int (*match_fun)(void*, void*), void* data) {
169
170   smx_action_t action;
171   xbt_fifo_item_t item;
172
173   xbt_fifo_foreach(rdv->comm_fifo, item, action, smx_action_t){
174     if (action->comm.type == SIMIX_COMM_SEND
175         && (!match_fun || match_fun(data, action->comm.src_data))) {
176       XBT_DEBUG("Found a matching communication action %p", action);
177       return 1;
178     }
179   }
180   XBT_DEBUG("No matching communication action found");
181   return 0;
182 }
183
184 /**
185  *  \brief Checks if there is a recv communication action
186  *  queued in a rendez-vous matching our needs.
187  *  \return 1 if found, 0 otherwise
188  */
189 int SIMIX_comm_has_recv_match(smx_rdv_t rdv, int (*match_fun)(void*, void*), void* data) {
190
191   smx_action_t action;
192   xbt_fifo_item_t item;
193
194   xbt_fifo_foreach(rdv->comm_fifo, item, action, smx_action_t) {
195     if (action->comm.type == SIMIX_COMM_RECEIVE
196         && (!match_fun || match_fun(data, action->comm.dst_data))) {
197       XBT_DEBUG("Found a matching communication action %p", action);
198       return 1;
199     }
200   }
201   XBT_DEBUG("No matching communication action found");
202   return 0;
203 }
204
205 /******************************************************************************/
206 /*                            Comunication Actions                            */
207 /******************************************************************************/
208
209 /**
210  *  \brief Creates a new comunicate action
211  *  \param type The direction of communication (comm_send, comm_recv)
212  *  \return The new comunicate action
213  */
214 smx_action_t SIMIX_comm_new(e_smx_comm_type_t type)
215 {
216   smx_action_t act;
217
218   /* alloc structures */
219   act = xbt_mallocator_get(simix_global->action_mallocator);
220
221   act->type = SIMIX_ACTION_COMMUNICATE;
222   act->state = SIMIX_WAITING;
223
224   /* set communication */
225   act->comm.type = type;
226   act->comm.refcount = 1;
227
228 #ifdef HAVE_LATENCY_BOUND_TRACKING
229   //initialize with unknown value
230   act->latency_limited = -1;
231 #endif
232
233 #ifdef HAVE_TRACING
234   act->category = NULL;
235 #endif
236
237   XBT_DEBUG("Create communicate action %p", act);
238   ++smx_total_comms;
239
240   return act;
241 }
242
243 /**
244  *  \brief Destroy a communicate action
245  *  \param action The communicate action to be destroyed
246  */
247 void SIMIX_comm_destroy(smx_action_t action)
248 {
249   XBT_DEBUG("Destroy action %p (refcount: %d), state: %d",
250       action, action->comm.refcount, action->state);
251
252   if (action->comm.refcount <= 0) {
253         xbt_backtrace_display_current();
254     xbt_die("the refcount of comm %p is already 0 before decreasing it. "
255             "That's a bug!", action);
256   }
257   action->comm.refcount--;
258   if (action->comm.refcount > 0)
259     return;
260   XBT_DEBUG("Really free communication %p; refcount is now %d", action,
261         action->comm.refcount);
262
263 #ifdef HAVE_LATENCY_BOUND_TRACKING
264     action->latency_limited = SIMIX_comm_is_latency_bounded( action ) ;
265 #endif
266
267   xbt_free(action->name);
268   SIMIX_comm_destroy_internal_actions(action);
269
270   if (action->comm.detached && action->state != SIMIX_DONE) {
271     /* the communication has failed and was detached:
272      * we have to free the buffer */
273     if (action->comm.clean_fun) {
274       action->comm.clean_fun(action->comm.src_buff);
275     }
276     action->comm.src_buff = NULL;
277   }
278
279   xbt_mallocator_release(simix_global->action_mallocator, action);
280 }
281
282 void SIMIX_comm_destroy_internal_actions(smx_action_t action)
283 {
284   if (action->comm.surf_comm){
285 #ifdef HAVE_LATENCY_BOUND_TRACKING
286     action->latency_limited = SIMIX_comm_is_latency_bounded(action);
287 #endif
288     action->comm.surf_comm->model_type->action_unref(action->comm.surf_comm);
289     action->comm.surf_comm = NULL;
290   }
291
292   if (action->comm.src_timeout){
293     action->comm.src_timeout->model_type->action_unref(action->comm.src_timeout);
294     action->comm.src_timeout = NULL;
295   }
296
297   if (action->comm.dst_timeout){
298     action->comm.dst_timeout->model_type->action_unref(action->comm.dst_timeout);
299     action->comm.dst_timeout = NULL;
300   }
301 }
302
303 smx_action_t SIMIX_comm_isend(smx_process_t src_proc, smx_rdv_t rdv,
304                               double task_size, double rate,
305                               void *src_buff, size_t src_buff_size,
306                               int (*match_fun)(void *, void *),
307                               void (*clean_fun)(void *), // used to free the action in case of problem after a detached send
308                               void *data,
309                               int detached)
310 {
311   smx_action_t action;
312
313   /* Look for communication action matching our needs.
314      If it is not found then create it and push it into the rendez-vous point */
315   action = SIMIX_rdv_get_comm(rdv, SIMIX_COMM_RECEIVE, match_fun, data);
316
317   if (!action) {
318     action = SIMIX_comm_new(SIMIX_COMM_SEND);
319     SIMIX_rdv_push(rdv, action);
320   } else {
321     action->state = SIMIX_READY;
322     action->comm.type = SIMIX_COMM_READY;
323   }
324   xbt_fifo_push(src_proc->comms, action);
325
326   /* if the communication action is detached then decrease the refcount
327    * by one, so it will be eliminated by the receiver's destroy call */
328   if (detached) {
329     action->comm.detached = 1;
330     action->comm.refcount--;
331     action->comm.clean_fun = clean_fun;
332   } else {
333     action->comm.clean_fun = NULL;
334   }
335
336   /* Setup the communication action */
337   action->comm.src_proc = src_proc;
338   action->comm.task_size = task_size;
339   action->comm.rate = rate;
340   action->comm.src_buff = src_buff;
341   action->comm.src_buff_size = src_buff_size;
342   action->comm.src_data = data;
343
344   if (MC_IS_ENABLED) {
345     action->state = SIMIX_RUNNING;
346     return action;
347   }
348
349   SIMIX_comm_start(action);
350   return (detached ? NULL : action);
351 }
352
353 smx_action_t SIMIX_comm_irecv(smx_process_t dst_proc, smx_rdv_t rdv,
354                       void *dst_buff, size_t *dst_buff_size,
355                       int (*match_fun)(void *, void *), void *data)
356 {
357   smx_action_t action;
358
359   /* Look for communication action matching our needs.
360    * If it is not found then create it and push it into the rendez-vous point
361    */
362   action = SIMIX_rdv_get_comm(rdv, SIMIX_COMM_SEND, match_fun, data);
363
364   if (!action) {
365     action = SIMIX_comm_new(SIMIX_COMM_RECEIVE);
366     SIMIX_rdv_push(rdv, action);
367   } else {
368     action->state = SIMIX_READY;
369     action->comm.type = SIMIX_COMM_READY;
370   }
371   xbt_fifo_push(dst_proc->comms, action);
372
373   /* Setup communication action */
374   action->comm.dst_proc = dst_proc;
375   action->comm.dst_buff = dst_buff;
376   action->comm.dst_buff_size = dst_buff_size;
377   action->comm.dst_data = data;
378
379   if (MC_IS_ENABLED) {
380     action->state = SIMIX_RUNNING;
381     return action;
382   }
383
384   SIMIX_comm_start(action);
385   return action;
386 }
387
388 void SIMIX_pre_comm_wait(smx_simcall_t simcall, smx_action_t action, double timeout, int idx)
389 {
390
391   /* the simcall may be a wait, a send or a recv */
392   surf_action_t sleep;
393
394   /* Associate this simcall to the wait action */
395   xbt_fifo_push(action->simcalls, simcall);
396   simcall->issuer->waiting_action = action;
397
398   if (MC_IS_ENABLED) {
399     if (idx == 0) {
400       action->state = SIMIX_DONE;
401     } else {
402       /* If we reached this point, the wait simcall must have a timeout */
403       /* Otherwise it shouldn't be enabled and executed by the MC */
404       if (timeout == -1)
405         THROW_IMPOSSIBLE;
406
407       if (action->comm.src_proc == simcall->issuer)
408         action->state = SIMIX_SRC_TIMEOUT;
409       else
410         action->state = SIMIX_DST_TIMEOUT;
411     }
412
413     SIMIX_comm_finish(action);
414     return;
415   }
416
417   /* If the action has already finish perform the error handling, */
418   /* otherwise set up a waiting timeout on the right side         */
419   if (action->state != SIMIX_WAITING && action->state != SIMIX_RUNNING) {
420     SIMIX_comm_finish(action);
421   } else { /* if (timeout >= 0) { we need a surf sleep action even when there is no timeout, otherwise surf won't tell us when the host fails */
422     sleep = surf_workstation_model->extension.workstation.sleep(simcall->issuer->smx_host->host, timeout);
423     surf_workstation_model->action_data_set(sleep, action);
424
425     if (simcall->issuer == action->comm.src_proc)
426       action->comm.src_timeout = sleep;
427     else
428       action->comm.dst_timeout = sleep;
429   }
430 }
431
432 void SIMIX_pre_comm_test(smx_simcall_t simcall)
433 {
434   smx_action_t action = simcall->comm_test.comm;
435
436   if(MC_IS_ENABLED){
437     simcall->comm_test.result = action->comm.src_proc && action->comm.dst_proc;
438     if(simcall->comm_test.result){
439       action->state = SIMIX_DONE;
440       xbt_fifo_push(action->simcalls, simcall);
441       SIMIX_comm_finish(action);
442     }else{
443       SIMIX_simcall_answer(simcall);
444     }
445     return;
446   }
447
448   simcall->comm_test.result = (action->state != SIMIX_WAITING && action->state != SIMIX_RUNNING);
449   if (simcall->comm_test.result) {
450     xbt_fifo_push(action->simcalls, simcall);
451     SIMIX_comm_finish(action);
452   } else {
453     SIMIX_simcall_answer(simcall);
454   }
455 }
456
457 void SIMIX_pre_comm_testany(smx_simcall_t simcall, int idx)
458 {
459   unsigned int cursor;
460   smx_action_t action;
461   xbt_dynar_t actions = simcall->comm_testany.comms;
462   simcall->comm_testany.result = -1;
463
464   if (MC_IS_ENABLED){
465     if(idx == -1){
466       SIMIX_simcall_answer(simcall);
467     }else{
468       action = xbt_dynar_get_as(actions, idx, smx_action_t);
469       simcall->comm_testany.result = idx;
470       xbt_fifo_push(action->simcalls, simcall);
471       action->state = SIMIX_DONE;
472       SIMIX_comm_finish(action);
473     }
474     return;
475   }
476
477   xbt_dynar_foreach(simcall->comm_testany.comms,cursor,action) {
478     if (action->state != SIMIX_WAITING && action->state != SIMIX_RUNNING) {
479       simcall->comm_testany.result = cursor;
480       xbt_fifo_push(action->simcalls, simcall);
481       SIMIX_comm_finish(action);
482       return;
483     }
484   }
485   SIMIX_simcall_answer(simcall);
486 }
487
488 void SIMIX_pre_comm_waitany(smx_simcall_t simcall, int idx)
489 {
490   smx_action_t action;
491   unsigned int cursor = 0;
492   xbt_dynar_t actions = simcall->comm_waitany.comms;
493
494   if (MC_IS_ENABLED){
495     action = xbt_dynar_get_as(actions, idx, smx_action_t);
496     xbt_fifo_push(action->simcalls, simcall);
497     simcall->comm_waitany.result = idx;
498     action->state = SIMIX_DONE;
499     SIMIX_comm_finish(action);
500     return;
501   }
502
503   xbt_dynar_foreach(actions, cursor, action){
504     /* associate this simcall to the the action */
505     xbt_fifo_push(action->simcalls, simcall);
506
507     /* see if the action is already finished */
508     if (action->state != SIMIX_WAITING && action->state != SIMIX_RUNNING){
509       SIMIX_comm_finish(action);
510       break;
511     }
512   }
513 }
514
515 void SIMIX_waitany_remove_simcall_from_actions(smx_simcall_t simcall)
516 {
517   smx_action_t action;
518   unsigned int cursor = 0;
519   xbt_dynar_t actions = simcall->comm_waitany.comms;
520
521   xbt_dynar_foreach(actions, cursor, action) {
522     xbt_fifo_remove(action->simcalls, simcall);
523   }
524 }
525
526 /**
527  *  \brief Starts the simulation of a communication action.
528  *  \param action the communication action
529  */
530 XBT_INLINE void SIMIX_comm_start(smx_action_t action)
531 {
532   /* If both the sender and the receiver are already there, start the communication */
533   if (action->state == SIMIX_READY) {
534
535     smx_host_t sender = action->comm.src_proc->smx_host;
536     smx_host_t receiver = action->comm.dst_proc->smx_host;
537
538     XBT_DEBUG("Starting communication %p from '%s' to '%s'", action,
539            SIMIX_host_get_name(sender), SIMIX_host_get_name(receiver));
540
541     action->comm.surf_comm = surf_workstation_model->extension.workstation.
542         communicate(sender->host, receiver->host, action->comm.task_size, action->comm.rate);
543
544     surf_workstation_model->action_data_set(action->comm.surf_comm, action);
545
546     action->state = SIMIX_RUNNING;
547
548     /* If a link is failed, detect it immediately */
549     if (surf_workstation_model->action_state_get(action->comm.surf_comm) == SURF_ACTION_FAILED) {
550       XBT_DEBUG("Communication from '%s' to '%s' failed to start because of a link failure",
551           SIMIX_host_get_name(sender), SIMIX_host_get_name(receiver));
552       action->state = SIMIX_LINK_FAILURE;
553       SIMIX_comm_destroy_internal_actions(action);
554     }
555
556     /* If any of the process is suspend, create the action but stop its execution,
557        it will be restarted when the sender process resume */
558     if (SIMIX_process_is_suspended(action->comm.src_proc) ||
559         SIMIX_process_is_suspended(action->comm.dst_proc)) {
560       /* FIXME: check what should happen with the action state */
561       surf_workstation_model->suspend(action->comm.surf_comm);
562     }
563   }
564 }
565
566 /**
567  * \brief Answers the SIMIX simcalls associated to a communication action.
568  * \param action a finished communication action
569  */
570 void SIMIX_comm_finish(smx_action_t action)
571 {
572   volatile unsigned int destroy_count = 0;
573   smx_simcall_t simcall;
574
575   while ((simcall = xbt_fifo_shift(action->simcalls))) {
576
577     /* If a waitany simcall is waiting for this action to finish, then remove
578        it from the other actions in the waitany list. Afterwards, get the
579        position of the actual action in the waitany dynar and
580        return it as the result of the simcall */
581     if (simcall->call == SIMCALL_COMM_WAITANY) {
582       SIMIX_waitany_remove_simcall_from_actions(simcall);
583       if (!MC_IS_ENABLED)
584         simcall->comm_waitany.result = xbt_dynar_search(simcall->comm_waitany.comms, &action);
585     }
586
587     /* If the action is still in a rendez-vous point then remove from it */
588     if (action->comm.rdv)
589       SIMIX_rdv_remove(action->comm.rdv, action);
590
591     XBT_DEBUG("SIMIX_comm_finish: action state = %d", action->state);
592
593     /* Check out for errors */
594     switch (action->state) {
595
596       case SIMIX_DONE:
597         XBT_DEBUG("Communication %p complete!", action);
598         SIMIX_comm_copy_data(action);
599         break;
600
601       case SIMIX_SRC_TIMEOUT:
602         TRY {
603           THROWF(timeout_error, 0, "Communication timeouted because of sender");
604         }
605         CATCH(simcall->issuer->running_ctx->exception) {
606           simcall->issuer->doexception = 1;
607         }
608         break;
609
610       case SIMIX_DST_TIMEOUT:
611         TRY {
612           THROWF(timeout_error, 0, "Communication timeouted because of receiver");
613         }
614         CATCH(simcall->issuer->running_ctx->exception) {
615           simcall->issuer->doexception = 1;
616         }
617         break;
618
619       case SIMIX_SRC_HOST_FAILURE:
620         TRY {
621           if (simcall->issuer == action->comm.src_proc)
622             THROWF(host_error, 0, "Host failed");
623           else
624             THROWF(network_error, 0, "Remote peer failed");
625         }
626         CATCH(simcall->issuer->running_ctx->exception) {
627           simcall->issuer->doexception = 1;
628         }
629         break;
630
631       case SIMIX_DST_HOST_FAILURE:
632         TRY {
633           if (simcall->issuer == action->comm.dst_proc)
634             THROWF(host_error, 0, "Host failed");
635           else
636             THROWF(network_error, 0, "Remote peer failed");
637         }
638         CATCH(simcall->issuer->running_ctx->exception) {
639           simcall->issuer->doexception = 1;
640         }
641         break;
642
643       case SIMIX_LINK_FAILURE:
644         TRY {
645           XBT_DEBUG("Link failure in action %p between '%s' and '%s': posting an exception to the issuer: %s (%p) detached:%d",
646               action,
647               action->comm.src_proc ? action->comm.src_proc->smx_host->name : NULL,
648               action->comm.dst_proc ? action->comm.dst_proc->smx_host->name : NULL,
649               simcall->issuer->name, simcall->issuer, action->comm.detached);
650           if (action->comm.src_proc == simcall->issuer) {
651             XBT_DEBUG("I'm source");
652           } else if (action->comm.dst_proc == simcall->issuer) {
653             XBT_DEBUG("I'm dest");
654           } else {
655             XBT_DEBUG("I'm neither source nor dest");
656           }
657           THROWF(network_error, 0, "Link failure");
658         }
659         CATCH(simcall->issuer->running_ctx->exception) {
660           simcall->issuer->doexception = 1;
661         }
662         break;
663
664       case SIMIX_CANCELED:
665         TRY {
666           if (simcall->issuer == action->comm.dst_proc) {
667             THROWF(cancel_error, 0, "Communication canceled by the sender");
668           }
669           else {
670             THROWF(cancel_error, 0, "Communication canceled by the receiver");
671           }
672         }
673         CATCH(simcall->issuer->running_ctx->exception) {
674           simcall->issuer->doexception = 1;
675         }
676         break;
677
678       default:
679         xbt_die("Unexpected action state in SIMIX_comm_finish: %d", action->state);
680     }
681
682     /* if there is an exception during a waitany or a testany, indicate the position of the failed communication */
683     if (simcall->issuer->doexception) {
684       if (simcall->call == SIMCALL_COMM_WAITANY) {
685         simcall->issuer->running_ctx->exception.value = xbt_dynar_search(simcall->comm_waitany.comms, &action);
686       }
687       else if (simcall->call == SIMCALL_COMM_TESTANY) {
688         simcall->issuer->running_ctx->exception.value = xbt_dynar_search(simcall->comm_testany.comms, &action);
689       }
690     }
691
692     simcall->issuer->waiting_action = NULL;
693     xbt_fifo_remove(simcall->issuer->comms, action);
694     SIMIX_simcall_answer(simcall);
695     destroy_count++;
696   }
697
698   while (destroy_count-- > 0)
699     SIMIX_comm_destroy(action);
700 }
701
702 /**
703  * \brief This function is called when a Surf communication action is finished.
704  * \param action the corresponding Simix communication
705  */
706 void SIMIX_post_comm(smx_action_t action)
707 {
708   /* Update action state */
709   if (action->comm.src_timeout &&
710      surf_workstation_model->action_state_get(action->comm.src_timeout) == SURF_ACTION_DONE)
711      action->state = SIMIX_SRC_TIMEOUT;
712   else if (action->comm.dst_timeout &&
713           surf_workstation_model->action_state_get(action->comm.dst_timeout) == SURF_ACTION_DONE)
714      action->state = SIMIX_DST_TIMEOUT;
715   else if (action->comm.src_timeout &&
716           surf_workstation_model->action_state_get(action->comm.src_timeout) == SURF_ACTION_FAILED)
717      action->state = SIMIX_SRC_HOST_FAILURE;
718   else if (action->comm.dst_timeout &&
719           surf_workstation_model->action_state_get(action->comm.dst_timeout) == SURF_ACTION_FAILED)
720      action->state = SIMIX_DST_HOST_FAILURE;
721   else if (action->comm.surf_comm &&
722           surf_workstation_model->action_state_get(action->comm.surf_comm) == SURF_ACTION_FAILED) {
723           XBT_DEBUG("Puta madre. Surf says that the link broke");
724      action->state = SIMIX_LINK_FAILURE;
725   } else
726     action->state = SIMIX_DONE;
727
728   XBT_DEBUG("SIMIX_post_comm: comm %p, state %d, src_proc %p, dst_proc %p, detached: %d",
729       action, action->state, action->comm.src_proc, action->comm.dst_proc, action->comm.detached);
730
731   /* destroy the surf actions associated with the Simix communication */
732   SIMIX_comm_destroy_internal_actions(action);
733
734   /* remove the communication action from the list of pending communications
735    * of both processes (if they still exist) */
736   if (action->comm.src_proc) {
737     xbt_fifo_remove(action->comm.src_proc->comms, action);
738   }
739   if (action->comm.dst_proc) {
740     xbt_fifo_remove(action->comm.dst_proc->comms, action);
741   }
742
743   /* if there are simcalls associated with the action, then answer them */
744   if (xbt_fifo_size(action->simcalls)) {
745     SIMIX_comm_finish(action);
746   }
747 }
748
749 void SIMIX_comm_cancel(smx_action_t action)
750 {
751   /* if the action is a waiting state means that it is still in a rdv */
752   /* so remove from it and delete it */
753   if (action->state == SIMIX_WAITING) {
754     SIMIX_rdv_remove(action->comm.rdv, action);
755     action->state = SIMIX_CANCELED;
756   }
757   else if (!MC_IS_ENABLED /* when running the MC there are no surf actions */
758       && (action->state == SIMIX_READY || action->state == SIMIX_RUNNING)) {
759
760     surf_workstation_model->action_cancel(action->comm.surf_comm);
761   }
762 }
763
764 void SIMIX_comm_suspend(smx_action_t action)
765 {
766   /*FIXME: shall we suspend also the timeout actions? */
767   surf_workstation_model->suspend(action->comm.surf_comm);
768 }
769
770 void SIMIX_comm_resume(smx_action_t action)
771 {
772   /*FIXME: check what happen with the timeouts */
773   surf_workstation_model->resume(action->comm.surf_comm);
774 }
775
776
777 /************* Action Getters **************/
778
779 /**
780  *  \brief get the amount remaining from the communication
781  *  \param action The communication
782  */
783 double SIMIX_comm_get_remains(smx_action_t action)
784 {
785   double remains;
786
787   if(!action){
788       return 0;
789   }
790
791   switch (action->state) {
792
793     case SIMIX_RUNNING:
794       remains = surf_workstation_model->get_remains(action->comm.surf_comm);
795       break;
796
797     case SIMIX_WAITING:
798     case SIMIX_READY:
799       remains = 0; /*FIXME: check what should be returned */
800       break;
801
802     default:
803       remains = 0; /*FIXME: is this correct? */
804       break;
805   }
806   return remains;
807 }
808
809 e_smx_state_t SIMIX_comm_get_state(smx_action_t action)
810 {
811   return action->state;
812 }
813
814 /**
815  *  \brief Return the user data associated to the sender of the communication
816  *  \param action The communication
817  *  \return the user data
818  */
819 void* SIMIX_comm_get_src_data(smx_action_t action)
820 {
821   return action->comm.src_data;
822 }
823
824 /**
825  *  \brief Return the user data associated to the receiver of the communication
826  *  \param action The communication
827  *  \return the user data
828  */
829 void* SIMIX_comm_get_dst_data(smx_action_t action)
830 {
831   return action->comm.dst_data;
832 }
833
834 smx_process_t SIMIX_comm_get_src_proc(smx_action_t action)
835 {
836   return action->comm.src_proc;
837 }
838
839 smx_process_t SIMIX_comm_get_dst_proc(smx_action_t action)
840 {
841   return action->comm.dst_proc;
842 }
843
844 #ifdef HAVE_LATENCY_BOUND_TRACKING
845 /**
846  *  \brief verify if communication is latency bounded
847  *  \param comm The communication
848  */
849 XBT_INLINE int SIMIX_comm_is_latency_bounded(smx_action_t action)
850 {
851   if(!action){
852       return 0;
853   }
854   if (action->comm.surf_comm){
855       XBT_DEBUG("Getting latency limited for surf_action (%p)", action->comm.surf_comm);
856       action->latency_limited = surf_workstation_model->get_latency_limited(action->comm.surf_comm);
857       XBT_DEBUG("Action limited is %d", action->latency_limited);
858   }
859   return action->latency_limited;
860 }
861 #endif
862
863 /******************************************************************************/
864 /*                    SIMIX_comm_copy_data callbacks                       */
865 /******************************************************************************/
866 static void (*SIMIX_comm_copy_data_callback) (smx_action_t, void*, size_t) =
867     &SIMIX_comm_copy_pointer_callback;
868
869 void
870 SIMIX_comm_set_copy_data_callback(void (*callback) (smx_action_t, void*, size_t))
871 {
872   SIMIX_comm_copy_data_callback = callback;
873 }
874
875 void SIMIX_comm_copy_pointer_callback(smx_action_t comm, void* buff, size_t buff_size)
876 {
877   xbt_assert((buff_size == sizeof(void *)),
878               "Cannot copy %zu bytes: must be sizeof(void*)", buff_size);
879   *(void **) (comm->comm.dst_buff) = buff;
880 }
881
882 void SIMIX_comm_copy_buffer_callback(smx_action_t comm, void* buff, size_t buff_size)
883 {
884   XBT_DEBUG("Copy the data over");
885   memcpy(comm->comm.dst_buff, buff, buff_size);
886 }
887
888 void smpi_comm_copy_data_callback(smx_action_t comm, void* buff, size_t buff_size)
889 {
890   XBT_DEBUG("Copy the data over");
891   memcpy(comm->comm.dst_buff, buff, buff_size);
892   if (comm->comm.detached) { // if this is a detached send, the source buffer was duplicated by SMPI sender to make the original buffer available to the application ASAP
893     xbt_free(buff);
894     comm->comm.src_buff = NULL;
895   }
896 }
897
898 /**
899  *  \brief Copy the communication data from the sender's buffer to the receiver's one
900  *  \param comm The communication
901  */
902 void SIMIX_comm_copy_data(smx_action_t comm)
903 {
904   size_t buff_size = comm->comm.src_buff_size;
905   /* If there is no data to be copy then return */
906   if (!comm->comm.src_buff || !comm->comm.dst_buff || comm->comm.copied == 1)
907     return;
908
909   XBT_DEBUG("Copying comm %p data from %s (%p) -> %s (%p) (%zu bytes)",
910          comm,
911          comm->comm.src_proc ? comm->comm.src_proc->smx_host->name : "a finished process",
912          comm->comm.src_buff,
913          comm->comm.dst_proc ? comm->comm.dst_proc->smx_host->name : "a finished process",
914          comm->comm.dst_buff, buff_size);
915
916   /* Copy at most dst_buff_size bytes of the message to receiver's buffer */
917   if (comm->comm.dst_buff_size)
918     buff_size = MIN(buff_size, *(comm->comm.dst_buff_size));
919
920   /* Update the receiver's buffer size to the copied amount */
921   if (comm->comm.dst_buff_size)
922     *comm->comm.dst_buff_size = buff_size;
923
924   if (buff_size > 0)
925     SIMIX_comm_copy_data_callback (comm, comm->comm.src_buff, buff_size);
926
927   /* Set the copied flag so we copy data only once */
928   /* (this function might be called from both communication ends) */
929   comm->comm.copied = 1;
930 }