Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
make sure that MSG_task_execute() is not called by error on parallel tasks
[simgrid.git] / src / msg / gos.c
1 /* Copyright (c) 2004, 2005, 2006, 2007, 2008, 2009, 2010. The SimGrid Team.
2  * All rights reserved.                                                     */
3
4 /* This program is free software; you can redistribute it and/or modify it
5  * under the terms of the license (GNU LGPL) which comes with this package. */
6
7 #include "msg/private.h"
8 #include "xbt/sysdep.h"
9 #include "xbt/log.h"
10 #include "mailbox.h"
11
12
13 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(msg_gos, msg,
14                                 "Logging specific to MSG (gos)");
15
16 /** \ingroup msg_gos_functions
17  *
18  * \brief Return the last value returned by a MSG function (except
19  * MSG_get_errno...).
20  */
21 MSG_error_t MSG_get_errno(void)
22 {
23   return PROCESS_GET_ERRNO();
24 }
25
26 /** \ingroup msg_gos_functions
27  * \brief Executes a task and waits for its termination.
28  *
29  * This function is used for describing the behavior of an agent. It
30  * takes only one parameter.
31  * \param task a #m_task_t to execute on the location on which the
32    agent is running.
33  * \return #MSG_FATAL if \a task is not properly initialized and
34  * #MSG_OK otherwise.
35  */
36 MSG_error_t MSG_task_execute(m_task_t task)
37 {
38   simdata_task_t simdata = NULL;
39   m_process_t self = MSG_process_self();
40   e_surf_action_state_t state = SURF_ACTION_NOT_IN_THE_SYSTEM;
41   CHECK_HOST();
42
43   simdata = task->simdata;
44
45   xbt_assert0(simdata->host_nb==0, "This is a parallel task. Go to hell.");
46
47 #ifdef HAVE_TRACING
48   TRACE_msg_task_execute_start (task);
49 #endif
50
51
52   xbt_assert1((!simdata->compute) && (task->simdata->refcount == 1),
53               "This task is executed somewhere else. Go fix your code! %d", task->simdata->refcount);
54
55   DEBUG1("Computing on %s", MSG_process_self()->simdata->m_host->name);
56
57   if (simdata->computation_amount == 0) {
58 #ifdef HAVE_TRACING
59     TRACE_msg_task_execute_end (task);
60 #endif
61     return MSG_OK;
62   }
63   simdata->refcount++;
64   SIMIX_mutex_lock(simdata->mutex);
65   simdata->compute =
66     SIMIX_action_execute(SIMIX_host_self(), task->name,
67                          simdata->computation_amount);
68   SIMIX_action_set_priority(simdata->compute, simdata->priority);
69
70   /* changed to waiting action since we are always waiting one action (execute, communicate or sleep) */
71   self->simdata->waiting_action = simdata->compute;
72   SIMIX_register_action_to_condition(simdata->compute, simdata->cond);
73   do {
74     SIMIX_cond_wait(simdata->cond, simdata->mutex);
75     state = SIMIX_action_get_state(simdata->compute);
76   } while (state == SURF_ACTION_READY || state == SURF_ACTION_RUNNING);
77   SIMIX_unregister_action_to_condition(simdata->compute, simdata->cond);
78   self->simdata->waiting_action = NULL;
79
80   SIMIX_mutex_unlock(simdata->mutex);
81   simdata->refcount--;
82
83   if (SIMIX_action_get_state(task->simdata->compute) == SURF_ACTION_DONE) {
84     /* action ended, set comm and compute = NULL, the actions is already destroyed in the main function */
85     SIMIX_action_destroy(task->simdata->compute);
86     simdata->computation_amount = 0.0;
87     simdata->comm = NULL;
88     simdata->compute = NULL;
89 #ifdef HAVE_TRACING
90     TRACE_msg_task_execute_end (task);
91 #endif
92     MSG_RETURN(MSG_OK);
93   } else if (SIMIX_host_get_state(SIMIX_host_self()) == 0) {
94     /* action ended, set comm and compute = NULL, the actions is already destroyed in the main function */
95     SIMIX_action_destroy(task->simdata->compute);
96     simdata->comm = NULL;
97     simdata->compute = NULL;
98 #ifdef HAVE_TRACING
99     TRACE_msg_task_execute_end (task);
100 #endif
101     MSG_RETURN(MSG_HOST_FAILURE);
102   } else {
103     /* action ended, set comm and compute = NULL, the actions is already destroyed in the main function */
104     SIMIX_action_destroy(task->simdata->compute);
105     simdata->comm = NULL;
106     simdata->compute = NULL;
107 #ifdef HAVE_TRACING
108     TRACE_msg_task_execute_end (task);
109 #endif
110     MSG_RETURN(MSG_TASK_CANCELLED);
111   }
112 }
113
114 /** \ingroup m_task_management
115  * \brief Creates a new #m_task_t (a parallel one....).
116  *
117  * A constructor for #m_task_t taking six arguments and returning the
118    corresponding object.
119  * \param name a name for the object. It is for user-level information
120    and can be NULL.
121  * \param host_nb the number of hosts implied in the parallel task.
122  * \param host_list an array of \p host_nb m_host_t.
123  * \param computation_amount an array of \p host_nb
124    doubles. computation_amount[i] is the total number of operations
125    that have to be performed on host_list[i].
126  * \param communication_amount an array of \p host_nb* \p host_nb doubles.
127  * \param data a pointer to any data may want to attach to the new
128    object.  It is for user-level information and can be NULL. It can
129    be retrieved with the function \ref MSG_task_get_data.
130  * \see m_task_t
131  * \return The new corresponding object.
132  */
133 m_task_t
134 MSG_parallel_task_create(const char *name, int host_nb,
135                          const m_host_t * host_list,
136                          double *computation_amount,
137                          double *communication_amount, void *data)
138 {
139   int i;
140   simdata_task_t simdata = xbt_new0(s_simdata_task_t, 1);
141   m_task_t task = xbt_new0(s_m_task_t, 1);
142   task->simdata = simdata;
143
144   /* Task structure */
145   task->name = xbt_strdup(name);
146   task->data = data;
147
148   /* Simulator Data */
149   simdata->computation_amount = 0;
150   simdata->message_size = 0;
151   simdata->cond = SIMIX_cond_init();
152   simdata->mutex = SIMIX_mutex_init();
153   simdata->compute = NULL;
154   simdata->comm = NULL;
155   simdata->rate = -1.0;
156   simdata->refcount = 1;
157   simdata->sender = NULL;
158   simdata->receiver = NULL;
159   simdata->source = NULL;
160
161   simdata->host_nb = host_nb;
162   simdata->host_list = xbt_new0(smx_host_t, host_nb);
163   simdata->comp_amount = computation_amount;
164   simdata->comm_amount = communication_amount;
165
166   for (i = 0; i < host_nb; i++)
167     simdata->host_list[i] = host_list[i]->simdata->smx_host;
168
169   return task;
170 }
171
172 MSG_error_t MSG_parallel_task_execute(m_task_t task)
173 {
174   simdata_task_t simdata = NULL;
175   m_process_t self = MSG_process_self();
176   e_surf_action_state_t state = SURF_ACTION_NOT_IN_THE_SYSTEM;
177   CHECK_HOST();
178
179   simdata = task->simdata;
180
181   xbt_assert0((!simdata->compute)
182               && (task->simdata->refcount == 1),
183               "This task is executed somewhere else. Go fix your code!");
184
185   xbt_assert0(simdata->host_nb, "This is not a parallel task. Go to hell.");
186
187   DEBUG1("Computing on %s", MSG_process_self()->simdata->m_host->name);
188
189   simdata->refcount++;
190
191   SIMIX_mutex_lock(simdata->mutex);
192   simdata->compute =
193     SIMIX_action_parallel_execute(task->name, simdata->host_nb,
194                                   simdata->host_list, simdata->comp_amount,
195                                   simdata->comm_amount, 1.0, -1.0);
196
197   self->simdata->waiting_action = simdata->compute;
198   SIMIX_register_action_to_condition(simdata->compute, simdata->cond);
199   do {
200     SIMIX_cond_wait(simdata->cond, simdata->mutex);
201     state = SIMIX_action_get_state(task->simdata->compute);
202   } while (state == SURF_ACTION_READY || state == SURF_ACTION_RUNNING);
203
204   SIMIX_unregister_action_to_condition(simdata->compute, simdata->cond);
205   self->simdata->waiting_action = NULL;
206
207
208   SIMIX_mutex_unlock(simdata->mutex);
209   simdata->refcount--;
210
211   if (SIMIX_action_get_state(task->simdata->compute) == SURF_ACTION_DONE) {
212     /* action ended, set comm and compute = NULL, the actions is already destroyed in the main function */
213     SIMIX_action_destroy(task->simdata->compute);
214     simdata->computation_amount = 0.0;
215     simdata->comm = NULL;
216     simdata->compute = NULL;
217     MSG_RETURN(MSG_OK);
218   } else if (SIMIX_host_get_state(SIMIX_host_self()) == 0) {
219     /* action ended, set comm and compute = NULL, the actions is already destroyed in the main function */
220     SIMIX_action_destroy(task->simdata->compute);
221     simdata->comm = NULL;
222     simdata->compute = NULL;
223     MSG_RETURN(MSG_HOST_FAILURE);
224   } else {
225     /* action ended, set comm and compute = NULL, the actions is already destroyed in the main function */
226     SIMIX_action_destroy(task->simdata->compute);
227     simdata->comm = NULL;
228     simdata->compute = NULL;
229     MSG_RETURN(MSG_TASK_CANCELLED);
230   }
231
232 }
233
234
235 /** \ingroup msg_gos_functions
236  * \brief Sleep for the specified number of seconds
237  *
238  * Makes the current process sleep until \a time seconds have elapsed.
239  *
240  * \param nb_sec a number of second
241  */
242 MSG_error_t MSG_process_sleep(double nb_sec)
243 {
244   smx_action_t act_sleep;
245   m_process_t proc = MSG_process_self();
246   e_surf_action_state_t state = SURF_ACTION_NOT_IN_THE_SYSTEM;
247   smx_mutex_t mutex;
248   smx_cond_t cond;
249
250 #ifdef HAVE_TRACING
251   TRACE_msg_process_sleep_in (MSG_process_self());
252 #endif
253
254   /* create action to sleep */
255   act_sleep =
256     SIMIX_action_sleep(SIMIX_process_get_host(proc->simdata->s_process),
257                        nb_sec);
258
259   mutex = SIMIX_mutex_init();
260   SIMIX_mutex_lock(mutex);
261
262   /* create conditional and register action to it */
263   cond = SIMIX_cond_init();
264
265   proc->simdata->waiting_action = act_sleep;
266   SIMIX_register_action_to_condition(act_sleep, cond);
267   do {
268     SIMIX_cond_wait(cond, mutex);
269     state = SIMIX_action_get_state(act_sleep);
270   } while (state == SURF_ACTION_READY || state == SURF_ACTION_RUNNING);
271   proc->simdata->waiting_action = NULL;
272   SIMIX_unregister_action_to_condition(act_sleep, cond);
273   SIMIX_mutex_unlock(mutex);
274
275   /* remove variables */
276   SIMIX_cond_destroy(cond);
277   SIMIX_mutex_destroy(mutex);
278
279   if (SIMIX_action_get_state(act_sleep) == SURF_ACTION_DONE) {
280     if (SIMIX_host_get_state(SIMIX_host_self()) == SURF_RESOURCE_OFF) {
281       SIMIX_action_destroy(act_sleep);
282 #ifdef HAVE_TRACING
283       TRACE_msg_process_sleep_out (MSG_process_self());
284 #endif
285       MSG_RETURN(MSG_HOST_FAILURE);
286     }
287   } else {
288     SIMIX_action_destroy(act_sleep);
289 #ifdef HAVE_TRACING
290     TRACE_msg_process_sleep_out (MSG_process_self());
291 #endif
292     MSG_RETURN(MSG_HOST_FAILURE);
293   }
294
295   SIMIX_action_destroy(act_sleep);
296 #ifdef HAVE_TRACING
297   TRACE_msg_process_sleep_out (MSG_process_self());
298 #endif
299   MSG_RETURN(MSG_OK);
300 }
301
302 /** \ingroup msg_gos_functions
303  * \brief Listen on \a channel and waits for receiving a task from \a host.
304  *
305  * It takes three parameters.
306  * \param task a memory location for storing a #m_task_t. It will
307    hold a task when this function will return. Thus \a task should not
308    be equal to \c NULL and \a *task should be equal to \c NULL. If one of
309    those two condition does not hold, there will be a warning message.
310  * \param channel the channel on which the agent should be
311    listening. This value has to be >=0 and < than the maximal
312    number of channels fixed with MSG_set_channel_number().
313  * \param host the host that is to be watched.
314  * \return #MSG_FATAL if \a task is equal to \c NULL, #MSG_WARNING
315    if \a *task is not equal to \c NULL, and #MSG_OK otherwise.
316  */
317 MSG_error_t
318 MSG_task_get_from_host(m_task_t * task, m_channel_t channel, m_host_t host)
319 {
320   return MSG_task_get_ext(task, channel, -1, host);
321 }
322
323 /** \ingroup msg_gos_functions
324  * \brief Listen on a channel and wait for receiving a task.
325  *
326  * It takes two parameters.
327  * \param task a memory location for storing a #m_task_t. It will
328    hold a task when this function will return. Thus \a task should not
329    be equal to \c NULL and \a *task should be equal to \c NULL. If one of
330    those two condition does not hold, there will be a warning message.
331  * \param channel the channel on which the agent should be
332    listening. This value has to be >=0 and < than the maximal
333    number of channels fixed with MSG_set_channel_number().
334  * \return #MSG_FATAL if \a task is equal to \c NULL, #MSG_WARNING
335  * if \a *task is not equal to \c NULL, and #MSG_OK otherwise.
336  */
337 MSG_error_t MSG_task_get(m_task_t * task, m_channel_t channel)
338 {
339   return MSG_task_get_with_timeout(task, channel, -1);
340 }
341
342 /** \ingroup msg_gos_functions
343  * \brief Listen on a channel and wait for receiving a task with a timeout.
344  *
345  * It takes three parameters.
346  * \param task a memory location for storing a #m_task_t. It will
347    hold a task when this function will return. Thus \a task should not
348    be equal to \c NULL and \a *task should be equal to \c NULL. If one of
349    those two condition does not hold, there will be a warning message.
350  * \param channel the channel on which the agent should be
351    listening. This value has to be >=0 and < than the maximal
352    number of channels fixed with MSG_set_channel_number().
353  * \param max_duration the maximum time to wait for a task before giving
354     up. In such a case, #MSG_TRANSFER_FAILURE will be returned, \a task
355     will not be modified and will still be
356     equal to \c NULL when returning.
357  * \return #MSG_FATAL if \a task is equal to \c NULL, #MSG_WARNING
358    if \a *task is not equal to \c NULL, and #MSG_OK otherwise.
359  */
360 MSG_error_t
361 MSG_task_get_with_timeout(m_task_t * task, m_channel_t channel,
362                           double max_duration)
363 {
364   return MSG_task_get_ext(task, channel, max_duration, NULL);
365 }
366
367 /** \defgroup msg_gos_functions MSG Operating System Functions
368  *  \brief This section describes the functions that can be used
369  *  by an agent for handling some task.
370  */
371
372 MSG_error_t
373 MSG_task_get_ext(m_task_t * task, m_channel_t channel, double timeout,
374                  m_host_t host)
375 {
376   xbt_assert1((channel >= 0)
377               && (channel < msg_global->max_channel), "Invalid channel %d",
378               channel);
379
380   return
381     MSG_mailbox_get_task_ext(MSG_mailbox_get_by_channel
382                              (MSG_host_self(), channel), task, host, timeout);
383 }
384
385 MSG_error_t
386 MSG_task_receive_from_host(m_task_t * task, const char *alias, m_host_t host)
387 {
388   return MSG_task_receive_ext(task, alias, -1, host);
389 }
390
391 MSG_error_t MSG_task_receive(m_task_t * task, const char *alias)
392 {
393   return MSG_task_receive_with_timeout(task, alias, -1);
394 }
395
396 MSG_error_t
397 MSG_task_receive_with_timeout(m_task_t * task, const char *alias,
398                               double timeout)
399 {
400   return MSG_task_receive_ext(task, alias, timeout, NULL);
401 }
402
403 MSG_error_t
404 MSG_task_receive_ext(m_task_t * task, const char *alias, double timeout,
405                      m_host_t host)
406 {
407   return MSG_mailbox_get_task_ext(MSG_mailbox_get_by_alias(alias), task, host,
408                                   timeout);
409 }
410
411 msg_comm_t MSG_task_isend(m_task_t task, const char *alias) {
412   simdata_task_t t_simdata = NULL;
413   m_process_t process = MSG_process_self();
414   msg_mailbox_t mailbox = MSG_mailbox_get_by_alias(alias);
415
416   CHECK_HOST();
417
418   /* FIXME: these functions are not tracable */
419
420   /* Prepare the task to send */
421   t_simdata = task->simdata;
422   t_simdata->sender = process;
423   t_simdata->source = MSG_host_self();
424
425   xbt_assert0(t_simdata->refcount == 1,
426               "This task is still being used somewhere else. You cannot send it now. Go fix your code!");
427
428   t_simdata->refcount++;
429   msg_global->sent_msg++;
430
431   process->simdata->waiting_task = task;
432
433   /* Send it by calling SIMIX network layer */
434
435   /* Kept for semantical compatibility with older implementation */
436   if(mailbox->cond)
437     SIMIX_cond_signal(mailbox->cond);
438
439   return SIMIX_network_isend(mailbox->rdv, t_simdata->message_size, t_simdata->rate,
440       task, sizeof(void*), &t_simdata->comm);
441 }
442
443 msg_comm_t MSG_task_irecv(m_task_t * task, const char *alias) {
444   smx_comm_t comm;
445   smx_rdv_t rdv = MSG_mailbox_get_by_alias(alias)->rdv;
446   msg_mailbox_t mailbox=MSG_mailbox_get_by_alias(alias);
447   size_t size = sizeof(void*);
448
449   CHECK_HOST();
450
451   /* FIXME: these functions are not tracable */
452
453   memset(&comm,0,sizeof(comm));
454
455   /* Kept for compatibility with older implementation */
456   xbt_assert1(!MSG_mailbox_get_cond(mailbox),
457               "A process is already blocked on this channel %s",
458               MSG_mailbox_get_alias(mailbox));
459
460   /* Sanity check */
461   xbt_assert0(task, "Null pointer for the task storage");
462
463   if (*task)
464     CRITICAL0("MSG_task_get() was asked to write in a non empty task struct.");
465
466   /* Try to receive it by calling SIMIX network layer */
467   return SIMIX_network_irecv(rdv, task, &size);
468 }
469 int MSG_comm_test(msg_comm_t comm) {
470   return SIMIX_network_test(comm);
471 }
472 MSG_error_t MSG_comm_wait(msg_comm_t comm,double timeout) {
473   xbt_ex_t e;
474   MSG_error_t res = MSG_OK;
475   TRY {
476     SIMIX_network_wait(comm,timeout);
477     //  (*task)->simdata->refcount--;
478
479     /* FIXME: these functions are not tracable */
480   }  CATCH(e){
481       switch(e.category){
482         case host_error:
483           res = MSG_HOST_FAILURE;
484           break;
485         case network_error:
486           res = MSG_TRANSFER_FAILURE;
487           break;
488         case timeout_error:
489           res = MSG_TIMEOUT;
490           break;
491         default:
492           xbt_die(bprintf("Unhandled SIMIX network exception: %s",e.msg));
493       }
494       xbt_ex_free(e);
495     }
496   return res;
497 }
498
499 /** \ingroup msg_gos_functions
500  * \brief Put a task on a channel of an host and waits for the end of the
501  * transmission.
502  *
503  * This function is used for describing the behavior of an agent. It
504  * takes three parameter.
505  * \param task a #m_task_t to send on another location. This task
506    will not be usable anymore when the function will return. There is
507    no automatic task duplication and you have to save your parameters
508    before calling this function. Tasks are unique and once it has been
509    sent to another location, you should not access it anymore. You do
510    not need to call MSG_task_destroy() but to avoid using, as an
511    effect of inattention, this task anymore, you definitely should
512    renitialize it with #MSG_TASK_UNINITIALIZED. Note that this task
513    can be transfered iff it has been correctly created with
514    MSG_task_create().
515  * \param dest the destination of the message
516  * \param channel the channel on which the agent should put this
517    task. This value has to be >=0 and < than the maximal number of
518    channels fixed with MSG_set_channel_number().
519  * \return #MSG_FATAL if \a task is not properly initialized and
520  * #MSG_OK otherwise. Returns #MSG_HOST_FAILURE if the host on which
521  * this function was called was shut down. Returns
522  * #MSG_TRANSFER_FAILURE if the transfer could not be properly done
523  * (network failure, dest failure)
524  */
525 MSG_error_t MSG_task_put(m_task_t task, m_host_t dest, m_channel_t channel)
526 {
527   return MSG_task_put_with_timeout(task, dest, channel, -1.0);
528 }
529
530 /** \ingroup msg_gos_functions
531  * \brief Does exactly the same as MSG_task_put but with a bounded transmition
532  * rate.
533  *
534  * \sa MSG_task_put
535  */
536 MSG_error_t
537 MSG_task_put_bounded(m_task_t task, m_host_t dest, m_channel_t channel,
538                      double maxrate)
539 {
540   task->simdata->rate = maxrate;
541   return MSG_task_put(task, dest, channel);
542 }
543
544 /** \ingroup msg_gos_functions \brief Put a task on a channel of an
545  * host (with a timeout on the waiting of the destination host) and
546  * waits for the end of the transmission.
547  *
548  * This function is used for describing the behavior of an agent. It
549  * takes four parameter.
550  * \param task a #m_task_t to send on another location. This task
551    will not be usable anymore when the function will return. There is
552    no automatic task duplication and you have to save your parameters
553    before calling this function. Tasks are unique and once it has been
554    sent to another location, you should not access it anymore. You do
555    not need to call MSG_task_destroy() but to avoid using, as an
556    effect of inattention, this task anymore, you definitely should
557    renitialize it with #MSG_TASK_UNINITIALIZED. Note that this task
558    can be transfered iff it has been correctly created with
559    MSG_task_create().
560  * \param dest the destination of the message
561  * \param channel the channel on which the agent should put this
562    task. This value has to be >=0 and < than the maximal number of
563    channels fixed with MSG_set_channel_number().
564  * \param timeout the maximum time to wait for a task before giving
565     up. In such a case, #MSG_TRANSFER_FAILURE will be returned, \a task
566     will not be modified
567  * \return #MSG_FATAL if \a task is not properly initialized and
568    #MSG_OK otherwise. Returns #MSG_HOST_FAILURE if the host on which
569    this function was called was shut down. Returns
570    #MSG_TRANSFER_FAILURE if the transfer could not be properly done
571    (network failure, dest failure, timeout...)
572  */
573 MSG_error_t
574 MSG_task_put_with_timeout(m_task_t task, m_host_t dest, m_channel_t channel,
575                           double timeout)
576 {
577   xbt_assert1((channel >= 0)
578               && (channel < msg_global->max_channel), "Invalid channel %d",
579               channel);
580
581   return
582     MSG_mailbox_put_with_timeout(MSG_mailbox_get_by_channel(dest, channel),
583                                  task, timeout);
584 }
585
586 MSG_error_t MSG_task_send(m_task_t task, const char *alias)
587 {
588   return MSG_task_send_with_timeout(task, alias, -1);
589 }
590
591
592 MSG_error_t
593 MSG_task_send_bounded(m_task_t task, const char *alias, double maxrate)
594 {
595   task->simdata->rate = maxrate;
596   return MSG_task_send(task, alias);
597 }
598
599
600 MSG_error_t
601 MSG_task_send_with_timeout(m_task_t task, const char *alias, double timeout)
602 {
603   return MSG_mailbox_put_with_timeout(MSG_mailbox_get_by_alias(alias), task,
604                                       timeout);
605 }
606
607 int MSG_task_listen(const char *alias)
608 {
609   CHECK_HOST();
610
611   return !MSG_mailbox_is_empty(MSG_mailbox_get_by_alias(alias));
612 }
613
614 /** \ingroup msg_gos_functions
615  * \brief Test whether there is a pending communication on a channel.
616  *
617  * It takes one parameter.
618  * \param channel the channel on which the agent should be
619    listening. This value has to be >=0 and < than the maximal
620    number of channels fixed with MSG_set_channel_number().
621  * \return 1 if there is a pending communication and 0 otherwise
622  */
623 int MSG_task_Iprobe(m_channel_t channel)
624 {
625   xbt_assert1((channel >= 0)
626               && (channel < msg_global->max_channel), "Invalid channel %d",
627               channel);
628
629   CHECK_HOST();
630
631   return
632     !MSG_mailbox_is_empty(MSG_mailbox_get_by_channel
633                           (MSG_host_self(), channel));
634 }
635
636 /** \ingroup msg_gos_functions
637
638  * \brief Return the number of tasks waiting to be received on a \a
639    channel and sent by \a host.
640  *
641  * It takes two parameters.
642  * \param channel the channel on which the agent should be
643    listening. This value has to be >=0 and < than the maximal
644    number of channels fixed with MSG_set_channel_number().
645  * \param host the host that is to be watched.
646  * \return the number of tasks waiting to be received on \a channel
647    and sent by \a host.
648  */
649 int MSG_task_probe_from_host(int channel, m_host_t host)
650 {
651   xbt_assert1((channel >= 0)
652               && (channel < msg_global->max_channel), "Invalid channel %d",
653               channel);
654
655   CHECK_HOST();
656
657   return
658     MSG_mailbox_get_count_host_waiting_tasks(MSG_mailbox_get_by_channel
659                                              (MSG_host_self(), channel),
660                                              host);
661
662 }
663
664 int MSG_task_listen_from_host(const char *alias, m_host_t host)
665 {
666   CHECK_HOST();
667
668   return
669     MSG_mailbox_get_count_host_waiting_tasks(MSG_mailbox_get_by_alias(alias),
670                                              host);
671 }
672
673 /** \ingroup msg_gos_functions
674  * \brief Test whether there is a pending communication on a channel, and who sent it.
675  *
676  * It takes one parameter.
677  * \param channel the channel on which the agent should be
678    listening. This value has to be >=0 and < than the maximal
679    number of channels fixed with MSG_set_channel_number().
680  * \return -1 if there is no pending communication and the PID of the process who sent it otherwise
681  */
682 int MSG_task_probe_from(m_channel_t channel)
683 {
684   m_task_t task;
685
686   CHECK_HOST();
687
688   xbt_assert1((channel >= 0)
689               && (channel < msg_global->max_channel), "Invalid channel %d",
690               channel);
691
692   if (NULL ==
693       (task =
694        MSG_mailbox_get_head(MSG_mailbox_get_by_channel
695                             (MSG_host_self(), channel))))
696     return -1;
697
698   return MSG_process_get_PID(task->simdata->sender);
699 }
700
701 int MSG_task_listen_from(const char *alias)
702 {
703   m_task_t task;
704
705   CHECK_HOST();
706
707   if (NULL == (task = MSG_mailbox_get_head(MSG_mailbox_get_by_alias(alias))))
708     return -1;
709
710   return MSG_process_get_PID(task->simdata->sender);
711 }
712
713 /** \ingroup msg_gos_functions
714  * \brief Wait for at most \a max_duration second for a task reception
715    on \a channel.
716
717  * \a PID is updated with the PID of the first process that triggered this event if any.
718  *
719  * It takes three parameters:
720  * \param channel the channel on which the agent should be
721    listening. This value has to be >=0 and < than the maximal.
722    number of channels fixed with MSG_set_channel_number().
723  * \param PID a memory location for storing an int.
724  * \param timeout the maximum time to wait for a task before
725     giving up. In the case of a reception, *\a PID will be updated
726     with the PID of the first process to send a task.
727  * \return #MSG_HOST_FAILURE if the host is shut down in the meantime
728    and #MSG_OK otherwise.
729  */
730 MSG_error_t
731 MSG_channel_select_from(m_channel_t channel, double timeout, int *PID)
732 {
733   m_host_t h = NULL;
734   simdata_host_t h_simdata = NULL;
735   m_task_t t;
736   int first_time = 1;
737   smx_cond_t cond;
738   msg_mailbox_t mailbox;
739
740   xbt_assert1((channel >= 0)
741               && (channel < msg_global->max_channel), "Invalid channel %d",
742               channel);
743
744   if (PID) {
745     *PID = -1;
746   }
747
748   if (timeout == 0.0) {
749     *PID = MSG_task_probe_from(channel);
750     MSG_RETURN(MSG_OK);
751   } else {
752     CHECK_HOST();
753     h = MSG_host_self();
754     h_simdata = h->simdata;
755
756     mailbox = MSG_mailbox_get_by_channel(MSG_host_self(), channel);
757
758     while (MSG_mailbox_is_empty(mailbox)) {
759       if (timeout > 0) {
760         if (!first_time) {
761           MSG_RETURN(MSG_OK);
762         }
763       }
764
765       SIMIX_mutex_lock(h_simdata->mutex);
766
767       xbt_assert1(!MSG_mailbox_get_cond(mailbox),
768                   "A process is already blocked on this channel %d", channel);
769
770       cond = SIMIX_cond_init();
771
772       MSG_mailbox_set_cond(mailbox, cond);
773
774       if (timeout > 0) {
775         SIMIX_cond_wait_timeout(cond, h_simdata->mutex, timeout);
776       } else {
777         SIMIX_cond_wait(cond, h_simdata->mutex);
778       }
779
780       SIMIX_cond_destroy(cond);
781       SIMIX_mutex_unlock(h_simdata->mutex);
782
783       if (SIMIX_host_get_state(h_simdata->smx_host) == 0) {
784         MSG_RETURN(MSG_HOST_FAILURE);
785       }
786
787       MSG_mailbox_set_cond(mailbox, NULL);
788       first_time = 0;
789     }
790
791     if (NULL == (t = MSG_mailbox_get_head(mailbox)))
792       MSG_RETURN(MSG_OK);
793
794
795     if (PID) {
796       *PID = MSG_process_get_PID(t->simdata->sender);
797     }
798
799     MSG_RETURN(MSG_OK);
800   }
801 }
802
803
804 MSG_error_t MSG_alias_select_from(const char *alias, double timeout, int *PID)
805 {
806   m_host_t h = NULL;
807   simdata_host_t h_simdata = NULL;
808   m_task_t t;
809   int first_time = 1;
810   smx_cond_t cond;
811   msg_mailbox_t mailbox;
812
813   if (PID) {
814     *PID = -1;
815   }
816
817   if (timeout == 0.0) {
818     *PID = MSG_task_listen_from(alias);
819     MSG_RETURN(MSG_OK);
820   } else {
821     CHECK_HOST();
822     h = MSG_host_self();
823     h_simdata = h->simdata;
824
825     DEBUG2("Probing on alias %s (%s)", alias, h->name);
826
827     mailbox = MSG_mailbox_get_by_alias(alias);
828
829     while (MSG_mailbox_is_empty(mailbox)) {
830       if (timeout > 0) {
831         if (!first_time) {
832           MSG_RETURN(MSG_OK);
833         }
834       }
835
836       SIMIX_mutex_lock(h_simdata->mutex);
837
838       xbt_assert1(!MSG_mailbox_get_cond(mailbox),
839                   "A process is already blocked on this alias %s", alias);
840
841       cond = SIMIX_cond_init();
842
843       MSG_mailbox_set_cond(mailbox, cond);
844
845       if (timeout > 0) {
846         SIMIX_cond_wait_timeout(cond, h_simdata->mutex, timeout);
847       } else {
848         SIMIX_cond_wait(cond, h_simdata->mutex);
849       }
850
851       SIMIX_cond_destroy(cond);
852       SIMIX_mutex_unlock(h_simdata->mutex);
853
854       if (SIMIX_host_get_state(h_simdata->smx_host) == 0) {
855         MSG_RETURN(MSG_HOST_FAILURE);
856       }
857
858       MSG_mailbox_set_cond(mailbox, NULL);
859       first_time = 0;
860     }
861
862     if (NULL == (t = MSG_mailbox_get_head(mailbox)))
863       MSG_RETURN(MSG_OK);
864
865
866     if (PID) {
867       *PID = MSG_process_get_PID(t->simdata->sender);
868     }
869
870     MSG_RETURN(MSG_OK);
871   }
872 }