Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
allow MSG users to play with request matching mechanism
[simgrid.git] / src / simix / smx_network.c
1 /* Copyright (c) 2009, 2010. The SimGrid Team.
2  * All rights reserved.                                                     */
3
4 /* This program is free software; you can redistribute it and/or modify it
5  * under the terms of the license (GNU LGPL) which comes with this package. */
6
7 #include "private.h"
8 #include "xbt/log.h"
9 #include "mc/mc.h"
10 #include "xbt/dict.h"
11
12 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(simix_network, simix,
13                                 "Logging specific to SIMIX (network)");
14
15 static xbt_dict_t rdv_points = NULL;
16 unsigned long int smx_total_comms = 0;
17
18 static XBT_INLINE void SIMIX_comm_start(smx_action_t action);
19 static void SIMIX_comm_finish(smx_action_t action);
20 static void SIMIX_waitany_req_remove_from_actions(smx_req_t req);
21 static void SIMIX_comm_copy_data(smx_action_t comm);
22 static smx_action_t SIMIX_comm_new(e_smx_comm_type_t type);
23 static XBT_INLINE void SIMIX_rdv_push(smx_rdv_t rdv, smx_action_t comm);
24 static XBT_INLINE void SIMIX_rdv_remove(smx_rdv_t rdv, smx_action_t comm);
25 static smx_action_t SIMIX_rdv_get_request(smx_rdv_t rdv, e_smx_comm_type_t type,
26                                           int (*match_fun)(void *, void *), void *);
27 static void SIMIX_rdv_free(void *data);
28
29 void SIMIX_network_init(void)
30 {
31   rdv_points = xbt_dict_new();
32 }
33
34 void SIMIX_network_exit(void)
35 {
36   xbt_dict_free(&rdv_points);
37 }
38
39 /******************************************************************************/
40 /*                           Rendez-Vous Points                               */
41 /******************************************************************************/
42
43 smx_rdv_t SIMIX_rdv_create(const char *name)
44 {
45   /* two processes may have pushed the same rdv_create request at the same time */
46   smx_rdv_t rdv = name ? xbt_dict_get_or_null(rdv_points, name) : NULL;
47
48   if (!rdv) {
49     rdv = xbt_new0(s_smx_rvpoint_t, 1);
50     rdv->name = name ? xbt_strdup(name) : NULL;
51     rdv->comm_fifo = xbt_fifo_new();
52
53     if (rdv->name)
54       xbt_dict_set(rdv_points, rdv->name, rdv, SIMIX_rdv_free);
55   }
56   return rdv;
57 }
58
59 void SIMIX_rdv_destroy(smx_rdv_t rdv)
60 {
61   if (rdv->name)
62     xbt_dict_remove(rdv_points, rdv->name);
63 }
64
65 void SIMIX_rdv_free(void *data)
66 {
67   smx_rdv_t rdv = (smx_rdv_t) data;
68   if (rdv->name)
69     xbt_free(rdv->name);
70   xbt_fifo_free(rdv->comm_fifo);
71   xbt_free(rdv);  
72 }
73
74 smx_rdv_t SIMIX_rdv_get_by_name(const char *name)
75 {
76   return xbt_dict_get_or_null(rdv_points, name);
77 }
78
79 int SIMIX_rdv_comm_count_by_host(smx_rdv_t rdv, smx_host_t host)
80 {
81   smx_action_t comm = NULL;
82   xbt_fifo_item_t item = NULL;
83   int count = 0;
84
85   xbt_fifo_foreach(rdv->comm_fifo, item, comm, smx_action_t) {
86     if (comm->comm.src_proc->smx_host == host)
87       count++;
88   }
89
90   return count;
91 }
92
93 smx_action_t SIMIX_rdv_get_head(smx_rdv_t rdv)
94 {
95   return xbt_fifo_get_item_content(xbt_fifo_get_first_item(rdv->comm_fifo));
96 }
97
98 /**
99  *  \brief Push a communication request into a rendez-vous point
100  *  \param rdv The rendez-vous point
101  *  \param comm The communication request
102  */
103 static XBT_INLINE void SIMIX_rdv_push(smx_rdv_t rdv, smx_action_t comm)
104 {
105   xbt_fifo_push(rdv->comm_fifo, comm);
106   comm->comm.rdv = rdv;
107 }
108
109 /**
110  *  \brief Remove a communication request from a rendez-vous point
111  *  \param rdv The rendez-vous point
112  *  \param comm The communication request
113  */
114 static XBT_INLINE void SIMIX_rdv_remove(smx_rdv_t rdv, smx_action_t comm)
115 {
116   xbt_fifo_remove(rdv->comm_fifo, comm);
117   comm->comm.rdv = NULL;
118 }
119
120 smx_action_t SIMIX_comm_get_send_match(smx_rdv_t rdv, int (*match_fun)(void*, void*), void* data) {
121    return SIMIX_rdv_get_request(rdv, SIMIX_COMM_SEND, match_fun, data);
122 }
123
124 /**
125  *  \brief Checks if there is a communication action queued in a rendez-vous matching our needs
126  *  \param type The type of communication we are looking for (comm_send, comm_recv)
127  *  \return The communication action if found, NULL otherwise
128  */
129 smx_action_t SIMIX_rdv_get_request(smx_rdv_t rdv, e_smx_comm_type_t type,
130                                    int (*match_fun)(void *, void *), void *data)
131 {
132   smx_action_t action;
133   xbt_fifo_item_t item;
134   void* req_data = NULL;
135
136   xbt_fifo_foreach(rdv->comm_fifo, item, action, smx_action_t){
137     if (action->comm.type == SIMIX_COMM_SEND) {
138       req_data = action->comm.src_data;
139     } else if (action->comm.type == SIMIX_COMM_RECEIVE) {
140       req_data = action->comm.dst_data;
141     }
142     if (action->comm.type == type && (!match_fun || match_fun(data, req_data))) {
143       XBT_DEBUG("Found a matching communication action %p", action);
144       xbt_fifo_remove_item(rdv->comm_fifo, item);
145       xbt_fifo_free_item(item);
146       action->comm.refcount++;
147       action->comm.rdv = NULL;
148       return action;
149     }
150     XBT_DEBUG("Sorry, communication action %p does not match our needs:"
151            " its type is %d but we are looking for a comm of type %d",
152            action, action->comm.type, type);
153   }
154   XBT_DEBUG("No matching communication action found");
155   return NULL;
156 }
157
158 /******************************************************************************/
159 /*                           Communication Actions                            */
160 /******************************************************************************/
161
162 /**
163  *  \brief Creates a new comunicate action
164  *  \param type The type of request (comm_send, comm_recv)
165  *  \return The new comunicate action
166  */
167 smx_action_t SIMIX_comm_new(e_smx_comm_type_t type)
168 {
169   smx_action_t act;
170
171   /* alloc structures */
172   act = xbt_mallocator_get(simix_global->action_mallocator);
173   act->type = SIMIX_ACTION_COMMUNICATE;
174   act->state = SIMIX_WAITING;
175
176   /* set communication */
177   act->comm.type = type;
178   act->comm.refcount = 1;
179
180 #ifdef HAVE_LATENCY_BOUND_TRACKING
181   //initialize with unknown value
182   act->latency_limited = -1;
183 #endif
184
185 #ifdef HAVE_TRACING
186   act->category = NULL;
187 #endif
188
189   XBT_DEBUG("Create communicate action %p", act);
190   ++smx_total_comms;
191
192   return act;
193 }
194
195 /**
196  *  \brief Destroy a communicate action
197  *  \param action The communicate action to be destroyed
198  */
199 void SIMIX_comm_destroy(smx_action_t action)
200 {
201   XBT_DEBUG("Destroy action %p (refcount:%d)", action, action->comm.refcount);
202
203   if (action->comm.refcount <= 0)
204     xbt_die("the refcount of comm %p is already 0 before decreasing it. "
205             "That's a bug!", action);
206
207   action->comm.refcount--;
208   if (action->comm.refcount > 0)
209     return;
210   XBT_DEBUG("Really free communication %p; refcount is now %d", action,
211         action->comm.refcount);
212
213 #ifdef HAVE_LATENCY_BOUND_TRACKING
214     action->latency_limited = SIMIX_comm_is_latency_bounded( action ) ;
215 #endif
216
217 #ifdef HAVE_TRACING
218   TRACE_smx_action_destroy(action);
219 #endif
220
221   xbt_free(action->name);
222   SIMIX_comm_destroy_internal_actions(action);
223
224   if (action->comm.detached && action->state != SIMIX_DONE) {
225     /* the communication has failed and was detached:
226      * we have to free the buffer */
227     ((void_f_pvoid_t) action->comm.src_data)(action->comm.src_buff);
228   }
229
230   xbt_mallocator_release(simix_global->action_mallocator, action);
231 }
232
233 void SIMIX_comm_destroy_internal_actions(smx_action_t action)
234 {
235   if (action->comm.surf_comm){
236 #ifdef HAVE_LATENCY_BOUND_TRACKING
237     action->latency_limited = SIMIX_comm_is_latency_bounded(action);
238 #endif
239     action->comm.surf_comm->model_type->action_unref(action->comm.surf_comm);
240     action->comm.surf_comm = NULL;
241   }
242
243   if (action->comm.src_timeout){
244     action->comm.src_timeout->model_type->action_unref(action->comm.src_timeout);
245     action->comm.src_timeout = NULL;
246   }
247
248   if (action->comm.dst_timeout){
249     action->comm.dst_timeout->model_type->action_unref(action->comm.dst_timeout);
250     action->comm.dst_timeout = NULL;
251   }
252 }
253
254 smx_action_t SIMIX_comm_isend(smx_process_t src_proc, smx_rdv_t rdv,
255                               double task_size, double rate,
256                               void *src_buff, size_t src_buff_size,
257                               int (*match_fun)(void *, void *), void *data,
258                               int detached)
259 {
260   smx_action_t action;
261
262   /* Look for communication request matching our needs.
263      If it is not found then create it and push it into the rendez-vous point */
264   action = SIMIX_rdv_get_request(rdv, SIMIX_COMM_RECEIVE, match_fun, data);
265
266   if (!action) {
267     action = SIMIX_comm_new(SIMIX_COMM_SEND);
268     SIMIX_rdv_push(rdv, action);
269   } else {
270     action->state = SIMIX_READY;
271     action->comm.type = SIMIX_COMM_READY;
272   }
273
274   /* If the communication action is detached then decrease the refcount
275    * by one, so it will be eliminated by the receivers destroy call */
276   if (detached) {
277     action->comm.detached = 1;
278     action->comm.refcount--;
279   }
280
281   /* Setup the communication request */
282   action->comm.src_proc = src_proc;
283   action->comm.task_size = task_size;
284   action->comm.rate = rate;
285   action->comm.src_buff = src_buff;
286   action->comm.src_buff_size = src_buff_size;
287   action->comm.src_data = data;
288
289   if (MC_IS_ENABLED) {
290     action->state = SIMIX_RUNNING;
291     return action;
292   }
293
294   SIMIX_comm_start(action);
295   return action;
296 }
297
298 smx_action_t SIMIX_comm_irecv(smx_process_t dst_proc, smx_rdv_t rdv,
299                       void *dst_buff, size_t *dst_buff_size,
300                       int (*match_fun)(void *, void *), void *data)
301 {
302   smx_action_t action;
303
304   /* Look for communication request matching our needs.
305    * If it is not found then create it and push it into the rendez-vous point
306    */
307   action = SIMIX_rdv_get_request(rdv, SIMIX_COMM_SEND, match_fun, data);
308
309   if (!action) {
310     action = SIMIX_comm_new(SIMIX_COMM_RECEIVE);
311     SIMIX_rdv_push(rdv, action);
312   } else {
313     action->state = SIMIX_READY;
314     action->comm.type = SIMIX_COMM_READY;
315   }
316
317   /* Setup communication request */
318   action->comm.dst_proc = dst_proc;
319   action->comm.dst_buff = dst_buff;
320   action->comm.dst_buff_size = dst_buff_size;
321   action->comm.dst_data = data;
322
323   if (MC_IS_ENABLED) {
324     action->state = SIMIX_RUNNING;
325     return action;
326   }
327
328   SIMIX_comm_start(action);
329   return action;
330 }
331
332 void SIMIX_pre_comm_wait(smx_req_t req, smx_action_t action, double timeout, int idx)
333 {
334   /* the request may be a wait, a send or a recv */
335   surf_action_t sleep;
336
337   /* Associate this request to the action */
338   xbt_fifo_push(action->request_list, req);
339   req->issuer->waiting_action = action;
340
341   if (MC_IS_ENABLED) {
342     if (idx == 0) {
343       action->state = SIMIX_DONE;
344     } else {
345       /* If we reached this point, the wait request must have a timeout */
346       /* Otherwise it shouldn't be enabled and executed by the MC */
347       if (timeout == -1)
348         THROW_IMPOSSIBLE;
349
350       if (action->comm.src_proc == req->issuer)
351         action->state = SIMIX_SRC_TIMEOUT;
352       else
353         action->state = SIMIX_DST_TIMEOUT;
354     }
355
356     SIMIX_comm_finish(action);
357     return;
358   }
359
360   /* If the action has already finish perform the error handling, */
361   /* otherwise set up a waiting timeout on the right side         */
362   if (action->state != SIMIX_WAITING && action->state != SIMIX_RUNNING) {
363     SIMIX_comm_finish(action);
364   } else { /* if (timeout >= 0) { we need a surf sleep action even when there is no timeout, otherwise surf won't tell us when the host fails */
365     sleep = surf_workstation_model->extension.workstation.sleep(req->issuer->smx_host->host, timeout);
366     surf_workstation_model->action_data_set(sleep, action);
367
368     if (req->issuer == action->comm.src_proc)
369       action->comm.src_timeout = sleep;
370     else
371       action->comm.dst_timeout = sleep;
372   }
373 }
374
375 void SIMIX_pre_comm_test(smx_req_t req)
376 {
377   smx_action_t action = req->comm_test.comm;
378
379   if(MC_IS_ENABLED){
380     req->comm_test.result = action->comm.src_proc && action->comm.dst_proc;
381     if(req->comm_test.result){
382       action->state = SIMIX_DONE;
383       xbt_fifo_push(action->request_list, req);
384       SIMIX_comm_finish(action);
385     }else{
386       SIMIX_request_answer(req);
387     }
388     return;
389   }
390
391   req->comm_test.result = (action->state != SIMIX_WAITING && action->state != SIMIX_RUNNING);
392   if (req->comm_test.result) {
393     xbt_fifo_push(action->request_list, req);
394     SIMIX_comm_finish(action);
395   } else {
396     SIMIX_request_answer(req);
397   }
398 }
399
400 void SIMIX_pre_comm_testany(smx_req_t req, int idx)
401 {
402   unsigned int cursor;
403   smx_action_t action;
404   xbt_dynar_t actions = req->comm_testany.comms;
405   req->comm_testany.result = -1;
406
407   if (MC_IS_ENABLED){
408     if(idx == -1){
409       SIMIX_request_answer(req);
410     }else{
411       action = xbt_dynar_get_as(actions, idx, smx_action_t);
412       req->comm_testany.result = idx;
413       xbt_fifo_push(action->request_list, req);
414       action->state = SIMIX_DONE;
415       SIMIX_comm_finish(action);
416     }
417     return;
418   }
419
420   xbt_dynar_foreach(req->comm_testany.comms,cursor,action) {
421     if (action->state != SIMIX_WAITING && action->state != SIMIX_RUNNING) {
422       req->comm_testany.result = cursor;
423       xbt_fifo_push(action->request_list, req);
424       SIMIX_comm_finish(action);
425       return;
426     }
427   }
428   SIMIX_request_answer(req);
429 }
430
431 void SIMIX_pre_comm_waitany(smx_req_t req, int idx)
432 {
433   smx_action_t action;
434   unsigned int cursor = 0;
435   xbt_dynar_t actions = req->comm_waitany.comms;
436
437   if (MC_IS_ENABLED){
438     action = xbt_dynar_get_as(actions, idx, smx_action_t);
439     xbt_fifo_push(action->request_list, req);
440     req->comm_waitany.result = idx;
441     action->state = SIMIX_DONE;
442     SIMIX_comm_finish(action);
443     return;
444   }
445
446   xbt_dynar_foreach(actions, cursor, action){
447     /* Associate this request to the action */
448     xbt_fifo_push(action->request_list, req);
449     if (action->state != SIMIX_WAITING && action->state != SIMIX_RUNNING){
450       SIMIX_comm_finish(action);
451       break;
452     }
453   }
454 }
455
456 void SIMIX_waitany_req_remove_from_actions(smx_req_t req)
457 {
458   smx_action_t action;
459   unsigned int cursor = 0;
460   xbt_dynar_t actions = req->comm_waitany.comms;
461
462   xbt_dynar_foreach(actions, cursor, action){
463     xbt_fifo_remove(action->request_list, req);
464   }
465 }
466
467 /**
468  *  \brief Start the simulation of a communication request
469  *  \param action The communication action
470  */
471 static XBT_INLINE void SIMIX_comm_start(smx_action_t action)
472 {
473   /* If both the sender and the receiver are already there, start the communication */
474   if (action->state == SIMIX_READY) {
475     smx_host_t sender = action->comm.src_proc->smx_host;
476     smx_host_t receiver = action->comm.dst_proc->smx_host;
477
478     XBT_DEBUG("Starting communication %p from '%s' to '%s'", action,
479            SIMIX_host_get_name(sender), SIMIX_host_get_name(receiver));
480
481     action->comm.surf_comm = surf_workstation_model->extension.workstation.
482         communicate(sender->host, receiver->host, action->comm.task_size, action->comm.rate);
483
484     surf_workstation_model->action_data_set(action->comm.surf_comm, action);
485
486     action->state = SIMIX_RUNNING;
487
488 #ifdef HAVE_TRACING
489     TRACE_smx_action_communicate(action, action->comm.src_proc);
490 #endif
491
492     /* If a link is failed, detect it immediately */
493     if (surf_workstation_model->action_state_get(action->comm.surf_comm) == SURF_ACTION_FAILED) {
494       XBT_DEBUG("Communication from '%s' to '%s' failed to start because of a link failure",
495           SIMIX_host_get_name(sender), SIMIX_host_get_name(receiver));
496       action->state = SIMIX_LINK_FAILURE;
497       SIMIX_comm_destroy_internal_actions(action);
498     }
499
500     /* If any of the process is suspend, create the action but stop its execution,
501        it will be restarted when the sender process resume */
502     if (SIMIX_process_is_suspended(action->comm.src_proc) ||
503         SIMIX_process_is_suspended(action->comm.dst_proc)) {
504       /* FIXME: check what should happen with the action state */
505       surf_workstation_model->suspend(action->comm.surf_comm);
506     }
507   }
508 }
509
510 void SIMIX_comm_finish(smx_action_t action)
511 {
512   unsigned int destroy_count = 0;
513   smx_req_t req;
514
515   while ((req = xbt_fifo_shift(action->request_list))) {
516
517     /* If a waitany request is waiting for this action to finish, then remove
518        it from the other actions in the waitany list. Afterwards, get the
519        position of the actual action in the waitany request's actions dynar and
520        return it as the result of the call */
521     if (req->call == REQ_COMM_WAITANY) {
522       SIMIX_waitany_req_remove_from_actions(req);
523       if (!MC_IS_ENABLED)
524         req->comm_waitany.result = xbt_dynar_search(req->comm_waitany.comms, &action);
525     }
526
527     /* If the action is still in a rendez-vous point then remove from it */
528     if (action->comm.rdv)
529       SIMIX_rdv_remove(action->comm.rdv, action);
530
531     XBT_DEBUG("SIMIX_comm_finish: action state = %d", action->state);
532
533     /* Check out for errors */
534     switch (action->state) {
535
536       case SIMIX_DONE:
537         XBT_DEBUG("Communication %p complete!", action);
538         SIMIX_comm_copy_data(action);
539         break;
540
541       case SIMIX_SRC_TIMEOUT:
542         TRY {
543           THROW0(timeout_error, 0, "Communication timeouted because of sender");
544         }
545         CATCH(req->issuer->running_ctx->exception) {
546           req->issuer->doexception = 1;
547         }
548         break;
549
550       case SIMIX_DST_TIMEOUT:
551         TRY {
552           THROW0(timeout_error, 0, "Communication timeouted because of receiver");
553         }
554         CATCH(req->issuer->running_ctx->exception) {
555           req->issuer->doexception = 1;
556         }
557         break;
558
559       case SIMIX_SRC_HOST_FAILURE:
560         TRY {
561           if (req->issuer == action->comm.src_proc)
562             THROW0(host_error, 0, "Host failed");
563           else
564             THROW0(network_error, 0, "Remote peer failed");
565         }
566         CATCH(req->issuer->running_ctx->exception) {
567           req->issuer->doexception = 1;
568         }
569         break;
570
571       case SIMIX_DST_HOST_FAILURE:
572         TRY {
573           if (req->issuer == action->comm.dst_proc)
574             THROW0(host_error, 0, "Host failed");
575           else
576             THROW0(network_error, 0, "Remote peer failed");
577         }
578         CATCH(req->issuer->running_ctx->exception) {
579           req->issuer->doexception = 1;
580         }
581         break;
582
583       case SIMIX_LINK_FAILURE:
584         TRY {
585           XBT_DEBUG("Link failure in action %p between '%s' and '%s': posting an exception to the issuer: %s (%p)",
586               action, action->comm.src_proc->smx_host->name, action->comm.dst_proc->smx_host->name,
587               req->issuer->name, req->issuer);
588           THROW0(network_error, 0, "Link failure");
589         }
590         CATCH(req->issuer->running_ctx->exception) {
591           req->issuer->doexception = 1;
592         }
593         break;
594
595       default:
596         THROW_IMPOSSIBLE;
597     }
598
599     /* if there is an exception during a waitany or a testany, indicate the position of the failed communication */
600     if (req->issuer->doexception) {
601       if (req->call == REQ_COMM_WAITANY) {
602         req->issuer->running_ctx->exception.value = xbt_dynar_search(req->comm_waitany.comms, &action);
603       }
604       else if (req->call == REQ_COMM_TESTANY) {
605         req->issuer->running_ctx->exception.value = xbt_dynar_search(req->comm_testany.comms, &action);
606       }
607     }
608
609     req->issuer->waiting_action = NULL;
610     SIMIX_request_answer(req);
611     destroy_count++;
612   }
613
614   while (destroy_count-- > 0)
615     SIMIX_comm_destroy(action);
616 }
617
618 void SIMIX_post_comm(smx_action_t action)
619 {
620   /* Update action state */
621   if (action->comm.src_timeout &&
622      surf_workstation_model->action_state_get(action->comm.src_timeout) == SURF_ACTION_DONE)
623      action->state = SIMIX_SRC_TIMEOUT;
624   else if (action->comm.dst_timeout &&
625           surf_workstation_model->action_state_get(action->comm.dst_timeout) == SURF_ACTION_DONE)
626      action->state = SIMIX_DST_TIMEOUT;
627   else if (action->comm.src_timeout &&
628           surf_workstation_model->action_state_get(action->comm.src_timeout) == SURF_ACTION_FAILED)
629      action->state = SIMIX_SRC_HOST_FAILURE;
630   else if (action->comm.dst_timeout &&
631           surf_workstation_model->action_state_get(action->comm.dst_timeout) == SURF_ACTION_FAILED)
632      action->state = SIMIX_DST_HOST_FAILURE;
633   else if (action->comm.surf_comm &&
634           surf_workstation_model->action_state_get(action->comm.surf_comm) == SURF_ACTION_FAILED)
635      action->state = SIMIX_LINK_FAILURE;
636   else
637     action->state = SIMIX_DONE;
638
639   XBT_DEBUG("SIMIX_post_comm: action state = %d", action->state);
640
641   /* After this point the surf actions associated with the simix communicate
642      action are no longer needed, thus we delete them. */
643   SIMIX_comm_destroy_internal_actions(action);
644
645   /* If there are requests associated with the action, then answer them */
646   if (xbt_fifo_size(action->request_list))
647     SIMIX_comm_finish(action);
648 }
649
650 void SIMIX_comm_cancel(smx_action_t action)
651 {
652   /* If the action is a waiting state means that it is still in a rdv */
653   /* so remove from it and delete it */
654   if (action->state == SIMIX_WAITING) {
655     SIMIX_rdv_remove(action->comm.rdv, action);
656     action->state = SIMIX_FAILED;
657   } else {
658     /* When running the MC there are no surf actions */
659     if(!MC_IS_ENABLED)
660       surf_workstation_model->action_cancel(action->comm.surf_comm);
661   }
662 }
663
664 void SIMIX_comm_suspend(smx_action_t action)
665 {
666   /*FIXME: shall we suspend also the timeout actions? */
667   surf_workstation_model->suspend(action->comm.surf_comm);
668 }
669
670 void SIMIX_comm_resume(smx_action_t action)
671 {
672   /*FIXME: check what happen with the timeouts */
673   surf_workstation_model->resume(action->comm.surf_comm);
674 }
675
676
677 /************* Action Getters **************/
678
679 /**
680  *  \brief get the amount remaining from the communication
681  *  \param action The communication
682  */
683 double SIMIX_comm_get_remains(smx_action_t action)
684 {
685   double remains;
686
687   switch (action->state) {
688
689     case SIMIX_RUNNING:
690       remains = surf_workstation_model->get_remains(action->comm.surf_comm);
691       break;
692
693     case SIMIX_WAITING:
694     case SIMIX_READY:
695       remains = 0; /*FIXME: check what should be returned */
696       break;
697
698     default:
699       remains = 0; /*FIXME: is this correct? */
700       break;
701   }
702   return remains;
703 }
704
705 e_smx_state_t SIMIX_comm_get_state(smx_action_t action)
706 {
707   return action->state;
708 }
709
710 /**
711  *  \brief Return the user data associated to the sender of the communication
712  *  \param action The communication
713  *  \return the user data
714  */
715 void* SIMIX_comm_get_src_data(smx_action_t action)
716 {
717   return action->comm.src_data;
718 }
719
720 /**
721  *  \brief Return the user data associated to the receiver of the communication
722  *  \param action The communication
723  *  \return the user data
724  */
725 void* SIMIX_comm_get_dst_data(smx_action_t action)
726 {
727   return action->comm.dst_data;
728 }
729
730 smx_process_t SIMIX_comm_get_src_proc(smx_action_t action)
731 {
732   return action->comm.src_proc;
733 }
734
735 smx_process_t SIMIX_comm_get_dst_proc(smx_action_t action)
736 {
737   return action->comm.dst_proc;
738 }
739
740 #ifdef HAVE_LATENCY_BOUND_TRACKING
741 /**
742  *  \brief verify if communication is latency bounded
743  *  \param comm The communication
744  */
745 XBT_INLINE int SIMIX_comm_is_latency_bounded(smx_action_t action)
746 {
747   if (action->comm.surf_comm){
748       XBT_DEBUG("Getting latency limited for surf_action (%p)", action->comm.surf_comm);
749       action->latency_limited = surf_workstation_model->get_latency_limited(action->comm.surf_comm);
750       XBT_DEBUG("Action limited is %d", action->latency_limited);
751   }
752   return action->latency_limited;
753 }
754 #endif
755
756 /******************************************************************************/
757 /*                    SIMIX_comm_copy_data callbacks                       */
758 /******************************************************************************/
759 static void (*SIMIX_comm_copy_data_callback) (smx_action_t, size_t) =
760     &SIMIX_comm_copy_pointer_callback;
761
762 void
763 SIMIX_comm_set_copy_data_callback(void (*callback) (smx_action_t, size_t))
764 {
765   SIMIX_comm_copy_data_callback = callback;
766 }
767
768 void SIMIX_comm_copy_pointer_callback(smx_action_t comm, size_t buff_size)
769 {
770   xbt_assert1((buff_size == sizeof(void *)),
771               "Cannot copy %zu bytes: must be sizeof(void*)", buff_size);
772   *(void **) (comm->comm.dst_buff) = comm->comm.src_buff;
773 }
774
775 void SIMIX_comm_copy_buffer_callback(smx_action_t comm, size_t buff_size)
776 {
777   memcpy(comm->comm.dst_buff, comm->comm.src_buff, buff_size);
778 }
779
780 /**
781  *  \brief Copy the communication data from the sender's buffer to the receiver's one
782  *  \param comm The communication
783  */
784 void SIMIX_comm_copy_data(smx_action_t comm)
785 {
786   size_t buff_size = comm->comm.src_buff_size;
787   /* If there is no data to be copy then return */
788   if (!comm->comm.src_buff || !comm->comm.dst_buff || comm->comm.copied == 1)
789     return;
790
791   XBT_DEBUG("Copying comm %p data from %s (%p) -> %s (%p) (%zu bytes)",
792          comm,
793          comm->comm.src_proc->smx_host->name, comm->comm.src_buff,
794          comm->comm.dst_proc->smx_host->name, comm->comm.dst_buff, buff_size);
795
796   /* Copy at most dst_buff_size bytes of the message to receiver's buffer */
797   if (comm->comm.dst_buff_size)
798     buff_size = MIN(buff_size, *(comm->comm.dst_buff_size));
799
800   /* Update the receiver's buffer size to the copied amount */
801   if (comm->comm.dst_buff_size)
802     *comm->comm.dst_buff_size = buff_size;
803
804   if (buff_size == 0)
805     return;
806
807   (*SIMIX_comm_copy_data_callback) (comm, buff_size);
808
809   /* Set the copied flag so we copy data only once */
810   /* (this function might be called from both communication ends) */
811   comm->comm.copied = 1;
812 }