Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
Merge branch 'hypervisor' of git+ssh://scm.gforge.inria.fr//gitroot/simgrid/simgrid...
[simgrid.git] / src / simix / smx_network.c
1 /* Copyright (c) 2009-2013. The SimGrid Team.
2  * All rights reserved.                                                     */
3
4 /* This program is free software; you can redistribute it and/or modify it
5  * under the terms of the license (GNU LGPL) which comes with this package. */
6
7 #include "smx_private.h"
8 #include "xbt/log.h"
9 #include "mc/mc.h"
10 #include "xbt/dict.h"
11
12 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(simix_network, simix,
13                                 "Logging specific to SIMIX (network)");
14
15 static xbt_dict_t rdv_points = NULL;
16 XBT_EXPORT_NO_IMPORT(unsigned long int) smx_total_comms = 0;
17
18 static void SIMIX_waitany_remove_simcall_from_actions(smx_simcall_t simcall);
19 static void SIMIX_comm_copy_data(smx_action_t comm);
20 static smx_action_t SIMIX_comm_new(e_smx_comm_type_t type);
21 static XBT_INLINE void SIMIX_rdv_push(smx_rdv_t rdv, smx_action_t comm);
22 static smx_action_t SIMIX_fifo_probe_comm(xbt_fifo_t fifo, e_smx_comm_type_t type,
23                                         int (*match_fun)(void *, void *,smx_action_t),
24                                         void *user_data, smx_action_t my_action);
25 static smx_action_t SIMIX_fifo_get_comm(xbt_fifo_t fifo, e_smx_comm_type_t type,
26                                         int (*match_fun)(void *, void *,smx_action_t),
27                                         void *user_data, smx_action_t my_action);
28 static void SIMIX_rdv_free(void *data);
29
30 void SIMIX_network_init(void)
31 {
32   rdv_points = xbt_dict_new_homogeneous(SIMIX_rdv_free);
33   if(MC_is_active())
34     MC_ignore_global_variable("smx_total_comms");
35 }
36
37 void SIMIX_network_exit(void)
38 {
39   xbt_dict_free(&rdv_points);
40 }
41
42 /******************************************************************************/
43 /*                           Rendez-Vous Points                               */
44 /******************************************************************************/
45
46 smx_rdv_t SIMIX_pre_rdv_create(smx_simcall_t simcall, const char *name){
47   return SIMIX_rdv_create(name);
48 }
49 smx_rdv_t SIMIX_rdv_create(const char *name)
50 {
51   /* two processes may have pushed the same rdv_create simcall at the same time */
52   smx_rdv_t rdv = name ? xbt_dict_get_or_null(rdv_points, name) : NULL;
53
54   if (!rdv) {
55     rdv = xbt_new0(s_smx_rvpoint_t, 1);
56     rdv->name = name ? xbt_strdup(name) : NULL;
57     rdv->comm_fifo = xbt_fifo_new();
58     rdv->done_comm_fifo = xbt_fifo_new();
59     rdv->permanent_receiver=NULL;
60
61     XBT_DEBUG("Creating a mailbox at %p with name %s\n", rdv, name);
62
63     if (rdv->name)
64       xbt_dict_set(rdv_points, rdv->name, rdv, NULL);
65   }
66   return rdv;
67 }
68
69 void SIMIX_pre_rdv_destroy(smx_simcall_t simcall, smx_rdv_t rdv){
70   return SIMIX_rdv_destroy(rdv);
71 }
72 void SIMIX_rdv_destroy(smx_rdv_t rdv)
73 {
74   if (rdv->name)
75     xbt_dict_remove(rdv_points, rdv->name);
76 }
77
78 void SIMIX_rdv_free(void *data)
79 {
80   XBT_DEBUG("rdv free %p", data);
81   smx_rdv_t rdv = (smx_rdv_t) data;
82   xbt_free(rdv->name);
83   xbt_fifo_free(rdv->comm_fifo);
84   xbt_fifo_free(rdv->done_comm_fifo);
85
86   xbt_free(rdv);  
87 }
88
89 xbt_dict_t SIMIX_get_rdv_points()
90 {
91   return rdv_points;
92 }
93
94 smx_rdv_t SIMIX_pre_rdv_get_by_name(smx_simcall_t simcall, const char *name){
95   return SIMIX_rdv_get_by_name(name);
96 }
97 smx_rdv_t SIMIX_rdv_get_by_name(const char *name)
98 {
99   return xbt_dict_get_or_null(rdv_points, name);
100 }
101
102 int SIMIX_pre_rdv_comm_count_by_host(smx_simcall_t simcall, smx_rdv_t rdv, smx_host_t host){
103   return SIMIX_rdv_comm_count_by_host(rdv, host);
104 }
105 int SIMIX_rdv_comm_count_by_host(smx_rdv_t rdv, smx_host_t host)
106 {
107   smx_action_t comm = NULL;
108   xbt_fifo_item_t item = NULL;
109   int count = 0;
110
111   xbt_fifo_foreach(rdv->comm_fifo, item, comm, smx_action_t) {
112     if (comm->comm.src_proc->smx_host == host)
113       count++;
114   }
115
116   return count;
117 }
118
119 smx_action_t SIMIX_pre_rdv_get_head(smx_simcall_t simcall, smx_rdv_t rdv){
120   return SIMIX_rdv_get_head(rdv);
121 }
122 smx_action_t SIMIX_rdv_get_head(smx_rdv_t rdv)
123 {
124   return xbt_fifo_get_item_content(xbt_fifo_get_first_item(rdv->comm_fifo));
125 }
126
127 smx_process_t SIMIX_pre_rdv_get_receiver(smx_simcall_t simcall, smx_rdv_t rdv){
128   return SIMIX_rdv_get_receiver(rdv);
129 }
130 /**
131  *  \brief get the receiver (process associated to the mailbox)
132  *  \param rdv The rendez-vous point
133  *  \return process The receiving process (NULL if not set)
134  */
135 smx_process_t SIMIX_rdv_get_receiver(smx_rdv_t rdv)
136 {
137   return rdv->permanent_receiver;
138 }
139
140 void SIMIX_pre_rdv_set_receiver(smx_simcall_t simcall, smx_rdv_t rdv,
141                             smx_process_t process){
142   SIMIX_rdv_set_receiver(rdv, process);
143 }
144 /**
145  *  \brief set the receiver of the rendez vous point to allow eager sends
146  *  \param rdv The rendez-vous point
147  *  \param process The receiving process
148  */
149 void SIMIX_rdv_set_receiver(smx_rdv_t rdv, smx_process_t process)
150 {
151   rdv->permanent_receiver=process;
152 }
153
154 /**
155  *  \brief Pushes a communication action into a rendez-vous point
156  *  \param rdv The rendez-vous point
157  *  \param comm The communication action
158  */
159 static XBT_INLINE void SIMIX_rdv_push(smx_rdv_t rdv, smx_action_t comm)
160 {
161   xbt_fifo_push(rdv->comm_fifo, comm);
162   comm->comm.rdv = rdv;
163 }
164
165 /**
166  *  \brief Removes a communication action from a rendez-vous point
167  *  \param rdv The rendez-vous point
168  *  \param comm The communication action
169  */
170 XBT_INLINE void SIMIX_rdv_remove(smx_rdv_t rdv, smx_action_t comm)
171 {
172   xbt_fifo_remove(rdv->comm_fifo, comm);
173   comm->comm.rdv = NULL;
174 }
175
176 /**
177  *  \brief Checks if there is a communication action queued in a fifo matching our needs
178  *  \param type The type of communication we are looking for (comm_send, comm_recv)
179  *  \return The communication action if found, NULL otherwise
180  */
181 smx_action_t SIMIX_fifo_get_comm(xbt_fifo_t fifo, e_smx_comm_type_t type,
182                                  int (*match_fun)(void *, void *,smx_action_t),
183                                  void *this_user_data, smx_action_t my_action)
184 {
185   smx_action_t action;
186   xbt_fifo_item_t item;
187   void* other_user_data = NULL;
188
189   xbt_fifo_foreach(fifo, item, action, smx_action_t) {
190     if (action->comm.type == SIMIX_COMM_SEND) {
191       other_user_data = action->comm.src_data;
192     } else if (action->comm.type == SIMIX_COMM_RECEIVE) {
193       other_user_data = action->comm.dst_data;
194     }
195     if (action->comm.type == type &&
196         (!match_fun              ||              match_fun(this_user_data,  other_user_data, action)) &&
197         (!action->comm.match_fun || action->comm.match_fun(other_user_data, this_user_data,  my_action))) {
198       XBT_DEBUG("Found a matching communication action %p", action);
199       xbt_fifo_remove_item(fifo, item);
200       xbt_fifo_free_item(item);
201       action->comm.refcount++;
202 #ifdef HAVE_MC
203       action->comm.rdv_cpy = action->comm.rdv;
204 #endif
205       action->comm.rdv = NULL;
206       return action;
207     }
208     XBT_DEBUG("Sorry, communication action %p does not match our needs:"
209               " its type is %d but we are looking for a comm of type %d (or maybe the filtering didn't match)",
210               action, (int)action->comm.type, (int)type);
211   }
212   XBT_DEBUG("No matching communication action found");
213   return NULL;
214 }
215
216
217 /**
218  *  \brief Checks if there is a communication action queued in a fifo matching our needs, but leave it there
219  *  \param type The type of communication we are looking for (comm_send, comm_recv)
220  *  \return The communication action if found, NULL otherwise
221  */
222 smx_action_t SIMIX_fifo_probe_comm(xbt_fifo_t fifo, e_smx_comm_type_t type,
223                                  int (*match_fun)(void *, void *,smx_action_t),
224                                  void *this_user_data, smx_action_t my_action)
225 {
226   smx_action_t action;
227   xbt_fifo_item_t item;
228   void* other_user_data = NULL;
229
230   xbt_fifo_foreach(fifo, item, action, smx_action_t) {
231     if (action->comm.type == SIMIX_COMM_SEND) {
232       other_user_data = action->comm.src_data;
233     } else if (action->comm.type == SIMIX_COMM_RECEIVE) {
234       other_user_data = action->comm.dst_data;
235     }
236     if (action->comm.type == type &&
237         (!match_fun              ||              match_fun(this_user_data,  other_user_data, action)) &&
238         (!action->comm.match_fun || action->comm.match_fun(other_user_data, this_user_data,  my_action))) {
239       XBT_DEBUG("Found a matching communication action %p", action);
240       action->comm.refcount++;
241
242       return action;
243     }
244     XBT_DEBUG("Sorry, communication action %p does not match our needs:"
245               " its type is %d but we are looking for a comm of type %d (or maybe the filtering didn't match)",
246               action, (int)action->comm.type, (int)type);
247   }
248   XBT_DEBUG("No matching communication action found");
249   return NULL;
250 }
251 /******************************************************************************/
252 /*                            Communication Actions                            */
253 /******************************************************************************/
254
255 /**
256  *  \brief Creates a new communicate action
257  *  \param type The direction of communication (comm_send, comm_recv)
258  *  \return The new communicate action
259  */
260 smx_action_t SIMIX_comm_new(e_smx_comm_type_t type)
261 {
262   smx_action_t act;
263
264   /* alloc structures */
265   act = xbt_mallocator_get(simix_global->action_mallocator);
266
267   act->type = SIMIX_ACTION_COMMUNICATE;
268   act->state = SIMIX_WAITING;
269
270   /* set communication */
271   act->comm.type = type;
272   act->comm.refcount = 1;
273   act->comm.src_data=NULL;
274   act->comm.dst_data=NULL;
275
276
277 #ifdef HAVE_LATENCY_BOUND_TRACKING
278   //initialize with unknown value
279   act->latency_limited = -1;
280 #endif
281
282 #ifdef HAVE_TRACING
283   act->category = NULL;
284 #endif
285
286   XBT_DEBUG("Create communicate action %p", act);
287   ++smx_total_comms;
288
289   return act;
290 }
291
292 void SIMIX_pre_comm_destroy(smx_simcall_t simcall, smx_action_t action){
293   SIMIX_comm_destroy(action);
294 }
295 /**
296  *  \brief Destroy a communicate action
297  *  \param action The communicate action to be destroyed
298  */
299 void SIMIX_comm_destroy(smx_action_t action)
300 {
301   XBT_DEBUG("Destroy action %p (refcount: %d), state: %d",
302             action, action->comm.refcount, (int)action->state);
303
304   if (action->comm.refcount <= 0) {
305     xbt_backtrace_display_current();
306     xbt_die("The refcount of comm %p is already 0 before decreasing it. "
307             "That's a bug! If you didn't test and/or wait the same communication twice in your code, then the bug is SimGrid's...", action);
308   }
309   action->comm.refcount--;
310   if (action->comm.refcount > 0)
311       return;
312   XBT_DEBUG("Really free communication %p; refcount is now %d", action,
313             action->comm.refcount);
314
315 #ifdef HAVE_LATENCY_BOUND_TRACKING
316   action->latency_limited = SIMIX_comm_is_latency_bounded( action ) ;
317 #endif
318
319   xbt_free(action->name);
320   SIMIX_comm_destroy_internal_actions(action);
321
322   if (action->comm.detached && action->state != SIMIX_DONE) {
323     /* the communication has failed and was detached:
324      * we have to free the buffer */
325     if (action->comm.clean_fun) {
326       action->comm.clean_fun(action->comm.src_buff);
327     }
328     action->comm.src_buff = NULL;
329   }
330
331   if(action->comm.rdv)
332     SIMIX_rdv_remove(action->comm.rdv, action);
333
334   xbt_mallocator_release(simix_global->action_mallocator, action);
335 }
336
337 void SIMIX_comm_destroy_internal_actions(smx_action_t action)
338 {
339   if (action->comm.surf_comm){
340 #ifdef HAVE_LATENCY_BOUND_TRACKING
341     action->latency_limited = SIMIX_comm_is_latency_bounded(action);
342 #endif
343     action->comm.surf_comm->model_obj->action_unref(action->comm.surf_comm);
344     action->comm.surf_comm = NULL;
345   }
346
347   if (action->comm.src_timeout){
348     action->comm.src_timeout->model_obj->action_unref(action->comm.src_timeout);
349     action->comm.src_timeout = NULL;
350   }
351
352   if (action->comm.dst_timeout){
353     action->comm.dst_timeout->model_obj->action_unref(action->comm.dst_timeout);
354     action->comm.dst_timeout = NULL;
355   }
356 }
357
358 void SIMIX_pre_comm_send(smx_simcall_t simcall, smx_rdv_t rdv,
359                                   double task_size, double rate,
360                                   void *src_buff, size_t src_buff_size,
361                                   int (*match_fun)(void *, void *,smx_action_t),
362                                   void *data, double timeout){
363   smx_action_t comm = SIMIX_comm_isend(simcall->issuer, rdv, task_size, rate,
364                                        src_buff, src_buff_size, match_fun, NULL,
365                                        data, 0);
366   simcall->mc_value = 0;
367   SIMIX_pre_comm_wait(simcall, comm, timeout);
368 }
369 smx_action_t SIMIX_pre_comm_isend(smx_simcall_t simcall, smx_rdv_t rdv,
370                                   double task_size, double rate,
371                                   void *src_buff, size_t src_buff_size,
372                                   int (*match_fun)(void *, void *,smx_action_t),
373                                   void (*clean_fun)(void *), 
374                                   void *data, int detached){
375   return SIMIX_comm_isend(simcall->issuer, rdv, task_size, rate, src_buff,
376                           src_buff_size, match_fun, clean_fun, data, detached);
377
378 }
379 smx_action_t SIMIX_comm_isend(smx_process_t src_proc, smx_rdv_t rdv,
380                               double task_size, double rate,
381                               void *src_buff, size_t src_buff_size,
382                               int (*match_fun)(void *, void *,smx_action_t),
383                               void (*clean_fun)(void *), // used to free the action in case of problem after a detached send
384                               void *data,
385                               int detached)
386 {
387   XBT_DEBUG("send from %p\n", rdv);
388
389   /* Prepare an action describing us, so that it gets passed to the user-provided filter of other side */
390   smx_action_t this_action = SIMIX_comm_new(SIMIX_COMM_SEND);
391
392   /* Look for communication action matching our needs. We also provide a description of
393    * ourself so that the other side also gets a chance of choosing if it wants to match with us.
394    *
395    * If it is not found then push our communication into the rendez-vous point */
396   smx_action_t other_action = SIMIX_fifo_get_comm(rdv->comm_fifo, SIMIX_COMM_RECEIVE, match_fun, data, this_action);
397
398   if (!other_action) {
399     other_action = this_action;
400
401     if (rdv->permanent_receiver!=NULL){
402       //this mailbox is for small messages, which have to be sent right now
403       other_action->state = SIMIX_READY;
404       other_action->comm.dst_proc=rdv->permanent_receiver;
405       other_action->comm.refcount++;
406       xbt_fifo_push(rdv->done_comm_fifo,other_action);
407       other_action->comm.rdv=rdv;
408       XBT_DEBUG("pushing a message into the permanent receive fifo %p, comm %p \n", rdv, &(other_action->comm));
409
410     }else{
411       SIMIX_rdv_push(rdv, this_action);
412     }
413   } else {
414     XBT_DEBUG("Receive already pushed\n");
415
416     SIMIX_comm_destroy(this_action);
417     --smx_total_comms; // this creation was a pure waste
418
419     other_action->state = SIMIX_READY;
420     other_action->comm.type = SIMIX_COMM_READY;
421
422   }
423   xbt_fifo_push(src_proc->comms, other_action);
424
425   /* if the communication action is detached then decrease the refcount
426    * by one, so it will be eliminated by the receiver's destroy call */
427   if (detached) {
428     other_action->comm.detached = 1;
429     other_action->comm.refcount--;
430     other_action->comm.clean_fun = clean_fun;
431   } else {
432     other_action->comm.clean_fun = NULL;
433   }
434
435   /* Setup the communication action */
436   other_action->comm.src_proc = src_proc;
437   other_action->comm.task_size = task_size;
438   other_action->comm.rate = rate;
439   other_action->comm.src_buff = src_buff;
440   other_action->comm.src_buff_size = src_buff_size;
441   other_action->comm.src_data = data;
442
443   other_action->comm.match_fun = match_fun;
444
445   if (MC_is_active()) {
446     other_action->state = SIMIX_RUNNING;
447     return other_action;
448   }
449
450   SIMIX_comm_start(other_action);
451   return (detached ? NULL : other_action);
452 }
453
454 void SIMIX_pre_comm_recv(smx_simcall_t simcall, smx_rdv_t rdv,
455                                   void *dst_buff, size_t *dst_buff_size,
456                                   int (*match_fun)(void *, void *, smx_action_t),
457                                   void *data, double timeout){
458   smx_action_t comm = SIMIX_comm_irecv(simcall->issuer, rdv, dst_buff,
459                                        dst_buff_size, match_fun, data);
460   simcall->mc_value = 0;
461   SIMIX_pre_comm_wait(simcall, comm, timeout);
462 }
463
464 void SIMIX_pre_comm_recv_bounded(smx_simcall_t simcall, smx_rdv_t rdv,
465                                   void *dst_buff, size_t *dst_buff_size,
466                                   int (*match_fun)(void *, void *, smx_action_t),
467                                   void *data, double timeout, double rate){
468   smx_action_t comm = SIMIX_comm_irecv_bounded(simcall->issuer, rdv, dst_buff,
469                                        dst_buff_size, match_fun, data, rate);
470   simcall->mc_value = 0;
471   SIMIX_pre_comm_wait(simcall, comm, timeout);
472 }
473
474 smx_action_t SIMIX_pre_comm_irecv(smx_simcall_t simcall, smx_rdv_t rdv,
475                                   void *dst_buff, size_t *dst_buff_size,
476                                   int (*match_fun)(void *, void *, smx_action_t),
477                                   void *data){
478   return SIMIX_comm_irecv(simcall->issuer, rdv, dst_buff, dst_buff_size,
479                           match_fun, data);
480 }
481
482 smx_action_t SIMIX_comm_irecv(smx_process_t dst_proc, smx_rdv_t rdv,
483                               void *dst_buff, size_t *dst_buff_size,
484                               int (*match_fun)(void *, void *, smx_action_t), void *data)
485 {
486   XBT_DEBUG("recv from %p %p\n", rdv, rdv->comm_fifo);
487   smx_action_t this_action = SIMIX_comm_new(SIMIX_COMM_RECEIVE);
488
489   smx_action_t other_action;
490   //communication already done, get it inside the fifo of completed comms
491   //permanent receive v1
492   //int already_received=0;
493   if(rdv->permanent_receiver && xbt_fifo_size(rdv->done_comm_fifo)!=0){
494
495     XBT_DEBUG("We have a comm that has probably already been received, trying to match it, to skip the communication\n");
496     //find a match in the already received fifo
497     other_action = SIMIX_fifo_get_comm(rdv->done_comm_fifo, SIMIX_COMM_SEND, match_fun, data, this_action);
498     //if not found, assume the receiver came first, register it to the mailbox in the classical way
499     if (!other_action)  {
500       XBT_DEBUG("We have messages in the permanent receive list, but not the one we are looking for, pushing request into fifo\n");
501       other_action = this_action;
502       SIMIX_rdv_push(rdv, this_action);
503     }else{
504       if(other_action->comm.surf_comm &&        SIMIX_comm_get_remains(other_action)==0.0)
505       {
506         XBT_DEBUG("comm %p has been already sent, and is finished, destroy it\n",&(other_action->comm));
507         other_action->state = SIMIX_DONE;
508         other_action->comm.type = SIMIX_COMM_DONE;
509         other_action->comm.rdv = NULL;
510         //SIMIX_comm_destroy(this_action);
511         //--smx_total_comms; // this creation was a pure waste
512         //already_received=1;
513         //other_action->comm.refcount--;
514       }/*else{
515          XBT_DEBUG("Not yet finished, we have to wait %d\n", xbt_fifo_size(rdv->comm_fifo));
516          }*/
517       other_action->comm.refcount--;
518       SIMIX_comm_destroy(this_action);
519       --smx_total_comms; // this creation was a pure waste
520     }
521   }else{
522     /* Prepare an action describing us, so that it gets passed to the user-provided filter of other side */
523
524     /* Look for communication action matching our needs. We also provide a description of
525      * ourself so that the other side also gets a chance of choosing if it wants to match with us.
526      *
527      * If it is not found then push our communication into the rendez-vous point */
528     other_action = SIMIX_fifo_get_comm(rdv->comm_fifo, SIMIX_COMM_SEND, match_fun, data, this_action);
529
530     if (!other_action) {
531       XBT_DEBUG("Receive pushed first %d\n", xbt_fifo_size(rdv->comm_fifo));
532       other_action = this_action;
533       SIMIX_rdv_push(rdv, this_action);
534     } else {
535       SIMIX_comm_destroy(this_action);
536       --smx_total_comms; // this creation was a pure waste
537       other_action->state = SIMIX_READY;
538       other_action->comm.type = SIMIX_COMM_READY;
539       //other_action->comm.refcount--;
540     }
541     xbt_fifo_push(dst_proc->comms, other_action);
542   }
543
544   /* Setup communication action */
545   other_action->comm.dst_proc = dst_proc;
546   other_action->comm.dst_buff = dst_buff;
547   other_action->comm.dst_buff_size = dst_buff_size;
548   other_action->comm.dst_data = data;
549
550   other_action->comm.match_fun = match_fun;
551
552
553   /*if(already_received)//do the actual copy, because the first one after the comm didn't have all the info
554     SIMIX_comm_copy_data(other_action);*/
555
556
557   if (MC_is_active()) {
558     other_action->state = SIMIX_RUNNING;
559     return other_action;
560   }
561
562   SIMIX_comm_start(other_action);
563   // }
564   return other_action;
565 }
566
567 smx_action_t SIMIX_pre_comm_irecv_bounded(smx_simcall_t simcall, smx_rdv_t rdv,
568                                   void *dst_buff, size_t *dst_buff_size,
569                                   int (*match_fun)(void *, void *, smx_action_t),
570                                   void *data, double rate){
571   return SIMIX_comm_irecv_bounded(simcall->issuer, rdv, dst_buff, dst_buff_size,
572                           match_fun, data, rate);
573 }
574
575 smx_action_t SIMIX_comm_irecv_bounded(smx_process_t dst_proc, smx_rdv_t rdv,
576                               void *dst_buff, size_t *dst_buff_size,
577                               int (*match_fun)(void *, void *, smx_action_t), void *data, double rate)
578 {
579   XBT_DEBUG("recv from %p %p\n", rdv, rdv->comm_fifo);
580   smx_action_t this_action = SIMIX_comm_new(SIMIX_COMM_RECEIVE);
581
582   smx_action_t other_action;
583   //communication already done, get it inside the fifo of completed comms
584   //permanent receive v1
585   //int already_received=0;
586   if(rdv->permanent_receiver && xbt_fifo_size(rdv->done_comm_fifo)!=0){
587
588     XBT_DEBUG("We have a comm that has probably already been received, trying to match it, to skip the communication\n");
589     //find a match in the already received fifo
590     other_action = SIMIX_fifo_get_comm(rdv->done_comm_fifo, SIMIX_COMM_SEND, match_fun, data, this_action);
591     //if not found, assume the receiver came first, register it to the mailbox in the classical way
592     if (!other_action)  {
593       XBT_DEBUG("We have messages in the permanent receive list, but not the one we are looking for, pushing request into fifo\n");
594       other_action = this_action;
595       SIMIX_rdv_push(rdv, this_action);
596     }else{
597       if(other_action->comm.surf_comm &&        SIMIX_comm_get_remains(other_action)==0.0)
598       {
599         XBT_DEBUG("comm %p has been already sent, and is finished, destroy it\n",&(other_action->comm));
600         other_action->state = SIMIX_DONE;
601         other_action->comm.type = SIMIX_COMM_DONE;
602         other_action->comm.rdv = NULL;
603         //SIMIX_comm_destroy(this_action);
604         //--smx_total_comms; // this creation was a pure waste
605         //already_received=1;
606         //other_action->comm.refcount--;
607       }/*else{
608          XBT_DEBUG("Not yet finished, we have to wait %d\n", xbt_fifo_size(rdv->comm_fifo));
609          }*/
610       other_action->comm.refcount--;
611       SIMIX_comm_destroy(this_action);
612       --smx_total_comms; // this creation was a pure waste
613     }
614   }else{
615     /* Prepare an action describing us, so that it gets passed to the user-provided filter of other side */
616
617     /* Look for communication action matching our needs. We also provide a description of
618      * ourself so that the other side also gets a chance of choosing if it wants to match with us.
619      *
620      * If it is not found then push our communication into the rendez-vous point */
621     other_action = SIMIX_fifo_get_comm(rdv->comm_fifo, SIMIX_COMM_SEND, match_fun, data, this_action);
622
623     if (!other_action) {
624       XBT_DEBUG("Receive pushed first %d\n", xbt_fifo_size(rdv->comm_fifo));
625       other_action = this_action;
626       SIMIX_rdv_push(rdv, this_action);
627     } else {
628       SIMIX_comm_destroy(this_action);
629       --smx_total_comms; // this creation was a pure waste
630       other_action->state = SIMIX_READY;
631       other_action->comm.type = SIMIX_COMM_READY;
632       //other_action->comm.refcount--;
633     }
634     xbt_fifo_push(dst_proc->comms, other_action);
635   }
636
637   /* Setup communication action */
638   other_action->comm.dst_proc = dst_proc;
639   other_action->comm.dst_buff = dst_buff;
640   other_action->comm.dst_buff_size = dst_buff_size;
641   other_action->comm.dst_data = data;
642
643   if (rate < other_action->comm.rate || other_action->comm.rate == -1.0)
644           other_action->comm.rate = rate;
645
646   other_action->comm.match_fun = match_fun;
647
648
649   /*if(already_received)//do the actual copy, because the first one after the comm didn't have all the info
650     SIMIX_comm_copy_data(other_action);*/
651
652
653   if (MC_is_active()) {
654     other_action->state = SIMIX_RUNNING;
655     return other_action;
656   }
657
658   SIMIX_comm_start(other_action);
659   // }
660   return other_action;
661 }
662
663 smx_action_t SIMIX_pre_comm_iprobe(smx_simcall_t simcall, smx_rdv_t rdv,
664                                    int src, int tag,
665                                    int (*match_fun)(void *, void *, smx_action_t),
666                                    void *data){
667   return SIMIX_comm_iprobe(simcall->issuer, rdv, src, tag, match_fun, data);
668 }
669
670 smx_action_t SIMIX_comm_iprobe(smx_process_t dst_proc, smx_rdv_t rdv, int src,
671                               int tag, int (*match_fun)(void *, void *, smx_action_t), void *data)
672 {
673   XBT_DEBUG("iprobe from %p %p\n", rdv, rdv->comm_fifo);
674   smx_action_t this_action = SIMIX_comm_new(SIMIX_COMM_RECEIVE);
675
676   smx_action_t other_action=NULL;
677   if(rdv->permanent_receiver && xbt_fifo_size(rdv->done_comm_fifo)!=0){
678     //find a match in the already received fifo
679       XBT_DEBUG("first try in the perm recv mailbox \n");
680
681     other_action = SIMIX_fifo_probe_comm(rdv->done_comm_fifo, SIMIX_COMM_SEND, match_fun, data, this_action);
682   }
683  // }else{
684     if(!other_action){
685         XBT_DEBUG("second try in the other mailbox");
686         other_action = SIMIX_fifo_probe_comm(rdv->comm_fifo, SIMIX_COMM_SEND, match_fun, data, this_action);
687     }
688 //  }
689   if(other_action)other_action->comm.refcount--;
690
691   SIMIX_comm_destroy(this_action);
692   --smx_total_comms;
693   return other_action;
694 }
695
696 void SIMIX_pre_comm_wait(smx_simcall_t simcall, smx_action_t action, double timeout)
697 {
698   /* the simcall may be a wait, a send or a recv */
699   surf_action_t sleep;
700
701   /* Associate this simcall to the wait action */
702   XBT_DEBUG("SIMIX_pre_comm_wait, %p", action);
703
704   xbt_fifo_push(action->simcalls, simcall);
705   simcall->issuer->waiting_action = action;
706
707   if (MC_is_active()) {
708     int idx = simcall->mc_value;
709     if (idx == 0) {
710       action->state = SIMIX_DONE;
711     } else {
712       /* If we reached this point, the wait simcall must have a timeout */
713       /* Otherwise it shouldn't be enabled and executed by the MC */
714       if (timeout == -1)
715         THROW_IMPOSSIBLE;
716
717       if (action->comm.src_proc == simcall->issuer)
718         action->state = SIMIX_SRC_TIMEOUT;
719       else
720         action->state = SIMIX_DST_TIMEOUT;
721     }
722
723     SIMIX_comm_finish(action);
724     return;
725   }
726
727   /* If the action has already finish perform the error handling, */
728   /* otherwise set up a waiting timeout on the right side         */
729   if (action->state != SIMIX_WAITING && action->state != SIMIX_RUNNING) {
730     SIMIX_comm_finish(action);
731   } else { /* if (timeout >= 0) { we need a surf sleep action even when there is no timeout, otherwise surf won't tell us when the host fails */
732     sleep = surf_workstation_model->extension.workstation.sleep(simcall->issuer->smx_host, timeout);
733     surf_workstation_model->action_data_set(sleep, action);
734
735     if (simcall->issuer == action->comm.src_proc)
736       action->comm.src_timeout = sleep;
737     else
738       action->comm.dst_timeout = sleep;
739   }
740 }
741
742 void SIMIX_pre_comm_test(smx_simcall_t simcall, smx_action_t action)
743 {
744   if(MC_is_active()){
745     simcall_comm_test__set__result(simcall, action->comm.src_proc && action->comm.dst_proc);
746     if(simcall_comm_test__get__result(simcall)){
747       action->state = SIMIX_DONE;
748       xbt_fifo_push(action->simcalls, simcall);
749       SIMIX_comm_finish(action);
750     }else{
751       SIMIX_simcall_answer(simcall);
752     }
753     return;
754   }
755
756   simcall_comm_test__set__result(simcall, (action->state != SIMIX_WAITING && action->state != SIMIX_RUNNING));
757   if (simcall_comm_test__get__result(simcall)) {
758     xbt_fifo_push(action->simcalls, simcall);
759     SIMIX_comm_finish(action);
760   } else {
761     SIMIX_simcall_answer(simcall);
762   }
763 }
764
765 void SIMIX_pre_comm_testany(smx_simcall_t simcall, xbt_dynar_t actions)
766 {
767   unsigned int cursor;
768   smx_action_t action;
769   simcall_comm_testany__set__result(simcall, -1);
770
771   if (MC_is_active()){
772     int idx = simcall->mc_value;
773     if(idx == -1){
774       SIMIX_simcall_answer(simcall);
775     }else{
776       action = xbt_dynar_get_as(actions, idx, smx_action_t);
777       simcall_comm_testany__set__result(simcall, idx);
778       xbt_fifo_push(action->simcalls, simcall);
779       action->state = SIMIX_DONE;
780       SIMIX_comm_finish(action);
781     }
782     return;
783   }
784
785   xbt_dynar_foreach(simcall_comm_testany__get__comms(simcall), cursor,action) {
786     if (action->state != SIMIX_WAITING && action->state != SIMIX_RUNNING) {
787       simcall_comm_testany__set__result(simcall, cursor);
788       xbt_fifo_push(action->simcalls, simcall);
789       SIMIX_comm_finish(action);
790       return;
791     }
792   }
793   SIMIX_simcall_answer(simcall);
794 }
795
796 void SIMIX_pre_comm_waitany(smx_simcall_t simcall, xbt_dynar_t actions)
797 {
798   smx_action_t action;
799   unsigned int cursor = 0;
800
801   if (MC_is_active()){
802     int idx = simcall->mc_value;
803     action = xbt_dynar_get_as(actions, idx, smx_action_t);
804     xbt_fifo_push(action->simcalls, simcall);
805     simcall_comm_waitany__set__result(simcall, idx);
806     action->state = SIMIX_DONE;
807     SIMIX_comm_finish(action);
808     return;
809   }
810
811   xbt_dynar_foreach(actions, cursor, action){
812     /* associate this simcall to the the action */
813     xbt_fifo_push(action->simcalls, simcall);
814
815     /* see if the action is already finished */
816     if (action->state != SIMIX_WAITING && action->state != SIMIX_RUNNING){
817       SIMIX_comm_finish(action);
818       break;
819     }
820   }
821 }
822
823 void SIMIX_waitany_remove_simcall_from_actions(smx_simcall_t simcall)
824 {
825   smx_action_t action;
826   unsigned int cursor = 0;
827   xbt_dynar_t actions = simcall_comm_waitany__get__comms(simcall);
828
829   xbt_dynar_foreach(actions, cursor, action) {
830     xbt_fifo_remove(action->simcalls, simcall);
831   }
832 }
833
834 /**
835  *  \brief Starts the simulation of a communication action.
836  *  \param action the communication action
837  */
838 XBT_INLINE void SIMIX_comm_start(smx_action_t action)
839 {
840   /* If both the sender and the receiver are already there, start the communication */
841   if (action->state == SIMIX_READY) {
842
843     smx_host_t sender = action->comm.src_proc->smx_host;
844     smx_host_t receiver = action->comm.dst_proc->smx_host;
845
846     XBT_DEBUG("Starting communication %p from '%s' to '%s'", action,
847               SIMIX_host_get_name(sender), SIMIX_host_get_name(receiver));
848
849     action->comm.surf_comm = surf_workstation_model->extension.workstation.
850       communicate(sender, receiver, action->comm.task_size, action->comm.rate);
851
852     surf_workstation_model->action_data_set(action->comm.surf_comm, action);
853
854     action->state = SIMIX_RUNNING;
855
856     /* If a link is failed, detect it immediately */
857     if (surf_workstation_model->action_state_get(action->comm.surf_comm) == SURF_ACTION_FAILED) {
858       XBT_DEBUG("Communication from '%s' to '%s' failed to start because of a link failure",
859                 SIMIX_host_get_name(sender), SIMIX_host_get_name(receiver));
860       action->state = SIMIX_LINK_FAILURE;
861       SIMIX_comm_destroy_internal_actions(action);
862     }
863
864     /* If any of the process is suspend, create the action but stop its execution,
865        it will be restarted when the sender process resume */
866     if (SIMIX_process_is_suspended(action->comm.src_proc) ||
867         SIMIX_process_is_suspended(action->comm.dst_proc)) {
868       /* FIXME: check what should happen with the action state */
869
870       if (SIMIX_process_is_suspended(action->comm.src_proc))
871         XBT_DEBUG("The communication is suspended on startup because src (%s:%s) were suspended since it initiated the communication",
872                   SIMIX_host_get_name(action->comm.src_proc->smx_host), action->comm.src_proc->name);
873       else
874         XBT_DEBUG("The communication is suspended on startup because dst (%s:%s) were suspended since it initiated the communication",
875                   SIMIX_host_get_name(action->comm.dst_proc->smx_host), action->comm.dst_proc->name);
876
877       surf_workstation_model->suspend(action->comm.surf_comm);
878
879     }
880   }
881 }
882
883 /**
884  * \brief Answers the SIMIX simcalls associated to a communication action.
885  * \param action a finished communication action
886  */
887 void SIMIX_comm_finish(smx_action_t action)
888 {
889   unsigned int destroy_count = 0;
890   smx_simcall_t simcall;
891
892   while ((simcall = xbt_fifo_shift(action->simcalls))) {
893
894     /* If a waitany simcall is waiting for this action to finish, then remove
895        it from the other actions in the waitany list. Afterwards, get the
896        position of the actual action in the waitany dynar and
897        return it as the result of the simcall */
898     if (simcall->call == SIMCALL_COMM_WAITANY) {
899       SIMIX_waitany_remove_simcall_from_actions(simcall);
900       if (!MC_is_active())
901         simcall_comm_waitany__set__result(simcall, xbt_dynar_search(simcall_comm_waitany__get__comms(simcall), &action));
902     }
903
904     /* If the action is still in a rendez-vous point then remove from it */
905     if (action->comm.rdv)
906       SIMIX_rdv_remove(action->comm.rdv, action);
907
908     XBT_DEBUG("SIMIX_comm_finish: action state = %d", (int)action->state);
909
910     /* Check out for errors */
911     switch (action->state) {
912
913     case SIMIX_DONE:
914       XBT_DEBUG("Communication %p complete!", action);
915       SIMIX_comm_copy_data(action);
916       break;
917
918     case SIMIX_SRC_TIMEOUT:
919       SMX_EXCEPTION(simcall->issuer, timeout_error, 0,
920                     "Communication timeouted because of sender");
921       break;
922
923     case SIMIX_DST_TIMEOUT:
924       SMX_EXCEPTION(simcall->issuer, timeout_error, 0,
925                     "Communication timeouted because of receiver");
926       break;
927
928     case SIMIX_SRC_HOST_FAILURE:
929       if (simcall->issuer == action->comm.src_proc)
930         simcall->issuer->context->iwannadie = 1;
931 //          SMX_EXCEPTION(simcall->issuer, host_error, 0, "Host failed");
932       else
933         SMX_EXCEPTION(simcall->issuer, network_error, 0, "Remote peer failed");
934       break;
935
936     case SIMIX_DST_HOST_FAILURE:
937       if (simcall->issuer == action->comm.dst_proc)
938         simcall->issuer->context->iwannadie = 1;
939 //          SMX_EXCEPTION(simcall->issuer, host_error, 0, "Host failed");
940       else
941         SMX_EXCEPTION(simcall->issuer, network_error, 0, "Remote peer failed");
942       break;
943
944     case SIMIX_LINK_FAILURE:
945       XBT_DEBUG("Link failure in action %p between '%s' and '%s': posting an exception to the issuer: %s (%p) detached:%d",
946                 action,
947                 action->comm.src_proc ? sg_host_name(action->comm.src_proc->smx_host) : NULL,
948                 action->comm.dst_proc ? sg_host_name(action->comm.dst_proc->smx_host) : NULL,
949                 simcall->issuer->name, simcall->issuer, action->comm.detached);
950       if (action->comm.src_proc == simcall->issuer) {
951         XBT_DEBUG("I'm source");
952       } else if (action->comm.dst_proc == simcall->issuer) {
953         XBT_DEBUG("I'm dest");
954       } else {
955         XBT_DEBUG("I'm neither source nor dest");
956       }
957       SMX_EXCEPTION(simcall->issuer, network_error, 0, "Link failure");
958       break;
959
960     case SIMIX_CANCELED:
961       if (simcall->issuer == action->comm.dst_proc)
962         SMX_EXCEPTION(simcall->issuer, cancel_error, 0,
963                       "Communication canceled by the sender");
964       else
965         SMX_EXCEPTION(simcall->issuer, cancel_error, 0,
966                       "Communication canceled by the receiver");
967       break;
968
969     default:
970       xbt_die("Unexpected action state in SIMIX_comm_finish: %d", (int)action->state);
971     }
972
973     /* if there is an exception during a waitany or a testany, indicate the position of the failed communication */
974     if (simcall->issuer->doexception) {
975       if (simcall->call == SIMCALL_COMM_WAITANY) {
976         simcall->issuer->running_ctx->exception.value = xbt_dynar_search(simcall_comm_waitany__get__comms(simcall), &action);
977       }
978       else if (simcall->call == SIMCALL_COMM_TESTANY) {
979         simcall->issuer->running_ctx->exception.value = xbt_dynar_search(simcall_comm_testany__get__comms(simcall), &action);
980       }
981     }
982
983     if (surf_workstation_model->extension.
984         workstation.get_state(simcall->issuer->smx_host) != SURF_RESOURCE_ON) {
985       simcall->issuer->context->iwannadie = 1;
986     }
987
988     simcall->issuer->waiting_action = NULL;
989     xbt_fifo_remove(simcall->issuer->comms, action);
990     if(action->comm.detached){
991       if(simcall->issuer == action->comm.src_proc){
992         if(action->comm.dst_proc)
993           xbt_fifo_remove(action->comm.dst_proc->comms, action);
994       }
995       if(simcall->issuer == action->comm.dst_proc){
996         if(action->comm.src_proc)
997           xbt_fifo_remove(action->comm.src_proc->comms, action);
998       }
999     }
1000     SIMIX_simcall_answer(simcall);
1001     destroy_count++;
1002   }
1003
1004   while (destroy_count-- > 0)
1005     SIMIX_comm_destroy(action);
1006 }
1007
1008 /**
1009  * \brief This function is called when a Surf communication action is finished.
1010  * \param action the corresponding Simix communication
1011  */
1012 void SIMIX_post_comm(smx_action_t action)
1013 {
1014   /* Update action state */
1015   if (action->comm.src_timeout &&
1016       surf_workstation_model->action_state_get(action->comm.src_timeout) == SURF_ACTION_DONE)
1017     action->state = SIMIX_SRC_TIMEOUT;
1018   else if (action->comm.dst_timeout &&
1019            surf_workstation_model->action_state_get(action->comm.dst_timeout) == SURF_ACTION_DONE)
1020     action->state = SIMIX_DST_TIMEOUT;
1021   else if (action->comm.src_timeout &&
1022            surf_workstation_model->action_state_get(action->comm.src_timeout) == SURF_ACTION_FAILED)
1023     action->state = SIMIX_SRC_HOST_FAILURE;
1024   else if (action->comm.dst_timeout &&
1025            surf_workstation_model->action_state_get(action->comm.dst_timeout) == SURF_ACTION_FAILED)
1026     action->state = SIMIX_DST_HOST_FAILURE;
1027   else if (action->comm.surf_comm &&
1028            surf_workstation_model->action_state_get(action->comm.surf_comm) == SURF_ACTION_FAILED) {
1029     XBT_DEBUG("Puta madre. Surf says that the link broke");
1030     action->state = SIMIX_LINK_FAILURE;
1031   } else
1032     action->state = SIMIX_DONE;
1033
1034   XBT_DEBUG("SIMIX_post_comm: comm %p, state %d, src_proc %p, dst_proc %p, detached: %d",
1035             action, (int)action->state, action->comm.src_proc, action->comm.dst_proc, action->comm.detached);
1036
1037   /* destroy the surf actions associated with the Simix communication */
1038   SIMIX_comm_destroy_internal_actions(action);
1039
1040   /* remove the communication action from the list of pending communications
1041    * of both processes (if they still exist) */
1042   if (action->comm.src_proc) {
1043     xbt_fifo_remove(action->comm.src_proc->comms, action);
1044   }
1045   if (action->comm.dst_proc) {
1046     xbt_fifo_remove(action->comm.dst_proc->comms, action);
1047   }
1048
1049   /* if there are simcalls associated with the action, then answer them */
1050   if (xbt_fifo_size(action->simcalls)) {
1051     SIMIX_comm_finish(action);
1052   }
1053 }
1054
1055 void SIMIX_pre_comm_cancel(smx_simcall_t simcall, smx_action_t action){
1056   SIMIX_comm_cancel(action);
1057 }
1058 void SIMIX_comm_cancel(smx_action_t action)
1059 {
1060   /* if the action is a waiting state means that it is still in a rdv */
1061   /* so remove from it and delete it */
1062   if (action->state == SIMIX_WAITING) {
1063     SIMIX_rdv_remove(action->comm.rdv, action);
1064     action->state = SIMIX_CANCELED;
1065   }
1066   else if (!MC_is_active() /* when running the MC there are no surf actions */
1067            && (action->state == SIMIX_READY || action->state == SIMIX_RUNNING)) {
1068
1069     surf_workstation_model->action_cancel(action->comm.surf_comm);
1070   }
1071 }
1072
1073 void SIMIX_comm_suspend(smx_action_t action)
1074 {
1075   /*FIXME: shall we suspend also the timeout actions? */
1076   if (action->comm.surf_comm)
1077     surf_workstation_model->suspend(action->comm.surf_comm);
1078   /* in the other case, the action will be suspended on creation, in SIMIX_comm_start() */
1079 }
1080
1081 void SIMIX_comm_resume(smx_action_t action)
1082 {
1083   /*FIXME: check what happen with the timeouts */
1084   if (action->comm.surf_comm)
1085     surf_workstation_model->resume(action->comm.surf_comm);
1086   /* in the other case, the action were not really suspended yet, see SIMIX_comm_suspend() and SIMIX_comm_start() */
1087 }
1088
1089
1090 /************* Action Getters **************/
1091
1092 double SIMIX_pre_comm_get_remains(smx_simcall_t simcall, smx_action_t action){
1093   return SIMIX_comm_get_remains(action);
1094 }
1095 /**
1096  *  \brief get the amount remaining from the communication
1097  *  \param action The communication
1098  */
1099 double SIMIX_comm_get_remains(smx_action_t action)
1100 {
1101   double remains;
1102
1103   if(!action){
1104     return 0;
1105   }
1106
1107   switch (action->state) {
1108
1109   case SIMIX_RUNNING:
1110     remains = surf_workstation_model->get_remains(action->comm.surf_comm);
1111     break;
1112
1113   case SIMIX_WAITING:
1114   case SIMIX_READY:
1115     remains = 0; /*FIXME: check what should be returned */
1116     break;
1117
1118   default:
1119     remains = 0; /*FIXME: is this correct? */
1120     break;
1121   }
1122   return remains;
1123 }
1124
1125 e_smx_state_t SIMIX_pre_comm_get_state(smx_simcall_t simcall, smx_action_t action){
1126   return SIMIX_comm_get_state(action);
1127 }
1128 e_smx_state_t SIMIX_comm_get_state(smx_action_t action)
1129 {
1130   return action->state;
1131 }
1132
1133 void* SIMIX_pre_comm_get_src_data(smx_simcall_t simcall, smx_action_t action){
1134   return SIMIX_comm_get_src_data(action);
1135 }
1136 /**
1137  *  \brief Return the user data associated to the sender of the communication
1138  *  \param action The communication
1139  *  \return the user data
1140  */
1141 void* SIMIX_comm_get_src_data(smx_action_t action)
1142 {
1143   return action->comm.src_data;
1144 }
1145
1146 void* SIMIX_pre_comm_get_dst_data(smx_simcall_t simcall, smx_action_t action){
1147   return SIMIX_comm_get_dst_data(action);
1148 }
1149 /**
1150  *  \brief Return the user data associated to the receiver of the communication
1151  *  \param action The communication
1152  *  \return the user data
1153  */
1154 void* SIMIX_comm_get_dst_data(smx_action_t action)
1155 {
1156   return action->comm.dst_data;
1157 }
1158
1159 smx_process_t SIMIX_pre_comm_get_src_proc(smx_simcall_t simcall, smx_action_t action){
1160   return SIMIX_comm_get_src_proc(action);
1161 }
1162 smx_process_t SIMIX_comm_get_src_proc(smx_action_t action)
1163 {
1164   return action->comm.src_proc;
1165 }
1166
1167 smx_process_t SIMIX_pre_comm_get_dst_proc(smx_simcall_t simcall, smx_action_t action){
1168   return SIMIX_comm_get_dst_proc(action);
1169 }
1170 smx_process_t SIMIX_comm_get_dst_proc(smx_action_t action)
1171 {
1172   return action->comm.dst_proc;
1173 }
1174
1175 #ifdef HAVE_LATENCY_BOUND_TRACKING
1176 int SIMIX_pre_comm_is_latency_bounded(smx_simcall_t simcall, smx_action_t action)
1177 {
1178   return SIMIX_comm_is_latency_bounded(action);
1179 }
1180
1181 /**
1182  *  \brief verify if communication is latency bounded
1183  *  \param comm The communication
1184  */
1185 XBT_INLINE int SIMIX_comm_is_latency_bounded(smx_action_t action)
1186 {
1187   if(!action){
1188     return 0;
1189   }
1190   if (action->comm.surf_comm){
1191     XBT_DEBUG("Getting latency limited for surf_action (%p)", action->comm.surf_comm);
1192     action->latency_limited = surf_workstation_model->get_latency_limited(action->comm.surf_comm);
1193     XBT_DEBUG("Action limited is %d", action->latency_limited);
1194   }
1195   return action->latency_limited;
1196 }
1197 #endif
1198
1199 /******************************************************************************/
1200 /*                    SIMIX_comm_copy_data callbacks                       */
1201 /******************************************************************************/
1202 static void (*SIMIX_comm_copy_data_callback) (smx_action_t, void*, size_t) =
1203   &SIMIX_comm_copy_pointer_callback;
1204
1205 void
1206 SIMIX_comm_set_copy_data_callback(void (*callback) (smx_action_t, void*, size_t))
1207 {
1208   SIMIX_comm_copy_data_callback = callback;
1209 }
1210
1211 void SIMIX_comm_copy_pointer_callback(smx_action_t comm, void* buff, size_t buff_size)
1212 {
1213   xbt_assert((buff_size == sizeof(void *)),
1214              "Cannot copy %zu bytes: must be sizeof(void*)", buff_size);
1215   *(void **) (comm->comm.dst_buff) = buff;
1216 }
1217
1218 void SIMIX_comm_copy_buffer_callback(smx_action_t comm, void* buff, size_t buff_size)
1219 {
1220   XBT_DEBUG("Copy the data over");
1221   memcpy(comm->comm.dst_buff, buff, buff_size);
1222   if (comm->comm.detached) { // if this is a detached send, the source buffer was duplicated by SMPI sender to make the original buffer available to the application ASAP
1223     xbt_free(buff);
1224     comm->comm.src_buff = NULL;
1225   }
1226 }
1227
1228
1229 /**
1230  *  \brief Copy the communication data from the sender's buffer to the receiver's one
1231  *  \param comm The communication
1232  */
1233 void SIMIX_comm_copy_data(smx_action_t comm)
1234 {
1235   size_t buff_size = comm->comm.src_buff_size;
1236   /* If there is no data to be copy then return */
1237   if (!comm->comm.src_buff || !comm->comm.dst_buff || comm->comm.copied)
1238     return;
1239
1240   XBT_DEBUG("Copying comm %p data from %s (%p) -> %s (%p) (%zu bytes)",
1241             comm,
1242             comm->comm.src_proc ? sg_host_name(comm->comm.src_proc->smx_host) : "a finished process",
1243             comm->comm.src_buff,
1244             comm->comm.dst_proc ? sg_host_name(comm->comm.dst_proc->smx_host) : "a finished process",
1245             comm->comm.dst_buff, buff_size);
1246
1247   /* Copy at most dst_buff_size bytes of the message to receiver's buffer */
1248   if (comm->comm.dst_buff_size)
1249     buff_size = MIN(buff_size, *(comm->comm.dst_buff_size));
1250
1251   /* Update the receiver's buffer size to the copied amount */
1252   if (comm->comm.dst_buff_size)
1253     *comm->comm.dst_buff_size = buff_size;
1254
1255   if (buff_size > 0)
1256     SIMIX_comm_copy_data_callback (comm, comm->comm.src_buff, buff_size);
1257
1258   /* Set the copied flag so we copy data only once */
1259   /* (this function might be called from both communication ends) */
1260   comm->comm.copied = 1;
1261 }