Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
Merge branch 'master' into mc-process
[simgrid.git] / src / simix / smx_network.c
1 /* Copyright (c) 2009-2014. The SimGrid Team.
2  * All rights reserved.                                                     */
3
4 /* This program is free software; you can redistribute it and/or modify it
5  * under the terms of the license (GNU LGPL) which comes with this package. */
6
7 #include "smx_private.h"
8 #include "xbt/log.h"
9 #include "mc/mc.h"
10 #include "xbt/dict.h"
11 #include "smpi/private.h"
12
13
14 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(simix_network, simix,
15                                 "SIMIX network-related synchronization");
16
17 static xbt_dict_t rdv_points = NULL;
18 XBT_EXPORT_NO_IMPORT(unsigned long int) smx_total_comms = 0;
19
20 static void SIMIX_waitany_remove_simcall_from_actions(smx_simcall_t simcall);
21 static void SIMIX_comm_copy_data(smx_synchro_t comm);
22 static smx_synchro_t SIMIX_comm_new(e_smx_comm_type_t type);
23 static XBT_INLINE void SIMIX_rdv_push(smx_rdv_t rdv, smx_synchro_t comm);
24 static smx_synchro_t SIMIX_fifo_probe_comm(xbt_fifo_t fifo, e_smx_comm_type_t type,
25                                         int (*match_fun)(void *, void *,smx_synchro_t),
26                                         void *user_data, smx_synchro_t my_synchro);
27 static smx_synchro_t SIMIX_fifo_get_comm(xbt_fifo_t fifo, e_smx_comm_type_t type,
28                                         int (*match_fun)(void *, void *,smx_synchro_t),
29                                         void *user_data, smx_synchro_t my_synchro);
30 static void SIMIX_rdv_free(void *data);
31 static void SIMIX_comm_start(smx_synchro_t synchro);
32
33 void SIMIX_network_init(void)
34 {
35   rdv_points = xbt_dict_new_homogeneous(SIMIX_rdv_free);
36 }
37
38 void SIMIX_network_exit(void)
39 {
40   xbt_dict_free(&rdv_points);
41 }
42
43 /******************************************************************************/
44 /*                           Rendez-Vous Points                               */
45 /******************************************************************************/
46
47 smx_rdv_t SIMIX_rdv_create(const char *name)
48 {
49   /* two processes may have pushed the same rdv_create simcall at the same time */
50   smx_rdv_t rdv = name ? xbt_dict_get_or_null(rdv_points, name) : NULL;
51
52   if (!rdv) {
53     rdv = xbt_new0(s_smx_rvpoint_t, 1);
54     rdv->name = name ? xbt_strdup(name) : NULL;
55     rdv->comm_fifo = xbt_fifo_new();
56     rdv->done_comm_fifo = xbt_fifo_new();
57     rdv->permanent_receiver=NULL;
58
59     XBT_DEBUG("Creating a mailbox at %p with name %s", rdv, name);
60
61     if (rdv->name)
62       xbt_dict_set(rdv_points, rdv->name, rdv, NULL);
63   }
64   return rdv;
65 }
66
67 void SIMIX_rdv_destroy(smx_rdv_t rdv)
68 {
69   if (rdv->name)
70     xbt_dict_remove(rdv_points, rdv->name);
71 }
72
73 void SIMIX_rdv_free(void *data)
74 {
75   XBT_DEBUG("rdv free %p", data);
76   smx_rdv_t rdv = (smx_rdv_t) data;
77   xbt_free(rdv->name);
78   xbt_fifo_free(rdv->comm_fifo);
79   xbt_fifo_free(rdv->done_comm_fifo);
80
81   xbt_free(rdv);
82 }
83
84 xbt_dict_t SIMIX_get_rdv_points()
85 {
86   return rdv_points;
87 }
88
89 smx_rdv_t SIMIX_rdv_get_by_name(const char *name)
90 {
91   return xbt_dict_get_or_null(rdv_points, name);
92 }
93
94 int SIMIX_rdv_comm_count_by_host(smx_rdv_t rdv, smx_host_t host)
95 {
96   smx_synchro_t comm = NULL;
97   xbt_fifo_item_t item = NULL;
98   int count = 0;
99
100   xbt_fifo_foreach(rdv->comm_fifo, item, comm, smx_synchro_t) {
101     if (comm->comm.src_proc->smx_host == host)
102       count++;
103   }
104
105   return count;
106 }
107
108 smx_synchro_t SIMIX_rdv_get_head(smx_rdv_t rdv)
109 {
110   return xbt_fifo_get_item_content(xbt_fifo_get_first_item(rdv->comm_fifo));
111 }
112
113 /**
114  *  \brief get the receiver (process associated to the mailbox)
115  *  \param rdv The rendez-vous point
116  *  \return process The receiving process (NULL if not set)
117  */
118 smx_process_t SIMIX_rdv_get_receiver(smx_rdv_t rdv)
119 {
120   return rdv->permanent_receiver;
121 }
122
123 /**
124  *  \brief set the receiver of the rendez vous point to allow eager sends
125  *  \param rdv The rendez-vous point
126  *  \param process The receiving process
127  */
128 void SIMIX_rdv_set_receiver(smx_rdv_t rdv, smx_process_t process)
129 {
130   rdv->permanent_receiver=process;
131 }
132
133 /**
134  *  \brief Pushes a communication synchro into a rendez-vous point
135  *  \param rdv The rendez-vous point
136  *  \param comm The communication synchro
137  */
138 static XBT_INLINE void SIMIX_rdv_push(smx_rdv_t rdv, smx_synchro_t comm)
139 {
140   xbt_fifo_push(rdv->comm_fifo, comm);
141   comm->comm.rdv = rdv;
142 }
143
144 /**
145  *  \brief Removes a communication synchro from a rendez-vous point
146  *  \param rdv The rendez-vous point
147  *  \param comm The communication synchro
148  */
149 XBT_INLINE void SIMIX_rdv_remove(smx_rdv_t rdv, smx_synchro_t comm)
150 {
151   xbt_fifo_remove(rdv->comm_fifo, comm);
152   comm->comm.rdv = NULL;
153 }
154
155 /**
156  *  \brief Checks if there is a communication synchro queued in a fifo matching our needs
157  *  \param type The type of communication we are looking for (comm_send, comm_recv)
158  *  \return The communication synchro if found, NULL otherwise
159  */
160 smx_synchro_t SIMIX_fifo_get_comm(xbt_fifo_t fifo, e_smx_comm_type_t type,
161                                  int (*match_fun)(void *, void *,smx_synchro_t),
162                                  void *this_user_data, smx_synchro_t my_synchro)
163 {
164   smx_synchro_t synchro;
165   xbt_fifo_item_t item;
166   void* other_user_data = NULL;
167
168   xbt_fifo_foreach(fifo, item, synchro, smx_synchro_t) {
169     if (synchro->comm.type == SIMIX_COMM_SEND) {
170       other_user_data = synchro->comm.src_data;
171     } else if (synchro->comm.type == SIMIX_COMM_RECEIVE) {
172       other_user_data = synchro->comm.dst_data;
173     }
174     if (synchro->comm.type == type &&
175         (!match_fun              ||              match_fun(this_user_data,  other_user_data, synchro)) &&
176         (!synchro->comm.match_fun || synchro->comm.match_fun(other_user_data, this_user_data,  my_synchro))) {
177       XBT_DEBUG("Found a matching communication synchro %p", synchro);
178       xbt_fifo_remove_item(fifo, item);
179       xbt_fifo_free_item(item);
180       synchro->comm.refcount++;
181 #ifdef HAVE_MC
182       synchro->comm.rdv_cpy = synchro->comm.rdv;
183 #endif
184       synchro->comm.rdv = NULL;
185       return synchro;
186     }
187     XBT_DEBUG("Sorry, communication synchro %p does not match our needs:"
188               " its type is %d but we are looking for a comm of type %d (or maybe the filtering didn't match)",
189               synchro, (int)synchro->comm.type, (int)type);
190   }
191   XBT_DEBUG("No matching communication synchro found");
192   return NULL;
193 }
194
195
196 /**
197  *  \brief Checks if there is a communication synchro queued in a fifo matching our needs, but leave it there
198  *  \param type The type of communication we are looking for (comm_send, comm_recv)
199  *  \return The communication synchro if found, NULL otherwise
200  */
201 smx_synchro_t SIMIX_fifo_probe_comm(xbt_fifo_t fifo, e_smx_comm_type_t type,
202                                  int (*match_fun)(void *, void *,smx_synchro_t),
203                                  void *this_user_data, smx_synchro_t my_synchro)
204 {
205   smx_synchro_t synchro;
206   xbt_fifo_item_t item;
207   void* other_user_data = NULL;
208
209   xbt_fifo_foreach(fifo, item, synchro, smx_synchro_t) {
210     if (synchro->comm.type == SIMIX_COMM_SEND) {
211       other_user_data = synchro->comm.src_data;
212     } else if (synchro->comm.type == SIMIX_COMM_RECEIVE) {
213       other_user_data = synchro->comm.dst_data;
214     }
215     if (synchro->comm.type == type &&
216         (!match_fun              ||              match_fun(this_user_data,  other_user_data, synchro)) &&
217         (!synchro->comm.match_fun || synchro->comm.match_fun(other_user_data, this_user_data,  my_synchro))) {
218       XBT_DEBUG("Found a matching communication synchro %p", synchro);
219       synchro->comm.refcount++;
220
221       return synchro;
222     }
223     XBT_DEBUG("Sorry, communication synchro %p does not match our needs:"
224               " its type is %d but we are looking for a comm of type %d (or maybe the filtering didn't match)",
225               synchro, (int)synchro->comm.type, (int)type);
226   }
227   XBT_DEBUG("No matching communication synchro found");
228   return NULL;
229 }
230 /******************************************************************************/
231 /*                          Communication synchros                            */
232 /******************************************************************************/
233
234 /**
235  *  \brief Creates a new communicate synchro
236  *  \param type The direction of communication (comm_send, comm_recv)
237  *  \return The new communicate synchro
238  */
239 smx_synchro_t SIMIX_comm_new(e_smx_comm_type_t type)
240 {
241   smx_synchro_t synchro;
242
243   /* alloc structures */
244   synchro = xbt_mallocator_get(simix_global->synchro_mallocator);
245
246   synchro->type = SIMIX_SYNC_COMMUNICATE;
247   synchro->state = SIMIX_WAITING;
248
249   /* set communication */
250   synchro->comm.type = type;
251   synchro->comm.refcount = 1;
252   synchro->comm.src_data=NULL;
253   synchro->comm.dst_data=NULL;
254
255
256 #ifdef HAVE_LATENCY_BOUND_TRACKING
257   //initialize with unknown value
258   synchro->latency_limited = -1;
259 #endif
260
261   synchro->category = NULL;
262
263   XBT_DEBUG("Create communicate synchro %p", synchro);
264   ++smx_total_comms;
265
266   return synchro;
267 }
268
269 /**
270  *  \brief Destroy a communicate synchro
271  *  \param synchro The communicate synchro to be destroyed
272  */
273 void SIMIX_comm_destroy(smx_synchro_t synchro)
274 {
275   XBT_DEBUG("Destroy synchro %p (refcount: %d), state: %d",
276             synchro, synchro->comm.refcount, (int)synchro->state);
277
278   if (synchro->comm.refcount <= 0) {
279     xbt_backtrace_display_current();
280     xbt_die("The refcount of comm %p is already 0 before decreasing it. "
281             "That's a bug! If you didn't test and/or wait the same communication twice in your code, then the bug is SimGrid's...", synchro);
282   }
283   synchro->comm.refcount--;
284   if (synchro->comm.refcount > 0)
285       return;
286   XBT_DEBUG("Really free communication %p; refcount is now %d", synchro,
287       synchro->comm.refcount);
288
289 #ifdef HAVE_LATENCY_BOUND_TRACKING
290   synchro->latency_limited = SIMIX_comm_is_latency_bounded( synchro ) ;
291 #endif
292
293   xbt_free(synchro->name);
294   SIMIX_comm_destroy_internal_actions(synchro);
295
296   if (synchro->comm.detached && synchro->state != SIMIX_DONE) {
297     /* the communication has failed and was detached:
298      * we have to free the buffer */
299     if (synchro->comm.clean_fun) {
300       synchro->comm.clean_fun(synchro->comm.src_buff);
301     }
302     synchro->comm.src_buff = NULL;
303   }
304
305   if(synchro->comm.rdv)
306     SIMIX_rdv_remove(synchro->comm.rdv, synchro);
307
308   xbt_mallocator_release(simix_global->synchro_mallocator, synchro);
309 }
310
311 void SIMIX_comm_destroy_internal_actions(smx_synchro_t synchro)
312 {
313   if (synchro->comm.surf_comm){
314 #ifdef HAVE_LATENCY_BOUND_TRACKING
315     synchro->latency_limited = SIMIX_comm_is_latency_bounded(synchro);
316 #endif
317     surf_action_unref(synchro->comm.surf_comm);
318     synchro->comm.surf_comm = NULL;
319   }
320
321   if (synchro->comm.src_timeout){
322     surf_action_unref(synchro->comm.src_timeout);
323     synchro->comm.src_timeout = NULL;
324   }
325
326   if (synchro->comm.dst_timeout){
327     surf_action_unref(synchro->comm.dst_timeout);
328     synchro->comm.dst_timeout = NULL;
329   }
330 }
331
332 void simcall_HANDLER_comm_send(smx_simcall_t simcall, smx_process_t src, smx_rdv_t rdv,
333                                   double task_size, double rate,
334                                   void *src_buff, size_t src_buff_size,
335                                   int (*match_fun)(void *, void *,smx_synchro_t),
336                                   void (*copy_data_fun)(smx_synchro_t, void*, size_t),
337                                   void *data, double timeout){
338   smx_synchro_t comm = SIMIX_comm_isend(src, rdv, task_size, rate,
339                                        src_buff, src_buff_size, match_fun, NULL, copy_data_fun,
340                                        data, 0);
341   SIMCALL_SET_MC_VALUE(simcall, 0);
342   simcall_HANDLER_comm_wait(simcall, comm, timeout);
343 }
344 smx_synchro_t simcall_HANDLER_comm_isend(smx_simcall_t simcall, smx_process_t src, smx_rdv_t rdv,
345                                   double task_size, double rate,
346                                   void *src_buff, size_t src_buff_size,
347                                   int (*match_fun)(void *, void *,smx_synchro_t),
348                                   void (*clean_fun)(void *),
349                                   void (*copy_data_fun)(smx_synchro_t, void*, size_t),
350                                   void *data, int detached){
351   return SIMIX_comm_isend(src, rdv, task_size, rate, src_buff,
352                           src_buff_size, match_fun, clean_fun, copy_data_fun, data, detached);
353
354 }
355 smx_synchro_t SIMIX_comm_isend(smx_process_t src_proc, smx_rdv_t rdv,
356                               double task_size, double rate,
357                               void *src_buff, size_t src_buff_size,
358                               int (*match_fun)(void *, void *,smx_synchro_t),
359                               void (*clean_fun)(void *), // used to free the synchro in case of problem after a detached send
360                               void (*copy_data_fun)(smx_synchro_t, void*, size_t), // used to copy data if not default one
361                               void *data,
362                               int detached)
363 {
364   XBT_DEBUG("send from %p", rdv);
365
366   /* Prepare a synchro describing us, so that it gets passed to the user-provided filter of other side */
367   smx_synchro_t this_synchro = SIMIX_comm_new(SIMIX_COMM_SEND);
368
369   /* Look for communication synchro matching our needs. We also provide a description of
370    * ourself so that the other side also gets a chance of choosing if it wants to match with us.
371    *
372    * If it is not found then push our communication into the rendez-vous point */
373   smx_synchro_t other_synchro = SIMIX_fifo_get_comm(rdv->comm_fifo, SIMIX_COMM_RECEIVE, match_fun, data, this_synchro);
374
375   if (!other_synchro) {
376     other_synchro = this_synchro;
377
378     if (rdv->permanent_receiver!=NULL){
379       //this mailbox is for small messages, which have to be sent right now
380       other_synchro->state = SIMIX_READY;
381       other_synchro->comm.dst_proc=rdv->permanent_receiver;
382       other_synchro->comm.refcount++;
383       xbt_fifo_push(rdv->done_comm_fifo,other_synchro);
384       other_synchro->comm.rdv=rdv;
385       XBT_DEBUG("pushing a message into the permanent receive fifo %p, comm %p", rdv, &(other_synchro->comm));
386
387     }else{
388       SIMIX_rdv_push(rdv, this_synchro);
389     }
390   } else {
391     XBT_DEBUG("Receive already pushed");
392
393     SIMIX_comm_destroy(this_synchro);
394     --smx_total_comms; // this creation was a pure waste
395
396     other_synchro->state = SIMIX_READY;
397     other_synchro->comm.type = SIMIX_COMM_READY;
398
399   }
400   xbt_fifo_push(src_proc->comms, other_synchro);
401
402   /* if the communication synchro is detached then decrease the refcount
403    * by one, so it will be eliminated by the receiver's destroy call */
404   if (detached) {
405     other_synchro->comm.detached = 1;
406     other_synchro->comm.refcount--;
407     other_synchro->comm.clean_fun = clean_fun;
408   } else {
409     other_synchro->comm.clean_fun = NULL;
410   }
411
412   /* Setup the communication synchro */
413   other_synchro->comm.src_proc = src_proc;
414   other_synchro->comm.task_size = task_size;
415   other_synchro->comm.rate = rate;
416   other_synchro->comm.src_buff = src_buff;
417   other_synchro->comm.src_buff_size = src_buff_size;
418   other_synchro->comm.src_data = data;
419
420   other_synchro->comm.match_fun = match_fun;
421   other_synchro->comm.copy_data_fun = copy_data_fun;
422
423
424   if (MC_is_active() || MC_record_replay_is_active()) {
425     other_synchro->state = SIMIX_RUNNING;
426     return (detached ? NULL : other_synchro);
427   }
428
429   SIMIX_comm_start(other_synchro);
430   return (detached ? NULL : other_synchro);
431 }
432
433 void simcall_HANDLER_comm_recv(smx_simcall_t simcall, smx_rdv_t rdv,
434                          void *dst_buff, size_t *dst_buff_size,
435                          int (*match_fun)(void *, void *, smx_synchro_t),
436                          void (*copy_data_fun)(smx_synchro_t, void*, size_t),
437                          void *data, double timeout, double rate)
438 {
439   smx_synchro_t comm = SIMIX_comm_irecv(simcall->issuer, rdv, dst_buff,
440                                        dst_buff_size, match_fun, copy_data_fun, data, rate);
441   SIMCALL_SET_MC_VALUE(simcall, 0);
442   simcall_HANDLER_comm_wait(simcall, comm, timeout);
443 }
444
445 smx_synchro_t simcall_HANDLER_comm_irecv(smx_simcall_t simcall, smx_rdv_t rdv,
446                                   void *dst_buff, size_t *dst_buff_size,
447                                   int (*match_fun)(void *, void *, smx_synchro_t),
448                                   void (*copy_data_fun)(smx_synchro_t, void*, size_t),
449                                   void *data, double rate)
450 {
451   return SIMIX_comm_irecv(simcall->issuer, rdv, dst_buff, dst_buff_size,
452                           match_fun, copy_data_fun, data, rate);
453 }
454
455 smx_synchro_t SIMIX_comm_irecv(smx_process_t dst_proc, smx_rdv_t rdv,
456                               void *dst_buff, size_t *dst_buff_size,
457                               int (*match_fun)(void *, void *, smx_synchro_t),
458                               void (*copy_data_fun)(smx_synchro_t, void*, size_t), // used to copy data if not default one
459                               void *data, double rate)
460 {
461   XBT_DEBUG("recv from %p %p", rdv, rdv->comm_fifo);
462   smx_synchro_t this_synchro = SIMIX_comm_new(SIMIX_COMM_RECEIVE);
463
464   smx_synchro_t other_synchro;
465   //communication already done, get it inside the fifo of completed comms
466   //permanent receive v1
467   //int already_received=0;
468   if(rdv->permanent_receiver && xbt_fifo_size(rdv->done_comm_fifo)!=0){
469
470     XBT_DEBUG("We have a comm that has probably already been received, trying to match it, to skip the communication");
471     //find a match in the already received fifo
472     other_synchro = SIMIX_fifo_get_comm(rdv->done_comm_fifo, SIMIX_COMM_SEND, match_fun, data, this_synchro);
473     //if not found, assume the receiver came first, register it to the mailbox in the classical way
474     if (!other_synchro)  {
475       XBT_DEBUG("We have messages in the permanent receive list, but not the one we are looking for, pushing request into fifo");
476       other_synchro = this_synchro;
477       SIMIX_rdv_push(rdv, this_synchro);
478     }else{
479       if(other_synchro->comm.surf_comm &&       SIMIX_comm_get_remains(other_synchro)==0.0)
480       {
481         XBT_DEBUG("comm %p has been already sent, and is finished, destroy it",&(other_synchro->comm));
482         other_synchro->state = SIMIX_DONE;
483         other_synchro->comm.type = SIMIX_COMM_DONE;
484         other_synchro->comm.rdv = NULL;
485       }/*else{
486          XBT_DEBUG("Not yet finished, we have to wait %d", xbt_fifo_size(rdv->comm_fifo));
487          }*/
488       other_synchro->comm.refcount--;
489       SIMIX_comm_destroy(this_synchro);
490       --smx_total_comms; // this creation was a pure waste
491     }
492   }else{
493     /* Prepare a synchro describing us, so that it gets passed to the user-provided filter of other side */
494
495     /* Look for communication synchro matching our needs. We also provide a description of
496      * ourself so that the other side also gets a chance of choosing if it wants to match with us.
497      *
498      * If it is not found then push our communication into the rendez-vous point */
499     other_synchro = SIMIX_fifo_get_comm(rdv->comm_fifo, SIMIX_COMM_SEND, match_fun, data, this_synchro);
500
501     if (!other_synchro) {
502       XBT_DEBUG("Receive pushed first %d", xbt_fifo_size(rdv->comm_fifo));
503       other_synchro = this_synchro;
504       SIMIX_rdv_push(rdv, this_synchro);
505     } else {
506       SIMIX_comm_destroy(this_synchro);
507       --smx_total_comms; // this creation was a pure waste
508       other_synchro->state = SIMIX_READY;
509       other_synchro->comm.type = SIMIX_COMM_READY;
510       //other_synchro->comm.refcount--;
511     }
512     xbt_fifo_push(dst_proc->comms, other_synchro);
513   }
514
515   /* Setup communication synchro */
516   other_synchro->comm.dst_proc = dst_proc;
517   other_synchro->comm.dst_buff = dst_buff;
518   other_synchro->comm.dst_buff_size = dst_buff_size;
519   other_synchro->comm.dst_data = data;
520
521   if (rate != -1.0 &&
522       (other_synchro->comm.rate == -1.0 || rate < other_synchro->comm.rate))
523     other_synchro->comm.rate = rate;
524
525   other_synchro->comm.match_fun = match_fun;
526   other_synchro->comm.copy_data_fun = copy_data_fun;
527
528
529   /*if(already_received)//do the actual copy, because the first one after the comm didn't have all the info
530     SIMIX_comm_copy_data(other_synchro);*/
531
532
533   if (MC_is_active() || MC_record_replay_is_active()) {
534     other_synchro->state = SIMIX_RUNNING;
535     return other_synchro;
536   }
537
538   SIMIX_comm_start(other_synchro);
539   // }
540   return other_synchro;
541 }
542
543 smx_synchro_t simcall_HANDLER_comm_iprobe(smx_simcall_t simcall, smx_rdv_t rdv,
544                                    int type, int src, int tag,
545                                    int (*match_fun)(void *, void *, smx_synchro_t),
546                                    void *data){
547   return SIMIX_comm_iprobe(simcall->issuer, rdv, type, src, tag, match_fun, data);
548 }
549
550 smx_synchro_t SIMIX_comm_iprobe(smx_process_t dst_proc, smx_rdv_t rdv, int type, int src,
551                               int tag, int (*match_fun)(void *, void *, smx_synchro_t), void *data)
552 {
553   XBT_DEBUG("iprobe from %p %p", rdv, rdv->comm_fifo);
554   smx_synchro_t this_synchro;
555   int smx_type;
556   if(type == 1){
557     this_synchro=SIMIX_comm_new(SIMIX_COMM_SEND);
558     smx_type = SIMIX_COMM_RECEIVE;
559   } else{
560     this_synchro=SIMIX_comm_new(SIMIX_COMM_RECEIVE);
561     smx_type = SIMIX_COMM_SEND;
562   } 
563   smx_synchro_t other_synchro=NULL;
564   if(rdv->permanent_receiver && xbt_fifo_size(rdv->done_comm_fifo)!=0){
565     //find a match in the already received fifo
566       XBT_DEBUG("first try in the perm recv mailbox");
567
568     other_synchro = SIMIX_fifo_probe_comm(rdv->done_comm_fifo, smx_type, match_fun, data, this_synchro);
569   }
570  // }else{
571     if(!other_synchro){
572         XBT_DEBUG("try in the normal mailbox");
573         other_synchro = SIMIX_fifo_probe_comm(rdv->comm_fifo, smx_type, match_fun, data, this_synchro);
574     }
575 //  }
576   if(other_synchro)other_synchro->comm.refcount--;
577
578   SIMIX_comm_destroy(this_synchro);
579   --smx_total_comms;
580   return other_synchro;
581 }
582
583 void simcall_HANDLER_comm_wait(smx_simcall_t simcall, smx_synchro_t synchro, double timeout)
584 {
585   /* the simcall may be a wait, a send or a recv */
586   surf_action_t sleep;
587
588   /* Associate this simcall to the wait synchro */
589   XBT_DEBUG("simcall_HANDLER_comm_wait, %p", synchro);
590
591   xbt_fifo_push(synchro->simcalls, simcall);
592   simcall->issuer->waiting_synchro = synchro;
593
594   if (MC_is_active() || MC_record_replay_is_active()) {
595     int idx = SIMCALL_GET_MC_VALUE(simcall);
596     if (idx == 0) {
597       synchro->state = SIMIX_DONE;
598     } else {
599       /* If we reached this point, the wait simcall must have a timeout */
600       /* Otherwise it shouldn't be enabled and executed by the MC */
601       if (timeout == -1)
602         THROW_IMPOSSIBLE;
603
604       if (synchro->comm.src_proc == simcall->issuer)
605         synchro->state = SIMIX_SRC_TIMEOUT;
606       else
607         synchro->state = SIMIX_DST_TIMEOUT;
608     }
609
610     SIMIX_comm_finish(synchro);
611     return;
612   }
613
614   /* If the synchro has already finish perform the error handling, */
615   /* otherwise set up a waiting timeout on the right side          */
616   if (synchro->state != SIMIX_WAITING && synchro->state != SIMIX_RUNNING) {
617     SIMIX_comm_finish(synchro);
618   } else { /* if (timeout >= 0) { we need a surf sleep action even when there is no timeout, otherwise surf won't tell us when the host fails */
619     sleep = surf_workstation_sleep(simcall->issuer->smx_host, timeout);
620     surf_action_set_data(sleep, synchro);
621
622     if (simcall->issuer == synchro->comm.src_proc)
623       synchro->comm.src_timeout = sleep;
624     else
625       synchro->comm.dst_timeout = sleep;
626   }
627 }
628
629 void simcall_HANDLER_comm_test(smx_simcall_t simcall, smx_synchro_t synchro)
630 {
631   if(MC_is_active() || MC_record_replay_is_active()){
632     simcall_comm_test__set__result(simcall, synchro->comm.src_proc && synchro->comm.dst_proc);
633     if(simcall_comm_test__get__result(simcall)){
634       synchro->state = SIMIX_DONE;
635       xbt_fifo_push(synchro->simcalls, simcall);
636       SIMIX_comm_finish(synchro);
637     }else{
638       SIMIX_simcall_answer(simcall);
639     }
640     return;
641   }
642
643   simcall_comm_test__set__result(simcall, (synchro->state != SIMIX_WAITING && synchro->state != SIMIX_RUNNING));
644   if (simcall_comm_test__get__result(simcall)) {
645     xbt_fifo_push(synchro->simcalls, simcall);
646     SIMIX_comm_finish(synchro);
647   } else {
648     SIMIX_simcall_answer(simcall);
649   }
650 }
651
652 void simcall_HANDLER_comm_testany(smx_simcall_t simcall, xbt_dynar_t synchros)
653 {
654   unsigned int cursor;
655   smx_synchro_t synchro;
656   simcall_comm_testany__set__result(simcall, -1);
657
658   if (MC_is_active() || MC_record_replay_is_active()){
659     int idx = SIMCALL_GET_MC_VALUE(simcall);
660     if(idx == -1){
661       SIMIX_simcall_answer(simcall);
662     }else{
663       synchro = xbt_dynar_get_as(synchros, idx, smx_synchro_t);
664       simcall_comm_testany__set__result(simcall, idx);
665       xbt_fifo_push(synchro->simcalls, simcall);
666       synchro->state = SIMIX_DONE;
667       SIMIX_comm_finish(synchro);
668     }
669     return;
670   }
671
672   xbt_dynar_foreach(simcall_comm_testany__get__comms(simcall), cursor,synchro) {
673     if (synchro->state != SIMIX_WAITING && synchro->state != SIMIX_RUNNING) {
674       simcall_comm_testany__set__result(simcall, cursor);
675       xbt_fifo_push(synchro->simcalls, simcall);
676       SIMIX_comm_finish(synchro);
677       return;
678     }
679   }
680   SIMIX_simcall_answer(simcall);
681 }
682
683 void simcall_HANDLER_comm_waitany(smx_simcall_t simcall, xbt_dynar_t synchros)
684 {
685   smx_synchro_t synchro;
686   unsigned int cursor = 0;
687
688   if (MC_is_active() || MC_record_replay_is_active()){
689     int idx = SIMCALL_GET_MC_VALUE(simcall);
690     synchro = xbt_dynar_get_as(synchros, idx, smx_synchro_t);
691     xbt_fifo_push(synchro->simcalls, simcall);
692     simcall_comm_waitany__set__result(simcall, idx);
693     synchro->state = SIMIX_DONE;
694     SIMIX_comm_finish(synchro);
695     return;
696   }
697
698   xbt_dynar_foreach(synchros, cursor, synchro){
699     /* associate this simcall to the the synchro */
700     xbt_fifo_push(synchro->simcalls, simcall);
701
702     /* see if the synchro is already finished */
703     if (synchro->state != SIMIX_WAITING && synchro->state != SIMIX_RUNNING){
704       SIMIX_comm_finish(synchro);
705       break;
706     }
707   }
708 }
709
710 void SIMIX_waitany_remove_simcall_from_actions(smx_simcall_t simcall)
711 {
712   smx_synchro_t synchro;
713   unsigned int cursor = 0;
714   xbt_dynar_t synchros = simcall_comm_waitany__get__comms(simcall);
715
716   xbt_dynar_foreach(synchros, cursor, synchro) {
717     xbt_fifo_remove(synchro->simcalls, simcall);
718   }
719 }
720
721 /**
722  *  \brief Starts the simulation of a communication synchro.
723  *  \param synchro the communication synchro
724  */
725 static XBT_INLINE void SIMIX_comm_start(smx_synchro_t synchro)
726 {
727   /* If both the sender and the receiver are already there, start the communication */
728   if (synchro->state == SIMIX_READY) {
729
730     smx_host_t sender = synchro->comm.src_proc->smx_host;
731     smx_host_t receiver = synchro->comm.dst_proc->smx_host;
732
733     XBT_DEBUG("Starting communication %p from '%s' to '%s'", synchro,
734               SIMIX_host_get_name(sender), SIMIX_host_get_name(receiver));
735
736     synchro->comm.surf_comm = surf_workstation_model_communicate(surf_workstation_model,
737                                                                     sender, receiver,
738                                                                     synchro->comm.task_size, synchro->comm.rate);
739
740     surf_action_set_data(synchro->comm.surf_comm, synchro);
741
742     synchro->state = SIMIX_RUNNING;
743
744     /* If a link is failed, detect it immediately */
745     if (surf_action_get_state(synchro->comm.surf_comm) == SURF_ACTION_FAILED) {
746       XBT_DEBUG("Communication from '%s' to '%s' failed to start because of a link failure",
747                 SIMIX_host_get_name(sender), SIMIX_host_get_name(receiver));
748       synchro->state = SIMIX_LINK_FAILURE;
749       SIMIX_comm_destroy_internal_actions(synchro);
750     }
751
752     /* If any of the process is suspend, create the synchro but stop its execution,
753        it will be restarted when the sender process resume */
754     if (SIMIX_process_is_suspended(synchro->comm.src_proc) ||
755         SIMIX_process_is_suspended(synchro->comm.dst_proc)) {
756       /* FIXME: check what should happen with the synchro state */
757
758       if (SIMIX_process_is_suspended(synchro->comm.src_proc))
759         XBT_DEBUG("The communication is suspended on startup because src (%s:%s) were suspended since it initiated the communication",
760                   SIMIX_host_get_name(synchro->comm.src_proc->smx_host), synchro->comm.src_proc->name);
761       else
762         XBT_DEBUG("The communication is suspended on startup because dst (%s:%s) were suspended since it initiated the communication",
763                   SIMIX_host_get_name(synchro->comm.dst_proc->smx_host), synchro->comm.dst_proc->name);
764
765       surf_action_suspend(synchro->comm.surf_comm);
766
767     }
768   }
769 }
770
771 /**
772  * \brief Answers the SIMIX simcalls associated to a communication synchro.
773  * \param synchro a finished communication synchro
774  */
775 void SIMIX_comm_finish(smx_synchro_t synchro)
776 {
777   unsigned int destroy_count = 0;
778   smx_simcall_t simcall;
779
780
781   while ((simcall = xbt_fifo_shift(synchro->simcalls))) {
782
783     /* If a waitany simcall is waiting for this synchro to finish, then remove
784        it from the other synchros in the waitany list. Afterwards, get the
785        position of the actual synchro in the waitany dynar and
786        return it as the result of the simcall */
787
788     if (simcall->call == SIMCALL_NONE) //FIXME: maybe a better way to handle this case
789       continue; // if process handling comm is killed
790     if (simcall->call == SIMCALL_COMM_WAITANY) {
791       SIMIX_waitany_remove_simcall_from_actions(simcall);
792       if (!MC_is_active() && !MC_record_replay_is_active())
793         simcall_comm_waitany__set__result(simcall, xbt_dynar_search(simcall_comm_waitany__get__comms(simcall), &synchro));
794     }
795
796     /* If the synchro is still in a rendez-vous point then remove from it */
797     if (synchro->comm.rdv)
798       SIMIX_rdv_remove(synchro->comm.rdv, synchro);
799
800     XBT_DEBUG("SIMIX_comm_finish: synchro state = %d", (int)synchro->state);
801
802     /* Check out for errors */
803     switch (synchro->state) {
804
805     case SIMIX_DONE:
806       XBT_DEBUG("Communication %p complete!", synchro);
807       SIMIX_comm_copy_data(synchro);
808       break;
809
810     case SIMIX_SRC_TIMEOUT:
811       SMX_EXCEPTION(simcall->issuer, timeout_error, 0,
812                     "Communication timeouted because of sender");
813       break;
814
815     case SIMIX_DST_TIMEOUT:
816       SMX_EXCEPTION(simcall->issuer, timeout_error, 0,
817                     "Communication timeouted because of receiver");
818       break;
819
820     case SIMIX_SRC_HOST_FAILURE:
821       if (simcall->issuer == synchro->comm.src_proc)
822         simcall->issuer->context->iwannadie = 1;
823 //          SMX_EXCEPTION(simcall->issuer, host_error, 0, "Host failed");
824       else
825         SMX_EXCEPTION(simcall->issuer, network_error, 0, "Remote peer failed");
826       break;
827
828     case SIMIX_DST_HOST_FAILURE:
829       if (simcall->issuer == synchro->comm.dst_proc)
830         simcall->issuer->context->iwannadie = 1;
831 //          SMX_EXCEPTION(simcall->issuer, host_error, 0, "Host failed");
832       else
833         SMX_EXCEPTION(simcall->issuer, network_error, 0, "Remote peer failed");
834       break;
835
836     case SIMIX_LINK_FAILURE:
837       XBT_DEBUG("Link failure in synchro %p between '%s' and '%s': posting an exception to the issuer: %s (%p) detached:%d",
838                 synchro,
839                 synchro->comm.src_proc ? sg_host_name(synchro->comm.src_proc->smx_host) : NULL,
840                 synchro->comm.dst_proc ? sg_host_name(synchro->comm.dst_proc->smx_host) : NULL,
841                 simcall->issuer->name, simcall->issuer, synchro->comm.detached);
842       if (synchro->comm.src_proc == simcall->issuer) {
843         XBT_DEBUG("I'm source");
844       } else if (synchro->comm.dst_proc == simcall->issuer) {
845         XBT_DEBUG("I'm dest");
846       } else {
847         XBT_DEBUG("I'm neither source nor dest");
848       }
849       SMX_EXCEPTION(simcall->issuer, network_error, 0, "Link failure");
850       break;
851
852     case SIMIX_CANCELED:
853       if (simcall->issuer == synchro->comm.dst_proc)
854         SMX_EXCEPTION(simcall->issuer, cancel_error, 0,
855                       "Communication canceled by the sender");
856       else
857         SMX_EXCEPTION(simcall->issuer, cancel_error, 0,
858                       "Communication canceled by the receiver");
859       break;
860
861     default:
862       xbt_die("Unexpected synchro state in SIMIX_comm_finish: %d", (int)synchro->state);
863     }
864
865     /* if there is an exception during a waitany or a testany, indicate the position of the failed communication */
866     if (simcall->issuer->doexception) {
867       if (simcall->call == SIMCALL_COMM_WAITANY) {
868         simcall->issuer->running_ctx->exception.value = xbt_dynar_search(simcall_comm_waitany__get__comms(simcall), &synchro);
869       }
870       else if (simcall->call == SIMCALL_COMM_TESTANY) {
871         simcall->issuer->running_ctx->exception.value = xbt_dynar_search(simcall_comm_testany__get__comms(simcall), &synchro);
872       }
873     }
874
875     if (surf_resource_get_state(surf_workstation_resource_priv(simcall->issuer->smx_host)) != SURF_RESOURCE_ON) {
876       simcall->issuer->context->iwannadie = 1;
877     }
878
879     simcall->issuer->waiting_synchro = NULL;
880     xbt_fifo_remove(simcall->issuer->comms, synchro);
881     if(synchro->comm.detached){
882       if(simcall->issuer == synchro->comm.src_proc){
883         if(synchro->comm.dst_proc)
884           xbt_fifo_remove(synchro->comm.dst_proc->comms, synchro);
885       }
886       if(simcall->issuer == synchro->comm.dst_proc){
887         if(synchro->comm.src_proc)
888           xbt_fifo_remove(synchro->comm.src_proc->comms, synchro);
889       }
890     }
891     SIMIX_simcall_answer(simcall);
892     destroy_count++;
893   }
894
895   while (destroy_count-- > 0)
896     SIMIX_comm_destroy(synchro);
897 }
898
899 /**
900  * \brief This function is called when a Surf communication synchro is finished.
901  * \param synchro the corresponding Simix communication
902  */
903 void SIMIX_post_comm(smx_synchro_t synchro)
904 {
905   /* Update synchro state */
906   if (synchro->comm.src_timeout &&
907       surf_action_get_state(synchro->comm.src_timeout) == SURF_ACTION_DONE)
908     synchro->state = SIMIX_SRC_TIMEOUT;
909   else if (synchro->comm.dst_timeout &&
910           surf_action_get_state(synchro->comm.dst_timeout) == SURF_ACTION_DONE)
911     synchro->state = SIMIX_DST_TIMEOUT;
912   else if (synchro->comm.src_timeout &&
913           surf_action_get_state(synchro->comm.src_timeout) == SURF_ACTION_FAILED)
914     synchro->state = SIMIX_SRC_HOST_FAILURE;
915   else if (synchro->comm.dst_timeout &&
916       surf_action_get_state(synchro->comm.dst_timeout) == SURF_ACTION_FAILED)
917     synchro->state = SIMIX_DST_HOST_FAILURE;
918   else if (synchro->comm.surf_comm &&
919           surf_action_get_state(synchro->comm.surf_comm) == SURF_ACTION_FAILED) {
920     XBT_DEBUG("Puta madre. Surf says that the link broke");
921     synchro->state = SIMIX_LINK_FAILURE;
922   } else
923     synchro->state = SIMIX_DONE;
924
925   XBT_DEBUG("SIMIX_post_comm: comm %p, state %d, src_proc %p, dst_proc %p, detached: %d",
926             synchro, (int)synchro->state, synchro->comm.src_proc, synchro->comm.dst_proc, synchro->comm.detached);
927
928   /* destroy the surf actions associated with the Simix communication */
929   SIMIX_comm_destroy_internal_actions(synchro);
930
931   /* if there are simcalls associated with the synchro, then answer them */
932   if (xbt_fifo_size(synchro->simcalls)) {
933     SIMIX_comm_finish(synchro);
934   }
935 }
936
937 void SIMIX_comm_cancel(smx_synchro_t synchro)
938 {
939   /* if the synchro is a waiting state means that it is still in a rdv */
940   /* so remove from it and delete it */
941   if (synchro->state == SIMIX_WAITING) {
942     SIMIX_rdv_remove(synchro->comm.rdv, synchro);
943     synchro->state = SIMIX_CANCELED;
944   }
945   else if (!MC_is_active() /* when running the MC there are no surf actions */
946            && !MC_record_replay_is_active()
947            && (synchro->state == SIMIX_READY || synchro->state == SIMIX_RUNNING)) {
948
949     surf_action_cancel(synchro->comm.surf_comm);
950   }
951 }
952
953 void SIMIX_comm_suspend(smx_synchro_t synchro)
954 {
955   /*FIXME: shall we suspend also the timeout synchro? */
956   if (synchro->comm.surf_comm)
957     surf_action_suspend(synchro->comm.surf_comm);
958   /* in the other case, the action will be suspended on creation, in SIMIX_comm_start() */
959 }
960
961 void SIMIX_comm_resume(smx_synchro_t synchro)
962 {
963   /*FIXME: check what happen with the timeouts */
964   if (synchro->comm.surf_comm)
965     surf_action_resume(synchro->comm.surf_comm);
966   /* in the other case, the synchro were not really suspended yet, see SIMIX_comm_suspend() and SIMIX_comm_start() */
967 }
968
969
970 /************* synchro Getters **************/
971
972 /**
973  *  \brief get the amount remaining from the communication
974  *  \param synchro The communication
975  */
976 double SIMIX_comm_get_remains(smx_synchro_t synchro)
977 {
978   double remains;
979
980   if(!synchro){
981     return 0;
982   }
983
984   switch (synchro->state) {
985
986   case SIMIX_RUNNING:
987     remains = surf_action_get_remains(synchro->comm.surf_comm);
988     break;
989
990   case SIMIX_WAITING:
991   case SIMIX_READY:
992     remains = 0; /*FIXME: check what should be returned */
993     break;
994
995   default:
996     remains = 0; /*FIXME: is this correct? */
997     break;
998   }
999   return remains;
1000 }
1001
1002 e_smx_state_t SIMIX_comm_get_state(smx_synchro_t synchro)
1003 {
1004   return synchro->state;
1005 }
1006
1007 /**
1008  *  \brief Return the user data associated to the sender of the communication
1009  *  \param synchro The communication
1010  *  \return the user data
1011  */
1012 void* SIMIX_comm_get_src_data(smx_synchro_t synchro)
1013 {
1014   return synchro->comm.src_data;
1015 }
1016
1017 /**
1018  *  \brief Return the user data associated to the receiver of the communication
1019  *  \param synchro The communication
1020  *  \return the user data
1021  */
1022 void* SIMIX_comm_get_dst_data(smx_synchro_t synchro)
1023 {
1024   return synchro->comm.dst_data;
1025 }
1026
1027 smx_process_t SIMIX_comm_get_src_proc(smx_synchro_t synchro)
1028 {
1029   return synchro->comm.src_proc;
1030 }
1031
1032 smx_process_t SIMIX_comm_get_dst_proc(smx_synchro_t synchro)
1033 {
1034   return synchro->comm.dst_proc;
1035 }
1036
1037 #ifdef HAVE_LATENCY_BOUND_TRACKING
1038 /**
1039  *  \brief verify if communication is latency bounded
1040  *  \param comm The communication
1041  */
1042 int SIMIX_comm_is_latency_bounded(smx_synchro_t synchro)
1043 {
1044   if(!synchro){
1045     return 0;
1046   }
1047   if (synchro->comm.surf_comm){
1048     XBT_DEBUG("Getting latency limited for surf_action (%p)", synchro->comm.surf_comm);
1049     synchro->latency_limited = surf_network_action_get_latency_limited(synchro->comm.surf_comm);
1050     XBT_DEBUG("synchro limited is %d", synchro->latency_limited);
1051   }
1052   return synchro->latency_limited;
1053 }
1054 #endif
1055
1056 /******************************************************************************/
1057 /*                    SIMIX_comm_copy_data callbacks                       */
1058 /******************************************************************************/
1059 static void (*SIMIX_comm_copy_data_callback) (smx_synchro_t, void*, size_t) =
1060   &SIMIX_comm_copy_pointer_callback;
1061
1062 void
1063 SIMIX_comm_set_copy_data_callback(void (*callback) (smx_synchro_t, void*, size_t))
1064 {
1065   SIMIX_comm_copy_data_callback = callback;
1066 }
1067
1068 void SIMIX_comm_copy_pointer_callback(smx_synchro_t comm, void* buff, size_t buff_size)
1069 {
1070   xbt_assert((buff_size == sizeof(void *)),
1071              "Cannot copy %zu bytes: must be sizeof(void*)", buff_size);
1072   *(void **) (comm->comm.dst_buff) = buff;
1073 }
1074
1075 void SIMIX_comm_copy_buffer_callback(smx_synchro_t comm, void* buff, size_t buff_size)
1076 {
1077   XBT_DEBUG("Copy the data over");
1078   memcpy(comm->comm.dst_buff, buff, buff_size);
1079   if (comm->comm.detached) { // if this is a detached send, the source buffer was duplicated by SMPI sender to make the original buffer available to the application ASAP
1080     xbt_free(buff);
1081     comm->comm.src_buff = NULL;
1082   }
1083 }
1084
1085
1086 /**
1087  *  \brief Copy the communication data from the sender's buffer to the receiver's one
1088  *  \param comm The communication
1089  */
1090 void SIMIX_comm_copy_data(smx_synchro_t comm)
1091 {
1092   size_t buff_size = comm->comm.src_buff_size;
1093   /* If there is no data to be copy then return */
1094   if (!comm->comm.src_buff || !comm->comm.dst_buff || comm->comm.copied)
1095     return;
1096
1097   XBT_DEBUG("Copying comm %p data from %s (%p) -> %s (%p) (%zu bytes)",
1098             comm,
1099             comm->comm.src_proc ? sg_host_name(comm->comm.src_proc->smx_host) : "a finished process",
1100             comm->comm.src_buff,
1101             comm->comm.dst_proc ? sg_host_name(comm->comm.dst_proc->smx_host) : "a finished process",
1102             comm->comm.dst_buff, buff_size);
1103
1104   /* Copy at most dst_buff_size bytes of the message to receiver's buffer */
1105   if (comm->comm.dst_buff_size)
1106     buff_size = MIN(buff_size, *(comm->comm.dst_buff_size));
1107
1108   /* Update the receiver's buffer size to the copied amount */
1109   if (comm->comm.dst_buff_size)
1110     *comm->comm.dst_buff_size = buff_size;
1111
1112   if (buff_size > 0){
1113       if(comm->comm.copy_data_fun)
1114         comm->comm.copy_data_fun (comm, comm->comm.src_buff, buff_size);
1115       else
1116         SIMIX_comm_copy_data_callback (comm, comm->comm.src_buff, buff_size);
1117   }
1118
1119
1120   /* Set the copied flag so we copy data only once */
1121   /* (this function might be called from both communication ends) */
1122   comm->comm.copied = 1;
1123 }