Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
Remove comm destroy simcall
[simgrid.git] / src / simix / smx_network.c
1 /* Copyright (c) 2009-2014. The SimGrid Team.
2  * All rights reserved.                                                     */
3
4 /* This program is free software; you can redistribute it and/or modify it
5  * under the terms of the license (GNU LGPL) which comes with this package. */
6
7 #include "smx_private.h"
8 #include "xbt/log.h"
9 #include "mc/mc.h"
10 #include "xbt/dict.h"
11
/* Log category used by every XBT_DEBUG call of this file. */
XBT_LOG_NEW_DEFAULT_SUBCATEGORY(simix_network, simix,
                                "Logging specific to SIMIX (network)");

/* Registry of the named rendez-vous points, keyed by name. It is created by
 * SIMIX_network_init() with SIMIX_rdv_free() as value destructor and freed by
 * SIMIX_network_exit(). Anonymous rendez-vous points are never stored here. */
static xbt_dict_t rdv_points = NULL;
/* Number of communication actions: incremented by SIMIX_comm_new() and
 * decremented again wherever a freshly created action turns out to be
 * useless. SIMIX_network_init() hands its name to MC_ignore_global_variable()
 * when the model checker is active. */
XBT_EXPORT_NO_IMPORT(unsigned long int) smx_total_comms = 0;

/* Prototypes of the file-local (static) helpers, so that they can be used
 * before their definitions. */
static void SIMIX_waitany_remove_simcall_from_actions(smx_simcall_t simcall);
static void SIMIX_comm_copy_data(smx_action_t comm);
static smx_action_t SIMIX_comm_new(e_smx_comm_type_t type);
static XBT_INLINE void SIMIX_rdv_push(smx_rdv_t rdv, smx_action_t comm);
static smx_action_t SIMIX_fifo_probe_comm(xbt_fifo_t fifo, e_smx_comm_type_t type,
                                        int (*match_fun)(void *, void *,smx_action_t),
                                        void *user_data, smx_action_t my_action);
static smx_action_t SIMIX_fifo_get_comm(xbt_fifo_t fifo, e_smx_comm_type_t type,
                                        int (*match_fun)(void *, void *,smx_action_t),
                                        void *user_data, smx_action_t my_action);
