1 /* Copyright (c) 2004, 2005, 2006, 2007, 2008, 2009, 2010. The SimGrid Team.
2 * All rights reserved. */
4 /* This program is free software; you can redistribute it and/or modify it
5 * under the terms of the license (GNU LGPL) which comes with this package. */
7 #include "msg/private.h"
8 #include "simix/private.h"
9 #include "xbt/sysdep.h"
15 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(msg_gos, msg,
16 "Logging specific to MSG (gos)");
18 /** \ingroup msg_gos_functions
20 * \brief Return the last value returned by a MSG function (except
23 MSG_error_t MSG_get_errno(void)
25 return PROCESS_GET_ERRNO();
/* NOTE(review) -- MSG_task_execute, as far as the statements shown here go:
 *  - asserts that the task is not a parallel one (simdata->host_nb == 0) and
 *    that it is not already being executed (no compute action, refcount 1);
 *  - when simdata->computation_amount is 0 it emits the end-of-execution
 *    trace (the statement that leaves the function is not visible here);
 *  - otherwise it locks simdata->mutex, calls SIMIX_action_execute() on
 *    SIMIX_host_self(), applies simdata->priority, records the action as the
 *    calling process' waiting_action and waits on simdata->cond for as long
 *    as the action state is SURF_ACTION_READY or SURF_ACTION_RUNNING;
 *  - afterwards the action is destroyed in each visible branch:
 *    SURF_ACTION_DONE resets computation_amount to 0.0, a host state of 0
 *    yields MSG_HOST_FAILURE, the remaining case yields MSG_TASK_CANCELLED.
 * The three "the actions is already destroyed in the main function" remarks
 * disagree with the SIMIX_action_destroy() call that follows each of them;
 * TODO confirm which side is stale.
 * simdata->compute is read right after SIMIX_action_execute() although no
 * assignment to it is visible, and the "} while" has no visible "do {":
 * lines are missing from this listing, verify against the complete file.
 */
