Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
53133ae2979e25c6dadbc6d45c6bed0441fbbd9a
[simgrid.git] / src / simix / smx_network.c
1 /* Copyright (c) 2009, 2010. The SimGrid Team.
2  * All rights reserved.                                                     */
3
4 /* This program is free software; you can redistribute it and/or modify it
5  * under the terms of the license (GNU LGPL) which comes with this package. */
6
7 #include "smx_private.h"
8 #include "xbt/log.h"
9 #include "mc/mc.h"
10 #include "xbt/dict.h"
11
12 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(simix_network, simix,
13                                 "Logging specific to SIMIX (network)");
14
15 static xbt_dict_t rdv_points = NULL;
16 XBT_EXPORT_NO_IMPORT(unsigned long int) smx_total_comms = 0;
17
18 static void SIMIX_waitany_remove_simcall_from_actions(smx_simcall_t simcall);
19 static void SIMIX_comm_copy_data(smx_action_t comm);
20 static smx_action_t SIMIX_comm_new(e_smx_comm_type_t type);
21 static XBT_INLINE void SIMIX_rdv_push(smx_rdv_t rdv, smx_action_t comm);
22 static smx_action_t SIMIX_fifo_probe_comm(xbt_fifo_t fifo, e_smx_comm_type_t type,
23                                         int (*match_fun)(void *, void *,smx_action_t),
24                                         void *user_data, smx_action_t my_action);
25 static smx_action_t SIMIX_fifo_get_comm(xbt_fifo_t fifo, e_smx_comm_type_t type,
26                                         int (*match_fun)(void *, void *,smx_action_t),
27                                         void *user_data, smx_action_t my_action);
28 static void SIMIX_rdv_free(void *data);
29
30 void SIMIX_network_init(void)
31 {
32   rdv_points = xbt_dict_new_homogeneous(SIMIX_rdv_free);
33   if(MC_is_active())
34     MC_ignore_data_bss(&smx_total_comms, sizeof(smx_total_comms));
35 }
36
37 void SIMIX_network_exit(void)
38 {
39   xbt_dict_free(&rdv_points);
40 }
41
42 /******************************************************************************/
43 /*                           Rendez-Vous Points                               */
44 /******************************************************************************/
45
46 smx_rdv_t SIMIX_pre_rdv_create(smx_simcall_t simcall, const char *name){
47   return SIMIX_rdv_create(name);
48 }
49 smx_rdv_t SIMIX_rdv_create(const char *name)
50 {
51   /* two processes may have pushed the same rdv_create simcall at the same time */
52   smx_rdv_t rdv = name ? xbt_dict_get_or_null(rdv_points, name) : NULL;
53
54   if (!rdv) {
55     rdv = xbt_new0(s_smx_rvpoint_t, 1);
56     rdv->name = name ? xbt_strdup(name) : NULL;
57     rdv->comm_fifo = xbt_fifo_new();
58     rdv->done_comm_fifo = xbt_fifo_new();
59     rdv->permanent_receiver=NULL;
60
61     XBT_DEBUG("Creating a mailbox at %p with name %s\n", rdv, name);
62
63     if (rdv->name)
64       xbt_dict_set(rdv_points, rdv->name, rdv, NULL);
65   }
66   return rdv;
67 }
68
69 void SIMIX_pre_rdv_destroy(smx_simcall_t simcall, smx_rdv_t rdv){
70   return SIMIX_rdv_destroy(rdv);
71 }
72 void SIMIX_rdv_destroy(smx_rdv_t rdv)
73 {
74   if (rdv->name)
75     xbt_dict_remove(rdv_points, rdv->name);
76 }
77
78 void SIMIX_rdv_free(void *data)
79 {
80   XBT_DEBUG("rdv free %p", data);
81   smx_rdv_t rdv = (smx_rdv_t) data;
82   xbt_free(rdv->name);
83   xbt_fifo_free(rdv->comm_fifo);
84   xbt_fifo_free(rdv->done_comm_fifo);
85
86   xbt_free(rdv);  
87 }
88
89 xbt_dict_t SIMIX_get_rdv_points()
90 {
91   return rdv_points;
92 }
93
94 smx_rdv_t SIMIX_pre_rdv_get_by_name(smx_simcall_t simcall, const char *name){
95   return SIMIX_rdv_get_by_name(name);
96 }
97 smx_rdv_t SIMIX_rdv_get_by_name(const char *name)
98 {
99   return xbt_dict_get_or_null(rdv_points, name);
100 }
101
102 int SIMIX_pre_rdv_comm_count_by_host(smx_simcall_t simcall, smx_rdv_t rdv, smx_host_t host){
103   return SIMIX_rdv_comm_count_by_host(rdv, host);
104 }
105 int SIMIX_rdv_comm_count_by_host(smx_rdv_t rdv, smx_host_t host)
106 {
107   smx_action_t comm = NULL;
108   xbt_fifo_item_t item = NULL;
109   int count = 0;
110
111   xbt_fifo_foreach(rdv->comm_fifo, item, comm, smx_action_t) {
112     if (comm->comm.src_proc->smx_host == host)
113       count++;
114   }
115
116   return count;
117 }
118
119 smx_action_t SIMIX_pre_rdv_get_head(smx_simcall_t simcall, smx_rdv_t rdv){
120   return SIMIX_rdv_get_head(rdv);
121 }
122 smx_action_t SIMIX_rdv_get_head(smx_rdv_t rdv)
123 {
124   return xbt_fifo_get_item_content(xbt_fifo_get_first_item(rdv->comm_fifo));
125 }
126
127 smx_process_t SIMIX_pre_rdv_get_receiver(smx_simcall_t simcall, smx_rdv_t rdv){
128   return SIMIX_rdv_get_receiver(rdv);
129 }
130 /**
131  *  \brief get the receiver (process associated to the mailbox)
132  *  \param rdv The rendez-vous point
133  *  \return process The receiving process (NULL if not set)
134  */
135 smx_process_t SIMIX_rdv_get_receiver(smx_rdv_t rdv)
136 {
137   return rdv->permanent_receiver;
138 }
139
140 void SIMIX_pre_rdv_set_receiver(smx_simcall_t simcall, smx_rdv_t rdv,
141                             smx_process_t process){
142   SIMIX_rdv_set_receiver(rdv, process);
143 }
144 /**
145  *  \brief set the receiver of the rendez vous point to allow eager sends
146  *  \param rdv The rendez-vous point
147  *  \param process The receiving process
148  */
149 void SIMIX_rdv_set_receiver(smx_rdv_t rdv, smx_process_t process)
150 {
151   rdv->permanent_receiver=process;
152 }
153
154 /**
155  *  \brief Pushes a communication action into a rendez-vous point
156  *  \param rdv The rendez-vous point
157  *  \param comm The communication action
158  */
159 static XBT_INLINE void SIMIX_rdv_push(smx_rdv_t rdv, smx_action_t comm)
160 {
161   xbt_fifo_push(rdv->comm_fifo, comm);
162   comm->comm.rdv = rdv;
163 }
164
165 /**
166  *  \brief Removes a communication action from a rendez-vous point
167  *  \param rdv The rendez-vous point
168  *  \param comm The communication action
169  */
170 XBT_INLINE void SIMIX_rdv_remove(smx_rdv_t rdv, smx_action_t comm)
171 {
172   xbt_fifo_remove(rdv->comm_fifo, comm);
173   comm->comm.rdv = NULL;
174 }
175
176 /**
177  *  \brief Checks if there is a communication action queued in a fifo matching our needs
178  *  \param type The type of communication we are looking for (comm_send, comm_recv)
179  *  \return The communication action if found, NULL otherwise
180  */
181 smx_action_t SIMIX_fifo_get_comm(xbt_fifo_t fifo, e_smx_comm_type_t type,
182                                  int (*match_fun)(void *, void *,smx_action_t),
183                                  void *this_user_data, smx_action_t my_action)
184 {
185   smx_action_t action;
186   xbt_fifo_item_t item;
187   void* other_user_data = NULL;
188
189   xbt_fifo_foreach(fifo, item, action, smx_action_t) {
190     if (action->comm.type == SIMIX_COMM_SEND) {
191       other_user_data = action->comm.src_data;
192     } else if (action->comm.type == SIMIX_COMM_RECEIVE) {
193       other_user_data = action->comm.dst_data;
194     }
195     if (action->comm.type == type &&
196         (!match_fun              ||              match_fun(this_user_data,  other_user_data, action)) &&
197         (!action->comm.match_fun || action->comm.match_fun(other_user_data, this_user_data,  my_action))) {
198       XBT_DEBUG("Found a matching communication action %p", action);
199       xbt_fifo_remove_item(fifo, item);
200       xbt_fifo_free_item(item);
201       action->comm.refcount++;
202 #ifdef HAVE_MC
203       action->comm.rdv_cpy = action->comm.rdv;
204 #endif
205       action->comm.rdv = NULL;
206       return action;
207     }
208     XBT_DEBUG("Sorry, communication action %p does not match our needs:"
209               " its type is %d but we are looking for a comm of type %d (or maybe the filtering didn't match)",
210               action, (int)action->comm.type, (int)type);
211   }
212   XBT_DEBUG("No matching communication action found");
213   return NULL;
214 }
215
216
217 /**
218  *  \brief Checks if there is a communication action queued in a fifo matching our needs, but leave it there
219  *  \param type The type of communication we are looking for (comm_send, comm_recv)
220  *  \return The communication action if found, NULL otherwise
221  */
222 smx_action_t SIMIX_fifo_probe_comm(xbt_fifo_t fifo, e_smx_comm_type_t type,
223                                  int (*match_fun)(void *, void *,smx_action_t),
224                                  void *this_user_data, smx_action_t my_action)
225 {
226   smx_action_t action;
227   xbt_fifo_item_t item;
228   void* other_user_data = NULL;
229
230   xbt_fifo_foreach(fifo, item, action, smx_action_t) {
231     if (action->comm.type == SIMIX_COMM_SEND) {
232       other_user_data = action->comm.src_data;
233     } else if (action->comm.type == SIMIX_COMM_RECEIVE) {
234       other_user_data = action->comm.dst_data;
235     }
236     if (action->comm.type == type &&
237         (!match_fun              ||              match_fun(this_user_data,  other_user_data, action)) &&
238         (!action->comm.match_fun || action->comm.match_fun(other_user_data, this_user_data,  my_action))) {
239       XBT_DEBUG("Found a matching communication action %p", action);
240       action->comm.refcount++;
241
242       return action;
243     }
244     XBT_DEBUG("Sorry, communication action %p does not match our needs:"
245               " its type is %d but we are looking for a comm of type %d (or maybe the filtering didn't match)",
246               action, (int)action->comm.type, (int)type);
247   }
248   XBT_DEBUG("No matching communication action found");
249   return NULL;
250 }
251 /******************************************************************************/
252 /*                            Communication Actions                            */
253 /******************************************************************************/
254
255 /**
256  *  \brief Creates a new communicate action
257  *  \param type The direction of communication (comm_send, comm_recv)
258  *  \return The new communicate action
259  */
260 smx_action_t SIMIX_comm_new(e_smx_comm_type_t type)
261 {
262   smx_action_t act;
263
264   /* alloc structures */
265   act = xbt_mallocator_get(simix_global->action_mallocator);
266
267   act->type = SIMIX_ACTION_COMMUNICATE;
268   act->state = SIMIX_WAITING;
269
270   /* set communication */
271   act->comm.type = type;
272   act->comm.refcount = 1;
273   act->comm.src_data=NULL;
274   act->comm.dst_data=NULL;
275
276
277 #ifdef HAVE_LATENCY_BOUND_TRACKING
278   //initialize with unknown value
279   act->latency_limited = -1;
280 #endif
281
282 #ifdef HAVE_TRACING
283   act->category = NULL;
284 #endif
285
286   XBT_DEBUG("Create communicate action %p", act);
287   ++smx_total_comms;
288
289   return act;
290 }
291
292 void SIMIX_pre_comm_destroy(smx_simcall_t simcall, smx_action_t action){
293   SIMIX_comm_destroy(action);
294 }
295 /**
296  *  \brief Destroy a communicate action
297  *  \param action The communicate action to be destroyed
298  */
299 void SIMIX_comm_destroy(smx_action_t action)
300 {
301   XBT_DEBUG("Destroy action %p (refcount: %d), state: %d",
302             action, action->comm.refcount, (int)action->state);
303
304   if (action->comm.refcount <= 0) {
305     xbt_backtrace_display_current();
306     xbt_die("The refcount of comm %p is already 0 before decreasing it. "
307             "That's a bug! If you didn't test and/or wait the same communication twice in your code, then the bug is SimGrid's...", action);
308   }
309   action->comm.refcount--;
310   if (action->comm.refcount > 0)
311       return;
312   XBT_DEBUG("Really free communication %p; refcount is now %d", action,
313             action->comm.refcount);
314
315 #ifdef HAVE_LATENCY_BOUND_TRACKING
316   action->latency_limited = SIMIX_comm_is_latency_bounded( action ) ;
317 #endif
318
319   xbt_free(action->name);
320   SIMIX_comm_destroy_internal_actions(action);
321
322   if (action->comm.detached && action->state != SIMIX_DONE) {
323     /* the communication has failed and was detached:
324      * we have to free the buffer */
325     if (action->comm.clean_fun) {
326       action->comm.clean_fun(action->comm.src_buff);
327     }
328     action->comm.src_buff = NULL;
329   }
330
331   if(action->comm.rdv)
332     SIMIX_rdv_remove(action->comm.rdv, action);
333
334   xbt_mallocator_release(simix_global->action_mallocator, action);
335 }
336
337 void SIMIX_comm_destroy_internal_actions(smx_action_t action)
338 {
339   if (action->comm.surf_comm){
340 #ifdef HAVE_LATENCY_BOUND_TRACKING
341     action->latency_limited = SIMIX_comm_is_latency_bounded(action);
342 #endif
343     surf_action_unref(action->comm.surf_comm);
344     action->comm.surf_comm = NULL;
345   }
346
347   if (action->comm.src_timeout){
348     surf_action_unref(action->comm.src_timeout);
349     action->comm.src_timeout = NULL;
350   }
351
352   if (action->comm.dst_timeout){
353     surf_action_unref(action->comm.dst_timeout);
354     action->comm.dst_timeout = NULL;
355   }
356 }
357
358 void SIMIX_pre_comm_send(smx_simcall_t simcall, smx_rdv_t rdv,
359                                   double task_size, double rate,
360                                   void *src_buff, size_t src_buff_size,
361                                   int (*match_fun)(void *, void *,smx_action_t),
362                                   void *data, double timeout){
363   smx_action_t comm = SIMIX_comm_isend(simcall->issuer, rdv, task_size, rate,
364                                        src_buff, src_buff_size, match_fun, NULL,
365                                        data, 0);
366   simcall->mc_value = 0;
367   SIMIX_pre_comm_wait(simcall, comm, timeout);
368 }
369 smx_action_t SIMIX_pre_comm_isend(smx_simcall_t simcall, smx_rdv_t rdv,
370                                   double task_size, double rate,
371                                   void *src_buff, size_t src_buff_size,
372                                   int (*match_fun)(void *, void *,smx_action_t),
373                                   void (*clean_fun)(void *), 
374                                   void *data, int detached){
375   return SIMIX_comm_isend(simcall->issuer, rdv, task_size, rate, src_buff,
376                           src_buff_size, match_fun, clean_fun, data, detached);
377
378 }
379 smx_action_t SIMIX_comm_isend(smx_process_t src_proc, smx_rdv_t rdv,
380                               double task_size, double rate,
381                               void *src_buff, size_t src_buff_size,
382                               int (*match_fun)(void *, void *,smx_action_t),
383                               void (*clean_fun)(void *), // used to free the action in case of problem after a detached send
384                               void *data,
385                               int detached)
386 {
387   XBT_DEBUG("send from %p\n", rdv);
388
389   /* Prepare an action describing us, so that it gets passed to the user-provided filter of other side */
390   smx_action_t this_action = SIMIX_comm_new(SIMIX_COMM_SEND);
391
392   /* Look for communication action matching our needs. We also provide a description of
393    * ourself so that the other side also gets a chance of choosing if it wants to match with us.
394    *
395    * If it is not found then push our communication into the rendez-vous point */
396   smx_action_t other_action = SIMIX_fifo_get_comm(rdv->comm_fifo, SIMIX_COMM_RECEIVE, match_fun, data, this_action);
397
398   if (!other_action) {
399     other_action = this_action;
400
401     if (rdv->permanent_receiver!=NULL){
402       //this mailbox is for small messages, which have to be sent right now
403       other_action->state = SIMIX_READY;
404       other_action->comm.dst_proc=rdv->permanent_receiver;
405       other_action->comm.refcount++;
406       xbt_fifo_push(rdv->done_comm_fifo,other_action);
407       other_action->comm.rdv=rdv;
408       XBT_DEBUG("pushing a message into the permanent receive fifo %p, comm %p \n", rdv, &(other_action->comm));
409
410     }else{
411       SIMIX_rdv_push(rdv, this_action);
412     }
413   } else {
414     XBT_DEBUG("Receive already pushed\n");
415
416     SIMIX_comm_destroy(this_action);
417     --smx_total_comms; // this creation was a pure waste
418
419     other_action->state = SIMIX_READY;
420     other_action->comm.type = SIMIX_COMM_READY;
421
422   }
423   xbt_fifo_push(src_proc->comms, other_action);
424
425   /* if the communication action is detached then decrease the refcount
426    * by one, so it will be eliminated by the receiver's destroy call */
427   if (detached) {
428     other_action->comm.detached = 1;
429     other_action->comm.refcount--;
430     other_action->comm.clean_fun = clean_fun;
431   } else {
432     other_action->comm.clean_fun = NULL;
433   }
434
435   /* Setup the communication action */
436   other_action->comm.src_proc = src_proc;
437   other_action->comm.task_size = task_size;
438   other_action->comm.rate = rate;
439   other_action->comm.src_buff = src_buff;
440   other_action->comm.src_buff_size = src_buff_size;
441   other_action->comm.src_data = data;
442
443   other_action->comm.match_fun = match_fun;
444
445   if (MC_is_active()) {
446     other_action->state = SIMIX_RUNNING;
447     return other_action;
448   }
449
450   SIMIX_comm_start(other_action);
451   return (detached ? NULL : other_action);
452 }
453
454 void SIMIX_pre_comm_recv(smx_simcall_t simcall, smx_rdv_t rdv,
455                                   void *dst_buff, size_t *dst_buff_size,
456                                   int (*match_fun)(void *, void *, smx_action_t),
457                                   void *data, double timeout){
458   smx_action_t comm = SIMIX_comm_irecv(simcall->issuer, rdv, dst_buff,
459                                        dst_buff_size, match_fun, data);
460   simcall->mc_value = 0;
461   SIMIX_pre_comm_wait(simcall, comm, timeout);
462 }
463
464 void SIMIX_pre_comm_recv_bounded(smx_simcall_t simcall, smx_rdv_t rdv,
465                                   void *dst_buff, size_t *dst_buff_size,
466                                   int (*match_fun)(void *, void *, smx_action_t),
467                                   void *data, double timeout, double rate){
468   smx_action_t comm = SIMIX_comm_irecv_bounded(simcall->issuer, rdv, dst_buff,
469                                        dst_buff_size, match_fun, data, rate);
470   simcall->mc_value = 0;
471   SIMIX_pre_comm_wait(simcall, comm, timeout);
472 }
473
474 smx_action_t SIMIX_pre_comm_irecv(smx_simcall_t simcall, smx_rdv_t rdv,
475                                   void *dst_buff, size_t *dst_buff_size,
476                                   int (*match_fun)(void *, void *, smx_action_t),
477                                   void *data){
478   return SIMIX_comm_irecv(simcall->issuer, rdv, dst_buff, dst_buff_size,
479                           match_fun, data);
480 }
481
482 smx_action_t SIMIX_comm_irecv(smx_process_t dst_proc, smx_rdv_t rdv,
483                               void *dst_buff, size_t *dst_buff_size,
484                               int (*match_fun)(void *, void *, smx_action_t), void *data)
485 {
486   XBT_DEBUG("recv from %p %p\n", rdv, rdv->comm_fifo);
487   smx_action_t this_action = SIMIX_comm_new(SIMIX_COMM_RECEIVE);
488
489   smx_action_t other_action;
490   //communication already done, get it inside the fifo of completed comms
491   //permanent receive v1
492   //int already_received=0;
493   if(rdv->permanent_receiver && xbt_fifo_size(rdv->done_comm_fifo)!=0){
494
495     XBT_DEBUG("We have a comm that has probably already been received, trying to match it, to skip the communication\n");
496     //find a match in the already received fifo
497     other_action = SIMIX_fifo_get_comm(rdv->done_comm_fifo, SIMIX_COMM_SEND, match_fun, data, this_action);
498     //if not found, assume the receiver came first, register it to the mailbox in the classical way
499     if (!other_action)  {
500       XBT_DEBUG("We have messages in the permanent receive list, but not the one we are looking for, pushing request into fifo\n");
501       other_action = this_action;
502       SIMIX_rdv_push(rdv, this_action);
503     }else{
504       if(other_action->comm.surf_comm &&        SIMIX_comm_get_remains(other_action)==0.0)
505       {
506         XBT_DEBUG("comm %p has been already sent, and is finished, destroy it\n",&(other_action->comm));
507         other_action->state = SIMIX_DONE;
508         other_action->comm.type = SIMIX_COMM_DONE;
509         other_action->comm.rdv = NULL;
510         //SIMIX_comm_destroy(this_action);
511         //--smx_total_comms; // this creation was a pure waste
512         //already_received=1;
513         //other_action->comm.refcount--;
514       }/*else{
515          XBT_DEBUG("Not yet finished, we have to wait %d\n", xbt_fifo_size(rdv->comm_fifo));
516          }*/
517       other_action->comm.refcount--;
518       SIMIX_comm_destroy(this_action);
519       --smx_total_comms; // this creation was a pure waste
520     }
521   }else{
522     /* Prepare an action describing us, so that it gets passed to the user-provided filter of other side */
523
524     /* Look for communication action matching our needs. We also provide a description of
525      * ourself so that the other side also gets a chance of choosing if it wants to match with us.
526      *
527      * If it is not found then push our communication into the rendez-vous point */
528     other_action = SIMIX_fifo_get_comm(rdv->comm_fifo, SIMIX_COMM_SEND, match_fun, data, this_action);
529
530     if (!other_action) {
531       XBT_DEBUG("Receive pushed first %d\n", xbt_fifo_size(rdv->comm_fifo));
532       other_action = this_action;
533       SIMIX_rdv_push(rdv, this_action);
534     } else {
535       SIMIX_comm_destroy(this_action);
536       --smx_total_comms; // this creation was a pure waste
537       other_action->state = SIMIX_READY;
538       other_action->comm.type = SIMIX_COMM_READY;
539       //other_action->comm.refcount--;
540     }
541     xbt_fifo_push(dst_proc->comms, other_action);
542   }
543
544   /* Setup communication action */
545   other_action->comm.dst_proc = dst_proc;
546   other_action->comm.dst_buff = dst_buff;
547   other_action->comm.dst_buff_size = dst_buff_size;
548   other_action->comm.dst_data = data;
549
550   other_action->comm.match_fun = match_fun;
551
552
553   /*if(already_received)//do the actual copy, because the first one after the comm didn't have all the info
554     SIMIX_comm_copy_data(other_action);*/
555
556
557   if (MC_is_active()) {
558     other_action->state = SIMIX_RUNNING;
559     return other_action;
560   }
561
562   SIMIX_comm_start(other_action);
563   // }
564   return other_action;
565 }
566
567 smx_action_t SIMIX_pre_comm_irecv_bounded(smx_simcall_t simcall, smx_rdv_t rdv,
568                                   void *dst_buff, size_t *dst_buff_size,
569                                   int (*match_fun)(void *, void *, smx_action_t),
570                                   void *data, double rate){
571   return SIMIX_comm_irecv_bounded(simcall->issuer, rdv, dst_buff, dst_buff_size,
572                           match_fun, data, rate);
573 }
574
575 smx_action_t SIMIX_comm_irecv_bounded(smx_process_t dst_proc, smx_rdv_t rdv,
576                               void *dst_buff, size_t *dst_buff_size,
577                               int (*match_fun)(void *, void *, smx_action_t), void *data, double rate)
578 {
579   XBT_DEBUG("recv from %p %p\n", rdv, rdv->comm_fifo);
580   smx_action_t this_action = SIMIX_comm_new(SIMIX_COMM_RECEIVE);
581
582   smx_action_t other_action;
583   //communication already done, get it inside the fifo of completed comms
584   //permanent receive v1
585   //int already_received=0;
586   if(rdv->permanent_receiver && xbt_fifo_size(rdv->done_comm_fifo)!=0){
587
588     XBT_DEBUG("We have a comm that has probably already been received, trying to match it, to skip the communication\n");
589     //find a match in the already received fifo
590     other_action = SIMIX_fifo_get_comm(rdv->done_comm_fifo, SIMIX_COMM_SEND, match_fun, data, this_action);
591     //if not found, assume the receiver came first, register it to the mailbox in the classical way
592     if (!other_action)  {
593       XBT_DEBUG("We have messages in the permanent receive list, but not the one we are looking for, pushing request into fifo\n");
594       other_action = this_action;
595       SIMIX_rdv_push(rdv, this_action);
596     }else{
597       if(other_action->comm.surf_comm &&        SIMIX_comm_get_remains(other_action)==0.0)
598       {
599         XBT_DEBUG("comm %p has been already sent, and is finished, destroy it\n",&(other_action->comm));
600         other_action->state = SIMIX_DONE;
601         other_action->comm.type = SIMIX_COMM_DONE;
602         other_action->comm.rdv = NULL;
603         //SIMIX_comm_destroy(this_action);
604         //--smx_total_comms; // this creation was a pure waste
605         //already_received=1;
606         //other_action->comm.refcount--;
607       }/*else{
608          XBT_DEBUG("Not yet finished, we have to wait %d\n", xbt_fifo_size(rdv->comm_fifo));
609          }*/
610       other_action->comm.refcount--;
611       SIMIX_comm_destroy(this_action);
612       --smx_total_comms; // this creation was a pure waste
613     }
614   }else{
615     /* Prepare an action describing us, so that it gets passed to the user-provided filter of other side */
616
617     /* Look for communication action matching our needs. We also provide a description of
618      * ourself so that the other side also gets a chance of choosing if it wants to match with us.
619      *
620      * If it is not found then push our communication into the rendez-vous point */
621     other_action = SIMIX_fifo_get_comm(rdv->comm_fifo, SIMIX_COMM_SEND, match_fun, data, this_action);
622
623     if (!other_action) {
624       XBT_DEBUG("Receive pushed first %d\n", xbt_fifo_size(rdv->comm_fifo));
625       other_action = this_action;
626       SIMIX_rdv_push(rdv, this_action);
627     } else {
628       SIMIX_comm_destroy(this_action);
629       --smx_total_comms; // this creation was a pure waste
630       other_action->state = SIMIX_READY;
631       other_action->comm.type = SIMIX_COMM_READY;
632       //other_action->comm.refcount--;
633     }
634     xbt_fifo_push(dst_proc->comms, other_action);
635   }
636
637   /* Setup communication action */
638   other_action->comm.dst_proc = dst_proc;
639   other_action->comm.dst_buff = dst_buff;
640   other_action->comm.dst_buff_size = dst_buff_size;
641   other_action->comm.dst_data = data;
642
643   if (rate < other_action->comm.rate || other_action->comm.rate == -1.0)
644           other_action->comm.rate = rate;
645
646   other_action->comm.match_fun = match_fun;
647
648
649   /*if(already_received)//do the actual copy, because the first one after the comm didn't have all the info
650     SIMIX_comm_copy_data(other_action);*/
651
652
653   if (MC_is_active()) {
654     other_action->state = SIMIX_RUNNING;
655     return other_action;
656   }
657
658   SIMIX_comm_start(other_action);
659   // }
660   return other_action;
661 }
662
663 smx_action_t SIMIX_pre_comm_iprobe(smx_simcall_t simcall, smx_rdv_t rdv,
664                                    int src, int tag,
665                                    int (*match_fun)(void *, void *, smx_action_t),
666                                    void *data){
667   return SIMIX_comm_iprobe(simcall->issuer, rdv, src, tag, match_fun, data);
668 }
669
670 smx_action_t SIMIX_comm_iprobe(smx_process_t dst_proc, smx_rdv_t rdv, int src,
671                               int tag, int (*match_fun)(void *, void *, smx_action_t), void *data)
672 {
673   XBT_DEBUG("iprobe from %p %p\n", rdv, rdv->comm_fifo);
674   smx_action_t this_action = SIMIX_comm_new(SIMIX_COMM_RECEIVE);
675
676   smx_action_t other_action=NULL;
677   if(rdv->permanent_receiver && xbt_fifo_size(rdv->done_comm_fifo)!=0){
678     //find a match in the already received fifo
679       XBT_DEBUG("first try in the perm recv mailbox \n");
680
681     other_action = SIMIX_fifo_probe_comm(rdv->done_comm_fifo, SIMIX_COMM_SEND, match_fun, data, this_action);
682   }
683  // }else{
684     if(!other_action){
685         XBT_DEBUG("second try in the other mailbox");
686         other_action = SIMIX_fifo_probe_comm(rdv->comm_fifo, SIMIX_COMM_SEND, match_fun, data, this_action);
687     }
688 //  }
689   if(other_action)other_action->comm.refcount--;
690
691   SIMIX_comm_destroy(this_action);
692   --smx_total_comms;
693   return other_action;
694 }
695
696 void SIMIX_pre_comm_wait(smx_simcall_t simcall, smx_action_t action, double timeout)
697 {
698   int idx = simcall->mc_value;
699   /* the simcall may be a wait, a send or a recv */
700   surf_action_t sleep;
701
702   /* Associate this simcall to the wait action */
703   XBT_DEBUG("SIMIX_pre_comm_wait, %p", action);
704
705   xbt_fifo_push(action->simcalls, simcall);
706   simcall->issuer->waiting_action = action;
707
708   if (MC_is_active()) {
709     if (idx == 0) {
710       action->state = SIMIX_DONE;
711     } else {
712       /* If we reached this point, the wait simcall must have a timeout */
713       /* Otherwise it shouldn't be enabled and executed by the MC */
714       if (timeout == -1)
715         THROW_IMPOSSIBLE;
716
717       if (action->comm.src_proc == simcall->issuer)
718         action->state = SIMIX_SRC_TIMEOUT;
719       else
720         action->state = SIMIX_DST_TIMEOUT;
721     }
722
723     SIMIX_comm_finish(action);
724     return;
725   }
726
727   /* If the action has already finish perform the error handling, */
728   /* otherwise set up a waiting timeout on the right side         */
729   if (action->state != SIMIX_WAITING && action->state != SIMIX_RUNNING) {
730     SIMIX_comm_finish(action);
731   } else { /* if (timeout >= 0) { we need a surf sleep action even when there is no timeout, otherwise surf won't tell us when the host fails */
732     sleep = surf_workstation_sleep(surf_workstation_resource_priv(simcall->issuer->smx_host), timeout);
733     surf_action_set_data(sleep, action);
734
735     if (simcall->issuer == action->comm.src_proc)
736       action->comm.src_timeout = sleep;
737     else
738       action->comm.dst_timeout = sleep;
739   }
740 }
741
742 void SIMIX_pre_comm_test(smx_simcall_t simcall, smx_action_t action)
743 {
744   if(MC_is_active()){
745     simcall_comm_test__set__result(simcall, action->comm.src_proc && action->comm.dst_proc);
746     if(simcall_comm_test__get__result(simcall)){
747       action->state = SIMIX_DONE;
748       xbt_fifo_push(action->simcalls, simcall);
749       SIMIX_comm_finish(action);
750     }else{
751       SIMIX_simcall_answer(simcall);
752     }
753     return;
754   }
755
756   simcall_comm_test__set__result(simcall, (action->state != SIMIX_WAITING && action->state != SIMIX_RUNNING));
757   if (simcall_comm_test__get__result(simcall)) {
758     xbt_fifo_push(action->simcalls, simcall);
759     SIMIX_comm_finish(action);
760   } else {
761     SIMIX_simcall_answer(simcall);
762   }
763 }
764
765 void SIMIX_pre_comm_testany(smx_simcall_t simcall, xbt_dynar_t actions)
766 {
767   int idx = simcall->mc_value;
768   unsigned int cursor;
769   smx_action_t action;
770   simcall_comm_testany__set__result(simcall, -1);
771
772   if (MC_is_active()){
773     if(idx == -1){
774       SIMIX_simcall_answer(simcall);
775     }else{
776       action = xbt_dynar_get_as(actions, idx, smx_action_t);
777       simcall_comm_testany__set__result(simcall, idx);
778       xbt_fifo_push(action->simcalls, simcall);
779       action->state = SIMIX_DONE;
780       SIMIX_comm_finish(action);
781     }
782     return;
783   }
784
785   xbt_dynar_foreach(simcall_comm_testany__get__comms(simcall), cursor,action) {
786     if (action->state != SIMIX_WAITING && action->state != SIMIX_RUNNING) {
787       simcall_comm_testany__set__result(simcall, cursor);
788       xbt_fifo_push(action->simcalls, simcall);
789       SIMIX_comm_finish(action);
790       return;
791     }
792   }
793   SIMIX_simcall_answer(simcall);
794 }
795
796 void SIMIX_pre_comm_waitany(smx_simcall_t simcall, xbt_dynar_t actions)
797 {
798   int idx = simcall->mc_value;
799   smx_action_t action;
800   unsigned int cursor = 0;
801
802   if (MC_is_active()){
803     action = xbt_dynar_get_as(actions, idx, smx_action_t);
804     xbt_fifo_push(action->simcalls, simcall);
805     simcall_comm_waitany__set__result(simcall, idx);
806     action->state = SIMIX_DONE;
807     SIMIX_comm_finish(action);
808     return;
809   }
810
811   xbt_dynar_foreach(actions, cursor, action){
812     /* associate this simcall to the the action */
813     xbt_fifo_push(action->simcalls, simcall);
814
815     /* see if the action is already finished */
816     if (action->state != SIMIX_WAITING && action->state != SIMIX_RUNNING){
817       SIMIX_comm_finish(action);
818       break;
819     }
820   }
821 }
822
823 void SIMIX_waitany_remove_simcall_from_actions(smx_simcall_t simcall)
824 {
825   smx_action_t action;
826   unsigned int cursor = 0;
827   xbt_dynar_t actions = simcall_comm_waitany__get__comms(simcall);
828
829   xbt_dynar_foreach(actions, cursor, action) {
830     xbt_fifo_remove(action->simcalls, simcall);
831   }
832 }
833
834 /**
835  *  \brief Starts the simulation of a communication action.
836  *  \param action the communication action
837  */
838 XBT_INLINE void SIMIX_comm_start(smx_action_t action)
839 {
840   /* If both the sender and the receiver are already there, start the communication */
841   if (action->state == SIMIX_READY) {
842
843     smx_host_t sender = action->comm.src_proc->smx_host;
844     smx_host_t receiver = action->comm.dst_proc->smx_host;
845
846     XBT_DEBUG("Starting communication %p from '%s' to '%s'", action,
847               SIMIX_host_get_name(sender), SIMIX_host_get_name(receiver));
848
849     action->comm.surf_comm = surf_workstation_communicate(sender, receiver, action->comm.task_size, action->comm.rate);
850
851     surf_action_set_data(action->comm.surf_comm, action);
852
853     action->state = SIMIX_RUNNING;
854
855     /* If a link is failed, detect it immediately */
856     if (surf_action_get_state(action->comm.surf_comm) == SURF_ACTION_FAILED) {
857       XBT_DEBUG("Communication from '%s' to '%s' failed to start because of a link failure",
858                 SIMIX_host_get_name(sender), SIMIX_host_get_name(receiver));
859       action->state = SIMIX_LINK_FAILURE;
860       SIMIX_comm_destroy_internal_actions(action);
861     }
862
863     /* If any of the process is suspend, create the action but stop its execution,
864        it will be restarted when the sender process resume */
865     if (SIMIX_process_is_suspended(action->comm.src_proc) ||
866         SIMIX_process_is_suspended(action->comm.dst_proc)) {
867       /* FIXME: check what should happen with the action state */
868
869       if (SIMIX_process_is_suspended(action->comm.src_proc))
870         XBT_DEBUG("The communication is suspended on startup because src (%s:%s) were suspended since it initiated the communication",
871                   SIMIX_host_get_name(action->comm.src_proc->smx_host), action->comm.src_proc->name);
872       else
873         XBT_DEBUG("The communication is suspended on startup because dst (%s:%s) were suspended since it initiated the communication",
874                   SIMIX_host_get_name(action->comm.dst_proc->smx_host), action->comm.dst_proc->name);
875
876       surf_action_suspend(action->comm.surf_comm);
877
878     }
879   }
880 }
881
882 /**
883  * \brief Answers the SIMIX simcalls associated to a communication action.
884  * \param action a finished communication action
885  */
886 void SIMIX_comm_finish(smx_action_t action)
887 {
888   unsigned int destroy_count = 0;
889   smx_simcall_t simcall;
890
891   while ((simcall = xbt_fifo_shift(action->simcalls))) {
892
893     /* If a waitany simcall is waiting for this action to finish, then remove
894        it from the other actions in the waitany list. Afterwards, get the
895        position of the actual action in the waitany dynar and
896        return it as the result of the simcall */
897     if (simcall->call == SIMCALL_COMM_WAITANY) {
898       SIMIX_waitany_remove_simcall_from_actions(simcall);
899       if (!MC_is_active())
900         simcall_comm_waitany__set__result(simcall, xbt_dynar_search(simcall_comm_waitany__get__comms(simcall), &action));
901     }
902
903     /* If the action is still in a rendez-vous point then remove from it */
904     if (action->comm.rdv)
905       SIMIX_rdv_remove(action->comm.rdv, action);
906
907     XBT_DEBUG("SIMIX_comm_finish: action state = %d", (int)action->state);
908
909     /* Check out for errors */
910     switch (action->state) {
911
912     case SIMIX_DONE:
913       XBT_DEBUG("Communication %p complete!", action);
914       SIMIX_comm_copy_data(action);
915       break;
916
917     case SIMIX_SRC_TIMEOUT:
918       SMX_EXCEPTION(simcall->issuer, timeout_error, 0,
919                     "Communication timeouted because of sender");
920       break;
921
922     case SIMIX_DST_TIMEOUT:
923       SMX_EXCEPTION(simcall->issuer, timeout_error, 0,
924                     "Communication timeouted because of receiver");
925       break;
926
927     case SIMIX_SRC_HOST_FAILURE:
928       if (simcall->issuer == action->comm.src_proc)
929         simcall->issuer->context->iwannadie = 1;
930 //          SMX_EXCEPTION(simcall->issuer, host_error, 0, "Host failed");
931       else
932         SMX_EXCEPTION(simcall->issuer, network_error, 0, "Remote peer failed");
933       break;
934
935     case SIMIX_DST_HOST_FAILURE:
936       if (simcall->issuer == action->comm.dst_proc)
937         simcall->issuer->context->iwannadie = 1;
938 //          SMX_EXCEPTION(simcall->issuer, host_error, 0, "Host failed");
939       else
940         SMX_EXCEPTION(simcall->issuer, network_error, 0, "Remote peer failed");
941       break;
942
943     case SIMIX_LINK_FAILURE:
944       XBT_DEBUG("Link failure in action %p between '%s' and '%s': posting an exception to the issuer: %s (%p) detached:%d",
945                 action,
946                 action->comm.src_proc ? sg_host_name(action->comm.src_proc->smx_host) : NULL,
947                 action->comm.dst_proc ? sg_host_name(action->comm.dst_proc->smx_host) : NULL,
948                 simcall->issuer->name, simcall->issuer, action->comm.detached);
949       if (action->comm.src_proc == simcall->issuer) {
950         XBT_DEBUG("I'm source");
951       } else if (action->comm.dst_proc == simcall->issuer) {
952         XBT_DEBUG("I'm dest");
953       } else {
954         XBT_DEBUG("I'm neither source nor dest");
955       }
956       SMX_EXCEPTION(simcall->issuer, network_error, 0, "Link failure");
957       break;
958
959     case SIMIX_CANCELED:
960       if (simcall->issuer == action->comm.dst_proc)
961         SMX_EXCEPTION(simcall->issuer, cancel_error, 0,
962                       "Communication canceled by the sender");
963       else
964         SMX_EXCEPTION(simcall->issuer, cancel_error, 0,
965                       "Communication canceled by the receiver");
966       break;
967
968     default:
969       xbt_die("Unexpected action state in SIMIX_comm_finish: %d", (int)action->state);
970     }
971
972     /* if there is an exception during a waitany or a testany, indicate the position of the failed communication */
973     if (simcall->issuer->doexception) {
974       if (simcall->call == SIMCALL_COMM_WAITANY) {
975         simcall->issuer->running_ctx->exception.value = xbt_dynar_search(simcall_comm_waitany__get__comms(simcall), &action);
976       }
977       else if (simcall->call == SIMCALL_COMM_TESTANY) {
978         simcall->issuer->running_ctx->exception.value = xbt_dynar_search(simcall_comm_testany__get__comms(simcall), &action);
979       }
980     }
981
982     if (surf_resource_get_state(surf_workstation_resource_priv(simcall->issuer->smx_host)) != SURF_RESOURCE_ON) {
983       simcall->issuer->context->iwannadie = 1;
984     }
985
986     simcall->issuer->waiting_action = NULL;
987     xbt_fifo_remove(simcall->issuer->comms, action);
988     if(action->comm.detached){
989       if(simcall->issuer == action->comm.src_proc){
990         if(action->comm.dst_proc)
991           xbt_fifo_remove(action->comm.dst_proc->comms, action);
992       }
993       if(simcall->issuer == action->comm.dst_proc){
994         if(action->comm.src_proc)
995           xbt_fifo_remove(action->comm.src_proc->comms, action);
996       }
997     }
998     SIMIX_simcall_answer(simcall);
999     destroy_count++;
1000   }
1001
1002   while (destroy_count-- > 0)
1003     SIMIX_comm_destroy(action);
1004 }
1005
1006 /**
1007  * \brief This function is called when a Surf communication action is finished.
1008  * \param action the corresponding Simix communication
1009  */
1010 void SIMIX_post_comm(smx_action_t action)
1011 {
1012   /* Update action state */
1013   if (action->comm.src_timeout &&
1014       surf_action_get_state(action->comm.src_timeout) == SURF_ACTION_DONE)
1015     action->state = SIMIX_SRC_TIMEOUT;
1016   else if (action->comm.dst_timeout &&
1017           surf_action_get_state(action->comm.dst_timeout) == SURF_ACTION_DONE)
1018     action->state = SIMIX_DST_TIMEOUT;
1019   else if (action->comm.src_timeout &&
1020           surf_action_get_state(action->comm.src_timeout) == SURF_ACTION_FAILED)
1021     action->state = SIMIX_SRC_HOST_FAILURE;
1022   else if (action->comm.dst_timeout &&
1023       surf_action_get_state(action->comm.dst_timeout) == SURF_ACTION_FAILED)
1024     action->state = SIMIX_DST_HOST_FAILURE;
1025   else if (action->comm.surf_comm &&
1026           surf_action_get_state(action->comm.surf_comm) == SURF_ACTION_FAILED) {
1027     XBT_DEBUG("Puta madre. Surf says that the link broke");
1028     action->state = SIMIX_LINK_FAILURE;
1029   } else
1030     action->state = SIMIX_DONE;
1031
1032   XBT_DEBUG("SIMIX_post_comm: comm %p, state %d, src_proc %p, dst_proc %p, detached: %d",
1033             action, (int)action->state, action->comm.src_proc, action->comm.dst_proc, action->comm.detached);
1034
1035   /* destroy the surf actions associated with the Simix communication */
1036   SIMIX_comm_destroy_internal_actions(action);
1037
1038   /* remove the communication action from the list of pending communications
1039    * of both processes (if they still exist) */
1040   if (action->comm.src_proc) {
1041     xbt_fifo_remove(action->comm.src_proc->comms, action);
1042   }
1043   if (action->comm.dst_proc) {
1044     xbt_fifo_remove(action->comm.dst_proc->comms, action);
1045   }
1046
1047   /* if there are simcalls associated with the action, then answer them */
1048   if (xbt_fifo_size(action->simcalls)) {
1049     SIMIX_comm_finish(action);
1050   }
1051 }
1052
1053 void SIMIX_pre_comm_cancel(smx_simcall_t simcall, smx_action_t action){
1054   SIMIX_comm_cancel(action);
1055 }
1056 void SIMIX_comm_cancel(smx_action_t action)
1057 {
1058   /* if the action is a waiting state means that it is still in a rdv */
1059   /* so remove from it and delete it */
1060   if (action->state == SIMIX_WAITING) {
1061     SIMIX_rdv_remove(action->comm.rdv, action);
1062     action->state = SIMIX_CANCELED;
1063   }
1064   else if (!MC_is_active() /* when running the MC there are no surf actions */
1065            && (action->state == SIMIX_READY || action->state == SIMIX_RUNNING)) {
1066
1067     surf_action_cancel(action->comm.surf_comm);
1068   }
1069 }
1070
1071 void SIMIX_comm_suspend(smx_action_t action)
1072 {
1073   /*FIXME: shall we suspend also the timeout actions? */
1074   if (action->comm.surf_comm)
1075     surf_action_suspend(action->comm.surf_comm);
1076   /* in the other case, the action will be suspended on creation, in SIMIX_comm_start() */
1077 }
1078
1079 void SIMIX_comm_resume(smx_action_t action)
1080 {
1081   /*FIXME: check what happen with the timeouts */
1082   if (action->comm.surf_comm)
1083     surf_action_resume(action->comm.surf_comm);
1084   /* in the other case, the action were not really suspended yet, see SIMIX_comm_suspend() and SIMIX_comm_start() */
1085 }
1086
1087
1088 /************* Action Getters **************/
1089
1090 double SIMIX_pre_comm_get_remains(smx_simcall_t simcall, smx_action_t action){
1091   return SIMIX_comm_get_remains(action);
1092 }
1093 /**
1094  *  \brief get the amount remaining from the communication
1095  *  \param action The communication
1096  */
1097 double SIMIX_comm_get_remains(smx_action_t action)
1098 {
1099   double remains;
1100
1101   if(!action){
1102     return 0;
1103   }
1104
1105   switch (action->state) {
1106
1107   case SIMIX_RUNNING:
1108     remains = surf_action_get_remains(action->comm.surf_comm);
1109     break;
1110
1111   case SIMIX_WAITING:
1112   case SIMIX_READY:
1113     remains = 0; /*FIXME: check what should be returned */
1114     break;
1115
1116   default:
1117     remains = 0; /*FIXME: is this correct? */
1118     break;
1119   }
1120   return remains;
1121 }
1122
1123 e_smx_state_t SIMIX_pre_comm_get_state(smx_simcall_t simcall, smx_action_t action){
1124   return SIMIX_comm_get_state(action);
1125 }
1126 e_smx_state_t SIMIX_comm_get_state(smx_action_t action)
1127 {
1128   return action->state;
1129 }
1130
1131 void* SIMIX_pre_comm_get_src_data(smx_simcall_t simcall, smx_action_t action){
1132   return SIMIX_comm_get_src_data(action);
1133 }
1134 /**
1135  *  \brief Return the user data associated to the sender of the communication
1136  *  \param action The communication
1137  *  \return the user data
1138  */
1139 void* SIMIX_comm_get_src_data(smx_action_t action)
1140 {
1141   return action->comm.src_data;
1142 }
1143
1144 void* SIMIX_pre_comm_get_dst_data(smx_simcall_t simcall, smx_action_t action){
1145   return SIMIX_comm_get_dst_data(action);
1146 }
1147 /**
1148  *  \brief Return the user data associated to the receiver of the communication
1149  *  \param action The communication
1150  *  \return the user data
1151  */
1152 void* SIMIX_comm_get_dst_data(smx_action_t action)
1153 {
1154   return action->comm.dst_data;
1155 }
1156
1157 smx_process_t SIMIX_pre_comm_get_src_proc(smx_simcall_t simcall, smx_action_t action){
1158   return SIMIX_comm_get_src_proc(action);
1159 }
1160 smx_process_t SIMIX_comm_get_src_proc(smx_action_t action)
1161 {
1162   return action->comm.src_proc;
1163 }
1164
1165 smx_process_t SIMIX_pre_comm_get_dst_proc(smx_simcall_t simcall, smx_action_t action){
1166   return SIMIX_comm_get_dst_proc(action);
1167 }
1168 smx_process_t SIMIX_comm_get_dst_proc(smx_action_t action)
1169 {
1170   return action->comm.dst_proc;
1171 }
1172
1173 #ifdef HAVE_LATENCY_BOUND_TRACKING
1174 int SIMIX_pre_comm_is_latency_bounded(smx_simcall_t simcall, smx_action_t action)
1175 {
1176   return SIMIX_comm_is_latency_bounded(action);
1177 }
1178
1179 /**
1180  *  \brief verify if communication is latency bounded
1181  *  \param comm The communication
1182  */
1183 XBT_INLINE int SIMIX_comm_is_latency_bounded(smx_action_t action)
1184 {
1185   if(!action){
1186     return 0;
1187   }
1188   if (action->comm.surf_comm){
1189     XBT_DEBUG("Getting latency limited for surf_action (%p)", action->comm.surf_comm);
1190     action->latency_limited = surf_workstation_model->get_latency_limited(action->comm.surf_comm);
1191     XBT_DEBUG("Action limited is %d", action->latency_limited);
1192   }
1193   return action->latency_limited;
1194 }
1195 #endif
1196
1197 /******************************************************************************/
1198 /*                    SIMIX_comm_copy_data callbacks                       */
1199 /******************************************************************************/
1200 static void (*SIMIX_comm_copy_data_callback) (smx_action_t, void*, size_t) =
1201   &SIMIX_comm_copy_pointer_callback;
1202
1203 void
1204 SIMIX_comm_set_copy_data_callback(void (*callback) (smx_action_t, void*, size_t))
1205 {
1206   SIMIX_comm_copy_data_callback = callback;
1207 }
1208
1209 void SIMIX_comm_copy_pointer_callback(smx_action_t comm, void* buff, size_t buff_size)
1210 {
1211   xbt_assert((buff_size == sizeof(void *)),
1212              "Cannot copy %zu bytes: must be sizeof(void*)", buff_size);
1213   *(void **) (comm->comm.dst_buff) = buff;
1214 }
1215
1216 void SIMIX_comm_copy_buffer_callback(smx_action_t comm, void* buff, size_t buff_size)
1217 {
1218   XBT_DEBUG("Copy the data over");
1219   memcpy(comm->comm.dst_buff, buff, buff_size);
1220   if (comm->comm.detached) { // if this is a detached send, the source buffer was duplicated by SMPI sender to make the original buffer available to the application ASAP
1221     xbt_free(buff);
1222     comm->comm.src_buff = NULL;
1223   }
1224 }
1225
1226
1227 /**
1228  *  \brief Copy the communication data from the sender's buffer to the receiver's one
1229  *  \param comm The communication
1230  */
1231 void SIMIX_comm_copy_data(smx_action_t comm)
1232 {
1233   size_t buff_size = comm->comm.src_buff_size;
1234   /* If there is no data to be copy then return */
1235   if (!comm->comm.src_buff || !comm->comm.dst_buff || comm->comm.copied)
1236     return;
1237
1238   XBT_DEBUG("Copying comm %p data from %s (%p) -> %s (%p) (%zu bytes)",
1239             comm,
1240             comm->comm.src_proc ? sg_host_name(comm->comm.src_proc->smx_host) : "a finished process",
1241             comm->comm.src_buff,
1242             comm->comm.dst_proc ? sg_host_name(comm->comm.dst_proc->smx_host) : "a finished process",
1243             comm->comm.dst_buff, buff_size);
1244
1245   /* Copy at most dst_buff_size bytes of the message to receiver's buffer */
1246   if (comm->comm.dst_buff_size)
1247     buff_size = MIN(buff_size, *(comm->comm.dst_buff_size));
1248
1249   /* Update the receiver's buffer size to the copied amount */
1250   if (comm->comm.dst_buff_size)
1251     *comm->comm.dst_buff_size = buff_size;
1252
1253   if (buff_size > 0)
1254     SIMIX_comm_copy_data_callback (comm, comm->comm.src_buff, buff_size);
1255
1256   /* Set the copied flag so we copy data only once */
1257   /* (this function might be called from both communication ends) */
1258   comm->comm.copied = 1;
1259 }