Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
no \n in debug messages
[simgrid.git] / src / simix / smx_network.c
1 /* Copyright (c) 2009-2014. The SimGrid Team.
2  * All rights reserved.                                                     */
3
4 /* This program is free software; you can redistribute it and/or modify it
5  * under the terms of the license (GNU LGPL) which comes with this package. */
6
7 #include "smx_private.h"
8 #include "xbt/log.h"
9 #include "mc/mc.h"
10 #include "xbt/dict.h"
11 #include "smpi/private.h"
12
13
14 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(simix_network, simix,
15                                 "Logging specific to SIMIX (network)");
16
/* Dictionary of the named rendez-vous points, indexed by name. It is created
 * in SIMIX_network_init() with SIMIX_rdv_free() as its value destructor. */
static xbt_dict_t rdv_points = NULL;
/* Counter of communication actions: incremented by SIMIX_comm_new() and
 * decremented again by the callers that discard the action they just created. */
XBT_EXPORT_NO_IMPORT(unsigned long int) smx_total_comms = 0;

/* Forward declarations of the helpers that are private to this file */
static void SIMIX_waitany_remove_simcall_from_actions(smx_simcall_t simcall);
static void SIMIX_comm_copy_data(smx_action_t comm);
static smx_action_t SIMIX_comm_new(e_smx_comm_type_t type);
static XBT_INLINE void SIMIX_rdv_push(smx_rdv_t rdv, smx_action_t comm);
static smx_action_t SIMIX_fifo_probe_comm(xbt_fifo_t fifo, e_smx_comm_type_t type,
                                        int (*match_fun)(void *, void *,smx_action_t),
                                        void *user_data, smx_action_t my_action);
static smx_action_t SIMIX_fifo_get_comm(xbt_fifo_t fifo, e_smx_comm_type_t type,
                                        int (*match_fun)(void *, void *,smx_action_t),
                                        void *user_data, smx_action_t my_action);
static void SIMIX_rdv_free(void *data);
static void SIMIX_comm_start(smx_action_t action);
32
33 void SIMIX_network_init(void)
34 {
35   rdv_points = xbt_dict_new_homogeneous(SIMIX_rdv_free);
36 }
37
/**
 *  \brief Finalizes the network part of SIMIX
 *
 *  Releases the dictionary of named rendez-vous points that was allocated by
 *  SIMIX_network_init().
 */