static void SIMIX_rdv_free(void *data);
static void SIMIX_comm_start(smx_action_t action);
30
31 void SIMIX_network_init(void)
32 {
33   rdv_points = xbt_dict_new_homogeneous(SIMIX_rdv_free);
34   if(MC_is_active())
35     MC_ignore_global_variable("smx_total_comms");
36 }
37
38 void SIMIX_network_exit(void)
39 {
40   xbt_dict_free(&rdv_points);
41 }
42
43 /******************************************************************************/
44 /*                           Rendez-Vous Points                               */
45 /******************************************************************************/
46
47 smx_rdv_t SIMIX_pre_rdv_create(smx_simcall_t simcall, const char *name){
48   return SIMIX_rdv_create(name);
49 }
50 smx_rdv_t SIMIX_rdv_create(const char *name)
51 {
52   /* two processes may have pushed the same rdv_create simcall at the same time */
53   smx_rdv_t rdv = name ? xbt_dict_get_or_null(rdv_points, name) : NULL;
54
55   if (!rdv) {
56     rdv = xbt_new0(s_smx_rvpoint_t, 1);
57     rdv->name = name ? xbt_strdup(name) : NULL;
58     rdv->comm_fifo = xbt_fifo_new();
59     rdv->done_comm_fifo = xbt_fifo_new();
60     rdv->permanent_receiver=NULL;
61
62     XBT_DEBUG("Creating a mailbox at %p with name %s\n", rdv, name);
63
64     if (rdv->name)
65       xbt_dict_set(rdv_points, rdv->name, rdv, NULL);
66   }
67   return rdv;
68 }
69
70 void SIMIX_pre_rdv_destroy(smx_simcall_t simcall, smx_rdv_t rdv){
71   return SIMIX_rdv_destroy(rdv);
72 }
73 void SIMIX_rdv_destroy(smx_rdv_t rdv)
74 {
75   if (rdv->name)
76     xbt_dict_remove(rdv_points, rdv->name);
77 }
78
79 void SIMIX_rdv_free(void *data)
80 {
81   XBT_DEBUG("rdv free %p", data);
82   smx_rdv_t rdv = (smx_rdv_t) data;
83   xbt_free(rdv->name);
84   xbt_fifo_free(rdv->comm_fifo);
85   xbt_fifo_free(rdv->done_comm_fifo);
86
87   xbt_free(rdv);  
88 }
89
90 xbt_dict_t SIMIX_get_rdv_points()
91 {
92   return rdv_points;
93 }
94
95 smx_rdv_t SIMIX_pre_rdv_get_by_name(smx_simcall_t simcall, const char *name){
96   return SIMIX_rdv_get_by_name(name);
97 }
98 smx_rdv_t SIMIX_rdv_get_by_name(const char *name)
99 {
100   return xbt_dict_get_or_null(rdv_points, name);
101 }
102
103 int SIMIX_pre_rdv_comm_count_by_host(smx_simcall_t simcall, smx_rdv_t rdv, smx_host_t host){
104   return SIMIX_rdv_comm_count_by_host(rdv, host);
105 }
106 int SIMIX_rdv_comm_count_by_host(smx_rdv_t rdv, smx_host_t host)
107 {
108   smx_action_t comm = NULL;
109   xbt_fifo_item_t item = NULL;
110   int count = 0;
111
112   xbt_fifo_foreach(rdv->comm_fifo, item, comm, smx_action_t) {
113     if (comm->comm.src_proc->smx_host == host)
114       count++;
115   }
116
117   return count;
118 }
119
120 smx_action_t SIMIX_pre_rdv_get_head(smx_simcall_t simcall, smx_rdv_t rdv){
121   return SIMIX_rdv_get_head(rdv);
122 }
123 smx_action_t SIMIX_rdv_get_head(smx_rdv_t rdv)
124 {
125   return xbt_fifo_get_item_content(xbt_fifo_get_first_item(rdv->comm_fifo));
126 }
127
128 smx_process_t SIMIX_pre_rdv_get_receiver(smx_simcall_t simcall, smx_rdv_t rdv){
129   return SIMIX_rdv_get_receiver(rdv);
130 }
131 /**
132  *  \brief get the receiver (process associated to the mailbox)
133  *  \param rdv The rendez-vous point
134  *  \return process The receiving process (NULL if not set)
135  */
136 smx_process_t SIMIX_rdv_get_receiver(smx_rdv_t rdv)
137 {
138   return rdv->permanent_receiver;
139 }
140
141 void SIMIX_pre_rdv_set_receiver(smx_simcall_t simcall, smx_rdv_t rdv,
142                             smx_process_t process){
143   SIMIX_rdv_set_receiver(rdv, process);
144 }
145 /**
146  *  \brief set the receiver of the rendez vous point to allow eager sends
147  *  \param rdv The rendez-vous point
148  *  \param process The receiving process
149  */
150 void SIMIX_rdv_set_receiver(smx_rdv_t rdv, smx_process_t process)
151 {
152   rdv->permanent_receiver=process;
153 }
154
155 /**
156  *  \brief Pushes a communication action into a rendez-vous point
157  *  \param rdv The rendez-vous point
158  *  \param comm The communication action
159  */
160 static XBT_INLINE void SIMIX_rdv_push(smx_rdv_t rdv, smx_action_t comm)
161 {
162   xbt_fifo_push(rdv->comm_fifo, comm);
163   comm->comm.rdv = rdv;
164 }
165
166 /**
167  *  \brief Removes a communication action from a rendez-vous point
168  *  \param rdv The rendez-vous point
169  *  \param comm The communication action
170  */
171 XBT_INLINE void SIMIX_rdv_remove(smx_rdv_t rdv, smx_action_t comm)
172 {
173   xbt_fifo_remove(rdv->comm_fifo, comm);
174   comm->comm.rdv = NULL;
175 }
176
177 /**
178  *  \brief Checks if there is a communication action queued in a fifo matching our needs
179  *  \param type The type of communication we are looking for (comm_send, comm_recv)
180  *  \return The communication action if found, NULL otherwise
181  */
182 smx_action_t SIMIX_fifo_get_comm(xbt_fifo_t fifo, e_smx_comm_type_t type,
183                                  int (*match_fun)(void *, void *,smx_action_t),
184                                  void *this_user_data, smx_action_t my_action)
185 {
186   smx_action_t action;
187   xbt_fifo_item_t item;
188   void* other_user_data = NULL;
189
190   xbt_fifo_foreach(fifo, item, action, smx_action_t) {
191     if (action->comm.type == SIMIX_COMM_SEND) {
192       other_user_data = action->comm.src_data;
193     } else if (action->comm.type == SIMIX_COMM_RECEIVE) {
194       other_user_data = action->comm.dst_data;
195     }
196     if (action->comm.type == type &&
197         (!match_fun              ||              match_fun(this_user_data,  other_user_data, action)) &&
198         (!action->comm.match_fun || action->comm.match_fun(other_user_data, this_user_data,  my_action))) {
199       XBT_DEBUG("Found a matching communication action %p", action);
200       xbt_fifo_remove_item(fifo, item);
201       xbt_fifo_free_item(item);
202       action->comm.refcount++;
203 #ifdef HAVE_MC
204       action->comm.rdv_cpy = action->comm.rdv;
205 #endif
206       action->comm.rdv = NULL;
207       return action;
208     }
209     XBT_DEBUG("Sorry, communication action %p does not match our needs:"
210               " its type is %d but we are looking for a comm of type %d (or maybe the filtering didn't match)",
211               action, (int)action->comm.type, (int)type);
212   }
213   XBT_DEBUG("No matching communication action found");
214   return NULL;
215 }
216
217
218 /**
219  *  \brief Checks if there is a communication action queued in a fifo matching our needs, but leave it there
220  *  \param type The type of communication we are looking for (comm_send, comm_recv)
221  *  \return The communication action if found, NULL otherwise
222  */
223 smx_action_t SIMIX_fifo_probe_comm(xbt_fifo_t fifo, e_smx_comm_type_t type,
224                                  int (*match_fun)(void *, void *,smx_action_t),
225                                  void *this_user_data, smx_action_t my_action)
226 {
227   smx_action_t action;
228   xbt_fifo_item_t item;
229   void* other_user_data = NULL;
230
231   xbt_fifo_foreach(fifo, item, action, smx_action_t) {
232     if (action->comm.type == SIMIX_COMM_SEND) {
233       other_user_data = action->comm.src_data;
234     } else if (action->comm.type == SIMIX_COMM_RECEIVE) {
235       other_user_data = action->comm.dst_data;
236     }
237     if (action->comm.type == type &&
238         (!match_fun              ||              match_fun(this_user_data,  other_user_data, action)) &&
239         (!action->comm.match_fun || action->comm.match_fun(other_user_data, this_user_data,  my_action))) {
240       XBT_DEBUG("Found a matching communication action %p", action);
241       action->comm.refcount++;
242
243       return action;
244     }
245     XBT_DEBUG("Sorry, communication action %p does not match our needs:"
246               " its type is %d but we are looking for a comm of type %d (or maybe the filtering didn't match)",
247               action, (int)action->comm.type, (int)type);
248   }
249   XBT_DEBUG("No matching communication action found");
250   return NULL;
251 }
252 /******************************************************************************/
253 /*                            Communication Actions                            */
254 /******************************************************************************/
255
256 /**
257  *  \brief Creates a new communicate action
258  *  \param type The direction of communication (comm_send, comm_recv)
259  *  \return The new communicate action
260  */
261 smx_action_t SIMIX_comm_new(e_smx_comm_type_t type)
262 {
263   smx_action_t act;
264
265   /* alloc structures */
266   act = xbt_mallocator_get(simix_global->action_mallocator);
267
268   act->type = SIMIX_ACTION_COMMUNICATE;
269   act->state = SIMIX_WAITING;
270
271   /* set communication */
272   act->comm.type = type;
273   act->comm.refcount = 1;
274   act->comm.src_data=NULL;
275   act->comm.dst_data=NULL;
276
277
278 #ifdef HAVE_LATENCY_BOUND_TRACKING
279   //initialize with unknown value
280   act->latency_limited = -1;
281 #endif
282
283 #ifdef HAVE_TRACING
284   act->category = NULL;
285 #endif
286
287   XBT_DEBUG("Create communicate action %p", act);
288   ++smx_total_comms;
289
290   return act;
291 }
292
293 /**
294  *  \brief Destroy a communicate action
295  *  \param action The communicate action to be destroyed
296  */
297 void SIMIX_comm_destroy(smx_action_t action)
298 {
299   XBT_DEBUG("Destroy action %p (refcount: %d), state: %d",
300             action, action->comm.refcount, (int)action->state);
301
302   if (action->comm.refcount <= 0) {
303     xbt_backtrace_display_current();
304     xbt_die("The refcount of comm %p is already 0 before decreasing it. "
305             "That's a bug! If you didn't test and/or wait the same communication twice in your code, then the bug is SimGrid's...", action);
306   }
307   action->comm.refcount--;
308   if (action->comm.refcount > 0)
309       return;
310   XBT_DEBUG("Really free communication %p; refcount is now %d", action,
311             action->comm.refcount);
312
313 #ifdef HAVE_LATENCY_BOUND_TRACKING
314   action->latency_limited = SIMIX_comm_is_latency_bounded( action ) ;
315 #endif
316
317   xbt_free(action->name);
318   SIMIX_comm_destroy_internal_actions(action);
319
320   if (action->comm.detached && action->state != SIMIX_DONE) {
321     /* the communication has failed and was detached:
322      * we have to free the buffer */
323     if (action->comm.clean_fun) {
324       action->comm.clean_fun(action->comm.src_buff);
325     }
326     action->comm.src_buff = NULL;
327   }
328
329   if(action->comm.rdv)
330     SIMIX_rdv_remove(action->comm.rdv, action);
331
332   xbt_mallocator_release(simix_global->action_mallocator, action);
333 }
334
335 void SIMIX_comm_destroy_internal_actions(smx_action_t action)
336 {
337   if (action->comm.surf_comm){
338 #ifdef HAVE_LATENCY_BOUND_TRACKING
339     action->latency_limited = SIMIX_comm_is_latency_bounded(action);
340 #endif
341     surf_action_unref(action->comm.surf_comm);
342     action->comm.surf_comm = NULL;
343   }
344
345   if (action->comm.src_timeout){
346     surf_action_unref(action->comm.src_timeout);
347     action->comm.src_timeout = NULL;
348   }
349
350   if (action->comm.dst_timeout){
351     surf_action_unref(action->comm.dst_timeout);
352     action->comm.dst_timeout = NULL;
353   }
354 }
355
356 void SIMIX_pre_comm_send(smx_simcall_t simcall, smx_rdv_t rdv,
357                                   double task_size, double rate,
358                                   void *src_buff, size_t src_buff_size,
359                                   int (*match_fun)(void *, void *,smx_action_t),
360                                   void *data, double timeout){
361   smx_action_t comm = SIMIX_comm_isend(simcall->issuer, rdv, task_size, rate,
362                                        src_buff, src_buff_size, match_fun, NULL,
363                                        data, 0);
364   simcall->mc_value = 0;
365   SIMIX_pre_comm_wait(simcall, comm, timeout);
366 }
367 smx_action_t SIMIX_pre_comm_isend(smx_simcall_t simcall, smx_rdv_t rdv,
368                                   double task_size, double rate,
369                                   void *src_buff, size_t src_buff_size,
370                                   int (*match_fun)(void *, void *,smx_action_t),
371                                   void (*clean_fun)(void *), 
372                                   void *data, int detached){
373   return SIMIX_comm_isend(simcall->issuer, rdv, task_size, rate, src_buff,
374                           src_buff_size, match_fun, clean_fun, data, detached);
375
376 }
/**
 *  \brief Posts an asynchronous send on a rendez-vous point.
 *
 *  If rdv->comm_fifo holds a receive request matching this send (type and
 *  both sides' match_fun), that action is reused and becomes SIMIX_READY.
 *  Otherwise the newly created send action is used:
 *  - if the rendez-vous point has a permanent receiver, the action is marked
 *    SIMIX_READY for that receiver and pushed into rdv->done_comm_fifo;
 *  - otherwise it is queued on the rendez-vous point.
 *  The action is then recorded in src_proc->comms and filled with the
 *  sender-side information. Finally it is started; under the model checker
 *  it is only marked SIMIX_RUNNING.
 *
 *  \param src_proc The sending process
 *  \param rdv The rendez-vous point
 *  \param task_size Amount of data handed to the network model
 *  \param rate Requested rate; SIMIX_comm_irecv() treats -1.0 as "unset"
 *  \param src_buff Source buffer, stored in the action
 *  \param src_buff_size Source buffer size, stored in the action
 *  \param match_fun Optional filter used to pick the receiver (may be NULL)
 *  \param clean_fun Applied to src_buff by SIMIX_comm_destroy() when a
 *                   detached send ends in a state other than SIMIX_DONE
 *  \param data User data exposed to the receiver's filter
 *  \param detached If non-zero, the sender gives up its reference right away
 *  \return The communication action, or NULL when detached
 */
