Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
3c36832b1d92c65c1152476dcabbba26d4e9c849
[simgrid.git] / src / simix / smx_network.c
1 /* Copyright (c) 2009, 2010. The SimGrid Team.
2  * All rights reserved.                                                     */
3
4 /* This program is free software; you can redistribute it and/or modify it
5  * under the terms of the license (GNU LGPL) which comes with this package. */
6
7 #include "private.h"
8 #include "xbt/log.h"
9 #include "mc/mc.h"
10 #include "xbt/dict.h"
11
12 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(simix_network, simix,
13                                 "Logging specific to SIMIX (network)");
14
15 static xbt_dict_t rdv_points = NULL;
16
17 static XBT_INLINE void SIMIX_comm_start(smx_action_t action);
18 static void SIMIX_comm_finish(smx_action_t action);
19 static void SIMIX_waitany_req_remove_from_actions(smx_req_t req);
20 static void SIMIX_comm_copy_data(smx_action_t comm);
21 static smx_action_t SIMIX_comm_new(e_smx_comm_type_t type);
22 static XBT_INLINE void SIMIX_comm_wait_for_completion(smx_action_t comm,
23                                                       double timeout);
24 static XBT_INLINE void SIMIX_rdv_push(smx_rdv_t rdv, smx_action_t comm);
25 static XBT_INLINE void SIMIX_rdv_remove(smx_rdv_t rdv, smx_action_t comm);
26 static smx_action_t SIMIX_rdv_get_request(smx_rdv_t rdv, e_smx_comm_type_t type);
27 static void SIMIX_rdv_free(void *data);
28
29 void SIMIX_network_init(void)
30 {
31   rdv_points = xbt_dict_new();
32 }
33
34 void SIMIX_network_exit(void)
35 {
36   xbt_dict_free(&rdv_points);
37 }
38
39 /******************************************************************************/
40 /*                           Rendez-Vous Points                               */
41 /******************************************************************************/
42
43 smx_rdv_t SIMIX_rdv_create(const char *name)
44 {
45   /* two processes may have pushed the same rdv_create request at the same time */
46   smx_rdv_t rdv = name ? xbt_dict_get_or_null(rdv_points, name) : NULL;
47
48   if (!rdv) {
49     rdv = xbt_new0(s_smx_rvpoint_t, 1);
50     rdv->name = name ? xbt_strdup(name) : NULL;
51     rdv->comm_fifo = xbt_fifo_new();
52
53     if (name)
54       xbt_dict_set(rdv_points, name, rdv, SIMIX_rdv_free);
55   }
56   return rdv;
57 }
58
59 void SIMIX_rdv_destroy(smx_rdv_t rdv)
60 {
61   if (rdv->name)
62     xbt_dict_remove(rdv_points, rdv->name); 
63 }
64
65 void SIMIX_rdv_free(void *data)
66 {
67   smx_rdv_t rdv = (smx_rdv_t) data;
68   if (rdv->name)
69     xbt_free(rdv->name);
70   xbt_fifo_free(rdv->comm_fifo);
71   xbt_free(rdv);  
72 }
73
74 smx_rdv_t SIMIX_rdv_get_by_name(const char *name)
75 {
76   return xbt_dict_get_or_null(rdv_points, name);
77 }
78
79 int SIMIX_rdv_comm_count_by_host(smx_rdv_t rdv, smx_host_t host)
80 {
81   smx_action_t comm = NULL;
82   xbt_fifo_item_t item = NULL;
83   int count = 0;
84
85   xbt_fifo_foreach(rdv->comm_fifo, item, comm, smx_action_t) {
86     if (comm->comm.src_proc->smx_host == host)
87       count++;
88   }
89
90   return count;
91 }
92
93 smx_action_t SIMIX_rdv_get_head(smx_rdv_t rdv)
94 {
95   return xbt_fifo_get_item_content(xbt_fifo_get_first_item(rdv->comm_fifo));
96 }
97
98 /**
99  *  \brief Push a communication request into a rendez-vous point
100  *  \param rdv The rendez-vous point
101  *  \param comm The communication request
102  */
103 static XBT_INLINE void SIMIX_rdv_push(smx_rdv_t rdv, smx_action_t comm)
104 {
105   xbt_fifo_push(rdv->comm_fifo, comm);
106   comm->comm.rdv = rdv;
107 }
108
109 /**
110  *  \brief Remove a communication request from a rendez-vous point
111  *  \param rdv The rendez-vous point
112  *  \param comm The communication request
113  */
114 static XBT_INLINE void SIMIX_rdv_remove(smx_rdv_t rdv, smx_action_t comm)
115 {
116   xbt_fifo_remove(rdv->comm_fifo, comm);
117   comm->comm.rdv = NULL;
118 }
119
120 /**
121  *  \brief Checks if there is a communication request queued in a rendez-vous matching our needs
122  *  \param type The type of communication we are looking for (comm_send, comm_recv)
123  *  \return The communication request if found, NULL otherwise
124  */
125 smx_action_t SIMIX_rdv_get_request(smx_rdv_t rdv, e_smx_comm_type_t type)
126 {
127   smx_action_t comm = (smx_action_t)
128       xbt_fifo_get_item_content(xbt_fifo_get_first_item(rdv->comm_fifo));
129
130   if (comm && comm->comm.type == type) {
131     DEBUG0("Communication request found!");
132     xbt_fifo_shift(rdv->comm_fifo);
133     comm->comm.refcount++;
134     comm->comm.rdv = NULL;
135     return comm;
136   }
137
138   DEBUG0("Communication request not found");
139   return NULL;
140 }
141
142 /******************************************************************************/
/*                           Communication Actions                            */
144 /******************************************************************************/
145
146 /**
147  *  \brief Creates a new comunicate action
148  *  \param type The type of request (comm_send, comm_recv)
149  *  \return The new comunicate action
150  */
151 smx_action_t SIMIX_comm_new(e_smx_comm_type_t type)
152 {
153   smx_action_t act;
154
155   /* alloc structures */
156   act = xbt_new0(s_smx_action_t, 1);
157   act->type = SIMIX_ACTION_COMMUNICATE;
158   act->state = SIMIX_WAITING;
159   act->request_list = xbt_fifo_new();
160
161   /* set communication */
162   act->comm.type = type;
163   act->comm.refcount = 1;
164
165 #ifdef HAVE_TRACING
166   act->category = NULL;
167 #endif
168
169   DEBUG1("Create communicate action %p", act);
170
171   return act;
172 }
173
174 /**
175  *  \brief Destroy a communicate action
176  *  \param action The communicate action to be destroyed
177  */
178 void SIMIX_comm_destroy(smx_action_t action)
179 {
180   DEBUG1("Destroy action %p", action);
181
182   if (action->comm.refcount <= 0)
183     xbt_die(bprintf("the refcount of comm %p is already 0 before decreasing it. That's a bug!",action));
184
185 #ifdef HAVE_LATENCY_BOUND_TRACKING
186   //save is latency limited flag to use afterwards
187   if (action->comm.surf_comm) {
188     DEBUG2("adding key %p with latency limited value %d to the dict", action,
189            SIMIX_comm_is_latency_bounded(action));
190     xbt_dicti_set(simix_global->latency_limited_dict, (uintptr_t) action,
191                   SIMIX_comm_is_latency_bounded(action));
192   }
193 #endif
194
195   action->comm.refcount--;
196   if (action->comm.refcount > 0)
197     return;
198   VERB2("Really free communication %p; refcount is now %d", action,
199         action->comm.refcount);
200
201 #ifdef HAVE_TRACING
202   TRACE_smx_action_destroy(action);
203 #endif
204
205   if (action->name)
206     xbt_free(action->name);
207
208   xbt_fifo_free(action->request_list);
209
210   SIMIX_comm_destroy_internal_actions(action);
211
212   xbt_free(action);
213 }
214
215 void SIMIX_comm_destroy_internal_actions(smx_action_t action)
216 {
217   if (action->comm.surf_comm){
218     action->comm.surf_comm->model_type->action_unref(action->comm.surf_comm);
219     action->comm.surf_comm = NULL;
220   }
221
222   if (action->comm.src_timeout){
223     action->comm.src_timeout->model_type->action_unref(action->comm.src_timeout);
224     action->comm.src_timeout = NULL;
225   }
226
227   if (action->comm.dst_timeout){
228     action->comm.dst_timeout->model_type->action_unref(action->comm.dst_timeout);
229     action->comm.dst_timeout = NULL;
230   }
231 }
232
233 smx_action_t SIMIX_comm_isend(smx_process_t src_proc, smx_rdv_t rdv,
234                               double task_size, double rate,
235                               void *src_buff, size_t src_buff_size, void *data)
236 {
237   smx_action_t action;
238
239   /* Look for communication request matching our needs.
240      If it is not found then create it and push it into the rendez-vous point */
241   action = SIMIX_rdv_get_request(rdv, SIMIX_COMM_RECEIVE);
242
243   if (!action) {
244     action = SIMIX_comm_new(SIMIX_COMM_SEND);
245     SIMIX_rdv_push(rdv, action);
246   } else {
247     action->state = SIMIX_READY;
248     action->comm.type = SIMIX_COMM_READY;
249   }
250
251   /* Setup the communication request */
252   action->comm.src_proc = src_proc;
253   action->comm.task_size = task_size;
254   action->comm.rate = rate;
255   action->comm.src_buff = src_buff;
256   action->comm.src_buff_size = src_buff_size;
257   action->comm.data = data;
258
259   if (MC_IS_ENABLED) {
260     action->state = SIMIX_RUNNING;
261     return action;
262   }
263
264   SIMIX_comm_start(action);
265   return action;
266 }
267
268 smx_action_t SIMIX_comm_irecv(smx_process_t dst_proc, smx_rdv_t rdv,
269                       void *dst_buff, size_t *dst_buff_size)
270 {
271   smx_action_t action;
272
273   /* Look for communication request matching our needs.
274    * If it is not found then create it and push it into the rendez-vous point
275    */
276   action = SIMIX_rdv_get_request(rdv, SIMIX_COMM_SEND);
277
278   if (!action) {
279     action = SIMIX_comm_new(SIMIX_COMM_RECEIVE);
280     SIMIX_rdv_push(rdv, action);
281   } else {
282     action->state = SIMIX_READY;
283     action->comm.type = SIMIX_COMM_READY;
284   }
285
286   /* Setup communication request */
287   action->comm.dst_proc = dst_proc;
288   action->comm.dst_buff = dst_buff;
289   action->comm.dst_buff_size = dst_buff_size;
290
291   if (MC_IS_ENABLED) {
292     action->state = SIMIX_RUNNING;
293     return action;
294   }
295
296   SIMIX_comm_start(action);
297   return action;
298 }
299
300 void SIMIX_pre_comm_wait(smx_req_t req)
301 {
302   smx_action_t action = req->comm_wait.comm;
303   double timeout = req->comm_wait.timeout;
304   surf_action_t sleep;
305
306   /* Associate this request to the action */
307   xbt_fifo_push(action->request_list, req);
308   req->issuer->waiting_action = action;
309
310   if (MC_IS_ENABLED){
311     action->state = SIMIX_DONE;
312     SIMIX_comm_finish(action);
313   }
314
315   /* If the action has already finish perform the error handling, */
316   /* otherwise set up a waiting timeout on the right side         */
317   if (action->state != SIMIX_WAITING && action->state != SIMIX_RUNNING) {
318     SIMIX_comm_finish(action);
319   } else { /* if (timeout >= 0) { we need a surf sleep action even when there is no timeout, otherwise surf won't tell us when the host fails */
320     sleep = surf_workstation_model->extension.workstation.sleep(req->issuer->smx_host->host, timeout);
321     surf_workstation_model->action_data_set(sleep, action);
322
323     if (req->issuer == action->comm.src_proc)
324       action->comm.src_timeout = sleep;
325     else
326       action->comm.dst_timeout = sleep;
327   }
328 }
329
330 void SIMIX_pre_comm_test(smx_req_t req)
331 {
332   smx_action_t action = req->comm_test.comm;
333   req->comm_test.result = (action->state != SIMIX_WAITING && action->state != SIMIX_RUNNING);
334
335   if (req->comm_test.result) {
336     xbt_fifo_push(action->request_list, req);
337     SIMIX_comm_finish(action);
338   }
339   else {
340     SIMIX_request_answer(req);
341   }
342 }
343
344 void SIMIX_pre_comm_waitany(smx_req_t req)
345 {
346   smx_action_t action;
347   unsigned int cursor = 0;
348   xbt_dynar_t actions = req->comm_waitany.comms;
349   xbt_dynar_foreach(actions, cursor, action){
350     /* Associate this request to the action */
351     xbt_fifo_push(action->request_list, req);
352     if (action->state != SIMIX_WAITING && action->state != SIMIX_RUNNING){
353       SIMIX_comm_finish(action);
354       break;
355     }
356   }
357 }
358
359 void SIMIX_waitany_req_remove_from_actions(smx_req_t req)
360 {
361   smx_action_t action;
362   unsigned int cursor = 0;
363   xbt_dynar_t actions = req->comm_waitany.comms;
364
365   xbt_dynar_foreach(actions, cursor, action){
366     xbt_fifo_remove(action->request_list, req);
367   }
368 }
369
370 /**
371  *  \brief Start the simulation of a communication request
372  *  \param action The communication action
373  */
374 static XBT_INLINE void SIMIX_comm_start(smx_action_t action)
375 {
376   /* If both the sender and the receiver are already there, start the communication */
377   if (action->state == SIMIX_READY) {
378     smx_host_t sender = action->comm.src_proc->smx_host;
379     smx_host_t receiver = action->comm.dst_proc->smx_host;
380
381     DEBUG3("Starting communication %p from '%s' to '%s'", action,
382            SIMIX_host_get_name(sender), SIMIX_host_get_name(receiver));
383
384     action->comm.surf_comm = surf_workstation_model->extension.workstation.
385         communicate(sender->host, receiver->host, action->comm.task_size, action->comm.rate);
386
387     surf_workstation_model->action_data_set(action->comm.surf_comm, action);
388
389     action->state = SIMIX_RUNNING;
390
391 #ifdef HAVE_TRACING
392     TRACE_smx_action_communicate(action, action->comm.src_proc);
393 #endif
394
395     /* If a link is failed, detect it immediately */
396     if (surf_workstation_model->action_state_get(action->comm.surf_comm) == SURF_ACTION_FAILED) {
397       DEBUG2("Communication from '%s' to '%s' failed to start because of a link failure",
398           SIMIX_host_get_name(sender), SIMIX_host_get_name(receiver));
399       action->state = SIMIX_LINK_FAILURE;
400       SIMIX_comm_destroy_internal_actions(action);
401     }
402
403     /* If any of the process is suspend, create the action but stop its execution,
404        it will be restarted when the sender process resume */
405     if (SIMIX_process_is_suspended(action->comm.src_proc) ||
406         SIMIX_process_is_suspended(action->comm.dst_proc)) {
407       /* FIXME: check what should happen with the action state */
408       surf_workstation_model->suspend(action->comm.surf_comm);
409     }
410   }
411 }
412
413 void SIMIX_comm_finish(smx_action_t action)
414 {
415   smx_req_t req;
416
417   while ((req = xbt_fifo_shift(action->request_list))) {
418
419     /* If a waitany request is waiting for this action to finish, then remove
420        it from the other actions in the waitany list. Afterwards, get the
421        position of the actual action in the waitany request's actions dynar and
422        return it as the result of the call */
423     if (req->call == REQ_COMM_WAITANY) {
424       SIMIX_waitany_req_remove_from_actions(req);
425       req->comm_waitany.result = xbt_dynar_search(req->comm_waitany.comms, &action);
426     }
427
428     /* If the action is still in a rendez-vous point then remove from it */
429     if (action->comm.rdv)
430       SIMIX_rdv_remove(action->comm.rdv, action);
431
432     DEBUG1("SIMIX_comm_finish: action state = %d", action->state);
433
434     /* Check out for errors */
435     switch (action->state) {
436
437       case SIMIX_DONE:
438         DEBUG1("Communication %p complete!", action);
439         SIMIX_comm_copy_data(action);
440         break;
441
442       case SIMIX_SRC_TIMEOUT:
443         TRY {
444           THROW0(timeout_error, 0, "Communication timeouted because of sender");
445         }
446         CATCH(req->issuer->running_ctx->exception) {
447           req->issuer->doexception = 1;
448         }
449         break;
450
451       case SIMIX_DST_TIMEOUT:
452         TRY {
453           THROW0(timeout_error, 0, "Communication timeouted because of receiver");
454         }
455         CATCH(req->issuer->running_ctx->exception) {
456           req->issuer->doexception = 1;
457         }
458         break;
459
460       case SIMIX_SRC_HOST_FAILURE:
461         TRY {
462           if (req->issuer == action->comm.src_proc)
463             THROW0(host_error, 0, "Host failed");
464           else
465             THROW0(network_error, 0, "Remote peer failed");
466         }
467         CATCH(req->issuer->running_ctx->exception) {
468           req->issuer->doexception = 1;
469         }
470         break;
471
472       case SIMIX_DST_HOST_FAILURE:
473         TRY {
474           if (req->issuer == action->comm.dst_proc)
475             THROW0(host_error, 0, "Host failed");
476           else
477             THROW0(network_error, 0, "Remote peer failed");
478         }
479         CATCH(req->issuer->running_ctx->exception) {
480           req->issuer->doexception = 1;
481         }
482         break;
483
484       case SIMIX_LINK_FAILURE:
485         TRY {
486           DEBUG5("Link failure in action %p between '%s' and '%s': posting an exception to the issuer: %s (%p)",
487               action, action->comm.src_proc->smx_host->name, action->comm.dst_proc->smx_host->name,
488               req->issuer->name, req->issuer);
489           THROW0(network_error, 0, "Link failure");
490         }
491         CATCH(req->issuer->running_ctx->exception) {
492           req->issuer->doexception = 1;
493         }
494         break;
495
496       default:
497         THROW_IMPOSSIBLE;
498     }
499     req->issuer->waiting_action = NULL;
500     SIMIX_request_answer(req);
501   }
502 }
503
504 void SIMIX_post_comm(smx_action_t action)
505 {
506   /* Update action state */
507   if (action->comm.src_timeout &&
508      surf_workstation_model->action_state_get(action->comm.src_timeout) == SURF_ACTION_DONE)
509      action->state = SIMIX_SRC_TIMEOUT;
510   else if (action->comm.dst_timeout &&
511           surf_workstation_model->action_state_get(action->comm.dst_timeout) == SURF_ACTION_DONE)
512      action->state = SIMIX_DST_TIMEOUT;
513   else if (action->comm.src_timeout &&
514           surf_workstation_model->action_state_get(action->comm.src_timeout) == SURF_ACTION_FAILED)
515      action->state = SIMIX_SRC_HOST_FAILURE;
516   else if (action->comm.dst_timeout &&
517           surf_workstation_model->action_state_get(action->comm.dst_timeout) == SURF_ACTION_FAILED)
518      action->state = SIMIX_DST_HOST_FAILURE;
519   else if (action->comm.surf_comm &&
520           surf_workstation_model->action_state_get(action->comm.surf_comm) == SURF_ACTION_FAILED)
521      action->state = SIMIX_LINK_FAILURE;
522   else
523     action->state = SIMIX_DONE;
524
525   DEBUG1("SIMIX_post_comm: action state = %d", action->state);
526
527   /* After this point the surf actions associated with the simix communicate
528      action are no longer needed, thus we delete them. */
529   SIMIX_comm_destroy_internal_actions(action);
530
531   /* If there are requests associated with the action, then answer them */
532   if (xbt_fifo_size(action->request_list))
533     SIMIX_comm_finish(action);
534 }
535
536 void SIMIX_comm_cancel(smx_action_t action)
537 {
538   /* If the action is a waiting state means that it is still in a rdv */
539   /* so remove from it and delete it */
540   if (action->state == SIMIX_WAITING) {
541     SIMIX_rdv_remove(action->comm.rdv, action);
542     action->state = SIMIX_FAILED;
543   } else {
544     surf_workstation_model->action_cancel(action->comm.surf_comm);
545   }
546 }
547
548 void SIMIX_comm_suspend(smx_action_t action)
549 {
550   /*FIXME: shall we suspend also the timeout actions? */
551   surf_workstation_model->suspend(action->comm.surf_comm);
552 }
553
554 void SIMIX_comm_resume(smx_action_t action)
555 {
556   /*FIXME: check what happen with the timeouts */
557   surf_workstation_model->resume(action->comm.surf_comm);
558 }
559
560
561 /************* Action Getters **************/
562
563 /**
564  *  \brief get the amount remaining from the communication
565  *  \param action The communication
566  */
567 double SIMIX_comm_get_remains(smx_action_t action)
568 {
569   double remains;
570
571   switch (action->state) {
572
573     case SIMIX_RUNNING:
574       remains = surf_workstation_model->get_remains(action->comm.surf_comm);
575       break;
576
577     case SIMIX_WAITING:
578     case SIMIX_READY:
579       remains = 0; /*FIXME: check what should be returned */
580       break;
581
582     default:
583       remains = 0; /*FIXME: is this correct? */
584       break;
585   }
586   return remains;
587 }
588
589 e_smx_state_t SIMIX_comm_get_state(smx_action_t action)
590 {
591   return action->state;
592 }
593
594 /**
595  *  \brief Return the user data associated to the communication
596  *  \param action The communication
597  *  \return the user data
598  */
599 void* SIMIX_comm_get_data(smx_action_t action)
600 {
601   return action->comm.data;
602 }
603
604 void* SIMIX_comm_get_src_buff(smx_action_t action)
605 {
606   return action->comm.src_buff;
607 }
608
609 void* SIMIX_comm_get_dst_buff(smx_action_t action)
610 {
611   return action->comm.dst_buff;
612 }
613
614 size_t SIMIX_comm_get_src_buff_size(smx_action_t action)
615 {
616   return action->comm.src_buff_size;
617 }
618
619 size_t SIMIX_comm_get_dst_buff_size(smx_action_t action)
620 {
621   size_t buff_size;
622
623   if (action->comm.dst_buff_size)
624     buff_size = *(action->comm.dst_buff_size);
625   else
626     buff_size = 0;
627
628   return buff_size;
629 }
630
631 smx_process_t SIMIX_comm_get_src_proc(smx_action_t action)
632 {
633   return action->comm.src_proc;
634 }
635
636 smx_process_t SIMIX_comm_get_dst_proc(smx_action_t action)
637 {
638   return action->comm.dst_proc;
639 }
640
#ifdef HAVE_LATENCY_BOUND_TRACKING
/**
 *  \brief Tell whether a communication is latency bounded
 *
 *  The value recorded in simix_global->latency_limited_dict for this
 *  communication is returned if there is one; otherwise the surf model is
 *  asked about the surf communication of the action.
 *
 *  \param action The communication
 *  \return the latency limited value of the communication
 */
