Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
Merge branch 'mc'
[simgrid.git] / src / simix / smx_network.c
1 /* Copyright (c) 2009-2014. The SimGrid Team.
2  * All rights reserved.                                                     */
3
4 /* This program is free software; you can redistribute it and/or modify it
5  * under the terms of the license (GNU LGPL) which comes with this package. */
6
7 #include "smx_private.h"
8 #include "xbt/log.h"
9 #include "mc/mc.h"
10 #include "xbt/dict.h"
11 #include "smpi/private.h"
12
13
14 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(simix_network, simix,
15                                 "Logging specific to SIMIX (network)");
16
17 static xbt_dict_t rdv_points = NULL;
18 XBT_EXPORT_NO_IMPORT(unsigned long int) smx_total_comms = 0;
19
20 static void SIMIX_waitany_remove_simcall_from_actions(smx_simcall_t simcall);
21 static void SIMIX_comm_copy_data(smx_action_t comm);
22 static smx_action_t SIMIX_comm_new(e_smx_comm_type_t type);
23 static XBT_INLINE void SIMIX_rdv_push(smx_rdv_t rdv, smx_action_t comm);
24 static smx_action_t SIMIX_fifo_probe_comm(xbt_fifo_t fifo, e_smx_comm_type_t type,
25                                         int (*match_fun)(void *, void *,smx_action_t),
26                                         void *user_data, smx_action_t my_action);
27 static smx_action_t SIMIX_fifo_get_comm(xbt_fifo_t fifo, e_smx_comm_type_t type,
28                                         int (*match_fun)(void *, void *,smx_action_t),
29                                         void *user_data, smx_action_t my_action);
30 static void SIMIX_rdv_free(void *data);
31 static void SIMIX_comm_start(smx_action_t action);
32
33 void SIMIX_network_init(void)
34 {
35   rdv_points = xbt_dict_new_homogeneous(SIMIX_rdv_free);
36 }
37
38 void SIMIX_network_exit(void)
39 {
40   xbt_dict_free(&rdv_points);
41 }
42
43 /******************************************************************************/
44 /*                           Rendez-Vous Points                               */
45 /******************************************************************************/
46
47 smx_rdv_t SIMIX_pre_rdv_create(smx_simcall_t simcall, const char *name){
48   return SIMIX_rdv_create(name);
49 }
50 smx_rdv_t SIMIX_rdv_create(const char *name)
51 {
52   /* two processes may have pushed the same rdv_create simcall at the same time */
53   smx_rdv_t rdv = name ? xbt_dict_get_or_null(rdv_points, name) : NULL;
54
55   if (!rdv) {
56     rdv = xbt_new0(s_smx_rvpoint_t, 1);
57     rdv->name = name ? xbt_strdup(name) : NULL;
58     rdv->comm_fifo = xbt_fifo_new();
59     rdv->done_comm_fifo = xbt_fifo_new();
60     rdv->permanent_receiver=NULL;
61
62     XBT_DEBUG("Creating a mailbox at %p with name %s\n", rdv, name);
63
64     if (rdv->name)
65       xbt_dict_set(rdv_points, rdv->name, rdv, NULL);
66   }
67   return rdv;
68 }
69
70 void SIMIX_pre_rdv_destroy(smx_simcall_t simcall, smx_rdv_t rdv){
71   return SIMIX_rdv_destroy(rdv);
72 }
73 void SIMIX_rdv_destroy(smx_rdv_t rdv)
74 {
75   if (rdv->name)
76     xbt_dict_remove(rdv_points, rdv->name);
77 }
78
79 void SIMIX_rdv_free(void *data)
80 {
81   XBT_DEBUG("rdv free %p", data);
82   smx_rdv_t rdv = (smx_rdv_t) data;
83   xbt_free(rdv->name);
84   xbt_fifo_free(rdv->comm_fifo);
85   xbt_fifo_free(rdv->done_comm_fifo);
86
87   xbt_free(rdv);  
88 }
89
90 xbt_dict_t SIMIX_get_rdv_points()
91 {
92   return rdv_points;
93 }
94
95 smx_rdv_t SIMIX_pre_rdv_get_by_name(smx_simcall_t simcall, const char *name){
96   return SIMIX_rdv_get_by_name(name);
97 }
98 smx_rdv_t SIMIX_rdv_get_by_name(const char *name)
99 {
100   return xbt_dict_get_or_null(rdv_points, name);
101 }
102
103 int SIMIX_pre_rdv_comm_count_by_host(smx_simcall_t simcall, smx_rdv_t rdv, smx_host_t host){
104   return SIMIX_rdv_comm_count_by_host(rdv, host);
105 }
106 int SIMIX_rdv_comm_count_by_host(smx_rdv_t rdv, smx_host_t host)
107 {
108   smx_action_t comm = NULL;
109   xbt_fifo_item_t item = NULL;
110   int count = 0;
111
112   xbt_fifo_foreach(rdv->comm_fifo, item, comm, smx_action_t) {
113     if (comm->comm.src_proc->smx_host == host)
114       count++;
115   }
116
117   return count;
118 }
119
120 smx_action_t SIMIX_pre_rdv_get_head(smx_simcall_t simcall, smx_rdv_t rdv){
121   return SIMIX_rdv_get_head(rdv);
122 }
123 smx_action_t SIMIX_rdv_get_head(smx_rdv_t rdv)
124 {
125   return xbt_fifo_get_item_content(xbt_fifo_get_first_item(rdv->comm_fifo));
126 }
127
128 smx_process_t SIMIX_pre_rdv_get_receiver(smx_simcall_t simcall, smx_rdv_t rdv){
129   return SIMIX_rdv_get_receiver(rdv);
130 }
131 /**
132  *  \brief get the receiver (process associated to the mailbox)
133  *  \param rdv The rendez-vous point
134  *  \return process The receiving process (NULL if not set)
135  */
136 smx_process_t SIMIX_rdv_get_receiver(smx_rdv_t rdv)
137 {
138   return rdv->permanent_receiver;
139 }
140
141 void SIMIX_pre_rdv_set_receiver(smx_simcall_t simcall, smx_rdv_t rdv,
142                             smx_process_t process){
143   SIMIX_rdv_set_receiver(rdv, process);
144 }
145 /**
146  *  \brief set the receiver of the rendez vous point to allow eager sends
147  *  \param rdv The rendez-vous point
148  *  \param process The receiving process
149  */
150 void SIMIX_rdv_set_receiver(smx_rdv_t rdv, smx_process_t process)
151 {
152   rdv->permanent_receiver=process;
153 }
154
155 /**
156  *  \brief Pushes a communication action into a rendez-vous point
157  *  \param rdv The rendez-vous point
158  *  \param comm The communication action
159  */
160 static XBT_INLINE void SIMIX_rdv_push(smx_rdv_t rdv, smx_action_t comm)
161 {
162   xbt_fifo_push(rdv->comm_fifo, comm);
163   comm->comm.rdv = rdv;
164 }
165
166 /**
167  *  \brief Removes a communication action from a rendez-vous point
168  *  \param rdv The rendez-vous point
169  *  \param comm The communication action
170  */
171 XBT_INLINE void SIMIX_rdv_remove(smx_rdv_t rdv, smx_action_t comm)
172 {
173   xbt_fifo_remove(rdv->comm_fifo, comm);
174   comm->comm.rdv = NULL;
175 }
176
177 /**
178  *  \brief Checks if there is a communication action queued in a fifo matching our needs
179  *  \param type The type of communication we are looking for (comm_send, comm_recv)
180  *  \return The communication action if found, NULL otherwise
181  */
182 smx_action_t SIMIX_fifo_get_comm(xbt_fifo_t fifo, e_smx_comm_type_t type,
183                                  int (*match_fun)(void *, void *,smx_action_t),
184                                  void *this_user_data, smx_action_t my_action)
185 {
186   smx_action_t action;
187   xbt_fifo_item_t item;
188   void* other_user_data = NULL;
189
190   xbt_fifo_foreach(fifo, item, action, smx_action_t) {
191     if (action->comm.type == SIMIX_COMM_SEND) {
192       other_user_data = action->comm.src_data;
193     } else if (action->comm.type == SIMIX_COMM_RECEIVE) {
194       other_user_data = action->comm.dst_data;
195     }
196     if (action->comm.type == type &&
197         (!match_fun              ||              match_fun(this_user_data,  other_user_data, action)) &&
198         (!action->comm.match_fun || action->comm.match_fun(other_user_data, this_user_data,  my_action))) {
199       XBT_DEBUG("Found a matching communication action %p", action);
200       xbt_fifo_remove_item(fifo, item);
201       xbt_fifo_free_item(item);
202       action->comm.refcount++;
203 #ifdef HAVE_MC
204       action->comm.rdv_cpy = action->comm.rdv;
205 #endif
206       action->comm.rdv = NULL;
207       return action;
208     }
209     XBT_DEBUG("Sorry, communication action %p does not match our needs:"
210               " its type is %d but we are looking for a comm of type %d (or maybe the filtering didn't match)",
211               action, (int)action->comm.type, (int)type);
212   }
213   XBT_DEBUG("No matching communication action found");
214   return NULL;
215 }
216
217
218 /**
219  *  \brief Checks if there is a communication action queued in a fifo matching our needs, but leave it there
220  *  \param type The type of communication we are looking for (comm_send, comm_recv)
221  *  \return The communication action if found, NULL otherwise
222  */
223 smx_action_t SIMIX_fifo_probe_comm(xbt_fifo_t fifo, e_smx_comm_type_t type,
224                                  int (*match_fun)(void *, void *,smx_action_t),
225                                  void *this_user_data, smx_action_t my_action)
226 {
227   smx_action_t action;
228   xbt_fifo_item_t item;
229   void* other_user_data = NULL;
230
231   xbt_fifo_foreach(fifo, item, action, smx_action_t) {
232     if (action->comm.type == SIMIX_COMM_SEND) {
233       other_user_data = action->comm.src_data;
234     } else if (action->comm.type == SIMIX_COMM_RECEIVE) {
235       other_user_data = action->comm.dst_data;
236     }
237     if (action->comm.type == type &&
238         (!match_fun              ||              match_fun(this_user_data,  other_user_data, action)) &&
239         (!action->comm.match_fun || action->comm.match_fun(other_user_data, this_user_data,  my_action))) {
240       XBT_DEBUG("Found a matching communication action %p", action);
241       action->comm.refcount++;
242
243       return action;
244     }
245     XBT_DEBUG("Sorry, communication action %p does not match our needs:"
246               " its type is %d but we are looking for a comm of type %d (or maybe the filtering didn't match)",
247               action, (int)action->comm.type, (int)type);
248   }
249   XBT_DEBUG("No matching communication action found");
250   return NULL;
251 }
252 /******************************************************************************/
253 /*                            Communication Actions                            */
254 /******************************************************************************/
255
256 /**
257  *  \brief Creates a new communicate action
258  *  \param type The direction of communication (comm_send, comm_recv)
259  *  \return The new communicate action
260  */
261 smx_action_t SIMIX_comm_new(e_smx_comm_type_t type)
262 {
263   smx_action_t act;
264
265   /* alloc structures */
266   act = xbt_mallocator_get(simix_global->action_mallocator);
267
268   act->type = SIMIX_ACTION_COMMUNICATE;
269   act->state = SIMIX_WAITING;
270
271   /* set communication */
272   act->comm.type = type;
273   act->comm.refcount = 1;
274   act->comm.src_data=NULL;
275   act->comm.dst_data=NULL;
276
277
278 #ifdef HAVE_LATENCY_BOUND_TRACKING
279   //initialize with unknown value
280   act->latency_limited = -1;
281 #endif
282
283 #ifdef HAVE_TRACING
284   act->category = NULL;
285 #endif
286
287   XBT_DEBUG("Create communicate action %p", act);
288   ++smx_total_comms;
289
290   return act;
291 }
292
293 /**
294  *  \brief Destroy a communicate action
295  *  \param action The communicate action to be destroyed
296  */
297 void SIMIX_comm_destroy(smx_action_t action)
298 {
299   XBT_DEBUG("Destroy action %p (refcount: %d), state: %d",
300             action, action->comm.refcount, (int)action->state);
301
302   if (action->comm.refcount <= 0) {
303     xbt_backtrace_display_current();
304     xbt_die("The refcount of comm %p is already 0 before decreasing it. "
305             "That's a bug! If you didn't test and/or wait the same communication twice in your code, then the bug is SimGrid's...", action);
306   }
307   action->comm.refcount--;
308   if (action->comm.refcount > 0)
309       return;
310   XBT_DEBUG("Really free communication %p; refcount is now %d", action,
311             action->comm.refcount);
312
313 #ifdef HAVE_LATENCY_BOUND_TRACKING
314   action->latency_limited = SIMIX_comm_is_latency_bounded( action ) ;
315 #endif
316
317   xbt_free(action->name);
318   SIMIX_comm_destroy_internal_actions(action);
319
320   if (action->comm.detached && action->state != SIMIX_DONE) {
321     /* the communication has failed and was detached:
322      * we have to free the buffer */
323     if (action->comm.clean_fun) {
324       action->comm.clean_fun(action->comm.src_buff);
325     }
326     action->comm.src_buff = NULL;
327   }
328
329   if(action->comm.rdv)
330     SIMIX_rdv_remove(action->comm.rdv, action);
331
332   xbt_mallocator_release(simix_global->action_mallocator, action);
333 }
334
335 void SIMIX_comm_destroy_internal_actions(smx_action_t action)
336 {
337   if (action->comm.surf_comm){
338 #ifdef HAVE_LATENCY_BOUND_TRACKING
339     action->latency_limited = SIMIX_comm_is_latency_bounded(action);
340 #endif
341     surf_action_unref(action->comm.surf_comm);
342     action->comm.surf_comm = NULL;
343   }
344
345   if (action->comm.src_timeout){
346     surf_action_unref(action->comm.src_timeout);
347     action->comm.src_timeout = NULL;
348   }
349
350   if (action->comm.dst_timeout){
351     surf_action_unref(action->comm.dst_timeout);
352     action->comm.dst_timeout = NULL;
353   }
354 }
355
356 void SIMIX_pre_comm_send(smx_simcall_t simcall, smx_process_t src, smx_rdv_t rdv,
357                                   double task_size, double rate,
358                                   void *src_buff, size_t src_buff_size,
359                                   int (*match_fun)(void *, void *,smx_action_t),
360                                   void (*copy_data_fun)(smx_action_t, void*, size_t),
361                                   void *data, double timeout){
362   smx_action_t comm = SIMIX_comm_isend(src, rdv, task_size, rate,
363                                        src_buff, src_buff_size, match_fun, NULL, copy_data_fun,
364                                        data, 0);
365   SIMCALL_SET_MC_VALUE(simcall, 0);
366   SIMIX_pre_comm_wait(simcall, comm, timeout);
367 }
368 smx_action_t SIMIX_pre_comm_isend(smx_simcall_t simcall, smx_process_t src, smx_rdv_t rdv,
369                                   double task_size, double rate,
370                                   void *src_buff, size_t src_buff_size,
371                                   int (*match_fun)(void *, void *,smx_action_t),
372                                   void (*clean_fun)(void *), 
373                                   void (*copy_data_fun)(smx_action_t, void*, size_t),
374                                   void *data, int detached){
375   return SIMIX_comm_isend(src, rdv, task_size, rate, src_buff,
376                           src_buff_size, match_fun, clean_fun, copy_data_fun, data, detached);
377
378 }
379 smx_action_t SIMIX_comm_isend(smx_process_t src_proc, smx_rdv_t rdv,
380                               double task_size, double rate,
381                               void *src_buff, size_t src_buff_size,
382                               int (*match_fun)(void *, void *,smx_action_t),
383                               void (*clean_fun)(void *), // used to free the action in case of problem after a detached send
384                               void (*copy_data_fun)(smx_action_t, void*, size_t), // used to copy data if not default one
385                               void *data,
386                               int detached)
387 {
388   XBT_DEBUG("send from %p\n", rdv);
389
390   /* Prepare an action describing us, so that it gets passed to the user-provided filter of other side */
391   smx_action_t this_action = SIMIX_comm_new(SIMIX_COMM_SEND);
392
393   /* Look for communication action matching our needs. We also provide a description of
394    * ourself so that the other side also gets a chance of choosing if it wants to match with us.
395    *
396    * If it is not found then push our communication into the rendez-vous point */
397   smx_action_t other_action = SIMIX_fifo_get_comm(rdv->comm_fifo, SIMIX_COMM_RECEIVE, match_fun, data, this_action);
398
399   if (!other_action) {
400     other_action = this_action;
401
402     if (rdv->permanent_receiver!=NULL){
403       //this mailbox is for small messages, which have to be sent right now
404       other_action->state = SIMIX_READY;
405       other_action->comm.dst_proc=rdv->permanent_receiver;
406       other_action->comm.refcount++;
407       xbt_fifo_push(rdv->done_comm_fifo,other_action);
408       other_action->comm.rdv=rdv;
409       XBT_DEBUG("pushing a message into the permanent receive fifo %p, comm %p \n", rdv, &(other_action->comm));
410
411     }else{
412       SIMIX_rdv_push(rdv, this_action);
413     }
414   } else {
415     XBT_DEBUG("Receive already pushed\n");
416
417     SIMIX_comm_destroy(this_action);
418     --smx_total_comms; // this creation was a pure waste
419
420     other_action->state = SIMIX_READY;
421     other_action->comm.type = SIMIX_COMM_READY;
422
423   }
424   xbt_fifo_push(src_proc->comms, other_action);
425
426   /* if the communication action is detached then decrease the refcount
427    * by one, so it will be eliminated by the receiver's destroy call */
428   if (detached) {
429     other_action->comm.detached = 1;
430     other_action->comm.refcount--;
431     other_action->comm.clean_fun = clean_fun;
432   } else {
433     other_action->comm.clean_fun = NULL;
434   }
435
436   /* Setup the communication action */
437   other_action->comm.src_proc = src_proc;
438   other_action->comm.task_size = task_size;
439   other_action->comm.rate = rate;
440   other_action->comm.src_buff = src_buff;
441   other_action->comm.src_buff_size = src_buff_size;
442   other_action->comm.src_data = data;
443
444   other_action->comm.match_fun = match_fun;
445   other_action->comm.copy_data_fun = copy_data_fun;
446
447
448   if (MC_is_active()) {
449     other_action->state = SIMIX_RUNNING;
450     return (detached ? NULL : other_action);
451   }
452
453   SIMIX_comm_start(other_action);
454   return (detached ? NULL : other_action);
455 }
456
457 void SIMIX_pre_comm_recv(smx_simcall_t simcall, smx_rdv_t rdv,
458                          void *dst_buff, size_t *dst_buff_size,
459                          int (*match_fun)(void *, void *, smx_action_t),
460                          void (*copy_data_fun)(smx_action_t, void*, size_t),
461                          void *data, double timeout, double rate)
462 {
463   smx_action_t comm = SIMIX_comm_irecv(simcall->issuer, rdv, dst_buff,
464                                        dst_buff_size, match_fun, copy_data_fun, data, rate);
465   SIMCALL_SET_MC_VALUE(simcall, 0);
466   SIMIX_pre_comm_wait(simcall, comm, timeout);
467 }
468
469 smx_action_t SIMIX_pre_comm_irecv(smx_simcall_t simcall, smx_rdv_t rdv,
470                                   void *dst_buff, size_t *dst_buff_size,
471                                   int (*match_fun)(void *, void *, smx_action_t),
472                                   void (*copy_data_fun)(smx_action_t, void*, size_t),
473                                   void *data, double rate)
474 {
475   return SIMIX_comm_irecv(simcall->issuer, rdv, dst_buff, dst_buff_size,
476                           match_fun, copy_data_fun, data, rate);
477 }
478
479 smx_action_t SIMIX_comm_irecv(smx_process_t dst_proc, smx_rdv_t rdv,
480                               void *dst_buff, size_t *dst_buff_size,
481                               int (*match_fun)(void *, void *, smx_action_t),
482                               void (*copy_data_fun)(smx_action_t, void*, size_t), // used to copy data if not default one
483                               void *data, double rate)
484 {
485   XBT_DEBUG("recv from %p %p\n", rdv, rdv->comm_fifo);
486   smx_action_t this_action = SIMIX_comm_new(SIMIX_COMM_RECEIVE);
487
488   smx_action_t other_action;
489   //communication already done, get it inside the fifo of completed comms
490   //permanent receive v1
491   //int already_received=0;
492   if(rdv->permanent_receiver && xbt_fifo_size(rdv->done_comm_fifo)!=0){
493
494     XBT_DEBUG("We have a comm that has probably already been received, trying to match it, to skip the communication\n");
495     //find a match in the already received fifo
496     other_action = SIMIX_fifo_get_comm(rdv->done_comm_fifo, SIMIX_COMM_SEND, match_fun, data, this_action);
497     //if not found, assume the receiver came first, register it to the mailbox in the classical way
498     if (!other_action)  {
499       XBT_DEBUG("We have messages in the permanent receive list, but not the one we are looking for, pushing request into fifo\n");
500       other_action = this_action;
501       SIMIX_rdv_push(rdv, this_action);
502     }else{
503       if(other_action->comm.surf_comm &&        SIMIX_comm_get_remains(other_action)==0.0)
504       {
505         XBT_DEBUG("comm %p has been already sent, and is finished, destroy it\n",&(other_action->comm));
506         other_action->state = SIMIX_DONE;
507         other_action->comm.type = SIMIX_COMM_DONE;
508         other_action->comm.rdv = NULL;
509         //SIMIX_comm_destroy(this_action);
510         //--smx_total_comms; // this creation was a pure waste
511         //already_received=1;
512         //other_action->comm.refcount--;
513       }/*else{
514          XBT_DEBUG("Not yet finished, we have to wait %d\n", xbt_fifo_size(rdv->comm_fifo));
515          }*/
516       other_action->comm.refcount--;
517       SIMIX_comm_destroy(this_action);
518       --smx_total_comms; // this creation was a pure waste
519     }
520   }else{
521     /* Prepare an action describing us, so that it gets passed to the user-provided filter of other side */
522
523     /* Look for communication action matching our needs. We also provide a description of
524      * ourself so that the other side also gets a chance of choosing if it wants to match with us.
525      *
526      * If it is not found then push our communication into the rendez-vous point */
527     other_action = SIMIX_fifo_get_comm(rdv->comm_fifo, SIMIX_COMM_SEND, match_fun, data, this_action);
528
529     if (!other_action) {
530       XBT_DEBUG("Receive pushed first %d\n", xbt_fifo_size(rdv->comm_fifo));
531       other_action = this_action;
532       SIMIX_rdv_push(rdv, this_action);
533     } else {
534       SIMIX_comm_destroy(this_action);
535       --smx_total_comms; // this creation was a pure waste
536       other_action->state = SIMIX_READY;
537       other_action->comm.type = SIMIX_COMM_READY;
538       //other_action->comm.refcount--;
539     }
540     xbt_fifo_push(dst_proc->comms, other_action);
541   }
542
543   /* Setup communication action */
544   other_action->comm.dst_proc = dst_proc;
545   other_action->comm.dst_buff = dst_buff;
546   other_action->comm.dst_buff_size = dst_buff_size;
547   other_action->comm.dst_data = data;
548
549   if (rate != -1.0 &&
550       (other_action->comm.rate == -1.0 || rate < other_action->comm.rate))
551     other_action->comm.rate = rate;
552
553   other_action->comm.match_fun = match_fun;
554   other_action->comm.copy_data_fun = copy_data_fun;
555
556
557   /*if(already_received)//do the actual copy, because the first one after the comm didn't have all the info
558     SIMIX_comm_copy_data(other_action);*/
559
560
561   if (MC_is_active()) {
562     other_action->state = SIMIX_RUNNING;
563     return other_action;
564   }
565
566   SIMIX_comm_start(other_action);
567   // }
568   return other_action;
569 }
570
571 smx_action_t SIMIX_pre_comm_iprobe(smx_simcall_t simcall, smx_rdv_t rdv,
572                                    int src, int tag,
573                                    int (*match_fun)(void *, void *, smx_action_t),
574                                    void *data){
575   return SIMIX_comm_iprobe(simcall->issuer, rdv, src, tag, match_fun, data);
576 }
577
578 smx_action_t SIMIX_comm_iprobe(smx_process_t dst_proc, smx_rdv_t rdv, int src,
579                               int tag, int (*match_fun)(void *, void *, smx_action_t), void *data)
580 {
581   XBT_DEBUG("iprobe from %p %p\n", rdv, rdv->comm_fifo);
582   smx_action_t this_action = SIMIX_comm_new(SIMIX_COMM_RECEIVE);
583
584   smx_action_t other_action=NULL;
585   if(rdv->permanent_receiver && xbt_fifo_size(rdv->done_comm_fifo)!=0){
586     //find a match in the already received fifo
587       XBT_DEBUG("first try in the perm recv mailbox \n");
588
589     other_action = SIMIX_fifo_probe_comm(rdv->done_comm_fifo, SIMIX_COMM_SEND, match_fun, data, this_action);
590   }
591  // }else{
592     if(!other_action){
593         XBT_DEBUG("second try in the other mailbox");
594         other_action = SIMIX_fifo_probe_comm(rdv->comm_fifo, SIMIX_COMM_SEND, match_fun, data, this_action);
595     }
596 //  }
597   if(other_action)other_action->comm.refcount--;
598
599   SIMIX_comm_destroy(this_action);
600   --smx_total_comms;
601   return other_action;
602 }
603
604 void SIMIX_pre_comm_wait(smx_simcall_t simcall, smx_action_t action, double timeout)
605 {
606   /* the simcall may be a wait, a send or a recv */
607   surf_action_t sleep;
608
609   /* Associate this simcall to the wait action */
610   XBT_DEBUG("SIMIX_pre_comm_wait, %p", action);
611
612   xbt_fifo_push(action->simcalls, simcall);
613   simcall->issuer->waiting_action = action;
614
615   if (MC_is_active()) {
616     int idx = SIMCALL_GET_MC_VALUE(simcall);
617     if (idx == 0) {
618       action->state = SIMIX_DONE;
619     } else {
620       /* If we reached this point, the wait simcall must have a timeout */
621       /* Otherwise it shouldn't be enabled and executed by the MC */
622       if (timeout == -1)
623         THROW_IMPOSSIBLE;
624
625       if (action->comm.src_proc == simcall->issuer)
626         action->state = SIMIX_SRC_TIMEOUT;
627       else
628         action->state = SIMIX_DST_TIMEOUT;
629     }
630
631     SIMIX_comm_finish(action);
632     return;
633   }
634
635   /* If the action has already finish perform the error handling, */
636   /* otherwise set up a waiting timeout on the right side         */
637   if (action->state != SIMIX_WAITING && action->state != SIMIX_RUNNING) {
638     SIMIX_comm_finish(action);
639   } else { /* if (timeout >= 0) { we need a surf sleep action even when there is no timeout, otherwise surf won't tell us when the host fails */
640     sleep = surf_workstation_sleep(simcall->issuer->smx_host, timeout);
641     surf_action_set_data(sleep, action);
642
643     if (simcall->issuer == action->comm.src_proc)
644       action->comm.src_timeout = sleep;
645     else
646       action->comm.dst_timeout = sleep;
647   }
648 }
649
650 void SIMIX_pre_comm_test(smx_simcall_t simcall, smx_action_t action)
651 {
652   if(MC_is_active()){
653     simcall_comm_test__set__result(simcall, action->comm.src_proc && action->comm.dst_proc);
654     if(simcall_comm_test__get__result(simcall)){
655       action->state = SIMIX_DONE;
656       xbt_fifo_push(action->simcalls, simcall);
657       SIMIX_comm_finish(action);
658     }else{
659       SIMIX_simcall_answer(simcall);
660     }
661     return;
662   }
663
664   simcall_comm_test__set__result(simcall, (action->state != SIMIX_WAITING && action->state != SIMIX_RUNNING));
665   if (simcall_comm_test__get__result(simcall)) {
666     xbt_fifo_push(action->simcalls, simcall);
667     SIMIX_comm_finish(action);
668   } else {
669     SIMIX_simcall_answer(simcall);
670   }
671 }
672
673 void SIMIX_pre_comm_testany(smx_simcall_t simcall, xbt_dynar_t actions)
674 {
675   unsigned int cursor;
676   smx_action_t action;
677   simcall_comm_testany__set__result(simcall, -1);
678
679   if (MC_is_active()){
680     int idx = SIMCALL_GET_MC_VALUE(simcall);
681     if(idx == -1){
682       SIMIX_simcall_answer(simcall);
683     }else{
684       action = xbt_dynar_get_as(actions, idx, smx_action_t);
685       simcall_comm_testany__set__result(simcall, idx);
686       xbt_fifo_push(action->simcalls, simcall);
687       action->state = SIMIX_DONE;
688       SIMIX_comm_finish(action);
689     }
690     return;
691   }
692
693   xbt_dynar_foreach(simcall_comm_testany__get__comms(simcall), cursor,action) {
694     if (action->state != SIMIX_WAITING && action->state != SIMIX_RUNNING) {
695       simcall_comm_testany__set__result(simcall, cursor);
696       xbt_fifo_push(action->simcalls, simcall);
697       SIMIX_comm_finish(action);
698       return;
699     }
700   }
701   SIMIX_simcall_answer(simcall);
702 }
703
704 void SIMIX_pre_comm_waitany(smx_simcall_t simcall, xbt_dynar_t actions)
705 {
706   smx_action_t action;
707   unsigned int cursor = 0;
708
709   if (MC_is_active()){
710     int idx = SIMCALL_GET_MC_VALUE(simcall);
711     action = xbt_dynar_get_as(actions, idx, smx_action_t);
712     xbt_fifo_push(action->simcalls, simcall);
713     simcall_comm_waitany__set__result(simcall, idx);
714     action->state = SIMIX_DONE;
715     SIMIX_comm_finish(action);
716     return;
717   }
718
719   xbt_dynar_foreach(actions, cursor, action){
720     /* associate this simcall to the the action */
721     xbt_fifo_push(action->simcalls, simcall);
722
723     /* see if the action is already finished */
724     if (action->state != SIMIX_WAITING && action->state != SIMIX_RUNNING){
725       SIMIX_comm_finish(action);
726       break;
727     }
728   }
729 }
730
731 void SIMIX_waitany_remove_simcall_from_actions(smx_simcall_t simcall)
732 {
733   smx_action_t action;
734   unsigned int cursor = 0;
735   xbt_dynar_t actions = simcall_comm_waitany__get__comms(simcall);
736
737   xbt_dynar_foreach(actions, cursor, action) {
738     xbt_fifo_remove(action->simcalls, simcall);
739   }
740 }
741
742 /**
743  *  \brief Starts the simulation of a communication action.
744  *  \param action the communication action
745  */
746 static XBT_INLINE void SIMIX_comm_start(smx_action_t action)
747 {
748   /* If both the sender and the receiver are already there, start the communication */
749   if (action->state == SIMIX_READY) {
750
751     smx_host_t sender = action->comm.src_proc->smx_host;
752     smx_host_t receiver = action->comm.dst_proc->smx_host;
753
754     XBT_DEBUG("Starting communication %p from '%s' to '%s'", action,
755               SIMIX_host_get_name(sender), SIMIX_host_get_name(receiver));
756
757     action->comm.surf_comm = surf_workstation_model_communicate(surf_workstation_model,
758                                                                     sender, receiver,
759                                                                     action->comm.task_size, action->comm.rate);
760
761     surf_action_set_data(action->comm.surf_comm, action);
762
763     action->state = SIMIX_RUNNING;
764
765     /* If a link is failed, detect it immediately */
766     if (surf_action_get_state(action->comm.surf_comm) == SURF_ACTION_FAILED) {
767       XBT_DEBUG("Communication from '%s' to '%s' failed to start because of a link failure",
768                 SIMIX_host_get_name(sender), SIMIX_host_get_name(receiver));
769       action->state = SIMIX_LINK_FAILURE;
770       SIMIX_comm_destroy_internal_actions(action);
771     }
772
773     /* If any of the process is suspend, create the action but stop its execution,
774        it will be restarted when the sender process resume */
775     if (SIMIX_process_is_suspended(action->comm.src_proc) ||
776         SIMIX_process_is_suspended(action->comm.dst_proc)) {
777       /* FIXME: check what should happen with the action state */
778
779       if (SIMIX_process_is_suspended(action->comm.src_proc))
780         XBT_DEBUG("The communication is suspended on startup because src (%s:%s) were suspended since it initiated the communication",
781                   SIMIX_host_get_name(action->comm.src_proc->smx_host), action->comm.src_proc->name);
782       else
783         XBT_DEBUG("The communication is suspended on startup because dst (%s:%s) were suspended since it initiated the communication",
784                   SIMIX_host_get_name(action->comm.dst_proc->smx_host), action->comm.dst_proc->name);
785
786       surf_action_suspend(action->comm.surf_comm);
787
788     }
789   }
790 }
791
792 /**
793  * \brief Answers the SIMIX simcalls associated to a communication action.
794  * \param action a finished communication action
795  */
796 void SIMIX_comm_finish(smx_action_t action)
797 {
798   unsigned int destroy_count = 0;
799   smx_simcall_t simcall;
800
801
802   while ((simcall = xbt_fifo_shift(action->simcalls))) {
803
804     /* If a waitany simcall is waiting for this action to finish, then remove
805        it from the other actions in the waitany list. Afterwards, get the
806        position of the actual action in the waitany dynar and
807        return it as the result of the simcall */
808     if (simcall->call == SIMCALL_COMM_WAITANY) {
809       SIMIX_waitany_remove_simcall_from_actions(simcall);
810       if (!MC_is_active())
811         simcall_comm_waitany__set__result(simcall, xbt_dynar_search(simcall_comm_waitany__get__comms(simcall), &action));
812     }
813
814     /* If the action is still in a rendez-vous point then remove from it */
815     if (action->comm.rdv)
816       SIMIX_rdv_remove(action->comm.rdv, action);
817
818     XBT_DEBUG("SIMIX_comm_finish: action state = %d", (int)action->state);
819
820     /* Check out for errors */
821     switch (action->state) {
822
823     case SIMIX_DONE:
824       XBT_DEBUG("Communication %p complete!", action);
825       SIMIX_comm_copy_data(action);
826       break;
827
828     case SIMIX_SRC_TIMEOUT:
829       SMX_EXCEPTION(simcall->issuer, timeout_error, 0,
830                     "Communication timeouted because of sender");
831       break;
832
833     case SIMIX_DST_TIMEOUT:
834       SMX_EXCEPTION(simcall->issuer, timeout_error, 0,
835                     "Communication timeouted because of receiver");
836       break;
837
838     case SIMIX_SRC_HOST_FAILURE:
839       if (simcall->issuer == action->comm.src_proc)
840         simcall->issuer->context->iwannadie = 1;
841 //          SMX_EXCEPTION(simcall->issuer, host_error, 0, "Host failed");
842       else
843         SMX_EXCEPTION(simcall->issuer, network_error, 0, "Remote peer failed");
844       break;
845
846     case SIMIX_DST_HOST_FAILURE:
847       if (simcall->issuer == action->comm.dst_proc)
848         simcall->issuer->context->iwannadie = 1;
849 //          SMX_EXCEPTION(simcall->issuer, host_error, 0, "Host failed");
850       else
851         SMX_EXCEPTION(simcall->issuer, network_error, 0, "Remote peer failed");
852       break;
853
854     case SIMIX_LINK_FAILURE:
855       XBT_DEBUG("Link failure in action %p between '%s' and '%s': posting an exception to the issuer: %s (%p) detached:%d",
856                 action,
857                 action->comm.src_proc ? sg_host_name(action->comm.src_proc->smx_host) : NULL,
858                 action->comm.dst_proc ? sg_host_name(action->comm.dst_proc->smx_host) : NULL,
859                 simcall->issuer->name, simcall->issuer, action->comm.detached);
860       if (action->comm.src_proc == simcall->issuer) {
861         XBT_DEBUG("I'm source");
862       } else if (action->comm.dst_proc == simcall->issuer) {
863         XBT_DEBUG("I'm dest");
864       } else {
865         XBT_DEBUG("I'm neither source nor dest");
866       }
867       SMX_EXCEPTION(simcall->issuer, network_error, 0, "Link failure");
868       break;
869
870     case SIMIX_CANCELED:
871       if (simcall->issuer == action->comm.dst_proc)
872         SMX_EXCEPTION(simcall->issuer, cancel_error, 0,
873                       "Communication canceled by the sender");
874       else
875         SMX_EXCEPTION(simcall->issuer, cancel_error, 0,
876                       "Communication canceled by the receiver");
877       break;
878
879     default:
880       xbt_die("Unexpected action state in SIMIX_comm_finish: %d", (int)action->state);
881     }
882
883     /* if there is an exception during a waitany or a testany, indicate the position of the failed communication */
884     if (simcall->issuer->doexception) {
885       if (simcall->call == SIMCALL_COMM_WAITANY) {
886         simcall->issuer->running_ctx->exception.value = xbt_dynar_search(simcall_comm_waitany__get__comms(simcall), &action);
887       }
888       else if (simcall->call == SIMCALL_COMM_TESTANY) {
889         simcall->issuer->running_ctx->exception.value = xbt_dynar_search(simcall_comm_testany__get__comms(simcall), &action);
890       }
891     }
892
893     if (surf_resource_get_state(surf_workstation_resource_priv(simcall->issuer->smx_host)) != SURF_RESOURCE_ON) {
894       simcall->issuer->context->iwannadie = 1;
895     }
896
897     simcall->issuer->waiting_action = NULL;
898     xbt_fifo_remove(simcall->issuer->comms, action);
899     if(action->comm.detached){
900       smx_process_t proc;
901       int still_alive = 0;
902
903       if(simcall->issuer == action->comm.src_proc){
904         if(action->comm.dst_proc){
905             xbt_swag_foreach(proc, simix_global->process_list)
906             {
907                if(proc==action->comm.dst_proc){
908                    still_alive=1;
909                    break;
910                }
911             }
912         }
913         if(still_alive) xbt_fifo_remove(action->comm.dst_proc->comms, action);
914       }
915       if(simcall->issuer == action->comm.dst_proc){
916         if(action->comm.src_proc)
917           if(action->comm.dst_proc){
918             xbt_swag_foreach(proc, simix_global->process_list)
919             {
920               if(proc==action->comm.src_proc){
921                   still_alive=1;
922                   break;
923               }
924             }
925           }
926           if(still_alive) xbt_fifo_remove(action->comm.src_proc->comms, action);
927       }
928     }
929     SIMIX_simcall_answer(simcall);
930     destroy_count++;
931   }
932
933   while (destroy_count-- > 0)
934     SIMIX_comm_destroy(action);
935 }
936
937 /**
938  * \brief This function is called when a Surf communication action is finished.
939  * \param action the corresponding Simix communication
940  */
941 void SIMIX_post_comm(smx_action_t action)
942 {
943   /* Update action state */
944   if (action->comm.src_timeout &&
945       surf_action_get_state(action->comm.src_timeout) == SURF_ACTION_DONE)
946     action->state = SIMIX_SRC_TIMEOUT;
947   else if (action->comm.dst_timeout &&
948           surf_action_get_state(action->comm.dst_timeout) == SURF_ACTION_DONE)
949     action->state = SIMIX_DST_TIMEOUT;
950   else if (action->comm.src_timeout &&
951           surf_action_get_state(action->comm.src_timeout) == SURF_ACTION_FAILED)
952     action->state = SIMIX_SRC_HOST_FAILURE;
953   else if (action->comm.dst_timeout &&
954       surf_action_get_state(action->comm.dst_timeout) == SURF_ACTION_FAILED)
955     action->state = SIMIX_DST_HOST_FAILURE;
956   else if (action->comm.surf_comm &&
957           surf_action_get_state(action->comm.surf_comm) == SURF_ACTION_FAILED) {
958     XBT_DEBUG("Puta madre. Surf says that the link broke");
959     action->state = SIMIX_LINK_FAILURE;
960   } else
961     action->state = SIMIX_DONE;
962
963   XBT_DEBUG("SIMIX_post_comm: comm %p, state %d, src_proc %p, dst_proc %p, detached: %d",
964             action, (int)action->state, action->comm.src_proc, action->comm.dst_proc, action->comm.detached);
965
966   /* destroy the surf actions associated with the Simix communication */
967   SIMIX_comm_destroy_internal_actions(action);
968
969   /* remove the communication action from the list of pending communications
970    * of both processes (if they still exist) */
971   if (action->comm.src_proc) {
972     xbt_fifo_remove(action->comm.src_proc->comms, action);
973   }
974   if (action->comm.dst_proc) {
975     xbt_fifo_remove(action->comm.dst_proc->comms, action);
976   }
977
978   /* if there are simcalls associated with the action, then answer them */
979   if (xbt_fifo_size(action->simcalls)) {
980     SIMIX_comm_finish(action);
981   }
982 }
983
984 void SIMIX_pre_comm_cancel(smx_simcall_t simcall, smx_action_t action){
985   SIMIX_comm_cancel(action);
986 }
987 void SIMIX_comm_cancel(smx_action_t action)
988 {
989   /* if the action is a waiting state means that it is still in a rdv */
990   /* so remove from it and delete it */
991   if (action->state == SIMIX_WAITING) {
992     SIMIX_rdv_remove(action->comm.rdv, action);
993     action->state = SIMIX_CANCELED;
994   }
995   else if (!MC_is_active() /* when running the MC there are no surf actions */
996            && (action->state == SIMIX_READY || action->state == SIMIX_RUNNING)) {
997
998     surf_action_cancel(action->comm.surf_comm);
999   }
1000 }
1001
1002 void SIMIX_comm_suspend(smx_action_t action)
1003 {
1004   /*FIXME: shall we suspend also the timeout actions? */
1005   if (action->comm.surf_comm)
1006     surf_action_suspend(action->comm.surf_comm);
1007   /* in the other case, the action will be suspended on creation, in SIMIX_comm_start() */
1008 }
1009
1010 void SIMIX_comm_resume(smx_action_t action)
1011 {
1012   /*FIXME: check what happen with the timeouts */
1013   if (action->comm.surf_comm)
1014     surf_action_resume(action->comm.surf_comm);
1015   /* in the other case, the action were not really suspended yet, see SIMIX_comm_suspend() and SIMIX_comm_start() */
1016 }
1017
1018
1019 /************* Action Getters **************/
1020
1021 double SIMIX_pre_comm_get_remains(smx_simcall_t simcall, smx_action_t action){
1022   return SIMIX_comm_get_remains(action);
1023 }
1024 /**
1025  *  \brief get the amount remaining from the communication
1026  *  \param action The communication
1027  */
1028 double SIMIX_comm_get_remains(smx_action_t action)
1029 {
1030   double remains;
1031
1032   if(!action){
1033     return 0;
1034   }
1035
1036   switch (action->state) {
1037
1038   case SIMIX_RUNNING:
1039     remains = surf_action_get_remains(action->comm.surf_comm);
1040     break;
1041
1042   case SIMIX_WAITING:
1043   case SIMIX_READY:
1044     remains = 0; /*FIXME: check what should be returned */
1045     break;
1046
1047   default:
1048     remains = 0; /*FIXME: is this correct? */
1049     break;
1050   }
1051   return remains;
1052 }
1053
1054 e_smx_state_t SIMIX_pre_comm_get_state(smx_simcall_t simcall, smx_action_t action){
1055   return SIMIX_comm_get_state(action);
1056 }
1057 e_smx_state_t SIMIX_comm_get_state(smx_action_t action)
1058 {
1059   return action->state;
1060 }
1061
1062 void* SIMIX_pre_comm_get_src_data(smx_simcall_t simcall, smx_action_t action){
1063   return SIMIX_comm_get_src_data(action);
1064 }
1065 /**
1066  *  \brief Return the user data associated to the sender of the communication
1067  *  \param action The communication
1068  *  \return the user data
1069  */
1070 void* SIMIX_comm_get_src_data(smx_action_t action)
1071 {
1072   return action->comm.src_data;
1073 }
1074
1075 void* SIMIX_pre_comm_get_dst_data(smx_simcall_t simcall, smx_action_t action){
1076   return SIMIX_comm_get_dst_data(action);
1077 }
1078 /**
1079  *  \brief Return the user data associated to the receiver of the communication
1080  *  \param action The communication
1081  *  \return the user data
1082  */
1083 void* SIMIX_comm_get_dst_data(smx_action_t action)
1084 {
1085   return action->comm.dst_data;
1086 }
1087
1088 smx_process_t SIMIX_pre_comm_get_src_proc(smx_simcall_t simcall, smx_action_t action){
1089   return SIMIX_comm_get_src_proc(action);
1090 }
1091 smx_process_t SIMIX_comm_get_src_proc(smx_action_t action)
1092 {
1093   return action->comm.src_proc;
1094 }
1095
1096 smx_process_t SIMIX_pre_comm_get_dst_proc(smx_simcall_t simcall, smx_action_t action){
1097   return SIMIX_comm_get_dst_proc(action);
1098 }
1099 smx_process_t SIMIX_comm_get_dst_proc(smx_action_t action)
1100 {
1101   return action->comm.dst_proc;
1102 }
1103
#ifdef HAVE_LATENCY_BOUND_TRACKING
int SIMIX_pre_comm_is_latency_bounded(smx_simcall_t simcall, smx_action_t action)
{
  (void) simcall; /* unused: only the action is needed */
  return SIMIX_comm_is_latency_bounded(action);
}

