Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
merge back the master trunk into the smpi branch
[simgrid.git] / src / simix / smx_network.c
1 /* Copyright (c) 2009, 2010. The SimGrid Team.
2  * All rights reserved.                                                     */
3
4 /* This program is free software; you can redistribute it and/or modify it
5  * under the terms of the license (GNU LGPL) which comes with this package. */
6
7 #include "private.h"
8 #include "xbt/log.h"
9 #include "mc/mc.h"
10 #include "xbt/dict.h"
11
12 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(simix_network, simix,
13                                 "Logging specific to SIMIX (network)");
14
15 static xbt_dict_t rdv_points = NULL;
16 unsigned long int smx_total_comms = 0;
17
18 static void SIMIX_waitany_req_remove_from_actions(smx_req_t req);
19 static void SIMIX_comm_copy_data(smx_action_t comm);
20 static smx_action_t SIMIX_comm_new(e_smx_comm_type_t type);
21 static XBT_INLINE void SIMIX_rdv_push(smx_rdv_t rdv, smx_action_t comm);
22 static XBT_INLINE void SIMIX_rdv_remove(smx_rdv_t rdv, smx_action_t comm);
23 static smx_action_t SIMIX_rdv_get_request(smx_rdv_t rdv, e_smx_comm_type_t type,
24                                           int (*match_fun)(void *, void *), void *);
25 static void SIMIX_rdv_free(void *data);
26
27 void SIMIX_network_init(void)
28 {
29   rdv_points = xbt_dict_new_homogeneous(SIMIX_rdv_free);
30 }
31
32 void SIMIX_network_exit(void)
33 {
34   xbt_dict_free(&rdv_points);
35 }
36
37 /******************************************************************************/
38 /*                           Rendez-Vous Points                               */
39 /******************************************************************************/
40
41 smx_rdv_t SIMIX_rdv_create(const char *name)
42 {
43   /* two processes may have pushed the same rdv_create request at the same time */
44   smx_rdv_t rdv = name ? xbt_dict_get_or_null(rdv_points, name) : NULL;
45
46   if (!rdv) {
47     rdv = xbt_new0(s_smx_rvpoint_t, 1);
48     rdv->name = name ? xbt_strdup(name) : NULL;
49     rdv->comm_fifo = xbt_fifo_new();
50
51     if (rdv->name)
52       xbt_dict_set(rdv_points, rdv->name, rdv, NULL);
53   }
54   return rdv;
55 }
56
57 void SIMIX_rdv_destroy(smx_rdv_t rdv)
58 {
59   if (rdv->name)
60     xbt_dict_remove(rdv_points, rdv->name);
61 }
62
63 void SIMIX_rdv_free(void *data)
64 {
65   smx_rdv_t rdv = (smx_rdv_t) data;
66   xbt_free(rdv->name);
67   xbt_fifo_free(rdv->comm_fifo);
68   xbt_free(rdv);  
69 }
70
71 smx_rdv_t SIMIX_rdv_get_by_name(const char *name)
72 {
73   return xbt_dict_get_or_null(rdv_points, name);
74 }
75
76 int SIMIX_rdv_comm_count_by_host(smx_rdv_t rdv, smx_host_t host)
77 {
78   smx_action_t comm = NULL;
79   xbt_fifo_item_t item = NULL;
80   int count = 0;
81
82   xbt_fifo_foreach(rdv->comm_fifo, item, comm, smx_action_t) {
83     if (comm->comm.src_proc->smx_host == host)
84       count++;
85   }
86
87   return count;
88 }
89
90 smx_action_t SIMIX_rdv_get_head(smx_rdv_t rdv)
91 {
92   return xbt_fifo_get_item_content(xbt_fifo_get_first_item(rdv->comm_fifo));
93 }
94
95 /**
96  *  \brief Push a communication request into a rendez-vous point
97  *  \param rdv The rendez-vous point
98  *  \param comm The communication request
99  */
100 static XBT_INLINE void SIMIX_rdv_push(smx_rdv_t rdv, smx_action_t comm)
101 {
102   xbt_fifo_push(rdv->comm_fifo, comm);
103   comm->comm.rdv = rdv;
104 }
105
106 /**
107  *  \brief Remove a communication request from a rendez-vous point
108  *  \param rdv The rendez-vous point
109  *  \param comm The communication request
110  */
111 static XBT_INLINE void SIMIX_rdv_remove(smx_rdv_t rdv, smx_action_t comm)
112 {
113   xbt_fifo_remove(rdv->comm_fifo, comm);
114   comm->comm.rdv = NULL;
115 }
116
117 /**
118  *  \brief Wrapper to SIMIX_rdv_get_request
119  */
120 smx_action_t SIMIX_comm_get_send_match(smx_rdv_t rdv, int (*match_fun)(void*, void*), void* data) {
121    return SIMIX_rdv_get_request(rdv, SIMIX_COMM_SEND, match_fun, data);
122 }
123
124 /**
125  *  \brief Checks if there is a communication action queued in a rendez-vous matching our needs
126  *  \param type The type of communication we are looking for (comm_send, comm_recv)
127  *  \return The communication action if found, NULL otherwise
128  */
129 smx_action_t SIMIX_rdv_get_request(smx_rdv_t rdv, e_smx_comm_type_t type,
130                                    int (*match_fun)(void *, void *), void *data)
131 {
132   // FIXME rewrite this function by using SIMIX_rdv_has_send/recv_match
133   smx_action_t action;
134   xbt_fifo_item_t item;
135   void* req_data = NULL;
136
137   xbt_fifo_foreach(rdv->comm_fifo, item, action, smx_action_t) {
138     if (action->comm.type == SIMIX_COMM_SEND) {
139       req_data = action->comm.src_data;
140     } else if (action->comm.type == SIMIX_COMM_RECEIVE) {
141       req_data = action->comm.dst_data;
142     }
143     if (action->comm.type == type && (!match_fun || match_fun(data, req_data))) {
144       XBT_DEBUG("Found a matching communication action %p", action);
145       xbt_fifo_remove_item(rdv->comm_fifo, item);
146       xbt_fifo_free_item(item);
147       action->comm.refcount++;
148       action->comm.rdv = NULL;
149       return action;
150     }
151     XBT_DEBUG("Sorry, communication action %p does not match our needs:"
152            " its type is %d but we are looking for a comm of type %d",
153            action, action->comm.type, type);
154   }
155   XBT_DEBUG("No matching communication action found");
156   return NULL;
157 }
158
159 /**
160  *  \brief Checks if there is a send communication action
161  *  queued in a rendez-vous matching our needs.
162  *  \return 1 if found, 0 otherwise
163  */
164 int SIMIX_comm_has_send_match(smx_rdv_t rdv, int (*match_fun)(void*, void*), void* data) {
165
166   smx_action_t action;
167   xbt_fifo_item_t item;
168
169   xbt_fifo_foreach(rdv->comm_fifo, item, action, smx_action_t){
170     if (action->comm.type == SIMIX_COMM_SEND
171         && (!match_fun || match_fun(data, action->comm.src_data))) {
172       XBT_DEBUG("Found a matching communication action %p", action);
173       return 1;
174     }
175   }
176   XBT_DEBUG("No matching communication action found");
177   return 0;
178 }
179
180 /**
181  *  \brief Checks if there is a recv communication action
182  *  queued in a rendez-vous matching our needs.
183  *  \return 1 if found, 0 otherwise
184  */
185 int SIMIX_comm_has_recv_match(smx_rdv_t rdv, int (*match_fun)(void*, void*), void* data) {
186
187   smx_action_t action;
188   xbt_fifo_item_t item;
189
190   xbt_fifo_foreach(rdv->comm_fifo, item, action, smx_action_t) {
191     if (action->comm.type == SIMIX_COMM_RECEIVE
192         && (!match_fun || match_fun(data, action->comm.dst_data))) {
193       XBT_DEBUG("Found a matching communication action %p", action);
194       return 1;
195     }
196   }
197   XBT_DEBUG("No matching communication action found");
198   return 0;
199 }
200
201 /******************************************************************************/
202 /*                            Comunication Actions                            */
203 /******************************************************************************/
204
205 /**
206  *  \brief Creates a new comunicate action
207  *  \param type The type of request (comm_send, comm_recv)
208  *  \return The new comunicate action
209  */
210 smx_action_t SIMIX_comm_new(e_smx_comm_type_t type)
211 {
212   smx_action_t act;
213
214   /* alloc structures */
215   act = xbt_mallocator_get(simix_global->action_mallocator);
216
217   act->type = SIMIX_ACTION_COMMUNICATE;
218   act->state = SIMIX_WAITING;
219
220   /* set communication */
221   act->comm.type = type;
222   act->comm.refcount = 1;
223
224 #ifdef HAVE_LATENCY_BOUND_TRACKING
225   //initialize with unknown value
226   act->latency_limited = -1;
227 #endif
228
229 #ifdef HAVE_TRACING
230   act->category = NULL;
231 #endif
232
233   XBT_DEBUG("Create communicate action %p", act);
234   ++smx_total_comms;
235
236   return act;
237 }
238
239 /**
240  *  \brief Destroy a communicate action
241  *  \param action The communicate action to be destroyed
242  */
243 void SIMIX_comm_destroy(smx_action_t action)
244 {
245   XBT_DEBUG("Destroy action %p (refcount: %d), state: %d",
246       action, action->comm.refcount, action->state);
247
248   if (action->comm.refcount <= 0) {
249         xbt_backtrace_display_current();
250     xbt_die("the refcount of comm %p is already 0 before decreasing it. "
251             "That's a bug!", action);
252   }
253   action->comm.refcount--;
254   if (action->comm.refcount > 0)
255     return;
256   XBT_DEBUG("Really free communication %p; refcount is now %d", action,
257         action->comm.refcount);
258
259 #ifdef HAVE_LATENCY_BOUND_TRACKING
260     action->latency_limited = SIMIX_comm_is_latency_bounded( action ) ;
261 #endif
262
263   xbt_free(action->name);
264   SIMIX_comm_destroy_internal_actions(action);
265
266   if (action->comm.detached && action->state != SIMIX_DONE) {
267     /* the communication has failed and was detached:
268      * we have to free the buffer */
269     ((void_f_pvoid_t) action->comm.src_data)(action->comm.src_buff);
270   }
271
272   xbt_mallocator_release(simix_global->action_mallocator, action);
273 }
274
275 void SIMIX_comm_destroy_internal_actions(smx_action_t action)
276 {
277   if (action->comm.surf_comm){
278 #ifdef HAVE_LATENCY_BOUND_TRACKING
279     action->latency_limited = SIMIX_comm_is_latency_bounded(action);
280 #endif
281     action->comm.surf_comm->model_type->action_unref(action->comm.surf_comm);
282     action->comm.surf_comm = NULL;
283   }
284
285   if (action->comm.src_timeout){
286     action->comm.src_timeout->model_type->action_unref(action->comm.src_timeout);
287     action->comm.src_timeout = NULL;
288   }
289
290   if (action->comm.dst_timeout){
291     action->comm.dst_timeout->model_type->action_unref(action->comm.dst_timeout);
292     action->comm.dst_timeout = NULL;
293   }
294 }
295
296 smx_action_t SIMIX_comm_isend(smx_process_t src_proc, smx_rdv_t rdv,
297                               double task_size, double rate,
298                               void *src_buff, size_t src_buff_size,
299                               int (*match_fun)(void *, void *), void *data,
300                               int detached)
301 {
302   smx_action_t action;
303
304   /* Look for communication request matching our needs.
305      If it is not found then create it and push it into the rendez-vous point */
306   action = SIMIX_rdv_get_request(rdv, SIMIX_COMM_RECEIVE, match_fun, data);
307
308   if (!action) {
309     action = SIMIX_comm_new(SIMIX_COMM_SEND);
310     SIMIX_rdv_push(rdv, action);
311   } else {
312     action->state = SIMIX_READY;
313     action->comm.type = SIMIX_COMM_READY;
314   }
315   xbt_fifo_push(src_proc->comms, action);
316
317   /* if the communication action is detached then decrease the refcount
318    * by one, so it will be eliminated by the receiver's destroy call */
319   if (detached) {
320     action->comm.detached = 1;
321     action->comm.refcount--;
322   }
323
324   /* Setup the communication request */
325   action->comm.src_proc = src_proc;
326   action->comm.task_size = task_size;
327   action->comm.rate = rate;
328   action->comm.src_buff = src_buff;
329   action->comm.src_buff_size = src_buff_size;
330   action->comm.src_data = data;
331
332   if (MC_IS_ENABLED) {
333     action->state = SIMIX_RUNNING;
334     return action;
335   }
336
337   SIMIX_comm_start(action);
338   return (detached ? NULL : action);
339 }
340
341 smx_action_t SIMIX_comm_irecv(smx_process_t dst_proc, smx_rdv_t rdv,
342                       void *dst_buff, size_t *dst_buff_size,
343                       int (*match_fun)(void *, void *), void *data)
344 {
345   smx_action_t action;
346
347   /* Look for communication request matching our needs.
348    * If it is not found then create it and push it into the rendez-vous point
349    */
350   action = SIMIX_rdv_get_request(rdv, SIMIX_COMM_SEND, match_fun, data);
351
352   if (!action) {
353     action = SIMIX_comm_new(SIMIX_COMM_RECEIVE);
354     SIMIX_rdv_push(rdv, action);
355   } else {
356     action->state = SIMIX_READY;
357     action->comm.type = SIMIX_COMM_READY;
358   }
359   xbt_fifo_push(dst_proc->comms, action);
360
361   /* Setup communication request */
362   action->comm.dst_proc = dst_proc;
363   action->comm.dst_buff = dst_buff;
364   action->comm.dst_buff_size = dst_buff_size;
365   action->comm.dst_data = data;
366
367   if (MC_IS_ENABLED) {
368     action->state = SIMIX_RUNNING;
369     return action;
370   }
371
372   SIMIX_comm_start(action);
373   return action;
374 }
375
376 void SIMIX_pre_comm_wait(smx_req_t req, smx_action_t action, double timeout, int idx)
377 {
378   /* the request may be a wait, a send or a recv */
379   surf_action_t sleep;
380
381   /* Associate this request to the action */
382   xbt_fifo_push(action->request_list, req);
383   req->issuer->waiting_action = action;
384
385   if (MC_IS_ENABLED) {
386     if (idx == 0) {
387       action->state = SIMIX_DONE;
388     } else {
389       /* If we reached this point, the wait request must have a timeout */
390       /* Otherwise it shouldn't be enabled and executed by the MC */
391       if (timeout == -1)
392         THROW_IMPOSSIBLE;
393
394       if (action->comm.src_proc == req->issuer)
395         action->state = SIMIX_SRC_TIMEOUT;
396       else
397         action->state = SIMIX_DST_TIMEOUT;
398     }
399
400     SIMIX_comm_finish(action);
401     return;
402   }
403
404   /* If the action has already finish perform the error handling, */
405   /* otherwise set up a waiting timeout on the right side         */
406   if (action->state != SIMIX_WAITING && action->state != SIMIX_RUNNING) {
407     SIMIX_comm_finish(action);
408   } else { /* if (timeout >= 0) { we need a surf sleep action even when there is no timeout, otherwise surf won't tell us when the host fails */
409     sleep = surf_workstation_model->extension.workstation.sleep(req->issuer->smx_host->host, timeout);
410     surf_workstation_model->action_data_set(sleep, action);
411
412     if (req->issuer == action->comm.src_proc)
413       action->comm.src_timeout = sleep;
414     else
415       action->comm.dst_timeout = sleep;
416   }
417 }
418
419 void SIMIX_pre_comm_test(smx_req_t req)
420 {
421   smx_action_t action = req->comm_test.comm;
422
423   if(MC_IS_ENABLED){
424     req->comm_test.result = action->comm.src_proc && action->comm.dst_proc;
425     if(req->comm_test.result){
426       action->state = SIMIX_DONE;
427       xbt_fifo_push(action->request_list, req);
428       SIMIX_comm_finish(action);
429     }else{
430       SIMIX_request_answer(req);
431     }
432     return;
433   }
434
435   req->comm_test.result = (action->state != SIMIX_WAITING && action->state != SIMIX_RUNNING);
436   if (req->comm_test.result) {
437     xbt_fifo_push(action->request_list, req);
438     SIMIX_comm_finish(action);
439   } else {
440     SIMIX_request_answer(req);
441   }
442 }
443
444 void SIMIX_pre_comm_testany(smx_req_t req, int idx)
445 {
446   unsigned int cursor;
447   smx_action_t action;
448   xbt_dynar_t actions = req->comm_testany.comms;
449   req->comm_testany.result = -1;
450
451   if (MC_IS_ENABLED){
452     if(idx == -1){
453       SIMIX_request_answer(req);
454     }else{
455       action = xbt_dynar_get_as(actions, idx, smx_action_t);
456       req->comm_testany.result = idx;
457       xbt_fifo_push(action->request_list, req);
458       action->state = SIMIX_DONE;
459       SIMIX_comm_finish(action);
460     }
461     return;
462   }
463
464   xbt_dynar_foreach(req->comm_testany.comms,cursor,action) {
465     if (action->state != SIMIX_WAITING && action->state != SIMIX_RUNNING) {
466       req->comm_testany.result = cursor;
467       xbt_fifo_push(action->request_list, req);
468       SIMIX_comm_finish(action);
469       return;
470     }
471   }
472   SIMIX_request_answer(req);
473 }
474
475 void SIMIX_pre_comm_waitany(smx_req_t req, int idx)
476 {
477   smx_action_t action;
478   unsigned int cursor = 0;
479   xbt_dynar_t actions = req->comm_waitany.comms;
480
481   if (MC_IS_ENABLED){
482     action = xbt_dynar_get_as(actions, idx, smx_action_t);
483     xbt_fifo_push(action->request_list, req);
484     req->comm_waitany.result = idx;
485     action->state = SIMIX_DONE;
486     SIMIX_comm_finish(action);
487     return;
488   }
489
490   xbt_dynar_foreach(actions, cursor, action){
491     /* Associate this request to the action */
492     xbt_fifo_push(action->request_list, req);
493     if (action->state != SIMIX_WAITING && action->state != SIMIX_RUNNING){
494       SIMIX_comm_finish(action);
495       break;
496     }
497   }
498 }
499
500 void SIMIX_waitany_req_remove_from_actions(smx_req_t req)
501 {
502   smx_action_t action;
503   unsigned int cursor = 0;
504   xbt_dynar_t actions = req->comm_waitany.comms;
505
506   xbt_dynar_foreach(actions, cursor, action){
507     xbt_fifo_remove(action->request_list, req);
508   }
509 }
510
511 /**
512  *  \brief Start the simulation of a communication request
513  *  \param action The communication action
514  */
515
516 XBT_INLINE void SIMIX_comm_start(smx_action_t action)
517 {
518   /* If both the sender and the receiver are already there, start the communication */
519   if (action->state == SIMIX_READY) {
520
521     smx_host_t sender = action->comm.src_proc->smx_host;
522     smx_host_t receiver = action->comm.dst_proc->smx_host;
523
524     XBT_DEBUG("Starting communication %p from '%s' to '%s'", action,
525            SIMIX_host_get_name(sender), SIMIX_host_get_name(receiver));
526
527     action->comm.surf_comm = surf_workstation_model->extension.workstation.
528         communicate(sender->host, receiver->host, action->comm.task_size, action->comm.rate);
529
530     surf_workstation_model->action_data_set(action->comm.surf_comm, action);
531
532     action->state = SIMIX_RUNNING;
533
534     /* If a link is failed, detect it immediately */
535     if (surf_workstation_model->action_state_get(action->comm.surf_comm) == SURF_ACTION_FAILED) {
536       XBT_DEBUG("Communication from '%s' to '%s' failed to start because of a link failure",
537           SIMIX_host_get_name(sender), SIMIX_host_get_name(receiver));
538       action->state = SIMIX_LINK_FAILURE;
539       SIMIX_comm_destroy_internal_actions(action);
540     }
541
542     /* If any of the process is suspend, create the action but stop its execution,
543        it will be restarted when the sender process resume */
544     if (SIMIX_process_is_suspended(action->comm.src_proc) ||
545         SIMIX_process_is_suspended(action->comm.dst_proc)) {
546       /* FIXME: check what should happen with the action state */
547       surf_workstation_model->suspend(action->comm.surf_comm);
548     }
549   }
550 }
551
552 /**
553  * \brief Answers the SIMIX requests associated to a communication action.
554  * \param action a finished communication action
555  */
556 void SIMIX_comm_finish(smx_action_t action)
557 {
558   volatile unsigned int destroy_count = 0;
559   smx_req_t req;
560
561   while ((req = xbt_fifo_shift(action->request_list))) {
562
563     /* If a waitany request is waiting for this action to finish, then remove
564        it from the other actions in the waitany list. Afterwards, get the
565        position of the actual action in the waitany request's actions dynar and
566        return it as the result of the call */
567     if (req->call == REQ_COMM_WAITANY) {
568       SIMIX_waitany_req_remove_from_actions(req);
569       if (!MC_IS_ENABLED)
570         req->comm_waitany.result = xbt_dynar_search(req->comm_waitany.comms, &action);
571     }
572
573     /* If the action is still in a rendez-vous point then remove from it */
574     if (action->comm.rdv)
575       SIMIX_rdv_remove(action->comm.rdv, action);
576
577     XBT_DEBUG("SIMIX_comm_finish: action state = %d", action->state);
578
579     /* Check out for errors */
580     switch (action->state) {
581
582       case SIMIX_DONE:
583         XBT_DEBUG("Communication %p complete!", action);
584         SIMIX_comm_copy_data(action);
585         break;
586
587       case SIMIX_SRC_TIMEOUT:
588         TRY {
589           THROWF(timeout_error, 0, "Communication timeouted because of sender");
590         }
591         CATCH(req->issuer->running_ctx->exception) {
592           req->issuer->doexception = 1;
593         }
594         break;
595
596       case SIMIX_DST_TIMEOUT:
597         TRY {
598           THROWF(timeout_error, 0, "Communication timeouted because of receiver");
599         }
600         CATCH(req->issuer->running_ctx->exception) {
601           req->issuer->doexception = 1;
602         }
603         break;
604
605       case SIMIX_SRC_HOST_FAILURE:
606         TRY {
607           if (req->issuer == action->comm.src_proc)
608             THROWF(host_error, 0, "Host failed");
609           else
610             THROWF(network_error, 0, "Remote peer failed");
611         }
612         CATCH(req->issuer->running_ctx->exception) {
613           req->issuer->doexception = 1;
614         }
615         break;
616
617       case SIMIX_DST_HOST_FAILURE:
618         TRY {
619           if (req->issuer == action->comm.dst_proc)
620             THROWF(host_error, 0, "Host failed");
621           else
622             THROWF(network_error, 0, "Remote peer failed");
623         }
624         CATCH(req->issuer->running_ctx->exception) {
625           req->issuer->doexception = 1;
626         }
627         break;
628
629       case SIMIX_LINK_FAILURE:
630         TRY {
631           XBT_DEBUG("Link failure in action %p between '%s' and '%s': posting an exception to the issuer: %s (%p)",
632               action,
633               action->comm.src_proc ? action->comm.src_proc->smx_host->name : NULL,
634               action->comm.dst_proc ? action->comm.dst_proc->smx_host->name : NULL,
635               req->issuer->name, req->issuer);
636           THROWF(network_error, 0, "Link failure");
637         }
638         CATCH(req->issuer->running_ctx->exception) {
639           req->issuer->doexception = 1;
640         }
641         break;
642
643       case SIMIX_CANCELED:
644         TRY {
645           if (req->issuer == action->comm.dst_proc) {
646             THROWF(cancel_error, 0, "Communication canceled by the sender");
647           }
648           else {
649             THROWF(cancel_error, 0, "Communication canceled by the receiver");
650           }
651         }
652         CATCH(req->issuer->running_ctx->exception) {
653           req->issuer->doexception = 1;
654         }
655         break;
656
657       default:
658         xbt_die("Unexpected action state in SIMIX_comm_finish: %d", action->state);
659     }
660
661     /* if there is an exception during a waitany or a testany, indicate the position of the failed communication */
662     if (req->issuer->doexception) {
663       if (req->call == REQ_COMM_WAITANY) {
664         req->issuer->running_ctx->exception.value = xbt_dynar_search(req->comm_waitany.comms, &action);
665       }
666       else if (req->call == REQ_COMM_TESTANY) {
667         req->issuer->running_ctx->exception.value = xbt_dynar_search(req->comm_testany.comms, &action);
668       }
669     }
670
671     req->issuer->waiting_action = NULL;
672     xbt_fifo_remove(req->issuer->comms, action);
673     SIMIX_request_answer(req);
674     destroy_count++;
675   }
676
677   while (destroy_count-- > 0)
678     SIMIX_comm_destroy(action);
679 }
680
681 /**
682  * \brief This function is called when a Surf communication action is finished.
683  * \param action the corresponding Simix communication
684  */
685 void SIMIX_post_comm(smx_action_t action)
686 {
687   /* Update action state */
688   if (action->comm.src_timeout &&
689      surf_workstation_model->action_state_get(action->comm.src_timeout) == SURF_ACTION_DONE)
690      action->state = SIMIX_SRC_TIMEOUT;
691   else if (action->comm.dst_timeout &&
692           surf_workstation_model->action_state_get(action->comm.dst_timeout) == SURF_ACTION_DONE)
693      action->state = SIMIX_DST_TIMEOUT;
694   else if (action->comm.src_timeout &&
695           surf_workstation_model->action_state_get(action->comm.src_timeout) == SURF_ACTION_FAILED)
696      action->state = SIMIX_SRC_HOST_FAILURE;
697   else if (action->comm.dst_timeout &&
698           surf_workstation_model->action_state_get(action->comm.dst_timeout) == SURF_ACTION_FAILED)
699      action->state = SIMIX_DST_HOST_FAILURE;
700   else if (action->comm.surf_comm &&
701           surf_workstation_model->action_state_get(action->comm.surf_comm) == SURF_ACTION_FAILED)
702      action->state = SIMIX_LINK_FAILURE;
703   else
704     action->state = SIMIX_DONE;
705
706   XBT_DEBUG("SIMIX_post_comm: comm %p, state %d, src_proc %p, dst_proc %p, detached: %d",
707       action, action->state, action->comm.src_proc, action->comm.dst_proc, action->comm.detached);
708
709   /* destroy the surf actions associated with the Simix communication */
710   SIMIX_comm_destroy_internal_actions(action);
711
712   /* remove the communication action from the list of pending communications
713    * of both processes (if they still exist) */
714   if (action->comm.src_proc) {
715     xbt_fifo_remove(action->comm.src_proc->comms, action);
716   }
717   if (action->comm.dst_proc) {
718     xbt_fifo_remove(action->comm.dst_proc->comms, action);
719   }
720
721   /* if there are requests associated with the action, then answer them */
722   if (xbt_fifo_size(action->request_list)) {
723     SIMIX_comm_finish(action);
724   }
725 }
726
727 void SIMIX_comm_cancel(smx_action_t action)
728 {
729   /* if the action is a waiting state means that it is still in a rdv */
730   /* so remove from it and delete it */
731   if (action->state == SIMIX_WAITING) {
732     SIMIX_rdv_remove(action->comm.rdv, action);
733     action->state = SIMIX_CANCELED;
734   }
735   else if (!MC_IS_ENABLED /* when running the MC there are no surf actions */
736       && (action->state == SIMIX_READY || action->state == SIMIX_RUNNING)) {
737
738     surf_workstation_model->action_cancel(action->comm.surf_comm);
739   }
740 }
741
742 void SIMIX_comm_suspend(smx_action_t action)
743 {
744   /*FIXME: shall we suspend also the timeout actions? */
745   surf_workstation_model->suspend(action->comm.surf_comm);
746 }
747
748 void SIMIX_comm_resume(smx_action_t action)
749 {
750   /*FIXME: check what happen with the timeouts */
751   surf_workstation_model->resume(action->comm.surf_comm);
752 }
753
754
755 /************* Action Getters **************/
756
757 /**
758  *  \brief get the amount remaining from the communication
759  *  \param action The communication
760  */
761 double SIMIX_comm_get_remains(smx_action_t action)
762 {
763   double remains;
764
765   if(!action){
766       return 0;
767   }
768
769   switch (action->state) {
770
771     case SIMIX_RUNNING:
772       remains = surf_workstation_model->get_remains(action->comm.surf_comm);
773       break;
774
775     case SIMIX_WAITING:
776     case SIMIX_READY:
777       remains = 0; /*FIXME: check what should be returned */
778       break;
779
780     default:
781       remains = 0; /*FIXME: is this correct? */
782       break;
783   }
784   return remains;
785 }
786
787 e_smx_state_t SIMIX_comm_get_state(smx_action_t action)
788 {
789   return action->state;
790 }
791
792 /**
793  *  \brief Return the user data associated to the sender of the communication
794  *  \param action The communication
795  *  \return the user data
796  */
797 void* SIMIX_comm_get_src_data(smx_action_t action)
798 {
799   return action->comm.src_data;
800 }
801
802 /**
803  *  \brief Return the user data associated to the receiver of the communication
804  *  \param action The communication
805  *  \return the user data
806  */
807 void* SIMIX_comm_get_dst_data(smx_action_t action)
808 {
809   return action->comm.dst_data;
810 }
811
812 smx_process_t SIMIX_comm_get_src_proc(smx_action_t action)
813 {
814   return action->comm.src_proc;
815 }
816
817 smx_process_t SIMIX_comm_get_dst_proc(smx_action_t action)
818 {
819   return action->comm.dst_proc;
820 }
821
822 #ifdef HAVE_LATENCY_BOUND_TRACKING
823 /**
824  *  \brief verify if communication is latency bounded
825  *  \param comm The communication
826  */
827 XBT_INLINE int SIMIX_comm_is_latency_bounded(smx_action_t action)
828 {
829   if(!action){
830       return 0;
831   }
832   if (action->comm.surf_comm){
833       XBT_DEBUG("Getting latency limited for surf_action (%p)", action->comm.surf_comm);
834       action->latency_limited = surf_workstation_model->get_latency_limited(action->comm.surf_comm);
835       XBT_DEBUG("Action limited is %d", action->latency_limited);
836   }
837   return action->latency_limited;
838 }
839 #endif
840
841 /******************************************************************************/
842 /*                    SIMIX_comm_copy_data callbacks                       */
843 /******************************************************************************/
844 static void (*SIMIX_comm_copy_data_callback) (smx_action_t, size_t) =
845     &SIMIX_comm_copy_pointer_callback;
846
847 void
848 SIMIX_comm_set_copy_data_callback(void (*callback) (smx_action_t, size_t))
849 {
850   SIMIX_comm_copy_data_callback = callback;
851 }
852
853 void SIMIX_comm_copy_pointer_callback(smx_action_t comm, size_t buff_size)
854 {
855   xbt_assert((buff_size == sizeof(void *)),
856               "Cannot copy %zu bytes: must be sizeof(void*)", buff_size);
857   *(void **) (comm->comm.dst_buff) = comm->comm.src_buff;
858 }
859
860 void SIMIX_comm_copy_buffer_callback(smx_action_t comm, size_t buff_size)
861 {
862   memcpy(comm->comm.dst_buff, comm->comm.src_buff, buff_size);
863 }
864
865 void smpi_comm_copy_data_callback(smx_action_t comm, size_t buff_size)
866 {
867   memcpy(comm->comm.dst_buff, comm->comm.src_buff, buff_size);
868   if (comm->comm.detached) // if this is a detached send, the source buffer was duplicated by SMPI sender to make the original buffer available to the application ASAP
869           free(comm->comm.src_buff);
870 }
871
872 /**
873  *  \brief Copy the communication data from the sender's buffer to the receiver's one
874  *  \param comm The communication
875  */
876 void SIMIX_comm_copy_data(smx_action_t comm)
877 {
878   size_t buff_size = comm->comm.src_buff_size;
879   /* If there is no data to be copy then return */
880   if (!comm->comm.src_buff || !comm->comm.dst_buff || comm->comm.copied == 1)
881     return;
882
883   XBT_DEBUG("Copying comm %p data from %s (%p) -> %s (%p) (%zu bytes)",
884          comm,
885          comm->comm.src_proc->smx_host->name, comm->comm.src_buff,
886          comm->comm.dst_proc->smx_host->name, comm->comm.dst_buff, buff_size);
887
888   /* Copy at most dst_buff_size bytes of the message to receiver's buffer */
889   if (comm->comm.dst_buff_size)
890     buff_size = MIN(buff_size, *(comm->comm.dst_buff_size));
891
892   /* Update the receiver's buffer size to the copied amount */
893   if (comm->comm.dst_buff_size)
894     *comm->comm.dst_buff_size = buff_size;
895
896 <<<<<<< HEAD
897   if (buff_size > 0)
898     (*SIMIX_comm_copy_data_callback) (comm, buff_size);
899 =======
900   if (buff_size == 0)
901     return;
902
903   SIMIX_comm_copy_data_callback(comm, buff_size);
904 >>>>>>> master
905
906   /* Set the copied flag so we copy data only once */
907   /* (this function might be called from both communication ends) */
908   comm->comm.copied = 1;
909 }