Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
76d4dacdb9549db06d3153416d77dd41e2d4bcae
[simgrid.git] / src / msg / gos.c
1 /*      $Id$     */
2
3 /* Copyright (c) 2002,2003,2004 Arnaud Legrand. All rights reserved.        */
4
5 /* This program is free software; you can redistribute it and/or modify it
6  * under the terms of the license (GNU LGPL) which comes with this package. */
7
8 #include "private.h"
9 #include "xbt/sysdep.h"
10 #include "xbt/log.h"
11 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(gos, msg,
12                                 "Logging specific to MSG (gos)");
13
14 /** \defgroup msg_gos_functions MSG Operating System Functions
15  *  \brief This section describes the functions that can be used
16  *  by an agent for handling some task.
17  */
18
19 static MSG_error_t __MSG_task_get_with_time_out_from_host(m_task_t * task,
20                                                         m_channel_t channel,
21                                                         double max_duration,
22                                                         m_host_t host)
23 {
24   m_process_t process = MSG_process_self();
25   m_task_t t = NULL;
26   m_host_t h = NULL;
27   simdata_task_t t_simdata = NULL;
28   simdata_host_t h_simdata = NULL;
29   int first_time = 1;
30   e_surf_action_state_t state = SURF_ACTION_NOT_IN_THE_SYSTEM;
31   xbt_fifo_item_t item = NULL;
32
33   CHECK_HOST();
34   xbt_assert1((channel>=0) && (channel < msg_global->max_channel),"Invalid channel %d",channel);
35   /* Sanity check */
36   xbt_assert0(task,"Null pointer for the task\n");
37
38   if (*task) 
39     CRITICAL0("MSG_task_get() was asked to write in a non empty task struct.");
40
41   /* Get the task */
42   h = MSG_host_self();
43   h_simdata = h->simdata;
44
45   DEBUG2("Waiting for a task on channel %d (%s)", channel,h->name);
46
47   while (1) {
48     if(xbt_fifo_size(h_simdata->mbox[channel])>0) {
49       if(!host) {
50         t = xbt_fifo_shift(h_simdata->mbox[channel]);
51         break;
52       } else {
53         xbt_fifo_foreach(h->simdata->mbox[channel],item,t,m_task_t) {
54           if(t->simdata->source==host) break;
55         }
56         if(item) break;
57       }
58     }
59                                                        
60     if(max_duration>0) {
61       if(!first_time) {
62         MSG_RETURN(MSG_OK);
63       }
64     }
65     xbt_assert2(!(h_simdata->sleeping[channel]),
66                 "A process (%s(%d)) is already blocked on this channel",
67                 h_simdata->sleeping[channel]->name,
68                 h_simdata->sleeping[channel]->simdata->PID);
69     h_simdata->sleeping[channel] = process; /* I'm waiting. Wake me up when you're ready */
70     if(max_duration>0) {
71       __MSG_process_block(max_duration);
72     } else {
73       __MSG_process_block(-1);
74     }
75     if(surf_workstation_resource->extension_public->get_state(h_simdata->host) 
76        == SURF_CPU_OFF)
77       MSG_RETURN(MSG_HOST_FAILURE);
78     h_simdata->sleeping[channel] = NULL;
79     first_time = 0;
80     /* OK, we should both be ready now. Are you there ? */
81   }
82
83   DEBUG1("OK, got a task (%s)", t->name);
84
85   t_simdata = t->simdata;
86   /*   *task = __MSG_task_copy(t); */
87   *task=t;
88
89   /* Transfer */
90   t_simdata->using++;
91
92   DEBUG0("Calling SURF for communication creation");
93   t_simdata->comm = surf_workstation_resource->extension_public->
94     communicate(MSG_process_get_host(t_simdata->sender)->simdata->host,
95                 h->simdata->host, t_simdata->message_size,t_simdata->rate);
96   
97   surf_workstation_resource->common_public->action_set_data(t_simdata->comm,t);
98
99   if(__MSG_process_isBlocked(t_simdata->sender))
100     __MSG_process_unblock(t_simdata->sender);
101
102   PAJE_PROCESS_PUSH_STATE(process,"C");  
103
104   do {
105     DEBUG0("Waiting for action termination");
106     __MSG_task_wait_event(process, t);
107     state=surf_workstation_resource->common_public->action_get_state(t_simdata->comm);
108   } while (state==SURF_ACTION_RUNNING);
109   DEBUG0("Action terminated");
110
111   if(t->simdata->using>1) {
112     xbt_fifo_unshift(msg_global->process_to_run,process);
113     xbt_context_yield();
114   }
115
116   PAJE_PROCESS_POP_STATE(process);
117   PAJE_COMM_STOP(process,t,channel);
118
119   if(state == SURF_ACTION_DONE) {
120     if(surf_workstation_resource->common_public->action_free(t_simdata->comm)) 
121       t_simdata->comm = NULL;
122     MSG_RETURN(MSG_OK);
123   } else if(surf_workstation_resource->extension_public->get_state(h_simdata->host) 
124           == SURF_CPU_OFF) {
125     if(surf_workstation_resource->common_public->action_free(t_simdata->comm)) 
126       t_simdata->comm = NULL;
127     MSG_RETURN(MSG_HOST_FAILURE);
128   } else {
129     if(surf_workstation_resource->common_public->action_free(t_simdata->comm)) 
130       t_simdata->comm = NULL;
131     MSG_RETURN(MSG_TRANSFER_FAILURE);
132   }
133 }
134
135 /** \ingroup msg_gos_functions
136  * \brief Listen on a channel and wait for receiving a task.
137  *
138  * It takes two parameters.
139  * \param task a memory location for storing a #m_task_t. It will
140    hold a task when this function will return. Thus \a task should not
141    be equal to \c NULL and \a *task should be equal to \c NULL. If one of
142    those two condition does not hold, there will be a warning message.
143  * \param channel the channel on which the agent should be
144    listening. This value has to be >=0 and < than the maximal
145    number of channels fixed with MSG_set_channel_number().
146  * \return #MSG_FATAL if \a task is equal to \c NULL, #MSG_WARNING
147  * if \a *task is not equal to \c NULL, and #MSG_OK otherwise.
148  */
149 MSG_error_t MSG_task_get(m_task_t * task,
150                          m_channel_t channel)
151 {
152   return MSG_task_get_with_time_out(task, channel, -1);
153 }
154
155 /** \ingroup msg_gos_functions
156  * \brief Listen on a channel and wait for receiving a task with a timeout.
157  *
158  * It takes three parameters.
159  * \param task a memory location for storing a #m_task_t. It will
160    hold a task when this function will return. Thus \a task should not
161    be equal to \c NULL and \a *task should be equal to \c NULL. If one of
162    those two condition does not hold, there will be a warning message.
163  * \param channel the channel on which the agent should be
164    listening. This value has to be >=0 and < than the maximal
165    number of channels fixed with MSG_set_channel_number().
166  * \param max_duration the maximum time to wait for a task before giving
167     up. In such a case, \a task will not be modified and will still be
168     equal to \c NULL when returning.
169  * \return #MSG_FATAL if \a task is equal to \c NULL, #MSG_WARNING
170    if \a *task is not equal to \c NULL, and #MSG_OK otherwise.
171  */
172 MSG_error_t MSG_task_get_with_time_out(m_task_t * task,
173                                        m_channel_t channel,
174                                        double max_duration)
175 {
176   return __MSG_task_get_with_time_out_from_host(task, channel, max_duration, NULL);
177 }
178
179 /** \ingroup msg_gos_functions
180  * \brief Listen on \a channel and waits for receiving a task from \a host.
181  *
182  * It takes three parameters.
183  * \param task a memory location for storing a #m_task_t. It will
184    hold a task when this function will return. Thus \a task should not
185    be equal to \c NULL and \a *task should be equal to \c NULL. If one of
186    those two condition does not hold, there will be a warning message.
187  * \param channel the channel on which the agent should be
188    listening. This value has to be >=0 and < than the maximal
189    number of channels fixed with MSG_set_channel_number().
190  * \param host the host that is to be watched.
191  * \return #MSG_FATAL if \a task is equal to \c NULL, #MSG_WARNING
192    if \a *task is not equal to \c NULL, and #MSG_OK otherwise.
193  */
194 MSG_error_t MSG_task_get_from_host(m_task_t * task, int channel, 
195                                    m_host_t host)
196 {
197   return __MSG_task_get_with_time_out_from_host(task, channel, -1, host);
198 }
199
200 /** \ingroup msg_gos_functions
201  * \brief Test whether there is a pending communication on a channel.
202  *
203  * It takes one parameter.
204  * \param channel the channel on which the agent should be
205    listening. This value has to be >=0 and < than the maximal
206    number of channels fixed with MSG_set_channel_number().
207  * \return 1 if there is a pending communication and 0 otherwise
208  */
209 int MSG_task_Iprobe(m_channel_t channel)
210 {
211   m_host_t h = NULL;
212   simdata_host_t h_simdata = NULL;
213
214   xbt_assert1((channel>=0) && (channel < msg_global->max_channel),"Invalid channel %d",channel);
215   DEBUG2("Probing on channel %d (%s)", channel,h->name);
216   CHECK_HOST();
217   h = MSG_host_self();
218   h_simdata = h->simdata;
219   return(xbt_fifo_get_first_item(h_simdata->mbox[channel])!=NULL);
220 }
221
222 /** \ingroup msg_gos_functions
223  * \brief Test whether there is a pending communication on a channel, and who sent it.
224  *
225  * It takes one parameter.
226  * \param channel the channel on which the agent should be
227    listening. This value has to be >=0 and < than the maximal
228    number of channels fixed with MSG_set_channel_number().
229  * \return -1 if there is no pending communication and the PID of the process who sent it otherwise
230  */
231 int MSG_task_probe_from(m_channel_t channel)
232 {
233   m_host_t h = NULL;
234   simdata_host_t h_simdata = NULL;
235   xbt_fifo_item_t item;
236   m_task_t t;
237
238   xbt_assert1((channel>=0) && (channel < msg_global->max_channel),"Invalid channel %d",channel);
239   CHECK_HOST();
240   h = MSG_host_self();
241   h_simdata = h->simdata;
242
243   DEBUG2("Probing on channel %d (%s)", channel,h->name);
244    
245   item = xbt_fifo_get_first_item(h->simdata->mbox[channel]);
246   if (!item || !(t = xbt_fifo_get_item_content(item)))
247     return -1;
248    
249   return MSG_process_get_PID(t->simdata->sender);
250 }
251
252 /** \ingroup msg_gos_functions
253  * \brief Wait for at most \a max_duration second for a task reception
254    on \a channel. *\a PID is updated with the PID of the first process
255    that triggered this event is any.
256  *
257  * It takes three parameters:
258  * \param channel the channel on which the agent should be
259    listening. This value has to be >=0 and < than the maximal.
260    number of channels fixed with MSG_set_channel_number().
261  * \param PID a memory location for storing an int.
262  * \param max_duration the maximum time to wait for a task before
263     giving up. In the case of a reception, *\a PID will be updated
264     with the PID of the first process to send a task.
265  * \return #MSG_HOST_FAILURE if the host is shut down in the meantime
266    and #MSG_OK otherwise.
267  */
268 MSG_error_t MSG_channel_select_from(m_channel_t channel, double max_duration,
269                                     int *PID)
270 {
271   m_host_t h = NULL;
272   simdata_host_t h_simdata = NULL;
273   xbt_fifo_item_t item;
274   m_task_t t;
275   int first_time = 1;
276   m_process_t process = MSG_process_self();
277
278   xbt_assert1((channel>=0) && (channel < msg_global->max_channel),"Invalid channel %d",channel);
279   if(PID) {
280     *PID = -1;
281   }
282
283   if(max_duration==0.0) {
284     return MSG_task_probe_from(channel);
285   } else {
286     CHECK_HOST();
287     h = MSG_host_self();
288     h_simdata = h->simdata;
289     
290     DEBUG2("Probing on channel %d (%s)", channel,h->name);
291     while(!(item = xbt_fifo_get_first_item(h->simdata->mbox[channel]))) {
292       if(max_duration>0) {
293         if(!first_time) {
294           MSG_RETURN(MSG_OK);
295         }
296       }
297       xbt_assert2(!(h_simdata->sleeping[channel]),
298                   "A process (%s(%d)) is already blocked on this channel",
299                   h_simdata->sleeping[channel]->name,
300                   h_simdata->sleeping[channel]->simdata->PID);
301       h_simdata->sleeping[channel] = process; /* I'm waiting. Wake me up when you're ready */
302       if(max_duration>0) {
303         __MSG_process_block(max_duration);
304       } else {
305         __MSG_process_block(-1);
306       }
307       if(surf_workstation_resource->extension_public->get_state(h_simdata->host) 
308          == SURF_CPU_OFF) {
309         MSG_RETURN(MSG_HOST_FAILURE);
310       }
311       h_simdata->sleeping[channel] = NULL;
312       first_time = 0;
313     }
314     if (!item || !(t = xbt_fifo_get_item_content(item))) {
315       MSG_RETURN(MSG_OK);
316     }
317     if(PID) {
318       *PID = MSG_process_get_PID(t->simdata->sender);
319     }
320     MSG_RETURN(MSG_OK);
321   }
322 }
323
324
325 /** \ingroup msg_gos_functions
326
327  * \brief Return the number of tasks waiting to be received on a \a
328    channel and sent by \a host.
329  *
330  * It takes two parameters.
331  * \param channel the channel on which the agent should be
332    listening. This value has to be >=0 and < than the maximal
333    number of channels fixed with MSG_set_channel_number().
334  * \param host the host that is to be watched.
335  * \return the number of tasks waiting to be received on \a channel
336    and sent by \a host.
337  */
338 int MSG_task_probe_from_host(int channel, m_host_t host)
339 {
340   simdata_host_t h_simdata = NULL;
341   xbt_fifo_item_t item;
342   m_task_t t;
343   int count = 0;
344   m_host_t h = NULL;
345   
346   xbt_assert1((channel>=0) && (channel < msg_global->max_channel),"Invalid channel %d",channel);
347   CHECK_HOST();
348   h = MSG_host_self();
349   h_simdata = h->simdata;
350
351   DEBUG2("Probing on channel %d (%s)", channel,h->name);
352    
353   xbt_fifo_foreach(h->simdata->mbox[channel],item,t,m_task_t) {
354     if(t->simdata->source==host) count++;
355   }
356    
357   return count;
358 }
359
360 /** \ingroup msg_gos_functions
361  * \brief Put a task on a channel of an host and waits for the end of the
362  * transmission.
363  *
364  * This function is used for describing the behavior of an agent. It
365  * takes three parameter.
366  * \param task a #m_task_t to send on another location. This task
367    will not be usable anymore when the function will return. There is
368    no automatic task duplication and you have to save your parameters
369    before calling this function. Tasks are unique and once it has been
370    sent to another location, you should not access it anymore. You do
371    not need to call MSG_task_destroy() but to avoid using, as an
372    effect of inattention, this task anymore, you definitely should
373    renitialize it with #MSG_TASK_UNINITIALIZED. Note that this task
374    can be transfered iff it has been correctly created with
375    MSG_task_create().
376  * \param dest the destination of the message
377  * \param channel the channel on which the agent should put this
378    task. This value has to be >=0 and < than the maximal number of
379    channels fixed with MSG_set_channel_number().
380  * \return #MSG_FATAL if \a task is not properly initialized and
381  * #MSG_OK otherwise.
382  */
383 MSG_error_t MSG_task_put(m_task_t task,
384                          m_host_t dest, m_channel_t channel)
385 {
386   m_process_t process = MSG_process_self();
387   simdata_task_t task_simdata = NULL;
388   e_surf_action_state_t state = SURF_ACTION_NOT_IN_THE_SYSTEM;
389   m_host_t local_host = NULL;
390   m_host_t remote_host = NULL;
391
392   CHECK_HOST();
393
394   xbt_assert1((channel>=0) && (channel < msg_global->max_channel),"Invalid channel %d",channel);
395
396   task_simdata = task->simdata;
397   task_simdata->sender = process;
398   task_simdata->source = MSG_process_get_host(process);
399   xbt_assert0(task_simdata->using==1,"Gargl!");
400   task_simdata->comm = NULL;
401   
402   local_host = ((simdata_process_t) process->simdata)->host;
403   remote_host = dest;
404
405   DEBUG4("Trying to send a task (%g Mb) from %s to %s on channel %d", 
406          task->simdata->message_size,local_host->name, remote_host->name, channel);
407
408   xbt_fifo_push(((simdata_host_t) remote_host->simdata)->
409                 mbox[channel], task);
410
411   PAJE_COMM_START(process,task,channel);
412     
413   if(remote_host->simdata->sleeping[channel]) {
414     DEBUG0("Somebody is listening. Let's wake him up!");
415     __MSG_process_unblock(remote_host->simdata->sleeping[channel]);
416   }
417
418   process->simdata->put_host = dest;
419   process->simdata->put_channel = channel;
420   while(!(task_simdata->comm)) {
421     DEBUG0("Communication not initiated yet. Let's block!");
422     __MSG_process_block(-1);
423   }
424   DEBUG0("Registering to this communication");
425   surf_workstation_resource->common_public->action_use(task_simdata->comm);
426   process->simdata->put_host = NULL;
427   process->simdata->put_channel = -1;
428
429
430   PAJE_PROCESS_PUSH_STATE(process,"C");  
431
432   state=surf_workstation_resource->common_public->action_get_state(task_simdata->comm);
433   while (state==SURF_ACTION_RUNNING) {
434     DEBUG0("Waiting for action termination");
435     __MSG_task_wait_event(process, task);
436     state=surf_workstation_resource->common_public->action_get_state(task_simdata->comm);
437   }
438   DEBUG0("Action terminated");
439
440   PAJE_PROCESS_POP_STATE(process);  
441
442   if(state == SURF_ACTION_DONE) {
443     if(surf_workstation_resource->common_public->action_free(task_simdata->comm)) 
444       task_simdata->comm = NULL;
445     MSG_task_destroy(task);
446     MSG_RETURN(MSG_OK);
447   } else if(surf_workstation_resource->extension_public->get_state(local_host->simdata->host) 
448             == SURF_CPU_OFF) {
449     if(surf_workstation_resource->common_public->action_free(task_simdata->comm)) 
450       task_simdata->comm = NULL;
451     MSG_task_destroy(task);
452     MSG_RETURN(MSG_HOST_FAILURE);
453   } else { 
454     if(surf_workstation_resource->common_public->action_free(task_simdata->comm)) 
455       task_simdata->comm = NULL;
456     MSG_task_destroy(task);
457     MSG_RETURN(MSG_TRANSFER_FAILURE);
458   }
459 }
460
461 /** \ingroup msg_gos_functions
462  * \brief Does exactly the same as MSG_task_put but with a bounded transmition 
463  * rate.
464  *
465  * \sa MSG_task_put
466  */
467 MSG_error_t MSG_task_put_bounded(m_task_t task,
468                                  m_host_t dest, m_channel_t channel,
469                                  double max_rate)
470 {
471   MSG_error_t res = MSG_OK;
472   task->simdata->rate=max_rate;
473   res = MSG_task_put(task, dest, channel);
474   task->simdata->rate=-1.0;
475   return(res);
476 }
477
478 /** \ingroup msg_gos_functions
479  * \brief Executes a task and waits for its termination.
480  *
481  * This function is used for describing the behavior of an agent. It
482  * takes only one parameter.
483  * \param task a #m_task_t to execute on the location on which the
484    agent is running.
485  * \return #MSG_FATAL if \a task is not properly initialized and
486  * #MSG_OK otherwise.
487  */
488 MSG_error_t MSG_task_execute(m_task_t task)
489 {
490   m_process_t process = MSG_process_self();
491   MSG_error_t res;
492
493   DEBUG1("Computing on %s", process->simdata->host->name);
494
495   __MSG_task_execute(process, task);
496
497   PAJE_PROCESS_PUSH_STATE(process,"E");  
498   res = __MSG_wait_for_computation(process,task);
499   PAJE_PROCESS_POP_STATE(process);
500   return res;
501 }
502
503 void __MSG_task_execute(m_process_t process, m_task_t task)
504 {
505   simdata_task_t simdata = NULL;
506
507   CHECK_HOST();
508
509   simdata = task->simdata;
510
511   simdata->compute = surf_workstation_resource->extension_public->
512     execute(MSG_process_get_host(process)->simdata->host,
513             simdata->computation_amount);
514   surf_workstation_resource->common_public->
515     set_priority(simdata->compute, simdata->priority);
516
517   surf_workstation_resource->common_public->action_set_data(simdata->compute,task);
518 }
519
520 MSG_error_t __MSG_wait_for_computation(m_process_t process, m_task_t task)
521 {
522   e_surf_action_state_t state = SURF_ACTION_NOT_IN_THE_SYSTEM;
523   simdata_task_t simdata = task->simdata;
524
525   simdata->using++;
526   do {
527     __MSG_task_wait_event(process, task);
528     state=surf_workstation_resource->common_public->action_get_state(simdata->compute);
529   } while (state==SURF_ACTION_RUNNING);
530   simdata->using--;
531     
532
533   if(state == SURF_ACTION_DONE) {
534     if(surf_workstation_resource->common_public->action_free(simdata->compute)) 
535       simdata->compute = NULL;
536     simdata->computation_amount = 0.0;
537     MSG_RETURN(MSG_OK);
538   } else if(surf_workstation_resource->extension_public->
539             get_state(MSG_process_get_host(process)->simdata->host) 
540             == SURF_CPU_OFF) {
541     if(surf_workstation_resource->common_public->action_free(simdata->compute)) 
542       simdata->compute = NULL;
543     MSG_RETURN(MSG_HOST_FAILURE);
544   } else {
545     if(surf_workstation_resource->common_public->action_free(simdata->compute)) 
546       simdata->compute = NULL;
547     MSG_RETURN(MSG_TASK_CANCELLED);
548   }
549 }
550 /** \ingroup m_task_management
551  * \brief Creates a new #m_task_t (a parallel one....).
552  *
553  * A constructor for #m_task_t taking six arguments and returning the 
554    corresponding object.
555  * \param name a name for the object. It is for user-level information
556    and can be NULL.
557  * \param host_nb the number of hosts implied in the parallel task.
558  * \param host_list an array of #host_nb m_host_t.
559  * \param computation_amount an array of #host_nb
560    doubles. computation_amount[i] is the total number of operations
561    that have to be performed on host_list[i].
562  * \param communication_amount an array of #host_nb*#host_nb doubles.
563  * \param data a pointer to any data may want to attach to the new
564    object.  It is for user-level information and can be NULL. It can
565    be retrieved with the function \ref MSG_task_get_data.
566  * \see m_task_t
567  * \return The new corresponding object.
568  */
569 m_task_t MSG_parallel_task_create(const char *name, 
570                                   int host_nb,
571                                   const m_host_t *host_list,
572                                   double *computation_amount,
573                                   double *communication_amount,
574                                   void *data)
575 {
576   simdata_task_t simdata = xbt_new0(s_simdata_task_t,1);
577   m_task_t task = xbt_new0(s_m_task_t,1);
578   int i;
579
580   /* Task structure */
581   task->name = xbt_strdup(name);
582   task->simdata = simdata;
583   task->data = data;
584
585   /* Simulator Data */
586   simdata->sleeping = xbt_dynar_new(sizeof(m_process_t),NULL);
587   simdata->rate = -1.0;
588   simdata->using = 1;
589   simdata->sender = NULL;
590   simdata->source = NULL;
591   simdata->host_nb = host_nb;
592   
593   simdata->host_list = xbt_new0(void *, host_nb);
594   simdata->comp_amount = computation_amount;
595   simdata->comm_amount = communication_amount;
596
597   for(i=0;i<host_nb;i++)
598     simdata->host_list[i] = host_list[i]->simdata->host;
599
600   return task;
601 }
602
603
604 static void __MSG_parallel_task_execute(m_process_t process, m_task_t task)
605 {
606   simdata_task_t simdata = NULL;
607
608   CHECK_HOST();
609
610   simdata = task->simdata;
611
612   xbt_assert0(simdata->host_nb,"This is not a parallel task. Go to hell.");
613
614   simdata->compute = surf_workstation_resource->extension_public->
615   execute_parallel_task(task->simdata->host_nb,
616                         task->simdata->host_list,
617                         task->simdata->comp_amount,
618                         task->simdata->comm_amount,
619                         1.0,
620                         -1.0);
621   if(simdata->compute)
622     surf_workstation_resource->common_public->action_set_data(simdata->compute,task);
623 }
624
625 MSG_error_t MSG_parallel_task_execute(m_task_t task)
626 {
627   m_process_t process = MSG_process_self();
628   MSG_error_t res;
629
630   DEBUG0("Computing on a tons of guys");
631   
632   __MSG_parallel_task_execute(process, task);
633
634   if(task->simdata->compute)
635     res = __MSG_wait_for_computation(process,task);
636   else 
637     res = MSG_OK;
638
639   return res;  
640 }
641
642
643 /** \ingroup msg_gos_functions
644  * \brief Sleep for the specified number of seconds
645  *
646  * Makes the current process sleep until \a time seconds have elapsed.
647  *
648  * \param nb_sec a number of second
649  */
650 MSG_error_t MSG_process_sleep(double nb_sec)
651 {
652   e_surf_action_state_t state = SURF_ACTION_NOT_IN_THE_SYSTEM;
653   m_process_t process = MSG_process_self();
654   m_task_t dummy = NULL;
655   simdata_task_t simdata = NULL;
656
657   CHECK_HOST();
658   dummy = MSG_task_create("MSG_sleep", nb_sec, 0.0, NULL);
659   simdata = dummy->simdata;
660
661   simdata->compute = surf_workstation_resource->extension_public->
662     sleep(MSG_process_get_host(process)->simdata->host,
663             simdata->computation_amount);
664   surf_workstation_resource->common_public->action_set_data(simdata->compute,dummy);
665
666   
667   simdata->using++;
668   do {
669     __MSG_task_wait_event(process, dummy);
670     state=surf_workstation_resource->common_public->action_get_state(simdata->compute);
671   } while (state==SURF_ACTION_RUNNING);
672   simdata->using--;
673     
674   if(state == SURF_ACTION_DONE) {
675     if(surf_workstation_resource->extension_public->
676        get_state(MSG_process_get_host(process)->simdata->host) 
677        == SURF_CPU_OFF) {
678       if(surf_workstation_resource->common_public->action_free(simdata->compute)) 
679         simdata->compute = NULL;
680       MSG_RETURN(MSG_HOST_FAILURE);
681     }
682     if(__MSG_process_isBlocked(process)) {
683       __MSG_process_unblock(MSG_process_self());
684     }
685     if(surf_workstation_resource->extension_public->
686        get_state(MSG_process_get_host(process)->simdata->host) 
687        == SURF_CPU_OFF) {
688       if(surf_workstation_resource->common_public->action_free(simdata->compute)) 
689         simdata->compute = NULL;
690       MSG_RETURN(MSG_HOST_FAILURE);
691     }
692     if(surf_workstation_resource->common_public->action_free(simdata->compute)) 
693       simdata->compute = NULL;
694     MSG_task_destroy(dummy);
695     MSG_RETURN(MSG_OK);
696   } else MSG_RETURN(MSG_HOST_FAILURE);
697 }
698
699 /** \ingroup msg_gos_functions
700  * \brief Return the number of MSG tasks currently running on a
701  * the host of the current running process.
702  */
703 static int MSG_get_msgload(void) 
704 {
705   m_process_t process;
706    
707   CHECK_HOST();
708   
709   xbt_assert0(0, "This function is still to be specified correctly (what do you mean by 'load', exactly?). In the meantime, please don't use it");
710   process = MSG_process_self();
711   return xbt_fifo_size(process->simdata->host->simdata->process_list);
712 }
713
714 /** \ingroup msg_gos_functions
715  *
716  * \brief Return the the last value returned by a MSG function (except
717  * MSG_get_errno...).
718  */
719 MSG_error_t MSG_get_errno(void)
720 {
721   return PROCESS_GET_ERRNO();
722 }