Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
refactor rdv to mailbox for consistency
[simgrid.git] / src / simix / smx_network.cpp
1 /* Copyright (c) 2009-2015. The SimGrid Team.
2  * All rights reserved.                                                     */
3
4 /* This program is free software; you can redistribute it and/or modify it
5  * under the terms of the license (GNU LGPL) which comes with this package. */
6
7 #include "src/surf/surf_interface.hpp"
8 #include "src/simix/smx_private.h"
9 #include "xbt/log.h"
10 #include "mc/mc.h"
11 #include "src/mc/mc_replay.h"
12 #include "xbt/dict.h"
13
14 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(simix_network, simix, "SIMIX network-related synchronization");
15
16 static xbt_dict_t mailboxes = NULL;
17 XBT_EXPORT_NO_IMPORT(unsigned long int) smx_total_comms = 0;
18
19 static void SIMIX_waitany_remove_simcall_from_actions(smx_simcall_t simcall);
20 static void SIMIX_comm_copy_data(smx_synchro_t comm);
21 static smx_synchro_t SIMIX_comm_new(e_smx_comm_type_t type);
22 static inline void SIMIX_mbox_push(smx_mailbox_t mbox, smx_synchro_t comm);
23 static smx_synchro_t SIMIX_fifo_probe_comm(xbt_fifo_t fifo, e_smx_comm_type_t type,
24                                         int (*match_fun)(void *, void *,smx_synchro_t),
25                                         void *user_data, smx_synchro_t my_synchro);
26 static smx_synchro_t SIMIX_fifo_get_comm(xbt_fifo_t fifo, e_smx_comm_type_t type,
27                                         int (*match_fun)(void *, void *,smx_synchro_t),
28                                         void *user_data, smx_synchro_t my_synchro);
29 static void SIMIX_mbox_free(void *data);
30 static void SIMIX_comm_start(smx_synchro_t synchro);
31
32 void SIMIX_network_init(void)
33 {
34   mailboxes = xbt_dict_new_homogeneous(SIMIX_mbox_free);
35 }
36
37 void SIMIX_network_exit(void)
38 {
39   xbt_dict_free(&mailboxes);
40 }
41
42 /******************************************************************************/
43 /*                           Rendez-Vous Points                               */
44 /******************************************************************************/
45
46 smx_mailbox_t SIMIX_mbox_create(const char *name)
47 {
48   /* two processes may have pushed the same mbox_create simcall at the same time */
49   smx_mailbox_t mbox = name ? (smx_mailbox_t) xbt_dict_get_or_null(mailboxes, name) : NULL;
50
51   if (!mbox) {
52     mbox = xbt_new0(s_smx_mailbox_t, 1);
53     mbox->name = name ? xbt_strdup(name) : NULL;
54     mbox->comm_fifo = xbt_fifo_new();
55     mbox->done_comm_fifo = xbt_fifo_new();
56     mbox->permanent_receiver=NULL;
57
58     XBT_DEBUG("Creating a mailbox at %p with name %s", mbox, name);
59
60     if (mbox->name)
61       xbt_dict_set(mailboxes, mbox->name, mbox, NULL);
62   }
63   return mbox;
64 }
65
66 void SIMIX_mbox_destroy(smx_mailbox_t mbox)
67 {
68   if (mbox->name)
69     xbt_dict_remove(mailboxes, mbox->name);
70 }
71
72 void SIMIX_mbox_free(void *data)
73 {
74   XBT_DEBUG("mbox free %p", data);
75   smx_mailbox_t mbox = (smx_mailbox_t) data;
76   xbt_free(mbox->name);
77   xbt_fifo_free(mbox->comm_fifo);
78   xbt_fifo_free(mbox->done_comm_fifo);
79
80   xbt_free(mbox);
81 }
82
83 xbt_dict_t SIMIX_get_mailboxes()
84 {
85   return mailboxes;
86 }
87
88 smx_mailbox_t SIMIX_mbox_get_by_name(const char *name)
89 {
90   return (smx_mailbox_t) xbt_dict_get_or_null(mailboxes, name);
91 }
92
93 int SIMIX_mbox_comm_count_by_host(smx_mailbox_t mbox, sg_host_t host)
94 {
95   smx_synchro_t comm = NULL;
96   xbt_fifo_item_t item = NULL;
97   int count = 0;
98
99   xbt_fifo_foreach(mbox->comm_fifo, item, comm, smx_synchro_t) {
100     if (comm->comm.src_proc->host == host)
101       count++;
102   }
103
104   return count;
105 }
106
107 smx_synchro_t SIMIX_mbox_get_head(smx_mailbox_t mbox)
108 {
109   return (smx_synchro_t) xbt_fifo_get_item_content(
110     xbt_fifo_get_first_item(mbox->comm_fifo));
111 }
112
113 /**
114  *  \brief get the receiver (process associated to the mailbox)
115  *  \param mbox The rendez-vous point
116  *  \return process The receiving process (NULL if not set)
117  */
118 smx_process_t SIMIX_mbox_get_receiver(smx_mailbox_t mbox)
119 {
120   return mbox->permanent_receiver;
121 }
122
123 /**
124  *  \brief set the receiver of the rendez vous point to allow eager sends
125  *  \param mbox The rendez-vous point
126  *  \param process The receiving process
127  */
128 void SIMIX_mbox_set_receiver(smx_mailbox_t mbox, smx_process_t process)
129 {
130   mbox->permanent_receiver=process;
131 }
132
133 /**
134  *  \brief Pushes a communication synchro into a rendez-vous point
135  *  \param mbox The mailbox
136  *  \param comm The communication synchro
137  */
138 static inline void SIMIX_mbox_push(smx_mailbox_t mbox, smx_synchro_t comm)
139 {
140   xbt_fifo_push(mbox->comm_fifo, comm);
141   comm->comm.mbox = mbox;
142 }
143
144 /**
145  *  \brief Removes a communication synchro from a rendez-vous point
146  *  \param mbox The rendez-vous point
147  *  \param comm The communication synchro
148  */
149 void SIMIX_mbox_remove(smx_mailbox_t mbox, smx_synchro_t comm)
150 {
151   xbt_fifo_remove(mbox->comm_fifo, comm);
152   comm->comm.mbox = NULL;
153 }
154
155 /**
156  *  \brief Checks if there is a communication synchro queued in a fifo matching our needs
157  *  \param type The type of communication we are looking for (comm_send, comm_recv)
158  *  \return The communication synchro if found, NULL otherwise
159  */
160 smx_synchro_t SIMIX_fifo_get_comm(xbt_fifo_t fifo, e_smx_comm_type_t type,
161                                  int (*match_fun)(void *, void *,smx_synchro_t),
162                                  void *this_user_data, smx_synchro_t my_synchro)
163 {
164   smx_synchro_t synchro;
165   xbt_fifo_item_t item;
166   void* other_user_data = NULL;
167
168   xbt_fifo_foreach(fifo, item, synchro, smx_synchro_t) {
169     if (synchro->comm.type == SIMIX_COMM_SEND) {
170       other_user_data = synchro->comm.src_data;
171     } else if (synchro->comm.type == SIMIX_COMM_RECEIVE) {
172       other_user_data = synchro->comm.dst_data;
173     }
174     if (synchro->comm.type == type &&
175         (!match_fun              ||              match_fun(this_user_data,  other_user_data, synchro)) &&
176         (!synchro->comm.match_fun || synchro->comm.match_fun(other_user_data, this_user_data,  my_synchro))) {
177       XBT_DEBUG("Found a matching communication synchro %p", synchro);
178       xbt_fifo_remove_item(fifo, item);
179       xbt_fifo_free_item(item);
180       synchro->comm.refcount++;
181 #if HAVE_MC
182       synchro->comm.mbox_cpy = synchro->comm.mbox;
183 #endif
184       synchro->comm.mbox = NULL;
185       return synchro;
186     }
187     XBT_DEBUG("Sorry, communication synchro %p does not match our needs:"
188               " its type is %d but we are looking for a comm of type %d (or maybe the filtering didn't match)",
189               synchro, (int)synchro->comm.type, (int)type);
190   }
191   XBT_DEBUG("No matching communication synchro found");
192   return NULL;
193 }
194
195
196 /**
197  *  \brief Checks if there is a communication synchro queued in a fifo matching our needs, but leave it there
198  *  \param type The type of communication we are looking for (comm_send, comm_recv)
199  *  \return The communication synchro if found, NULL otherwise
200  */
201 smx_synchro_t SIMIX_fifo_probe_comm(xbt_fifo_t fifo, e_smx_comm_type_t type,
202                                  int (*match_fun)(void *, void *,smx_synchro_t),
203                                  void *this_user_data, smx_synchro_t my_synchro)
204 {
205   smx_synchro_t synchro;
206   xbt_fifo_item_t item;
207   void* other_user_data = NULL;
208
209   xbt_fifo_foreach(fifo, item, synchro, smx_synchro_t) {
210     if (synchro->comm.type == SIMIX_COMM_SEND) {
211       other_user_data = synchro->comm.src_data;
212     } else if (synchro->comm.type == SIMIX_COMM_RECEIVE) {
213       other_user_data = synchro->comm.dst_data;
214     }
215     if (synchro->comm.type == type &&
216         (!match_fun              ||              match_fun(this_user_data,  other_user_data, synchro)) &&
217         (!synchro->comm.match_fun || synchro->comm.match_fun(other_user_data, this_user_data,  my_synchro))) {
218       XBT_DEBUG("Found a matching communication synchro %p", synchro);
219       synchro->comm.refcount++;
220
221       return synchro;
222     }
223     XBT_DEBUG("Sorry, communication synchro %p does not match our needs:"
224               " its type is %d but we are looking for a comm of type %d (or maybe the filtering didn't match)",
225               synchro, (int)synchro->comm.type, (int)type);
226   }
227   XBT_DEBUG("No matching communication synchro found");
228   return NULL;
229 }
230 /******************************************************************************/
231 /*                          Communication synchros                            */
232 /******************************************************************************/
233
234 /**
235  *  \brief Creates a new communicate synchro
236  *  \param type The direction of communication (comm_send, comm_recv)
237  *  \return The new communicate synchro
238  */
239 smx_synchro_t SIMIX_comm_new(e_smx_comm_type_t type)
240 {
241   smx_synchro_t synchro;
242
243   /* alloc structures */
244   synchro = (smx_synchro_t) xbt_mallocator_get(simix_global->synchro_mallocator);
245
246   synchro->type = SIMIX_SYNC_COMMUNICATE;
247   synchro->state = SIMIX_WAITING;
248
249   /* set communication */
250   synchro->comm.type = type;
251   synchro->comm.refcount = 1;
252   synchro->comm.src_data=NULL;
253   synchro->comm.dst_data=NULL;
254
255   synchro->category = NULL;
256
257   XBT_DEBUG("Create communicate synchro %p", synchro);
258   ++smx_total_comms;
259
260   return synchro;
261 }
262
263 /**
264  *  \brief Destroy a communicate synchro
265  *  \param synchro The communicate synchro to be destroyed
266  */
267 void SIMIX_comm_destroy(smx_synchro_t synchro)
268 {
269   XBT_DEBUG("Destroy synchro %p (refcount: %d), state: %d",
270             synchro, synchro->comm.refcount, (int)synchro->state);
271
272   if (synchro->comm.refcount <= 0) {
273     xbt_backtrace_display_current();
274     xbt_die("The refcount of comm %p is already 0 before decreasing it. "
275             "That's a bug! If you didn't test and/or wait the same communication twice in your code, then the bug is SimGrid's...", synchro);
276   }
277   synchro->comm.refcount--;
278   if (synchro->comm.refcount > 0)
279       return;
280   XBT_DEBUG("Really free communication %p; refcount is now %d", synchro,
281       synchro->comm.refcount);
282
283   xbt_free(synchro->name);
284   SIMIX_comm_destroy_internal_actions(synchro);
285
286   if (synchro->comm.detached && synchro->state != SIMIX_DONE) {
287     /* the communication has failed and was detached:
288      * we have to free the buffer */
289     if (synchro->comm.clean_fun) {
290       synchro->comm.clean_fun(synchro->comm.src_buff);
291     }
292     synchro->comm.src_buff = NULL;
293   }
294
295   if(synchro->comm.mbox)
296     SIMIX_mbox_remove(synchro->comm.mbox, synchro);
297
298   xbt_mallocator_release(simix_global->synchro_mallocator, synchro);
299 }
300
301 void SIMIX_comm_destroy_internal_actions(smx_synchro_t synchro)
302 {
303   if (synchro->comm.surf_comm){
304     synchro->comm.surf_comm->unref();
305     synchro->comm.surf_comm = NULL;
306   }
307
308   if (synchro->comm.src_timeout){
309     synchro->comm.src_timeout->unref();
310     synchro->comm.src_timeout = NULL;
311   }
312
313   if (synchro->comm.dst_timeout){
314     synchro->comm.dst_timeout->unref();
315     synchro->comm.dst_timeout = NULL;
316   }
317 }
318
319 void simcall_HANDLER_comm_send(smx_simcall_t simcall, smx_process_t src, smx_mailbox_t mbox,
320                                   double task_size, double rate,
321                                   void *src_buff, size_t src_buff_size,
322                                   int (*match_fun)(void *, void *,smx_synchro_t),
323                                   void (*copy_data_fun)(smx_synchro_t, void*, size_t),
324           void *data, double timeout){
325   smx_synchro_t comm = simcall_HANDLER_comm_isend(simcall, src, mbox, task_size, rate,
326                            src_buff, src_buff_size, match_fun, NULL, copy_data_fun,
327                data, 0);
328   SIMCALL_SET_MC_VALUE(simcall, 0);
329   simcall_HANDLER_comm_wait(simcall, comm, timeout);
330 }
331 smx_synchro_t simcall_HANDLER_comm_isend(smx_simcall_t simcall, smx_process_t src_proc, smx_mailbox_t mbox,
332                                   double task_size, double rate,
333                                   void *src_buff, size_t src_buff_size,
334                                   int (*match_fun)(void *, void *,smx_synchro_t),
335                                   void (*clean_fun)(void *), // used to free the synchro in case of problem after a detached send
336                                   void (*copy_data_fun)(smx_synchro_t, void*, size_t),// used to copy data if not default one
337                           void *data, int detached)
338 {
339   XBT_DEBUG("send from %p", mbox);
340
341   /* Prepare a synchro describing us, so that it gets passed to the user-provided filter of other side */
342   smx_synchro_t this_synchro = SIMIX_comm_new(SIMIX_COMM_SEND);
343
344   /* Look for communication synchro matching our needs. We also provide a description of
345    * ourself so that the other side also gets a chance of choosing if it wants to match with us.
346    *
347    * If it is not found then push our communication into the rendez-vous point */
348   smx_synchro_t other_synchro = SIMIX_fifo_get_comm(mbox->comm_fifo, SIMIX_COMM_RECEIVE, match_fun, data, this_synchro);
349
350   if (!other_synchro) {
351     other_synchro = this_synchro;
352
353     if (mbox->permanent_receiver!=NULL){
354       //this mailbox is for small messages, which have to be sent right now
355       other_synchro->state = SIMIX_READY;
356       other_synchro->comm.dst_proc=mbox->permanent_receiver;
357       other_synchro->comm.refcount++;
358       xbt_fifo_push(mbox->done_comm_fifo,other_synchro);
359       other_synchro->comm.mbox=mbox;
360       XBT_DEBUG("pushing a message into the permanent receive fifo %p, comm %p", mbox, &(other_synchro->comm));
361
362     }else{
363       SIMIX_mbox_push(mbox, this_synchro);
364     }
365   } else {
366     XBT_DEBUG("Receive already pushed");
367
368     SIMIX_comm_destroy(this_synchro);
369     --smx_total_comms; // this creation was a pure waste
370
371     other_synchro->state = SIMIX_READY;
372     other_synchro->comm.type = SIMIX_COMM_READY;
373
374   }
375   xbt_fifo_push(src_proc->comms, other_synchro);
376
377   /* if the communication synchro is detached then decrease the refcount
378    * by one, so it will be eliminated by the receiver's destroy call */
379   if (detached) {
380     other_synchro->comm.detached = 1;
381     other_synchro->comm.refcount--;
382     other_synchro->comm.clean_fun = clean_fun;
383   } else {
384     other_synchro->comm.clean_fun = NULL;
385   }
386
387   /* Setup the communication synchro */
388   other_synchro->comm.src_proc = src_proc;
389   other_synchro->comm.task_size = task_size;
390   other_synchro->comm.rate = rate;
391   other_synchro->comm.src_buff = src_buff;
392   other_synchro->comm.src_buff_size = src_buff_size;
393   other_synchro->comm.src_data = data;
394
395   other_synchro->comm.match_fun = match_fun;
396   other_synchro->comm.copy_data_fun = copy_data_fun;
397
398
399   if (MC_is_active() || MC_record_replay_is_active()) {
400     other_synchro->state = SIMIX_RUNNING;
401     return (detached ? NULL : other_synchro);
402   }
403
404   SIMIX_comm_start(other_synchro);
405   return (detached ? NULL : other_synchro);
406 }
407
408 void simcall_HANDLER_comm_recv(smx_simcall_t simcall, smx_process_t receiver, smx_mailbox_t mbox,
409                          void *dst_buff, size_t *dst_buff_size,
410                          int (*match_fun)(void *, void *, smx_synchro_t),
411                          void (*copy_data_fun)(smx_synchro_t, void*, size_t),
412                          void *data, double timeout, double rate)
413 {
414   smx_synchro_t comm = SIMIX_comm_irecv(receiver, mbox, dst_buff,
415                            dst_buff_size, match_fun, copy_data_fun, data, rate);
416   SIMCALL_SET_MC_VALUE(simcall, 0);
417   simcall_HANDLER_comm_wait(simcall, comm, timeout);
418 }
419
420 smx_synchro_t simcall_HANDLER_comm_irecv(smx_simcall_t simcall, smx_process_t receiver, smx_mailbox_t mbox,
421     void *dst_buff, size_t *dst_buff_size,
422     int (*match_fun)(void *, void *, smx_synchro_t),
423     void (*copy_data_fun)(smx_synchro_t, void*, size_t),
424     void *data, double rate)
425 {
426   return SIMIX_comm_irecv(receiver, mbox, dst_buff, dst_buff_size, match_fun, copy_data_fun, data, rate);
427 }
428
429 smx_synchro_t SIMIX_comm_irecv(smx_process_t dst_proc, smx_mailbox_t mbox, void *dst_buff, size_t *dst_buff_size,
430     int (*match_fun)(void *, void *, smx_synchro_t),
431     void (*copy_data_fun)(smx_synchro_t, void*, size_t), // used to copy data if not default one
432     void *data, double rate)
433 {
434   XBT_DEBUG("recv from %p %p", mbox, mbox->comm_fifo);
435   smx_synchro_t this_synchro = SIMIX_comm_new(SIMIX_COMM_RECEIVE);
436
437   smx_synchro_t other_synchro;
438   //communication already done, get it inside the fifo of completed comms
439   if (mbox->permanent_receiver && xbt_fifo_size(mbox->done_comm_fifo)!=0) {
440
441     XBT_DEBUG("We have a comm that has probably already been received, trying to match it, to skip the communication");
442     //find a match in the already received fifo
443     other_synchro = SIMIX_fifo_get_comm(mbox->done_comm_fifo, SIMIX_COMM_SEND, match_fun, data, this_synchro);
444     //if not found, assume the receiver came first, register it to the mailbox in the classical way
445     if (!other_synchro)  {
446       XBT_DEBUG("We have messages in the permanent receive list, but not the one we are looking for, pushing request into fifo");
447       other_synchro = this_synchro;
448       SIMIX_mbox_push(mbox, this_synchro);
449     } else {
450       if(other_synchro->comm.surf_comm && SIMIX_comm_get_remains(other_synchro)==0.0) {
451         XBT_DEBUG("comm %p has been already sent, and is finished, destroy it",&(other_synchro->comm));
452         other_synchro->state = SIMIX_DONE;
453         other_synchro->comm.type = SIMIX_COMM_DONE;
454         other_synchro->comm.mbox = NULL;
455       }
456       other_synchro->comm.refcount--;
457       SIMIX_comm_destroy(this_synchro);
458       --smx_total_comms; // this creation was a pure waste
459     }
460   } else {
461     /* Prepare a synchro describing us, so that it gets passed to the user-provided filter of other side */
462
463     /* Look for communication synchro matching our needs. We also provide a description of
464      * ourself so that the other side also gets a chance of choosing if it wants to match with us.
465      *
466      * If it is not found then push our communication into the rendez-vous point */
467     other_synchro = SIMIX_fifo_get_comm(mbox->comm_fifo, SIMIX_COMM_SEND, match_fun, data, this_synchro);
468
469     if (!other_synchro) {
470       XBT_DEBUG("Receive pushed first %d", xbt_fifo_size(mbox->comm_fifo));
471       other_synchro = this_synchro;
472       SIMIX_mbox_push(mbox, this_synchro);
473     } else {
474       SIMIX_comm_destroy(this_synchro);
475       --smx_total_comms; // this creation was a pure waste
476       other_synchro->state = SIMIX_READY;
477       other_synchro->comm.type = SIMIX_COMM_READY;
478       //other_synchro->comm.refcount--;
479     }
480     xbt_fifo_push(dst_proc->comms, other_synchro);
481   }
482
483   /* Setup communication synchro */
484   other_synchro->comm.dst_proc = dst_proc;
485   other_synchro->comm.dst_buff = dst_buff;
486   other_synchro->comm.dst_buff_size = dst_buff_size;
487   other_synchro->comm.dst_data = data;
488
489   if (rate != -1.0 && (other_synchro->comm.rate == -1.0 || rate < other_synchro->comm.rate))
490     other_synchro->comm.rate = rate;
491
492   other_synchro->comm.match_fun = match_fun;
493   other_synchro->comm.copy_data_fun = copy_data_fun;
494
495   if (MC_is_active() || MC_record_replay_is_active()) {
496     other_synchro->state = SIMIX_RUNNING;
497     return other_synchro;
498   }
499
500   SIMIX_comm_start(other_synchro);
501   return other_synchro;
502 }
503
504 smx_synchro_t simcall_HANDLER_comm_iprobe(smx_simcall_t simcall, smx_mailbox_t mbox,
505                                    int type, int src, int tag,
506                                    int (*match_fun)(void *, void *, smx_synchro_t),
507                                    void *data){
508   return SIMIX_comm_iprobe(simcall->issuer, mbox, type, src, tag, match_fun, data);
509 }
510
511 smx_synchro_t SIMIX_comm_iprobe(smx_process_t dst_proc, smx_mailbox_t mbox, int type, int src,
512                               int tag, int (*match_fun)(void *, void *, smx_synchro_t), void *data)
513 {
514   XBT_DEBUG("iprobe from %p %p", mbox, mbox->comm_fifo);
515   smx_synchro_t this_synchro;
516   int smx_type;
517   if(type == 1){
518     this_synchro=SIMIX_comm_new(SIMIX_COMM_SEND);
519     smx_type = SIMIX_COMM_RECEIVE;
520   } else{
521     this_synchro=SIMIX_comm_new(SIMIX_COMM_RECEIVE);
522     smx_type = SIMIX_COMM_SEND;
523   } 
524   smx_synchro_t other_synchro=NULL;
525   if(mbox->permanent_receiver && xbt_fifo_size(mbox->done_comm_fifo)!=0){
526     //find a match in the already received fifo
527       XBT_DEBUG("first try in the perm recv mailbox");
528
529     other_synchro = SIMIX_fifo_probe_comm(
530       mbox->done_comm_fifo, (e_smx_comm_type_t) smx_type,
531       match_fun, data, this_synchro);
532   }
533  // }else{
534     if(!other_synchro){
535         XBT_DEBUG("try in the normal mailbox");
536         other_synchro = SIMIX_fifo_probe_comm(
537           mbox->comm_fifo, (e_smx_comm_type_t) smx_type,
538           match_fun, data, this_synchro);
539     }
540 //  }
541   if(other_synchro)other_synchro->comm.refcount--;
542
543   SIMIX_comm_destroy(this_synchro);
544   --smx_total_comms;
545   return other_synchro;
546 }
547
548 void simcall_HANDLER_comm_wait(smx_simcall_t simcall, smx_synchro_t synchro, double timeout)
549 {
550   /* the simcall may be a wait, a send or a recv */
551   surf_action_t sleep;
552
553   /* Associate this simcall to the wait synchro */
554   XBT_DEBUG("simcall_HANDLER_comm_wait, %p", synchro);
555
556   xbt_fifo_push(synchro->simcalls, simcall);
557   simcall->issuer->waiting_synchro = synchro;
558
559   if (MC_is_active() || MC_record_replay_is_active()) {
560     int idx = SIMCALL_GET_MC_VALUE(simcall);
561     if (idx == 0) {
562       synchro->state = SIMIX_DONE;
563     } else {
564       /* If we reached this point, the wait simcall must have a timeout */
565       /* Otherwise it shouldn't be enabled and executed by the MC */
566       if (timeout == -1)
567         THROW_IMPOSSIBLE;
568
569       if (synchro->comm.src_proc == simcall->issuer)
570         synchro->state = SIMIX_SRC_TIMEOUT;
571       else
572         synchro->state = SIMIX_DST_TIMEOUT;
573     }
574
575     SIMIX_comm_finish(synchro);
576     return;
577   }
578
579   /* If the synchro has already finish perform the error handling, */
580   /* otherwise set up a waiting timeout on the right side          */
581   if (synchro->state != SIMIX_WAITING && synchro->state != SIMIX_RUNNING) {
582     SIMIX_comm_finish(synchro);
583   } else { /* if (timeout >= 0) { we need a surf sleep action even when there is no timeout, otherwise surf won't tell us when the host fails */
584     sleep = surf_host_sleep(simcall->issuer->host, timeout);
585     sleep->setData(synchro);
586
587     if (simcall->issuer == synchro->comm.src_proc)
588       synchro->comm.src_timeout = sleep;
589     else
590       synchro->comm.dst_timeout = sleep;
591   }
592 }
593
594 void simcall_HANDLER_comm_test(smx_simcall_t simcall, smx_synchro_t synchro)
595 {
596   if(MC_is_active() || MC_record_replay_is_active()){
597     simcall_comm_test__set__result(simcall, synchro->comm.src_proc && synchro->comm.dst_proc);
598     if(simcall_comm_test__get__result(simcall)){
599       synchro->state = SIMIX_DONE;
600       xbt_fifo_push(synchro->simcalls, simcall);
601       SIMIX_comm_finish(synchro);
602     }else{
603       SIMIX_simcall_answer(simcall);
604     }
605     return;
606   }
607
608   simcall_comm_test__set__result(simcall, (synchro->state != SIMIX_WAITING && synchro->state != SIMIX_RUNNING));
609   if (simcall_comm_test__get__result(simcall)) {
610     xbt_fifo_push(synchro->simcalls, simcall);
611     SIMIX_comm_finish(synchro);
612   } else {
613     SIMIX_simcall_answer(simcall);
614   }
615 }
616
617 void simcall_HANDLER_comm_testany(smx_simcall_t simcall, xbt_dynar_t synchros)
618 {
619   unsigned int cursor;
620   smx_synchro_t synchro;
621   simcall_comm_testany__set__result(simcall, -1);
622
623   if (MC_is_active() || MC_record_replay_is_active()){
624     int idx = SIMCALL_GET_MC_VALUE(simcall);
625     if(idx == -1){
626       SIMIX_simcall_answer(simcall);
627     }else{
628       synchro = xbt_dynar_get_as(synchros, idx, smx_synchro_t);
629       simcall_comm_testany__set__result(simcall, idx);
630       xbt_fifo_push(synchro->simcalls, simcall);
631       synchro->state = SIMIX_DONE;
632       SIMIX_comm_finish(synchro);
633     }
634     return;
635   }
636
637   xbt_dynar_foreach(simcall_comm_testany__get__comms(simcall), cursor,synchro) {
638     if (synchro->state != SIMIX_WAITING && synchro->state != SIMIX_RUNNING) {
639       simcall_comm_testany__set__result(simcall, cursor);
640       xbt_fifo_push(synchro->simcalls, simcall);
641       SIMIX_comm_finish(synchro);
642       return;
643     }
644   }
645   SIMIX_simcall_answer(simcall);
646 }
647
648 void simcall_HANDLER_comm_waitany(smx_simcall_t simcall, xbt_dynar_t synchros)
649 {
650   smx_synchro_t synchro;
651   unsigned int cursor = 0;
652
653   if (MC_is_active() || MC_record_replay_is_active()){
654     int idx = SIMCALL_GET_MC_VALUE(simcall);
655     synchro = xbt_dynar_get_as(synchros, idx, smx_synchro_t);
656     xbt_fifo_push(synchro->simcalls, simcall);
657     simcall_comm_waitany__set__result(simcall, idx);
658     synchro->state = SIMIX_DONE;
659     SIMIX_comm_finish(synchro);
660     return;
661   }
662
663   xbt_dynar_foreach(synchros, cursor, synchro){
664     /* associate this simcall to the the synchro */
665     xbt_fifo_push(synchro->simcalls, simcall);
666
667     /* see if the synchro is already finished */
668     if (synchro->state != SIMIX_WAITING && synchro->state != SIMIX_RUNNING){
669       SIMIX_comm_finish(synchro);
670       break;
671     }
672   }
673 }
674
675 void SIMIX_waitany_remove_simcall_from_actions(smx_simcall_t simcall)
676 {
677   smx_synchro_t synchro;
678   unsigned int cursor = 0;
679   xbt_dynar_t synchros = simcall_comm_waitany__get__comms(simcall);
680
681   xbt_dynar_foreach(synchros, cursor, synchro) {
682     xbt_fifo_remove(synchro->simcalls, simcall);
683   }
684 }
685
686 /**
687  *  \brief Starts the simulation of a communication synchro.
688  *  \param synchro the communication synchro
689  */
690 static inline void SIMIX_comm_start(smx_synchro_t synchro)
691 {
692   /* If both the sender and the receiver are already there, start the communication */
693   if (synchro->state == SIMIX_READY) {
694
695     sg_host_t sender = synchro->comm.src_proc->host;
696     sg_host_t receiver = synchro->comm.dst_proc->host;
697
698     XBT_DEBUG("Starting communication %p from '%s' to '%s'", synchro,
699               sg_host_get_name(sender), sg_host_get_name(receiver));
700
701     synchro->comm.surf_comm = surf_network_model_communicate(surf_network_model,
702                                                             sender, receiver,
703                                                             synchro->comm.task_size, synchro->comm.rate);
704
705     synchro->comm.surf_comm->setData(synchro);
706
707     synchro->state = SIMIX_RUNNING;
708
709     /* If a link is failed, detect it immediately */
710     if (synchro->comm.surf_comm->getState() == simgrid::surf::Action::State::failed) {
711       XBT_DEBUG("Communication from '%s' to '%s' failed to start because of a link failure",
712                 sg_host_get_name(sender), sg_host_get_name(receiver));
713       synchro->state = SIMIX_LINK_FAILURE;
714       SIMIX_comm_destroy_internal_actions(synchro);
715     }
716
717     /* If any of the process is suspend, create the synchro but stop its execution,
718        it will be restarted when the sender process resume */
719     if (SIMIX_process_is_suspended(synchro->comm.src_proc) ||
720         SIMIX_process_is_suspended(synchro->comm.dst_proc)) {
721       /* FIXME: check what should happen with the synchro state */
722
723       if (SIMIX_process_is_suspended(synchro->comm.src_proc))
724         XBT_DEBUG("The communication is suspended on startup because src (%s:%s) were suspended since it initiated the communication",
725                   sg_host_get_name(synchro->comm.src_proc->host), synchro->comm.src_proc->name);
726       else
727         XBT_DEBUG("The communication is suspended on startup because dst (%s:%s) were suspended since it initiated the communication",
728                   sg_host_get_name(synchro->comm.dst_proc->host), synchro->comm.dst_proc->name);
729
730       synchro->comm.surf_comm->suspend();
731
732     }
733   }
734 }
735
736 /**
737  * \brief Answers the SIMIX simcalls associated to a communication synchro.
738  * \param synchro a finished communication synchro
739  */
740 void SIMIX_comm_finish(smx_synchro_t synchro)
741 {
742   unsigned int destroy_count = 0;
743   smx_simcall_t simcall;
744
745   while ((simcall = (smx_simcall_t) xbt_fifo_shift(synchro->simcalls))) {
746
747     /* If a waitany simcall is waiting for this synchro to finish, then remove
748        it from the other synchros in the waitany list. Afterwards, get the
749        position of the actual synchro in the waitany dynar and
750        return it as the result of the simcall */
751
752     if (simcall->call == SIMCALL_NONE) //FIXME: maybe a better way to handle this case
753       continue; // if process handling comm is killed
754     if (simcall->call == SIMCALL_COMM_WAITANY) {
755       SIMIX_waitany_remove_simcall_from_actions(simcall);
756       if (!MC_is_active() && !MC_record_replay_is_active())
757         simcall_comm_waitany__set__result(simcall, xbt_dynar_search(simcall_comm_waitany__get__comms(simcall), &synchro));
758     }
759
760     /* If the synchro is still in a rendez-vous point then remove from it */
761     if (synchro->comm.mbox)
762       SIMIX_mbox_remove(synchro->comm.mbox, synchro);
763
764     XBT_DEBUG("SIMIX_comm_finish: synchro state = %d", (int)synchro->state);
765
766     /* Check out for errors */
767
768     if (simcall->issuer->host->isOff()) {
769       simcall->issuer->context->iwannadie = 1;
770       SMX_EXCEPTION(simcall->issuer, host_error, 0, "Host failed");
771     } else
772
773     switch (synchro->state) {
774
775     case SIMIX_DONE:
776       XBT_DEBUG("Communication %p complete!", synchro);
777       SIMIX_comm_copy_data(synchro);
778       break;
779
780     case SIMIX_SRC_TIMEOUT:
781       SMX_EXCEPTION(simcall->issuer, timeout_error, 0,
782                     "Communication timeouted because of sender");
783       break;
784
785     case SIMIX_DST_TIMEOUT:
786       SMX_EXCEPTION(simcall->issuer, timeout_error, 0,
787                     "Communication timeouted because of receiver");
788       break;
789
790     case SIMIX_SRC_HOST_FAILURE:
791       if (simcall->issuer == synchro->comm.src_proc)
792         simcall->issuer->context->iwannadie = 1;
793 //          SMX_EXCEPTION(simcall->issuer, host_error, 0, "Host failed");
794       else
795         SMX_EXCEPTION(simcall->issuer, network_error, 0, "Remote peer failed");
796       break;
797
798     case SIMIX_DST_HOST_FAILURE:
799       if (simcall->issuer == synchro->comm.dst_proc)
800         simcall->issuer->context->iwannadie = 1;
801 //          SMX_EXCEPTION(simcall->issuer, host_error, 0, "Host failed");
802       else
803         SMX_EXCEPTION(simcall->issuer, network_error, 0, "Remote peer failed");
804       break;
805
806     case SIMIX_LINK_FAILURE:
807
808       XBT_DEBUG("Link failure in synchro %p between '%s' and '%s': posting an exception to the issuer: %s (%p) detached:%d",
809                 synchro,
810                 synchro->comm.src_proc ? sg_host_get_name(synchro->comm.src_proc->host) : NULL,
811                 synchro->comm.dst_proc ? sg_host_get_name(synchro->comm.dst_proc->host) : NULL,
812                 simcall->issuer->name, simcall->issuer, synchro->comm.detached);
813       if (synchro->comm.src_proc == simcall->issuer) {
814         XBT_DEBUG("I'm source");
815       } else if (synchro->comm.dst_proc == simcall->issuer) {
816         XBT_DEBUG("I'm dest");
817       } else {
818         XBT_DEBUG("I'm neither source nor dest");
819       }
820       SMX_EXCEPTION(simcall->issuer, network_error, 0, "Link failure");
821       break;
822
823     case SIMIX_CANCELED:
824       if (simcall->issuer == synchro->comm.dst_proc)
825         SMX_EXCEPTION(simcall->issuer, cancel_error, 0,
826                       "Communication canceled by the sender");
827       else
828         SMX_EXCEPTION(simcall->issuer, cancel_error, 0,
829                       "Communication canceled by the receiver");
830       break;
831
832     default:
833       xbt_die("Unexpected synchro state in SIMIX_comm_finish: %d", (int)synchro->state);
834     }
835
836     /* if there is an exception during a waitany or a testany, indicate the position of the failed communication */
837     if (simcall->issuer->doexception) {
838       if (simcall->call == SIMCALL_COMM_WAITANY) {
839         simcall->issuer->running_ctx->exception.value = xbt_dynar_search(simcall_comm_waitany__get__comms(simcall), &synchro);
840       }
841       else if (simcall->call == SIMCALL_COMM_TESTANY) {
842         simcall->issuer->running_ctx->exception.value = xbt_dynar_search(simcall_comm_testany__get__comms(simcall), &synchro);
843       }
844     }
845
846     if (simcall->issuer->host->isOff()) {
847       simcall->issuer->context->iwannadie = 1;
848     }
849
850     simcall->issuer->waiting_synchro = NULL;
851     xbt_fifo_remove(simcall->issuer->comms, synchro);
852     if(synchro->comm.detached){
853       if(simcall->issuer == synchro->comm.src_proc){
854         if(synchro->comm.dst_proc)
855           xbt_fifo_remove(synchro->comm.dst_proc->comms, synchro);
856       }
857       if(simcall->issuer == synchro->comm.dst_proc){
858         if(synchro->comm.src_proc)
859           xbt_fifo_remove(synchro->comm.src_proc->comms, synchro);
860       }
861     }
862     SIMIX_simcall_answer(simcall);
863     destroy_count++;
864   }
865
866   while (destroy_count-- > 0)
867     SIMIX_comm_destroy(synchro);
868 }
869
870 /**
871  * \brief This function is called when a Surf communication synchro is finished.
872  * \param synchro the corresponding Simix communication
873  */
874 void SIMIX_post_comm(smx_synchro_t synchro)
875 {
876   /* Update synchro state */
877   if (synchro->comm.src_timeout &&
878       synchro->comm.src_timeout->getState() == simgrid::surf::Action::State::done)
879     synchro->state = SIMIX_SRC_TIMEOUT;
880   else if (synchro->comm.dst_timeout &&
881     synchro->comm.dst_timeout->getState() == simgrid::surf::Action::State::done)
882     synchro->state = SIMIX_DST_TIMEOUT;
883   else if (synchro->comm.src_timeout &&
884     synchro->comm.src_timeout->getState() == simgrid::surf::Action::State::failed)
885     synchro->state = SIMIX_SRC_HOST_FAILURE;
886   else if (synchro->comm.dst_timeout &&
887       synchro->comm.dst_timeout->getState() == simgrid::surf::Action::State::failed)
888     synchro->state = SIMIX_DST_HOST_FAILURE;
889   else if (synchro->comm.surf_comm &&
890     synchro->comm.surf_comm->getState() == simgrid::surf::Action::State::failed) {
891     XBT_DEBUG("Puta madre. Surf says that the link broke");
892     synchro->state = SIMIX_LINK_FAILURE;
893   } else
894     synchro->state = SIMIX_DONE;
895
896   XBT_DEBUG("SIMIX_post_comm: comm %p, state %d, src_proc %p, dst_proc %p, detached: %d",
897             synchro, (int)synchro->state, synchro->comm.src_proc, synchro->comm.dst_proc, synchro->comm.detached);
898
899   /* destroy the surf actions associated with the Simix communication */
900   SIMIX_comm_destroy_internal_actions(synchro);
901
902   /* if there are simcalls associated with the synchro, then answer them */
903   if (xbt_fifo_size(synchro->simcalls)) {
904     SIMIX_comm_finish(synchro);
905   }
906 }
907
908 void SIMIX_comm_cancel(smx_synchro_t synchro)
909 {
910   /* if the synchro is a waiting state means that it is still in a mbox */
911   /* so remove from it and delete it */
912   if (synchro->state == SIMIX_WAITING) {
913     SIMIX_mbox_remove(synchro->comm.mbox, synchro);
914     synchro->state = SIMIX_CANCELED;
915   }
916   else if (!MC_is_active() /* when running the MC there are no surf actions */
917            && !MC_record_replay_is_active()
918            && (synchro->state == SIMIX_READY || synchro->state == SIMIX_RUNNING)) {
919
920     synchro->comm.surf_comm->cancel();
921   }
922 }
923
924 void SIMIX_comm_suspend(smx_synchro_t synchro)
925 {
926   /*FIXME: shall we suspend also the timeout synchro? */
927   if (synchro->comm.surf_comm)
928     synchro->comm.surf_comm->suspend();
929   /* in the other case, the action will be suspended on creation, in SIMIX_comm_start() */
930 }
931
932 void SIMIX_comm_resume(smx_synchro_t synchro)
933 {
934   /*FIXME: check what happen with the timeouts */
935   if (synchro->comm.surf_comm)
936     synchro->comm.surf_comm->resume();
937   /* in the other case, the synchro were not really suspended yet, see SIMIX_comm_suspend() and SIMIX_comm_start() */
938 }
939
940
941 /************* synchro Getters **************/
942
943 /**
944  *  \brief get the amount remaining from the communication
945  *  \param synchro The communication
946  */
947 double SIMIX_comm_get_remains(smx_synchro_t synchro)
948 {
949   double remains;
950
951   if(!synchro){
952     return 0;
953   }
954
955   switch (synchro->state) {
956
957   case SIMIX_RUNNING:
958     remains = synchro->comm.surf_comm->getRemains();
959     break;
960
961   case SIMIX_WAITING:
962   case SIMIX_READY:
963     remains = 0; /*FIXME: check what should be returned */
964     break;
965
966   default:
967     remains = 0; /*FIXME: is this correct? */
968     break;
969   }
970   return remains;
971 }
972
973 e_smx_state_t SIMIX_comm_get_state(smx_synchro_t synchro)
974 {
975   return synchro->state;
976 }
977
978 /**
979  *  \brief Return the user data associated to the sender of the communication
980  *  \param synchro The communication
981  *  \return the user data
982  */
983 void* SIMIX_comm_get_src_data(smx_synchro_t synchro)
984 {
985   return synchro->comm.src_data;
986 }
987
988 /**
989  *  \brief Return the user data associated to the receiver of the communication
990  *  \param synchro The communication
991  *  \return the user data
992  */
993 void* SIMIX_comm_get_dst_data(smx_synchro_t synchro)
994 {
995   return synchro->comm.dst_data;
996 }
997
998 smx_process_t SIMIX_comm_get_src_proc(smx_synchro_t synchro)
999 {
1000   return synchro->comm.src_proc;
1001 }
1002
1003 smx_process_t SIMIX_comm_get_dst_proc(smx_synchro_t synchro)
1004 {
1005   return synchro->comm.dst_proc;
1006 }
1007
1008 /******************************************************************************/
1009 /*                    SIMIX_comm_copy_data callbacks                       */
1010 /******************************************************************************/
1011 static void (*SIMIX_comm_copy_data_callback) (smx_synchro_t, void*, size_t) =
1012   &SIMIX_comm_copy_pointer_callback;
1013
1014 void
1015 SIMIX_comm_set_copy_data_callback(void (*callback) (smx_synchro_t, void*, size_t))
1016 {
1017   SIMIX_comm_copy_data_callback = callback;
1018 }
1019
1020 void SIMIX_comm_copy_pointer_callback(smx_synchro_t comm, void* buff, size_t buff_size)
1021 {
1022   xbt_assert((buff_size == sizeof(void *)),
1023              "Cannot copy %zu bytes: must be sizeof(void*)", buff_size);
1024   *(void **) (comm->comm.dst_buff) = buff;
1025 }
1026
1027 void SIMIX_comm_copy_buffer_callback(smx_synchro_t comm, void* buff, size_t buff_size)
1028 {
1029   XBT_DEBUG("Copy the data over");
1030   memcpy(comm->comm.dst_buff, buff, buff_size);
1031   if (comm->comm.detached) { // if this is a detached send, the source buffer was duplicated by SMPI sender to make the original buffer available to the application ASAP
1032     xbt_free(buff);
1033     comm->comm.src_buff = NULL;
1034   }
1035 }
1036
1037
1038 /**
1039  *  \brief Copy the communication data from the sender's buffer to the receiver's one
1040  *  \param comm The communication
1041  */
1042 void SIMIX_comm_copy_data(smx_synchro_t comm)
1043 {
1044   size_t buff_size = comm->comm.src_buff_size;
1045   /* If there is no data to be copy then return */
1046   if (!comm->comm.src_buff || !comm->comm.dst_buff || comm->comm.copied)
1047     return;
1048
1049   XBT_DEBUG("Copying comm %p data from %s (%p) -> %s (%p) (%zu bytes)",
1050             comm,
1051             comm->comm.src_proc ? sg_host_get_name(comm->comm.src_proc->host) : "a finished process",
1052             comm->comm.src_buff,
1053             comm->comm.dst_proc ? sg_host_get_name(comm->comm.dst_proc->host) : "a finished process",
1054             comm->comm.dst_buff, buff_size);
1055
1056   /* Copy at most dst_buff_size bytes of the message to receiver's buffer */
1057   if (comm->comm.dst_buff_size)
1058     buff_size = MIN(buff_size, *(comm->comm.dst_buff_size));
1059
1060   /* Update the receiver's buffer size to the copied amount */
1061   if (comm->comm.dst_buff_size)
1062     *comm->comm.dst_buff_size = buff_size;
1063
1064   if (buff_size > 0){
1065       if(comm->comm.copy_data_fun)
1066         comm->comm.copy_data_fun (comm, comm->comm.src_buff, buff_size);
1067       else
1068         SIMIX_comm_copy_data_callback (comm, comm->comm.src_buff, buff_size);
1069   }
1070
1071
1072   /* Set the copied flag so we copy data only once */
1073   /* (this function might be called from both communication ends) */
1074   comm->comm.copied = 1;
1075 }