3 /* Copyright (c) 2002-2007 Arnaud Legrand. */
4 /* Copyright (c) 2007 Bruno Donassolo. */
5 /* All rights reserved. */
7 /* This program is free software; you can redistribute it and/or modify it
8 * under the terms of the license (GNU LGPL) which comes with this package. */
10 #include "msg/private.h"
11 #include "xbt/sysdep.h"
16 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(msg_gos, msg,
17 "Logging specific to MSG (gos)");
19 /** \ingroup msg_gos_functions
21 * \brief Return the last value returned by a MSG function (except
24 MSG_error_t MSG_get_errno(void)
26 return PROCESS_GET_ERRNO();
29 /** \ingroup msg_gos_functions
30 * \brief Executes a task and waits for its termination.
32 * This function is used for describing the behavior of an agent. It
33 * takes only one parameter.
34 * \param task a #m_task_t to execute on the location on which the
36 * \return #MSG_FATAL if \a task is not properly initialized and
39 MSG_error_t MSG_task_execute(m_task_t task)
41 simdata_task_t simdata = NULL;
42 m_process_t self = MSG_process_self();
43 e_surf_action_state_t state = SURF_ACTION_NOT_IN_THE_SYSTEM;
46 simdata = task->simdata;
47 xbt_assert0((!simdata->compute)
48 && (task->simdata->refcount == 1),
49 "This task is executed somewhere else. Go fix your code!");
51 DEBUG1("Computing on %s", MSG_process_self()->simdata->m_host->name);
54 SIMIX_mutex_lock(simdata->mutex);
56 SIMIX_action_execute(SIMIX_host_self(), task->name,
57 simdata->computation_amount);
58 SIMIX_action_set_priority(simdata->compute, simdata->priority);
60 self->simdata->waiting_task = task;
61 SIMIX_register_action_to_condition(simdata->compute, simdata->cond);
63 SIMIX_cond_wait(simdata->cond, simdata->mutex);
64 state = SIMIX_action_get_state(simdata->compute);
65 } while (state == SURF_ACTION_READY || state == SURF_ACTION_RUNNING);
66 SIMIX_unregister_action_to_condition(simdata->compute, simdata->cond);
67 self->simdata->waiting_task = NULL;
69 SIMIX_mutex_unlock(simdata->mutex);
72 if (SIMIX_action_get_state(task->simdata->compute) == SURF_ACTION_DONE) {
73 /* action ended, set comm and compute = NULL, the actions is already destroyed in the main function */
74 SIMIX_action_destroy(task->simdata->compute);
75 simdata->computation_amount = 0.0;
77 simdata->compute = NULL;
79 } else if (SIMIX_host_get_state(SIMIX_host_self()) == 0) {
80 /* action ended, set comm and compute = NULL, the actions is already destroyed in the main function */
81 SIMIX_action_destroy(task->simdata->compute);
83 simdata->compute = NULL;
84 MSG_RETURN(MSG_HOST_FAILURE);
86 /* action ended, set comm and compute = NULL, the actions is already destroyed in the main function */
87 SIMIX_action_destroy(task->simdata->compute);
89 simdata->compute = NULL;
90 MSG_RETURN(MSG_TASK_CANCELLED);
94 /** \ingroup m_task_management
95 * \brief Creates a new #m_task_t (a parallel one....).
97 * A constructor for #m_task_t taking six arguments and returning the
99 * \param name a name for the object. It is for user-level information
101 * \param host_nb the number of hosts implied in the parallel task.
102 * \param host_list an array of \p host_nb m_host_t.
103 * \param computation_amount an array of \p host_nb
104 doubles. computation_amount[i] is the total number of operations
105 that have to be performed on host_list[i].
106 * \param communication_amount an array of \p host_nb* \p host_nb doubles.
107 * \param data a pointer to any data may want to attach to the new
108 object. It is for user-level information and can be NULL. It can
109 be retrieved with the function \ref MSG_task_get_data.
111 * \return The new corresponding object.
114 MSG_parallel_task_create(const char *name, int host_nb,
115 const m_host_t * host_list,
116 double *computation_amount,
117 double *communication_amount, void *data)
120 simdata_task_t simdata = xbt_new0(s_simdata_task_t, 1);
121 m_task_t task = xbt_new0(s_m_task_t, 1);
122 task->simdata = simdata;
125 task->name = xbt_strdup(name);
129 simdata->computation_amount = 0;
130 simdata->message_size = 0;
131 simdata->cond = SIMIX_cond_init();
132 simdata->mutex = SIMIX_mutex_init();
133 simdata->compute = NULL;
134 simdata->comm = NULL;
135 simdata->rate = -1.0;
136 simdata->refcount = 1;
137 simdata->sender = NULL;
138 simdata->receiver = NULL;
139 simdata->source = NULL;
141 simdata->host_nb = host_nb;
142 simdata->host_list = xbt_new0(smx_host_t, host_nb);
143 simdata->comp_amount = computation_amount;
144 simdata->comm_amount = communication_amount;
146 for (i = 0; i < host_nb; i++)
147 simdata->host_list[i] = host_list[i]->simdata->smx_host;
152 MSG_error_t MSG_parallel_task_execute(m_task_t task)
154 simdata_task_t simdata = NULL;
155 m_process_t self = MSG_process_self();
156 e_surf_action_state_t state = SURF_ACTION_NOT_IN_THE_SYSTEM;
159 simdata = task->simdata;
161 xbt_assert0((!simdata->compute)
162 && (task->simdata->refcount == 1),
163 "This task is executed somewhere else. Go fix your code!");
165 xbt_assert0(simdata->host_nb, "This is not a parallel task. Go to hell.");
167 DEBUG1("Computing on %s", MSG_process_self()->simdata->m_host->name);
171 SIMIX_mutex_lock(simdata->mutex);
173 SIMIX_action_parallel_execute(task->name, simdata->host_nb,
174 simdata->host_list, simdata->comp_amount,
175 simdata->comm_amount, 1.0, -1.0);
177 self->simdata->waiting_task = task;
178 SIMIX_register_action_to_condition(simdata->compute, simdata->cond);
180 SIMIX_cond_wait(simdata->cond, simdata->mutex);
181 state = SIMIX_action_get_state(task->simdata->compute);
182 } while (state == SURF_ACTION_READY || state == SURF_ACTION_RUNNING);
184 SIMIX_unregister_action_to_condition(simdata->compute, simdata->cond);
185 self->simdata->waiting_task = NULL;
188 SIMIX_mutex_unlock(simdata->mutex);
191 if (SIMIX_action_get_state(task->simdata->compute) == SURF_ACTION_DONE) {
192 /* action ended, set comm and compute = NULL, the actions is already destroyed in the main function */
193 SIMIX_action_destroy(task->simdata->compute);
194 simdata->computation_amount = 0.0;
195 simdata->comm = NULL;
196 simdata->compute = NULL;
198 } else if (SIMIX_host_get_state(SIMIX_host_self()) == 0) {
199 /* action ended, set comm and compute = NULL, the actions is already destroyed in the main function */
200 SIMIX_action_destroy(task->simdata->compute);
201 simdata->comm = NULL;
202 simdata->compute = NULL;
203 MSG_RETURN(MSG_HOST_FAILURE);
205 /* action ended, set comm and compute = NULL, the actions is already destroyed in the main function */
206 SIMIX_action_destroy(task->simdata->compute);
207 simdata->comm = NULL;
208 simdata->compute = NULL;
209 MSG_RETURN(MSG_TASK_CANCELLED);
215 /** \ingroup msg_gos_functions
216 * \brief Sleep for the specified number of seconds
218 * Makes the current process sleep until \a time seconds have elapsed.
220 * \param nb_sec a number of second
222 MSG_error_t MSG_process_sleep(double nb_sec)
224 smx_action_t act_sleep;
225 m_process_t proc = MSG_process_self();
226 e_surf_action_state_t state = SURF_ACTION_NOT_IN_THE_SYSTEM;
230 /* create action to sleep */
232 SIMIX_action_sleep(SIMIX_process_get_host(proc->simdata->s_process),
235 mutex = SIMIX_mutex_init();
236 SIMIX_mutex_lock(mutex);
238 /* create conditional and register action to it */
239 cond = SIMIX_cond_init();
241 SIMIX_register_action_to_condition(act_sleep, cond);
243 SIMIX_cond_wait(cond, mutex);
244 state = SIMIX_action_get_state(act_sleep);
245 } while (state == SURF_ACTION_READY || state == SURF_ACTION_RUNNING);
246 SIMIX_unregister_action_to_condition(act_sleep, cond);
247 SIMIX_mutex_unlock(mutex);
249 /* remove variables */
250 SIMIX_cond_destroy(cond);
251 SIMIX_mutex_destroy(mutex);
253 if (SIMIX_action_get_state(act_sleep) == SURF_ACTION_DONE) {
254 if (SIMIX_host_get_state(SIMIX_host_self()) == SURF_RESOURCE_OFF) {
255 SIMIX_action_destroy(act_sleep);
256 MSG_RETURN(MSG_HOST_FAILURE);
259 SIMIX_action_destroy(act_sleep);
260 MSG_RETURN(MSG_HOST_FAILURE);
263 SIMIX_action_destroy(act_sleep);
267 /** \ingroup msg_gos_functions
268 * \brief Return the number of MSG tasks currently running on
269 * the host of the current running process.
271 static int MSG_get_msgload(void)
273 xbt_die("not implemented yet");
279 /** \ingroup msg_gos_functions
280 * \brief Listen on \a channel and waits for receiving a task from \a host.
282 * It takes three parameters.
283 * \param task a memory location for storing a #m_task_t. It will
284 hold a task when this function will return. Thus \a task should not
285 be equal to \c NULL and \a *task should be equal to \c NULL. If one of
286 those two condition does not hold, there will be a warning message.
287 * \param channel the channel on which the agent should be
288 listening. This value has to be >=0 and < than the maximal
289 number of channels fixed with MSG_set_channel_number().
290 * \param host the host that is to be watched.
291 * \return #MSG_FATAL if \a task is equal to \c NULL, #MSG_WARNING
292 if \a *task is not equal to \c NULL, and #MSG_OK otherwise.
295 MSG_task_get_from_host(m_task_t * task, m_channel_t channel, m_host_t host)
297 return MSG_task_get_ext(task, channel, -1, host);
300 /** \ingroup msg_gos_functions
301 * \brief Listen on a channel and wait for receiving a task.
303 * It takes two parameters.
304 * \param task a memory location for storing a #m_task_t. It will
305 hold a task when this function will return. Thus \a task should not
306 be equal to \c NULL and \a *task should be equal to \c NULL. If one of
307 those two condition does not hold, there will be a warning message.
308 * \param channel the channel on which the agent should be
309 listening. This value has to be >=0 and < than the maximal
310 number of channels fixed with MSG_set_channel_number().
311 * \return #MSG_FATAL if \a task is equal to \c NULL, #MSG_WARNING
312 * if \a *task is not equal to \c NULL, and #MSG_OK otherwise.
314 MSG_error_t MSG_task_get(m_task_t * task, m_channel_t channel)
316 return MSG_task_get_with_timeout(task, channel, -1);
319 /** \ingroup msg_gos_functions
320 * \brief Listen on a channel and wait for receiving a task with a timeout.
322 * It takes three parameters.
323 * \param task a memory location for storing a #m_task_t. It will
324 hold a task when this function will return. Thus \a task should not
325 be equal to \c NULL and \a *task should be equal to \c NULL. If one of
326 those two condition does not hold, there will be a warning message.
327 * \param channel the channel on which the agent should be
328 listening. This value has to be >=0 and < than the maximal
329 number of channels fixed with MSG_set_channel_number().
330 * \param max_duration the maximum time to wait for a task before giving
331 up. In such a case, #MSG_TRANSFER_FAILURE will be returned, \a task
332 will not be modified and will still be
333 equal to \c NULL when returning.
334 * \return #MSG_FATAL if \a task is equal to \c NULL, #MSG_WARNING
335 if \a *task is not equal to \c NULL, and #MSG_OK otherwise.
338 MSG_task_get_with_timeout(m_task_t * task, m_channel_t channel,
341 return MSG_task_get_ext(task, channel, max_duration, NULL);
344 /** \defgroup msg_gos_functions MSG Operating System Functions
345 * \brief This section describes the functions that can be used
346 * by an agent for handling some task.
350 MSG_task_get_ext(m_task_t * task, m_channel_t channel, double timeout,
353 xbt_assert1((channel >= 0)
354 && (channel < msg_global->max_channel), "Invalid channel %d",
358 MSG_mailbox_get_task_ext(MSG_mailbox_get_by_channel
359 (MSG_host_self(), channel), task, host, timeout);
363 MSG_task_receive_from_host(m_task_t * task, const char *alias, m_host_t host)
365 return MSG_task_receive_ext(task, alias, -1, host);
368 MSG_error_t MSG_task_receive(m_task_t * task, const char *alias)
370 return MSG_task_receive_with_timeout(task, alias, -1);
374 MSG_task_receive_with_timeout(m_task_t * task, const char *alias,
377 return MSG_task_receive_ext(task, alias, timeout, NULL);
381 MSG_task_receive_ext(m_task_t * task, const char *alias, double timeout,
384 return MSG_mailbox_get_task_ext(MSG_mailbox_get_by_alias(alias), task, host,
389 /** \ingroup msg_gos_functions
390 * \brief Put a task on a channel of an host and waits for the end of the
393 * This function is used for describing the behavior of an agent. It
394 * takes three parameter.
395 * \param task a #m_task_t to send on another location. This task
396 will not be usable anymore when the function will return. There is
397 no automatic task duplication and you have to save your parameters
398 before calling this function. Tasks are unique and once it has been
399 sent to another location, you should not access it anymore. You do
400 not need to call MSG_task_destroy() but to avoid using, as an
401 effect of inattention, this task anymore, you definitely should
402 renitialize it with #MSG_TASK_UNINITIALIZED. Note that this task
403 can be transfered iff it has been correctly created with
405 * \param dest the destination of the message
406 * \param channel the channel on which the agent should put this
407 task. This value has to be >=0 and < than the maximal number of
408 channels fixed with MSG_set_channel_number().
409 * \return #MSG_FATAL if \a task is not properly initialized and
410 * #MSG_OK otherwise. Returns #MSG_HOST_FAILURE if the host on which
411 * this function was called was shut down. Returns
412 * #MSG_TRANSFER_FAILURE if the transfer could not be properly done
413 * (network failure, dest failure)
415 MSG_error_t MSG_task_put(m_task_t task, m_host_t dest, m_channel_t channel)
417 return MSG_task_put_with_timeout(task, dest, channel, -1.0);
420 /** \ingroup msg_gos_functions
421 * \brief Does exactly the same as MSG_task_put but with a bounded transmition
427 MSG_task_put_bounded(m_task_t task, m_host_t dest, m_channel_t channel,
430 task->simdata->rate = maxrate;
431 return MSG_task_put(task, dest, channel);
434 /** \ingroup msg_gos_functions \brief Put a task on a channel of an
435 * host (with a timeout on the waiting of the destination host) and
436 * waits for the end of the transmission.
438 * This function is used for describing the behavior of an agent. It
439 * takes four parameter.
440 * \param task a #m_task_t to send on another location. This task
441 will not be usable anymore when the function will return. There is
442 no automatic task duplication and you have to save your parameters
443 before calling this function. Tasks are unique and once it has been
444 sent to another location, you should not access it anymore. You do
445 not need to call MSG_task_destroy() but to avoid using, as an
446 effect of inattention, this task anymore, you definitely should
447 renitialize it with #MSG_TASK_UNINITIALIZED. Note that this task
448 can be transfered iff it has been correctly created with
450 * \param dest the destination of the message
451 * \param channel the channel on which the agent should put this
452 task. This value has to be >=0 and < than the maximal number of
453 channels fixed with MSG_set_channel_number().
454 * \param timeout the maximum time to wait for a task before giving
455 up. In such a case, #MSG_TRANSFER_FAILURE will be returned, \a task
457 * \return #MSG_FATAL if \a task is not properly initialized and
458 #MSG_OK otherwise. Returns #MSG_HOST_FAILURE if the host on which
459 this function was called was shut down. Returns
460 #MSG_TRANSFER_FAILURE if the transfer could not be properly done
461 (network failure, dest failure, timeout...)
464 MSG_task_put_with_timeout(m_task_t task, m_host_t dest, m_channel_t channel,
467 xbt_assert1((channel >= 0)
468 && (channel < msg_global->max_channel), "Invalid channel %d",
472 MSG_mailbox_put_with_timeout(MSG_mailbox_get_by_channel(dest, channel),
476 MSG_error_t MSG_task_send(m_task_t task, const char *alias)
478 return MSG_task_send_with_timeout(task, alias, -1);
483 MSG_task_send_bounded(m_task_t task, const char *alias, double maxrate)
485 task->simdata->rate = maxrate;
486 return MSG_task_send(task, alias);
491 MSG_task_send_with_timeout(m_task_t task, const char *alias, double timeout)
493 return MSG_mailbox_put_with_timeout(MSG_mailbox_get_by_alias(alias), task,
497 int MSG_task_listen(const char *alias)
501 return !MSG_mailbox_is_empty(MSG_mailbox_get_by_alias(alias));
504 /** \ingroup msg_gos_functions
505 * \brief Test whether there is a pending communication on a channel.
507 * It takes one parameter.
508 * \param channel the channel on which the agent should be
509 listening. This value has to be >=0 and < than the maximal
510 number of channels fixed with MSG_set_channel_number().
511 * \return 1 if there is a pending communication and 0 otherwise
513 int MSG_task_Iprobe(m_channel_t channel)
515 xbt_assert1((channel >= 0)
516 && (channel < msg_global->max_channel), "Invalid channel %d",
522 !MSG_mailbox_is_empty(MSG_mailbox_get_by_channel
523 (MSG_host_self(), channel));
526 /** \ingroup msg_gos_functions
528 * \brief Return the number of tasks waiting to be received on a \a
529 channel and sent by \a host.
531 * It takes two parameters.
532 * \param channel the channel on which the agent should be
533 listening. This value has to be >=0 and < than the maximal
534 number of channels fixed with MSG_set_channel_number().
535 * \param host the host that is to be watched.
536 * \return the number of tasks waiting to be received on \a channel
539 int MSG_task_probe_from_host(int channel, m_host_t host)
541 xbt_assert1((channel >= 0)
542 && (channel < msg_global->max_channel), "Invalid channel %d",
548 MSG_mailbox_get_count_host_waiting_tasks(MSG_mailbox_get_by_channel
549 (MSG_host_self(), channel),
554 int MSG_task_listen_from_host(const char *alias, m_host_t host)
559 MSG_mailbox_get_count_host_waiting_tasks(MSG_mailbox_get_by_alias(alias),
563 /** \ingroup msg_gos_functions
564 * \brief Test whether there is a pending communication on a channel, and who sent it.
566 * It takes one parameter.
567 * \param channel the channel on which the agent should be
568 listening. This value has to be >=0 and < than the maximal
569 number of channels fixed with MSG_set_channel_number().
570 * \return -1 if there is no pending communication and the PID of the process who sent it otherwise
572 int MSG_task_probe_from(m_channel_t channel)
578 xbt_assert1((channel >= 0)
579 && (channel < msg_global->max_channel), "Invalid channel %d",
584 MSG_mailbox_get_head(MSG_mailbox_get_by_channel
585 (MSG_host_self(), channel))))
588 return MSG_process_get_PID(task->simdata->sender);
591 int MSG_task_listen_from(const char *alias)
597 if (NULL == (task = MSG_mailbox_get_head(MSG_mailbox_get_by_alias(alias))))
600 return MSG_process_get_PID(task->simdata->sender);
603 /** \ingroup msg_gos_functions
604 * \brief Wait for at most \a max_duration second for a task reception
607 * \a PID is updated with the PID of the first process that triggered this event if any.
609 * It takes three parameters:
610 * \param channel the channel on which the agent should be
611 listening. This value has to be >=0 and < than the maximal.
612 number of channels fixed with MSG_set_channel_number().
613 * \param PID a memory location for storing an int.
614 * \param timeout the maximum time to wait for a task before
615 giving up. In the case of a reception, *\a PID will be updated
616 with the PID of the first process to send a task.
617 * \return #MSG_HOST_FAILURE if the host is shut down in the meantime
618 and #MSG_OK otherwise.
621 MSG_channel_select_from(m_channel_t channel, double timeout, int *PID)
624 simdata_host_t h_simdata = NULL;
628 msg_mailbox_t mailbox;
630 xbt_assert1((channel >= 0)
631 && (channel < msg_global->max_channel), "Invalid channel %d",
638 if (timeout == 0.0) {
639 *PID = MSG_task_probe_from(channel);
644 h_simdata = h->simdata;
646 mailbox = MSG_mailbox_get_by_channel(MSG_host_self(), channel);
648 while (MSG_mailbox_is_empty(mailbox)) {
655 SIMIX_mutex_lock(h_simdata->mutex);
657 xbt_assert1(!MSG_mailbox_get_cond(mailbox),
658 "A process is already blocked on this channel %d", channel);
660 cond = SIMIX_cond_init();
662 MSG_mailbox_set_cond(mailbox, cond);
665 SIMIX_cond_wait_timeout(cond, h_simdata->mutex, timeout);
667 SIMIX_cond_wait(cond, h_simdata->mutex);
670 SIMIX_cond_destroy(cond);
671 SIMIX_mutex_unlock(h_simdata->mutex);
673 if (SIMIX_host_get_state(h_simdata->smx_host) == 0) {
674 MSG_RETURN(MSG_HOST_FAILURE);
677 MSG_mailbox_set_cond(mailbox, NULL);
681 if (NULL == (t = MSG_mailbox_get_head(mailbox)))
686 *PID = MSG_process_get_PID(t->simdata->sender);
694 MSG_error_t MSG_alias_select_from(const char *alias, double timeout, int *PID)
697 simdata_host_t h_simdata = NULL;
701 msg_mailbox_t mailbox;
707 if (timeout == 0.0) {
708 *PID = MSG_task_listen_from(alias);
713 h_simdata = h->simdata;
715 DEBUG2("Probing on alias %s (%s)", alias, h->name);
717 mailbox = MSG_mailbox_get_by_alias(alias);
719 while (MSG_mailbox_is_empty(mailbox)) {
726 SIMIX_mutex_lock(h_simdata->mutex);
728 xbt_assert1(!MSG_mailbox_get_cond(mailbox),
729 "A process is already blocked on this alias %s", alias);
731 cond = SIMIX_cond_init();
733 MSG_mailbox_set_cond(mailbox, cond);
736 SIMIX_cond_wait_timeout(cond, h_simdata->mutex, timeout);
738 SIMIX_cond_wait(cond, h_simdata->mutex);
741 SIMIX_cond_destroy(cond);
742 SIMIX_mutex_unlock(h_simdata->mutex);
744 if (SIMIX_host_get_state(h_simdata->smx_host) == 0) {
745 MSG_RETURN(MSG_HOST_FAILURE);
748 MSG_mailbox_set_cond(mailbox, NULL);
752 if (NULL == (t = MSG_mailbox_get_head(mailbox)))
757 *PID = MSG_process_get_PID(t->simdata->sender);