Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
Bug fix in MSG raised by Matthieu and Henri!
[simgrid.git] / src / msg / gos.c
1 /*     $Id$      */
2
3 /* Copyright (c) 2002-2007 Arnaud Legrand.                                  */
4 /* Copyright (c) 2007 Bruno Donassolo.                                      */
5 /* All rights reserved.                                                     */
6
7 /* This program is free software; you can redistribute it and/or modify it
8  * under the terms of the license (GNU LGPL) which comes with this package. */
9
10 #include "msg/private.h"
11 #include "xbt/sysdep.h"
12 #include "xbt/log.h"
13 #include "mailbox.h"
14
15
16 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(msg_gos, msg,
17                                 "Logging specific to MSG (gos)");
18
19 /** \ingroup msg_gos_functions
20  *
21  * \brief Return the last value returned by a MSG function (except
22  * MSG_get_errno...).
23  */
24 MSG_error_t
25 MSG_get_errno(void)
26 {
27         return PROCESS_GET_ERRNO();
28 }
29
30 /** \ingroup msg_gos_functions
31  * \brief Executes a task and waits for its termination.
32  *
33  * This function is used for describing the behavior of an agent. It
34  * takes only one parameter.
35  * \param task a #m_task_t to execute on the location on which the
36    agent is running.
37  * \return #MSG_FATAL if \a task is not properly initialized and
38  * #MSG_OK otherwise.
39  */
40 MSG_error_t
41 MSG_task_execute(m_task_t task)
42 {
43         simdata_task_t simdata = NULL;
44         m_process_t self = MSG_process_self();
45         e_surf_action_state_t state = SURF_ACTION_NOT_IN_THE_SYSTEM;
46         CHECK_HOST();
47
48         simdata = task->simdata;
49         xbt_assert0((!simdata->compute) && (task->simdata->refcount  == 1),"This task is executed somewhere else. Go fix your code!");
50
51         DEBUG1("Computing on %s", MSG_process_self()->simdata->m_host->name);
52
53         simdata->refcount ++;
54         SIMIX_mutex_lock(simdata->mutex);
55         simdata->compute = SIMIX_action_execute(SIMIX_host_self(), task->name, simdata->computation_amount);
56         SIMIX_action_set_priority(simdata->compute, simdata->priority);
57
58         self->simdata->waiting_task = task;
59         SIMIX_register_action_to_condition(simdata->compute, simdata->cond);
60         do {
61           SIMIX_cond_wait(simdata->cond, simdata->mutex);
62           state = SIMIX_action_get_state(simdata->compute);
63         } while(state==SURF_ACTION_READY || state==SURF_ACTION_RUNNING);
64         SIMIX_unregister_action_to_condition(simdata->compute, simdata->cond);
65         self->simdata->waiting_task = NULL;
66
67         SIMIX_mutex_unlock(simdata->mutex);
68         simdata->refcount --;
69
70         if (SIMIX_action_get_state(task->simdata->compute) == SURF_ACTION_DONE)
71         {
72                 /* action ended, set comm and compute = NULL, the actions is already destroyed in the main function */
73                 SIMIX_action_destroy(task->simdata->compute);
74                 simdata->computation_amount = 0.0;
75                 simdata->comm = NULL;
76                 simdata->compute = NULL;
77                 MSG_RETURN(MSG_OK);
78         }
79         else if(SIMIX_host_get_state(SIMIX_host_self()) == 0)
80         {
81                 /* action ended, set comm and compute = NULL, the actions is already destroyed in the main function */
82                 SIMIX_action_destroy(task->simdata->compute);
83                 simdata->comm = NULL;
84                 simdata->compute = NULL;
85                 MSG_RETURN(MSG_HOST_FAILURE);
86         }
87         else
88         {
89                 /* action ended, set comm and compute = NULL, the actions is already destroyed in the main function */
90                 SIMIX_action_destroy(task->simdata->compute);
91                 simdata->comm = NULL;
92                 simdata->compute = NULL;
93                 MSG_RETURN(MSG_TASK_CANCELLED);
94         }
95 }
96
97 /** \ingroup m_task_management
98  * \brief Creates a new #m_task_t (a parallel one....).
99  *
100  * A constructor for #m_task_t taking six arguments and returning the
101    corresponding object.
102  * \param name a name for the object. It is for user-level information
103    and can be NULL.
104  * \param host_nb the number of hosts implied in the parallel task.
105  * \param host_list an array of \p host_nb m_host_t.
106  * \param computation_amount an array of \p host_nb
107    doubles. computation_amount[i] is the total number of operations
108    that have to be performed on host_list[i].
109  * \param communication_amount an array of \p host_nb* \p host_nb doubles.
110  * \param data a pointer to any data may want to attach to the new
111    object.  It is for user-level information and can be NULL. It can
112    be retrieved with the function \ref MSG_task_get_data.
113  * \see m_task_t
114  * \return The new corresponding object.
115  */
116 m_task_t
117 MSG_parallel_task_create(const char *name,int host_nb, const m_host_t * host_list, double *computation_amount, double *communication_amount, void *data)
118 {
119         int i;
120         simdata_task_t simdata = xbt_new0(s_simdata_task_t, 1);
121         m_task_t task = xbt_new0(s_m_task_t, 1);
122         task->simdata = simdata;
123
124         /* Task structure */
125         task->name = xbt_strdup(name);
126         task->data = data;
127
128         /* Simulator Data */
129         simdata->computation_amount = 0;
130         simdata->message_size = 0;
131         simdata->cond = SIMIX_cond_init();
132         simdata->mutex = SIMIX_mutex_init();
133         simdata->compute = NULL;
134         simdata->comm = NULL;
135         simdata->rate = -1.0;
136         simdata->refcount  = 1;
137         simdata->sender = NULL;
138         simdata->receiver = NULL;
139         simdata->source = NULL;
140
141         simdata->host_nb = host_nb;
142         simdata->host_list = xbt_new0(smx_host_t, host_nb);
143         simdata->comp_amount = computation_amount;
144         simdata->comm_amount = communication_amount;
145
146         for (i = 0; i < host_nb; i++)
147                 simdata->host_list[i] = host_list[i]->simdata->smx_host;
148
149         return task;
150 }
151
152 MSG_error_t
153 MSG_parallel_task_execute(m_task_t task)
154 {
155         simdata_task_t simdata = NULL;
156         m_process_t self = MSG_process_self();
157         e_surf_action_state_t state = SURF_ACTION_NOT_IN_THE_SYSTEM;
158         CHECK_HOST();
159
160         simdata = task->simdata;
161
162         xbt_assert0((!simdata->compute) && (task->simdata->refcount  == 1),"This task is executed somewhere else. Go fix your code!");
163
164         xbt_assert0(simdata->host_nb,"This is not a parallel task. Go to hell.");
165
166         DEBUG1("Computing on %s", MSG_process_self()->simdata->m_host->name);
167
168         simdata->refcount ++;
169
170         SIMIX_mutex_lock(simdata->mutex);
171         simdata->compute =
172         SIMIX_action_parallel_execute(task->name, simdata->host_nb, simdata->host_list, simdata->comp_amount, simdata->comm_amount, 1.0, -1.0);
173
174         self->simdata->waiting_task = task;
175         SIMIX_register_action_to_condition(simdata->compute, simdata->cond);
176         do {
177           SIMIX_cond_wait(simdata->cond, simdata->mutex);
178           state = SIMIX_action_get_state(task->simdata->compute);
179         } while(state==SURF_ACTION_READY || state==SURF_ACTION_RUNNING);
180
181         SIMIX_unregister_action_to_condition(simdata->compute, simdata->cond);
182         self->simdata->waiting_task = NULL;
183
184
185         SIMIX_mutex_unlock(simdata->mutex);
186         simdata->refcount --;
187
188         if (SIMIX_action_get_state(task->simdata->compute) == SURF_ACTION_DONE)
189         {
190                 /* action ended, set comm and compute = NULL, the actions is already destroyed in the main function */
191                 SIMIX_action_destroy(task->simdata->compute);
192                 simdata->computation_amount = 0.0;
193                 simdata->comm = NULL;
194                 simdata->compute = NULL;
195                 MSG_RETURN(MSG_OK);
196         }
197         else if(SIMIX_host_get_state(SIMIX_host_self()) == 0)
198         {
199                 /* action ended, set comm and compute = NULL, the actions is already destroyed in the main function */
200                 SIMIX_action_destroy(task->simdata->compute);
201                 simdata->comm = NULL;
202                 simdata->compute = NULL;
203                 MSG_RETURN(MSG_HOST_FAILURE);
204         }
205         else
206         {
207                 /* action ended, set comm and compute = NULL, the actions is already destroyed in the main function */
208                 SIMIX_action_destroy(task->simdata->compute);
209                 simdata->comm = NULL;
210                 simdata->compute = NULL;
211                 MSG_RETURN(MSG_TASK_CANCELLED);
212         }
213
214 }
215
216
217 /** \ingroup msg_gos_functions
218  * \brief Sleep for the specified number of seconds
219  *
220  * Makes the current process sleep until \a time seconds have elapsed.
221  *
222  * \param nb_sec a number of second
223  */
224 MSG_error_t
225 MSG_process_sleep(double nb_sec)
226 {
227         smx_action_t act_sleep;
228         m_process_t proc = MSG_process_self();
229         e_surf_action_state_t state = SURF_ACTION_NOT_IN_THE_SYSTEM;
230         smx_mutex_t mutex;
231         smx_cond_t cond;
232
233         /* create action to sleep */
234         act_sleep = SIMIX_action_sleep(SIMIX_process_get_host(proc->simdata->s_process), nb_sec);
235
236         mutex = SIMIX_mutex_init();
237         SIMIX_mutex_lock(mutex);
238
239         /* create conditional and register action to it */
240         cond = SIMIX_cond_init();
241
242         SIMIX_register_action_to_condition(act_sleep, cond);
243         do {
244           SIMIX_cond_wait(cond, mutex);
245           state = SIMIX_action_get_state(act_sleep);
246         } while(state==SURF_ACTION_READY || state==SURF_ACTION_RUNNING);
247         SIMIX_unregister_action_to_condition(act_sleep, cond);
248         SIMIX_mutex_unlock(mutex);
249
250         /* remove variables */
251         SIMIX_cond_destroy(cond);
252         SIMIX_mutex_destroy(mutex);
253
254         if (SIMIX_action_get_state(act_sleep) == SURF_ACTION_DONE)
255         {
256                 if (SIMIX_host_get_state(SIMIX_host_self()) == SURF_CPU_OFF)
257                 {
258                         SIMIX_action_destroy(act_sleep);
259                         MSG_RETURN(MSG_HOST_FAILURE);
260                 }
261         }
262         else
263         {
264                 SIMIX_action_destroy(act_sleep);
265                 MSG_RETURN(MSG_HOST_FAILURE);
266         }
267
268         SIMIX_action_destroy(act_sleep);
269         MSG_RETURN(MSG_OK);
270 }
271
272 /** \ingroup msg_gos_functions
273  * \brief Return the number of MSG tasks currently running on
274  * the host of the current running process.
275  */
276 static int
277 MSG_get_msgload(void)
278 {
279         xbt_die("not implemented yet");
280         return 0;
281 }
282
283
284
285 /** \ingroup msg_gos_functions
286  * \brief Listen on \a channel and waits for receiving a task from \a host.
287  *
288  * It takes three parameters.
289  * \param task a memory location for storing a #m_task_t. It will
290    hold a task when this function will return. Thus \a task should not
291    be equal to \c NULL and \a *task should be equal to \c NULL. If one of
292    those two condition does not hold, there will be a warning message.
293  * \param channel the channel on which the agent should be
294    listening. This value has to be >=0 and < than the maximal
295    number of channels fixed with MSG_set_channel_number().
296  * \param host the host that is to be watched.
297  * \return #MSG_FATAL if \a task is equal to \c NULL, #MSG_WARNING
298    if \a *task is not equal to \c NULL, and #MSG_OK otherwise.
299  */
300 MSG_error_t
301 MSG_task_get_from_host(m_task_t * task, m_channel_t channel, m_host_t host)
302 {
303         return MSG_task_get_ext(task, channel, -1, host);
304 }
305
306 /** \ingroup msg_gos_functions
307  * \brief Listen on a channel and wait for receiving a task.
308  *
309  * It takes two parameters.
310  * \param task a memory location for storing a #m_task_t. It will
311    hold a task when this function will return. Thus \a task should not
312    be equal to \c NULL and \a *task should be equal to \c NULL. If one of
313    those two condition does not hold, there will be a warning message.
314  * \param channel the channel on which the agent should be
315    listening. This value has to be >=0 and < than the maximal
316    number of channels fixed with MSG_set_channel_number().
317  * \return #MSG_FATAL if \a task is equal to \c NULL, #MSG_WARNING
318  * if \a *task is not equal to \c NULL, and #MSG_OK otherwise.
319  */
320 MSG_error_t
321 MSG_task_get(m_task_t * task, m_channel_t channel)
322 {
323         return MSG_task_get_with_timeout(task, channel, -1);
324 }
325
326 /** \ingroup msg_gos_functions
327  * \brief Listen on a channel and wait for receiving a task with a timeout.
328  *
329  * It takes three parameters.
330  * \param task a memory location for storing a #m_task_t. It will
331    hold a task when this function will return. Thus \a task should not
332    be equal to \c NULL and \a *task should be equal to \c NULL. If one of
333    those two condition does not hold, there will be a warning message.
334  * \param channel the channel on which the agent should be
335    listening. This value has to be >=0 and < than the maximal
336    number of channels fixed with MSG_set_channel_number().
337  * \param max_duration the maximum time to wait for a task before giving
338     up. In such a case, #MSG_TRANSFER_FAILURE will be returned, \a task
339     will not be modified and will still be
340     equal to \c NULL when returning.
341  * \return #MSG_FATAL if \a task is equal to \c NULL, #MSG_WARNING
342    if \a *task is not equal to \c NULL, and #MSG_OK otherwise.
343  */
344 MSG_error_t
345 MSG_task_get_with_timeout(m_task_t * task, m_channel_t channel, double max_duration)
346 {
347   return MSG_task_get_ext(task, channel, max_duration, NULL);
348 }
349
350 /** \defgroup msg_gos_functions MSG Operating System Functions
351  *  \brief This section describes the functions that can be used
352  *  by an agent for handling some task.
353  */
354
355 MSG_error_t
356 MSG_task_get_ext(m_task_t * task, m_channel_t channel, double timeout,m_host_t host)
357 {
358         xbt_assert1((channel >= 0) && (channel < msg_global->max_channel), "Invalid channel %d",channel);
359
360         return MSG_mailbox_get_task_ext(MSG_mailbox_get_by_channel(MSG_host_self(), channel), task, host, timeout);
361 }
362
363 MSG_error_t
364 MSG_task_receive_from_host(m_task_t * task, const char* alias, m_host_t host)
365 {
366         return MSG_task_receive_ext(task, alias, -1, host);
367 }
368
369 MSG_error_t
370 MSG_task_receive(m_task_t * task, const char* alias)
371 {
372         return MSG_task_receive_with_timeout(task, alias, -1);
373 }
374
375 MSG_error_t
376 MSG_task_receive_with_timeout(m_task_t * task, const char* alias, double timeout)
377 {
378         return MSG_task_receive_ext(task, alias, timeout, NULL);
379 }
380
381 MSG_error_t
382 MSG_task_receive_ext(m_task_t* task, const char* alias, double timeout, m_host_t host)
383 {
384         return MSG_mailbox_get_task_ext(MSG_mailbox_get_by_alias(alias), task, host, timeout);
385 }
386
387
388 /** \ingroup msg_gos_functions
389  * \brief Put a task on a channel of an host and waits for the end of the
390  * transmission.
391  *
392  * This function is used for describing the behavior of an agent. It
393  * takes three parameter.
394  * \param task a #m_task_t to send on another location. This task
395    will not be usable anymore when the function will return. There is
396    no automatic task duplication and you have to save your parameters
397    before calling this function. Tasks are unique and once it has been
398    sent to another location, you should not access it anymore. You do
399    not need to call MSG_task_destroy() but to avoid using, as an
400    effect of inattention, this task anymore, you definitely should
401    renitialize it with #MSG_TASK_UNINITIALIZED. Note that this task
402    can be transfered iff it has been correctly created with
403    MSG_task_create().
404  * \param dest the destination of the message
405  * \param channel the channel on which the agent should put this
406    task. This value has to be >=0 and < than the maximal number of
407    channels fixed with MSG_set_channel_number().
408  * \return #MSG_FATAL if \a task is not properly initialized and
409  * #MSG_OK otherwise. Returns #MSG_HOST_FAILURE if the host on which
410  * this function was called was shut down. Returns
411  * #MSG_TRANSFER_FAILURE if the transfer could not be properly done
412  * (network failure, dest failure)
413  */
414 MSG_error_t
415 MSG_task_put(m_task_t task, m_host_t dest, m_channel_t channel)
416 {
417         return MSG_task_put_with_timeout(task, dest, channel, -1.0);
418 }
419
420 /** \ingroup msg_gos_functions
421  * \brief Does exactly the same as MSG_task_put but with a bounded transmition
422  * rate.
423  *
424  * \sa MSG_task_put
425  */
426 MSG_error_t
427 MSG_task_put_bounded(m_task_t task, m_host_t dest, m_channel_t channel, double maxrate)
428 {
429         task->simdata->rate = maxrate;
430         return MSG_task_put(task, dest, channel);
431 }
432
433 /** \ingroup msg_gos_functions \brief Put a task on a channel of an
434  * host (with a timeout on the waiting of the destination host) and
435  * waits for the end of the transmission.
436  *
437  * This function is used for describing the behavior of an agent. It
438  * takes four parameter.
439  * \param task a #m_task_t to send on another location. This task
440    will not be usable anymore when the function will return. There is
441    no automatic task duplication and you have to save your parameters
442    before calling this function. Tasks are unique and once it has been
443    sent to another location, you should not access it anymore. You do
444    not need to call MSG_task_destroy() but to avoid using, as an
445    effect of inattention, this task anymore, you definitely should
446    renitialize it with #MSG_TASK_UNINITIALIZED. Note that this task
447    can be transfered iff it has been correctly created with
448    MSG_task_create().
449  * \param dest the destination of the message
450  * \param channel the channel on which the agent should put this
451    task. This value has to be >=0 and < than the maximal number of
452    channels fixed with MSG_set_channel_number().
453  * \param timeout the maximum time to wait for a task before giving
454     up. In such a case, #MSG_TRANSFER_FAILURE will be returned, \a task
455     will not be modified
456  * \return #MSG_FATAL if \a task is not properly initialized and
457    #MSG_OK otherwise. Returns #MSG_HOST_FAILURE if the host on which
458    this function was called was shut down. Returns
459    #MSG_TRANSFER_FAILURE if the transfer could not be properly done
460    (network failure, dest failure, timeout...)
461  */
462 MSG_error_t
463 MSG_task_put_with_timeout(m_task_t task, m_host_t dest, m_channel_t channel, double timeout)
464 {
465         xbt_assert1((channel >= 0) && (channel < msg_global->max_channel), "Invalid channel %d", channel);
466
467         return MSG_mailbox_put_with_timeout(MSG_mailbox_get_by_channel(dest, channel), task, timeout);
468 }
469
470 MSG_error_t
471 MSG_task_send(m_task_t task,const char* alias)
472 {
473         return MSG_task_send_with_timeout(task, alias, -1);
474 }
475
476
477 MSG_error_t
478 MSG_task_send_bounded(m_task_t task, const char* alias, double maxrate)
479 {
480         task->simdata->rate = maxrate;
481         return MSG_task_send(task, alias);
482 }
483
484
485 MSG_error_t
486 MSG_task_send_with_timeout(m_task_t task, const char* alias, double timeout)
487 {
488         return MSG_mailbox_put_with_timeout(MSG_mailbox_get_by_alias(alias), task, timeout);
489 }
490
491 int
492 MSG_task_listen(const char* alias)
493 {
494         CHECK_HOST();
495
496         return !MSG_mailbox_is_empty(MSG_mailbox_get_by_alias(alias));
497 }
498
499 /** \ingroup msg_gos_functions
500  * \brief Test whether there is a pending communication on a channel.
501  *
502  * It takes one parameter.
503  * \param channel the channel on which the agent should be
504    listening. This value has to be >=0 and < than the maximal
505    number of channels fixed with MSG_set_channel_number().
506  * \return 1 if there is a pending communication and 0 otherwise
507  */
508 int
509 MSG_task_Iprobe(m_channel_t channel)
510 {
511   xbt_assert1((channel >= 0) && (channel < msg_global->max_channel), "Invalid channel %d", channel);
512
513   CHECK_HOST();
514
515   return !MSG_mailbox_is_empty(MSG_mailbox_get_by_channel(MSG_host_self(), channel));
516 }
517
518 /** \ingroup msg_gos_functions
519
520  * \brief Return the number of tasks waiting to be received on a \a
521    channel and sent by \a host.
522  *
523  * It takes two parameters.
524  * \param channel the channel on which the agent should be
525    listening. This value has to be >=0 and < than the maximal
526    number of channels fixed with MSG_set_channel_number().
527  * \param host the host that is to be watched.
528  * \return the number of tasks waiting to be received on \a channel
529    and sent by \a host.
530  */
531 int
532 MSG_task_probe_from_host(int channel, m_host_t host)
533 {
534         xbt_assert1((channel >= 0) && (channel < msg_global->max_channel), "Invalid channel %d", channel);
535
536         CHECK_HOST();
537
538         return MSG_mailbox_get_count_host_waiting_tasks(MSG_mailbox_get_by_channel(MSG_host_self(), channel),host);
539
540 }
541
542 int
543 MSG_task_listen_from_host(const char* alias, m_host_t host)
544 {
545         CHECK_HOST();
546
547         return MSG_mailbox_get_count_host_waiting_tasks(MSG_mailbox_get_by_alias(alias),host);
548 }
549
550 /** \ingroup msg_gos_functions
551  * \brief Test whether there is a pending communication on a channel, and who sent it.
552  *
553  * It takes one parameter.
554  * \param channel the channel on which the agent should be
555    listening. This value has to be >=0 and < than the maximal
556    number of channels fixed with MSG_set_channel_number().
557  * \return -1 if there is no pending communication and the PID of the process who sent it otherwise
558  */
559 int
560 MSG_task_probe_from(m_channel_t channel)
561 {
562         m_task_t task;
563
564         CHECK_HOST();
565
566         xbt_assert1((channel >= 0) && (channel < msg_global->max_channel), "Invalid channel %d", channel);
567
568         if(NULL == (task = MSG_mailbox_get_head(MSG_mailbox_get_by_channel(MSG_host_self(), channel))))
569                 return -1;
570
571         return MSG_process_get_PID(task->simdata->sender);
572 }
573
574 int
575 MSG_task_listen_from(const char* alias)
576 {
577         m_task_t task;
578
579         CHECK_HOST();
580
581         if(NULL == (task = MSG_mailbox_get_head(MSG_mailbox_get_by_alias(alias))))
582                 return -1;
583
584         return MSG_process_get_PID(task->simdata->sender);
585 }
586
587 /** \ingroup msg_gos_functions
588  * \brief Wait for at most \a max_duration second for a task reception
589    on \a channel.
590
591  * \a PID is updated with the PID of the first process that triggered this event if any.
592  *
593  * It takes three parameters:
594  * \param channel the channel on which the agent should be
595    listening. This value has to be >=0 and < than the maximal.
596    number of channels fixed with MSG_set_channel_number().
597  * \param PID a memory location for storing an int.
598  * \param timeout the maximum time to wait for a task before
599     giving up. In the case of a reception, *\a PID will be updated
600     with the PID of the first process to send a task.
601  * \return #MSG_HOST_FAILURE if the host is shut down in the meantime
602    and #MSG_OK otherwise.
603  */
604 MSG_error_t
605 MSG_channel_select_from(m_channel_t channel,double timeout, int *PID)
606 {
607         m_host_t h = NULL;
608         simdata_host_t h_simdata = NULL;
609         m_task_t t;
610         int first_time = 1;
611         smx_cond_t cond;
612         msg_mailbox_t mailbox;
613
614         xbt_assert1((channel >= 0) && (channel < msg_global->max_channel), "Invalid channel %d",channel);
615
616         if(PID)
617         {
618                 *PID = -1;
619         }
620
621         if (timeout == 0.0)
622         {
623                 *PID = MSG_task_probe_from(channel);
624                 MSG_RETURN(MSG_OK);
625         }
626         else
627         {
628                 CHECK_HOST();
629                 h = MSG_host_self();
630                 h_simdata = h->simdata;
631
632                 mailbox = MSG_mailbox_get_by_channel(MSG_host_self(), channel);
633
634                 while(MSG_mailbox_is_empty(mailbox))
635                 {
636                         if(timeout > 0)
637                         {
638                                 if (!first_time)
639                                 {
640                                         MSG_RETURN(MSG_OK);
641                                 }
642                         }
643
644                         SIMIX_mutex_lock(h_simdata->mutex);
645
646                         xbt_assert1(!MSG_mailbox_get_cond(mailbox),"A process is already blocked on this channel %d",channel);
647
648                         cond = SIMIX_cond_init();
649
650                         MSG_mailbox_set_cond(mailbox, cond);
651
652                         if (timeout > 0)
653                         {
654                                 SIMIX_cond_wait_timeout(cond, h_simdata->mutex, timeout);
655                         }
656                         else
657                         {
658                                 SIMIX_cond_wait(cond, h_simdata->mutex);
659                         }
660
661                         SIMIX_cond_destroy(cond);
662                         SIMIX_mutex_unlock(h_simdata->mutex);
663
664                         if (SIMIX_host_get_state(h_simdata->smx_host) == 0)
665                         {
666                                 MSG_RETURN(MSG_HOST_FAILURE);
667                         }
668
669                         MSG_mailbox_set_cond(mailbox,NULL);
670                         first_time = 0;
671                 }
672
673                 if(NULL == (t = MSG_mailbox_get_head(mailbox)))
674                         MSG_RETURN(MSG_OK);
675
676
677                 if (PID)
678                 {
679                         *PID = MSG_process_get_PID(t->simdata->sender);
680                 }
681
682                 MSG_RETURN(MSG_OK);
683         }
684 }
685
686
687 MSG_error_t
688 MSG_alias_select_from(const char* alias, double timeout, int* PID)
689 {
690         m_host_t h = NULL;
691         simdata_host_t h_simdata = NULL;
692         m_task_t t;
693         int first_time = 1;
694         smx_cond_t cond;
695         msg_mailbox_t mailbox;
696
697         if (PID)
698         {
699                 *PID = -1;
700         }
701
702         if(timeout == 0.0)
703         {
704                 *PID = MSG_task_listen_from(alias);
705                 MSG_RETURN(MSG_OK);
706         }
707         else
708         {
709                 CHECK_HOST();
710                 h = MSG_host_self();
711                 h_simdata = h->simdata;
712
713                 DEBUG2("Probing on alias %s (%s)", alias, h->name);
714
715                 mailbox = MSG_mailbox_get_by_alias(alias);
716
717                 while(MSG_mailbox_is_empty(mailbox))
718                 {
719                         if(timeout > 0)
720                         {
721                                 if (!first_time)
722                                 {
723                                         MSG_RETURN(MSG_OK);
724                                 }
725                         }
726
727                         SIMIX_mutex_lock(h_simdata->mutex);
728
729                         xbt_assert1(!MSG_mailbox_get_cond(mailbox),"A process is already blocked on this alias %s",alias);
730
731                         cond = SIMIX_cond_init();
732
733                         MSG_mailbox_set_cond(mailbox, cond);
734
735                         if (timeout > 0)
736                         {
737                                 SIMIX_cond_wait_timeout(cond, h_simdata->mutex, timeout);
738                         }
739                         else
740                         {
741                                 SIMIX_cond_wait(cond, h_simdata->mutex);
742                         }
743
744                         SIMIX_cond_destroy(cond);
745                         SIMIX_mutex_unlock(h_simdata->mutex);
746
747                         if (SIMIX_host_get_state(h_simdata->smx_host) == 0)
748                         {
749                                 MSG_RETURN(MSG_HOST_FAILURE);
750                         }
751
752                         MSG_mailbox_set_cond(mailbox,NULL);
753                         first_time = 0;
754                 }
755
756                 if(NULL == (t = MSG_mailbox_get_head(mailbox)))
757                         MSG_RETURN(MSG_OK);
758
759
760                 if (PID)
761                 {
762                         *PID = MSG_process_get_PID(t->simdata->sender);
763                 }
764
765                 MSG_RETURN(MSG_OK);
766         }
767 }
768
769
770
771
772