Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
Merge remote-tracking branch 'origin/master'
[simgrid.git] / src / simix / smx_network.c
1 /* Copyright (c) 2009, 2010. The SimGrid Team.
2  * All rights reserved.                                                     */
3
4 /* This program is free software; you can redistribute it and/or modify it
5  * under the terms of the license (GNU LGPL) which comes with this package. */
6
7 #include "smx_private.h"
8 #include "xbt/log.h"
9 #include "mc/mc.h"
10 #include "xbt/dict.h"
11
12 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(simix_network, simix,
13                                 "Logging specific to SIMIX (network)");
14
15 static xbt_dict_t rdv_points = NULL;
16 unsigned long int smx_total_comms = 0;
17
18 static void SIMIX_waitany_remove_simcall_from_actions(smx_simcall_t simcall);
19 static void SIMIX_comm_copy_data(smx_action_t comm);
20 static smx_action_t SIMIX_comm_new(e_smx_comm_type_t type);
21 static XBT_INLINE void SIMIX_rdv_push(smx_rdv_t rdv, smx_action_t comm);
22 static smx_action_t SIMIX_fifo_get_comm(xbt_fifo_t fifo, e_smx_comm_type_t type,
23             int (*match_fun)(void *, void *,smx_action_t),
24             void *user_data, smx_action_t my_action);
25 static void SIMIX_rdv_free(void *data);
26
27 void SIMIX_network_init(void)
28 {
29   rdv_points = xbt_dict_new_homogeneous(SIMIX_rdv_free);
30 }
31
32 void SIMIX_network_exit(void)
33 {
34   xbt_dict_free(&rdv_points);
35 }
36
37 /******************************************************************************/
38 /*                           Rendez-Vous Points                               */
39 /******************************************************************************/
40
41 smx_rdv_t SIMIX_rdv_create(const char *name)
42 {
43   /* two processes may have pushed the same rdv_create simcall at the same time */
44   smx_rdv_t rdv = name ? xbt_dict_get_or_null(rdv_points, name) : NULL;
45
46   if (!rdv) {
47     rdv = xbt_new0(s_smx_rvpoint_t, 1);
48     rdv->name = name ? xbt_strdup(name) : NULL;
49     rdv->comm_fifo = xbt_fifo_new();
50     rdv->done_comm_fifo = xbt_fifo_new();
51     rdv->permanent_receiver=NULL;
52
53         XBT_DEBUG("Creating a mailbox at %p with name %s\n", rdv, name);
54
55     if (rdv->name)
56       xbt_dict_set(rdv_points, rdv->name, rdv, NULL);
57   }
58   return rdv;
59 }
60
61 void SIMIX_rdv_destroy(smx_rdv_t rdv)
62 {
63   if (rdv->name)
64     xbt_dict_remove(rdv_points, rdv->name);
65 }
66
67 void SIMIX_rdv_free(void *data)
68 {
69   XBT_DEBUG("rdv free %p", data);
70   smx_rdv_t rdv = (smx_rdv_t) data;
71   xbt_free(rdv->name);
72   xbt_fifo_free(rdv->comm_fifo);
73   xbt_fifo_free(rdv->done_comm_fifo);
74
75   xbt_free(rdv);  
76 }
77
78 xbt_dict_t SIMIX_get_rdv_points()
79 {
80   return rdv_points;
81 }
82
83 smx_rdv_t SIMIX_rdv_get_by_name(const char *name)
84 {
85   return xbt_dict_get_or_null(rdv_points, name);
86 }
87
88 int SIMIX_rdv_comm_count_by_host(smx_rdv_t rdv, smx_host_t host)
89 {
90   smx_action_t comm = NULL;
91   xbt_fifo_item_t item = NULL;
92   int count = 0;
93
94   xbt_fifo_foreach(rdv->comm_fifo, item, comm, smx_action_t) {
95     if (comm->comm.src_proc->smx_host == host)
96       count++;
97   }
98
99   return count;
100 }
101
102 smx_action_t SIMIX_rdv_get_head(smx_rdv_t rdv)
103 {
104   return xbt_fifo_get_item_content(xbt_fifo_get_first_item(rdv->comm_fifo));
105 }
106
107 /**
108  *  \brief get the receiver (process associated to the mailbox)
109  *  \param rdv The rendez-vous point
110  *  \return process The receiving process (NULL if not set)
111  */
112 smx_process_t SIMIX_rdv_get_receiver(smx_rdv_t rdv)
113 {
114   return rdv->permanent_receiver;
115 }
116
117 /**
118  *  \brief set the receiver of the rendez vous point to allow eager sends
119  *  \param rdv The rendez-vous point
120  *  \param process The receiving process
121  */
122 void SIMIX_rdv_set_receiver(smx_rdv_t rdv , smx_process_t process)
123 {
124    rdv->permanent_receiver=process;
125 }
126
127 /**
128  *  \brief Pushes a communication action into a rendez-vous point
129  *  \param rdv The rendez-vous point
130  *  \param comm The communication action
131  */
132 static XBT_INLINE void SIMIX_rdv_push(smx_rdv_t rdv, smx_action_t comm)
133 {
134   xbt_fifo_push(rdv->comm_fifo, comm);
135   comm->comm.rdv = rdv;
136 }
137
138 /**
139  *  \brief Removes a communication action from a rendez-vous point
140  *  \param rdv The rendez-vous point
141  *  \param comm The communication action
142  */
143 XBT_INLINE void SIMIX_rdv_remove(smx_rdv_t rdv, smx_action_t comm)
144 {
145   xbt_fifo_remove(rdv->comm_fifo, comm);
146   comm->comm.rdv = NULL;
147 }
148
149 /**
150  *  \brief Checks if there is a communication action queued in a fifo matching our needs
151  *  \param type The type of communication we are looking for (comm_send, comm_recv)
152  *  \return The communication action if found, NULL otherwise
153  */
154 smx_action_t SIMIX_fifo_get_comm(xbt_fifo_t fifo, e_smx_comm_type_t type,
155                                    int (*match_fun)(void *, void *,smx_action_t),
156                                    void *this_user_data, smx_action_t my_action)
157 {
158   smx_action_t action;
159   xbt_fifo_item_t item;
160   void* other_user_data = NULL;
161
162   xbt_fifo_foreach(fifo, item, action, smx_action_t) {
163     if (action->comm.type == SIMIX_COMM_SEND) {
164       other_user_data = action->comm.src_data;
165     } else if (action->comm.type == SIMIX_COMM_RECEIVE) {
166       other_user_data = action->comm.dst_data;
167     }
168     if (action->comm.type == type &&
169         (!match_fun              ||              match_fun(this_user_data,  other_user_data, action)) &&
170         (!action->comm.match_fun || action->comm.match_fun(other_user_data, this_user_data,  my_action))) {
171       XBT_DEBUG("Found a matching communication action %p", action);
172       xbt_fifo_remove_item(fifo, item);
173       xbt_fifo_free_item(item);
174       action->comm.refcount++;
175       action->comm.rdv = NULL;
176       return action;
177     }
178     XBT_DEBUG("Sorry, communication action %p does not match our needs:"
179            " its type is %d but we are looking for a comm of type %d (or maybe the filtering didn't match)",
180            action, (int)action->comm.type, (int)type);
181   }
182   XBT_DEBUG("No matching communication action found");
183   return NULL;
184 }
185
186
187 /******************************************************************************/
188 /*                            Communication Actions                            */
189 /******************************************************************************/
190
191 /**
192  *  \brief Creates a new communicate action
193  *  \param type The direction of communication (comm_send, comm_recv)
194  *  \return The new communicate action
195  */
196 smx_action_t SIMIX_comm_new(e_smx_comm_type_t type)
197 {
198   smx_action_t act;
199
200   /* alloc structures */
201   act = xbt_mallocator_get(simix_global->action_mallocator);
202
203   act->type = SIMIX_ACTION_COMMUNICATE;
204   act->state = SIMIX_WAITING;
205
206   /* set communication */
207   act->comm.type = type;
208   act->comm.refcount = 1;
209
210 #ifdef HAVE_LATENCY_BOUND_TRACKING
211   //initialize with unknown value
212   act->latency_limited = -1;
213 #endif
214
215 #ifdef HAVE_TRACING
216   act->category = NULL;
217 #endif
218
219   XBT_DEBUG("Create communicate action %p", act);
220   ++smx_total_comms;
221
222   return act;
223 }
224
225 /**
226  *  \brief Destroy a communicate action
227  *  \param action The communicate action to be destroyed
228  */
229 void SIMIX_comm_destroy(smx_action_t action)
230 {
231   XBT_DEBUG("Destroy action %p (refcount: %d), state: %d",
232             action, action->comm.refcount, (int)action->state);
233
234   if (action->comm.refcount <= 0) {
235     xbt_backtrace_display_current();
236     xbt_die("The refcount of comm %p is already 0 before decreasing it. "
237             "That's a bug! If you didn't test and/or wait the same communication twice in your code, then the bug is SimGrid's...", action);
238   }
239   action->comm.refcount--;
240   if (action->comm.refcount > 0)
241     return;
242   XBT_DEBUG("Really free communication %p; refcount is now %d", action,
243         action->comm.refcount);
244
245 #ifdef HAVE_LATENCY_BOUND_TRACKING
246     action->latency_limited = SIMIX_comm_is_latency_bounded( action ) ;
247 #endif
248
249   xbt_free(action->name);
250   SIMIX_comm_destroy_internal_actions(action);
251
252   if (action->comm.detached && action->state != SIMIX_DONE) {
253     /* the communication has failed and was detached:
254      * we have to free the buffer */
255     if (action->comm.clean_fun) {
256       action->comm.clean_fun(action->comm.src_buff);
257     }
258     action->comm.src_buff = NULL;
259   }
260
261   xbt_mallocator_release(simix_global->action_mallocator, action);
262 }
263
264 void SIMIX_comm_destroy_internal_actions(smx_action_t action)
265 {
266   if (action->comm.surf_comm){
267 #ifdef HAVE_LATENCY_BOUND_TRACKING
268     action->latency_limited = SIMIX_comm_is_latency_bounded(action);
269 #endif
270     action->comm.surf_comm->model_type->action_unref(action->comm.surf_comm);
271     action->comm.surf_comm = NULL;
272   }
273
274   if (action->comm.src_timeout){
275     action->comm.src_timeout->model_type->action_unref(action->comm.src_timeout);
276     action->comm.src_timeout = NULL;
277   }
278
279   if (action->comm.dst_timeout){
280     action->comm.dst_timeout->model_type->action_unref(action->comm.dst_timeout);
281     action->comm.dst_timeout = NULL;
282   }
283 }
284
285 smx_action_t SIMIX_comm_isend(smx_process_t src_proc, smx_rdv_t rdv,
286                               double task_size, double rate,
287                               void *src_buff, size_t src_buff_size,
288                               int (*match_fun)(void *, void *,smx_action_t),
289                               void (*clean_fun)(void *), // used to free the action in case of problem after a detached send
290                               void *data,
291                               int detached)
292 {
293   XBT_DEBUG("send from %p\n", rdv);
294
295   /* Prepare an action describing us, so that it gets passed to the user-provided filter of other side */
296   smx_action_t this_action = SIMIX_comm_new(SIMIX_COMM_SEND);
297
298   /* Look for communication action matching our needs. We also provide a description of
299    * ourself so that the other side also gets a chance of choosing if it wants to match with us.
300    *
301    * If it is not found then push our communication into the rendez-vous point */
302   smx_action_t other_action = SIMIX_fifo_get_comm(rdv->comm_fifo, SIMIX_COMM_RECEIVE, match_fun, data, this_action);
303
304   if (!other_action) {
305     other_action = this_action;
306
307     if (rdv->permanent_receiver!=NULL){
308           //this mailbox is for small messages, which have to be sent right now
309           other_action->state = SIMIX_READY;
310           other_action->comm.dst_proc=rdv->permanent_receiver;
311           other_action->comm.refcount++;
312           other_action->comm.rdv = rdv;
313           xbt_fifo_push(rdv->done_comm_fifo,other_action);
314       other_action->comm.rdv=rdv;
315           XBT_DEBUG("pushing a message into the permanent receive fifo %p, comm %p \n", rdv, &(other_action->comm));
316
317     }else{
318       SIMIX_rdv_push(rdv, this_action);
319         }
320   } else {
321         XBT_DEBUG("Receive already pushed\n");
322
323     SIMIX_comm_destroy(this_action);
324     --smx_total_comms; // this creation was a pure waste
325
326     other_action->state = SIMIX_READY;
327     other_action->comm.type = SIMIX_COMM_READY;
328
329   }
330   xbt_fifo_push(src_proc->comms, other_action);
331
332   /* if the communication action is detached then decrease the refcount
333    * by one, so it will be eliminated by the receiver's destroy call */
334   if (detached) {
335     other_action->comm.detached = 1;
336     other_action->comm.refcount--;
337     other_action->comm.clean_fun = clean_fun;
338   } else {
339     other_action->comm.clean_fun = NULL;
340   }
341
342   /* Setup the communication action */
343   other_action->comm.src_proc = src_proc;
344   other_action->comm.task_size = task_size;
345   other_action->comm.rate = rate;
346   other_action->comm.src_buff = src_buff;
347   other_action->comm.src_buff_size = src_buff_size;
348   other_action->comm.src_data = data;
349
350   other_action->comm.match_fun = match_fun;
351
352   if (MC_IS_ENABLED) {
353     other_action->state = SIMIX_RUNNING;
354     return other_action;
355   }
356
357   SIMIX_comm_start(other_action);
358   return (detached ? NULL : other_action);
359 }
360
361 smx_action_t SIMIX_comm_irecv(smx_process_t dst_proc, smx_rdv_t rdv,
362                       void *dst_buff, size_t *dst_buff_size,
363                       int (*match_fun)(void *, void *, smx_action_t), void *data)
364 {
365         XBT_DEBUG("recv from %p %p\n", rdv, rdv->comm_fifo);
366         smx_action_t this_action = SIMIX_comm_new(SIMIX_COMM_RECEIVE);
367
368         smx_action_t other_action;
369   //communication already done, get it inside the fifo of completed comms
370   //permanent receive v1
371   //int already_received=0;
372   if(rdv->permanent_receiver && xbt_fifo_size(rdv->done_comm_fifo)!=0){
373
374           XBT_DEBUG("We have a comm that has probably already been received, trying to match it, to skip the communication\n");
375           //find a match in the already received fifo
376           other_action = SIMIX_fifo_get_comm(rdv->done_comm_fifo, SIMIX_COMM_SEND, match_fun, data, this_action);
377           //if not found, assume the receiver came first, register it to the mailbox in the classical way
378           if (!other_action)  {
379                 XBT_DEBUG("We have messages in the permanent receive list, but not the one we are looking for, pushing request into fifo\n");
380                 other_action = this_action;
381                 SIMIX_rdv_push(rdv, this_action);
382           }else{
383                 if(other_action->comm.surf_comm &&      SIMIX_comm_get_remains(other_action)==0.0)
384                 {
385                   XBT_DEBUG("comm %p has been already sent, and is finished, destroy it\n",&(other_action->comm));
386                   other_action->state = SIMIX_DONE;
387                   other_action->comm.type = SIMIX_COMM_DONE;
388                   other_action->comm.rdv = NULL;
389                   SIMIX_comm_destroy(this_action);
390                   --smx_total_comms; // this creation was a pure waste
391                   //already_received=1;
392                   other_action->comm.refcount--;
393                 }/*else{
394                   XBT_DEBUG("Not yet finished, we have to wait %d\n", xbt_fifo_size(rdv->comm_fifo));
395                 }*/
396                 other_action->comm.refcount--;
397                 SIMIX_comm_destroy(this_action);
398                 --smx_total_comms; // this creation was a pure waste
399           }
400   }else{
401           /* Prepare an action describing us, so that it gets passed to the user-provided filter of other side */
402
403           /* Look for communication action matching our needs. We also provide a description of
404            * ourself so that the other side also gets a chance of choosing if it wants to match with us.
405            *
406            * If it is not found then push our communication into the rendez-vous point */
407           other_action = SIMIX_fifo_get_comm(rdv->comm_fifo, SIMIX_COMM_SEND, match_fun, data, this_action);
408
409           if (!other_action) {
410                 XBT_DEBUG("Receive pushed first %d\n", xbt_fifo_size(rdv->comm_fifo));
411                 other_action = this_action;
412                 SIMIX_rdv_push(rdv, this_action);
413           } else {
414                 SIMIX_comm_destroy(this_action);
415                 --smx_total_comms; // this creation was a pure waste
416                 other_action->state = SIMIX_READY;
417                 other_action->comm.type = SIMIX_COMM_READY;
418                 xbt_fifo_push(dst_proc->comms, other_action);
419
420           }
421   }
422
423
424           /* Setup communication action */
425           other_action->comm.dst_proc = dst_proc;
426           other_action->comm.dst_buff = dst_buff;
427           other_action->comm.dst_buff_size = dst_buff_size;
428           other_action->comm.dst_data = data;
429
430           other_action->comm.match_fun = match_fun;
431
432
433           /*if(already_received)//do the actual copy, because the first one after the comm didn't have all the info
434           SIMIX_comm_copy_data(other_action);*/
435
436
437           if (MC_IS_ENABLED) {
438                 other_action->state = SIMIX_RUNNING;
439                 return other_action;
440           }
441
442           SIMIX_comm_start(other_action);
443  // }
444   return other_action;
445 }
446
447 void SIMIX_pre_comm_wait(smx_simcall_t simcall, smx_action_t action, double timeout, int idx)
448 {
449
450   /* the simcall may be a wait, a send or a recv */
451   surf_action_t sleep;
452
453   /* Associate this simcall to the wait action */
454   XBT_DEBUG("SIMIX_pre_comm_wait, %p", action);
455
456   xbt_fifo_push(action->simcalls, simcall);
457   simcall->issuer->waiting_action = action;
458
459   if (MC_IS_ENABLED) {
460     if (idx == 0) {
461       action->state = SIMIX_DONE;
462     } else {
463       /* If we reached this point, the wait simcall must have a timeout */
464       /* Otherwise it shouldn't be enabled and executed by the MC */
465       if (timeout == -1)
466         THROW_IMPOSSIBLE;
467
468       if (action->comm.src_proc == simcall->issuer)
469         action->state = SIMIX_SRC_TIMEOUT;
470       else
471         action->state = SIMIX_DST_TIMEOUT;
472     }
473
474     SIMIX_comm_finish(action);
475     return;
476   }
477
478   /* If the action has already finish perform the error handling, */
479   /* otherwise set up a waiting timeout on the right side         */
480   if (action->state != SIMIX_WAITING && action->state != SIMIX_RUNNING) {
481     SIMIX_comm_finish(action);
482   } else { /* if (timeout >= 0) { we need a surf sleep action even when there is no timeout, otherwise surf won't tell us when the host fails */
483     sleep = surf_workstation_model->extension.workstation.sleep(simcall->issuer->smx_host->host, timeout);
484     surf_workstation_model->action_data_set(sleep, action);
485
486     if (simcall->issuer == action->comm.src_proc)
487       action->comm.src_timeout = sleep;
488     else
489       action->comm.dst_timeout = sleep;
490   }
491 }
492
493 void SIMIX_pre_comm_test(smx_simcall_t simcall)
494 {
495   smx_action_t action = simcall->comm_test.comm;
496
497   if(MC_IS_ENABLED){
498     simcall->comm_test.result = action->comm.src_proc && action->comm.dst_proc;
499     if(simcall->comm_test.result){
500       action->state = SIMIX_DONE;
501       xbt_fifo_push(action->simcalls, simcall);
502       SIMIX_comm_finish(action);
503     }else{
504       SIMIX_simcall_answer(simcall);
505     }
506     return;
507   }
508
509   simcall->comm_test.result = (action->state != SIMIX_WAITING && action->state != SIMIX_RUNNING);
510   if (simcall->comm_test.result) {
511     xbt_fifo_push(action->simcalls, simcall);
512     SIMIX_comm_finish(action);
513   } else {
514     SIMIX_simcall_answer(simcall);
515   }
516 }
517
518 void SIMIX_pre_comm_testany(smx_simcall_t simcall, int idx)
519 {
520   unsigned int cursor;
521   smx_action_t action;
522   xbt_dynar_t actions = simcall->comm_testany.comms;
523   simcall->comm_testany.result = -1;
524
525   if (MC_IS_ENABLED){
526     if(idx == -1){
527       SIMIX_simcall_answer(simcall);
528     }else{
529       action = xbt_dynar_get_as(actions, idx, smx_action_t);
530       simcall->comm_testany.result = idx;
531       xbt_fifo_push(action->simcalls, simcall);
532       action->state = SIMIX_DONE;
533       SIMIX_comm_finish(action);
534     }
535     return;
536   }
537
538   xbt_dynar_foreach(simcall->comm_testany.comms,cursor,action) {
539     if (action->state != SIMIX_WAITING && action->state != SIMIX_RUNNING) {
540       simcall->comm_testany.result = cursor;
541       xbt_fifo_push(action->simcalls, simcall);
542       SIMIX_comm_finish(action);
543       return;
544     }
545   }
546   SIMIX_simcall_answer(simcall);
547 }
548
549 void SIMIX_pre_comm_waitany(smx_simcall_t simcall, int idx)
550 {
551   smx_action_t action;
552   unsigned int cursor = 0;
553   xbt_dynar_t actions = simcall->comm_waitany.comms;
554
555   if (MC_IS_ENABLED){
556     action = xbt_dynar_get_as(actions, idx, smx_action_t);
557     xbt_fifo_push(action->simcalls, simcall);
558     simcall->comm_waitany.result = idx;
559     action->state = SIMIX_DONE;
560     SIMIX_comm_finish(action);
561     return;
562   }
563
564   xbt_dynar_foreach(actions, cursor, action){
565     /* associate this simcall to the the action */
566     xbt_fifo_push(action->simcalls, simcall);
567
568     /* see if the action is already finished */
569     if (action->state != SIMIX_WAITING && action->state != SIMIX_RUNNING){
570       SIMIX_comm_finish(action);
571       break;
572     }
573   }
574 }
575
576 void SIMIX_waitany_remove_simcall_from_actions(smx_simcall_t simcall)
577 {
578   smx_action_t action;
579   unsigned int cursor = 0;
580   xbt_dynar_t actions = simcall->comm_waitany.comms;
581
582   xbt_dynar_foreach(actions, cursor, action) {
583     xbt_fifo_remove(action->simcalls, simcall);
584   }
585 }
586
587 /**
588  *  \brief Starts the simulation of a communication action.
589  *  \param action the communication action
590  */
591 XBT_INLINE void SIMIX_comm_start(smx_action_t action)
592 {
593   /* If both the sender and the receiver are already there, start the communication */
594   if (action->state == SIMIX_READY) {
595
596     smx_host_t sender = action->comm.src_proc->smx_host;
597     smx_host_t receiver = action->comm.dst_proc->smx_host;
598
599     XBT_DEBUG("Starting communication %p from '%s' to '%s'", action,
600            SIMIX_host_get_name(sender), SIMIX_host_get_name(receiver));
601
602     action->comm.surf_comm = surf_workstation_model->extension.workstation.
603         communicate(sender->host, receiver->host, action->comm.task_size, action->comm.rate);
604
605     surf_workstation_model->action_data_set(action->comm.surf_comm, action);
606
607     action->state = SIMIX_RUNNING;
608
609     /* If a link is failed, detect it immediately */
610     if (surf_workstation_model->action_state_get(action->comm.surf_comm) == SURF_ACTION_FAILED) {
611       XBT_DEBUG("Communication from '%s' to '%s' failed to start because of a link failure",
612     SIMIX_host_get_name(sender), SIMIX_host_get_name(receiver));
613       action->state = SIMIX_LINK_FAILURE;
614       SIMIX_comm_destroy_internal_actions(action);
615     }
616
617     /* If any of the process is suspend, create the action but stop its execution,
618        it will be restarted when the sender process resume */
619     if (SIMIX_process_is_suspended(action->comm.src_proc) ||
620         SIMIX_process_is_suspended(action->comm.dst_proc)) {
621       /* FIXME: check what should happen with the action state */
622
623       if (SIMIX_process_is_suspended(action->comm.src_proc))
624         XBT_DEBUG("The communication is suspended on startup because src (%s:%s) were suspended since it initiated the communication",
625             SIMIX_host_get_name(action->comm.src_proc->smx_host), action->comm.src_proc->name);
626       else
627         XBT_DEBUG("The communication is suspended on startup because dst (%s:%s) were suspended since it initiated the communication",
628             SIMIX_host_get_name(action->comm.dst_proc->smx_host), action->comm.dst_proc->name);
629
630       surf_workstation_model->suspend(action->comm.surf_comm);
631
632     }
633   }
634 }
635
636 /**
637  * \brief Answers the SIMIX simcalls associated to a communication action.
638  * \param action a finished communication action
639  */
640 void SIMIX_comm_finish(smx_action_t action)
641 {
642   unsigned int destroy_count = 0;
643   smx_simcall_t simcall;
644
645   while ((simcall = xbt_fifo_shift(action->simcalls))) {
646
647     /* If a waitany simcall is waiting for this action to finish, then remove
648        it from the other actions in the waitany list. Afterwards, get the
649        position of the actual action in the waitany dynar and
650        return it as the result of the simcall */
651     if (simcall->call == SIMCALL_COMM_WAITANY) {
652       SIMIX_waitany_remove_simcall_from_actions(simcall);
653       if (!MC_IS_ENABLED)
654         simcall->comm_waitany.result = xbt_dynar_search(simcall->comm_waitany.comms, &action);
655     }
656
657     /* If the action is still in a rendez-vous point then remove from it */
658     if (action->comm.rdv)
659       SIMIX_rdv_remove(action->comm.rdv, action);
660
661     XBT_DEBUG("SIMIX_comm_finish: action state = %d", (int)action->state);
662
663     /* Check out for errors */
664     switch (action->state) {
665
666       case SIMIX_DONE:
667         XBT_DEBUG("Communication %p complete!", action);
668         SIMIX_comm_copy_data(action);
669         break;
670
671       case SIMIX_SRC_TIMEOUT:
672         SMX_EXCEPTION(simcall->issuer, timeout_error, 0,
673                   "Communication timeouted because of sender");
674         break;
675
676       case SIMIX_DST_TIMEOUT:
677         SMX_EXCEPTION(simcall->issuer, timeout_error, 0,
678                   "Communication timeouted because of receiver");
679         break;
680
681       case SIMIX_SRC_HOST_FAILURE:
682         if (simcall->issuer == action->comm.src_proc)
683           simcall->issuer->context->iwannadie = 1;
684 //          SMX_EXCEPTION(simcall->issuer, host_error, 0, "Host failed");
685         else
686           SMX_EXCEPTION(simcall->issuer, network_error, 0, "Remote peer failed");
687         break;
688
689       case SIMIX_DST_HOST_FAILURE:
690         if (simcall->issuer == action->comm.dst_proc)
691           simcall->issuer->context->iwannadie = 1;
692 //          SMX_EXCEPTION(simcall->issuer, host_error, 0, "Host failed");
693         else
694           SMX_EXCEPTION(simcall->issuer, network_error, 0, "Remote peer failed");
695         break;
696
697       case SIMIX_LINK_FAILURE:
698         XBT_DEBUG("Link failure in action %p between '%s' and '%s': posting an exception to the issuer: %s (%p) detached:%d",
699             action,
700             action->comm.src_proc ? action->comm.src_proc->smx_host->name : NULL,
701             action->comm.dst_proc ? action->comm.dst_proc->smx_host->name : NULL,
702             simcall->issuer->name, simcall->issuer, action->comm.detached);
703         if (action->comm.src_proc == simcall->issuer) {
704           XBT_DEBUG("I'm source");
705         } else if (action->comm.dst_proc == simcall->issuer) {
706           XBT_DEBUG("I'm dest");
707         } else {
708           XBT_DEBUG("I'm neither source nor dest");
709         }
710         SMX_EXCEPTION(simcall->issuer, network_error, 0, "Link failure");
711         break;
712
713       case SIMIX_CANCELED:
714         if (simcall->issuer == action->comm.dst_proc)
715           SMX_EXCEPTION(simcall->issuer, cancel_error, 0,
716                     "Communication canceled by the sender");
717         else
718           SMX_EXCEPTION(simcall->issuer, cancel_error, 0,
719                     "Communication canceled by the receiver");
720         break;
721
722       default:
723         xbt_die("Unexpected action state in SIMIX_comm_finish: %d", (int)action->state);
724     }
725
726     /* if there is an exception during a waitany or a testany, indicate the position of the failed communication */
727     if (simcall->issuer->doexception) {
728       if (simcall->call == SIMCALL_COMM_WAITANY) {
729         simcall->issuer->running_ctx->exception.value = xbt_dynar_search(simcall->comm_waitany.comms, &action);
730       }
731       else if (simcall->call == SIMCALL_COMM_TESTANY) {
732         simcall->issuer->running_ctx->exception.value = xbt_dynar_search(simcall->comm_testany.comms, &action);
733       }
734     }
735
736     if (surf_workstation_model->extension.
737         workstation.get_state(simcall->issuer->smx_host->host) != SURF_RESOURCE_ON) {
738       simcall->issuer->context->iwannadie = 1;
739     }
740
741     simcall->issuer->waiting_action = NULL;
742     xbt_fifo_remove(simcall->issuer->comms, action);
743     SIMIX_simcall_answer(simcall);
744     destroy_count++;
745   }
746
747   while (destroy_count-- > 0)
748     SIMIX_comm_destroy(action);
749 }
750
751 /**
752  * \brief This function is called when a Surf communication action is finished.
753  * \param action the corresponding Simix communication
754  */
755 void SIMIX_post_comm(smx_action_t action)
756 {
757   /* Update action state */
758   if (action->comm.src_timeout &&
759      surf_workstation_model->action_state_get(action->comm.src_timeout) == SURF_ACTION_DONE)
760      action->state = SIMIX_SRC_TIMEOUT;
761   else if (action->comm.dst_timeout &&
762           surf_workstation_model->action_state_get(action->comm.dst_timeout) == SURF_ACTION_DONE)
763      action->state = SIMIX_DST_TIMEOUT;
764   else if (action->comm.src_timeout &&
765           surf_workstation_model->action_state_get(action->comm.src_timeout) == SURF_ACTION_FAILED)
766      action->state = SIMIX_SRC_HOST_FAILURE;
767   else if (action->comm.dst_timeout &&
768           surf_workstation_model->action_state_get(action->comm.dst_timeout) == SURF_ACTION_FAILED)
769      action->state = SIMIX_DST_HOST_FAILURE;
770   else if (action->comm.surf_comm &&
771           surf_workstation_model->action_state_get(action->comm.surf_comm) == SURF_ACTION_FAILED) {
772     XBT_DEBUG("Puta madre. Surf says that the link broke");
773      action->state = SIMIX_LINK_FAILURE;
774   } else
775     action->state = SIMIX_DONE;
776
777   XBT_DEBUG("SIMIX_post_comm: comm %p, state %d, src_proc %p, dst_proc %p, detached: %d",
778       action, (int)action->state, action->comm.src_proc, action->comm.dst_proc, action->comm.detached);
779
780   /* destroy the surf actions associated with the Simix communication */
781   SIMIX_comm_destroy_internal_actions(action);
782
783   /* remove the communication action from the list of pending communications
784    * of both processes (if they still exist) */
785   if (action->comm.src_proc) {
786     xbt_fifo_remove(action->comm.src_proc->comms, action);
787   }
788   if (action->comm.dst_proc) {
789     xbt_fifo_remove(action->comm.dst_proc->comms, action);
790   }
791
792   /* if there are simcalls associated with the action, then answer them */
793   if (xbt_fifo_size(action->simcalls)) {
794     SIMIX_comm_finish(action);
795   }
796 }
797
798 void SIMIX_comm_cancel(smx_action_t action)
799 {
800   /* if the action is a waiting state means that it is still in a rdv */
801   /* so remove from it and delete it */
802   if (action->state == SIMIX_WAITING) {
803     SIMIX_rdv_remove(action->comm.rdv, action);
804     action->state = SIMIX_CANCELED;
805   }
806   else if (!MC_IS_ENABLED /* when running the MC there are no surf actions */
807       && (action->state == SIMIX_READY || action->state == SIMIX_RUNNING)) {
808
809     surf_workstation_model->action_cancel(action->comm.surf_comm);
810   }
811 }
812
813 void SIMIX_comm_suspend(smx_action_t action)
814 {
815   /*FIXME: shall we suspend also the timeout actions? */
816   if (action->comm.surf_comm)
817     surf_workstation_model->suspend(action->comm.surf_comm);
818   /* in the other case, the action will be suspended on creation, in SIMIX_comm_start() */
819 }
820
821 void SIMIX_comm_resume(smx_action_t action)
822 {
823   /*FIXME: check what happen with the timeouts */
824   if (action->comm.surf_comm)
825     surf_workstation_model->resume(action->comm.surf_comm);
826   /* in the other case, the action were not really suspended yet, see SIMIX_comm_suspend() and SIMIX_comm_start() */
827 }
828
829
830 /************* Action Getters **************/
831
832 /**
833  *  \brief get the amount remaining from the communication
834  *  \param action The communication
835  */
836 double SIMIX_comm_get_remains(smx_action_t action)
837 {
838   double remains;
839
840   if(!action){
841       return 0;
842   }
843
844   switch (action->state) {
845
846     case SIMIX_RUNNING:
847       remains = surf_workstation_model->get_remains(action->comm.surf_comm);
848       break;
849
850     case SIMIX_WAITING:
851     case SIMIX_READY:
852       remains = 0; /*FIXME: check what should be returned */
853       break;
854
855     default:
856       remains = 0; /*FIXME: is this correct? */
857       break;
858   }
859   return remains;
860 }
861
862 e_smx_state_t SIMIX_comm_get_state(smx_action_t action)
863 {
864   return action->state;
865 }
866
867 /**
868  *  \brief Return the user data associated to the sender of the communication
869  *  \param action The communication
870  *  \return the user data
871  */
872 void* SIMIX_comm_get_src_data(smx_action_t action)
873 {
874   return action->comm.src_data;
875 }
876
877 /**
878  *  \brief Return the user data associated to the receiver of the communication
879  *  \param action The communication
880  *  \return the user data
881  */
882 void* SIMIX_comm_get_dst_data(smx_action_t action)
883 {
884   return action->comm.dst_data;
885 }
886
887 smx_process_t SIMIX_comm_get_src_proc(smx_action_t action)
888 {
889   return action->comm.src_proc;
890 }
891
892 smx_process_t SIMIX_comm_get_dst_proc(smx_action_t action)
893 {
894   return action->comm.dst_proc;
895 }
896
#ifdef HAVE_LATENCY_BOUND_TRACKING
/**
 *  \brief verify if communication is latency bounded
 *
 *  While the communication still has a surf action, the latency-limited flag
 *  is re-read from surf and stored into action->latency_limited. Without a
 *  surf action, the value already stored in that field is returned as is.
 *
 *  \param action The communication (may be NULL)
 *  \return the latency_limited flag of the action, or 0 if \a action is NULL
 */
