Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
f59e937a9f582a960c3c123efa194ad0187ad9f7
[simgrid.git] / src / msg / gos.c
1 /* Copyright (c) 2004, 2005, 2006, 2007, 2008, 2009, 2010. The SimGrid Team.
2  * All rights reserved.                                                     */
3
4 /* This program is free software; you can redistribute it and/or modify it
5  * under the terms of the license (GNU LGPL) which comes with this package. */
6
7 #include "msg/private.h"
8 #include "xbt/sysdep.h"
9 #include "xbt/log.h"
10 #include "mailbox.h"
11
12
13 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(msg_gos, msg,
14                                 "Logging specific to MSG (gos)");
15
16 /** \ingroup msg_gos_functions
17  *
18  * \brief Return the last value returned by a MSG function (except
19  * MSG_get_errno...).
20  */
21 MSG_error_t MSG_get_errno(void)
22 {
23   return PROCESS_GET_ERRNO();
24 }
25
26 /** \ingroup msg_gos_functions
27  * \brief Executes a task and waits for its termination.
28  *
29  * This function is used for describing the behavior of an agent. It
30  * takes only one parameter.
31  * \param task a #m_task_t to execute on the location on which the
32    agent is running.
33  * \return #MSG_FATAL if \a task is not properly initialized and
34  * #MSG_OK otherwise.
35  */
36 MSG_error_t MSG_task_execute(m_task_t task)
37 {
38   simdata_task_t simdata = NULL;
39   m_process_t self = MSG_process_self();
40   e_surf_action_state_t state = SURF_ACTION_NOT_IN_THE_SYSTEM;
41   CHECK_HOST();
42
43   simdata = task->simdata;
44
45   xbt_assert0(simdata->host_nb==0, "This is a parallel task. Go to hell.");
46
47 #ifdef HAVE_TRACING
48   TRACE_msg_task_execute_start (task);
49 #endif
50
51   xbt_assert1((!simdata->compute) && (task->simdata->refcount == 1),
52               "This task is executed somewhere else. Go fix your code! %d", task->simdata->refcount);
53
54   DEBUG1("Computing on %s", MSG_process_self()->simdata->m_host->name);
55
56   if (simdata->computation_amount == 0) {
57 #ifdef HAVE_TRACING
58     TRACE_msg_task_execute_end (task);
59 #endif
60     return MSG_OK;
61   }
62   simdata->refcount++;
63   SIMIX_mutex_lock(simdata->mutex);
64   simdata->compute =
65     SIMIX_action_execute(SIMIX_host_self(), task->name,
66                          simdata->computation_amount);
67   SIMIX_action_set_priority(simdata->compute, simdata->priority);
68
69   /* changed to waiting action since we are always waiting one action (execute, communicate or sleep) */
70   self->simdata->waiting_action = simdata->compute;
71   SIMIX_register_action_to_condition(simdata->compute, simdata->cond);
72   do {
73     SIMIX_cond_wait(simdata->cond, simdata->mutex);
74     state = SIMIX_action_get_state(simdata->compute);
75   } while (state == SURF_ACTION_READY || state == SURF_ACTION_RUNNING);
76   SIMIX_unregister_action_to_condition(simdata->compute, simdata->cond);
77   self->simdata->waiting_action = NULL;
78
79   SIMIX_mutex_unlock(simdata->mutex);
80   simdata->refcount--;
81
82   if (SIMIX_action_get_state(task->simdata->compute) == SURF_ACTION_DONE) {
83     /* action ended, set comm and compute = NULL, the actions is already destroyed in the main function */
84     SIMIX_action_destroy(task->simdata->compute);
85     simdata->computation_amount = 0.0;
86     simdata->comm = NULL;
87     simdata->compute = NULL;
88 #ifdef HAVE_TRACING
89     TRACE_msg_task_execute_end (task);
90 #endif
91     MSG_RETURN(MSG_OK);
92   } else if (SIMIX_host_get_state(SIMIX_host_self()) == 0) {
93     /* action ended, set comm and compute = NULL, the actions is already destroyed in the main function */
94     SIMIX_action_destroy(task->simdata->compute);
95     simdata->comm = NULL;
96     simdata->compute = NULL;
97 #ifdef HAVE_TRACING
98     TRACE_msg_task_execute_end (task);
99 #endif
100     MSG_RETURN(MSG_HOST_FAILURE);
101   } else {
102     /* action ended, set comm and compute = NULL, the actions is already destroyed in the main function */
103     SIMIX_action_destroy(task->simdata->compute);
104     simdata->comm = NULL;
105     simdata->compute = NULL;
106 #ifdef HAVE_TRACING
107     TRACE_msg_task_execute_end (task);
108 #endif
109     MSG_RETURN(MSG_TASK_CANCELLED);
110   }
111 }
112
113 /** \ingroup m_task_management
114  * \brief Creates a new #m_task_t (a parallel one....).
115  *
116  * A constructor for #m_task_t taking six arguments and returning the
117    corresponding object.
118  * \param name a name for the object. It is for user-level information
119    and can be NULL.
120  * \param host_nb the number of hosts implied in the parallel task.
121  * \param host_list an array of \p host_nb m_host_t.
122  * \param computation_amount an array of \p host_nb
123    doubles. computation_amount[i] is the total number of operations
124    that have to be performed on host_list[i].
125  * \param communication_amount an array of \p host_nb* \p host_nb doubles.
126  * \param data a pointer to any data may want to attach to the new
127    object.  It is for user-level information and can be NULL. It can
128    be retrieved with the function \ref MSG_task_get_data.
129  * \see m_task_t
130  * \return The new corresponding object.
131  */
132 m_task_t
133 MSG_parallel_task_create(const char *name, int host_nb,
134                          const m_host_t * host_list,
135                          double *computation_amount,
136                          double *communication_amount, void *data)
137 {
138   int i;
139   simdata_task_t simdata = xbt_new0(s_simdata_task_t, 1);
140   m_task_t task = xbt_new0(s_m_task_t, 1);
141   task->simdata = simdata;
142
143   /* Task structure */
144   task->name = xbt_strdup(name);
145   task->data = data;
146
147   /* Simulator Data */
148   simdata->computation_amount = 0;
149   simdata->message_size = 0;
150   simdata->cond = SIMIX_cond_init();
151   simdata->mutex = SIMIX_mutex_init();
152   simdata->compute = NULL;
153   simdata->comm = NULL;
154   simdata->rate = -1.0;
155   simdata->refcount = 1;
156   simdata->sender = NULL;
157   simdata->receiver = NULL;
158   simdata->source = NULL;
159
160   simdata->host_nb = host_nb;
161   simdata->host_list = xbt_new0(smx_host_t, host_nb);
162   simdata->comp_amount = computation_amount;
163   simdata->comm_amount = communication_amount;
164
165   for (i = 0; i < host_nb; i++)
166     simdata->host_list[i] = host_list[i]->simdata->smx_host;
167
168   return task;
169 }
170
171 MSG_error_t MSG_parallel_task_execute(m_task_t task)
172 {
173   simdata_task_t simdata = NULL;
174   m_process_t self = MSG_process_self();
175   e_surf_action_state_t state = SURF_ACTION_NOT_IN_THE_SYSTEM;
176   CHECK_HOST();
177
178   simdata = task->simdata;
179
180   xbt_assert0((!simdata->compute)
181               && (task->simdata->refcount == 1),
182               "This task is executed somewhere else. Go fix your code!");
183
184   xbt_assert0(simdata->host_nb, "This is not a parallel task. Go to hell.");
185
186   DEBUG1("Computing on %s", MSG_process_self()->simdata->m_host->name);
187
188   simdata->refcount++;
189
190   SIMIX_mutex_lock(simdata->mutex);
191   simdata->compute =
192     SIMIX_action_parallel_execute(task->name, simdata->host_nb,
193                                   simdata->host_list, simdata->comp_amount,
194                                   simdata->comm_amount, 1.0, -1.0);
195
196   self->simdata->waiting_action = simdata->compute;
197   SIMIX_register_action_to_condition(simdata->compute, simdata->cond);
198   do {
199     SIMIX_cond_wait(simdata->cond, simdata->mutex);
200     state = SIMIX_action_get_state(task->simdata->compute);
201   } while (state == SURF_ACTION_READY || state == SURF_ACTION_RUNNING);
202
203   SIMIX_unregister_action_to_condition(simdata->compute, simdata->cond);
204   self->simdata->waiting_action = NULL;
205
206
207   SIMIX_mutex_unlock(simdata->mutex);
208   simdata->refcount--;
209
210   if (SIMIX_action_get_state(task->simdata->compute) == SURF_ACTION_DONE) {
211     /* action ended, set comm and compute = NULL, the actions is already destroyed in the main function */
212     SIMIX_action_destroy(task->simdata->compute);
213     simdata->computation_amount = 0.0;
214     simdata->comm = NULL;
215     simdata->compute = NULL;
216     MSG_RETURN(MSG_OK);
217   } else if (SIMIX_host_get_state(SIMIX_host_self()) == 0) {
218     /* action ended, set comm and compute = NULL, the actions is already destroyed in the main function */
219     SIMIX_action_destroy(task->simdata->compute);
220     simdata->comm = NULL;
221     simdata->compute = NULL;
222     MSG_RETURN(MSG_HOST_FAILURE);
223   } else {
224     /* action ended, set comm and compute = NULL, the actions is already destroyed in the main function */
225     SIMIX_action_destroy(task->simdata->compute);
226     simdata->comm = NULL;
227     simdata->compute = NULL;
228     MSG_RETURN(MSG_TASK_CANCELLED);
229   }
230
231 }
232
233
234 /** \ingroup msg_gos_functions
235  * \brief Sleep for the specified number of seconds
236  *
237  * Makes the current process sleep until \a time seconds have elapsed.
238  *
239  * \param nb_sec a number of second
240  */
241 MSG_error_t MSG_process_sleep(double nb_sec)
242 {
243   smx_action_t act_sleep;
244   m_process_t proc = MSG_process_self();
245   e_surf_action_state_t state = SURF_ACTION_NOT_IN_THE_SYSTEM;
246   smx_mutex_t mutex;
247   smx_cond_t cond;
248
249 #ifdef HAVE_TRACING
250   TRACE_msg_process_sleep_in (MSG_process_self());
251 #endif
252
253   /* create action to sleep */
254   act_sleep =
255     SIMIX_action_sleep(SIMIX_process_get_host(proc->simdata->s_process),
256                        nb_sec);
257
258   mutex = SIMIX_mutex_init();
259   SIMIX_mutex_lock(mutex);
260
261   /* create conditional and register action to it */
262   cond = SIMIX_cond_init();
263
264   proc->simdata->waiting_action = act_sleep;
265   SIMIX_register_action_to_condition(act_sleep, cond);
266   do {
267     SIMIX_cond_wait(cond, mutex);
268     state = SIMIX_action_get_state(act_sleep);
269   } while (state == SURF_ACTION_READY || state == SURF_ACTION_RUNNING);
270   proc->simdata->waiting_action = NULL;
271   SIMIX_unregister_action_to_condition(act_sleep, cond);
272   SIMIX_mutex_unlock(mutex);
273
274   /* remove variables */
275   SIMIX_cond_destroy(cond);
276   SIMIX_mutex_destroy(mutex);
277
278   if (SIMIX_action_get_state(act_sleep) == SURF_ACTION_DONE) {
279     if (SIMIX_host_get_state(SIMIX_host_self()) == SURF_RESOURCE_OFF) {
280       SIMIX_action_destroy(act_sleep);
281 #ifdef HAVE_TRACING
282       TRACE_msg_process_sleep_out (MSG_process_self());
283 #endif
284       MSG_RETURN(MSG_HOST_FAILURE);
285     }
286   } else {
287     SIMIX_action_destroy(act_sleep);
288 #ifdef HAVE_TRACING
289     TRACE_msg_process_sleep_out (MSG_process_self());
290 #endif
291     MSG_RETURN(MSG_HOST_FAILURE);
292   }
293
294   SIMIX_action_destroy(act_sleep);
295 #ifdef HAVE_TRACING
296   TRACE_msg_process_sleep_out (MSG_process_self());
297 #endif
298   MSG_RETURN(MSG_OK);
299 }
300
301 /** \ingroup msg_gos_functions
302  * \brief Listen on \a channel and waits for receiving a task from \a host.
303  *
304  * It takes three parameters.
305  * \param task a memory location for storing a #m_task_t. It will
306    hold a task when this function will return. Thus \a task should not
307    be equal to \c NULL and \a *task should be equal to \c NULL. If one of
308    those two condition does not hold, there will be a warning message.
309  * \param channel the channel on which the agent should be
310    listening. This value has to be >=0 and < than the maximal
311    number of channels fixed with MSG_set_channel_number().
312  * \param host the host that is to be watched.
313  * \return #MSG_FATAL if \a task is equal to \c NULL, #MSG_WARNING
314    if \a *task is not equal to \c NULL, and #MSG_OK otherwise.
315  */
316 MSG_error_t
317 MSG_task_get_from_host(m_task_t * task, m_channel_t channel, m_host_t host)
318 {
319   return MSG_task_get_ext(task, channel, -1, host);
320 }
321
322 /** \ingroup msg_gos_functions
323  * \brief Listen on a channel and wait for receiving a task.
324  *
325  * It takes two parameters.
326  * \param task a memory location for storing a #m_task_t. It will
327    hold a task when this function will return. Thus \a task should not
328    be equal to \c NULL and \a *task should be equal to \c NULL. If one of
329    those two condition does not hold, there will be a warning message.
330  * \param channel the channel on which the agent should be
331    listening. This value has to be >=0 and < than the maximal
332    number of channels fixed with MSG_set_channel_number().
333  * \return #MSG_FATAL if \a task is equal to \c NULL, #MSG_WARNING
334  * if \a *task is not equal to \c NULL, and #MSG_OK otherwise.
335  */
336 MSG_error_t MSG_task_get(m_task_t * task, m_channel_t channel)
337 {
338   return MSG_task_get_with_timeout(task, channel, -1);
339 }
340
341 /** \ingroup msg_gos_functions
342  * \brief Listen on a channel and wait for receiving a task with a timeout.
343  *
344  * It takes three parameters.
345  * \param task a memory location for storing a #m_task_t. It will
346    hold a task when this function will return. Thus \a task should not
347    be equal to \c NULL and \a *task should be equal to \c NULL. If one of
348    those two condition does not hold, there will be a warning message.
349  * \param channel the channel on which the agent should be
350    listening. This value has to be >=0 and < than the maximal
351    number of channels fixed with MSG_set_channel_number().
352  * \param max_duration the maximum time to wait for a task before giving
353     up. In such a case, #MSG_TRANSFER_FAILURE will be returned, \a task
354     will not be modified and will still be
355     equal to \c NULL when returning.
356  * \return #MSG_FATAL if \a task is equal to \c NULL, #MSG_WARNING
357    if \a *task is not equal to \c NULL, and #MSG_OK otherwise.
358  */
359 MSG_error_t
360 MSG_task_get_with_timeout(m_task_t * task, m_channel_t channel,
361                           double max_duration)
362 {
363   return MSG_task_get_ext(task, channel, max_duration, NULL);
364 }
365
366 /** \defgroup msg_gos_functions MSG Operating System Functions
367  *  \brief This section describes the functions that can be used
368  *  by an agent for handling some task.
369  */
370
371 MSG_error_t
372 MSG_task_get_ext(m_task_t * task, m_channel_t channel, double timeout,
373                  m_host_t host)
374 {
375   xbt_assert1((channel >= 0)
376               && (channel < msg_global->max_channel), "Invalid channel %d",
377               channel);
378
379   return
380     MSG_mailbox_get_task_ext(MSG_mailbox_get_by_channel
381                              (MSG_host_self(), channel), task, host, timeout);
382 }
383
384 MSG_error_t
385 MSG_task_receive_from_host(m_task_t * task, const char *alias, m_host_t host)
386 {
387   return MSG_task_receive_ext(task, alias, -1, host);
388 }
389
390 MSG_error_t MSG_task_receive(m_task_t * task, const char *alias)
391 {
392   return MSG_task_receive_with_timeout(task, alias, -1);
393 }
394
395 MSG_error_t
396 MSG_task_receive_with_timeout(m_task_t * task, const char *alias,
397                               double timeout)
398 {
399   return MSG_task_receive_ext(task, alias, timeout, NULL);
400 }
401
402 MSG_error_t
403 MSG_task_receive_ext(m_task_t * task, const char *alias, double timeout,
404                      m_host_t host)
405 {
406   return MSG_mailbox_get_task_ext(MSG_mailbox_get_by_alias(alias), task, host,
407                                   timeout);
408 }
409
410 msg_comm_t MSG_task_isend(m_task_t task, const char *alias) {
411   simdata_task_t t_simdata = NULL;
412   m_process_t process = MSG_process_self();
413   msg_mailbox_t mailbox = MSG_mailbox_get_by_alias(alias);
414
415   CHECK_HOST();
416
417   /* FIXME: these functions are not tracable */
418
419   /* Prepare the task to send */
420   t_simdata = task->simdata;
421   t_simdata->sender = process;
422   t_simdata->source = MSG_host_self();
423
424   xbt_assert0(t_simdata->refcount == 1,
425               "This task is still being used somewhere else. You cannot send it now. Go fix your code!");
426
427   t_simdata->refcount++;
428   msg_global->sent_msg++;
429   process->simdata->waiting_task = task;
430
431   /* Send it by calling SIMIX network layer */
432
433   /* Kept for semantical compatibility with older implementation */
434   if(mailbox->cond)
435     SIMIX_cond_signal(mailbox->cond);
436
437   return SIMIX_network_isend(mailbox->rdv, t_simdata->message_size, t_simdata->rate,
438       task, sizeof(void*), &t_simdata->comm);
439 }
440
441 msg_comm_t MSG_task_irecv(m_task_t * task, const char *alias) {
442   smx_comm_t comm;
443   smx_rdv_t rdv = MSG_mailbox_get_by_alias(alias)->rdv;
444   msg_mailbox_t mailbox=MSG_mailbox_get_by_alias(alias);
445   size_t size = sizeof(void*);
446
447   CHECK_HOST();
448
449   /* FIXME: these functions are not tracable */
450
451   memset(&comm,0,sizeof(comm));
452
453   /* Kept for compatibility with older implementation */
454   xbt_assert1(!MSG_mailbox_get_cond(mailbox),
455               "A process is already blocked on this channel %s",
456               MSG_mailbox_get_alias(mailbox));
457
458   /* Sanity check */
459   xbt_assert0(task, "Null pointer for the task storage");
460
461   if (*task)
462     CRITICAL0("MSG_task_get() was asked to write in a non empty task struct.");
463
464   /* Try to receive it by calling SIMIX network layer */
465   return SIMIX_network_irecv(rdv, task, &size);
466 }
467 int MSG_comm_test(msg_comm_t comm) {
468   return SIMIX_network_test(comm);
469 }
470
471 MSG_error_t MSG_comm_wait(msg_comm_t comm, double timeout) {
472   xbt_ex_t e;
473   MSG_error_t res = MSG_OK;
474   TRY {
475     SIMIX_network_wait(comm,timeout);
476
477     if(!(comm->src_proc == SIMIX_process_self()))
478     {
479         m_task_t  task;
480         task = (m_task_t) SIMIX_communication_get_src_buf(comm);
481         task->simdata->refcount--;
482     }
483
484     /* FIXME: these functions are not tracable */
485   }  CATCH(e){
486       switch(e.category){
487         case host_error:
488           res = MSG_HOST_FAILURE;
489           break;
490         case network_error:
491           res = MSG_TRANSFER_FAILURE;
492           break;
493         case timeout_error:
494           res = MSG_TIMEOUT;
495           break;
496         default:
497           xbt_die(bprintf("Unhandled SIMIX network exception: %s",e.msg));
498       }
499       xbt_ex_free(e);
500     }
501   return res;
502 }
503
504 /** \ingroup msg_gos_functions
505  * \brief Put a task on a channel of an host and waits for the end of the
506  * transmission.
507  *
508  * This function is used for describing the behavior of an agent. It
509  * takes three parameter.
510  * \param task a #m_task_t to send on another location. This task
511    will not be usable anymore when the function will return. There is
512    no automatic task duplication and you have to save your parameters
513    before calling this function. Tasks are unique and once it has been
514    sent to another location, you should not access it anymore. You do
515    not need to call MSG_task_destroy() but to avoid using, as an
516    effect of inattention, this task anymore, you definitely should
517    renitialize it with #MSG_TASK_UNINITIALIZED. Note that this task
518    can be transfered iff it has been correctly created with
519    MSG_task_create().
520  * \param dest the destination of the message
521  * \param channel the channel on which the agent should put this
522    task. This value has to be >=0 and < than the maximal number of
523    channels fixed with MSG_set_channel_number().
524  * \return #MSG_FATAL if \a task is not properly initialized and
525  * #MSG_OK otherwise. Returns #MSG_HOST_FAILURE if the host on which
526  * this function was called was shut down. Returns
527  * #MSG_TRANSFER_FAILURE if the transfer could not be properly done
528  * (network failure, dest failure)
529  */
530 MSG_error_t MSG_task_put(m_task_t task, m_host_t dest, m_channel_t channel)
531 {
532   return MSG_task_put_with_timeout(task, dest, channel, -1.0);
533 }
534
535 /** \ingroup msg_gos_functions
536  * \brief Does exactly the same as MSG_task_put but with a bounded transmition
537  * rate.
538  *
539  * \sa MSG_task_put
540  */
541 MSG_error_t
542 MSG_task_put_bounded(m_task_t task, m_host_t dest, m_channel_t channel,
543                      double maxrate)
544 {
545   task->simdata->rate = maxrate;
546   return MSG_task_put(task, dest, channel);
547 }
548
549 /** \ingroup msg_gos_functions \brief Put a task on a channel of an
550  * host (with a timeout on the waiting of the destination host) and
551  * waits for the end of the transmission.
552  *
553  * This function is used for describing the behavior of an agent. It
554  * takes four parameter.
555  * \param task a #m_task_t to send on another location. This task
556    will not be usable anymore when the function will return. There is
557    no automatic task duplication and you have to save your parameters
558    before calling this function. Tasks are unique and once it has been
559    sent to another location, you should not access it anymore. You do
560    not need to call MSG_task_destroy() but to avoid using, as an
561    effect of inattention, this task anymore, you definitely should
562    renitialize it with #MSG_TASK_UNINITIALIZED. Note that this task
563    can be transfered iff it has been correctly created with
564    MSG_task_create().
565  * \param dest the destination of the message
566  * \param channel the channel on which the agent should put this
567    task. This value has to be >=0 and < than the maximal number of
568    channels fixed with MSG_set_channel_number().
569  * \param timeout the maximum time to wait for a task before giving
570     up. In such a case, #MSG_TRANSFER_FAILURE will be returned, \a task
571     will not be modified
572  * \return #MSG_FATAL if \a task is not properly initialized and
573    #MSG_OK otherwise. Returns #MSG_HOST_FAILURE if the host on which
574    this function was called was shut down. Returns
575    #MSG_TRANSFER_FAILURE if the transfer could not be properly done
576    (network failure, dest failure, timeout...)
577  */
578 MSG_error_t
579 MSG_task_put_with_timeout(m_task_t task, m_host_t dest, m_channel_t channel,
580                           double timeout)
581 {
582   xbt_assert1((channel >= 0)
583               && (channel < msg_global->max_channel), "Invalid channel %d",
584               channel);
585
586   return
587     MSG_mailbox_put_with_timeout(MSG_mailbox_get_by_channel(dest, channel),
588                                  task, timeout);
589 }
590
591 MSG_error_t MSG_task_send(m_task_t task, const char *alias)
592 {
593   return MSG_task_send_with_timeout(task, alias, -1);
594 }
595
596
597 MSG_error_t
598 MSG_task_send_bounded(m_task_t task, const char *alias, double maxrate)
599 {
600   task->simdata->rate = maxrate;
601   return MSG_task_send(task, alias);
602 }
603
604
605 MSG_error_t
606 MSG_task_send_with_timeout(m_task_t task, const char *alias, double timeout)
607 {
608   return MSG_mailbox_put_with_timeout(MSG_mailbox_get_by_alias(alias), task,
609                                       timeout);
610 }
611
612 int MSG_task_listen(const char *alias)
613 {
614   CHECK_HOST();
615
616   return !MSG_mailbox_is_empty(MSG_mailbox_get_by_alias(alias));
617 }
618
619 /** \ingroup msg_gos_functions
620  * \brief Test whether there is a pending communication on a channel.
621  *
622  * It takes one parameter.
623  * \param channel the channel on which the agent should be
624    listening. This value has to be >=0 and < than the maximal
625    number of channels fixed with MSG_set_channel_number().
626  * \return 1 if there is a pending communication and 0 otherwise
627  */
628 int MSG_task_Iprobe(m_channel_t channel)
629 {
630   xbt_assert1((channel >= 0)
631               && (channel < msg_global->max_channel), "Invalid channel %d",
632               channel);
633
634   CHECK_HOST();
635
636   return
637     !MSG_mailbox_is_empty(MSG_mailbox_get_by_channel
638                           (MSG_host_self(), channel));
639 }
640
641 /** \ingroup msg_gos_functions
642
643  * \brief Return the number of tasks waiting to be received on a \a
644    channel and sent by \a host.
645  *
646  * It takes two parameters.
647  * \param channel the channel on which the agent should be
648    listening. This value has to be >=0 and < than the maximal
649    number of channels fixed with MSG_set_channel_number().
650  * \param host the host that is to be watched.
651  * \return the number of tasks waiting to be received on \a channel
652    and sent by \a host.
653  */
654 int MSG_task_probe_from_host(int channel, m_host_t host)
655 {
656   xbt_assert1((channel >= 0)
657               && (channel < msg_global->max_channel), "Invalid channel %d",
658               channel);
659
660   CHECK_HOST();
661
662   return
663     MSG_mailbox_get_count_host_waiting_tasks(MSG_mailbox_get_by_channel
664                                              (MSG_host_self(), channel),
665                                              host);
666
667 }
668
669 int MSG_task_listen_from_host(const char *alias, m_host_t host)
670 {
671   CHECK_HOST();
672
673   return
674     MSG_mailbox_get_count_host_waiting_tasks(MSG_mailbox_get_by_alias(alias),
675                                              host);
676 }
677
678 /** \ingroup msg_gos_functions
679  * \brief Test whether there is a pending communication on a channel, and who sent it.
680  *
681  * It takes one parameter.
682  * \param channel the channel on which the agent should be
683    listening. This value has to be >=0 and < than the maximal
684    number of channels fixed with MSG_set_channel_number().
685  * \return -1 if there is no pending communication and the PID of the process who sent it otherwise
686  */
687 int MSG_task_probe_from(m_channel_t channel)
688 {
689   m_task_t task;
690
691   CHECK_HOST();
692
693   xbt_assert1((channel >= 0)
694               && (channel < msg_global->max_channel), "Invalid channel %d",
695               channel);
696
697   if (NULL ==
698       (task =
699        MSG_mailbox_get_head(MSG_mailbox_get_by_channel
700                             (MSG_host_self(), channel))))
701     return -1;
702
703   return MSG_process_get_PID(task->simdata->sender);
704 }
705
706 int MSG_task_listen_from(const char *alias)
707 {
708   m_task_t task;
709
710   CHECK_HOST();
711
712   if (NULL == (task = MSG_mailbox_get_head(MSG_mailbox_get_by_alias(alias))))
713     return -1;
714
715   return MSG_process_get_PID(task->simdata->sender);
716 }
717
718 /** \ingroup msg_gos_functions
719  * \brief Wait for at most \a max_duration second for a task reception
720    on \a channel.
721
722  * \a PID is updated with the PID of the first process that triggered this event if any.
723  *
724  * It takes three parameters:
725  * \param channel the channel on which the agent should be
726    listening. This value has to be >=0 and < than the maximal.
727    number of channels fixed with MSG_set_channel_number().
728  * \param PID a memory location for storing an int.
729  * \param timeout the maximum time to wait for a task before
730     giving up. In the case of a reception, *\a PID will be updated
731     with the PID of the first process to send a task.
732  * \return #MSG_HOST_FAILURE if the host is shut down in the meantime
733    and #MSG_OK otherwise.
734  */
735 MSG_error_t
736 MSG_channel_select_from(m_channel_t channel, double timeout, int *PID)
737 {
738   m_host_t h = NULL;
739   simdata_host_t h_simdata = NULL;
740   m_task_t t;
741   int first_time = 1;
742   smx_cond_t cond;
743   msg_mailbox_t mailbox;
744
745   xbt_assert1((channel >= 0)
746               && (channel < msg_global->max_channel), "Invalid channel %d",
747               channel);
748
749   if (PID) {
750     *PID = -1;
751   }
752
753   if (timeout == 0.0) {
754     *PID = MSG_task_probe_from(channel);
755     MSG_RETURN(MSG_OK);
756   } else {
757     CHECK_HOST();
758     h = MSG_host_self();
759     h_simdata = h->simdata;
760
761     mailbox = MSG_mailbox_get_by_channel(MSG_host_self(), channel);
762
763     while (MSG_mailbox_is_empty(mailbox)) {
764       if (timeout > 0) {
765         if (!first_time) {
766           MSG_RETURN(MSG_OK);
767         }
768       }
769
770       SIMIX_mutex_lock(h_simdata->mutex);
771
772       xbt_assert1(!MSG_mailbox_get_cond(mailbox),
773                   "A process is already blocked on this channel %d", channel);
774
775       cond = SIMIX_cond_init();
776
777       MSG_mailbox_set_cond(mailbox, cond);
778
779       if (timeout > 0) {
780         SIMIX_cond_wait_timeout(cond, h_simdata->mutex, timeout);
781       } else {
782         SIMIX_cond_wait(cond, h_simdata->mutex);
783       }
784
785       SIMIX_cond_destroy(cond);
786       SIMIX_mutex_unlock(h_simdata->mutex);
787
788       if (SIMIX_host_get_state(h_simdata->smx_host) == 0) {
789         MSG_RETURN(MSG_HOST_FAILURE);
790       }
791
792       MSG_mailbox_set_cond(mailbox, NULL);
793       first_time = 0;
794     }
795
796     if (NULL == (t = MSG_mailbox_get_head(mailbox)))
797       MSG_RETURN(MSG_OK);
798
799
800     if (PID) {
801       *PID = MSG_process_get_PID(t->simdata->sender);
802     }
803
804     MSG_RETURN(MSG_OK);
805   }
806 }
807
808
809 MSG_error_t MSG_alias_select_from(const char *alias, double timeout, int *PID)
810 {
811   m_host_t h = NULL;
812   simdata_host_t h_simdata = NULL;
813   m_task_t t;
814   int first_time = 1;
815   smx_cond_t cond;
816   msg_mailbox_t mailbox;
817
818   if (PID) {
819     *PID = -1;
820   }
821
822   if (timeout == 0.0) {
823     *PID = MSG_task_listen_from(alias);
824     MSG_RETURN(MSG_OK);
825   } else {
826     CHECK_HOST();
827     h = MSG_host_self();
828     h_simdata = h->simdata;
829
830     DEBUG2("Probing on alias %s (%s)", alias, h->name);
831
832     mailbox = MSG_mailbox_get_by_alias(alias);
833
834     while (MSG_mailbox_is_empty(mailbox)) {
835       if (timeout > 0) {
836         if (!first_time) {
837           MSG_RETURN(MSG_OK);
838         }
839       }
840
841       SIMIX_mutex_lock(h_simdata->mutex);
842
843       xbt_assert1(!MSG_mailbox_get_cond(mailbox),
844                   "A process is already blocked on this alias %s", alias);
845
846       cond = SIMIX_cond_init();
847
848       MSG_mailbox_set_cond(mailbox, cond);
849
850       if (timeout > 0) {
851         SIMIX_cond_wait_timeout(cond, h_simdata->mutex, timeout);
852       } else {
853         SIMIX_cond_wait(cond, h_simdata->mutex);
854       }
855
856       SIMIX_cond_destroy(cond);
857       SIMIX_mutex_unlock(h_simdata->mutex);
858
859       if (SIMIX_host_get_state(h_simdata->smx_host) == 0) {
860         MSG_RETURN(MSG_HOST_FAILURE);
861       }
862
863       MSG_mailbox_set_cond(mailbox, NULL);
864       first_time = 0;
865     }
866
867     if (NULL == (t = MSG_mailbox_get_head(mailbox)))
868       MSG_RETURN(MSG_OK);
869
870
871     if (PID) {
872       *PID = MSG_process_get_PID(t->simdata->sender);
873     }
874
875     MSG_RETURN(MSG_OK);
876   }
877 }