Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
Merge pull request #107 from adfaure/master
[simgrid.git] / src / simix / smx_network.cpp
1 /* Copyright (c) 2009-2016. The SimGrid Team.  All rights reserved.         */
2
3 /* This program is free software; you can redistribute it and/or modify it
4  * under the terms of the license (GNU LGPL) which comes with this package. */
5
6 #include <algorithm>
7
8 #include <boost/range/algorithm.hpp>
9
10 #include <xbt/ex.hpp>
11
12 #include <simgrid/s4u/host.hpp>
13
14 #include "src/surf/surf_interface.hpp"
15 #include "src/simix/smx_private.h"
16 #include "xbt/log.h"
17 #include "mc/mc.h"
18 #include "src/mc/mc_replay.h"
19 #include "xbt/dict.h"
20 #include "simgrid/s4u/mailbox.hpp"
21
22 #include "src/kernel/activity/SynchroComm.hpp"
23
24 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(simix_network, simix, "SIMIX network-related synchronization");
25
26 static void SIMIX_mbox_free(void *data);
27 static xbt_dict_t mailboxes = xbt_dict_new_homogeneous(SIMIX_mbox_free);
28
29 static void SIMIX_waitany_remove_simcall_from_actions(smx_simcall_t simcall);
30 static void SIMIX_comm_copy_data(smx_activity_t comm);
31 static inline void SIMIX_mbox_push(smx_mailbox_t mbox, smx_activity_t comm);
32 static smx_activity_t _find_matching_comm(std::deque<smx_activity_t> *deque, e_smx_comm_type_t type,
33     int (*match_fun)(void *, void *,smx_activity_t), void *user_data, smx_activity_t my_synchro, bool remove_matching);
34 static void SIMIX_comm_start(smx_activity_t synchro);
35
36 void SIMIX_mailbox_exit()
37 {
38   xbt_dict_free(&mailboxes);
39 }
40
41 /******************************************************************************/
42 /*                           Rendez-Vous Points                               */
43 /******************************************************************************/
44
45 smx_mailbox_t SIMIX_mbox_create(const char *name)
46 {
47   xbt_assert(name, "Mailboxes must have a name");
48   /* two processes may have pushed the same mbox_create simcall at the same time */
49   smx_mailbox_t mbox = static_cast<smx_mailbox_t>(xbt_dict_get_or_null(mailboxes, name));
50   if (!mbox) {
51     mbox = new simgrid::simix::Mailbox(name);
52     XBT_DEBUG("Creating a mailbox at %p with name %s", mbox, name);
53     xbt_dict_set(mailboxes, mbox->name, mbox, nullptr);
54   }
55   return mbox;
56 }
57
58 void SIMIX_mbox_free(void *data)
59 {
60   XBT_DEBUG("mbox free %p", data);
61   smx_mailbox_t mbox = static_cast<smx_mailbox_t>(data);
62   delete mbox;
63 }
64
65 smx_mailbox_t SIMIX_mbox_get_by_name(const char *name)
66 {
67   return static_cast<smx_mailbox_t>(xbt_dict_get_or_null(mailboxes, name));
68 }
69
70 /**
71  *  \brief set the receiver of the rendez vous point to allow eager sends
72  *  \param mbox The rendez-vous point
73  *  \param process The receiving process
74  */
75 void SIMIX_mbox_set_receiver(smx_mailbox_t mbox, smx_process_t process)
76 {
77   mbox->permanent_receiver = process;
78 }
79
80 /**
81  *  \brief Pushes a communication synchro into a rendez-vous point
82  *  \param mbox The mailbox
83  *  \param synchro The communication synchro
84  */
85 static inline void SIMIX_mbox_push(smx_mailbox_t mbox, smx_activity_t synchro)
86 {
87   simgrid::kernel::activity::Comm *comm = static_cast<simgrid::kernel::activity::Comm*>(synchro);
88   mbox->comm_queue.push_back(comm);
89   comm->mbox = mbox;
90 }
91
92 /**
93  *  \brief Removes a communication synchro from a rendez-vous point
94  *  \param mbox The rendez-vous point
95  *  \param synchro The communication synchro
96  */
97 void SIMIX_mbox_remove(smx_mailbox_t mbox, smx_activity_t synchro)
98 {
99   simgrid::kernel::activity::Comm *comm = static_cast<simgrid::kernel::activity::Comm*>(synchro);
100
101   comm->mbox = nullptr;
102   for (auto it = mbox->comm_queue.begin(); it != mbox->comm_queue.end(); it++)
103     if (*it == comm) {
104       mbox->comm_queue. erase(it);
105       return;
106     }
107   xbt_die("Cannot remove the comm %p that is not part of the mailbox %s",comm, mbox->name);
108 }
109
110 /**
111  *  \brief Checks if there is a communication synchro queued in a deque matching our needs
112  *  \param type The type of communication we are looking for (comm_send, comm_recv)
113  *  \return The communication synchro if found, nullptr otherwise
114  */
115 static smx_activity_t _find_matching_comm(std::deque<smx_activity_t> *deque, e_smx_comm_type_t type,
116     int (*match_fun)(void *, void *,smx_activity_t), void *this_user_data, smx_activity_t my_synchro, bool remove_matching)
117 {
118   void* other_user_data = nullptr;
119
120   for(auto it = deque->begin(); it != deque->end(); it++){
121     smx_activity_t synchro = *it;
122     simgrid::kernel::activity::Comm *comm = static_cast<simgrid::kernel::activity::Comm*>(synchro);
123
124     if (comm->type == SIMIX_COMM_SEND) {
125       other_user_data = comm->src_data;
126     } else if (comm->type == SIMIX_COMM_RECEIVE) {
127       other_user_data = comm->dst_data;
128     }
129     if (comm->type == type &&
130         (!      match_fun ||       match_fun(this_user_data,  other_user_data, synchro)) &&
131         (!comm->match_fun || comm->match_fun(other_user_data, this_user_data,  my_synchro))) {
132       XBT_DEBUG("Found a matching communication synchro %p", comm);
133       if (remove_matching)
134         deque->erase(it);
135       comm->ref();
136 #if HAVE_MC
137       comm->mbox_cpy = comm->mbox;
138 #endif
139       comm->mbox = nullptr;
140       return comm;
141     }
142     XBT_DEBUG("Sorry, communication synchro %p does not match our needs:"
143               " its type is %d but we are looking for a comm of type %d (or maybe the filtering didn't match)",
144               comm, (int)comm->type, (int)type);
145   }
146   XBT_DEBUG("No matching communication synchro found");
147   return nullptr;
148 }
149
150 /******************************************************************************/
151 /*                          Communication synchros                            */
152 /******************************************************************************/
153 XBT_PRIVATE void simcall_HANDLER_comm_send(smx_simcall_t simcall, smx_process_t src, smx_mailbox_t mbox,
154                                   double task_size, double rate,
155                                   void *src_buff, size_t src_buff_size,
156                                   int (*match_fun)(void *, void *,smx_activity_t),
157                                   void (*copy_data_fun)(smx_activity_t, void*, size_t),
158           void *data, double timeout){
159   smx_activity_t comm = simcall_HANDLER_comm_isend(simcall, src, mbox, task_size, rate,
160                            src_buff, src_buff_size, match_fun, nullptr, copy_data_fun,
161                data, 0);
162   SIMCALL_SET_MC_VALUE(simcall, 0);
163   simcall_HANDLER_comm_wait(simcall, comm, timeout);
164 }
165 XBT_PRIVATE smx_activity_t simcall_HANDLER_comm_isend(smx_simcall_t simcall, smx_process_t src_proc, smx_mailbox_t mbox,
166                                   double task_size, double rate,
167                                   void *src_buff, size_t src_buff_size,
168                                   int (*match_fun)(void *, void *,smx_activity_t),
169                                   void (*clean_fun)(void *), // used to free the synchro in case of problem after a detached send
170                                   void (*copy_data_fun)(smx_activity_t, void*, size_t),// used to copy data if not default one
171                           void *data, int detached)
172 {
173   XBT_DEBUG("send from %p", mbox);
174
175   /* Prepare a synchro describing us, so that it gets passed to the user-provided filter of other side */
176   simgrid::kernel::activity::Comm* this_synchro = new simgrid::kernel::activity::Comm(SIMIX_COMM_SEND);
177
178   /* Look for communication synchro matching our needs. We also provide a description of
179    * ourself so that the other side also gets a chance of choosing if it wants to match with us.
180    *
181    * If it is not found then push our communication into the rendez-vous point */
182   smx_activity_t other_synchro =
183       _find_matching_comm(&mbox->comm_queue, SIMIX_COMM_RECEIVE, match_fun, data, this_synchro, /*remove_matching*/true);
184   simgrid::kernel::activity::Comm *other_comm = static_cast<simgrid::kernel::activity::Comm*>(other_synchro);
185
186
187   if (!other_synchro) {
188     other_synchro = this_synchro;
189     other_comm = static_cast<simgrid::kernel::activity::Comm*>(other_synchro);
190
191     if (mbox->permanent_receiver!=nullptr){
192       //this mailbox is for small messages, which have to be sent right now
193       other_synchro->state = SIMIX_READY;
194       other_comm->dst_proc=mbox->permanent_receiver.get();
195       other_comm->ref();
196       mbox->done_comm_queue.push_back(other_synchro);
197       XBT_DEBUG("pushing a message into the permanent receive fifo %p, comm %p", mbox, &(other_comm));
198
199     }else{
200       SIMIX_mbox_push(mbox, this_synchro);
201     }
202   } else {
203     XBT_DEBUG("Receive already pushed");
204     this_synchro->unref();
205
206     other_comm->state = SIMIX_READY;
207     other_comm->type = SIMIX_COMM_READY;
208
209   }
210   xbt_fifo_push(src_proc->comms, other_synchro);
211
212
213   if (detached) {
214     other_comm->detached = true;
215     other_comm->clean_fun = clean_fun;
216   } else {
217     other_comm->clean_fun = nullptr;
218   }
219
220   /* Setup the communication synchro */
221   other_comm->src_proc = src_proc;
222   other_comm->task_size = task_size;
223   other_comm->rate = rate;
224   other_comm->src_buff = src_buff;
225   other_comm->src_buff_size = src_buff_size;
226   other_comm->src_data = data;
227
228   other_comm->match_fun = match_fun;
229   other_comm->copy_data_fun = copy_data_fun;
230
231
232   if (MC_is_active() || MC_record_replay_is_active()) {
233     other_comm->state = SIMIX_RUNNING;
234     return (detached ? nullptr : other_comm);
235   }
236
237   SIMIX_comm_start(other_comm);
238   return (detached ? nullptr : other_comm);
239 }
240
241 XBT_PRIVATE void simcall_HANDLER_comm_recv(smx_simcall_t simcall, smx_process_t receiver, smx_mailbox_t mbox,
242                          void *dst_buff, size_t *dst_buff_size,
243                          int (*match_fun)(void *, void *, smx_activity_t),
244                          void (*copy_data_fun)(smx_activity_t, void*, size_t),
245                          void *data, double timeout, double rate)
246 {
247   smx_activity_t comm = SIMIX_comm_irecv(receiver, mbox, dst_buff, dst_buff_size, match_fun, copy_data_fun, data, rate);
248   SIMCALL_SET_MC_VALUE(simcall, 0);
249   simcall_HANDLER_comm_wait(simcall, comm, timeout);
250 }
251
252 XBT_PRIVATE smx_activity_t simcall_HANDLER_comm_irecv(smx_simcall_t simcall, smx_process_t receiver, smx_mailbox_t mbox,
253     void *dst_buff, size_t *dst_buff_size,
254     int (*match_fun)(void *, void *, smx_activity_t),
255     void (*copy_data_fun)(smx_activity_t, void*, size_t),
256     void *data, double rate)
257 {
258   return SIMIX_comm_irecv(receiver, mbox, dst_buff, dst_buff_size, match_fun, copy_data_fun, data, rate);
259 }
260
261 smx_activity_t SIMIX_comm_irecv(smx_process_t dst_proc, smx_mailbox_t mbox, void *dst_buff, size_t *dst_buff_size,
262     int (*match_fun)(void *, void *, smx_activity_t),
263     void (*copy_data_fun)(smx_activity_t, void*, size_t), // used to copy data if not default one
264     void *data, double rate)
265 {
266   XBT_DEBUG("recv from %p %p", mbox, &mbox->comm_queue);
267   simgrid::kernel::activity::Comm* this_synchro = new simgrid::kernel::activity::Comm(SIMIX_COMM_RECEIVE);
268
269   smx_activity_t other_synchro;
270   //communication already done, get it inside the fifo of completed comms
271   if (mbox->permanent_receiver != nullptr && ! mbox->done_comm_queue.empty()) {
272
273     XBT_DEBUG("We have a comm that has probably already been received, trying to match it, to skip the communication");
274     //find a match in the already received fifo
275     other_synchro = _find_matching_comm(&mbox->done_comm_queue, SIMIX_COMM_SEND, match_fun, data, this_synchro,/*remove_matching*/true);
276     //if not found, assume the receiver came first, register it to the mailbox in the classical way
277     if (!other_synchro)  {
278       XBT_DEBUG("We have messages in the permanent receive list, but not the one we are looking for, pushing request into fifo");
279       other_synchro = this_synchro;
280       SIMIX_mbox_push(mbox, this_synchro);
281     } else {
282       simgrid::kernel::activity::Comm *other_comm = static_cast<simgrid::kernel::activity::Comm*>(other_synchro);
283
284       if(other_comm->surf_comm && other_comm->remains() < 1e-12) {
285         XBT_DEBUG("comm %p has been already sent, and is finished, destroy it",other_comm);
286         other_comm->state = SIMIX_DONE;
287         other_comm->type = SIMIX_COMM_DONE;
288         other_comm->mbox = nullptr;
289       }
290       other_comm->unref();
291       static_cast<simgrid::kernel::activity::Comm*>(this_synchro)->unref();
292     }
293   } else {
294     /* Prepare a synchro describing us, so that it gets passed to the user-provided filter of other side */
295
296     /* Look for communication synchro matching our needs. We also provide a description of
297      * ourself so that the other side also gets a chance of choosing if it wants to match with us.
298      *
299      * If it is not found then push our communication into the rendez-vous point */
300     other_synchro = _find_matching_comm(&mbox->comm_queue, SIMIX_COMM_SEND, match_fun, data, this_synchro,/*remove_matching*/true);
301
302     if (!other_synchro) {
303       XBT_DEBUG("Receive pushed first %zu", mbox->comm_queue.size());
304       other_synchro = this_synchro;
305       SIMIX_mbox_push(mbox, this_synchro);
306     } else {
307       this_synchro->unref();
308       simgrid::kernel::activity::Comm *other_comm = static_cast<simgrid::kernel::activity::Comm*>(other_synchro);
309
310       other_comm->state = SIMIX_READY;
311       other_comm->type = SIMIX_COMM_READY;
312     }
313     xbt_fifo_push(dst_proc->comms, other_synchro);
314   }
315
316   /* Setup communication synchro */
317   simgrid::kernel::activity::Comm *other_comm = static_cast<simgrid::kernel::activity::Comm*>(other_synchro);
318   other_comm->dst_proc = dst_proc;
319   other_comm->dst_buff = dst_buff;
320   other_comm->dst_buff_size = dst_buff_size;
321   other_comm->dst_data = data;
322
323   if (rate > -1.0 && (other_comm->rate < 0.0 || rate < other_comm->rate))
324     other_comm->rate = rate;
325
326   other_comm->match_fun = match_fun;
327   other_comm->copy_data_fun = copy_data_fun;
328
329   if (MC_is_active() || MC_record_replay_is_active()) {
330     other_synchro->state = SIMIX_RUNNING;
331     return other_synchro;
332   }
333
334   SIMIX_comm_start(other_synchro);
335   return other_synchro;
336 }
337
338 smx_activity_t simcall_HANDLER_comm_iprobe(smx_simcall_t simcall, smx_mailbox_t mbox,
339                                    int type, int src, int tag,
340                                    int (*match_fun)(void *, void *, smx_activity_t),
341                                    void *data){
342   return SIMIX_comm_iprobe(simcall->issuer, mbox, type, src, tag, match_fun, data);
343 }
344
345 smx_activity_t SIMIX_comm_iprobe(smx_process_t dst_proc, smx_mailbox_t mbox, int type, int src,
346                               int tag, int (*match_fun)(void *, void *, smx_activity_t), void *data)
347 {
348   XBT_DEBUG("iprobe from %p %p", mbox, &mbox->comm_queue);
349   simgrid::kernel::activity::Comm* this_comm;
350   int smx_type;
351   if(type == 1){
352     this_comm = new simgrid::kernel::activity::Comm(SIMIX_COMM_SEND);
353     smx_type = SIMIX_COMM_RECEIVE;
354   } else{
355     this_comm = new simgrid::kernel::activity::Comm(SIMIX_COMM_RECEIVE);
356     smx_type = SIMIX_COMM_SEND;
357   } 
358   smx_activity_t other_synchro=nullptr;
359   if (mbox->permanent_receiver != nullptr && !mbox->done_comm_queue.empty()) {
360     XBT_DEBUG("first check in the permanent recv mailbox, to see if we already got something");
361     other_synchro = _find_matching_comm(&mbox->done_comm_queue,
362       (e_smx_comm_type_t) smx_type, match_fun, data, this_comm,/*remove_matching*/false);
363   }
364   if (!other_synchro){
365     XBT_DEBUG("check if we have more luck in the normal mailbox");
366     other_synchro = _find_matching_comm(&mbox->comm_queue,
367       (e_smx_comm_type_t) smx_type, match_fun, data, this_comm,/*remove_matching*/false);
368   }
369
370   if(other_synchro)
371     other_synchro->unref();
372
373   this_comm->unref();
374   return other_synchro;
375 }
376
377 void simcall_HANDLER_comm_wait(smx_simcall_t simcall, smx_activity_t synchro, double timeout)
378 {
379   /* Associate this simcall to the wait synchro */
380   XBT_DEBUG("simcall_HANDLER_comm_wait, %p", synchro);
381
382   synchro->simcalls.push_back(simcall);
383   simcall->issuer->waiting_synchro = synchro;
384
385   if (MC_is_active() || MC_record_replay_is_active()) {
386     int idx = SIMCALL_GET_MC_VALUE(simcall);
387     if (idx == 0) {
388       synchro->state = SIMIX_DONE;
389     } else {
390       /* If we reached this point, the wait simcall must have a timeout */
391       /* Otherwise it shouldn't be enabled and executed by the MC */
392       if (timeout < 0.0)
393         THROW_IMPOSSIBLE;
394
395       simgrid::kernel::activity::Comm *comm = static_cast<simgrid::kernel::activity::Comm*>(synchro);
396       if (comm->src_proc == simcall->issuer)
397         comm->state = SIMIX_SRC_TIMEOUT;
398       else
399         comm->state = SIMIX_DST_TIMEOUT;
400     }
401
402     SIMIX_comm_finish(synchro);
403     return;
404   }
405
406   /* If the synchro has already finish perform the error handling, */
407   /* otherwise set up a waiting timeout on the right side          */
408   if (synchro->state != SIMIX_WAITING && synchro->state != SIMIX_RUNNING) {
409     SIMIX_comm_finish(synchro);
410   } else { /* if (timeout >= 0) { we need a surf sleep action even when there is no timeout, otherwise surf won't tell us when the host fails */
411     surf_action_t sleep = surf_host_sleep(simcall->issuer->host, timeout);
412     sleep->setData(synchro);
413
414     simgrid::kernel::activity::Comm *comm = static_cast<simgrid::kernel::activity::Comm*>(synchro);
415     if (simcall->issuer == comm->src_proc)
416       comm->src_timeout = sleep;
417     else
418       comm->dst_timeout = sleep;
419   }
420 }
421
422 void simcall_HANDLER_comm_test(smx_simcall_t simcall, smx_activity_t synchro)
423 {
424   simgrid::kernel::activity::Comm *comm = static_cast<simgrid::kernel::activity::Comm*>(synchro);
425
426   if (MC_is_active() || MC_record_replay_is_active()){
427     simcall_comm_test__set__result(simcall, comm->src_proc && comm->dst_proc);
428     if (simcall_comm_test__get__result(simcall)){
429       synchro->state = SIMIX_DONE;
430       synchro->simcalls.push_back(simcall);
431       SIMIX_comm_finish(synchro);
432     } else {
433       SIMIX_simcall_answer(simcall);
434     }
435     return;
436   }
437
438   simcall_comm_test__set__result(simcall, (synchro->state != SIMIX_WAITING && synchro->state != SIMIX_RUNNING));
439   if (simcall_comm_test__get__result(simcall)) {
440     synchro->simcalls.push_back(simcall);
441     SIMIX_comm_finish(synchro);
442   } else {
443     SIMIX_simcall_answer(simcall);
444   }
445 }
446
447 void simcall_HANDLER_comm_testany(
448   smx_simcall_t simcall, simgrid::kernel::activity::ActivityImpl* comms[], size_t count)
449 {
450   // The default result is -1 -- this means, "nothing is ready".
451   // It can be changed below, but only if something matches.
452   simcall_comm_testany__set__result(simcall, -1);
453
454   if (MC_is_active() || MC_record_replay_is_active()){
455     int idx = SIMCALL_GET_MC_VALUE(simcall);
456     if(idx == -1){
457       SIMIX_simcall_answer(simcall);
458     }else{
459       simgrid::kernel::activity::ActivityImpl* synchro = comms[idx];
460       simcall_comm_testany__set__result(simcall, idx);
461       synchro->simcalls.push_back(simcall);
462       synchro->state = SIMIX_DONE;
463       SIMIX_comm_finish(synchro);
464     }
465     return;
466   }
467
468   for (std::size_t i = 0; i != count; ++i) {
469     simgrid::kernel::activity::ActivityImpl* synchro = comms[i];
470     if (synchro->state != SIMIX_WAITING && synchro->state != SIMIX_RUNNING) {
471       simcall_comm_testany__set__result(simcall, i);
472       synchro->simcalls.push_back(simcall);
473       SIMIX_comm_finish(synchro);
474       return;
475     }
476   }
477   SIMIX_simcall_answer(simcall);
478 }
479
480 void simcall_HANDLER_comm_waitany(smx_simcall_t simcall, xbt_dynar_t synchros, double timeout)
481 {
482   smx_activity_t synchro;
483   unsigned int cursor = 0;
484
485   if (MC_is_active() || MC_record_replay_is_active()){
486     if (timeout > 0.0)
487       xbt_die("Timeout not implemented for waitany in the model-checker"); 
488     int idx = SIMCALL_GET_MC_VALUE(simcall);
489     synchro = xbt_dynar_get_as(synchros, idx, smx_activity_t);
490     synchro->simcalls.push_back(simcall);
491     simcall_comm_waitany__set__result(simcall, idx);
492     synchro->state = SIMIX_DONE;
493     SIMIX_comm_finish(synchro);
494     return;
495   }
496   
497   if (timeout < 0.0){
498     simcall->timer = NULL;
499   } else {
500     simcall->timer = SIMIX_timer_set(timeout, [simcall]() {
501       SIMIX_waitany_remove_simcall_from_actions(simcall);
502       simcall_comm_waitany__set__result(simcall, -1);
503       SIMIX_simcall_answer(simcall);
504     });
505   }
506   
507   xbt_dynar_foreach(synchros, cursor, synchro){
508     /* associate this simcall to the the synchro */
509     synchro->simcalls.push_back(simcall);
510
511     /* see if the synchro is already finished */
512     if (synchro->state != SIMIX_WAITING && synchro->state != SIMIX_RUNNING){
513       SIMIX_comm_finish(synchro);
514       break;
515     }
516   }
517 }
518
519 void SIMIX_waitany_remove_simcall_from_actions(smx_simcall_t simcall)
520 {
521   smx_activity_t synchro;
522   unsigned int cursor = 0;
523   xbt_dynar_t synchros = simcall_comm_waitany__get__comms(simcall);
524
525   xbt_dynar_foreach(synchros, cursor, synchro) {
526     // Remove the first occurence of simcall:
527     auto i = boost::range::find(synchro->simcalls, simcall);
528     if (i !=  synchro->simcalls.end())
529       synchro->simcalls.erase(i);
530   }
531 }
532
533 /**
534  *  \brief Starts the simulation of a communication synchro.
535  *  \param synchro the communication synchro
536  */
537 static inline void SIMIX_comm_start(smx_activity_t synchro)
538 {
539   simgrid::kernel::activity::Comm *comm = static_cast<simgrid::kernel::activity::Comm*>(synchro);
540
541   /* If both the sender and the receiver are already there, start the communication */
542   if (synchro->state == SIMIX_READY) {
543
544     sg_host_t sender   = comm->src_proc->host;
545     sg_host_t receiver = comm->dst_proc->host;
546
547     XBT_DEBUG("Starting communication %p from '%s' to '%s'", synchro, sg_host_get_name(sender), sg_host_get_name(receiver));
548
549     comm->surf_comm = surf_network_model_communicate(surf_network_model, sender, receiver, comm->task_size, comm->rate);
550     comm->surf_comm->setData(synchro);
551     comm->state = SIMIX_RUNNING;
552
553     /* If a link is failed, detect it immediately */
554     if (comm->surf_comm->getState() == simgrid::surf::Action::State::failed) {
555       XBT_DEBUG("Communication from '%s' to '%s' failed to start because of a link failure",
556                 sg_host_get_name(sender), sg_host_get_name(receiver));
557       comm->state = SIMIX_LINK_FAILURE;
558       comm->cleanupSurf();
559     }
560
561     /* If any of the process is suspend, create the synchro but stop its execution,
562        it will be restarted when the sender process resume */
563     if (SIMIX_process_is_suspended(comm->src_proc) || SIMIX_process_is_suspended(comm->dst_proc)) {
564       if (SIMIX_process_is_suspended(comm->src_proc))
565         XBT_DEBUG("The communication is suspended on startup because src (%s@%s) was suspended since it initiated the communication",
566             comm->src_proc->name.c_str(), sg_host_get_name(comm->src_proc->host));
567       else
568         XBT_DEBUG("The communication is suspended on startup because dst (%s@%s) was suspended since it initiated the communication",
569             comm->dst_proc->name.c_str(), sg_host_get_name(comm->dst_proc->host));
570
571       comm->surf_comm->suspend();
572     }
573   }
574 }
575
576 /**
577  * \brief Answers the SIMIX simcalls associated to a communication synchro.
578  * \param synchro a finished communication synchro
579  */
580 void SIMIX_comm_finish(smx_activity_t synchro)
581 {
582   simgrid::kernel::activity::Comm *comm = static_cast<simgrid::kernel::activity::Comm*>(synchro);
583   unsigned int destroy_count = 0;
584
585   while (!synchro->simcalls.empty()) {
586     smx_simcall_t simcall = synchro->simcalls.front();
587     synchro->simcalls.pop_front();
588
589     /* If a waitany simcall is waiting for this synchro to finish, then remove
590        it from the other synchros in the waitany list. Afterwards, get the
591        position of the actual synchro in the waitany dynar and
592        return it as the result of the simcall */
593
594     if (simcall->call == SIMCALL_NONE) //FIXME: maybe a better way to handle this case
595       continue; // if process handling comm is killed
596     if (simcall->call == SIMCALL_COMM_WAITANY) {
597       SIMIX_waitany_remove_simcall_from_actions(simcall);
598       if (simcall->timer) {
599         SIMIX_timer_remove(simcall->timer);
600         simcall->timer = nullptr;
601       }
602       if (!MC_is_active() && !MC_record_replay_is_active())
603         simcall_comm_waitany__set__result(simcall, xbt_dynar_search(simcall_comm_waitany__get__comms(simcall), &synchro));
604     }
605
606     /* If the synchro is still in a rendez-vous point then remove from it */
607     if (comm->mbox)
608       SIMIX_mbox_remove(comm->mbox, synchro);
609
610     XBT_DEBUG("SIMIX_comm_finish: synchro state = %d", (int)synchro->state);
611
612     /* Check out for errors */
613
614     if (simcall->issuer->host->isOff()) {
615       simcall->issuer->context->iwannadie = 1;
616       SMX_EXCEPTION(simcall->issuer, host_error, 0, "Host failed");
617     } else {
618       switch (synchro->state) {
619
620       case SIMIX_DONE:
621         XBT_DEBUG("Communication %p complete!", synchro);
622         SIMIX_comm_copy_data(synchro);
623         break;
624
625       case SIMIX_SRC_TIMEOUT:
626         SMX_EXCEPTION(simcall->issuer, timeout_error, 0, "Communication timeouted because of sender");
627         break;
628
629       case SIMIX_DST_TIMEOUT:
630         SMX_EXCEPTION(simcall->issuer, timeout_error, 0, "Communication timeouted because of receiver");
631         break;
632
633       case SIMIX_SRC_HOST_FAILURE:
634         if (simcall->issuer == comm->src_proc)
635           simcall->issuer->context->iwannadie = 1;
636   //          SMX_EXCEPTION(simcall->issuer, host_error, 0, "Host failed");
637         else
638           SMX_EXCEPTION(simcall->issuer, network_error, 0, "Remote peer failed");
639         break;
640
641       case SIMIX_DST_HOST_FAILURE:
642         if (simcall->issuer == comm->dst_proc)
643           simcall->issuer->context->iwannadie = 1;
644   //          SMX_EXCEPTION(simcall->issuer, host_error, 0, "Host failed");
645         else
646           SMX_EXCEPTION(simcall->issuer, network_error, 0, "Remote peer failed");
647         break;
648
649       case SIMIX_LINK_FAILURE:
650
651         XBT_DEBUG("Link failure in synchro %p between '%s' and '%s': posting an exception to the issuer: %s (%p) detached:%d",
652                   synchro,
653                   comm->src_proc ? sg_host_get_name(comm->src_proc->host) : nullptr,
654                   comm->dst_proc ? sg_host_get_name(comm->dst_proc->host) : nullptr,
655                   simcall->issuer->name.c_str(), simcall->issuer, comm->detached);
656         if (comm->src_proc == simcall->issuer) {
657           XBT_DEBUG("I'm source");
658         } else if (comm->dst_proc == simcall->issuer) {
659           XBT_DEBUG("I'm dest");
660         } else {
661           XBT_DEBUG("I'm neither source nor dest");
662         }
663         SMX_EXCEPTION(simcall->issuer, network_error, 0, "Link failure");
664         break;
665
666       case SIMIX_CANCELED:
667         if (simcall->issuer == comm->dst_proc)
668           SMX_EXCEPTION(simcall->issuer, cancel_error, 0, "Communication canceled by the sender");
669         else
670           SMX_EXCEPTION(simcall->issuer, cancel_error, 0, "Communication canceled by the receiver");
671         break;
672
673       default:
674         xbt_die("Unexpected synchro state in SIMIX_comm_finish: %d", (int)synchro->state);
675       }
676     }
677
678     /* if there is an exception during a waitany or a testany, indicate the position of the failed communication */
679     if (simcall->issuer->exception) {
680       // In order to modify the exception we have to rethrow it:
681       try {
682         std::rethrow_exception(simcall->issuer->exception);
683       }
684       catch(xbt_ex& e) {
685         if (simcall->call == SIMCALL_COMM_WAITANY) {
686           e.value = xbt_dynar_search(simcall_comm_waitany__get__comms(simcall), &synchro);
687         }
688         else if (simcall->call == SIMCALL_COMM_TESTANY) {
689           e.value = -1;
690           auto comms = simcall_comm_testany__get__comms(simcall);
691           auto count = simcall_comm_testany__get__count(simcall);
692           auto element = std::find(comms, comms + count, synchro);
693           if (element == comms + count)
694             e.value = -1;
695           else
696             e.value = element - comms;
697         }
698         simcall->issuer->exception = std::make_exception_ptr(e);
699       }
700       catch(...) {
701         // Nothing to do
702       }
703     }
704
705     if (simcall->issuer->host->isOff()) {
706       simcall->issuer->context->iwannadie = 1;
707     }
708
709     simcall->issuer->waiting_synchro = nullptr;
710     xbt_fifo_remove(simcall->issuer->comms, synchro);
711     if(comm->detached){
712       if(simcall->issuer == comm->src_proc){
713         if(comm->dst_proc)
714           xbt_fifo_remove(comm->dst_proc->comms, synchro);
715       }
716       if(simcall->issuer == comm->dst_proc){
717         if(comm->src_proc)
718           xbt_fifo_remove(comm->src_proc->comms, synchro);
719         //in case of a detached comm we have an extra ref to remove, as the sender won't do it
720         destroy_count++;
721       }
722     }
723     SIMIX_simcall_answer(simcall);
724     destroy_count++;
725   }
726
727   while (destroy_count-- > 0)
728     static_cast<simgrid::kernel::activity::Comm*>(synchro)->unref();
729 }
730
731 /******************************************************************************/
732 /*                    SIMIX_comm_copy_data callbacks                       */
733 /******************************************************************************/
734 static void (*SIMIX_comm_copy_data_callback) (smx_activity_t, void*, size_t) = &SIMIX_comm_copy_pointer_callback;
735
736 void SIMIX_comm_set_copy_data_callback(void (*callback) (smx_activity_t, void*, size_t))
737 {
738   SIMIX_comm_copy_data_callback = callback;
739 }
740
741 void SIMIX_comm_copy_pointer_callback(smx_activity_t synchro, void* buff, size_t buff_size)
742 {
743   simgrid::kernel::activity::Comm *comm = static_cast<simgrid::kernel::activity::Comm*>(synchro);
744
745   xbt_assert((buff_size == sizeof(void *)), "Cannot copy %zu bytes: must be sizeof(void*)", buff_size);
746   *(void **) (comm->dst_buff) = buff;
747 }
748
749 void SIMIX_comm_copy_buffer_callback(smx_activity_t synchro, void* buff, size_t buff_size)
750 {
751   simgrid::kernel::activity::Comm *comm = static_cast<simgrid::kernel::activity::Comm*>(synchro);
752
753   XBT_DEBUG("Copy the data over");
754   memcpy(comm->dst_buff, buff, buff_size);
755   if (comm->detached) { // if this is a detached send, the source buffer was duplicated by SMPI sender to make the original buffer available to the application ASAP
756     xbt_free(buff);
757     comm->src_buff = nullptr;
758   }
759 }
760
761
762 /**
763  *  \brief Copy the communication data from the sender's buffer to the receiver's one
764  *  \param comm The communication
765  */
766 void SIMIX_comm_copy_data(smx_activity_t synchro)
767 {
768   simgrid::kernel::activity::Comm *comm = static_cast<simgrid::kernel::activity::Comm*>(synchro);
769
770   size_t buff_size = comm->src_buff_size;
771   /* If there is no data to copy then return */
772   if (!comm->src_buff || !comm->dst_buff || comm->copied)
773     return;
774
775   XBT_DEBUG("Copying comm %p data from %s (%p) -> %s (%p) (%zu bytes)",
776             comm,
777             comm->src_proc ? sg_host_get_name(comm->src_proc->host) : "a finished process",
778             comm->src_buff,
779             comm->dst_proc ? sg_host_get_name(comm->dst_proc->host) : "a finished process",
780             comm->dst_buff, buff_size);
781
782   /* Copy at most dst_buff_size bytes of the message to receiver's buffer */
783   if (comm->dst_buff_size)
784     buff_size = MIN(buff_size, *(comm->dst_buff_size));
785
786   /* Update the receiver's buffer size to the copied amount */
787   if (comm->dst_buff_size)
788     *comm->dst_buff_size = buff_size;
789
790   if (buff_size > 0){
791       if(comm->copy_data_fun)
792         comm->copy_data_fun (comm, comm->src_buff, buff_size);
793       else
794         SIMIX_comm_copy_data_callback (comm, comm->src_buff, buff_size);
795   }
796
797
798   /* Set the copied flag so we copy data only once */
799   /* (this function might be called from both communication ends) */
800   comm->copied = 1;
801 }