Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
Revert commit ed3e911d for better performance
[simgrid.git] / src / simix / smx_network.c
1 /* Copyright (c) 2009-2014. The SimGrid Team.
2  * All rights reserved.                                                     */
3
4 /* This program is free software; you can redistribute it and/or modify it
5  * under the terms of the license (GNU LGPL) which comes with this package. */
6
7 #include "smx_private.h"
8 #include "xbt/log.h"
9 #include "mc/mc.h"
10 #include "xbt/dict.h"
11 #include "smpi/private.h"
12
13
14 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(simix_network, simix,
15                                 "Logging specific to SIMIX (network)");
16
17 static xbt_dict_t rdv_points = NULL;
18 XBT_EXPORT_NO_IMPORT(unsigned long int) smx_total_comms = 0;
19
20 static void SIMIX_waitany_remove_simcall_from_actions(smx_simcall_t simcall);
21 static void SIMIX_comm_copy_data(smx_action_t comm);
22 static smx_action_t SIMIX_comm_new(e_smx_comm_type_t type);
23 static XBT_INLINE void SIMIX_rdv_push(smx_rdv_t rdv, smx_action_t comm);
24 static smx_action_t SIMIX_fifo_probe_comm(xbt_fifo_t fifo, e_smx_comm_type_t type,
25                                         int (*match_fun)(void *, void *,smx_action_t),
26                                         void *user_data, smx_action_t my_action);
27 static smx_action_t SIMIX_fifo_get_comm(xbt_fifo_t fifo, e_smx_comm_type_t type,
28                                         int (*match_fun)(void *, void *,smx_action_t),
29                                         void *user_data, smx_action_t my_action);
30 static void SIMIX_rdv_free(void *data);
31 static void SIMIX_comm_start(smx_action_t action);
32
33 void SIMIX_network_init(void)
34 {
35   rdv_points = xbt_dict_new_homogeneous(SIMIX_rdv_free);
36 }
37
38 void SIMIX_network_exit(void)
39 {
40   xbt_dict_free(&rdv_points);
41 }
42
43 /******************************************************************************/
44 /*                           Rendez-Vous Points                               */
45 /******************************************************************************/
46
47 smx_rdv_t SIMIX_pre_rdv_create(smx_simcall_t simcall, const char *name){
48   return SIMIX_rdv_create(name);
49 }
50 smx_rdv_t SIMIX_rdv_create(const char *name)
51 {
52   /* two processes may have pushed the same rdv_create simcall at the same time */
53   smx_rdv_t rdv = name ? xbt_dict_get_or_null(rdv_points, name) : NULL;
54
55   if (!rdv) {
56     rdv = xbt_new0(s_smx_rvpoint_t, 1);
57     rdv->name = name ? xbt_strdup(name) : NULL;
58     rdv->comm_fifo = xbt_fifo_new();
59     rdv->done_comm_fifo = xbt_fifo_new();
60     rdv->permanent_receiver=NULL;
61
62     XBT_DEBUG("Creating a mailbox at %p with name %s\n", rdv, name);
63
64     if (rdv->name)
65       xbt_dict_set(rdv_points, rdv->name, rdv, NULL);
66   }
67   return rdv;
68 }
69
70 void SIMIX_pre_rdv_destroy(smx_simcall_t simcall, smx_rdv_t rdv){
71   return SIMIX_rdv_destroy(rdv);
72 }
73 void SIMIX_rdv_destroy(smx_rdv_t rdv)
74 {
75   if (rdv->name)
76     xbt_dict_remove(rdv_points, rdv->name);
77 }
78
79 void SIMIX_rdv_free(void *data)
80 {
81   XBT_DEBUG("rdv free %p", data);
82   smx_rdv_t rdv = (smx_rdv_t) data;
83   xbt_free(rdv->name);
84   xbt_fifo_free(rdv->comm_fifo);
85   xbt_fifo_free(rdv->done_comm_fifo);
86
87   xbt_free(rdv);
88 }
89
90 xbt_dict_t SIMIX_get_rdv_points()
91 {
92   return rdv_points;
93 }
94
95 smx_rdv_t SIMIX_pre_rdv_get_by_name(smx_simcall_t simcall, const char *name){
96   return SIMIX_rdv_get_by_name(name);
97 }
98 smx_rdv_t SIMIX_rdv_get_by_name(const char *name)
99 {
100   return xbt_dict_get_or_null(rdv_points, name);
101 }
102
103 int SIMIX_pre_rdv_comm_count_by_host(smx_simcall_t simcall, smx_rdv_t rdv, smx_host_t host){
104   return SIMIX_rdv_comm_count_by_host(rdv, host);
105 }
106 int SIMIX_rdv_comm_count_by_host(smx_rdv_t rdv, smx_host_t host)
107 {
108   smx_action_t comm = NULL;
109   xbt_fifo_item_t item = NULL;
110   int count = 0;
111
112   xbt_fifo_foreach(rdv->comm_fifo, item, comm, smx_action_t) {
113     if (comm->comm.src_proc->smx_host == host)
114       count++;
115   }
116
117   return count;
118 }
119
120 smx_action_t SIMIX_pre_rdv_get_head(smx_simcall_t simcall, smx_rdv_t rdv){
121   return SIMIX_rdv_get_head(rdv);
122 }
123 smx_action_t SIMIX_rdv_get_head(smx_rdv_t rdv)
124 {
125   return xbt_fifo_get_item_content(xbt_fifo_get_first_item(rdv->comm_fifo));
126 }
127
128 smx_process_t SIMIX_pre_rdv_get_receiver(smx_simcall_t simcall, smx_rdv_t rdv){
129   return SIMIX_rdv_get_receiver(rdv);
130 }
131 /**
132  *  \brief get the receiver (process associated to the mailbox)
133  *  \param rdv The rendez-vous point
134  *  \return process The receiving process (NULL if not set)
135  */
136 smx_process_t SIMIX_rdv_get_receiver(smx_rdv_t rdv)
137 {
138   return rdv->permanent_receiver;
139 }
140
141 void SIMIX_pre_rdv_set_receiver(smx_simcall_t simcall, smx_rdv_t rdv,
142                             smx_process_t process){
143   SIMIX_rdv_set_receiver(rdv, process);
144 }
145 /**
146  *  \brief set the receiver of the rendez vous point to allow eager sends
147  *  \param rdv The rendez-vous point
148  *  \param process The receiving process
149  */
150 void SIMIX_rdv_set_receiver(smx_rdv_t rdv, smx_process_t process)
151 {
152   rdv->permanent_receiver=process;
153 }
154
155 /**
156  *  \brief Pushes a communication action into a rendez-vous point
157  *  \param rdv The rendez-vous point
158  *  \param comm The communication action
159  */
160 static XBT_INLINE void SIMIX_rdv_push(smx_rdv_t rdv, smx_action_t comm)
161 {
162   xbt_fifo_push(rdv->comm_fifo, comm);
163   comm->comm.rdv = rdv;
164 }
165
166 /**
167  *  \brief Removes a communication action from a rendez-vous point
168  *  \param rdv The rendez-vous point
169  *  \param comm The communication action
170  */
171 XBT_INLINE void SIMIX_rdv_remove(smx_rdv_t rdv, smx_action_t comm)
172 {
173   xbt_fifo_remove(rdv->comm_fifo, comm);
174   comm->comm.rdv = NULL;
175 }
176
177 /**
178  *  \brief Checks if there is a communication action queued in a fifo matching our needs
179  *  \param type The type of communication we are looking for (comm_send, comm_recv)
180  *  \return The communication action if found, NULL otherwise
181  */
182 smx_action_t SIMIX_fifo_get_comm(xbt_fifo_t fifo, e_smx_comm_type_t type,
183                                  int (*match_fun)(void *, void *,smx_action_t),
184                                  void *this_user_data, smx_action_t my_action)
185 {
186   smx_action_t action;
187   xbt_fifo_item_t item;
188   void* other_user_data = NULL;
189
190   xbt_fifo_foreach(fifo, item, action, smx_action_t) {
191     if (action->comm.type == SIMIX_COMM_SEND) {
192       other_user_data = action->comm.src_data;
193     } else if (action->comm.type == SIMIX_COMM_RECEIVE) {
194       other_user_data = action->comm.dst_data;
195     }
196     if (action->comm.type == type &&
197         (!match_fun              ||              match_fun(this_user_data,  other_user_data, action)) &&
198         (!action->comm.match_fun || action->comm.match_fun(other_user_data, this_user_data,  my_action))) {
199       XBT_DEBUG("Found a matching communication action %p", action);
200       xbt_fifo_remove_item(fifo, item);
201       xbt_fifo_free_item(item);
202       action->comm.refcount++;
203 #ifdef HAVE_MC
204       action->comm.rdv_cpy = action->comm.rdv;
205 #endif
206       action->comm.rdv = NULL;
207       return action;
208     }
209     XBT_DEBUG("Sorry, communication action %p does not match our needs:"
210               " its type is %d but we are looking for a comm of type %d (or maybe the filtering didn't match)",
211               action, (int)action->comm.type, (int)type);
212   }
213   XBT_DEBUG("No matching communication action found");
214   return NULL;
215 }
216
217
218 /**
219  *  \brief Checks if there is a communication action queued in a fifo matching our needs, but leave it there
220  *  \param type The type of communication we are looking for (comm_send, comm_recv)
221  *  \return The communication action if found, NULL otherwise
222  */
223 smx_action_t SIMIX_fifo_probe_comm(xbt_fifo_t fifo, e_smx_comm_type_t type,
224                                  int (*match_fun)(void *, void *,smx_action_t),
225                                  void *this_user_data, smx_action_t my_action)
226 {
227   smx_action_t action;
228   xbt_fifo_item_t item;
229   void* other_user_data = NULL;
230
231   xbt_fifo_foreach(fifo, item, action, smx_action_t) {
232     if (action->comm.type == SIMIX_COMM_SEND) {
233       other_user_data = action->comm.src_data;
234     } else if (action->comm.type == SIMIX_COMM_RECEIVE) {
235       other_user_data = action->comm.dst_data;
236     }
237     if (action->comm.type == type &&
238         (!match_fun              ||              match_fun(this_user_data,  other_user_data, action)) &&
239         (!action->comm.match_fun || action->comm.match_fun(other_user_data, this_user_data,  my_action))) {
240       XBT_DEBUG("Found a matching communication action %p", action);
241       action->comm.refcount++;
242
243       return action;
244     }
245     XBT_DEBUG("Sorry, communication action %p does not match our needs:"
246               " its type is %d but we are looking for a comm of type %d (or maybe the filtering didn't match)",
247               action, (int)action->comm.type, (int)type);
248   }
249   XBT_DEBUG("No matching communication action found");
250   return NULL;
251 }
252 /******************************************************************************/
253 /*                            Communication Actions                            */
254 /******************************************************************************/
255
256 /**
257  *  \brief Creates a new communicate action
258  *  \param type The direction of communication (comm_send, comm_recv)
259  *  \return The new communicate action
260  */
261 smx_action_t SIMIX_comm_new(e_smx_comm_type_t type)
262 {
263   smx_action_t act;
264
265   /* alloc structures */
266   act = xbt_mallocator_get(simix_global->action_mallocator);
267
268   act->type = SIMIX_ACTION_COMMUNICATE;
269   act->state = SIMIX_WAITING;
270
271   /* set communication */
272   act->comm.type = type;
273   act->comm.refcount = 1;
274   act->comm.src_data=NULL;
275   act->comm.dst_data=NULL;
276
277
278 #ifdef HAVE_LATENCY_BOUND_TRACKING
279   //initialize with unknown value
280   act->latency_limited = -1;
281 #endif
282
283 #ifdef HAVE_TRACING
284   act->category = NULL;
285 #endif
286
287   XBT_DEBUG("Create communicate action %p", act);
288   ++smx_total_comms;
289
290   return act;
291 }
292
293 /**
294  *  \brief Destroy a communicate action
295  *  \param action The communicate action to be destroyed
296  */
297 void SIMIX_comm_destroy(smx_action_t action)
298 {
299   XBT_DEBUG("Destroy action %p (refcount: %d), state: %d",
300             action, action->comm.refcount, (int)action->state);
301
302   if (action->comm.refcount <= 0) {
303     xbt_backtrace_display_current();
304     xbt_die("The refcount of comm %p is already 0 before decreasing it. "
305             "That's a bug! If you didn't test and/or wait the same communication twice in your code, then the bug is SimGrid's...", action);
306   }
307   action->comm.refcount--;
308   if (action->comm.refcount > 0)
309       return;
310   XBT_DEBUG("Really free communication %p; refcount is now %d", action,
311             action->comm.refcount);
312
313 #ifdef HAVE_LATENCY_BOUND_TRACKING
314   action->latency_limited = SIMIX_comm_is_latency_bounded( action ) ;
315 #endif
316
317   xbt_free(action->name);
318   SIMIX_comm_destroy_internal_actions(action);
319
320   if (action->comm.detached && action->state != SIMIX_DONE) {
321     /* the communication has failed and was detached:
322      * we have to free the buffer */
323     if (action->comm.clean_fun) {
324       action->comm.clean_fun(action->comm.src_buff);
325     }
326     action->comm.src_buff = NULL;
327   }
328
329   if(action->comm.rdv)
330     SIMIX_rdv_remove(action->comm.rdv, action);
331
332   xbt_mallocator_release(simix_global->action_mallocator, action);
333 }
334
335 void SIMIX_comm_destroy_internal_actions(smx_action_t action)
336 {
337   if (action->comm.surf_comm){
338 #ifdef HAVE_LATENCY_BOUND_TRACKING
339     action->latency_limited = SIMIX_comm_is_latency_bounded(action);
340 #endif
341     surf_action_unref(action->comm.surf_comm);
342     action->comm.surf_comm = NULL;
343   }
344
345   if (action->comm.src_timeout){
346     surf_action_unref(action->comm.src_timeout);
347     action->comm.src_timeout = NULL;
348   }
349
350   if (action->comm.dst_timeout){
351     surf_action_unref(action->comm.dst_timeout);
352     action->comm.dst_timeout = NULL;
353   }
354 }
355
356 void SIMIX_pre_comm_send(smx_simcall_t simcall, smx_process_t src, smx_rdv_t rdv,
357                                   double task_size, double rate,
358                                   void *src_buff, size_t src_buff_size,
359                                   int (*match_fun)(void *, void *,smx_action_t),
360                                   void (*copy_data_fun)(smx_action_t, void*, size_t),
361                                   void *data, double timeout){
362   smx_action_t comm = SIMIX_comm_isend(src, rdv, task_size, rate,
363                                        src_buff, src_buff_size, match_fun, NULL, copy_data_fun,
364                                        data, 0);
365   SIMCALL_SET_MC_VALUE(simcall, 0);
366   SIMIX_pre_comm_wait(simcall, comm, timeout);
367 }
368 smx_action_t SIMIX_pre_comm_isend(smx_simcall_t simcall, smx_process_t src, smx_rdv_t rdv,
369                                   double task_size, double rate,
370                                   void *src_buff, size_t src_buff_size,
371                                   int (*match_fun)(void *, void *,smx_action_t),
372                                   void (*clean_fun)(void *),
373                                   void (*copy_data_fun)(smx_action_t, void*, size_t),
374                                   void *data, int detached){
375   return SIMIX_comm_isend(src, rdv, task_size, rate, src_buff,
376                           src_buff_size, match_fun, clean_fun, copy_data_fun, data, detached);
377
378 }
/**
 *  \brief Starts a (possibly detached) send on a rendez-vous point
 *  \param src_proc The sending process
 *  \param rdv The rendez-vous point to send on
 *  \param task_size Amount of data handed to the network model
 *  \param rate Rate requested by the sender (SIMIX_comm_irecv treats -1.0 as "unset")
 *  \param src_buff Source buffer, stored in the action
 *  \param src_buff_size Size of the source buffer, stored in the action
 *  \param match_fun Optional filter, called as match_fun(data, other_data, other_action)
 *  \param clean_fun Used to free src_buff if a detached send fails
 *  \param copy_data_fun Custom data copy function, stored in the action
 *  \param data User data describing this send, handed to the filters
 *  \param detached Non-zero for a fire-and-forget send
 *  \return The matched or queued communication action, or NULL when \a detached is set
 *
 *  Flow:
 *  1. A fresh SEND action is built so that the receiver-side filter can inspect it.
 *  2. If a matching RECEIVE is already queued in rdv->comm_fifo, the fresh action
 *     is discarded and the queued one becomes READY.
 *  3. Otherwise, if the mailbox has a permanent receiver, the send is marked READY
 *     at once and stored (with one extra reference) in rdv->done_comm_fifo; else
 *     it is queued in rdv->comm_fifo.
 *  4. A detached send gives up the sender's reference and remembers clean_fun.
 *  Under the model checker the action is only flagged RUNNING; otherwise
 *  SIMIX_comm_start() is called (it only acts on READY actions).
 */
