Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
Typo.
[simgrid.git] / src / simix / smx_network.c
1 /* Copyright (c) 2009-2014. The SimGrid Team.
2  * All rights reserved.                                                     */
3
4 /* This program is free software; you can redistribute it and/or modify it
5  * under the terms of the license (GNU LGPL) which comes with this package. */
6
7 #include "smx_private.h"
8 #include "xbt/log.h"
9 #include "mc/mc.h"
10 #include "xbt/dict.h"
11
12 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(simix_network, simix,
13                                 "Logging specific to SIMIX (network)");
14
15 static xbt_dict_t rdv_points = NULL;
16 XBT_EXPORT_NO_IMPORT(unsigned long int) smx_total_comms = 0;
17
18 static void SIMIX_waitany_remove_simcall_from_actions(smx_simcall_t simcall);
19 static void SIMIX_comm_copy_data(smx_action_t comm);
20 static smx_action_t SIMIX_comm_new(e_smx_comm_type_t type);
21 static XBT_INLINE void SIMIX_rdv_push(smx_rdv_t rdv, smx_action_t comm);
22 static smx_action_t SIMIX_fifo_probe_comm(xbt_fifo_t fifo, e_smx_comm_type_t type,
23                                         int (*match_fun)(void *, void *,smx_action_t),
24                                         void *user_data, smx_action_t my_action);
25 static smx_action_t SIMIX_fifo_get_comm(xbt_fifo_t fifo, e_smx_comm_type_t type,
26                                         int (*match_fun)(void *, void *,smx_action_t),
27                                         void *user_data, smx_action_t my_action);
28 static void SIMIX_rdv_free(void *data);
29 static void SIMIX_comm_start(smx_action_t action);
30
31 void SIMIX_network_init(void)
32 {
33   rdv_points = xbt_dict_new_homogeneous(SIMIX_rdv_free);
34   if(MC_is_active())
35     MC_ignore_global_variable("smx_total_comms");
36 }
37
38 void SIMIX_network_exit(void)
39 {
40   xbt_dict_free(&rdv_points);
41 }
42
43 /******************************************************************************/
44 /*                           Rendez-Vous Points                               */
45 /******************************************************************************/
46
47 smx_rdv_t SIMIX_pre_rdv_create(smx_simcall_t simcall, const char *name){
48   return SIMIX_rdv_create(name);
49 }
50 smx_rdv_t SIMIX_rdv_create(const char *name)
51 {
52   /* two processes may have pushed the same rdv_create simcall at the same time */
53   smx_rdv_t rdv = name ? xbt_dict_get_or_null(rdv_points, name) : NULL;
54
55   if (!rdv) {
56     rdv = xbt_new0(s_smx_rvpoint_t, 1);
57     rdv->name = name ? xbt_strdup(name) : NULL;
58     rdv->comm_fifo = xbt_fifo_new();
59     rdv->done_comm_fifo = xbt_fifo_new();
60     rdv->permanent_receiver=NULL;
61
62     XBT_DEBUG("Creating a mailbox at %p with name %s\n", rdv, name);
63
64     if (rdv->name)
65       xbt_dict_set(rdv_points, rdv->name, rdv, NULL);
66   }
67   return rdv;
68 }
69
70 void SIMIX_pre_rdv_destroy(smx_simcall_t simcall, smx_rdv_t rdv){
71   return SIMIX_rdv_destroy(rdv);
72 }
73 void SIMIX_rdv_destroy(smx_rdv_t rdv)
74 {
75   if (rdv->name)
76     xbt_dict_remove(rdv_points, rdv->name);
77 }
78
79 void SIMIX_rdv_free(void *data)
80 {
81   XBT_DEBUG("rdv free %p", data);
82   smx_rdv_t rdv = (smx_rdv_t) data;
83   xbt_free(rdv->name);
84   xbt_fifo_free(rdv->comm_fifo);
85   xbt_fifo_free(rdv->done_comm_fifo);
86
87   xbt_free(rdv);  
88 }
89
90 xbt_dict_t SIMIX_get_rdv_points()
91 {
92   return rdv_points;
93 }
94
95 smx_rdv_t SIMIX_pre_rdv_get_by_name(smx_simcall_t simcall, const char *name){
96   return SIMIX_rdv_get_by_name(name);
97 }
98 smx_rdv_t SIMIX_rdv_get_by_name(const char *name)
99 {
100   return xbt_dict_get_or_null(rdv_points, name);
101 }
102
103 int SIMIX_pre_rdv_comm_count_by_host(smx_simcall_t simcall, smx_rdv_t rdv, smx_host_t host){
104   return SIMIX_rdv_comm_count_by_host(rdv, host);
105 }
106 int SIMIX_rdv_comm_count_by_host(smx_rdv_t rdv, smx_host_t host)
107 {
108   smx_action_t comm = NULL;
109   xbt_fifo_item_t item = NULL;
110   int count = 0;
111
112   xbt_fifo_foreach(rdv->comm_fifo, item, comm, smx_action_t) {
113     if (comm->comm.src_proc->smx_host == host)
114       count++;
115   }
116
117   return count;
118 }
119
120 smx_action_t SIMIX_pre_rdv_get_head(smx_simcall_t simcall, smx_rdv_t rdv){
121   return SIMIX_rdv_get_head(rdv);
122 }
123 smx_action_t SIMIX_rdv_get_head(smx_rdv_t rdv)
124 {
125   return xbt_fifo_get_item_content(xbt_fifo_get_first_item(rdv->comm_fifo));
126 }
127
128 smx_process_t SIMIX_pre_rdv_get_receiver(smx_simcall_t simcall, smx_rdv_t rdv){
129   return SIMIX_rdv_get_receiver(rdv);
130 }
131 /**
132  *  \brief get the receiver (process associated to the mailbox)
133  *  \param rdv The rendez-vous point
134  *  \return process The receiving process (NULL if not set)
135  */
136 smx_process_t SIMIX_rdv_get_receiver(smx_rdv_t rdv)
137 {
138   return rdv->permanent_receiver;
139 }
140
141 void SIMIX_pre_rdv_set_receiver(smx_simcall_t simcall, smx_rdv_t rdv,
142                             smx_process_t process){
143   SIMIX_rdv_set_receiver(rdv, process);
144 }
145 /**
146  *  \brief set the receiver of the rendez vous point to allow eager sends
147  *  \param rdv The rendez-vous point
148  *  \param process The receiving process
149  */
150 void SIMIX_rdv_set_receiver(smx_rdv_t rdv, smx_process_t process)
151 {
152   rdv->permanent_receiver=process;
153 }
154
155 /**
156  *  \brief Pushes a communication action into a rendez-vous point
157  *  \param rdv The rendez-vous point
158  *  \param comm The communication action
159  */
160 static XBT_INLINE void SIMIX_rdv_push(smx_rdv_t rdv, smx_action_t comm)
161 {
162   xbt_fifo_push(rdv->comm_fifo, comm);
163   comm->comm.rdv = rdv;
164 }
165
166 /**
167  *  \brief Removes a communication action from a rendez-vous point
168  *  \param rdv The rendez-vous point
169  *  \param comm The communication action
170  */
171 XBT_INLINE void SIMIX_rdv_remove(smx_rdv_t rdv, smx_action_t comm)
172 {
173   xbt_fifo_remove(rdv->comm_fifo, comm);
174   comm->comm.rdv = NULL;
175 }
176
177 /**
178  *  \brief Checks if there is a communication action queued in a fifo matching our needs
179  *  \param type The type of communication we are looking for (comm_send, comm_recv)
180  *  \return The communication action if found, NULL otherwise
181  */
182 smx_action_t SIMIX_fifo_get_comm(xbt_fifo_t fifo, e_smx_comm_type_t type,
183                                  int (*match_fun)(void *, void *,smx_action_t),
184                                  void *this_user_data, smx_action_t my_action)
185 {
186   smx_action_t action;
187   xbt_fifo_item_t item;
188   void* other_user_data = NULL;
189
190   xbt_fifo_foreach(fifo, item, action, smx_action_t) {
191     if (action->comm.type == SIMIX_COMM_SEND) {
192       other_user_data = action->comm.src_data;
193     } else if (action->comm.type == SIMIX_COMM_RECEIVE) {
194       other_user_data = action->comm.dst_data;
195     }
196     if (action->comm.type == type &&
197         (!match_fun              ||              match_fun(this_user_data,  other_user_data, action)) &&
198         (!action->comm.match_fun || action->comm.match_fun(other_user_data, this_user_data,  my_action))) {
199       XBT_DEBUG("Found a matching communication action %p", action);
200       xbt_fifo_remove_item(fifo, item);
201       xbt_fifo_free_item(item);
202       action->comm.refcount++;
203 #ifdef HAVE_MC
204       action->comm.rdv_cpy = action->comm.rdv;
205 #endif
206       action->comm.rdv = NULL;
207       return action;
208     }
209     XBT_DEBUG("Sorry, communication action %p does not match our needs:"
210               " its type is %d but we are looking for a comm of type %d (or maybe the filtering didn't match)",
211               action, (int)action->comm.type, (int)type);
212   }
213   XBT_DEBUG("No matching communication action found");
214   return NULL;
215 }
216
217
218 /**
219  *  \brief Checks if there is a communication action queued in a fifo matching our needs, but leave it there
220  *  \param type The type of communication we are looking for (comm_send, comm_recv)
221  *  \return The communication action if found, NULL otherwise
222  */
223 smx_action_t SIMIX_fifo_probe_comm(xbt_fifo_t fifo, e_smx_comm_type_t type,
224                                  int (*match_fun)(void *, void *,smx_action_t),
225                                  void *this_user_data, smx_action_t my_action)
226 {
227   smx_action_t action;
228   xbt_fifo_item_t item;
229   void* other_user_data = NULL;
230
231   xbt_fifo_foreach(fifo, item, action, smx_action_t) {
232     if (action->comm.type == SIMIX_COMM_SEND) {
233       other_user_data = action->comm.src_data;
234     } else if (action->comm.type == SIMIX_COMM_RECEIVE) {
235       other_user_data = action->comm.dst_data;
236     }
237     if (action->comm.type == type &&
238         (!match_fun              ||              match_fun(this_user_data,  other_user_data, action)) &&
239         (!action->comm.match_fun || action->comm.match_fun(other_user_data, this_user_data,  my_action))) {
240       XBT_DEBUG("Found a matching communication action %p", action);
241       action->comm.refcount++;
242
243       return action;
244     }
245     XBT_DEBUG("Sorry, communication action %p does not match our needs:"
246               " its type is %d but we are looking for a comm of type %d (or maybe the filtering didn't match)",
247               action, (int)action->comm.type, (int)type);
248   }
249   XBT_DEBUG("No matching communication action found");
250   return NULL;
251 }
252 /******************************************************************************/
253 /*                            Communication Actions                            */
254 /******************************************************************************/
255
256 /**
257  *  \brief Creates a new communicate action
258  *  \param type The direction of communication (comm_send, comm_recv)
259  *  \return The new communicate action
260  */
261 smx_action_t SIMIX_comm_new(e_smx_comm_type_t type)
262 {
263   smx_action_t act;
264
265   /* alloc structures */
266   act = xbt_mallocator_get(simix_global->action_mallocator);
267
268   act->type = SIMIX_ACTION_COMMUNICATE;
269   act->state = SIMIX_WAITING;
270
271   /* set communication */
272   act->comm.type = type;
273   act->comm.refcount = 1;
274   act->comm.src_data=NULL;
275   act->comm.dst_data=NULL;
276
277
278 #ifdef HAVE_LATENCY_BOUND_TRACKING
279   //initialize with unknown value
280   act->latency_limited = -1;
281 #endif
282
283 #ifdef HAVE_TRACING
284   act->category = NULL;
285 #endif
286
287   XBT_DEBUG("Create communicate action %p", act);
288   ++smx_total_comms;
289
290   return act;
291 }
292
293 void SIMIX_pre_comm_destroy(smx_simcall_t simcall, smx_action_t action){
294   SIMIX_comm_destroy(action);
295 }
296 /**
297  *  \brief Destroy a communicate action
298  *  \param action The communicate action to be destroyed
299  */
300 void SIMIX_comm_destroy(smx_action_t action)
301 {
302   XBT_DEBUG("Destroy action %p (refcount: %d), state: %d",
303             action, action->comm.refcount, (int)action->state);
304
305   if (action->comm.refcount <= 0) {
306     xbt_backtrace_display_current();
307     xbt_die("The refcount of comm %p is already 0 before decreasing it. "
308             "That's a bug! If you didn't test and/or wait the same communication twice in your code, then the bug is SimGrid's...", action);
309   }
310   action->comm.refcount--;
311   if (action->comm.refcount > 0)
312       return;
313   XBT_DEBUG("Really free communication %p; refcount is now %d", action,
314             action->comm.refcount);
315
316 #ifdef HAVE_LATENCY_BOUND_TRACKING
317   action->latency_limited = SIMIX_comm_is_latency_bounded( action ) ;
318 #endif
319
320   xbt_free(action->name);
321   SIMIX_comm_destroy_internal_actions(action);
322
323   if (action->comm.detached && action->state != SIMIX_DONE) {
324     /* the communication has failed and was detached:
325      * we have to free the buffer */
326     if (action->comm.clean_fun) {
327       action->comm.clean_fun(action->comm.src_buff);
328     }
329     action->comm.src_buff = NULL;
330   }
331
332   if(action->comm.rdv)
333     SIMIX_rdv_remove(action->comm.rdv, action);
334
335   xbt_mallocator_release(simix_global->action_mallocator, action);
336 }
337
338 void SIMIX_comm_destroy_internal_actions(smx_action_t action)
339 {
340   if (action->comm.surf_comm){
341 #ifdef HAVE_LATENCY_BOUND_TRACKING
342     action->latency_limited = SIMIX_comm_is_latency_bounded(action);
343 #endif
344     surf_action_unref(action->comm.surf_comm);
345     action->comm.surf_comm = NULL;
346   }
347
348   if (action->comm.src_timeout){
349     surf_action_unref(action->comm.src_timeout);
350     action->comm.src_timeout = NULL;
351   }
352
353   if (action->comm.dst_timeout){
354     surf_action_unref(action->comm.dst_timeout);
355     action->comm.dst_timeout = NULL;
356   }
357 }
358
359 void SIMIX_pre_comm_send(smx_simcall_t simcall, smx_rdv_t rdv,
360                                   double task_size, double rate,
361                                   void *src_buff, size_t src_buff_size,
362                                   int (*match_fun)(void *, void *,smx_action_t),
363                                   void *data, double timeout){
364   smx_action_t comm = SIMIX_comm_isend(simcall->issuer, rdv, task_size, rate,
365                                        src_buff, src_buff_size, match_fun, NULL,
366                                        data, 0);
367   simcall->mc_value = 0;
368   SIMIX_pre_comm_wait(simcall, comm, timeout);
369 }
370 smx_action_t SIMIX_pre_comm_isend(smx_simcall_t simcall, smx_rdv_t rdv,
371                                   double task_size, double rate,
372                                   void *src_buff, size_t src_buff_size,
373                                   int (*match_fun)(void *, void *,smx_action_t),
374                                   void (*clean_fun)(void *), 
375                                   void *data, int detached){
376   return SIMIX_comm_isend(simcall->issuer, rdv, task_size, rate, src_buff,
377                           src_buff_size, match_fun, clean_fun, data, detached);
378
379 }
380 smx_action_t SIMIX_comm_isend(smx_process_t src_proc, smx_rdv_t rdv,
381                               double task_size, double rate,
382                               void *src_buff, size_t src_buff_size,
383                               int (*match_fun)(void *, void *,smx_action_t),
384                               void (*clean_fun)(void *), // used to free the action in case of problem after a detached send
385                               void *data,
386                               int detached)
387 {
388   XBT_DEBUG("send from %p\n", rdv);
389
390   /* Prepare an action describing us, so that it gets passed to the user-provided filter of other side */
391   smx_action_t this_action = SIMIX_comm_new(SIMIX_COMM_SEND);
392
393   /* Look for communication action matching our needs. We also provide a description of
394    * ourself so that the other side also gets a chance of choosing if it wants to match with us.
395    *
396    * If it is not found then push our communication into the rendez-vous point */
397   smx_action_t other_action = SIMIX_fifo_get_comm(rdv->comm_fifo, SIMIX_COMM_RECEIVE, match_fun, data, this_action);
398
399   if (!other_action) {
400     other_action = this_action;
401
402     if (rdv->permanent_receiver!=NULL){
403       //this mailbox is for small messages, which have to be sent right now
404       other_action->state = SIMIX_READY;
405       other_action->comm.dst_proc=rdv->permanent_receiver;
406       other_action->comm.refcount++;
407       xbt_fifo_push(rdv->done_comm_fifo,other_action);
408       other_action->comm.rdv=rdv;
409       XBT_DEBUG("pushing a message into the permanent receive fifo %p, comm %p \n", rdv, &(other_action->comm));
410
411     }else{
412       SIMIX_rdv_push(rdv, this_action);
413     }
414   } else {
415     XBT_DEBUG("Receive already pushed\n");
416
417     SIMIX_comm_destroy(this_action);
418     --smx_total_comms; // this creation was a pure waste
419
420     other_action->state = SIMIX_READY;
421     other_action->comm.type = SIMIX_COMM_READY;
422
423   }
424   xbt_fifo_push(src_proc->comms, other_action);
425
426   /* if the communication action is detached then decrease the refcount
427    * by one, so it will be eliminated by the receiver's destroy call */
428   if (detached) {
429     other_action->comm.detached = 1;
430     other_action->comm.refcount--;
431     other_action->comm.clean_fun = clean_fun;
432   } else {
433     other_action->comm.clean_fun = NULL;
434   }
435
436   /* Setup the communication action */
437   other_action->comm.src_proc = src_proc;
438   other_action->comm.task_size = task_size;
439   other_action->comm.rate = rate;
440   other_action->comm.src_buff = src_buff;
441   other_action->comm.src_buff_size = src_buff_size;
442   other_action->comm.src_data = data;
443
444   other_action->comm.match_fun = match_fun;
445
446   if (MC_is_active()) {
447     other_action->state = SIMIX_RUNNING;
448     return (detached ? NULL : other_action);
449   }
450
451   SIMIX_comm_start(other_action);
452   return (detached ? NULL : other_action);
453 }
454
455 void SIMIX_pre_comm_recv(smx_simcall_t simcall, smx_rdv_t rdv,
456                          void *dst_buff, size_t *dst_buff_size,
457                          int (*match_fun)(void *, void *, smx_action_t),
458                          void *data, double timeout, double rate)
459 {
460   smx_action_t comm = SIMIX_comm_irecv(simcall->issuer, rdv, dst_buff,
461                                        dst_buff_size, match_fun, data, rate);
462   simcall->mc_value = 0;
463   SIMIX_pre_comm_wait(simcall, comm, timeout);
464 }
465
466 smx_action_t SIMIX_pre_comm_irecv(smx_simcall_t simcall, smx_rdv_t rdv,
467                                   void *dst_buff, size_t *dst_buff_size,
468                                   int (*match_fun)(void *, void *, smx_action_t),
469                                   void *data, double rate)
470 {
471   return SIMIX_comm_irecv(simcall->issuer, rdv, dst_buff, dst_buff_size,
472                           match_fun, data, rate);
473 }
474
475 smx_action_t SIMIX_comm_irecv(smx_process_t dst_proc, smx_rdv_t rdv,
476                               void *dst_buff, size_t *dst_buff_size,
477                               int (*match_fun)(void *, void *, smx_action_t),
478                               void *data, double rate)
479 {
480   XBT_DEBUG("recv from %p %p\n", rdv, rdv->comm_fifo);
481   smx_action_t this_action = SIMIX_comm_new(SIMIX_COMM_RECEIVE);
482
483   smx_action_t other_action;
484   //communication already done, get it inside the fifo of completed comms
485   //permanent receive v1
486   //int already_received=0;
487   if(rdv->permanent_receiver && xbt_fifo_size(rdv->done_comm_fifo)!=0){
488
489     XBT_DEBUG("We have a comm that has probably already been received, trying to match it, to skip the communication\n");
490     //find a match in the already received fifo
491     other_action = SIMIX_fifo_get_comm(rdv->done_comm_fifo, SIMIX_COMM_SEND, match_fun, data, this_action);
492     //if not found, assume the receiver came first, register it to the mailbox in the classical way
493     if (!other_action)  {
494       XBT_DEBUG("We have messages in the permanent receive list, but not the one we are looking for, pushing request into fifo\n");
495       other_action = this_action;
496       SIMIX_rdv_push(rdv, this_action);
497     }else{
498       if(other_action->comm.surf_comm &&        SIMIX_comm_get_remains(other_action)==0.0)
499       {
500         XBT_DEBUG("comm %p has been already sent, and is finished, destroy it\n",&(other_action->comm));
501         other_action->state = SIMIX_DONE;
502         other_action->comm.type = SIMIX_COMM_DONE;
503         other_action->comm.rdv = NULL;
504         //SIMIX_comm_destroy(this_action);
505         //--smx_total_comms; // this creation was a pure waste
506         //already_received=1;
507         //other_action->comm.refcount--;
508       }/*else{
509          XBT_DEBUG("Not yet finished, we have to wait %d\n", xbt_fifo_size(rdv->comm_fifo));
510          }*/
511       other_action->comm.refcount--;
512       SIMIX_comm_destroy(this_action);
513       --smx_total_comms; // this creation was a pure waste
514     }
515   }else{
516     /* Prepare an action describing us, so that it gets passed to the user-provided filter of other side */
517
518     /* Look for communication action matching our needs. We also provide a description of
519      * ourself so that the other side also gets a chance of choosing if it wants to match with us.
520      *
521      * If it is not found then push our communication into the rendez-vous point */
522     other_action = SIMIX_fifo_get_comm(rdv->comm_fifo, SIMIX_COMM_SEND, match_fun, data, this_action);
523
524     if (!other_action) {
525       XBT_DEBUG("Receive pushed first %d\n", xbt_fifo_size(rdv->comm_fifo));
526       other_action = this_action;
527       SIMIX_rdv_push(rdv, this_action);
528     } else {
529       SIMIX_comm_destroy(this_action);
530       --smx_total_comms; // this creation was a pure waste
531       other_action->state = SIMIX_READY;
532       other_action->comm.type = SIMIX_COMM_READY;
533       //other_action->comm.refcount--;
534     }
535     xbt_fifo_push(dst_proc->comms, other_action);
536   }
537
538   /* Setup communication action */
539   other_action->comm.dst_proc = dst_proc;
540   other_action->comm.dst_buff = dst_buff;
541   other_action->comm.dst_buff_size = dst_buff_size;
542   other_action->comm.dst_data = data;
543
544   if (rate != -1.0 &&
545       (other_action->comm.rate == -1.0 || rate < other_action->comm.rate))
546     other_action->comm.rate = rate;
547
548   other_action->comm.match_fun = match_fun;
549
550
551   /*if(already_received)//do the actual copy, because the first one after the comm didn't have all the info
552     SIMIX_comm_copy_data(other_action);*/
553
554
555   if (MC_is_active()) {
556     other_action->state = SIMIX_RUNNING;
557     return other_action;
558   }
559
560   SIMIX_comm_start(other_action);
561   // }
562   return other_action;
563 }
564
565 smx_action_t SIMIX_pre_comm_iprobe(smx_simcall_t simcall, smx_rdv_t rdv,
566                                    int src, int tag,
567                                    int (*match_fun)(void *, void *, smx_action_t),
568                                    void *data){
569   return SIMIX_comm_iprobe(simcall->issuer, rdv, src, tag, match_fun, data);
570 }
571
572 smx_action_t SIMIX_comm_iprobe(smx_process_t dst_proc, smx_rdv_t rdv, int src,
573                               int tag, int (*match_fun)(void *, void *, smx_action_t), void *data)
574 {
575   XBT_DEBUG("iprobe from %p %p\n", rdv, rdv->comm_fifo);
576   smx_action_t this_action = SIMIX_comm_new(SIMIX_COMM_RECEIVE);
577
578   smx_action_t other_action=NULL;
579   if(rdv->permanent_receiver && xbt_fifo_size(rdv->done_comm_fifo)!=0){
580     //find a match in the already received fifo
581       XBT_DEBUG("first try in the perm recv mailbox \n");
582
583     other_action = SIMIX_fifo_probe_comm(rdv->done_comm_fifo, SIMIX_COMM_SEND, match_fun, data, this_action);
584   }
585  // }else{
586     if(!other_action){
587         XBT_DEBUG("second try in the other mailbox");
588         other_action = SIMIX_fifo_probe_comm(rdv->comm_fifo, SIMIX_COMM_SEND, match_fun, data, this_action);
589     }
590 //  }
591   if(other_action)other_action->comm.refcount--;
592
593   SIMIX_comm_destroy(this_action);
594   --smx_total_comms;
595   return other_action;
596 }
597
598 void SIMIX_pre_comm_wait(smx_simcall_t simcall, smx_action_t action, double timeout)
599 {
600   /* the simcall may be a wait, a send or a recv */
601   surf_action_t sleep;
602
603   /* Associate this simcall to the wait action */
604   XBT_DEBUG("SIMIX_pre_comm_wait, %p", action);
605
606   xbt_fifo_push(action->simcalls, simcall);
607   simcall->issuer->waiting_action = action;
608
609   if (MC_is_active()) {
610     int idx = simcall->mc_value;
611     if (idx == 0) {
612       action->state = SIMIX_DONE;
613     } else {
614       /* If we reached this point, the wait simcall must have a timeout */
615       /* Otherwise it shouldn't be enabled and executed by the MC */
616       if (timeout == -1)
617         THROW_IMPOSSIBLE;
618
619       if (action->comm.src_proc == simcall->issuer)
620         action->state = SIMIX_SRC_TIMEOUT;
621       else
622         action->state = SIMIX_DST_TIMEOUT;
623     }
624
625     SIMIX_comm_finish(action);
626     return;
627   }
628
629   /* If the action has already finish perform the error handling, */
630   /* otherwise set up a waiting timeout on the right side         */
631   if (action->state != SIMIX_WAITING && action->state != SIMIX_RUNNING) {
632     SIMIX_comm_finish(action);
633   } else { /* if (timeout >= 0) { we need a surf sleep action even when there is no timeout, otherwise surf won't tell us when the host fails */
634     sleep = surf_workstation_sleep(simcall->issuer->smx_host, timeout);
635     surf_action_set_data(sleep, action);
636
637     if (simcall->issuer == action->comm.src_proc)
638       action->comm.src_timeout = sleep;
639     else
640       action->comm.dst_timeout = sleep;
641   }
642 }
643
644 void SIMIX_pre_comm_test(smx_simcall_t simcall, smx_action_t action)
645 {
646   if(MC_is_active()){
647     simcall_comm_test__set__result(simcall, action->comm.src_proc && action->comm.dst_proc);
648     if(simcall_comm_test__get__result(simcall)){
649       action->state = SIMIX_DONE;
650       xbt_fifo_push(action->simcalls, simcall);
651       SIMIX_comm_finish(action);
652     }else{
653       SIMIX_simcall_answer(simcall);
654     }
655     return;
656   }
657
658   simcall_comm_test__set__result(simcall, (action->state != SIMIX_WAITING && action->state != SIMIX_RUNNING));
659   if (simcall_comm_test__get__result(simcall)) {
660     xbt_fifo_push(action->simcalls, simcall);
661     SIMIX_comm_finish(action);
662   } else {
663     SIMIX_simcall_answer(simcall);
664   }
665 }
666
667 void SIMIX_pre_comm_testany(smx_simcall_t simcall, xbt_dynar_t actions)
668 {
669   unsigned int cursor;
670   smx_action_t action;
671   simcall_comm_testany__set__result(simcall, -1);
672
673   if (MC_is_active()){
674     int idx = simcall->mc_value;
675     if(idx == -1){
676       SIMIX_simcall_answer(simcall);
677     }else{
678       action = xbt_dynar_get_as(actions, idx, smx_action_t);
679       simcall_comm_testany__set__result(simcall, idx);
680       xbt_fifo_push(action->simcalls, simcall);
681       action->state = SIMIX_DONE;
682       SIMIX_comm_finish(action);
683     }
684     return;
685   }
686
687   xbt_dynar_foreach(simcall_comm_testany__get__comms(simcall), cursor,action) {
688     if (action->state != SIMIX_WAITING && action->state != SIMIX_RUNNING) {
689       simcall_comm_testany__set__result(simcall, cursor);
690       xbt_fifo_push(action->simcalls, simcall);
691       SIMIX_comm_finish(action);
692       return;
693     }
694   }
695   SIMIX_simcall_answer(simcall);
696 }
697
698 void SIMIX_pre_comm_waitany(smx_simcall_t simcall, xbt_dynar_t actions)
699 {
700   smx_action_t action;
701   unsigned int cursor = 0;
702
703   if (MC_is_active()){
704     int idx = simcall->mc_value;
705     action = xbt_dynar_get_as(actions, idx, smx_action_t);
706     xbt_fifo_push(action->simcalls, simcall);
707     simcall_comm_waitany__set__result(simcall, idx);
708     action->state = SIMIX_DONE;
709     SIMIX_comm_finish(action);
710     return;
711   }
712
713   xbt_dynar_foreach(actions, cursor, action){
714     /* associate this simcall to the the action */
715     xbt_fifo_push(action->simcalls, simcall);
716
717     /* see if the action is already finished */
718     if (action->state != SIMIX_WAITING && action->state != SIMIX_RUNNING){
719       SIMIX_comm_finish(action);
720       break;
721     }
722   }
723 }
724
725 void SIMIX_waitany_remove_simcall_from_actions(smx_simcall_t simcall)
726 {
727   smx_action_t action;
728   unsigned int cursor = 0;
729   xbt_dynar_t actions = simcall_comm_waitany__get__comms(simcall);
730
731   xbt_dynar_foreach(actions, cursor, action) {
732     xbt_fifo_remove(action->simcalls, simcall);
733   }
734 }
735
736 /**
737  *  \brief Starts the simulation of a communication action.
738  *  \param action the communication action
739  */
740 static XBT_INLINE void SIMIX_comm_start(smx_action_t action)
741 {
742   /* If both the sender and the receiver are already there, start the communication */
743   if (action->state == SIMIX_READY) {
744
745     smx_host_t sender = action->comm.src_proc->smx_host;
746     smx_host_t receiver = action->comm.dst_proc->smx_host;
747
748     XBT_DEBUG("Starting communication %p from '%s' to '%s'", action,
749               SIMIX_host_get_name(sender), SIMIX_host_get_name(receiver));
750
751     action->comm.surf_comm = surf_workstation_model_communicate(surf_workstation_model,
752                                                                     sender, receiver,
753                                                                     action->comm.task_size, action->comm.rate);
754
755     surf_action_set_data(action->comm.surf_comm, action);
756
757     action->state = SIMIX_RUNNING;
758
759     /* If a link is failed, detect it immediately */
760     if (surf_action_get_state(action->comm.surf_comm) == SURF_ACTION_FAILED) {
761       XBT_DEBUG("Communication from '%s' to '%s' failed to start because of a link failure",
762                 SIMIX_host_get_name(sender), SIMIX_host_get_name(receiver));
763       action->state = SIMIX_LINK_FAILURE;
764       SIMIX_comm_destroy_internal_actions(action);
765     }
766
767     /* If any of the process is suspend, create the action but stop its execution,
768        it will be restarted when the sender process resume */
769     if (SIMIX_process_is_suspended(action->comm.src_proc) ||
770         SIMIX_process_is_suspended(action->comm.dst_proc)) {
771       /* FIXME: check what should happen with the action state */
772
773       if (SIMIX_process_is_suspended(action->comm.src_proc))
774         XBT_DEBUG("The communication is suspended on startup because src (%s:%s) were suspended since it initiated the communication",
775                   SIMIX_host_get_name(action->comm.src_proc->smx_host), action->comm.src_proc->name);
776       else
777         XBT_DEBUG("The communication is suspended on startup because dst (%s:%s) were suspended since it initiated the communication",
778                   SIMIX_host_get_name(action->comm.dst_proc->smx_host), action->comm.dst_proc->name);
779
780       surf_action_suspend(action->comm.surf_comm);
781
782     }
783   }
784 }
785
786 /**
787  * \brief Answers the SIMIX simcalls associated to a communication action.
788  * \param action a finished communication action
789  */
790 void SIMIX_comm_finish(smx_action_t action)
791 {
792   unsigned int destroy_count = 0;
793   smx_simcall_t simcall;
794
795
796   while ((simcall = xbt_fifo_shift(action->simcalls))) {
797
798     /* If a waitany simcall is waiting for this action to finish, then remove
799        it from the other actions in the waitany list. Afterwards, get the
800        position of the actual action in the waitany dynar and
801        return it as the result of the simcall */
802     if (simcall->call == SIMCALL_COMM_WAITANY) {
803       SIMIX_waitany_remove_simcall_from_actions(simcall);
804       if (!MC_is_active())
805         simcall_comm_waitany__set__result(simcall, xbt_dynar_search(simcall_comm_waitany__get__comms(simcall), &action));
806     }
807
808     /* If the action is still in a rendez-vous point then remove from it */
809     if (action->comm.rdv)
810       SIMIX_rdv_remove(action->comm.rdv, action);
811
812     XBT_DEBUG("SIMIX_comm_finish: action state = %d", (int)action->state);
813
814     /* Check out for errors */
815     switch (action->state) {
816
817     case SIMIX_DONE:
818       XBT_DEBUG("Communication %p complete!", action);
819       SIMIX_comm_copy_data(action);
820       break;
821
822     case SIMIX_SRC_TIMEOUT:
823       SMX_EXCEPTION(simcall->issuer, timeout_error, 0,
824                     "Communication timeouted because of sender");
825       break;
826
827     case SIMIX_DST_TIMEOUT:
828       SMX_EXCEPTION(simcall->issuer, timeout_error, 0,
829                     "Communication timeouted because of receiver");
830       break;
831
832     case SIMIX_SRC_HOST_FAILURE:
833       if (simcall->issuer == action->comm.src_proc)
834         simcall->issuer->context->iwannadie = 1;
835 //          SMX_EXCEPTION(simcall->issuer, host_error, 0, "Host failed");
836       else
837         SMX_EXCEPTION(simcall->issuer, network_error, 0, "Remote peer failed");
838       break;
839
840     case SIMIX_DST_HOST_FAILURE:
841       if (simcall->issuer == action->comm.dst_proc)
842         simcall->issuer->context->iwannadie = 1;
843 //          SMX_EXCEPTION(simcall->issuer, host_error, 0, "Host failed");
844       else
845         SMX_EXCEPTION(simcall->issuer, network_error, 0, "Remote peer failed");
846       break;
847
848     case SIMIX_LINK_FAILURE:
849       XBT_DEBUG("Link failure in action %p between '%s' and '%s': posting an exception to the issuer: %s (%p) detached:%d",
850                 action,
851                 action->comm.src_proc ? sg_host_name(action->comm.src_proc->smx_host) : NULL,
852                 action->comm.dst_proc ? sg_host_name(action->comm.dst_proc->smx_host) : NULL,
853                 simcall->issuer->name, simcall->issuer, action->comm.detached);
854       if (action->comm.src_proc == simcall->issuer) {
855         XBT_DEBUG("I'm source");
856       } else if (action->comm.dst_proc == simcall->issuer) {
857         XBT_DEBUG("I'm dest");
858       } else {
859         XBT_DEBUG("I'm neither source nor dest");
860       }
861       SMX_EXCEPTION(simcall->issuer, network_error, 0, "Link failure");
862       break;
863
864     case SIMIX_CANCELED:
865       if (simcall->issuer == action->comm.dst_proc)
866         SMX_EXCEPTION(simcall->issuer, cancel_error, 0,
867                       "Communication canceled by the sender");
868       else
869         SMX_EXCEPTION(simcall->issuer, cancel_error, 0,
870                       "Communication canceled by the receiver");
871       break;
872
873     default:
874       xbt_die("Unexpected action state in SIMIX_comm_finish: %d", (int)action->state);
875     }
876
877     /* if there is an exception during a waitany or a testany, indicate the position of the failed communication */
878     if (simcall->issuer->doexception) {
879       if (simcall->call == SIMCALL_COMM_WAITANY) {
880         simcall->issuer->running_ctx->exception.value = xbt_dynar_search(simcall_comm_waitany__get__comms(simcall), &action);
881       }
882       else if (simcall->call == SIMCALL_COMM_TESTANY) {
883         simcall->issuer->running_ctx->exception.value = xbt_dynar_search(simcall_comm_testany__get__comms(simcall), &action);
884       }
885     }
886
887     if (surf_resource_get_state(surf_workstation_resource_priv(simcall->issuer->smx_host)) != SURF_RESOURCE_ON) {
888       simcall->issuer->context->iwannadie = 1;
889     }
890
891     simcall->issuer->waiting_action = NULL;
892     xbt_fifo_remove(simcall->issuer->comms, action);
893     if(action->comm.detached){
894       smx_process_t proc;
895       int still_alive = 0;
896
897       if(simcall->issuer == action->comm.src_proc){
898         if(action->comm.dst_proc){
899             xbt_swag_foreach(proc, simix_global->process_list)
900             {
901                if(proc==action->comm.dst_proc){
902                    still_alive=1;
903                    break;
904                }
905             }
906         }
907         if(still_alive) xbt_fifo_remove(action->comm.dst_proc->comms, action);
908       }
909       if(simcall->issuer == action->comm.dst_proc){
910         if(action->comm.src_proc)
911           if(action->comm.dst_proc){
912             xbt_swag_foreach(proc, simix_global->process_list)
913             {
914               if(proc==action->comm.src_proc){
915                   still_alive=1;
916                   break;
917               }
918             }
919           }
920           if(still_alive) xbt_fifo_remove(action->comm.src_proc->comms, action);
921       }
922     }
923     SIMIX_simcall_answer(simcall);
924     destroy_count++;
925   }
926
927   while (destroy_count-- > 0)
928     SIMIX_comm_destroy(action);
929 }
930
931 /**
932  * \brief This function is called when a Surf communication action is finished.
933  * \param action the corresponding Simix communication
934  */
935 void SIMIX_post_comm(smx_action_t action)
936 {
937   /* Update action state */
938   if (action->comm.src_timeout &&
939       surf_action_get_state(action->comm.src_timeout) == SURF_ACTION_DONE)
940     action->state = SIMIX_SRC_TIMEOUT;
941   else if (action->comm.dst_timeout &&
942           surf_action_get_state(action->comm.dst_timeout) == SURF_ACTION_DONE)
943     action->state = SIMIX_DST_TIMEOUT;
944   else if (action->comm.src_timeout &&
945           surf_action_get_state(action->comm.src_timeout) == SURF_ACTION_FAILED)
946     action->state = SIMIX_SRC_HOST_FAILURE;
947   else if (action->comm.dst_timeout &&
948       surf_action_get_state(action->comm.dst_timeout) == SURF_ACTION_FAILED)
949     action->state = SIMIX_DST_HOST_FAILURE;
950   else if (action->comm.surf_comm &&
951           surf_action_get_state(action->comm.surf_comm) == SURF_ACTION_FAILED) {
952     XBT_DEBUG("Puta madre. Surf says that the link broken");
953     action->state = SIMIX_LINK_FAILURE;
954   } else
955     action->state = SIMIX_DONE;
956
957   XBT_DEBUG("SIMIX_post_comm: comm %p, state %d, src_proc %p, dst_proc %p, detached: %d",
958             action, (int)action->state, action->comm.src_proc, action->comm.dst_proc, action->comm.detached);
959
960   /* destroy the surf actions associated with the Simix communication */
961   SIMIX_comm_destroy_internal_actions(action);
962
963   /* remove the communication action from the list of pending communications
964    * of both processes (if they still exist) */
965   if (action->comm.src_proc) {
966     xbt_fifo_remove(action->comm.src_proc->comms, action);
967   }
968   if (action->comm.dst_proc) {
969     xbt_fifo_remove(action->comm.dst_proc->comms, action);
970   }
971
972   /* if there are simcalls associated with the action, then answer them */
973   if (xbt_fifo_size(action->simcalls)) {
974     SIMIX_comm_finish(action);
975   }
976 }
977
978 void SIMIX_pre_comm_cancel(smx_simcall_t simcall, smx_action_t action){
979   SIMIX_comm_cancel(action);
980 }
981 void SIMIX_comm_cancel(smx_action_t action)
982 {
983   /* if the action is a waiting state means that it is still in a rdv */
984   /* so remove from it and delete it */
985   if (action->state == SIMIX_WAITING) {
986     SIMIX_rdv_remove(action->comm.rdv, action);
987     action->state = SIMIX_CANCELED;
988   }
989   else if (!MC_is_active() /* when running the MC there are no surf actions */
990            && (action->state == SIMIX_READY || action->state == SIMIX_RUNNING)) {
991
992     surf_action_cancel(action->comm.surf_comm);
993   }
994 }
995
996 void SIMIX_comm_suspend(smx_action_t action)
997 {
998   /*FIXME: shall we suspend also the timeout actions? */
999   if (action->comm.surf_comm)
1000     surf_action_suspend(action->comm.surf_comm);
1001   /* in the other case, the action will be suspended on creation, in SIMIX_comm_start() */
1002 }
1003
1004 void SIMIX_comm_resume(smx_action_t action)
1005 {
1006   /*FIXME: check what happen with the timeouts */
1007   if (action->comm.surf_comm)
1008     surf_action_resume(action->comm.surf_comm);
1009   /* in the other case, the action were not really suspended yet, see SIMIX_comm_suspend() and SIMIX_comm_start() */
1010 }
1011
1012
1013 /************* Action Getters **************/
1014
1015 double SIMIX_pre_comm_get_remains(smx_simcall_t simcall, smx_action_t action){
1016   return SIMIX_comm_get_remains(action);
1017 }
1018 /**
1019  *  \brief get the amount remaining from the communication
1020  *  \param action The communication
1021  */
1022 double SIMIX_comm_get_remains(smx_action_t action)
1023 {
1024   double remains;
1025
1026   if(!action){
1027     return 0;
1028   }
1029
1030   switch (action->state) {
1031
1032   case SIMIX_RUNNING:
1033     remains = surf_action_get_remains(action->comm.surf_comm);
1034     break;
1035
1036   case SIMIX_WAITING:
1037   case SIMIX_READY:
1038     remains = 0; /*FIXME: check what should be returned */
1039     break;
1040
1041   default:
1042     remains = 0; /*FIXME: is this correct? */
1043     break;
1044   }
1045   return remains;
1046 }
1047
1048 e_smx_state_t SIMIX_pre_comm_get_state(smx_simcall_t simcall, smx_action_t action){
1049   return SIMIX_comm_get_state(action);
1050 }
1051 e_smx_state_t SIMIX_comm_get_state(smx_action_t action)
1052 {
1053   return action->state;
1054 }
1055
1056 void* SIMIX_pre_comm_get_src_data(smx_simcall_t simcall, smx_action_t action){
1057   return SIMIX_comm_get_src_data(action);
1058 }
1059 /**
1060  *  \brief Return the user data associated to the sender of the communication
1061  *  \param action The communication
1062  *  \return the user data
1063  */
1064 void* SIMIX_comm_get_src_data(smx_action_t action)
1065 {
1066   return action->comm.src_data;
1067 }
1068
1069 void* SIMIX_pre_comm_get_dst_data(smx_simcall_t simcall, smx_action_t action){
1070   return SIMIX_comm_get_dst_data(action);
1071 }
1072 /**
1073  *  \brief Return the user data associated to the receiver of the communication
1074  *  \param action The communication
1075  *  \return the user data
1076  */
1077 void* SIMIX_comm_get_dst_data(smx_action_t action)
1078 {
1079   return action->comm.dst_data;
1080 }
1081
1082 smx_process_t SIMIX_pre_comm_get_src_proc(smx_simcall_t simcall, smx_action_t action){
1083   return SIMIX_comm_get_src_proc(action);
1084 }
1085 smx_process_t SIMIX_comm_get_src_proc(smx_action_t action)
1086 {
1087   return action->comm.src_proc;
1088 }
1089
1090 smx_process_t SIMIX_pre_comm_get_dst_proc(smx_simcall_t simcall, smx_action_t action){
1091   return SIMIX_comm_get_dst_proc(action);
1092 }
1093 smx_process_t SIMIX_comm_get_dst_proc(smx_action_t action)
1094 {
1095   return action->comm.dst_proc;
1096 }
1097
#ifdef HAVE_LATENCY_BOUND_TRACKING
int SIMIX_pre_comm_is_latency_bounded(smx_simcall_t simcall, smx_action_t action)
{
  return SIMIX_comm_is_latency_bounded(action);
}

