Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
Merge branch 'master' of git+ssh://scm.gforge.inria.fr//gitroot/simgrid/simgrid
[simgrid.git] / src / simix / smx_network.c
1 /* Copyright (c) 2009-2014. The SimGrid Team.
2  * All rights reserved.                                                     */
3
4 /* This program is free software; you can redistribute it and/or modify it
5  * under the terms of the license (GNU LGPL) which comes with this package. */
6
7 #include "smx_private.h"
8 #include "xbt/log.h"
9 #include "mc/mc.h"
10 #include "xbt/dict.h"
11 #include "smpi/private.h"
12
13
14 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(simix_network, simix,
15                                 "Logging specific to SIMIX (network)");
16
17 static xbt_dict_t rdv_points = NULL;
18 XBT_EXPORT_NO_IMPORT(unsigned long int) smx_total_comms = 0;
19
20 static void SIMIX_waitany_remove_simcall_from_actions(smx_simcall_t simcall);
21 static void SIMIX_comm_copy_data(smx_action_t comm);
22 static smx_action_t SIMIX_comm_new(e_smx_comm_type_t type);
23 static XBT_INLINE void SIMIX_rdv_push(smx_rdv_t rdv, smx_action_t comm);
24 static smx_action_t SIMIX_fifo_probe_comm(xbt_fifo_t fifo, e_smx_comm_type_t type,
25                                         int (*match_fun)(void *, void *,smx_action_t),
26                                         void *user_data, smx_action_t my_action);
27 static smx_action_t SIMIX_fifo_get_comm(xbt_fifo_t fifo, e_smx_comm_type_t type,
28                                         int (*match_fun)(void *, void *,smx_action_t),
29                                         void *user_data, smx_action_t my_action);
30 static void SIMIX_rdv_free(void *data);
31 static void SIMIX_comm_start(smx_action_t action);
32
33 void SIMIX_network_init(void)
34 {
35   rdv_points = xbt_dict_new_homogeneous(SIMIX_rdv_free);
36 }
37
38 void SIMIX_network_exit(void)
39 {
40   xbt_dict_free(&rdv_points);
41 }
42
43 /******************************************************************************/
44 /*                           Rendez-Vous Points                               */
45 /******************************************************************************/
46
47 smx_rdv_t SIMIX_pre_rdv_create(smx_simcall_t simcall, const char *name){
48   return SIMIX_rdv_create(name);
49 }
50 smx_rdv_t SIMIX_rdv_create(const char *name)
51 {
52   /* two processes may have pushed the same rdv_create simcall at the same time */
53   smx_rdv_t rdv = name ? xbt_dict_get_or_null(rdv_points, name) : NULL;
54
55   if (!rdv) {
56     rdv = xbt_new0(s_smx_rvpoint_t, 1);
57     rdv->name = name ? xbt_strdup(name) : NULL;
58     rdv->comm_fifo = xbt_fifo_new();
59     rdv->done_comm_fifo = xbt_fifo_new();
60     rdv->permanent_receiver=NULL;
61
62     XBT_DEBUG("Creating a mailbox at %p with name %s\n", rdv, name);
63
64     if (rdv->name)
65       xbt_dict_set(rdv_points, rdv->name, rdv, NULL);
66   }
67   return rdv;
68 }
69
70 void SIMIX_pre_rdv_destroy(smx_simcall_t simcall, smx_rdv_t rdv){
71   return SIMIX_rdv_destroy(rdv);
72 }
73 void SIMIX_rdv_destroy(smx_rdv_t rdv)
74 {
75   if (rdv->name)
76     xbt_dict_remove(rdv_points, rdv->name);
77 }
78
79 void SIMIX_rdv_free(void *data)
80 {
81   XBT_DEBUG("rdv free %p", data);
82   smx_rdv_t rdv = (smx_rdv_t) data;
83   xbt_free(rdv->name);
84   xbt_fifo_free(rdv->comm_fifo);
85   xbt_fifo_free(rdv->done_comm_fifo);
86
87   xbt_free(rdv);
88 }
89
90 xbt_dict_t SIMIX_get_rdv_points()
91 {
92   return rdv_points;
93 }
94
95 smx_rdv_t SIMIX_pre_rdv_get_by_name(smx_simcall_t simcall, const char *name){
96   return SIMIX_rdv_get_by_name(name);
97 }
98 smx_rdv_t SIMIX_rdv_get_by_name(const char *name)
99 {
100   return xbt_dict_get_or_null(rdv_points, name);
101 }
102
103 int SIMIX_pre_rdv_comm_count_by_host(smx_simcall_t simcall, smx_rdv_t rdv, smx_host_t host){
104   return SIMIX_rdv_comm_count_by_host(rdv, host);
105 }
106 int SIMIX_rdv_comm_count_by_host(smx_rdv_t rdv, smx_host_t host)
107 {
108   smx_action_t comm = NULL;
109   xbt_fifo_item_t item = NULL;
110   int count = 0;
111
112   xbt_fifo_foreach(rdv->comm_fifo, item, comm, smx_action_t) {
113     if (comm->comm.src_proc->smx_host == host)
114       count++;
115   }
116
117   return count;
118 }
119
120 smx_action_t SIMIX_pre_rdv_get_head(smx_simcall_t simcall, smx_rdv_t rdv){
121   return SIMIX_rdv_get_head(rdv);
122 }
123 smx_action_t SIMIX_rdv_get_head(smx_rdv_t rdv)
124 {
125   return xbt_fifo_get_item_content(xbt_fifo_get_first_item(rdv->comm_fifo));
126 }
127
128 smx_process_t SIMIX_pre_rdv_get_receiver(smx_simcall_t simcall, smx_rdv_t rdv){
129   return SIMIX_rdv_get_receiver(rdv);
130 }
131 /**
132  *  \brief get the receiver (process associated to the mailbox)
133  *  \param rdv The rendez-vous point
134  *  \return process The receiving process (NULL if not set)
135  */
136 smx_process_t SIMIX_rdv_get_receiver(smx_rdv_t rdv)
137 {
138   return rdv->permanent_receiver;
139 }
140
141 void SIMIX_pre_rdv_set_receiver(smx_simcall_t simcall, smx_rdv_t rdv,
142                             smx_process_t process){
143   SIMIX_rdv_set_receiver(rdv, process);
144 }
145 /**
146  *  \brief set the receiver of the rendez vous point to allow eager sends
147  *  \param rdv The rendez-vous point
148  *  \param process The receiving process
149  */
150 void SIMIX_rdv_set_receiver(smx_rdv_t rdv, smx_process_t process)
151 {
152   rdv->permanent_receiver=process;
153 }
154
155 /**
156  *  \brief Pushes a communication action into a rendez-vous point
157  *  \param rdv The rendez-vous point
158  *  \param comm The communication action
159  */
160 static XBT_INLINE void SIMIX_rdv_push(smx_rdv_t rdv, smx_action_t comm)
161 {
162   xbt_fifo_push(rdv->comm_fifo, comm);
163   comm->comm.rdv = rdv;
164 }
165
166 /**
167  *  \brief Removes a communication action from a rendez-vous point
168  *  \param rdv The rendez-vous point
169  *  \param comm The communication action
170  */
171 XBT_INLINE void SIMIX_rdv_remove(smx_rdv_t rdv, smx_action_t comm)
172 {
173   xbt_fifo_remove(rdv->comm_fifo, comm);
174   comm->comm.rdv = NULL;
175 }
176
177 /**
178  *  \brief Checks if there is a communication action queued in a fifo matching our needs
179  *  \param type The type of communication we are looking for (comm_send, comm_recv)
180  *  \return The communication action if found, NULL otherwise
181  */
182 smx_action_t SIMIX_fifo_get_comm(xbt_fifo_t fifo, e_smx_comm_type_t type,
183                                  int (*match_fun)(void *, void *,smx_action_t),
184                                  void *this_user_data, smx_action_t my_action)
185 {
186   smx_action_t action;
187   xbt_fifo_item_t item;
188   void* other_user_data = NULL;
189
190   xbt_fifo_foreach(fifo, item, action, smx_action_t) {
191     if (action->comm.type == SIMIX_COMM_SEND) {
192       other_user_data = action->comm.src_data;
193     } else if (action->comm.type == SIMIX_COMM_RECEIVE) {
194       other_user_data = action->comm.dst_data;
195     }
196     if (action->comm.type == type &&
197         (!match_fun              ||              match_fun(this_user_data,  other_user_data, action)) &&
198         (!action->comm.match_fun || action->comm.match_fun(other_user_data, this_user_data,  my_action))) {
199       XBT_DEBUG("Found a matching communication action %p", action);
200       xbt_fifo_remove_item(fifo, item);
201       xbt_fifo_free_item(item);
202       action->comm.refcount++;
203 #ifdef HAVE_MC
204       action->comm.rdv_cpy = action->comm.rdv;
205 #endif
206       action->comm.rdv = NULL;
207       return action;
208     }
209     XBT_DEBUG("Sorry, communication action %p does not match our needs:"
210               " its type is %d but we are looking for a comm of type %d (or maybe the filtering didn't match)",
211               action, (int)action->comm.type, (int)type);
212   }
213   XBT_DEBUG("No matching communication action found");
214   return NULL;
215 }
216
217
218 /**
219  *  \brief Checks if there is a communication action queued in a fifo matching our needs, but leave it there
220  *  \param type The type of communication we are looking for (comm_send, comm_recv)
221  *  \return The communication action if found, NULL otherwise
222  */
223 smx_action_t SIMIX_fifo_probe_comm(xbt_fifo_t fifo, e_smx_comm_type_t type,
224                                  int (*match_fun)(void *, void *,smx_action_t),
225                                  void *this_user_data, smx_action_t my_action)
226 {
227   smx_action_t action;
228   xbt_fifo_item_t item;
229   void* other_user_data = NULL;
230
231   xbt_fifo_foreach(fifo, item, action, smx_action_t) {
232     if (action->comm.type == SIMIX_COMM_SEND) {
233       other_user_data = action->comm.src_data;
234     } else if (action->comm.type == SIMIX_COMM_RECEIVE) {
235       other_user_data = action->comm.dst_data;
236     }
237     if (action->comm.type == type &&
238         (!match_fun              ||              match_fun(this_user_data,  other_user_data, action)) &&
239         (!action->comm.match_fun || action->comm.match_fun(other_user_data, this_user_data,  my_action))) {
240       XBT_DEBUG("Found a matching communication action %p", action);
241       action->comm.refcount++;
242
243       return action;
244     }
245     XBT_DEBUG("Sorry, communication action %p does not match our needs:"
246               " its type is %d but we are looking for a comm of type %d (or maybe the filtering didn't match)",
247               action, (int)action->comm.type, (int)type);
248   }
249   XBT_DEBUG("No matching communication action found");
250   return NULL;
251 }
252 /******************************************************************************/
253 /*                            Communication Actions                            */
254 /******************************************************************************/
255
256 /**
257  *  \brief Creates a new communicate action
258  *  \param type The direction of communication (comm_send, comm_recv)
259  *  \return The new communicate action
260  */
261 smx_action_t SIMIX_comm_new(e_smx_comm_type_t type)
262 {
263   smx_action_t act;
264
265   /* alloc structures */
266   act = xbt_mallocator_get(simix_global->action_mallocator);
267
268   act->type = SIMIX_ACTION_COMMUNICATE;
269   act->state = SIMIX_WAITING;
270
271   /* set communication */
272   act->comm.type = type;
273   act->comm.refcount = 1;
274   act->comm.src_data=NULL;
275   act->comm.dst_data=NULL;
276
277
278 #ifdef HAVE_LATENCY_BOUND_TRACKING
279   //initialize with unknown value
280   act->latency_limited = -1;
281 #endif
282
283 #ifdef HAVE_TRACING
284   act->category = NULL;
285 #endif
286
287   XBT_DEBUG("Create communicate action %p", act);
288   ++smx_total_comms;
289
290   return act;
291 }
292
293 /**
294  *  \brief Destroy a communicate action
295  *  \param action The communicate action to be destroyed
296  */
297 void SIMIX_comm_destroy(smx_action_t action)
298 {
299   XBT_DEBUG("Destroy action %p (refcount: %d), state: %d",
300             action, action->comm.refcount, (int)action->state);
301
302   if (action->comm.refcount <= 0) {
303     xbt_backtrace_display_current();
304     xbt_die("The refcount of comm %p is already 0 before decreasing it. "
305             "That's a bug! If you didn't test and/or wait the same communication twice in your code, then the bug is SimGrid's...", action);
306   }
307   action->comm.refcount--;
308   if (action->comm.refcount > 0)
309       return;
310   XBT_DEBUG("Really free communication %p; refcount is now %d", action,
311             action->comm.refcount);
312
313 #ifdef HAVE_LATENCY_BOUND_TRACKING
314   action->latency_limited = SIMIX_comm_is_latency_bounded( action ) ;
315 #endif
316
317   xbt_free(action->name);
318   SIMIX_comm_destroy_internal_actions(action);
319
320   if (action->comm.detached && action->state != SIMIX_DONE) {
321     /* the communication has failed and was detached:
322      * we have to free the buffer */
323     if (action->comm.clean_fun) {
324       action->comm.clean_fun(action->comm.src_buff);
325     }
326     action->comm.src_buff = NULL;
327   }
328
329   if(action->comm.rdv)
330     SIMIX_rdv_remove(action->comm.rdv, action);
331
332   xbt_mallocator_release(simix_global->action_mallocator, action);
333 }
334
335 void SIMIX_comm_destroy_internal_actions(smx_action_t action)
336 {
337   if (action->comm.surf_comm){
338 #ifdef HAVE_LATENCY_BOUND_TRACKING
339     action->latency_limited = SIMIX_comm_is_latency_bounded(action);
340 #endif
341     surf_action_unref(action->comm.surf_comm);
342     action->comm.surf_comm = NULL;
343   }
344
345   if (action->comm.src_timeout){
346     surf_action_unref(action->comm.src_timeout);
347     action->comm.src_timeout = NULL;
348   }
349
350   if (action->comm.dst_timeout){
351     surf_action_unref(action->comm.dst_timeout);
352     action->comm.dst_timeout = NULL;
353   }
354 }
355
356 void SIMIX_pre_comm_send(smx_simcall_t simcall, smx_process_t src, smx_rdv_t rdv,
357                                   double task_size, double rate,
358                                   void *src_buff, size_t src_buff_size,
359                                   int (*match_fun)(void *, void *,smx_action_t),
360                                   void (*copy_data_fun)(smx_action_t, void*, size_t),
361                                   void *data, double timeout){
362   smx_action_t comm = SIMIX_comm_isend(src, rdv, task_size, rate,
363                                        src_buff, src_buff_size, match_fun, NULL, copy_data_fun,
364                                        data, 0);
365   SIMCALL_SET_MC_VALUE(simcall, 0);
366   SIMIX_pre_comm_wait(simcall, comm, timeout);
367 }
368 smx_action_t SIMIX_pre_comm_isend(smx_simcall_t simcall, smx_process_t src, smx_rdv_t rdv,
369                                   double task_size, double rate,
370                                   void *src_buff, size_t src_buff_size,
371                                   int (*match_fun)(void *, void *,smx_action_t),
372                                   void (*clean_fun)(void *),
373                                   void (*copy_data_fun)(smx_action_t, void*, size_t),
374                                   void *data, int detached){
375   return SIMIX_comm_isend(src, rdv, task_size, rate, src_buff,
376                           src_buff_size, match_fun, clean_fun, copy_data_fun, data, detached);
377
378 }
379 smx_action_t SIMIX_comm_isend(smx_process_t src_proc, smx_rdv_t rdv,
380                               double task_size, double rate,
381                               void *src_buff, size_t src_buff_size,
382                               int (*match_fun)(void *, void *,smx_action_t),
383                               void (*clean_fun)(void *), // used to free the action in case of problem after a detached send
384                               void (*copy_data_fun)(smx_action_t, void*, size_t), // used to copy data if not default one
385                               void *data,
386                               int detached)
387 {
388   XBT_DEBUG("send from %p\n", rdv);
389
390   /* Prepare an action describing us, so that it gets passed to the user-provided filter of other side */
391   smx_action_t this_action = SIMIX_comm_new(SIMIX_COMM_SEND);
392
393   /* Look for communication action matching our needs. We also provide a description of
394    * ourself so that the other side also gets a chance of choosing if it wants to match with us.
395    *
396    * If it is not found then push our communication into the rendez-vous point */
397   smx_action_t other_action = SIMIX_fifo_get_comm(rdv->comm_fifo, SIMIX_COMM_RECEIVE, match_fun, data, this_action);
398
399   if (!other_action) {
400     other_action = this_action;
401
402     if (rdv->permanent_receiver!=NULL){
403       //this mailbox is for small messages, which have to be sent right now
404       other_action->state = SIMIX_READY;
405       other_action->comm.dst_proc=rdv->permanent_receiver;
406       other_action->comm.refcount++;
407       xbt_fifo_push(rdv->done_comm_fifo,other_action);
408       other_action->comm.rdv=rdv;
409       XBT_DEBUG("pushing a message into the permanent receive fifo %p, comm %p \n", rdv, &(other_action->comm));
410
411     }else{
412       SIMIX_rdv_push(rdv, this_action);
413     }
414   } else {
415     XBT_DEBUG("Receive already pushed\n");
416
417     SIMIX_comm_destroy(this_action);
418     --smx_total_comms; // this creation was a pure waste
419
420     other_action->state = SIMIX_READY;
421     other_action->comm.type = SIMIX_COMM_READY;
422
423   }
424   xbt_fifo_push(src_proc->comms, other_action);
425
426   /* if the communication action is detached then decrease the refcount
427    * by one, so it will be eliminated by the receiver's destroy call */
428   if (detached) {
429     other_action->comm.detached = 1;
430     other_action->comm.refcount--;
431     other_action->comm.clean_fun = clean_fun;
432   } else {
433     other_action->comm.clean_fun = NULL;
434   }
435
436   /* Setup the communication action */
437   other_action->comm.src_proc = src_proc;
438   other_action->comm.task_size = task_size;
439   other_action->comm.rate = rate;
440   other_action->comm.src_buff = src_buff;
441   other_action->comm.src_buff_size = src_buff_size;
442   other_action->comm.src_data = data;
443
444   other_action->comm.match_fun = match_fun;
445   other_action->comm.copy_data_fun = copy_data_fun;
446
447
448   if (MC_is_active()) {
449     other_action->state = SIMIX_RUNNING;
450     return (detached ? NULL : other_action);
451   }
452
453   SIMIX_comm_start(other_action);
454   return (detached ? NULL : other_action);
455 }
456
457 void SIMIX_pre_comm_recv(smx_simcall_t simcall, smx_rdv_t rdv,
458                          void *dst_buff, size_t *dst_buff_size,
459                          int (*match_fun)(void *, void *, smx_action_t),
460                          void (*copy_data_fun)(smx_action_t, void*, size_t),
461                          void *data, double timeout, double rate)
462 {
463   smx_action_t comm = SIMIX_comm_irecv(simcall->issuer, rdv, dst_buff,
464                                        dst_buff_size, match_fun, copy_data_fun, data, rate);
465   SIMCALL_SET_MC_VALUE(simcall, 0);
466   SIMIX_pre_comm_wait(simcall, comm, timeout);
467 }
468
469 smx_action_t SIMIX_pre_comm_irecv(smx_simcall_t simcall, smx_rdv_t rdv,
470                                   void *dst_buff, size_t *dst_buff_size,
471                                   int (*match_fun)(void *, void *, smx_action_t),
472                                   void (*copy_data_fun)(smx_action_t, void*, size_t),
473                                   void *data, double rate)
474 {
475   return SIMIX_comm_irecv(simcall->issuer, rdv, dst_buff, dst_buff_size,
476                           match_fun, copy_data_fun, data, rate);
477 }
478
479 smx_action_t SIMIX_comm_irecv(smx_process_t dst_proc, smx_rdv_t rdv,
480                               void *dst_buff, size_t *dst_buff_size,
481                               int (*match_fun)(void *, void *, smx_action_t),
482                               void (*copy_data_fun)(smx_action_t, void*, size_t), // used to copy data if not default one
483                               void *data, double rate)
484 {
485   XBT_DEBUG("recv from %p %p\n", rdv, rdv->comm_fifo);
486   smx_action_t this_action = SIMIX_comm_new(SIMIX_COMM_RECEIVE);
487
488   smx_action_t other_action;
489   //communication already done, get it inside the fifo of completed comms
490   //permanent receive v1
491   //int already_received=0;
492   if(rdv->permanent_receiver && xbt_fifo_size(rdv->done_comm_fifo)!=0){
493
494     XBT_DEBUG("We have a comm that has probably already been received, trying to match it, to skip the communication\n");
495     //find a match in the already received fifo
496     other_action = SIMIX_fifo_get_comm(rdv->done_comm_fifo, SIMIX_COMM_SEND, match_fun, data, this_action);
497     //if not found, assume the receiver came first, register it to the mailbox in the classical way
498     if (!other_action)  {
499       XBT_DEBUG("We have messages in the permanent receive list, but not the one we are looking for, pushing request into fifo\n");
500       other_action = this_action;
501       SIMIX_rdv_push(rdv, this_action);
502     }else{
503       if(other_action->comm.surf_comm &&        SIMIX_comm_get_remains(other_action)==0.0)
504       {
505         XBT_DEBUG("comm %p has been already sent, and is finished, destroy it\n",&(other_action->comm));
506         other_action->state = SIMIX_DONE;
507         other_action->comm.type = SIMIX_COMM_DONE;
508         other_action->comm.rdv = NULL;
509         //SIMIX_comm_destroy(this_action);
510         //--smx_total_comms; // this creation was a pure waste
511         //already_received=1;
512         //other_action->comm.refcount--;
513       }/*else{
514          XBT_DEBUG("Not yet finished, we have to wait %d\n", xbt_fifo_size(rdv->comm_fifo));
515          }*/
516       other_action->comm.refcount--;
517       SIMIX_comm_destroy(this_action);
518       --smx_total_comms; // this creation was a pure waste
519     }
520   }else{
521     /* Prepare an action describing us, so that it gets passed to the user-provided filter of other side */
522
523     /* Look for communication action matching our needs. We also provide a description of
524      * ourself so that the other side also gets a chance of choosing if it wants to match with us.
525      *
526      * If it is not found then push our communication into the rendez-vous point */
527     other_action = SIMIX_fifo_get_comm(rdv->comm_fifo, SIMIX_COMM_SEND, match_fun, data, this_action);
528
529     if (!other_action) {
530       XBT_DEBUG("Receive pushed first %d\n", xbt_fifo_size(rdv->comm_fifo));
531       other_action = this_action;
532       SIMIX_rdv_push(rdv, this_action);
533     } else {
534       SIMIX_comm_destroy(this_action);
535       --smx_total_comms; // this creation was a pure waste
536       other_action->state = SIMIX_READY;
537       other_action->comm.type = SIMIX_COMM_READY;
538       //other_action->comm.refcount--;
539     }
540     xbt_fifo_push(dst_proc->comms, other_action);
541   }
542
543   /* Setup communication action */
544   other_action->comm.dst_proc = dst_proc;
545   other_action->comm.dst_buff = dst_buff;
546   other_action->comm.dst_buff_size = dst_buff_size;
547   other_action->comm.dst_data = data;
548
549   if (rate != -1.0 &&
550       (other_action->comm.rate == -1.0 || rate < other_action->comm.rate))
551     other_action->comm.rate = rate;
552
553   other_action->comm.match_fun = match_fun;
554   other_action->comm.copy_data_fun = copy_data_fun;
555
556
557   /*if(already_received)//do the actual copy, because the first one after the comm didn't have all the info
558     SIMIX_comm_copy_data(other_action);*/
559
560
561   if (MC_is_active()) {
562     other_action->state = SIMIX_RUNNING;
563     return other_action;
564   }
565
566   SIMIX_comm_start(other_action);
567   // }
568   return other_action;
569 }
570
571 smx_action_t SIMIX_pre_comm_iprobe(smx_simcall_t simcall, smx_rdv_t rdv,
572                                    int src, int tag,
573                                    int (*match_fun)(void *, void *, smx_action_t),
574                                    void *data){
575   return SIMIX_comm_iprobe(simcall->issuer, rdv, src, tag, match_fun, data);
576 }
577
578 smx_action_t SIMIX_comm_iprobe(smx_process_t dst_proc, smx_rdv_t rdv, int src,
579                               int tag, int (*match_fun)(void *, void *, smx_action_t), void *data)
580 {
581   XBT_DEBUG("iprobe from %p %p\n", rdv, rdv->comm_fifo);
582   smx_action_t this_action = SIMIX_comm_new(SIMIX_COMM_RECEIVE);
583
584   smx_action_t other_action=NULL;
585   if(rdv->permanent_receiver && xbt_fifo_size(rdv->done_comm_fifo)!=0){
586     //find a match in the already received fifo
587       XBT_DEBUG("first try in the perm recv mailbox \n");
588
589     other_action = SIMIX_fifo_probe_comm(rdv->done_comm_fifo, SIMIX_COMM_SEND, match_fun, data, this_action);
590   }
591  // }else{
592     if(!other_action){
593         XBT_DEBUG("second try in the other mailbox");
594         other_action = SIMIX_fifo_probe_comm(rdv->comm_fifo, SIMIX_COMM_SEND, match_fun, data, this_action);
595     }
596 //  }
597   if(other_action)other_action->comm.refcount--;
598
599   SIMIX_comm_destroy(this_action);
600   --smx_total_comms;
601   return other_action;
602 }
603
604 void SIMIX_pre_comm_wait(smx_simcall_t simcall, smx_action_t action, double timeout)
605 {
606   /* the simcall may be a wait, a send or a recv */
607   surf_action_t sleep;
608
609   /* Associate this simcall to the wait action */
610   XBT_DEBUG("SIMIX_pre_comm_wait, %p", action);
611
612   xbt_fifo_push(action->simcalls, simcall);
613   simcall->issuer->waiting_action = action;
614
615   if (MC_is_active()) {
616     int idx = SIMCALL_GET_MC_VALUE(simcall);
617     if (idx == 0) {
618       action->state = SIMIX_DONE;
619     } else {
620       /* If we reached this point, the wait simcall must have a timeout */
621       /* Otherwise it shouldn't be enabled and executed by the MC */
622       if (timeout == -1)
623         THROW_IMPOSSIBLE;
624
625       if (action->comm.src_proc == simcall->issuer)
626         action->state = SIMIX_SRC_TIMEOUT;
627       else
628         action->state = SIMIX_DST_TIMEOUT;
629     }
630
631     SIMIX_comm_finish(action);
632     return;
633   }
634
635   /* If the action has already finish perform the error handling, */
636   /* otherwise set up a waiting timeout on the right side         */
637   if (action->state != SIMIX_WAITING && action->state != SIMIX_RUNNING) {
638     SIMIX_comm_finish(action);
639   } else { /* if (timeout >= 0) { we need a surf sleep action even when there is no timeout, otherwise surf won't tell us when the host fails */
640     sleep = surf_workstation_sleep(simcall->issuer->smx_host, timeout);
641     surf_action_set_data(sleep, action);
642
643     if (simcall->issuer == action->comm.src_proc)
644       action->comm.src_timeout = sleep;
645     else
646       action->comm.dst_timeout = sleep;
647   }
648 }
649
650 void SIMIX_pre_comm_test(smx_simcall_t simcall, smx_action_t action)
651 {
652   if(MC_is_active()){
653     simcall_comm_test__set__result(simcall, action->comm.src_proc && action->comm.dst_proc);
654     if(simcall_comm_test__get__result(simcall)){
655       action->state = SIMIX_DONE;
656       xbt_fifo_push(action->simcalls, simcall);
657       SIMIX_comm_finish(action);
658     }else{
659       SIMIX_simcall_answer(simcall);
660     }
661     return;
662   }
663
664   simcall_comm_test__set__result(simcall, (action->state != SIMIX_WAITING && action->state != SIMIX_RUNNING));
665   if (simcall_comm_test__get__result(simcall)) {
666     xbt_fifo_push(action->simcalls, simcall);
667     SIMIX_comm_finish(action);
668   } else {
669     SIMIX_simcall_answer(simcall);
670   }
671 }
672
673 void SIMIX_pre_comm_testany(smx_simcall_t simcall, xbt_dynar_t actions)
674 {
675   unsigned int cursor;
676   smx_action_t action;
677   simcall_comm_testany__set__result(simcall, -1);
678
679   if (MC_is_active()){
680     int idx = SIMCALL_GET_MC_VALUE(simcall);
681     if(idx == -1){
682       SIMIX_simcall_answer(simcall);
683     }else{
684       action = xbt_dynar_get_as(actions, idx, smx_action_t);
685       simcall_comm_testany__set__result(simcall, idx);
686       xbt_fifo_push(action->simcalls, simcall);
687       action->state = SIMIX_DONE;
688       SIMIX_comm_finish(action);
689     }
690     return;
691   }
692
693   xbt_dynar_foreach(simcall_comm_testany__get__comms(simcall), cursor,action) {
694     if (action->state != SIMIX_WAITING && action->state != SIMIX_RUNNING) {
695       simcall_comm_testany__set__result(simcall, cursor);
696       xbt_fifo_push(action->simcalls, simcall);
697       SIMIX_comm_finish(action);
698       return;
699     }
700   }
701   SIMIX_simcall_answer(simcall);
702 }
703
704 void SIMIX_pre_comm_waitany(smx_simcall_t simcall, xbt_dynar_t actions)
705 {
706   smx_action_t action;
707   unsigned int cursor = 0;
708
709   if (MC_is_active()){
710     int idx = SIMCALL_GET_MC_VALUE(simcall);
711     action = xbt_dynar_get_as(actions, idx, smx_action_t);
712     xbt_fifo_push(action->simcalls, simcall);
713     simcall_comm_waitany__set__result(simcall, idx);
714     action->state = SIMIX_DONE;
715     SIMIX_comm_finish(action);
716     return;
717   }
718
719   xbt_dynar_foreach(actions, cursor, action){
720     /* associate this simcall to the the action */
721     xbt_fifo_push(action->simcalls, simcall);
722
723     /* see if the action is already finished */
724     if (action->state != SIMIX_WAITING && action->state != SIMIX_RUNNING){
725       SIMIX_comm_finish(action);
726       break;
727     }
728   }
729 }
730
731 void SIMIX_waitany_remove_simcall_from_actions(smx_simcall_t simcall)
732 {
733   smx_action_t action;
734   unsigned int cursor = 0;
735   xbt_dynar_t actions = simcall_comm_waitany__get__comms(simcall);
736
737   xbt_dynar_foreach(actions, cursor, action) {
738     xbt_fifo_remove(action->simcalls, simcall);
739   }
740 }
741
742 /**
743  *  \brief Starts the simulation of a communication action.
744  *  \param action the communication action
745  */
746 static XBT_INLINE void SIMIX_comm_start(smx_action_t action)
747 {
748   /* If both the sender and the receiver are already there, start the communication */
749   if (action->state == SIMIX_READY) {
750
751     smx_host_t sender = action->comm.src_proc->smx_host;
752     smx_host_t receiver = action->comm.dst_proc->smx_host;
753
754     XBT_DEBUG("Starting communication %p from '%s' to '%s'", action,
755               SIMIX_host_get_name(sender), SIMIX_host_get_name(receiver));
756
757     action->comm.surf_comm = surf_workstation_model_communicate(surf_workstation_model,
758                                                                     sender, receiver,
759                                                                     action->comm.task_size, action->comm.rate);
760
761     surf_action_set_data(action->comm.surf_comm, action);
762
763     action->state = SIMIX_RUNNING;
764
765     /* If a link is failed, detect it immediately */
766     if (surf_action_get_state(action->comm.surf_comm) == SURF_ACTION_FAILED) {
767       XBT_DEBUG("Communication from '%s' to '%s' failed to start because of a link failure",
768                 SIMIX_host_get_name(sender), SIMIX_host_get_name(receiver));
769       action->state = SIMIX_LINK_FAILURE;
770       SIMIX_comm_destroy_internal_actions(action);
771     }
772
773     /* If any of the process is suspend, create the action but stop its execution,
774        it will be restarted when the sender process resume */
775     if (SIMIX_process_is_suspended(action->comm.src_proc) ||
776         SIMIX_process_is_suspended(action->comm.dst_proc)) {
777       /* FIXME: check what should happen with the action state */
778
779       if (SIMIX_process_is_suspended(action->comm.src_proc))
780         XBT_DEBUG("The communication is suspended on startup because src (%s:%s) were suspended since it initiated the communication",
781                   SIMIX_host_get_name(action->comm.src_proc->smx_host), action->comm.src_proc->name);
782       else
783         XBT_DEBUG("The communication is suspended on startup because dst (%s:%s) were suspended since it initiated the communication",
784                   SIMIX_host_get_name(action->comm.dst_proc->smx_host), action->comm.dst_proc->name);
785
786       surf_action_suspend(action->comm.surf_comm);
787
788     }
789   }
790 }
791
792 /**
793  * \brief Answers the SIMIX simcalls associated to a communication action.
794  * \param action a finished communication action
795  */
796 void SIMIX_comm_finish(smx_action_t action)
797 {
798   unsigned int destroy_count = 0;
799   smx_simcall_t simcall;
800
801
802   while ((simcall = xbt_fifo_shift(action->simcalls))) {
803
804     /* If a waitany simcall is waiting for this action to finish, then remove
805        it from the other actions in the waitany list. Afterwards, get the
806        position of the actual action in the waitany dynar and
807        return it as the result of the simcall */
808
809     if (simcall->call == SIMCALL_NONE) //FIXME: maybe a better way to handle this case
810       continue; // if process handling comm is killed
811     if (simcall->call == SIMCALL_COMM_WAITANY) {
812       SIMIX_waitany_remove_simcall_from_actions(simcall);
813       if (!MC_is_active())
814         simcall_comm_waitany__set__result(simcall, xbt_dynar_search(simcall_comm_waitany__get__comms(simcall), &action));
815     }
816
817     /* If the action is still in a rendez-vous point then remove from it */
818     if (action->comm.rdv)
819       SIMIX_rdv_remove(action->comm.rdv, action);
820
821     XBT_DEBUG("SIMIX_comm_finish: action state = %d", (int)action->state);
822
823     /* Check out for errors */
824     switch (action->state) {
825
826     case SIMIX_DONE:
827       XBT_DEBUG("Communication %p complete!", action);
828       SIMIX_comm_copy_data(action);
829       break;
830
831     case SIMIX_SRC_TIMEOUT:
832       SMX_EXCEPTION(simcall->issuer, timeout_error, 0,
833                     "Communication timeouted because of sender");
834       break;
835
836     case SIMIX_DST_TIMEOUT:
837       SMX_EXCEPTION(simcall->issuer, timeout_error, 0,
838                     "Communication timeouted because of receiver");
839       break;
840
841     case SIMIX_SRC_HOST_FAILURE:
842       if (simcall->issuer == action->comm.src_proc)
843         simcall->issuer->context->iwannadie = 1;
844 //          SMX_EXCEPTION(simcall->issuer, host_error, 0, "Host failed");
845       else
846         SMX_EXCEPTION(simcall->issuer, network_error, 0, "Remote peer failed");
847       break;
848
849     case SIMIX_DST_HOST_FAILURE:
850       if (simcall->issuer == action->comm.dst_proc)
851         simcall->issuer->context->iwannadie = 1;
852 //          SMX_EXCEPTION(simcall->issuer, host_error, 0, "Host failed");
853       else
854         SMX_EXCEPTION(simcall->issuer, network_error, 0, "Remote peer failed");
855       break;
856
857     case SIMIX_LINK_FAILURE:
858       XBT_DEBUG("Link failure in action %p between '%s' and '%s': posting an exception to the issuer: %s (%p) detached:%d",
859                 action,
860                 action->comm.src_proc ? sg_host_name(action->comm.src_proc->smx_host) : NULL,
861                 action->comm.dst_proc ? sg_host_name(action->comm.dst_proc->smx_host) : NULL,
862                 simcall->issuer->name, simcall->issuer, action->comm.detached);
863       if (action->comm.src_proc == simcall->issuer) {
864         XBT_DEBUG("I'm source");
865       } else if (action->comm.dst_proc == simcall->issuer) {
866         XBT_DEBUG("I'm dest");
867       } else {
868         XBT_DEBUG("I'm neither source nor dest");
869       }
870       SMX_EXCEPTION(simcall->issuer, network_error, 0, "Link failure");
871       break;
872
873     case SIMIX_CANCELED:
874       if (simcall->issuer == action->comm.dst_proc)
875         SMX_EXCEPTION(simcall->issuer, cancel_error, 0,
876                       "Communication canceled by the sender");
877       else
878         SMX_EXCEPTION(simcall->issuer, cancel_error, 0,
879                       "Communication canceled by the receiver");
880       break;
881
882     default:
883       xbt_die("Unexpected action state in SIMIX_comm_finish: %d", (int)action->state);
884     }
885
886     /* if there is an exception during a waitany or a testany, indicate the position of the failed communication */
887     if (simcall->issuer->doexception) {
888       if (simcall->call == SIMCALL_COMM_WAITANY) {
889         simcall->issuer->running_ctx->exception.value = xbt_dynar_search(simcall_comm_waitany__get__comms(simcall), &action);
890       }
891       else if (simcall->call == SIMCALL_COMM_TESTANY) {
892         simcall->issuer->running_ctx->exception.value = xbt_dynar_search(simcall_comm_testany__get__comms(simcall), &action);
893       }
894     }
895
896     if (surf_resource_get_state(surf_workstation_resource_priv(simcall->issuer->smx_host)) != SURF_RESOURCE_ON) {
897       simcall->issuer->context->iwannadie = 1;
898     }
899
900     simcall->issuer->waiting_action = NULL;
901     xbt_fifo_remove(simcall->issuer->comms, action);
902     if(action->comm.detached){
903       if(simcall->issuer == action->comm.src_proc){
904         if(action->comm.dst_proc)
905           xbt_fifo_remove(action->comm.dst_proc->comms, action);
906       }
907       if(simcall->issuer == action->comm.dst_proc){
908         if(action->comm.src_proc)
909           xbt_fifo_remove(action->comm.src_proc->comms, action);
910       }
911     }
912     SIMIX_simcall_answer(simcall);
913     destroy_count++;
914   }
915
916   while (destroy_count-- > 0)
917     SIMIX_comm_destroy(action);
918 }
919
920 /**
921  * \brief This function is called when a Surf communication action is finished.
922  * \param action the corresponding Simix communication
923  */
924 void SIMIX_post_comm(smx_action_t action)
925 {
926   /* Update action state */
927   if (action->comm.src_timeout &&
928       surf_action_get_state(action->comm.src_timeout) == SURF_ACTION_DONE)
929     action->state = SIMIX_SRC_TIMEOUT;
930   else if (action->comm.dst_timeout &&
931           surf_action_get_state(action->comm.dst_timeout) == SURF_ACTION_DONE)
932     action->state = SIMIX_DST_TIMEOUT;
933   else if (action->comm.src_timeout &&
934           surf_action_get_state(action->comm.src_timeout) == SURF_ACTION_FAILED)
935     action->state = SIMIX_SRC_HOST_FAILURE;
936   else if (action->comm.dst_timeout &&
937       surf_action_get_state(action->comm.dst_timeout) == SURF_ACTION_FAILED)
938     action->state = SIMIX_DST_HOST_FAILURE;
939   else if (action->comm.surf_comm &&
940           surf_action_get_state(action->comm.surf_comm) == SURF_ACTION_FAILED) {
941     XBT_DEBUG("Puta madre. Surf says that the link broke");
942     action->state = SIMIX_LINK_FAILURE;
943   } else
944     action->state = SIMIX_DONE;
945
946   XBT_DEBUG("SIMIX_post_comm: comm %p, state %d, src_proc %p, dst_proc %p, detached: %d",
947             action, (int)action->state, action->comm.src_proc, action->comm.dst_proc, action->comm.detached);
948
949   /* destroy the surf actions associated with the Simix communication */
950   SIMIX_comm_destroy_internal_actions(action);
951
952   /* remove the communication action from the list of pending communications
953    * of both processes (if they still exist) */
954   if (action->comm.src_proc) {
955     xbt_fifo_remove(action->comm.src_proc->comms, action);
956   }
957   if (action->comm.dst_proc) {
958     xbt_fifo_remove(action->comm.dst_proc->comms, action);
959   }
960
961   /* if there are simcalls associated with the action, then answer them */
962   if (xbt_fifo_size(action->simcalls)) {
963     SIMIX_comm_finish(action);
964   }
965 }
966
967 void SIMIX_pre_comm_cancel(smx_simcall_t simcall, smx_action_t action){
968   SIMIX_comm_cancel(action);
969 }
970 void SIMIX_comm_cancel(smx_action_t action)
971 {
972   /* if the action is a waiting state means that it is still in a rdv */
973   /* so remove from it and delete it */
974   if (action->state == SIMIX_WAITING) {
975     SIMIX_rdv_remove(action->comm.rdv, action);
976     action->state = SIMIX_CANCELED;
977   }
978   else if (!MC_is_active() /* when running the MC there are no surf actions */
979            && (action->state == SIMIX_READY || action->state == SIMIX_RUNNING)) {
980
981     surf_action_cancel(action->comm.surf_comm);
982   }
983 }
984
985 void SIMIX_comm_suspend(smx_action_t action)
986 {
987   /*FIXME: shall we suspend also the timeout actions? */
988   if (action->comm.surf_comm)
989     surf_action_suspend(action->comm.surf_comm);
990   /* in the other case, the action will be suspended on creation, in SIMIX_comm_start() */
991 }
992
993 void SIMIX_comm_resume(smx_action_t action)
994 {
995   /*FIXME: check what happen with the timeouts */
996   if (action->comm.surf_comm)
997     surf_action_resume(action->comm.surf_comm);
998   /* in the other case, the action were not really suspended yet, see SIMIX_comm_suspend() and SIMIX_comm_start() */
999 }
1000
1001
1002 /************* Action Getters **************/
1003
1004 double SIMIX_pre_comm_get_remains(smx_simcall_t simcall, smx_action_t action){
1005   return SIMIX_comm_get_remains(action);
1006 }
1007 /**
1008  *  \brief get the amount remaining from the communication
1009  *  \param action The communication
1010  */
1011 double SIMIX_comm_get_remains(smx_action_t action)
1012 {
1013   double remains;
1014
1015   if(!action){
1016     return 0;
1017   }
1018
1019   switch (action->state) {
1020
1021   case SIMIX_RUNNING:
1022     remains = surf_action_get_remains(action->comm.surf_comm);
1023     break;
1024
1025   case SIMIX_WAITING:
1026   case SIMIX_READY:
1027     remains = 0; /*FIXME: check what should be returned */
1028     break;
1029
1030   default:
1031     remains = 0; /*FIXME: is this correct? */
1032     break;
1033   }
1034   return remains;
1035 }
1036
1037 e_smx_state_t SIMIX_pre_comm_get_state(smx_simcall_t simcall, smx_action_t action){
1038   return SIMIX_comm_get_state(action);
1039 }
1040 e_smx_state_t SIMIX_comm_get_state(smx_action_t action)
1041 {
1042   return action->state;
1043 }
1044
1045 void* SIMIX_pre_comm_get_src_data(smx_simcall_t simcall, smx_action_t action){
1046   return SIMIX_comm_get_src_data(action);
1047 }
1048 /**
1049  *  \brief Return the user data associated to the sender of the communication
1050  *  \param action The communication
1051  *  \return the user data
1052  */
1053 void* SIMIX_comm_get_src_data(smx_action_t action)
1054 {
1055   return action->comm.src_data;
1056 }
1057
1058 void* SIMIX_pre_comm_get_dst_data(smx_simcall_t simcall, smx_action_t action){
1059   return SIMIX_comm_get_dst_data(action);
1060 }
1061 /**
1062  *  \brief Return the user data associated to the receiver of the communication
1063  *  \param action The communication
1064  *  \return the user data
1065  */
1066 void* SIMIX_comm_get_dst_data(smx_action_t action)
1067 {
1068   return action->comm.dst_data;
1069 }
1070
1071 smx_process_t SIMIX_pre_comm_get_src_proc(smx_simcall_t simcall, smx_action_t action){
1072   return SIMIX_comm_get_src_proc(action);
1073 }
1074 smx_process_t SIMIX_comm_get_src_proc(smx_action_t action)
1075 {
1076   return action->comm.src_proc;
1077 }
1078
1079 smx_process_t SIMIX_pre_comm_get_dst_proc(smx_simcall_t simcall, smx_action_t action){
1080   return SIMIX_comm_get_dst_proc(action);
1081 }
1082 smx_process_t SIMIX_comm_get_dst_proc(smx_action_t action)
1083 {
1084   return action->comm.dst_proc;
1085 }
1086
#ifdef HAVE_LATENCY_BOUND_TRACKING
/** \brief Simcall-side entry point: forwards to SIMIX_comm_is_latency_bounded() (the simcall itself is unused). */
int SIMIX_pre_comm_is_latency_bounded(smx_simcall_t simcall, smx_action_t action)
{
  (void) simcall;
  return SIMIX_comm_is_latency_bounded(action);
}

