Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
Kill old $Id$ command dating from CVS
[simgrid.git] / src / msg / gos.c
1 /* Copyright (c) 2002-2007 Arnaud Legrand.                                  */
2 /* Copyright (c) 2007 Bruno Donassolo.                                      */
3 /* All rights reserved.                                                     */
4
5 /* This program is free software; you can redistribute it and/or modify it
6  * under the terms of the license (GNU LGPL) which comes with this package. */
7
8 #include "msg/private.h"
9 #include "xbt/sysdep.h"
10 #include "xbt/log.h"
11 #include "mailbox.h"
12
13
14 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(msg_gos, msg,
15                                 "Logging specific to MSG (gos)");
16
17 /** \ingroup msg_gos_functions
18  *
19  * \brief Return the last value returned by a MSG function (except
20  * MSG_get_errno...).
21  */
22 MSG_error_t MSG_get_errno(void)
23 {
24   return PROCESS_GET_ERRNO();
25 }
26
27 /** \ingroup msg_gos_functions
28  * \brief Executes a task and waits for its termination.
29  *
30  * This function is used for describing the behavior of an agent. It
31  * takes only one parameter.
32  * \param task a #m_task_t to execute on the location on which the
33    agent is running.
34  * \return #MSG_FATAL if \a task is not properly initialized and
35  * #MSG_OK otherwise.
36  */
37 MSG_error_t MSG_task_execute(m_task_t task)
38 {
39   simdata_task_t simdata = NULL;
40   m_process_t self = MSG_process_self();
41   e_surf_action_state_t state = SURF_ACTION_NOT_IN_THE_SYSTEM;
42   CHECK_HOST();
43 #ifdef HAVE_TRACING
44   TRACE_msg_task_execute_start (task);
45 #endif
46
47   simdata = task->simdata;
48
49   xbt_assert1((!simdata->compute) && (task->simdata->refcount == 1),
50               "This task is executed somewhere else. Go fix your code! %d", task->simdata->refcount);
51
52   DEBUG1("Computing on %s", MSG_process_self()->simdata->m_host->name);
53
54   if (simdata->computation_amount == 0) {
55 #ifdef HAVE_TRACING
56     TRACE_msg_task_execute_end (task);
57 #endif
58     return MSG_OK;
59   }
60   simdata->refcount++;
61   SIMIX_mutex_lock(simdata->mutex);
62   simdata->compute =
63     SIMIX_action_execute(SIMIX_host_self(), task->name,
64                          simdata->computation_amount);
65   SIMIX_action_set_priority(simdata->compute, simdata->priority);
66
67   /* changed to waiting action since we are always waiting one action (execute, communicate or sleep) */
68   self->simdata->waiting_action = simdata->compute;
69   SIMIX_register_action_to_condition(simdata->compute, simdata->cond);
70   do {
71     SIMIX_cond_wait(simdata->cond, simdata->mutex);
72     state = SIMIX_action_get_state(simdata->compute);
73   } while (state == SURF_ACTION_READY || state == SURF_ACTION_RUNNING);
74   SIMIX_unregister_action_to_condition(simdata->compute, simdata->cond);
75   self->simdata->waiting_action = NULL;
76
77   SIMIX_mutex_unlock(simdata->mutex);
78   simdata->refcount--;
79
80   if (SIMIX_action_get_state(task->simdata->compute) == SURF_ACTION_DONE) {
81     /* action ended, set comm and compute = NULL, the actions is already destroyed in the main function */
82     SIMIX_action_destroy(task->simdata->compute);
83     simdata->computation_amount = 0.0;
84     simdata->comm = NULL;
85     simdata->compute = NULL;
86 #ifdef HAVE_TRACING
87     TRACE_msg_task_execute_end (task);
88 #endif
89     MSG_RETURN(MSG_OK);
90   } else if (SIMIX_host_get_state(SIMIX_host_self()) == 0) {
91     /* action ended, set comm and compute = NULL, the actions is already destroyed in the main function */
92     SIMIX_action_destroy(task->simdata->compute);
93     simdata->comm = NULL;
94     simdata->compute = NULL;
95 #ifdef HAVE_TRACING
96     TRACE_msg_task_execute_end (task);
97 #endif
98     MSG_RETURN(MSG_HOST_FAILURE);
99   } else {
100     /* action ended, set comm and compute = NULL, the actions is already destroyed in the main function */
101     SIMIX_action_destroy(task->simdata->compute);
102     simdata->comm = NULL;
103     simdata->compute = NULL;
104 #ifdef HAVE_TRACING
105     TRACE_msg_task_execute_end (task);
106 #endif
107     MSG_RETURN(MSG_TASK_CANCELLED);
108   }
109 }
110
111 /** \ingroup m_task_management
112  * \brief Creates a new #m_task_t (a parallel one....).
113  *
114  * A constructor for #m_task_t taking six arguments and returning the
115    corresponding object.
116  * \param name a name for the object. It is for user-level information
117    and can be NULL.
118  * \param host_nb the number of hosts implied in the parallel task.
119  * \param host_list an array of \p host_nb m_host_t.
120  * \param computation_amount an array of \p host_nb
121    doubles. computation_amount[i] is the total number of operations
122    that have to be performed on host_list[i].
123  * \param communication_amount an array of \p host_nb* \p host_nb doubles.
124  * \param data a pointer to any data may want to attach to the new
125    object.  It is for user-level information and can be NULL. It can
126    be retrieved with the function \ref MSG_task_get_data.
127  * \see m_task_t
128  * \return The new corresponding object.
129  */
130 m_task_t
131 MSG_parallel_task_create(const char *name, int host_nb,
132                          const m_host_t * host_list,
133                          double *computation_amount,
134                          double *communication_amount, void *data)
135 {
136   int i;
137   simdata_task_t simdata = xbt_new0(s_simdata_task_t, 1);
138   m_task_t task = xbt_new0(s_m_task_t, 1);
139   task->simdata = simdata;
140
141   /* Task structure */
142   task->name = xbt_strdup(name);
143   task->data = data;
144
145   /* Simulator Data */
146   simdata->computation_amount = 0;
147   simdata->message_size = 0;
148   simdata->cond = SIMIX_cond_init();
149   simdata->mutex = SIMIX_mutex_init();
150   simdata->compute = NULL;
151   simdata->comm = NULL;
152   simdata->rate = -1.0;
153   simdata->refcount = 1;
154   simdata->sender = NULL;
155   simdata->receiver = NULL;
156   simdata->source = NULL;
157
158   simdata->host_nb = host_nb;
159   simdata->host_list = xbt_new0(smx_host_t, host_nb);
160   simdata->comp_amount = computation_amount;
161   simdata->comm_amount = communication_amount;
162
163   for (i = 0; i < host_nb; i++)
164     simdata->host_list[i] = host_list[i]->simdata->smx_host;
165
166   return task;
167 }
168
169 MSG_error_t MSG_parallel_task_execute(m_task_t task)
170 {
171   simdata_task_t simdata = NULL;
172   m_process_t self = MSG_process_self();
173   e_surf_action_state_t state = SURF_ACTION_NOT_IN_THE_SYSTEM;
174   CHECK_HOST();
175
176   simdata = task->simdata;
177
178   xbt_assert0((!simdata->compute)
179               && (task->simdata->refcount == 1),
180               "This task is executed somewhere else. Go fix your code!");
181
182   xbt_assert0(simdata->host_nb, "This is not a parallel task. Go to hell.");
183
184   DEBUG1("Computing on %s", MSG_process_self()->simdata->m_host->name);
185
186   simdata->refcount++;
187
188   SIMIX_mutex_lock(simdata->mutex);
189   simdata->compute =
190     SIMIX_action_parallel_execute(task->name, simdata->host_nb,
191                                   simdata->host_list, simdata->comp_amount,
192                                   simdata->comm_amount, 1.0, -1.0);
193
194   self->simdata->waiting_action = simdata->compute;
195   SIMIX_register_action_to_condition(simdata->compute, simdata->cond);
196   do {
197     SIMIX_cond_wait(simdata->cond, simdata->mutex);
198     state = SIMIX_action_get_state(task->simdata->compute);
199   } while (state == SURF_ACTION_READY || state == SURF_ACTION_RUNNING);
200
201   SIMIX_unregister_action_to_condition(simdata->compute, simdata->cond);
202   self->simdata->waiting_action = NULL;
203
204
205   SIMIX_mutex_unlock(simdata->mutex);
206   simdata->refcount--;
207
208   if (SIMIX_action_get_state(task->simdata->compute) == SURF_ACTION_DONE) {
209     /* action ended, set comm and compute = NULL, the actions is already destroyed in the main function */
210     SIMIX_action_destroy(task->simdata->compute);
211     simdata->computation_amount = 0.0;
212     simdata->comm = NULL;
213     simdata->compute = NULL;
214     MSG_RETURN(MSG_OK);
215   } else if (SIMIX_host_get_state(SIMIX_host_self()) == 0) {
216     /* action ended, set comm and compute = NULL, the actions is already destroyed in the main function */
217     SIMIX_action_destroy(task->simdata->compute);
218     simdata->comm = NULL;
219     simdata->compute = NULL;
220     MSG_RETURN(MSG_HOST_FAILURE);
221   } else {
222     /* action ended, set comm and compute = NULL, the actions is already destroyed in the main function */
223     SIMIX_action_destroy(task->simdata->compute);
224     simdata->comm = NULL;
225     simdata->compute = NULL;
226     MSG_RETURN(MSG_TASK_CANCELLED);
227   }
228
229 }
230
231
232 /** \ingroup msg_gos_functions
233  * \brief Sleep for the specified number of seconds
234  *
235  * Makes the current process sleep until \a time seconds have elapsed.
236  *
237  * \param nb_sec a number of second
238  */
239 MSG_error_t MSG_process_sleep(double nb_sec)
240 {
241   smx_action_t act_sleep;
242   m_process_t proc = MSG_process_self();
243   e_surf_action_state_t state = SURF_ACTION_NOT_IN_THE_SYSTEM;
244   smx_mutex_t mutex;
245   smx_cond_t cond;
246
247 #ifdef HAVE_TRACING
248   TRACE_msg_process_sleep_in (MSG_process_self());
249 #endif
250
251   /* create action to sleep */
252   act_sleep =
253     SIMIX_action_sleep(SIMIX_process_get_host(proc->simdata->s_process),
254                        nb_sec);
255
256   mutex = SIMIX_mutex_init();
257   SIMIX_mutex_lock(mutex);
258
259   /* create conditional and register action to it */
260   cond = SIMIX_cond_init();
261
262   proc->simdata->waiting_action = act_sleep;
263   SIMIX_register_action_to_condition(act_sleep, cond);
264   do {
265     SIMIX_cond_wait(cond, mutex);
266     state = SIMIX_action_get_state(act_sleep);
267   } while (state == SURF_ACTION_READY || state == SURF_ACTION_RUNNING);
268   proc->simdata->waiting_action = NULL;
269   SIMIX_unregister_action_to_condition(act_sleep, cond);
270   SIMIX_mutex_unlock(mutex);
271
272   /* remove variables */
273   SIMIX_cond_destroy(cond);
274   SIMIX_mutex_destroy(mutex);
275
276   if (SIMIX_action_get_state(act_sleep) == SURF_ACTION_DONE) {
277     if (SIMIX_host_get_state(SIMIX_host_self()) == SURF_RESOURCE_OFF) {
278       SIMIX_action_destroy(act_sleep);
279 #ifdef HAVE_TRACING
280       TRACE_msg_process_sleep_out (MSG_process_self());
281 #endif
282       MSG_RETURN(MSG_HOST_FAILURE);
283     }
284   } else {
285     SIMIX_action_destroy(act_sleep);
286 #ifdef HAVE_TRACING
287     TRACE_msg_process_sleep_out (MSG_process_self());
288 #endif
289     MSG_RETURN(MSG_HOST_FAILURE);
290   }
291
292   SIMIX_action_destroy(act_sleep);
293 #ifdef HAVE_TRACING
294   TRACE_msg_process_sleep_out (MSG_process_self());
295 #endif
296   MSG_RETURN(MSG_OK);
297 }
298
299 /** \ingroup msg_gos_functions
300  * \brief Listen on \a channel and waits for receiving a task from \a host.
301  *
302  * It takes three parameters.
303  * \param task a memory location for storing a #m_task_t. It will
304    hold a task when this function will return. Thus \a task should not
305    be equal to \c NULL and \a *task should be equal to \c NULL. If one of
306    those two condition does not hold, there will be a warning message.
307  * \param channel the channel on which the agent should be
308    listening. This value has to be >=0 and < than the maximal
309    number of channels fixed with MSG_set_channel_number().
310  * \param host the host that is to be watched.
311  * \return #MSG_FATAL if \a task is equal to \c NULL, #MSG_WARNING
312    if \a *task is not equal to \c NULL, and #MSG_OK otherwise.
313  */
314 MSG_error_t
315 MSG_task_get_from_host(m_task_t * task, m_channel_t channel, m_host_t host)
316 {
317   return MSG_task_get_ext(task, channel, -1, host);
318 }
319
320 /** \ingroup msg_gos_functions
321  * \brief Listen on a channel and wait for receiving a task.
322  *
323  * It takes two parameters.
324  * \param task a memory location for storing a #m_task_t. It will
325    hold a task when this function will return. Thus \a task should not
326    be equal to \c NULL and \a *task should be equal to \c NULL. If one of
327    those two condition does not hold, there will be a warning message.
328  * \param channel the channel on which the agent should be
329    listening. This value has to be >=0 and < than the maximal
330    number of channels fixed with MSG_set_channel_number().
331  * \return #MSG_FATAL if \a task is equal to \c NULL, #MSG_WARNING
332  * if \a *task is not equal to \c NULL, and #MSG_OK otherwise.
333  */
334 MSG_error_t MSG_task_get(m_task_t * task, m_channel_t channel)
335 {
336   return MSG_task_get_with_timeout(task, channel, -1);
337 }
338
339 /** \ingroup msg_gos_functions
340  * \brief Listen on a channel and wait for receiving a task with a timeout.
341  *
342  * It takes three parameters.
343  * \param task a memory location for storing a #m_task_t. It will
344    hold a task when this function will return. Thus \a task should not
345    be equal to \c NULL and \a *task should be equal to \c NULL. If one of
346    those two condition does not hold, there will be a warning message.
347  * \param channel the channel on which the agent should be
348    listening. This value has to be >=0 and < than the maximal
349    number of channels fixed with MSG_set_channel_number().
350  * \param max_duration the maximum time to wait for a task before giving
351     up. In such a case, #MSG_TRANSFER_FAILURE will be returned, \a task
352     will not be modified and will still be
353     equal to \c NULL when returning.
354  * \return #MSG_FATAL if \a task is equal to \c NULL, #MSG_WARNING
355    if \a *task is not equal to \c NULL, and #MSG_OK otherwise.
356  */
357 MSG_error_t
358 MSG_task_get_with_timeout(m_task_t * task, m_channel_t channel,
359                           double max_duration)
360 {
361   return MSG_task_get_ext(task, channel, max_duration, NULL);
362 }
363
364 /** \defgroup msg_gos_functions MSG Operating System Functions
365  *  \brief This section describes the functions that can be used
366  *  by an agent for handling some task.
367  */
368
369 MSG_error_t
370 MSG_task_get_ext(m_task_t * task, m_channel_t channel, double timeout,
371                  m_host_t host)
372 {
373   xbt_assert1((channel >= 0)
374               && (channel < msg_global->max_channel), "Invalid channel %d",
375               channel);
376
377   return
378     MSG_mailbox_get_task_ext(MSG_mailbox_get_by_channel
379                              (MSG_host_self(), channel), task, host, timeout);
380 }
381
382 MSG_error_t
383 MSG_task_receive_from_host(m_task_t * task, const char *alias, m_host_t host)
384 {
385   return MSG_task_receive_ext(task, alias, -1, host);
386 }
387
388 MSG_error_t MSG_task_receive(m_task_t * task, const char *alias)
389 {
390   return MSG_task_receive_with_timeout(task, alias, -1);
391 }
392
393 MSG_error_t
394 MSG_task_receive_with_timeout(m_task_t * task, const char *alias,
395                               double timeout)
396 {
397   return MSG_task_receive_ext(task, alias, timeout, NULL);
398 }
399
400 MSG_error_t
401 MSG_task_receive_ext(m_task_t * task, const char *alias, double timeout,
402                      m_host_t host)
403 {
404   return MSG_mailbox_get_task_ext(MSG_mailbox_get_by_alias(alias), task, host,
405                                   timeout);
406 }
407
408
409 /** \ingroup msg_gos_functions
410  * \brief Put a task on a channel of an host and waits for the end of the
411  * transmission.
412  *
413  * This function is used for describing the behavior of an agent. It
414  * takes three parameter.
415  * \param task a #m_task_t to send on another location. This task
416    will not be usable anymore when the function will return. There is
417    no automatic task duplication and you have to save your parameters
418    before calling this function. Tasks are unique and once it has been
419    sent to another location, you should not access it anymore. You do
420    not need to call MSG_task_destroy() but to avoid using, as an
421    effect of inattention, this task anymore, you definitely should
422    renitialize it with #MSG_TASK_UNINITIALIZED. Note that this task
423    can be transfered iff it has been correctly created with
424    MSG_task_create().
425  * \param dest the destination of the message
426  * \param channel the channel on which the agent should put this
427    task. This value has to be >=0 and < than the maximal number of
428    channels fixed with MSG_set_channel_number().
429  * \return #MSG_FATAL if \a task is not properly initialized and
430  * #MSG_OK otherwise. Returns #MSG_HOST_FAILURE if the host on which
431  * this function was called was shut down. Returns
432  * #MSG_TRANSFER_FAILURE if the transfer could not be properly done
433  * (network failure, dest failure)
434  */
435 MSG_error_t MSG_task_put(m_task_t task, m_host_t dest, m_channel_t channel)
436 {
437   return MSG_task_put_with_timeout(task, dest, channel, -1.0);
438 }
439
440 /** \ingroup msg_gos_functions
441  * \brief Does exactly the same as MSG_task_put but with a bounded transmition
442  * rate.
443  *
444  * \sa MSG_task_put
445  */
446 MSG_error_t
447 MSG_task_put_bounded(m_task_t task, m_host_t dest, m_channel_t channel,
448                      double maxrate)
449 {
450   task->simdata->rate = maxrate;
451   return MSG_task_put(task, dest, channel);
452 }
453
454 /** \ingroup msg_gos_functions \brief Put a task on a channel of an
455  * host (with a timeout on the waiting of the destination host) and
456  * waits for the end of the transmission.
457  *
458  * This function is used for describing the behavior of an agent. It
459  * takes four parameter.
460  * \param task a #m_task_t to send on another location. This task
461    will not be usable anymore when the function will return. There is
462    no automatic task duplication and you have to save your parameters
463    before calling this function. Tasks are unique and once it has been
464    sent to another location, you should not access it anymore. You do
465    not need to call MSG_task_destroy() but to avoid using, as an
466    effect of inattention, this task anymore, you definitely should
467    renitialize it with #MSG_TASK_UNINITIALIZED. Note that this task
468    can be transfered iff it has been correctly created with
469    MSG_task_create().
470  * \param dest the destination of the message
471  * \param channel the channel on which the agent should put this
472    task. This value has to be >=0 and < than the maximal number of
473    channels fixed with MSG_set_channel_number().
474  * \param timeout the maximum time to wait for a task before giving
475     up. In such a case, #MSG_TRANSFER_FAILURE will be returned, \a task
476     will not be modified
477  * \return #MSG_FATAL if \a task is not properly initialized and
478    #MSG_OK otherwise. Returns #MSG_HOST_FAILURE if the host on which
479    this function was called was shut down. Returns
480    #MSG_TRANSFER_FAILURE if the transfer could not be properly done
481    (network failure, dest failure, timeout...)
482  */
483 MSG_error_t
484 MSG_task_put_with_timeout(m_task_t task, m_host_t dest, m_channel_t channel,
485                           double timeout)
486 {
487   xbt_assert1((channel >= 0)
488               && (channel < msg_global->max_channel), "Invalid channel %d",
489               channel);
490
491   return
492     MSG_mailbox_put_with_timeout(MSG_mailbox_get_by_channel(dest, channel),
493                                  task, timeout);
494 }
495
496 MSG_error_t MSG_task_send(m_task_t task, const char *alias)
497 {
498   return MSG_task_send_with_timeout(task, alias, -1);
499 }
500
501
502 MSG_error_t
503 MSG_task_send_bounded(m_task_t task, const char *alias, double maxrate)
504 {
505   task->simdata->rate = maxrate;
506   return MSG_task_send(task, alias);
507 }
508
509
510 MSG_error_t
511 MSG_task_send_with_timeout(m_task_t task, const char *alias, double timeout)
512 {
513   return MSG_mailbox_put_with_timeout(MSG_mailbox_get_by_alias(alias), task,
514                                       timeout);
515 }
516
517 int MSG_task_listen(const char *alias)
518 {
519   CHECK_HOST();
520
521   return !MSG_mailbox_is_empty(MSG_mailbox_get_by_alias(alias));
522 }
523
524 /** \ingroup msg_gos_functions
525  * \brief Test whether there is a pending communication on a channel.
526  *
527  * It takes one parameter.
528  * \param channel the channel on which the agent should be
529    listening. This value has to be >=0 and < than the maximal
530    number of channels fixed with MSG_set_channel_number().
531  * \return 1 if there is a pending communication and 0 otherwise
532  */
533 int MSG_task_Iprobe(m_channel_t channel)
534 {
535   xbt_assert1((channel >= 0)
536               && (channel < msg_global->max_channel), "Invalid channel %d",
537               channel);
538
539   CHECK_HOST();
540
541   return
542     !MSG_mailbox_is_empty(MSG_mailbox_get_by_channel
543                           (MSG_host_self(), channel));
544 }
545
546 /** \ingroup msg_gos_functions
547
548  * \brief Return the number of tasks waiting to be received on a \a
549    channel and sent by \a host.
550  *
551  * It takes two parameters.
552  * \param channel the channel on which the agent should be
553    listening. This value has to be >=0 and < than the maximal
554    number of channels fixed with MSG_set_channel_number().
555  * \param host the host that is to be watched.
556  * \return the number of tasks waiting to be received on \a channel
557    and sent by \a host.
558  */
559 int MSG_task_probe_from_host(int channel, m_host_t host)
560 {
561   xbt_assert1((channel >= 0)
562               && (channel < msg_global->max_channel), "Invalid channel %d",
563               channel);
564
565   CHECK_HOST();
566
567   return
568     MSG_mailbox_get_count_host_waiting_tasks(MSG_mailbox_get_by_channel
569                                              (MSG_host_self(), channel),
570                                              host);
571
572 }
573
574 int MSG_task_listen_from_host(const char *alias, m_host_t host)
575 {
576   CHECK_HOST();
577
578   return
579     MSG_mailbox_get_count_host_waiting_tasks(MSG_mailbox_get_by_alias(alias),
580                                              host);
581 }
582
583 /** \ingroup msg_gos_functions
584  * \brief Test whether there is a pending communication on a channel, and who sent it.
585  *
586  * It takes one parameter.
587  * \param channel the channel on which the agent should be
588    listening. This value has to be >=0 and < than the maximal
589    number of channels fixed with MSG_set_channel_number().
590  * \return -1 if there is no pending communication and the PID of the process who sent it otherwise
591  */
592 int MSG_task_probe_from(m_channel_t channel)
593 {
594   m_task_t task;
595
596   CHECK_HOST();
597
598   xbt_assert1((channel >= 0)
599               && (channel < msg_global->max_channel), "Invalid channel %d",
600               channel);
601
602   if (NULL ==
603       (task =
604        MSG_mailbox_get_head(MSG_mailbox_get_by_channel
605                             (MSG_host_self(), channel))))
606     return -1;
607
608   return MSG_process_get_PID(task->simdata->sender);
609 }
610
611 int MSG_task_listen_from(const char *alias)
612 {
613   m_task_t task;
614
615   CHECK_HOST();
616
617   if (NULL == (task = MSG_mailbox_get_head(MSG_mailbox_get_by_alias(alias))))
618     return -1;
619
620   return MSG_process_get_PID(task->simdata->sender);
621 }
622
623 /** \ingroup msg_gos_functions
624  * \brief Wait for at most \a max_duration second for a task reception
625    on \a channel.
626
627  * \a PID is updated with the PID of the first process that triggered this event if any.
628  *
629  * It takes three parameters:
630  * \param channel the channel on which the agent should be
631    listening. This value has to be >=0 and < than the maximal.
632    number of channels fixed with MSG_set_channel_number().
633  * \param PID a memory location for storing an int.
634  * \param timeout the maximum time to wait for a task before
635     giving up. In the case of a reception, *\a PID will be updated
636     with the PID of the first process to send a task.
637  * \return #MSG_HOST_FAILURE if the host is shut down in the meantime
638    and #MSG_OK otherwise.
639  */
640 MSG_error_t
641 MSG_channel_select_from(m_channel_t channel, double timeout, int *PID)
642 {
643   m_host_t h = NULL;
644   simdata_host_t h_simdata = NULL;
645   m_task_t t;
646   int first_time = 1;
647   smx_cond_t cond;
648   msg_mailbox_t mailbox;
649
650   xbt_assert1((channel >= 0)
651               && (channel < msg_global->max_channel), "Invalid channel %d",
652               channel);
653
654   if (PID) {
655     *PID = -1;
656   }
657
658   if (timeout == 0.0) {
659     *PID = MSG_task_probe_from(channel);
660     MSG_RETURN(MSG_OK);
661   } else {
662     CHECK_HOST();
663     h = MSG_host_self();
664     h_simdata = h->simdata;
665
666     mailbox = MSG_mailbox_get_by_channel(MSG_host_self(), channel);
667
668     while (MSG_mailbox_is_empty(mailbox)) {
669       if (timeout > 0) {
670         if (!first_time) {
671           MSG_RETURN(MSG_OK);
672         }
673       }
674
675       SIMIX_mutex_lock(h_simdata->mutex);
676
677       xbt_assert1(!MSG_mailbox_get_cond(mailbox),
678                   "A process is already blocked on this channel %d", channel);
679
680       cond = SIMIX_cond_init();
681
682       MSG_mailbox_set_cond(mailbox, cond);
683
684       if (timeout > 0) {
685         SIMIX_cond_wait_timeout(cond, h_simdata->mutex, timeout);
686       } else {
687         SIMIX_cond_wait(cond, h_simdata->mutex);
688       }
689
690       SIMIX_cond_destroy(cond);
691       SIMIX_mutex_unlock(h_simdata->mutex);
692
693       if (SIMIX_host_get_state(h_simdata->smx_host) == 0) {
694         MSG_RETURN(MSG_HOST_FAILURE);
695       }
696
697       MSG_mailbox_set_cond(mailbox, NULL);
698       first_time = 0;
699     }
700
701     if (NULL == (t = MSG_mailbox_get_head(mailbox)))
702       MSG_RETURN(MSG_OK);
703
704
705     if (PID) {
706       *PID = MSG_process_get_PID(t->simdata->sender);
707     }
708
709     MSG_RETURN(MSG_OK);
710   }
711 }
712
713
714 MSG_error_t MSG_alias_select_from(const char *alias, double timeout, int *PID)
715 {
716   m_host_t h = NULL;
717   simdata_host_t h_simdata = NULL;
718   m_task_t t;
719   int first_time = 1;
720   smx_cond_t cond;
721   msg_mailbox_t mailbox;
722
723   if (PID) {
724     *PID = -1;
725   }
726
727   if (timeout == 0.0) {
728     *PID = MSG_task_listen_from(alias);
729     MSG_RETURN(MSG_OK);
730   } else {
731     CHECK_HOST();
732     h = MSG_host_self();
733     h_simdata = h->simdata;
734
735     DEBUG2("Probing on alias %s (%s)", alias, h->name);
736
737     mailbox = MSG_mailbox_get_by_alias(alias);
738
739     while (MSG_mailbox_is_empty(mailbox)) {
740       if (timeout > 0) {
741         if (!first_time) {
742           MSG_RETURN(MSG_OK);
743         }
744       }
745
746       SIMIX_mutex_lock(h_simdata->mutex);
747
748       xbt_assert1(!MSG_mailbox_get_cond(mailbox),
749                   "A process is already blocked on this alias %s", alias);
750
751       cond = SIMIX_cond_init();
752
753       MSG_mailbox_set_cond(mailbox, cond);
754
755       if (timeout > 0) {
756         SIMIX_cond_wait_timeout(cond, h_simdata->mutex, timeout);
757       } else {
758         SIMIX_cond_wait(cond, h_simdata->mutex);
759       }
760
761       SIMIX_cond_destroy(cond);
762       SIMIX_mutex_unlock(h_simdata->mutex);
763
764       if (SIMIX_host_get_state(h_simdata->smx_host) == 0) {
765         MSG_RETURN(MSG_HOST_FAILURE);
766       }
767
768       MSG_mailbox_set_cond(mailbox, NULL);
769       first_time = 0;
770     }
771
772     if (NULL == (t = MSG_mailbox_get_head(mailbox)))
773       MSG_RETURN(MSG_OK);
774
775
776     if (PID) {
777       *PID = MSG_process_get_PID(t->simdata->sender);
778     }
779
780     MSG_RETURN(MSG_OK);
781   }
782 }