Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
Rework context factory's initialization logic.
[simgrid.git] / src / simix / smx_network.c
1 /* Copyright (c) 2009, 2010. The SimGrid Team.
2  * All rights reserved.                                                     */
3
4 /* This program is free software; you can redistribute it and/or modify it
5  * under the terms of the license (GNU LGPL) which comes with this package. */
6
7 #include "private.h"
8 #include "xbt/log.h"
9 #include "mc/mc.h"
10 #include "xbt/dict.h"
11
/* Logging category used by the DEBUGn/VERBn calls of this file */
XBT_LOG_NEW_DEFAULT_SUBCATEGORY(simix_network, simix,
                                "Logging specific to SIMIX (network)");

/* Named rendez-vous points, indexed by name; created in SIMIX_network_init()
 * and released in SIMIX_network_exit() */
static xbt_dict_t rdv_points = NULL;

/* Forward declarations of the helpers that are private to this file */
static XBT_INLINE void SIMIX_comm_start(smx_action_t action);
static void SIMIX_comm_finish(smx_action_t action);
static void SIMIX_waitany_req_remove_from_actions(smx_req_t req);
static void SIMIX_comm_copy_data(smx_action_t comm);
static smx_action_t SIMIX_comm_new(e_smx_comm_type_t type);
/* NOTE(review): no definition of SIMIX_comm_wait_for_completion is visible in
 * this file -- confirm whether this prototype is still needed */
static XBT_INLINE void SIMIX_comm_wait_for_completion(smx_action_t comm,
                                                      double timeout);