XBT_INLINE int SIMIX_comm_is_latency_bounded(smx_action_t action)
{
  if (!action)
    return 0;

  /* refresh the stored flag from surf when surf still knows this comm */
  if (action->comm.surf_comm) {
    XBT_DEBUG("Getting latency limited for surf_action (%p)", action->comm.surf_comm);
    action->latency_limited = surf_workstation_model->get_latency_limited(action->comm.surf_comm);
    XBT_DEBUG("Action limited is %d", action->latency_limited);
  }
  return action->latency_limited;
}
#endif
915
916 /******************************************************************************/
917 /*                    SIMIX_comm_copy_data callbacks                       */
918 /******************************************************************************/
919 static void (*SIMIX_comm_copy_data_callback) (smx_action_t, void*, size_t) =
920     &SIMIX_comm_copy_pointer_callback;
921
922 void
923 SIMIX_comm_set_copy_data_callback(void (*callback) (smx_action_t, void*, size_t))
924 {
925   SIMIX_comm_copy_data_callback = callback;
926 }
927
928 void SIMIX_comm_copy_pointer_callback(smx_action_t comm, void* buff, size_t buff_size)
929 {
930   xbt_assert((buff_size == sizeof(void *)),
931               "Cannot copy %zu bytes: must be sizeof(void*)", buff_size);
932   *(void **) (comm->comm.dst_buff) = buff;
933 }
934
935 void SIMIX_comm_copy_buffer_callback(smx_action_t comm, void* buff, size_t buff_size)
936 {
937   XBT_DEBUG("Copy the data over");
938   memcpy(comm->comm.dst_buff, buff, buff_size);
939   if (comm->comm.detached) { // if this is a detached send, the source buffer was duplicated by SMPI sender to make the original buffer available to the application ASAP
940     xbt_free(buff);
941     comm->comm.src_buff = NULL;
942   }
943 }
944
945
946 /**
947  *  \brief Copy the communication data from the sender's buffer to the receiver's one
948  *  \param comm The communication
949  */
950 void SIMIX_comm_copy_data(smx_action_t comm)
951 {
952   size_t buff_size = comm->comm.src_buff_size;
953   /* If there is no data to be copy then return */
954   if (!comm->comm.src_buff || !comm->comm.dst_buff || comm->comm.copied)
955     return;
956
957   XBT_DEBUG("Copying comm %p data from %s (%p) -> %s (%p) (%zu bytes)",
958          comm,
959          comm->comm.src_proc ? comm->comm.src_proc->smx_host->name : "a finished process",
960          comm->comm.src_buff,
961          comm->comm.dst_proc ? comm->comm.dst_proc->smx_host->name : "a finished process",
962          comm->comm.dst_buff, buff_size);
963
964   /* Copy at most dst_buff_size bytes of the message to receiver's buffer */
965   if (comm->comm.dst_buff_size)
966     buff_size = MIN(buff_size, *(comm->comm.dst_buff_size));
967
968   /* Update the receiver's buffer size to the copied amount */
969   if (comm->comm.dst_buff_size)
970     *comm->comm.dst_buff_size = buff_size;
971
972   if (buff_size > 0)
973     SIMIX_comm_copy_data_callback (comm, comm->comm.src_buff, buff_size);
974
975   /* Set the copied flag so we copy data only once */
976   /* (this function might be called from both communication ends) */
977   comm->comm.copied = 1;
978 }