void SIMIX_network_exit(void)
{
  /* the address of the global is passed, not a copy of the pointer */
  xbt_dict_free(&rdv_points);
}
42
43 /******************************************************************************/
44 /*                           Rendez-Vous Points                               */
45 /******************************************************************************/
46
47 smx_rdv_t SIMIX_pre_rdv_create(smx_simcall_t simcall, const char *name){
48   return SIMIX_rdv_create(name);
49 }
50 smx_rdv_t SIMIX_rdv_create(const char *name)
51 {
52   /* two processes may have pushed the same rdv_create simcall at the same time */
53   smx_rdv_t rdv = name ? xbt_dict_get_or_null(rdv_points, name) : NULL;
54
55   if (!rdv) {
56     rdv = xbt_new0(s_smx_rvpoint_t, 1);
57     rdv->name = name ? xbt_strdup(name) : NULL;
58     rdv->comm_fifo = xbt_fifo_new();
59     rdv->done_comm_fifo = xbt_fifo_new();
60     rdv->permanent_receiver=NULL;
61
62     XBT_DEBUG("Creating a mailbox at %p with name %s", rdv, name);
63
64     if (rdv->name)
65       xbt_dict_set(rdv_points, rdv->name, rdv, NULL);
66   }
67   return rdv;
68 }
69
70 void SIMIX_pre_rdv_destroy(smx_simcall_t simcall, smx_rdv_t rdv){
71   return SIMIX_rdv_destroy(rdv);
72 }
73 void SIMIX_rdv_destroy(smx_rdv_t rdv)
74 {
75   if (rdv->name)
76     xbt_dict_remove(rdv_points, rdv->name);
77 }
78
79 void SIMIX_rdv_free(void *data)
80 {
81   XBT_DEBUG("rdv free %p", data);
82   smx_rdv_t rdv = (smx_rdv_t) data;
83   xbt_free(rdv->name);
84   xbt_fifo_free(rdv->comm_fifo);
85   xbt_fifo_free(rdv->done_comm_fifo);
86
87   xbt_free(rdv);
88 }
89
90 xbt_dict_t SIMIX_get_rdv_points()
91 {
92   return rdv_points;
93 }
94
95 smx_rdv_t SIMIX_pre_rdv_get_by_name(smx_simcall_t simcall, const char *name){
96   return SIMIX_rdv_get_by_name(name);
97 }
98 smx_rdv_t SIMIX_rdv_get_by_name(const char *name)
99 {
100   return xbt_dict_get_or_null(rdv_points, name);
101 }
102
103 int SIMIX_pre_rdv_comm_count_by_host(smx_simcall_t simcall, smx_rdv_t rdv, smx_host_t host){
104   return SIMIX_rdv_comm_count_by_host(rdv, host);
105 }
106 int SIMIX_rdv_comm_count_by_host(smx_rdv_t rdv, smx_host_t host)
107 {
108   smx_action_t comm = NULL;
109   xbt_fifo_item_t item = NULL;
110   int count = 0;
111
112   xbt_fifo_foreach(rdv->comm_fifo, item, comm, smx_action_t) {
113     if (comm->comm.src_proc->smx_host == host)
114       count++;
115   }
116
117   return count;
118 }
119
120 smx_action_t SIMIX_pre_rdv_get_head(smx_simcall_t simcall, smx_rdv_t rdv){
121   return SIMIX_rdv_get_head(rdv);
122 }
123 smx_action_t SIMIX_rdv_get_head(smx_rdv_t rdv)
124 {
125   return xbt_fifo_get_item_content(xbt_fifo_get_first_item(rdv->comm_fifo));
126 }
127
128 smx_process_t SIMIX_pre_rdv_get_receiver(smx_simcall_t simcall, smx_rdv_t rdv){
129   return SIMIX_rdv_get_receiver(rdv);
130 }
131 /**
132  *  \brief get the receiver (process associated to the mailbox)
133  *  \param rdv The rendez-vous point
134  *  \return process The receiving process (NULL if not set)
135  */
136 smx_process_t SIMIX_rdv_get_receiver(smx_rdv_t rdv)
137 {
138   return rdv->permanent_receiver;
139 }
140
141 void SIMIX_pre_rdv_set_receiver(smx_simcall_t simcall, smx_rdv_t rdv,
142                             smx_process_t process){
143   SIMIX_rdv_set_receiver(rdv, process);
144 }
145 /**
146  *  \brief set the receiver of the rendez vous point to allow eager sends
147  *  \param rdv The rendez-vous point
148  *  \param process The receiving process
149  */
150 void SIMIX_rdv_set_receiver(smx_rdv_t rdv, smx_process_t process)
151 {
152   rdv->permanent_receiver=process;
153 }
154
155 /**
156  *  \brief Pushes a communication action into a rendez-vous point
157  *  \param rdv The rendez-vous point
158  *  \param comm The communication action
159  */
160 static XBT_INLINE void SIMIX_rdv_push(smx_rdv_t rdv, smx_action_t comm)
161 {
162   xbt_fifo_push(rdv->comm_fifo, comm);
163   comm->comm.rdv = rdv;
164 }
165
166 /**
167  *  \brief Removes a communication action from a rendez-vous point
168  *  \param rdv The rendez-vous point
169  *  \param comm The communication action
170  */
171 XBT_INLINE void SIMIX_rdv_remove(smx_rdv_t rdv, smx_action_t comm)
172 {
173   xbt_fifo_remove(rdv->comm_fifo, comm);
174   comm->comm.rdv = NULL;
175 }
176
177 /**
178  *  \brief Checks if there is a communication action queued in a fifo matching our needs
179  *  \param type The type of communication we are looking for (comm_send, comm_recv)
180  *  \return The communication action if found, NULL otherwise
181  */
182 smx_action_t SIMIX_fifo_get_comm(xbt_fifo_t fifo, e_smx_comm_type_t type,
183                                  int (*match_fun)(void *, void *,smx_action_t),
184                                  void *this_user_data, smx_action_t my_action)
185 {
186   smx_action_t action;
187   xbt_fifo_item_t item;
188   void* other_user_data = NULL;
189
190   xbt_fifo_foreach(fifo, item, action, smx_action_t) {
191     if (action->comm.type == SIMIX_COMM_SEND) {
192       other_user_data = action->comm.src_data;
193     } else if (action->comm.type == SIMIX_COMM_RECEIVE) {
194       other_user_data = action->comm.dst_data;
195     }
196     if (action->comm.type == type &&
197         (!match_fun              ||              match_fun(this_user_data,  other_user_data, action)) &&
198         (!action->comm.match_fun || action->comm.match_fun(other_user_data, this_user_data,  my_action))) {
199       XBT_DEBUG("Found a matching communication action %p", action);
200       xbt_fifo_remove_item(fifo, item);
201       xbt_fifo_free_item(item);
202       action->comm.refcount++;
203 #ifdef HAVE_MC
204       action->comm.rdv_cpy = action->comm.rdv;
205 #endif
206       action->comm.rdv = NULL;
207       return action;
208     }
209     XBT_DEBUG("Sorry, communication action %p does not match our needs:"
210               " its type is %d but we are looking for a comm of type %d (or maybe the filtering didn't match)",
211               action, (int)action->comm.type, (int)type);
212   }
213   XBT_DEBUG("No matching communication action found");
214   return NULL;
215 }
216
217
218 /**
219  *  \brief Checks if there is a communication action queued in a fifo matching our needs, but leave it there
220  *  \param type The type of communication we are looking for (comm_send, comm_recv)
221  *  \return The communication action if found, NULL otherwise
222  */
223 smx_action_t SIMIX_fifo_probe_comm(xbt_fifo_t fifo, e_smx_comm_type_t type,
224                                  int (*match_fun)(void *, void *,smx_action_t),
225                                  void *this_user_data, smx_action_t my_action)
226 {
227   smx_action_t action;
228   xbt_fifo_item_t item;
229   void* other_user_data = NULL;
230
231   xbt_fifo_foreach(fifo, item, action, smx_action_t) {
232     if (action->comm.type == SIMIX_COMM_SEND) {
233       other_user_data = action->comm.src_data;
234     } else if (action->comm.type == SIMIX_COMM_RECEIVE) {
235       other_user_data = action->comm.dst_data;
236     }
237     if (action->comm.type == type &&
238         (!match_fun              ||              match_fun(this_user_data,  other_user_data, action)) &&
239         (!action->comm.match_fun || action->comm.match_fun(other_user_data, this_user_data,  my_action))) {
240       XBT_DEBUG("Found a matching communication action %p", action);
241       action->comm.refcount++;
242
243       return action;
244     }
245     XBT_DEBUG("Sorry, communication action %p does not match our needs:"
246               " its type is %d but we are looking for a comm of type %d (or maybe the filtering didn't match)",
247               action, (int)action->comm.type, (int)type);
248   }
249   XBT_DEBUG("No matching communication action found");
250   return NULL;
251 }
252 /******************************************************************************/
253 /*                            Communication Actions                            */
254 /******************************************************************************/
255
256 /**
257  *  \brief Creates a new communicate action
258  *  \param type The direction of communication (comm_send, comm_recv)
259  *  \return The new communicate action
260  */
261 smx_action_t SIMIX_comm_new(e_smx_comm_type_t type)
262 {
263   smx_action_t act;
264
265   /* alloc structures */
266   act = xbt_mallocator_get(simix_global->action_mallocator);
267
268   act->type = SIMIX_ACTION_COMMUNICATE;
269   act->state = SIMIX_WAITING;
270
271   /* set communication */
272   act->comm.type = type;
273   act->comm.refcount = 1;
274   act->comm.src_data=NULL;
275   act->comm.dst_data=NULL;
276
277
278 #ifdef HAVE_LATENCY_BOUND_TRACKING
279   //initialize with unknown value
280   act->latency_limited = -1;
281 #endif
282
283 #ifdef HAVE_TRACING
284   act->category = NULL;
285 #endif
286
287   XBT_DEBUG("Create communicate action %p", act);
288   ++smx_total_comms;
289
290   return act;
291 }
292
293 /**
294  *  \brief Destroy a communicate action
295  *  \param action The communicate action to be destroyed
296  */
297 void SIMIX_comm_destroy(smx_action_t action)
298 {
299   XBT_DEBUG("Destroy action %p (refcount: %d), state: %d",
300             action, action->comm.refcount, (int)action->state);
301
302   if (action->comm.refcount <= 0) {
303     xbt_backtrace_display_current();
304     xbt_die("The refcount of comm %p is already 0 before decreasing it. "
305             "That's a bug! If you didn't test and/or wait the same communication twice in your code, then the bug is SimGrid's...", action);
306   }
307   action->comm.refcount--;
308   if (action->comm.refcount > 0)
309       return;
310   XBT_DEBUG("Really free communication %p; refcount is now %d", action,
311             action->comm.refcount);
312
313 #ifdef HAVE_LATENCY_BOUND_TRACKING
314   action->latency_limited = SIMIX_comm_is_latency_bounded( action ) ;
315 #endif
316
317   xbt_free(action->name);
318   SIMIX_comm_destroy_internal_actions(action);
319
320   if (action->comm.detached && action->state != SIMIX_DONE) {
321     /* the communication has failed and was detached:
322      * we have to free the buffer */
323     if (action->comm.clean_fun) {
324       action->comm.clean_fun(action->comm.src_buff);
325     }
326     action->comm.src_buff = NULL;
327   }
328
329   if(action->comm.rdv)
330     SIMIX_rdv_remove(action->comm.rdv, action);
331
332   xbt_mallocator_release(simix_global->action_mallocator, action);
333 }
334
335 void SIMIX_comm_destroy_internal_actions(smx_action_t action)
336 {
337   if (action->comm.surf_comm){
338 #ifdef HAVE_LATENCY_BOUND_TRACKING
339     action->latency_limited = SIMIX_comm_is_latency_bounded(action);
340 #endif
341     surf_action_unref(action->comm.surf_comm);
342     action->comm.surf_comm = NULL;
343   }
344
345   if (action->comm.src_timeout){
346     surf_action_unref(action->comm.src_timeout);
347     action->comm.src_timeout = NULL;
348   }
349
350   if (action->comm.dst_timeout){
351     surf_action_unref(action->comm.dst_timeout);
352     action->comm.dst_timeout = NULL;
353   }
354 }
355
356 void SIMIX_pre_comm_send(smx_simcall_t simcall, smx_process_t src, smx_rdv_t rdv,
357                                   double task_size, double rate,
358                                   void *src_buff, size_t src_buff_size,
359                                   int (*match_fun)(void *, void *,smx_action_t),
360                                   void (*copy_data_fun)(smx_action_t, void*, size_t),
361                                   void *data, double timeout){
362   smx_action_t comm = SIMIX_comm_isend(src, rdv, task_size, rate,
363                                        src_buff, src_buff_size, match_fun, NULL, copy_data_fun,
364                                        data, 0);
365   SIMCALL_SET_MC_VALUE(simcall, 0);
366   SIMIX_pre_comm_wait(simcall, comm, timeout);
367 }
368 smx_action_t SIMIX_pre_comm_isend(smx_simcall_t simcall, smx_process_t src, smx_rdv_t rdv,
369                                   double task_size, double rate,
370                                   void *src_buff, size_t src_buff_size,
371                                   int (*match_fun)(void *, void *,smx_action_t),
372                                   void (*clean_fun)(void *),
373                                   void (*copy_data_fun)(smx_action_t, void*, size_t),
374                                   void *data, int detached){
375   return SIMIX_comm_isend(src, rdv, task_size, rate, src_buff,
376                           src_buff_size, match_fun, clean_fun, copy_data_fun, data, detached);
377
378 }
379 smx_action_t SIMIX_comm_isend(smx_process_t src_proc, smx_rdv_t rdv,
380                               double task_size, double rate,
381                               void *src_buff, size_t src_buff_size,
382                               int (*match_fun)(void *, void *,smx_action_t),
383                               void (*clean_fun)(void *), // used to free the action in case of problem after a detached send
384                               void (*copy_data_fun)(smx_action_t, void*, size_t), // used to copy data if not default one
385                               void *data,
386                               int detached)
387 {
388   XBT_DEBUG("send from %p", rdv);
389
390   /* Prepare an action describing us, so that it gets passed to the user-provided filter of other side */
391   smx_action_t this_action = SIMIX_comm_new(SIMIX_COMM_SEND);
392
393   /* Look for communication action matching our needs. We also provide a description of
394    * ourself so that the other side also gets a chance of choosing if it wants to match with us.
395    *
396    * If it is not found then push our communication into the rendez-vous point */
397   smx_action_t other_action = SIMIX_fifo_get_comm(rdv->comm_fifo, SIMIX_COMM_RECEIVE, match_fun, data, this_action);
398
399   if (!other_action) {
400     other_action = this_action;
401
402     if (rdv->permanent_receiver!=NULL){
403       //this mailbox is for small messages, which have to be sent right now
404       other_action->state = SIMIX_READY;
405       other_action->comm.dst_proc=rdv->permanent_receiver;
406       other_action->comm.refcount++;
407       xbt_fifo_push(rdv->done_comm_fifo,other_action);
408       other_action->comm.rdv=rdv;
409       XBT_DEBUG("pushing a message into the permanent receive fifo %p, comm %p", rdv, &(other_action->comm));
410
411     }else{
412       SIMIX_rdv_push(rdv, this_action);
413     }
414   } else {
415     XBT_DEBUG("Receive already pushed");
416
417     SIMIX_comm_destroy(this_action);
418     --smx_total_comms; // this creation was a pure waste
419
420     other_action->state = SIMIX_READY;
421     other_action->comm.type = SIMIX_COMM_READY;
422
423   }
424   xbt_fifo_push(src_proc->comms, other_action);
425
426   /* if the communication action is detached then decrease the refcount
427    * by one, so it will be eliminated by the receiver's destroy call */
428   if (detached) {
429     other_action->comm.detached = 1;
430     other_action->comm.refcount--;
431     other_action->comm.clean_fun = clean_fun;
432   } else {
433     other_action->comm.clean_fun = NULL;
434   }
435
436   /* Setup the communication action */
437   other_action->comm.src_proc = src_proc;
438   other_action->comm.task_size = task_size;
439   other_action->comm.rate = rate;
440   other_action->comm.src_buff = src_buff;
441   other_action->comm.src_buff_size = src_buff_size;
442   other_action->comm.src_data = data;
443
444   other_action->comm.match_fun = match_fun;
445   other_action->comm.copy_data_fun = copy_data_fun;
446
447
448   if (MC_is_active()) {
449     other_action->state = SIMIX_RUNNING;
450     return (detached ? NULL : other_action);
451   }
452
453   SIMIX_comm_start(other_action);
454   return (detached ? NULL : other_action);
455 }
456
457 void SIMIX_pre_comm_recv(smx_simcall_t simcall, smx_rdv_t rdv,
458                          void *dst_buff, size_t *dst_buff_size,
459                          int (*match_fun)(void *, void *, smx_action_t),
460                          void (*copy_data_fun)(smx_action_t, void*, size_t),
461                          void *data, double timeout, double rate)
462 {
463   smx_action_t comm = SIMIX_comm_irecv(simcall->issuer, rdv, dst_buff,
464                                        dst_buff_size, match_fun, copy_data_fun, data, rate);
465   SIMCALL_SET_MC_VALUE(simcall, 0);
466   SIMIX_pre_comm_wait(simcall, comm, timeout);
467 }
468
469 smx_action_t SIMIX_pre_comm_irecv(smx_simcall_t simcall, smx_rdv_t rdv,
470                                   void *dst_buff, size_t *dst_buff_size,
471                                   int (*match_fun)(void *, void *, smx_action_t),
472                                   void (*copy_data_fun)(smx_action_t, void*, size_t),
473                                   void *data, double rate)
474 {
475   return SIMIX_comm_irecv(simcall->issuer, rdv, dst_buff, dst_buff_size,
476                           match_fun, copy_data_fun, data, rate);
477 }
478
/**
 *  \brief Posts a non-blocking receive on a rendez-vous point
 *
 *  A receive action describing the caller is created, then a matching send is
 *  searched: in the done_comm_fifo when the rendez-vous point has a permanent
 *  receiver and that fifo is not empty, in the comm_fifo otherwise. If a send
 *  is found, the new action is discarded and the send becomes the
 *  communication; if not, the new action is queued in the comm_fifo.
 *
 *  \param dst_proc The receiving process
 *  \param rdv The rendez-vous point
 *  \param dst_buff The destination buffer
 *  \param dst_buff_size Pointer to the size of the destination buffer
 *  \param match_fun The filter of the receiver, or NULL
 *  \param copy_data_fun Used to copy the data if the default is not wanted
 *  \param data The user data of the receiver
 *  \param rate The rate wanted by the receiver, -1.0 meaning that it sets none
 *  \return The communication
 */
