1 /* Copyright (c) 2004, 2005, 2006, 2007, 2008, 2009, 2010. The SimGrid Team.
2 * All rights reserved. */
4 /* This program is free software; you can redistribute it and/or modify it
5 * under the terms of the license (GNU LGPL) which comes with this package. */
7 #include "msg/private.h"
8 #include "simix/private.h"
9 #include "xbt/sysdep.h"
15 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(msg_gos, msg,
16 "Logging specific to MSG (gos)");
18 /** \ingroup msg_gos_functions
20 * \brief Return the last value returned by a MSG function (except
/* Return the MSG error code obtained through the PROCESS_GET_ERRNO() macro. */
23 MSG_error_t MSG_get_errno(void)
25 return PROCESS_GET_ERRNO();
28 /** \ingroup msg_gos_functions
29 * \brief Executes a task and waits for its termination.
31 * This function is used for describing the behavior of an agent. It
32 * takes only one parameter.
33 * \param task a #m_task_t to execute on the location on which the
35 * \return #MSG_FATAL if \a task is not properly initialized and
/* Execute a sequential task on the current host and block until its SIMIX
 * compute action is no longer READY or RUNNING.
 * task: task to run; it must have host_nb == 0, no compute action yet and a
 *       refcount of exactly 1 (all three conditions are asserted below).
 * Visible results: MSG_HOST_FAILURE when the action did not end as
 * SURF_ACTION_DONE and the current host state is 0; MSG_TASK_CANCELLED on the
 * last path.
 * NOTE(review): several lines of this function (braces, the assignment that
 * receives the result of SIMIX_action_execute, the remaining returns) are not
 * visible in this view. */
