Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
d6ad23c19c4c2b5c8363a21f6ab97037c7697fcb
[simgrid.git] / src / simix / smx_network.c
1 /* Copyright (c) 2009, 2010. The SimGrid Team.
2  * All rights reserved.                                                     */
3
4 /* This program is free software; you can redistribute it and/or modify it
5  * under the terms of the license (GNU LGPL) which comes with this package. */
6
7 #include "smx_private.h"
8 #include "xbt/log.h"
9 #include "mc/mc.h"
10 #include "xbt/dict.h"
11
12 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(simix_network, simix,
13                                 "Logging specific to SIMIX (network)");
14
15 static xbt_dict_t rdv_points = NULL;
16 XBT_IMPORT_NO_EXPORT(unsigned long int) smx_total_comms = 0;
17
18 static void SIMIX_waitany_remove_simcall_from_actions(smx_simcall_t simcall);
19 static void SIMIX_comm_copy_data(smx_action_t comm);
20 static smx_action_t SIMIX_comm_new(e_smx_comm_type_t type);
21 static XBT_INLINE void SIMIX_rdv_push(smx_rdv_t rdv, smx_action_t comm);
22 static smx_action_t SIMIX_fifo_probe_comm(xbt_fifo_t fifo, e_smx_comm_type_t type,
23                                         int (*match_fun)(void *, void *,smx_action_t),
24                                         void *user_data, smx_action_t my_action);
25 static smx_action_t SIMIX_fifo_get_comm(xbt_fifo_t fifo, e_smx_comm_type_t type,
26                                         int (*match_fun)(void *, void *,smx_action_t),
27                                         void *user_data, smx_action_t my_action);
28 static void SIMIX_rdv_free(void *data);
29
30 void SIMIX_network_init(void)
31 {
32   rdv_points = xbt_dict_new_homogeneous(SIMIX_rdv_free);
33   if(MC_is_active())
34     MC_ignore_data_bss(&smx_total_comms, sizeof(smx_total_comms));
35 }
36
37 void SIMIX_network_exit(void)
38 {
39   xbt_dict_free(&rdv_points);
40 }
41
42 /******************************************************************************/
43 /*                           Rendez-Vous Points                               */
44 /******************************************************************************/
45
46 smx_rdv_t SIMIX_pre_rdv_create(smx_simcall_t simcall, const char *name){
47   return SIMIX_rdv_create(name);
48 }
49 smx_rdv_t SIMIX_rdv_create(const char *name)
50 {
51   /* two processes may have pushed the same rdv_create simcall at the same time */
52   smx_rdv_t rdv = name ? xbt_dict_get_or_null(rdv_points, name) : NULL;
53
54   if (!rdv) {
55     rdv = xbt_new0(s_smx_rvpoint_t, 1);
56     rdv->name = name ? xbt_strdup(name) : NULL;
57     rdv->comm_fifo = xbt_fifo_new();
58     rdv->done_comm_fifo = xbt_fifo_new();
59     rdv->permanent_receiver=NULL;
60
61     XBT_DEBUG("Creating a mailbox at %p with name %s\n", rdv, name);
62
63     if (rdv->name)
64       xbt_dict_set(rdv_points, rdv->name, rdv, NULL);
65   }
66   return rdv;
67 }
68
69 void SIMIX_pre_rdv_destroy(smx_simcall_t simcall, smx_rdv_t rdv){
70   return SIMIX_rdv_destroy(rdv);
71 }
72 void SIMIX_rdv_destroy(smx_rdv_t rdv)
73 {
74   if (rdv->name)
75     xbt_dict_remove(rdv_points, rdv->name);
76 }
77
78 void SIMIX_rdv_free(void *data)
79 {
80   XBT_DEBUG("rdv free %p", data);
81   smx_rdv_t rdv = (smx_rdv_t) data;
82   xbt_free(rdv->name);
83   xbt_fifo_free(rdv->comm_fifo);
84   xbt_fifo_free(rdv->done_comm_fifo);
85
86   xbt_free(rdv);  
87 }
88
89 xbt_dict_t SIMIX_get_rdv_points()
90 {
91   return rdv_points;
92 }
93
94 smx_rdv_t SIMIX_pre_rdv_get_by_name(smx_simcall_t simcall, const char *name){
95   return SIMIX_rdv_get_by_name(name);
96 }
97 smx_rdv_t SIMIX_rdv_get_by_name(const char *name)
98 {
99   return xbt_dict_get_or_null(rdv_points, name);
100 }
101
102 int SIMIX_pre_rdv_comm_count_by_host(smx_simcall_t simcall, smx_rdv_t rdv, smx_host_t host){
103   return SIMIX_rdv_comm_count_by_host(rdv, host);
104 }
105 int SIMIX_rdv_comm_count_by_host(smx_rdv_t rdv, smx_host_t host)
106 {
107   smx_action_t comm = NULL;
108   xbt_fifo_item_t item = NULL;
109   int count = 0;
110
111   xbt_fifo_foreach(rdv->comm_fifo, item, comm, smx_action_t) {
112     if (comm->comm.src_proc->smx_host == host)
113       count++;
114   }
115
116   return count;
117 }
118
119 smx_action_t SIMIX_pre_rdv_get_head(smx_simcall_t simcall, smx_rdv_t rdv){
120   return SIMIX_rdv_get_head(rdv);
121 }
122 smx_action_t SIMIX_rdv_get_head(smx_rdv_t rdv)
123 {
124   return xbt_fifo_get_item_content(xbt_fifo_get_first_item(rdv->comm_fifo));
125 }
126
127 smx_process_t SIMIX_pre_rdv_get_receiver(smx_simcall_t simcall, smx_rdv_t rdv){
128   return SIMIX_rdv_get_receiver(rdv);
129 }
130 /**
131  *  \brief get the receiver (process associated to the mailbox)
132  *  \param rdv The rendez-vous point
133  *  \return process The receiving process (NULL if not set)
134  */
135 smx_process_t SIMIX_rdv_get_receiver(smx_rdv_t rdv)
136 {
137   return rdv->permanent_receiver;
138 }
139
140 void SIMIX_pre_rdv_set_receiver(smx_simcall_t simcall, smx_rdv_t rdv,
141                             smx_process_t process){
142   SIMIX_rdv_set_receiver(rdv, process);
143 }
144 /**
145  *  \brief set the receiver of the rendez vous point to allow eager sends
146  *  \param rdv The rendez-vous point
147  *  \param process The receiving process
148  */
149 void SIMIX_rdv_set_receiver(smx_rdv_t rdv, smx_process_t process)
150 {
151   rdv->permanent_receiver=process;
152 }
153
154 /**
155  *  \brief Pushes a communication action into a rendez-vous point
156  *  \param rdv The rendez-vous point
157  *  \param comm The communication action
158  */
159 static XBT_INLINE void SIMIX_rdv_push(smx_rdv_t rdv, smx_action_t comm)
160 {
161   xbt_fifo_push(rdv->comm_fifo, comm);
162   comm->comm.rdv = rdv;
163 }
164
165 /**
166  *  \brief Removes a communication action from a rendez-vous point
167  *  \param rdv The rendez-vous point
168  *  \param comm The communication action
169  */
170 XBT_INLINE void SIMIX_rdv_remove(smx_rdv_t rdv, smx_action_t comm)
171 {
172   xbt_fifo_remove(rdv->comm_fifo, comm);
173   comm->comm.rdv = NULL;
174 }
175
176 /**
177  *  \brief Checks if there is a communication action queued in a fifo matching our needs
178  *  \param type The type of communication we are looking for (comm_send, comm_recv)
179  *  \return The communication action if found, NULL otherwise
180  */
181 smx_action_t SIMIX_fifo_get_comm(xbt_fifo_t fifo, e_smx_comm_type_t type,
182                                  int (*match_fun)(void *, void *,smx_action_t),
183                                  void *this_user_data, smx_action_t my_action)
184 {
185   smx_action_t action;
186   xbt_fifo_item_t item;
187   void* other_user_data = NULL;
188
189   xbt_fifo_foreach(fifo, item, action, smx_action_t) {
190     if (action->comm.type == SIMIX_COMM_SEND) {
191       other_user_data = action->comm.src_data;
192     } else if (action->comm.type == SIMIX_COMM_RECEIVE) {
193       other_user_data = action->comm.dst_data;
194     }
195     if (action->comm.type == type &&
196         (!match_fun              ||              match_fun(this_user_data,  other_user_data, action)) &&
197         (!action->comm.match_fun || action->comm.match_fun(other_user_data, this_user_data,  my_action))) {
198       XBT_DEBUG("Found a matching communication action %p", action);
199       xbt_fifo_remove_item(fifo, item);
200       xbt_fifo_free_item(item);
201       action->comm.refcount++;
202       action->comm.rdv = NULL;
203       return action;
204     }
205     XBT_DEBUG("Sorry, communication action %p does not match our needs:"
206               " its type is %d but we are looking for a comm of type %d (or maybe the filtering didn't match)",
207               action, (int)action->comm.type, (int)type);
208   }
209   XBT_DEBUG("No matching communication action found");
210   return NULL;
211 }
212
213
214 /**
215  *  \brief Checks if there is a communication action queued in a fifo matching our needs, but leave it there
216  *  \param type The type of communication we are looking for (comm_send, comm_recv)
217  *  \return The communication action if found, NULL otherwise
218  */
219 smx_action_t SIMIX_fifo_probe_comm(xbt_fifo_t fifo, e_smx_comm_type_t type,
220                                  int (*match_fun)(void *, void *,smx_action_t),
221                                  void *this_user_data, smx_action_t my_action)
222 {
223   smx_action_t action;
224   xbt_fifo_item_t item;
225   void* other_user_data = NULL;
226
227   xbt_fifo_foreach(fifo, item, action, smx_action_t) {
228     if (action->comm.type == SIMIX_COMM_SEND) {
229       other_user_data = action->comm.src_data;
230     } else if (action->comm.type == SIMIX_COMM_RECEIVE) {
231       other_user_data = action->comm.dst_data;
232     }
233     if (action->comm.type == type &&
234         (!match_fun              ||              match_fun(this_user_data,  other_user_data, action)) &&
235         (!action->comm.match_fun || action->comm.match_fun(other_user_data, this_user_data,  my_action))) {
236       XBT_DEBUG("Found a matching communication action %p", action);
237       action->comm.refcount++;
238
239       return action;
240     }
241     XBT_DEBUG("Sorry, communication action %p does not match our needs:"
242               " its type is %d but we are looking for a comm of type %d (or maybe the filtering didn't match)",
243               action, (int)action->comm.type, (int)type);
244   }
245   XBT_DEBUG("No matching communication action found");
246   return NULL;
247 }
248 /******************************************************************************/
249 /*                            Communication Actions                            */
250 /******************************************************************************/
251
252 /**
253  *  \brief Creates a new communicate action
254  *  \param type The direction of communication (comm_send, comm_recv)
255  *  \return The new communicate action
256  */
257 smx_action_t SIMIX_comm_new(e_smx_comm_type_t type)
258 {
259   smx_action_t act;
260
261   /* alloc structures */
262   act = xbt_mallocator_get(simix_global->action_mallocator);
263
264   act->type = SIMIX_ACTION_COMMUNICATE;
265   act->state = SIMIX_WAITING;
266
267   /* set communication */
268   act->comm.type = type;
269   act->comm.refcount = 1;
270
271 #ifdef HAVE_LATENCY_BOUND_TRACKING
272   //initialize with unknown value
273   act->latency_limited = -1;
274 #endif
275
276 #ifdef HAVE_TRACING
277   act->category = NULL;
278 #endif
279
280   XBT_DEBUG("Create communicate action %p", act);
281   ++smx_total_comms;
282
283   return act;
284 }
285
286 void SIMIX_pre_comm_destroy(smx_simcall_t simcall, smx_action_t action){
287   SIMIX_comm_destroy(action);
288 }
289 /**
290  *  \brief Destroy a communicate action
291  *  \param action The communicate action to be destroyed
292  */
293 void SIMIX_comm_destroy(smx_action_t action)
294 {
295   XBT_DEBUG("Destroy action %p (refcount: %d), state: %d",
296             action, action->comm.refcount, (int)action->state);
297
298   if (action->comm.refcount <= 0) {
299     xbt_backtrace_display_current();
300     xbt_die("The refcount of comm %p is already 0 before decreasing it. "
301             "That's a bug! If you didn't test and/or wait the same communication twice in your code, then the bug is SimGrid's...", action);
302   }
303   action->comm.refcount--;
304   if (action->comm.refcount > 0)
305     return;
306   XBT_DEBUG("Really free communication %p; refcount is now %d", action,
307             action->comm.refcount);
308
309 #ifdef HAVE_LATENCY_BOUND_TRACKING
310   action->latency_limited = SIMIX_comm_is_latency_bounded( action ) ;
311 #endif
312
313   xbt_free(action->name);
314   SIMIX_comm_destroy_internal_actions(action);
315
316   if (action->comm.detached && action->state != SIMIX_DONE) {
317     /* the communication has failed and was detached:
318      * we have to free the buffer */
319     if (action->comm.clean_fun) {
320       action->comm.clean_fun(action->comm.src_buff);
321     }
322     action->comm.src_buff = NULL;
323   }
324
325   xbt_mallocator_release(simix_global->action_mallocator, action);
326 }
327
328 void SIMIX_comm_destroy_internal_actions(smx_action_t action)
329 {
330   if (action->comm.surf_comm){
331 #ifdef HAVE_LATENCY_BOUND_TRACKING
332     action->latency_limited = SIMIX_comm_is_latency_bounded(action);
333 #endif
334     action->comm.surf_comm->model_type->action_unref(action->comm.surf_comm);
335     action->comm.surf_comm = NULL;
336   }
337
338   if (action->comm.src_timeout){
339     action->comm.src_timeout->model_type->action_unref(action->comm.src_timeout);
340     action->comm.src_timeout = NULL;
341   }
342
343   if (action->comm.dst_timeout){
344     action->comm.dst_timeout->model_type->action_unref(action->comm.dst_timeout);
345     action->comm.dst_timeout = NULL;
346   }
347 }
348
349 void SIMIX_pre_comm_send(smx_simcall_t simcall, smx_rdv_t rdv,
350                                   double task_size, double rate,
351                                   void *src_buff, size_t src_buff_size,
352                                   int (*match_fun)(void *, void *,smx_action_t),
353                                   void *data, double timeout){
354   smx_action_t comm = SIMIX_comm_isend(simcall->issuer, rdv, task_size, rate,
355                                        src_buff, src_buff_size, match_fun, NULL,
356                                        data, 0);
357   simcall->mc_value = 0;
358   SIMIX_pre_comm_wait(simcall, comm, timeout);
359 }
360 smx_action_t SIMIX_pre_comm_isend(smx_simcall_t simcall, smx_rdv_t rdv,
361                                   double task_size, double rate,
362                                   void *src_buff, size_t src_buff_size,
363                                   int (*match_fun)(void *, void *,smx_action_t),
364                                   void (*clean_fun)(void *), 
365                                   void *data, int detached){
366   return SIMIX_comm_isend(simcall->issuer, rdv, task_size, rate, src_buff,
367                           src_buff_size, match_fun, clean_fun, data, detached);
368
369 }
370 smx_action_t SIMIX_comm_isend(smx_process_t src_proc, smx_rdv_t rdv,
371                               double task_size, double rate,
372                               void *src_buff, size_t src_buff_size,
373                               int (*match_fun)(void *, void *,smx_action_t),
374                               void (*clean_fun)(void *), // used to free the action in case of problem after a detached send
375                               void *data,
376                               int detached)
377 {
378   XBT_DEBUG("send from %p\n", rdv);
379
380   /* Prepare an action describing us, so that it gets passed to the user-provided filter of other side */
381   smx_action_t this_action = SIMIX_comm_new(SIMIX_COMM_SEND);
382
383   /* Look for communication action matching our needs. We also provide a description of
384    * ourself so that the other side also gets a chance of choosing if it wants to match with us.
385    *
386    * If it is not found then push our communication into the rendez-vous point */
387   smx_action_t other_action = SIMIX_fifo_get_comm(rdv->comm_fifo, SIMIX_COMM_RECEIVE, match_fun, data, this_action);
388
389   if (!other_action) {
390     other_action = this_action;
391
392     if (rdv->permanent_receiver!=NULL){
393       //this mailbox is for small messages, which have to be sent right now
394       other_action->state = SIMIX_READY;
395       other_action->comm.dst_proc=rdv->permanent_receiver;
396       other_action->comm.refcount++;
397       other_action->comm.rdv = rdv;
398       xbt_fifo_push(rdv->done_comm_fifo,other_action);
399       other_action->comm.rdv=rdv;
400       XBT_DEBUG("pushing a message into the permanent receive fifo %p, comm %p \n", rdv, &(other_action->comm));
401
402     }else{
403       SIMIX_rdv_push(rdv, this_action);
404     }
405   } else {
406     XBT_DEBUG("Receive already pushed\n");
407
408     SIMIX_comm_destroy(this_action);
409     --smx_total_comms; // this creation was a pure waste
410
411     other_action->state = SIMIX_READY;
412     other_action->comm.type = SIMIX_COMM_READY;
413
414   }
415   xbt_fifo_push(src_proc->comms, other_action);
416
417   /* if the communication action is detached then decrease the refcount
418    * by one, so it will be eliminated by the receiver's destroy call */
419   if (detached) {
420     other_action->comm.detached = 1;
421     other_action->comm.refcount--;
422     other_action->comm.clean_fun = clean_fun;
423   } else {
424     other_action->comm.clean_fun = NULL;
425   }
426
427   /* Setup the communication action */
428   other_action->comm.src_proc = src_proc;
429   other_action->comm.task_size = task_size;
430   other_action->comm.rate = rate;
431   other_action->comm.src_buff = src_buff;
432   other_action->comm.src_buff_size = src_buff_size;
433   other_action->comm.src_data = data;
434
435   other_action->comm.match_fun = match_fun;
436
437   if (MC_is_active()) {
438     other_action->state = SIMIX_RUNNING;
439     return other_action;
440   }
441
442   SIMIX_comm_start(other_action);
443   return (detached ? NULL : other_action);
444 }
445
446 void SIMIX_pre_comm_recv(smx_simcall_t simcall, smx_rdv_t rdv,
447                                   void *dst_buff, size_t *dst_buff_size,
448                                   int (*match_fun)(void *, void *, smx_action_t),
449                                   void *data, double timeout){
450   smx_action_t comm = SIMIX_comm_irecv(simcall->issuer, rdv, dst_buff,
451                                        dst_buff_size, match_fun, data);
452   simcall->mc_value = 0;
453   SIMIX_pre_comm_wait(simcall, comm, timeout);
454 }
455 smx_action_t SIMIX_pre_comm_irecv(smx_simcall_t simcall, smx_rdv_t rdv,
456                                   void *dst_buff, size_t *dst_buff_size,
457                                   int (*match_fun)(void *, void *, smx_action_t),
458                                   void *data){
459   return SIMIX_comm_irecv(simcall->issuer, rdv, dst_buff, dst_buff_size,
460                           match_fun, data);
461 }
462 smx_action_t SIMIX_comm_irecv(smx_process_t dst_proc, smx_rdv_t rdv,
463                               void *dst_buff, size_t *dst_buff_size,
464                               int (*match_fun)(void *, void *, smx_action_t), void *data)
465 {
466   XBT_DEBUG("recv from %p %p\n", rdv, rdv->comm_fifo);
467   smx_action_t this_action = SIMIX_comm_new(SIMIX_COMM_RECEIVE);
468
469   smx_action_t other_action;
470   //communication already done, get it inside the fifo of completed comms
471   //permanent receive v1
472   //int already_received=0;
473   if(rdv->permanent_receiver && xbt_fifo_size(rdv->done_comm_fifo)!=0){
474
475     XBT_DEBUG("We have a comm that has probably already been received, trying to match it, to skip the communication\n");
476     //find a match in the already received fifo
477     other_action = SIMIX_fifo_get_comm(rdv->done_comm_fifo, SIMIX_COMM_SEND, match_fun, data, this_action);
478     //if not found, assume the receiver came first, register it to the mailbox in the classical way
479     if (!other_action)  {
480       XBT_DEBUG("We have messages in the permanent receive list, but not the one we are looking for, pushing request into fifo\n");
481       other_action = this_action;
482       SIMIX_rdv_push(rdv, this_action);
483     }else{
484       if(other_action->comm.surf_comm &&        SIMIX_comm_get_remains(other_action)==0.0)
485       {
486         XBT_DEBUG("comm %p has been already sent, and is finished, destroy it\n",&(other_action->comm));
487         other_action->state = SIMIX_DONE;
488         other_action->comm.type = SIMIX_COMM_DONE;
489         other_action->comm.rdv = NULL;
490         //SIMIX_comm_destroy(this_action);
491         //--smx_total_comms; // this creation was a pure waste
492         //already_received=1;
493         //other_action->comm.refcount--;
494       }/*else{
495          XBT_DEBUG("Not yet finished, we have to wait %d\n", xbt_fifo_size(rdv->comm_fifo));
496          }*/
497      // other_action->comm.refcount--;
498       SIMIX_comm_destroy(this_action);
499       --smx_total_comms; // this creation was a pure waste
500     }
501   }else{
502     /* Prepare an action describing us, so that it gets passed to the user-provided filter of other side */
503
504     /* Look for communication action matching our needs. We also provide a description of
505      * ourself so that the other side also gets a chance of choosing if it wants to match with us.
506      *
507      * If it is not found then push our communication into the rendez-vous point */
508     other_action = SIMIX_fifo_get_comm(rdv->comm_fifo, SIMIX_COMM_SEND, match_fun, data, this_action);
509
510     if (!other_action) {
511       XBT_DEBUG("Receive pushed first %d\n", xbt_fifo_size(rdv->comm_fifo));
512       other_action = this_action;
513       SIMIX_rdv_push(rdv, this_action);
514     } else {
515       SIMIX_comm_destroy(this_action);
516       --smx_total_comms; // this creation was a pure waste
517       other_action->state = SIMIX_READY;
518       other_action->comm.type = SIMIX_COMM_READY;
519    //   other_action->comm.refcount--;
520     }
521     xbt_fifo_push(dst_proc->comms, other_action);
522   }
523
524   /* Setup communication action */
525   other_action->comm.dst_proc = dst_proc;
526   other_action->comm.dst_buff = dst_buff;
527   other_action->comm.dst_buff_size = dst_buff_size;
528   other_action->comm.dst_data = data;
529
530   other_action->comm.match_fun = match_fun;
531
532
533   /*if(already_received)//do the actual copy, because the first one after the comm didn't have all the info
534     SIMIX_comm_copy_data(other_action);*/
535
536
537   if (MC_is_active()) {
538     other_action->state = SIMIX_RUNNING;
539     return other_action;
540   }
541
542   SIMIX_comm_start(other_action);
543   // }
544   return other_action;
545 }
546
547 smx_action_t SIMIX_pre_comm_iprobe(smx_simcall_t simcall, smx_rdv_t rdv,
548                                    int src, int tag,
549                                    int (*match_fun)(void *, void *, smx_action_t),
550                                    void *data){
551   return SIMIX_comm_iprobe(simcall->issuer, rdv, src, tag, match_fun, data);
552 }
553
554 smx_action_t SIMIX_comm_iprobe(smx_process_t dst_proc, smx_rdv_t rdv, int src,
555                               int tag, int (*match_fun)(void *, void *, smx_action_t), void *data)
556 {
557   XBT_DEBUG("iprobe from %p %p\n", rdv, rdv->comm_fifo);
558   smx_action_t this_action = SIMIX_comm_new(SIMIX_COMM_RECEIVE);
559
560   smx_action_t other_action=NULL;
561   if(rdv->permanent_receiver && xbt_fifo_size(rdv->done_comm_fifo)!=0){
562     //find a match in the already received fifo
563       XBT_DEBUG("first try in the perm recv mailbox \n");
564
565     other_action = SIMIX_fifo_probe_comm(rdv->done_comm_fifo, SIMIX_COMM_SEND, match_fun, data, this_action);
566   }
567  // }else{
568     if(!other_action){
569         XBT_DEBUG("second try in the other mailbox");
570         other_action = SIMIX_fifo_probe_comm(rdv->comm_fifo, SIMIX_COMM_SEND, match_fun, data, this_action);
571     }
572 //  }
573   if(other_action)other_action->comm.refcount--;
574
575   SIMIX_comm_destroy(this_action);
576   --smx_total_comms;
577   return other_action;
578 }
579
580 void SIMIX_pre_comm_wait(smx_simcall_t simcall, smx_action_t action, double timeout)
581 {
582   int idx = simcall->mc_value;
583   /* the simcall may be a wait, a send or a recv */
584   surf_action_t sleep;
585
586   /* Associate this simcall to the wait action */
587   XBT_DEBUG("SIMIX_pre_comm_wait, %p", action);
588
589   xbt_fifo_push(action->simcalls, simcall);
590   simcall->issuer->waiting_action = action;
591
592   if (MC_is_active()) {
593     if (idx == 0) {
594       action->state = SIMIX_DONE;
595     } else {
596       /* If we reached this point, the wait simcall must have a timeout */
597       /* Otherwise it shouldn't be enabled and executed by the MC */
598       if (timeout == -1)
599         THROW_IMPOSSIBLE;
600
601       if (action->comm.src_proc == simcall->issuer)
602         action->state = SIMIX_SRC_TIMEOUT;
603       else
604         action->state = SIMIX_DST_TIMEOUT;
605     }
606
607     SIMIX_comm_finish(action);
608     return;
609   }
610
611   /* If the action has already finish perform the error handling, */
612   /* otherwise set up a waiting timeout on the right side         */
613   if (action->state != SIMIX_WAITING && action->state != SIMIX_RUNNING) {
614     SIMIX_comm_finish(action);
615   } else { /* if (timeout >= 0) { we need a surf sleep action even when there is no timeout, otherwise surf won't tell us when the host fails */
616     sleep = surf_workstation_model->extension.workstation.sleep(simcall->issuer->smx_host->host, timeout);
617     surf_workstation_model->action_data_set(sleep, action);
618
619     if (simcall->issuer == action->comm.src_proc)
620       action->comm.src_timeout = sleep;
621     else
622       action->comm.dst_timeout = sleep;
623   }
624 }
625
626 void SIMIX_pre_comm_test(smx_simcall_t simcall, smx_action_t action)
627 {
628   if(MC_is_active()){
629     simcall->comm_test.result = action->comm.src_proc && action->comm.dst_proc;
630     if(simcall->comm_test.result){
631       action->state = SIMIX_DONE;
632       xbt_fifo_push(action->simcalls, simcall);
633       SIMIX_comm_finish(action);
634     }else{
635       SIMIX_simcall_answer(simcall);
636     }
637     return;
638   }
639
640   simcall->comm_test.result = (action->state != SIMIX_WAITING && action->state != SIMIX_RUNNING);
641   if (simcall->comm_test.result) {
642     xbt_fifo_push(action->simcalls, simcall);
643     SIMIX_comm_finish(action);
644   } else {
645     SIMIX_simcall_answer(simcall);
646   }
647 }
648
649 void SIMIX_pre_comm_testany(smx_simcall_t simcall, xbt_dynar_t actions)
650 {
651   int idx = simcall->mc_value;
652   unsigned int cursor;
653   smx_action_t action;
654   simcall->comm_testany.result = -1;
655
656   if (MC_is_active()){
657     if(idx == -1){
658       SIMIX_simcall_answer(simcall);
659     }else{
660       action = xbt_dynar_get_as(actions, idx, smx_action_t);
661       simcall->comm_testany.result = idx;
662       xbt_fifo_push(action->simcalls, simcall);
663       action->state = SIMIX_DONE;
664       SIMIX_comm_finish(action);
665     }
666     return;
667   }
668
669   xbt_dynar_foreach(simcall->comm_testany.comms,cursor,action) {
670     if (action->state != SIMIX_WAITING && action->state != SIMIX_RUNNING) {
671       simcall->comm_testany.result = cursor;
672       xbt_fifo_push(action->simcalls, simcall);
673       SIMIX_comm_finish(action);
674       return;
675     }
676   }
677   SIMIX_simcall_answer(simcall);
678 }
679
680 void SIMIX_pre_comm_waitany(smx_simcall_t simcall, xbt_dynar_t actions)
681 {
682   int idx = simcall->mc_value;
683   smx_action_t action;
684   unsigned int cursor = 0;
685
686   if (MC_is_active()){
687     action = xbt_dynar_get_as(actions, idx, smx_action_t);
688     xbt_fifo_push(action->simcalls, simcall);
689     simcall->comm_waitany.result = idx;
690     action->state = SIMIX_DONE;
691     SIMIX_comm_finish(action);
692     return;
693   }
694
695   xbt_dynar_foreach(actions, cursor, action){
696     /* associate this simcall to the the action */
697     xbt_fifo_push(action->simcalls, simcall);
698
699     /* see if the action is already finished */
700     if (action->state != SIMIX_WAITING && action->state != SIMIX_RUNNING){
701       SIMIX_comm_finish(action);
702       break;
703     }
704   }
705 }
706
707 void SIMIX_waitany_remove_simcall_from_actions(smx_simcall_t simcall)
708 {
709   smx_action_t action;
710   unsigned int cursor = 0;
711   xbt_dynar_t actions = simcall->comm_waitany.comms;
712
713   xbt_dynar_foreach(actions, cursor, action) {
714     xbt_fifo_remove(action->simcalls, simcall);
715   }
716 }
717
718 /**
719  *  \brief Starts the simulation of a communication action.
720  *  \param action the communication action
721  */
722 XBT_INLINE void SIMIX_comm_start(smx_action_t action)
723 {
724   /* If both the sender and the receiver are already there, start the communication */
725   if (action->state == SIMIX_READY) {
726
727     smx_host_t sender = action->comm.src_proc->smx_host;
728     smx_host_t receiver = action->comm.dst_proc->smx_host;
729
730     XBT_DEBUG("Starting communication %p from '%s' to '%s'", action,
731               SIMIX_host_get_name(sender), SIMIX_host_get_name(receiver));
732
733     action->comm.surf_comm = surf_workstation_model->extension.workstation.
734       communicate(sender->host, receiver->host, action->comm.task_size, action->comm.rate);
735
736     surf_workstation_model->action_data_set(action->comm.surf_comm, action);
737
738     action->state = SIMIX_RUNNING;
739
740     /* If a link is failed, detect it immediately */
741     if (surf_workstation_model->action_state_get(action->comm.surf_comm) == SURF_ACTION_FAILED) {
742       XBT_DEBUG("Communication from '%s' to '%s' failed to start because of a link failure",
743                 SIMIX_host_get_name(sender), SIMIX_host_get_name(receiver));
744       action->state = SIMIX_LINK_FAILURE;
745       SIMIX_comm_destroy_internal_actions(action);
746     }
747
748     /* If any of the process is suspend, create the action but stop its execution,
749        it will be restarted when the sender process resume */
750     if (SIMIX_process_is_suspended(action->comm.src_proc) ||
751         SIMIX_process_is_suspended(action->comm.dst_proc)) {
752       /* FIXME: check what should happen with the action state */
753
754       if (SIMIX_process_is_suspended(action->comm.src_proc))
755         XBT_DEBUG("The communication is suspended on startup because src (%s:%s) were suspended since it initiated the communication",
756                   SIMIX_host_get_name(action->comm.src_proc->smx_host), action->comm.src_proc->name);
757       else
758         XBT_DEBUG("The communication is suspended on startup because dst (%s:%s) were suspended since it initiated the communication",
759                   SIMIX_host_get_name(action->comm.dst_proc->smx_host), action->comm.dst_proc->name);
760
761       surf_workstation_model->suspend(action->comm.surf_comm);
762
763     }
764   }
765 }
766
767 /**
768  * \brief Answers the SIMIX simcalls associated to a communication action.
769  * \param action a finished communication action
770  */
771 void SIMIX_comm_finish(smx_action_t action)
772 {
773   unsigned int destroy_count = 0;
774   smx_simcall_t simcall;
775
776   while ((simcall = xbt_fifo_shift(action->simcalls))) {
777
778     /* If a waitany simcall is waiting for this action to finish, then remove
779        it from the other actions in the waitany list. Afterwards, get the
780        position of the actual action in the waitany dynar and
781        return it as the result of the simcall */
782     if (simcall->call == SIMCALL_COMM_WAITANY) {
783       SIMIX_waitany_remove_simcall_from_actions(simcall);
784       if (!MC_is_active())
785         simcall->comm_waitany.result = xbt_dynar_search(simcall->comm_waitany.comms, &action);
786     }
787
788     /* If the action is still in a rendez-vous point then remove from it */
789     if (action->comm.rdv)
790       SIMIX_rdv_remove(action->comm.rdv, action);
791
792     XBT_DEBUG("SIMIX_comm_finish: action state = %d", (int)action->state);
793
794     /* Check out for errors */
795     switch (action->state) {
796
797     case SIMIX_DONE:
798       XBT_DEBUG("Communication %p complete!", action);
799       SIMIX_comm_copy_data(action);
800       break;
801
802     case SIMIX_SRC_TIMEOUT:
803       SMX_EXCEPTION(simcall->issuer, timeout_error, 0,
804                     "Communication timeouted because of sender");
805       break;
806
807     case SIMIX_DST_TIMEOUT:
808       SMX_EXCEPTION(simcall->issuer, timeout_error, 0,
809                     "Communication timeouted because of receiver");
810       break;
811
812     case SIMIX_SRC_HOST_FAILURE:
813       if (simcall->issuer == action->comm.src_proc)
814         simcall->issuer->context->iwannadie = 1;
815 //          SMX_EXCEPTION(simcall->issuer, host_error, 0, "Host failed");
816       else
817         SMX_EXCEPTION(simcall->issuer, network_error, 0, "Remote peer failed");
818       break;
819
820     case SIMIX_DST_HOST_FAILURE:
821       if (simcall->issuer == action->comm.dst_proc)
822         simcall->issuer->context->iwannadie = 1;
823 //          SMX_EXCEPTION(simcall->issuer, host_error, 0, "Host failed");
824       else
825         SMX_EXCEPTION(simcall->issuer, network_error, 0, "Remote peer failed");
826       break;
827
828     case SIMIX_LINK_FAILURE:
829       XBT_DEBUG("Link failure in action %p between '%s' and '%s': posting an exception to the issuer: %s (%p) detached:%d",
830                 action,
831                 action->comm.src_proc ? action->comm.src_proc->smx_host->name : NULL,
832                 action->comm.dst_proc ? action->comm.dst_proc->smx_host->name : NULL,
833                 simcall->issuer->name, simcall->issuer, action->comm.detached);
834       if (action->comm.src_proc == simcall->issuer) {
835         XBT_DEBUG("I'm source");
836       } else if (action->comm.dst_proc == simcall->issuer) {
837         XBT_DEBUG("I'm dest");
838       } else {
839         XBT_DEBUG("I'm neither source nor dest");
840       }
841       SMX_EXCEPTION(simcall->issuer, network_error, 0, "Link failure");
842       break;
843
844     case SIMIX_CANCELED:
845       if (simcall->issuer == action->comm.dst_proc)
846         SMX_EXCEPTION(simcall->issuer, cancel_error, 0,
847                       "Communication canceled by the sender");
848       else
849         SMX_EXCEPTION(simcall->issuer, cancel_error, 0,
850                       "Communication canceled by the receiver");
851       break;
852
853     default:
854       xbt_die("Unexpected action state in SIMIX_comm_finish: %d", (int)action->state);
855     }
856
857     /* if there is an exception during a waitany or a testany, indicate the position of the failed communication */
858     if (simcall->issuer->doexception) {
859       if (simcall->call == SIMCALL_COMM_WAITANY) {
860         simcall->issuer->running_ctx->exception.value = xbt_dynar_search(simcall->comm_waitany.comms, &action);
861       }
862       else if (simcall->call == SIMCALL_COMM_TESTANY) {
863         simcall->issuer->running_ctx->exception.value = xbt_dynar_search(simcall->comm_testany.comms, &action);
864       }
865     }
866
867     if (surf_workstation_model->extension.
868         workstation.get_state(simcall->issuer->smx_host->host) != SURF_RESOURCE_ON) {
869       simcall->issuer->context->iwannadie = 1;
870     }
871
872     simcall->issuer->waiting_action = NULL;
873     xbt_fifo_remove(simcall->issuer->comms, action);
874     SIMIX_simcall_answer(simcall);
875     destroy_count++;
876   }
877
878   while (destroy_count-- > 0)
879     SIMIX_comm_destroy(action);
880 }
881
882 /**
883  * \brief This function is called when a Surf communication action is finished.
884  * \param action the corresponding Simix communication
885  */
886 void SIMIX_post_comm(smx_action_t action)
887 {
888   /* Update action state */
889   if (action->comm.src_timeout &&
890       surf_workstation_model->action_state_get(action->comm.src_timeout) == SURF_ACTION_DONE)
891     action->state = SIMIX_SRC_TIMEOUT;
892   else if (action->comm.dst_timeout &&
893            surf_workstation_model->action_state_get(action->comm.dst_timeout) == SURF_ACTION_DONE)
894     action->state = SIMIX_DST_TIMEOUT;
895   else if (action->comm.src_timeout &&
896            surf_workstation_model->action_state_get(action->comm.src_timeout) == SURF_ACTION_FAILED)
897     action->state = SIMIX_SRC_HOST_FAILURE;
898   else if (action->comm.dst_timeout &&
899            surf_workstation_model->action_state_get(action->comm.dst_timeout) == SURF_ACTION_FAILED)
900     action->state = SIMIX_DST_HOST_FAILURE;
901   else if (action->comm.surf_comm &&
902            surf_workstation_model->action_state_get(action->comm.surf_comm) == SURF_ACTION_FAILED) {
903     XBT_DEBUG("Puta madre. Surf says that the link broke");
904     action->state = SIMIX_LINK_FAILURE;
905   } else
906     action->state = SIMIX_DONE;
907
908   XBT_DEBUG("SIMIX_post_comm: comm %p, state %d, src_proc %p, dst_proc %p, detached: %d",
909             action, (int)action->state, action->comm.src_proc, action->comm.dst_proc, action->comm.detached);
910
911   /* destroy the surf actions associated with the Simix communication */
912   SIMIX_comm_destroy_internal_actions(action);
913
914   /* remove the communication action from the list of pending communications
915    * of both processes (if they still exist) */
916   if (action->comm.src_proc) {
917     xbt_fifo_remove(action->comm.src_proc->comms, action);
918   }
919   if (action->comm.dst_proc) {
920     xbt_fifo_remove(action->comm.dst_proc->comms, action);
921   }
922
923   /* if there are simcalls associated with the action, then answer them */
924   if (xbt_fifo_size(action->simcalls)) {
925     SIMIX_comm_finish(action);
926   }
927 }
928
929 void SIMIX_pre_comm_cancel(smx_simcall_t simcall, smx_action_t action){
930   SIMIX_comm_cancel(action);
931 }
932 void SIMIX_comm_cancel(smx_action_t action)
933 {
934   /* if the action is a waiting state means that it is still in a rdv */
935   /* so remove from it and delete it */
936   if (action->state == SIMIX_WAITING) {
937     SIMIX_rdv_remove(action->comm.rdv, action);
938     action->state = SIMIX_CANCELED;
939   }
940   else if (!MC_is_active() /* when running the MC there are no surf actions */
941            && (action->state == SIMIX_READY || action->state == SIMIX_RUNNING)) {
942
943     surf_workstation_model->action_cancel(action->comm.surf_comm);
944   }
945 }
946
947 void SIMIX_comm_suspend(smx_action_t action)
948 {
949   /*FIXME: shall we suspend also the timeout actions? */
950   if (action->comm.surf_comm)
951     surf_workstation_model->suspend(action->comm.surf_comm);
952   /* in the other case, the action will be suspended on creation, in SIMIX_comm_start() */
953 }
954
955 void SIMIX_comm_resume(smx_action_t action)
956 {
957   /*FIXME: check what happen with the timeouts */
958   if (action->comm.surf_comm)
959     surf_workstation_model->resume(action->comm.surf_comm);
960   /* in the other case, the action were not really suspended yet, see SIMIX_comm_suspend() and SIMIX_comm_start() */
961 }
962
963
964 /************* Action Getters **************/
965
966 double SIMIX_pre_comm_get_remains(smx_simcall_t simcall, smx_action_t action){
967   return SIMIX_comm_get_remains(action);
968 }
969 /**
970  *  \brief get the amount remaining from the communication
971  *  \param action The communication
972  */
973 double SIMIX_comm_get_remains(smx_action_t action)
974 {
975   double remains;
976
977   if(!action){
978     return 0;
979   }
980
981   switch (action->state) {
982
983   case SIMIX_RUNNING:
984     remains = surf_workstation_model->get_remains(action->comm.surf_comm);
985     break;
986
987   case SIMIX_WAITING:
988   case SIMIX_READY:
989     remains = 0; /*FIXME: check what should be returned */
990     break;
991
992   default:
993     remains = 0; /*FIXME: is this correct? */
994     break;
995   }
996   return remains;
997 }
998
999 e_smx_state_t SIMIX_pre_comm_get_state(smx_simcall_t simcall, smx_action_t action){
1000   return SIMIX_comm_get_state(action);
1001 }
1002 e_smx_state_t SIMIX_comm_get_state(smx_action_t action)
1003 {
1004   return action->state;
1005 }
1006
1007 void* SIMIX_pre_comm_get_src_data(smx_simcall_t simcall, smx_action_t action){
1008   return SIMIX_comm_get_src_data(action);
1009 }
1010 /**
1011  *  \brief Return the user data associated to the sender of the communication
1012  *  \param action The communication
1013  *  \return the user data
1014  */
1015 void* SIMIX_comm_get_src_data(smx_action_t action)
1016 {
1017   return action->comm.src_data;
1018 }
1019
1020 void* SIMIX_pre_comm_get_dst_data(smx_simcall_t simcall, smx_action_t action){
1021   return SIMIX_comm_get_dst_data(action);
1022 }
1023 /**
1024  *  \brief Return the user data associated to the receiver of the communication
1025  *  \param action The communication
1026  *  \return the user data
1027  */
1028 void* SIMIX_comm_get_dst_data(smx_action_t action)
1029 {
1030   return action->comm.dst_data;
1031 }
1032
1033 smx_process_t SIMIX_pre_comm_get_src_proc(smx_simcall_t simcall, smx_action_t action){
1034   return SIMIX_comm_get_src_proc(action);
1035 }
1036 smx_process_t SIMIX_comm_get_src_proc(smx_action_t action)
1037 {
1038   return action->comm.src_proc;
1039 }
1040
1041 smx_process_t SIMIX_pre_comm_get_dst_proc(smx_simcall_t simcall, smx_action_t action){
1042   return SIMIX_comm_get_dst_proc(action);
1043 }
1044 smx_process_t SIMIX_comm_get_dst_proc(smx_action_t action)
1045 {
1046   return action->comm.dst_proc;
1047 }
1048
#ifdef HAVE_LATENCY_BOUND_TRACKING
/**
 *  \brief verify if communication is latency bounded
 *
 *  When the communication has a Surf action, the cached latency_limited flag
 *  is refreshed from Surf before being returned.
 *
 *  \param action The communication (may be NULL)
 *  \return 0 for a NULL communication, its latency_limited flag otherwise
 */
