Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
[smpi] add handling of non-contiguous data
[simgrid.git] / src / simix / smx_network.c
1 /* Copyright (c) 2009, 2010. The SimGrid Team.
2  * All rights reserved.                                                     */
3
4 /* This program is free software; you can redistribute it and/or modify it
5  * under the terms of the license (GNU LGPL) which comes with this package. */
6
7 #include "smx_private.h"
8 #include "xbt/log.h"
9 #include "mc/mc.h"
10 #include "xbt/dict.h"
11
12 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(simix_network, simix,
13                                 "Logging specific to SIMIX (network)");
14
15 static xbt_dict_t rdv_points = NULL;
16 unsigned long int smx_total_comms = 0;
17
18 static void SIMIX_waitany_remove_simcall_from_actions(smx_simcall_t simcall);
19 static void SIMIX_comm_copy_data(smx_action_t comm);
20 static smx_action_t SIMIX_comm_new(e_smx_comm_type_t type);
21 static XBT_INLINE void SIMIX_rdv_push(smx_rdv_t rdv, smx_action_t comm);
22 static smx_action_t SIMIX_fifo_probe_comm(xbt_fifo_t fifo, e_smx_comm_type_t type,
23                                         int (*match_fun)(void *, void *,smx_action_t),
24                                         void *user_data, smx_action_t my_action);
25 static smx_action_t SIMIX_fifo_get_comm(xbt_fifo_t fifo, e_smx_comm_type_t type,
26                                         int (*match_fun)(void *, void *,smx_action_t),
27                                         void *user_data, smx_action_t my_action);
28 static void SIMIX_rdv_free(void *data);
29
30 void SIMIX_network_init(void)
31 {
32   rdv_points = xbt_dict_new_homogeneous(SIMIX_rdv_free);
33 }
34
35 void SIMIX_network_exit(void)
36 {
37   xbt_dict_free(&rdv_points);
38 }
39
40 /******************************************************************************/
41 /*                           Rendez-Vous Points                               */
42 /******************************************************************************/
43
44 smx_rdv_t SIMIX_rdv_create(const char *name)
45 {
46   /* two processes may have pushed the same rdv_create simcall at the same time */
47   smx_rdv_t rdv = name ? xbt_dict_get_or_null(rdv_points, name) : NULL;
48
49   if (!rdv) {
50     rdv = xbt_new0(s_smx_rvpoint_t, 1);
51     rdv->name = name ? xbt_strdup(name) : NULL;
52     rdv->comm_fifo = xbt_fifo_new();
53     rdv->done_comm_fifo = xbt_fifo_new();
54     rdv->permanent_receiver=NULL;
55
56     XBT_DEBUG("Creating a mailbox at %p with name %s\n", rdv, name);
57
58     if (rdv->name)
59       xbt_dict_set(rdv_points, rdv->name, rdv, NULL);
60   }
61   return rdv;
62 }
63
64 void SIMIX_rdv_destroy(smx_rdv_t rdv)
65 {
66   if (rdv->name)
67     xbt_dict_remove(rdv_points, rdv->name);
68 }
69
70 void SIMIX_rdv_free(void *data)
71 {
72   XBT_DEBUG("rdv free %p", data);
73   smx_rdv_t rdv = (smx_rdv_t) data;
74   xbt_free(rdv->name);
75   xbt_fifo_free(rdv->comm_fifo);
76   xbt_fifo_free(rdv->done_comm_fifo);
77
78   xbt_free(rdv);  
79 }
80
81 xbt_dict_t SIMIX_get_rdv_points()
82 {
83   return rdv_points;
84 }
85
86 smx_rdv_t SIMIX_rdv_get_by_name(const char *name)
87 {
88   return xbt_dict_get_or_null(rdv_points, name);
89 }
90
91 int SIMIX_rdv_comm_count_by_host(smx_rdv_t rdv, smx_host_t host)
92 {
93   smx_action_t comm = NULL;
94   xbt_fifo_item_t item = NULL;
95   int count = 0;
96
97   xbt_fifo_foreach(rdv->comm_fifo, item, comm, smx_action_t) {
98     if (comm->comm.src_proc->smx_host == host)
99       count++;
100   }
101
102   return count;
103 }
104
105 smx_action_t SIMIX_rdv_get_head(smx_rdv_t rdv)
106 {
107   return xbt_fifo_get_item_content(xbt_fifo_get_first_item(rdv->comm_fifo));
108 }
109
110 /**
111  *  \brief get the receiver (process associated to the mailbox)
112  *  \param rdv The rendez-vous point
113  *  \return process The receiving process (NULL if not set)
114  */
115 smx_process_t SIMIX_rdv_get_receiver(smx_rdv_t rdv)
116 {
117   return rdv->permanent_receiver;
118 }
119
120 /**
121  *  \brief set the receiver of the rendez vous point to allow eager sends
122  *  \param rdv The rendez-vous point
123  *  \param process The receiving process
124  */
125 void SIMIX_rdv_set_receiver(smx_rdv_t rdv , smx_process_t process)
126 {
127   rdv->permanent_receiver=process;
128 }
129
130 /**
131  *  \brief Pushes a communication action into a rendez-vous point
132  *  \param rdv The rendez-vous point
133  *  \param comm The communication action
134  */
135 static XBT_INLINE void SIMIX_rdv_push(smx_rdv_t rdv, smx_action_t comm)
136 {
137   xbt_fifo_push(rdv->comm_fifo, comm);
138   comm->comm.rdv = rdv;
139 }
140
141 /**
142  *  \brief Removes a communication action from a rendez-vous point
143  *  \param rdv The rendez-vous point
144  *  \param comm The communication action
145  */
146 XBT_INLINE void SIMIX_rdv_remove(smx_rdv_t rdv, smx_action_t comm)
147 {
148   xbt_fifo_remove(rdv->comm_fifo, comm);
149   comm->comm.rdv = NULL;
150 }
151
152 /**
153  *  \brief Checks if there is a communication action queued in a fifo matching our needs
154  *  \param type The type of communication we are looking for (comm_send, comm_recv)
155  *  \return The communication action if found, NULL otherwise
156  */
157 smx_action_t SIMIX_fifo_get_comm(xbt_fifo_t fifo, e_smx_comm_type_t type,
158                                  int (*match_fun)(void *, void *,smx_action_t),
159                                  void *this_user_data, smx_action_t my_action)
160 {
161   smx_action_t action;
162   xbt_fifo_item_t item;
163   void* other_user_data = NULL;
164
165   xbt_fifo_foreach(fifo, item, action, smx_action_t) {
166     if (action->comm.type == SIMIX_COMM_SEND) {
167       other_user_data = action->comm.src_data;
168     } else if (action->comm.type == SIMIX_COMM_RECEIVE) {
169       other_user_data = action->comm.dst_data;
170     }
171     if (action->comm.type == type &&
172         (!match_fun              ||              match_fun(this_user_data,  other_user_data, action)) &&
173         (!action->comm.match_fun || action->comm.match_fun(other_user_data, this_user_data,  my_action))) {
174       XBT_DEBUG("Found a matching communication action %p", action);
175       xbt_fifo_remove_item(fifo, item);
176       xbt_fifo_free_item(item);
177       action->comm.refcount++;
178       action->comm.rdv = NULL;
179       return action;
180     }
181     XBT_DEBUG("Sorry, communication action %p does not match our needs:"
182               " its type is %d but we are looking for a comm of type %d (or maybe the filtering didn't match)",
183               action, (int)action->comm.type, (int)type);
184   }
185   XBT_DEBUG("No matching communication action found");
186   return NULL;
187 }
188
189
190 /**
191  *  \brief Checks if there is a communication action queued in a fifo matching our needs, but leave it there
192  *  \param type The type of communication we are looking for (comm_send, comm_recv)
193  *  \return The communication action if found, NULL otherwise
194  */
195 smx_action_t SIMIX_fifo_probe_comm(xbt_fifo_t fifo, e_smx_comm_type_t type,
196                                  int (*match_fun)(void *, void *,smx_action_t),
197                                  void *this_user_data, smx_action_t my_action)
198 {
199   smx_action_t action;
200   xbt_fifo_item_t item;
201   void* other_user_data = NULL;
202
203   xbt_fifo_foreach(fifo, item, action, smx_action_t) {
204     if (action->comm.type == SIMIX_COMM_SEND) {
205       other_user_data = action->comm.src_data;
206     } else if (action->comm.type == SIMIX_COMM_RECEIVE) {
207       other_user_data = action->comm.dst_data;
208     }
209     if (action->comm.type == type &&
210         (!match_fun              ||              match_fun(this_user_data,  other_user_data, action)) &&
211         (!action->comm.match_fun || action->comm.match_fun(other_user_data, this_user_data,  my_action))) {
212       XBT_DEBUG("Found a matching communication action %p", action);
213       action->comm.refcount++;
214
215       return action;
216     }
217     XBT_DEBUG("Sorry, communication action %p does not match our needs:"
218               " its type is %d but we are looking for a comm of type %d (or maybe the filtering didn't match)",
219               action, (int)action->comm.type, (int)type);
220   }
221   XBT_DEBUG("No matching communication action found");
222   return NULL;
223 }
224 /******************************************************************************/
225 /*                            Communication Actions                            */
226 /******************************************************************************/
227
228 /**
229  *  \brief Creates a new communicate action
230  *  \param type The direction of communication (comm_send, comm_recv)
231  *  \return The new communicate action
232  */
233 smx_action_t SIMIX_comm_new(e_smx_comm_type_t type)
234 {
235   smx_action_t act;
236
237   /* alloc structures */
238   act = xbt_mallocator_get(simix_global->action_mallocator);
239
240   act->type = SIMIX_ACTION_COMMUNICATE;
241   act->state = SIMIX_WAITING;
242
243   /* set communication */
244   act->comm.type = type;
245   act->comm.refcount = 1;
246
247 #ifdef HAVE_LATENCY_BOUND_TRACKING
248   //initialize with unknown value
249   act->latency_limited = -1;
250 #endif
251
252 #ifdef HAVE_TRACING
253   act->category = NULL;
254 #endif
255
256   XBT_DEBUG("Create communicate action %p", act);
257   ++smx_total_comms;
258
259   return act;
260 }
261
262 /**
263  *  \brief Destroy a communicate action
264  *  \param action The communicate action to be destroyed
265  */
266 void SIMIX_comm_destroy(smx_action_t action)
267 {
268   XBT_DEBUG("Destroy action %p (refcount: %d), state: %d",
269             action, action->comm.refcount, (int)action->state);
270
271   if (action->comm.refcount <= 0) {
272     xbt_backtrace_display_current();
273     xbt_die("The refcount of comm %p is already 0 before decreasing it. "
274             "That's a bug! If you didn't test and/or wait the same communication twice in your code, then the bug is SimGrid's...", action);
275   }
276   action->comm.refcount--;
277   if (action->comm.refcount > 0)
278     return;
279   XBT_DEBUG("Really free communication %p; refcount is now %d", action,
280             action->comm.refcount);
281
282 #ifdef HAVE_LATENCY_BOUND_TRACKING
283   action->latency_limited = SIMIX_comm_is_latency_bounded( action ) ;
284 #endif
285
286   xbt_free(action->name);
287   SIMIX_comm_destroy_internal_actions(action);
288
289   if (action->comm.detached && action->state != SIMIX_DONE) {
290     /* the communication has failed and was detached:
291      * we have to free the buffer */
292     if (action->comm.clean_fun) {
293       action->comm.clean_fun(action->comm.src_buff);
294     }
295     action->comm.src_buff = NULL;
296   }
297
298   xbt_mallocator_release(simix_global->action_mallocator, action);
299 }
300
301 void SIMIX_comm_destroy_internal_actions(smx_action_t action)
302 {
303   if (action->comm.surf_comm){
304 #ifdef HAVE_LATENCY_BOUND_TRACKING
305     action->latency_limited = SIMIX_comm_is_latency_bounded(action);
306 #endif
307     action->comm.surf_comm->model_type->action_unref(action->comm.surf_comm);
308     action->comm.surf_comm = NULL;
309   }
310
311   if (action->comm.src_timeout){
312     action->comm.src_timeout->model_type->action_unref(action->comm.src_timeout);
313     action->comm.src_timeout = NULL;
314   }
315
316   if (action->comm.dst_timeout){
317     action->comm.dst_timeout->model_type->action_unref(action->comm.dst_timeout);
318     action->comm.dst_timeout = NULL;
319   }
320 }
321
322 smx_action_t SIMIX_comm_isend(smx_process_t src_proc, smx_rdv_t rdv,
323                               double task_size, double rate,
324                               void *src_buff, size_t src_buff_size,
325                               int (*match_fun)(void *, void *,smx_action_t),
326                               void (*clean_fun)(void *), // used to free the action in case of problem after a detached send
327                               void *data,
328                               int detached)
329 {
330   XBT_DEBUG("send from %p\n", rdv);
331
332   /* Prepare an action describing us, so that it gets passed to the user-provided filter of other side */
333   smx_action_t this_action = SIMIX_comm_new(SIMIX_COMM_SEND);
334
335   /* Look for communication action matching our needs. We also provide a description of
336    * ourself so that the other side also gets a chance of choosing if it wants to match with us.
337    *
338    * If it is not found then push our communication into the rendez-vous point */
339   smx_action_t other_action = SIMIX_fifo_get_comm(rdv->comm_fifo, SIMIX_COMM_RECEIVE, match_fun, data, this_action);
340
341   if (!other_action) {
342     other_action = this_action;
343
344     if (rdv->permanent_receiver!=NULL){
345       //this mailbox is for small messages, which have to be sent right now
346       other_action->state = SIMIX_READY;
347       other_action->comm.dst_proc=rdv->permanent_receiver;
348       other_action->comm.refcount++;
349       other_action->comm.rdv = rdv;
350       xbt_fifo_push(rdv->done_comm_fifo,other_action);
351       other_action->comm.rdv=rdv;
352       XBT_DEBUG("pushing a message into the permanent receive fifo %p, comm %p \n", rdv, &(other_action->comm));
353
354     }else{
355       SIMIX_rdv_push(rdv, this_action);
356     }
357   } else {
358     XBT_DEBUG("Receive already pushed\n");
359
360     SIMIX_comm_destroy(this_action);
361     --smx_total_comms; // this creation was a pure waste
362
363     other_action->state = SIMIX_READY;
364     other_action->comm.type = SIMIX_COMM_READY;
365
366   }
367   xbt_fifo_push(src_proc->comms, other_action);
368
369   /* if the communication action is detached then decrease the refcount
370    * by one, so it will be eliminated by the receiver's destroy call */
371   if (detached) {
372     other_action->comm.detached = 1;
373     other_action->comm.refcount--;
374     other_action->comm.clean_fun = clean_fun;
375   } else {
376     other_action->comm.clean_fun = NULL;
377   }
378
379   /* Setup the communication action */
380   other_action->comm.src_proc = src_proc;
381   other_action->comm.task_size = task_size;
382   other_action->comm.rate = rate;
383   other_action->comm.src_buff = src_buff;
384   other_action->comm.src_buff_size = src_buff_size;
385   other_action->comm.src_data = data;
386
387   other_action->comm.match_fun = match_fun;
388
389   if (MC_IS_ENABLED) {
390     other_action->state = SIMIX_RUNNING;
391     return other_action;
392   }
393
394   SIMIX_comm_start(other_action);
395   return (detached ? NULL : other_action);
396 }
397
398 smx_action_t SIMIX_comm_irecv(smx_process_t dst_proc, smx_rdv_t rdv,
399                               void *dst_buff, size_t *dst_buff_size,
400                               int (*match_fun)(void *, void *, smx_action_t), void *data)
401 {
402   XBT_DEBUG("recv from %p %p\n", rdv, rdv->comm_fifo);
403   smx_action_t this_action = SIMIX_comm_new(SIMIX_COMM_RECEIVE);
404
405   smx_action_t other_action;
406   //communication already done, get it inside the fifo of completed comms
407   //permanent receive v1
408   //int already_received=0;
409   if(rdv->permanent_receiver && xbt_fifo_size(rdv->done_comm_fifo)!=0){
410
411     XBT_DEBUG("We have a comm that has probably already been received, trying to match it, to skip the communication\n");
412     //find a match in the already received fifo
413     other_action = SIMIX_fifo_get_comm(rdv->done_comm_fifo, SIMIX_COMM_SEND, match_fun, data, this_action);
414     //if not found, assume the receiver came first, register it to the mailbox in the classical way
415     if (!other_action)  {
416       XBT_DEBUG("We have messages in the permanent receive list, but not the one we are looking for, pushing request into fifo\n");
417       other_action = this_action;
418       SIMIX_rdv_push(rdv, this_action);
419     }else{
420       if(other_action->comm.surf_comm &&        SIMIX_comm_get_remains(other_action)==0.0)
421       {
422         XBT_DEBUG("comm %p has been already sent, and is finished, destroy it\n",&(other_action->comm));
423         other_action->state = SIMIX_DONE;
424         other_action->comm.type = SIMIX_COMM_DONE;
425         other_action->comm.rdv = NULL;
426         //SIMIX_comm_destroy(this_action);
427         //--smx_total_comms; // this creation was a pure waste
428         //already_received=1;
429         //other_action->comm.refcount--;
430       }/*else{
431          XBT_DEBUG("Not yet finished, we have to wait %d\n", xbt_fifo_size(rdv->comm_fifo));
432          }*/
433      // other_action->comm.refcount--;
434       SIMIX_comm_destroy(this_action);
435       --smx_total_comms; // this creation was a pure waste
436     }
437   }else{
438     /* Prepare an action describing us, so that it gets passed to the user-provided filter of other side */
439
440     /* Look for communication action matching our needs. We also provide a description of
441      * ourself so that the other side also gets a chance of choosing if it wants to match with us.
442      *
443      * If it is not found then push our communication into the rendez-vous point */
444     other_action = SIMIX_fifo_get_comm(rdv->comm_fifo, SIMIX_COMM_SEND, match_fun, data, this_action);
445
446     if (!other_action) {
447       XBT_DEBUG("Receive pushed first %d\n", xbt_fifo_size(rdv->comm_fifo));
448       other_action = this_action;
449       SIMIX_rdv_push(rdv, this_action);
450     } else {
451       SIMIX_comm_destroy(this_action);
452       --smx_total_comms; // this creation was a pure waste
453       other_action->state = SIMIX_READY;
454       other_action->comm.type = SIMIX_COMM_READY;
455    //   other_action->comm.refcount--;
456     }
457     xbt_fifo_push(dst_proc->comms, other_action);
458   }
459
460   /* Setup communication action */
461   other_action->comm.dst_proc = dst_proc;
462   other_action->comm.dst_buff = dst_buff;
463   other_action->comm.dst_buff_size = dst_buff_size;
464   other_action->comm.dst_data = data;
465
466   other_action->comm.match_fun = match_fun;
467
468
469   /*if(already_received)//do the actual copy, because the first one after the comm didn't have all the info
470     SIMIX_comm_copy_data(other_action);*/
471
472
473   if (MC_IS_ENABLED) {
474     other_action->state = SIMIX_RUNNING;
475     return other_action;
476   }
477
478   SIMIX_comm_start(other_action);
479   // }
480   return other_action;
481 }
482
483
484 smx_action_t SIMIX_comm_iprobe(smx_process_t dst_proc, smx_rdv_t rdv, int src,
485                               int tag, int (*match_fun)(void *, void *, smx_action_t), void *data)
486 {
487   XBT_DEBUG("iprobe from %p %p\n", rdv, rdv->comm_fifo);
488   smx_action_t this_action = SIMIX_comm_new(SIMIX_COMM_RECEIVE);
489
490   smx_action_t other_action;
491   if(rdv->permanent_receiver && xbt_fifo_size(rdv->done_comm_fifo)!=0){
492     //find a match in the already received fifo
493     other_action = SIMIX_fifo_probe_comm(rdv->done_comm_fifo, SIMIX_COMM_SEND, match_fun, data, this_action);
494   }else{
495     other_action = SIMIX_fifo_probe_comm(rdv->comm_fifo, SIMIX_COMM_SEND, match_fun, data, this_action);
496   }
497   if(other_action)other_action->comm.refcount--;
498
499   SIMIX_comm_destroy(this_action);
500   --smx_total_comms;
501   return other_action;
502 }
503
504 void SIMIX_pre_comm_wait(smx_simcall_t simcall, smx_action_t action, double timeout, int idx)
505 {
506
507   /* the simcall may be a wait, a send or a recv */
508   surf_action_t sleep;
509
510   /* Associate this simcall to the wait action */
511   XBT_DEBUG("SIMIX_pre_comm_wait, %p", action);
512
513   xbt_fifo_push(action->simcalls, simcall);
514   simcall->issuer->waiting_action = action;
515
516   if (MC_IS_ENABLED) {
517     if (idx == 0) {
518       action->state = SIMIX_DONE;
519     } else {
520       /* If we reached this point, the wait simcall must have a timeout */
521       /* Otherwise it shouldn't be enabled and executed by the MC */
522       if (timeout == -1)
523         THROW_IMPOSSIBLE;
524
525       if (action->comm.src_proc == simcall->issuer)
526         action->state = SIMIX_SRC_TIMEOUT;
527       else
528         action->state = SIMIX_DST_TIMEOUT;
529     }
530
531     SIMIX_comm_finish(action);
532     return;
533   }
534
535   /* If the action has already finish perform the error handling, */
536   /* otherwise set up a waiting timeout on the right side         */
537   if (action->state != SIMIX_WAITING && action->state != SIMIX_RUNNING) {
538     SIMIX_comm_finish(action);
539   } else { /* if (timeout >= 0) { we need a surf sleep action even when there is no timeout, otherwise surf won't tell us when the host fails */
540     sleep = surf_workstation_model->extension.workstation.sleep(simcall->issuer->smx_host->host, timeout);
541     surf_workstation_model->action_data_set(sleep, action);
542
543     if (simcall->issuer == action->comm.src_proc)
544       action->comm.src_timeout = sleep;
545     else
546       action->comm.dst_timeout = sleep;
547   }
548 }
549
550 void SIMIX_pre_comm_test(smx_simcall_t simcall)
551 {
552   smx_action_t action = simcall->comm_test.comm;
553
554   if(MC_IS_ENABLED){
555     simcall->comm_test.result = action->comm.src_proc && action->comm.dst_proc;
556     if(simcall->comm_test.result){
557       action->state = SIMIX_DONE;
558       xbt_fifo_push(action->simcalls, simcall);
559       SIMIX_comm_finish(action);
560     }else{
561       SIMIX_simcall_answer(simcall);
562     }
563     return;
564   }
565
566   simcall->comm_test.result = (action->state != SIMIX_WAITING && action->state != SIMIX_RUNNING);
567   if (simcall->comm_test.result) {
568     xbt_fifo_push(action->simcalls, simcall);
569     SIMIX_comm_finish(action);
570   } else {
571     SIMIX_simcall_answer(simcall);
572   }
573 }
574
575 void SIMIX_pre_comm_testany(smx_simcall_t simcall, int idx)
576 {
577   unsigned int cursor;
578   smx_action_t action;
579   xbt_dynar_t actions = simcall->comm_testany.comms;
580   simcall->comm_testany.result = -1;
581
582   if (MC_IS_ENABLED){
583     if(idx == -1){
584       SIMIX_simcall_answer(simcall);
585     }else{
586       action = xbt_dynar_get_as(actions, idx, smx_action_t);
587       simcall->comm_testany.result = idx;
588       xbt_fifo_push(action->simcalls, simcall);
589       action->state = SIMIX_DONE;
590       SIMIX_comm_finish(action);
591     }
592     return;
593   }
594
595   xbt_dynar_foreach(simcall->comm_testany.comms,cursor,action) {
596     if (action->state != SIMIX_WAITING && action->state != SIMIX_RUNNING) {
597       simcall->comm_testany.result = cursor;
598       xbt_fifo_push(action->simcalls, simcall);
599       SIMIX_comm_finish(action);
600       return;
601     }
602   }
603   SIMIX_simcall_answer(simcall);
604 }
605
606 void SIMIX_pre_comm_waitany(smx_simcall_t simcall, int idx)
607 {
608   smx_action_t action;
609   unsigned int cursor = 0;
610   xbt_dynar_t actions = simcall->comm_waitany.comms;
611
612   if (MC_IS_ENABLED){
613     action = xbt_dynar_get_as(actions, idx, smx_action_t);
614     xbt_fifo_push(action->simcalls, simcall);
615     simcall->comm_waitany.result = idx;
616     action->state = SIMIX_DONE;
617     SIMIX_comm_finish(action);
618     return;
619   }
620
621   xbt_dynar_foreach(actions, cursor, action){
622     /* associate this simcall to the the action */
623     xbt_fifo_push(action->simcalls, simcall);
624
625     /* see if the action is already finished */
626     if (action->state != SIMIX_WAITING && action->state != SIMIX_RUNNING){
627       SIMIX_comm_finish(action);
628       break;
629     }
630   }
631 }
632
633 void SIMIX_waitany_remove_simcall_from_actions(smx_simcall_t simcall)
634 {
635   smx_action_t action;
636   unsigned int cursor = 0;
637   xbt_dynar_t actions = simcall->comm_waitany.comms;
638
639   xbt_dynar_foreach(actions, cursor, action) {
640     xbt_fifo_remove(action->simcalls, simcall);
641   }
642 }
643
644 /**
645  *  \brief Starts the simulation of a communication action.
646  *  \param action the communication action
647  */
648 XBT_INLINE void SIMIX_comm_start(smx_action_t action)
649 {
650   /* If both the sender and the receiver are already there, start the communication */
651   if (action->state == SIMIX_READY) {
652
653     smx_host_t sender = action->comm.src_proc->smx_host;
654     smx_host_t receiver = action->comm.dst_proc->smx_host;
655
656     XBT_DEBUG("Starting communication %p from '%s' to '%s'", action,
657               SIMIX_host_get_name(sender), SIMIX_host_get_name(receiver));
658
659     action->comm.surf_comm = surf_workstation_model->extension.workstation.
660       communicate(sender->host, receiver->host, action->comm.task_size, action->comm.rate);
661
662     surf_workstation_model->action_data_set(action->comm.surf_comm, action);
663
664     action->state = SIMIX_RUNNING;
665
666     /* If a link is failed, detect it immediately */
667     if (surf_workstation_model->action_state_get(action->comm.surf_comm) == SURF_ACTION_FAILED) {
668       XBT_DEBUG("Communication from '%s' to '%s' failed to start because of a link failure",
669                 SIMIX_host_get_name(sender), SIMIX_host_get_name(receiver));
670       action->state = SIMIX_LINK_FAILURE;
671       SIMIX_comm_destroy_internal_actions(action);
672     }
673
674     /* If any of the process is suspend, create the action but stop its execution,
675        it will be restarted when the sender process resume */
676     if (SIMIX_process_is_suspended(action->comm.src_proc) ||
677         SIMIX_process_is_suspended(action->comm.dst_proc)) {
678       /* FIXME: check what should happen with the action state */
679
680       if (SIMIX_process_is_suspended(action->comm.src_proc))
681         XBT_DEBUG("The communication is suspended on startup because src (%s:%s) were suspended since it initiated the communication",
682                   SIMIX_host_get_name(action->comm.src_proc->smx_host), action->comm.src_proc->name);
683       else
684         XBT_DEBUG("The communication is suspended on startup because dst (%s:%s) were suspended since it initiated the communication",
685                   SIMIX_host_get_name(action->comm.dst_proc->smx_host), action->comm.dst_proc->name);
686
687       surf_workstation_model->suspend(action->comm.surf_comm);
688
689     }
690   }
691 }
692
693 /**
694  * \brief Answers the SIMIX simcalls associated to a communication action.
695  * \param action a finished communication action
696  */
697 void SIMIX_comm_finish(smx_action_t action)
698 {
699   unsigned int destroy_count = 0;
700   smx_simcall_t simcall;
701
702   while ((simcall = xbt_fifo_shift(action->simcalls))) {
703
704     /* If a waitany simcall is waiting for this action to finish, then remove
705        it from the other actions in the waitany list. Afterwards, get the
706        position of the actual action in the waitany dynar and
707        return it as the result of the simcall */
708     if (simcall->call == SIMCALL_COMM_WAITANY) {
709       SIMIX_waitany_remove_simcall_from_actions(simcall);
710       if (!MC_IS_ENABLED)
711         simcall->comm_waitany.result = xbt_dynar_search(simcall->comm_waitany.comms, &action);
712     }
713
714     /* If the action is still in a rendez-vous point then remove from it */
715     if (action->comm.rdv)
716       SIMIX_rdv_remove(action->comm.rdv, action);
717
718     XBT_DEBUG("SIMIX_comm_finish: action state = %d", (int)action->state);
719
720     /* Check out for errors */
721     switch (action->state) {
722
723     case SIMIX_DONE:
724       XBT_DEBUG("Communication %p complete!", action);
725       SIMIX_comm_copy_data(action);
726       break;
727
728     case SIMIX_SRC_TIMEOUT:
729       SMX_EXCEPTION(simcall->issuer, timeout_error, 0,
730                     "Communication timeouted because of sender");
731       break;
732
733     case SIMIX_DST_TIMEOUT:
734       SMX_EXCEPTION(simcall->issuer, timeout_error, 0,
735                     "Communication timeouted because of receiver");
736       break;
737
738     case SIMIX_SRC_HOST_FAILURE:
739       if (simcall->issuer == action->comm.src_proc)
740         simcall->issuer->context->iwannadie = 1;
741 //          SMX_EXCEPTION(simcall->issuer, host_error, 0, "Host failed");
742       else
743         SMX_EXCEPTION(simcall->issuer, network_error, 0, "Remote peer failed");
744       break;
745
746     case SIMIX_DST_HOST_FAILURE:
747       if (simcall->issuer == action->comm.dst_proc)
748         simcall->issuer->context->iwannadie = 1;
749 //          SMX_EXCEPTION(simcall->issuer, host_error, 0, "Host failed");
750       else
751         SMX_EXCEPTION(simcall->issuer, network_error, 0, "Remote peer failed");
752       break;
753
754     case SIMIX_LINK_FAILURE:
755       XBT_DEBUG("Link failure in action %p between '%s' and '%s': posting an exception to the issuer: %s (%p) detached:%d",
756                 action,
757                 action->comm.src_proc ? action->comm.src_proc->smx_host->name : NULL,
758                 action->comm.dst_proc ? action->comm.dst_proc->smx_host->name : NULL,
759                 simcall->issuer->name, simcall->issuer, action->comm.detached);
760       if (action->comm.src_proc == simcall->issuer) {
761         XBT_DEBUG("I'm source");
762       } else if (action->comm.dst_proc == simcall->issuer) {
763         XBT_DEBUG("I'm dest");
764       } else {
765         XBT_DEBUG("I'm neither source nor dest");
766       }
767       SMX_EXCEPTION(simcall->issuer, network_error, 0, "Link failure");
768       break;
769
770     case SIMIX_CANCELED:
771       if (simcall->issuer == action->comm.dst_proc)
772         SMX_EXCEPTION(simcall->issuer, cancel_error, 0,
773                       "Communication canceled by the sender");
774       else
775         SMX_EXCEPTION(simcall->issuer, cancel_error, 0,
776                       "Communication canceled by the receiver");
777       break;
778
779     default:
780       xbt_die("Unexpected action state in SIMIX_comm_finish: %d", (int)action->state);
781     }
782
783     /* if there is an exception during a waitany or a testany, indicate the position of the failed communication */
784     if (simcall->issuer->doexception) {
785       if (simcall->call == SIMCALL_COMM_WAITANY) {
786         simcall->issuer->running_ctx->exception.value = xbt_dynar_search(simcall->comm_waitany.comms, &action);
787       }
788       else if (simcall->call == SIMCALL_COMM_TESTANY) {
789         simcall->issuer->running_ctx->exception.value = xbt_dynar_search(simcall->comm_testany.comms, &action);
790       }
791     }
792
793     if (surf_workstation_model->extension.
794         workstation.get_state(simcall->issuer->smx_host->host) != SURF_RESOURCE_ON) {
795       simcall->issuer->context->iwannadie = 1;
796     }
797
798     simcall->issuer->waiting_action = NULL;
799     xbt_fifo_remove(simcall->issuer->comms, action);
800     SIMIX_simcall_answer(simcall);
801     destroy_count++;
802   }
803
804   while (destroy_count-- > 0)
805     SIMIX_comm_destroy(action);
806 }
807
808 /**
809  * \brief This function is called when a Surf communication action is finished.
810  * \param action the corresponding Simix communication
811  */
812 void SIMIX_post_comm(smx_action_t action)
813 {
814   /* Update action state */
815   if (action->comm.src_timeout &&
816       surf_workstation_model->action_state_get(action->comm.src_timeout) == SURF_ACTION_DONE)
817     action->state = SIMIX_SRC_TIMEOUT;
818   else if (action->comm.dst_timeout &&
819            surf_workstation_model->action_state_get(action->comm.dst_timeout) == SURF_ACTION_DONE)
820     action->state = SIMIX_DST_TIMEOUT;
821   else if (action->comm.src_timeout &&
822            surf_workstation_model->action_state_get(action->comm.src_timeout) == SURF_ACTION_FAILED)
823     action->state = SIMIX_SRC_HOST_FAILURE;
824   else if (action->comm.dst_timeout &&
825            surf_workstation_model->action_state_get(action->comm.dst_timeout) == SURF_ACTION_FAILED)
826     action->state = SIMIX_DST_HOST_FAILURE;
827   else if (action->comm.surf_comm &&
828            surf_workstation_model->action_state_get(action->comm.surf_comm) == SURF_ACTION_FAILED) {
829     XBT_DEBUG("Puta madre. Surf says that the link broke");
830     action->state = SIMIX_LINK_FAILURE;
831   } else
832     action->state = SIMIX_DONE;
833
834   XBT_DEBUG("SIMIX_post_comm: comm %p, state %d, src_proc %p, dst_proc %p, detached: %d",
835             action, (int)action->state, action->comm.src_proc, action->comm.dst_proc, action->comm.detached);
836
837   /* destroy the surf actions associated with the Simix communication */
838   SIMIX_comm_destroy_internal_actions(action);
839
840   /* remove the communication action from the list of pending communications
841    * of both processes (if they still exist) */
842   if (action->comm.src_proc) {
843     xbt_fifo_remove(action->comm.src_proc->comms, action);
844   }
845   if (action->comm.dst_proc) {
846     xbt_fifo_remove(action->comm.dst_proc->comms, action);
847   }
848
849   /* if there are simcalls associated with the action, then answer them */
850   if (xbt_fifo_size(action->simcalls)) {
851     SIMIX_comm_finish(action);
852   }
853 }
854
855 void SIMIX_comm_cancel(smx_action_t action)
856 {
857   /* if the action is a waiting state means that it is still in a rdv */
858   /* so remove from it and delete it */
859   if (action->state == SIMIX_WAITING) {
860     SIMIX_rdv_remove(action->comm.rdv, action);
861     action->state = SIMIX_CANCELED;
862   }
863   else if (!MC_IS_ENABLED /* when running the MC there are no surf actions */
864            && (action->state == SIMIX_READY || action->state == SIMIX_RUNNING)) {
865
866     surf_workstation_model->action_cancel(action->comm.surf_comm);
867   }
868 }
869
870 void SIMIX_comm_suspend(smx_action_t action)
871 {
872   /*FIXME: shall we suspend also the timeout actions? */
873   if (action->comm.surf_comm)
874     surf_workstation_model->suspend(action->comm.surf_comm);
875   /* in the other case, the action will be suspended on creation, in SIMIX_comm_start() */
876 }
877
878 void SIMIX_comm_resume(smx_action_t action)
879 {
880   /*FIXME: check what happen with the timeouts */
881   if (action->comm.surf_comm)
882     surf_workstation_model->resume(action->comm.surf_comm);
883   /* in the other case, the action were not really suspended yet, see SIMIX_comm_suspend() and SIMIX_comm_start() */
884 }
885
886
887 /************* Action Getters **************/
888
889 /**
890  *  \brief get the amount remaining from the communication
891  *  \param action The communication
892  */
893 double SIMIX_comm_get_remains(smx_action_t action)
894 {
895   double remains;
896
897   if(!action){
898     return 0;
899   }
900
901   switch (action->state) {
902
903   case SIMIX_RUNNING:
904     remains = surf_workstation_model->get_remains(action->comm.surf_comm);
905     break;
906
907   case SIMIX_WAITING:
908   case SIMIX_READY:
909     remains = 0; /*FIXME: check what should be returned */
910     break;
911
912   default:
913     remains = 0; /*FIXME: is this correct? */
914     break;
915   }
916   return remains;
917 }
918
919 e_smx_state_t SIMIX_comm_get_state(smx_action_t action)
920 {
921   return action->state;
922 }
923
924 /**
925  *  \brief Return the user data associated to the sender of the communication
926  *  \param action The communication
927  *  \return the user data
928  */
929 void* SIMIX_comm_get_src_data(smx_action_t action)
930 {
931   return action->comm.src_data;
932 }
933
934 /**
935  *  \brief Return the user data associated to the receiver of the communication
936  *  \param action The communication
937  *  \return the user data
938  */
939 void* SIMIX_comm_get_dst_data(smx_action_t action)
940 {
941   return action->comm.dst_data;
942 }
943
944 smx_process_t SIMIX_comm_get_src_proc(smx_action_t action)
945 {
946   return action->comm.src_proc;
947 }
948
949 smx_process_t SIMIX_comm_get_dst_proc(smx_action_t action)
950 {
951   return action->comm.dst_proc;
952 }
953
#ifdef HAVE_LATENCY_BOUND_TRACKING
/**
 *  \brief verify if communication is latency bounded
 *  \param action The communication (NULL yields 0)
 *  \return the action's latency_limited flag, refreshed from the surf model
 *          first when the communication has a surf action
 */
