Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
Biggest commit ever (SIMIX2): the user processes can now run in parallel
[simgrid.git] / src / simix / smx_network.c
1 /* Copyright (c) 2009, 2010. The SimGrid Team.
2  * All rights reserved.                                                     */
3
4 /* This program is free software; you can redistribute it and/or modify it
5  * under the terms of the license (GNU LGPL) which comes with this package. */
6
7 #include "private.h"
8 #include "xbt/log.h"
9 #include "mc/mc.h"
10 #include "xbt/dict.h"
11
12 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(simix_network, simix,
13                                 "Logging specific to SIMIX (network)");
14
15 static xbt_dict_t rdv_points = NULL;
16
17 static XBT_INLINE void SIMIX_comm_start(smx_action_t action);
18 static void SIMIX_comm_finish(smx_action_t action);
19 static void SIMIX_waitany_req_remove_from_actions(smx_req_t req);
20 static void SIMIX_comm_copy_data(smx_action_t comm);
21 static smx_action_t SIMIX_comm_new(e_smx_comm_type_t type);
22 static XBT_INLINE void SIMIX_comm_wait_for_completion(smx_action_t comm,
23                                                       double timeout);
24 static XBT_INLINE void SIMIX_rdv_push(smx_rdv_t rdv, smx_action_t comm);
25 static XBT_INLINE void SIMIX_rdv_remove(smx_rdv_t rdv, smx_action_t comm);
26 static smx_action_t SIMIX_rdv_get_request(smx_rdv_t rdv, e_smx_comm_type_t type);
27 static void SIMIX_rdv_free(void *data);
28
29 void SIMIX_network_init(void)
30 {
31   rdv_points = xbt_dict_new();
32 }
33
34 void SIMIX_network_exit(void)
35 {
36   xbt_dict_free(&rdv_points);
37 }
38
39 /******************************************************************************/
40 /*                           Rendez-Vous Points                               */
41 /******************************************************************************/
42
43 smx_rdv_t SIMIX_rdv_create(const char *name)
44 {
45   /* two processes may have pushed the same rdv_create request at the same time */
46   smx_rdv_t rdv = name ? xbt_dict_get_or_null(rdv_points, name) : NULL;
47
48   if (!rdv) {
49     rdv = xbt_new0(s_smx_rvpoint_t, 1);
50     rdv->name = name ? xbt_strdup(name) : NULL;
51     rdv->comm_fifo = xbt_fifo_new();
52
53     if (name)
54       xbt_dict_set(rdv_points, name, rdv, SIMIX_rdv_free);
55   }
56   return rdv;
57 }
58
59 void SIMIX_rdv_destroy(smx_rdv_t rdv)
60 {
61   if (rdv->name)
62     xbt_dict_remove(rdv_points, rdv->name); 
63 }
64
65 void SIMIX_rdv_free(void *data)
66 {
67   smx_rdv_t rdv = (smx_rdv_t) data;
68   if (rdv->name)
69     xbt_free(rdv->name);
70   xbt_fifo_free(rdv->comm_fifo);
71   xbt_free(rdv);  
72 }
73
74 smx_rdv_t SIMIX_rdv_get_by_name(const char *name)
75 {
76   return xbt_dict_get_or_null(rdv_points, name);
77 }
78
79 int SIMIX_rdv_comm_count_by_host(smx_rdv_t rdv, smx_host_t host)
80 {
81   smx_action_t comm = NULL;
82   xbt_fifo_item_t item = NULL;
83   int count = 0;
84
85   xbt_fifo_foreach(rdv->comm_fifo, item, comm, smx_action_t) {
86     if (comm->comm.src_proc->smx_host == host)
87       count++;
88   }
89
90   return count;
91 }
92
93 smx_action_t SIMIX_rdv_get_head(smx_rdv_t rdv)
94 {
95   return xbt_fifo_get_item_content(xbt_fifo_get_first_item(rdv->comm_fifo));
96 }
97
98 /**
99  *  \brief Push a communication request into a rendez-vous point
100  *  \param rdv The rendez-vous point
101  *  \param comm The communication request
102  */
103 static XBT_INLINE void SIMIX_rdv_push(smx_rdv_t rdv, smx_action_t comm)
104 {
105   xbt_fifo_push(rdv->comm_fifo, comm);
106   comm->comm.rdv = rdv;
107 }
108
109 /**
110  *  \brief Remove a communication request from a rendez-vous point
111  *  \param rdv The rendez-vous point
112  *  \param comm The communication request
113  */
114 static XBT_INLINE void SIMIX_rdv_remove(smx_rdv_t rdv, smx_action_t comm)
115 {
116   xbt_fifo_remove(rdv->comm_fifo, comm);
117   comm->comm.rdv = NULL;
118 }
119
120 /**
121  *  \brief Checks if there is a communication request queued in a rendez-vous matching our needs
122  *  \param type The type of communication we are looking for (comm_send, comm_recv)
123  *  \return The communication request if found, NULL otherwise
124  */
125 smx_action_t SIMIX_rdv_get_request(smx_rdv_t rdv, e_smx_comm_type_t type)
126 {
127   smx_action_t comm = (smx_action_t)
128       xbt_fifo_get_item_content(xbt_fifo_get_first_item(rdv->comm_fifo));
129
130   if (comm && comm->comm.type == type) {
131     DEBUG0("Communication request found!");
132     xbt_fifo_shift(rdv->comm_fifo);
133     comm->comm.refcount++;
134     comm->comm.rdv = NULL;
135     return comm;
136   }
137
138   DEBUG0("Communication request not found");
139   return NULL;
140 }
141
142 /******************************************************************************/
143 /*                            Comunication Actions                            */
144 /******************************************************************************/
145
146 /**
147  *  \brief Creates a new comunicate action
148  *  \param type The type of request (comm_send, comm_recv)
149  *  \return The new comunicate action
150  */
151 smx_action_t SIMIX_comm_new(e_smx_comm_type_t type)
152 {
153   smx_action_t act;
154
155   /* alloc structures */
156   act = xbt_new0(s_smx_action_t, 1);
157   act->type = SIMIX_ACTION_COMMUNICATE;
158   act->state = SIMIX_WAITING;
159   act->request_list = xbt_fifo_new();
160
161   /* set communication */
162   act->comm.type = type;
163   act->comm.refcount = 1;
164
165 #ifdef HAVE_TRACING
166   act->category = NULL;
167 #endif
168
169   DEBUG1("Create communicate action %p", act);
170   
171   return act;
172 }
173
174 /**
175  *  \brief Destroy a communicate action
176  *  \param action The communicate action to be destroyed
177  */
178 void SIMIX_comm_destroy(smx_action_t action)
179 {
180   DEBUG1("Destroy action %p", action);
181
182   if(!(action->comm.refcount > 0))
183           xbt_die(bprintf("the refcount of comm %p is already 0 before decreasing it. That's a bug!",action));
184
185 #ifdef HAVE_LATENCY_BOUND_TRACKING
186   //save is latency limited flag to use afterwards
187   if (action->comm.surf_comm) {
188     DEBUG2("adding key %p with latency limited value %d to the dict", action,
189            SIMIX_comm_is_latency_bounded(action));
190     xbt_dicti_set(simix_global->latency_limited_dict, (uintptr_t) action,
191                   SIMIX_comm_is_latency_bounded(action));
192   }
193 #endif
194
195   action->comm.refcount--;
196   if (action->comm.refcount > 0)
197     return;
198   VERB2("Really free communication %p; refcount is now %d", action,
199         action->comm.refcount);
200
201 #ifdef HAVE_TRACING
202   TRACE_smx_action_destroy(action);
203 #endif
204
205   if (action->name)
206     xbt_free(action->name);
207
208   xbt_fifo_free(action->request_list);
209
210   SIMIX_comm_destroy_internal_actions(action);
211
212   xbt_free(action);
213 }
214
215 void SIMIX_comm_destroy_internal_actions(smx_action_t action)
216 {
217   if (action->comm.surf_comm){
218     action->comm.surf_comm->model_type->action_unref(action->comm.surf_comm);
219     action->comm.surf_comm = NULL;
220   }
221
222   if (action->comm.src_timeout){
223     action->comm.src_timeout->model_type->action_unref(action->comm.src_timeout);
224     action->comm.src_timeout = NULL;
225   }
226
227   if (action->comm.dst_timeout){
228     action->comm.dst_timeout->model_type->action_unref(action->comm.dst_timeout);
229     action->comm.dst_timeout = NULL;
230   }
231 }
232
233 smx_action_t SIMIX_comm_isend(smx_process_t src_proc, smx_rdv_t rdv,
234                               double task_size, double rate,
235                               void *src_buff, size_t src_buff_size, void *data)
236 {
237   smx_action_t action;
238
239   /* Look for communication request matching our needs.
240      If it is not found then create it and push it into the rendez-vous point */
241   action = SIMIX_rdv_get_request(rdv, SIMIX_COMM_RECEIVE);
242
243   if (!action) {
244     action = SIMIX_comm_new(SIMIX_COMM_SEND);
245     SIMIX_rdv_push(rdv, action);
246   }else{
247     action->state = SIMIX_READY;
248     action->comm.type = SIMIX_COMM_READY;
249   }
250
251   /* Setup the communication request */
252   action->comm.src_proc = src_proc;
253   action->comm.task_size = task_size;
254   action->comm.rate = rate;
255   action->comm.src_buff = src_buff;
256   action->comm.src_buff_size = src_buff_size;
257   action->comm.data = data;
258 #ifdef HAVE_MC
259   if(_surf_do_model_check){
260     action->state = SIMIX_RUNNING;
261     return action;
262   }
263 #endif
264   SIMIX_comm_start(action);
265   return action;
266 }
267
268 smx_action_t SIMIX_comm_irecv(smx_process_t dst_proc, smx_rdv_t rdv,
269                       void *dst_buff, size_t *dst_buff_size)
270 {
271   smx_action_t action;
272
273   /* Look for communication request matching our needs.
274    * If it is not found then create it and push it into the rendez-vous point
275    */
276   action = SIMIX_rdv_get_request(rdv, SIMIX_COMM_SEND);
277
278   if (!action) {
279     action = SIMIX_comm_new(SIMIX_COMM_RECEIVE);
280     SIMIX_rdv_push(rdv, action);
281   } else {
282     action->state = SIMIX_READY;
283     action->comm.type = SIMIX_COMM_READY;
284   }
285
286   /* Setup communication request */
287   action->comm.dst_proc = dst_proc;
288   action->comm.dst_buff = dst_buff;
289   action->comm.dst_buff_size = dst_buff_size;
290
291 #ifdef HAVE_MC
292   if(_surf_do_model_check){
293     action->state = SIMIX_RUNNING;
294     return action;
295   }
296 #endif
297
298   SIMIX_comm_start(action);
299   return action;
300 }
301
302 void SIMIX_pre_comm_wait(smx_req_t req)
303 {
304   smx_action_t action = req->comm_wait.comm;
305   double timeout = req->comm_wait.timeout;
306   surf_action_t sleep;
307
308   /* Associate this request to the action */
309   xbt_fifo_push(action->request_list, req);
310   req->issuer->waiting_action = action;
311
312 #ifdef HAVE_MC
313   if(_surf_do_model_check){
314     action->state = SIMIX_DONE;
315     SIMIX_comm_finish(action);
316   }
317 #endif
318
319   /* If the action has already finish perform the error handling, */
320   /* otherwise set up a waiting timeout on the right side         */
321   if (action->state != SIMIX_WAITING && action->state != SIMIX_RUNNING) {
322     SIMIX_comm_finish(action);
323   } else { /* if (timeout >= 0) { we need a surf sleep action even when there is no timeout, otherwise surf won't tell us when the host fails */
324     sleep = surf_workstation_model->extension.workstation.sleep(req->issuer->smx_host->host, timeout);
325     surf_workstation_model->action_data_set(sleep, action);
326
327     if (req->issuer == action->comm.src_proc)
328       action->comm.src_timeout = sleep;
329     else
330       action->comm.dst_timeout = sleep;
331   }
332 }
333
334 void SIMIX_pre_comm_test(smx_req_t req)
335 {
336   smx_action_t action = req->comm_test.comm;
337   req->comm_test.result = (action->state != SIMIX_WAITING && action->state != SIMIX_RUNNING);
338
339   if (req->comm_test.result) {
340     xbt_fifo_push(action->request_list, req);
341     SIMIX_comm_finish(action);
342   }
343   else {
344     SIMIX_request_answer(req);
345   }
346 }
347
348 void SIMIX_pre_comm_waitany(smx_req_t req)
349 {
350   smx_action_t action;
351   unsigned int cursor = 0;
352   xbt_dynar_t actions = req->comm_waitany.comms;
353   xbt_dynar_foreach(actions, cursor, action){
354     /* Associate this request to the action */
355     xbt_fifo_push(action->request_list, req);
356     if(action->state != SIMIX_WAITING && action->state != SIMIX_RUNNING){
357       SIMIX_comm_finish(action);
358       break;
359     }
360   }
361 }
362
363 void SIMIX_waitany_req_remove_from_actions(smx_req_t req)
364 {
365   smx_action_t action;
366   unsigned int cursor = 0;
367   xbt_dynar_t actions = req->comm_waitany.comms;
368
369   xbt_dynar_foreach(actions, cursor, action){
370     xbt_fifo_remove(action->request_list, req);
371   }
372 }
373
374 /**
375  *  \brief Start the simulation of a communication request
376  *  \param action The communication action
377  */
378 static XBT_INLINE void SIMIX_comm_start(smx_action_t action)
379 {
380   /* If both the sender and the receiver are already there, start the communication */
381   if (action->state == SIMIX_READY) {
382     smx_host_t sender = action->comm.src_proc->smx_host;
383     smx_host_t receiver = action->comm.dst_proc->smx_host;
384
385     DEBUG3("Starting communication %p from '%s' to '%s'", action,
386           SIMIX_host_get_name(sender), SIMIX_host_get_name(receiver));
387
388     action->comm.surf_comm =
389       surf_workstation_model->extension.workstation.
390       communicate(sender->host, receiver->host, action->comm.task_size, action->comm.rate);
391
392     surf_workstation_model->action_data_set(action->comm.surf_comm, action);
393
394     action->state = SIMIX_RUNNING;
395
396 #ifdef HAVE_TRACING
397     TRACE_smx_action_communicate(comm, comm->comm.src_proc);
398     TRACE_surf_action(comm->surf_action, comm->category);
399 #endif
400
401     /* If a link is failed, detect it immediately */
402     if (surf_workstation_model->action_state_get(action->comm.surf_comm) == SURF_ACTION_FAILED) {
403       DEBUG2("Communication from '%s' to '%s' failed to start because of a link failure",
404           SIMIX_host_get_name(sender), SIMIX_host_get_name(receiver));
405       action->state = SIMIX_LINK_FAILURE;
406       SIMIX_comm_destroy_internal_actions(action);
407     }
408
409     /* If any of the process is suspend, create the action but stop its execution,
410        it will be restarted when the sender process resume */
411     if (SIMIX_process_is_suspended(action->comm.src_proc) ||
412         SIMIX_process_is_suspended(action->comm.dst_proc)) {
413       /* FIXME: check what should happen with the action state */
414       surf_workstation_model->suspend(action->comm.surf_comm);
415     }
416   }
417 }
418
419 void SIMIX_comm_finish(smx_action_t action)
420 {
421   smx_req_t req;
422
423   while((req = xbt_fifo_shift(action->request_list))){
424
425     /* If a waitany request is waiting for this action to finish, then remove
426        it from the other actions in the waitany list. Afterwards, get the
427        position of the actual action in the waitany request's actions dynar and
428        return it as the result of the call */
429     if(req->call == REQ_COMM_WAITANY){
430       SIMIX_waitany_req_remove_from_actions(req);
431       req->comm_waitany.result = xbt_dynar_search(req->comm_waitany.comms, &action);
432     }
433
434     /* If the action is still in a rendez-vous point then remove from it */
435     if(action->comm.rdv)
436       SIMIX_rdv_remove(action->comm.rdv, action);
437
438     DEBUG1("SIMIX_comm_finish: action state = %d", action->state);
439
440     /* Check out for errors */
441     switch (action->state) {
442
443       case SIMIX_DONE:
444         DEBUG1("Communication %p complete!", action);
445         SIMIX_comm_copy_data(action);
446         break;
447
448       case SIMIX_SRC_TIMEOUT:
449         TRY {
450           THROW0(timeout_error, 0, "Communication timeouted because of sender");
451         }
452         CATCH(req->issuer->running_ctx->exception) {
453           req->issuer->doexception = 1;
454         }
455         break;
456
457       case SIMIX_DST_TIMEOUT:
458         TRY {
459           THROW0(timeout_error, 0, "Communication timeouted because of receiver");
460         }
461         CATCH(req->issuer->running_ctx->exception) {
462           req->issuer->doexception = 1;
463         }
464         break;
465
466       case SIMIX_SRC_HOST_FAILURE:
467         TRY {
468           if(req->issuer == action->comm.src_proc)
469             THROW0(host_error, 0, "Host failed");
470           else
471             THROW0(network_error, 0, "Remote peer failed");
472         }
473         CATCH(req->issuer->running_ctx->exception) {
474           req->issuer->doexception = 1;
475         }
476         break;
477
478       case SIMIX_DST_HOST_FAILURE:
479         TRY {
480           if (req->issuer == action->comm.dst_proc)
481             THROW0(host_error, 0, "Host failed");
482           else
483             THROW0(network_error, 0, "Remote peer failed");
484         }
485         CATCH(req->issuer->running_ctx->exception) {
486           req->issuer->doexception = 1;
487         }
488         break;
489
490       case SIMIX_LINK_FAILURE:
491         TRY {
492           DEBUG5("Link failure in action %p between '%s' and '%s': posting an exception to the issuer: %s (%p)",
493               action, action->comm.src_proc->smx_host->name, action->comm.dst_proc->smx_host->name,
494               req->issuer->name, req->issuer);
495           THROW0(network_error, 0, "Link failure");
496         }
497         CATCH(req->issuer->running_ctx->exception) {
498           req->issuer->doexception = 1;
499         }
500         break;
501
502       default:
503         THROW_IMPOSSIBLE;
504     }
505     req->issuer->waiting_action = NULL;
506     SIMIX_request_answer(req);
507   }
508 }
509
510 void SIMIX_post_comm(smx_action_t action)
511 {
512   /* Update action state */
513   if(action->comm.src_timeout &&
514      surf_workstation_model->action_state_get(action->comm.src_timeout) == SURF_ACTION_DONE)
515      action->state = SIMIX_SRC_TIMEOUT;
516   else if(action->comm.dst_timeout &&
517           surf_workstation_model->action_state_get(action->comm.dst_timeout) == SURF_ACTION_DONE)
518      action->state = SIMIX_DST_TIMEOUT;
519   else if(action->comm.src_timeout &&
520           surf_workstation_model->action_state_get(action->comm.src_timeout) == SURF_ACTION_FAILED)
521      action->state = SIMIX_SRC_HOST_FAILURE;
522   else if(action->comm.dst_timeout &&
523           surf_workstation_model->action_state_get(action->comm.dst_timeout) == SURF_ACTION_FAILED)
524      action->state = SIMIX_DST_HOST_FAILURE;
525   else if(action->comm.surf_comm &&
526           surf_workstation_model->action_state_get(action->comm.surf_comm) == SURF_ACTION_FAILED)
527      action->state = SIMIX_LINK_FAILURE;
528   else
529     action->state = SIMIX_DONE;
530
531   DEBUG1("SIMIX_post_comm: action state = %d", action->state);
532
533   /* After this point the surf actions associated with the simix communicate
534      action are no longer needed, thus we delete them. */
535   SIMIX_comm_destroy_internal_actions(action);
536
537   /* If there are requests associated with the action, then answer them */
538   if(xbt_fifo_size(action->request_list))
539     SIMIX_comm_finish(action);
540 }
541
542 void SIMIX_comm_cancel(smx_action_t action)
543 {
544   /* If the action is a waiting state means that it is still in a rdv */
545   /* so remove from it and delete it */
546   if (action->state == SIMIX_WAITING) {
547     SIMIX_rdv_remove(action->comm.rdv, action);
548     action->state = SIMIX_FAILED;
549   } else {
550     surf_workstation_model->action_cancel(action->comm.surf_comm);
551   }
552 }
553
554 void SIMIX_comm_suspend(smx_action_t action)
555 {
556   /*FIXME: shall we suspend also the timeout actions? */
557   surf_workstation_model->suspend(action->comm.surf_comm);
558 }
559
560 void SIMIX_comm_resume(smx_action_t action)
561 {
562   /*FIXME: check what happen with the timeouts */
563   surf_workstation_model->resume(action->comm.surf_comm);
564 }
565
566
567 /************* Action Getters **************/
568
569 /**
570  *  \brief get the amount remaining from the communication
571  *  \param action The communication
572  */
573 double SIMIX_comm_get_remains(smx_action_t action)
574 {
575   double remains;
576
577   switch (action->state) {
578
579     case SIMIX_RUNNING:
580       remains = surf_workstation_model->get_remains(action->comm.surf_comm);
581       break;
582
583     case SIMIX_WAITING:
584     case SIMIX_READY:
585       remains = 0; /*FIXME: check what should be returned */
586       break;
587
588     default:
589       remains = 0; /*FIXME: is this correct? */
590       break;
591   }
592   return remains;
593 }
594
595 e_smx_state_t SIMIX_comm_get_state(smx_action_t action)
596 {
597   return action->state;
598 }
599
600 /**
601  *  \brief Return the user data associated to the communication
602  *  \param action The communication
603  *  \return the user data
604  */
605 void* SIMIX_comm_get_data(smx_action_t action)
606 {
607   return action->comm.data;
608 }
609
610 void* SIMIX_comm_get_src_buff(smx_action_t action)
611 {
612   return action->comm.src_buff;
613 }
614
615 void* SIMIX_comm_get_dst_buff(smx_action_t action)
616 {
617   return action->comm.dst_buff;
618 }
619
620 size_t SIMIX_comm_get_src_buff_size(smx_action_t action)
621 {
622   return action->comm.src_buff_size;
623 }
624
625 size_t SIMIX_comm_get_dst_buff_size(smx_action_t action)
626 {
627   size_t buff_size;
628
629   if (action->comm.dst_buff_size)
630     buff_size = *(action->comm.dst_buff_size);
631   else
632     buff_size = 0;
633
634   return buff_size;
635 }
636
637 smx_process_t SIMIX_comm_get_src_proc(smx_action_t action)
638 {
639   return action->comm.src_proc;
640 }
641
642 smx_process_t SIMIX_comm_get_dst_proc(smx_action_t action)
643 {
644   return action->comm.dst_proc;
645 }
646
647 #ifdef HAVE_LATENCY_BOUND_TRACKING
648 /**
649  *  \brief verify if communication is latency bounded
650  *  \param comm The communication
651  */
652 XBT_INLINE int SIMIX_comm_is_latency_bounded(smx_action_t action)
653 {
654   //try to find comm on the list of finished flows
655   uintptr_t key = 0;
656   uintptr_t data = 0;
657   xbt_dict_cursor_t cursor;
658   xbt_dict_foreach(simix_global->latency_limited_dict, cursor, key, data) {
659     DEBUG2("comparing key=%p with comm=%p", (void *) key, (void *) action);
660     if ((void *) action == (void *) key) {
661       DEBUG2("key %p found, return value latency limited value %d",
662              (void *) key, (int) data);
663       xbt_dict_cursor_free(&cursor);
664       return (int) data;
665     }
666   }
667
668   return surf_workstation_model->get_latency_limited(action->comm.surf_comm);
669 }
670 #endif
671
672 /******************************************************************************/
673 /*                    SIMIX_comm_copy_data callbacks                       */
674 /******************************************************************************/
675 static void (*SIMIX_comm_copy_data_callback) (smx_action_t, size_t) =
676     &SIMIX_comm_copy_pointer_callback;
677
678 void
679 SIMIX_comm_set_copy_data_callback(void (*callback) (smx_action_t, size_t))
680 {
681   SIMIX_comm_copy_data_callback = callback;
682 }
683
684 void SIMIX_comm_copy_pointer_callback(smx_action_t comm, size_t buff_size)
685 {
686   xbt_assert1((buff_size == sizeof(void *)),
687               "Cannot copy %zu bytes: must be sizeof(void*)", buff_size);
688   *(void **) (comm->comm.dst_buff) = comm->comm.src_buff;
689 }
690
691 void SIMIX_comm_copy_buffer_callback(smx_action_t comm, size_t buff_size)
692 {
693   memcpy(comm->comm.dst_buff, comm->comm.src_buff, buff_size);
694 }
695
696 /**
697  *  \brief Copy the communication data from the sender's buffer to the receiver's one
698  *  \param comm The communication
699  */
700 void SIMIX_comm_copy_data(smx_action_t comm)
701 {
702   size_t buff_size = comm->comm.src_buff_size;
703   /* If there is no data to be copy then return */
704   if (!comm->comm.src_buff || !comm->comm.dst_buff || comm->comm.copied == 1)
705     return;
706
707   DEBUG6("Copying comm %p data from %s (%p) -> %s (%p) (%zu bytes)",
708          comm,
709          comm->comm.src_proc->smx_host->name, comm->comm.src_buff,
710          comm->comm.dst_proc->smx_host->name, comm->comm.dst_buff, buff_size);
711
712   /* Copy at most dst_buff_size bytes of the message to receiver's buffer */
713   if (comm->comm.dst_buff_size)
714     buff_size = MIN(buff_size, *(comm->comm.dst_buff_size));
715
716   /* Update the receiver's buffer size to the copied amount */
717   if (comm->comm.dst_buff_size)
718     *comm->comm.dst_buff_size = buff_size;
719
720   if (buff_size == 0)
721     return;
722   (*SIMIX_comm_copy_data_callback) (comm, buff_size);
723
724   /* Set the copied flag so we copy data only once */
725   /* (this function might be called from both communication ends) */
726   comm->comm.copied = 1;
727 }