Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
Merge branch 'mc' into mc-perf
[simgrid.git] / src / simix / smx_network.c
1 /* Copyright (c) 2009-2013. The SimGrid Team.
2  * All rights reserved.                                                     */
3
4 /* This program is free software; you can redistribute it and/or modify it
5  * under the terms of the license (GNU LGPL) which comes with this package. */
6
7 #include "smx_private.h"
8 #include "xbt/log.h"
9 #include "mc/mc.h"
10 #include "xbt/dict.h"
11
12 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(simix_network, simix,
13                                 "Logging specific to SIMIX (network)");
14
15 static xbt_dict_t rdv_points = NULL;
16 XBT_EXPORT_NO_IMPORT(unsigned long int) smx_total_comms = 0;
17
18 static void SIMIX_waitany_remove_simcall_from_actions(smx_simcall_t simcall);
19 static void SIMIX_comm_copy_data(smx_action_t comm);
20 static smx_action_t SIMIX_comm_new(e_smx_comm_type_t type);
21 static XBT_INLINE void SIMIX_rdv_push(smx_rdv_t rdv, smx_action_t comm);
22 static smx_action_t SIMIX_fifo_probe_comm(xbt_fifo_t fifo, e_smx_comm_type_t type,
23                                         int (*match_fun)(void *, void *,smx_action_t),
24                                         void *user_data, smx_action_t my_action);
25 static smx_action_t SIMIX_fifo_get_comm(xbt_fifo_t fifo, e_smx_comm_type_t type,
26                                         int (*match_fun)(void *, void *,smx_action_t),
27                                         void *user_data, smx_action_t my_action);
28 static void SIMIX_rdv_free(void *data);
29 static void SIMIX_comm_start(smx_action_t action);
30
31 void SIMIX_network_init(void)
32 {
33   rdv_points = xbt_dict_new_homogeneous(SIMIX_rdv_free);
34 }
35
36 void SIMIX_network_exit(void)
37 {
38   xbt_dict_free(&rdv_points);
39 }
40
41 /******************************************************************************/
42 /*                           Rendez-Vous Points                               */
43 /******************************************************************************/
44
45 smx_rdv_t SIMIX_pre_rdv_create(smx_simcall_t simcall, const char *name){
46   return SIMIX_rdv_create(name);
47 }
48 smx_rdv_t SIMIX_rdv_create(const char *name)
49 {
50   /* two processes may have pushed the same rdv_create simcall at the same time */
51   smx_rdv_t rdv = name ? xbt_dict_get_or_null(rdv_points, name) : NULL;
52
53   if (!rdv) {
54     rdv = xbt_new0(s_smx_rvpoint_t, 1);
55     rdv->name = name ? xbt_strdup(name) : NULL;
56     rdv->comm_fifo = xbt_fifo_new();
57     rdv->done_comm_fifo = xbt_fifo_new();
58     rdv->permanent_receiver=NULL;
59
60     XBT_DEBUG("Creating a mailbox at %p with name %s\n", rdv, name);
61
62     if (rdv->name)
63       xbt_dict_set(rdv_points, rdv->name, rdv, NULL);
64   }
65   return rdv;
66 }
67
68 void SIMIX_pre_rdv_destroy(smx_simcall_t simcall, smx_rdv_t rdv){
69   return SIMIX_rdv_destroy(rdv);
70 }
71 void SIMIX_rdv_destroy(smx_rdv_t rdv)
72 {
73   if (rdv->name)
74     xbt_dict_remove(rdv_points, rdv->name);
75 }
76
77 void SIMIX_rdv_free(void *data)
78 {
79   XBT_DEBUG("rdv free %p", data);
80   smx_rdv_t rdv = (smx_rdv_t) data;
81   xbt_free(rdv->name);
82   xbt_fifo_free(rdv->comm_fifo);
83   xbt_fifo_free(rdv->done_comm_fifo);
84
85   xbt_free(rdv);  
86 }
87
88 xbt_dict_t SIMIX_get_rdv_points()
89 {
90   return rdv_points;
91 }
92
93 smx_rdv_t SIMIX_pre_rdv_get_by_name(smx_simcall_t simcall, const char *name){
94   return SIMIX_rdv_get_by_name(name);
95 }
96 smx_rdv_t SIMIX_rdv_get_by_name(const char *name)
97 {
98   return xbt_dict_get_or_null(rdv_points, name);
99 }
100
101 int SIMIX_pre_rdv_comm_count_by_host(smx_simcall_t simcall, smx_rdv_t rdv, smx_host_t host){
102   return SIMIX_rdv_comm_count_by_host(rdv, host);
103 }
104 int SIMIX_rdv_comm_count_by_host(smx_rdv_t rdv, smx_host_t host)
105 {
106   smx_action_t comm = NULL;
107   xbt_fifo_item_t item = NULL;
108   int count = 0;
109
110   xbt_fifo_foreach(rdv->comm_fifo, item, comm, smx_action_t) {
111     if (comm->comm.src_proc->smx_host == host)
112       count++;
113   }
114
115   return count;
116 }
117
118 smx_action_t SIMIX_pre_rdv_get_head(smx_simcall_t simcall, smx_rdv_t rdv){
119   return SIMIX_rdv_get_head(rdv);
120 }
121 smx_action_t SIMIX_rdv_get_head(smx_rdv_t rdv)
122 {
123   return xbt_fifo_get_item_content(xbt_fifo_get_first_item(rdv->comm_fifo));
124 }
125
126 smx_process_t SIMIX_pre_rdv_get_receiver(smx_simcall_t simcall, smx_rdv_t rdv){
127   return SIMIX_rdv_get_receiver(rdv);
128 }
129 /**
130  *  \brief get the receiver (process associated to the mailbox)
131  *  \param rdv The rendez-vous point
132  *  \return process The receiving process (NULL if not set)
133  */
134 smx_process_t SIMIX_rdv_get_receiver(smx_rdv_t rdv)
135 {
136   return rdv->permanent_receiver;
137 }
138
139 void SIMIX_pre_rdv_set_receiver(smx_simcall_t simcall, smx_rdv_t rdv,
140                             smx_process_t process){
141   SIMIX_rdv_set_receiver(rdv, process);
142 }
143 /**
144  *  \brief set the receiver of the rendez vous point to allow eager sends
145  *  \param rdv The rendez-vous point
146  *  \param process The receiving process
147  */
148 void SIMIX_rdv_set_receiver(smx_rdv_t rdv, smx_process_t process)
149 {
150   rdv->permanent_receiver=process;
151 }
152
153 /**
154  *  \brief Pushes a communication action into a rendez-vous point
155  *  \param rdv The rendez-vous point
156  *  \param comm The communication action
157  */
158 static XBT_INLINE void SIMIX_rdv_push(smx_rdv_t rdv, smx_action_t comm)
159 {
160   xbt_fifo_push(rdv->comm_fifo, comm);
161   comm->comm.rdv = rdv;
162 }
163
164 /**
165  *  \brief Removes a communication action from a rendez-vous point
166  *  \param rdv The rendez-vous point
167  *  \param comm The communication action
168  */
169 XBT_INLINE void SIMIX_rdv_remove(smx_rdv_t rdv, smx_action_t comm)
170 {
171   xbt_fifo_remove(rdv->comm_fifo, comm);
172   comm->comm.rdv = NULL;
173 }
174
175 /**
176  *  \brief Checks if there is a communication action queued in a fifo matching our needs
177  *  \param type The type of communication we are looking for (comm_send, comm_recv)
178  *  \return The communication action if found, NULL otherwise
179  */
180 smx_action_t SIMIX_fifo_get_comm(xbt_fifo_t fifo, e_smx_comm_type_t type,
181                                  int (*match_fun)(void *, void *,smx_action_t),
182                                  void *this_user_data, smx_action_t my_action)
183 {
184   smx_action_t action;
185   xbt_fifo_item_t item;
186   void* other_user_data = NULL;
187
188   xbt_fifo_foreach(fifo, item, action, smx_action_t) {
189     if (action->comm.type == SIMIX_COMM_SEND) {
190       other_user_data = action->comm.src_data;
191     } else if (action->comm.type == SIMIX_COMM_RECEIVE) {
192       other_user_data = action->comm.dst_data;
193     }
194     if (action->comm.type == type &&
195         (!match_fun              ||              match_fun(this_user_data,  other_user_data, action)) &&
196         (!action->comm.match_fun || action->comm.match_fun(other_user_data, this_user_data,  my_action))) {
197       XBT_DEBUG("Found a matching communication action %p", action);
198       xbt_fifo_remove_item(fifo, item);
199       xbt_fifo_free_item(item);
200       action->comm.refcount++;
201 #ifdef HAVE_MC
202       action->comm.rdv_cpy = action->comm.rdv;
203 #endif
204       action->comm.rdv = NULL;
205       return action;
206     }
207     XBT_DEBUG("Sorry, communication action %p does not match our needs:"
208               " its type is %d but we are looking for a comm of type %d (or maybe the filtering didn't match)",
209               action, (int)action->comm.type, (int)type);
210   }
211   XBT_DEBUG("No matching communication action found");
212   return NULL;
213 }
214
215
216 /**
217  *  \brief Checks if there is a communication action queued in a fifo matching our needs, but leave it there
218  *  \param type The type of communication we are looking for (comm_send, comm_recv)
219  *  \return The communication action if found, NULL otherwise
220  */
221 smx_action_t SIMIX_fifo_probe_comm(xbt_fifo_t fifo, e_smx_comm_type_t type,
222                                  int (*match_fun)(void *, void *,smx_action_t),
223                                  void *this_user_data, smx_action_t my_action)
224 {
225   smx_action_t action;
226   xbt_fifo_item_t item;
227   void* other_user_data = NULL;
228
229   xbt_fifo_foreach(fifo, item, action, smx_action_t) {
230     if (action->comm.type == SIMIX_COMM_SEND) {
231       other_user_data = action->comm.src_data;
232     } else if (action->comm.type == SIMIX_COMM_RECEIVE) {
233       other_user_data = action->comm.dst_data;
234     }
235     if (action->comm.type == type &&
236         (!match_fun              ||              match_fun(this_user_data,  other_user_data, action)) &&
237         (!action->comm.match_fun || action->comm.match_fun(other_user_data, this_user_data,  my_action))) {
238       XBT_DEBUG("Found a matching communication action %p", action);
239       action->comm.refcount++;
240
241       return action;
242     }
243     XBT_DEBUG("Sorry, communication action %p does not match our needs:"
244               " its type is %d but we are looking for a comm of type %d (or maybe the filtering didn't match)",
245               action, (int)action->comm.type, (int)type);
246   }
247   XBT_DEBUG("No matching communication action found");
248   return NULL;
249 }
250 /******************************************************************************/
251 /*                            Communication Actions                            */
252 /******************************************************************************/
253
254 /**
255  *  \brief Creates a new communicate action
256  *  \param type The direction of communication (comm_send, comm_recv)
257  *  \return The new communicate action
258  */
259 smx_action_t SIMIX_comm_new(e_smx_comm_type_t type)
260 {
261   smx_action_t act;
262
263   /* alloc structures */
264   act = xbt_mallocator_get(simix_global->action_mallocator);
265
266   act->type = SIMIX_ACTION_COMMUNICATE;
267   act->state = SIMIX_WAITING;
268
269   /* set communication */
270   act->comm.type = type;
271   act->comm.refcount = 1;
272   act->comm.src_data=NULL;
273   act->comm.dst_data=NULL;
274
275
276 #ifdef HAVE_LATENCY_BOUND_TRACKING
277   //initialize with unknown value
278   act->latency_limited = -1;
279 #endif
280
281 #ifdef HAVE_TRACING
282   act->category = NULL;
283 #endif
284
285   XBT_DEBUG("Create communicate action %p", act);
286   ++smx_total_comms;
287
288   return act;
289 }
290
291 void SIMIX_pre_comm_destroy(smx_simcall_t simcall, smx_action_t action){
292   SIMIX_comm_destroy(action);
293 }
294 /**
295  *  \brief Destroy a communicate action
296  *  \param action The communicate action to be destroyed
297  */
298 void SIMIX_comm_destroy(smx_action_t action)
299 {
300   XBT_DEBUG("Destroy action %p (refcount: %d), state: %d",
301             action, action->comm.refcount, (int)action->state);
302
303   if (action->comm.refcount <= 0) {
304     xbt_backtrace_display_current();
305     xbt_die("The refcount of comm %p is already 0 before decreasing it. "
306             "That's a bug! If you didn't test and/or wait the same communication twice in your code, then the bug is SimGrid's...", action);
307   }
308   action->comm.refcount--;
309   if (action->comm.refcount > 0)
310       return;
311   XBT_DEBUG("Really free communication %p; refcount is now %d", action,
312             action->comm.refcount);
313
314 #ifdef HAVE_LATENCY_BOUND_TRACKING
315   action->latency_limited = SIMIX_comm_is_latency_bounded( action ) ;
316 #endif
317
318   xbt_free(action->name);
319   SIMIX_comm_destroy_internal_actions(action);
320
321   if (action->comm.detached && action->state != SIMIX_DONE) {
322     /* the communication has failed and was detached:
323      * we have to free the buffer */
324     if (action->comm.clean_fun) {
325       action->comm.clean_fun(action->comm.src_buff);
326     }
327     action->comm.src_buff = NULL;
328   }
329
330   if(action->comm.rdv)
331     SIMIX_rdv_remove(action->comm.rdv, action);
332
333   xbt_mallocator_release(simix_global->action_mallocator, action);
334 }
335
336 void SIMIX_comm_destroy_internal_actions(smx_action_t action)
337 {
338   if (action->comm.surf_comm){
339 #ifdef HAVE_LATENCY_BOUND_TRACKING
340     action->latency_limited = SIMIX_comm_is_latency_bounded(action);
341 #endif
342     action->comm.surf_comm->model_type->action_unref(action->comm.surf_comm);
343     action->comm.surf_comm = NULL;
344   }
345
346   if (action->comm.src_timeout){
347     action->comm.src_timeout->model_type->action_unref(action->comm.src_timeout);
348     action->comm.src_timeout = NULL;
349   }
350
351   if (action->comm.dst_timeout){
352     action->comm.dst_timeout->model_type->action_unref(action->comm.dst_timeout);
353     action->comm.dst_timeout = NULL;
354   }
355 }
356
357 void SIMIX_pre_comm_send(smx_simcall_t simcall, smx_rdv_t rdv,
358                                   double task_size, double rate,
359                                   void *src_buff, size_t src_buff_size,
360                                   int (*match_fun)(void *, void *,smx_action_t),
361                                   void *data, double timeout){
362   smx_action_t comm = SIMIX_comm_isend(simcall->issuer, rdv, task_size, rate,
363                                        src_buff, src_buff_size, match_fun, NULL,
364                                        data, 0);
365   simcall->mc_value = 0;
366   SIMIX_pre_comm_wait(simcall, comm, timeout);
367 }
368 smx_action_t SIMIX_pre_comm_isend(smx_simcall_t simcall, smx_rdv_t rdv,
369                                   double task_size, double rate,
370                                   void *src_buff, size_t src_buff_size,
371                                   int (*match_fun)(void *, void *,smx_action_t),
372                                   void (*clean_fun)(void *), 
373                                   void *data, int detached){
374   return SIMIX_comm_isend(simcall->issuer, rdv, task_size, rate, src_buff,
375                           src_buff_size, match_fun, clean_fun, data, detached);
376
377 }
378 smx_action_t SIMIX_comm_isend(smx_process_t src_proc, smx_rdv_t rdv,
379                               double task_size, double rate,
380                               void *src_buff, size_t src_buff_size,
381                               int (*match_fun)(void *, void *,smx_action_t),
382                               void (*clean_fun)(void *), // used to free the action in case of problem after a detached send
383                               void *data,
384                               int detached)
385 {
386   XBT_DEBUG("send from %p\n", rdv);
387
388   /* Prepare an action describing us, so that it gets passed to the user-provided filter of other side */
389   smx_action_t this_action = SIMIX_comm_new(SIMIX_COMM_SEND);
390
391   /* Look for communication action matching our needs. We also provide a description of
392    * ourself so that the other side also gets a chance of choosing if it wants to match with us.
393    *
394    * If it is not found then push our communication into the rendez-vous point */
395   smx_action_t other_action = SIMIX_fifo_get_comm(rdv->comm_fifo, SIMIX_COMM_RECEIVE, match_fun, data, this_action);
396
397   if (!other_action) {
398     other_action = this_action;
399
400     if (rdv->permanent_receiver!=NULL){
401       //this mailbox is for small messages, which have to be sent right now
402       other_action->state = SIMIX_READY;
403       other_action->comm.dst_proc=rdv->permanent_receiver;
404       other_action->comm.refcount++;
405       xbt_fifo_push(rdv->done_comm_fifo,other_action);
406       other_action->comm.rdv=rdv;
407       XBT_DEBUG("pushing a message into the permanent receive fifo %p, comm %p \n", rdv, &(other_action->comm));
408
409     }else{
410       SIMIX_rdv_push(rdv, this_action);
411     }
412   } else {
413     XBT_DEBUG("Receive already pushed\n");
414
415     SIMIX_comm_destroy(this_action);
416     --smx_total_comms; // this creation was a pure waste
417
418     other_action->state = SIMIX_READY;
419     other_action->comm.type = SIMIX_COMM_READY;
420
421   }
422   xbt_fifo_push(src_proc->comms, other_action);
423
424   /* if the communication action is detached then decrease the refcount
425    * by one, so it will be eliminated by the receiver's destroy call */
426   if (detached) {
427     other_action->comm.detached = 1;
428     other_action->comm.refcount--;
429     other_action->comm.clean_fun = clean_fun;
430   } else {
431     other_action->comm.clean_fun = NULL;
432   }
433
434   /* Setup the communication action */
435   other_action->comm.src_proc = src_proc;
436   other_action->comm.task_size = task_size;
437   other_action->comm.rate = rate;
438   other_action->comm.src_buff = src_buff;
439   other_action->comm.src_buff_size = src_buff_size;
440   other_action->comm.src_data = data;
441
442   other_action->comm.match_fun = match_fun;
443
444   if (MC_is_active()) {
445     other_action->state = SIMIX_RUNNING;
446     return other_action;
447   }
448
449   SIMIX_comm_start(other_action);
450   return (detached ? NULL : other_action);
451 }
452
453 void SIMIX_pre_comm_recv(smx_simcall_t simcall, smx_rdv_t rdv,
454                                   void *dst_buff, size_t *dst_buff_size,
455                                   int (*match_fun)(void *, void *, smx_action_t),
456                                   void *data, double timeout){
457   smx_action_t comm = SIMIX_comm_irecv(simcall->issuer, rdv, dst_buff,
458                                        dst_buff_size, match_fun, data);
459   simcall->mc_value = 0;
460   SIMIX_pre_comm_wait(simcall, comm, timeout);
461 }
462
463 void SIMIX_pre_comm_recv_bounded(smx_simcall_t simcall, smx_rdv_t rdv,
464                                   void *dst_buff, size_t *dst_buff_size,
465                                   int (*match_fun)(void *, void *, smx_action_t),
466                                   void *data, double timeout, double rate){
467   smx_action_t comm = SIMIX_comm_irecv_bounded(simcall->issuer, rdv, dst_buff,
468                                        dst_buff_size, match_fun, data, rate);
469   simcall->mc_value = 0;
470   SIMIX_pre_comm_wait(simcall, comm, timeout);
471 }
472
473 smx_action_t SIMIX_pre_comm_irecv(smx_simcall_t simcall, smx_rdv_t rdv,
474                                   void *dst_buff, size_t *dst_buff_size,
475                                   int (*match_fun)(void *, void *, smx_action_t),
476                                   void *data){
477   return SIMIX_comm_irecv(simcall->issuer, rdv, dst_buff, dst_buff_size,
478                           match_fun, data);
479 }
480
481 smx_action_t SIMIX_comm_irecv(smx_process_t dst_proc, smx_rdv_t rdv,
482                               void *dst_buff, size_t *dst_buff_size,
483                               int (*match_fun)(void *, void *, smx_action_t), void *data)
484 {
485   XBT_DEBUG("recv from %p %p\n", rdv, rdv->comm_fifo);
486   smx_action_t this_action = SIMIX_comm_new(SIMIX_COMM_RECEIVE);
487
488   smx_action_t other_action;
489   //communication already done, get it inside the fifo of completed comms
490   //permanent receive v1
491   //int already_received=0;
492   if(rdv->permanent_receiver && xbt_fifo_size(rdv->done_comm_fifo)!=0){
493
494     XBT_DEBUG("We have a comm that has probably already been received, trying to match it, to skip the communication\n");
495     //find a match in the already received fifo
496     other_action = SIMIX_fifo_get_comm(rdv->done_comm_fifo, SIMIX_COMM_SEND, match_fun, data, this_action);
497     //if not found, assume the receiver came first, register it to the mailbox in the classical way
498     if (!other_action)  {
499       XBT_DEBUG("We have messages in the permanent receive list, but not the one we are looking for, pushing request into fifo\n");
500       other_action = this_action;
501       SIMIX_rdv_push(rdv, this_action);
502     }else{
503       if(other_action->comm.surf_comm &&        SIMIX_comm_get_remains(other_action)==0.0)
504       {
505         XBT_DEBUG("comm %p has been already sent, and is finished, destroy it\n",&(other_action->comm));
506         other_action->state = SIMIX_DONE;
507         other_action->comm.type = SIMIX_COMM_DONE;
508         other_action->comm.rdv = NULL;
509         //SIMIX_comm_destroy(this_action);
510         //--smx_total_comms; // this creation was a pure waste
511         //already_received=1;
512         //other_action->comm.refcount--;
513       }/*else{
514          XBT_DEBUG("Not yet finished, we have to wait %d\n", xbt_fifo_size(rdv->comm_fifo));
515          }*/
516       other_action->comm.refcount--;
517       SIMIX_comm_destroy(this_action);
518       --smx_total_comms; // this creation was a pure waste
519     }
520   }else{
521     /* Prepare an action describing us, so that it gets passed to the user-provided filter of other side */
522
523     /* Look for communication action matching our needs. We also provide a description of
524      * ourself so that the other side also gets a chance of choosing if it wants to match with us.
525      *
526      * If it is not found then push our communication into the rendez-vous point */
527     other_action = SIMIX_fifo_get_comm(rdv->comm_fifo, SIMIX_COMM_SEND, match_fun, data, this_action);
528
529     if (!other_action) {
530       XBT_DEBUG("Receive pushed first %d\n", xbt_fifo_size(rdv->comm_fifo));
531       other_action = this_action;
532       SIMIX_rdv_push(rdv, this_action);
533     } else {
534       SIMIX_comm_destroy(this_action);
535       --smx_total_comms; // this creation was a pure waste
536       other_action->state = SIMIX_READY;
537       other_action->comm.type = SIMIX_COMM_READY;
538       //other_action->comm.refcount--;
539     }
540     xbt_fifo_push(dst_proc->comms, other_action);
541   }
542
543   /* Setup communication action */
544   other_action->comm.dst_proc = dst_proc;
545   other_action->comm.dst_buff = dst_buff;
546   other_action->comm.dst_buff_size = dst_buff_size;
547   other_action->comm.dst_data = data;
548
549   other_action->comm.match_fun = match_fun;
550
551
552   /*if(already_received)//do the actual copy, because the first one after the comm didn't have all the info
553     SIMIX_comm_copy_data(other_action);*/
554
555
556   if (MC_is_active()) {
557     other_action->state = SIMIX_RUNNING;
558     return other_action;
559   }
560
561   SIMIX_comm_start(other_action);
562   // }
563   return other_action;
564 }
565
566 smx_action_t SIMIX_pre_comm_irecv_bounded(smx_simcall_t simcall, smx_rdv_t rdv,
567                                   void *dst_buff, size_t *dst_buff_size,
568                                   int (*match_fun)(void *, void *, smx_action_t),
569                                   void *data, double rate){
570   return SIMIX_comm_irecv_bounded(simcall->issuer, rdv, dst_buff, dst_buff_size,
571                           match_fun, data, rate);
572 }
573
574 smx_action_t SIMIX_comm_irecv_bounded(smx_process_t dst_proc, smx_rdv_t rdv,
575                               void *dst_buff, size_t *dst_buff_size,
576                               int (*match_fun)(void *, void *, smx_action_t), void *data, double rate)
577 {
578   XBT_DEBUG("recv from %p %p\n", rdv, rdv->comm_fifo);
579   smx_action_t this_action = SIMIX_comm_new(SIMIX_COMM_RECEIVE);
580
581   smx_action_t other_action;
582   //communication already done, get it inside the fifo of completed comms
583   //permanent receive v1
584   //int already_received=0;
585   if(rdv->permanent_receiver && xbt_fifo_size(rdv->done_comm_fifo)!=0){
586
587     XBT_DEBUG("We have a comm that has probably already been received, trying to match it, to skip the communication\n");
588     //find a match in the already received fifo
589     other_action = SIMIX_fifo_get_comm(rdv->done_comm_fifo, SIMIX_COMM_SEND, match_fun, data, this_action);
590     //if not found, assume the receiver came first, register it to the mailbox in the classical way
591     if (!other_action)  {
592       XBT_DEBUG("We have messages in the permanent receive list, but not the one we are looking for, pushing request into fifo\n");
593       other_action = this_action;
594       SIMIX_rdv_push(rdv, this_action);
595     }else{
596       if(other_action->comm.surf_comm &&        SIMIX_comm_get_remains(other_action)==0.0)
597       {
598         XBT_DEBUG("comm %p has been already sent, and is finished, destroy it\n",&(other_action->comm));
599         other_action->state = SIMIX_DONE;
600         other_action->comm.type = SIMIX_COMM_DONE;
601         other_action->comm.rdv = NULL;
602         //SIMIX_comm_destroy(this_action);
603         //--smx_total_comms; // this creation was a pure waste
604         //already_received=1;
605         //other_action->comm.refcount--;
606       }/*else{
607          XBT_DEBUG("Not yet finished, we have to wait %d\n", xbt_fifo_size(rdv->comm_fifo));
608          }*/
609       other_action->comm.refcount--;
610       SIMIX_comm_destroy(this_action);
611       --smx_total_comms; // this creation was a pure waste
612     }
613   }else{
614     /* Prepare an action describing us, so that it gets passed to the user-provided filter of other side */
615
616     /* Look for communication action matching our needs. We also provide a description of
617      * ourself so that the other side also gets a chance of choosing if it wants to match with us.
618      *
619      * If it is not found then push our communication into the rendez-vous point */
620     other_action = SIMIX_fifo_get_comm(rdv->comm_fifo, SIMIX_COMM_SEND, match_fun, data, this_action);
621
622     if (!other_action) {
623       XBT_DEBUG("Receive pushed first %d\n", xbt_fifo_size(rdv->comm_fifo));
624       other_action = this_action;
625       SIMIX_rdv_push(rdv, this_action);
626     } else {
627       SIMIX_comm_destroy(this_action);
628       --smx_total_comms; // this creation was a pure waste
629       other_action->state = SIMIX_READY;
630       other_action->comm.type = SIMIX_COMM_READY;
631       //other_action->comm.refcount--;
632     }
633     xbt_fifo_push(dst_proc->comms, other_action);
634   }
635
636   /* Setup communication action */
637   other_action->comm.dst_proc = dst_proc;
638   other_action->comm.dst_buff = dst_buff;
639   other_action->comm.dst_buff_size = dst_buff_size;
640   other_action->comm.dst_data = data;
641
642   if (rate < other_action->comm.rate || other_action->comm.rate == -1.0)
643           other_action->comm.rate = rate;
644
645   other_action->comm.match_fun = match_fun;
646
647
648   /*if(already_received)//do the actual copy, because the first one after the comm didn't have all the info
649     SIMIX_comm_copy_data(other_action);*/
650
651
652   if (MC_is_active()) {
653     other_action->state = SIMIX_RUNNING;
654     return other_action;
655   }
656
657   SIMIX_comm_start(other_action);
658   // }
659   return other_action;
660 }
661
662 smx_action_t SIMIX_pre_comm_iprobe(smx_simcall_t simcall, smx_rdv_t rdv,
663                                    int src, int tag,
664                                    int (*match_fun)(void *, void *, smx_action_t),
665                                    void *data){
666   return SIMIX_comm_iprobe(simcall->issuer, rdv, src, tag, match_fun, data);
667 }
668
669 smx_action_t SIMIX_comm_iprobe(smx_process_t dst_proc, smx_rdv_t rdv, int src,
670                               int tag, int (*match_fun)(void *, void *, smx_action_t), void *data)
671 {
672   XBT_DEBUG("iprobe from %p %p\n", rdv, rdv->comm_fifo);
673   smx_action_t this_action = SIMIX_comm_new(SIMIX_COMM_RECEIVE);
674
675   smx_action_t other_action=NULL;
676   if(rdv->permanent_receiver && xbt_fifo_size(rdv->done_comm_fifo)!=0){
677     //find a match in the already received fifo
678       XBT_DEBUG("first try in the perm recv mailbox \n");
679
680     other_action = SIMIX_fifo_probe_comm(rdv->done_comm_fifo, SIMIX_COMM_SEND, match_fun, data, this_action);
681   }
682  // }else{
683     if(!other_action){
684         XBT_DEBUG("second try in the other mailbox");
685         other_action = SIMIX_fifo_probe_comm(rdv->comm_fifo, SIMIX_COMM_SEND, match_fun, data, this_action);
686     }
687 //  }
688   if(other_action)other_action->comm.refcount--;
689
690   SIMIX_comm_destroy(this_action);
691   --smx_total_comms;
692   return other_action;
693 }
694
695 void SIMIX_pre_comm_wait(smx_simcall_t simcall, smx_action_t action, double timeout)
696 {
697   /* the simcall may be a wait, a send or a recv */
698   surf_action_t sleep;
699
700   /* Associate this simcall to the wait action */
701   XBT_DEBUG("SIMIX_pre_comm_wait, %p", action);
702
703   xbt_fifo_push(action->simcalls, simcall);
704   simcall->issuer->waiting_action = action;
705
706   if (MC_is_active()) {
707     int idx = simcall->mc_value;
708     if (idx == 0) {
709       action->state = SIMIX_DONE;
710     } else {
711       /* If we reached this point, the wait simcall must have a timeout */
712       /* Otherwise it shouldn't be enabled and executed by the MC */
713       if (timeout == -1)
714         THROW_IMPOSSIBLE;
715
716       if (action->comm.src_proc == simcall->issuer)
717         action->state = SIMIX_SRC_TIMEOUT;
718       else
719         action->state = SIMIX_DST_TIMEOUT;
720     }
721
722     SIMIX_comm_finish(action);
723     return;
724   }
725
726   /* If the action has already finish perform the error handling, */
727   /* otherwise set up a waiting timeout on the right side         */
728   if (action->state != SIMIX_WAITING && action->state != SIMIX_RUNNING) {
729     SIMIX_comm_finish(action);
730   } else { /* if (timeout >= 0) { we need a surf sleep action even when there is no timeout, otherwise surf won't tell us when the host fails */
731     sleep = surf_workstation_model->extension.workstation.sleep(simcall->issuer->smx_host, timeout);
732     surf_workstation_model->action_data_set(sleep, action);
733
734     if (simcall->issuer == action->comm.src_proc)
735       action->comm.src_timeout = sleep;
736     else
737       action->comm.dst_timeout = sleep;
738   }
739 }
740
741 void SIMIX_pre_comm_test(smx_simcall_t simcall, smx_action_t action)
742 {
743   if(MC_is_active()){
744     simcall_comm_test__set__result(simcall, action->comm.src_proc && action->comm.dst_proc);
745     if(simcall_comm_test__get__result(simcall)){
746       action->state = SIMIX_DONE;
747       xbt_fifo_push(action->simcalls, simcall);
748       SIMIX_comm_finish(action);
749     }else{
750       SIMIX_simcall_answer(simcall);
751     }
752     return;
753   }
754
755   simcall_comm_test__set__result(simcall, (action->state != SIMIX_WAITING && action->state != SIMIX_RUNNING));
756   if (simcall_comm_test__get__result(simcall)) {
757     xbt_fifo_push(action->simcalls, simcall);
758     SIMIX_comm_finish(action);
759   } else {
760     SIMIX_simcall_answer(simcall);
761   }
762 }
763
764 void SIMIX_pre_comm_testany(smx_simcall_t simcall, xbt_dynar_t actions)
765 {
766   unsigned int cursor;
767   smx_action_t action;
768   simcall_comm_testany__set__result(simcall, -1);
769
770   if (MC_is_active()){
771     int idx = simcall->mc_value;
772     if(idx == -1){
773       SIMIX_simcall_answer(simcall);
774     }else{
775       action = xbt_dynar_get_as(actions, idx, smx_action_t);
776       simcall_comm_testany__set__result(simcall, idx);
777       xbt_fifo_push(action->simcalls, simcall);
778       action->state = SIMIX_DONE;
779       SIMIX_comm_finish(action);
780     }
781     return;
782   }
783
784   xbt_dynar_foreach(simcall_comm_testany__get__comms(simcall), cursor,action) {
785     if (action->state != SIMIX_WAITING && action->state != SIMIX_RUNNING) {
786       simcall_comm_testany__set__result(simcall, cursor);
787       xbt_fifo_push(action->simcalls, simcall);
788       SIMIX_comm_finish(action);
789       return;
790     }
791   }
792   SIMIX_simcall_answer(simcall);
793 }
794
795 void SIMIX_pre_comm_waitany(smx_simcall_t simcall, xbt_dynar_t actions)
796 {
797   smx_action_t action;
798   unsigned int cursor = 0;
799
800   if (MC_is_active()){
801     int idx = simcall->mc_value;
802     action = xbt_dynar_get_as(actions, idx, smx_action_t);
803     xbt_fifo_push(action->simcalls, simcall);
804     simcall_comm_waitany__set__result(simcall, idx);
805     action->state = SIMIX_DONE;
806     SIMIX_comm_finish(action);
807     return;
808   }
809
810   xbt_dynar_foreach(actions, cursor, action){
811     /* associate this simcall to the the action */
812     xbt_fifo_push(action->simcalls, simcall);
813
814     /* see if the action is already finished */
815     if (action->state != SIMIX_WAITING && action->state != SIMIX_RUNNING){
816       SIMIX_comm_finish(action);
817       break;
818     }
819   }
820 }
821
822 void SIMIX_waitany_remove_simcall_from_actions(smx_simcall_t simcall)
823 {
824   smx_action_t action;
825   unsigned int cursor = 0;
826   xbt_dynar_t actions = simcall_comm_waitany__get__comms(simcall);
827
828   xbt_dynar_foreach(actions, cursor, action) {
829     xbt_fifo_remove(action->simcalls, simcall);
830   }
831 }
832
833 /**
834  *  \brief Starts the simulation of a communication action.
835  *  \param action the communication action
836  */
837 static XBT_INLINE void SIMIX_comm_start(smx_action_t action)
838 {
839   /* If both the sender and the receiver are already there, start the communication */
840   if (action->state == SIMIX_READY) {
841
842     smx_host_t sender = action->comm.src_proc->smx_host;
843     smx_host_t receiver = action->comm.dst_proc->smx_host;
844
845     XBT_DEBUG("Starting communication %p from '%s' to '%s'", action,
846               SIMIX_host_get_name(sender), SIMIX_host_get_name(receiver));
847
848     action->comm.surf_comm = surf_workstation_model->extension.workstation.
849       communicate(sender, receiver, action->comm.task_size, action->comm.rate);
850
851     surf_workstation_model->action_data_set(action->comm.surf_comm, action);
852
853     action->state = SIMIX_RUNNING;
854
855     /* If a link is failed, detect it immediately */
856     if (surf_workstation_model->action_state_get(action->comm.surf_comm) == SURF_ACTION_FAILED) {
857       XBT_DEBUG("Communication from '%s' to '%s' failed to start because of a link failure",
858                 SIMIX_host_get_name(sender), SIMIX_host_get_name(receiver));
859       action->state = SIMIX_LINK_FAILURE;
860       SIMIX_comm_destroy_internal_actions(action);
861     }
862
863     /* If any of the process is suspend, create the action but stop its execution,
864        it will be restarted when the sender process resume */
865     if (SIMIX_process_is_suspended(action->comm.src_proc) ||
866         SIMIX_process_is_suspended(action->comm.dst_proc)) {
867       /* FIXME: check what should happen with the action state */
868
869       if (SIMIX_process_is_suspended(action->comm.src_proc))
870         XBT_DEBUG("The communication is suspended on startup because src (%s:%s) were suspended since it initiated the communication",
871                   SIMIX_host_get_name(action->comm.src_proc->smx_host), action->comm.src_proc->name);
872       else
873         XBT_DEBUG("The communication is suspended on startup because dst (%s:%s) were suspended since it initiated the communication",
874                   SIMIX_host_get_name(action->comm.dst_proc->smx_host), action->comm.dst_proc->name);
875
876       surf_workstation_model->suspend(action->comm.surf_comm);
877
878     }
879   }
880 }
881
882 /**
883  * \brief Answers the SIMIX simcalls associated to a communication action.
884  * \param action a finished communication action
885  */
886 void SIMIX_comm_finish(smx_action_t action)
887 {
888   unsigned int destroy_count = 0;
889   smx_simcall_t simcall;
890
891   while ((simcall = xbt_fifo_shift(action->simcalls))) {
892
893     /* If a waitany simcall is waiting for this action to finish, then remove
894        it from the other actions in the waitany list. Afterwards, get the
895        position of the actual action in the waitany dynar and
896        return it as the result of the simcall */
897     if (simcall->call == SIMCALL_COMM_WAITANY) {
898       SIMIX_waitany_remove_simcall_from_actions(simcall);
899       if (!MC_is_active())
900         simcall_comm_waitany__set__result(simcall, xbt_dynar_search(simcall_comm_waitany__get__comms(simcall), &action));
901     }
902
903     /* If the action is still in a rendez-vous point then remove from it */
904     if (action->comm.rdv)
905       SIMIX_rdv_remove(action->comm.rdv, action);
906
907     XBT_DEBUG("SIMIX_comm_finish: action state = %d", (int)action->state);
908
909     /* Check out for errors */
910     switch (action->state) {
911
912     case SIMIX_DONE:
913       XBT_DEBUG("Communication %p complete!", action);
914       SIMIX_comm_copy_data(action);
915       break;
916
917     case SIMIX_SRC_TIMEOUT:
918       SMX_EXCEPTION(simcall->issuer, timeout_error, 0,
919                     "Communication timeouted because of sender");
920       break;
921
922     case SIMIX_DST_TIMEOUT:
923       SMX_EXCEPTION(simcall->issuer, timeout_error, 0,
924                     "Communication timeouted because of receiver");
925       break;
926
927     case SIMIX_SRC_HOST_FAILURE:
928       if (simcall->issuer == action->comm.src_proc)
929         simcall->issuer->context->iwannadie = 1;
930 //          SMX_EXCEPTION(simcall->issuer, host_error, 0, "Host failed");
931       else
932         SMX_EXCEPTION(simcall->issuer, network_error, 0, "Remote peer failed");
933       break;
934
935     case SIMIX_DST_HOST_FAILURE:
936       if (simcall->issuer == action->comm.dst_proc)
937         simcall->issuer->context->iwannadie = 1;
938 //          SMX_EXCEPTION(simcall->issuer, host_error, 0, "Host failed");
939       else
940         SMX_EXCEPTION(simcall->issuer, network_error, 0, "Remote peer failed");
941       break;
942
943     case SIMIX_LINK_FAILURE:
944       XBT_DEBUG("Link failure in action %p between '%s' and '%s': posting an exception to the issuer: %s (%p) detached:%d",
945                 action,
946                 action->comm.src_proc ? sg_host_name(action->comm.src_proc->smx_host) : NULL,
947                 action->comm.dst_proc ? sg_host_name(action->comm.dst_proc->smx_host) : NULL,
948                 simcall->issuer->name, simcall->issuer, action->comm.detached);
949       if (action->comm.src_proc == simcall->issuer) {
950         XBT_DEBUG("I'm source");
951       } else if (action->comm.dst_proc == simcall->issuer) {
952         XBT_DEBUG("I'm dest");
953       } else {
954         XBT_DEBUG("I'm neither source nor dest");
955       }
956       SMX_EXCEPTION(simcall->issuer, network_error, 0, "Link failure");
957       break;
958
959     case SIMIX_CANCELED:
960       if (simcall->issuer == action->comm.dst_proc)
961         SMX_EXCEPTION(simcall->issuer, cancel_error, 0,
962                       "Communication canceled by the sender");
963       else
964         SMX_EXCEPTION(simcall->issuer, cancel_error, 0,
965                       "Communication canceled by the receiver");
966       break;
967
968     default:
969       xbt_die("Unexpected action state in SIMIX_comm_finish: %d", (int)action->state);
970     }
971
972     /* if there is an exception during a waitany or a testany, indicate the position of the failed communication */
973     if (simcall->issuer->doexception) {
974       if (simcall->call == SIMCALL_COMM_WAITANY) {
975         simcall->issuer->running_ctx->exception.value = xbt_dynar_search(simcall_comm_waitany__get__comms(simcall), &action);
976       }
977       else if (simcall->call == SIMCALL_COMM_TESTANY) {
978         simcall->issuer->running_ctx->exception.value = xbt_dynar_search(simcall_comm_testany__get__comms(simcall), &action);
979       }
980     }
981
982     if (surf_workstation_model->extension.
983         workstation.get_state(simcall->issuer->smx_host) != SURF_RESOURCE_ON) {
984       simcall->issuer->context->iwannadie = 1;
985     }
986
987     simcall->issuer->waiting_action = NULL;
988     xbt_fifo_remove(simcall->issuer->comms, action);
989     if(action->comm.detached){
990       if(simcall->issuer == action->comm.src_proc){
991         if(action->comm.dst_proc)
992           xbt_fifo_remove(action->comm.dst_proc->comms, action);
993       }
994       if(simcall->issuer == action->comm.dst_proc){
995         if(action->comm.src_proc)
996           xbt_fifo_remove(action->comm.src_proc->comms, action);
997       }
998     }
999     SIMIX_simcall_answer(simcall);
1000     destroy_count++;
1001   }
1002
1003   while (destroy_count-- > 0)
1004     SIMIX_comm_destroy(action);
1005 }
1006
1007 /**
1008  * \brief This function is called when a Surf communication action is finished.
1009  * \param action the corresponding Simix communication
1010  */
1011 void SIMIX_post_comm(smx_action_t action)
1012 {
1013   /* Update action state */
1014   if (action->comm.src_timeout &&
1015       surf_workstation_model->action_state_get(action->comm.src_timeout) == SURF_ACTION_DONE)
1016     action->state = SIMIX_SRC_TIMEOUT;
1017   else if (action->comm.dst_timeout &&
1018            surf_workstation_model->action_state_get(action->comm.dst_timeout) == SURF_ACTION_DONE)
1019     action->state = SIMIX_DST_TIMEOUT;
1020   else if (action->comm.src_timeout &&
1021            surf_workstation_model->action_state_get(action->comm.src_timeout) == SURF_ACTION_FAILED)
1022     action->state = SIMIX_SRC_HOST_FAILURE;
1023   else if (action->comm.dst_timeout &&
1024            surf_workstation_model->action_state_get(action->comm.dst_timeout) == SURF_ACTION_FAILED)
1025     action->state = SIMIX_DST_HOST_FAILURE;
1026   else if (action->comm.surf_comm &&
1027            surf_workstation_model->action_state_get(action->comm.surf_comm) == SURF_ACTION_FAILED) {
1028     XBT_DEBUG("Puta madre. Surf says that the link broke");
1029     action->state = SIMIX_LINK_FAILURE;
1030   } else
1031     action->state = SIMIX_DONE;
1032
1033   XBT_DEBUG("SIMIX_post_comm: comm %p, state %d, src_proc %p, dst_proc %p, detached: %d",
1034             action, (int)action->state, action->comm.src_proc, action->comm.dst_proc, action->comm.detached);
1035
1036   /* destroy the surf actions associated with the Simix communication */
1037   SIMIX_comm_destroy_internal_actions(action);
1038
1039   /* remove the communication action from the list of pending communications
1040    * of both processes (if they still exist) */
1041   if (action->comm.src_proc) {
1042     xbt_fifo_remove(action->comm.src_proc->comms, action);
1043   }
1044   if (action->comm.dst_proc) {
1045     xbt_fifo_remove(action->comm.dst_proc->comms, action);
1046   }
1047
1048   /* if there are simcalls associated with the action, then answer them */
1049   if (xbt_fifo_size(action->simcalls)) {
1050     SIMIX_comm_finish(action);
1051   }
1052 }
1053
1054 void SIMIX_pre_comm_cancel(smx_simcall_t simcall, smx_action_t action){
1055   SIMIX_comm_cancel(action);
1056 }
1057 void SIMIX_comm_cancel(smx_action_t action)
1058 {
1059   /* if the action is a waiting state means that it is still in a rdv */
1060   /* so remove from it and delete it */
1061   if (action->state == SIMIX_WAITING) {
1062     SIMIX_rdv_remove(action->comm.rdv, action);
1063     action->state = SIMIX_CANCELED;
1064   }
1065   else if (!MC_is_active() /* when running the MC there are no surf actions */
1066            && (action->state == SIMIX_READY || action->state == SIMIX_RUNNING)) {
1067
1068     surf_workstation_model->action_cancel(action->comm.surf_comm);
1069   }
1070 }
1071
1072 void SIMIX_comm_suspend(smx_action_t action)
1073 {
1074   /*FIXME: shall we suspend also the timeout actions? */
1075   if (action->comm.surf_comm)
1076     surf_workstation_model->suspend(action->comm.surf_comm);
1077   /* in the other case, the action will be suspended on creation, in SIMIX_comm_start() */
1078 }
1079
1080 void SIMIX_comm_resume(smx_action_t action)
1081 {
1082   /*FIXME: check what happen with the timeouts */
1083   if (action->comm.surf_comm)
1084     surf_workstation_model->resume(action->comm.surf_comm);
1085   /* in the other case, the action were not really suspended yet, see SIMIX_comm_suspend() and SIMIX_comm_start() */
1086 }
1087
1088
1089 /************* Action Getters **************/
1090
1091 double SIMIX_pre_comm_get_remains(smx_simcall_t simcall, smx_action_t action){
1092   return SIMIX_comm_get_remains(action);
1093 }
1094 /**
1095  *  \brief get the amount remaining from the communication
1096  *  \param action The communication
1097  */
1098 double SIMIX_comm_get_remains(smx_action_t action)
1099 {
1100   double remains;
1101
1102   if(!action){
1103     return 0;
1104   }
1105
1106   switch (action->state) {
1107
1108   case SIMIX_RUNNING:
1109     remains = surf_workstation_model->get_remains(action->comm.surf_comm);
1110     break;
1111
1112   case SIMIX_WAITING:
1113   case SIMIX_READY:
1114     remains = 0; /*FIXME: check what should be returned */
1115     break;
1116
1117   default:
1118     remains = 0; /*FIXME: is this correct? */
1119     break;
1120   }
1121   return remains;
1122 }
1123
1124 e_smx_state_t SIMIX_pre_comm_get_state(smx_simcall_t simcall, smx_action_t action){
1125   return SIMIX_comm_get_state(action);
1126 }
1127 e_smx_state_t SIMIX_comm_get_state(smx_action_t action)
1128 {
1129   return action->state;
1130 }
1131
1132 void* SIMIX_pre_comm_get_src_data(smx_simcall_t simcall, smx_action_t action){
1133   return SIMIX_comm_get_src_data(action);
1134 }
1135 /**
1136  *  \brief Return the user data associated to the sender of the communication
1137  *  \param action The communication
1138  *  \return the user data
1139  */
1140 void* SIMIX_comm_get_src_data(smx_action_t action)
1141 {
1142   return action->comm.src_data;
1143 }
1144
1145 void* SIMIX_pre_comm_get_dst_data(smx_simcall_t simcall, smx_action_t action){
1146   return SIMIX_comm_get_dst_data(action);
1147 }
1148 /**
1149  *  \brief Return the user data associated to the receiver of the communication
1150  *  \param action The communication
1151  *  \return the user data
1152  */
1153 void* SIMIX_comm_get_dst_data(smx_action_t action)
1154 {
1155   return action->comm.dst_data;
1156 }
1157
1158 smx_process_t SIMIX_pre_comm_get_src_proc(smx_simcall_t simcall, smx_action_t action){
1159   return SIMIX_comm_get_src_proc(action);
1160 }
1161 smx_process_t SIMIX_comm_get_src_proc(smx_action_t action)
1162 {
1163   return action->comm.src_proc;
1164 }
1165
1166 smx_process_t SIMIX_pre_comm_get_dst_proc(smx_simcall_t simcall, smx_action_t action){
1167   return SIMIX_comm_get_dst_proc(action);
1168 }
1169 smx_process_t SIMIX_comm_get_dst_proc(smx_action_t action)
1170 {
1171   return action->comm.dst_proc;
1172 }
1173
1174 #ifdef HAVE_LATENCY_BOUND_TRACKING
1175 int SIMIX_pre_comm_is_latency_bounded(smx_simcall_t simcall, smx_action_t action)
1176 {
1177   return SIMIX_comm_is_latency_bounded(action);
1178 }
1179
1180 /**
1181  *  \brief verify if communication is latency bounded
1182  *  \param comm The communication
1183  */
1184 XBT_INLINE int SIMIX_comm_is_latency_bounded(smx_action_t action)
1185 {
1186   if(!action){
1187     return 0;
1188   }
1189   if (action->comm.surf_comm){
1190     XBT_DEBUG("Getting latency limited for surf_action (%p)", action->comm.surf_comm);
1191     action->latency_limited = surf_workstation_model->get_latency_limited(action->comm.surf_comm);
1192     XBT_DEBUG("Action limited is %d", action->latency_limited);
1193   }
1194   return action->latency_limited;
1195 }
1196 #endif
1197
1198 /******************************************************************************/
1199 /*                    SIMIX_comm_copy_data callbacks                       */
1200 /******************************************************************************/
1201 static void (*SIMIX_comm_copy_data_callback) (smx_action_t, void*, size_t) =
1202   &SIMIX_comm_copy_pointer_callback;
1203
1204 void
1205 SIMIX_comm_set_copy_data_callback(void (*callback) (smx_action_t, void*, size_t))
1206 {
1207   SIMIX_comm_copy_data_callback = callback;
1208 }
1209
1210 void SIMIX_comm_copy_pointer_callback(smx_action_t comm, void* buff, size_t buff_size)
1211 {
1212   xbt_assert((buff_size == sizeof(void *)),
1213              "Cannot copy %zu bytes: must be sizeof(void*)", buff_size);
1214   *(void **) (comm->comm.dst_buff) = buff;
1215 }
1216
1217 void SIMIX_comm_copy_buffer_callback(smx_action_t comm, void* buff, size_t buff_size)
1218 {
1219   XBT_DEBUG("Copy the data over");
1220   memcpy(comm->comm.dst_buff, buff, buff_size);
1221   if (comm->comm.detached) { // if this is a detached send, the source buffer was duplicated by SMPI sender to make the original buffer available to the application ASAP
1222     xbt_free(buff);
1223     comm->comm.src_buff = NULL;
1224   }
1225 }
1226
1227
1228 /**
1229  *  \brief Copy the communication data from the sender's buffer to the receiver's one
1230  *  \param comm The communication
1231  */
1232 void SIMIX_comm_copy_data(smx_action_t comm)
1233 {
1234   size_t buff_size = comm->comm.src_buff_size;
1235   /* If there is no data to be copy then return */
1236   if (!comm->comm.src_buff || !comm->comm.dst_buff || comm->comm.copied)
1237     return;
1238
1239   XBT_DEBUG("Copying comm %p data from %s (%p) -> %s (%p) (%zu bytes)",
1240             comm,
1241             comm->comm.src_proc ? sg_host_name(comm->comm.src_proc->smx_host) : "a finished process",
1242             comm->comm.src_buff,
1243             comm->comm.dst_proc ? sg_host_name(comm->comm.dst_proc->smx_host) : "a finished process",
1244             comm->comm.dst_buff, buff_size);
1245
1246   /* Copy at most dst_buff_size bytes of the message to receiver's buffer */
1247   if (comm->comm.dst_buff_size)
1248     buff_size = MIN(buff_size, *(comm->comm.dst_buff_size));
1249
1250   /* Update the receiver's buffer size to the copied amount */
1251   if (comm->comm.dst_buff_size)
1252     *comm->comm.dst_buff_size = buff_size;
1253
1254   if (buff_size > 0)
1255     SIMIX_comm_copy_data_callback (comm, comm->comm.src_buff, buff_size);
1256
1257   /* Set the copied flag so we copy data only once */
1258   /* (this function might be called from both communication ends) */
1259   comm->comm.copied = 1;
1260 }