Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
fix: correct trace mask checking
[simgrid.git] / src / msg / gos.c
1 /*     $Id$      */
2
3 /* Copyright (c) 2002-2007 Arnaud Legrand.                                  */
4 /* Copyright (c) 2007 Bruno Donassolo.                                      */
5 /* All rights reserved.                                                     */
6
7 /* This program is free software; you can redistribute it and/or modify it
8  * under the terms of the license (GNU LGPL) which comes with this package. */
9
10 #include "msg/private.h"
11 #include "xbt/sysdep.h"
12 #include "xbt/log.h"
13 #include "mailbox.h"
14
15
16 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(msg_gos, msg,
17                                 "Logging specific to MSG (gos)");
18
19 /** \ingroup msg_gos_functions
20  *
21  * \brief Return the last value returned by a MSG function (except
22  * MSG_get_errno...).
23  */
24 MSG_error_t MSG_get_errno(void)
25 {
26   return PROCESS_GET_ERRNO();
27 }
28
29 /** \ingroup msg_gos_functions
30  * \brief Executes a task and waits for its termination.
31  *
32  * This function is used for describing the behavior of an agent. It
33  * takes only one parameter.
34  * \param task a #m_task_t to execute on the location on which the
35    agent is running.
36  * \return #MSG_FATAL if \a task is not properly initialized and
37  * #MSG_OK otherwise.
38  */
39 MSG_error_t MSG_task_execute(m_task_t task)
40 {
41   simdata_task_t simdata = NULL;
42   m_process_t self = MSG_process_self();
43   e_surf_action_state_t state = SURF_ACTION_NOT_IN_THE_SYSTEM;
44   CHECK_HOST();
45 #ifdef HAVE_TRACING
46   TRACE_msg_task_execute_start (task);
47 #endif
48
49   simdata = task->simdata;
50
51   xbt_assert1((!simdata->compute) && (task->simdata->refcount == 1),
52               "This task is executed somewhere else. Go fix your code! %d", task->simdata->refcount);
53
54   DEBUG1("Computing on %s", MSG_process_self()->simdata->m_host->name);
55
56   if (simdata->computation_amount == 0) {
57 #ifdef HAVE_TRACING
58     TRACE_msg_task_execute_end (task);
59 #endif
60     return MSG_OK;
61   }
62   simdata->refcount++;
63   SIMIX_mutex_lock(simdata->mutex);
64   simdata->compute =
65     SIMIX_action_execute(SIMIX_host_self(), task->name,
66                          simdata->computation_amount);
67   SIMIX_action_set_priority(simdata->compute, simdata->priority);
68
69   /* changed to waiting action since we are always waiting one action (execute, communicate or sleep) */
70   self->simdata->waiting_action = simdata->compute;
71   SIMIX_register_action_to_condition(simdata->compute, simdata->cond);
72   do {
73     SIMIX_cond_wait(simdata->cond, simdata->mutex);
74     state = SIMIX_action_get_state(simdata->compute);
75   } while (state == SURF_ACTION_READY || state == SURF_ACTION_RUNNING);
76   SIMIX_unregister_action_to_condition(simdata->compute, simdata->cond);
77   self->simdata->waiting_action = NULL;
78
79   SIMIX_mutex_unlock(simdata->mutex);
80   simdata->refcount--;
81
82   if (SIMIX_action_get_state(task->simdata->compute) == SURF_ACTION_DONE) {
83     /* action ended, set comm and compute = NULL, the actions is already destroyed in the main function */
84     SIMIX_action_destroy(task->simdata->compute);
85     simdata->computation_amount = 0.0;
86     simdata->comm = NULL;
87     simdata->compute = NULL;
88 #ifdef HAVE_TRACING
89     TRACE_msg_task_execute_end (task);
90 #endif
91     MSG_RETURN(MSG_OK);
92   } else if (SIMIX_host_get_state(SIMIX_host_self()) == 0) {
93     /* action ended, set comm and compute = NULL, the actions is already destroyed in the main function */
94     SIMIX_action_destroy(task->simdata->compute);
95     simdata->comm = NULL;
96     simdata->compute = NULL;
97 #ifdef HAVE_TRACING
98     TRACE_msg_task_execute_end (task);
99 #endif
100     MSG_RETURN(MSG_HOST_FAILURE);
101   } else {
102     /* action ended, set comm and compute = NULL, the actions is already destroyed in the main function */
103     SIMIX_action_destroy(task->simdata->compute);
104     simdata->comm = NULL;
105     simdata->compute = NULL;
106 #ifdef HAVE_TRACING
107     TRACE_msg_task_execute_end (task);
108 #endif
109     MSG_RETURN(MSG_TASK_CANCELLED);
110   }
111 }
112
113 /** \ingroup m_task_management
114  * \brief Creates a new #m_task_t (a parallel one....).
115  *
116  * A constructor for #m_task_t taking six arguments and returning the
117    corresponding object.
118  * \param name a name for the object. It is for user-level information
119    and can be NULL.
120  * \param host_nb the number of hosts implied in the parallel task.
121  * \param host_list an array of \p host_nb m_host_t.
122  * \param computation_amount an array of \p host_nb
123    doubles. computation_amount[i] is the total number of operations
124    that have to be performed on host_list[i].
125  * \param communication_amount an array of \p host_nb* \p host_nb doubles.
126  * \param data a pointer to any data may want to attach to the new
127    object.  It is for user-level information and can be NULL. It can
128    be retrieved with the function \ref MSG_task_get_data.
129  * \see m_task_t
130  * \return The new corresponding object.
131  */
132 m_task_t
133 MSG_parallel_task_create(const char *name, int host_nb,
134                          const m_host_t * host_list,
135                          double *computation_amount,
136                          double *communication_amount, void *data)
137 {
138   int i;
139   simdata_task_t simdata = xbt_new0(s_simdata_task_t, 1);
140   m_task_t task = xbt_new0(s_m_task_t, 1);
141   task->simdata = simdata;
142
143   /* Task structure */
144   task->name = xbt_strdup(name);
145   task->data = data;
146
147   /* Simulator Data */
148   simdata->computation_amount = 0;
149   simdata->message_size = 0;
150   simdata->cond = SIMIX_cond_init();
151   simdata->mutex = SIMIX_mutex_init();
152   simdata->compute = NULL;
153   simdata->comm = NULL;
154   simdata->rate = -1.0;
155   simdata->refcount = 1;
156   simdata->sender = NULL;
157   simdata->receiver = NULL;
158   simdata->source = NULL;
159
160   simdata->host_nb = host_nb;
161   simdata->host_list = xbt_new0(smx_host_t, host_nb);
162   simdata->comp_amount = computation_amount;
163   simdata->comm_amount = communication_amount;
164
165   for (i = 0; i < host_nb; i++)
166     simdata->host_list[i] = host_list[i]->simdata->smx_host;
167
168   return task;
169 }
170
171 MSG_error_t MSG_parallel_task_execute(m_task_t task)
172 {
173   simdata_task_t simdata = NULL;
174   m_process_t self = MSG_process_self();
175   e_surf_action_state_t state = SURF_ACTION_NOT_IN_THE_SYSTEM;
176   CHECK_HOST();
177
178   simdata = task->simdata;
179
180   xbt_assert0((!simdata->compute)
181               && (task->simdata->refcount == 1),
182               "This task is executed somewhere else. Go fix your code!");
183
184   xbt_assert0(simdata->host_nb, "This is not a parallel task. Go to hell.");
185
186   DEBUG1("Computing on %s", MSG_process_self()->simdata->m_host->name);
187
188   simdata->refcount++;
189
190   SIMIX_mutex_lock(simdata->mutex);
191   simdata->compute =
192     SIMIX_action_parallel_execute(task->name, simdata->host_nb,
193                                   simdata->host_list, simdata->comp_amount,
194                                   simdata->comm_amount, 1.0, -1.0);
195
196   self->simdata->waiting_action = simdata->compute;
197   SIMIX_register_action_to_condition(simdata->compute, simdata->cond);
198   do {
199     SIMIX_cond_wait(simdata->cond, simdata->mutex);
200     state = SIMIX_action_get_state(task->simdata->compute);
201   } while (state == SURF_ACTION_READY || state == SURF_ACTION_RUNNING);
202
203   SIMIX_unregister_action_to_condition(simdata->compute, simdata->cond);
204   self->simdata->waiting_action = NULL;
205
206
207   SIMIX_mutex_unlock(simdata->mutex);
208   simdata->refcount--;
209
210   if (SIMIX_action_get_state(task->simdata->compute) == SURF_ACTION_DONE) {
211     /* action ended, set comm and compute = NULL, the actions is already destroyed in the main function */
212     SIMIX_action_destroy(task->simdata->compute);
213     simdata->computation_amount = 0.0;
214     simdata->comm = NULL;
215     simdata->compute = NULL;
216     MSG_RETURN(MSG_OK);
217   } else if (SIMIX_host_get_state(SIMIX_host_self()) == 0) {
218     /* action ended, set comm and compute = NULL, the actions is already destroyed in the main function */
219     SIMIX_action_destroy(task->simdata->compute);
220     simdata->comm = NULL;
221     simdata->compute = NULL;
222     MSG_RETURN(MSG_HOST_FAILURE);
223   } else {
224     /* action ended, set comm and compute = NULL, the actions is already destroyed in the main function */
225     SIMIX_action_destroy(task->simdata->compute);
226     simdata->comm = NULL;
227     simdata->compute = NULL;
228     MSG_RETURN(MSG_TASK_CANCELLED);
229   }
230
231 }
232
233
234 /** \ingroup msg_gos_functions
235  * \brief Sleep for the specified number of seconds
236  *
237  * Makes the current process sleep until \a time seconds have elapsed.
238  *
239  * \param nb_sec a number of second
240  */
241 MSG_error_t MSG_process_sleep(double nb_sec)
242 {
243   smx_action_t act_sleep;
244   m_process_t proc = MSG_process_self();
245   e_surf_action_state_t state = SURF_ACTION_NOT_IN_THE_SYSTEM;
246   smx_mutex_t mutex;
247   smx_cond_t cond;
248
249 #ifdef HAVE_TRACING
250   TRACE_msg_process_sleep_in (MSG_process_self());
251 #endif
252
253   /* create action to sleep */
254   act_sleep =
255     SIMIX_action_sleep(SIMIX_process_get_host(proc->simdata->s_process),
256                        nb_sec);
257
258   mutex = SIMIX_mutex_init();
259   SIMIX_mutex_lock(mutex);
260
261   /* create conditional and register action to it */
262   cond = SIMIX_cond_init();
263
264   proc->simdata->waiting_action = act_sleep;
265   SIMIX_register_action_to_condition(act_sleep, cond);
266   do {
267     SIMIX_cond_wait(cond, mutex);
268     state = SIMIX_action_get_state(act_sleep);
269   } while (state == SURF_ACTION_READY || state == SURF_ACTION_RUNNING);
270   proc->simdata->waiting_action = NULL;
271   SIMIX_unregister_action_to_condition(act_sleep, cond);
272   SIMIX_mutex_unlock(mutex);
273
274   /* remove variables */
275   SIMIX_cond_destroy(cond);
276   SIMIX_mutex_destroy(mutex);
277
278   if (SIMIX_action_get_state(act_sleep) == SURF_ACTION_DONE) {
279     if (SIMIX_host_get_state(SIMIX_host_self()) == SURF_RESOURCE_OFF) {
280       SIMIX_action_destroy(act_sleep);
281 #ifdef HAVE_TRACING
282       TRACE_msg_process_sleep_out (MSG_process_self());
283 #endif
284       MSG_RETURN(MSG_HOST_FAILURE);
285     }
286   } else {
287     SIMIX_action_destroy(act_sleep);
288 #ifdef HAVE_TRACING
289     TRACE_msg_process_sleep_out (MSG_process_self());
290 #endif
291     MSG_RETURN(MSG_HOST_FAILURE);
292   }
293
294   SIMIX_action_destroy(act_sleep);
295 #ifdef HAVE_TRACING
296   TRACE_msg_process_sleep_out (MSG_process_self());
297 #endif
298   MSG_RETURN(MSG_OK);
299 }
300
301 /** \ingroup msg_gos_functions
302  * \brief Listen on \a channel and waits for receiving a task from \a host.
303  *
304  * It takes three parameters.
305  * \param task a memory location for storing a #m_task_t. It will
306    hold a task when this function will return. Thus \a task should not
307    be equal to \c NULL and \a *task should be equal to \c NULL. If one of
308    those two condition does not hold, there will be a warning message.
309  * \param channel the channel on which the agent should be
310    listening. This value has to be >=0 and < than the maximal
311    number of channels fixed with MSG_set_channel_number().
312  * \param host the host that is to be watched.
313  * \return #MSG_FATAL if \a task is equal to \c NULL, #MSG_WARNING
314    if \a *task is not equal to \c NULL, and #MSG_OK otherwise.
315  */
316 MSG_error_t
317 MSG_task_get_from_host(m_task_t * task, m_channel_t channel, m_host_t host)
318 {
319   return MSG_task_get_ext(task, channel, -1, host);
320 }
321
322 /** \ingroup msg_gos_functions
323  * \brief Listen on a channel and wait for receiving a task.
324  *
325  * It takes two parameters.
326  * \param task a memory location for storing a #m_task_t. It will
327    hold a task when this function will return. Thus \a task should not
328    be equal to \c NULL and \a *task should be equal to \c NULL. If one of
329    those two condition does not hold, there will be a warning message.
330  * \param channel the channel on which the agent should be
331    listening. This value has to be >=0 and < than the maximal
332    number of channels fixed with MSG_set_channel_number().
333  * \return #MSG_FATAL if \a task is equal to \c NULL, #MSG_WARNING
334  * if \a *task is not equal to \c NULL, and #MSG_OK otherwise.
335  */
336 MSG_error_t MSG_task_get(m_task_t * task, m_channel_t channel)
337 {
338   return MSG_task_get_with_timeout(task, channel, -1);
339 }
340
341 /** \ingroup msg_gos_functions
342  * \brief Listen on a channel and wait for receiving a task with a timeout.
343  *
344  * It takes three parameters.
345  * \param task a memory location for storing a #m_task_t. It will
346    hold a task when this function will return. Thus \a task should not
347    be equal to \c NULL and \a *task should be equal to \c NULL. If one of
348    those two condition does not hold, there will be a warning message.
349  * \param channel the channel on which the agent should be
350    listening. This value has to be >=0 and < than the maximal
351    number of channels fixed with MSG_set_channel_number().
352  * \param max_duration the maximum time to wait for a task before giving
353     up. In such a case, #MSG_TRANSFER_FAILURE will be returned, \a task
354     will not be modified and will still be
355     equal to \c NULL when returning.
356  * \return #MSG_FATAL if \a task is equal to \c NULL, #MSG_WARNING
357    if \a *task is not equal to \c NULL, and #MSG_OK otherwise.
358  */
359 MSG_error_t
360 MSG_task_get_with_timeout(m_task_t * task, m_channel_t channel,
361                           double max_duration)
362 {
363   return MSG_task_get_ext(task, channel, max_duration, NULL);
364 }
365
366 /** \defgroup msg_gos_functions MSG Operating System Functions
367  *  \brief This section describes the functions that can be used
368  *  by an agent for handling some task.
369  */
370
371 MSG_error_t
372 MSG_task_get_ext(m_task_t * task, m_channel_t channel, double timeout,
373                  m_host_t host)
374 {
375   xbt_assert1((channel >= 0)
376               && (channel < msg_global->max_channel), "Invalid channel %d",
377               channel);
378
379   return
380     MSG_mailbox_get_task_ext(MSG_mailbox_get_by_channel
381                              (MSG_host_self(), channel), task, host, timeout);
382 }
383
384 MSG_error_t
385 MSG_task_receive_from_host(m_task_t * task, const char *alias, m_host_t host)
386 {
387   return MSG_task_receive_ext(task, alias, -1, host);
388 }
389
390 MSG_error_t MSG_task_receive(m_task_t * task, const char *alias)
391 {
392   return MSG_task_receive_with_timeout(task, alias, -1);
393 }
394
395 MSG_error_t
396 MSG_task_receive_with_timeout(m_task_t * task, const char *alias,
397                               double timeout)
398 {
399   return MSG_task_receive_ext(task, alias, timeout, NULL);
400 }
401
402 MSG_error_t
403 MSG_task_receive_ext(m_task_t * task, const char *alias, double timeout,
404                      m_host_t host)
405 {
406   return MSG_mailbox_get_task_ext(MSG_mailbox_get_by_alias(alias), task, host,
407                                   timeout);
408 }
409
410
411 /** \ingroup msg_gos_functions
412  * \brief Put a task on a channel of an host and waits for the end of the
413  * transmission.
414  *
415  * This function is used for describing the behavior of an agent. It
416  * takes three parameter.
417  * \param task a #m_task_t to send on another location. This task
418    will not be usable anymore when the function will return. There is
419    no automatic task duplication and you have to save your parameters
420    before calling this function. Tasks are unique and once it has been
421    sent to another location, you should not access it anymore. You do
422    not need to call MSG_task_destroy() but to avoid using, as an
423    effect of inattention, this task anymore, you definitely should
424    renitialize it with #MSG_TASK_UNINITIALIZED. Note that this task
425    can be transfered iff it has been correctly created with
426    MSG_task_create().
427  * \param dest the destination of the message
428  * \param channel the channel on which the agent should put this
429    task. This value has to be >=0 and < than the maximal number of
430    channels fixed with MSG_set_channel_number().
431  * \return #MSG_FATAL if \a task is not properly initialized and
432  * #MSG_OK otherwise. Returns #MSG_HOST_FAILURE if the host on which
433  * this function was called was shut down. Returns
434  * #MSG_TRANSFER_FAILURE if the transfer could not be properly done
435  * (network failure, dest failure)
436  */
437 MSG_error_t MSG_task_put(m_task_t task, m_host_t dest, m_channel_t channel)
438 {
439   return MSG_task_put_with_timeout(task, dest, channel, -1.0);
440 }
441
442 /** \ingroup msg_gos_functions
443  * \brief Does exactly the same as MSG_task_put but with a bounded transmition
444  * rate.
445  *
446  * \sa MSG_task_put
447  */
448 MSG_error_t
449 MSG_task_put_bounded(m_task_t task, m_host_t dest, m_channel_t channel,
450                      double maxrate)
451 {
452   task->simdata->rate = maxrate;
453   return MSG_task_put(task, dest, channel);
454 }
455
456 /** \ingroup msg_gos_functions \brief Put a task on a channel of an
457  * host (with a timeout on the waiting of the destination host) and
458  * waits for the end of the transmission.
459  *
460  * This function is used for describing the behavior of an agent. It
461  * takes four parameter.
462  * \param task a #m_task_t to send on another location. This task
463    will not be usable anymore when the function will return. There is
464    no automatic task duplication and you have to save your parameters
465    before calling this function. Tasks are unique and once it has been
466    sent to another location, you should not access it anymore. You do
467    not need to call MSG_task_destroy() but to avoid using, as an
468    effect of inattention, this task anymore, you definitely should
469    renitialize it with #MSG_TASK_UNINITIALIZED. Note that this task
470    can be transfered iff it has been correctly created with
471    MSG_task_create().
472  * \param dest the destination of the message
473  * \param channel the channel on which the agent should put this
474    task. This value has to be >=0 and < than the maximal number of
475    channels fixed with MSG_set_channel_number().
476  * \param timeout the maximum time to wait for a task before giving
477     up. In such a case, #MSG_TRANSFER_FAILURE will be returned, \a task
478     will not be modified
479  * \return #MSG_FATAL if \a task is not properly initialized and
480    #MSG_OK otherwise. Returns #MSG_HOST_FAILURE if the host on which
481    this function was called was shut down. Returns
482    #MSG_TRANSFER_FAILURE if the transfer could not be properly done
483    (network failure, dest failure, timeout...)
484  */
485 MSG_error_t
486 MSG_task_put_with_timeout(m_task_t task, m_host_t dest, m_channel_t channel,
487                           double timeout)
488 {
489   xbt_assert1((channel >= 0)
490               && (channel < msg_global->max_channel), "Invalid channel %d",
491               channel);
492
493   return
494     MSG_mailbox_put_with_timeout(MSG_mailbox_get_by_channel(dest, channel),
495                                  task, timeout);
496 }
497
498 MSG_error_t MSG_task_send(m_task_t task, const char *alias)
499 {
500   return MSG_task_send_with_timeout(task, alias, -1);
501 }
502
503
504 MSG_error_t
505 MSG_task_send_bounded(m_task_t task, const char *alias, double maxrate)
506 {
507   task->simdata->rate = maxrate;
508   return MSG_task_send(task, alias);
509 }
510
511
512 MSG_error_t
513 MSG_task_send_with_timeout(m_task_t task, const char *alias, double timeout)
514 {
515   return MSG_mailbox_put_with_timeout(MSG_mailbox_get_by_alias(alias), task,
516                                       timeout);
517 }
518
519 int MSG_task_listen(const char *alias)
520 {
521   CHECK_HOST();
522
523   return !MSG_mailbox_is_empty(MSG_mailbox_get_by_alias(alias));
524 }
525
526 /** \ingroup msg_gos_functions
527  * \brief Test whether there is a pending communication on a channel.
528  *
529  * It takes one parameter.
530  * \param channel the channel on which the agent should be
531    listening. This value has to be >=0 and < than the maximal
532    number of channels fixed with MSG_set_channel_number().
533  * \return 1 if there is a pending communication and 0 otherwise
534  */
535 int MSG_task_Iprobe(m_channel_t channel)
536 {
537   xbt_assert1((channel >= 0)
538               && (channel < msg_global->max_channel), "Invalid channel %d",
539               channel);
540
541   CHECK_HOST();
542
543   return
544     !MSG_mailbox_is_empty(MSG_mailbox_get_by_channel
545                           (MSG_host_self(), channel));
546 }
547
548 /** \ingroup msg_gos_functions
549
550  * \brief Return the number of tasks waiting to be received on a \a
551    channel and sent by \a host.
552  *
553  * It takes two parameters.
554  * \param channel the channel on which the agent should be
555    listening. This value has to be >=0 and < than the maximal
556    number of channels fixed with MSG_set_channel_number().
557  * \param host the host that is to be watched.
558  * \return the number of tasks waiting to be received on \a channel
559    and sent by \a host.
560  */
561 int MSG_task_probe_from_host(int channel, m_host_t host)
562 {
563   xbt_assert1((channel >= 0)
564               && (channel < msg_global->max_channel), "Invalid channel %d",
565               channel);
566
567   CHECK_HOST();
568
569   return
570     MSG_mailbox_get_count_host_waiting_tasks(MSG_mailbox_get_by_channel
571                                              (MSG_host_self(), channel),
572                                              host);
573
574 }
575
576 int MSG_task_listen_from_host(const char *alias, m_host_t host)
577 {
578   CHECK_HOST();
579
580   return
581     MSG_mailbox_get_count_host_waiting_tasks(MSG_mailbox_get_by_alias(alias),
582                                              host);
583 }
584
585 /** \ingroup msg_gos_functions
586  * \brief Test whether there is a pending communication on a channel, and who sent it.
587  *
588  * It takes one parameter.
589  * \param channel the channel on which the agent should be
590    listening. This value has to be >=0 and < than the maximal
591    number of channels fixed with MSG_set_channel_number().
592  * \return -1 if there is no pending communication and the PID of the process who sent it otherwise
593  */
594 int MSG_task_probe_from(m_channel_t channel)
595 {
596   m_task_t task;
597
598   CHECK_HOST();
599
600   xbt_assert1((channel >= 0)
601               && (channel < msg_global->max_channel), "Invalid channel %d",
602               channel);
603
604   if (NULL ==
605       (task =
606        MSG_mailbox_get_head(MSG_mailbox_get_by_channel
607                             (MSG_host_self(), channel))))
608     return -1;
609
610   return MSG_process_get_PID(task->simdata->sender);
611 }
612
613 int MSG_task_listen_from(const char *alias)
614 {
615   m_task_t task;
616
617   CHECK_HOST();
618
619   if (NULL == (task = MSG_mailbox_get_head(MSG_mailbox_get_by_alias(alias))))
620     return -1;
621
622   return MSG_process_get_PID(task->simdata->sender);
623 }
624
625 /** \ingroup msg_gos_functions
626  * \brief Wait for at most \a max_duration second for a task reception
627    on \a channel.
628
629  * \a PID is updated with the PID of the first process that triggered this event if any.
630  *
631  * It takes three parameters:
632  * \param channel the channel on which the agent should be
633    listening. This value has to be >=0 and < than the maximal.
634    number of channels fixed with MSG_set_channel_number().
635  * \param PID a memory location for storing an int.
636  * \param timeout the maximum time to wait for a task before
637     giving up. In the case of a reception, *\a PID will be updated
638     with the PID of the first process to send a task.
639  * \return #MSG_HOST_FAILURE if the host is shut down in the meantime
640    and #MSG_OK otherwise.
641  */
642 MSG_error_t
643 MSG_channel_select_from(m_channel_t channel, double timeout, int *PID)
644 {
645   m_host_t h = NULL;
646   simdata_host_t h_simdata = NULL;
647   m_task_t t;
648   int first_time = 1;
649   smx_cond_t cond;
650   msg_mailbox_t mailbox;
651
652   xbt_assert1((channel >= 0)
653               && (channel < msg_global->max_channel), "Invalid channel %d",
654               channel);
655
656   if (PID) {
657     *PID = -1;
658   }
659
660   if (timeout == 0.0) {
661     *PID = MSG_task_probe_from(channel);
662     MSG_RETURN(MSG_OK);
663   } else {
664     CHECK_HOST();
665     h = MSG_host_self();
666     h_simdata = h->simdata;
667
668     mailbox = MSG_mailbox_get_by_channel(MSG_host_self(), channel);
669
670     while (MSG_mailbox_is_empty(mailbox)) {
671       if (timeout > 0) {
672         if (!first_time) {
673           MSG_RETURN(MSG_OK);
674         }
675       }
676
677       SIMIX_mutex_lock(h_simdata->mutex);
678
679       xbt_assert1(!MSG_mailbox_get_cond(mailbox),
680                   "A process is already blocked on this channel %d", channel);
681
682       cond = SIMIX_cond_init();
683
684       MSG_mailbox_set_cond(mailbox, cond);
685
686       if (timeout > 0) {
687         SIMIX_cond_wait_timeout(cond, h_simdata->mutex, timeout);
688       } else {
689         SIMIX_cond_wait(cond, h_simdata->mutex);
690       }
691
692       SIMIX_cond_destroy(cond);
693       SIMIX_mutex_unlock(h_simdata->mutex);
694
695       if (SIMIX_host_get_state(h_simdata->smx_host) == 0) {
696         MSG_RETURN(MSG_HOST_FAILURE);
697       }
698
699       MSG_mailbox_set_cond(mailbox, NULL);
700       first_time = 0;
701     }
702
703     if (NULL == (t = MSG_mailbox_get_head(mailbox)))
704       MSG_RETURN(MSG_OK);
705
706
707     if (PID) {
708       *PID = MSG_process_get_PID(t->simdata->sender);
709     }
710
711     MSG_RETURN(MSG_OK);
712   }
713 }
714
715
716 MSG_error_t MSG_alias_select_from(const char *alias, double timeout, int *PID)
717 {
718   m_host_t h = NULL;
719   simdata_host_t h_simdata = NULL;
720   m_task_t t;
721   int first_time = 1;
722   smx_cond_t cond;
723   msg_mailbox_t mailbox;
724
725   if (PID) {
726     *PID = -1;
727   }
728
729   if (timeout == 0.0) {
730     *PID = MSG_task_listen_from(alias);
731     MSG_RETURN(MSG_OK);
732   } else {
733     CHECK_HOST();
734     h = MSG_host_self();
735     h_simdata = h->simdata;
736
737     DEBUG2("Probing on alias %s (%s)", alias, h->name);
738
739     mailbox = MSG_mailbox_get_by_alias(alias);
740
741     while (MSG_mailbox_is_empty(mailbox)) {
742       if (timeout > 0) {
743         if (!first_time) {
744           MSG_RETURN(MSG_OK);
745         }
746       }
747
748       SIMIX_mutex_lock(h_simdata->mutex);
749
750       xbt_assert1(!MSG_mailbox_get_cond(mailbox),
751                   "A process is already blocked on this alias %s", alias);
752
753       cond = SIMIX_cond_init();
754
755       MSG_mailbox_set_cond(mailbox, cond);
756
757       if (timeout > 0) {
758         SIMIX_cond_wait_timeout(cond, h_simdata->mutex, timeout);
759       } else {
760         SIMIX_cond_wait(cond, h_simdata->mutex);
761       }
762
763       SIMIX_cond_destroy(cond);
764       SIMIX_mutex_unlock(h_simdata->mutex);
765
766       if (SIMIX_host_get_state(h_simdata->smx_host) == 0) {
767         MSG_RETURN(MSG_HOST_FAILURE);
768       }
769
770       MSG_mailbox_set_cond(mailbox, NULL);
771       first_time = 0;
772     }
773
774     if (NULL == (t = MSG_mailbox_get_head(mailbox)))
775       MSG_RETURN(MSG_OK);
776
777
778     if (PID) {
779       *PID = MSG_process_get_PID(t->simdata->sender);
780     }
781
782     MSG_RETURN(MSG_OK);
783   }
784 }