smx_action_t SIMIX_comm_irecv(smx_process_t dst_proc, smx_rdv_t rdv,
                              void *dst_buff, size_t *dst_buff_size,
                              int (*match_fun)(void *, void *, smx_action_t),
                              void (*copy_data_fun)(smx_action_t, void*, size_t), // used to copy data if not default one
                              void *data, double rate)
{
  XBT_DEBUG("recv from %p %p", rdv, rdv->comm_fifo);
  /* The action describing us; it is also what the filter of the other side sees */
  smx_action_t this_action = SIMIX_comm_new(SIMIX_COMM_RECEIVE);

  smx_action_t other_action;
  //communication already done, get it inside the fifo of completed comms
  //permanent receive v1
  //int already_received=0;
  if(rdv->permanent_receiver && xbt_fifo_size(rdv->done_comm_fifo)!=0){

    XBT_DEBUG("We have a comm that has probably already been received, trying to match it, to skip the communication");
    //find a match in the already received fifo
    other_action = SIMIX_fifo_get_comm(rdv->done_comm_fifo, SIMIX_COMM_SEND, match_fun, data, this_action);
    //if not found, assume the receiver came first, register it to the mailbox in the classical way
    if (!other_action)  {
      XBT_DEBUG("We have messages in the permanent receive list, but not the one we are looking for, pushing request into fifo");
      other_action = this_action;
      SIMIX_rdv_push(rdv, this_action);
    }else{
      /* the transfer was started by the sender and has nothing left to send */
      if(other_action->comm.surf_comm &&        SIMIX_comm_get_remains(other_action)==0.0)
      {
        XBT_DEBUG("comm %p has been already sent, and is finished, destroy it",&(other_action->comm));
        other_action->state = SIMIX_DONE;
        other_action->comm.type = SIMIX_COMM_DONE;
        other_action->comm.rdv = NULL;
      }/*else{
         XBT_DEBUG("Not yet finished, we have to wait %d", xbt_fifo_size(rdv->comm_fifo));
         }*/
      /* SIMIX_fifo_get_comm() incremented the refcount of the action it returned */
      other_action->comm.refcount--;
      SIMIX_comm_destroy(this_action);
      --smx_total_comms; // this creation was a pure waste
    }
    /* NOTE(review): nothing is pushed to dst_proc->comms in this branch,
     * unlike in the branch below -- confirm that this is intended */
  }else{
    /* Prepare an action describing us, so that it gets passed to the user-provided filter of other side */

    /* Look for communication action matching our needs. We also provide a description of
     * ourself so that the other side also gets a chance of choosing if it wants to match with us.
     *
     * If it is not found then push our communication into the rendez-vous point */
    other_action = SIMIX_fifo_get_comm(rdv->comm_fifo, SIMIX_COMM_SEND, match_fun, data, this_action);

    if (!other_action) {
      XBT_DEBUG("Receive pushed first %d", xbt_fifo_size(rdv->comm_fifo));
      other_action = this_action;
      SIMIX_rdv_push(rdv, this_action);
    } else {
      SIMIX_comm_destroy(this_action);
      --smx_total_comms; // this creation was a pure waste
      other_action->state = SIMIX_READY;
      other_action->comm.type = SIMIX_COMM_READY;
      //other_action->comm.refcount--;
    }
    xbt_fifo_push(dst_proc->comms, other_action);
  }

  /* Setup communication action */
  other_action->comm.dst_proc = dst_proc;
  other_action->comm.dst_buff = dst_buff;
  other_action->comm.dst_buff_size = dst_buff_size;
  other_action->comm.dst_data = data;

  /* The rate of the receiver is used when it is set and when the
   * communication has no rate yet or a greater one */
  if (rate != -1.0 &&
      (other_action->comm.rate == -1.0 || rate < other_action->comm.rate))
    other_action->comm.rate = rate;

  /* These two fields now hold the functions of the receiver */
  other_action->comm.match_fun = match_fun;
  other_action->comm.copy_data_fun = copy_data_fun;


  /*if(already_received)//do the actual copy, because the first one after the comm didn't have all the info
    SIMIX_comm_copy_data(other_action);*/


  /* With the model checker the action is only marked as running */
  if (MC_is_active()) {
    other_action->state = SIMIX_RUNNING;
    return other_action;
  }

  SIMIX_comm_start(other_action);
  // }
  return other_action;
}
566
567 smx_action_t SIMIX_pre_comm_iprobe(smx_simcall_t simcall, smx_rdv_t rdv,
568                                    int type, int src, int tag,
569                                    int (*match_fun)(void *, void *, smx_action_t),
570                                    void *data){
571   return SIMIX_comm_iprobe(simcall->issuer, rdv, type, src, tag, match_fun, data);
572 }
573
574 smx_action_t SIMIX_comm_iprobe(smx_process_t dst_proc, smx_rdv_t rdv, int type, int src,
575                               int tag, int (*match_fun)(void *, void *, smx_action_t), void *data)
576 {
577   XBT_DEBUG("iprobe from %p %p", rdv, rdv->comm_fifo);
578   smx_action_t this_action;
579   int smx_type;
580   if(type == 1){
581     this_action=SIMIX_comm_new(SIMIX_COMM_SEND);
582     smx_type = SIMIX_COMM_RECEIVE;
583   } else{
584     this_action=SIMIX_comm_new(SIMIX_COMM_RECEIVE);
585     smx_type = SIMIX_COMM_SEND;
586   } 
587   smx_action_t other_action=NULL;
588   if(rdv->permanent_receiver && xbt_fifo_size(rdv->done_comm_fifo)!=0){
589     //find a match in the already received fifo
590       XBT_DEBUG("first try in the perm recv mailbox");
591
592     other_action = SIMIX_fifo_probe_comm(rdv->done_comm_fifo, smx_type, match_fun, data, this_action);
593   }
594  // }else{
595     if(!other_action){
596         XBT_DEBUG("try in the normal mailbox");
597         other_action = SIMIX_fifo_probe_comm(rdv->comm_fifo, smx_type, match_fun, data, this_action);
598     }
599 //  }
600   if(other_action)other_action->comm.refcount--;
601
602   SIMIX_comm_destroy(this_action);
603   --smx_total_comms;
604   return other_action;
605 }
606
607 void SIMIX_pre_comm_wait(smx_simcall_t simcall, smx_action_t action, double timeout)
608 {
609   /* the simcall may be a wait, a send or a recv */
610   surf_action_t sleep;
611
612   /* Associate this simcall to the wait action */
613   XBT_DEBUG("SIMIX_pre_comm_wait, %p", action);
614
615   xbt_fifo_push(action->simcalls, simcall);
616   simcall->issuer->waiting_action = action;
617
618   if (MC_is_active()) {
619     int idx = SIMCALL_GET_MC_VALUE(simcall);
620     if (idx == 0) {
621       action->state = SIMIX_DONE;
622     } else {
623       /* If we reached this point, the wait simcall must have a timeout */
624       /* Otherwise it shouldn't be enabled and executed by the MC */
625       if (timeout == -1)
626         THROW_IMPOSSIBLE;
627
628       if (action->comm.src_proc == simcall->issuer)
629         action->state = SIMIX_SRC_TIMEOUT;
630       else
631         action->state = SIMIX_DST_TIMEOUT;
632     }
633
634     SIMIX_comm_finish(action);
635     return;
636   }
637
638   /* If the action has already finish perform the error handling, */
639   /* otherwise set up a waiting timeout on the right side         */
640   if (action->state != SIMIX_WAITING && action->state != SIMIX_RUNNING) {
641     SIMIX_comm_finish(action);
642   } else { /* if (timeout >= 0) { we need a surf sleep action even when there is no timeout, otherwise surf won't tell us when the host fails */
643     sleep = surf_workstation_sleep(simcall->issuer->smx_host, timeout);
644     surf_action_set_data(sleep, action);
645
646     if (simcall->issuer == action->comm.src_proc)
647       action->comm.src_timeout = sleep;
648     else
649       action->comm.dst_timeout = sleep;
650   }
651 }
652
653 void SIMIX_pre_comm_test(smx_simcall_t simcall, smx_action_t action)
654 {
655   if(MC_is_active()){
656     simcall_comm_test__set__result(simcall, action->comm.src_proc && action->comm.dst_proc);
657     if(simcall_comm_test__get__result(simcall)){
658       action->state = SIMIX_DONE;
659       xbt_fifo_push(action->simcalls, simcall);
660       SIMIX_comm_finish(action);
661     }else{
662       SIMIX_simcall_answer(simcall);
663     }
664     return;
665   }
666
667   simcall_comm_test__set__result(simcall, (action->state != SIMIX_WAITING && action->state != SIMIX_RUNNING));
668   if (simcall_comm_test__get__result(simcall)) {
669     xbt_fifo_push(action->simcalls, simcall);
670     SIMIX_comm_finish(action);
671   } else {
672     SIMIX_simcall_answer(simcall);
673   }
674 }
675
676 void SIMIX_pre_comm_testany(smx_simcall_t simcall, xbt_dynar_t actions)
677 {
678   unsigned int cursor;
679   smx_action_t action;
680   simcall_comm_testany__set__result(simcall, -1);
681
682   if (MC_is_active()){
683     int idx = SIMCALL_GET_MC_VALUE(simcall);
684     if(idx == -1){
685       SIMIX_simcall_answer(simcall);
686     }else{
687       action = xbt_dynar_get_as(actions, idx, smx_action_t);
688       simcall_comm_testany__set__result(simcall, idx);
689       xbt_fifo_push(action->simcalls, simcall);
690       action->state = SIMIX_DONE;
691       SIMIX_comm_finish(action);
692     }
693     return;
694   }
695
696   xbt_dynar_foreach(simcall_comm_testany__get__comms(simcall), cursor,action) {
697     if (action->state != SIMIX_WAITING && action->state != SIMIX_RUNNING) {
698       simcall_comm_testany__set__result(simcall, cursor);
699       xbt_fifo_push(action->simcalls, simcall);
700       SIMIX_comm_finish(action);
701       return;
702     }
703   }
704   SIMIX_simcall_answer(simcall);
705 }
706
707 void SIMIX_pre_comm_waitany(smx_simcall_t simcall, xbt_dynar_t actions)
708 {
709   smx_action_t action;
710   unsigned int cursor = 0;
711
712   if (MC_is_active()){
713     int idx = SIMCALL_GET_MC_VALUE(simcall);
714     action = xbt_dynar_get_as(actions, idx, smx_action_t);
715     xbt_fifo_push(action->simcalls, simcall);
716     simcall_comm_waitany__set__result(simcall, idx);
717     action->state = SIMIX_DONE;
718     SIMIX_comm_finish(action);
719     return;
720   }
721
722   xbt_dynar_foreach(actions, cursor, action){
723     /* associate this simcall to the the action */
724     xbt_fifo_push(action->simcalls, simcall);
725
726     /* see if the action is already finished */
727     if (action->state != SIMIX_WAITING && action->state != SIMIX_RUNNING){
728       SIMIX_comm_finish(action);
729       break;
730     }
731   }
732 }
733
734 void SIMIX_waitany_remove_simcall_from_actions(smx_simcall_t simcall)
735 {
736   smx_action_t action;
737   unsigned int cursor = 0;
738   xbt_dynar_t actions = simcall_comm_waitany__get__comms(simcall);
739
740   xbt_dynar_foreach(actions, cursor, action) {
741     xbt_fifo_remove(action->simcalls, simcall);
742   }
743 }
744
745 /**
746  *  \brief Starts the simulation of a communication action.
747  *  \param action the communication action
748  */
749 static XBT_INLINE void SIMIX_comm_start(smx_action_t action)
750 {
751   /* If both the sender and the receiver are already there, start the communication */
752   if (action->state == SIMIX_READY) {
753
754     smx_host_t sender = action->comm.src_proc->smx_host;
755     smx_host_t receiver = action->comm.dst_proc->smx_host;
756
757     XBT_DEBUG("Starting communication %p from '%s' to '%s'", action,
758               SIMIX_host_get_name(sender), SIMIX_host_get_name(receiver));
759
760     action->comm.surf_comm = surf_workstation_model_communicate(surf_workstation_model,
761                                                                     sender, receiver,
762                                                                     action->comm.task_size, action->comm.rate);
763
764     surf_action_set_data(action->comm.surf_comm, action);
765
766     action->state = SIMIX_RUNNING;
767
768     /* If a link is failed, detect it immediately */
769     if (surf_action_get_state(action->comm.surf_comm) == SURF_ACTION_FAILED) {
770       XBT_DEBUG("Communication from '%s' to '%s' failed to start because of a link failure",
771                 SIMIX_host_get_name(sender), SIMIX_host_get_name(receiver));
772       action->state = SIMIX_LINK_FAILURE;
773       SIMIX_comm_destroy_internal_actions(action);
774     }
775
776     /* If any of the process is suspend, create the action but stop its execution,
777        it will be restarted when the sender process resume */
778     if (SIMIX_process_is_suspended(action->comm.src_proc) ||
779         SIMIX_process_is_suspended(action->comm.dst_proc)) {
780       /* FIXME: check what should happen with the action state */
781
782       if (SIMIX_process_is_suspended(action->comm.src_proc))
783         XBT_DEBUG("The communication is suspended on startup because src (%s:%s) were suspended since it initiated the communication",
784                   SIMIX_host_get_name(action->comm.src_proc->smx_host), action->comm.src_proc->name);
785       else
786         XBT_DEBUG("The communication is suspended on startup because dst (%s:%s) were suspended since it initiated the communication",
787                   SIMIX_host_get_name(action->comm.dst_proc->smx_host), action->comm.dst_proc->name);
788
789       surf_action_suspend(action->comm.surf_comm);
790
791     }
792   }
793 }
794
795 /**
796  * \brief Answers the SIMIX simcalls associated to a communication action.
797  * \param action a finished communication action
798  */
799 void SIMIX_comm_finish(smx_action_t action)
800 {
801   unsigned int destroy_count = 0;
802   smx_simcall_t simcall;
803
804
805   while ((simcall = xbt_fifo_shift(action->simcalls))) {
806
807     /* If a waitany simcall is waiting for this action to finish, then remove
808        it from the other actions in the waitany list. Afterwards, get the
809        position of the actual action in the waitany dynar and
810        return it as the result of the simcall */
811
812     if (simcall->call == SIMCALL_NONE) //FIXME: maybe a better way to handle this case
813       continue; // if process handling comm is killed
814     if (simcall->call == SIMCALL_COMM_WAITANY) {
815       SIMIX_waitany_remove_simcall_from_actions(simcall);
816       if (!MC_is_active())
817         simcall_comm_waitany__set__result(simcall, xbt_dynar_search(simcall_comm_waitany__get__comms(simcall), &action));
818     }
819
820     /* If the action is still in a rendez-vous point then remove from it */
821     if (action->comm.rdv)
822       SIMIX_rdv_remove(action->comm.rdv, action);
823
824     XBT_DEBUG("SIMIX_comm_finish: action state = %d", (int)action->state);
825
826     /* Check out for errors */
827     switch (action->state) {
828
829     case SIMIX_DONE:
830       XBT_DEBUG("Communication %p complete!", action);
831       SIMIX_comm_copy_data(action);
832       break;
833
834     case SIMIX_SRC_TIMEOUT:
835       SMX_EXCEPTION(simcall->issuer, timeout_error, 0,
836                     "Communication timeouted because of sender");
837       break;
838
839     case SIMIX_DST_TIMEOUT:
840       SMX_EXCEPTION(simcall->issuer, timeout_error, 0,
841                     "Communication timeouted because of receiver");
842       break;
843
844     case SIMIX_SRC_HOST_FAILURE:
845       if (simcall->issuer == action->comm.src_proc)
846         simcall->issuer->context->iwannadie = 1;
847 //          SMX_EXCEPTION(simcall->issuer, host_error, 0, "Host failed");
848       else
849         SMX_EXCEPTION(simcall->issuer, network_error, 0, "Remote peer failed");
850       break;
851
852     case SIMIX_DST_HOST_FAILURE:
853       if (simcall->issuer == action->comm.dst_proc)
854         simcall->issuer->context->iwannadie = 1;
855 //          SMX_EXCEPTION(simcall->issuer, host_error, 0, "Host failed");
856       else
857         SMX_EXCEPTION(simcall->issuer, network_error, 0, "Remote peer failed");
858       break;
859
860     case SIMIX_LINK_FAILURE:
861       XBT_DEBUG("Link failure in action %p between '%s' and '%s': posting an exception to the issuer: %s (%p) detached:%d",
862                 action,
863                 action->comm.src_proc ? sg_host_name(action->comm.src_proc->smx_host) : NULL,
864                 action->comm.dst_proc ? sg_host_name(action->comm.dst_proc->smx_host) : NULL,
865                 simcall->issuer->name, simcall->issuer, action->comm.detached);
866       if (action->comm.src_proc == simcall->issuer) {
867         XBT_DEBUG("I'm source");
868       } else if (action->comm.dst_proc == simcall->issuer) {
869         XBT_DEBUG("I'm dest");
870       } else {
871         XBT_DEBUG("I'm neither source nor dest");
872       }
873       SMX_EXCEPTION(simcall->issuer, network_error, 0, "Link failure");
874       break;
875
876     case SIMIX_CANCELED:
877       if (simcall->issuer == action->comm.dst_proc)
878         SMX_EXCEPTION(simcall->issuer, cancel_error, 0,
879                       "Communication canceled by the sender");
880       else
881         SMX_EXCEPTION(simcall->issuer, cancel_error, 0,
882                       "Communication canceled by the receiver");
883       break;
884
885     default:
886       xbt_die("Unexpected action state in SIMIX_comm_finish: %d", (int)action->state);
887     }
888
889     /* if there is an exception during a waitany or a testany, indicate the position of the failed communication */
890     if (simcall->issuer->doexception) {
891       if (simcall->call == SIMCALL_COMM_WAITANY) {
892         simcall->issuer->running_ctx->exception.value = xbt_dynar_search(simcall_comm_waitany__get__comms(simcall), &action);
893       }
894       else if (simcall->call == SIMCALL_COMM_TESTANY) {
895         simcall->issuer->running_ctx->exception.value = xbt_dynar_search(simcall_comm_testany__get__comms(simcall), &action);
896       }
897     }
898
899     if (surf_resource_get_state(surf_workstation_resource_priv(simcall->issuer->smx_host)) != SURF_RESOURCE_ON) {
900       simcall->issuer->context->iwannadie = 1;
901     }
902
903     simcall->issuer->waiting_action = NULL;
904     xbt_fifo_remove(simcall->issuer->comms, action);
905     if(action->comm.detached){
906       if(simcall->issuer == action->comm.src_proc){
907         if(action->comm.dst_proc)
908           xbt_fifo_remove(action->comm.dst_proc->comms, action);
909       }
910       if(simcall->issuer == action->comm.dst_proc){
911         if(action->comm.src_proc)
912           xbt_fifo_remove(action->comm.src_proc->comms, action);
913       }
914     }
915     SIMIX_simcall_answer(simcall);
916     destroy_count++;
917   }
918
919   while (destroy_count-- > 0)
920     SIMIX_comm_destroy(action);
921 }
922
923 /**
924  * \brief This function is called when a Surf communication action is finished.
925  * \param action the corresponding Simix communication
926  */
927 void SIMIX_post_comm(smx_action_t action)
928 {
929   /* Update action state */
930   if (action->comm.src_timeout &&
931       surf_action_get_state(action->comm.src_timeout) == SURF_ACTION_DONE)
932     action->state = SIMIX_SRC_TIMEOUT;
933   else if (action->comm.dst_timeout &&
934           surf_action_get_state(action->comm.dst_timeout) == SURF_ACTION_DONE)
935     action->state = SIMIX_DST_TIMEOUT;
936   else if (action->comm.src_timeout &&
937           surf_action_get_state(action->comm.src_timeout) == SURF_ACTION_FAILED)
938     action->state = SIMIX_SRC_HOST_FAILURE;
939   else if (action->comm.dst_timeout &&
940       surf_action_get_state(action->comm.dst_timeout) == SURF_ACTION_FAILED)
941     action->state = SIMIX_DST_HOST_FAILURE;
942   else if (action->comm.surf_comm &&
943           surf_action_get_state(action->comm.surf_comm) == SURF_ACTION_FAILED) {
944     XBT_DEBUG("Puta madre. Surf says that the link broke");
945     action->state = SIMIX_LINK_FAILURE;
946   } else
947     action->state = SIMIX_DONE;
948
949   XBT_DEBUG("SIMIX_post_comm: comm %p, state %d, src_proc %p, dst_proc %p, detached: %d",
950             action, (int)action->state, action->comm.src_proc, action->comm.dst_proc, action->comm.detached);
951
952   /* destroy the surf actions associated with the Simix communication */
953   SIMIX_comm_destroy_internal_actions(action);
954
955   /* if there are simcalls associated with the action, then answer them */
956   if (xbt_fifo_size(action->simcalls)) {
957     SIMIX_comm_finish(action);
958   }
959 }
960
961 void SIMIX_pre_comm_cancel(smx_simcall_t simcall, smx_action_t action){
962   SIMIX_comm_cancel(action);
963 }
964 void SIMIX_comm_cancel(smx_action_t action)
965 {
966   /* if the action is a waiting state means that it is still in a rdv */
967   /* so remove from it and delete it */
968   if (action->state == SIMIX_WAITING) {
969     SIMIX_rdv_remove(action->comm.rdv, action);
970     action->state = SIMIX_CANCELED;
971   }
972   else if (!MC_is_active() /* when running the MC there are no surf actions */
973            && (action->state == SIMIX_READY || action->state == SIMIX_RUNNING)) {
974
975     surf_action_cancel(action->comm.surf_comm);
976   }
977 }
978
979 void SIMIX_comm_suspend(smx_action_t action)
980 {
981   /*FIXME: shall we suspend also the timeout actions? */
982   if (action->comm.surf_comm)
983     surf_action_suspend(action->comm.surf_comm);
984   /* in the other case, the action will be suspended on creation, in SIMIX_comm_start() */
985 }
986
987 void SIMIX_comm_resume(smx_action_t action)
988 {
989   /*FIXME: check what happen with the timeouts */
990   if (action->comm.surf_comm)
991     surf_action_resume(action->comm.surf_comm);
992   /* in the other case, the action were not really suspended yet, see SIMIX_comm_suspend() and SIMIX_comm_start() */
993 }
994
995
996 /************* Action Getters **************/
997
998 double SIMIX_pre_comm_get_remains(smx_simcall_t simcall, smx_action_t action){
999   return SIMIX_comm_get_remains(action);
1000 }
1001 /**
1002  *  \brief get the amount remaining from the communication
1003  *  \param action The communication
1004  */
1005 double SIMIX_comm_get_remains(smx_action_t action)
1006 {
1007   double remains;
1008
1009   if(!action){
1010     return 0;
1011   }
1012
1013   switch (action->state) {
1014
1015   case SIMIX_RUNNING:
1016     remains = surf_action_get_remains(action->comm.surf_comm);
1017     break;
1018
1019   case SIMIX_WAITING:
1020   case SIMIX_READY:
1021     remains = 0; /*FIXME: check what should be returned */
1022     break;
1023
1024   default:
1025     remains = 0; /*FIXME: is this correct? */
1026     break;
1027   }
1028   return remains;
1029 }
1030
1031 e_smx_state_t SIMIX_pre_comm_get_state(smx_simcall_t simcall, smx_action_t action){
1032   return SIMIX_comm_get_state(action);
1033 }
1034 e_smx_state_t SIMIX_comm_get_state(smx_action_t action)
1035 {
1036   return action->state;
1037 }
1038
1039 void* SIMIX_pre_comm_get_src_data(smx_simcall_t simcall, smx_action_t action){
1040   return SIMIX_comm_get_src_data(action);
1041 }
1042 /**
1043  *  \brief Return the user data associated to the sender of the communication
1044  *  \param action The communication
1045  *  \return the user data
1046  */
1047 void* SIMIX_comm_get_src_data(smx_action_t action)
1048 {
1049   return action->comm.src_data;
1050 }
1051
1052 void* SIMIX_pre_comm_get_dst_data(smx_simcall_t simcall, smx_action_t action){
1053   return SIMIX_comm_get_dst_data(action);
1054 }
1055 /**
1056  *  \brief Return the user data associated to the receiver of the communication
1057  *  \param action The communication
1058  *  \return the user data
1059  */
1060 void* SIMIX_comm_get_dst_data(smx_action_t action)
1061 {
1062   return action->comm.dst_data;
1063 }
1064
1065 smx_process_t SIMIX_pre_comm_get_src_proc(smx_simcall_t simcall, smx_action_t action){
1066   return SIMIX_comm_get_src_proc(action);
1067 }
1068 smx_process_t SIMIX_comm_get_src_proc(smx_action_t action)
1069 {
1070   return action->comm.src_proc;
1071 }
1072
1073 smx_process_t SIMIX_pre_comm_get_dst_proc(smx_simcall_t simcall, smx_action_t action){
1074   return SIMIX_comm_get_dst_proc(action);
1075 }
1076 smx_process_t SIMIX_comm_get_dst_proc(smx_action_t action)
1077 {
1078   return action->comm.dst_proc;
1079 }
1080
1081 #ifdef HAVE_LATENCY_BOUND_TRACKING
1082 int SIMIX_pre_comm_is_latency_bounded(smx_simcall_t simcall, smx_action_t action)
1083 {
1084   return SIMIX_comm_is_latency_bounded(action);
1085 }
1086
1087 /**
1088  *  \brief verify if communication is latency bounded
1089  *  \param comm The communication
1090  */
1091 int SIMIX_comm_is_latency_bounded(smx_action_t action)
1092 {
1093   if(!action){
1094     return 0;
1095   }
1096   if (action->comm.surf_comm){
1097     XBT_DEBUG("Getting latency limited for surf_action (%p)", action->comm.surf_comm);
1098     action->latency_limited = surf_network_action_get_latency_limited(action->comm.surf_comm);
1099     XBT_DEBUG("Action limited is %d", action->latency_limited);
1100   }
1101   return action->latency_limited;
1102 }
1103 #endif
1104
1105 /******************************************************************************/
1106 /*                    SIMIX_comm_copy_data callbacks                       */
1107 /******************************************************************************/
1108 static void (*SIMIX_comm_copy_data_callback) (smx_action_t, void*, size_t) =
1109   &SIMIX_comm_copy_pointer_callback;
1110
1111 void
1112 SIMIX_comm_set_copy_data_callback(void (*callback) (smx_action_t, void*, size_t))
1113 {
1114   SIMIX_comm_copy_data_callback = callback;
1115 }
1116
1117 void SIMIX_comm_copy_pointer_callback(smx_action_t comm, void* buff, size_t buff_size)
1118 {
1119   xbt_assert((buff_size == sizeof(void *)),
1120              "Cannot copy %zu bytes: must be sizeof(void*)", buff_size);
1121   *(void **) (comm->comm.dst_buff) = buff;
1122 }
1123
1124 void SIMIX_comm_copy_buffer_callback(smx_action_t comm, void* buff, size_t buff_size)
1125 {
1126   XBT_DEBUG("Copy the data over");
1127   memcpy(comm->comm.dst_buff, buff, buff_size);
1128   if (comm->comm.detached) { // if this is a detached send, the source buffer was duplicated by SMPI sender to make the original buffer available to the application ASAP
1129     xbt_free(buff);
1130     comm->comm.src_buff = NULL;
1131   }
1132 }
1133
1134
1135 /**
1136  *  \brief Copy the communication data from the sender's buffer to the receiver's one
1137  *  \param comm The communication
1138  */
1139 void SIMIX_comm_copy_data(smx_action_t comm)
1140 {
1141   size_t buff_size = comm->comm.src_buff_size;
1142   /* If there is no data to be copy then return */
1143   if (!comm->comm.src_buff || !comm->comm.dst_buff || comm->comm.copied)
1144     return;
1145
1146   XBT_DEBUG("Copying comm %p data from %s (%p) -> %s (%p) (%zu bytes)",
1147             comm,
1148             comm->comm.src_proc ? sg_host_name(comm->comm.src_proc->smx_host) : "a finished process",
1149             comm->comm.src_buff,
1150             comm->comm.dst_proc ? sg_host_name(comm->comm.dst_proc->smx_host) : "a finished process",
1151             comm->comm.dst_buff, buff_size);
1152
1153   /* Copy at most dst_buff_size bytes of the message to receiver's buffer */
1154   if (comm->comm.dst_buff_size)
1155     buff_size = MIN(buff_size, *(comm->comm.dst_buff_size));
1156
1157   /* Update the receiver's buffer size to the copied amount */
1158   if (comm->comm.dst_buff_size)
1159     *comm->comm.dst_buff_size = buff_size;
1160
1161   if (buff_size > 0){
1162       if(comm->comm.copy_data_fun)
1163         comm->comm.copy_data_fun (comm, comm->comm.src_buff, buff_size);
1164       else
1165         SIMIX_comm_copy_data_callback (comm, comm->comm.src_buff, buff_size);
1166   }
1167
1168
1169   /* Set the copied flag so we copy data only once */
1170   /* (this function might be called from both communication ends) */
1171   comm->comm.copied = 1;
1172 }