/**
 *  \brief verify if communication is latency bounded
 *
 *  When the communication has a surf action, the cached latency_limited flag
 *  is refreshed from it first. The cached flag is then returned.
 *
 *  \param action The communication (may be NULL, in which case 0 is returned)
 *  \return the latency_limited flag of the communication
 */
int SIMIX_comm_is_latency_bounded(smx_action_t action)
{
  if (action == NULL)
    return 0;

  if (action->comm.surf_comm != NULL) {
    XBT_DEBUG("Getting latency limited for surf_action (%p)", action->comm.surf_comm);
    action->latency_limited = surf_network_action_get_latency_limited(action->comm.surf_comm);
    XBT_DEBUG("Action limited is %d", action->latency_limited);
  }

  return action->latency_limited;
}
#endif
1110
1111 /******************************************************************************/
1112 /*                    SIMIX_comm_copy_data callbacks                       */
1113 /******************************************************************************/
1114 static void (*SIMIX_comm_copy_data_callback) (smx_action_t, void*, size_t) =
1115   &SIMIX_comm_copy_pointer_callback;
1116
1117 void
1118 SIMIX_comm_set_copy_data_callback(void (*callback) (smx_action_t, void*, size_t))
1119 {
1120   SIMIX_comm_copy_data_callback = callback;
1121 }
1122
1123 void SIMIX_comm_copy_pointer_callback(smx_action_t comm, void* buff, size_t buff_size)
1124 {
1125   xbt_assert((buff_size == sizeof(void *)),
1126              "Cannot copy %zu bytes: must be sizeof(void*)", buff_size);
1127   *(void **) (comm->comm.dst_buff) = buff;
1128 }
1129
1130 void SIMIX_comm_copy_buffer_callback(smx_action_t comm, void* buff, size_t buff_size)
1131 {
1132   XBT_DEBUG("Copy the data over");
1133   memcpy(comm->comm.dst_buff, buff, buff_size);
1134   if (comm->comm.detached) { // if this is a detached send, the source buffer was duplicated by SMPI sender to make the original buffer available to the application ASAP
1135     xbt_free(buff);
1136     comm->comm.src_buff = NULL;
1137   }
1138 }
1139
1140
1141 /**
1142  *  \brief Copy the communication data from the sender's buffer to the receiver's one
1143  *  \param comm The communication
1144  */
1145 void SIMIX_comm_copy_data(smx_action_t comm)
1146 {
1147   size_t buff_size = comm->comm.src_buff_size;
1148   /* If there is no data to be copy then return */
1149   if (!comm->comm.src_buff || !comm->comm.dst_buff || comm->comm.copied)
1150     return;
1151
1152   XBT_DEBUG("Copying comm %p data from %s (%p) -> %s (%p) (%zu bytes)",
1153             comm,
1154             comm->comm.src_proc ? sg_host_name(comm->comm.src_proc->smx_host) : "a finished process",
1155             comm->comm.src_buff,
1156             comm->comm.dst_proc ? sg_host_name(comm->comm.dst_proc->smx_host) : "a finished process",
1157             comm->comm.dst_buff, buff_size);
1158
1159   /* Copy at most dst_buff_size bytes of the message to receiver's buffer */
1160   if (comm->comm.dst_buff_size)
1161     buff_size = MIN(buff_size, *(comm->comm.dst_buff_size));
1162
1163   /* Update the receiver's buffer size to the copied amount */
1164   if (comm->comm.dst_buff_size)
1165     *comm->comm.dst_buff_size = buff_size;
1166
1167   if (buff_size > 0){
1168       if(comm->comm.copy_data_fun)
1169         comm->comm.copy_data_fun (comm, comm->comm.src_buff, buff_size);
1170       else
1171         SIMIX_comm_copy_data_callback (comm, comm->comm.src_buff, buff_size);
1172   }
1173
1174
1175   /* Set the copied flag so we copy data only once */
1176   /* (this function might be called from both communication ends) */
1177   comm->comm.copied = 1;
1178 }