Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
Merge remote-tracking branch 'origin/libdw2'
[simgrid.git] / src / simix / smx_network.c
1 /* Copyright (c) 2009-2013. The SimGrid Team.
2  * All rights reserved.                                                     */
3
4 /* This program is free software; you can redistribute it and/or modify it
5  * under the terms of the license (GNU LGPL) which comes with this package. */
6
7 #include "smx_private.h"
8 #include "xbt/log.h"
9 #include "mc/mc.h"
10 #include "xbt/dict.h"
11
/* Logging category of this file: "simix_network", declared as a sub-category of "simix". */
12 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(simix_network, simix,
13                                 "Logging specific to SIMIX (network)");
14
/* Dictionary of the named rendez-vous points, indexed by name. It is created by
 * SIMIX_network_init() (with SIMIX_rdv_free as the function given at creation)
 * and released by SIMIX_network_exit(). */
15 static xbt_dict_t rdv_points = NULL;
/* Counter incremented by SIMIX_comm_new() each time a communication action is
 * created; callers that immediately discard the action they created decrement it again. */
16 XBT_EXPORT_NO_IMPORT(unsigned long int) smx_total_comms = 0;
17
/* Forward declarations of the helpers that are private (static) to this file. */
18 static void SIMIX_waitany_remove_simcall_from_actions(smx_simcall_t simcall);
19 static void SIMIX_comm_copy_data(smx_action_t comm);
20 static smx_action_t SIMIX_comm_new(e_smx_comm_type_t type);
21 static XBT_INLINE void SIMIX_rdv_push(smx_rdv_t rdv, smx_action_t comm);
22 static smx_action_t SIMIX_fifo_probe_comm(xbt_fifo_t fifo, e_smx_comm_type_t type,
23                                         int (*match_fun)(void *, void *,smx_action_t),
24                                         void *user_data, smx_action_t my_action);
25 static smx_action_t SIMIX_fifo_get_comm(xbt_fifo_t fifo, e_smx_comm_type_t type,
26                                         int (*match_fun)(void *, void *,smx_action_t),
27                                         void *user_data, smx_action_t my_action);
28 static void SIMIX_rdv_free(void *data);
29 static void SIMIX_comm_start(smx_action_t action);
30
31 void SIMIX_network_init(void)
32 {
33   rdv_points = xbt_dict_new_homogeneous(SIMIX_rdv_free);
34   if(MC_is_active())
35     MC_ignore_global_variable("smx_total_comms");
36 }
37
38 void SIMIX_network_exit(void)
39 {
40   xbt_dict_free(&rdv_points);
41 }
42
43 /******************************************************************************/
44 /*                           Rendez-Vous Points                               */
45 /******************************************************************************/
46
47 smx_rdv_t SIMIX_pre_rdv_create(smx_simcall_t simcall, const char *name){
48   return SIMIX_rdv_create(name);
49 }
50 smx_rdv_t SIMIX_rdv_create(const char *name)
51 {
52   /* two processes may have pushed the same rdv_create simcall at the same time */
53   smx_rdv_t rdv = name ? xbt_dict_get_or_null(rdv_points, name) : NULL;
54
55   if (!rdv) {
56     rdv = xbt_new0(s_smx_rvpoint_t, 1);
57     rdv->name = name ? xbt_strdup(name) : NULL;
58     rdv->comm_fifo = xbt_fifo_new();
59     rdv->done_comm_fifo = xbt_fifo_new();
60     rdv->permanent_receiver=NULL;
61
62     XBT_DEBUG("Creating a mailbox at %p with name %s\n", rdv, name);
63
64     if (rdv->name)
65       xbt_dict_set(rdv_points, rdv->name, rdv, NULL);
66   }
67   return rdv;
68 }
69
70 void SIMIX_pre_rdv_destroy(smx_simcall_t simcall, smx_rdv_t rdv){
71   return SIMIX_rdv_destroy(rdv);
72 }
73 void SIMIX_rdv_destroy(smx_rdv_t rdv)
74 {
75   if (rdv->name)
76     xbt_dict_remove(rdv_points, rdv->name);
77 }
78
79 void SIMIX_rdv_free(void *data)
80 {
81   XBT_DEBUG("rdv free %p", data);
82   smx_rdv_t rdv = (smx_rdv_t) data;
83   xbt_free(rdv->name);
84   xbt_fifo_free(rdv->comm_fifo);
85   xbt_fifo_free(rdv->done_comm_fifo);
86
87   xbt_free(rdv);  
88 }
89
90 xbt_dict_t SIMIX_get_rdv_points()
91 {
92   return rdv_points;
93 }
94
95 smx_rdv_t SIMIX_pre_rdv_get_by_name(smx_simcall_t simcall, const char *name){
96   return SIMIX_rdv_get_by_name(name);
97 }
98 smx_rdv_t SIMIX_rdv_get_by_name(const char *name)
99 {
100   return xbt_dict_get_or_null(rdv_points, name);
101 }
102
103 int SIMIX_pre_rdv_comm_count_by_host(smx_simcall_t simcall, smx_rdv_t rdv, smx_host_t host){
104   return SIMIX_rdv_comm_count_by_host(rdv, host);
105 }
106 int SIMIX_rdv_comm_count_by_host(smx_rdv_t rdv, smx_host_t host)
107 {
108   smx_action_t comm = NULL;
109   xbt_fifo_item_t item = NULL;
110   int count = 0;
111
112   xbt_fifo_foreach(rdv->comm_fifo, item, comm, smx_action_t) {
113     if (comm->comm.src_proc->smx_host == host)
114       count++;
115   }
116
117   return count;
118 }
119
120 smx_action_t SIMIX_pre_rdv_get_head(smx_simcall_t simcall, smx_rdv_t rdv){
121   return SIMIX_rdv_get_head(rdv);
122 }
123 smx_action_t SIMIX_rdv_get_head(smx_rdv_t rdv)
124 {
125   return xbt_fifo_get_item_content(xbt_fifo_get_first_item(rdv->comm_fifo));
126 }
127
128 smx_process_t SIMIX_pre_rdv_get_receiver(smx_simcall_t simcall, smx_rdv_t rdv){
129   return SIMIX_rdv_get_receiver(rdv);
130 }
131 /**
132  *  \brief get the receiver (process associated to the mailbox)
133  *  \param rdv The rendez-vous point
134  *  \return process The receiving process (NULL if not set)
135  */
136 smx_process_t SIMIX_rdv_get_receiver(smx_rdv_t rdv)
137 {
138   return rdv->permanent_receiver;
139 }
140
141 void SIMIX_pre_rdv_set_receiver(smx_simcall_t simcall, smx_rdv_t rdv,
142                             smx_process_t process){
143   SIMIX_rdv_set_receiver(rdv, process);
144 }
145 /**
146  *  \brief set the receiver of the rendez vous point to allow eager sends
147  *  \param rdv The rendez-vous point
148  *  \param process The receiving process
149  */
150 void SIMIX_rdv_set_receiver(smx_rdv_t rdv, smx_process_t process)
151 {
152   rdv->permanent_receiver=process;
153 }
154
155 /**
156  *  \brief Pushes a communication action into a rendez-vous point
157  *  \param rdv The rendez-vous point
158  *  \param comm The communication action
159  */
160 static XBT_INLINE void SIMIX_rdv_push(smx_rdv_t rdv, smx_action_t comm)
161 {
162   xbt_fifo_push(rdv->comm_fifo, comm);
163   comm->comm.rdv = rdv;
164 }
165
166 /**
167  *  \brief Removes a communication action from a rendez-vous point
168  *  \param rdv The rendez-vous point
169  *  \param comm The communication action
170  */
171 XBT_INLINE void SIMIX_rdv_remove(smx_rdv_t rdv, smx_action_t comm)
172 {
173   xbt_fifo_remove(rdv->comm_fifo, comm);
174   comm->comm.rdv = NULL;
175 }
176
177 /**
178  *  \brief Checks if there is a communication action queued in a fifo matching our needs
179  *  \param type The type of communication we are looking for (comm_send, comm_recv)
180  *  \return The communication action if found, NULL otherwise
181  */
182 smx_action_t SIMIX_fifo_get_comm(xbt_fifo_t fifo, e_smx_comm_type_t type,
183                                  int (*match_fun)(void *, void *,smx_action_t),
184                                  void *this_user_data, smx_action_t my_action)
185 {
186   smx_action_t action;
187   xbt_fifo_item_t item;
188   void* other_user_data = NULL;
189
190   xbt_fifo_foreach(fifo, item, action, smx_action_t) {
191     if (action->comm.type == SIMIX_COMM_SEND) {
192       other_user_data = action->comm.src_data;
193     } else if (action->comm.type == SIMIX_COMM_RECEIVE) {
194       other_user_data = action->comm.dst_data;
195     }
196     if (action->comm.type == type &&
197         (!match_fun              ||              match_fun(this_user_data,  other_user_data, action)) &&
198         (!action->comm.match_fun || action->comm.match_fun(other_user_data, this_user_data,  my_action))) {
199       XBT_DEBUG("Found a matching communication action %p", action);
200       xbt_fifo_remove_item(fifo, item);
201       xbt_fifo_free_item(item);
202       action->comm.refcount++;
203 #ifdef HAVE_MC
204       action->comm.rdv_cpy = action->comm.rdv;
205 #endif
206       action->comm.rdv = NULL;
207       return action;
208     }
209     XBT_DEBUG("Sorry, communication action %p does not match our needs:"
210               " its type is %d but we are looking for a comm of type %d (or maybe the filtering didn't match)",
211               action, (int)action->comm.type, (int)type);
212   }
213   XBT_DEBUG("No matching communication action found");
214   return NULL;
215 }
216
217
218 /**
219  *  \brief Checks if there is a communication action queued in a fifo matching our needs, but leave it there
220  *  \param type The type of communication we are looking for (comm_send, comm_recv)
221  *  \return The communication action if found, NULL otherwise
222  */
223 smx_action_t SIMIX_fifo_probe_comm(xbt_fifo_t fifo, e_smx_comm_type_t type,
224                                  int (*match_fun)(void *, void *,smx_action_t),
225                                  void *this_user_data, smx_action_t my_action)
226 {
227   smx_action_t action;
228   xbt_fifo_item_t item;
229   void* other_user_data = NULL;
230
231   xbt_fifo_foreach(fifo, item, action, smx_action_t) {
232     if (action->comm.type == SIMIX_COMM_SEND) {
233       other_user_data = action->comm.src_data;
234     } else if (action->comm.type == SIMIX_COMM_RECEIVE) {
235       other_user_data = action->comm.dst_data;
236     }
237     if (action->comm.type == type &&
238         (!match_fun              ||              match_fun(this_user_data,  other_user_data, action)) &&
239         (!action->comm.match_fun || action->comm.match_fun(other_user_data, this_user_data,  my_action))) {
240       XBT_DEBUG("Found a matching communication action %p", action);
241       action->comm.refcount++;
242
243       return action;
244     }
245     XBT_DEBUG("Sorry, communication action %p does not match our needs:"
246               " its type is %d but we are looking for a comm of type %d (or maybe the filtering didn't match)",
247               action, (int)action->comm.type, (int)type);
248   }
249   XBT_DEBUG("No matching communication action found");
250   return NULL;
251 }
252 /******************************************************************************/
253 /*                            Communication Actions                            */
254 /******************************************************************************/
255
256 /**
257  *  \brief Creates a new communicate action
258  *  \param type The direction of communication (comm_send, comm_recv)
259  *  \return The new communicate action
260  */
261 smx_action_t SIMIX_comm_new(e_smx_comm_type_t type)
262 {
263   smx_action_t act;
264
265   /* alloc structures */
266   act = xbt_mallocator_get(simix_global->action_mallocator);
267
268   act->type = SIMIX_ACTION_COMMUNICATE;
269   act->state = SIMIX_WAITING;
270
271   /* set communication */
272   act->comm.type = type;
273   act->comm.refcount = 1;
274   act->comm.src_data=NULL;
275   act->comm.dst_data=NULL;
276
277
278 #ifdef HAVE_LATENCY_BOUND_TRACKING
279   //initialize with unknown value
280   act->latency_limited = -1;
281 #endif
282
283 #ifdef HAVE_TRACING
284   act->category = NULL;
285 #endif
286
287   XBT_DEBUG("Create communicate action %p", act);
288   ++smx_total_comms;
289
290   return act;
291 }
292
293 void SIMIX_pre_comm_destroy(smx_simcall_t simcall, smx_action_t action){
294   SIMIX_comm_destroy(action);
295 }
296 /**
297  *  \brief Destroy a communicate action
298  *  \param action The communicate action to be destroyed
299  */
300 void SIMIX_comm_destroy(smx_action_t action)
301 {
302   XBT_DEBUG("Destroy action %p (refcount: %d), state: %d",
303             action, action->comm.refcount, (int)action->state);
304
305   if (action->comm.refcount <= 0) {
306     xbt_backtrace_display_current();
307     xbt_die("The refcount of comm %p is already 0 before decreasing it. "
308             "That's a bug! If you didn't test and/or wait the same communication twice in your code, then the bug is SimGrid's...", action);
309   }
310   action->comm.refcount--;
311   if (action->comm.refcount > 0)
312       return;
313   XBT_DEBUG("Really free communication %p; refcount is now %d", action,
314             action->comm.refcount);
315
316 #ifdef HAVE_LATENCY_BOUND_TRACKING
317   action->latency_limited = SIMIX_comm_is_latency_bounded( action ) ;
318 #endif
319
320   xbt_free(action->name);
321   SIMIX_comm_destroy_internal_actions(action);
322
323   if (action->comm.detached && action->state != SIMIX_DONE) {
324     /* the communication has failed and was detached:
325      * we have to free the buffer */
326     if (action->comm.clean_fun) {
327       action->comm.clean_fun(action->comm.src_buff);
328     }
329     action->comm.src_buff = NULL;
330   }
331
332   if(action->comm.rdv)
333     SIMIX_rdv_remove(action->comm.rdv, action);
334
335   xbt_mallocator_release(simix_global->action_mallocator, action);
336 }
337
338 void SIMIX_comm_destroy_internal_actions(smx_action_t action)
339 {
340   if (action->comm.surf_comm){
341 #ifdef HAVE_LATENCY_BOUND_TRACKING
342     action->latency_limited = SIMIX_comm_is_latency_bounded(action);
343 #endif
344     action->comm.surf_comm->model_type->action_unref(action->comm.surf_comm);
345     action->comm.surf_comm = NULL;
346   }
347
348   if (action->comm.src_timeout){
349     action->comm.src_timeout->model_type->action_unref(action->comm.src_timeout);
350     action->comm.src_timeout = NULL;
351   }
352
353   if (action->comm.dst_timeout){
354     action->comm.dst_timeout->model_type->action_unref(action->comm.dst_timeout);
355     action->comm.dst_timeout = NULL;
356   }
357 }
358
359 void SIMIX_pre_comm_send(smx_simcall_t simcall, smx_rdv_t rdv,
360                                   double task_size, double rate,
361                                   void *src_buff, size_t src_buff_size,
362                                   int (*match_fun)(void *, void *,smx_action_t),
363                                   void *data, double timeout){
364   smx_action_t comm = SIMIX_comm_isend(simcall->issuer, rdv, task_size, rate,
365                                        src_buff, src_buff_size, match_fun, NULL,
366                                        data, 0);
367   simcall->mc_value = 0;
368   SIMIX_pre_comm_wait(simcall, comm, timeout);
369 }
370 smx_action_t SIMIX_pre_comm_isend(smx_simcall_t simcall, smx_rdv_t rdv,
371                                   double task_size, double rate,
372                                   void *src_buff, size_t src_buff_size,
373                                   int (*match_fun)(void *, void *,smx_action_t),
374                                   void (*clean_fun)(void *), 
375                                   void *data, int detached){
376   return SIMIX_comm_isend(simcall->issuer, rdv, task_size, rate, src_buff,
377                           src_buff_size, match_fun, clean_fun, data, detached);
378
379 }
/**
 *  \brief Posts an asynchronous send on a rendez-vous point
 *
 *  A send action is created, then a matching receive is searched in the
 *  comm_fifo of the rendez-vous point. When none is found, the new action is
 *  either queued in the rendez-vous point or, if the rendez-vous point has a
 *  permanent receiver, marked SIMIX_READY and stored in done_comm_fifo. When
 *  a receive is found, the new action is destroyed and the receive action
 *  becomes the communication. In every case the communication is added to
 *  the comms of the sender and filled with the sender-side information.
 *
 *  \param src_proc The sending process
 *  \param rdv The rendez-vous point
 *  \param task_size Stored in the task_size field of the communication
 *  \param rate Stored in the rate field of the communication
 *  \param src_buff Stored as source buffer of the communication
 *  \param src_buff_size Stored as size of the source buffer
 *  \param match_fun Filter of the sender (may be NULL); also stored in the communication
 *  \param clean_fun Only stored when \a detached is set; SIMIX_comm_destroy()
 *                   calls it on the source buffer of a detached communication
 *                   that did not reach SIMIX_DONE
 *  \param data User data of the sender, stored as src_data
 *  \param detached Non-zero to detach the communication
 *  \return The communication, or NULL for a detached one. When the model
 *          checker is active the communication is always returned, even if detached.
 */
