Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
improve an error message
[simgrid.git] / src / simix / smx_network.cpp
1 /* Copyright (c) 2009-2016. The SimGrid Team.  All rights reserved.         */
2
3 /* This program is free software; you can redistribute it and/or modify it
4  * under the terms of the license (GNU LGPL) which comes with this package. */
5
6 #include "src/surf/surf_interface.hpp"
7 #include "src/simix/smx_private.h"
8 #include "xbt/log.h"
9 #include "mc/mc.h"
10 #include "src/mc/mc_replay.h"
11 #include "xbt/dict.h"
12 #include "simgrid/s4u/mailbox.hpp"
13
14 #include "src/simix/SynchroComm.hpp"
15
16 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(simix_network, simix, "SIMIX network-related synchronization");
17
18 static void SIMIX_mbox_free(void *data);
19 static xbt_dict_t mailboxes = xbt_dict_new_homogeneous(SIMIX_mbox_free);
20
21 static void SIMIX_waitany_remove_simcall_from_actions(smx_simcall_t simcall);
22 static void SIMIX_comm_copy_data(smx_synchro_t comm);
23 static smx_synchro_t SIMIX_comm_new(e_smx_comm_type_t type);
24 static inline void SIMIX_mbox_push(smx_mailbox_t mbox, smx_synchro_t comm);
25 static smx_synchro_t _find_matching_comm(std::deque<smx_synchro_t> *deque, e_smx_comm_type_t type,
26     int (*match_fun)(void *, void *,smx_synchro_t), void *user_data, smx_synchro_t my_synchro, bool remove_matching);
27 static void SIMIX_comm_start(smx_synchro_t synchro);
28
29 void SIMIX_mailbox_exit(void)
30 {
31   xbt_dict_free(&mailboxes);
32 }
33
34 /******************************************************************************/
35 /*                           Rendez-Vous Points                               */
36 /******************************************************************************/
37
38 smx_mailbox_t SIMIX_mbox_create(const char *name)
39 {
40   xbt_assert(name, "Mailboxes must have a name");
41   /* two processes may have pushed the same mbox_create simcall at the same time */
42   smx_mailbox_t mbox = (smx_mailbox_t) xbt_dict_get_or_null(mailboxes, name);
43
44   if (!mbox) {
45     mbox = xbt_new0(s_smx_mailbox_t, 1);
46     mbox->name = xbt_strdup(name);
47     mbox->comm_queue = new std::deque<smx_synchro_t>();
48     mbox->done_comm_queue = nullptr; // Allocated on need only
49     mbox->permanent_receiver=NULL;
50
51     XBT_DEBUG("Creating a mailbox at %p with name %s", mbox, name);
52     xbt_dict_set(mailboxes, mbox->name, mbox, NULL);
53   }
54   return mbox;
55 }
56
57 void SIMIX_mbox_free(void *data)
58 {
59   XBT_DEBUG("mbox free %p", data);
60   smx_mailbox_t mbox = (smx_mailbox_t) data;
61   xbt_free(mbox->name);
62   delete mbox->comm_queue;
63   delete mbox->done_comm_queue;
64
65   xbt_free(mbox);
66 }
67
68 smx_mailbox_t SIMIX_mbox_get_by_name(const char *name)
69 {
70   return (smx_mailbox_t) xbt_dict_get_or_null(mailboxes, name);
71 }
72
73 smx_synchro_t SIMIX_mbox_get_head(smx_mailbox_t mbox)
74 {
75   return mbox->comm_queue->empty()? nullptr:mbox->comm_queue->front();
76 }
77
78 /**
79  *  \brief get the receiver (process associated to the mailbox)
80  *  \param mbox The rendez-vous point
81  *  \return process The receiving process (NULL if not set)
82  */
83 smx_process_t SIMIX_mbox_get_receiver(smx_mailbox_t mbox)
84 {
85   return mbox->permanent_receiver;
86 }
87
88 /**
89  *  \brief set the receiver of the rendez vous point to allow eager sends
90  *  \param mbox The rendez-vous point
91  *  \param process The receiving process
92  */
93 void SIMIX_mbox_set_receiver(smx_mailbox_t mbox, smx_process_t process)
94 {
95   mbox->permanent_receiver=process;
96   if (mbox->done_comm_queue == nullptr)
97     mbox->done_comm_queue = new std::deque<smx_synchro_t>();
98 }
99
100 /**
101  *  \brief Pushes a communication synchro into a rendez-vous point
102  *  \param mbox The mailbox
103  *  \param comm The communication synchro
104  */
105 static inline void SIMIX_mbox_push(smx_mailbox_t mbox, smx_synchro_t synchro)
106 {
107   simgrid::simix::Comm *comm = static_cast<simgrid::simix::Comm*>(synchro);
108
109   mbox->comm_queue->push_back(comm);
110   comm->mbox = mbox;
111 }
112
113 /**
114  *  \brief Removes a communication synchro from a rendez-vous point
115  *  \param mbox The rendez-vous point
116  *  \param comm The communication synchro
117  */
118 void SIMIX_mbox_remove(smx_mailbox_t mbox, smx_synchro_t synchro)
119 {
120   simgrid::simix::Comm *comm = static_cast<simgrid::simix::Comm*>(synchro);
121
122   comm->mbox = NULL;
123   for (auto it = mbox->comm_queue->begin(); it != mbox->comm_queue->end(); it++)
124     if (*it == comm) {
125       mbox->comm_queue->erase(it);
126       return;
127     }
128   xbt_die("Cannot remove this comm that is not part of the mailbox");
129 }
130
131 /**
132  *  \brief Checks if there is a communication synchro queued in a deque matching our needs
133  *  \param type The type of communication we are looking for (comm_send, comm_recv)
134  *  \return The communication synchro if found, NULL otherwise
135  */
136 static smx_synchro_t _find_matching_comm(std::deque<smx_synchro_t> *deque, e_smx_comm_type_t type,
137     int (*match_fun)(void *, void *,smx_synchro_t), void *this_user_data, smx_synchro_t my_synchro, bool remove_matching)
138 {
139   void* other_user_data = NULL;
140
141   for(auto it = deque->begin(); it != deque->end(); it++){
142     smx_synchro_t synchro = *it;
143     simgrid::simix::Comm *comm = static_cast<simgrid::simix::Comm*>(synchro);
144
145     if (comm->type == SIMIX_COMM_SEND) {
146       other_user_data = comm->src_data;
147     } else if (comm->type == SIMIX_COMM_RECEIVE) {
148       other_user_data = comm->dst_data;
149     }
150     if (comm->type == type &&
151         (!      match_fun ||       match_fun(this_user_data,  other_user_data, synchro)) &&
152         (!comm->match_fun || comm->match_fun(other_user_data, this_user_data,  my_synchro))) {
153       XBT_DEBUG("Found a matching communication synchro %p", comm);
154       if (remove_matching)
155         deque->erase(it);
156       comm->refcount++;
157 #if HAVE_MC
158       comm->mbox_cpy = comm->mbox;
159 #endif
160       comm->mbox = NULL;
161       return comm;
162     }
163     XBT_DEBUG("Sorry, communication synchro %p does not match our needs:"
164               " its type is %d but we are looking for a comm of type %d (or maybe the filtering didn't match)",
165               comm, (int)comm->type, (int)type);
166   }
167   XBT_DEBUG("No matching communication synchro found");
168   return NULL;
169 }
170
171 /******************************************************************************/
172 /*                          Communication synchros                            */
173 /******************************************************************************/
174
175 /**
176  *  \brief Creates a new communicate synchro
177  *  \param type The direction of communication (comm_send, comm_recv)
178  *  \return The new communicate synchro
179  */
180 smx_synchro_t SIMIX_comm_new(e_smx_comm_type_t type)
181 {
182   simgrid::simix::Comm *comm = new simgrid::simix::Comm();
183   comm->state = SIMIX_WAITING;
184   comm->type = type;
185   comm->refcount = 1;
186   comm->src_data=NULL;
187   comm->dst_data=NULL;
188
189   XBT_DEBUG("Create communicate synchro %p", comm);
190
191   return comm;
192 }
193
194 /**
195  *  \brief Destroy a communicate synchro
196  *  \param synchro The communicate synchro to be destroyed
197  */
198 void SIMIX_comm_destroy(smx_synchro_t synchro)
199 {
200   simgrid::simix::Comm *comm = static_cast<simgrid::simix::Comm*>(synchro);
201
202   XBT_DEBUG("Destroy synchro %p (refcount: %d), state: %d", comm, comm->refcount, (int)comm->state);
203
204   if (comm->refcount <= 0) {
205     xbt_backtrace_display_current();
206     xbt_die("This comm has a negative refcount! You must not call test() or wait() more than once on a given communication.");
207   }
208   comm->refcount--;
209   if (comm->refcount > 0)
210       return;
211   XBT_DEBUG("Really free communication %p; refcount is now %d", comm, comm->refcount);
212
213   SIMIX_comm_destroy_internal_actions(synchro);
214
215   if (comm->detached && comm->state != SIMIX_DONE) {
216     /* the communication has failed and was detached:
217      * we have to free the buffer */
218     if (comm->clean_fun) {
219       comm->clean_fun(comm->src_buff);
220     }
221     comm->src_buff = NULL;
222   }
223
224   if(comm->mbox)
225     SIMIX_mbox_remove(comm->mbox, comm);
226
227   delete comm;
228 }
229
230 void SIMIX_comm_destroy_internal_actions(smx_synchro_t synchro)
231 {
232   simgrid::simix::Comm *comm = static_cast<simgrid::simix::Comm*>(synchro);
233   if (comm->surf_comm){
234     comm->surf_comm->unref();
235     comm->surf_comm = NULL;
236   }
237
238   if (comm->src_timeout){
239     comm->src_timeout->unref();
240     comm->src_timeout = NULL;
241   }
242
243   if (comm->dst_timeout){
244     comm->dst_timeout->unref();
245     comm->dst_timeout = NULL;
246   }
247 }
248
249 void simcall_HANDLER_comm_send(smx_simcall_t simcall, smx_process_t src, smx_mailbox_t mbox,
250                                   double task_size, double rate,
251                                   void *src_buff, size_t src_buff_size,
252                                   int (*match_fun)(void *, void *,smx_synchro_t),
253                                   void (*copy_data_fun)(smx_synchro_t, void*, size_t),
254           void *data, double timeout){
255   smx_synchro_t comm = simcall_HANDLER_comm_isend(simcall, src, mbox, task_size, rate,
256                            src_buff, src_buff_size, match_fun, NULL, copy_data_fun,
257                data, 0);
258   SIMCALL_SET_MC_VALUE(simcall, 0);
259   simcall_HANDLER_comm_wait(simcall, comm, timeout);
260 }
261 smx_synchro_t simcall_HANDLER_comm_isend(smx_simcall_t simcall, smx_process_t src_proc, smx_mailbox_t mbox,
262                                   double task_size, double rate,
263                                   void *src_buff, size_t src_buff_size,
264                                   int (*match_fun)(void *, void *,smx_synchro_t),
265                                   void (*clean_fun)(void *), // used to free the synchro in case of problem after a detached send
266                                   void (*copy_data_fun)(smx_synchro_t, void*, size_t),// used to copy data if not default one
267                           void *data, int detached)
268 {
269   XBT_DEBUG("send from %p", mbox);
270
271   /* Prepare a synchro describing us, so that it gets passed to the user-provided filter of other side */
272   smx_synchro_t this_synchro = SIMIX_comm_new(SIMIX_COMM_SEND);
273
274   /* Look for communication synchro matching our needs. We also provide a description of
275    * ourself so that the other side also gets a chance of choosing if it wants to match with us.
276    *
277    * If it is not found then push our communication into the rendez-vous point */
278   smx_synchro_t other_synchro =
279       _find_matching_comm(mbox->comm_queue, SIMIX_COMM_RECEIVE, match_fun, data, this_synchro, /*remove_matching*/true);
280   simgrid::simix::Comm *other_comm = static_cast<simgrid::simix::Comm*>(other_synchro);
281
282
283   if (!other_synchro) {
284     other_synchro = this_synchro;
285     other_comm = static_cast<simgrid::simix::Comm*>(other_synchro);
286
287     if (mbox->permanent_receiver!=NULL){
288       //this mailbox is for small messages, which have to be sent right now
289       other_synchro->state = SIMIX_READY;
290       other_comm->dst_proc=mbox->permanent_receiver;
291       other_comm->refcount++;
292       mbox->done_comm_queue->push_back(other_synchro);
293       other_comm->mbox=mbox;
294       XBT_DEBUG("pushing a message into the permanent receive fifo %p, comm %p", mbox, &(other_comm));
295
296     }else{
297       SIMIX_mbox_push(mbox, this_synchro);
298     }
299   } else {
300     XBT_DEBUG("Receive already pushed");
301
302     SIMIX_comm_destroy(this_synchro);
303
304     other_comm->state = SIMIX_READY;
305     other_comm->type = SIMIX_COMM_READY;
306
307   }
308   xbt_fifo_push(src_proc->comms, other_synchro);
309
310   /* if the communication synchro is detached then decrease the refcount
311    * by one, so it will be eliminated by the receiver's destroy call */
312   if (detached) {
313     other_comm->detached = 1;
314     other_comm->refcount--;
315     other_comm->clean_fun = clean_fun;
316   } else {
317     other_comm->clean_fun = NULL;
318   }
319
320   /* Setup the communication synchro */
321   other_comm->src_proc = src_proc;
322   other_comm->task_size = task_size;
323   other_comm->rate = rate;
324   other_comm->src_buff = src_buff;
325   other_comm->src_buff_size = src_buff_size;
326   other_comm->src_data = data;
327
328   other_comm->match_fun = match_fun;
329   other_comm->copy_data_fun = copy_data_fun;
330
331
332   if (MC_is_active() || MC_record_replay_is_active()) {
333     other_comm->state = SIMIX_RUNNING;
334     return (detached ? NULL : other_comm);
335   }
336
337   SIMIX_comm_start(other_comm);
338   return (detached ? NULL : other_comm);
339 }
340
341 void simcall_HANDLER_comm_recv(smx_simcall_t simcall, smx_process_t receiver, smx_mailbox_t mbox,
342                          void *dst_buff, size_t *dst_buff_size,
343                          int (*match_fun)(void *, void *, smx_synchro_t),
344                          void (*copy_data_fun)(smx_synchro_t, void*, size_t),
345                          void *data, double timeout, double rate)
346 {
347   smx_synchro_t comm = SIMIX_comm_irecv(receiver, mbox, dst_buff, dst_buff_size, match_fun, copy_data_fun, data, rate);
348   SIMCALL_SET_MC_VALUE(simcall, 0);
349   simcall_HANDLER_comm_wait(simcall, comm, timeout);
350 }
351
352 smx_synchro_t simcall_HANDLER_comm_irecv(smx_simcall_t simcall, smx_process_t receiver, smx_mailbox_t mbox,
353     void *dst_buff, size_t *dst_buff_size,
354     int (*match_fun)(void *, void *, smx_synchro_t),
355     void (*copy_data_fun)(smx_synchro_t, void*, size_t),
356     void *data, double rate)
357 {
358   return SIMIX_comm_irecv(receiver, mbox, dst_buff, dst_buff_size, match_fun, copy_data_fun, data, rate);
359 }
360
361 smx_synchro_t SIMIX_comm_irecv(smx_process_t dst_proc, smx_mailbox_t mbox, void *dst_buff, size_t *dst_buff_size,
362     int (*match_fun)(void *, void *, smx_synchro_t),
363     void (*copy_data_fun)(smx_synchro_t, void*, size_t), // used to copy data if not default one
364     void *data, double rate)
365 {
366   XBT_DEBUG("recv from %p %p", mbox, mbox->comm_queue);
367   smx_synchro_t this_synchro = SIMIX_comm_new(SIMIX_COMM_RECEIVE);
368
369   smx_synchro_t other_synchro;
370   //communication already done, get it inside the fifo of completed comms
371   if (mbox->permanent_receiver && ! mbox->done_comm_queue->empty()) {
372
373     XBT_DEBUG("We have a comm that has probably already been received, trying to match it, to skip the communication");
374     //find a match in the already received fifo
375     other_synchro = _find_matching_comm(mbox->done_comm_queue, SIMIX_COMM_SEND, match_fun, data, this_synchro,/*remove_matching*/true);
376     //if not found, assume the receiver came first, register it to the mailbox in the classical way
377     if (!other_synchro)  {
378       XBT_DEBUG("We have messages in the permanent receive list, but not the one we are looking for, pushing request into fifo");
379       other_synchro = this_synchro;
380       SIMIX_mbox_push(mbox, this_synchro);
381     } else {
382       simgrid::simix::Comm *other_comm = static_cast<simgrid::simix::Comm*>(other_synchro);
383
384       if(other_comm->surf_comm && SIMIX_comm_get_remains(other_comm)==0.0) {
385         XBT_DEBUG("comm %p has been already sent, and is finished, destroy it",other_comm);
386         other_comm->state = SIMIX_DONE;
387         other_comm->type = SIMIX_COMM_DONE;
388         other_comm->mbox = NULL;
389       }
390       other_comm->refcount--;
391       SIMIX_comm_destroy(this_synchro);
392     }
393   } else {
394     /* Prepare a synchro describing us, so that it gets passed to the user-provided filter of other side */
395
396     /* Look for communication synchro matching our needs. We also provide a description of
397      * ourself so that the other side also gets a chance of choosing if it wants to match with us.
398      *
399      * If it is not found then push our communication into the rendez-vous point */
400     other_synchro = _find_matching_comm(mbox->comm_queue, SIMIX_COMM_SEND, match_fun, data, this_synchro,/*remove_matching*/true);
401
402     if (!other_synchro) {
403       XBT_DEBUG("Receive pushed first %zu", mbox->comm_queue->size());
404       other_synchro = this_synchro;
405       SIMIX_mbox_push(mbox, this_synchro);
406     } else {
407       SIMIX_comm_destroy(this_synchro);
408       simgrid::simix::Comm *other_comm = static_cast<simgrid::simix::Comm*>(other_synchro);
409
410       other_comm->state = SIMIX_READY;
411       other_comm->type = SIMIX_COMM_READY;
412     }
413     xbt_fifo_push(dst_proc->comms, other_synchro);
414   }
415
416   /* Setup communication synchro */
417   simgrid::simix::Comm *other_comm = static_cast<simgrid::simix::Comm*>(other_synchro);
418   other_comm->dst_proc = dst_proc;
419   other_comm->dst_buff = dst_buff;
420   other_comm->dst_buff_size = dst_buff_size;
421   other_comm->dst_data = data;
422
423   if (rate != -1.0 && (other_comm->rate == -1.0 || rate < other_comm->rate))
424     other_comm->rate = rate;
425
426   other_comm->match_fun = match_fun;
427   other_comm->copy_data_fun = copy_data_fun;
428
429   if (MC_is_active() || MC_record_replay_is_active()) {
430     other_synchro->state = SIMIX_RUNNING;
431     return other_synchro;
432   }
433
434   SIMIX_comm_start(other_synchro);
435   return other_synchro;
436 }
437
438 smx_synchro_t simcall_HANDLER_comm_iprobe(smx_simcall_t simcall, smx_mailbox_t mbox,
439                                    int type, int src, int tag,
440                                    int (*match_fun)(void *, void *, smx_synchro_t),
441                                    void *data){
442   return SIMIX_comm_iprobe(simcall->issuer, mbox, type, src, tag, match_fun, data);
443 }
444
445 smx_synchro_t SIMIX_comm_iprobe(smx_process_t dst_proc, smx_mailbox_t mbox, int type, int src,
446                               int tag, int (*match_fun)(void *, void *, smx_synchro_t), void *data)
447 {
448   XBT_DEBUG("iprobe from %p %p", mbox, mbox->comm_queue);
449   smx_synchro_t this_synchro;
450   int smx_type;
451   if(type == 1){
452     this_synchro=SIMIX_comm_new(SIMIX_COMM_SEND);
453     smx_type = SIMIX_COMM_RECEIVE;
454   } else{
455     this_synchro=SIMIX_comm_new(SIMIX_COMM_RECEIVE);
456     smx_type = SIMIX_COMM_SEND;
457   } 
458   smx_synchro_t other_synchro=NULL;
459   if(mbox->permanent_receiver && ! mbox->done_comm_queue->empty()){
460     XBT_DEBUG("first check in the permanent recv mailbox, to see if we already got something");
461     other_synchro =
462         _find_matching_comm(mbox->done_comm_queue, (e_smx_comm_type_t) smx_type, match_fun, data, this_synchro,/*remove_matching*/false);
463   }
464   if (!other_synchro){
465     XBT_DEBUG("check if we have more luck in the normal mailbox");
466     other_synchro = _find_matching_comm(mbox->comm_queue, (e_smx_comm_type_t) smx_type, match_fun, data, this_synchro,/*remove_matching*/false);
467   }
468
469   if(other_synchro) {
470     simgrid::simix::Comm *other_comm = static_cast<simgrid::simix::Comm*>(other_synchro);
471     other_comm->refcount--;
472   }
473
474   SIMIX_comm_destroy(this_synchro);
475   return other_synchro;
476 }
477
478 void simcall_HANDLER_comm_wait(smx_simcall_t simcall, smx_synchro_t synchro, double timeout)
479 {
480   /* the simcall may be a wait, a send or a recv */
481   surf_action_t sleep;
482
483   /* Associate this simcall to the wait synchro */
484   XBT_DEBUG("simcall_HANDLER_comm_wait, %p", synchro);
485
486   xbt_fifo_push(synchro->simcalls, simcall);
487   simcall->issuer->waiting_synchro = synchro;
488
489   if (MC_is_active() || MC_record_replay_is_active()) {
490     int idx = SIMCALL_GET_MC_VALUE(simcall);
491     if (idx == 0) {
492       synchro->state = SIMIX_DONE;
493     } else {
494       /* If we reached this point, the wait simcall must have a timeout */
495       /* Otherwise it shouldn't be enabled and executed by the MC */
496       if (timeout == -1)
497         THROW_IMPOSSIBLE;
498
499       simgrid::simix::Comm *comm = static_cast<simgrid::simix::Comm*>(synchro);
500       if (comm->src_proc == simcall->issuer)
501         comm->state = SIMIX_SRC_TIMEOUT;
502       else
503         comm->state = SIMIX_DST_TIMEOUT;
504     }
505
506     SIMIX_comm_finish(synchro);
507     return;
508   }
509
510   /* If the synchro has already finish perform the error handling, */
511   /* otherwise set up a waiting timeout on the right side          */
512   if (synchro->state != SIMIX_WAITING && synchro->state != SIMIX_RUNNING) {
513     SIMIX_comm_finish(synchro);
514   } else { /* if (timeout >= 0) { we need a surf sleep action even when there is no timeout, otherwise surf won't tell us when the host fails */
515     sleep = surf_host_sleep(simcall->issuer->host, timeout);
516     sleep->setData(synchro);
517
518     simgrid::simix::Comm *comm = static_cast<simgrid::simix::Comm*>(synchro);
519     if (simcall->issuer == comm->src_proc)
520       comm->src_timeout = sleep;
521     else
522       comm->dst_timeout = sleep;
523   }
524 }
525
526 void simcall_HANDLER_comm_test(smx_simcall_t simcall, smx_synchro_t synchro)
527 {
528   simgrid::simix::Comm *comm = static_cast<simgrid::simix::Comm*>(synchro);
529
530   if (MC_is_active() || MC_record_replay_is_active()){
531     simcall_comm_test__set__result(simcall, comm->src_proc && comm->dst_proc);
532     if (simcall_comm_test__get__result(simcall)){
533       synchro->state = SIMIX_DONE;
534       xbt_fifo_push(synchro->simcalls, simcall);
535       SIMIX_comm_finish(synchro);
536     } else {
537       SIMIX_simcall_answer(simcall);
538     }
539     return;
540   }
541
542   simcall_comm_test__set__result(simcall, (synchro->state != SIMIX_WAITING && synchro->state != SIMIX_RUNNING));
543   if (simcall_comm_test__get__result(simcall)) {
544     xbt_fifo_push(synchro->simcalls, simcall);
545     SIMIX_comm_finish(synchro);
546   } else {
547     SIMIX_simcall_answer(simcall);
548   }
549 }
550
551 void simcall_HANDLER_comm_testany(smx_simcall_t simcall, xbt_dynar_t synchros)
552 {
553   unsigned int cursor;
554   smx_synchro_t synchro;
555   simcall_comm_testany__set__result(simcall, -1);
556
557   if (MC_is_active() || MC_record_replay_is_active()){
558     int idx = SIMCALL_GET_MC_VALUE(simcall);
559     if(idx == -1){
560       SIMIX_simcall_answer(simcall);
561     }else{
562       synchro = xbt_dynar_get_as(synchros, idx, smx_synchro_t);
563       simcall_comm_testany__set__result(simcall, idx);
564       xbt_fifo_push(synchro->simcalls, simcall);
565       synchro->state = SIMIX_DONE;
566       SIMIX_comm_finish(synchro);
567     }
568     return;
569   }
570
571   xbt_dynar_foreach(simcall_comm_testany__get__comms(simcall), cursor,synchro) {
572     if (synchro->state != SIMIX_WAITING && synchro->state != SIMIX_RUNNING) {
573       simcall_comm_testany__set__result(simcall, cursor);
574       xbt_fifo_push(synchro->simcalls, simcall);
575       SIMIX_comm_finish(synchro);
576       return;
577     }
578   }
579   SIMIX_simcall_answer(simcall);
580 }
581
582 void simcall_HANDLER_comm_waitany(smx_simcall_t simcall, xbt_dynar_t synchros)
583 {
584   smx_synchro_t synchro;
585   unsigned int cursor = 0;
586
587   if (MC_is_active() || MC_record_replay_is_active()){
588     int idx = SIMCALL_GET_MC_VALUE(simcall);
589     synchro = xbt_dynar_get_as(synchros, idx, smx_synchro_t);
590     xbt_fifo_push(synchro->simcalls, simcall);
591     simcall_comm_waitany__set__result(simcall, idx);
592     synchro->state = SIMIX_DONE;
593     SIMIX_comm_finish(synchro);
594     return;
595   }
596
597   xbt_dynar_foreach(synchros, cursor, synchro){
598     /* associate this simcall to the the synchro */
599     xbt_fifo_push(synchro->simcalls, simcall);
600
601     /* see if the synchro is already finished */
602     if (synchro->state != SIMIX_WAITING && synchro->state != SIMIX_RUNNING){
603       SIMIX_comm_finish(synchro);
604       break;
605     }
606   }
607 }
608
609 void SIMIX_waitany_remove_simcall_from_actions(smx_simcall_t simcall)
610 {
611   smx_synchro_t synchro;
612   unsigned int cursor = 0;
613   xbt_dynar_t synchros = simcall_comm_waitany__get__comms(simcall);
614
615   xbt_dynar_foreach(synchros, cursor, synchro)
616     xbt_fifo_remove(synchro->simcalls, simcall);
617 }
618
619 /**
620  *  \brief Starts the simulation of a communication synchro.
621  *  \param synchro the communication synchro
622  */
623 static inline void SIMIX_comm_start(smx_synchro_t synchro)
624 {
625   simgrid::simix::Comm *comm = static_cast<simgrid::simix::Comm*>(synchro);
626
627   /* If both the sender and the receiver are already there, start the communication */
628   if (synchro->state == SIMIX_READY) {
629
630     sg_host_t sender   = comm->src_proc->host;
631     sg_host_t receiver = comm->dst_proc->host;
632
633     XBT_DEBUG("Starting communication %p from '%s' to '%s'", synchro, sg_host_get_name(sender), sg_host_get_name(receiver));
634
635     comm->surf_comm = surf_network_model_communicate(surf_network_model, sender, receiver, comm->task_size, comm->rate);
636     comm->surf_comm->setData(synchro);
637     comm->state = SIMIX_RUNNING;
638
639     /* If a link is failed, detect it immediately */
640     if (comm->surf_comm->getState() == simgrid::surf::Action::State::failed) {
641       XBT_DEBUG("Communication from '%s' to '%s' failed to start because of a link failure",
642                 sg_host_get_name(sender), sg_host_get_name(receiver));
643       comm->state = SIMIX_LINK_FAILURE;
644       SIMIX_comm_destroy_internal_actions(synchro);
645     }
646
647     /* If any of the process is suspend, create the synchro but stop its execution,
648        it will be restarted when the sender process resume */
649     if (SIMIX_process_is_suspended(comm->src_proc) ||
650         SIMIX_process_is_suspended(comm->dst_proc)) {
651       /* FIXME: check what should happen with the synchro state */
652
653       if (SIMIX_process_is_suspended(comm->src_proc))
654         XBT_DEBUG("The communication is suspended on startup because src (%s:%s) were suspended since it initiated the communication",
655                   sg_host_get_name(comm->src_proc->host), comm->src_proc->name);
656       else
657         XBT_DEBUG("The communication is suspended on startup because dst (%s:%s) were suspended since it initiated the communication",
658                   sg_host_get_name(comm->dst_proc->host), comm->dst_proc->name);
659
660       comm->surf_comm->suspend();
661     }
662   }
663 }
664
665 /**
666  * \brief Answers the SIMIX simcalls associated to a communication synchro.
667  * \param synchro a finished communication synchro
668  */
669 void SIMIX_comm_finish(smx_synchro_t synchro)
670 {
671   simgrid::simix::Comm *comm = static_cast<simgrid::simix::Comm*>(synchro);
672   unsigned int destroy_count = 0;
673   smx_simcall_t simcall;
674
675   while ((simcall = (smx_simcall_t) xbt_fifo_shift(synchro->simcalls))) {
676
677     /* If a waitany simcall is waiting for this synchro to finish, then remove
678        it from the other synchros in the waitany list. Afterwards, get the
679        position of the actual synchro in the waitany dynar and
680        return it as the result of the simcall */
681
682     if (simcall->call == SIMCALL_NONE) //FIXME: maybe a better way to handle this case
683       continue; // if process handling comm is killed
684     if (simcall->call == SIMCALL_COMM_WAITANY) {
685       SIMIX_waitany_remove_simcall_from_actions(simcall);
686       if (!MC_is_active() && !MC_record_replay_is_active())
687         simcall_comm_waitany__set__result(simcall, xbt_dynar_search(simcall_comm_waitany__get__comms(simcall), &synchro));
688     }
689
690     /* If the synchro is still in a rendez-vous point then remove from it */
691     if (comm->mbox)
692       SIMIX_mbox_remove(comm->mbox, synchro);
693
694     XBT_DEBUG("SIMIX_comm_finish: synchro state = %d", (int)synchro->state);
695
696     /* Check out for errors */
697
698     if (simcall->issuer->host->isOff()) {
699       simcall->issuer->context->iwannadie = 1;
700       SMX_EXCEPTION(simcall->issuer, host_error, 0, "Host failed");
701     } else
702
703     switch (synchro->state) {
704
705     case SIMIX_DONE:
706       XBT_DEBUG("Communication %p complete!", synchro);
707       SIMIX_comm_copy_data(synchro);
708       break;
709
710     case SIMIX_SRC_TIMEOUT:
711       SMX_EXCEPTION(simcall->issuer, timeout_error, 0, "Communication timeouted because of sender");
712       break;
713
714     case SIMIX_DST_TIMEOUT:
715       SMX_EXCEPTION(simcall->issuer, timeout_error, 0, "Communication timeouted because of receiver");
716       break;
717
718     case SIMIX_SRC_HOST_FAILURE:
719       if (simcall->issuer == comm->src_proc)
720         simcall->issuer->context->iwannadie = 1;
721 //          SMX_EXCEPTION(simcall->issuer, host_error, 0, "Host failed");
722       else
723         SMX_EXCEPTION(simcall->issuer, network_error, 0, "Remote peer failed");
724       break;
725
726     case SIMIX_DST_HOST_FAILURE:
727       if (simcall->issuer == comm->dst_proc)
728         simcall->issuer->context->iwannadie = 1;
729 //          SMX_EXCEPTION(simcall->issuer, host_error, 0, "Host failed");
730       else
731         SMX_EXCEPTION(simcall->issuer, network_error, 0, "Remote peer failed");
732       break;
733
734     case SIMIX_LINK_FAILURE:
735
736       XBT_DEBUG("Link failure in synchro %p between '%s' and '%s': posting an exception to the issuer: %s (%p) detached:%d",
737                 synchro,
738                 comm->src_proc ? sg_host_get_name(comm->src_proc->host) : NULL,
739                 comm->dst_proc ? sg_host_get_name(comm->dst_proc->host) : NULL,
740                 simcall->issuer->name, simcall->issuer, comm->detached);
741       if (comm->src_proc == simcall->issuer) {
742         XBT_DEBUG("I'm source");
743       } else if (comm->dst_proc == simcall->issuer) {
744         XBT_DEBUG("I'm dest");
745       } else {
746         XBT_DEBUG("I'm neither source nor dest");
747       }
748       SMX_EXCEPTION(simcall->issuer, network_error, 0, "Link failure");
749       break;
750
751     case SIMIX_CANCELED:
752       if (simcall->issuer == comm->dst_proc)
753         SMX_EXCEPTION(simcall->issuer, cancel_error, 0, "Communication canceled by the sender");
754       else
755         SMX_EXCEPTION(simcall->issuer, cancel_error, 0, "Communication canceled by the receiver");
756       break;
757
758     default:
759       xbt_die("Unexpected synchro state in SIMIX_comm_finish: %d", (int)synchro->state);
760     }
761
762     /* if there is an exception during a waitany or a testany, indicate the position of the failed communication */
763     if (simcall->issuer->doexception) {
764       if (simcall->call == SIMCALL_COMM_WAITANY) {
765         simcall->issuer->running_ctx->exception.value = xbt_dynar_search(simcall_comm_waitany__get__comms(simcall), &synchro);
766       }
767       else if (simcall->call == SIMCALL_COMM_TESTANY) {
768         simcall->issuer->running_ctx->exception.value = xbt_dynar_search(simcall_comm_testany__get__comms(simcall), &synchro);
769       }
770     }
771
772     if (simcall->issuer->host->isOff()) {
773       simcall->issuer->context->iwannadie = 1;
774     }
775
776     simcall->issuer->waiting_synchro = NULL;
777     xbt_fifo_remove(simcall->issuer->comms, synchro);
778     if(comm->detached){
779       if(simcall->issuer == comm->src_proc){
780         if(comm->dst_proc)
781           xbt_fifo_remove(comm->dst_proc->comms, synchro);
782       }
783       if(simcall->issuer == comm->dst_proc){
784         if(comm->src_proc)
785           xbt_fifo_remove(comm->src_proc->comms, synchro);
786       }
787     }
788     SIMIX_simcall_answer(simcall);
789     destroy_count++;
790   }
791
792   while (destroy_count-- > 0)
793     SIMIX_comm_destroy(synchro);
794 }
795
796 /**
797  * \brief This function is called when a Surf communication synchro is finished.
798  * \param synchro the corresponding Simix communication
799  */
800 void SIMIX_post_comm(smx_synchro_t synchro)
801 {
802   simgrid::simix::Comm *comm = static_cast<simgrid::simix::Comm*>(synchro);
803
804   /* Update synchro state */
805   if (comm->src_timeout &&
806       comm->src_timeout->getState() == simgrid::surf::Action::State::done)
807     synchro->state = SIMIX_SRC_TIMEOUT;
808   else if (comm->dst_timeout &&
809     comm->dst_timeout->getState() == simgrid::surf::Action::State::done)
810     synchro->state = SIMIX_DST_TIMEOUT;
811   else if (comm->src_timeout &&
812     comm->src_timeout->getState() == simgrid::surf::Action::State::failed)
813     synchro->state = SIMIX_SRC_HOST_FAILURE;
814   else if (comm->dst_timeout &&
815       comm->dst_timeout->getState() == simgrid::surf::Action::State::failed)
816     synchro->state = SIMIX_DST_HOST_FAILURE;
817   else if (comm->surf_comm &&
818     comm->surf_comm->getState() == simgrid::surf::Action::State::failed) {
819     synchro->state = SIMIX_LINK_FAILURE;
820   } else
821     synchro->state = SIMIX_DONE;
822
823   XBT_DEBUG("SIMIX_post_comm: comm %p, state %d, src_proc %p, dst_proc %p, detached: %d",
824             comm, (int)comm->state, comm->src_proc, comm->dst_proc, comm->detached);
825
826   /* destroy the surf actions associated with the Simix communication */
827   SIMIX_comm_destroy_internal_actions(comm);
828
829   /* if there are simcalls associated with the synchro, then answer them */
830   if (xbt_fifo_size(synchro->simcalls)) {
831     SIMIX_comm_finish(comm);
832   }
833 }
834
835 /************* synchro Getters **************/
836
837 /**
838  *  \brief get the amount remaining from the communication
839  *  \param synchro The communication
840  */
841 double SIMIX_comm_get_remains(smx_synchro_t synchro)
842 {
843   if(!synchro)
844     return 0;
845   simgrid::simix::Comm *comm = static_cast<simgrid::simix::Comm*>(synchro);
846
847   double remains;
848   switch (synchro->state) {
849
850   case SIMIX_RUNNING:
851     remains = comm->surf_comm->getRemains();
852     break;
853
854   case SIMIX_WAITING:
855   case SIMIX_READY:
856     remains = 0; /*FIXME: check what should be returned */
857     break;
858
859   default:
860     remains = 0; /*FIXME: is this correct? */
861     break;
862   }
863   return remains;
864 }
865
866 e_smx_state_t SIMIX_comm_get_state(smx_synchro_t synchro)
867 {
868   return synchro->state;
869 }
870
871 /**
872  *  \brief Return the user data associated to the sender of the communication
873  *  \param synchro The communication
874  *  \return the user data
875  */
876 void* SIMIX_comm_get_src_data(smx_synchro_t synchro)
877 {
878   simgrid::simix::Comm *comm = static_cast<simgrid::simix::Comm*>(synchro);
879
880   return comm->src_data;
881 }
882
883 /**
884  *  \brief Return the user data associated to the receiver of the communication
885  *  \param synchro The communication
886  *  \return the user data
887  */
888 void* SIMIX_comm_get_dst_data(smx_synchro_t synchro)
889 {
890   simgrid::simix::Comm *comm = static_cast<simgrid::simix::Comm*>(synchro);
891
892   return comm->dst_data;
893 }
894
895 smx_process_t SIMIX_comm_get_src_proc(smx_synchro_t synchro)
896 {
897   simgrid::simix::Comm *comm = static_cast<simgrid::simix::Comm*>(synchro);
898
899   return comm->src_proc;
900 }
901
902 smx_process_t SIMIX_comm_get_dst_proc(smx_synchro_t synchro)
903 {
904   simgrid::simix::Comm *comm = static_cast<simgrid::simix::Comm*>(synchro);
905
906   return comm->dst_proc;
907 }
908
909 /******************************************************************************/
910 /*                    SIMIX_comm_copy_data callbacks                       */
911 /******************************************************************************/
912 static void (*SIMIX_comm_copy_data_callback) (smx_synchro_t, void*, size_t) = &SIMIX_comm_copy_pointer_callback;
913
914 void SIMIX_comm_set_copy_data_callback(void (*callback) (smx_synchro_t, void*, size_t))
915 {
916   SIMIX_comm_copy_data_callback = callback;
917 }
918
919 void SIMIX_comm_copy_pointer_callback(smx_synchro_t synchro, void* buff, size_t buff_size)
920 {
921   simgrid::simix::Comm *comm = static_cast<simgrid::simix::Comm*>(synchro);
922
923   xbt_assert((buff_size == sizeof(void *)), "Cannot copy %zu bytes: must be sizeof(void*)", buff_size);
924   *(void **) (comm->dst_buff) = buff;
925 }
926
927 void SIMIX_comm_copy_buffer_callback(smx_synchro_t synchro, void* buff, size_t buff_size)
928 {
929   simgrid::simix::Comm *comm = static_cast<simgrid::simix::Comm*>(synchro);
930
931   XBT_DEBUG("Copy the data over");
932   memcpy(comm->dst_buff, buff, buff_size);
933   if (comm->detached) { // if this is a detached send, the source buffer was duplicated by SMPI sender to make the original buffer available to the application ASAP
934     xbt_free(buff);
935     comm->src_buff = NULL;
936   }
937 }
938
939
940 /**
941  *  \brief Copy the communication data from the sender's buffer to the receiver's one
942  *  \param comm The communication
943  */
944 void SIMIX_comm_copy_data(smx_synchro_t synchro)
945 {
946   simgrid::simix::Comm *comm = static_cast<simgrid::simix::Comm*>(synchro);
947
948   size_t buff_size = comm->src_buff_size;
949   /* If there is no data to copy then return */
950   if (!comm->src_buff || !comm->dst_buff || comm->copied)
951     return;
952
953   XBT_DEBUG("Copying comm %p data from %s (%p) -> %s (%p) (%zu bytes)",
954             comm,
955             comm->src_proc ? sg_host_get_name(comm->src_proc->host) : "a finished process",
956             comm->src_buff,
957             comm->dst_proc ? sg_host_get_name(comm->dst_proc->host) : "a finished process",
958             comm->dst_buff, buff_size);
959
960   /* Copy at most dst_buff_size bytes of the message to receiver's buffer */
961   if (comm->dst_buff_size)
962     buff_size = MIN(buff_size, *(comm->dst_buff_size));
963
964   /* Update the receiver's buffer size to the copied amount */
965   if (comm->dst_buff_size)
966     *comm->dst_buff_size = buff_size;
967
968   if (buff_size > 0){
969       if(comm->copy_data_fun)
970         comm->copy_data_fun (comm, comm->src_buff, buff_size);
971       else
972         SIMIX_comm_copy_data_callback (comm, comm->src_buff, buff_size);
973   }
974
975
976   /* Set the copied flag so we copy data only once */
977   /* (this function might be called from both communication ends) */
978   comm->copied = 1;
979 }