Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
MSG_comm_destroy should not assume that the task still exists
[simgrid.git] / src / msg / msg_gos.c
1 /* Copyright (c) 2004-2011. The SimGrid Team. All rights reserved.          */
2
3 /* This program is free software; you can redistribute it and/or modify it
4  * under the terms of the license (GNU LGPL) which comes with this package. */
5
6 #include "msg_private.h"
7 #include "msg_mailbox.h"
8 #include "mc/mc.h"
9 #include "xbt/log.h"
10 #include "xbt/sysdep.h"
11 #include "simix/private.h" // FIXME
12
13 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(msg_gos, msg,
14                                 "Logging specific to MSG (gos)");
15
16 /** \ingroup msg_gos_functions
17  * \brief Executes a task and waits for its termination.
18  *
19  * This function is used for describing the behavior of an agent. It
20  * takes only one parameter.
21  * \param task a #m_task_t to execute on the location on which the
22  agent is running.
23  * \return #MSG_OK if the task was successfully completed, #MSG_TASK_CANCELED
24  * or #MSG_HOST_FAILURE otherwise
25  */
26 MSG_error_t MSG_task_execute(m_task_t task)
27 {
28   simdata_task_t simdata = NULL;
29   simdata_process_t p_simdata;
30   e_smx_state_t comp_state;
31   CHECK_HOST();
32
33   simdata = task->simdata;
34
35   xbt_assert(simdata->host_nb == 0,
36               "This is a parallel task. Go to hell.");
37
38 #ifdef HAVE_TRACING
39   TRACE_msg_task_execute_start(task);
40 #endif
41
42   xbt_assert((!simdata->compute) && (task->simdata->isused == 0),
43               "This task is executed somewhere else. Go fix your code! %d",
44               task->simdata->isused);
45
46   XBT_DEBUG("Computing on %s", MSG_process_get_name(MSG_process_self()));
47
48   if (simdata->computation_amount == 0) {
49 #ifdef HAVE_TRACING
50     TRACE_msg_task_execute_end(task);
51 #endif
52     return MSG_OK;
53   }
54
55   m_process_t self = SIMIX_process_self();
56   p_simdata = SIMIX_process_self_get_data(self);
57   simdata->isused=1;
58   simdata->compute =
59       SIMIX_req_host_execute(task->name, p_simdata->m_host->simdata->smx_host,
60                            simdata->computation_amount,
61                            simdata->priority);
62 #ifdef HAVE_TRACING
63   SIMIX_req_set_category(simdata->compute, task->category);
64 #endif
65
66   p_simdata->waiting_action = simdata->compute;
67   comp_state = SIMIX_req_host_execution_wait(simdata->compute);
68   p_simdata->waiting_action = NULL;
69
70   simdata->isused=0;
71
72   XBT_DEBUG("Execution task '%s' finished in state %d", task->name, comp_state);
73   if (comp_state == SIMIX_DONE) {
74     /* action ended, set comm and compute = NULL, the actions is already destroyed in the main function */
75     simdata->computation_amount = 0.0;
76     simdata->comm = NULL;
77     simdata->compute = NULL;
78 #ifdef HAVE_TRACING
79     TRACE_msg_task_execute_end(task);
80 #endif
81     MSG_RETURN(MSG_OK);
82   } else if (SIMIX_req_host_get_state(SIMIX_host_self()) == 0) {
83     /* action ended, set comm and compute = NULL, the actions is already destroyed in the main function */
84     simdata->comm = NULL;
85     simdata->compute = NULL;
86 #ifdef HAVE_TRACING
87     TRACE_msg_task_execute_end(task);
88 #endif
89     MSG_RETURN(MSG_HOST_FAILURE);
90   } else {
91     /* action ended, set comm and compute = NULL, the actions is already destroyed in the main function */
92     simdata->comm = NULL;
93     simdata->compute = NULL;
94 #ifdef HAVE_TRACING
95     TRACE_msg_task_execute_end(task);
96 #endif
97     MSG_RETURN(MSG_TASK_CANCELED);
98   }
99 }
100
101 /** \ingroup m_task_management
102  * \brief Creates a new #m_task_t (a parallel one....).
103  *
104  * A constructor for #m_task_t taking six arguments and returning the
105  corresponding object.
106  * \param name a name for the object. It is for user-level information
107  and can be NULL.
108  * \param host_nb the number of hosts implied in the parallel task.
109  * \param host_list an array of \p host_nb m_host_t.
110  * \param computation_amount an array of \p host_nb
111  doubles. computation_amount[i] is the total number of operations
112  that have to be performed on host_list[i].
113  * \param communication_amount an array of \p host_nb* \p host_nb doubles.
114  * \param data a pointer to any data may want to attach to the new
115  object.  It is for user-level information and can be NULL. It can
116  be retrieved with the function \ref MSG_task_get_data.
117  * \see m_task_t
118  * \return The new corresponding object.
119  */
120 m_task_t
121 MSG_parallel_task_create(const char *name, int host_nb,
122                          const m_host_t * host_list,
123                          double *computation_amount,
124                          double *communication_amount, void *data)
125 {
126   int i;
127   simdata_task_t simdata = xbt_new0(s_simdata_task_t, 1);
128   m_task_t task = xbt_new0(s_m_task_t, 1);
129   task->simdata = simdata;
130
131   /* Task structure */
132   task->name = xbt_strdup(name);
133   task->data = data;
134
135   /* Simulator Data */
136   simdata->computation_amount = 0;
137   simdata->message_size = 0;
138   simdata->compute = NULL;
139   simdata->comm = NULL;
140   simdata->rate = -1.0;
141   simdata->isused = 0;
142   simdata->sender = NULL;
143   simdata->receiver = NULL;
144   simdata->source = NULL;
145
146   simdata->host_nb = host_nb;
147   simdata->host_list = xbt_new0(smx_host_t, host_nb);
148   simdata->comp_amount = computation_amount;
149   simdata->comm_amount = communication_amount;
150
151   for (i = 0; i < host_nb; i++)
152     simdata->host_list[i] = host_list[i]->simdata->smx_host;
153
154   return task;
155 }
156
157 MSG_error_t MSG_parallel_task_execute(m_task_t task)
158 {
159   simdata_task_t simdata = NULL;
160   e_smx_state_t comp_state;
161   simdata_process_t p_simdata;
162   CHECK_HOST();
163
164   simdata = task->simdata;
165   p_simdata = SIMIX_process_self_get_data(SIMIX_process_self());
166
167   xbt_assert((!simdata->compute)
168               && (task->simdata->isused == 0),
169               "This task is executed somewhere else. Go fix your code!");
170
171   xbt_assert(simdata->host_nb,
172               "This is not a parallel task. Go to hell.");
173
174   XBT_DEBUG("Parallel computing on %s", p_simdata->m_host->name);
175
176   simdata->isused=1;
177
178   simdata->compute =
179       SIMIX_req_host_parallel_execute(task->name, simdata->host_nb,
180                                   simdata->host_list,
181                                   simdata->comp_amount,
182                                   simdata->comm_amount, 1.0, -1.0);
183   XBT_DEBUG("Parallel execution action created: %p", simdata->compute);
184
185   p_simdata->waiting_action = simdata->compute;
186   comp_state = SIMIX_req_host_execution_wait(simdata->compute);
187   p_simdata->waiting_action = NULL;
188
189   XBT_DEBUG("Finished waiting for execution of action %p, state = %d", simdata->compute, comp_state);
190
191   simdata->isused=0;
192
193   if (comp_state == SIMIX_DONE) {
194     /* action ended, set comm and compute = NULL, the actions is already destroyed in the main function */
195     simdata->computation_amount = 0.0;
196     simdata->comm = NULL;
197     simdata->compute = NULL;
198     MSG_RETURN(MSG_OK);
199   } else if (SIMIX_req_host_get_state(SIMIX_host_self()) == 0) {
200     /* action ended, set comm and compute = NULL, the actions is already destroyed in the main function */
201     simdata->comm = NULL;
202     simdata->compute = NULL;
203     MSG_RETURN(MSG_HOST_FAILURE);
204   } else {
205     /* action ended, set comm and compute = NULL, the actions is already destroyed in the main function */
206     simdata->comm = NULL;
207     simdata->compute = NULL;
208     MSG_RETURN(MSG_TASK_CANCELED);
209   }
210 }
211
212
213 /** \ingroup msg_gos_functions
214  * \brief Sleep for the specified number of seconds
215  *
216  * Makes the current process sleep until \a time seconds have elapsed.
217  *
218  * \param nb_sec a number of second
219  */
220 MSG_error_t MSG_process_sleep(double nb_sec)
221 {
222   e_smx_state_t state;
223   /*m_process_t proc = MSG_process_self();*/
224
225 #ifdef HAVE_TRACING
226   TRACE_msg_process_sleep_in(MSG_process_self());
227 #endif
228
229   /* create action to sleep */
230   state = SIMIX_req_process_sleep(nb_sec);
231
232   /*proc->simdata->waiting_action = act_sleep;
233
234   FIXME: check if not setting the waiting_action breaks something on msg
235   
236   proc->simdata->waiting_action = NULL;*/
237   
238   if (state == SIMIX_DONE) {
239 #ifdef HAVE_TRACING
240   TRACE_msg_process_sleep_out(MSG_process_self());
241 #endif
242     MSG_RETURN(MSG_OK);
243   } else {
244 #ifdef HAVE_TRACING
245     TRACE_msg_process_sleep_out(MSG_process_self());
246 #endif
247     MSG_RETURN(MSG_HOST_FAILURE);
248   }
249 }
250
251 /** \ingroup msg_gos_functions
252  * \brief Listen on \a channel and waits for receiving a task from \a host.
253  *
254  * It takes three parameters.
255  * \param task a memory location for storing a #m_task_t. It will
256  hold a task when this function will return. Thus \a task should not
257  be equal to \c NULL and \a *task should be equal to \c NULL. If one of
258  those two condition does not hold, there will be a warning message.
259  * \param channel the channel on which the agent should be
260  listening. This value has to be >=0 and < than the maximal
261  number of channels fixed with MSG_set_channel_number().
262  * \param host the host that is to be watched.
263  * \return a #MSG_error_t indicating whether the operation was successful (#MSG_OK), or why it failed otherwise.
264  */
265 MSG_error_t
266 MSG_task_get_from_host(m_task_t * task, m_channel_t channel, m_host_t host)
267 {
268   return MSG_task_get_ext(task, channel, -1, host);
269 }
270
271 /** \ingroup msg_gos_functions
272  * \brief Listen on a channel and wait for receiving a task.
273  *
274  * It takes two parameters.
275  * \param task a memory location for storing a #m_task_t. It will
276  hold a task when this function will return. Thus \a task should not
277  be equal to \c NULL and \a *task should be equal to \c NULL. If one of
278  those two condition does not hold, there will be a warning message.
279  * \param channel the channel on which the agent should be
280  listening. This value has to be >=0 and < than the maximal
281  number of channels fixed with MSG_set_channel_number().
282  * \return a #MSG_error_t indicating whether the operation was successful (#MSG_OK), or why it failed otherwise.
283  */
284 MSG_error_t MSG_task_get(m_task_t * task, m_channel_t channel)
285 {
286   return MSG_task_get_with_timeout(task, channel, -1);
287 }
288
289 /** \ingroup msg_gos_functions
290  * \brief Listen on a channel and wait for receiving a task with a timeout.
291  *
292  * It takes three parameters.
293  * \param task a memory location for storing a #m_task_t. It will
294  hold a task when this function will return. Thus \a task should not
295  be equal to \c NULL and \a *task should be equal to \c NULL. If one of
296  those two condition does not hold, there will be a warning message.
297  * \param channel the channel on which the agent should be
298  listening. This value has to be >=0 and < than the maximal
299  number of channels fixed with MSG_set_channel_number().
300  * \param max_duration the maximum time to wait for a task before giving
301  up. In such a case, #MSG_TRANSFER_FAILURE will be returned, \a task
302  will not be modified and will still be
303  equal to \c NULL when returning.
304  * \return a #MSG_error_t indicating whether the operation was successful (#MSG_OK), or why it failed otherwise.
305  */
306 MSG_error_t
307 MSG_task_get_with_timeout(m_task_t * task, m_channel_t channel,
308                           double max_duration)
309 {
310   return MSG_task_get_ext(task, channel, max_duration, NULL);
311 }
312
313 /** \defgroup msg_gos_functions MSG Operating System Functions
314  *  \brief This section describes the functions that can be used
315  *  by an agent for handling some task.
316  */
317
318 MSG_error_t
319 MSG_task_get_ext(m_task_t * task, m_channel_t channel, double timeout,
320                  m_host_t host)
321 {
322   xbt_assert((channel >= 0)
323               && (channel < msg_global->max_channel), "Invalid channel %d",
324               channel);
325
326   return
327       MSG_mailbox_get_task_ext(MSG_mailbox_get_by_channel
328                                (MSG_host_self(), channel), task, host,
329                                timeout);
330 }
331
332 MSG_error_t
333 MSG_task_receive_from_host(m_task_t * task, const char *alias,
334                            m_host_t host)
335 {
336   return MSG_task_receive_ext(task, alias, -1, host);
337 }
338
339 MSG_error_t MSG_task_receive(m_task_t * task, const char *alias)
340 {
341   return MSG_task_receive_with_timeout(task, alias, -1);
342 }
343
344 MSG_error_t
345 MSG_task_receive_with_timeout(m_task_t * task, const char *alias,
346                               double timeout)
347 {
348   return MSG_task_receive_ext(task, alias, timeout, NULL);
349 }
350
351 MSG_error_t
352 MSG_task_receive_ext(m_task_t * task, const char *alias, double timeout,
353                      m_host_t host)
354 {
355   XBT_DEBUG
356       ("MSG_task_receive_ext: Trying to receive a message on mailbox '%s'",
357        alias);
358   return MSG_mailbox_get_task_ext(MSG_mailbox_get_by_alias(alias), task,
359                                   host, timeout);
360 }
361
362 /** \ingroup msg_gos_functions
363  * \brief Sends a task on a mailbox.
364  *
365  * This is a non blocking function: use MSG_comm_wait() or MSG_comm_test()
366  * to end the communication.
367  *
368  * \param task a #m_task_t to send on another location.
369  * \param alias name of the mailbox to sent the task to
370  * \return the msg_comm_t communication created
371  */
372 msg_comm_t MSG_task_isend(m_task_t task, const char *alias)
373 {
374   return MSG_task_isend_with_matching(task,alias,NULL,NULL);
375 }
376
377 /** \ingroup msg_gos_functions
378  * \brief Sends a task on a mailbox, with support for matching requests
379  *
380  * This is a non blocking function: use MSG_comm_wait() or MSG_comm_test()
381  * to end the communication.
382  *
383  * \param task a #m_task_t to send on another location.
384  * \param alias name of the mailbox to sent the task to
385  * \param match_fun boolean function taking the #match_data provided by sender (here), and the one of the receiver (if any) and returning whether they match
386  * \param match_data user provided data passed to match_fun
387  * \return the msg_comm_t communication created
388  */
389 XBT_INLINE msg_comm_t MSG_task_isend_with_matching(m_task_t task, const char *alias,
390     int (*match_fun)(void*,void*),
391     void *match_data)
392 {
393   simdata_task_t t_simdata = NULL;
394   m_process_t process = MSG_process_self();
395   msg_mailbox_t mailbox = MSG_mailbox_get_by_alias(alias);
396
397   CHECK_HOST();
398
399   /* FIXME: these functions are not traceable */
400
401   /* Prepare the task to send */
402   t_simdata = task->simdata;
403   t_simdata->sender = process;
404   t_simdata->source = ((simdata_process_t) SIMIX_process_self_get_data(process))->m_host;
405
406   xbt_assert(t_simdata->isused == 0,
407               "This task is still being used somewhere else. You cannot send it now. Go fix your code!");
408
409   t_simdata->isused = 1;
410   msg_global->sent_msg++;
411
412   /* Send it by calling SIMIX network layer */
413   msg_comm_t comm = xbt_new0(s_msg_comm_t, 1);
414   comm->task_sent = task;
415   comm->task_received = NULL;
416   comm->status = MSG_OK;
417   comm->s_comm =
418     SIMIX_req_comm_isend(mailbox, t_simdata->message_size,
419                          t_simdata->rate, task, sizeof(void *), match_fun, NULL, match_data, 0);
420   t_simdata->comm = comm->s_comm; /* FIXME: is the field t_simdata->comm still useful? */
421
422   return comm;
423 }
424
425 /** \ingroup msg_gos_functions
426  * \brief Sends a task on a mailbox.
427  *
428  * This is a non blocking detached send function.
429  * Think of it as a best effort send. Keep in mind that the third parameter
430  * is only called if the communication fails. If the communication does work,
431  * it is responsibility of the receiver code to free anything related to
432  * the task, as usual. More details on this can be obtained on
433  * <a href="http://lists.gforge.inria.fr/pipermail/simgrid-user/2011-November/002649.html">this thread</a>
434  * in the SimGrid-user mailing list archive.
435  *
436  * \param task a #m_task_t to send on another location.
437  * \param alias name of the mailbox to sent the task to
438  * \param cleanup a function to destroy the task if the
439  * communication fails (if NULL, MSG_task_destroy() will
440  * be used by default)
441  */
442 void MSG_task_dsend(m_task_t task, const char *alias, void_f_pvoid_t cleanup)
443 {
444   simdata_task_t t_simdata = NULL;
445   m_process_t process = MSG_process_self();
446   msg_mailbox_t mailbox = MSG_mailbox_get_by_alias(alias);
447
448   CHECK_HOST();
449
450   if (cleanup == NULL) {
451     cleanup = (void_f_pvoid_t) MSG_task_destroy;
452   }
453
454   /* FIXME: these functions are not traceable */
455
456   /* Prepare the task to send */
457   t_simdata = task->simdata;
458   t_simdata->sender = process;
459   t_simdata->source = ((simdata_process_t) SIMIX_process_self_get_data(process))->m_host;
460
461   xbt_assert(t_simdata->isused == 0,
462               "This task is still being used somewhere else. You cannot send it now. Go fix your code!");
463
464   t_simdata->isused = 1;
465   msg_global->sent_msg++;
466
467   /* Send it by calling SIMIX network layer */
468   smx_action_t comm = SIMIX_req_comm_isend(mailbox, t_simdata->message_size,
469                        t_simdata->rate, task, sizeof(void *), NULL,cleanup, NULL, 1);
470   t_simdata->comm = comm;
471 }
472
473 /** \ingroup msg_gos_functions
474  * \brief Starts listening for receiving a task from an asynchronous communication.
475  *
476  * This is a non blocking function: use MSG_comm_wait() or MSG_comm_test()
477  * to end the communication.
478  *
479  * \param task a memory location for storing a #m_task_t.
480  * \param name of the mailbox to receive the task on
481  * \return the msg_comm_t communication created
482  */
483 msg_comm_t MSG_task_irecv(m_task_t *task, const char *name)
484 {
485   smx_rdv_t rdv = MSG_mailbox_get_by_alias(name);
486
487   CHECK_HOST();
488
489   /* FIXME: these functions are not tracable */
490
491   /* Sanity check */
492   xbt_assert(task, "Null pointer for the task storage");
493
494   if (*task)
495     XBT_CRITICAL
496         ("MSG_task_irecv() was asked to write in a non empty task struct.");
497
498   /* Try to receive it by calling SIMIX network layer */
499   msg_comm_t comm = xbt_new0(s_msg_comm_t, 1);
500   comm->task_sent = NULL;
501   comm->task_received = task;
502   comm->status = MSG_OK;
503   comm->s_comm = SIMIX_req_comm_irecv(rdv, task, NULL, NULL, NULL);
504
505   return comm;
506 }
507
508 /** \ingroup msg_gos_functions
509  * \brief Checks whether a communication is done, and if yes, finalizes it.
510  * \param comm the communication to test
511  * \return TRUE if the communication is finished
512  * (but it may have failed, use MSG_comm_get_status() to know its status)
513  * or FALSE if the communication is not finished yet
514  * If the status is FALSE, don't forget to use MSG_process_sleep() after the test.
515  */
516 int MSG_comm_test(msg_comm_t comm)
517 {
518   xbt_ex_t e;
519   int finished = 0;
520   TRY {
521     finished = SIMIX_req_comm_test(comm->s_comm);
522
523     if (finished && comm->task_received != NULL) {
524       /* I am the receiver */
525       (*comm->task_received)->simdata->isused = 0;
526     }
527   }
528   CATCH(e) {
529     switch (e.category) {
530
531       case host_error:
532         comm->status = MSG_HOST_FAILURE;
533         finished = 1;
534         break;
535
536       case network_error:
537         comm->status = MSG_TRANSFER_FAILURE;
538         finished = 1;
539         break;
540
541       case timeout_error:
542         comm->status = MSG_TIMEOUT;
543         finished = 1;
544         break;
545
546       default:
547         RETHROW;
548     }
549     xbt_ex_free(e);
550   }
551
552   return finished;
553 }
554
555 /** \ingroup msg_gos_functions
556  * \brief This function checks if a communication is finished.
557  * \param comms a vector of communications
558  * \return the position of the finished communication if any
559  * (but it may have failed, use MSG_comm_get_status() to know its status),
560  * or -1 if none is finished
561  */
562 int MSG_comm_testany(xbt_dynar_t comms)
563 {
564   xbt_ex_t e;
565   int finished_index = -1;
566
567   /* create the equivalent dynar with SIMIX objects */
568   xbt_dynar_t s_comms = xbt_dynar_new(sizeof(smx_action_t), NULL);
569   msg_comm_t comm;
570   unsigned int cursor;
571   xbt_dynar_foreach(comms, cursor, comm) {
572     xbt_dynar_push(s_comms, &comm->s_comm);
573   }
574
575   MSG_error_t status = MSG_OK;
576   TRY {
577     finished_index = SIMIX_req_comm_testany(s_comms);
578   }
579   CATCH(e) {
580     switch (e.category) {
581
582       case host_error:
583         finished_index = e.value;
584         status = MSG_HOST_FAILURE;
585         break;
586
587       case network_error:
588         finished_index = e.value;
589         status = MSG_TRANSFER_FAILURE;
590         break;
591
592       case timeout_error:
593         finished_index = e.value;
594         status = MSG_TIMEOUT;
595         break;
596
597       default:
598         RETHROW;
599     }
600     xbt_ex_free(e);
601   }
602   xbt_dynar_free(&s_comms);
603
604   if (finished_index != -1) {
605     comm = xbt_dynar_get_as(comms, finished_index, msg_comm_t);
606     /* the communication is finished */
607     comm->status = status;
608
609     if (status == MSG_OK && comm->task_received != NULL) {
610       /* I am the receiver */
611       (*comm->task_received)->simdata->isused = 0;
612     }
613   }
614
615   return finished_index;
616 }
617
618 /** \ingroup msg_gos_functions
619  * \brief Destroys a communication.
620  * \param comm the communication to destroy.
621  */
622 void MSG_comm_destroy(msg_comm_t comm)
623 {
624   xbt_free(comm);
625 }
626
627 /** \ingroup msg_gos_functions
628  * \brief Wait for the completion of a communication.
629  *
630  * It takes two parameters.
631  * \param comm the communication to wait.
632  * \param timeout Wait until the communication terminates or the timeout occurs
633  * \return MSG_error_t
634  */
635 MSG_error_t MSG_comm_wait(msg_comm_t comm, double timeout)
636 {
637   xbt_ex_t e;
638   TRY {
639     SIMIX_req_comm_wait(comm->s_comm, timeout);
640
641     if (comm->task_received != NULL) {
642       /* I am the receiver */
643       (*comm->task_received)->simdata->isused = 0;
644     }
645
646     /* FIXME: these functions are not traceable */
647   }
648   CATCH(e) {
649     switch (e.category) {
650     case host_error:
651       comm->status = MSG_HOST_FAILURE;
652       break;
653     case network_error:
654       comm->status = MSG_TRANSFER_FAILURE;
655       break;
656     case timeout_error:
657       comm->status = MSG_TIMEOUT;
658       break;
659     default:
660       RETHROW;
661     }
662     xbt_ex_free(e);
663   }
664
665   return comm->status;
666 }
667
668 /** \ingroup msg_gos_functions
669 * \brief This function is called by a sender and permit to wait for each communication
670 *
671 * \param comm a vector of communication
672 * \param nb_elem is the size of the comm vector
673 * \param timeout for each call of MSG_comm_wait
674 */
675 void MSG_comm_waitall(msg_comm_t * comm, int nb_elem, double timeout)
676 {
677   int i = 0;
678   for (i = 0; i < nb_elem; i++) {
679     MSG_comm_wait(comm[i], timeout);
680   }
681 }
682
683 /** \ingroup msg_gos_functions
684  * \brief This function waits for the first communication finished in a list.
685  * \param comms a vector of communications
686  * \return the position of the first finished communication
687  * (but it may have failed, use MSG_comm_get_status() to know its status)
688  */
689 int MSG_comm_waitany(xbt_dynar_t comms)
690 {
691   xbt_ex_t e;
692   int finished_index = -1;
693
694   /* create the equivalent dynar with SIMIX objects */
695   xbt_dynar_t s_comms = xbt_dynar_new(sizeof(smx_action_t), NULL);
696   msg_comm_t comm;
697   unsigned int cursor;
698   xbt_dynar_foreach(comms, cursor, comm) {
699     xbt_dynar_push(s_comms, &comm->s_comm);
700   }
701
702   MSG_error_t status = MSG_OK;
703   TRY {
704     finished_index = SIMIX_req_comm_waitany(s_comms);
705   }
706   CATCH(e) {
707     switch (e.category) {
708
709       case host_error:
710         finished_index = e.value;
711         status = MSG_HOST_FAILURE;
712         break;
713
714       case network_error:
715         finished_index = e.value;
716         status = MSG_TRANSFER_FAILURE;
717         break;
718
719       case timeout_error:
720         finished_index = e.value;
721         status = MSG_TIMEOUT;
722         break;
723
724       default:
725         RETHROW;
726     }
727     xbt_ex_free(e);
728   }
729
730   xbt_assert(finished_index != -1, "WaitAny returned -1");
731   xbt_dynar_free(&s_comms);
732
733   comm = xbt_dynar_get_as(comms, finished_index, msg_comm_t);
734   /* the communication is finished */
735   comm->status = status;
736
737   if (comm->task_received != NULL) {
738     /* I am the receiver */
739     (*comm->task_received)->simdata->isused = 0;
740   }
741
742   return finished_index;
743 }
744
745 /**
746  * \ingroup msg_gos_functions
747  * \brief Returns the error (if any) that occured during a finished communication.
748  * \param comm a finished communication
749  * \return the status of the communication, or MSG_OK if no error occured
750  * during the communication
751  */
752 MSG_error_t MSG_comm_get_status(msg_comm_t comm) {
753
754   return comm->status;
755 }
756
757 m_task_t MSG_comm_get_task(msg_comm_t comm)
758 {
759   xbt_assert(comm, "Invalid parameter");
760
761   return comm->task_received ? *comm->task_received : comm->task_sent;
762 }
763
764 /**
765  * \brief This function is called by SIMIX to copy the data of a comm.
766  * \param comm the comm
767  * \param buff_size size of the buffer
768  */
769 void MSG_comm_copy_data_from_SIMIX(smx_action_t comm, size_t buff_size) {
770
771   // copy the task
772   SIMIX_comm_copy_pointer_callback(comm, buff_size);
773
774   // notify the user callback if any
775   if (msg_global->task_copy_callback) {
776     msg_global->task_copy_callback(SIMIX_req_comm_get_src_data(comm),
777         SIMIX_req_comm_get_src_proc(comm), SIMIX_req_comm_get_dst_proc(comm));
778   }
779 }
780
781 /** \ingroup msg_gos_functions
782  * \brief Put a task on a channel of an host and waits for the end of the
783  * transmission.
784  *
785  * This function is used for describing the behavior of an agent. It
786  * takes three parameter.
787  * \param task a #m_task_t to send on another location. This task
788  will not be usable anymore when the function will return. There is
789  no automatic task duplication and you have to save your parameters
790  before calling this function. Tasks are unique and once it has been
791  sent to another location, you should not access it anymore. You do
792  not need to call MSG_task_destroy() but to avoid using, as an
793  effect of inattention, this task anymore, you definitely should
794  renitialize it with #MSG_TASK_UNINITIALIZED. Note that this task
795  can be transfered iff it has been correctly created with
796  MSG_task_create().
797  * \param dest the destination of the message
798  * \param channel the channel on which the agent should put this
799  task. This value has to be >=0 and < than the maximal number of
800  channels fixed with MSG_set_channel_number().
801  * \return #MSG_HOST_FAILURE if the host on which
802  * this function was called was shut down,
803  * #MSG_TRANSFER_FAILURE if the transfer could not be properly done
804  * (network failure, dest failure) or #MSG_OK if it succeeded.
805  */
806 MSG_error_t MSG_task_put(m_task_t task, m_host_t dest, m_channel_t channel)
807 {
808   return MSG_task_put_with_timeout(task, dest, channel, -1.0);
809 }
810
811 /** \ingroup msg_gos_functions
812  * \brief Does exactly the same as MSG_task_put but with a bounded transmition
813  * rate.
814  *
815  * \sa MSG_task_put
816  */
817 MSG_error_t
818 MSG_task_put_bounded(m_task_t task, m_host_t dest, m_channel_t channel,
819                      double maxrate)
820 {
821   task->simdata->rate = maxrate;
822   return MSG_task_put(task, dest, channel);
823 }
824
825 /** \ingroup msg_gos_functions \brief Put a task on a channel of an
826  * host (with a timeout on the waiting of the destination host) and
827  * waits for the end of the transmission.
828  *
829  * This function is used for describing the behavior of an agent. It
830  * takes four parameter.
831  * \param task a #m_task_t to send on another location. This task
832  will not be usable anymore when the function will return. There is
833  no automatic task duplication and you have to save your parameters
834  before calling this function. Tasks are unique and once it has been
835  sent to another location, you should not access it anymore. You do
836  not need to call MSG_task_destroy() but to avoid using, as an
837  effect of inattention, this task anymore, you definitely should
838  renitialize it with #MSG_TASK_UNINITIALIZED. Note that this task
839  can be transfered iff it has been correctly created with
840  MSG_task_create().
841  * \param dest the destination of the message
842  * \param channel the channel on which the agent should put this
843  task. This value has to be >=0 and < than the maximal number of
844  channels fixed with MSG_set_channel_number().
845  * \param timeout the maximum time to wait for a task before giving
846  up. In such a case, #MSG_TRANSFER_FAILURE will be returned, \a task
847  will not be modified
848  * \return #MSG_HOST_FAILURE if the host on which
849 this function was called was shut down,
850 #MSG_TRANSFER_FAILURE if the transfer could not be properly done
851 (network failure, dest failure, timeout...) or #MSG_OK if the communication succeeded.
852  */
853 MSG_error_t
854 MSG_task_put_with_timeout(m_task_t task, m_host_t dest,
855                           m_channel_t channel, double timeout)
856 {
857   xbt_assert((channel >= 0)
858               && (channel < msg_global->max_channel), "Invalid channel %d",
859               channel);
860
861   XBT_DEBUG("MSG_task_put_with_timout: Trying to send a task to '%s'", dest->name);
862   return
863       MSG_mailbox_put_with_timeout(MSG_mailbox_get_by_channel
864                                    (dest, channel), task, timeout);
865 }
866
867 MSG_error_t MSG_task_send(m_task_t task, const char *alias)
868 {
869   XBT_DEBUG("MSG_task_send: Trying to send a message on mailbox '%s'", alias);
870   return MSG_task_send_with_timeout(task, alias, -1);
871 }
872
873
874 MSG_error_t
875 MSG_task_send_bounded(m_task_t task, const char *alias, double maxrate)
876 {
877   task->simdata->rate = maxrate;
878   return MSG_task_send(task, alias);
879 }
880
881
882 MSG_error_t
883 MSG_task_send_with_timeout(m_task_t task, const char *alias,
884                            double timeout)
885 {
886   return MSG_mailbox_put_with_timeout(MSG_mailbox_get_by_alias(alias),
887                                       task, timeout);
888 }
889
890 int MSG_task_listen(const char *alias)
891 {
892   CHECK_HOST();
893
894   return !MSG_mailbox_is_empty(MSG_mailbox_get_by_alias(alias));
895 }
896
897 /** \ingroup msg_gos_functions
898  * \brief Test whether there is a pending communication on a channel.
899  *
900  * It takes one parameter.
901  * \param channel the channel on which the agent should be
902  listening. This value has to be >=0 and < than the maximal
903  number of channels fixed with MSG_set_channel_number().
904  * \return 1 if there is a pending communication and 0 otherwise
905  */
906 int MSG_task_Iprobe(m_channel_t channel)
907 {
908   xbt_assert((channel >= 0)
909               && (channel < msg_global->max_channel), "Invalid channel %d",
910               channel);
911
912   CHECK_HOST();
913
914   return
915       !MSG_mailbox_is_empty(MSG_mailbox_get_by_channel
916                             (MSG_host_self(), channel));
917 }
918
919 /** \ingroup msg_gos_functions
920
921  * \brief Return the number of tasks waiting to be received on a \a
922  channel and sent by \a host.
923  *
924  * It takes two parameters.
925  * \param channel the channel on which the agent should be
926  listening. This value has to be >=0 and < than the maximal
927  number of channels fixed with MSG_set_channel_number().
928  * \param host the host that is to be watched.
929  * \return the number of tasks waiting to be received on \a channel
930  and sent by \a host.
931  */
932 int MSG_task_probe_from_host(int channel, m_host_t host)
933 {
934   xbt_assert((channel >= 0)
935               && (channel < msg_global->max_channel), "Invalid channel %d",
936               channel);
937
938   CHECK_HOST();
939
940   return
941       MSG_mailbox_get_count_host_waiting_tasks(MSG_mailbox_get_by_channel
942                                                (MSG_host_self(), channel),
943                                                host);
944
945 }
946
947 int MSG_task_listen_from_host(const char *alias, m_host_t host)
948 {
949   CHECK_HOST();
950
951   return
952       MSG_mailbox_get_count_host_waiting_tasks(MSG_mailbox_get_by_alias
953                                                (alias), host);
954 }
955
956 /** \ingroup msg_gos_functions
957  * \brief Test whether there is a pending communication on a channel, and who sent it.
958  *
959  * It takes one parameter.
960  * \param channel the channel on which the agent should be
961  listening. This value has to be >=0 and < than the maximal
962  number of channels fixed with MSG_set_channel_number().
963  * \return -1 if there is no pending communication and the PID of the process who sent it otherwise
964  */
965 int MSG_task_probe_from(m_channel_t channel)
966 {
967   m_task_t task;
968
969   CHECK_HOST();
970
971   xbt_assert((channel >= 0)
972               && (channel < msg_global->max_channel), "Invalid channel %d",
973               channel);
974
975   if (NULL ==
976       (task =
977        MSG_mailbox_get_head(MSG_mailbox_get_by_channel
978                             (MSG_host_self(), channel))))
979     return -1;
980
981   return MSG_process_get_PID(task->simdata->sender);
982 }
983
984 int MSG_task_listen_from(const char *alias)
985 {
986   m_task_t task;
987
988   CHECK_HOST();
989
990   if (NULL ==
991       (task = MSG_mailbox_get_head(MSG_mailbox_get_by_alias(alias))))
992     return -1;
993
994   return MSG_process_get_PID(task->simdata->sender);
995 }
996
997 #ifdef MSG_USE_DEPRECATED
998 /** \ingroup msg_gos_functions
999  *
1000  * \brief Return the last value returned by a MSG function (except
1001  * MSG_get_errno...).
1002  */
1003 MSG_error_t MSG_get_errno(void)
1004 {
1005   return PROCESS_GET_ERRNO();
1006 }
1007 #endif