Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
model-checker : move functions about snapshot comparison in a separate file mc_compare.c
[simgrid.git] / src / simix / smx_network.c
1 /* Copyright (c) 2009, 2010. The SimGrid Team.
2  * All rights reserved.                                                     */
3
4 /* This program is free software; you can redistribute it and/or modify it
5  * under the terms of the license (GNU LGPL) which comes with this package. */
6
7 #include "smx_private.h"
8 #include "xbt/log.h"
9 #include "mc/mc.h"
10 #include "xbt/dict.h"
11
12 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(simix_network, simix,
13                                 "Logging specific to SIMIX (network)");
14
15 static xbt_dict_t rdv_points = NULL;
16 unsigned long int smx_total_comms = 0;
17
18 static void SIMIX_waitany_remove_simcall_from_actions(smx_simcall_t simcall);
19 static void SIMIX_comm_copy_data(smx_action_t comm);
20 static smx_action_t SIMIX_comm_new(e_smx_comm_type_t type);
21 static XBT_INLINE void SIMIX_rdv_push(smx_rdv_t rdv, smx_action_t comm);
22 static smx_action_t SIMIX_fifo_probe_comm(xbt_fifo_t fifo, e_smx_comm_type_t type,
23                                         int (*match_fun)(void *, void *,smx_action_t),
24                                         void *user_data, smx_action_t my_action);
25 static smx_action_t SIMIX_fifo_get_comm(xbt_fifo_t fifo, e_smx_comm_type_t type,
26                                         int (*match_fun)(void *, void *,smx_action_t),
27                                         void *user_data, smx_action_t my_action);
28 static void SIMIX_rdv_free(void *data);
29
30 void SIMIX_network_init(void)
31 {
32   rdv_points = xbt_dict_new_homogeneous(SIMIX_rdv_free);
33 }
34
35 void SIMIX_network_exit(void)
36 {
37   xbt_dict_free(&rdv_points);
38 }
39
40 /******************************************************************************/
41 /*                           Rendez-Vous Points                               */
42 /******************************************************************************/
43
44 smx_rdv_t SIMIX_rdv_create(const char *name)
45 {
46   /* two processes may have pushed the same rdv_create simcall at the same time */
47   smx_rdv_t rdv = name ? xbt_dict_get_or_null(rdv_points, name) : NULL;
48
49   if (!rdv) {
50     rdv = xbt_new0(s_smx_rvpoint_t, 1);
51     rdv->name = name ? xbt_strdup(name) : NULL;
52     rdv->comm_fifo = xbt_fifo_new();
53     rdv->done_comm_fifo = xbt_fifo_new();
54     rdv->permanent_receiver=NULL;
55
56     XBT_DEBUG("Creating a mailbox at %p with name %s\n", rdv, name);
57
58     if (rdv->name)
59       xbt_dict_set(rdv_points, rdv->name, rdv, NULL);
60   }
61   return rdv;
62 }
63
64 void SIMIX_rdv_destroy(smx_rdv_t rdv)
65 {
66   if (rdv->name)
67     xbt_dict_remove(rdv_points, rdv->name);
68 }
69
70 void SIMIX_rdv_free(void *data)
71 {
72   XBT_DEBUG("rdv free %p", data);
73   smx_rdv_t rdv = (smx_rdv_t) data;
74   xbt_free(rdv->name);
75   xbt_fifo_free(rdv->comm_fifo);
76   xbt_fifo_free(rdv->done_comm_fifo);
77
78   xbt_free(rdv);  
79 }
80
81 xbt_dict_t SIMIX_get_rdv_points()
82 {
83   return rdv_points;
84 }
85
86 smx_rdv_t SIMIX_rdv_get_by_name(const char *name)
87 {
88   return xbt_dict_get_or_null(rdv_points, name);
89 }
90
91 int SIMIX_rdv_comm_count_by_host(smx_rdv_t rdv, smx_host_t host)
92 {
93   smx_action_t comm = NULL;
94   xbt_fifo_item_t item = NULL;
95   int count = 0;
96
97   xbt_fifo_foreach(rdv->comm_fifo, item, comm, smx_action_t) {
98     if (comm->comm.src_proc->smx_host == host)
99       count++;
100   }
101
102   return count;
103 }
104
105 smx_action_t SIMIX_rdv_get_head(smx_rdv_t rdv)
106 {
107   return xbt_fifo_get_item_content(xbt_fifo_get_first_item(rdv->comm_fifo));
108 }
109
110 /**
111  *  \brief get the receiver (process associated to the mailbox)
112  *  \param rdv The rendez-vous point
113  *  \return process The receiving process (NULL if not set)
114  */
115 smx_process_t SIMIX_rdv_get_receiver(smx_rdv_t rdv)
116 {
117   return rdv->permanent_receiver;
118 }
119
120 /**
121  *  \brief set the receiver of the rendez vous point to allow eager sends
122  *  \param rdv The rendez-vous point
123  *  \param process The receiving process
124  */
125 void SIMIX_rdv_set_receiver(smx_rdv_t rdv , smx_process_t process)
126 {
127   rdv->permanent_receiver=process;
128 }
129
130 /**
131  *  \brief Pushes a communication action into a rendez-vous point
132  *  \param rdv The rendez-vous point
133  *  \param comm The communication action
134  */
135 static XBT_INLINE void SIMIX_rdv_push(smx_rdv_t rdv, smx_action_t comm)
136 {
137   xbt_fifo_push(rdv->comm_fifo, comm);
138   comm->comm.rdv = rdv;
139 }
140
141 /**
142  *  \brief Removes a communication action from a rendez-vous point
143  *  \param rdv The rendez-vous point
144  *  \param comm The communication action
145  */
146 XBT_INLINE void SIMIX_rdv_remove(smx_rdv_t rdv, smx_action_t comm)
147 {
148   xbt_fifo_remove(rdv->comm_fifo, comm);
149   comm->comm.rdv = NULL;
150 }
151
152 /**
153  *  \brief Checks if there is a communication action queued in a fifo matching our needs
154  *  \param type The type of communication we are looking for (comm_send, comm_recv)
155  *  \return The communication action if found, NULL otherwise
156  */
157 smx_action_t SIMIX_fifo_get_comm(xbt_fifo_t fifo, e_smx_comm_type_t type,
158                                  int (*match_fun)(void *, void *,smx_action_t),
159                                  void *this_user_data, smx_action_t my_action)
160 {
161   smx_action_t action;
162   xbt_fifo_item_t item;
163   void* other_user_data = NULL;
164
165   xbt_fifo_foreach(fifo, item, action, smx_action_t) {
166     if (action->comm.type == SIMIX_COMM_SEND) {
167       other_user_data = action->comm.src_data;
168     } else if (action->comm.type == SIMIX_COMM_RECEIVE) {
169       other_user_data = action->comm.dst_data;
170     }
171     if (action->comm.type == type &&
172         (!match_fun              ||              match_fun(this_user_data,  other_user_data, action)) &&
173         (!action->comm.match_fun || action->comm.match_fun(other_user_data, this_user_data,  my_action))) {
174       XBT_DEBUG("Found a matching communication action %p", action);
175       xbt_fifo_remove_item(fifo, item);
176       xbt_fifo_free_item(item);
177       action->comm.refcount++;
178       action->comm.rdv = NULL;
179       return action;
180     }
181     XBT_DEBUG("Sorry, communication action %p does not match our needs:"
182               " its type is %d but we are looking for a comm of type %d (or maybe the filtering didn't match)",
183               action, (int)action->comm.type, (int)type);
184   }
185   XBT_DEBUG("No matching communication action found");
186   return NULL;
187 }
188
189
190 /**
191  *  \brief Checks if there is a communication action queued in a fifo matching our needs, but leave it there
192  *  \param type The type of communication we are looking for (comm_send, comm_recv)
193  *  \return The communication action if found, NULL otherwise
194  */
195 smx_action_t SIMIX_fifo_probe_comm(xbt_fifo_t fifo, e_smx_comm_type_t type,
196                                  int (*match_fun)(void *, void *,smx_action_t),
197                                  void *this_user_data, smx_action_t my_action)
198 {
199   smx_action_t action;
200   xbt_fifo_item_t item;
201   void* other_user_data = NULL;
202
203   xbt_fifo_foreach(fifo, item, action, smx_action_t) {
204     if (action->comm.type == SIMIX_COMM_SEND) {
205       other_user_data = action->comm.src_data;
206     } else if (action->comm.type == SIMIX_COMM_RECEIVE) {
207       other_user_data = action->comm.dst_data;
208     }
209     if (action->comm.type == type &&
210         (!match_fun              ||              match_fun(this_user_data,  other_user_data, action)) &&
211         (!action->comm.match_fun || action->comm.match_fun(other_user_data, this_user_data,  my_action))) {
212       XBT_DEBUG("Found a matching communication action %p", action);
213       action->comm.refcount++;
214
215       return action;
216     }
217     XBT_DEBUG("Sorry, communication action %p does not match our needs:"
218               " its type is %d but we are looking for a comm of type %d (or maybe the filtering didn't match)",
219               action, (int)action->comm.type, (int)type);
220   }
221   XBT_DEBUG("No matching communication action found");
222   return NULL;
223 }
224 /******************************************************************************/
225 /*                            Communication Actions                            */
226 /******************************************************************************/
227
228 /**
229  *  \brief Creates a new communicate action
230  *  \param type The direction of communication (comm_send, comm_recv)
231  *  \return The new communicate action
232  */
233 smx_action_t SIMIX_comm_new(e_smx_comm_type_t type)
234 {
235   smx_action_t act;
236
237   /* alloc structures */
238   act = xbt_mallocator_get(simix_global->action_mallocator);
239
240   act->type = SIMIX_ACTION_COMMUNICATE;
241   act->state = SIMIX_WAITING;
242
243   /* set communication */
244   act->comm.type = type;
245   act->comm.refcount = 1;
246
247 #ifdef HAVE_LATENCY_BOUND_TRACKING
248   //initialize with unknown value
249   act->latency_limited = -1;
250 #endif
251
252 #ifdef HAVE_TRACING
253   act->category = NULL;
254 #endif
255
256   XBT_DEBUG("Create communicate action %p", act);
257   ++smx_total_comms;
258
259   return act;
260 }
261
262 /**
263  *  \brief Destroy a communicate action
264  *  \param action The communicate action to be destroyed
265  */
266 void SIMIX_comm_destroy(smx_action_t action)
267 {
268   XBT_DEBUG("Destroy action %p (refcount: %d), state: %d",
269             action, action->comm.refcount, (int)action->state);
270
271   if (action->comm.refcount <= 0) {
272     xbt_backtrace_display_current();
273     xbt_die("The refcount of comm %p is already 0 before decreasing it. "
274             "That's a bug! If you didn't test and/or wait the same communication twice in your code, then the bug is SimGrid's...", action);
275   }
276   action->comm.refcount--;
277   if (action->comm.refcount > 0)
278     return;
279   XBT_DEBUG("Really free communication %p; refcount is now %d", action,
280             action->comm.refcount);
281
282 #ifdef HAVE_LATENCY_BOUND_TRACKING
283   action->latency_limited = SIMIX_comm_is_latency_bounded( action ) ;
284 #endif
285
286   xbt_free(action->name);
287   SIMIX_comm_destroy_internal_actions(action);
288
289   if (action->comm.detached && action->state != SIMIX_DONE) {
290     /* the communication has failed and was detached:
291      * we have to free the buffer */
292     if (action->comm.clean_fun) {
293       action->comm.clean_fun(action->comm.src_buff);
294     }
295     action->comm.src_buff = NULL;
296   }
297
298   xbt_mallocator_release(simix_global->action_mallocator, action);
299 }
300
301 void SIMIX_comm_destroy_internal_actions(smx_action_t action)
302 {
303   if (action->comm.surf_comm){
304 #ifdef HAVE_LATENCY_BOUND_TRACKING
305     action->latency_limited = SIMIX_comm_is_latency_bounded(action);
306 #endif
307     action->comm.surf_comm->model_type->action_unref(action->comm.surf_comm);
308     action->comm.surf_comm = NULL;
309   }
310
311   if (action->comm.src_timeout){
312     action->comm.src_timeout->model_type->action_unref(action->comm.src_timeout);
313     action->comm.src_timeout = NULL;
314   }
315
316   if (action->comm.dst_timeout){
317     action->comm.dst_timeout->model_type->action_unref(action->comm.dst_timeout);
318     action->comm.dst_timeout = NULL;
319   }
320 }
321
322 smx_action_t SIMIX_comm_isend(smx_process_t src_proc, smx_rdv_t rdv,
323                               double task_size, double rate,
324                               void *src_buff, size_t src_buff_size,
325                               int (*match_fun)(void *, void *,smx_action_t),
326                               void (*clean_fun)(void *), // used to free the action in case of problem after a detached send
327                               void *data,
328                               int detached)
329 {
330   XBT_DEBUG("send from %p\n", rdv);
331
332   /* Prepare an action describing us, so that it gets passed to the user-provided filter of other side */
333   smx_action_t this_action = SIMIX_comm_new(SIMIX_COMM_SEND);
334
335   /* Look for communication action matching our needs. We also provide a description of
336    * ourself so that the other side also gets a chance of choosing if it wants to match with us.
337    *
338    * If it is not found then push our communication into the rendez-vous point */
339   smx_action_t other_action = SIMIX_fifo_get_comm(rdv->comm_fifo, SIMIX_COMM_RECEIVE, match_fun, data, this_action);
340
341   if (!other_action) {
342     other_action = this_action;
343
344     if (rdv->permanent_receiver!=NULL){
345       //this mailbox is for small messages, which have to be sent right now
346       other_action->state = SIMIX_READY;
347       other_action->comm.dst_proc=rdv->permanent_receiver;
348       other_action->comm.refcount++;
349       other_action->comm.rdv = rdv;
350       xbt_fifo_push(rdv->done_comm_fifo,other_action);
351       other_action->comm.rdv=rdv;
352       XBT_DEBUG("pushing a message into the permanent receive fifo %p, comm %p \n", rdv, &(other_action->comm));
353
354     }else{
355       SIMIX_rdv_push(rdv, this_action);
356     }
357   } else {
358     XBT_DEBUG("Receive already pushed\n");
359
360     SIMIX_comm_destroy(this_action);
361     --smx_total_comms; // this creation was a pure waste
362
363     other_action->state = SIMIX_READY;
364     other_action->comm.type = SIMIX_COMM_READY;
365
366   }
367   xbt_fifo_push(src_proc->comms, other_action);
368
369   /* if the communication action is detached then decrease the refcount
370    * by one, so it will be eliminated by the receiver's destroy call */
371   if (detached) {
372     other_action->comm.detached = 1;
373     other_action->comm.refcount--;
374     other_action->comm.clean_fun = clean_fun;
375   } else {
376     other_action->comm.clean_fun = NULL;
377   }
378
379   /* Setup the communication action */
380   other_action->comm.src_proc = src_proc;
381   other_action->comm.task_size = task_size;
382   other_action->comm.rate = rate;
383   other_action->comm.src_buff = src_buff;
384   other_action->comm.src_buff_size = src_buff_size;
385   other_action->comm.src_data = data;
386
387   other_action->comm.match_fun = match_fun;
388
389   if (MC_IS_ENABLED) {
390     other_action->state = SIMIX_RUNNING;
391     return other_action;
392   }
393
394   SIMIX_comm_start(other_action);
395   return (detached ? NULL : other_action);
396 }
397
398 smx_action_t SIMIX_comm_irecv(smx_process_t dst_proc, smx_rdv_t rdv,
399                               void *dst_buff, size_t *dst_buff_size,
400                               int (*match_fun)(void *, void *, smx_action_t), void *data)
401 {
402   XBT_DEBUG("recv from %p %p\n", rdv, rdv->comm_fifo);
403   smx_action_t this_action = SIMIX_comm_new(SIMIX_COMM_RECEIVE);
404
405   smx_action_t other_action;
406   //communication already done, get it inside the fifo of completed comms
407   //permanent receive v1
408   //int already_received=0;
409   if(rdv->permanent_receiver && xbt_fifo_size(rdv->done_comm_fifo)!=0){
410
411     XBT_DEBUG("We have a comm that has probably already been received, trying to match it, to skip the communication\n");
412     //find a match in the already received fifo
413     other_action = SIMIX_fifo_get_comm(rdv->done_comm_fifo, SIMIX_COMM_SEND, match_fun, data, this_action);
414     //if not found, assume the receiver came first, register it to the mailbox in the classical way
415     if (!other_action)  {
416       XBT_DEBUG("We have messages in the permanent receive list, but not the one we are looking for, pushing request into fifo\n");
417       other_action = this_action;
418       SIMIX_rdv_push(rdv, this_action);
419     }else{
420       if(other_action->comm.surf_comm &&        SIMIX_comm_get_remains(other_action)==0.0)
421       {
422         XBT_DEBUG("comm %p has been already sent, and is finished, destroy it\n",&(other_action->comm));
423         other_action->state = SIMIX_DONE;
424         other_action->comm.type = SIMIX_COMM_DONE;
425         other_action->comm.rdv = NULL;
426         //SIMIX_comm_destroy(this_action);
427         //--smx_total_comms; // this creation was a pure waste
428         //already_received=1;
429         //other_action->comm.refcount--;
430       }/*else{
431          XBT_DEBUG("Not yet finished, we have to wait %d\n", xbt_fifo_size(rdv->comm_fifo));
432          }*/
433      // other_action->comm.refcount--;
434       SIMIX_comm_destroy(this_action);
435       --smx_total_comms; // this creation was a pure waste
436     }
437   }else{
438     /* Prepare an action describing us, so that it gets passed to the user-provided filter of other side */
439
440     /* Look for communication action matching our needs. We also provide a description of
441      * ourself so that the other side also gets a chance of choosing if it wants to match with us.
442      *
443      * If it is not found then push our communication into the rendez-vous point */
444     other_action = SIMIX_fifo_get_comm(rdv->comm_fifo, SIMIX_COMM_SEND, match_fun, data, this_action);
445
446     if (!other_action) {
447       XBT_DEBUG("Receive pushed first %d\n", xbt_fifo_size(rdv->comm_fifo));
448       other_action = this_action;
449       SIMIX_rdv_push(rdv, this_action);
450     } else {
451       SIMIX_comm_destroy(this_action);
452       --smx_total_comms; // this creation was a pure waste
453       other_action->state = SIMIX_READY;
454       other_action->comm.type = SIMIX_COMM_READY;
455    //   other_action->comm.refcount--;
456     }
457     xbt_fifo_push(dst_proc->comms, other_action);
458   }
459
460   /* Setup communication action */
461   other_action->comm.dst_proc = dst_proc;
462   other_action->comm.dst_buff = dst_buff;
463   other_action->comm.dst_buff_size = dst_buff_size;
464   other_action->comm.dst_data = data;
465
466   other_action->comm.match_fun = match_fun;
467
468
469   /*if(already_received)//do the actual copy, because the first one after the comm didn't have all the info
470     SIMIX_comm_copy_data(other_action);*/
471
472
473   if (MC_IS_ENABLED) {
474     other_action->state = SIMIX_RUNNING;
475     return other_action;
476   }
477
478   SIMIX_comm_start(other_action);
479   // }
480   return other_action;
481 }
482
483
484 smx_action_t SIMIX_comm_iprobe(smx_process_t dst_proc, smx_rdv_t rdv, int src,
485                               int tag, int (*match_fun)(void *, void *, smx_action_t), void *data)
486 {
487   XBT_DEBUG("iprobe from %p %p\n", rdv, rdv->comm_fifo);
488   smx_action_t this_action = SIMIX_comm_new(SIMIX_COMM_RECEIVE);
489
490   smx_action_t other_action=NULL;
491   if(rdv->permanent_receiver && xbt_fifo_size(rdv->done_comm_fifo)!=0){
492     //find a match in the already received fifo
493       XBT_DEBUG("first try in the perm recv mailbox \n");
494
495     other_action = SIMIX_fifo_probe_comm(rdv->done_comm_fifo, SIMIX_COMM_SEND, match_fun, data, this_action);
496   }
497  // }else{
498     if(!other_action){
499         XBT_DEBUG("second try in the other mailbox");
500         other_action = SIMIX_fifo_probe_comm(rdv->comm_fifo, SIMIX_COMM_SEND, match_fun, data, this_action);
501     }
502 //  }
503   if(other_action)other_action->comm.refcount--;
504
505   SIMIX_comm_destroy(this_action);
506   --smx_total_comms;
507   return other_action;
508 }
509
510 void SIMIX_pre_comm_wait(smx_simcall_t simcall, smx_action_t action, double timeout, int idx)
511 {
512
513   /* the simcall may be a wait, a send or a recv */
514   surf_action_t sleep;
515
516   /* Associate this simcall to the wait action */
517   XBT_DEBUG("SIMIX_pre_comm_wait, %p", action);
518
519   xbt_fifo_push(action->simcalls, simcall);
520   simcall->issuer->waiting_action = action;
521
522   if (MC_IS_ENABLED) {
523     if (idx == 0) {
524       action->state = SIMIX_DONE;
525     } else {
526       /* If we reached this point, the wait simcall must have a timeout */
527       /* Otherwise it shouldn't be enabled and executed by the MC */
528       if (timeout == -1)
529         THROW_IMPOSSIBLE;
530
531       if (action->comm.src_proc == simcall->issuer)
532         action->state = SIMIX_SRC_TIMEOUT;
533       else
534         action->state = SIMIX_DST_TIMEOUT;
535     }
536
537     SIMIX_comm_finish(action);
538     return;
539   }
540
541   /* If the action has already finish perform the error handling, */
542   /* otherwise set up a waiting timeout on the right side         */
543   if (action->state != SIMIX_WAITING && action->state != SIMIX_RUNNING) {
544     SIMIX_comm_finish(action);
545   } else { /* if (timeout >= 0) { we need a surf sleep action even when there is no timeout, otherwise surf won't tell us when the host fails */
546     sleep = surf_workstation_model->extension.workstation.sleep(simcall->issuer->smx_host->host, timeout);
547     surf_workstation_model->action_data_set(sleep, action);
548
549     if (simcall->issuer == action->comm.src_proc)
550       action->comm.src_timeout = sleep;
551     else
552       action->comm.dst_timeout = sleep;
553   }
554 }
555
556 void SIMIX_pre_comm_test(smx_simcall_t simcall)
557 {
558   smx_action_t action = simcall->comm_test.comm;
559
560   if(MC_IS_ENABLED){
561     simcall->comm_test.result = action->comm.src_proc && action->comm.dst_proc;
562     if(simcall->comm_test.result){
563       action->state = SIMIX_DONE;
564       xbt_fifo_push(action->simcalls, simcall);
565       SIMIX_comm_finish(action);
566     }else{
567       SIMIX_simcall_answer(simcall);
568     }
569     return;
570   }
571
572   simcall->comm_test.result = (action->state != SIMIX_WAITING && action->state != SIMIX_RUNNING);
573   if (simcall->comm_test.result) {
574     xbt_fifo_push(action->simcalls, simcall);
575     SIMIX_comm_finish(action);
576   } else {
577     SIMIX_simcall_answer(simcall);
578   }
579 }
580
581 void SIMIX_pre_comm_testany(smx_simcall_t simcall, int idx)
582 {
583   unsigned int cursor;
584   smx_action_t action;
585   xbt_dynar_t actions = simcall->comm_testany.comms;
586   simcall->comm_testany.result = -1;
587
588   if (MC_IS_ENABLED){
589     if(idx == -1){
590       SIMIX_simcall_answer(simcall);
591     }else{
592       action = xbt_dynar_get_as(actions, idx, smx_action_t);
593       simcall->comm_testany.result = idx;
594       xbt_fifo_push(action->simcalls, simcall);
595       action->state = SIMIX_DONE;
596       SIMIX_comm_finish(action);
597     }
598     return;
599   }
600
601   xbt_dynar_foreach(simcall->comm_testany.comms,cursor,action) {
602     if (action->state != SIMIX_WAITING && action->state != SIMIX_RUNNING) {
603       simcall->comm_testany.result = cursor;
604       xbt_fifo_push(action->simcalls, simcall);
605       SIMIX_comm_finish(action);
606       return;
607     }
608   }
609   SIMIX_simcall_answer(simcall);
610 }
611
612 void SIMIX_pre_comm_waitany(smx_simcall_t simcall, int idx)
613 {
614   smx_action_t action;
615   unsigned int cursor = 0;
616   xbt_dynar_t actions = simcall->comm_waitany.comms;
617
618   if (MC_IS_ENABLED){
619     action = xbt_dynar_get_as(actions, idx, smx_action_t);
620     xbt_fifo_push(action->simcalls, simcall);
621     simcall->comm_waitany.result = idx;
622     action->state = SIMIX_DONE;
623     SIMIX_comm_finish(action);
624     return;
625   }
626
627   xbt_dynar_foreach(actions, cursor, action){
628     /* associate this simcall to the the action */
629     xbt_fifo_push(action->simcalls, simcall);
630
631     /* see if the action is already finished */
632     if (action->state != SIMIX_WAITING && action->state != SIMIX_RUNNING){
633       SIMIX_comm_finish(action);
634       break;
635     }
636   }
637 }
638
639 void SIMIX_waitany_remove_simcall_from_actions(smx_simcall_t simcall)
640 {
641   smx_action_t action;
642   unsigned int cursor = 0;
643   xbt_dynar_t actions = simcall->comm_waitany.comms;
644
645   xbt_dynar_foreach(actions, cursor, action) {
646     xbt_fifo_remove(action->simcalls, simcall);
647   }
648 }
649
650 /**
651  *  \brief Starts the simulation of a communication action.
652  *  \param action the communication action
653  */
654 XBT_INLINE void SIMIX_comm_start(smx_action_t action)
655 {
656   /* If both the sender and the receiver are already there, start the communication */
657   if (action->state == SIMIX_READY) {
658
659     smx_host_t sender = action->comm.src_proc->smx_host;
660     smx_host_t receiver = action->comm.dst_proc->smx_host;
661
662     XBT_DEBUG("Starting communication %p from '%s' to '%s'", action,
663               SIMIX_host_get_name(sender), SIMIX_host_get_name(receiver));
664
665     action->comm.surf_comm = surf_workstation_model->extension.workstation.
666       communicate(sender->host, receiver->host, action->comm.task_size, action->comm.rate);
667
668     surf_workstation_model->action_data_set(action->comm.surf_comm, action);
669
670     action->state = SIMIX_RUNNING;
671
672     /* If a link is failed, detect it immediately */
673     if (surf_workstation_model->action_state_get(action->comm.surf_comm) == SURF_ACTION_FAILED) {
674       XBT_DEBUG("Communication from '%s' to '%s' failed to start because of a link failure",
675                 SIMIX_host_get_name(sender), SIMIX_host_get_name(receiver));
676       action->state = SIMIX_LINK_FAILURE;
677       SIMIX_comm_destroy_internal_actions(action);
678     }
679
680     /* If any of the process is suspend, create the action but stop its execution,
681        it will be restarted when the sender process resume */
682     if (SIMIX_process_is_suspended(action->comm.src_proc) ||
683         SIMIX_process_is_suspended(action->comm.dst_proc)) {
684       /* FIXME: check what should happen with the action state */
685
686       if (SIMIX_process_is_suspended(action->comm.src_proc))
687         XBT_DEBUG("The communication is suspended on startup because src (%s:%s) were suspended since it initiated the communication",
688                   SIMIX_host_get_name(action->comm.src_proc->smx_host), action->comm.src_proc->name);
689       else
690         XBT_DEBUG("The communication is suspended on startup because dst (%s:%s) were suspended since it initiated the communication",
691                   SIMIX_host_get_name(action->comm.dst_proc->smx_host), action->comm.dst_proc->name);
692
693       surf_workstation_model->suspend(action->comm.surf_comm);
694
695     }
696   }
697 }
698
699 /**
700  * \brief Answers the SIMIX simcalls associated to a communication action.
701  * \param action a finished communication action
702  */
703 void SIMIX_comm_finish(smx_action_t action)
704 {
705   unsigned int destroy_count = 0;
706   smx_simcall_t simcall;
707
708   while ((simcall = xbt_fifo_shift(action->simcalls))) {
709
710     /* If a waitany simcall is waiting for this action to finish, then remove
711        it from the other actions in the waitany list. Afterwards, get the
712        position of the actual action in the waitany dynar and
713        return it as the result of the simcall */
714     if (simcall->call == SIMCALL_COMM_WAITANY) {
715       SIMIX_waitany_remove_simcall_from_actions(simcall);
716       if (!MC_IS_ENABLED)
717         simcall->comm_waitany.result = xbt_dynar_search(simcall->comm_waitany.comms, &action);
718     }
719
720     /* If the action is still in a rendez-vous point then remove from it */
721     if (action->comm.rdv)
722       SIMIX_rdv_remove(action->comm.rdv, action);
723
724     XBT_DEBUG("SIMIX_comm_finish: action state = %d", (int)action->state);
725
726     /* Check out for errors */
727     switch (action->state) {
728
729     case SIMIX_DONE:
730       XBT_DEBUG("Communication %p complete!", action);
731       SIMIX_comm_copy_data(action);
732       break;
733
734     case SIMIX_SRC_TIMEOUT:
735       SMX_EXCEPTION(simcall->issuer, timeout_error, 0,
736                     "Communication timeouted because of sender");
737       break;
738
739     case SIMIX_DST_TIMEOUT:
740       SMX_EXCEPTION(simcall->issuer, timeout_error, 0,
741                     "Communication timeouted because of receiver");
742       break;
743
744     case SIMIX_SRC_HOST_FAILURE:
745       if (simcall->issuer == action->comm.src_proc)
746         simcall->issuer->context->iwannadie = 1;
747 //          SMX_EXCEPTION(simcall->issuer, host_error, 0, "Host failed");
748       else
749         SMX_EXCEPTION(simcall->issuer, network_error, 0, "Remote peer failed");
750       break;
751
752     case SIMIX_DST_HOST_FAILURE:
753       if (simcall->issuer == action->comm.dst_proc)
754         simcall->issuer->context->iwannadie = 1;
755 //          SMX_EXCEPTION(simcall->issuer, host_error, 0, "Host failed");
756       else
757         SMX_EXCEPTION(simcall->issuer, network_error, 0, "Remote peer failed");
758       break;
759
760     case SIMIX_LINK_FAILURE:
761       XBT_DEBUG("Link failure in action %p between '%s' and '%s': posting an exception to the issuer: %s (%p) detached:%d",
762                 action,
763                 action->comm.src_proc ? action->comm.src_proc->smx_host->name : NULL,
764                 action->comm.dst_proc ? action->comm.dst_proc->smx_host->name : NULL,
765                 simcall->issuer->name, simcall->issuer, action->comm.detached);
766       if (action->comm.src_proc == simcall->issuer) {
767         XBT_DEBUG("I'm source");
768       } else if (action->comm.dst_proc == simcall->issuer) {
769         XBT_DEBUG("I'm dest");
770       } else {
771         XBT_DEBUG("I'm neither source nor dest");
772       }
773       SMX_EXCEPTION(simcall->issuer, network_error, 0, "Link failure");
774       break;
775
776     case SIMIX_CANCELED:
777       if (simcall->issuer == action->comm.dst_proc)
778         SMX_EXCEPTION(simcall->issuer, cancel_error, 0,
779                       "Communication canceled by the sender");
780       else
781         SMX_EXCEPTION(simcall->issuer, cancel_error, 0,
782                       "Communication canceled by the receiver");
783       break;
784
785     default:
786       xbt_die("Unexpected action state in SIMIX_comm_finish: %d", (int)action->state);
787     }
788
789     /* if there is an exception during a waitany or a testany, indicate the position of the failed communication */
790     if (simcall->issuer->doexception) {
791       if (simcall->call == SIMCALL_COMM_WAITANY) {
792         simcall->issuer->running_ctx->exception.value = xbt_dynar_search(simcall->comm_waitany.comms, &action);
793       }
794       else if (simcall->call == SIMCALL_COMM_TESTANY) {
795         simcall->issuer->running_ctx->exception.value = xbt_dynar_search(simcall->comm_testany.comms, &action);
796       }
797     }
798
799     if (surf_workstation_model->extension.
800         workstation.get_state(simcall->issuer->smx_host->host) != SURF_RESOURCE_ON) {
801       simcall->issuer->context->iwannadie = 1;
802     }
803
804     simcall->issuer->waiting_action = NULL;
805     xbt_fifo_remove(simcall->issuer->comms, action);
806     SIMIX_simcall_answer(simcall);
807     destroy_count++;
808   }
809
810   while (destroy_count-- > 0)
811     SIMIX_comm_destroy(action);
812 }
813
814 /**
815  * \brief This function is called when a Surf communication action is finished.
816  * \param action the corresponding Simix communication
817  */
818 void SIMIX_post_comm(smx_action_t action)
819 {
820   /* Update action state */
821   if (action->comm.src_timeout &&
822       surf_workstation_model->action_state_get(action->comm.src_timeout) == SURF_ACTION_DONE)
823     action->state = SIMIX_SRC_TIMEOUT;
824   else if (action->comm.dst_timeout &&
825            surf_workstation_model->action_state_get(action->comm.dst_timeout) == SURF_ACTION_DONE)
826     action->state = SIMIX_DST_TIMEOUT;
827   else if (action->comm.src_timeout &&
828            surf_workstation_model->action_state_get(action->comm.src_timeout) == SURF_ACTION_FAILED)
829     action->state = SIMIX_SRC_HOST_FAILURE;
830   else if (action->comm.dst_timeout &&
831            surf_workstation_model->action_state_get(action->comm.dst_timeout) == SURF_ACTION_FAILED)
832     action->state = SIMIX_DST_HOST_FAILURE;
833   else if (action->comm.surf_comm &&
834            surf_workstation_model->action_state_get(action->comm.surf_comm) == SURF_ACTION_FAILED) {
835     XBT_DEBUG("Puta madre. Surf says that the link broke");
836     action->state = SIMIX_LINK_FAILURE;
837   } else
838     action->state = SIMIX_DONE;
839
840   XBT_DEBUG("SIMIX_post_comm: comm %p, state %d, src_proc %p, dst_proc %p, detached: %d",
841             action, (int)action->state, action->comm.src_proc, action->comm.dst_proc, action->comm.detached);
842
843   /* destroy the surf actions associated with the Simix communication */
844   SIMIX_comm_destroy_internal_actions(action);
845
846   /* remove the communication action from the list of pending communications
847    * of both processes (if they still exist) */
848   if (action->comm.src_proc) {
849     xbt_fifo_remove(action->comm.src_proc->comms, action);
850   }
851   if (action->comm.dst_proc) {
852     xbt_fifo_remove(action->comm.dst_proc->comms, action);
853   }
854
855   /* if there are simcalls associated with the action, then answer them */
856   if (xbt_fifo_size(action->simcalls)) {
857     SIMIX_comm_finish(action);
858   }
859 }
860
861 void SIMIX_comm_cancel(smx_action_t action)
862 {
863   /* if the action is a waiting state means that it is still in a rdv */
864   /* so remove from it and delete it */
865   if (action->state == SIMIX_WAITING) {
866     SIMIX_rdv_remove(action->comm.rdv, action);
867     action->state = SIMIX_CANCELED;
868   }
869   else if (!MC_IS_ENABLED /* when running the MC there are no surf actions */
870            && (action->state == SIMIX_READY || action->state == SIMIX_RUNNING)) {
871
872     surf_workstation_model->action_cancel(action->comm.surf_comm);
873   }
874 }
875
876 void SIMIX_comm_suspend(smx_action_t action)
877 {
878   /*FIXME: shall we suspend also the timeout actions? */
879   if (action->comm.surf_comm)
880     surf_workstation_model->suspend(action->comm.surf_comm);
881   /* in the other case, the action will be suspended on creation, in SIMIX_comm_start() */
882 }
883
884 void SIMIX_comm_resume(smx_action_t action)
885 {
886   /*FIXME: check what happen with the timeouts */
887   if (action->comm.surf_comm)
888     surf_workstation_model->resume(action->comm.surf_comm);
889   /* in the other case, the action were not really suspended yet, see SIMIX_comm_suspend() and SIMIX_comm_start() */
890 }
891
892
893 /************* Action Getters **************/
894
895 /**
896  *  \brief get the amount remaining from the communication
897  *  \param action The communication
898  */
899 double SIMIX_comm_get_remains(smx_action_t action)
900 {
901   double remains;
902
903   if(!action){
904     return 0;
905   }
906
907   switch (action->state) {
908
909   case SIMIX_RUNNING:
910     remains = surf_workstation_model->get_remains(action->comm.surf_comm);
911     break;
912
913   case SIMIX_WAITING:
914   case SIMIX_READY:
915     remains = 0; /*FIXME: check what should be returned */
916     break;
917
918   default:
919     remains = 0; /*FIXME: is this correct? */
920     break;
921   }
922   return remains;
923 }
924
925 e_smx_state_t SIMIX_comm_get_state(smx_action_t action)
926 {
927   return action->state;
928 }
929
930 /**
931  *  \brief Return the user data associated to the sender of the communication
932  *  \param action The communication
933  *  \return the user data
934  */
935 void* SIMIX_comm_get_src_data(smx_action_t action)
936 {
937   return action->comm.src_data;
938 }
939
940 /**
941  *  \brief Return the user data associated to the receiver of the communication
942  *  \param action The communication
943  *  \return the user data
944  */
945 void* SIMIX_comm_get_dst_data(smx_action_t action)
946 {
947   return action->comm.dst_data;
948 }
949
950 smx_process_t SIMIX_comm_get_src_proc(smx_action_t action)
951 {
952   return action->comm.src_proc;
953 }
954
955 smx_process_t SIMIX_comm_get_dst_proc(smx_action_t action)
956 {
957   return action->comm.dst_proc;
958 }
959
#ifdef HAVE_LATENCY_BOUND_TRACKING
/**
 *  \brief Tell whether a communication is latency bounded
 *
 *  When the communication has a Surf action, its latency_limited flag is
 *  first refreshed from Surf; otherwise the value already stored is returned.
 *
 *  \param action The communication (may be NULL)
 *  \return 0 for a NULL communication, the latency_limited flag otherwise
 */