/**
 *  \brief verify if communication is latency bounded
 *
 *  When the comm still has a Surf action, the flag is refreshed from it and
 *  cached in action->latency_limited. Otherwise the last cached value is
 *  returned.
 *
 *  \param action The communication (NULL yields 0)
 */
int SIMIX_comm_is_latency_bounded(smx_action_t action)
{
  if (action == NULL)
    return 0;

  if (action->comm.surf_comm != NULL) {
    XBT_DEBUG("Getting latency limited for surf_action (%p)", action->comm.surf_comm);
    action->latency_limited =
      surf_network_action_get_latency_limited(action->comm.surf_comm);
    XBT_DEBUG("Action limited is %d", action->latency_limited);
  }

  return action->latency_limited;
}
#endif
1127
1128 /******************************************************************************/
1129 /*                    SIMIX_comm_copy_data callbacks                       */
1130 /******************************************************************************/
1131 static void (*SIMIX_comm_copy_data_callback) (smx_action_t, void*, size_t) =
1132   &SIMIX_comm_copy_pointer_callback;
1133
1134 void
1135 SIMIX_comm_set_copy_data_callback(void (*callback) (smx_action_t, void*, size_t))
1136 {
1137   SIMIX_comm_copy_data_callback = callback;
1138 }
1139
1140 void SIMIX_comm_copy_pointer_callback(smx_action_t comm, void* buff, size_t buff_size)
1141 {
1142   xbt_assert((buff_size == sizeof(void *)),
1143              "Cannot copy %zu bytes: must be sizeof(void*)", buff_size);
1144   *(void **) (comm->comm.dst_buff) = buff;
1145 }
1146
1147 void SIMIX_comm_copy_buffer_callback(smx_action_t comm, void* buff, size_t buff_size)
1148 {
1149   XBT_DEBUG("Copy the data over");
1150   memcpy(comm->comm.dst_buff, buff, buff_size);
1151   if (comm->comm.detached) { // if this is a detached send, the source buffer was duplicated by SMPI sender to make the original buffer available to the application ASAP
1152     xbt_free(buff);
1153     comm->comm.src_buff = NULL;
1154   }
1155 }
1156
1157
1158 /**
1159  *  \brief Copy the communication data from the sender's buffer to the receiver's one
1160  *  \param comm The communication
1161  */
1162 void SIMIX_comm_copy_data(smx_action_t comm)
1163 {
1164   size_t buff_size = comm->comm.src_buff_size;
1165   /* If there is no data to be copy then return */
1166   if (!comm->comm.src_buff || !comm->comm.dst_buff || comm->comm.copied)
1167     return;
1168
1169   XBT_DEBUG("Copying comm %p data from %s (%p) -> %s (%p) (%zu bytes)",
1170             comm,
1171             comm->comm.src_proc ? sg_host_name(comm->comm.src_proc->smx_host) : "a finished process",
1172             comm->comm.src_buff,
1173             comm->comm.dst_proc ? sg_host_name(comm->comm.dst_proc->smx_host) : "a finished process",
1174             comm->comm.dst_buff, buff_size);
1175
1176   /* Copy at most dst_buff_size bytes of the message to receiver's buffer */
1177   if (comm->comm.dst_buff_size)
1178     buff_size = MIN(buff_size, *(comm->comm.dst_buff_size));
1179
1180   /* Update the receiver's buffer size to the copied amount */
1181   if (comm->comm.dst_buff_size)
1182     *comm->comm.dst_buff_size = buff_size;
1183
1184   if (buff_size > 0){
1185       if(comm->comm.copy_data_fun)
1186         comm->comm.copy_data_fun (comm, comm->comm.src_buff, buff_size);
1187       else
1188         SIMIX_comm_copy_data_callback (comm, comm->comm.src_buff, buff_size);
1189   }
1190
1191
1192   /* Set the copied flag so we copy data only once */
1193   /* (this function might be called from both communication ends) */
1194   comm->comm.copied = 1;
1195 }