38 MSG_error_t MSG_task_execute(m_task_t task)
40 simdata_task_t simdata = NULL;
41 m_process_t self = MSG_process_self();
42 e_surf_action_state_t state = SURF_ACTION_NOT_IN_THE_SYSTEM;
45 simdata = task->simdata;
/* Only sequential tasks are accepted here: host_nb must be 0. */
47 xbt_assert0(simdata->host_nb == 0,
48 "This is a parallel task. Go to hell.");
51 TRACE_msg_task_execute_start(task);
/* The task must have no compute action and must be referenced exactly once. */
54 xbt_assert1((!simdata->compute) && (task->simdata->refcount == 1),
55 "This task is executed somewhere else. Go fix your code! %d",
56 task->simdata->refcount);
58 DEBUG1("Computing on %s", MSG_process_self()->simdata->m_host->name);
/* A task with a zero computation amount is traced as finished right away. */
60 if (simdata->computation_amount == 0) {
62 TRACE_msg_task_execute_end(task);
/* The task mutex is held while the action is set up and waited for. */
67 SIMIX_mutex_lock(simdata->mutex);
/* Start a SIMIX execution named after the task on the current host. */
69 SIMIX_action_execute(SIMIX_host_self(), task->name,
70 simdata->computation_amount);
71 SIMIX_action_set_priority(simdata->compute, simdata->priority);
73 /* changed to waiting action since we are always waiting one action (execute, communicate or sleep) */
74 self->simdata->waiting_action = simdata->compute;
75 SIMIX_register_action_to_condition(simdata->compute, simdata->cond);
/* Wait on the task condition until the action is neither READY nor RUNNING. */
77 SIMIX_cond_wait(simdata->cond, simdata->mutex);
78 state = SIMIX_action_get_state(simdata->compute);
79 } while (state == SURF_ACTION_READY || state == SURF_ACTION_RUNNING);
80 SIMIX_unregister_action_to_condition(simdata->compute, simdata->cond);
81 self->simdata->waiting_action = NULL;
83 SIMIX_mutex_unlock(simdata->mutex);
86 if (SIMIX_action_get_state(task->simdata->compute) == SURF_ACTION_DONE) {
87 /* action completed: destroy it here, zero the remaining amount and clear the compute pointer */
88 SIMIX_action_destroy(task->simdata->compute);
89 simdata->computation_amount = 0.0;
91 simdata->compute = NULL;
93 TRACE_msg_task_execute_end(task);
96 } else if (SIMIX_host_get_state(SIMIX_host_self()) == 0) {
97 /* action not DONE and current host state is 0: destroy the action and report a host failure */
98 SIMIX_action_destroy(task->simdata->compute);
100 simdata->compute = NULL;
102 TRACE_msg_task_execute_end(task);
104 MSG_RETURN(MSG_HOST_FAILURE);
106 /* last path: destroy the action, clear comm and compute, report a cancelled task */
107 SIMIX_action_destroy(task->simdata->compute);
108 simdata->comm = NULL;
109 simdata->compute = NULL;
111 TRACE_msg_task_execute_end(task);
113 MSG_RETURN(MSG_TASK_CANCELLED);
117 /** \ingroup m_task_management
118 * \brief Creates a new #m_task_t (a parallel one....).
120 * A constructor for #m_task_t taking six arguments and returning the
121 corresponding object.
122 * \param name a name for the object. It is for user-level information
124 * \param host_nb the number of hosts implied in the parallel task.
125 * \param host_list an array of \p host_nb m_host_t.
126 * \param computation_amount an array of \p host_nb
127 doubles. computation_amount[i] is the total number of operations
128 that have to be performed on host_list[i].
129 * \param communication_amount an array of \p host_nb* \p host_nb doubles.
130 * \param data a pointer to any data you may want to attach to the new
131 object. It is for user-level information and can be NULL. It can
132 be retrieved with the function \ref MSG_task_get_data.
134 * \return The new corresponding object.
/* Allocate and initialise a parallel task spanning host_nb hosts.
 * name:                 copied with xbt_strdup into the task.
 * host_nb, host_list:   the hosts; each one is translated to its SIMIX host.
 * computation_amount,
 * communication_amount: stored by pointer, not copied, so the task keeps
 *                       referring to the arrays supplied by the caller.
 * NOTE(review): the return type line, the declaration of i, the use of the
 * data argument and the return statement are not visible in this view. */
137 MSG_parallel_task_create(const char *name, int host_nb,
138 const m_host_t * host_list,
139 double *computation_amount,
140 double *communication_amount, void *data)
/* Zero-initialised private data and task, linked together. */
143 simdata_task_t simdata = xbt_new0(s_simdata_task_t, 1);
144 m_task_t task = xbt_new0(s_m_task_t, 1);
145 task->simdata = simdata;
148 task->name = xbt_strdup(name);
/* Bookkeeping: fresh condition and mutex, no action in flight, rate set to
 * -1.0, a single reference and no sender, receiver or source yet. */
152 simdata->computation_amount = 0;
153 simdata->message_size = 0;
154 simdata->cond = SIMIX_cond_init();
155 simdata->mutex = SIMIX_mutex_init();
156 simdata->compute = NULL;
157 simdata->comm = NULL;
158 simdata->rate = -1.0;
159 simdata->refcount = 1;
160 simdata->sender = NULL;
161 simdata->receiver = NULL;
162 simdata->source = NULL;
/* Fields specific to parallel tasks. */
164 simdata->host_nb = host_nb;
165 simdata->host_list = xbt_new0(smx_host_t, host_nb);
166 simdata->comp_amount = computation_amount;
167 simdata->comm_amount = communication_amount;
/* Record the SIMIX host behind every MSG host of the list. */
169 for (i = 0; i < host_nb; i++)
170 simdata->host_list[i] = host_list[i]->simdata->smx_host;
/* Execute a parallel task and block until its SIMIX action is no longer
 * READY or RUNNING.
 * task: task to run; it must have no compute action, a refcount of exactly 1
 *       and a non-zero host_nb (asserted below).
 * Visible results: MSG_HOST_FAILURE when the action did not end as
 * SURF_ACTION_DONE and the current host state is 0; MSG_TASK_CANCELLED on the
 * last path.
 * NOTE(review): braces, the assignment receiving the result of
 * SIMIX_action_parallel_execute, one of its arguments and the return of the
 * DONE path are not visible in this view. */
175 MSG_error_t MSG_parallel_task_execute(m_task_t task)
177 simdata_task_t simdata = NULL;
178 m_process_t self = MSG_process_self();
179 e_surf_action_state_t state = SURF_ACTION_NOT_IN_THE_SYSTEM;
182 simdata = task->simdata;
/* The task must have no compute action and must be referenced exactly once. */
184 xbt_assert0((!simdata->compute)
185 && (task->simdata->refcount == 1),
186 "This task is executed somewhere else. Go fix your code!");
/* Only parallel tasks are accepted here: host_nb must be non-zero. */
188 xbt_assert0(simdata->host_nb,
189 "This is not a parallel task. Go to hell.");
191 DEBUG1("Computing on %s", MSG_process_self()->simdata->m_host->name);
/* The task mutex is held while the action is set up and waited for. */
195 SIMIX_mutex_lock(simdata->mutex);
197 SIMIX_action_parallel_execute(task->name, simdata->host_nb,
199 simdata->comp_amount,
200 simdata->comm_amount, 1.0, -1.0);
202 self->simdata->waiting_action = simdata->compute;
203 SIMIX_register_action_to_condition(simdata->compute, simdata->cond);
/* Wait on the task condition until the action is neither READY nor RUNNING. */
205 SIMIX_cond_wait(simdata->cond, simdata->mutex);
206 state = SIMIX_action_get_state(task->simdata->compute);
207 } while (state == SURF_ACTION_READY || state == SURF_ACTION_RUNNING);
209 SIMIX_unregister_action_to_condition(simdata->compute, simdata->cond);
210 self->simdata->waiting_action = NULL;
213 SIMIX_mutex_unlock(simdata->mutex);
216 if (SIMIX_action_get_state(task->simdata->compute) == SURF_ACTION_DONE) {
217 /* action completed: destroy it here, zero the amount and clear comm and compute */
218 SIMIX_action_destroy(task->simdata->compute);
219 simdata->computation_amount = 0.0;
220 simdata->comm = NULL;
221 simdata->compute = NULL;
223 } else if (SIMIX_host_get_state(SIMIX_host_self()) == 0) {
224 /* action not DONE and current host state is 0: destroy the action and report a host failure */
225 SIMIX_action_destroy(task->simdata->compute);
226 simdata->comm = NULL;
227 simdata->compute = NULL;
228 MSG_RETURN(MSG_HOST_FAILURE);
230 /* last path: destroy the action, clear comm and compute, report a cancelled task */
231 SIMIX_action_destroy(task->simdata->compute);
232 simdata->comm = NULL;
233 simdata->compute = NULL;
234 MSG_RETURN(MSG_TASK_CANCELLED);
240 /** \ingroup msg_gos_functions
241 * \brief Sleep for the specified number of seconds
243 * Makes the current process sleep until \a nb_sec seconds have elapsed.
245 * \param nb_sec a number of seconds
/* Put the calling process to sleep by waiting on a SIMIX sleep action.
 * A private mutex and condition are created for the wait and destroyed
 * afterwards; every visible exit path destroys the sleep action and traces
 * the end of the sleep.
 * Visible results: MSG_HOST_FAILURE when the action is DONE but the current
 * host is SURF_RESOURCE_OFF, and MSG_HOST_FAILURE again on the second
 * failure path.
 * NOTE(review): the declarations of mutex and cond, the assignment of
 * act_sleep, the branch structure and the final return are not visible in
 * this view. */
247 MSG_error_t MSG_process_sleep(double nb_sec)
249 smx_action_t act_sleep;
250 m_process_t proc = MSG_process_self();
251 e_surf_action_state_t state = SURF_ACTION_NOT_IN_THE_SYSTEM;
256 TRACE_msg_process_sleep_in(MSG_process_self());
259 /* create action to sleep */
261 SIMIX_action_sleep(SIMIX_process_get_host(proc->simdata->s_process),
264 mutex = SIMIX_mutex_init();
265 SIMIX_mutex_lock(mutex);
267 /* create conditional and register action to it */
268 cond = SIMIX_cond_init();
270 proc->simdata->waiting_action = act_sleep;
271 SIMIX_register_action_to_condition(act_sleep, cond);
/* Wait on the condition until the action is neither READY nor RUNNING. */
273 SIMIX_cond_wait(cond, mutex);
274 state = SIMIX_action_get_state(act_sleep);
275 } while (state == SURF_ACTION_READY || state == SURF_ACTION_RUNNING);
276 proc->simdata->waiting_action = NULL;
277 SIMIX_unregister_action_to_condition(act_sleep, cond);
278 SIMIX_mutex_unlock(mutex);
280 /* remove variables */
281 SIMIX_cond_destroy(cond);
282 SIMIX_mutex_destroy(mutex);
284 if (SIMIX_action_get_state(act_sleep) == SURF_ACTION_DONE) {
/* The sleep finished, but the host may have been switched off meanwhile. */
285 if (SIMIX_host_get_state(SIMIX_host_self()) == SURF_RESOURCE_OFF) {
286 SIMIX_action_destroy(act_sleep);
288 TRACE_msg_process_sleep_out(MSG_process_self());
290 MSG_RETURN(MSG_HOST_FAILURE);
293 SIMIX_action_destroy(act_sleep);
295 TRACE_msg_process_sleep_out(MSG_process_self());
297 MSG_RETURN(MSG_HOST_FAILURE);
300 SIMIX_action_destroy(act_sleep);
302 TRACE_msg_process_sleep_out(MSG_process_self());
307 /** \ingroup msg_gos_functions
308 * \brief Listen on \a channel and waits for receiving a task from \a host.
310 * It takes three parameters.
311 * \param task a memory location for storing a #m_task_t. It will
312 hold a task when this function will return. Thus \a task should not
313 be equal to \c NULL and \a *task should be equal to \c NULL. If one of
314 those two conditions does not hold, there will be a warning message.
315 * \param channel the channel on which the agent should be
316 listening. This value has to be >=0 and < than the maximal
317 number of channels fixed with MSG_set_channel_number().
318 * \param host the host that is to be watched.
319 * \return #MSG_FATAL if \a task is equal to \c NULL, #MSG_WARNING
320 if \a *task is not equal to \c NULL, and #MSG_OK otherwise.
/* Thin wrapper: forwards to MSG_task_get_ext() with a timeout argument of -1
 * and the given host. */
323 MSG_task_get_from_host(m_task_t * task, m_channel_t channel, m_host_t host)
325 return MSG_task_get_ext(task, channel, -1, host);
328 /** \ingroup msg_gos_functions
329 * \brief Listen on a channel and wait for receiving a task.
331 * It takes two parameters.
332 * \param task a memory location for storing a #m_task_t. It will
333 hold a task when this function will return. Thus \a task should not
334 be equal to \c NULL and \a *task should be equal to \c NULL. If one of
335 those two conditions does not hold, there will be a warning message.
336 * \param channel the channel on which the agent should be
337 listening. This value has to be >=0 and < than the maximal
338 number of channels fixed with MSG_set_channel_number().
339 * \return #MSG_FATAL if \a task is equal to \c NULL, #MSG_WARNING
340 * if \a *task is not equal to \c NULL, and #MSG_OK otherwise.
/* Thin wrapper: forwards to MSG_task_get_with_timeout() with a timeout
 * argument of -1. */
342 MSG_error_t MSG_task_get(m_task_t * task, m_channel_t channel)
344 return MSG_task_get_with_timeout(task, channel, -1);
347 /** \ingroup msg_gos_functions
348 * \brief Listen on a channel and wait for receiving a task with a timeout.
350 * It takes three parameters.
351 * \param task a memory location for storing a #m_task_t. It will
352 hold a task when this function will return. Thus \a task should not
353 be equal to \c NULL and \a *task should be equal to \c NULL. If one of
354 those two conditions does not hold, there will be a warning message.
355 * \param channel the channel on which the agent should be
356 listening. This value has to be >=0 and < than the maximal
357 number of channels fixed with MSG_set_channel_number().
358 * \param max_duration the maximum time to wait for a task before giving
359 up. In such a case, #MSG_TRANSFER_FAILURE will be returned, \a task
360 will not be modified and will still be
361 equal to \c NULL when returning.
362 * \return #MSG_FATAL if \a task is equal to \c NULL, #MSG_WARNING
363 if \a *task is not equal to \c NULL, and #MSG_OK otherwise.
/* Thin wrapper: forwards to MSG_task_get_ext() with max_duration as the
 * timeout and a NULL host.
 * NOTE(review): the rest of the parameter list is not visible in this view. */
366 MSG_task_get_with_timeout(m_task_t * task, m_channel_t channel,
369 return MSG_task_get_ext(task, channel, max_duration, NULL);
372 /** \defgroup msg_gos_functions MSG Operating System Functions
373 * \brief This section describes the functions that can be used
374 * by an agent for handling some task.
/* Channel-based reception: checks that channel lies in
 * [0, msg_global->max_channel) and then calls MSG_mailbox_get_task_ext() on
 * the mailbox of the current host for that channel.
 * NOTE(review): the end of the parameter list, the last assert argument and
 * the tail of the call are not visible in this view. */
378 MSG_task_get_ext(m_task_t * task, m_channel_t channel, double timeout,
381 xbt_assert1((channel >= 0)
382 && (channel < msg_global->max_channel), "Invalid channel %d",
386 MSG_mailbox_get_task_ext(MSG_mailbox_get_by_channel
387 (MSG_host_self(), channel), task, host,
/* Thin wrapper: forwards to MSG_task_receive_ext() with a timeout argument of
 * -1 and the given host.
 * NOTE(review): the rest of the parameter list is not visible in this view. */
392 MSG_task_receive_from_host(m_task_t * task, const char *alias,
395 return MSG_task_receive_ext(task, alias, -1, host);
/* Thin wrapper: forwards to MSG_task_receive_with_timeout() with a timeout
 * argument of -1. */
398 MSG_error_t MSG_task_receive(m_task_t * task, const char *alias)
400 return MSG_task_receive_with_timeout(task, alias, -1);
/* Thin wrapper: forwards to MSG_task_receive_ext() with the given timeout and
 * a NULL host.
 * NOTE(review): the rest of the parameter list is not visible in this view. */
404 MSG_task_receive_with_timeout(m_task_t * task, const char *alias,
407 return MSG_task_receive_ext(task, alias, timeout, NULL);
/* Alias-based reception: emits a log message naming the mailbox, then returns
 * the result of MSG_mailbox_get_task_ext() on the mailbox found by alias.
 * NOTE(review): the logging macro name, the end of the parameter list and the
 * tail of the call are not visible in this view. */
411 MSG_task_receive_ext(m_task_t * task, const char *alias, double timeout,
415 ("MSG_task_receive_ext: Trying to receive a message on mailbox '%s'",
417 return MSG_mailbox_get_task_ext(MSG_mailbox_get_by_alias(alias), task,
421 /** \ingroup msg_gos_functions
422 * \brief Send a task on a channel.
424 * This function takes two parameters.
425 * \param task a #m_task_t to send on another location.
426 * \param alias the alias of the mailbox on which the agent should put this
427 task. It is a string resolved with MSG_mailbox_get_by_alias(), not a
428 channel number.
429 * \return the msg_comm_t communication.
/* Start an asynchronous send of task to the mailbox designated by alias and
 * return the handle produced by SIMIX_network_isend().
 * Side effects visible here: the task sender and source are set to the
 * calling process and host, the task refcount goes from 1 to 2, the global
 * sent_msg counter is incremented, the process waiting_task is set and the
 * mailbox condition is signalled.
 * NOTE(review): the last argument(s) of SIMIX_network_isend are not visible
 * in this view. */
431 msg_comm_t MSG_task_isend(m_task_t task, const char *alias)
433 simdata_task_t t_simdata = NULL;
434 m_process_t process = MSG_process_self();
435 msg_mailbox_t mailbox = MSG_mailbox_get_by_alias(alias);
439 /* FIXME: these functions are not traceable */
441 /* Prepare the task to send */
442 t_simdata = task->simdata;
443 t_simdata->sender = process;
444 t_simdata->source = MSG_host_self();
/* A task can only be sent while it is referenced exactly once. */
446 xbt_assert0(t_simdata->refcount == 1,
447 "This task is still being used somewhere else. You cannot send it now. Go fix your code!");
449 t_simdata->refcount++;
450 msg_global->sent_msg++;
451 process->simdata->waiting_task = task;
453 /* Send it by calling SIMIX network layer */
455 /* Kept for semantic compatibility with older implementation */
457 SIMIX_cond_signal(mailbox->cond);
459 return SIMIX_network_isend(mailbox->rdv, t_simdata->message_size,
460 t_simdata->rate, task, sizeof(void *),
464 /** \ingroup msg_gos_functions
465 * \brief Listen on a channel for receiving a task from an asynchronous communication.
467 * It takes two parameters.
468 * \param task a memory location for storing a #m_task_t.
469 * \param alias the alias of the mailbox on which the agent should be
470 listening. It is a string resolved with MSG_mailbox_get_by_alias(),
471 not a channel number.
472 * \return the msg_comm_t communication.
/* Start an asynchronous reception on the mailbox designated by alias and
 * return the handle produced by SIMIX_network_irecv().
 * task: storage for the received task; asserted to be non-NULL.
 * The mailbox must have no condition set (asserted below).
 * NOTE(review): the declaration of comm and the condition guarding the
 * message about a non empty task struct are not visible in this view. */
474 msg_comm_t MSG_task_irecv(m_task_t * task, const char *alias)
/* The mailbox is looked up twice: once for its rendez-vous point, once for
 * the mailbox itself. */
477 smx_rdv_t rdv = MSG_mailbox_get_by_alias(alias)->rdv;
478 msg_mailbox_t mailbox = MSG_mailbox_get_by_alias(alias);
482 /* FIXME: these functions are not traceable */
484 memset(&comm, 0, sizeof(comm));
486 /* Kept for compatibility with older implementation */
487 xbt_assert1(!MSG_mailbox_get_cond(mailbox),
488 "A process is already blocked on this channel %s",
489 MSG_mailbox_get_alias(mailbox));
492 xbt_assert0(task, "Null pointer for the task storage");
496 ("MSG_task_get() was asked to write in a non empty task struct.");
498 /* Try to receive it by calling SIMIX network layer */
499 return SIMIX_network_irecv(rdv, task, NULL);
502 /** \ingroup msg_gos_functions
503 * \brief Test the status of a communication.
505 * It takes one parameter.
506 * \param comm the communication to test.
507 * \return the status of the communication:
508 * TRUE : the communication is completed
509 * FALSE: the communication is not completed yet
510 * If the status is FALSE, don't forget to use MSG_process_sleep() after the test.
/* Thin wrapper: returns the result of SIMIX_network_test() on comm. */
512 int MSG_comm_test(msg_comm_t comm)
514 return SIMIX_network_test(comm);
517 /** \ingroup msg_gos_functions
518 * \brief Once MSG_comm_test() has returned TRUE, the communication must be destroyed with this function.
520 * It takes one parameter.
521 * \param comm the communication to destroy.
/* Destroy a communication handle.
 * When the caller is not the source process of comm, the refcount of the task
 * found in the source buffer is decremented first.
 * NOTE(review): the declaration of task and the closing brace of the if are
 * not visible in this view. */
523 void MSG_comm_destroy(msg_comm_t comm)
525 if (!(comm->src_proc == SIMIX_process_self())) {
527 task = (m_task_t) SIMIX_communication_get_src_buf(comm);
528 task->simdata->refcount--;
530 SIMIX_communication_destroy(comm);
533 /** \ingroup msg_gos_functions
534 * \brief Wait for the completion of a communication.
536 * It takes two parameters.
537 * \param comm the communication to wait.
538 * \param timeout Wait until the communication terminates or the timeout occurs
539 * \return MSG_error_t
/* Wait for comm through SIMIX_network_wait() with the given timeout, then
 * destroy it.
 * When the caller is not the source process of comm, the refcount of the task
 * found in the source buffer is decremented before the destruction.
 * res starts as MSG_OK; the switch on e.category sets it to MSG_HOST_FAILURE
 * or MSG_TRANSFER_FAILURE, and any other category ends in xbt_die().
 * NOTE(review): the declarations of e and task, the exception-handling
 * construct, the case labels and the return are not visible in this view. */
541 MSG_error_t MSG_comm_wait(msg_comm_t comm, double timeout)
544 MSG_error_t res = MSG_OK;
546 SIMIX_network_wait(comm, timeout);
548 if (!(comm->src_proc == SIMIX_process_self())) {
550 task = (m_task_t) SIMIX_communication_get_src_buf(comm);
551 task->simdata->refcount--;
553 SIMIX_communication_destroy(comm);
555 /* FIXME: these functions are not traceable */
558 switch (e.category) {
560 res = MSG_HOST_FAILURE;
563 res = MSG_TRANSFER_FAILURE;
569 xbt_die(bprintf("Unhandled SIMIX network exception: %s", e.msg));
576 /** \ingroup msg_gos_functions
577 * \brief This function is called by a sender and permits it to wait for each communication
579 * It takes three parameters.
580 * \param comm a vector of communication
581 * \param nb_elem is the size of the comm vector
582 * \param timeout for each call of MSG_comm_wait
/* Call MSG_comm_wait() on each of the nb_elem entries of comm, in index
 * order, with the same timeout for every call. The value returned by each
 * MSG_comm_wait() call is discarded.
 * NOTE(review): the declaration of i is not visible in this view. */
584 void MSG_comm_waitall(msg_comm_t * comm, int nb_elem, double timeout)
587 for (i = 0; i < nb_elem; i++) {
588 MSG_comm_wait(comm[i], timeout);
592 /** \ingroup msg_gos_functions
593 * \brief This function waits for the first completed communication
595 * It takes one parameter.
596 * \param comms a vector of communication
597 * \return the position of the completed communication from the xbt_dynar_t.
/* Thin wrapper: returns the result of SIMIX_network_waitany() on comms. */
599 int MSG_comm_waitany(xbt_dynar_t comms)
601 return SIMIX_network_waitany(comms);
/* Return the source buffer of comm, cast to m_task_t.
 * comm is asserted to be non-NULL. */
604 m_task_t MSG_comm_get_task(msg_comm_t comm)
606 xbt_assert0(comm, "Invalid parameters");
607 return (m_task_t) SIMIX_communication_get_src_buf(comm);
610 /** \ingroup msg_gos_functions
611 * \brief Put a task on a channel of an host and waits for the end of the
614 * This function is used for describing the behavior of an agent. It
615 * takes three parameters.
616 * \param task a #m_task_t to send on another location. This task
617 will not be usable anymore when the function will return. There is
618 no automatic task duplication and you have to save your parameters
619 before calling this function. Tasks are unique and once it has been
620 sent to another location, you should not access it anymore. You do
621 not need to call MSG_task_destroy() but to avoid using, as an
622 effect of inattention, this task anymore, you definitely should
623 reinitialize it with #MSG_TASK_UNINITIALIZED. Note that this task
624 can be transferred iff it has been correctly created with
626 * \param dest the destination of the message
627 * \param channel the channel on which the agent should put this
628 task. This value has to be >=0 and < than the maximal number of
629 channels fixed with MSG_set_channel_number().
630 * \return #MSG_FATAL if \a task is not properly initialized and
631 * #MSG_OK otherwise. Returns #MSG_HOST_FAILURE if the host on which
632 * this function was called was shut down. Returns
633 * #MSG_TRANSFER_FAILURE if the transfer could not be properly done
634 * (network failure, dest failure)
/* Thin wrapper: forwards to MSG_task_put_with_timeout() with a timeout
 * argument of -1.0. */
636 MSG_error_t MSG_task_put(m_task_t task, m_host_t dest, m_channel_t channel)
638 return MSG_task_put_with_timeout(task, dest, channel, -1.0);
641 /** \ingroup msg_gos_functions
642 * \brief Does exactly the same as MSG_task_put but with a bounded transmission
/* Store maxrate in the rate field of the task, then send it with
 * MSG_task_put(). The rate is written into the task itself and is not reset
 * here after the send.
 * NOTE(review): the rest of the parameter list is not visible in this view. */
648 MSG_task_put_bounded(m_task_t task, m_host_t dest, m_channel_t channel,
651 task->simdata->rate = maxrate;
652 return MSG_task_put(task, dest, channel);
655 /** \ingroup msg_gos_functions \brief Put a task on a channel of an
656 * host (with a timeout on the waiting of the destination host) and
657 * waits for the end of the transmission.
659 * This function is used for describing the behavior of an agent. It
660 * takes four parameters.
661 * \param task a #m_task_t to send on another location. This task
662 will not be usable anymore when the function will return. There is
663 no automatic task duplication and you have to save your parameters
664 before calling this function. Tasks are unique and once it has been
665 sent to another location, you should not access it anymore. You do
666 not need to call MSG_task_destroy() but to avoid using, as an
667 effect of inattention, this task anymore, you definitely should
668 reinitialize it with #MSG_TASK_UNINITIALIZED. Note that this task
669 can be transferred iff it has been correctly created with
671 * \param dest the destination of the message
672 * \param channel the channel on which the agent should put this
673 task. This value has to be >=0 and < than the maximal number of
674 channels fixed with MSG_set_channel_number().
675 * \param timeout the maximum time to wait for a task before giving
676 up. In such a case, #MSG_TRANSFER_FAILURE will be returned, \a task
678 * \return #MSG_FATAL if \a task is not properly initialized and
679 #MSG_OK otherwise. Returns #MSG_HOST_FAILURE if the host on which
680 this function was called was shut down. Returns
681 #MSG_TRANSFER_FAILURE if the transfer could not be properly done
682 (network failure, dest failure, timeout...)
/* Channel-based send: checks that channel lies in
 * [0, msg_global->max_channel) and then calls MSG_mailbox_put_with_timeout()
 * on the mailbox of dest for that channel.
 * NOTE(review): the last assert argument and the return keyword are not
 * visible in this view. */
685 MSG_task_put_with_timeout(m_task_t task, m_host_t dest,
686 m_channel_t channel, double timeout)
688 xbt_assert1((channel >= 0)
689 && (channel < msg_global->max_channel), "Invalid channel %d",
693 MSG_mailbox_put_with_timeout(MSG_mailbox_get_by_channel
694 (dest, channel), task, timeout);
/* Log the target mailbox alias, then forward to MSG_task_send_with_timeout()
 * with a timeout argument of -1. */
697 MSG_error_t MSG_task_send(m_task_t task, const char *alias)
699 DEBUG1("MSG_task_send: Trying to send a message on mailbox '%s'", alias);
700 return MSG_task_send_with_timeout(task, alias, -1);
/* Store maxrate in the rate field of the task, then send it with
 * MSG_task_send(). The rate is written into the task itself and is not reset
 * here after the send. */
705 MSG_task_send_bounded(m_task_t task, const char *alias, double maxrate)
707 task->simdata->rate = maxrate;
708 return MSG_task_send(task, alias);
/* Alias-based send: returns the result of MSG_mailbox_put_with_timeout() on
 * the mailbox found by alias.
 * NOTE(review): the end of the parameter list and the remaining call
 * arguments are not visible in this view. */
713 MSG_task_send_with_timeout(m_task_t task, const char *alias,
716 return MSG_mailbox_put_with_timeout(MSG_mailbox_get_by_alias(alias),
/* Return non-zero when the mailbox found by alias is not empty, and 0 when it
 * is empty. */
720 int MSG_task_listen(const char *alias)
724 return !MSG_mailbox_is_empty(MSG_mailbox_get_by_alias(alias));
727 /** \ingroup msg_gos_functions
728 * \brief Test whether there is a pending communication on a channel.
730 * It takes one parameter.
731 * \param channel the channel on which the agent should be
732 listening. This value has to be >=0 and < than the maximal
733 number of channels fixed with MSG_set_channel_number().
734 * \return 1 if there is a pending communication and 0 otherwise
/* Channel-based probe: checks that channel lies in
 * [0, msg_global->max_channel) and evaluates whether the mailbox of the
 * current host for that channel is non-empty.
 * NOTE(review): the last assert argument and the return keyword are not
 * visible in this view. */
736 int MSG_task_Iprobe(m_channel_t channel)
738 xbt_assert1((channel >= 0)
739 && (channel < msg_global->max_channel), "Invalid channel %d",
745 !MSG_mailbox_is_empty(MSG_mailbox_get_by_channel
746 (MSG_host_self(), channel));
749 /** \ingroup msg_gos_functions
751 * \brief Return the number of tasks waiting to be received on a \a
752 channel and sent by \a host.
754 * It takes two parameters.
755 * \param channel the channel on which the agent should be
756 listening. This value has to be >=0 and < than the maximal
757 number of channels fixed with MSG_set_channel_number().
758 * \param host the host that is to be watched.
759 * \return the number of tasks waiting to be received on \a channel
/* Channel-based count: checks that channel lies in
 * [0, msg_global->max_channel) and calls
 * MSG_mailbox_get_count_host_waiting_tasks() on the mailbox of the current
 * host for that channel.
 * NOTE(review): the last assert argument, the return keyword and the tail of
 * the call are not visible in this view. */
762 int MSG_task_probe_from_host(int channel, m_host_t host)
764 xbt_assert1((channel >= 0)
765 && (channel < msg_global->max_channel), "Invalid channel %d",
771 MSG_mailbox_get_count_host_waiting_tasks(MSG_mailbox_get_by_channel
772 (MSG_host_self(), channel),
/* Alias-based count: calls MSG_mailbox_get_count_host_waiting_tasks() on a
 * mailbox obtained by alias.
 * NOTE(review): the return keyword and the arguments of the call are not
 * visible in this view. */
777 int MSG_task_listen_from_host(const char *alias, m_host_t host)
782 MSG_mailbox_get_count_host_waiting_tasks(MSG_mailbox_get_by_alias
786 /** \ingroup msg_gos_functions
787 * \brief Test whether there is a pending communication on a channel, and who sent it.
789 * It takes one parameter.
790 * \param channel the channel on which the agent should be
791 listening. This value has to be >=0 and < than the maximal
792 number of channels fixed with MSG_set_channel_number().
793 * \return -1 if there is no pending communication and the PID of the process who sent it otherwise
/* Channel-based probe of the sender: checks that channel lies in
 * [0, msg_global->max_channel), reads the head of the mailbox of the current
 * host for that channel and returns the PID of the sender of that task.
 * NOTE(review): the declaration of task, the test on the mailbox head and the
 * value returned when there is no task are not visible in this view. */
795 int MSG_task_probe_from(m_channel_t channel)
801 xbt_assert1((channel >= 0)
802 && (channel < msg_global->max_channel), "Invalid channel %d",
807 MSG_mailbox_get_head(MSG_mailbox_get_by_channel
808 (MSG_host_self(), channel))))
811 return MSG_process_get_PID(task->simdata->sender);
/* Alias-based probe of the sender: reads the head of the mailbox found by
 * alias and returns the PID of the sender of that task.
 * NOTE(review): the declaration of task, the test on the mailbox head and the
 * value returned when there is no task are not visible in this view. */
814 int MSG_task_listen_from(const char *alias)
821 (task = MSG_mailbox_get_head(MSG_mailbox_get_by_alias(alias))))
824 return MSG_process_get_PID(task->simdata->sender);
827 /** \ingroup msg_gos_functions
828 * \brief Wait for at most \a timeout seconds for a task reception
831 * \a PID is updated with the PID of the first process that triggered this event if any.
833 * It takes three parameters:
834 * \param channel the channel on which the agent should be
835 listening. This value has to be >=0 and < than the maximal
836 number of channels fixed with MSG_set_channel_number().
837 * \param PID a memory location for storing an int.
838 * \param timeout the maximum time to wait for a task before
839 giving up. In the case of a reception, *\a PID will be updated
840 with the PID of the first process to send a task.
841 * \return #MSG_HOST_FAILURE if the host is shut down in the meantime
842 and #MSG_OK otherwise.
/* Wait for a task to show up on a channel of the current host and store the
 * PID of its sender in *PID.
 * channel: asserted to lie in [0, msg_global->max_channel).
 * timeout: 0.0 turns the call into a plain MSG_task_probe_from(); both a
 *          timed and an untimed condition wait appear below.
 * Visible result: MSG_HOST_FAILURE when the host state is 0 after a wait.
 * NOTE(review): the declarations of h, t and cond, the selection between the
 * two waits, the loop exit and the other returns are not visible in this
 * view. */
845 MSG_channel_select_from(m_channel_t channel, double timeout, int *PID)
848 simdata_host_t h_simdata = NULL;
852 msg_mailbox_t mailbox;
854 xbt_assert1((channel >= 0)
855 && (channel < msg_global->max_channel), "Invalid channel %d",
/* Zero timeout: just probe the channel. */
862 if (timeout == 0.0) {
863 *PID = MSG_task_probe_from(channel);
868 h_simdata = h->simdata;
870 mailbox = MSG_mailbox_get_by_channel(MSG_host_self(), channel);
872 while (MSG_mailbox_is_empty(mailbox)) {
/* The host mutex is held around the condition wait. */
879 SIMIX_mutex_lock(h_simdata->mutex);
/* The mailbox must not already have a condition attached. */
881 xbt_assert1(!MSG_mailbox_get_cond(mailbox),
882 "A process is already blocked on this channel %d",
/* A fresh condition is attached to the mailbox for the duration of the wait. */
885 cond = SIMIX_cond_init();
887 MSG_mailbox_set_cond(mailbox, cond);
890 SIMIX_cond_wait_timeout(cond, h_simdata->mutex, timeout);
892 SIMIX_cond_wait(cond, h_simdata->mutex);
895 SIMIX_cond_destroy(cond);
896 SIMIX_mutex_unlock(h_simdata->mutex);
/* NOTE(review): on this path the function returns before the mailbox
 * condition is reset to NULL below, so the mailbox appears to keep the
 * destroyed condition; confirm against the full source. */
898 if (SIMIX_host_get_state(h_simdata->smx_host) == 0) {
899 MSG_RETURN(MSG_HOST_FAILURE);
902 MSG_mailbox_set_cond(mailbox, NULL);
/* Report the sender of the task at the head of the mailbox. */
906 if (NULL == (t = MSG_mailbox_get_head(mailbox)))
911 *PID = MSG_process_get_PID(t->simdata->sender);
919 MSG_error_t MSG_alias_select_from(const char *alias, double timeout,
923 simdata_host_t h_simdata = NULL;
927 msg_mailbox_t mailbox;
933 if (timeout == 0.0) {
934 *PID = MSG_task_listen_from(alias);
939 h_simdata = h->simdata;
941 DEBUG2("Probing on alias %s (%s)", alias, h->name);
943 mailbox = MSG_mailbox_get_by_alias(alias);
945 while (MSG_mailbox_is_empty(mailbox)) {
952 SIMIX_mutex_lock(h_simdata->mutex);
954 xbt_assert1(!MSG_mailbox_get_cond(mailbox),
955 "A process is already blocked on this alias %s", alias);
957 cond = SIMIX_cond_init();
959 MSG_mailbox_set_cond(mailbox, cond);
962 SIMIX_cond_wait_timeout(cond, h_simdata->mutex, timeout);
964 SIMIX_cond_wait(cond, h_simdata->mutex);
967 SIMIX_cond_destroy(cond);
968 SIMIX_mutex_unlock(h_simdata->mutex);
970 if (SIMIX_host_get_state(h_simdata->smx_host) == 0) {
971 MSG_RETURN(MSG_HOST_FAILURE);
974 MSG_mailbox_set_cond(mailbox, NULL);
978 if (NULL == (t = MSG_mailbox_get_head(mailbox)))
983 *PID = MSG_process_get_PID(t->simdata->sender);