Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
some useless cleanups
[simgrid.git] / src / simix / smx_network.cpp
1 /* Copyright (c) 2009-2015. The SimGrid Team.
2  * All rights reserved.                                                     */
3
4 /* This program is free software; you can redistribute it and/or modify it
5  * under the terms of the license (GNU LGPL) which comes with this package. */
6
7 #include "src/surf/surf_interface.hpp"
8 #include "src/simix/smx_private.h"
9 #include "xbt/log.h"
10 #include "mc/mc.h"
11 #include "src/mc/mc_replay.h"
12 #include "xbt/dict.h"
13
14 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(simix_network, simix,
15                                 "SIMIX network-related synchronization");
16
17 static xbt_dict_t rdv_points = NULL;
18 XBT_EXPORT_NO_IMPORT(unsigned long int) smx_total_comms = 0;
19
20 static void SIMIX_waitany_remove_simcall_from_actions(smx_simcall_t simcall);
21 static void SIMIX_comm_copy_data(smx_synchro_t comm);
22 static smx_synchro_t SIMIX_comm_new(e_smx_comm_type_t type);
23 static inline void SIMIX_rdv_push(smx_rdv_t rdv, smx_synchro_t comm);
24 static smx_synchro_t SIMIX_fifo_probe_comm(xbt_fifo_t fifo, e_smx_comm_type_t type,
25                                         int (*match_fun)(void *, void *,smx_synchro_t),
26                                         void *user_data, smx_synchro_t my_synchro);
27 static smx_synchro_t SIMIX_fifo_get_comm(xbt_fifo_t fifo, e_smx_comm_type_t type,
28                                         int (*match_fun)(void *, void *,smx_synchro_t),
29                                         void *user_data, smx_synchro_t my_synchro);
30 static void SIMIX_rdv_free(void *data);
31 static void SIMIX_comm_start(smx_synchro_t synchro);
32
33 void SIMIX_network_init(void)
34 {
35   rdv_points = xbt_dict_new_homogeneous(SIMIX_rdv_free);
36 }
37
38 void SIMIX_network_exit(void)
39 {
40   xbt_dict_free(&rdv_points);
41 }
42
43 /******************************************************************************/
44 /*                           Rendez-Vous Points                               */
45 /******************************************************************************/
46
47 smx_rdv_t SIMIX_rdv_create(const char *name)
48 {
49   /* two processes may have pushed the same rdv_create simcall at the same time */
50   smx_rdv_t rdv = name ? (smx_rdv_t) xbt_dict_get_or_null(rdv_points, name) : NULL;
51
52   if (!rdv) {
53     rdv = xbt_new0(s_smx_rvpoint_t, 1);
54     rdv->name = name ? xbt_strdup(name) : NULL;
55     rdv->comm_fifo = xbt_fifo_new();
56     rdv->done_comm_fifo = xbt_fifo_new();
57     rdv->permanent_receiver=NULL;
58
59     XBT_DEBUG("Creating a mailbox at %p with name %s", rdv, name);
60
61     if (rdv->name)
62       xbt_dict_set(rdv_points, rdv->name, rdv, NULL);
63   }
64   return rdv;
65 }
66
67 void SIMIX_rdv_destroy(smx_rdv_t rdv)
68 {
69   if (rdv->name)
70     xbt_dict_remove(rdv_points, rdv->name);
71 }
72
73 void SIMIX_rdv_free(void *data)
74 {
75   XBT_DEBUG("rdv free %p", data);
76   smx_rdv_t rdv = (smx_rdv_t) data;
77   xbt_free(rdv->name);
78   xbt_fifo_free(rdv->comm_fifo);
79   xbt_fifo_free(rdv->done_comm_fifo);
80
81   xbt_free(rdv);
82 }
83
84 xbt_dict_t SIMIX_get_rdv_points()
85 {
86   return rdv_points;
87 }
88
89 smx_rdv_t SIMIX_rdv_get_by_name(const char *name)
90 {
91   return (smx_rdv_t) xbt_dict_get_or_null(rdv_points, name);
92 }
93
94 int SIMIX_rdv_comm_count_by_host(smx_rdv_t rdv, sg_host_t host)
95 {
96   smx_synchro_t comm = NULL;
97   xbt_fifo_item_t item = NULL;
98   int count = 0;
99
100   xbt_fifo_foreach(rdv->comm_fifo, item, comm, smx_synchro_t) {
101     if (comm->comm.src_proc->host == host)
102       count++;
103   }
104
105   return count;
106 }
107
108 smx_synchro_t SIMIX_rdv_get_head(smx_rdv_t rdv)
109 {
110   return (smx_synchro_t) xbt_fifo_get_item_content(
111     xbt_fifo_get_first_item(rdv->comm_fifo));
112 }
113
114 /**
115  *  \brief get the receiver (process associated to the mailbox)
116  *  \param rdv The rendez-vous point
117  *  \return process The receiving process (NULL if not set)
118  */
119 smx_process_t SIMIX_rdv_get_receiver(smx_rdv_t rdv)
120 {
121   return rdv->permanent_receiver;
122 }
123
124 /**
125  *  \brief set the receiver of the rendez vous point to allow eager sends
126  *  \param rdv The rendez-vous point
127  *  \param process The receiving process
128  */
129 void SIMIX_rdv_set_receiver(smx_rdv_t rdv, smx_process_t process)
130 {
131   rdv->permanent_receiver=process;
132 }
133
134 /**
135  *  \brief Pushes a communication synchro into a rendez-vous point
136  *  \param rdv The rendez-vous point
137  *  \param comm The communication synchro
138  */
139 static inline void SIMIX_rdv_push(smx_rdv_t rdv, smx_synchro_t comm)
140 {
141   xbt_fifo_push(rdv->comm_fifo, comm);
142   comm->comm.rdv = rdv;
143 }
144
145 /**
146  *  \brief Removes a communication synchro from a rendez-vous point
147  *  \param rdv The rendez-vous point
148  *  \param comm The communication synchro
149  */
150 void SIMIX_rdv_remove(smx_rdv_t rdv, smx_synchro_t comm)
151 {
152   xbt_fifo_remove(rdv->comm_fifo, comm);
153   comm->comm.rdv = NULL;
154 }
155
156 /**
157  *  \brief Checks if there is a communication synchro queued in a fifo matching our needs
158  *  \param type The type of communication we are looking for (comm_send, comm_recv)
159  *  \return The communication synchro if found, NULL otherwise
160  */
161 smx_synchro_t SIMIX_fifo_get_comm(xbt_fifo_t fifo, e_smx_comm_type_t type,
162                                  int (*match_fun)(void *, void *,smx_synchro_t),
163                                  void *this_user_data, smx_synchro_t my_synchro)
164 {
165   smx_synchro_t synchro;
166   xbt_fifo_item_t item;
167   void* other_user_data = NULL;
168
169   xbt_fifo_foreach(fifo, item, synchro, smx_synchro_t) {
170     if (synchro->comm.type == SIMIX_COMM_SEND) {
171       other_user_data = synchro->comm.src_data;
172     } else if (synchro->comm.type == SIMIX_COMM_RECEIVE) {
173       other_user_data = synchro->comm.dst_data;
174     }
175     if (synchro->comm.type == type &&
176         (!match_fun              ||              match_fun(this_user_data,  other_user_data, synchro)) &&
177         (!synchro->comm.match_fun || synchro->comm.match_fun(other_user_data, this_user_data,  my_synchro))) {
178       XBT_DEBUG("Found a matching communication synchro %p", synchro);
179       xbt_fifo_remove_item(fifo, item);
180       xbt_fifo_free_item(item);
181       synchro->comm.refcount++;
182 #ifdef HAVE_MC
183       synchro->comm.rdv_cpy = synchro->comm.rdv;
184 #endif
185       synchro->comm.rdv = NULL;
186       return synchro;
187     }
188     XBT_DEBUG("Sorry, communication synchro %p does not match our needs:"
189               " its type is %d but we are looking for a comm of type %d (or maybe the filtering didn't match)",
190               synchro, (int)synchro->comm.type, (int)type);
191   }
192   XBT_DEBUG("No matching communication synchro found");
193   return NULL;
194 }
195
196
197 /**
198  *  \brief Checks if there is a communication synchro queued in a fifo matching our needs, but leave it there
199  *  \param type The type of communication we are looking for (comm_send, comm_recv)
200  *  \return The communication synchro if found, NULL otherwise
201  */
202 smx_synchro_t SIMIX_fifo_probe_comm(xbt_fifo_t fifo, e_smx_comm_type_t type,
203                                  int (*match_fun)(void *, void *,smx_synchro_t),
204                                  void *this_user_data, smx_synchro_t my_synchro)
205 {
206   smx_synchro_t synchro;
207   xbt_fifo_item_t item;
208   void* other_user_data = NULL;
209
210   xbt_fifo_foreach(fifo, item, synchro, smx_synchro_t) {
211     if (synchro->comm.type == SIMIX_COMM_SEND) {
212       other_user_data = synchro->comm.src_data;
213     } else if (synchro->comm.type == SIMIX_COMM_RECEIVE) {
214       other_user_data = synchro->comm.dst_data;
215     }
216     if (synchro->comm.type == type &&
217         (!match_fun              ||              match_fun(this_user_data,  other_user_data, synchro)) &&
218         (!synchro->comm.match_fun || synchro->comm.match_fun(other_user_data, this_user_data,  my_synchro))) {
219       XBT_DEBUG("Found a matching communication synchro %p", synchro);
220       synchro->comm.refcount++;
221
222       return synchro;
223     }
224     XBT_DEBUG("Sorry, communication synchro %p does not match our needs:"
225               " its type is %d but we are looking for a comm of type %d (or maybe the filtering didn't match)",
226               synchro, (int)synchro->comm.type, (int)type);
227   }
228   XBT_DEBUG("No matching communication synchro found");
229   return NULL;
230 }
231 /******************************************************************************/
232 /*                          Communication synchros                            */
233 /******************************************************************************/
234
235 /**
236  *  \brief Creates a new communicate synchro
237  *  \param type The direction of communication (comm_send, comm_recv)
238  *  \return The new communicate synchro
239  */
240 smx_synchro_t SIMIX_comm_new(e_smx_comm_type_t type)
241 {
242   smx_synchro_t synchro;
243
244   /* alloc structures */
245   synchro = (smx_synchro_t) xbt_mallocator_get(simix_global->synchro_mallocator);
246
247   synchro->type = SIMIX_SYNC_COMMUNICATE;
248   synchro->state = SIMIX_WAITING;
249
250   /* set communication */
251   synchro->comm.type = type;
252   synchro->comm.refcount = 1;
253   synchro->comm.src_data=NULL;
254   synchro->comm.dst_data=NULL;
255
256
257 #ifdef HAVE_LATENCY_BOUND_TRACKING
258   //initialize with unknown value
259   synchro->latency_limited = -1;
260 #endif
261
262   synchro->category = NULL;
263
264   XBT_DEBUG("Create communicate synchro %p", synchro);
265   ++smx_total_comms;
266
267   return synchro;
268 }
269
270 /**
271  *  \brief Destroy a communicate synchro
272  *  \param synchro The communicate synchro to be destroyed
273  */
274 void SIMIX_comm_destroy(smx_synchro_t synchro)
275 {
276   XBT_DEBUG("Destroy synchro %p (refcount: %d), state: %d",
277             synchro, synchro->comm.refcount, (int)synchro->state);
278
279   if (synchro->comm.refcount <= 0) {
280     xbt_backtrace_display_current();
281     xbt_die("The refcount of comm %p is already 0 before decreasing it. "
282             "That's a bug! If you didn't test and/or wait the same communication twice in your code, then the bug is SimGrid's...", synchro);
283   }
284   synchro->comm.refcount--;
285   if (synchro->comm.refcount > 0)
286       return;
287   XBT_DEBUG("Really free communication %p; refcount is now %d", synchro,
288       synchro->comm.refcount);
289
290 #ifdef HAVE_LATENCY_BOUND_TRACKING
291   synchro->latency_limited = SIMIX_comm_is_latency_bounded( synchro ) ;
292 #endif
293
294   xbt_free(synchro->name);
295   SIMIX_comm_destroy_internal_actions(synchro);
296
297   if (synchro->comm.detached && synchro->state != SIMIX_DONE) {
298     /* the communication has failed and was detached:
299      * we have to free the buffer */
300     if (synchro->comm.clean_fun) {
301       synchro->comm.clean_fun(synchro->comm.src_buff);
302     }
303     synchro->comm.src_buff = NULL;
304   }
305
306   if(synchro->comm.rdv)
307     SIMIX_rdv_remove(synchro->comm.rdv, synchro);
308
309   xbt_mallocator_release(simix_global->synchro_mallocator, synchro);
310 }
311
312 void SIMIX_comm_destroy_internal_actions(smx_synchro_t synchro)
313 {
314   if (synchro->comm.surf_comm){
315 #ifdef HAVE_LATENCY_BOUND_TRACKING
316     synchro->latency_limited = SIMIX_comm_is_latency_bounded(synchro);
317 #endif
318     synchro->comm.surf_comm->unref();
319     synchro->comm.surf_comm = NULL;
320   }
321
322   if (synchro->comm.src_timeout){
323     synchro->comm.src_timeout->unref();
324     synchro->comm.src_timeout = NULL;
325   }
326
327   if (synchro->comm.dst_timeout){
328     synchro->comm.dst_timeout->unref();
329     synchro->comm.dst_timeout = NULL;
330   }
331 }
332
333 void simcall_HANDLER_comm_send(smx_simcall_t simcall, smx_process_t src, smx_rdv_t rdv,
334                                   double task_size, double rate,
335                                   void *src_buff, size_t src_buff_size,
336                                   int (*match_fun)(void *, void *,smx_synchro_t),
337                                   void (*copy_data_fun)(smx_synchro_t, void*, size_t),
338           void *data, double timeout){
339   smx_synchro_t comm = simcall_HANDLER_comm_isend(simcall, src, rdv, task_size, rate,
340                            src_buff, src_buff_size, match_fun, NULL, copy_data_fun,
341                data, 0);
342   SIMCALL_SET_MC_VALUE(simcall, 0);
343   simcall_HANDLER_comm_wait(simcall, comm, timeout);
344 }
345 smx_synchro_t simcall_HANDLER_comm_isend(smx_simcall_t simcall, smx_process_t src_proc, smx_rdv_t rdv,
346                                   double task_size, double rate,
347                                   void *src_buff, size_t src_buff_size,
348                                   int (*match_fun)(void *, void *,smx_synchro_t),
349                                   void (*clean_fun)(void *), // used to free the synchro in case of problem after a detached send
350                                   void (*copy_data_fun)(smx_synchro_t, void*, size_t),// used to copy data if not default one
351                           void *data, int detached)
352 {
353   XBT_DEBUG("send from %p", rdv);
354
355   /* Prepare a synchro describing us, so that it gets passed to the user-provided filter of other side */
356   smx_synchro_t this_synchro = SIMIX_comm_new(SIMIX_COMM_SEND);
357
358   /* Look for communication synchro matching our needs. We also provide a description of
359    * ourself so that the other side also gets a chance of choosing if it wants to match with us.
360    *
361    * If it is not found then push our communication into the rendez-vous point */
362   smx_synchro_t other_synchro = SIMIX_fifo_get_comm(rdv->comm_fifo, SIMIX_COMM_RECEIVE, match_fun, data, this_synchro);
363
364   if (!other_synchro) {
365     other_synchro = this_synchro;
366
367     if (rdv->permanent_receiver!=NULL){
368       //this mailbox is for small messages, which have to be sent right now
369       other_synchro->state = SIMIX_READY;
370       other_synchro->comm.dst_proc=rdv->permanent_receiver;
371       other_synchro->comm.refcount++;
372       xbt_fifo_push(rdv->done_comm_fifo,other_synchro);
373       other_synchro->comm.rdv=rdv;
374       XBT_DEBUG("pushing a message into the permanent receive fifo %p, comm %p", rdv, &(other_synchro->comm));
375
376     }else{
377       SIMIX_rdv_push(rdv, this_synchro);
378     }
379   } else {
380     XBT_DEBUG("Receive already pushed");
381
382     SIMIX_comm_destroy(this_synchro);
383     --smx_total_comms; // this creation was a pure waste
384
385     other_synchro->state = SIMIX_READY;
386     other_synchro->comm.type = SIMIX_COMM_READY;
387
388   }
389   xbt_fifo_push(src_proc->comms, other_synchro);
390
391   /* if the communication synchro is detached then decrease the refcount
392    * by one, so it will be eliminated by the receiver's destroy call */
393   if (detached) {
394     other_synchro->comm.detached = 1;
395     other_synchro->comm.refcount--;
396     other_synchro->comm.clean_fun = clean_fun;
397   } else {
398     other_synchro->comm.clean_fun = NULL;
399   }
400
401   /* Setup the communication synchro */
402   other_synchro->comm.src_proc = src_proc;
403   other_synchro->comm.task_size = task_size;
404   other_synchro->comm.rate = rate;
405   other_synchro->comm.src_buff = src_buff;
406   other_synchro->comm.src_buff_size = src_buff_size;
407   other_synchro->comm.src_data = data;
408
409   other_synchro->comm.match_fun = match_fun;
410   other_synchro->comm.copy_data_fun = copy_data_fun;
411
412
413   if (MC_is_active() || MC_record_replay_is_active()) {
414     other_synchro->state = SIMIX_RUNNING;
415     return (detached ? NULL : other_synchro);
416   }
417
418   SIMIX_comm_start(other_synchro);
419   return (detached ? NULL : other_synchro);
420 }
421
422 void simcall_HANDLER_comm_recv(smx_simcall_t simcall, smx_process_t receiver, smx_rdv_t rdv,
423                          void *dst_buff, size_t *dst_buff_size,
424                          int (*match_fun)(void *, void *, smx_synchro_t),
425                          void (*copy_data_fun)(smx_synchro_t, void*, size_t),
426                          void *data, double timeout, double rate)
427 {
428   smx_synchro_t comm = SIMIX_comm_irecv(receiver, rdv, dst_buff,
429                            dst_buff_size, match_fun, copy_data_fun, data, rate);
430   SIMCALL_SET_MC_VALUE(simcall, 0);
431   simcall_HANDLER_comm_wait(simcall, comm, timeout);
432 }
433
434 smx_synchro_t simcall_HANDLER_comm_irecv(smx_simcall_t simcall, smx_process_t receiver, smx_rdv_t rdv,
435     void *dst_buff, size_t *dst_buff_size,
436     int (*match_fun)(void *, void *, smx_synchro_t),
437     void (*copy_data_fun)(smx_synchro_t, void*, size_t),
438     void *data, double rate)
439 {
440   return SIMIX_comm_irecv(receiver, rdv, dst_buff, dst_buff_size, match_fun, copy_data_fun, data, rate);
441 }
442
443 smx_synchro_t SIMIX_comm_irecv(smx_process_t dst_proc, smx_rdv_t rdv, void *dst_buff, size_t *dst_buff_size,
444     int (*match_fun)(void *, void *, smx_synchro_t),
445     void (*copy_data_fun)(smx_synchro_t, void*, size_t), // used to copy data if not default one
446     void *data, double rate)
447 {
448   XBT_DEBUG("recv from %p %p", rdv, rdv->comm_fifo);
449   smx_synchro_t this_synchro = SIMIX_comm_new(SIMIX_COMM_RECEIVE);
450
451   smx_synchro_t other_synchro;
452   //communication already done, get it inside the fifo of completed comms
453   if (rdv->permanent_receiver && xbt_fifo_size(rdv->done_comm_fifo)!=0) {
454
455     XBT_DEBUG("We have a comm that has probably already been received, trying to match it, to skip the communication");
456     //find a match in the already received fifo
457     other_synchro = SIMIX_fifo_get_comm(rdv->done_comm_fifo, SIMIX_COMM_SEND, match_fun, data, this_synchro);
458     //if not found, assume the receiver came first, register it to the mailbox in the classical way
459     if (!other_synchro)  {
460       XBT_DEBUG("We have messages in the permanent receive list, but not the one we are looking for, pushing request into fifo");
461       other_synchro = this_synchro;
462       SIMIX_rdv_push(rdv, this_synchro);
463     } else {
464       if(other_synchro->comm.surf_comm && SIMIX_comm_get_remains(other_synchro)==0.0) {
465         XBT_DEBUG("comm %p has been already sent, and is finished, destroy it",&(other_synchro->comm));
466         other_synchro->state = SIMIX_DONE;
467         other_synchro->comm.type = SIMIX_COMM_DONE;
468         other_synchro->comm.rdv = NULL;
469       }
470       other_synchro->comm.refcount--;
471       SIMIX_comm_destroy(this_synchro);
472       --smx_total_comms; // this creation was a pure waste
473     }
474   } else {
475     /* Prepare a synchro describing us, so that it gets passed to the user-provided filter of other side */
476
477     /* Look for communication synchro matching our needs. We also provide a description of
478      * ourself so that the other side also gets a chance of choosing if it wants to match with us.
479      *
480      * If it is not found then push our communication into the rendez-vous point */
481     other_synchro = SIMIX_fifo_get_comm(rdv->comm_fifo, SIMIX_COMM_SEND, match_fun, data, this_synchro);
482
483     if (!other_synchro) {
484       XBT_DEBUG("Receive pushed first %d", xbt_fifo_size(rdv->comm_fifo));
485       other_synchro = this_synchro;
486       SIMIX_rdv_push(rdv, this_synchro);
487     } else {
488       SIMIX_comm_destroy(this_synchro);
489       --smx_total_comms; // this creation was a pure waste
490       other_synchro->state = SIMIX_READY;
491       other_synchro->comm.type = SIMIX_COMM_READY;
492       //other_synchro->comm.refcount--;
493     }
494     xbt_fifo_push(dst_proc->comms, other_synchro);
495   }
496
497   /* Setup communication synchro */
498   other_synchro->comm.dst_proc = dst_proc;
499   other_synchro->comm.dst_buff = dst_buff;
500   other_synchro->comm.dst_buff_size = dst_buff_size;
501   other_synchro->comm.dst_data = data;
502
503   if (rate != -1.0 && (other_synchro->comm.rate == -1.0 || rate < other_synchro->comm.rate))
504     other_synchro->comm.rate = rate;
505
506   other_synchro->comm.match_fun = match_fun;
507   other_synchro->comm.copy_data_fun = copy_data_fun;
508
509   if (MC_is_active() || MC_record_replay_is_active()) {
510     other_synchro->state = SIMIX_RUNNING;
511     return other_synchro;
512   }
513
514   SIMIX_comm_start(other_synchro);
515   return other_synchro;
516 }
517
518 smx_synchro_t simcall_HANDLER_comm_iprobe(smx_simcall_t simcall, smx_rdv_t rdv,
519                                    int type, int src, int tag,
520                                    int (*match_fun)(void *, void *, smx_synchro_t),
521                                    void *data){
522   return SIMIX_comm_iprobe(simcall->issuer, rdv, type, src, tag, match_fun, data);
523 }
524
525 smx_synchro_t SIMIX_comm_iprobe(smx_process_t dst_proc, smx_rdv_t rdv, int type, int src,
526                               int tag, int (*match_fun)(void *, void *, smx_synchro_t), void *data)
527 {
528   XBT_DEBUG("iprobe from %p %p", rdv, rdv->comm_fifo);
529   smx_synchro_t this_synchro;
530   int smx_type;
531   if(type == 1){
532     this_synchro=SIMIX_comm_new(SIMIX_COMM_SEND);
533     smx_type = SIMIX_COMM_RECEIVE;
534   } else{
535     this_synchro=SIMIX_comm_new(SIMIX_COMM_RECEIVE);
536     smx_type = SIMIX_COMM_SEND;
537   } 
538   smx_synchro_t other_synchro=NULL;
539   if(rdv->permanent_receiver && xbt_fifo_size(rdv->done_comm_fifo)!=0){
540     //find a match in the already received fifo
541       XBT_DEBUG("first try in the perm recv mailbox");
542
543     other_synchro = SIMIX_fifo_probe_comm(
544       rdv->done_comm_fifo, (e_smx_comm_type_t) smx_type,
545       match_fun, data, this_synchro);
546   }
547  // }else{
548     if(!other_synchro){
549         XBT_DEBUG("try in the normal mailbox");
550         other_synchro = SIMIX_fifo_probe_comm(
551           rdv->comm_fifo, (e_smx_comm_type_t) smx_type,
552           match_fun, data, this_synchro);
553     }
554 //  }
555   if(other_synchro)other_synchro->comm.refcount--;
556
557   SIMIX_comm_destroy(this_synchro);
558   --smx_total_comms;
559   return other_synchro;
560 }
561
562 void simcall_HANDLER_comm_wait(smx_simcall_t simcall, smx_synchro_t synchro, double timeout)
563 {
564   /* the simcall may be a wait, a send or a recv */
565   surf_action_t sleep;
566
567   /* Associate this simcall to the wait synchro */
568   XBT_DEBUG("simcall_HANDLER_comm_wait, %p", synchro);
569
570   xbt_fifo_push(synchro->simcalls, simcall);
571   simcall->issuer->waiting_synchro = synchro;
572
573   if (MC_is_active() || MC_record_replay_is_active()) {
574     int idx = SIMCALL_GET_MC_VALUE(simcall);
575     if (idx == 0) {
576       synchro->state = SIMIX_DONE;
577     } else {
578       /* If we reached this point, the wait simcall must have a timeout */
579       /* Otherwise it shouldn't be enabled and executed by the MC */
580       if (timeout == -1)
581         THROW_IMPOSSIBLE;
582
583       if (synchro->comm.src_proc == simcall->issuer)
584         synchro->state = SIMIX_SRC_TIMEOUT;
585       else
586         synchro->state = SIMIX_DST_TIMEOUT;
587     }
588
589     SIMIX_comm_finish(synchro);
590     return;
591   }
592
593   /* If the synchro has already finish perform the error handling, */
594   /* otherwise set up a waiting timeout on the right side          */
595   if (synchro->state != SIMIX_WAITING && synchro->state != SIMIX_RUNNING) {
596     SIMIX_comm_finish(synchro);
597   } else { /* if (timeout >= 0) { we need a surf sleep action even when there is no timeout, otherwise surf won't tell us when the host fails */
598     sleep = surf_host_sleep(simcall->issuer->host, timeout);
599     sleep->setData(synchro);
600
601     if (simcall->issuer == synchro->comm.src_proc)
602       synchro->comm.src_timeout = sleep;
603     else
604       synchro->comm.dst_timeout = sleep;
605   }
606 }
607
608 void simcall_HANDLER_comm_test(smx_simcall_t simcall, smx_synchro_t synchro)
609 {
610   if(MC_is_active() || MC_record_replay_is_active()){
611     simcall_comm_test__set__result(simcall, synchro->comm.src_proc && synchro->comm.dst_proc);
612     if(simcall_comm_test__get__result(simcall)){
613       synchro->state = SIMIX_DONE;
614       xbt_fifo_push(synchro->simcalls, simcall);
615       SIMIX_comm_finish(synchro);
616     }else{
617       SIMIX_simcall_answer(simcall);
618     }
619     return;
620   }
621
622   simcall_comm_test__set__result(simcall, (synchro->state != SIMIX_WAITING && synchro->state != SIMIX_RUNNING));
623   if (simcall_comm_test__get__result(simcall)) {
624     xbt_fifo_push(synchro->simcalls, simcall);
625     SIMIX_comm_finish(synchro);
626   } else {
627     SIMIX_simcall_answer(simcall);
628   }
629 }
630
631 void simcall_HANDLER_comm_testany(smx_simcall_t simcall, xbt_dynar_t synchros)
632 {
633   unsigned int cursor;
634   smx_synchro_t synchro;
635   simcall_comm_testany__set__result(simcall, -1);
636
637   if (MC_is_active() || MC_record_replay_is_active()){
638     int idx = SIMCALL_GET_MC_VALUE(simcall);
639     if(idx == -1){
640       SIMIX_simcall_answer(simcall);
641     }else{
642       synchro = xbt_dynar_get_as(synchros, idx, smx_synchro_t);
643       simcall_comm_testany__set__result(simcall, idx);
644       xbt_fifo_push(synchro->simcalls, simcall);
645       synchro->state = SIMIX_DONE;
646       SIMIX_comm_finish(synchro);
647     }
648     return;
649   }
650
651   xbt_dynar_foreach(simcall_comm_testany__get__comms(simcall), cursor,synchro) {
652     if (synchro->state != SIMIX_WAITING && synchro->state != SIMIX_RUNNING) {
653       simcall_comm_testany__set__result(simcall, cursor);
654       xbt_fifo_push(synchro->simcalls, simcall);
655       SIMIX_comm_finish(synchro);
656       return;
657     }
658   }
659   SIMIX_simcall_answer(simcall);
660 }
661
662 void simcall_HANDLER_comm_waitany(smx_simcall_t simcall, xbt_dynar_t synchros)
663 {
664   smx_synchro_t synchro;
665   unsigned int cursor = 0;
666
667   if (MC_is_active() || MC_record_replay_is_active()){
668     int idx = SIMCALL_GET_MC_VALUE(simcall);
669     synchro = xbt_dynar_get_as(synchros, idx, smx_synchro_t);
670     xbt_fifo_push(synchro->simcalls, simcall);
671     simcall_comm_waitany__set__result(simcall, idx);
672     synchro->state = SIMIX_DONE;
673     SIMIX_comm_finish(synchro);
674     return;
675   }
676
677   xbt_dynar_foreach(synchros, cursor, synchro){
678     /* associate this simcall to the the synchro */
679     xbt_fifo_push(synchro->simcalls, simcall);
680
681     /* see if the synchro is already finished */
682     if (synchro->state != SIMIX_WAITING && synchro->state != SIMIX_RUNNING){
683       SIMIX_comm_finish(synchro);
684       break;
685     }
686   }
687 }
688
689 void SIMIX_waitany_remove_simcall_from_actions(smx_simcall_t simcall)
690 {
691   smx_synchro_t synchro;
692   unsigned int cursor = 0;
693   xbt_dynar_t synchros = simcall_comm_waitany__get__comms(simcall);
694
695   xbt_dynar_foreach(synchros, cursor, synchro) {
696     xbt_fifo_remove(synchro->simcalls, simcall);
697   }
698 }
699
700 /**
701  *  \brief Starts the simulation of a communication synchro.
702  *  \param synchro the communication synchro
703  */
704 static inline void SIMIX_comm_start(smx_synchro_t synchro)
705 {
706   /* If both the sender and the receiver are already there, start the communication */
707   if (synchro->state == SIMIX_READY) {
708
709     sg_host_t sender = synchro->comm.src_proc->host;
710     sg_host_t receiver = synchro->comm.dst_proc->host;
711
712     XBT_DEBUG("Starting communication %p from '%s' to '%s'", synchro,
713               sg_host_get_name(sender), sg_host_get_name(receiver));
714
715     synchro->comm.surf_comm = surf_network_model_communicate(surf_network_model,
716                                                             sender, receiver,
717                                                             synchro->comm.task_size, synchro->comm.rate);
718
719     synchro->comm.surf_comm->setData(synchro);
720
721     synchro->state = SIMIX_RUNNING;
722
723     /* If a link is failed, detect it immediately */
724     if (synchro->comm.surf_comm->getState() == SURF_ACTION_FAILED) {
725       XBT_DEBUG("Communication from '%s' to '%s' failed to start because of a link failure",
726                 sg_host_get_name(sender), sg_host_get_name(receiver));
727       synchro->state = SIMIX_LINK_FAILURE;
728       SIMIX_comm_destroy_internal_actions(synchro);
729     }
730
731     /* If any of the process is suspend, create the synchro but stop its execution,
732        it will be restarted when the sender process resume */
733     if (SIMIX_process_is_suspended(synchro->comm.src_proc) ||
734         SIMIX_process_is_suspended(synchro->comm.dst_proc)) {
735       /* FIXME: check what should happen with the synchro state */
736
737       if (SIMIX_process_is_suspended(synchro->comm.src_proc))
738         XBT_DEBUG("The communication is suspended on startup because src (%s:%s) were suspended since it initiated the communication",
739                   sg_host_get_name(synchro->comm.src_proc->host), synchro->comm.src_proc->name);
740       else
741         XBT_DEBUG("The communication is suspended on startup because dst (%s:%s) were suspended since it initiated the communication",
742                   sg_host_get_name(synchro->comm.dst_proc->host), synchro->comm.dst_proc->name);
743
744       synchro->comm.surf_comm->suspend();
745
746     }
747   }
748 }
749
750 /**
751  * \brief Answers the SIMIX simcalls associated to a communication synchro.
752  * \param synchro a finished communication synchro
753  */
754 void SIMIX_comm_finish(smx_synchro_t synchro)
755 {
756   unsigned int destroy_count = 0;
757   smx_simcall_t simcall;
758
759   while ((simcall = (smx_simcall_t) xbt_fifo_shift(synchro->simcalls))) {
760
761     /* If a waitany simcall is waiting for this synchro to finish, then remove
762        it from the other synchros in the waitany list. Afterwards, get the
763        position of the actual synchro in the waitany dynar and
764        return it as the result of the simcall */
765
766     if (simcall->call == SIMCALL_NONE) //FIXME: maybe a better way to handle this case
767       continue; // if process handling comm is killed
768     if (simcall->call == SIMCALL_COMM_WAITANY) {
769       SIMIX_waitany_remove_simcall_from_actions(simcall);
770       if (!MC_is_active() && !MC_record_replay_is_active())
771         simcall_comm_waitany__set__result(simcall, xbt_dynar_search(simcall_comm_waitany__get__comms(simcall), &synchro));
772     }
773
774     /* If the synchro is still in a rendez-vous point then remove from it */
775     if (synchro->comm.rdv)
776       SIMIX_rdv_remove(synchro->comm.rdv, synchro);
777
778     XBT_DEBUG("SIMIX_comm_finish: synchro state = %d", (int)synchro->state);
779
780     /* Check out for errors */
781
782     if (simcall->issuer->host->isOff()) {
783       simcall->issuer->context->iwannadie = 1;
784       SMX_EXCEPTION(simcall->issuer, host_error, 0, "Host failed");
785     } else
786
787     switch (synchro->state) {
788
789     case SIMIX_DONE:
790       XBT_DEBUG("Communication %p complete!", synchro);
791       SIMIX_comm_copy_data(synchro);
792       break;
793
794     case SIMIX_SRC_TIMEOUT:
795       SMX_EXCEPTION(simcall->issuer, timeout_error, 0,
796                     "Communication timeouted because of sender");
797       break;
798
799     case SIMIX_DST_TIMEOUT:
800       SMX_EXCEPTION(simcall->issuer, timeout_error, 0,
801                     "Communication timeouted because of receiver");
802       break;
803
804     case SIMIX_SRC_HOST_FAILURE:
805       if (simcall->issuer == synchro->comm.src_proc)
806         simcall->issuer->context->iwannadie = 1;
807 //          SMX_EXCEPTION(simcall->issuer, host_error, 0, "Host failed");
808       else
809         SMX_EXCEPTION(simcall->issuer, network_error, 0, "Remote peer failed");
810       break;
811
812     case SIMIX_DST_HOST_FAILURE:
813       if (simcall->issuer == synchro->comm.dst_proc)
814         simcall->issuer->context->iwannadie = 1;
815 //          SMX_EXCEPTION(simcall->issuer, host_error, 0, "Host failed");
816       else
817         SMX_EXCEPTION(simcall->issuer, network_error, 0, "Remote peer failed");
818       break;
819
820     case SIMIX_LINK_FAILURE:
821
822       XBT_DEBUG("Link failure in synchro %p between '%s' and '%s': posting an exception to the issuer: %s (%p) detached:%d",
823                 synchro,
824                 synchro->comm.src_proc ? sg_host_get_name(synchro->comm.src_proc->host) : NULL,
825                 synchro->comm.dst_proc ? sg_host_get_name(synchro->comm.dst_proc->host) : NULL,
826                 simcall->issuer->name, simcall->issuer, synchro->comm.detached);
827       if (synchro->comm.src_proc == simcall->issuer) {
828         XBT_DEBUG("I'm source");
829       } else if (synchro->comm.dst_proc == simcall->issuer) {
830         XBT_DEBUG("I'm dest");
831       } else {
832         XBT_DEBUG("I'm neither source nor dest");
833       }
834       SMX_EXCEPTION(simcall->issuer, network_error, 0, "Link failure");
835       break;
836
837     case SIMIX_CANCELED:
838       if (simcall->issuer == synchro->comm.dst_proc)
839         SMX_EXCEPTION(simcall->issuer, cancel_error, 0,
840                       "Communication canceled by the sender");
841       else
842         SMX_EXCEPTION(simcall->issuer, cancel_error, 0,
843                       "Communication canceled by the receiver");
844       break;
845
846     default:
847       xbt_die("Unexpected synchro state in SIMIX_comm_finish: %d", (int)synchro->state);
848     }
849
850     /* if there is an exception during a waitany or a testany, indicate the position of the failed communication */
851     if (simcall->issuer->doexception) {
852       if (simcall->call == SIMCALL_COMM_WAITANY) {
853         simcall->issuer->running_ctx->exception.value = xbt_dynar_search(simcall_comm_waitany__get__comms(simcall), &synchro);
854       }
855       else if (simcall->call == SIMCALL_COMM_TESTANY) {
856         simcall->issuer->running_ctx->exception.value = xbt_dynar_search(simcall_comm_testany__get__comms(simcall), &synchro);
857       }
858     }
859
860     if (simcall->issuer->host->isOff()) {
861       simcall->issuer->context->iwannadie = 1;
862     }
863
864     simcall->issuer->waiting_synchro = NULL;
865     xbt_fifo_remove(simcall->issuer->comms, synchro);
866     if(synchro->comm.detached){
867       if(simcall->issuer == synchro->comm.src_proc){
868         if(synchro->comm.dst_proc)
869           xbt_fifo_remove(synchro->comm.dst_proc->comms, synchro);
870       }
871       if(simcall->issuer == synchro->comm.dst_proc){
872         if(synchro->comm.src_proc)
873           xbt_fifo_remove(synchro->comm.src_proc->comms, synchro);
874       }
875     }
876     SIMIX_simcall_answer(simcall);
877     destroy_count++;
878   }
879
880   while (destroy_count-- > 0)
881     SIMIX_comm_destroy(synchro);
882 }
883
884 /**
885  * \brief This function is called when a Surf communication synchro is finished.
886  * \param synchro the corresponding Simix communication
887  */
888 void SIMIX_post_comm(smx_synchro_t synchro)
889 {
890   /* Update synchro state */
891   if (synchro->comm.src_timeout &&
892       synchro->comm.src_timeout->getState() == SURF_ACTION_DONE)
893     synchro->state = SIMIX_SRC_TIMEOUT;
894   else if (synchro->comm.dst_timeout &&
895     synchro->comm.dst_timeout->getState() == SURF_ACTION_DONE)
896     synchro->state = SIMIX_DST_TIMEOUT;
897   else if (synchro->comm.src_timeout &&
898     synchro->comm.src_timeout->getState() == SURF_ACTION_FAILED)
899     synchro->state = SIMIX_SRC_HOST_FAILURE;
900   else if (synchro->comm.dst_timeout &&
901       synchro->comm.dst_timeout->getState() == SURF_ACTION_FAILED)
902     synchro->state = SIMIX_DST_HOST_FAILURE;
903   else if (synchro->comm.surf_comm &&
904     synchro->comm.surf_comm->getState() == SURF_ACTION_FAILED) {
905     XBT_DEBUG("Puta madre. Surf says that the link broke");
906     synchro->state = SIMIX_LINK_FAILURE;
907   } else
908     synchro->state = SIMIX_DONE;
909
910   XBT_DEBUG("SIMIX_post_comm: comm %p, state %d, src_proc %p, dst_proc %p, detached: %d",
911             synchro, (int)synchro->state, synchro->comm.src_proc, synchro->comm.dst_proc, synchro->comm.detached);
912
913   /* destroy the surf actions associated with the Simix communication */
914   SIMIX_comm_destroy_internal_actions(synchro);
915
916   /* if there are simcalls associated with the synchro, then answer them */
917   if (xbt_fifo_size(synchro->simcalls)) {
918     SIMIX_comm_finish(synchro);
919   }
920 }
921
922 void SIMIX_comm_cancel(smx_synchro_t synchro)
923 {
924   /* if the synchro is a waiting state means that it is still in a rdv */
925   /* so remove from it and delete it */
926   if (synchro->state == SIMIX_WAITING) {
927     SIMIX_rdv_remove(synchro->comm.rdv, synchro);
928     synchro->state = SIMIX_CANCELED;
929   }
930   else if (!MC_is_active() /* when running the MC there are no surf actions */
931            && !MC_record_replay_is_active()
932            && (synchro->state == SIMIX_READY || synchro->state == SIMIX_RUNNING)) {
933
934     synchro->comm.surf_comm->cancel();
935   }
936 }
937
938 void SIMIX_comm_suspend(smx_synchro_t synchro)
939 {
940   /*FIXME: shall we suspend also the timeout synchro? */
941   if (synchro->comm.surf_comm)
942     synchro->comm.surf_comm->suspend();
943   /* in the other case, the action will be suspended on creation, in SIMIX_comm_start() */
944 }
945
946 void SIMIX_comm_resume(smx_synchro_t synchro)
947 {
948   /*FIXME: check what happen with the timeouts */
949   if (synchro->comm.surf_comm)
950     synchro->comm.surf_comm->resume();
951   /* in the other case, the synchro were not really suspended yet, see SIMIX_comm_suspend() and SIMIX_comm_start() */
952 }
953
954
955 /************* synchro Getters **************/
956
957 /**
958  *  \brief get the amount remaining from the communication
959  *  \param synchro The communication
960  */
961 double SIMIX_comm_get_remains(smx_synchro_t synchro)
962 {
963   double remains;
964
965   if(!synchro){
966     return 0;
967   }
968
969   switch (synchro->state) {
970
971   case SIMIX_RUNNING:
972     remains = synchro->comm.surf_comm->getRemains();
973     break;
974
975   case SIMIX_WAITING:
976   case SIMIX_READY:
977     remains = 0; /*FIXME: check what should be returned */
978     break;
979
980   default:
981     remains = 0; /*FIXME: is this correct? */
982     break;
983   }
984   return remains;
985 }
986
987 e_smx_state_t SIMIX_comm_get_state(smx_synchro_t synchro)
988 {
989   return synchro->state;
990 }
991
992 /**
993  *  \brief Return the user data associated to the sender of the communication
994  *  \param synchro The communication
995  *  \return the user data
996  */
997 void* SIMIX_comm_get_src_data(smx_synchro_t synchro)
998 {
999   return synchro->comm.src_data;
1000 }
1001
1002 /**
1003  *  \brief Return the user data associated to the receiver of the communication
1004  *  \param synchro The communication
1005  *  \return the user data
1006  */
1007 void* SIMIX_comm_get_dst_data(smx_synchro_t synchro)
1008 {
1009   return synchro->comm.dst_data;
1010 }
1011
1012 smx_process_t SIMIX_comm_get_src_proc(smx_synchro_t synchro)
1013 {
1014   return synchro->comm.src_proc;
1015 }
1016
1017 smx_process_t SIMIX_comm_get_dst_proc(smx_synchro_t synchro)
1018 {
1019   return synchro->comm.dst_proc;
1020 }
1021
1022 #ifdef HAVE_LATENCY_BOUND_TRACKING
1023 /**
1024  *  \brief verify if communication is latency bounded
1025  *  \param comm The communication
1026  */
1027 int SIMIX_comm_is_latency_bounded(smx_synchro_t synchro)
1028 {
1029   if(!synchro){
1030     return 0;
1031   }
1032   if (synchro->comm.surf_comm){
1033     XBT_DEBUG("Getting latency limited for surf_action (%p)", synchro->comm.surf_comm);
1034     synchro->latency_limited = surf_network_action_get_latency_limited(synchro->comm.surf_comm);
1035     XBT_DEBUG("synchro limited is %d", synchro->latency_limited);
1036   }
1037   return synchro->latency_limited;
1038 }
1039 #endif
1040
1041 /******************************************************************************/
1042 /*                    SIMIX_comm_copy_data callbacks                       */
1043 /******************************************************************************/
1044 static void (*SIMIX_comm_copy_data_callback) (smx_synchro_t, void*, size_t) =
1045   &SIMIX_comm_copy_pointer_callback;
1046
1047 void
1048 SIMIX_comm_set_copy_data_callback(void (*callback) (smx_synchro_t, void*, size_t))
1049 {
1050   SIMIX_comm_copy_data_callback = callback;
1051 }
1052
1053 void SIMIX_comm_copy_pointer_callback(smx_synchro_t comm, void* buff, size_t buff_size)
1054 {
1055   xbt_assert((buff_size == sizeof(void *)),
1056              "Cannot copy %zu bytes: must be sizeof(void*)", buff_size);
1057   *(void **) (comm->comm.dst_buff) = buff;
1058 }
1059
1060 void SIMIX_comm_copy_buffer_callback(smx_synchro_t comm, void* buff, size_t buff_size)
1061 {
1062   XBT_DEBUG("Copy the data over");
1063   memcpy(comm->comm.dst_buff, buff, buff_size);
1064   if (comm->comm.detached) { // if this is a detached send, the source buffer was duplicated by SMPI sender to make the original buffer available to the application ASAP
1065     xbt_free(buff);
1066     comm->comm.src_buff = NULL;
1067   }
1068 }
1069
1070
1071 /**
1072  *  \brief Copy the communication data from the sender's buffer to the receiver's one
1073  *  \param comm The communication
1074  */
1075 void SIMIX_comm_copy_data(smx_synchro_t comm)
1076 {
1077   size_t buff_size = comm->comm.src_buff_size;
1078   /* If there is no data to be copy then return */
1079   if (!comm->comm.src_buff || !comm->comm.dst_buff || comm->comm.copied)
1080     return;
1081
1082   XBT_DEBUG("Copying comm %p data from %s (%p) -> %s (%p) (%zu bytes)",
1083             comm,
1084             comm->comm.src_proc ? sg_host_get_name(comm->comm.src_proc->host) : "a finished process",
1085             comm->comm.src_buff,
1086             comm->comm.dst_proc ? sg_host_get_name(comm->comm.dst_proc->host) : "a finished process",
1087             comm->comm.dst_buff, buff_size);
1088
1089   /* Copy at most dst_buff_size bytes of the message to receiver's buffer */
1090   if (comm->comm.dst_buff_size)
1091     buff_size = MIN(buff_size, *(comm->comm.dst_buff_size));
1092
1093   /* Update the receiver's buffer size to the copied amount */
1094   if (comm->comm.dst_buff_size)
1095     *comm->comm.dst_buff_size = buff_size;
1096
1097   if (buff_size > 0){
1098       if(comm->comm.copy_data_fun)
1099         comm->comm.copy_data_fun (comm, comm->comm.src_buff, buff_size);
1100       else
1101         SIMIX_comm_copy_data_callback (comm, comm->comm.src_buff, buff_size);
1102   }
1103
1104
1105   /* Set the copied flag so we copy data only once */
1106   /* (this function might be called from both communication ends) */
1107   comm->comm.copied = 1;
1108 }