Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
cosmetics
[simgrid.git] / src / simix / smx_network.cpp
1 /* Copyright (c) 2009-2016. The SimGrid Team.  All rights reserved.         */
2
3 /* This program is free software; you can redistribute it and/or modify it
4  * under the terms of the license (GNU LGPL) which comes with this package. */
5
6 #include "src/surf/surf_interface.hpp"
7 #include "src/simix/smx_private.h"
8 #include "xbt/log.h"
9 #include "mc/mc.h"
10 #include "src/mc/mc_replay.h"
11 #include "xbt/dict.h"
12 #include "simgrid/s4u/mailbox.hpp"
13
14 #include "src/simix/SynchroComm.hpp"
15
16 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(simix_network, simix, "SIMIX network-related synchronization");
17
18 static void SIMIX_mbox_free(void *data);
19 static xbt_dict_t mailboxes = xbt_dict_new_homogeneous(SIMIX_mbox_free);
20
21 static void SIMIX_waitany_remove_simcall_from_actions(smx_simcall_t simcall);
22 static void SIMIX_comm_copy_data(smx_synchro_t comm);
23 static smx_synchro_t SIMIX_comm_new(e_smx_comm_type_t type);
24 static inline void SIMIX_mbox_push(smx_mailbox_t mbox, smx_synchro_t comm);
25 static smx_synchro_t _find_matching_comm(std::deque<smx_synchro_t> *deque, e_smx_comm_type_t type,
26     int (*match_fun)(void *, void *,smx_synchro_t), void *user_data, smx_synchro_t my_synchro, bool remove_matching);
27 static void SIMIX_comm_start(smx_synchro_t synchro);
28
29 void SIMIX_mailbox_exit(void)
30 {
31   xbt_dict_free(&mailboxes);
32 }
33
34 /******************************************************************************/
35 /*                           Rendez-Vous Points                               */
36 /******************************************************************************/
37
38 smx_mailbox_t SIMIX_mbox_create(const char *name)
39 {
40   xbt_assert(name, "Mailboxes must have a name");
41   /* two processes may have pushed the same mbox_create simcall at the same time */
42   smx_mailbox_t mbox = (smx_mailbox_t) xbt_dict_get_or_null(mailboxes, name);
43
44   if (!mbox) {
45     mbox = xbt_new0(s_smx_mailbox_t, 1);
46     mbox->name = xbt_strdup(name);
47     mbox->comm_queue = new std::deque<smx_synchro_t>();
48     mbox->done_comm_queue = nullptr; // Allocated on need only
49     mbox->permanent_receiver=NULL;
50
51     XBT_DEBUG("Creating a mailbox at %p with name %s", mbox, name);
52     xbt_dict_set(mailboxes, mbox->name, mbox, NULL);
53   }
54   return mbox;
55 }
56
57 void SIMIX_mbox_free(void *data)
58 {
59   XBT_DEBUG("mbox free %p", data);
60   smx_mailbox_t mbox = (smx_mailbox_t) data;
61   xbt_free(mbox->name);
62   delete mbox->comm_queue;
63   delete mbox->done_comm_queue;
64
65   xbt_free(mbox);
66 }
67
68 smx_mailbox_t SIMIX_mbox_get_by_name(const char *name)
69 {
70   return (smx_mailbox_t) xbt_dict_get_or_null(mailboxes, name);
71 }
72
73 smx_synchro_t SIMIX_mbox_get_head(smx_mailbox_t mbox)
74 {
75   return mbox->comm_queue->empty()? nullptr:mbox->comm_queue->front();
76 }
77
78 /**
79  *  \brief get the receiver (process associated to the mailbox)
80  *  \param mbox The rendez-vous point
81  *  \return process The receiving process (NULL if not set)
82  */
83 smx_process_t SIMIX_mbox_get_receiver(smx_mailbox_t mbox)
84 {
85   return mbox->permanent_receiver;
86 }
87
88 /**
89  *  \brief set the receiver of the rendez vous point to allow eager sends
90  *  \param mbox The rendez-vous point
91  *  \param process The receiving process
92  */
93 void SIMIX_mbox_set_receiver(smx_mailbox_t mbox, smx_process_t process)
94 {
95   mbox->permanent_receiver=process;
96   if (mbox->done_comm_queue == nullptr)
97     mbox->done_comm_queue = new std::deque<smx_synchro_t>();
98 }
99
100 /**
101  *  \brief Pushes a communication synchro into a rendez-vous point
102  *  \param mbox The mailbox
103  *  \param comm The communication synchro
104  */
105 static inline void SIMIX_mbox_push(smx_mailbox_t mbox, smx_synchro_t synchro)
106 {
107   simgrid::simix::Comm *comm = static_cast<simgrid::simix::Comm*>(synchro);
108
109   mbox->comm_queue->push_back(comm);
110   comm->mbox = mbox;
111 }
112
113 /**
114  *  \brief Removes a communication synchro from a rendez-vous point
115  *  \param mbox The rendez-vous point
116  *  \param comm The communication synchro
117  */
118 void SIMIX_mbox_remove(smx_mailbox_t mbox, smx_synchro_t synchro)
119 {
120   simgrid::simix::Comm *comm = static_cast<simgrid::simix::Comm*>(synchro);
121
122   comm->mbox = NULL;
123   for (auto it = mbox->comm_queue->begin(); it != mbox->comm_queue->end(); it++)
124     if (*it == comm) {
125       mbox->comm_queue->erase(it);
126       return;
127     }
128   xbt_die("Cannot remove this comm that is not part of the mailbox");
129 }
130
131 /**
132  *  \brief Checks if there is a communication synchro queued in a deque matching our needs
133  *  \param type The type of communication we are looking for (comm_send, comm_recv)
134  *  \return The communication synchro if found, NULL otherwise
135  */
136 static smx_synchro_t _find_matching_comm(std::deque<smx_synchro_t> *deque, e_smx_comm_type_t type,
137     int (*match_fun)(void *, void *,smx_synchro_t), void *this_user_data, smx_synchro_t my_synchro, bool remove_matching)
138 {
139   void* other_user_data = NULL;
140
141   for(auto it = deque->begin(); it != deque->end(); it++){
142     smx_synchro_t synchro = *it;
143     simgrid::simix::Comm *comm = static_cast<simgrid::simix::Comm*>(synchro);
144
145     if (comm->type == SIMIX_COMM_SEND) {
146       other_user_data = comm->src_data;
147     } else if (comm->type == SIMIX_COMM_RECEIVE) {
148       other_user_data = comm->dst_data;
149     }
150     if (comm->type == type &&
151         (!      match_fun ||       match_fun(this_user_data,  other_user_data, synchro)) &&
152         (!comm->match_fun || comm->match_fun(other_user_data, this_user_data,  my_synchro))) {
153       XBT_DEBUG("Found a matching communication synchro %p", comm);
154       if (remove_matching)
155         deque->erase(it);
156       comm->refcount++;
157 #if HAVE_MC
158       comm->mbox_cpy = comm->mbox;
159 #endif
160       comm->mbox = NULL;
161       return comm;
162     }
163     XBT_DEBUG("Sorry, communication synchro %p does not match our needs:"
164               " its type is %d but we are looking for a comm of type %d (or maybe the filtering didn't match)",
165               comm, (int)comm->type, (int)type);
166   }
167   XBT_DEBUG("No matching communication synchro found");
168   return NULL;
169 }
170
171 /******************************************************************************/
172 /*                          Communication synchros                            */
173 /******************************************************************************/
174
175 /**
176  *  \brief Creates a new communicate synchro
177  *  \param type The direction of communication (comm_send, comm_recv)
178  *  \return The new communicate synchro
179  */
180 smx_synchro_t SIMIX_comm_new(e_smx_comm_type_t type)
181 {
182   simgrid::simix::Comm *comm = new simgrid::simix::Comm();
183   comm->state = SIMIX_WAITING;
184   comm->type = type;
185   comm->refcount = 1;
186   comm->src_data=NULL;
187   comm->dst_data=NULL;
188
189   XBT_DEBUG("Create communicate synchro %p", comm);
190
191   return comm;
192 }
193
194 /**
195  *  \brief Destroy a communicate synchro
196  *  \param synchro The communicate synchro to be destroyed
197  */
198 void SIMIX_comm_destroy(smx_synchro_t synchro)
199 {
200   simgrid::simix::Comm *comm = static_cast<simgrid::simix::Comm*>(synchro);
201
202   XBT_DEBUG("Destroy synchro %p (refcount: %d), state: %d", comm, comm->refcount, (int)comm->state);
203
204   if (comm->refcount <= 0) {
205     xbt_backtrace_display_current();
206     xbt_die("The refcount of comm %p is already 0 before decreasing it. "
207             "That's a bug! If you didn't test and/or wait the same communication twice in your code, then the bug is SimGrid's...", synchro);
208   }
209   comm->refcount--;
210   if (comm->refcount > 0)
211       return;
212   XBT_DEBUG("Really free communication %p; refcount is now %d", comm, comm->refcount);
213
214   SIMIX_comm_destroy_internal_actions(synchro);
215
216   if (comm->detached && comm->state != SIMIX_DONE) {
217     /* the communication has failed and was detached:
218      * we have to free the buffer */
219     if (comm->clean_fun) {
220       comm->clean_fun(comm->src_buff);
221     }
222     comm->src_buff = NULL;
223   }
224
225   if(comm->mbox)
226     SIMIX_mbox_remove(comm->mbox, comm);
227
228   delete comm;
229 }
230
231 void SIMIX_comm_destroy_internal_actions(smx_synchro_t synchro)
232 {
233   simgrid::simix::Comm *comm = static_cast<simgrid::simix::Comm*>(synchro);
234   if (comm->surf_comm){
235     comm->surf_comm->unref();
236     comm->surf_comm = NULL;
237   }
238
239   if (comm->src_timeout){
240     comm->src_timeout->unref();
241     comm->src_timeout = NULL;
242   }
243
244   if (comm->dst_timeout){
245     comm->dst_timeout->unref();
246     comm->dst_timeout = NULL;
247   }
248 }
249
250 void simcall_HANDLER_comm_send(smx_simcall_t simcall, smx_process_t src, smx_mailbox_t mbox,
251                                   double task_size, double rate,
252                                   void *src_buff, size_t src_buff_size,
253                                   int (*match_fun)(void *, void *,smx_synchro_t),
254                                   void (*copy_data_fun)(smx_synchro_t, void*, size_t),
255           void *data, double timeout){
256   smx_synchro_t comm = simcall_HANDLER_comm_isend(simcall, src, mbox, task_size, rate,
257                            src_buff, src_buff_size, match_fun, NULL, copy_data_fun,
258                data, 0);
259   SIMCALL_SET_MC_VALUE(simcall, 0);
260   simcall_HANDLER_comm_wait(simcall, comm, timeout);
261 }
262 smx_synchro_t simcall_HANDLER_comm_isend(smx_simcall_t simcall, smx_process_t src_proc, smx_mailbox_t mbox,
263                                   double task_size, double rate,
264                                   void *src_buff, size_t src_buff_size,
265                                   int (*match_fun)(void *, void *,smx_synchro_t),
266                                   void (*clean_fun)(void *), // used to free the synchro in case of problem after a detached send
267                                   void (*copy_data_fun)(smx_synchro_t, void*, size_t),// used to copy data if not default one
268                           void *data, int detached)
269 {
270   XBT_DEBUG("send from %p", mbox);
271
272   /* Prepare a synchro describing us, so that it gets passed to the user-provided filter of other side */
273   smx_synchro_t this_synchro = SIMIX_comm_new(SIMIX_COMM_SEND);
274
275   /* Look for communication synchro matching our needs. We also provide a description of
276    * ourself so that the other side also gets a chance of choosing if it wants to match with us.
277    *
278    * If it is not found then push our communication into the rendez-vous point */
279   smx_synchro_t other_synchro =
280       _find_matching_comm(mbox->comm_queue, SIMIX_COMM_RECEIVE, match_fun, data, this_synchro, /*remove_matching*/true);
281   simgrid::simix::Comm *other_comm = static_cast<simgrid::simix::Comm*>(other_synchro);
282
283
284   if (!other_synchro) {
285     other_synchro = this_synchro;
286     other_comm = static_cast<simgrid::simix::Comm*>(other_synchro);
287
288     if (mbox->permanent_receiver!=NULL){
289       //this mailbox is for small messages, which have to be sent right now
290       other_synchro->state = SIMIX_READY;
291       other_comm->dst_proc=mbox->permanent_receiver;
292       other_comm->refcount++;
293       mbox->done_comm_queue->push_back(other_synchro);
294       other_comm->mbox=mbox;
295       XBT_DEBUG("pushing a message into the permanent receive fifo %p, comm %p", mbox, &(other_comm));
296
297     }else{
298       SIMIX_mbox_push(mbox, this_synchro);
299     }
300   } else {
301     XBT_DEBUG("Receive already pushed");
302
303     SIMIX_comm_destroy(this_synchro);
304
305     other_comm->state = SIMIX_READY;
306     other_comm->type = SIMIX_COMM_READY;
307
308   }
309   xbt_fifo_push(src_proc->comms, other_synchro);
310
311   /* if the communication synchro is detached then decrease the refcount
312    * by one, so it will be eliminated by the receiver's destroy call */
313   if (detached) {
314     other_comm->detached = 1;
315     other_comm->refcount--;
316     other_comm->clean_fun = clean_fun;
317   } else {
318     other_comm->clean_fun = NULL;
319   }
320
321   /* Setup the communication synchro */
322   other_comm->src_proc = src_proc;
323   other_comm->task_size = task_size;
324   other_comm->rate = rate;
325   other_comm->src_buff = src_buff;
326   other_comm->src_buff_size = src_buff_size;
327   other_comm->src_data = data;
328
329   other_comm->match_fun = match_fun;
330   other_comm->copy_data_fun = copy_data_fun;
331
332
333   if (MC_is_active() || MC_record_replay_is_active()) {
334     other_comm->state = SIMIX_RUNNING;
335     return (detached ? NULL : other_comm);
336   }
337
338   SIMIX_comm_start(other_comm);
339   return (detached ? NULL : other_comm);
340 }
341
342 void simcall_HANDLER_comm_recv(smx_simcall_t simcall, smx_process_t receiver, smx_mailbox_t mbox,
343                          void *dst_buff, size_t *dst_buff_size,
344                          int (*match_fun)(void *, void *, smx_synchro_t),
345                          void (*copy_data_fun)(smx_synchro_t, void*, size_t),
346                          void *data, double timeout, double rate)
347 {
348   smx_synchro_t comm = SIMIX_comm_irecv(receiver, mbox, dst_buff, dst_buff_size, match_fun, copy_data_fun, data, rate);
349   SIMCALL_SET_MC_VALUE(simcall, 0);
350   simcall_HANDLER_comm_wait(simcall, comm, timeout);
351 }
352
353 smx_synchro_t simcall_HANDLER_comm_irecv(smx_simcall_t simcall, smx_process_t receiver, smx_mailbox_t mbox,
354     void *dst_buff, size_t *dst_buff_size,
355     int (*match_fun)(void *, void *, smx_synchro_t),
356     void (*copy_data_fun)(smx_synchro_t, void*, size_t),
357     void *data, double rate)
358 {
359   return SIMIX_comm_irecv(receiver, mbox, dst_buff, dst_buff_size, match_fun, copy_data_fun, data, rate);
360 }
361
362 smx_synchro_t SIMIX_comm_irecv(smx_process_t dst_proc, smx_mailbox_t mbox, void *dst_buff, size_t *dst_buff_size,
363     int (*match_fun)(void *, void *, smx_synchro_t),
364     void (*copy_data_fun)(smx_synchro_t, void*, size_t), // used to copy data if not default one
365     void *data, double rate)
366 {
367   XBT_DEBUG("recv from %p %p", mbox, mbox->comm_queue);
368   smx_synchro_t this_synchro = SIMIX_comm_new(SIMIX_COMM_RECEIVE);
369
370   smx_synchro_t other_synchro;
371   //communication already done, get it inside the fifo of completed comms
372   if (mbox->permanent_receiver && ! mbox->done_comm_queue->empty()) {
373
374     XBT_DEBUG("We have a comm that has probably already been received, trying to match it, to skip the communication");
375     //find a match in the already received fifo
376     other_synchro = _find_matching_comm(mbox->done_comm_queue, SIMIX_COMM_SEND, match_fun, data, this_synchro,/*remove_matching*/true);
377     //if not found, assume the receiver came first, register it to the mailbox in the classical way
378     if (!other_synchro)  {
379       XBT_DEBUG("We have messages in the permanent receive list, but not the one we are looking for, pushing request into fifo");
380       other_synchro = this_synchro;
381       SIMIX_mbox_push(mbox, this_synchro);
382     } else {
383       simgrid::simix::Comm *other_comm = static_cast<simgrid::simix::Comm*>(other_synchro);
384
385       if(other_comm->surf_comm && SIMIX_comm_get_remains(other_comm)==0.0) {
386         XBT_DEBUG("comm %p has been already sent, and is finished, destroy it",other_comm);
387         other_comm->state = SIMIX_DONE;
388         other_comm->type = SIMIX_COMM_DONE;
389         other_comm->mbox = NULL;
390       }
391       other_comm->refcount--;
392       SIMIX_comm_destroy(this_synchro);
393     }
394   } else {
395     /* Prepare a synchro describing us, so that it gets passed to the user-provided filter of other side */
396
397     /* Look for communication synchro matching our needs. We also provide a description of
398      * ourself so that the other side also gets a chance of choosing if it wants to match with us.
399      *
400      * If it is not found then push our communication into the rendez-vous point */
401     other_synchro = _find_matching_comm(mbox->comm_queue, SIMIX_COMM_SEND, match_fun, data, this_synchro,/*remove_matching*/true);
402
403     if (!other_synchro) {
404       XBT_DEBUG("Receive pushed first %zu", mbox->comm_queue->size());
405       other_synchro = this_synchro;
406       SIMIX_mbox_push(mbox, this_synchro);
407     } else {
408       SIMIX_comm_destroy(this_synchro);
409       simgrid::simix::Comm *other_comm = static_cast<simgrid::simix::Comm*>(other_synchro);
410
411       other_comm->state = SIMIX_READY;
412       other_comm->type = SIMIX_COMM_READY;
413     }
414     xbt_fifo_push(dst_proc->comms, other_synchro);
415   }
416
417   /* Setup communication synchro */
418   simgrid::simix::Comm *other_comm = static_cast<simgrid::simix::Comm*>(other_synchro);
419   other_comm->dst_proc = dst_proc;
420   other_comm->dst_buff = dst_buff;
421   other_comm->dst_buff_size = dst_buff_size;
422   other_comm->dst_data = data;
423
424   if (rate != -1.0 && (other_comm->rate == -1.0 || rate < other_comm->rate))
425     other_comm->rate = rate;
426
427   other_comm->match_fun = match_fun;
428   other_comm->copy_data_fun = copy_data_fun;
429
430   if (MC_is_active() || MC_record_replay_is_active()) {
431     other_synchro->state = SIMIX_RUNNING;
432     return other_synchro;
433   }
434
435   SIMIX_comm_start(other_synchro);
436   return other_synchro;
437 }
438
439 smx_synchro_t simcall_HANDLER_comm_iprobe(smx_simcall_t simcall, smx_mailbox_t mbox,
440                                    int type, int src, int tag,
441                                    int (*match_fun)(void *, void *, smx_synchro_t),
442                                    void *data){
443   return SIMIX_comm_iprobe(simcall->issuer, mbox, type, src, tag, match_fun, data);
444 }
445
446 smx_synchro_t SIMIX_comm_iprobe(smx_process_t dst_proc, smx_mailbox_t mbox, int type, int src,
447                               int tag, int (*match_fun)(void *, void *, smx_synchro_t), void *data)
448 {
449   XBT_DEBUG("iprobe from %p %p", mbox, mbox->comm_queue);
450   smx_synchro_t this_synchro;
451   int smx_type;
452   if(type == 1){
453     this_synchro=SIMIX_comm_new(SIMIX_COMM_SEND);
454     smx_type = SIMIX_COMM_RECEIVE;
455   } else{
456     this_synchro=SIMIX_comm_new(SIMIX_COMM_RECEIVE);
457     smx_type = SIMIX_COMM_SEND;
458   } 
459   smx_synchro_t other_synchro=NULL;
460   if(mbox->permanent_receiver && ! mbox->done_comm_queue->empty()){
461     XBT_DEBUG("first check in the permanent recv mailbox, to see if we already got something");
462     other_synchro =
463         _find_matching_comm(mbox->done_comm_queue, (e_smx_comm_type_t) smx_type, match_fun, data, this_synchro,/*remove_matching*/false);
464   }
465   if (!other_synchro){
466     XBT_DEBUG("check if we have more luck in the normal mailbox");
467     other_synchro = _find_matching_comm(mbox->comm_queue, (e_smx_comm_type_t) smx_type, match_fun, data, this_synchro,/*remove_matching*/false);
468   }
469
470   if(other_synchro) {
471     simgrid::simix::Comm *other_comm = static_cast<simgrid::simix::Comm*>(other_synchro);
472     other_comm->refcount--;
473   }
474
475   SIMIX_comm_destroy(this_synchro);
476   return other_synchro;
477 }
478
479 void simcall_HANDLER_comm_wait(smx_simcall_t simcall, smx_synchro_t synchro, double timeout)
480 {
481   /* the simcall may be a wait, a send or a recv */
482   surf_action_t sleep;
483
484   /* Associate this simcall to the wait synchro */
485   XBT_DEBUG("simcall_HANDLER_comm_wait, %p", synchro);
486
487   xbt_fifo_push(synchro->simcalls, simcall);
488   simcall->issuer->waiting_synchro = synchro;
489
490   if (MC_is_active() || MC_record_replay_is_active()) {
491     int idx = SIMCALL_GET_MC_VALUE(simcall);
492     if (idx == 0) {
493       synchro->state = SIMIX_DONE;
494     } else {
495       /* If we reached this point, the wait simcall must have a timeout */
496       /* Otherwise it shouldn't be enabled and executed by the MC */
497       if (timeout == -1)
498         THROW_IMPOSSIBLE;
499
500       simgrid::simix::Comm *comm = static_cast<simgrid::simix::Comm*>(synchro);
501       if (comm->src_proc == simcall->issuer)
502         comm->state = SIMIX_SRC_TIMEOUT;
503       else
504         comm->state = SIMIX_DST_TIMEOUT;
505     }
506
507     SIMIX_comm_finish(synchro);
508     return;
509   }
510
511   /* If the synchro has already finish perform the error handling, */
512   /* otherwise set up a waiting timeout on the right side          */
513   if (synchro->state != SIMIX_WAITING && synchro->state != SIMIX_RUNNING) {
514     SIMIX_comm_finish(synchro);
515   } else { /* if (timeout >= 0) { we need a surf sleep action even when there is no timeout, otherwise surf won't tell us when the host fails */
516     sleep = surf_host_sleep(simcall->issuer->host, timeout);
517     sleep->setData(synchro);
518
519     simgrid::simix::Comm *comm = static_cast<simgrid::simix::Comm*>(synchro);
520     if (simcall->issuer == comm->src_proc)
521       comm->src_timeout = sleep;
522     else
523       comm->dst_timeout = sleep;
524   }
525 }
526
527 void simcall_HANDLER_comm_test(smx_simcall_t simcall, smx_synchro_t synchro)
528 {
529   simgrid::simix::Comm *comm = static_cast<simgrid::simix::Comm*>(synchro);
530
531   if (MC_is_active() || MC_record_replay_is_active()){
532     simcall_comm_test__set__result(simcall, comm->src_proc && comm->dst_proc);
533     if (simcall_comm_test__get__result(simcall)){
534       synchro->state = SIMIX_DONE;
535       xbt_fifo_push(synchro->simcalls, simcall);
536       SIMIX_comm_finish(synchro);
537     } else {
538       SIMIX_simcall_answer(simcall);
539     }
540     return;
541   }
542
543   simcall_comm_test__set__result(simcall, (synchro->state != SIMIX_WAITING && synchro->state != SIMIX_RUNNING));
544   if (simcall_comm_test__get__result(simcall)) {
545     xbt_fifo_push(synchro->simcalls, simcall);
546     SIMIX_comm_finish(synchro);
547   } else {
548     SIMIX_simcall_answer(simcall);
549   }
550 }
551
552 void simcall_HANDLER_comm_testany(smx_simcall_t simcall, xbt_dynar_t synchros)
553 {
554   unsigned int cursor;
555   smx_synchro_t synchro;
556   simcall_comm_testany__set__result(simcall, -1);
557
558   if (MC_is_active() || MC_record_replay_is_active()){
559     int idx = SIMCALL_GET_MC_VALUE(simcall);
560     if(idx == -1){
561       SIMIX_simcall_answer(simcall);
562     }else{
563       synchro = xbt_dynar_get_as(synchros, idx, smx_synchro_t);
564       simcall_comm_testany__set__result(simcall, idx);
565       xbt_fifo_push(synchro->simcalls, simcall);
566       synchro->state = SIMIX_DONE;
567       SIMIX_comm_finish(synchro);
568     }
569     return;
570   }
571
572   xbt_dynar_foreach(simcall_comm_testany__get__comms(simcall), cursor,synchro) {
573     if (synchro->state != SIMIX_WAITING && synchro->state != SIMIX_RUNNING) {
574       simcall_comm_testany__set__result(simcall, cursor);
575       xbt_fifo_push(synchro->simcalls, simcall);
576       SIMIX_comm_finish(synchro);
577       return;
578     }
579   }
580   SIMIX_simcall_answer(simcall);
581 }
582
583 void simcall_HANDLER_comm_waitany(smx_simcall_t simcall, xbt_dynar_t synchros)
584 {
585   smx_synchro_t synchro;
586   unsigned int cursor = 0;
587
588   if (MC_is_active() || MC_record_replay_is_active()){
589     int idx = SIMCALL_GET_MC_VALUE(simcall);
590     synchro = xbt_dynar_get_as(synchros, idx, smx_synchro_t);
591     xbt_fifo_push(synchro->simcalls, simcall);
592     simcall_comm_waitany__set__result(simcall, idx);
593     synchro->state = SIMIX_DONE;
594     SIMIX_comm_finish(synchro);
595     return;
596   }
597
598   xbt_dynar_foreach(synchros, cursor, synchro){
599     /* associate this simcall to the the synchro */
600     xbt_fifo_push(synchro->simcalls, simcall);
601
602     /* see if the synchro is already finished */
603     if (synchro->state != SIMIX_WAITING && synchro->state != SIMIX_RUNNING){
604       SIMIX_comm_finish(synchro);
605       break;
606     }
607   }
608 }
609
610 void SIMIX_waitany_remove_simcall_from_actions(smx_simcall_t simcall)
611 {
612   smx_synchro_t synchro;
613   unsigned int cursor = 0;
614   xbt_dynar_t synchros = simcall_comm_waitany__get__comms(simcall);
615
616   xbt_dynar_foreach(synchros, cursor, synchro)
617     xbt_fifo_remove(synchro->simcalls, simcall);
618 }
619
620 /**
621  *  \brief Starts the simulation of a communication synchro.
622  *  \param synchro the communication synchro
623  */
624 static inline void SIMIX_comm_start(smx_synchro_t synchro)
625 {
626   simgrid::simix::Comm *comm = static_cast<simgrid::simix::Comm*>(synchro);
627
628   /* If both the sender and the receiver are already there, start the communication */
629   if (synchro->state == SIMIX_READY) {
630
631     sg_host_t sender   = comm->src_proc->host;
632     sg_host_t receiver = comm->dst_proc->host;
633
634     XBT_DEBUG("Starting communication %p from '%s' to '%s'", synchro, sg_host_get_name(sender), sg_host_get_name(receiver));
635
636     comm->surf_comm = surf_network_model_communicate(surf_network_model, sender, receiver, comm->task_size, comm->rate);
637     comm->surf_comm->setData(synchro);
638     comm->state = SIMIX_RUNNING;
639
640     /* If a link is failed, detect it immediately */
641     if (comm->surf_comm->getState() == simgrid::surf::Action::State::failed) {
642       XBT_DEBUG("Communication from '%s' to '%s' failed to start because of a link failure",
643                 sg_host_get_name(sender), sg_host_get_name(receiver));
644       comm->state = SIMIX_LINK_FAILURE;
645       SIMIX_comm_destroy_internal_actions(synchro);
646     }
647
648     /* If any of the process is suspend, create the synchro but stop its execution,
649        it will be restarted when the sender process resume */
650     if (SIMIX_process_is_suspended(comm->src_proc) ||
651         SIMIX_process_is_suspended(comm->dst_proc)) {
652       /* FIXME: check what should happen with the synchro state */
653
654       if (SIMIX_process_is_suspended(comm->src_proc))
655         XBT_DEBUG("The communication is suspended on startup because src (%s:%s) were suspended since it initiated the communication",
656                   sg_host_get_name(comm->src_proc->host), comm->src_proc->name);
657       else
658         XBT_DEBUG("The communication is suspended on startup because dst (%s:%s) were suspended since it initiated the communication",
659                   sg_host_get_name(comm->dst_proc->host), comm->dst_proc->name);
660
661       comm->surf_comm->suspend();
662     }
663   }
664 }
665
666 /**
667  * \brief Answers the SIMIX simcalls associated to a communication synchro.
668  * \param synchro a finished communication synchro
669  */
670 void SIMIX_comm_finish(smx_synchro_t synchro)
671 {
672   simgrid::simix::Comm *comm = static_cast<simgrid::simix::Comm*>(synchro);
673   unsigned int destroy_count = 0;
674   smx_simcall_t simcall;
675
676   while ((simcall = (smx_simcall_t) xbt_fifo_shift(synchro->simcalls))) {
677
678     /* If a waitany simcall is waiting for this synchro to finish, then remove
679        it from the other synchros in the waitany list. Afterwards, get the
680        position of the actual synchro in the waitany dynar and
681        return it as the result of the simcall */
682
683     if (simcall->call == SIMCALL_NONE) //FIXME: maybe a better way to handle this case
684       continue; // if process handling comm is killed
685     if (simcall->call == SIMCALL_COMM_WAITANY) {
686       SIMIX_waitany_remove_simcall_from_actions(simcall);
687       if (!MC_is_active() && !MC_record_replay_is_active())
688         simcall_comm_waitany__set__result(simcall, xbt_dynar_search(simcall_comm_waitany__get__comms(simcall), &synchro));
689     }
690
691     /* If the synchro is still in a rendez-vous point then remove from it */
692     if (comm->mbox)
693       SIMIX_mbox_remove(comm->mbox, synchro);
694
695     XBT_DEBUG("SIMIX_comm_finish: synchro state = %d", (int)synchro->state);
696
697     /* Check out for errors */
698
699     if (simcall->issuer->host->isOff()) {
700       simcall->issuer->context->iwannadie = 1;
701       SMX_EXCEPTION(simcall->issuer, host_error, 0, "Host failed");
702     } else
703
704     switch (synchro->state) {
705
706     case SIMIX_DONE:
707       XBT_DEBUG("Communication %p complete!", synchro);
708       SIMIX_comm_copy_data(synchro);
709       break;
710
711     case SIMIX_SRC_TIMEOUT:
712       SMX_EXCEPTION(simcall->issuer, timeout_error, 0, "Communication timeouted because of sender");
713       break;
714
715     case SIMIX_DST_TIMEOUT:
716       SMX_EXCEPTION(simcall->issuer, timeout_error, 0, "Communication timeouted because of receiver");
717       break;
718
719     case SIMIX_SRC_HOST_FAILURE:
720       if (simcall->issuer == comm->src_proc)
721         simcall->issuer->context->iwannadie = 1;
722 //          SMX_EXCEPTION(simcall->issuer, host_error, 0, "Host failed");
723       else
724         SMX_EXCEPTION(simcall->issuer, network_error, 0, "Remote peer failed");
725       break;
726
727     case SIMIX_DST_HOST_FAILURE:
728       if (simcall->issuer == comm->dst_proc)
729         simcall->issuer->context->iwannadie = 1;
730 //          SMX_EXCEPTION(simcall->issuer, host_error, 0, "Host failed");
731       else
732         SMX_EXCEPTION(simcall->issuer, network_error, 0, "Remote peer failed");
733       break;
734
735     case SIMIX_LINK_FAILURE:
736
737       XBT_DEBUG("Link failure in synchro %p between '%s' and '%s': posting an exception to the issuer: %s (%p) detached:%d",
738                 synchro,
739                 comm->src_proc ? sg_host_get_name(comm->src_proc->host) : NULL,
740                 comm->dst_proc ? sg_host_get_name(comm->dst_proc->host) : NULL,
741                 simcall->issuer->name, simcall->issuer, comm->detached);
742       if (comm->src_proc == simcall->issuer) {
743         XBT_DEBUG("I'm source");
744       } else if (comm->dst_proc == simcall->issuer) {
745         XBT_DEBUG("I'm dest");
746       } else {
747         XBT_DEBUG("I'm neither source nor dest");
748       }
749       SMX_EXCEPTION(simcall->issuer, network_error, 0, "Link failure");
750       break;
751
752     case SIMIX_CANCELED:
753       if (simcall->issuer == comm->dst_proc)
754         SMX_EXCEPTION(simcall->issuer, cancel_error, 0, "Communication canceled by the sender");
755       else
756         SMX_EXCEPTION(simcall->issuer, cancel_error, 0, "Communication canceled by the receiver");
757       break;
758
759     default:
760       xbt_die("Unexpected synchro state in SIMIX_comm_finish: %d", (int)synchro->state);
761     }
762
763     /* if there is an exception during a waitany or a testany, indicate the position of the failed communication */
764     if (simcall->issuer->doexception) {
765       if (simcall->call == SIMCALL_COMM_WAITANY) {
766         simcall->issuer->running_ctx->exception.value = xbt_dynar_search(simcall_comm_waitany__get__comms(simcall), &synchro);
767       }
768       else if (simcall->call == SIMCALL_COMM_TESTANY) {
769         simcall->issuer->running_ctx->exception.value = xbt_dynar_search(simcall_comm_testany__get__comms(simcall), &synchro);
770       }
771     }
772
773     if (simcall->issuer->host->isOff()) {
774       simcall->issuer->context->iwannadie = 1;
775     }
776
777     simcall->issuer->waiting_synchro = NULL;
778     xbt_fifo_remove(simcall->issuer->comms, synchro);
779     if(comm->detached){
780       if(simcall->issuer == comm->src_proc){
781         if(comm->dst_proc)
782           xbt_fifo_remove(comm->dst_proc->comms, synchro);
783       }
784       if(simcall->issuer == comm->dst_proc){
785         if(comm->src_proc)
786           xbt_fifo_remove(comm->src_proc->comms, synchro);
787       }
788     }
789     SIMIX_simcall_answer(simcall);
790     destroy_count++;
791   }
792
793   while (destroy_count-- > 0)
794     SIMIX_comm_destroy(synchro);
795 }
796
797 /**
798  * \brief This function is called when a Surf communication synchro is finished.
799  * \param synchro the corresponding Simix communication
800  */
801 void SIMIX_post_comm(smx_synchro_t synchro)
802 {
803   simgrid::simix::Comm *comm = static_cast<simgrid::simix::Comm*>(synchro);
804
805   /* Update synchro state */
806   if (comm->src_timeout &&
807       comm->src_timeout->getState() == simgrid::surf::Action::State::done)
808     synchro->state = SIMIX_SRC_TIMEOUT;
809   else if (comm->dst_timeout &&
810     comm->dst_timeout->getState() == simgrid::surf::Action::State::done)
811     synchro->state = SIMIX_DST_TIMEOUT;
812   else if (comm->src_timeout &&
813     comm->src_timeout->getState() == simgrid::surf::Action::State::failed)
814     synchro->state = SIMIX_SRC_HOST_FAILURE;
815   else if (comm->dst_timeout &&
816       comm->dst_timeout->getState() == simgrid::surf::Action::State::failed)
817     synchro->state = SIMIX_DST_HOST_FAILURE;
818   else if (comm->surf_comm &&
819     comm->surf_comm->getState() == simgrid::surf::Action::State::failed) {
820     synchro->state = SIMIX_LINK_FAILURE;
821   } else
822     synchro->state = SIMIX_DONE;
823
824   XBT_DEBUG("SIMIX_post_comm: comm %p, state %d, src_proc %p, dst_proc %p, detached: %d",
825             comm, (int)comm->state, comm->src_proc, comm->dst_proc, comm->detached);
826
827   /* destroy the surf actions associated with the Simix communication */
828   SIMIX_comm_destroy_internal_actions(comm);
829
830   /* if there are simcalls associated with the synchro, then answer them */
831   if (xbt_fifo_size(synchro->simcalls)) {
832     SIMIX_comm_finish(comm);
833   }
834 }
835
836 void SIMIX_comm_cancel(smx_synchro_t synchro)
837 {
838   simgrid::simix::Comm *comm = static_cast<simgrid::simix::Comm*>(synchro);
839
840   /* if the synchro is a waiting state means that it is still in a mbox */
841   /* so remove from it and delete it */
842   if (comm->state == SIMIX_WAITING) {
843     SIMIX_mbox_remove(comm->mbox, synchro);
844     comm->state = SIMIX_CANCELED;
845   }
846   else if (!MC_is_active() /* when running the MC there are no surf actions */
847            && !MC_record_replay_is_active()
848            && (comm->state == SIMIX_READY || comm->state == SIMIX_RUNNING)) {
849
850     comm->surf_comm->cancel();
851   }
852 }
853
854 /************* synchro Getters **************/
855
856 /**
857  *  \brief get the amount remaining from the communication
858  *  \param synchro The communication
859  */
860 double SIMIX_comm_get_remains(smx_synchro_t synchro)
861 {
862   if(!synchro)
863     return 0;
864   simgrid::simix::Comm *comm = static_cast<simgrid::simix::Comm*>(synchro);
865
866   double remains;
867   switch (synchro->state) {
868
869   case SIMIX_RUNNING:
870     remains = comm->surf_comm->getRemains();
871     break;
872
873   case SIMIX_WAITING:
874   case SIMIX_READY:
875     remains = 0; /*FIXME: check what should be returned */
876     break;
877
878   default:
879     remains = 0; /*FIXME: is this correct? */
880     break;
881   }
882   return remains;
883 }
884
885 e_smx_state_t SIMIX_comm_get_state(smx_synchro_t synchro)
886 {
887   return synchro->state;
888 }
889
890 /**
891  *  \brief Return the user data associated to the sender of the communication
892  *  \param synchro The communication
893  *  \return the user data
894  */
895 void* SIMIX_comm_get_src_data(smx_synchro_t synchro)
896 {
897   simgrid::simix::Comm *comm = static_cast<simgrid::simix::Comm*>(synchro);
898
899   return comm->src_data;
900 }
901
902 /**
903  *  \brief Return the user data associated to the receiver of the communication
904  *  \param synchro The communication
905  *  \return the user data
906  */
907 void* SIMIX_comm_get_dst_data(smx_synchro_t synchro)
908 {
909   simgrid::simix::Comm *comm = static_cast<simgrid::simix::Comm*>(synchro);
910
911   return comm->dst_data;
912 }
913
914 smx_process_t SIMIX_comm_get_src_proc(smx_synchro_t synchro)
915 {
916   simgrid::simix::Comm *comm = static_cast<simgrid::simix::Comm*>(synchro);
917
918   return comm->src_proc;
919 }
920
921 smx_process_t SIMIX_comm_get_dst_proc(smx_synchro_t synchro)
922 {
923   simgrid::simix::Comm *comm = static_cast<simgrid::simix::Comm*>(synchro);
924
925   return comm->dst_proc;
926 }
927
928 /******************************************************************************/
929 /*                    SIMIX_comm_copy_data callbacks                       */
930 /******************************************************************************/
931 static void (*SIMIX_comm_copy_data_callback) (smx_synchro_t, void*, size_t) = &SIMIX_comm_copy_pointer_callback;
932
933 void SIMIX_comm_set_copy_data_callback(void (*callback) (smx_synchro_t, void*, size_t))
934 {
935   SIMIX_comm_copy_data_callback = callback;
936 }
937
938 void SIMIX_comm_copy_pointer_callback(smx_synchro_t synchro, void* buff, size_t buff_size)
939 {
940   simgrid::simix::Comm *comm = static_cast<simgrid::simix::Comm*>(synchro);
941
942   xbt_assert((buff_size == sizeof(void *)), "Cannot copy %zu bytes: must be sizeof(void*)", buff_size);
943   *(void **) (comm->dst_buff) = buff;
944 }
945
946 void SIMIX_comm_copy_buffer_callback(smx_synchro_t synchro, void* buff, size_t buff_size)
947 {
948   simgrid::simix::Comm *comm = static_cast<simgrid::simix::Comm*>(synchro);
949
950   XBT_DEBUG("Copy the data over");
951   memcpy(comm->dst_buff, buff, buff_size);
952   if (comm->detached) { // if this is a detached send, the source buffer was duplicated by SMPI sender to make the original buffer available to the application ASAP
953     xbt_free(buff);
954     comm->src_buff = NULL;
955   }
956 }
957
958
959 /**
960  *  \brief Copy the communication data from the sender's buffer to the receiver's one
961  *  \param comm The communication
962  */
963 void SIMIX_comm_copy_data(smx_synchro_t synchro)
964 {
965   simgrid::simix::Comm *comm = static_cast<simgrid::simix::Comm*>(synchro);
966
967   size_t buff_size = comm->src_buff_size;
968   /* If there is no data to copy then return */
969   if (!comm->src_buff || !comm->dst_buff || comm->copied)
970     return;
971
972   XBT_DEBUG("Copying comm %p data from %s (%p) -> %s (%p) (%zu bytes)",
973             comm,
974             comm->src_proc ? sg_host_get_name(comm->src_proc->host) : "a finished process",
975             comm->src_buff,
976             comm->dst_proc ? sg_host_get_name(comm->dst_proc->host) : "a finished process",
977             comm->dst_buff, buff_size);
978
979   /* Copy at most dst_buff_size bytes of the message to receiver's buffer */
980   if (comm->dst_buff_size)
981     buff_size = MIN(buff_size, *(comm->dst_buff_size));
982
983   /* Update the receiver's buffer size to the copied amount */
984   if (comm->dst_buff_size)
985     *comm->dst_buff_size = buff_size;
986
987   if (buff_size > 0){
988       if(comm->copy_data_fun)
989         comm->copy_data_fun (comm, comm->src_buff, buff_size);
990       else
991         SIMIX_comm_copy_data_callback (comm, comm->src_buff, buff_size);
992   }
993
994
995   /* Set the copied flag so we copy data only once */
996   /* (this function might be called from both communication ends) */
997   comm->copied = 1;
998 }