Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
Remove remaining references to static version of libraries.
[simgrid.git] / src / msg / gos.c
1 /* Copyright (c) 2004, 2005, 2006, 2007, 2008, 2009, 2010. The SimGrid Team.
2  * All rights reserved.                                                     */
3
4 /* This program is free software; you can redistribute it and/or modify it
5  * under the terms of the license (GNU LGPL) which comes with this package. */
6
7 #include "msg/private.h"
8 #include "simix/private.h"
9 #include "xbt/sysdep.h"
10 #include "mc/mc.h"
11 #include "xbt/log.h"
12 #include "mailbox.h"
13
14
15 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(msg_gos, msg,
16                                 "Logging specific to MSG (gos)");
17
18 /** \ingroup msg_gos_functions
19  *
20  * \brief Return the last value returned by a MSG function (except
21  * MSG_get_errno...).
22  */
23 MSG_error_t MSG_get_errno(void)
24 {
25   return PROCESS_GET_ERRNO();
26 }
27
28 /** \ingroup msg_gos_functions
29  * \brief Executes a task and waits for its termination.
30  *
31  * This function is used for describing the behavior of an agent. It
32  * takes only one parameter.
33  * \param task a #m_task_t to execute on the location on which the
34  agent is running.
35  * \return #MSG_FATAL if \a task is not properly initialized and
36  * #MSG_OK otherwise.
37  */
38 MSG_error_t MSG_task_execute(m_task_t task)
39 {
40   simdata_task_t simdata = NULL;
41   m_process_t self = MSG_process_self();
42   e_surf_action_state_t state = SURF_ACTION_NOT_IN_THE_SYSTEM;
43   CHECK_HOST();
44
45   simdata = task->simdata;
46
47   xbt_assert0(simdata->host_nb == 0,
48               "This is a parallel task. Go to hell.");
49
50 #ifdef HAVE_TRACING
51   TRACE_msg_task_execute_start(task);
52 #endif
53
54   xbt_assert1((!simdata->compute) && (task->simdata->refcount == 1),
55               "This task is executed somewhere else. Go fix your code! %d",
56               task->simdata->refcount);
57
58   DEBUG1("Computing on %s", MSG_process_self()->simdata->m_host->name);
59
60   if (simdata->computation_amount == 0) {
61 #ifdef HAVE_TRACING
62     TRACE_msg_task_execute_end(task);
63 #endif
64     return MSG_OK;
65   }
66   simdata->refcount++;
67   SIMIX_mutex_lock(simdata->mutex);
68   simdata->compute =
69       SIMIX_action_execute(SIMIX_host_self(), task->name,
70                            simdata->computation_amount);
71   SIMIX_action_set_priority(simdata->compute, simdata->priority);
72
73   /* changed to waiting action since we are always waiting one action (execute, communicate or sleep) */
74   self->simdata->waiting_action = simdata->compute;
75   SIMIX_register_action_to_condition(simdata->compute, simdata->cond);
76   do {
77     SIMIX_cond_wait(simdata->cond, simdata->mutex);
78     state = SIMIX_action_get_state(simdata->compute);
79   } while (state == SURF_ACTION_READY || state == SURF_ACTION_RUNNING);
80   SIMIX_unregister_action_to_condition(simdata->compute, simdata->cond);
81   self->simdata->waiting_action = NULL;
82
83   SIMIX_mutex_unlock(simdata->mutex);
84   simdata->refcount--;
85
86   if (SIMIX_action_get_state(task->simdata->compute) == SURF_ACTION_DONE) {
87     /* action ended, set comm and compute = NULL, the actions is already destroyed in the main function */
88     SIMIX_action_destroy(task->simdata->compute);
89     simdata->computation_amount = 0.0;
90     simdata->comm = NULL;
91     simdata->compute = NULL;
92 #ifdef HAVE_TRACING
93     TRACE_msg_task_execute_end(task);
94 #endif
95     MSG_RETURN(MSG_OK);
96   } else if (SIMIX_host_get_state(SIMIX_host_self()) == 0) {
97     /* action ended, set comm and compute = NULL, the actions is already destroyed in the main function */
98     SIMIX_action_destroy(task->simdata->compute);
99     simdata->comm = NULL;
100     simdata->compute = NULL;
101 #ifdef HAVE_TRACING
102     TRACE_msg_task_execute_end(task);
103 #endif
104     MSG_RETURN(MSG_HOST_FAILURE);
105   } else {
106     /* action ended, set comm and compute = NULL, the actions is already destroyed in the main function */
107     SIMIX_action_destroy(task->simdata->compute);
108     simdata->comm = NULL;
109     simdata->compute = NULL;
110 #ifdef HAVE_TRACING
111     TRACE_msg_task_execute_end(task);
112 #endif
113     MSG_RETURN(MSG_TASK_CANCELLED);
114   }
115 }
116
117 /** \ingroup m_task_management
118  * \brief Creates a new #m_task_t (a parallel one....).
119  *
120  * A constructor for #m_task_t taking six arguments and returning the
121  corresponding object.
122  * \param name a name for the object. It is for user-level information
123  and can be NULL.
124  * \param host_nb the number of hosts implied in the parallel task.
125  * \param host_list an array of \p host_nb m_host_t.
126  * \param computation_amount an array of \p host_nb
127  doubles. computation_amount[i] is the total number of operations
128  that have to be performed on host_list[i].
129  * \param communication_amount an array of \p host_nb* \p host_nb doubles.
130  * \param data a pointer to any data may want to attach to the new
131  object.  It is for user-level information and can be NULL. It can
132  be retrieved with the function \ref MSG_task_get_data.
133  * \see m_task_t
134  * \return The new corresponding object.
135  */
136 m_task_t
137 MSG_parallel_task_create(const char *name, int host_nb,
138                          const m_host_t * host_list,
139                          double *computation_amount,
140                          double *communication_amount, void *data)
141 {
142   int i;
143   simdata_task_t simdata = xbt_new0(s_simdata_task_t, 1);
144   m_task_t task = xbt_new0(s_m_task_t, 1);
145   task->simdata = simdata;
146
147   /* Task structure */
148   task->name = xbt_strdup(name);
149   task->data = data;
150
151   /* Simulator Data */
152   simdata->computation_amount = 0;
153   simdata->message_size = 0;
154   simdata->cond = SIMIX_cond_init();
155   simdata->mutex = SIMIX_mutex_init();
156   simdata->compute = NULL;
157   simdata->comm = NULL;
158   simdata->rate = -1.0;
159   simdata->refcount = 1;
160   simdata->sender = NULL;
161   simdata->receiver = NULL;
162   simdata->source = NULL;
163
164   simdata->host_nb = host_nb;
165   simdata->host_list = xbt_new0(smx_host_t, host_nb);
166   simdata->comp_amount = computation_amount;
167   simdata->comm_amount = communication_amount;
168
169   for (i = 0; i < host_nb; i++)
170     simdata->host_list[i] = host_list[i]->simdata->smx_host;
171
172   return task;
173 }
174
175 MSG_error_t MSG_parallel_task_execute(m_task_t task)
176 {
177   simdata_task_t simdata = NULL;
178   m_process_t self = MSG_process_self();
179   e_surf_action_state_t state = SURF_ACTION_NOT_IN_THE_SYSTEM;
180   CHECK_HOST();
181
182   simdata = task->simdata;
183
184   xbt_assert0((!simdata->compute)
185               && (task->simdata->refcount == 1),
186               "This task is executed somewhere else. Go fix your code!");
187
188   xbt_assert0(simdata->host_nb,
189               "This is not a parallel task. Go to hell.");
190
191   DEBUG1("Computing on %s", MSG_process_self()->simdata->m_host->name);
192
193   simdata->refcount++;
194
195   SIMIX_mutex_lock(simdata->mutex);
196   simdata->compute =
197       SIMIX_action_parallel_execute(task->name, simdata->host_nb,
198                                     simdata->host_list,
199                                     simdata->comp_amount,
200                                     simdata->comm_amount, 1.0, -1.0);
201
202   self->simdata->waiting_action = simdata->compute;
203   SIMIX_register_action_to_condition(simdata->compute, simdata->cond);
204   do {
205     SIMIX_cond_wait(simdata->cond, simdata->mutex);
206     state = SIMIX_action_get_state(task->simdata->compute);
207   } while (state == SURF_ACTION_READY || state == SURF_ACTION_RUNNING);
208
209   SIMIX_unregister_action_to_condition(simdata->compute, simdata->cond);
210   self->simdata->waiting_action = NULL;
211
212
213   SIMIX_mutex_unlock(simdata->mutex);
214   simdata->refcount--;
215
216   if (SIMIX_action_get_state(task->simdata->compute) == SURF_ACTION_DONE) {
217     /* action ended, set comm and compute = NULL, the actions is already destroyed in the main function */
218     SIMIX_action_destroy(task->simdata->compute);
219     simdata->computation_amount = 0.0;
220     simdata->comm = NULL;
221     simdata->compute = NULL;
222     MSG_RETURN(MSG_OK);
223   } else if (SIMIX_host_get_state(SIMIX_host_self()) == 0) {
224     /* action ended, set comm and compute = NULL, the actions is already destroyed in the main function */
225     SIMIX_action_destroy(task->simdata->compute);
226     simdata->comm = NULL;
227     simdata->compute = NULL;
228     MSG_RETURN(MSG_HOST_FAILURE);
229   } else {
230     /* action ended, set comm and compute = NULL, the actions is already destroyed in the main function */
231     SIMIX_action_destroy(task->simdata->compute);
232     simdata->comm = NULL;
233     simdata->compute = NULL;
234     MSG_RETURN(MSG_TASK_CANCELLED);
235   }
236
237 }
238
239
240 /** \ingroup msg_gos_functions
241  * \brief Sleep for the specified number of seconds
242  *
243  * Makes the current process sleep until \a time seconds have elapsed.
244  *
245  * \param nb_sec a number of second
246  */
247 MSG_error_t MSG_process_sleep(double nb_sec)
248 {
249   smx_action_t act_sleep;
250   m_process_t proc = MSG_process_self();
251   e_surf_action_state_t state = SURF_ACTION_NOT_IN_THE_SYSTEM;
252   smx_mutex_t mutex;
253   smx_cond_t cond;
254
255 #ifdef HAVE_TRACING
256   TRACE_msg_process_sleep_in(MSG_process_self());
257 #endif
258
259   /* create action to sleep */
260   act_sleep =
261       SIMIX_action_sleep(SIMIX_process_get_host(proc->simdata->s_process),
262                          nb_sec);
263
264   mutex = SIMIX_mutex_init();
265   SIMIX_mutex_lock(mutex);
266
267   /* create conditional and register action to it */
268   cond = SIMIX_cond_init();
269
270   proc->simdata->waiting_action = act_sleep;
271   SIMIX_register_action_to_condition(act_sleep, cond);
272   do {
273     SIMIX_cond_wait(cond, mutex);
274     state = SIMIX_action_get_state(act_sleep);
275   } while (state == SURF_ACTION_READY || state == SURF_ACTION_RUNNING);
276   proc->simdata->waiting_action = NULL;
277   SIMIX_unregister_action_to_condition(act_sleep, cond);
278   SIMIX_mutex_unlock(mutex);
279
280   /* remove variables */
281   SIMIX_cond_destroy(cond);
282   SIMIX_mutex_destroy(mutex);
283
284   if (SIMIX_action_get_state(act_sleep) == SURF_ACTION_DONE) {
285     if (SIMIX_host_get_state(SIMIX_host_self()) == SURF_RESOURCE_OFF) {
286       SIMIX_action_destroy(act_sleep);
287 #ifdef HAVE_TRACING
288       TRACE_msg_process_sleep_out(MSG_process_self());
289 #endif
290       MSG_RETURN(MSG_HOST_FAILURE);
291     }
292   } else {
293     SIMIX_action_destroy(act_sleep);
294 #ifdef HAVE_TRACING
295     TRACE_msg_process_sleep_out(MSG_process_self());
296 #endif
297     MSG_RETURN(MSG_HOST_FAILURE);
298   }
299
300   SIMIX_action_destroy(act_sleep);
301 #ifdef HAVE_TRACING
302   TRACE_msg_process_sleep_out(MSG_process_self());
303 #endif
304   MSG_RETURN(MSG_OK);
305 }
306
307 /** \ingroup msg_gos_functions
308  * \brief Listen on \a channel and waits for receiving a task from \a host.
309  *
310  * It takes three parameters.
311  * \param task a memory location for storing a #m_task_t. It will
312  hold a task when this function will return. Thus \a task should not
313  be equal to \c NULL and \a *task should be equal to \c NULL. If one of
314  those two condition does not hold, there will be a warning message.
315  * \param channel the channel on which the agent should be
316  listening. This value has to be >=0 and < than the maximal
317  number of channels fixed with MSG_set_channel_number().
318  * \param host the host that is to be watched.
319  * \return #MSG_FATAL if \a task is equal to \c NULL, #MSG_WARNING
320  if \a *task is not equal to \c NULL, and #MSG_OK otherwise.
321  */
322 MSG_error_t
323 MSG_task_get_from_host(m_task_t * task, m_channel_t channel, m_host_t host)
324 {
325   return MSG_task_get_ext(task, channel, -1, host);
326 }
327
328 /** \ingroup msg_gos_functions
329  * \brief Listen on a channel and wait for receiving a task.
330  *
331  * It takes two parameters.
332  * \param task a memory location for storing a #m_task_t. It will
333  hold a task when this function will return. Thus \a task should not
334  be equal to \c NULL and \a *task should be equal to \c NULL. If one of
335  those two condition does not hold, there will be a warning message.
336  * \param channel the channel on which the agent should be
337  listening. This value has to be >=0 and < than the maximal
338  number of channels fixed with MSG_set_channel_number().
339  * \return #MSG_FATAL if \a task is equal to \c NULL, #MSG_WARNING
340  * if \a *task is not equal to \c NULL, and #MSG_OK otherwise.
341  */
342 MSG_error_t MSG_task_get(m_task_t * task, m_channel_t channel)
343 {
344   return MSG_task_get_with_timeout(task, channel, -1);
345 }
346
347 /** \ingroup msg_gos_functions
348  * \brief Listen on a channel and wait for receiving a task with a timeout.
349  *
350  * It takes three parameters.
351  * \param task a memory location for storing a #m_task_t. It will
352  hold a task when this function will return. Thus \a task should not
353  be equal to \c NULL and \a *task should be equal to \c NULL. If one of
354  those two condition does not hold, there will be a warning message.
355  * \param channel the channel on which the agent should be
356  listening. This value has to be >=0 and < than the maximal
357  number of channels fixed with MSG_set_channel_number().
358  * \param max_duration the maximum time to wait for a task before giving
359  up. In such a case, #MSG_TRANSFER_FAILURE will be returned, \a task
360  will not be modified and will still be
361  equal to \c NULL when returning.
362  * \return #MSG_FATAL if \a task is equal to \c NULL, #MSG_WARNING
363  if \a *task is not equal to \c NULL, and #MSG_OK otherwise.
364  */
365 MSG_error_t
366 MSG_task_get_with_timeout(m_task_t * task, m_channel_t channel,
367                           double max_duration)
368 {
369   return MSG_task_get_ext(task, channel, max_duration, NULL);
370 }
371
372 /** \defgroup msg_gos_functions MSG Operating System Functions
373  *  \brief This section describes the functions that can be used
374  *  by an agent for handling some task.
375  */
376
377 MSG_error_t
378 MSG_task_get_ext(m_task_t * task, m_channel_t channel, double timeout,
379                  m_host_t host)
380 {
381   xbt_assert1((channel >= 0)
382               && (channel < msg_global->max_channel), "Invalid channel %d",
383               channel);
384
385   return
386       MSG_mailbox_get_task_ext(MSG_mailbox_get_by_channel
387                                (MSG_host_self(), channel), task, host,
388                                timeout);
389 }
390
391 MSG_error_t
392 MSG_task_receive_from_host(m_task_t * task, const char *alias,
393                            m_host_t host)
394 {
395   return MSG_task_receive_ext(task, alias, -1, host);
396 }
397
398 MSG_error_t MSG_task_receive(m_task_t * task, const char *alias)
399 {
400   return MSG_task_receive_with_timeout(task, alias, -1);
401 }
402
403 MSG_error_t
404 MSG_task_receive_with_timeout(m_task_t * task, const char *alias,
405                               double timeout)
406 {
407   return MSG_task_receive_ext(task, alias, timeout, NULL);
408 }
409
410 MSG_error_t
411 MSG_task_receive_ext(m_task_t * task, const char *alias, double timeout,
412                      m_host_t host)
413 {
414   DEBUG1
415       ("MSG_task_receive_ext: Trying to receive a message on mailbox '%s'",
416        alias);
417   return MSG_mailbox_get_task_ext(MSG_mailbox_get_by_alias(alias), task,
418                                   host, timeout);
419 }
420
421 /** \ingroup msg_gos_functions
422  * \brief Send a task on a channel.
423  *
424  * This function takes two parameter.
425  * \param task a #m_task_t to send on another location.
426  * \param alias the channel on which the agent should put this
427  task. This value has to be >=0 and < than the maximal number of
428  channels fixed with MSG_set_channel_number().
429  * \return the msg_comm_t communication.
430  */
431 msg_comm_t MSG_task_isend(m_task_t task, const char *alias)
432 {
433   simdata_task_t t_simdata = NULL;
434   m_process_t process = MSG_process_self();
435   msg_mailbox_t mailbox = MSG_mailbox_get_by_alias(alias);
436
437   CHECK_HOST();
438
439   /* FIXME: these functions are not tracable */
440
441   /* Prepare the task to send */
442   t_simdata = task->simdata;
443   t_simdata->sender = process;
444   t_simdata->source = MSG_host_self();
445
446   xbt_assert0(t_simdata->refcount == 1,
447               "This task is still being used somewhere else. You cannot send it now. Go fix your code!");
448
449   t_simdata->refcount++;
450   msg_global->sent_msg++;
451   process->simdata->waiting_task = task;
452
453   /* Send it by calling SIMIX network layer */
454
455   /* Kept for semantical compatibility with older implementation */
456   if (mailbox->cond)
457     SIMIX_cond_signal(mailbox->cond);
458
459   return SIMIX_network_isend(mailbox->rdv, t_simdata->message_size,
460                              t_simdata->rate, task, sizeof(void *),
461                              &t_simdata->comm);
462 }
463
464 /** \ingroup msg_gos_functions
465  * \brief Listen on a channel for receiving a task from an asynchronous communication.
466  *
467  * It takes two parameters.
468  * \param task a memory location for storing a #m_task_t.
469  * \param alias the channel on which the agent should be
470  listening. This value has to be >=0 and < than the maximal
471  number of channels fixed with MSG_set_channel_number().
472  * \return the msg_comm_t communication.
473  */
474 msg_comm_t MSG_task_irecv(m_task_t * task, const char *alias)
475 {
476   smx_comm_t comm;
477   smx_rdv_t rdv = MSG_mailbox_get_by_alias(alias)->rdv;
478   msg_mailbox_t mailbox = MSG_mailbox_get_by_alias(alias);
479
480   CHECK_HOST();
481
482   /* FIXME: these functions are not tracable */
483
484   memset(&comm, 0, sizeof(comm));
485
486   /* Kept for compatibility with older implementation */
487   xbt_assert1(!MSG_mailbox_get_cond(mailbox),
488               "A process is already blocked on this channel %s",
489               MSG_mailbox_get_alias(mailbox));
490
491   /* Sanity check */
492   xbt_assert0(task, "Null pointer for the task storage");
493
494   if (*task)
495     CRITICAL0
496         ("MSG_task_get() was asked to write in a non empty task struct.");
497
498   /* Try to receive it by calling SIMIX network layer */
499   return SIMIX_network_irecv(rdv, task, NULL);
500 }
501
502 /** \ingroup msg_gos_functions
503  * \brief Test the status of a communication.
504  *
505  * It takes one parameter.
506  * \param comm the communication to test.
507  * \return the status of the communication:
508  *              TRUE : the communication is completed
509  *              FALSE: the communication is incompleted
510  * If the status is FALSE, don't forget to use MSG_process_sleep() after the test.
511  */
512 int MSG_comm_test(msg_comm_t comm)
513 {
514   return SIMIX_network_test(comm);
515 }
516
517 /** \ingroup msg_gos_functions
518  * \brief After received TRUE to MSG_comm_test(), the communication must be destroyed.
519  *
520  * It takes one parameter.
521  * \param comm the communication to destroy.
522  */
523 void MSG_comm_destroy(msg_comm_t comm)
524 {
525   if (!(comm->src_proc == SIMIX_process_self())) {
526     m_task_t task;
527     task = (m_task_t) SIMIX_communication_get_src_buf(comm);
528     task->simdata->refcount--;
529   }
530   SIMIX_communication_destroy(comm);
531 }
532
533 /** \ingroup msg_gos_functions
534  * \brief Wait for the completion of a communication.
535  *
536  * It takes two parameters.
537  * \param comm the communication to wait.
538  * \param timeout Wait until the communication terminates or the timeout occurs
539  * \return MSG_error_t
540  */
541 MSG_error_t MSG_comm_wait(msg_comm_t comm, double timeout)
542 {
543   xbt_ex_t e;
544   MSG_error_t res = MSG_OK;
545   TRY {
546     SIMIX_network_wait(comm, timeout);
547
548     if (!(comm->src_proc == SIMIX_process_self())) {
549       m_task_t task;
550       task = (m_task_t) SIMIX_communication_get_src_buf(comm);
551       task->simdata->refcount--;
552     }
553
554     /* FIXME: these functions are not tracable */
555   }
556   CATCH(e) {
557     switch (e.category) {
558     case host_error:
559       res = MSG_HOST_FAILURE;
560       break;
561     case network_error:
562       res = MSG_TRANSFER_FAILURE;
563       break;
564     case timeout_error:
565       res = MSG_TIMEOUT;
566       break;
567     default:
568       xbt_die(bprintf("Unhandled SIMIX network exception: %s", e.msg));
569     }
570     xbt_ex_free(e);
571   }
572   return res;
573 }
574
575 /** \ingroup msg_gos_functions
576 * \brief This function is called by a sender and permit to wait for each communication
577 *
578 * It takes three parameters.
579 * \param comm a vector of communication
580 * \param nb_elem is the size of the comm vector
581 * \param timeout for each call of  MSG_comm_wait
582 */
583 void MSG_comm_waitall(msg_comm_t * comm, int nb_elem, double timeout)
584 {
585   int i = 0;
586   for (i = 0; i < nb_elem; i++) {
587     MSG_comm_wait(comm[i], timeout);
588   }
589 }
590
591 /** \ingroup msg_gos_functions
592 * \brief This function wait for the first completed communication
593 *
594 * It takes on parameter.
595 * \param comms a vector of communication
596 * \return the position of the completed communication from the xbt_dynar_t.
597 */
598 int MSG_comm_waitany(xbt_dynar_t comms)
599 {
600   return SIMIX_network_waitany(comms);
601 }
602
603 m_task_t MSG_comm_get_task(msg_comm_t comm)
604 {
605   xbt_assert0(comm, "Invalid parameters");
606   return (m_task_t) SIMIX_communication_get_src_buf(comm);
607 }
608
609 /** \ingroup msg_gos_functions
610  * \brief Put a task on a channel of an host and waits for the end of the
611  * transmission.
612  *
613  * This function is used for describing the behavior of an agent. It
614  * takes three parameter.
615  * \param task a #m_task_t to send on another location. This task
616  will not be usable anymore when the function will return. There is
617  no automatic task duplication and you have to save your parameters
618  before calling this function. Tasks are unique and once it has been
619  sent to another location, you should not access it anymore. You do
620  not need to call MSG_task_destroy() but to avoid using, as an
621  effect of inattention, this task anymore, you definitely should
622  renitialize it with #MSG_TASK_UNINITIALIZED. Note that this task
623  can be transfered iff it has been correctly created with
624  MSG_task_create().
625  * \param dest the destination of the message
626  * \param channel the channel on which the agent should put this
627  task. This value has to be >=0 and < than the maximal number of
628  channels fixed with MSG_set_channel_number().
629  * \return #MSG_FATAL if \a task is not properly initialized and
630  * #MSG_OK otherwise. Returns #MSG_HOST_FAILURE if the host on which
631  * this function was called was shut down. Returns
632  * #MSG_TRANSFER_FAILURE if the transfer could not be properly done
633  * (network failure, dest failure)
634  */
635 MSG_error_t MSG_task_put(m_task_t task, m_host_t dest, m_channel_t channel)
636 {
637   return MSG_task_put_with_timeout(task, dest, channel, -1.0);
638 }
639
640 /** \ingroup msg_gos_functions
641  * \brief Does exactly the same as MSG_task_put but with a bounded transmition
642  * rate.
643  *
644  * \sa MSG_task_put
645  */
646 MSG_error_t
647 MSG_task_put_bounded(m_task_t task, m_host_t dest, m_channel_t channel,
648                      double maxrate)
649 {
650   task->simdata->rate = maxrate;
651   return MSG_task_put(task, dest, channel);
652 }
653
654 /** \ingroup msg_gos_functions \brief Put a task on a channel of an
655  * host (with a timeout on the waiting of the destination host) and
656  * waits for the end of the transmission.
657  *
658  * This function is used for describing the behavior of an agent. It
659  * takes four parameter.
660  * \param task a #m_task_t to send on another location. This task
661  will not be usable anymore when the function will return. There is
662  no automatic task duplication and you have to save your parameters
663  before calling this function. Tasks are unique and once it has been
664  sent to another location, you should not access it anymore. You do
665  not need to call MSG_task_destroy() but to avoid using, as an
666  effect of inattention, this task anymore, you definitely should
667  renitialize it with #MSG_TASK_UNINITIALIZED. Note that this task
668  can be transfered iff it has been correctly created with
669  MSG_task_create().
670  * \param dest the destination of the message
671  * \param channel the channel on which the agent should put this
672  task. This value has to be >=0 and < than the maximal number of
673  channels fixed with MSG_set_channel_number().
674  * \param timeout the maximum time to wait for a task before giving
675  up. In such a case, #MSG_TRANSFER_FAILURE will be returned, \a task
676  will not be modified
677  * \return #MSG_FATAL if \a task is not properly initialized and
678 #MSG_OK otherwise. Returns #MSG_HOST_FAILURE if the host on which
679 this function was called was shut down. Returns
680 #MSG_TRANSFER_FAILURE if the transfer could not be properly done
681 (network failure, dest failure, timeout...)
682  */
683 MSG_error_t
684 MSG_task_put_with_timeout(m_task_t task, m_host_t dest,
685                           m_channel_t channel, double timeout)
686 {
687   xbt_assert1((channel >= 0)
688               && (channel < msg_global->max_channel), "Invalid channel %d",
689               channel);
690
691   return
692       MSG_mailbox_put_with_timeout(MSG_mailbox_get_by_channel
693                                    (dest, channel), task, timeout);
694 }
695
696 MSG_error_t MSG_task_send(m_task_t task, const char *alias)
697 {
698   DEBUG1("MSG_task_send: Trying to send a message on mailbox '%s'", alias);
699   return MSG_task_send_with_timeout(task, alias, -1);
700 }
701
702
703 MSG_error_t
704 MSG_task_send_bounded(m_task_t task, const char *alias, double maxrate)
705 {
706   task->simdata->rate = maxrate;
707   return MSG_task_send(task, alias);
708 }
709
710
711 MSG_error_t
712 MSG_task_send_with_timeout(m_task_t task, const char *alias,
713                            double timeout)
714 {
715   return MSG_mailbox_put_with_timeout(MSG_mailbox_get_by_alias(alias),
716                                       task, timeout);
717 }
718
719 int MSG_task_listen(const char *alias)
720 {
721   CHECK_HOST();
722
723   return !MSG_mailbox_is_empty(MSG_mailbox_get_by_alias(alias));
724 }
725
726 /** \ingroup msg_gos_functions
727  * \brief Test whether there is a pending communication on a channel.
728  *
729  * It takes one parameter.
730  * \param channel the channel on which the agent should be
731  listening. This value has to be >=0 and < than the maximal
732  number of channels fixed with MSG_set_channel_number().
733  * \return 1 if there is a pending communication and 0 otherwise
734  */
735 int MSG_task_Iprobe(m_channel_t channel)
736 {
737   xbt_assert1((channel >= 0)
738               && (channel < msg_global->max_channel), "Invalid channel %d",
739               channel);
740
741   CHECK_HOST();
742
743   return
744       !MSG_mailbox_is_empty(MSG_mailbox_get_by_channel
745                             (MSG_host_self(), channel));
746 }
747
748 /** \ingroup msg_gos_functions
749
750  * \brief Return the number of tasks waiting to be received on a \a
751  channel and sent by \a host.
752  *
753  * It takes two parameters.
754  * \param channel the channel on which the agent should be
755  listening. This value has to be >=0 and < than the maximal
756  number of channels fixed with MSG_set_channel_number().
757  * \param host the host that is to be watched.
758  * \return the number of tasks waiting to be received on \a channel
759  and sent by \a host.
760  */
761 int MSG_task_probe_from_host(int channel, m_host_t host)
762 {
763   xbt_assert1((channel >= 0)
764               && (channel < msg_global->max_channel), "Invalid channel %d",
765               channel);
766
767   CHECK_HOST();
768
769   return
770       MSG_mailbox_get_count_host_waiting_tasks(MSG_mailbox_get_by_channel
771                                                (MSG_host_self(), channel),
772                                                host);
773
774 }
775
776 int MSG_task_listen_from_host(const char *alias, m_host_t host)
777 {
778   CHECK_HOST();
779
780   return
781       MSG_mailbox_get_count_host_waiting_tasks(MSG_mailbox_get_by_alias
782                                                (alias), host);
783 }
784
785 /** \ingroup msg_gos_functions
786  * \brief Test whether there is a pending communication on a channel, and who sent it.
787  *
788  * It takes one parameter.
789  * \param channel the channel on which the agent should be
790  listening. This value has to be >=0 and < than the maximal
791  number of channels fixed with MSG_set_channel_number().
792  * \return -1 if there is no pending communication and the PID of the process who sent it otherwise
793  */
794 int MSG_task_probe_from(m_channel_t channel)
795 {
796   m_task_t task;
797
798   CHECK_HOST();
799
800   xbt_assert1((channel >= 0)
801               && (channel < msg_global->max_channel), "Invalid channel %d",
802               channel);
803
804   if (NULL ==
805       (task =
806        MSG_mailbox_get_head(MSG_mailbox_get_by_channel
807                             (MSG_host_self(), channel))))
808     return -1;
809
810   return MSG_process_get_PID(task->simdata->sender);
811 }
812
813 int MSG_task_listen_from(const char *alias)
814 {
815   m_task_t task;
816
817   CHECK_HOST();
818
819   if (NULL ==
820       (task = MSG_mailbox_get_head(MSG_mailbox_get_by_alias(alias))))
821     return -1;
822
823   return MSG_process_get_PID(task->simdata->sender);
824 }
825
826 /** \ingroup msg_gos_functions
827  * \brief Wait for at most \a max_duration second for a task reception
828  on \a channel.
829
830  * \a PID is updated with the PID of the first process that triggered this event if any.
831  *
832  * It takes three parameters:
833  * \param channel the channel on which the agent should be
834  listening. This value has to be >=0 and < than the maximal.
835  number of channels fixed with MSG_set_channel_number().
836  * \param PID a memory location for storing an int.
837  * \param timeout the maximum time to wait for a task before
838  giving up. In the case of a reception, *\a PID will be updated
839  with the PID of the first process to send a task.
840  * \return #MSG_HOST_FAILURE if the host is shut down in the meantime
841  and #MSG_OK otherwise.
842  */
843 MSG_error_t
844 MSG_channel_select_from(m_channel_t channel, double timeout, int *PID)
845 {
846   m_host_t h = NULL;
847   simdata_host_t h_simdata = NULL;
848   m_task_t t;
849   int first_time = 1;
850   smx_cond_t cond;
851   msg_mailbox_t mailbox;
852
853   xbt_assert1((channel >= 0)
854               && (channel < msg_global->max_channel), "Invalid channel %d",
855               channel);
856
857   if (PID) {
858     *PID = -1;
859   }
860
861   if (timeout == 0.0) {
862     *PID = MSG_task_probe_from(channel);
863     MSG_RETURN(MSG_OK);
864   } else {
865     CHECK_HOST();
866     h = MSG_host_self();
867     h_simdata = h->simdata;
868
869     mailbox = MSG_mailbox_get_by_channel(MSG_host_self(), channel);
870
871     while (MSG_mailbox_is_empty(mailbox)) {
872       if (timeout > 0) {
873         if (!first_time) {
874           MSG_RETURN(MSG_OK);
875         }
876       }
877
878       SIMIX_mutex_lock(h_simdata->mutex);
879
880       xbt_assert1(!MSG_mailbox_get_cond(mailbox),
881                   "A process is already blocked on this channel %d",
882                   channel);
883
884       cond = SIMIX_cond_init();
885
886       MSG_mailbox_set_cond(mailbox, cond);
887
888       if (timeout > 0) {
889         SIMIX_cond_wait_timeout(cond, h_simdata->mutex, timeout);
890       } else {
891         SIMIX_cond_wait(cond, h_simdata->mutex);
892       }
893
894       SIMIX_cond_destroy(cond);
895       SIMIX_mutex_unlock(h_simdata->mutex);
896
897       if (SIMIX_host_get_state(h_simdata->smx_host) == 0) {
898         MSG_RETURN(MSG_HOST_FAILURE);
899       }
900
901       MSG_mailbox_set_cond(mailbox, NULL);
902       first_time = 0;
903     }
904
905     if (NULL == (t = MSG_mailbox_get_head(mailbox)))
906       MSG_RETURN(MSG_OK);
907
908
909     if (PID) {
910       *PID = MSG_process_get_PID(t->simdata->sender);
911     }
912
913     MSG_RETURN(MSG_OK);
914   }
915 }
916
917
918 MSG_error_t MSG_alias_select_from(const char *alias, double timeout,
919                                   int *PID)
920 {
921   m_host_t h = NULL;
922   simdata_host_t h_simdata = NULL;
923   m_task_t t;
924   int first_time = 1;
925   smx_cond_t cond;
926   msg_mailbox_t mailbox;
927
928   if (PID) {
929     *PID = -1;
930   }
931
932   if (timeout == 0.0) {
933     *PID = MSG_task_listen_from(alias);
934     MSG_RETURN(MSG_OK);
935   } else {
936     CHECK_HOST();
937     h = MSG_host_self();
938     h_simdata = h->simdata;
939
940     DEBUG2("Probing on alias %s (%s)", alias, h->name);
941
942     mailbox = MSG_mailbox_get_by_alias(alias);
943
944     while (MSG_mailbox_is_empty(mailbox)) {
945       if (timeout > 0) {
946         if (!first_time) {
947           MSG_RETURN(MSG_OK);
948         }
949       }
950
951       SIMIX_mutex_lock(h_simdata->mutex);
952
953       xbt_assert1(!MSG_mailbox_get_cond(mailbox),
954                   "A process is already blocked on this alias %s", alias);
955
956       cond = SIMIX_cond_init();
957
958       MSG_mailbox_set_cond(mailbox, cond);
959
960       if (timeout > 0) {
961         SIMIX_cond_wait_timeout(cond, h_simdata->mutex, timeout);
962       } else {
963         SIMIX_cond_wait(cond, h_simdata->mutex);
964       }
965
966       SIMIX_cond_destroy(cond);
967       SIMIX_mutex_unlock(h_simdata->mutex);
968
969       if (SIMIX_host_get_state(h_simdata->smx_host) == 0) {
970         MSG_RETURN(MSG_HOST_FAILURE);
971       }
972
973       MSG_mailbox_set_cond(mailbox, NULL);
974       first_time = 0;
975     }
976
977     if (NULL == (t = MSG_mailbox_get_head(mailbox)))
978       MSG_RETURN(MSG_OK);
979
980
981     if (PID) {
982       *PID = MSG_process_get_PID(t->simdata->sender);
983     }
984
985     MSG_RETURN(MSG_OK);
986   }
987 }