Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
MSG: add a function to register a task copy callback
[simgrid.git] / src / msg / msg_gos.c
1 /* Copyright (c) 2004-2011. The SimGrid Team. All rights reserved.          */
2
3 /* This program is free software; you can redistribute it and/or modify it
4  * under the terms of the license (GNU LGPL) which comes with this package. */
5
6 #include "msg_private.h"
7 #include "msg_mailbox.h"
8 #include "mc/mc.h"
9 #include "xbt/log.h"
10 #include "xbt/sysdep.h"
11
12 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(msg_gos, msg,
13                                 "Logging specific to MSG (gos)");
14
15 /** \ingroup msg_gos_functions
16  * \brief Executes a task and waits for its termination.
17  *
18  * This function is used for describing the behavior of an agent. It
19  * takes only one parameter.
20  * \param task a #m_task_t to execute on the location on which the
21  agent is running.
22  * \return #MSG_OK if the task was successfully completed, #MSG_TASK_CANCELED
23  * or #MSG_HOST_FAILURE otherwise
24  */
25 MSG_error_t MSG_task_execute(m_task_t task)
26 {
27   simdata_task_t simdata = NULL;
28   simdata_process_t p_simdata;
29   e_smx_state_t comp_state;
30   CHECK_HOST();
31
32   simdata = task->simdata;
33
34   xbt_assert(simdata->host_nb == 0,
35               "This is a parallel task. Go to hell.");
36
37 #ifdef HAVE_TRACING
38   TRACE_msg_task_execute_start(task);
39 #endif
40
41   xbt_assert((!simdata->compute) && (task->simdata->isused == 0),
42               "This task is executed somewhere else. Go fix your code! %d",
43               task->simdata->isused);
44
45   XBT_DEBUG("Computing on %s", MSG_process_get_name(MSG_process_self()));
46
47   if (simdata->computation_amount == 0) {
48 #ifdef HAVE_TRACING
49     TRACE_msg_task_execute_end(task);
50 #endif
51     return MSG_OK;
52   }
53
54   m_process_t self = SIMIX_process_self();
55   p_simdata = SIMIX_process_self_get_data(self);
56   simdata->isused=1;
57   simdata->compute =
58       SIMIX_req_host_execute(task->name, p_simdata->m_host->simdata->smx_host,
59                            simdata->computation_amount,
60                            simdata->priority);
61 #ifdef HAVE_TRACING
62   SIMIX_req_set_category(simdata->compute, task->category);
63 #endif
64
65   p_simdata->waiting_action = simdata->compute;
66   comp_state = SIMIX_req_host_execution_wait(simdata->compute);
67   p_simdata->waiting_action = NULL;
68
69   simdata->isused=0;
70
71   XBT_DEBUG("Execution task '%s' finished in state %d", task->name, comp_state);
72   if (comp_state == SIMIX_DONE) {
73     /* action ended, set comm and compute = NULL, the actions is already destroyed in the main function */
74     simdata->computation_amount = 0.0;
75     simdata->comm = NULL;
76     simdata->compute = NULL;
77 #ifdef HAVE_TRACING
78     TRACE_msg_task_execute_end(task);
79 #endif
80     MSG_RETURN(MSG_OK);
81   } else if (SIMIX_req_host_get_state(SIMIX_host_self()) == 0) {
82     /* action ended, set comm and compute = NULL, the actions is already destroyed in the main function */
83     simdata->comm = NULL;
84     simdata->compute = NULL;
85 #ifdef HAVE_TRACING
86     TRACE_msg_task_execute_end(task);
87 #endif
88     MSG_RETURN(MSG_HOST_FAILURE);
89   } else {
90     /* action ended, set comm and compute = NULL, the actions is already destroyed in the main function */
91     simdata->comm = NULL;
92     simdata->compute = NULL;
93 #ifdef HAVE_TRACING
94     TRACE_msg_task_execute_end(task);
95 #endif
96     MSG_RETURN(MSG_TASK_CANCELED);
97   }
98 }
99
100 /** \ingroup m_task_management
101  * \brief Creates a new #m_task_t (a parallel one....).
102  *
103  * A constructor for #m_task_t taking six arguments and returning the
104  corresponding object.
105  * \param name a name for the object. It is for user-level information
106  and can be NULL.
107  * \param host_nb the number of hosts implied in the parallel task.
108  * \param host_list an array of \p host_nb m_host_t.
109  * \param computation_amount an array of \p host_nb
110  doubles. computation_amount[i] is the total number of operations
111  that have to be performed on host_list[i].
112  * \param communication_amount an array of \p host_nb* \p host_nb doubles.
113  * \param data a pointer to any data may want to attach to the new
114  object.  It is for user-level information and can be NULL. It can
115  be retrieved with the function \ref MSG_task_get_data.
116  * \see m_task_t
117  * \return The new corresponding object.
118  */
119 m_task_t
120 MSG_parallel_task_create(const char *name, int host_nb,
121                          const m_host_t * host_list,
122                          double *computation_amount,
123                          double *communication_amount, void *data)
124 {
125   int i;
126   simdata_task_t simdata = xbt_new0(s_simdata_task_t, 1);
127   m_task_t task = xbt_new0(s_m_task_t, 1);
128   task->simdata = simdata;
129
130   /* Task structure */
131   task->name = xbt_strdup(name);
132   task->data = data;
133
134   /* Simulator Data */
135   simdata->computation_amount = 0;
136   simdata->message_size = 0;
137   simdata->compute = NULL;
138   simdata->comm = NULL;
139   simdata->rate = -1.0;
140   simdata->isused = 0;
141   simdata->sender = NULL;
142   simdata->receiver = NULL;
143   simdata->source = NULL;
144
145   simdata->host_nb = host_nb;
146   simdata->host_list = xbt_new0(smx_host_t, host_nb);
147   simdata->comp_amount = computation_amount;
148   simdata->comm_amount = communication_amount;
149
150   for (i = 0; i < host_nb; i++)
151     simdata->host_list[i] = host_list[i]->simdata->smx_host;
152
153   return task;
154 }
155
156 MSG_error_t MSG_parallel_task_execute(m_task_t task)
157 {
158   simdata_task_t simdata = NULL;
159   e_smx_state_t comp_state;
160   simdata_process_t p_simdata;
161   CHECK_HOST();
162
163   simdata = task->simdata;
164   p_simdata = SIMIX_process_self_get_data(SIMIX_process_self());
165
166   xbt_assert((!simdata->compute)
167               && (task->simdata->isused == 0),
168               "This task is executed somewhere else. Go fix your code!");
169
170   xbt_assert(simdata->host_nb,
171               "This is not a parallel task. Go to hell.");
172
173   XBT_DEBUG("Parallel computing on %s", p_simdata->m_host->name);
174
175   simdata->isused=1;
176
177   simdata->compute =
178       SIMIX_req_host_parallel_execute(task->name, simdata->host_nb,
179                                   simdata->host_list,
180                                   simdata->comp_amount,
181                                   simdata->comm_amount, 1.0, -1.0);
182   XBT_DEBUG("Parallel execution action created: %p", simdata->compute);
183
184   p_simdata->waiting_action = simdata->compute;
185   comp_state = SIMIX_req_host_execution_wait(simdata->compute);
186   p_simdata->waiting_action = NULL;
187
188   XBT_DEBUG("Finished waiting for execution of action %p, state = %d", simdata->compute, comp_state);
189
190   simdata->isused=0;
191
192   if (comp_state == SIMIX_DONE) {
193     /* action ended, set comm and compute = NULL, the actions is already destroyed in the main function */
194     simdata->computation_amount = 0.0;
195     simdata->comm = NULL;
196     simdata->compute = NULL;
197     MSG_RETURN(MSG_OK);
198   } else if (SIMIX_req_host_get_state(SIMIX_host_self()) == 0) {
199     /* action ended, set comm and compute = NULL, the actions is already destroyed in the main function */
200     simdata->comm = NULL;
201     simdata->compute = NULL;
202     MSG_RETURN(MSG_HOST_FAILURE);
203   } else {
204     /* action ended, set comm and compute = NULL, the actions is already destroyed in the main function */
205     simdata->comm = NULL;
206     simdata->compute = NULL;
207     MSG_RETURN(MSG_TASK_CANCELED);
208   }
209 }
210
211
212 /** \ingroup msg_gos_functions
213  * \brief Sleep for the specified number of seconds
214  *
215  * Makes the current process sleep until \a time seconds have elapsed.
216  *
217  * \param nb_sec a number of second
218  */
219 MSG_error_t MSG_process_sleep(double nb_sec)
220 {
221   e_smx_state_t state;
222   /*m_process_t proc = MSG_process_self();*/
223
224 #ifdef HAVE_TRACING
225   TRACE_msg_process_sleep_in(MSG_process_self());
226 #endif
227
228   /* create action to sleep */
229   state = SIMIX_req_process_sleep(nb_sec);
230
231   /*proc->simdata->waiting_action = act_sleep;
232
233   FIXME: check if not setting the waiting_action breaks something on msg
234   
235   proc->simdata->waiting_action = NULL;*/
236   
237   if (state == SIMIX_DONE) {
238 #ifdef HAVE_TRACING
239   TRACE_msg_process_sleep_out(MSG_process_self());
240 #endif
241     MSG_RETURN(MSG_OK);
242   } else {
243 #ifdef HAVE_TRACING
244     TRACE_msg_process_sleep_out(MSG_process_self());
245 #endif
246     MSG_RETURN(MSG_HOST_FAILURE);
247   }
248 }
249
250 /** \ingroup msg_gos_functions
251  * \brief Listen on \a channel and waits for receiving a task from \a host.
252  *
253  * It takes three parameters.
254  * \param task a memory location for storing a #m_task_t. It will
255  hold a task when this function will return. Thus \a task should not
256  be equal to \c NULL and \a *task should be equal to \c NULL. If one of
257  those two condition does not hold, there will be a warning message.
258  * \param channel the channel on which the agent should be
259  listening. This value has to be >=0 and < than the maximal
260  number of channels fixed with MSG_set_channel_number().
261  * \param host the host that is to be watched.
262  * \return a #MSG_error_t indicating whether the operation was successful (#MSG_OK), or why it failed otherwise.
263  */
264 MSG_error_t
265 MSG_task_get_from_host(m_task_t * task, m_channel_t channel, m_host_t host)
266 {
267   return MSG_task_get_ext(task, channel, -1, host);
268 }
269
270 /** \ingroup msg_gos_functions
271  * \brief Listen on a channel and wait for receiving a task.
272  *
273  * It takes two parameters.
274  * \param task a memory location for storing a #m_task_t. It will
275  hold a task when this function will return. Thus \a task should not
276  be equal to \c NULL and \a *task should be equal to \c NULL. If one of
277  those two condition does not hold, there will be a warning message.
278  * \param channel the channel on which the agent should be
279  listening. This value has to be >=0 and < than the maximal
280  number of channels fixed with MSG_set_channel_number().
281  * \return a #MSG_error_t indicating whether the operation was successful (#MSG_OK), or why it failed otherwise.
282  */
283 MSG_error_t MSG_task_get(m_task_t * task, m_channel_t channel)
284 {
285   return MSG_task_get_with_timeout(task, channel, -1);
286 }
287
288 /** \ingroup msg_gos_functions
289  * \brief Listen on a channel and wait for receiving a task with a timeout.
290  *
291  * It takes three parameters.
292  * \param task a memory location for storing a #m_task_t. It will
293  hold a task when this function will return. Thus \a task should not
294  be equal to \c NULL and \a *task should be equal to \c NULL. If one of
295  those two condition does not hold, there will be a warning message.
296  * \param channel the channel on which the agent should be
297  listening. This value has to be >=0 and < than the maximal
298  number of channels fixed with MSG_set_channel_number().
299  * \param max_duration the maximum time to wait for a task before giving
300  up. In such a case, #MSG_TRANSFER_FAILURE will be returned, \a task
301  will not be modified and will still be
302  equal to \c NULL when returning.
303  * \return a #MSG_error_t indicating whether the operation was successful (#MSG_OK), or why it failed otherwise.
304  */
305 MSG_error_t
306 MSG_task_get_with_timeout(m_task_t * task, m_channel_t channel,
307                           double max_duration)
308 {
309   return MSG_task_get_ext(task, channel, max_duration, NULL);
310 }
311
312 /** \defgroup msg_gos_functions MSG Operating System Functions
313  *  \brief This section describes the functions that can be used
314  *  by an agent for handling some task.
315  */
316
317 MSG_error_t
318 MSG_task_get_ext(m_task_t * task, m_channel_t channel, double timeout,
319                  m_host_t host)
320 {
321   xbt_assert((channel >= 0)
322               && (channel < msg_global->max_channel), "Invalid channel %d",
323               channel);
324
325   return
326       MSG_mailbox_get_task_ext(MSG_mailbox_get_by_channel
327                                (MSG_host_self(), channel), task, host,
328                                timeout);
329 }
330
331 MSG_error_t
332 MSG_task_receive_from_host(m_task_t * task, const char *alias,
333                            m_host_t host)
334 {
335   return MSG_task_receive_ext(task, alias, -1, host);
336 }
337
338 MSG_error_t MSG_task_receive(m_task_t * task, const char *alias)
339 {
340   return MSG_task_receive_with_timeout(task, alias, -1);
341 }
342
343 MSG_error_t
344 MSG_task_receive_with_timeout(m_task_t * task, const char *alias,
345                               double timeout)
346 {
347   return MSG_task_receive_ext(task, alias, timeout, NULL);
348 }
349
350 MSG_error_t
351 MSG_task_receive_ext(m_task_t * task, const char *alias, double timeout,
352                      m_host_t host)
353 {
354   XBT_DEBUG
355       ("MSG_task_receive_ext: Trying to receive a message on mailbox '%s'",
356        alias);
357   return MSG_mailbox_get_task_ext(MSG_mailbox_get_by_alias(alias), task,
358                                   host, timeout);
359 }
360
361 /** \ingroup msg_gos_functions
362  * \brief Sends a task on a mailbox.
363  *
364  * This is a non blocking function: use MSG_comm_wait() or MSG_comm_test()
365  * to end the communication.
366  *
367  * \param task a #m_task_t to send on another location.
368  * \param alias name of the mailbox to sent the task to
369  * \return the msg_comm_t communication created
370  */
371 msg_comm_t MSG_task_isend(m_task_t task, const char *alias)
372 {
373   return MSG_task_isend_with_matching(task,alias,NULL,NULL);
374 }
375
376 /** \ingroup msg_gos_functions
377  * \brief Sends a task on a mailbox, with support for matching requests
378  *
379  * This is a non blocking function: use MSG_comm_wait() or MSG_comm_test()
380  * to end the communication.
381  *
382  * \param task a #m_task_t to send on another location.
383  * \param alias name of the mailbox to sent the task to
384  * \param match_fun boolean function taking the #match_data provided by sender (here), and the one of the receiver (if any) and returning whether they match
385  * \param match_data user provided data passed to match_fun
386  * \return the msg_comm_t communication created
387  */
388 XBT_INLINE msg_comm_t MSG_task_isend_with_matching(m_task_t task, const char *alias,
389     int (*match_fun)(void*,void*),
390     void *match_data)
391 {
392   simdata_task_t t_simdata = NULL;
393   m_process_t process = MSG_process_self();
394   msg_mailbox_t mailbox = MSG_mailbox_get_by_alias(alias);
395
396   CHECK_HOST();
397
398   /* FIXME: these functions are not traceable */
399
400   /* Prepare the task to send */
401   t_simdata = task->simdata;
402   t_simdata->sender = process;
403   t_simdata->source = ((simdata_process_t) SIMIX_process_self_get_data(process))->m_host;
404
405   xbt_assert(t_simdata->isused == 0,
406               "This task is still being used somewhere else. You cannot send it now. Go fix your code!");
407
408   t_simdata->isused = 1;
409   msg_global->sent_msg++;
410
411   /* Send it by calling SIMIX network layer */
412   msg_comm_t comm = xbt_new0(s_msg_comm_t, 1);
413   comm->task_sent = task;
414   comm->task_received = NULL;
415   comm->status = MSG_OK;
416   comm->s_comm =
417     SIMIX_req_comm_isend(mailbox, t_simdata->message_size,
418                          t_simdata->rate, task, sizeof(void *), match_fun, NULL, match_data, 0);
419   t_simdata->comm = comm->s_comm; /* FIXME: is the field t_simdata->comm still useful? */
420
421   return comm;
422 }
423
424 /** \ingroup msg_gos_functions
425  * \brief Sends a task on a mailbox.
426  *
427  * This is a non blocking detached send function.
428  * Think of it as a best effort send. Keep in mind that the third parameter
429  * is only called if the communication fails. If the communication does work,
430  * it is responsibility of the receiver code to free anything related to
431  * the task, as usual. More details on this can be obtained on
432  * <a href="http://lists.gforge.inria.fr/pipermail/simgrid-user/2011-November/002649.html">this thread</a>
433  * in the SimGrid-user mailing list archive.
434  *
435  * \param task a #m_task_t to send on another location.
436  * \param alias name of the mailbox to sent the task to
437  * \param cleanup a function to destroy the task if the
438  * communication fails (if NULL, MSG_task_destroy() will
439  * be used by default)
440  */
441 void MSG_task_dsend(m_task_t task, const char *alias, void_f_pvoid_t cleanup)
442 {
443   simdata_task_t t_simdata = NULL;
444   m_process_t process = MSG_process_self();
445   msg_mailbox_t mailbox = MSG_mailbox_get_by_alias(alias);
446
447   CHECK_HOST();
448
449   if (cleanup == NULL) {
450     cleanup = (void_f_pvoid_t) MSG_task_destroy;
451   }
452
453   /* FIXME: these functions are not traceable */
454
455   /* Prepare the task to send */
456   t_simdata = task->simdata;
457   t_simdata->sender = process;
458   t_simdata->source = ((simdata_process_t) SIMIX_process_self_get_data(process))->m_host;
459
460   xbt_assert(t_simdata->isused == 0,
461               "This task is still being used somewhere else. You cannot send it now. Go fix your code!");
462
463   t_simdata->isused = 1;
464   msg_global->sent_msg++;
465
466   /* Send it by calling SIMIX network layer */
467   smx_action_t comm = SIMIX_req_comm_isend(mailbox, t_simdata->message_size,
468                        t_simdata->rate, task, sizeof(void *), NULL,cleanup, NULL, 1);
469   t_simdata->comm = comm;
470 }
471
472 /** \ingroup msg_gos_functions
473  * \brief Starts listening for receiving a task from an asynchronous communication.
474  *
475  * This is a non blocking function: use MSG_comm_wait() or MSG_comm_test()
476  * to end the communication.
477  *
478  * \param task a memory location for storing a #m_task_t.
479  * \param name of the mailbox to receive the task on
480  * \return the msg_comm_t communication created
481  */
482 msg_comm_t MSG_task_irecv(m_task_t *task, const char *name)
483 {
484   smx_rdv_t rdv = MSG_mailbox_get_by_alias(name);
485
486   CHECK_HOST();
487
488   /* FIXME: these functions are not tracable */
489
490   /* Sanity check */
491   xbt_assert(task, "Null pointer for the task storage");
492
493   if (*task)
494     XBT_CRITICAL
495         ("MSG_task_irecv() was asked to write in a non empty task struct.");
496
497   /* Try to receive it by calling SIMIX network layer */
498   msg_comm_t comm = xbt_new0(s_msg_comm_t, 1);
499   comm->task_sent = NULL;
500   comm->task_received = task;
501   comm->status = MSG_OK;
502   comm->s_comm = SIMIX_req_comm_irecv(rdv, task, NULL, NULL, NULL);
503
504   return comm;
505 }
506
507 /** \ingroup msg_gos_functions
508  * \brief Checks whether a communication is done, and if yes, finalizes it.
509  * \param comm the communication to test
510  * \return TRUE if the communication is finished
511  * (but it may have failed, use MSG_comm_get_status() to know its status)
512  * or FALSE if the communication is not finished yet
513  * If the status is FALSE, don't forget to use MSG_process_sleep() after the test.
514  */
515 int MSG_comm_test(msg_comm_t comm)
516 {
517   xbt_ex_t e;
518   int finished = 0;
519   TRY {
520     finished = SIMIX_req_comm_test(comm->s_comm);
521
522     if (finished && comm->task_received != NULL) {
523       /* I am the receiver */
524       (*comm->task_received)->simdata->isused = 0;
525     }
526   }
527   CATCH(e) {
528     switch (e.category) {
529
530       case host_error:
531         comm->status = MSG_HOST_FAILURE;
532         finished = 1;
533         break;
534
535       case network_error:
536         comm->status = MSG_TRANSFER_FAILURE;
537         finished = 1;
538         break;
539
540       case timeout_error:
541         comm->status = MSG_TIMEOUT;
542         finished = 1;
543         break;
544
545       default:
546         RETHROW;
547     }
548     xbt_ex_free(e);
549   }
550
551   return finished;
552 }
553
554 /** \ingroup msg_gos_functions
555  * \brief This function checks if a communication is finished.
556  * \param comms a vector of communications
557  * \return the position of the finished communication if any
558  * (but it may have failed, use MSG_comm_get_status() to know its status),
559  * or -1 if none is finished
560  */
561 int MSG_comm_testany(xbt_dynar_t comms)
562 {
563   xbt_ex_t e;
564   int finished_index = -1;
565
566   /* create the equivalent dynar with SIMIX objects */
567   xbt_dynar_t s_comms = xbt_dynar_new(sizeof(smx_action_t), NULL);
568   msg_comm_t comm;
569   unsigned int cursor;
570   xbt_dynar_foreach(comms, cursor, comm) {
571     xbt_dynar_push(s_comms, &comm->s_comm);
572   }
573
574   MSG_error_t status = MSG_OK;
575   TRY {
576     finished_index = SIMIX_req_comm_testany(s_comms);
577   }
578   CATCH(e) {
579     switch (e.category) {
580
581       case host_error:
582         finished_index = e.value;
583         status = MSG_HOST_FAILURE;
584         break;
585
586       case network_error:
587         finished_index = e.value;
588         status = MSG_TRANSFER_FAILURE;
589         break;
590
591       case timeout_error:
592         finished_index = e.value;
593         status = MSG_TIMEOUT;
594         break;
595
596       default:
597         RETHROW;
598     }
599     xbt_ex_free(e);
600   }
601   xbt_dynar_free(&s_comms);
602
603   if (finished_index != -1) {
604     comm = xbt_dynar_get_as(comms, finished_index, msg_comm_t);
605     /* the communication is finished */
606     comm->status = status;
607
608     if (status == MSG_OK && comm->task_received != NULL) {
609       /* I am the receiver */
610       (*comm->task_received)->simdata->isused = 0;
611     }
612   }
613
614   return finished_index;
615 }
616
617 /** \ingroup msg_gos_functions
618  * \brief Destroys a communication.
619  * \param comm the communication to destroy.
620  */
621 void MSG_comm_destroy(msg_comm_t comm)
622 {
623   if (comm->task_received != NULL
624       && *comm->task_received != NULL
625       && MSG_comm_get_status(comm) == MSG_OK) {
626     (*comm->task_received)->simdata->isused = 0;
627   }
628
629   xbt_free(comm);
630 }
631
632 /** \ingroup msg_gos_functions
633  * \brief Wait for the completion of a communication.
634  *
635  * It takes two parameters.
636  * \param comm the communication to wait.
637  * \param timeout Wait until the communication terminates or the timeout occurs
638  * \return MSG_error_t
639  */
640 MSG_error_t MSG_comm_wait(msg_comm_t comm, double timeout)
641 {
642   xbt_ex_t e;
643   TRY {
644     SIMIX_req_comm_wait(comm->s_comm, timeout);
645
646     if (comm->task_received != NULL) {
647       /* I am the receiver */
648       (*comm->task_received)->simdata->isused = 0;
649     }
650
651     /* FIXME: these functions are not traceable */
652   }
653   CATCH(e) {
654     switch (e.category) {
655     case host_error:
656       comm->status = MSG_HOST_FAILURE;
657       break;
658     case network_error:
659       comm->status = MSG_TRANSFER_FAILURE;
660       break;
661     case timeout_error:
662       comm->status = MSG_TIMEOUT;
663       break;
664     default:
665       RETHROW;
666     }
667     xbt_ex_free(e);
668   }
669
670   return comm->status;
671 }
672
673 /** \ingroup msg_gos_functions
674 * \brief This function is called by a sender and permit to wait for each communication
675 *
676 * \param comm a vector of communication
677 * \param nb_elem is the size of the comm vector
678 * \param timeout for each call of MSG_comm_wait
679 */
680 void MSG_comm_waitall(msg_comm_t * comm, int nb_elem, double timeout)
681 {
682   int i = 0;
683   for (i = 0; i < nb_elem; i++) {
684     MSG_comm_wait(comm[i], timeout);
685   }
686 }
687
688 /** \ingroup msg_gos_functions
689  * \brief This function waits for the first communication finished in a list.
690  * \param comms a vector of communications
691  * \return the position of the first finished communication
692  * (but it may have failed, use MSG_comm_get_status() to know its status)
693  */
694 int MSG_comm_waitany(xbt_dynar_t comms)
695 {
696   xbt_ex_t e;
697   int finished_index = -1;
698
699   /* create the equivalent dynar with SIMIX objects */
700   xbt_dynar_t s_comms = xbt_dynar_new(sizeof(smx_action_t), NULL);
701   msg_comm_t comm;
702   unsigned int cursor;
703   xbt_dynar_foreach(comms, cursor, comm) {
704     xbt_dynar_push(s_comms, &comm->s_comm);
705   }
706
707   MSG_error_t status = MSG_OK;
708   TRY {
709     finished_index = SIMIX_req_comm_waitany(s_comms);
710   }
711   CATCH(e) {
712     switch (e.category) {
713
714       case host_error:
715         finished_index = e.value;
716         status = MSG_HOST_FAILURE;
717         break;
718
719       case network_error:
720         finished_index = e.value;
721         status = MSG_TRANSFER_FAILURE;
722         break;
723
724       case timeout_error:
725         finished_index = e.value;
726         status = MSG_TIMEOUT;
727         break;
728
729       default:
730         RETHROW;
731     }
732     xbt_ex_free(e);
733   }
734
735   xbt_assert(finished_index != -1, "WaitAny returned -1");
736   xbt_dynar_free(&s_comms);
737
738   comm = xbt_dynar_get_as(comms, finished_index, msg_comm_t);
739   /* the communication is finished */
740   comm->status = status;
741
742   return finished_index;
743 }
744
745 /**
746  * \ingroup msg_gos_functions
747  * \brief Returns the error (if any) that occured during a finished communication.
748  * \param comm a finished communication
749  * \return the status of the communication, or MSG_OK if no error occured
750  * during the communication
751  */
752 MSG_error_t MSG_comm_get_status(msg_comm_t comm) {
753
754   return comm->status;
755 }
756
757 m_task_t MSG_comm_get_task(msg_comm_t comm)
758 {
759   xbt_assert(comm, "Invalid parameter");
760
761   return comm->task_received ? *comm->task_received : comm->task_sent;
762 }
763
764 /**
765  * \brief This function is called by SIMIX to copy the data of a comm.
766  * \param comm the comm
767  * \param buff_size size of the buffer
768  */
769 void MSG_comm_copy_data_from_SIMIX(smx_action_t comm, size_t buff_size) {
770
771   // copy the task
772   SIMIX_comm_copy_pointer_callback(comm, buff_size);
773
774   // notify the user callback if any
775   if (msg_global->task_copy_callback) {
776     msg_global->task_copy_callback(SIMIX_req_comm_get_src_data(comm),
777         SIMIX_req_comm_get_src_proc(comm), SIMIX_req_comm_get_dst_proc(comm));
778   }
779 }
780
781 /** \ingroup msg_gos_functions
782  * \brief Put a task on a channel of an host and waits for the end of the
783  * transmission.
784  *
785  * This function is used for describing the behavior of an agent. It
786  * takes three parameter.
787  * \param task a #m_task_t to send on another location. This task
788  will not be usable anymore when the function will return. There is
789  no automatic task duplication and you have to save your parameters
790  before calling this function. Tasks are unique and once it has been
791  sent to another location, you should not access it anymore. You do
792  not need to call MSG_task_destroy() but to avoid using, as an
793  effect of inattention, this task anymore, you definitely should
794  renitialize it with #MSG_TASK_UNINITIALIZED. Note that this task
795  can be transfered iff it has been correctly created with
796  MSG_task_create().
797  * \param dest the destination of the message
798  * \param channel the channel on which the agent should put this
799  task. This value has to be >=0 and < than the maximal number of
800  channels fixed with MSG_set_channel_number().
801  * \return #MSG_HOST_FAILURE if the host on which
802  * this function was called was shut down,
803  * #MSG_TRANSFER_FAILURE if the transfer could not be properly done
804  * (network failure, dest failure) or #MSG_OK if it succeeded.
805  */
806 MSG_error_t MSG_task_put(m_task_t task, m_host_t dest, m_channel_t channel)
807 {
808   return MSG_task_put_with_timeout(task, dest, channel, -1.0);
809 }
810
811 /** \ingroup msg_gos_functions
812  * \brief Does exactly the same as MSG_task_put but with a bounded transmition
813  * rate.
814  *
815  * \sa MSG_task_put
816  */
817 MSG_error_t
818 MSG_task_put_bounded(m_task_t task, m_host_t dest, m_channel_t channel,
819                      double maxrate)
820 {
821   task->simdata->rate = maxrate;
822   return MSG_task_put(task, dest, channel);
823 }
824
825 /** \ingroup msg_gos_functions \brief Put a task on a channel of an
826  * host (with a timeout on the waiting of the destination host) and
827  * waits for the end of the transmission.
828  *
829  * This function is used for describing the behavior of an agent. It
830  * takes four parameter.
831  * \param task a #m_task_t to send on another location. This task
832  will not be usable anymore when the function will return. There is
833  no automatic task duplication and you have to save your parameters
834  before calling this function. Tasks are unique and once it has been
835  sent to another location, you should not access it anymore. You do
836  not need to call MSG_task_destroy() but to avoid using, as an
837  effect of inattention, this task anymore, you definitely should
838  renitialize it with #MSG_TASK_UNINITIALIZED. Note that this task
839  can be transfered iff it has been correctly created with
840  MSG_task_create().
841  * \param dest the destination of the message
842  * \param channel the channel on which the agent should put this
843  task. This value has to be >=0 and < than the maximal number of
844  channels fixed with MSG_set_channel_number().
845  * \param timeout the maximum time to wait for a task before giving
846  up. In such a case, #MSG_TRANSFER_FAILURE will be returned, \a task
847  will not be modified
848  * \return #MSG_HOST_FAILURE if the host on which
849 this function was called was shut down,
850 #MSG_TRANSFER_FAILURE if the transfer could not be properly done
851 (network failure, dest failure, timeout...) or #MSG_OK if the communication succeeded.
852  */
853 MSG_error_t
854 MSG_task_put_with_timeout(m_task_t task, m_host_t dest,
855                           m_channel_t channel, double timeout)
856 {
857   xbt_assert((channel >= 0)
858               && (channel < msg_global->max_channel), "Invalid channel %d",
859               channel);
860
861   XBT_DEBUG("MSG_task_put_with_timout: Trying to send a task to '%s'", dest->name);
862   return
863       MSG_mailbox_put_with_timeout(MSG_mailbox_get_by_channel
864                                    (dest, channel), task, timeout);
865 }
866
867 MSG_error_t MSG_task_send(m_task_t task, const char *alias)
868 {
869   XBT_DEBUG("MSG_task_send: Trying to send a message on mailbox '%s'", alias);
870   return MSG_task_send_with_timeout(task, alias, -1);
871 }
872
873
874 MSG_error_t
875 MSG_task_send_bounded(m_task_t task, const char *alias, double maxrate)
876 {
877   task->simdata->rate = maxrate;
878   return MSG_task_send(task, alias);
879 }
880
881
882 MSG_error_t
883 MSG_task_send_with_timeout(m_task_t task, const char *alias,
884                            double timeout)
885 {
886   return MSG_mailbox_put_with_timeout(MSG_mailbox_get_by_alias(alias),
887                                       task, timeout);
888 }
889
890 int MSG_task_listen(const char *alias)
891 {
892   CHECK_HOST();
893
894   return !MSG_mailbox_is_empty(MSG_mailbox_get_by_alias(alias));
895 }
896
897 /** \ingroup msg_gos_functions
898  * \brief Test whether there is a pending communication on a channel.
899  *
900  * It takes one parameter.
901  * \param channel the channel on which the agent should be
902  listening. This value has to be >=0 and < than the maximal
903  number of channels fixed with MSG_set_channel_number().
904  * \return 1 if there is a pending communication and 0 otherwise
905  */
906 int MSG_task_Iprobe(m_channel_t channel)
907 {
908   xbt_assert((channel >= 0)
909               && (channel < msg_global->max_channel), "Invalid channel %d",
910               channel);
911
912   CHECK_HOST();
913
914   return
915       !MSG_mailbox_is_empty(MSG_mailbox_get_by_channel
916                             (MSG_host_self(), channel));
917 }
918
919 /** \ingroup msg_gos_functions
920
921  * \brief Return the number of tasks waiting to be received on a \a
922  channel and sent by \a host.
923  *
924  * It takes two parameters.
925  * \param channel the channel on which the agent should be
926  listening. This value has to be >=0 and < than the maximal
927  number of channels fixed with MSG_set_channel_number().
928  * \param host the host that is to be watched.
929  * \return the number of tasks waiting to be received on \a channel
930  and sent by \a host.
931  */
932 int MSG_task_probe_from_host(int channel, m_host_t host)
933 {
934   xbt_assert((channel >= 0)
935               && (channel < msg_global->max_channel), "Invalid channel %d",
936               channel);
937
938   CHECK_HOST();
939
940   return
941       MSG_mailbox_get_count_host_waiting_tasks(MSG_mailbox_get_by_channel
942                                                (MSG_host_self(), channel),
943                                                host);
944
945 }
946
947 int MSG_task_listen_from_host(const char *alias, m_host_t host)
948 {
949   CHECK_HOST();
950
951   return
952       MSG_mailbox_get_count_host_waiting_tasks(MSG_mailbox_get_by_alias
953                                                (alias), host);
954 }
955
956 /** \ingroup msg_gos_functions
957  * \brief Test whether there is a pending communication on a channel, and who sent it.
958  *
959  * It takes one parameter.
960  * \param channel the channel on which the agent should be
961  listening. This value has to be >=0 and < than the maximal
962  number of channels fixed with MSG_set_channel_number().
963  * \return -1 if there is no pending communication and the PID of the process who sent it otherwise
964  */
965 int MSG_task_probe_from(m_channel_t channel)
966 {
967   m_task_t task;
968
969   CHECK_HOST();
970
971   xbt_assert((channel >= 0)
972               && (channel < msg_global->max_channel), "Invalid channel %d",
973               channel);
974
975   if (NULL ==
976       (task =
977        MSG_mailbox_get_head(MSG_mailbox_get_by_channel
978                             (MSG_host_self(), channel))))
979     return -1;
980
981   return MSG_process_get_PID(task->simdata->sender);
982 }
983
984 int MSG_task_listen_from(const char *alias)
985 {
986   m_task_t task;
987
988   CHECK_HOST();
989
990   if (NULL ==
991       (task = MSG_mailbox_get_head(MSG_mailbox_get_by_alias(alias))))
992     return -1;
993
994   return MSG_process_get_PID(task->simdata->sender);
995 }
996
997 #ifdef MSG_USE_DEPRECATED
998 /** \ingroup msg_gos_functions
999  *
1000  * \brief Return the last value returned by a MSG function (except
1001  * MSG_get_errno...).
1002  */
1003 MSG_error_t MSG_get_errno(void)
1004 {
1005   return PROCESS_GET_ERRNO();
1006 }
1007 #endif