Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
useless global variable, hindering MC equality
[simgrid.git] / src / simix / smx_network.cpp
1 /* Copyright (c) 2009-2015. The SimGrid Team.
2  * All rights reserved.                                                     */
3
4 /* This program is free software; you can redistribute it and/or modify it
5  * under the terms of the license (GNU LGPL) which comes with this package. */
6
7 #include "src/surf/surf_interface.hpp"
8 #include "src/simix/smx_private.h"
9 #include "xbt/log.h"
10 #include "mc/mc.h"
11 #include "src/mc/mc_replay.h"
12 #include "xbt/dict.h"
13
14 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(simix_network, simix, "SIMIX network-related synchronization");
15
16 static xbt_dict_t mailboxes = NULL;
17
18 static void SIMIX_waitany_remove_simcall_from_actions(smx_simcall_t simcall);
19 static void SIMIX_comm_copy_data(smx_synchro_t comm);
20 static smx_synchro_t SIMIX_comm_new(e_smx_comm_type_t type);
21 static inline void SIMIX_mbox_push(smx_mailbox_t mbox, smx_synchro_t comm);
22 static smx_synchro_t SIMIX_fifo_probe_comm(xbt_fifo_t fifo, e_smx_comm_type_t type,
23                                         int (*match_fun)(void *, void *,smx_synchro_t),
24                                         void *user_data, smx_synchro_t my_synchro);
25 static smx_synchro_t SIMIX_fifo_get_comm(xbt_fifo_t fifo, e_smx_comm_type_t type,
26                                         int (*match_fun)(void *, void *,smx_synchro_t),
27                                         void *user_data, smx_synchro_t my_synchro);
28 static void SIMIX_mbox_free(void *data);
29 static void SIMIX_comm_start(smx_synchro_t synchro);
30
31 void SIMIX_network_init(void)
32 {
33   mailboxes = xbt_dict_new_homogeneous(SIMIX_mbox_free);
34 }
35
36 void SIMIX_network_exit(void)
37 {
38   xbt_dict_free(&mailboxes);
39 }
40
41 /******************************************************************************/
42 /*                           Rendez-Vous Points                               */
43 /******************************************************************************/
44
45 smx_mailbox_t SIMIX_mbox_create(const char *name)
46 {
47   /* two processes may have pushed the same mbox_create simcall at the same time */
48   smx_mailbox_t mbox = name ? (smx_mailbox_t) xbt_dict_get_or_null(mailboxes, name) : NULL;
49
50   if (!mbox) {
51     mbox = xbt_new0(s_smx_mailbox_t, 1);
52     mbox->name = name ? xbt_strdup(name) : NULL;
53     mbox->comm_fifo = xbt_fifo_new();
54     mbox->done_comm_fifo = xbt_fifo_new();
55     mbox->permanent_receiver=NULL;
56
57     XBT_DEBUG("Creating a mailbox at %p with name %s", mbox, name);
58
59     if (mbox->name)
60       xbt_dict_set(mailboxes, mbox->name, mbox, NULL);
61   }
62   return mbox;
63 }
64
65 void SIMIX_mbox_destroy(smx_mailbox_t mbox)
66 {
67   if (mbox->name)
68     xbt_dict_remove(mailboxes, mbox->name);
69 }
70
71 void SIMIX_mbox_free(void *data)
72 {
73   XBT_DEBUG("mbox free %p", data);
74   smx_mailbox_t mbox = (smx_mailbox_t) data;
75   xbt_free(mbox->name);
76   xbt_fifo_free(mbox->comm_fifo);
77   xbt_fifo_free(mbox->done_comm_fifo);
78
79   xbt_free(mbox);
80 }
81
82 smx_mailbox_t SIMIX_mbox_get_by_name(const char *name)
83 {
84   return (smx_mailbox_t) xbt_dict_get_or_null(mailboxes, name);
85 }
86
87 int SIMIX_mbox_comm_count_by_host(smx_mailbox_t mbox, sg_host_t host)
88 {
89   smx_synchro_t comm = NULL;
90   xbt_fifo_item_t item = NULL;
91   int count = 0;
92
93   xbt_fifo_foreach(mbox->comm_fifo, item, comm, smx_synchro_t) {
94     if (comm->comm.src_proc->host == host)
95       count++;
96   }
97
98   return count;
99 }
100
101 smx_synchro_t SIMIX_mbox_get_head(smx_mailbox_t mbox)
102 {
103   return (smx_synchro_t) xbt_fifo_get_item_content(
104     xbt_fifo_get_first_item(mbox->comm_fifo));
105 }
106
107 /**
108  *  \brief get the receiver (process associated to the mailbox)
109  *  \param mbox The rendez-vous point
110  *  \return process The receiving process (NULL if not set)
111  */
112 smx_process_t SIMIX_mbox_get_receiver(smx_mailbox_t mbox)
113 {
114   return mbox->permanent_receiver;
115 }
116
117 /**
118  *  \brief set the receiver of the rendez vous point to allow eager sends
119  *  \param mbox The rendez-vous point
120  *  \param process The receiving process
121  */
122 void SIMIX_mbox_set_receiver(smx_mailbox_t mbox, smx_process_t process)
123 {
124   mbox->permanent_receiver=process;
125 }
126
127 /**
128  *  \brief Pushes a communication synchro into a rendez-vous point
129  *  \param mbox The mailbox
130  *  \param comm The communication synchro
131  */
132 static inline void SIMIX_mbox_push(smx_mailbox_t mbox, smx_synchro_t comm)
133 {
134   xbt_fifo_push(mbox->comm_fifo, comm);
135   comm->comm.mbox = mbox;
136 }
137
138 /**
139  *  \brief Removes a communication synchro from a rendez-vous point
140  *  \param mbox The rendez-vous point
141  *  \param comm The communication synchro
142  */
143 void SIMIX_mbox_remove(smx_mailbox_t mbox, smx_synchro_t comm)
144 {
145   xbt_fifo_remove(mbox->comm_fifo, comm);
146   comm->comm.mbox = NULL;
147 }
148
149 /**
150  *  \brief Checks if there is a communication synchro queued in a fifo matching our needs
151  *  \param type The type of communication we are looking for (comm_send, comm_recv)
152  *  \return The communication synchro if found, NULL otherwise
153  */
154 smx_synchro_t SIMIX_fifo_get_comm(xbt_fifo_t fifo, e_smx_comm_type_t type,
155                                  int (*match_fun)(void *, void *,smx_synchro_t),
156                                  void *this_user_data, smx_synchro_t my_synchro)
157 {
158   smx_synchro_t synchro;
159   xbt_fifo_item_t item;
160   void* other_user_data = NULL;
161
162   xbt_fifo_foreach(fifo, item, synchro, smx_synchro_t) {
163     if (synchro->comm.type == SIMIX_COMM_SEND) {
164       other_user_data = synchro->comm.src_data;
165     } else if (synchro->comm.type == SIMIX_COMM_RECEIVE) {
166       other_user_data = synchro->comm.dst_data;
167     }
168     if (synchro->comm.type == type &&
169         (!match_fun              ||              match_fun(this_user_data,  other_user_data, synchro)) &&
170         (!synchro->comm.match_fun || synchro->comm.match_fun(other_user_data, this_user_data,  my_synchro))) {
171       XBT_DEBUG("Found a matching communication synchro %p", synchro);
172       xbt_fifo_remove_item(fifo, item);
173       xbt_fifo_free_item(item);
174       synchro->comm.refcount++;
175 #if HAVE_MC
176       synchro->comm.mbox_cpy = synchro->comm.mbox;
177 #endif
178       synchro->comm.mbox = NULL;
179       return synchro;
180     }
181     XBT_DEBUG("Sorry, communication synchro %p does not match our needs:"
182               " its type is %d but we are looking for a comm of type %d (or maybe the filtering didn't match)",
183               synchro, (int)synchro->comm.type, (int)type);
184   }
185   XBT_DEBUG("No matching communication synchro found");
186   return NULL;
187 }
188
189
190 /**
191  *  \brief Checks if there is a communication synchro queued in a fifo matching our needs, but leave it there
192  *  \param type The type of communication we are looking for (comm_send, comm_recv)
193  *  \return The communication synchro if found, NULL otherwise
194  */
195 smx_synchro_t SIMIX_fifo_probe_comm(xbt_fifo_t fifo, e_smx_comm_type_t type,
196                                  int (*match_fun)(void *, void *,smx_synchro_t),
197                                  void *this_user_data, smx_synchro_t my_synchro)
198 {
199   smx_synchro_t synchro;
200   xbt_fifo_item_t item;
201   void* other_user_data = NULL;
202
203   xbt_fifo_foreach(fifo, item, synchro, smx_synchro_t) {
204     if (synchro->comm.type == SIMIX_COMM_SEND) {
205       other_user_data = synchro->comm.src_data;
206     } else if (synchro->comm.type == SIMIX_COMM_RECEIVE) {
207       other_user_data = synchro->comm.dst_data;
208     }
209     if (synchro->comm.type == type &&
210         (!match_fun              ||              match_fun(this_user_data,  other_user_data, synchro)) &&
211         (!synchro->comm.match_fun || synchro->comm.match_fun(other_user_data, this_user_data,  my_synchro))) {
212       XBT_DEBUG("Found a matching communication synchro %p", synchro);
213       synchro->comm.refcount++;
214
215       return synchro;
216     }
217     XBT_DEBUG("Sorry, communication synchro %p does not match our needs:"
218               " its type is %d but we are looking for a comm of type %d (or maybe the filtering didn't match)",
219               synchro, (int)synchro->comm.type, (int)type);
220   }
221   XBT_DEBUG("No matching communication synchro found");
222   return NULL;
223 }
224 /******************************************************************************/
225 /*                          Communication synchros                            */
226 /******************************************************************************/
227
228 /**
229  *  \brief Creates a new communicate synchro
230  *  \param type The direction of communication (comm_send, comm_recv)
231  *  \return The new communicate synchro
232  */
233 smx_synchro_t SIMIX_comm_new(e_smx_comm_type_t type)
234 {
235   smx_synchro_t synchro;
236
237   /* alloc structures */
238   synchro = (smx_synchro_t) xbt_mallocator_get(simix_global->synchro_mallocator);
239
240   synchro->type = SIMIX_SYNC_COMMUNICATE;
241   synchro->state = SIMIX_WAITING;
242
243   /* set communication */
244   synchro->comm.type = type;
245   synchro->comm.refcount = 1;
246   synchro->comm.src_data=NULL;
247   synchro->comm.dst_data=NULL;
248
249   synchro->category = NULL;
250
251   XBT_DEBUG("Create communicate synchro %p", synchro);
252
253   return synchro;
254 }
255
256 /**
257  *  \brief Destroy a communicate synchro
258  *  \param synchro The communicate synchro to be destroyed
259  */
260 void SIMIX_comm_destroy(smx_synchro_t synchro)
261 {
262   XBT_DEBUG("Destroy synchro %p (refcount: %d), state: %d",
263             synchro, synchro->comm.refcount, (int)synchro->state);
264
265   if (synchro->comm.refcount <= 0) {
266     xbt_backtrace_display_current();
267     xbt_die("The refcount of comm %p is already 0 before decreasing it. "
268             "That's a bug! If you didn't test and/or wait the same communication twice in your code, then the bug is SimGrid's...", synchro);
269   }
270   synchro->comm.refcount--;
271   if (synchro->comm.refcount > 0)
272       return;
273   XBT_DEBUG("Really free communication %p; refcount is now %d", synchro,
274       synchro->comm.refcount);
275
276   xbt_free(synchro->name);
277   SIMIX_comm_destroy_internal_actions(synchro);
278
279   if (synchro->comm.detached && synchro->state != SIMIX_DONE) {
280     /* the communication has failed and was detached:
281      * we have to free the buffer */
282     if (synchro->comm.clean_fun) {
283       synchro->comm.clean_fun(synchro->comm.src_buff);
284     }
285     synchro->comm.src_buff = NULL;
286   }
287
288   if(synchro->comm.mbox)
289     SIMIX_mbox_remove(synchro->comm.mbox, synchro);
290
291   xbt_mallocator_release(simix_global->synchro_mallocator, synchro);
292 }
293
294 void SIMIX_comm_destroy_internal_actions(smx_synchro_t synchro)
295 {
296   if (synchro->comm.surf_comm){
297     synchro->comm.surf_comm->unref();
298     synchro->comm.surf_comm = NULL;
299   }
300
301   if (synchro->comm.src_timeout){
302     synchro->comm.src_timeout->unref();
303     synchro->comm.src_timeout = NULL;
304   }
305
306   if (synchro->comm.dst_timeout){
307     synchro->comm.dst_timeout->unref();
308     synchro->comm.dst_timeout = NULL;
309   }
310 }
311
312 void simcall_HANDLER_comm_send(smx_simcall_t simcall, smx_process_t src, smx_mailbox_t mbox,
313                                   double task_size, double rate,
314                                   void *src_buff, size_t src_buff_size,
315                                   int (*match_fun)(void *, void *,smx_synchro_t),
316                                   void (*copy_data_fun)(smx_synchro_t, void*, size_t),
317           void *data, double timeout){
318   smx_synchro_t comm = simcall_HANDLER_comm_isend(simcall, src, mbox, task_size, rate,
319                            src_buff, src_buff_size, match_fun, NULL, copy_data_fun,
320                data, 0);
321   SIMCALL_SET_MC_VALUE(simcall, 0);
322   simcall_HANDLER_comm_wait(simcall, comm, timeout);
323 }
324 smx_synchro_t simcall_HANDLER_comm_isend(smx_simcall_t simcall, smx_process_t src_proc, smx_mailbox_t mbox,
325                                   double task_size, double rate,
326                                   void *src_buff, size_t src_buff_size,
327                                   int (*match_fun)(void *, void *,smx_synchro_t),
328                                   void (*clean_fun)(void *), // used to free the synchro in case of problem after a detached send
329                                   void (*copy_data_fun)(smx_synchro_t, void*, size_t),// used to copy data if not default one
330                           void *data, int detached)
331 {
332   XBT_DEBUG("send from %p", mbox);
333
334   /* Prepare a synchro describing us, so that it gets passed to the user-provided filter of other side */
335   smx_synchro_t this_synchro = SIMIX_comm_new(SIMIX_COMM_SEND);
336
337   /* Look for communication synchro matching our needs. We also provide a description of
338    * ourself so that the other side also gets a chance of choosing if it wants to match with us.
339    *
340    * If it is not found then push our communication into the rendez-vous point */
341   smx_synchro_t other_synchro = SIMIX_fifo_get_comm(mbox->comm_fifo, SIMIX_COMM_RECEIVE, match_fun, data, this_synchro);
342
343   if (!other_synchro) {
344     other_synchro = this_synchro;
345
346     if (mbox->permanent_receiver!=NULL){
347       //this mailbox is for small messages, which have to be sent right now
348       other_synchro->state = SIMIX_READY;
349       other_synchro->comm.dst_proc=mbox->permanent_receiver;
350       other_synchro->comm.refcount++;
351       xbt_fifo_push(mbox->done_comm_fifo,other_synchro);
352       other_synchro->comm.mbox=mbox;
353       XBT_DEBUG("pushing a message into the permanent receive fifo %p, comm %p", mbox, &(other_synchro->comm));
354
355     }else{
356       SIMIX_mbox_push(mbox, this_synchro);
357     }
358   } else {
359     XBT_DEBUG("Receive already pushed");
360
361     SIMIX_comm_destroy(this_synchro);
362
363     other_synchro->state = SIMIX_READY;
364     other_synchro->comm.type = SIMIX_COMM_READY;
365
366   }
367   xbt_fifo_push(src_proc->comms, other_synchro);
368
369   /* if the communication synchro is detached then decrease the refcount
370    * by one, so it will be eliminated by the receiver's destroy call */
371   if (detached) {
372     other_synchro->comm.detached = 1;
373     other_synchro->comm.refcount--;
374     other_synchro->comm.clean_fun = clean_fun;
375   } else {
376     other_synchro->comm.clean_fun = NULL;
377   }
378
379   /* Setup the communication synchro */
380   other_synchro->comm.src_proc = src_proc;
381   other_synchro->comm.task_size = task_size;
382   other_synchro->comm.rate = rate;
383   other_synchro->comm.src_buff = src_buff;
384   other_synchro->comm.src_buff_size = src_buff_size;
385   other_synchro->comm.src_data = data;
386
387   other_synchro->comm.match_fun = match_fun;
388   other_synchro->comm.copy_data_fun = copy_data_fun;
389
390
391   if (MC_is_active() || MC_record_replay_is_active()) {
392     other_synchro->state = SIMIX_RUNNING;
393     return (detached ? NULL : other_synchro);
394   }
395
396   SIMIX_comm_start(other_synchro);
397   return (detached ? NULL : other_synchro);
398 }
399
400 void simcall_HANDLER_comm_recv(smx_simcall_t simcall, smx_process_t receiver, smx_mailbox_t mbox,
401                          void *dst_buff, size_t *dst_buff_size,
402                          int (*match_fun)(void *, void *, smx_synchro_t),
403                          void (*copy_data_fun)(smx_synchro_t, void*, size_t),
404                          void *data, double timeout, double rate)
405 {
406   smx_synchro_t comm = SIMIX_comm_irecv(receiver, mbox, dst_buff,
407                            dst_buff_size, match_fun, copy_data_fun, data, rate);
408   SIMCALL_SET_MC_VALUE(simcall, 0);
409   simcall_HANDLER_comm_wait(simcall, comm, timeout);
410 }
411
412 smx_synchro_t simcall_HANDLER_comm_irecv(smx_simcall_t simcall, smx_process_t receiver, smx_mailbox_t mbox,
413     void *dst_buff, size_t *dst_buff_size,
414     int (*match_fun)(void *, void *, smx_synchro_t),
415     void (*copy_data_fun)(smx_synchro_t, void*, size_t),
416     void *data, double rate)
417 {
418   return SIMIX_comm_irecv(receiver, mbox, dst_buff, dst_buff_size, match_fun, copy_data_fun, data, rate);
419 }
420
421 smx_synchro_t SIMIX_comm_irecv(smx_process_t dst_proc, smx_mailbox_t mbox, void *dst_buff, size_t *dst_buff_size,
422     int (*match_fun)(void *, void *, smx_synchro_t),
423     void (*copy_data_fun)(smx_synchro_t, void*, size_t), // used to copy data if not default one
424     void *data, double rate)
425 {
426   XBT_DEBUG("recv from %p %p", mbox, mbox->comm_fifo);
427   smx_synchro_t this_synchro = SIMIX_comm_new(SIMIX_COMM_RECEIVE);
428
429   smx_synchro_t other_synchro;
430   //communication already done, get it inside the fifo of completed comms
431   if (mbox->permanent_receiver && xbt_fifo_size(mbox->done_comm_fifo)!=0) {
432
433     XBT_DEBUG("We have a comm that has probably already been received, trying to match it, to skip the communication");
434     //find a match in the already received fifo
435     other_synchro = SIMIX_fifo_get_comm(mbox->done_comm_fifo, SIMIX_COMM_SEND, match_fun, data, this_synchro);
436     //if not found, assume the receiver came first, register it to the mailbox in the classical way
437     if (!other_synchro)  {
438       XBT_DEBUG("We have messages in the permanent receive list, but not the one we are looking for, pushing request into fifo");
439       other_synchro = this_synchro;
440       SIMIX_mbox_push(mbox, this_synchro);
441     } else {
442       if(other_synchro->comm.surf_comm && SIMIX_comm_get_remains(other_synchro)==0.0) {
443         XBT_DEBUG("comm %p has been already sent, and is finished, destroy it",&(other_synchro->comm));
444         other_synchro->state = SIMIX_DONE;
445         other_synchro->comm.type = SIMIX_COMM_DONE;
446         other_synchro->comm.mbox = NULL;
447       }
448       other_synchro->comm.refcount--;
449       SIMIX_comm_destroy(this_synchro);
450     }
451   } else {
452     /* Prepare a synchro describing us, so that it gets passed to the user-provided filter of other side */
453
454     /* Look for communication synchro matching our needs. We also provide a description of
455      * ourself so that the other side also gets a chance of choosing if it wants to match with us.
456      *
457      * If it is not found then push our communication into the rendez-vous point */
458     other_synchro = SIMIX_fifo_get_comm(mbox->comm_fifo, SIMIX_COMM_SEND, match_fun, data, this_synchro);
459
460     if (!other_synchro) {
461       XBT_DEBUG("Receive pushed first %d", xbt_fifo_size(mbox->comm_fifo));
462       other_synchro = this_synchro;
463       SIMIX_mbox_push(mbox, this_synchro);
464     } else {
465       SIMIX_comm_destroy(this_synchro);
466       other_synchro->state = SIMIX_READY;
467       other_synchro->comm.type = SIMIX_COMM_READY;
468       //other_synchro->comm.refcount--;
469     }
470     xbt_fifo_push(dst_proc->comms, other_synchro);
471   }
472
473   /* Setup communication synchro */
474   other_synchro->comm.dst_proc = dst_proc;
475   other_synchro->comm.dst_buff = dst_buff;
476   other_synchro->comm.dst_buff_size = dst_buff_size;
477   other_synchro->comm.dst_data = data;
478
479   if (rate != -1.0 && (other_synchro->comm.rate == -1.0 || rate < other_synchro->comm.rate))
480     other_synchro->comm.rate = rate;
481
482   other_synchro->comm.match_fun = match_fun;
483   other_synchro->comm.copy_data_fun = copy_data_fun;
484
485   if (MC_is_active() || MC_record_replay_is_active()) {
486     other_synchro->state = SIMIX_RUNNING;
487     return other_synchro;
488   }
489
490   SIMIX_comm_start(other_synchro);
491   return other_synchro;
492 }
493
494 smx_synchro_t simcall_HANDLER_comm_iprobe(smx_simcall_t simcall, smx_mailbox_t mbox,
495                                    int type, int src, int tag,
496                                    int (*match_fun)(void *, void *, smx_synchro_t),
497                                    void *data){
498   return SIMIX_comm_iprobe(simcall->issuer, mbox, type, src, tag, match_fun, data);
499 }
500
501 smx_synchro_t SIMIX_comm_iprobe(smx_process_t dst_proc, smx_mailbox_t mbox, int type, int src,
502                               int tag, int (*match_fun)(void *, void *, smx_synchro_t), void *data)
503 {
504   XBT_DEBUG("iprobe from %p %p", mbox, mbox->comm_fifo);
505   smx_synchro_t this_synchro;
506   int smx_type;
507   if(type == 1){
508     this_synchro=SIMIX_comm_new(SIMIX_COMM_SEND);
509     smx_type = SIMIX_COMM_RECEIVE;
510   } else{
511     this_synchro=SIMIX_comm_new(SIMIX_COMM_RECEIVE);
512     smx_type = SIMIX_COMM_SEND;
513   } 
514   smx_synchro_t other_synchro=NULL;
515   if(mbox->permanent_receiver && xbt_fifo_size(mbox->done_comm_fifo)!=0){
516     //find a match in the already received fifo
517       XBT_DEBUG("first try in the perm recv mailbox");
518
519     other_synchro = SIMIX_fifo_probe_comm(
520       mbox->done_comm_fifo, (e_smx_comm_type_t) smx_type,
521       match_fun, data, this_synchro);
522   }
523  // }else{
524     if(!other_synchro){
525         XBT_DEBUG("try in the normal mailbox");
526         other_synchro = SIMIX_fifo_probe_comm(
527           mbox->comm_fifo, (e_smx_comm_type_t) smx_type,
528           match_fun, data, this_synchro);
529     }
530 //  }
531   if(other_synchro)other_synchro->comm.refcount--;
532
533   SIMIX_comm_destroy(this_synchro);
534   return other_synchro;
535 }
536
537 void simcall_HANDLER_comm_wait(smx_simcall_t simcall, smx_synchro_t synchro, double timeout)
538 {
539   /* the simcall may be a wait, a send or a recv */
540   surf_action_t sleep;
541
542   /* Associate this simcall to the wait synchro */
543   XBT_DEBUG("simcall_HANDLER_comm_wait, %p", synchro);
544
545   xbt_fifo_push(synchro->simcalls, simcall);
546   simcall->issuer->waiting_synchro = synchro;
547
548   if (MC_is_active() || MC_record_replay_is_active()) {
549     int idx = SIMCALL_GET_MC_VALUE(simcall);
550     if (idx == 0) {
551       synchro->state = SIMIX_DONE;
552     } else {
553       /* If we reached this point, the wait simcall must have a timeout */
554       /* Otherwise it shouldn't be enabled and executed by the MC */
555       if (timeout == -1)
556         THROW_IMPOSSIBLE;
557
558       if (synchro->comm.src_proc == simcall->issuer)
559         synchro->state = SIMIX_SRC_TIMEOUT;
560       else
561         synchro->state = SIMIX_DST_TIMEOUT;
562     }
563
564     SIMIX_comm_finish(synchro);
565     return;
566   }
567
568   /* If the synchro has already finish perform the error handling, */
569   /* otherwise set up a waiting timeout on the right side          */
570   if (synchro->state != SIMIX_WAITING && synchro->state != SIMIX_RUNNING) {
571     SIMIX_comm_finish(synchro);
572   } else { /* if (timeout >= 0) { we need a surf sleep action even when there is no timeout, otherwise surf won't tell us when the host fails */
573     sleep = surf_host_sleep(simcall->issuer->host, timeout);
574     sleep->setData(synchro);
575
576     if (simcall->issuer == synchro->comm.src_proc)
577       synchro->comm.src_timeout = sleep;
578     else
579       synchro->comm.dst_timeout = sleep;
580   }
581 }
582
583 void simcall_HANDLER_comm_test(smx_simcall_t simcall, smx_synchro_t synchro)
584 {
585   if(MC_is_active() || MC_record_replay_is_active()){
586     simcall_comm_test__set__result(simcall, synchro->comm.src_proc && synchro->comm.dst_proc);
587     if(simcall_comm_test__get__result(simcall)){
588       synchro->state = SIMIX_DONE;
589       xbt_fifo_push(synchro->simcalls, simcall);
590       SIMIX_comm_finish(synchro);
591     }else{
592       SIMIX_simcall_answer(simcall);
593     }
594     return;
595   }
596
597   simcall_comm_test__set__result(simcall, (synchro->state != SIMIX_WAITING && synchro->state != SIMIX_RUNNING));
598   if (simcall_comm_test__get__result(simcall)) {
599     xbt_fifo_push(synchro->simcalls, simcall);
600     SIMIX_comm_finish(synchro);
601   } else {
602     SIMIX_simcall_answer(simcall);
603   }
604 }
605
606 void simcall_HANDLER_comm_testany(smx_simcall_t simcall, xbt_dynar_t synchros)
607 {
608   unsigned int cursor;
609   smx_synchro_t synchro;
610   simcall_comm_testany__set__result(simcall, -1);
611
612   if (MC_is_active() || MC_record_replay_is_active()){
613     int idx = SIMCALL_GET_MC_VALUE(simcall);
614     if(idx == -1){
615       SIMIX_simcall_answer(simcall);
616     }else{
617       synchro = xbt_dynar_get_as(synchros, idx, smx_synchro_t);
618       simcall_comm_testany__set__result(simcall, idx);
619       xbt_fifo_push(synchro->simcalls, simcall);
620       synchro->state = SIMIX_DONE;
621       SIMIX_comm_finish(synchro);
622     }
623     return;
624   }
625
626   xbt_dynar_foreach(simcall_comm_testany__get__comms(simcall), cursor,synchro) {
627     if (synchro->state != SIMIX_WAITING && synchro->state != SIMIX_RUNNING) {
628       simcall_comm_testany__set__result(simcall, cursor);
629       xbt_fifo_push(synchro->simcalls, simcall);
630       SIMIX_comm_finish(synchro);
631       return;
632     }
633   }
634   SIMIX_simcall_answer(simcall);
635 }
636
637 void simcall_HANDLER_comm_waitany(smx_simcall_t simcall, xbt_dynar_t synchros)
638 {
639   smx_synchro_t synchro;
640   unsigned int cursor = 0;
641
642   if (MC_is_active() || MC_record_replay_is_active()){
643     int idx = SIMCALL_GET_MC_VALUE(simcall);
644     synchro = xbt_dynar_get_as(synchros, idx, smx_synchro_t);
645     xbt_fifo_push(synchro->simcalls, simcall);
646     simcall_comm_waitany__set__result(simcall, idx);
647     synchro->state = SIMIX_DONE;
648     SIMIX_comm_finish(synchro);
649     return;
650   }
651
652   xbt_dynar_foreach(synchros, cursor, synchro){
653     /* associate this simcall to the the synchro */
654     xbt_fifo_push(synchro->simcalls, simcall);
655
656     /* see if the synchro is already finished */
657     if (synchro->state != SIMIX_WAITING && synchro->state != SIMIX_RUNNING){
658       SIMIX_comm_finish(synchro);
659       break;
660     }
661   }
662 }
663
664 void SIMIX_waitany_remove_simcall_from_actions(smx_simcall_t simcall)
665 {
666   smx_synchro_t synchro;
667   unsigned int cursor = 0;
668   xbt_dynar_t synchros = simcall_comm_waitany__get__comms(simcall);
669
670   xbt_dynar_foreach(synchros, cursor, synchro) {
671     xbt_fifo_remove(synchro->simcalls, simcall);
672   }
673 }
674
675 /**
676  *  \brief Starts the simulation of a communication synchro.
677  *  \param synchro the communication synchro
678  */
679 static inline void SIMIX_comm_start(smx_synchro_t synchro)
680 {
681   /* If both the sender and the receiver are already there, start the communication */
682   if (synchro->state == SIMIX_READY) {
683
684     sg_host_t sender = synchro->comm.src_proc->host;
685     sg_host_t receiver = synchro->comm.dst_proc->host;
686
687     XBT_DEBUG("Starting communication %p from '%s' to '%s'", synchro,
688               sg_host_get_name(sender), sg_host_get_name(receiver));
689
690     synchro->comm.surf_comm = surf_network_model_communicate(surf_network_model,
691                                                             sender, receiver,
692                                                             synchro->comm.task_size, synchro->comm.rate);
693
694     synchro->comm.surf_comm->setData(synchro);
695
696     synchro->state = SIMIX_RUNNING;
697
698     /* If a link is failed, detect it immediately */
699     if (synchro->comm.surf_comm->getState() == simgrid::surf::Action::State::failed) {
700       XBT_DEBUG("Communication from '%s' to '%s' failed to start because of a link failure",
701                 sg_host_get_name(sender), sg_host_get_name(receiver));
702       synchro->state = SIMIX_LINK_FAILURE;
703       SIMIX_comm_destroy_internal_actions(synchro);
704     }
705
706     /* If any of the process is suspend, create the synchro but stop its execution,
707        it will be restarted when the sender process resume */
708     if (SIMIX_process_is_suspended(synchro->comm.src_proc) ||
709         SIMIX_process_is_suspended(synchro->comm.dst_proc)) {
710       /* FIXME: check what should happen with the synchro state */
711
712       if (SIMIX_process_is_suspended(synchro->comm.src_proc))
713         XBT_DEBUG("The communication is suspended on startup because src (%s:%s) were suspended since it initiated the communication",
714                   sg_host_get_name(synchro->comm.src_proc->host), synchro->comm.src_proc->name);
715       else
716         XBT_DEBUG("The communication is suspended on startup because dst (%s:%s) were suspended since it initiated the communication",
717                   sg_host_get_name(synchro->comm.dst_proc->host), synchro->comm.dst_proc->name);
718
719       synchro->comm.surf_comm->suspend();
720
721     }
722   }
723 }
724
725 /**
726  * \brief Answers the SIMIX simcalls associated to a communication synchro.
727  * \param synchro a finished communication synchro
728  */
729 void SIMIX_comm_finish(smx_synchro_t synchro)
730 {
731   unsigned int destroy_count = 0;
732   smx_simcall_t simcall;
733
734   while ((simcall = (smx_simcall_t) xbt_fifo_shift(synchro->simcalls))) {
735
736     /* If a waitany simcall is waiting for this synchro to finish, then remove
737        it from the other synchros in the waitany list. Afterwards, get the
738        position of the actual synchro in the waitany dynar and
739        return it as the result of the simcall */
740
741     if (simcall->call == SIMCALL_NONE) //FIXME: maybe a better way to handle this case
742       continue; // if process handling comm is killed
743     if (simcall->call == SIMCALL_COMM_WAITANY) {
744       SIMIX_waitany_remove_simcall_from_actions(simcall);
745       if (!MC_is_active() && !MC_record_replay_is_active())
746         simcall_comm_waitany__set__result(simcall, xbt_dynar_search(simcall_comm_waitany__get__comms(simcall), &synchro));
747     }
748
749     /* If the synchro is still in a rendez-vous point then remove from it */
750     if (synchro->comm.mbox)
751       SIMIX_mbox_remove(synchro->comm.mbox, synchro);
752
753     XBT_DEBUG("SIMIX_comm_finish: synchro state = %d", (int)synchro->state);
754
755     /* Check out for errors */
756
757     if (simcall->issuer->host->isOff()) {
758       simcall->issuer->context->iwannadie = 1;
759       SMX_EXCEPTION(simcall->issuer, host_error, 0, "Host failed");
760     } else
761
762     switch (synchro->state) {
763
764     case SIMIX_DONE:
765       XBT_DEBUG("Communication %p complete!", synchro);
766       SIMIX_comm_copy_data(synchro);
767       break;
768
769     case SIMIX_SRC_TIMEOUT:
770       SMX_EXCEPTION(simcall->issuer, timeout_error, 0,
771                     "Communication timeouted because of sender");
772       break;
773
774     case SIMIX_DST_TIMEOUT:
775       SMX_EXCEPTION(simcall->issuer, timeout_error, 0,
776                     "Communication timeouted because of receiver");
777       break;
778
779     case SIMIX_SRC_HOST_FAILURE:
780       if (simcall->issuer == synchro->comm.src_proc)
781         simcall->issuer->context->iwannadie = 1;
782 //          SMX_EXCEPTION(simcall->issuer, host_error, 0, "Host failed");
783       else
784         SMX_EXCEPTION(simcall->issuer, network_error, 0, "Remote peer failed");
785       break;
786
787     case SIMIX_DST_HOST_FAILURE:
788       if (simcall->issuer == synchro->comm.dst_proc)
789         simcall->issuer->context->iwannadie = 1;
790 //          SMX_EXCEPTION(simcall->issuer, host_error, 0, "Host failed");
791       else
792         SMX_EXCEPTION(simcall->issuer, network_error, 0, "Remote peer failed");
793       break;
794
795     case SIMIX_LINK_FAILURE:
796
797       XBT_DEBUG("Link failure in synchro %p between '%s' and '%s': posting an exception to the issuer: %s (%p) detached:%d",
798                 synchro,
799                 synchro->comm.src_proc ? sg_host_get_name(synchro->comm.src_proc->host) : NULL,
800                 synchro->comm.dst_proc ? sg_host_get_name(synchro->comm.dst_proc->host) : NULL,
801                 simcall->issuer->name, simcall->issuer, synchro->comm.detached);
802       if (synchro->comm.src_proc == simcall->issuer) {
803         XBT_DEBUG("I'm source");
804       } else if (synchro->comm.dst_proc == simcall->issuer) {
805         XBT_DEBUG("I'm dest");
806       } else {
807         XBT_DEBUG("I'm neither source nor dest");
808       }
809       SMX_EXCEPTION(simcall->issuer, network_error, 0, "Link failure");
810       break;
811
812     case SIMIX_CANCELED:
813       if (simcall->issuer == synchro->comm.dst_proc)
814         SMX_EXCEPTION(simcall->issuer, cancel_error, 0,
815                       "Communication canceled by the sender");
816       else
817         SMX_EXCEPTION(simcall->issuer, cancel_error, 0,
818                       "Communication canceled by the receiver");
819       break;
820
821     default:
822       xbt_die("Unexpected synchro state in SIMIX_comm_finish: %d", (int)synchro->state);
823     }
824
825     /* if there is an exception during a waitany or a testany, indicate the position of the failed communication */
826     if (simcall->issuer->doexception) {
827       if (simcall->call == SIMCALL_COMM_WAITANY) {
828         simcall->issuer->running_ctx->exception.value = xbt_dynar_search(simcall_comm_waitany__get__comms(simcall), &synchro);
829       }
830       else if (simcall->call == SIMCALL_COMM_TESTANY) {
831         simcall->issuer->running_ctx->exception.value = xbt_dynar_search(simcall_comm_testany__get__comms(simcall), &synchro);
832       }
833     }
834
835     if (simcall->issuer->host->isOff()) {
836       simcall->issuer->context->iwannadie = 1;
837     }
838
839     simcall->issuer->waiting_synchro = NULL;
840     xbt_fifo_remove(simcall->issuer->comms, synchro);
841     if(synchro->comm.detached){
842       if(simcall->issuer == synchro->comm.src_proc){
843         if(synchro->comm.dst_proc)
844           xbt_fifo_remove(synchro->comm.dst_proc->comms, synchro);
845       }
846       if(simcall->issuer == synchro->comm.dst_proc){
847         if(synchro->comm.src_proc)
848           xbt_fifo_remove(synchro->comm.src_proc->comms, synchro);
849       }
850     }
851     SIMIX_simcall_answer(simcall);
852     destroy_count++;
853   }
854
855   while (destroy_count-- > 0)
856     SIMIX_comm_destroy(synchro);
857 }
858
859 /**
860  * \brief This function is called when a Surf communication synchro is finished.
861  * \param synchro the corresponding Simix communication
862  */
863 void SIMIX_post_comm(smx_synchro_t synchro)
864 {
865   /* Update synchro state */
866   if (synchro->comm.src_timeout &&
867       synchro->comm.src_timeout->getState() == simgrid::surf::Action::State::done)
868     synchro->state = SIMIX_SRC_TIMEOUT;
869   else if (synchro->comm.dst_timeout &&
870     synchro->comm.dst_timeout->getState() == simgrid::surf::Action::State::done)
871     synchro->state = SIMIX_DST_TIMEOUT;
872   else if (synchro->comm.src_timeout &&
873     synchro->comm.src_timeout->getState() == simgrid::surf::Action::State::failed)
874     synchro->state = SIMIX_SRC_HOST_FAILURE;
875   else if (synchro->comm.dst_timeout &&
876       synchro->comm.dst_timeout->getState() == simgrid::surf::Action::State::failed)
877     synchro->state = SIMIX_DST_HOST_FAILURE;
878   else if (synchro->comm.surf_comm &&
879     synchro->comm.surf_comm->getState() == simgrid::surf::Action::State::failed) {
880     XBT_DEBUG("Puta madre. Surf says that the link broke");
881     synchro->state = SIMIX_LINK_FAILURE;
882   } else
883     synchro->state = SIMIX_DONE;
884
885   XBT_DEBUG("SIMIX_post_comm: comm %p, state %d, src_proc %p, dst_proc %p, detached: %d",
886             synchro, (int)synchro->state, synchro->comm.src_proc, synchro->comm.dst_proc, synchro->comm.detached);
887
888   /* destroy the surf actions associated with the Simix communication */
889   SIMIX_comm_destroy_internal_actions(synchro);
890
891   /* if there are simcalls associated with the synchro, then answer them */
892   if (xbt_fifo_size(synchro->simcalls)) {
893     SIMIX_comm_finish(synchro);
894   }
895 }
896
897 void SIMIX_comm_cancel(smx_synchro_t synchro)
898 {
899   /* if the synchro is a waiting state means that it is still in a mbox */
900   /* so remove from it and delete it */
901   if (synchro->state == SIMIX_WAITING) {
902     SIMIX_mbox_remove(synchro->comm.mbox, synchro);
903     synchro->state = SIMIX_CANCELED;
904   }
905   else if (!MC_is_active() /* when running the MC there are no surf actions */
906            && !MC_record_replay_is_active()
907            && (synchro->state == SIMIX_READY || synchro->state == SIMIX_RUNNING)) {
908
909     synchro->comm.surf_comm->cancel();
910   }
911 }
912
913 void SIMIX_comm_suspend(smx_synchro_t synchro)
914 {
915   /*FIXME: shall we suspend also the timeout synchro? */
916   if (synchro->comm.surf_comm)
917     synchro->comm.surf_comm->suspend();
918   /* in the other case, the action will be suspended on creation, in SIMIX_comm_start() */
919 }
920
921 void SIMIX_comm_resume(smx_synchro_t synchro)
922 {
923   /*FIXME: check what happen with the timeouts */
924   if (synchro->comm.surf_comm)
925     synchro->comm.surf_comm->resume();
926   /* in the other case, the synchro were not really suspended yet, see SIMIX_comm_suspend() and SIMIX_comm_start() */
927 }
928
929
930 /************* synchro Getters **************/
931
932 /**
933  *  \brief get the amount remaining from the communication
934  *  \param synchro The communication
935  */
936 double SIMIX_comm_get_remains(smx_synchro_t synchro)
937 {
938   double remains;
939
940   if(!synchro){
941     return 0;
942   }
943
944   switch (synchro->state) {
945
946   case SIMIX_RUNNING:
947     remains = synchro->comm.surf_comm->getRemains();
948     break;
949
950   case SIMIX_WAITING:
951   case SIMIX_READY:
952     remains = 0; /*FIXME: check what should be returned */
953     break;
954
955   default:
956     remains = 0; /*FIXME: is this correct? */
957     break;
958   }
959   return remains;
960 }
961
962 e_smx_state_t SIMIX_comm_get_state(smx_synchro_t synchro)
963 {
964   return synchro->state;
965 }
966
967 /**
968  *  \brief Return the user data associated to the sender of the communication
969  *  \param synchro The communication
970  *  \return the user data
971  */
972 void* SIMIX_comm_get_src_data(smx_synchro_t synchro)
973 {
974   return synchro->comm.src_data;
975 }
976
977 /**
978  *  \brief Return the user data associated to the receiver of the communication
979  *  \param synchro The communication
980  *  \return the user data
981  */
982 void* SIMIX_comm_get_dst_data(smx_synchro_t synchro)
983 {
984   return synchro->comm.dst_data;
985 }
986
987 smx_process_t SIMIX_comm_get_src_proc(smx_synchro_t synchro)
988 {
989   return synchro->comm.src_proc;
990 }
991
992 smx_process_t SIMIX_comm_get_dst_proc(smx_synchro_t synchro)
993 {
994   return synchro->comm.dst_proc;
995 }
996
997 /******************************************************************************/
998 /*                    SIMIX_comm_copy_data callbacks                       */
999 /******************************************************************************/
1000 static void (*SIMIX_comm_copy_data_callback) (smx_synchro_t, void*, size_t) =
1001   &SIMIX_comm_copy_pointer_callback;
1002
1003 void
1004 SIMIX_comm_set_copy_data_callback(void (*callback) (smx_synchro_t, void*, size_t))
1005 {
1006   SIMIX_comm_copy_data_callback = callback;
1007 }
1008
1009 void SIMIX_comm_copy_pointer_callback(smx_synchro_t comm, void* buff, size_t buff_size)
1010 {
1011   xbt_assert((buff_size == sizeof(void *)),
1012              "Cannot copy %zu bytes: must be sizeof(void*)", buff_size);
1013   *(void **) (comm->comm.dst_buff) = buff;
1014 }
1015
1016 void SIMIX_comm_copy_buffer_callback(smx_synchro_t comm, void* buff, size_t buff_size)
1017 {
1018   XBT_DEBUG("Copy the data over");
1019   memcpy(comm->comm.dst_buff, buff, buff_size);
1020   if (comm->comm.detached) { // if this is a detached send, the source buffer was duplicated by SMPI sender to make the original buffer available to the application ASAP
1021     xbt_free(buff);
1022     comm->comm.src_buff = NULL;
1023   }
1024 }
1025
1026
1027 /**
1028  *  \brief Copy the communication data from the sender's buffer to the receiver's one
1029  *  \param comm The communication
1030  */
1031 void SIMIX_comm_copy_data(smx_synchro_t comm)
1032 {
1033   size_t buff_size = comm->comm.src_buff_size;
1034   /* If there is no data to be copy then return */
1035   if (!comm->comm.src_buff || !comm->comm.dst_buff || comm->comm.copied)
1036     return;
1037
1038   XBT_DEBUG("Copying comm %p data from %s (%p) -> %s (%p) (%zu bytes)",
1039             comm,
1040             comm->comm.src_proc ? sg_host_get_name(comm->comm.src_proc->host) : "a finished process",
1041             comm->comm.src_buff,
1042             comm->comm.dst_proc ? sg_host_get_name(comm->comm.dst_proc->host) : "a finished process",
1043             comm->comm.dst_buff, buff_size);
1044
1045   /* Copy at most dst_buff_size bytes of the message to receiver's buffer */
1046   if (comm->comm.dst_buff_size)
1047     buff_size = MIN(buff_size, *(comm->comm.dst_buff_size));
1048
1049   /* Update the receiver's buffer size to the copied amount */
1050   if (comm->comm.dst_buff_size)
1051     *comm->comm.dst_buff_size = buff_size;
1052
1053   if (buff_size > 0){
1054       if(comm->comm.copy_data_fun)
1055         comm->comm.copy_data_fun (comm, comm->comm.src_buff, buff_size);
1056       else
1057         SIMIX_comm_copy_data_callback (comm, comm->comm.src_buff, buff_size);
1058   }
1059
1060
1061   /* Set the copied flag so we copy data only once */
1062   /* (this function might be called from both communication ends) */
1063   comm->comm.copied = 1;
1064 }