380 smx_action_t SIMIX_comm_isend(smx_process_t src_proc, smx_rdv_t rdv,
381                               double task_size, double rate,
382                               void *src_buff, size_t src_buff_size,
383                               int (*match_fun)(void *, void *,smx_action_t),
384                               void (*clean_fun)(void *), // used to free the action in case of problem after a detached send
385                               void *data,
386                               int detached)
387 {
388   XBT_DEBUG("send from %p\n", rdv);
389
390   /* Prepare an action describing us, so that it gets passed to the user-provided filter of other side */
391   smx_action_t this_action = SIMIX_comm_new(SIMIX_COMM_SEND);
392
393   /* Look for communication action matching our needs. We also provide a description of
394    * ourself so that the other side also gets a chance of choosing if it wants to match with us.
395    *
396    * If it is not found then push our communication into the rendez-vous point */
397   smx_action_t other_action = SIMIX_fifo_get_comm(rdv->comm_fifo, SIMIX_COMM_RECEIVE, match_fun, data, this_action);
398
399   if (!other_action) {
  /* no receive is waiting: our own action becomes the communication */
400     other_action = this_action;
401
402     if (rdv->permanent_receiver!=NULL){
403       //this mailbox is for small messages, which have to be sent right now
404       other_action->state = SIMIX_READY;
405       other_action->comm.dst_proc=rdv->permanent_receiver;
  /* NOTE(review): presumably the reference held on behalf of the permanent
   * receiver, since the action is stored in done_comm_fifo right after -- confirm */
406       other_action->comm.refcount++;
407       xbt_fifo_push(rdv->done_comm_fifo,other_action);
408       other_action->comm.rdv=rdv;
409       XBT_DEBUG("pushing a message into the permanent receive fifo %p, comm %p \n", rdv, &(other_action->comm));
410
411     }else{
412       SIMIX_rdv_push(rdv, this_action);
413     }
414   } else {
415     XBT_DEBUG("Receive already pushed\n");
416
  /* the action of the receiver is used from now on: ours is dropped, and
   * the increment done by SIMIX_comm_new() is undone */
417     SIMIX_comm_destroy(this_action);
418     --smx_total_comms; // this creation was a pure waste
419
420     other_action->state = SIMIX_READY;
421     other_action->comm.type = SIMIX_COMM_READY;
422
423   }
  /* record the communication in the comms of the sender */
424   xbt_fifo_push(src_proc->comms, other_action);
425
426   /* if the communication action is detached then decrease the refcount
427    * by one, so it will be eliminated by the receiver's destroy call */
428   if (detached) {
429     other_action->comm.detached = 1;
430     other_action->comm.refcount--;
431     other_action->comm.clean_fun = clean_fun;
432   } else {
433     other_action->comm.clean_fun = NULL;
434   }
435
436   /* Setup the communication action */
437   other_action->comm.src_proc = src_proc;
438   other_action->comm.task_size = task_size;
439   other_action->comm.rate = rate;
440   other_action->comm.src_buff = src_buff;
441   other_action->comm.src_buff_size = src_buff_size;
442   other_action->comm.src_data = data;
443
444   other_action->comm.match_fun = match_fun;
445
  /* with the model checker, the communication is not started here: it is
   * only flagged as running, and returned whatever the value of detached */
446   if (MC_is_active()) {
447     other_action->state = SIMIX_RUNNING;
448     return other_action;
449   }
450
451   SIMIX_comm_start(other_action);
452   return (detached ? NULL : other_action);
453 }
454
455 void SIMIX_pre_comm_recv(smx_simcall_t simcall, smx_rdv_t rdv,
456                                   void *dst_buff, size_t *dst_buff_size,
457                                   int (*match_fun)(void *, void *, smx_action_t),
458                                   void *data, double timeout){
459   smx_action_t comm = SIMIX_comm_irecv(simcall->issuer, rdv, dst_buff,
460                                        dst_buff_size, match_fun, data);
461   simcall->mc_value = 0;
462   SIMIX_pre_comm_wait(simcall, comm, timeout);
463 }
464
465 void SIMIX_pre_comm_recv_bounded(smx_simcall_t simcall, smx_rdv_t rdv,
466                                   void *dst_buff, size_t *dst_buff_size,
467                                   int (*match_fun)(void *, void *, smx_action_t),
468                                   void *data, double timeout, double rate){
469   smx_action_t comm = SIMIX_comm_irecv_bounded(simcall->issuer, rdv, dst_buff,
470                                        dst_buff_size, match_fun, data, rate);
471   simcall->mc_value = 0;
472   SIMIX_pre_comm_wait(simcall, comm, timeout);
473 }
474
475 smx_action_t SIMIX_pre_comm_irecv(smx_simcall_t simcall, smx_rdv_t rdv,
476                                   void *dst_buff, size_t *dst_buff_size,
477                                   int (*match_fun)(void *, void *, smx_action_t),
478                                   void *data){
479   return SIMIX_comm_irecv(simcall->issuer, rdv, dst_buff, dst_buff_size,
480                           match_fun, data);
481 }
482
483 smx_action_t SIMIX_comm_irecv(smx_process_t dst_proc, smx_rdv_t rdv,
484                               void *dst_buff, size_t *dst_buff_size,
485                               int (*match_fun)(void *, void *, smx_action_t), void *data)
486 {
487   XBT_DEBUG("recv from %p %p\n", rdv, rdv->comm_fifo);
488   smx_action_t this_action = SIMIX_comm_new(SIMIX_COMM_RECEIVE);
489
490   smx_action_t other_action;
491   //communication already done, get it inside the fifo of completed comms
492   //permanent receive v1
493   //int already_received=0;
494   if(rdv->permanent_receiver && xbt_fifo_size(rdv->done_comm_fifo)!=0){
495
496     XBT_DEBUG("We have a comm that has probably already been received, trying to match it, to skip the communication\n");
497     //find a match in the already received fifo
498     other_action = SIMIX_fifo_get_comm(rdv->done_comm_fifo, SIMIX_COMM_SEND, match_fun, data, this_action);
499     //if not found, assume the receiver came first, register it to the mailbox in the classical way
500     if (!other_action)  {
501       XBT_DEBUG("We have messages in the permanent receive list, but not the one we are looking for, pushing request into fifo\n");
502       other_action = this_action;
503       SIMIX_rdv_push(rdv, this_action);
504     }else{
505       if(other_action->comm.surf_comm &&        SIMIX_comm_get_remains(other_action)==0.0)
506       {
507         XBT_DEBUG("comm %p has been already sent, and is finished, destroy it\n",&(other_action->comm));
508         other_action->state = SIMIX_DONE;
509         other_action->comm.type = SIMIX_COMM_DONE;
510         other_action->comm.rdv = NULL;
511         //SIMIX_comm_destroy(this_action);
512         //--smx_total_comms; // this creation was a pure waste
513         //already_received=1;
514         //other_action->comm.refcount--;
515       }/*else{
516          XBT_DEBUG("Not yet finished, we have to wait %d\n", xbt_fifo_size(rdv->comm_fifo));
517          }*/
518       other_action->comm.refcount--;
519       SIMIX_comm_destroy(this_action);
520       --smx_total_comms; // this creation was a pure waste
521     }
522   }else{
523     /* Prepare an action describing us, so that it gets passed to the user-provided filter of other side */
524
525     /* Look for communication action matching our needs. We also provide a description of
526      * ourself so that the other side also gets a chance of choosing if it wants to match with us.
527      *
528      * If it is not found then push our communication into the rendez-vous point */
529     other_action = SIMIX_fifo_get_comm(rdv->comm_fifo, SIMIX_COMM_SEND, match_fun, data, this_action);
530
531     if (!other_action) {
532       XBT_DEBUG("Receive pushed first %d\n", xbt_fifo_size(rdv->comm_fifo));
533       other_action = this_action;
534       SIMIX_rdv_push(rdv, this_action);
535     } else {
536       SIMIX_comm_destroy(this_action);
537       --smx_total_comms; // this creation was a pure waste
538       other_action->state = SIMIX_READY;
539       other_action->comm.type = SIMIX_COMM_READY;
540       //other_action->comm.refcount--;
541     }
542     xbt_fifo_push(dst_proc->comms, other_action);
543   }
544
545   /* Setup communication action */
546   other_action->comm.dst_proc = dst_proc;
547   other_action->comm.dst_buff = dst_buff;
548   other_action->comm.dst_buff_size = dst_buff_size;
549   other_action->comm.dst_data = data;
550
551   other_action->comm.match_fun = match_fun;
552
553
554   /*if(already_received)//do the actual copy, because the first one after the comm didn't have all the info
555     SIMIX_comm_copy_data(other_action);*/
556
557
558   if (MC_is_active()) {
559     other_action->state = SIMIX_RUNNING;
560     return other_action;
561   }
562
563   SIMIX_comm_start(other_action);
564   // }
565   return other_action;
566 }
567
568 smx_action_t SIMIX_pre_comm_irecv_bounded(smx_simcall_t simcall, smx_rdv_t rdv,
569                                   void *dst_buff, size_t *dst_buff_size,
570                                   int (*match_fun)(void *, void *, smx_action_t),
571                                   void *data, double rate){
572   return SIMIX_comm_irecv_bounded(simcall->issuer, rdv, dst_buff, dst_buff_size,
573                           match_fun, data, rate);
574 }
575
576 smx_action_t SIMIX_comm_irecv_bounded(smx_process_t dst_proc, smx_rdv_t rdv,
577                               void *dst_buff, size_t *dst_buff_size,
578                               int (*match_fun)(void *, void *, smx_action_t), void *data, double rate)
579 {
580   XBT_DEBUG("recv from %p %p\n", rdv, rdv->comm_fifo);
581   smx_action_t this_action = SIMIX_comm_new(SIMIX_COMM_RECEIVE);
582
583   smx_action_t other_action;
584   //communication already done, get it inside the fifo of completed comms
585   //permanent receive v1
586   //int already_received=0;
587   if(rdv->permanent_receiver && xbt_fifo_size(rdv->done_comm_fifo)!=0){
588
589     XBT_DEBUG("We have a comm that has probably already been received, trying to match it, to skip the communication\n");
590     //find a match in the already received fifo
591     other_action = SIMIX_fifo_get_comm(rdv->done_comm_fifo, SIMIX_COMM_SEND, match_fun, data, this_action);
592     //if not found, assume the receiver came first, register it to the mailbox in the classical way
593     if (!other_action)  {
594       XBT_DEBUG("We have messages in the permanent receive list, but not the one we are looking for, pushing request into fifo\n");
595       other_action = this_action;
596       SIMIX_rdv_push(rdv, this_action);
597     }else{
598       if(other_action->comm.surf_comm &&        SIMIX_comm_get_remains(other_action)==0.0)
599       {
600         XBT_DEBUG("comm %p has been already sent, and is finished, destroy it\n",&(other_action->comm));
601         other_action->state = SIMIX_DONE;
602         other_action->comm.type = SIMIX_COMM_DONE;
603         other_action->comm.rdv = NULL;
604         //SIMIX_comm_destroy(this_action);
605         //--smx_total_comms; // this creation was a pure waste
606         //already_received=1;
607         //other_action->comm.refcount--;
608       }/*else{
609          XBT_DEBUG("Not yet finished, we have to wait %d\n", xbt_fifo_size(rdv->comm_fifo));
610          }*/
611       other_action->comm.refcount--;
612       SIMIX_comm_destroy(this_action);
613       --smx_total_comms; // this creation was a pure waste
614     }
615   }else{
616     /* Prepare an action describing us, so that it gets passed to the user-provided filter of other side */
617
618     /* Look for communication action matching our needs. We also provide a description of
619      * ourself so that the other side also gets a chance of choosing if it wants to match with us.
620      *
621      * If it is not found then push our communication into the rendez-vous point */
622     other_action = SIMIX_fifo_get_comm(rdv->comm_fifo, SIMIX_COMM_SEND, match_fun, data, this_action);
623
624     if (!other_action) {
625       XBT_DEBUG("Receive pushed first %d\n", xbt_fifo_size(rdv->comm_fifo));
626       other_action = this_action;
627       SIMIX_rdv_push(rdv, this_action);
628     } else {
629       SIMIX_comm_destroy(this_action);
630       --smx_total_comms; // this creation was a pure waste
631       other_action->state = SIMIX_READY;
632       other_action->comm.type = SIMIX_COMM_READY;
633       //other_action->comm.refcount--;
634     }
635     xbt_fifo_push(dst_proc->comms, other_action);
636   }
637
638   /* Setup communication action */
639   other_action->comm.dst_proc = dst_proc;
640   other_action->comm.dst_buff = dst_buff;
641   other_action->comm.dst_buff_size = dst_buff_size;
642   other_action->comm.dst_data = data;
643
644   if (rate < other_action->comm.rate || other_action->comm.rate == -1.0)
645           other_action->comm.rate = rate;
646
647   other_action->comm.match_fun = match_fun;
648
649
650   /*if(already_received)//do the actual copy, because the first one after the comm didn't have all the info
651     SIMIX_comm_copy_data(other_action);*/
652
653
654   if (MC_is_active()) {
655     other_action->state = SIMIX_RUNNING;
656     return other_action;
657   }
658
659   SIMIX_comm_start(other_action);
660   // }
661   return other_action;
662 }
663
664 smx_action_t SIMIX_pre_comm_iprobe(smx_simcall_t simcall, smx_rdv_t rdv,
665                                    int src, int tag,
666                                    int (*match_fun)(void *, void *, smx_action_t),
667                                    void *data){
668   return SIMIX_comm_iprobe(simcall->issuer, rdv, src, tag, match_fun, data);
669 }
670
671 smx_action_t SIMIX_comm_iprobe(smx_process_t dst_proc, smx_rdv_t rdv, int src,
672                               int tag, int (*match_fun)(void *, void *, smx_action_t), void *data)
673 {
674   XBT_DEBUG("iprobe from %p %p\n", rdv, rdv->comm_fifo);
675   smx_action_t this_action = SIMIX_comm_new(SIMIX_COMM_RECEIVE);
676
677   smx_action_t other_action=NULL;
678   if(rdv->permanent_receiver && xbt_fifo_size(rdv->done_comm_fifo)!=0){
679     //find a match in the already received fifo
680       XBT_DEBUG("first try in the perm recv mailbox \n");
681
682     other_action = SIMIX_fifo_probe_comm(rdv->done_comm_fifo, SIMIX_COMM_SEND, match_fun, data, this_action);
683   }
684  // }else{
685     if(!other_action){
686         XBT_DEBUG("second try in the other mailbox");
687         other_action = SIMIX_fifo_probe_comm(rdv->comm_fifo, SIMIX_COMM_SEND, match_fun, data, this_action);
688     }
689 //  }
690   if(other_action)other_action->comm.refcount--;
691
692   SIMIX_comm_destroy(this_action);
693   --smx_total_comms;
694   return other_action;
695 }
696
697 void SIMIX_pre_comm_wait(smx_simcall_t simcall, smx_action_t action, double timeout)
698 {
699   /* the simcall may be a wait, a send or a recv */
700   surf_action_t sleep;
701
702   /* Associate this simcall to the wait action */
703   XBT_DEBUG("SIMIX_pre_comm_wait, %p", action);
704
705   xbt_fifo_push(action->simcalls, simcall);
706   simcall->issuer->waiting_action = action;
707
708   if (MC_is_active()) {
709     int idx = simcall->mc_value;
710     if (idx == 0) {
711       action->state = SIMIX_DONE;
712     } else {
713       /* If we reached this point, the wait simcall must have a timeout */
714       /* Otherwise it shouldn't be enabled and executed by the MC */
715       if (timeout == -1)
716         THROW_IMPOSSIBLE;
717
718       if (action->comm.src_proc == simcall->issuer)
719         action->state = SIMIX_SRC_TIMEOUT;
720       else
721         action->state = SIMIX_DST_TIMEOUT;
722     }
723
724     SIMIX_comm_finish(action);
725     return;
726   }
727
728   /* If the action has already finish perform the error handling, */
729   /* otherwise set up a waiting timeout on the right side         */
730   if (action->state != SIMIX_WAITING && action->state != SIMIX_RUNNING) {
731     SIMIX_comm_finish(action);
732   } else { /* if (timeout >= 0) { we need a surf sleep action even when there is no timeout, otherwise surf won't tell us when the host fails */
733     sleep = surf_workstation_model->extension.workstation.sleep(simcall->issuer->smx_host, timeout);
734     surf_workstation_model->action_data_set(sleep, action);
735
736     if (simcall->issuer == action->comm.src_proc)
737       action->comm.src_timeout = sleep;
738     else
739       action->comm.dst_timeout = sleep;
740   }
741 }
742
743 void SIMIX_pre_comm_test(smx_simcall_t simcall, smx_action_t action)
744 {
745   if(MC_is_active()){
746     simcall_comm_test__set__result(simcall, action->comm.src_proc && action->comm.dst_proc);
747     if(simcall_comm_test__get__result(simcall)){
748       action->state = SIMIX_DONE;
749       xbt_fifo_push(action->simcalls, simcall);
750       SIMIX_comm_finish(action);
751     }else{
752       SIMIX_simcall_answer(simcall);
753     }
754     return;
755   }
756
757   simcall_comm_test__set__result(simcall, (action->state != SIMIX_WAITING && action->state != SIMIX_RUNNING));
758   if (simcall_comm_test__get__result(simcall)) {
759     xbt_fifo_push(action->simcalls, simcall);
760     SIMIX_comm_finish(action);
761   } else {
762     SIMIX_simcall_answer(simcall);
763   }
764 }
765
766 void SIMIX_pre_comm_testany(smx_simcall_t simcall, xbt_dynar_t actions)
767 {
768   unsigned int cursor;
769   smx_action_t action;
770   simcall_comm_testany__set__result(simcall, -1);
771
772   if (MC_is_active()){
773     int idx = simcall->mc_value;
774     if(idx == -1){
775       SIMIX_simcall_answer(simcall);
776     }else{
777       action = xbt_dynar_get_as(actions, idx, smx_action_t);
778       simcall_comm_testany__set__result(simcall, idx);
779       xbt_fifo_push(action->simcalls, simcall);
780       action->state = SIMIX_DONE;
781       SIMIX_comm_finish(action);
782     }
783     return;
784   }
785
786   xbt_dynar_foreach(simcall_comm_testany__get__comms(simcall), cursor,action) {
787     if (action->state != SIMIX_WAITING && action->state != SIMIX_RUNNING) {
788       simcall_comm_testany__set__result(simcall, cursor);
789       xbt_fifo_push(action->simcalls, simcall);
790       SIMIX_comm_finish(action);
791       return;
792     }
793   }
794   SIMIX_simcall_answer(simcall);
795 }
796
797 void SIMIX_pre_comm_waitany(smx_simcall_t simcall, xbt_dynar_t actions)
798 {
799   smx_action_t action;
800   unsigned int cursor = 0;
801
802   if (MC_is_active()){
803     int idx = simcall->mc_value;
804     action = xbt_dynar_get_as(actions, idx, smx_action_t);
805     xbt_fifo_push(action->simcalls, simcall);
806     simcall_comm_waitany__set__result(simcall, idx);
807     action->state = SIMIX_DONE;
808     SIMIX_comm_finish(action);
809     return;
810   }
811
812   xbt_dynar_foreach(actions, cursor, action){
813     /* associate this simcall to the the action */
814     xbt_fifo_push(action->simcalls, simcall);
815
816     /* see if the action is already finished */
817     if (action->state != SIMIX_WAITING && action->state != SIMIX_RUNNING){
818       SIMIX_comm_finish(action);
819       break;
820     }
821   }
822 }
823
824 void SIMIX_waitany_remove_simcall_from_actions(smx_simcall_t simcall)
825 {
826   smx_action_t action;
827   unsigned int cursor = 0;
828   xbt_dynar_t actions = simcall_comm_waitany__get__comms(simcall);
829
830   xbt_dynar_foreach(actions, cursor, action) {
831     xbt_fifo_remove(action->simcalls, simcall);
832   }
833 }
834
835 /**
836  *  \brief Starts the simulation of a communication action.
837  *  \param action the communication action
838  */
839 static XBT_INLINE void SIMIX_comm_start(smx_action_t action)
840 {
841   /* If both the sender and the receiver are already there, start the communication */
842   if (action->state == SIMIX_READY) {
843
844     smx_host_t sender = action->comm.src_proc->smx_host;
845     smx_host_t receiver = action->comm.dst_proc->smx_host;
846
847     XBT_DEBUG("Starting communication %p from '%s' to '%s'", action,
848               SIMIX_host_get_name(sender), SIMIX_host_get_name(receiver));
849
850     action->comm.surf_comm = surf_workstation_model->extension.workstation.
851       communicate(sender, receiver, action->comm.task_size, action->comm.rate);
852
853     surf_workstation_model->action_data_set(action->comm.surf_comm, action);
854
855     action->state = SIMIX_RUNNING;
856
857     /* If a link is failed, detect it immediately */
858     if (surf_workstation_model->action_state_get(action->comm.surf_comm) == SURF_ACTION_FAILED) {
859       XBT_DEBUG("Communication from '%s' to '%s' failed to start because of a link failure",
860                 SIMIX_host_get_name(sender), SIMIX_host_get_name(receiver));
861       action->state = SIMIX_LINK_FAILURE;
862       SIMIX_comm_destroy_internal_actions(action);
863     }
864
865     /* If any of the process is suspend, create the action but stop its execution,
866        it will be restarted when the sender process resume */
867     if (SIMIX_process_is_suspended(action->comm.src_proc) ||
868         SIMIX_process_is_suspended(action->comm.dst_proc)) {
869       /* FIXME: check what should happen with the action state */
870
871       if (SIMIX_process_is_suspended(action->comm.src_proc))
872         XBT_DEBUG("The communication is suspended on startup because src (%s:%s) were suspended since it initiated the communication",
873                   SIMIX_host_get_name(action->comm.src_proc->smx_host), action->comm.src_proc->name);
874       else
875         XBT_DEBUG("The communication is suspended on startup because dst (%s:%s) were suspended since it initiated the communication",
876                   SIMIX_host_get_name(action->comm.dst_proc->smx_host), action->comm.dst_proc->name);
877
878       surf_workstation_model->suspend(action->comm.surf_comm);
879
880     }
881   }
882 }
883
884 /**
885  * \brief Answers the SIMIX simcalls associated to a communication action.
886  * \param action a finished communication action
887  */
888 void SIMIX_comm_finish(smx_action_t action)
889 {
890   unsigned int destroy_count = 0;
891   smx_simcall_t simcall;
892
893   while ((simcall = xbt_fifo_shift(action->simcalls))) {
894
895     /* If a waitany simcall is waiting for this action to finish, then remove
896        it from the other actions in the waitany list. Afterwards, get the
897        position of the actual action in the waitany dynar and
898        return it as the result of the simcall */
899     if (simcall->call == SIMCALL_COMM_WAITANY) {
900       SIMIX_waitany_remove_simcall_from_actions(simcall);
901       if (!MC_is_active())
902         simcall_comm_waitany__set__result(simcall, xbt_dynar_search(simcall_comm_waitany__get__comms(simcall), &action));
903     }
904
905     /* If the action is still in a rendez-vous point then remove from it */
906     if (action->comm.rdv)
907       SIMIX_rdv_remove(action->comm.rdv, action);
908
909     XBT_DEBUG("SIMIX_comm_finish: action state = %d", (int)action->state);
910
911     /* Check out for errors */
912     switch (action->state) {
913
914     case SIMIX_DONE:
915       XBT_DEBUG("Communication %p complete!", action);
916       SIMIX_comm_copy_data(action);
917       break;
918
919     case SIMIX_SRC_TIMEOUT:
920       SMX_EXCEPTION(simcall->issuer, timeout_error, 0,
921                     "Communication timeouted because of sender");
922       break;
923
924     case SIMIX_DST_TIMEOUT:
925       SMX_EXCEPTION(simcall->issuer, timeout_error, 0,
926                     "Communication timeouted because of receiver");
927       break;
928
929     case SIMIX_SRC_HOST_FAILURE:
930       if (simcall->issuer == action->comm.src_proc)
931         simcall->issuer->context->iwannadie = 1;
932 //          SMX_EXCEPTION(simcall->issuer, host_error, 0, "Host failed");
933       else
934         SMX_EXCEPTION(simcall->issuer, network_error, 0, "Remote peer failed");
935       break;
936
937     case SIMIX_DST_HOST_FAILURE:
938       if (simcall->issuer == action->comm.dst_proc)
939         simcall->issuer->context->iwannadie = 1;
940 //          SMX_EXCEPTION(simcall->issuer, host_error, 0, "Host failed");
941       else
942         SMX_EXCEPTION(simcall->issuer, network_error, 0, "Remote peer failed");
943       break;
944
945     case SIMIX_LINK_FAILURE:
946       XBT_DEBUG("Link failure in action %p between '%s' and '%s': posting an exception to the issuer: %s (%p) detached:%d",
947                 action,
948                 action->comm.src_proc ? sg_host_name(action->comm.src_proc->smx_host) : NULL,
949                 action->comm.dst_proc ? sg_host_name(action->comm.dst_proc->smx_host) : NULL,
950                 simcall->issuer->name, simcall->issuer, action->comm.detached);
951       if (action->comm.src_proc == simcall->issuer) {
952         XBT_DEBUG("I'm source");
953       } else if (action->comm.dst_proc == simcall->issuer) {
954         XBT_DEBUG("I'm dest");
955       } else {
956         XBT_DEBUG("I'm neither source nor dest");
957       }
958       SMX_EXCEPTION(simcall->issuer, network_error, 0, "Link failure");
959       break;
960
961     case SIMIX_CANCELED:
962       if (simcall->issuer == action->comm.dst_proc)
963         SMX_EXCEPTION(simcall->issuer, cancel_error, 0,
964                       "Communication canceled by the sender");
965       else
966         SMX_EXCEPTION(simcall->issuer, cancel_error, 0,
967                       "Communication canceled by the receiver");
968       break;
969
970     default:
971       xbt_die("Unexpected action state in SIMIX_comm_finish: %d", (int)action->state);
972     }
973
974     /* if there is an exception during a waitany or a testany, indicate the position of the failed communication */
975     if (simcall->issuer->doexception) {
976       if (simcall->call == SIMCALL_COMM_WAITANY) {
977         simcall->issuer->running_ctx->exception.value = xbt_dynar_search(simcall_comm_waitany__get__comms(simcall), &action);
978       }
979       else if (simcall->call == SIMCALL_COMM_TESTANY) {
980         simcall->issuer->running_ctx->exception.value = xbt_dynar_search(simcall_comm_testany__get__comms(simcall), &action);
981       }
982     }
983
984     if (surf_workstation_model->extension.
985         workstation.get_state(simcall->issuer->smx_host) != SURF_RESOURCE_ON) {
986       simcall->issuer->context->iwannadie = 1;
987     }
988
989     simcall->issuer->waiting_action = NULL;
990     xbt_fifo_remove(simcall->issuer->comms, action);
991     if(action->comm.detached){
992       if(simcall->issuer == action->comm.src_proc){
993         if(action->comm.dst_proc)
994           xbt_fifo_remove(action->comm.dst_proc->comms, action);
995       }
996       if(simcall->issuer == action->comm.dst_proc){
997         if(action->comm.src_proc)
998           xbt_fifo_remove(action->comm.src_proc->comms, action);
999       }
1000     }
1001     SIMIX_simcall_answer(simcall);
1002     destroy_count++;
1003   }
1004
1005   while (destroy_count-- > 0)
1006     SIMIX_comm_destroy(action);
1007 }
1008
1009 /**
1010  * \brief This function is called when a Surf communication action is finished.
1011  * \param action the corresponding Simix communication
1012  */
1013 void SIMIX_post_comm(smx_action_t action)
1014 {
1015   /* Update action state */
1016   if (action->comm.src_timeout &&
1017       surf_workstation_model->action_state_get(action->comm.src_timeout) == SURF_ACTION_DONE)
1018     action->state = SIMIX_SRC_TIMEOUT;
1019   else if (action->comm.dst_timeout &&
1020            surf_workstation_model->action_state_get(action->comm.dst_timeout) == SURF_ACTION_DONE)
1021     action->state = SIMIX_DST_TIMEOUT;
1022   else if (action->comm.src_timeout &&
1023            surf_workstation_model->action_state_get(action->comm.src_timeout) == SURF_ACTION_FAILED)
1024     action->state = SIMIX_SRC_HOST_FAILURE;
1025   else if (action->comm.dst_timeout &&
1026            surf_workstation_model->action_state_get(action->comm.dst_timeout) == SURF_ACTION_FAILED)
1027     action->state = SIMIX_DST_HOST_FAILURE;
1028   else if (action->comm.surf_comm &&
1029            surf_workstation_model->action_state_get(action->comm.surf_comm) == SURF_ACTION_FAILED) {
1030     XBT_DEBUG("Puta madre. Surf says that the link broke");
1031     action->state = SIMIX_LINK_FAILURE;
1032   } else
1033     action->state = SIMIX_DONE;
1034
1035   XBT_DEBUG("SIMIX_post_comm: comm %p, state %d, src_proc %p, dst_proc %p, detached: %d",
1036             action, (int)action->state, action->comm.src_proc, action->comm.dst_proc, action->comm.detached);
1037
1038   /* destroy the surf actions associated with the Simix communication */
1039   SIMIX_comm_destroy_internal_actions(action);
1040
1041   /* remove the communication action from the list of pending communications
1042    * of both processes (if they still exist) */
1043   if (action->comm.src_proc) {
1044     xbt_fifo_remove(action->comm.src_proc->comms, action);
1045   }
1046   if (action->comm.dst_proc) {
1047     xbt_fifo_remove(action->comm.dst_proc->comms, action);
1048   }
1049
1050   /* if there are simcalls associated with the action, then answer them */
1051   if (xbt_fifo_size(action->simcalls)) {
1052     SIMIX_comm_finish(action);
1053   }
1054 }
1055
1056 void SIMIX_pre_comm_cancel(smx_simcall_t simcall, smx_action_t action){
1057   SIMIX_comm_cancel(action);
1058 }
1059 void SIMIX_comm_cancel(smx_action_t action)
1060 {
1061   /* if the action is a waiting state means that it is still in a rdv */
1062   /* so remove from it and delete it */
1063   if (action->state == SIMIX_WAITING) {
1064     SIMIX_rdv_remove(action->comm.rdv, action);
1065     action->state = SIMIX_CANCELED;
1066   }
1067   else if (!MC_is_active() /* when running the MC there are no surf actions */
1068            && (action->state == SIMIX_READY || action->state == SIMIX_RUNNING)) {
1069
1070     surf_workstation_model->action_cancel(action->comm.surf_comm);
1071   }
1072 }
1073
1074 void SIMIX_comm_suspend(smx_action_t action)
1075 {
1076   /*FIXME: shall we suspend also the timeout actions? */
1077   if (action->comm.surf_comm)
1078     surf_workstation_model->suspend(action->comm.surf_comm);
1079   /* in the other case, the action will be suspended on creation, in SIMIX_comm_start() */
1080 }
1081
1082 void SIMIX_comm_resume(smx_action_t action)
1083 {
1084   /*FIXME: check what happen with the timeouts */
1085   if (action->comm.surf_comm)
1086     surf_workstation_model->resume(action->comm.surf_comm);
1087   /* in the other case, the action were not really suspended yet, see SIMIX_comm_suspend() and SIMIX_comm_start() */
1088 }
1089
1090
1091 /************* Action Getters **************/
1092
1093 double SIMIX_pre_comm_get_remains(smx_simcall_t simcall, smx_action_t action){
1094   return SIMIX_comm_get_remains(action);
1095 }
1096 /**
1097  *  \brief get the amount remaining from the communication
1098  *  \param action The communication
1099  */
1100 double SIMIX_comm_get_remains(smx_action_t action)
1101 {
1102   double remains;
1103
1104   if(!action){
1105     return 0;
1106   }
1107
1108   switch (action->state) {
1109
1110   case SIMIX_RUNNING:
1111     remains = surf_workstation_model->get_remains(action->comm.surf_comm);
1112     break;
1113
1114   case SIMIX_WAITING:
1115   case SIMIX_READY:
1116     remains = 0; /*FIXME: check what should be returned */
1117     break;
1118
1119   default:
1120     remains = 0; /*FIXME: is this correct? */
1121     break;
1122   }
1123   return remains;
1124 }
1125
1126 e_smx_state_t SIMIX_pre_comm_get_state(smx_simcall_t simcall, smx_action_t action){
1127   return SIMIX_comm_get_state(action);
1128 }
1129 e_smx_state_t SIMIX_comm_get_state(smx_action_t action)
1130 {
1131   return action->state;
1132 }
1133
1134 void* SIMIX_pre_comm_get_src_data(smx_simcall_t simcall, smx_action_t action){
1135   return SIMIX_comm_get_src_data(action);
1136 }
1137 /**
1138  *  \brief Return the user data associated to the sender of the communication
1139  *  \param action The communication
1140  *  \return the user data
1141  */
1142 void* SIMIX_comm_get_src_data(smx_action_t action)
1143 {
1144   return action->comm.src_data;
1145 }
1146
1147 void* SIMIX_pre_comm_get_dst_data(smx_simcall_t simcall, smx_action_t action){
1148   return SIMIX_comm_get_dst_data(action);
1149 }
1150 /**
1151  *  \brief Return the user data associated to the receiver of the communication
1152  *  \param action The communication
1153  *  \return the user data
1154  */
1155 void* SIMIX_comm_get_dst_data(smx_action_t action)
1156 {
1157   return action->comm.dst_data;
1158 }
1159
1160 smx_process_t SIMIX_pre_comm_get_src_proc(smx_simcall_t simcall, smx_action_t action){
1161   return SIMIX_comm_get_src_proc(action);
1162 }
1163 smx_process_t SIMIX_comm_get_src_proc(smx_action_t action)
1164 {
1165   return action->comm.src_proc;
1166 }
1167
1168 smx_process_t SIMIX_pre_comm_get_dst_proc(smx_simcall_t simcall, smx_action_t action){
1169   return SIMIX_comm_get_dst_proc(action);
1170 }
1171 smx_process_t SIMIX_comm_get_dst_proc(smx_action_t action)
1172 {
1173   return action->comm.dst_proc;
1174 }
1175
#ifdef HAVE_LATENCY_BOUND_TRACKING
/**
 *  \brief Simcall wrapper around SIMIX_comm_is_latency_bounded(); the simcall itself is not used.
 */
