Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
dd3660a8513a8fba0a4ba43f7a79a7a819e6c11a
[simgrid.git] / src / simix / smx_network.c
1 /* Copyright (c) 2009-2014. The SimGrid Team.
2  * All rights reserved.                                                     */
3
4 /* This program is free software; you can redistribute it and/or modify it
5  * under the terms of the license (GNU LGPL) which comes with this package. */
6
7 #include "smx_private.h"
8 #include "xbt/log.h"
9 #include "mc/mc.h"
10 #include "xbt/dict.h"
11
12 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(simix_network, simix,
13                                 "Logging specific to SIMIX (network)");
14
15 static xbt_dict_t rdv_points = NULL;
16 XBT_EXPORT_NO_IMPORT(unsigned long int) smx_total_comms = 0;
17
18 static void SIMIX_waitany_remove_simcall_from_actions(smx_simcall_t simcall);
19 static void SIMIX_comm_copy_data(smx_action_t comm);
20 static smx_action_t SIMIX_comm_new(e_smx_comm_type_t type);
21 static XBT_INLINE void SIMIX_rdv_push(smx_rdv_t rdv, smx_action_t comm);
22 static smx_action_t SIMIX_fifo_probe_comm(xbt_fifo_t fifo, e_smx_comm_type_t type,
23                                         int (*match_fun)(void *, void *,smx_action_t),
24                                         void *user_data, smx_action_t my_action);
25 static smx_action_t SIMIX_fifo_get_comm(xbt_fifo_t fifo, e_smx_comm_type_t type,
26                                         int (*match_fun)(void *, void *,smx_action_t),
27                                         void *user_data, smx_action_t my_action);
28 static void SIMIX_rdv_free(void *data);
29 static void SIMIX_comm_start(smx_action_t action);
30
31 void SIMIX_network_init(void)
32 {
33   rdv_points = xbt_dict_new_homogeneous(SIMIX_rdv_free);
34   if(MC_is_active())
35     MC_ignore_global_variable("smx_total_comms");
36 }
37
38 void SIMIX_network_exit(void)
39 {
40   xbt_dict_free(&rdv_points);
41 }
42
43 /******************************************************************************/
44 /*                           Rendez-Vous Points                               */
45 /******************************************************************************/
46
47 smx_rdv_t SIMIX_pre_rdv_create(smx_simcall_t simcall, const char *name){
48   return SIMIX_rdv_create(name);
49 }
50 smx_rdv_t SIMIX_rdv_create(const char *name)
51 {
52   /* two processes may have pushed the same rdv_create simcall at the same time */
53   smx_rdv_t rdv = name ? xbt_dict_get_or_null(rdv_points, name) : NULL;
54
55   if (!rdv) {
56     rdv = xbt_new0(s_smx_rvpoint_t, 1);
57     rdv->name = name ? xbt_strdup(name) : NULL;
58     rdv->comm_fifo = xbt_fifo_new();
59     rdv->done_comm_fifo = xbt_fifo_new();
60     rdv->permanent_receiver=NULL;
61
62     XBT_DEBUG("Creating a mailbox at %p with name %s\n", rdv, name);
63
64     if (rdv->name)
65       xbt_dict_set(rdv_points, rdv->name, rdv, NULL);
66   }
67   return rdv;
68 }
69
70 void SIMIX_pre_rdv_destroy(smx_simcall_t simcall, smx_rdv_t rdv){
71   return SIMIX_rdv_destroy(rdv);
72 }
73 void SIMIX_rdv_destroy(smx_rdv_t rdv)
74 {
75   if (rdv->name)
76     xbt_dict_remove(rdv_points, rdv->name);
77 }
78
79 void SIMIX_rdv_free(void *data)
80 {
81   XBT_DEBUG("rdv free %p", data);
82   smx_rdv_t rdv = (smx_rdv_t) data;
83   xbt_free(rdv->name);
84   xbt_fifo_free(rdv->comm_fifo);
85   xbt_fifo_free(rdv->done_comm_fifo);
86
87   xbt_free(rdv);  
88 }
89
90 xbt_dict_t SIMIX_get_rdv_points()
91 {
92   return rdv_points;
93 }
94
95 smx_rdv_t SIMIX_pre_rdv_get_by_name(smx_simcall_t simcall, const char *name){
96   return SIMIX_rdv_get_by_name(name);
97 }
98 smx_rdv_t SIMIX_rdv_get_by_name(const char *name)
99 {
100   return xbt_dict_get_or_null(rdv_points, name);
101 }
102
103 int SIMIX_pre_rdv_comm_count_by_host(smx_simcall_t simcall, smx_rdv_t rdv, smx_host_t host){
104   return SIMIX_rdv_comm_count_by_host(rdv, host);
105 }
106 int SIMIX_rdv_comm_count_by_host(smx_rdv_t rdv, smx_host_t host)
107 {
108   smx_action_t comm = NULL;
109   xbt_fifo_item_t item = NULL;
110   int count = 0;
111
112   xbt_fifo_foreach(rdv->comm_fifo, item, comm, smx_action_t) {
113     if (comm->comm.src_proc->smx_host == host)
114       count++;
115   }
116
117   return count;
118 }
119
120 smx_action_t SIMIX_pre_rdv_get_head(smx_simcall_t simcall, smx_rdv_t rdv){
121   return SIMIX_rdv_get_head(rdv);
122 }
123 smx_action_t SIMIX_rdv_get_head(smx_rdv_t rdv)
124 {
125   return xbt_fifo_get_item_content(xbt_fifo_get_first_item(rdv->comm_fifo));
126 }
127
128 smx_process_t SIMIX_pre_rdv_get_receiver(smx_simcall_t simcall, smx_rdv_t rdv){
129   return SIMIX_rdv_get_receiver(rdv);
130 }
131 /**
132  *  \brief get the receiver (process associated to the mailbox)
133  *  \param rdv The rendez-vous point
134  *  \return process The receiving process (NULL if not set)
135  */
136 smx_process_t SIMIX_rdv_get_receiver(smx_rdv_t rdv)
137 {
138   return rdv->permanent_receiver;
139 }
140
141 void SIMIX_pre_rdv_set_receiver(smx_simcall_t simcall, smx_rdv_t rdv,
142                             smx_process_t process){
143   SIMIX_rdv_set_receiver(rdv, process);
144 }
145 /**
146  *  \brief set the receiver of the rendez vous point to allow eager sends
147  *  \param rdv The rendez-vous point
148  *  \param process The receiving process
149  */
150 void SIMIX_rdv_set_receiver(smx_rdv_t rdv, smx_process_t process)
151 {
152   rdv->permanent_receiver=process;
153 }
154
155 /**
156  *  \brief Pushes a communication action into a rendez-vous point
157  *  \param rdv The rendez-vous point
158  *  \param comm The communication action
159  */
160 static XBT_INLINE void SIMIX_rdv_push(smx_rdv_t rdv, smx_action_t comm)
161 {
162   xbt_fifo_push(rdv->comm_fifo, comm);
163   comm->comm.rdv = rdv;
164 }
165
166 /**
167  *  \brief Removes a communication action from a rendez-vous point
168  *  \param rdv The rendez-vous point
169  *  \param comm The communication action
170  */
171 XBT_INLINE void SIMIX_rdv_remove(smx_rdv_t rdv, smx_action_t comm)
172 {
173   xbt_fifo_remove(rdv->comm_fifo, comm);
174   comm->comm.rdv = NULL;
175 }
176
177 /**
178  *  \brief Checks if there is a communication action queued in a fifo matching our needs
179  *  \param type The type of communication we are looking for (comm_send, comm_recv)
180  *  \return The communication action if found, NULL otherwise
181  */
182 smx_action_t SIMIX_fifo_get_comm(xbt_fifo_t fifo, e_smx_comm_type_t type,
183                                  int (*match_fun)(void *, void *,smx_action_t),
184                                  void *this_user_data, smx_action_t my_action)
185 {
186   smx_action_t action;
187   xbt_fifo_item_t item;
188   void* other_user_data = NULL;
189
190   xbt_fifo_foreach(fifo, item, action, smx_action_t) {
191     if (action->comm.type == SIMIX_COMM_SEND) {
192       other_user_data = action->comm.src_data;
193     } else if (action->comm.type == SIMIX_COMM_RECEIVE) {
194       other_user_data = action->comm.dst_data;
195     }
196     if (action->comm.type == type &&
197         (!match_fun              ||              match_fun(this_user_data,  other_user_data, action)) &&
198         (!action->comm.match_fun || action->comm.match_fun(other_user_data, this_user_data,  my_action))) {
199       XBT_DEBUG("Found a matching communication action %p", action);
200       xbt_fifo_remove_item(fifo, item);
201       xbt_fifo_free_item(item);
202       action->comm.refcount++;
203 #ifdef HAVE_MC
204       action->comm.rdv_cpy = action->comm.rdv;
205 #endif
206       action->comm.rdv = NULL;
207       return action;
208     }
209     XBT_DEBUG("Sorry, communication action %p does not match our needs:"
210               " its type is %d but we are looking for a comm of type %d (or maybe the filtering didn't match)",
211               action, (int)action->comm.type, (int)type);
212   }
213   XBT_DEBUG("No matching communication action found");
214   return NULL;
215 }
216
217
218 /**
219  *  \brief Checks if there is a communication action queued in a fifo matching our needs, but leave it there
220  *  \param type The type of communication we are looking for (comm_send, comm_recv)
221  *  \return The communication action if found, NULL otherwise
222  */
223 smx_action_t SIMIX_fifo_probe_comm(xbt_fifo_t fifo, e_smx_comm_type_t type,
224                                  int (*match_fun)(void *, void *,smx_action_t),
225                                  void *this_user_data, smx_action_t my_action)
226 {
227   smx_action_t action;
228   xbt_fifo_item_t item;
229   void* other_user_data = NULL;
230
231   xbt_fifo_foreach(fifo, item, action, smx_action_t) {
232     if (action->comm.type == SIMIX_COMM_SEND) {
233       other_user_data = action->comm.src_data;
234     } else if (action->comm.type == SIMIX_COMM_RECEIVE) {
235       other_user_data = action->comm.dst_data;
236     }
237     if (action->comm.type == type &&
238         (!match_fun              ||              match_fun(this_user_data,  other_user_data, action)) &&
239         (!action->comm.match_fun || action->comm.match_fun(other_user_data, this_user_data,  my_action))) {
240       XBT_DEBUG("Found a matching communication action %p", action);
241       action->comm.refcount++;
242
243       return action;
244     }
245     XBT_DEBUG("Sorry, communication action %p does not match our needs:"
246               " its type is %d but we are looking for a comm of type %d (or maybe the filtering didn't match)",
247               action, (int)action->comm.type, (int)type);
248   }
249   XBT_DEBUG("No matching communication action found");
250   return NULL;
251 }
252 /******************************************************************************/
253 /*                            Communication Actions                            */
254 /******************************************************************************/
255
256 /**
257  *  \brief Creates a new communicate action
258  *  \param type The direction of communication (comm_send, comm_recv)
259  *  \return The new communicate action
260  */
261 smx_action_t SIMIX_comm_new(e_smx_comm_type_t type)
262 {
263   smx_action_t act;
264
265   /* alloc structures */
266   act = xbt_mallocator_get(simix_global->action_mallocator);
267
268   act->type = SIMIX_ACTION_COMMUNICATE;
269   act->state = SIMIX_WAITING;
270
271   /* set communication */
272   act->comm.type = type;
273   act->comm.refcount = 1;
274   act->comm.src_data=NULL;
275   act->comm.dst_data=NULL;
276
277
278 #ifdef HAVE_LATENCY_BOUND_TRACKING
279   //initialize with unknown value
280   act->latency_limited = -1;
281 #endif
282
283 #ifdef HAVE_TRACING
284   act->category = NULL;
285 #endif
286
287   XBT_DEBUG("Create communicate action %p", act);
288   ++smx_total_comms;
289
290   return act;
291 }
292
293 void SIMIX_pre_comm_destroy(smx_simcall_t simcall, smx_action_t action){
294   SIMIX_comm_destroy(action);
295 }
296 /**
297  *  \brief Destroy a communicate action
298  *  \param action The communicate action to be destroyed
299  */
300 void SIMIX_comm_destroy(smx_action_t action)
301 {
302   XBT_DEBUG("Destroy action %p (refcount: %d), state: %d",
303             action, action->comm.refcount, (int)action->state);
304
305   if (action->comm.refcount <= 0) {
306     xbt_backtrace_display_current();
307     xbt_die("The refcount of comm %p is already 0 before decreasing it. "
308             "That's a bug! If you didn't test and/or wait the same communication twice in your code, then the bug is SimGrid's...", action);
309   }
310   action->comm.refcount--;
311   if (action->comm.refcount > 0)
312       return;
313   XBT_DEBUG("Really free communication %p; refcount is now %d", action,
314             action->comm.refcount);
315
316 #ifdef HAVE_LATENCY_BOUND_TRACKING
317   action->latency_limited = SIMIX_comm_is_latency_bounded( action ) ;
318 #endif
319
320   xbt_free(action->name);
321   SIMIX_comm_destroy_internal_actions(action);
322
323   if (action->comm.detached && action->state != SIMIX_DONE) {
324     /* the communication has failed and was detached:
325      * we have to free the buffer */
326     if (action->comm.clean_fun) {
327       action->comm.clean_fun(action->comm.src_buff);
328     }
329     action->comm.src_buff = NULL;
330   }
331
332   if(action->comm.rdv)
333     SIMIX_rdv_remove(action->comm.rdv, action);
334
335   xbt_mallocator_release(simix_global->action_mallocator, action);
336 }
337
338 void SIMIX_comm_destroy_internal_actions(smx_action_t action)
339 {
340   if (action->comm.surf_comm){
341 #ifdef HAVE_LATENCY_BOUND_TRACKING
342     action->latency_limited = SIMIX_comm_is_latency_bounded(action);
343 #endif
344     surf_action_unref(action->comm.surf_comm);
345     action->comm.surf_comm = NULL;
346   }
347
348   if (action->comm.src_timeout){
349     surf_action_unref(action->comm.src_timeout);
350     action->comm.src_timeout = NULL;
351   }
352
353   if (action->comm.dst_timeout){
354     surf_action_unref(action->comm.dst_timeout);
355     action->comm.dst_timeout = NULL;
356   }
357 }
358
359 void SIMIX_pre_comm_send(smx_simcall_t simcall, smx_rdv_t rdv,
360                                   double task_size, double rate,
361                                   void *src_buff, size_t src_buff_size,
362                                   int (*match_fun)(void *, void *,smx_action_t),
363                                   void *data, double timeout){
364   smx_action_t comm = SIMIX_comm_isend(simcall->issuer, rdv, task_size, rate,
365                                        src_buff, src_buff_size, match_fun, NULL,
366                                        data, 0);
367   simcall->mc_value = 0;
368   SIMIX_pre_comm_wait(simcall, comm, timeout);
369 }
370 smx_action_t SIMIX_pre_comm_isend(smx_simcall_t simcall, smx_rdv_t rdv,
371                                   double task_size, double rate,
372                                   void *src_buff, size_t src_buff_size,
373                                   int (*match_fun)(void *, void *,smx_action_t),
374                                   void (*clean_fun)(void *), 
375                                   void *data, int detached){
376   return SIMIX_comm_isend(simcall->issuer, rdv, task_size, rate, src_buff,
377                           src_buff_size, match_fun, clean_fun, data, detached);
378
379 }
380 smx_action_t SIMIX_comm_isend(smx_process_t src_proc, smx_rdv_t rdv,
381                               double task_size, double rate,
382                               void *src_buff, size_t src_buff_size,
383                               int (*match_fun)(void *, void *,smx_action_t),
384                               void (*clean_fun)(void *), // used to free the action in case of problem after a detached send
385                               void *data,
386                               int detached)
387 {
388   XBT_DEBUG("send from %p\n", rdv);
389
390   /* Prepare an action describing us, so that it gets passed to the user-provided filter of other side */
391   smx_action_t this_action = SIMIX_comm_new(SIMIX_COMM_SEND);
392
393   /* Look for communication action matching our needs. We also provide a description of
394    * ourself so that the other side also gets a chance of choosing if it wants to match with us.
395    *
396    * If it is not found then push our communication into the rendez-vous point */
397   smx_action_t other_action = SIMIX_fifo_get_comm(rdv->comm_fifo, SIMIX_COMM_RECEIVE, match_fun, data, this_action);
398
399   if (!other_action) {
400     other_action = this_action;
401
402     if (rdv->permanent_receiver!=NULL){
403       //this mailbox is for small messages, which have to be sent right now
404       other_action->state = SIMIX_READY;
405       other_action->comm.dst_proc=rdv->permanent_receiver;
406       other_action->comm.refcount++;
407       xbt_fifo_push(rdv->done_comm_fifo,other_action);
408       other_action->comm.rdv=rdv;
409       XBT_DEBUG("pushing a message into the permanent receive fifo %p, comm %p \n", rdv, &(other_action->comm));
410
411     }else{
412       SIMIX_rdv_push(rdv, this_action);
413     }
414   } else {
415     XBT_DEBUG("Receive already pushed\n");
416
417     SIMIX_comm_destroy(this_action);
418     --smx_total_comms; // this creation was a pure waste
419
420     other_action->state = SIMIX_READY;
421     other_action->comm.type = SIMIX_COMM_READY;
422
423   }
424   xbt_fifo_push(src_proc->comms, other_action);
425
426   /* if the communication action is detached then decrease the refcount
427    * by one, so it will be eliminated by the receiver's destroy call */
428   if (detached) {
429     other_action->comm.detached = 1;
430     other_action->comm.refcount--;
431     other_action->comm.clean_fun = clean_fun;
432   } else {
433     other_action->comm.clean_fun = NULL;
434   }
435
436   /* Setup the communication action */
437   other_action->comm.src_proc = src_proc;
438   other_action->comm.task_size = task_size;
439   other_action->comm.rate = rate;
440   other_action->comm.src_buff = src_buff;
441   other_action->comm.src_buff_size = src_buff_size;
442   other_action->comm.src_data = data;
443
444   other_action->comm.match_fun = match_fun;
445
446   if (MC_is_active()) {
447     other_action->state = SIMIX_RUNNING;
448     return other_action;
449   }
450
451   SIMIX_comm_start(other_action);
452   return (detached ? NULL : other_action);
453 }
454
455 void SIMIX_pre_comm_recv(smx_simcall_t simcall, smx_rdv_t rdv,
456                          void *dst_buff, size_t *dst_buff_size,
457                          int (*match_fun)(void *, void *, smx_action_t),
458                          void *data, double timeout, double rate)
459 {
460   smx_action_t comm = SIMIX_comm_irecv(simcall->issuer, rdv, dst_buff,
461                                        dst_buff_size, match_fun, data, rate);
462   simcall->mc_value = 0;
463   SIMIX_pre_comm_wait(simcall, comm, timeout);
464 }
465
466 smx_action_t SIMIX_pre_comm_irecv(smx_simcall_t simcall, smx_rdv_t rdv,
467                                   void *dst_buff, size_t *dst_buff_size,
468                                   int (*match_fun)(void *, void *, smx_action_t),
469                                   void *data, double rate)
470 {
471   return SIMIX_comm_irecv(simcall->issuer, rdv, dst_buff, dst_buff_size,
472                           match_fun, data, rate);
473 }
474
475 smx_action_t SIMIX_comm_irecv(smx_process_t dst_proc, smx_rdv_t rdv,
476                               void *dst_buff, size_t *dst_buff_size,
477                               int (*match_fun)(void *, void *, smx_action_t),
478                               void *data, double rate)
479 {
480   XBT_DEBUG("recv from %p %p\n", rdv, rdv->comm_fifo);
481   smx_action_t this_action = SIMIX_comm_new(SIMIX_COMM_RECEIVE);
482
483   smx_action_t other_action;
484   //communication already done, get it inside the fifo of completed comms
485   //permanent receive v1
486   //int already_received=0;
487   if(rdv->permanent_receiver && xbt_fifo_size(rdv->done_comm_fifo)!=0){
488
489     XBT_DEBUG("We have a comm that has probably already been received, trying to match it, to skip the communication\n");
490     //find a match in the already received fifo
491     other_action = SIMIX_fifo_get_comm(rdv->done_comm_fifo, SIMIX_COMM_SEND, match_fun, data, this_action);
492     //if not found, assume the receiver came first, register it to the mailbox in the classical way
493     if (!other_action)  {
494       XBT_DEBUG("We have messages in the permanent receive list, but not the one we are looking for, pushing request into fifo\n");
495       other_action = this_action;
496       SIMIX_rdv_push(rdv, this_action);
497     }else{
498       if(other_action->comm.surf_comm &&        SIMIX_comm_get_remains(other_action)==0.0)
499       {
500         XBT_DEBUG("comm %p has been already sent, and is finished, destroy it\n",&(other_action->comm));
501         other_action->state = SIMIX_DONE;
502         other_action->comm.type = SIMIX_COMM_DONE;
503         other_action->comm.rdv = NULL;
504         //SIMIX_comm_destroy(this_action);
505         //--smx_total_comms; // this creation was a pure waste
506         //already_received=1;
507         //other_action->comm.refcount--;
508       }/*else{
509          XBT_DEBUG("Not yet finished, we have to wait %d\n", xbt_fifo_size(rdv->comm_fifo));
510          }*/
511       other_action->comm.refcount--;
512       SIMIX_comm_destroy(this_action);
513       --smx_total_comms; // this creation was a pure waste
514     }
515   }else{
516     /* Prepare an action describing us, so that it gets passed to the user-provided filter of other side */
517
518     /* Look for communication action matching our needs. We also provide a description of
519      * ourself so that the other side also gets a chance of choosing if it wants to match with us.
520      *
521      * If it is not found then push our communication into the rendez-vous point */
522     other_action = SIMIX_fifo_get_comm(rdv->comm_fifo, SIMIX_COMM_SEND, match_fun, data, this_action);
523
524     if (!other_action) {
525       XBT_DEBUG("Receive pushed first %d\n", xbt_fifo_size(rdv->comm_fifo));
526       other_action = this_action;
527       SIMIX_rdv_push(rdv, this_action);
528     } else {
529       SIMIX_comm_destroy(this_action);
530       --smx_total_comms; // this creation was a pure waste
531       other_action->state = SIMIX_READY;
532       other_action->comm.type = SIMIX_COMM_READY;
533       //other_action->comm.refcount--;
534     }
535     xbt_fifo_push(dst_proc->comms, other_action);
536   }
537
538   /* Setup communication action */
539   other_action->comm.dst_proc = dst_proc;
540   other_action->comm.dst_buff = dst_buff;
541   other_action->comm.dst_buff_size = dst_buff_size;
542   other_action->comm.dst_data = data;
543
544   if (rate != -1.0 &&
545       (other_action->comm.rate == -1.0 || rate < other_action->comm.rate))
546     other_action->comm.rate = rate;
547
548   other_action->comm.match_fun = match_fun;
549
550
551   /*if(already_received)//do the actual copy, because the first one after the comm didn't have all the info
552     SIMIX_comm_copy_data(other_action);*/
553
554
555   if (MC_is_active()) {
556     other_action->state = SIMIX_RUNNING;
557     return other_action;
558   }
559
560   SIMIX_comm_start(other_action);
561   // }
562   return other_action;
563 }
564
565 smx_action_t SIMIX_pre_comm_iprobe(smx_simcall_t simcall, smx_rdv_t rdv,
566                                    int src, int tag,
567                                    int (*match_fun)(void *, void *, smx_action_t),
568                                    void *data){
569   return SIMIX_comm_iprobe(simcall->issuer, rdv, src, tag, match_fun, data);
570 }
571
572 smx_action_t SIMIX_comm_iprobe(smx_process_t dst_proc, smx_rdv_t rdv, int src,
573                               int tag, int (*match_fun)(void *, void *, smx_action_t), void *data)
574 {
575   XBT_DEBUG("iprobe from %p %p\n", rdv, rdv->comm_fifo);
576   smx_action_t this_action = SIMIX_comm_new(SIMIX_COMM_RECEIVE);
577
578   smx_action_t other_action=NULL;
579   if(rdv->permanent_receiver && xbt_fifo_size(rdv->done_comm_fifo)!=0){
580     //find a match in the already received fifo
581       XBT_DEBUG("first try in the perm recv mailbox \n");
582
583     other_action = SIMIX_fifo_probe_comm(rdv->done_comm_fifo, SIMIX_COMM_SEND, match_fun, data, this_action);
584   }
585  // }else{
586     if(!other_action){
587         XBT_DEBUG("second try in the other mailbox");
588         other_action = SIMIX_fifo_probe_comm(rdv->comm_fifo, SIMIX_COMM_SEND, match_fun, data, this_action);
589     }
590 //  }
591   if(other_action)other_action->comm.refcount--;
592
593   SIMIX_comm_destroy(this_action);
594   --smx_total_comms;
595   return other_action;
596 }
597
598 void SIMIX_pre_comm_wait(smx_simcall_t simcall, smx_action_t action, double timeout)
599 {
600   /* the simcall may be a wait, a send or a recv */
601   surf_action_t sleep;
602
603   /* Associate this simcall to the wait action */
604   XBT_DEBUG("SIMIX_pre_comm_wait, %p", action);
605
606   xbt_fifo_push(action->simcalls, simcall);
607   simcall->issuer->waiting_action = action;
608
609   if (MC_is_active()) {
610     int idx = simcall->mc_value;
611     if (idx == 0) {
612       action->state = SIMIX_DONE;
613     } else {
614       /* If we reached this point, the wait simcall must have a timeout */
615       /* Otherwise it shouldn't be enabled and executed by the MC */
616       if (timeout == -1)
617         THROW_IMPOSSIBLE;
618
619       if (action->comm.src_proc == simcall->issuer)
620         action->state = SIMIX_SRC_TIMEOUT;
621       else
622         action->state = SIMIX_DST_TIMEOUT;
623     }
624
625     SIMIX_comm_finish(action);
626     return;
627   }
628
629   /* If the action has already finish perform the error handling, */
630   /* otherwise set up a waiting timeout on the right side         */
631   if (action->state != SIMIX_WAITING && action->state != SIMIX_RUNNING) {
632     SIMIX_comm_finish(action);
633   } else { /* if (timeout >= 0) { we need a surf sleep action even when there is no timeout, otherwise surf won't tell us when the host fails */
634     sleep = surf_workstation_sleep(simcall->issuer->smx_host, timeout);
635     surf_action_set_data(sleep, action);
636
637     if (simcall->issuer == action->comm.src_proc)
638       action->comm.src_timeout = sleep;
639     else
640       action->comm.dst_timeout = sleep;
641   }
642 }
643
644 void SIMIX_pre_comm_test(smx_simcall_t simcall, smx_action_t action)
645 {
646   if(MC_is_active()){
647     simcall_comm_test__set__result(simcall, action->comm.src_proc && action->comm.dst_proc);
648     if(simcall_comm_test__get__result(simcall)){
649       action->state = SIMIX_DONE;
650       xbt_fifo_push(action->simcalls, simcall);
651       SIMIX_comm_finish(action);
652     }else{
653       SIMIX_simcall_answer(simcall);
654     }
655     return;
656   }
657
658   simcall_comm_test__set__result(simcall, (action->state != SIMIX_WAITING && action->state != SIMIX_RUNNING));
659   if (simcall_comm_test__get__result(simcall)) {
660     xbt_fifo_push(action->simcalls, simcall);
661     SIMIX_comm_finish(action);
662   } else {
663     SIMIX_simcall_answer(simcall);
664   }
665 }
666
667 void SIMIX_pre_comm_testany(smx_simcall_t simcall, xbt_dynar_t actions)
668 {
669   unsigned int cursor;
670   smx_action_t action;
671   simcall_comm_testany__set__result(simcall, -1);
672
673   if (MC_is_active()){
674     int idx = simcall->mc_value;
675     if(idx == -1){
676       SIMIX_simcall_answer(simcall);
677     }else{
678       action = xbt_dynar_get_as(actions, idx, smx_action_t);
679       simcall_comm_testany__set__result(simcall, idx);
680       xbt_fifo_push(action->simcalls, simcall);
681       action->state = SIMIX_DONE;
682       SIMIX_comm_finish(action);
683     }
684     return;
685   }
686
687   xbt_dynar_foreach(simcall_comm_testany__get__comms(simcall), cursor,action) {
688     if (action->state != SIMIX_WAITING && action->state != SIMIX_RUNNING) {
689       simcall_comm_testany__set__result(simcall, cursor);
690       xbt_fifo_push(action->simcalls, simcall);
691       SIMIX_comm_finish(action);
692       return;
693     }
694   }
695   SIMIX_simcall_answer(simcall);
696 }
697
698 void SIMIX_pre_comm_waitany(smx_simcall_t simcall, xbt_dynar_t actions)
699 {
700   smx_action_t action;
701   unsigned int cursor = 0;
702
703   if (MC_is_active()){
704     int idx = simcall->mc_value;
705     action = xbt_dynar_get_as(actions, idx, smx_action_t);
706     xbt_fifo_push(action->simcalls, simcall);
707     simcall_comm_waitany__set__result(simcall, idx);
708     action->state = SIMIX_DONE;
709     SIMIX_comm_finish(action);
710     return;
711   }
712
713   xbt_dynar_foreach(actions, cursor, action){
714     /* associate this simcall to the the action */
715     xbt_fifo_push(action->simcalls, simcall);
716
717     /* see if the action is already finished */
718     if (action->state != SIMIX_WAITING && action->state != SIMIX_RUNNING){
719       SIMIX_comm_finish(action);
720       break;
721     }
722   }
723 }
724
725 void SIMIX_waitany_remove_simcall_from_actions(smx_simcall_t simcall)
726 {
727   smx_action_t action;
728   unsigned int cursor = 0;
729   xbt_dynar_t actions = simcall_comm_waitany__get__comms(simcall);
730
731   xbt_dynar_foreach(actions, cursor, action) {
732     xbt_fifo_remove(action->simcalls, simcall);
733   }
734 }
735
736 /**
737  *  \brief Starts the simulation of a communication action.
738  *  \param action the communication action
739  */
740 static XBT_INLINE void SIMIX_comm_start(smx_action_t action)
741 {
742   /* If both the sender and the receiver are already there, start the communication */
743   if (action->state == SIMIX_READY) {
744
745     smx_host_t sender = action->comm.src_proc->smx_host;
746     smx_host_t receiver = action->comm.dst_proc->smx_host;
747
748     XBT_DEBUG("Starting communication %p from '%s' to '%s'", action,
749               SIMIX_host_get_name(sender), SIMIX_host_get_name(receiver));
750
751     action->comm.surf_comm = surf_workstation_model_communicate(surf_workstation_model,
752                                                                     sender, receiver,
753                                                                     action->comm.task_size, action->comm.rate);
754
755     surf_action_set_data(action->comm.surf_comm, action);
756
757     action->state = SIMIX_RUNNING;
758
759     /* If a link is failed, detect it immediately */
760     if (surf_action_get_state(action->comm.surf_comm) == SURF_ACTION_FAILED) {
761       XBT_DEBUG("Communication from '%s' to '%s' failed to start because of a link failure",
762                 SIMIX_host_get_name(sender), SIMIX_host_get_name(receiver));
763       action->state = SIMIX_LINK_FAILURE;
764       SIMIX_comm_destroy_internal_actions(action);
765     }
766
767     /* If any of the process is suspend, create the action but stop its execution,
768        it will be restarted when the sender process resume */
769     if (SIMIX_process_is_suspended(action->comm.src_proc) ||
770         SIMIX_process_is_suspended(action->comm.dst_proc)) {
771       /* FIXME: check what should happen with the action state */
772
773       if (SIMIX_process_is_suspended(action->comm.src_proc))
774         XBT_DEBUG("The communication is suspended on startup because src (%s:%s) were suspended since it initiated the communication",
775                   SIMIX_host_get_name(action->comm.src_proc->smx_host), action->comm.src_proc->name);
776       else
777         XBT_DEBUG("The communication is suspended on startup because dst (%s:%s) were suspended since it initiated the communication",
778                   SIMIX_host_get_name(action->comm.dst_proc->smx_host), action->comm.dst_proc->name);
779
780       surf_action_suspend(action->comm.surf_comm);
781
782     }
783   }
784 }
785
786 /**
787  * \brief Answers the SIMIX simcalls associated to a communication action.
788  * \param action a finished communication action
789  */
790 void SIMIX_comm_finish(smx_action_t action)
791 {
792   unsigned int destroy_count = 0;
793   smx_simcall_t simcall;
794
795   while ((simcall = xbt_fifo_shift(action->simcalls))) {
796
797     /* If a waitany simcall is waiting for this action to finish, then remove
798        it from the other actions in the waitany list. Afterwards, get the
799        position of the actual action in the waitany dynar and
800        return it as the result of the simcall */
801     if (simcall->call == SIMCALL_COMM_WAITANY) {
802       SIMIX_waitany_remove_simcall_from_actions(simcall);
803       if (!MC_is_active())
804         simcall_comm_waitany__set__result(simcall, xbt_dynar_search(simcall_comm_waitany__get__comms(simcall), &action));
805     }
806
807     /* If the action is still in a rendez-vous point then remove from it */
808     if (action->comm.rdv)
809       SIMIX_rdv_remove(action->comm.rdv, action);
810
811     XBT_DEBUG("SIMIX_comm_finish: action state = %d", (int)action->state);
812
813     /* Check out for errors */
814     switch (action->state) {
815
816     case SIMIX_DONE:
817       XBT_DEBUG("Communication %p complete!", action);
818       SIMIX_comm_copy_data(action);
819       break;
820
821     case SIMIX_SRC_TIMEOUT:
822       SMX_EXCEPTION(simcall->issuer, timeout_error, 0,
823                     "Communication timeouted because of sender");
824       break;
825
826     case SIMIX_DST_TIMEOUT:
827       SMX_EXCEPTION(simcall->issuer, timeout_error, 0,
828                     "Communication timeouted because of receiver");
829       break;
830
831     case SIMIX_SRC_HOST_FAILURE:
832       if (simcall->issuer == action->comm.src_proc)
833         simcall->issuer->context->iwannadie = 1;
834 //          SMX_EXCEPTION(simcall->issuer, host_error, 0, "Host failed");
835       else
836         SMX_EXCEPTION(simcall->issuer, network_error, 0, "Remote peer failed");
837       break;
838
839     case SIMIX_DST_HOST_FAILURE:
840       if (simcall->issuer == action->comm.dst_proc)
841         simcall->issuer->context->iwannadie = 1;
842 //          SMX_EXCEPTION(simcall->issuer, host_error, 0, "Host failed");
843       else
844         SMX_EXCEPTION(simcall->issuer, network_error, 0, "Remote peer failed");
845       break;
846
847     case SIMIX_LINK_FAILURE:
848       XBT_DEBUG("Link failure in action %p between '%s' and '%s': posting an exception to the issuer: %s (%p) detached:%d",
849                 action,
850                 action->comm.src_proc ? sg_host_name(action->comm.src_proc->smx_host) : NULL,
851                 action->comm.dst_proc ? sg_host_name(action->comm.dst_proc->smx_host) : NULL,
852                 simcall->issuer->name, simcall->issuer, action->comm.detached);
853       if (action->comm.src_proc == simcall->issuer) {
854         XBT_DEBUG("I'm source");
855       } else if (action->comm.dst_proc == simcall->issuer) {
856         XBT_DEBUG("I'm dest");
857       } else {
858         XBT_DEBUG("I'm neither source nor dest");
859       }
860       SMX_EXCEPTION(simcall->issuer, network_error, 0, "Link failure");
861       break;
862
863     case SIMIX_CANCELED:
864       if (simcall->issuer == action->comm.dst_proc)
865         SMX_EXCEPTION(simcall->issuer, cancel_error, 0,
866                       "Communication canceled by the sender");
867       else
868         SMX_EXCEPTION(simcall->issuer, cancel_error, 0,
869                       "Communication canceled by the receiver");
870       break;
871
872     default:
873       xbt_die("Unexpected action state in SIMIX_comm_finish: %d", (int)action->state);
874     }
875
876     /* if there is an exception during a waitany or a testany, indicate the position of the failed communication */
877     if (simcall->issuer->doexception) {
878       if (simcall->call == SIMCALL_COMM_WAITANY) {
879         simcall->issuer->running_ctx->exception.value = xbt_dynar_search(simcall_comm_waitany__get__comms(simcall), &action);
880       }
881       else if (simcall->call == SIMCALL_COMM_TESTANY) {
882         simcall->issuer->running_ctx->exception.value = xbt_dynar_search(simcall_comm_testany__get__comms(simcall), &action);
883       }
884     }
885
886     if (surf_resource_get_state(surf_workstation_resource_priv(simcall->issuer->smx_host)) != SURF_RESOURCE_ON) {
887       simcall->issuer->context->iwannadie = 1;
888     }
889
890     simcall->issuer->waiting_action = NULL;
891     xbt_fifo_remove(simcall->issuer->comms, action);
892     if(action->comm.detached){
893       if(simcall->issuer == action->comm.src_proc){
894         if(action->comm.dst_proc)
895           xbt_fifo_remove(action->comm.dst_proc->comms, action);
896       }
897       if(simcall->issuer == action->comm.dst_proc){
898         if(action->comm.src_proc)
899           xbt_fifo_remove(action->comm.src_proc->comms, action);
900       }
901     }
902     SIMIX_simcall_answer(simcall);
903     destroy_count++;
904   }
905
906   while (destroy_count-- > 0)
907     SIMIX_comm_destroy(action);
908 }
909
910 /**
911  * \brief This function is called when a Surf communication action is finished.
912  * \param action the corresponding Simix communication
913  */
914 void SIMIX_post_comm(smx_action_t action)
915 {
916   /* Update action state */
917   if (action->comm.src_timeout &&
918       surf_action_get_state(action->comm.src_timeout) == SURF_ACTION_DONE)
919     action->state = SIMIX_SRC_TIMEOUT;
920   else if (action->comm.dst_timeout &&
921           surf_action_get_state(action->comm.dst_timeout) == SURF_ACTION_DONE)
922     action->state = SIMIX_DST_TIMEOUT;
923   else if (action->comm.src_timeout &&
924           surf_action_get_state(action->comm.src_timeout) == SURF_ACTION_FAILED)
925     action->state = SIMIX_SRC_HOST_FAILURE;
926   else if (action->comm.dst_timeout &&
927       surf_action_get_state(action->comm.dst_timeout) == SURF_ACTION_FAILED)
928     action->state = SIMIX_DST_HOST_FAILURE;
929   else if (action->comm.surf_comm &&
930           surf_action_get_state(action->comm.surf_comm) == SURF_ACTION_FAILED) {
931     XBT_DEBUG("Puta madre. Surf says that the link broke");
932     action->state = SIMIX_LINK_FAILURE;
933   } else
934     action->state = SIMIX_DONE;
935
936   XBT_DEBUG("SIMIX_post_comm: comm %p, state %d, src_proc %p, dst_proc %p, detached: %d",
937             action, (int)action->state, action->comm.src_proc, action->comm.dst_proc, action->comm.detached);
938
939   /* destroy the surf actions associated with the Simix communication */
940   SIMIX_comm_destroy_internal_actions(action);
941
942   /* remove the communication action from the list of pending communications
943    * of both processes (if they still exist) */
944   if (action->comm.src_proc) {
945     xbt_fifo_remove(action->comm.src_proc->comms, action);
946   }
947   if (action->comm.dst_proc) {
948     xbt_fifo_remove(action->comm.dst_proc->comms, action);
949   }
950
951   /* if there are simcalls associated with the action, then answer them */
952   if (xbt_fifo_size(action->simcalls)) {
953     SIMIX_comm_finish(action);
954   }
955 }
956
957 void SIMIX_pre_comm_cancel(smx_simcall_t simcall, smx_action_t action){
958   SIMIX_comm_cancel(action);
959 }
960 void SIMIX_comm_cancel(smx_action_t action)
961 {
962   /* if the action is a waiting state means that it is still in a rdv */
963   /* so remove from it and delete it */
964   if (action->state == SIMIX_WAITING) {
965     SIMIX_rdv_remove(action->comm.rdv, action);
966     action->state = SIMIX_CANCELED;
967   }
968   else if (!MC_is_active() /* when running the MC there are no surf actions */
969            && (action->state == SIMIX_READY || action->state == SIMIX_RUNNING)) {
970
971     surf_action_cancel(action->comm.surf_comm);
972   }
973 }
974
975 void SIMIX_comm_suspend(smx_action_t action)
976 {
977   /*FIXME: shall we suspend also the timeout actions? */
978   if (action->comm.surf_comm)
979     surf_action_suspend(action->comm.surf_comm);
980   /* in the other case, the action will be suspended on creation, in SIMIX_comm_start() */
981 }
982
983 void SIMIX_comm_resume(smx_action_t action)
984 {
985   /*FIXME: check what happen with the timeouts */
986   if (action->comm.surf_comm)
987     surf_action_resume(action->comm.surf_comm);
988   /* in the other case, the action were not really suspended yet, see SIMIX_comm_suspend() and SIMIX_comm_start() */
989 }
990
991
992 /************* Action Getters **************/
993
994 double SIMIX_pre_comm_get_remains(smx_simcall_t simcall, smx_action_t action){
995   return SIMIX_comm_get_remains(action);
996 }
997 /**
998  *  \brief get the amount remaining from the communication
999  *  \param action The communication
1000  */
1001 double SIMIX_comm_get_remains(smx_action_t action)
1002 {
1003   double remains;
1004
1005   if(!action){
1006     return 0;
1007   }
1008
1009   switch (action->state) {
1010
1011   case SIMIX_RUNNING:
1012     remains = surf_action_get_remains(action->comm.surf_comm);
1013     break;
1014
1015   case SIMIX_WAITING:
1016   case SIMIX_READY:
1017     remains = 0; /*FIXME: check what should be returned */
1018     break;
1019
1020   default:
1021     remains = 0; /*FIXME: is this correct? */
1022     break;
1023   }
1024   return remains;
1025 }
1026
1027 e_smx_state_t SIMIX_pre_comm_get_state(smx_simcall_t simcall, smx_action_t action){
1028   return SIMIX_comm_get_state(action);
1029 }
1030 e_smx_state_t SIMIX_comm_get_state(smx_action_t action)
1031 {
1032   return action->state;
1033 }
1034
1035 void* SIMIX_pre_comm_get_src_data(smx_simcall_t simcall, smx_action_t action){
1036   return SIMIX_comm_get_src_data(action);
1037 }
1038 /**
1039  *  \brief Return the user data associated to the sender of the communication
1040  *  \param action The communication
1041  *  \return the user data
1042  */
1043 void* SIMIX_comm_get_src_data(smx_action_t action)
1044 {
1045   return action->comm.src_data;
1046 }
1047
1048 void* SIMIX_pre_comm_get_dst_data(smx_simcall_t simcall, smx_action_t action){
1049   return SIMIX_comm_get_dst_data(action);
1050 }
1051 /**
1052  *  \brief Return the user data associated to the receiver of the communication
1053  *  \param action The communication
1054  *  \return the user data
1055  */
1056 void* SIMIX_comm_get_dst_data(smx_action_t action)
1057 {
1058   return action->comm.dst_data;
1059 }
1060
1061 smx_process_t SIMIX_pre_comm_get_src_proc(smx_simcall_t simcall, smx_action_t action){
1062   return SIMIX_comm_get_src_proc(action);
1063 }
1064 smx_process_t SIMIX_comm_get_src_proc(smx_action_t action)
1065 {
1066   return action->comm.src_proc;
1067 }
1068
1069 smx_process_t SIMIX_pre_comm_get_dst_proc(smx_simcall_t simcall, smx_action_t action){
1070   return SIMIX_comm_get_dst_proc(action);
1071 }
1072 smx_process_t SIMIX_comm_get_dst_proc(smx_action_t action)
1073 {
1074   return action->comm.dst_proc;
1075 }
1076
1077 #ifdef HAVE_LATENCY_BOUND_TRACKING
1078 int SIMIX_pre_comm_is_latency_bounded(smx_simcall_t simcall, smx_action_t action)
1079 {
1080   return SIMIX_comm_is_latency_bounded(action);
1081 }
1082
1083 /**
1084  *  \brief verify if communication is latency bounded
1085  *  \param comm The communication
1086  */
1087 XBT_INLINE int SIMIX_comm_is_latency_bounded(smx_action_t action)
1088 {
1089   if(!action){
1090     return 0;
1091   }
1092   if (action->comm.surf_comm){
1093     XBT_DEBUG("Getting latency limited for surf_action (%p)", action->comm.surf_comm);
1094     action->latency_limited = surf_workstation_model->get_latency_limited(action->comm.surf_comm);
1095     XBT_DEBUG("Action limited is %d", action->latency_limited);
1096   }
1097   return action->latency_limited;
1098 }
1099 #endif
1100
1101 /******************************************************************************/
1102 /*                    SIMIX_comm_copy_data callbacks                       */
1103 /******************************************************************************/
1104 static void (*SIMIX_comm_copy_data_callback) (smx_action_t, void*, size_t) =
1105   &SIMIX_comm_copy_pointer_callback;
1106
1107 void
1108 SIMIX_comm_set_copy_data_callback(void (*callback) (smx_action_t, void*, size_t))
1109 {
1110   SIMIX_comm_copy_data_callback = callback;
1111 }
1112
1113 void SIMIX_comm_copy_pointer_callback(smx_action_t comm, void* buff, size_t buff_size)
1114 {
1115   xbt_assert((buff_size == sizeof(void *)),
1116              "Cannot copy %zu bytes: must be sizeof(void*)", buff_size);
1117   *(void **) (comm->comm.dst_buff) = buff;
1118 }
1119
1120 void SIMIX_comm_copy_buffer_callback(smx_action_t comm, void* buff, size_t buff_size)
1121 {
1122   XBT_DEBUG("Copy the data over");
1123   memcpy(comm->comm.dst_buff, buff, buff_size);
1124   if (comm->comm.detached) { // if this is a detached send, the source buffer was duplicated by SMPI sender to make the original buffer available to the application ASAP
1125     xbt_free(buff);
1126     comm->comm.src_buff = NULL;
1127   }
1128 }
1129
1130
1131 /**
1132  *  \brief Copy the communication data from the sender's buffer to the receiver's one
1133  *  \param comm The communication
1134  */
1135 void SIMIX_comm_copy_data(smx_action_t comm)
1136 {
1137   size_t buff_size = comm->comm.src_buff_size;
1138   /* If there is no data to be copy then return */
1139   if (!comm->comm.src_buff || !comm->comm.dst_buff || comm->comm.copied)
1140     return;
1141
1142   XBT_DEBUG("Copying comm %p data from %s (%p) -> %s (%p) (%zu bytes)",
1143             comm,
1144             comm->comm.src_proc ? sg_host_name(comm->comm.src_proc->smx_host) : "a finished process",
1145             comm->comm.src_buff,
1146             comm->comm.dst_proc ? sg_host_name(comm->comm.dst_proc->smx_host) : "a finished process",
1147             comm->comm.dst_buff, buff_size);
1148
1149   /* Copy at most dst_buff_size bytes of the message to receiver's buffer */
1150   if (comm->comm.dst_buff_size)
1151     buff_size = MIN(buff_size, *(comm->comm.dst_buff_size));
1152
1153   /* Update the receiver's buffer size to the copied amount */
1154   if (comm->comm.dst_buff_size)
1155     *comm->comm.dst_buff_size = buff_size;
1156
1157   if (buff_size > 0)
1158     SIMIX_comm_copy_data_callback (comm, comm->comm.src_buff, buff_size);
1159
1160   /* Set the copied flag so we copy data only once */
1161   /* (this function might be called from both communication ends) */
1162   comm->comm.copied = 1;
1163 }