Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
MSG_task_isend/irecv and MSG_comm_test/wait added in a rush (not quite tested, not...
[simgrid.git] / src / msg / gos.c
1 /* Copyright (c) 2004, 2005, 2006, 2007, 2008, 2009, 2010. The SimGrid Team.
2  * All rights reserved.                                                     */
3
4 /* This program is free software; you can redistribute it and/or modify it
5  * under the terms of the license (GNU LGPL) which comes with this package. */
6
7 #include "msg/private.h"
8 #include "xbt/sysdep.h"
9 #include "xbt/log.h"
10 #include "mailbox.h"
11
12
13 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(msg_gos, msg,
14                                 "Logging specific to MSG (gos)");
15
16 /** \ingroup msg_gos_functions
17  *
18  * \brief Return the last value returned by a MSG function (except
19  * MSG_get_errno...).
20  */
21 MSG_error_t MSG_get_errno(void)
22 {
23   return PROCESS_GET_ERRNO();
24 }
25
26 /** \ingroup msg_gos_functions
27  * \brief Executes a task and waits for its termination.
28  *
29  * This function is used for describing the behavior of an agent. It
30  * takes only one parameter.
31  * \param task a #m_task_t to execute on the location on which the
32    agent is running.
33  * \return #MSG_FATAL if \a task is not properly initialized and
34  * #MSG_OK otherwise.
35  */
36 MSG_error_t MSG_task_execute(m_task_t task)
37 {
38   simdata_task_t simdata = NULL;
39   m_process_t self = MSG_process_self();
40   e_surf_action_state_t state = SURF_ACTION_NOT_IN_THE_SYSTEM;
41   CHECK_HOST();
42 #ifdef HAVE_TRACING
43   TRACE_msg_task_execute_start (task);
44 #endif
45
46   simdata = task->simdata;
47
48   xbt_assert1((!simdata->compute) && (task->simdata->refcount == 1),
49               "This task is executed somewhere else. Go fix your code! %d", task->simdata->refcount);
50
51   DEBUG1("Computing on %s", MSG_process_self()->simdata->m_host->name);
52
53   if (simdata->computation_amount == 0) {
54 #ifdef HAVE_TRACING
55     TRACE_msg_task_execute_end (task);
56 #endif
57     return MSG_OK;
58   }
59   simdata->refcount++;
60   SIMIX_mutex_lock(simdata->mutex);
61   simdata->compute =
62     SIMIX_action_execute(SIMIX_host_self(), task->name,
63                          simdata->computation_amount);
64   SIMIX_action_set_priority(simdata->compute, simdata->priority);
65
66   /* changed to waiting action since we are always waiting one action (execute, communicate or sleep) */
67   self->simdata->waiting_action = simdata->compute;
68   SIMIX_register_action_to_condition(simdata->compute, simdata->cond);
69   do {
70     SIMIX_cond_wait(simdata->cond, simdata->mutex);
71     state = SIMIX_action_get_state(simdata->compute);
72   } while (state == SURF_ACTION_READY || state == SURF_ACTION_RUNNING);
73   SIMIX_unregister_action_to_condition(simdata->compute, simdata->cond);
74   self->simdata->waiting_action = NULL;
75
76   SIMIX_mutex_unlock(simdata->mutex);
77   simdata->refcount--;
78
79   if (SIMIX_action_get_state(task->simdata->compute) == SURF_ACTION_DONE) {
80     /* action ended, set comm and compute = NULL, the actions is already destroyed in the main function */
81     SIMIX_action_destroy(task->simdata->compute);
82     simdata->computation_amount = 0.0;
83     simdata->comm = NULL;
84     simdata->compute = NULL;
85 #ifdef HAVE_TRACING
86     TRACE_msg_task_execute_end (task);
87 #endif
88     MSG_RETURN(MSG_OK);
89   } else if (SIMIX_host_get_state(SIMIX_host_self()) == 0) {
90     /* action ended, set comm and compute = NULL, the actions is already destroyed in the main function */
91     SIMIX_action_destroy(task->simdata->compute);
92     simdata->comm = NULL;
93     simdata->compute = NULL;
94 #ifdef HAVE_TRACING
95     TRACE_msg_task_execute_end (task);
96 #endif
97     MSG_RETURN(MSG_HOST_FAILURE);
98   } else {
99     /* action ended, set comm and compute = NULL, the actions is already destroyed in the main function */
100     SIMIX_action_destroy(task->simdata->compute);
101     simdata->comm = NULL;
102     simdata->compute = NULL;
103 #ifdef HAVE_TRACING
104     TRACE_msg_task_execute_end (task);
105 #endif
106     MSG_RETURN(MSG_TASK_CANCELLED);
107   }
108 }
109
110 /** \ingroup m_task_management
111  * \brief Creates a new #m_task_t (a parallel one....).
112  *
113  * A constructor for #m_task_t taking six arguments and returning the
114    corresponding object.
115  * \param name a name for the object. It is for user-level information
116    and can be NULL.
117  * \param host_nb the number of hosts implied in the parallel task.
118  * \param host_list an array of \p host_nb m_host_t.
119  * \param computation_amount an array of \p host_nb
120    doubles. computation_amount[i] is the total number of operations
121    that have to be performed on host_list[i].
122  * \param communication_amount an array of \p host_nb* \p host_nb doubles.
123  * \param data a pointer to any data may want to attach to the new
124    object.  It is for user-level information and can be NULL. It can
125    be retrieved with the function \ref MSG_task_get_data.
126  * \see m_task_t
127  * \return The new corresponding object.
128  */
129 m_task_t
130 MSG_parallel_task_create(const char *name, int host_nb,
131                          const m_host_t * host_list,
132                          double *computation_amount,
133                          double *communication_amount, void *data)
134 {
135   int i;
136   simdata_task_t simdata = xbt_new0(s_simdata_task_t, 1);
137   m_task_t task = xbt_new0(s_m_task_t, 1);
138   task->simdata = simdata;
139
140   /* Task structure */
141   task->name = xbt_strdup(name);
142   task->data = data;
143
144   /* Simulator Data */
145   simdata->computation_amount = 0;
146   simdata->message_size = 0;
147   simdata->cond = SIMIX_cond_init();
148   simdata->mutex = SIMIX_mutex_init();
149   simdata->compute = NULL;
150   simdata->comm = NULL;
151   simdata->rate = -1.0;
152   simdata->refcount = 1;
153   simdata->sender = NULL;
154   simdata->receiver = NULL;
155   simdata->source = NULL;
156
157   simdata->host_nb = host_nb;
158   simdata->host_list = xbt_new0(smx_host_t, host_nb);
159   simdata->comp_amount = computation_amount;
160   simdata->comm_amount = communication_amount;
161
162   for (i = 0; i < host_nb; i++)
163     simdata->host_list[i] = host_list[i]->simdata->smx_host;
164
165   return task;
166 }
167
168 MSG_error_t MSG_parallel_task_execute(m_task_t task)
169 {
170   simdata_task_t simdata = NULL;
171   m_process_t self = MSG_process_self();
172   e_surf_action_state_t state = SURF_ACTION_NOT_IN_THE_SYSTEM;
173   CHECK_HOST();
174
175   simdata = task->simdata;
176
177   xbt_assert0((!simdata->compute)
178               && (task->simdata->refcount == 1),
179               "This task is executed somewhere else. Go fix your code!");
180
181   xbt_assert0(simdata->host_nb, "This is not a parallel task. Go to hell.");
182
183   DEBUG1("Computing on %s", MSG_process_self()->simdata->m_host->name);
184
185   simdata->refcount++;
186
187   SIMIX_mutex_lock(simdata->mutex);
188   simdata->compute =
189     SIMIX_action_parallel_execute(task->name, simdata->host_nb,
190                                   simdata->host_list, simdata->comp_amount,
191                                   simdata->comm_amount, 1.0, -1.0);
192
193   self->simdata->waiting_action = simdata->compute;
194   SIMIX_register_action_to_condition(simdata->compute, simdata->cond);
195   do {
196     SIMIX_cond_wait(simdata->cond, simdata->mutex);
197     state = SIMIX_action_get_state(task->simdata->compute);
198   } while (state == SURF_ACTION_READY || state == SURF_ACTION_RUNNING);
199
200   SIMIX_unregister_action_to_condition(simdata->compute, simdata->cond);
201   self->simdata->waiting_action = NULL;
202
203
204   SIMIX_mutex_unlock(simdata->mutex);
205   simdata->refcount--;
206
207   if (SIMIX_action_get_state(task->simdata->compute) == SURF_ACTION_DONE) {
208     /* action ended, set comm and compute = NULL, the actions is already destroyed in the main function */
209     SIMIX_action_destroy(task->simdata->compute);
210     simdata->computation_amount = 0.0;
211     simdata->comm = NULL;
212     simdata->compute = NULL;
213     MSG_RETURN(MSG_OK);
214   } else if (SIMIX_host_get_state(SIMIX_host_self()) == 0) {
215     /* action ended, set comm and compute = NULL, the actions is already destroyed in the main function */
216     SIMIX_action_destroy(task->simdata->compute);
217     simdata->comm = NULL;
218     simdata->compute = NULL;
219     MSG_RETURN(MSG_HOST_FAILURE);
220   } else {
221     /* action ended, set comm and compute = NULL, the actions is already destroyed in the main function */
222     SIMIX_action_destroy(task->simdata->compute);
223     simdata->comm = NULL;
224     simdata->compute = NULL;
225     MSG_RETURN(MSG_TASK_CANCELLED);
226   }
227
228 }
229
230
231 /** \ingroup msg_gos_functions
232  * \brief Sleep for the specified number of seconds
233  *
234  * Makes the current process sleep until \a time seconds have elapsed.
235  *
236  * \param nb_sec a number of second
237  */
238 MSG_error_t MSG_process_sleep(double nb_sec)
239 {
240   smx_action_t act_sleep;
241   m_process_t proc = MSG_process_self();
242   e_surf_action_state_t state = SURF_ACTION_NOT_IN_THE_SYSTEM;
243   smx_mutex_t mutex;
244   smx_cond_t cond;
245
246 #ifdef HAVE_TRACING
247   TRACE_msg_process_sleep_in (MSG_process_self());
248 #endif
249
250   /* create action to sleep */
251   act_sleep =
252     SIMIX_action_sleep(SIMIX_process_get_host(proc->simdata->s_process),
253                        nb_sec);
254
255   mutex = SIMIX_mutex_init();
256   SIMIX_mutex_lock(mutex);
257
258   /* create conditional and register action to it */
259   cond = SIMIX_cond_init();
260
261   proc->simdata->waiting_action = act_sleep;
262   SIMIX_register_action_to_condition(act_sleep, cond);
263   do {
264     SIMIX_cond_wait(cond, mutex);
265     state = SIMIX_action_get_state(act_sleep);
266   } while (state == SURF_ACTION_READY || state == SURF_ACTION_RUNNING);
267   proc->simdata->waiting_action = NULL;
268   SIMIX_unregister_action_to_condition(act_sleep, cond);
269   SIMIX_mutex_unlock(mutex);
270
271   /* remove variables */
272   SIMIX_cond_destroy(cond);
273   SIMIX_mutex_destroy(mutex);
274
275   if (SIMIX_action_get_state(act_sleep) == SURF_ACTION_DONE) {
276     if (SIMIX_host_get_state(SIMIX_host_self()) == SURF_RESOURCE_OFF) {
277       SIMIX_action_destroy(act_sleep);
278 #ifdef HAVE_TRACING
279       TRACE_msg_process_sleep_out (MSG_process_self());
280 #endif
281       MSG_RETURN(MSG_HOST_FAILURE);
282     }
283   } else {
284     SIMIX_action_destroy(act_sleep);
285 #ifdef HAVE_TRACING
286     TRACE_msg_process_sleep_out (MSG_process_self());
287 #endif
288     MSG_RETURN(MSG_HOST_FAILURE);
289   }
290
291   SIMIX_action_destroy(act_sleep);
292 #ifdef HAVE_TRACING
293   TRACE_msg_process_sleep_out (MSG_process_self());
294 #endif
295   MSG_RETURN(MSG_OK);
296 }
297
298 /** \ingroup msg_gos_functions
299  * \brief Listen on \a channel and waits for receiving a task from \a host.
300  *
301  * It takes three parameters.
302  * \param task a memory location for storing a #m_task_t. It will
303    hold a task when this function will return. Thus \a task should not
304    be equal to \c NULL and \a *task should be equal to \c NULL. If one of
305    those two condition does not hold, there will be a warning message.
306  * \param channel the channel on which the agent should be
307    listening. This value has to be >=0 and < than the maximal
308    number of channels fixed with MSG_set_channel_number().
309  * \param host the host that is to be watched.
310  * \return #MSG_FATAL if \a task is equal to \c NULL, #MSG_WARNING
311    if \a *task is not equal to \c NULL, and #MSG_OK otherwise.
312  */
313 MSG_error_t
314 MSG_task_get_from_host(m_task_t * task, m_channel_t channel, m_host_t host)
315 {
316   return MSG_task_get_ext(task, channel, -1, host);
317 }
318
319 /** \ingroup msg_gos_functions
320  * \brief Listen on a channel and wait for receiving a task.
321  *
322  * It takes two parameters.
323  * \param task a memory location for storing a #m_task_t. It will
324    hold a task when this function will return. Thus \a task should not
325    be equal to \c NULL and \a *task should be equal to \c NULL. If one of
326    those two condition does not hold, there will be a warning message.
327  * \param channel the channel on which the agent should be
328    listening. This value has to be >=0 and < than the maximal
329    number of channels fixed with MSG_set_channel_number().
330  * \return #MSG_FATAL if \a task is equal to \c NULL, #MSG_WARNING
331  * if \a *task is not equal to \c NULL, and #MSG_OK otherwise.
332  */
333 MSG_error_t MSG_task_get(m_task_t * task, m_channel_t channel)
334 {
335   return MSG_task_get_with_timeout(task, channel, -1);
336 }
337
338 /** \ingroup msg_gos_functions
339  * \brief Listen on a channel and wait for receiving a task with a timeout.
340  *
341  * It takes three parameters.
342  * \param task a memory location for storing a #m_task_t. It will
343    hold a task when this function will return. Thus \a task should not
344    be equal to \c NULL and \a *task should be equal to \c NULL. If one of
345    those two condition does not hold, there will be a warning message.
346  * \param channel the channel on which the agent should be
347    listening. This value has to be >=0 and < than the maximal
348    number of channels fixed with MSG_set_channel_number().
349  * \param max_duration the maximum time to wait for a task before giving
350     up. In such a case, #MSG_TRANSFER_FAILURE will be returned, \a task
351     will not be modified and will still be
352     equal to \c NULL when returning.
353  * \return #MSG_FATAL if \a task is equal to \c NULL, #MSG_WARNING
354    if \a *task is not equal to \c NULL, and #MSG_OK otherwise.
355  */
356 MSG_error_t
357 MSG_task_get_with_timeout(m_task_t * task, m_channel_t channel,
358                           double max_duration)
359 {
360   return MSG_task_get_ext(task, channel, max_duration, NULL);
361 }
362
363 /** \defgroup msg_gos_functions MSG Operating System Functions
364  *  \brief This section describes the functions that can be used
365  *  by an agent for handling some task.
366  */
367
368 MSG_error_t
369 MSG_task_get_ext(m_task_t * task, m_channel_t channel, double timeout,
370                  m_host_t host)
371 {
372   xbt_assert1((channel >= 0)
373               && (channel < msg_global->max_channel), "Invalid channel %d",
374               channel);
375
376   return
377     MSG_mailbox_get_task_ext(MSG_mailbox_get_by_channel
378                              (MSG_host_self(), channel), task, host, timeout);
379 }
380
381 MSG_error_t
382 MSG_task_receive_from_host(m_task_t * task, const char *alias, m_host_t host)
383 {
384   return MSG_task_receive_ext(task, alias, -1, host);
385 }
386
387 MSG_error_t MSG_task_receive(m_task_t * task, const char *alias)
388 {
389   return MSG_task_receive_with_timeout(task, alias, -1);
390 }
391
392 MSG_error_t
393 MSG_task_receive_with_timeout(m_task_t * task, const char *alias,
394                               double timeout)
395 {
396   return MSG_task_receive_ext(task, alias, timeout, NULL);
397 }
398
399 MSG_error_t
400 MSG_task_receive_ext(m_task_t * task, const char *alias, double timeout,
401                      m_host_t host)
402 {
403   return MSG_mailbox_get_task_ext(MSG_mailbox_get_by_alias(alias), task, host,
404                                   timeout);
405 }
406
407 msg_comm_t MSG_task_isend(m_task_t task, const char *alias) {
408   simdata_task_t t_simdata = NULL;
409   m_process_t process = MSG_process_self();
410   msg_mailbox_t mailbox = MSG_mailbox_get_by_alias(alias);
411
412   CHECK_HOST();
413
414   /* FIXME: these functions are not tracable */
415
416   /* Prepare the task to send */
417   t_simdata = task->simdata;
418   t_simdata->sender = process;
419   t_simdata->source = MSG_host_self();
420
421   xbt_assert0(t_simdata->refcount == 1,
422               "This task is still being used somewhere else. You cannot send it now. Go fix your code!");
423
424   t_simdata->refcount++;
425   msg_global->sent_msg++;
426
427   process->simdata->waiting_task = task;
428
429   /* Send it by calling SIMIX network layer */
430
431   /* Kept for semantical compatibility with older implementation */
432   if(mailbox->cond)
433     SIMIX_cond_signal(mailbox->cond);
434
435   return SIMIX_network_isend(mailbox->rdv, t_simdata->message_size, t_simdata->rate,
436       task, sizeof(void*), &t_simdata->comm);
437 }
438
439 msg_comm_t MSG_task_irecv(m_task_t * task, const char *alias) {
440   smx_comm_t comm;
441   smx_rdv_t rdv = MSG_mailbox_get_by_alias(alias)->rdv;
442   msg_mailbox_t mailbox=MSG_mailbox_get_by_alias(alias);
443   size_t size = sizeof(void*);
444
445   CHECK_HOST();
446
447   /* FIXME: these functions are not tracable */
448
449   memset(&comm,0,sizeof(comm));
450
451   /* Kept for compatibility with older implementation */
452   xbt_assert1(!MSG_mailbox_get_cond(mailbox),
453               "A process is already blocked on this channel %s",
454               MSG_mailbox_get_alias(mailbox));
455
456   /* Sanity check */
457   xbt_assert0(task, "Null pointer for the task storage");
458
459   if (*task)
460     CRITICAL0("MSG_task_get() was asked to write in a non empty task struct.");
461
462   /* Try to receive it by calling SIMIX network layer */
463   return SIMIX_network_irecv(rdv, task, &size);
464 }
465 int MSG_comm_test(msg_comm_t comm) {
466   return SIMIX_network_test(comm);
467 }
468 MSG_error_t MSG_comm_wait(msg_comm_t comm,double timeout) {
469   xbt_ex_t e;
470   MSG_error_t res = MSG_OK;
471   TRY {
472     SIMIX_network_wait(comm,timeout);
473     //  (*task)->simdata->refcount--;
474
475     /* FIXME: these functions are not tracable */
476   }  CATCH(e){
477       switch(e.category){
478         case host_error:
479           res = MSG_HOST_FAILURE;
480           break;
481         case network_error:
482           res = MSG_TRANSFER_FAILURE;
483           break;
484         case timeout_error:
485           res = MSG_TIMEOUT;
486           break;
487         default:
488           xbt_die(bprintf("Unhandled SIMIX network exception: %s",e.msg));
489       }
490       xbt_ex_free(e);
491     }
492   return res;
493 }
494
495 /** \ingroup msg_gos_functions
496  * \brief Put a task on a channel of an host and waits for the end of the
497  * transmission.
498  *
499  * This function is used for describing the behavior of an agent. It
500  * takes three parameter.
501  * \param task a #m_task_t to send on another location. This task
502    will not be usable anymore when the function will return. There is
503    no automatic task duplication and you have to save your parameters
504    before calling this function. Tasks are unique and once it has been
505    sent to another location, you should not access it anymore. You do
506    not need to call MSG_task_destroy() but to avoid using, as an
507    effect of inattention, this task anymore, you definitely should
508    renitialize it with #MSG_TASK_UNINITIALIZED. Note that this task
509    can be transfered iff it has been correctly created with
510    MSG_task_create().
511  * \param dest the destination of the message
512  * \param channel the channel on which the agent should put this
513    task. This value has to be >=0 and < than the maximal number of
514    channels fixed with MSG_set_channel_number().
515  * \return #MSG_FATAL if \a task is not properly initialized and
516  * #MSG_OK otherwise. Returns #MSG_HOST_FAILURE if the host on which
517  * this function was called was shut down. Returns
518  * #MSG_TRANSFER_FAILURE if the transfer could not be properly done
519  * (network failure, dest failure)
520  */
521 MSG_error_t MSG_task_put(m_task_t task, m_host_t dest, m_channel_t channel)
522 {
523   return MSG_task_put_with_timeout(task, dest, channel, -1.0);
524 }
525
526 /** \ingroup msg_gos_functions
527  * \brief Does exactly the same as MSG_task_put but with a bounded transmition
528  * rate.
529  *
530  * \sa MSG_task_put
531  */
532 MSG_error_t
533 MSG_task_put_bounded(m_task_t task, m_host_t dest, m_channel_t channel,
534                      double maxrate)
535 {
536   task->simdata->rate = maxrate;
537   return MSG_task_put(task, dest, channel);
538 }
539
540 /** \ingroup msg_gos_functions \brief Put a task on a channel of an
541  * host (with a timeout on the waiting of the destination host) and
542  * waits for the end of the transmission.
543  *
544  * This function is used for describing the behavior of an agent. It
545  * takes four parameter.
546  * \param task a #m_task_t to send on another location. This task
547    will not be usable anymore when the function will return. There is
548    no automatic task duplication and you have to save your parameters
549    before calling this function. Tasks are unique and once it has been
550    sent to another location, you should not access it anymore. You do
551    not need to call MSG_task_destroy() but to avoid using, as an
552    effect of inattention, this task anymore, you definitely should
553    renitialize it with #MSG_TASK_UNINITIALIZED. Note that this task
554    can be transfered iff it has been correctly created with
555    MSG_task_create().
556  * \param dest the destination of the message
557  * \param channel the channel on which the agent should put this
558    task. This value has to be >=0 and < than the maximal number of
559    channels fixed with MSG_set_channel_number().
560  * \param timeout the maximum time to wait for a task before giving
561     up. In such a case, #MSG_TRANSFER_FAILURE will be returned, \a task
562     will not be modified
563  * \return #MSG_FATAL if \a task is not properly initialized and
564    #MSG_OK otherwise. Returns #MSG_HOST_FAILURE if the host on which
565    this function was called was shut down. Returns
566    #MSG_TRANSFER_FAILURE if the transfer could not be properly done
567    (network failure, dest failure, timeout...)
568  */
569 MSG_error_t
570 MSG_task_put_with_timeout(m_task_t task, m_host_t dest, m_channel_t channel,
571                           double timeout)
572 {
573   xbt_assert1((channel >= 0)
574               && (channel < msg_global->max_channel), "Invalid channel %d",
575               channel);
576
577   return
578     MSG_mailbox_put_with_timeout(MSG_mailbox_get_by_channel(dest, channel),
579                                  task, timeout);
580 }
581
582 MSG_error_t MSG_task_send(m_task_t task, const char *alias)
583 {
584   return MSG_task_send_with_timeout(task, alias, -1);
585 }
586
587
588 MSG_error_t
589 MSG_task_send_bounded(m_task_t task, const char *alias, double maxrate)
590 {
591   task->simdata->rate = maxrate;
592   return MSG_task_send(task, alias);
593 }
594
595
596 MSG_error_t
597 MSG_task_send_with_timeout(m_task_t task, const char *alias, double timeout)
598 {
599   return MSG_mailbox_put_with_timeout(MSG_mailbox_get_by_alias(alias), task,
600                                       timeout);
601 }
602
603 int MSG_task_listen(const char *alias)
604 {
605   CHECK_HOST();
606
607   return !MSG_mailbox_is_empty(MSG_mailbox_get_by_alias(alias));
608 }
609
610 /** \ingroup msg_gos_functions
611  * \brief Test whether there is a pending communication on a channel.
612  *
613  * It takes one parameter.
614  * \param channel the channel on which the agent should be
615    listening. This value has to be >=0 and < than the maximal
616    number of channels fixed with MSG_set_channel_number().
617  * \return 1 if there is a pending communication and 0 otherwise
618  */
619 int MSG_task_Iprobe(m_channel_t channel)
620 {
621   xbt_assert1((channel >= 0)
622               && (channel < msg_global->max_channel), "Invalid channel %d",
623               channel);
624
625   CHECK_HOST();
626
627   return
628     !MSG_mailbox_is_empty(MSG_mailbox_get_by_channel
629                           (MSG_host_self(), channel));
630 }
631
632 /** \ingroup msg_gos_functions
633
634  * \brief Return the number of tasks waiting to be received on a \a
635    channel and sent by \a host.
636  *
637  * It takes two parameters.
638  * \param channel the channel on which the agent should be
639    listening. This value has to be >=0 and < than the maximal
640    number of channels fixed with MSG_set_channel_number().
641  * \param host the host that is to be watched.
642  * \return the number of tasks waiting to be received on \a channel
643    and sent by \a host.
644  */
645 int MSG_task_probe_from_host(int channel, m_host_t host)
646 {
647   xbt_assert1((channel >= 0)
648               && (channel < msg_global->max_channel), "Invalid channel %d",
649               channel);
650
651   CHECK_HOST();
652
653   return
654     MSG_mailbox_get_count_host_waiting_tasks(MSG_mailbox_get_by_channel
655                                              (MSG_host_self(), channel),
656                                              host);
657
658 }
659
660 int MSG_task_listen_from_host(const char *alias, m_host_t host)
661 {
662   CHECK_HOST();
663
664   return
665     MSG_mailbox_get_count_host_waiting_tasks(MSG_mailbox_get_by_alias(alias),
666                                              host);
667 }
668
669 /** \ingroup msg_gos_functions
670  * \brief Test whether there is a pending communication on a channel, and who sent it.
671  *
672  * It takes one parameter.
673  * \param channel the channel on which the agent should be
674    listening. This value has to be >=0 and < than the maximal
675    number of channels fixed with MSG_set_channel_number().
676  * \return -1 if there is no pending communication and the PID of the process who sent it otherwise
677  */
678 int MSG_task_probe_from(m_channel_t channel)
679 {
680   m_task_t task;
681
682   CHECK_HOST();
683
684   xbt_assert1((channel >= 0)
685               && (channel < msg_global->max_channel), "Invalid channel %d",
686               channel);
687
688   if (NULL ==
689       (task =
690        MSG_mailbox_get_head(MSG_mailbox_get_by_channel
691                             (MSG_host_self(), channel))))
692     return -1;
693
694   return MSG_process_get_PID(task->simdata->sender);
695 }
696
697 int MSG_task_listen_from(const char *alias)
698 {
699   m_task_t task;
700
701   CHECK_HOST();
702
703   if (NULL == (task = MSG_mailbox_get_head(MSG_mailbox_get_by_alias(alias))))
704     return -1;
705
706   return MSG_process_get_PID(task->simdata->sender);
707 }
708
709 /** \ingroup msg_gos_functions
710  * \brief Wait for at most \a max_duration second for a task reception
711    on \a channel.
712
713  * \a PID is updated with the PID of the first process that triggered this event if any.
714  *
715  * It takes three parameters:
716  * \param channel the channel on which the agent should be
717    listening. This value has to be >=0 and < than the maximal.
718    number of channels fixed with MSG_set_channel_number().
719  * \param PID a memory location for storing an int.
720  * \param timeout the maximum time to wait for a task before
721     giving up. In the case of a reception, *\a PID will be updated
722     with the PID of the first process to send a task.
723  * \return #MSG_HOST_FAILURE if the host is shut down in the meantime
724    and #MSG_OK otherwise.
725  */
726 MSG_error_t
727 MSG_channel_select_from(m_channel_t channel, double timeout, int *PID)
728 {
729   m_host_t h = NULL;
730   simdata_host_t h_simdata = NULL;
731   m_task_t t;
732   int first_time = 1;
733   smx_cond_t cond;
734   msg_mailbox_t mailbox;
735
736   xbt_assert1((channel >= 0)
737               && (channel < msg_global->max_channel), "Invalid channel %d",
738               channel);
739
740   if (PID) {
741     *PID = -1;
742   }
743
744   if (timeout == 0.0) {
745     *PID = MSG_task_probe_from(channel);
746     MSG_RETURN(MSG_OK);
747   } else {
748     CHECK_HOST();
749     h = MSG_host_self();
750     h_simdata = h->simdata;
751
752     mailbox = MSG_mailbox_get_by_channel(MSG_host_self(), channel);
753
754     while (MSG_mailbox_is_empty(mailbox)) {
755       if (timeout > 0) {
756         if (!first_time) {
757           MSG_RETURN(MSG_OK);
758         }
759       }
760
761       SIMIX_mutex_lock(h_simdata->mutex);
762
763       xbt_assert1(!MSG_mailbox_get_cond(mailbox),
764                   "A process is already blocked on this channel %d", channel);
765
766       cond = SIMIX_cond_init();
767
768       MSG_mailbox_set_cond(mailbox, cond);
769
770       if (timeout > 0) {
771         SIMIX_cond_wait_timeout(cond, h_simdata->mutex, timeout);
772       } else {
773         SIMIX_cond_wait(cond, h_simdata->mutex);
774       }
775
776       SIMIX_cond_destroy(cond);
777       SIMIX_mutex_unlock(h_simdata->mutex);
778
779       if (SIMIX_host_get_state(h_simdata->smx_host) == 0) {
780         MSG_RETURN(MSG_HOST_FAILURE);
781       }
782
783       MSG_mailbox_set_cond(mailbox, NULL);
784       first_time = 0;
785     }
786
787     if (NULL == (t = MSG_mailbox_get_head(mailbox)))
788       MSG_RETURN(MSG_OK);
789
790
791     if (PID) {
792       *PID = MSG_process_get_PID(t->simdata->sender);
793     }
794
795     MSG_RETURN(MSG_OK);
796   }
797 }
798
799
800 MSG_error_t MSG_alias_select_from(const char *alias, double timeout, int *PID)
801 {
802   m_host_t h = NULL;
803   simdata_host_t h_simdata = NULL;
804   m_task_t t;
805   int first_time = 1;
806   smx_cond_t cond;
807   msg_mailbox_t mailbox;
808
809   if (PID) {
810     *PID = -1;
811   }
812
813   if (timeout == 0.0) {
814     *PID = MSG_task_listen_from(alias);
815     MSG_RETURN(MSG_OK);
816   } else {
817     CHECK_HOST();
818     h = MSG_host_self();
819     h_simdata = h->simdata;
820
821     DEBUG2("Probing on alias %s (%s)", alias, h->name);
822
823     mailbox = MSG_mailbox_get_by_alias(alias);
824
825     while (MSG_mailbox_is_empty(mailbox)) {
826       if (timeout > 0) {
827         if (!first_time) {
828           MSG_RETURN(MSG_OK);
829         }
830       }
831
832       SIMIX_mutex_lock(h_simdata->mutex);
833
834       xbt_assert1(!MSG_mailbox_get_cond(mailbox),
835                   "A process is already blocked on this alias %s", alias);
836
837       cond = SIMIX_cond_init();
838
839       MSG_mailbox_set_cond(mailbox, cond);
840
841       if (timeout > 0) {
842         SIMIX_cond_wait_timeout(cond, h_simdata->mutex, timeout);
843       } else {
844         SIMIX_cond_wait(cond, h_simdata->mutex);
845       }
846
847       SIMIX_cond_destroy(cond);
848       SIMIX_mutex_unlock(h_simdata->mutex);
849
850       if (SIMIX_host_get_state(h_simdata->smx_host) == 0) {
851         MSG_RETURN(MSG_HOST_FAILURE);
852       }
853
854       MSG_mailbox_set_cond(mailbox, NULL);
855       first_time = 0;
856     }
857
858     if (NULL == (t = MSG_mailbox_get_head(mailbox)))
859       MSG_RETURN(MSG_OK);
860
861
862     if (PID) {
863       *PID = MSG_process_get_PID(t->simdata->sender);
864     }
865
866     MSG_RETURN(MSG_OK);
867   }
868 }