XBT_INLINE int SIMIX_comm_is_latency_bounded(smx_action_t action)
{
  if (action == NULL)
    return 0;

  if (action->comm.surf_comm) {
    XBT_DEBUG("Getting latency limited for surf_action (%p)", action->comm.surf_comm);
    action->latency_limited =
        surf_workstation_model->get_latency_limited(action->comm.surf_comm);
    XBT_DEBUG("Action limited is %d", action->latency_limited);
  }

  return action->latency_limited;
}
#endif
972
973 /******************************************************************************/
974 /*                    SIMIX_comm_copy_data callbacks                       */
975 /******************************************************************************/
976 static void (*SIMIX_comm_copy_data_callback) (smx_action_t, void*, size_t) =
977   &SIMIX_comm_copy_pointer_callback;
978
979 void
980 SIMIX_comm_set_copy_data_callback(void (*callback) (smx_action_t, void*, size_t))
981 {
982   SIMIX_comm_copy_data_callback = callback;
983 }
984
985 void SIMIX_comm_copy_pointer_callback(smx_action_t comm, void* buff, size_t buff_size)
986 {
987   xbt_assert((buff_size == sizeof(void *)),
988              "Cannot copy %zu bytes: must be sizeof(void*)", buff_size);
989   *(void **) (comm->comm.dst_buff) = buff;
990 }
991
992 void SIMIX_comm_copy_buffer_callback(smx_action_t comm, void* buff, size_t buff_size)
993 {
994   XBT_DEBUG("Copy the data over");
995   memcpy(comm->comm.dst_buff, buff, buff_size);
996   if (comm->comm.detached) { // if this is a detached send, the source buffer was duplicated by SMPI sender to make the original buffer available to the application ASAP
997     xbt_free(buff);
998     comm->comm.src_buff = NULL;
999   }
1000 }
1001
1002
1003 /**
1004  *  \brief Copy the communication data from the sender's buffer to the receiver's one
1005  *  \param comm The communication
1006  */
1007 void SIMIX_comm_copy_data(smx_action_t comm)
1008 {
1009   size_t buff_size = comm->comm.src_buff_size;
1010   /* If there is no data to be copy then return */
1011   if (!comm->comm.src_buff || !comm->comm.dst_buff || comm->comm.copied)
1012     return;
1013
1014   XBT_DEBUG("Copying comm %p data from %s (%p) -> %s (%p) (%zu bytes)",
1015             comm,
1016             comm->comm.src_proc ? comm->comm.src_proc->smx_host->name : "a finished process",
1017             comm->comm.src_buff,
1018             comm->comm.dst_proc ? comm->comm.dst_proc->smx_host->name : "a finished process",
1019             comm->comm.dst_buff, buff_size);
1020
1021   /* Copy at most dst_buff_size bytes of the message to receiver's buffer */
1022   if (comm->comm.dst_buff_size)
1023     buff_size = MIN(buff_size, *(comm->comm.dst_buff_size));
1024
1025   /* Update the receiver's buffer size to the copied amount */
1026   if (comm->comm.dst_buff_size)
1027     *comm->comm.dst_buff_size = buff_size;
1028
1029   if (buff_size > 0)
1030     SIMIX_comm_copy_data_callback (comm, comm->comm.src_buff, buff_size);
1031
1032   /* Set the copied flag so we copy data only once */
1033   /* (this function might be called from both communication ends) */
1034   comm->comm.copied = 1;
1035 }