Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
Move instruction at proper place.
[simgrid.git] / src / simix / smx_network.c
1 /* Copyright (c) 2009, 2010. The SimGrid Team.
2  * All rights reserved.                                                     */
3
4 /* This program is free software; you can redistribute it and/or modify it
5  * under the terms of the license (GNU LGPL) which comes with this package. */
6
7 #include "smx_private.h"
8 #include "xbt/log.h"
9 #include "mc/mc.h"
10 #include "xbt/dict.h"
11
12 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(simix_network, simix,
13                                 "Logging specific to SIMIX (network)");
14
15 static xbt_dict_t rdv_points = NULL;
16 unsigned long int smx_total_comms = 0;
17
18 static void SIMIX_waitany_remove_simcall_from_actions(smx_simcall_t simcall);
19 static void SIMIX_comm_copy_data(smx_action_t comm);
20 static smx_action_t SIMIX_comm_new(e_smx_comm_type_t type);
21 static XBT_INLINE void SIMIX_rdv_push(smx_rdv_t rdv, smx_action_t comm);
22 static smx_action_t SIMIX_fifo_get_comm(xbt_fifo_t fifo, e_smx_comm_type_t type,
23                                         int (*match_fun)(void *, void *,smx_action_t),
24                                         void *user_data, smx_action_t my_action);
25 static void SIMIX_rdv_free(void *data);
26
27 void SIMIX_network_init(void)
28 {
29   rdv_points = xbt_dict_new_homogeneous(SIMIX_rdv_free);
30 }
31
32 void SIMIX_network_exit(void)
33 {
34   xbt_dict_free(&rdv_points);
35 }
36
37 /******************************************************************************/
38 /*                           Rendez-Vous Points                               */
39 /******************************************************************************/
40
41 smx_rdv_t SIMIX_rdv_create(const char *name)
42 {
43   /* two processes may have pushed the same rdv_create simcall at the same time */
44   smx_rdv_t rdv = name ? xbt_dict_get_or_null(rdv_points, name) : NULL;
45
46   if (!rdv) {
47     rdv = xbt_new0(s_smx_rvpoint_t, 1);
48     rdv->name = name ? xbt_strdup(name) : NULL;
49     rdv->comm_fifo = xbt_fifo_new();
50     rdv->done_comm_fifo = xbt_fifo_new();
51     rdv->permanent_receiver=NULL;
52
53     XBT_DEBUG("Creating a mailbox at %p with name %s\n", rdv, name);
54
55     if (rdv->name)
56       xbt_dict_set(rdv_points, rdv->name, rdv, NULL);
57   }
58   return rdv;
59 }
60
61 void SIMIX_rdv_destroy(smx_rdv_t rdv)
62 {
63   if (rdv->name)
64     xbt_dict_remove(rdv_points, rdv->name);
65 }
66
67 void SIMIX_rdv_free(void *data)
68 {
69   XBT_DEBUG("rdv free %p", data);
70   smx_rdv_t rdv = (smx_rdv_t) data;
71   xbt_free(rdv->name);
72   xbt_fifo_free(rdv->comm_fifo);
73   xbt_fifo_free(rdv->done_comm_fifo);
74
75   xbt_free(rdv);  
76 }
77
78 xbt_dict_t SIMIX_get_rdv_points()
79 {
80   return rdv_points;
81 }
82
83 smx_rdv_t SIMIX_rdv_get_by_name(const char *name)
84 {
85   return xbt_dict_get_or_null(rdv_points, name);
86 }
87
88 int SIMIX_rdv_comm_count_by_host(smx_rdv_t rdv, smx_host_t host)
89 {
90   smx_action_t comm = NULL;
91   xbt_fifo_item_t item = NULL;
92   int count = 0;
93
94   xbt_fifo_foreach(rdv->comm_fifo, item, comm, smx_action_t) {
95     if (comm->comm.src_proc->smx_host == host)
96       count++;
97   }
98
99   return count;
100 }
101
102 smx_action_t SIMIX_rdv_get_head(smx_rdv_t rdv)
103 {
104   return xbt_fifo_get_item_content(xbt_fifo_get_first_item(rdv->comm_fifo));
105 }
106
107 /**
108  *  \brief get the receiver (process associated to the mailbox)
109  *  \param rdv The rendez-vous point
110  *  \return process The receiving process (NULL if not set)
111  */
112 smx_process_t SIMIX_rdv_get_receiver(smx_rdv_t rdv)
113 {
114   return rdv->permanent_receiver;
115 }
116
117 /**
118  *  \brief set the receiver of the rendez vous point to allow eager sends
119  *  \param rdv The rendez-vous point
120  *  \param process The receiving process
121  */
122 void SIMIX_rdv_set_receiver(smx_rdv_t rdv , smx_process_t process)
123 {
124   rdv->permanent_receiver=process;
125 }
126
127 /**
128  *  \brief Pushes a communication action into a rendez-vous point
129  *  \param rdv The rendez-vous point
130  *  \param comm The communication action
131  */
132 static XBT_INLINE void SIMIX_rdv_push(smx_rdv_t rdv, smx_action_t comm)
133 {
134   xbt_fifo_push(rdv->comm_fifo, comm);
135   comm->comm.rdv = rdv;
136 }
137
138 /**
139  *  \brief Removes a communication action from a rendez-vous point
140  *  \param rdv The rendez-vous point
141  *  \param comm The communication action
142  */
143 XBT_INLINE void SIMIX_rdv_remove(smx_rdv_t rdv, smx_action_t comm)
144 {
145   xbt_fifo_remove(rdv->comm_fifo, comm);
146   comm->comm.rdv = NULL;
147 }
148
149 /**
150  *  \brief Checks if there is a communication action queued in a fifo matching our needs
151  *  \param type The type of communication we are looking for (comm_send, comm_recv)
152  *  \return The communication action if found, NULL otherwise
153  */
154 smx_action_t SIMIX_fifo_get_comm(xbt_fifo_t fifo, e_smx_comm_type_t type,
155                                  int (*match_fun)(void *, void *,smx_action_t),
156                                  void *this_user_data, smx_action_t my_action)
157 {
158   smx_action_t action;
159   xbt_fifo_item_t item;
160   void* other_user_data = NULL;
161
162   xbt_fifo_foreach(fifo, item, action, smx_action_t) {
163     if (action->comm.type == SIMIX_COMM_SEND) {
164       other_user_data = action->comm.src_data;
165     } else if (action->comm.type == SIMIX_COMM_RECEIVE) {
166       other_user_data = action->comm.dst_data;
167     }
168     if (action->comm.type == type &&
169         (!match_fun              ||              match_fun(this_user_data,  other_user_data, action)) &&
170         (!action->comm.match_fun || action->comm.match_fun(other_user_data, this_user_data,  my_action))) {
171       XBT_DEBUG("Found a matching communication action %p", action);
172       xbt_fifo_remove_item(fifo, item);
173       xbt_fifo_free_item(item);
174       action->comm.refcount++;
175       action->comm.rdv = NULL;
176       return action;
177     }
178     XBT_DEBUG("Sorry, communication action %p does not match our needs:"
179               " its type is %d but we are looking for a comm of type %d (or maybe the filtering didn't match)",
180               action, (int)action->comm.type, (int)type);
181   }
182   XBT_DEBUG("No matching communication action found");
183   return NULL;
184 }
185
186
187 /******************************************************************************/
188 /*                            Communication Actions                            */
189 /******************************************************************************/
190
191 /**
192  *  \brief Creates a new communicate action
193  *  \param type The direction of communication (comm_send, comm_recv)
194  *  \return The new communicate action
195  */
196 smx_action_t SIMIX_comm_new(e_smx_comm_type_t type)
197 {
198   smx_action_t act;
199
200   /* alloc structures */
201   act = xbt_mallocator_get(simix_global->action_mallocator);
202
203   act->type = SIMIX_ACTION_COMMUNICATE;
204   act->state = SIMIX_WAITING;
205
206   /* set communication */
207   act->comm.type = type;
208   act->comm.refcount = 1;
209
210 #ifdef HAVE_LATENCY_BOUND_TRACKING
211   //initialize with unknown value
212   act->latency_limited = -1;
213 #endif
214
215 #ifdef HAVE_TRACING
216   act->category = NULL;
217 #endif
218
219   XBT_DEBUG("Create communicate action %p", act);
220   ++smx_total_comms;
221
222   return act;
223 }
224
225 /**
226  *  \brief Destroy a communicate action
227  *  \param action The communicate action to be destroyed
228  */
229 void SIMIX_comm_destroy(smx_action_t action)
230 {
231   XBT_DEBUG("Destroy action %p (refcount: %d), state: %d",
232             action, action->comm.refcount, (int)action->state);
233
234   if (action->comm.refcount <= 0) {
235     xbt_backtrace_display_current();
236     xbt_die("The refcount of comm %p is already 0 before decreasing it. "
237             "That's a bug! If you didn't test and/or wait the same communication twice in your code, then the bug is SimGrid's...", action);
238   }
239   action->comm.refcount--;
240   if (action->comm.refcount > 0)
241     return;
242   XBT_DEBUG("Really free communication %p; refcount is now %d", action,
243             action->comm.refcount);
244
245 #ifdef HAVE_LATENCY_BOUND_TRACKING
246   action->latency_limited = SIMIX_comm_is_latency_bounded( action ) ;
247 #endif
248
249   xbt_free(action->name);
250   SIMIX_comm_destroy_internal_actions(action);
251
252   if (action->comm.detached && action->state != SIMIX_DONE) {
253     /* the communication has failed and was detached:
254      * we have to free the buffer */
255     if (action->comm.clean_fun) {
256       action->comm.clean_fun(action->comm.src_buff);
257     }
258     action->comm.src_buff = NULL;
259   }
260
261   xbt_mallocator_release(simix_global->action_mallocator, action);
262 }
263
264 void SIMIX_comm_destroy_internal_actions(smx_action_t action)
265 {
266   if (action->comm.surf_comm){
267 #ifdef HAVE_LATENCY_BOUND_TRACKING
268     action->latency_limited = SIMIX_comm_is_latency_bounded(action);
269 #endif
270     action->comm.surf_comm->model_type->action_unref(action->comm.surf_comm);
271     action->comm.surf_comm = NULL;
272   }
273
274   if (action->comm.src_timeout){
275     action->comm.src_timeout->model_type->action_unref(action->comm.src_timeout);
276     action->comm.src_timeout = NULL;
277   }
278
279   if (action->comm.dst_timeout){
280     action->comm.dst_timeout->model_type->action_unref(action->comm.dst_timeout);
281     action->comm.dst_timeout = NULL;
282   }
283 }
284
285 smx_action_t SIMIX_comm_isend(smx_process_t src_proc, smx_rdv_t rdv,
286                               double task_size, double rate,
287                               void *src_buff, size_t src_buff_size,
288                               int (*match_fun)(void *, void *,smx_action_t),
289                               void (*clean_fun)(void *), // used to free the action in case of problem after a detached send
290                               void *data,
291                               int detached)
292 {
293   XBT_DEBUG("send from %p\n", rdv);
294
295   /* Prepare an action describing us, so that it gets passed to the user-provided filter of other side */
296   smx_action_t this_action = SIMIX_comm_new(SIMIX_COMM_SEND);
297
298   /* Look for communication action matching our needs. We also provide a description of
299    * ourself so that the other side also gets a chance of choosing if it wants to match with us.
300    *
301    * If it is not found then push our communication into the rendez-vous point */
302   smx_action_t other_action = SIMIX_fifo_get_comm(rdv->comm_fifo, SIMIX_COMM_RECEIVE, match_fun, data, this_action);
303
304   if (!other_action) {
305     other_action = this_action;
306
307     if (rdv->permanent_receiver!=NULL){
308       //this mailbox is for small messages, which have to be sent right now
309       other_action->state = SIMIX_READY;
310       other_action->comm.dst_proc=rdv->permanent_receiver;
311       other_action->comm.refcount++;
312       other_action->comm.rdv = rdv;
313       xbt_fifo_push(rdv->done_comm_fifo,other_action);
314       other_action->comm.rdv=rdv;
315       XBT_DEBUG("pushing a message into the permanent receive fifo %p, comm %p \n", rdv, &(other_action->comm));
316
317     }else{
318       SIMIX_rdv_push(rdv, this_action);
319     }
320   } else {
321     XBT_DEBUG("Receive already pushed\n");
322
323     SIMIX_comm_destroy(this_action);
324     --smx_total_comms; // this creation was a pure waste
325
326     other_action->state = SIMIX_READY;
327     other_action->comm.type = SIMIX_COMM_READY;
328
329   }
330   xbt_fifo_push(src_proc->comms, other_action);
331
332   /* if the communication action is detached then decrease the refcount
333    * by one, so it will be eliminated by the receiver's destroy call */
334   if (detached) {
335     other_action->comm.detached = 1;
336     other_action->comm.refcount--;
337     other_action->comm.clean_fun = clean_fun;
338   } else {
339     other_action->comm.clean_fun = NULL;
340   }
341
342   /* Setup the communication action */
343   other_action->comm.src_proc = src_proc;
344   other_action->comm.task_size = task_size;
345   other_action->comm.rate = rate;
346   other_action->comm.src_buff = src_buff;
347   other_action->comm.src_buff_size = src_buff_size;
348   other_action->comm.src_data = data;
349
350   other_action->comm.match_fun = match_fun;
351
352   if (MC_IS_ENABLED) {
353     other_action->state = SIMIX_RUNNING;
354     return other_action;
355   }
356
357   SIMIX_comm_start(other_action);
358   return (detached ? NULL : other_action);
359 }
360
361 smx_action_t SIMIX_comm_irecv(smx_process_t dst_proc, smx_rdv_t rdv,
362                               void *dst_buff, size_t *dst_buff_size,
363                               int (*match_fun)(void *, void *, smx_action_t), void *data)
364 {
365   XBT_DEBUG("recv from %p %p\n", rdv, rdv->comm_fifo);
366   smx_action_t this_action = SIMIX_comm_new(SIMIX_COMM_RECEIVE);
367
368   smx_action_t other_action;
369   //communication already done, get it inside the fifo of completed comms
370   //permanent receive v1
371   //int already_received=0;
372   if(rdv->permanent_receiver && xbt_fifo_size(rdv->done_comm_fifo)!=0){
373
374     XBT_DEBUG("We have a comm that has probably already been received, trying to match it, to skip the communication\n");
375     //find a match in the already received fifo
376     other_action = SIMIX_fifo_get_comm(rdv->done_comm_fifo, SIMIX_COMM_SEND, match_fun, data, this_action);
377     //if not found, assume the receiver came first, register it to the mailbox in the classical way
378     if (!other_action)  {
379       XBT_DEBUG("We have messages in the permanent receive list, but not the one we are looking for, pushing request into fifo\n");
380       other_action = this_action;
381       SIMIX_rdv_push(rdv, this_action);
382     }else{
383       if(other_action->comm.surf_comm &&        SIMIX_comm_get_remains(other_action)==0.0)
384       {
385         XBT_DEBUG("comm %p has been already sent, and is finished, destroy it\n",&(other_action->comm));
386         other_action->state = SIMIX_DONE;
387         other_action->comm.type = SIMIX_COMM_DONE;
388         other_action->comm.rdv = NULL;
389         SIMIX_comm_destroy(this_action);
390         --smx_total_comms; // this creation was a pure waste
391         //already_received=1;
392         other_action->comm.refcount--;
393       }/*else{
394          XBT_DEBUG("Not yet finished, we have to wait %d\n", xbt_fifo_size(rdv->comm_fifo));
395          }*/
396       other_action->comm.refcount--;
397       SIMIX_comm_destroy(this_action);
398       --smx_total_comms; // this creation was a pure waste
399     }
400   }else{
401     /* Prepare an action describing us, so that it gets passed to the user-provided filter of other side */
402
403     /* Look for communication action matching our needs. We also provide a description of
404      * ourself so that the other side also gets a chance of choosing if it wants to match with us.
405      *
406      * If it is not found then push our communication into the rendez-vous point */
407     other_action = SIMIX_fifo_get_comm(rdv->comm_fifo, SIMIX_COMM_SEND, match_fun, data, this_action);
408
409     if (!other_action) {
410       XBT_DEBUG("Receive pushed first %d\n", xbt_fifo_size(rdv->comm_fifo));
411       other_action = this_action;
412       SIMIX_rdv_push(rdv, this_action);
413     } else {
414       SIMIX_comm_destroy(this_action);
415       --smx_total_comms; // this creation was a pure waste
416       other_action->state = SIMIX_READY;
417       other_action->comm.type = SIMIX_COMM_READY;
418     }
419     xbt_fifo_push(dst_proc->comms, other_action);
420   }
421
422   /* Setup communication action */
423   other_action->comm.dst_proc = dst_proc;
424   other_action->comm.dst_buff = dst_buff;
425   other_action->comm.dst_buff_size = dst_buff_size;
426   other_action->comm.dst_data = data;
427
428   other_action->comm.match_fun = match_fun;
429
430
431   /*if(already_received)//do the actual copy, because the first one after the comm didn't have all the info
432     SIMIX_comm_copy_data(other_action);*/
433
434
435   if (MC_IS_ENABLED) {
436     other_action->state = SIMIX_RUNNING;
437     return other_action;
438   }
439
440   SIMIX_comm_start(other_action);
441   // }
442   return other_action;
443 }
444
445 void SIMIX_pre_comm_wait(smx_simcall_t simcall, smx_action_t action, double timeout, int idx)
446 {
447
448   /* the simcall may be a wait, a send or a recv */
449   surf_action_t sleep;
450
451   /* Associate this simcall to the wait action */
452   XBT_DEBUG("SIMIX_pre_comm_wait, %p", action);
453
454   xbt_fifo_push(action->simcalls, simcall);
455   simcall->issuer->waiting_action = action;
456
457   if (MC_IS_ENABLED) {
458     if (idx == 0) {
459       action->state = SIMIX_DONE;
460     } else {
461       /* If we reached this point, the wait simcall must have a timeout */
462       /* Otherwise it shouldn't be enabled and executed by the MC */
463       if (timeout == -1)
464         THROW_IMPOSSIBLE;
465
466       if (action->comm.src_proc == simcall->issuer)
467         action->state = SIMIX_SRC_TIMEOUT;
468       else
469         action->state = SIMIX_DST_TIMEOUT;
470     }
471
472     SIMIX_comm_finish(action);
473     return;
474   }
475
476   /* If the action has already finish perform the error handling, */
477   /* otherwise set up a waiting timeout on the right side         */
478   if (action->state != SIMIX_WAITING && action->state != SIMIX_RUNNING) {
479     SIMIX_comm_finish(action);
480   } else { /* if (timeout >= 0) { we need a surf sleep action even when there is no timeout, otherwise surf won't tell us when the host fails */
481     sleep = surf_workstation_model->extension.workstation.sleep(simcall->issuer->smx_host->host, timeout);
482     surf_workstation_model->action_data_set(sleep, action);
483
484     if (simcall->issuer == action->comm.src_proc)
485       action->comm.src_timeout = sleep;
486     else
487       action->comm.dst_timeout = sleep;
488   }
489 }
490
491 void SIMIX_pre_comm_test(smx_simcall_t simcall)
492 {
493   smx_action_t action = simcall->comm_test.comm;
494
495   if(MC_IS_ENABLED){
496     simcall->comm_test.result = action->comm.src_proc && action->comm.dst_proc;
497     if(simcall->comm_test.result){
498       action->state = SIMIX_DONE;
499       xbt_fifo_push(action->simcalls, simcall);
500       SIMIX_comm_finish(action);
501     }else{
502       SIMIX_simcall_answer(simcall);
503     }
504     return;
505   }
506
507   simcall->comm_test.result = (action->state != SIMIX_WAITING && action->state != SIMIX_RUNNING);
508   if (simcall->comm_test.result) {
509     xbt_fifo_push(action->simcalls, simcall);
510     SIMIX_comm_finish(action);
511   } else {
512     SIMIX_simcall_answer(simcall);
513   }
514 }
515
516 void SIMIX_pre_comm_testany(smx_simcall_t simcall, int idx)
517 {
518   unsigned int cursor;
519   smx_action_t action;
520   xbt_dynar_t actions = simcall->comm_testany.comms;
521   simcall->comm_testany.result = -1;
522
523   if (MC_IS_ENABLED){
524     if(idx == -1){
525       SIMIX_simcall_answer(simcall);
526     }else{
527       action = xbt_dynar_get_as(actions, idx, smx_action_t);
528       simcall->comm_testany.result = idx;
529       xbt_fifo_push(action->simcalls, simcall);
530       action->state = SIMIX_DONE;
531       SIMIX_comm_finish(action);
532     }
533     return;
534   }
535
536   xbt_dynar_foreach(simcall->comm_testany.comms,cursor,action) {
537     if (action->state != SIMIX_WAITING && action->state != SIMIX_RUNNING) {
538       simcall->comm_testany.result = cursor;
539       xbt_fifo_push(action->simcalls, simcall);
540       SIMIX_comm_finish(action);
541       return;
542     }
543   }
544   SIMIX_simcall_answer(simcall);
545 }
546
547 void SIMIX_pre_comm_waitany(smx_simcall_t simcall, int idx)
548 {
549   smx_action_t action;
550   unsigned int cursor = 0;
551   xbt_dynar_t actions = simcall->comm_waitany.comms;
552
553   if (MC_IS_ENABLED){
554     action = xbt_dynar_get_as(actions, idx, smx_action_t);
555     xbt_fifo_push(action->simcalls, simcall);
556     simcall->comm_waitany.result = idx;
557     action->state = SIMIX_DONE;
558     SIMIX_comm_finish(action);
559     return;
560   }
561
562   xbt_dynar_foreach(actions, cursor, action){
563     /* associate this simcall to the the action */
564     xbt_fifo_push(action->simcalls, simcall);
565
566     /* see if the action is already finished */
567     if (action->state != SIMIX_WAITING && action->state != SIMIX_RUNNING){
568       SIMIX_comm_finish(action);
569       break;
570     }
571   }
572 }
573
574 void SIMIX_waitany_remove_simcall_from_actions(smx_simcall_t simcall)
575 {
576   smx_action_t action;
577   unsigned int cursor = 0;
578   xbt_dynar_t actions = simcall->comm_waitany.comms;
579
580   xbt_dynar_foreach(actions, cursor, action) {
581     xbt_fifo_remove(action->simcalls, simcall);
582   }
583 }
584
585 /**
586  *  \brief Starts the simulation of a communication action.
587  *  \param action the communication action
588  */
589 XBT_INLINE void SIMIX_comm_start(smx_action_t action)
590 {
591   /* If both the sender and the receiver are already there, start the communication */
592   if (action->state == SIMIX_READY) {
593
594     smx_host_t sender = action->comm.src_proc->smx_host;
595     smx_host_t receiver = action->comm.dst_proc->smx_host;
596
597     XBT_DEBUG("Starting communication %p from '%s' to '%s'", action,
598               SIMIX_host_get_name(sender), SIMIX_host_get_name(receiver));
599
600     action->comm.surf_comm = surf_workstation_model->extension.workstation.
601       communicate(sender->host, receiver->host, action->comm.task_size, action->comm.rate);
602
603     surf_workstation_model->action_data_set(action->comm.surf_comm, action);
604
605     action->state = SIMIX_RUNNING;
606
607     /* If a link is failed, detect it immediately */
608     if (surf_workstation_model->action_state_get(action->comm.surf_comm) == SURF_ACTION_FAILED) {
609       XBT_DEBUG("Communication from '%s' to '%s' failed to start because of a link failure",
610                 SIMIX_host_get_name(sender), SIMIX_host_get_name(receiver));
611       action->state = SIMIX_LINK_FAILURE;
612       SIMIX_comm_destroy_internal_actions(action);
613     }
614
615     /* If any of the process is suspend, create the action but stop its execution,
616        it will be restarted when the sender process resume */
617     if (SIMIX_process_is_suspended(action->comm.src_proc) ||
618         SIMIX_process_is_suspended(action->comm.dst_proc)) {
619       /* FIXME: check what should happen with the action state */
620
621       if (SIMIX_process_is_suspended(action->comm.src_proc))
622         XBT_DEBUG("The communication is suspended on startup because src (%s:%s) were suspended since it initiated the communication",
623                   SIMIX_host_get_name(action->comm.src_proc->smx_host), action->comm.src_proc->name);
624       else
625         XBT_DEBUG("The communication is suspended on startup because dst (%s:%s) were suspended since it initiated the communication",
626                   SIMIX_host_get_name(action->comm.dst_proc->smx_host), action->comm.dst_proc->name);
627
628       surf_workstation_model->suspend(action->comm.surf_comm);
629
630     }
631   }
632 }
633
634 /**
635  * \brief Answers the SIMIX simcalls associated to a communication action.
636  * \param action a finished communication action
637  */
638 void SIMIX_comm_finish(smx_action_t action)
639 {
640   unsigned int destroy_count = 0;
641   smx_simcall_t simcall;
642
643   while ((simcall = xbt_fifo_shift(action->simcalls))) {
644
645     /* If a waitany simcall is waiting for this action to finish, then remove
646        it from the other actions in the waitany list. Afterwards, get the
647        position of the actual action in the waitany dynar and
648        return it as the result of the simcall */
649     if (simcall->call == SIMCALL_COMM_WAITANY) {
650       SIMIX_waitany_remove_simcall_from_actions(simcall);
651       if (!MC_IS_ENABLED)
652         simcall->comm_waitany.result = xbt_dynar_search(simcall->comm_waitany.comms, &action);
653     }
654
655     /* If the action is still in a rendez-vous point then remove from it */
656     if (action->comm.rdv)
657       SIMIX_rdv_remove(action->comm.rdv, action);
658
659     XBT_DEBUG("SIMIX_comm_finish: action state = %d", (int)action->state);
660
661     /* Check out for errors */
662     switch (action->state) {
663
664     case SIMIX_DONE:
665       XBT_DEBUG("Communication %p complete!", action);
666       SIMIX_comm_copy_data(action);
667       break;
668
669     case SIMIX_SRC_TIMEOUT:
670       SMX_EXCEPTION(simcall->issuer, timeout_error, 0,
671                     "Communication timeouted because of sender");
672       break;
673
674     case SIMIX_DST_TIMEOUT:
675       SMX_EXCEPTION(simcall->issuer, timeout_error, 0,
676                     "Communication timeouted because of receiver");
677       break;
678
679     case SIMIX_SRC_HOST_FAILURE:
680       if (simcall->issuer == action->comm.src_proc)
681         simcall->issuer->context->iwannadie = 1;
682 //          SMX_EXCEPTION(simcall->issuer, host_error, 0, "Host failed");
683       else
684         SMX_EXCEPTION(simcall->issuer, network_error, 0, "Remote peer failed");
685       break;
686
687     case SIMIX_DST_HOST_FAILURE:
688       if (simcall->issuer == action->comm.dst_proc)
689         simcall->issuer->context->iwannadie = 1;
690 //          SMX_EXCEPTION(simcall->issuer, host_error, 0, "Host failed");
691       else
692         SMX_EXCEPTION(simcall->issuer, network_error, 0, "Remote peer failed");
693       break;
694
695     case SIMIX_LINK_FAILURE:
696       XBT_DEBUG("Link failure in action %p between '%s' and '%s': posting an exception to the issuer: %s (%p) detached:%d",
697                 action,
698                 action->comm.src_proc ? action->comm.src_proc->smx_host->name : NULL,
699                 action->comm.dst_proc ? action->comm.dst_proc->smx_host->name : NULL,
700                 simcall->issuer->name, simcall->issuer, action->comm.detached);
701       if (action->comm.src_proc == simcall->issuer) {
702         XBT_DEBUG("I'm source");
703       } else if (action->comm.dst_proc == simcall->issuer) {
704         XBT_DEBUG("I'm dest");
705       } else {
706         XBT_DEBUG("I'm neither source nor dest");
707       }
708       SMX_EXCEPTION(simcall->issuer, network_error, 0, "Link failure");
709       break;
710
711     case SIMIX_CANCELED:
712       if (simcall->issuer == action->comm.dst_proc)
713         SMX_EXCEPTION(simcall->issuer, cancel_error, 0,
714                       "Communication canceled by the sender");
715       else
716         SMX_EXCEPTION(simcall->issuer, cancel_error, 0,
717                       "Communication canceled by the receiver");
718       break;
719
720     default:
721       xbt_die("Unexpected action state in SIMIX_comm_finish: %d", (int)action->state);
722     }
723
724     /* if there is an exception during a waitany or a testany, indicate the position of the failed communication */
725     if (simcall->issuer->doexception) {
726       if (simcall->call == SIMCALL_COMM_WAITANY) {
727         simcall->issuer->running_ctx->exception.value = xbt_dynar_search(simcall->comm_waitany.comms, &action);
728       }
729       else if (simcall->call == SIMCALL_COMM_TESTANY) {
730         simcall->issuer->running_ctx->exception.value = xbt_dynar_search(simcall->comm_testany.comms, &action);
731       }
732     }
733
734     if (surf_workstation_model->extension.
735         workstation.get_state(simcall->issuer->smx_host->host) != SURF_RESOURCE_ON) {
736       simcall->issuer->context->iwannadie = 1;
737     }
738
739     simcall->issuer->waiting_action = NULL;
740     xbt_fifo_remove(simcall->issuer->comms, action);
741     SIMIX_simcall_answer(simcall);
742     destroy_count++;
743   }
744
745   while (destroy_count-- > 0)
746     SIMIX_comm_destroy(action);
747 }
748
749 /**
750  * \brief This function is called when a Surf communication action is finished.
751  * \param action the corresponding Simix communication
752  */
753 void SIMIX_post_comm(smx_action_t action)
754 {
755   /* Update action state */
756   if (action->comm.src_timeout &&
757       surf_workstation_model->action_state_get(action->comm.src_timeout) == SURF_ACTION_DONE)
758     action->state = SIMIX_SRC_TIMEOUT;
759   else if (action->comm.dst_timeout &&
760            surf_workstation_model->action_state_get(action->comm.dst_timeout) == SURF_ACTION_DONE)
761     action->state = SIMIX_DST_TIMEOUT;
762   else if (action->comm.src_timeout &&
763            surf_workstation_model->action_state_get(action->comm.src_timeout) == SURF_ACTION_FAILED)
764     action->state = SIMIX_SRC_HOST_FAILURE;
765   else if (action->comm.dst_timeout &&
766            surf_workstation_model->action_state_get(action->comm.dst_timeout) == SURF_ACTION_FAILED)
767     action->state = SIMIX_DST_HOST_FAILURE;
768   else if (action->comm.surf_comm &&
769            surf_workstation_model->action_state_get(action->comm.surf_comm) == SURF_ACTION_FAILED) {
770     XBT_DEBUG("Puta madre. Surf says that the link broke");
771     action->state = SIMIX_LINK_FAILURE;
772   } else
773     action->state = SIMIX_DONE;
774
775   XBT_DEBUG("SIMIX_post_comm: comm %p, state %d, src_proc %p, dst_proc %p, detached: %d",
776             action, (int)action->state, action->comm.src_proc, action->comm.dst_proc, action->comm.detached);
777
778   /* destroy the surf actions associated with the Simix communication */
779   SIMIX_comm_destroy_internal_actions(action);
780
781   /* remove the communication action from the list of pending communications
782    * of both processes (if they still exist) */
783   if (action->comm.src_proc) {
784     xbt_fifo_remove(action->comm.src_proc->comms, action);
785   }
786   if (action->comm.dst_proc) {
787     xbt_fifo_remove(action->comm.dst_proc->comms, action);
788   }
789
790   /* if there are simcalls associated with the action, then answer them */
791   if (xbt_fifo_size(action->simcalls)) {
792     SIMIX_comm_finish(action);
793   }
794 }
795
796 void SIMIX_comm_cancel(smx_action_t action)
797 {
798   /* if the action is a waiting state means that it is still in a rdv */
799   /* so remove from it and delete it */
800   if (action->state == SIMIX_WAITING) {
801     SIMIX_rdv_remove(action->comm.rdv, action);
802     action->state = SIMIX_CANCELED;
803   }
804   else if (!MC_IS_ENABLED /* when running the MC there are no surf actions */
805            && (action->state == SIMIX_READY || action->state == SIMIX_RUNNING)) {
806
807     surf_workstation_model->action_cancel(action->comm.surf_comm);
808   }
809 }
810
811 void SIMIX_comm_suspend(smx_action_t action)
812 {
813   /*FIXME: shall we suspend also the timeout actions? */
814   if (action->comm.surf_comm)
815     surf_workstation_model->suspend(action->comm.surf_comm);
816   /* in the other case, the action will be suspended on creation, in SIMIX_comm_start() */
817 }
818
819 void SIMIX_comm_resume(smx_action_t action)
820 {
821   /*FIXME: check what happen with the timeouts */
822   if (action->comm.surf_comm)
823     surf_workstation_model->resume(action->comm.surf_comm);
824   /* in the other case, the action were not really suspended yet, see SIMIX_comm_suspend() and SIMIX_comm_start() */
825 }
826
827
828 /************* Action Getters **************/
829
830 /**
831  *  \brief get the amount remaining from the communication
832  *  \param action The communication
833  */
834 double SIMIX_comm_get_remains(smx_action_t action)
835 {
836   double remains;
837
838   if(!action){
839     return 0;
840   }
841
842   switch (action->state) {
843
844   case SIMIX_RUNNING:
845     remains = surf_workstation_model->get_remains(action->comm.surf_comm);
846     break;
847
848   case SIMIX_WAITING:
849   case SIMIX_READY:
850     remains = 0; /*FIXME: check what should be returned */
851     break;
852
853   default:
854     remains = 0; /*FIXME: is this correct? */
855     break;
856   }
857   return remains;
858 }
859
860 e_smx_state_t SIMIX_comm_get_state(smx_action_t action)
861 {
862   return action->state;
863 }
864
865 /**
866  *  \brief Return the user data associated to the sender of the communication
867  *  \param action The communication
868  *  \return the user data
869  */
870 void* SIMIX_comm_get_src_data(smx_action_t action)
871 {
872   return action->comm.src_data;
873 }
874
875 /**
876  *  \brief Return the user data associated to the receiver of the communication
877  *  \param action The communication
878  *  \return the user data
879  */
880 void* SIMIX_comm_get_dst_data(smx_action_t action)
881 {
882   return action->comm.dst_data;
883 }
884
885 smx_process_t SIMIX_comm_get_src_proc(smx_action_t action)
886 {
887   return action->comm.src_proc;
888 }
889
890 smx_process_t SIMIX_comm_get_dst_proc(smx_action_t action)
891 {
892   return action->comm.dst_proc;
893 }
894
#ifdef HAVE_LATENCY_BOUND_TRACKING
/**
 *  \brief Tell whether the communication is latency bounded
 *  \param action The communication (may be NULL)
 *  \return 0 when action is NULL; otherwise the latency_limited field of the
 *          action, refreshed from surf first if a surf communication exists
 */
