Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
useless line
[simgrid.git] / src / simix / smx_network.c
1 /* Copyright (c) 2009, 2010. The SimGrid Team.
2  * All rights reserved.                                                     */
3
4 /* This program is free software; you can redistribute it and/or modify it
5  * under the terms of the license (GNU LGPL) which comes with this package. */
6
7 #include "smx_private.h"
8 #include "xbt/log.h"
9 #include "mc/mc.h"
10 #include "xbt/dict.h"
11
12 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(simix_network, simix,
13                                 "Logging specific to SIMIX (network)");
14
15 static xbt_dict_t rdv_points = NULL;
16 XBT_EXPORT_NO_IMPORT(unsigned long int) smx_total_comms = 0;
17
18 static void SIMIX_waitany_remove_simcall_from_actions(smx_simcall_t simcall);
19 static void SIMIX_comm_copy_data(smx_action_t comm);
20 static smx_action_t SIMIX_comm_new(e_smx_comm_type_t type);
21 static XBT_INLINE void SIMIX_rdv_push(smx_rdv_t rdv, smx_action_t comm);
22 static smx_action_t SIMIX_fifo_probe_comm(xbt_fifo_t fifo, e_smx_comm_type_t type,
23                                         int (*match_fun)(void *, void *,smx_action_t),
24                                         void *user_data, smx_action_t my_action);
25 static smx_action_t SIMIX_fifo_get_comm(xbt_fifo_t fifo, e_smx_comm_type_t type,
26                                         int (*match_fun)(void *, void *,smx_action_t),
27                                         void *user_data, smx_action_t my_action);
28 static void SIMIX_rdv_free(void *data);
29
30 void SIMIX_network_init(void)
31 {
32   rdv_points = xbt_dict_new_homogeneous(SIMIX_rdv_free);
33   if(MC_is_active())
34     MC_ignore_data_bss(&smx_total_comms, sizeof(smx_total_comms));
35 }
36
37 void SIMIX_network_exit(void)
38 {
39   xbt_dict_free(&rdv_points);
40 }
41
42 /******************************************************************************/
43 /*                           Rendez-Vous Points                               */
44 /******************************************************************************/
45
46 smx_rdv_t SIMIX_pre_rdv_create(smx_simcall_t simcall, const char *name){
47   return SIMIX_rdv_create(name);
48 }
49 smx_rdv_t SIMIX_rdv_create(const char *name)
50 {
51   /* two processes may have pushed the same rdv_create simcall at the same time */
52   smx_rdv_t rdv = name ? xbt_dict_get_or_null(rdv_points, name) : NULL;
53
54   if (!rdv) {
55     rdv = xbt_new0(s_smx_rvpoint_t, 1);
56     rdv->name = name ? xbt_strdup(name) : NULL;
57     rdv->comm_fifo = xbt_fifo_new();
58     rdv->done_comm_fifo = xbt_fifo_new();
59     rdv->permanent_receiver=NULL;
60
61     XBT_DEBUG("Creating a mailbox at %p with name %s\n", rdv, name);
62
63     if (rdv->name)
64       xbt_dict_set(rdv_points, rdv->name, rdv, NULL);
65   }
66   return rdv;
67 }
68
69 void SIMIX_pre_rdv_destroy(smx_simcall_t simcall, smx_rdv_t rdv){
70   return SIMIX_rdv_destroy(rdv);
71 }
72 void SIMIX_rdv_destroy(smx_rdv_t rdv)
73 {
74   if (rdv->name)
75     xbt_dict_remove(rdv_points, rdv->name);
76 }
77
78 void SIMIX_rdv_free(void *data)
79 {
80   XBT_DEBUG("rdv free %p", data);
81   smx_rdv_t rdv = (smx_rdv_t) data;
82   xbt_free(rdv->name);
83   xbt_fifo_free(rdv->comm_fifo);
84   xbt_fifo_free(rdv->done_comm_fifo);
85
86   xbt_free(rdv);  
87 }
88
89 xbt_dict_t SIMIX_get_rdv_points()
90 {
91   return rdv_points;
92 }
93
94 smx_rdv_t SIMIX_pre_rdv_get_by_name(smx_simcall_t simcall, const char *name){
95   return SIMIX_rdv_get_by_name(name);
96 }
97 smx_rdv_t SIMIX_rdv_get_by_name(const char *name)
98 {
99   return xbt_dict_get_or_null(rdv_points, name);
100 }
101
102 int SIMIX_pre_rdv_comm_count_by_host(smx_simcall_t simcall, smx_rdv_t rdv, smx_host_t host){
103   return SIMIX_rdv_comm_count_by_host(rdv, host);
104 }
105 int SIMIX_rdv_comm_count_by_host(smx_rdv_t rdv, smx_host_t host)
106 {
107   smx_action_t comm = NULL;
108   xbt_fifo_item_t item = NULL;
109   int count = 0;
110
111   xbt_fifo_foreach(rdv->comm_fifo, item, comm, smx_action_t) {
112     if (comm->comm.src_proc->smx_host == host)
113       count++;
114   }
115
116   return count;
117 }
118
119 smx_action_t SIMIX_pre_rdv_get_head(smx_simcall_t simcall, smx_rdv_t rdv){
120   return SIMIX_rdv_get_head(rdv);
121 }
122 smx_action_t SIMIX_rdv_get_head(smx_rdv_t rdv)
123 {
124   return xbt_fifo_get_item_content(xbt_fifo_get_first_item(rdv->comm_fifo));
125 }
126
127 smx_process_t SIMIX_pre_rdv_get_receiver(smx_simcall_t simcall, smx_rdv_t rdv){
128   return SIMIX_rdv_get_receiver(rdv);
129 }
130 /**
131  *  \brief get the receiver (process associated to the mailbox)
132  *  \param rdv The rendez-vous point
133  *  \return process The receiving process (NULL if not set)
134  */
135 smx_process_t SIMIX_rdv_get_receiver(smx_rdv_t rdv)
136 {
137   return rdv->permanent_receiver;
138 }
139
140 void SIMIX_pre_rdv_set_receiver(smx_simcall_t simcall, smx_rdv_t rdv,
141                             smx_process_t process){
142   SIMIX_rdv_set_receiver(rdv, process);
143 }
144 /**
145  *  \brief set the receiver of the rendez vous point to allow eager sends
146  *  \param rdv The rendez-vous point
147  *  \param process The receiving process
148  */
149 void SIMIX_rdv_set_receiver(smx_rdv_t rdv, smx_process_t process)
150 {
151   rdv->permanent_receiver=process;
152 }
153
154 /**
155  *  \brief Pushes a communication action into a rendez-vous point
156  *  \param rdv The rendez-vous point
157  *  \param comm The communication action
158  */
159 static XBT_INLINE void SIMIX_rdv_push(smx_rdv_t rdv, smx_action_t comm)
160 {
161   xbt_fifo_push(rdv->comm_fifo, comm);
162   comm->comm.rdv = rdv;
163 }
164
165 /**
166  *  \brief Removes a communication action from a rendez-vous point
167  *  \param rdv The rendez-vous point
168  *  \param comm The communication action
169  */
170 XBT_INLINE void SIMIX_rdv_remove(smx_rdv_t rdv, smx_action_t comm)
171 {
172   xbt_fifo_remove(rdv->comm_fifo, comm);
173   comm->comm.rdv = NULL;
174 }
175
176 /**
177  *  \brief Checks if there is a communication action queued in a fifo matching our needs
178  *  \param type The type of communication we are looking for (comm_send, comm_recv)
179  *  \return The communication action if found, NULL otherwise
180  */
181 smx_action_t SIMIX_fifo_get_comm(xbt_fifo_t fifo, e_smx_comm_type_t type,
182                                  int (*match_fun)(void *, void *,smx_action_t),
183                                  void *this_user_data, smx_action_t my_action)
184 {
185   smx_action_t action;
186   xbt_fifo_item_t item;
187   void* other_user_data = NULL;
188
189   xbt_fifo_foreach(fifo, item, action, smx_action_t) {
190     if (action->comm.type == SIMIX_COMM_SEND) {
191       other_user_data = action->comm.src_data;
192     } else if (action->comm.type == SIMIX_COMM_RECEIVE) {
193       other_user_data = action->comm.dst_data;
194     }
195     if (action->comm.type == type &&
196         (!match_fun              ||              match_fun(this_user_data,  other_user_data, action)) &&
197         (!action->comm.match_fun || action->comm.match_fun(other_user_data, this_user_data,  my_action))) {
198       XBT_DEBUG("Found a matching communication action %p", action);
199       xbt_fifo_remove_item(fifo, item);
200       xbt_fifo_free_item(item);
201       action->comm.refcount++;
202       action->comm.rdv = NULL;
203       return action;
204     }
205     XBT_DEBUG("Sorry, communication action %p does not match our needs:"
206               " its type is %d but we are looking for a comm of type %d (or maybe the filtering didn't match)",
207               action, (int)action->comm.type, (int)type);
208   }
209   XBT_DEBUG("No matching communication action found");
210   return NULL;
211 }
212
213
214 /**
215  *  \brief Checks if there is a communication action queued in a fifo matching our needs, but leave it there
216  *  \param type The type of communication we are looking for (comm_send, comm_recv)
217  *  \return The communication action if found, NULL otherwise
218  */
219 smx_action_t SIMIX_fifo_probe_comm(xbt_fifo_t fifo, e_smx_comm_type_t type,
220                                  int (*match_fun)(void *, void *,smx_action_t),
221                                  void *this_user_data, smx_action_t my_action)
222 {
223   smx_action_t action;
224   xbt_fifo_item_t item;
225   void* other_user_data = NULL;
226
227   xbt_fifo_foreach(fifo, item, action, smx_action_t) {
228     if (action->comm.type == SIMIX_COMM_SEND) {
229       other_user_data = action->comm.src_data;
230     } else if (action->comm.type == SIMIX_COMM_RECEIVE) {
231       other_user_data = action->comm.dst_data;
232     }
233     if (action->comm.type == type &&
234         (!match_fun              ||              match_fun(this_user_data,  other_user_data, action)) &&
235         (!action->comm.match_fun || action->comm.match_fun(other_user_data, this_user_data,  my_action))) {
236       XBT_DEBUG("Found a matching communication action %p", action);
237       action->comm.refcount++;
238
239       return action;
240     }
241     XBT_DEBUG("Sorry, communication action %p does not match our needs:"
242               " its type is %d but we are looking for a comm of type %d (or maybe the filtering didn't match)",
243               action, (int)action->comm.type, (int)type);
244   }
245   XBT_DEBUG("No matching communication action found");
246   return NULL;
247 }
248 /******************************************************************************/
249 /*                            Communication Actions                            */
250 /******************************************************************************/
251
252 /**
253  *  \brief Creates a new communicate action
254  *  \param type The direction of communication (comm_send, comm_recv)
255  *  \return The new communicate action
256  */
257 smx_action_t SIMIX_comm_new(e_smx_comm_type_t type)
258 {
259   smx_action_t act;
260
261   /* alloc structures */
262   act = xbt_mallocator_get(simix_global->action_mallocator);
263
264   act->type = SIMIX_ACTION_COMMUNICATE;
265   act->state = SIMIX_WAITING;
266
267   /* set communication */
268   act->comm.type = type;
269   act->comm.refcount = 1;
270   act->comm.src_data=NULL;
271   act->comm.dst_data=NULL;
272
273
274 #ifdef HAVE_LATENCY_BOUND_TRACKING
275   //initialize with unknown value
276   act->latency_limited = -1;
277 #endif
278
279 #ifdef HAVE_TRACING
280   act->category = NULL;
281 #endif
282
283   XBT_DEBUG("Create communicate action %p", act);
284   ++smx_total_comms;
285
286   return act;
287 }
288
289 void SIMIX_pre_comm_destroy(smx_simcall_t simcall, smx_action_t action){
290   SIMIX_comm_destroy(action);
291 }
292 /**
293  *  \brief Destroy a communicate action
294  *  \param action The communicate action to be destroyed
295  */
296 void SIMIX_comm_destroy(smx_action_t action)
297 {
298   XBT_DEBUG("Destroy action %p (refcount: %d), state: %d",
299             action, action->comm.refcount, (int)action->state);
300
301   if (action->comm.refcount <= 0) {
302     xbt_backtrace_display_current();
303     xbt_die("The refcount of comm %p is already 0 before decreasing it. "
304             "That's a bug! If you didn't test and/or wait the same communication twice in your code, then the bug is SimGrid's...", action);
305   }
306   action->comm.refcount--;
307   if (action->comm.refcount > 0)
308       return;
309   XBT_DEBUG("Really free communication %p; refcount is now %d", action,
310             action->comm.refcount);
311
312 #ifdef HAVE_LATENCY_BOUND_TRACKING
313   action->latency_limited = SIMIX_comm_is_latency_bounded( action ) ;
314 #endif
315
316   xbt_free(action->name);
317   SIMIX_comm_destroy_internal_actions(action);
318
319   if (action->comm.detached && action->state != SIMIX_DONE) {
320     /* the communication has failed and was detached:
321      * we have to free the buffer */
322     if (action->comm.clean_fun) {
323       action->comm.clean_fun(action->comm.src_buff);
324     }
325     action->comm.src_buff = NULL;
326   }
327
328   if(action->comm.rdv)
329     SIMIX_rdv_remove(action->comm.rdv, action);
330
331   xbt_mallocator_release(simix_global->action_mallocator, action);
332 }
333
334 void SIMIX_comm_destroy_internal_actions(smx_action_t action)
335 {
336   if (action->comm.surf_comm){
337 #ifdef HAVE_LATENCY_BOUND_TRACKING
338     action->latency_limited = SIMIX_comm_is_latency_bounded(action);
339 #endif
340     action->comm.surf_comm->model_type->action_unref(action->comm.surf_comm);
341     action->comm.surf_comm = NULL;
342   }
343
344   if (action->comm.src_timeout){
345     action->comm.src_timeout->model_type->action_unref(action->comm.src_timeout);
346     action->comm.src_timeout = NULL;
347   }
348
349   if (action->comm.dst_timeout){
350     action->comm.dst_timeout->model_type->action_unref(action->comm.dst_timeout);
351     action->comm.dst_timeout = NULL;
352   }
353 }
354
355 void SIMIX_pre_comm_send(smx_simcall_t simcall, smx_rdv_t rdv,
356                                   double task_size, double rate,
357                                   void *src_buff, size_t src_buff_size,
358                                   int (*match_fun)(void *, void *,smx_action_t),
359                                   void *data, double timeout){
360   smx_action_t comm = SIMIX_comm_isend(simcall->issuer, rdv, task_size, rate,
361                                        src_buff, src_buff_size, match_fun, NULL,
362                                        data, 0);
363   simcall->mc_value = 0;
364   SIMIX_pre_comm_wait(simcall, comm, timeout);
365 }
366 smx_action_t SIMIX_pre_comm_isend(smx_simcall_t simcall, smx_rdv_t rdv,
367                                   double task_size, double rate,
368                                   void *src_buff, size_t src_buff_size,
369                                   int (*match_fun)(void *, void *,smx_action_t),
370                                   void (*clean_fun)(void *), 
371                                   void *data, int detached){
372   return SIMIX_comm_isend(simcall->issuer, rdv, task_size, rate, src_buff,
373                           src_buff_size, match_fun, clean_fun, data, detached);
374
375 }
376 smx_action_t SIMIX_comm_isend(smx_process_t src_proc, smx_rdv_t rdv,
377                               double task_size, double rate,
378                               void *src_buff, size_t src_buff_size,
379                               int (*match_fun)(void *, void *,smx_action_t),
380                               void (*clean_fun)(void *), // used to free the action in case of problem after a detached send
381                               void *data,
382                               int detached)
383 {
384   XBT_DEBUG("send from %p\n", rdv);
385
386   /* Prepare an action describing us, so that it gets passed to the user-provided filter of other side */
387   smx_action_t this_action = SIMIX_comm_new(SIMIX_COMM_SEND);
388
389   /* Look for communication action matching our needs. We also provide a description of
390    * ourself so that the other side also gets a chance of choosing if it wants to match with us.
391    *
392    * If it is not found then push our communication into the rendez-vous point */
393   smx_action_t other_action = SIMIX_fifo_get_comm(rdv->comm_fifo, SIMIX_COMM_RECEIVE, match_fun, data, this_action);
394
395   if (!other_action) {
396     other_action = this_action;
397
398     if (rdv->permanent_receiver!=NULL){
399       //this mailbox is for small messages, which have to be sent right now
400       other_action->state = SIMIX_READY;
401       other_action->comm.dst_proc=rdv->permanent_receiver;
402       other_action->comm.refcount++;
403       xbt_fifo_push(rdv->done_comm_fifo,other_action);
404       other_action->comm.rdv=rdv;
405       XBT_DEBUG("pushing a message into the permanent receive fifo %p, comm %p \n", rdv, &(other_action->comm));
406
407     }else{
408       SIMIX_rdv_push(rdv, this_action);
409     }
410   } else {
411     XBT_DEBUG("Receive already pushed\n");
412
413     SIMIX_comm_destroy(this_action);
414     --smx_total_comms; // this creation was a pure waste
415
416     other_action->state = SIMIX_READY;
417     other_action->comm.type = SIMIX_COMM_READY;
418
419   }
420   xbt_fifo_push(src_proc->comms, other_action);
421
422   /* if the communication action is detached then decrease the refcount
423    * by one, so it will be eliminated by the receiver's destroy call */
424   if (detached) {
425     other_action->comm.detached = 1;
426     other_action->comm.refcount--;
427     other_action->comm.clean_fun = clean_fun;
428   } else {
429     other_action->comm.clean_fun = NULL;
430   }
431
432   /* Setup the communication action */
433   other_action->comm.src_proc = src_proc;
434   other_action->comm.task_size = task_size;
435   other_action->comm.rate = rate;
436   other_action->comm.src_buff = src_buff;
437   other_action->comm.src_buff_size = src_buff_size;
438   other_action->comm.src_data = data;
439
440   other_action->comm.match_fun = match_fun;
441
442   if (MC_is_active()) {
443     other_action->state = SIMIX_RUNNING;
444     return other_action;
445   }
446
447   SIMIX_comm_start(other_action);
448   return (detached ? NULL : other_action);
449 }
450
451 void SIMIX_pre_comm_recv(smx_simcall_t simcall, smx_rdv_t rdv,
452                                   void *dst_buff, size_t *dst_buff_size,
453                                   int (*match_fun)(void *, void *, smx_action_t),
454                                   void *data, double timeout){
455   smx_action_t comm = SIMIX_comm_irecv(simcall->issuer, rdv, dst_buff,
456                                        dst_buff_size, match_fun, data);
457   simcall->mc_value = 0;
458   SIMIX_pre_comm_wait(simcall, comm, timeout);
459 }
460
461 void SIMIX_pre_comm_recv_bounded(smx_simcall_t simcall, smx_rdv_t rdv,
462                                   void *dst_buff, size_t *dst_buff_size,
463                                   int (*match_fun)(void *, void *, smx_action_t),
464                                   void *data, double timeout, double rate){
465   smx_action_t comm = SIMIX_comm_irecv_bounded(simcall->issuer, rdv, dst_buff,
466                                        dst_buff_size, match_fun, data, rate);
467   simcall->mc_value = 0;
468   SIMIX_pre_comm_wait(simcall, comm, timeout);
469 }
470
471 smx_action_t SIMIX_pre_comm_irecv(smx_simcall_t simcall, smx_rdv_t rdv,
472                                   void *dst_buff, size_t *dst_buff_size,
473                                   int (*match_fun)(void *, void *, smx_action_t),
474                                   void *data){
475   return SIMIX_comm_irecv(simcall->issuer, rdv, dst_buff, dst_buff_size,
476                           match_fun, data);
477 }
478
479 smx_action_t SIMIX_comm_irecv(smx_process_t dst_proc, smx_rdv_t rdv,
480                               void *dst_buff, size_t *dst_buff_size,
481                               int (*match_fun)(void *, void *, smx_action_t), void *data)
482 {
483   XBT_DEBUG("recv from %p %p\n", rdv, rdv->comm_fifo);
484   smx_action_t this_action = SIMIX_comm_new(SIMIX_COMM_RECEIVE);
485
486   smx_action_t other_action;
487   //communication already done, get it inside the fifo of completed comms
488   //permanent receive v1
489   //int already_received=0;
490   if(rdv->permanent_receiver && xbt_fifo_size(rdv->done_comm_fifo)!=0){
491
492     XBT_DEBUG("We have a comm that has probably already been received, trying to match it, to skip the communication\n");
493     //find a match in the already received fifo
494     other_action = SIMIX_fifo_get_comm(rdv->done_comm_fifo, SIMIX_COMM_SEND, match_fun, data, this_action);
495     //if not found, assume the receiver came first, register it to the mailbox in the classical way
496     if (!other_action)  {
497       XBT_DEBUG("We have messages in the permanent receive list, but not the one we are looking for, pushing request into fifo\n");
498       other_action = this_action;
499       SIMIX_rdv_push(rdv, this_action);
500     }else{
501       if(other_action->comm.surf_comm &&        SIMIX_comm_get_remains(other_action)==0.0)
502       {
503         XBT_DEBUG("comm %p has been already sent, and is finished, destroy it\n",&(other_action->comm));
504         other_action->state = SIMIX_DONE;
505         other_action->comm.type = SIMIX_COMM_DONE;
506         other_action->comm.rdv = NULL;
507         //SIMIX_comm_destroy(this_action);
508         //--smx_total_comms; // this creation was a pure waste
509         //already_received=1;
510         //other_action->comm.refcount--;
511       }/*else{
512          XBT_DEBUG("Not yet finished, we have to wait %d\n", xbt_fifo_size(rdv->comm_fifo));
513          }*/
514       other_action->comm.refcount--;
515       SIMIX_comm_destroy(this_action);
516       --smx_total_comms; // this creation was a pure waste
517     }
518   }else{
519     /* Prepare an action describing us, so that it gets passed to the user-provided filter of other side */
520
521     /* Look for communication action matching our needs. We also provide a description of
522      * ourself so that the other side also gets a chance of choosing if it wants to match with us.
523      *
524      * If it is not found then push our communication into the rendez-vous point */
525     other_action = SIMIX_fifo_get_comm(rdv->comm_fifo, SIMIX_COMM_SEND, match_fun, data, this_action);
526
527     if (!other_action) {
528       XBT_DEBUG("Receive pushed first %d\n", xbt_fifo_size(rdv->comm_fifo));
529       other_action = this_action;
530       SIMIX_rdv_push(rdv, this_action);
531     } else {
532       SIMIX_comm_destroy(this_action);
533       --smx_total_comms; // this creation was a pure waste
534       other_action->state = SIMIX_READY;
535       other_action->comm.type = SIMIX_COMM_READY;
536       //other_action->comm.refcount--;
537     }
538     xbt_fifo_push(dst_proc->comms, other_action);
539   }
540
541   /* Setup communication action */
542   other_action->comm.dst_proc = dst_proc;
543   other_action->comm.dst_buff = dst_buff;
544   other_action->comm.dst_buff_size = dst_buff_size;
545   other_action->comm.dst_data = data;
546
547   other_action->comm.match_fun = match_fun;
548
549
550   /*if(already_received)//do the actual copy, because the first one after the comm didn't have all the info
551     SIMIX_comm_copy_data(other_action);*/
552
553
554   if (MC_is_active()) {
555     other_action->state = SIMIX_RUNNING;
556     return other_action;
557   }
558
559   SIMIX_comm_start(other_action);
560   // }
561   return other_action;
562 }
563
564 smx_action_t SIMIX_pre_comm_irecv_bounded(smx_simcall_t simcall, smx_rdv_t rdv,
565                                   void *dst_buff, size_t *dst_buff_size,
566                                   int (*match_fun)(void *, void *, smx_action_t),
567                                   void *data, double rate){
568   return SIMIX_comm_irecv_bounded(simcall->issuer, rdv, dst_buff, dst_buff_size,
569                           match_fun, data, rate);
570 }
571
572 smx_action_t SIMIX_comm_irecv_bounded(smx_process_t dst_proc, smx_rdv_t rdv,
573                               void *dst_buff, size_t *dst_buff_size,
574                               int (*match_fun)(void *, void *, smx_action_t), void *data, double rate)
575 {
576   XBT_DEBUG("recv from %p %p\n", rdv, rdv->comm_fifo);
577   smx_action_t this_action = SIMIX_comm_new(SIMIX_COMM_RECEIVE);
578
579   smx_action_t other_action;
580   //communication already done, get it inside the fifo of completed comms
581   //permanent receive v1
582   //int already_received=0;
583   if(rdv->permanent_receiver && xbt_fifo_size(rdv->done_comm_fifo)!=0){
584
585     XBT_DEBUG("We have a comm that has probably already been received, trying to match it, to skip the communication\n");
586     //find a match in the already received fifo
587     other_action = SIMIX_fifo_get_comm(rdv->done_comm_fifo, SIMIX_COMM_SEND, match_fun, data, this_action);
588     //if not found, assume the receiver came first, register it to the mailbox in the classical way
589     if (!other_action)  {
590       XBT_DEBUG("We have messages in the permanent receive list, but not the one we are looking for, pushing request into fifo\n");
591       other_action = this_action;
592       SIMIX_rdv_push(rdv, this_action);
593     }else{
594       if(other_action->comm.surf_comm &&        SIMIX_comm_get_remains(other_action)==0.0)
595       {
596         XBT_DEBUG("comm %p has been already sent, and is finished, destroy it\n",&(other_action->comm));
597         other_action->state = SIMIX_DONE;
598         other_action->comm.type = SIMIX_COMM_DONE;
599         other_action->comm.rdv = NULL;
600         //SIMIX_comm_destroy(this_action);
601         //--smx_total_comms; // this creation was a pure waste
602         //already_received=1;
603         //other_action->comm.refcount--;
604       }/*else{
605          XBT_DEBUG("Not yet finished, we have to wait %d\n", xbt_fifo_size(rdv->comm_fifo));
606          }*/
607       other_action->comm.refcount--;
608       SIMIX_comm_destroy(this_action);
609       --smx_total_comms; // this creation was a pure waste
610     }
611   }else{
612     /* Prepare an action describing us, so that it gets passed to the user-provided filter of other side */
613
614     /* Look for communication action matching our needs. We also provide a description of
615      * ourself so that the other side also gets a chance of choosing if it wants to match with us.
616      *
617      * If it is not found then push our communication into the rendez-vous point */
618     other_action = SIMIX_fifo_get_comm(rdv->comm_fifo, SIMIX_COMM_SEND, match_fun, data, this_action);
619
620     if (!other_action) {
621       XBT_DEBUG("Receive pushed first %d\n", xbt_fifo_size(rdv->comm_fifo));
622       other_action = this_action;
623       SIMIX_rdv_push(rdv, this_action);
624     } else {
625       SIMIX_comm_destroy(this_action);
626       --smx_total_comms; // this creation was a pure waste
627       other_action->state = SIMIX_READY;
628       other_action->comm.type = SIMIX_COMM_READY;
629       //other_action->comm.refcount--;
630     }
631     xbt_fifo_push(dst_proc->comms, other_action);
632   }
633
634   /* Setup communication action */
635   other_action->comm.dst_proc = dst_proc;
636   other_action->comm.dst_buff = dst_buff;
637   other_action->comm.dst_buff_size = dst_buff_size;
638   other_action->comm.dst_data = data;
639
640   if (rate < other_action->comm.rate || other_action->comm.rate == -1.0)
641           other_action->comm.rate = rate;
642
643   other_action->comm.match_fun = match_fun;
644
645
646   /*if(already_received)//do the actual copy, because the first one after the comm didn't have all the info
647     SIMIX_comm_copy_data(other_action);*/
648
649
650   if (MC_is_active()) {
651     other_action->state = SIMIX_RUNNING;
652     return other_action;
653   }
654
655   SIMIX_comm_start(other_action);
656   // }
657   return other_action;
658 }
659
660 smx_action_t SIMIX_pre_comm_iprobe(smx_simcall_t simcall, smx_rdv_t rdv,
661                                    int src, int tag,
662                                    int (*match_fun)(void *, void *, smx_action_t),
663                                    void *data){
664   return SIMIX_comm_iprobe(simcall->issuer, rdv, src, tag, match_fun, data);
665 }
666
667 smx_action_t SIMIX_comm_iprobe(smx_process_t dst_proc, smx_rdv_t rdv, int src,
668                               int tag, int (*match_fun)(void *, void *, smx_action_t), void *data)
669 {
670   XBT_DEBUG("iprobe from %p %p\n", rdv, rdv->comm_fifo);
671   smx_action_t this_action = SIMIX_comm_new(SIMIX_COMM_RECEIVE);
672
673   smx_action_t other_action=NULL;
674   if(rdv->permanent_receiver && xbt_fifo_size(rdv->done_comm_fifo)!=0){
675     //find a match in the already received fifo
676       XBT_DEBUG("first try in the perm recv mailbox \n");
677
678     other_action = SIMIX_fifo_probe_comm(rdv->done_comm_fifo, SIMIX_COMM_SEND, match_fun, data, this_action);
679   }
680  // }else{
681     if(!other_action){
682         XBT_DEBUG("second try in the other mailbox");
683         other_action = SIMIX_fifo_probe_comm(rdv->comm_fifo, SIMIX_COMM_SEND, match_fun, data, this_action);
684     }
685 //  }
686   if(other_action)other_action->comm.refcount--;
687
688   SIMIX_comm_destroy(this_action);
689   --smx_total_comms;
690   return other_action;
691 }
692
693 void SIMIX_pre_comm_wait(smx_simcall_t simcall, smx_action_t action, double timeout)
694 {
695   int idx = simcall->mc_value;
696   /* the simcall may be a wait, a send or a recv */
697   surf_action_t sleep;
698
699   /* Associate this simcall to the wait action */
700   XBT_DEBUG("SIMIX_pre_comm_wait, %p", action);
701
702   xbt_fifo_push(action->simcalls, simcall);
703   simcall->issuer->waiting_action = action;
704
705   if (MC_is_active()) {
706     if (idx == 0) {
707       action->state = SIMIX_DONE;
708     } else {
709       /* If we reached this point, the wait simcall must have a timeout */
710       /* Otherwise it shouldn't be enabled and executed by the MC */
711       if (timeout == -1)
712         THROW_IMPOSSIBLE;
713
714       if (action->comm.src_proc == simcall->issuer)
715         action->state = SIMIX_SRC_TIMEOUT;
716       else
717         action->state = SIMIX_DST_TIMEOUT;
718     }
719
720     SIMIX_comm_finish(action);
721     return;
722   }
723
724   /* If the action has already finish perform the error handling, */
725   /* otherwise set up a waiting timeout on the right side         */
726   if (action->state != SIMIX_WAITING && action->state != SIMIX_RUNNING) {
727     SIMIX_comm_finish(action);
728   } else { /* if (timeout >= 0) { we need a surf sleep action even when there is no timeout, otherwise surf won't tell us when the host fails */
729     sleep = surf_workstation_model->extension.workstation.sleep(simcall->issuer->smx_host, timeout);
730     surf_workstation_model->action_data_set(sleep, action);
731
732     if (simcall->issuer == action->comm.src_proc)
733       action->comm.src_timeout = sleep;
734     else
735       action->comm.dst_timeout = sleep;
736   }
737 }
738
739 void SIMIX_pre_comm_test(smx_simcall_t simcall, smx_action_t action)
740 {
741   if(MC_is_active()){
742     simcall_comm_test__set__result(simcall, action->comm.src_proc && action->comm.dst_proc);
743     if(simcall_comm_test__get__result(simcall)){
744       action->state = SIMIX_DONE;
745       xbt_fifo_push(action->simcalls, simcall);
746       SIMIX_comm_finish(action);
747     }else{
748       SIMIX_simcall_answer(simcall);
749     }
750     return;
751   }
752
753   simcall_comm_test__set__result(simcall, (action->state != SIMIX_WAITING && action->state != SIMIX_RUNNING));
754   if (simcall_comm_test__get__result(simcall)) {
755     xbt_fifo_push(action->simcalls, simcall);
756     SIMIX_comm_finish(action);
757   } else {
758     SIMIX_simcall_answer(simcall);
759   }
760 }
761
762 void SIMIX_pre_comm_testany(smx_simcall_t simcall, xbt_dynar_t actions)
763 {
764   int idx = simcall->mc_value;
765   unsigned int cursor;
766   smx_action_t action;
767   simcall_comm_testany__set__result(simcall, -1);
768
769   if (MC_is_active()){
770     if(idx == -1){
771       SIMIX_simcall_answer(simcall);
772     }else{
773       action = xbt_dynar_get_as(actions, idx, smx_action_t);
774       simcall_comm_testany__set__result(simcall, idx);
775       xbt_fifo_push(action->simcalls, simcall);
776       action->state = SIMIX_DONE;
777       SIMIX_comm_finish(action);
778     }
779     return;
780   }
781
782   xbt_dynar_foreach(simcall_comm_testany__get__comms(simcall), cursor,action) {
783     if (action->state != SIMIX_WAITING && action->state != SIMIX_RUNNING) {
784       simcall_comm_testany__set__result(simcall, cursor);
785       xbt_fifo_push(action->simcalls, simcall);
786       SIMIX_comm_finish(action);
787       return;
788     }
789   }
790   SIMIX_simcall_answer(simcall);
791 }
792
793 void SIMIX_pre_comm_waitany(smx_simcall_t simcall, xbt_dynar_t actions)
794 {
795   int idx = simcall->mc_value;
796   smx_action_t action;
797   unsigned int cursor = 0;
798
799   if (MC_is_active()){
800     action = xbt_dynar_get_as(actions, idx, smx_action_t);
801     xbt_fifo_push(action->simcalls, simcall);
802     simcall_comm_waitany__set__result(simcall, idx);
803     action->state = SIMIX_DONE;
804     SIMIX_comm_finish(action);
805     return;
806   }
807
808   xbt_dynar_foreach(actions, cursor, action){
809     /* associate this simcall to the the action */
810     xbt_fifo_push(action->simcalls, simcall);
811
812     /* see if the action is already finished */
813     if (action->state != SIMIX_WAITING && action->state != SIMIX_RUNNING){
814       SIMIX_comm_finish(action);
815       break;
816     }
817   }
818 }
819
820 void SIMIX_waitany_remove_simcall_from_actions(smx_simcall_t simcall)
821 {
822   smx_action_t action;
823   unsigned int cursor = 0;
824   xbt_dynar_t actions = simcall_comm_waitany__get__comms(simcall);
825
826   xbt_dynar_foreach(actions, cursor, action) {
827     xbt_fifo_remove(action->simcalls, simcall);
828   }
829 }
830
831 /**
832  *  \brief Starts the simulation of a communication action.
833  *  \param action the communication action
834  */
835 XBT_INLINE void SIMIX_comm_start(smx_action_t action)
836 {
837   /* If both the sender and the receiver are already there, start the communication */
838   if (action->state == SIMIX_READY) {
839
840     smx_host_t sender = action->comm.src_proc->smx_host;
841     smx_host_t receiver = action->comm.dst_proc->smx_host;
842
843     XBT_DEBUG("Starting communication %p from '%s' to '%s'", action,
844               SIMIX_host_get_name(sender), SIMIX_host_get_name(receiver));
845
846     action->comm.surf_comm = surf_workstation_model->extension.workstation.
847       communicate(sender, receiver, action->comm.task_size, action->comm.rate);
848
849     surf_workstation_model->action_data_set(action->comm.surf_comm, action);
850
851     action->state = SIMIX_RUNNING;
852
853     /* If a link is failed, detect it immediately */
854     if (surf_workstation_model->action_state_get(action->comm.surf_comm) == SURF_ACTION_FAILED) {
855       XBT_DEBUG("Communication from '%s' to '%s' failed to start because of a link failure",
856                 SIMIX_host_get_name(sender), SIMIX_host_get_name(receiver));
857       action->state = SIMIX_LINK_FAILURE;
858       SIMIX_comm_destroy_internal_actions(action);
859     }
860
861     /* If any of the process is suspend, create the action but stop its execution,
862        it will be restarted when the sender process resume */
863     if (SIMIX_process_is_suspended(action->comm.src_proc) ||
864         SIMIX_process_is_suspended(action->comm.dst_proc)) {
865       /* FIXME: check what should happen with the action state */
866
867       if (SIMIX_process_is_suspended(action->comm.src_proc))
868         XBT_DEBUG("The communication is suspended on startup because src (%s:%s) were suspended since it initiated the communication",
869                   SIMIX_host_get_name(action->comm.src_proc->smx_host), action->comm.src_proc->name);
870       else
871         XBT_DEBUG("The communication is suspended on startup because dst (%s:%s) were suspended since it initiated the communication",
872                   SIMIX_host_get_name(action->comm.dst_proc->smx_host), action->comm.dst_proc->name);
873
874       surf_workstation_model->suspend(action->comm.surf_comm);
875
876     }
877   }
878 }
879
880 /**
881  * \brief Answers the SIMIX simcalls associated to a communication action.
882  * \param action a finished communication action
883  */
884 void SIMIX_comm_finish(smx_action_t action)
885 {
886   unsigned int destroy_count = 0;
887   smx_simcall_t simcall;
888
889   while ((simcall = xbt_fifo_shift(action->simcalls))) {
890
891     /* If a waitany simcall is waiting for this action to finish, then remove
892        it from the other actions in the waitany list. Afterwards, get the
893        position of the actual action in the waitany dynar and
894        return it as the result of the simcall */
895     if (simcall->call == SIMCALL_COMM_WAITANY) {
896       SIMIX_waitany_remove_simcall_from_actions(simcall);
897       if (!MC_is_active())
898         simcall_comm_waitany__set__result(simcall, xbt_dynar_search(simcall_comm_waitany__get__comms(simcall), &action));
899     }
900
901     /* If the action is still in a rendez-vous point then remove from it */
902     if (action->comm.rdv)
903       SIMIX_rdv_remove(action->comm.rdv, action);
904
905     XBT_DEBUG("SIMIX_comm_finish: action state = %d", (int)action->state);
906
907     /* Check out for errors */
908     switch (action->state) {
909
910     case SIMIX_DONE:
911       XBT_DEBUG("Communication %p complete!", action);
912       SIMIX_comm_copy_data(action);
913       break;
914
915     case SIMIX_SRC_TIMEOUT:
916       SMX_EXCEPTION(simcall->issuer, timeout_error, 0,
917                     "Communication timeouted because of sender");
918       break;
919
920     case SIMIX_DST_TIMEOUT:
921       SMX_EXCEPTION(simcall->issuer, timeout_error, 0,
922                     "Communication timeouted because of receiver");
923       break;
924
925     case SIMIX_SRC_HOST_FAILURE:
926       if (simcall->issuer == action->comm.src_proc)
927         simcall->issuer->context->iwannadie = 1;
928 //          SMX_EXCEPTION(simcall->issuer, host_error, 0, "Host failed");
929       else
930         SMX_EXCEPTION(simcall->issuer, network_error, 0, "Remote peer failed");
931       break;
932
933     case SIMIX_DST_HOST_FAILURE:
934       if (simcall->issuer == action->comm.dst_proc)
935         simcall->issuer->context->iwannadie = 1;
936 //          SMX_EXCEPTION(simcall->issuer, host_error, 0, "Host failed");
937       else
938         SMX_EXCEPTION(simcall->issuer, network_error, 0, "Remote peer failed");
939       break;
940
941     case SIMIX_LINK_FAILURE:
942       XBT_DEBUG("Link failure in action %p between '%s' and '%s': posting an exception to the issuer: %s (%p) detached:%d",
943                 action,
944                 action->comm.src_proc ? sg_host_name(action->comm.src_proc->smx_host) : NULL,
945                 action->comm.dst_proc ? sg_host_name(action->comm.dst_proc->smx_host) : NULL,
946                 simcall->issuer->name, simcall->issuer, action->comm.detached);
947       if (action->comm.src_proc == simcall->issuer) {
948         XBT_DEBUG("I'm source");
949       } else if (action->comm.dst_proc == simcall->issuer) {
950         XBT_DEBUG("I'm dest");
951       } else {
952         XBT_DEBUG("I'm neither source nor dest");
953       }
954       SMX_EXCEPTION(simcall->issuer, network_error, 0, "Link failure");
955       break;
956
957     case SIMIX_CANCELED:
958       if (simcall->issuer == action->comm.dst_proc)
959         SMX_EXCEPTION(simcall->issuer, cancel_error, 0,
960                       "Communication canceled by the sender");
961       else
962         SMX_EXCEPTION(simcall->issuer, cancel_error, 0,
963                       "Communication canceled by the receiver");
964       break;
965
966     default:
967       xbt_die("Unexpected action state in SIMIX_comm_finish: %d", (int)action->state);
968     }
969
970     /* if there is an exception during a waitany or a testany, indicate the position of the failed communication */
971     if (simcall->issuer->doexception) {
972       if (simcall->call == SIMCALL_COMM_WAITANY) {
973         simcall->issuer->running_ctx->exception.value = xbt_dynar_search(simcall_comm_waitany__get__comms(simcall), &action);
974       }
975       else if (simcall->call == SIMCALL_COMM_TESTANY) {
976         simcall->issuer->running_ctx->exception.value = xbt_dynar_search(simcall_comm_testany__get__comms(simcall), &action);
977       }
978     }
979
980     if (surf_workstation_model->extension.
981         workstation.get_state(simcall->issuer->smx_host) != SURF_RESOURCE_ON) {
982       simcall->issuer->context->iwannadie = 1;
983     }
984
985     simcall->issuer->waiting_action = NULL;
986     xbt_fifo_remove(simcall->issuer->comms, action);
987     if(action->comm.detached){
988       if(simcall->issuer == action->comm.src_proc){
989         if(action->comm.dst_proc)
990           xbt_fifo_remove(action->comm.dst_proc->comms, action);
991       }
992       if(simcall->issuer == action->comm.dst_proc){
993         if(action->comm.src_proc)
994           xbt_fifo_remove(action->comm.src_proc->comms, action);
995       }
996     }
997     SIMIX_simcall_answer(simcall);
998     destroy_count++;
999   }
1000
1001   while (destroy_count-- > 0)
1002     SIMIX_comm_destroy(action);
1003 }
1004
1005 /**
1006  * \brief This function is called when a Surf communication action is finished.
1007  * \param action the corresponding Simix communication
1008  */
1009 void SIMIX_post_comm(smx_action_t action)
1010 {
1011   /* Update action state */
1012   if (action->comm.src_timeout &&
1013       surf_workstation_model->action_state_get(action->comm.src_timeout) == SURF_ACTION_DONE)
1014     action->state = SIMIX_SRC_TIMEOUT;
1015   else if (action->comm.dst_timeout &&
1016            surf_workstation_model->action_state_get(action->comm.dst_timeout) == SURF_ACTION_DONE)
1017     action->state = SIMIX_DST_TIMEOUT;
1018   else if (action->comm.src_timeout &&
1019            surf_workstation_model->action_state_get(action->comm.src_timeout) == SURF_ACTION_FAILED)
1020     action->state = SIMIX_SRC_HOST_FAILURE;
1021   else if (action->comm.dst_timeout &&
1022            surf_workstation_model->action_state_get(action->comm.dst_timeout) == SURF_ACTION_FAILED)
1023     action->state = SIMIX_DST_HOST_FAILURE;
1024   else if (action->comm.surf_comm &&
1025            surf_workstation_model->action_state_get(action->comm.surf_comm) == SURF_ACTION_FAILED) {
1026     XBT_DEBUG("Puta madre. Surf says that the link broke");
1027     action->state = SIMIX_LINK_FAILURE;
1028   } else
1029     action->state = SIMIX_DONE;
1030
1031   XBT_DEBUG("SIMIX_post_comm: comm %p, state %d, src_proc %p, dst_proc %p, detached: %d",
1032             action, (int)action->state, action->comm.src_proc, action->comm.dst_proc, action->comm.detached);
1033
1034   /* destroy the surf actions associated with the Simix communication */
1035   SIMIX_comm_destroy_internal_actions(action);
1036
1037   /* remove the communication action from the list of pending communications
1038    * of both processes (if they still exist) */
1039   if (action->comm.src_proc) {
1040     xbt_fifo_remove(action->comm.src_proc->comms, action);
1041   }
1042   if (action->comm.dst_proc) {
1043     xbt_fifo_remove(action->comm.dst_proc->comms, action);
1044   }
1045
1046   /* if there are simcalls associated with the action, then answer them */
1047   if (xbt_fifo_size(action->simcalls)) {
1048     SIMIX_comm_finish(action);
1049   }
1050 }
1051
1052 void SIMIX_pre_comm_cancel(smx_simcall_t simcall, smx_action_t action){
1053   SIMIX_comm_cancel(action);
1054 }
1055 void SIMIX_comm_cancel(smx_action_t action)
1056 {
1057   /* if the action is a waiting state means that it is still in a rdv */
1058   /* so remove from it and delete it */
1059   if (action->state == SIMIX_WAITING) {
1060     SIMIX_rdv_remove(action->comm.rdv, action);
1061     action->state = SIMIX_CANCELED;
1062   }
1063   else if (!MC_is_active() /* when running the MC there are no surf actions */
1064            && (action->state == SIMIX_READY || action->state == SIMIX_RUNNING)) {
1065
1066     surf_workstation_model->action_cancel(action->comm.surf_comm);
1067   }
1068 }
1069
1070 void SIMIX_comm_suspend(smx_action_t action)
1071 {
1072   /*FIXME: shall we suspend also the timeout actions? */
1073   if (action->comm.surf_comm)
1074     surf_workstation_model->suspend(action->comm.surf_comm);
1075   /* in the other case, the action will be suspended on creation, in SIMIX_comm_start() */
1076 }
1077
1078 void SIMIX_comm_resume(smx_action_t action)
1079 {
1080   /*FIXME: check what happen with the timeouts */
1081   if (action->comm.surf_comm)
1082     surf_workstation_model->resume(action->comm.surf_comm);
1083   /* in the other case, the action were not really suspended yet, see SIMIX_comm_suspend() and SIMIX_comm_start() */
1084 }
1085
1086
1087 /************* Action Getters **************/
1088
1089 double SIMIX_pre_comm_get_remains(smx_simcall_t simcall, smx_action_t action){
1090   return SIMIX_comm_get_remains(action);
1091 }
1092 /**
1093  *  \brief get the amount remaining from the communication
1094  *  \param action The communication
1095  */
1096 double SIMIX_comm_get_remains(smx_action_t action)
1097 {
1098   double remains;
1099
1100   if(!action){
1101     return 0;
1102   }
1103
1104   switch (action->state) {
1105
1106   case SIMIX_RUNNING:
1107     remains = surf_workstation_model->get_remains(action->comm.surf_comm);
1108     break;
1109
1110   case SIMIX_WAITING:
1111   case SIMIX_READY:
1112     remains = 0; /*FIXME: check what should be returned */
1113     break;
1114
1115   default:
1116     remains = 0; /*FIXME: is this correct? */
1117     break;
1118   }
1119   return remains;
1120 }
1121
1122 e_smx_state_t SIMIX_pre_comm_get_state(smx_simcall_t simcall, smx_action_t action){
1123   return SIMIX_comm_get_state(action);
1124 }
1125 e_smx_state_t SIMIX_comm_get_state(smx_action_t action)
1126 {
1127   return action->state;
1128 }
1129
1130 void* SIMIX_pre_comm_get_src_data(smx_simcall_t simcall, smx_action_t action){
1131   return SIMIX_comm_get_src_data(action);
1132 }
1133 /**
1134  *  \brief Return the user data associated to the sender of the communication
1135  *  \param action The communication
1136  *  \return the user data
1137  */
1138 void* SIMIX_comm_get_src_data(smx_action_t action)
1139 {
1140   return action->comm.src_data;
1141 }
1142
1143 void* SIMIX_pre_comm_get_dst_data(smx_simcall_t simcall, smx_action_t action){
1144   return SIMIX_comm_get_dst_data(action);
1145 }
1146 /**
1147  *  \brief Return the user data associated to the receiver of the communication
1148  *  \param action The communication
1149  *  \return the user data
1150  */
1151 void* SIMIX_comm_get_dst_data(smx_action_t action)
1152 {
1153   return action->comm.dst_data;
1154 }
1155
1156 smx_process_t SIMIX_pre_comm_get_src_proc(smx_simcall_t simcall, smx_action_t action){
1157   return SIMIX_comm_get_src_proc(action);
1158 }
1159 smx_process_t SIMIX_comm_get_src_proc(smx_action_t action)
1160 {
1161   return action->comm.src_proc;
1162 }
1163
1164 smx_process_t SIMIX_pre_comm_get_dst_proc(smx_simcall_t simcall, smx_action_t action){
1165   return SIMIX_comm_get_dst_proc(action);
1166 }
1167 smx_process_t SIMIX_comm_get_dst_proc(smx_action_t action)
1168 {
1169   return action->comm.dst_proc;
1170 }
1171
1172 #ifdef HAVE_LATENCY_BOUND_TRACKING
1173 int SIMIX_pre_comm_is_latency_bounded(smx_simcall_t simcall, smx_action_t action)
1174 {
1175   return SIMIX_comm_is_latency_bounded(action);
1176 }
1177
1178 /**
1179  *  \brief verify if communication is latency bounded
1180  *  \param comm The communication
1181  */
1182 XBT_INLINE int SIMIX_comm_is_latency_bounded(smx_action_t action)
1183 {
1184   if(!action){
1185     return 0;
1186   }
1187   if (action->comm.surf_comm){
1188     XBT_DEBUG("Getting latency limited for surf_action (%p)", action->comm.surf_comm);
1189     action->latency_limited = surf_workstation_model->get_latency_limited(action->comm.surf_comm);
1190     XBT_DEBUG("Action limited is %d", action->latency_limited);
1191   }
1192   return action->latency_limited;
1193 }
1194 #endif
1195
1196 /******************************************************************************/
1197 /*                    SIMIX_comm_copy_data callbacks                       */
1198 /******************************************************************************/
1199 static void (*SIMIX_comm_copy_data_callback) (smx_action_t, void*, size_t) =
1200   &SIMIX_comm_copy_pointer_callback;
1201
1202 void
1203 SIMIX_comm_set_copy_data_callback(void (*callback) (smx_action_t, void*, size_t))
1204 {
1205   SIMIX_comm_copy_data_callback = callback;
1206 }
1207
1208 void SIMIX_comm_copy_pointer_callback(smx_action_t comm, void* buff, size_t buff_size)
1209 {
1210   xbt_assert((buff_size == sizeof(void *)),
1211              "Cannot copy %zu bytes: must be sizeof(void*)", buff_size);
1212   *(void **) (comm->comm.dst_buff) = buff;
1213 }
1214
1215 void SIMIX_comm_copy_buffer_callback(smx_action_t comm, void* buff, size_t buff_size)
1216 {
1217   XBT_DEBUG("Copy the data over");
1218   memcpy(comm->comm.dst_buff, buff, buff_size);
1219   if (comm->comm.detached) { // if this is a detached send, the source buffer was duplicated by SMPI sender to make the original buffer available to the application ASAP
1220     xbt_free(buff);
1221     comm->comm.src_buff = NULL;
1222   }
1223 }
1224
1225
1226 /**
1227  *  \brief Copy the communication data from the sender's buffer to the receiver's one
1228  *  \param comm The communication
1229  */
1230 void SIMIX_comm_copy_data(smx_action_t comm)
1231 {
1232   size_t buff_size = comm->comm.src_buff_size;
1233   /* If there is no data to be copy then return */
1234   if (!comm->comm.src_buff || !comm->comm.dst_buff || comm->comm.copied)
1235     return;
1236
1237   XBT_DEBUG("Copying comm %p data from %s (%p) -> %s (%p) (%zu bytes)",
1238             comm,
1239             comm->comm.src_proc ? sg_host_name(comm->comm.src_proc->smx_host) : "a finished process",
1240             comm->comm.src_buff,
1241             comm->comm.dst_proc ? sg_host_name(comm->comm.dst_proc->smx_host) : "a finished process",
1242             comm->comm.dst_buff, buff_size);
1243
1244   /* Copy at most dst_buff_size bytes of the message to receiver's buffer */
1245   if (comm->comm.dst_buff_size)
1246     buff_size = MIN(buff_size, *(comm->comm.dst_buff_size));
1247
1248   /* Update the receiver's buffer size to the copied amount */
1249   if (comm->comm.dst_buff_size)
1250     *comm->comm.dst_buff_size = buff_size;
1251
1252   if (buff_size > 0)
1253     SIMIX_comm_copy_data_callback (comm, comm->comm.src_buff, buff_size);
1254
1255   /* Set the copied flag so we copy data only once */
1256   /* (this function might be called from both communication ends) */
1257   comm->comm.copied = 1;
1258 }