3 /* Copyright (c) 2002-2007 Arnaud Legrand. */
4 /* Copyright (c) 2007 Bruno Donassolo. */
5 /* All rights reserved. */
7 /* This program is free software; you can redistribute it and/or modify it
8 * under the terms of the license (GNU LGPL) which comes with this package. */
10 #include "msg/private.h"
11 #include "xbt/sysdep.h"
/* Declares the msg_gos logging subcategory as a child of the msg category.
 * Judging by the macro name it also becomes the default category for the
 * logging macros (DEBUG1, DEBUG2, ...) used below -- confirm in xbt/log.h. */
16 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(msg_gos, msg,
17 "Logging specific to MSG (gos)");
19 /** \ingroup msg_gos_functions
21 * \brief Return the last value returned by a MSG function (except
/* Accessor: evaluates PROCESS_GET_ERRNO() and returns its value unchanged.
 * The macro is not defined in the visible part of this file. */
24 MSG_error_t MSG_get_errno(void)
26 return PROCESS_GET_ERRNO();
29 /** \ingroup msg_gos_functions
30 * \brief Executes a task and waits for its termination.
32 * This function is used for describing the behavior of an agent. It
33 * takes only one parameter.
34 * \param task a #m_task_t to execute on the location on which the
36 * \return #MSG_FATAL if \a task is not properly initialized and
/* Runs the computation of `task` through a SIMIX execute action on the
 * current host (SIMIX_host_self()) and blocks the calling process until the
 * action is neither READY nor RUNNING.
 *
 * task: task to execute; task->simdata is dereferenced without a NULL check
 *       in the visible lines.
 *
 * Visible return paths: MSG_HOST_FAILURE when SIMIX_host_get_state() of the
 * current host is 0, and MSG_TASK_CANCELLED on the last path.
 *
 * NOTE(review): the embedded line numbers skip values (e.g. 44, 56, 64, 80),
 * so some original lines -- braces, the opening of the do/while loop, the
 * return of the DONE branch -- appear to be missing from this copy. Verify
 * against the complete file before changing behaviour. */
39 MSG_error_t MSG_task_execute(m_task_t task)
41 simdata_task_t simdata = NULL;
42 m_process_t self = MSG_process_self();
43 e_surf_action_state_t state = SURF_ACTION_NOT_IN_THE_SYSTEM;
46 simdata = task->simdata;
/* A task that already owns a compute action is being executed elsewhere;
 * the refcount is only printed in the message, no longer checked. */
48 xbt_assert1((!simdata->compute),
49 //&& (task->simdata->refcount == 1), FIXME: since lua bindings play with this refcount to make sure that tasks don't get gc() twice, this field cannot be used here as is anymore
50 "This task is executed somewhere else. Go fix your code! %d", task->simdata->refcount);
52 DEBUG1("Computing on %s", MSG_process_self()->simdata->m_host->name);
/* The task mutex is held from here until the wait loop has finished. */
55 SIMIX_mutex_lock(simdata->mutex);
/* NOTE(review): the result of SIMIX_action_execute() is not stored in the
 * visible lines although simdata->compute is used right after; presumably
 * the assignment target (simdata->compute =) is on a missing line. */
