Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
Transform extern for XBT_PUBLIC
[simgrid.git] / src / simix / smx_network.c
1 /* Copyright (c) 2009, 2010. The SimGrid Team.
2  * All rights reserved.                                                     */
3
4 /* This program is free software; you can redistribute it and/or modify it
5  * under the terms of the license (GNU LGPL) which comes with this package. */
6
7 #include "smx_private.h"
8 #include "xbt/log.h"
9 #include "mc/mc.h"
10 #include "xbt/dict.h"
11
12 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(simix_network, simix,
13                                 "Logging specific to SIMIX (network)");
14
15 static xbt_dict_t rdv_points = NULL;
16 XBT_PUBLIC(unsigned long int) smx_total_comms = 0;
17
18 static void SIMIX_waitany_remove_simcall_from_actions(smx_simcall_t simcall);
19 static void SIMIX_comm_copy_data(smx_action_t comm);
20 static smx_action_t SIMIX_comm_new(e_smx_comm_type_t type);
21 static XBT_INLINE void SIMIX_rdv_push(smx_rdv_t rdv, smx_action_t comm);
22 static smx_action_t SIMIX_fifo_probe_comm(xbt_fifo_t fifo, e_smx_comm_type_t type,
23                                         int (*match_fun)(void *, void *,smx_action_t),
24                                         void *user_data, smx_action_t my_action);
25 static smx_action_t SIMIX_fifo_get_comm(xbt_fifo_t fifo, e_smx_comm_type_t type,
26                                         int (*match_fun)(void *, void *,smx_action_t),
27                                         void *user_data, smx_action_t my_action);
28 static void SIMIX_rdv_free(void *data);
29
30 void SIMIX_network_init(void)
31 {
32   rdv_points = xbt_dict_new_homogeneous(SIMIX_rdv_free);
33   if(MC_is_active())
34     MC_ignore(&smx_total_comms, sizeof(smx_total_comms));
35 }
36
37 void SIMIX_network_exit(void)
38 {
39   xbt_dict_free(&rdv_points);
40 }
41
42 /******************************************************************************/
43 /*                           Rendez-Vous Points                               */
44 /******************************************************************************/
45
46 smx_rdv_t SIMIX_rdv_create(const char *name)
47 {
48   /* two processes may have pushed the same rdv_create simcall at the same time */
49   smx_rdv_t rdv = name ? xbt_dict_get_or_null(rdv_points, name) : NULL;
50
51   if (!rdv) {
52     rdv = xbt_new0(s_smx_rvpoint_t, 1);
53     rdv->name = name ? xbt_strdup(name) : NULL;
54     rdv->comm_fifo = xbt_fifo_new();
55     rdv->done_comm_fifo = xbt_fifo_new();
56     rdv->permanent_receiver=NULL;
57
58     XBT_DEBUG("Creating a mailbox at %p with name %s\n", rdv, name);
59
60     if (rdv->name)
61       xbt_dict_set(rdv_points, rdv->name, rdv, NULL);
62   }
63   return rdv;
64 }
65
66 void SIMIX_rdv_destroy(smx_rdv_t rdv)
67 {
68   if (rdv->name)
69     xbt_dict_remove(rdv_points, rdv->name);
70 }
71
72 void SIMIX_rdv_free(void *data)
73 {
74   XBT_DEBUG("rdv free %p", data);
75   smx_rdv_t rdv = (smx_rdv_t) data;
76   xbt_free(rdv->name);
77   xbt_fifo_free(rdv->comm_fifo);
78   xbt_fifo_free(rdv->done_comm_fifo);
79
80   xbt_free(rdv);  
81 }
82
83 xbt_dict_t SIMIX_get_rdv_points()
84 {
85   return rdv_points;
86 }
87
88 smx_rdv_t SIMIX_rdv_get_by_name(const char *name)
89 {
90   return xbt_dict_get_or_null(rdv_points, name);
91 }
92
93 int SIMIX_rdv_comm_count_by_host(smx_rdv_t rdv, smx_host_t host)
94 {
95   smx_action_t comm = NULL;
96   xbt_fifo_item_t item = NULL;
97   int count = 0;
98
99   xbt_fifo_foreach(rdv->comm_fifo, item, comm, smx_action_t) {
100     if (comm->comm.src_proc->smx_host == host)
101       count++;
102   }
103
104   return count;
105 }
106
107 smx_action_t SIMIX_rdv_get_head(smx_rdv_t rdv)
108 {
109   return xbt_fifo_get_item_content(xbt_fifo_get_first_item(rdv->comm_fifo));
110 }
111
112 /**
113  *  \brief get the receiver (process associated to the mailbox)
114  *  \param rdv The rendez-vous point
115  *  \return process The receiving process (NULL if not set)
116  */
117 smx_process_t SIMIX_rdv_get_receiver(smx_rdv_t rdv)
118 {
119   return rdv->permanent_receiver;
120 }
121
122 /**
123  *  \brief set the receiver of the rendez vous point to allow eager sends
124  *  \param rdv The rendez-vous point
125  *  \param process The receiving process
126  */
127 void SIMIX_rdv_set_receiver(smx_rdv_t rdv , smx_process_t process)
128 {
129   rdv->permanent_receiver=process;
130 }
131
132 /**
133  *  \brief Pushes a communication action into a rendez-vous point
134  *  \param rdv The rendez-vous point
135  *  \param comm The communication action
136  */
137 static XBT_INLINE void SIMIX_rdv_push(smx_rdv_t rdv, smx_action_t comm)
138 {
139   xbt_fifo_push(rdv->comm_fifo, comm);
140   comm->comm.rdv = rdv;
141 }
142
143 /**
144  *  \brief Removes a communication action from a rendez-vous point
145  *  \param rdv The rendez-vous point
146  *  \param comm The communication action
147  */
148 XBT_INLINE void SIMIX_rdv_remove(smx_rdv_t rdv, smx_action_t comm)
149 {
150   xbt_fifo_remove(rdv->comm_fifo, comm);
151   comm->comm.rdv = NULL;
152 }
153
154 /**
155  *  \brief Checks if there is a communication action queued in a fifo matching our needs
156  *  \param type The type of communication we are looking for (comm_send, comm_recv)
157  *  \return The communication action if found, NULL otherwise
158  */
159 smx_action_t SIMIX_fifo_get_comm(xbt_fifo_t fifo, e_smx_comm_type_t type,
160                                  int (*match_fun)(void *, void *,smx_action_t),
161                                  void *this_user_data, smx_action_t my_action)
162 {
163   smx_action_t action;
164   xbt_fifo_item_t item;
165   void* other_user_data = NULL;
166
167   xbt_fifo_foreach(fifo, item, action, smx_action_t) {
168     if (action->comm.type == SIMIX_COMM_SEND) {
169       other_user_data = action->comm.src_data;
170     } else if (action->comm.type == SIMIX_COMM_RECEIVE) {
171       other_user_data = action->comm.dst_data;
172     }
173     if (action->comm.type == type &&
174         (!match_fun              ||              match_fun(this_user_data,  other_user_data, action)) &&
175         (!action->comm.match_fun || action->comm.match_fun(other_user_data, this_user_data,  my_action))) {
176       XBT_DEBUG("Found a matching communication action %p", action);
177       xbt_fifo_remove_item(fifo, item);
178       xbt_fifo_free_item(item);
179       action->comm.refcount++;
180       action->comm.rdv = NULL;
181       return action;
182     }
183     XBT_DEBUG("Sorry, communication action %p does not match our needs:"
184               " its type is %d but we are looking for a comm of type %d (or maybe the filtering didn't match)",
185               action, (int)action->comm.type, (int)type);
186   }
187   XBT_DEBUG("No matching communication action found");
188   return NULL;
189 }
190
191
192 /**
193  *  \brief Checks if there is a communication action queued in a fifo matching our needs, but leave it there
194  *  \param type The type of communication we are looking for (comm_send, comm_recv)
195  *  \return The communication action if found, NULL otherwise
196  */
197 smx_action_t SIMIX_fifo_probe_comm(xbt_fifo_t fifo, e_smx_comm_type_t type,
198                                  int (*match_fun)(void *, void *,smx_action_t),
199                                  void *this_user_data, smx_action_t my_action)
200 {
201   smx_action_t action;
202   xbt_fifo_item_t item;
203   void* other_user_data = NULL;
204
205   xbt_fifo_foreach(fifo, item, action, smx_action_t) {
206     if (action->comm.type == SIMIX_COMM_SEND) {
207       other_user_data = action->comm.src_data;
208     } else if (action->comm.type == SIMIX_COMM_RECEIVE) {
209       other_user_data = action->comm.dst_data;
210     }
211     if (action->comm.type == type &&
212         (!match_fun              ||              match_fun(this_user_data,  other_user_data, action)) &&
213         (!action->comm.match_fun || action->comm.match_fun(other_user_data, this_user_data,  my_action))) {
214       XBT_DEBUG("Found a matching communication action %p", action);
215       action->comm.refcount++;
216
217       return action;
218     }
219     XBT_DEBUG("Sorry, communication action %p does not match our needs:"
220               " its type is %d but we are looking for a comm of type %d (or maybe the filtering didn't match)",
221               action, (int)action->comm.type, (int)type);
222   }
223   XBT_DEBUG("No matching communication action found");
224   return NULL;
225 }
226 /******************************************************************************/
227 /*                            Communication Actions                            */
228 /******************************************************************************/
229
230 /**
231  *  \brief Creates a new communicate action
232  *  \param type The direction of communication (comm_send, comm_recv)
233  *  \return The new communicate action
234  */
235 smx_action_t SIMIX_comm_new(e_smx_comm_type_t type)
236 {
237   smx_action_t act;
238
239   /* alloc structures */
240   act = xbt_mallocator_get(simix_global->action_mallocator);
241
242   act->type = SIMIX_ACTION_COMMUNICATE;
243   act->state = SIMIX_WAITING;
244
245   /* set communication */
246   act->comm.type = type;
247   act->comm.refcount = 1;
248
249 #ifdef HAVE_LATENCY_BOUND_TRACKING
250   //initialize with unknown value
251   act->latency_limited = -1;
252 #endif
253
254 #ifdef HAVE_TRACING
255   act->category = NULL;
256 #endif
257
258   XBT_DEBUG("Create communicate action %p", act);
259   ++smx_total_comms;
260
261   return act;
262 }
263
264 /**
265  *  \brief Destroy a communicate action
266  *  \param action The communicate action to be destroyed
267  */
268 void SIMIX_comm_destroy(smx_action_t action)
269 {
270   XBT_DEBUG("Destroy action %p (refcount: %d), state: %d",
271             action, action->comm.refcount, (int)action->state);
272
273   if (action->comm.refcount <= 0) {
274     xbt_backtrace_display_current();
275     xbt_die("The refcount of comm %p is already 0 before decreasing it. "
276             "That's a bug! If you didn't test and/or wait the same communication twice in your code, then the bug is SimGrid's...", action);
277   }
278   action->comm.refcount--;
279   if (action->comm.refcount > 0)
280     return;
281   XBT_DEBUG("Really free communication %p; refcount is now %d", action,
282             action->comm.refcount);
283
284 #ifdef HAVE_LATENCY_BOUND_TRACKING
285   action->latency_limited = SIMIX_comm_is_latency_bounded( action ) ;
286 #endif
287
288   xbt_free(action->name);
289   SIMIX_comm_destroy_internal_actions(action);
290
291   if (action->comm.detached && action->state != SIMIX_DONE) {
292     /* the communication has failed and was detached:
293      * we have to free the buffer */
294     if (action->comm.clean_fun) {
295       action->comm.clean_fun(action->comm.src_buff);
296     }
297     action->comm.src_buff = NULL;
298   }
299
300   xbt_mallocator_release(simix_global->action_mallocator, action);
301 }
302
303 void SIMIX_comm_destroy_internal_actions(smx_action_t action)
304 {
305   if (action->comm.surf_comm){
306 #ifdef HAVE_LATENCY_BOUND_TRACKING
307     action->latency_limited = SIMIX_comm_is_latency_bounded(action);
308 #endif
309     action->comm.surf_comm->model_type->action_unref(action->comm.surf_comm);
310     action->comm.surf_comm = NULL;
311   }
312
313   if (action->comm.src_timeout){
314     action->comm.src_timeout->model_type->action_unref(action->comm.src_timeout);
315     action->comm.src_timeout = NULL;
316   }
317
318   if (action->comm.dst_timeout){
319     action->comm.dst_timeout->model_type->action_unref(action->comm.dst_timeout);
320     action->comm.dst_timeout = NULL;
321   }
322 }
323
324 smx_action_t SIMIX_comm_isend(smx_process_t src_proc, smx_rdv_t rdv,
325                               double task_size, double rate,
326                               void *src_buff, size_t src_buff_size,
327                               int (*match_fun)(void *, void *,smx_action_t),
328                               void (*clean_fun)(void *), // used to free the action in case of problem after a detached send
329                               void *data,
330                               int detached)
331 {
332   XBT_DEBUG("send from %p\n", rdv);
333
334   /* Prepare an action describing us, so that it gets passed to the user-provided filter of other side */
335   smx_action_t this_action = SIMIX_comm_new(SIMIX_COMM_SEND);
336
337   /* Look for communication action matching our needs. We also provide a description of
338    * ourself so that the other side also gets a chance of choosing if it wants to match with us.
339    *
340    * If it is not found then push our communication into the rendez-vous point */
341   smx_action_t other_action = SIMIX_fifo_get_comm(rdv->comm_fifo, SIMIX_COMM_RECEIVE, match_fun, data, this_action);
342
343   if (!other_action) {
344     other_action = this_action;
345
346     if (rdv->permanent_receiver!=NULL){
347       //this mailbox is for small messages, which have to be sent right now
348       other_action->state = SIMIX_READY;
349       other_action->comm.dst_proc=rdv->permanent_receiver;
350       other_action->comm.refcount++;
351       other_action->comm.rdv = rdv;
352       xbt_fifo_push(rdv->done_comm_fifo,other_action);
353       other_action->comm.rdv=rdv;
354       XBT_DEBUG("pushing a message into the permanent receive fifo %p, comm %p \n", rdv, &(other_action->comm));
355
356     }else{
357       SIMIX_rdv_push(rdv, this_action);
358     }
359   } else {
360     XBT_DEBUG("Receive already pushed\n");
361
362     SIMIX_comm_destroy(this_action);
363     --smx_total_comms; // this creation was a pure waste
364
365     other_action->state = SIMIX_READY;
366     other_action->comm.type = SIMIX_COMM_READY;
367
368   }
369   xbt_fifo_push(src_proc->comms, other_action);
370
371   /* if the communication action is detached then decrease the refcount
372    * by one, so it will be eliminated by the receiver's destroy call */
373   if (detached) {
374     other_action->comm.detached = 1;
375     other_action->comm.refcount--;
376     other_action->comm.clean_fun = clean_fun;
377   } else {
378     other_action->comm.clean_fun = NULL;
379   }
380
381   /* Setup the communication action */
382   other_action->comm.src_proc = src_proc;
383   other_action->comm.task_size = task_size;
384   other_action->comm.rate = rate;
385   other_action->comm.src_buff = src_buff;
386   other_action->comm.src_buff_size = src_buff_size;
387   other_action->comm.src_data = data;
388
389   other_action->comm.match_fun = match_fun;
390
391   if (MC_is_active()) {
392     other_action->state = SIMIX_RUNNING;
393     return other_action;
394   }
395
396   SIMIX_comm_start(other_action);
397   return (detached ? NULL : other_action);
398 }
399
400 smx_action_t SIMIX_comm_irecv(smx_process_t dst_proc, smx_rdv_t rdv,
401                               void *dst_buff, size_t *dst_buff_size,
402                               int (*match_fun)(void *, void *, smx_action_t), void *data)
403 {
404   XBT_DEBUG("recv from %p %p\n", rdv, rdv->comm_fifo);
405   smx_action_t this_action = SIMIX_comm_new(SIMIX_COMM_RECEIVE);
406
407   smx_action_t other_action;
408   //communication already done, get it inside the fifo of completed comms
409   //permanent receive v1
410   //int already_received=0;
411   if(rdv->permanent_receiver && xbt_fifo_size(rdv->done_comm_fifo)!=0){
412
413     XBT_DEBUG("We have a comm that has probably already been received, trying to match it, to skip the communication\n");
414     //find a match in the already received fifo
415     other_action = SIMIX_fifo_get_comm(rdv->done_comm_fifo, SIMIX_COMM_SEND, match_fun, data, this_action);
416     //if not found, assume the receiver came first, register it to the mailbox in the classical way
417     if (!other_action)  {
418       XBT_DEBUG("We have messages in the permanent receive list, but not the one we are looking for, pushing request into fifo\n");
419       other_action = this_action;
420       SIMIX_rdv_push(rdv, this_action);
421     }else{
422       if(other_action->comm.surf_comm &&        SIMIX_comm_get_remains(other_action)==0.0)
423       {
424         XBT_DEBUG("comm %p has been already sent, and is finished, destroy it\n",&(other_action->comm));
425         other_action->state = SIMIX_DONE;
426         other_action->comm.type = SIMIX_COMM_DONE;
427         other_action->comm.rdv = NULL;
428         //SIMIX_comm_destroy(this_action);
429         //--smx_total_comms; // this creation was a pure waste
430         //already_received=1;
431         //other_action->comm.refcount--;
432       }/*else{
433          XBT_DEBUG("Not yet finished, we have to wait %d\n", xbt_fifo_size(rdv->comm_fifo));
434          }*/
435      // other_action->comm.refcount--;
436       SIMIX_comm_destroy(this_action);
437       --smx_total_comms; // this creation was a pure waste
438     }
439   }else{
440     /* Prepare an action describing us, so that it gets passed to the user-provided filter of other side */
441
442     /* Look for communication action matching our needs. We also provide a description of
443      * ourself so that the other side also gets a chance of choosing if it wants to match with us.
444      *
445      * If it is not found then push our communication into the rendez-vous point */
446     other_action = SIMIX_fifo_get_comm(rdv->comm_fifo, SIMIX_COMM_SEND, match_fun, data, this_action);
447
448     if (!other_action) {
449       XBT_DEBUG("Receive pushed first %d\n", xbt_fifo_size(rdv->comm_fifo));
450       other_action = this_action;
451       SIMIX_rdv_push(rdv, this_action);
452     } else {
453       SIMIX_comm_destroy(this_action);
454       --smx_total_comms; // this creation was a pure waste
455       other_action->state = SIMIX_READY;
456       other_action->comm.type = SIMIX_COMM_READY;
457    //   other_action->comm.refcount--;
458     }
459     xbt_fifo_push(dst_proc->comms, other_action);
460   }
461
462   /* Setup communication action */
463   other_action->comm.dst_proc = dst_proc;
464   other_action->comm.dst_buff = dst_buff;
465   other_action->comm.dst_buff_size = dst_buff_size;
466   other_action->comm.dst_data = data;
467
468   other_action->comm.match_fun = match_fun;
469
470
471   /*if(already_received)//do the actual copy, because the first one after the comm didn't have all the info
472     SIMIX_comm_copy_data(other_action);*/
473
474
475   if (MC_is_active()) {
476     other_action->state = SIMIX_RUNNING;
477     return other_action;
478   }
479
480   SIMIX_comm_start(other_action);
481   // }
482   return other_action;
483 }
484
485
486 smx_action_t SIMIX_comm_iprobe(smx_process_t dst_proc, smx_rdv_t rdv, int src,
487                               int tag, int (*match_fun)(void *, void *, smx_action_t), void *data)
488 {
489   XBT_DEBUG("iprobe from %p %p\n", rdv, rdv->comm_fifo);
490   smx_action_t this_action = SIMIX_comm_new(SIMIX_COMM_RECEIVE);
491
492   smx_action_t other_action=NULL;
493   if(rdv->permanent_receiver && xbt_fifo_size(rdv->done_comm_fifo)!=0){
494     //find a match in the already received fifo
495       XBT_DEBUG("first try in the perm recv mailbox \n");
496
497     other_action = SIMIX_fifo_probe_comm(rdv->done_comm_fifo, SIMIX_COMM_SEND, match_fun, data, this_action);
498   }
499  // }else{
500     if(!other_action){
501         XBT_DEBUG("second try in the other mailbox");
502         other_action = SIMIX_fifo_probe_comm(rdv->comm_fifo, SIMIX_COMM_SEND, match_fun, data, this_action);
503     }
504 //  }
505   if(other_action)other_action->comm.refcount--;
506
507   SIMIX_comm_destroy(this_action);
508   --smx_total_comms;
509   return other_action;
510 }
511
512 void SIMIX_pre_comm_wait(smx_simcall_t simcall, smx_action_t action, double timeout, int idx)
513 {
514
515   /* the simcall may be a wait, a send or a recv */
516   surf_action_t sleep;
517
518   /* Associate this simcall to the wait action */
519   XBT_DEBUG("SIMIX_pre_comm_wait, %p", action);
520
521   xbt_fifo_push(action->simcalls, simcall);
522   simcall->issuer->waiting_action = action;
523
524   if (MC_is_active()) {
525     if (idx == 0) {
526       action->state = SIMIX_DONE;
527     } else {
528       /* If we reached this point, the wait simcall must have a timeout */
529       /* Otherwise it shouldn't be enabled and executed by the MC */
530       if (timeout == -1)
531         THROW_IMPOSSIBLE;
532
533       if (action->comm.src_proc == simcall->issuer)
534         action->state = SIMIX_SRC_TIMEOUT;
535       else
536         action->state = SIMIX_DST_TIMEOUT;
537     }
538
539     SIMIX_comm_finish(action);
540     return;
541   }
542
543   /* If the action has already finish perform the error handling, */
544   /* otherwise set up a waiting timeout on the right side         */
545   if (action->state != SIMIX_WAITING && action->state != SIMIX_RUNNING) {
546     SIMIX_comm_finish(action);
547   } else { /* if (timeout >= 0) { we need a surf sleep action even when there is no timeout, otherwise surf won't tell us when the host fails */
548     sleep = surf_workstation_model->extension.workstation.sleep(simcall->issuer->smx_host->host, timeout);
549     surf_workstation_model->action_data_set(sleep, action);
550
551     if (simcall->issuer == action->comm.src_proc)
552       action->comm.src_timeout = sleep;
553     else
554       action->comm.dst_timeout = sleep;
555   }
556 }
557
558 void SIMIX_pre_comm_test(smx_simcall_t simcall)
559 {
560   smx_action_t action = simcall->comm_test.comm;
561
562   if(MC_is_active()){
563     simcall->comm_test.result = action->comm.src_proc && action->comm.dst_proc;
564     if(simcall->comm_test.result){
565       action->state = SIMIX_DONE;
566       xbt_fifo_push(action->simcalls, simcall);
567       SIMIX_comm_finish(action);
568     }else{
569       SIMIX_simcall_answer(simcall);
570     }
571     return;
572   }
573
574   simcall->comm_test.result = (action->state != SIMIX_WAITING && action->state != SIMIX_RUNNING);
575   if (simcall->comm_test.result) {
576     xbt_fifo_push(action->simcalls, simcall);
577     SIMIX_comm_finish(action);
578   } else {
579     SIMIX_simcall_answer(simcall);
580   }
581 }
582
583 void SIMIX_pre_comm_testany(smx_simcall_t simcall, int idx)
584 {
585   unsigned int cursor;
586   smx_action_t action;
587   xbt_dynar_t actions = simcall->comm_testany.comms;
588   simcall->comm_testany.result = -1;
589
590   if (MC_is_active()){
591     if(idx == -1){
592       SIMIX_simcall_answer(simcall);
593     }else{
594       action = xbt_dynar_get_as(actions, idx, smx_action_t);
595       simcall->comm_testany.result = idx;
596       xbt_fifo_push(action->simcalls, simcall);
597       action->state = SIMIX_DONE;
598       SIMIX_comm_finish(action);
599     }
600     return;
601   }
602
603   xbt_dynar_foreach(simcall->comm_testany.comms,cursor,action) {
604     if (action->state != SIMIX_WAITING && action->state != SIMIX_RUNNING) {
605       simcall->comm_testany.result = cursor;
606       xbt_fifo_push(action->simcalls, simcall);
607       SIMIX_comm_finish(action);
608       return;
609     }
610   }
611   SIMIX_simcall_answer(simcall);
612 }
613
614 void SIMIX_pre_comm_waitany(smx_simcall_t simcall, int idx)
615 {
616   smx_action_t action;
617   unsigned int cursor = 0;
618   xbt_dynar_t actions = simcall->comm_waitany.comms;
619
620   if (MC_is_active()){
621     action = xbt_dynar_get_as(actions, idx, smx_action_t);
622     xbt_fifo_push(action->simcalls, simcall);
623     simcall->comm_waitany.result = idx;
624     action->state = SIMIX_DONE;
625     SIMIX_comm_finish(action);
626     return;
627   }
628
629   xbt_dynar_foreach(actions, cursor, action){
630     /* associate this simcall to the the action */
631     xbt_fifo_push(action->simcalls, simcall);
632
633     /* see if the action is already finished */
634     if (action->state != SIMIX_WAITING && action->state != SIMIX_RUNNING){
635       SIMIX_comm_finish(action);
636       break;
637     }
638   }
639 }
640
641 void SIMIX_waitany_remove_simcall_from_actions(smx_simcall_t simcall)
642 {
643   smx_action_t action;
644   unsigned int cursor = 0;
645   xbt_dynar_t actions = simcall->comm_waitany.comms;
646
647   xbt_dynar_foreach(actions, cursor, action) {
648     xbt_fifo_remove(action->simcalls, simcall);
649   }
650 }
651
652 /**
653  *  \brief Starts the simulation of a communication action.
654  *  \param action the communication action
655  */
656 XBT_INLINE void SIMIX_comm_start(smx_action_t action)
657 {
658   /* If both the sender and the receiver are already there, start the communication */
659   if (action->state == SIMIX_READY) {
660
661     smx_host_t sender = action->comm.src_proc->smx_host;
662     smx_host_t receiver = action->comm.dst_proc->smx_host;
663
664     XBT_DEBUG("Starting communication %p from '%s' to '%s'", action,
665               SIMIX_host_get_name(sender), SIMIX_host_get_name(receiver));
666
667     action->comm.surf_comm = surf_workstation_model->extension.workstation.
668       communicate(sender->host, receiver->host, action->comm.task_size, action->comm.rate);
669
670     surf_workstation_model->action_data_set(action->comm.surf_comm, action);
671
672     action->state = SIMIX_RUNNING;
673
674     /* If a link is failed, detect it immediately */
675     if (surf_workstation_model->action_state_get(action->comm.surf_comm) == SURF_ACTION_FAILED) {
676       XBT_DEBUG("Communication from '%s' to '%s' failed to start because of a link failure",
677                 SIMIX_host_get_name(sender), SIMIX_host_get_name(receiver));
678       action->state = SIMIX_LINK_FAILURE;
679       SIMIX_comm_destroy_internal_actions(action);
680     }
681
682     /* If any of the process is suspend, create the action but stop its execution,
683        it will be restarted when the sender process resume */
684     if (SIMIX_process_is_suspended(action->comm.src_proc) ||
685         SIMIX_process_is_suspended(action->comm.dst_proc)) {
686       /* FIXME: check what should happen with the action state */
687
688       if (SIMIX_process_is_suspended(action->comm.src_proc))
689         XBT_DEBUG("The communication is suspended on startup because src (%s:%s) were suspended since it initiated the communication",
690                   SIMIX_host_get_name(action->comm.src_proc->smx_host), action->comm.src_proc->name);
691       else
692         XBT_DEBUG("The communication is suspended on startup because dst (%s:%s) were suspended since it initiated the communication",
693                   SIMIX_host_get_name(action->comm.dst_proc->smx_host), action->comm.dst_proc->name);
694
695       surf_workstation_model->suspend(action->comm.surf_comm);
696
697     }
698   }
699 }
700
701 /**
702  * \brief Answers the SIMIX simcalls associated to a communication action.
703  * \param action a finished communication action
704  */
705 void SIMIX_comm_finish(smx_action_t action)
706 {
707   unsigned int destroy_count = 0;
708   smx_simcall_t simcall;
709
710   while ((simcall = xbt_fifo_shift(action->simcalls))) {
711
712     /* If a waitany simcall is waiting for this action to finish, then remove
713        it from the other actions in the waitany list. Afterwards, get the
714        position of the actual action in the waitany dynar and
715        return it as the result of the simcall */
716     if (simcall->call == SIMCALL_COMM_WAITANY) {
717       SIMIX_waitany_remove_simcall_from_actions(simcall);
718       if (!MC_is_active())
719         simcall->comm_waitany.result = xbt_dynar_search(simcall->comm_waitany.comms, &action);
720     }
721
722     /* If the action is still in a rendez-vous point then remove from it */
723     if (action->comm.rdv)
724       SIMIX_rdv_remove(action->comm.rdv, action);
725
726     XBT_DEBUG("SIMIX_comm_finish: action state = %d", (int)action->state);
727
728     /* Check out for errors */
729     switch (action->state) {
730
731     case SIMIX_DONE:
732       XBT_DEBUG("Communication %p complete!", action);
733       SIMIX_comm_copy_data(action);
734       break;
735
736     case SIMIX_SRC_TIMEOUT:
737       SMX_EXCEPTION(simcall->issuer, timeout_error, 0,
738                     "Communication timeouted because of sender");
739       break;
740
741     case SIMIX_DST_TIMEOUT:
742       SMX_EXCEPTION(simcall->issuer, timeout_error, 0,
743                     "Communication timeouted because of receiver");
744       break;
745
746     case SIMIX_SRC_HOST_FAILURE:
747       if (simcall->issuer == action->comm.src_proc)
748         simcall->issuer->context->iwannadie = 1;
749 //          SMX_EXCEPTION(simcall->issuer, host_error, 0, "Host failed");
750       else
751         SMX_EXCEPTION(simcall->issuer, network_error, 0, "Remote peer failed");
752       break;
753
754     case SIMIX_DST_HOST_FAILURE:
755       if (simcall->issuer == action->comm.dst_proc)
756         simcall->issuer->context->iwannadie = 1;
757 //          SMX_EXCEPTION(simcall->issuer, host_error, 0, "Host failed");
758       else
759         SMX_EXCEPTION(simcall->issuer, network_error, 0, "Remote peer failed");
760       break;
761
762     case SIMIX_LINK_FAILURE:
763       XBT_DEBUG("Link failure in action %p between '%s' and '%s': posting an exception to the issuer: %s (%p) detached:%d",
764                 action,
765                 action->comm.src_proc ? action->comm.src_proc->smx_host->name : NULL,
766                 action->comm.dst_proc ? action->comm.dst_proc->smx_host->name : NULL,
767                 simcall->issuer->name, simcall->issuer, action->comm.detached);
768       if (action->comm.src_proc == simcall->issuer) {
769         XBT_DEBUG("I'm source");
770       } else if (action->comm.dst_proc == simcall->issuer) {
771         XBT_DEBUG("I'm dest");
772       } else {
773         XBT_DEBUG("I'm neither source nor dest");
774       }
775       SMX_EXCEPTION(simcall->issuer, network_error, 0, "Link failure");
776       break;
777
778     case SIMIX_CANCELED:
779       if (simcall->issuer == action->comm.dst_proc)
780         SMX_EXCEPTION(simcall->issuer, cancel_error, 0,
781                       "Communication canceled by the sender");
782       else
783         SMX_EXCEPTION(simcall->issuer, cancel_error, 0,
784                       "Communication canceled by the receiver");
785       break;
786
787     default:
788       xbt_die("Unexpected action state in SIMIX_comm_finish: %d", (int)action->state);
789     }
790
791     /* if there is an exception during a waitany or a testany, indicate the position of the failed communication */
792     if (simcall->issuer->doexception) {
793       if (simcall->call == SIMCALL_COMM_WAITANY) {
794         simcall->issuer->running_ctx->exception.value = xbt_dynar_search(simcall->comm_waitany.comms, &action);
795       }
796       else if (simcall->call == SIMCALL_COMM_TESTANY) {
797         simcall->issuer->running_ctx->exception.value = xbt_dynar_search(simcall->comm_testany.comms, &action);
798       }
799     }
800
801     if (surf_workstation_model->extension.
802         workstation.get_state(simcall->issuer->smx_host->host) != SURF_RESOURCE_ON) {
803       simcall->issuer->context->iwannadie = 1;
804     }
805
806     simcall->issuer->waiting_action = NULL;
807     xbt_fifo_remove(simcall->issuer->comms, action);
808     SIMIX_simcall_answer(simcall);
809     destroy_count++;
810   }
811
812   while (destroy_count-- > 0)
813     SIMIX_comm_destroy(action);
814 }
815
816 /**
817  * \brief This function is called when a Surf communication action is finished.
818  * \param action the corresponding Simix communication
819  */
820 void SIMIX_post_comm(smx_action_t action)
821 {
822   /* Update action state */
823   if (action->comm.src_timeout &&
824       surf_workstation_model->action_state_get(action->comm.src_timeout) == SURF_ACTION_DONE)
825     action->state = SIMIX_SRC_TIMEOUT;
826   else if (action->comm.dst_timeout &&
827            surf_workstation_model->action_state_get(action->comm.dst_timeout) == SURF_ACTION_DONE)
828     action->state = SIMIX_DST_TIMEOUT;
829   else if (action->comm.src_timeout &&
830            surf_workstation_model->action_state_get(action->comm.src_timeout) == SURF_ACTION_FAILED)
831     action->state = SIMIX_SRC_HOST_FAILURE;
832   else if (action->comm.dst_timeout &&
833            surf_workstation_model->action_state_get(action->comm.dst_timeout) == SURF_ACTION_FAILED)
834     action->state = SIMIX_DST_HOST_FAILURE;
835   else if (action->comm.surf_comm &&
836            surf_workstation_model->action_state_get(action->comm.surf_comm) == SURF_ACTION_FAILED) {
837     XBT_DEBUG("Puta madre. Surf says that the link broke");
838     action->state = SIMIX_LINK_FAILURE;
839   } else
840     action->state = SIMIX_DONE;
841
842   XBT_DEBUG("SIMIX_post_comm: comm %p, state %d, src_proc %p, dst_proc %p, detached: %d",
843             action, (int)action->state, action->comm.src_proc, action->comm.dst_proc, action->comm.detached);
844
845   /* destroy the surf actions associated with the Simix communication */
846   SIMIX_comm_destroy_internal_actions(action);
847
848   /* remove the communication action from the list of pending communications
849    * of both processes (if they still exist) */
850   if (action->comm.src_proc) {
851     xbt_fifo_remove(action->comm.src_proc->comms, action);
852   }
853   if (action->comm.dst_proc) {
854     xbt_fifo_remove(action->comm.dst_proc->comms, action);
855   }
856
857   /* if there are simcalls associated with the action, then answer them */
858   if (xbt_fifo_size(action->simcalls)) {
859     SIMIX_comm_finish(action);
860   }
861 }
862
863 void SIMIX_comm_cancel(smx_action_t action)
864 {
865   /* if the action is a waiting state means that it is still in a rdv */
866   /* so remove from it and delete it */
867   if (action->state == SIMIX_WAITING) {
868     SIMIX_rdv_remove(action->comm.rdv, action);
869     action->state = SIMIX_CANCELED;
870   }
871   else if (!MC_is_active() /* when running the MC there are no surf actions */
872            && (action->state == SIMIX_READY || action->state == SIMIX_RUNNING)) {
873
874     surf_workstation_model->action_cancel(action->comm.surf_comm);
875   }
876 }
877
878 void SIMIX_comm_suspend(smx_action_t action)
879 {
880   /*FIXME: shall we suspend also the timeout actions? */
881   if (action->comm.surf_comm)
882     surf_workstation_model->suspend(action->comm.surf_comm);
883   /* in the other case, the action will be suspended on creation, in SIMIX_comm_start() */
884 }
885
886 void SIMIX_comm_resume(smx_action_t action)
887 {
888   /*FIXME: check what happen with the timeouts */
889   if (action->comm.surf_comm)
890     surf_workstation_model->resume(action->comm.surf_comm);
891   /* in the other case, the action were not really suspended yet, see SIMIX_comm_suspend() and SIMIX_comm_start() */
892 }
893
894
895 /************* Action Getters **************/
896
897 /**
898  *  \brief get the amount remaining from the communication
899  *  \param action The communication
900  */
901 double SIMIX_comm_get_remains(smx_action_t action)
902 {
903   double remains;
904
905   if(!action){
906     return 0;
907   }
908
909   switch (action->state) {
910
911   case SIMIX_RUNNING:
912     remains = surf_workstation_model->get_remains(action->comm.surf_comm);
913     break;
914
915   case SIMIX_WAITING:
916   case SIMIX_READY:
917     remains = 0; /*FIXME: check what should be returned */
918     break;
919
920   default:
921     remains = 0; /*FIXME: is this correct? */
922     break;
923   }
924   return remains;
925 }
926
927 e_smx_state_t SIMIX_comm_get_state(smx_action_t action)
928 {
929   return action->state;
930 }
931
932 /**
933  *  \brief Return the user data associated to the sender of the communication
934  *  \param action The communication
935  *  \return the user data
936  */
937 void* SIMIX_comm_get_src_data(smx_action_t action)
938 {
939   return action->comm.src_data;
940 }
941
942 /**
943  *  \brief Return the user data associated to the receiver of the communication
944  *  \param action The communication
945  *  \return the user data
946  */
947 void* SIMIX_comm_get_dst_data(smx_action_t action)
948 {
949   return action->comm.dst_data;
950 }
951
952 smx_process_t SIMIX_comm_get_src_proc(smx_action_t action)
953 {
954   return action->comm.src_proc;
955 }
956
957 smx_process_t SIMIX_comm_get_dst_proc(smx_action_t action)
958 {
959   return action->comm.dst_proc;
960 }
961
#ifdef HAVE_LATENCY_BOUND_TRACKING
/**
 *  \brief Tell whether a communication is latency bounded
 *
 *  When the communication has a Surf action, the cached latency_limited
 *  flag is refreshed from Surf before being returned.
 *
 *  \param action The communication (may be NULL)
 *  \return 0 for a NULL action, the latency_limited flag otherwise
 */
