Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
add an immediate asynchronous send possibility for messages. This can be set by addin...
[simgrid.git] / src / simix / smx_network.c
1 /* Copyright (c) 2009, 2010. The SimGrid Team.
2  * All rights reserved.                                                     */
3
4 /* This program is free software; you can redistribute it and/or modify it
5  * under the terms of the license (GNU LGPL) which comes with this package. */
6
7 #include "smx_private.h"
8 #include "xbt/log.h"
9 #include "mc/mc.h"
10 #include "xbt/dict.h"
11
12 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(simix_network, simix,
13                                 "Logging specific to SIMIX (network)");
14
15 static xbt_dict_t rdv_points = NULL;
16 unsigned long int smx_total_comms = 0;
17
18 static void SIMIX_waitany_remove_simcall_from_actions(smx_simcall_t simcall);
19 static void SIMIX_comm_copy_data(smx_action_t comm);
20 static smx_action_t SIMIX_comm_new(e_smx_comm_type_t type);
21 static XBT_INLINE void SIMIX_rdv_push(smx_rdv_t rdv, smx_action_t comm);
22 static smx_action_t SIMIX_fifo_get_comm(xbt_fifo_t fifo, e_smx_comm_type_t type,
23             int (*match_fun)(void *, void *,smx_action_t),
24             void *user_data, smx_action_t my_action);
25 static void SIMIX_rdv_free(void *data);
26
27 void SIMIX_network_init(void)
28 {
29   rdv_points = xbt_dict_new_homogeneous(SIMIX_rdv_free);
30 }
31
32 void SIMIX_network_exit(void)
33 {
34   xbt_dict_free(&rdv_points);
35 }
36
37 /******************************************************************************/
38 /*                           Rendez-Vous Points                               */
39 /******************************************************************************/
40
41 smx_rdv_t SIMIX_rdv_create(const char *name)
42 {
43   /* two processes may have pushed the same rdv_create simcall at the same time */
44   smx_rdv_t rdv = name ? xbt_dict_get_or_null(rdv_points, name) : NULL;
45
46   if (!rdv) {
47     rdv = xbt_new0(s_smx_rvpoint_t, 1);
48     rdv->name = name ? xbt_strdup(name) : NULL;
49     rdv->comm_fifo = xbt_fifo_new();
50     rdv->done_comm_fifo = xbt_fifo_new();
51     rdv->permanent_receiver=NULL;
52
53         XBT_DEBUG("Creating a mailbox at %p with name %s\n", rdv, name);
54
55     if (rdv->name)
56       xbt_dict_set(rdv_points, rdv->name, rdv, NULL);
57   }
58   return rdv;
59 }
60
61 void SIMIX_rdv_destroy(smx_rdv_t rdv)
62 {
63   if (rdv->name)
64     xbt_dict_remove(rdv_points, rdv->name);
65 }
66
67 void SIMIX_rdv_free(void *data)
68 {
69   XBT_DEBUG("rdv free %p", data);
70   smx_rdv_t rdv = (smx_rdv_t) data;
71   xbt_free(rdv->name);
72   xbt_fifo_free(rdv->comm_fifo);
73   xbt_fifo_free(rdv->done_comm_fifo);
74
75   xbt_free(rdv);  
76 }
77
78 xbt_dict_t SIMIX_get_rdv_points()
79 {
80   return rdv_points;
81 }
82
83 smx_rdv_t SIMIX_rdv_get_by_name(const char *name)
84 {
85   return xbt_dict_get_or_null(rdv_points, name);
86 }
87
88 int SIMIX_rdv_comm_count_by_host(smx_rdv_t rdv, smx_host_t host)
89 {
90   smx_action_t comm = NULL;
91   xbt_fifo_item_t item = NULL;
92   int count = 0;
93
94   xbt_fifo_foreach(rdv->comm_fifo, item, comm, smx_action_t) {
95     if (comm->comm.src_proc->smx_host == host)
96       count++;
97   }
98
99   return count;
100 }
101
102 smx_action_t SIMIX_rdv_get_head(smx_rdv_t rdv)
103 {
104   return xbt_fifo_get_item_content(xbt_fifo_get_first_item(rdv->comm_fifo));
105 }
106
107 /**
108  *  \brief Pushes a communication action into a rendez-vous point
109  *  \param rdv The rendez-vous point
110  *  \param comm The communication action
111  */
112 static XBT_INLINE void SIMIX_rdv_push(smx_rdv_t rdv, smx_action_t comm)
113 {
114   xbt_fifo_push(rdv->comm_fifo, comm);
115   comm->comm.rdv = rdv;
116 }
117
118 /**
119  *  \brief Removes a communication action from a rendez-vous point
120  *  \param rdv The rendez-vous point
121  *  \param comm The communication action
122  */
123 XBT_INLINE void SIMIX_rdv_remove(smx_rdv_t rdv, smx_action_t comm)
124 {
125   xbt_fifo_remove(rdv->comm_fifo, comm);
126   comm->comm.rdv = NULL;
127 }
128
129 /**
130  *  \brief Checks if there is a communication action queued in a fifo matching our needs
131  *  \param type The type of communication we are looking for (comm_send, comm_recv)
132  *  \return The communication action if found, NULL otherwise
133  */
134 smx_action_t SIMIX_fifo_get_comm(xbt_fifo_t fifo, e_smx_comm_type_t type,
135                                    int (*match_fun)(void *, void *,smx_action_t),
136                                    void *this_user_data, smx_action_t my_action)
137 {
138   smx_action_t action;
139   xbt_fifo_item_t item;
140   void* other_user_data = NULL;
141
142   xbt_fifo_foreach(fifo, item, action, smx_action_t) {
143     if (action->comm.type == SIMIX_COMM_SEND) {
144       other_user_data = action->comm.src_data;
145     } else if (action->comm.type == SIMIX_COMM_RECEIVE) {
146       other_user_data = action->comm.dst_data;
147     }
148     if (action->comm.type == type &&
149         (!match_fun              ||              match_fun(this_user_data,  other_user_data, action)) &&
150         (!action->comm.match_fun || action->comm.match_fun(other_user_data, this_user_data,  my_action))) {
151       XBT_DEBUG("Found a matching communication action %p", action);
152       xbt_fifo_remove_item(fifo, item);
153       xbt_fifo_free_item(item);
154       action->comm.refcount++;
155       action->comm.rdv = NULL;
156       return action;
157     }
158     XBT_DEBUG("Sorry, communication action %p does not match our needs:"
159            " its type is %d but we are looking for a comm of type %d (or maybe the filtering didn't match)",
160            action, (int)action->comm.type, (int)type);
161   }
162   XBT_DEBUG("No matching communication action found");
163   return NULL;
164 }
165
166
167 /******************************************************************************/
168 /*                            Communication Actions                            */
169 /******************************************************************************/
170
171 /**
172  *  \brief Creates a new communicate action
173  *  \param type The direction of communication (comm_send, comm_recv)
174  *  \return The new communicate action
175  */
176 smx_action_t SIMIX_comm_new(e_smx_comm_type_t type)
177 {
178   smx_action_t act;
179
180   /* alloc structures */
181   act = xbt_mallocator_get(simix_global->action_mallocator);
182
183   act->type = SIMIX_ACTION_COMMUNICATE;
184   act->state = SIMIX_WAITING;
185
186   /* set communication */
187   act->comm.type = type;
188   act->comm.refcount = 1;
189
190 #ifdef HAVE_LATENCY_BOUND_TRACKING
191   //initialize with unknown value
192   act->latency_limited = -1;
193 #endif
194
195 #ifdef HAVE_TRACING
196   act->category = NULL;
197 #endif
198
199   XBT_DEBUG("Create communicate action %p", act);
200   ++smx_total_comms;
201
202   return act;
203 }
204
205 /**
206  *  \brief Destroy a communicate action
207  *  \param action The communicate action to be destroyed
208  */
209 void SIMIX_comm_destroy(smx_action_t action)
210 {
211   XBT_DEBUG("Destroy action %p (refcount: %d), state: %d",
212             action, action->comm.refcount, (int)action->state);
213
214   if (action->comm.refcount <= 0) {
215     xbt_backtrace_display_current();
216     xbt_die("The refcount of comm %p is already 0 before decreasing it. "
217             "That's a bug! If you didn't test and/or wait the same communication twice in your code, then the bug is SimGrid's...", action);
218   }
219   action->comm.refcount--;
220   if (action->comm.refcount > 0)
221     return;
222   XBT_DEBUG("Really free communication %p; refcount is now %d", action,
223         action->comm.refcount);
224
225 #ifdef HAVE_LATENCY_BOUND_TRACKING
226     action->latency_limited = SIMIX_comm_is_latency_bounded( action ) ;
227 #endif
228
229   xbt_free(action->name);
230   SIMIX_comm_destroy_internal_actions(action);
231
232   if (action->comm.detached && action->state != SIMIX_DONE) {
233     /* the communication has failed and was detached:
234      * we have to free the buffer */
235     if (action->comm.clean_fun) {
236       action->comm.clean_fun(action->comm.src_buff);
237     }
238     action->comm.src_buff = NULL;
239   }
240
241   xbt_mallocator_release(simix_global->action_mallocator, action);
242 }
243
244 void SIMIX_comm_destroy_internal_actions(smx_action_t action)
245 {
246   if (action->comm.surf_comm){
247 #ifdef HAVE_LATENCY_BOUND_TRACKING
248     action->latency_limited = SIMIX_comm_is_latency_bounded(action);
249 #endif
250     action->comm.surf_comm->model_type->action_unref(action->comm.surf_comm);
251     action->comm.surf_comm = NULL;
252   }
253
254   if (action->comm.src_timeout){
255     action->comm.src_timeout->model_type->action_unref(action->comm.src_timeout);
256     action->comm.src_timeout = NULL;
257   }
258
259   if (action->comm.dst_timeout){
260     action->comm.dst_timeout->model_type->action_unref(action->comm.dst_timeout);
261     action->comm.dst_timeout = NULL;
262   }
263 }
264
265 smx_action_t SIMIX_comm_isend(smx_process_t src_proc, smx_rdv_t rdv,
266                               double task_size, double rate,
267                               void *src_buff, size_t src_buff_size,
268                               int (*match_fun)(void *, void *,smx_action_t),
269                               void (*clean_fun)(void *), // used to free the action in case of problem after a detached send
270                               void *data,
271                               int detached)
272 {
273   XBT_DEBUG("send from %p\n", rdv);
274
275   /* Prepare an action describing us, so that it gets passed to the user-provided filter of other side */
276   smx_action_t this_action = SIMIX_comm_new(SIMIX_COMM_SEND);
277
278   /* Look for communication action matching our needs. We also provide a description of
279    * ourself so that the other side also gets a chance of choosing if it wants to match with us.
280    *
281    * If it is not found then push our communication into the rendez-vous point */
282   smx_action_t other_action = SIMIX_fifo_get_comm(rdv->comm_fifo, SIMIX_COMM_RECEIVE, match_fun, data, this_action);
283
284   if (!other_action) {
285     other_action = this_action;
286
287     if (rdv->permanent_receiver!=NULL){
288           //this mailbox is for small messages, which have to be sent right now
289           other_action->state = SIMIX_READY;
290           other_action->comm.dst_proc=rdv->permanent_receiver;
291           other_action->comm.refcount++;
292           other_action->comm.rdv = rdv;
293           xbt_fifo_push(rdv->done_comm_fifo,other_action);
294       other_action->comm.rdv=rdv;
295           XBT_DEBUG("pushing a message into the permanent receive fifo %p, comm %p \n", rdv, &(other_action->comm));
296
297     }else{
298       SIMIX_rdv_push(rdv, this_action);
299         }
300   } else {
301         XBT_DEBUG("Receive already pushed\n");
302
303     SIMIX_comm_destroy(this_action);
304     --smx_total_comms; // this creation was a pure waste
305
306     other_action->state = SIMIX_READY;
307     other_action->comm.type = SIMIX_COMM_READY;
308
309   }
310   xbt_fifo_push(src_proc->comms, other_action);
311
312   /* if the communication action is detached then decrease the refcount
313    * by one, so it will be eliminated by the receiver's destroy call */
314   if (detached) {
315     other_action->comm.detached = 1;
316     other_action->comm.refcount--;
317     other_action->comm.clean_fun = clean_fun;
318   } else {
319     other_action->comm.clean_fun = NULL;
320   }
321
322   /* Setup the communication action */
323   other_action->comm.src_proc = src_proc;
324   other_action->comm.task_size = task_size;
325   other_action->comm.rate = rate;
326   other_action->comm.src_buff = src_buff;
327   other_action->comm.src_buff_size = src_buff_size;
328   other_action->comm.src_data = data;
329
330   other_action->comm.match_fun = match_fun;
331
332   if (MC_IS_ENABLED) {
333     other_action->state = SIMIX_RUNNING;
334     return other_action;
335   }
336
337   SIMIX_comm_start(other_action);
338   return (detached ? NULL : other_action);
339 }
340
341 smx_action_t SIMIX_comm_irecv(smx_process_t dst_proc, smx_rdv_t rdv,
342                       void *dst_buff, size_t *dst_buff_size,
343                       int (*match_fun)(void *, void *, smx_action_t), void *data)
344 {
345         XBT_DEBUG("recv from %p %p\n", rdv, rdv->comm_fifo);
346         smx_action_t this_action = SIMIX_comm_new(SIMIX_COMM_RECEIVE);
347
348         smx_action_t other_action;
349   //communication already done, get it inside the fifo of completed comms
350   //permanent receive v1
351   //int already_received=0;
352   if(rdv->permanent_receiver && xbt_fifo_size(rdv->done_comm_fifo)!=0){
353
354           XBT_DEBUG("We have a comm that has probably already been received, trying to match it, to skip the communication\n");
355           //find a match in the already received fifo
356           other_action = SIMIX_fifo_get_comm(rdv->done_comm_fifo, SIMIX_COMM_SEND, match_fun, data, this_action);
357           //if not found, assume the receiver came first, register it to the mailbox in the classical way
358           if (!other_action)  {
359                 XBT_DEBUG("We have messages in the permanent receive list, but not the one we are looking for, pushing request into fifo\n");
360                 other_action = this_action;
361                 SIMIX_rdv_push(rdv, this_action);
362           }else{
363                 if(other_action->comm.surf_comm &&      SIMIX_comm_get_remains(other_action)==0.0)
364                 {
365                   XBT_DEBUG("comm %p has been already sent, and is finished, destroy it\n",&(other_action->comm));
366                   other_action->state = SIMIX_DONE;
367                   other_action->comm.type = SIMIX_COMM_DONE;
368                   other_action->comm.rdv = NULL;
369                   SIMIX_comm_destroy(this_action);
370                   --smx_total_comms; // this creation was a pure waste
371                   //already_received=1;
372                   other_action->comm.refcount--;
373                 }/*else{
374                   XBT_DEBUG("Not yet finished, we have to wait %d\n", xbt_fifo_size(rdv->comm_fifo));
375                 }*/
376                 other_action->comm.refcount--;
377                 SIMIX_comm_destroy(this_action);
378                 --smx_total_comms; // this creation was a pure waste
379           }
380   }else{
381           /* Prepare an action describing us, so that it gets passed to the user-provided filter of other side */
382
383           /* Look for communication action matching our needs. We also provide a description of
384            * ourself so that the other side also gets a chance of choosing if it wants to match with us.
385            *
386            * If it is not found then push our communication into the rendez-vous point */
387           other_action = SIMIX_fifo_get_comm(rdv->comm_fifo, SIMIX_COMM_SEND, match_fun, data, this_action);
388
389           if (!other_action) {
390                 XBT_DEBUG("Receive pushed first %d\n", xbt_fifo_size(rdv->comm_fifo));
391                 other_action = this_action;
392                 SIMIX_rdv_push(rdv, this_action);
393           } else {
394                 SIMIX_comm_destroy(this_action);
395                 --smx_total_comms; // this creation was a pure waste
396                 other_action->state = SIMIX_READY;
397                 other_action->comm.type = SIMIX_COMM_READY;
398                 xbt_fifo_push(dst_proc->comms, other_action);
399
400           }
401   }
402
403
404           /* Setup communication action */
405           other_action->comm.dst_proc = dst_proc;
406           other_action->comm.dst_buff = dst_buff;
407           other_action->comm.dst_buff_size = dst_buff_size;
408           other_action->comm.dst_data = data;
409
410           other_action->comm.match_fun = match_fun;
411
412
413           /*if(already_received)//do the actual copy, because the first one after the comm didn't have all the info
414           SIMIX_comm_copy_data(other_action);*/
415
416
417           if (MC_IS_ENABLED) {
418                 other_action->state = SIMIX_RUNNING;
419                 return other_action;
420           }
421
422           SIMIX_comm_start(other_action);
423  // }
424   return other_action;
425 }
426
427 void SIMIX_pre_comm_wait(smx_simcall_t simcall, smx_action_t action, double timeout, int idx)
428 {
429
430   /* the simcall may be a wait, a send or a recv */
431   surf_action_t sleep;
432
433   /* Associate this simcall to the wait action */
434   XBT_DEBUG("SIMIX_pre_comm_wait, %p", action);
435
436   xbt_fifo_push(action->simcalls, simcall);
437   simcall->issuer->waiting_action = action;
438
439   if (MC_IS_ENABLED) {
440     if (idx == 0) {
441       action->state = SIMIX_DONE;
442     } else {
443       /* If we reached this point, the wait simcall must have a timeout */
444       /* Otherwise it shouldn't be enabled and executed by the MC */
445       if (timeout == -1)
446         THROW_IMPOSSIBLE;
447
448       if (action->comm.src_proc == simcall->issuer)
449         action->state = SIMIX_SRC_TIMEOUT;
450       else
451         action->state = SIMIX_DST_TIMEOUT;
452     }
453
454     SIMIX_comm_finish(action);
455     return;
456   }
457
458   /* If the action has already finish perform the error handling, */
459   /* otherwise set up a waiting timeout on the right side         */
460   if (action->state != SIMIX_WAITING && action->state != SIMIX_RUNNING) {
461     SIMIX_comm_finish(action);
462   } else { /* if (timeout >= 0) { we need a surf sleep action even when there is no timeout, otherwise surf won't tell us when the host fails */
463     sleep = surf_workstation_model->extension.workstation.sleep(simcall->issuer->smx_host->host, timeout);
464     surf_workstation_model->action_data_set(sleep, action);
465
466     if (simcall->issuer == action->comm.src_proc)
467       action->comm.src_timeout = sleep;
468     else
469       action->comm.dst_timeout = sleep;
470   }
471 }
472
473 void SIMIX_pre_comm_test(smx_simcall_t simcall)
474 {
475   smx_action_t action = simcall->comm_test.comm;
476
477   if(MC_IS_ENABLED){
478     simcall->comm_test.result = action->comm.src_proc && action->comm.dst_proc;
479     if(simcall->comm_test.result){
480       action->state = SIMIX_DONE;
481       xbt_fifo_push(action->simcalls, simcall);
482       SIMIX_comm_finish(action);
483     }else{
484       SIMIX_simcall_answer(simcall);
485     }
486     return;
487   }
488
489   simcall->comm_test.result = (action->state != SIMIX_WAITING && action->state != SIMIX_RUNNING);
490   if (simcall->comm_test.result) {
491     xbt_fifo_push(action->simcalls, simcall);
492     SIMIX_comm_finish(action);
493   } else {
494     SIMIX_simcall_answer(simcall);
495   }
496 }
497
498 void SIMIX_pre_comm_testany(smx_simcall_t simcall, int idx)
499 {
500   unsigned int cursor;
501   smx_action_t action;
502   xbt_dynar_t actions = simcall->comm_testany.comms;
503   simcall->comm_testany.result = -1;
504
505   if (MC_IS_ENABLED){
506     if(idx == -1){
507       SIMIX_simcall_answer(simcall);
508     }else{
509       action = xbt_dynar_get_as(actions, idx, smx_action_t);
510       simcall->comm_testany.result = idx;
511       xbt_fifo_push(action->simcalls, simcall);
512       action->state = SIMIX_DONE;
513       SIMIX_comm_finish(action);
514     }
515     return;
516   }
517
518   xbt_dynar_foreach(simcall->comm_testany.comms,cursor,action) {
519     if (action->state != SIMIX_WAITING && action->state != SIMIX_RUNNING) {
520       simcall->comm_testany.result = cursor;
521       xbt_fifo_push(action->simcalls, simcall);
522       SIMIX_comm_finish(action);
523       return;
524     }
525   }
526   SIMIX_simcall_answer(simcall);
527 }
528
529 void SIMIX_pre_comm_waitany(smx_simcall_t simcall, int idx)
530 {
531   smx_action_t action;
532   unsigned int cursor = 0;
533   xbt_dynar_t actions = simcall->comm_waitany.comms;
534
535   if (MC_IS_ENABLED){
536     action = xbt_dynar_get_as(actions, idx, smx_action_t);
537     xbt_fifo_push(action->simcalls, simcall);
538     simcall->comm_waitany.result = idx;
539     action->state = SIMIX_DONE;
540     SIMIX_comm_finish(action);
541     return;
542   }
543
544   xbt_dynar_foreach(actions, cursor, action){
545     /* associate this simcall to the the action */
546     xbt_fifo_push(action->simcalls, simcall);
547
548     /* see if the action is already finished */
549     if (action->state != SIMIX_WAITING && action->state != SIMIX_RUNNING){
550       SIMIX_comm_finish(action);
551       break;
552     }
553   }
554 }
555
556 void SIMIX_waitany_remove_simcall_from_actions(smx_simcall_t simcall)
557 {
558   smx_action_t action;
559   unsigned int cursor = 0;
560   xbt_dynar_t actions = simcall->comm_waitany.comms;
561
562   xbt_dynar_foreach(actions, cursor, action) {
563     xbt_fifo_remove(action->simcalls, simcall);
564   }
565 }
566
567 /**
568  *  \brief Starts the simulation of a communication action.
569  *  \param action the communication action
570  */
571 XBT_INLINE void SIMIX_comm_start(smx_action_t action)
572 {
573   /* If both the sender and the receiver are already there, start the communication */
574   if (action->state == SIMIX_READY) {
575
576     smx_host_t sender = action->comm.src_proc->smx_host;
577     smx_host_t receiver = action->comm.dst_proc->smx_host;
578
579     XBT_DEBUG("Starting communication %p from '%s' to '%s'", action,
580            SIMIX_host_get_name(sender), SIMIX_host_get_name(receiver));
581
582     action->comm.surf_comm = surf_workstation_model->extension.workstation.
583         communicate(sender->host, receiver->host, action->comm.task_size, action->comm.rate);
584
585     surf_workstation_model->action_data_set(action->comm.surf_comm, action);
586
587     action->state = SIMIX_RUNNING;
588
589     /* If a link is failed, detect it immediately */
590     if (surf_workstation_model->action_state_get(action->comm.surf_comm) == SURF_ACTION_FAILED) {
591       XBT_DEBUG("Communication from '%s' to '%s' failed to start because of a link failure",
592     SIMIX_host_get_name(sender), SIMIX_host_get_name(receiver));
593       action->state = SIMIX_LINK_FAILURE;
594       SIMIX_comm_destroy_internal_actions(action);
595     }
596
597     /* If any of the process is suspend, create the action but stop its execution,
598        it will be restarted when the sender process resume */
599     if (SIMIX_process_is_suspended(action->comm.src_proc) ||
600         SIMIX_process_is_suspended(action->comm.dst_proc)) {
601       /* FIXME: check what should happen with the action state */
602
603       if (SIMIX_process_is_suspended(action->comm.src_proc))
604         XBT_DEBUG("The communication is suspended on startup because src (%s:%s) were suspended since it initiated the communication",
605             SIMIX_host_get_name(action->comm.src_proc->smx_host), action->comm.src_proc->name);
606       else
607         XBT_DEBUG("The communication is suspended on startup because dst (%s:%s) were suspended since it initiated the communication",
608             SIMIX_host_get_name(action->comm.dst_proc->smx_host), action->comm.dst_proc->name);
609
610       surf_workstation_model->suspend(action->comm.surf_comm);
611
612     }
613   }
614 }
615
616 /**
617  * \brief Answers the SIMIX simcalls associated to a communication action.
618  * \param action a finished communication action
619  */
620 void SIMIX_comm_finish(smx_action_t action)
621 {
622   unsigned int destroy_count = 0;
623   smx_simcall_t simcall;
624
625   while ((simcall = xbt_fifo_shift(action->simcalls))) {
626
627     /* If a waitany simcall is waiting for this action to finish, then remove
628        it from the other actions in the waitany list. Afterwards, get the
629        position of the actual action in the waitany dynar and
630        return it as the result of the simcall */
631     if (simcall->call == SIMCALL_COMM_WAITANY) {
632       SIMIX_waitany_remove_simcall_from_actions(simcall);
633       if (!MC_IS_ENABLED)
634         simcall->comm_waitany.result = xbt_dynar_search(simcall->comm_waitany.comms, &action);
635     }
636
637     /* If the action is still in a rendez-vous point then remove from it */
638     if (action->comm.rdv)
639       SIMIX_rdv_remove(action->comm.rdv, action);
640
641     XBT_DEBUG("SIMIX_comm_finish: action state = %d", (int)action->state);
642
643     /* Check out for errors */
644     switch (action->state) {
645
646       case SIMIX_DONE:
647         XBT_DEBUG("Communication %p complete!", action);
648         SIMIX_comm_copy_data(action);
649         break;
650
651       case SIMIX_SRC_TIMEOUT:
652         SMX_EXCEPTION(simcall->issuer, timeout_error, 0,
653                   "Communication timeouted because of sender");
654         break;
655
656       case SIMIX_DST_TIMEOUT:
657         SMX_EXCEPTION(simcall->issuer, timeout_error, 0,
658                   "Communication timeouted because of receiver");
659         break;
660
661       case SIMIX_SRC_HOST_FAILURE:
662         if (simcall->issuer == action->comm.src_proc)
663           simcall->issuer->context->iwannadie = 1;
664 //          SMX_EXCEPTION(simcall->issuer, host_error, 0, "Host failed");
665         else
666           SMX_EXCEPTION(simcall->issuer, network_error, 0, "Remote peer failed");
667         break;
668
669       case SIMIX_DST_HOST_FAILURE:
670         if (simcall->issuer == action->comm.dst_proc)
671           simcall->issuer->context->iwannadie = 1;
672 //          SMX_EXCEPTION(simcall->issuer, host_error, 0, "Host failed");
673         else
674           SMX_EXCEPTION(simcall->issuer, network_error, 0, "Remote peer failed");
675         break;
676
677       case SIMIX_LINK_FAILURE:
678         XBT_DEBUG("Link failure in action %p between '%s' and '%s': posting an exception to the issuer: %s (%p) detached:%d",
679             action,
680             action->comm.src_proc ? action->comm.src_proc->smx_host->name : NULL,
681             action->comm.dst_proc ? action->comm.dst_proc->smx_host->name : NULL,
682             simcall->issuer->name, simcall->issuer, action->comm.detached);
683         if (action->comm.src_proc == simcall->issuer) {
684           XBT_DEBUG("I'm source");
685         } else if (action->comm.dst_proc == simcall->issuer) {
686           XBT_DEBUG("I'm dest");
687         } else {
688           XBT_DEBUG("I'm neither source nor dest");
689         }
690         SMX_EXCEPTION(simcall->issuer, network_error, 0, "Link failure");
691         break;
692
693       case SIMIX_CANCELED:
694         if (simcall->issuer == action->comm.dst_proc)
695           SMX_EXCEPTION(simcall->issuer, cancel_error, 0,
696                     "Communication canceled by the sender");
697         else
698           SMX_EXCEPTION(simcall->issuer, cancel_error, 0,
699                     "Communication canceled by the receiver");
700         break;
701
702       default:
703         xbt_die("Unexpected action state in SIMIX_comm_finish: %d", (int)action->state);
704     }
705
706     /* if there is an exception during a waitany or a testany, indicate the position of the failed communication */
707     if (simcall->issuer->doexception) {
708       if (simcall->call == SIMCALL_COMM_WAITANY) {
709         simcall->issuer->running_ctx->exception.value = xbt_dynar_search(simcall->comm_waitany.comms, &action);
710       }
711       else if (simcall->call == SIMCALL_COMM_TESTANY) {
712         simcall->issuer->running_ctx->exception.value = xbt_dynar_search(simcall->comm_testany.comms, &action);
713       }
714     }
715
716     if (surf_workstation_model->extension.
717         workstation.get_state(simcall->issuer->smx_host->host) != SURF_RESOURCE_ON) {
718       simcall->issuer->context->iwannadie = 1;
719     }
720
721     simcall->issuer->waiting_action = NULL;
722     xbt_fifo_remove(simcall->issuer->comms, action);
723     SIMIX_simcall_answer(simcall);
724     destroy_count++;
725   }
726
727   while (destroy_count-- > 0)
728     SIMIX_comm_destroy(action);
729 }
730
731 /**
732  * \brief This function is called when a Surf communication action is finished.
733  * \param action the corresponding Simix communication
734  */
735 void SIMIX_post_comm(smx_action_t action)
736 {
737   /* Update action state */
738   if (action->comm.src_timeout &&
739      surf_workstation_model->action_state_get(action->comm.src_timeout) == SURF_ACTION_DONE)
740      action->state = SIMIX_SRC_TIMEOUT;
741   else if (action->comm.dst_timeout &&
742           surf_workstation_model->action_state_get(action->comm.dst_timeout) == SURF_ACTION_DONE)
743      action->state = SIMIX_DST_TIMEOUT;
744   else if (action->comm.src_timeout &&
745           surf_workstation_model->action_state_get(action->comm.src_timeout) == SURF_ACTION_FAILED)
746      action->state = SIMIX_SRC_HOST_FAILURE;
747   else if (action->comm.dst_timeout &&
748           surf_workstation_model->action_state_get(action->comm.dst_timeout) == SURF_ACTION_FAILED)
749      action->state = SIMIX_DST_HOST_FAILURE;
750   else if (action->comm.surf_comm &&
751           surf_workstation_model->action_state_get(action->comm.surf_comm) == SURF_ACTION_FAILED) {
752     XBT_DEBUG("Puta madre. Surf says that the link broke");
753      action->state = SIMIX_LINK_FAILURE;
754   } else
755     action->state = SIMIX_DONE;
756
757   XBT_DEBUG("SIMIX_post_comm: comm %p, state %d, src_proc %p, dst_proc %p, detached: %d",
758       action, (int)action->state, action->comm.src_proc, action->comm.dst_proc, action->comm.detached);
759
760   /* destroy the surf actions associated with the Simix communication */
761   SIMIX_comm_destroy_internal_actions(action);
762
763   /* remove the communication action from the list of pending communications
764    * of both processes (if they still exist) */
765   if (action->comm.src_proc) {
766     xbt_fifo_remove(action->comm.src_proc->comms, action);
767   }
768   if (action->comm.dst_proc) {
769     xbt_fifo_remove(action->comm.dst_proc->comms, action);
770   }
771
772   /* if there are simcalls associated with the action, then answer them */
773   if (xbt_fifo_size(action->simcalls)) {
774     SIMIX_comm_finish(action);
775   }
776 }
777
778 void SIMIX_comm_cancel(smx_action_t action)
779 {
780   /* if the action is a waiting state means that it is still in a rdv */
781   /* so remove from it and delete it */
782   if (action->state == SIMIX_WAITING) {
783     SIMIX_rdv_remove(action->comm.rdv, action);
784     action->state = SIMIX_CANCELED;
785   }
786   else if (!MC_IS_ENABLED /* when running the MC there are no surf actions */
787       && (action->state == SIMIX_READY || action->state == SIMIX_RUNNING)) {
788
789     surf_workstation_model->action_cancel(action->comm.surf_comm);
790   }
791 }
792
793 void SIMIX_comm_suspend(smx_action_t action)
794 {
795   /*FIXME: shall we suspend also the timeout actions? */
796   if (action->comm.surf_comm)
797     surf_workstation_model->suspend(action->comm.surf_comm);
798   /* in the other case, the action will be suspended on creation, in SIMIX_comm_start() */
799 }
800
801 void SIMIX_comm_resume(smx_action_t action)
802 {
803   /*FIXME: check what happen with the timeouts */
804   if (action->comm.surf_comm)
805     surf_workstation_model->resume(action->comm.surf_comm);
806   /* in the other case, the action were not really suspended yet, see SIMIX_comm_suspend() and SIMIX_comm_start() */
807 }
808
809
810 /************* Action Getters **************/
811
812 /**
813  *  \brief get the amount remaining from the communication
814  *  \param action The communication
815  */
816 double SIMIX_comm_get_remains(smx_action_t action)
817 {
818   double remains;
819
820   if(!action){
821       return 0;
822   }
823
824   switch (action->state) {
825
826     case SIMIX_RUNNING:
827       remains = surf_workstation_model->get_remains(action->comm.surf_comm);
828       break;
829
830     case SIMIX_WAITING:
831     case SIMIX_READY:
832       remains = 0; /*FIXME: check what should be returned */
833       break;
834
835     default:
836       remains = 0; /*FIXME: is this correct? */
837       break;
838   }
839   return remains;
840 }
841
842 e_smx_state_t SIMIX_comm_get_state(smx_action_t action)
843 {
844   return action->state;
845 }
846
847 /**
848  *  \brief Return the user data associated to the sender of the communication
849  *  \param action The communication
850  *  \return the user data
851  */
852 void* SIMIX_comm_get_src_data(smx_action_t action)
853 {
854   return action->comm.src_data;
855 }
856
857 /**
858  *  \brief Return the user data associated to the receiver of the communication
859  *  \param action The communication
860  *  \return the user data
861  */
862 void* SIMIX_comm_get_dst_data(smx_action_t action)
863 {
864   return action->comm.dst_data;
865 }
866
867 smx_process_t SIMIX_comm_get_src_proc(smx_action_t action)
868 {
869   return action->comm.src_proc;
870 }
871
872 smx_process_t SIMIX_comm_get_dst_proc(smx_action_t action)
873 {
874   return action->comm.dst_proc;
875 }
876
877 #ifdef HAVE_LATENCY_BOUND_TRACKING
878 /**
879  *  \brief verify if communication is latency bounded
880  *  \param comm The communication
881  */
882 XBT_INLINE int SIMIX_comm_is_latency_bounded(smx_action_t action)
883 {
884   if(!action){
885       return 0;
886   }
887   if (action->comm.surf_comm){
888       XBT_DEBUG("Getting latency limited for surf_action (%p)", action->comm.surf_comm);
889       action->latency_limited = surf_workstation_model->get_latency_limited(action->comm.surf_comm);
890       XBT_DEBUG("Action limited is %d", action->latency_limited);
891   }
892   return action->latency_limited;
893 }
894 #endif
895
896 /******************************************************************************/
897 /*                    SIMIX_comm_copy_data callbacks                       */
898 /******************************************************************************/
899 static void (*SIMIX_comm_copy_data_callback) (smx_action_t, void*, size_t) =
900     &SIMIX_comm_copy_pointer_callback;
901
902 void
903 SIMIX_comm_set_copy_data_callback(void (*callback) (smx_action_t, void*, size_t))
904 {
905   SIMIX_comm_copy_data_callback = callback;
906 }
907
908 void SIMIX_comm_copy_pointer_callback(smx_action_t comm, void* buff, size_t buff_size)
909 {
910   xbt_assert((buff_size == sizeof(void *)),
911               "Cannot copy %zu bytes: must be sizeof(void*)", buff_size);
912   *(void **) (comm->comm.dst_buff) = buff;
913 }
914
915 void SIMIX_comm_copy_buffer_callback(smx_action_t comm, void* buff, size_t buff_size)
916 {
917   XBT_DEBUG("Copy the data over");
918   memcpy(comm->comm.dst_buff, buff, buff_size);
919 }
920
921 void smpi_comm_copy_data_callback(smx_action_t comm, void* buff, size_t buff_size)
922 {
923   XBT_DEBUG("Copy the data over");
924   memcpy(comm->comm.dst_buff, buff, buff_size);
925   if (comm->comm.detached) { // if this is a detached send, the source buffer was duplicated by SMPI sender to make the original buffer available to the application ASAP
926     xbt_free(buff);
927     comm->comm.src_buff = NULL;
928   }
929 }
930
931 /**
932  *  \brief Copy the communication data from the sender's buffer to the receiver's one
933  *  \param comm The communication
934  */
935 void SIMIX_comm_copy_data(smx_action_t comm)
936 {
937   size_t buff_size = comm->comm.src_buff_size;
938   /* If there is no data to be copy then return */
939   if (!comm->comm.src_buff || !comm->comm.dst_buff || comm->comm.copied)
940     return;
941
942   XBT_DEBUG("Copying comm %p data from %s (%p) -> %s (%p) (%zu bytes)",
943          comm,
944          comm->comm.src_proc ? comm->comm.src_proc->smx_host->name : "a finished process",
945          comm->comm.src_buff,
946          comm->comm.dst_proc ? comm->comm.dst_proc->smx_host->name : "a finished process",
947          comm->comm.dst_buff, buff_size);
948
949   /* Copy at most dst_buff_size bytes of the message to receiver's buffer */
950   if (comm->comm.dst_buff_size)
951     buff_size = MIN(buff_size, *(comm->comm.dst_buff_size));
952
953   /* Update the receiver's buffer size to the copied amount */
954   if (comm->comm.dst_buff_size)
955     *comm->comm.dst_buff_size = buff_size;
956
957   if (buff_size > 0)
958     SIMIX_comm_copy_data_callback (comm, comm->comm.src_buff, buff_size);
959
960   /* Set the copied flag so we copy data only once */
961   /* (this function might be called from both communication ends) */
962   comm->comm.copied = 1;
963 }