static XBT_INLINE void SIMIX_rdv_push(smx_rdv_t rdv, smx_action_t comm);
static XBT_INLINE void SIMIX_rdv_remove(smx_rdv_t rdv, smx_action_t comm);
static smx_action_t SIMIX_rdv_get_request(smx_rdv_t rdv, e_smx_comm_type_t type);
static void SIMIX_rdv_free(void *data);
28
29 void SIMIX_network_init(void)
30 {
31   rdv_points = xbt_dict_new();
32 }
33
34 void SIMIX_network_exit(void)
35 {
36   xbt_dict_free(&rdv_points);
37 }
38
39 /******************************************************************************/
40 /*                           Rendez-Vous Points                               */
41 /******************************************************************************/
42
43 smx_rdv_t SIMIX_rdv_create(const char *name)
44 {
45   /* two processes may have pushed the same rdv_create request at the same time */
46   smx_rdv_t rdv = name ? xbt_dict_get_or_null(rdv_points, name) : NULL;
47
48   if (!rdv) {
49     rdv = xbt_new0(s_smx_rvpoint_t, 1);
50     rdv->name = name ? xbt_strdup(name) : NULL;
51     rdv->comm_fifo = xbt_fifo_new();
52
53     if (name)
54       xbt_dict_set(rdv_points, name, rdv, SIMIX_rdv_free);
55   }
56   return rdv;
57 }
58
59 void SIMIX_rdv_destroy(smx_rdv_t rdv)
60 {
61   if (rdv->name)
62     xbt_dict_remove(rdv_points, rdv->name); 
63 }
64
65 void SIMIX_rdv_free(void *data)
66 {
67   smx_rdv_t rdv = (smx_rdv_t) data;
68   if (rdv->name)
69     xbt_free(rdv->name);
70   xbt_fifo_free(rdv->comm_fifo);
71   xbt_free(rdv);  
72 }
73
74 smx_rdv_t SIMIX_rdv_get_by_name(const char *name)
75 {
76   return xbt_dict_get_or_null(rdv_points, name);
77 }
78
79 int SIMIX_rdv_comm_count_by_host(smx_rdv_t rdv, smx_host_t host)
80 {
81   smx_action_t comm = NULL;
82   xbt_fifo_item_t item = NULL;
83   int count = 0;
84
85   xbt_fifo_foreach(rdv->comm_fifo, item, comm, smx_action_t) {
86     if (comm->comm.src_proc->smx_host == host)
87       count++;
88   }
89
90   return count;
91 }
92
93 smx_action_t SIMIX_rdv_get_head(smx_rdv_t rdv)
94 {
95   return xbt_fifo_get_item_content(xbt_fifo_get_first_item(rdv->comm_fifo));
96 }
97
98 /**
99  *  \brief Push a communication request into a rendez-vous point
100  *  \param rdv The rendez-vous point
101  *  \param comm The communication request
102  */
103 static XBT_INLINE void SIMIX_rdv_push(smx_rdv_t rdv, smx_action_t comm)
104 {
105   xbt_fifo_push(rdv->comm_fifo, comm);
106   comm->comm.rdv = rdv;
107 }
108
109 /**
110  *  \brief Remove a communication request from a rendez-vous point
111  *  \param rdv The rendez-vous point
112  *  \param comm The communication request
113  */
114 static XBT_INLINE void SIMIX_rdv_remove(smx_rdv_t rdv, smx_action_t comm)
115 {
116   xbt_fifo_remove(rdv->comm_fifo, comm);
117   comm->comm.rdv = NULL;
118 }
119
120 /**
121  *  \brief Checks if there is a communication request queued in a rendez-vous matching our needs
122  *  \param type The type of communication we are looking for (comm_send, comm_recv)
123  *  \return The communication request if found, NULL otherwise
124  */
125 smx_action_t SIMIX_rdv_get_request(smx_rdv_t rdv, e_smx_comm_type_t type)
126 {
127   smx_action_t comm = (smx_action_t)
128       xbt_fifo_get_item_content(xbt_fifo_get_first_item(rdv->comm_fifo));
129
130   if (comm && comm->comm.type == type) {
131     DEBUG0("Communication request found!");
132     xbt_fifo_shift(rdv->comm_fifo);
133     comm->comm.refcount++;
134     comm->comm.rdv = NULL;
135     return comm;
136   }
137
138   DEBUG0("Communication request not found");
139   return NULL;
140 }
141
142 /******************************************************************************/
/*                            Communication Actions                           */
144 /******************************************************************************/
145
146 /**
147  *  \brief Creates a new comunicate action
148  *  \param type The type of request (comm_send, comm_recv)
149  *  \return The new comunicate action
150  */
151 smx_action_t SIMIX_comm_new(e_smx_comm_type_t type)
152 {
153   smx_action_t act;
154
155   /* alloc structures */
156   act = xbt_new0(s_smx_action_t, 1);
157   act->type = SIMIX_ACTION_COMMUNICATE;
158   act->state = SIMIX_WAITING;
159   act->request_list = xbt_fifo_new();
160
161   /* set communication */
162   act->comm.type = type;
163   act->comm.refcount = 1;
164
165 #ifdef HAVE_TRACING
166   act->category = NULL;
167 #endif
168
169   DEBUG1("Create communicate action %p", act);
170
171   return act;
172 }
173
174 /**
175  *  \brief Destroy a communicate action
176  *  \param action The communicate action to be destroyed
177  */
178 void SIMIX_comm_destroy(smx_action_t action)
179 {
180   DEBUG1("Destroy action %p", action);
181
182   if (action->comm.refcount <= 0)
183     xbt_die(bprintf("the refcount of comm %p is already 0 before decreasing it. That's a bug!",action));
184
185 #ifdef HAVE_LATENCY_BOUND_TRACKING
186   //save is latency limited flag to use afterwards
187   if (action->comm.surf_comm) {
188     DEBUG2("adding key %p with latency limited value %d to the dict", action,
189            SIMIX_comm_is_latency_bounded(action));
190     xbt_dicti_set(simix_global->latency_limited_dict, (uintptr_t) action,
191                   SIMIX_comm_is_latency_bounded(action));
192   }
193 #endif
194
195   action->comm.refcount--;
196   if (action->comm.refcount > 0)
197     return;
198   VERB2("Really free communication %p; refcount is now %d", action,
199         action->comm.refcount);
200
201 #ifdef HAVE_TRACING
202   TRACE_smx_action_destroy(action);
203 #endif
204
205   if (action->name)
206     xbt_free(action->name);
207
208   xbt_fifo_free(action->request_list);
209
210   SIMIX_comm_destroy_internal_actions(action);
211
212   xbt_free(action);
213 }
214
215 void SIMIX_comm_destroy_internal_actions(smx_action_t action)
216 {
217   if (action->comm.surf_comm){
218     action->comm.surf_comm->model_type->action_unref(action->comm.surf_comm);
219     action->comm.surf_comm = NULL;
220   }
221
222   if (action->comm.src_timeout){
223     action->comm.src_timeout->model_type->action_unref(action->comm.src_timeout);
224     action->comm.src_timeout = NULL;
225   }
226
227   if (action->comm.dst_timeout){
228     action->comm.dst_timeout->model_type->action_unref(action->comm.dst_timeout);
229     action->comm.dst_timeout = NULL;
230   }
231 }
232
233 smx_action_t SIMIX_comm_isend(smx_process_t src_proc, smx_rdv_t rdv,
234                               double task_size, double rate,
235                               void *src_buff, size_t src_buff_size, void *data)
236 {
237   smx_action_t action;
238
239   /* Look for communication request matching our needs.
240      If it is not found then create it and push it into the rendez-vous point */
241   action = SIMIX_rdv_get_request(rdv, SIMIX_COMM_RECEIVE);
242
243   if (!action) {
244     action = SIMIX_comm_new(SIMIX_COMM_SEND);
245     SIMIX_rdv_push(rdv, action);
246   } else {
247     action->state = SIMIX_READY;
248     action->comm.type = SIMIX_COMM_READY;
249   }
250
251   /* Setup the communication request */
252   action->comm.src_proc = src_proc;
253   action->comm.task_size = task_size;
254   action->comm.rate = rate;
255   action->comm.src_buff = src_buff;
256   action->comm.src_buff_size = src_buff_size;
257   action->comm.data = data;
258
259   if (MC_IS_ENABLED) {
260     action->state = SIMIX_RUNNING;
261     return action;
262   }
263
264   SIMIX_comm_start(action);
265   return action;
266 }
267
268 smx_action_t SIMIX_comm_irecv(smx_process_t dst_proc, smx_rdv_t rdv,
269                       void *dst_buff, size_t *dst_buff_size)
270 {
271   smx_action_t action;
272
273   /* Look for communication request matching our needs.
274    * If it is not found then create it and push it into the rendez-vous point
275    */
276   action = SIMIX_rdv_get_request(rdv, SIMIX_COMM_SEND);
277
278   if (!action) {
279     action = SIMIX_comm_new(SIMIX_COMM_RECEIVE);
280     SIMIX_rdv_push(rdv, action);
281   } else {
282     action->state = SIMIX_READY;
283     action->comm.type = SIMIX_COMM_READY;
284   }
285
286   /* Setup communication request */
287   action->comm.dst_proc = dst_proc;
288   action->comm.dst_buff = dst_buff;
289   action->comm.dst_buff_size = dst_buff_size;
290
291   if (MC_IS_ENABLED) {
292     action->state = SIMIX_RUNNING;
293     return action;
294   }
295
296   SIMIX_comm_start(action);
297   return action;
298 }
299
300 void SIMIX_pre_comm_wait(smx_req_t req)
301 {
302   smx_action_t action = req->comm_wait.comm;
303   double timeout = req->comm_wait.timeout;
304   surf_action_t sleep;
305
306   /* Associate this request to the action */
307   xbt_fifo_push(action->request_list, req);
308   req->issuer->waiting_action = action;
309
310   if (MC_IS_ENABLED){
311     action->state = SIMIX_DONE;
312     SIMIX_comm_finish(action);
313   }
314
315   /* If the action has already finish perform the error handling, */
316   /* otherwise set up a waiting timeout on the right side         */
317   if (action->state != SIMIX_WAITING && action->state != SIMIX_RUNNING) {
318     SIMIX_comm_finish(action);
319   } else { /* if (timeout >= 0) { we need a surf sleep action even when there is no timeout, otherwise surf won't tell us when the host fails */
320     sleep = surf_workstation_model->extension.workstation.sleep(req->issuer->smx_host->host, timeout);
321     surf_workstation_model->action_data_set(sleep, action);
322
323     if (req->issuer == action->comm.src_proc)
324       action->comm.src_timeout = sleep;
325     else
326       action->comm.dst_timeout = sleep;
327   }
328 }
329
330 void SIMIX_pre_comm_test(smx_req_t req)
331 {
332   smx_action_t action = req->comm_test.comm;
333   req->comm_test.result = (action->state != SIMIX_WAITING && action->state != SIMIX_RUNNING);
334
335   if (req->comm_test.result) {
336     xbt_fifo_push(action->request_list, req);
337     SIMIX_comm_finish(action);
338   }
339   else {
340     SIMIX_request_answer(req);
341   }
342 }
343
344 void SIMIX_pre_comm_testany(smx_req_t req)
345 {
346   unsigned int cursor;
347   smx_action_t action;
348   req->comm_testany.result = -1;
349   xbt_dynar_foreach(req->comm_testany.comms,cursor,action) {
350     if (action->state != SIMIX_WAITING && action->state != SIMIX_RUNNING) {
351       req->comm_testany.result = cursor;
352       xbt_fifo_push(action->request_list, req);
353       SIMIX_comm_finish(action);
354       break;
355     }
356   }
357   SIMIX_request_answer(req);
358 }
359
360 void SIMIX_pre_comm_waitany(smx_req_t req)
361 {
362   smx_action_t action;
363   unsigned int cursor = 0;
364   xbt_dynar_t actions = req->comm_waitany.comms;
365   xbt_dynar_foreach(actions, cursor, action){
366     /* Associate this request to the action */
367     xbt_fifo_push(action->request_list, req);
368     if (action->state != SIMIX_WAITING && action->state != SIMIX_RUNNING){
369       SIMIX_comm_finish(action);
370       break;
371     }
372   }
373 }
374
375 void SIMIX_waitany_req_remove_from_actions(smx_req_t req)
376 {
377   smx_action_t action;
378   unsigned int cursor = 0;
379   xbt_dynar_t actions = req->comm_waitany.comms;
380
381   xbt_dynar_foreach(actions, cursor, action){
382     xbt_fifo_remove(action->request_list, req);
383   }
384 }
385
386 /**
387  *  \brief Start the simulation of a communication request
388  *  \param action The communication action
389  */
390 static XBT_INLINE void SIMIX_comm_start(smx_action_t action)
391 {
392   /* If both the sender and the receiver are already there, start the communication */
393   if (action->state == SIMIX_READY) {
394     smx_host_t sender = action->comm.src_proc->smx_host;
395     smx_host_t receiver = action->comm.dst_proc->smx_host;
396
397     DEBUG3("Starting communication %p from '%s' to '%s'", action,
398            SIMIX_host_get_name(sender), SIMIX_host_get_name(receiver));
399
400     action->comm.surf_comm = surf_workstation_model->extension.workstation.
401         communicate(sender->host, receiver->host, action->comm.task_size, action->comm.rate);
402
403     surf_workstation_model->action_data_set(action->comm.surf_comm, action);
404
405     action->state = SIMIX_RUNNING;
406
407 #ifdef HAVE_TRACING
408     TRACE_smx_action_communicate(action, action->comm.src_proc);
409 #endif
410
411     /* If a link is failed, detect it immediately */
412     if (surf_workstation_model->action_state_get(action->comm.surf_comm) == SURF_ACTION_FAILED) {
413       DEBUG2("Communication from '%s' to '%s' failed to start because of a link failure",
414           SIMIX_host_get_name(sender), SIMIX_host_get_name(receiver));
415       action->state = SIMIX_LINK_FAILURE;
416       SIMIX_comm_destroy_internal_actions(action);
417     }
418
419     /* If any of the process is suspend, create the action but stop its execution,
420        it will be restarted when the sender process resume */
421     if (SIMIX_process_is_suspended(action->comm.src_proc) ||
422         SIMIX_process_is_suspended(action->comm.dst_proc)) {
423       /* FIXME: check what should happen with the action state */
424       surf_workstation_model->suspend(action->comm.surf_comm);
425     }
426   }
427 }
428
/**
 *  \brief Answer all the requests attached to a communication
 *  \param action The communication action
 *
 *  Each request is taken out of the action's request list and answered.
 *  When the communication is SIMIX_DONE the data is copied; for the timeout
 *  and failure states an exception is stored in the issuer's running context
 *  and the issuer's doexception flag is set.
 */