379 smx_action_t SIMIX_comm_isend(smx_process_t src_proc, smx_rdv_t rdv,
380                               double task_size, double rate,
381                               void *src_buff, size_t src_buff_size,
382                               int (*match_fun)(void *, void *,smx_action_t),
383                               void (*clean_fun)(void *), // used to free the action in case of problem after a detached send
384                               void (*copy_data_fun)(smx_action_t, void*, size_t), // used to copy data if not default one
385                               void *data,
386                               int detached)
387 {
388   XBT_DEBUG("send from %p\n", rdv);
389
390   /* Prepare an action describing us, so that it gets passed to the user-provided filter of other side */
391   smx_action_t this_action = SIMIX_comm_new(SIMIX_COMM_SEND);
392
393   /* Look for communication action matching our needs. We also provide a description of
394    * ourself so that the other side also gets a chance of choosing if it wants to match with us.
395    *
396    * If it is not found then push our communication into the rendez-vous point */
397   smx_action_t other_action = SIMIX_fifo_get_comm(rdv->comm_fifo, SIMIX_COMM_RECEIVE, match_fun, data, this_action);
398
399   if (!other_action) {
400     other_action = this_action;
401
402     if (rdv->permanent_receiver!=NULL){
403       //this mailbox is for small messages, which have to be sent right now
404       other_action->state = SIMIX_READY;
405       other_action->comm.dst_proc=rdv->permanent_receiver;
      /* extra reference for the copy kept in done_comm_fifo; SIMIX_comm_irecv
       * adjusts the count when it picks this send up from there */
406       other_action->comm.refcount++;
407       xbt_fifo_push(rdv->done_comm_fifo,other_action);
408       other_action->comm.rdv=rdv;
409       XBT_DEBUG("pushing a message into the permanent receive fifo %p, comm %p \n", rdv, &(other_action->comm));
410
411     }else{
412       SIMIX_rdv_push(rdv, this_action);
413     }
414   } else {
415     XBT_DEBUG("Receive already pushed\n");
416
    /* the receiver's queued action is used instead; drop ours */
417     SIMIX_comm_destroy(this_action);
418     --smx_total_comms; // this creation was a pure waste
419
420     other_action->state = SIMIX_READY;
421     other_action->comm.type = SIMIX_COMM_READY;
422
423   }
  /* record the comm in the sender's list of communications */
424   xbt_fifo_push(src_proc->comms, other_action);
425
426   /* if the communication action is detached then decrease the refcount
427    * by one, so it will be eliminated by the receiver's destroy call */
  /* NOTE(review): comm.detached is only ever set to 1 here; presumably the
   * action mallocator resets it for reused actions -- confirm */
428   if (detached) {
429     other_action->comm.detached = 1;
430     other_action->comm.refcount--;
431     other_action->comm.clean_fun = clean_fun;
432   } else {
433     other_action->comm.clean_fun = NULL;
434   }
435
436   /* Setup the communication action */
437   other_action->comm.src_proc = src_proc;
438   other_action->comm.task_size = task_size;
  /* unconditionally overrides the rate; SIMIX_comm_irecv keeps the smaller of both */
439   other_action->comm.rate = rate;
440   other_action->comm.src_buff = src_buff;
441   other_action->comm.src_buff_size = src_buff_size;
442   other_action->comm.src_data = data;
443
444   other_action->comm.match_fun = match_fun;
445   other_action->comm.copy_data_fun = copy_data_fun;
446
447
  /* the model checker drives the execution itself: do not start a surf comm */
448   if (MC_is_active()) {
449     other_action->state = SIMIX_RUNNING;
450     return (detached ? NULL : other_action);
451   }
452
453   SIMIX_comm_start(other_action);
454   return (detached ? NULL : other_action);
455 }
456
457 void SIMIX_pre_comm_recv(smx_simcall_t simcall, smx_rdv_t rdv,
458                          void *dst_buff, size_t *dst_buff_size,
459                          int (*match_fun)(void *, void *, smx_action_t),
460                          void (*copy_data_fun)(smx_action_t, void*, size_t),
461                          void *data, double timeout, double rate)
462 {
463   smx_action_t comm = SIMIX_comm_irecv(simcall->issuer, rdv, dst_buff,
464                                        dst_buff_size, match_fun, copy_data_fun, data, rate);
465   SIMCALL_SET_MC_VALUE(simcall, 0);
466   SIMIX_pre_comm_wait(simcall, comm, timeout);
467 }
468
469 smx_action_t SIMIX_pre_comm_irecv(smx_simcall_t simcall, smx_rdv_t rdv,
470                                   void *dst_buff, size_t *dst_buff_size,
471                                   int (*match_fun)(void *, void *, smx_action_t),
472                                   void (*copy_data_fun)(smx_action_t, void*, size_t),
473                                   void *data, double rate)
474 {
475   return SIMIX_comm_irecv(simcall->issuer, rdv, dst_buff, dst_buff_size,
476                           match_fun, copy_data_fun, data, rate);
477 }
478
/**
 *  \brief Posts a receive on a rendez-vous point
 *  \param dst_proc The receiving process
 *  \param rdv The rendez-vous point to receive from
 *  \param dst_buff Destination buffer, stored in the action
 *  \param dst_buff_size Pointer to the destination buffer size, stored in the action
 *  \param match_fun Optional filter, called as match_fun(data, other_data, other_action)
 *  \param copy_data_fun Custom data copy function, stored in the action
 *  \param data User data describing this receive, handed to the filters
 *  \param rate Rate limit. -1.0 leaves the action's rate untouched. Otherwise the
 *              smaller of both rates is kept, and an unset (-1.0) rate is replaced.
 *  \return The communication action to wait or test on
 *
 *  Two paths:
 *  - Permanent receiver with eagerly delivered sends waiting in done_comm_fifo:
 *    a matching send is taken from there. If its surf comm has no remaining
 *    work, it is flagged DONE. Without a match, the receive is queued in
 *    comm_fifo.
 *  - Regular path: a matching send is taken from comm_fifo and becomes READY.
 *    Otherwise the receive is queued in comm_fifo. In both cases the action is
 *    recorded in dst_proc->comms.
 *  Under the model checker the action is only flagged RUNNING; otherwise
 *  SIMIX_comm_start() is called.
 *
 *  NOTE(review): the permanent-receiver path never pushes the action into
 *  dst_proc->comms, unlike the regular path -- confirm this is intended.
 */
