1 /* Copyright (c) 2002-2007 Arnaud Legrand. */
2 /* Copyright (c) 2007 Bruno Donassolo. */
3 /* All rights reserved. */
5 /* This program is free software; you can redistribute it and/or modify it
6 * under the terms of the license (GNU LGPL) which comes with this package. */
8 #include "msg/private.h"
9 #include "xbt/sysdep.h"
/* Log category "msg_gos", given "msg" as its parent argument; presumably the
 * DEBUG* macros in this file log to it -- TODO confirm against the xbt log API. */
14 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(msg_gos, msg,
15 "Logging specific to MSG (gos)");
17 /** \ingroup msg_gos_functions
19 * \brief Return the last value returned by a MSG function (except
/* Returns whatever the PROCESS_GET_ERRNO() macro evaluates to. */
22 MSG_error_t MSG_get_errno(void)
24 return PROCESS_GET_ERRNO();
27 /** \ingroup msg_gos_functions
28 * \brief Executes a task and waits for its termination.
30 * This function is used for describing the behavior of an agent. It
31 * takes only one parameter.
32 * \param task a #m_task_t to execute on the location on which the
34 * \return #MSG_FATAL if \a task is not properly initialized and
/*
 * Starts a SIMIX execute action for `task` on SIMIX_host_self(), using the
 * task's name and computation_amount, then waits on the task's condition
 * variable until the action state is neither SURF_ACTION_READY nor
 * SURF_ACTION_RUNNING.
 *
 * After the wait, the action is destroyed on every path and the task's
 * `compute` pointer is cleared. When the action is not DONE and the current
 * host's state is 0, the function returns MSG_HOST_FAILURE through MSG_RETURN;
 * the last path returns MSG_TASK_CANCELLED.
 */
