Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
089c68e1513c3501baa1603806e180dea34e83a6
[simgrid.git] / src / simix / smx_network.c
1 /* Copyright (c) 2009-2014. The SimGrid Team.
2  * All rights reserved.                                                     */
3
4 /* This program is free software; you can redistribute it and/or modify it
5  * under the terms of the license (GNU LGPL) which comes with this package. */
6
7 #include "smx_private.h"
8 #include "xbt/log.h"
9 #include "mc/mc.h"
10 #include "xbt/dict.h"
11 #include "smpi/private.h"
12
13
14 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(simix_network, simix,
15                                 "SIMIX network-related synchronization");
16
17 static xbt_dict_t rdv_points = NULL;
18 XBT_EXPORT_NO_IMPORT(unsigned long int) smx_total_comms = 0;
19
20 static void SIMIX_waitany_remove_simcall_from_actions(smx_simcall_t simcall);
21 static void SIMIX_comm_copy_data(smx_synchro_t comm);
22 static smx_synchro_t SIMIX_comm_new(e_smx_comm_type_t type);
23 static XBT_INLINE void SIMIX_rdv_push(smx_rdv_t rdv, smx_synchro_t comm);
24 static smx_synchro_t SIMIX_fifo_probe_comm(xbt_fifo_t fifo, e_smx_comm_type_t type,
25                                         int (*match_fun)(void *, void *,smx_synchro_t),
26                                         void *user_data, smx_synchro_t my_synchro);
27 static smx_synchro_t SIMIX_fifo_get_comm(xbt_fifo_t fifo, e_smx_comm_type_t type,
28                                         int (*match_fun)(void *, void *,smx_synchro_t),
29                                         void *user_data, smx_synchro_t my_synchro);
30 static void SIMIX_rdv_free(void *data);
31 static void SIMIX_comm_start(smx_synchro_t synchro);
32
33 void SIMIX_network_init(void)
34 {
35   rdv_points = xbt_dict_new_homogeneous(SIMIX_rdv_free);
36 }
37
38 void SIMIX_network_exit(void)
39 {
40   xbt_dict_free(&rdv_points);
41 }
42
43 /******************************************************************************/
44 /*                           Rendez-Vous Points                               */
45 /******************************************************************************/
46
47 smx_rdv_t SIMIX_rdv_create(const char *name)
48 {
49   /* two processes may have pushed the same rdv_create simcall at the same time */
50   smx_rdv_t rdv = name ? xbt_dict_get_or_null(rdv_points, name) : NULL;
51
52   if (!rdv) {
53     rdv = xbt_new0(s_smx_rvpoint_t, 1);
54     rdv->name = name ? xbt_strdup(name) : NULL;
55     rdv->comm_fifo = xbt_fifo_new();
56     rdv->done_comm_fifo = xbt_fifo_new();
57     rdv->permanent_receiver=NULL;
58
59     XBT_DEBUG("Creating a mailbox at %p with name %s", rdv, name);
60
61     if (rdv->name)
62       xbt_dict_set(rdv_points, rdv->name, rdv, NULL);
63   }
64   return rdv;
65 }
66
67 void SIMIX_rdv_destroy(smx_rdv_t rdv)
68 {
69   if (rdv->name)
70     xbt_dict_remove(rdv_points, rdv->name);
71 }
72
73 void SIMIX_rdv_free(void *data)
74 {
75   XBT_DEBUG("rdv free %p", data);
76   smx_rdv_t rdv = (smx_rdv_t) data;
77   xbt_free(rdv->name);
78   xbt_fifo_free(rdv->comm_fifo);
79   xbt_fifo_free(rdv->done_comm_fifo);
80
81   xbt_free(rdv);
82 }
83
84 xbt_dict_t SIMIX_get_rdv_points()
85 {
86   return rdv_points;
87 }
88
89 smx_rdv_t SIMIX_rdv_get_by_name(const char *name)
90 {
91   return xbt_dict_get_or_null(rdv_points, name);
92 }
93
94 int SIMIX_rdv_comm_count_by_host(smx_rdv_t rdv, smx_host_t host)
95 {
96   smx_synchro_t comm = NULL;
97   xbt_fifo_item_t item = NULL;
98   int count = 0;
99
100   xbt_fifo_foreach(rdv->comm_fifo, item, comm, smx_synchro_t) {
101     if (comm->comm.src_proc->smx_host == host)
102       count++;
103   }
104
105   return count;
106 }
107
108 smx_synchro_t SIMIX_rdv_get_head(smx_rdv_t rdv)
109 {
110   return xbt_fifo_get_item_content(xbt_fifo_get_first_item(rdv->comm_fifo));
111 }
112
113 /**
114  *  \brief get the receiver (process associated to the mailbox)
115  *  \param rdv The rendez-vous point
116  *  \return process The receiving process (NULL if not set)
117  */
118 smx_process_t SIMIX_rdv_get_receiver(smx_rdv_t rdv)
119 {
120   return rdv->permanent_receiver;
121 }
122
123 /**
124  *  \brief set the receiver of the rendez vous point to allow eager sends
125  *  \param rdv The rendez-vous point
126  *  \param process The receiving process
127  */
128 void SIMIX_rdv_set_receiver(smx_rdv_t rdv, smx_process_t process)
129 {
130   rdv->permanent_receiver=process;
131 }
132
133 /**
134  *  \brief Pushes a communication synchro into a rendez-vous point
135  *  \param rdv The rendez-vous point
136  *  \param comm The communication synchro
137  */
138 static XBT_INLINE void SIMIX_rdv_push(smx_rdv_t rdv, smx_synchro_t comm)
139 {
140   xbt_fifo_push(rdv->comm_fifo, comm);
141   comm->comm.rdv = rdv;
142 }
143
144 /**
145  *  \brief Removes a communication synchro from a rendez-vous point
146  *  \param rdv The rendez-vous point
147  *  \param comm The communication synchro
148  */
149 XBT_INLINE void SIMIX_rdv_remove(smx_rdv_t rdv, smx_synchro_t comm)
150 {
151   xbt_fifo_remove(rdv->comm_fifo, comm);
152   comm->comm.rdv = NULL;
153 }
154
155 /**
156  *  \brief Checks if there is a communication synchro queued in a fifo matching our needs
157  *  \param type The type of communication we are looking for (comm_send, comm_recv)
158  *  \return The communication synchro if found, NULL otherwise
159  */
160 smx_synchro_t SIMIX_fifo_get_comm(xbt_fifo_t fifo, e_smx_comm_type_t type,
161                                  int (*match_fun)(void *, void *,smx_synchro_t),
162                                  void *this_user_data, smx_synchro_t my_synchro)
163 {
164   smx_synchro_t synchro;
165   xbt_fifo_item_t item;
166   void* other_user_data = NULL;
167
168   xbt_fifo_foreach(fifo, item, synchro, smx_synchro_t) {
169     if (synchro->comm.type == SIMIX_COMM_SEND) {
170       other_user_data = synchro->comm.src_data;
171     } else if (synchro->comm.type == SIMIX_COMM_RECEIVE) {
172       other_user_data = synchro->comm.dst_data;
173     }
174     if (synchro->comm.type == type &&
175         (!match_fun              ||              match_fun(this_user_data,  other_user_data, synchro)) &&
176         (!synchro->comm.match_fun || synchro->comm.match_fun(other_user_data, this_user_data,  my_synchro))) {
177       XBT_DEBUG("Found a matching communication synchro %p", synchro);
178       xbt_fifo_remove_item(fifo, item);
179       xbt_fifo_free_item(item);
180       synchro->comm.refcount++;
181 #ifdef HAVE_MC
182       synchro->comm.rdv_cpy = synchro->comm.rdv;
183 #endif
184       synchro->comm.rdv = NULL;
185       return synchro;
186     }
187     XBT_DEBUG("Sorry, communication synchro %p does not match our needs:"
188               " its type is %d but we are looking for a comm of type %d (or maybe the filtering didn't match)",
189               synchro, (int)synchro->comm.type, (int)type);
190   }
191   XBT_DEBUG("No matching communication synchro found");
192   return NULL;
193 }
194
195
196 /**
197  *  \brief Checks if there is a communication synchro queued in a fifo matching our needs, but leave it there
198  *  \param type The type of communication we are looking for (comm_send, comm_recv)
199  *  \return The communication synchro if found, NULL otherwise
200  */
201 smx_synchro_t SIMIX_fifo_probe_comm(xbt_fifo_t fifo, e_smx_comm_type_t type,
202                                  int (*match_fun)(void *, void *,smx_synchro_t),
203                                  void *this_user_data, smx_synchro_t my_synchro)
204 {
205   smx_synchro_t synchro;
206   xbt_fifo_item_t item;
207   void* other_user_data = NULL;
208
209   xbt_fifo_foreach(fifo, item, synchro, smx_synchro_t) {
210     if (synchro->comm.type == SIMIX_COMM_SEND) {
211       other_user_data = synchro->comm.src_data;
212     } else if (synchro->comm.type == SIMIX_COMM_RECEIVE) {
213       other_user_data = synchro->comm.dst_data;
214     }
215     if (synchro->comm.type == type &&
216         (!match_fun              ||              match_fun(this_user_data,  other_user_data, synchro)) &&
217         (!synchro->comm.match_fun || synchro->comm.match_fun(other_user_data, this_user_data,  my_synchro))) {
218       XBT_DEBUG("Found a matching communication synchro %p", synchro);
219       synchro->comm.refcount++;
220
221       return synchro;
222     }
223     XBT_DEBUG("Sorry, communication synchro %p does not match our needs:"
224               " its type is %d but we are looking for a comm of type %d (or maybe the filtering didn't match)",
225               synchro, (int)synchro->comm.type, (int)type);
226   }
227   XBT_DEBUG("No matching communication synchro found");
228   return NULL;
229 }
230 /******************************************************************************/
231 /*                          Communication synchros                            */
232 /******************************************************************************/
233
234 /**
235  *  \brief Creates a new communicate synchro
236  *  \param type The direction of communication (comm_send, comm_recv)
237  *  \return The new communicate synchro
238  */
239 smx_synchro_t SIMIX_comm_new(e_smx_comm_type_t type)
240 {
241   smx_synchro_t synchro;
242
243   /* alloc structures */
244   synchro = xbt_mallocator_get(simix_global->synchro_mallocator);
245
246   synchro->type = SIMIX_SYNC_COMMUNICATE;
247   synchro->state = SIMIX_WAITING;
248
249   /* set communication */
250   synchro->comm.type = type;
251   synchro->comm.refcount = 1;
252   synchro->comm.src_data=NULL;
253   synchro->comm.dst_data=NULL;
254
255
256 #ifdef HAVE_LATENCY_BOUND_TRACKING
257   //initialize with unknown value
258   synchro->latency_limited = -1;
259 #endif
260
261 #ifdef HAVE_TRACING
262   synchro->category = NULL;
263 #endif
264
265   XBT_DEBUG("Create communicate synchro %p", synchro);
266   ++smx_total_comms;
267
268   return synchro;
269 }
270
271 /**
272  *  \brief Destroy a communicate synchro
273  *  \param synchro The communicate synchro to be destroyed
274  */
275 void SIMIX_comm_destroy(smx_synchro_t synchro)
276 {
277   XBT_DEBUG("Destroy synchro %p (refcount: %d), state: %d",
278             synchro, synchro->comm.refcount, (int)synchro->state);
279
280   if (synchro->comm.refcount <= 0) {
281     xbt_backtrace_display_current();
282     xbt_die("The refcount of comm %p is already 0 before decreasing it. "
283             "That's a bug! If you didn't test and/or wait the same communication twice in your code, then the bug is SimGrid's...", synchro);
284   }
285   synchro->comm.refcount--;
286   if (synchro->comm.refcount > 0)
287       return;
288   XBT_DEBUG("Really free communication %p; refcount is now %d", synchro,
289       synchro->comm.refcount);
290
291 #ifdef HAVE_LATENCY_BOUND_TRACKING
292   synchro->latency_limited = SIMIX_comm_is_latency_bounded( synchro ) ;
293 #endif
294
295   xbt_free(synchro->name);
296   SIMIX_comm_destroy_internal_actions(synchro);
297
298   if (synchro->comm.detached && synchro->state != SIMIX_DONE) {
299     /* the communication has failed and was detached:
300      * we have to free the buffer */
301     if (synchro->comm.clean_fun) {
302       synchro->comm.clean_fun(synchro->comm.src_buff);
303     }
304     synchro->comm.src_buff = NULL;
305   }
306
307   if(synchro->comm.rdv)
308     SIMIX_rdv_remove(synchro->comm.rdv, synchro);
309
310   xbt_mallocator_release(simix_global->synchro_mallocator, synchro);
311 }
312
313 void SIMIX_comm_destroy_internal_actions(smx_synchro_t synchro)
314 {
315   if (synchro->comm.surf_comm){
316 #ifdef HAVE_LATENCY_BOUND_TRACKING
317     synchro->latency_limited = SIMIX_comm_is_latency_bounded(synchro);
318 #endif
319     surf_action_unref(synchro->comm.surf_comm);
320     synchro->comm.surf_comm = NULL;
321   }
322
323   if (synchro->comm.src_timeout){
324     surf_action_unref(synchro->comm.src_timeout);
325     synchro->comm.src_timeout = NULL;
326   }
327
328   if (synchro->comm.dst_timeout){
329     surf_action_unref(synchro->comm.dst_timeout);
330     synchro->comm.dst_timeout = NULL;
331   }
332 }
333
334 void simcall_HANDLER_comm_send(smx_simcall_t simcall, smx_process_t src, smx_rdv_t rdv,
335                                   double task_size, double rate,
336                                   void *src_buff, size_t src_buff_size,
337                                   int (*match_fun)(void *, void *,smx_synchro_t),
338                                   void (*copy_data_fun)(smx_synchro_t, void*, size_t),
339                                   void *data, double timeout){
340   smx_synchro_t comm = SIMIX_comm_isend(src, rdv, task_size, rate,
341                                        src_buff, src_buff_size, match_fun, NULL, copy_data_fun,
342                                        data, 0);
343   SIMCALL_SET_MC_VALUE(simcall, 0);
344   simcall_HANDLER_comm_wait(simcall, comm, timeout);
345 }
346 smx_synchro_t simcall_HANDLER_comm_isend(smx_simcall_t simcall, smx_process_t src, smx_rdv_t rdv,
347                                   double task_size, double rate,
348                                   void *src_buff, size_t src_buff_size,
349                                   int (*match_fun)(void *, void *,smx_synchro_t),
350                                   void (*clean_fun)(void *),
351                                   void (*copy_data_fun)(smx_synchro_t, void*, size_t),
352                                   void *data, int detached){
353   return SIMIX_comm_isend(src, rdv, task_size, rate, src_buff,
354                           src_buff_size, match_fun, clean_fun, copy_data_fun, data, detached);
355
356 }
357 smx_synchro_t SIMIX_comm_isend(smx_process_t src_proc, smx_rdv_t rdv,
358                               double task_size, double rate,
359                               void *src_buff, size_t src_buff_size,
360                               int (*match_fun)(void *, void *,smx_synchro_t),
361                               void (*clean_fun)(void *), // used to free the synchro in case of problem after a detached send
362                               void (*copy_data_fun)(smx_synchro_t, void*, size_t), // used to copy data if not default one
363                               void *data,
364                               int detached)
365 {
366   XBT_DEBUG("send from %p", rdv);
367
368   /* Prepare a synchro describing us, so that it gets passed to the user-provided filter of other side */
369   smx_synchro_t this_synchro = SIMIX_comm_new(SIMIX_COMM_SEND);
370
371   /* Look for communication synchro matching our needs. We also provide a description of
372    * ourself so that the other side also gets a chance of choosing if it wants to match with us.
373    *
374    * If it is not found then push our communication into the rendez-vous point */
375   smx_synchro_t other_synchro = SIMIX_fifo_get_comm(rdv->comm_fifo, SIMIX_COMM_RECEIVE, match_fun, data, this_synchro);
376
377   if (!other_synchro) {
378     other_synchro = this_synchro;
379
380     if (rdv->permanent_receiver!=NULL){
381       //this mailbox is for small messages, which have to be sent right now
382       other_synchro->state = SIMIX_READY;
383       other_synchro->comm.dst_proc=rdv->permanent_receiver;
384       other_synchro->comm.refcount++;
385       xbt_fifo_push(rdv->done_comm_fifo,other_synchro);
386       other_synchro->comm.rdv=rdv;
387       XBT_DEBUG("pushing a message into the permanent receive fifo %p, comm %p", rdv, &(other_synchro->comm));
388
389     }else{
390       SIMIX_rdv_push(rdv, this_synchro);
391     }
392   } else {
393     XBT_DEBUG("Receive already pushed");
394
395     SIMIX_comm_destroy(this_synchro);
396     --smx_total_comms; // this creation was a pure waste
397
398     other_synchro->state = SIMIX_READY;
399     other_synchro->comm.type = SIMIX_COMM_READY;
400
401   }
402   xbt_fifo_push(src_proc->comms, other_synchro);
403
404   /* if the communication synchro is detached then decrease the refcount
405    * by one, so it will be eliminated by the receiver's destroy call */
406   if (detached) {
407     other_synchro->comm.detached = 1;
408     other_synchro->comm.refcount--;
409     other_synchro->comm.clean_fun = clean_fun;
410   } else {
411     other_synchro->comm.clean_fun = NULL;
412   }
413
414   /* Setup the communication synchro */
415   other_synchro->comm.src_proc = src_proc;
416   other_synchro->comm.task_size = task_size;
417   other_synchro->comm.rate = rate;
418   other_synchro->comm.src_buff = src_buff;
419   other_synchro->comm.src_buff_size = src_buff_size;
420   other_synchro->comm.src_data = data;
421
422   other_synchro->comm.match_fun = match_fun;
423   other_synchro->comm.copy_data_fun = copy_data_fun;
424
425
426   if (MC_is_active()) {
427     other_synchro->state = SIMIX_RUNNING;
428     return (detached ? NULL : other_synchro);
429   }
430
431   SIMIX_comm_start(other_synchro);
432   return (detached ? NULL : other_synchro);
433 }
434
435 void simcall_HANDLER_comm_recv(smx_simcall_t simcall, smx_rdv_t rdv,
436                          void *dst_buff, size_t *dst_buff_size,
437                          int (*match_fun)(void *, void *, smx_synchro_t),
438                          void (*copy_data_fun)(smx_synchro_t, void*, size_t),
439                          void *data, double timeout, double rate)
440 {
441   smx_synchro_t comm = SIMIX_comm_irecv(simcall->issuer, rdv, dst_buff,
442                                        dst_buff_size, match_fun, copy_data_fun, data, rate);
443   SIMCALL_SET_MC_VALUE(simcall, 0);
444   simcall_HANDLER_comm_wait(simcall, comm, timeout);
445 }
446
447 smx_synchro_t simcall_HANDLER_comm_irecv(smx_simcall_t simcall, smx_rdv_t rdv,
448                                   void *dst_buff, size_t *dst_buff_size,
449                                   int (*match_fun)(void *, void *, smx_synchro_t),
450                                   void (*copy_data_fun)(smx_synchro_t, void*, size_t),
451                                   void *data, double rate)
452 {
453   return SIMIX_comm_irecv(simcall->issuer, rdv, dst_buff, dst_buff_size,
454                           match_fun, copy_data_fun, data, rate);
455 }
456
457 smx_synchro_t SIMIX_comm_irecv(smx_process_t dst_proc, smx_rdv_t rdv,
458                               void *dst_buff, size_t *dst_buff_size,
459                               int (*match_fun)(void *, void *, smx_synchro_t),
460                               void (*copy_data_fun)(smx_synchro_t, void*, size_t), // used to copy data if not default one
461                               void *data, double rate)
462 {
463   XBT_DEBUG("recv from %p %p", rdv, rdv->comm_fifo);
464   smx_synchro_t this_synchro = SIMIX_comm_new(SIMIX_COMM_RECEIVE);
465
466   smx_synchro_t other_synchro;
467   //communication already done, get it inside the fifo of completed comms
468   //permanent receive v1
469   //int already_received=0;
470   if(rdv->permanent_receiver && xbt_fifo_size(rdv->done_comm_fifo)!=0){
471
472     XBT_DEBUG("We have a comm that has probably already been received, trying to match it, to skip the communication");
473     //find a match in the already received fifo
474     other_synchro = SIMIX_fifo_get_comm(rdv->done_comm_fifo, SIMIX_COMM_SEND, match_fun, data, this_synchro);
475     //if not found, assume the receiver came first, register it to the mailbox in the classical way
476     if (!other_synchro)  {
477       XBT_DEBUG("We have messages in the permanent receive list, but not the one we are looking for, pushing request into fifo");
478       other_synchro = this_synchro;
479       SIMIX_rdv_push(rdv, this_synchro);
480     }else{
481       if(other_synchro->comm.surf_comm &&       SIMIX_comm_get_remains(other_synchro)==0.0)
482       {
483         XBT_DEBUG("comm %p has been already sent, and is finished, destroy it",&(other_synchro->comm));
484         other_synchro->state = SIMIX_DONE;
485         other_synchro->comm.type = SIMIX_COMM_DONE;
486         other_synchro->comm.rdv = NULL;
487       }/*else{
488          XBT_DEBUG("Not yet finished, we have to wait %d", xbt_fifo_size(rdv->comm_fifo));
489          }*/
490       other_synchro->comm.refcount--;
491       SIMIX_comm_destroy(this_synchro);
492       --smx_total_comms; // this creation was a pure waste
493     }
494   }else{
495     /* Prepare a synchro describing us, so that it gets passed to the user-provided filter of other side */
496
497     /* Look for communication synchro matching our needs. We also provide a description of
498      * ourself so that the other side also gets a chance of choosing if it wants to match with us.
499      *
500      * If it is not found then push our communication into the rendez-vous point */
501     other_synchro = SIMIX_fifo_get_comm(rdv->comm_fifo, SIMIX_COMM_SEND, match_fun, data, this_synchro);
502
503     if (!other_synchro) {
504       XBT_DEBUG("Receive pushed first %d", xbt_fifo_size(rdv->comm_fifo));
505       other_synchro = this_synchro;
506       SIMIX_rdv_push(rdv, this_synchro);
507     } else {
508       SIMIX_comm_destroy(this_synchro);
509       --smx_total_comms; // this creation was a pure waste
510       other_synchro->state = SIMIX_READY;
511       other_synchro->comm.type = SIMIX_COMM_READY;
512       //other_synchro->comm.refcount--;
513     }
514     xbt_fifo_push(dst_proc->comms, other_synchro);
515   }
516
517   /* Setup communication synchro */
518   other_synchro->comm.dst_proc = dst_proc;
519   other_synchro->comm.dst_buff = dst_buff;
520   other_synchro->comm.dst_buff_size = dst_buff_size;
521   other_synchro->comm.dst_data = data;
522
523   if (rate != -1.0 &&
524       (other_synchro->comm.rate == -1.0 || rate < other_synchro->comm.rate))
525     other_synchro->comm.rate = rate;
526
527   other_synchro->comm.match_fun = match_fun;
528   other_synchro->comm.copy_data_fun = copy_data_fun;
529
530
531   /*if(already_received)//do the actual copy, because the first one after the comm didn't have all the info
532     SIMIX_comm_copy_data(other_synchro);*/
533
534
535   if (MC_is_active()) {
536     other_synchro->state = SIMIX_RUNNING;
537     return other_synchro;
538   }
539
540   SIMIX_comm_start(other_synchro);
541   // }
542   return other_synchro;
543 }
544
545 smx_synchro_t simcall_HANDLER_comm_iprobe(smx_simcall_t simcall, smx_rdv_t rdv,
546                                    int type, int src, int tag,
547                                    int (*match_fun)(void *, void *, smx_synchro_t),
548                                    void *data){
549   return SIMIX_comm_iprobe(simcall->issuer, rdv, type, src, tag, match_fun, data);
550 }
551
552 smx_synchro_t SIMIX_comm_iprobe(smx_process_t dst_proc, smx_rdv_t rdv, int type, int src,
553                               int tag, int (*match_fun)(void *, void *, smx_synchro_t), void *data)
554 {
555   XBT_DEBUG("iprobe from %p %p", rdv, rdv->comm_fifo);
556   smx_synchro_t this_synchro;
557   int smx_type;
558   if(type == 1){
559     this_synchro=SIMIX_comm_new(SIMIX_COMM_SEND);
560     smx_type = SIMIX_COMM_RECEIVE;
561   } else{
562     this_synchro=SIMIX_comm_new(SIMIX_COMM_RECEIVE);
563     smx_type = SIMIX_COMM_SEND;
564   } 
565   smx_synchro_t other_synchro=NULL;
566   if(rdv->permanent_receiver && xbt_fifo_size(rdv->done_comm_fifo)!=0){
567     //find a match in the already received fifo
568       XBT_DEBUG("first try in the perm recv mailbox");
569
570     other_synchro = SIMIX_fifo_probe_comm(rdv->done_comm_fifo, smx_type, match_fun, data, this_synchro);
571   }
572  // }else{
573     if(!other_synchro){
574         XBT_DEBUG("try in the normal mailbox");
575         other_synchro = SIMIX_fifo_probe_comm(rdv->comm_fifo, smx_type, match_fun, data, this_synchro);
576     }
577 //  }
578   if(other_synchro)other_synchro->comm.refcount--;
579
580   SIMIX_comm_destroy(this_synchro);
581   --smx_total_comms;
582   return other_synchro;
583 }
584
585 void simcall_HANDLER_comm_wait(smx_simcall_t simcall, smx_synchro_t synchro, double timeout)
586 {
587   /* the simcall may be a wait, a send or a recv */
588   surf_action_t sleep;
589
590   /* Associate this simcall to the wait synchro */
591   XBT_DEBUG("simcall_HANDLER_comm_wait, %p", synchro);
592
593   xbt_fifo_push(synchro->simcalls, simcall);
594   simcall->issuer->waiting_synchro = synchro;
595
596   if (MC_is_active()) {
597     int idx = SIMCALL_GET_MC_VALUE(simcall);
598     if (idx == 0) {
599       synchro->state = SIMIX_DONE;
600     } else {
601       /* If we reached this point, the wait simcall must have a timeout */
602       /* Otherwise it shouldn't be enabled and executed by the MC */
603       if (timeout == -1)
604         THROW_IMPOSSIBLE;
605
606       if (synchro->comm.src_proc == simcall->issuer)
607         synchro->state = SIMIX_SRC_TIMEOUT;
608       else
609         synchro->state = SIMIX_DST_TIMEOUT;
610     }
611
612     SIMIX_comm_finish(synchro);
613     return;
614   }
615
616   /* If the synchro has already finish perform the error handling, */
617   /* otherwise set up a waiting timeout on the right side          */
618   if (synchro->state != SIMIX_WAITING && synchro->state != SIMIX_RUNNING) {
619     SIMIX_comm_finish(synchro);
620   } else { /* if (timeout >= 0) { we need a surf sleep action even when there is no timeout, otherwise surf won't tell us when the host fails */
621     sleep = surf_workstation_sleep(simcall->issuer->smx_host, timeout);
622     surf_action_set_data(sleep, synchro);
623
624     if (simcall->issuer == synchro->comm.src_proc)
625       synchro->comm.src_timeout = sleep;
626     else
627       synchro->comm.dst_timeout = sleep;
628   }
629 }
630
631 void simcall_HANDLER_comm_test(smx_simcall_t simcall, smx_synchro_t synchro)
632 {
633   if(MC_is_active()){
634     simcall_comm_test__set__result(simcall, synchro->comm.src_proc && synchro->comm.dst_proc);
635     if(simcall_comm_test__get__result(simcall)){
636       synchro->state = SIMIX_DONE;
637       xbt_fifo_push(synchro->simcalls, simcall);
638       SIMIX_comm_finish(synchro);
639     }else{
640       SIMIX_simcall_answer(simcall);
641     }
642     return;
643   }
644
645   simcall_comm_test__set__result(simcall, (synchro->state != SIMIX_WAITING && synchro->state != SIMIX_RUNNING));
646   if (simcall_comm_test__get__result(simcall)) {
647     xbt_fifo_push(synchro->simcalls, simcall);
648     SIMIX_comm_finish(synchro);
649   } else {
650     SIMIX_simcall_answer(simcall);
651   }
652 }
653
654 void simcall_HANDLER_comm_testany(smx_simcall_t simcall, xbt_dynar_t synchros)
655 {
656   unsigned int cursor;
657   smx_synchro_t synchro;
658   simcall_comm_testany__set__result(simcall, -1);
659
660   if (MC_is_active()){
661     int idx = SIMCALL_GET_MC_VALUE(simcall);
662     if(idx == -1){
663       SIMIX_simcall_answer(simcall);
664     }else{
665       synchro = xbt_dynar_get_as(synchros, idx, smx_synchro_t);
666       simcall_comm_testany__set__result(simcall, idx);
667       xbt_fifo_push(synchro->simcalls, simcall);
668       synchro->state = SIMIX_DONE;
669       SIMIX_comm_finish(synchro);
670     }
671     return;
672   }
673
674   xbt_dynar_foreach(simcall_comm_testany__get__comms(simcall), cursor,synchro) {
675     if (synchro->state != SIMIX_WAITING && synchro->state != SIMIX_RUNNING) {
676       simcall_comm_testany__set__result(simcall, cursor);
677       xbt_fifo_push(synchro->simcalls, simcall);
678       SIMIX_comm_finish(synchro);
679       return;
680     }
681   }
682   SIMIX_simcall_answer(simcall);
683 }
684
685 void simcall_HANDLER_comm_waitany(smx_simcall_t simcall, xbt_dynar_t synchros)
686 {
687   smx_synchro_t synchro;
688   unsigned int cursor = 0;
689
690   if (MC_is_active()){
691     int idx = SIMCALL_GET_MC_VALUE(simcall);
692     synchro = xbt_dynar_get_as(synchros, idx, smx_synchro_t);
693     xbt_fifo_push(synchro->simcalls, simcall);
694     simcall_comm_waitany__set__result(simcall, idx);
695     synchro->state = SIMIX_DONE;
696     SIMIX_comm_finish(synchro);
697     return;
698   }
699
700   xbt_dynar_foreach(synchros, cursor, synchro){
701     /* associate this simcall to the the synchro */
702     xbt_fifo_push(synchro->simcalls, simcall);
703
704     /* see if the synchro is already finished */
705     if (synchro->state != SIMIX_WAITING && synchro->state != SIMIX_RUNNING){
706       SIMIX_comm_finish(synchro);
707       break;
708     }
709   }
710 }
711
712 void SIMIX_waitany_remove_simcall_from_actions(smx_simcall_t simcall)
713 {
714   smx_synchro_t synchro;
715   unsigned int cursor = 0;
716   xbt_dynar_t synchros = simcall_comm_waitany__get__comms(simcall);
717
718   xbt_dynar_foreach(synchros, cursor, synchro) {
719     xbt_fifo_remove(synchro->simcalls, simcall);
720   }
721 }
722
723 /**
724  *  \brief Starts the simulation of a communication synchro.
725  *  \param synchro the communication synchro
726  */
727 static XBT_INLINE void SIMIX_comm_start(smx_synchro_t synchro)
728 {
729   /* If both the sender and the receiver are already there, start the communication */
730   if (synchro->state == SIMIX_READY) {
731
732     smx_host_t sender = synchro->comm.src_proc->smx_host;
733     smx_host_t receiver = synchro->comm.dst_proc->smx_host;
734
735     XBT_DEBUG("Starting communication %p from '%s' to '%s'", synchro,
736               SIMIX_host_get_name(sender), SIMIX_host_get_name(receiver));
737
738     synchro->comm.surf_comm = surf_workstation_model_communicate(surf_workstation_model,
739                                                                     sender, receiver,
740                                                                     synchro->comm.task_size, synchro->comm.rate);
741
742     surf_action_set_data(synchro->comm.surf_comm, synchro);
743
744     synchro->state = SIMIX_RUNNING;
745
746     /* If a link is failed, detect it immediately */
747     if (surf_action_get_state(synchro->comm.surf_comm) == SURF_ACTION_FAILED) {
748       XBT_DEBUG("Communication from '%s' to '%s' failed to start because of a link failure",
749                 SIMIX_host_get_name(sender), SIMIX_host_get_name(receiver));
750       synchro->state = SIMIX_LINK_FAILURE;
751       SIMIX_comm_destroy_internal_actions(synchro);
752     }
753
754     /* If any of the process is suspend, create the synchro but stop its execution,
755        it will be restarted when the sender process resume */
756     if (SIMIX_process_is_suspended(synchro->comm.src_proc) ||
757         SIMIX_process_is_suspended(synchro->comm.dst_proc)) {
758       /* FIXME: check what should happen with the synchro state */
759
760       if (SIMIX_process_is_suspended(synchro->comm.src_proc))
761         XBT_DEBUG("The communication is suspended on startup because src (%s:%s) were suspended since it initiated the communication",
762                   SIMIX_host_get_name(synchro->comm.src_proc->smx_host), synchro->comm.src_proc->name);
763       else
764         XBT_DEBUG("The communication is suspended on startup because dst (%s:%s) were suspended since it initiated the communication",
765                   SIMIX_host_get_name(synchro->comm.dst_proc->smx_host), synchro->comm.dst_proc->name);
766
767       surf_action_suspend(synchro->comm.surf_comm);
768
769     }
770   }
771 }
772
773 /**
774  * \brief Answers the SIMIX simcalls associated to a communication synchro.
775  * \param synchro a finished communication synchro
776  */
777 void SIMIX_comm_finish(smx_synchro_t synchro)
778 {
779   unsigned int destroy_count = 0;
780   smx_simcall_t simcall;
781
782
783   while ((simcall = xbt_fifo_shift(synchro->simcalls))) {
784
785     /* If a waitany simcall is waiting for this synchro to finish, then remove
786        it from the other synchros in the waitany list. Afterwards, get the
787        position of the actual synchro in the waitany dynar and
788        return it as the result of the simcall */
789
790     if (simcall->call == SIMCALL_NONE) //FIXME: maybe a better way to handle this case
791       continue; // if process handling comm is killed
792     if (simcall->call == SIMCALL_COMM_WAITANY) {
793       SIMIX_waitany_remove_simcall_from_actions(simcall);
794       if (!MC_is_active())
795         simcall_comm_waitany__set__result(simcall, xbt_dynar_search(simcall_comm_waitany__get__comms(simcall), &synchro));
796     }
797
798     /* If the synchro is still in a rendez-vous point then remove from it */
799     if (synchro->comm.rdv)
800       SIMIX_rdv_remove(synchro->comm.rdv, synchro);
801
802     XBT_DEBUG("SIMIX_comm_finish: synchro state = %d", (int)synchro->state);
803
804     /* Check out for errors */
805     switch (synchro->state) {
806
807     case SIMIX_DONE:
808       XBT_DEBUG("Communication %p complete!", synchro);
809       SIMIX_comm_copy_data(synchro);
810       break;
811
812     case SIMIX_SRC_TIMEOUT:
813       SMX_EXCEPTION(simcall->issuer, timeout_error, 0,
814                     "Communication timeouted because of sender");
815       break;
816
817     case SIMIX_DST_TIMEOUT:
818       SMX_EXCEPTION(simcall->issuer, timeout_error, 0,
819                     "Communication timeouted because of receiver");
820       break;
821
822     case SIMIX_SRC_HOST_FAILURE:
823       if (simcall->issuer == synchro->comm.src_proc)
824         simcall->issuer->context->iwannadie = 1;
825 //          SMX_EXCEPTION(simcall->issuer, host_error, 0, "Host failed");
826       else
827         SMX_EXCEPTION(simcall->issuer, network_error, 0, "Remote peer failed");
828       break;
829
830     case SIMIX_DST_HOST_FAILURE:
831       if (simcall->issuer == synchro->comm.dst_proc)
832         simcall->issuer->context->iwannadie = 1;
833 //          SMX_EXCEPTION(simcall->issuer, host_error, 0, "Host failed");
834       else
835         SMX_EXCEPTION(simcall->issuer, network_error, 0, "Remote peer failed");
836       break;
837
838     case SIMIX_LINK_FAILURE:
839       XBT_DEBUG("Link failure in synchro %p between '%s' and '%s': posting an exception to the issuer: %s (%p) detached:%d",
840                 synchro,
841                 synchro->comm.src_proc ? sg_host_name(synchro->comm.src_proc->smx_host) : NULL,
842                 synchro->comm.dst_proc ? sg_host_name(synchro->comm.dst_proc->smx_host) : NULL,
843                 simcall->issuer->name, simcall->issuer, synchro->comm.detached);
844       if (synchro->comm.src_proc == simcall->issuer) {
845         XBT_DEBUG("I'm source");
846       } else if (synchro->comm.dst_proc == simcall->issuer) {
847         XBT_DEBUG("I'm dest");
848       } else {
849         XBT_DEBUG("I'm neither source nor dest");
850       }
851       SMX_EXCEPTION(simcall->issuer, network_error, 0, "Link failure");
852       break;
853
854     case SIMIX_CANCELED:
855       if (simcall->issuer == synchro->comm.dst_proc)
856         SMX_EXCEPTION(simcall->issuer, cancel_error, 0,
857                       "Communication canceled by the sender");
858       else
859         SMX_EXCEPTION(simcall->issuer, cancel_error, 0,
860                       "Communication canceled by the receiver");
861       break;
862
863     default:
864       xbt_die("Unexpected synchro state in SIMIX_comm_finish: %d", (int)synchro->state);
865     }
866
867     /* if there is an exception during a waitany or a testany, indicate the position of the failed communication */
868     if (simcall->issuer->doexception) {
869       if (simcall->call == SIMCALL_COMM_WAITANY) {
870         simcall->issuer->running_ctx->exception.value = xbt_dynar_search(simcall_comm_waitany__get__comms(simcall), &synchro);
871       }
872       else if (simcall->call == SIMCALL_COMM_TESTANY) {
873         simcall->issuer->running_ctx->exception.value = xbt_dynar_search(simcall_comm_testany__get__comms(simcall), &synchro);
874       }
875     }
876
877     if (surf_resource_get_state(surf_workstation_resource_priv(simcall->issuer->smx_host)) != SURF_RESOURCE_ON) {
878       simcall->issuer->context->iwannadie = 1;
879     }
880
881     simcall->issuer->waiting_synchro = NULL;
882     xbt_fifo_remove(simcall->issuer->comms, synchro);
883     if(synchro->comm.detached){
884       if(simcall->issuer == synchro->comm.src_proc){
885         if(synchro->comm.dst_proc)
886           xbt_fifo_remove(synchro->comm.dst_proc->comms, synchro);
887       }
888       if(simcall->issuer == synchro->comm.dst_proc){
889         if(synchro->comm.src_proc)
890           xbt_fifo_remove(synchro->comm.src_proc->comms, synchro);
891       }
892     }
893     SIMIX_simcall_answer(simcall);
894     destroy_count++;
895   }
896
897   while (destroy_count-- > 0)
898     SIMIX_comm_destroy(synchro);
899 }
900
901 /**
902  * \brief This function is called when a Surf communication synchro is finished.
903  * \param synchro the corresponding Simix communication
904  */
905 void SIMIX_post_comm(smx_synchro_t synchro)
906 {
907   /* Update synchro state */
908   if (synchro->comm.src_timeout &&
909       surf_action_get_state(synchro->comm.src_timeout) == SURF_ACTION_DONE)
910     synchro->state = SIMIX_SRC_TIMEOUT;
911   else if (synchro->comm.dst_timeout &&
912           surf_action_get_state(synchro->comm.dst_timeout) == SURF_ACTION_DONE)
913     synchro->state = SIMIX_DST_TIMEOUT;
914   else if (synchro->comm.src_timeout &&
915           surf_action_get_state(synchro->comm.src_timeout) == SURF_ACTION_FAILED)
916     synchro->state = SIMIX_SRC_HOST_FAILURE;
917   else if (synchro->comm.dst_timeout &&
918       surf_action_get_state(synchro->comm.dst_timeout) == SURF_ACTION_FAILED)
919     synchro->state = SIMIX_DST_HOST_FAILURE;
920   else if (synchro->comm.surf_comm &&
921           surf_action_get_state(synchro->comm.surf_comm) == SURF_ACTION_FAILED) {
922     XBT_DEBUG("Puta madre. Surf says that the link broke");
923     synchro->state = SIMIX_LINK_FAILURE;
924   } else
925     synchro->state = SIMIX_DONE;
926
927   XBT_DEBUG("SIMIX_post_comm: comm %p, state %d, src_proc %p, dst_proc %p, detached: %d",
928             synchro, (int)synchro->state, synchro->comm.src_proc, synchro->comm.dst_proc, synchro->comm.detached);
929
930   /* destroy the surf actions associated with the Simix communication */
931   SIMIX_comm_destroy_internal_actions(synchro);
932
933   /* if there are simcalls associated with the synchro, then answer them */
934   if (xbt_fifo_size(synchro->simcalls)) {
935     SIMIX_comm_finish(synchro);
936   }
937 }
938
939 void SIMIX_comm_cancel(smx_synchro_t synchro)
940 {
941   /* if the synchro is a waiting state means that it is still in a rdv */
942   /* so remove from it and delete it */
943   if (synchro->state == SIMIX_WAITING) {
944     SIMIX_rdv_remove(synchro->comm.rdv, synchro);
945     synchro->state = SIMIX_CANCELED;
946   }
947   else if (!MC_is_active() /* when running the MC there are no surf actions */
948            && (synchro->state == SIMIX_READY || synchro->state == SIMIX_RUNNING)) {
949
950     surf_action_cancel(synchro->comm.surf_comm);
951   }
952 }
953
954 void SIMIX_comm_suspend(smx_synchro_t synchro)
955 {
956   /*FIXME: shall we suspend also the timeout synchro? */
957   if (synchro->comm.surf_comm)
958     surf_action_suspend(synchro->comm.surf_comm);
959   /* in the other case, the action will be suspended on creation, in SIMIX_comm_start() */
960 }
961
962 void SIMIX_comm_resume(smx_synchro_t synchro)
963 {
964   /*FIXME: check what happen with the timeouts */
965   if (synchro->comm.surf_comm)
966     surf_action_resume(synchro->comm.surf_comm);
967   /* in the other case, the synchro were not really suspended yet, see SIMIX_comm_suspend() and SIMIX_comm_start() */
968 }
969
970
971 /************* synchro Getters **************/
972
973 /**
974  *  \brief get the amount remaining from the communication
975  *  \param synchro The communication
976  */
977 double SIMIX_comm_get_remains(smx_synchro_t synchro)
978 {
979   double remains;
980
981   if(!synchro){
982     return 0;
983   }
984
985   switch (synchro->state) {
986
987   case SIMIX_RUNNING:
988     remains = surf_action_get_remains(synchro->comm.surf_comm);
989     break;
990
991   case SIMIX_WAITING:
992   case SIMIX_READY:
993     remains = 0; /*FIXME: check what should be returned */
994     break;
995
996   default:
997     remains = 0; /*FIXME: is this correct? */
998     break;
999   }
1000   return remains;
1001 }
1002
1003 e_smx_state_t SIMIX_comm_get_state(smx_synchro_t synchro)
1004 {
1005   return synchro->state;
1006 }
1007
1008 /**
1009  *  \brief Return the user data associated to the sender of the communication
1010  *  \param synchro The communication
1011  *  \return the user data
1012  */
1013 void* SIMIX_comm_get_src_data(smx_synchro_t synchro)
1014 {
1015   return synchro->comm.src_data;
1016 }
1017
1018 /**
1019  *  \brief Return the user data associated to the receiver of the communication
1020  *  \param synchro The communication
1021  *  \return the user data
1022  */
1023 void* SIMIX_comm_get_dst_data(smx_synchro_t synchro)
1024 {
1025   return synchro->comm.dst_data;
1026 }
1027
1028 smx_process_t SIMIX_comm_get_src_proc(smx_synchro_t synchro)
1029 {
1030   return synchro->comm.src_proc;
1031 }
1032
1033 smx_process_t SIMIX_comm_get_dst_proc(smx_synchro_t synchro)
1034 {
1035   return synchro->comm.dst_proc;
1036 }
1037
1038 #ifdef HAVE_LATENCY_BOUND_TRACKING
1039 /**
1040  *  \brief verify if communication is latency bounded
1041  *  \param comm The communication
1042  */
1043 int SIMIX_comm_is_latency_bounded(smx_synchro_t synchro)
1044 {
1045   if(!synchro){
1046     return 0;
1047   }
1048   if (synchro->comm.surf_comm){
1049     XBT_DEBUG("Getting latency limited for surf_action (%p)", synchro->comm.surf_comm);
1050     synchro->latency_limited = surf_network_action_get_latency_limited(synchro->comm.surf_comm);
1051     XBT_DEBUG("synchro limited is %d", synchro->latency_limited);
1052   }
1053   return synchro->latency_limited;
1054 }
1055 #endif
1056
1057 /******************************************************************************/
1058 /*                    SIMIX_comm_copy_data callbacks                       */
1059 /******************************************************************************/
1060 static void (*SIMIX_comm_copy_data_callback) (smx_synchro_t, void*, size_t) =
1061   &SIMIX_comm_copy_pointer_callback;
1062
1063 void
1064 SIMIX_comm_set_copy_data_callback(void (*callback) (smx_synchro_t, void*, size_t))
1065 {
1066   SIMIX_comm_copy_data_callback = callback;
1067 }
1068
1069 void SIMIX_comm_copy_pointer_callback(smx_synchro_t comm, void* buff, size_t buff_size)
1070 {
1071   xbt_assert((buff_size == sizeof(void *)),
1072              "Cannot copy %zu bytes: must be sizeof(void*)", buff_size);
1073   *(void **) (comm->comm.dst_buff) = buff;
1074 }
1075
1076 void SIMIX_comm_copy_buffer_callback(smx_synchro_t comm, void* buff, size_t buff_size)
1077 {
1078   XBT_DEBUG("Copy the data over");
1079   memcpy(comm->comm.dst_buff, buff, buff_size);
1080   if (comm->comm.detached) { // if this is a detached send, the source buffer was duplicated by SMPI sender to make the original buffer available to the application ASAP
1081     xbt_free(buff);
1082     comm->comm.src_buff = NULL;
1083   }
1084 }
1085
1086
1087 /**
1088  *  \brief Copy the communication data from the sender's buffer to the receiver's one
1089  *  \param comm The communication
1090  */
1091 void SIMIX_comm_copy_data(smx_synchro_t comm)
1092 {
1093   size_t buff_size = comm->comm.src_buff_size;
1094   /* If there is no data to be copy then return */
1095   if (!comm->comm.src_buff || !comm->comm.dst_buff || comm->comm.copied)
1096     return;
1097
1098   XBT_DEBUG("Copying comm %p data from %s (%p) -> %s (%p) (%zu bytes)",
1099             comm,
1100             comm->comm.src_proc ? sg_host_name(comm->comm.src_proc->smx_host) : "a finished process",
1101             comm->comm.src_buff,
1102             comm->comm.dst_proc ? sg_host_name(comm->comm.dst_proc->smx_host) : "a finished process",
1103             comm->comm.dst_buff, buff_size);
1104
1105   /* Copy at most dst_buff_size bytes of the message to receiver's buffer */
1106   if (comm->comm.dst_buff_size)
1107     buff_size = MIN(buff_size, *(comm->comm.dst_buff_size));
1108
1109   /* Update the receiver's buffer size to the copied amount */
1110   if (comm->comm.dst_buff_size)
1111     *comm->comm.dst_buff_size = buff_size;
1112
1113   if (buff_size > 0){
1114       if(comm->comm.copy_data_fun)
1115         comm->comm.copy_data_fun (comm, comm->comm.src_buff, buff_size);
1116       else
1117         SIMIX_comm_copy_data_callback (comm, comm->comm.src_buff, buff_size);
1118   }
1119
1120
1121   /* Set the copied flag so we copy data only once */
1122   /* (this function might be called from both communication ends) */
1123   comm->comm.copied = 1;
1124 }