Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
a671d9f2d9af4bf03a94c8e3684830b4d784d5a3
[simgrid.git] / src / simix / smx_network.c
1 /* Copyright (c) 2009-2014. The SimGrid Team.
2  * All rights reserved.                                                     */
3
4 /* This program is free software; you can redistribute it and/or modify it
5  * under the terms of the license (GNU LGPL) which comes with this package. */
6
7 #include "smx_private.h"
8 #include "xbt/log.h"
9 #include "mc/mc.h"
10 #include "mc/mc_replay.h"
11 #include "xbt/dict.h"
12 #include "smpi/private.h"
13
14 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(simix_network, simix,
15                                 "SIMIX network-related synchronization");
16
17 static xbt_dict_t rdv_points = NULL;
18 XBT_EXPORT_NO_IMPORT(unsigned long int) smx_total_comms = 0;
19
20 static void SIMIX_waitany_remove_simcall_from_actions(smx_simcall_t simcall);
21 static void SIMIX_comm_copy_data(smx_synchro_t comm);
22 static smx_synchro_t SIMIX_comm_new(e_smx_comm_type_t type);
23 static XBT_INLINE void SIMIX_rdv_push(smx_rdv_t rdv, smx_synchro_t comm);
24 static smx_synchro_t SIMIX_fifo_probe_comm(xbt_fifo_t fifo, e_smx_comm_type_t type,
25                                         int (*match_fun)(void *, void *,smx_synchro_t),
26                                         void *user_data, smx_synchro_t my_synchro);
27 static smx_synchro_t SIMIX_fifo_get_comm(xbt_fifo_t fifo, e_smx_comm_type_t type,
28                                         int (*match_fun)(void *, void *,smx_synchro_t),
29                                         void *user_data, smx_synchro_t my_synchro);
30 static void SIMIX_rdv_free(void *data);
31 static void SIMIX_comm_start(smx_synchro_t synchro);
32
33 void SIMIX_network_init(void)
34 {
35   rdv_points = xbt_dict_new_homogeneous(SIMIX_rdv_free);
36 }
37
38 void SIMIX_network_exit(void)
39 {
40   xbt_dict_free(&rdv_points);
41 }
42
43 /******************************************************************************/
44 /*                           Rendez-Vous Points                               */
45 /******************************************************************************/
46
47 smx_rdv_t SIMIX_rdv_create(const char *name)
48 {
49   /* two processes may have pushed the same rdv_create simcall at the same time */
50   smx_rdv_t rdv = name ? xbt_dict_get_or_null(rdv_points, name) : NULL;
51
52   if (!rdv) {
53     rdv = xbt_new0(s_smx_rvpoint_t, 1);
54     rdv->name = name ? xbt_strdup(name) : NULL;
55     rdv->comm_fifo = xbt_fifo_new();
56     rdv->done_comm_fifo = xbt_fifo_new();
57     rdv->permanent_receiver=NULL;
58
59     XBT_DEBUG("Creating a mailbox at %p with name %s", rdv, name);
60
61     if (rdv->name)
62       xbt_dict_set(rdv_points, rdv->name, rdv, NULL);
63   }
64   return rdv;
65 }
66
67 void SIMIX_rdv_destroy(smx_rdv_t rdv)
68 {
69   if (rdv->name)
70     xbt_dict_remove(rdv_points, rdv->name);
71 }
72
73 void SIMIX_rdv_free(void *data)
74 {
75   XBT_DEBUG("rdv free %p", data);
76   smx_rdv_t rdv = (smx_rdv_t) data;
77   xbt_free(rdv->name);
78   xbt_fifo_free(rdv->comm_fifo);
79   xbt_fifo_free(rdv->done_comm_fifo);
80
81   xbt_free(rdv);
82 }
83
84 xbt_dict_t SIMIX_get_rdv_points()
85 {
86   return rdv_points;
87 }
88
89 smx_rdv_t SIMIX_rdv_get_by_name(const char *name)
90 {
91   return xbt_dict_get_or_null(rdv_points, name);
92 }
93
94 int SIMIX_rdv_comm_count_by_host(smx_rdv_t rdv, smx_host_t host)
95 {
96   smx_synchro_t comm = NULL;
97   xbt_fifo_item_t item = NULL;
98   int count = 0;
99
100   xbt_fifo_foreach(rdv->comm_fifo, item, comm, smx_synchro_t) {
101     if (comm->comm.src_proc->smx_host == host)
102       count++;
103   }
104
105   return count;
106 }
107
108 smx_synchro_t SIMIX_rdv_get_head(smx_rdv_t rdv)
109 {
110   return xbt_fifo_get_item_content(xbt_fifo_get_first_item(rdv->comm_fifo));
111 }
112
113 /**
114  *  \brief get the receiver (process associated to the mailbox)
115  *  \param rdv The rendez-vous point
116  *  \return process The receiving process (NULL if not set)
117  */
118 smx_process_t SIMIX_rdv_get_receiver(smx_rdv_t rdv)
119 {
120   return rdv->permanent_receiver;
121 }
122
123 /**
124  *  \brief set the receiver of the rendez vous point to allow eager sends
125  *  \param rdv The rendez-vous point
126  *  \param process The receiving process
127  */
128 void SIMIX_rdv_set_receiver(smx_rdv_t rdv, smx_process_t process)
129 {
130   rdv->permanent_receiver=process;
131 }
132
133 /**
134  *  \brief Pushes a communication synchro into a rendez-vous point
135  *  \param rdv The rendez-vous point
136  *  \param comm The communication synchro
137  */
138 static XBT_INLINE void SIMIX_rdv_push(smx_rdv_t rdv, smx_synchro_t comm)
139 {
140   xbt_fifo_push(rdv->comm_fifo, comm);
141   comm->comm.rdv = rdv;
142 }
143
144 /**
145  *  \brief Removes a communication synchro from a rendez-vous point
146  *  \param rdv The rendez-vous point
147  *  \param comm The communication synchro
148  */
149 XBT_INLINE void SIMIX_rdv_remove(smx_rdv_t rdv, smx_synchro_t comm)
150 {
151   xbt_fifo_remove(rdv->comm_fifo, comm);
152   comm->comm.rdv = NULL;
153 }
154
155 /**
156  *  \brief Checks if there is a communication synchro queued in a fifo matching our needs
157  *  \param type The type of communication we are looking for (comm_send, comm_recv)
158  *  \return The communication synchro if found, NULL otherwise
159  */
160 smx_synchro_t SIMIX_fifo_get_comm(xbt_fifo_t fifo, e_smx_comm_type_t type,
161                                  int (*match_fun)(void *, void *,smx_synchro_t),
162                                  void *this_user_data, smx_synchro_t my_synchro)
163 {
164   smx_synchro_t synchro;
165   xbt_fifo_item_t item;
166   void* other_user_data = NULL;
167
168   xbt_fifo_foreach(fifo, item, synchro, smx_synchro_t) {
169     if (synchro->comm.type == SIMIX_COMM_SEND) {
170       other_user_data = synchro->comm.src_data;
171     } else if (synchro->comm.type == SIMIX_COMM_RECEIVE) {
172       other_user_data = synchro->comm.dst_data;
173     }
174     if (synchro->comm.type == type &&
175         (!match_fun              ||              match_fun(this_user_data,  other_user_data, synchro)) &&
176         (!synchro->comm.match_fun || synchro->comm.match_fun(other_user_data, this_user_data,  my_synchro))) {
177       XBT_DEBUG("Found a matching communication synchro %p", synchro);
178       xbt_fifo_remove_item(fifo, item);
179       xbt_fifo_free_item(item);
180       synchro->comm.refcount++;
181 #ifdef HAVE_MC
182       synchro->comm.rdv_cpy = synchro->comm.rdv;
183 #endif
184       synchro->comm.rdv = NULL;
185       return synchro;
186     }
187     XBT_DEBUG("Sorry, communication synchro %p does not match our needs:"
188               " its type is %d but we are looking for a comm of type %d (or maybe the filtering didn't match)",
189               synchro, (int)synchro->comm.type, (int)type);
190   }
191   XBT_DEBUG("No matching communication synchro found");
192   return NULL;
193 }
194
195
196 /**
197  *  \brief Checks if there is a communication synchro queued in a fifo matching our needs, but leave it there
198  *  \param type The type of communication we are looking for (comm_send, comm_recv)
199  *  \return The communication synchro if found, NULL otherwise
200  */
201 smx_synchro_t SIMIX_fifo_probe_comm(xbt_fifo_t fifo, e_smx_comm_type_t type,
202                                  int (*match_fun)(void *, void *,smx_synchro_t),
203                                  void *this_user_data, smx_synchro_t my_synchro)
204 {
205   smx_synchro_t synchro;
206   xbt_fifo_item_t item;
207   void* other_user_data = NULL;
208
209   xbt_fifo_foreach(fifo, item, synchro, smx_synchro_t) {
210     if (synchro->comm.type == SIMIX_COMM_SEND) {
211       other_user_data = synchro->comm.src_data;
212     } else if (synchro->comm.type == SIMIX_COMM_RECEIVE) {
213       other_user_data = synchro->comm.dst_data;
214     }
215     if (synchro->comm.type == type &&
216         (!match_fun              ||              match_fun(this_user_data,  other_user_data, synchro)) &&
217         (!synchro->comm.match_fun || synchro->comm.match_fun(other_user_data, this_user_data,  my_synchro))) {
218       XBT_DEBUG("Found a matching communication synchro %p", synchro);
219       synchro->comm.refcount++;
220
221       return synchro;
222     }
223     XBT_DEBUG("Sorry, communication synchro %p does not match our needs:"
224               " its type is %d but we are looking for a comm of type %d (or maybe the filtering didn't match)",
225               synchro, (int)synchro->comm.type, (int)type);
226   }
227   XBT_DEBUG("No matching communication synchro found");
228   return NULL;
229 }
230 /******************************************************************************/
231 /*                          Communication synchros                            */
232 /******************************************************************************/
233
234 /**
235  *  \brief Creates a new communicate synchro
236  *  \param type The direction of communication (comm_send, comm_recv)
237  *  \return The new communicate synchro
238  */
239 smx_synchro_t SIMIX_comm_new(e_smx_comm_type_t type)
240 {
241   smx_synchro_t synchro;
242
243   /* alloc structures */
244   synchro = xbt_mallocator_get(simix_global->synchro_mallocator);
245
246   synchro->type = SIMIX_SYNC_COMMUNICATE;
247   synchro->state = SIMIX_WAITING;
248
249   /* set communication */
250   synchro->comm.type = type;
251   synchro->comm.refcount = 1;
252   synchro->comm.src_data=NULL;
253   synchro->comm.dst_data=NULL;
254
255
256 #ifdef HAVE_LATENCY_BOUND_TRACKING
257   //initialize with unknown value
258   synchro->latency_limited = -1;
259 #endif
260
261   synchro->category = NULL;
262
263   XBT_DEBUG("Create communicate synchro %p", synchro);
264   ++smx_total_comms;
265
266   return synchro;
267 }
268
269 /**
270  *  \brief Destroy a communicate synchro
271  *  \param synchro The communicate synchro to be destroyed
272  */
273 void SIMIX_comm_destroy(smx_synchro_t synchro)
274 {
275   XBT_DEBUG("Destroy synchro %p (refcount: %d), state: %d",
276             synchro, synchro->comm.refcount, (int)synchro->state);
277
278   if (synchro->comm.refcount <= 0) {
279     xbt_backtrace_display_current();
280     xbt_die("The refcount of comm %p is already 0 before decreasing it. "
281             "That's a bug! If you didn't test and/or wait the same communication twice in your code, then the bug is SimGrid's...", synchro);
282   }
283   synchro->comm.refcount--;
284   if (synchro->comm.refcount > 0)
285       return;
286   XBT_DEBUG("Really free communication %p; refcount is now %d", synchro,
287       synchro->comm.refcount);
288
289 #ifdef HAVE_LATENCY_BOUND_TRACKING
290   synchro->latency_limited = SIMIX_comm_is_latency_bounded( synchro ) ;
291 #endif
292
293   xbt_free(synchro->name);
294   SIMIX_comm_destroy_internal_actions(synchro);
295
296   if (synchro->comm.detached && synchro->state != SIMIX_DONE) {
297     /* the communication has failed and was detached:
298      * we have to free the buffer */
299     if (synchro->comm.clean_fun) {
300       synchro->comm.clean_fun(synchro->comm.src_buff);
301     }
302     synchro->comm.src_buff = NULL;
303   }
304
305   if(synchro->comm.rdv)
306     SIMIX_rdv_remove(synchro->comm.rdv, synchro);
307
308   xbt_mallocator_release(simix_global->synchro_mallocator, synchro);
309 }
310
311 void SIMIX_comm_destroy_internal_actions(smx_synchro_t synchro)
312 {
313   if (synchro->comm.surf_comm){
314 #ifdef HAVE_LATENCY_BOUND_TRACKING
315     synchro->latency_limited = SIMIX_comm_is_latency_bounded(synchro);
316 #endif
317     surf_action_unref(synchro->comm.surf_comm);
318     synchro->comm.surf_comm = NULL;
319   }
320
321   if (synchro->comm.src_timeout){
322     surf_action_unref(synchro->comm.src_timeout);
323     synchro->comm.src_timeout = NULL;
324   }
325
326   if (synchro->comm.dst_timeout){
327     surf_action_unref(synchro->comm.dst_timeout);
328     synchro->comm.dst_timeout = NULL;
329   }
330 }
331
332 void simcall_HANDLER_comm_send(smx_simcall_t simcall, smx_process_t src, smx_rdv_t rdv,
333                                   double task_size, double rate,
334                                   void *src_buff, size_t src_buff_size,
335                                   int (*match_fun)(void *, void *,smx_synchro_t),
336                                   void (*copy_data_fun)(smx_synchro_t, void*, size_t),
337                                   void *data, double timeout){
338   smx_synchro_t comm = SIMIX_comm_isend(src, rdv, task_size, rate,
339                                        src_buff, src_buff_size, match_fun, NULL, copy_data_fun,
340                                        data, 0);
341   SIMCALL_SET_MC_VALUE(simcall, 0);
342   simcall_HANDLER_comm_wait(simcall, comm, timeout);
343 }
344 smx_synchro_t simcall_HANDLER_comm_isend(smx_simcall_t simcall, smx_process_t src, smx_rdv_t rdv,
345                                   double task_size, double rate,
346                                   void *src_buff, size_t src_buff_size,
347                                   int (*match_fun)(void *, void *,smx_synchro_t),
348                                   void (*clean_fun)(void *),
349                                   void (*copy_data_fun)(smx_synchro_t, void*, size_t),
350                                   void *data, int detached){
351   return SIMIX_comm_isend(src, rdv, task_size, rate, src_buff,
352                           src_buff_size, match_fun, clean_fun, copy_data_fun, data, detached);
353
354 }
355 smx_synchro_t SIMIX_comm_isend(smx_process_t src_proc, smx_rdv_t rdv,
356                               double task_size, double rate,
357                               void *src_buff, size_t src_buff_size,
358                               int (*match_fun)(void *, void *,smx_synchro_t),
359                               void (*clean_fun)(void *), // used to free the synchro in case of problem after a detached send
360                               void (*copy_data_fun)(smx_synchro_t, void*, size_t), // used to copy data if not default one
361                               void *data,
362                               int detached)
363 {
364   XBT_DEBUG("send from %p", rdv);
365
366   /* Prepare a synchro describing us, so that it gets passed to the user-provided filter of other side */
367   smx_synchro_t this_synchro = SIMIX_comm_new(SIMIX_COMM_SEND);
368
369   /* Look for communication synchro matching our needs. We also provide a description of
370    * ourself so that the other side also gets a chance of choosing if it wants to match with us.
371    *
372    * If it is not found then push our communication into the rendez-vous point */
373   smx_synchro_t other_synchro = SIMIX_fifo_get_comm(rdv->comm_fifo, SIMIX_COMM_RECEIVE, match_fun, data, this_synchro);
374
375   if (!other_synchro) {
376     other_synchro = this_synchro;
377
378     if (rdv->permanent_receiver!=NULL){
379       //this mailbox is for small messages, which have to be sent right now
380       other_synchro->state = SIMIX_READY;
381       other_synchro->comm.dst_proc=rdv->permanent_receiver;
382       other_synchro->comm.refcount++;
383       xbt_fifo_push(rdv->done_comm_fifo,other_synchro);
384       other_synchro->comm.rdv=rdv;
385       XBT_DEBUG("pushing a message into the permanent receive fifo %p, comm %p", rdv, &(other_synchro->comm));
386
387     }else{
388       SIMIX_rdv_push(rdv, this_synchro);
389     }
390   } else {
391     XBT_DEBUG("Receive already pushed");
392
393     SIMIX_comm_destroy(this_synchro);
394     --smx_total_comms; // this creation was a pure waste
395
396     other_synchro->state = SIMIX_READY;
397     other_synchro->comm.type = SIMIX_COMM_READY;
398
399   }
400   xbt_fifo_push(src_proc->comms, other_synchro);
401
402   /* if the communication synchro is detached then decrease the refcount
403    * by one, so it will be eliminated by the receiver's destroy call */
404   if (detached) {
405     other_synchro->comm.detached = 1;
406     other_synchro->comm.refcount--;
407     other_synchro->comm.clean_fun = clean_fun;
408   } else {
409     other_synchro->comm.clean_fun = NULL;
410   }
411
412   /* Setup the communication synchro */
413   other_synchro->comm.src_proc = src_proc;
414   other_synchro->comm.task_size = task_size;
415   other_synchro->comm.rate = rate;
416   other_synchro->comm.src_buff = src_buff;
417   other_synchro->comm.src_buff_size = src_buff_size;
418   other_synchro->comm.src_data = data;
419
420   other_synchro->comm.match_fun = match_fun;
421   other_synchro->comm.copy_data_fun = copy_data_fun;
422
423
424   if (MC_is_active() || MC_record_replay_is_active()) {
425     other_synchro->state = SIMIX_RUNNING;
426     return (detached ? NULL : other_synchro);
427   }
428
429   SIMIX_comm_start(other_synchro);
430   return (detached ? NULL : other_synchro);
431 }
432
433 void simcall_HANDLER_comm_recv(smx_simcall_t simcall, smx_rdv_t rdv,
434                          void *dst_buff, size_t *dst_buff_size,
435                          int (*match_fun)(void *, void *, smx_synchro_t),
436                          void (*copy_data_fun)(smx_synchro_t, void*, size_t),
437                          void *data, double timeout, double rate)
438 {
439   smx_synchro_t comm = SIMIX_comm_irecv(simcall->issuer, rdv, dst_buff,
440                                        dst_buff_size, match_fun, copy_data_fun, data, rate);
441   SIMCALL_SET_MC_VALUE(simcall, 0);
442   simcall_HANDLER_comm_wait(simcall, comm, timeout);
443 }
444
445 smx_synchro_t simcall_HANDLER_comm_irecv(smx_simcall_t simcall, smx_rdv_t rdv,
446                                   void *dst_buff, size_t *dst_buff_size,
447                                   int (*match_fun)(void *, void *, smx_synchro_t),
448                                   void (*copy_data_fun)(smx_synchro_t, void*, size_t),
449                                   void *data, double rate)
450 {
451   return SIMIX_comm_irecv(simcall->issuer, rdv, dst_buff, dst_buff_size,
452                           match_fun, copy_data_fun, data, rate);
453 }
454
455 smx_synchro_t SIMIX_comm_irecv(smx_process_t dst_proc, smx_rdv_t rdv,
456                               void *dst_buff, size_t *dst_buff_size,
457                               int (*match_fun)(void *, void *, smx_synchro_t),
458                               void (*copy_data_fun)(smx_synchro_t, void*, size_t), // used to copy data if not default one
459                               void *data, double rate)
460 {
461   XBT_DEBUG("recv from %p %p", rdv, rdv->comm_fifo);
462   smx_synchro_t this_synchro = SIMIX_comm_new(SIMIX_COMM_RECEIVE);
463
464   smx_synchro_t other_synchro;
465   //communication already done, get it inside the fifo of completed comms
466   //permanent receive v1
467   //int already_received=0;
468   if(rdv->permanent_receiver && xbt_fifo_size(rdv->done_comm_fifo)!=0){
469
470     XBT_DEBUG("We have a comm that has probably already been received, trying to match it, to skip the communication");
471     //find a match in the already received fifo
472     other_synchro = SIMIX_fifo_get_comm(rdv->done_comm_fifo, SIMIX_COMM_SEND, match_fun, data, this_synchro);
473     //if not found, assume the receiver came first, register it to the mailbox in the classical way
474     if (!other_synchro)  {
475       XBT_DEBUG("We have messages in the permanent receive list, but not the one we are looking for, pushing request into fifo");
476       other_synchro = this_synchro;
477       SIMIX_rdv_push(rdv, this_synchro);
478     }else{
479       if(other_synchro->comm.surf_comm &&       SIMIX_comm_get_remains(other_synchro)==0.0)
480       {
481         XBT_DEBUG("comm %p has been already sent, and is finished, destroy it",&(other_synchro->comm));
482         other_synchro->state = SIMIX_DONE;
483         other_synchro->comm.type = SIMIX_COMM_DONE;
484         other_synchro->comm.rdv = NULL;
485       }/*else{
486          XBT_DEBUG("Not yet finished, we have to wait %d", xbt_fifo_size(rdv->comm_fifo));
487          }*/
488       other_synchro->comm.refcount--;
489       SIMIX_comm_destroy(this_synchro);
490       --smx_total_comms; // this creation was a pure waste
491     }
492   }else{
493     /* Prepare a synchro describing us, so that it gets passed to the user-provided filter of other side */
494
495     /* Look for communication synchro matching our needs. We also provide a description of
496      * ourself so that the other side also gets a chance of choosing if it wants to match with us.
497      *
498      * If it is not found then push our communication into the rendez-vous point */
499     other_synchro = SIMIX_fifo_get_comm(rdv->comm_fifo, SIMIX_COMM_SEND, match_fun, data, this_synchro);
500
501     if (!other_synchro) {
502       XBT_DEBUG("Receive pushed first %d", xbt_fifo_size(rdv->comm_fifo));
503       other_synchro = this_synchro;
504       SIMIX_rdv_push(rdv, this_synchro);
505     } else {
506       SIMIX_comm_destroy(this_synchro);
507       --smx_total_comms; // this creation was a pure waste
508       other_synchro->state = SIMIX_READY;
509       other_synchro->comm.type = SIMIX_COMM_READY;
510       //other_synchro->comm.refcount--;
511     }
512     xbt_fifo_push(dst_proc->comms, other_synchro);
513   }
514
515   /* Setup communication synchro */
516   other_synchro->comm.dst_proc = dst_proc;
517   other_synchro->comm.dst_buff = dst_buff;
518   other_synchro->comm.dst_buff_size = dst_buff_size;
519   other_synchro->comm.dst_data = data;
520
521   if (rate != -1.0 &&
522       (other_synchro->comm.rate == -1.0 || rate < other_synchro->comm.rate))
523     other_synchro->comm.rate = rate;
524
525   other_synchro->comm.match_fun = match_fun;
526   other_synchro->comm.copy_data_fun = copy_data_fun;
527
528
529   /*if(already_received)//do the actual copy, because the first one after the comm didn't have all the info
530     SIMIX_comm_copy_data(other_synchro);*/
531
532
533   if (MC_is_active() || MC_record_replay_is_active()) {
534     other_synchro->state = SIMIX_RUNNING;
535     return other_synchro;
536   }
537
538   SIMIX_comm_start(other_synchro);
539   // }
540   return other_synchro;
541 }
542
543 smx_synchro_t simcall_HANDLER_comm_iprobe(smx_simcall_t simcall, smx_rdv_t rdv,
544                                    int type, int src, int tag,
545                                    int (*match_fun)(void *, void *, smx_synchro_t),
546                                    void *data){
547   return SIMIX_comm_iprobe(simcall->issuer, rdv, type, src, tag, match_fun, data);
548 }
549
550 smx_synchro_t SIMIX_comm_iprobe(smx_process_t dst_proc, smx_rdv_t rdv, int type, int src,
551                               int tag, int (*match_fun)(void *, void *, smx_synchro_t), void *data)
552 {
553   XBT_DEBUG("iprobe from %p %p", rdv, rdv->comm_fifo);
554   smx_synchro_t this_synchro;
555   int smx_type;
556   if(type == 1){
557     this_synchro=SIMIX_comm_new(SIMIX_COMM_SEND);
558     smx_type = SIMIX_COMM_RECEIVE;
559   } else{
560     this_synchro=SIMIX_comm_new(SIMIX_COMM_RECEIVE);
561     smx_type = SIMIX_COMM_SEND;
562   } 
563   smx_synchro_t other_synchro=NULL;
564   if(rdv->permanent_receiver && xbt_fifo_size(rdv->done_comm_fifo)!=0){
565     //find a match in the already received fifo
566       XBT_DEBUG("first try in the perm recv mailbox");
567
568     other_synchro = SIMIX_fifo_probe_comm(rdv->done_comm_fifo, smx_type, match_fun, data, this_synchro);
569   }
570  // }else{
571     if(!other_synchro){
572         XBT_DEBUG("try in the normal mailbox");
573         other_synchro = SIMIX_fifo_probe_comm(rdv->comm_fifo, smx_type, match_fun, data, this_synchro);
574     }
575 //  }
576   if(other_synchro)other_synchro->comm.refcount--;
577
578   SIMIX_comm_destroy(this_synchro);
579   --smx_total_comms;
580   return other_synchro;
581 }
582
583 void simcall_HANDLER_comm_wait(smx_simcall_t simcall, smx_synchro_t synchro, double timeout)
584 {
585   /* the simcall may be a wait, a send or a recv */
586   surf_action_t sleep;
587
588   /* Associate this simcall to the wait synchro */
589   XBT_DEBUG("simcall_HANDLER_comm_wait, %p", synchro);
590
591   xbt_fifo_push(synchro->simcalls, simcall);
592   simcall->issuer->waiting_synchro = synchro;
593
594   if (MC_is_active() || MC_record_replay_is_active()) {
595     int idx = SIMCALL_GET_MC_VALUE(simcall);
596     if (idx == 0) {
597       synchro->state = SIMIX_DONE;
598     } else {
599       /* If we reached this point, the wait simcall must have a timeout */
600       /* Otherwise it shouldn't be enabled and executed by the MC */
601       if (timeout == -1)
602         THROW_IMPOSSIBLE;
603
604       if (synchro->comm.src_proc == simcall->issuer)
605         synchro->state = SIMIX_SRC_TIMEOUT;
606       else
607         synchro->state = SIMIX_DST_TIMEOUT;
608     }
609
610     SIMIX_comm_finish(synchro);
611     return;
612   }
613
614   /* If the synchro has already finish perform the error handling, */
615   /* otherwise set up a waiting timeout on the right side          */
616   if (synchro->state != SIMIX_WAITING && synchro->state != SIMIX_RUNNING) {
617     SIMIX_comm_finish(synchro);
618   } else { /* if (timeout >= 0) { we need a surf sleep action even when there is no timeout, otherwise surf won't tell us when the host fails */
619     sleep = surf_host_sleep(simcall->issuer->smx_host, timeout);
620     surf_action_set_data(sleep, synchro);
621
622     if (simcall->issuer == synchro->comm.src_proc)
623       synchro->comm.src_timeout = sleep;
624     else
625       synchro->comm.dst_timeout = sleep;
626   }
627 }
628
629 void simcall_HANDLER_comm_test(smx_simcall_t simcall, smx_synchro_t synchro)
630 {
631   if(MC_is_active() || MC_record_replay_is_active()){
632     simcall_comm_test__set__result(simcall, synchro->comm.src_proc && synchro->comm.dst_proc);
633     if(simcall_comm_test__get__result(simcall)){
634       synchro->state = SIMIX_DONE;
635       xbt_fifo_push(synchro->simcalls, simcall);
636       SIMIX_comm_finish(synchro);
637     }else{
638       SIMIX_simcall_answer(simcall);
639     }
640     return;
641   }
642
643   simcall_comm_test__set__result(simcall, (synchro->state != SIMIX_WAITING && synchro->state != SIMIX_RUNNING));
644   if (simcall_comm_test__get__result(simcall)) {
645     xbt_fifo_push(synchro->simcalls, simcall);
646     SIMIX_comm_finish(synchro);
647   } else {
648     SIMIX_simcall_answer(simcall);
649   }
650 }
651
652 void simcall_HANDLER_comm_testany(smx_simcall_t simcall, xbt_dynar_t synchros)
653 {
654   unsigned int cursor;
655   smx_synchro_t synchro;
656   simcall_comm_testany__set__result(simcall, -1);
657
658   if (MC_is_active() || MC_record_replay_is_active()){
659     int idx = SIMCALL_GET_MC_VALUE(simcall);
660     if(idx == -1){
661       SIMIX_simcall_answer(simcall);
662     }else{
663       synchro = xbt_dynar_get_as(synchros, idx, smx_synchro_t);
664       simcall_comm_testany__set__result(simcall, idx);
665       xbt_fifo_push(synchro->simcalls, simcall);
666       synchro->state = SIMIX_DONE;
667       SIMIX_comm_finish(synchro);
668     }
669     return;
670   }
671
672   xbt_dynar_foreach(simcall_comm_testany__get__comms(simcall), cursor,synchro) {
673     if (synchro->state != SIMIX_WAITING && synchro->state != SIMIX_RUNNING) {
674       simcall_comm_testany__set__result(simcall, cursor);
675       xbt_fifo_push(synchro->simcalls, simcall);
676       SIMIX_comm_finish(synchro);
677       return;
678     }
679   }
680   SIMIX_simcall_answer(simcall);
681 }
682
683 void simcall_HANDLER_comm_waitany(smx_simcall_t simcall, xbt_dynar_t synchros)
684 {
685   smx_synchro_t synchro;
686   unsigned int cursor = 0;
687
688   if (MC_is_active() || MC_record_replay_is_active()){
689     int idx = SIMCALL_GET_MC_VALUE(simcall);
690     synchro = xbt_dynar_get_as(synchros, idx, smx_synchro_t);
691     xbt_fifo_push(synchro->simcalls, simcall);
692     simcall_comm_waitany__set__result(simcall, idx);
693     synchro->state = SIMIX_DONE;
694     SIMIX_comm_finish(synchro);
695     return;
696   }
697
698   xbt_dynar_foreach(synchros, cursor, synchro){
699     /* associate this simcall to the the synchro */
700     xbt_fifo_push(synchro->simcalls, simcall);
701
702     /* see if the synchro is already finished */
703     if (synchro->state != SIMIX_WAITING && synchro->state != SIMIX_RUNNING){
704       SIMIX_comm_finish(synchro);
705       break;
706     }
707   }
708 }
709
710 void SIMIX_waitany_remove_simcall_from_actions(smx_simcall_t simcall)
711 {
712   smx_synchro_t synchro;
713   unsigned int cursor = 0;
714   xbt_dynar_t synchros = simcall_comm_waitany__get__comms(simcall);
715
716   xbt_dynar_foreach(synchros, cursor, synchro) {
717     xbt_fifo_remove(synchro->simcalls, simcall);
718   }
719 }
720
721 /**
722  *  \brief Starts the simulation of a communication synchro.
723  *  \param synchro the communication synchro
724  */
725 static XBT_INLINE void SIMIX_comm_start(smx_synchro_t synchro)
726 {
727   /* If both the sender and the receiver are already there, start the communication */
728   if (synchro->state == SIMIX_READY) {
729
730     smx_host_t sender = synchro->comm.src_proc->smx_host;
731     smx_host_t receiver = synchro->comm.dst_proc->smx_host;
732
733     XBT_DEBUG("Starting communication %p from '%s' to '%s'", synchro,
734               SIMIX_host_get_name(sender), SIMIX_host_get_name(receiver));
735
736     synchro->comm.surf_comm = surf_host_model_communicate(surf_host_model,
737                                                                     sender, receiver,
738                                                                     synchro->comm.task_size, synchro->comm.rate);
739
740     surf_action_set_data(synchro->comm.surf_comm, synchro);
741
742     synchro->state = SIMIX_RUNNING;
743
744     /* If a link is failed, detect it immediately */
745     if (surf_action_get_state(synchro->comm.surf_comm) == SURF_ACTION_FAILED) {
746       XBT_DEBUG("Communication from '%s' to '%s' failed to start because of a link failure",
747                 SIMIX_host_get_name(sender), SIMIX_host_get_name(receiver));
748       synchro->state = SIMIX_LINK_FAILURE;
749       SIMIX_comm_destroy_internal_actions(synchro);
750     }
751
752     /* If any of the process is suspend, create the synchro but stop its execution,
753        it will be restarted when the sender process resume */
754     if (SIMIX_process_is_suspended(synchro->comm.src_proc) ||
755         SIMIX_process_is_suspended(synchro->comm.dst_proc)) {
756       /* FIXME: check what should happen with the synchro state */
757
758       if (SIMIX_process_is_suspended(synchro->comm.src_proc))
759         XBT_DEBUG("The communication is suspended on startup because src (%s:%s) were suspended since it initiated the communication",
760                   SIMIX_host_get_name(synchro->comm.src_proc->smx_host), synchro->comm.src_proc->name);
761       else
762         XBT_DEBUG("The communication is suspended on startup because dst (%s:%s) were suspended since it initiated the communication",
763                   SIMIX_host_get_name(synchro->comm.dst_proc->smx_host), synchro->comm.dst_proc->name);
764
765       surf_action_suspend(synchro->comm.surf_comm);
766
767     }
768   }
769 }
770
771 /**
772  * \brief Answers the SIMIX simcalls associated to a communication synchro.
773  * \param synchro a finished communication synchro
774  */
775 void SIMIX_comm_finish(smx_synchro_t synchro)
776 {
777   unsigned int destroy_count = 0;
778   smx_simcall_t simcall;
779
780   while ((simcall = xbt_fifo_shift(synchro->simcalls))) {
781
782     /* If a waitany simcall is waiting for this synchro to finish, then remove
783        it from the other synchros in the waitany list. Afterwards, get the
784        position of the actual synchro in the waitany dynar and
785        return it as the result of the simcall */
786
787     if (simcall->call == SIMCALL_NONE) //FIXME: maybe a better way to handle this case
788       continue; // if process handling comm is killed
789     if (simcall->call == SIMCALL_COMM_WAITANY) {
790       SIMIX_waitany_remove_simcall_from_actions(simcall);
791       if (!MC_is_active() && !MC_record_replay_is_active())
792         simcall_comm_waitany__set__result(simcall, xbt_dynar_search(simcall_comm_waitany__get__comms(simcall), &synchro));
793     }
794
795     /* If the synchro is still in a rendez-vous point then remove from it */
796     if (synchro->comm.rdv)
797       SIMIX_rdv_remove(synchro->comm.rdv, synchro);
798
799     XBT_DEBUG("SIMIX_comm_finish: synchro state = %d", (int)synchro->state);
800
801     /* Check out for errors */
802
803     if (surf_host_get_state(surf_host_resource_priv(
804           simcall->issuer->smx_host)) != SURF_RESOURCE_ON) {
805       simcall->issuer->context->iwannadie = 1;
806       SMX_EXCEPTION(simcall->issuer, host_error, 0, "Host failed");
807     } else
808
809     switch (synchro->state) {
810
811     case SIMIX_DONE:
812       XBT_DEBUG("Communication %p complete!", synchro);
813       SIMIX_comm_copy_data(synchro);
814       break;
815
816     case SIMIX_SRC_TIMEOUT:
817       SMX_EXCEPTION(simcall->issuer, timeout_error, 0,
818                     "Communication timeouted because of sender");
819       break;
820
821     case SIMIX_DST_TIMEOUT:
822       SMX_EXCEPTION(simcall->issuer, timeout_error, 0,
823                     "Communication timeouted because of receiver");
824       break;
825
826     case SIMIX_SRC_HOST_FAILURE:
827       if (simcall->issuer == synchro->comm.src_proc)
828         simcall->issuer->context->iwannadie = 1;
829 //          SMX_EXCEPTION(simcall->issuer, host_error, 0, "Host failed");
830       else
831         SMX_EXCEPTION(simcall->issuer, network_error, 0, "Remote peer failed");
832       break;
833
834     case SIMIX_DST_HOST_FAILURE:
835       if (simcall->issuer == synchro->comm.dst_proc)
836         simcall->issuer->context->iwannadie = 1;
837 //          SMX_EXCEPTION(simcall->issuer, host_error, 0, "Host failed");
838       else
839         SMX_EXCEPTION(simcall->issuer, network_error, 0, "Remote peer failed");
840       break;
841
842     case SIMIX_LINK_FAILURE:
843
844       XBT_DEBUG("Link failure in synchro %p between '%s' and '%s': posting an exception to the issuer: %s (%p) detached:%d",
845                 synchro,
846                 synchro->comm.src_proc ? sg_host_name(synchro->comm.src_proc->smx_host) : NULL,
847                 synchro->comm.dst_proc ? sg_host_name(synchro->comm.dst_proc->smx_host) : NULL,
848                 simcall->issuer->name, simcall->issuer, synchro->comm.detached);
849       if (synchro->comm.src_proc == simcall->issuer) {
850         XBT_DEBUG("I'm source");
851       } else if (synchro->comm.dst_proc == simcall->issuer) {
852         XBT_DEBUG("I'm dest");
853       } else {
854         XBT_DEBUG("I'm neither source nor dest");
855       }
856       SMX_EXCEPTION(simcall->issuer, network_error, 0, "Link failure");
857       break;
858
859     case SIMIX_CANCELED:
860       if (simcall->issuer == synchro->comm.dst_proc)
861         SMX_EXCEPTION(simcall->issuer, cancel_error, 0,
862                       "Communication canceled by the sender");
863       else
864         SMX_EXCEPTION(simcall->issuer, cancel_error, 0,
865                       "Communication canceled by the receiver");
866       break;
867
868     default:
869       xbt_die("Unexpected synchro state in SIMIX_comm_finish: %d", (int)synchro->state);
870     }
871
872     /* if there is an exception during a waitany or a testany, indicate the position of the failed communication */
873     if (simcall->issuer->doexception) {
874       if (simcall->call == SIMCALL_COMM_WAITANY) {
875         simcall->issuer->running_ctx->exception.value = xbt_dynar_search(simcall_comm_waitany__get__comms(simcall), &synchro);
876       }
877       else if (simcall->call == SIMCALL_COMM_TESTANY) {
878         simcall->issuer->running_ctx->exception.value = xbt_dynar_search(simcall_comm_testany__get__comms(simcall), &synchro);
879       }
880     }
881
882     if (surf_host_get_state(surf_host_resource_priv(simcall->issuer->smx_host)) != SURF_RESOURCE_ON) {
883       simcall->issuer->context->iwannadie = 1;
884     }
885
886     simcall->issuer->waiting_synchro = NULL;
887     xbt_fifo_remove(simcall->issuer->comms, synchro);
888     if(synchro->comm.detached){
889       if(simcall->issuer == synchro->comm.src_proc){
890         if(synchro->comm.dst_proc)
891           xbt_fifo_remove(synchro->comm.dst_proc->comms, synchro);
892       }
893       if(simcall->issuer == synchro->comm.dst_proc){
894         if(synchro->comm.src_proc)
895           xbt_fifo_remove(synchro->comm.src_proc->comms, synchro);
896       }
897     }
898     SIMIX_simcall_answer(simcall);
899     destroy_count++;
900   }
901
902   while (destroy_count-- > 0)
903     SIMIX_comm_destroy(synchro);
904 }
905
906 /**
907  * \brief This function is called when a Surf communication synchro is finished.
908  * \param synchro the corresponding Simix communication
909  */
910 void SIMIX_post_comm(smx_synchro_t synchro)
911 {
912   /* Update synchro state */
913   if (synchro->comm.src_timeout &&
914       surf_action_get_state(synchro->comm.src_timeout) == SURF_ACTION_DONE)
915     synchro->state = SIMIX_SRC_TIMEOUT;
916   else if (synchro->comm.dst_timeout &&
917           surf_action_get_state(synchro->comm.dst_timeout) == SURF_ACTION_DONE)
918     synchro->state = SIMIX_DST_TIMEOUT;
919   else if (synchro->comm.src_timeout &&
920           surf_action_get_state(synchro->comm.src_timeout) == SURF_ACTION_FAILED)
921     synchro->state = SIMIX_SRC_HOST_FAILURE;
922   else if (synchro->comm.dst_timeout &&
923       surf_action_get_state(synchro->comm.dst_timeout) == SURF_ACTION_FAILED)
924     synchro->state = SIMIX_DST_HOST_FAILURE;
925   else if (synchro->comm.surf_comm &&
926           surf_action_get_state(synchro->comm.surf_comm) == SURF_ACTION_FAILED) {
927     XBT_DEBUG("Puta madre. Surf says that the link broke");
928     synchro->state = SIMIX_LINK_FAILURE;
929   } else
930     synchro->state = SIMIX_DONE;
931
932   XBT_DEBUG("SIMIX_post_comm: comm %p, state %d, src_proc %p, dst_proc %p, detached: %d",
933             synchro, (int)synchro->state, synchro->comm.src_proc, synchro->comm.dst_proc, synchro->comm.detached);
934
935   /* destroy the surf actions associated with the Simix communication */
936   SIMIX_comm_destroy_internal_actions(synchro);
937
938   /* if there are simcalls associated with the synchro, then answer them */
939   if (xbt_fifo_size(synchro->simcalls)) {
940     SIMIX_comm_finish(synchro);
941   }
942 }
943
944 void SIMIX_comm_cancel(smx_synchro_t synchro)
945 {
946   /* if the synchro is a waiting state means that it is still in a rdv */
947   /* so remove from it and delete it */
948   if (synchro->state == SIMIX_WAITING) {
949     SIMIX_rdv_remove(synchro->comm.rdv, synchro);
950     synchro->state = SIMIX_CANCELED;
951   }
952   else if (!MC_is_active() /* when running the MC there are no surf actions */
953            && !MC_record_replay_is_active()
954            && (synchro->state == SIMIX_READY || synchro->state == SIMIX_RUNNING)) {
955
956     surf_action_cancel(synchro->comm.surf_comm);
957   }
958 }
959
960 void SIMIX_comm_suspend(smx_synchro_t synchro)
961 {
962   /*FIXME: shall we suspend also the timeout synchro? */
963   if (synchro->comm.surf_comm)
964     surf_action_suspend(synchro->comm.surf_comm);
965   /* in the other case, the action will be suspended on creation, in SIMIX_comm_start() */
966 }
967
968 void SIMIX_comm_resume(smx_synchro_t synchro)
969 {
970   /*FIXME: check what happen with the timeouts */
971   if (synchro->comm.surf_comm)
972     surf_action_resume(synchro->comm.surf_comm);
973   /* in the other case, the synchro were not really suspended yet, see SIMIX_comm_suspend() and SIMIX_comm_start() */
974 }
975
976
977 /************* synchro Getters **************/
978
979 /**
980  *  \brief get the amount remaining from the communication
981  *  \param synchro The communication
982  */
983 double SIMIX_comm_get_remains(smx_synchro_t synchro)
984 {
985   double remains;
986
987   if(!synchro){
988     return 0;
989   }
990
991   switch (synchro->state) {
992
993   case SIMIX_RUNNING:
994     remains = surf_action_get_remains(synchro->comm.surf_comm);
995     break;
996
997   case SIMIX_WAITING:
998   case SIMIX_READY:
999     remains = 0; /*FIXME: check what should be returned */
1000     break;
1001
1002   default:
1003     remains = 0; /*FIXME: is this correct? */
1004     break;
1005   }
1006   return remains;
1007 }
1008
1009 e_smx_state_t SIMIX_comm_get_state(smx_synchro_t synchro)
1010 {
1011   return synchro->state;
1012 }
1013
1014 /**
1015  *  \brief Return the user data associated to the sender of the communication
1016  *  \param synchro The communication
1017  *  \return the user data
1018  */
1019 void* SIMIX_comm_get_src_data(smx_synchro_t synchro)
1020 {
1021   return synchro->comm.src_data;
1022 }
1023
1024 /**
1025  *  \brief Return the user data associated to the receiver of the communication
1026  *  \param synchro The communication
1027  *  \return the user data
1028  */
1029 void* SIMIX_comm_get_dst_data(smx_synchro_t synchro)
1030 {
1031   return synchro->comm.dst_data;
1032 }
1033
1034 smx_process_t SIMIX_comm_get_src_proc(smx_synchro_t synchro)
1035 {
1036   return synchro->comm.src_proc;
1037 }
1038
1039 smx_process_t SIMIX_comm_get_dst_proc(smx_synchro_t synchro)
1040 {
1041   return synchro->comm.dst_proc;
1042 }
1043
1044 #ifdef HAVE_LATENCY_BOUND_TRACKING
1045 /**
1046  *  \brief verify if communication is latency bounded
1047  *  \param comm The communication
1048  */
1049 int SIMIX_comm_is_latency_bounded(smx_synchro_t synchro)
1050 {
1051   if(!synchro){
1052     return 0;
1053   }
1054   if (synchro->comm.surf_comm){
1055     XBT_DEBUG("Getting latency limited for surf_action (%p)", synchro->comm.surf_comm);
1056     synchro->latency_limited = surf_network_action_get_latency_limited(synchro->comm.surf_comm);
1057     XBT_DEBUG("synchro limited is %d", synchro->latency_limited);
1058   }
1059   return synchro->latency_limited;
1060 }
1061 #endif
1062
1063 /******************************************************************************/
1064 /*                    SIMIX_comm_copy_data callbacks                       */
1065 /******************************************************************************/
1066 static void (*SIMIX_comm_copy_data_callback) (smx_synchro_t, void*, size_t) =
1067   &SIMIX_comm_copy_pointer_callback;
1068
1069 void
1070 SIMIX_comm_set_copy_data_callback(void (*callback) (smx_synchro_t, void*, size_t))
1071 {
1072   SIMIX_comm_copy_data_callback = callback;
1073 }
1074
1075 void SIMIX_comm_copy_pointer_callback(smx_synchro_t comm, void* buff, size_t buff_size)
1076 {
1077   xbt_assert((buff_size == sizeof(void *)),
1078              "Cannot copy %zu bytes: must be sizeof(void*)", buff_size);
1079   *(void **) (comm->comm.dst_buff) = buff;
1080 }
1081
1082 void SIMIX_comm_copy_buffer_callback(smx_synchro_t comm, void* buff, size_t buff_size)
1083 {
1084   XBT_DEBUG("Copy the data over");
1085   memcpy(comm->comm.dst_buff, buff, buff_size);
1086   if (comm->comm.detached) { // if this is a detached send, the source buffer was duplicated by SMPI sender to make the original buffer available to the application ASAP
1087     xbt_free(buff);
1088     comm->comm.src_buff = NULL;
1089   }
1090 }
1091
1092
1093 /**
1094  *  \brief Copy the communication data from the sender's buffer to the receiver's one
1095  *  \param comm The communication
1096  */
1097 void SIMIX_comm_copy_data(smx_synchro_t comm)
1098 {
1099   size_t buff_size = comm->comm.src_buff_size;
1100   /* If there is no data to be copy then return */
1101   if (!comm->comm.src_buff || !comm->comm.dst_buff || comm->comm.copied)
1102     return;
1103
1104   XBT_DEBUG("Copying comm %p data from %s (%p) -> %s (%p) (%zu bytes)",
1105             comm,
1106             comm->comm.src_proc ? sg_host_name(comm->comm.src_proc->smx_host) : "a finished process",
1107             comm->comm.src_buff,
1108             comm->comm.dst_proc ? sg_host_name(comm->comm.dst_proc->smx_host) : "a finished process",
1109             comm->comm.dst_buff, buff_size);
1110
1111   /* Copy at most dst_buff_size bytes of the message to receiver's buffer */
1112   if (comm->comm.dst_buff_size)
1113     buff_size = MIN(buff_size, *(comm->comm.dst_buff_size));
1114
1115   /* Update the receiver's buffer size to the copied amount */
1116   if (comm->comm.dst_buff_size)
1117     *comm->comm.dst_buff_size = buff_size;
1118
1119   if (buff_size > 0){
1120       if(comm->comm.copy_data_fun)
1121         comm->comm.copy_data_fun (comm, comm->comm.src_buff, buff_size);
1122       else
1123         SIMIX_comm_copy_data_callback (comm, comm->comm.src_buff, buff_size);
1124   }
1125
1126
1127   /* Set the copied flag so we copy data only once */
1128   /* (this function might be called from both communication ends) */
1129   comm->comm.copied = 1;
1130 }