Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
7c0b9ff264c1db62fb434d3db103bf57a86f7814
[simgrid.git] / src / simix / smx_network.c
1 /* Copyright (c) 2009-2014. The SimGrid Team.
2  * All rights reserved.                                                     */
3
4 /* This program is free software; you can redistribute it and/or modify it
5  * under the terms of the license (GNU LGPL) which comes with this package. */
6
7 #include "smx_private.h"
8 #include "xbt/log.h"
9 #include "mc/mc.h"
10 #include "xbt/dict.h"
11 #include "smpi/private.h"
12
13
14 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(simix_network, simix,
15                                 "Logging specific to SIMIX (network)");
16
17 static xbt_dict_t rdv_points = NULL;
18 XBT_EXPORT_NO_IMPORT(unsigned long int) smx_total_comms = 0;
19
20 static void SIMIX_waitany_remove_simcall_from_actions(smx_simcall_t simcall);
21 static void SIMIX_comm_copy_data(smx_action_t comm);
22 static smx_action_t SIMIX_comm_new(e_smx_comm_type_t type);
23 static XBT_INLINE void SIMIX_rdv_push(smx_rdv_t rdv, smx_action_t comm);
24 static smx_action_t SIMIX_fifo_probe_comm(xbt_fifo_t fifo, e_smx_comm_type_t type,
25                                         int (*match_fun)(void *, void *,smx_action_t),
26                                         void *user_data, smx_action_t my_action);
27 static smx_action_t SIMIX_fifo_get_comm(xbt_fifo_t fifo, e_smx_comm_type_t type,
28                                         int (*match_fun)(void *, void *,smx_action_t),
29                                         void *user_data, smx_action_t my_action);
30 static void SIMIX_rdv_free(void *data);
31 static void SIMIX_comm_start(smx_action_t action);
32
33 void SIMIX_network_init(void)
34 {
35   rdv_points = xbt_dict_new_homogeneous(SIMIX_rdv_free);
36 }
37
38 void SIMIX_network_exit(void)
39 {
40   xbt_dict_free(&rdv_points);
41 }
42
43 /******************************************************************************/
44 /*                           Rendez-Vous Points                               */
45 /******************************************************************************/
46
47 smx_rdv_t simcall_HANDLER_rdv_create(smx_simcall_t simcall, const char *name){
48   return SIMIX_rdv_create(name);
49 }
50 smx_rdv_t SIMIX_rdv_create(const char *name)
51 {
52   /* two processes may have pushed the same rdv_create simcall at the same time */
53   smx_rdv_t rdv = name ? xbt_dict_get_or_null(rdv_points, name) : NULL;
54
55   if (!rdv) {
56     rdv = xbt_new0(s_smx_rvpoint_t, 1);
57     rdv->name = name ? xbt_strdup(name) : NULL;
58     rdv->comm_fifo = xbt_fifo_new();
59     rdv->done_comm_fifo = xbt_fifo_new();
60     rdv->permanent_receiver=NULL;
61
62     XBT_DEBUG("Creating a mailbox at %p with name %s", rdv, name);
63
64     if (rdv->name)
65       xbt_dict_set(rdv_points, rdv->name, rdv, NULL);
66   }
67   return rdv;
68 }
69
70 void simcall_HANDLER_rdv_destroy(smx_simcall_t simcall, smx_rdv_t rdv){
71   return SIMIX_rdv_destroy(rdv);
72 }
73 void SIMIX_rdv_destroy(smx_rdv_t rdv)
74 {
75   if (rdv->name)
76     xbt_dict_remove(rdv_points, rdv->name);
77 }
78
79 void SIMIX_rdv_free(void *data)
80 {
81   XBT_DEBUG("rdv free %p", data);
82   smx_rdv_t rdv = (smx_rdv_t) data;
83   xbt_free(rdv->name);
84   xbt_fifo_free(rdv->comm_fifo);
85   xbt_fifo_free(rdv->done_comm_fifo);
86
87   xbt_free(rdv);
88 }
89
90 xbt_dict_t SIMIX_get_rdv_points()
91 {
92   return rdv_points;
93 }
94
95 smx_rdv_t SIMIX_rdv_get_by_name(const char *name)
96 {
97   return xbt_dict_get_or_null(rdv_points, name);
98 }
99
100 unsigned int simcall_HANDLER_rdv_comm_count_by_host(smx_simcall_t simcall, smx_rdv_t rdv, smx_host_t host){
101   return SIMIX_rdv_comm_count_by_host(rdv, host);
102 }
103 int SIMIX_rdv_comm_count_by_host(smx_rdv_t rdv, smx_host_t host)
104 {
105   smx_action_t comm = NULL;
106   xbt_fifo_item_t item = NULL;
107   int count = 0;
108
109   xbt_fifo_foreach(rdv->comm_fifo, item, comm, smx_action_t) {
110     if (comm->comm.src_proc->smx_host == host)
111       count++;
112   }
113
114   return count;
115 }
116
117 smx_action_t simcall_HANDLER_rdv_get_head(smx_simcall_t simcall, smx_rdv_t rdv){
118   return SIMIX_rdv_get_head(rdv);
119 }
120 smx_action_t SIMIX_rdv_get_head(smx_rdv_t rdv)
121 {
122   return xbt_fifo_get_item_content(xbt_fifo_get_first_item(rdv->comm_fifo));
123 }
124
125 smx_process_t simcall_HANDLER_rdv_get_receiver(smx_simcall_t simcall, smx_rdv_t rdv){
126   return SIMIX_rdv_get_receiver(rdv);
127 }
128 /**
129  *  \brief get the receiver (process associated to the mailbox)
130  *  \param rdv The rendez-vous point
131  *  \return process The receiving process (NULL if not set)
132  */
133 smx_process_t SIMIX_rdv_get_receiver(smx_rdv_t rdv)
134 {
135   return rdv->permanent_receiver;
136 }
137
138 void simcall_HANDLER_rdv_set_receiver(smx_simcall_t simcall, smx_rdv_t rdv,
139                             smx_process_t process){
140   SIMIX_rdv_set_receiver(rdv, process);
141 }
142 /**
143  *  \brief set the receiver of the rendez vous point to allow eager sends
144  *  \param rdv The rendez-vous point
145  *  \param process The receiving process
146  */
147 void SIMIX_rdv_set_receiver(smx_rdv_t rdv, smx_process_t process)
148 {
149   rdv->permanent_receiver=process;
150 }
151
152 /**
153  *  \brief Pushes a communication action into a rendez-vous point
154  *  \param rdv The rendez-vous point
155  *  \param comm The communication action
156  */
157 static XBT_INLINE void SIMIX_rdv_push(smx_rdv_t rdv, smx_action_t comm)
158 {
159   xbt_fifo_push(rdv->comm_fifo, comm);
160   comm->comm.rdv = rdv;
161 }
162
163 /**
164  *  \brief Removes a communication action from a rendez-vous point
165  *  \param rdv The rendez-vous point
166  *  \param comm The communication action
167  */
168 XBT_INLINE void SIMIX_rdv_remove(smx_rdv_t rdv, smx_action_t comm)
169 {
170   xbt_fifo_remove(rdv->comm_fifo, comm);
171   comm->comm.rdv = NULL;
172 }
173
174 /**
175  *  \brief Checks if there is a communication action queued in a fifo matching our needs
176  *  \param type The type of communication we are looking for (comm_send, comm_recv)
177  *  \return The communication action if found, NULL otherwise
178  */
179 smx_action_t SIMIX_fifo_get_comm(xbt_fifo_t fifo, e_smx_comm_type_t type,
180                                  int (*match_fun)(void *, void *,smx_action_t),
181                                  void *this_user_data, smx_action_t my_action)
182 {
183   smx_action_t action;
184   xbt_fifo_item_t item;
185   void* other_user_data = NULL;
186
187   xbt_fifo_foreach(fifo, item, action, smx_action_t) {
188     if (action->comm.type == SIMIX_COMM_SEND) {
189       other_user_data = action->comm.src_data;
190     } else if (action->comm.type == SIMIX_COMM_RECEIVE) {
191       other_user_data = action->comm.dst_data;
192     }
193     if (action->comm.type == type &&
194         (!match_fun              ||              match_fun(this_user_data,  other_user_data, action)) &&
195         (!action->comm.match_fun || action->comm.match_fun(other_user_data, this_user_data,  my_action))) {
196       XBT_DEBUG("Found a matching communication action %p", action);
197       xbt_fifo_remove_item(fifo, item);
198       xbt_fifo_free_item(item);
199       action->comm.refcount++;
200 #ifdef HAVE_MC
201       action->comm.rdv_cpy = action->comm.rdv;
202 #endif
203       action->comm.rdv = NULL;
204       return action;
205     }
206     XBT_DEBUG("Sorry, communication action %p does not match our needs:"
207               " its type is %d but we are looking for a comm of type %d (or maybe the filtering didn't match)",
208               action, (int)action->comm.type, (int)type);
209   }
210   XBT_DEBUG("No matching communication action found");
211   return NULL;
212 }
213
214
215 /**
216  *  \brief Checks if there is a communication action queued in a fifo matching our needs, but leave it there
217  *  \param type The type of communication we are looking for (comm_send, comm_recv)
218  *  \return The communication action if found, NULL otherwise
219  */
220 smx_action_t SIMIX_fifo_probe_comm(xbt_fifo_t fifo, e_smx_comm_type_t type,
221                                  int (*match_fun)(void *, void *,smx_action_t),
222                                  void *this_user_data, smx_action_t my_action)
223 {
224   smx_action_t action;
225   xbt_fifo_item_t item;
226   void* other_user_data = NULL;
227
228   xbt_fifo_foreach(fifo, item, action, smx_action_t) {
229     if (action->comm.type == SIMIX_COMM_SEND) {
230       other_user_data = action->comm.src_data;
231     } else if (action->comm.type == SIMIX_COMM_RECEIVE) {
232       other_user_data = action->comm.dst_data;
233     }
234     if (action->comm.type == type &&
235         (!match_fun              ||              match_fun(this_user_data,  other_user_data, action)) &&
236         (!action->comm.match_fun || action->comm.match_fun(other_user_data, this_user_data,  my_action))) {
237       XBT_DEBUG("Found a matching communication action %p", action);
238       action->comm.refcount++;
239
240       return action;
241     }
242     XBT_DEBUG("Sorry, communication action %p does not match our needs:"
243               " its type is %d but we are looking for a comm of type %d (or maybe the filtering didn't match)",
244               action, (int)action->comm.type, (int)type);
245   }
246   XBT_DEBUG("No matching communication action found");
247   return NULL;
248 }
249 /******************************************************************************/
250 /*                            Communication Actions                            */
251 /******************************************************************************/
252
253 /**
254  *  \brief Creates a new communicate action
255  *  \param type The direction of communication (comm_send, comm_recv)
256  *  \return The new communicate action
257  */
258 smx_action_t SIMIX_comm_new(e_smx_comm_type_t type)
259 {
260   smx_action_t act;
261
262   /* alloc structures */
263   act = xbt_mallocator_get(simix_global->action_mallocator);
264
265   act->type = SIMIX_ACTION_COMMUNICATE;
266   act->state = SIMIX_WAITING;
267
268   /* set communication */
269   act->comm.type = type;
270   act->comm.refcount = 1;
271   act->comm.src_data=NULL;
272   act->comm.dst_data=NULL;
273
274
275 #ifdef HAVE_LATENCY_BOUND_TRACKING
276   //initialize with unknown value
277   act->latency_limited = -1;
278 #endif
279
280 #ifdef HAVE_TRACING
281   act->category = NULL;
282 #endif
283
284   XBT_DEBUG("Create communicate action %p", act);
285   ++smx_total_comms;
286
287   return act;
288 }
289
290 /**
291  *  \brief Destroy a communicate action
292  *  \param action The communicate action to be destroyed
293  */
294 void SIMIX_comm_destroy(smx_action_t action)
295 {
296   XBT_DEBUG("Destroy action %p (refcount: %d), state: %d",
297             action, action->comm.refcount, (int)action->state);
298
299   if (action->comm.refcount <= 0) {
300     xbt_backtrace_display_current();
301     xbt_die("The refcount of comm %p is already 0 before decreasing it. "
302             "That's a bug! If you didn't test and/or wait the same communication twice in your code, then the bug is SimGrid's...", action);
303   }
304   action->comm.refcount--;
305   if (action->comm.refcount > 0)
306       return;
307   XBT_DEBUG("Really free communication %p; refcount is now %d", action,
308             action->comm.refcount);
309
310 #ifdef HAVE_LATENCY_BOUND_TRACKING
311   action->latency_limited = SIMIX_comm_is_latency_bounded( action ) ;
312 #endif
313
314   xbt_free(action->name);
315   SIMIX_comm_destroy_internal_actions(action);
316
317   if (action->comm.detached && action->state != SIMIX_DONE) {
318     /* the communication has failed and was detached:
319      * we have to free the buffer */
320     if (action->comm.clean_fun) {
321       action->comm.clean_fun(action->comm.src_buff);
322     }
323     action->comm.src_buff = NULL;
324   }
325
326   if(action->comm.rdv)
327     SIMIX_rdv_remove(action->comm.rdv, action);
328
329   xbt_mallocator_release(simix_global->action_mallocator, action);
330 }
331
332 void SIMIX_comm_destroy_internal_actions(smx_action_t action)
333 {
334   if (action->comm.surf_comm){
335 #ifdef HAVE_LATENCY_BOUND_TRACKING
336     action->latency_limited = SIMIX_comm_is_latency_bounded(action);
337 #endif
338     surf_action_unref(action->comm.surf_comm);
339     action->comm.surf_comm = NULL;
340   }
341
342   if (action->comm.src_timeout){
343     surf_action_unref(action->comm.src_timeout);
344     action->comm.src_timeout = NULL;
345   }
346
347   if (action->comm.dst_timeout){
348     surf_action_unref(action->comm.dst_timeout);
349     action->comm.dst_timeout = NULL;
350   }
351 }
352
353 void simcall_HANDLER_comm_send(smx_simcall_t simcall, smx_process_t src, smx_rdv_t rdv,
354                                   double task_size, double rate,
355                                   void *src_buff, size_t src_buff_size,
356                                   int (*match_fun)(void *, void *,smx_action_t),
357                                   void (*copy_data_fun)(smx_action_t, void*, size_t),
358                                   void *data, double timeout){
359   smx_action_t comm = SIMIX_comm_isend(src, rdv, task_size, rate,
360                                        src_buff, src_buff_size, match_fun, NULL, copy_data_fun,
361                                        data, 0);
362   SIMCALL_SET_MC_VALUE(simcall, 0);
363   simcall_HANDLER_comm_wait(simcall, comm, timeout);
364 }
365 smx_action_t simcall_HANDLER_comm_isend(smx_simcall_t simcall, smx_process_t src, smx_rdv_t rdv,
366                                   double task_size, double rate,
367                                   void *src_buff, size_t src_buff_size,
368                                   int (*match_fun)(void *, void *,smx_action_t),
369                                   void (*clean_fun)(void *),
370                                   void (*copy_data_fun)(smx_action_t, void*, size_t),
371                                   void *data, int detached){
372   return SIMIX_comm_isend(src, rdv, task_size, rate, src_buff,
373                           src_buff_size, match_fun, clean_fun, copy_data_fun, data, detached);
374
375 }
376 smx_action_t SIMIX_comm_isend(smx_process_t src_proc, smx_rdv_t rdv,
377                               double task_size, double rate,
378                               void *src_buff, size_t src_buff_size,
379                               int (*match_fun)(void *, void *,smx_action_t),
380                               void (*clean_fun)(void *), // used to free the action in case of problem after a detached send
381                               void (*copy_data_fun)(smx_action_t, void*, size_t), // used to copy data if not default one
382                               void *data,
383                               int detached)
384 {
385   XBT_DEBUG("send from %p", rdv);
386
387   /* Prepare an action describing us, so that it gets passed to the user-provided filter of other side */
388   smx_action_t this_action = SIMIX_comm_new(SIMIX_COMM_SEND);
389
390   /* Look for communication action matching our needs. We also provide a description of
391    * ourself so that the other side also gets a chance of choosing if it wants to match with us.
392    *
393    * If it is not found then push our communication into the rendez-vous point */
394   smx_action_t other_action = SIMIX_fifo_get_comm(rdv->comm_fifo, SIMIX_COMM_RECEIVE, match_fun, data, this_action);
395
396   if (!other_action) {
397     other_action = this_action;
398
399     if (rdv->permanent_receiver!=NULL){
400       //this mailbox is for small messages, which have to be sent right now
401       other_action->state = SIMIX_READY;
402       other_action->comm.dst_proc=rdv->permanent_receiver;
403       other_action->comm.refcount++;
404       xbt_fifo_push(rdv->done_comm_fifo,other_action);
405       other_action->comm.rdv=rdv;
406       XBT_DEBUG("pushing a message into the permanent receive fifo %p, comm %p", rdv, &(other_action->comm));
407
408     }else{
409       SIMIX_rdv_push(rdv, this_action);
410     }
411   } else {
412     XBT_DEBUG("Receive already pushed");
413
414     SIMIX_comm_destroy(this_action);
415     --smx_total_comms; // this creation was a pure waste
416
417     other_action->state = SIMIX_READY;
418     other_action->comm.type = SIMIX_COMM_READY;
419
420   }
421   xbt_fifo_push(src_proc->comms, other_action);
422
423   /* if the communication action is detached then decrease the refcount
424    * by one, so it will be eliminated by the receiver's destroy call */
425   if (detached) {
426     other_action->comm.detached = 1;
427     other_action->comm.refcount--;
428     other_action->comm.clean_fun = clean_fun;
429   } else {
430     other_action->comm.clean_fun = NULL;
431   }
432
433   /* Setup the communication action */
434   other_action->comm.src_proc = src_proc;
435   other_action->comm.task_size = task_size;
436   other_action->comm.rate = rate;
437   other_action->comm.src_buff = src_buff;
438   other_action->comm.src_buff_size = src_buff_size;
439   other_action->comm.src_data = data;
440
441   other_action->comm.match_fun = match_fun;
442   other_action->comm.copy_data_fun = copy_data_fun;
443
444
445   if (MC_is_active()) {
446     other_action->state = SIMIX_RUNNING;
447     return (detached ? NULL : other_action);
448   }
449
450   SIMIX_comm_start(other_action);
451   return (detached ? NULL : other_action);
452 }
453
454 void simcall_HANDLER_comm_recv(smx_simcall_t simcall, smx_rdv_t rdv,
455                          void *dst_buff, size_t *dst_buff_size,
456                          int (*match_fun)(void *, void *, smx_action_t),
457                          void (*copy_data_fun)(smx_action_t, void*, size_t),
458                          void *data, double timeout, double rate)
459 {
460   smx_action_t comm = SIMIX_comm_irecv(simcall->issuer, rdv, dst_buff,
461                                        dst_buff_size, match_fun, copy_data_fun, data, rate);
462   SIMCALL_SET_MC_VALUE(simcall, 0);
463   simcall_HANDLER_comm_wait(simcall, comm, timeout);
464 }
465
466 smx_action_t simcall_HANDLER_comm_irecv(smx_simcall_t simcall, smx_rdv_t rdv,
467                                   void *dst_buff, size_t *dst_buff_size,
468                                   int (*match_fun)(void *, void *, smx_action_t),
469                                   void (*copy_data_fun)(smx_action_t, void*, size_t),
470                                   void *data, double rate)
471 {
472   return SIMIX_comm_irecv(simcall->issuer, rdv, dst_buff, dst_buff_size,
473                           match_fun, copy_data_fun, data, rate);
474 }
475
476 smx_action_t SIMIX_comm_irecv(smx_process_t dst_proc, smx_rdv_t rdv,
477                               void *dst_buff, size_t *dst_buff_size,
478                               int (*match_fun)(void *, void *, smx_action_t),
479                               void (*copy_data_fun)(smx_action_t, void*, size_t), // used to copy data if not default one
480                               void *data, double rate)
481 {
482   XBT_DEBUG("recv from %p %p", rdv, rdv->comm_fifo);
483   smx_action_t this_action = SIMIX_comm_new(SIMIX_COMM_RECEIVE);
484
485   smx_action_t other_action;
486   //communication already done, get it inside the fifo of completed comms
487   //permanent receive v1
488   //int already_received=0;
489   if(rdv->permanent_receiver && xbt_fifo_size(rdv->done_comm_fifo)!=0){
490
491     XBT_DEBUG("We have a comm that has probably already been received, trying to match it, to skip the communication");
492     //find a match in the already received fifo
493     other_action = SIMIX_fifo_get_comm(rdv->done_comm_fifo, SIMIX_COMM_SEND, match_fun, data, this_action);
494     //if not found, assume the receiver came first, register it to the mailbox in the classical way
495     if (!other_action)  {
496       XBT_DEBUG("We have messages in the permanent receive list, but not the one we are looking for, pushing request into fifo");
497       other_action = this_action;
498       SIMIX_rdv_push(rdv, this_action);
499     }else{
500       if(other_action->comm.surf_comm &&        SIMIX_comm_get_remains(other_action)==0.0)
501       {
502         XBT_DEBUG("comm %p has been already sent, and is finished, destroy it",&(other_action->comm));
503         other_action->state = SIMIX_DONE;
504         other_action->comm.type = SIMIX_COMM_DONE;
505         other_action->comm.rdv = NULL;
506       }/*else{
507          XBT_DEBUG("Not yet finished, we have to wait %d", xbt_fifo_size(rdv->comm_fifo));
508          }*/
509       other_action->comm.refcount--;
510       SIMIX_comm_destroy(this_action);
511       --smx_total_comms; // this creation was a pure waste
512     }
513   }else{
514     /* Prepare an action describing us, so that it gets passed to the user-provided filter of other side */
515
516     /* Look for communication action matching our needs. We also provide a description of
517      * ourself so that the other side also gets a chance of choosing if it wants to match with us.
518      *
519      * If it is not found then push our communication into the rendez-vous point */
520     other_action = SIMIX_fifo_get_comm(rdv->comm_fifo, SIMIX_COMM_SEND, match_fun, data, this_action);
521
522     if (!other_action) {
523       XBT_DEBUG("Receive pushed first %d", xbt_fifo_size(rdv->comm_fifo));
524       other_action = this_action;
525       SIMIX_rdv_push(rdv, this_action);
526     } else {
527       SIMIX_comm_destroy(this_action);
528       --smx_total_comms; // this creation was a pure waste
529       other_action->state = SIMIX_READY;
530       other_action->comm.type = SIMIX_COMM_READY;
531       //other_action->comm.refcount--;
532     }
533     xbt_fifo_push(dst_proc->comms, other_action);
534   }
535
536   /* Setup communication action */
537   other_action->comm.dst_proc = dst_proc;
538   other_action->comm.dst_buff = dst_buff;
539   other_action->comm.dst_buff_size = dst_buff_size;
540   other_action->comm.dst_data = data;
541
542   if (rate != -1.0 &&
543       (other_action->comm.rate == -1.0 || rate < other_action->comm.rate))
544     other_action->comm.rate = rate;
545
546   other_action->comm.match_fun = match_fun;
547   other_action->comm.copy_data_fun = copy_data_fun;
548
549
550   /*if(already_received)//do the actual copy, because the first one after the comm didn't have all the info
551     SIMIX_comm_copy_data(other_action);*/
552
553
554   if (MC_is_active()) {
555     other_action->state = SIMIX_RUNNING;
556     return other_action;
557   }
558
559   SIMIX_comm_start(other_action);
560   // }
561   return other_action;
562 }
563
564 smx_action_t simcall_HANDLER_comm_iprobe(smx_simcall_t simcall, smx_rdv_t rdv,
565                                    int type, int src, int tag,
566                                    int (*match_fun)(void *, void *, smx_action_t),
567                                    void *data){
568   return SIMIX_comm_iprobe(simcall->issuer, rdv, type, src, tag, match_fun, data);
569 }
570
571 smx_action_t SIMIX_comm_iprobe(smx_process_t dst_proc, smx_rdv_t rdv, int type, int src,
572                               int tag, int (*match_fun)(void *, void *, smx_action_t), void *data)
573 {
574   XBT_DEBUG("iprobe from %p %p", rdv, rdv->comm_fifo);
575   smx_action_t this_action;
576   int smx_type;
577   if(type == 1){
578     this_action=SIMIX_comm_new(SIMIX_COMM_SEND);
579     smx_type = SIMIX_COMM_RECEIVE;
580   } else{
581     this_action=SIMIX_comm_new(SIMIX_COMM_RECEIVE);
582     smx_type = SIMIX_COMM_SEND;
583   } 
584   smx_action_t other_action=NULL;
585   if(rdv->permanent_receiver && xbt_fifo_size(rdv->done_comm_fifo)!=0){
586     //find a match in the already received fifo
587       XBT_DEBUG("first try in the perm recv mailbox");
588
589     other_action = SIMIX_fifo_probe_comm(rdv->done_comm_fifo, smx_type, match_fun, data, this_action);
590   }
591  // }else{
592     if(!other_action){
593         XBT_DEBUG("try in the normal mailbox");
594         other_action = SIMIX_fifo_probe_comm(rdv->comm_fifo, smx_type, match_fun, data, this_action);
595     }
596 //  }
597   if(other_action)other_action->comm.refcount--;
598
599   SIMIX_comm_destroy(this_action);
600   --smx_total_comms;
601   return other_action;
602 }
603
604 void simcall_HANDLER_comm_wait(smx_simcall_t simcall, smx_action_t action, double timeout)
605 {
606   /* the simcall may be a wait, a send or a recv */
607   surf_action_t sleep;
608
609   /* Associate this simcall to the wait action */
610   XBT_DEBUG("simcall_HANDLER_comm_wait, %p", action);
611
612   xbt_fifo_push(action->simcalls, simcall);
613   simcall->issuer->waiting_action = action;
614
615   if (MC_is_active()) {
616     int idx = SIMCALL_GET_MC_VALUE(simcall);
617     if (idx == 0) {
618       action->state = SIMIX_DONE;
619     } else {
620       /* If we reached this point, the wait simcall must have a timeout */
621       /* Otherwise it shouldn't be enabled and executed by the MC */
622       if (timeout == -1)
623         THROW_IMPOSSIBLE;
624
625       if (action->comm.src_proc == simcall->issuer)
626         action->state = SIMIX_SRC_TIMEOUT;
627       else
628         action->state = SIMIX_DST_TIMEOUT;
629     }
630
631     SIMIX_comm_finish(action);
632     return;
633   }
634
635   /* If the action has already finish perform the error handling, */
636   /* otherwise set up a waiting timeout on the right side         */
637   if (action->state != SIMIX_WAITING && action->state != SIMIX_RUNNING) {
638     SIMIX_comm_finish(action);
639   } else { /* if (timeout >= 0) { we need a surf sleep action even when there is no timeout, otherwise surf won't tell us when the host fails */
640     sleep = surf_workstation_sleep(simcall->issuer->smx_host, timeout);
641     surf_action_set_data(sleep, action);
642
643     if (simcall->issuer == action->comm.src_proc)
644       action->comm.src_timeout = sleep;
645     else
646       action->comm.dst_timeout = sleep;
647   }
648 }
649
650 void simcall_HANDLER_comm_test(smx_simcall_t simcall, smx_action_t action)
651 {
652   if(MC_is_active()){
653     simcall_comm_test__set__result(simcall, action->comm.src_proc && action->comm.dst_proc);
654     if(simcall_comm_test__get__result(simcall)){
655       action->state = SIMIX_DONE;
656       xbt_fifo_push(action->simcalls, simcall);
657       SIMIX_comm_finish(action);
658     }else{
659       SIMIX_simcall_answer(simcall);
660     }
661     return;
662   }
663
664   simcall_comm_test__set__result(simcall, (action->state != SIMIX_WAITING && action->state != SIMIX_RUNNING));
665   if (simcall_comm_test__get__result(simcall)) {
666     xbt_fifo_push(action->simcalls, simcall);
667     SIMIX_comm_finish(action);
668   } else {
669     SIMIX_simcall_answer(simcall);
670   }
671 }
672
673 void simcall_HANDLER_comm_testany(smx_simcall_t simcall, xbt_dynar_t actions)
674 {
675   unsigned int cursor;
676   smx_action_t action;
677   simcall_comm_testany__set__result(simcall, -1);
678
679   if (MC_is_active()){
680     int idx = SIMCALL_GET_MC_VALUE(simcall);
681     if(idx == -1){
682       SIMIX_simcall_answer(simcall);
683     }else{
684       action = xbt_dynar_get_as(actions, idx, smx_action_t);
685       simcall_comm_testany__set__result(simcall, idx);
686       xbt_fifo_push(action->simcalls, simcall);
687       action->state = SIMIX_DONE;
688       SIMIX_comm_finish(action);
689     }
690     return;
691   }
692
693   xbt_dynar_foreach(simcall_comm_testany__get__comms(simcall), cursor,action) {
694     if (action->state != SIMIX_WAITING && action->state != SIMIX_RUNNING) {
695       simcall_comm_testany__set__result(simcall, cursor);
696       xbt_fifo_push(action->simcalls, simcall);
697       SIMIX_comm_finish(action);
698       return;
699     }
700   }
701   SIMIX_simcall_answer(simcall);
702 }
703
704 void simcall_HANDLER_comm_waitany(smx_simcall_t simcall, xbt_dynar_t actions)
705 {
706   smx_action_t action;
707   unsigned int cursor = 0;
708
709   if (MC_is_active()){
710     int idx = SIMCALL_GET_MC_VALUE(simcall);
711     action = xbt_dynar_get_as(actions, idx, smx_action_t);
712     xbt_fifo_push(action->simcalls, simcall);
713     simcall_comm_waitany__set__result(simcall, idx);
714     action->state = SIMIX_DONE;
715     SIMIX_comm_finish(action);
716     return;
717   }
718
719   xbt_dynar_foreach(actions, cursor, action){
720     /* associate this simcall to the the action */
721     xbt_fifo_push(action->simcalls, simcall);
722
723     /* see if the action is already finished */
724     if (action->state != SIMIX_WAITING && action->state != SIMIX_RUNNING){
725       SIMIX_comm_finish(action);
726       break;
727     }
728   }
729 }
730
731 void SIMIX_waitany_remove_simcall_from_actions(smx_simcall_t simcall)
732 {
733   smx_action_t action;
734   unsigned int cursor = 0;
735   xbt_dynar_t actions = simcall_comm_waitany__get__comms(simcall);
736
737   xbt_dynar_foreach(actions, cursor, action) {
738     xbt_fifo_remove(action->simcalls, simcall);
739   }
740 }
741
742 /**
743  *  \brief Starts the simulation of a communication action.
744  *  \param action the communication action
745  */
746 static XBT_INLINE void SIMIX_comm_start(smx_action_t action)
747 {
748   /* If both the sender and the receiver are already there, start the communication */
749   if (action->state == SIMIX_READY) {
750
751     smx_host_t sender = action->comm.src_proc->smx_host;
752     smx_host_t receiver = action->comm.dst_proc->smx_host;
753
754     XBT_DEBUG("Starting communication %p from '%s' to '%s'", action,
755               SIMIX_host_get_name(sender), SIMIX_host_get_name(receiver));
756
757     action->comm.surf_comm = surf_workstation_model_communicate(surf_workstation_model,
758                                                                     sender, receiver,
759                                                                     action->comm.task_size, action->comm.rate);
760
761     surf_action_set_data(action->comm.surf_comm, action);
762
763     action->state = SIMIX_RUNNING;
764
765     /* If a link is failed, detect it immediately */
766     if (surf_action_get_state(action->comm.surf_comm) == SURF_ACTION_FAILED) {
767       XBT_DEBUG("Communication from '%s' to '%s' failed to start because of a link failure",
768                 SIMIX_host_get_name(sender), SIMIX_host_get_name(receiver));
769       action->state = SIMIX_LINK_FAILURE;
770       SIMIX_comm_destroy_internal_actions(action);
771     }
772
773     /* If any of the process is suspend, create the action but stop its execution,
774        it will be restarted when the sender process resume */
775     if (SIMIX_process_is_suspended(action->comm.src_proc) ||
776         SIMIX_process_is_suspended(action->comm.dst_proc)) {
777       /* FIXME: check what should happen with the action state */
778
779       if (SIMIX_process_is_suspended(action->comm.src_proc))
780         XBT_DEBUG("The communication is suspended on startup because src (%s:%s) were suspended since it initiated the communication",
781                   SIMIX_host_get_name(action->comm.src_proc->smx_host), action->comm.src_proc->name);
782       else
783         XBT_DEBUG("The communication is suspended on startup because dst (%s:%s) were suspended since it initiated the communication",
784                   SIMIX_host_get_name(action->comm.dst_proc->smx_host), action->comm.dst_proc->name);
785
786       surf_action_suspend(action->comm.surf_comm);
787
788     }
789   }
790 }
791
792 /**
793  * \brief Answers the SIMIX simcalls associated to a communication action.
794  * \param action a finished communication action
795  */
796 void SIMIX_comm_finish(smx_action_t action)
797 {
798   unsigned int destroy_count = 0;
799   smx_simcall_t simcall;
800
801
802   while ((simcall = xbt_fifo_shift(action->simcalls))) {
803
804     /* If a waitany simcall is waiting for this action to finish, then remove
805        it from the other actions in the waitany list. Afterwards, get the
806        position of the actual action in the waitany dynar and
807        return it as the result of the simcall */
808
809     if (simcall->call == SIMCALL_NONE) //FIXME: maybe a better way to handle this case
810       continue; // if process handling comm is killed
811     if (simcall->call == SIMCALL_COMM_WAITANY) {
812       SIMIX_waitany_remove_simcall_from_actions(simcall);
813       if (!MC_is_active())
814         simcall_comm_waitany__set__result(simcall, xbt_dynar_search(simcall_comm_waitany__get__comms(simcall), &action));
815     }
816
817     /* If the action is still in a rendez-vous point then remove from it */
818     if (action->comm.rdv)
819       SIMIX_rdv_remove(action->comm.rdv, action);
820
821     XBT_DEBUG("SIMIX_comm_finish: action state = %d", (int)action->state);
822
823     /* Check out for errors */
824     switch (action->state) {
825
826     case SIMIX_DONE:
827       XBT_DEBUG("Communication %p complete!", action);
828       SIMIX_comm_copy_data(action);
829       break;
830
831     case SIMIX_SRC_TIMEOUT:
832       SMX_EXCEPTION(simcall->issuer, timeout_error, 0,
833                     "Communication timeouted because of sender");
834       break;
835
836     case SIMIX_DST_TIMEOUT:
837       SMX_EXCEPTION(simcall->issuer, timeout_error, 0,
838                     "Communication timeouted because of receiver");
839       break;
840
841     case SIMIX_SRC_HOST_FAILURE:
842       if (simcall->issuer == action->comm.src_proc)
843         simcall->issuer->context->iwannadie = 1;
844 //          SMX_EXCEPTION(simcall->issuer, host_error, 0, "Host failed");
845       else
846         SMX_EXCEPTION(simcall->issuer, network_error, 0, "Remote peer failed");
847       break;
848
849     case SIMIX_DST_HOST_FAILURE:
850       if (simcall->issuer == action->comm.dst_proc)
851         simcall->issuer->context->iwannadie = 1;
852 //          SMX_EXCEPTION(simcall->issuer, host_error, 0, "Host failed");
853       else
854         SMX_EXCEPTION(simcall->issuer, network_error, 0, "Remote peer failed");
855       break;
856
857     case SIMIX_LINK_FAILURE:
858       XBT_DEBUG("Link failure in action %p between '%s' and '%s': posting an exception to the issuer: %s (%p) detached:%d",
859                 action,
860                 action->comm.src_proc ? sg_host_name(action->comm.src_proc->smx_host) : NULL,
861                 action->comm.dst_proc ? sg_host_name(action->comm.dst_proc->smx_host) : NULL,
862                 simcall->issuer->name, simcall->issuer, action->comm.detached);
863       if (action->comm.src_proc == simcall->issuer) {
864         XBT_DEBUG("I'm source");
865       } else if (action->comm.dst_proc == simcall->issuer) {
866         XBT_DEBUG("I'm dest");
867       } else {
868         XBT_DEBUG("I'm neither source nor dest");
869       }
870       SMX_EXCEPTION(simcall->issuer, network_error, 0, "Link failure");
871       break;
872
873     case SIMIX_CANCELED:
874       if (simcall->issuer == action->comm.dst_proc)
875         SMX_EXCEPTION(simcall->issuer, cancel_error, 0,
876                       "Communication canceled by the sender");
877       else
878         SMX_EXCEPTION(simcall->issuer, cancel_error, 0,
879                       "Communication canceled by the receiver");
880       break;
881
882     default:
883       xbt_die("Unexpected action state in SIMIX_comm_finish: %d", (int)action->state);
884     }
885
886     /* if there is an exception during a waitany or a testany, indicate the position of the failed communication */
887     if (simcall->issuer->doexception) {
888       if (simcall->call == SIMCALL_COMM_WAITANY) {
889         simcall->issuer->running_ctx->exception.value = xbt_dynar_search(simcall_comm_waitany__get__comms(simcall), &action);
890       }
891       else if (simcall->call == SIMCALL_COMM_TESTANY) {
892         simcall->issuer->running_ctx->exception.value = xbt_dynar_search(simcall_comm_testany__get__comms(simcall), &action);
893       }
894     }
895
896     if (surf_resource_get_state(surf_workstation_resource_priv(simcall->issuer->smx_host)) != SURF_RESOURCE_ON) {
897       simcall->issuer->context->iwannadie = 1;
898     }
899
900     simcall->issuer->waiting_action = NULL;
901     xbt_fifo_remove(simcall->issuer->comms, action);
902     if(action->comm.detached){
903       if(simcall->issuer == action->comm.src_proc){
904         if(action->comm.dst_proc)
905           xbt_fifo_remove(action->comm.dst_proc->comms, action);
906       }
907       if(simcall->issuer == action->comm.dst_proc){
908         if(action->comm.src_proc)
909           xbt_fifo_remove(action->comm.src_proc->comms, action);
910       }
911     }
912     SIMIX_simcall_answer(simcall);
913     destroy_count++;
914   }
915
916   while (destroy_count-- > 0)
917     SIMIX_comm_destroy(action);
918 }
919
920 /**
921  * \brief This function is called when a Surf communication action is finished.
922  * \param action the corresponding Simix communication
923  */
924 void SIMIX_post_comm(smx_action_t action)
925 {
926   /* Update action state */
927   if (action->comm.src_timeout &&
928       surf_action_get_state(action->comm.src_timeout) == SURF_ACTION_DONE)
929     action->state = SIMIX_SRC_TIMEOUT;
930   else if (action->comm.dst_timeout &&
931           surf_action_get_state(action->comm.dst_timeout) == SURF_ACTION_DONE)
932     action->state = SIMIX_DST_TIMEOUT;
933   else if (action->comm.src_timeout &&
934           surf_action_get_state(action->comm.src_timeout) == SURF_ACTION_FAILED)
935     action->state = SIMIX_SRC_HOST_FAILURE;
936   else if (action->comm.dst_timeout &&
937       surf_action_get_state(action->comm.dst_timeout) == SURF_ACTION_FAILED)
938     action->state = SIMIX_DST_HOST_FAILURE;
939   else if (action->comm.surf_comm &&
940           surf_action_get_state(action->comm.surf_comm) == SURF_ACTION_FAILED) {
941     XBT_DEBUG("Puta madre. Surf says that the link broke");
942     action->state = SIMIX_LINK_FAILURE;
943   } else
944     action->state = SIMIX_DONE;
945
946   XBT_DEBUG("SIMIX_post_comm: comm %p, state %d, src_proc %p, dst_proc %p, detached: %d",
947             action, (int)action->state, action->comm.src_proc, action->comm.dst_proc, action->comm.detached);
948
949   /* destroy the surf actions associated with the Simix communication */
950   SIMIX_comm_destroy_internal_actions(action);
951
952   /* if there are simcalls associated with the action, then answer them */
953   if (xbt_fifo_size(action->simcalls)) {
954     SIMIX_comm_finish(action);
955   }
956 }
957
958 void simcall_HANDLER_comm_cancel(smx_simcall_t simcall, smx_action_t action){
959   SIMIX_comm_cancel(action);
960 }
961 void SIMIX_comm_cancel(smx_action_t action)
962 {
963   /* if the action is a waiting state means that it is still in a rdv */
964   /* so remove from it and delete it */
965   if (action->state == SIMIX_WAITING) {
966     SIMIX_rdv_remove(action->comm.rdv, action);
967     action->state = SIMIX_CANCELED;
968   }
969   else if (!MC_is_active() /* when running the MC there are no surf actions */
970            && (action->state == SIMIX_READY || action->state == SIMIX_RUNNING)) {
971
972     surf_action_cancel(action->comm.surf_comm);
973   }
974 }
975
976 void SIMIX_comm_suspend(smx_action_t action)
977 {
978   /*FIXME: shall we suspend also the timeout actions? */
979   if (action->comm.surf_comm)
980     surf_action_suspend(action->comm.surf_comm);
981   /* in the other case, the action will be suspended on creation, in SIMIX_comm_start() */
982 }
983
984 void SIMIX_comm_resume(smx_action_t action)
985 {
986   /*FIXME: check what happen with the timeouts */
987   if (action->comm.surf_comm)
988     surf_action_resume(action->comm.surf_comm);
989   /* in the other case, the action were not really suspended yet, see SIMIX_comm_suspend() and SIMIX_comm_start() */
990 }
991
992
993 /************* Action Getters **************/
994
995 double simcall_HANDLER_comm_get_remains(smx_simcall_t simcall, smx_action_t action){
996   return SIMIX_comm_get_remains(action);
997 }
998 /**
999  *  \brief get the amount remaining from the communication
1000  *  \param action The communication
1001  */
1002 double SIMIX_comm_get_remains(smx_action_t action)
1003 {
1004   double remains;
1005
1006   if(!action){
1007     return 0;
1008   }
1009
1010   switch (action->state) {
1011
1012   case SIMIX_RUNNING:
1013     remains = surf_action_get_remains(action->comm.surf_comm);
1014     break;
1015
1016   case SIMIX_WAITING:
1017   case SIMIX_READY:
1018     remains = 0; /*FIXME: check what should be returned */
1019     break;
1020
1021   default:
1022     remains = 0; /*FIXME: is this correct? */
1023     break;
1024   }
1025   return remains;
1026 }
1027
1028 e_smx_state_t simcall_HANDLER_comm_get_state(smx_simcall_t simcall, smx_action_t action){
1029   return SIMIX_comm_get_state(action);
1030 }
1031 e_smx_state_t SIMIX_comm_get_state(smx_action_t action)
1032 {
1033   return action->state;
1034 }
1035
1036 void* simcall_HANDLER_comm_get_src_data(smx_simcall_t simcall, smx_action_t action){
1037   return SIMIX_comm_get_src_data(action);
1038 }
1039 /**
1040  *  \brief Return the user data associated to the sender of the communication
1041  *  \param action The communication
1042  *  \return the user data
1043  */
1044 void* SIMIX_comm_get_src_data(smx_action_t action)
1045 {
1046   return action->comm.src_data;
1047 }
1048
1049 void* simcall_HANDLER_comm_get_dst_data(smx_simcall_t simcall, smx_action_t action){
1050   return SIMIX_comm_get_dst_data(action);
1051 }
1052 /**
1053  *  \brief Return the user data associated to the receiver of the communication
1054  *  \param action The communication
1055  *  \return the user data
1056  */
1057 void* SIMIX_comm_get_dst_data(smx_action_t action)
1058 {
1059   return action->comm.dst_data;
1060 }
1061
1062 smx_process_t simcall_HANDLER_comm_get_src_proc(smx_simcall_t simcall, smx_action_t action){
1063   return SIMIX_comm_get_src_proc(action);
1064 }
1065 smx_process_t SIMIX_comm_get_src_proc(smx_action_t action)
1066 {
1067   return action->comm.src_proc;
1068 }
1069
1070 smx_process_t simcall_HANDLER_comm_get_dst_proc(smx_simcall_t simcall, smx_action_t action){
1071   return SIMIX_comm_get_dst_proc(action);
1072 }
1073 smx_process_t SIMIX_comm_get_dst_proc(smx_action_t action)
1074 {
1075   return action->comm.dst_proc;
1076 }
1077
1078 #ifdef HAVE_LATENCY_BOUND_TRACKING
1079 int simcall_HANDLER_comm_is_latency_bounded(smx_simcall_t simcall, smx_action_t action)
1080 {
1081   return SIMIX_comm_is_latency_bounded(action);
1082 }
1083
1084 /**
1085  *  \brief verify if communication is latency bounded
1086  *  \param comm The communication
1087  */
1088 int SIMIX_comm_is_latency_bounded(smx_action_t action)
1089 {
1090   if(!action){
1091     return 0;
1092   }
1093   if (action->comm.surf_comm){
1094     XBT_DEBUG("Getting latency limited for surf_action (%p)", action->comm.surf_comm);
1095     action->latency_limited = surf_network_action_get_latency_limited(action->comm.surf_comm);
1096     XBT_DEBUG("Action limited is %d", action->latency_limited);
1097   }
1098   return action->latency_limited;
1099 }
1100 #endif
1101
1102 /******************************************************************************/
1103 /*                    SIMIX_comm_copy_data callbacks                       */
1104 /******************************************************************************/
1105 static void (*SIMIX_comm_copy_data_callback) (smx_action_t, void*, size_t) =
1106   &SIMIX_comm_copy_pointer_callback;
1107
1108 void
1109 SIMIX_comm_set_copy_data_callback(void (*callback) (smx_action_t, void*, size_t))
1110 {
1111   SIMIX_comm_copy_data_callback = callback;
1112 }
1113
1114 void SIMIX_comm_copy_pointer_callback(smx_action_t comm, void* buff, size_t buff_size)
1115 {
1116   xbt_assert((buff_size == sizeof(void *)),
1117              "Cannot copy %zu bytes: must be sizeof(void*)", buff_size);
1118   *(void **) (comm->comm.dst_buff) = buff;
1119 }
1120
1121 void SIMIX_comm_copy_buffer_callback(smx_action_t comm, void* buff, size_t buff_size)
1122 {
1123   XBT_DEBUG("Copy the data over");
1124   memcpy(comm->comm.dst_buff, buff, buff_size);
1125   if (comm->comm.detached) { // if this is a detached send, the source buffer was duplicated by SMPI sender to make the original buffer available to the application ASAP
1126     xbt_free(buff);
1127     comm->comm.src_buff = NULL;
1128   }
1129 }
1130
1131
1132 /**
1133  *  \brief Copy the communication data from the sender's buffer to the receiver's one
1134  *  \param comm The communication
1135  */
1136 void SIMIX_comm_copy_data(smx_action_t comm)
1137 {
1138   size_t buff_size = comm->comm.src_buff_size;
1139   /* If there is no data to be copy then return */
1140   if (!comm->comm.src_buff || !comm->comm.dst_buff || comm->comm.copied)
1141     return;
1142
1143   XBT_DEBUG("Copying comm %p data from %s (%p) -> %s (%p) (%zu bytes)",
1144             comm,
1145             comm->comm.src_proc ? sg_host_name(comm->comm.src_proc->smx_host) : "a finished process",
1146             comm->comm.src_buff,
1147             comm->comm.dst_proc ? sg_host_name(comm->comm.dst_proc->smx_host) : "a finished process",
1148             comm->comm.dst_buff, buff_size);
1149
1150   /* Copy at most dst_buff_size bytes of the message to receiver's buffer */
1151   if (comm->comm.dst_buff_size)
1152     buff_size = MIN(buff_size, *(comm->comm.dst_buff_size));
1153
1154   /* Update the receiver's buffer size to the copied amount */
1155   if (comm->comm.dst_buff_size)
1156     *comm->comm.dst_buff_size = buff_size;
1157
1158   if (buff_size > 0){
1159       if(comm->comm.copy_data_fun)
1160         comm->comm.copy_data_fun (comm, comm->comm.src_buff, buff_size);
1161       else
1162         SIMIX_comm_copy_data_callback (comm, comm->comm.src_buff, buff_size);
1163   }
1164
1165
1166   /* Set the copied flag so we copy data only once */
1167   /* (this function might be called from both communication ends) */
1168   comm->comm.copied = 1;
1169 }