Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
leaks --
[simgrid.git] / src / simix / smx_network.cpp
1 /* Copyright (c) 2009-2016. The SimGrid Team.  All rights reserved.         */
2
3 /* This program is free software; you can redistribute it and/or modify it
4  * under the terms of the license (GNU LGPL) which comes with this package. */
5
6 #include "src/surf/surf_interface.hpp"
7 #include "src/simix/smx_private.h"
8 #include "xbt/log.h"
9 #include "mc/mc.h"
10 #include "src/mc/mc_replay.h"
11 #include "xbt/dict.h"
12 #include "simgrid/s4u/mailbox.hpp"
13
14 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(simix_network, simix, "SIMIX network-related synchronization");
15
16 static void SIMIX_mbox_free(void *data);
17 static xbt_dict_t mailboxes = xbt_dict_new_homogeneous(SIMIX_mbox_free);
18
19 static void SIMIX_waitany_remove_simcall_from_actions(smx_simcall_t simcall);
20 static void SIMIX_comm_copy_data(smx_synchro_t comm);
21 static smx_synchro_t SIMIX_comm_new(e_smx_comm_type_t type);
22 static inline void SIMIX_mbox_push(smx_mailbox_t mbox, smx_synchro_t comm);
23 static smx_synchro_t _find_matching_comm(std::deque<smx_synchro_t> *deque, e_smx_comm_type_t type,
24     int (*match_fun)(void *, void *,smx_synchro_t), void *user_data, smx_synchro_t my_synchro, bool remove_matching);
25 static void SIMIX_comm_start(smx_synchro_t synchro);
26
27 void SIMIX_mailbox_exit(void)
28 {
29   xbt_dict_free(&mailboxes);
30 }
31
32 /******************************************************************************/
33 /*                           Rendez-Vous Points                               */
34 /******************************************************************************/
35
36 smx_mailbox_t SIMIX_mbox_create(const char *name)
37 {
38   xbt_assert(name, "Mailboxes must have a name");
39   /* two processes may have pushed the same mbox_create simcall at the same time */
40   smx_mailbox_t mbox = (smx_mailbox_t) xbt_dict_get_or_null(mailboxes, name);
41
42   if (!mbox) {
43     mbox = xbt_new0(s_smx_mailbox_t, 1);
44     mbox->name = xbt_strdup(name);
45     mbox->comm_queue = new std::deque<smx_synchro_t>();
46     mbox->done_comm_queue = nullptr; // Allocated on need only
47     mbox->permanent_receiver=NULL;
48
49     XBT_DEBUG("Creating a mailbox at %p with name %s", mbox, name);
50     xbt_dict_set(mailboxes, mbox->name, mbox, NULL);
51   }
52   return mbox;
53 }
54
55 void SIMIX_mbox_free(void *data)
56 {
57   XBT_DEBUG("mbox free %p", data);
58   smx_mailbox_t mbox = (smx_mailbox_t) data;
59   xbt_free(mbox->name);
60   delete mbox->comm_queue;
61   delete mbox->done_comm_queue;
62
63   xbt_free(mbox);
64 }
65
66 smx_mailbox_t SIMIX_mbox_get_by_name(const char *name)
67 {
68   return (smx_mailbox_t) xbt_dict_get_or_null(mailboxes, name);
69 }
70
71 smx_synchro_t SIMIX_mbox_get_head(smx_mailbox_t mbox)
72 {
73   return mbox->comm_queue->empty()? nullptr:mbox->comm_queue->front();
74 }
75
76 /**
77  *  \brief get the receiver (process associated to the mailbox)
78  *  \param mbox The rendez-vous point
79  *  \return process The receiving process (NULL if not set)
80  */
81 smx_process_t SIMIX_mbox_get_receiver(smx_mailbox_t mbox)
82 {
83   return mbox->permanent_receiver;
84 }
85
86 /**
87  *  \brief set the receiver of the rendez vous point to allow eager sends
88  *  \param mbox The rendez-vous point
89  *  \param process The receiving process
90  */
91 void SIMIX_mbox_set_receiver(smx_mailbox_t mbox, smx_process_t process)
92 {
93   mbox->permanent_receiver=process;
94   if (mbox->done_comm_queue == nullptr)
95     mbox->done_comm_queue = new std::deque<smx_synchro_t>();
96 }
97
98 /**
99  *  \brief Pushes a communication synchro into a rendez-vous point
100  *  \param mbox The mailbox
101  *  \param comm The communication synchro
102  */
103 static inline void SIMIX_mbox_push(smx_mailbox_t mbox, smx_synchro_t comm)
104 {
105   mbox->comm_queue->push_back(comm);
106   comm->comm.mbox = mbox;
107 }
108
109 /**
110  *  \brief Removes a communication synchro from a rendez-vous point
111  *  \param mbox The rendez-vous point
112  *  \param comm The communication synchro
113  */
114 void SIMIX_mbox_remove(smx_mailbox_t mbox, smx_synchro_t comm)
115 {
116   comm->comm.mbox = NULL;
117   for (auto it = mbox->comm_queue->begin(); it != mbox->comm_queue->end(); it++)
118     if (*it == comm) {
119       mbox->comm_queue->erase(it);
120       return;
121     }
122   xbt_die("Cannot remove this comm that is not part of the mailbox");
123 }
124
125 /**
126  *  \brief Checks if there is a communication synchro queued in a deque matching our needs
127  *  \param type The type of communication we are looking for (comm_send, comm_recv)
128  *  \return The communication synchro if found, NULL otherwise
129  */
130 static smx_synchro_t _find_matching_comm(std::deque<smx_synchro_t> *deque, e_smx_comm_type_t type,
131     int (*match_fun)(void *, void *,smx_synchro_t), void *this_user_data, smx_synchro_t my_synchro, bool remove_matching)
132 {
133   void* other_user_data = NULL;
134
135   for(auto it = deque->begin(); it != deque->end(); it++){
136     smx_synchro_t synchro = *it;
137     if (synchro->comm.type == SIMIX_COMM_SEND) {
138       other_user_data = synchro->comm.src_data;
139     } else if (synchro->comm.type == SIMIX_COMM_RECEIVE) {
140       other_user_data = synchro->comm.dst_data;
141     }
142     if (synchro->comm.type == type &&
143         (!match_fun               ||               match_fun(this_user_data,  other_user_data, synchro)) &&
144         (!synchro->comm.match_fun || synchro->comm.match_fun(other_user_data, this_user_data,  my_synchro))) {
145       XBT_DEBUG("Found a matching communication synchro %p", synchro);
146       if (remove_matching)
147         deque->erase(it);
148       synchro->comm.refcount++;
149 #if HAVE_MC
150       synchro->comm.mbox_cpy = synchro->comm.mbox;
151 #endif
152       synchro->comm.mbox = NULL;
153       return synchro;
154     }
155     XBT_DEBUG("Sorry, communication synchro %p does not match our needs:"
156               " its type is %d but we are looking for a comm of type %d (or maybe the filtering didn't match)",
157               synchro, (int)synchro->comm.type, (int)type);
158   }
159   XBT_DEBUG("No matching communication synchro found");
160   return NULL;
161 }
162
163 /******************************************************************************/
164 /*                          Communication synchros                            */
165 /******************************************************************************/
166
167 /**
168  *  \brief Creates a new communicate synchro
169  *  \param type The direction of communication (comm_send, comm_recv)
170  *  \return The new communicate synchro
171  */
172 smx_synchro_t SIMIX_comm_new(e_smx_comm_type_t type)
173 {
174   smx_synchro_t synchro;
175
176   /* alloc structures */
177   synchro = (smx_synchro_t) xbt_mallocator_get(simix_global->synchro_mallocator);
178
179   synchro->type = SIMIX_SYNC_COMMUNICATE;
180   synchro->state = SIMIX_WAITING;
181
182   /* set communication */
183   synchro->comm.type = type;
184   synchro->comm.refcount = 1;
185   synchro->comm.src_data=NULL;
186   synchro->comm.dst_data=NULL;
187
188   synchro->category = NULL;
189
190   XBT_DEBUG("Create communicate synchro %p", synchro);
191
192   return synchro;
193 }
194
195 /**
196  *  \brief Destroy a communicate synchro
197  *  \param synchro The communicate synchro to be destroyed
198  */
199 void SIMIX_comm_destroy(smx_synchro_t synchro)
200 {
201   XBT_DEBUG("Destroy synchro %p (refcount: %d), state: %d",
202             synchro, synchro->comm.refcount, (int)synchro->state);
203
204   if (synchro->comm.refcount <= 0) {
205     xbt_backtrace_display_current();
206     xbt_die("The refcount of comm %p is already 0 before decreasing it. "
207             "That's a bug! If you didn't test and/or wait the same communication twice in your code, then the bug is SimGrid's...", synchro);
208   }
209   synchro->comm.refcount--;
210   if (synchro->comm.refcount > 0)
211       return;
212   XBT_DEBUG("Really free communication %p; refcount is now %d", synchro,
213       synchro->comm.refcount);
214
215   xbt_free(synchro->name);
216   SIMIX_comm_destroy_internal_actions(synchro);
217
218   if (synchro->comm.detached && synchro->state != SIMIX_DONE) {
219     /* the communication has failed and was detached:
220      * we have to free the buffer */
221     if (synchro->comm.clean_fun) {
222       synchro->comm.clean_fun(synchro->comm.src_buff);
223     }
224     synchro->comm.src_buff = NULL;
225   }
226
227   if(synchro->comm.mbox)
228     SIMIX_mbox_remove(synchro->comm.mbox, synchro);
229
230   xbt_mallocator_release(simix_global->synchro_mallocator, synchro);
231 }
232
233 void SIMIX_comm_destroy_internal_actions(smx_synchro_t synchro)
234 {
235   if (synchro->comm.surf_comm){
236     synchro->comm.surf_comm->unref();
237     synchro->comm.surf_comm = NULL;
238   }
239
240   if (synchro->comm.src_timeout){
241     synchro->comm.src_timeout->unref();
242     synchro->comm.src_timeout = NULL;
243   }
244
245   if (synchro->comm.dst_timeout){
246     synchro->comm.dst_timeout->unref();
247     synchro->comm.dst_timeout = NULL;
248   }
249 }
250
251 void simcall_HANDLER_comm_send(smx_simcall_t simcall, smx_process_t src, smx_mailbox_t mbox,
252                                   double task_size, double rate,
253                                   void *src_buff, size_t src_buff_size,
254                                   int (*match_fun)(void *, void *,smx_synchro_t),
255                                   void (*copy_data_fun)(smx_synchro_t, void*, size_t),
256           void *data, double timeout){
257   smx_synchro_t comm = simcall_HANDLER_comm_isend(simcall, src, mbox, task_size, rate,
258                            src_buff, src_buff_size, match_fun, NULL, copy_data_fun,
259                data, 0);
260   SIMCALL_SET_MC_VALUE(simcall, 0);
261   simcall_HANDLER_comm_wait(simcall, comm, timeout);
262 }
263 smx_synchro_t simcall_HANDLER_comm_isend(smx_simcall_t simcall, smx_process_t src_proc, smx_mailbox_t mbox,
264                                   double task_size, double rate,
265                                   void *src_buff, size_t src_buff_size,
266                                   int (*match_fun)(void *, void *,smx_synchro_t),
267                                   void (*clean_fun)(void *), // used to free the synchro in case of problem after a detached send
268                                   void (*copy_data_fun)(smx_synchro_t, void*, size_t),// used to copy data if not default one
269                           void *data, int detached)
270 {
271   XBT_DEBUG("send from %p", mbox);
272
273   /* Prepare a synchro describing us, so that it gets passed to the user-provided filter of other side */
274   smx_synchro_t this_synchro = SIMIX_comm_new(SIMIX_COMM_SEND);
275
276   /* Look for communication synchro matching our needs. We also provide a description of
277    * ourself so that the other side also gets a chance of choosing if it wants to match with us.
278    *
279    * If it is not found then push our communication into the rendez-vous point */
280   smx_synchro_t other_synchro =
281       _find_matching_comm(mbox->comm_queue, SIMIX_COMM_RECEIVE, match_fun, data, this_synchro, /*remove_matching*/true);
282
283   if (!other_synchro) {
284     other_synchro = this_synchro;
285
286     if (mbox->permanent_receiver!=NULL){
287       //this mailbox is for small messages, which have to be sent right now
288       other_synchro->state = SIMIX_READY;
289       other_synchro->comm.dst_proc=mbox->permanent_receiver;
290       other_synchro->comm.refcount++;
291       mbox->done_comm_queue->push_back(other_synchro);
292       other_synchro->comm.mbox=mbox;
293       XBT_DEBUG("pushing a message into the permanent receive fifo %p, comm %p", mbox, &(other_synchro->comm));
294
295     }else{
296       SIMIX_mbox_push(mbox, this_synchro);
297     }
298   } else {
299     XBT_DEBUG("Receive already pushed");
300
301     SIMIX_comm_destroy(this_synchro);
302
303     other_synchro->state = SIMIX_READY;
304     other_synchro->comm.type = SIMIX_COMM_READY;
305
306   }
307   xbt_fifo_push(src_proc->comms, other_synchro);
308
309   /* if the communication synchro is detached then decrease the refcount
310    * by one, so it will be eliminated by the receiver's destroy call */
311   if (detached) {
312     other_synchro->comm.detached = 1;
313     other_synchro->comm.refcount--;
314     other_synchro->comm.clean_fun = clean_fun;
315   } else {
316     other_synchro->comm.clean_fun = NULL;
317   }
318
319   /* Setup the communication synchro */
320   other_synchro->comm.src_proc = src_proc;
321   other_synchro->comm.task_size = task_size;
322   other_synchro->comm.rate = rate;
323   other_synchro->comm.src_buff = src_buff;
324   other_synchro->comm.src_buff_size = src_buff_size;
325   other_synchro->comm.src_data = data;
326
327   other_synchro->comm.match_fun = match_fun;
328   other_synchro->comm.copy_data_fun = copy_data_fun;
329
330
331   if (MC_is_active() || MC_record_replay_is_active()) {
332     other_synchro->state = SIMIX_RUNNING;
333     return (detached ? NULL : other_synchro);
334   }
335
336   SIMIX_comm_start(other_synchro);
337   return (detached ? NULL : other_synchro);
338 }
339
340 void simcall_HANDLER_comm_recv(smx_simcall_t simcall, smx_process_t receiver, smx_mailbox_t mbox,
341                          void *dst_buff, size_t *dst_buff_size,
342                          int (*match_fun)(void *, void *, smx_synchro_t),
343                          void (*copy_data_fun)(smx_synchro_t, void*, size_t),
344                          void *data, double timeout, double rate)
345 {
346   smx_synchro_t comm = SIMIX_comm_irecv(receiver, mbox, dst_buff,
347                            dst_buff_size, match_fun, copy_data_fun, data, rate);
348   SIMCALL_SET_MC_VALUE(simcall, 0);
349   simcall_HANDLER_comm_wait(simcall, comm, timeout);
350 }
351
352 smx_synchro_t simcall_HANDLER_comm_irecv(smx_simcall_t simcall, smx_process_t receiver, smx_mailbox_t mbox,
353     void *dst_buff, size_t *dst_buff_size,
354     int (*match_fun)(void *, void *, smx_synchro_t),
355     void (*copy_data_fun)(smx_synchro_t, void*, size_t),
356     void *data, double rate)
357 {
358   return SIMIX_comm_irecv(receiver, mbox, dst_buff, dst_buff_size, match_fun, copy_data_fun, data, rate);
359 }
360
361 smx_synchro_t SIMIX_comm_irecv(smx_process_t dst_proc, smx_mailbox_t mbox, void *dst_buff, size_t *dst_buff_size,
362     int (*match_fun)(void *, void *, smx_synchro_t),
363     void (*copy_data_fun)(smx_synchro_t, void*, size_t), // used to copy data if not default one
364     void *data, double rate)
365 {
366   XBT_DEBUG("recv from %p %p", mbox, mbox->comm_queue);
367   smx_synchro_t this_synchro = SIMIX_comm_new(SIMIX_COMM_RECEIVE);
368
369   smx_synchro_t other_synchro;
370   //communication already done, get it inside the fifo of completed comms
371   if (mbox->permanent_receiver && ! mbox->done_comm_queue->empty()) {
372
373     XBT_DEBUG("We have a comm that has probably already been received, trying to match it, to skip the communication");
374     //find a match in the already received fifo
375     other_synchro = _find_matching_comm(mbox->done_comm_queue, SIMIX_COMM_SEND, match_fun, data, this_synchro,/*remove_matching*/true);
376     //if not found, assume the receiver came first, register it to the mailbox in the classical way
377     if (!other_synchro)  {
378       XBT_DEBUG("We have messages in the permanent receive list, but not the one we are looking for, pushing request into fifo");
379       other_synchro = this_synchro;
380       SIMIX_mbox_push(mbox, this_synchro);
381     } else {
382       if(other_synchro->comm.surf_comm && SIMIX_comm_get_remains(other_synchro)==0.0) {
383         XBT_DEBUG("comm %p has been already sent, and is finished, destroy it",&(other_synchro->comm));
384         other_synchro->state = SIMIX_DONE;
385         other_synchro->comm.type = SIMIX_COMM_DONE;
386         other_synchro->comm.mbox = NULL;
387       }
388       other_synchro->comm.refcount--;
389       SIMIX_comm_destroy(this_synchro);
390     }
391   } else {
392     /* Prepare a synchro describing us, so that it gets passed to the user-provided filter of other side */
393
394     /* Look for communication synchro matching our needs. We also provide a description of
395      * ourself so that the other side also gets a chance of choosing if it wants to match with us.
396      *
397      * If it is not found then push our communication into the rendez-vous point */
398     other_synchro = _find_matching_comm(mbox->comm_queue, SIMIX_COMM_SEND, match_fun, data, this_synchro,/*remove_matching*/true);
399
400     if (!other_synchro) {
401       XBT_DEBUG("Receive pushed first %zu", mbox->comm_queue->size());
402       other_synchro = this_synchro;
403       SIMIX_mbox_push(mbox, this_synchro);
404     } else {
405       SIMIX_comm_destroy(this_synchro);
406       other_synchro->state = SIMIX_READY;
407       other_synchro->comm.type = SIMIX_COMM_READY;
408       //other_synchro->comm.refcount--;
409     }
410     xbt_fifo_push(dst_proc->comms, other_synchro);
411   }
412
413   /* Setup communication synchro */
414   other_synchro->comm.dst_proc = dst_proc;
415   other_synchro->comm.dst_buff = dst_buff;
416   other_synchro->comm.dst_buff_size = dst_buff_size;
417   other_synchro->comm.dst_data = data;
418
419   if (rate != -1.0 && (other_synchro->comm.rate == -1.0 || rate < other_synchro->comm.rate))
420     other_synchro->comm.rate = rate;
421
422   other_synchro->comm.match_fun = match_fun;
423   other_synchro->comm.copy_data_fun = copy_data_fun;
424
425   if (MC_is_active() || MC_record_replay_is_active()) {
426     other_synchro->state = SIMIX_RUNNING;
427     return other_synchro;
428   }
429
430   SIMIX_comm_start(other_synchro);
431   return other_synchro;
432 }
433
434 smx_synchro_t simcall_HANDLER_comm_iprobe(smx_simcall_t simcall, smx_mailbox_t mbox,
435                                    int type, int src, int tag,
436                                    int (*match_fun)(void *, void *, smx_synchro_t),
437                                    void *data){
438   return SIMIX_comm_iprobe(simcall->issuer, mbox, type, src, tag, match_fun, data);
439 }
440
441 smx_synchro_t SIMIX_comm_iprobe(smx_process_t dst_proc, smx_mailbox_t mbox, int type, int src,
442                               int tag, int (*match_fun)(void *, void *, smx_synchro_t), void *data)
443 {
444   XBT_DEBUG("iprobe from %p %p", mbox, mbox->comm_queue);
445   smx_synchro_t this_synchro;
446   int smx_type;
447   if(type == 1){
448     this_synchro=SIMIX_comm_new(SIMIX_COMM_SEND);
449     smx_type = SIMIX_COMM_RECEIVE;
450   } else{
451     this_synchro=SIMIX_comm_new(SIMIX_COMM_RECEIVE);
452     smx_type = SIMIX_COMM_SEND;
453   } 
454   smx_synchro_t other_synchro=NULL;
455   if(mbox->permanent_receiver && ! mbox->done_comm_queue->empty()){
456     XBT_DEBUG("first check in the permanent recv mailbox, to see if we already got something");
457     other_synchro =
458         _find_matching_comm(mbox->done_comm_queue, (e_smx_comm_type_t) smx_type, match_fun, data, this_synchro,/*remove_matching*/false);
459   }
460   if (!other_synchro){
461     XBT_DEBUG("check if we have more luck in the normal mailbox");
462     other_synchro =
463         _find_matching_comm(mbox->comm_queue, (e_smx_comm_type_t) smx_type, match_fun, data, this_synchro,/*remove_matching*/false);
464   }
465   if(other_synchro)
466     other_synchro->comm.refcount--;
467
468   SIMIX_comm_destroy(this_synchro);
469   return other_synchro;
470 }
471
472 void simcall_HANDLER_comm_wait(smx_simcall_t simcall, smx_synchro_t synchro, double timeout)
473 {
474   /* the simcall may be a wait, a send or a recv */
475   surf_action_t sleep;
476
477   /* Associate this simcall to the wait synchro */
478   XBT_DEBUG("simcall_HANDLER_comm_wait, %p", synchro);
479
480   xbt_fifo_push(synchro->simcalls, simcall);
481   simcall->issuer->waiting_synchro = synchro;
482
483   if (MC_is_active() || MC_record_replay_is_active()) {
484     int idx = SIMCALL_GET_MC_VALUE(simcall);
485     if (idx == 0) {
486       synchro->state = SIMIX_DONE;
487     } else {
488       /* If we reached this point, the wait simcall must have a timeout */
489       /* Otherwise it shouldn't be enabled and executed by the MC */
490       if (timeout == -1)
491         THROW_IMPOSSIBLE;
492
493       if (synchro->comm.src_proc == simcall->issuer)
494         synchro->state = SIMIX_SRC_TIMEOUT;
495       else
496         synchro->state = SIMIX_DST_TIMEOUT;
497     }
498
499     SIMIX_comm_finish(synchro);
500     return;
501   }
502
503   /* If the synchro has already finish perform the error handling, */
504   /* otherwise set up a waiting timeout on the right side          */
505   if (synchro->state != SIMIX_WAITING && synchro->state != SIMIX_RUNNING) {
506     SIMIX_comm_finish(synchro);
507   } else { /* if (timeout >= 0) { we need a surf sleep action even when there is no timeout, otherwise surf won't tell us when the host fails */
508     sleep = surf_host_sleep(simcall->issuer->host, timeout);
509     sleep->setData(synchro);
510
511     if (simcall->issuer == synchro->comm.src_proc)
512       synchro->comm.src_timeout = sleep;
513     else
514       synchro->comm.dst_timeout = sleep;
515   }
516 }
517
518 void simcall_HANDLER_comm_test(smx_simcall_t simcall, smx_synchro_t synchro)
519 {
520   if(MC_is_active() || MC_record_replay_is_active()){
521     simcall_comm_test__set__result(simcall, synchro->comm.src_proc && synchro->comm.dst_proc);
522     if(simcall_comm_test__get__result(simcall)){
523       synchro->state = SIMIX_DONE;
524       xbt_fifo_push(synchro->simcalls, simcall);
525       SIMIX_comm_finish(synchro);
526     }else{
527       SIMIX_simcall_answer(simcall);
528     }
529     return;
530   }
531
532   simcall_comm_test__set__result(simcall, (synchro->state != SIMIX_WAITING && synchro->state != SIMIX_RUNNING));
533   if (simcall_comm_test__get__result(simcall)) {
534     xbt_fifo_push(synchro->simcalls, simcall);
535     SIMIX_comm_finish(synchro);
536   } else {
537     SIMIX_simcall_answer(simcall);
538   }
539 }
540
541 void simcall_HANDLER_comm_testany(smx_simcall_t simcall, xbt_dynar_t synchros)
542 {
543   unsigned int cursor;
544   smx_synchro_t synchro;
545   simcall_comm_testany__set__result(simcall, -1);
546
547   if (MC_is_active() || MC_record_replay_is_active()){
548     int idx = SIMCALL_GET_MC_VALUE(simcall);
549     if(idx == -1){
550       SIMIX_simcall_answer(simcall);
551     }else{
552       synchro = xbt_dynar_get_as(synchros, idx, smx_synchro_t);
553       simcall_comm_testany__set__result(simcall, idx);
554       xbt_fifo_push(synchro->simcalls, simcall);
555       synchro->state = SIMIX_DONE;
556       SIMIX_comm_finish(synchro);
557     }
558     return;
559   }
560
561   xbt_dynar_foreach(simcall_comm_testany__get__comms(simcall), cursor,synchro) {
562     if (synchro->state != SIMIX_WAITING && synchro->state != SIMIX_RUNNING) {
563       simcall_comm_testany__set__result(simcall, cursor);
564       xbt_fifo_push(synchro->simcalls, simcall);
565       SIMIX_comm_finish(synchro);
566       return;
567     }
568   }
569   SIMIX_simcall_answer(simcall);
570 }
571
572 void simcall_HANDLER_comm_waitany(smx_simcall_t simcall, xbt_dynar_t synchros)
573 {
574   smx_synchro_t synchro;
575   unsigned int cursor = 0;
576
577   if (MC_is_active() || MC_record_replay_is_active()){
578     int idx = SIMCALL_GET_MC_VALUE(simcall);
579     synchro = xbt_dynar_get_as(synchros, idx, smx_synchro_t);
580     xbt_fifo_push(synchro->simcalls, simcall);
581     simcall_comm_waitany__set__result(simcall, idx);
582     synchro->state = SIMIX_DONE;
583     SIMIX_comm_finish(synchro);
584     return;
585   }
586
587   xbt_dynar_foreach(synchros, cursor, synchro){
588     /* associate this simcall to the the synchro */
589     xbt_fifo_push(synchro->simcalls, simcall);
590
591     /* see if the synchro is already finished */
592     if (synchro->state != SIMIX_WAITING && synchro->state != SIMIX_RUNNING){
593       SIMIX_comm_finish(synchro);
594       break;
595     }
596   }
597 }
598
599 void SIMIX_waitany_remove_simcall_from_actions(smx_simcall_t simcall)
600 {
601   smx_synchro_t synchro;
602   unsigned int cursor = 0;
603   xbt_dynar_t synchros = simcall_comm_waitany__get__comms(simcall);
604
605   xbt_dynar_foreach(synchros, cursor, synchro) {
606     xbt_fifo_remove(synchro->simcalls, simcall);
607   }
608 }
609
610 /**
611  *  \brief Starts the simulation of a communication synchro.
612  *  \param synchro the communication synchro
613  */
614 static inline void SIMIX_comm_start(smx_synchro_t synchro)
615 {
616   /* If both the sender and the receiver are already there, start the communication */
617   if (synchro->state == SIMIX_READY) {
618
619     sg_host_t sender = synchro->comm.src_proc->host;
620     sg_host_t receiver = synchro->comm.dst_proc->host;
621
622     XBT_DEBUG("Starting communication %p from '%s' to '%s'", synchro,
623               sg_host_get_name(sender), sg_host_get_name(receiver));
624
625     synchro->comm.surf_comm = surf_network_model_communicate(surf_network_model,
626                                                             sender, receiver,
627                                                             synchro->comm.task_size, synchro->comm.rate);
628
629     synchro->comm.surf_comm->setData(synchro);
630
631     synchro->state = SIMIX_RUNNING;
632
633     /* If a link is failed, detect it immediately */
634     if (synchro->comm.surf_comm->getState() == simgrid::surf::Action::State::failed) {
635       XBT_DEBUG("Communication from '%s' to '%s' failed to start because of a link failure",
636                 sg_host_get_name(sender), sg_host_get_name(receiver));
637       synchro->state = SIMIX_LINK_FAILURE;
638       SIMIX_comm_destroy_internal_actions(synchro);
639     }
640
641     /* If any of the process is suspend, create the synchro but stop its execution,
642        it will be restarted when the sender process resume */
643     if (SIMIX_process_is_suspended(synchro->comm.src_proc) ||
644         SIMIX_process_is_suspended(synchro->comm.dst_proc)) {
645       /* FIXME: check what should happen with the synchro state */
646
647       if (SIMIX_process_is_suspended(synchro->comm.src_proc))
648         XBT_DEBUG("The communication is suspended on startup because src (%s:%s) were suspended since it initiated the communication",
649                   sg_host_get_name(synchro->comm.src_proc->host), synchro->comm.src_proc->name);
650       else
651         XBT_DEBUG("The communication is suspended on startup because dst (%s:%s) were suspended since it initiated the communication",
652                   sg_host_get_name(synchro->comm.dst_proc->host), synchro->comm.dst_proc->name);
653
654       synchro->comm.surf_comm->suspend();
655
656     }
657   }
658 }
659
660 /**
661  * \brief Answers the SIMIX simcalls associated to a communication synchro.
662  * \param synchro a finished communication synchro
663  */
664 void SIMIX_comm_finish(smx_synchro_t synchro)
665 {
666   unsigned int destroy_count = 0;
667   smx_simcall_t simcall;
668
669   while ((simcall = (smx_simcall_t) xbt_fifo_shift(synchro->simcalls))) {
670
671     /* If a waitany simcall is waiting for this synchro to finish, then remove
672        it from the other synchros in the waitany list. Afterwards, get the
673        position of the actual synchro in the waitany dynar and
674        return it as the result of the simcall */
675
676     if (simcall->call == SIMCALL_NONE) //FIXME: maybe a better way to handle this case
677       continue; // if process handling comm is killed
678     if (simcall->call == SIMCALL_COMM_WAITANY) {
679       SIMIX_waitany_remove_simcall_from_actions(simcall);
680       if (!MC_is_active() && !MC_record_replay_is_active())
681         simcall_comm_waitany__set__result(simcall, xbt_dynar_search(simcall_comm_waitany__get__comms(simcall), &synchro));
682     }
683
684     /* If the synchro is still in a rendez-vous point then remove from it */
685     if (synchro->comm.mbox)
686       SIMIX_mbox_remove(synchro->comm.mbox, synchro);
687
688     XBT_DEBUG("SIMIX_comm_finish: synchro state = %d", (int)synchro->state);
689
690     /* Check out for errors */
691
692     if (simcall->issuer->host->isOff()) {
693       simcall->issuer->context->iwannadie = 1;
694       SMX_EXCEPTION(simcall->issuer, host_error, 0, "Host failed");
695     } else
696
697     switch (synchro->state) {
698
699     case SIMIX_DONE:
700       XBT_DEBUG("Communication %p complete!", synchro);
701       SIMIX_comm_copy_data(synchro);
702       break;
703
704     case SIMIX_SRC_TIMEOUT:
705       SMX_EXCEPTION(simcall->issuer, timeout_error, 0,
706                     "Communication timeouted because of sender");
707       break;
708
709     case SIMIX_DST_TIMEOUT:
710       SMX_EXCEPTION(simcall->issuer, timeout_error, 0,
711                     "Communication timeouted because of receiver");
712       break;
713
714     case SIMIX_SRC_HOST_FAILURE:
715       if (simcall->issuer == synchro->comm.src_proc)
716         simcall->issuer->context->iwannadie = 1;
717 //          SMX_EXCEPTION(simcall->issuer, host_error, 0, "Host failed");
718       else
719         SMX_EXCEPTION(simcall->issuer, network_error, 0, "Remote peer failed");
720       break;
721
722     case SIMIX_DST_HOST_FAILURE:
723       if (simcall->issuer == synchro->comm.dst_proc)
724         simcall->issuer->context->iwannadie = 1;
725 //          SMX_EXCEPTION(simcall->issuer, host_error, 0, "Host failed");
726       else
727         SMX_EXCEPTION(simcall->issuer, network_error, 0, "Remote peer failed");
728       break;
729
730     case SIMIX_LINK_FAILURE:
731
732       XBT_DEBUG("Link failure in synchro %p between '%s' and '%s': posting an exception to the issuer: %s (%p) detached:%d",
733                 synchro,
734                 synchro->comm.src_proc ? sg_host_get_name(synchro->comm.src_proc->host) : NULL,
735                 synchro->comm.dst_proc ? sg_host_get_name(synchro->comm.dst_proc->host) : NULL,
736                 simcall->issuer->name, simcall->issuer, synchro->comm.detached);
737       if (synchro->comm.src_proc == simcall->issuer) {
738         XBT_DEBUG("I'm source");
739       } else if (synchro->comm.dst_proc == simcall->issuer) {
740         XBT_DEBUG("I'm dest");
741       } else {
742         XBT_DEBUG("I'm neither source nor dest");
743       }
744       SMX_EXCEPTION(simcall->issuer, network_error, 0, "Link failure");
745       break;
746
747     case SIMIX_CANCELED:
748       if (simcall->issuer == synchro->comm.dst_proc)
749         SMX_EXCEPTION(simcall->issuer, cancel_error, 0,
750                       "Communication canceled by the sender");
751       else
752         SMX_EXCEPTION(simcall->issuer, cancel_error, 0,
753                       "Communication canceled by the receiver");
754       break;
755
756     default:
757       xbt_die("Unexpected synchro state in SIMIX_comm_finish: %d", (int)synchro->state);
758     }
759
760     /* if there is an exception during a waitany or a testany, indicate the position of the failed communication */
761     if (simcall->issuer->doexception) {
762       if (simcall->call == SIMCALL_COMM_WAITANY) {
763         simcall->issuer->running_ctx->exception.value = xbt_dynar_search(simcall_comm_waitany__get__comms(simcall), &synchro);
764       }
765       else if (simcall->call == SIMCALL_COMM_TESTANY) {
766         simcall->issuer->running_ctx->exception.value = xbt_dynar_search(simcall_comm_testany__get__comms(simcall), &synchro);
767       }
768     }
769
770     if (simcall->issuer->host->isOff()) {
771       simcall->issuer->context->iwannadie = 1;
772     }
773
774     simcall->issuer->waiting_synchro = NULL;
775     xbt_fifo_remove(simcall->issuer->comms, synchro);
776     if(synchro->comm.detached){
777       if(simcall->issuer == synchro->comm.src_proc){
778         if(synchro->comm.dst_proc)
779           xbt_fifo_remove(synchro->comm.dst_proc->comms, synchro);
780       }
781       if(simcall->issuer == synchro->comm.dst_proc){
782         if(synchro->comm.src_proc)
783           xbt_fifo_remove(synchro->comm.src_proc->comms, synchro);
784       }
785     }
786     SIMIX_simcall_answer(simcall);
787     destroy_count++;
788   }
789
790   while (destroy_count-- > 0)
791     SIMIX_comm_destroy(synchro);
792 }
793
794 /**
795  * \brief This function is called when a Surf communication synchro is finished.
796  * \param synchro the corresponding Simix communication
797  */
798 void SIMIX_post_comm(smx_synchro_t synchro)
799 {
800   /* Update synchro state */
801   if (synchro->comm.src_timeout &&
802       synchro->comm.src_timeout->getState() == simgrid::surf::Action::State::done)
803     synchro->state = SIMIX_SRC_TIMEOUT;
804   else if (synchro->comm.dst_timeout &&
805     synchro->comm.dst_timeout->getState() == simgrid::surf::Action::State::done)
806     synchro->state = SIMIX_DST_TIMEOUT;
807   else if (synchro->comm.src_timeout &&
808     synchro->comm.src_timeout->getState() == simgrid::surf::Action::State::failed)
809     synchro->state = SIMIX_SRC_HOST_FAILURE;
810   else if (synchro->comm.dst_timeout &&
811       synchro->comm.dst_timeout->getState() == simgrid::surf::Action::State::failed)
812     synchro->state = SIMIX_DST_HOST_FAILURE;
813   else if (synchro->comm.surf_comm &&
814     synchro->comm.surf_comm->getState() == simgrid::surf::Action::State::failed) {
815     XBT_DEBUG("Puta madre. Surf says that the link broke");
816     synchro->state = SIMIX_LINK_FAILURE;
817   } else
818     synchro->state = SIMIX_DONE;
819
820   XBT_DEBUG("SIMIX_post_comm: comm %p, state %d, src_proc %p, dst_proc %p, detached: %d",
821             synchro, (int)synchro->state, synchro->comm.src_proc, synchro->comm.dst_proc, synchro->comm.detached);
822
823   /* destroy the surf actions associated with the Simix communication */
824   SIMIX_comm_destroy_internal_actions(synchro);
825
826   /* if there are simcalls associated with the synchro, then answer them */
827   if (xbt_fifo_size(synchro->simcalls)) {
828     SIMIX_comm_finish(synchro);
829   }
830 }
831
832 void SIMIX_comm_cancel(smx_synchro_t synchro)
833 {
834   /* if the synchro is a waiting state means that it is still in a mbox */
835   /* so remove from it and delete it */
836   if (synchro->state == SIMIX_WAITING) {
837     SIMIX_mbox_remove(synchro->comm.mbox, synchro);
838     synchro->state = SIMIX_CANCELED;
839   }
840   else if (!MC_is_active() /* when running the MC there are no surf actions */
841            && !MC_record_replay_is_active()
842            && (synchro->state == SIMIX_READY || synchro->state == SIMIX_RUNNING)) {
843
844     synchro->comm.surf_comm->cancel();
845   }
846 }
847
848 void SIMIX_comm_suspend(smx_synchro_t synchro)
849 {
850   /*FIXME: shall we suspend also the timeout synchro? */
851   if (synchro->comm.surf_comm)
852     synchro->comm.surf_comm->suspend();
853   /* in the other case, the action will be suspended on creation, in SIMIX_comm_start() */
854 }
855
856 void SIMIX_comm_resume(smx_synchro_t synchro)
857 {
858   /*FIXME: check what happen with the timeouts */
859   if (synchro->comm.surf_comm)
860     synchro->comm.surf_comm->resume();
861   /* in the other case, the synchro were not really suspended yet, see SIMIX_comm_suspend() and SIMIX_comm_start() */
862 }
863
864
865 /************* synchro Getters **************/
866
867 /**
868  *  \brief get the amount remaining from the communication
869  *  \param synchro The communication
870  */
871 double SIMIX_comm_get_remains(smx_synchro_t synchro)
872 {
873   if(!synchro)
874     return 0;
875
876   double remains;
877   switch (synchro->state) {
878
879   case SIMIX_RUNNING:
880     remains = synchro->comm.surf_comm->getRemains();
881     break;
882
883   case SIMIX_WAITING:
884   case SIMIX_READY:
885     remains = 0; /*FIXME: check what should be returned */
886     break;
887
888   default:
889     remains = 0; /*FIXME: is this correct? */
890     break;
891   }
892   return remains;
893 }
894
895 e_smx_state_t SIMIX_comm_get_state(smx_synchro_t synchro)
896 {
897   return synchro->state;
898 }
899
900 /**
901  *  \brief Return the user data associated to the sender of the communication
902  *  \param synchro The communication
903  *  \return the user data
904  */
905 void* SIMIX_comm_get_src_data(smx_synchro_t synchro)
906 {
907   return synchro->comm.src_data;
908 }
909
910 /**
911  *  \brief Return the user data associated to the receiver of the communication
912  *  \param synchro The communication
913  *  \return the user data
914  */
915 void* SIMIX_comm_get_dst_data(smx_synchro_t synchro)
916 {
917   return synchro->comm.dst_data;
918 }
919
920 smx_process_t SIMIX_comm_get_src_proc(smx_synchro_t synchro)
921 {
922   return synchro->comm.src_proc;
923 }
924
925 smx_process_t SIMIX_comm_get_dst_proc(smx_synchro_t synchro)
926 {
927   return synchro->comm.dst_proc;
928 }
929
930 /******************************************************************************/
931 /*                    SIMIX_comm_copy_data callbacks                       */
932 /******************************************************************************/
933 static void (*SIMIX_comm_copy_data_callback) (smx_synchro_t, void*, size_t) =
934   &SIMIX_comm_copy_pointer_callback;
935
936 void
937 SIMIX_comm_set_copy_data_callback(void (*callback) (smx_synchro_t, void*, size_t))
938 {
939   SIMIX_comm_copy_data_callback = callback;
940 }
941
942 void SIMIX_comm_copy_pointer_callback(smx_synchro_t comm, void* buff, size_t buff_size)
943 {
944   xbt_assert((buff_size == sizeof(void *)), "Cannot copy %zu bytes: must be sizeof(void*)", buff_size);
945   *(void **) (comm->comm.dst_buff) = buff;
946 }
947
948 void SIMIX_comm_copy_buffer_callback(smx_synchro_t comm, void* buff, size_t buff_size)
949 {
950   XBT_DEBUG("Copy the data over");
951   memcpy(comm->comm.dst_buff, buff, buff_size);
952   if (comm->comm.detached) { // if this is a detached send, the source buffer was duplicated by SMPI sender to make the original buffer available to the application ASAP
953     xbt_free(buff);
954     comm->comm.src_buff = NULL;
955   }
956 }
957
958
959 /**
960  *  \brief Copy the communication data from the sender's buffer to the receiver's one
961  *  \param comm The communication
962  */
963 void SIMIX_comm_copy_data(smx_synchro_t comm)
964 {
965   size_t buff_size = comm->comm.src_buff_size;
966   /* If there is no data to be copy then return */
967   if (!comm->comm.src_buff || !comm->comm.dst_buff || comm->comm.copied)
968     return;
969
970   XBT_DEBUG("Copying comm %p data from %s (%p) -> %s (%p) (%zu bytes)",
971             comm,
972             comm->comm.src_proc ? sg_host_get_name(comm->comm.src_proc->host) : "a finished process",
973             comm->comm.src_buff,
974             comm->comm.dst_proc ? sg_host_get_name(comm->comm.dst_proc->host) : "a finished process",
975             comm->comm.dst_buff, buff_size);
976
977   /* Copy at most dst_buff_size bytes of the message to receiver's buffer */
978   if (comm->comm.dst_buff_size)
979     buff_size = MIN(buff_size, *(comm->comm.dst_buff_size));
980
981   /* Update the receiver's buffer size to the copied amount */
982   if (comm->comm.dst_buff_size)
983     *comm->comm.dst_buff_size = buff_size;
984
985   if (buff_size > 0){
986       if(comm->comm.copy_data_fun)
987         comm->comm.copy_data_fun (comm, comm->comm.src_buff, buff_size);
988       else
989         SIMIX_comm_copy_data_callback (comm, comm->comm.src_buff, buff_size);
990   }
991
992
993   /* Set the copied flag so we copy data only once */
994   /* (this function might be called from both communication ends) */
995   comm->comm.copied = 1;
996 }