28 /** \ingroup msg_gos_functions
29 * \brief Executes a task and waits for its termination.
31 * This function is used for describing the behavior of an agent. It
32 * takes only one parameter.
33 * \param task a #m_task_t to execute on the location on which the
35 * \return #MSG_FATAL if \a task is not properly initialized and
38 MSG_error_t MSG_task_execute(m_task_t task)
40 simdata_task_t simdata = NULL;
41 m_process_t self = MSG_process_self();
42 e_surf_action_state_t state = SURF_ACTION_NOT_IN_THE_SYSTEM;
45 simdata = task->simdata;
47 xbt_assert0(simdata->host_nb == 0,
48 "This is a parallel task. Go to hell.");
51 TRACE_msg_task_execute_start(task);
54 xbt_assert1((!simdata->compute) && (task->simdata->refcount == 1),
55 "This task is executed somewhere else. Go fix your code! %d",
56 task->simdata->refcount);
58 DEBUG1("Computing on %s", MSG_process_self()->simdata->m_host->name);
60 if (simdata->computation_amount == 0) {
62 TRACE_msg_task_execute_end(task);
67 SIMIX_mutex_lock(simdata->mutex);
69 SIMIX_action_execute(SIMIX_host_self(), task->name,
70 simdata->computation_amount);
71 SIMIX_action_set_priority(simdata->compute, simdata->priority);
73 /* changed to waiting action since we are always waiting one action (execute, communicate or sleep) */
74 self->simdata->waiting_action = simdata->compute;
75 SIMIX_register_action_to_condition(simdata->compute, simdata->cond);
77 SIMIX_cond_wait(simdata->cond, simdata->mutex);
78 state = SIMIX_action_get_state(simdata->compute);
79 } while (state == SURF_ACTION_READY || state == SURF_ACTION_RUNNING);
80 SIMIX_unregister_action_to_condition(simdata->compute, simdata->cond);
81 self->simdata->waiting_action = NULL;
83 SIMIX_mutex_unlock(simdata->mutex);
86 if (SIMIX_action_get_state(task->simdata->compute) == SURF_ACTION_DONE) {
87 /* action ended, set comm and compute = NULL, the actions is already destroyed in the main function */
88 SIMIX_action_destroy(task->simdata->compute);
89 simdata->computation_amount = 0.0;
91 simdata->compute = NULL;
93 TRACE_msg_task_execute_end(task);
96 } else if (SIMIX_host_get_state(SIMIX_host_self()) == 0) {
97 /* action ended, set comm and compute = NULL, the actions is already destroyed in the main function */
98 SIMIX_action_destroy(task->simdata->compute);
100 simdata->compute = NULL;
102 TRACE_msg_task_execute_end(task);
104 MSG_RETURN(MSG_HOST_FAILURE);
106 /* action ended, set comm and compute = NULL, the actions is already destroyed in the main function */
107 SIMIX_action_destroy(task->simdata->compute);
108 simdata->comm = NULL;
109 simdata->compute = NULL;
111 TRACE_msg_task_execute_end(task);
113 MSG_RETURN(MSG_TASK_CANCELLED);
/* NOTE(review) -- MSG_parallel_task_create, from the visible statements:
 *  - allocates a zeroed s_simdata_task_t and a zeroed s_m_task_t with
 *    xbt_new0() and links them (task->simdata);
 *  - duplicates the name with xbt_strdup();
 *  - creates the task's condition and mutex, sets rate to -1.0 and refcount
 *    to 1, and clears the action/peer fields;
 *  - allocates host_list with host_nb entries and fills entry i with
 *    host_list[i]->simdata->smx_host;
 *  - stores the caller's computation_amount and communication_amount
 *    pointers as they are (no copy is made here, so the arrays presumably
 *    have to outlive the task -- confirm with the task destructor).
 * Not visible in this listing: the return type, the declaration of i, any
 * use of the data parameter and the return statement; verify against the
 * complete file.
 */
117 /** \ingroup m_task_management
118 * \brief Creates a new #m_task_t (a parallel one....).
120 * A constructor for #m_task_t taking six arguments and returning the
121 corresponding object.
122 * \param name a name for the object. It is for user-level information
124 * \param host_nb the number of hosts implied in the parallel task.
125 * \param host_list an array of \p host_nb m_host_t.
126 * \param computation_amount an array of \p host_nb
127 doubles. computation_amount[i] is the total number of operations
128 that have to be performed on host_list[i].
129 * \param communication_amount an array of \p host_nb* \p host_nb doubles.
130 * \param data a pointer to any data may want to attach to the new
131 object. It is for user-level information and can be NULL. It can
132 be retrieved with the function \ref MSG_task_get_data.
134 * \return The new corresponding object.
137 MSG_parallel_task_create(const char *name, int host_nb,
138 const m_host_t * host_list,
139 double *computation_amount,
140 double *communication_amount, void *data)
143 simdata_task_t simdata = xbt_new0(s_simdata_task_t, 1);
144 m_task_t task = xbt_new0(s_m_task_t, 1);
145 task->simdata = simdata;
148 task->name = xbt_strdup(name);
152 simdata->computation_amount = 0;
153 simdata->message_size = 0;
154 simdata->cond = SIMIX_cond_init();
155 simdata->mutex = SIMIX_mutex_init();
156 simdata->compute = NULL;
157 simdata->comm = NULL;
158 simdata->rate = -1.0;
159 simdata->refcount = 1;
160 simdata->sender = NULL;
161 simdata->receiver = NULL;
162 simdata->source = NULL;
164 simdata->host_nb = host_nb;
165 simdata->host_list = xbt_new0(smx_host_t, host_nb);
166 simdata->comp_amount = computation_amount;
167 simdata->comm_amount = communication_amount;
169 for (i = 0; i < host_nb; i++)
170 simdata->host_list[i] = host_list[i]->simdata->smx_host;
/* NOTE(review) -- MSG_parallel_task_execute, from the visible statements:
 *  - asserts that the task is not already being executed (no compute
 *    action, refcount 1) and that simdata->host_nb is non-zero;
 *  - locks simdata->mutex, calls SIMIX_action_parallel_execute() with the
 *    task name, host_nb, comp_amount, comm_amount, 1.0 and -1.0, records the
 *    action as the calling process' waiting_action and waits on
 *    simdata->cond while the action state is SURF_ACTION_READY or
 *    SURF_ACTION_RUNNING;
 *  - then destroys the action in each visible branch: SURF_ACTION_DONE
 *    resets computation_amount to 0.0, a host state of 0 yields
 *    MSG_HOST_FAILURE, the remaining case yields MSG_TASK_CANCELLED.
 * Unlike MSG_task_execute, no TRACE_* call is visible here.
 * The "the actions is already destroyed" remarks disagree with the
 * SIMIX_action_destroy() calls that follow them; TODO confirm.
 * One argument line of SIMIX_action_parallel_execute(), the assignment of
 * its result, the "do {" opener and the success return are not visible in
 * this listing; verify against the complete file.
 */
175 MSG_error_t MSG_parallel_task_execute(m_task_t task)
177 simdata_task_t simdata = NULL;
178 m_process_t self = MSG_process_self();
179 e_surf_action_state_t state = SURF_ACTION_NOT_IN_THE_SYSTEM;
182 simdata = task->simdata;
184 xbt_assert0((!simdata->compute)
185 && (task->simdata->refcount == 1),
186 "This task is executed somewhere else. Go fix your code!");
188 xbt_assert0(simdata->host_nb,
189 "This is not a parallel task. Go to hell.");
191 DEBUG1("Computing on %s", MSG_process_self()->simdata->m_host->name);
195 SIMIX_mutex_lock(simdata->mutex);
197 SIMIX_action_parallel_execute(task->name, simdata->host_nb,
199 simdata->comp_amount,
200 simdata->comm_amount, 1.0, -1.0);
202 self->simdata->waiting_action = simdata->compute;
203 SIMIX_register_action_to_condition(simdata->compute, simdata->cond);
205 SIMIX_cond_wait(simdata->cond, simdata->mutex);
206 state = SIMIX_action_get_state(task->simdata->compute);
207 } while (state == SURF_ACTION_READY || state == SURF_ACTION_RUNNING);
209 SIMIX_unregister_action_to_condition(simdata->compute, simdata->cond);
210 self->simdata->waiting_action = NULL;
213 SIMIX_mutex_unlock(simdata->mutex);
216 if (SIMIX_action_get_state(task->simdata->compute) == SURF_ACTION_DONE) {
217 /* action ended, set comm and compute = NULL, the actions is already destroyed in the main function */
218 SIMIX_action_destroy(task->simdata->compute);
219 simdata->computation_amount = 0.0;
220 simdata->comm = NULL;
221 simdata->compute = NULL;
223 } else if (SIMIX_host_get_state(SIMIX_host_self()) == 0) {
224 /* action ended, set comm and compute = NULL, the actions is already destroyed in the main function */
225 SIMIX_action_destroy(task->simdata->compute);
226 simdata->comm = NULL;
227 simdata->compute = NULL;
228 MSG_RETURN(MSG_HOST_FAILURE);
230 /* action ended, set comm and compute = NULL, the actions is already destroyed in the main function */
231 SIMIX_action_destroy(task->simdata->compute);
232 simdata->comm = NULL;
233 simdata->compute = NULL;
234 MSG_RETURN(MSG_TASK_CANCELLED);
/* NOTE(review) -- MSG_process_sleep, from the visible statements:
 *  - emits TRACE_msg_process_sleep_in() for the calling process;
 *  - calls SIMIX_action_sleep() on the host of the calling process;
 *  - creates and locks a private mutex, creates a private condition,
 *    records the sleep action as the process' waiting_action, registers it
 *    to the condition and waits while the action state is
 *    SURF_ACTION_READY or SURF_ACTION_RUNNING;
 *  - unregisters, unlocks, then destroys the condition and the mutex;
 *  - every visible exit path destroys act_sleep and emits
 *    TRACE_msg_process_sleep_out(); two of them return MSG_HOST_FAILURE.
 * The doc text speaks of "\a time" while the parameter is named nb_sec.
 * The host state is compared with SURF_RESOURCE_OFF here but with the
 * literal 0 in the execute functions; TODO confirm both mean the same.
 * The declarations of mutex and cond, the second argument of
 * SIMIX_action_sleep(), the "do {" opener, the else/closing braces and the
 * final return are not visible in this listing, so the exact branch
 * structure cannot be told from here; verify against the complete file.
 */
240 /** \ingroup msg_gos_functions
241 * \brief Sleep for the specified number of seconds
243 * Makes the current process sleep until \a time seconds have elapsed.
245 * \param nb_sec a number of second
247 MSG_error_t MSG_process_sleep(double nb_sec)
249 smx_action_t act_sleep;
250 m_process_t proc = MSG_process_self();
251 e_surf_action_state_t state = SURF_ACTION_NOT_IN_THE_SYSTEM;
256 TRACE_msg_process_sleep_in(MSG_process_self());
259 /* create action to sleep */
261 SIMIX_action_sleep(SIMIX_process_get_host(proc->simdata->s_process),
264 mutex = SIMIX_mutex_init();
265 SIMIX_mutex_lock(mutex);
267 /* create conditional and register action to it */
268 cond = SIMIX_cond_init();
270 proc->simdata->waiting_action = act_sleep;
271 SIMIX_register_action_to_condition(act_sleep, cond);
273 SIMIX_cond_wait(cond, mutex);
274 state = SIMIX_action_get_state(act_sleep);
275 } while (state == SURF_ACTION_READY || state == SURF_ACTION_RUNNING);
276 proc->simdata->waiting_action = NULL;
277 SIMIX_unregister_action_to_condition(act_sleep, cond);
278 SIMIX_mutex_unlock(mutex);
280 /* remove variables */
281 SIMIX_cond_destroy(cond);
282 SIMIX_mutex_destroy(mutex);
284 if (SIMIX_action_get_state(act_sleep) == SURF_ACTION_DONE) {
285 if (SIMIX_host_get_state(SIMIX_host_self()) == SURF_RESOURCE_OFF) {
286 SIMIX_action_destroy(act_sleep);
288 TRACE_msg_process_sleep_out(MSG_process_self());
290 MSG_RETURN(MSG_HOST_FAILURE);
293 SIMIX_action_destroy(act_sleep);
295 TRACE_msg_process_sleep_out(MSG_process_self());
297 MSG_RETURN(MSG_HOST_FAILURE);
300 SIMIX_action_destroy(act_sleep);
302 TRACE_msg_process_sleep_out(MSG_process_self());
307 /** \ingroup msg_gos_functions
308 * \brief Listen on \a channel and waits for receiving a task from \a host.
310 * It takes three parameters.
311 * \param task a memory location for storing a #m_task_t. It will
312 hold a task when this function will return. Thus \a task should not
313 be equal to \c NULL and \a *task should be equal to \c NULL. If one of
314 those two condition does not hold, there will be a warning message.
315 * \param channel the channel on which the agent should be
316 listening. This value has to be >=0 and < than the maximal
317 number of channels fixed with MSG_set_channel_number().
318 * \param host the host that is to be watched.
319 * \return #MSG_FATAL if \a task is equal to \c NULL, #MSG_WARNING
320 if \a *task is not equal to \c NULL, and #MSG_OK otherwise.
323 MSG_task_get_from_host(m_task_t * task, m_channel_t channel, m_host_t host)
325 return MSG_task_get_ext(task, channel, -1, host);
328 /** \ingroup msg_gos_functions
329 * \brief Listen on a channel and wait for receiving a task.
331 * It takes two parameters.
332 * \param task a memory location for storing a #m_task_t. It will
333 hold a task when this function will return. Thus \a task should not
334 be equal to \c NULL and \a *task should be equal to \c NULL. If one of
335 those two condition does not hold, there will be a warning message.
336 * \param channel the channel on which the agent should be
337 listening. This value has to be >=0 and < than the maximal
338 number of channels fixed with MSG_set_channel_number().
339 * \return #MSG_FATAL if \a task is equal to \c NULL, #MSG_WARNING
340 * if \a *task is not equal to \c NULL, and #MSG_OK otherwise.
342 MSG_error_t MSG_task_get(m_task_t * task, m_channel_t channel)
344 return MSG_task_get_with_timeout(task, channel, -1);
347 /** \ingroup msg_gos_functions
348 * \brief Listen on a channel and wait for receiving a task with a timeout.
350 * It takes three parameters.
351 * \param task a memory location for storing a #m_task_t. It will
352 hold a task when this function will return. Thus \a task should not
353 be equal to \c NULL and \a *task should be equal to \c NULL. If one of
354 those two condition does not hold, there will be a warning message.
355 * \param channel the channel on which the agent should be
356 listening. This value has to be >=0 and < than the maximal
357 number of channels fixed with MSG_set_channel_number().
358 * \param max_duration the maximum time to wait for a task before giving
359 up. In such a case, #MSG_TRANSFER_FAILURE will be returned, \a task
360 will not be modified and will still be
361 equal to \c NULL when returning.
362 * \return #MSG_FATAL if \a task is equal to \c NULL, #MSG_WARNING
363 if \a *task is not equal to \c NULL, and #MSG_OK otherwise.
366 MSG_task_get_with_timeout(m_task_t * task, m_channel_t channel,
369 return MSG_task_get_ext(task, channel, max_duration, NULL);
372 /** \defgroup msg_gos_functions MSG Operating System Functions
373 * \brief This section describes the functions that can be used
374 * by an agent for handling some task.
378 MSG_task_get_ext(m_task_t * task, m_channel_t channel, double timeout,
381 xbt_assert1((channel >= 0)
382 && (channel < msg_global->max_channel), "Invalid channel %d",
386 MSG_mailbox_get_task_ext(MSG_mailbox_get_by_channel
387 (MSG_host_self(), channel), task, host,
392 MSG_task_receive_from_host(m_task_t * task, const char *alias,
395 return MSG_task_receive_ext(task, alias, -1, host);
398 MSG_error_t MSG_task_receive(m_task_t * task, const char *alias)
400 return MSG_task_receive_with_timeout(task, alias, -1);
404 MSG_task_receive_with_timeout(m_task_t * task, const char *alias,
407 return MSG_task_receive_ext(task, alias, timeout, NULL);
411 MSG_task_receive_ext(m_task_t * task, const char *alias, double timeout,
415 ("MSG_task_receive_ext: Trying to receive a message on mailbox '%s'",
417 return MSG_mailbox_get_task_ext(MSG_mailbox_get_by_alias(alias), task,
/* NOTE(review) -- MSG_task_isend, from the visible statements:
 *  - looks up the mailbox designated by alias;
 *  - records the calling process and MSG_host_self() as sender and source
 *    of the task, and only then asserts that the task's refcount is 1 (so
 *    both fields are overwritten before the assertion can fire);
 *  - increments the task's refcount and msg_global->sent_msg, and stores
 *    the task as the process' waiting_task;
 *  - signals mailbox->cond, then returns the result of
 *    SIMIX_network_isend() on mailbox->rdv with the task's message_size and
 *    rate, the task itself as buffer and sizeof(void *) as its size.
 * The \param alias text describes a numeric channel although alias is a
 * const char * handed to MSG_mailbox_get_by_alias(); it looks copied from
 * the channel-based functions -- TODO confirm and reword.
 * The last argument line of SIMIX_network_isend() and whatever guards the
 * SIMIX_cond_signal() call are not visible in this listing; verify against
 * the complete file.
 */
421 /** \ingroup msg_gos_functions
422 * \brief Send a task on a channel.
424 * This function takes two parameter.
425 * \param task a #m_task_t to send on another location.
426 * \param alias the channel on which the agent should put this
427 task. This value has to be >=0 and < than the maximal number of
428 channels fixed with MSG_set_channel_number().
429 * \return the msg_comm_t communication.
431 msg_comm_t MSG_task_isend(m_task_t task, const char *alias)
433 simdata_task_t t_simdata = NULL;
434 m_process_t process = MSG_process_self();
435 msg_mailbox_t mailbox = MSG_mailbox_get_by_alias(alias);
439 /* FIXME: these functions are not tracable */
441 /* Prepare the task to send */
442 t_simdata = task->simdata;
443 t_simdata->sender = process;
444 t_simdata->source = MSG_host_self();
446 xbt_assert0(t_simdata->refcount == 1,
447 "This task is still being used somewhere else. You cannot send it now. Go fix your code!");
449 t_simdata->refcount++;
450 msg_global->sent_msg++;
451 process->simdata->waiting_task = task;
453 /* Send it by calling SIMIX network layer */
455 /* Kept for semantical compatibility with older implementation */
457 SIMIX_cond_signal(mailbox->cond);
459 return SIMIX_network_isend(mailbox->rdv, t_simdata->message_size,
460 t_simdata->rate, task, sizeof(void *),
/* NOTE(review) -- MSG_task_irecv, from the visible statements:
 *  - resolves alias twice with MSG_mailbox_get_by_alias(): once for the
 *    rendez-vous point (rdv) and once for the mailbox itself;
 *  - zeroes a local named comm with memset(); comm is not used by any
 *    other visible statement -- TODO confirm it is dead code;
 *  - asserts that no condition is attached to the mailbox and that task is
 *    not NULL;
 *  - returns the result of SIMIX_network_irecv(rdv, task, NULL).
 * The diagnostic text names "MSG_task_get()" and the assertion speaks of a
 * "channel" although this function is MSG_task_irecv and works on an alias.
 * The \param alias text describes a numeric channel although alias is a
 * const char *; it looks copied from the channel-based functions.
 * The declaration of comm, the condition guarding the diagnostic and the
 * name of the logging macro are not visible in this listing; verify against
 * the complete file.
 */
464 /** \ingroup msg_gos_functions
465 * \brief Listen on a channel for receiving a task from an asynchronous communication.
467 * It takes two parameters.
468 * \param task a memory location for storing a #m_task_t.
469 * \param alias the channel on which the agent should be
470 listening. This value has to be >=0 and < than the maximal
471 number of channels fixed with MSG_set_channel_number().
472 * \return the msg_comm_t communication.
474 msg_comm_t MSG_task_irecv(m_task_t * task, const char *alias)
477 smx_rdv_t rdv = MSG_mailbox_get_by_alias(alias)->rdv;
478 msg_mailbox_t mailbox = MSG_mailbox_get_by_alias(alias);
482 /* FIXME: these functions are not tracable */
484 memset(&comm, 0, sizeof(comm));
486 /* Kept for compatibility with older implementation */
487 xbt_assert1(!MSG_mailbox_get_cond(mailbox),
488 "A process is already blocked on this channel %s",
489 MSG_mailbox_get_alias(mailbox));
492 xbt_assert0(task, "Null pointer for the task storage");
496 ("MSG_task_get() was asked to write in a non empty task struct.");
498 /* Try to receive it by calling SIMIX network layer */
499 return SIMIX_network_irecv(rdv, task, NULL);
502 /** \ingroup msg_gos_functions
503 * \brief Test the status of a communication.
505 * It takes one parameter.
506 * \param comm the communication to test.
507 * \return the status of the communication:
508 * TRUE : the communication is completed
509 * FALSE: the communication is incompleted
510 * If the status is FALSE, don't forget to use MSG_process_sleep() after the test.
512 int MSG_comm_test(msg_comm_t comm)
514 return SIMIX_network_test(comm);
517 /** \ingroup msg_gos_functions
518 * \brief After received TRUE to MSG_comm_test(), the communication must be destroyed.
520 * It takes one parameter.
521 * \param comm the communication to destroy.
523 void MSG_comm_destroy(msg_comm_t comm)
525 if (!(comm->src_proc == SIMIX_process_self())) {
527 task = (m_task_t) SIMIX_communication_get_src_buf(comm);
528 task->simdata->refcount--;
530 SIMIX_communication_destroy(comm);
/* NOTE(review) -- MSG_comm_wait, from the visible statements:
 *  - starts from res = MSG_OK and calls SIMIX_network_wait(comm, timeout);
 *  - when the caller is not comm->src_proc, decrements the refcount of the
 *    task carried as source buffer of the communication;
 *  - a switch on e.category maps failures to MSG_HOST_FAILURE or
 *    MSG_TRANSFER_FAILURE and calls xbt_die() with "Unhandled SIMIX network
 *    exception" for the remaining categories.
 * The declaration of e, the construct that catches the exception, the case
 * labels (so which category maps to which code), any further case and the
 * final return are not visible in this listing; verify against the
 * complete file.
 */
533 /** \ingroup msg_gos_functions
534 * \brief Wait for the completion of a communication.
536 * It takes two parameters.
537 * \param comm the communication to wait.
538 * \param timeout Wait until the communication terminates or the timeout occurs
539 * \return MSG_error_t
541 MSG_error_t MSG_comm_wait(msg_comm_t comm, double timeout)
544 MSG_error_t res = MSG_OK;
546 SIMIX_network_wait(comm, timeout);
548 if (!(comm->src_proc == SIMIX_process_self())) {
550 task = (m_task_t) SIMIX_communication_get_src_buf(comm);
551 task->simdata->refcount--;
554 /* FIXME: these functions are not tracable */
557 switch (e.category) {
559 res = MSG_HOST_FAILURE;
562 res = MSG_TRANSFER_FAILURE;
568 xbt_die(bprintf("Unhandled SIMIX network exception: %s", e.msg));
575 /** \ingroup msg_gos_functions
576 * \brief This function is called by a sender and permit to wait for each communication
578 * It takes three parameters.
579 * \param comm a vector of communication
580 * \param nb_elem is the size of the comm vector
581 * \param timeout for each call of MSG_comm_wait
583 void MSG_comm_waitall(msg_comm_t * comm, int nb_elem, double timeout)
586 for (i = 0; i < nb_elem; i++) {
587 MSG_comm_wait(comm[i], timeout);
591 /** \ingroup msg_gos_functions
592 * \brief This function wait for the first completed communication
594 * It takes on parameter.
595 * \param comms a vector of communication
596 * \return the position of the completed communication from the xbt_dynar_t.
598 int MSG_comm_waitany(xbt_dynar_t comms)
600 return SIMIX_network_waitany(comms);
603 m_task_t MSG_comm_get_task(msg_comm_t comm)
605 xbt_assert0(comm, "Invalid parameters");
606 return (m_task_t) SIMIX_communication_get_src_buf(comm);
609 /** \ingroup msg_gos_functions
610 * \brief Put a task on a channel of an host and waits for the end of the
613 * This function is used for describing the behavior of an agent. It
614 * takes three parameter.
615 * \param task a #m_task_t to send on another location. This task
616 will not be usable anymore when the function will return. There is
617 no automatic task duplication and you have to save your parameters
618 before calling this function. Tasks are unique and once it has been
619 sent to another location, you should not access it anymore. You do
620 not need to call MSG_task_destroy() but to avoid using, as an
621 effect of inattention, this task anymore, you definitely should
622 renitialize it with #MSG_TASK_UNINITIALIZED. Note that this task
623 can be transfered iff it has been correctly created with
625 * \param dest the destination of the message
626 * \param channel the channel on which the agent should put this
627 task. This value has to be >=0 and < than the maximal number of
628 channels fixed with MSG_set_channel_number().
629 * \return #MSG_FATAL if \a task is not properly initialized and
630 * #MSG_OK otherwise. Returns #MSG_HOST_FAILURE if the host on which
631 * this function was called was shut down. Returns
632 * #MSG_TRANSFER_FAILURE if the transfer could not be properly done
633 * (network failure, dest failure)
635 MSG_error_t MSG_task_put(m_task_t task, m_host_t dest, m_channel_t channel)
637 return MSG_task_put_with_timeout(task, dest, channel, -1.0);
640 /** \ingroup msg_gos_functions
641 * \brief Does exactly the same as MSG_task_put but with a bounded transmition
647 MSG_task_put_bounded(m_task_t task, m_host_t dest, m_channel_t channel,
650 task->simdata->rate = maxrate;
651 return MSG_task_put(task, dest, channel);
654 /** \ingroup msg_gos_functions \brief Put a task on a channel of an
655 * host (with a timeout on the waiting of the destination host) and
656 * waits for the end of the transmission.
658 * This function is used for describing the behavior of an agent. It
659 * takes four parameter.
660 * \param task a #m_task_t to send on another location. This task
661 will not be usable anymore when the function will return. There is
662 no automatic task duplication and you have to save your parameters
663 before calling this function. Tasks are unique and once it has been
664 sent to another location, you should not access it anymore. You do
665 not need to call MSG_task_destroy() but to avoid using, as an
666 effect of inattention, this task anymore, you definitely should
667 renitialize it with #MSG_TASK_UNINITIALIZED. Note that this task
668 can be transfered iff it has been correctly created with
670 * \param dest the destination of the message
671 * \param channel the channel on which the agent should put this
672 task. This value has to be >=0 and < than the maximal number of
673 channels fixed with MSG_set_channel_number().
674 * \param timeout the maximum time to wait for a task before giving
675 up. In such a case, #MSG_TRANSFER_FAILURE will be returned, \a task
677 * \return #MSG_FATAL if \a task is not properly initialized and
678 #MSG_OK otherwise. Returns #MSG_HOST_FAILURE if the host on which
679 this function was called was shut down. Returns
680 #MSG_TRANSFER_FAILURE if the transfer could not be properly done
681 (network failure, dest failure, timeout...)
684 MSG_task_put_with_timeout(m_task_t task, m_host_t dest,
685 m_channel_t channel, double timeout)
687 xbt_assert1((channel >= 0)
688 && (channel < msg_global->max_channel), "Invalid channel %d",
692 MSG_mailbox_put_with_timeout(MSG_mailbox_get_by_channel
693 (dest, channel), task, timeout);
696 MSG_error_t MSG_task_send(m_task_t task, const char *alias)
698 DEBUG1("MSG_task_send: Trying to send a message on mailbox '%s'", alias);
699 return MSG_task_send_with_timeout(task, alias, -1);
704 MSG_task_send_bounded(m_task_t task, const char *alias, double maxrate)
706 task->simdata->rate = maxrate;
707 return MSG_task_send(task, alias);
712 MSG_task_send_with_timeout(m_task_t task, const char *alias,
715 return MSG_mailbox_put_with_timeout(MSG_mailbox_get_by_alias(alias),
/* NOTE(review) -- MSG_task_listen: the visible return statement yields a
 * non-zero value when the mailbox designated by alias is not empty and 0
 * otherwise. The braces and the lines between the signature and the return
 * are not visible in this listing; verify against the complete file.
 */
719 int MSG_task_listen(const char *alias)
723 return !MSG_mailbox_is_empty(MSG_mailbox_get_by_alias(alias));
/* NOTE(review) -- MSG_task_Iprobe, from the visible statements: asserts
 * that channel lies within [0, msg_global->max_channel) and evaluates
 * !MSG_mailbox_is_empty() on the mailbox bound to that channel on the
 * calling host. The tail of the assertion, the "return" keyword and any
 * statement between them are not visible in this listing; verify against
 * the complete file.
 */
726 /** \ingroup msg_gos_functions
727 * \brief Test whether there is a pending communication on a channel.
729 * It takes one parameter.
730 * \param channel the channel on which the agent should be
731 listening. This value has to be >=0 and < than the maximal
732 number of channels fixed with MSG_set_channel_number().
733 * \return 1 if there is a pending communication and 0 otherwise
735 int MSG_task_Iprobe(m_channel_t channel)
737 xbt_assert1((channel >= 0)
738 && (channel < msg_global->max_channel), "Invalid channel %d",
744 !MSG_mailbox_is_empty(MSG_mailbox_get_by_channel
745 (MSG_host_self(), channel));
/* NOTE(review) -- MSG_task_probe_from_host, from the visible statements:
 * asserts that channel lies within [0, msg_global->max_channel) and calls
 * MSG_mailbox_get_count_host_waiting_tasks() on the mailbox bound to that
 * channel on the calling host. channel is declared int here whereas the
 * sibling functions take an m_channel_t -- TODO confirm this is intended.
 * The tail of the assertion, the "return" keyword and the last argument of
 * the call are not visible in this listing; verify against the complete
 * file.
 */
748 /** \ingroup msg_gos_functions
750 * \brief Return the number of tasks waiting to be received on a \a
751 channel and sent by \a host.
753 * It takes two parameters.
754 * \param channel the channel on which the agent should be
755 listening. This value has to be >=0 and < than the maximal
756 number of channels fixed with MSG_set_channel_number().
757 * \param host the host that is to be watched.
758 * \return the number of tasks waiting to be received on \a channel
761 int MSG_task_probe_from_host(int channel, m_host_t host)
763 xbt_assert1((channel >= 0)
764 && (channel < msg_global->max_channel), "Invalid channel %d",
770 MSG_mailbox_get_count_host_waiting_tasks(MSG_mailbox_get_by_channel
771 (MSG_host_self(), channel),
/* NOTE(review) -- MSG_task_listen_from_host: the only visible statement is
 * a call to MSG_mailbox_get_count_host_waiting_tasks() whose first argument
 * starts with MSG_mailbox_get_by_alias. The remaining arguments, the
 * "return" keyword and the lines between the signature and the call are
 * not visible in this listing; verify against the complete file.
 */
776 int MSG_task_listen_from_host(const char *alias, m_host_t host)
781 MSG_mailbox_get_count_host_waiting_tasks(MSG_mailbox_get_by_alias
/* NOTE(review) -- MSG_task_probe_from, from the visible statements: asserts
 * that channel lies within [0, msg_global->max_channel), looks at the head
 * of the mailbox bound to that channel on the calling host and returns the
 * PID of task->simdata->sender. The declaration of task, the test applied
 * to the mailbox head and the branch taken when the mailbox is empty (-1
 * according to the doc text) are not visible in this listing; verify
 * against the complete file.
 */
785 /** \ingroup msg_gos_functions
786 * \brief Test whether there is a pending communication on a channel, and who sent it.
788 * It takes one parameter.
789 * \param channel the channel on which the agent should be
790 listening. This value has to be >=0 and < than the maximal
791 number of channels fixed with MSG_set_channel_number().
792 * \return -1 if there is no pending communication and the PID of the process who sent it otherwise
794 int MSG_task_probe_from(m_channel_t channel)
800 xbt_assert1((channel >= 0)
801 && (channel < msg_global->max_channel), "Invalid channel %d",
806 MSG_mailbox_get_head(MSG_mailbox_get_by_channel
807 (MSG_host_self(), channel))))
810 return MSG_process_get_PID(task->simdata->sender);
/* NOTE(review) -- MSG_task_listen_from, from the visible statements: stores
 * the head of the mailbox designated by alias in task and returns the PID
 * of task->simdata->sender. The declaration of task, the test applied to
 * the mailbox head and the branch taken when the mailbox is empty are not
 * visible in this listing; verify against the complete file.
 */
813 int MSG_task_listen_from(const char *alias)
820 (task = MSG_mailbox_get_head(MSG_mailbox_get_by_alias(alias))))
823 return MSG_process_get_PID(task->simdata->sender);
/* NOTE(review) -- MSG_channel_select_from, from the visible statements:
 *  - asserts that channel lies within [0, msg_global->max_channel);
 *  - when timeout == 0.0, stores MSG_task_probe_from(channel) in *PID;
 *  - otherwise, while the mailbox bound to channel on the calling host is
 *    empty: locks h_simdata->mutex, asserts that no condition is attached
 *    to the mailbox, attaches a fresh condition, waits on it (with or
 *    without timeout), destroys the condition and unlocks; a host state of
 *    0 then yields MSG_HOST_FAILURE, else the mailbox condition is reset
 *    to NULL;
 *  - finally stores in *PID the PID of the sender of the task at the head
 *    of the mailbox.
 * The MSG_HOST_FAILURE return sits after SIMIX_cond_destroy(cond) but
 * before MSG_mailbox_set_cond(mailbox, NULL): on that path the mailbox
 * looks like it keeps pointing at the destroyed condition -- TODO confirm.
 * The doc text lists the parameters as channel, PID, timeout whereas the
 * signature order is channel, timeout, PID.
 * The return type, the declarations of h, t and cond, the test choosing
 * between the two wait calls, the loop exit logic and the final return are
 * not visible in this listing; verify against the complete file.
 */
826 /** \ingroup msg_gos_functions
827 * \brief Wait for at most \a max_duration second for a task reception
830 * \a PID is updated with the PID of the first process that triggered this event if any.
832 * It takes three parameters:
833 * \param channel the channel on which the agent should be
834 listening. This value has to be >=0 and < than the maximal.
835 number of channels fixed with MSG_set_channel_number().
836 * \param PID a memory location for storing an int.
837 * \param timeout the maximum time to wait for a task before
838 giving up. In the case of a reception, *\a PID will be updated
839 with the PID of the first process to send a task.
840 * \return #MSG_HOST_FAILURE if the host is shut down in the meantime
841 and #MSG_OK otherwise.
844 MSG_channel_select_from(m_channel_t channel, double timeout, int *PID)
847 simdata_host_t h_simdata = NULL;
851 msg_mailbox_t mailbox;
853 xbt_assert1((channel >= 0)
854 && (channel < msg_global->max_channel), "Invalid channel %d",
861 if (timeout == 0.0) {
862 *PID = MSG_task_probe_from(channel);
867 h_simdata = h->simdata;
869 mailbox = MSG_mailbox_get_by_channel(MSG_host_self(), channel);
871 while (MSG_mailbox_is_empty(mailbox)) {
878 SIMIX_mutex_lock(h_simdata->mutex);
880 xbt_assert1(!MSG_mailbox_get_cond(mailbox),
881 "A process is already blocked on this channel %d",
884 cond = SIMIX_cond_init();
886 MSG_mailbox_set_cond(mailbox, cond);
889 SIMIX_cond_wait_timeout(cond, h_simdata->mutex, timeout);
891 SIMIX_cond_wait(cond, h_simdata->mutex);
894 SIMIX_cond_destroy(cond);
895 SIMIX_mutex_unlock(h_simdata->mutex);
897 if (SIMIX_host_get_state(h_simdata->smx_host) == 0) {
898 MSG_RETURN(MSG_HOST_FAILURE);
901 MSG_mailbox_set_cond(mailbox, NULL);
905 if (NULL == (t = MSG_mailbox_get_head(mailbox)))
910 *PID = MSG_process_get_PID(t->simdata->sender);
918 MSG_error_t MSG_alias_select_from(const char *alias, double timeout,
922 simdata_host_t h_simdata = NULL;
926 msg_mailbox_t mailbox;
932 if (timeout == 0.0) {
933 *PID = MSG_task_listen_from(alias);
938 h_simdata = h->simdata;
940 DEBUG2("Probing on alias %s (%s)", alias, h->name);
942 mailbox = MSG_mailbox_get_by_alias(alias);
944 while (MSG_mailbox_is_empty(mailbox)) {
951 SIMIX_mutex_lock(h_simdata->mutex);
953 xbt_assert1(!MSG_mailbox_get_cond(mailbox),
954 "A process is already blocked on this alias %s", alias);
956 cond = SIMIX_cond_init();
958 MSG_mailbox_set_cond(mailbox, cond);
961 SIMIX_cond_wait_timeout(cond, h_simdata->mutex, timeout);
963 SIMIX_cond_wait(cond, h_simdata->mutex);
966 SIMIX_cond_destroy(cond);
967 SIMIX_mutex_unlock(h_simdata->mutex);
969 if (SIMIX_host_get_state(h_simdata->smx_host) == 0) {
970 MSG_RETURN(MSG_HOST_FAILURE);
973 MSG_mailbox_set_cond(mailbox, NULL);
977 if (NULL == (t = MSG_mailbox_get_head(mailbox)))
982 *PID = MSG_process_get_PID(t->simdata->sender);