void SIMIX_comm_finish(smx_action_t action)
{
  smx_req_t req;

  /* Drain the request list: every request attached to the action gets answered */
  while ((req = xbt_fifo_shift(action->request_list))) {

    /* If a waitany request is waiting for this action to finish, then remove
       it from the other actions in the waitany list. Afterwards, get the
       position of the actual action in the waitany request's actions dynar and
       return it as the result of the call */
    if (req->call == REQ_COMM_WAITANY) {
      SIMIX_waitany_req_remove_from_actions(req);
      req->comm_waitany.result = xbt_dynar_search(req->comm_waitany.comms, &action);
    }

    /* If the action is still in a rendez-vous point then remove from it */
    if (action->comm.rdv)
      SIMIX_rdv_remove(action->comm.rdv, action);

    DEBUG1("SIMIX_comm_finish: action state = %d", action->state);

    /* Check out for errors */
    switch (action->state) {

      case SIMIX_DONE:
        DEBUG1("Communication %p complete!", action);
        SIMIX_comm_copy_data(action);
        break;

      /* In the error cases below, the exception is thrown and caught on the
         spot: the CATCH stores it in the issuer's running context, and the
         doexception flag marks the issuer as having a pending exception */
      case SIMIX_SRC_TIMEOUT:
        TRY {
          THROW0(timeout_error, 0, "Communication timeouted because of sender");
        }
        CATCH(req->issuer->running_ctx->exception) {
          req->issuer->doexception = 1;
        }
        break;

      case SIMIX_DST_TIMEOUT:
        TRY {
          THROW0(timeout_error, 0, "Communication timeouted because of receiver");
        }
        CATCH(req->issuer->running_ctx->exception) {
          req->issuer->doexception = 1;
        }
        break;

      /* A host failure is a host_error for the process running on the failed
         side, and a network_error for its peer */
      case SIMIX_SRC_HOST_FAILURE:
        TRY {
          if (req->issuer == action->comm.src_proc)
            THROW0(host_error, 0, "Host failed");
          else
            THROW0(network_error, 0, "Remote peer failed");
        }
        CATCH(req->issuer->running_ctx->exception) {
          req->issuer->doexception = 1;
        }
        break;

      case SIMIX_DST_HOST_FAILURE:
        TRY {
          if (req->issuer == action->comm.dst_proc)
            THROW0(host_error, 0, "Host failed");
          else
            THROW0(network_error, 0, "Remote peer failed");
        }
        CATCH(req->issuer->running_ctx->exception) {
          req->issuer->doexception = 1;
        }
        break;

      case SIMIX_LINK_FAILURE:
        TRY {
          DEBUG5("Link failure in action %p between '%s' and '%s': posting an exception to the issuer: %s (%p)",
              action, action->comm.src_proc->smx_host->name, action->comm.dst_proc->smx_host->name,
              req->issuer->name, req->issuer);
          THROW0(network_error, 0, "Link failure");
        }
        CATCH(req->issuer->running_ctx->exception) {
          req->issuer->doexception = 1;
        }
        break;

      /* NOTE(review): SIMIX_comm_cancel sets the state to SIMIX_FAILED, which
         has no case here and would end up in this branch -- confirm that a
         cancelled communication can never reach this function */
      default:
        THROW_IMPOSSIBLE;
    }
    /* The issuer is no longer blocked on this action */
    req->issuer->waiting_action = NULL;
    SIMIX_request_answer(req);
  }
}
519
520 void SIMIX_post_comm(smx_action_t action)
521 {
522   /* Update action state */
523   if (action->comm.src_timeout &&
524      surf_workstation_model->action_state_get(action->comm.src_timeout) == SURF_ACTION_DONE)
525      action->state = SIMIX_SRC_TIMEOUT;
526   else if (action->comm.dst_timeout &&
527           surf_workstation_model->action_state_get(action->comm.dst_timeout) == SURF_ACTION_DONE)
528      action->state = SIMIX_DST_TIMEOUT;
529   else if (action->comm.src_timeout &&
530           surf_workstation_model->action_state_get(action->comm.src_timeout) == SURF_ACTION_FAILED)
531      action->state = SIMIX_SRC_HOST_FAILURE;
532   else if (action->comm.dst_timeout &&
533           surf_workstation_model->action_state_get(action->comm.dst_timeout) == SURF_ACTION_FAILED)
534      action->state = SIMIX_DST_HOST_FAILURE;
535   else if (action->comm.surf_comm &&
536           surf_workstation_model->action_state_get(action->comm.surf_comm) == SURF_ACTION_FAILED)
537      action->state = SIMIX_LINK_FAILURE;
538   else
539     action->state = SIMIX_DONE;
540
541   DEBUG1("SIMIX_post_comm: action state = %d", action->state);
542
543   /* After this point the surf actions associated with the simix communicate
544      action are no longer needed, thus we delete them. */
545   SIMIX_comm_destroy_internal_actions(action);
546
547   /* If there are requests associated with the action, then answer them */
548   if (xbt_fifo_size(action->request_list))
549     SIMIX_comm_finish(action);
550 }
551
552 void SIMIX_comm_cancel(smx_action_t action)
553 {
554   /* If the action is a waiting state means that it is still in a rdv */
555   /* so remove from it and delete it */
556   if (action->state == SIMIX_WAITING) {
557     SIMIX_rdv_remove(action->comm.rdv, action);
558     action->state = SIMIX_FAILED;
559   } else {
560     surf_workstation_model->action_cancel(action->comm.surf_comm);
561   }
562 }
563
564 void SIMIX_comm_suspend(smx_action_t action)
565 {
566   /*FIXME: shall we suspend also the timeout actions? */
567   surf_workstation_model->suspend(action->comm.surf_comm);
568 }
569
570 void SIMIX_comm_resume(smx_action_t action)
571 {
572   /*FIXME: check what happen with the timeouts */
573   surf_workstation_model->resume(action->comm.surf_comm);
574 }
575
576
577 /************* Action Getters **************/
578
579 /**
580  *  \brief get the amount remaining from the communication
581  *  \param action The communication
582  */
583 double SIMIX_comm_get_remains(smx_action_t action)
584 {
585   double remains;
586
587   switch (action->state) {
588
589     case SIMIX_RUNNING:
590       remains = surf_workstation_model->get_remains(action->comm.surf_comm);
591       break;
592
593     case SIMIX_WAITING:
594     case SIMIX_READY:
595       remains = 0; /*FIXME: check what should be returned */
596       break;
597
598     default:
599       remains = 0; /*FIXME: is this correct? */
600       break;
601   }
602   return remains;
603 }
604
605 e_smx_state_t SIMIX_comm_get_state(smx_action_t action)
606 {
607   return action->state;
608 }
609
610 /**
611  *  \brief Return the user data associated to the communication
612  *  \param action The communication
613  *  \return the user data
614  */
615 void* SIMIX_comm_get_data(smx_action_t action)
616 {
617   return action->comm.data;
618 }
619
620 void* SIMIX_comm_get_src_buff(smx_action_t action)
621 {
622   return action->comm.src_buff;
623 }
624
625 void* SIMIX_comm_get_dst_buff(smx_action_t action)
626 {
627   return action->comm.dst_buff;
628 }
629
630 size_t SIMIX_comm_get_src_buff_size(smx_action_t action)
631 {
632   return action->comm.src_buff_size;
633 }
634
635 size_t SIMIX_comm_get_dst_buff_size(smx_action_t action)
636 {
637   size_t buff_size;
638
639   if (action->comm.dst_buff_size)
640     buff_size = *(action->comm.dst_buff_size);
641   else
642     buff_size = 0;
643
644   return buff_size;
645 }
646
647 smx_process_t SIMIX_comm_get_src_proc(smx_action_t action)
648 {
649   return action->comm.src_proc;
650 }
651
652 smx_process_t SIMIX_comm_get_dst_proc(smx_action_t action)
653 {
654   return action->comm.dst_proc;
655 }
656
#ifdef HAVE_LATENCY_BOUND_TRACKING
/**
 *  \brief Tell whether a communication is latency bounded
 *  \param action The communication
 *  \return The value saved for this communication in the latency-limited
 *          dictionary if there is one, otherwise what surf reports for the
 *          communication's surf action
 */