57 SIMIX_action_execute(SIMIX_host_self(), task->name,
58 simdata->computation_amount);
59 SIMIX_action_set_priority(simdata->compute, simdata->priority);
61 /* changed to waiting action since we are always waiting one action (execute, communicate or sleep) */
62 self->simdata->waiting_action = simdata->compute;
63 SIMIX_register_action_to_condition(simdata->compute, simdata->cond);
/* Wait on the task condition; repeat while the action is READY or RUNNING. */
65 SIMIX_cond_wait(simdata->cond, simdata->mutex);
66 state = SIMIX_action_get_state(simdata->compute);
67 } while (state == SURF_ACTION_READY || state == SURF_ACTION_RUNNING);
68 SIMIX_unregister_action_to_condition(simdata->compute, simdata->cond);
69 self->simdata->waiting_action = NULL;
71 SIMIX_mutex_unlock(simdata->mutex);
74 if (SIMIX_action_get_state(task->simdata->compute) == SURF_ACTION_DONE) {
75 /* Action completed: destroy it here, reset the remaining computation amount and clear the compute handle. */
76 SIMIX_action_destroy(task->simdata->compute);
77 simdata->computation_amount = 0.0;
79 simdata->compute = NULL;
81 } else if (SIMIX_host_get_state(SIMIX_host_self()) == 0) {
82 /* Host state is 0: destroy the action, clear the compute handle and report a host failure. */
83 SIMIX_action_destroy(task->simdata->compute);
85 simdata->compute = NULL;
86 MSG_RETURN(MSG_HOST_FAILURE);
88 /* Remaining path: destroy the action, clear the compute handle and report the task as cancelled. */
89 SIMIX_action_destroy(task->simdata->compute);
91 simdata->compute = NULL;
92 MSG_RETURN(MSG_TASK_CANCELLED);
96 /** \ingroup m_task_management
97 * \brief Creates a new parallel #m_task_t.
99 * A constructor for #m_task_t taking six arguments and returning the
100 corresponding object.
101 * \param name a name for the object. It is for user-level information
103 * \param host_nb the number of hosts implied in the parallel task.
104 * \param host_list an array of \p host_nb m_host_t.
105 * \param computation_amount an array of \p host_nb
106 doubles. computation_amount[i] is the total number of operations
107 that have to be performed on host_list[i].
108 * \param communication_amount an array of \p host_nb* \p host_nb doubles.
109 * \param data a pointer to any data you may want to attach to the new
110 object. It is for user-level information and can be NULL. It can
111 be retrieved with the function \ref MSG_task_get_data.
113 * \return The new corresponding object.
/* Allocates a task together with its simdata, sets the simdata fields to
 * their initial values and records the hosts taking part in the parallel
 * execution.
 *
 * name:                 duplicated with xbt_strdup() into task->name.
 * host_nb:              number of entries in host_list; also the size of the
 *                       freshly allocated simdata->host_list array.
 * host_list:            each entry is translated to its
 *                       simdata->smx_host and stored in simdata->host_list.
 * computation_amount,
 * communication_amount: the pointers are stored as-is (no copy is made in
 *                       the visible lines) -- NOTE(review): the arrays
 *                       presumably have to outlive the task; confirm who
 *                       frees them.
 * data:                 not referenced in the visible lines.
 *
 * NOTE(review): the return type, the declaration of `i` and the return
 * statement are not visible in this copy (the embedded numbering skips
 * lines); verify against the complete file. */
116 MSG_parallel_task_create(const char *name, int host_nb,
117 const m_host_t * host_list,
118 double *computation_amount,
119 double *communication_amount, void *data)
/* Both structures are zero-initialised by xbt_new0(). */
122 simdata_task_t simdata = xbt_new0(s_simdata_task_t, 1);
123 m_task_t task = xbt_new0(s_m_task_t, 1);
124 task->simdata = simdata;
127 task->name = xbt_strdup(name);
/* Initial simdata state: no action attached, rate -1.0, refcount 1. */
131 simdata->computation_amount = 0;
132 simdata->message_size = 0;
133 simdata->cond = SIMIX_cond_init();
134 simdata->mutex = SIMIX_mutex_init();
135 simdata->compute = NULL;
136 simdata->comm = NULL;
137 simdata->rate = -1.0;
138 simdata->refcount = 1;
139 simdata->sender = NULL;
140 simdata->receiver = NULL;
141 simdata->source = NULL;
/* Parallel-task specific fields. */
143 simdata->host_nb = host_nb;
144 simdata->host_list = xbt_new0(smx_host_t, host_nb);
145 simdata->comp_amount = computation_amount;
146 simdata->comm_amount = communication_amount;
/* Translate every MSG host into the SIMIX host stored in its simdata. */
148 for (i = 0; i < host_nb; i++)
149 simdata->host_list[i] = host_list[i]->simdata->smx_host;
/* Runs a parallel task through SIMIX_action_parallel_execute() using the
 * host list and amount arrays stored in the task's simdata, and blocks the
 * calling process until the action is neither READY nor RUNNING.
 *
 * task: task to execute; it must have no compute action attached, a
 *       refcount of 1 and a non-zero host_nb (all three are asserted).
 *
 * Visible return paths: MSG_HOST_FAILURE when SIMIX_host_get_state() of the
 * current host is 0, and MSG_TASK_CANCELLED on the last path.
 *
 * NOTE(review): the embedded line numbers skip values (e.g. 174, 181, 199),
 * so some original lines -- braces, the opening of the do/while loop, the
 * return of the DONE branch -- appear to be missing from this copy. Verify
 * against the complete file before changing behaviour. */
154 MSG_error_t MSG_parallel_task_execute(m_task_t task)
156 simdata_task_t simdata = NULL;
157 m_process_t self = MSG_process_self();
158 e_surf_action_state_t state = SURF_ACTION_NOT_IN_THE_SYSTEM;
161 simdata = task->simdata;
/* Unlike MSG_task_execute(), the refcount check is still active here. */
163 xbt_assert0((!simdata->compute)
164 && (task->simdata->refcount == 1),
165 "This task is executed somewhere else. Go fix your code!");
/* host_nb == 0 means the task carries no parallel host list. */
167 xbt_assert0(simdata->host_nb, "This is not a parallel task. Go to hell.");
169 DEBUG1("Computing on %s", MSG_process_self()->simdata->m_host->name);
/* The task mutex is held from here until the wait loop has finished. */
173 SIMIX_mutex_lock(simdata->mutex);
/* NOTE(review): the result of SIMIX_action_parallel_execute() is not stored
 * in the visible lines although simdata->compute is used right after;
 * presumably the assignment target is on a missing line. */
175 SIMIX_action_parallel_execute(task->name, simdata->host_nb,
176 simdata->host_list, simdata->comp_amount,
177 simdata->comm_amount, 1.0, -1.0);
179 self->simdata->waiting_action = simdata->compute;
180 SIMIX_register_action_to_condition(simdata->compute, simdata->cond);
/* Wait on the task condition; repeat while the action is READY or RUNNING. */
182 SIMIX_cond_wait(simdata->cond, simdata->mutex);
183 state = SIMIX_action_get_state(task->simdata->compute);
184 } while (state == SURF_ACTION_READY || state == SURF_ACTION_RUNNING);
186 SIMIX_unregister_action_to_condition(simdata->compute, simdata->cond);
187 self->simdata->waiting_action = NULL;
190 SIMIX_mutex_unlock(simdata->mutex);
193 if (SIMIX_action_get_state(task->simdata->compute) == SURF_ACTION_DONE) {
194 /* Action completed: destroy it here, reset the remaining computation amount and clear the comm and compute handles. */
195 SIMIX_action_destroy(task->simdata->compute);
196 simdata->computation_amount = 0.0;
197 simdata->comm = NULL;
198 simdata->compute = NULL;
200 } else if (SIMIX_host_get_state(SIMIX_host_self()) == 0) {
201 /* Host state is 0: destroy the action, clear the comm and compute handles and report a host failure. */
202 SIMIX_action_destroy(task->simdata->compute);
203 simdata->comm = NULL;
204 simdata->compute = NULL;
205 MSG_RETURN(MSG_HOST_FAILURE);
207 /* Remaining path: destroy the action, clear the comm and compute handles and report the task as cancelled. */
208 SIMIX_action_destroy(task->simdata->compute);
209 simdata->comm = NULL;
210 simdata->compute = NULL;
211 MSG_RETURN(MSG_TASK_CANCELLED);
217 /** \ingroup msg_gos_functions
218 * \brief Sleep for the specified number of seconds
220 * Makes the current process sleep until \a nb_sec seconds have elapsed.
222 * \param nb_sec the number of seconds to sleep
/* Suspends the calling process by creating a SIMIX sleep action on the
 * process's host and waiting on a private mutex/condition pair until the
 * action is neither READY nor RUNNING. The mutex and condition are created
 * and destroyed inside this function.
 *
 * nb_sec: sleep duration. It is not referenced in the visible lines because
 *         the SIMIX_action_sleep() call is cut off after its first argument.
 *
 * Visible return paths: MSG_HOST_FAILURE when the action is DONE but the
 * current host state equals SURF_RESOURCE_OFF, and MSG_HOST_FAILURE again on
 * a second path whose condition is not visible here.
 *
 * NOTE(review): the embedded line numbers skip values (e.g. 229-231, 233,
 * 235, 245, 261-262, 268), so declarations, the assignment of act_sleep, the
 * opening of the do/while loop and the final return appear to be missing
 * from this copy. Verify against the complete file. */
224 MSG_error_t MSG_process_sleep(double nb_sec)
226 smx_action_t act_sleep;
227 m_process_t proc = MSG_process_self();
228 e_surf_action_state_t state = SURF_ACTION_NOT_IN_THE_SYSTEM;
232 /* create action to sleep */
234 SIMIX_action_sleep(SIMIX_process_get_host(proc->simdata->s_process),
/* The mutex is locked before waiting and unlocked once the loop is over. */
237 mutex = SIMIX_mutex_init();
238 SIMIX_mutex_lock(mutex);
240 /* create conditional and register action to it */
241 cond = SIMIX_cond_init();
243 proc->simdata->waiting_action = act_sleep;
244 SIMIX_register_action_to_condition(act_sleep, cond);
/* Wait on the condition; repeat while the action is READY or RUNNING. */
246 SIMIX_cond_wait(cond, mutex);
247 state = SIMIX_action_get_state(act_sleep);
248 } while (state == SURF_ACTION_READY || state == SURF_ACTION_RUNNING);
249 proc->simdata->waiting_action = NULL;
250 SIMIX_unregister_action_to_condition(act_sleep, cond);
251 SIMIX_mutex_unlock(mutex);
253 /* remove variables */
254 SIMIX_cond_destroy(cond);
255 SIMIX_mutex_destroy(mutex);
/* Every visible exit path destroys act_sleep before leaving. */
257 if (SIMIX_action_get_state(act_sleep) == SURF_ACTION_DONE) {
258 if (SIMIX_host_get_state(SIMIX_host_self()) == SURF_RESOURCE_OFF) {
259 SIMIX_action_destroy(act_sleep);
260 MSG_RETURN(MSG_HOST_FAILURE);
263 SIMIX_action_destroy(act_sleep);
264 MSG_RETURN(MSG_HOST_FAILURE);
267 SIMIX_action_destroy(act_sleep);
271 /** \ingroup msg_gos_functions
272 * \brief Listen on \a channel and waits for receiving a task from \a host.
274 * It takes three parameters.
275 * \param task a memory location for storing a #m_task_t. It will
276 hold a task when this function will return. Thus \a task should not
277 be equal to \c NULL and \a *task should be equal to \c NULL. If one of
278 those two conditions does not hold, there will be a warning message.
279 * \param channel the channel on which the agent should be
280 listening. This value has to be >=0 and < than the maximal
281 number of channels fixed with MSG_set_channel_number().
282 * \param host the host that is to be watched.
283 * \return #MSG_FATAL if \a task is equal to \c NULL, #MSG_WARNING
284 if \a *task is not equal to \c NULL, and #MSG_OK otherwise.
/* Thin wrapper: forwards to MSG_task_get_ext() with a timeout of -1 and the
 * given host, and returns its result. The -1 presumably means "no timeout"
 * -- confirm in MSG_mailbox_get_task_ext(). */
287 MSG_task_get_from_host(m_task_t * task, m_channel_t channel, m_host_t host)
289 return MSG_task_get_ext(task, channel, -1, host);
292 /** \ingroup msg_gos_functions
293 * \brief Listen on a channel and wait for receiving a task.
295 * It takes two parameters.
296 * \param task a memory location for storing a #m_task_t. It will
297 hold a task when this function will return. Thus \a task should not
298 be equal to \c NULL and \a *task should be equal to \c NULL. If one of
299 those two conditions does not hold, there will be a warning message.
300 * \param channel the channel on which the agent should be
301 listening. This value has to be >=0 and < than the maximal
302 number of channels fixed with MSG_set_channel_number().
303 * \return #MSG_FATAL if \a task is equal to \c NULL, #MSG_WARNING
304 * if \a *task is not equal to \c NULL, and #MSG_OK otherwise.
/* Thin wrapper: forwards to MSG_task_get_with_timeout() with a timeout of
 * -1 and returns its result. The -1 presumably means "no timeout" --
 * confirm in MSG_mailbox_get_task_ext(). */
306 MSG_error_t MSG_task_get(m_task_t * task, m_channel_t channel)
308 return MSG_task_get_with_timeout(task, channel, -1);
311 /** \ingroup msg_gos_functions
312 * \brief Listen on a channel and wait for receiving a task with a timeout.
314 * It takes three parameters.
315 * \param task a memory location for storing a #m_task_t. It will
316 hold a task when this function will return. Thus \a task should not
317 be equal to \c NULL and \a *task should be equal to \c NULL. If one of
318 those two conditions does not hold, there will be a warning message.
319 * \param channel the channel on which the agent should be
320 listening. This value has to be >=0 and < than the maximal
321 number of channels fixed with MSG_set_channel_number().
322 * \param max_duration the maximum time to wait for a task before giving
323 up. In such a case, #MSG_TRANSFER_FAILURE will be returned, \a task
324 will not be modified and will still be
325 equal to \c NULL when returning.
326 * \return #MSG_FATAL if \a task is equal to \c NULL, #MSG_WARNING
327 if \a *task is not equal to \c NULL, and #MSG_OK otherwise.
/* Thin wrapper: forwards to MSG_task_get_ext() with max_duration as the
 * timeout and NULL as the host, and returns its result.
 * NOTE(review): the rest of the parameter list (declaring max_duration) is
 * not visible in this copy. */
330 MSG_task_get_with_timeout(m_task_t * task, m_channel_t channel,
333 return MSG_task_get_ext(task, channel, max_duration, NULL);
336 /** \defgroup msg_gos_functions MSG Operating System Functions
337 * \brief This section describes the functions that can be used
338 * by an agent for handling some task.
/* Common implementation behind the channel-based MSG_task_get* functions:
 * asserts that channel lies in [0, msg_global->max_channel) and then calls
 * MSG_mailbox_get_task_ext() on the mailbox of the current host
 * (MSG_host_self()) for that channel, passing task, host and timeout.
 * NOTE(review): the end of the parameter list, the last assert argument and
 * the return keyword are not visible in this copy. */
342 MSG_task_get_ext(m_task_t * task, m_channel_t channel, double timeout,
345 xbt_assert1((channel >= 0)
346 && (channel < msg_global->max_channel), "Invalid channel %d",
350 MSG_mailbox_get_task_ext(MSG_mailbox_get_by_channel
351 (MSG_host_self(), channel), task, host, timeout);
/* Alias-based counterpart of MSG_task_get_from_host(): forwards to
 * MSG_task_receive_ext() with a timeout of -1 and the given host, and
 * returns its result. */
355 MSG_task_receive_from_host(m_task_t * task, const char *alias, m_host_t host)
357 return MSG_task_receive_ext(task, alias, -1, host);
/* Alias-based counterpart of MSG_task_get(): forwards to
 * MSG_task_receive_with_timeout() with a timeout of -1 and returns its
 * result. */
360 MSG_error_t MSG_task_receive(m_task_t * task, const char *alias)
362 return MSG_task_receive_with_timeout(task, alias, -1);
/* Forwards to MSG_task_receive_ext() with the given timeout and NULL as the
 * host, and returns its result.
 * NOTE(review): the rest of the parameter list (declaring timeout) is not
 * visible in this copy. */
366 MSG_task_receive_with_timeout(m_task_t * task, const char *alias,
369 return MSG_task_receive_ext(task, alias, timeout, NULL);
/* Common implementation behind the alias-based MSG_task_receive* functions:
 * looks up the mailbox with MSG_mailbox_get_by_alias(alias) and returns the
 * result of MSG_mailbox_get_task_ext() on it. No assertion on alias is made
 * in the visible lines.
 * NOTE(review): the end of the parameter list and the last call argument are
 * not visible in this copy. */
373 MSG_task_receive_ext(m_task_t * task, const char *alias, double timeout,
376 return MSG_mailbox_get_task_ext(MSG_mailbox_get_by_alias(alias), task, host,
381 /** \ingroup msg_gos_functions
382 * \brief Put a task on a channel of a host and wait for the end of the
385 * This function is used for describing the behavior of an agent. It
386 * takes three parameters.
387 * \param task a #m_task_t to send on another location. This task
388 will not be usable anymore when the function will return. There is
389 no automatic task duplication and you have to save your parameters
390 before calling this function. Tasks are unique and once it has been
391 sent to another location, you should not access it anymore. You do
392 not need to call MSG_task_destroy() but to avoid using, as an
393 effect of inattention, this task anymore, you definitely should
394 reinitialize it with #MSG_TASK_UNINITIALIZED. Note that this task
395 can be transferred iff it has been correctly created with
397 * \param dest the destination of the message
398 * \param channel the channel on which the agent should put this
399 task. This value has to be >=0 and < than the maximal number of
400 channels fixed with MSG_set_channel_number().
401 * \return #MSG_FATAL if \a task is not properly initialized and
402 * #MSG_OK otherwise. Returns #MSG_HOST_FAILURE if the host on which
403 * this function was called was shut down. Returns
404 * #MSG_TRANSFER_FAILURE if the transfer could not be properly done
405 * (network failure, dest failure)
/* Thin wrapper: forwards to MSG_task_put_with_timeout() with a timeout of
 * -1.0 and returns its result. */
407 MSG_error_t MSG_task_put(m_task_t task, m_host_t dest, m_channel_t channel)
409 return MSG_task_put_with_timeout(task, dest, channel, -1.0);
412 /** \ingroup msg_gos_functions
413 * \brief Does exactly the same as MSG_task_put but with a bounded transmission
/* Stores maxrate in task->simdata->rate and then sends the task with
 * MSG_task_put(). task and task->simdata are dereferenced without a NULL
 * check. The rate stays set on the task after the call returns.
 * NOTE(review): the rest of the parameter list (declaring maxrate) is not
 * visible in this copy. */
419 MSG_task_put_bounded(m_task_t task, m_host_t dest, m_channel_t channel,
422 task->simdata->rate = maxrate;
423 return MSG_task_put(task, dest, channel);
426 /** \ingroup msg_gos_functions \brief Put a task on a channel of a
427 * host (with a timeout on the waiting of the destination host) and
428 * waits for the end of the transmission.
430 * This function is used for describing the behavior of an agent. It
431 * takes four parameters.
432 * \param task a #m_task_t to send on another location. This task
433 will not be usable anymore when the function will return. There is
434 no automatic task duplication and you have to save your parameters
435 before calling this function. Tasks are unique and once it has been
436 sent to another location, you should not access it anymore. You do
437 not need to call MSG_task_destroy() but to avoid using, as an
438 effect of inattention, this task anymore, you definitely should
439 reinitialize it with #MSG_TASK_UNINITIALIZED. Note that this task
440 can be transferred iff it has been correctly created with
442 * \param dest the destination of the message
443 * \param channel the channel on which the agent should put this
444 task. This value has to be >=0 and < than the maximal number of
445 channels fixed with MSG_set_channel_number().
446 * \param timeout the maximum time to wait for a task before giving
447 up. In such a case, #MSG_TRANSFER_FAILURE will be returned, \a task
449 * \return #MSG_FATAL if \a task is not properly initialized and
450 #MSG_OK otherwise. Returns #MSG_HOST_FAILURE if the host on which
451 this function was called was shut down. Returns
452 #MSG_TRANSFER_FAILURE if the transfer could not be properly done
453 (network failure, dest failure, timeout...)
/* Asserts that channel lies in [0, msg_global->max_channel) and then calls
 * MSG_mailbox_put_with_timeout() on the mailbox of host dest for that
 * channel.
 * NOTE(review): the end of the parameter list, the last assert argument, the
 * return keyword and the remaining call arguments are not visible in this
 * copy. */
456 MSG_task_put_with_timeout(m_task_t task, m_host_t dest, m_channel_t channel,
459 xbt_assert1((channel >= 0)
460 && (channel < msg_global->max_channel), "Invalid channel %d",
464 MSG_mailbox_put_with_timeout(MSG_mailbox_get_by_channel(dest, channel),
/* Alias-based counterpart of MSG_task_put(): forwards to
 * MSG_task_send_with_timeout() with a timeout of -1 and returns its result. */
468 MSG_error_t MSG_task_send(m_task_t task, const char *alias)
470 return MSG_task_send_with_timeout(task, alias, -1);
/* Alias-based counterpart of MSG_task_put_bounded(): stores maxrate in
 * task->simdata->rate and then sends the task with MSG_task_send(). task
 * and task->simdata are dereferenced without a NULL check. */
475 MSG_task_send_bounded(m_task_t task, const char *alias, double maxrate)
477 task->simdata->rate = maxrate;
478 return MSG_task_send(task, alias);
/* Looks up the mailbox with MSG_mailbox_get_by_alias(alias) and returns the
 * result of MSG_mailbox_put_with_timeout() on it.
 * NOTE(review): the remaining call arguments are not visible in this copy. */
483 MSG_task_send_with_timeout(m_task_t task, const char *alias, double timeout)
485 return MSG_mailbox_put_with_timeout(MSG_mailbox_get_by_alias(alias), task,
/* Returns non-zero when the mailbox registered under alias is not empty
 * (logical negation of MSG_mailbox_is_empty()), and 0 when it is empty. */
489 int MSG_task_listen(const char *alias)
493 return !MSG_mailbox_is_empty(MSG_mailbox_get_by_alias(alias));
496 /** \ingroup msg_gos_functions
497 * \brief Test whether there is a pending communication on a channel.
499 * It takes one parameter.
500 * \param channel the channel on which the agent should be
501 listening. This value has to be >=0 and < than the maximal
502 number of channels fixed with MSG_set_channel_number().
503 * \return 1 if there is a pending communication and 0 otherwise
/* Channel-based counterpart of MSG_task_listen(): asserts that channel lies
 * in [0, msg_global->max_channel) and evaluates the negation of
 * MSG_mailbox_is_empty() on the current host's mailbox for that channel.
 * NOTE(review): the last assert argument and the return keyword are not
 * visible in this copy. */
505 int MSG_task_Iprobe(m_channel_t channel)
507 xbt_assert1((channel >= 0)
508 && (channel < msg_global->max_channel), "Invalid channel %d",
514 !MSG_mailbox_is_empty(MSG_mailbox_get_by_channel
515 (MSG_host_self(), channel));
518 /** \ingroup msg_gos_functions
520 * \brief Return the number of tasks waiting to be received on a \a
521 channel and sent by \a host.
523 * It takes two parameters.
524 * \param channel the channel on which the agent should be
525 listening. This value has to be >=0 and < than the maximal
526 number of channels fixed with MSG_set_channel_number().
527 * \param host the host that is to be watched.
528 * \return the number of tasks waiting to be received on \a channel
/* Asserts that channel lies in [0, msg_global->max_channel) and calls
 * MSG_mailbox_get_count_host_waiting_tasks() on the current host's mailbox
 * for that channel. Note that channel is declared as int here, while the
 * sibling functions use m_channel_t.
 * NOTE(review): the last assert argument, the return keyword and the final
 * call argument are not visible in this copy. */
531 int MSG_task_probe_from_host(int channel, m_host_t host)
533 xbt_assert1((channel >= 0)
534 && (channel < msg_global->max_channel), "Invalid channel %d",
540 MSG_mailbox_get_count_host_waiting_tasks(MSG_mailbox_get_by_channel
541 (MSG_host_self(), channel),
/* Alias-based counterpart of MSG_task_probe_from_host(): calls
 * MSG_mailbox_get_count_host_waiting_tasks() on the mailbox registered under
 * alias.
 * NOTE(review): the return keyword and the final call argument are not
 * visible in this copy. */
546 int MSG_task_listen_from_host(const char *alias, m_host_t host)
551 MSG_mailbox_get_count_host_waiting_tasks(MSG_mailbox_get_by_alias(alias),
555 /** \ingroup msg_gos_functions
556 * \brief Test whether there is a pending communication on a channel, and who sent it.
558 * It takes one parameter.
559 * \param channel the channel on which the agent should be
560 listening. This value has to be >=0 and < than the maximal
561 number of channels fixed with MSG_set_channel_number().
562 * \return -1 if there is no pending communication and the PID of the process who sent it otherwise
/* Asserts that channel lies in [0, msg_global->max_channel), reads the head
 * of the current host's mailbox for that channel with
 * MSG_mailbox_get_head(), and returns the PID of that task's sender.
 * NOTE(review): the declaration of `task`, the start of the condition that
 * wraps MSG_mailbox_get_head() and the empty-mailbox return are not visible
 * in this copy. */
564 int MSG_task_probe_from(m_channel_t channel)
570 xbt_assert1((channel >= 0)
571 && (channel < msg_global->max_channel), "Invalid channel %d",
576 MSG_mailbox_get_head(MSG_mailbox_get_by_channel
577 (MSG_host_self(), channel))))
580 return MSG_process_get_PID(task->simdata->sender);
/* Alias-based counterpart of MSG_task_probe_from(): reads the head of the
 * mailbox registered under alias and returns the PID of that task's sender.
 * NOTE(review): the declaration of `task` and the statement executed when
 * the head is NULL are not visible in this copy. */
583 int MSG_task_listen_from(const char *alias)
/* A NULL head means the mailbox currently holds no task. */
589 if (NULL == (task = MSG_mailbox_get_head(MSG_mailbox_get_by_alias(alias))))
592 return MSG_process_get_PID(task->simdata->sender);
595 /** \ingroup msg_gos_functions
596 * \brief Wait for at most \a timeout seconds for a task reception
599 * \a PID is updated with the PID of the first process that triggered this event if any.
601 * It takes three parameters:
602 * \param channel the channel on which the agent should be
603 listening. This value has to be >=0 and < than the maximal
604 number of channels fixed with MSG_set_channel_number().
605 * \param PID a memory location for storing an int.
606 * \param timeout the maximum time to wait for a task before
607 giving up. In the case of a reception, *\a PID will be updated
608 with the PID of the first process to send a task.
609 * \return #MSG_HOST_FAILURE if the host is shut down in the meantime
610 and #MSG_OK otherwise.
/* Waits until the current host's mailbox for channel is non-empty and
 * stores the PID of the head task's sender in *PID.
 *
 * channel: asserted to lie in [0, msg_global->max_channel).
 * timeout: a value of exactly 0.0 takes a shortcut that fills *PID from
 *          MSG_task_probe_from(channel). Otherwise both
 *          SIMIX_cond_wait_timeout() and SIMIX_cond_wait() appear below; the
 *          condition choosing between them is not visible in this copy.
 * PID:     output location; written without a NULL check.
 *
 * Visible return path: MSG_HOST_FAILURE when the host state is 0 after the
 * wait.
 *
 * NOTE(review): the embedded line numbers skip values, so the return type,
 * the declarations of h, cond and t, several braces and the final return
 * appear to be missing from this copy. Verify against the complete file. */
613 MSG_channel_select_from(m_channel_t channel, double timeout, int *PID)
616 simdata_host_t h_simdata = NULL;
620 msg_mailbox_t mailbox;
622 xbt_assert1((channel >= 0)
623 && (channel < msg_global->max_channel), "Invalid channel %d",
/* Zero timeout: probe once instead of blocking. */
630 if (timeout == 0.0) {
631 *PID = MSG_task_probe_from(channel);
636 h_simdata = h->simdata;
638 mailbox = MSG_mailbox_get_by_channel(MSG_host_self(), channel);
/* Block while the mailbox is empty, holding the host mutex around the wait. */
640 while (MSG_mailbox_is_empty(mailbox)) {
647 SIMIX_mutex_lock(h_simdata->mutex);
/* Only one waiter per mailbox: a condition already set means another
 * process is blocked on it. */
649 xbt_assert1(!MSG_mailbox_get_cond(mailbox),
650 "A process is already blocked on this channel %d", channel);
/* A fresh condition is attached to the mailbox for the wait. */
652 cond = SIMIX_cond_init();
654 MSG_mailbox_set_cond(mailbox, cond);
657 SIMIX_cond_wait_timeout(cond, h_simdata->mutex, timeout);
659 SIMIX_cond_wait(cond, h_simdata->mutex);
662 SIMIX_cond_destroy(cond);
663 SIMIX_mutex_unlock(h_simdata->mutex);
/* NOTE(review): on this path the function returns before the visible
 * MSG_mailbox_set_cond(mailbox, NULL) below, so the mailbox may keep a
 * pointer to the destroyed condition -- confirm against the full file. */
665 if (SIMIX_host_get_state(h_simdata->smx_host) == 0) {
666 MSG_RETURN(MSG_HOST_FAILURE);
669 MSG_mailbox_set_cond(mailbox, NULL);
673 if (NULL == (t = MSG_mailbox_get_head(mailbox)))
678 *PID = MSG_process_get_PID(t->simdata->sender);
/* Alias-based counterpart of MSG_channel_select_from(): waits until the
 * mailbox registered under alias is non-empty and stores the PID of the head
 * task's sender in *PID.
 *
 * alias:   mailbox name; no assertion on it is made in the visible lines.
 * timeout: a value of exactly 0.0 takes a shortcut that fills *PID from
 *          MSG_task_listen_from(alias). Otherwise both
 *          SIMIX_cond_wait_timeout() and SIMIX_cond_wait() appear below; the
 *          condition choosing between them is not visible in this copy.
 * PID:     output location; written without a NULL check.
 *
 * Visible return path: MSG_HOST_FAILURE when the host state is 0 after the
 * wait.
 *
 * NOTE(review): the embedded line numbers skip values, so the declarations
 * of h, cond and t, several braces and the final return appear to be
 * missing from this copy. Verify against the complete file. */
686 MSG_error_t MSG_alias_select_from(const char *alias, double timeout, int *PID)
689 simdata_host_t h_simdata = NULL;
693 msg_mailbox_t mailbox;
/* Zero timeout: probe once instead of blocking. */
699 if (timeout == 0.0) {
700 *PID = MSG_task_listen_from(alias);
705 h_simdata = h->simdata;
707 DEBUG2("Probing on alias %s (%s)", alias, h->name);
709 mailbox = MSG_mailbox_get_by_alias(alias);
/* Block while the mailbox is empty, holding the host mutex around the wait. */
711 while (MSG_mailbox_is_empty(mailbox)) {
718 SIMIX_mutex_lock(h_simdata->mutex);
/* Only one waiter per mailbox: a condition already set means another
 * process is blocked on it. */
720 xbt_assert1(!MSG_mailbox_get_cond(mailbox),
721 "A process is already blocked on this alias %s", alias);
/* A fresh condition is attached to the mailbox for the wait. */
723 cond = SIMIX_cond_init();
725 MSG_mailbox_set_cond(mailbox, cond);
728 SIMIX_cond_wait_timeout(cond, h_simdata->mutex, timeout);
730 SIMIX_cond_wait(cond, h_simdata->mutex);
733 SIMIX_cond_destroy(cond);
734 SIMIX_mutex_unlock(h_simdata->mutex);
/* NOTE(review): on this path the function returns before the visible
 * MSG_mailbox_set_cond(mailbox, NULL) below, so the mailbox may keep a
 * pointer to the destroyed condition -- confirm against the full file. */
736 if (SIMIX_host_get_state(h_simdata->smx_host) == 0) {
737 MSG_RETURN(MSG_HOST_FAILURE);
740 MSG_mailbox_set_cond(mailbox, NULL);
744 if (NULL == (t = MSG_mailbox_get_head(mailbox)))
749 *PID = MSG_process_get_PID(t->simdata->sender);