smx_action_t SIMIX_comm_isend(smx_process_t src_proc, smx_rdv_t rdv,
                              double task_size, double rate,
                              void *src_buff, size_t src_buff_size,
                              int (*match_fun)(void *, void *,smx_action_t),
                              void (*clean_fun)(void *), // frees src_buff if a detached send does not complete (see SIMIX_comm_destroy)
                              void *data,
                              int detached)
{
  XBT_DEBUG("send from %p\n", rdv);

  /* Prepare an action describing us, so that it gets passed to the user-provided filter of other side */
  smx_action_t this_action = SIMIX_comm_new(SIMIX_COMM_SEND);

  /* Look for communication action matching our needs. We also provide a description of
   * ourself so that the other side also gets a chance of choosing if it wants to match with us.
   *
   * If it is not found then push our communication into the rendez-vous point */
  smx_action_t other_action = SIMIX_fifo_get_comm(rdv->comm_fifo, SIMIX_COMM_RECEIVE, match_fun, data, this_action);

  if (!other_action) {
    /* No receiver yet: our own action becomes the communication */
    other_action = this_action;

    if (rdv->permanent_receiver!=NULL){
      //this mailbox is for small messages, which have to be sent right now
      other_action->state = SIMIX_READY;
      other_action->comm.dst_proc=rdv->permanent_receiver;
      /* Extra reference, presumably held by done_comm_fifo: SIMIX_comm_irecv()
       * drops one after fetching the action from there. */
      other_action->comm.refcount++;
      xbt_fifo_push(rdv->done_comm_fifo,other_action);
      /* NOTE(review): comm.rdv is set although the action sits in
       * done_comm_fifo; SIMIX_rdv_remove() only searches comm_fifo. Confirm
       * this is intended. */
      other_action->comm.rdv=rdv;
      XBT_DEBUG("pushing a message into the permanent receive fifo %p, comm %p \n", rdv, &(other_action->comm));

    }else{
      SIMIX_rdv_push(rdv, this_action);
    }
  } else {
    XBT_DEBUG("Receive already pushed\n");

    /* The matched receive request is reused, so our own action is dropped */
    SIMIX_comm_destroy(this_action);
    --smx_total_comms; // this creation was a pure waste

    other_action->state = SIMIX_READY;
    other_action->comm.type = SIMIX_COMM_READY;

  }
  xbt_fifo_push(src_proc->comms, other_action);

  /* if the communication action is detached then decrease the refcount
   * by one, so it will be eliminated by the receiver's destroy call */
  if (detached) {
    other_action->comm.detached = 1;
    other_action->comm.refcount--;
    other_action->comm.clean_fun = clean_fun;
  } else {
    other_action->comm.clean_fun = NULL;
  }

  /* Setup the communication action */
  other_action->comm.src_proc = src_proc;
  other_action->comm.task_size = task_size;
  other_action->comm.rate = rate;
  other_action->comm.src_buff = src_buff;
  other_action->comm.src_buff_size = src_buff_size;
  other_action->comm.src_data = data;

  other_action->comm.match_fun = match_fun;

  /* Under the model checker, the network transfer is not simulated */
  if (MC_is_active()) {
    other_action->state = SIMIX_RUNNING;
    return (detached ? NULL : other_action);
  }

  SIMIX_comm_start(other_action);
  return (detached ? NULL : other_action);
}
451
452 void SIMIX_pre_comm_recv(smx_simcall_t simcall, smx_rdv_t rdv,
453                          void *dst_buff, size_t *dst_buff_size,
454                          int (*match_fun)(void *, void *, smx_action_t),
455                          void *data, double timeout, double rate)
456 {
457   smx_action_t comm = SIMIX_comm_irecv(simcall->issuer, rdv, dst_buff,
458                                        dst_buff_size, match_fun, data, rate);
459   simcall->mc_value = 0;
460   SIMIX_pre_comm_wait(simcall, comm, timeout);
461 }
462
463 smx_action_t SIMIX_pre_comm_irecv(smx_simcall_t simcall, smx_rdv_t rdv,
464                                   void *dst_buff, size_t *dst_buff_size,
465                                   int (*match_fun)(void *, void *, smx_action_t),
466                                   void *data, double rate)
467 {
468   return SIMIX_comm_irecv(simcall->issuer, rdv, dst_buff, dst_buff_size,
469                           match_fun, data, rate);
470 }
471
/**
 *  \brief Posts an asynchronous receive on a rendez-vous point.
 *
 *  There are two lookup paths:
 *  - If the rendez-vous point has a permanent receiver and a non-empty
 *    done_comm_fifo, a matching send is first searched there (sends that
 *    SIMIX_comm_isend() already attached to the permanent receiver). If none
 *    matches, our new receive action is queued on the rendez-vous point.
 *  - Otherwise a matching send is searched in rdv->comm_fifo. If one is
 *    found it becomes SIMIX_READY; if not, our new receive action is queued.
 *  The receiver side of the resulting action is then filled in. Finally the
 *  action is started; under the model checker it is only marked
 *  SIMIX_RUNNING.
 *
 *  \param dst_proc The receiving process
 *  \param rdv The rendez-vous point
 *  \param dst_buff Destination buffer, stored in the action
 *  \param dst_buff_size Pointer to the destination buffer size, stored in the action
 *  \param match_fun Optional filter used to pick the sender (may be NULL)
 *  \param data User data exposed to the sender's filter
 *  \param rate Requested rate; -1.0 means unset, and the lower of the two
 *              sides' rates is kept
 *  \return The communication action
 */
