1 #include "msg_simix_private.h"
2 #include "xbt/sysdep.h"
5 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(msg_gos, msg, "Logging specific to MSG (gos)");
7 /** \defgroup msg_gos_functions MSG Operating System Functions
8 * \brief This section describes the functions that can be used
9 * by an agent for handling some task.
// Shared receive helper behind the MSG_task_get*() entry points.
// NOTE(review): this listing omits some original lines (see the gaps in the
// embedded numbering), so only the statements visible here are described.
12 static MSG_error_t __MSG_task_get_with_time_out_from_host(m_task_t * task,
18 m_process_t process = MSG_process_self();
21 simdata_task_t t_simdata = NULL;
22 simdata_host_t h_simdata = NULL;
24 xbt_fifo_item_t item = NULL;
26 smx_cond_t cond = NULL; //conditional wait if the task isn't on the channel yet
// The channel index must lie in [0, msg_global->max_channel).
29 xbt_assert1((channel>=0) && (channel < msg_global->max_channel),"Invalid channel %d",channel);
31 xbt_assert0(task,"Null pointer for the task\n");
34 CRITICAL0("MSG_task_get() was asked to write in a non empty task struct.");
38 h_simdata = h->simdata;
40 DEBUG2("Waiting for a task on channel %d (%s)", channel,h->name);
// The mailbox of the host is inspected while holding the host mutex.
42 SIMIX_mutex_lock(h->simdata->mutex);
44 if(xbt_fifo_size(h_simdata->mbox[channel])>0) {
// One visible path shifts a task off the mailbox FIFO ...
46 t = xbt_fifo_shift(h_simdata->mbox[channel]);
// ... another scans the mailbox for a task whose source is `host` and removes that item.
49 xbt_fifo_foreach(h->simdata->mbox[channel],item,t,m_task_t) {
50 if(t->simdata->source==host) break;
53 xbt_fifo_remove_item(h->simdata->mbox[channel],item);
61 MSG_RETURN(MSG_TRANSFER_FAILURE);
// The sleeping[channel] slot must be free before this process registers itself as the waiter.
64 xbt_assert1(!(h_simdata->sleeping[channel]),"A process is already blocked on channel %d", channel);
// Publish a fresh condition in sleeping[channel] and block on it; the timed
// wait is used only when max_duration > 0.
66 cond = SIMIX_cond_init();
67 h_simdata->sleeping[channel] = cond;
68 if (max_duration > 0) {
69 SIMIX_cond_wait_timeout(cond, h->simdata->mutex, max_duration);
71 else SIMIX_cond_wait(h_simdata->sleeping[channel],h->simdata->mutex);
75 SIMIX_mutex_unlock(h->simdata->mutex);
77 DEBUG1("OK, got a task (%s)", t->name);
78 /* clean up the condition variable */
80 SIMIX_cond_destroy(cond);
81 h_simdata->sleeping[channel] = NULL;
84 t_simdata = t->simdata;
85 /* *task = __MSG_task_copy(t); */
// From here on the task's own mutex/condition pair is used.
88 SIMIX_mutex_lock(t_simdata->mutex);
92 /* create the SIMIX action for the communication */
// The action goes from the sender's host to the host of the calling process.
93 t_simdata->comm = SIMIX_action_communicate(t_simdata->sender->simdata->host->simdata->host,
94 process->simdata->host->simdata->host,t->name, t_simdata->message_size, t_simdata->rate);
// NOTE(review): the visible lines set the action priority to 0 and then assign
// a second SIMIX_action_communicate() result to t_simdata->comm. Lines are
// missing from this listing, so confirm against the full file whether both
// calls are live.
96 if (MSG_process_is_suspended(t_simdata->sender)) {
97 SIMIX_set_priority(t_simdata->comm,0);
98 t_simdata->comm = SIMIX_action_communicate(t_simdata->sender->simdata->host->simdata->host,
99 process->simdata->host->simdata->host,t->name, t_simdata->message_size, t_simdata->rate);
102 /* if the sender process is suspended, create the action but stop its execution; it will be restarted when the sender process resumes */
// Register the action and the task condition with each other, then wait on that condition.
103 SIMIX_register_action_to_condition(t_simdata->comm, t_simdata->cond);
104 SIMIX_register_condition_to_action(t_simdata->comm, t_simdata->cond);
105 SIMIX_cond_wait(t_simdata->cond,t_simdata->mutex);
107 /* action ended: set comm and compute to NULL; the action is already destroyed in the main function */
108 t->simdata->comm = NULL;
109 t->simdata->compute = NULL;
110 SIMIX_mutex_unlock(t_simdata->mutex);
112 //MSG_task_destroy(t);
117 /** \ingroup msg_gos_functions
118 * \brief Listen on a channel and wait for receiving a task.
120 * It takes two parameters.
121 * \param task a memory location for storing a #m_task_t. It will
122 hold a task when this function will return. Thus \a task should not
123 be equal to \c NULL and \a *task should be equal to \c NULL. If one of
124 those two conditions does not hold, there will be a warning message.
125 * \param channel the channel on which the agent should be
126 listening. This value has to be >=0 and < than the maximal
127 number of channels fixed with MSG_set_channel_number().
128 * \return #MSG_FATAL if \a task is equal to \c NULL, #MSG_WARNING
129 * if \a *task is not equal to \c NULL, and #MSG_OK otherwise.
// Thin wrapper: forwards to MSG_task_get_with_time_out() with a max_duration of -1.
131 MSG_error_t MSG_task_get(m_task_t * task,
134 return MSG_task_get_with_time_out(task, channel, -1);
137 /** \ingroup msg_gos_functions
138 * \brief Listen on a channel and wait for receiving a task with a timeout.
140 * It takes three parameters.
141 * \param task a memory location for storing a #m_task_t. It will
142 hold a task when this function will return. Thus \a task should not
143 be equal to \c NULL and \a *task should be equal to \c NULL. If one of
144 those two conditions does not hold, there will be a warning message.
145 * \param channel the channel on which the agent should be
146 listening. This value has to be >=0 and < than the maximal
147 number of channels fixed with MSG_set_channel_number().
148 * \param max_duration the maximum time to wait for a task before giving
149 up. In such a case, #MSG_TRANSFER_FAILURE will be returned, \a task
150 will not be modified and will still be
151 equal to \c NULL when returning.
152 * \return #MSG_FATAL if \a task is equal to \c NULL, #MSG_WARNING
153 if \a *task is not equal to \c NULL, and #MSG_OK otherwise.
// Thin wrapper: forwards to the shared receive helper with a NULL host argument.
155 MSG_error_t MSG_task_get_with_time_out(m_task_t * task,
159 return __MSG_task_get_with_time_out_from_host(task, channel, max_duration, NULL);
162 /** \ingroup msg_gos_functions
163 * \brief Listen on \a channel and waits for receiving a task from \a host.
165 * It takes three parameters.
166 * \param task a memory location for storing a #m_task_t. It will
167 hold a task when this function will return. Thus \a task should not
168 be equal to \c NULL and \a *task should be equal to \c NULL. If one of
169 those two conditions does not hold, there will be a warning message.
170 * \param channel the channel on which the agent should be
171 listening. This value has to be >=0 and < than the maximal
172 number of channels fixed with MSG_set_channel_number().
173 * \param host the host that is to be watched.
174 * \return #MSG_FATAL if \a task is equal to \c NULL, #MSG_WARNING
175 if \a *task is not equal to \c NULL, and #MSG_OK otherwise.
// Thin wrapper: forwards to the shared receive helper with a max_duration of -1
// and the given host.
177 MSG_error_t MSG_task_get_from_host(m_task_t * task, int channel,
180 return __MSG_task_get_with_time_out_from_host(task, channel, -1, host);
183 /** \ingroup msg_gos_functions
184 * \brief Test whether there is a pending communication on a channel.
186 * It takes one parameter.
187 * \param channel the channel on which the agent should be
188 listening. This value has to be >=0 and < than the maximal
189 number of channels fixed with MSG_set_channel_number().
190 * \return 1 if there is a pending communication and 0 otherwise
// Probe of a channel mailbox: the visible lines only read the mailbox.
// NOTE(review): the declaration and initialisation of `h` are not visible in this listing.
192 int MSG_task_Iprobe(m_channel_t channel)
// The channel index must lie in [0, msg_global->max_channel).
196 xbt_assert1((channel>=0) && (channel < msg_global->max_channel),"Invalid channel %d",channel);
199 DEBUG2("Probing on channel %d (%s)", channel,h->name);
// Non-zero exactly when the mailbox of `h` for this channel has a first item.
202 return(xbt_fifo_get_first_item(h->simdata->mbox[channel])!=NULL);
205 /** \ingroup msg_gos_functions
206 * \brief Test whether there is a pending communication on a channel, and who sent it.
208 * It takes one parameter.
209 * \param channel the channel on which the agent should be
210 listening. This value has to be >=0 and < than the maximal
211 number of channels fixed with MSG_set_channel_number().
212 * \return -1 if there is no pending communication and the PID of the process who sent it otherwise
// Probe that also reports the sender: looks at the first mailbox item only.
// NOTE(review): the declarations of `h` and `t` and the statement guarded by
// the `if` below are not visible in this listing.
214 int MSG_task_probe_from(m_channel_t channel)
217 xbt_fifo_item_t item;
// The channel index must lie in [0, msg_global->max_channel).
220 xbt_assert1((channel>=0) && (channel < msg_global->max_channel),"Invalid channel %d",channel);
225 DEBUG2("Probing on channel %d (%s)", channel,h->name);
227 item = xbt_fifo_get_first_item(h->simdata->mbox[channel]);
// Guard for an empty mailbox or an item without content; `t` is assigned inside the condition.
228 if ( (!item) || (!(t = xbt_fifo_get_item_content(item))) )
// Otherwise the result is the PID of the sender recorded in the task.
231 return MSG_process_get_PID(t->simdata->sender);
234 /** \ingroup msg_gos_functions
235 * \brief Wait for at most \a max_duration seconds for a task reception
236 on \a channel. *\a PID is updated with the PID of the first process
237 that triggered this event if any.
239 * It takes three parameters:
240 * \param channel the channel on which the agent should be
241 listening. This value has to be >=0 and < than the maximal
242 number of channels fixed with MSG_set_channel_number().
243 * \param PID a memory location for storing an int.
244 * \param max_duration the maximum time to wait for a task before
245 giving up. In the case of a reception, *\a PID will be updated
246 with the PID of the first process to send a task.
247 * \return #MSG_HOST_FAILURE if the host is shut down in the meantime
248 and #MSG_OK otherwise.
// Waits until the channel mailbox has a first item and stores the sender PID in *PID.
// NOTE(review): this listing omits some original lines (remaining parameters,
// declarations of `h`, `t` and `cond`, closing braces, final return).
250 MSG_error_t MSG_channel_select_from(m_channel_t channel, double max_duration,
254 simdata_host_t h_simdata = NULL;
255 xbt_fifo_item_t item;
// The channel index must lie in [0, msg_global->max_channel).
260 xbt_assert1((channel>=0) && (channel < msg_global->max_channel),"Invalid channel %d",channel);
// A max_duration of exactly 0.0 degenerates into a plain probe.
265 if(max_duration==0.0) {
266 *PID = MSG_task_probe_from(channel);
271 h_simdata = h->simdata;
273 DEBUG2("Probing on channel %d (%s)", channel,h->name);
// Loop for as long as the mailbox has no first item.
274 while(!(item = xbt_fifo_get_first_item(h->simdata->mbox[channel]))) {
280 SIMIX_mutex_lock(h_simdata->mutex);
// The sleeping[channel] slot must be free before this process registers itself as the waiter.
281 xbt_assert1(!(h_simdata->sleeping[channel]),
282 "A process is already blocked on this channel %d", channel);
283 cond = SIMIX_cond_init();
284 h_simdata->sleeping[channel] = cond; /* I'm waiting. Wake me up when you're ready */
// Both a timed and an untimed wait are visible; the condition selecting
// between them is not shown in this listing.
286 SIMIX_cond_wait_timeout(cond,h_simdata->mutex, max_duration);
288 SIMIX_cond_wait(cond,h_simdata->mutex);
290 SIMIX_cond_destroy(cond);
291 SIMIX_mutex_unlock(h_simdata->mutex);
// NOTE(review): in the visible lines the MSG_HOST_FAILURE return comes before
// sleeping[channel] is reset to NULL, although the condition was just
// destroyed. Confirm that no omitted line clears the slot on this path.
292 if(SIMIX_host_get_state(h_simdata->host)==0) {
293 MSG_RETURN(MSG_HOST_FAILURE);
295 h_simdata->sleeping[channel] = NULL;
// Guard for a missing item or an item without content; `t` is assigned inside the condition.
298 if (!item || !(t = xbt_fifo_get_item_content(item))) {
302 *PID = MSG_process_get_PID(t->simdata->sender);
309 /** \ingroup msg_gos_functions
311 * \brief Return the number of tasks waiting to be received on a \a
312 channel and sent by \a host.
314 * It takes two parameters.
315 * \param channel the channel on which the agent should be
316 listening. This value has to be >=0 and < than the maximal
317 number of channels fixed with MSG_set_channel_number().
318 * \param host the host that is to be watched.
319 * \return the number of tasks waiting to be received on \a channel
// Placeholder: the visible body only aborts through xbt_die().
322 int MSG_task_probe_from_host(int channel, m_host_t host)
324 xbt_die("not implemented yet");
328 /** \ingroup msg_gos_functions \brief Put a task on a channel of an
329 * host (with a timeout on the waiting of the destination host) and
330 * waits for the end of the transmission.
332 * This function is used for describing the behavior of an agent. It
333 * takes four parameters.
334 * \param task a #m_task_t to send on another location. This task
335 will not be usable anymore when the function will return. There is
336 no automatic task duplication and you have to save your parameters
337 before calling this function. Tasks are unique and once it has been
338 sent to another location, you should not access it anymore. You do
339 not need to call MSG_task_destroy() but to avoid using, as an
340 effect of inattention, this task anymore, you definitely should
341 reinitialize it with #MSG_TASK_UNINITIALIZED. Note that this task
342 can be transferred iff it has been correctly created with
344 * \param dest the destination of the message
345 * \param channel the channel on which the agent should put this
346 task. This value has to be >=0 and < than the maximal number of
347 channels fixed with MSG_set_channel_number().
348 * \param max_duration the maximum time to wait for a task before giving
349 up. In such a case, #MSG_TRANSFER_FAILURE will be returned, \a task
351 * \return #MSG_FATAL if \a task is not properly initialized and
352 #MSG_OK otherwise. Returns #MSG_HOST_FAILURE if the host on which
353 this function was called was shut down. Returns
354 #MSG_TRANSFER_FAILURE if the transfer could not be properly done
355 (network failure, dest failure, timeout...)
// Sends `task`: pushes it on a host mailbox for `channel`, wakes a registered
// waiter if there is one, then waits on the task condition.
// NOTE(review): this listing omits some original lines (braces, the `else`
// between the two waits, the final return).
357 MSG_error_t MSG_task_put_with_timeout(m_task_t task, m_host_t dest,
358 m_channel_t channel, double max_duration)
362 m_process_t process = MSG_process_self();
363 simdata_task_t task_simdata = NULL;
364 m_host_t local_host = NULL;
// NOTE(review): remote_host starts as NULL and its assignment is not visible
// in this listing; presumably it is set to `dest` on an omitted line. Confirm.
365 m_host_t remote_host = NULL;
// The channel index must lie in [0, msg_global->max_channel).
368 xbt_assert1((channel>=0) && (channel < msg_global->max_channel),"Invalid channel %d",channel);
// Record the calling process and its host as sender/source of the task.
370 task_simdata = task->simdata;
371 task_simdata->sender = process;
372 task_simdata->source = MSG_process_get_host(process);
// The task may only be sent while its `using` counter is exactly 1.
373 xbt_assert0(task_simdata->using==1,
374 "This taks is still being used somewhere else. You cannot send it now. Go fix your code!");
375 task_simdata->comm = NULL;
377 local_host = ((simdata_process_t) process->simdata)->host;
380 DEBUG4("Trying to send a task (%g kB) from %s to %s on channel %d",
381 task->simdata->message_size/1000,local_host->name, remote_host->name, channel);
// The remote mailbox is modified while holding the remote host mutex.
383 SIMIX_mutex_lock(remote_host->simdata->mutex);
384 xbt_fifo_push(((simdata_host_t) remote_host->simdata)->
385 mbox[channel], task);
// A non-NULL sleeping[channel] means a process registered a condition there; signal it.
388 if(remote_host->simdata->sleeping[channel]) {
389 DEBUG0("Somebody is listening. Let's wake him up!");
390 SIMIX_cond_signal(remote_host->simdata->sleeping[channel]);
392 SIMIX_mutex_unlock(remote_host->simdata->mutex);
// Remember on the process where the task was put.
394 process->simdata->put_host = dest;
395 process->simdata->put_channel = channel;
// Wait on the task condition; the timed wait is used only when max_duration > 0.
396 SIMIX_mutex_lock(task->simdata->mutex);
397 // DEBUG4("Task sent (%g kB) from %s to %s on channel %d, waiting...", task->simdata->message_size/1000,local_host->name, remote_host->name, channel);
398 DEBUG0("Waiting action finish!");
399 if (max_duration >0) {
400 SIMIX_cond_wait_timeout(task->simdata->cond,task->simdata->mutex,max_duration);
403 SIMIX_cond_wait(task->simdata->cond,task->simdata->mutex);
405 DEBUG1("Action terminated %s",task->name);
// Decrement the `using` counter of the task once the wait is over.
406 task->simdata->using--;
407 SIMIX_mutex_unlock(task->simdata->mutex);
412 /** \ingroup msg_gos_functions
413 * \brief Put a task on a channel of an host and waits for the end of the
416 * This function is used for describing the behavior of an agent. It
417 * takes three parameters.
418 * \param task a #m_task_t to send on another location. This task
419 will not be usable anymore when the function will return. There is
420 no automatic task duplication and you have to save your parameters
421 before calling this function. Tasks are unique and once it has been
422 sent to another location, you should not access it anymore. You do
423 not need to call MSG_task_destroy() but to avoid using, as an
424 effect of inattention, this task anymore, you definitely should
425 reinitialize it with #MSG_TASK_UNINITIALIZED. Note that this task
426 can be transferred iff it has been correctly created with
428 * \param dest the destination of the message
429 * \param channel the channel on which the agent should put this
430 task. This value has to be >=0 and < than the maximal number of
431 channels fixed with MSG_set_channel_number().
432 * \return #MSG_FATAL if \a task is not properly initialized and
433 * #MSG_OK otherwise. Returns #MSG_HOST_FAILURE if the host on which
434 * this function was called was shut down. Returns
435 * #MSG_TRANSFER_FAILURE if the transfer could not be properly done
436 * (network failure, dest failure)
// Thin wrapper: forwards to MSG_task_put_with_timeout() with a max_duration of -1.0.
438 MSG_error_t MSG_task_put(m_task_t task,
439 m_host_t dest, m_channel_t channel)
441 return MSG_task_put_with_timeout(task, dest, channel, -1.0);
444 /** \ingroup msg_gos_functions
445 * \brief Does exactly the same as MSG_task_put but with a bounded transmission
// Stores max_rate in the task simdata and then sends the task with MSG_task_put().
// NOTE(review): the declaration of `max_rate` and the return statement are
// not visible in this listing.
450 MSG_error_t MSG_task_put_bounded(m_task_t task,
451 m_host_t dest, m_channel_t channel,
454 MSG_error_t res = MSG_OK;
// The rate is written to the task before the put.
455 task->simdata->rate=max_rate;
456 res = MSG_task_put(task, dest, channel);
460 /** \ingroup msg_gos_functions
461 * \brief Executes a task and waits for its termination.
463 * This function is used for describing the behavior of an agent. It
464 * takes only one parameter.
465 * \param task a #m_task_t to execute on the location on which the
467 * \return #MSG_FATAL if \a task is not properly initialized and
// Runs the computation of `task` as a SIMIX execute action on the current host
// and waits for it on the task condition.
// NOTE(review): braces and the return statement are not visible in this listing.
470 MSG_error_t MSG_task_execute(m_task_t task)
472 simdata_task_t simdata = NULL;
476 simdata = task->simdata;
// The task must have no compute action attached and a `using` counter of exactly 1.
477 xbt_assert0((!simdata->compute)&&(task->simdata->using==1),
478 "This taks is executed somewhere else. Go fix your code!");
480 DEBUG1("Computing on %s", MSG_process_self()->simdata->host->name);
482 SIMIX_mutex_lock(simdata->mutex);
// Create the action on SIMIX_host_self() for computation_amount and apply the task priority.
483 simdata->compute = SIMIX_action_execute(SIMIX_host_self(), task->name, simdata->computation_amount);
484 SIMIX_action_set_priority(simdata->compute, simdata->priority);
// Register the action and the task condition with each other, then wait on that condition.
486 SIMIX_register_action_to_condition(simdata->compute, simdata->cond);
487 SIMIX_register_condition_to_action(simdata->compute, simdata->cond);
489 SIMIX_cond_wait(simdata->cond, simdata->mutex);
491 /* action ended: set comm and compute to NULL; the action is already destroyed in the main function */
492 simdata->comm = NULL;
493 simdata->compute = NULL;
495 SIMIX_mutex_unlock(simdata->mutex);
501 /** \ingroup m_task_management
502 * \brief Creates a new #m_task_t (a parallel one....).
504 * A constructor for #m_task_t taking six arguments and returning the
505 corresponding object.
506 * \param name a name for the object. It is for user-level information
508 * \param host_nb the number of hosts implied in the parallel task.
509 * \param host_list an array of \p host_nb m_host_t.
510 * \param computation_amount an array of \p host_nb
511 doubles. computation_amount[i] is the total number of operations
512 that have to be performed on host_list[i].
513 * \param communication_amount an array of \p host_nb* \p host_nb doubles.
514 * \param data a pointer to any data you may want to attach to the new
515 object. It is for user-level information and can be NULL. It can
516 be retrieved with the function \ref MSG_task_get_data.
518 * \return The new corresponding object.
// Placeholder constructor for parallel tasks.
// NOTE(review): some parameters and any return statement are not visible in this listing.
520 m_task_t MSG_parallel_task_create(const char *name,
522 const m_host_t *host_list,
523 double *computation_amount,
524 double *communication_amount,
// NOTE(review): the visible lines allocate a task and then call xbt_die();
// confirm whether the allocation is intentional.
527 m_task_t task = xbt_new0(s_m_task_t,1);
528 xbt_die("not implemented yet");
// File-local helper taking a process and a task; its body is not visible in this listing.
533 static void __MSG_parallel_task_execute(m_process_t process, m_task_t task)
// Placeholder: the visible body only aborts through xbt_die().
538 MSG_error_t MSG_parallel_task_execute(m_task_t task)
541 xbt_die("not implemented yet");
546 /** \ingroup msg_gos_functions
547 * \brief Sleep for the specified number of seconds
549 * Makes the current process sleep until \a nb_sec seconds have elapsed.
551 * \param nb_sec the number of seconds to sleep
// Blocks the calling process on a SIMIX sleep action of nb_sec, using a mutex
// and a condition created just for this call.
// NOTE(review): the declarations of `mutex` and `cond` and the return
// statement are not visible in this listing.
553 MSG_error_t MSG_process_sleep(double nb_sec)
555 smx_action_t act_sleep;
556 m_process_t proc = MSG_process_self();
559 /* create the sleep action */
// The action is created on the host of the SIMIX process behind the calling MSG process.
560 act_sleep = SIMIX_action_sleep(SIMIX_process_get_host(proc->simdata->smx_process),nb_sec);
562 mutex = SIMIX_mutex_init();
563 SIMIX_mutex_lock(mutex);
564 /* create the condition and register the action to it */
565 cond = SIMIX_cond_init();
567 SIMIX_register_condition_to_action(act_sleep, cond);
568 SIMIX_register_action_to_condition(act_sleep, cond);
// Wait on the private condition.
569 SIMIX_cond_wait(cond,mutex);
570 SIMIX_mutex_unlock(mutex);
572 /* destroy the temporary condition and mutex */
573 SIMIX_cond_destroy(cond);
574 SIMIX_mutex_destroy(mutex);
579 /** \ingroup msg_gos_functions
580 * \brief Return the number of MSG tasks currently running on
581 * the host of the current running process.
// Placeholder: the visible body only aborts through xbt_die().
583 static int MSG_get_msgload(void)
585 xbt_die("not implemented yet");
589 /** \ingroup msg_gos_functions
591 * \brief Return the last value returned by a MSG function (except
// Returns whatever the PROCESS_GET_ERRNO() macro evaluates to.
594 MSG_error_t MSG_get_errno(void)
596 return PROCESS_GET_ERRNO();