37 MSG_error_t MSG_task_execute(m_task_t task)
39 simdata_task_t simdata = NULL;
40 m_process_t self = MSG_process_self();
41 e_surf_action_state_t state = SURF_ACTION_NOT_IN_THE_SYSTEM;
44 TRACE_msg_task_execute_start (task);
47 simdata = task->simdata;
/* the task must not already hold a compute action and must have exactly one reference */
49 xbt_assert1((!simdata->compute) && (task->simdata->refcount == 1),
50 "This task is executed somewhere else. Go fix your code! %d", task->simdata->refcount);
52 DEBUG1("Computing on %s", MSG_process_self()->simdata->m_host->name);
/* zero-amount task: emit the end-of-execution trace right away */
54 if (simdata->computation_amount == 0) {
56 TRACE_msg_task_execute_end (task);
/* hold the task mutex while the action is created and waited for */
61 SIMIX_mutex_lock(simdata->mutex);
63 SIMIX_action_execute(SIMIX_host_self(), task->name,
64 simdata->computation_amount);
65 SIMIX_action_set_priority(simdata->compute, simdata->priority);
67 /* changed to waiting action since we are always waiting one action (execute, communicate or sleep) */
68 self->simdata->waiting_action = simdata->compute;
69 SIMIX_register_action_to_condition(simdata->compute, simdata->cond);
/* wait again as long as the action is still ready or running */
71 SIMIX_cond_wait(simdata->cond, simdata->mutex);
72 state = SIMIX_action_get_state(simdata->compute);
73 } while (state == SURF_ACTION_READY || state == SURF_ACTION_RUNNING);
74 SIMIX_unregister_action_to_condition(simdata->compute, simdata->cond);
75 self->simdata->waiting_action = NULL;
77 SIMIX_mutex_unlock(simdata->mutex);
/* dispatch on the final action state: done, host failure, or cancelled */
80 if (SIMIX_action_get_state(task->simdata->compute) == SURF_ACTION_DONE) {
81 /* action completed: destroy it, zero the remaining amount and clear the compute pointer */
82 SIMIX_action_destroy(task->simdata->compute);
83 simdata->computation_amount = 0.0;
85 simdata->compute = NULL;
87 TRACE_msg_task_execute_end (task);
/* NOTE(review): 0 is presumably SURF_RESOURCE_OFF, as used in MSG_process_sleep -- confirm */
90 } else if (SIMIX_host_get_state(SIMIX_host_self()) == 0) {
91 /* host failed: destroy the action and clear the compute pointer */
92 SIMIX_action_destroy(task->simdata->compute);
94 simdata->compute = NULL;
96 TRACE_msg_task_execute_end (task);
98 MSG_RETURN(MSG_HOST_FAILURE);
100 /* remaining case (reported as cancelled): destroy the action and clear comm and compute */
101 SIMIX_action_destroy(task->simdata->compute);
102 simdata->comm = NULL;
103 simdata->compute = NULL;
105 TRACE_msg_task_execute_end (task);
107 MSG_RETURN(MSG_TASK_CANCELLED);
111 /** \ingroup m_task_management
112 * \brief Creates a new #m_task_t (a parallel one....).
114 * A constructor for #m_task_t taking six arguments and returning the
115 corresponding object.
116 * \param name a name for the object. It is for user-level information
118 * \param host_nb the number of hosts implied in the parallel task.
119 * \param host_list an array of \p host_nb m_host_t.
120 * \param computation_amount an array of \p host_nb
121 doubles. computation_amount[i] is the total number of operations
122 that have to be performed on host_list[i].
123 * \param communication_amount an array of \p host_nb* \p host_nb doubles.
124 * \param data a pointer to any data you may want to attach to the new
125 object. It is for user-level information and can be NULL. It can
126 be retrieved with the function \ref MSG_task_get_data.
128 * \return The new corresponding object.
/*
 * Allocates a zero-initialized task and its simdata, duplicates `name`, and
 * fills in the parallel-task fields.
 *
 * The host list is converted into a newly allocated array of `host_nb`
 * smx_host_t entries. `computation_amount` and `communication_amount` are
 * stored as given (the pointers are not copied), so the caller's arrays must
 * stay valid for as long as the task uses them.
 */
131 MSG_parallel_task_create(const char *name, int host_nb,
132 const m_host_t * host_list,
133 double *computation_amount,
134 double *communication_amount, void *data)
137 simdata_task_t simdata = xbt_new0(s_simdata_task_t, 1);
138 m_task_t task = xbt_new0(s_m_task_t, 1);
139 task->simdata = simdata;
142 task->name = xbt_strdup(name);
/* scalar amounts are zeroed; the per-host arrays set below carry the work instead */
146 simdata->computation_amount = 0;
147 simdata->message_size = 0;
148 simdata->cond = SIMIX_cond_init();
149 simdata->mutex = SIMIX_mutex_init();
150 simdata->compute = NULL;
151 simdata->comm = NULL;
152 simdata->rate = -1.0;
153 simdata->refcount = 1;
154 simdata->sender = NULL;
155 simdata->receiver = NULL;
156 simdata->source = NULL;
/* parallel-task specific fields */
158 simdata->host_nb = host_nb;
159 simdata->host_list = xbt_new0(smx_host_t, host_nb);
/* caller-owned arrays are referenced, not duplicated */
160 simdata->comp_amount = computation_amount;
161 simdata->comm_amount = communication_amount;
/* translate each MSG host into its underlying SIMIX host */
163 for (i = 0; i < host_nb; i++)
164 simdata->host_list[i] = host_list[i]->simdata->smx_host;
/*
 * Starts a SIMIX parallel execute action from the task's host list and
 * per-host amount arrays, then waits on the task's condition variable until
 * the action state is neither SURF_ACTION_READY nor SURF_ACTION_RUNNING.
 *
 * The task must have a non-zero host_nb (asserted). After the wait, the action
 * is destroyed and the task's `comm` and `compute` pointers are cleared on
 * every path. The function returns MSG_HOST_FAILURE when the action is not
 * DONE and the current host's state is 0, and MSG_TASK_CANCELLED on the last
 * path.
 */
169 MSG_error_t MSG_parallel_task_execute(m_task_t task)
171 simdata_task_t simdata = NULL;
172 m_process_t self = MSG_process_self();
173 e_surf_action_state_t state = SURF_ACTION_NOT_IN_THE_SYSTEM;
176 simdata = task->simdata;
/* the task must not already hold a compute action and must have exactly one reference */
178 xbt_assert0((!simdata->compute)
179 && (task->simdata->refcount == 1),
180 "This task is executed somewhere else. Go fix your code!");
/* host_nb == 0 means the task was not set up as a parallel task */
182 xbt_assert0(simdata->host_nb, "This is not a parallel task. Go to hell.");
184 DEBUG1("Computing on %s", MSG_process_self()->simdata->m_host->name);
/* hold the task mutex while the action is created and waited for */
188 SIMIX_mutex_lock(simdata->mutex);
190 SIMIX_action_parallel_execute(task->name, simdata->host_nb,
191 simdata->host_list, simdata->comp_amount,
192 simdata->comm_amount, 1.0, -1.0);
194 self->simdata->waiting_action = simdata->compute;
195 SIMIX_register_action_to_condition(simdata->compute, simdata->cond);
/* wait again as long as the action is still ready or running */
197 SIMIX_cond_wait(simdata->cond, simdata->mutex);
198 state = SIMIX_action_get_state(task->simdata->compute);
199 } while (state == SURF_ACTION_READY || state == SURF_ACTION_RUNNING);
201 SIMIX_unregister_action_to_condition(simdata->compute, simdata->cond);
202 self->simdata->waiting_action = NULL;
205 SIMIX_mutex_unlock(simdata->mutex);
/* dispatch on the final action state: done, host failure, or cancelled */
208 if (SIMIX_action_get_state(task->simdata->compute) == SURF_ACTION_DONE) {
209 /* action completed: destroy it, zero the scalar amount and clear comm and compute */
210 SIMIX_action_destroy(task->simdata->compute);
211 simdata->computation_amount = 0.0;
212 simdata->comm = NULL;
213 simdata->compute = NULL;
/* NOTE(review): 0 is presumably SURF_RESOURCE_OFF, as used in MSG_process_sleep -- confirm */
215 } else if (SIMIX_host_get_state(SIMIX_host_self()) == 0) {
216 /* host failed: destroy the action and clear comm and compute */
217 SIMIX_action_destroy(task->simdata->compute);
218 simdata->comm = NULL;
219 simdata->compute = NULL;
220 MSG_RETURN(MSG_HOST_FAILURE);
222 /* remaining case (reported as cancelled): destroy the action and clear comm and compute */
223 SIMIX_action_destroy(task->simdata->compute);
224 simdata->comm = NULL;
225 simdata->compute = NULL;
226 MSG_RETURN(MSG_TASK_CANCELLED);
232 /** \ingroup msg_gos_functions
233 * \brief Sleep for the specified number of seconds
235 * Makes the current process sleep until \a nb_sec seconds have elapsed.
237 * \param nb_sec a number of seconds
/*
 * Creates a SIMIX sleep action on the host of the calling process and blocks
 * on a temporary mutex/condition pair until the action state is neither
 * SURF_ACTION_READY nor SURF_ACTION_RUNNING.
 *
 * The temporary mutex and condition are destroyed after the wait, and the
 * sleep action is destroyed on every path that follows. MSG_HOST_FAILURE is
 * returned when the action is DONE but the current host is SURF_RESOURCE_OFF,
 * and again on the second destroy/trace path (presumably the non-DONE
 * branch -- TODO confirm).
 */
239 MSG_error_t MSG_process_sleep(double nb_sec)
241 smx_action_t act_sleep;
242 m_process_t proc = MSG_process_self();
243 e_surf_action_state_t state = SURF_ACTION_NOT_IN_THE_SYSTEM;
248 TRACE_msg_process_sleep_in (MSG_process_self());
251 /* create action to sleep */
253 SIMIX_action_sleep(SIMIX_process_get_host(proc->simdata->s_process),
/* private mutex used only for this wait */
256 mutex = SIMIX_mutex_init();
257 SIMIX_mutex_lock(mutex);
259 /* create conditional and register action to it */
260 cond = SIMIX_cond_init();
262 proc->simdata->waiting_action = act_sleep;
263 SIMIX_register_action_to_condition(act_sleep, cond);
/* wait again as long as the sleep action is still ready or running */
265 SIMIX_cond_wait(cond, mutex);
266 state = SIMIX_action_get_state(act_sleep);
267 } while (state == SURF_ACTION_READY || state == SURF_ACTION_RUNNING);
268 proc->simdata->waiting_action = NULL;
269 SIMIX_unregister_action_to_condition(act_sleep, cond);
270 SIMIX_mutex_unlock(mutex);
272 /* remove variables */
273 SIMIX_cond_destroy(cond);
274 SIMIX_mutex_destroy(mutex);
/* even a DONE action counts as a failure if the host went down meanwhile */
276 if (SIMIX_action_get_state(act_sleep) == SURF_ACTION_DONE) {
277 if (SIMIX_host_get_state(SIMIX_host_self()) == SURF_RESOURCE_OFF) {
278 SIMIX_action_destroy(act_sleep);
280 TRACE_msg_process_sleep_out (MSG_process_self());
282 MSG_RETURN(MSG_HOST_FAILURE);
285 SIMIX_action_destroy(act_sleep);
287 TRACE_msg_process_sleep_out (MSG_process_self());
289 MSG_RETURN(MSG_HOST_FAILURE);
/* last path: destroy the action and emit the sleep-out trace */
292 SIMIX_action_destroy(act_sleep);
294 TRACE_msg_process_sleep_out (MSG_process_self());
299 /** \ingroup msg_gos_functions
300 * \brief Listen on \a channel and waits for receiving a task from \a host.
302 * It takes three parameters.
303 * \param task a memory location for storing a #m_task_t. It will
304 hold a task when this function will return. Thus \a task should not
305 be equal to \c NULL and \a *task should be equal to \c NULL. If one of
306 those two conditions does not hold, there will be a warning message.
307 * \param channel the channel on which the agent should be
308 listening. This value has to be >=0 and < than the maximal
309 number of channels fixed with MSG_set_channel_number().
310 * \param host the host that is to be watched.
311 * \return #MSG_FATAL if \a task is equal to \c NULL, #MSG_WARNING
312 if \a *task is not equal to \c NULL, and #MSG_OK otherwise.
/* Forwards to MSG_task_get_ext() with -1 as the timeout argument and the given host. */
315 MSG_task_get_from_host(m_task_t * task, m_channel_t channel, m_host_t host)
317 return MSG_task_get_ext(task, channel, -1, host);
320 /** \ingroup msg_gos_functions
321 * \brief Listen on a channel and wait for receiving a task.
323 * It takes two parameters.
324 * \param task a memory location for storing a #m_task_t. It will
325 hold a task when this function will return. Thus \a task should not
326 be equal to \c NULL and \a *task should be equal to \c NULL. If one of
327 those two conditions does not hold, there will be a warning message.
328 * \param channel the channel on which the agent should be
329 listening. This value has to be >=0 and < than the maximal
330 number of channels fixed with MSG_set_channel_number().
331 * \return #MSG_FATAL if \a task is equal to \c NULL, #MSG_WARNING
332 * if \a *task is not equal to \c NULL, and #MSG_OK otherwise.
/* Forwards to MSG_task_get_with_timeout() with -1 as the timeout argument. */
334 MSG_error_t MSG_task_get(m_task_t * task, m_channel_t channel)
336 return MSG_task_get_with_timeout(task, channel, -1);
339 /** \ingroup msg_gos_functions
340 * \brief Listen on a channel and wait for receiving a task with a timeout.
342 * It takes three parameters.
343 * \param task a memory location for storing a #m_task_t. It will
344 hold a task when this function will return. Thus \a task should not
345 be equal to \c NULL and \a *task should be equal to \c NULL. If one of
346 those two conditions does not hold, there will be a warning message.
347 * \param channel the channel on which the agent should be
348 listening. This value has to be >=0 and < than the maximal
349 number of channels fixed with MSG_set_channel_number().
350 * \param max_duration the maximum time to wait for a task before giving
351 up. In such a case, #MSG_TRANSFER_FAILURE will be returned, \a task
352 will not be modified and will still be
353 equal to \c NULL when returning.
354 * \return #MSG_FATAL if \a task is equal to \c NULL, #MSG_WARNING
355 if \a *task is not equal to \c NULL, and #MSG_OK otherwise.
/* Forwards to MSG_task_get_ext() with max_duration as the timeout and NULL as the host argument. */
358 MSG_task_get_with_timeout(m_task_t * task, m_channel_t channel,
361 return MSG_task_get_ext(task, channel, max_duration, NULL);
364 /** \defgroup msg_gos_functions MSG Operating System Functions
365 * \brief This section describes the functions that can be used
366 * by an agent for handling some task.
/*
 * Asserts that `channel` lies in [0, msg_global->max_channel), then calls
 * MSG_mailbox_get_task_ext() on the mailbox that MSG_mailbox_get_by_channel()
 * returns for the current host and `channel`, passing `task`, `host` and
 * `timeout` through.
 */
370 MSG_task_get_ext(m_task_t * task, m_channel_t channel, double timeout,
373 xbt_assert1((channel >= 0)
374 && (channel < msg_global->max_channel), "Invalid channel %d",
378 MSG_mailbox_get_task_ext(MSG_mailbox_get_by_channel
379 (MSG_host_self(), channel), task, host, timeout);
/* Forwards to MSG_task_receive_ext() with -1 as the timeout argument and the given host. */
383 MSG_task_receive_from_host(m_task_t * task, const char *alias, m_host_t host)
385 return MSG_task_receive_ext(task, alias, -1, host);
/* Forwards to MSG_task_receive_with_timeout() with -1 as the timeout argument. */
388 MSG_error_t MSG_task_receive(m_task_t * task, const char *alias)
390 return MSG_task_receive_with_timeout(task, alias, -1);
/* Forwards to MSG_task_receive_ext() with the given timeout and NULL as the host argument. */
394 MSG_task_receive_with_timeout(m_task_t * task, const char *alias,
397 return MSG_task_receive_ext(task, alias, timeout, NULL);
/* Returns the result of MSG_mailbox_get_task_ext() on the mailbox looked up by `alias`, passing `task` and `host` through. */
401 MSG_task_receive_ext(m_task_t * task, const char *alias, double timeout,
404 return MSG_mailbox_get_task_ext(MSG_mailbox_get_by_alias(alias), task, host,
409 /** \ingroup msg_gos_functions
410 * \brief Put a task on a channel of a host and waits for the end of the
413 * This function is used for describing the behavior of an agent. It
414 * takes three parameters.
415 * \param task a #m_task_t to send on another location. This task
416 will not be usable anymore when the function will return. There is
417 no automatic task duplication and you have to save your parameters
418 before calling this function. Tasks are unique and once it has been
419 sent to another location, you should not access it anymore. You do
420 not need to call MSG_task_destroy() but to avoid using, as an
421 effect of inattention, this task anymore, you definitely should
422 reinitialize it with #MSG_TASK_UNINITIALIZED. Note that this task
423 can be transferred iff it has been correctly created with
425 * \param dest the destination of the message
426 * \param channel the channel on which the agent should put this
427 task. This value has to be >=0 and < than the maximal number of
428 channels fixed with MSG_set_channel_number().
429 * \return #MSG_FATAL if \a task is not properly initialized and
430 * #MSG_OK otherwise. Returns #MSG_HOST_FAILURE if the host on which
431 * this function was called was shut down. Returns
432 * #MSG_TRANSFER_FAILURE if the transfer could not be properly done
433 * (network failure, dest failure)
/* Forwards to MSG_task_put_with_timeout() with -1.0 as the timeout argument. */
435 MSG_error_t MSG_task_put(m_task_t task, m_host_t dest, m_channel_t channel)
437 return MSG_task_put_with_timeout(task, dest, channel, -1.0);
440 /** \ingroup msg_gos_functions
441 * \brief Does exactly the same as MSG_task_put but with a bounded transmission
/*
 * Stores `maxrate` in task->simdata->rate, then forwards to MSG_task_put().
 * `task` and its simdata are dereferenced without a NULL check.
 */
447 MSG_task_put_bounded(m_task_t task, m_host_t dest, m_channel_t channel,
450 task->simdata->rate = maxrate;
451 return MSG_task_put(task, dest, channel);
454 /** \ingroup msg_gos_functions \brief Put a task on a channel of a
455 * host (with a timeout on the waiting of the destination host) and
456 * waits for the end of the transmission.
458 * This function is used for describing the behavior of an agent. It
459 * takes four parameters.
460 * \param task a #m_task_t to send on another location. This task
461 will not be usable anymore when the function will return. There is
462 no automatic task duplication and you have to save your parameters
463 before calling this function. Tasks are unique and once it has been
464 sent to another location, you should not access it anymore. You do
465 not need to call MSG_task_destroy() but to avoid using, as an
466 effect of inattention, this task anymore, you definitely should
467 reinitialize it with #MSG_TASK_UNINITIALIZED. Note that this task
468 can be transferred iff it has been correctly created with
470 * \param dest the destination of the message
471 * \param channel the channel on which the agent should put this
472 task. This value has to be >=0 and < than the maximal number of
473 channels fixed with MSG_set_channel_number().
474 * \param timeout the maximum time to wait for a task before giving
475 up. In such a case, #MSG_TRANSFER_FAILURE will be returned, \a task
477 * \return #MSG_FATAL if \a task is not properly initialized and
478 #MSG_OK otherwise. Returns #MSG_HOST_FAILURE if the host on which
479 this function was called was shut down. Returns
480 #MSG_TRANSFER_FAILURE if the transfer could not be properly done
481 (network failure, dest failure, timeout...)
/*
 * Asserts that `channel` lies in [0, msg_global->max_channel), then calls
 * MSG_mailbox_put_with_timeout() on the mailbox that
 * MSG_mailbox_get_by_channel() returns for `dest` and `channel`.
 */
484 MSG_task_put_with_timeout(m_task_t task, m_host_t dest, m_channel_t channel,
487 xbt_assert1((channel >= 0)
488 && (channel < msg_global->max_channel), "Invalid channel %d",
492 MSG_mailbox_put_with_timeout(MSG_mailbox_get_by_channel(dest, channel),
/* Forwards to MSG_task_send_with_timeout() with -1 as the timeout argument. */
496 MSG_error_t MSG_task_send(m_task_t task, const char *alias)
498 return MSG_task_send_with_timeout(task, alias, -1);
/*
 * Stores `maxrate` in task->simdata->rate, then forwards to MSG_task_send().
 * `task` and its simdata are dereferenced without a NULL check.
 */
503 MSG_task_send_bounded(m_task_t task, const char *alias, double maxrate)
505 task->simdata->rate = maxrate;
506 return MSG_task_send(task, alias);
/* Returns the result of MSG_mailbox_put_with_timeout() on the mailbox looked up by `alias`, passing `task` through. */
511 MSG_task_send_with_timeout(m_task_t task, const char *alias, double timeout)
513 return MSG_mailbox_put_with_timeout(MSG_mailbox_get_by_alias(alias), task,
/* Returns 1 when MSG_mailbox_is_empty() reports the mailbox for `alias` as non-empty, 0 otherwise. */
517 int MSG_task_listen(const char *alias)
521 return !MSG_mailbox_is_empty(MSG_mailbox_get_by_alias(alias));
524 /** \ingroup msg_gos_functions
525 * \brief Test whether there is a pending communication on a channel.
527 * It takes one parameter.
528 * \param channel the channel on which the agent should be
529 listening. This value has to be >=0 and < than the maximal
530 number of channels fixed with MSG_set_channel_number().
531 * \return 1 if there is a pending communication and 0 otherwise
/*
 * Asserts that `channel` lies in [0, msg_global->max_channel), then evaluates
 * the negated emptiness test of the current host's mailbox for that channel.
 */
533 int MSG_task_Iprobe(m_channel_t channel)
535 xbt_assert1((channel >= 0)
536 && (channel < msg_global->max_channel), "Invalid channel %d",
542 !MSG_mailbox_is_empty(MSG_mailbox_get_by_channel
543 (MSG_host_self(), channel));
546 /** \ingroup msg_gos_functions
548 * \brief Return the number of tasks waiting to be received on a \a
549 channel and sent by \a host.
551 * It takes two parameters.
552 * \param channel the channel on which the agent should be
553 listening. This value has to be >=0 and < than the maximal
554 number of channels fixed with MSG_set_channel_number().
555 * \param host the host that is to be watched.
556 * \return the number of tasks waiting to be received on \a channel
/*
 * Asserts that `channel` lies in [0, msg_global->max_channel), then calls
 * MSG_mailbox_get_count_host_waiting_tasks() on the current host's mailbox for
 * that channel.
 */
559 int MSG_task_probe_from_host(int channel, m_host_t host)
561 xbt_assert1((channel >= 0)
562 && (channel < msg_global->max_channel), "Invalid channel %d",
568 MSG_mailbox_get_count_host_waiting_tasks(MSG_mailbox_get_by_channel
569 (MSG_host_self(), channel),
/* Alias-based counterpart of MSG_task_probe_from_host(): calls MSG_mailbox_get_count_host_waiting_tasks() on the mailbox looked up by `alias`. */
574 int MSG_task_listen_from_host(const char *alias, m_host_t host)
579 MSG_mailbox_get_count_host_waiting_tasks(MSG_mailbox_get_by_alias(alias),
583 /** \ingroup msg_gos_functions
584 * \brief Test whether there is a pending communication on a channel, and who sent it.
586 * It takes one parameter.
587 * \param channel the channel on which the agent should be
588 listening. This value has to be >=0 and < than the maximal
589 number of channels fixed with MSG_set_channel_number().
590 * \return -1 if there is no pending communication and the PID of the process who sent it otherwise
/*
 * Asserts that `channel` lies in [0, msg_global->max_channel), fetches the
 * head task of the current host's mailbox for that channel, and returns the
 * PID of that task's sender.
 */
592 int MSG_task_probe_from(m_channel_t channel)
598 xbt_assert1((channel >= 0)
599 && (channel < msg_global->max_channel), "Invalid channel %d",
604 MSG_mailbox_get_head(MSG_mailbox_get_by_channel
605 (MSG_host_self(), channel))))
/* PID of the process recorded as sender of the head task */
608 return MSG_process_get_PID(task->simdata->sender);
/*
 * Fetches the head task of the mailbox looked up by `alias`; when that head is
 * not NULL, returns the PID of the task's sender.
 */
611 int MSG_task_listen_from(const char *alias)
/* a NULL head is tested for before the sender is read */
617 if (NULL == (task = MSG_mailbox_get_head(MSG_mailbox_get_by_alias(alias))))
620 return MSG_process_get_PID(task->simdata->sender);
623 /** \ingroup msg_gos_functions
624 * \brief Wait for at most \a timeout seconds for a task reception
627 * \a PID is updated with the PID of the first process that triggered this event if any.
629 * It takes three parameters:
630 * \param channel the channel on which the agent should be
631 listening. This value has to be >=0 and < than the maximal
632 number of channels fixed with MSG_set_channel_number().
633 * \param PID a memory location for storing an int.
634 * \param timeout the maximum time to wait for a task before
635 giving up. In the case of a reception, *\a PID will be updated
636 with the PID of the first process to send a task.
637 * \return #MSG_HOST_FAILURE if the host is shut down in the meantime
638 and #MSG_OK otherwise.
/*
 * Waits for a task to show up in the current host's mailbox for `channel` and
 * stores the sender's PID in *PID.
 *
 * With timeout == 0.0, *PID is set from MSG_task_probe_from(). Otherwise,
 * while the mailbox is empty, a fresh condition is attached to the mailbox and
 * waited on under the host mutex, with or without a timeout. MSG_HOST_FAILURE
 * is returned if the host's state is 0 after a wait.
 */
641 MSG_channel_select_from(m_channel_t channel, double timeout, int *PID)
644 simdata_host_t h_simdata = NULL;
648 msg_mailbox_t mailbox;
650 xbt_assert1((channel >= 0)
651 && (channel < msg_global->max_channel), "Invalid channel %d",
/* zero timeout: plain non-blocking probe */
658 if (timeout == 0.0) {
659 *PID = MSG_task_probe_from(channel);
664 h_simdata = h->simdata;
666 mailbox = MSG_mailbox_get_by_channel(MSG_host_self(), channel);
/* block until something is queued in the mailbox */
668 while (MSG_mailbox_is_empty(mailbox)) {
675 SIMIX_mutex_lock(h_simdata->mutex);
/* only one waiter per mailbox: a condition must not already be attached */
677 xbt_assert1(!MSG_mailbox_get_cond(mailbox),
678 "A process is already blocked on this channel %d", channel);
680 cond = SIMIX_cond_init();
682 MSG_mailbox_set_cond(mailbox, cond);
/* timed and untimed variants of the wait */
685 SIMIX_cond_wait_timeout(cond, h_simdata->mutex, timeout);
687 SIMIX_cond_wait(cond, h_simdata->mutex);
690 SIMIX_cond_destroy(cond);
691 SIMIX_mutex_unlock(h_simdata->mutex);
/* NOTE(review): this return happens after cond was destroyed but before the
 * mailbox condition is reset to NULL below -- confirm the mailbox is not left
 * pointing at the destroyed condition on host failure. */
693 if (SIMIX_host_get_state(h_simdata->smx_host) == 0) {
694 MSG_RETURN(MSG_HOST_FAILURE);
697 MSG_mailbox_set_cond(mailbox, NULL);
/* the head may still be NULL here, so it is tested before the sender is read */
701 if (NULL == (t = MSG_mailbox_get_head(mailbox)))
706 *PID = MSG_process_get_PID(t->simdata->sender);
/*
 * Alias-based counterpart of MSG_channel_select_from(): waits for a task to
 * show up in the mailbox looked up by `alias` and stores the sender's PID in
 * *PID.
 *
 * With timeout == 0.0, *PID is set from MSG_task_listen_from(). Otherwise,
 * while the mailbox is empty, a fresh condition is attached to the mailbox and
 * waited on under the host mutex, with or without a timeout. MSG_HOST_FAILURE
 * is returned if the host's state is 0 after a wait.
 */
714 MSG_error_t MSG_alias_select_from(const char *alias, double timeout, int *PID)
717 simdata_host_t h_simdata = NULL;
721 msg_mailbox_t mailbox;
/* zero timeout: plain non-blocking probe */
727 if (timeout == 0.0) {
728 *PID = MSG_task_listen_from(alias);
733 h_simdata = h->simdata;
735 DEBUG2("Probing on alias %s (%s)", alias, h->name);
737 mailbox = MSG_mailbox_get_by_alias(alias);
/* block until something is queued in the mailbox */
739 while (MSG_mailbox_is_empty(mailbox)) {
746 SIMIX_mutex_lock(h_simdata->mutex);
/* only one waiter per mailbox: a condition must not already be attached */
748 xbt_assert1(!MSG_mailbox_get_cond(mailbox),
749 "A process is already blocked on this alias %s", alias);
751 cond = SIMIX_cond_init();
753 MSG_mailbox_set_cond(mailbox, cond);
/* timed and untimed variants of the wait */
756 SIMIX_cond_wait_timeout(cond, h_simdata->mutex, timeout);
758 SIMIX_cond_wait(cond, h_simdata->mutex);
761 SIMIX_cond_destroy(cond);
762 SIMIX_mutex_unlock(h_simdata->mutex);
/* NOTE(review): this return happens after cond was destroyed but before the
 * mailbox condition is reset to NULL below -- confirm the mailbox is not left
 * pointing at the destroyed condition on host failure. */
764 if (SIMIX_host_get_state(h_simdata->smx_host) == 0) {
765 MSG_RETURN(MSG_HOST_FAILURE);
768 MSG_mailbox_set_cond(mailbox, NULL);
/* the head may still be NULL here, so it is tested before the sender is read */
772 if (NULL == (t = MSG_mailbox_get_head(mailbox)))
777 *PID = MSG_process_get_PID(t->simdata->sender);