3 /* Copyright (c) 2002,2003,2004 Arnaud Legrand. All rights reserved. */
5 /* This program is free software; you can redistribute it and/or modify it
6 * under the terms of the license (GNU LGPL) which comes with this package. */
9 #include "xbt/sysdep.h"
11 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(gos, msg,
12 "Logging specific to MSG (gos)");
14 /** \defgroup msg_gos_functions MSG Operating System Functions
15 * \brief This section describes the functions that can be used
16 * by an agent for handling some task.
19 static MSG_error_t __MSG_task_get_with_time_out_from_host(m_task_t * task,
24 m_process_t process = MSG_process_self();
27 simdata_task_t t_simdata = NULL;
28 simdata_host_t h_simdata = NULL;
30 e_surf_action_state_t state = SURF_ACTION_NOT_IN_THE_SYSTEM;
31 xbt_fifo_item_t item = NULL;
34 xbt_assert1((channel>=0) && (channel < msg_global->max_channel),"Invalid channel %d",channel);
36 xbt_assert0(task,"Null pointer for the task\n");
39 CRITICAL0("MSG_task_get() was asked to write in a non empty task struct.");
43 h_simdata = h->simdata;
45 DEBUG2("Waiting for a task on channel %d (%s)", channel,h->name);
48 if(xbt_fifo_size(h_simdata->mbox[channel])>0) {
50 t = xbt_fifo_shift(h_simdata->mbox[channel]);
53 xbt_fifo_foreach(h->simdata->mbox[channel],item,t,m_task_t) {
54 if(t->simdata->source==host) break;
65 xbt_assert2(!(h_simdata->sleeping[channel]),
66 "A process (%s(%d)) is already blocked on this channel",
67 h_simdata->sleeping[channel]->name,
68 h_simdata->sleeping[channel]->simdata->PID);
69 h_simdata->sleeping[channel] = process; /* I'm waiting. Wake me up when you're ready */
71 __MSG_process_block(max_duration);
73 __MSG_process_block(-1);
75 if(surf_workstation_resource->extension_public->get_state(h_simdata->host)
77 MSG_RETURN(MSG_HOST_FAILURE);
78 h_simdata->sleeping[channel] = NULL;
80 /* OK, we should both be ready now. Are you there ? */
83 DEBUG1("OK, got a task (%s)", t->name);
85 t_simdata = t->simdata;
86 /* *task = __MSG_task_copy(t); */
92 DEBUG0("Calling SURF for communication creation");
93 t_simdata->comm = surf_workstation_resource->extension_public->
94 communicate(MSG_process_get_host(t_simdata->sender)->simdata->host,
95 h->simdata->host, t_simdata->message_size,t_simdata->rate);
97 surf_workstation_resource->common_public->action_set_data(t_simdata->comm,t);
99 if(__MSG_process_isBlocked(t_simdata->sender))
100 __MSG_process_unblock(t_simdata->sender);
102 PAJE_PROCESS_PUSH_STATE(process,"C");
105 DEBUG0("Waiting for action termination");
106 __MSG_task_wait_event(process, t);
107 state=surf_workstation_resource->common_public->action_get_state(t_simdata->comm);
108 } while (state==SURF_ACTION_RUNNING);
109 DEBUG0("Action terminated");
111 if(t->simdata->using>1) {
112 xbt_fifo_unshift(msg_global->process_to_run,process);
116 PAJE_PROCESS_POP_STATE(process);
117 PAJE_COMM_STOP(process,t,channel);
119 if(state == SURF_ACTION_DONE) {
120 if(surf_workstation_resource->common_public->action_free(t_simdata->comm))
121 t_simdata->comm = NULL;
123 } else if(surf_workstation_resource->extension_public->get_state(h_simdata->host)
125 if(surf_workstation_resource->common_public->action_free(t_simdata->comm))
126 t_simdata->comm = NULL;
127 MSG_RETURN(MSG_HOST_FAILURE);
129 if(surf_workstation_resource->common_public->action_free(t_simdata->comm))
130 t_simdata->comm = NULL;
131 MSG_RETURN(MSG_TRANSFER_FAILURE);
135 /** \ingroup msg_gos_functions
136 * \brief Listen on a channel and wait for receiving a task.
138 * It takes two parameters.
139 * \param task a memory location for storing a #m_task_t. It will
140 hold a task when this function will return. Thus \a task should not
141 be equal to \c NULL and \a *task should be equal to \c NULL. If one of
142 those two condition does not hold, there will be a warning message.
143 * \param channel the channel on which the agent should be
144 listening. This value has to be >=0 and < than the maximal
145 number of channels fixed with MSG_set_channel_number().
146 * \return #MSG_FATAL if \a task is equal to \c NULL, #MSG_WARNING
147 * if \a *task is not equal to \c NULL, and #MSG_OK otherwise.
149 MSG_error_t MSG_task_get(m_task_t * task,
152 return MSG_task_get_with_time_out(task, channel, -1);
155 /** \ingroup msg_gos_functions
156 * \brief Listen on a channel and wait for receiving a task with a timeout.
158 * It takes three parameters.
159 * \param task a memory location for storing a #m_task_t. It will
160 hold a task when this function will return. Thus \a task should not
161 be equal to \c NULL and \a *task should be equal to \c NULL. If one of
162 those two condition does not hold, there will be a warning message.
163 * \param channel the channel on which the agent should be
164 listening. This value has to be >=0 and < than the maximal
165 number of channels fixed with MSG_set_channel_number().
166 * \param max_duration the maximum time to wait for a task before giving
167 up. In such a case, \a task will not be modified and will still be
168 equal to \c NULL when returning.
169 * \return #MSG_FATAL if \a task is equal to \c NULL, #MSG_WARNING
170 if \a *task is not equal to \c NULL, and #MSG_OK otherwise.
172 MSG_error_t MSG_task_get_with_time_out(m_task_t * task,
176 return __MSG_task_get_with_time_out_from_host(task, channel, max_duration, NULL);
179 /** \ingroup msg_gos_functions
180 * \brief Listen on \a channel and waits for receiving a task from \a host.
182 * It takes three parameters.
183 * \param task a memory location for storing a #m_task_t. It will
184 hold a task when this function will return. Thus \a task should not
185 be equal to \c NULL and \a *task should be equal to \c NULL. If one of
186 those two condition does not hold, there will be a warning message.
187 * \param channel the channel on which the agent should be
188 listening. This value has to be >=0 and < than the maximal
189 number of channels fixed with MSG_set_channel_number().
190 * \param host the host that is to be watched.
191 * \return #MSG_FATAL if \a task is equal to \c NULL, #MSG_WARNING
192 if \a *task is not equal to \c NULL, and #MSG_OK otherwise.
194 MSG_error_t MSG_task_get_from_host(m_task_t * task, int channel,
197 return __MSG_task_get_with_time_out_from_host(task, channel, -1, host);
200 /** \ingroup msg_gos_functions
201 * \brief Test whether there is a pending communication on a channel.
203 * It takes one parameter.
204 * \param channel the channel on which the agent should be
205 listening. This value has to be >=0 and < than the maximal
206 number of channels fixed with MSG_set_channel_number().
207 * \return 1 if there is a pending communication and 0 otherwise
209 int MSG_task_Iprobe(m_channel_t channel)
212 simdata_host_t h_simdata = NULL;
214 xbt_assert1((channel>=0) && (channel < msg_global->max_channel),"Invalid channel %d",channel);
215 DEBUG2("Probing on channel %d (%s)", channel,h->name);
218 h_simdata = h->simdata;
219 return(xbt_fifo_get_first_item(h_simdata->mbox[channel])!=NULL);
222 /** \ingroup msg_gos_functions
223 * \brief Test whether there is a pending communication on a channel, and who sent it.
225 * It takes one parameter.
226 * \param channel the channel on which the agent should be
227 listening. This value has to be >=0 and < than the maximal
228 number of channels fixed with MSG_set_channel_number().
229 * \return -1 if there is no pending communication and the PID of the process who sent it otherwise
231 int MSG_task_probe_from(m_channel_t channel)
234 simdata_host_t h_simdata = NULL;
235 xbt_fifo_item_t item;
238 xbt_assert1((channel>=0) && (channel < msg_global->max_channel),"Invalid channel %d",channel);
241 h_simdata = h->simdata;
243 DEBUG2("Probing on channel %d (%s)", channel,h->name);
245 item = xbt_fifo_get_first_item(h->simdata->mbox[channel]);
246 if (!item || !(t = xbt_fifo_get_item_content(item)))
249 return MSG_process_get_PID(t->simdata->sender);
252 /** \ingroup msg_gos_functions
253 * \brief Wait for at most \a max_duration second for a task reception
254 on \a channel. *\a PID is updated with the PID of the first process
255 that triggered this event is any.
257 * It takes three parameters:
258 * \param channel the channel on which the agent should be
259 listening. This value has to be >=0 and < than the maximal.
260 number of channels fixed with MSG_set_channel_number().
261 * \param PID a memory location for storing an int.
262 * \param max_duration the maximum time to wait for a task before
263 giving up. In the case of a reception, *\a PID will be updated
264 with the PID of the first process to send a task.
265 * \return #MSG_HOST_FAILURE if the host is shut down in the meantime
266 and #MSG_OK otherwise.
268 MSG_error_t MSG_channel_select_from(m_channel_t channel, double max_duration,
272 simdata_host_t h_simdata = NULL;
273 xbt_fifo_item_t item;
276 m_process_t process = MSG_process_self();
278 xbt_assert1((channel>=0) && (channel < msg_global->max_channel),"Invalid channel %d",channel);
283 if(max_duration==0.0) {
284 return MSG_task_probe_from(channel);
288 h_simdata = h->simdata;
290 DEBUG2("Probing on channel %d (%s)", channel,h->name);
291 while(!(item = xbt_fifo_get_first_item(h->simdata->mbox[channel]))) {
297 xbt_assert2(!(h_simdata->sleeping[channel]),
298 "A process (%s(%d)) is already blocked on this channel",
299 h_simdata->sleeping[channel]->name,
300 h_simdata->sleeping[channel]->simdata->PID);
301 h_simdata->sleeping[channel] = process; /* I'm waiting. Wake me up when you're ready */
303 __MSG_process_block(max_duration);
305 __MSG_process_block(-1);
307 if(surf_workstation_resource->extension_public->get_state(h_simdata->host)
309 MSG_RETURN(MSG_HOST_FAILURE);
311 h_simdata->sleeping[channel] = NULL;
314 if (!item || !(t = xbt_fifo_get_item_content(item))) {
318 *PID = MSG_process_get_PID(t->simdata->sender);
325 /** \ingroup msg_gos_functions
327 * \brief Return the number of tasks waiting to be received on a \a
328 channel and sent by \a host.
330 * It takes two parameters.
331 * \param channel the channel on which the agent should be
332 listening. This value has to be >=0 and < than the maximal
333 number of channels fixed with MSG_set_channel_number().
334 * \param host the host that is to be watched.
335 * \return the number of tasks waiting to be received on \a channel
338 int MSG_task_probe_from_host(int channel, m_host_t host)
340 simdata_host_t h_simdata = NULL;
341 xbt_fifo_item_t item;
346 xbt_assert1((channel>=0) && (channel < msg_global->max_channel),"Invalid channel %d",channel);
349 h_simdata = h->simdata;
351 DEBUG2("Probing on channel %d (%s)", channel,h->name);
353 xbt_fifo_foreach(h->simdata->mbox[channel],item,t,m_task_t) {
354 if(t->simdata->source==host) count++;
360 /** \ingroup msg_gos_functions
361 * \brief Put a task on a channel of an host and waits for the end of the
364 * This function is used for describing the behavior of an agent. It
365 * takes three parameter.
366 * \param task a #m_task_t to send on another location. This task
367 will not be usable anymore when the function will return. There is
368 no automatic task duplication and you have to save your parameters
369 before calling this function. Tasks are unique and once it has been
370 sent to another location, you should not access it anymore. You do
371 not need to call MSG_task_destroy() but to avoid using, as an
372 effect of inattention, this task anymore, you definitely should
373 renitialize it with #MSG_TASK_UNINITIALIZED. Note that this task
374 can be transfered iff it has been correctly created with
376 * \param dest the destination of the message
377 * \param channel the channel on which the agent should put this
378 task. This value has to be >=0 and < than the maximal number of
379 channels fixed with MSG_set_channel_number().
380 * \return #MSG_FATAL if \a task is not properly initialized and
383 MSG_error_t MSG_task_put(m_task_t task,
384 m_host_t dest, m_channel_t channel)
386 m_process_t process = MSG_process_self();
387 simdata_task_t task_simdata = NULL;
388 e_surf_action_state_t state = SURF_ACTION_NOT_IN_THE_SYSTEM;
389 m_host_t local_host = NULL;
390 m_host_t remote_host = NULL;
394 xbt_assert1((channel>=0) && (channel < msg_global->max_channel),"Invalid channel %d",channel);
396 task_simdata = task->simdata;
397 task_simdata->sender = process;
398 task_simdata->source = MSG_process_get_host(process);
399 xbt_assert0(task_simdata->using==1,"Gargl!");
400 task_simdata->comm = NULL;
402 local_host = ((simdata_process_t) process->simdata)->host;
405 DEBUG4("Trying to send a task (%g Mb) from %s to %s on channel %d",
406 task->simdata->message_size,local_host->name, remote_host->name, channel);
408 xbt_fifo_push(((simdata_host_t) remote_host->simdata)->
409 mbox[channel], task);
411 PAJE_COMM_START(process,task,channel);
413 if(remote_host->simdata->sleeping[channel]) {
414 DEBUG0("Somebody is listening. Let's wake him up!");
415 __MSG_process_unblock(remote_host->simdata->sleeping[channel]);
418 process->simdata->put_host = dest;
419 process->simdata->put_channel = channel;
420 while(!(task_simdata->comm)) {
421 DEBUG0("Communication not initiated yet. Let's block!");
422 __MSG_process_block(-1);
424 DEBUG0("Registering to this communication");
425 surf_workstation_resource->common_public->action_use(task_simdata->comm);
426 process->simdata->put_host = NULL;
427 process->simdata->put_channel = -1;
430 PAJE_PROCESS_PUSH_STATE(process,"C");
432 state=surf_workstation_resource->common_public->action_get_state(task_simdata->comm);
433 while (state==SURF_ACTION_RUNNING) {
434 DEBUG0("Waiting for action termination");
435 __MSG_task_wait_event(process, task);
436 state=surf_workstation_resource->common_public->action_get_state(task_simdata->comm);
438 DEBUG0("Action terminated");
440 PAJE_PROCESS_POP_STATE(process);
442 if(state == SURF_ACTION_DONE) {
443 if(surf_workstation_resource->common_public->action_free(task_simdata->comm))
444 task_simdata->comm = NULL;
445 MSG_task_destroy(task);
447 } else if(surf_workstation_resource->extension_public->get_state(local_host->simdata->host)
449 if(surf_workstation_resource->common_public->action_free(task_simdata->comm))
450 task_simdata->comm = NULL;
451 MSG_task_destroy(task);
452 MSG_RETURN(MSG_HOST_FAILURE);
454 if(surf_workstation_resource->common_public->action_free(task_simdata->comm))
455 task_simdata->comm = NULL;
456 MSG_task_destroy(task);
457 MSG_RETURN(MSG_TRANSFER_FAILURE);
461 /** \ingroup msg_gos_functions
462 * \brief Does exactly the same as MSG_task_put but with a bounded transmition
467 MSG_error_t MSG_task_put_bounded(m_task_t task,
468 m_host_t dest, m_channel_t channel,
471 MSG_error_t res = MSG_OK;
472 task->simdata->rate=max_rate;
473 res = MSG_task_put(task, dest, channel);
474 task->simdata->rate=-1.0;
478 /** \ingroup msg_gos_functions
479 * \brief Executes a task and waits for its termination.
481 * This function is used for describing the behavior of an agent. It
482 * takes only one parameter.
483 * \param task a #m_task_t to execute on the location on which the
485 * \return #MSG_FATAL if \a task is not properly initialized and
488 MSG_error_t MSG_task_execute(m_task_t task)
490 m_process_t process = MSG_process_self();
493 DEBUG1("Computing on %s", process->simdata->host->name);
495 __MSG_task_execute(process, task);
497 PAJE_PROCESS_PUSH_STATE(process,"E");
498 res = __MSG_wait_for_computation(process,task);
499 PAJE_PROCESS_POP_STATE(process);
503 void __MSG_task_execute(m_process_t process, m_task_t task)
505 simdata_task_t simdata = NULL;
509 simdata = task->simdata;
511 simdata->compute = surf_workstation_resource->extension_public->
512 execute(MSG_process_get_host(process)->simdata->host,
513 simdata->computation_amount);
514 surf_workstation_resource->common_public->
515 set_priority(simdata->compute, simdata->priority);
517 surf_workstation_resource->common_public->action_set_data(simdata->compute,task);
520 MSG_error_t __MSG_wait_for_computation(m_process_t process, m_task_t task)
522 e_surf_action_state_t state = SURF_ACTION_NOT_IN_THE_SYSTEM;
523 simdata_task_t simdata = task->simdata;
527 __MSG_task_wait_event(process, task);
528 state=surf_workstation_resource->common_public->action_get_state(simdata->compute);
529 } while (state==SURF_ACTION_RUNNING);
533 if(state == SURF_ACTION_DONE) {
534 if(surf_workstation_resource->common_public->action_free(simdata->compute))
535 simdata->compute = NULL;
536 simdata->computation_amount = 0.0;
538 } else if(surf_workstation_resource->extension_public->
539 get_state(MSG_process_get_host(process)->simdata->host)
541 if(surf_workstation_resource->common_public->action_free(simdata->compute))
542 simdata->compute = NULL;
543 MSG_RETURN(MSG_HOST_FAILURE);
545 if(surf_workstation_resource->common_public->action_free(simdata->compute))
546 simdata->compute = NULL;
547 MSG_RETURN(MSG_TASK_CANCELLED);
550 /** \ingroup m_task_management
551 * \brief Creates a new #m_task_t (a parallel one....).
553 * A constructor for #m_task_t taking six arguments and returning the
554 corresponding object.
555 * \param name a name for the object. It is for user-level information
557 * \param host_nb the number of hosts implied in the parallel task.
558 * \param host_list an array of #host_nb m_host_t.
559 * \param computation_amount an array of #host_nb
560 doubles. computation_amount[i] is the total number of operations
561 that have to be performed on host_list[i].
562 * \param communication_amount an array of #host_nb*#host_nb doubles.
563 * \param data a pointer to any data may want to attach to the new
564 object. It is for user-level information and can be NULL. It can
565 be retrieved with the function \ref MSG_task_get_data.
567 * \return The new corresponding object.
569 m_task_t MSG_parallel_task_create(const char *name,
571 const m_host_t *host_list,
572 double *computation_amount,
573 double *communication_amount,
576 simdata_task_t simdata = xbt_new0(s_simdata_task_t,1);
577 m_task_t task = xbt_new0(s_m_task_t,1);
581 task->name = xbt_strdup(name);
582 task->simdata = simdata;
586 simdata->sleeping = xbt_dynar_new(sizeof(m_process_t),NULL);
587 simdata->rate = -1.0;
589 simdata->sender = NULL;
590 simdata->source = NULL;
591 simdata->host_nb = host_nb;
593 simdata->host_list = xbt_new0(void *, host_nb);
594 simdata->comp_amount = computation_amount;
595 simdata->comm_amount = communication_amount;
597 for(i=0;i<host_nb;i++)
598 simdata->host_list[i] = host_list[i]->simdata->host;
604 static void __MSG_parallel_task_execute(m_process_t process, m_task_t task)
606 simdata_task_t simdata = NULL;
610 simdata = task->simdata;
612 xbt_assert0(simdata->host_nb,"This is not a parallel task. Go to hell.");
614 simdata->compute = surf_workstation_resource->extension_public->
615 execute_parallel_task(task->simdata->host_nb,
616 task->simdata->host_list,
617 task->simdata->comp_amount,
618 task->simdata->comm_amount,
622 surf_workstation_resource->common_public->action_set_data(simdata->compute,task);
625 MSG_error_t MSG_parallel_task_execute(m_task_t task)
627 m_process_t process = MSG_process_self();
630 DEBUG0("Computing on a tons of guys");
632 __MSG_parallel_task_execute(process, task);
634 if(task->simdata->compute)
635 res = __MSG_wait_for_computation(process,task);
643 /** \ingroup msg_gos_functions
644 * \brief Sleep for the specified number of seconds
646 * Makes the current process sleep until \a time seconds have elapsed.
648 * \param nb_sec a number of second
650 MSG_error_t MSG_process_sleep(double nb_sec)
652 e_surf_action_state_t state = SURF_ACTION_NOT_IN_THE_SYSTEM;
653 m_process_t process = MSG_process_self();
654 m_task_t dummy = NULL;
655 simdata_task_t simdata = NULL;
658 dummy = MSG_task_create("MSG_sleep", nb_sec, 0.0, NULL);
659 simdata = dummy->simdata;
661 simdata->compute = surf_workstation_resource->extension_public->
662 sleep(MSG_process_get_host(process)->simdata->host,
663 simdata->computation_amount);
664 surf_workstation_resource->common_public->action_set_data(simdata->compute,dummy);
669 __MSG_task_wait_event(process, dummy);
670 state=surf_workstation_resource->common_public->action_get_state(simdata->compute);
671 } while (state==SURF_ACTION_RUNNING);
674 if(state == SURF_ACTION_DONE) {
675 if(surf_workstation_resource->extension_public->
676 get_state(MSG_process_get_host(process)->simdata->host)
678 if(surf_workstation_resource->common_public->action_free(simdata->compute))
679 simdata->compute = NULL;
680 MSG_RETURN(MSG_HOST_FAILURE);
682 if(__MSG_process_isBlocked(process)) {
683 __MSG_process_unblock(MSG_process_self());
685 if(surf_workstation_resource->extension_public->
686 get_state(MSG_process_get_host(process)->simdata->host)
688 if(surf_workstation_resource->common_public->action_free(simdata->compute))
689 simdata->compute = NULL;
690 MSG_RETURN(MSG_HOST_FAILURE);
692 if(surf_workstation_resource->common_public->action_free(simdata->compute))
693 simdata->compute = NULL;
694 MSG_task_destroy(dummy);
696 } else MSG_RETURN(MSG_HOST_FAILURE);
699 /** \ingroup msg_gos_functions
700 * \brief Return the number of MSG tasks currently running on a
701 * the host of the current running process.
703 static int MSG_get_msgload(void)
709 xbt_assert0(0, "This function is still to be specified correctly (what do you mean by 'load', exactly?). In the meantime, please don't use it");
710 process = MSG_process_self();
711 return xbt_fifo_size(process->simdata->host->simdata->process_list);
714 /** \ingroup msg_gos_functions
716 * \brief Return the the last value returned by a MSG function (except
719 MSG_error_t MSG_get_errno(void)
721 return PROCESS_GET_ERRNO();