XBT_INLINE int SIMIX_comm_is_latency_bounded(smx_action_t action)
{
  xbt_dict_cursor_t it;
  uintptr_t saved_key = 0;
  uintptr_t saved_flag = 0;

  /* First look among the values that were saved for finished flows */
  xbt_dict_foreach(simix_global->latency_limited_dict, it, saved_key, saved_flag) {
    DEBUG2("comparing key=%p with comm=%p", (void *) saved_key, (void *) action);
    if ((void *) action != (void *) saved_key)
      continue;

    DEBUG2("key %p found, return value latency limited value %d",
           (void *) saved_key, (int) saved_flag);
    /* Leaving the loop early: the cursor has to be released by hand */
    xbt_dict_cursor_free(&it);
    return (int) saved_flag;
  }

  /* Nothing saved: ask surf */
  return surf_workstation_model->get_latency_limited(action->comm.surf_comm);
}
#endif
681
682 /******************************************************************************/
683 /*                    SIMIX_comm_copy_data callbacks                       */
684 /******************************************************************************/
685 static void (*SIMIX_comm_copy_data_callback) (smx_action_t, size_t) =
686     &SIMIX_comm_copy_pointer_callback;
687
688 void
689 SIMIX_comm_set_copy_data_callback(void (*callback) (smx_action_t, size_t))
690 {
691   SIMIX_comm_copy_data_callback = callback;
692 }
693
694 void SIMIX_comm_copy_pointer_callback(smx_action_t comm, size_t buff_size)
695 {
696   xbt_assert1((buff_size == sizeof(void *)),
697               "Cannot copy %zu bytes: must be sizeof(void*)", buff_size);
698   *(void **) (comm->comm.dst_buff) = comm->comm.src_buff;
699 }
700
701 void SIMIX_comm_copy_buffer_callback(smx_action_t comm, size_t buff_size)
702 {
703   memcpy(comm->comm.dst_buff, comm->comm.src_buff, buff_size);
704 }
705
706 /**
707  *  \brief Copy the communication data from the sender's buffer to the receiver's one
708  *  \param comm The communication
709  */
710 void SIMIX_comm_copy_data(smx_action_t comm)
711 {
712   size_t buff_size = comm->comm.src_buff_size;
713   /* If there is no data to be copy then return */
714   if (!comm->comm.src_buff || !comm->comm.dst_buff || comm->comm.copied == 1)
715     return;
716
717   DEBUG6("Copying comm %p data from %s (%p) -> %s (%p) (%zu bytes)",
718          comm,
719          comm->comm.src_proc->smx_host->name, comm->comm.src_buff,
720          comm->comm.dst_proc->smx_host->name, comm->comm.dst_buff, buff_size);
721
722   /* Copy at most dst_buff_size bytes of the message to receiver's buffer */
723   if (comm->comm.dst_buff_size)
724     buff_size = MIN(buff_size, *(comm->comm.dst_buff_size));
725
726   /* Update the receiver's buffer size to the copied amount */
727   if (comm->comm.dst_buff_size)
728     *comm->comm.dst_buff_size = buff_size;
729
730   if (buff_size == 0)
731     return;
732
733   (*SIMIX_comm_copy_data_callback) (comm, buff_size);
734
735   /* Set the copied flag so we copy data only once */
736   /* (this function might be called from both communication ends) */
737   comm->comm.copied = 1;
738 }