XBT_INLINE int SIMIX_comm_is_latency_bounded(smx_action_t action)
{
  xbt_dict_cursor_t cursor;
  uintptr_t key = 0;
  uintptr_t data = 0;

  /* First look for the communication among the recorded values */
  xbt_dict_foreach(simix_global->latency_limited_dict, cursor, key, data) {
    DEBUG2("comparing key=%p with comm=%p", (void *) key, (void *) action);
    if ((void *) key != (void *) action)
      continue;

    DEBUG2("key %p found, return value latency limited value %d",
           (void *) key, (int) data);
    xbt_dict_cursor_free(&cursor);
    return (int) data;
  }

  /* Not recorded: ask the surf model */
  return surf_workstation_model->get_latency_limited(action->comm.surf_comm);
}
#endif
665
666 /******************************************************************************/
667 /*                    SIMIX_comm_copy_data callbacks                       */
668 /******************************************************************************/
669 static void (*SIMIX_comm_copy_data_callback) (smx_action_t, size_t) =
670     &SIMIX_comm_copy_pointer_callback;
671
672 void
673 SIMIX_comm_set_copy_data_callback(void (*callback) (smx_action_t, size_t))
674 {
675   SIMIX_comm_copy_data_callback = callback;
676 }
677
678 void SIMIX_comm_copy_pointer_callback(smx_action_t comm, size_t buff_size)
679 {
680   xbt_assert1((buff_size == sizeof(void *)),
681               "Cannot copy %zu bytes: must be sizeof(void*)", buff_size);
682   *(void **) (comm->comm.dst_buff) = comm->comm.src_buff;
683 }
684
685 void SIMIX_comm_copy_buffer_callback(smx_action_t comm, size_t buff_size)
686 {
687   memcpy(comm->comm.dst_buff, comm->comm.src_buff, buff_size);
688 }
689
690 /**
691  *  \brief Copy the communication data from the sender's buffer to the receiver's one
692  *  \param comm The communication
693  */
694 void SIMIX_comm_copy_data(smx_action_t comm)
695 {
696   size_t buff_size = comm->comm.src_buff_size;
697   /* If there is no data to be copy then return */
698   if (!comm->comm.src_buff || !comm->comm.dst_buff || comm->comm.copied == 1)
699     return;
700
701   DEBUG6("Copying comm %p data from %s (%p) -> %s (%p) (%zu bytes)",
702          comm,
703          comm->comm.src_proc->smx_host->name, comm->comm.src_buff,
704          comm->comm.dst_proc->smx_host->name, comm->comm.dst_buff, buff_size);
705
706   /* Copy at most dst_buff_size bytes of the message to receiver's buffer */
707   if (comm->comm.dst_buff_size)
708     buff_size = MIN(buff_size, *(comm->comm.dst_buff_size));
709
710   /* Update the receiver's buffer size to the copied amount */
711   if (comm->comm.dst_buff_size)
712     *comm->comm.dst_buff_size = buff_size;
713
714   if (buff_size == 0)
715     return;
716
717   (*SIMIX_comm_copy_data_callback) (comm, buff_size);
718
719   /* Set the copied flag so we copy data only once */
720   /* (this function might be called from both communication ends) */
721   comm->comm.copied = 1;
722 }