Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
129cfd7c93bae821cc96b9cbc9747ef46f76d10f
[simgrid.git] / src / simix / smx_network.c
1 /* Copyright (c) 2009-2015. The SimGrid Team.
2  * All rights reserved.                                                     */
3
4 /* This program is free software; you can redistribute it and/or modify it
5  * under the terms of the license (GNU LGPL) which comes with this package. */
6
7 #include "smx_private.h"
8 #include "xbt/log.h"
9 #include "mc/mc.h"
10 #include "mc/mc_replay.h"
11 #include "xbt/dict.h"
12
13 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(simix_network, simix,
14                                 "SIMIX network-related synchronization");
15
16 static xbt_dict_t rdv_points = NULL;
17 XBT_EXPORT_NO_IMPORT(unsigned long int) smx_total_comms = 0;
18
19 static void SIMIX_waitany_remove_simcall_from_actions(smx_simcall_t simcall);
20 static void SIMIX_comm_copy_data(smx_synchro_t comm);
21 static smx_synchro_t SIMIX_comm_new(e_smx_comm_type_t type);
22 static XBT_INLINE void SIMIX_rdv_push(smx_rdv_t rdv, smx_synchro_t comm);
23 static smx_synchro_t SIMIX_fifo_probe_comm(xbt_fifo_t fifo, e_smx_comm_type_t type,
24                                         int (*match_fun)(void *, void *,smx_synchro_t),
25                                         void *user_data, smx_synchro_t my_synchro);
26 static smx_synchro_t SIMIX_fifo_get_comm(xbt_fifo_t fifo, e_smx_comm_type_t type,
27                                         int (*match_fun)(void *, void *,smx_synchro_t),
28                                         void *user_data, smx_synchro_t my_synchro);
29 static void SIMIX_rdv_free(void *data);
30 static void SIMIX_comm_start(smx_synchro_t synchro);
31
32 void SIMIX_network_init(void)
33 {
34   rdv_points = xbt_dict_new_homogeneous(SIMIX_rdv_free);
35 }
36
37 void SIMIX_network_exit(void)
38 {
39   xbt_dict_free(&rdv_points);
40 }
41
42 /******************************************************************************/
43 /*                           Rendez-Vous Points                               */
44 /******************************************************************************/
45
46 smx_rdv_t SIMIX_rdv_create(const char *name)
47 {
48   /* two processes may have pushed the same rdv_create simcall at the same time */
49   smx_rdv_t rdv = name ? xbt_dict_get_or_null(rdv_points, name) : NULL;
50
51   if (!rdv) {
52     rdv = xbt_new0(s_smx_rvpoint_t, 1);
53     rdv->name = name ? xbt_strdup(name) : NULL;
54     rdv->comm_fifo = xbt_fifo_new();
55     rdv->done_comm_fifo = xbt_fifo_new();
56     rdv->permanent_receiver=NULL;
57
58     XBT_DEBUG("Creating a mailbox at %p with name %s", rdv, name);
59
60     if (rdv->name)
61       xbt_dict_set(rdv_points, rdv->name, rdv, NULL);
62   }
63   return rdv;
64 }
65
66 void SIMIX_rdv_destroy(smx_rdv_t rdv)
67 {
68   if (rdv->name)
69     xbt_dict_remove(rdv_points, rdv->name);
70 }
71
72 void SIMIX_rdv_free(void *data)
73 {
74   XBT_DEBUG("rdv free %p", data);
75   smx_rdv_t rdv = (smx_rdv_t) data;
76   xbt_free(rdv->name);
77   xbt_fifo_free(rdv->comm_fifo);
78   xbt_fifo_free(rdv->done_comm_fifo);
79
80   xbt_free(rdv);
81 }
82
83 xbt_dict_t SIMIX_get_rdv_points()
84 {
85   return rdv_points;
86 }
87
88 smx_rdv_t SIMIX_rdv_get_by_name(const char *name)
89 {
90   return xbt_dict_get_or_null(rdv_points, name);
91 }
92
93 int SIMIX_rdv_comm_count_by_host(smx_rdv_t rdv, sg_host_t host)
94 {
95   smx_synchro_t comm = NULL;
96   xbt_fifo_item_t item = NULL;
97   int count = 0;
98
99   xbt_fifo_foreach(rdv->comm_fifo, item, comm, smx_synchro_t) {
100     if (comm->comm.src_proc->host == host)
101       count++;
102   }
103
104   return count;
105 }
106
107 smx_synchro_t SIMIX_rdv_get_head(smx_rdv_t rdv)
108 {
109   return xbt_fifo_get_item_content(xbt_fifo_get_first_item(rdv->comm_fifo));
110 }
111
112 /**
113  *  \brief get the receiver (process associated to the mailbox)
114  *  \param rdv The rendez-vous point
115  *  \return process The receiving process (NULL if not set)
116  */
117 smx_process_t SIMIX_rdv_get_receiver(smx_rdv_t rdv)
118 {
119   return rdv->permanent_receiver;
120 }
121
122 /**
123  *  \brief set the receiver of the rendez vous point to allow eager sends
124  *  \param rdv The rendez-vous point
125  *  \param process The receiving process
126  */
127 void SIMIX_rdv_set_receiver(smx_rdv_t rdv, smx_process_t process)
128 {
129   rdv->permanent_receiver=process;
130 }
131
132 /**
133  *  \brief Pushes a communication synchro into a rendez-vous point
134  *  \param rdv The rendez-vous point
135  *  \param comm The communication synchro
136  */
137 static XBT_INLINE void SIMIX_rdv_push(smx_rdv_t rdv, smx_synchro_t comm)
138 {
139   xbt_fifo_push(rdv->comm_fifo, comm);
140   comm->comm.rdv = rdv;
141 }
142
143 /**
144  *  \brief Removes a communication synchro from a rendez-vous point
145  *  \param rdv The rendez-vous point
146  *  \param comm The communication synchro
147  */
148 XBT_INLINE void SIMIX_rdv_remove(smx_rdv_t rdv, smx_synchro_t comm)
149 {
150   xbt_fifo_remove(rdv->comm_fifo, comm);
151   comm->comm.rdv = NULL;
152 }
153
154 /**
155  *  \brief Checks if there is a communication synchro queued in a fifo matching our needs
156  *  \param type The type of communication we are looking for (comm_send, comm_recv)
157  *  \return The communication synchro if found, NULL otherwise
158  */
159 smx_synchro_t SIMIX_fifo_get_comm(xbt_fifo_t fifo, e_smx_comm_type_t type,
160                                  int (*match_fun)(void *, void *,smx_synchro_t),
161                                  void *this_user_data, smx_synchro_t my_synchro)
162 {
163   smx_synchro_t synchro;
164   xbt_fifo_item_t item;
165   void* other_user_data = NULL;
166
167   xbt_fifo_foreach(fifo, item, synchro, smx_synchro_t) {
168     if (synchro->comm.type == SIMIX_COMM_SEND) {
169       other_user_data = synchro->comm.src_data;
170     } else if (synchro->comm.type == SIMIX_COMM_RECEIVE) {
171       other_user_data = synchro->comm.dst_data;
172     }
173     if (synchro->comm.type == type &&
174         (!match_fun              ||              match_fun(this_user_data,  other_user_data, synchro)) &&
175         (!synchro->comm.match_fun || synchro->comm.match_fun(other_user_data, this_user_data,  my_synchro))) {
176       XBT_DEBUG("Found a matching communication synchro %p", synchro);
177       xbt_fifo_remove_item(fifo, item);
178       xbt_fifo_free_item(item);
179       synchro->comm.refcount++;
180 #ifdef HAVE_MC
181       synchro->comm.rdv_cpy = synchro->comm.rdv;
182 #endif
183       synchro->comm.rdv = NULL;
184       return synchro;
185     }
186     XBT_DEBUG("Sorry, communication synchro %p does not match our needs:"
187               " its type is %d but we are looking for a comm of type %d (or maybe the filtering didn't match)",
188               synchro, (int)synchro->comm.type, (int)type);
189   }
190   XBT_DEBUG("No matching communication synchro found");
191   return NULL;
192 }
193
194
195 /**
196  *  \brief Checks if there is a communication synchro queued in a fifo matching our needs, but leave it there
197  *  \param type The type of communication we are looking for (comm_send, comm_recv)
198  *  \return The communication synchro if found, NULL otherwise
199  */
200 smx_synchro_t SIMIX_fifo_probe_comm(xbt_fifo_t fifo, e_smx_comm_type_t type,
201                                  int (*match_fun)(void *, void *,smx_synchro_t),
202                                  void *this_user_data, smx_synchro_t my_synchro)
203 {
204   smx_synchro_t synchro;
205   xbt_fifo_item_t item;
206   void* other_user_data = NULL;
207
208   xbt_fifo_foreach(fifo, item, synchro, smx_synchro_t) {
209     if (synchro->comm.type == SIMIX_COMM_SEND) {
210       other_user_data = synchro->comm.src_data;
211     } else if (synchro->comm.type == SIMIX_COMM_RECEIVE) {
212       other_user_data = synchro->comm.dst_data;
213     }
214     if (synchro->comm.type == type &&
215         (!match_fun              ||              match_fun(this_user_data,  other_user_data, synchro)) &&
216         (!synchro->comm.match_fun || synchro->comm.match_fun(other_user_data, this_user_data,  my_synchro))) {
217       XBT_DEBUG("Found a matching communication synchro %p", synchro);
218       synchro->comm.refcount++;
219
220       return synchro;
221     }
222     XBT_DEBUG("Sorry, communication synchro %p does not match our needs:"
223               " its type is %d but we are looking for a comm of type %d (or maybe the filtering didn't match)",
224               synchro, (int)synchro->comm.type, (int)type);
225   }
226   XBT_DEBUG("No matching communication synchro found");
227   return NULL;
228 }
229 /******************************************************************************/
230 /*                          Communication synchros                            */
231 /******************************************************************************/
232
233 /**
234  *  \brief Creates a new communicate synchro
235  *  \param type The direction of communication (comm_send, comm_recv)
236  *  \return The new communicate synchro
237  */
238 smx_synchro_t SIMIX_comm_new(e_smx_comm_type_t type)
239 {
240   smx_synchro_t synchro;
241
242   /* alloc structures */
243   synchro = xbt_mallocator_get(simix_global->synchro_mallocator);
244
245   synchro->type = SIMIX_SYNC_COMMUNICATE;
246   synchro->state = SIMIX_WAITING;
247
248   /* set communication */
249   synchro->comm.type = type;
250   synchro->comm.refcount = 1;
251   synchro->comm.src_data=NULL;
252   synchro->comm.dst_data=NULL;
253
254
255 #ifdef HAVE_LATENCY_BOUND_TRACKING
256   //initialize with unknown value
257   synchro->latency_limited = -1;
258 #endif
259
260   synchro->category = NULL;
261
262   XBT_DEBUG("Create communicate synchro %p", synchro);
263   ++smx_total_comms;
264
265   return synchro;
266 }
267
268 /**
269  *  \brief Destroy a communicate synchro
270  *  \param synchro The communicate synchro to be destroyed
271  */
272 void SIMIX_comm_destroy(smx_synchro_t synchro)
273 {
274   XBT_DEBUG("Destroy synchro %p (refcount: %d), state: %d",
275             synchro, synchro->comm.refcount, (int)synchro->state);
276
277   if (synchro->comm.refcount <= 0) {
278     xbt_backtrace_display_current();
279     xbt_die("The refcount of comm %p is already 0 before decreasing it. "
280             "That's a bug! If you didn't test and/or wait the same communication twice in your code, then the bug is SimGrid's...", synchro);
281   }
282   synchro->comm.refcount--;
283   if (synchro->comm.refcount > 0)
284       return;
285   XBT_DEBUG("Really free communication %p; refcount is now %d", synchro,
286       synchro->comm.refcount);
287
288 #ifdef HAVE_LATENCY_BOUND_TRACKING
289   synchro->latency_limited = SIMIX_comm_is_latency_bounded( synchro ) ;
290 #endif
291
292   xbt_free(synchro->name);
293   SIMIX_comm_destroy_internal_actions(synchro);
294
295   if (synchro->comm.detached && synchro->state != SIMIX_DONE) {
296     /* the communication has failed and was detached:
297      * we have to free the buffer */
298     if (synchro->comm.clean_fun) {
299       synchro->comm.clean_fun(synchro->comm.src_buff);
300     }
301     synchro->comm.src_buff = NULL;
302   }
303
304   if(synchro->comm.rdv)
305     SIMIX_rdv_remove(synchro->comm.rdv, synchro);
306
307   xbt_mallocator_release(simix_global->synchro_mallocator, synchro);
308 }
309
310 void SIMIX_comm_destroy_internal_actions(smx_synchro_t synchro)
311 {
312   if (synchro->comm.surf_comm){
313 #ifdef HAVE_LATENCY_BOUND_TRACKING
314     synchro->latency_limited = SIMIX_comm_is_latency_bounded(synchro);
315 #endif
316     surf_action_unref(synchro->comm.surf_comm);
317     synchro->comm.surf_comm = NULL;
318   }
319
320   if (synchro->comm.src_timeout){
321     surf_action_unref(synchro->comm.src_timeout);
322     synchro->comm.src_timeout = NULL;
323   }
324
325   if (synchro->comm.dst_timeout){
326     surf_action_unref(synchro->comm.dst_timeout);
327     synchro->comm.dst_timeout = NULL;
328   }
329 }
330
331 void simcall_HANDLER_comm_send(smx_simcall_t simcall, smx_process_t src, smx_rdv_t rdv,
332                                   double task_size, double rate,
333                                   void *src_buff, size_t src_buff_size,
334                                   int (*match_fun)(void *, void *,smx_synchro_t),
335                                   void (*copy_data_fun)(smx_synchro_t, void*, size_t),
336                                   void *data, double timeout){
337   smx_synchro_t comm = simcall_HANDLER_comm_isend(simcall, src, rdv, task_size, rate,
338                                        src_buff, src_buff_size, match_fun, NULL, copy_data_fun,
339                                        data, 0);
340   SIMCALL_SET_MC_VALUE(simcall, 0);
341   simcall_HANDLER_comm_wait(simcall, comm, timeout);
342 }
343 smx_synchro_t simcall_HANDLER_comm_isend(smx_simcall_t simcall, smx_process_t src_proc, smx_rdv_t rdv,
344                                   double task_size, double rate,
345                                   void *src_buff, size_t src_buff_size,
346                                   int (*match_fun)(void *, void *,smx_synchro_t),
347                                   void (*clean_fun)(void *), // used to free the synchro in case of problem after a detached send
348                                   void (*copy_data_fun)(smx_synchro_t, void*, size_t),// used to copy data if not default one
349                                                   void *data, int detached)
350 {
351   XBT_DEBUG("send from %p", rdv);
352
353   /* Prepare a synchro describing us, so that it gets passed to the user-provided filter of other side */
354   smx_synchro_t this_synchro = SIMIX_comm_new(SIMIX_COMM_SEND);
355
356   /* Look for communication synchro matching our needs. We also provide a description of
357    * ourself so that the other side also gets a chance of choosing if it wants to match with us.
358    *
359    * If it is not found then push our communication into the rendez-vous point */
360   smx_synchro_t other_synchro = SIMIX_fifo_get_comm(rdv->comm_fifo, SIMIX_COMM_RECEIVE, match_fun, data, this_synchro);
361
362   if (!other_synchro) {
363     other_synchro = this_synchro;
364
365     if (rdv->permanent_receiver!=NULL){
366       //this mailbox is for small messages, which have to be sent right now
367       other_synchro->state = SIMIX_READY;
368       other_synchro->comm.dst_proc=rdv->permanent_receiver;
369       other_synchro->comm.refcount++;
370       xbt_fifo_push(rdv->done_comm_fifo,other_synchro);
371       other_synchro->comm.rdv=rdv;
372       XBT_DEBUG("pushing a message into the permanent receive fifo %p, comm %p", rdv, &(other_synchro->comm));
373
374     }else{
375       SIMIX_rdv_push(rdv, this_synchro);
376     }
377   } else {
378     XBT_DEBUG("Receive already pushed");
379
380     SIMIX_comm_destroy(this_synchro);
381     --smx_total_comms; // this creation was a pure waste
382
383     other_synchro->state = SIMIX_READY;
384     other_synchro->comm.type = SIMIX_COMM_READY;
385
386   }
387   xbt_fifo_push(src_proc->comms, other_synchro);
388
389   /* if the communication synchro is detached then decrease the refcount
390    * by one, so it will be eliminated by the receiver's destroy call */
391   if (detached) {
392     other_synchro->comm.detached = 1;
393     other_synchro->comm.refcount--;
394     other_synchro->comm.clean_fun = clean_fun;
395   } else {
396     other_synchro->comm.clean_fun = NULL;
397   }
398
399   /* Setup the communication synchro */
400   other_synchro->comm.src_proc = src_proc;
401   other_synchro->comm.task_size = task_size;
402   other_synchro->comm.rate = rate;
403   other_synchro->comm.src_buff = src_buff;
404   other_synchro->comm.src_buff_size = src_buff_size;
405   other_synchro->comm.src_data = data;
406
407   other_synchro->comm.match_fun = match_fun;
408   other_synchro->comm.copy_data_fun = copy_data_fun;
409
410
411   if (MC_is_active() || MC_record_replay_is_active()) {
412     other_synchro->state = SIMIX_RUNNING;
413     return (detached ? NULL : other_synchro);
414   }
415
416   SIMIX_comm_start(other_synchro);
417   return (detached ? NULL : other_synchro);
418 }
419
420 void simcall_HANDLER_comm_recv(smx_simcall_t simcall, smx_process_t receiver, smx_rdv_t rdv,
421                          void *dst_buff, size_t *dst_buff_size,
422                          int (*match_fun)(void *, void *, smx_synchro_t),
423                          void (*copy_data_fun)(smx_synchro_t, void*, size_t),
424                          void *data, double timeout, double rate)
425 {
426   smx_synchro_t comm = SIMIX_comm_irecv(receiver, rdv, dst_buff,
427                                        dst_buff_size, match_fun, copy_data_fun, data, rate);
428   SIMCALL_SET_MC_VALUE(simcall, 0);
429   simcall_HANDLER_comm_wait(simcall, comm, timeout);
430 }
431
432 smx_synchro_t simcall_HANDLER_comm_irecv(smx_simcall_t simcall, smx_process_t receiver, smx_rdv_t rdv,
433                                   void *dst_buff, size_t *dst_buff_size,
434                                   int (*match_fun)(void *, void *, smx_synchro_t),
435                                   void (*copy_data_fun)(smx_synchro_t, void*, size_t),
436                                   void *data, double rate)
437 {
438   return SIMIX_comm_irecv(receiver, rdv, dst_buff, dst_buff_size,
439                           match_fun, copy_data_fun, data, rate);
440 }
441
442 smx_synchro_t SIMIX_comm_irecv(smx_process_t dst_proc, smx_rdv_t rdv,
443                               void *dst_buff, size_t *dst_buff_size,
444                               int (*match_fun)(void *, void *, smx_synchro_t),
445                               void (*copy_data_fun)(smx_synchro_t, void*, size_t), // used to copy data if not default one
446                               void *data, double rate)
447 {
448   XBT_DEBUG("recv from %p %p", rdv, rdv->comm_fifo);
449   smx_synchro_t this_synchro = SIMIX_comm_new(SIMIX_COMM_RECEIVE);
450
451   smx_synchro_t other_synchro;
452   //communication already done, get it inside the fifo of completed comms
453   //permanent receive v1
454   //int already_received=0;
455   if(rdv->permanent_receiver && xbt_fifo_size(rdv->done_comm_fifo)!=0){
456
457     XBT_DEBUG("We have a comm that has probably already been received, trying to match it, to skip the communication");
458     //find a match in the already received fifo
459     other_synchro = SIMIX_fifo_get_comm(rdv->done_comm_fifo, SIMIX_COMM_SEND, match_fun, data, this_synchro);
460     //if not found, assume the receiver came first, register it to the mailbox in the classical way
461     if (!other_synchro)  {
462       XBT_DEBUG("We have messages in the permanent receive list, but not the one we are looking for, pushing request into fifo");
463       other_synchro = this_synchro;
464       SIMIX_rdv_push(rdv, this_synchro);
465     }else{
466       if(other_synchro->comm.surf_comm &&       SIMIX_comm_get_remains(other_synchro)==0.0)
467       {
468         XBT_DEBUG("comm %p has been already sent, and is finished, destroy it",&(other_synchro->comm));
469         other_synchro->state = SIMIX_DONE;
470         other_synchro->comm.type = SIMIX_COMM_DONE;
471         other_synchro->comm.rdv = NULL;
472       }/*else{
473          XBT_DEBUG("Not yet finished, we have to wait %d", xbt_fifo_size(rdv->comm_fifo));
474          }*/
475       other_synchro->comm.refcount--;
476       SIMIX_comm_destroy(this_synchro);
477       --smx_total_comms; // this creation was a pure waste
478     }
479   }else{
480     /* Prepare a synchro describing us, so that it gets passed to the user-provided filter of other side */
481
482     /* Look for communication synchro matching our needs. We also provide a description of
483      * ourself so that the other side also gets a chance of choosing if it wants to match with us.
484      *
485      * If it is not found then push our communication into the rendez-vous point */
486     other_synchro = SIMIX_fifo_get_comm(rdv->comm_fifo, SIMIX_COMM_SEND, match_fun, data, this_synchro);
487
488     if (!other_synchro) {
489       XBT_DEBUG("Receive pushed first %d", xbt_fifo_size(rdv->comm_fifo));
490       other_synchro = this_synchro;
491       SIMIX_rdv_push(rdv, this_synchro);
492     } else {
493       SIMIX_comm_destroy(this_synchro);
494       --smx_total_comms; // this creation was a pure waste
495       other_synchro->state = SIMIX_READY;
496       other_synchro->comm.type = SIMIX_COMM_READY;
497       //other_synchro->comm.refcount--;
498     }
499     xbt_fifo_push(dst_proc->comms, other_synchro);
500   }
501
502   /* Setup communication synchro */
503   other_synchro->comm.dst_proc = dst_proc;
504   other_synchro->comm.dst_buff = dst_buff;
505   other_synchro->comm.dst_buff_size = dst_buff_size;
506   other_synchro->comm.dst_data = data;
507
508   if (rate != -1.0 &&
509       (other_synchro->comm.rate == -1.0 || rate < other_synchro->comm.rate))
510     other_synchro->comm.rate = rate;
511
512   other_synchro->comm.match_fun = match_fun;
513   other_synchro->comm.copy_data_fun = copy_data_fun;
514
515
516   /*if(already_received)//do the actual copy, because the first one after the comm didn't have all the info
517     SIMIX_comm_copy_data(other_synchro);*/
518
519
520   if (MC_is_active() || MC_record_replay_is_active()) {
521     other_synchro->state = SIMIX_RUNNING;
522     return other_synchro;
523   }
524
525   SIMIX_comm_start(other_synchro);
526   // }
527   return other_synchro;
528 }
529
530 smx_synchro_t simcall_HANDLER_comm_iprobe(smx_simcall_t simcall, smx_rdv_t rdv,
531                                    int type, int src, int tag,
532                                    int (*match_fun)(void *, void *, smx_synchro_t),
533                                    void *data){
534   return SIMIX_comm_iprobe(simcall->issuer, rdv, type, src, tag, match_fun, data);
535 }
536
537 smx_synchro_t SIMIX_comm_iprobe(smx_process_t dst_proc, smx_rdv_t rdv, int type, int src,
538                               int tag, int (*match_fun)(void *, void *, smx_synchro_t), void *data)
539 {
540   XBT_DEBUG("iprobe from %p %p", rdv, rdv->comm_fifo);
541   smx_synchro_t this_synchro;
542   int smx_type;
543   if(type == 1){
544     this_synchro=SIMIX_comm_new(SIMIX_COMM_SEND);
545     smx_type = SIMIX_COMM_RECEIVE;
546   } else{
547     this_synchro=SIMIX_comm_new(SIMIX_COMM_RECEIVE);
548     smx_type = SIMIX_COMM_SEND;
549   } 
550   smx_synchro_t other_synchro=NULL;
551   if(rdv->permanent_receiver && xbt_fifo_size(rdv->done_comm_fifo)!=0){
552     //find a match in the already received fifo
553       XBT_DEBUG("first try in the perm recv mailbox");
554
555     other_synchro = SIMIX_fifo_probe_comm(rdv->done_comm_fifo, smx_type, match_fun, data, this_synchro);
556   }
557  // }else{
558     if(!other_synchro){
559         XBT_DEBUG("try in the normal mailbox");
560         other_synchro = SIMIX_fifo_probe_comm(rdv->comm_fifo, smx_type, match_fun, data, this_synchro);
561     }
562 //  }
563   if(other_synchro)other_synchro->comm.refcount--;
564
565   SIMIX_comm_destroy(this_synchro);
566   --smx_total_comms;
567   return other_synchro;
568 }
569
570 void simcall_HANDLER_comm_wait(smx_simcall_t simcall, smx_synchro_t synchro, double timeout)
571 {
572   /* the simcall may be a wait, a send or a recv */
573   surf_action_t sleep;
574
575   /* Associate this simcall to the wait synchro */
576   XBT_DEBUG("simcall_HANDLER_comm_wait, %p", synchro);
577
578   xbt_fifo_push(synchro->simcalls, simcall);
579   simcall->issuer->waiting_synchro = synchro;
580
581   if (MC_is_active() || MC_record_replay_is_active()) {
582     int idx = SIMCALL_GET_MC_VALUE(simcall);
583     if (idx == 0) {
584       synchro->state = SIMIX_DONE;
585     } else {
586       /* If we reached this point, the wait simcall must have a timeout */
587       /* Otherwise it shouldn't be enabled and executed by the MC */
588       if (timeout == -1)
589         THROW_IMPOSSIBLE;
590
591       if (synchro->comm.src_proc == simcall->issuer)
592         synchro->state = SIMIX_SRC_TIMEOUT;
593       else
594         synchro->state = SIMIX_DST_TIMEOUT;
595     }
596
597     SIMIX_comm_finish(synchro);
598     return;
599   }
600
601   /* If the synchro has already finish perform the error handling, */
602   /* otherwise set up a waiting timeout on the right side          */
603   if (synchro->state != SIMIX_WAITING && synchro->state != SIMIX_RUNNING) {
604     SIMIX_comm_finish(synchro);
605   } else { /* if (timeout >= 0) { we need a surf sleep action even when there is no timeout, otherwise surf won't tell us when the host fails */
606     sleep = surf_host_sleep(simcall->issuer->host, timeout);
607     surf_action_set_data(sleep, synchro);
608
609     if (simcall->issuer == synchro->comm.src_proc)
610       synchro->comm.src_timeout = sleep;
611     else
612       synchro->comm.dst_timeout = sleep;
613   }
614 }
615
616 void simcall_HANDLER_comm_test(smx_simcall_t simcall, smx_synchro_t synchro)
617 {
618   if(MC_is_active() || MC_record_replay_is_active()){
619     simcall_comm_test__set__result(simcall, synchro->comm.src_proc && synchro->comm.dst_proc);
620     if(simcall_comm_test__get__result(simcall)){
621       synchro->state = SIMIX_DONE;
622       xbt_fifo_push(synchro->simcalls, simcall);
623       SIMIX_comm_finish(synchro);
624     }else{
625       SIMIX_simcall_answer(simcall);
626     }
627     return;
628   }
629
630   simcall_comm_test__set__result(simcall, (synchro->state != SIMIX_WAITING && synchro->state != SIMIX_RUNNING));
631   if (simcall_comm_test__get__result(simcall)) {
632     xbt_fifo_push(synchro->simcalls, simcall);
633     SIMIX_comm_finish(synchro);
634   } else {
635     SIMIX_simcall_answer(simcall);
636   }
637 }
638
639 void simcall_HANDLER_comm_testany(smx_simcall_t simcall, xbt_dynar_t synchros)
640 {
641   unsigned int cursor;
642   smx_synchro_t synchro;
643   simcall_comm_testany__set__result(simcall, -1);
644
645   if (MC_is_active() || MC_record_replay_is_active()){
646     int idx = SIMCALL_GET_MC_VALUE(simcall);
647     if(idx == -1){
648       SIMIX_simcall_answer(simcall);
649     }else{
650       synchro = xbt_dynar_get_as(synchros, idx, smx_synchro_t);
651       simcall_comm_testany__set__result(simcall, idx);
652       xbt_fifo_push(synchro->simcalls, simcall);
653       synchro->state = SIMIX_DONE;
654       SIMIX_comm_finish(synchro);
655     }
656     return;
657   }
658
659   xbt_dynar_foreach(simcall_comm_testany__get__comms(simcall), cursor,synchro) {
660     if (synchro->state != SIMIX_WAITING && synchro->state != SIMIX_RUNNING) {
661       simcall_comm_testany__set__result(simcall, cursor);
662       xbt_fifo_push(synchro->simcalls, simcall);
663       SIMIX_comm_finish(synchro);
664       return;
665     }
666   }
667   SIMIX_simcall_answer(simcall);
668 }
669
670 void simcall_HANDLER_comm_waitany(smx_simcall_t simcall, xbt_dynar_t synchros)
671 {
672   smx_synchro_t synchro;
673   unsigned int cursor = 0;
674
675   if (MC_is_active() || MC_record_replay_is_active()){
676     int idx = SIMCALL_GET_MC_VALUE(simcall);
677     synchro = xbt_dynar_get_as(synchros, idx, smx_synchro_t);
678     xbt_fifo_push(synchro->simcalls, simcall);
679     simcall_comm_waitany__set__result(simcall, idx);
680     synchro->state = SIMIX_DONE;
681     SIMIX_comm_finish(synchro);
682     return;
683   }
684
685   xbt_dynar_foreach(synchros, cursor, synchro){
686     /* associate this simcall to the the synchro */
687     xbt_fifo_push(synchro->simcalls, simcall);
688
689     /* see if the synchro is already finished */
690     if (synchro->state != SIMIX_WAITING && synchro->state != SIMIX_RUNNING){
691       SIMIX_comm_finish(synchro);
692       break;
693     }
694   }
695 }
696
697 void SIMIX_waitany_remove_simcall_from_actions(smx_simcall_t simcall)
698 {
699   smx_synchro_t synchro;
700   unsigned int cursor = 0;
701   xbt_dynar_t synchros = simcall_comm_waitany__get__comms(simcall);
702
703   xbt_dynar_foreach(synchros, cursor, synchro) {
704     xbt_fifo_remove(synchro->simcalls, simcall);
705   }
706 }
707
708 /**
709  *  \brief Starts the simulation of a communication synchro.
710  *  \param synchro the communication synchro
711  */
712 static XBT_INLINE void SIMIX_comm_start(smx_synchro_t synchro)
713 {
714   /* If both the sender and the receiver are already there, start the communication */
715   if (synchro->state == SIMIX_READY) {
716
717     sg_host_t sender = synchro->comm.src_proc->host;
718     sg_host_t receiver = synchro->comm.dst_proc->host;
719
720     XBT_DEBUG("Starting communication %p from '%s' to '%s'", synchro,
721               SIMIX_host_get_name(sender), SIMIX_host_get_name(receiver));
722
723     synchro->comm.surf_comm = surf_network_model_communicate(surf_network_model,
724                                                                     sender, receiver,
725                                                                     synchro->comm.task_size, synchro->comm.rate);
726
727     surf_action_set_data(synchro->comm.surf_comm, synchro);
728
729     synchro->state = SIMIX_RUNNING;
730
731     /* If a link is failed, detect it immediately */
732     if (surf_action_get_state(synchro->comm.surf_comm) == SURF_ACTION_FAILED) {
733       XBT_DEBUG("Communication from '%s' to '%s' failed to start because of a link failure",
734                 SIMIX_host_get_name(sender), SIMIX_host_get_name(receiver));
735       synchro->state = SIMIX_LINK_FAILURE;
736       SIMIX_comm_destroy_internal_actions(synchro);
737     }
738
739     /* If any of the process is suspend, create the synchro but stop its execution,
740        it will be restarted when the sender process resume */
741     if (SIMIX_process_is_suspended(synchro->comm.src_proc) ||
742         SIMIX_process_is_suspended(synchro->comm.dst_proc)) {
743       /* FIXME: check what should happen with the synchro state */
744
745       if (SIMIX_process_is_suspended(synchro->comm.src_proc))
746         XBT_DEBUG("The communication is suspended on startup because src (%s:%s) were suspended since it initiated the communication",
747                   SIMIX_host_get_name(synchro->comm.src_proc->host), synchro->comm.src_proc->name);
748       else
749         XBT_DEBUG("The communication is suspended on startup because dst (%s:%s) were suspended since it initiated the communication",
750                   SIMIX_host_get_name(synchro->comm.dst_proc->host), synchro->comm.dst_proc->name);
751
752       surf_action_suspend(synchro->comm.surf_comm);
753
754     }
755   }
756 }
757
758 /**
759  * \brief Answers the SIMIX simcalls associated to a communication synchro.
760  * \param synchro a finished communication synchro
761  */
762 void SIMIX_comm_finish(smx_synchro_t synchro)
763 {
764   unsigned int destroy_count = 0;
765   smx_simcall_t simcall;
766
767   while ((simcall = xbt_fifo_shift(synchro->simcalls))) {
768
769     /* If a waitany simcall is waiting for this synchro to finish, then remove
770        it from the other synchros in the waitany list. Afterwards, get the
771        position of the actual synchro in the waitany dynar and
772        return it as the result of the simcall */
773
774     if (simcall->call == SIMCALL_NONE) //FIXME: maybe a better way to handle this case
775       continue; // if process handling comm is killed
776     if (simcall->call == SIMCALL_COMM_WAITANY) {
777       SIMIX_waitany_remove_simcall_from_actions(simcall);
778       if (!MC_is_active() && !MC_record_replay_is_active())
779         simcall_comm_waitany__set__result(simcall, xbt_dynar_search(simcall_comm_waitany__get__comms(simcall), &synchro));
780     }
781
782     /* If the synchro is still in a rendez-vous point then remove from it */
783     if (synchro->comm.rdv)
784       SIMIX_rdv_remove(synchro->comm.rdv, synchro);
785
786     XBT_DEBUG("SIMIX_comm_finish: synchro state = %d", (int)synchro->state);
787
788     /* Check out for errors */
789
790     if (surf_host_get_state(surf_host_resource_priv(
791           simcall->issuer->host)) != SURF_RESOURCE_ON) {
792       simcall->issuer->context->iwannadie = 1;
793       SMX_EXCEPTION(simcall->issuer, host_error, 0, "Host failed");
794     } else
795
796     switch (synchro->state) {
797
798     case SIMIX_DONE:
799       XBT_DEBUG("Communication %p complete!", synchro);
800       SIMIX_comm_copy_data(synchro);
801       break;
802
803     case SIMIX_SRC_TIMEOUT:
804       SMX_EXCEPTION(simcall->issuer, timeout_error, 0,
805                     "Communication timeouted because of sender");
806       break;
807
808     case SIMIX_DST_TIMEOUT:
809       SMX_EXCEPTION(simcall->issuer, timeout_error, 0,
810                     "Communication timeouted because of receiver");
811       break;
812
813     case SIMIX_SRC_HOST_FAILURE:
814       if (simcall->issuer == synchro->comm.src_proc)
815         simcall->issuer->context->iwannadie = 1;
816 //          SMX_EXCEPTION(simcall->issuer, host_error, 0, "Host failed");
817       else
818         SMX_EXCEPTION(simcall->issuer, network_error, 0, "Remote peer failed");
819       break;
820
821     case SIMIX_DST_HOST_FAILURE:
822       if (simcall->issuer == synchro->comm.dst_proc)
823         simcall->issuer->context->iwannadie = 1;
824 //          SMX_EXCEPTION(simcall->issuer, host_error, 0, "Host failed");
825       else
826         SMX_EXCEPTION(simcall->issuer, network_error, 0, "Remote peer failed");
827       break;
828
829     case SIMIX_LINK_FAILURE:
830
831       XBT_DEBUG("Link failure in synchro %p between '%s' and '%s': posting an exception to the issuer: %s (%p) detached:%d",
832                 synchro,
833                 synchro->comm.src_proc ? sg_host_name(synchro->comm.src_proc->host) : NULL,
834                 synchro->comm.dst_proc ? sg_host_name(synchro->comm.dst_proc->host) : NULL,
835                 simcall->issuer->name, simcall->issuer, synchro->comm.detached);
836       if (synchro->comm.src_proc == simcall->issuer) {
837         XBT_DEBUG("I'm source");
838       } else if (synchro->comm.dst_proc == simcall->issuer) {
839         XBT_DEBUG("I'm dest");
840       } else {
841         XBT_DEBUG("I'm neither source nor dest");
842       }
843       SMX_EXCEPTION(simcall->issuer, network_error, 0, "Link failure");
844       break;
845
846     case SIMIX_CANCELED:
847       if (simcall->issuer == synchro->comm.dst_proc)
848         SMX_EXCEPTION(simcall->issuer, cancel_error, 0,
849                       "Communication canceled by the sender");
850       else
851         SMX_EXCEPTION(simcall->issuer, cancel_error, 0,
852                       "Communication canceled by the receiver");
853       break;
854
855     default:
856       xbt_die("Unexpected synchro state in SIMIX_comm_finish: %d", (int)synchro->state);
857     }
858
859     /* if there is an exception during a waitany or a testany, indicate the position of the failed communication */
860     if (simcall->issuer->doexception) {
861       if (simcall->call == SIMCALL_COMM_WAITANY) {
862         simcall->issuer->running_ctx->exception.value = xbt_dynar_search(simcall_comm_waitany__get__comms(simcall), &synchro);
863       }
864       else if (simcall->call == SIMCALL_COMM_TESTANY) {
865         simcall->issuer->running_ctx->exception.value = xbt_dynar_search(simcall_comm_testany__get__comms(simcall), &synchro);
866       }
867     }
868
869     if (surf_host_get_state(surf_host_resource_priv(simcall->issuer->host)) != SURF_RESOURCE_ON) {
870       simcall->issuer->context->iwannadie = 1;
871     }
872
873     simcall->issuer->waiting_synchro = NULL;
874     xbt_fifo_remove(simcall->issuer->comms, synchro);
875     if(synchro->comm.detached){
876       if(simcall->issuer == synchro->comm.src_proc){
877         if(synchro->comm.dst_proc)
878           xbt_fifo_remove(synchro->comm.dst_proc->comms, synchro);
879       }
880       if(simcall->issuer == synchro->comm.dst_proc){
881         if(synchro->comm.src_proc)
882           xbt_fifo_remove(synchro->comm.src_proc->comms, synchro);
883       }
884     }
885     SIMIX_simcall_answer(simcall);
886     destroy_count++;
887   }
888
889   while (destroy_count-- > 0)
890     SIMIX_comm_destroy(synchro);
891 }
892
893 /**
894  * \brief This function is called when a Surf communication synchro is finished.
895  * \param synchro the corresponding Simix communication
896  */
897 void SIMIX_post_comm(smx_synchro_t synchro)
898 {
899   /* Update synchro state */
900   if (synchro->comm.src_timeout &&
901       surf_action_get_state(synchro->comm.src_timeout) == SURF_ACTION_DONE)
902     synchro->state = SIMIX_SRC_TIMEOUT;
903   else if (synchro->comm.dst_timeout &&
904           surf_action_get_state(synchro->comm.dst_timeout) == SURF_ACTION_DONE)
905     synchro->state = SIMIX_DST_TIMEOUT;
906   else if (synchro->comm.src_timeout &&
907           surf_action_get_state(synchro->comm.src_timeout) == SURF_ACTION_FAILED)
908     synchro->state = SIMIX_SRC_HOST_FAILURE;
909   else if (synchro->comm.dst_timeout &&
910       surf_action_get_state(synchro->comm.dst_timeout) == SURF_ACTION_FAILED)
911     synchro->state = SIMIX_DST_HOST_FAILURE;
912   else if (synchro->comm.surf_comm &&
913           surf_action_get_state(synchro->comm.surf_comm) == SURF_ACTION_FAILED) {
914     XBT_DEBUG("Puta madre. Surf says that the link broke");
915     synchro->state = SIMIX_LINK_FAILURE;
916   } else
917     synchro->state = SIMIX_DONE;
918
919   XBT_DEBUG("SIMIX_post_comm: comm %p, state %d, src_proc %p, dst_proc %p, detached: %d",
920             synchro, (int)synchro->state, synchro->comm.src_proc, synchro->comm.dst_proc, synchro->comm.detached);
921
922   /* destroy the surf actions associated with the Simix communication */
923   SIMIX_comm_destroy_internal_actions(synchro);
924
925   /* if there are simcalls associated with the synchro, then answer them */
926   if (xbt_fifo_size(synchro->simcalls)) {
927     SIMIX_comm_finish(synchro);
928   }
929 }
930
931 void SIMIX_comm_cancel(smx_synchro_t synchro)
932 {
933   /* if the synchro is a waiting state means that it is still in a rdv */
934   /* so remove from it and delete it */
935   if (synchro->state == SIMIX_WAITING) {
936     SIMIX_rdv_remove(synchro->comm.rdv, synchro);
937     synchro->state = SIMIX_CANCELED;
938   }
939   else if (!MC_is_active() /* when running the MC there are no surf actions */
940            && !MC_record_replay_is_active()
941            && (synchro->state == SIMIX_READY || synchro->state == SIMIX_RUNNING)) {
942
943     surf_action_cancel(synchro->comm.surf_comm);
944   }
945 }
946
947 void SIMIX_comm_suspend(smx_synchro_t synchro)
948 {
949   /*FIXME: shall we suspend also the timeout synchro? */
950   if (synchro->comm.surf_comm)
951     surf_action_suspend(synchro->comm.surf_comm);
952   /* in the other case, the action will be suspended on creation, in SIMIX_comm_start() */
953 }
954
955 void SIMIX_comm_resume(smx_synchro_t synchro)
956 {
957   /*FIXME: check what happen with the timeouts */
958   if (synchro->comm.surf_comm)
959     surf_action_resume(synchro->comm.surf_comm);
960   /* in the other case, the synchro were not really suspended yet, see SIMIX_comm_suspend() and SIMIX_comm_start() */
961 }
962
963
964 /************* synchro Getters **************/
965
966 /**
967  *  \brief get the amount remaining from the communication
968  *  \param synchro The communication
969  */
970 double SIMIX_comm_get_remains(smx_synchro_t synchro)
971 {
972   double remains;
973
974   if(!synchro){
975     return 0;
976   }
977
978   switch (synchro->state) {
979
980   case SIMIX_RUNNING:
981     remains = surf_action_get_remains(synchro->comm.surf_comm);
982     break;
983
984   case SIMIX_WAITING:
985   case SIMIX_READY:
986     remains = 0; /*FIXME: check what should be returned */
987     break;
988
989   default:
990     remains = 0; /*FIXME: is this correct? */
991     break;
992   }
993   return remains;
994 }
995
996 e_smx_state_t SIMIX_comm_get_state(smx_synchro_t synchro)
997 {
998   return synchro->state;
999 }
1000
1001 /**
1002  *  \brief Return the user data associated to the sender of the communication
1003  *  \param synchro The communication
1004  *  \return the user data
1005  */
1006 void* SIMIX_comm_get_src_data(smx_synchro_t synchro)
1007 {
1008   return synchro->comm.src_data;
1009 }
1010
1011 /**
1012  *  \brief Return the user data associated to the receiver of the communication
1013  *  \param synchro The communication
1014  *  \return the user data
1015  */
1016 void* SIMIX_comm_get_dst_data(smx_synchro_t synchro)
1017 {
1018   return synchro->comm.dst_data;
1019 }
1020
1021 smx_process_t SIMIX_comm_get_src_proc(smx_synchro_t synchro)
1022 {
1023   return synchro->comm.src_proc;
1024 }
1025
1026 smx_process_t SIMIX_comm_get_dst_proc(smx_synchro_t synchro)
1027 {
1028   return synchro->comm.dst_proc;
1029 }
1030
1031 #ifdef HAVE_LATENCY_BOUND_TRACKING
1032 /**
1033  *  \brief verify if communication is latency bounded
1034  *  \param comm The communication
1035  */
1036 int SIMIX_comm_is_latency_bounded(smx_synchro_t synchro)
1037 {
1038   if(!synchro){
1039     return 0;
1040   }
1041   if (synchro->comm.surf_comm){
1042     XBT_DEBUG("Getting latency limited for surf_action (%p)", synchro->comm.surf_comm);
1043     synchro->latency_limited = surf_network_action_get_latency_limited(synchro->comm.surf_comm);
1044     XBT_DEBUG("synchro limited is %d", synchro->latency_limited);
1045   }
1046   return synchro->latency_limited;
1047 }
1048 #endif
1049
1050 /******************************************************************************/
1051 /*                    SIMIX_comm_copy_data callbacks                       */
1052 /******************************************************************************/
1053 static void (*SIMIX_comm_copy_data_callback) (smx_synchro_t, void*, size_t) =
1054   &SIMIX_comm_copy_pointer_callback;
1055
1056 void
1057 SIMIX_comm_set_copy_data_callback(void (*callback) (smx_synchro_t, void*, size_t))
1058 {
1059   SIMIX_comm_copy_data_callback = callback;
1060 }
1061
1062 void SIMIX_comm_copy_pointer_callback(smx_synchro_t comm, void* buff, size_t buff_size)
1063 {
1064   xbt_assert((buff_size == sizeof(void *)),
1065              "Cannot copy %zu bytes: must be sizeof(void*)", buff_size);
1066   *(void **) (comm->comm.dst_buff) = buff;
1067 }
1068
1069 void SIMIX_comm_copy_buffer_callback(smx_synchro_t comm, void* buff, size_t buff_size)
1070 {
1071   XBT_DEBUG("Copy the data over");
1072   memcpy(comm->comm.dst_buff, buff, buff_size);
1073   if (comm->comm.detached) { // if this is a detached send, the source buffer was duplicated by SMPI sender to make the original buffer available to the application ASAP
1074     xbt_free(buff);
1075     comm->comm.src_buff = NULL;
1076   }
1077 }
1078
1079
1080 /**
1081  *  \brief Copy the communication data from the sender's buffer to the receiver's one
1082  *  \param comm The communication
1083  */
1084 void SIMIX_comm_copy_data(smx_synchro_t comm)
1085 {
1086   size_t buff_size = comm->comm.src_buff_size;
1087   /* If there is no data to be copy then return */
1088   if (!comm->comm.src_buff || !comm->comm.dst_buff || comm->comm.copied)
1089     return;
1090
1091   XBT_DEBUG("Copying comm %p data from %s (%p) -> %s (%p) (%zu bytes)",
1092             comm,
1093             comm->comm.src_proc ? sg_host_name(comm->comm.src_proc->host) : "a finished process",
1094             comm->comm.src_buff,
1095             comm->comm.dst_proc ? sg_host_name(comm->comm.dst_proc->host) : "a finished process",
1096             comm->comm.dst_buff, buff_size);
1097
1098   /* Copy at most dst_buff_size bytes of the message to receiver's buffer */
1099   if (comm->comm.dst_buff_size)
1100     buff_size = MIN(buff_size, *(comm->comm.dst_buff_size));
1101
1102   /* Update the receiver's buffer size to the copied amount */
1103   if (comm->comm.dst_buff_size)
1104     *comm->comm.dst_buff_size = buff_size;
1105
1106   if (buff_size > 0){
1107       if(comm->comm.copy_data_fun)
1108         comm->comm.copy_data_fun (comm, comm->comm.src_buff, buff_size);
1109       else
1110         SIMIX_comm_copy_data_callback (comm, comm->comm.src_buff, buff_size);
1111   }
1112
1113
1114   /* Set the copied flag so we copy data only once */
1115   /* (this function might be called from both communication ends) */
1116   comm->comm.copied = 1;
1117 }