Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
change mmalloc.h into a public header
[simgrid.git] / src / msg / gos.c
1 /* Copyright (c) 2004, 2005, 2006, 2007, 2008, 2009, 2010. The SimGrid Team.
2  * All rights reserved.                                                     */
3
4 /* This program is free software; you can redistribute it and/or modify it
5  * under the terms of the license (GNU LGPL) which comes with this package. */
6
7 #include "msg/private.h"
8 #include "xbt/sysdep.h"
9 #include "xbt/log.h"
10 #include "mailbox.h"
11
12
13 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(msg_gos, msg,
14                                 "Logging specific to MSG (gos)");
15
16 /** \ingroup msg_gos_functions
17  *
18  * \brief Return the last value returned by a MSG function (except
19  * MSG_get_errno...).
20  */
21 MSG_error_t MSG_get_errno(void)
22 {
23   return PROCESS_GET_ERRNO();
24 }
25
26 /** \ingroup msg_gos_functions
27  * \brief Executes a task and waits for its termination.
28  *
29  * This function is used for describing the behavior of an agent. It
30  * takes only one parameter.
31  * \param task a #m_task_t to execute on the location on which the
32    agent is running.
33  * \return #MSG_FATAL if \a task is not properly initialized and
34  * #MSG_OK otherwise.
35  */
36 MSG_error_t MSG_task_execute(m_task_t task)
37 {
38   simdata_task_t simdata = NULL;
39   m_process_t self = MSG_process_self();
40   e_surf_action_state_t state = SURF_ACTION_NOT_IN_THE_SYSTEM;
41   CHECK_HOST();
42 #ifdef HAVE_TRACING
43   TRACE_msg_task_execute_start (task);
44 #endif
45
46   simdata = task->simdata;
47
48   xbt_assert1((!simdata->compute) && (task->simdata->refcount == 1),
49               "This task is executed somewhere else. Go fix your code! %d", task->simdata->refcount);
50
51   DEBUG1("Computing on %s", MSG_process_self()->simdata->m_host->name);
52
53   if (simdata->computation_amount == 0) {
54 #ifdef HAVE_TRACING
55     TRACE_msg_task_execute_end (task);
56 #endif
57     return MSG_OK;
58   }
59   simdata->refcount++;
60   SIMIX_mutex_lock(simdata->mutex);
61   simdata->compute =
62     SIMIX_action_execute(SIMIX_host_self(), task->name,
63                          simdata->computation_amount);
64   SIMIX_action_set_priority(simdata->compute, simdata->priority);
65
66   /* changed to waiting action since we are always waiting one action (execute, communicate or sleep) */
67   self->simdata->waiting_action = simdata->compute;
68   SIMIX_register_action_to_condition(simdata->compute, simdata->cond);
69   do {
70     SIMIX_cond_wait(simdata->cond, simdata->mutex);
71     state = SIMIX_action_get_state(simdata->compute);
72   } while (state == SURF_ACTION_READY || state == SURF_ACTION_RUNNING);
73   SIMIX_unregister_action_to_condition(simdata->compute, simdata->cond);
74   self->simdata->waiting_action = NULL;
75
76   SIMIX_mutex_unlock(simdata->mutex);
77   simdata->refcount--;
78
79   if (SIMIX_action_get_state(task->simdata->compute) == SURF_ACTION_DONE) {
80     /* action ended, set comm and compute = NULL, the actions is already destroyed in the main function */
81     SIMIX_action_destroy(task->simdata->compute);
82     simdata->computation_amount = 0.0;
83     simdata->comm = NULL;
84     simdata->compute = NULL;
85 #ifdef HAVE_TRACING
86     TRACE_msg_task_execute_end (task);
87 #endif
88     MSG_RETURN(MSG_OK);
89   } else if (SIMIX_host_get_state(SIMIX_host_self()) == 0) {
90     /* action ended, set comm and compute = NULL, the actions is already destroyed in the main function */
91     SIMIX_action_destroy(task->simdata->compute);
92     simdata->comm = NULL;
93     simdata->compute = NULL;
94 #ifdef HAVE_TRACING
95     TRACE_msg_task_execute_end (task);
96 #endif
97     MSG_RETURN(MSG_HOST_FAILURE);
98   } else {
99     /* action ended, set comm and compute = NULL, the actions is already destroyed in the main function */
100     SIMIX_action_destroy(task->simdata->compute);
101     simdata->comm = NULL;
102     simdata->compute = NULL;
103 #ifdef HAVE_TRACING
104     TRACE_msg_task_execute_end (task);
105 #endif
106     MSG_RETURN(MSG_TASK_CANCELLED);
107   }
108 }
109
110 /** \ingroup m_task_management
111  * \brief Creates a new #m_task_t (a parallel one....).
112  *
113  * A constructor for #m_task_t taking six arguments and returning the
114    corresponding object.
115  * \param name a name for the object. It is for user-level information
116    and can be NULL.
117  * \param host_nb the number of hosts implied in the parallel task.
118  * \param host_list an array of \p host_nb m_host_t.
119  * \param computation_amount an array of \p host_nb
120    doubles. computation_amount[i] is the total number of operations
121    that have to be performed on host_list[i].
122  * \param communication_amount an array of \p host_nb* \p host_nb doubles.
123  * \param data a pointer to any data may want to attach to the new
124    object.  It is for user-level information and can be NULL. It can
125    be retrieved with the function \ref MSG_task_get_data.
126  * \see m_task_t
127  * \return The new corresponding object.
128  */
129 m_task_t
130 MSG_parallel_task_create(const char *name, int host_nb,
131                          const m_host_t * host_list,
132                          double *computation_amount,
133                          double *communication_amount, void *data)
134 {
135   int i;
136   simdata_task_t simdata = xbt_new0(s_simdata_task_t, 1);
137   m_task_t task = xbt_new0(s_m_task_t, 1);
138   task->simdata = simdata;
139
140   /* Task structure */
141   task->name = xbt_strdup(name);
142   task->data = data;
143
144   /* Simulator Data */
145   simdata->computation_amount = 0;
146   simdata->message_size = 0;
147   simdata->cond = SIMIX_cond_init();
148   simdata->mutex = SIMIX_mutex_init();
149   simdata->compute = NULL;
150   simdata->comm = NULL;
151   simdata->rate = -1.0;
152   simdata->refcount = 1;
153   simdata->sender = NULL;
154   simdata->receiver = NULL;
155   simdata->source = NULL;
156
157   simdata->host_nb = host_nb;
158   simdata->host_list = xbt_new0(smx_host_t, host_nb);
159   simdata->comp_amount = computation_amount;
160   simdata->comm_amount = communication_amount;
161
162   for (i = 0; i < host_nb; i++)
163     simdata->host_list[i] = host_list[i]->simdata->smx_host;
164
165   return task;
166 }
167
168 MSG_error_t MSG_parallel_task_execute(m_task_t task)
169 {
170   simdata_task_t simdata = NULL;
171   m_process_t self = MSG_process_self();
172   e_surf_action_state_t state = SURF_ACTION_NOT_IN_THE_SYSTEM;
173   CHECK_HOST();
174
175   simdata = task->simdata;
176
177   xbt_assert0((!simdata->compute)
178               && (task->simdata->refcount == 1),
179               "This task is executed somewhere else. Go fix your code!");
180
181   xbt_assert0(simdata->host_nb, "This is not a parallel task. Go to hell.");
182
183   DEBUG1("Computing on %s", MSG_process_self()->simdata->m_host->name);
184
185   simdata->refcount++;
186
187   SIMIX_mutex_lock(simdata->mutex);
188   simdata->compute =
189     SIMIX_action_parallel_execute(task->name, simdata->host_nb,
190                                   simdata->host_list, simdata->comp_amount,
191                                   simdata->comm_amount, 1.0, -1.0);
192
193   self->simdata->waiting_action = simdata->compute;
194   SIMIX_register_action_to_condition(simdata->compute, simdata->cond);
195   do {
196     SIMIX_cond_wait(simdata->cond, simdata->mutex);
197     state = SIMIX_action_get_state(task->simdata->compute);
198   } while (state == SURF_ACTION_READY || state == SURF_ACTION_RUNNING);
199
200   SIMIX_unregister_action_to_condition(simdata->compute, simdata->cond);
201   self->simdata->waiting_action = NULL;
202
203
204   SIMIX_mutex_unlock(simdata->mutex);
205   simdata->refcount--;
206
207   if (SIMIX_action_get_state(task->simdata->compute) == SURF_ACTION_DONE) {
208     /* action ended, set comm and compute = NULL, the actions is already destroyed in the main function */
209     SIMIX_action_destroy(task->simdata->compute);
210     simdata->computation_amount = 0.0;
211     simdata->comm = NULL;
212     simdata->compute = NULL;
213     MSG_RETURN(MSG_OK);
214   } else if (SIMIX_host_get_state(SIMIX_host_self()) == 0) {
215     /* action ended, set comm and compute = NULL, the actions is already destroyed in the main function */
216     SIMIX_action_destroy(task->simdata->compute);
217     simdata->comm = NULL;
218     simdata->compute = NULL;
219     MSG_RETURN(MSG_HOST_FAILURE);
220   } else {
221     /* action ended, set comm and compute = NULL, the actions is already destroyed in the main function */
222     SIMIX_action_destroy(task->simdata->compute);
223     simdata->comm = NULL;
224     simdata->compute = NULL;
225     MSG_RETURN(MSG_TASK_CANCELLED);
226   }
227
228 }
229
230
231 /** \ingroup msg_gos_functions
232  * \brief Sleep for the specified number of seconds
233  *
234  * Makes the current process sleep until \a time seconds have elapsed.
235  *
236  * \param nb_sec a number of second
237  */
238 MSG_error_t MSG_process_sleep(double nb_sec)
239 {
240   smx_action_t act_sleep;
241   m_process_t proc = MSG_process_self();
242   e_surf_action_state_t state = SURF_ACTION_NOT_IN_THE_SYSTEM;
243   smx_mutex_t mutex;
244   smx_cond_t cond;
245
246 #ifdef HAVE_TRACING
247   TRACE_msg_process_sleep_in (MSG_process_self());
248 #endif
249
250   /* create action to sleep */
251   act_sleep =
252     SIMIX_action_sleep(SIMIX_process_get_host(proc->simdata->s_process),
253                        nb_sec);
254
255   mutex = SIMIX_mutex_init();
256   SIMIX_mutex_lock(mutex);
257
258   /* create conditional and register action to it */
259   cond = SIMIX_cond_init();
260
261   proc->simdata->waiting_action = act_sleep;
262   SIMIX_register_action_to_condition(act_sleep, cond);
263   do {
264     SIMIX_cond_wait(cond, mutex);
265     state = SIMIX_action_get_state(act_sleep);
266   } while (state == SURF_ACTION_READY || state == SURF_ACTION_RUNNING);
267   proc->simdata->waiting_action = NULL;
268   SIMIX_unregister_action_to_condition(act_sleep, cond);
269   SIMIX_mutex_unlock(mutex);
270
271   /* remove variables */
272   SIMIX_cond_destroy(cond);
273   SIMIX_mutex_destroy(mutex);
274
275   if (SIMIX_action_get_state(act_sleep) == SURF_ACTION_DONE) {
276     if (SIMIX_host_get_state(SIMIX_host_self()) == SURF_RESOURCE_OFF) {
277       SIMIX_action_destroy(act_sleep);
278 #ifdef HAVE_TRACING
279       TRACE_msg_process_sleep_out (MSG_process_self());
280 #endif
281       MSG_RETURN(MSG_HOST_FAILURE);
282     }
283   } else {
284     SIMIX_action_destroy(act_sleep);
285 #ifdef HAVE_TRACING
286     TRACE_msg_process_sleep_out (MSG_process_self());
287 #endif
288     MSG_RETURN(MSG_HOST_FAILURE);
289   }
290
291   SIMIX_action_destroy(act_sleep);
292 #ifdef HAVE_TRACING
293   TRACE_msg_process_sleep_out (MSG_process_self());
294 #endif
295   MSG_RETURN(MSG_OK);
296 }
297
298 /** \ingroup msg_gos_functions
299  * \brief Listen on \a channel and waits for receiving a task from \a host.
300  *
301  * It takes three parameters.
302  * \param task a memory location for storing a #m_task_t. It will
303    hold a task when this function will return. Thus \a task should not
304    be equal to \c NULL and \a *task should be equal to \c NULL. If one of
305    those two condition does not hold, there will be a warning message.
306  * \param channel the channel on which the agent should be
307    listening. This value has to be >=0 and < than the maximal
308    number of channels fixed with MSG_set_channel_number().
309  * \param host the host that is to be watched.
310  * \return #MSG_FATAL if \a task is equal to \c NULL, #MSG_WARNING
311    if \a *task is not equal to \c NULL, and #MSG_OK otherwise.
312  */
313 MSG_error_t
314 MSG_task_get_from_host(m_task_t * task, m_channel_t channel, m_host_t host)
315 {
316   return MSG_task_get_ext(task, channel, -1, host);
317 }
318
319 /** \ingroup msg_gos_functions
320  * \brief Listen on a channel and wait for receiving a task.
321  *
322  * It takes two parameters.
323  * \param task a memory location for storing a #m_task_t. It will
324    hold a task when this function will return. Thus \a task should not
325    be equal to \c NULL and \a *task should be equal to \c NULL. If one of
326    those two condition does not hold, there will be a warning message.
327  * \param channel the channel on which the agent should be
328    listening. This value has to be >=0 and < than the maximal
329    number of channels fixed with MSG_set_channel_number().
330  * \return #MSG_FATAL if \a task is equal to \c NULL, #MSG_WARNING
331  * if \a *task is not equal to \c NULL, and #MSG_OK otherwise.
332  */
333 MSG_error_t MSG_task_get(m_task_t * task, m_channel_t channel)
334 {
335   return MSG_task_get_with_timeout(task, channel, -1);
336 }
337
338 /** \ingroup msg_gos_functions
339  * \brief Listen on a channel and wait for receiving a task with a timeout.
340  *
341  * It takes three parameters.
342  * \param task a memory location for storing a #m_task_t. It will
343    hold a task when this function will return. Thus \a task should not
344    be equal to \c NULL and \a *task should be equal to \c NULL. If one of
345    those two condition does not hold, there will be a warning message.
346  * \param channel the channel on which the agent should be
347    listening. This value has to be >=0 and < than the maximal
348    number of channels fixed with MSG_set_channel_number().
349  * \param max_duration the maximum time to wait for a task before giving
350     up. In such a case, #MSG_TRANSFER_FAILURE will be returned, \a task
351     will not be modified and will still be
352     equal to \c NULL when returning.
353  * \return #MSG_FATAL if \a task is equal to \c NULL, #MSG_WARNING
354    if \a *task is not equal to \c NULL, and #MSG_OK otherwise.
355  */
356 MSG_error_t
357 MSG_task_get_with_timeout(m_task_t * task, m_channel_t channel,
358                           double max_duration)
359 {
360   return MSG_task_get_ext(task, channel, max_duration, NULL);
361 }
362
363 /** \defgroup msg_gos_functions MSG Operating System Functions
364  *  \brief This section describes the functions that can be used
365  *  by an agent for handling some task.
366  */
367
368 MSG_error_t
369 MSG_task_get_ext(m_task_t * task, m_channel_t channel, double timeout,
370                  m_host_t host)
371 {
372   xbt_assert1((channel >= 0)
373               && (channel < msg_global->max_channel), "Invalid channel %d",
374               channel);
375
376   return
377     MSG_mailbox_get_task_ext(MSG_mailbox_get_by_channel
378                              (MSG_host_self(), channel), task, host, timeout);
379 }
380
381 MSG_error_t
382 MSG_task_receive_from_host(m_task_t * task, const char *alias, m_host_t host)
383 {
384   return MSG_task_receive_ext(task, alias, -1, host);
385 }
386
387 MSG_error_t MSG_task_receive(m_task_t * task, const char *alias)
388 {
389   return MSG_task_receive_with_timeout(task, alias, -1);
390 }
391
392 MSG_error_t
393 MSG_task_receive_with_timeout(m_task_t * task, const char *alias,
394                               double timeout)
395 {
396   return MSG_task_receive_ext(task, alias, timeout, NULL);
397 }
398
399 MSG_error_t
400 MSG_task_receive_ext(m_task_t * task, const char *alias, double timeout,
401                      m_host_t host)
402 {
403   return MSG_mailbox_get_task_ext(MSG_mailbox_get_by_alias(alias), task, host,
404                                   timeout);
405 }
406
407
408 /** \ingroup msg_gos_functions
409  * \brief Put a task on a channel of an host and waits for the end of the
410  * transmission.
411  *
412  * This function is used for describing the behavior of an agent. It
413  * takes three parameter.
414  * \param task a #m_task_t to send on another location. This task
415    will not be usable anymore when the function will return. There is
416    no automatic task duplication and you have to save your parameters
417    before calling this function. Tasks are unique and once it has been
418    sent to another location, you should not access it anymore. You do
419    not need to call MSG_task_destroy() but to avoid using, as an
420    effect of inattention, this task anymore, you definitely should
421    renitialize it with #MSG_TASK_UNINITIALIZED. Note that this task
422    can be transfered iff it has been correctly created with
423    MSG_task_create().
424  * \param dest the destination of the message
425  * \param channel the channel on which the agent should put this
426    task. This value has to be >=0 and < than the maximal number of
427    channels fixed with MSG_set_channel_number().
428  * \return #MSG_FATAL if \a task is not properly initialized and
429  * #MSG_OK otherwise. Returns #MSG_HOST_FAILURE if the host on which
430  * this function was called was shut down. Returns
431  * #MSG_TRANSFER_FAILURE if the transfer could not be properly done
432  * (network failure, dest failure)
433  */
434 MSG_error_t MSG_task_put(m_task_t task, m_host_t dest, m_channel_t channel)
435 {
436   return MSG_task_put_with_timeout(task, dest, channel, -1.0);
437 }
438
439 /** \ingroup msg_gos_functions
440  * \brief Does exactly the same as MSG_task_put but with a bounded transmition
441  * rate.
442  *
443  * \sa MSG_task_put
444  */
445 MSG_error_t
446 MSG_task_put_bounded(m_task_t task, m_host_t dest, m_channel_t channel,
447                      double maxrate)
448 {
449   task->simdata->rate = maxrate;
450   return MSG_task_put(task, dest, channel);
451 }
452
453 /** \ingroup msg_gos_functions \brief Put a task on a channel of an
454  * host (with a timeout on the waiting of the destination host) and
455  * waits for the end of the transmission.
456  *
457  * This function is used for describing the behavior of an agent. It
458  * takes four parameter.
459  * \param task a #m_task_t to send on another location. This task
460    will not be usable anymore when the function will return. There is
461    no automatic task duplication and you have to save your parameters
462    before calling this function. Tasks are unique and once it has been
463    sent to another location, you should not access it anymore. You do
464    not need to call MSG_task_destroy() but to avoid using, as an
465    effect of inattention, this task anymore, you definitely should
466    renitialize it with #MSG_TASK_UNINITIALIZED. Note that this task
467    can be transfered iff it has been correctly created with
468    MSG_task_create().
469  * \param dest the destination of the message
470  * \param channel the channel on which the agent should put this
471    task. This value has to be >=0 and < than the maximal number of
472    channels fixed with MSG_set_channel_number().
473  * \param timeout the maximum time to wait for a task before giving
474     up. In such a case, #MSG_TRANSFER_FAILURE will be returned, \a task
475     will not be modified
476  * \return #MSG_FATAL if \a task is not properly initialized and
477    #MSG_OK otherwise. Returns #MSG_HOST_FAILURE if the host on which
478    this function was called was shut down. Returns
479    #MSG_TRANSFER_FAILURE if the transfer could not be properly done
480    (network failure, dest failure, timeout...)
481  */
482 MSG_error_t
483 MSG_task_put_with_timeout(m_task_t task, m_host_t dest, m_channel_t channel,
484                           double timeout)
485 {
486   xbt_assert1((channel >= 0)
487               && (channel < msg_global->max_channel), "Invalid channel %d",
488               channel);
489
490   return
491     MSG_mailbox_put_with_timeout(MSG_mailbox_get_by_channel(dest, channel),
492                                  task, timeout);
493 }
494
495 MSG_error_t MSG_task_send(m_task_t task, const char *alias)
496 {
497   return MSG_task_send_with_timeout(task, alias, -1);
498 }
499
500
501 MSG_error_t
502 MSG_task_send_bounded(m_task_t task, const char *alias, double maxrate)
503 {
504   task->simdata->rate = maxrate;
505   return MSG_task_send(task, alias);
506 }
507
508
509 MSG_error_t
510 MSG_task_send_with_timeout(m_task_t task, const char *alias, double timeout)
511 {
512   return MSG_mailbox_put_with_timeout(MSG_mailbox_get_by_alias(alias), task,
513                                       timeout);
514 }
515
516 int MSG_task_listen(const char *alias)
517 {
518   CHECK_HOST();
519
520   return !MSG_mailbox_is_empty(MSG_mailbox_get_by_alias(alias));
521 }
522
523 /** \ingroup msg_gos_functions
524  * \brief Test whether there is a pending communication on a channel.
525  *
526  * It takes one parameter.
527  * \param channel the channel on which the agent should be
528    listening. This value has to be >=0 and < than the maximal
529    number of channels fixed with MSG_set_channel_number().
530  * \return 1 if there is a pending communication and 0 otherwise
531  */
532 int MSG_task_Iprobe(m_channel_t channel)
533 {
534   xbt_assert1((channel >= 0)
535               && (channel < msg_global->max_channel), "Invalid channel %d",
536               channel);
537
538   CHECK_HOST();
539
540   return
541     !MSG_mailbox_is_empty(MSG_mailbox_get_by_channel
542                           (MSG_host_self(), channel));
543 }
544
545 /** \ingroup msg_gos_functions
546
547  * \brief Return the number of tasks waiting to be received on a \a
548    channel and sent by \a host.
549  *
550  * It takes two parameters.
551  * \param channel the channel on which the agent should be
552    listening. This value has to be >=0 and < than the maximal
553    number of channels fixed with MSG_set_channel_number().
554  * \param host the host that is to be watched.
555  * \return the number of tasks waiting to be received on \a channel
556    and sent by \a host.
557  */
558 int MSG_task_probe_from_host(int channel, m_host_t host)
559 {
560   xbt_assert1((channel >= 0)
561               && (channel < msg_global->max_channel), "Invalid channel %d",
562               channel);
563
564   CHECK_HOST();
565
566   return
567     MSG_mailbox_get_count_host_waiting_tasks(MSG_mailbox_get_by_channel
568                                              (MSG_host_self(), channel),
569                                              host);
570
571 }
572
573 int MSG_task_listen_from_host(const char *alias, m_host_t host)
574 {
575   CHECK_HOST();
576
577   return
578     MSG_mailbox_get_count_host_waiting_tasks(MSG_mailbox_get_by_alias(alias),
579                                              host);
580 }
581
582 /** \ingroup msg_gos_functions
583  * \brief Test whether there is a pending communication on a channel, and who sent it.
584  *
585  * It takes one parameter.
586  * \param channel the channel on which the agent should be
587    listening. This value has to be >=0 and < than the maximal
588    number of channels fixed with MSG_set_channel_number().
589  * \return -1 if there is no pending communication and the PID of the process who sent it otherwise
590  */
591 int MSG_task_probe_from(m_channel_t channel)
592 {
593   m_task_t task;
594
595   CHECK_HOST();
596
597   xbt_assert1((channel >= 0)
598               && (channel < msg_global->max_channel), "Invalid channel %d",
599               channel);
600
601   if (NULL ==
602       (task =
603        MSG_mailbox_get_head(MSG_mailbox_get_by_channel
604                             (MSG_host_self(), channel))))
605     return -1;
606
607   return MSG_process_get_PID(task->simdata->sender);
608 }
609
610 int MSG_task_listen_from(const char *alias)
611 {
612   m_task_t task;
613
614   CHECK_HOST();
615
616   if (NULL == (task = MSG_mailbox_get_head(MSG_mailbox_get_by_alias(alias))))
617     return -1;
618
619   return MSG_process_get_PID(task->simdata->sender);
620 }
621
622 /** \ingroup msg_gos_functions
623  * \brief Wait for at most \a max_duration second for a task reception
624    on \a channel.
625
626  * \a PID is updated with the PID of the first process that triggered this event if any.
627  *
628  * It takes three parameters:
629  * \param channel the channel on which the agent should be
630    listening. This value has to be >=0 and < than the maximal.
631    number of channels fixed with MSG_set_channel_number().
632  * \param PID a memory location for storing an int.
633  * \param timeout the maximum time to wait for a task before
634     giving up. In the case of a reception, *\a PID will be updated
635     with the PID of the first process to send a task.
636  * \return #MSG_HOST_FAILURE if the host is shut down in the meantime
637    and #MSG_OK otherwise.
638  */
639 MSG_error_t
640 MSG_channel_select_from(m_channel_t channel, double timeout, int *PID)
641 {
642   m_host_t h = NULL;
643   simdata_host_t h_simdata = NULL;
644   m_task_t t;
645   int first_time = 1;
646   smx_cond_t cond;
647   msg_mailbox_t mailbox;
648
649   xbt_assert1((channel >= 0)
650               && (channel < msg_global->max_channel), "Invalid channel %d",
651               channel);
652
653   if (PID) {
654     *PID = -1;
655   }
656
657   if (timeout == 0.0) {
658     *PID = MSG_task_probe_from(channel);
659     MSG_RETURN(MSG_OK);
660   } else {
661     CHECK_HOST();
662     h = MSG_host_self();
663     h_simdata = h->simdata;
664
665     mailbox = MSG_mailbox_get_by_channel(MSG_host_self(), channel);
666
667     while (MSG_mailbox_is_empty(mailbox)) {
668       if (timeout > 0) {
669         if (!first_time) {
670           MSG_RETURN(MSG_OK);
671         }
672       }
673
674       SIMIX_mutex_lock(h_simdata->mutex);
675
676       xbt_assert1(!MSG_mailbox_get_cond(mailbox),
677                   "A process is already blocked on this channel %d", channel);
678
679       cond = SIMIX_cond_init();
680
681       MSG_mailbox_set_cond(mailbox, cond);
682
683       if (timeout > 0) {
684         SIMIX_cond_wait_timeout(cond, h_simdata->mutex, timeout);
685       } else {
686         SIMIX_cond_wait(cond, h_simdata->mutex);
687       }
688
689       SIMIX_cond_destroy(cond);
690       SIMIX_mutex_unlock(h_simdata->mutex);
691
692       if (SIMIX_host_get_state(h_simdata->smx_host) == 0) {
693         MSG_RETURN(MSG_HOST_FAILURE);
694       }
695
696       MSG_mailbox_set_cond(mailbox, NULL);
697       first_time = 0;
698     }
699
700     if (NULL == (t = MSG_mailbox_get_head(mailbox)))
701       MSG_RETURN(MSG_OK);
702
703
704     if (PID) {
705       *PID = MSG_process_get_PID(t->simdata->sender);
706     }
707
708     MSG_RETURN(MSG_OK);
709   }
710 }
711
712
713 MSG_error_t MSG_alias_select_from(const char *alias, double timeout, int *PID)
714 {
715   m_host_t h = NULL;
716   simdata_host_t h_simdata = NULL;
717   m_task_t t;
718   int first_time = 1;
719   smx_cond_t cond;
720   msg_mailbox_t mailbox;
721
722   if (PID) {
723     *PID = -1;
724   }
725
726   if (timeout == 0.0) {
727     *PID = MSG_task_listen_from(alias);
728     MSG_RETURN(MSG_OK);
729   } else {
730     CHECK_HOST();
731     h = MSG_host_self();
732     h_simdata = h->simdata;
733
734     DEBUG2("Probing on alias %s (%s)", alias, h->name);
735
736     mailbox = MSG_mailbox_get_by_alias(alias);
737
738     while (MSG_mailbox_is_empty(mailbox)) {
739       if (timeout > 0) {
740         if (!first_time) {
741           MSG_RETURN(MSG_OK);
742         }
743       }
744
745       SIMIX_mutex_lock(h_simdata->mutex);
746
747       xbt_assert1(!MSG_mailbox_get_cond(mailbox),
748                   "A process is already blocked on this alias %s", alias);
749
750       cond = SIMIX_cond_init();
751
752       MSG_mailbox_set_cond(mailbox, cond);
753
754       if (timeout > 0) {
755         SIMIX_cond_wait_timeout(cond, h_simdata->mutex, timeout);
756       } else {
757         SIMIX_cond_wait(cond, h_simdata->mutex);
758       }
759
760       SIMIX_cond_destroy(cond);
761       SIMIX_mutex_unlock(h_simdata->mutex);
762
763       if (SIMIX_host_get_state(h_simdata->smx_host) == 0) {
764         MSG_RETURN(MSG_HOST_FAILURE);
765       }
766
767       MSG_mailbox_set_cond(mailbox, NULL);
768       first_time = 0;
769     }
770
771     if (NULL == (t = MSG_mailbox_get_head(mailbox)))
772       MSG_RETURN(MSG_OK);
773
774
775     if (PID) {
776       *PID = MSG_process_get_PID(t->simdata->sender);
777     }
778
779     MSG_RETURN(MSG_OK);
780   }
781 }