479 smx_action_t SIMIX_comm_irecv(smx_process_t dst_proc, smx_rdv_t rdv,
480                               void *dst_buff, size_t *dst_buff_size,
481                               int (*match_fun)(void *, void *, smx_action_t),
482                               void (*copy_data_fun)(smx_action_t, void*, size_t), // used to copy data if not default one
483                               void *data, double rate)
484 {
485   XBT_DEBUG("recv from %p %p\n", rdv, rdv->comm_fifo);
  /* describes us to the sender-side filter; discarded if a match is found */
486   smx_action_t this_action = SIMIX_comm_new(SIMIX_COMM_RECEIVE);
487
488   smx_action_t other_action;
489   //communication already done, get it inside the fifo of completed comms
490   //permanent receive v1
491   //int already_received=0;
492   if(rdv->permanent_receiver && xbt_fifo_size(rdv->done_comm_fifo)!=0){
493
494     XBT_DEBUG("We have a comm that has probably already been received, trying to match it, to skip the communication\n");
495     //find a match in the already received fifo
496     other_action = SIMIX_fifo_get_comm(rdv->done_comm_fifo, SIMIX_COMM_SEND, match_fun, data, this_action);
497     //if not found, assume the receiver came first, register it to the mailbox in the classical way
498     if (!other_action)  {
499       XBT_DEBUG("We have messages in the permanent receive list, but not the one we are looking for, pushing request into fifo\n");
500       other_action = this_action;
501       SIMIX_rdv_push(rdv, this_action);
502     }else{
      /* the eager send already has a surf comm with no remaining work: it is finished */
503       if(other_action->comm.surf_comm &&        SIMIX_comm_get_remains(other_action)==0.0)
504       {
505         XBT_DEBUG("comm %p has been already sent, and is finished, destroy it\n",&(other_action->comm));
506         other_action->state = SIMIX_DONE;
507         other_action->comm.type = SIMIX_COMM_DONE;
508         other_action->comm.rdv = NULL;
509         //SIMIX_comm_destroy(this_action);
510         //--smx_total_comms; // this creation was a pure waste
511         //already_received=1;
512         //other_action->comm.refcount--;
513       }/*else{
514          XBT_DEBUG("Not yet finished, we have to wait %d\n", xbt_fifo_size(rdv->comm_fifo));
515          }*/
      /* balance the refcount: SIMIX_fifo_get_comm() just added one, and the send
       * already carries an extra one from being stored in done_comm_fifo
       * (see SIMIX_comm_isend) */
516       other_action->comm.refcount--;
517       SIMIX_comm_destroy(this_action);
518       --smx_total_comms; // this creation was a pure waste
519     }
520   }else{
521     /* Prepare an action describing us, so that it gets passed to the user-provided filter of other side */
522
523     /* Look for communication action matching our needs. We also provide a description of
524      * ourself so that the other side also gets a chance of choosing if it wants to match with us.
525      *
526      * If it is not found then push our communication into the rendez-vous point */
527     other_action = SIMIX_fifo_get_comm(rdv->comm_fifo, SIMIX_COMM_SEND, match_fun, data, this_action);
528
529     if (!other_action) {
530       XBT_DEBUG("Receive pushed first %d\n", xbt_fifo_size(rdv->comm_fifo));
531       other_action = this_action;
532       SIMIX_rdv_push(rdv, this_action);
533     } else {
534       SIMIX_comm_destroy(this_action);
535       --smx_total_comms; // this creation was a pure waste
536       other_action->state = SIMIX_READY;
537       other_action->comm.type = SIMIX_COMM_READY;
538       //other_action->comm.refcount--;
539     }
540     xbt_fifo_push(dst_proc->comms, other_action);
541   }
542
543   /* Setup communication action */
544   other_action->comm.dst_proc = dst_proc;
545   other_action->comm.dst_buff = dst_buff;
546   other_action->comm.dst_buff_size = dst_buff_size;
547   other_action->comm.dst_data = data;
548
  /* keep the most restrictive rate; -1.0 means "not set" on either side */
549   if (rate != -1.0 &&
550       (other_action->comm.rate == -1.0 || rate < other_action->comm.rate))
551     other_action->comm.rate = rate;
552
553   other_action->comm.match_fun = match_fun;
554   other_action->comm.copy_data_fun = copy_data_fun;
555
556
557   /*if(already_received)//do the actual copy, because the first one after the comm didn't have all the info
558     SIMIX_comm_copy_data(other_action);*/
559
560
  /* the model checker drives the execution itself: do not start a surf comm */
561   if (MC_is_active()) {
562     other_action->state = SIMIX_RUNNING;
563     return other_action;
564   }
565
566   SIMIX_comm_start(other_action);
567   // }
568   return other_action;
569 }
570
571 smx_action_t SIMIX_pre_comm_iprobe(smx_simcall_t simcall, smx_rdv_t rdv,
572                                    int src, int tag,
573                                    int (*match_fun)(void *, void *, smx_action_t),
574                                    void *data){
575   return SIMIX_comm_iprobe(simcall->issuer, rdv, src, tag, match_fun, data);
576 }
577
578 smx_action_t SIMIX_comm_iprobe(smx_process_t dst_proc, smx_rdv_t rdv, int src,
579                               int tag, int (*match_fun)(void *, void *, smx_action_t), void *data)
580 {
581   XBT_DEBUG("iprobe from %p %p\n", rdv, rdv->comm_fifo);
582   smx_action_t this_action = SIMIX_comm_new(SIMIX_COMM_RECEIVE);
583
584   smx_action_t other_action=NULL;
585   if(rdv->permanent_receiver && xbt_fifo_size(rdv->done_comm_fifo)!=0){
586     //find a match in the already received fifo
587       XBT_DEBUG("first try in the perm recv mailbox \n");
588
589     other_action = SIMIX_fifo_probe_comm(rdv->done_comm_fifo, SIMIX_COMM_SEND, match_fun, data, this_action);
590   }
591  // }else{
592     if(!other_action){
593         XBT_DEBUG("second try in the other mailbox");
594         other_action = SIMIX_fifo_probe_comm(rdv->comm_fifo, SIMIX_COMM_SEND, match_fun, data, this_action);
595     }
596 //  }
597   if(other_action)other_action->comm.refcount--;
598
599   SIMIX_comm_destroy(this_action);
600   --smx_total_comms;
601   return other_action;
602 }
603
604 void SIMIX_pre_comm_wait(smx_simcall_t simcall, smx_action_t action, double timeout)
605 {
606   /* the simcall may be a wait, a send or a recv */
607   surf_action_t sleep;
608
609   /* Associate this simcall to the wait action */
610   XBT_DEBUG("SIMIX_pre_comm_wait, %p", action);
611
612   xbt_fifo_push(action->simcalls, simcall);
613   simcall->issuer->waiting_action = action;
614
615   if (MC_is_active()) {
616     int idx = SIMCALL_GET_MC_VALUE(simcall);
617     if (idx == 0) {
618       action->state = SIMIX_DONE;
619     } else {
620       /* If we reached this point, the wait simcall must have a timeout */
621       /* Otherwise it shouldn't be enabled and executed by the MC */
622       if (timeout == -1)
623         THROW_IMPOSSIBLE;
624
625       if (action->comm.src_proc == simcall->issuer)
626         action->state = SIMIX_SRC_TIMEOUT;
627       else
628         action->state = SIMIX_DST_TIMEOUT;
629     }
630
631     SIMIX_comm_finish(action);
632     return;
633   }
634
635   /* If the action has already finish perform the error handling, */
636   /* otherwise set up a waiting timeout on the right side         */
637   if (action->state != SIMIX_WAITING && action->state != SIMIX_RUNNING) {
638     SIMIX_comm_finish(action);
639   } else { /* if (timeout >= 0) { we need a surf sleep action even when there is no timeout, otherwise surf won't tell us when the host fails */
640     sleep = surf_workstation_sleep(simcall->issuer->smx_host, timeout);
641     surf_action_set_data(sleep, action);
642
643     if (simcall->issuer == action->comm.src_proc)
644       action->comm.src_timeout = sleep;
645     else
646       action->comm.dst_timeout = sleep;
647   }
648 }
649
650 void SIMIX_pre_comm_test(smx_simcall_t simcall, smx_action_t action)
651 {
652   if(MC_is_active()){
653     simcall_comm_test__set__result(simcall, action->comm.src_proc && action->comm.dst_proc);
654     if(simcall_comm_test__get__result(simcall)){
655       action->state = SIMIX_DONE;
656       xbt_fifo_push(action->simcalls, simcall);
657       SIMIX_comm_finish(action);
658     }else{
659       SIMIX_simcall_answer(simcall);
660     }
661     return;
662   }
663
664   simcall_comm_test__set__result(simcall, (action->state != SIMIX_WAITING && action->state != SIMIX_RUNNING));
665   if (simcall_comm_test__get__result(simcall)) {
666     xbt_fifo_push(action->simcalls, simcall);
667     SIMIX_comm_finish(action);
668   } else {
669     SIMIX_simcall_answer(simcall);
670   }
671 }
672
673 void SIMIX_pre_comm_testany(smx_simcall_t simcall, xbt_dynar_t actions)
674 {
675   unsigned int cursor;
676   smx_action_t action;
677   simcall_comm_testany__set__result(simcall, -1);
678
679   if (MC_is_active()){
680     int idx = SIMCALL_GET_MC_VALUE(simcall);
681     if(idx == -1){
682       SIMIX_simcall_answer(simcall);
683     }else{
684       action = xbt_dynar_get_as(actions, idx, smx_action_t);
685       simcall_comm_testany__set__result(simcall, idx);
686       xbt_fifo_push(action->simcalls, simcall);
687       action->state = SIMIX_DONE;
688       SIMIX_comm_finish(action);
689     }
690     return;
691   }
692
693   xbt_dynar_foreach(simcall_comm_testany__get__comms(simcall), cursor,action) {
694     if (action->state != SIMIX_WAITING && action->state != SIMIX_RUNNING) {
695       simcall_comm_testany__set__result(simcall, cursor);
696       xbt_fifo_push(action->simcalls, simcall);
697       SIMIX_comm_finish(action);
698       return;
699     }
700   }
701   SIMIX_simcall_answer(simcall);
702 }
703
704 void SIMIX_pre_comm_waitany(smx_simcall_t simcall, xbt_dynar_t actions)
705 {
706   smx_action_t action;
707   unsigned int cursor = 0;
708
709   if (MC_is_active()){
710     int idx = SIMCALL_GET_MC_VALUE(simcall);
711     action = xbt_dynar_get_as(actions, idx, smx_action_t);
712     xbt_fifo_push(action->simcalls, simcall);
713     simcall_comm_waitany__set__result(simcall, idx);
714     action->state = SIMIX_DONE;
715     SIMIX_comm_finish(action);
716     return;
717   }
718
719   xbt_dynar_foreach(actions, cursor, action){
720     /* associate this simcall to the the action */
721     xbt_fifo_push(action->simcalls, simcall);
722
723     /* see if the action is already finished */
724     if (action->state != SIMIX_WAITING && action->state != SIMIX_RUNNING){
725       SIMIX_comm_finish(action);
726       break;
727     }
728   }
729 }
730
731 void SIMIX_waitany_remove_simcall_from_actions(smx_simcall_t simcall)
732 {
733   smx_action_t action;
734   unsigned int cursor = 0;
735   xbt_dynar_t actions = simcall_comm_waitany__get__comms(simcall);
736
737   xbt_dynar_foreach(actions, cursor, action) {
738     xbt_fifo_remove(action->simcalls, simcall);
739   }
740 }
741
742 /**
743  *  \brief Starts the simulation of a communication action.
744  *  \param action the communication action
745  */
746 static XBT_INLINE void SIMIX_comm_start(smx_action_t action)
747 {
748   /* If both the sender and the receiver are already there, start the communication */
749   if (action->state == SIMIX_READY) {
750
751     smx_host_t sender = action->comm.src_proc->smx_host;
752     smx_host_t receiver = action->comm.dst_proc->smx_host;
753
754     XBT_DEBUG("Starting communication %p from '%s' to '%s'", action,
755               SIMIX_host_get_name(sender), SIMIX_host_get_name(receiver));
756
757     action->comm.surf_comm = surf_workstation_model_communicate(surf_workstation_model,
758                                                                     sender, receiver,
759                                                                     action->comm.task_size, action->comm.rate);
760
761     surf_action_set_data(action->comm.surf_comm, action);
762
763     action->state = SIMIX_RUNNING;
764
765     /* If a link is failed, detect it immediately */
766     if (surf_action_get_state(action->comm.surf_comm) == SURF_ACTION_FAILED) {
767       XBT_DEBUG("Communication from '%s' to '%s' failed to start because of a link failure",
768                 SIMIX_host_get_name(sender), SIMIX_host_get_name(receiver));
769       action->state = SIMIX_LINK_FAILURE;
770       SIMIX_comm_destroy_internal_actions(action);
771     }
772
773     /* If any of the process is suspend, create the action but stop its execution,
774        it will be restarted when the sender process resume */
775     if (SIMIX_process_is_suspended(action->comm.src_proc) ||
776         SIMIX_process_is_suspended(action->comm.dst_proc)) {
777       /* FIXME: check what should happen with the action state */
778
779       if (SIMIX_process_is_suspended(action->comm.src_proc))
780         XBT_DEBUG("The communication is suspended on startup because src (%s:%s) were suspended since it initiated the communication",
781                   SIMIX_host_get_name(action->comm.src_proc->smx_host), action->comm.src_proc->name);
782       else
783         XBT_DEBUG("The communication is suspended on startup because dst (%s:%s) were suspended since it initiated the communication",
784                   SIMIX_host_get_name(action->comm.dst_proc->smx_host), action->comm.dst_proc->name);
785
786       surf_action_suspend(action->comm.surf_comm);
787
788     }
789   }
790 }
791
792 /**
793  * \brief Answers the SIMIX simcalls associated to a communication action.
794  * \param action a finished communication action
795  */
796 void SIMIX_comm_finish(smx_action_t action)
797 {
798   unsigned int destroy_count = 0;
799   smx_simcall_t simcall;
800
801
802   while ((simcall = xbt_fifo_shift(action->simcalls))) {
803
804     /* If a waitany simcall is waiting for this action to finish, then remove
805        it from the other actions in the waitany list. Afterwards, get the
806        position of the actual action in the waitany dynar and
807        return it as the result of the simcall */
808     if (simcall->call == SIMCALL_COMM_WAITANY) {
809       SIMIX_waitany_remove_simcall_from_actions(simcall);
810       if (!MC_is_active())
811         simcall_comm_waitany__set__result(simcall, xbt_dynar_search(simcall_comm_waitany__get__comms(simcall), &action));
812     }
813
814     /* If the action is still in a rendez-vous point then remove from it */
815     if (action->comm.rdv)
816       SIMIX_rdv_remove(action->comm.rdv, action);
817
818     XBT_DEBUG("SIMIX_comm_finish: action state = %d", (int)action->state);
819
820     /* Check out for errors */
821     switch (action->state) {
822
823     case SIMIX_DONE:
824       XBT_DEBUG("Communication %p complete!", action);
825       SIMIX_comm_copy_data(action);
826       break;
827
828     case SIMIX_SRC_TIMEOUT:
829       SMX_EXCEPTION(simcall->issuer, timeout_error, 0,
830                     "Communication timeouted because of sender");
831       break;
832
833     case SIMIX_DST_TIMEOUT:
834       SMX_EXCEPTION(simcall->issuer, timeout_error, 0,
835                     "Communication timeouted because of receiver");
836       break;
837
838     case SIMIX_SRC_HOST_FAILURE:
839       if (simcall->issuer == action->comm.src_proc)
840         simcall->issuer->context->iwannadie = 1;
841 //          SMX_EXCEPTION(simcall->issuer, host_error, 0, "Host failed");
842       else
843         SMX_EXCEPTION(simcall->issuer, network_error, 0, "Remote peer failed");
844       break;
845
846     case SIMIX_DST_HOST_FAILURE:
847       if (simcall->issuer == action->comm.dst_proc)
848         simcall->issuer->context->iwannadie = 1;
849 //          SMX_EXCEPTION(simcall->issuer, host_error, 0, "Host failed");
850       else
851         SMX_EXCEPTION(simcall->issuer, network_error, 0, "Remote peer failed");
852       break;
853
854     case SIMIX_LINK_FAILURE:
855       XBT_DEBUG("Link failure in action %p between '%s' and '%s': posting an exception to the issuer: %s (%p) detached:%d",
856                 action,
857                 action->comm.src_proc ? sg_host_name(action->comm.src_proc->smx_host) : NULL,
858                 action->comm.dst_proc ? sg_host_name(action->comm.dst_proc->smx_host) : NULL,
859                 simcall->issuer->name, simcall->issuer, action->comm.detached);
860       if (action->comm.src_proc == simcall->issuer) {
861         XBT_DEBUG("I'm source");
862       } else if (action->comm.dst_proc == simcall->issuer) {
863         XBT_DEBUG("I'm dest");
864       } else {
865         XBT_DEBUG("I'm neither source nor dest");
866       }
867       SMX_EXCEPTION(simcall->issuer, network_error, 0, "Link failure");
868       break;
869
870     case SIMIX_CANCELED:
871       if (simcall->issuer == action->comm.dst_proc)
872         SMX_EXCEPTION(simcall->issuer, cancel_error, 0,
873                       "Communication canceled by the sender");
874       else
875         SMX_EXCEPTION(simcall->issuer, cancel_error, 0,
876                       "Communication canceled by the receiver");
877       break;
878
879     default:
880       xbt_die("Unexpected action state in SIMIX_comm_finish: %d", (int)action->state);
881     }
882
883     /* if there is an exception during a waitany or a testany, indicate the position of the failed communication */
884     if (simcall->issuer->doexception) {
885       if (simcall->call == SIMCALL_COMM_WAITANY) {
886         simcall->issuer->running_ctx->exception.value = xbt_dynar_search(simcall_comm_waitany__get__comms(simcall), &action);
887       }
888       else if (simcall->call == SIMCALL_COMM_TESTANY) {
889         simcall->issuer->running_ctx->exception.value = xbt_dynar_search(simcall_comm_testany__get__comms(simcall), &action);
890       }
891     }
892
893     if (surf_resource_get_state(surf_workstation_resource_priv(simcall->issuer->smx_host)) != SURF_RESOURCE_ON) {
894       simcall->issuer->context->iwannadie = 1;
895     }
896
897     simcall->issuer->waiting_action = NULL;
898     xbt_fifo_remove(simcall->issuer->comms, action);
899     if(action->comm.detached){
900       if(simcall->issuer == action->comm.src_proc){
901         if(action->comm.dst_proc)
902           xbt_fifo_remove(action->comm.dst_proc->comms, action);
903       }
904       if(simcall->issuer == action->comm.dst_proc){
905         if(action->comm.src_proc)
906           xbt_fifo_remove(action->comm.src_proc->comms, action);
907       }
908     }
909     SIMIX_simcall_answer(simcall);
910     destroy_count++;
911   }
912
913   while (destroy_count-- > 0)
914     SIMIX_comm_destroy(action);
915 }
916
917 /**
918  * \brief This function is called when a Surf communication action is finished.
919  * \param action the corresponding Simix communication
920  */
921 void SIMIX_post_comm(smx_action_t action)
922 {
923   /* Update action state */
924   if (action->comm.src_timeout &&
925       surf_action_get_state(action->comm.src_timeout) == SURF_ACTION_DONE)
926     action->state = SIMIX_SRC_TIMEOUT;
927   else if (action->comm.dst_timeout &&
928           surf_action_get_state(action->comm.dst_timeout) == SURF_ACTION_DONE)
929     action->state = SIMIX_DST_TIMEOUT;
930   else if (action->comm.src_timeout &&
931           surf_action_get_state(action->comm.src_timeout) == SURF_ACTION_FAILED)
932     action->state = SIMIX_SRC_HOST_FAILURE;
933   else if (action->comm.dst_timeout &&
934       surf_action_get_state(action->comm.dst_timeout) == SURF_ACTION_FAILED)
935     action->state = SIMIX_DST_HOST_FAILURE;
936   else if (action->comm.surf_comm &&
937           surf_action_get_state(action->comm.surf_comm) == SURF_ACTION_FAILED) {
938     XBT_DEBUG("Puta madre. Surf says that the link broke");
939     action->state = SIMIX_LINK_FAILURE;
940   } else
941     action->state = SIMIX_DONE;
942
943   XBT_DEBUG("SIMIX_post_comm: comm %p, state %d, src_proc %p, dst_proc %p, detached: %d",
944             action, (int)action->state, action->comm.src_proc, action->comm.dst_proc, action->comm.detached);
945
946   /* destroy the surf actions associated with the Simix communication */
947   SIMIX_comm_destroy_internal_actions(action);
948
949   /* remove the communication action from the list of pending communications
950    * of both processes (if they still exist) */
951   if (action->comm.src_proc) {
952     xbt_fifo_remove(action->comm.src_proc->comms, action);
953   }
954   if (action->comm.dst_proc) {
955     xbt_fifo_remove(action->comm.dst_proc->comms, action);
956   }
957
958   /* if there are simcalls associated with the action, then answer them */
959   if (xbt_fifo_size(action->simcalls)) {
960     SIMIX_comm_finish(action);
961   }
962 }
963
964 void SIMIX_pre_comm_cancel(smx_simcall_t simcall, smx_action_t action){
965   SIMIX_comm_cancel(action);
966 }
967 void SIMIX_comm_cancel(smx_action_t action)
968 {
969   /* if the action is a waiting state means that it is still in a rdv */
970   /* so remove from it and delete it */
971   if (action->state == SIMIX_WAITING) {
972     SIMIX_rdv_remove(action->comm.rdv, action);
973     action->state = SIMIX_CANCELED;
974   }
975   else if (!MC_is_active() /* when running the MC there are no surf actions */
976            && (action->state == SIMIX_READY || action->state == SIMIX_RUNNING)) {
977
978     surf_action_cancel(action->comm.surf_comm);
979   }
980 }
981
982 void SIMIX_comm_suspend(smx_action_t action)
983 {
984   /*FIXME: shall we suspend also the timeout actions? */
985   if (action->comm.surf_comm)
986     surf_action_suspend(action->comm.surf_comm);
987   /* in the other case, the action will be suspended on creation, in SIMIX_comm_start() */
988 }
989
990 void SIMIX_comm_resume(smx_action_t action)
991 {
992   /*FIXME: check what happen with the timeouts */
993   if (action->comm.surf_comm)
994     surf_action_resume(action->comm.surf_comm);
995   /* in the other case, the action were not really suspended yet, see SIMIX_comm_suspend() and SIMIX_comm_start() */
996 }
997
998
999 /************* Action Getters **************/
1000
1001 double SIMIX_pre_comm_get_remains(smx_simcall_t simcall, smx_action_t action){
1002   return SIMIX_comm_get_remains(action);
1003 }
1004 /**
1005  *  \brief get the amount remaining from the communication
1006  *  \param action The communication
1007  */
1008 double SIMIX_comm_get_remains(smx_action_t action)
1009 {
1010   double remains;
1011
1012   if(!action){
1013     return 0;
1014   }
1015
1016   switch (action->state) {
1017
1018   case SIMIX_RUNNING:
1019     remains = surf_action_get_remains(action->comm.surf_comm);
1020     break;
1021
1022   case SIMIX_WAITING:
1023   case SIMIX_READY:
1024     remains = 0; /*FIXME: check what should be returned */
1025     break;
1026
1027   default:
1028     remains = 0; /*FIXME: is this correct? */
1029     break;
1030   }
1031   return remains;
1032 }
1033
1034 e_smx_state_t SIMIX_pre_comm_get_state(smx_simcall_t simcall, smx_action_t action){
1035   return SIMIX_comm_get_state(action);
1036 }
1037 e_smx_state_t SIMIX_comm_get_state(smx_action_t action)
1038 {
1039   return action->state;
1040 }
1041
1042 void* SIMIX_pre_comm_get_src_data(smx_simcall_t simcall, smx_action_t action){
1043   return SIMIX_comm_get_src_data(action);
1044 }
1045 /**
1046  *  \brief Return the user data associated to the sender of the communication
1047  *  \param action The communication
1048  *  \return the user data
1049  */
1050 void* SIMIX_comm_get_src_data(smx_action_t action)
1051 {
1052   return action->comm.src_data;
1053 }
1054
1055 void* SIMIX_pre_comm_get_dst_data(smx_simcall_t simcall, smx_action_t action){
1056   return SIMIX_comm_get_dst_data(action);
1057 }
1058 /**
1059  *  \brief Return the user data associated to the receiver of the communication
1060  *  \param action The communication
1061  *  \return the user data
1062  */
1063 void* SIMIX_comm_get_dst_data(smx_action_t action)
1064 {
1065   return action->comm.dst_data;
1066 }
1067
1068 smx_process_t SIMIX_pre_comm_get_src_proc(smx_simcall_t simcall, smx_action_t action){
1069   return SIMIX_comm_get_src_proc(action);
1070 }
1071 smx_process_t SIMIX_comm_get_src_proc(smx_action_t action)
1072 {
1073   return action->comm.src_proc;
1074 }
1075
1076 smx_process_t SIMIX_pre_comm_get_dst_proc(smx_simcall_t simcall, smx_action_t action){
1077   return SIMIX_comm_get_dst_proc(action);
1078 }
1079 smx_process_t SIMIX_comm_get_dst_proc(smx_action_t action)
1080 {
1081   return action->comm.dst_proc;
1082 }
1083
#ifdef HAVE_LATENCY_BOUND_TRACKING
/* Simcall handler: forwards to SIMIX_comm_is_latency_bounded(). */
int SIMIX_pre_comm_is_latency_bounded(smx_simcall_t simcall, smx_action_t action)
{
  (void) simcall;
  return SIMIX_comm_is_latency_bounded(action);
}