XBT_INLINE int SIMIX_comm_is_latency_bounded(smx_action_t action)
{
  if (action == NULL)
    return 0;

  /* refresh the cached value from surf while the surf action is available */
  if (action->comm.surf_comm != NULL) {
    XBT_DEBUG("Getting latency limited for surf_action (%p)", action->comm.surf_comm);
    action->latency_limited =
        surf_workstation_model->get_latency_limited(action->comm.surf_comm);
    XBT_DEBUG("Action limited is %d", action->latency_limited);
  }

  return action->latency_limited;
}
#endif
913
914 /******************************************************************************/
915 /*                    SIMIX_comm_copy_data callbacks                       */
916 /******************************************************************************/
917 static void (*SIMIX_comm_copy_data_callback) (smx_action_t, void*, size_t) =
918   &SIMIX_comm_copy_pointer_callback;
919
920 void
921 SIMIX_comm_set_copy_data_callback(void (*callback) (smx_action_t, void*, size_t))
922 {
923   SIMIX_comm_copy_data_callback = callback;
924 }
925
926 void SIMIX_comm_copy_pointer_callback(smx_action_t comm, void* buff, size_t buff_size)
927 {
928   xbt_assert((buff_size == sizeof(void *)),
929              "Cannot copy %zu bytes: must be sizeof(void*)", buff_size);
930   *(void **) (comm->comm.dst_buff) = buff;
931 }
932
933 void SIMIX_comm_copy_buffer_callback(smx_action_t comm, void* buff, size_t buff_size)
934 {
935   XBT_DEBUG("Copy the data over");
936   memcpy(comm->comm.dst_buff, buff, buff_size);
937   if (comm->comm.detached) { // if this is a detached send, the source buffer was duplicated by SMPI sender to make the original buffer available to the application ASAP
938     xbt_free(buff);
939     comm->comm.src_buff = NULL;
940   }
941 }
942
943
944 /**
945  *  \brief Copy the communication data from the sender's buffer to the receiver's one
946  *  \param comm The communication
947  */
948 void SIMIX_comm_copy_data(smx_action_t comm)
949 {
950   size_t buff_size = comm->comm.src_buff_size;
951   /* If there is no data to be copy then return */
952   if (!comm->comm.src_buff || !comm->comm.dst_buff || comm->comm.copied)
953     return;
954
955   XBT_DEBUG("Copying comm %p data from %s (%p) -> %s (%p) (%zu bytes)",
956             comm,
957             comm->comm.src_proc ? comm->comm.src_proc->smx_host->name : "a finished process",
958             comm->comm.src_buff,
959             comm->comm.dst_proc ? comm->comm.dst_proc->smx_host->name : "a finished process",
960             comm->comm.dst_buff, buff_size);
961
962   /* Copy at most dst_buff_size bytes of the message to receiver's buffer */
963   if (comm->comm.dst_buff_size)
964     buff_size = MIN(buff_size, *(comm->comm.dst_buff_size));
965
966   /* Update the receiver's buffer size to the copied amount */
967   if (comm->comm.dst_buff_size)
968     *comm->comm.dst_buff_size = buff_size;
969
970   if (buff_size > 0)
971     SIMIX_comm_copy_data_callback (comm, comm->comm.src_buff, buff_size);
972
973   /* Set the copied flag so we copy data only once */
974   /* (this function might be called from both communication ends) */
975   comm->comm.copied = 1;
976 }