Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
add a dangerous MSG_task_ref() needed by lua to not free tasks twice when they get...
[simgrid.git] / src / msg / gos.c
1 /*     $Id$      */
2
3 /* Copyright (c) 2002-2007 Arnaud Legrand.                                  */
4 /* Copyright (c) 2007 Bruno Donassolo.                                      */
5 /* All rights reserved.                                                     */
6
7 /* This program is free software; you can redistribute it and/or modify it
8  * under the terms of the license (GNU LGPL) which comes with this package. */
9
10 #include "msg/private.h"
11 #include "xbt/sysdep.h"
12 #include "xbt/log.h"
13 #include "mailbox.h"
14
15
16 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(msg_gos, msg,
17                                 "Logging specific to MSG (gos)");
18
19 /** \ingroup msg_gos_functions
20  *
21  * \brief Return the last value returned by a MSG function (except
22  * MSG_get_errno...).
23  */
24 MSG_error_t MSG_get_errno(void)
25 {
26   return PROCESS_GET_ERRNO();
27 }
28
29 /** \ingroup msg_gos_functions
30  * \brief Executes a task and waits for its termination.
31  *
32  * This function is used for describing the behavior of an agent. It
33  * takes only one parameter.
34  * \param task a #m_task_t to execute on the location on which the
35    agent is running.
36  * \return #MSG_FATAL if \a task is not properly initialized and
37  * #MSG_OK otherwise.
38  */
39 MSG_error_t MSG_task_execute(m_task_t task)
40 {
41   simdata_task_t simdata = NULL;
42   m_process_t self = MSG_process_self();
43   e_surf_action_state_t state = SURF_ACTION_NOT_IN_THE_SYSTEM;
44   CHECK_HOST();
45
46   simdata = task->simdata;
47
48   xbt_assert1((!simdata->compute),
49               //&& (task->simdata->refcount == 1), FIXME: since lua bindings play with this refcount to make sure that tasks don't get gc() twice, this field cannot be used here as is anymore
50               "This task is executed somewhere else. Go fix your code! %d", task->simdata->refcount);
51
52   DEBUG1("Computing on %s", MSG_process_self()->simdata->m_host->name);
53
54   simdata->refcount++;
55   SIMIX_mutex_lock(simdata->mutex);
56   simdata->compute =
57     SIMIX_action_execute(SIMIX_host_self(), task->name,
58                          simdata->computation_amount);
59   SIMIX_action_set_priority(simdata->compute, simdata->priority);
60
61   /* changed to waiting action since we are always waiting one action (execute, communicate or sleep) */
62   self->simdata->waiting_action = simdata->compute;
63   SIMIX_register_action_to_condition(simdata->compute, simdata->cond);
64   do {
65     SIMIX_cond_wait(simdata->cond, simdata->mutex);
66     state = SIMIX_action_get_state(simdata->compute);
67   } while (state == SURF_ACTION_READY || state == SURF_ACTION_RUNNING);
68   SIMIX_unregister_action_to_condition(simdata->compute, simdata->cond);
69   self->simdata->waiting_action = NULL;
70
71   SIMIX_mutex_unlock(simdata->mutex);
72   simdata->refcount--;
73
74   if (SIMIX_action_get_state(task->simdata->compute) == SURF_ACTION_DONE) {
75     /* action ended, set comm and compute = NULL, the actions is already destroyed in the main function */
76     SIMIX_action_destroy(task->simdata->compute);
77     simdata->computation_amount = 0.0;
78     simdata->comm = NULL;
79     simdata->compute = NULL;
80     MSG_RETURN(MSG_OK);
81   } else if (SIMIX_host_get_state(SIMIX_host_self()) == 0) {
82     /* action ended, set comm and compute = NULL, the actions is already destroyed in the main function */
83     SIMIX_action_destroy(task->simdata->compute);
84     simdata->comm = NULL;
85     simdata->compute = NULL;
86     MSG_RETURN(MSG_HOST_FAILURE);
87   } else {
88     /* action ended, set comm and compute = NULL, the actions is already destroyed in the main function */
89     SIMIX_action_destroy(task->simdata->compute);
90     simdata->comm = NULL;
91     simdata->compute = NULL;
92     MSG_RETURN(MSG_TASK_CANCELLED);
93   }
94 }
95
96 /** \ingroup m_task_management
97  * \brief Creates a new #m_task_t (a parallel one....).
98  *
99  * A constructor for #m_task_t taking six arguments and returning the
100    corresponding object.
101  * \param name a name for the object. It is for user-level information
102    and can be NULL.
103  * \param host_nb the number of hosts implied in the parallel task.
104  * \param host_list an array of \p host_nb m_host_t.
105  * \param computation_amount an array of \p host_nb
106    doubles. computation_amount[i] is the total number of operations
107    that have to be performed on host_list[i].
108  * \param communication_amount an array of \p host_nb* \p host_nb doubles.
109  * \param data a pointer to any data may want to attach to the new
110    object.  It is for user-level information and can be NULL. It can
111    be retrieved with the function \ref MSG_task_get_data.
112  * \see m_task_t
113  * \return The new corresponding object.
114  */
115 m_task_t
116 MSG_parallel_task_create(const char *name, int host_nb,
117                          const m_host_t * host_list,
118                          double *computation_amount,
119                          double *communication_amount, void *data)
120 {
121   int i;
122   simdata_task_t simdata = xbt_new0(s_simdata_task_t, 1);
123   m_task_t task = xbt_new0(s_m_task_t, 1);
124   task->simdata = simdata;
125
126   /* Task structure */
127   task->name = xbt_strdup(name);
128   task->data = data;
129
130   /* Simulator Data */
131   simdata->computation_amount = 0;
132   simdata->message_size = 0;
133   simdata->cond = SIMIX_cond_init();
134   simdata->mutex = SIMIX_mutex_init();
135   simdata->compute = NULL;
136   simdata->comm = NULL;
137   simdata->rate = -1.0;
138   simdata->refcount = 1;
139   simdata->sender = NULL;
140   simdata->receiver = NULL;
141   simdata->source = NULL;
142
143   simdata->host_nb = host_nb;
144   simdata->host_list = xbt_new0(smx_host_t, host_nb);
145   simdata->comp_amount = computation_amount;
146   simdata->comm_amount = communication_amount;
147
148   for (i = 0; i < host_nb; i++)
149     simdata->host_list[i] = host_list[i]->simdata->smx_host;
150
151   return task;
152 }
153
154 MSG_error_t MSG_parallel_task_execute(m_task_t task)
155 {
156   simdata_task_t simdata = NULL;
157   m_process_t self = MSG_process_self();
158   e_surf_action_state_t state = SURF_ACTION_NOT_IN_THE_SYSTEM;
159   CHECK_HOST();
160
161   simdata = task->simdata;
162
163   xbt_assert0((!simdata->compute)
164               && (task->simdata->refcount == 1),
165               "This task is executed somewhere else. Go fix your code!");
166
167   xbt_assert0(simdata->host_nb, "This is not a parallel task. Go to hell.");
168
169   DEBUG1("Computing on %s", MSG_process_self()->simdata->m_host->name);
170
171   simdata->refcount++;
172
173   SIMIX_mutex_lock(simdata->mutex);
174   simdata->compute =
175     SIMIX_action_parallel_execute(task->name, simdata->host_nb,
176                                   simdata->host_list, simdata->comp_amount,
177                                   simdata->comm_amount, 1.0, -1.0);
178
179   self->simdata->waiting_action = simdata->compute;
180   SIMIX_register_action_to_condition(simdata->compute, simdata->cond);
181   do {
182     SIMIX_cond_wait(simdata->cond, simdata->mutex);
183     state = SIMIX_action_get_state(task->simdata->compute);
184   } while (state == SURF_ACTION_READY || state == SURF_ACTION_RUNNING);
185
186   SIMIX_unregister_action_to_condition(simdata->compute, simdata->cond);
187   self->simdata->waiting_action = NULL;
188
189
190   SIMIX_mutex_unlock(simdata->mutex);
191   simdata->refcount--;
192
193   if (SIMIX_action_get_state(task->simdata->compute) == SURF_ACTION_DONE) {
194     /* action ended, set comm and compute = NULL, the actions is already destroyed in the main function */
195     SIMIX_action_destroy(task->simdata->compute);
196     simdata->computation_amount = 0.0;
197     simdata->comm = NULL;
198     simdata->compute = NULL;
199     MSG_RETURN(MSG_OK);
200   } else if (SIMIX_host_get_state(SIMIX_host_self()) == 0) {
201     /* action ended, set comm and compute = NULL, the actions is already destroyed in the main function */
202     SIMIX_action_destroy(task->simdata->compute);
203     simdata->comm = NULL;
204     simdata->compute = NULL;
205     MSG_RETURN(MSG_HOST_FAILURE);
206   } else {
207     /* action ended, set comm and compute = NULL, the actions is already destroyed in the main function */
208     SIMIX_action_destroy(task->simdata->compute);
209     simdata->comm = NULL;
210     simdata->compute = NULL;
211     MSG_RETURN(MSG_TASK_CANCELLED);
212   }
213
214 }
215
216
217 /** \ingroup msg_gos_functions
218  * \brief Sleep for the specified number of seconds
219  *
220  * Makes the current process sleep until \a time seconds have elapsed.
221  *
222  * \param nb_sec a number of second
223  */
224 MSG_error_t MSG_process_sleep(double nb_sec)
225 {
226   smx_action_t act_sleep;
227   m_process_t proc = MSG_process_self();
228   e_surf_action_state_t state = SURF_ACTION_NOT_IN_THE_SYSTEM;
229   smx_mutex_t mutex;
230   smx_cond_t cond;
231
232   /* create action to sleep */
233   act_sleep =
234     SIMIX_action_sleep(SIMIX_process_get_host(proc->simdata->s_process),
235                        nb_sec);
236
237   mutex = SIMIX_mutex_init();
238   SIMIX_mutex_lock(mutex);
239
240   /* create conditional and register action to it */
241   cond = SIMIX_cond_init();
242
243   proc->simdata->waiting_action = act_sleep;
244   SIMIX_register_action_to_condition(act_sleep, cond);
245   do {
246     SIMIX_cond_wait(cond, mutex);
247     state = SIMIX_action_get_state(act_sleep);
248   } while (state == SURF_ACTION_READY || state == SURF_ACTION_RUNNING);
249   proc->simdata->waiting_action = NULL;
250   SIMIX_unregister_action_to_condition(act_sleep, cond);
251   SIMIX_mutex_unlock(mutex);
252
253   /* remove variables */
254   SIMIX_cond_destroy(cond);
255   SIMIX_mutex_destroy(mutex);
256
257   if (SIMIX_action_get_state(act_sleep) == SURF_ACTION_DONE) {
258     if (SIMIX_host_get_state(SIMIX_host_self()) == SURF_RESOURCE_OFF) {
259       SIMIX_action_destroy(act_sleep);
260       MSG_RETURN(MSG_HOST_FAILURE);
261     }
262   } else {
263     SIMIX_action_destroy(act_sleep);
264     MSG_RETURN(MSG_HOST_FAILURE);
265   }
266
267   SIMIX_action_destroy(act_sleep);
268   MSG_RETURN(MSG_OK);
269 }
270
271 /** \ingroup msg_gos_functions
272  * \brief Listen on \a channel and waits for receiving a task from \a host.
273  *
274  * It takes three parameters.
275  * \param task a memory location for storing a #m_task_t. It will
276    hold a task when this function will return. Thus \a task should not
277    be equal to \c NULL and \a *task should be equal to \c NULL. If one of
278    those two condition does not hold, there will be a warning message.
279  * \param channel the channel on which the agent should be
280    listening. This value has to be >=0 and < than the maximal
281    number of channels fixed with MSG_set_channel_number().
282  * \param host the host that is to be watched.
283  * \return #MSG_FATAL if \a task is equal to \c NULL, #MSG_WARNING
284    if \a *task is not equal to \c NULL, and #MSG_OK otherwise.
285  */
286 MSG_error_t
287 MSG_task_get_from_host(m_task_t * task, m_channel_t channel, m_host_t host)
288 {
289   return MSG_task_get_ext(task, channel, -1, host);
290 }
291
292 /** \ingroup msg_gos_functions
293  * \brief Listen on a channel and wait for receiving a task.
294  *
295  * It takes two parameters.
296  * \param task a memory location for storing a #m_task_t. It will
297    hold a task when this function will return. Thus \a task should not
298    be equal to \c NULL and \a *task should be equal to \c NULL. If one of
299    those two condition does not hold, there will be a warning message.
300  * \param channel the channel on which the agent should be
301    listening. This value has to be >=0 and < than the maximal
302    number of channels fixed with MSG_set_channel_number().
303  * \return #MSG_FATAL if \a task is equal to \c NULL, #MSG_WARNING
304  * if \a *task is not equal to \c NULL, and #MSG_OK otherwise.
305  */
306 MSG_error_t MSG_task_get(m_task_t * task, m_channel_t channel)
307 {
308   return MSG_task_get_with_timeout(task, channel, -1);
309 }
310
311 /** \ingroup msg_gos_functions
312  * \brief Listen on a channel and wait for receiving a task with a timeout.
313  *
314  * It takes three parameters.
315  * \param task a memory location for storing a #m_task_t. It will
316    hold a task when this function will return. Thus \a task should not
317    be equal to \c NULL and \a *task should be equal to \c NULL. If one of
318    those two condition does not hold, there will be a warning message.
319  * \param channel the channel on which the agent should be
320    listening. This value has to be >=0 and < than the maximal
321    number of channels fixed with MSG_set_channel_number().
322  * \param max_duration the maximum time to wait for a task before giving
323     up. In such a case, #MSG_TRANSFER_FAILURE will be returned, \a task
324     will not be modified and will still be
325     equal to \c NULL when returning.
326  * \return #MSG_FATAL if \a task is equal to \c NULL, #MSG_WARNING
327    if \a *task is not equal to \c NULL, and #MSG_OK otherwise.
328  */
329 MSG_error_t
330 MSG_task_get_with_timeout(m_task_t * task, m_channel_t channel,
331                           double max_duration)
332 {
333   return MSG_task_get_ext(task, channel, max_duration, NULL);
334 }
335
336 /** \defgroup msg_gos_functions MSG Operating System Functions
337  *  \brief This section describes the functions that can be used
338  *  by an agent for handling some task.
339  */
340
341 MSG_error_t
342 MSG_task_get_ext(m_task_t * task, m_channel_t channel, double timeout,
343                  m_host_t host)
344 {
345   xbt_assert1((channel >= 0)
346               && (channel < msg_global->max_channel), "Invalid channel %d",
347               channel);
348
349   return
350     MSG_mailbox_get_task_ext(MSG_mailbox_get_by_channel
351                              (MSG_host_self(), channel), task, host, timeout);
352 }
353
354 MSG_error_t
355 MSG_task_receive_from_host(m_task_t * task, const char *alias, m_host_t host)
356 {
357   return MSG_task_receive_ext(task, alias, -1, host);
358 }
359
360 MSG_error_t MSG_task_receive(m_task_t * task, const char *alias)
361 {
362   return MSG_task_receive_with_timeout(task, alias, -1);
363 }
364
365 MSG_error_t
366 MSG_task_receive_with_timeout(m_task_t * task, const char *alias,
367                               double timeout)
368 {
369   return MSG_task_receive_ext(task, alias, timeout, NULL);
370 }
371
372 MSG_error_t
373 MSG_task_receive_ext(m_task_t * task, const char *alias, double timeout,
374                      m_host_t host)
375 {
376   return MSG_mailbox_get_task_ext(MSG_mailbox_get_by_alias(alias), task, host,
377                                   timeout);
378 }
379
380
381 /** \ingroup msg_gos_functions
382  * \brief Put a task on a channel of an host and waits for the end of the
383  * transmission.
384  *
385  * This function is used for describing the behavior of an agent. It
386  * takes three parameter.
387  * \param task a #m_task_t to send on another location. This task
388    will not be usable anymore when the function will return. There is
389    no automatic task duplication and you have to save your parameters
390    before calling this function. Tasks are unique and once it has been
391    sent to another location, you should not access it anymore. You do
392    not need to call MSG_task_destroy() but to avoid using, as an
393    effect of inattention, this task anymore, you definitely should
394    renitialize it with #MSG_TASK_UNINITIALIZED. Note that this task
395    can be transfered iff it has been correctly created with
396    MSG_task_create().
397  * \param dest the destination of the message
398  * \param channel the channel on which the agent should put this
399    task. This value has to be >=0 and < than the maximal number of
400    channels fixed with MSG_set_channel_number().
401  * \return #MSG_FATAL if \a task is not properly initialized and
402  * #MSG_OK otherwise. Returns #MSG_HOST_FAILURE if the host on which
403  * this function was called was shut down. Returns
404  * #MSG_TRANSFER_FAILURE if the transfer could not be properly done
405  * (network failure, dest failure)
406  */
407 MSG_error_t MSG_task_put(m_task_t task, m_host_t dest, m_channel_t channel)
408 {
409   return MSG_task_put_with_timeout(task, dest, channel, -1.0);
410 }
411
412 /** \ingroup msg_gos_functions
413  * \brief Does exactly the same as MSG_task_put but with a bounded transmition
414  * rate.
415  *
416  * \sa MSG_task_put
417  */
418 MSG_error_t
419 MSG_task_put_bounded(m_task_t task, m_host_t dest, m_channel_t channel,
420                      double maxrate)
421 {
422   task->simdata->rate = maxrate;
423   return MSG_task_put(task, dest, channel);
424 }
425
426 /** \ingroup msg_gos_functions \brief Put a task on a channel of an
427  * host (with a timeout on the waiting of the destination host) and
428  * waits for the end of the transmission.
429  *
430  * This function is used for describing the behavior of an agent. It
431  * takes four parameter.
432  * \param task a #m_task_t to send on another location. This task
433    will not be usable anymore when the function will return. There is
434    no automatic task duplication and you have to save your parameters
435    before calling this function. Tasks are unique and once it has been
436    sent to another location, you should not access it anymore. You do
437    not need to call MSG_task_destroy() but to avoid using, as an
438    effect of inattention, this task anymore, you definitely should
439    renitialize it with #MSG_TASK_UNINITIALIZED. Note that this task
440    can be transfered iff it has been correctly created with
441    MSG_task_create().
442  * \param dest the destination of the message
443  * \param channel the channel on which the agent should put this
444    task. This value has to be >=0 and < than the maximal number of
445    channels fixed with MSG_set_channel_number().
446  * \param timeout the maximum time to wait for a task before giving
447     up. In such a case, #MSG_TRANSFER_FAILURE will be returned, \a task
448     will not be modified
449  * \return #MSG_FATAL if \a task is not properly initialized and
450    #MSG_OK otherwise. Returns #MSG_HOST_FAILURE if the host on which
451    this function was called was shut down. Returns
452    #MSG_TRANSFER_FAILURE if the transfer could not be properly done
453    (network failure, dest failure, timeout...)
454  */
455 MSG_error_t
456 MSG_task_put_with_timeout(m_task_t task, m_host_t dest, m_channel_t channel,
457                           double timeout)
458 {
459   xbt_assert1((channel >= 0)
460               && (channel < msg_global->max_channel), "Invalid channel %d",
461               channel);
462
463   return
464     MSG_mailbox_put_with_timeout(MSG_mailbox_get_by_channel(dest, channel),
465                                  task, timeout);
466 }
467
468 MSG_error_t MSG_task_send(m_task_t task, const char *alias)
469 {
470   return MSG_task_send_with_timeout(task, alias, -1);
471 }
472
473
474 MSG_error_t
475 MSG_task_send_bounded(m_task_t task, const char *alias, double maxrate)
476 {
477   task->simdata->rate = maxrate;
478   return MSG_task_send(task, alias);
479 }
480
481
482 MSG_error_t
483 MSG_task_send_with_timeout(m_task_t task, const char *alias, double timeout)
484 {
485   return MSG_mailbox_put_with_timeout(MSG_mailbox_get_by_alias(alias), task,
486                                       timeout);
487 }
488
489 int MSG_task_listen(const char *alias)
490 {
491   CHECK_HOST();
492
493   return !MSG_mailbox_is_empty(MSG_mailbox_get_by_alias(alias));
494 }
495
496 /** \ingroup msg_gos_functions
497  * \brief Test whether there is a pending communication on a channel.
498  *
499  * It takes one parameter.
500  * \param channel the channel on which the agent should be
501    listening. This value has to be >=0 and < than the maximal
502    number of channels fixed with MSG_set_channel_number().
503  * \return 1 if there is a pending communication and 0 otherwise
504  */
505 int MSG_task_Iprobe(m_channel_t channel)
506 {
507   xbt_assert1((channel >= 0)
508               && (channel < msg_global->max_channel), "Invalid channel %d",
509               channel);
510
511   CHECK_HOST();
512
513   return
514     !MSG_mailbox_is_empty(MSG_mailbox_get_by_channel
515                           (MSG_host_self(), channel));
516 }
517
518 /** \ingroup msg_gos_functions
519
520  * \brief Return the number of tasks waiting to be received on a \a
521    channel and sent by \a host.
522  *
523  * It takes two parameters.
524  * \param channel the channel on which the agent should be
525    listening. This value has to be >=0 and < than the maximal
526    number of channels fixed with MSG_set_channel_number().
527  * \param host the host that is to be watched.
528  * \return the number of tasks waiting to be received on \a channel
529    and sent by \a host.
530  */
531 int MSG_task_probe_from_host(int channel, m_host_t host)
532 {
533   xbt_assert1((channel >= 0)
534               && (channel < msg_global->max_channel), "Invalid channel %d",
535               channel);
536
537   CHECK_HOST();
538
539   return
540     MSG_mailbox_get_count_host_waiting_tasks(MSG_mailbox_get_by_channel
541                                              (MSG_host_self(), channel),
542                                              host);
543
544 }
545
546 int MSG_task_listen_from_host(const char *alias, m_host_t host)
547 {
548   CHECK_HOST();
549
550   return
551     MSG_mailbox_get_count_host_waiting_tasks(MSG_mailbox_get_by_alias(alias),
552                                              host);
553 }
554
555 /** \ingroup msg_gos_functions
556  * \brief Test whether there is a pending communication on a channel, and who sent it.
557  *
558  * It takes one parameter.
559  * \param channel the channel on which the agent should be
560    listening. This value has to be >=0 and < than the maximal
561    number of channels fixed with MSG_set_channel_number().
562  * \return -1 if there is no pending communication and the PID of the process who sent it otherwise
563  */
564 int MSG_task_probe_from(m_channel_t channel)
565 {
566   m_task_t task;
567
568   CHECK_HOST();
569
570   xbt_assert1((channel >= 0)
571               && (channel < msg_global->max_channel), "Invalid channel %d",
572               channel);
573
574   if (NULL ==
575       (task =
576        MSG_mailbox_get_head(MSG_mailbox_get_by_channel
577                             (MSG_host_self(), channel))))
578     return -1;
579
580   return MSG_process_get_PID(task->simdata->sender);
581 }
582
583 int MSG_task_listen_from(const char *alias)
584 {
585   m_task_t task;
586
587   CHECK_HOST();
588
589   if (NULL == (task = MSG_mailbox_get_head(MSG_mailbox_get_by_alias(alias))))
590     return -1;
591
592   return MSG_process_get_PID(task->simdata->sender);
593 }
594
595 /** \ingroup msg_gos_functions
596  * \brief Wait for at most \a max_duration second for a task reception
597    on \a channel.
598
599  * \a PID is updated with the PID of the first process that triggered this event if any.
600  *
601  * It takes three parameters:
602  * \param channel the channel on which the agent should be
603    listening. This value has to be >=0 and < than the maximal.
604    number of channels fixed with MSG_set_channel_number().
605  * \param PID a memory location for storing an int.
606  * \param timeout the maximum time to wait for a task before
607     giving up. In the case of a reception, *\a PID will be updated
608     with the PID of the first process to send a task.
609  * \return #MSG_HOST_FAILURE if the host is shut down in the meantime
610    and #MSG_OK otherwise.
611  */
612 MSG_error_t
613 MSG_channel_select_from(m_channel_t channel, double timeout, int *PID)
614 {
615   m_host_t h = NULL;
616   simdata_host_t h_simdata = NULL;
617   m_task_t t;
618   int first_time = 1;
619   smx_cond_t cond;
620   msg_mailbox_t mailbox;
621
622   xbt_assert1((channel >= 0)
623               && (channel < msg_global->max_channel), "Invalid channel %d",
624               channel);
625
626   if (PID) {
627     *PID = -1;
628   }
629
630   if (timeout == 0.0) {
631     *PID = MSG_task_probe_from(channel);
632     MSG_RETURN(MSG_OK);
633   } else {
634     CHECK_HOST();
635     h = MSG_host_self();
636     h_simdata = h->simdata;
637
638     mailbox = MSG_mailbox_get_by_channel(MSG_host_self(), channel);
639
640     while (MSG_mailbox_is_empty(mailbox)) {
641       if (timeout > 0) {
642         if (!first_time) {
643           MSG_RETURN(MSG_OK);
644         }
645       }
646
647       SIMIX_mutex_lock(h_simdata->mutex);
648
649       xbt_assert1(!MSG_mailbox_get_cond(mailbox),
650                   "A process is already blocked on this channel %d", channel);
651
652       cond = SIMIX_cond_init();
653
654       MSG_mailbox_set_cond(mailbox, cond);
655
656       if (timeout > 0) {
657         SIMIX_cond_wait_timeout(cond, h_simdata->mutex, timeout);
658       } else {
659         SIMIX_cond_wait(cond, h_simdata->mutex);
660       }
661
662       SIMIX_cond_destroy(cond);
663       SIMIX_mutex_unlock(h_simdata->mutex);
664
665       if (SIMIX_host_get_state(h_simdata->smx_host) == 0) {
666         MSG_RETURN(MSG_HOST_FAILURE);
667       }
668
669       MSG_mailbox_set_cond(mailbox, NULL);
670       first_time = 0;
671     }
672
673     if (NULL == (t = MSG_mailbox_get_head(mailbox)))
674       MSG_RETURN(MSG_OK);
675
676
677     if (PID) {
678       *PID = MSG_process_get_PID(t->simdata->sender);
679     }
680
681     MSG_RETURN(MSG_OK);
682   }
683 }
684
685
686 MSG_error_t MSG_alias_select_from(const char *alias, double timeout, int *PID)
687 {
688   m_host_t h = NULL;
689   simdata_host_t h_simdata = NULL;
690   m_task_t t;
691   int first_time = 1;
692   smx_cond_t cond;
693   msg_mailbox_t mailbox;
694
695   if (PID) {
696     *PID = -1;
697   }
698
699   if (timeout == 0.0) {
700     *PID = MSG_task_listen_from(alias);
701     MSG_RETURN(MSG_OK);
702   } else {
703     CHECK_HOST();
704     h = MSG_host_self();
705     h_simdata = h->simdata;
706
707     DEBUG2("Probing on alias %s (%s)", alias, h->name);
708
709     mailbox = MSG_mailbox_get_by_alias(alias);
710
711     while (MSG_mailbox_is_empty(mailbox)) {
712       if (timeout > 0) {
713         if (!first_time) {
714           MSG_RETURN(MSG_OK);
715         }
716       }
717
718       SIMIX_mutex_lock(h_simdata->mutex);
719
720       xbt_assert1(!MSG_mailbox_get_cond(mailbox),
721                   "A process is already blocked on this alias %s", alias);
722
723       cond = SIMIX_cond_init();
724
725       MSG_mailbox_set_cond(mailbox, cond);
726
727       if (timeout > 0) {
728         SIMIX_cond_wait_timeout(cond, h_simdata->mutex, timeout);
729       } else {
730         SIMIX_cond_wait(cond, h_simdata->mutex);
731       }
732
733       SIMIX_cond_destroy(cond);
734       SIMIX_mutex_unlock(h_simdata->mutex);
735
736       if (SIMIX_host_get_state(h_simdata->smx_host) == 0) {
737         MSG_RETURN(MSG_HOST_FAILURE);
738       }
739
740       MSG_mailbox_set_cond(mailbox, NULL);
741       first_time = 0;
742     }
743
744     if (NULL == (t = MSG_mailbox_get_head(mailbox)))
745       MSG_RETURN(MSG_OK);
746
747
748     if (PID) {
749       *PID = MSG_process_get_PID(t->simdata->sender);
750     }
751
752     MSG_RETURN(MSG_OK);
753   }
754 }