Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
ac97e48ce94dc215d82fc8c17ef7801a11688d96
[simgrid.git] / src / simix / smx_network.cpp
1 /* Copyright (c) 2009-2017. The SimGrid Team.  All rights reserved.         */
2
3 /* This program is free software; you can redistribute it and/or modify it
4  * under the terms of the license (GNU LGPL) which comes with this package. */
5
6 #include <algorithm>
7
8 #include <boost/range/algorithm.hpp>
9
10 #include <xbt/ex.hpp>
11
12 #include <simgrid/s4u/host.hpp>
13
14 #include "mc/mc.h"
15 #include "simgrid/s4u/Mailbox.hpp"
16 #include "src/mc/mc_replay.h"
17 #include "src/simix/smx_private.h"
18 #include "src/surf/cpu_interface.hpp"
19 #include "src/surf/surf_interface.hpp"
20 #include "xbt/dict.h"
21 #include "xbt/log.h"
22
23 #include "src/kernel/activity/SynchroComm.hpp"
24 #include "src/surf/network_interface.hpp"
25
26 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(simix_network, simix, "SIMIX network-related synchronization");
27
28 static void SIMIX_waitany_remove_simcall_from_actions(smx_simcall_t simcall);
29 static void SIMIX_comm_copy_data(smx_activity_t comm);
30 static void SIMIX_comm_start(smx_activity_t synchro);
31 static simgrid::kernel::activity::Comm*
32 _find_matching_comm(boost::circular_buffer_space_optimized<smx_activity_t>* deque, e_smx_comm_type_t type,
33                     int (*match_fun)(void*, void*, smx_activity_t), void* user_data, smx_activity_t my_synchro,
34                     bool remove_matching);
35
36 /**
37  *  \brief Checks if there is a communication activity queued in a deque matching our needs
38  *  \param type The type of communication we are looking for (comm_send, comm_recv)
39  *  \return The communication activity if found, nullptr otherwise
40  */
41 static simgrid::kernel::activity::Comm*
42 _find_matching_comm(boost::circular_buffer_space_optimized<smx_activity_t>* deque, e_smx_comm_type_t type,
43                     int (*match_fun)(void*, void*, smx_activity_t), void* this_user_data, smx_activity_t my_synchro,
44                     bool remove_matching)
45 {
46   void* other_user_data = nullptr;
47
48   for(auto it = deque->begin(); it != deque->end(); it++){
49     smx_activity_t synchro = *it;
50     simgrid::kernel::activity::Comm *comm = static_cast<simgrid::kernel::activity::Comm*>(synchro);
51
52     if (comm->type == SIMIX_COMM_SEND) {
53       other_user_data = comm->src_data;
54     } else if (comm->type == SIMIX_COMM_RECEIVE) {
55       other_user_data = comm->dst_data;
56     }
57     if (comm->type == type &&
58         (!      match_fun ||       match_fun(this_user_data,  other_user_data, synchro)) &&
59         (!comm->match_fun || comm->match_fun(other_user_data, this_user_data,  my_synchro))) {
60       XBT_DEBUG("Found a matching communication synchro %p", comm);
61       if (remove_matching)
62         deque->erase(it);
63       comm->ref();
64 #if HAVE_MC
65       comm->mbox_cpy = comm->mbox;
66 #endif
67       comm->mbox = nullptr;
68       return comm;
69     }
70     XBT_DEBUG("Sorry, communication synchro %p does not match our needs:"
71               " its type is %d but we are looking for a comm of type %d (or maybe the filtering didn't match)",
72               comm, (int)comm->type, (int)type);
73   }
74   XBT_DEBUG("No matching communication synchro found");
75   return nullptr;
76 }
77
78 /******************************************************************************/
79 /*                          Communication synchros                            */
80 /******************************************************************************/
81 XBT_PRIVATE void simcall_HANDLER_comm_send(smx_simcall_t simcall, smx_actor_t src, smx_mailbox_t mbox,
82                                   double task_size, double rate,
83                                   void *src_buff, size_t src_buff_size,
84                                   int (*match_fun)(void *, void *,smx_activity_t),
85                                   void (*copy_data_fun)(smx_activity_t, void*, size_t),
86           void *data, double timeout){
87   smx_activity_t comm = simcall_HANDLER_comm_isend(simcall, src, mbox, task_size, rate,
88                            src_buff, src_buff_size, match_fun, nullptr, copy_data_fun,
89                data, 0);
90   SIMCALL_SET_MC_VALUE(simcall, 0);
91   simcall_HANDLER_comm_wait(simcall, comm, timeout);
92 }
93 XBT_PRIVATE smx_activity_t simcall_HANDLER_comm_isend(smx_simcall_t simcall, smx_actor_t src_proc, smx_mailbox_t mbox,
94                                   double task_size, double rate,
95                                   void *src_buff, size_t src_buff_size,
96                                   int (*match_fun)(void *, void *,smx_activity_t),
97                                   void (*clean_fun)(void *), // used to free the synchro in case of problem after a detached send
98                                   void (*copy_data_fun)(smx_activity_t, void*, size_t),// used to copy data if not default one
99                           void *data, int detached)
100 {
101   XBT_DEBUG("send from %p", mbox);
102
103   /* Prepare a synchro describing us, so that it gets passed to the user-provided filter of other side */
104   simgrid::kernel::activity::Comm* this_comm = new simgrid::kernel::activity::Comm(SIMIX_COMM_SEND);
105
106   /* Look for communication synchro matching our needs. We also provide a description of
107    * ourself so that the other side also gets a chance of choosing if it wants to match with us.
108    *
109    * If it is not found then push our communication into the rendez-vous point */
110   simgrid::kernel::activity::Comm* other_comm =
111       _find_matching_comm(&mbox->comm_queue, SIMIX_COMM_RECEIVE, match_fun, data, this_comm, /*remove_matching*/ true);
112
113   if (!other_comm) {
114     other_comm = this_comm;
115
116     if (mbox->permanent_receiver!=nullptr){
117       //this mailbox is for small messages, which have to be sent right now
118       other_comm->state   = SIMIX_READY;
119       other_comm->dst_proc=mbox->permanent_receiver.get();
120       other_comm->ref();
121       mbox->done_comm_queue.push_back(other_comm);
122       XBT_DEBUG("pushing a message into the permanent receive list %p, comm %p", mbox, &(other_comm));
123
124     }else{
125       mbox->push(this_comm);
126     }
127   } else {
128     XBT_DEBUG("Receive already pushed");
129     this_comm->unref();
130
131     other_comm->state = SIMIX_READY;
132     other_comm->type = SIMIX_COMM_READY;
133
134   }
135   src_proc->comms.push_back(other_comm);
136
137   if (detached) {
138     other_comm->detached = true;
139     other_comm->clean_fun = clean_fun;
140   } else {
141     other_comm->clean_fun = nullptr;
142   }
143
144   /* Setup the communication synchro */
145   other_comm->src_proc = src_proc;
146   other_comm->task_size = task_size;
147   other_comm->rate = rate;
148   other_comm->src_buff = src_buff;
149   other_comm->src_buff_size = src_buff_size;
150   other_comm->src_data = data;
151
152   other_comm->match_fun = match_fun;
153   other_comm->copy_data_fun = copy_data_fun;
154
155
156   if (MC_is_active() || MC_record_replay_is_active()) {
157     other_comm->state = SIMIX_RUNNING;
158     return (detached ? nullptr : other_comm);
159   }
160
161   SIMIX_comm_start(other_comm);
162   return (detached ? nullptr : other_comm);
163 }
164
165 XBT_PRIVATE void simcall_HANDLER_comm_recv(smx_simcall_t simcall, smx_actor_t receiver, smx_mailbox_t mbox,
166                          void *dst_buff, size_t *dst_buff_size,
167                          int (*match_fun)(void *, void *, smx_activity_t),
168                          void (*copy_data_fun)(smx_activity_t, void*, size_t),
169                          void *data, double timeout, double rate)
170 {
171   smx_activity_t comm = SIMIX_comm_irecv(receiver, mbox, dst_buff, dst_buff_size, match_fun, copy_data_fun, data, rate);
172   SIMCALL_SET_MC_VALUE(simcall, 0);
173   simcall_HANDLER_comm_wait(simcall, comm, timeout);
174 }
175
176 XBT_PRIVATE smx_activity_t simcall_HANDLER_comm_irecv(smx_simcall_t simcall, smx_actor_t receiver, smx_mailbox_t mbox,
177     void *dst_buff, size_t *dst_buff_size,
178     int (*match_fun)(void *, void *, smx_activity_t),
179     void (*copy_data_fun)(smx_activity_t, void*, size_t),
180     void *data, double rate)
181 {
182   return SIMIX_comm_irecv(receiver, mbox, dst_buff, dst_buff_size, match_fun, copy_data_fun, data, rate);
183 }
184
185 smx_activity_t SIMIX_comm_irecv(smx_actor_t dst_proc, smx_mailbox_t mbox, void *dst_buff, size_t *dst_buff_size,
186     int (*match_fun)(void *, void *, smx_activity_t),
187     void (*copy_data_fun)(smx_activity_t, void*, size_t), // used to copy data if not default one
188     void *data, double rate)
189 {
190   XBT_DEBUG("recv from %p %p", mbox, &mbox->comm_queue);
191   simgrid::kernel::activity::Comm* this_synchro = new simgrid::kernel::activity::Comm(SIMIX_COMM_RECEIVE);
192
193   simgrid::kernel::activity::Comm* other_comm;
194   //communication already done, get it inside the list of completed comms
195   if (mbox->permanent_receiver != nullptr && ! mbox->done_comm_queue.empty()) {
196
197     XBT_DEBUG("We have a comm that has probably already been received, trying to match it, to skip the communication");
198     //find a match in the list of already received comms
199     other_comm = _find_matching_comm(&mbox->done_comm_queue, SIMIX_COMM_SEND, match_fun, data, this_synchro,
200                                      /*remove_matching*/ true);
201     //if not found, assume the receiver came first, register it to the mailbox in the classical way
202     if (!other_comm) {
203       XBT_DEBUG("We have messages in the permanent receive list, but not the one we are looking for, pushing request into list");
204       other_comm = this_synchro;
205       mbox->push(this_synchro);
206     } else {
207       if(other_comm->surf_comm && other_comm->remains() < 1e-12) {
208         XBT_DEBUG("comm %p has been already sent, and is finished, destroy it",other_comm);
209         other_comm->state = SIMIX_DONE;
210         other_comm->type = SIMIX_COMM_DONE;
211         other_comm->mbox = nullptr;
212       }
213       other_comm->unref();
214       static_cast<simgrid::kernel::activity::Comm*>(this_synchro)->unref();
215     }
216   } else {
217     /* Prepare a comm describing us, so that it gets passed to the user-provided filter of other side */
218
219     /* Look for communication activity matching our needs. We also provide a description of
220      * ourself so that the other side also gets a chance of choosing if it wants to match with us.
221      *
222      * If it is not found then push our communication into the rendez-vous point */
223     other_comm = _find_matching_comm(&mbox->comm_queue, SIMIX_COMM_SEND, match_fun, data, this_synchro,
224                                      /*remove_matching*/ true);
225
226     if (!other_comm) {
227       XBT_DEBUG("Receive pushed first %zu", mbox->comm_queue.size());
228       other_comm = this_synchro;
229       mbox->push(this_synchro);
230     } else {
231       this_synchro->unref();
232
233       other_comm->state = SIMIX_READY;
234       other_comm->type = SIMIX_COMM_READY;
235     }
236     dst_proc->comms.push_back(other_comm);
237   }
238
239   /* Setup communication synchro */
240   other_comm->dst_proc = dst_proc;
241   other_comm->dst_buff = dst_buff;
242   other_comm->dst_buff_size = dst_buff_size;
243   other_comm->dst_data = data;
244
245   if (rate > -1.0 && (other_comm->rate < 0.0 || rate < other_comm->rate))
246     other_comm->rate = rate;
247
248   other_comm->match_fun = match_fun;
249   other_comm->copy_data_fun = copy_data_fun;
250
251   if (MC_is_active() || MC_record_replay_is_active()) {
252     other_comm->state = SIMIX_RUNNING;
253     return other_comm;
254   }
255
256   SIMIX_comm_start(other_comm);
257   return other_comm;
258 }
259
260 smx_activity_t simcall_HANDLER_comm_iprobe(smx_simcall_t simcall, smx_mailbox_t mbox,
261                                    int type, int src, int tag,
262                                    int (*match_fun)(void *, void *, smx_activity_t),
263                                    void *data){
264   return SIMIX_comm_iprobe(simcall->issuer, mbox, type, src, tag, match_fun, data);
265 }
266
267 smx_activity_t SIMIX_comm_iprobe(smx_actor_t dst_proc, smx_mailbox_t mbox, int type, int src,
268                               int tag, int (*match_fun)(void *, void *, smx_activity_t), void *data)
269 {
270   XBT_DEBUG("iprobe from %p %p", mbox, &mbox->comm_queue);
271   simgrid::kernel::activity::Comm* this_comm;
272   int smx_type;
273   if(type == 1){
274     this_comm = new simgrid::kernel::activity::Comm(SIMIX_COMM_SEND);
275     smx_type = SIMIX_COMM_RECEIVE;
276   } else{
277     this_comm = new simgrid::kernel::activity::Comm(SIMIX_COMM_RECEIVE);
278     smx_type = SIMIX_COMM_SEND;
279   } 
280   smx_activity_t other_synchro=nullptr;
281   if (mbox->permanent_receiver != nullptr && !mbox->done_comm_queue.empty()) {
282     XBT_DEBUG("first check in the permanent recv mailbox, to see if we already got something");
283     other_synchro = _find_matching_comm(&mbox->done_comm_queue,
284       (e_smx_comm_type_t) smx_type, match_fun, data, this_comm,/*remove_matching*/false);
285   }
286   if (!other_synchro){
287     XBT_DEBUG("check if we have more luck in the normal mailbox");
288     other_synchro = _find_matching_comm(&mbox->comm_queue,
289       (e_smx_comm_type_t) smx_type, match_fun, data, this_comm,/*remove_matching*/false);
290   }
291
292   if(other_synchro)
293     other_synchro->unref();
294
295   this_comm->unref();
296   return other_synchro;
297 }
298
299 void simcall_HANDLER_comm_wait(smx_simcall_t simcall, smx_activity_t synchro, double timeout)
300 {
301   /* Associate this simcall to the wait synchro */
302   XBT_DEBUG("simcall_HANDLER_comm_wait, %p", synchro);
303
304   synchro->simcalls.push_back(simcall);
305   simcall->issuer->waiting_synchro = synchro;
306
307   if (MC_is_active() || MC_record_replay_is_active()) {
308     int idx = SIMCALL_GET_MC_VALUE(simcall);
309     if (idx == 0) {
310       synchro->state = SIMIX_DONE;
311     } else {
312       /* If we reached this point, the wait simcall must have a timeout */
313       /* Otherwise it shouldn't be enabled and executed by the MC */
314       if (timeout < 0.0)
315         THROW_IMPOSSIBLE;
316
317       simgrid::kernel::activity::Comm *comm = static_cast<simgrid::kernel::activity::Comm*>(synchro);
318       if (comm->src_proc == simcall->issuer)
319         comm->state = SIMIX_SRC_TIMEOUT;
320       else
321         comm->state = SIMIX_DST_TIMEOUT;
322     }
323
324     SIMIX_comm_finish(synchro);
325     return;
326   }
327
328   /* If the synchro has already finish perform the error handling, */
329   /* otherwise set up a waiting timeout on the right side          */
330   if (synchro->state != SIMIX_WAITING && synchro->state != SIMIX_RUNNING) {
331     SIMIX_comm_finish(synchro);
332   } else { /* if (timeout >= 0) { we need a surf sleep action even when there is no timeout, otherwise surf won't tell us when the host fails */
333     surf_action_t sleep = simcall->issuer->host->pimpl_cpu->sleep(timeout);
334     sleep->setData(synchro);
335
336     simgrid::kernel::activity::Comm *comm = static_cast<simgrid::kernel::activity::Comm*>(synchro);
337     if (simcall->issuer == comm->src_proc)
338       comm->src_timeout = sleep;
339     else
340       comm->dst_timeout = sleep;
341   }
342 }
343
344 void simcall_HANDLER_comm_test(smx_simcall_t simcall, smx_activity_t synchro)
345 {
346   simgrid::kernel::activity::Comm *comm = static_cast<simgrid::kernel::activity::Comm*>(synchro);
347
348   if (MC_is_active() || MC_record_replay_is_active()){
349     simcall_comm_test__set__result(simcall, comm->src_proc && comm->dst_proc);
350     if (simcall_comm_test__get__result(simcall)){
351       synchro->state = SIMIX_DONE;
352       synchro->simcalls.push_back(simcall);
353       SIMIX_comm_finish(synchro);
354     } else {
355       SIMIX_simcall_answer(simcall);
356     }
357     return;
358   }
359
360   simcall_comm_test__set__result(simcall, (synchro->state != SIMIX_WAITING && synchro->state != SIMIX_RUNNING));
361   if (simcall_comm_test__get__result(simcall)) {
362     synchro->simcalls.push_back(simcall);
363     SIMIX_comm_finish(synchro);
364   } else {
365     SIMIX_simcall_answer(simcall);
366   }
367 }
368
369 void simcall_HANDLER_comm_testany(
370   smx_simcall_t simcall, simgrid::kernel::activity::ActivityImpl* comms[], size_t count)
371 {
372   // The default result is -1 -- this means, "nothing is ready".
373   // It can be changed below, but only if something matches.
374   simcall_comm_testany__set__result(simcall, -1);
375
376   if (MC_is_active() || MC_record_replay_is_active()){
377     int idx = SIMCALL_GET_MC_VALUE(simcall);
378     if(idx == -1){
379       SIMIX_simcall_answer(simcall);
380     }else{
381       simgrid::kernel::activity::ActivityImpl* synchro = comms[idx];
382       simcall_comm_testany__set__result(simcall, idx);
383       synchro->simcalls.push_back(simcall);
384       synchro->state = SIMIX_DONE;
385       SIMIX_comm_finish(synchro);
386     }
387     return;
388   }
389
390   for (std::size_t i = 0; i != count; ++i) {
391     simgrid::kernel::activity::ActivityImpl* synchro = comms[i];
392     if (synchro->state != SIMIX_WAITING && synchro->state != SIMIX_RUNNING) {
393       simcall_comm_testany__set__result(simcall, i);
394       synchro->simcalls.push_back(simcall);
395       SIMIX_comm_finish(synchro);
396       return;
397     }
398   }
399   SIMIX_simcall_answer(simcall);
400 }
401
402 void simcall_HANDLER_comm_waitany(smx_simcall_t simcall, xbt_dynar_t synchros, double timeout)
403 {
404   smx_activity_t synchro;
405   unsigned int cursor = 0;
406
407   if (MC_is_active() || MC_record_replay_is_active()){
408     if (timeout > 0.0)
409       xbt_die("Timeout not implemented for waitany in the model-checker"); 
410     int idx = SIMCALL_GET_MC_VALUE(simcall);
411     synchro = xbt_dynar_get_as(synchros, idx, smx_activity_t);
412     synchro->simcalls.push_back(simcall);
413     simcall_comm_waitany__set__result(simcall, idx);
414     synchro->state = SIMIX_DONE;
415     SIMIX_comm_finish(synchro);
416     return;
417   }
418   
419   if (timeout < 0.0){
420     simcall->timer = NULL;
421   } else {
422     simcall->timer = SIMIX_timer_set(SIMIX_get_clock() + timeout, [simcall]() {
423       SIMIX_waitany_remove_simcall_from_actions(simcall);
424       simcall_comm_waitany__set__result(simcall, -1);
425       SIMIX_simcall_answer(simcall);
426     });
427   }
428   
429   xbt_dynar_foreach(synchros, cursor, synchro){
430     /* associate this simcall to the the synchro */
431     synchro->simcalls.push_back(simcall);
432
433     /* see if the synchro is already finished */
434     if (synchro->state != SIMIX_WAITING && synchro->state != SIMIX_RUNNING){
435       SIMIX_comm_finish(synchro);
436       break;
437     }
438   }
439 }
440
441 void SIMIX_waitany_remove_simcall_from_actions(smx_simcall_t simcall)
442 {
443   smx_activity_t synchro;
444   unsigned int cursor = 0;
445   xbt_dynar_t synchros = simcall_comm_waitany__get__comms(simcall);
446
447   xbt_dynar_foreach(synchros, cursor, synchro) {
448     // Remove the first occurence of simcall:
449     auto i = boost::range::find(synchro->simcalls, simcall);
450     if (i !=  synchro->simcalls.end())
451       synchro->simcalls.erase(i);
452   }
453 }
454
455 /**
456  *  \brief Starts the simulation of a communication synchro.
457  *  \param synchro the communication synchro
458  */
459 static inline void SIMIX_comm_start(smx_activity_t synchro)
460 {
461   simgrid::kernel::activity::Comm *comm = static_cast<simgrid::kernel::activity::Comm*>(synchro);
462
463   /* If both the sender and the receiver are already there, start the communication */
464   if (synchro->state == SIMIX_READY) {
465
466     simgrid::s4u::Host* sender   = comm->src_proc->host;
467     simgrid::s4u::Host* receiver = comm->dst_proc->host;
468
469     XBT_DEBUG("Starting communication %p from '%s' to '%s'", synchro, sender->cname(), receiver->cname());
470
471     comm->surf_comm = surf_network_model->communicate(sender, receiver, comm->task_size, comm->rate);
472     comm->surf_comm->setData(synchro);
473     comm->state = SIMIX_RUNNING;
474
475     /* If a link is failed, detect it immediately */
476     if (comm->surf_comm->getState() == simgrid::surf::Action::State::failed) {
477       XBT_DEBUG("Communication from '%s' to '%s' failed to start because of a link failure", sender->cname(),
478                 receiver->cname());
479       comm->state = SIMIX_LINK_FAILURE;
480       comm->cleanupSurf();
481     }
482
483     /* If any of the process is suspend, create the synchro but stop its execution,
484        it will be restarted when the sender process resume */
485     if (SIMIX_process_is_suspended(comm->src_proc) || SIMIX_process_is_suspended(comm->dst_proc)) {
486       if (SIMIX_process_is_suspended(comm->src_proc))
487         XBT_DEBUG("The communication is suspended on startup because src (%s@%s) was suspended since it initiated the "
488                   "communication",
489                   comm->src_proc->cname(), comm->src_proc->host->cname());
490       else
491         XBT_DEBUG("The communication is suspended on startup because dst (%s@%s) was suspended since it initiated the "
492                   "communication",
493                   comm->dst_proc->cname(), comm->dst_proc->host->cname());
494
495       comm->surf_comm->suspend();
496     }
497   }
498 }
499
500 /**
501  * \brief Answers the SIMIX simcalls associated to a communication synchro.
502  * \param synchro a finished communication synchro
503  */
504 void SIMIX_comm_finish(smx_activity_t synchro)
505 {
506   simgrid::kernel::activity::Comm *comm = static_cast<simgrid::kernel::activity::Comm*>(synchro);
507   unsigned int destroy_count = 0;
508
509   while (!synchro->simcalls.empty()) {
510     smx_simcall_t simcall = synchro->simcalls.front();
511     synchro->simcalls.pop_front();
512
513     /* If a waitany simcall is waiting for this synchro to finish, then remove
514        it from the other synchros in the waitany list. Afterwards, get the
515        position of the actual synchro in the waitany dynar and
516        return it as the result of the simcall */
517
518     if (simcall->call == SIMCALL_NONE) //FIXME: maybe a better way to handle this case
519       continue; // if process handling comm is killed
520     if (simcall->call == SIMCALL_COMM_WAITANY) {
521       SIMIX_waitany_remove_simcall_from_actions(simcall);
522       if (simcall->timer) {
523         SIMIX_timer_remove(simcall->timer);
524         simcall->timer = nullptr;
525       }
526       if (!MC_is_active() && !MC_record_replay_is_active())
527         simcall_comm_waitany__set__result(simcall, xbt_dynar_search(simcall_comm_waitany__get__comms(simcall), &synchro));
528     }
529
530     /* If the synchro is still in a rendez-vous point then remove from it */
531     if (comm->mbox)
532       comm->mbox->remove(synchro);
533
534     XBT_DEBUG("SIMIX_comm_finish: synchro state = %d", (int)synchro->state);
535
536     /* Check out for errors */
537
538     if (simcall->issuer->host->isOff()) {
539       simcall->issuer->context->iwannadie = 1;
540       SMX_EXCEPTION(simcall->issuer, host_error, 0, "Host failed");
541     } else {
542       switch (synchro->state) {
543
544       case SIMIX_DONE:
545         XBT_DEBUG("Communication %p complete!", synchro);
546         SIMIX_comm_copy_data(synchro);
547         break;
548
549       case SIMIX_SRC_TIMEOUT:
550         SMX_EXCEPTION(simcall->issuer, timeout_error, 0, "Communication timeouted because of sender");
551         break;
552
553       case SIMIX_DST_TIMEOUT:
554         SMX_EXCEPTION(simcall->issuer, timeout_error, 0, "Communication timeouted because of receiver");
555         break;
556
557       case SIMIX_SRC_HOST_FAILURE:
558         if (simcall->issuer == comm->src_proc)
559           simcall->issuer->context->iwannadie = 1;
560   //          SMX_EXCEPTION(simcall->issuer, host_error, 0, "Host failed");
561         else
562           SMX_EXCEPTION(simcall->issuer, network_error, 0, "Remote peer failed");
563         break;
564
565       case SIMIX_DST_HOST_FAILURE:
566         if (simcall->issuer == comm->dst_proc)
567           simcall->issuer->context->iwannadie = 1;
568   //          SMX_EXCEPTION(simcall->issuer, host_error, 0, "Host failed");
569         else
570           SMX_EXCEPTION(simcall->issuer, network_error, 0, "Remote peer failed");
571         break;
572
573       case SIMIX_LINK_FAILURE:
574
575         XBT_DEBUG(
576             "Link failure in synchro %p between '%s' and '%s': posting an exception to the issuer: %s (%p) detached:%d",
577             synchro, comm->src_proc ? comm->src_proc->host->cname() : nullptr,
578             comm->dst_proc ? comm->dst_proc->host->cname() : nullptr, simcall->issuer->cname(), simcall->issuer,
579             comm->detached);
580         if (comm->src_proc == simcall->issuer) {
581           XBT_DEBUG("I'm source");
582         } else if (comm->dst_proc == simcall->issuer) {
583           XBT_DEBUG("I'm dest");
584         } else {
585           XBT_DEBUG("I'm neither source nor dest");
586         }
587         SMX_EXCEPTION(simcall->issuer, network_error, 0, "Link failure");
588         break;
589
590       case SIMIX_CANCELED:
591         if (simcall->issuer == comm->dst_proc)
592           SMX_EXCEPTION(simcall->issuer, cancel_error, 0, "Communication canceled by the sender");
593         else
594           SMX_EXCEPTION(simcall->issuer, cancel_error, 0, "Communication canceled by the receiver");
595         break;
596
597       default:
598         xbt_die("Unexpected synchro state in SIMIX_comm_finish: %d", (int)synchro->state);
599       }
600     }
601
602     /* if there is an exception during a waitany or a testany, indicate the position of the failed communication */
603     if (simcall->issuer->exception) {
604       // In order to modify the exception we have to rethrow it:
605       try {
606         std::rethrow_exception(simcall->issuer->exception);
607       }
608       catch(xbt_ex& e) {
609         if (simcall->call == SIMCALL_COMM_WAITANY) {
610           e.value = xbt_dynar_search(simcall_comm_waitany__get__comms(simcall), &synchro);
611         }
612         else if (simcall->call == SIMCALL_COMM_TESTANY) {
613           e.value = -1;
614           auto comms = simcall_comm_testany__get__comms(simcall);
615           auto count = simcall_comm_testany__get__count(simcall);
616           auto element = std::find(comms, comms + count, synchro);
617           if (element == comms + count)
618             e.value = -1;
619           else
620             e.value = element - comms;
621         }
622         simcall->issuer->exception = std::make_exception_ptr(e);
623       }
624       catch(...) {
625         // Nothing to do
626       }
627     }
628
629     if (simcall->issuer->host->isOff()) {
630       simcall->issuer->context->iwannadie = 1;
631     }
632
633     simcall->issuer->waiting_synchro = nullptr;
634     simcall->issuer->comms.remove(synchro);
635     if(comm->detached){
636       if(simcall->issuer == comm->src_proc){
637         if(comm->dst_proc)
638           comm->dst_proc->comms.remove(synchro);
639       }
640       else if(simcall->issuer == comm->dst_proc){
641         if(comm->src_proc)
642           comm->src_proc->comms.remove(synchro);
643       }
644       else{
645         comm->dst_proc->comms.remove(synchro);
646         comm->src_proc->comms.remove(synchro);
647       }
648       //in case of a detached comm we have an extra ref to remove, as the sender won't do it
649       destroy_count++;
650     }
651
652     SIMIX_simcall_answer(simcall);
653     destroy_count++;
654   }
655
656   while (destroy_count-- > 0)
657     static_cast<simgrid::kernel::activity::Comm*>(synchro)->unref();
658 }
659
660 /******************************************************************************/
661 /*                    SIMIX_comm_copy_data callbacks                       */
662 /******************************************************************************/
663 static void (*SIMIX_comm_copy_data_callback) (smx_activity_t, void*, size_t) = &SIMIX_comm_copy_pointer_callback;
664
665 void SIMIX_comm_set_copy_data_callback(void (*callback) (smx_activity_t, void*, size_t))
666 {
667   SIMIX_comm_copy_data_callback = callback;
668 }
669
670 void SIMIX_comm_copy_pointer_callback(smx_activity_t synchro, void* buff, size_t buff_size)
671 {
672   simgrid::kernel::activity::Comm *comm = static_cast<simgrid::kernel::activity::Comm*>(synchro);
673
674   xbt_assert((buff_size == sizeof(void *)), "Cannot copy %zu bytes: must be sizeof(void*)", buff_size);
675   *(void **) (comm->dst_buff) = buff;
676 }
677
678 void SIMIX_comm_copy_buffer_callback(smx_activity_t synchro, void* buff, size_t buff_size)
679 {
680   simgrid::kernel::activity::Comm *comm = static_cast<simgrid::kernel::activity::Comm*>(synchro);
681
682   XBT_DEBUG("Copy the data over");
683   memcpy(comm->dst_buff, buff, buff_size);
684   if (comm->detached) { // if this is a detached send, the source buffer was duplicated by SMPI sender to make the original buffer available to the application ASAP
685     xbt_free(buff);
686     comm->src_buff = nullptr;
687   }
688 }
689
690
691 /**
692  *  @brief Copy the communication data from the sender's buffer to the receiver's one
693  *  @param synchro The communication
694  */
695 void SIMIX_comm_copy_data(smx_activity_t synchro)
696 {
697   simgrid::kernel::activity::Comm *comm = static_cast<simgrid::kernel::activity::Comm*>(synchro);
698
699   size_t buff_size = comm->src_buff_size;
700   /* If there is no data to copy then return */
701   if (!comm->src_buff || !comm->dst_buff || comm->copied)
702     return;
703
704   XBT_DEBUG("Copying comm %p data from %s (%p) -> %s (%p) (%zu bytes)", comm,
705             comm->src_proc ? comm->src_proc->host->cname() : "a finished process", comm->src_buff,
706             comm->dst_proc ? comm->dst_proc->host->cname() : "a finished process", comm->dst_buff, buff_size);
707
708   /* Copy at most dst_buff_size bytes of the message to receiver's buffer */
709   if (comm->dst_buff_size)
710     buff_size = MIN(buff_size, *(comm->dst_buff_size));
711
712   /* Update the receiver's buffer size to the copied amount */
713   if (comm->dst_buff_size)
714     *comm->dst_buff_size = buff_size;
715
716   if (buff_size > 0){
717       if(comm->copy_data_fun)
718         comm->copy_data_fun (comm, comm->src_buff, buff_size);
719       else
720         SIMIX_comm_copy_data_callback (comm, comm->src_buff, buff_size);
721   }
722
723
724   /* Set the copied flag so we copy data only once */
725   /* (this function might be called from both communication ends) */
726   comm->copied = 1;
727 }