XBT_INLINE int SIMIX_comm_is_latency_bounded(smx_action_t action)
{
  if (!action)
    return 0;

  /* No Surf action to ask: hand back the value currently stored */
  if (!action->comm.surf_comm)
    return action->latency_limited;

  XBT_DEBUG("Getting latency limited for surf_action (%p)", action->comm.surf_comm);
  action->latency_limited = surf_workstation_model->get_latency_limited(action->comm.surf_comm);
  XBT_DEBUG("Action limited is %d", action->latency_limited);

  return action->latency_limited;
}
#endif
1067
1068 /******************************************************************************/
1069 /*                    SIMIX_comm_copy_data callbacks                       */
1070 /******************************************************************************/
1071 static void (*SIMIX_comm_copy_data_callback) (smx_action_t, void*, size_t) =
1072   &SIMIX_comm_copy_pointer_callback;
1073
1074 void
1075 SIMIX_comm_set_copy_data_callback(void (*callback) (smx_action_t, void*, size_t))
1076 {
1077   SIMIX_comm_copy_data_callback = callback;
1078 }
1079
1080 void SIMIX_comm_copy_pointer_callback(smx_action_t comm, void* buff, size_t buff_size)
1081 {
1082   xbt_assert((buff_size == sizeof(void *)),
1083              "Cannot copy %zu bytes: must be sizeof(void*)", buff_size);
1084   *(void **) (comm->comm.dst_buff) = buff;
1085 }
1086
1087 void SIMIX_comm_copy_buffer_callback(smx_action_t comm, void* buff, size_t buff_size)
1088 {
1089   XBT_DEBUG("Copy the data over");
1090   memcpy(comm->comm.dst_buff, buff, buff_size);
1091   if (comm->comm.detached) { // if this is a detached send, the source buffer was duplicated by SMPI sender to make the original buffer available to the application ASAP
1092     xbt_free(buff);
1093     comm->comm.src_buff = NULL;
1094   }
1095 }
1096
1097
1098 /**
1099  *  \brief Copy the communication data from the sender's buffer to the receiver's one
1100  *  \param comm The communication
1101  */
1102 void SIMIX_comm_copy_data(smx_action_t comm)
1103 {
1104   size_t buff_size = comm->comm.src_buff_size;
1105   /* If there is no data to be copy then return */
1106   if (!comm->comm.src_buff || !comm->comm.dst_buff || comm->comm.copied)
1107     return;
1108
1109   XBT_DEBUG("Copying comm %p data from %s (%p) -> %s (%p) (%zu bytes)",
1110             comm,
1111             comm->comm.src_proc ? comm->comm.src_proc->smx_host->name : "a finished process",
1112             comm->comm.src_buff,
1113             comm->comm.dst_proc ? comm->comm.dst_proc->smx_host->name : "a finished process",
1114             comm->comm.dst_buff, buff_size);
1115
1116   /* Copy at most dst_buff_size bytes of the message to receiver's buffer */
1117   if (comm->comm.dst_buff_size)
1118     buff_size = MIN(buff_size, *(comm->comm.dst_buff_size));
1119
1120   /* Update the receiver's buffer size to the copied amount */
1121   if (comm->comm.dst_buff_size)
1122     *comm->comm.dst_buff_size = buff_size;
1123
1124   if (buff_size > 0)
1125     SIMIX_comm_copy_data_callback (comm, comm->comm.src_buff, buff_size);
1126
1127   /* Set the copied flag so we copy data only once */
1128   /* (this function might be called from both communication ends) */
1129   comm->comm.copied = 1;
1130 }