/**
 *  \brief verify if communication is latency bounded
 *
 *  When the communication still has a surf action, the cached
 *  latency_limited flag is refreshed from the workstation model first.
 *  \param action The communication (NULL yields 0)
 *  \return the latency_limited flag of the communication
 */
XBT_INLINE int SIMIX_comm_is_latency_bounded(smx_action_t action)
{
  if (!action)
    return 0;

  /* Without a surf action, report the last known value */
  if (!action->comm.surf_comm)
    return action->latency_limited;

  XBT_DEBUG("Getting latency limited for surf_action (%p)", action->comm.surf_comm);
  action->latency_limited = surf_workstation_model->get_latency_limited(action->comm.surf_comm);
  XBT_DEBUG("Action limited is %d", action->latency_limited);
  return action->latency_limited;
}
#endif
1121
1122 /******************************************************************************/
1123 /*                    SIMIX_comm_copy_data callbacks                       */
1124 /******************************************************************************/
1125 static void (*SIMIX_comm_copy_data_callback) (smx_action_t, void*, size_t) =
1126   &SIMIX_comm_copy_pointer_callback;
1127
1128 void
1129 SIMIX_comm_set_copy_data_callback(void (*callback) (smx_action_t, void*, size_t))
1130 {
1131   SIMIX_comm_copy_data_callback = callback;
1132 }
1133
1134 void SIMIX_comm_copy_pointer_callback(smx_action_t comm, void* buff, size_t buff_size)
1135 {
1136   xbt_assert((buff_size == sizeof(void *)),
1137              "Cannot copy %zu bytes: must be sizeof(void*)", buff_size);
1138   *(void **) (comm->comm.dst_buff) = buff;
1139 }
1140
1141 void SIMIX_comm_copy_buffer_callback(smx_action_t comm, void* buff, size_t buff_size)
1142 {
1143   XBT_DEBUG("Copy the data over");
1144   memcpy(comm->comm.dst_buff, buff, buff_size);
1145   if (comm->comm.detached) { // if this is a detached send, the source buffer was duplicated by SMPI sender to make the original buffer available to the application ASAP
1146     xbt_free(buff);
1147     comm->comm.src_buff = NULL;
1148   }
1149 }
1150
1151
1152 /**
1153  *  \brief Copy the communication data from the sender's buffer to the receiver's one
1154  *  \param comm The communication
1155  */
1156 void SIMIX_comm_copy_data(smx_action_t comm)
1157 {
1158   size_t buff_size = comm->comm.src_buff_size;
1159   /* If there is no data to be copy then return */
1160   if (!comm->comm.src_buff || !comm->comm.dst_buff || comm->comm.copied)
1161     return;
1162
1163   XBT_DEBUG("Copying comm %p data from %s (%p) -> %s (%p) (%zu bytes)",
1164             comm,
1165             comm->comm.src_proc ? sg_host_name(comm->comm.src_proc->smx_host) : "a finished process",
1166             comm->comm.src_buff,
1167             comm->comm.dst_proc ? sg_host_name(comm->comm.dst_proc->smx_host) : "a finished process",
1168             comm->comm.dst_buff, buff_size);
1169
1170   /* Copy at most dst_buff_size bytes of the message to receiver's buffer */
1171   if (comm->comm.dst_buff_size)
1172     buff_size = MIN(buff_size, *(comm->comm.dst_buff_size));
1173
1174   /* Update the receiver's buffer size to the copied amount */
1175   if (comm->comm.dst_buff_size)
1176     *comm->comm.dst_buff_size = buff_size;
1177
1178   if (buff_size > 0)
1179     SIMIX_comm_copy_data_callback (comm, comm->comm.src_buff, buff_size);
1180
1181   /* Set the copied flag so we copy data only once */
1182   /* (this function might be called from both communication ends) */
1183   comm->comm.copied = 1;
1184 }