Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
turn SIMIX_comm_new() into a proper constructor
[simgrid.git] / src / simix / smx_network.cpp
1 /* Copyright (c) 2009-2016. The SimGrid Team.  All rights reserved.         */
2
3 /* This program is free software; you can redistribute it and/or modify it
4  * under the terms of the license (GNU LGPL) which comes with this package. */
5
6 #include "src/surf/surf_interface.hpp"
7 #include "src/simix/smx_private.h"
8 #include "xbt/log.h"
9 #include "mc/mc.h"
10 #include "src/mc/mc_replay.h"
11 #include "xbt/dict.h"
12 #include "simgrid/s4u/mailbox.hpp"
13
14 #include "src/simix/SynchroComm.hpp"
15
16 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(simix_network, simix, "SIMIX network-related synchronization");
17
18 static void SIMIX_mbox_free(void *data);
19 static xbt_dict_t mailboxes = xbt_dict_new_homogeneous(SIMIX_mbox_free);
20
21 static void SIMIX_waitany_remove_simcall_from_actions(smx_simcall_t simcall);
22 static void SIMIX_comm_copy_data(smx_synchro_t comm);
23 static inline void SIMIX_mbox_push(smx_mailbox_t mbox, smx_synchro_t comm);
24 static smx_synchro_t _find_matching_comm(std::deque<smx_synchro_t> *deque, e_smx_comm_type_t type,
25     int (*match_fun)(void *, void *,smx_synchro_t), void *user_data, smx_synchro_t my_synchro, bool remove_matching);
26 static void SIMIX_comm_start(smx_synchro_t synchro);
27
28 void SIMIX_mailbox_exit(void)
29 {
30   xbt_dict_free(&mailboxes);
31 }
32
33 /******************************************************************************/
34 /*                           Rendez-Vous Points                               */
35 /******************************************************************************/
36
37 smx_mailbox_t SIMIX_mbox_create(const char *name)
38 {
39   xbt_assert(name, "Mailboxes must have a name");
40   /* two processes may have pushed the same mbox_create simcall at the same time */
41   smx_mailbox_t mbox = (smx_mailbox_t) xbt_dict_get_or_null(mailboxes, name);
42
43   if (!mbox) {
44     mbox = xbt_new0(s_smx_mailbox_t, 1);
45     mbox->name = xbt_strdup(name);
46     mbox->comm_queue = new std::deque<smx_synchro_t>();
47     mbox->done_comm_queue = nullptr; // Allocated on need only
48     mbox->permanent_receiver=NULL;
49
50     XBT_DEBUG("Creating a mailbox at %p with name %s", mbox, name);
51     xbt_dict_set(mailboxes, mbox->name, mbox, NULL);
52   }
53   return mbox;
54 }
55
56 void SIMIX_mbox_free(void *data)
57 {
58   XBT_DEBUG("mbox free %p", data);
59   smx_mailbox_t mbox = (smx_mailbox_t) data;
60   xbt_free(mbox->name);
61   delete mbox->comm_queue;
62   delete mbox->done_comm_queue;
63
64   xbt_free(mbox);
65 }
66
67 smx_mailbox_t SIMIX_mbox_get_by_name(const char *name)
68 {
69   return (smx_mailbox_t) xbt_dict_get_or_null(mailboxes, name);
70 }
71
72 smx_synchro_t SIMIX_mbox_get_head(smx_mailbox_t mbox)
73 {
74   return mbox->comm_queue->empty()? nullptr:mbox->comm_queue->front();
75 }
76
77 /**
78  *  \brief get the receiver (process associated to the mailbox)
79  *  \param mbox The rendez-vous point
80  *  \return process The receiving process (NULL if not set)
81  */
82 smx_process_t SIMIX_mbox_get_receiver(smx_mailbox_t mbox)
83 {
84   return mbox->permanent_receiver;
85 }
86
87 /**
88  *  \brief set the receiver of the rendez vous point to allow eager sends
89  *  \param mbox The rendez-vous point
90  *  \param process The receiving process
91  */
92 void SIMIX_mbox_set_receiver(smx_mailbox_t mbox, smx_process_t process)
93 {
94   mbox->permanent_receiver=process;
95   if (mbox->done_comm_queue == nullptr)
96     mbox->done_comm_queue = new std::deque<smx_synchro_t>();
97 }
98
99 /**
100  *  \brief Pushes a communication synchro into a rendez-vous point
101  *  \param mbox The mailbox
102  *  \param comm The communication synchro
103  */
104 static inline void SIMIX_mbox_push(smx_mailbox_t mbox, smx_synchro_t synchro)
105 {
106   simgrid::simix::Comm *comm = static_cast<simgrid::simix::Comm*>(synchro);
107
108   mbox->comm_queue->push_back(comm);
109   comm->mbox = mbox;
110 }
111
112 /**
113  *  \brief Removes a communication synchro from a rendez-vous point
114  *  \param mbox The rendez-vous point
115  *  \param comm The communication synchro
116  */
117 void SIMIX_mbox_remove(smx_mailbox_t mbox, smx_synchro_t synchro)
118 {
119   simgrid::simix::Comm *comm = static_cast<simgrid::simix::Comm*>(synchro);
120
121   comm->mbox = NULL;
122   for (auto it = mbox->comm_queue->begin(); it != mbox->comm_queue->end(); it++)
123     if (*it == comm) {
124       mbox->comm_queue->erase(it);
125       return;
126     }
127   xbt_die("Cannot remove this comm that is not part of the mailbox");
128 }
129
130 /**
131  *  \brief Checks if there is a communication synchro queued in a deque matching our needs
132  *  \param type The type of communication we are looking for (comm_send, comm_recv)
133  *  \return The communication synchro if found, NULL otherwise
134  */
135 static smx_synchro_t _find_matching_comm(std::deque<smx_synchro_t> *deque, e_smx_comm_type_t type,
136     int (*match_fun)(void *, void *,smx_synchro_t), void *this_user_data, smx_synchro_t my_synchro, bool remove_matching)
137 {
138   void* other_user_data = NULL;
139
140   for(auto it = deque->begin(); it != deque->end(); it++){
141     smx_synchro_t synchro = *it;
142     simgrid::simix::Comm *comm = static_cast<simgrid::simix::Comm*>(synchro);
143
144     if (comm->type == SIMIX_COMM_SEND) {
145       other_user_data = comm->src_data;
146     } else if (comm->type == SIMIX_COMM_RECEIVE) {
147       other_user_data = comm->dst_data;
148     }
149     if (comm->type == type &&
150         (!      match_fun ||       match_fun(this_user_data,  other_user_data, synchro)) &&
151         (!comm->match_fun || comm->match_fun(other_user_data, this_user_data,  my_synchro))) {
152       XBT_DEBUG("Found a matching communication synchro %p", comm);
153       if (remove_matching)
154         deque->erase(it);
155       comm->refcount++;
156 #if HAVE_MC
157       comm->mbox_cpy = comm->mbox;
158 #endif
159       comm->mbox = NULL;
160       return comm;
161     }
162     XBT_DEBUG("Sorry, communication synchro %p does not match our needs:"
163               " its type is %d but we are looking for a comm of type %d (or maybe the filtering didn't match)",
164               comm, (int)comm->type, (int)type);
165   }
166   XBT_DEBUG("No matching communication synchro found");
167   return NULL;
168 }
169
170 /******************************************************************************/
171 /*                          Communication synchros                            */
172 /******************************************************************************/
173
174 /**
175  *  \brief Destroy a communicate synchro
176  *  \param synchro The communicate synchro to be destroyed
177  */
178 void SIMIX_comm_destroy(smx_synchro_t synchro)
179 {
180   simgrid::simix::Comm *comm = static_cast<simgrid::simix::Comm*>(synchro);
181
182   XBT_DEBUG("Destroy synchro %p (refcount: %d), state: %d", comm, comm->refcount, (int)comm->state);
183
184   if (comm->refcount <= 0) {
185     xbt_backtrace_display_current();
186     xbt_die("This comm has a negative refcount! You must not call test() or wait() more than once on a given communication.");
187   }
188   comm->refcount--;
189   if (comm->refcount > 0)
190       return;
191   XBT_DEBUG("Really free communication %p; refcount is now %d", comm, comm->refcount);
192
193   SIMIX_comm_destroy_internal_actions(synchro);
194
195   if (comm->detached && comm->state != SIMIX_DONE) {
196     /* the communication has failed and was detached:
197      * we have to free the buffer */
198     if (comm->clean_fun) {
199       comm->clean_fun(comm->src_buff);
200     }
201     comm->src_buff = NULL;
202   }
203
204   if(comm->mbox)
205     SIMIX_mbox_remove(comm->mbox, comm);
206
207   delete comm;
208 }
209
210 void SIMIX_comm_destroy_internal_actions(smx_synchro_t synchro)
211 {
212   simgrid::simix::Comm *comm = static_cast<simgrid::simix::Comm*>(synchro);
213   if (comm->surf_comm){
214     comm->surf_comm->unref();
215     comm->surf_comm = NULL;
216   }
217
218   if (comm->src_timeout){
219     comm->src_timeout->unref();
220     comm->src_timeout = NULL;
221   }
222
223   if (comm->dst_timeout){
224     comm->dst_timeout->unref();
225     comm->dst_timeout = NULL;
226   }
227 }
228
229 void simcall_HANDLER_comm_send(smx_simcall_t simcall, smx_process_t src, smx_mailbox_t mbox,
230                                   double task_size, double rate,
231                                   void *src_buff, size_t src_buff_size,
232                                   int (*match_fun)(void *, void *,smx_synchro_t),
233                                   void (*copy_data_fun)(smx_synchro_t, void*, size_t),
234           void *data, double timeout){
235   smx_synchro_t comm = simcall_HANDLER_comm_isend(simcall, src, mbox, task_size, rate,
236                            src_buff, src_buff_size, match_fun, NULL, copy_data_fun,
237                data, 0);
238   SIMCALL_SET_MC_VALUE(simcall, 0);
239   simcall_HANDLER_comm_wait(simcall, comm, timeout);
240 }
241 smx_synchro_t simcall_HANDLER_comm_isend(smx_simcall_t simcall, smx_process_t src_proc, smx_mailbox_t mbox,
242                                   double task_size, double rate,
243                                   void *src_buff, size_t src_buff_size,
244                                   int (*match_fun)(void *, void *,smx_synchro_t),
245                                   void (*clean_fun)(void *), // used to free the synchro in case of problem after a detached send
246                                   void (*copy_data_fun)(smx_synchro_t, void*, size_t),// used to copy data if not default one
247                           void *data, int detached)
248 {
249   XBT_DEBUG("send from %p", mbox);
250
251   /* Prepare a synchro describing us, so that it gets passed to the user-provided filter of other side */
252   smx_synchro_t this_synchro = new simgrid::simix::Comm(SIMIX_COMM_SEND);
253
254   /* Look for communication synchro matching our needs. We also provide a description of
255    * ourself so that the other side also gets a chance of choosing if it wants to match with us.
256    *
257    * If it is not found then push our communication into the rendez-vous point */
258   smx_synchro_t other_synchro =
259       _find_matching_comm(mbox->comm_queue, SIMIX_COMM_RECEIVE, match_fun, data, this_synchro, /*remove_matching*/true);
260   simgrid::simix::Comm *other_comm = static_cast<simgrid::simix::Comm*>(other_synchro);
261
262
263   if (!other_synchro) {
264     other_synchro = this_synchro;
265     other_comm = static_cast<simgrid::simix::Comm*>(other_synchro);
266
267     if (mbox->permanent_receiver!=NULL){
268       //this mailbox is for small messages, which have to be sent right now
269       other_synchro->state = SIMIX_READY;
270       other_comm->dst_proc=mbox->permanent_receiver;
271       other_comm->refcount++;
272       mbox->done_comm_queue->push_back(other_synchro);
273       other_comm->mbox=mbox;
274       XBT_DEBUG("pushing a message into the permanent receive fifo %p, comm %p", mbox, &(other_comm));
275
276     }else{
277       SIMIX_mbox_push(mbox, this_synchro);
278     }
279   } else {
280     XBT_DEBUG("Receive already pushed");
281
282     SIMIX_comm_destroy(this_synchro);
283
284     other_comm->state = SIMIX_READY;
285     other_comm->type = SIMIX_COMM_READY;
286
287   }
288   xbt_fifo_push(src_proc->comms, other_synchro);
289
290   /* if the communication synchro is detached then decrease the refcount
291    * by one, so it will be eliminated by the receiver's destroy call */
292   if (detached) {
293     other_comm->detached = 1;
294     other_comm->refcount--;
295     other_comm->clean_fun = clean_fun;
296   } else {
297     other_comm->clean_fun = NULL;
298   }
299
300   /* Setup the communication synchro */
301   other_comm->src_proc = src_proc;
302   other_comm->task_size = task_size;
303   other_comm->rate = rate;
304   other_comm->src_buff = src_buff;
305   other_comm->src_buff_size = src_buff_size;
306   other_comm->src_data = data;
307
308   other_comm->match_fun = match_fun;
309   other_comm->copy_data_fun = copy_data_fun;
310
311
312   if (MC_is_active() || MC_record_replay_is_active()) {
313     other_comm->state = SIMIX_RUNNING;
314     return (detached ? NULL : other_comm);
315   }
316
317   SIMIX_comm_start(other_comm);
318   return (detached ? NULL : other_comm);
319 }
320
321 void simcall_HANDLER_comm_recv(smx_simcall_t simcall, smx_process_t receiver, smx_mailbox_t mbox,
322                          void *dst_buff, size_t *dst_buff_size,
323                          int (*match_fun)(void *, void *, smx_synchro_t),
324                          void (*copy_data_fun)(smx_synchro_t, void*, size_t),
325                          void *data, double timeout, double rate)
326 {
327   smx_synchro_t comm = SIMIX_comm_irecv(receiver, mbox, dst_buff, dst_buff_size, match_fun, copy_data_fun, data, rate);
328   SIMCALL_SET_MC_VALUE(simcall, 0);
329   simcall_HANDLER_comm_wait(simcall, comm, timeout);
330 }
331
332 smx_synchro_t simcall_HANDLER_comm_irecv(smx_simcall_t simcall, smx_process_t receiver, smx_mailbox_t mbox,
333     void *dst_buff, size_t *dst_buff_size,
334     int (*match_fun)(void *, void *, smx_synchro_t),
335     void (*copy_data_fun)(smx_synchro_t, void*, size_t),
336     void *data, double rate)
337 {
338   return SIMIX_comm_irecv(receiver, mbox, dst_buff, dst_buff_size, match_fun, copy_data_fun, data, rate);
339 }
340
341 smx_synchro_t SIMIX_comm_irecv(smx_process_t dst_proc, smx_mailbox_t mbox, void *dst_buff, size_t *dst_buff_size,
342     int (*match_fun)(void *, void *, smx_synchro_t),
343     void (*copy_data_fun)(smx_synchro_t, void*, size_t), // used to copy data if not default one
344     void *data, double rate)
345 {
346   XBT_DEBUG("recv from %p %p", mbox, mbox->comm_queue);
347   smx_synchro_t this_synchro = new simgrid::simix::Comm(SIMIX_COMM_RECEIVE);
348
349   smx_synchro_t other_synchro;
350   //communication already done, get it inside the fifo of completed comms
351   if (mbox->permanent_receiver && ! mbox->done_comm_queue->empty()) {
352
353     XBT_DEBUG("We have a comm that has probably already been received, trying to match it, to skip the communication");
354     //find a match in the already received fifo
355     other_synchro = _find_matching_comm(mbox->done_comm_queue, SIMIX_COMM_SEND, match_fun, data, this_synchro,/*remove_matching*/true);
356     //if not found, assume the receiver came first, register it to the mailbox in the classical way
357     if (!other_synchro)  {
358       XBT_DEBUG("We have messages in the permanent receive list, but not the one we are looking for, pushing request into fifo");
359       other_synchro = this_synchro;
360       SIMIX_mbox_push(mbox, this_synchro);
361     } else {
362       simgrid::simix::Comm *other_comm = static_cast<simgrid::simix::Comm*>(other_synchro);
363
364       if(other_comm->surf_comm && SIMIX_comm_get_remains(other_comm)==0.0) {
365         XBT_DEBUG("comm %p has been already sent, and is finished, destroy it",other_comm);
366         other_comm->state = SIMIX_DONE;
367         other_comm->type = SIMIX_COMM_DONE;
368         other_comm->mbox = NULL;
369       }
370       other_comm->refcount--;
371       SIMIX_comm_destroy(this_synchro);
372     }
373   } else {
374     /* Prepare a synchro describing us, so that it gets passed to the user-provided filter of other side */
375
376     /* Look for communication synchro matching our needs. We also provide a description of
377      * ourself so that the other side also gets a chance of choosing if it wants to match with us.
378      *
379      * If it is not found then push our communication into the rendez-vous point */
380     other_synchro = _find_matching_comm(mbox->comm_queue, SIMIX_COMM_SEND, match_fun, data, this_synchro,/*remove_matching*/true);
381
382     if (!other_synchro) {
383       XBT_DEBUG("Receive pushed first %zu", mbox->comm_queue->size());
384       other_synchro = this_synchro;
385       SIMIX_mbox_push(mbox, this_synchro);
386     } else {
387       SIMIX_comm_destroy(this_synchro);
388       simgrid::simix::Comm *other_comm = static_cast<simgrid::simix::Comm*>(other_synchro);
389
390       other_comm->state = SIMIX_READY;
391       other_comm->type = SIMIX_COMM_READY;
392     }
393     xbt_fifo_push(dst_proc->comms, other_synchro);
394   }
395
396   /* Setup communication synchro */
397   simgrid::simix::Comm *other_comm = static_cast<simgrid::simix::Comm*>(other_synchro);
398   other_comm->dst_proc = dst_proc;
399   other_comm->dst_buff = dst_buff;
400   other_comm->dst_buff_size = dst_buff_size;
401   other_comm->dst_data = data;
402
403   if (rate != -1.0 && (other_comm->rate == -1.0 || rate < other_comm->rate))
404     other_comm->rate = rate;
405
406   other_comm->match_fun = match_fun;
407   other_comm->copy_data_fun = copy_data_fun;
408
409   if (MC_is_active() || MC_record_replay_is_active()) {
410     other_synchro->state = SIMIX_RUNNING;
411     return other_synchro;
412   }
413
414   SIMIX_comm_start(other_synchro);
415   return other_synchro;
416 }
417
418 smx_synchro_t simcall_HANDLER_comm_iprobe(smx_simcall_t simcall, smx_mailbox_t mbox,
419                                    int type, int src, int tag,
420                                    int (*match_fun)(void *, void *, smx_synchro_t),
421                                    void *data){
422   return SIMIX_comm_iprobe(simcall->issuer, mbox, type, src, tag, match_fun, data);
423 }
424
425 smx_synchro_t SIMIX_comm_iprobe(smx_process_t dst_proc, smx_mailbox_t mbox, int type, int src,
426                               int tag, int (*match_fun)(void *, void *, smx_synchro_t), void *data)
427 {
428   XBT_DEBUG("iprobe from %p %p", mbox, mbox->comm_queue);
429   smx_synchro_t this_synchro;
430   int smx_type;
431   if(type == 1){
432     this_synchro = new simgrid::simix::Comm(SIMIX_COMM_SEND);
433     smx_type = SIMIX_COMM_RECEIVE;
434   } else{
435     this_synchro = new simgrid::simix::Comm(SIMIX_COMM_RECEIVE);
436     smx_type = SIMIX_COMM_SEND;
437   } 
438   smx_synchro_t other_synchro=NULL;
439   if(mbox->permanent_receiver && ! mbox->done_comm_queue->empty()){
440     XBT_DEBUG("first check in the permanent recv mailbox, to see if we already got something");
441     other_synchro =
442         _find_matching_comm(mbox->done_comm_queue, (e_smx_comm_type_t) smx_type, match_fun, data, this_synchro,/*remove_matching*/false);
443   }
444   if (!other_synchro){
445     XBT_DEBUG("check if we have more luck in the normal mailbox");
446     other_synchro = _find_matching_comm(mbox->comm_queue, (e_smx_comm_type_t) smx_type, match_fun, data, this_synchro,/*remove_matching*/false);
447   }
448
449   if(other_synchro) {
450     simgrid::simix::Comm *other_comm = static_cast<simgrid::simix::Comm*>(other_synchro);
451     other_comm->refcount--;
452   }
453
454   SIMIX_comm_destroy(this_synchro);
455   return other_synchro;
456 }
457
458 void simcall_HANDLER_comm_wait(smx_simcall_t simcall, smx_synchro_t synchro, double timeout)
459 {
460   /* the simcall may be a wait, a send or a recv */
461   surf_action_t sleep;
462
463   /* Associate this simcall to the wait synchro */
464   XBT_DEBUG("simcall_HANDLER_comm_wait, %p", synchro);
465
466   xbt_fifo_push(synchro->simcalls, simcall);
467   simcall->issuer->waiting_synchro = synchro;
468
469   if (MC_is_active() || MC_record_replay_is_active()) {
470     int idx = SIMCALL_GET_MC_VALUE(simcall);
471     if (idx == 0) {
472       synchro->state = SIMIX_DONE;
473     } else {
474       /* If we reached this point, the wait simcall must have a timeout */
475       /* Otherwise it shouldn't be enabled and executed by the MC */
476       if (timeout == -1)
477         THROW_IMPOSSIBLE;
478
479       simgrid::simix::Comm *comm = static_cast<simgrid::simix::Comm*>(synchro);
480       if (comm->src_proc == simcall->issuer)
481         comm->state = SIMIX_SRC_TIMEOUT;
482       else
483         comm->state = SIMIX_DST_TIMEOUT;
484     }
485
486     SIMIX_comm_finish(synchro);
487     return;
488   }
489
490   /* If the synchro has already finish perform the error handling, */
491   /* otherwise set up a waiting timeout on the right side          */
492   if (synchro->state != SIMIX_WAITING && synchro->state != SIMIX_RUNNING) {
493     SIMIX_comm_finish(synchro);
494   } else { /* if (timeout >= 0) { we need a surf sleep action even when there is no timeout, otherwise surf won't tell us when the host fails */
495     sleep = surf_host_sleep(simcall->issuer->host, timeout);
496     sleep->setData(synchro);
497
498     simgrid::simix::Comm *comm = static_cast<simgrid::simix::Comm*>(synchro);
499     if (simcall->issuer == comm->src_proc)
500       comm->src_timeout = sleep;
501     else
502       comm->dst_timeout = sleep;
503   }
504 }
505
506 void simcall_HANDLER_comm_test(smx_simcall_t simcall, smx_synchro_t synchro)
507 {
508   simgrid::simix::Comm *comm = static_cast<simgrid::simix::Comm*>(synchro);
509
510   if (MC_is_active() || MC_record_replay_is_active()){
511     simcall_comm_test__set__result(simcall, comm->src_proc && comm->dst_proc);
512     if (simcall_comm_test__get__result(simcall)){
513       synchro->state = SIMIX_DONE;
514       xbt_fifo_push(synchro->simcalls, simcall);
515       SIMIX_comm_finish(synchro);
516     } else {
517       SIMIX_simcall_answer(simcall);
518     }
519     return;
520   }
521
522   simcall_comm_test__set__result(simcall, (synchro->state != SIMIX_WAITING && synchro->state != SIMIX_RUNNING));
523   if (simcall_comm_test__get__result(simcall)) {
524     xbt_fifo_push(synchro->simcalls, simcall);
525     SIMIX_comm_finish(synchro);
526   } else {
527     SIMIX_simcall_answer(simcall);
528   }
529 }
530
531 void simcall_HANDLER_comm_testany(smx_simcall_t simcall, xbt_dynar_t synchros)
532 {
533   unsigned int cursor;
534   smx_synchro_t synchro;
535   simcall_comm_testany__set__result(simcall, -1);
536
537   if (MC_is_active() || MC_record_replay_is_active()){
538     int idx = SIMCALL_GET_MC_VALUE(simcall);
539     if(idx == -1){
540       SIMIX_simcall_answer(simcall);
541     }else{
542       synchro = xbt_dynar_get_as(synchros, idx, smx_synchro_t);
543       simcall_comm_testany__set__result(simcall, idx);
544       xbt_fifo_push(synchro->simcalls, simcall);
545       synchro->state = SIMIX_DONE;
546       SIMIX_comm_finish(synchro);
547     }
548     return;
549   }
550
551   xbt_dynar_foreach(simcall_comm_testany__get__comms(simcall), cursor,synchro) {
552     if (synchro->state != SIMIX_WAITING && synchro->state != SIMIX_RUNNING) {
553       simcall_comm_testany__set__result(simcall, cursor);
554       xbt_fifo_push(synchro->simcalls, simcall);
555       SIMIX_comm_finish(synchro);
556       return;
557     }
558   }
559   SIMIX_simcall_answer(simcall);
560 }
561
562 void simcall_HANDLER_comm_waitany(smx_simcall_t simcall, xbt_dynar_t synchros)
563 {
564   smx_synchro_t synchro;
565   unsigned int cursor = 0;
566
567   if (MC_is_active() || MC_record_replay_is_active()){
568     int idx = SIMCALL_GET_MC_VALUE(simcall);
569     synchro = xbt_dynar_get_as(synchros, idx, smx_synchro_t);
570     xbt_fifo_push(synchro->simcalls, simcall);
571     simcall_comm_waitany__set__result(simcall, idx);
572     synchro->state = SIMIX_DONE;
573     SIMIX_comm_finish(synchro);
574     return;
575   }
576
577   xbt_dynar_foreach(synchros, cursor, synchro){
578     /* associate this simcall to the the synchro */
579     xbt_fifo_push(synchro->simcalls, simcall);
580
581     /* see if the synchro is already finished */
582     if (synchro->state != SIMIX_WAITING && synchro->state != SIMIX_RUNNING){
583       SIMIX_comm_finish(synchro);
584       break;
585     }
586   }
587 }
588
589 void SIMIX_waitany_remove_simcall_from_actions(smx_simcall_t simcall)
590 {
591   smx_synchro_t synchro;
592   unsigned int cursor = 0;
593   xbt_dynar_t synchros = simcall_comm_waitany__get__comms(simcall);
594
595   xbt_dynar_foreach(synchros, cursor, synchro)
596     xbt_fifo_remove(synchro->simcalls, simcall);
597 }
598
599 /**
600  *  \brief Starts the simulation of a communication synchro.
601  *  \param synchro the communication synchro
602  */
603 static inline void SIMIX_comm_start(smx_synchro_t synchro)
604 {
605   simgrid::simix::Comm *comm = static_cast<simgrid::simix::Comm*>(synchro);
606
607   /* If both the sender and the receiver are already there, start the communication */
608   if (synchro->state == SIMIX_READY) {
609
610     sg_host_t sender   = comm->src_proc->host;
611     sg_host_t receiver = comm->dst_proc->host;
612
613     XBT_DEBUG("Starting communication %p from '%s' to '%s'", synchro, sg_host_get_name(sender), sg_host_get_name(receiver));
614
615     comm->surf_comm = surf_network_model_communicate(surf_network_model, sender, receiver, comm->task_size, comm->rate);
616     comm->surf_comm->setData(synchro);
617     comm->state = SIMIX_RUNNING;
618
619     /* If a link is failed, detect it immediately */
620     if (comm->surf_comm->getState() == simgrid::surf::Action::State::failed) {
621       XBT_DEBUG("Communication from '%s' to '%s' failed to start because of a link failure",
622                 sg_host_get_name(sender), sg_host_get_name(receiver));
623       comm->state = SIMIX_LINK_FAILURE;
624       SIMIX_comm_destroy_internal_actions(synchro);
625     }
626
627     /* If any of the process is suspend, create the synchro but stop its execution,
628        it will be restarted when the sender process resume */
629     if (SIMIX_process_is_suspended(comm->src_proc) ||
630         SIMIX_process_is_suspended(comm->dst_proc)) {
631       /* FIXME: check what should happen with the synchro state */
632
633       if (SIMIX_process_is_suspended(comm->src_proc))
634         XBT_DEBUG("The communication is suspended on startup because src (%s:%s) were suspended since it initiated the communication",
635                   sg_host_get_name(comm->src_proc->host), comm->src_proc->name);
636       else
637         XBT_DEBUG("The communication is suspended on startup because dst (%s:%s) were suspended since it initiated the communication",
638                   sg_host_get_name(comm->dst_proc->host), comm->dst_proc->name);
639
640       comm->surf_comm->suspend();
641     }
642   }
643 }
644
645 /**
646  * \brief Answers the SIMIX simcalls associated to a communication synchro.
647  * \param synchro a finished communication synchro
648  */
649 void SIMIX_comm_finish(smx_synchro_t synchro)
650 {
651   simgrid::simix::Comm *comm = static_cast<simgrid::simix::Comm*>(synchro);
652   unsigned int destroy_count = 0;
653   smx_simcall_t simcall;
654
655   while ((simcall = (smx_simcall_t) xbt_fifo_shift(synchro->simcalls))) {
656
657     /* If a waitany simcall is waiting for this synchro to finish, then remove
658        it from the other synchros in the waitany list. Afterwards, get the
659        position of the actual synchro in the waitany dynar and
660        return it as the result of the simcall */
661
662     if (simcall->call == SIMCALL_NONE) //FIXME: maybe a better way to handle this case
663       continue; // if process handling comm is killed
664     if (simcall->call == SIMCALL_COMM_WAITANY) {
665       SIMIX_waitany_remove_simcall_from_actions(simcall);
666       if (!MC_is_active() && !MC_record_replay_is_active())
667         simcall_comm_waitany__set__result(simcall, xbt_dynar_search(simcall_comm_waitany__get__comms(simcall), &synchro));
668     }
669
670     /* If the synchro is still in a rendez-vous point then remove from it */
671     if (comm->mbox)
672       SIMIX_mbox_remove(comm->mbox, synchro);
673
674     XBT_DEBUG("SIMIX_comm_finish: synchro state = %d", (int)synchro->state);
675
676     /* Check out for errors */
677
678     if (simcall->issuer->host->isOff()) {
679       simcall->issuer->context->iwannadie = 1;
680       SMX_EXCEPTION(simcall->issuer, host_error, 0, "Host failed");
681     } else
682
683     switch (synchro->state) {
684
685     case SIMIX_DONE:
686       XBT_DEBUG("Communication %p complete!", synchro);
687       SIMIX_comm_copy_data(synchro);
688       break;
689
690     case SIMIX_SRC_TIMEOUT:
691       SMX_EXCEPTION(simcall->issuer, timeout_error, 0, "Communication timeouted because of sender");
692       break;
693
694     case SIMIX_DST_TIMEOUT:
695       SMX_EXCEPTION(simcall->issuer, timeout_error, 0, "Communication timeouted because of receiver");
696       break;
697
698     case SIMIX_SRC_HOST_FAILURE:
699       if (simcall->issuer == comm->src_proc)
700         simcall->issuer->context->iwannadie = 1;
701 //          SMX_EXCEPTION(simcall->issuer, host_error, 0, "Host failed");
702       else
703         SMX_EXCEPTION(simcall->issuer, network_error, 0, "Remote peer failed");
704       break;
705
706     case SIMIX_DST_HOST_FAILURE:
707       if (simcall->issuer == comm->dst_proc)
708         simcall->issuer->context->iwannadie = 1;
709 //          SMX_EXCEPTION(simcall->issuer, host_error, 0, "Host failed");
710       else
711         SMX_EXCEPTION(simcall->issuer, network_error, 0, "Remote peer failed");
712       break;
713
714     case SIMIX_LINK_FAILURE:
715
716       XBT_DEBUG("Link failure in synchro %p between '%s' and '%s': posting an exception to the issuer: %s (%p) detached:%d",
717                 synchro,
718                 comm->src_proc ? sg_host_get_name(comm->src_proc->host) : NULL,
719                 comm->dst_proc ? sg_host_get_name(comm->dst_proc->host) : NULL,
720                 simcall->issuer->name, simcall->issuer, comm->detached);
721       if (comm->src_proc == simcall->issuer) {
722         XBT_DEBUG("I'm source");
723       } else if (comm->dst_proc == simcall->issuer) {
724         XBT_DEBUG("I'm dest");
725       } else {
726         XBT_DEBUG("I'm neither source nor dest");
727       }
728       SMX_EXCEPTION(simcall->issuer, network_error, 0, "Link failure");
729       break;
730
731     case SIMIX_CANCELED:
732       if (simcall->issuer == comm->dst_proc)
733         SMX_EXCEPTION(simcall->issuer, cancel_error, 0, "Communication canceled by the sender");
734       else
735         SMX_EXCEPTION(simcall->issuer, cancel_error, 0, "Communication canceled by the receiver");
736       break;
737
738     default:
739       xbt_die("Unexpected synchro state in SIMIX_comm_finish: %d", (int)synchro->state);
740     }
741
742     /* if there is an exception during a waitany or a testany, indicate the position of the failed communication */
743     if (simcall->issuer->doexception) {
744       if (simcall->call == SIMCALL_COMM_WAITANY) {
745         simcall->issuer->running_ctx->exception.value = xbt_dynar_search(simcall_comm_waitany__get__comms(simcall), &synchro);
746       }
747       else if (simcall->call == SIMCALL_COMM_TESTANY) {
748         simcall->issuer->running_ctx->exception.value = xbt_dynar_search(simcall_comm_testany__get__comms(simcall), &synchro);
749       }
750     }
751
752     if (simcall->issuer->host->isOff()) {
753       simcall->issuer->context->iwannadie = 1;
754     }
755
756     simcall->issuer->waiting_synchro = NULL;
757     xbt_fifo_remove(simcall->issuer->comms, synchro);
758     if(comm->detached){
759       if(simcall->issuer == comm->src_proc){
760         if(comm->dst_proc)
761           xbt_fifo_remove(comm->dst_proc->comms, synchro);
762       }
763       if(simcall->issuer == comm->dst_proc){
764         if(comm->src_proc)
765           xbt_fifo_remove(comm->src_proc->comms, synchro);
766       }
767     }
768     SIMIX_simcall_answer(simcall);
769     destroy_count++;
770   }
771
772   while (destroy_count-- > 0)
773     SIMIX_comm_destroy(synchro);
774 }
775
776 /**
777  * \brief This function is called when a Surf communication synchro is finished.
778  * \param synchro the corresponding Simix communication
779  */
780 void SIMIX_post_comm(smx_synchro_t synchro)
781 {
782   simgrid::simix::Comm *comm = static_cast<simgrid::simix::Comm*>(synchro);
783
784   /* Update synchro state */
785   if (comm->src_timeout &&
786       comm->src_timeout->getState() == simgrid::surf::Action::State::done)
787     synchro->state = SIMIX_SRC_TIMEOUT;
788   else if (comm->dst_timeout &&
789     comm->dst_timeout->getState() == simgrid::surf::Action::State::done)
790     synchro->state = SIMIX_DST_TIMEOUT;
791   else if (comm->src_timeout &&
792     comm->src_timeout->getState() == simgrid::surf::Action::State::failed)
793     synchro->state = SIMIX_SRC_HOST_FAILURE;
794   else if (comm->dst_timeout &&
795       comm->dst_timeout->getState() == simgrid::surf::Action::State::failed)
796     synchro->state = SIMIX_DST_HOST_FAILURE;
797   else if (comm->surf_comm &&
798     comm->surf_comm->getState() == simgrid::surf::Action::State::failed) {
799     synchro->state = SIMIX_LINK_FAILURE;
800   } else
801     synchro->state = SIMIX_DONE;
802
803   XBT_DEBUG("SIMIX_post_comm: comm %p, state %d, src_proc %p, dst_proc %p, detached: %d",
804             comm, (int)comm->state, comm->src_proc, comm->dst_proc, comm->detached);
805
806   /* destroy the surf actions associated with the Simix communication */
807   SIMIX_comm_destroy_internal_actions(comm);
808
809   /* if there are simcalls associated with the synchro, then answer them */
810   if (xbt_fifo_size(synchro->simcalls)) {
811     SIMIX_comm_finish(comm);
812   }
813 }
814
815 /************* synchro Getters **************/
816
817 /**
818  *  \brief get the amount remaining from the communication
819  *  \param synchro The communication
820  */
821 double SIMIX_comm_get_remains(smx_synchro_t synchro)
822 {
823   if(!synchro)
824     return 0;
825   simgrid::simix::Comm *comm = static_cast<simgrid::simix::Comm*>(synchro);
826
827   double remains;
828   switch (synchro->state) {
829
830   case SIMIX_RUNNING:
831     remains = comm->surf_comm->getRemains();
832     break;
833
834   case SIMIX_WAITING:
835   case SIMIX_READY:
836     remains = 0; /*FIXME: check what should be returned */
837     break;
838
839   default:
840     remains = 0; /*FIXME: is this correct? */
841     break;
842   }
843   return remains;
844 }
845
846 e_smx_state_t SIMIX_comm_get_state(smx_synchro_t synchro)
847 {
848   return synchro->state;
849 }
850
851 /**
852  *  \brief Return the user data associated to the sender of the communication
853  *  \param synchro The communication
854  *  \return the user data
855  */
856 void* SIMIX_comm_get_src_data(smx_synchro_t synchro)
857 {
858   simgrid::simix::Comm *comm = static_cast<simgrid::simix::Comm*>(synchro);
859
860   return comm->src_data;
861 }
862
863 /**
864  *  \brief Return the user data associated to the receiver of the communication
865  *  \param synchro The communication
866  *  \return the user data
867  */
868 void* SIMIX_comm_get_dst_data(smx_synchro_t synchro)
869 {
870   simgrid::simix::Comm *comm = static_cast<simgrid::simix::Comm*>(synchro);
871
872   return comm->dst_data;
873 }
874
875 smx_process_t SIMIX_comm_get_src_proc(smx_synchro_t synchro)
876 {
877   simgrid::simix::Comm *comm = static_cast<simgrid::simix::Comm*>(synchro);
878
879   return comm->src_proc;
880 }
881
882 smx_process_t SIMIX_comm_get_dst_proc(smx_synchro_t synchro)
883 {
884   simgrid::simix::Comm *comm = static_cast<simgrid::simix::Comm*>(synchro);
885
886   return comm->dst_proc;
887 }
888
889 /******************************************************************************/
890 /*                    SIMIX_comm_copy_data callbacks                       */
891 /******************************************************************************/
892 static void (*SIMIX_comm_copy_data_callback) (smx_synchro_t, void*, size_t) = &SIMIX_comm_copy_pointer_callback;
893
894 void SIMIX_comm_set_copy_data_callback(void (*callback) (smx_synchro_t, void*, size_t))
895 {
896   SIMIX_comm_copy_data_callback = callback;
897 }
898
899 void SIMIX_comm_copy_pointer_callback(smx_synchro_t synchro, void* buff, size_t buff_size)
900 {
901   simgrid::simix::Comm *comm = static_cast<simgrid::simix::Comm*>(synchro);
902
903   xbt_assert((buff_size == sizeof(void *)), "Cannot copy %zu bytes: must be sizeof(void*)", buff_size);
904   *(void **) (comm->dst_buff) = buff;
905 }
906
907 void SIMIX_comm_copy_buffer_callback(smx_synchro_t synchro, void* buff, size_t buff_size)
908 {
909   simgrid::simix::Comm *comm = static_cast<simgrid::simix::Comm*>(synchro);
910
911   XBT_DEBUG("Copy the data over");
912   memcpy(comm->dst_buff, buff, buff_size);
913   if (comm->detached) { // if this is a detached send, the source buffer was duplicated by SMPI sender to make the original buffer available to the application ASAP
914     xbt_free(buff);
915     comm->src_buff = NULL;
916   }
917 }
918
919
920 /**
921  *  \brief Copy the communication data from the sender's buffer to the receiver's one
922  *  \param comm The communication
923  */
924 void SIMIX_comm_copy_data(smx_synchro_t synchro)
925 {
926   simgrid::simix::Comm *comm = static_cast<simgrid::simix::Comm*>(synchro);
927
928   size_t buff_size = comm->src_buff_size;
929   /* If there is no data to copy then return */
930   if (!comm->src_buff || !comm->dst_buff || comm->copied)
931     return;
932
933   XBT_DEBUG("Copying comm %p data from %s (%p) -> %s (%p) (%zu bytes)",
934             comm,
935             comm->src_proc ? sg_host_get_name(comm->src_proc->host) : "a finished process",
936             comm->src_buff,
937             comm->dst_proc ? sg_host_get_name(comm->dst_proc->host) : "a finished process",
938             comm->dst_buff, buff_size);
939
940   /* Copy at most dst_buff_size bytes of the message to receiver's buffer */
941   if (comm->dst_buff_size)
942     buff_size = MIN(buff_size, *(comm->dst_buff_size));
943
944   /* Update the receiver's buffer size to the copied amount */
945   if (comm->dst_buff_size)
946     *comm->dst_buff_size = buff_size;
947
948   if (buff_size > 0){
949       if(comm->copy_data_fun)
950         comm->copy_data_fun (comm, comm->src_buff, buff_size);
951       else
952         SIMIX_comm_copy_data_callback (comm, comm->src_buff, buff_size);
953   }
954
955
956   /* Set the copied flag so we copy data only once */
957   /* (this function might be called from both communication ends) */
958   comm->copied = 1;
959 }