Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
Fix use after free when using SIMIX_network_wait().
[simgrid.git] / src / msg / gos.c
1 /* Copyright (c) 2004, 2005, 2006, 2007, 2008, 2009, 2010. The SimGrid Team.
2  * All rights reserved.                                                     */
3
4 /* This program is free software; you can redistribute it and/or modify it
5  * under the terms of the license (GNU LGPL) which comes with this package. */
6
7 #include "msg/private.h"
8 #include "simix/private.h"
9 #include "xbt/sysdep.h"
10 #include "mc/mc.h"
11 #include "xbt/log.h"
12 #include "mailbox.h"
13
14
15 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(msg_gos, msg,
16                                 "Logging specific to MSG (gos)");
17
18 /** \ingroup msg_gos_functions
19  *
20  * \brief Return the last value returned by a MSG function (except
21  * MSG_get_errno...).
22  */
23 MSG_error_t MSG_get_errno(void)
24 {
25   return PROCESS_GET_ERRNO();
26 }
27
28 /** \ingroup msg_gos_functions
29  * \brief Executes a task and waits for its termination.
30  *
31  * This function is used for describing the behavior of an agent. It
32  * takes only one parameter.
33  * \param task a #m_task_t to execute on the location on which the
34  agent is running.
35  * \return #MSG_FATAL if \a task is not properly initialized and
36  * #MSG_OK otherwise.
37  */
38 MSG_error_t MSG_task_execute(m_task_t task)
39 {
40   simdata_task_t simdata = NULL;
41   m_process_t self = MSG_process_self();
42   e_surf_action_state_t state = SURF_ACTION_NOT_IN_THE_SYSTEM;
43   CHECK_HOST();
44
45   simdata = task->simdata;
46
47   xbt_assert0(simdata->host_nb == 0,
48               "This is a parallel task. Go to hell.");
49
50 #ifdef HAVE_TRACING
51   TRACE_msg_task_execute_start(task);
52 #endif
53
54   xbt_assert1((!simdata->compute) && (task->simdata->refcount == 1),
55               "This task is executed somewhere else. Go fix your code! %d",
56               task->simdata->refcount);
57
58   DEBUG1("Computing on %s", MSG_process_self()->simdata->m_host->name);
59
60   if (simdata->computation_amount == 0) {
61 #ifdef HAVE_TRACING
62     TRACE_msg_task_execute_end(task);
63 #endif
64     return MSG_OK;
65   }
66   simdata->refcount++;
67   SIMIX_mutex_lock(simdata->mutex);
68   simdata->compute =
69       SIMIX_action_execute(SIMIX_host_self(), task->name,
70                            simdata->computation_amount);
71   SIMIX_action_set_priority(simdata->compute, simdata->priority);
72
73   /* changed to waiting action since we are always waiting one action (execute, communicate or sleep) */
74   self->simdata->waiting_action = simdata->compute;
75   SIMIX_register_action_to_condition(simdata->compute, simdata->cond);
76   do {
77     SIMIX_cond_wait(simdata->cond, simdata->mutex);
78     state = SIMIX_action_get_state(simdata->compute);
79   } while (state == SURF_ACTION_READY || state == SURF_ACTION_RUNNING);
80   SIMIX_unregister_action_to_condition(simdata->compute, simdata->cond);
81   self->simdata->waiting_action = NULL;
82
83   SIMIX_mutex_unlock(simdata->mutex);
84   simdata->refcount--;
85
86   if (SIMIX_action_get_state(task->simdata->compute) == SURF_ACTION_DONE) {
87     /* action ended, set comm and compute = NULL, the actions is already destroyed in the main function */
88     SIMIX_action_destroy(task->simdata->compute);
89     simdata->computation_amount = 0.0;
90     simdata->comm = NULL;
91     simdata->compute = NULL;
92 #ifdef HAVE_TRACING
93     TRACE_msg_task_execute_end(task);
94 #endif
95     MSG_RETURN(MSG_OK);
96   } else if (SIMIX_host_get_state(SIMIX_host_self()) == 0) {
97     /* action ended, set comm and compute = NULL, the actions is already destroyed in the main function */
98     SIMIX_action_destroy(task->simdata->compute);
99     simdata->comm = NULL;
100     simdata->compute = NULL;
101 #ifdef HAVE_TRACING
102     TRACE_msg_task_execute_end(task);
103 #endif
104     MSG_RETURN(MSG_HOST_FAILURE);
105   } else {
106     /* action ended, set comm and compute = NULL, the actions is already destroyed in the main function */
107     SIMIX_action_destroy(task->simdata->compute);
108     simdata->comm = NULL;
109     simdata->compute = NULL;
110 #ifdef HAVE_TRACING
111     TRACE_msg_task_execute_end(task);
112 #endif
113     MSG_RETURN(MSG_TASK_CANCELLED);
114   }
115 }
116
117 /** \ingroup m_task_management
118  * \brief Creates a new #m_task_t (a parallel one....).
119  *
120  * A constructor for #m_task_t taking six arguments and returning the
121  corresponding object.
122  * \param name a name for the object. It is for user-level information
123  and can be NULL.
124  * \param host_nb the number of hosts implied in the parallel task.
125  * \param host_list an array of \p host_nb m_host_t.
126  * \param computation_amount an array of \p host_nb
127  doubles. computation_amount[i] is the total number of operations
128  that have to be performed on host_list[i].
129  * \param communication_amount an array of \p host_nb* \p host_nb doubles.
130  * \param data a pointer to any data may want to attach to the new
131  object.  It is for user-level information and can be NULL. It can
132  be retrieved with the function \ref MSG_task_get_data.
133  * \see m_task_t
134  * \return The new corresponding object.
135  */
136 m_task_t
137 MSG_parallel_task_create(const char *name, int host_nb,
138                          const m_host_t * host_list,
139                          double *computation_amount,
140                          double *communication_amount, void *data)
141 {
142   int i;
143   simdata_task_t simdata = xbt_new0(s_simdata_task_t, 1);
144   m_task_t task = xbt_new0(s_m_task_t, 1);
145   task->simdata = simdata;
146
147   /* Task structure */
148   task->name = xbt_strdup(name);
149   task->data = data;
150
151   /* Simulator Data */
152   simdata->computation_amount = 0;
153   simdata->message_size = 0;
154   simdata->cond = SIMIX_cond_init();
155   simdata->mutex = SIMIX_mutex_init();
156   simdata->compute = NULL;
157   simdata->comm = NULL;
158   simdata->rate = -1.0;
159   simdata->refcount = 1;
160   simdata->sender = NULL;
161   simdata->receiver = NULL;
162   simdata->source = NULL;
163
164   simdata->host_nb = host_nb;
165   simdata->host_list = xbt_new0(smx_host_t, host_nb);
166   simdata->comp_amount = computation_amount;
167   simdata->comm_amount = communication_amount;
168
169   for (i = 0; i < host_nb; i++)
170     simdata->host_list[i] = host_list[i]->simdata->smx_host;
171
172   return task;
173 }
174
175 MSG_error_t MSG_parallel_task_execute(m_task_t task)
176 {
177   simdata_task_t simdata = NULL;
178   m_process_t self = MSG_process_self();
179   e_surf_action_state_t state = SURF_ACTION_NOT_IN_THE_SYSTEM;
180   CHECK_HOST();
181
182   simdata = task->simdata;
183
184   xbt_assert0((!simdata->compute)
185               && (task->simdata->refcount == 1),
186               "This task is executed somewhere else. Go fix your code!");
187
188   xbt_assert0(simdata->host_nb,
189               "This is not a parallel task. Go to hell.");
190
191   DEBUG1("Computing on %s", MSG_process_self()->simdata->m_host->name);
192
193   simdata->refcount++;
194
195   SIMIX_mutex_lock(simdata->mutex);
196   simdata->compute =
197       SIMIX_action_parallel_execute(task->name, simdata->host_nb,
198                                     simdata->host_list,
199                                     simdata->comp_amount,
200                                     simdata->comm_amount, 1.0, -1.0);
201
202   self->simdata->waiting_action = simdata->compute;
203   SIMIX_register_action_to_condition(simdata->compute, simdata->cond);
204   do {
205     SIMIX_cond_wait(simdata->cond, simdata->mutex);
206     state = SIMIX_action_get_state(task->simdata->compute);
207   } while (state == SURF_ACTION_READY || state == SURF_ACTION_RUNNING);
208
209   SIMIX_unregister_action_to_condition(simdata->compute, simdata->cond);
210   self->simdata->waiting_action = NULL;
211
212
213   SIMIX_mutex_unlock(simdata->mutex);
214   simdata->refcount--;
215
216   if (SIMIX_action_get_state(task->simdata->compute) == SURF_ACTION_DONE) {
217     /* action ended, set comm and compute = NULL, the actions is already destroyed in the main function */
218     SIMIX_action_destroy(task->simdata->compute);
219     simdata->computation_amount = 0.0;
220     simdata->comm = NULL;
221     simdata->compute = NULL;
222     MSG_RETURN(MSG_OK);
223   } else if (SIMIX_host_get_state(SIMIX_host_self()) == 0) {
224     /* action ended, set comm and compute = NULL, the actions is already destroyed in the main function */
225     SIMIX_action_destroy(task->simdata->compute);
226     simdata->comm = NULL;
227     simdata->compute = NULL;
228     MSG_RETURN(MSG_HOST_FAILURE);
229   } else {
230     /* action ended, set comm and compute = NULL, the actions is already destroyed in the main function */
231     SIMIX_action_destroy(task->simdata->compute);
232     simdata->comm = NULL;
233     simdata->compute = NULL;
234     MSG_RETURN(MSG_TASK_CANCELLED);
235   }
236
237 }
238
239
240 /** \ingroup msg_gos_functions
241  * \brief Sleep for the specified number of seconds
242  *
243  * Makes the current process sleep until \a time seconds have elapsed.
244  *
245  * \param nb_sec a number of second
246  */
247 MSG_error_t MSG_process_sleep(double nb_sec)
248 {
249   smx_action_t act_sleep;
250   m_process_t proc = MSG_process_self();
251   e_surf_action_state_t state = SURF_ACTION_NOT_IN_THE_SYSTEM;
252   smx_mutex_t mutex;
253   smx_cond_t cond;
254
255 #ifdef HAVE_TRACING
256   TRACE_msg_process_sleep_in(MSG_process_self());
257 #endif
258
259   /* create action to sleep */
260   act_sleep =
261       SIMIX_action_sleep(SIMIX_process_get_host(proc->simdata->s_process),
262                          nb_sec);
263
264   mutex = SIMIX_mutex_init();
265   SIMIX_mutex_lock(mutex);
266
267   /* create conditional and register action to it */
268   cond = SIMIX_cond_init();
269
270   proc->simdata->waiting_action = act_sleep;
271   SIMIX_register_action_to_condition(act_sleep, cond);
272   do {
273     SIMIX_cond_wait(cond, mutex);
274     state = SIMIX_action_get_state(act_sleep);
275   } while (state == SURF_ACTION_READY || state == SURF_ACTION_RUNNING);
276   proc->simdata->waiting_action = NULL;
277   SIMIX_unregister_action_to_condition(act_sleep, cond);
278   SIMIX_mutex_unlock(mutex);
279
280   /* remove variables */
281   SIMIX_cond_destroy(cond);
282   SIMIX_mutex_destroy(mutex);
283
284   if (SIMIX_action_get_state(act_sleep) == SURF_ACTION_DONE) {
285     if (SIMIX_host_get_state(SIMIX_host_self()) == SURF_RESOURCE_OFF) {
286       SIMIX_action_destroy(act_sleep);
287 #ifdef HAVE_TRACING
288       TRACE_msg_process_sleep_out(MSG_process_self());
289 #endif
290       MSG_RETURN(MSG_HOST_FAILURE);
291     }
292   } else {
293     SIMIX_action_destroy(act_sleep);
294 #ifdef HAVE_TRACING
295     TRACE_msg_process_sleep_out(MSG_process_self());
296 #endif
297     MSG_RETURN(MSG_HOST_FAILURE);
298   }
299
300   SIMIX_action_destroy(act_sleep);
301 #ifdef HAVE_TRACING
302   TRACE_msg_process_sleep_out(MSG_process_self());
303 #endif
304   MSG_RETURN(MSG_OK);
305 }
306
307 /** \ingroup msg_gos_functions
308  * \brief Listen on \a channel and waits for receiving a task from \a host.
309  *
310  * It takes three parameters.
311  * \param task a memory location for storing a #m_task_t. It will
312  hold a task when this function will return. Thus \a task should not
313  be equal to \c NULL and \a *task should be equal to \c NULL. If one of
314  those two condition does not hold, there will be a warning message.
315  * \param channel the channel on which the agent should be
316  listening. This value has to be >=0 and < than the maximal
317  number of channels fixed with MSG_set_channel_number().
318  * \param host the host that is to be watched.
319  * \return #MSG_FATAL if \a task is equal to \c NULL, #MSG_WARNING
320  if \a *task is not equal to \c NULL, and #MSG_OK otherwise.
321  */
322 MSG_error_t
323 MSG_task_get_from_host(m_task_t * task, m_channel_t channel, m_host_t host)
324 {
325   return MSG_task_get_ext(task, channel, -1, host);
326 }
327
328 /** \ingroup msg_gos_functions
329  * \brief Listen on a channel and wait for receiving a task.
330  *
331  * It takes two parameters.
332  * \param task a memory location for storing a #m_task_t. It will
333  hold a task when this function will return. Thus \a task should not
334  be equal to \c NULL and \a *task should be equal to \c NULL. If one of
335  those two condition does not hold, there will be a warning message.
336  * \param channel the channel on which the agent should be
337  listening. This value has to be >=0 and < than the maximal
338  number of channels fixed with MSG_set_channel_number().
339  * \return #MSG_FATAL if \a task is equal to \c NULL, #MSG_WARNING
340  * if \a *task is not equal to \c NULL, and #MSG_OK otherwise.
341  */
342 MSG_error_t MSG_task_get(m_task_t * task, m_channel_t channel)
343 {
344   return MSG_task_get_with_timeout(task, channel, -1);
345 }
346
347 /** \ingroup msg_gos_functions
348  * \brief Listen on a channel and wait for receiving a task with a timeout.
349  *
350  * It takes three parameters.
351  * \param task a memory location for storing a #m_task_t. It will
352  hold a task when this function will return. Thus \a task should not
353  be equal to \c NULL and \a *task should be equal to \c NULL. If one of
354  those two condition does not hold, there will be a warning message.
355  * \param channel the channel on which the agent should be
356  listening. This value has to be >=0 and < than the maximal
357  number of channels fixed with MSG_set_channel_number().
358  * \param max_duration the maximum time to wait for a task before giving
359  up. In such a case, #MSG_TRANSFER_FAILURE will be returned, \a task
360  will not be modified and will still be
361  equal to \c NULL when returning.
362  * \return #MSG_FATAL if \a task is equal to \c NULL, #MSG_WARNING
363  if \a *task is not equal to \c NULL, and #MSG_OK otherwise.
364  */
365 MSG_error_t
366 MSG_task_get_with_timeout(m_task_t * task, m_channel_t channel,
367                           double max_duration)
368 {
369   return MSG_task_get_ext(task, channel, max_duration, NULL);
370 }
371
372 /** \defgroup msg_gos_functions MSG Operating System Functions
373  *  \brief This section describes the functions that can be used
374  *  by an agent for handling some task.
375  */
376
377 MSG_error_t
378 MSG_task_get_ext(m_task_t * task, m_channel_t channel, double timeout,
379                  m_host_t host)
380 {
381   xbt_assert1((channel >= 0)
382               && (channel < msg_global->max_channel), "Invalid channel %d",
383               channel);
384
385   return
386       MSG_mailbox_get_task_ext(MSG_mailbox_get_by_channel
387                                (MSG_host_self(), channel), task, host,
388                                timeout);
389 }
390
391 MSG_error_t
392 MSG_task_receive_from_host(m_task_t * task, const char *alias,
393                            m_host_t host)
394 {
395   return MSG_task_receive_ext(task, alias, -1, host);
396 }
397
398 MSG_error_t MSG_task_receive(m_task_t * task, const char *alias)
399 {
400   return MSG_task_receive_with_timeout(task, alias, -1);
401 }
402
403 MSG_error_t
404 MSG_task_receive_with_timeout(m_task_t * task, const char *alias,
405                               double timeout)
406 {
407   return MSG_task_receive_ext(task, alias, timeout, NULL);
408 }
409
410 MSG_error_t
411 MSG_task_receive_ext(m_task_t * task, const char *alias, double timeout,
412                      m_host_t host)
413 {
414   DEBUG1
415       ("MSG_task_receive_ext: Trying to receive a message on mailbox '%s'",
416        alias);
417   return MSG_mailbox_get_task_ext(MSG_mailbox_get_by_alias(alias), task,
418                                   host, timeout);
419 }
420
421 /** \ingroup msg_gos_functions
422  * \brief Send a task on a channel.
423  *
424  * This function takes two parameter.
425  * \param task a #m_task_t to send on another location.
426  * \param alias the channel on which the agent should put this
427  task. This value has to be >=0 and < than the maximal number of
428  channels fixed with MSG_set_channel_number().
429  * \return the msg_comm_t communication.
430  */
431 msg_comm_t MSG_task_isend(m_task_t task, const char *alias)
432 {
433   simdata_task_t t_simdata = NULL;
434   m_process_t process = MSG_process_self();
435   msg_mailbox_t mailbox = MSG_mailbox_get_by_alias(alias);
436
437   CHECK_HOST();
438
439   /* FIXME: these functions are not tracable */
440
441   /* Prepare the task to send */
442   t_simdata = task->simdata;
443   t_simdata->sender = process;
444   t_simdata->source = MSG_host_self();
445
446   xbt_assert0(t_simdata->refcount == 1,
447               "This task is still being used somewhere else. You cannot send it now. Go fix your code!");
448
449   t_simdata->refcount++;
450   msg_global->sent_msg++;
451   process->simdata->waiting_task = task;
452
453   /* Send it by calling SIMIX network layer */
454
455   /* Kept for semantical compatibility with older implementation */
456   if (mailbox->cond)
457     SIMIX_cond_signal(mailbox->cond);
458
459   return SIMIX_network_isend(mailbox->rdv, t_simdata->message_size,
460                              t_simdata->rate, task, sizeof(void *),
461                              &t_simdata->comm);
462 }
463
464 /** \ingroup msg_gos_functions
465  * \brief Listen on a channel for receiving a task from an asynchronous communication.
466  *
467  * It takes two parameters.
468  * \param task a memory location for storing a #m_task_t.
469  * \param alias the channel on which the agent should be
470  listening. This value has to be >=0 and < than the maximal
471  number of channels fixed with MSG_set_channel_number().
472  * \return the msg_comm_t communication.
473  */
474 msg_comm_t MSG_task_irecv(m_task_t * task, const char *alias)
475 {
476   smx_comm_t comm;
477   smx_rdv_t rdv = MSG_mailbox_get_by_alias(alias)->rdv;
478   msg_mailbox_t mailbox = MSG_mailbox_get_by_alias(alias);
479
480   CHECK_HOST();
481
482   /* FIXME: these functions are not tracable */
483
484   memset(&comm, 0, sizeof(comm));
485
486   /* Kept for compatibility with older implementation */
487   xbt_assert1(!MSG_mailbox_get_cond(mailbox),
488               "A process is already blocked on this channel %s",
489               MSG_mailbox_get_alias(mailbox));
490
491   /* Sanity check */
492   xbt_assert0(task, "Null pointer for the task storage");
493
494   if (*task)
495     CRITICAL0
496         ("MSG_task_get() was asked to write in a non empty task struct.");
497
498   /* Try to receive it by calling SIMIX network layer */
499   return SIMIX_network_irecv(rdv, task, NULL);
500 }
501
502 /** \ingroup msg_gos_functions
503  * \brief Test the status of a communication.
504  *
505  * It takes one parameter.
506  * \param comm the communication to test.
507  * \return the status of the communication:
508  *              TRUE : the communication is completed
509  *              FALSE: the communication is incompleted
510  * If the status is FALSE, don't forget to use MSG_process_sleep() after the test.
511  */
512 int MSG_comm_test(msg_comm_t comm)
513 {
514   return SIMIX_network_test(comm);
515 }
516
517 /** \ingroup msg_gos_functions
518  * \brief After received TRUE to MSG_comm_test(), the communication must be destroyed.
519  *
520  * It takes one parameter.
521  * \param comm the communication to destroy.
522  */
523 void MSG_comm_destroy(msg_comm_t comm)
524 {
525   if (!(comm->src_proc == SIMIX_process_self())) {
526     m_task_t task;
527     task = (m_task_t) SIMIX_communication_get_src_buf(comm);
528     task->simdata->refcount--;
529   }
530   SIMIX_communication_destroy(comm);
531 }
532
533 /** \ingroup msg_gos_functions
534  * \brief Wait for the completion of a communication.
535  *
536  * It takes two parameters.
537  * \param comm the communication to wait.
538  * \param timeout Wait until the communication terminates or the timeout occurs
539  * \return MSG_error_t
540  */
541 MSG_error_t MSG_comm_wait(msg_comm_t comm, double timeout)
542 {
543   xbt_ex_t e;
544   MSG_error_t res = MSG_OK;
545   TRY {
546     SIMIX_network_wait(comm, timeout);
547
548     if (!(comm->src_proc == SIMIX_process_self())) {
549       m_task_t task;
550       task = (m_task_t) SIMIX_communication_get_src_buf(comm);
551       task->simdata->refcount--;
552     }
553     SIMIX_communication_destroy(comm);
554
555     /* FIXME: these functions are not tracable */
556   }
557   CATCH(e) {
558     switch (e.category) {
559     case host_error:
560       res = MSG_HOST_FAILURE;
561       break;
562     case network_error:
563       res = MSG_TRANSFER_FAILURE;
564       break;
565     case timeout_error:
566       res = MSG_TIMEOUT;
567       break;
568     default:
569       xbt_die(bprintf("Unhandled SIMIX network exception: %s", e.msg));
570     }
571     xbt_ex_free(e);
572   }
573   return res;
574 }
575
576 /** \ingroup msg_gos_functions
577 * \brief This function is called by a sender and permit to wait for each communication
578 *
579 * It takes three parameters.
580 * \param comm a vector of communication
581 * \param nb_elem is the size of the comm vector
582 * \param timeout for each call of  MSG_comm_wait
583 */
584 void MSG_comm_waitall(msg_comm_t * comm, int nb_elem, double timeout)
585 {
586   int i = 0;
587   for (i = 0; i < nb_elem; i++) {
588     MSG_comm_wait(comm[i], timeout);
589   }
590 }
591
592 /** \ingroup msg_gos_functions
593 * \brief This function wait for the first completed communication
594 *
595 * It takes on parameter.
596 * \param comms a vector of communication
597 * \return the position of the completed communication from the xbt_dynar_t.
598 */
599 int MSG_comm_waitany(xbt_dynar_t comms)
600 {
601   return SIMIX_network_waitany(comms);
602 }
603
604 m_task_t MSG_comm_get_task(msg_comm_t comm)
605 {
606   xbt_assert0(comm, "Invalid parameters");
607   return (m_task_t) SIMIX_communication_get_src_buf(comm);
608 }
609
610 /** \ingroup msg_gos_functions
611  * \brief Put a task on a channel of an host and waits for the end of the
612  * transmission.
613  *
614  * This function is used for describing the behavior of an agent. It
615  * takes three parameter.
616  * \param task a #m_task_t to send on another location. This task
617  will not be usable anymore when the function will return. There is
618  no automatic task duplication and you have to save your parameters
619  before calling this function. Tasks are unique and once it has been
620  sent to another location, you should not access it anymore. You do
621  not need to call MSG_task_destroy() but to avoid using, as an
622  effect of inattention, this task anymore, you definitely should
623  renitialize it with #MSG_TASK_UNINITIALIZED. Note that this task
624  can be transfered iff it has been correctly created with
625  MSG_task_create().
626  * \param dest the destination of the message
627  * \param channel the channel on which the agent should put this
628  task. This value has to be >=0 and < than the maximal number of
629  channels fixed with MSG_set_channel_number().
630  * \return #MSG_FATAL if \a task is not properly initialized and
631  * #MSG_OK otherwise. Returns #MSG_HOST_FAILURE if the host on which
632  * this function was called was shut down. Returns
633  * #MSG_TRANSFER_FAILURE if the transfer could not be properly done
634  * (network failure, dest failure)
635  */
636 MSG_error_t MSG_task_put(m_task_t task, m_host_t dest, m_channel_t channel)
637 {
638   return MSG_task_put_with_timeout(task, dest, channel, -1.0);
639 }
640
641 /** \ingroup msg_gos_functions
642  * \brief Does exactly the same as MSG_task_put but with a bounded transmition
643  * rate.
644  *
645  * \sa MSG_task_put
646  */
647 MSG_error_t
648 MSG_task_put_bounded(m_task_t task, m_host_t dest, m_channel_t channel,
649                      double maxrate)
650 {
651   task->simdata->rate = maxrate;
652   return MSG_task_put(task, dest, channel);
653 }
654
655 /** \ingroup msg_gos_functions \brief Put a task on a channel of an
656  * host (with a timeout on the waiting of the destination host) and
657  * waits for the end of the transmission.
658  *
659  * This function is used for describing the behavior of an agent. It
660  * takes four parameter.
661  * \param task a #m_task_t to send on another location. This task
662  will not be usable anymore when the function will return. There is
663  no automatic task duplication and you have to save your parameters
664  before calling this function. Tasks are unique and once it has been
665  sent to another location, you should not access it anymore. You do
666  not need to call MSG_task_destroy() but to avoid using, as an
667  effect of inattention, this task anymore, you definitely should
668  renitialize it with #MSG_TASK_UNINITIALIZED. Note that this task
669  can be transfered iff it has been correctly created with
670  MSG_task_create().
671  * \param dest the destination of the message
672  * \param channel the channel on which the agent should put this
673  task. This value has to be >=0 and < than the maximal number of
674  channels fixed with MSG_set_channel_number().
675  * \param timeout the maximum time to wait for a task before giving
676  up. In such a case, #MSG_TRANSFER_FAILURE will be returned, \a task
677  will not be modified
678  * \return #MSG_FATAL if \a task is not properly initialized and
679 #MSG_OK otherwise. Returns #MSG_HOST_FAILURE if the host on which
680 this function was called was shut down. Returns
681 #MSG_TRANSFER_FAILURE if the transfer could not be properly done
682 (network failure, dest failure, timeout...)
683  */
684 MSG_error_t
685 MSG_task_put_with_timeout(m_task_t task, m_host_t dest,
686                           m_channel_t channel, double timeout)
687 {
688   xbt_assert1((channel >= 0)
689               && (channel < msg_global->max_channel), "Invalid channel %d",
690               channel);
691
692   return
693       MSG_mailbox_put_with_timeout(MSG_mailbox_get_by_channel
694                                    (dest, channel), task, timeout);
695 }
696
697 MSG_error_t MSG_task_send(m_task_t task, const char *alias)
698 {
699   DEBUG1("MSG_task_send: Trying to send a message on mailbox '%s'", alias);
700   return MSG_task_send_with_timeout(task, alias, -1);
701 }
702
703
704 MSG_error_t
705 MSG_task_send_bounded(m_task_t task, const char *alias, double maxrate)
706 {
707   task->simdata->rate = maxrate;
708   return MSG_task_send(task, alias);
709 }
710
711
712 MSG_error_t
713 MSG_task_send_with_timeout(m_task_t task, const char *alias,
714                            double timeout)
715 {
716   return MSG_mailbox_put_with_timeout(MSG_mailbox_get_by_alias(alias),
717                                       task, timeout);
718 }
719
720 int MSG_task_listen(const char *alias)
721 {
722   CHECK_HOST();
723
724   return !MSG_mailbox_is_empty(MSG_mailbox_get_by_alias(alias));
725 }
726
727 /** \ingroup msg_gos_functions
728  * \brief Test whether there is a pending communication on a channel.
729  *
730  * It takes one parameter.
731  * \param channel the channel on which the agent should be
732  listening. This value has to be >=0 and < than the maximal
733  number of channels fixed with MSG_set_channel_number().
734  * \return 1 if there is a pending communication and 0 otherwise
735  */
736 int MSG_task_Iprobe(m_channel_t channel)
737 {
738   xbt_assert1((channel >= 0)
739               && (channel < msg_global->max_channel), "Invalid channel %d",
740               channel);
741
742   CHECK_HOST();
743
744   return
745       !MSG_mailbox_is_empty(MSG_mailbox_get_by_channel
746                             (MSG_host_self(), channel));
747 }
748
749 /** \ingroup msg_gos_functions
750
751  * \brief Return the number of tasks waiting to be received on a \a
752  channel and sent by \a host.
753  *
754  * It takes two parameters.
755  * \param channel the channel on which the agent should be
756  listening. This value has to be >=0 and < than the maximal
757  number of channels fixed with MSG_set_channel_number().
758  * \param host the host that is to be watched.
759  * \return the number of tasks waiting to be received on \a channel
760  and sent by \a host.
761  */
762 int MSG_task_probe_from_host(int channel, m_host_t host)
763 {
764   xbt_assert1((channel >= 0)
765               && (channel < msg_global->max_channel), "Invalid channel %d",
766               channel);
767
768   CHECK_HOST();
769
770   return
771       MSG_mailbox_get_count_host_waiting_tasks(MSG_mailbox_get_by_channel
772                                                (MSG_host_self(), channel),
773                                                host);
774
775 }
776
777 int MSG_task_listen_from_host(const char *alias, m_host_t host)
778 {
779   CHECK_HOST();
780
781   return
782       MSG_mailbox_get_count_host_waiting_tasks(MSG_mailbox_get_by_alias
783                                                (alias), host);
784 }
785
786 /** \ingroup msg_gos_functions
787  * \brief Test whether there is a pending communication on a channel, and who sent it.
788  *
789  * It takes one parameter.
790  * \param channel the channel on which the agent should be
791  listening. This value has to be >=0 and < than the maximal
792  number of channels fixed with MSG_set_channel_number().
793  * \return -1 if there is no pending communication and the PID of the process who sent it otherwise
794  */
795 int MSG_task_probe_from(m_channel_t channel)
796 {
797   m_task_t task;
798
799   CHECK_HOST();
800
801   xbt_assert1((channel >= 0)
802               && (channel < msg_global->max_channel), "Invalid channel %d",
803               channel);
804
805   if (NULL ==
806       (task =
807        MSG_mailbox_get_head(MSG_mailbox_get_by_channel
808                             (MSG_host_self(), channel))))
809     return -1;
810
811   return MSG_process_get_PID(task->simdata->sender);
812 }
813
814 int MSG_task_listen_from(const char *alias)
815 {
816   m_task_t task;
817
818   CHECK_HOST();
819
820   if (NULL ==
821       (task = MSG_mailbox_get_head(MSG_mailbox_get_by_alias(alias))))
822     return -1;
823
824   return MSG_process_get_PID(task->simdata->sender);
825 }
826
827 /** \ingroup msg_gos_functions
828  * \brief Wait for at most \a max_duration second for a task reception
829  on \a channel.
830
831  * \a PID is updated with the PID of the first process that triggered this event if any.
832  *
833  * It takes three parameters:
834  * \param channel the channel on which the agent should be
835  listening. This value has to be >=0 and < than the maximal.
836  number of channels fixed with MSG_set_channel_number().
837  * \param PID a memory location for storing an int.
838  * \param timeout the maximum time to wait for a task before
839  giving up. In the case of a reception, *\a PID will be updated
840  with the PID of the first process to send a task.
841  * \return #MSG_HOST_FAILURE if the host is shut down in the meantime
842  and #MSG_OK otherwise.
843  */
844 MSG_error_t
845 MSG_channel_select_from(m_channel_t channel, double timeout, int *PID)
846 {
847   m_host_t h = NULL;
848   simdata_host_t h_simdata = NULL;
849   m_task_t t;
850   int first_time = 1;
851   smx_cond_t cond;
852   msg_mailbox_t mailbox;
853
854   xbt_assert1((channel >= 0)
855               && (channel < msg_global->max_channel), "Invalid channel %d",
856               channel);
857
858   if (PID) {
859     *PID = -1;
860   }
861
862   if (timeout == 0.0) {
863     *PID = MSG_task_probe_from(channel);
864     MSG_RETURN(MSG_OK);
865   } else {
866     CHECK_HOST();
867     h = MSG_host_self();
868     h_simdata = h->simdata;
869
870     mailbox = MSG_mailbox_get_by_channel(MSG_host_self(), channel);
871
872     while (MSG_mailbox_is_empty(mailbox)) {
873       if (timeout > 0) {
874         if (!first_time) {
875           MSG_RETURN(MSG_OK);
876         }
877       }
878
879       SIMIX_mutex_lock(h_simdata->mutex);
880
881       xbt_assert1(!MSG_mailbox_get_cond(mailbox),
882                   "A process is already blocked on this channel %d",
883                   channel);
884
885       cond = SIMIX_cond_init();
886
887       MSG_mailbox_set_cond(mailbox, cond);
888
889       if (timeout > 0) {
890         SIMIX_cond_wait_timeout(cond, h_simdata->mutex, timeout);
891       } else {
892         SIMIX_cond_wait(cond, h_simdata->mutex);
893       }
894
895       SIMIX_cond_destroy(cond);
896       SIMIX_mutex_unlock(h_simdata->mutex);
897
898       if (SIMIX_host_get_state(h_simdata->smx_host) == 0) {
899         MSG_RETURN(MSG_HOST_FAILURE);
900       }
901
902       MSG_mailbox_set_cond(mailbox, NULL);
903       first_time = 0;
904     }
905
906     if (NULL == (t = MSG_mailbox_get_head(mailbox)))
907       MSG_RETURN(MSG_OK);
908
909
910     if (PID) {
911       *PID = MSG_process_get_PID(t->simdata->sender);
912     }
913
914     MSG_RETURN(MSG_OK);
915   }
916 }
917
918
919 MSG_error_t MSG_alias_select_from(const char *alias, double timeout,
920                                   int *PID)
921 {
922   m_host_t h = NULL;
923   simdata_host_t h_simdata = NULL;
924   m_task_t t;
925   int first_time = 1;
926   smx_cond_t cond;
927   msg_mailbox_t mailbox;
928
929   if (PID) {
930     *PID = -1;
931   }
932
933   if (timeout == 0.0) {
934     *PID = MSG_task_listen_from(alias);
935     MSG_RETURN(MSG_OK);
936   } else {
937     CHECK_HOST();
938     h = MSG_host_self();
939     h_simdata = h->simdata;
940
941     DEBUG2("Probing on alias %s (%s)", alias, h->name);
942
943     mailbox = MSG_mailbox_get_by_alias(alias);
944
945     while (MSG_mailbox_is_empty(mailbox)) {
946       if (timeout > 0) {
947         if (!first_time) {
948           MSG_RETURN(MSG_OK);
949         }
950       }
951
952       SIMIX_mutex_lock(h_simdata->mutex);
953
954       xbt_assert1(!MSG_mailbox_get_cond(mailbox),
955                   "A process is already blocked on this alias %s", alias);
956
957       cond = SIMIX_cond_init();
958
959       MSG_mailbox_set_cond(mailbox, cond);
960
961       if (timeout > 0) {
962         SIMIX_cond_wait_timeout(cond, h_simdata->mutex, timeout);
963       } else {
964         SIMIX_cond_wait(cond, h_simdata->mutex);
965       }
966
967       SIMIX_cond_destroy(cond);
968       SIMIX_mutex_unlock(h_simdata->mutex);
969
970       if (SIMIX_host_get_state(h_simdata->smx_host) == 0) {
971         MSG_RETURN(MSG_HOST_FAILURE);
972       }
973
974       MSG_mailbox_set_cond(mailbox, NULL);
975       first_time = 0;
976     }
977
978     if (NULL == (t = MSG_mailbox_get_head(mailbox)))
979       MSG_RETURN(MSG_OK);
980
981
982     if (PID) {
983       *PID = MSG_process_get_PID(t->simdata->sender);
984     }
985
986     MSG_RETURN(MSG_OK);
987   }
988 }