3 /* Copyright (c) 2002,2003,2004 Arnaud Legrand. All rights reserved. */
5 /* This program is free software; you can redistribute it and/or modify it
6 * under the terms of the license (GNU LGPL) which comes with this package. */
9 #include "xbt/sysdep.h"
11 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(gos, msg,
12 "Logging specific to MSG (gos)");
14 /** \defgroup msg_gos_functions MSG Operating System Functions
15 * \brief This section describes the functions that can be used
16 * by an agent for handling some task.
19 /** \ingroup msg_gos_functions
20 * \brief Listen on a channel and wait for receiving a task.
22 * It takes two parameters.
23 * \param task a memory location for storing a #m_task_t. It will
24 hold a task when this function will return. Thus \a task should not
25 be equal to \c NULL and \a *task should be equal to \c NULL. If one of
26 those two condition does not hold, there will be a warning message.
27 * \param channel the channel on which the agent should be
28 listening. This value has to be >=0 and < than the maximal
29 number of channels fixed with MSG_set_channel_number().
30 * \return #MSG_FATAL if \a task is equal to \c NULL, #MSG_WARNING
31 * if \a *task is not equal to \c NULL, and #MSG_OK otherwise.
33 MSG_error_t MSG_task_get(m_task_t * task,
36 return MSG_task_get_with_time_out(task, channel, -1);
39 /** \ingroup msg_gos_functions
40 * \brief Listen on a channel and wait for receiving a task with a timeout.
42 * It takes three parameters.
43 * \param task a memory location for storing a #m_task_t. It will
44 hold a task when this function will return. Thus \a task should not
45 be equal to \c NULL and \a *task should be equal to \c NULL. If one of
46 those two condition does not hold, there will be a warning message.
47 * \param channel the channel on which the agent should be
48 listening. This value has to be >=0 and < than the maximal
49 number of channels fixed with MSG_set_channel_number().
50 * \param max_duration the maximum time to wait for a task before giving
51 up. In such a case, \a task will not be modified and will still be
52 equal to \c NULL when returning.
53 * \return #MSG_FATAL if \a task is equal to \c NULL, #MSG_WARNING
54 if \a *task is not equal to \c NULL, and #MSG_OK otherwise.
57 MSG_error_t MSG_task_get_with_time_out(m_task_t * task,
61 m_process_t process = MSG_process_self();
64 simdata_task_t t_simdata = NULL;
65 simdata_host_t h_simdata = NULL;
67 e_surf_action_state_t state = SURF_ACTION_NOT_IN_THE_SYSTEM;
70 xbt_assert1((channel>=0) && (channel < msg_global->max_channel),"Invalid channel %d",channel);
72 xbt_assert0(task,"Null pointer for the task\n");
75 CRITICAL0("MSG_task_get() was asked to write in a non empty task struct.");
79 h_simdata = h->simdata;
81 DEBUG2("Waiting for a task on channel %d (%s)", channel,h->name);
83 while ((t = xbt_fifo_shift(h_simdata->mbox[channel])) == NULL) {
89 xbt_assert2(!(h_simdata->sleeping[channel]),
90 "A process (%s(%d)) is already blocked on this channel",
91 h_simdata->sleeping[channel]->name,
92 h_simdata->sleeping[channel]->simdata->PID);
93 h_simdata->sleeping[channel] = process; /* I'm waiting. Wake me up when you're ready */
95 __MSG_process_block(max_duration);
97 __MSG_process_block(-1);
99 if(surf_workstation_resource->extension_public->get_state(h_simdata->host)
101 MSG_RETURN(MSG_HOST_FAILURE);
102 h_simdata->sleeping[channel] = NULL;
104 /* OK, we should both be ready now. Are you there ? */
107 DEBUG1("OK, got a task (%s)", t->name);
109 t_simdata = t->simdata;
110 /* *task = __MSG_task_copy(t); */
116 DEBUG0("Calling SURF for communication creation");
117 t_simdata->comm = surf_workstation_resource->extension_public->
118 communicate(MSG_process_get_host(t_simdata->sender)->simdata->host,
119 h->simdata->host, t_simdata->message_size,t_simdata->rate);
121 surf_workstation_resource->common_public->action_set_data(t_simdata->comm,t);
123 if(__MSG_process_isBlocked(t_simdata->sender))
124 __MSG_process_unblock(t_simdata->sender);
126 PAJE_PROCESS_PUSH_STATE(process,"C");
129 DEBUG0("Waiting for action termination");
130 __MSG_task_wait_event(process, t);
131 state=surf_workstation_resource->common_public->action_get_state(t_simdata->comm);
132 } while (state==SURF_ACTION_RUNNING);
133 DEBUG0("Action terminated");
135 if(t->simdata->using>1) {
136 xbt_fifo_unshift(msg_global->process_to_run,process);
140 PAJE_PROCESS_POP_STATE(process);
141 PAJE_COMM_STOP(process,t,channel);
143 if(state == SURF_ACTION_DONE) {
144 if(surf_workstation_resource->common_public->action_free(t_simdata->comm))
145 t_simdata->comm = NULL;
147 } else if(surf_workstation_resource->extension_public->get_state(h_simdata->host)
149 if(surf_workstation_resource->common_public->action_free(t_simdata->comm))
150 t_simdata->comm = NULL;
151 MSG_RETURN(MSG_HOST_FAILURE);
153 if(surf_workstation_resource->common_public->action_free(t_simdata->comm))
154 t_simdata->comm = NULL;
155 MSG_RETURN(MSG_TRANSFER_FAILURE);
159 /** \ingroup msg_gos_functions
160 * \brief Test whether there is a pending communication on a channel.
162 * It takes one parameter.
163 * \param channel the channel on which the agent should be
164 listening. This value has to be >=0 and < than the maximal
165 number of channels fixed with MSG_set_channel_number().
166 * \return 1 if there is a pending communication and 0 otherwise
168 int MSG_task_Iprobe(m_channel_t channel)
171 simdata_host_t h_simdata = NULL;
173 xbt_assert1((channel>=0) && (channel < msg_global->max_channel),"Invalid channel %d",channel);
174 DEBUG2("Probing on channel %d (%s)", channel,h->name);
177 h_simdata = h->simdata;
178 return(xbt_fifo_get_first_item(h_simdata->mbox[channel])!=NULL);
181 /** \ingroup msg_gos_functions
182 * \brief Test whether there is a pending communication on a channel, and who sent it.
184 * It takes one parameter.
185 * \param channel the channel on which the agent should be
186 listening. This value has to be >=0 and < than the maximal
187 number of channels fixed with MSG_set_channel_number().
188 * \return -1 if there is no pending communication and the PID of the process who sent it otherwise
190 int MSG_task_probe_from(m_channel_t channel)
193 simdata_host_t h_simdata = NULL;
194 xbt_fifo_item_t item;
197 xbt_assert1((channel>=0) && (channel < msg_global->max_channel),"Invalid channel %d",channel);
200 h_simdata = h->simdata;
202 DEBUG2("Probing on channel %d (%s)", channel,h->name);
204 item = xbt_fifo_get_first_item(h->simdata->mbox[channel]);
205 if (!item || !(t = xbt_fifo_get_item_content(item)))
208 return MSG_process_get_PID(t->simdata->sender);
211 MSG_error_t MSG_channel_select_from(m_channel_t channel, double max_duration,
215 simdata_host_t h_simdata = NULL;
216 xbt_fifo_item_t item;
219 m_process_t process = MSG_process_self();
221 xbt_assert1((channel>=0) && (channel < msg_global->max_channel),"Invalid channel %d",channel);
226 if(max_duration==0.0) {
227 return MSG_task_probe_from(channel);
231 h_simdata = h->simdata;
233 DEBUG2("Probing on channel %d (%s)", channel,h->name);
234 while(!(item = xbt_fifo_get_first_item(h->simdata->mbox[channel]))) {
240 xbt_assert2(!(h_simdata->sleeping[channel]),
241 "A process (%s(%d)) is already blocked on this channel",
242 h_simdata->sleeping[channel]->name,
243 h_simdata->sleeping[channel]->simdata->PID);
244 h_simdata->sleeping[channel] = process; /* I'm waiting. Wake me up when you're ready */
246 __MSG_process_block(max_duration);
248 __MSG_process_block(-1);
250 if(surf_workstation_resource->extension_public->get_state(h_simdata->host)
252 MSG_RETURN(MSG_HOST_FAILURE);
254 h_simdata->sleeping[channel] = NULL;
257 if (!item || !(t = xbt_fifo_get_item_content(item))) {
261 *PID = MSG_process_get_PID(t->simdata->sender);
266 /** \ingroup msg_gos_functions
267 * \brief Put a task on a channel of an host and waits for the end of the
270 * This function is used for describing the behavior of an agent. It
271 * takes three parameter.
272 * \param task a #m_task_t to send on another location. This task
273 will not be usable anymore when the function will return. There is
274 no automatic task duplication and you have to save your parameters
275 before calling this function. Tasks are unique and once it has been
276 sent to another location, you should not access it anymore. You do
277 not need to call MSG_task_destroy() but to avoid using, as an
278 effect of inattention, this task anymore, you definitely should
279 renitialize it with #MSG_TASK_UNINITIALIZED. Note that this task
280 can be transfered iff it has been correctly created with
282 * \param dest the destination of the message
283 * \param channel the channel on which the agent should put this
284 task. This value has to be >=0 and < than the maximal number of
285 channels fixed with MSG_set_channel_number().
286 * \return #MSG_FATAL if \a task is not properly initialized and
289 MSG_error_t MSG_task_put(m_task_t task,
290 m_host_t dest, m_channel_t channel)
292 m_process_t process = MSG_process_self();
293 simdata_task_t task_simdata = NULL;
294 e_surf_action_state_t state = SURF_ACTION_NOT_IN_THE_SYSTEM;
295 m_host_t local_host = NULL;
296 m_host_t remote_host = NULL;
300 xbt_assert1((channel>=0) && (channel < msg_global->max_channel),"Invalid channel %d",channel);
302 task_simdata = task->simdata;
303 task_simdata->sender = process;
304 task_simdata->source = MSG_process_get_host(process);
305 xbt_assert0(task_simdata->using==1,"Gargl!");
306 task_simdata->comm = NULL;
308 local_host = ((simdata_process_t) process->simdata)->host;
311 DEBUG4("Trying to send a task (%g Mb) from %s to %s on channel %d",
312 task->simdata->message_size,local_host->name, remote_host->name, channel);
314 xbt_fifo_push(((simdata_host_t) remote_host->simdata)->
315 mbox[channel], task);
317 PAJE_COMM_START(process,task,channel);
319 if(remote_host->simdata->sleeping[channel]) {
320 DEBUG0("Somebody is listening. Let's wake him up!");
321 __MSG_process_unblock(remote_host->simdata->sleeping[channel]);
324 process->simdata->put_host = dest;
325 process->simdata->put_channel = channel;
326 while(!(task_simdata->comm)) {
327 DEBUG0("Communication not initiated yet. Let's block!");
328 __MSG_process_block(-1);
330 DEBUG0("Registering to this communication");
331 surf_workstation_resource->common_public->action_use(task_simdata->comm);
332 process->simdata->put_host = NULL;
333 process->simdata->put_channel = -1;
336 PAJE_PROCESS_PUSH_STATE(process,"C");
338 state=surf_workstation_resource->common_public->action_get_state(task_simdata->comm);
339 while (state==SURF_ACTION_RUNNING) {
340 DEBUG0("Waiting for action termination");
341 __MSG_task_wait_event(process, task);
342 state=surf_workstation_resource->common_public->action_get_state(task_simdata->comm);
344 DEBUG0("Action terminated");
346 PAJE_PROCESS_POP_STATE(process);
348 if(state == SURF_ACTION_DONE) {
349 if(surf_workstation_resource->common_public->action_free(task_simdata->comm))
350 task_simdata->comm = NULL;
351 MSG_task_destroy(task);
353 } else if(surf_workstation_resource->extension_public->get_state(local_host->simdata->host)
355 if(surf_workstation_resource->common_public->action_free(task_simdata->comm))
356 task_simdata->comm = NULL;
357 MSG_task_destroy(task);
358 MSG_RETURN(MSG_HOST_FAILURE);
360 if(surf_workstation_resource->common_public->action_free(task_simdata->comm))
361 task_simdata->comm = NULL;
362 MSG_task_destroy(task);
363 MSG_RETURN(MSG_TRANSFER_FAILURE);
367 /** \ingroup msg_gos_functions
368 * \brief Does exactly the same as MSG_task_put but with a bounded transmition
373 MSG_error_t MSG_task_put_bounded(m_task_t task,
374 m_host_t dest, m_channel_t channel,
377 MSG_error_t res = MSG_OK;
378 task->simdata->rate=max_rate;
379 res = MSG_task_put(task, dest, channel);
380 task->simdata->rate=-1.0;
384 /** \ingroup msg_gos_functions
385 * \brief Executes a task and waits for its termination.
387 * This function is used for describing the behavior of an agent. It
388 * takes only one parameter.
389 * \param task a #m_task_t to execute on the location on which the
391 * \return #MSG_FATAL if \a task is not properly initialized and
394 MSG_error_t MSG_task_execute(m_task_t task)
396 m_process_t process = MSG_process_self();
399 DEBUG1("Computing on %s", process->simdata->host->name);
401 __MSG_task_execute(process, task);
403 PAJE_PROCESS_PUSH_STATE(process,"E");
404 res = __MSG_wait_for_computation(process,task);
405 PAJE_PROCESS_POP_STATE(process);
409 void __MSG_task_execute(m_process_t process, m_task_t task)
411 simdata_task_t simdata = NULL;
415 simdata = task->simdata;
417 simdata->compute = surf_workstation_resource->extension_public->
418 execute(MSG_process_get_host(process)->simdata->host,
419 simdata->computation_amount);
420 surf_workstation_resource->common_public->
421 set_priority(simdata->compute, simdata->priority);
423 surf_workstation_resource->common_public->action_set_data(simdata->compute,task);
426 MSG_error_t __MSG_wait_for_computation(m_process_t process, m_task_t task)
428 e_surf_action_state_t state = SURF_ACTION_NOT_IN_THE_SYSTEM;
429 simdata_task_t simdata = task->simdata;
433 __MSG_task_wait_event(process, task);
434 state=surf_workstation_resource->common_public->action_get_state(simdata->compute);
435 } while (state==SURF_ACTION_RUNNING);
439 if(state == SURF_ACTION_DONE) {
440 if(surf_workstation_resource->common_public->action_free(simdata->compute))
441 simdata->compute = NULL;
442 simdata->computation_amount = 0.0;
444 } else if(surf_workstation_resource->extension_public->
445 get_state(MSG_process_get_host(process)->simdata->host)
447 if(surf_workstation_resource->common_public->action_free(simdata->compute))
448 simdata->compute = NULL;
449 MSG_RETURN(MSG_HOST_FAILURE);
451 if(surf_workstation_resource->common_public->action_free(simdata->compute))
452 simdata->compute = NULL;
453 MSG_RETURN(MSG_TASK_CANCELLED);
456 /** \ingroup m_task_management
457 * \brief Creates a new #m_task_t (a parallel one....).
459 * A constructor for #m_task_t taking six arguments and returning the
460 corresponding object.
461 * \param name a name for the object. It is for user-level information
463 * \param host_nb the number of hosts implied in the parallel task.
464 * \param host_list an array of #host_nb m_host_t.
465 * \param computation_amount an array of #host_nb
466 doubles. computation_amount[i] is the total number of operations
467 that have to be performed on host_list[i].
468 * \param communication_amount an array of #host_nb*#host_nb doubles.
469 * \param data a pointer to any data may want to attach to the new
470 object. It is for user-level information and can be NULL. It can
471 be retrieved with the function \ref MSG_task_get_data.
473 * \return The new corresponding object.
475 m_task_t MSG_parallel_task_create(const char *name,
477 const m_host_t *host_list,
478 double *computation_amount,
479 double *communication_amount,
482 simdata_task_t simdata = xbt_new0(s_simdata_task_t,1);
483 m_task_t task = xbt_new0(s_m_task_t,1);
487 task->name = xbt_strdup(name);
488 task->simdata = simdata;
492 simdata->sleeping = xbt_dynar_new(sizeof(m_process_t),NULL);
493 simdata->rate = -1.0;
495 simdata->sender = NULL;
496 simdata->source = NULL;
497 simdata->host_nb = host_nb;
499 simdata->host_list = xbt_new0(void *, host_nb);
500 simdata->comp_amount = computation_amount;
501 simdata->comm_amount = communication_amount;
503 for(i=0;i<host_nb;i++)
504 simdata->host_list[i] = host_list[i]->simdata->host;
510 static void __MSG_parallel_task_execute(m_process_t process, m_task_t task)
512 simdata_task_t simdata = NULL;
516 simdata = task->simdata;
518 xbt_assert0(simdata->host_nb,"This is not a parallel task. Go to hell.");
520 simdata->compute = surf_workstation_resource->extension_public->
521 execute_parallel_task(task->simdata->host_nb,
522 task->simdata->host_list,
523 task->simdata->comp_amount,
524 task->simdata->comm_amount,
528 surf_workstation_resource->common_public->action_set_data(simdata->compute,task);
531 MSG_error_t MSG_parallel_task_execute(m_task_t task)
533 m_process_t process = MSG_process_self();
536 DEBUG0("Computing on a tons of guys");
538 __MSG_parallel_task_execute(process, task);
540 if(task->simdata->compute)
541 res = __MSG_wait_for_computation(process,task);
549 /** \ingroup msg_gos_functions
550 * \brief Sleep for the specified number of seconds
552 * Makes the current process sleep until \a time seconds have elapsed.
554 * \param nb_sec a number of second
556 MSG_error_t MSG_process_sleep(double nb_sec)
558 e_surf_action_state_t state = SURF_ACTION_NOT_IN_THE_SYSTEM;
559 m_process_t process = MSG_process_self();
560 m_task_t dummy = NULL;
561 simdata_task_t simdata = NULL;
564 dummy = MSG_task_create("MSG_sleep", nb_sec, 0.0, NULL);
565 simdata = dummy->simdata;
567 simdata->compute = surf_workstation_resource->extension_public->
568 sleep(MSG_process_get_host(process)->simdata->host,
569 simdata->computation_amount);
570 surf_workstation_resource->common_public->action_set_data(simdata->compute,dummy);
575 __MSG_task_wait_event(process, dummy);
576 state=surf_workstation_resource->common_public->action_get_state(simdata->compute);
577 } while (state==SURF_ACTION_RUNNING);
580 if(state == SURF_ACTION_DONE) {
581 if(surf_workstation_resource->extension_public->
582 get_state(MSG_process_get_host(process)->simdata->host)
584 if(surf_workstation_resource->common_public->action_free(simdata->compute))
585 simdata->compute = NULL;
586 MSG_RETURN(MSG_HOST_FAILURE);
588 if(__MSG_process_isBlocked(process)) {
589 __MSG_process_unblock(MSG_process_self());
591 if(surf_workstation_resource->extension_public->
592 get_state(MSG_process_get_host(process)->simdata->host)
594 if(surf_workstation_resource->common_public->action_free(simdata->compute))
595 simdata->compute = NULL;
596 MSG_RETURN(MSG_HOST_FAILURE);
598 if(surf_workstation_resource->common_public->action_free(simdata->compute))
599 simdata->compute = NULL;
600 MSG_task_destroy(dummy);
602 } else MSG_RETURN(MSG_HOST_FAILURE);
605 /** \ingroup msg_gos_functions
606 * \brief Return the number of MSG tasks currently running on a
607 * the host of the current running process.
609 static int MSG_get_msgload(void)
615 xbt_assert0(0, "This function is still to be specified correctly (what do you mean by 'load', exactly?). In the meantime, please don't use it");
616 process = MSG_process_self();
617 return xbt_fifo_size(process->simdata->host->simdata->process_list);
620 /** \ingroup msg_gos_functions
622 * \brief Return the the last value returned by a MSG function (except
625 MSG_error_t MSG_get_errno(void)
627 return PROCESS_GET_ERRNO();