XBT_INLINE int SIMIX_comm_is_latency_bounded(smx_action_t action)
{
  if (action == NULL) {
    return 0;
  }

  /* Without a Surf action, the previously stored flag is all we have */
  if (action->comm.surf_comm == NULL) {
    return action->latency_limited;
  }

  XBT_DEBUG("Getting latency limited for surf_action (%p)", action->comm.surf_comm);
  action->latency_limited = surf_workstation_model->get_latency_limited(action->comm.surf_comm);
  XBT_DEBUG("Action limited is %d", action->latency_limited);

  return action->latency_limited;
}
#endif
980
981 /******************************************************************************/
982 /*                    SIMIX_comm_copy_data callbacks                       */
983 /******************************************************************************/
984 static void (*SIMIX_comm_copy_data_callback) (smx_action_t, void*, size_t) =
985   &SIMIX_comm_copy_pointer_callback;
986
987 void
988 SIMIX_comm_set_copy_data_callback(void (*callback) (smx_action_t, void*, size_t))
989 {
990   SIMIX_comm_copy_data_callback = callback;
991 }
992
993 void SIMIX_comm_copy_pointer_callback(smx_action_t comm, void* buff, size_t buff_size)
994 {
995   xbt_assert((buff_size == sizeof(void *)),
996              "Cannot copy %zu bytes: must be sizeof(void*)", buff_size);
997   *(void **) (comm->comm.dst_buff) = buff;
998 }
999
1000 void SIMIX_comm_copy_buffer_callback(smx_action_t comm, void* buff, size_t buff_size)
1001 {
1002   XBT_DEBUG("Copy the data over");
1003   memcpy(comm->comm.dst_buff, buff, buff_size);
1004   if (comm->comm.detached) { // if this is a detached send, the source buffer was duplicated by SMPI sender to make the original buffer available to the application ASAP
1005     xbt_free(buff);
1006     comm->comm.src_buff = NULL;
1007   }
1008 }
1009
1010
1011 /**
1012  *  \brief Copy the communication data from the sender's buffer to the receiver's one
1013  *  \param comm The communication
1014  */
1015 void SIMIX_comm_copy_data(smx_action_t comm)
1016 {
1017   size_t buff_size = comm->comm.src_buff_size;
1018   /* If there is no data to be copy then return */
1019   if (!comm->comm.src_buff || !comm->comm.dst_buff || comm->comm.copied)
1020     return;
1021
1022   XBT_DEBUG("Copying comm %p data from %s (%p) -> %s (%p) (%zu bytes)",
1023             comm,
1024             comm->comm.src_proc ? comm->comm.src_proc->smx_host->name : "a finished process",
1025             comm->comm.src_buff,
1026             comm->comm.dst_proc ? comm->comm.dst_proc->smx_host->name : "a finished process",
1027             comm->comm.dst_buff, buff_size);
1028
1029   /* Copy at most dst_buff_size bytes of the message to receiver's buffer */
1030   if (comm->comm.dst_buff_size)
1031     buff_size = MIN(buff_size, *(comm->comm.dst_buff_size));
1032
1033   /* Update the receiver's buffer size to the copied amount */
1034   if (comm->comm.dst_buff_size)
1035     *comm->comm.dst_buff_size = buff_size;
1036
1037   if (buff_size > 0)
1038     SIMIX_comm_copy_data_callback (comm, comm->comm.src_buff, buff_size);
1039
1040   /* Set the copied flag so we copy data only once */
1041   /* (this function might be called from both communication ends) */
1042   comm->comm.copied = 1;
1043 }