Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
[mc] Fix cleanup of info->types
[simgrid.git] / src / simix / smx_network.c
1 /* Copyright (c) 2009-2014. The SimGrid Team.
2  * All rights reserved.                                                     */
3
4 /* This program is free software; you can redistribute it and/or modify it
5  * under the terms of the license (GNU LGPL) which comes with this package. */
6
7 #include "smx_private.h"
8 #include "xbt/log.h"
9 #include "mc/mc.h"
10 #include "xbt/dict.h"
11
12 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(simix_network, simix,
13                                 "Logging specific to SIMIX (network)");
14
15 static xbt_dict_t rdv_points = NULL;
16 XBT_EXPORT_NO_IMPORT(unsigned long int) smx_total_comms = 0;
17
18 static void SIMIX_waitany_remove_simcall_from_actions(smx_simcall_t simcall);
19 static void SIMIX_comm_copy_data(smx_action_t comm);
20 static smx_action_t SIMIX_comm_new(e_smx_comm_type_t type);
21 static XBT_INLINE void SIMIX_rdv_push(smx_rdv_t rdv, smx_action_t comm);
22 static smx_action_t SIMIX_fifo_probe_comm(xbt_fifo_t fifo, e_smx_comm_type_t type,
23                                         int (*match_fun)(void *, void *,smx_action_t),
24                                         void *user_data, smx_action_t my_action);
25 static smx_action_t SIMIX_fifo_get_comm(xbt_fifo_t fifo, e_smx_comm_type_t type,
26                                         int (*match_fun)(void *, void *,smx_action_t),
27                                         void *user_data, smx_action_t my_action);
28 static void SIMIX_rdv_free(void *data);
29 static void SIMIX_comm_start(smx_action_t action);
30
31 void SIMIX_network_init(void)
32 {
33   rdv_points = xbt_dict_new_homogeneous(SIMIX_rdv_free);
34 }
35
36 void SIMIX_network_exit(void)
37 {
38   xbt_dict_free(&rdv_points);
39 }
40
41 /******************************************************************************/
42 /*                           Rendez-Vous Points                               */
43 /******************************************************************************/
44
45 smx_rdv_t SIMIX_pre_rdv_create(smx_simcall_t simcall, const char *name){
46   return SIMIX_rdv_create(name);
47 }
48 smx_rdv_t SIMIX_rdv_create(const char *name)
49 {
50   /* two processes may have pushed the same rdv_create simcall at the same time */
51   smx_rdv_t rdv = name ? xbt_dict_get_or_null(rdv_points, name) : NULL;
52
53   if (!rdv) {
54     rdv = xbt_new0(s_smx_rvpoint_t, 1);
55     rdv->name = name ? xbt_strdup(name) : NULL;
56     rdv->comm_fifo = xbt_fifo_new();
57     rdv->done_comm_fifo = xbt_fifo_new();
58     rdv->permanent_receiver=NULL;
59
60     XBT_DEBUG("Creating a mailbox at %p with name %s\n", rdv, name);
61
62     if (rdv->name)
63       xbt_dict_set(rdv_points, rdv->name, rdv, NULL);
64   }
65   return rdv;
66 }
67
68 void SIMIX_pre_rdv_destroy(smx_simcall_t simcall, smx_rdv_t rdv){
69   return SIMIX_rdv_destroy(rdv);
70 }
71 void SIMIX_rdv_destroy(smx_rdv_t rdv)
72 {
73   if (rdv->name)
74     xbt_dict_remove(rdv_points, rdv->name);
75 }
76
77 void SIMIX_rdv_free(void *data)
78 {
79   XBT_DEBUG("rdv free %p", data);
80   smx_rdv_t rdv = (smx_rdv_t) data;
81   xbt_free(rdv->name);
82   xbt_fifo_free(rdv->comm_fifo);
83   xbt_fifo_free(rdv->done_comm_fifo);
84
85   xbt_free(rdv);  
86 }
87
88 xbt_dict_t SIMIX_get_rdv_points()
89 {
90   return rdv_points;
91 }
92
93 smx_rdv_t SIMIX_pre_rdv_get_by_name(smx_simcall_t simcall, const char *name){
94   return SIMIX_rdv_get_by_name(name);
95 }
96 smx_rdv_t SIMIX_rdv_get_by_name(const char *name)
97 {
98   return xbt_dict_get_or_null(rdv_points, name);
99 }
100
101 int SIMIX_pre_rdv_comm_count_by_host(smx_simcall_t simcall, smx_rdv_t rdv, smx_host_t host){
102   return SIMIX_rdv_comm_count_by_host(rdv, host);
103 }
104 int SIMIX_rdv_comm_count_by_host(smx_rdv_t rdv, smx_host_t host)
105 {
106   smx_action_t comm = NULL;
107   xbt_fifo_item_t item = NULL;
108   int count = 0;
109
110   xbt_fifo_foreach(rdv->comm_fifo, item, comm, smx_action_t) {
111     if (comm->comm.src_proc->smx_host == host)
112       count++;
113   }
114
115   return count;
116 }
117
118 smx_action_t SIMIX_pre_rdv_get_head(smx_simcall_t simcall, smx_rdv_t rdv){
119   return SIMIX_rdv_get_head(rdv);
120 }
121 smx_action_t SIMIX_rdv_get_head(smx_rdv_t rdv)
122 {
123   return xbt_fifo_get_item_content(xbt_fifo_get_first_item(rdv->comm_fifo));
124 }
125
126 smx_process_t SIMIX_pre_rdv_get_receiver(smx_simcall_t simcall, smx_rdv_t rdv){
127   return SIMIX_rdv_get_receiver(rdv);
128 }
129 /**
130  *  \brief get the receiver (process associated to the mailbox)
131  *  \param rdv The rendez-vous point
132  *  \return process The receiving process (NULL if not set)
133  */
134 smx_process_t SIMIX_rdv_get_receiver(smx_rdv_t rdv)
135 {
136   return rdv->permanent_receiver;
137 }
138
139 void SIMIX_pre_rdv_set_receiver(smx_simcall_t simcall, smx_rdv_t rdv,
140                             smx_process_t process){
141   SIMIX_rdv_set_receiver(rdv, process);
142 }
143 /**
144  *  \brief set the receiver of the rendez vous point to allow eager sends
145  *  \param rdv The rendez-vous point
146  *  \param process The receiving process
147  */
148 void SIMIX_rdv_set_receiver(smx_rdv_t rdv, smx_process_t process)
149 {
150   rdv->permanent_receiver=process;
151 }
152
153 /**
154  *  \brief Pushes a communication action into a rendez-vous point
155  *  \param rdv The rendez-vous point
156  *  \param comm The communication action
157  */
158 static XBT_INLINE void SIMIX_rdv_push(smx_rdv_t rdv, smx_action_t comm)
159 {
160   xbt_fifo_push(rdv->comm_fifo, comm);
161   comm->comm.rdv = rdv;
162 }
163
164 /**
165  *  \brief Removes a communication action from a rendez-vous point
166  *  \param rdv The rendez-vous point
167  *  \param comm The communication action
168  */
169 XBT_INLINE void SIMIX_rdv_remove(smx_rdv_t rdv, smx_action_t comm)
170 {
171   xbt_fifo_remove(rdv->comm_fifo, comm);
172   comm->comm.rdv = NULL;
173 }
174
175 /**
176  *  \brief Checks if there is a communication action queued in a fifo matching our needs
177  *  \param type The type of communication we are looking for (comm_send, comm_recv)
178  *  \return The communication action if found, NULL otherwise
179  */
180 smx_action_t SIMIX_fifo_get_comm(xbt_fifo_t fifo, e_smx_comm_type_t type,
181                                  int (*match_fun)(void *, void *,smx_action_t),
182                                  void *this_user_data, smx_action_t my_action)
183 {
184   smx_action_t action;
185   xbt_fifo_item_t item;
186   void* other_user_data = NULL;
187
188   xbt_fifo_foreach(fifo, item, action, smx_action_t) {
189     if (action->comm.type == SIMIX_COMM_SEND) {
190       other_user_data = action->comm.src_data;
191     } else if (action->comm.type == SIMIX_COMM_RECEIVE) {
192       other_user_data = action->comm.dst_data;
193     }
194     if (action->comm.type == type &&
195         (!match_fun              ||              match_fun(this_user_data,  other_user_data, action)) &&
196         (!action->comm.match_fun || action->comm.match_fun(other_user_data, this_user_data,  my_action))) {
197       XBT_DEBUG("Found a matching communication action %p", action);
198       xbt_fifo_remove_item(fifo, item);
199       xbt_fifo_free_item(item);
200       action->comm.refcount++;
201 #ifdef HAVE_MC
202       action->comm.rdv_cpy = action->comm.rdv;
203 #endif
204       action->comm.rdv = NULL;
205       return action;
206     }
207     XBT_DEBUG("Sorry, communication action %p does not match our needs:"
208               " its type is %d but we are looking for a comm of type %d (or maybe the filtering didn't match)",
209               action, (int)action->comm.type, (int)type);
210   }
211   XBT_DEBUG("No matching communication action found");
212   return NULL;
213 }
214
215
216 /**
217  *  \brief Checks if there is a communication action queued in a fifo matching our needs, but leave it there
218  *  \param type The type of communication we are looking for (comm_send, comm_recv)
219  *  \return The communication action if found, NULL otherwise
220  */
221 smx_action_t SIMIX_fifo_probe_comm(xbt_fifo_t fifo, e_smx_comm_type_t type,
222                                  int (*match_fun)(void *, void *,smx_action_t),
223                                  void *this_user_data, smx_action_t my_action)
224 {
225   smx_action_t action;
226   xbt_fifo_item_t item;
227   void* other_user_data = NULL;
228
229   xbt_fifo_foreach(fifo, item, action, smx_action_t) {
230     if (action->comm.type == SIMIX_COMM_SEND) {
231       other_user_data = action->comm.src_data;
232     } else if (action->comm.type == SIMIX_COMM_RECEIVE) {
233       other_user_data = action->comm.dst_data;
234     }
235     if (action->comm.type == type &&
236         (!match_fun              ||              match_fun(this_user_data,  other_user_data, action)) &&
237         (!action->comm.match_fun || action->comm.match_fun(other_user_data, this_user_data,  my_action))) {
238       XBT_DEBUG("Found a matching communication action %p", action);
239       action->comm.refcount++;
240
241       return action;
242     }
243     XBT_DEBUG("Sorry, communication action %p does not match our needs:"
244               " its type is %d but we are looking for a comm of type %d (or maybe the filtering didn't match)",
245               action, (int)action->comm.type, (int)type);
246   }
247   XBT_DEBUG("No matching communication action found");
248   return NULL;
249 }
250 /******************************************************************************/
251 /*                            Communication Actions                            */
252 /******************************************************************************/
253
254 /**
255  *  \brief Creates a new communicate action
256  *  \param type The direction of communication (comm_send, comm_recv)
257  *  \return The new communicate action
258  */
259 smx_action_t SIMIX_comm_new(e_smx_comm_type_t type)
260 {
261   smx_action_t act;
262
263   /* alloc structures */
264   act = xbt_mallocator_get(simix_global->action_mallocator);
265
266   act->type = SIMIX_ACTION_COMMUNICATE;
267   act->state = SIMIX_WAITING;
268
269   /* set communication */
270   act->comm.type = type;
271   act->comm.refcount = 1;
272   act->comm.src_data=NULL;
273   act->comm.dst_data=NULL;
274
275
276 #ifdef HAVE_LATENCY_BOUND_TRACKING
277   //initialize with unknown value
278   act->latency_limited = -1;
279 #endif
280
281 #ifdef HAVE_TRACING
282   act->category = NULL;
283 #endif
284
285   XBT_DEBUG("Create communicate action %p", act);
286   ++smx_total_comms;
287
288   return act;
289 }
290
291 /**
292  *  \brief Destroy a communicate action
293  *  \param action The communicate action to be destroyed
294  */
295 void SIMIX_comm_destroy(smx_action_t action)
296 {
297   XBT_DEBUG("Destroy action %p (refcount: %d), state: %d",
298             action, action->comm.refcount, (int)action->state);
299
300   if (action->comm.refcount <= 0) {
301     xbt_backtrace_display_current();
302     xbt_die("The refcount of comm %p is already 0 before decreasing it. "
303             "That's a bug! If you didn't test and/or wait the same communication twice in your code, then the bug is SimGrid's...", action);
304   }
305   action->comm.refcount--;
306   if (action->comm.refcount > 0)
307       return;
308   XBT_DEBUG("Really free communication %p; refcount is now %d", action,
309             action->comm.refcount);
310
311 #ifdef HAVE_LATENCY_BOUND_TRACKING
312   action->latency_limited = SIMIX_comm_is_latency_bounded( action ) ;
313 #endif
314
315   xbt_free(action->name);
316   SIMIX_comm_destroy_internal_actions(action);
317
318   if (action->comm.detached && action->state != SIMIX_DONE) {
319     /* the communication has failed and was detached:
320      * we have to free the buffer */
321     if (action->comm.clean_fun) {
322       action->comm.clean_fun(action->comm.src_buff);
323     }
324     action->comm.src_buff = NULL;
325   }
326
327   if(action->comm.rdv)
328     SIMIX_rdv_remove(action->comm.rdv, action);
329
330   xbt_mallocator_release(simix_global->action_mallocator, action);
331 }
332
333 void SIMIX_comm_destroy_internal_actions(smx_action_t action)
334 {
335   if (action->comm.surf_comm){
336 #ifdef HAVE_LATENCY_BOUND_TRACKING
337     action->latency_limited = SIMIX_comm_is_latency_bounded(action);
338 #endif
339     surf_action_unref(action->comm.surf_comm);
340     action->comm.surf_comm = NULL;
341   }
342
343   if (action->comm.src_timeout){
344     surf_action_unref(action->comm.src_timeout);
345     action->comm.src_timeout = NULL;
346   }
347
348   if (action->comm.dst_timeout){
349     surf_action_unref(action->comm.dst_timeout);
350     action->comm.dst_timeout = NULL;
351   }
352 }
353
354 void SIMIX_pre_comm_send(smx_simcall_t simcall, smx_rdv_t rdv,
355                                   double task_size, double rate,
356                                   void *src_buff, size_t src_buff_size,
357                                   int (*match_fun)(void *, void *,smx_action_t),
358                                   void *data, double timeout){
359   smx_action_t comm = SIMIX_comm_isend(simcall->issuer, rdv, task_size, rate,
360                                        src_buff, src_buff_size, match_fun, NULL,
361                                        data, 0);
362   simcall->mc_value = 0;
363   SIMIX_pre_comm_wait(simcall, comm, timeout);
364 }
365 smx_action_t SIMIX_pre_comm_isend(smx_simcall_t simcall, smx_rdv_t rdv,
366                                   double task_size, double rate,
367                                   void *src_buff, size_t src_buff_size,
368                                   int (*match_fun)(void *, void *,smx_action_t),
369                                   void (*clean_fun)(void *), 
370                                   void *data, int detached){
371   return SIMIX_comm_isend(simcall->issuer, rdv, task_size, rate, src_buff,
372                           src_buff_size, match_fun, clean_fun, data, detached);
373
374 }
375 smx_action_t SIMIX_comm_isend(smx_process_t src_proc, smx_rdv_t rdv,
376                               double task_size, double rate,
377                               void *src_buff, size_t src_buff_size,
378                               int (*match_fun)(void *, void *,smx_action_t),
379                               void (*clean_fun)(void *), // used to free the action in case of problem after a detached send
380                               void *data,
381                               int detached)
382 {
383   XBT_DEBUG("send from %p\n", rdv);
384
385   /* Prepare an action describing us, so that it gets passed to the user-provided filter of other side */
386   smx_action_t this_action = SIMIX_comm_new(SIMIX_COMM_SEND);
387
388   /* Look for communication action matching our needs. We also provide a description of
389    * ourself so that the other side also gets a chance of choosing if it wants to match with us.
390    *
391    * If it is not found then push our communication into the rendez-vous point */
392   smx_action_t other_action = SIMIX_fifo_get_comm(rdv->comm_fifo, SIMIX_COMM_RECEIVE, match_fun, data, this_action);
393
394   if (!other_action) {
395     other_action = this_action;
396
397     if (rdv->permanent_receiver!=NULL){
398       //this mailbox is for small messages, which have to be sent right now
399       other_action->state = SIMIX_READY;
400       other_action->comm.dst_proc=rdv->permanent_receiver;
401       other_action->comm.refcount++;
402       xbt_fifo_push(rdv->done_comm_fifo,other_action);
403       other_action->comm.rdv=rdv;
404       XBT_DEBUG("pushing a message into the permanent receive fifo %p, comm %p \n", rdv, &(other_action->comm));
405
406     }else{
407       SIMIX_rdv_push(rdv, this_action);
408     }
409   } else {
410     XBT_DEBUG("Receive already pushed\n");
411
412     SIMIX_comm_destroy(this_action);
413     --smx_total_comms; // this creation was a pure waste
414
415     other_action->state = SIMIX_READY;
416     other_action->comm.type = SIMIX_COMM_READY;
417
418   }
419   xbt_fifo_push(src_proc->comms, other_action);
420
421   /* if the communication action is detached then decrease the refcount
422    * by one, so it will be eliminated by the receiver's destroy call */
423   if (detached) {
424     other_action->comm.detached = 1;
425     other_action->comm.refcount--;
426     other_action->comm.clean_fun = clean_fun;
427   } else {
428     other_action->comm.clean_fun = NULL;
429   }
430
431   /* Setup the communication action */
432   other_action->comm.src_proc = src_proc;
433   other_action->comm.task_size = task_size;
434   other_action->comm.rate = rate;
435   other_action->comm.src_buff = src_buff;
436   other_action->comm.src_buff_size = src_buff_size;
437   other_action->comm.src_data = data;
438
439   other_action->comm.match_fun = match_fun;
440
441   if (MC_is_active()) {
442     other_action->state = SIMIX_RUNNING;
443     return (detached ? NULL : other_action);
444   }
445
446   SIMIX_comm_start(other_action);
447   return (detached ? NULL : other_action);
448 }
449
450 void SIMIX_pre_comm_recv(smx_simcall_t simcall, smx_rdv_t rdv,
451                          void *dst_buff, size_t *dst_buff_size,
452                          int (*match_fun)(void *, void *, smx_action_t),
453                          void *data, double timeout, double rate)
454 {
455   smx_action_t comm = SIMIX_comm_irecv(simcall->issuer, rdv, dst_buff,
456                                        dst_buff_size, match_fun, data, rate);
457   simcall->mc_value = 0;
458   SIMIX_pre_comm_wait(simcall, comm, timeout);
459 }
460
461 smx_action_t SIMIX_pre_comm_irecv(smx_simcall_t simcall, smx_rdv_t rdv,
462                                   void *dst_buff, size_t *dst_buff_size,
463                                   int (*match_fun)(void *, void *, smx_action_t),
464                                   void *data, double rate)
465 {
466   return SIMIX_comm_irecv(simcall->issuer, rdv, dst_buff, dst_buff_size,
467                           match_fun, data, rate);
468 }
469
470 smx_action_t SIMIX_comm_irecv(smx_process_t dst_proc, smx_rdv_t rdv,
471                               void *dst_buff, size_t *dst_buff_size,
472                               int (*match_fun)(void *, void *, smx_action_t),
473                               void *data, double rate)
474 {
475   XBT_DEBUG("recv from %p %p\n", rdv, rdv->comm_fifo);
476   smx_action_t this_action = SIMIX_comm_new(SIMIX_COMM_RECEIVE);
477
478   smx_action_t other_action;
479   //communication already done, get it inside the fifo of completed comms
480   //permanent receive v1
481   //int already_received=0;
482   if(rdv->permanent_receiver && xbt_fifo_size(rdv->done_comm_fifo)!=0){
483
484     XBT_DEBUG("We have a comm that has probably already been received, trying to match it, to skip the communication\n");
485     //find a match in the already received fifo
486     other_action = SIMIX_fifo_get_comm(rdv->done_comm_fifo, SIMIX_COMM_SEND, match_fun, data, this_action);
487     //if not found, assume the receiver came first, register it to the mailbox in the classical way
488     if (!other_action)  {
489       XBT_DEBUG("We have messages in the permanent receive list, but not the one we are looking for, pushing request into fifo\n");
490       other_action = this_action;
491       SIMIX_rdv_push(rdv, this_action);
492     }else{
493       if(other_action->comm.surf_comm &&        SIMIX_comm_get_remains(other_action)==0.0)
494       {
495         XBT_DEBUG("comm %p has been already sent, and is finished, destroy it\n",&(other_action->comm));
496         other_action->state = SIMIX_DONE;
497         other_action->comm.type = SIMIX_COMM_DONE;
498         other_action->comm.rdv = NULL;
499         //SIMIX_comm_destroy(this_action);
500         //--smx_total_comms; // this creation was a pure waste
501         //already_received=1;
502         //other_action->comm.refcount--;
503       }/*else{
504          XBT_DEBUG("Not yet finished, we have to wait %d\n", xbt_fifo_size(rdv->comm_fifo));
505          }*/
506       other_action->comm.refcount--;
507       SIMIX_comm_destroy(this_action);
508       --smx_total_comms; // this creation was a pure waste
509     }
510   }else{
511     /* Prepare an action describing us, so that it gets passed to the user-provided filter of other side */
512
513     /* Look for communication action matching our needs. We also provide a description of
514      * ourself so that the other side also gets a chance of choosing if it wants to match with us.
515      *
516      * If it is not found then push our communication into the rendez-vous point */
517     other_action = SIMIX_fifo_get_comm(rdv->comm_fifo, SIMIX_COMM_SEND, match_fun, data, this_action);
518
519     if (!other_action) {
520       XBT_DEBUG("Receive pushed first %d\n", xbt_fifo_size(rdv->comm_fifo));
521       other_action = this_action;
522       SIMIX_rdv_push(rdv, this_action);
523     } else {
524       SIMIX_comm_destroy(this_action);
525       --smx_total_comms; // this creation was a pure waste
526       other_action->state = SIMIX_READY;
527       other_action->comm.type = SIMIX_COMM_READY;
528       //other_action->comm.refcount--;
529     }
530     xbt_fifo_push(dst_proc->comms, other_action);
531   }
532
533   /* Setup communication action */
534   other_action->comm.dst_proc = dst_proc;
535   other_action->comm.dst_buff = dst_buff;
536   other_action->comm.dst_buff_size = dst_buff_size;
537   other_action->comm.dst_data = data;
538
539   if (rate != -1.0 &&
540       (other_action->comm.rate == -1.0 || rate < other_action->comm.rate))
541     other_action->comm.rate = rate;
542
543   other_action->comm.match_fun = match_fun;
544
545
546   /*if(already_received)//do the actual copy, because the first one after the comm didn't have all the info
547     SIMIX_comm_copy_data(other_action);*/
548
549
550   if (MC_is_active()) {
551     other_action->state = SIMIX_RUNNING;
552     return other_action;
553   }
554
555   SIMIX_comm_start(other_action);
556   // }
557   return other_action;
558 }
559
560 smx_action_t SIMIX_pre_comm_iprobe(smx_simcall_t simcall, smx_rdv_t rdv,
561                                    int src, int tag,
562                                    int (*match_fun)(void *, void *, smx_action_t),
563                                    void *data){
564   return SIMIX_comm_iprobe(simcall->issuer, rdv, src, tag, match_fun, data);
565 }
566
567 smx_action_t SIMIX_comm_iprobe(smx_process_t dst_proc, smx_rdv_t rdv, int src,
568                               int tag, int (*match_fun)(void *, void *, smx_action_t), void *data)
569 {
570   XBT_DEBUG("iprobe from %p %p\n", rdv, rdv->comm_fifo);
571   smx_action_t this_action = SIMIX_comm_new(SIMIX_COMM_RECEIVE);
572
573   smx_action_t other_action=NULL;
574   if(rdv->permanent_receiver && xbt_fifo_size(rdv->done_comm_fifo)!=0){
575     //find a match in the already received fifo
576       XBT_DEBUG("first try in the perm recv mailbox \n");
577
578     other_action = SIMIX_fifo_probe_comm(rdv->done_comm_fifo, SIMIX_COMM_SEND, match_fun, data, this_action);
579   }
580  // }else{
581     if(!other_action){
582         XBT_DEBUG("second try in the other mailbox");
583         other_action = SIMIX_fifo_probe_comm(rdv->comm_fifo, SIMIX_COMM_SEND, match_fun, data, this_action);
584     }
585 //  }
586   if(other_action)other_action->comm.refcount--;
587
588   SIMIX_comm_destroy(this_action);
589   --smx_total_comms;
590   return other_action;
591 }
592
593 void SIMIX_pre_comm_wait(smx_simcall_t simcall, smx_action_t action, double timeout)
594 {
595   /* the simcall may be a wait, a send or a recv */
596   surf_action_t sleep;
597
598   /* Associate this simcall to the wait action */
599   XBT_DEBUG("SIMIX_pre_comm_wait, %p", action);
600
601   xbt_fifo_push(action->simcalls, simcall);
602   simcall->issuer->waiting_action = action;
603
604   if (MC_is_active()) {
605     int idx = simcall->mc_value;
606     if (idx == 0) {
607       action->state = SIMIX_DONE;
608     } else {
609       /* If we reached this point, the wait simcall must have a timeout */
610       /* Otherwise it shouldn't be enabled and executed by the MC */
611       if (timeout == -1)
612         THROW_IMPOSSIBLE;
613
614       if (action->comm.src_proc == simcall->issuer)
615         action->state = SIMIX_SRC_TIMEOUT;
616       else
617         action->state = SIMIX_DST_TIMEOUT;
618     }
619
620     SIMIX_comm_finish(action);
621     return;
622   }
623
624   /* If the action has already finish perform the error handling, */
625   /* otherwise set up a waiting timeout on the right side         */
626   if (action->state != SIMIX_WAITING && action->state != SIMIX_RUNNING) {
627     SIMIX_comm_finish(action);
628   } else { /* if (timeout >= 0) { we need a surf sleep action even when there is no timeout, otherwise surf won't tell us when the host fails */
629     sleep = surf_workstation_sleep(simcall->issuer->smx_host, timeout);
630     surf_action_set_data(sleep, action);
631
632     if (simcall->issuer == action->comm.src_proc)
633       action->comm.src_timeout = sleep;
634     else
635       action->comm.dst_timeout = sleep;
636   }
637 }
638
639 void SIMIX_pre_comm_test(smx_simcall_t simcall, smx_action_t action)
640 {
641   if(MC_is_active()){
642     simcall_comm_test__set__result(simcall, action->comm.src_proc && action->comm.dst_proc);
643     if(simcall_comm_test__get__result(simcall)){
644       action->state = SIMIX_DONE;
645       xbt_fifo_push(action->simcalls, simcall);
646       SIMIX_comm_finish(action);
647     }else{
648       SIMIX_simcall_answer(simcall);
649     }
650     return;
651   }
652
653   simcall_comm_test__set__result(simcall, (action->state != SIMIX_WAITING && action->state != SIMIX_RUNNING));
654   if (simcall_comm_test__get__result(simcall)) {
655     xbt_fifo_push(action->simcalls, simcall);
656     SIMIX_comm_finish(action);
657   } else {
658     SIMIX_simcall_answer(simcall);
659   }
660 }
661
662 void SIMIX_pre_comm_testany(smx_simcall_t simcall, xbt_dynar_t actions)
663 {
664   unsigned int cursor;
665   smx_action_t action;
666   simcall_comm_testany__set__result(simcall, -1);
667
668   if (MC_is_active()){
669     int idx = simcall->mc_value;
670     if(idx == -1){
671       SIMIX_simcall_answer(simcall);
672     }else{
673       action = xbt_dynar_get_as(actions, idx, smx_action_t);
674       simcall_comm_testany__set__result(simcall, idx);
675       xbt_fifo_push(action->simcalls, simcall);
676       action->state = SIMIX_DONE;
677       SIMIX_comm_finish(action);
678     }
679     return;
680   }
681
682   xbt_dynar_foreach(simcall_comm_testany__get__comms(simcall), cursor,action) {
683     if (action->state != SIMIX_WAITING && action->state != SIMIX_RUNNING) {
684       simcall_comm_testany__set__result(simcall, cursor);
685       xbt_fifo_push(action->simcalls, simcall);
686       SIMIX_comm_finish(action);
687       return;
688     }
689   }
690   SIMIX_simcall_answer(simcall);
691 }
692
693 void SIMIX_pre_comm_waitany(smx_simcall_t simcall, xbt_dynar_t actions)
694 {
695   smx_action_t action;
696   unsigned int cursor = 0;
697
698   if (MC_is_active()){
699     int idx = simcall->mc_value;
700     action = xbt_dynar_get_as(actions, idx, smx_action_t);
701     xbt_fifo_push(action->simcalls, simcall);
702     simcall_comm_waitany__set__result(simcall, idx);
703     action->state = SIMIX_DONE;
704     SIMIX_comm_finish(action);
705     return;
706   }
707
708   xbt_dynar_foreach(actions, cursor, action){
709     /* associate this simcall to the the action */
710     xbt_fifo_push(action->simcalls, simcall);
711
712     /* see if the action is already finished */
713     if (action->state != SIMIX_WAITING && action->state != SIMIX_RUNNING){
714       SIMIX_comm_finish(action);
715       break;
716     }
717   }
718 }
719
720 void SIMIX_waitany_remove_simcall_from_actions(smx_simcall_t simcall)
721 {
722   smx_action_t action;
723   unsigned int cursor = 0;
724   xbt_dynar_t actions = simcall_comm_waitany__get__comms(simcall);
725
726   xbt_dynar_foreach(actions, cursor, action) {
727     xbt_fifo_remove(action->simcalls, simcall);
728   }
729 }
730
731 /**
732  *  \brief Starts the simulation of a communication action.
733  *  \param action the communication action
734  */
735 static XBT_INLINE void SIMIX_comm_start(smx_action_t action)
736 {
737   /* If both the sender and the receiver are already there, start the communication */
738   if (action->state == SIMIX_READY) {
739
740     smx_host_t sender = action->comm.src_proc->smx_host;
741     smx_host_t receiver = action->comm.dst_proc->smx_host;
742
743     XBT_DEBUG("Starting communication %p from '%s' to '%s'", action,
744               SIMIX_host_get_name(sender), SIMIX_host_get_name(receiver));
745
746     action->comm.surf_comm = surf_workstation_model_communicate(surf_workstation_model,
747                                                                     sender, receiver,
748                                                                     action->comm.task_size, action->comm.rate);
749
750     surf_action_set_data(action->comm.surf_comm, action);
751
752     action->state = SIMIX_RUNNING;
753
754     /* If a link is failed, detect it immediately */
755     if (surf_action_get_state(action->comm.surf_comm) == SURF_ACTION_FAILED) {
756       XBT_DEBUG("Communication from '%s' to '%s' failed to start because of a link failure",
757                 SIMIX_host_get_name(sender), SIMIX_host_get_name(receiver));
758       action->state = SIMIX_LINK_FAILURE;
759       SIMIX_comm_destroy_internal_actions(action);
760     }
761
762     /* If any of the process is suspend, create the action but stop its execution,
763        it will be restarted when the sender process resume */
764     if (SIMIX_process_is_suspended(action->comm.src_proc) ||
765         SIMIX_process_is_suspended(action->comm.dst_proc)) {
766       /* FIXME: check what should happen with the action state */
767
768       if (SIMIX_process_is_suspended(action->comm.src_proc))
769         XBT_DEBUG("The communication is suspended on startup because src (%s:%s) were suspended since it initiated the communication",
770                   SIMIX_host_get_name(action->comm.src_proc->smx_host), action->comm.src_proc->name);
771       else
772         XBT_DEBUG("The communication is suspended on startup because dst (%s:%s) were suspended since it initiated the communication",
773                   SIMIX_host_get_name(action->comm.dst_proc->smx_host), action->comm.dst_proc->name);
774
775       surf_action_suspend(action->comm.surf_comm);
776
777     }
778   }
779 }
780
781 /**
782  * \brief Answers the SIMIX simcalls associated to a communication action.
783  * \param action a finished communication action
784  */
785 void SIMIX_comm_finish(smx_action_t action)
786 {
787   unsigned int destroy_count = 0;
788   smx_simcall_t simcall;
789
790
791   while ((simcall = xbt_fifo_shift(action->simcalls))) {
792
793     /* If a waitany simcall is waiting for this action to finish, then remove
794        it from the other actions in the waitany list. Afterwards, get the
795        position of the actual action in the waitany dynar and
796        return it as the result of the simcall */
797     if (simcall->call == SIMCALL_COMM_WAITANY) {
798       SIMIX_waitany_remove_simcall_from_actions(simcall);
799       if (!MC_is_active())
800         simcall_comm_waitany__set__result(simcall, xbt_dynar_search(simcall_comm_waitany__get__comms(simcall), &action));
801     }
802
803     /* If the action is still in a rendez-vous point then remove from it */
804     if (action->comm.rdv)
805       SIMIX_rdv_remove(action->comm.rdv, action);
806
807     XBT_DEBUG("SIMIX_comm_finish: action state = %d", (int)action->state);
808
809     /* Check out for errors */
810     switch (action->state) {
811
812     case SIMIX_DONE:
813       XBT_DEBUG("Communication %p complete!", action);
814       SIMIX_comm_copy_data(action);
815       break;
816
817     case SIMIX_SRC_TIMEOUT:
818       SMX_EXCEPTION(simcall->issuer, timeout_error, 0,
819                     "Communication timeouted because of sender");
820       break;
821
822     case SIMIX_DST_TIMEOUT:
823       SMX_EXCEPTION(simcall->issuer, timeout_error, 0,
824                     "Communication timeouted because of receiver");
825       break;
826
827     case SIMIX_SRC_HOST_FAILURE:
828       if (simcall->issuer == action->comm.src_proc)
829         simcall->issuer->context->iwannadie = 1;
830 //          SMX_EXCEPTION(simcall->issuer, host_error, 0, "Host failed");
831       else
832         SMX_EXCEPTION(simcall->issuer, network_error, 0, "Remote peer failed");
833       break;
834
835     case SIMIX_DST_HOST_FAILURE:
836       if (simcall->issuer == action->comm.dst_proc)
837         simcall->issuer->context->iwannadie = 1;
838 //          SMX_EXCEPTION(simcall->issuer, host_error, 0, "Host failed");
839       else
840         SMX_EXCEPTION(simcall->issuer, network_error, 0, "Remote peer failed");
841       break;
842
843     case SIMIX_LINK_FAILURE:
844       XBT_DEBUG("Link failure in action %p between '%s' and '%s': posting an exception to the issuer: %s (%p) detached:%d",
845                 action,
846                 action->comm.src_proc ? sg_host_name(action->comm.src_proc->smx_host) : NULL,
847                 action->comm.dst_proc ? sg_host_name(action->comm.dst_proc->smx_host) : NULL,
848                 simcall->issuer->name, simcall->issuer, action->comm.detached);
849       if (action->comm.src_proc == simcall->issuer) {
850         XBT_DEBUG("I'm source");
851       } else if (action->comm.dst_proc == simcall->issuer) {
852         XBT_DEBUG("I'm dest");
853       } else {
854         XBT_DEBUG("I'm neither source nor dest");
855       }
856       SMX_EXCEPTION(simcall->issuer, network_error, 0, "Link failure");
857       break;
858
859     case SIMIX_CANCELED:
860       if (simcall->issuer == action->comm.dst_proc)
861         SMX_EXCEPTION(simcall->issuer, cancel_error, 0,
862                       "Communication canceled by the sender");
863       else
864         SMX_EXCEPTION(simcall->issuer, cancel_error, 0,
865                       "Communication canceled by the receiver");
866       break;
867
868     default:
869       xbt_die("Unexpected action state in SIMIX_comm_finish: %d", (int)action->state);
870     }
871
872     /* if there is an exception during a waitany or a testany, indicate the position of the failed communication */
873     if (simcall->issuer->doexception) {
874       if (simcall->call == SIMCALL_COMM_WAITANY) {
875         simcall->issuer->running_ctx->exception.value = xbt_dynar_search(simcall_comm_waitany__get__comms(simcall), &action);
876       }
877       else if (simcall->call == SIMCALL_COMM_TESTANY) {
878         simcall->issuer->running_ctx->exception.value = xbt_dynar_search(simcall_comm_testany__get__comms(simcall), &action);
879       }
880     }
881
882     if (surf_resource_get_state(surf_workstation_resource_priv(simcall->issuer->smx_host)) != SURF_RESOURCE_ON) {
883       simcall->issuer->context->iwannadie = 1;
884     }
885
886     simcall->issuer->waiting_action = NULL;
887     xbt_fifo_remove(simcall->issuer->comms, action);
888     if(action->comm.detached){
889       smx_process_t proc;
890       int still_alive = 0;
891
892       if(simcall->issuer == action->comm.src_proc){
893         if(action->comm.dst_proc){
894             xbt_swag_foreach(proc, simix_global->process_list)
895             {
896                if(proc==action->comm.dst_proc){
897                    still_alive=1;
898                    break;
899                }
900             }
901         }
902         if(still_alive) xbt_fifo_remove(action->comm.dst_proc->comms, action);
903       }
904       if(simcall->issuer == action->comm.dst_proc){
905         if(action->comm.src_proc)
906           if(action->comm.dst_proc){
907             xbt_swag_foreach(proc, simix_global->process_list)
908             {
909               if(proc==action->comm.src_proc){
910                   still_alive=1;
911                   break;
912               }
913             }
914           }
915           if(still_alive) xbt_fifo_remove(action->comm.src_proc->comms, action);
916       }
917     }
918     SIMIX_simcall_answer(simcall);
919     destroy_count++;
920   }
921
922   while (destroy_count-- > 0)
923     SIMIX_comm_destroy(action);
924 }
925
926 /**
927  * \brief This function is called when a Surf communication action is finished.
928  * \param action the corresponding Simix communication
929  */
930 void SIMIX_post_comm(smx_action_t action)
931 {
932   /* Update action state */
933   if (action->comm.src_timeout &&
934       surf_action_get_state(action->comm.src_timeout) == SURF_ACTION_DONE)
935     action->state = SIMIX_SRC_TIMEOUT;
936   else if (action->comm.dst_timeout &&
937           surf_action_get_state(action->comm.dst_timeout) == SURF_ACTION_DONE)
938     action->state = SIMIX_DST_TIMEOUT;
939   else if (action->comm.src_timeout &&
940           surf_action_get_state(action->comm.src_timeout) == SURF_ACTION_FAILED)
941     action->state = SIMIX_SRC_HOST_FAILURE;
942   else if (action->comm.dst_timeout &&
943       surf_action_get_state(action->comm.dst_timeout) == SURF_ACTION_FAILED)
944     action->state = SIMIX_DST_HOST_FAILURE;
945   else if (action->comm.surf_comm &&
946           surf_action_get_state(action->comm.surf_comm) == SURF_ACTION_FAILED) {
947     XBT_DEBUG("Puta madre. Surf says that the link broke");
948     action->state = SIMIX_LINK_FAILURE;
949   } else
950     action->state = SIMIX_DONE;
951
952   XBT_DEBUG("SIMIX_post_comm: comm %p, state %d, src_proc %p, dst_proc %p, detached: %d",
953             action, (int)action->state, action->comm.src_proc, action->comm.dst_proc, action->comm.detached);
954
955   /* destroy the surf actions associated with the Simix communication */
956   SIMIX_comm_destroy_internal_actions(action);
957
958   /* remove the communication action from the list of pending communications
959    * of both processes (if they still exist) */
960   if (action->comm.src_proc) {
961     xbt_fifo_remove(action->comm.src_proc->comms, action);
962   }
963   if (action->comm.dst_proc) {
964     xbt_fifo_remove(action->comm.dst_proc->comms, action);
965   }
966
967   /* if there are simcalls associated with the action, then answer them */
968   if (xbt_fifo_size(action->simcalls)) {
969     SIMIX_comm_finish(action);
970   }
971 }
972
973 void SIMIX_pre_comm_cancel(smx_simcall_t simcall, smx_action_t action){
974   SIMIX_comm_cancel(action);
975 }
976 void SIMIX_comm_cancel(smx_action_t action)
977 {
978   /* if the action is a waiting state means that it is still in a rdv */
979   /* so remove from it and delete it */
980   if (action->state == SIMIX_WAITING) {
981     SIMIX_rdv_remove(action->comm.rdv, action);
982     action->state = SIMIX_CANCELED;
983   }
984   else if (!MC_is_active() /* when running the MC there are no surf actions */
985            && (action->state == SIMIX_READY || action->state == SIMIX_RUNNING)) {
986
987     surf_action_cancel(action->comm.surf_comm);
988   }
989 }
990
991 void SIMIX_comm_suspend(smx_action_t action)
992 {
993   /*FIXME: shall we suspend also the timeout actions? */
994   if (action->comm.surf_comm)
995     surf_action_suspend(action->comm.surf_comm);
996   /* in the other case, the action will be suspended on creation, in SIMIX_comm_start() */
997 }
998
999 void SIMIX_comm_resume(smx_action_t action)
1000 {
1001   /*FIXME: check what happen with the timeouts */
1002   if (action->comm.surf_comm)
1003     surf_action_resume(action->comm.surf_comm);
1004   /* in the other case, the action were not really suspended yet, see SIMIX_comm_suspend() and SIMIX_comm_start() */
1005 }
1006
1007
1008 /************* Action Getters **************/
1009
1010 double SIMIX_pre_comm_get_remains(smx_simcall_t simcall, smx_action_t action){
1011   return SIMIX_comm_get_remains(action);
1012 }
1013 /**
1014  *  \brief get the amount remaining from the communication
1015  *  \param action The communication
1016  */
1017 double SIMIX_comm_get_remains(smx_action_t action)
1018 {
1019   double remains;
1020
1021   if(!action){
1022     return 0;
1023   }
1024
1025   switch (action->state) {
1026
1027   case SIMIX_RUNNING:
1028     remains = surf_action_get_remains(action->comm.surf_comm);
1029     break;
1030
1031   case SIMIX_WAITING:
1032   case SIMIX_READY:
1033     remains = 0; /*FIXME: check what should be returned */
1034     break;
1035
1036   default:
1037     remains = 0; /*FIXME: is this correct? */
1038     break;
1039   }
1040   return remains;
1041 }
1042
1043 e_smx_state_t SIMIX_pre_comm_get_state(smx_simcall_t simcall, smx_action_t action){
1044   return SIMIX_comm_get_state(action);
1045 }
1046 e_smx_state_t SIMIX_comm_get_state(smx_action_t action)
1047 {
1048   return action->state;
1049 }
1050
1051 void* SIMIX_pre_comm_get_src_data(smx_simcall_t simcall, smx_action_t action){
1052   return SIMIX_comm_get_src_data(action);
1053 }
1054 /**
1055  *  \brief Return the user data associated to the sender of the communication
1056  *  \param action The communication
1057  *  \return the user data
1058  */
1059 void* SIMIX_comm_get_src_data(smx_action_t action)
1060 {
1061   return action->comm.src_data;
1062 }
1063
1064 void* SIMIX_pre_comm_get_dst_data(smx_simcall_t simcall, smx_action_t action){
1065   return SIMIX_comm_get_dst_data(action);
1066 }
1067 /**
1068  *  \brief Return the user data associated to the receiver of the communication
1069  *  \param action The communication
1070  *  \return the user data
1071  */
1072 void* SIMIX_comm_get_dst_data(smx_action_t action)
1073 {
1074   return action->comm.dst_data;
1075 }
1076
1077 smx_process_t SIMIX_pre_comm_get_src_proc(smx_simcall_t simcall, smx_action_t action){
1078   return SIMIX_comm_get_src_proc(action);
1079 }
1080 smx_process_t SIMIX_comm_get_src_proc(smx_action_t action)
1081 {
1082   return action->comm.src_proc;
1083 }
1084
1085 smx_process_t SIMIX_pre_comm_get_dst_proc(smx_simcall_t simcall, smx_action_t action){
1086   return SIMIX_comm_get_dst_proc(action);
1087 }
1088 smx_process_t SIMIX_comm_get_dst_proc(smx_action_t action)
1089 {
1090   return action->comm.dst_proc;
1091 }
1092
1093 #ifdef HAVE_LATENCY_BOUND_TRACKING
1094 int SIMIX_pre_comm_is_latency_bounded(smx_simcall_t simcall, smx_action_t action)
1095 {
1096   return SIMIX_comm_is_latency_bounded(action);
1097 }
1098
1099 /**
1100  *  \brief verify if communication is latency bounded
1101  *  \param comm The communication
1102  */
1103 int SIMIX_comm_is_latency_bounded(smx_action_t action)
1104 {
1105   if(!action){
1106     return 0;
1107   }
1108   if (action->comm.surf_comm){
1109     XBT_DEBUG("Getting latency limited for surf_action (%p)", action->comm.surf_comm);
1110     action->latency_limited = surf_network_action_get_latency_limited(action->comm.surf_comm);
1111     XBT_DEBUG("Action limited is %d", action->latency_limited);
1112   }
1113   return action->latency_limited;
1114 }
1115 #endif
1116
1117 /******************************************************************************/
1118 /*                    SIMIX_comm_copy_data callbacks                       */
1119 /******************************************************************************/
1120 static void (*SIMIX_comm_copy_data_callback) (smx_action_t, void*, size_t) =
1121   &SIMIX_comm_copy_pointer_callback;
1122
1123 void
1124 SIMIX_comm_set_copy_data_callback(void (*callback) (smx_action_t, void*, size_t))
1125 {
1126   SIMIX_comm_copy_data_callback = callback;
1127 }
1128
1129 void SIMIX_comm_copy_pointer_callback(smx_action_t comm, void* buff, size_t buff_size)
1130 {
1131   xbt_assert((buff_size == sizeof(void *)),
1132              "Cannot copy %zu bytes: must be sizeof(void*)", buff_size);
1133   *(void **) (comm->comm.dst_buff) = buff;
1134 }
1135
1136 void SIMIX_comm_copy_buffer_callback(smx_action_t comm, void* buff, size_t buff_size)
1137 {
1138   XBT_DEBUG("Copy the data over");
1139   memcpy(comm->comm.dst_buff, buff, buff_size);
1140   if (comm->comm.detached) { // if this is a detached send, the source buffer was duplicated by SMPI sender to make the original buffer available to the application ASAP
1141     xbt_free(buff);
1142     comm->comm.src_buff = NULL;
1143   }
1144 }
1145
1146
1147 /**
1148  *  \brief Copy the communication data from the sender's buffer to the receiver's one
1149  *  \param comm The communication
1150  */
1151 void SIMIX_comm_copy_data(smx_action_t comm)
1152 {
1153   size_t buff_size = comm->comm.src_buff_size;
1154   /* If there is no data to be copy then return */
1155   if (!comm->comm.src_buff || !comm->comm.dst_buff || comm->comm.copied)
1156     return;
1157
1158   XBT_DEBUG("Copying comm %p data from %s (%p) -> %s (%p) (%zu bytes)",
1159             comm,
1160             comm->comm.src_proc ? sg_host_name(comm->comm.src_proc->smx_host) : "a finished process",
1161             comm->comm.src_buff,
1162             comm->comm.dst_proc ? sg_host_name(comm->comm.dst_proc->smx_host) : "a finished process",
1163             comm->comm.dst_buff, buff_size);
1164
1165   /* Copy at most dst_buff_size bytes of the message to receiver's buffer */
1166   if (comm->comm.dst_buff_size)
1167     buff_size = MIN(buff_size, *(comm->comm.dst_buff_size));
1168
1169   /* Update the receiver's buffer size to the copied amount */
1170   if (comm->comm.dst_buff_size)
1171     *comm->comm.dst_buff_size = buff_size;
1172
1173   if (buff_size > 0)
1174     SIMIX_comm_copy_data_callback (comm, comm->comm.src_buff, buff_size);
1175
1176   /* Set the copied flag so we copy data only once */
1177   /* (this function might be called from both communication ends) */
1178   comm->comm.copied = 1;
1179 }