Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
We never use the name of the mailbox
[simgrid.git] / src / simix / smx_network.cpp
1 /* Copyright (c) 2009-2016. The SimGrid Team.  All rights reserved.         */
2
3 /* This program is free software; you can redistribute it and/or modify it
4  * under the terms of the license (GNU LGPL) which comes with this package. */
5
6 #include "src/surf/surf_interface.hpp"
7 #include "src/simix/smx_private.h"
8 #include "xbt/log.h"
9 #include "mc/mc.h"
10 #include "src/mc/mc_replay.h"
11 #include "xbt/dict.h"
12 #include "simgrid/s4u/mailbox.hpp"
13
14 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(simix_network, simix, "SIMIX network-related synchronization");
15
16 static void SIMIX_mbox_free(void *data);
17 static xbt_dict_t mailboxes = xbt_dict_new_homogeneous(SIMIX_mbox_free);
18
19 static void SIMIX_waitany_remove_simcall_from_actions(smx_simcall_t simcall);
20 static void SIMIX_comm_copy_data(smx_synchro_t comm);
21 static smx_synchro_t SIMIX_comm_new(e_smx_comm_type_t type);
22 static inline void SIMIX_mbox_push(smx_mailbox_t mbox, smx_synchro_t comm);
23 static smx_synchro_t _find_matching_comm(std::deque<smx_synchro_t> *deque, e_smx_comm_type_t type,
24     int (*match_fun)(void *, void *,smx_synchro_t), void *user_data, smx_synchro_t my_synchro, bool remove_matching);
25 static void SIMIX_comm_start(smx_synchro_t synchro);
26
27 void SIMIX_mailbox_exit(void)
28 {
29   xbt_dict_free(&mailboxes);
30 }
31
32 /******************************************************************************/
33 /*                           Rendez-Vous Points                               */
34 /******************************************************************************/
35
36 smx_mailbox_t SIMIX_mbox_create(const char *name)
37 {
38   xbt_assert(name, "Mailboxes must have a name");
39   /* two processes may have pushed the same mbox_create simcall at the same time */
40   smx_mailbox_t mbox = (smx_mailbox_t) xbt_dict_get_or_null(mailboxes, name);
41
42   if (!mbox) {
43     mbox = xbt_new0(s_smx_mailbox_t, 1);
44     mbox->comm_queue = new std::deque<smx_synchro_t>();
45     mbox->done_comm_queue = nullptr; // Allocated on need only
46     mbox->permanent_receiver=NULL;
47
48     XBT_DEBUG("Creating a mailbox at %p with name %s", mbox, name);
49     xbt_dict_set(mailboxes, name, mbox, NULL);
50   }
51   return mbox;
52 }
53
54 void SIMIX_mbox_free(void *data)
55 {
56   XBT_DEBUG("mbox free %p", data);
57   smx_mailbox_t mbox = (smx_mailbox_t) data;
58   delete mbox->comm_queue;
59   delete mbox->done_comm_queue;
60
61   xbt_free(mbox);
62 }
63
64 smx_mailbox_t SIMIX_mbox_get_by_name(const char *name)
65 {
66   return (smx_mailbox_t) xbt_dict_get_or_null(mailboxes, name);
67 }
68
69 smx_synchro_t SIMIX_mbox_get_head(smx_mailbox_t mbox)
70 {
71   return mbox->comm_queue->empty()? nullptr:mbox->comm_queue->front();
72 }
73
74 /**
75  *  \brief get the receiver (process associated to the mailbox)
76  *  \param mbox The rendez-vous point
77  *  \return process The receiving process (NULL if not set)
78  */
79 smx_process_t SIMIX_mbox_get_receiver(smx_mailbox_t mbox)
80 {
81   return mbox->permanent_receiver;
82 }
83
84 /**
85  *  \brief set the receiver of the rendez vous point to allow eager sends
86  *  \param mbox The rendez-vous point
87  *  \param process The receiving process
88  */
89 void SIMIX_mbox_set_receiver(smx_mailbox_t mbox, smx_process_t process)
90 {
91   mbox->permanent_receiver=process;
92   if (mbox->done_comm_queue == nullptr)
93     mbox->done_comm_queue = new std::deque<smx_synchro_t>();
94 }
95
96 /**
97  *  \brief Pushes a communication synchro into a rendez-vous point
98  *  \param mbox The mailbox
99  *  \param comm The communication synchro
100  */
101 static inline void SIMIX_mbox_push(smx_mailbox_t mbox, smx_synchro_t comm)
102 {
103   mbox->comm_queue->push_back(comm);
104   comm->comm.mbox = mbox;
105 }
106
107 /**
108  *  \brief Removes a communication synchro from a rendez-vous point
109  *  \param mbox The rendez-vous point
110  *  \param comm The communication synchro
111  */
112 void SIMIX_mbox_remove(smx_mailbox_t mbox, smx_synchro_t comm)
113 {
114   comm->comm.mbox = NULL;
115   for (auto it = mbox->comm_queue->begin(); it != mbox->comm_queue->end(); it++)
116     if (*it == comm) {
117       mbox->comm_queue->erase(it);
118       return;
119     }
120   xbt_die("Cannot remove this comm that is not part of the mailbox");
121 }
122
123 /**
124  *  \brief Checks if there is a communication synchro queued in a deque matching our needs
125  *  \param type The type of communication we are looking for (comm_send, comm_recv)
126  *  \return The communication synchro if found, NULL otherwise
127  */
128 static smx_synchro_t _find_matching_comm(std::deque<smx_synchro_t> *deque, e_smx_comm_type_t type,
129     int (*match_fun)(void *, void *,smx_synchro_t), void *this_user_data, smx_synchro_t my_synchro, bool remove_matching)
130 {
131   void* other_user_data = NULL;
132
133   for(auto it = deque->begin(); it != deque->end(); it++){
134     smx_synchro_t synchro = *it;
135     if (synchro->comm.type == SIMIX_COMM_SEND) {
136       other_user_data = synchro->comm.src_data;
137     } else if (synchro->comm.type == SIMIX_COMM_RECEIVE) {
138       other_user_data = synchro->comm.dst_data;
139     }
140     if (synchro->comm.type == type &&
141         (!match_fun               ||               match_fun(this_user_data,  other_user_data, synchro)) &&
142         (!synchro->comm.match_fun || synchro->comm.match_fun(other_user_data, this_user_data,  my_synchro))) {
143       XBT_DEBUG("Found a matching communication synchro %p", synchro);
144       if (remove_matching)
145         deque->erase(it);
146       synchro->comm.refcount++;
147 #if HAVE_MC
148       synchro->comm.mbox_cpy = synchro->comm.mbox;
149 #endif
150       synchro->comm.mbox = NULL;
151       return synchro;
152     }
153     XBT_DEBUG("Sorry, communication synchro %p does not match our needs:"
154               " its type is %d but we are looking for a comm of type %d (or maybe the filtering didn't match)",
155               synchro, (int)synchro->comm.type, (int)type);
156   }
157   XBT_DEBUG("No matching communication synchro found");
158   return NULL;
159 }
160
161 /******************************************************************************/
162 /*                          Communication synchros                            */
163 /******************************************************************************/
164
165 /**
166  *  \brief Creates a new communicate synchro
167  *  \param type The direction of communication (comm_send, comm_recv)
168  *  \return The new communicate synchro
169  */
170 smx_synchro_t SIMIX_comm_new(e_smx_comm_type_t type)
171 {
172   smx_synchro_t synchro;
173
174   /* alloc structures */
175   synchro = (smx_synchro_t) xbt_mallocator_get(simix_global->synchro_mallocator);
176
177   synchro->type = SIMIX_SYNC_COMMUNICATE;
178   synchro->state = SIMIX_WAITING;
179
180   /* set communication */
181   synchro->comm.type = type;
182   synchro->comm.refcount = 1;
183   synchro->comm.src_data=NULL;
184   synchro->comm.dst_data=NULL;
185
186   synchro->category = NULL;
187
188   XBT_DEBUG("Create communicate synchro %p", synchro);
189
190   return synchro;
191 }
192
193 /**
194  *  \brief Destroy a communicate synchro
195  *  \param synchro The communicate synchro to be destroyed
196  */
197 void SIMIX_comm_destroy(smx_synchro_t synchro)
198 {
199   XBT_DEBUG("Destroy synchro %p (refcount: %d), state: %d",
200             synchro, synchro->comm.refcount, (int)synchro->state);
201
202   if (synchro->comm.refcount <= 0) {
203     xbt_backtrace_display_current();
204     xbt_die("The refcount of comm %p is already 0 before decreasing it. "
205             "That's a bug! If you didn't test and/or wait the same communication twice in your code, then the bug is SimGrid's...", synchro);
206   }
207   synchro->comm.refcount--;
208   if (synchro->comm.refcount > 0)
209       return;
210   XBT_DEBUG("Really free communication %p; refcount is now %d", synchro,
211       synchro->comm.refcount);
212
213   xbt_free(synchro->name);
214   SIMIX_comm_destroy_internal_actions(synchro);
215
216   if (synchro->comm.detached && synchro->state != SIMIX_DONE) {
217     /* the communication has failed and was detached:
218      * we have to free the buffer */
219     if (synchro->comm.clean_fun) {
220       synchro->comm.clean_fun(synchro->comm.src_buff);
221     }
222     synchro->comm.src_buff = NULL;
223   }
224
225   if(synchro->comm.mbox)
226     SIMIX_mbox_remove(synchro->comm.mbox, synchro);
227
228   xbt_mallocator_release(simix_global->synchro_mallocator, synchro);
229 }
230
231 void SIMIX_comm_destroy_internal_actions(smx_synchro_t synchro)
232 {
233   if (synchro->comm.surf_comm){
234     synchro->comm.surf_comm->unref();
235     synchro->comm.surf_comm = NULL;
236   }
237
238   if (synchro->comm.src_timeout){
239     synchro->comm.src_timeout->unref();
240     synchro->comm.src_timeout = NULL;
241   }
242
243   if (synchro->comm.dst_timeout){
244     synchro->comm.dst_timeout->unref();
245     synchro->comm.dst_timeout = NULL;
246   }
247 }
248
249 void simcall_HANDLER_comm_send(smx_simcall_t simcall, smx_process_t src, smx_mailbox_t mbox,
250                                   double task_size, double rate,
251                                   void *src_buff, size_t src_buff_size,
252                                   int (*match_fun)(void *, void *,smx_synchro_t),
253                                   void (*copy_data_fun)(smx_synchro_t, void*, size_t),
254           void *data, double timeout){
255   smx_synchro_t comm = simcall_HANDLER_comm_isend(simcall, src, mbox, task_size, rate,
256                            src_buff, src_buff_size, match_fun, NULL, copy_data_fun,
257                data, 0);
258   SIMCALL_SET_MC_VALUE(simcall, 0);
259   simcall_HANDLER_comm_wait(simcall, comm, timeout);
260 }
261 smx_synchro_t simcall_HANDLER_comm_isend(smx_simcall_t simcall, smx_process_t src_proc, smx_mailbox_t mbox,
262                                   double task_size, double rate,
263                                   void *src_buff, size_t src_buff_size,
264                                   int (*match_fun)(void *, void *,smx_synchro_t),
265                                   void (*clean_fun)(void *), // used to free the synchro in case of problem after a detached send
266                                   void (*copy_data_fun)(smx_synchro_t, void*, size_t),// used to copy data if not default one
267                           void *data, int detached)
268 {
269   XBT_DEBUG("send from %p", mbox);
270
271   /* Prepare a synchro describing us, so that it gets passed to the user-provided filter of other side */
272   smx_synchro_t this_synchro = SIMIX_comm_new(SIMIX_COMM_SEND);
273
274   /* Look for communication synchro matching our needs. We also provide a description of
275    * ourself so that the other side also gets a chance of choosing if it wants to match with us.
276    *
277    * If it is not found then push our communication into the rendez-vous point */
278   smx_synchro_t other_synchro =
279       _find_matching_comm(mbox->comm_queue, SIMIX_COMM_RECEIVE, match_fun, data, this_synchro, /*remove_matching*/true);
280
281   if (!other_synchro) {
282     other_synchro = this_synchro;
283
284     if (mbox->permanent_receiver!=NULL){
285       //this mailbox is for small messages, which have to be sent right now
286       other_synchro->state = SIMIX_READY;
287       other_synchro->comm.dst_proc=mbox->permanent_receiver;
288       other_synchro->comm.refcount++;
289       mbox->done_comm_queue->push_back(other_synchro);
290       other_synchro->comm.mbox=mbox;
291       XBT_DEBUG("pushing a message into the permanent receive fifo %p, comm %p", mbox, &(other_synchro->comm));
292
293     }else{
294       SIMIX_mbox_push(mbox, this_synchro);
295     }
296   } else {
297     XBT_DEBUG("Receive already pushed");
298
299     SIMIX_comm_destroy(this_synchro);
300
301     other_synchro->state = SIMIX_READY;
302     other_synchro->comm.type = SIMIX_COMM_READY;
303
304   }
305   xbt_fifo_push(src_proc->comms, other_synchro);
306
307   /* if the communication synchro is detached then decrease the refcount
308    * by one, so it will be eliminated by the receiver's destroy call */
309   if (detached) {
310     other_synchro->comm.detached = 1;
311     other_synchro->comm.refcount--;
312     other_synchro->comm.clean_fun = clean_fun;
313   } else {
314     other_synchro->comm.clean_fun = NULL;
315   }
316
317   /* Setup the communication synchro */
318   other_synchro->comm.src_proc = src_proc;
319   other_synchro->comm.task_size = task_size;
320   other_synchro->comm.rate = rate;
321   other_synchro->comm.src_buff = src_buff;
322   other_synchro->comm.src_buff_size = src_buff_size;
323   other_synchro->comm.src_data = data;
324
325   other_synchro->comm.match_fun = match_fun;
326   other_synchro->comm.copy_data_fun = copy_data_fun;
327
328
329   if (MC_is_active() || MC_record_replay_is_active()) {
330     other_synchro->state = SIMIX_RUNNING;
331     return (detached ? NULL : other_synchro);
332   }
333
334   SIMIX_comm_start(other_synchro);
335   return (detached ? NULL : other_synchro);
336 }
337
338 void simcall_HANDLER_comm_recv(smx_simcall_t simcall, smx_process_t receiver, smx_mailbox_t mbox,
339                          void *dst_buff, size_t *dst_buff_size,
340                          int (*match_fun)(void *, void *, smx_synchro_t),
341                          void (*copy_data_fun)(smx_synchro_t, void*, size_t),
342                          void *data, double timeout, double rate)
343 {
344   smx_synchro_t comm = SIMIX_comm_irecv(receiver, mbox, dst_buff,
345                            dst_buff_size, match_fun, copy_data_fun, data, rate);
346   SIMCALL_SET_MC_VALUE(simcall, 0);
347   simcall_HANDLER_comm_wait(simcall, comm, timeout);
348 }
349
350 smx_synchro_t simcall_HANDLER_comm_irecv(smx_simcall_t simcall, smx_process_t receiver, smx_mailbox_t mbox,
351     void *dst_buff, size_t *dst_buff_size,
352     int (*match_fun)(void *, void *, smx_synchro_t),
353     void (*copy_data_fun)(smx_synchro_t, void*, size_t),
354     void *data, double rate)
355 {
356   return SIMIX_comm_irecv(receiver, mbox, dst_buff, dst_buff_size, match_fun, copy_data_fun, data, rate);
357 }
358
359 smx_synchro_t SIMIX_comm_irecv(smx_process_t dst_proc, smx_mailbox_t mbox, void *dst_buff, size_t *dst_buff_size,
360     int (*match_fun)(void *, void *, smx_synchro_t),
361     void (*copy_data_fun)(smx_synchro_t, void*, size_t), // used to copy data if not default one
362     void *data, double rate)
363 {
364   XBT_DEBUG("recv from %p %p", mbox, mbox->comm_queue);
365   smx_synchro_t this_synchro = SIMIX_comm_new(SIMIX_COMM_RECEIVE);
366
367   smx_synchro_t other_synchro;
368   //communication already done, get it inside the fifo of completed comms
369   if (mbox->permanent_receiver && ! mbox->done_comm_queue->empty()) {
370
371     XBT_DEBUG("We have a comm that has probably already been received, trying to match it, to skip the communication");
372     //find a match in the already received fifo
373     other_synchro = _find_matching_comm(mbox->done_comm_queue, SIMIX_COMM_SEND, match_fun, data, this_synchro,/*remove_matching*/true);
374     //if not found, assume the receiver came first, register it to the mailbox in the classical way
375     if (!other_synchro)  {
376       XBT_DEBUG("We have messages in the permanent receive list, but not the one we are looking for, pushing request into fifo");
377       other_synchro = this_synchro;
378       SIMIX_mbox_push(mbox, this_synchro);
379     } else {
380       if(other_synchro->comm.surf_comm && SIMIX_comm_get_remains(other_synchro)==0.0) {
381         XBT_DEBUG("comm %p has been already sent, and is finished, destroy it",&(other_synchro->comm));
382         other_synchro->state = SIMIX_DONE;
383         other_synchro->comm.type = SIMIX_COMM_DONE;
384         other_synchro->comm.mbox = NULL;
385       }
386       other_synchro->comm.refcount--;
387       SIMIX_comm_destroy(this_synchro);
388     }
389   } else {
390     /* Prepare a synchro describing us, so that it gets passed to the user-provided filter of other side */
391
392     /* Look for communication synchro matching our needs. We also provide a description of
393      * ourself so that the other side also gets a chance of choosing if it wants to match with us.
394      *
395      * If it is not found then push our communication into the rendez-vous point */
396     other_synchro = _find_matching_comm(mbox->comm_queue, SIMIX_COMM_SEND, match_fun, data, this_synchro,/*remove_matching*/true);
397
398     if (!other_synchro) {
399       XBT_DEBUG("Receive pushed first %zu", mbox->comm_queue->size());
400       other_synchro = this_synchro;
401       SIMIX_mbox_push(mbox, this_synchro);
402     } else {
403       SIMIX_comm_destroy(this_synchro);
404       other_synchro->state = SIMIX_READY;
405       other_synchro->comm.type = SIMIX_COMM_READY;
406       //other_synchro->comm.refcount--;
407     }
408     xbt_fifo_push(dst_proc->comms, other_synchro);
409   }
410
411   /* Setup communication synchro */
412   other_synchro->comm.dst_proc = dst_proc;
413   other_synchro->comm.dst_buff = dst_buff;
414   other_synchro->comm.dst_buff_size = dst_buff_size;
415   other_synchro->comm.dst_data = data;
416
417   if (rate != -1.0 && (other_synchro->comm.rate == -1.0 || rate < other_synchro->comm.rate))
418     other_synchro->comm.rate = rate;
419
420   other_synchro->comm.match_fun = match_fun;
421   other_synchro->comm.copy_data_fun = copy_data_fun;
422
423   if (MC_is_active() || MC_record_replay_is_active()) {
424     other_synchro->state = SIMIX_RUNNING;
425     return other_synchro;
426   }
427
428   SIMIX_comm_start(other_synchro);
429   return other_synchro;
430 }
431
432 smx_synchro_t simcall_HANDLER_comm_iprobe(smx_simcall_t simcall, smx_mailbox_t mbox,
433                                    int type, int src, int tag,
434                                    int (*match_fun)(void *, void *, smx_synchro_t),
435                                    void *data){
436   return SIMIX_comm_iprobe(simcall->issuer, mbox, type, src, tag, match_fun, data);
437 }
438
439 smx_synchro_t SIMIX_comm_iprobe(smx_process_t dst_proc, smx_mailbox_t mbox, int type, int src,
440                               int tag, int (*match_fun)(void *, void *, smx_synchro_t), void *data)
441 {
442   XBT_DEBUG("iprobe from %p %p", mbox, mbox->comm_queue);
443   smx_synchro_t this_synchro;
444   int smx_type;
445   if(type == 1){
446     this_synchro=SIMIX_comm_new(SIMIX_COMM_SEND);
447     smx_type = SIMIX_COMM_RECEIVE;
448   } else{
449     this_synchro=SIMIX_comm_new(SIMIX_COMM_RECEIVE);
450     smx_type = SIMIX_COMM_SEND;
451   } 
452   smx_synchro_t other_synchro=NULL;
453   if(mbox->permanent_receiver && ! mbox->done_comm_queue->empty()){
454     XBT_DEBUG("first check in the permanent recv mailbox, to see if we already got something");
455     other_synchro =
456         _find_matching_comm(mbox->done_comm_queue, (e_smx_comm_type_t) smx_type, match_fun, data, this_synchro,/*remove_matching*/false);
457   }
458   if (!other_synchro){
459     XBT_DEBUG("check if we have more luck in the normal mailbox");
460     other_synchro =
461         _find_matching_comm(mbox->comm_queue, (e_smx_comm_type_t) smx_type, match_fun, data, this_synchro,/*remove_matching*/false);
462   }
463   if(other_synchro)
464     other_synchro->comm.refcount--;
465
466   SIMIX_comm_destroy(this_synchro);
467   return other_synchro;
468 }
469
470 void simcall_HANDLER_comm_wait(smx_simcall_t simcall, smx_synchro_t synchro, double timeout)
471 {
472   /* the simcall may be a wait, a send or a recv */
473   surf_action_t sleep;
474
475   /* Associate this simcall to the wait synchro */
476   XBT_DEBUG("simcall_HANDLER_comm_wait, %p", synchro);
477
478   xbt_fifo_push(synchro->simcalls, simcall);
479   simcall->issuer->waiting_synchro = synchro;
480
481   if (MC_is_active() || MC_record_replay_is_active()) {
482     int idx = SIMCALL_GET_MC_VALUE(simcall);
483     if (idx == 0) {
484       synchro->state = SIMIX_DONE;
485     } else {
486       /* If we reached this point, the wait simcall must have a timeout */
487       /* Otherwise it shouldn't be enabled and executed by the MC */
488       if (timeout == -1)
489         THROW_IMPOSSIBLE;
490
491       if (synchro->comm.src_proc == simcall->issuer)
492         synchro->state = SIMIX_SRC_TIMEOUT;
493       else
494         synchro->state = SIMIX_DST_TIMEOUT;
495     }
496
497     SIMIX_comm_finish(synchro);
498     return;
499   }
500
501   /* If the synchro has already finish perform the error handling, */
502   /* otherwise set up a waiting timeout on the right side          */
503   if (synchro->state != SIMIX_WAITING && synchro->state != SIMIX_RUNNING) {
504     SIMIX_comm_finish(synchro);
505   } else { /* if (timeout >= 0) { we need a surf sleep action even when there is no timeout, otherwise surf won't tell us when the host fails */
506     sleep = surf_host_sleep(simcall->issuer->host, timeout);
507     sleep->setData(synchro);
508
509     if (simcall->issuer == synchro->comm.src_proc)
510       synchro->comm.src_timeout = sleep;
511     else
512       synchro->comm.dst_timeout = sleep;
513   }
514 }
515
516 void simcall_HANDLER_comm_test(smx_simcall_t simcall, smx_synchro_t synchro)
517 {
518   if(MC_is_active() || MC_record_replay_is_active()){
519     simcall_comm_test__set__result(simcall, synchro->comm.src_proc && synchro->comm.dst_proc);
520     if(simcall_comm_test__get__result(simcall)){
521       synchro->state = SIMIX_DONE;
522       xbt_fifo_push(synchro->simcalls, simcall);
523       SIMIX_comm_finish(synchro);
524     }else{
525       SIMIX_simcall_answer(simcall);
526     }
527     return;
528   }
529
530   simcall_comm_test__set__result(simcall, (synchro->state != SIMIX_WAITING && synchro->state != SIMIX_RUNNING));
531   if (simcall_comm_test__get__result(simcall)) {
532     xbt_fifo_push(synchro->simcalls, simcall);
533     SIMIX_comm_finish(synchro);
534   } else {
535     SIMIX_simcall_answer(simcall);
536   }
537 }
538
539 void simcall_HANDLER_comm_testany(smx_simcall_t simcall, xbt_dynar_t synchros)
540 {
541   unsigned int cursor;
542   smx_synchro_t synchro;
543   simcall_comm_testany__set__result(simcall, -1);
544
545   if (MC_is_active() || MC_record_replay_is_active()){
546     int idx = SIMCALL_GET_MC_VALUE(simcall);
547     if(idx == -1){
548       SIMIX_simcall_answer(simcall);
549     }else{
550       synchro = xbt_dynar_get_as(synchros, idx, smx_synchro_t);
551       simcall_comm_testany__set__result(simcall, idx);
552       xbt_fifo_push(synchro->simcalls, simcall);
553       synchro->state = SIMIX_DONE;
554       SIMIX_comm_finish(synchro);
555     }
556     return;
557   }
558
559   xbt_dynar_foreach(simcall_comm_testany__get__comms(simcall), cursor,synchro) {
560     if (synchro->state != SIMIX_WAITING && synchro->state != SIMIX_RUNNING) {
561       simcall_comm_testany__set__result(simcall, cursor);
562       xbt_fifo_push(synchro->simcalls, simcall);
563       SIMIX_comm_finish(synchro);
564       return;
565     }
566   }
567   SIMIX_simcall_answer(simcall);
568 }
569
570 void simcall_HANDLER_comm_waitany(smx_simcall_t simcall, xbt_dynar_t synchros)
571 {
572   smx_synchro_t synchro;
573   unsigned int cursor = 0;
574
575   if (MC_is_active() || MC_record_replay_is_active()){
576     int idx = SIMCALL_GET_MC_VALUE(simcall);
577     synchro = xbt_dynar_get_as(synchros, idx, smx_synchro_t);
578     xbt_fifo_push(synchro->simcalls, simcall);
579     simcall_comm_waitany__set__result(simcall, idx);
580     synchro->state = SIMIX_DONE;
581     SIMIX_comm_finish(synchro);
582     return;
583   }
584
585   xbt_dynar_foreach(synchros, cursor, synchro){
586     /* associate this simcall to the the synchro */
587     xbt_fifo_push(synchro->simcalls, simcall);
588
589     /* see if the synchro is already finished */
590     if (synchro->state != SIMIX_WAITING && synchro->state != SIMIX_RUNNING){
591       SIMIX_comm_finish(synchro);
592       break;
593     }
594   }
595 }
596
597 void SIMIX_waitany_remove_simcall_from_actions(smx_simcall_t simcall)
598 {
599   smx_synchro_t synchro;
600   unsigned int cursor = 0;
601   xbt_dynar_t synchros = simcall_comm_waitany__get__comms(simcall);
602
603   xbt_dynar_foreach(synchros, cursor, synchro) {
604     xbt_fifo_remove(synchro->simcalls, simcall);
605   }
606 }
607
608 /**
609  *  \brief Starts the simulation of a communication synchro.
610  *  \param synchro the communication synchro
611  */
612 static inline void SIMIX_comm_start(smx_synchro_t synchro)
613 {
614   /* If both the sender and the receiver are already there, start the communication */
615   if (synchro->state == SIMIX_READY) {
616
617     sg_host_t sender = synchro->comm.src_proc->host;
618     sg_host_t receiver = synchro->comm.dst_proc->host;
619
620     XBT_DEBUG("Starting communication %p from '%s' to '%s'", synchro,
621               sg_host_get_name(sender), sg_host_get_name(receiver));
622
623     synchro->comm.surf_comm = surf_network_model_communicate(surf_network_model,
624                                                             sender, receiver,
625                                                             synchro->comm.task_size, synchro->comm.rate);
626
627     synchro->comm.surf_comm->setData(synchro);
628
629     synchro->state = SIMIX_RUNNING;
630
631     /* If a link is failed, detect it immediately */
632     if (synchro->comm.surf_comm->getState() == simgrid::surf::Action::State::failed) {
633       XBT_DEBUG("Communication from '%s' to '%s' failed to start because of a link failure",
634                 sg_host_get_name(sender), sg_host_get_name(receiver));
635       synchro->state = SIMIX_LINK_FAILURE;
636       SIMIX_comm_destroy_internal_actions(synchro);
637     }
638
639     /* If any of the process is suspend, create the synchro but stop its execution,
640        it will be restarted when the sender process resume */
641     if (SIMIX_process_is_suspended(synchro->comm.src_proc) ||
642         SIMIX_process_is_suspended(synchro->comm.dst_proc)) {
643       /* FIXME: check what should happen with the synchro state */
644
645       if (SIMIX_process_is_suspended(synchro->comm.src_proc))
646         XBT_DEBUG("The communication is suspended on startup because src (%s:%s) were suspended since it initiated the communication",
647                   sg_host_get_name(synchro->comm.src_proc->host), synchro->comm.src_proc->name);
648       else
649         XBT_DEBUG("The communication is suspended on startup because dst (%s:%s) were suspended since it initiated the communication",
650                   sg_host_get_name(synchro->comm.dst_proc->host), synchro->comm.dst_proc->name);
651
652       synchro->comm.surf_comm->suspend();
653
654     }
655   }
656 }
657
658 /**
659  * \brief Answers the SIMIX simcalls associated to a communication synchro.
660  * \param synchro a finished communication synchro
661  */
662 void SIMIX_comm_finish(smx_synchro_t synchro)
663 {
664   unsigned int destroy_count = 0;
665   smx_simcall_t simcall;
666
667   while ((simcall = (smx_simcall_t) xbt_fifo_shift(synchro->simcalls))) {
668
669     /* If a waitany simcall is waiting for this synchro to finish, then remove
670        it from the other synchros in the waitany list. Afterwards, get the
671        position of the actual synchro in the waitany dynar and
672        return it as the result of the simcall */
673
674     if (simcall->call == SIMCALL_NONE) //FIXME: maybe a better way to handle this case
675       continue; // if process handling comm is killed
676     if (simcall->call == SIMCALL_COMM_WAITANY) {
677       SIMIX_waitany_remove_simcall_from_actions(simcall);
678       if (!MC_is_active() && !MC_record_replay_is_active())
679         simcall_comm_waitany__set__result(simcall, xbt_dynar_search(simcall_comm_waitany__get__comms(simcall), &synchro));
680     }
681
682     /* If the synchro is still in a rendez-vous point then remove from it */
683     if (synchro->comm.mbox)
684       SIMIX_mbox_remove(synchro->comm.mbox, synchro);
685
686     XBT_DEBUG("SIMIX_comm_finish: synchro state = %d", (int)synchro->state);
687
688     /* Check out for errors */
689
690     if (simcall->issuer->host->isOff()) {
691       simcall->issuer->context->iwannadie = 1;
692       SMX_EXCEPTION(simcall->issuer, host_error, 0, "Host failed");
693     } else
694
695     switch (synchro->state) {
696
697     case SIMIX_DONE:
698       XBT_DEBUG("Communication %p complete!", synchro);
699       SIMIX_comm_copy_data(synchro);
700       break;
701
702     case SIMIX_SRC_TIMEOUT:
703       SMX_EXCEPTION(simcall->issuer, timeout_error, 0,
704                     "Communication timeouted because of sender");
705       break;
706
707     case SIMIX_DST_TIMEOUT:
708       SMX_EXCEPTION(simcall->issuer, timeout_error, 0,
709                     "Communication timeouted because of receiver");
710       break;
711
712     case SIMIX_SRC_HOST_FAILURE:
713       if (simcall->issuer == synchro->comm.src_proc)
714         simcall->issuer->context->iwannadie = 1;
715 //          SMX_EXCEPTION(simcall->issuer, host_error, 0, "Host failed");
716       else
717         SMX_EXCEPTION(simcall->issuer, network_error, 0, "Remote peer failed");
718       break;
719
720     case SIMIX_DST_HOST_FAILURE:
721       if (simcall->issuer == synchro->comm.dst_proc)
722         simcall->issuer->context->iwannadie = 1;
723 //          SMX_EXCEPTION(simcall->issuer, host_error, 0, "Host failed");
724       else
725         SMX_EXCEPTION(simcall->issuer, network_error, 0, "Remote peer failed");
726       break;
727
728     case SIMIX_LINK_FAILURE:
729
730       XBT_DEBUG("Link failure in synchro %p between '%s' and '%s': posting an exception to the issuer: %s (%p) detached:%d",
731                 synchro,
732                 synchro->comm.src_proc ? sg_host_get_name(synchro->comm.src_proc->host) : NULL,
733                 synchro->comm.dst_proc ? sg_host_get_name(synchro->comm.dst_proc->host) : NULL,
734                 simcall->issuer->name, simcall->issuer, synchro->comm.detached);
735       if (synchro->comm.src_proc == simcall->issuer) {
736         XBT_DEBUG("I'm source");
737       } else if (synchro->comm.dst_proc == simcall->issuer) {
738         XBT_DEBUG("I'm dest");
739       } else {
740         XBT_DEBUG("I'm neither source nor dest");
741       }
742       SMX_EXCEPTION(simcall->issuer, network_error, 0, "Link failure");
743       break;
744
745     case SIMIX_CANCELED:
746       if (simcall->issuer == synchro->comm.dst_proc)
747         SMX_EXCEPTION(simcall->issuer, cancel_error, 0,
748                       "Communication canceled by the sender");
749       else
750         SMX_EXCEPTION(simcall->issuer, cancel_error, 0,
751                       "Communication canceled by the receiver");
752       break;
753
754     default:
755       xbt_die("Unexpected synchro state in SIMIX_comm_finish: %d", (int)synchro->state);
756     }
757
758     /* if there is an exception during a waitany or a testany, indicate the position of the failed communication */
759     if (simcall->issuer->doexception) {
760       if (simcall->call == SIMCALL_COMM_WAITANY) {
761         simcall->issuer->running_ctx->exception.value = xbt_dynar_search(simcall_comm_waitany__get__comms(simcall), &synchro);
762       }
763       else if (simcall->call == SIMCALL_COMM_TESTANY) {
764         simcall->issuer->running_ctx->exception.value = xbt_dynar_search(simcall_comm_testany__get__comms(simcall), &synchro);
765       }
766     }
767
768     if (simcall->issuer->host->isOff()) {
769       simcall->issuer->context->iwannadie = 1;
770     }
771
772     simcall->issuer->waiting_synchro = NULL;
773     xbt_fifo_remove(simcall->issuer->comms, synchro);
774     if(synchro->comm.detached){
775       if(simcall->issuer == synchro->comm.src_proc){
776         if(synchro->comm.dst_proc)
777           xbt_fifo_remove(synchro->comm.dst_proc->comms, synchro);
778       }
779       if(simcall->issuer == synchro->comm.dst_proc){
780         if(synchro->comm.src_proc)
781           xbt_fifo_remove(synchro->comm.src_proc->comms, synchro);
782       }
783     }
784     SIMIX_simcall_answer(simcall);
785     destroy_count++;
786   }
787
788   while (destroy_count-- > 0)
789     SIMIX_comm_destroy(synchro);
790 }
791
792 /**
793  * \brief This function is called when a Surf communication synchro is finished.
794  * \param synchro the corresponding Simix communication
795  */
796 void SIMIX_post_comm(smx_synchro_t synchro)
797 {
798   /* Update synchro state */
799   if (synchro->comm.src_timeout &&
800       synchro->comm.src_timeout->getState() == simgrid::surf::Action::State::done)
801     synchro->state = SIMIX_SRC_TIMEOUT;
802   else if (synchro->comm.dst_timeout &&
803     synchro->comm.dst_timeout->getState() == simgrid::surf::Action::State::done)
804     synchro->state = SIMIX_DST_TIMEOUT;
805   else if (synchro->comm.src_timeout &&
806     synchro->comm.src_timeout->getState() == simgrid::surf::Action::State::failed)
807     synchro->state = SIMIX_SRC_HOST_FAILURE;
808   else if (synchro->comm.dst_timeout &&
809       synchro->comm.dst_timeout->getState() == simgrid::surf::Action::State::failed)
810     synchro->state = SIMIX_DST_HOST_FAILURE;
811   else if (synchro->comm.surf_comm &&
812     synchro->comm.surf_comm->getState() == simgrid::surf::Action::State::failed) {
813     XBT_DEBUG("Puta madre. Surf says that the link broke");
814     synchro->state = SIMIX_LINK_FAILURE;
815   } else
816     synchro->state = SIMIX_DONE;
817
818   XBT_DEBUG("SIMIX_post_comm: comm %p, state %d, src_proc %p, dst_proc %p, detached: %d",
819             synchro, (int)synchro->state, synchro->comm.src_proc, synchro->comm.dst_proc, synchro->comm.detached);
820
821   /* destroy the surf actions associated with the Simix communication */
822   SIMIX_comm_destroy_internal_actions(synchro);
823
824   /* if there are simcalls associated with the synchro, then answer them */
825   if (xbt_fifo_size(synchro->simcalls)) {
826     SIMIX_comm_finish(synchro);
827   }
828 }
829
830 void SIMIX_comm_cancel(smx_synchro_t synchro)
831 {
832   /* if the synchro is a waiting state means that it is still in a mbox */
833   /* so remove from it and delete it */
834   if (synchro->state == SIMIX_WAITING) {
835     SIMIX_mbox_remove(synchro->comm.mbox, synchro);
836     synchro->state = SIMIX_CANCELED;
837   }
838   else if (!MC_is_active() /* when running the MC there are no surf actions */
839            && !MC_record_replay_is_active()
840            && (synchro->state == SIMIX_READY || synchro->state == SIMIX_RUNNING)) {
841
842     synchro->comm.surf_comm->cancel();
843   }
844 }
845
846 void SIMIX_comm_suspend(smx_synchro_t synchro)
847 {
848   /*FIXME: shall we suspend also the timeout synchro? */
849   if (synchro->comm.surf_comm)
850     synchro->comm.surf_comm->suspend();
851   /* in the other case, the action will be suspended on creation, in SIMIX_comm_start() */
852 }
853
854 void SIMIX_comm_resume(smx_synchro_t synchro)
855 {
856   /*FIXME: check what happen with the timeouts */
857   if (synchro->comm.surf_comm)
858     synchro->comm.surf_comm->resume();
859   /* in the other case, the synchro were not really suspended yet, see SIMIX_comm_suspend() and SIMIX_comm_start() */
860 }
861
862
863 /************* synchro Getters **************/
864
865 /**
866  *  \brief get the amount remaining from the communication
867  *  \param synchro The communication
868  */
869 double SIMIX_comm_get_remains(smx_synchro_t synchro)
870 {
871   if(!synchro)
872     return 0;
873
874   double remains;
875   switch (synchro->state) {
876
877   case SIMIX_RUNNING:
878     remains = synchro->comm.surf_comm->getRemains();
879     break;
880
881   case SIMIX_WAITING:
882   case SIMIX_READY:
883     remains = 0; /*FIXME: check what should be returned */
884     break;
885
886   default:
887     remains = 0; /*FIXME: is this correct? */
888     break;
889   }
890   return remains;
891 }
892
893 e_smx_state_t SIMIX_comm_get_state(smx_synchro_t synchro)
894 {
895   return synchro->state;
896 }
897
898 /**
899  *  \brief Return the user data associated to the sender of the communication
900  *  \param synchro The communication
901  *  \return the user data
902  */
903 void* SIMIX_comm_get_src_data(smx_synchro_t synchro)
904 {
905   return synchro->comm.src_data;
906 }
907
908 /**
909  *  \brief Return the user data associated to the receiver of the communication
910  *  \param synchro The communication
911  *  \return the user data
912  */
913 void* SIMIX_comm_get_dst_data(smx_synchro_t synchro)
914 {
915   return synchro->comm.dst_data;
916 }
917
918 smx_process_t SIMIX_comm_get_src_proc(smx_synchro_t synchro)
919 {
920   return synchro->comm.src_proc;
921 }
922
923 smx_process_t SIMIX_comm_get_dst_proc(smx_synchro_t synchro)
924 {
925   return synchro->comm.dst_proc;
926 }
927
928 /******************************************************************************/
929 /*                    SIMIX_comm_copy_data callbacks                       */
930 /******************************************************************************/
931 static void (*SIMIX_comm_copy_data_callback) (smx_synchro_t, void*, size_t) =
932   &SIMIX_comm_copy_pointer_callback;
933
934 void
935 SIMIX_comm_set_copy_data_callback(void (*callback) (smx_synchro_t, void*, size_t))
936 {
937   SIMIX_comm_copy_data_callback = callback;
938 }
939
940 void SIMIX_comm_copy_pointer_callback(smx_synchro_t comm, void* buff, size_t buff_size)
941 {
942   xbt_assert((buff_size == sizeof(void *)), "Cannot copy %zu bytes: must be sizeof(void*)", buff_size);
943   *(void **) (comm->comm.dst_buff) = buff;
944 }
945
946 void SIMIX_comm_copy_buffer_callback(smx_synchro_t comm, void* buff, size_t buff_size)
947 {
948   XBT_DEBUG("Copy the data over");
949   memcpy(comm->comm.dst_buff, buff, buff_size);
950   if (comm->comm.detached) { // if this is a detached send, the source buffer was duplicated by SMPI sender to make the original buffer available to the application ASAP
951     xbt_free(buff);
952     comm->comm.src_buff = NULL;
953   }
954 }
955
956
957 /**
958  *  \brief Copy the communication data from the sender's buffer to the receiver's one
959  *  \param comm The communication
960  */
961 void SIMIX_comm_copy_data(smx_synchro_t comm)
962 {
963   size_t buff_size = comm->comm.src_buff_size;
964   /* If there is no data to be copy then return */
965   if (!comm->comm.src_buff || !comm->comm.dst_buff || comm->comm.copied)
966     return;
967
968   XBT_DEBUG("Copying comm %p data from %s (%p) -> %s (%p) (%zu bytes)",
969             comm,
970             comm->comm.src_proc ? sg_host_get_name(comm->comm.src_proc->host) : "a finished process",
971             comm->comm.src_buff,
972             comm->comm.dst_proc ? sg_host_get_name(comm->comm.dst_proc->host) : "a finished process",
973             comm->comm.dst_buff, buff_size);
974
975   /* Copy at most dst_buff_size bytes of the message to receiver's buffer */
976   if (comm->comm.dst_buff_size)
977     buff_size = MIN(buff_size, *(comm->comm.dst_buff_size));
978
979   /* Update the receiver's buffer size to the copied amount */
980   if (comm->comm.dst_buff_size)
981     *comm->comm.dst_buff_size = buff_size;
982
983   if (buff_size > 0){
984       if(comm->comm.copy_data_fun)
985         comm->comm.copy_data_fun (comm, comm->comm.src_buff, buff_size);
986       else
987         SIMIX_comm_copy_data_callback (comm, comm->comm.src_buff, buff_size);
988   }
989
990
991   /* Set the copied flag so we copy data only once */
992   /* (this function might be called from both communication ends) */
993   comm->comm.copied = 1;
994 }