Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
ok, I stop trying to please sonar.
[simgrid.git] / src / simix / smx_network.cpp
1 /* Copyright (c) 2009-2017. The SimGrid Team.  All rights reserved.         */
2
3 /* This program is free software; you can redistribute it and/or modify it
4  * under the terms of the license (GNU LGPL) which comes with this package. */
5
6 #include <algorithm>
7
8 #include <boost/range/algorithm.hpp>
9
10 #include <xbt/ex.hpp>
11
12 #include <simgrid/s4u/host.hpp>
13
14 #include "mc/mc.h"
15 #include "simgrid/s4u/Mailbox.hpp"
16 #include "src/mc/mc_replay.h"
17 #include "src/simix/smx_private.h"
18 #include "src/surf/cpu_interface.hpp"
19 #include "src/surf/surf_interface.hpp"
20 #include "xbt/dict.h"
21 #include "xbt/log.h"
22
23 #include "src/kernel/activity/SynchroComm.hpp"
24 #include "src/surf/network_interface.hpp"
25
26 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(simix_network, simix, "SIMIX network-related synchronization");
27
28 static void SIMIX_waitany_remove_simcall_from_actions(smx_simcall_t simcall);
29 static void SIMIX_comm_copy_data(smx_activity_t comm);
30 static void SIMIX_comm_start(smx_activity_t synchro);
31 static smx_activity_t _find_matching_comm(boost::circular_buffer_space_optimized<smx_activity_t> *deque, e_smx_comm_type_t type,
32     int (*match_fun)(void *, void *,smx_activity_t), void *user_data, smx_activity_t my_synchro, bool remove_matching);
33
34 /**
35  *  \brief Checks if there is a communication synchro queued in a deque matching our needs
36  *  \param type The type of communication we are looking for (comm_send, comm_recv)
37  *  \return The communication synchro if found, nullptr otherwise
38  */
39 static smx_activity_t _find_matching_comm(boost::circular_buffer_space_optimized<smx_activity_t> *deque, e_smx_comm_type_t type,
40     int (*match_fun)(void *, void *,smx_activity_t), void *this_user_data, smx_activity_t my_synchro, bool remove_matching)
41 {
42   void* other_user_data = nullptr;
43
44   for(auto it = deque->begin(); it != deque->end(); it++){
45     smx_activity_t synchro = *it;
46     simgrid::kernel::activity::Comm *comm = static_cast<simgrid::kernel::activity::Comm*>(synchro);
47
48     if (comm->type == SIMIX_COMM_SEND) {
49       other_user_data = comm->src_data;
50     } else if (comm->type == SIMIX_COMM_RECEIVE) {
51       other_user_data = comm->dst_data;
52     }
53     if (comm->type == type &&
54         (!      match_fun ||       match_fun(this_user_data,  other_user_data, synchro)) &&
55         (!comm->match_fun || comm->match_fun(other_user_data, this_user_data,  my_synchro))) {
56       XBT_DEBUG("Found a matching communication synchro %p", comm);
57       if (remove_matching)
58         deque->erase(it);
59       comm->ref();
60 #if HAVE_MC
61       comm->mbox_cpy = comm->mbox;
62 #endif
63       comm->mbox = nullptr;
64       return comm;
65     }
66     XBT_DEBUG("Sorry, communication synchro %p does not match our needs:"
67               " its type is %d but we are looking for a comm of type %d (or maybe the filtering didn't match)",
68               comm, (int)comm->type, (int)type);
69   }
70   XBT_DEBUG("No matching communication synchro found");
71   return nullptr;
72 }
73
74 /******************************************************************************/
75 /*                          Communication synchros                            */
76 /******************************************************************************/
77 XBT_PRIVATE void simcall_HANDLER_comm_send(smx_simcall_t simcall, smx_actor_t src, smx_mailbox_t mbox,
78                                   double task_size, double rate,
79                                   void *src_buff, size_t src_buff_size,
80                                   int (*match_fun)(void *, void *,smx_activity_t),
81                                   void (*copy_data_fun)(smx_activity_t, void*, size_t),
82           void *data, double timeout){
83   smx_activity_t comm = simcall_HANDLER_comm_isend(simcall, src, mbox, task_size, rate,
84                            src_buff, src_buff_size, match_fun, nullptr, copy_data_fun,
85                data, 0);
86   SIMCALL_SET_MC_VALUE(simcall, 0);
87   simcall_HANDLER_comm_wait(simcall, comm, timeout);
88 }
89 XBT_PRIVATE smx_activity_t simcall_HANDLER_comm_isend(smx_simcall_t simcall, smx_actor_t src_proc, smx_mailbox_t mbox,
90                                   double task_size, double rate,
91                                   void *src_buff, size_t src_buff_size,
92                                   int (*match_fun)(void *, void *,smx_activity_t),
93                                   void (*clean_fun)(void *), // used to free the synchro in case of problem after a detached send
94                                   void (*copy_data_fun)(smx_activity_t, void*, size_t),// used to copy data if not default one
95                           void *data, int detached)
96 {
97   XBT_DEBUG("send from %p", mbox);
98
99   /* Prepare a synchro describing us, so that it gets passed to the user-provided filter of other side */
100   simgrid::kernel::activity::Comm* this_synchro = new simgrid::kernel::activity::Comm(SIMIX_COMM_SEND);
101
102   /* Look for communication synchro matching our needs. We also provide a description of
103    * ourself so that the other side also gets a chance of choosing if it wants to match with us.
104    *
105    * If it is not found then push our communication into the rendez-vous point */
106   smx_activity_t other_synchro =
107       _find_matching_comm(&mbox->comm_queue, SIMIX_COMM_RECEIVE, match_fun, data, this_synchro, /*remove_matching*/true);
108   simgrid::kernel::activity::Comm *other_comm = static_cast<simgrid::kernel::activity::Comm*>(other_synchro);
109
110
111   if (!other_synchro) {
112     other_synchro = this_synchro;
113     other_comm = static_cast<simgrid::kernel::activity::Comm*>(other_synchro);
114
115     if (mbox->permanent_receiver!=nullptr){
116       //this mailbox is for small messages, which have to be sent right now
117       other_synchro->state = SIMIX_READY;
118       other_comm->dst_proc=mbox->permanent_receiver.get();
119       other_comm->ref();
120       mbox->done_comm_queue.push_back(other_synchro);
121       XBT_DEBUG("pushing a message into the permanent receive list %p, comm %p", mbox, &(other_comm));
122
123     }else{
124       mbox->push(this_synchro);
125     }
126   } else {
127     XBT_DEBUG("Receive already pushed");
128     this_synchro->unref();
129
130     other_comm->state = SIMIX_READY;
131     other_comm->type = SIMIX_COMM_READY;
132
133   }
134   src_proc->comms.push_back(other_synchro);
135
136
137   if (detached) {
138     other_comm->detached = true;
139     other_comm->clean_fun = clean_fun;
140   } else {
141     other_comm->clean_fun = nullptr;
142   }
143
144   /* Setup the communication synchro */
145   other_comm->src_proc = src_proc;
146   other_comm->task_size = task_size;
147   other_comm->rate = rate;
148   other_comm->src_buff = src_buff;
149   other_comm->src_buff_size = src_buff_size;
150   other_comm->src_data = data;
151
152   other_comm->match_fun = match_fun;
153   other_comm->copy_data_fun = copy_data_fun;
154
155
156   if (MC_is_active() || MC_record_replay_is_active()) {
157     other_comm->state = SIMIX_RUNNING;
158     return (detached ? nullptr : other_comm);
159   }
160
161   SIMIX_comm_start(other_comm);
162   return (detached ? nullptr : other_comm);
163 }
164
165 XBT_PRIVATE void simcall_HANDLER_comm_recv(smx_simcall_t simcall, smx_actor_t receiver, smx_mailbox_t mbox,
166                          void *dst_buff, size_t *dst_buff_size,
167                          int (*match_fun)(void *, void *, smx_activity_t),
168                          void (*copy_data_fun)(smx_activity_t, void*, size_t),
169                          void *data, double timeout, double rate)
170 {
171   smx_activity_t comm = SIMIX_comm_irecv(receiver, mbox, dst_buff, dst_buff_size, match_fun, copy_data_fun, data, rate);
172   SIMCALL_SET_MC_VALUE(simcall, 0);
173   simcall_HANDLER_comm_wait(simcall, comm, timeout);
174 }
175
176 XBT_PRIVATE smx_activity_t simcall_HANDLER_comm_irecv(smx_simcall_t simcall, smx_actor_t receiver, smx_mailbox_t mbox,
177     void *dst_buff, size_t *dst_buff_size,
178     int (*match_fun)(void *, void *, smx_activity_t),
179     void (*copy_data_fun)(smx_activity_t, void*, size_t),
180     void *data, double rate)
181 {
182   return SIMIX_comm_irecv(receiver, mbox, dst_buff, dst_buff_size, match_fun, copy_data_fun, data, rate);
183 }
184
185 smx_activity_t SIMIX_comm_irecv(smx_actor_t dst_proc, smx_mailbox_t mbox, void *dst_buff, size_t *dst_buff_size,
186     int (*match_fun)(void *, void *, smx_activity_t),
187     void (*copy_data_fun)(smx_activity_t, void*, size_t), // used to copy data if not default one
188     void *data, double rate)
189 {
190   XBT_DEBUG("recv from %p %p", mbox, &mbox->comm_queue);
191   simgrid::kernel::activity::Comm* this_synchro = new simgrid::kernel::activity::Comm(SIMIX_COMM_RECEIVE);
192
193   smx_activity_t other_synchro;
194   //communication already done, get it inside the list of completed comms
195   if (mbox->permanent_receiver != nullptr && ! mbox->done_comm_queue.empty()) {
196
197     XBT_DEBUG("We have a comm that has probably already been received, trying to match it, to skip the communication");
198     //find a match in the list of already received comms
199     other_synchro = _find_matching_comm(&mbox->done_comm_queue, SIMIX_COMM_SEND, match_fun, data, this_synchro,/*remove_matching*/true);
200     //if not found, assume the receiver came first, register it to the mailbox in the classical way
201     if (!other_synchro)  {
202       XBT_DEBUG("We have messages in the permanent receive list, but not the one we are looking for, pushing request into list");
203       other_synchro = this_synchro;
204       mbox->push(this_synchro);
205     } else {
206       simgrid::kernel::activity::Comm *other_comm = static_cast<simgrid::kernel::activity::Comm*>(other_synchro);
207
208       if(other_comm->surf_comm && other_comm->remains() < 1e-12) {
209         XBT_DEBUG("comm %p has been already sent, and is finished, destroy it",other_comm);
210         other_comm->state = SIMIX_DONE;
211         other_comm->type = SIMIX_COMM_DONE;
212         other_comm->mbox = nullptr;
213       }
214       other_comm->unref();
215       static_cast<simgrid::kernel::activity::Comm*>(this_synchro)->unref();
216     }
217   } else {
218     /* Prepare a synchro describing us, so that it gets passed to the user-provided filter of other side */
219
220     /* Look for communication synchro matching our needs. We also provide a description of
221      * ourself so that the other side also gets a chance of choosing if it wants to match with us.
222      *
223      * If it is not found then push our communication into the rendez-vous point */
224     other_synchro = _find_matching_comm(&mbox->comm_queue, SIMIX_COMM_SEND, match_fun, data, this_synchro,/*remove_matching*/true);
225
226     if (!other_synchro) {
227       XBT_DEBUG("Receive pushed first %zu", mbox->comm_queue.size());
228       other_synchro = this_synchro;
229       mbox->push(this_synchro);
230     } else {
231       this_synchro->unref();
232       simgrid::kernel::activity::Comm *other_comm = static_cast<simgrid::kernel::activity::Comm*>(other_synchro);
233
234       other_comm->state = SIMIX_READY;
235       other_comm->type = SIMIX_COMM_READY;
236     }
237     dst_proc->comms.push_back(other_synchro);
238   }
239
240   /* Setup communication synchro */
241   simgrid::kernel::activity::Comm *other_comm = static_cast<simgrid::kernel::activity::Comm*>(other_synchro);
242   other_comm->dst_proc = dst_proc;
243   other_comm->dst_buff = dst_buff;
244   other_comm->dst_buff_size = dst_buff_size;
245   other_comm->dst_data = data;
246
247   if (rate > -1.0 && (other_comm->rate < 0.0 || rate < other_comm->rate))
248     other_comm->rate = rate;
249
250   other_comm->match_fun = match_fun;
251   other_comm->copy_data_fun = copy_data_fun;
252
253   if (MC_is_active() || MC_record_replay_is_active()) {
254     other_synchro->state = SIMIX_RUNNING;
255     return other_synchro;
256   }
257
258   SIMIX_comm_start(other_synchro);
259   return other_synchro;
260 }
261
262 smx_activity_t simcall_HANDLER_comm_iprobe(smx_simcall_t simcall, smx_mailbox_t mbox,
263                                    int type, int src, int tag,
264                                    int (*match_fun)(void *, void *, smx_activity_t),
265                                    void *data){
266   return SIMIX_comm_iprobe(simcall->issuer, mbox, type, src, tag, match_fun, data);
267 }
268
269 smx_activity_t SIMIX_comm_iprobe(smx_actor_t dst_proc, smx_mailbox_t mbox, int type, int src,
270                               int tag, int (*match_fun)(void *, void *, smx_activity_t), void *data)
271 {
272   XBT_DEBUG("iprobe from %p %p", mbox, &mbox->comm_queue);
273   simgrid::kernel::activity::Comm* this_comm;
274   int smx_type;
275   if(type == 1){
276     this_comm = new simgrid::kernel::activity::Comm(SIMIX_COMM_SEND);
277     smx_type = SIMIX_COMM_RECEIVE;
278   } else{
279     this_comm = new simgrid::kernel::activity::Comm(SIMIX_COMM_RECEIVE);
280     smx_type = SIMIX_COMM_SEND;
281   } 
282   smx_activity_t other_synchro=nullptr;
283   if (mbox->permanent_receiver != nullptr && !mbox->done_comm_queue.empty()) {
284     XBT_DEBUG("first check in the permanent recv mailbox, to see if we already got something");
285     other_synchro = _find_matching_comm(&mbox->done_comm_queue,
286       (e_smx_comm_type_t) smx_type, match_fun, data, this_comm,/*remove_matching*/false);
287   }
288   if (!other_synchro){
289     XBT_DEBUG("check if we have more luck in the normal mailbox");
290     other_synchro = _find_matching_comm(&mbox->comm_queue,
291       (e_smx_comm_type_t) smx_type, match_fun, data, this_comm,/*remove_matching*/false);
292   }
293
294   if(other_synchro)
295     other_synchro->unref();
296
297   this_comm->unref();
298   return other_synchro;
299 }
300
301 void simcall_HANDLER_comm_wait(smx_simcall_t simcall, smx_activity_t synchro, double timeout)
302 {
303   /* Associate this simcall to the wait synchro */
304   XBT_DEBUG("simcall_HANDLER_comm_wait, %p", synchro);
305
306   synchro->simcalls.push_back(simcall);
307   simcall->issuer->waiting_synchro = synchro;
308
309   if (MC_is_active() || MC_record_replay_is_active()) {
310     int idx = SIMCALL_GET_MC_VALUE(simcall);
311     if (idx == 0) {
312       synchro->state = SIMIX_DONE;
313     } else {
314       /* If we reached this point, the wait simcall must have a timeout */
315       /* Otherwise it shouldn't be enabled and executed by the MC */
316       if (timeout < 0.0)
317         THROW_IMPOSSIBLE;
318
319       simgrid::kernel::activity::Comm *comm = static_cast<simgrid::kernel::activity::Comm*>(synchro);
320       if (comm->src_proc == simcall->issuer)
321         comm->state = SIMIX_SRC_TIMEOUT;
322       else
323         comm->state = SIMIX_DST_TIMEOUT;
324     }
325
326     SIMIX_comm_finish(synchro);
327     return;
328   }
329
330   /* If the synchro has already finish perform the error handling, */
331   /* otherwise set up a waiting timeout on the right side          */
332   if (synchro->state != SIMIX_WAITING && synchro->state != SIMIX_RUNNING) {
333     SIMIX_comm_finish(synchro);
334   } else { /* if (timeout >= 0) { we need a surf sleep action even when there is no timeout, otherwise surf won't tell us when the host fails */
335     surf_action_t sleep = simcall->issuer->host->pimpl_cpu->sleep(timeout);
336     sleep->setData(synchro);
337
338     simgrid::kernel::activity::Comm *comm = static_cast<simgrid::kernel::activity::Comm*>(synchro);
339     if (simcall->issuer == comm->src_proc)
340       comm->src_timeout = sleep;
341     else
342       comm->dst_timeout = sleep;
343   }
344 }
345
346 void simcall_HANDLER_comm_test(smx_simcall_t simcall, smx_activity_t synchro)
347 {
348   simgrid::kernel::activity::Comm *comm = static_cast<simgrid::kernel::activity::Comm*>(synchro);
349
350   if (MC_is_active() || MC_record_replay_is_active()){
351     simcall_comm_test__set__result(simcall, comm->src_proc && comm->dst_proc);
352     if (simcall_comm_test__get__result(simcall)){
353       synchro->state = SIMIX_DONE;
354       synchro->simcalls.push_back(simcall);
355       SIMIX_comm_finish(synchro);
356     } else {
357       SIMIX_simcall_answer(simcall);
358     }
359     return;
360   }
361
362   simcall_comm_test__set__result(simcall, (synchro->state != SIMIX_WAITING && synchro->state != SIMIX_RUNNING));
363   if (simcall_comm_test__get__result(simcall)) {
364     synchro->simcalls.push_back(simcall);
365     SIMIX_comm_finish(synchro);
366   } else {
367     SIMIX_simcall_answer(simcall);
368   }
369 }
370
371 void simcall_HANDLER_comm_testany(
372   smx_simcall_t simcall, simgrid::kernel::activity::ActivityImpl* comms[], size_t count)
373 {
374   // The default result is -1 -- this means, "nothing is ready".
375   // It can be changed below, but only if something matches.
376   simcall_comm_testany__set__result(simcall, -1);
377
378   if (MC_is_active() || MC_record_replay_is_active()){
379     int idx = SIMCALL_GET_MC_VALUE(simcall);
380     if(idx == -1){
381       SIMIX_simcall_answer(simcall);
382     }else{
383       simgrid::kernel::activity::ActivityImpl* synchro = comms[idx];
384       simcall_comm_testany__set__result(simcall, idx);
385       synchro->simcalls.push_back(simcall);
386       synchro->state = SIMIX_DONE;
387       SIMIX_comm_finish(synchro);
388     }
389     return;
390   }
391
392   for (std::size_t i = 0; i != count; ++i) {
393     simgrid::kernel::activity::ActivityImpl* synchro = comms[i];
394     if (synchro->state != SIMIX_WAITING && synchro->state != SIMIX_RUNNING) {
395       simcall_comm_testany__set__result(simcall, i);
396       synchro->simcalls.push_back(simcall);
397       SIMIX_comm_finish(synchro);
398       return;
399     }
400   }
401   SIMIX_simcall_answer(simcall);
402 }
403
404 void simcall_HANDLER_comm_waitany(smx_simcall_t simcall, xbt_dynar_t synchros, double timeout)
405 {
406   smx_activity_t synchro;
407   unsigned int cursor = 0;
408
409   if (MC_is_active() || MC_record_replay_is_active()){
410     if (timeout > 0.0)
411       xbt_die("Timeout not implemented for waitany in the model-checker"); 
412     int idx = SIMCALL_GET_MC_VALUE(simcall);
413     synchro = xbt_dynar_get_as(synchros, idx, smx_activity_t);
414     synchro->simcalls.push_back(simcall);
415     simcall_comm_waitany__set__result(simcall, idx);
416     synchro->state = SIMIX_DONE;
417     SIMIX_comm_finish(synchro);
418     return;
419   }
420   
421   if (timeout < 0.0){
422     simcall->timer = NULL;
423   } else {
424     simcall->timer = SIMIX_timer_set(SIMIX_get_clock() + timeout, [simcall]() {
425       SIMIX_waitany_remove_simcall_from_actions(simcall);
426       simcall_comm_waitany__set__result(simcall, -1);
427       SIMIX_simcall_answer(simcall);
428     });
429   }
430   
431   xbt_dynar_foreach(synchros, cursor, synchro){
432     /* associate this simcall to the the synchro */
433     synchro->simcalls.push_back(simcall);
434
435     /* see if the synchro is already finished */
436     if (synchro->state != SIMIX_WAITING && synchro->state != SIMIX_RUNNING){
437       SIMIX_comm_finish(synchro);
438       break;
439     }
440   }
441 }
442
443 void SIMIX_waitany_remove_simcall_from_actions(smx_simcall_t simcall)
444 {
445   smx_activity_t synchro;
446   unsigned int cursor = 0;
447   xbt_dynar_t synchros = simcall_comm_waitany__get__comms(simcall);
448
449   xbt_dynar_foreach(synchros, cursor, synchro) {
450     // Remove the first occurence of simcall:
451     auto i = boost::range::find(synchro->simcalls, simcall);
452     if (i !=  synchro->simcalls.end())
453       synchro->simcalls.erase(i);
454   }
455 }
456
457 /**
458  *  \brief Starts the simulation of a communication synchro.
459  *  \param synchro the communication synchro
460  */
461 static inline void SIMIX_comm_start(smx_activity_t synchro)
462 {
463   simgrid::kernel::activity::Comm *comm = static_cast<simgrid::kernel::activity::Comm*>(synchro);
464
465   /* If both the sender and the receiver are already there, start the communication */
466   if (synchro->state == SIMIX_READY) {
467
468     simgrid::s4u::Host* sender   = comm->src_proc->host;
469     simgrid::s4u::Host* receiver = comm->dst_proc->host;
470
471     XBT_DEBUG("Starting communication %p from '%s' to '%s'", synchro, sender->cname(), receiver->cname());
472
473     comm->surf_comm = surf_network_model->communicate(sender, receiver, comm->task_size, comm->rate);
474     comm->surf_comm->setData(synchro);
475     comm->state = SIMIX_RUNNING;
476
477     /* If a link is failed, detect it immediately */
478     if (comm->surf_comm->getState() == simgrid::surf::Action::State::failed) {
479       XBT_DEBUG("Communication from '%s' to '%s' failed to start because of a link failure", sender->cname(),
480                 receiver->cname());
481       comm->state = SIMIX_LINK_FAILURE;
482       comm->cleanupSurf();
483     }
484
485     /* If any of the process is suspend, create the synchro but stop its execution,
486        it will be restarted when the sender process resume */
487     if (SIMIX_process_is_suspended(comm->src_proc) || SIMIX_process_is_suspended(comm->dst_proc)) {
488       if (SIMIX_process_is_suspended(comm->src_proc))
489         XBT_DEBUG("The communication is suspended on startup because src (%s@%s) was suspended since it initiated the "
490                   "communication",
491                   comm->src_proc->cname(), comm->src_proc->host->cname());
492       else
493         XBT_DEBUG("The communication is suspended on startup because dst (%s@%s) was suspended since it initiated the "
494                   "communication",
495                   comm->dst_proc->cname(), comm->dst_proc->host->cname());
496
497       comm->surf_comm->suspend();
498     }
499   }
500 }
501
502 /**
503  * \brief Answers the SIMIX simcalls associated to a communication synchro.
504  * \param synchro a finished communication synchro
505  */
506 void SIMIX_comm_finish(smx_activity_t synchro)
507 {
508   simgrid::kernel::activity::Comm *comm = static_cast<simgrid::kernel::activity::Comm*>(synchro);
509   unsigned int destroy_count = 0;
510
511   while (!synchro->simcalls.empty()) {
512     smx_simcall_t simcall = synchro->simcalls.front();
513     synchro->simcalls.pop_front();
514
515     /* If a waitany simcall is waiting for this synchro to finish, then remove
516        it from the other synchros in the waitany list. Afterwards, get the
517        position of the actual synchro in the waitany dynar and
518        return it as the result of the simcall */
519
520     if (simcall->call == SIMCALL_NONE) //FIXME: maybe a better way to handle this case
521       continue; // if process handling comm is killed
522     if (simcall->call == SIMCALL_COMM_WAITANY) {
523       SIMIX_waitany_remove_simcall_from_actions(simcall);
524       if (simcall->timer) {
525         SIMIX_timer_remove(simcall->timer);
526         simcall->timer = nullptr;
527       }
528       if (!MC_is_active() && !MC_record_replay_is_active())
529         simcall_comm_waitany__set__result(simcall, xbt_dynar_search(simcall_comm_waitany__get__comms(simcall), &synchro));
530     }
531
532     /* If the synchro is still in a rendez-vous point then remove from it */
533     if (comm->mbox)
534       comm->mbox->remove(synchro);
535
536     XBT_DEBUG("SIMIX_comm_finish: synchro state = %d", (int)synchro->state);
537
538     /* Check out for errors */
539
540     if (simcall->issuer->host->isOff()) {
541       simcall->issuer->context->iwannadie = 1;
542       SMX_EXCEPTION(simcall->issuer, host_error, 0, "Host failed");
543     } else {
544       switch (synchro->state) {
545
546       case SIMIX_DONE:
547         XBT_DEBUG("Communication %p complete!", synchro);
548         SIMIX_comm_copy_data(synchro);
549         break;
550
551       case SIMIX_SRC_TIMEOUT:
552         SMX_EXCEPTION(simcall->issuer, timeout_error, 0, "Communication timeouted because of sender");
553         break;
554
555       case SIMIX_DST_TIMEOUT:
556         SMX_EXCEPTION(simcall->issuer, timeout_error, 0, "Communication timeouted because of receiver");
557         break;
558
559       case SIMIX_SRC_HOST_FAILURE:
560         if (simcall->issuer == comm->src_proc)
561           simcall->issuer->context->iwannadie = 1;
562   //          SMX_EXCEPTION(simcall->issuer, host_error, 0, "Host failed");
563         else
564           SMX_EXCEPTION(simcall->issuer, network_error, 0, "Remote peer failed");
565         break;
566
567       case SIMIX_DST_HOST_FAILURE:
568         if (simcall->issuer == comm->dst_proc)
569           simcall->issuer->context->iwannadie = 1;
570   //          SMX_EXCEPTION(simcall->issuer, host_error, 0, "Host failed");
571         else
572           SMX_EXCEPTION(simcall->issuer, network_error, 0, "Remote peer failed");
573         break;
574
575       case SIMIX_LINK_FAILURE:
576
577         XBT_DEBUG(
578             "Link failure in synchro %p between '%s' and '%s': posting an exception to the issuer: %s (%p) detached:%d",
579             synchro, comm->src_proc ? comm->src_proc->host->cname() : nullptr,
580             comm->dst_proc ? comm->dst_proc->host->cname() : nullptr, simcall->issuer->cname(), simcall->issuer,
581             comm->detached);
582         if (comm->src_proc == simcall->issuer) {
583           XBT_DEBUG("I'm source");
584         } else if (comm->dst_proc == simcall->issuer) {
585           XBT_DEBUG("I'm dest");
586         } else {
587           XBT_DEBUG("I'm neither source nor dest");
588         }
589         SMX_EXCEPTION(simcall->issuer, network_error, 0, "Link failure");
590         break;
591
592       case SIMIX_CANCELED:
593         if (simcall->issuer == comm->dst_proc)
594           SMX_EXCEPTION(simcall->issuer, cancel_error, 0, "Communication canceled by the sender");
595         else
596           SMX_EXCEPTION(simcall->issuer, cancel_error, 0, "Communication canceled by the receiver");
597         break;
598
599       default:
600         xbt_die("Unexpected synchro state in SIMIX_comm_finish: %d", (int)synchro->state);
601       }
602     }
603
604     /* if there is an exception during a waitany or a testany, indicate the position of the failed communication */
605     if (simcall->issuer->exception) {
606       // In order to modify the exception we have to rethrow it:
607       try {
608         std::rethrow_exception(simcall->issuer->exception);
609       }
610       catch(xbt_ex& e) {
611         if (simcall->call == SIMCALL_COMM_WAITANY) {
612           e.value = xbt_dynar_search(simcall_comm_waitany__get__comms(simcall), &synchro);
613         }
614         else if (simcall->call == SIMCALL_COMM_TESTANY) {
615           e.value = -1;
616           auto comms = simcall_comm_testany__get__comms(simcall);
617           auto count = simcall_comm_testany__get__count(simcall);
618           auto element = std::find(comms, comms + count, synchro);
619           if (element == comms + count)
620             e.value = -1;
621           else
622             e.value = element - comms;
623         }
624         simcall->issuer->exception = std::make_exception_ptr(e);
625       }
626       catch(...) {
627         // Nothing to do
628       }
629     }
630
631     if (simcall->issuer->host->isOff()) {
632       simcall->issuer->context->iwannadie = 1;
633     }
634
635     simcall->issuer->waiting_synchro = nullptr;
636     simcall->issuer->comms.remove(synchro);
637     if(comm->detached){
638       if(simcall->issuer == comm->src_proc){
639         if(comm->dst_proc)
640           comm->dst_proc->comms.remove(synchro);
641       }
642       else if(simcall->issuer == comm->dst_proc){
643         if(comm->src_proc)
644           comm->src_proc->comms.remove(synchro);
645       }
646       else{
647         comm->dst_proc->comms.remove(synchro);
648         comm->src_proc->comms.remove(synchro);
649       }
650       //in case of a detached comm we have an extra ref to remove, as the sender won't do it
651       destroy_count++;
652     }
653
654     SIMIX_simcall_answer(simcall);
655     destroy_count++;
656   }
657
658   while (destroy_count-- > 0)
659     static_cast<simgrid::kernel::activity::Comm*>(synchro)->unref();
660 }
661
662 /******************************************************************************/
663 /*                    SIMIX_comm_copy_data callbacks                       */
664 /******************************************************************************/
665 static void (*SIMIX_comm_copy_data_callback) (smx_activity_t, void*, size_t) = &SIMIX_comm_copy_pointer_callback;
666
667 void SIMIX_comm_set_copy_data_callback(void (*callback) (smx_activity_t, void*, size_t))
668 {
669   SIMIX_comm_copy_data_callback = callback;
670 }
671
672 void SIMIX_comm_copy_pointer_callback(smx_activity_t synchro, void* buff, size_t buff_size)
673 {
674   simgrid::kernel::activity::Comm *comm = static_cast<simgrid::kernel::activity::Comm*>(synchro);
675
676   xbt_assert((buff_size == sizeof(void *)), "Cannot copy %zu bytes: must be sizeof(void*)", buff_size);
677   *(void **) (comm->dst_buff) = buff;
678 }
679
680 void SIMIX_comm_copy_buffer_callback(smx_activity_t synchro, void* buff, size_t buff_size)
681 {
682   simgrid::kernel::activity::Comm *comm = static_cast<simgrid::kernel::activity::Comm*>(synchro);
683
684   XBT_DEBUG("Copy the data over");
685   memcpy(comm->dst_buff, buff, buff_size);
686   if (comm->detached) { // if this is a detached send, the source buffer was duplicated by SMPI sender to make the original buffer available to the application ASAP
687     xbt_free(buff);
688     comm->src_buff = nullptr;
689   }
690 }
691
692
693 /**
694  *  @brief Copy the communication data from the sender's buffer to the receiver's one
695  *  @param synchro The communication
696  */
697 void SIMIX_comm_copy_data(smx_activity_t synchro)
698 {
699   simgrid::kernel::activity::Comm *comm = static_cast<simgrid::kernel::activity::Comm*>(synchro);
700
701   size_t buff_size = comm->src_buff_size;
702   /* If there is no data to copy then return */
703   if (!comm->src_buff || !comm->dst_buff || comm->copied)
704     return;
705
706   XBT_DEBUG("Copying comm %p data from %s (%p) -> %s (%p) (%zu bytes)", comm,
707             comm->src_proc ? comm->src_proc->host->cname() : "a finished process", comm->src_buff,
708             comm->dst_proc ? comm->dst_proc->host->cname() : "a finished process", comm->dst_buff, buff_size);
709
710   /* Copy at most dst_buff_size bytes of the message to receiver's buffer */
711   if (comm->dst_buff_size)
712     buff_size = MIN(buff_size, *(comm->dst_buff_size));
713
714   /* Update the receiver's buffer size to the copied amount */
715   if (comm->dst_buff_size)
716     *comm->dst_buff_size = buff_size;
717
718   if (buff_size > 0){
719       if(comm->copy_data_fun)
720         comm->copy_data_fun (comm, comm->src_buff, buff_size);
721       else
722         SIMIX_comm_copy_data_callback (comm, comm->src_buff, buff_size);
723   }
724
725
726   /* Set the copied flag so we copy data only once */
727   /* (this function might be called from both communication ends) */
728   comm->copied = 1;
729 }