smx_action_t SIMIX_comm_irecv(smx_process_t dst_proc, smx_rdv_t rdv,
                              void *dst_buff, size_t *dst_buff_size,
                              int (*match_fun)(void *, void *, smx_action_t),
                              void *data, double rate)
{
  XBT_DEBUG("recv from %p %p\n", rdv, rdv->comm_fifo);
  /* Prepare an action describing us, so that it gets passed to the user-provided filter of other side */
  smx_action_t this_action = SIMIX_comm_new(SIMIX_COMM_RECEIVE);

  smx_action_t other_action;
  //communication already done, get it inside the fifo of completed comms
  //permanent receive v1
  //int already_received=0;
  if(rdv->permanent_receiver && xbt_fifo_size(rdv->done_comm_fifo)!=0){
    /* NOTE(review): unlike the other branch, this path never pushes the
     * resulting action into dst_proc->comms. Confirm whether this is
     * intended. */

    XBT_DEBUG("We have a comm that has probably already been received, trying to match it, to skip the communication\n");
    //find a match in the already received fifo
    other_action = SIMIX_fifo_get_comm(rdv->done_comm_fifo, SIMIX_COMM_SEND, match_fun, data, this_action);
    //if not found, assume the receiver came first, register it to the mailbox in the classical way
    if (!other_action)  {
      XBT_DEBUG("We have messages in the permanent receive list, but not the one we are looking for, pushing request into fifo\n");
      other_action = this_action;
      SIMIX_rdv_push(rdv, this_action);
    }else{
      /* The transfer already completed in surf: mark the comm as done */
      if(other_action->comm.surf_comm &&        SIMIX_comm_get_remains(other_action)==0.0)
      {
        XBT_DEBUG("comm %p has been already sent, and is finished, destroy it\n",&(other_action->comm));
        other_action->state = SIMIX_DONE;
        other_action->comm.type = SIMIX_COMM_DONE;
        other_action->comm.rdv = NULL;
        /* disabled code kept from an earlier version: */
        //SIMIX_comm_destroy(this_action);
        //--smx_total_comms; // this creation was a pure waste
        //already_received=1;
        //other_action->comm.refcount--;
      }/*else{
         XBT_DEBUG("Not yet finished, we have to wait %d\n", xbt_fifo_size(rdv->comm_fifo));
         }*/
      /* SIMIX_fifo_get_comm() added a reference for us; this drops one,
       * presumably the extra reference SIMIX_comm_isend() took when pushing
       * the action into done_comm_fifo. */
      other_action->comm.refcount--;
      /* The matched send is reused, so our own action is dropped */
      SIMIX_comm_destroy(this_action);
      --smx_total_comms; // this creation was a pure waste
    }
  }else{
    /* Look for communication action matching our needs. We also provide a description of
     * ourself so that the other side also gets a chance of choosing if it wants to match with us.
     *
     * If it is not found then push our communication into the rendez-vous point */
    other_action = SIMIX_fifo_get_comm(rdv->comm_fifo, SIMIX_COMM_SEND, match_fun, data, this_action);

    if (!other_action) {
      XBT_DEBUG("Receive pushed first %d\n", xbt_fifo_size(rdv->comm_fifo));
      other_action = this_action;
      SIMIX_rdv_push(rdv, this_action);
    } else {
      /* The matched send is reused, so our own action is dropped */
      SIMIX_comm_destroy(this_action);
      --smx_total_comms; // this creation was a pure waste
      other_action->state = SIMIX_READY;
      other_action->comm.type = SIMIX_COMM_READY;
      //other_action->comm.refcount--;
    }
    xbt_fifo_push(dst_proc->comms, other_action);
  }

  /* Setup communication action */
  other_action->comm.dst_proc = dst_proc;
  other_action->comm.dst_buff = dst_buff;
  other_action->comm.dst_buff_size = dst_buff_size;
  other_action->comm.dst_data = data;

  /* Keep the lower of the two requested rates (-1.0 means unset) */
  if (rate != -1.0 &&
      (other_action->comm.rate == -1.0 || rate < other_action->comm.rate))
    other_action->comm.rate = rate;

  other_action->comm.match_fun = match_fun;


  /*if(already_received)//do the actual copy, because the first one after the comm didn't have all the info
    SIMIX_comm_copy_data(other_action);*/


  /* Under the model checker, the network transfer is not simulated */
  if (MC_is_active()) {
    other_action->state = SIMIX_RUNNING;
    return other_action;
  }

  SIMIX_comm_start(other_action);
  // }
  return other_action;
}
561
562 smx_action_t SIMIX_pre_comm_iprobe(smx_simcall_t simcall, smx_rdv_t rdv,
563                                    int src, int tag,
564                                    int (*match_fun)(void *, void *, smx_action_t),
565                                    void *data){
566   return SIMIX_comm_iprobe(simcall->issuer, rdv, src, tag, match_fun, data);
567 }
568
569 smx_action_t SIMIX_comm_iprobe(smx_process_t dst_proc, smx_rdv_t rdv, int src,
570                               int tag, int (*match_fun)(void *, void *, smx_action_t), void *data)
571 {
572   XBT_DEBUG("iprobe from %p %p\n", rdv, rdv->comm_fifo);
573   smx_action_t this_action = SIMIX_comm_new(SIMIX_COMM_RECEIVE);
574
575   smx_action_t other_action=NULL;
576   if(rdv->permanent_receiver && xbt_fifo_size(rdv->done_comm_fifo)!=0){
577     //find a match in the already received fifo
578       XBT_DEBUG("first try in the perm recv mailbox \n");
579
580     other_action = SIMIX_fifo_probe_comm(rdv->done_comm_fifo, SIMIX_COMM_SEND, match_fun, data, this_action);
581   }
582  // }else{
583     if(!other_action){
584         XBT_DEBUG("second try in the other mailbox");
585         other_action = SIMIX_fifo_probe_comm(rdv->comm_fifo, SIMIX_COMM_SEND, match_fun, data, this_action);
586     }
587 //  }
588   if(other_action)other_action->comm.refcount--;
589
590   SIMIX_comm_destroy(this_action);
591   --smx_total_comms;
592   return other_action;
593 }
594
595 void SIMIX_pre_comm_wait(smx_simcall_t simcall, smx_action_t action, double timeout)
596 {
597   /* the simcall may be a wait, a send or a recv */
598   surf_action_t sleep;
599
600   /* Associate this simcall to the wait action */
601   XBT_DEBUG("SIMIX_pre_comm_wait, %p", action);
602
603   xbt_fifo_push(action->simcalls, simcall);
604   simcall->issuer->waiting_action = action;
605
606   if (MC_is_active()) {
607     int idx = simcall->mc_value;
608     if (idx == 0) {
609       action->state = SIMIX_DONE;
610     } else {
611       /* If we reached this point, the wait simcall must have a timeout */
612       /* Otherwise it shouldn't be enabled and executed by the MC */
613       if (timeout == -1)
614         THROW_IMPOSSIBLE;
615
616       if (action->comm.src_proc == simcall->issuer)
617         action->state = SIMIX_SRC_TIMEOUT;
618       else
619         action->state = SIMIX_DST_TIMEOUT;
620     }
621
622     SIMIX_comm_finish(action);
623     return;
624   }
625
626   /* If the action has already finish perform the error handling, */
627   /* otherwise set up a waiting timeout on the right side         */
628   if (action->state != SIMIX_WAITING && action->state != SIMIX_RUNNING) {
629     SIMIX_comm_finish(action);
630   } else { /* if (timeout >= 0) { we need a surf sleep action even when there is no timeout, otherwise surf won't tell us when the host fails */
631     sleep = surf_workstation_sleep(simcall->issuer->smx_host, timeout);
632     surf_action_set_data(sleep, action);
633
634     if (simcall->issuer == action->comm.src_proc)
635       action->comm.src_timeout = sleep;
636     else
637       action->comm.dst_timeout = sleep;
638   }
639 }
640
641 void SIMIX_pre_comm_test(smx_simcall_t simcall, smx_action_t action)
642 {
643   if(MC_is_active()){
644     simcall_comm_test__set__result(simcall, action->comm.src_proc && action->comm.dst_proc);
645     if(simcall_comm_test__get__result(simcall)){
646       action->state = SIMIX_DONE;
647       xbt_fifo_push(action->simcalls, simcall);
648       SIMIX_comm_finish(action);
649     }else{
650       SIMIX_simcall_answer(simcall);
651     }
652     return;
653   }
654
655   simcall_comm_test__set__result(simcall, (action->state != SIMIX_WAITING && action->state != SIMIX_RUNNING));
656   if (simcall_comm_test__get__result(simcall)) {
657     xbt_fifo_push(action->simcalls, simcall);
658     SIMIX_comm_finish(action);
659   } else {
660     SIMIX_simcall_answer(simcall);
661   }
662 }
663
664 void SIMIX_pre_comm_testany(smx_simcall_t simcall, xbt_dynar_t actions)
665 {
666   unsigned int cursor;
667   smx_action_t action;
668   simcall_comm_testany__set__result(simcall, -1);
669
670   if (MC_is_active()){
671     int idx = simcall->mc_value;
672     if(idx == -1){
673       SIMIX_simcall_answer(simcall);
674     }else{
675       action = xbt_dynar_get_as(actions, idx, smx_action_t);
676       simcall_comm_testany__set__result(simcall, idx);
677       xbt_fifo_push(action->simcalls, simcall);
678       action->state = SIMIX_DONE;
679       SIMIX_comm_finish(action);
680     }
681     return;
682   }
683
684   xbt_dynar_foreach(simcall_comm_testany__get__comms(simcall), cursor,action) {
685     if (action->state != SIMIX_WAITING && action->state != SIMIX_RUNNING) {
686       simcall_comm_testany__set__result(simcall, cursor);
687       xbt_fifo_push(action->simcalls, simcall);
688       SIMIX_comm_finish(action);
689       return;
690     }
691   }
692   SIMIX_simcall_answer(simcall);
693 }
694
695 void SIMIX_pre_comm_waitany(smx_simcall_t simcall, xbt_dynar_t actions)
696 {
697   smx_action_t action;
698   unsigned int cursor = 0;
699
700   if (MC_is_active()){
701     int idx = simcall->mc_value;
702     action = xbt_dynar_get_as(actions, idx, smx_action_t);
703     xbt_fifo_push(action->simcalls, simcall);
704     simcall_comm_waitany__set__result(simcall, idx);
705     action->state = SIMIX_DONE;
706     SIMIX_comm_finish(action);
707     return;
708   }
709
710   xbt_dynar_foreach(actions, cursor, action){
711     /* associate this simcall to the the action */
712     xbt_fifo_push(action->simcalls, simcall);
713
714     /* see if the action is already finished */
715     if (action->state != SIMIX_WAITING && action->state != SIMIX_RUNNING){
716       SIMIX_comm_finish(action);
717       break;
718     }
719   }
720 }
721
722 void SIMIX_waitany_remove_simcall_from_actions(smx_simcall_t simcall)
723 {
724   smx_action_t action;
725   unsigned int cursor = 0;
726   xbt_dynar_t actions = simcall_comm_waitany__get__comms(simcall);
727
728   xbt_dynar_foreach(actions, cursor, action) {
729     xbt_fifo_remove(action->simcalls, simcall);
730   }
731 }
732
733 /**
734  *  \brief Starts the simulation of a communication action.
735  *  \param action the communication action
736  */
737 static XBT_INLINE void SIMIX_comm_start(smx_action_t action)
738 {
739   /* If both the sender and the receiver are already there, start the communication */
740   if (action->state == SIMIX_READY) {
741
742     smx_host_t sender = action->comm.src_proc->smx_host;
743     smx_host_t receiver = action->comm.dst_proc->smx_host;
744
745     XBT_DEBUG("Starting communication %p from '%s' to '%s'", action,
746               SIMIX_host_get_name(sender), SIMIX_host_get_name(receiver));
747
748     action->comm.surf_comm = surf_workstation_model_communicate(surf_workstation_model,
749                                                                     sender, receiver,
750                                                                     action->comm.task_size, action->comm.rate);
751
752     surf_action_set_data(action->comm.surf_comm, action);
753
754     action->state = SIMIX_RUNNING;
755
756     /* If a link is failed, detect it immediately */
757     if (surf_action_get_state(action->comm.surf_comm) == SURF_ACTION_FAILED) {
758       XBT_DEBUG("Communication from '%s' to '%s' failed to start because of a link failure",
759                 SIMIX_host_get_name(sender), SIMIX_host_get_name(receiver));
760       action->state = SIMIX_LINK_FAILURE;
761       SIMIX_comm_destroy_internal_actions(action);
762     }
763
764     /* If any of the process is suspend, create the action but stop its execution,
765        it will be restarted when the sender process resume */
766     if (SIMIX_process_is_suspended(action->comm.src_proc) ||
767         SIMIX_process_is_suspended(action->comm.dst_proc)) {
768       /* FIXME: check what should happen with the action state */
769
770       if (SIMIX_process_is_suspended(action->comm.src_proc))
771         XBT_DEBUG("The communication is suspended on startup because src (%s:%s) were suspended since it initiated the communication",
772                   SIMIX_host_get_name(action->comm.src_proc->smx_host), action->comm.src_proc->name);
773       else
774         XBT_DEBUG("The communication is suspended on startup because dst (%s:%s) were suspended since it initiated the communication",
775                   SIMIX_host_get_name(action->comm.dst_proc->smx_host), action->comm.dst_proc->name);
776
777       surf_action_suspend(action->comm.surf_comm);
778
779     }
780   }
781 }
782
783 /**
784  * \brief Answers the SIMIX simcalls associated to a communication action.
785  * \param action a finished communication action
786  */
787 void SIMIX_comm_finish(smx_action_t action)
788 {
789   unsigned int destroy_count = 0;
790   smx_simcall_t simcall;
791
792
793   while ((simcall = xbt_fifo_shift(action->simcalls))) {
794
795     /* If a waitany simcall is waiting for this action to finish, then remove
796        it from the other actions in the waitany list. Afterwards, get the
797        position of the actual action in the waitany dynar and
798        return it as the result of the simcall */
799     if (simcall->call == SIMCALL_COMM_WAITANY) {
800       SIMIX_waitany_remove_simcall_from_actions(simcall);
801       if (!MC_is_active())
802         simcall_comm_waitany__set__result(simcall, xbt_dynar_search(simcall_comm_waitany__get__comms(simcall), &action));
803     }
804
805     /* If the action is still in a rendez-vous point then remove from it */
806     if (action->comm.rdv)
807       SIMIX_rdv_remove(action->comm.rdv, action);
808
809     XBT_DEBUG("SIMIX_comm_finish: action state = %d", (int)action->state);
810
811     /* Check out for errors */
812     switch (action->state) {
813
814     case SIMIX_DONE:
815       XBT_DEBUG("Communication %p complete!", action);
816       SIMIX_comm_copy_data(action);
817       break;
818
819     case SIMIX_SRC_TIMEOUT:
820       SMX_EXCEPTION(simcall->issuer, timeout_error, 0,
821                     "Communication timeouted because of sender");
822       break;
823
824     case SIMIX_DST_TIMEOUT:
825       SMX_EXCEPTION(simcall->issuer, timeout_error, 0,
826                     "Communication timeouted because of receiver");
827       break;
828
829     case SIMIX_SRC_HOST_FAILURE:
830       if (simcall->issuer == action->comm.src_proc)
831         simcall->issuer->context->iwannadie = 1;
832 //          SMX_EXCEPTION(simcall->issuer, host_error, 0, "Host failed");
833       else
834         SMX_EXCEPTION(simcall->issuer, network_error, 0, "Remote peer failed");
835       break;
836
837     case SIMIX_DST_HOST_FAILURE:
838       if (simcall->issuer == action->comm.dst_proc)
839         simcall->issuer->context->iwannadie = 1;
840 //          SMX_EXCEPTION(simcall->issuer, host_error, 0, "Host failed");
841       else
842         SMX_EXCEPTION(simcall->issuer, network_error, 0, "Remote peer failed");
843       break;
844
845     case SIMIX_LINK_FAILURE:
846       XBT_DEBUG("Link failure in action %p between '%s' and '%s': posting an exception to the issuer: %s (%p) detached:%d",
847                 action,
848                 action->comm.src_proc ? sg_host_name(action->comm.src_proc->smx_host) : NULL,
849                 action->comm.dst_proc ? sg_host_name(action->comm.dst_proc->smx_host) : NULL,
850                 simcall->issuer->name, simcall->issuer, action->comm.detached);
851       if (action->comm.src_proc == simcall->issuer) {
852         XBT_DEBUG("I'm source");
853       } else if (action->comm.dst_proc == simcall->issuer) {
854         XBT_DEBUG("I'm dest");
855       } else {
856         XBT_DEBUG("I'm neither source nor dest");
857       }
858       SMX_EXCEPTION(simcall->issuer, network_error, 0, "Link failure");
859       break;
860
861     case SIMIX_CANCELED:
862       if (simcall->issuer == action->comm.dst_proc)
863         SMX_EXCEPTION(simcall->issuer, cancel_error, 0,
864                       "Communication canceled by the sender");
865       else
866         SMX_EXCEPTION(simcall->issuer, cancel_error, 0,
867                       "Communication canceled by the receiver");
868       break;
869
870     default:
871       xbt_die("Unexpected action state in SIMIX_comm_finish: %d", (int)action->state);
872     }
873
874     /* if there is an exception during a waitany or a testany, indicate the position of the failed communication */
875     if (simcall->issuer->doexception) {
876       if (simcall->call == SIMCALL_COMM_WAITANY) {
877         simcall->issuer->running_ctx->exception.value = xbt_dynar_search(simcall_comm_waitany__get__comms(simcall), &action);
878       }
879       else if (simcall->call == SIMCALL_COMM_TESTANY) {
880         simcall->issuer->running_ctx->exception.value = xbt_dynar_search(simcall_comm_testany__get__comms(simcall), &action);
881       }
882     }
883
884     if (surf_resource_get_state(surf_workstation_resource_priv(simcall->issuer->smx_host)) != SURF_RESOURCE_ON) {
885       simcall->issuer->context->iwannadie = 1;
886     }
887
888     simcall->issuer->waiting_action = NULL;
889     xbt_fifo_remove(simcall->issuer->comms, action);
890     if(action->comm.detached){
891       smx_process_t proc;
892       int still_alive = 0;
893
894       if(simcall->issuer == action->comm.src_proc){
895         if(action->comm.dst_proc){
896             xbt_swag_foreach(proc, simix_global->process_list)
897             {
898                if(proc==action->comm.dst_proc){
899                    still_alive=1;
900                    break;
901                }
902             }
903         }
904         if(still_alive) xbt_fifo_remove(action->comm.dst_proc->comms, action);
905       }
906       if(simcall->issuer == action->comm.dst_proc){
907         if(action->comm.src_proc)
908           if(action->comm.dst_proc){
909             xbt_swag_foreach(proc, simix_global->process_list)
910             {
911               if(proc==action->comm.src_proc){
912                   still_alive=1;
913                   break;
914               }
915             }
916           }
917           if(still_alive) xbt_fifo_remove(action->comm.src_proc->comms, action);
918       }
919     }
920     SIMIX_simcall_answer(simcall);
921     destroy_count++;
922   }
923
924   while (destroy_count-- > 0)
925     SIMIX_comm_destroy(action);
926 }
927
928 /**
929  * \brief This function is called when a Surf communication action is finished.
930  * \param action the corresponding Simix communication
931  */
932 void SIMIX_post_comm(smx_action_t action)
933 {
934   /* Update action state */
935   if (action->comm.src_timeout &&
936       surf_action_get_state(action->comm.src_timeout) == SURF_ACTION_DONE)
937     action->state = SIMIX_SRC_TIMEOUT;
938   else if (action->comm.dst_timeout &&
939           surf_action_get_state(action->comm.dst_timeout) == SURF_ACTION_DONE)
940     action->state = SIMIX_DST_TIMEOUT;
941   else if (action->comm.src_timeout &&
942           surf_action_get_state(action->comm.src_timeout) == SURF_ACTION_FAILED)
943     action->state = SIMIX_SRC_HOST_FAILURE;
944   else if (action->comm.dst_timeout &&
945       surf_action_get_state(action->comm.dst_timeout) == SURF_ACTION_FAILED)
946     action->state = SIMIX_DST_HOST_FAILURE;
947   else if (action->comm.surf_comm &&
948           surf_action_get_state(action->comm.surf_comm) == SURF_ACTION_FAILED) {
949     XBT_DEBUG("Puta madre. Surf says that the link broke");
950     action->state = SIMIX_LINK_FAILURE;
951   } else
952     action->state = SIMIX_DONE;
953
954   XBT_DEBUG("SIMIX_post_comm: comm %p, state %d, src_proc %p, dst_proc %p, detached: %d",
955             action, (int)action->state, action->comm.src_proc, action->comm.dst_proc, action->comm.detached);
956
957   /* destroy the surf actions associated with the Simix communication */
958   SIMIX_comm_destroy_internal_actions(action);
959
960   /* remove the communication action from the list of pending communications
961    * of both processes (if they still exist) */
962   if (action->comm.src_proc) {
963     xbt_fifo_remove(action->comm.src_proc->comms, action);
964   }
965   if (action->comm.dst_proc) {
966     xbt_fifo_remove(action->comm.dst_proc->comms, action);
967   }
968
969   /* if there are simcalls associated with the action, then answer them */
970   if (xbt_fifo_size(action->simcalls)) {
971     SIMIX_comm_finish(action);
972   }
973 }
974
975 void SIMIX_pre_comm_cancel(smx_simcall_t simcall, smx_action_t action){
976   SIMIX_comm_cancel(action);
977 }
978 void SIMIX_comm_cancel(smx_action_t action)
979 {
980   /* if the action is a waiting state means that it is still in a rdv */
981   /* so remove from it and delete it */
982   if (action->state == SIMIX_WAITING) {
983     SIMIX_rdv_remove(action->comm.rdv, action);
984     action->state = SIMIX_CANCELED;
985   }
986   else if (!MC_is_active() /* when running the MC there are no surf actions */
987            && (action->state == SIMIX_READY || action->state == SIMIX_RUNNING)) {
988
989     surf_action_cancel(action->comm.surf_comm);
990   }
991 }
992
993 void SIMIX_comm_suspend(smx_action_t action)
994 {
995   /*FIXME: shall we suspend also the timeout actions? */
996   if (action->comm.surf_comm)
997     surf_action_suspend(action->comm.surf_comm);
998   /* in the other case, the action will be suspended on creation, in SIMIX_comm_start() */
999 }
1000
1001 void SIMIX_comm_resume(smx_action_t action)
1002 {
1003   /*FIXME: check what happen with the timeouts */
1004   if (action->comm.surf_comm)
1005     surf_action_resume(action->comm.surf_comm);
1006   /* in the other case, the action were not really suspended yet, see SIMIX_comm_suspend() and SIMIX_comm_start() */
1007 }
1008
1009
1010 /************* Action Getters **************/
1011
1012 double SIMIX_pre_comm_get_remains(smx_simcall_t simcall, smx_action_t action){
1013   return SIMIX_comm_get_remains(action);
1014 }
1015 /**
1016  *  \brief get the amount remaining from the communication
1017  *  \param action The communication
1018  */
1019 double SIMIX_comm_get_remains(smx_action_t action)
1020 {
1021   double remains;
1022
1023   if(!action){
1024     return 0;
1025   }
1026
1027   switch (action->state) {
1028
1029   case SIMIX_RUNNING:
1030     remains = surf_action_get_remains(action->comm.surf_comm);
1031     break;
1032
1033   case SIMIX_WAITING:
1034   case SIMIX_READY:
1035     remains = 0; /*FIXME: check what should be returned */
1036     break;
1037
1038   default:
1039     remains = 0; /*FIXME: is this correct? */
1040     break;
1041   }
1042   return remains;
1043 }
1044
1045 e_smx_state_t SIMIX_pre_comm_get_state(smx_simcall_t simcall, smx_action_t action){
1046   return SIMIX_comm_get_state(action);
1047 }
1048 e_smx_state_t SIMIX_comm_get_state(smx_action_t action)
1049 {
1050   return action->state;
1051 }
1052
1053 void* SIMIX_pre_comm_get_src_data(smx_simcall_t simcall, smx_action_t action){
1054   return SIMIX_comm_get_src_data(action);
1055 }
1056 /**
1057  *  \brief Return the user data associated to the sender of the communication
1058  *  \param action The communication
1059  *  \return the user data
1060  */
1061 void* SIMIX_comm_get_src_data(smx_action_t action)
1062 {
1063   return action->comm.src_data;
1064 }
1065
1066 void* SIMIX_pre_comm_get_dst_data(smx_simcall_t simcall, smx_action_t action){
1067   return SIMIX_comm_get_dst_data(action);
1068 }
1069 /**
1070  *  \brief Return the user data associated to the receiver of the communication
1071  *  \param action The communication
1072  *  \return the user data
1073  */
1074 void* SIMIX_comm_get_dst_data(smx_action_t action)
1075 {
1076   return action->comm.dst_data;
1077 }
1078
1079 smx_process_t SIMIX_pre_comm_get_src_proc(smx_simcall_t simcall, smx_action_t action){
1080   return SIMIX_comm_get_src_proc(action);
1081 }
1082 smx_process_t SIMIX_comm_get_src_proc(smx_action_t action)
1083 {
1084   return action->comm.src_proc;
1085 }
1086
1087 smx_process_t SIMIX_pre_comm_get_dst_proc(smx_simcall_t simcall, smx_action_t action){
1088   return SIMIX_comm_get_dst_proc(action);
1089 }
1090 smx_process_t SIMIX_comm_get_dst_proc(smx_action_t action)
1091 {
1092   return action->comm.dst_proc;
1093 }
1094
#ifdef HAVE_LATENCY_BOUND_TRACKING
int SIMIX_pre_comm_is_latency_bounded(smx_simcall_t simcall, smx_action_t action)
{
  return SIMIX_comm_is_latency_bounded(action);
}

