Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
e6ce384b48d9397968a51ebfd1f7a255be8b5c0d
[simgrid.git] / src / simix / smx_network.cpp
1 /* Copyright (c) 2009-2016. The SimGrid Team.  All rights reserved.         */
2
3 /* This program is free software; you can redistribute it and/or modify it
4  * under the terms of the license (GNU LGPL) which comes with this package. */
5
6 #include <boost/range/algorithm.hpp>
7
8 #include <xbt/ex.hpp>
9
10 #include <simgrid/s4u/host.hpp>
11
12 #include "src/surf/surf_interface.hpp"
13 #include "src/simix/smx_private.h"
14 #include "xbt/log.h"
15 #include "mc/mc.h"
16 #include "src/mc/mc_replay.h"
17 #include "xbt/dict.h"
18 #include "simgrid/s4u/mailbox.hpp"
19
20 #include "src/simix/SynchroComm.hpp"
21
22 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(simix_network, simix, "SIMIX network-related synchronization");
23
24 static void SIMIX_mbox_free(void *data);
25 static xbt_dict_t mailboxes = xbt_dict_new_homogeneous(SIMIX_mbox_free);
26
27 static void SIMIX_waitany_remove_simcall_from_actions(smx_simcall_t simcall);
28 static void SIMIX_comm_copy_data(smx_synchro_t comm);
29 static inline void SIMIX_mbox_push(smx_mailbox_t mbox, smx_synchro_t comm);
30 static smx_synchro_t _find_matching_comm(std::deque<smx_synchro_t> *deque, e_smx_comm_type_t type,
31     int (*match_fun)(void *, void *,smx_synchro_t), void *user_data, smx_synchro_t my_synchro, bool remove_matching);
32 static void SIMIX_comm_start(smx_synchro_t synchro);
33
34 void SIMIX_mailbox_exit(void)
35 {
36   xbt_dict_free(&mailboxes);
37 }
38
39 /******************************************************************************/
40 /*                           Rendez-Vous Points                               */
41 /******************************************************************************/
42
43 smx_mailbox_t SIMIX_mbox_create(const char *name)
44 {
45   xbt_assert(name, "Mailboxes must have a name");
46   /* two processes may have pushed the same mbox_create simcall at the same time */
47   smx_mailbox_t mbox = (smx_mailbox_t) xbt_dict_get_or_null(mailboxes, name);
48
49   if (!mbox) {
50     mbox = new s_smx_mailbox_t();
51     mbox->name = xbt_strdup(name);
52     mbox->comm_queue = new std::deque<smx_synchro_t>();
53     mbox->done_comm_queue = nullptr; // Allocated on need only
54     mbox->permanent_receiver=nullptr;
55
56     XBT_DEBUG("Creating a mailbox at %p with name %s", mbox, name);
57     xbt_dict_set(mailboxes, mbox->name, mbox, nullptr);
58   }
59   return mbox;
60 }
61
62 void SIMIX_mbox_free(void *data)
63 {
64   XBT_DEBUG("mbox free %p", data);
65   smx_mailbox_t mbox = (smx_mailbox_t) data;
66   xbt_free(mbox->name);
67   delete mbox->comm_queue;
68   delete mbox->done_comm_queue;
69   delete mbox;
70 }
71
72 smx_mailbox_t SIMIX_mbox_get_by_name(const char *name)
73 {
74   return (smx_mailbox_t) xbt_dict_get_or_null(mailboxes, name);
75 }
76
77 /**
78  *  \brief set the receiver of the rendez vous point to allow eager sends
79  *  \param mbox The rendez-vous point
80  *  \param process The receiving process
81  */
82 void SIMIX_mbox_set_receiver(smx_mailbox_t mbox, smx_process_t process)
83 {
84   mbox->permanent_receiver=process;
85   if (mbox->done_comm_queue == nullptr)
86     mbox->done_comm_queue = new std::deque<smx_synchro_t>();
87 }
88
89 /**
90  *  \brief Pushes a communication synchro into a rendez-vous point
91  *  \param mbox The mailbox
92  *  \param synchro The communication synchro
93  */
94 static inline void SIMIX_mbox_push(smx_mailbox_t mbox, smx_synchro_t synchro)
95 {
96   simgrid::simix::Comm *comm = static_cast<simgrid::simix::Comm*>(synchro);
97
98   mbox->comm_queue->push_back(comm);
99   comm->mbox = mbox;
100 }
101
102 /**
103  *  \brief Removes a communication synchro from a rendez-vous point
104  *  \param mbox The rendez-vous point
105  *  \param synchro The communication synchro
106  */
107 void SIMIX_mbox_remove(smx_mailbox_t mbox, smx_synchro_t synchro)
108 {
109   simgrid::simix::Comm *comm = static_cast<simgrid::simix::Comm*>(synchro);
110
111   comm->mbox = nullptr;
112   for (auto it = mbox->comm_queue->begin(); it != mbox->comm_queue->end(); it++)
113     if (*it == comm) {
114       mbox->comm_queue->erase(it);
115       return;
116     }
117   xbt_die("Cannot remove this comm that is not part of the mailbox");
118 }
119
120 /**
121  *  \brief Checks if there is a communication synchro queued in a deque matching our needs
122  *  \param type The type of communication we are looking for (comm_send, comm_recv)
123  *  \return The communication synchro if found, nullptr otherwise
124  */
125 static smx_synchro_t _find_matching_comm(std::deque<smx_synchro_t> *deque, e_smx_comm_type_t type,
126     int (*match_fun)(void *, void *,smx_synchro_t), void *this_user_data, smx_synchro_t my_synchro, bool remove_matching)
127 {
128   void* other_user_data = nullptr;
129
130   for(auto it = deque->begin(); it != deque->end(); it++){
131     smx_synchro_t synchro = *it;
132     simgrid::simix::Comm *comm = static_cast<simgrid::simix::Comm*>(synchro);
133
134     if (comm->type == SIMIX_COMM_SEND) {
135       other_user_data = comm->src_data;
136     } else if (comm->type == SIMIX_COMM_RECEIVE) {
137       other_user_data = comm->dst_data;
138     }
139     if (comm->type == type &&
140         (!      match_fun ||       match_fun(this_user_data,  other_user_data, synchro)) &&
141         (!comm->match_fun || comm->match_fun(other_user_data, this_user_data,  my_synchro))) {
142       XBT_DEBUG("Found a matching communication synchro %p", comm);
143       if (remove_matching)
144         deque->erase(it);
145       comm->ref();
146 #if HAVE_MC
147       comm->mbox_cpy = comm->mbox;
148 #endif
149       comm->mbox = nullptr;
150       return comm;
151     }
152     XBT_DEBUG("Sorry, communication synchro %p does not match our needs:"
153               " its type is %d but we are looking for a comm of type %d (or maybe the filtering didn't match)",
154               comm, (int)comm->type, (int)type);
155   }
156   XBT_DEBUG("No matching communication synchro found");
157   return nullptr;
158 }
159
160 /******************************************************************************/
161 /*                          Communication synchros                            */
162 /******************************************************************************/
163 XBT_PRIVATE void simcall_HANDLER_comm_send(smx_simcall_t simcall, smx_process_t src, smx_mailbox_t mbox,
164                                   double task_size, double rate,
165                                   void *src_buff, size_t src_buff_size,
166                                   int (*match_fun)(void *, void *,smx_synchro_t),
167                                   void (*copy_data_fun)(smx_synchro_t, void*, size_t),
168           void *data, double timeout){
169   smx_synchro_t comm = simcall_HANDLER_comm_isend(simcall, src, mbox, task_size, rate,
170                            src_buff, src_buff_size, match_fun, nullptr, copy_data_fun,
171                data, 0);
172   SIMCALL_SET_MC_VALUE(simcall, 0);
173   simcall_HANDLER_comm_wait(simcall, comm, timeout);
174 }
175 XBT_PRIVATE smx_synchro_t simcall_HANDLER_comm_isend(smx_simcall_t simcall, smx_process_t src_proc, smx_mailbox_t mbox,
176                                   double task_size, double rate,
177                                   void *src_buff, size_t src_buff_size,
178                                   int (*match_fun)(void *, void *,smx_synchro_t),
179                                   void (*clean_fun)(void *), // used to free the synchro in case of problem after a detached send
180                                   void (*copy_data_fun)(smx_synchro_t, void*, size_t),// used to copy data if not default one
181                           void *data, int detached)
182 {
183   XBT_DEBUG("send from %p", mbox);
184
185   /* Prepare a synchro describing us, so that it gets passed to the user-provided filter of other side */
186   simgrid::simix::Comm* this_synchro = new simgrid::simix::Comm(SIMIX_COMM_SEND);
187
188   /* Look for communication synchro matching our needs. We also provide a description of
189    * ourself so that the other side also gets a chance of choosing if it wants to match with us.
190    *
191    * If it is not found then push our communication into the rendez-vous point */
192   smx_synchro_t other_synchro =
193       _find_matching_comm(mbox->comm_queue, SIMIX_COMM_RECEIVE, match_fun, data, this_synchro, /*remove_matching*/true);
194   simgrid::simix::Comm *other_comm = static_cast<simgrid::simix::Comm*>(other_synchro);
195
196
197   if (!other_synchro) {
198     other_synchro = this_synchro;
199     other_comm = static_cast<simgrid::simix::Comm*>(other_synchro);
200
201     if (mbox->permanent_receiver!=nullptr){
202       //this mailbox is for small messages, which have to be sent right now
203       other_synchro->state = SIMIX_READY;
204       other_comm->dst_proc=mbox->permanent_receiver.get();
205       other_comm->ref();
206       mbox->done_comm_queue->push_back(other_synchro);
207       other_comm->mbox=mbox;
208       XBT_DEBUG("pushing a message into the permanent receive fifo %p, comm %p", mbox, &(other_comm));
209
210     }else{
211       SIMIX_mbox_push(mbox, this_synchro);
212     }
213   } else {
214     XBT_DEBUG("Receive already pushed");
215     this_synchro->unref();
216
217     other_comm->state = SIMIX_READY;
218     other_comm->type = SIMIX_COMM_READY;
219
220   }
221   xbt_fifo_push(src_proc->comms, other_synchro);
222
223
224   if (detached) {
225     other_comm->detached = true;
226     other_comm->clean_fun = clean_fun;
227   } else {
228     other_comm->clean_fun = nullptr;
229   }
230
231   /* Setup the communication synchro */
232   other_comm->src_proc = src_proc;
233   other_comm->task_size = task_size;
234   other_comm->rate = rate;
235   other_comm->src_buff = src_buff;
236   other_comm->src_buff_size = src_buff_size;
237   other_comm->src_data = data;
238
239   other_comm->match_fun = match_fun;
240   other_comm->copy_data_fun = copy_data_fun;
241
242
243   if (MC_is_active() || MC_record_replay_is_active()) {
244     other_comm->state = SIMIX_RUNNING;
245     return (detached ? nullptr : other_comm);
246   }
247
248   SIMIX_comm_start(other_comm);
249   return (detached ? nullptr : other_comm);
250 }
251
252 XBT_PRIVATE void simcall_HANDLER_comm_recv(smx_simcall_t simcall, smx_process_t receiver, smx_mailbox_t mbox,
253                          void *dst_buff, size_t *dst_buff_size,
254                          int (*match_fun)(void *, void *, smx_synchro_t),
255                          void (*copy_data_fun)(smx_synchro_t, void*, size_t),
256                          void *data, double timeout, double rate)
257 {
258   smx_synchro_t comm = SIMIX_comm_irecv(receiver, mbox, dst_buff, dst_buff_size, match_fun, copy_data_fun, data, rate);
259   SIMCALL_SET_MC_VALUE(simcall, 0);
260   simcall_HANDLER_comm_wait(simcall, comm, timeout);
261 }
262
263 XBT_PRIVATE smx_synchro_t simcall_HANDLER_comm_irecv(smx_simcall_t simcall, smx_process_t receiver, smx_mailbox_t mbox,
264     void *dst_buff, size_t *dst_buff_size,
265     int (*match_fun)(void *, void *, smx_synchro_t),
266     void (*copy_data_fun)(smx_synchro_t, void*, size_t),
267     void *data, double rate)
268 {
269   return SIMIX_comm_irecv(receiver, mbox, dst_buff, dst_buff_size, match_fun, copy_data_fun, data, rate);
270 }
271
272 smx_synchro_t SIMIX_comm_irecv(smx_process_t dst_proc, smx_mailbox_t mbox, void *dst_buff, size_t *dst_buff_size,
273     int (*match_fun)(void *, void *, smx_synchro_t),
274     void (*copy_data_fun)(smx_synchro_t, void*, size_t), // used to copy data if not default one
275     void *data, double rate)
276 {
277   XBT_DEBUG("recv from %p %p", mbox, mbox->comm_queue);
278   simgrid::simix::Comm* this_synchro = new simgrid::simix::Comm(SIMIX_COMM_RECEIVE);
279
280   smx_synchro_t other_synchro;
281   //communication already done, get it inside the fifo of completed comms
282   if (mbox->permanent_receiver && ! mbox->done_comm_queue->empty()) {
283
284     XBT_DEBUG("We have a comm that has probably already been received, trying to match it, to skip the communication");
285     //find a match in the already received fifo
286     other_synchro = _find_matching_comm(mbox->done_comm_queue, SIMIX_COMM_SEND, match_fun, data, this_synchro,/*remove_matching*/true);
287     //if not found, assume the receiver came first, register it to the mailbox in the classical way
288     if (!other_synchro)  {
289       XBT_DEBUG("We have messages in the permanent receive list, but not the one we are looking for, pushing request into fifo");
290       other_synchro = this_synchro;
291       SIMIX_mbox_push(mbox, this_synchro);
292     } else {
293       simgrid::simix::Comm *other_comm = static_cast<simgrid::simix::Comm*>(other_synchro);
294
295       if(other_comm->surf_comm && other_comm->remains()==0.0) {
296         XBT_DEBUG("comm %p has been already sent, and is finished, destroy it",other_comm);
297         other_comm->state = SIMIX_DONE;
298         other_comm->type = SIMIX_COMM_DONE;
299         other_comm->mbox = nullptr;
300       }
301       other_comm->unref();
302       static_cast<simgrid::simix::Comm*>(this_synchro)->unref();
303     }
304   } else {
305     /* Prepare a synchro describing us, so that it gets passed to the user-provided filter of other side */
306
307     /* Look for communication synchro matching our needs. We also provide a description of
308      * ourself so that the other side also gets a chance of choosing if it wants to match with us.
309      *
310      * If it is not found then push our communication into the rendez-vous point */
311     other_synchro = _find_matching_comm(mbox->comm_queue, SIMIX_COMM_SEND, match_fun, data, this_synchro,/*remove_matching*/true);
312
313     if (!other_synchro) {
314       XBT_DEBUG("Receive pushed first %zu", mbox->comm_queue->size());
315       other_synchro = this_synchro;
316       SIMIX_mbox_push(mbox, this_synchro);
317     } else {
318       this_synchro->unref();
319       simgrid::simix::Comm *other_comm = static_cast<simgrid::simix::Comm*>(other_synchro);
320
321       other_comm->state = SIMIX_READY;
322       other_comm->type = SIMIX_COMM_READY;
323     }
324     xbt_fifo_push(dst_proc->comms, other_synchro);
325   }
326
327   /* Setup communication synchro */
328   simgrid::simix::Comm *other_comm = static_cast<simgrid::simix::Comm*>(other_synchro);
329   other_comm->dst_proc = dst_proc;
330   other_comm->dst_buff = dst_buff;
331   other_comm->dst_buff_size = dst_buff_size;
332   other_comm->dst_data = data;
333
334   if (rate != -1.0 && (other_comm->rate == -1.0 || rate < other_comm->rate))
335     other_comm->rate = rate;
336
337   other_comm->match_fun = match_fun;
338   other_comm->copy_data_fun = copy_data_fun;
339
340   if (MC_is_active() || MC_record_replay_is_active()) {
341     other_synchro->state = SIMIX_RUNNING;
342     return other_synchro;
343   }
344
345   SIMIX_comm_start(other_synchro);
346   return other_synchro;
347 }
348
349 smx_synchro_t simcall_HANDLER_comm_iprobe(smx_simcall_t simcall, smx_mailbox_t mbox,
350                                    int type, int src, int tag,
351                                    int (*match_fun)(void *, void *, smx_synchro_t),
352                                    void *data){
353   return SIMIX_comm_iprobe(simcall->issuer, mbox, type, src, tag, match_fun, data);
354 }
355
356 smx_synchro_t SIMIX_comm_iprobe(smx_process_t dst_proc, smx_mailbox_t mbox, int type, int src,
357                               int tag, int (*match_fun)(void *, void *, smx_synchro_t), void *data)
358 {
359   XBT_DEBUG("iprobe from %p %p", mbox, mbox->comm_queue);
360   simgrid::simix::Comm* this_comm;
361   int smx_type;
362   if(type == 1){
363     this_comm = new simgrid::simix::Comm(SIMIX_COMM_SEND);
364     smx_type = SIMIX_COMM_RECEIVE;
365   } else{
366     this_comm = new simgrid::simix::Comm(SIMIX_COMM_RECEIVE);
367     smx_type = SIMIX_COMM_SEND;
368   } 
369   smx_synchro_t other_synchro=nullptr;
370   if(mbox->permanent_receiver && ! mbox->done_comm_queue->empty()){
371     XBT_DEBUG("first check in the permanent recv mailbox, to see if we already got something");
372     other_synchro =
373         _find_matching_comm(mbox->done_comm_queue, (e_smx_comm_type_t) smx_type, match_fun, data, this_comm,/*remove_matching*/false);
374   }
375   if (!other_synchro){
376     XBT_DEBUG("check if we have more luck in the normal mailbox");
377     other_synchro = _find_matching_comm(mbox->comm_queue, (e_smx_comm_type_t) smx_type, match_fun, data, this_comm,/*remove_matching*/false);
378   }
379
380   if(other_synchro)
381     other_synchro->unref();
382
383   this_comm->unref();
384   return other_synchro;
385 }
386
387 void simcall_HANDLER_comm_wait(smx_simcall_t simcall, smx_synchro_t synchro, double timeout)
388 {
389   /* Associate this simcall to the wait synchro */
390   XBT_DEBUG("simcall_HANDLER_comm_wait, %p", synchro);
391
392   synchro->simcalls.push_back(simcall);
393   simcall->issuer->waiting_synchro = synchro;
394
395   if (MC_is_active() || MC_record_replay_is_active()) {
396     int idx = SIMCALL_GET_MC_VALUE(simcall);
397     if (idx == 0) {
398       synchro->state = SIMIX_DONE;
399     } else {
400       /* If we reached this point, the wait simcall must have a timeout */
401       /* Otherwise it shouldn't be enabled and executed by the MC */
402       if (timeout == -1)
403         THROW_IMPOSSIBLE;
404
405       simgrid::simix::Comm *comm = static_cast<simgrid::simix::Comm*>(synchro);
406       if (comm->src_proc == simcall->issuer)
407         comm->state = SIMIX_SRC_TIMEOUT;
408       else
409         comm->state = SIMIX_DST_TIMEOUT;
410     }
411
412     SIMIX_comm_finish(synchro);
413     return;
414   }
415
416   /* If the synchro has already finish perform the error handling, */
417   /* otherwise set up a waiting timeout on the right side          */
418   if (synchro->state != SIMIX_WAITING && synchro->state != SIMIX_RUNNING) {
419     SIMIX_comm_finish(synchro);
420   } else { /* if (timeout >= 0) { we need a surf sleep action even when there is no timeout, otherwise surf won't tell us when the host fails */
421     surf_action_t sleep = surf_host_sleep(simcall->issuer->host, timeout);
422     sleep->setData(synchro);
423
424     simgrid::simix::Comm *comm = static_cast<simgrid::simix::Comm*>(synchro);
425     if (simcall->issuer == comm->src_proc)
426       comm->src_timeout = sleep;
427     else
428       comm->dst_timeout = sleep;
429   }
430 }
431
432 void simcall_HANDLER_comm_test(smx_simcall_t simcall, smx_synchro_t synchro)
433 {
434   simgrid::simix::Comm *comm = static_cast<simgrid::simix::Comm*>(synchro);
435
436   if (MC_is_active() || MC_record_replay_is_active()){
437     simcall_comm_test__set__result(simcall, comm->src_proc && comm->dst_proc);
438     if (simcall_comm_test__get__result(simcall)){
439       synchro->state = SIMIX_DONE;
440       synchro->simcalls.push_back(simcall);
441       SIMIX_comm_finish(synchro);
442     } else {
443       SIMIX_simcall_answer(simcall);
444     }
445     return;
446   }
447
448   simcall_comm_test__set__result(simcall, (synchro->state != SIMIX_WAITING && synchro->state != SIMIX_RUNNING));
449   if (simcall_comm_test__get__result(simcall)) {
450     synchro->simcalls.push_back(simcall);
451     SIMIX_comm_finish(synchro);
452   } else {
453     SIMIX_simcall_answer(simcall);
454   }
455 }
456
457 void simcall_HANDLER_comm_testany(smx_simcall_t simcall, xbt_dynar_t synchros)
458 {
459   unsigned int cursor;
460   smx_synchro_t synchro;
461   // The default result is -1 -- this means, "nothing is ready".
462   // It can be changed below, but only if something matches.
463   simcall_comm_testany__set__result(simcall, -1);
464
465   if (MC_is_active() || MC_record_replay_is_active()){
466     int idx = SIMCALL_GET_MC_VALUE(simcall);
467     if(idx == -1){
468       SIMIX_simcall_answer(simcall);
469     }else{
470       synchro = xbt_dynar_get_as(synchros, idx, smx_synchro_t);
471       simcall_comm_testany__set__result(simcall, idx);
472       synchro->simcalls.push_back(simcall);
473       synchro->state = SIMIX_DONE;
474       SIMIX_comm_finish(synchro);
475     }
476     return;
477   }
478
479   xbt_dynar_foreach(simcall_comm_testany__get__comms(simcall), cursor,synchro) {
480     if (synchro->state != SIMIX_WAITING && synchro->state != SIMIX_RUNNING) {
481       simcall_comm_testany__set__result(simcall, cursor);
482       synchro->simcalls.push_back(simcall);
483       SIMIX_comm_finish(synchro);
484       return;
485     }
486   }
487   SIMIX_simcall_answer(simcall);
488 }
489
490 void simcall_HANDLER_comm_waitany(smx_simcall_t simcall, xbt_dynar_t synchros)
491 {
492   smx_synchro_t synchro;
493   unsigned int cursor = 0;
494
495   if (MC_is_active() || MC_record_replay_is_active()){
496     int idx = SIMCALL_GET_MC_VALUE(simcall);
497     synchro = xbt_dynar_get_as(synchros, idx, smx_synchro_t);
498     synchro->simcalls.push_back(simcall);
499     simcall_comm_waitany__set__result(simcall, idx);
500     synchro->state = SIMIX_DONE;
501     SIMIX_comm_finish(synchro);
502     return;
503   }
504
505   xbt_dynar_foreach(synchros, cursor, synchro){
506     /* associate this simcall to the the synchro */
507     synchro->simcalls.push_back(simcall);
508
509     /* see if the synchro is already finished */
510     if (synchro->state != SIMIX_WAITING && synchro->state != SIMIX_RUNNING){
511       SIMIX_comm_finish(synchro);
512       break;
513     }
514   }
515 }
516
517 void SIMIX_waitany_remove_simcall_from_actions(smx_simcall_t simcall)
518 {
519   smx_synchro_t synchro;
520   unsigned int cursor = 0;
521   xbt_dynar_t synchros = simcall_comm_waitany__get__comms(simcall);
522
523   xbt_dynar_foreach(synchros, cursor, synchro) {
524     // Remove the first occurence of simcall:
525     auto i = boost::range::find(synchro->simcalls, simcall);
526     if (i !=  synchro->simcalls.end())
527       synchro->simcalls.erase(i);
528   }
529 }
530
531 /**
532  *  \brief Starts the simulation of a communication synchro.
533  *  \param synchro the communication synchro
534  */
535 static inline void SIMIX_comm_start(smx_synchro_t synchro)
536 {
537   simgrid::simix::Comm *comm = static_cast<simgrid::simix::Comm*>(synchro);
538
539   /* If both the sender and the receiver are already there, start the communication */
540   if (synchro->state == SIMIX_READY) {
541
542     sg_host_t sender   = comm->src_proc->host;
543     sg_host_t receiver = comm->dst_proc->host;
544
545     XBT_DEBUG("Starting communication %p from '%s' to '%s'", synchro, sg_host_get_name(sender), sg_host_get_name(receiver));
546
547     comm->surf_comm = surf_network_model_communicate(surf_network_model, sender, receiver, comm->task_size, comm->rate);
548     comm->surf_comm->setData(synchro);
549     comm->state = SIMIX_RUNNING;
550
551     /* If a link is failed, detect it immediately */
552     if (comm->surf_comm->getState() == simgrid::surf::Action::State::failed) {
553       XBT_DEBUG("Communication from '%s' to '%s' failed to start because of a link failure",
554                 sg_host_get_name(sender), sg_host_get_name(receiver));
555       comm->state = SIMIX_LINK_FAILURE;
556       comm->cleanupSurf();
557     }
558
559     /* If any of the process is suspend, create the synchro but stop its execution,
560        it will be restarted when the sender process resume */
561     if (SIMIX_process_is_suspended(comm->src_proc) || SIMIX_process_is_suspended(comm->dst_proc)) {
562       if (SIMIX_process_is_suspended(comm->src_proc))
563         XBT_DEBUG("The communication is suspended on startup because src (%s@%s) was suspended since it initiated the communication",
564             comm->src_proc->name.c_str(), sg_host_get_name(comm->src_proc->host));
565       else
566         XBT_DEBUG("The communication is suspended on startup because dst (%s@%s) was suspended since it initiated the communication",
567             comm->dst_proc->name.c_str(), sg_host_get_name(comm->dst_proc->host));
568
569       comm->surf_comm->suspend();
570     }
571   }
572 }
573
574 /**
575  * \brief Answers the SIMIX simcalls associated to a communication synchro.
576  * \param synchro a finished communication synchro
577  */
578 void SIMIX_comm_finish(smx_synchro_t synchro)
579 {
580   simgrid::simix::Comm *comm = static_cast<simgrid::simix::Comm*>(synchro);
581   unsigned int destroy_count = 0;
582
583   while (!synchro->simcalls.empty()) {
584     smx_simcall_t simcall = synchro->simcalls.front();
585     synchro->simcalls.pop_front();
586
587     /* If a waitany simcall is waiting for this synchro to finish, then remove
588        it from the other synchros in the waitany list. Afterwards, get the
589        position of the actual synchro in the waitany dynar and
590        return it as the result of the simcall */
591
592     if (simcall->call == SIMCALL_NONE) //FIXME: maybe a better way to handle this case
593       continue; // if process handling comm is killed
594     if (simcall->call == SIMCALL_COMM_WAITANY) {
595       SIMIX_waitany_remove_simcall_from_actions(simcall);
596       if (!MC_is_active() && !MC_record_replay_is_active())
597         simcall_comm_waitany__set__result(simcall, xbt_dynar_search(simcall_comm_waitany__get__comms(simcall), &synchro));
598     }
599
600     /* If the synchro is still in a rendez-vous point then remove from it */
601     if (comm->mbox)
602       SIMIX_mbox_remove(comm->mbox, synchro);
603
604     XBT_DEBUG("SIMIX_comm_finish: synchro state = %d", (int)synchro->state);
605
606     /* Check out for errors */
607
608     if (simcall->issuer->host->isOff()) {
609       simcall->issuer->context->iwannadie = 1;
610       SMX_EXCEPTION(simcall->issuer, host_error, 0, "Host failed");
611     } else {
612       switch (synchro->state) {
613
614       case SIMIX_DONE:
615         XBT_DEBUG("Communication %p complete!", synchro);
616         SIMIX_comm_copy_data(synchro);
617         break;
618
619       case SIMIX_SRC_TIMEOUT:
620         SMX_EXCEPTION(simcall->issuer, timeout_error, 0, "Communication timeouted because of sender");
621         break;
622
623       case SIMIX_DST_TIMEOUT:
624         SMX_EXCEPTION(simcall->issuer, timeout_error, 0, "Communication timeouted because of receiver");
625         break;
626
627       case SIMIX_SRC_HOST_FAILURE:
628         if (simcall->issuer == comm->src_proc)
629           simcall->issuer->context->iwannadie = 1;
630   //          SMX_EXCEPTION(simcall->issuer, host_error, 0, "Host failed");
631         else
632           SMX_EXCEPTION(simcall->issuer, network_error, 0, "Remote peer failed");
633         break;
634
635       case SIMIX_DST_HOST_FAILURE:
636         if (simcall->issuer == comm->dst_proc)
637           simcall->issuer->context->iwannadie = 1;
638   //          SMX_EXCEPTION(simcall->issuer, host_error, 0, "Host failed");
639         else
640           SMX_EXCEPTION(simcall->issuer, network_error, 0, "Remote peer failed");
641         break;
642
643       case SIMIX_LINK_FAILURE:
644
645         XBT_DEBUG("Link failure in synchro %p between '%s' and '%s': posting an exception to the issuer: %s (%p) detached:%d",
646                   synchro,
647                   comm->src_proc ? sg_host_get_name(comm->src_proc->host) : nullptr,
648                   comm->dst_proc ? sg_host_get_name(comm->dst_proc->host) : nullptr,
649                   simcall->issuer->name.c_str(), simcall->issuer, comm->detached);
650         if (comm->src_proc == simcall->issuer) {
651           XBT_DEBUG("I'm source");
652         } else if (comm->dst_proc == simcall->issuer) {
653           XBT_DEBUG("I'm dest");
654         } else {
655           XBT_DEBUG("I'm neither source nor dest");
656         }
657         SMX_EXCEPTION(simcall->issuer, network_error, 0, "Link failure");
658         break;
659
660       case SIMIX_CANCELED:
661         if (simcall->issuer == comm->dst_proc)
662           SMX_EXCEPTION(simcall->issuer, cancel_error, 0, "Communication canceled by the sender");
663         else
664           SMX_EXCEPTION(simcall->issuer, cancel_error, 0, "Communication canceled by the receiver");
665         break;
666
667       default:
668         xbt_die("Unexpected synchro state in SIMIX_comm_finish: %d", (int)synchro->state);
669       }
670     }
671
672     /* if there is an exception during a waitany or a testany, indicate the position of the failed communication */
673     if (simcall->issuer->exception) {
674       // In order to modify the exception we have to rethrow it:
675       try {
676         std::rethrow_exception(simcall->issuer->exception);
677       }
678       catch(xbt_ex& e) {
679         if (simcall->call == SIMCALL_COMM_WAITANY) {
680           e.value = xbt_dynar_search(simcall_comm_waitany__get__comms(simcall), &synchro);
681         }
682         else if (simcall->call == SIMCALL_COMM_TESTANY) {
683           e.value = xbt_dynar_search(simcall_comm_testany__get__comms(simcall), &synchro);
684         }
685         simcall->issuer->exception = std::make_exception_ptr(e);
686       }
687       catch(...) {
688         // Nothing to do
689       }
690     }
691
692     if (simcall->issuer->host->isOff()) {
693       simcall->issuer->context->iwannadie = 1;
694     }
695
696     simcall->issuer->waiting_synchro = nullptr;
697     xbt_fifo_remove(simcall->issuer->comms, synchro);
698     if(comm->detached){
699       if(simcall->issuer == comm->src_proc){
700         if(comm->dst_proc)
701           xbt_fifo_remove(comm->dst_proc->comms, synchro);
702       }
703       if(simcall->issuer == comm->dst_proc){
704         if(comm->src_proc)
705           xbt_fifo_remove(comm->src_proc->comms, synchro);
706         //in case of a detached comm we have an extra ref to remove, as the sender won't do it
707         destroy_count++;
708       }
709     }
710     SIMIX_simcall_answer(simcall);
711     destroy_count++;
712   }
713
714   while (destroy_count-- > 0)
715     static_cast<simgrid::simix::Comm*>(synchro)->unref();
716 }
717
718 /******************************************************************************/
719 /*                    SIMIX_comm_copy_data callbacks                       */
720 /******************************************************************************/
721 static void (*SIMIX_comm_copy_data_callback) (smx_synchro_t, void*, size_t) = &SIMIX_comm_copy_pointer_callback;
722
723 void SIMIX_comm_set_copy_data_callback(void (*callback) (smx_synchro_t, void*, size_t))
724 {
725   SIMIX_comm_copy_data_callback = callback;
726 }
727
728 void SIMIX_comm_copy_pointer_callback(smx_synchro_t synchro, void* buff, size_t buff_size)
729 {
730   simgrid::simix::Comm *comm = static_cast<simgrid::simix::Comm*>(synchro);
731
732   xbt_assert((buff_size == sizeof(void *)), "Cannot copy %zu bytes: must be sizeof(void*)", buff_size);
733   *(void **) (comm->dst_buff) = buff;
734 }
735
736 void SIMIX_comm_copy_buffer_callback(smx_synchro_t synchro, void* buff, size_t buff_size)
737 {
738   simgrid::simix::Comm *comm = static_cast<simgrid::simix::Comm*>(synchro);
739
740   XBT_DEBUG("Copy the data over");
741   memcpy(comm->dst_buff, buff, buff_size);
742   if (comm->detached) { // if this is a detached send, the source buffer was duplicated by SMPI sender to make the original buffer available to the application ASAP
743     xbt_free(buff);
744     comm->src_buff = nullptr;
745   }
746 }
747
748
749 /**
750  *  \brief Copy the communication data from the sender's buffer to the receiver's one
751  *  \param comm The communication
752  */
753 void SIMIX_comm_copy_data(smx_synchro_t synchro)
754 {
755   simgrid::simix::Comm *comm = static_cast<simgrid::simix::Comm*>(synchro);
756
757   size_t buff_size = comm->src_buff_size;
758   /* If there is no data to copy then return */
759   if (!comm->src_buff || !comm->dst_buff || comm->copied)
760     return;
761
762   XBT_DEBUG("Copying comm %p data from %s (%p) -> %s (%p) (%zu bytes)",
763             comm,
764             comm->src_proc ? sg_host_get_name(comm->src_proc->host) : "a finished process",
765             comm->src_buff,
766             comm->dst_proc ? sg_host_get_name(comm->dst_proc->host) : "a finished process",
767             comm->dst_buff, buff_size);
768
769   /* Copy at most dst_buff_size bytes of the message to receiver's buffer */
770   if (comm->dst_buff_size)
771     buff_size = MIN(buff_size, *(comm->dst_buff_size));
772
773   /* Update the receiver's buffer size to the copied amount */
774   if (comm->dst_buff_size)
775     *comm->dst_buff_size = buff_size;
776
777   if (buff_size > 0){
778       if(comm->copy_data_fun)
779         comm->copy_data_fun (comm, comm->src_buff, buff_size);
780       else
781         SIMIX_comm_copy_data_callback (comm, comm->src_buff, buff_size);
782   }
783
784
785   /* Set the copied flag so we copy data only once */
786   /* (this function might be called from both communication ends) */
787   comm->copied = 1;
788 }