int SIMIX_pre_comm_is_latency_bounded(smx_simcall_t simcall, smx_action_t action)
{
  return SIMIX_comm_is_latency_bounded(action);
}

/**
 *  \brief verify if communication is latency bounded
 *
 *  While the action still has a Surf communication, its latency_limited
 *  field is refreshed from Surf before being returned.
 *
 *  \param action The communication (may be NULL)
 *  \return 0 for a NULL action, the latency_limited field otherwise
 */
XBT_INLINE int SIMIX_comm_is_latency_bounded(smx_action_t action)
{
  if (action) {
    if (action->comm.surf_comm) {
      XBT_DEBUG("Getting latency limited for surf_action (%p)", action->comm.surf_comm);
      action->latency_limited = surf_workstation_model->get_latency_limited(action->comm.surf_comm);
      XBT_DEBUG("Action limited is %d", action->latency_limited);
    }
    return action->latency_limited;
  }
  return 0;
}
#endif
1199
1200 /******************************************************************************/
1201 /*                    SIMIX_comm_copy_data callbacks                       */
1202 /******************************************************************************/
1203 static void (*SIMIX_comm_copy_data_callback) (smx_action_t, void*, size_t) =
1204   &SIMIX_comm_copy_pointer_callback;
1205
1206 void
1207 SIMIX_comm_set_copy_data_callback(void (*callback) (smx_action_t, void*, size_t))
1208 {
1209   SIMIX_comm_copy_data_callback = callback;
1210 }
1211
1212 void SIMIX_comm_copy_pointer_callback(smx_action_t comm, void* buff, size_t buff_size)
1213 {
1214   xbt_assert((buff_size == sizeof(void *)),
1215              "Cannot copy %zu bytes: must be sizeof(void*)", buff_size);
1216   *(void **) (comm->comm.dst_buff) = buff;
1217 }
1218
1219 void SIMIX_comm_copy_buffer_callback(smx_action_t comm, void* buff, size_t buff_size)
1220 {
1221   XBT_DEBUG("Copy the data over");
1222   memcpy(comm->comm.dst_buff, buff, buff_size);
1223   if (comm->comm.detached) { // if this is a detached send, the source buffer was duplicated by SMPI sender to make the original buffer available to the application ASAP
1224     xbt_free(buff);
1225     comm->comm.src_buff = NULL;
1226   }
1227 }
1228
1229
1230 /**
1231  *  \brief Copy the communication data from the sender's buffer to the receiver's one
1232  *  \param comm The communication
1233  */
1234 void SIMIX_comm_copy_data(smx_action_t comm)
1235 {
1236   size_t buff_size = comm->comm.src_buff_size;
1237   /* If there is no data to be copy then return */
1238   if (!comm->comm.src_buff || !comm->comm.dst_buff || comm->comm.copied)
1239     return;
1240
1241   XBT_DEBUG("Copying comm %p data from %s (%p) -> %s (%p) (%zu bytes)",
1242             comm,
1243             comm->comm.src_proc ? sg_host_name(comm->comm.src_proc->smx_host) : "a finished process",
1244             comm->comm.src_buff,
1245             comm->comm.dst_proc ? sg_host_name(comm->comm.dst_proc->smx_host) : "a finished process",
1246             comm->comm.dst_buff, buff_size);
1247
1248   /* Copy at most dst_buff_size bytes of the message to receiver's buffer */
1249   if (comm->comm.dst_buff_size)
1250     buff_size = MIN(buff_size, *(comm->comm.dst_buff_size));
1251
1252   /* Update the receiver's buffer size to the copied amount */
1253   if (comm->comm.dst_buff_size)
1254     *comm->comm.dst_buff_size = buff_size;
1255
1256   if (buff_size > 0)
1257     SIMIX_comm_copy_data_callback (comm, comm->comm.src_buff, buff_size);
1258
1259   /* Set the copied flag so we copy data only once */
1260   /* (this function might be called from both communication ends) */
1261   comm->comm.copied = 1;
1262 }