Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
321bfef6fa4ba634a501bd974c1d53ac8ade064c
[simgrid.git] / src / msg / gos.c
1 /*     $Id$      */
2
3 /* Copyright (c) 2002-2007 Arnaud Legrand.                                  */
4 /* Copyright (c) 2007 Bruno Donassolo.                                      */
5 /* All rights reserved.                                                     */
6
7 /* This program is free software; you can redistribute it and/or modify it
8  * under the terms of the license (GNU LGPL) which comes with this package. */
9
10 #include "msg/private.h"
11 #include "xbt/sysdep.h"
12 #include "xbt/log.h"
13 #include "mailbox.h"
14
15
16 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(msg_gos, msg,
17                                 "Logging specific to MSG (gos)");
18
19 /** \ingroup msg_gos_functions
20  *
21  * \brief Return the last value returned by a MSG function (except
22  * MSG_get_errno...).
23  */
24 MSG_error_t MSG_get_errno(void)
25 {
26   return PROCESS_GET_ERRNO();
27 }
28
29 /** \ingroup msg_gos_functions
30  * \brief Executes a task and waits for its termination.
31  *
32  * This function is used for describing the behavior of an agent. It
33  * takes only one parameter.
34  * \param task a #m_task_t to execute on the location on which the
35    agent is running.
36  * \return #MSG_FATAL if \a task is not properly initialized and
37  * #MSG_OK otherwise.
38  */
39 MSG_error_t MSG_task_execute(m_task_t task)
40 {
41   simdata_task_t simdata = NULL;
42   m_process_t self = MSG_process_self();
43   e_surf_action_state_t state = SURF_ACTION_NOT_IN_THE_SYSTEM;
44   CHECK_HOST();
45
46   simdata = task->simdata;
47   xbt_assert1((!simdata->compute)
48               && (task->simdata->refcount == 1),
49               "This task is executed somewhere else. Go fix your code! %d", task->simdata->refcount);
50
51   DEBUG1("Computing on %s", MSG_process_self()->simdata->m_host->name);
52
53   simdata->refcount++;
54   SIMIX_mutex_lock(simdata->mutex);
55   simdata->compute =
56     SIMIX_action_execute(SIMIX_host_self(), task->name,
57                          simdata->computation_amount);
58   SIMIX_action_set_priority(simdata->compute, simdata->priority);
59
60   /* changed to waiting action since we are always waiting one action (execute, communicate or sleep) */
61   self->simdata->waiting_action = simdata->compute;
62   SIMIX_register_action_to_condition(simdata->compute, simdata->cond);
63   do {
64     SIMIX_cond_wait(simdata->cond, simdata->mutex);
65     state = SIMIX_action_get_state(simdata->compute);
66   } while (state == SURF_ACTION_READY || state == SURF_ACTION_RUNNING);
67   SIMIX_unregister_action_to_condition(simdata->compute, simdata->cond);
68   self->simdata->waiting_action = NULL;
69
70   SIMIX_mutex_unlock(simdata->mutex);
71   simdata->refcount--;
72
73   if (SIMIX_action_get_state(task->simdata->compute) == SURF_ACTION_DONE) {
74     /* action ended, set comm and compute = NULL, the actions is already destroyed in the main function */
75     SIMIX_action_destroy(task->simdata->compute);
76     simdata->computation_amount = 0.0;
77     simdata->comm = NULL;
78     simdata->compute = NULL;
79     MSG_RETURN(MSG_OK);
80   } else if (SIMIX_host_get_state(SIMIX_host_self()) == 0) {
81     /* action ended, set comm and compute = NULL, the actions is already destroyed in the main function */
82     SIMIX_action_destroy(task->simdata->compute);
83     simdata->comm = NULL;
84     simdata->compute = NULL;
85     MSG_RETURN(MSG_HOST_FAILURE);
86   } else {
87     /* action ended, set comm and compute = NULL, the actions is already destroyed in the main function */
88     SIMIX_action_destroy(task->simdata->compute);
89     simdata->comm = NULL;
90     simdata->compute = NULL;
91     MSG_RETURN(MSG_TASK_CANCELLED);
92   }
93 }
94
95 /** \ingroup m_task_management
96  * \brief Creates a new #m_task_t (a parallel one....).
97  *
98  * A constructor for #m_task_t taking six arguments and returning the
99    corresponding object.
100  * \param name a name for the object. It is for user-level information
101    and can be NULL.
102  * \param host_nb the number of hosts implied in the parallel task.
103  * \param host_list an array of \p host_nb m_host_t.
104  * \param computation_amount an array of \p host_nb
105    doubles. computation_amount[i] is the total number of operations
106    that have to be performed on host_list[i].
107  * \param communication_amount an array of \p host_nb* \p host_nb doubles.
108  * \param data a pointer to any data may want to attach to the new
109    object.  It is for user-level information and can be NULL. It can
110    be retrieved with the function \ref MSG_task_get_data.
111  * \see m_task_t
112  * \return The new corresponding object.
113  */
114 m_task_t
115 MSG_parallel_task_create(const char *name, int host_nb,
116                          const m_host_t * host_list,
117                          double *computation_amount,
118                          double *communication_amount, void *data)
119 {
120   int i;
121   simdata_task_t simdata = xbt_new0(s_simdata_task_t, 1);
122   m_task_t task = xbt_new0(s_m_task_t, 1);
123   task->simdata = simdata;
124
125   /* Task structure */
126   task->name = xbt_strdup(name);
127   task->data = data;
128
129   /* Simulator Data */
130   simdata->computation_amount = 0;
131   simdata->message_size = 0;
132   simdata->cond = SIMIX_cond_init();
133   simdata->mutex = SIMIX_mutex_init();
134   simdata->compute = NULL;
135   simdata->comm = NULL;
136   simdata->rate = -1.0;
137   simdata->refcount = 1;
138   simdata->sender = NULL;
139   simdata->receiver = NULL;
140   simdata->source = NULL;
141
142   simdata->host_nb = host_nb;
143   simdata->host_list = xbt_new0(smx_host_t, host_nb);
144   simdata->comp_amount = computation_amount;
145   simdata->comm_amount = communication_amount;
146
147   for (i = 0; i < host_nb; i++)
148     simdata->host_list[i] = host_list[i]->simdata->smx_host;
149
150   return task;
151 }
152
153 MSG_error_t MSG_parallel_task_execute(m_task_t task)
154 {
155   simdata_task_t simdata = NULL;
156   m_process_t self = MSG_process_self();
157   e_surf_action_state_t state = SURF_ACTION_NOT_IN_THE_SYSTEM;
158   CHECK_HOST();
159
160   simdata = task->simdata;
161
162   xbt_assert0((!simdata->compute)
163               && (task->simdata->refcount == 1),
164               "This task is executed somewhere else. Go fix your code!");
165
166   xbt_assert0(simdata->host_nb, "This is not a parallel task. Go to hell.");
167
168   DEBUG1("Computing on %s", MSG_process_self()->simdata->m_host->name);
169
170   simdata->refcount++;
171
172   SIMIX_mutex_lock(simdata->mutex);
173   simdata->compute =
174     SIMIX_action_parallel_execute(task->name, simdata->host_nb,
175                                   simdata->host_list, simdata->comp_amount,
176                                   simdata->comm_amount, 1.0, -1.0);
177
178   self->simdata->waiting_action = simdata->compute;
179   SIMIX_register_action_to_condition(simdata->compute, simdata->cond);
180   do {
181     SIMIX_cond_wait(simdata->cond, simdata->mutex);
182     state = SIMIX_action_get_state(task->simdata->compute);
183   } while (state == SURF_ACTION_READY || state == SURF_ACTION_RUNNING);
184
185   SIMIX_unregister_action_to_condition(simdata->compute, simdata->cond);
186   self->simdata->waiting_action = NULL;
187
188
189   SIMIX_mutex_unlock(simdata->mutex);
190   simdata->refcount--;
191
192   if (SIMIX_action_get_state(task->simdata->compute) == SURF_ACTION_DONE) {
193     /* action ended, set comm and compute = NULL, the actions is already destroyed in the main function */
194     SIMIX_action_destroy(task->simdata->compute);
195     simdata->computation_amount = 0.0;
196     simdata->comm = NULL;
197     simdata->compute = NULL;
198     MSG_RETURN(MSG_OK);
199   } else if (SIMIX_host_get_state(SIMIX_host_self()) == 0) {
200     /* action ended, set comm and compute = NULL, the actions is already destroyed in the main function */
201     SIMIX_action_destroy(task->simdata->compute);
202     simdata->comm = NULL;
203     simdata->compute = NULL;
204     MSG_RETURN(MSG_HOST_FAILURE);
205   } else {
206     /* action ended, set comm and compute = NULL, the actions is already destroyed in the main function */
207     SIMIX_action_destroy(task->simdata->compute);
208     simdata->comm = NULL;
209     simdata->compute = NULL;
210     MSG_RETURN(MSG_TASK_CANCELLED);
211   }
212
213 }
214
215
216 /** \ingroup msg_gos_functions
217  * \brief Sleep for the specified number of seconds
218  *
219  * Makes the current process sleep until \a time seconds have elapsed.
220  *
221  * \param nb_sec a number of second
222  */
223 MSG_error_t MSG_process_sleep(double nb_sec)
224 {
225   smx_action_t act_sleep;
226   m_process_t proc = MSG_process_self();
227   e_surf_action_state_t state = SURF_ACTION_NOT_IN_THE_SYSTEM;
228   smx_mutex_t mutex;
229   smx_cond_t cond;
230
231   /* create action to sleep */
232   act_sleep =
233     SIMIX_action_sleep(SIMIX_process_get_host(proc->simdata->s_process),
234                        nb_sec);
235
236   mutex = SIMIX_mutex_init();
237   SIMIX_mutex_lock(mutex);
238
239   /* create conditional and register action to it */
240   cond = SIMIX_cond_init();
241
242   proc->simdata->waiting_action = act_sleep;
243   SIMIX_register_action_to_condition(act_sleep, cond);
244   do {
245     SIMIX_cond_wait(cond, mutex);
246     state = SIMIX_action_get_state(act_sleep);
247   } while (state == SURF_ACTION_READY || state == SURF_ACTION_RUNNING);
248   proc->simdata->waiting_action = NULL;
249   SIMIX_unregister_action_to_condition(act_sleep, cond);
250   SIMIX_mutex_unlock(mutex);
251
252   /* remove variables */
253   SIMIX_cond_destroy(cond);
254   SIMIX_mutex_destroy(mutex);
255
256   if (SIMIX_action_get_state(act_sleep) == SURF_ACTION_DONE) {
257     if (SIMIX_host_get_state(SIMIX_host_self()) == SURF_RESOURCE_OFF) {
258       SIMIX_action_destroy(act_sleep);
259       MSG_RETURN(MSG_HOST_FAILURE);
260     }
261   } else {
262     SIMIX_action_destroy(act_sleep);
263     MSG_RETURN(MSG_HOST_FAILURE);
264   }
265
266   SIMIX_action_destroy(act_sleep);
267   MSG_RETURN(MSG_OK);
268 }
269
270 /** \ingroup msg_gos_functions
271  * \brief Listen on \a channel and waits for receiving a task from \a host.
272  *
273  * It takes three parameters.
274  * \param task a memory location for storing a #m_task_t. It will
275    hold a task when this function will return. Thus \a task should not
276    be equal to \c NULL and \a *task should be equal to \c NULL. If one of
277    those two condition does not hold, there will be a warning message.
278  * \param channel the channel on which the agent should be
279    listening. This value has to be >=0 and < than the maximal
280    number of channels fixed with MSG_set_channel_number().
281  * \param host the host that is to be watched.
282  * \return #MSG_FATAL if \a task is equal to \c NULL, #MSG_WARNING
283    if \a *task is not equal to \c NULL, and #MSG_OK otherwise.
284  */
285 MSG_error_t
286 MSG_task_get_from_host(m_task_t * task, m_channel_t channel, m_host_t host)
287 {
288   return MSG_task_get_ext(task, channel, -1, host);
289 }
290
291 /** \ingroup msg_gos_functions
292  * \brief Listen on a channel and wait for receiving a task.
293  *
294  * It takes two parameters.
295  * \param task a memory location for storing a #m_task_t. It will
296    hold a task when this function will return. Thus \a task should not
297    be equal to \c NULL and \a *task should be equal to \c NULL. If one of
298    those two condition does not hold, there will be a warning message.
299  * \param channel the channel on which the agent should be
300    listening. This value has to be >=0 and < than the maximal
301    number of channels fixed with MSG_set_channel_number().
302  * \return #MSG_FATAL if \a task is equal to \c NULL, #MSG_WARNING
303  * if \a *task is not equal to \c NULL, and #MSG_OK otherwise.
304  */
305 MSG_error_t MSG_task_get(m_task_t * task, m_channel_t channel)
306 {
307   return MSG_task_get_with_timeout(task, channel, -1);
308 }
309
310 /** \ingroup msg_gos_functions
311  * \brief Listen on a channel and wait for receiving a task with a timeout.
312  *
313  * It takes three parameters.
314  * \param task a memory location for storing a #m_task_t. It will
315    hold a task when this function will return. Thus \a task should not
316    be equal to \c NULL and \a *task should be equal to \c NULL. If one of
317    those two condition does not hold, there will be a warning message.
318  * \param channel the channel on which the agent should be
319    listening. This value has to be >=0 and < than the maximal
320    number of channels fixed with MSG_set_channel_number().
321  * \param max_duration the maximum time to wait for a task before giving
322     up. In such a case, #MSG_TRANSFER_FAILURE will be returned, \a task
323     will not be modified and will still be
324     equal to \c NULL when returning.
325  * \return #MSG_FATAL if \a task is equal to \c NULL, #MSG_WARNING
326    if \a *task is not equal to \c NULL, and #MSG_OK otherwise.
327  */
328 MSG_error_t
329 MSG_task_get_with_timeout(m_task_t * task, m_channel_t channel,
330                           double max_duration)
331 {
332   return MSG_task_get_ext(task, channel, max_duration, NULL);
333 }
334
335 /** \defgroup msg_gos_functions MSG Operating System Functions
336  *  \brief This section describes the functions that can be used
337  *  by an agent for handling some task.
338  */
339
340 MSG_error_t
341 MSG_task_get_ext(m_task_t * task, m_channel_t channel, double timeout,
342                  m_host_t host)
343 {
344   xbt_assert1((channel >= 0)
345               && (channel < msg_global->max_channel), "Invalid channel %d",
346               channel);
347
348   return
349     MSG_mailbox_get_task_ext(MSG_mailbox_get_by_channel
350                              (MSG_host_self(), channel), task, host, timeout);
351 }
352
353 MSG_error_t
354 MSG_task_receive_from_host(m_task_t * task, const char *alias, m_host_t host)
355 {
356   return MSG_task_receive_ext(task, alias, -1, host);
357 }
358
359 MSG_error_t MSG_task_receive(m_task_t * task, const char *alias)
360 {
361   return MSG_task_receive_with_timeout(task, alias, -1);
362 }
363
364 MSG_error_t
365 MSG_task_receive_with_timeout(m_task_t * task, const char *alias,
366                               double timeout)
367 {
368   return MSG_task_receive_ext(task, alias, timeout, NULL);
369 }
370
371 MSG_error_t
372 MSG_task_receive_ext(m_task_t * task, const char *alias, double timeout,
373                      m_host_t host)
374 {
375   return MSG_mailbox_get_task_ext(MSG_mailbox_get_by_alias(alias), task, host,
376                                   timeout);
377 }
378
379
380 /** \ingroup msg_gos_functions
381  * \brief Put a task on a channel of an host and waits for the end of the
382  * transmission.
383  *
384  * This function is used for describing the behavior of an agent. It
385  * takes three parameter.
386  * \param task a #m_task_t to send on another location. This task
387    will not be usable anymore when the function will return. There is
388    no automatic task duplication and you have to save your parameters
389    before calling this function. Tasks are unique and once it has been
390    sent to another location, you should not access it anymore. You do
391    not need to call MSG_task_destroy() but to avoid using, as an
392    effect of inattention, this task anymore, you definitely should
393    renitialize it with #MSG_TASK_UNINITIALIZED. Note that this task
394    can be transfered iff it has been correctly created with
395    MSG_task_create().
396  * \param dest the destination of the message
397  * \param channel the channel on which the agent should put this
398    task. This value has to be >=0 and < than the maximal number of
399    channels fixed with MSG_set_channel_number().
400  * \return #MSG_FATAL if \a task is not properly initialized and
401  * #MSG_OK otherwise. Returns #MSG_HOST_FAILURE if the host on which
402  * this function was called was shut down. Returns
403  * #MSG_TRANSFER_FAILURE if the transfer could not be properly done
404  * (network failure, dest failure)
405  */
406 MSG_error_t MSG_task_put(m_task_t task, m_host_t dest, m_channel_t channel)
407 {
408   return MSG_task_put_with_timeout(task, dest, channel, -1.0);
409 }
410
411 /** \ingroup msg_gos_functions
412  * \brief Does exactly the same as MSG_task_put but with a bounded transmition
413  * rate.
414  *
415  * \sa MSG_task_put
416  */
417 MSG_error_t
418 MSG_task_put_bounded(m_task_t task, m_host_t dest, m_channel_t channel,
419                      double maxrate)
420 {
421   task->simdata->rate = maxrate;
422   return MSG_task_put(task, dest, channel);
423 }
424
425 /** \ingroup msg_gos_functions \brief Put a task on a channel of an
426  * host (with a timeout on the waiting of the destination host) and
427  * waits for the end of the transmission.
428  *
429  * This function is used for describing the behavior of an agent. It
430  * takes four parameter.
431  * \param task a #m_task_t to send on another location. This task
432    will not be usable anymore when the function will return. There is
433    no automatic task duplication and you have to save your parameters
434    before calling this function. Tasks are unique and once it has been
435    sent to another location, you should not access it anymore. You do
436    not need to call MSG_task_destroy() but to avoid using, as an
437    effect of inattention, this task anymore, you definitely should
438    renitialize it with #MSG_TASK_UNINITIALIZED. Note that this task
439    can be transfered iff it has been correctly created with
440    MSG_task_create().
441  * \param dest the destination of the message
442  * \param channel the channel on which the agent should put this
443    task. This value has to be >=0 and < than the maximal number of
444    channels fixed with MSG_set_channel_number().
445  * \param timeout the maximum time to wait for a task before giving
446     up. In such a case, #MSG_TRANSFER_FAILURE will be returned, \a task
447     will not be modified
448  * \return #MSG_FATAL if \a task is not properly initialized and
449    #MSG_OK otherwise. Returns #MSG_HOST_FAILURE if the host on which
450    this function was called was shut down. Returns
451    #MSG_TRANSFER_FAILURE if the transfer could not be properly done
452    (network failure, dest failure, timeout...)
453  */
454 MSG_error_t
455 MSG_task_put_with_timeout(m_task_t task, m_host_t dest, m_channel_t channel,
456                           double timeout)
457 {
458   xbt_assert1((channel >= 0)
459               && (channel < msg_global->max_channel), "Invalid channel %d",
460               channel);
461
462   return
463     MSG_mailbox_put_with_timeout(MSG_mailbox_get_by_channel(dest, channel),
464                                  task, timeout);
465 }
466
467 MSG_error_t MSG_task_send(m_task_t task, const char *alias)
468 {
469   return MSG_task_send_with_timeout(task, alias, -1);
470 }
471
472
473 MSG_error_t
474 MSG_task_send_bounded(m_task_t task, const char *alias, double maxrate)
475 {
476   task->simdata->rate = maxrate;
477   return MSG_task_send(task, alias);
478 }
479
480
481 MSG_error_t
482 MSG_task_send_with_timeout(m_task_t task, const char *alias, double timeout)
483 {
484   return MSG_mailbox_put_with_timeout(MSG_mailbox_get_by_alias(alias), task,
485                                       timeout);
486 }
487
488 int MSG_task_listen(const char *alias)
489 {
490   CHECK_HOST();
491
492   return !MSG_mailbox_is_empty(MSG_mailbox_get_by_alias(alias));
493 }
494
495 /** \ingroup msg_gos_functions
496  * \brief Test whether there is a pending communication on a channel.
497  *
498  * It takes one parameter.
499  * \param channel the channel on which the agent should be
500    listening. This value has to be >=0 and < than the maximal
501    number of channels fixed with MSG_set_channel_number().
502  * \return 1 if there is a pending communication and 0 otherwise
503  */
504 int MSG_task_Iprobe(m_channel_t channel)
505 {
506   xbt_assert1((channel >= 0)
507               && (channel < msg_global->max_channel), "Invalid channel %d",
508               channel);
509
510   CHECK_HOST();
511
512   return
513     !MSG_mailbox_is_empty(MSG_mailbox_get_by_channel
514                           (MSG_host_self(), channel));
515 }
516
517 /** \ingroup msg_gos_functions
518
519  * \brief Return the number of tasks waiting to be received on a \a
520    channel and sent by \a host.
521  *
522  * It takes two parameters.
523  * \param channel the channel on which the agent should be
524    listening. This value has to be >=0 and < than the maximal
525    number of channels fixed with MSG_set_channel_number().
526  * \param host the host that is to be watched.
527  * \return the number of tasks waiting to be received on \a channel
528    and sent by \a host.
529  */
530 int MSG_task_probe_from_host(int channel, m_host_t host)
531 {
532   xbt_assert1((channel >= 0)
533               && (channel < msg_global->max_channel), "Invalid channel %d",
534               channel);
535
536   CHECK_HOST();
537
538   return
539     MSG_mailbox_get_count_host_waiting_tasks(MSG_mailbox_get_by_channel
540                                              (MSG_host_self(), channel),
541                                              host);
542
543 }
544
545 int MSG_task_listen_from_host(const char *alias, m_host_t host)
546 {
547   CHECK_HOST();
548
549   return
550     MSG_mailbox_get_count_host_waiting_tasks(MSG_mailbox_get_by_alias(alias),
551                                              host);
552 }
553
554 /** \ingroup msg_gos_functions
555  * \brief Test whether there is a pending communication on a channel, and who sent it.
556  *
557  * It takes one parameter.
558  * \param channel the channel on which the agent should be
559    listening. This value has to be >=0 and < than the maximal
560    number of channels fixed with MSG_set_channel_number().
561  * \return -1 if there is no pending communication and the PID of the process who sent it otherwise
562  */
563 int MSG_task_probe_from(m_channel_t channel)
564 {
565   m_task_t task;
566
567   CHECK_HOST();
568
569   xbt_assert1((channel >= 0)
570               && (channel < msg_global->max_channel), "Invalid channel %d",
571               channel);
572
573   if (NULL ==
574       (task =
575        MSG_mailbox_get_head(MSG_mailbox_get_by_channel
576                             (MSG_host_self(), channel))))
577     return -1;
578
579   return MSG_process_get_PID(task->simdata->sender);
580 }
581
582 int MSG_task_listen_from(const char *alias)
583 {
584   m_task_t task;
585
586   CHECK_HOST();
587
588   if (NULL == (task = MSG_mailbox_get_head(MSG_mailbox_get_by_alias(alias))))
589     return -1;
590
591   return MSG_process_get_PID(task->simdata->sender);
592 }
593
594 /** \ingroup msg_gos_functions
595  * \brief Wait for at most \a max_duration second for a task reception
596    on \a channel.
597
598  * \a PID is updated with the PID of the first process that triggered this event if any.
599  *
600  * It takes three parameters:
601  * \param channel the channel on which the agent should be
602    listening. This value has to be >=0 and < than the maximal.
603    number of channels fixed with MSG_set_channel_number().
604  * \param PID a memory location for storing an int.
605  * \param timeout the maximum time to wait for a task before
606     giving up. In the case of a reception, *\a PID will be updated
607     with the PID of the first process to send a task.
608  * \return #MSG_HOST_FAILURE if the host is shut down in the meantime
609    and #MSG_OK otherwise.
610  */
611 MSG_error_t
612 MSG_channel_select_from(m_channel_t channel, double timeout, int *PID)
613 {
614   m_host_t h = NULL;
615   simdata_host_t h_simdata = NULL;
616   m_task_t t;
617   int first_time = 1;
618   smx_cond_t cond;
619   msg_mailbox_t mailbox;
620
621   xbt_assert1((channel >= 0)
622               && (channel < msg_global->max_channel), "Invalid channel %d",
623               channel);
624
625   if (PID) {
626     *PID = -1;
627   }
628
629   if (timeout == 0.0) {
630     *PID = MSG_task_probe_from(channel);
631     MSG_RETURN(MSG_OK);
632   } else {
633     CHECK_HOST();
634     h = MSG_host_self();
635     h_simdata = h->simdata;
636
637     mailbox = MSG_mailbox_get_by_channel(MSG_host_self(), channel);
638
639     while (MSG_mailbox_is_empty(mailbox)) {
640       if (timeout > 0) {
641         if (!first_time) {
642           MSG_RETURN(MSG_OK);
643         }
644       }
645
646       SIMIX_mutex_lock(h_simdata->mutex);
647
648       xbt_assert1(!MSG_mailbox_get_cond(mailbox),
649                   "A process is already blocked on this channel %d", channel);
650
651       cond = SIMIX_cond_init();
652
653       MSG_mailbox_set_cond(mailbox, cond);
654
655       if (timeout > 0) {
656         SIMIX_cond_wait_timeout(cond, h_simdata->mutex, timeout);
657       } else {
658         SIMIX_cond_wait(cond, h_simdata->mutex);
659       }
660
661       SIMIX_cond_destroy(cond);
662       SIMIX_mutex_unlock(h_simdata->mutex);
663
664       if (SIMIX_host_get_state(h_simdata->smx_host) == 0) {
665         MSG_RETURN(MSG_HOST_FAILURE);
666       }
667
668       MSG_mailbox_set_cond(mailbox, NULL);
669       first_time = 0;
670     }
671
672     if (NULL == (t = MSG_mailbox_get_head(mailbox)))
673       MSG_RETURN(MSG_OK);
674
675
676     if (PID) {
677       *PID = MSG_process_get_PID(t->simdata->sender);
678     }
679
680     MSG_RETURN(MSG_OK);
681   }
682 }
683
684
685 MSG_error_t MSG_alias_select_from(const char *alias, double timeout, int *PID)
686 {
687   m_host_t h = NULL;
688   simdata_host_t h_simdata = NULL;
689   m_task_t t;
690   int first_time = 1;
691   smx_cond_t cond;
692   msg_mailbox_t mailbox;
693
694   if (PID) {
695     *PID = -1;
696   }
697
698   if (timeout == 0.0) {
699     *PID = MSG_task_listen_from(alias);
700     MSG_RETURN(MSG_OK);
701   } else {
702     CHECK_HOST();
703     h = MSG_host_self();
704     h_simdata = h->simdata;
705
706     DEBUG2("Probing on alias %s (%s)", alias, h->name);
707
708     mailbox = MSG_mailbox_get_by_alias(alias);
709
710     while (MSG_mailbox_is_empty(mailbox)) {
711       if (timeout > 0) {
712         if (!first_time) {
713           MSG_RETURN(MSG_OK);
714         }
715       }
716
717       SIMIX_mutex_lock(h_simdata->mutex);
718
719       xbt_assert1(!MSG_mailbox_get_cond(mailbox),
720                   "A process is already blocked on this alias %s", alias);
721
722       cond = SIMIX_cond_init();
723
724       MSG_mailbox_set_cond(mailbox, cond);
725
726       if (timeout > 0) {
727         SIMIX_cond_wait_timeout(cond, h_simdata->mutex, timeout);
728       } else {
729         SIMIX_cond_wait(cond, h_simdata->mutex);
730       }
731
732       SIMIX_cond_destroy(cond);
733       SIMIX_mutex_unlock(h_simdata->mutex);
734
735       if (SIMIX_host_get_state(h_simdata->smx_host) == 0) {
736         MSG_RETURN(MSG_HOST_FAILURE);
737       }
738
739       MSG_mailbox_set_cond(mailbox, NULL);
740       first_time = 0;
741     }
742
743     if (NULL == (t = MSG_mailbox_get_head(mailbox)))
744       MSG_RETURN(MSG_OK);
745
746
747     if (PID) {
748       *PID = MSG_process_get_PID(t->simdata->sender);
749     }
750
751     MSG_RETURN(MSG_OK);
752   }
753 }