Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
I thought I had done this one a long time ago. Deactivate sender_gap aggregation.
[simgrid.git] / src / simix / smx_network.c
1 /* Copyright (c) 2009, 2010. The SimGrid Team.
2  * All rights reserved.                                                     */
3
4 /* This program is free software; you can redistribute it and/or modify it
5  * under the terms of the license (GNU LGPL) which comes with this package. */
6
7 #include "smx_private.h"
8 #include "xbt/log.h"
9 #include "mc/mc.h"
10 #include "xbt/dict.h"
11
12 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(simix_network, simix,
13                                 "Logging specific to SIMIX (network)");
14
15 static xbt_dict_t rdv_points = NULL;
16 unsigned long int smx_total_comms = 0;
17
18 static void SIMIX_waitany_remove_simcall_from_actions(smx_simcall_t simcall);
19 static void SIMIX_comm_copy_data(smx_action_t comm);
20 static smx_action_t SIMIX_comm_new(e_smx_comm_type_t type);
21 static XBT_INLINE void SIMIX_rdv_push(smx_rdv_t rdv, smx_action_t comm);
22 static smx_action_t SIMIX_fifo_get_comm(xbt_fifo_t fifo, e_smx_comm_type_t type,
23                                         int (*match_fun)(void *, void *,smx_action_t),
24                                         void *user_data, smx_action_t my_action);
25 static void SIMIX_rdv_free(void *data);
26
27 void SIMIX_network_init(void)
28 {
29   rdv_points = xbt_dict_new_homogeneous(SIMIX_rdv_free);
30 }
31
32 void SIMIX_network_exit(void)
33 {
34   xbt_dict_free(&rdv_points);
35 }
36
37 /******************************************************************************/
38 /*                           Rendez-Vous Points                               */
39 /******************************************************************************/
40
41 smx_rdv_t SIMIX_rdv_create(const char *name)
42 {
43   /* two processes may have pushed the same rdv_create simcall at the same time */
44   smx_rdv_t rdv = name ? xbt_dict_get_or_null(rdv_points, name) : NULL;
45
46   if (!rdv) {
47     rdv = xbt_new0(s_smx_rvpoint_t, 1);
48     rdv->name = name ? xbt_strdup(name) : NULL;
49     rdv->comm_fifo = xbt_fifo_new();
50     rdv->done_comm_fifo = xbt_fifo_new();
51     rdv->permanent_receiver=NULL;
52
53     XBT_DEBUG("Creating a mailbox at %p with name %s\n", rdv, name);
54
55     if (rdv->name)
56       xbt_dict_set(rdv_points, rdv->name, rdv, NULL);
57   }
58   return rdv;
59 }
60
61 void SIMIX_rdv_destroy(smx_rdv_t rdv)
62 {
63   if (rdv->name)
64     xbt_dict_remove(rdv_points, rdv->name);
65 }
66
67 void SIMIX_rdv_free(void *data)
68 {
69   XBT_DEBUG("rdv free %p", data);
70   smx_rdv_t rdv = (smx_rdv_t) data;
71   xbt_free(rdv->name);
72   xbt_fifo_free(rdv->comm_fifo);
73   xbt_fifo_free(rdv->done_comm_fifo);
74
75   xbt_free(rdv);  
76 }
77
78 xbt_dict_t SIMIX_get_rdv_points()
79 {
80   return rdv_points;
81 }
82
83 smx_rdv_t SIMIX_rdv_get_by_name(const char *name)
84 {
85   return xbt_dict_get_or_null(rdv_points, name);
86 }
87
88 int SIMIX_rdv_comm_count_by_host(smx_rdv_t rdv, smx_host_t host)
89 {
90   smx_action_t comm = NULL;
91   xbt_fifo_item_t item = NULL;
92   int count = 0;
93
94   xbt_fifo_foreach(rdv->comm_fifo, item, comm, smx_action_t) {
95     if (comm->comm.src_proc->smx_host == host)
96       count++;
97   }
98
99   return count;
100 }
101
102 smx_action_t SIMIX_rdv_get_head(smx_rdv_t rdv)
103 {
104   return xbt_fifo_get_item_content(xbt_fifo_get_first_item(rdv->comm_fifo));
105 }
106
107 /**
108  *  \brief get the receiver (process associated to the mailbox)
109  *  \param rdv The rendez-vous point
110  *  \return process The receiving process (NULL if not set)
111  */
112 smx_process_t SIMIX_rdv_get_receiver(smx_rdv_t rdv)
113 {
114   return rdv->permanent_receiver;
115 }
116
117 /**
118  *  \brief set the receiver of the rendez vous point to allow eager sends
119  *  \param rdv The rendez-vous point
120  *  \param process The receiving process
121  */
122 void SIMIX_rdv_set_receiver(smx_rdv_t rdv , smx_process_t process)
123 {
124   rdv->permanent_receiver=process;
125 }
126
127 /**
128  *  \brief Pushes a communication action into a rendez-vous point
129  *  \param rdv The rendez-vous point
130  *  \param comm The communication action
131  */
132 static XBT_INLINE void SIMIX_rdv_push(smx_rdv_t rdv, smx_action_t comm)
133 {
134   xbt_fifo_push(rdv->comm_fifo, comm);
135   comm->comm.rdv = rdv;
136 }
137
138 /**
139  *  \brief Removes a communication action from a rendez-vous point
140  *  \param rdv The rendez-vous point
141  *  \param comm The communication action
142  */
143 XBT_INLINE void SIMIX_rdv_remove(smx_rdv_t rdv, smx_action_t comm)
144 {
145   xbt_fifo_remove(rdv->comm_fifo, comm);
146   comm->comm.rdv = NULL;
147 }
148
149 /**
150  *  \brief Checks if there is a communication action queued in a fifo matching our needs
151  *  \param type The type of communication we are looking for (comm_send, comm_recv)
152  *  \return The communication action if found, NULL otherwise
153  */
154 smx_action_t SIMIX_fifo_get_comm(xbt_fifo_t fifo, e_smx_comm_type_t type,
155                                  int (*match_fun)(void *, void *,smx_action_t),
156                                  void *this_user_data, smx_action_t my_action)
157 {
158   smx_action_t action;
159   xbt_fifo_item_t item;
160   void* other_user_data = NULL;
161
162   xbt_fifo_foreach(fifo, item, action, smx_action_t) {
163     if (action->comm.type == SIMIX_COMM_SEND) {
164       other_user_data = action->comm.src_data;
165     } else if (action->comm.type == SIMIX_COMM_RECEIVE) {
166       other_user_data = action->comm.dst_data;
167     }
168     if (action->comm.type == type &&
169         (!match_fun              ||              match_fun(this_user_data,  other_user_data, action)) &&
170         (!action->comm.match_fun || action->comm.match_fun(other_user_data, this_user_data,  my_action))) {
171       XBT_DEBUG("Found a matching communication action %p", action);
172       xbt_fifo_remove_item(fifo, item);
173       xbt_fifo_free_item(item);
174       action->comm.refcount++;
175       action->comm.rdv = NULL;
176       return action;
177     }
178     XBT_DEBUG("Sorry, communication action %p does not match our needs:"
179               " its type is %d but we are looking for a comm of type %d (or maybe the filtering didn't match)",
180               action, (int)action->comm.type, (int)type);
181   }
182   XBT_DEBUG("No matching communication action found");
183   return NULL;
184 }
185
186
187 /**
188  *  \brief Checks if there is a communication action queued in a fifo matching our needs, but leave it there
189  *  \param type The type of communication we are looking for (comm_send, comm_recv)
190  *  \return The communication action if found, NULL otherwise
191  */
192 smx_action_t SIMIX_fifo_probe_comm(xbt_fifo_t fifo, e_smx_comm_type_t type,
193                                  int (*match_fun)(void *, void *,smx_action_t),
194                                  void *this_user_data, smx_action_t my_action)
195 {
196   smx_action_t action;
197   xbt_fifo_item_t item;
198   void* other_user_data = NULL;
199
200   xbt_fifo_foreach(fifo, item, action, smx_action_t) {
201     if (action->comm.type == SIMIX_COMM_SEND) {
202       other_user_data = action->comm.src_data;
203     } else if (action->comm.type == SIMIX_COMM_RECEIVE) {
204       other_user_data = action->comm.dst_data;
205     }
206     if (action->comm.type == type &&
207         (!match_fun              ||              match_fun(this_user_data,  other_user_data, action)) &&
208         (!action->comm.match_fun || action->comm.match_fun(other_user_data, this_user_data,  my_action))) {
209       XBT_DEBUG("Found a matching communication action %p", action);
210       action->comm.refcount++;
211
212       return action;
213     }
214     XBT_DEBUG("Sorry, communication action %p does not match our needs:"
215               " its type is %d but we are looking for a comm of type %d (or maybe the filtering didn't match)",
216               action, (int)action->comm.type, (int)type);
217   }
218   XBT_DEBUG("No matching communication action found");
219   return NULL;
220 }
221 /******************************************************************************/
222 /*                            Communication Actions                            */
223 /******************************************************************************/
224
225 /**
226  *  \brief Creates a new communicate action
227  *  \param type The direction of communication (comm_send, comm_recv)
228  *  \return The new communicate action
229  */
230 smx_action_t SIMIX_comm_new(e_smx_comm_type_t type)
231 {
232   smx_action_t act;
233
234   /* alloc structures */
235   act = xbt_mallocator_get(simix_global->action_mallocator);
236
237   act->type = SIMIX_ACTION_COMMUNICATE;
238   act->state = SIMIX_WAITING;
239
240   /* set communication */
241   act->comm.type = type;
242   act->comm.refcount = 1;
243
244 #ifdef HAVE_LATENCY_BOUND_TRACKING
245   //initialize with unknown value
246   act->latency_limited = -1;
247 #endif
248
249 #ifdef HAVE_TRACING
250   act->category = NULL;
251 #endif
252
253   XBT_DEBUG("Create communicate action %p", act);
254   ++smx_total_comms;
255
256   return act;
257 }
258
259 /**
260  *  \brief Destroy a communicate action
261  *  \param action The communicate action to be destroyed
262  */
263 void SIMIX_comm_destroy(smx_action_t action)
264 {
265   XBT_DEBUG("Destroy action %p (refcount: %d), state: %d",
266             action, action->comm.refcount, (int)action->state);
267
268   if (action->comm.refcount <= 0) {
269     xbt_backtrace_display_current();
270     xbt_die("The refcount of comm %p is already 0 before decreasing it. "
271             "That's a bug! If you didn't test and/or wait the same communication twice in your code, then the bug is SimGrid's...", action);
272   }
273   action->comm.refcount--;
274   if (action->comm.refcount > 0)
275     return;
276   XBT_DEBUG("Really free communication %p; refcount is now %d", action,
277             action->comm.refcount);
278
279 #ifdef HAVE_LATENCY_BOUND_TRACKING
280   action->latency_limited = SIMIX_comm_is_latency_bounded( action ) ;
281 #endif
282
283   xbt_free(action->name);
284   SIMIX_comm_destroy_internal_actions(action);
285
286   if (action->comm.detached && action->state != SIMIX_DONE) {
287     /* the communication has failed and was detached:
288      * we have to free the buffer */
289     if (action->comm.clean_fun) {
290       action->comm.clean_fun(action->comm.src_buff);
291     }
292     action->comm.src_buff = NULL;
293   }
294
295   xbt_mallocator_release(simix_global->action_mallocator, action);
296 }
297
298 void SIMIX_comm_destroy_internal_actions(smx_action_t action)
299 {
300   if (action->comm.surf_comm){
301 #ifdef HAVE_LATENCY_BOUND_TRACKING
302     action->latency_limited = SIMIX_comm_is_latency_bounded(action);
303 #endif
304     action->comm.surf_comm->model_type->action_unref(action->comm.surf_comm);
305     action->comm.surf_comm = NULL;
306   }
307
308   if (action->comm.src_timeout){
309     action->comm.src_timeout->model_type->action_unref(action->comm.src_timeout);
310     action->comm.src_timeout = NULL;
311   }
312
313   if (action->comm.dst_timeout){
314     action->comm.dst_timeout->model_type->action_unref(action->comm.dst_timeout);
315     action->comm.dst_timeout = NULL;
316   }
317 }
318
319 smx_action_t SIMIX_comm_isend(smx_process_t src_proc, smx_rdv_t rdv,
320                               double task_size, double rate,
321                               void *src_buff, size_t src_buff_size,
322                               int (*match_fun)(void *, void *,smx_action_t),
323                               void (*clean_fun)(void *), // used to free the action in case of problem after a detached send
324                               void *data,
325                               int detached)
326 {
327   XBT_DEBUG("send from %p\n", rdv);
328
329   /* Prepare an action describing us, so that it gets passed to the user-provided filter of other side */
330   smx_action_t this_action = SIMIX_comm_new(SIMIX_COMM_SEND);
331
332   /* Look for communication action matching our needs. We also provide a description of
333    * ourself so that the other side also gets a chance of choosing if it wants to match with us.
334    *
335    * If it is not found then push our communication into the rendez-vous point */
336   smx_action_t other_action = SIMIX_fifo_get_comm(rdv->comm_fifo, SIMIX_COMM_RECEIVE, match_fun, data, this_action);
337
338   if (!other_action) {
339     other_action = this_action;
340
341     if (rdv->permanent_receiver!=NULL){
342       //this mailbox is for small messages, which have to be sent right now
343       other_action->state = SIMIX_READY;
344       other_action->comm.dst_proc=rdv->permanent_receiver;
345       other_action->comm.refcount++;
346       other_action->comm.rdv = rdv;
347       xbt_fifo_push(rdv->done_comm_fifo,other_action);
348       other_action->comm.rdv=rdv;
349       XBT_DEBUG("pushing a message into the permanent receive fifo %p, comm %p \n", rdv, &(other_action->comm));
350
351     }else{
352       SIMIX_rdv_push(rdv, this_action);
353     }
354   } else {
355     XBT_DEBUG("Receive already pushed\n");
356
357     SIMIX_comm_destroy(this_action);
358     --smx_total_comms; // this creation was a pure waste
359
360     other_action->state = SIMIX_READY;
361     other_action->comm.type = SIMIX_COMM_READY;
362
363   }
364   xbt_fifo_push(src_proc->comms, other_action);
365
366   /* if the communication action is detached then decrease the refcount
367    * by one, so it will be eliminated by the receiver's destroy call */
368   if (detached) {
369     other_action->comm.detached = 1;
370     other_action->comm.refcount--;
371     other_action->comm.clean_fun = clean_fun;
372   } else {
373     other_action->comm.clean_fun = NULL;
374   }
375
376   /* Setup the communication action */
377   other_action->comm.src_proc = src_proc;
378   other_action->comm.task_size = task_size;
379   other_action->comm.rate = rate;
380   other_action->comm.src_buff = src_buff;
381   other_action->comm.src_buff_size = src_buff_size;
382   other_action->comm.src_data = data;
383
384   other_action->comm.match_fun = match_fun;
385
386   if (MC_IS_ENABLED) {
387     other_action->state = SIMIX_RUNNING;
388     return other_action;
389   }
390
391   SIMIX_comm_start(other_action);
392   return (detached ? NULL : other_action);
393 }
394
395 smx_action_t SIMIX_comm_irecv(smx_process_t dst_proc, smx_rdv_t rdv,
396                               void *dst_buff, size_t *dst_buff_size,
397                               int (*match_fun)(void *, void *, smx_action_t), void *data)
398 {
399   XBT_DEBUG("recv from %p %p\n", rdv, rdv->comm_fifo);
400   smx_action_t this_action = SIMIX_comm_new(SIMIX_COMM_RECEIVE);
401
402   smx_action_t other_action;
403   //communication already done, get it inside the fifo of completed comms
404   //permanent receive v1
405   //int already_received=0;
406   if(rdv->permanent_receiver && xbt_fifo_size(rdv->done_comm_fifo)!=0){
407
408     XBT_DEBUG("We have a comm that has probably already been received, trying to match it, to skip the communication\n");
409     //find a match in the already received fifo
410     other_action = SIMIX_fifo_get_comm(rdv->done_comm_fifo, SIMIX_COMM_SEND, match_fun, data, this_action);
411     //if not found, assume the receiver came first, register it to the mailbox in the classical way
412     if (!other_action)  {
413       XBT_DEBUG("We have messages in the permanent receive list, but not the one we are looking for, pushing request into fifo\n");
414       other_action = this_action;
415       SIMIX_rdv_push(rdv, this_action);
416     }else{
417       if(other_action->comm.surf_comm &&        SIMIX_comm_get_remains(other_action)==0.0)
418       {
419         XBT_DEBUG("comm %p has been already sent, and is finished, destroy it\n",&(other_action->comm));
420         other_action->state = SIMIX_DONE;
421         other_action->comm.type = SIMIX_COMM_DONE;
422         other_action->comm.rdv = NULL;
423         //SIMIX_comm_destroy(this_action);
424         //--smx_total_comms; // this creation was a pure waste
425         //already_received=1;
426         //other_action->comm.refcount--;
427       }/*else{
428          XBT_DEBUG("Not yet finished, we have to wait %d\n", xbt_fifo_size(rdv->comm_fifo));
429          }*/
430      // other_action->comm.refcount--;
431       SIMIX_comm_destroy(this_action);
432       --smx_total_comms; // this creation was a pure waste
433     }
434   }else{
435     /* Prepare an action describing us, so that it gets passed to the user-provided filter of other side */
436
437     /* Look for communication action matching our needs. We also provide a description of
438      * ourself so that the other side also gets a chance of choosing if it wants to match with us.
439      *
440      * If it is not found then push our communication into the rendez-vous point */
441     other_action = SIMIX_fifo_get_comm(rdv->comm_fifo, SIMIX_COMM_SEND, match_fun, data, this_action);
442
443     if (!other_action) {
444       XBT_DEBUG("Receive pushed first %d\n", xbt_fifo_size(rdv->comm_fifo));
445       other_action = this_action;
446       SIMIX_rdv_push(rdv, this_action);
447     } else {
448       SIMIX_comm_destroy(this_action);
449       --smx_total_comms; // this creation was a pure waste
450       other_action->state = SIMIX_READY;
451       other_action->comm.type = SIMIX_COMM_READY;
452    //   other_action->comm.refcount--;
453     }
454     xbt_fifo_push(dst_proc->comms, other_action);
455   }
456
457   /* Setup communication action */
458   other_action->comm.dst_proc = dst_proc;
459   other_action->comm.dst_buff = dst_buff;
460   other_action->comm.dst_buff_size = dst_buff_size;
461   other_action->comm.dst_data = data;
462
463   other_action->comm.match_fun = match_fun;
464
465
466   /*if(already_received)//do the actual copy, because the first one after the comm didn't have all the info
467     SIMIX_comm_copy_data(other_action);*/
468
469
470   if (MC_IS_ENABLED) {
471     other_action->state = SIMIX_RUNNING;
472     return other_action;
473   }
474
475   SIMIX_comm_start(other_action);
476   // }
477   return other_action;
478 }
479
480
481 smx_action_t SIMIX_comm_iprobe(smx_process_t dst_proc, smx_rdv_t rdv, int src,
482                               int tag, int (*match_fun)(void *, void *, smx_action_t), void *data)
483 {
484   XBT_DEBUG("iprobe from %p %p\n", rdv, rdv->comm_fifo);
485   smx_action_t this_action = SIMIX_comm_new(SIMIX_COMM_RECEIVE);
486
487   smx_action_t other_action;
488   if(rdv->permanent_receiver && xbt_fifo_size(rdv->done_comm_fifo)!=0){
489     //find a match in the already received fifo
490     other_action = SIMIX_fifo_probe_comm(rdv->done_comm_fifo, SIMIX_COMM_SEND, match_fun, data, this_action);
491   }else{
492     other_action = SIMIX_fifo_probe_comm(rdv->comm_fifo, SIMIX_COMM_SEND, match_fun, data, this_action);
493   }
494   if(other_action)other_action->comm.refcount--;
495
496   SIMIX_comm_destroy(this_action);
497   --smx_total_comms;
498   return other_action;
499 }
500
501 void SIMIX_pre_comm_wait(smx_simcall_t simcall, smx_action_t action, double timeout, int idx)
502 {
503
504   /* the simcall may be a wait, a send or a recv */
505   surf_action_t sleep;
506
507   /* Associate this simcall to the wait action */
508   XBT_DEBUG("SIMIX_pre_comm_wait, %p", action);
509
510   xbt_fifo_push(action->simcalls, simcall);
511   simcall->issuer->waiting_action = action;
512
513   if (MC_IS_ENABLED) {
514     if (idx == 0) {
515       action->state = SIMIX_DONE;
516     } else {
517       /* If we reached this point, the wait simcall must have a timeout */
518       /* Otherwise it shouldn't be enabled and executed by the MC */
519       if (timeout == -1)
520         THROW_IMPOSSIBLE;
521
522       if (action->comm.src_proc == simcall->issuer)
523         action->state = SIMIX_SRC_TIMEOUT;
524       else
525         action->state = SIMIX_DST_TIMEOUT;
526     }
527
528     SIMIX_comm_finish(action);
529     return;
530   }
531
532   /* If the action has already finish perform the error handling, */
533   /* otherwise set up a waiting timeout on the right side         */
534   if (action->state != SIMIX_WAITING && action->state != SIMIX_RUNNING) {
535     SIMIX_comm_finish(action);
536   } else { /* if (timeout >= 0) { we need a surf sleep action even when there is no timeout, otherwise surf won't tell us when the host fails */
537     sleep = surf_workstation_model->extension.workstation.sleep(simcall->issuer->smx_host->host, timeout);
538     surf_workstation_model->action_data_set(sleep, action);
539
540     if (simcall->issuer == action->comm.src_proc)
541       action->comm.src_timeout = sleep;
542     else
543       action->comm.dst_timeout = sleep;
544   }
545 }
546
547 void SIMIX_pre_comm_test(smx_simcall_t simcall)
548 {
549   smx_action_t action = simcall->comm_test.comm;
550
551   if(MC_IS_ENABLED){
552     simcall->comm_test.result = action->comm.src_proc && action->comm.dst_proc;
553     if(simcall->comm_test.result){
554       action->state = SIMIX_DONE;
555       xbt_fifo_push(action->simcalls, simcall);
556       SIMIX_comm_finish(action);
557     }else{
558       SIMIX_simcall_answer(simcall);
559     }
560     return;
561   }
562
563   simcall->comm_test.result = (action->state != SIMIX_WAITING && action->state != SIMIX_RUNNING);
564   if (simcall->comm_test.result) {
565     xbt_fifo_push(action->simcalls, simcall);
566     SIMIX_comm_finish(action);
567   } else {
568     SIMIX_simcall_answer(simcall);
569   }
570 }
571
572 void SIMIX_pre_comm_testany(smx_simcall_t simcall, int idx)
573 {
574   unsigned int cursor;
575   smx_action_t action;
576   xbt_dynar_t actions = simcall->comm_testany.comms;
577   simcall->comm_testany.result = -1;
578
579   if (MC_IS_ENABLED){
580     if(idx == -1){
581       SIMIX_simcall_answer(simcall);
582     }else{
583       action = xbt_dynar_get_as(actions, idx, smx_action_t);
584       simcall->comm_testany.result = idx;
585       xbt_fifo_push(action->simcalls, simcall);
586       action->state = SIMIX_DONE;
587       SIMIX_comm_finish(action);
588     }
589     return;
590   }
591
592   xbt_dynar_foreach(simcall->comm_testany.comms,cursor,action) {
593     if (action->state != SIMIX_WAITING && action->state != SIMIX_RUNNING) {
594       simcall->comm_testany.result = cursor;
595       xbt_fifo_push(action->simcalls, simcall);
596       SIMIX_comm_finish(action);
597       return;
598     }
599   }
600   SIMIX_simcall_answer(simcall);
601 }
602
603 void SIMIX_pre_comm_waitany(smx_simcall_t simcall, int idx)
604 {
605   smx_action_t action;
606   unsigned int cursor = 0;
607   xbt_dynar_t actions = simcall->comm_waitany.comms;
608
609   if (MC_IS_ENABLED){
610     action = xbt_dynar_get_as(actions, idx, smx_action_t);
611     xbt_fifo_push(action->simcalls, simcall);
612     simcall->comm_waitany.result = idx;
613     action->state = SIMIX_DONE;
614     SIMIX_comm_finish(action);
615     return;
616   }
617
618   xbt_dynar_foreach(actions, cursor, action){
619     /* associate this simcall to the the action */
620     xbt_fifo_push(action->simcalls, simcall);
621
622     /* see if the action is already finished */
623     if (action->state != SIMIX_WAITING && action->state != SIMIX_RUNNING){
624       SIMIX_comm_finish(action);
625       break;
626     }
627   }
628 }
629
630 void SIMIX_waitany_remove_simcall_from_actions(smx_simcall_t simcall)
631 {
632   smx_action_t action;
633   unsigned int cursor = 0;
634   xbt_dynar_t actions = simcall->comm_waitany.comms;
635
636   xbt_dynar_foreach(actions, cursor, action) {
637     xbt_fifo_remove(action->simcalls, simcall);
638   }
639 }
640
641 /**
642  *  \brief Starts the simulation of a communication action.
643  *  \param action the communication action
644  */
645 XBT_INLINE void SIMIX_comm_start(smx_action_t action)
646 {
647   /* If both the sender and the receiver are already there, start the communication */
648   if (action->state == SIMIX_READY) {
649
650     smx_host_t sender = action->comm.src_proc->smx_host;
651     smx_host_t receiver = action->comm.dst_proc->smx_host;
652
653     XBT_DEBUG("Starting communication %p from '%s' to '%s'", action,
654               SIMIX_host_get_name(sender), SIMIX_host_get_name(receiver));
655
656     action->comm.surf_comm = surf_workstation_model->extension.workstation.
657       communicate(sender->host, receiver->host, action->comm.task_size, action->comm.rate);
658
659     surf_workstation_model->action_data_set(action->comm.surf_comm, action);
660
661     action->state = SIMIX_RUNNING;
662
663     /* If a link is failed, detect it immediately */
664     if (surf_workstation_model->action_state_get(action->comm.surf_comm) == SURF_ACTION_FAILED) {
665       XBT_DEBUG("Communication from '%s' to '%s' failed to start because of a link failure",
666                 SIMIX_host_get_name(sender), SIMIX_host_get_name(receiver));
667       action->state = SIMIX_LINK_FAILURE;
668       SIMIX_comm_destroy_internal_actions(action);
669     }
670
671     /* If any of the process is suspend, create the action but stop its execution,
672        it will be restarted when the sender process resume */
673     if (SIMIX_process_is_suspended(action->comm.src_proc) ||
674         SIMIX_process_is_suspended(action->comm.dst_proc)) {
675       /* FIXME: check what should happen with the action state */
676
677       if (SIMIX_process_is_suspended(action->comm.src_proc))
678         XBT_DEBUG("The communication is suspended on startup because src (%s:%s) were suspended since it initiated the communication",
679                   SIMIX_host_get_name(action->comm.src_proc->smx_host), action->comm.src_proc->name);
680       else
681         XBT_DEBUG("The communication is suspended on startup because dst (%s:%s) were suspended since it initiated the communication",
682                   SIMIX_host_get_name(action->comm.dst_proc->smx_host), action->comm.dst_proc->name);
683
684       surf_workstation_model->suspend(action->comm.surf_comm);
685
686     }
687   }
688 }
689
690 /**
691  * \brief Answers the SIMIX simcalls associated to a communication action.
692  * \param action a finished communication action
693  */
694 void SIMIX_comm_finish(smx_action_t action)
695 {
696   unsigned int destroy_count = 0;
697   smx_simcall_t simcall;
698
699   while ((simcall = xbt_fifo_shift(action->simcalls))) {
700
701     /* If a waitany simcall is waiting for this action to finish, then remove
702        it from the other actions in the waitany list. Afterwards, get the
703        position of the actual action in the waitany dynar and
704        return it as the result of the simcall */
705     if (simcall->call == SIMCALL_COMM_WAITANY) {
706       SIMIX_waitany_remove_simcall_from_actions(simcall);
707       if (!MC_IS_ENABLED)
708         simcall->comm_waitany.result = xbt_dynar_search(simcall->comm_waitany.comms, &action);
709     }
710
711     /* If the action is still in a rendez-vous point then remove from it */
712     if (action->comm.rdv)
713       SIMIX_rdv_remove(action->comm.rdv, action);
714
715     XBT_DEBUG("SIMIX_comm_finish: action state = %d", (int)action->state);
716
717     /* Check out for errors */
718     switch (action->state) {
719
720     case SIMIX_DONE:
721       XBT_DEBUG("Communication %p complete!", action);
722       SIMIX_comm_copy_data(action);
723       break;
724
725     case SIMIX_SRC_TIMEOUT:
726       SMX_EXCEPTION(simcall->issuer, timeout_error, 0,
727                     "Communication timeouted because of sender");
728       break;
729
730     case SIMIX_DST_TIMEOUT:
731       SMX_EXCEPTION(simcall->issuer, timeout_error, 0,
732                     "Communication timeouted because of receiver");
733       break;
734
735     case SIMIX_SRC_HOST_FAILURE:
736       if (simcall->issuer == action->comm.src_proc)
737         simcall->issuer->context->iwannadie = 1;
738 //          SMX_EXCEPTION(simcall->issuer, host_error, 0, "Host failed");
739       else
740         SMX_EXCEPTION(simcall->issuer, network_error, 0, "Remote peer failed");
741       break;
742
743     case SIMIX_DST_HOST_FAILURE:
744       if (simcall->issuer == action->comm.dst_proc)
745         simcall->issuer->context->iwannadie = 1;
746 //          SMX_EXCEPTION(simcall->issuer, host_error, 0, "Host failed");
747       else
748         SMX_EXCEPTION(simcall->issuer, network_error, 0, "Remote peer failed");
749       break;
750
751     case SIMIX_LINK_FAILURE:
752       XBT_DEBUG("Link failure in action %p between '%s' and '%s': posting an exception to the issuer: %s (%p) detached:%d",
753                 action,
754                 action->comm.src_proc ? action->comm.src_proc->smx_host->name : NULL,
755                 action->comm.dst_proc ? action->comm.dst_proc->smx_host->name : NULL,
756                 simcall->issuer->name, simcall->issuer, action->comm.detached);
757       if (action->comm.src_proc == simcall->issuer) {
758         XBT_DEBUG("I'm source");
759       } else if (action->comm.dst_proc == simcall->issuer) {
760         XBT_DEBUG("I'm dest");
761       } else {
762         XBT_DEBUG("I'm neither source nor dest");
763       }
764       SMX_EXCEPTION(simcall->issuer, network_error, 0, "Link failure");
765       break;
766
767     case SIMIX_CANCELED:
768       if (simcall->issuer == action->comm.dst_proc)
769         SMX_EXCEPTION(simcall->issuer, cancel_error, 0,
770                       "Communication canceled by the sender");
771       else
772         SMX_EXCEPTION(simcall->issuer, cancel_error, 0,
773                       "Communication canceled by the receiver");
774       break;
775
776     default:
777       xbt_die("Unexpected action state in SIMIX_comm_finish: %d", (int)action->state);
778     }
779
780     /* if there is an exception during a waitany or a testany, indicate the position of the failed communication */
781     if (simcall->issuer->doexception) {
782       if (simcall->call == SIMCALL_COMM_WAITANY) {
783         simcall->issuer->running_ctx->exception.value = xbt_dynar_search(simcall->comm_waitany.comms, &action);
784       }
785       else if (simcall->call == SIMCALL_COMM_TESTANY) {
786         simcall->issuer->running_ctx->exception.value = xbt_dynar_search(simcall->comm_testany.comms, &action);
787       }
788     }
789
790     if (surf_workstation_model->extension.
791         workstation.get_state(simcall->issuer->smx_host->host) != SURF_RESOURCE_ON) {
792       simcall->issuer->context->iwannadie = 1;
793     }
794
795     simcall->issuer->waiting_action = NULL;
796     xbt_fifo_remove(simcall->issuer->comms, action);
797     SIMIX_simcall_answer(simcall);
798     destroy_count++;
799   }
800
801   while (destroy_count-- > 0)
802     SIMIX_comm_destroy(action);
803 }
804
805 /**
806  * \brief This function is called when a Surf communication action is finished.
807  * \param action the corresponding Simix communication
808  */
809 void SIMIX_post_comm(smx_action_t action)
810 {
811   /* Update action state */
812   if (action->comm.src_timeout &&
813       surf_workstation_model->action_state_get(action->comm.src_timeout) == SURF_ACTION_DONE)
814     action->state = SIMIX_SRC_TIMEOUT;
815   else if (action->comm.dst_timeout &&
816            surf_workstation_model->action_state_get(action->comm.dst_timeout) == SURF_ACTION_DONE)
817     action->state = SIMIX_DST_TIMEOUT;
818   else if (action->comm.src_timeout &&
819            surf_workstation_model->action_state_get(action->comm.src_timeout) == SURF_ACTION_FAILED)
820     action->state = SIMIX_SRC_HOST_FAILURE;
821   else if (action->comm.dst_timeout &&
822            surf_workstation_model->action_state_get(action->comm.dst_timeout) == SURF_ACTION_FAILED)
823     action->state = SIMIX_DST_HOST_FAILURE;
824   else if (action->comm.surf_comm &&
825            surf_workstation_model->action_state_get(action->comm.surf_comm) == SURF_ACTION_FAILED) {
826     XBT_DEBUG("Puta madre. Surf says that the link broke");
827     action->state = SIMIX_LINK_FAILURE;
828   } else
829     action->state = SIMIX_DONE;
830
831   XBT_DEBUG("SIMIX_post_comm: comm %p, state %d, src_proc %p, dst_proc %p, detached: %d",
832             action, (int)action->state, action->comm.src_proc, action->comm.dst_proc, action->comm.detached);
833
834   /* destroy the surf actions associated with the Simix communication */
835   SIMIX_comm_destroy_internal_actions(action);
836
837   /* remove the communication action from the list of pending communications
838    * of both processes (if they still exist) */
839   if (action->comm.src_proc) {
840     xbt_fifo_remove(action->comm.src_proc->comms, action);
841   }
842   if (action->comm.dst_proc) {
843     xbt_fifo_remove(action->comm.dst_proc->comms, action);
844   }
845
846   /* if there are simcalls associated with the action, then answer them */
847   if (xbt_fifo_size(action->simcalls)) {
848     SIMIX_comm_finish(action);
849   }
850 }
851
852 void SIMIX_comm_cancel(smx_action_t action)
853 {
854   /* if the action is a waiting state means that it is still in a rdv */
855   /* so remove from it and delete it */
856   if (action->state == SIMIX_WAITING) {
857     SIMIX_rdv_remove(action->comm.rdv, action);
858     action->state = SIMIX_CANCELED;
859   }
860   else if (!MC_IS_ENABLED /* when running the MC there are no surf actions */
861            && (action->state == SIMIX_READY || action->state == SIMIX_RUNNING)) {
862
863     surf_workstation_model->action_cancel(action->comm.surf_comm);
864   }
865 }
866
867 void SIMIX_comm_suspend(smx_action_t action)
868 {
869   /*FIXME: shall we suspend also the timeout actions? */
870   if (action->comm.surf_comm)
871     surf_workstation_model->suspend(action->comm.surf_comm);
872   /* in the other case, the action will be suspended on creation, in SIMIX_comm_start() */
873 }
874
875 void SIMIX_comm_resume(smx_action_t action)
876 {
877   /*FIXME: check what happen with the timeouts */
878   if (action->comm.surf_comm)
879     surf_workstation_model->resume(action->comm.surf_comm);
880   /* in the other case, the action were not really suspended yet, see SIMIX_comm_suspend() and SIMIX_comm_start() */
881 }
882
883
884 /************* Action Getters **************/
885
886 /**
887  *  \brief get the amount remaining from the communication
888  *  \param action The communication
889  */
890 double SIMIX_comm_get_remains(smx_action_t action)
891 {
892   double remains;
893
894   if(!action){
895     return 0;
896   }
897
898   switch (action->state) {
899
900   case SIMIX_RUNNING:
901     remains = surf_workstation_model->get_remains(action->comm.surf_comm);
902     break;
903
904   case SIMIX_WAITING:
905   case SIMIX_READY:
906     remains = 0; /*FIXME: check what should be returned */
907     break;
908
909   default:
910     remains = 0; /*FIXME: is this correct? */
911     break;
912   }
913   return remains;
914 }
915
916 e_smx_state_t SIMIX_comm_get_state(smx_action_t action)
917 {
918   return action->state;
919 }
920
921 /**
922  *  \brief Return the user data associated to the sender of the communication
923  *  \param action The communication
924  *  \return the user data
925  */
926 void* SIMIX_comm_get_src_data(smx_action_t action)
927 {
928   return action->comm.src_data;
929 }
930
931 /**
932  *  \brief Return the user data associated to the receiver of the communication
933  *  \param action The communication
934  *  \return the user data
935  */
936 void* SIMIX_comm_get_dst_data(smx_action_t action)
937 {
938   return action->comm.dst_data;
939 }
940
941 smx_process_t SIMIX_comm_get_src_proc(smx_action_t action)
942 {
943   return action->comm.src_proc;
944 }
945
946 smx_process_t SIMIX_comm_get_dst_proc(smx_action_t action)
947 {
948   return action->comm.dst_proc;
949 }
950
951 #ifdef HAVE_LATENCY_BOUND_TRACKING
952 /**
953  *  \brief verify if communication is latency bounded
954  *  \param comm The communication
955  */
956 XBT_INLINE int SIMIX_comm_is_latency_bounded(smx_action_t action)
957 {
958   if(!action){
959     return 0;
960   }
961   if (action->comm.surf_comm){
962     XBT_DEBUG("Getting latency limited for surf_action (%p)", action->comm.surf_comm);
963     action->latency_limited = surf_workstation_model->get_latency_limited(action->comm.surf_comm);
964     XBT_DEBUG("Action limited is %d", action->latency_limited);
965   }
966   return action->latency_limited;
967 }
968 #endif
969
970 /******************************************************************************/
971 /*                    SIMIX_comm_copy_data callbacks                       */
972 /******************************************************************************/
973 static void (*SIMIX_comm_copy_data_callback) (smx_action_t, void*, size_t) =
974   &SIMIX_comm_copy_pointer_callback;
975
976 void
977 SIMIX_comm_set_copy_data_callback(void (*callback) (smx_action_t, void*, size_t))
978 {
979   SIMIX_comm_copy_data_callback = callback;
980 }
981
982 void SIMIX_comm_copy_pointer_callback(smx_action_t comm, void* buff, size_t buff_size)
983 {
984   xbt_assert((buff_size == sizeof(void *)),
985              "Cannot copy %zu bytes: must be sizeof(void*)", buff_size);
986   *(void **) (comm->comm.dst_buff) = buff;
987 }
988
989 void SIMIX_comm_copy_buffer_callback(smx_action_t comm, void* buff, size_t buff_size)
990 {
991   XBT_DEBUG("Copy the data over");
992   memcpy(comm->comm.dst_buff, buff, buff_size);
993   if (comm->comm.detached) { // if this is a detached send, the source buffer was duplicated by SMPI sender to make the original buffer available to the application ASAP
994     xbt_free(buff);
995     comm->comm.src_buff = NULL;
996   }
997 }
998
999
1000 /**
1001  *  \brief Copy the communication data from the sender's buffer to the receiver's one
1002  *  \param comm The communication
1003  */
1004 void SIMIX_comm_copy_data(smx_action_t comm)
1005 {
1006   size_t buff_size = comm->comm.src_buff_size;
1007   /* If there is no data to be copy then return */
1008   if (!comm->comm.src_buff || !comm->comm.dst_buff || comm->comm.copied)
1009     return;
1010
1011   XBT_DEBUG("Copying comm %p data from %s (%p) -> %s (%p) (%zu bytes)",
1012             comm,
1013             comm->comm.src_proc ? comm->comm.src_proc->smx_host->name : "a finished process",
1014             comm->comm.src_buff,
1015             comm->comm.dst_proc ? comm->comm.dst_proc->smx_host->name : "a finished process",
1016             comm->comm.dst_buff, buff_size);
1017
1018   /* Copy at most dst_buff_size bytes of the message to receiver's buffer */
1019   if (comm->comm.dst_buff_size)
1020     buff_size = MIN(buff_size, *(comm->comm.dst_buff_size));
1021
1022   /* Update the receiver's buffer size to the copied amount */
1023   if (comm->comm.dst_buff_size)
1024     *comm->comm.dst_buff_size = buff_size;
1025
1026   if (buff_size > 0)
1027     SIMIX_comm_copy_data_callback (comm, comm->comm.src_buff, buff_size);
1028
1029   /* Set the copied flag so we copy data only once */
1030   /* (this function might be called from both communication ends) */
1031   comm->comm.copied = 1;
1032 }