Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
92ee13b36c3b0d35d5d5247808586e119d6e1ff6
[simgrid.git] / src / msg / gos.c
1 /*      $Id$     */
2
3 /* Copyright (c) 2002,2003,2004 Arnaud Legrand. All rights reserved.        */
4
5 /* This program is free software; you can redistribute it and/or modify it
6  * under the terms of the license (GNU LGPL) which comes with this package. */
7
8 #include "private.h"
9 #include "xbt/sysdep.h"
10 #include "xbt/log.h"
11 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(gos, msg,
12                                 "Logging specific to MSG (gos)");
13
14 /** \defgroup msg_gos_functions MSG Operating System Functions
15  *  \brief This section describes the functions that can be used
16  *  by an agent for handling some task.
17  */
18
19 static MSG_error_t __MSG_task_get_with_time_out_from_host(m_task_t * task,
20                                                         m_channel_t channel,
21                                                         double max_duration,
22                                                         m_host_t host)
23 {
24   m_process_t process = MSG_process_self();
25   m_task_t t = NULL;
26   m_host_t h = NULL;
27   simdata_task_t t_simdata = NULL;
28   simdata_host_t h_simdata = NULL;
29   int first_time = 1;
30   e_surf_action_state_t state = SURF_ACTION_NOT_IN_THE_SYSTEM;
31   xbt_fifo_item_t item = NULL;
32
33   CHECK_HOST();
34   xbt_assert1((channel>=0) && (channel < msg_global->max_channel),"Invalid channel %d",channel);
35   /* Sanity check */
36   xbt_assert0(task,"Null pointer for the task\n");
37
38   if (*task) 
39     CRITICAL0("MSG_task_get() was asked to write in a non empty task struct.");
40
41   /* Get the task */
42   h = MSG_host_self();
43   h_simdata = h->simdata;
44
45   DEBUG2("Waiting for a task on channel %d (%s)", channel,h->name);
46
47   while (1) {
48     if(xbt_fifo_size(h_simdata->mbox[channel])>0) {
49       if(!host) {
50         t = xbt_fifo_shift(h_simdata->mbox[channel]);
51         break;
52       } else {
53         xbt_fifo_foreach(h->simdata->mbox[channel],item,t,m_task_t) {
54           if(t->simdata->source==host) break;
55         }
56         if(item) {
57           xbt_fifo_remove_item(h->simdata->mbox[channel],item);
58           break;
59         } 
60       }
61     }
62                                                        
63     if(max_duration>0) {
64       if(!first_time) {
65         MSG_RETURN(MSG_OK);
66       }
67     }
68     xbt_assert2(!(h_simdata->sleeping[channel]),
69                 "A process (%s(%d)) is already blocked on this channel",
70                 h_simdata->sleeping[channel]->name,
71                 h_simdata->sleeping[channel]->simdata->PID);
72     h_simdata->sleeping[channel] = process; /* I'm waiting. Wake me up when you're ready */
73     if(max_duration>0) {
74       __MSG_process_block(max_duration);
75     } else {
76       __MSG_process_block(-1);
77     }
78     if(surf_workstation_resource->extension_public->get_state(h_simdata->host) 
79        == SURF_CPU_OFF)
80       MSG_RETURN(MSG_HOST_FAILURE);
81     h_simdata->sleeping[channel] = NULL;
82     first_time = 0;
83     /* OK, we should both be ready now. Are you there ? */
84   }
85
86   DEBUG1("OK, got a task (%s)", t->name);
87
88   t_simdata = t->simdata;
89   /*   *task = __MSG_task_copy(t); */
90   *task=t;
91
92   /* Transfer */
93   t_simdata->using++;
94
95   DEBUG0("Calling SURF for communication creation");
96   t_simdata->comm = surf_workstation_resource->extension_public->
97     communicate(MSG_process_get_host(t_simdata->sender)->simdata->host,
98                 h->simdata->host, t_simdata->message_size,t_simdata->rate);
99   
100   surf_workstation_resource->common_public->action_set_data(t_simdata->comm,t);
101
102   if(__MSG_process_isBlocked(t_simdata->sender))
103     __MSG_process_unblock(t_simdata->sender);
104
105   PAJE_PROCESS_PUSH_STATE(process,"C");  
106
107   do {
108     DEBUG0("Waiting for action termination");
109     __MSG_task_wait_event(process, t);
110     state=surf_workstation_resource->common_public->action_get_state(t_simdata->comm);
111   } while (state==SURF_ACTION_RUNNING);
112   DEBUG0("Action terminated");
113
114   if(t->simdata->using>1) {
115     xbt_fifo_unshift(msg_global->process_to_run,process);
116     xbt_context_yield();
117   }
118
119   PAJE_PROCESS_POP_STATE(process);
120   PAJE_COMM_STOP(process,t,channel);
121
122   if(state == SURF_ACTION_DONE) {
123     if(surf_workstation_resource->common_public->action_free(t_simdata->comm)) 
124       t_simdata->comm = NULL;
125     MSG_RETURN(MSG_OK);
126   } else if(surf_workstation_resource->extension_public->get_state(h_simdata->host) 
127           == SURF_CPU_OFF) {
128     if(surf_workstation_resource->common_public->action_free(t_simdata->comm)) 
129       t_simdata->comm = NULL;
130     MSG_RETURN(MSG_HOST_FAILURE);
131   } else {
132     if(surf_workstation_resource->common_public->action_free(t_simdata->comm)) 
133       t_simdata->comm = NULL;
134     MSG_RETURN(MSG_TRANSFER_FAILURE);
135   }
136 }
137
138 /** \ingroup msg_gos_functions
139  * \brief Listen on a channel and wait for receiving a task.
140  *
141  * It takes two parameters.
142  * \param task a memory location for storing a #m_task_t. It will
143    hold a task when this function will return. Thus \a task should not
144    be equal to \c NULL and \a *task should be equal to \c NULL. If one of
145    those two condition does not hold, there will be a warning message.
146  * \param channel the channel on which the agent should be
147    listening. This value has to be >=0 and < than the maximal
148    number of channels fixed with MSG_set_channel_number().
149  * \return #MSG_FATAL if \a task is equal to \c NULL, #MSG_WARNING
150  * if \a *task is not equal to \c NULL, and #MSG_OK otherwise.
151  */
152 MSG_error_t MSG_task_get(m_task_t * task,
153                          m_channel_t channel)
154 {
155   return MSG_task_get_with_time_out(task, channel, -1);
156 }
157
158 /** \ingroup msg_gos_functions
159  * \brief Listen on a channel and wait for receiving a task with a timeout.
160  *
161  * It takes three parameters.
162  * \param task a memory location for storing a #m_task_t. It will
163    hold a task when this function will return. Thus \a task should not
164    be equal to \c NULL and \a *task should be equal to \c NULL. If one of
165    those two condition does not hold, there will be a warning message.
166  * \param channel the channel on which the agent should be
167    listening. This value has to be >=0 and < than the maximal
168    number of channels fixed with MSG_set_channel_number().
169  * \param max_duration the maximum time to wait for a task before giving
170     up. In such a case, \a task will not be modified and will still be
171     equal to \c NULL when returning.
172  * \return #MSG_FATAL if \a task is equal to \c NULL, #MSG_WARNING
173    if \a *task is not equal to \c NULL, and #MSG_OK otherwise.
174  */
175 MSG_error_t MSG_task_get_with_time_out(m_task_t * task,
176                                        m_channel_t channel,
177                                        double max_duration)
178 {
179   return __MSG_task_get_with_time_out_from_host(task, channel, max_duration, NULL);
180 }
181
182 /** \ingroup msg_gos_functions
183  * \brief Listen on \a channel and waits for receiving a task from \a host.
184  *
185  * It takes three parameters.
186  * \param task a memory location for storing a #m_task_t. It will
187    hold a task when this function will return. Thus \a task should not
188    be equal to \c NULL and \a *task should be equal to \c NULL. If one of
189    those two condition does not hold, there will be a warning message.
190  * \param channel the channel on which the agent should be
191    listening. This value has to be >=0 and < than the maximal
192    number of channels fixed with MSG_set_channel_number().
193  * \param host the host that is to be watched.
194  * \return #MSG_FATAL if \a task is equal to \c NULL, #MSG_WARNING
195    if \a *task is not equal to \c NULL, and #MSG_OK otherwise.
196  */
197 MSG_error_t MSG_task_get_from_host(m_task_t * task, int channel, 
198                                    m_host_t host)
199 {
200   return __MSG_task_get_with_time_out_from_host(task, channel, -1, host);
201 }
202
203 /** \ingroup msg_gos_functions
204  * \brief Test whether there is a pending communication on a channel.
205  *
206  * It takes one parameter.
207  * \param channel the channel on which the agent should be
208    listening. This value has to be >=0 and < than the maximal
209    number of channels fixed with MSG_set_channel_number().
210  * \return 1 if there is a pending communication and 0 otherwise
211  */
212 int MSG_task_Iprobe(m_channel_t channel)
213 {
214   m_host_t h = NULL;
215   simdata_host_t h_simdata = NULL;
216
217   xbt_assert1((channel>=0) && (channel < msg_global->max_channel),"Invalid channel %d",channel);
218   DEBUG2("Probing on channel %d (%s)", channel,h->name);
219   CHECK_HOST();
220   h = MSG_host_self();
221   h_simdata = h->simdata;
222   return(xbt_fifo_get_first_item(h_simdata->mbox[channel])!=NULL);
223 }
224
225 /** \ingroup msg_gos_functions
226  * \brief Test whether there is a pending communication on a channel, and who sent it.
227  *
228  * It takes one parameter.
229  * \param channel the channel on which the agent should be
230    listening. This value has to be >=0 and < than the maximal
231    number of channels fixed with MSG_set_channel_number().
232  * \return -1 if there is no pending communication and the PID of the process who sent it otherwise
233  */
234 int MSG_task_probe_from(m_channel_t channel)
235 {
236   m_host_t h = NULL;
237   simdata_host_t h_simdata = NULL;
238   xbt_fifo_item_t item;
239   m_task_t t;
240
241   xbt_assert1((channel>=0) && (channel < msg_global->max_channel),"Invalid channel %d",channel);
242   CHECK_HOST();
243   h = MSG_host_self();
244   h_simdata = h->simdata;
245
246   DEBUG2("Probing on channel %d (%s)", channel,h->name);
247    
248   item = xbt_fifo_get_first_item(h->simdata->mbox[channel]);
249   if (!item || !(t = xbt_fifo_get_item_content(item)))
250     return -1;
251    
252   return MSG_process_get_PID(t->simdata->sender);
253 }
254
255 /** \ingroup msg_gos_functions
256  * \brief Wait for at most \a max_duration second for a task reception
257    on \a channel. *\a PID is updated with the PID of the first process
258    that triggered this event is any.
259  *
260  * It takes three parameters:
261  * \param channel the channel on which the agent should be
262    listening. This value has to be >=0 and < than the maximal.
263    number of channels fixed with MSG_set_channel_number().
264  * \param PID a memory location for storing an int.
265  * \param max_duration the maximum time to wait for a task before
266     giving up. In the case of a reception, *\a PID will be updated
267     with the PID of the first process to send a task.
268  * \return #MSG_HOST_FAILURE if the host is shut down in the meantime
269    and #MSG_OK otherwise.
270  */
271 MSG_error_t MSG_channel_select_from(m_channel_t channel, double max_duration,
272                                     int *PID)
273 {
274   m_host_t h = NULL;
275   simdata_host_t h_simdata = NULL;
276   xbt_fifo_item_t item;
277   m_task_t t;
278   int first_time = 1;
279   m_process_t process = MSG_process_self();
280
281   xbt_assert1((channel>=0) && (channel < msg_global->max_channel),"Invalid channel %d",channel);
282   if(PID) {
283     *PID = -1;
284   }
285
286   if(max_duration==0.0) {
287     return MSG_task_probe_from(channel);
288   } else {
289     CHECK_HOST();
290     h = MSG_host_self();
291     h_simdata = h->simdata;
292     
293     DEBUG2("Probing on channel %d (%s)", channel,h->name);
294     while(!(item = xbt_fifo_get_first_item(h->simdata->mbox[channel]))) {
295       if(max_duration>0) {
296         if(!first_time) {
297           MSG_RETURN(MSG_OK);
298         }
299       }
300       xbt_assert2(!(h_simdata->sleeping[channel]),
301                   "A process (%s(%d)) is already blocked on this channel",
302                   h_simdata->sleeping[channel]->name,
303                   h_simdata->sleeping[channel]->simdata->PID);
304       h_simdata->sleeping[channel] = process; /* I'm waiting. Wake me up when you're ready */
305       if(max_duration>0) {
306         __MSG_process_block(max_duration);
307       } else {
308         __MSG_process_block(-1);
309       }
310       if(surf_workstation_resource->extension_public->get_state(h_simdata->host) 
311          == SURF_CPU_OFF) {
312         MSG_RETURN(MSG_HOST_FAILURE);
313       }
314       h_simdata->sleeping[channel] = NULL;
315       first_time = 0;
316     }
317     if (!item || !(t = xbt_fifo_get_item_content(item))) {
318       MSG_RETURN(MSG_OK);
319     }
320     if(PID) {
321       *PID = MSG_process_get_PID(t->simdata->sender);
322     }
323     MSG_RETURN(MSG_OK);
324   }
325 }
326
327
328 /** \ingroup msg_gos_functions
329
330  * \brief Return the number of tasks waiting to be received on a \a
331    channel and sent by \a host.
332  *
333  * It takes two parameters.
334  * \param channel the channel on which the agent should be
335    listening. This value has to be >=0 and < than the maximal
336    number of channels fixed with MSG_set_channel_number().
337  * \param host the host that is to be watched.
338  * \return the number of tasks waiting to be received on \a channel
339    and sent by \a host.
340  */
341 int MSG_task_probe_from_host(int channel, m_host_t host)
342 {
343   simdata_host_t h_simdata = NULL;
344   xbt_fifo_item_t item;
345   m_task_t t;
346   int count = 0;
347   m_host_t h = NULL;
348   
349   xbt_assert1((channel>=0) && (channel < msg_global->max_channel),"Invalid channel %d",channel);
350   CHECK_HOST();
351   h = MSG_host_self();
352   h_simdata = h->simdata;
353
354   DEBUG2("Probing on channel %d (%s)", channel,h->name);
355    
356   xbt_fifo_foreach(h->simdata->mbox[channel],item,t,m_task_t) {
357     if(t->simdata->source==host) count++;
358   }
359    
360   return count;
361 }
362
363 /** \ingroup msg_gos_functions
364  * \brief Put a task on a channel of an host and waits for the end of the
365  * transmission.
366  *
367  * This function is used for describing the behavior of an agent. It
368  * takes three parameter.
369  * \param task a #m_task_t to send on another location. This task
370    will not be usable anymore when the function will return. There is
371    no automatic task duplication and you have to save your parameters
372    before calling this function. Tasks are unique and once it has been
373    sent to another location, you should not access it anymore. You do
374    not need to call MSG_task_destroy() but to avoid using, as an
375    effect of inattention, this task anymore, you definitely should
376    renitialize it with #MSG_TASK_UNINITIALIZED. Note that this task
377    can be transfered iff it has been correctly created with
378    MSG_task_create().
379  * \param dest the destination of the message
380  * \param channel the channel on which the agent should put this
381    task. This value has to be >=0 and < than the maximal number of
382    channels fixed with MSG_set_channel_number().
383  * \return #MSG_FATAL if \a task is not properly initialized and
384  * #MSG_OK otherwise.
385  */
386 MSG_error_t MSG_task_put(m_task_t task,
387                          m_host_t dest, m_channel_t channel)
388 {
389   m_process_t process = MSG_process_self();
390   simdata_task_t task_simdata = NULL;
391   e_surf_action_state_t state = SURF_ACTION_NOT_IN_THE_SYSTEM;
392   m_host_t local_host = NULL;
393   m_host_t remote_host = NULL;
394
395   CHECK_HOST();
396
397   xbt_assert1((channel>=0) && (channel < msg_global->max_channel),"Invalid channel %d",channel);
398
399   task_simdata = task->simdata;
400   task_simdata->sender = process;
401   task_simdata->source = MSG_process_get_host(process);
402   xbt_assert0(task_simdata->using==1,"Gargl!");
403   task_simdata->comm = NULL;
404   
405   local_host = ((simdata_process_t) process->simdata)->host;
406   remote_host = dest;
407
408   DEBUG4("Trying to send a task (%g Mb) from %s to %s on channel %d", 
409          task->simdata->message_size,local_host->name, remote_host->name, channel);
410
411   xbt_fifo_push(((simdata_host_t) remote_host->simdata)->
412                 mbox[channel], task);
413
414   PAJE_COMM_START(process,task,channel);
415     
416   if(remote_host->simdata->sleeping[channel]) {
417     DEBUG0("Somebody is listening. Let's wake him up!");
418     __MSG_process_unblock(remote_host->simdata->sleeping[channel]);
419   }
420
421   process->simdata->put_host = dest;
422   process->simdata->put_channel = channel;
423   while(!(task_simdata->comm)) {
424     DEBUG0("Communication not initiated yet. Let's block!");
425     __MSG_process_block(-1);
426   }
427   DEBUG0("Registering to this communication");
428   surf_workstation_resource->common_public->action_use(task_simdata->comm);
429   process->simdata->put_host = NULL;
430   process->simdata->put_channel = -1;
431
432
433   PAJE_PROCESS_PUSH_STATE(process,"C");  
434
435   state=surf_workstation_resource->common_public->action_get_state(task_simdata->comm);
436   while (state==SURF_ACTION_RUNNING) {
437     DEBUG0("Waiting for action termination");
438     __MSG_task_wait_event(process, task);
439     state=surf_workstation_resource->common_public->action_get_state(task_simdata->comm);
440   }
441   DEBUG0("Action terminated");
442
443   PAJE_PROCESS_POP_STATE(process);  
444
445   if(state == SURF_ACTION_DONE) {
446     if(surf_workstation_resource->common_public->action_free(task_simdata->comm)) 
447       task_simdata->comm = NULL;
448     MSG_task_destroy(task);
449     MSG_RETURN(MSG_OK);
450   } else if(surf_workstation_resource->extension_public->get_state(local_host->simdata->host) 
451             == SURF_CPU_OFF) {
452     if(surf_workstation_resource->common_public->action_free(task_simdata->comm)) 
453       task_simdata->comm = NULL;
454     MSG_task_destroy(task);
455     MSG_RETURN(MSG_HOST_FAILURE);
456   } else { 
457     if(surf_workstation_resource->common_public->action_free(task_simdata->comm)) 
458       task_simdata->comm = NULL;
459     MSG_task_destroy(task);
460     MSG_RETURN(MSG_TRANSFER_FAILURE);
461   }
462 }
463
464 /** \ingroup msg_gos_functions
465  * \brief Does exactly the same as MSG_task_put but with a bounded transmition 
466  * rate.
467  *
468  * \sa MSG_task_put
469  */
470 MSG_error_t MSG_task_put_bounded(m_task_t task,
471                                  m_host_t dest, m_channel_t channel,
472                                  double max_rate)
473 {
474   MSG_error_t res = MSG_OK;
475   task->simdata->rate=max_rate;
476   res = MSG_task_put(task, dest, channel);
477   task->simdata->rate=-1.0;
478   return(res);
479 }
480
481 /** \ingroup msg_gos_functions
482  * \brief Executes a task and waits for its termination.
483  *
484  * This function is used for describing the behavior of an agent. It
485  * takes only one parameter.
486  * \param task a #m_task_t to execute on the location on which the
487    agent is running.
488  * \return #MSG_FATAL if \a task is not properly initialized and
489  * #MSG_OK otherwise.
490  */
491 MSG_error_t MSG_task_execute(m_task_t task)
492 {
493   m_process_t process = MSG_process_self();
494   MSG_error_t res;
495
496   DEBUG1("Computing on %s", process->simdata->host->name);
497
498   __MSG_task_execute(process, task);
499
500   PAJE_PROCESS_PUSH_STATE(process,"E");  
501   res = __MSG_wait_for_computation(process,task);
502   PAJE_PROCESS_POP_STATE(process);
503   return res;
504 }
505
506 void __MSG_task_execute(m_process_t process, m_task_t task)
507 {
508   simdata_task_t simdata = NULL;
509
510   CHECK_HOST();
511
512   simdata = task->simdata;
513
514   simdata->compute = surf_workstation_resource->extension_public->
515     execute(MSG_process_get_host(process)->simdata->host,
516             simdata->computation_amount);
517   surf_workstation_resource->common_public->
518     set_priority(simdata->compute, simdata->priority);
519
520   surf_workstation_resource->common_public->action_set_data(simdata->compute,task);
521 }
522
523 MSG_error_t __MSG_wait_for_computation(m_process_t process, m_task_t task)
524 {
525   e_surf_action_state_t state = SURF_ACTION_NOT_IN_THE_SYSTEM;
526   simdata_task_t simdata = task->simdata;
527
528   simdata->using++;
529   do {
530     __MSG_task_wait_event(process, task);
531     state=surf_workstation_resource->common_public->action_get_state(simdata->compute);
532   } while (state==SURF_ACTION_RUNNING);
533   simdata->using--;
534     
535
536   if(state == SURF_ACTION_DONE) {
537     if(surf_workstation_resource->common_public->action_free(simdata->compute)) 
538       simdata->compute = NULL;
539     simdata->computation_amount = 0.0;
540     MSG_RETURN(MSG_OK);
541   } else if(surf_workstation_resource->extension_public->
542             get_state(MSG_process_get_host(process)->simdata->host) 
543             == SURF_CPU_OFF) {
544     if(surf_workstation_resource->common_public->action_free(simdata->compute)) 
545       simdata->compute = NULL;
546     MSG_RETURN(MSG_HOST_FAILURE);
547   } else {
548     if(surf_workstation_resource->common_public->action_free(simdata->compute)) 
549       simdata->compute = NULL;
550     MSG_RETURN(MSG_TASK_CANCELLED);
551   }
552 }
553 /** \ingroup m_task_management
554  * \brief Creates a new #m_task_t (a parallel one....).
555  *
556  * A constructor for #m_task_t taking six arguments and returning the 
557    corresponding object.
558  * \param name a name for the object. It is for user-level information
559    and can be NULL.
560  * \param host_nb the number of hosts implied in the parallel task.
561  * \param host_list an array of \a host_nb m_host_t.
562  * \param computation_amount an array of \a host_nb
563    doubles. computation_amount[i] is the total number of operations
564    that have to be performed on host_list[i].
565  * \param communication_amount an array of \a host_nb* \a host_nb doubles.
566  * \param data a pointer to any data may want to attach to the new
567    object.  It is for user-level information and can be NULL. It can
568    be retrieved with the function \ref MSG_task_get_data.
569  * \see m_task_t
570  * \return The new corresponding object.
571  */
572 m_task_t MSG_parallel_task_create(const char *name, 
573                                   int host_nb,
574                                   const m_host_t *host_list,
575                                   double *computation_amount,
576                                   double *communication_amount,
577                                   void *data)
578 {
579   simdata_task_t simdata = xbt_new0(s_simdata_task_t,1);
580   m_task_t task = xbt_new0(s_m_task_t,1);
581   int i;
582
583   /* Task structure */
584   task->name = xbt_strdup(name);
585   task->simdata = simdata;
586   task->data = data;
587
588   /* Simulator Data */
589   simdata->sleeping = xbt_dynar_new(sizeof(m_process_t),NULL);
590   simdata->rate = -1.0;
591   simdata->using = 1;
592   simdata->sender = NULL;
593   simdata->source = NULL;
594   simdata->host_nb = host_nb;
595   
596   simdata->host_list = xbt_new0(void *, host_nb);
597   simdata->comp_amount = computation_amount;
598   simdata->comm_amount = communication_amount;
599
600   for(i=0;i<host_nb;i++)
601     simdata->host_list[i] = host_list[i]->simdata->host;
602
603   return task;
604 }
605
606
607 static void __MSG_parallel_task_execute(m_process_t process, m_task_t task)
608 {
609   simdata_task_t simdata = NULL;
610
611   CHECK_HOST();
612
613   simdata = task->simdata;
614
615   xbt_assert0(simdata->host_nb,"This is not a parallel task. Go to hell.");
616
617   simdata->compute = surf_workstation_resource->extension_public->
618   execute_parallel_task(task->simdata->host_nb,
619                         task->simdata->host_list,
620                         task->simdata->comp_amount,
621                         task->simdata->comm_amount,
622                         1.0,
623                         -1.0);
624   if(simdata->compute)
625     surf_workstation_resource->common_public->action_set_data(simdata->compute,task);
626 }
627
628 MSG_error_t MSG_parallel_task_execute(m_task_t task)
629 {
630   m_process_t process = MSG_process_self();
631   MSG_error_t res;
632
633   DEBUG0("Computing on a tons of guys");
634   
635   __MSG_parallel_task_execute(process, task);
636
637   if(task->simdata->compute)
638     res = __MSG_wait_for_computation(process,task);
639   else 
640     res = MSG_OK;
641
642   return res;  
643 }
644
645
646 /** \ingroup msg_gos_functions
647  * \brief Sleep for the specified number of seconds
648  *
649  * Makes the current process sleep until \a time seconds have elapsed.
650  *
651  * \param nb_sec a number of second
652  */
653 MSG_error_t MSG_process_sleep(double nb_sec)
654 {
655   e_surf_action_state_t state = SURF_ACTION_NOT_IN_THE_SYSTEM;
656   m_process_t process = MSG_process_self();
657   m_task_t dummy = NULL;
658   simdata_task_t simdata = NULL;
659
660   CHECK_HOST();
661   dummy = MSG_task_create("MSG_sleep", nb_sec, 0.0, NULL);
662   simdata = dummy->simdata;
663
664   simdata->compute = surf_workstation_resource->extension_public->
665     sleep(MSG_process_get_host(process)->simdata->host,
666             simdata->computation_amount);
667   surf_workstation_resource->common_public->action_set_data(simdata->compute,dummy);
668
669   
670   simdata->using++;
671   do {
672     __MSG_task_wait_event(process, dummy);
673     state=surf_workstation_resource->common_public->action_get_state(simdata->compute);
674   } while (state==SURF_ACTION_RUNNING);
675   simdata->using--;
676     
677   if(state == SURF_ACTION_DONE) {
678     if(surf_workstation_resource->extension_public->
679        get_state(MSG_process_get_host(process)->simdata->host) 
680        == SURF_CPU_OFF) {
681       if(surf_workstation_resource->common_public->action_free(simdata->compute)) 
682         simdata->compute = NULL;
683       MSG_RETURN(MSG_HOST_FAILURE);
684     }
685     if(__MSG_process_isBlocked(process)) {
686       __MSG_process_unblock(MSG_process_self());
687     }
688     if(surf_workstation_resource->extension_public->
689        get_state(MSG_process_get_host(process)->simdata->host) 
690        == SURF_CPU_OFF) {
691       if(surf_workstation_resource->common_public->action_free(simdata->compute)) 
692         simdata->compute = NULL;
693       MSG_RETURN(MSG_HOST_FAILURE);
694     }
695     if(surf_workstation_resource->common_public->action_free(simdata->compute)) 
696       simdata->compute = NULL;
697     MSG_task_destroy(dummy);
698     MSG_RETURN(MSG_OK);
699   } else MSG_RETURN(MSG_HOST_FAILURE);
700 }
701
702 /** \ingroup msg_gos_functions
703  * \brief Return the number of MSG tasks currently running on a
704  * the host of the current running process.
705  */
706 static int MSG_get_msgload(void) 
707 {
708   m_process_t process;
709    
710   CHECK_HOST();
711   
712   xbt_assert0(0, "This function is still to be specified correctly (what do you mean by 'load', exactly?). In the meantime, please don't use it");
713   process = MSG_process_self();
714   return xbt_fifo_size(process->simdata->host->simdata->process_list);
715 }
716
717 /** \ingroup msg_gos_functions
718  *
719  * \brief Return the the last value returned by a MSG function (except
720  * MSG_get_errno...).
721  */
722 MSG_error_t MSG_get_errno(void)
723 {
724   return PROCESS_GET_ERRNO();
725 }