Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
Use std::deque instead of xbt_fifo within mailboxes
[simgrid.git] / src / simix / smx_network.cpp
1 /* Copyright (c) 2009-2015. The SimGrid Team.
2  * All rights reserved.                                                     */
3
4 /* This program is free software; you can redistribute it and/or modify it
5  * under the terms of the license (GNU LGPL) which comes with this package. */
6
7 #include "src/surf/surf_interface.hpp"
8 #include "src/simix/smx_private.h"
9 #include "xbt/log.h"
10 #include "mc/mc.h"
11 #include "src/mc/mc_replay.h"
12 #include "xbt/dict.h"
13 #include "simgrid/s4u/mailbox.hpp"
14
15 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(simix_network, simix, "SIMIX network-related synchronization");
16
17 static void SIMIX_mbox_free(void *data);
18 static xbt_dict_t mailboxes = xbt_dict_new_homogeneous(SIMIX_mbox_free);
19
20 static void SIMIX_waitany_remove_simcall_from_actions(smx_simcall_t simcall);
21 static void SIMIX_comm_copy_data(smx_synchro_t comm);
22 static smx_synchro_t SIMIX_comm_new(e_smx_comm_type_t type);
23 static inline void SIMIX_mbox_push(smx_mailbox_t mbox, smx_synchro_t comm);
24 static smx_synchro_t SIMIX_fifo_probe_comm(xbt_fifo_t fifo, e_smx_comm_type_t type,
25                                         int (*match_fun)(void *, void *,smx_synchro_t),
26                                         void *user_data, smx_synchro_t my_synchro);
27 static smx_synchro_t SIMIX_deque_get_filtered(std::deque<smx_synchro_t> *deque, e_smx_comm_type_t type,
28                                         int (*match_fun)(void *, void *,smx_synchro_t),
29                                         void *user_data, smx_synchro_t my_synchro);
30 static void SIMIX_comm_start(smx_synchro_t synchro);
31
32 void SIMIX_mailbox_exit(void)
33 {
34   xbt_dict_free(&mailboxes);
35 }
36
37 /******************************************************************************/
38 /*                           Rendez-Vous Points                               */
39 /******************************************************************************/
40
41 smx_mailbox_t SIMIX_mbox_create(const char *name)
42 {
43   xbt_assert(name, "Mailboxes must have a name");
44   /* two processes may have pushed the same mbox_create simcall at the same time */
45   smx_mailbox_t mbox = (smx_mailbox_t) xbt_dict_get_or_null(mailboxes, name);
46
47   if (!mbox) {
48     mbox = xbt_new0(s_smx_mailbox_t, 1);
49     mbox->name = xbt_strdup(name);
50     mbox->comm_queue = new std::deque<smx_synchro_t>();
51     mbox->done_comm_queue = nullptr; // Allocated on need only
52     mbox->permanent_receiver=NULL;
53
54     XBT_DEBUG("Creating a mailbox at %p with name %s", mbox, name);
55     xbt_dict_set(mailboxes, mbox->name, mbox, NULL);
56   }
57   return mbox;
58 }
59
60 void SIMIX_mbox_free(void *data)
61 {
62   XBT_DEBUG("mbox free %p", data);
63   smx_mailbox_t mbox = (smx_mailbox_t) data;
64   xbt_free(mbox->name);
65   delete mbox->comm_queue;
66   delete mbox->done_comm_queue;
67
68   xbt_free(mbox);
69 }
70
71 smx_mailbox_t SIMIX_mbox_get_by_name(const char *name)
72 {
73   return (smx_mailbox_t) xbt_dict_get_or_null(mailboxes, name);
74 }
75
76 smx_synchro_t SIMIX_mbox_get_head(smx_mailbox_t mbox)
77 {
78   return mbox->comm_queue->front();
79 }
80
81 /**
82  *  \brief get the receiver (process associated to the mailbox)
83  *  \param mbox The rendez-vous point
84  *  \return process The receiving process (NULL if not set)
85  */
86 smx_process_t SIMIX_mbox_get_receiver(smx_mailbox_t mbox)
87 {
88   return mbox->permanent_receiver;
89 }
90
91 /**
92  *  \brief set the receiver of the rendez vous point to allow eager sends
93  *  \param mbox The rendez-vous point
94  *  \param process The receiving process
95  */
96 void SIMIX_mbox_set_receiver(smx_mailbox_t mbox, smx_process_t process)
97 {
98   mbox->permanent_receiver=process;
99   if (mbox->done_comm_queue == nullptr)
100     mbox->done_comm_queue = new std::deque<smx_synchro_t>();
101 }
102
103 /**
104  *  \brief Pushes a communication synchro into a rendez-vous point
105  *  \param mbox The mailbox
106  *  \param comm The communication synchro
107  */
108 static inline void SIMIX_mbox_push(smx_mailbox_t mbox, smx_synchro_t comm)
109 {
110   mbox->comm_queue->push_back(comm);
111   comm->comm.mbox = mbox;
112 }
113
114 /**
115  *  \brief Removes a communication synchro from a rendez-vous point
116  *  \param mbox The rendez-vous point
117  *  \param comm The communication synchro
118  */
119 void SIMIX_mbox_remove(smx_mailbox_t mbox, smx_synchro_t comm)
120 {
121   for (auto it = mbox->comm_queue->begin(); it != mbox->comm_queue->end(); it++)
122     if (*it == comm) {
123       mbox->comm_queue->erase(it);
124       break;
125     }
126
127   comm->comm.mbox = NULL;
128 }
129
130 /**
131  *  \brief Checks if there is a communication synchro queued in a deque matching our needs
132  *  \param type The type of communication we are looking for (comm_send, comm_recv)
133  *  \return The communication synchro if found, NULL otherwise
134  */
135 static smx_synchro_t
136 SIMIX_deque_get_filtered(std::deque<smx_synchro_t> *deque, e_smx_comm_type_t type,
137                     int (*match_fun)(void *, void *,smx_synchro_t),
138                     void *this_user_data, smx_synchro_t my_synchro)
139 {
140   void* other_user_data = NULL;
141
142   for(auto it = deque->begin(); it != deque->end(); it++){
143     smx_synchro_t synchro = *it;
144     if (synchro->comm.type == SIMIX_COMM_SEND) {
145       other_user_data = synchro->comm.src_data;
146     } else if (synchro->comm.type == SIMIX_COMM_RECEIVE) {
147       other_user_data = synchro->comm.dst_data;
148     }
149     if (synchro->comm.type == type &&
150         (!match_fun              ||              match_fun(this_user_data,  other_user_data, synchro)) &&
151         (!synchro->comm.match_fun || synchro->comm.match_fun(other_user_data, this_user_data,  my_synchro))) {
152       XBT_DEBUG("Found a matching communication synchro %p", synchro);
153       deque->erase(it);
154       synchro->comm.refcount++;
155 #if HAVE_MC
156       synchro->comm.mbox_cpy = synchro->comm.mbox;
157 #endif
158       synchro->comm.mbox = NULL;
159       return synchro;
160     }
161     XBT_DEBUG("Sorry, communication synchro %p does not match our needs:"
162               " its type is %d but we are looking for a comm of type %d (or maybe the filtering didn't match)",
163               synchro, (int)synchro->comm.type, (int)type);
164   }
165   XBT_DEBUG("No matching communication synchro found");
166   return NULL;
167 }
168
169
170 /**
171  *  \brief Checks if there is a communication synchro queued in a fifo matching our needs, but leave it there
172  *  \param type The type of communication we are looking for (comm_send, comm_recv)
173  *  \return The communication synchro if found, NULL otherwise
174  */
175 smx_synchro_t SIMIX_fifo_probe_comm(xbt_fifo_t fifo, e_smx_comm_type_t type,
176                                  int (*match_fun)(void *, void *,smx_synchro_t),
177                                  void *this_user_data, smx_synchro_t my_synchro)
178 {
179   smx_synchro_t synchro;
180   xbt_fifo_item_t item;
181   void* other_user_data = NULL;
182
183   xbt_fifo_foreach(fifo, item, synchro, smx_synchro_t) {
184     if (synchro->comm.type == SIMIX_COMM_SEND) {
185       other_user_data = synchro->comm.src_data;
186     } else if (synchro->comm.type == SIMIX_COMM_RECEIVE) {
187       other_user_data = synchro->comm.dst_data;
188     }
189     if (synchro->comm.type == type &&
190         (!match_fun              ||              match_fun(this_user_data,  other_user_data, synchro)) &&
191         (!synchro->comm.match_fun || synchro->comm.match_fun(other_user_data, this_user_data,  my_synchro))) {
192       XBT_DEBUG("Found a matching communication synchro %p", synchro);
193       synchro->comm.refcount++;
194
195       return synchro;
196     }
197     XBT_DEBUG("Sorry, communication synchro %p does not match our needs:"
198               " its type is %d but we are looking for a comm of type %d (or maybe the filtering didn't match)",
199               synchro, (int)synchro->comm.type, (int)type);
200   }
201   XBT_DEBUG("No matching communication synchro found");
202   return NULL;
203 }
204 /******************************************************************************/
205 /*                          Communication synchros                            */
206 /******************************************************************************/
207
208 /**
209  *  \brief Creates a new communicate synchro
210  *  \param type The direction of communication (comm_send, comm_recv)
211  *  \return The new communicate synchro
212  */
213 smx_synchro_t SIMIX_comm_new(e_smx_comm_type_t type)
214 {
215   smx_synchro_t synchro;
216
217   /* alloc structures */
218   synchro = (smx_synchro_t) xbt_mallocator_get(simix_global->synchro_mallocator);
219
220   synchro->type = SIMIX_SYNC_COMMUNICATE;
221   synchro->state = SIMIX_WAITING;
222
223   /* set communication */
224   synchro->comm.type = type;
225   synchro->comm.refcount = 1;
226   synchro->comm.src_data=NULL;
227   synchro->comm.dst_data=NULL;
228
229   synchro->category = NULL;
230
231   XBT_DEBUG("Create communicate synchro %p", synchro);
232
233   return synchro;
234 }
235
236 /**
237  *  \brief Destroy a communicate synchro
238  *  \param synchro The communicate synchro to be destroyed
239  */
240 void SIMIX_comm_destroy(smx_synchro_t synchro)
241 {
242   XBT_DEBUG("Destroy synchro %p (refcount: %d), state: %d",
243             synchro, synchro->comm.refcount, (int)synchro->state);
244
245   if (synchro->comm.refcount <= 0) {
246     xbt_backtrace_display_current();
247     xbt_die("The refcount of comm %p is already 0 before decreasing it. "
248             "That's a bug! If you didn't test and/or wait the same communication twice in your code, then the bug is SimGrid's...", synchro);
249   }
250   synchro->comm.refcount--;
251   if (synchro->comm.refcount > 0)
252       return;
253   XBT_DEBUG("Really free communication %p; refcount is now %d", synchro,
254       synchro->comm.refcount);
255
256   xbt_free(synchro->name);
257   SIMIX_comm_destroy_internal_actions(synchro);
258
259   if (synchro->comm.detached && synchro->state != SIMIX_DONE) {
260     /* the communication has failed and was detached:
261      * we have to free the buffer */
262     if (synchro->comm.clean_fun) {
263       synchro->comm.clean_fun(synchro->comm.src_buff);
264     }
265     synchro->comm.src_buff = NULL;
266   }
267
268   if(synchro->comm.mbox)
269     SIMIX_mbox_remove(synchro->comm.mbox, synchro);
270
271   xbt_mallocator_release(simix_global->synchro_mallocator, synchro);
272 }
273
274 void SIMIX_comm_destroy_internal_actions(smx_synchro_t synchro)
275 {
276   if (synchro->comm.surf_comm){
277     synchro->comm.surf_comm->unref();
278     synchro->comm.surf_comm = NULL;
279   }
280
281   if (synchro->comm.src_timeout){
282     synchro->comm.src_timeout->unref();
283     synchro->comm.src_timeout = NULL;
284   }
285
286   if (synchro->comm.dst_timeout){
287     synchro->comm.dst_timeout->unref();
288     synchro->comm.dst_timeout = NULL;
289   }
290 }
291
292 void simcall_HANDLER_comm_send(smx_simcall_t simcall, smx_process_t src, smx_mailbox_t mbox,
293                                   double task_size, double rate,
294                                   void *src_buff, size_t src_buff_size,
295                                   int (*match_fun)(void *, void *,smx_synchro_t),
296                                   void (*copy_data_fun)(smx_synchro_t, void*, size_t),
297           void *data, double timeout){
298   smx_synchro_t comm = simcall_HANDLER_comm_isend(simcall, src, mbox, task_size, rate,
299                            src_buff, src_buff_size, match_fun, NULL, copy_data_fun,
300                data, 0);
301   SIMCALL_SET_MC_VALUE(simcall, 0);
302   simcall_HANDLER_comm_wait(simcall, comm, timeout);
303 }
304 smx_synchro_t simcall_HANDLER_comm_isend(smx_simcall_t simcall, smx_process_t src_proc, smx_mailbox_t mbox,
305                                   double task_size, double rate,
306                                   void *src_buff, size_t src_buff_size,
307                                   int (*match_fun)(void *, void *,smx_synchro_t),
308                                   void (*clean_fun)(void *), // used to free the synchro in case of problem after a detached send
309                                   void (*copy_data_fun)(smx_synchro_t, void*, size_t),// used to copy data if not default one
310                           void *data, int detached)
311 {
312   XBT_DEBUG("send from %p", mbox);
313
314   /* Prepare a synchro describing us, so that it gets passed to the user-provided filter of other side */
315   smx_synchro_t this_synchro = SIMIX_comm_new(SIMIX_COMM_SEND);
316
317   /* Look for communication synchro matching our needs. We also provide a description of
318    * ourself so that the other side also gets a chance of choosing if it wants to match with us.
319    *
320    * If it is not found then push our communication into the rendez-vous point */
321   smx_synchro_t other_synchro = SIMIX_deque_get_filtered(mbox->comm_queue, SIMIX_COMM_RECEIVE, match_fun, data, this_synchro);
322
323   if (!other_synchro) {
324     other_synchro = this_synchro;
325
326     if (mbox->permanent_receiver!=NULL){
327       //this mailbox is for small messages, which have to be sent right now
328       other_synchro->state = SIMIX_READY;
329       other_synchro->comm.dst_proc=mbox->permanent_receiver;
330       other_synchro->comm.refcount++;
331       mbox->done_comm_queue->push_back(other_synchro);
332       other_synchro->comm.mbox=mbox;
333       XBT_DEBUG("pushing a message into the permanent receive fifo %p, comm %p", mbox, &(other_synchro->comm));
334
335     }else{
336       SIMIX_mbox_push(mbox, this_synchro);
337     }
338   } else {
339     XBT_DEBUG("Receive already pushed");
340
341     SIMIX_comm_destroy(this_synchro);
342
343     other_synchro->state = SIMIX_READY;
344     other_synchro->comm.type = SIMIX_COMM_READY;
345
346   }
347   xbt_fifo_push(src_proc->comms, other_synchro);
348
349   /* if the communication synchro is detached then decrease the refcount
350    * by one, so it will be eliminated by the receiver's destroy call */
351   if (detached) {
352     other_synchro->comm.detached = 1;
353     other_synchro->comm.refcount--;
354     other_synchro->comm.clean_fun = clean_fun;
355   } else {
356     other_synchro->comm.clean_fun = NULL;
357   }
358
359   /* Setup the communication synchro */
360   other_synchro->comm.src_proc = src_proc;
361   other_synchro->comm.task_size = task_size;
362   other_synchro->comm.rate = rate;
363   other_synchro->comm.src_buff = src_buff;
364   other_synchro->comm.src_buff_size = src_buff_size;
365   other_synchro->comm.src_data = data;
366
367   other_synchro->comm.match_fun = match_fun;
368   other_synchro->comm.copy_data_fun = copy_data_fun;
369
370
371   if (MC_is_active() || MC_record_replay_is_active()) {
372     other_synchro->state = SIMIX_RUNNING;
373     return (detached ? NULL : other_synchro);
374   }
375
376   SIMIX_comm_start(other_synchro);
377   return (detached ? NULL : other_synchro);
378 }
379
380 void simcall_HANDLER_comm_recv(smx_simcall_t simcall, smx_process_t receiver, smx_mailbox_t mbox,
381                          void *dst_buff, size_t *dst_buff_size,
382                          int (*match_fun)(void *, void *, smx_synchro_t),
383                          void (*copy_data_fun)(smx_synchro_t, void*, size_t),
384                          void *data, double timeout, double rate)
385 {
386   smx_synchro_t comm = SIMIX_comm_irecv(receiver, mbox, dst_buff,
387                            dst_buff_size, match_fun, copy_data_fun, data, rate);
388   SIMCALL_SET_MC_VALUE(simcall, 0);
389   simcall_HANDLER_comm_wait(simcall, comm, timeout);
390 }
391
392 smx_synchro_t simcall_HANDLER_comm_irecv(smx_simcall_t simcall, smx_process_t receiver, smx_mailbox_t mbox,
393     void *dst_buff, size_t *dst_buff_size,
394     int (*match_fun)(void *, void *, smx_synchro_t),
395     void (*copy_data_fun)(smx_synchro_t, void*, size_t),
396     void *data, double rate)
397 {
398   return SIMIX_comm_irecv(receiver, mbox, dst_buff, dst_buff_size, match_fun, copy_data_fun, data, rate);
399 }
400
401 smx_synchro_t SIMIX_comm_irecv(smx_process_t dst_proc, smx_mailbox_t mbox, void *dst_buff, size_t *dst_buff_size,
402     int (*match_fun)(void *, void *, smx_synchro_t),
403     void (*copy_data_fun)(smx_synchro_t, void*, size_t), // used to copy data if not default one
404     void *data, double rate)
405 {
406   XBT_DEBUG("recv from %p %p", mbox, mbox->comm_queue);
407   smx_synchro_t this_synchro = SIMIX_comm_new(SIMIX_COMM_RECEIVE);
408
409   smx_synchro_t other_synchro;
410   //communication already done, get it inside the fifo of completed comms
411   if (mbox->permanent_receiver && ! mbox->done_comm_queue->empty()) {
412
413     XBT_DEBUG("We have a comm that has probably already been received, trying to match it, to skip the communication");
414     //find a match in the already received fifo
415     other_synchro = SIMIX_deque_get_filtered(mbox->done_comm_queue, SIMIX_COMM_SEND, match_fun, data, this_synchro);
416     //if not found, assume the receiver came first, register it to the mailbox in the classical way
417     if (!other_synchro)  {
418       XBT_DEBUG("We have messages in the permanent receive list, but not the one we are looking for, pushing request into fifo");
419       other_synchro = this_synchro;
420       SIMIX_mbox_push(mbox, this_synchro);
421     } else {
422       if(other_synchro->comm.surf_comm && SIMIX_comm_get_remains(other_synchro)==0.0) {
423         XBT_DEBUG("comm %p has been already sent, and is finished, destroy it",&(other_synchro->comm));
424         other_synchro->state = SIMIX_DONE;
425         other_synchro->comm.type = SIMIX_COMM_DONE;
426         other_synchro->comm.mbox = NULL;
427       }
428       other_synchro->comm.refcount--;
429       SIMIX_comm_destroy(this_synchro);
430     }
431   } else {
432     /* Prepare a synchro describing us, so that it gets passed to the user-provided filter of other side */
433
434     /* Look for communication synchro matching our needs. We also provide a description of
435      * ourself so that the other side also gets a chance of choosing if it wants to match with us.
436      *
437      * If it is not found then push our communication into the rendez-vous point */
438     other_synchro = SIMIX_deque_get_filtered(mbox->comm_queue, SIMIX_COMM_SEND, match_fun, data, this_synchro);
439
440     if (!other_synchro) {
441       XBT_DEBUG("Receive pushed first %lu", mbox->comm_queue->size());
442       other_synchro = this_synchro;
443       SIMIX_mbox_push(mbox, this_synchro);
444     } else {
445       SIMIX_comm_destroy(this_synchro);
446       other_synchro->state = SIMIX_READY;
447       other_synchro->comm.type = SIMIX_COMM_READY;
448       //other_synchro->comm.refcount--;
449     }
450     xbt_fifo_push(dst_proc->comms, other_synchro);
451   }
452
453   /* Setup communication synchro */
454   other_synchro->comm.dst_proc = dst_proc;
455   other_synchro->comm.dst_buff = dst_buff;
456   other_synchro->comm.dst_buff_size = dst_buff_size;
457   other_synchro->comm.dst_data = data;
458
459   if (rate != -1.0 && (other_synchro->comm.rate == -1.0 || rate < other_synchro->comm.rate))
460     other_synchro->comm.rate = rate;
461
462   other_synchro->comm.match_fun = match_fun;
463   other_synchro->comm.copy_data_fun = copy_data_fun;
464
465   if (MC_is_active() || MC_record_replay_is_active()) {
466     other_synchro->state = SIMIX_RUNNING;
467     return other_synchro;
468   }
469
470   SIMIX_comm_start(other_synchro);
471   return other_synchro;
472 }
473
474 smx_synchro_t simcall_HANDLER_comm_iprobe(smx_simcall_t simcall, smx_mailbox_t mbox,
475                                    int type, int src, int tag,
476                                    int (*match_fun)(void *, void *, smx_synchro_t),
477                                    void *data){
478   return SIMIX_comm_iprobe(simcall->issuer, mbox, type, src, tag, match_fun, data);
479 }
480
481 smx_synchro_t SIMIX_comm_iprobe(smx_process_t dst_proc, smx_mailbox_t mbox, int type, int src,
482                               int tag, int (*match_fun)(void *, void *, smx_synchro_t), void *data)
483 {
484   XBT_DEBUG("iprobe from %p %p", mbox, mbox->comm_queue);
485   smx_synchro_t this_synchro;
486   int smx_type;
487   if(type == 1){
488     this_synchro=SIMIX_comm_new(SIMIX_COMM_SEND);
489     smx_type = SIMIX_COMM_RECEIVE;
490   } else{
491     this_synchro=SIMIX_comm_new(SIMIX_COMM_RECEIVE);
492     smx_type = SIMIX_COMM_SEND;
493   } 
494   smx_synchro_t other_synchro=NULL;
495   if(mbox->permanent_receiver && ! mbox->done_comm_queue->empty()){
496     //find a match in the already received fifo
497       XBT_DEBUG("first check in the perm recv mailbox");
498
499     other_synchro = SIMIX_deque_get_filtered(mbox->done_comm_queue, (e_smx_comm_type_t) smx_type, match_fun, data, this_synchro);
500   }
501  // }else{
502     if (!other_synchro){
503         XBT_DEBUG("try in the normal mailbox");
504         other_synchro = SIMIX_deque_get_filtered(mbox->comm_queue, (e_smx_comm_type_t) smx_type, match_fun, data, this_synchro);
505     }
506 //  }
507   if(other_synchro)other_synchro->comm.refcount--;
508
509   SIMIX_comm_destroy(this_synchro);
510   return other_synchro;
511 }
512
513 void simcall_HANDLER_comm_wait(smx_simcall_t simcall, smx_synchro_t synchro, double timeout)
514 {
515   /* the simcall may be a wait, a send or a recv */
516   surf_action_t sleep;
517
518   /* Associate this simcall to the wait synchro */
519   XBT_DEBUG("simcall_HANDLER_comm_wait, %p", synchro);
520
521   xbt_fifo_push(synchro->simcalls, simcall);
522   simcall->issuer->waiting_synchro = synchro;
523
524   if (MC_is_active() || MC_record_replay_is_active()) {
525     int idx = SIMCALL_GET_MC_VALUE(simcall);
526     if (idx == 0) {
527       synchro->state = SIMIX_DONE;
528     } else {
529       /* If we reached this point, the wait simcall must have a timeout */
530       /* Otherwise it shouldn't be enabled and executed by the MC */
531       if (timeout == -1)
532         THROW_IMPOSSIBLE;
533
534       if (synchro->comm.src_proc == simcall->issuer)
535         synchro->state = SIMIX_SRC_TIMEOUT;
536       else
537         synchro->state = SIMIX_DST_TIMEOUT;
538     }
539
540     SIMIX_comm_finish(synchro);
541     return;
542   }
543
544   /* If the synchro has already finish perform the error handling, */
545   /* otherwise set up a waiting timeout on the right side          */
546   if (synchro->state != SIMIX_WAITING && synchro->state != SIMIX_RUNNING) {
547     SIMIX_comm_finish(synchro);
548   } else { /* if (timeout >= 0) { we need a surf sleep action even when there is no timeout, otherwise surf won't tell us when the host fails */
549     sleep = surf_host_sleep(simcall->issuer->host, timeout);
550     sleep->setData(synchro);
551
552     if (simcall->issuer == synchro->comm.src_proc)
553       synchro->comm.src_timeout = sleep;
554     else
555       synchro->comm.dst_timeout = sleep;
556   }
557 }
558
559 void simcall_HANDLER_comm_test(smx_simcall_t simcall, smx_synchro_t synchro)
560 {
561   if(MC_is_active() || MC_record_replay_is_active()){
562     simcall_comm_test__set__result(simcall, synchro->comm.src_proc && synchro->comm.dst_proc);
563     if(simcall_comm_test__get__result(simcall)){
564       synchro->state = SIMIX_DONE;
565       xbt_fifo_push(synchro->simcalls, simcall);
566       SIMIX_comm_finish(synchro);
567     }else{
568       SIMIX_simcall_answer(simcall);
569     }
570     return;
571   }
572
573   simcall_comm_test__set__result(simcall, (synchro->state != SIMIX_WAITING && synchro->state != SIMIX_RUNNING));
574   if (simcall_comm_test__get__result(simcall)) {
575     xbt_fifo_push(synchro->simcalls, simcall);
576     SIMIX_comm_finish(synchro);
577   } else {
578     SIMIX_simcall_answer(simcall);
579   }
580 }
581
582 void simcall_HANDLER_comm_testany(smx_simcall_t simcall, xbt_dynar_t synchros)
583 {
584   unsigned int cursor;
585   smx_synchro_t synchro;
586   simcall_comm_testany__set__result(simcall, -1);
587
588   if (MC_is_active() || MC_record_replay_is_active()){
589     int idx = SIMCALL_GET_MC_VALUE(simcall);
590     if(idx == -1){
591       SIMIX_simcall_answer(simcall);
592     }else{
593       synchro = xbt_dynar_get_as(synchros, idx, smx_synchro_t);
594       simcall_comm_testany__set__result(simcall, idx);
595       xbt_fifo_push(synchro->simcalls, simcall);
596       synchro->state = SIMIX_DONE;
597       SIMIX_comm_finish(synchro);
598     }
599     return;
600   }
601
602   xbt_dynar_foreach(simcall_comm_testany__get__comms(simcall), cursor,synchro) {
603     if (synchro->state != SIMIX_WAITING && synchro->state != SIMIX_RUNNING) {
604       simcall_comm_testany__set__result(simcall, cursor);
605       xbt_fifo_push(synchro->simcalls, simcall);
606       SIMIX_comm_finish(synchro);
607       return;
608     }
609   }
610   SIMIX_simcall_answer(simcall);
611 }
612
613 void simcall_HANDLER_comm_waitany(smx_simcall_t simcall, xbt_dynar_t synchros)
614 {
615   smx_synchro_t synchro;
616   unsigned int cursor = 0;
617
618   if (MC_is_active() || MC_record_replay_is_active()){
619     int idx = SIMCALL_GET_MC_VALUE(simcall);
620     synchro = xbt_dynar_get_as(synchros, idx, smx_synchro_t);
621     xbt_fifo_push(synchro->simcalls, simcall);
622     simcall_comm_waitany__set__result(simcall, idx);
623     synchro->state = SIMIX_DONE;
624     SIMIX_comm_finish(synchro);
625     return;
626   }
627
628   xbt_dynar_foreach(synchros, cursor, synchro){
629     /* associate this simcall to the the synchro */
630     xbt_fifo_push(synchro->simcalls, simcall);
631
632     /* see if the synchro is already finished */
633     if (synchro->state != SIMIX_WAITING && synchro->state != SIMIX_RUNNING){
634       SIMIX_comm_finish(synchro);
635       break;
636     }
637   }
638 }
639
640 void SIMIX_waitany_remove_simcall_from_actions(smx_simcall_t simcall)
641 {
642   smx_synchro_t synchro;
643   unsigned int cursor = 0;
644   xbt_dynar_t synchros = simcall_comm_waitany__get__comms(simcall);
645
646   xbt_dynar_foreach(synchros, cursor, synchro) {
647     xbt_fifo_remove(synchro->simcalls, simcall);
648   }
649 }
650
651 /**
652  *  \brief Starts the simulation of a communication synchro.
653  *  \param synchro the communication synchro
654  */
655 static inline void SIMIX_comm_start(smx_synchro_t synchro)
656 {
657   /* If both the sender and the receiver are already there, start the communication */
658   if (synchro->state == SIMIX_READY) {
659
660     sg_host_t sender = synchro->comm.src_proc->host;
661     sg_host_t receiver = synchro->comm.dst_proc->host;
662
663     XBT_DEBUG("Starting communication %p from '%s' to '%s'", synchro,
664               sg_host_get_name(sender), sg_host_get_name(receiver));
665
666     synchro->comm.surf_comm = surf_network_model_communicate(surf_network_model,
667                                                             sender, receiver,
668                                                             synchro->comm.task_size, synchro->comm.rate);
669
670     synchro->comm.surf_comm->setData(synchro);
671
672     synchro->state = SIMIX_RUNNING;
673
674     /* If a link is failed, detect it immediately */
675     if (synchro->comm.surf_comm->getState() == simgrid::surf::Action::State::failed) {
676       XBT_DEBUG("Communication from '%s' to '%s' failed to start because of a link failure",
677                 sg_host_get_name(sender), sg_host_get_name(receiver));
678       synchro->state = SIMIX_LINK_FAILURE;
679       SIMIX_comm_destroy_internal_actions(synchro);
680     }
681
682     /* If any of the process is suspend, create the synchro but stop its execution,
683        it will be restarted when the sender process resume */
684     if (SIMIX_process_is_suspended(synchro->comm.src_proc) ||
685         SIMIX_process_is_suspended(synchro->comm.dst_proc)) {
686       /* FIXME: check what should happen with the synchro state */
687
688       if (SIMIX_process_is_suspended(synchro->comm.src_proc))
689         XBT_DEBUG("The communication is suspended on startup because src (%s:%s) were suspended since it initiated the communication",
690                   sg_host_get_name(synchro->comm.src_proc->host), synchro->comm.src_proc->name);
691       else
692         XBT_DEBUG("The communication is suspended on startup because dst (%s:%s) were suspended since it initiated the communication",
693                   sg_host_get_name(synchro->comm.dst_proc->host), synchro->comm.dst_proc->name);
694
695       synchro->comm.surf_comm->suspend();
696
697     }
698   }
699 }
700
701 /**
702  * \brief Answers the SIMIX simcalls associated to a communication synchro.
703  * \param synchro a finished communication synchro
704  */
705 void SIMIX_comm_finish(smx_synchro_t synchro)
706 {
707   unsigned int destroy_count = 0;
708   smx_simcall_t simcall;
709
710   while ((simcall = (smx_simcall_t) xbt_fifo_shift(synchro->simcalls))) {
711
712     /* If a waitany simcall is waiting for this synchro to finish, then remove
713        it from the other synchros in the waitany list. Afterwards, get the
714        position of the actual synchro in the waitany dynar and
715        return it as the result of the simcall */
716
717     if (simcall->call == SIMCALL_NONE) //FIXME: maybe a better way to handle this case
718       continue; // if process handling comm is killed
719     if (simcall->call == SIMCALL_COMM_WAITANY) {
720       SIMIX_waitany_remove_simcall_from_actions(simcall);
721       if (!MC_is_active() && !MC_record_replay_is_active())
722         simcall_comm_waitany__set__result(simcall, xbt_dynar_search(simcall_comm_waitany__get__comms(simcall), &synchro));
723     }
724
725     /* If the synchro is still in a rendez-vous point then remove from it */
726     if (synchro->comm.mbox)
727       SIMIX_mbox_remove(synchro->comm.mbox, synchro);
728
729     XBT_DEBUG("SIMIX_comm_finish: synchro state = %d", (int)synchro->state);
730
731     /* Check out for errors */
732
733     if (simcall->issuer->host->isOff()) {
734       simcall->issuer->context->iwannadie = 1;
735       SMX_EXCEPTION(simcall->issuer, host_error, 0, "Host failed");
736     } else
737
738     switch (synchro->state) {
739
740     case SIMIX_DONE:
741       XBT_DEBUG("Communication %p complete!", synchro);
742       SIMIX_comm_copy_data(synchro);
743       break;
744
745     case SIMIX_SRC_TIMEOUT:
746       SMX_EXCEPTION(simcall->issuer, timeout_error, 0,
747                     "Communication timeouted because of sender");
748       break;
749
750     case SIMIX_DST_TIMEOUT:
751       SMX_EXCEPTION(simcall->issuer, timeout_error, 0,
752                     "Communication timeouted because of receiver");
753       break;
754
755     case SIMIX_SRC_HOST_FAILURE:
756       if (simcall->issuer == synchro->comm.src_proc)
757         simcall->issuer->context->iwannadie = 1;
758 //          SMX_EXCEPTION(simcall->issuer, host_error, 0, "Host failed");
759       else
760         SMX_EXCEPTION(simcall->issuer, network_error, 0, "Remote peer failed");
761       break;
762
763     case SIMIX_DST_HOST_FAILURE:
764       if (simcall->issuer == synchro->comm.dst_proc)
765         simcall->issuer->context->iwannadie = 1;
766 //          SMX_EXCEPTION(simcall->issuer, host_error, 0, "Host failed");
767       else
768         SMX_EXCEPTION(simcall->issuer, network_error, 0, "Remote peer failed");
769       break;
770
771     case SIMIX_LINK_FAILURE:
772
773       XBT_DEBUG("Link failure in synchro %p between '%s' and '%s': posting an exception to the issuer: %s (%p) detached:%d",
774                 synchro,
775                 synchro->comm.src_proc ? sg_host_get_name(synchro->comm.src_proc->host) : NULL,
776                 synchro->comm.dst_proc ? sg_host_get_name(synchro->comm.dst_proc->host) : NULL,
777                 simcall->issuer->name, simcall->issuer, synchro->comm.detached);
778       if (synchro->comm.src_proc == simcall->issuer) {
779         XBT_DEBUG("I'm source");
780       } else if (synchro->comm.dst_proc == simcall->issuer) {
781         XBT_DEBUG("I'm dest");
782       } else {
783         XBT_DEBUG("I'm neither source nor dest");
784       }
785       SMX_EXCEPTION(simcall->issuer, network_error, 0, "Link failure");
786       break;
787
788     case SIMIX_CANCELED:
789       if (simcall->issuer == synchro->comm.dst_proc)
790         SMX_EXCEPTION(simcall->issuer, cancel_error, 0,
791                       "Communication canceled by the sender");
792       else
793         SMX_EXCEPTION(simcall->issuer, cancel_error, 0,
794                       "Communication canceled by the receiver");
795       break;
796
797     default:
798       xbt_die("Unexpected synchro state in SIMIX_comm_finish: %d", (int)synchro->state);
799     }
800
801     /* if there is an exception during a waitany or a testany, indicate the position of the failed communication */
802     if (simcall->issuer->doexception) {
803       if (simcall->call == SIMCALL_COMM_WAITANY) {
804         simcall->issuer->running_ctx->exception.value = xbt_dynar_search(simcall_comm_waitany__get__comms(simcall), &synchro);
805       }
806       else if (simcall->call == SIMCALL_COMM_TESTANY) {
807         simcall->issuer->running_ctx->exception.value = xbt_dynar_search(simcall_comm_testany__get__comms(simcall), &synchro);
808       }
809     }
810
811     if (simcall->issuer->host->isOff()) {
812       simcall->issuer->context->iwannadie = 1;
813     }
814
815     simcall->issuer->waiting_synchro = NULL;
816     xbt_fifo_remove(simcall->issuer->comms, synchro);
817     if(synchro->comm.detached){
818       if(simcall->issuer == synchro->comm.src_proc){
819         if(synchro->comm.dst_proc)
820           xbt_fifo_remove(synchro->comm.dst_proc->comms, synchro);
821       }
822       if(simcall->issuer == synchro->comm.dst_proc){
823         if(synchro->comm.src_proc)
824           xbt_fifo_remove(synchro->comm.src_proc->comms, synchro);
825       }
826     }
827     SIMIX_simcall_answer(simcall);
828     destroy_count++;
829   }
830
831   while (destroy_count-- > 0)
832     SIMIX_comm_destroy(synchro);
833 }
834
835 /**
836  * \brief This function is called when a Surf communication synchro is finished.
837  * \param synchro the corresponding Simix communication
838  */
839 void SIMIX_post_comm(smx_synchro_t synchro)
840 {
841   /* Update synchro state */
842   if (synchro->comm.src_timeout &&
843       synchro->comm.src_timeout->getState() == simgrid::surf::Action::State::done)
844     synchro->state = SIMIX_SRC_TIMEOUT;
845   else if (synchro->comm.dst_timeout &&
846     synchro->comm.dst_timeout->getState() == simgrid::surf::Action::State::done)
847     synchro->state = SIMIX_DST_TIMEOUT;
848   else if (synchro->comm.src_timeout &&
849     synchro->comm.src_timeout->getState() == simgrid::surf::Action::State::failed)
850     synchro->state = SIMIX_SRC_HOST_FAILURE;
851   else if (synchro->comm.dst_timeout &&
852       synchro->comm.dst_timeout->getState() == simgrid::surf::Action::State::failed)
853     synchro->state = SIMIX_DST_HOST_FAILURE;
854   else if (synchro->comm.surf_comm &&
855     synchro->comm.surf_comm->getState() == simgrid::surf::Action::State::failed) {
856     XBT_DEBUG("Puta madre. Surf says that the link broke");
857     synchro->state = SIMIX_LINK_FAILURE;
858   } else
859     synchro->state = SIMIX_DONE;
860
861   XBT_DEBUG("SIMIX_post_comm: comm %p, state %d, src_proc %p, dst_proc %p, detached: %d",
862             synchro, (int)synchro->state, synchro->comm.src_proc, synchro->comm.dst_proc, synchro->comm.detached);
863
864   /* destroy the surf actions associated with the Simix communication */
865   SIMIX_comm_destroy_internal_actions(synchro);
866
867   /* if there are simcalls associated with the synchro, then answer them */
868   if (xbt_fifo_size(synchro->simcalls)) {
869     SIMIX_comm_finish(synchro);
870   }
871 }
872
873 void SIMIX_comm_cancel(smx_synchro_t synchro)
874 {
875   /* if the synchro is a waiting state means that it is still in a mbox */
876   /* so remove from it and delete it */
877   if (synchro->state == SIMIX_WAITING) {
878     SIMIX_mbox_remove(synchro->comm.mbox, synchro);
879     synchro->state = SIMIX_CANCELED;
880   }
881   else if (!MC_is_active() /* when running the MC there are no surf actions */
882            && !MC_record_replay_is_active()
883            && (synchro->state == SIMIX_READY || synchro->state == SIMIX_RUNNING)) {
884
885     synchro->comm.surf_comm->cancel();
886   }
887 }
888
889 void SIMIX_comm_suspend(smx_synchro_t synchro)
890 {
891   /*FIXME: shall we suspend also the timeout synchro? */
892   if (synchro->comm.surf_comm)
893     synchro->comm.surf_comm->suspend();
894   /* in the other case, the action will be suspended on creation, in SIMIX_comm_start() */
895 }
896
897 void SIMIX_comm_resume(smx_synchro_t synchro)
898 {
899   /*FIXME: check what happen with the timeouts */
900   if (synchro->comm.surf_comm)
901     synchro->comm.surf_comm->resume();
902   /* in the other case, the synchro were not really suspended yet, see SIMIX_comm_suspend() and SIMIX_comm_start() */
903 }
904
905
906 /************* synchro Getters **************/
907
908 /**
909  *  \brief get the amount remaining from the communication
910  *  \param synchro The communication
911  */
912 double SIMIX_comm_get_remains(smx_synchro_t synchro)
913 {
914   double remains;
915
916   if(!synchro){
917     return 0;
918   }
919
920   switch (synchro->state) {
921
922   case SIMIX_RUNNING:
923     remains = synchro->comm.surf_comm->getRemains();
924     break;
925
926   case SIMIX_WAITING:
927   case SIMIX_READY:
928     remains = 0; /*FIXME: check what should be returned */
929     break;
930
931   default:
932     remains = 0; /*FIXME: is this correct? */
933     break;
934   }
935   return remains;
936 }
937
938 e_smx_state_t SIMIX_comm_get_state(smx_synchro_t synchro)
939 {
940   return synchro->state;
941 }
942
943 /**
944  *  \brief Return the user data associated to the sender of the communication
945  *  \param synchro The communication
946  *  \return the user data
947  */
948 void* SIMIX_comm_get_src_data(smx_synchro_t synchro)
949 {
950   return synchro->comm.src_data;
951 }
952
953 /**
954  *  \brief Return the user data associated to the receiver of the communication
955  *  \param synchro The communication
956  *  \return the user data
957  */
958 void* SIMIX_comm_get_dst_data(smx_synchro_t synchro)
959 {
960   return synchro->comm.dst_data;
961 }
962
963 smx_process_t SIMIX_comm_get_src_proc(smx_synchro_t synchro)
964 {
965   return synchro->comm.src_proc;
966 }
967
968 smx_process_t SIMIX_comm_get_dst_proc(smx_synchro_t synchro)
969 {
970   return synchro->comm.dst_proc;
971 }
972
973 /******************************************************************************/
974 /*                    SIMIX_comm_copy_data callbacks                       */
975 /******************************************************************************/
976 static void (*SIMIX_comm_copy_data_callback) (smx_synchro_t, void*, size_t) =
977   &SIMIX_comm_copy_pointer_callback;
978
979 void
980 SIMIX_comm_set_copy_data_callback(void (*callback) (smx_synchro_t, void*, size_t))
981 {
982   SIMIX_comm_copy_data_callback = callback;
983 }
984
985 void SIMIX_comm_copy_pointer_callback(smx_synchro_t comm, void* buff, size_t buff_size)
986 {
987   xbt_assert((buff_size == sizeof(void *)),
988              "Cannot copy %zu bytes: must be sizeof(void*)", buff_size);
989   *(void **) (comm->comm.dst_buff) = buff;
990 }
991
992 void SIMIX_comm_copy_buffer_callback(smx_synchro_t comm, void* buff, size_t buff_size)
993 {
994   XBT_DEBUG("Copy the data over");
995   memcpy(comm->comm.dst_buff, buff, buff_size);
996   if (comm->comm.detached) { // if this is a detached send, the source buffer was duplicated by SMPI sender to make the original buffer available to the application ASAP
997     xbt_free(buff);
998     comm->comm.src_buff = NULL;
999   }
1000 }
1001
1002
1003 /**
1004  *  \brief Copy the communication data from the sender's buffer to the receiver's one
1005  *  \param comm The communication
1006  */
1007 void SIMIX_comm_copy_data(smx_synchro_t comm)
1008 {
1009   size_t buff_size = comm->comm.src_buff_size;
1010   /* If there is no data to be copy then return */
1011   if (!comm->comm.src_buff || !comm->comm.dst_buff || comm->comm.copied)
1012     return;
1013
1014   XBT_DEBUG("Copying comm %p data from %s (%p) -> %s (%p) (%zu bytes)",
1015             comm,
1016             comm->comm.src_proc ? sg_host_get_name(comm->comm.src_proc->host) : "a finished process",
1017             comm->comm.src_buff,
1018             comm->comm.dst_proc ? sg_host_get_name(comm->comm.dst_proc->host) : "a finished process",
1019             comm->comm.dst_buff, buff_size);
1020
1021   /* Copy at most dst_buff_size bytes of the message to receiver's buffer */
1022   if (comm->comm.dst_buff_size)
1023     buff_size = MIN(buff_size, *(comm->comm.dst_buff_size));
1024
1025   /* Update the receiver's buffer size to the copied amount */
1026   if (comm->comm.dst_buff_size)
1027     *comm->comm.dst_buff_size = buff_size;
1028
1029   if (buff_size > 0){
1030       if(comm->comm.copy_data_fun)
1031         comm->comm.copy_data_fun (comm, comm->comm.src_buff, buff_size);
1032       else
1033         SIMIX_comm_copy_data_callback (comm, comm->comm.src_buff, buff_size);
1034   }
1035
1036
1037   /* Set the copied flag so we copy data only once */
1038   /* (this function might be called from both communication ends) */
1039   comm->comm.copied = 1;
1040 }