/**
 *  \brief verify if communication is latency bounded
 *
 *  When a surf action exists, the cached action->latency_limited flag is
 *  refreshed from surf before being returned; otherwise the cached value
 *  is returned as is.
 *  \param action The communication (NULL yields 0)
 */
int SIMIX_comm_is_latency_bounded(smx_action_t action)
{
  if (action == NULL)
    return 0;

  if (!action->comm.surf_comm)
    return action->latency_limited;

  XBT_DEBUG("Getting latency limited for surf_action (%p)", action->comm.surf_comm);
  action->latency_limited = surf_network_action_get_latency_limited(action->comm.surf_comm);
  XBT_DEBUG("Action limited is %d", action->latency_limited);
  return action->latency_limited;
}
#endif
1118
1119 /******************************************************************************/
1120 /*                    SIMIX_comm_copy_data callbacks                       */
1121 /******************************************************************************/
1122 static void (*SIMIX_comm_copy_data_callback) (smx_action_t, void*, size_t) =
1123   &SIMIX_comm_copy_pointer_callback;
1124
1125 void
1126 SIMIX_comm_set_copy_data_callback(void (*callback) (smx_action_t, void*, size_t))
1127 {
1128   SIMIX_comm_copy_data_callback = callback;
1129 }
1130
1131 void SIMIX_comm_copy_pointer_callback(smx_action_t comm, void* buff, size_t buff_size)
1132 {
1133   xbt_assert((buff_size == sizeof(void *)),
1134              "Cannot copy %zu bytes: must be sizeof(void*)", buff_size);
1135   *(void **) (comm->comm.dst_buff) = buff;
1136 }
1137
1138 void SIMIX_comm_copy_buffer_callback(smx_action_t comm, void* buff, size_t buff_size)
1139 {
1140   XBT_DEBUG("Copy the data over");
1141   memcpy(comm->comm.dst_buff, buff, buff_size);
1142   if (comm->comm.detached) { // if this is a detached send, the source buffer was duplicated by SMPI sender to make the original buffer available to the application ASAP
1143     xbt_free(buff);
1144     comm->comm.src_buff = NULL;
1145   }
1146 }
1147
1148
1149 /**
1150  *  \brief Copy the communication data from the sender's buffer to the receiver's one
1151  *  \param comm The communication
1152  */
1153 void SIMIX_comm_copy_data(smx_action_t comm)
1154 {
1155   size_t buff_size = comm->comm.src_buff_size;
1156   /* If there is no data to be copy then return */
1157   if (!comm->comm.src_buff || !comm->comm.dst_buff || comm->comm.copied)
1158     return;
1159
1160   XBT_DEBUG("Copying comm %p data from %s (%p) -> %s (%p) (%zu bytes)",
1161             comm,
1162             comm->comm.src_proc ? sg_host_name(comm->comm.src_proc->smx_host) : "a finished process",
1163             comm->comm.src_buff,
1164             comm->comm.dst_proc ? sg_host_name(comm->comm.dst_proc->smx_host) : "a finished process",
1165             comm->comm.dst_buff, buff_size);
1166
1167   /* Copy at most dst_buff_size bytes of the message to receiver's buffer */
1168   if (comm->comm.dst_buff_size)
1169     buff_size = MIN(buff_size, *(comm->comm.dst_buff_size));
1170
1171   /* Update the receiver's buffer size to the copied amount */
1172   if (comm->comm.dst_buff_size)
1173     *comm->comm.dst_buff_size = buff_size;
1174
1175   if (buff_size > 0)
1176     SIMIX_comm_copy_data_callback (comm, comm->comm.src_buff, buff_size);
1177
1178   /* Set the copied flag so we copy data only once */
1179   /* (this function might be called from both communication ends) */
1180   comm->comm.copied = 1;
1181 }