Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
prune some useless code in mailboxes
[simgrid.git] / src / simix / smx_network.cpp
1 /* Copyright (c) 2009-2015. The SimGrid Team.
2  * All rights reserved.                                                     */
3
4 /* This program is free software; you can redistribute it and/or modify it
5  * under the terms of the license (GNU LGPL) which comes with this package. */
6
7 #include "src/surf/surf_interface.hpp"
8 #include "src/simix/smx_private.h"
9 #include "xbt/log.h"
10 #include "mc/mc.h"
11 #include "src/mc/mc_replay.h"
12 #include "xbt/dict.h"
13 #include "simgrid/s4u/mailbox.hpp"
14
15 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(simix_network, simix, "SIMIX network-related synchronization");
16
17 static void SIMIX_mbox_free(void *data);
18 static xbt_dict_t mailboxes = xbt_dict_new_homogeneous(SIMIX_mbox_free);
19
20 static void SIMIX_waitany_remove_simcall_from_actions(smx_simcall_t simcall);
21 static void SIMIX_comm_copy_data(smx_synchro_t comm);
22 static smx_synchro_t SIMIX_comm_new(e_smx_comm_type_t type);
23 static inline void SIMIX_mbox_push(smx_mailbox_t mbox, smx_synchro_t comm);
24 static smx_synchro_t SIMIX_fifo_probe_comm(xbt_fifo_t fifo, e_smx_comm_type_t type,
25                                         int (*match_fun)(void *, void *,smx_synchro_t),
26                                         void *user_data, smx_synchro_t my_synchro);
27 static smx_synchro_t SIMIX_fifo_get_comm(xbt_fifo_t fifo, e_smx_comm_type_t type,
28                                         int (*match_fun)(void *, void *,smx_synchro_t),
29                                         void *user_data, smx_synchro_t my_synchro);
30 static void SIMIX_comm_start(smx_synchro_t synchro);
31
32 void SIMIX_mailbox_exit(void)
33 {
34   xbt_dict_free(&mailboxes);
35 }
36
37 /******************************************************************************/
38 /*                           Rendez-Vous Points                               */
39 /******************************************************************************/
40
41 smx_mailbox_t SIMIX_mbox_create(const char *name)
42 {
43   xbt_assert(name, "Mailboxes must have a name");
44   /* two processes may have pushed the same mbox_create simcall at the same time */
45   smx_mailbox_t mbox = (smx_mailbox_t) xbt_dict_get_or_null(mailboxes, name);
46
47   if (!mbox) {
48     mbox = xbt_new0(s_smx_mailbox_t, 1);
49     mbox->name = xbt_strdup(name);
50     mbox->comm_fifo = xbt_fifo_new();
51     mbox->done_comm_fifo = xbt_fifo_new();
52     mbox->permanent_receiver=NULL;
53
54     XBT_DEBUG("Creating a mailbox at %p with name %s", mbox, name);
55     xbt_dict_set(mailboxes, mbox->name, mbox, NULL);
56   }
57   return mbox;
58 }
59
60 void SIMIX_mbox_free(void *data)
61 {
62   XBT_DEBUG("mbox free %p", data);
63   smx_mailbox_t mbox = (smx_mailbox_t) data;
64   xbt_free(mbox->name);
65   xbt_fifo_free(mbox->comm_fifo);
66   xbt_fifo_free(mbox->done_comm_fifo);
67
68   xbt_free(mbox);
69 }
70
71 smx_mailbox_t SIMIX_mbox_get_by_name(const char *name)
72 {
73   return (smx_mailbox_t) xbt_dict_get_or_null(mailboxes, name);
74 }
75
76 int SIMIX_mbox_comm_count_by_host(smx_mailbox_t mbox, sg_host_t host)
77 {
78   smx_synchro_t comm = NULL;
79   xbt_fifo_item_t item = NULL;
80   int count = 0;
81
82   xbt_fifo_foreach(mbox->comm_fifo, item, comm, smx_synchro_t) {
83     if (comm->comm.src_proc->host == host)
84       count++;
85   }
86
87   return count;
88 }
89
90 smx_synchro_t SIMIX_mbox_get_head(smx_mailbox_t mbox)
91 {
92   return (smx_synchro_t) xbt_fifo_get_item_content(
93     xbt_fifo_get_first_item(mbox->comm_fifo));
94 }
95
96 /**
97  *  \brief get the receiver (process associated to the mailbox)
98  *  \param mbox The rendez-vous point
99  *  \return process The receiving process (NULL if not set)
100  */
101 smx_process_t SIMIX_mbox_get_receiver(smx_mailbox_t mbox)
102 {
103   return mbox->permanent_receiver;
104 }
105
106 /**
107  *  \brief set the receiver of the rendez vous point to allow eager sends
108  *  \param mbox The rendez-vous point
109  *  \param process The receiving process
110  */
111 void SIMIX_mbox_set_receiver(smx_mailbox_t mbox, smx_process_t process)
112 {
113   mbox->permanent_receiver=process;
114 }
115
116 /**
117  *  \brief Pushes a communication synchro into a rendez-vous point
118  *  \param mbox The mailbox
119  *  \param comm The communication synchro
120  */
121 static inline void SIMIX_mbox_push(smx_mailbox_t mbox, smx_synchro_t comm)
122 {
123   xbt_fifo_push(mbox->comm_fifo, comm);
124   comm->comm.mbox = mbox;
125 }
126
127 /**
128  *  \brief Removes a communication synchro from a rendez-vous point
129  *  \param mbox The rendez-vous point
130  *  \param comm The communication synchro
131  */
132 void SIMIX_mbox_remove(smx_mailbox_t mbox, smx_synchro_t comm)
133 {
134   xbt_fifo_remove(mbox->comm_fifo, comm);
135   comm->comm.mbox = NULL;
136 }
137
138 /**
139  *  \brief Checks if there is a communication synchro queued in a fifo matching our needs
140  *  \param type The type of communication we are looking for (comm_send, comm_recv)
141  *  \return The communication synchro if found, NULL otherwise
142  */
143 smx_synchro_t SIMIX_fifo_get_comm(xbt_fifo_t fifo, e_smx_comm_type_t type,
144                                  int (*match_fun)(void *, void *,smx_synchro_t),
145                                  void *this_user_data, smx_synchro_t my_synchro)
146 {
147   smx_synchro_t synchro;
148   xbt_fifo_item_t item;
149   void* other_user_data = NULL;
150
151   xbt_fifo_foreach(fifo, item, synchro, smx_synchro_t) {
152     if (synchro->comm.type == SIMIX_COMM_SEND) {
153       other_user_data = synchro->comm.src_data;
154     } else if (synchro->comm.type == SIMIX_COMM_RECEIVE) {
155       other_user_data = synchro->comm.dst_data;
156     }
157     if (synchro->comm.type == type &&
158         (!match_fun              ||              match_fun(this_user_data,  other_user_data, synchro)) &&
159         (!synchro->comm.match_fun || synchro->comm.match_fun(other_user_data, this_user_data,  my_synchro))) {
160       XBT_DEBUG("Found a matching communication synchro %p", synchro);
161       xbt_fifo_remove_item(fifo, item);
162       xbt_fifo_free_item(item);
163       synchro->comm.refcount++;
164 #if HAVE_MC
165       synchro->comm.mbox_cpy = synchro->comm.mbox;
166 #endif
167       synchro->comm.mbox = NULL;
168       return synchro;
169     }
170     XBT_DEBUG("Sorry, communication synchro %p does not match our needs:"
171               " its type is %d but we are looking for a comm of type %d (or maybe the filtering didn't match)",
172               synchro, (int)synchro->comm.type, (int)type);
173   }
174   XBT_DEBUG("No matching communication synchro found");
175   return NULL;
176 }
177
178
179 /**
180  *  \brief Checks if there is a communication synchro queued in a fifo matching our needs, but leave it there
181  *  \param type The type of communication we are looking for (comm_send, comm_recv)
182  *  \return The communication synchro if found, NULL otherwise
183  */
184 smx_synchro_t SIMIX_fifo_probe_comm(xbt_fifo_t fifo, e_smx_comm_type_t type,
185                                  int (*match_fun)(void *, void *,smx_synchro_t),
186                                  void *this_user_data, smx_synchro_t my_synchro)
187 {
188   smx_synchro_t synchro;
189   xbt_fifo_item_t item;
190   void* other_user_data = NULL;
191
192   xbt_fifo_foreach(fifo, item, synchro, smx_synchro_t) {
193     if (synchro->comm.type == SIMIX_COMM_SEND) {
194       other_user_data = synchro->comm.src_data;
195     } else if (synchro->comm.type == SIMIX_COMM_RECEIVE) {
196       other_user_data = synchro->comm.dst_data;
197     }
198     if (synchro->comm.type == type &&
199         (!match_fun              ||              match_fun(this_user_data,  other_user_data, synchro)) &&
200         (!synchro->comm.match_fun || synchro->comm.match_fun(other_user_data, this_user_data,  my_synchro))) {
201       XBT_DEBUG("Found a matching communication synchro %p", synchro);
202       synchro->comm.refcount++;
203
204       return synchro;
205     }
206     XBT_DEBUG("Sorry, communication synchro %p does not match our needs:"
207               " its type is %d but we are looking for a comm of type %d (or maybe the filtering didn't match)",
208               synchro, (int)synchro->comm.type, (int)type);
209   }
210   XBT_DEBUG("No matching communication synchro found");
211   return NULL;
212 }
213 /******************************************************************************/
214 /*                          Communication synchros                            */
215 /******************************************************************************/
216
217 /**
218  *  \brief Creates a new communicate synchro
219  *  \param type The direction of communication (comm_send, comm_recv)
220  *  \return The new communicate synchro
221  */
222 smx_synchro_t SIMIX_comm_new(e_smx_comm_type_t type)
223 {
224   smx_synchro_t synchro;
225
226   /* alloc structures */
227   synchro = (smx_synchro_t) xbt_mallocator_get(simix_global->synchro_mallocator);
228
229   synchro->type = SIMIX_SYNC_COMMUNICATE;
230   synchro->state = SIMIX_WAITING;
231
232   /* set communication */
233   synchro->comm.type = type;
234   synchro->comm.refcount = 1;
235   synchro->comm.src_data=NULL;
236   synchro->comm.dst_data=NULL;
237
238   synchro->category = NULL;
239
240   XBT_DEBUG("Create communicate synchro %p", synchro);
241
242   return synchro;
243 }
244
245 /**
246  *  \brief Destroy a communicate synchro
247  *  \param synchro The communicate synchro to be destroyed
248  */
249 void SIMIX_comm_destroy(smx_synchro_t synchro)
250 {
251   XBT_DEBUG("Destroy synchro %p (refcount: %d), state: %d",
252             synchro, synchro->comm.refcount, (int)synchro->state);
253
254   if (synchro->comm.refcount <= 0) {
255     xbt_backtrace_display_current();
256     xbt_die("The refcount of comm %p is already 0 before decreasing it. "
257             "That's a bug! If you didn't test and/or wait the same communication twice in your code, then the bug is SimGrid's...", synchro);
258   }
259   synchro->comm.refcount--;
260   if (synchro->comm.refcount > 0)
261       return;
262   XBT_DEBUG("Really free communication %p; refcount is now %d", synchro,
263       synchro->comm.refcount);
264
265   xbt_free(synchro->name);
266   SIMIX_comm_destroy_internal_actions(synchro);
267
268   if (synchro->comm.detached && synchro->state != SIMIX_DONE) {
269     /* the communication has failed and was detached:
270      * we have to free the buffer */
271     if (synchro->comm.clean_fun) {
272       synchro->comm.clean_fun(synchro->comm.src_buff);
273     }
274     synchro->comm.src_buff = NULL;
275   }
276
277   if(synchro->comm.mbox)
278     SIMIX_mbox_remove(synchro->comm.mbox, synchro);
279
280   xbt_mallocator_release(simix_global->synchro_mallocator, synchro);
281 }
282
283 void SIMIX_comm_destroy_internal_actions(smx_synchro_t synchro)
284 {
285   if (synchro->comm.surf_comm){
286     synchro->comm.surf_comm->unref();
287     synchro->comm.surf_comm = NULL;
288   }
289
290   if (synchro->comm.src_timeout){
291     synchro->comm.src_timeout->unref();
292     synchro->comm.src_timeout = NULL;
293   }
294
295   if (synchro->comm.dst_timeout){
296     synchro->comm.dst_timeout->unref();
297     synchro->comm.dst_timeout = NULL;
298   }
299 }
300
301 void simcall_HANDLER_comm_send(smx_simcall_t simcall, smx_process_t src, smx_mailbox_t mbox,
302                                   double task_size, double rate,
303                                   void *src_buff, size_t src_buff_size,
304                                   int (*match_fun)(void *, void *,smx_synchro_t),
305                                   void (*copy_data_fun)(smx_synchro_t, void*, size_t),
306           void *data, double timeout){
307   smx_synchro_t comm = simcall_HANDLER_comm_isend(simcall, src, mbox, task_size, rate,
308                            src_buff, src_buff_size, match_fun, NULL, copy_data_fun,
309                data, 0);
310   SIMCALL_SET_MC_VALUE(simcall, 0);
311   simcall_HANDLER_comm_wait(simcall, comm, timeout);
312 }
313 smx_synchro_t simcall_HANDLER_comm_isend(smx_simcall_t simcall, smx_process_t src_proc, smx_mailbox_t mbox,
314                                   double task_size, double rate,
315                                   void *src_buff, size_t src_buff_size,
316                                   int (*match_fun)(void *, void *,smx_synchro_t),
317                                   void (*clean_fun)(void *), // used to free the synchro in case of problem after a detached send
318                                   void (*copy_data_fun)(smx_synchro_t, void*, size_t),// used to copy data if not default one
319                           void *data, int detached)
320 {
321   XBT_DEBUG("send from %p", mbox);
322
323   /* Prepare a synchro describing us, so that it gets passed to the user-provided filter of other side */
324   smx_synchro_t this_synchro = SIMIX_comm_new(SIMIX_COMM_SEND);
325
326   /* Look for communication synchro matching our needs. We also provide a description of
327    * ourself so that the other side also gets a chance of choosing if it wants to match with us.
328    *
329    * If it is not found then push our communication into the rendez-vous point */
330   smx_synchro_t other_synchro = SIMIX_fifo_get_comm(mbox->comm_fifo, SIMIX_COMM_RECEIVE, match_fun, data, this_synchro);
331
332   if (!other_synchro) {
333     other_synchro = this_synchro;
334
335     if (mbox->permanent_receiver!=NULL){
336       //this mailbox is for small messages, which have to be sent right now
337       other_synchro->state = SIMIX_READY;
338       other_synchro->comm.dst_proc=mbox->permanent_receiver;
339       other_synchro->comm.refcount++;
340       xbt_fifo_push(mbox->done_comm_fifo,other_synchro);
341       other_synchro->comm.mbox=mbox;
342       XBT_DEBUG("pushing a message into the permanent receive fifo %p, comm %p", mbox, &(other_synchro->comm));
343
344     }else{
345       SIMIX_mbox_push(mbox, this_synchro);
346     }
347   } else {
348     XBT_DEBUG("Receive already pushed");
349
350     SIMIX_comm_destroy(this_synchro);
351
352     other_synchro->state = SIMIX_READY;
353     other_synchro->comm.type = SIMIX_COMM_READY;
354
355   }
356   xbt_fifo_push(src_proc->comms, other_synchro);
357
358   /* if the communication synchro is detached then decrease the refcount
359    * by one, so it will be eliminated by the receiver's destroy call */
360   if (detached) {
361     other_synchro->comm.detached = 1;
362     other_synchro->comm.refcount--;
363     other_synchro->comm.clean_fun = clean_fun;
364   } else {
365     other_synchro->comm.clean_fun = NULL;
366   }
367
368   /* Setup the communication synchro */
369   other_synchro->comm.src_proc = src_proc;
370   other_synchro->comm.task_size = task_size;
371   other_synchro->comm.rate = rate;
372   other_synchro->comm.src_buff = src_buff;
373   other_synchro->comm.src_buff_size = src_buff_size;
374   other_synchro->comm.src_data = data;
375
376   other_synchro->comm.match_fun = match_fun;
377   other_synchro->comm.copy_data_fun = copy_data_fun;
378
379
380   if (MC_is_active() || MC_record_replay_is_active()) {
381     other_synchro->state = SIMIX_RUNNING;
382     return (detached ? NULL : other_synchro);
383   }
384
385   SIMIX_comm_start(other_synchro);
386   return (detached ? NULL : other_synchro);
387 }
388
389 void simcall_HANDLER_comm_recv(smx_simcall_t simcall, smx_process_t receiver, smx_mailbox_t mbox,
390                          void *dst_buff, size_t *dst_buff_size,
391                          int (*match_fun)(void *, void *, smx_synchro_t),
392                          void (*copy_data_fun)(smx_synchro_t, void*, size_t),
393                          void *data, double timeout, double rate)
394 {
395   smx_synchro_t comm = SIMIX_comm_irecv(receiver, mbox, dst_buff,
396                            dst_buff_size, match_fun, copy_data_fun, data, rate);
397   SIMCALL_SET_MC_VALUE(simcall, 0);
398   simcall_HANDLER_comm_wait(simcall, comm, timeout);
399 }
400
401 smx_synchro_t simcall_HANDLER_comm_irecv(smx_simcall_t simcall, smx_process_t receiver, smx_mailbox_t mbox,
402     void *dst_buff, size_t *dst_buff_size,
403     int (*match_fun)(void *, void *, smx_synchro_t),
404     void (*copy_data_fun)(smx_synchro_t, void*, size_t),
405     void *data, double rate)
406 {
407   return SIMIX_comm_irecv(receiver, mbox, dst_buff, dst_buff_size, match_fun, copy_data_fun, data, rate);
408 }
409
410 smx_synchro_t SIMIX_comm_irecv(smx_process_t dst_proc, smx_mailbox_t mbox, void *dst_buff, size_t *dst_buff_size,
411     int (*match_fun)(void *, void *, smx_synchro_t),
412     void (*copy_data_fun)(smx_synchro_t, void*, size_t), // used to copy data if not default one
413     void *data, double rate)
414 {
415   XBT_DEBUG("recv from %p %p", mbox, mbox->comm_fifo);
416   smx_synchro_t this_synchro = SIMIX_comm_new(SIMIX_COMM_RECEIVE);
417
418   smx_synchro_t other_synchro;
419   //communication already done, get it inside the fifo of completed comms
420   if (mbox->permanent_receiver && xbt_fifo_size(mbox->done_comm_fifo)!=0) {
421
422     XBT_DEBUG("We have a comm that has probably already been received, trying to match it, to skip the communication");
423     //find a match in the already received fifo
424     other_synchro = SIMIX_fifo_get_comm(mbox->done_comm_fifo, SIMIX_COMM_SEND, match_fun, data, this_synchro);
425     //if not found, assume the receiver came first, register it to the mailbox in the classical way
426     if (!other_synchro)  {
427       XBT_DEBUG("We have messages in the permanent receive list, but not the one we are looking for, pushing request into fifo");
428       other_synchro = this_synchro;
429       SIMIX_mbox_push(mbox, this_synchro);
430     } else {
431       if(other_synchro->comm.surf_comm && SIMIX_comm_get_remains(other_synchro)==0.0) {
432         XBT_DEBUG("comm %p has been already sent, and is finished, destroy it",&(other_synchro->comm));
433         other_synchro->state = SIMIX_DONE;
434         other_synchro->comm.type = SIMIX_COMM_DONE;
435         other_synchro->comm.mbox = NULL;
436       }
437       other_synchro->comm.refcount--;
438       SIMIX_comm_destroy(this_synchro);
439     }
440   } else {
441     /* Prepare a synchro describing us, so that it gets passed to the user-provided filter of other side */
442
443     /* Look for communication synchro matching our needs. We also provide a description of
444      * ourself so that the other side also gets a chance of choosing if it wants to match with us.
445      *
446      * If it is not found then push our communication into the rendez-vous point */
447     other_synchro = SIMIX_fifo_get_comm(mbox->comm_fifo, SIMIX_COMM_SEND, match_fun, data, this_synchro);
448
449     if (!other_synchro) {
450       XBT_DEBUG("Receive pushed first %d", xbt_fifo_size(mbox->comm_fifo));
451       other_synchro = this_synchro;
452       SIMIX_mbox_push(mbox, this_synchro);
453     } else {
454       SIMIX_comm_destroy(this_synchro);
455       other_synchro->state = SIMIX_READY;
456       other_synchro->comm.type = SIMIX_COMM_READY;
457       //other_synchro->comm.refcount--;
458     }
459     xbt_fifo_push(dst_proc->comms, other_synchro);
460   }
461
462   /* Setup communication synchro */
463   other_synchro->comm.dst_proc = dst_proc;
464   other_synchro->comm.dst_buff = dst_buff;
465   other_synchro->comm.dst_buff_size = dst_buff_size;
466   other_synchro->comm.dst_data = data;
467
468   if (rate != -1.0 && (other_synchro->comm.rate == -1.0 || rate < other_synchro->comm.rate))
469     other_synchro->comm.rate = rate;
470
471   other_synchro->comm.match_fun = match_fun;
472   other_synchro->comm.copy_data_fun = copy_data_fun;
473
474   if (MC_is_active() || MC_record_replay_is_active()) {
475     other_synchro->state = SIMIX_RUNNING;
476     return other_synchro;
477   }
478
479   SIMIX_comm_start(other_synchro);
480   return other_synchro;
481 }
482
483 smx_synchro_t simcall_HANDLER_comm_iprobe(smx_simcall_t simcall, smx_mailbox_t mbox,
484                                    int type, int src, int tag,
485                                    int (*match_fun)(void *, void *, smx_synchro_t),
486                                    void *data){
487   return SIMIX_comm_iprobe(simcall->issuer, mbox, type, src, tag, match_fun, data);
488 }
489
490 smx_synchro_t SIMIX_comm_iprobe(smx_process_t dst_proc, smx_mailbox_t mbox, int type, int src,
491                               int tag, int (*match_fun)(void *, void *, smx_synchro_t), void *data)
492 {
493   XBT_DEBUG("iprobe from %p %p", mbox, mbox->comm_fifo);
494   smx_synchro_t this_synchro;
495   int smx_type;
496   if(type == 1){
497     this_synchro=SIMIX_comm_new(SIMIX_COMM_SEND);
498     smx_type = SIMIX_COMM_RECEIVE;
499   } else{
500     this_synchro=SIMIX_comm_new(SIMIX_COMM_RECEIVE);
501     smx_type = SIMIX_COMM_SEND;
502   } 
503   smx_synchro_t other_synchro=NULL;
504   if(mbox->permanent_receiver && xbt_fifo_size(mbox->done_comm_fifo)!=0){
505     //find a match in the already received fifo
506       XBT_DEBUG("first try in the perm recv mailbox");
507
508     other_synchro = SIMIX_fifo_probe_comm(
509       mbox->done_comm_fifo, (e_smx_comm_type_t) smx_type,
510       match_fun, data, this_synchro);
511   }
512  // }else{
513     if(!other_synchro){
514         XBT_DEBUG("try in the normal mailbox");
515         other_synchro = SIMIX_fifo_probe_comm(
516           mbox->comm_fifo, (e_smx_comm_type_t) smx_type,
517           match_fun, data, this_synchro);
518     }
519 //  }
520   if(other_synchro)other_synchro->comm.refcount--;
521
522   SIMIX_comm_destroy(this_synchro);
523   return other_synchro;
524 }
525
526 void simcall_HANDLER_comm_wait(smx_simcall_t simcall, smx_synchro_t synchro, double timeout)
527 {
528   /* the simcall may be a wait, a send or a recv */
529   surf_action_t sleep;
530
531   /* Associate this simcall to the wait synchro */
532   XBT_DEBUG("simcall_HANDLER_comm_wait, %p", synchro);
533
534   xbt_fifo_push(synchro->simcalls, simcall);
535   simcall->issuer->waiting_synchro = synchro;
536
537   if (MC_is_active() || MC_record_replay_is_active()) {
538     int idx = SIMCALL_GET_MC_VALUE(simcall);
539     if (idx == 0) {
540       synchro->state = SIMIX_DONE;
541     } else {
542       /* If we reached this point, the wait simcall must have a timeout */
543       /* Otherwise it shouldn't be enabled and executed by the MC */
544       if (timeout == -1)
545         THROW_IMPOSSIBLE;
546
547       if (synchro->comm.src_proc == simcall->issuer)
548         synchro->state = SIMIX_SRC_TIMEOUT;
549       else
550         synchro->state = SIMIX_DST_TIMEOUT;
551     }
552
553     SIMIX_comm_finish(synchro);
554     return;
555   }
556
557   /* If the synchro has already finish perform the error handling, */
558   /* otherwise set up a waiting timeout on the right side          */
559   if (synchro->state != SIMIX_WAITING && synchro->state != SIMIX_RUNNING) {
560     SIMIX_comm_finish(synchro);
561   } else { /* if (timeout >= 0) { we need a surf sleep action even when there is no timeout, otherwise surf won't tell us when the host fails */
562     sleep = surf_host_sleep(simcall->issuer->host, timeout);
563     sleep->setData(synchro);
564
565     if (simcall->issuer == synchro->comm.src_proc)
566       synchro->comm.src_timeout = sleep;
567     else
568       synchro->comm.dst_timeout = sleep;
569   }
570 }
571
572 void simcall_HANDLER_comm_test(smx_simcall_t simcall, smx_synchro_t synchro)
573 {
574   if(MC_is_active() || MC_record_replay_is_active()){
575     simcall_comm_test__set__result(simcall, synchro->comm.src_proc && synchro->comm.dst_proc);
576     if(simcall_comm_test__get__result(simcall)){
577       synchro->state = SIMIX_DONE;
578       xbt_fifo_push(synchro->simcalls, simcall);
579       SIMIX_comm_finish(synchro);
580     }else{
581       SIMIX_simcall_answer(simcall);
582     }
583     return;
584   }
585
586   simcall_comm_test__set__result(simcall, (synchro->state != SIMIX_WAITING && synchro->state != SIMIX_RUNNING));
587   if (simcall_comm_test__get__result(simcall)) {
588     xbt_fifo_push(synchro->simcalls, simcall);
589     SIMIX_comm_finish(synchro);
590   } else {
591     SIMIX_simcall_answer(simcall);
592   }
593 }
594
595 void simcall_HANDLER_comm_testany(smx_simcall_t simcall, xbt_dynar_t synchros)
596 {
597   unsigned int cursor;
598   smx_synchro_t synchro;
599   simcall_comm_testany__set__result(simcall, -1);
600
601   if (MC_is_active() || MC_record_replay_is_active()){
602     int idx = SIMCALL_GET_MC_VALUE(simcall);
603     if(idx == -1){
604       SIMIX_simcall_answer(simcall);
605     }else{
606       synchro = xbt_dynar_get_as(synchros, idx, smx_synchro_t);
607       simcall_comm_testany__set__result(simcall, idx);
608       xbt_fifo_push(synchro->simcalls, simcall);
609       synchro->state = SIMIX_DONE;
610       SIMIX_comm_finish(synchro);
611     }
612     return;
613   }
614
615   xbt_dynar_foreach(simcall_comm_testany__get__comms(simcall), cursor,synchro) {
616     if (synchro->state != SIMIX_WAITING && synchro->state != SIMIX_RUNNING) {
617       simcall_comm_testany__set__result(simcall, cursor);
618       xbt_fifo_push(synchro->simcalls, simcall);
619       SIMIX_comm_finish(synchro);
620       return;
621     }
622   }
623   SIMIX_simcall_answer(simcall);
624 }
625
626 void simcall_HANDLER_comm_waitany(smx_simcall_t simcall, xbt_dynar_t synchros)
627 {
628   smx_synchro_t synchro;
629   unsigned int cursor = 0;
630
631   if (MC_is_active() || MC_record_replay_is_active()){
632     int idx = SIMCALL_GET_MC_VALUE(simcall);
633     synchro = xbt_dynar_get_as(synchros, idx, smx_synchro_t);
634     xbt_fifo_push(synchro->simcalls, simcall);
635     simcall_comm_waitany__set__result(simcall, idx);
636     synchro->state = SIMIX_DONE;
637     SIMIX_comm_finish(synchro);
638     return;
639   }
640
641   xbt_dynar_foreach(synchros, cursor, synchro){
642     /* associate this simcall to the the synchro */
643     xbt_fifo_push(synchro->simcalls, simcall);
644
645     /* see if the synchro is already finished */
646     if (synchro->state != SIMIX_WAITING && synchro->state != SIMIX_RUNNING){
647       SIMIX_comm_finish(synchro);
648       break;
649     }
650   }
651 }
652
653 void SIMIX_waitany_remove_simcall_from_actions(smx_simcall_t simcall)
654 {
655   smx_synchro_t synchro;
656   unsigned int cursor = 0;
657   xbt_dynar_t synchros = simcall_comm_waitany__get__comms(simcall);
658
659   xbt_dynar_foreach(synchros, cursor, synchro) {
660     xbt_fifo_remove(synchro->simcalls, simcall);
661   }
662 }
663
664 /**
665  *  \brief Starts the simulation of a communication synchro.
666  *  \param synchro the communication synchro
667  */
668 static inline void SIMIX_comm_start(smx_synchro_t synchro)
669 {
670   /* If both the sender and the receiver are already there, start the communication */
671   if (synchro->state == SIMIX_READY) {
672
673     sg_host_t sender = synchro->comm.src_proc->host;
674     sg_host_t receiver = synchro->comm.dst_proc->host;
675
676     XBT_DEBUG("Starting communication %p from '%s' to '%s'", synchro,
677               sg_host_get_name(sender), sg_host_get_name(receiver));
678
679     synchro->comm.surf_comm = surf_network_model_communicate(surf_network_model,
680                                                             sender, receiver,
681                                                             synchro->comm.task_size, synchro->comm.rate);
682
683     synchro->comm.surf_comm->setData(synchro);
684
685     synchro->state = SIMIX_RUNNING;
686
687     /* If a link is failed, detect it immediately */
688     if (synchro->comm.surf_comm->getState() == simgrid::surf::Action::State::failed) {
689       XBT_DEBUG("Communication from '%s' to '%s' failed to start because of a link failure",
690                 sg_host_get_name(sender), sg_host_get_name(receiver));
691       synchro->state = SIMIX_LINK_FAILURE;
692       SIMIX_comm_destroy_internal_actions(synchro);
693     }
694
695     /* If any of the process is suspend, create the synchro but stop its execution,
696        it will be restarted when the sender process resume */
697     if (SIMIX_process_is_suspended(synchro->comm.src_proc) ||
698         SIMIX_process_is_suspended(synchro->comm.dst_proc)) {
699       /* FIXME: check what should happen with the synchro state */
700
701       if (SIMIX_process_is_suspended(synchro->comm.src_proc))
702         XBT_DEBUG("The communication is suspended on startup because src (%s:%s) were suspended since it initiated the communication",
703                   sg_host_get_name(synchro->comm.src_proc->host), synchro->comm.src_proc->name);
704       else
705         XBT_DEBUG("The communication is suspended on startup because dst (%s:%s) were suspended since it initiated the communication",
706                   sg_host_get_name(synchro->comm.dst_proc->host), synchro->comm.dst_proc->name);
707
708       synchro->comm.surf_comm->suspend();
709
710     }
711   }
712 }
713
714 /**
715  * \brief Answers the SIMIX simcalls associated to a communication synchro.
716  * \param synchro a finished communication synchro
717  */
718 void SIMIX_comm_finish(smx_synchro_t synchro)
719 {
720   unsigned int destroy_count = 0;
721   smx_simcall_t simcall;
722
723   while ((simcall = (smx_simcall_t) xbt_fifo_shift(synchro->simcalls))) {
724
725     /* If a waitany simcall is waiting for this synchro to finish, then remove
726        it from the other synchros in the waitany list. Afterwards, get the
727        position of the actual synchro in the waitany dynar and
728        return it as the result of the simcall */
729
730     if (simcall->call == SIMCALL_NONE) //FIXME: maybe a better way to handle this case
731       continue; // if process handling comm is killed
732     if (simcall->call == SIMCALL_COMM_WAITANY) {
733       SIMIX_waitany_remove_simcall_from_actions(simcall);
734       if (!MC_is_active() && !MC_record_replay_is_active())
735         simcall_comm_waitany__set__result(simcall, xbt_dynar_search(simcall_comm_waitany__get__comms(simcall), &synchro));
736     }
737
738     /* If the synchro is still in a rendez-vous point then remove from it */
739     if (synchro->comm.mbox)
740       SIMIX_mbox_remove(synchro->comm.mbox, synchro);
741
742     XBT_DEBUG("SIMIX_comm_finish: synchro state = %d", (int)synchro->state);
743
744     /* Check out for errors */
745
746     if (simcall->issuer->host->isOff()) {
747       simcall->issuer->context->iwannadie = 1;
748       SMX_EXCEPTION(simcall->issuer, host_error, 0, "Host failed");
749     } else
750
751     switch (synchro->state) {
752
753     case SIMIX_DONE:
754       XBT_DEBUG("Communication %p complete!", synchro);
755       SIMIX_comm_copy_data(synchro);
756       break;
757
758     case SIMIX_SRC_TIMEOUT:
759       SMX_EXCEPTION(simcall->issuer, timeout_error, 0,
760                     "Communication timeouted because of sender");
761       break;
762
763     case SIMIX_DST_TIMEOUT:
764       SMX_EXCEPTION(simcall->issuer, timeout_error, 0,
765                     "Communication timeouted because of receiver");
766       break;
767
768     case SIMIX_SRC_HOST_FAILURE:
769       if (simcall->issuer == synchro->comm.src_proc)
770         simcall->issuer->context->iwannadie = 1;
771 //          SMX_EXCEPTION(simcall->issuer, host_error, 0, "Host failed");
772       else
773         SMX_EXCEPTION(simcall->issuer, network_error, 0, "Remote peer failed");
774       break;
775
776     case SIMIX_DST_HOST_FAILURE:
777       if (simcall->issuer == synchro->comm.dst_proc)
778         simcall->issuer->context->iwannadie = 1;
779 //          SMX_EXCEPTION(simcall->issuer, host_error, 0, "Host failed");
780       else
781         SMX_EXCEPTION(simcall->issuer, network_error, 0, "Remote peer failed");
782       break;
783
784     case SIMIX_LINK_FAILURE:
785
786       XBT_DEBUG("Link failure in synchro %p between '%s' and '%s': posting an exception to the issuer: %s (%p) detached:%d",
787                 synchro,
788                 synchro->comm.src_proc ? sg_host_get_name(synchro->comm.src_proc->host) : NULL,
789                 synchro->comm.dst_proc ? sg_host_get_name(synchro->comm.dst_proc->host) : NULL,
790                 simcall->issuer->name, simcall->issuer, synchro->comm.detached);
791       if (synchro->comm.src_proc == simcall->issuer) {
792         XBT_DEBUG("I'm source");
793       } else if (synchro->comm.dst_proc == simcall->issuer) {
794         XBT_DEBUG("I'm dest");
795       } else {
796         XBT_DEBUG("I'm neither source nor dest");
797       }
798       SMX_EXCEPTION(simcall->issuer, network_error, 0, "Link failure");
799       break;
800
801     case SIMIX_CANCELED:
802       if (simcall->issuer == synchro->comm.dst_proc)
803         SMX_EXCEPTION(simcall->issuer, cancel_error, 0,
804                       "Communication canceled by the sender");
805       else
806         SMX_EXCEPTION(simcall->issuer, cancel_error, 0,
807                       "Communication canceled by the receiver");
808       break;
809
810     default:
811       xbt_die("Unexpected synchro state in SIMIX_comm_finish: %d", (int)synchro->state);
812     }
813
814     /* if there is an exception during a waitany or a testany, indicate the position of the failed communication */
815     if (simcall->issuer->doexception) {
816       if (simcall->call == SIMCALL_COMM_WAITANY) {
817         simcall->issuer->running_ctx->exception.value = xbt_dynar_search(simcall_comm_waitany__get__comms(simcall), &synchro);
818       }
819       else if (simcall->call == SIMCALL_COMM_TESTANY) {
820         simcall->issuer->running_ctx->exception.value = xbt_dynar_search(simcall_comm_testany__get__comms(simcall), &synchro);
821       }
822     }
823
824     if (simcall->issuer->host->isOff()) {
825       simcall->issuer->context->iwannadie = 1;
826     }
827
828     simcall->issuer->waiting_synchro = NULL;
829     xbt_fifo_remove(simcall->issuer->comms, synchro);
830     if(synchro->comm.detached){
831       if(simcall->issuer == synchro->comm.src_proc){
832         if(synchro->comm.dst_proc)
833           xbt_fifo_remove(synchro->comm.dst_proc->comms, synchro);
834       }
835       if(simcall->issuer == synchro->comm.dst_proc){
836         if(synchro->comm.src_proc)
837           xbt_fifo_remove(synchro->comm.src_proc->comms, synchro);
838       }
839     }
840     SIMIX_simcall_answer(simcall);
841     destroy_count++;
842   }
843
844   while (destroy_count-- > 0)
845     SIMIX_comm_destroy(synchro);
846 }
847
848 /**
849  * \brief This function is called when a Surf communication synchro is finished.
850  * \param synchro the corresponding Simix communication
851  */
852 void SIMIX_post_comm(smx_synchro_t synchro)
853 {
854   /* Update synchro state */
855   if (synchro->comm.src_timeout &&
856       synchro->comm.src_timeout->getState() == simgrid::surf::Action::State::done)
857     synchro->state = SIMIX_SRC_TIMEOUT;
858   else if (synchro->comm.dst_timeout &&
859     synchro->comm.dst_timeout->getState() == simgrid::surf::Action::State::done)
860     synchro->state = SIMIX_DST_TIMEOUT;
861   else if (synchro->comm.src_timeout &&
862     synchro->comm.src_timeout->getState() == simgrid::surf::Action::State::failed)
863     synchro->state = SIMIX_SRC_HOST_FAILURE;
864   else if (synchro->comm.dst_timeout &&
865       synchro->comm.dst_timeout->getState() == simgrid::surf::Action::State::failed)
866     synchro->state = SIMIX_DST_HOST_FAILURE;
867   else if (synchro->comm.surf_comm &&
868     synchro->comm.surf_comm->getState() == simgrid::surf::Action::State::failed) {
869     XBT_DEBUG("Puta madre. Surf says that the link broke");
870     synchro->state = SIMIX_LINK_FAILURE;
871   } else
872     synchro->state = SIMIX_DONE;
873
874   XBT_DEBUG("SIMIX_post_comm: comm %p, state %d, src_proc %p, dst_proc %p, detached: %d",
875             synchro, (int)synchro->state, synchro->comm.src_proc, synchro->comm.dst_proc, synchro->comm.detached);
876
877   /* destroy the surf actions associated with the Simix communication */
878   SIMIX_comm_destroy_internal_actions(synchro);
879
880   /* if there are simcalls associated with the synchro, then answer them */
881   if (xbt_fifo_size(synchro->simcalls)) {
882     SIMIX_comm_finish(synchro);
883   }
884 }
885
886 void SIMIX_comm_cancel(smx_synchro_t synchro)
887 {
888   /* if the synchro is a waiting state means that it is still in a mbox */
889   /* so remove from it and delete it */
890   if (synchro->state == SIMIX_WAITING) {
891     SIMIX_mbox_remove(synchro->comm.mbox, synchro);
892     synchro->state = SIMIX_CANCELED;
893   }
894   else if (!MC_is_active() /* when running the MC there are no surf actions */
895            && !MC_record_replay_is_active()
896            && (synchro->state == SIMIX_READY || synchro->state == SIMIX_RUNNING)) {
897
898     synchro->comm.surf_comm->cancel();
899   }
900 }
901
902 void SIMIX_comm_suspend(smx_synchro_t synchro)
903 {
904   /*FIXME: shall we suspend also the timeout synchro? */
905   if (synchro->comm.surf_comm)
906     synchro->comm.surf_comm->suspend();
907   /* in the other case, the action will be suspended on creation, in SIMIX_comm_start() */
908 }
909
910 void SIMIX_comm_resume(smx_synchro_t synchro)
911 {
912   /*FIXME: check what happen with the timeouts */
913   if (synchro->comm.surf_comm)
914     synchro->comm.surf_comm->resume();
915   /* in the other case, the synchro were not really suspended yet, see SIMIX_comm_suspend() and SIMIX_comm_start() */
916 }
917
918
919 /************* synchro Getters **************/
920
921 /**
922  *  \brief get the amount remaining from the communication
923  *  \param synchro The communication
924  */
925 double SIMIX_comm_get_remains(smx_synchro_t synchro)
926 {
927   double remains;
928
929   if(!synchro){
930     return 0;
931   }
932
933   switch (synchro->state) {
934
935   case SIMIX_RUNNING:
936     remains = synchro->comm.surf_comm->getRemains();
937     break;
938
939   case SIMIX_WAITING:
940   case SIMIX_READY:
941     remains = 0; /*FIXME: check what should be returned */
942     break;
943
944   default:
945     remains = 0; /*FIXME: is this correct? */
946     break;
947   }
948   return remains;
949 }
950
951 e_smx_state_t SIMIX_comm_get_state(smx_synchro_t synchro)
952 {
953   return synchro->state;
954 }
955
956 /**
957  *  \brief Return the user data associated to the sender of the communication
958  *  \param synchro The communication
959  *  \return the user data
960  */
961 void* SIMIX_comm_get_src_data(smx_synchro_t synchro)
962 {
963   return synchro->comm.src_data;
964 }
965
966 /**
967  *  \brief Return the user data associated to the receiver of the communication
968  *  \param synchro The communication
969  *  \return the user data
970  */
971 void* SIMIX_comm_get_dst_data(smx_synchro_t synchro)
972 {
973   return synchro->comm.dst_data;
974 }
975
976 smx_process_t SIMIX_comm_get_src_proc(smx_synchro_t synchro)
977 {
978   return synchro->comm.src_proc;
979 }
980
981 smx_process_t SIMIX_comm_get_dst_proc(smx_synchro_t synchro)
982 {
983   return synchro->comm.dst_proc;
984 }
985
986 /******************************************************************************/
987 /*                    SIMIX_comm_copy_data callbacks                       */
988 /******************************************************************************/
989 static void (*SIMIX_comm_copy_data_callback) (smx_synchro_t, void*, size_t) =
990   &SIMIX_comm_copy_pointer_callback;
991
992 void
993 SIMIX_comm_set_copy_data_callback(void (*callback) (smx_synchro_t, void*, size_t))
994 {
995   SIMIX_comm_copy_data_callback = callback;
996 }
997
998 void SIMIX_comm_copy_pointer_callback(smx_synchro_t comm, void* buff, size_t buff_size)
999 {
1000   xbt_assert((buff_size == sizeof(void *)),
1001              "Cannot copy %zu bytes: must be sizeof(void*)", buff_size);
1002   *(void **) (comm->comm.dst_buff) = buff;
1003 }
1004
1005 void SIMIX_comm_copy_buffer_callback(smx_synchro_t comm, void* buff, size_t buff_size)
1006 {
1007   XBT_DEBUG("Copy the data over");
1008   memcpy(comm->comm.dst_buff, buff, buff_size);
1009   if (comm->comm.detached) { // if this is a detached send, the source buffer was duplicated by SMPI sender to make the original buffer available to the application ASAP
1010     xbt_free(buff);
1011     comm->comm.src_buff = NULL;
1012   }
1013 }
1014
1015
1016 /**
1017  *  \brief Copy the communication data from the sender's buffer to the receiver's one
1018  *  \param comm The communication
1019  */
1020 void SIMIX_comm_copy_data(smx_synchro_t comm)
1021 {
1022   size_t buff_size = comm->comm.src_buff_size;
1023   /* If there is no data to be copy then return */
1024   if (!comm->comm.src_buff || !comm->comm.dst_buff || comm->comm.copied)
1025     return;
1026
1027   XBT_DEBUG("Copying comm %p data from %s (%p) -> %s (%p) (%zu bytes)",
1028             comm,
1029             comm->comm.src_proc ? sg_host_get_name(comm->comm.src_proc->host) : "a finished process",
1030             comm->comm.src_buff,
1031             comm->comm.dst_proc ? sg_host_get_name(comm->comm.dst_proc->host) : "a finished process",
1032             comm->comm.dst_buff, buff_size);
1033
1034   /* Copy at most dst_buff_size bytes of the message to receiver's buffer */
1035   if (comm->comm.dst_buff_size)
1036     buff_size = MIN(buff_size, *(comm->comm.dst_buff_size));
1037
1038   /* Update the receiver's buffer size to the copied amount */
1039   if (comm->comm.dst_buff_size)
1040     *comm->comm.dst_buff_size = buff_size;
1041
1042   if (buff_size > 0){
1043       if(comm->comm.copy_data_fun)
1044         comm->comm.copy_data_fun (comm, comm->comm.src_buff, buff_size);
1045       else
1046         SIMIX_comm_copy_data_callback (comm, comm->comm.src_buff, buff_size);
1047   }
1048
1049
1050   /* Set the copied flag so we copy data only once */
1051   /* (this function might be called from both communication ends) */
1052   comm->comm.copied = 1;
1053 }