XBT_INLINE int SIMIX_comm_is_latency_bounded(smx_action_t action)
{
  if (!action)
    return 0;

  /* no Surf action to ask: rely on the value stored in the communication */
  if (!action->comm.surf_comm)
    return action->latency_limited;

  XBT_DEBUG("Getting latency limited for surf_action (%p)", action->comm.surf_comm);
  action->latency_limited = surf_workstation_model->get_latency_limited(action->comm.surf_comm);
  XBT_DEBUG("Action limited is %d", action->latency_limited);

  return action->latency_limited;
}
#endif
978
979 /******************************************************************************/
980 /*                    SIMIX_comm_copy_data callbacks                       */
981 /******************************************************************************/
982 static void (*SIMIX_comm_copy_data_callback) (smx_action_t, void*, size_t) =
983   &SIMIX_comm_copy_pointer_callback;
984
985 void
986 SIMIX_comm_set_copy_data_callback(void (*callback) (smx_action_t, void*, size_t))
987 {
988   SIMIX_comm_copy_data_callback = callback;
989 }
990
991 void SIMIX_comm_copy_pointer_callback(smx_action_t comm, void* buff, size_t buff_size)
992 {
993   xbt_assert((buff_size == sizeof(void *)),
994              "Cannot copy %zu bytes: must be sizeof(void*)", buff_size);
995   *(void **) (comm->comm.dst_buff) = buff;
996 }
997
998 void SIMIX_comm_copy_buffer_callback(smx_action_t comm, void* buff, size_t buff_size)
999 {
1000   XBT_DEBUG("Copy the data over");
1001   memcpy(comm->comm.dst_buff, buff, buff_size);
1002   if (comm->comm.detached) { // if this is a detached send, the source buffer was duplicated by SMPI sender to make the original buffer available to the application ASAP
1003     xbt_free(buff);
1004     comm->comm.src_buff = NULL;
1005   }
1006 }
1007
1008
1009 /**
1010  *  \brief Copy the communication data from the sender's buffer to the receiver's one
1011  *  \param comm The communication
1012  */
1013 void SIMIX_comm_copy_data(smx_action_t comm)
1014 {
1015   size_t buff_size = comm->comm.src_buff_size;
1016   /* If there is no data to be copy then return */
1017   if (!comm->comm.src_buff || !comm->comm.dst_buff || comm->comm.copied)
1018     return;
1019
1020   XBT_DEBUG("Copying comm %p data from %s (%p) -> %s (%p) (%zu bytes)",
1021             comm,
1022             comm->comm.src_proc ? comm->comm.src_proc->smx_host->name : "a finished process",
1023             comm->comm.src_buff,
1024             comm->comm.dst_proc ? comm->comm.dst_proc->smx_host->name : "a finished process",
1025             comm->comm.dst_buff, buff_size);
1026
1027   /* Copy at most dst_buff_size bytes of the message to receiver's buffer */
1028   if (comm->comm.dst_buff_size)
1029     buff_size = MIN(buff_size, *(comm->comm.dst_buff_size));
1030
1031   /* Update the receiver's buffer size to the copied amount */
1032   if (comm->comm.dst_buff_size)
1033     *comm->comm.dst_buff_size = buff_size;
1034
1035   if (buff_size > 0)
1036     SIMIX_comm_copy_data_callback (comm, comm->comm.src_buff, buff_size);
1037
1038   /* Set the copied flag so we copy data only once */
1039   /* (this function might be called from both communication ends) */
1040   comm->comm.copied = 1;
1041 }