Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
2c05fb619dddaeb3f353c530a205f99620fb9003
[simgrid.git] / src / simix / smx_network.c
1 /* Copyright (c) 2009, 2010. The SimGrid Team.
2  * All rights reserved.                                                     */
3
4 /* This program is free software; you can redistribute it and/or modify it
5  * under the terms of the license (GNU LGPL) which comes with this package. */
6
7 #include "smx_private.h"
8 #include "xbt/log.h"
9 #include "mc/mc.h"
10 #include "xbt/dict.h"
11
12 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(simix_network, simix,
13                                 "Logging specific to SIMIX (network)");
14
15 static xbt_dict_t rdv_points = NULL;
16 XBT_IMPORT_NO_EXPORT(unsigned long int) smx_total_comms = 0;
17
18 static void SIMIX_waitany_remove_simcall_from_actions(smx_simcall_t simcall);
19 static void SIMIX_comm_copy_data(smx_action_t comm);
20 static smx_action_t SIMIX_comm_new(e_smx_comm_type_t type);
21 static XBT_INLINE void SIMIX_rdv_push(smx_rdv_t rdv, smx_action_t comm);
22 static smx_action_t SIMIX_fifo_probe_comm(xbt_fifo_t fifo, e_smx_comm_type_t type,
23                                         int (*match_fun)(void *, void *,smx_action_t),
24                                         void *user_data, smx_action_t my_action);
25 static smx_action_t SIMIX_fifo_get_comm(xbt_fifo_t fifo, e_smx_comm_type_t type,
26                                         int (*match_fun)(void *, void *,smx_action_t),
27                                         void *user_data, smx_action_t my_action);
28 static void SIMIX_rdv_free(void *data);
29
30 void SIMIX_network_init(void)
31 {
32   rdv_points = xbt_dict_new_homogeneous(SIMIX_rdv_free);
33   if(MC_is_active())
34     MC_ignore_data_bss(&smx_total_comms, sizeof(smx_total_comms));
35 }
36
37 void SIMIX_network_exit(void)
38 {
39   xbt_dict_free(&rdv_points);
40 }
41
42 /******************************************************************************/
43 /*                           Rendez-Vous Points                               */
44 /******************************************************************************/
45
46 smx_rdv_t SIMIX_pre_rdv_create(smx_simcall_t simcall, const char *name){
47   return SIMIX_rdv_create(name);
48 }
49 smx_rdv_t SIMIX_rdv_create(const char *name)
50 {
51   /* two processes may have pushed the same rdv_create simcall at the same time */
52   smx_rdv_t rdv = name ? xbt_dict_get_or_null(rdv_points, name) : NULL;
53
54   if (!rdv) {
55     rdv = xbt_new0(s_smx_rvpoint_t, 1);
56     rdv->name = name ? xbt_strdup(name) : NULL;
57     rdv->comm_fifo = xbt_fifo_new();
58     rdv->done_comm_fifo = xbt_fifo_new();
59     rdv->permanent_receiver=NULL;
60
61     XBT_DEBUG("Creating a mailbox at %p with name %s\n", rdv, name);
62
63     if (rdv->name)
64       xbt_dict_set(rdv_points, rdv->name, rdv, NULL);
65   }
66   return rdv;
67 }
68
69 void SIMIX_pre_rdv_destroy(smx_simcall_t simcall, smx_rdv_t rdv){
70   return SIMIX_rdv_destroy(rdv);
71 }
72 void SIMIX_rdv_destroy(smx_rdv_t rdv)
73 {
74   if (rdv->name)
75     xbt_dict_remove(rdv_points, rdv->name);
76 }
77
78 void SIMIX_rdv_free(void *data)
79 {
80   XBT_DEBUG("rdv free %p", data);
81   smx_rdv_t rdv = (smx_rdv_t) data;
82   xbt_free(rdv->name);
83   xbt_fifo_free(rdv->comm_fifo);
84   xbt_fifo_free(rdv->done_comm_fifo);
85
86   xbt_free(rdv);  
87 }
88
89 xbt_dict_t SIMIX_get_rdv_points()
90 {
91   return rdv_points;
92 }
93
94 smx_rdv_t SIMIX_pre_rdv_get_by_name(smx_simcall_t simcall, const char *name){
95   return SIMIX_rdv_get_by_name(name);
96 }
97 smx_rdv_t SIMIX_rdv_get_by_name(const char *name)
98 {
99   return xbt_dict_get_or_null(rdv_points, name);
100 }
101
102 int SIMIX_pre_rdv_comm_count_by_host(smx_simcall_t simcall, smx_rdv_t rdv, smx_host_t host){
103   return SIMIX_rdv_comm_count_by_host(rdv, host);
104 }
105 int SIMIX_rdv_comm_count_by_host(smx_rdv_t rdv, smx_host_t host)
106 {
107   smx_action_t comm = NULL;
108   xbt_fifo_item_t item = NULL;
109   int count = 0;
110
111   xbt_fifo_foreach(rdv->comm_fifo, item, comm, smx_action_t) {
112     if (comm->comm.src_proc->smx_host == host)
113       count++;
114   }
115
116   return count;
117 }
118
119 smx_action_t SIMIX_pre_rdv_get_head(smx_simcall_t simcall, smx_rdv_t rdv){
120   return SIMIX_rdv_get_head(rdv);
121 }
122 smx_action_t SIMIX_rdv_get_head(smx_rdv_t rdv)
123 {
124   return xbt_fifo_get_item_content(xbt_fifo_get_first_item(rdv->comm_fifo));
125 }
126
127 smx_process_t SIMIX_pre_rdv_get_receiver(smx_simcall_t simcall, smx_rdv_t rdv){
128   return SIMIX_rdv_get_receiver(rdv);
129 }
130 /**
131  *  \brief get the receiver (process associated to the mailbox)
132  *  \param rdv The rendez-vous point
133  *  \return process The receiving process (NULL if not set)
134  */
135 smx_process_t SIMIX_rdv_get_receiver(smx_rdv_t rdv)
136 {
137   return rdv->permanent_receiver;
138 }
139
140 void SIMIX_pre_rdv_set_receiver(smx_simcall_t simcall, smx_rdv_t rdv,
141                             smx_process_t process){
142   SIMIX_rdv_set_receiver(rdv, process);
143 }
144 /**
145  *  \brief set the receiver of the rendez vous point to allow eager sends
146  *  \param rdv The rendez-vous point
147  *  \param process The receiving process
148  */
149 void SIMIX_rdv_set_receiver(smx_rdv_t rdv, smx_process_t process)
150 {
151   rdv->permanent_receiver=process;
152 }
153
154 /**
155  *  \brief Pushes a communication action into a rendez-vous point
156  *  \param rdv The rendez-vous point
157  *  \param comm The communication action
158  */
159 static XBT_INLINE void SIMIX_rdv_push(smx_rdv_t rdv, smx_action_t comm)
160 {
161   xbt_fifo_push(rdv->comm_fifo, comm);
162   comm->comm.rdv = rdv;
163 }
164
165 /**
166  *  \brief Removes a communication action from a rendez-vous point
167  *  \param rdv The rendez-vous point
168  *  \param comm The communication action
169  */
170 XBT_INLINE void SIMIX_rdv_remove(smx_rdv_t rdv, smx_action_t comm)
171 {
172   xbt_fifo_remove(rdv->comm_fifo, comm);
173   comm->comm.rdv = NULL;
174 }
175
176 /**
177  *  \brief Checks if there is a communication action queued in a fifo matching our needs
178  *  \param type The type of communication we are looking for (comm_send, comm_recv)
179  *  \return The communication action if found, NULL otherwise
180  */
181 smx_action_t SIMIX_fifo_get_comm(xbt_fifo_t fifo, e_smx_comm_type_t type,
182                                  int (*match_fun)(void *, void *,smx_action_t),
183                                  void *this_user_data, smx_action_t my_action)
184 {
185   smx_action_t action;
186   xbt_fifo_item_t item;
187   void* other_user_data = NULL;
188
189   xbt_fifo_foreach(fifo, item, action, smx_action_t) {
190     if (action->comm.type == SIMIX_COMM_SEND) {
191       other_user_data = action->comm.src_data;
192     } else if (action->comm.type == SIMIX_COMM_RECEIVE) {
193       other_user_data = action->comm.dst_data;
194     }
195     if (action->comm.type == type &&
196         (!match_fun              ||              match_fun(this_user_data,  other_user_data, action)) &&
197         (!action->comm.match_fun || action->comm.match_fun(other_user_data, this_user_data,  my_action))) {
198       XBT_DEBUG("Found a matching communication action %p", action);
199       xbt_fifo_remove_item(fifo, item);
200       xbt_fifo_free_item(item);
201       action->comm.refcount++;
202       action->comm.rdv = NULL;
203       return action;
204     }
205     XBT_DEBUG("Sorry, communication action %p does not match our needs:"
206               " its type is %d but we are looking for a comm of type %d (or maybe the filtering didn't match)",
207               action, (int)action->comm.type, (int)type);
208   }
209   XBT_DEBUG("No matching communication action found");
210   return NULL;
211 }
212
213
214 /**
215  *  \brief Checks if there is a communication action queued in a fifo matching our needs, but leave it there
216  *  \param type The type of communication we are looking for (comm_send, comm_recv)
217  *  \return The communication action if found, NULL otherwise
218  */
219 smx_action_t SIMIX_fifo_probe_comm(xbt_fifo_t fifo, e_smx_comm_type_t type,
220                                  int (*match_fun)(void *, void *,smx_action_t),
221                                  void *this_user_data, smx_action_t my_action)
222 {
223   smx_action_t action;
224   xbt_fifo_item_t item;
225   void* other_user_data = NULL;
226
227   xbt_fifo_foreach(fifo, item, action, smx_action_t) {
228     if (action->comm.type == SIMIX_COMM_SEND) {
229       other_user_data = action->comm.src_data;
230     } else if (action->comm.type == SIMIX_COMM_RECEIVE) {
231       other_user_data = action->comm.dst_data;
232     }
233     if (action->comm.type == type &&
234         (!match_fun              ||              match_fun(this_user_data,  other_user_data, action)) &&
235         (!action->comm.match_fun || action->comm.match_fun(other_user_data, this_user_data,  my_action))) {
236       XBT_DEBUG("Found a matching communication action %p", action);
237       action->comm.refcount++;
238
239       return action;
240     }
241     XBT_DEBUG("Sorry, communication action %p does not match our needs:"
242               " its type is %d but we are looking for a comm of type %d (or maybe the filtering didn't match)",
243               action, (int)action->comm.type, (int)type);
244   }
245   XBT_DEBUG("No matching communication action found");
246   return NULL;
247 }
248 /******************************************************************************/
249 /*                            Communication Actions                            */
250 /******************************************************************************/
251
252 /**
253  *  \brief Creates a new communicate action
254  *  \param type The direction of communication (comm_send, comm_recv)
255  *  \return The new communicate action
256  */
257 smx_action_t SIMIX_comm_new(e_smx_comm_type_t type)
258 {
259   smx_action_t act;
260
261   /* alloc structures */
262   act = xbt_mallocator_get(simix_global->action_mallocator);
263
264   act->type = SIMIX_ACTION_COMMUNICATE;
265   act->state = SIMIX_WAITING;
266
267   /* set communication */
268   act->comm.type = type;
269   act->comm.refcount = 1;
270   act->comm.src_data=NULL;
271   act->comm.dst_data=NULL;
272
273
274 #ifdef HAVE_LATENCY_BOUND_TRACKING
275   //initialize with unknown value
276   act->latency_limited = -1;
277 #endif
278
279 #ifdef HAVE_TRACING
280   act->category = NULL;
281 #endif
282
283   XBT_DEBUG("Create communicate action %p", act);
284   ++smx_total_comms;
285
286   return act;
287 }
288
289 void SIMIX_pre_comm_destroy(smx_simcall_t simcall, smx_action_t action){
290   SIMIX_comm_destroy(action);
291 }
292 /**
293  *  \brief Destroy a communicate action
294  *  \param action The communicate action to be destroyed
295  */
296 void SIMIX_comm_destroy(smx_action_t action)
297 {
298   XBT_DEBUG("Destroy action %p (refcount: %d), state: %d",
299             action, action->comm.refcount, (int)action->state);
300
301   if (action->comm.refcount <= 0) {
302     xbt_backtrace_display_current();
303     xbt_die("The refcount of comm %p is already 0 before decreasing it. "
304             "That's a bug! If you didn't test and/or wait the same communication twice in your code, then the bug is SimGrid's...", action);
305   }
306   action->comm.refcount--;
307   if (action->comm.refcount > 0)
308       return;
309   XBT_DEBUG("Really free communication %p; refcount is now %d", action,
310             action->comm.refcount);
311
312 #ifdef HAVE_LATENCY_BOUND_TRACKING
313   action->latency_limited = SIMIX_comm_is_latency_bounded( action ) ;
314 #endif
315
316   xbt_free(action->name);
317   SIMIX_comm_destroy_internal_actions(action);
318
319   if (action->comm.detached && action->state != SIMIX_DONE) {
320     /* the communication has failed and was detached:
321      * we have to free the buffer */
322     if (action->comm.clean_fun) {
323       action->comm.clean_fun(action->comm.src_buff);
324     }
325     action->comm.src_buff = NULL;
326   }
327
328   xbt_mallocator_release(simix_global->action_mallocator, action);
329 }
330
331 void SIMIX_comm_destroy_internal_actions(smx_action_t action)
332 {
333   if (action->comm.surf_comm){
334 #ifdef HAVE_LATENCY_BOUND_TRACKING
335     action->latency_limited = SIMIX_comm_is_latency_bounded(action);
336 #endif
337     action->comm.surf_comm->model_type->action_unref(action->comm.surf_comm);
338     action->comm.surf_comm = NULL;
339   }
340
341   if (action->comm.src_timeout){
342     action->comm.src_timeout->model_type->action_unref(action->comm.src_timeout);
343     action->comm.src_timeout = NULL;
344   }
345
346   if (action->comm.dst_timeout){
347     action->comm.dst_timeout->model_type->action_unref(action->comm.dst_timeout);
348     action->comm.dst_timeout = NULL;
349   }
350 }
351
352 void SIMIX_pre_comm_send(smx_simcall_t simcall, smx_rdv_t rdv,
353                                   double task_size, double rate,
354                                   void *src_buff, size_t src_buff_size,
355                                   int (*match_fun)(void *, void *,smx_action_t),
356                                   void *data, double timeout){
357   smx_action_t comm = SIMIX_comm_isend(simcall->issuer, rdv, task_size, rate,
358                                        src_buff, src_buff_size, match_fun, NULL,
359                                        data, 0);
360   simcall->mc_value = 0;
361   SIMIX_pre_comm_wait(simcall, comm, timeout);
362 }
363 smx_action_t SIMIX_pre_comm_isend(smx_simcall_t simcall, smx_rdv_t rdv,
364                                   double task_size, double rate,
365                                   void *src_buff, size_t src_buff_size,
366                                   int (*match_fun)(void *, void *,smx_action_t),
367                                   void (*clean_fun)(void *), 
368                                   void *data, int detached){
369   return SIMIX_comm_isend(simcall->issuer, rdv, task_size, rate, src_buff,
370                           src_buff_size, match_fun, clean_fun, data, detached);
371
372 }
373 smx_action_t SIMIX_comm_isend(smx_process_t src_proc, smx_rdv_t rdv,
374                               double task_size, double rate,
375                               void *src_buff, size_t src_buff_size,
376                               int (*match_fun)(void *, void *,smx_action_t),
377                               void (*clean_fun)(void *), // used to free the action in case of problem after a detached send
378                               void *data,
379                               int detached)
380 {
381   XBT_DEBUG("send from %p\n", rdv);
382
383   /* Prepare an action describing us, so that it gets passed to the user-provided filter of other side */
384   smx_action_t this_action = SIMIX_comm_new(SIMIX_COMM_SEND);
385
386   /* Look for communication action matching our needs. We also provide a description of
387    * ourself so that the other side also gets a chance of choosing if it wants to match with us.
388    *
389    * If it is not found then push our communication into the rendez-vous point */
390   smx_action_t other_action = SIMIX_fifo_get_comm(rdv->comm_fifo, SIMIX_COMM_RECEIVE, match_fun, data, this_action);
391
392   if (!other_action) {
393     other_action = this_action;
394
395     if (rdv->permanent_receiver!=NULL){
396       //this mailbox is for small messages, which have to be sent right now
397       other_action->state = SIMIX_READY;
398       other_action->comm.dst_proc=rdv->permanent_receiver;
399       other_action->comm.refcount++;
400       other_action->comm.rdv = rdv;
401       xbt_fifo_push(rdv->done_comm_fifo,other_action);
402       other_action->comm.rdv=rdv;
403       XBT_DEBUG("pushing a message into the permanent receive fifo %p, comm %p \n", rdv, &(other_action->comm));
404
405     }else{
406       SIMIX_rdv_push(rdv, this_action);
407     }
408   } else {
409     XBT_DEBUG("Receive already pushed\n");
410
411     SIMIX_comm_destroy(this_action);
412     --smx_total_comms; // this creation was a pure waste
413
414     other_action->state = SIMIX_READY;
415     other_action->comm.type = SIMIX_COMM_READY;
416
417   }
418   xbt_fifo_push(src_proc->comms, other_action);
419
420   /* if the communication action is detached then decrease the refcount
421    * by one, so it will be eliminated by the receiver's destroy call */
422   if (detached) {
423     other_action->comm.detached = 1;
424     other_action->comm.refcount--;
425     other_action->comm.clean_fun = clean_fun;
426   } else {
427     other_action->comm.clean_fun = NULL;
428   }
429
430   /* Setup the communication action */
431   other_action->comm.src_proc = src_proc;
432   other_action->comm.task_size = task_size;
433   other_action->comm.rate = rate;
434   other_action->comm.src_buff = src_buff;
435   other_action->comm.src_buff_size = src_buff_size;
436   other_action->comm.src_data = data;
437
438   other_action->comm.match_fun = match_fun;
439
440   if (MC_is_active()) {
441     other_action->state = SIMIX_RUNNING;
442     return other_action;
443   }
444
445   SIMIX_comm_start(other_action);
446   return (detached ? NULL : other_action);
447 }
448
449 void SIMIX_pre_comm_recv(smx_simcall_t simcall, smx_rdv_t rdv,
450                                   void *dst_buff, size_t *dst_buff_size,
451                                   int (*match_fun)(void *, void *, smx_action_t),
452                                   void *data, double timeout){
453   smx_action_t comm = SIMIX_comm_irecv(simcall->issuer, rdv, dst_buff,
454                                        dst_buff_size, match_fun, data);
455   simcall->mc_value = 0;
456   SIMIX_pre_comm_wait(simcall, comm, timeout);
457 }
458 smx_action_t SIMIX_pre_comm_irecv(smx_simcall_t simcall, smx_rdv_t rdv,
459                                   void *dst_buff, size_t *dst_buff_size,
460                                   int (*match_fun)(void *, void *, smx_action_t),
461                                   void *data){
462   return SIMIX_comm_irecv(simcall->issuer, rdv, dst_buff, dst_buff_size,
463                           match_fun, data);
464 }
465 smx_action_t SIMIX_comm_irecv(smx_process_t dst_proc, smx_rdv_t rdv,
466                               void *dst_buff, size_t *dst_buff_size,
467                               int (*match_fun)(void *, void *, smx_action_t), void *data)
468 {
469   XBT_DEBUG("recv from %p %p\n", rdv, rdv->comm_fifo);
470   smx_action_t this_action = SIMIX_comm_new(SIMIX_COMM_RECEIVE);
471
472   smx_action_t other_action;
473   //communication already done, get it inside the fifo of completed comms
474   //permanent receive v1
475   //int already_received=0;
476   if(rdv->permanent_receiver && xbt_fifo_size(rdv->done_comm_fifo)!=0){
477
478     XBT_DEBUG("We have a comm that has probably already been received, trying to match it, to skip the communication\n");
479     //find a match in the already received fifo
480     other_action = SIMIX_fifo_get_comm(rdv->done_comm_fifo, SIMIX_COMM_SEND, match_fun, data, this_action);
481     //if not found, assume the receiver came first, register it to the mailbox in the classical way
482     if (!other_action)  {
483       XBT_DEBUG("We have messages in the permanent receive list, but not the one we are looking for, pushing request into fifo\n");
484       other_action = this_action;
485       SIMIX_rdv_push(rdv, this_action);
486     }else{
487       if(other_action->comm.surf_comm &&        SIMIX_comm_get_remains(other_action)==0.0)
488       {
489         XBT_DEBUG("comm %p has been already sent, and is finished, destroy it\n",&(other_action->comm));
490         other_action->state = SIMIX_DONE;
491         other_action->comm.type = SIMIX_COMM_DONE;
492         other_action->comm.rdv = NULL;
493         //SIMIX_comm_destroy(this_action);
494         //--smx_total_comms; // this creation was a pure waste
495         //already_received=1;
496         //other_action->comm.refcount--;
497       }/*else{
498          XBT_DEBUG("Not yet finished, we have to wait %d\n", xbt_fifo_size(rdv->comm_fifo));
499          }*/
500       other_action->comm.refcount--;
501       SIMIX_comm_destroy(this_action);
502       --smx_total_comms; // this creation was a pure waste
503     }
504   }else{
505     /* Prepare an action describing us, so that it gets passed to the user-provided filter of other side */
506
507     /* Look for communication action matching our needs. We also provide a description of
508      * ourself so that the other side also gets a chance of choosing if it wants to match with us.
509      *
510      * If it is not found then push our communication into the rendez-vous point */
511     other_action = SIMIX_fifo_get_comm(rdv->comm_fifo, SIMIX_COMM_SEND, match_fun, data, this_action);
512
513     if (!other_action) {
514       XBT_DEBUG("Receive pushed first %d\n", xbt_fifo_size(rdv->comm_fifo));
515       other_action = this_action;
516       SIMIX_rdv_push(rdv, this_action);
517     } else {
518       SIMIX_comm_destroy(this_action);
519       --smx_total_comms; // this creation was a pure waste
520       other_action->state = SIMIX_READY;
521       other_action->comm.type = SIMIX_COMM_READY;
522       //other_action->comm.refcount--;
523     }
524     xbt_fifo_push(dst_proc->comms, other_action);
525   }
526
527   /* Setup communication action */
528   other_action->comm.dst_proc = dst_proc;
529   other_action->comm.dst_buff = dst_buff;
530   other_action->comm.dst_buff_size = dst_buff_size;
531   other_action->comm.dst_data = data;
532
533   other_action->comm.match_fun = match_fun;
534
535
536   /*if(already_received)//do the actual copy, because the first one after the comm didn't have all the info
537     SIMIX_comm_copy_data(other_action);*/
538
539
540   if (MC_is_active()) {
541     other_action->state = SIMIX_RUNNING;
542     return other_action;
543   }
544
545   SIMIX_comm_start(other_action);
546   // }
547   return other_action;
548 }
549
550 smx_action_t SIMIX_pre_comm_iprobe(smx_simcall_t simcall, smx_rdv_t rdv,
551                                    int src, int tag,
552                                    int (*match_fun)(void *, void *, smx_action_t),
553                                    void *data){
554   return SIMIX_comm_iprobe(simcall->issuer, rdv, src, tag, match_fun, data);
555 }
556
557 smx_action_t SIMIX_comm_iprobe(smx_process_t dst_proc, smx_rdv_t rdv, int src,
558                               int tag, int (*match_fun)(void *, void *, smx_action_t), void *data)
559 {
560   XBT_DEBUG("iprobe from %p %p\n", rdv, rdv->comm_fifo);
561   smx_action_t this_action = SIMIX_comm_new(SIMIX_COMM_RECEIVE);
562
563   smx_action_t other_action=NULL;
564   if(rdv->permanent_receiver && xbt_fifo_size(rdv->done_comm_fifo)!=0){
565     //find a match in the already received fifo
566       XBT_DEBUG("first try in the perm recv mailbox \n");
567
568     other_action = SIMIX_fifo_probe_comm(rdv->done_comm_fifo, SIMIX_COMM_SEND, match_fun, data, this_action);
569   }
570  // }else{
571     if(!other_action){
572         XBT_DEBUG("second try in the other mailbox");
573         other_action = SIMIX_fifo_probe_comm(rdv->comm_fifo, SIMIX_COMM_SEND, match_fun, data, this_action);
574     }
575 //  }
576   if(other_action)other_action->comm.refcount--;
577
578   SIMIX_comm_destroy(this_action);
579   --smx_total_comms;
580   return other_action;
581 }
582
583 void SIMIX_pre_comm_wait(smx_simcall_t simcall, smx_action_t action, double timeout)
584 {
585   int idx = simcall->mc_value;
586   /* the simcall may be a wait, a send or a recv */
587   surf_action_t sleep;
588
589   /* Associate this simcall to the wait action */
590   XBT_DEBUG("SIMIX_pre_comm_wait, %p", action);
591
592   xbt_fifo_push(action->simcalls, simcall);
593   simcall->issuer->waiting_action = action;
594
595   if (MC_is_active()) {
596     if (idx == 0) {
597       action->state = SIMIX_DONE;
598     } else {
599       /* If we reached this point, the wait simcall must have a timeout */
600       /* Otherwise it shouldn't be enabled and executed by the MC */
601       if (timeout == -1)
602         THROW_IMPOSSIBLE;
603
604       if (action->comm.src_proc == simcall->issuer)
605         action->state = SIMIX_SRC_TIMEOUT;
606       else
607         action->state = SIMIX_DST_TIMEOUT;
608     }
609
610     SIMIX_comm_finish(action);
611     return;
612   }
613
614   /* If the action has already finish perform the error handling, */
615   /* otherwise set up a waiting timeout on the right side         */
616   if (action->state != SIMIX_WAITING && action->state != SIMIX_RUNNING) {
617     SIMIX_comm_finish(action);
618   } else { /* if (timeout >= 0) { we need a surf sleep action even when there is no timeout, otherwise surf won't tell us when the host fails */
619     sleep = surf_workstation_model->extension.workstation.sleep(simcall->issuer->smx_host->host, timeout);
620     surf_workstation_model->action_data_set(sleep, action);
621
622     if (simcall->issuer == action->comm.src_proc)
623       action->comm.src_timeout = sleep;
624     else
625       action->comm.dst_timeout = sleep;
626   }
627 }
628
629 void SIMIX_pre_comm_test(smx_simcall_t simcall, smx_action_t action)
630 {
631   if(MC_is_active()){
632     simcall_comm_test__set__result(simcall, action->comm.src_proc && action->comm.dst_proc);
633     if(simcall_comm_test__get__result(simcall)){
634       action->state = SIMIX_DONE;
635       xbt_fifo_push(action->simcalls, simcall);
636       SIMIX_comm_finish(action);
637     }else{
638       SIMIX_simcall_answer(simcall);
639     }
640     return;
641   }
642
643   simcall_comm_test__set__result(simcall, (action->state != SIMIX_WAITING && action->state != SIMIX_RUNNING));
644   if (simcall_comm_test__get__result(simcall)) {
645     xbt_fifo_push(action->simcalls, simcall);
646     SIMIX_comm_finish(action);
647   } else {
648     SIMIX_simcall_answer(simcall);
649   }
650 }
651
652 void SIMIX_pre_comm_testany(smx_simcall_t simcall, xbt_dynar_t actions)
653 {
654   int idx = simcall->mc_value;
655   unsigned int cursor;
656   smx_action_t action;
657   simcall_comm_testany__set__result(simcall, -1);
658
659   if (MC_is_active()){
660     if(idx == -1){
661       SIMIX_simcall_answer(simcall);
662     }else{
663       action = xbt_dynar_get_as(actions, idx, smx_action_t);
664       simcall_comm_testany__set__result(simcall, idx);
665       xbt_fifo_push(action->simcalls, simcall);
666       action->state = SIMIX_DONE;
667       SIMIX_comm_finish(action);
668     }
669     return;
670   }
671
672   xbt_dynar_foreach(simcall_comm_testany__get__comms(simcall), cursor,action) {
673     if (action->state != SIMIX_WAITING && action->state != SIMIX_RUNNING) {
674       simcall_comm_testany__set__result(simcall, cursor);
675       xbt_fifo_push(action->simcalls, simcall);
676       SIMIX_comm_finish(action);
677       return;
678     }
679   }
680   SIMIX_simcall_answer(simcall);
681 }
682
683 void SIMIX_pre_comm_waitany(smx_simcall_t simcall, xbt_dynar_t actions)
684 {
685   int idx = simcall->mc_value;
686   smx_action_t action;
687   unsigned int cursor = 0;
688
689   if (MC_is_active()){
690     action = xbt_dynar_get_as(actions, idx, smx_action_t);
691     xbt_fifo_push(action->simcalls, simcall);
692     simcall_comm_waitany__set__result(simcall, idx);
693     action->state = SIMIX_DONE;
694     SIMIX_comm_finish(action);
695     return;
696   }
697
698   xbt_dynar_foreach(actions, cursor, action){
699     /* associate this simcall to the the action */
700     xbt_fifo_push(action->simcalls, simcall);
701
702     /* see if the action is already finished */
703     if (action->state != SIMIX_WAITING && action->state != SIMIX_RUNNING){
704       SIMIX_comm_finish(action);
705       break;
706     }
707   }
708 }
709
710 void SIMIX_waitany_remove_simcall_from_actions(smx_simcall_t simcall)
711 {
712   smx_action_t action;
713   unsigned int cursor = 0;
714   xbt_dynar_t actions = simcall_comm_waitany__get__comms(simcall);
715
716   xbt_dynar_foreach(actions, cursor, action) {
717     xbt_fifo_remove(action->simcalls, simcall);
718   }
719 }
720
721 /**
722  *  \brief Starts the simulation of a communication action.
723  *  \param action the communication action
724  */
725 XBT_INLINE void SIMIX_comm_start(smx_action_t action)
726 {
727   /* If both the sender and the receiver are already there, start the communication */
728   if (action->state == SIMIX_READY) {
729
730     smx_host_t sender = action->comm.src_proc->smx_host;
731     smx_host_t receiver = action->comm.dst_proc->smx_host;
732
733     XBT_DEBUG("Starting communication %p from '%s' to '%s'", action,
734               SIMIX_host_get_name(sender), SIMIX_host_get_name(receiver));
735
736     action->comm.surf_comm = surf_workstation_model->extension.workstation.
737       communicate(sender->host, receiver->host, action->comm.task_size, action->comm.rate);
738
739     surf_workstation_model->action_data_set(action->comm.surf_comm, action);
740
741     action->state = SIMIX_RUNNING;
742
743     /* If a link is failed, detect it immediately */
744     if (surf_workstation_model->action_state_get(action->comm.surf_comm) == SURF_ACTION_FAILED) {
745       XBT_DEBUG("Communication from '%s' to '%s' failed to start because of a link failure",
746                 SIMIX_host_get_name(sender), SIMIX_host_get_name(receiver));
747       action->state = SIMIX_LINK_FAILURE;
748       SIMIX_comm_destroy_internal_actions(action);
749     }
750
751     /* If any of the process is suspend, create the action but stop its execution,
752        it will be restarted when the sender process resume */
753     if (SIMIX_process_is_suspended(action->comm.src_proc) ||
754         SIMIX_process_is_suspended(action->comm.dst_proc)) {
755       /* FIXME: check what should happen with the action state */
756
757       if (SIMIX_process_is_suspended(action->comm.src_proc))
758         XBT_DEBUG("The communication is suspended on startup because src (%s:%s) were suspended since it initiated the communication",
759                   SIMIX_host_get_name(action->comm.src_proc->smx_host), action->comm.src_proc->name);
760       else
761         XBT_DEBUG("The communication is suspended on startup because dst (%s:%s) were suspended since it initiated the communication",
762                   SIMIX_host_get_name(action->comm.dst_proc->smx_host), action->comm.dst_proc->name);
763
764       surf_workstation_model->suspend(action->comm.surf_comm);
765
766     }
767   }
768 }
769
770 /**
771  * \brief Answers the SIMIX simcalls associated to a communication action.
772  * \param action a finished communication action
773  */
774 void SIMIX_comm_finish(smx_action_t action)
775 {
776   unsigned int destroy_count = 0;
777   smx_simcall_t simcall;
778
779   while ((simcall = xbt_fifo_shift(action->simcalls))) {
780
781     /* If a waitany simcall is waiting for this action to finish, then remove
782        it from the other actions in the waitany list. Afterwards, get the
783        position of the actual action in the waitany dynar and
784        return it as the result of the simcall */
785     if (simcall->call == SIMCALL_COMM_WAITANY) {
786       SIMIX_waitany_remove_simcall_from_actions(simcall);
787       if (!MC_is_active())
788         simcall_comm_waitany__set__result(simcall, xbt_dynar_search(simcall_comm_waitany__get__comms(simcall), &action));
789     }
790
791     /* If the action is still in a rendez-vous point then remove from it */
792     if (action->comm.rdv)
793       SIMIX_rdv_remove(action->comm.rdv, action);
794
795     XBT_DEBUG("SIMIX_comm_finish: action state = %d", (int)action->state);
796
797     /* Check out for errors */
798     switch (action->state) {
799
800     case SIMIX_DONE:
801       XBT_DEBUG("Communication %p complete!", action);
802       SIMIX_comm_copy_data(action);
803       break;
804
805     case SIMIX_SRC_TIMEOUT:
806       SMX_EXCEPTION(simcall->issuer, timeout_error, 0,
807                     "Communication timeouted because of sender");
808       break;
809
810     case SIMIX_DST_TIMEOUT:
811       SMX_EXCEPTION(simcall->issuer, timeout_error, 0,
812                     "Communication timeouted because of receiver");
813       break;
814
815     case SIMIX_SRC_HOST_FAILURE:
816       if (simcall->issuer == action->comm.src_proc)
817         simcall->issuer->context->iwannadie = 1;
818 //          SMX_EXCEPTION(simcall->issuer, host_error, 0, "Host failed");
819       else
820         SMX_EXCEPTION(simcall->issuer, network_error, 0, "Remote peer failed");
821       break;
822
823     case SIMIX_DST_HOST_FAILURE:
824       if (simcall->issuer == action->comm.dst_proc)
825         simcall->issuer->context->iwannadie = 1;
826 //          SMX_EXCEPTION(simcall->issuer, host_error, 0, "Host failed");
827       else
828         SMX_EXCEPTION(simcall->issuer, network_error, 0, "Remote peer failed");
829       break;
830
831     case SIMIX_LINK_FAILURE:
832       XBT_DEBUG("Link failure in action %p between '%s' and '%s': posting an exception to the issuer: %s (%p) detached:%d",
833                 action,
834                 action->comm.src_proc ? action->comm.src_proc->smx_host->name : NULL,
835                 action->comm.dst_proc ? action->comm.dst_proc->smx_host->name : NULL,
836                 simcall->issuer->name, simcall->issuer, action->comm.detached);
837       if (action->comm.src_proc == simcall->issuer) {
838         XBT_DEBUG("I'm source");
839       } else if (action->comm.dst_proc == simcall->issuer) {
840         XBT_DEBUG("I'm dest");
841       } else {
842         XBT_DEBUG("I'm neither source nor dest");
843       }
844       SMX_EXCEPTION(simcall->issuer, network_error, 0, "Link failure");
845       break;
846
847     case SIMIX_CANCELED:
848       if (simcall->issuer == action->comm.dst_proc)
849         SMX_EXCEPTION(simcall->issuer, cancel_error, 0,
850                       "Communication canceled by the sender");
851       else
852         SMX_EXCEPTION(simcall->issuer, cancel_error, 0,
853                       "Communication canceled by the receiver");
854       break;
855
856     default:
857       xbt_die("Unexpected action state in SIMIX_comm_finish: %d", (int)action->state);
858     }
859
860     /* if there is an exception during a waitany or a testany, indicate the position of the failed communication */
861     if (simcall->issuer->doexception) {
862       if (simcall->call == SIMCALL_COMM_WAITANY) {
863         simcall->issuer->running_ctx->exception.value = xbt_dynar_search(simcall_comm_waitany__get__comms(simcall), &action);
864       }
865       else if (simcall->call == SIMCALL_COMM_TESTANY) {
866         simcall->issuer->running_ctx->exception.value = xbt_dynar_search(simcall_comm_testany__get__comms(simcall), &action);
867       }
868     }
869
870     if (surf_workstation_model->extension.
871         workstation.get_state(simcall->issuer->smx_host->host) != SURF_RESOURCE_ON) {
872       simcall->issuer->context->iwannadie = 1;
873     }
874
875     simcall->issuer->waiting_action = NULL;
876     xbt_fifo_remove(simcall->issuer->comms, action);
877     SIMIX_simcall_answer(simcall);
878     destroy_count++;
879   }
880
881   while (destroy_count-- > 0)
882     SIMIX_comm_destroy(action);
883 }
884
885 /**
886  * \brief This function is called when a Surf communication action is finished.
887  * \param action the corresponding Simix communication
888  */
889 void SIMIX_post_comm(smx_action_t action)
890 {
891   /* Update action state */
892   if (action->comm.src_timeout &&
893       surf_workstation_model->action_state_get(action->comm.src_timeout) == SURF_ACTION_DONE)
894     action->state = SIMIX_SRC_TIMEOUT;
895   else if (action->comm.dst_timeout &&
896            surf_workstation_model->action_state_get(action->comm.dst_timeout) == SURF_ACTION_DONE)
897     action->state = SIMIX_DST_TIMEOUT;
898   else if (action->comm.src_timeout &&
899            surf_workstation_model->action_state_get(action->comm.src_timeout) == SURF_ACTION_FAILED)
900     action->state = SIMIX_SRC_HOST_FAILURE;
901   else if (action->comm.dst_timeout &&
902            surf_workstation_model->action_state_get(action->comm.dst_timeout) == SURF_ACTION_FAILED)
903     action->state = SIMIX_DST_HOST_FAILURE;
904   else if (action->comm.surf_comm &&
905            surf_workstation_model->action_state_get(action->comm.surf_comm) == SURF_ACTION_FAILED) {
906     XBT_DEBUG("Puta madre. Surf says that the link broke");
907     action->state = SIMIX_LINK_FAILURE;
908   } else
909     action->state = SIMIX_DONE;
910
911   XBT_DEBUG("SIMIX_post_comm: comm %p, state %d, src_proc %p, dst_proc %p, detached: %d",
912             action, (int)action->state, action->comm.src_proc, action->comm.dst_proc, action->comm.detached);
913
914   /* destroy the surf actions associated with the Simix communication */
915   SIMIX_comm_destroy_internal_actions(action);
916
917   /* remove the communication action from the list of pending communications
918    * of both processes (if they still exist) */
919   if (action->comm.src_proc) {
920     xbt_fifo_remove(action->comm.src_proc->comms, action);
921   }
922   if (action->comm.dst_proc) {
923     xbt_fifo_remove(action->comm.dst_proc->comms, action);
924   }
925
926   /* if there are simcalls associated with the action, then answer them */
927   if (xbt_fifo_size(action->simcalls)) {
928     SIMIX_comm_finish(action);
929   }
930 }
931
932 void SIMIX_pre_comm_cancel(smx_simcall_t simcall, smx_action_t action){
933   SIMIX_comm_cancel(action);
934 }
935 void SIMIX_comm_cancel(smx_action_t action)
936 {
937   /* if the action is a waiting state means that it is still in a rdv */
938   /* so remove from it and delete it */
939   if (action->state == SIMIX_WAITING) {
940     SIMIX_rdv_remove(action->comm.rdv, action);
941     action->state = SIMIX_CANCELED;
942   }
943   else if (!MC_is_active() /* when running the MC there are no surf actions */
944            && (action->state == SIMIX_READY || action->state == SIMIX_RUNNING)) {
945
946     surf_workstation_model->action_cancel(action->comm.surf_comm);
947   }
948 }
949
950 void SIMIX_comm_suspend(smx_action_t action)
951 {
952   /*FIXME: shall we suspend also the timeout actions? */
953   if (action->comm.surf_comm)
954     surf_workstation_model->suspend(action->comm.surf_comm);
955   /* in the other case, the action will be suspended on creation, in SIMIX_comm_start() */
956 }
957
958 void SIMIX_comm_resume(smx_action_t action)
959 {
960   /*FIXME: check what happen with the timeouts */
961   if (action->comm.surf_comm)
962     surf_workstation_model->resume(action->comm.surf_comm);
963   /* in the other case, the action were not really suspended yet, see SIMIX_comm_suspend() and SIMIX_comm_start() */
964 }
965
966
967 /************* Action Getters **************/
968
969 double SIMIX_pre_comm_get_remains(smx_simcall_t simcall, smx_action_t action){
970   return SIMIX_comm_get_remains(action);
971 }
972 /**
973  *  \brief get the amount remaining from the communication
974  *  \param action The communication
975  */
976 double SIMIX_comm_get_remains(smx_action_t action)
977 {
978   double remains;
979
980   if(!action){
981     return 0;
982   }
983
984   switch (action->state) {
985
986   case SIMIX_RUNNING:
987     remains = surf_workstation_model->get_remains(action->comm.surf_comm);
988     break;
989
990   case SIMIX_WAITING:
991   case SIMIX_READY:
992     remains = 0; /*FIXME: check what should be returned */
993     break;
994
995   default:
996     remains = 0; /*FIXME: is this correct? */
997     break;
998   }
999   return remains;
1000 }
1001
1002 e_smx_state_t SIMIX_pre_comm_get_state(smx_simcall_t simcall, smx_action_t action){
1003   return SIMIX_comm_get_state(action);
1004 }
1005 e_smx_state_t SIMIX_comm_get_state(smx_action_t action)
1006 {
1007   return action->state;
1008 }
1009
1010 void* SIMIX_pre_comm_get_src_data(smx_simcall_t simcall, smx_action_t action){
1011   return SIMIX_comm_get_src_data(action);
1012 }
1013 /**
1014  *  \brief Return the user data associated to the sender of the communication
1015  *  \param action The communication
1016  *  \return the user data
1017  */
1018 void* SIMIX_comm_get_src_data(smx_action_t action)
1019 {
1020   return action->comm.src_data;
1021 }
1022
1023 void* SIMIX_pre_comm_get_dst_data(smx_simcall_t simcall, smx_action_t action){
1024   return SIMIX_comm_get_dst_data(action);
1025 }
1026 /**
1027  *  \brief Return the user data associated to the receiver of the communication
1028  *  \param action The communication
1029  *  \return the user data
1030  */
1031 void* SIMIX_comm_get_dst_data(smx_action_t action)
1032 {
1033   return action->comm.dst_data;
1034 }
1035
1036 smx_process_t SIMIX_pre_comm_get_src_proc(smx_simcall_t simcall, smx_action_t action){
1037   return SIMIX_comm_get_src_proc(action);
1038 }
1039 smx_process_t SIMIX_comm_get_src_proc(smx_action_t action)
1040 {
1041   return action->comm.src_proc;
1042 }
1043
1044 smx_process_t SIMIX_pre_comm_get_dst_proc(smx_simcall_t simcall, smx_action_t action){
1045   return SIMIX_comm_get_dst_proc(action);
1046 }
1047 smx_process_t SIMIX_comm_get_dst_proc(smx_action_t action)
1048 {
1049   return action->comm.dst_proc;
1050 }
1051
1052 #ifdef HAVE_LATENCY_BOUND_TRACKING
1053 /**
1054  *  \brief verify if communication is latency bounded
1055  *  \param comm The communication
1056  */
1057 XBT_INLINE int SIMIX_comm_is_latency_bounded(smx_action_t action)
1058 {
1059   if(!action){
1060     return 0;
1061   }
1062   if (action->comm.surf_comm){
1063     XBT_DEBUG("Getting latency limited for surf_action (%p)", action->comm.surf_comm);
1064     action->latency_limited = surf_workstation_model->get_latency_limited(action->comm.surf_comm);
1065     XBT_DEBUG("Action limited is %d", action->latency_limited);
1066   }
1067   return action->latency_limited;
1068 }
1069 #endif
1070
1071 /******************************************************************************/
1072 /*                    SIMIX_comm_copy_data callbacks                       */
1073 /******************************************************************************/
1074 static void (*SIMIX_comm_copy_data_callback) (smx_action_t, void*, size_t) =
1075   &SIMIX_comm_copy_pointer_callback;
1076
1077 void
1078 SIMIX_comm_set_copy_data_callback(void (*callback) (smx_action_t, void*, size_t))
1079 {
1080   SIMIX_comm_copy_data_callback = callback;
1081 }
1082
1083 void SIMIX_comm_copy_pointer_callback(smx_action_t comm, void* buff, size_t buff_size)
1084 {
1085   xbt_assert((buff_size == sizeof(void *)),
1086              "Cannot copy %zu bytes: must be sizeof(void*)", buff_size);
1087   *(void **) (comm->comm.dst_buff) = buff;
1088 }
1089
1090 void SIMIX_comm_copy_buffer_callback(smx_action_t comm, void* buff, size_t buff_size)
1091 {
1092   XBT_DEBUG("Copy the data over");
1093   memcpy(comm->comm.dst_buff, buff, buff_size);
1094   if (comm->comm.detached) { // if this is a detached send, the source buffer was duplicated by SMPI sender to make the original buffer available to the application ASAP
1095     xbt_free(buff);
1096     comm->comm.src_buff = NULL;
1097   }
1098 }
1099
1100
1101 /**
1102  *  \brief Copy the communication data from the sender's buffer to the receiver's one
1103  *  \param comm The communication
1104  */
1105 void SIMIX_comm_copy_data(smx_action_t comm)
1106 {
1107   size_t buff_size = comm->comm.src_buff_size;
1108   /* If there is no data to be copy then return */
1109   if (!comm->comm.src_buff || !comm->comm.dst_buff || comm->comm.copied)
1110     return;
1111
1112   XBT_DEBUG("Copying comm %p data from %s (%p) -> %s (%p) (%zu bytes)",
1113             comm,
1114             comm->comm.src_proc ? comm->comm.src_proc->smx_host->name : "a finished process",
1115             comm->comm.src_buff,
1116             comm->comm.dst_proc ? comm->comm.dst_proc->smx_host->name : "a finished process",
1117             comm->comm.dst_buff, buff_size);
1118
1119   /* Copy at most dst_buff_size bytes of the message to receiver's buffer */
1120   if (comm->comm.dst_buff_size)
1121     buff_size = MIN(buff_size, *(comm->comm.dst_buff_size));
1122
1123   /* Update the receiver's buffer size to the copied amount */
1124   if (comm->comm.dst_buff_size)
1125     *comm->comm.dst_buff_size = buff_size;
1126
1127   if (buff_size > 0)
1128     SIMIX_comm_copy_data_callback (comm, comm->comm.src_buff, buff_size);
1129
1130   /* Set the copied flag so we copy data only once */
1131   /* (this function might be called from both communication ends) */
1132   comm->comm.copied = 1;
1133 }