/**
 *  \brief Tells whether the communication is latency bounded
 *  \param action The communication (NULL yields 0)
 *  \return the action's latency_limited flag, refreshed from its Surf action
 *          when one exists, otherwise the previously cached value
 */
int SIMIX_comm_is_latency_bounded(smx_action_t action)
{
  if (action == NULL)
    return 0;

  /* No Surf action: report the cached flag as is */
  if (action->comm.surf_comm == NULL)
    return action->latency_limited;

  XBT_DEBUG("Getting latency limited for surf_action (%p)", action->comm.surf_comm);
  action->latency_limited = surf_network_action_get_latency_limited(action->comm.surf_comm);
  XBT_DEBUG("Action limited is %d", action->latency_limited);
  return action->latency_limited;
}
#endif
1107
1108 /******************************************************************************/
1109 /*                    SIMIX_comm_copy_data callbacks                       */
1110 /******************************************************************************/
1111 static void (*SIMIX_comm_copy_data_callback) (smx_action_t, void*, size_t) =
1112   &SIMIX_comm_copy_pointer_callback;
1113
1114 void
1115 SIMIX_comm_set_copy_data_callback(void (*callback) (smx_action_t, void*, size_t))
1116 {
1117   SIMIX_comm_copy_data_callback = callback;
1118 }
1119
1120 void SIMIX_comm_copy_pointer_callback(smx_action_t comm, void* buff, size_t buff_size)
1121 {
1122   xbt_assert((buff_size == sizeof(void *)),
1123              "Cannot copy %zu bytes: must be sizeof(void*)", buff_size);
1124   *(void **) (comm->comm.dst_buff) = buff;
1125 }
1126
1127 void SIMIX_comm_copy_buffer_callback(smx_action_t comm, void* buff, size_t buff_size)
1128 {
1129   XBT_DEBUG("Copy the data over");
1130   memcpy(comm->comm.dst_buff, buff, buff_size);
1131   if (comm->comm.detached) { // if this is a detached send, the source buffer was duplicated by SMPI sender to make the original buffer available to the application ASAP
1132     xbt_free(buff);
1133     comm->comm.src_buff = NULL;
1134   }
1135 }
1136
1137
1138 /**
1139  *  \brief Copy the communication data from the sender's buffer to the receiver's one
1140  *  \param comm The communication
1141  */
1142 void SIMIX_comm_copy_data(smx_action_t comm)
1143 {
1144   size_t buff_size = comm->comm.src_buff_size;
1145   /* If there is no data to be copy then return */
1146   if (!comm->comm.src_buff || !comm->comm.dst_buff || comm->comm.copied)
1147     return;
1148
1149   XBT_DEBUG("Copying comm %p data from %s (%p) -> %s (%p) (%zu bytes)",
1150             comm,
1151             comm->comm.src_proc ? sg_host_name(comm->comm.src_proc->smx_host) : "a finished process",
1152             comm->comm.src_buff,
1153             comm->comm.dst_proc ? sg_host_name(comm->comm.dst_proc->smx_host) : "a finished process",
1154             comm->comm.dst_buff, buff_size);
1155
1156   /* Copy at most dst_buff_size bytes of the message to receiver's buffer */
1157   if (comm->comm.dst_buff_size)
1158     buff_size = MIN(buff_size, *(comm->comm.dst_buff_size));
1159
1160   /* Update the receiver's buffer size to the copied amount */
1161   if (comm->comm.dst_buff_size)
1162     *comm->comm.dst_buff_size = buff_size;
1163
1164   if (buff_size > 0){
1165       if(comm->comm.copy_data_fun)
1166         comm->comm.copy_data_fun (comm, comm->comm.src_buff, buff_size);
1167       else
1168         SIMIX_comm_copy_data_callback (comm, comm->comm.src_buff, buff_size);
1169   }
1170
1171
1172   /* Set the copied flag so we copy data only once */
1173   /* (this function might be called from both communication ends) */
1174   comm->comm.copied = 1;
1175 }