Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
cosmetics : Some english mistakes in the comments.
[simgrid.git] / src / msg / gos.c
1 /*      $Id$     */
2
3 /* Copyright (c) 2002,2003,2004 Arnaud Legrand. All rights reserved.        */
4
5 /* This program is free software; you can redistribute it and/or modify it
6  * under the terms of the license (GNU LGPL) which comes with this package. */
7
8 #include "private.h"
9 #include "xbt/sysdep.h"
10 #include "xbt/log.h"
11 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(gos, msg,
12                                 "Logging specific to MSG (gos)");
13
14 /** \defgroup msg_gos_functions MSG Operating System Functions
15  *  \brief This section describes the functions that can be used
16  *  by an agent for handling some task.
17  */
18
19 static MSG_error_t __MSG_task_get_with_time_out_from_host(m_task_t * task,
20                                                         m_channel_t channel,
21                                                         double max_duration,
22                                                         m_host_t host)
23 {
24   m_process_t process = MSG_process_self();
25   m_task_t t = NULL;
26   m_host_t h = NULL;
27   simdata_task_t t_simdata = NULL;
28   simdata_host_t h_simdata = NULL;
29   int first_time = 1;
30   e_surf_action_state_t state = SURF_ACTION_NOT_IN_THE_SYSTEM;
31   xbt_fifo_item_t item = NULL;
32
33   CHECK_HOST();
34   xbt_assert1((channel>=0) && (channel < msg_global->max_channel),"Invalid channel %d",channel);
35   /* Sanity check */
36   xbt_assert0(task,"Null pointer for the task\n");
37
38   if (*task) 
39     CRITICAL0("MSG_task_get() was asked to write in a non empty task struct.");
40
41   /* Get the task */
42   h = MSG_host_self();
43   h_simdata = h->simdata;
44
45   DEBUG2("Waiting for a task on channel %d (%s)", channel,h->name);
46
47   while (1) {
48     if(xbt_fifo_size(h_simdata->mbox[channel])>0) {
49       if(!host) {
50         t = xbt_fifo_shift(h_simdata->mbox[channel]);
51         break;
52       } else {
53         xbt_fifo_foreach(h->simdata->mbox[channel],item,t,m_task_t) {
54           if(t->simdata->source==host) break;
55         }
56         if(item) {
57           xbt_fifo_remove_item(h->simdata->mbox[channel],item);
58           break;
59         } 
60       }
61     }
62                                                        
63     if(max_duration>0) {
64       if(!first_time) {
65         MSG_RETURN(MSG_OK);
66       }
67     }
68     xbt_assert2(!(h_simdata->sleeping[channel]),
69                 "A process (%s(%d)) is already blocked on this channel",
70                 h_simdata->sleeping[channel]->name,
71                 h_simdata->sleeping[channel]->simdata->PID);
72     h_simdata->sleeping[channel] = process; /* I'm waiting. Wake me up when you're ready */
73     if(max_duration>0) {
74       __MSG_process_block(max_duration);
75     } else {
76       __MSG_process_block(-1);
77     }
78     if(surf_workstation_resource->extension_public->get_state(h_simdata->host) 
79        == SURF_CPU_OFF)
80       MSG_RETURN(MSG_HOST_FAILURE);
81     h_simdata->sleeping[channel] = NULL;
82     first_time = 0;
83     /* OK, we should both be ready now. Are you there ? */
84   }
85
86   DEBUG1("OK, got a task (%s)", t->name);
87
88   t_simdata = t->simdata;
89   /*   *task = __MSG_task_copy(t); */
90   *task=t;
91
92   /* Transfer */
93   t_simdata->using++;
94
95   while(MSG_process_is_suspended(t_simdata->sender)) {
96     DEBUG1("Oooups, the sender (%s) has been suspended in the meantime. Let's wait for him", 
97            t_simdata->sender->name);
98     m_task_t task_to_wait_for = t_simdata->sender->simdata->waiting_task;
99     if(__MSG_process_isBlocked(t_simdata->sender)) {
100       DEBUG0("He's blocked. Let's wait for him to go in the suspended state");
101       __MSG_process_unblock(t_simdata->sender);
102       task_to_wait_for->simdata->using++;
103       __MSG_task_wait_event(process, task_to_wait_for);
104       MSG_task_destroy(task_to_wait_for);
105     } else {
106       DEBUG0("He's suspended. Let's wait for him to go in the resumed state");
107       task_to_wait_for->simdata->using++;
108       __MSG_task_wait_event(process, task_to_wait_for);
109       MSG_task_destroy(task_to_wait_for);
110       DEBUG0("He's resumed. He should block again. So let's free him.");
111       __MSG_process_unblock(t_simdata->sender);
112       break;
113     }
114   }
115   DEBUG0("Calling SURF for communication creation");
116   t_simdata->comm = surf_workstation_resource->extension_public->
117     communicate(MSG_process_get_host(t_simdata->sender)->simdata->host,
118                 h->simdata->host, t_simdata->message_size,t_simdata->rate);
119   
120   surf_workstation_resource->common_public->action_set_data(t_simdata->comm,t);
121
122   if(__MSG_process_isBlocked(t_simdata->sender)) {
123     DEBUG1("Unblocking %s",t_simdata->sender->name);
124     __MSG_process_unblock(t_simdata->sender);
125   }
126
127   PAJE_PROCESS_PUSH_STATE(process,"C");  
128
129   do {
130     DEBUG0("Waiting for action termination");
131     __MSG_task_wait_event(process, t);
132     state=surf_workstation_resource->common_public->action_get_state(t_simdata->comm);
133   } while (state==SURF_ACTION_RUNNING);
134   DEBUG0("Action terminated");
135
136   if(t->simdata->using>1) {
137     xbt_fifo_unshift(msg_global->process_to_run,process);
138     xbt_context_yield();
139   }
140
141   PAJE_PROCESS_POP_STATE(process);
142   PAJE_COMM_STOP(process,t,channel);
143
144   if(state == SURF_ACTION_DONE) {
145     if(surf_workstation_resource->common_public->action_free(t_simdata->comm)) 
146       t_simdata->comm = NULL;
147     MSG_RETURN(MSG_OK);
148   } else if(surf_workstation_resource->extension_public->get_state(h_simdata->host) 
149           == SURF_CPU_OFF) {
150     if(surf_workstation_resource->common_public->action_free(t_simdata->comm)) 
151       t_simdata->comm = NULL;
152     MSG_RETURN(MSG_HOST_FAILURE);
153   } else {
154     if(surf_workstation_resource->common_public->action_free(t_simdata->comm)) 
155       t_simdata->comm = NULL;
156     MSG_RETURN(MSG_TRANSFER_FAILURE);
157   }
158 }
159
160 /** \ingroup msg_gos_functions
161  * \brief Listen on a channel and wait for receiving a task.
162  *
163  * It takes two parameters.
164  * \param task a memory location for storing a #m_task_t. It will
165    hold a task when this function will return. Thus \a task should not
166    be equal to \c NULL and \a *task should be equal to \c NULL. If one of
167    those two condition does not hold, there will be a warning message.
168  * \param channel the channel on which the agent should be
169    listening. This value has to be >=0 and < than the maximal
170    number of channels fixed with MSG_set_channel_number().
171  * \return #MSG_FATAL if \a task is equal to \c NULL, #MSG_WARNING
172  * if \a *task is not equal to \c NULL, and #MSG_OK otherwise.
173  */
174 MSG_error_t MSG_task_get(m_task_t * task,
175                          m_channel_t channel)
176 {
177   return MSG_task_get_with_time_out(task, channel, -1);
178 }
179
180 /** \ingroup msg_gos_functions
181  * \brief Listen on a channel and wait for receiving a task with a timeout.
182  *
183  * It takes three parameters.
184  * \param task a memory location for storing a #m_task_t. It will
185    hold a task when this function will return. Thus \a task should not
186    be equal to \c NULL and \a *task should be equal to \c NULL. If one of
187    those two condition does not hold, there will be a warning message.
188  * \param channel the channel on which the agent should be
189    listening. This value has to be >=0 and < than the maximal
190    number of channels fixed with MSG_set_channel_number().
191  * \param max_duration the maximum time to wait for a task before giving
192     up. In such a case, \a task will not be modified and will still be
193     equal to \c NULL when returning.
194  * \return #MSG_FATAL if \a task is equal to \c NULL, #MSG_WARNING
195    if \a *task is not equal to \c NULL, and #MSG_OK otherwise.
196  */
197 MSG_error_t MSG_task_get_with_time_out(m_task_t * task,
198                                        m_channel_t channel,
199                                        double max_duration)
200 {
201   return __MSG_task_get_with_time_out_from_host(task, channel, max_duration, NULL);
202 }
203
204 /** \ingroup msg_gos_functions
205  * \brief Listen on \a channel and waits for receiving a task from \a host.
206  *
207  * It takes three parameters.
208  * \param task a memory location for storing a #m_task_t. It will
209    hold a task when this function will return. Thus \a task should not
210    be equal to \c NULL and \a *task should be equal to \c NULL. If one of
211    those two condition does not hold, there will be a warning message.
212  * \param channel the channel on which the agent should be
213    listening. This value has to be >=0 and < than the maximal
214    number of channels fixed with MSG_set_channel_number().
215  * \param host the host that is to be watched.
216  * \return #MSG_FATAL if \a task is equal to \c NULL, #MSG_WARNING
217    if \a *task is not equal to \c NULL, and #MSG_OK otherwise.
218  */
219 MSG_error_t MSG_task_get_from_host(m_task_t * task, int channel, 
220                                    m_host_t host)
221 {
222   return __MSG_task_get_with_time_out_from_host(task, channel, -1, host);
223 }
224
225 /** \ingroup msg_gos_functions
226  * \brief Test whether there is a pending communication on a channel.
227  *
228  * It takes one parameter.
229  * \param channel the channel on which the agent should be
230    listening. This value has to be >=0 and < than the maximal
231    number of channels fixed with MSG_set_channel_number().
232  * \return 1 if there is a pending communication and 0 otherwise
233  */
234 int MSG_task_Iprobe(m_channel_t channel)
235 {
236   m_host_t h = NULL;
237   simdata_host_t h_simdata = NULL;
238
239   xbt_assert1((channel>=0) && (channel < msg_global->max_channel),"Invalid channel %d",channel);
240   DEBUG2("Probing on channel %d (%s)", channel,h->name);
241   CHECK_HOST();
242   h = MSG_host_self();
243   h_simdata = h->simdata;
244   return(xbt_fifo_get_first_item(h_simdata->mbox[channel])!=NULL);
245 }
246
247 /** \ingroup msg_gos_functions
248  * \brief Test whether there is a pending communication on a channel, and who sent it.
249  *
250  * It takes one parameter.
251  * \param channel the channel on which the agent should be
252    listening. This value has to be >=0 and < than the maximal
253    number of channels fixed with MSG_set_channel_number().
254  * \return -1 if there is no pending communication and the PID of the process who sent it otherwise
255  */
256 int MSG_task_probe_from(m_channel_t channel)
257 {
258   m_host_t h = NULL;
259   simdata_host_t h_simdata = NULL;
260   xbt_fifo_item_t item;
261   m_task_t t;
262
263   xbt_assert1((channel>=0) && (channel < msg_global->max_channel),"Invalid channel %d",channel);
264   CHECK_HOST();
265   h = MSG_host_self();
266   h_simdata = h->simdata;
267
268   DEBUG2("Probing on channel %d (%s)", channel,h->name);
269    
270   item = xbt_fifo_get_first_item(h->simdata->mbox[channel]);
271   if (!item || !(t = xbt_fifo_get_item_content(item)))
272     return -1;
273    
274   return MSG_process_get_PID(t->simdata->sender);
275 }
276
277 /** \ingroup msg_gos_functions
278  * \brief Wait for at most \a max_duration second for a task reception
279    on \a channel. *\a PID is updated with the PID of the first process
280    that triggered this event if any.
281  *
282  * It takes three parameters:
283  * \param channel the channel on which the agent should be
284    listening. This value has to be >=0 and < than the maximal.
285    number of channels fixed with MSG_set_channel_number().
286  * \param PID a memory location for storing an int.
287  * \param max_duration the maximum time to wait for a task before
288     giving up. In the case of a reception, *\a PID will be updated
289     with the PID of the first process to send a task.
290  * \return #MSG_HOST_FAILURE if the host is shut down in the meantime
291    and #MSG_OK otherwise.
292  */
293 MSG_error_t MSG_channel_select_from(m_channel_t channel, double max_duration,
294                                     int *PID)
295 {
296   m_host_t h = NULL;
297   simdata_host_t h_simdata = NULL;
298   xbt_fifo_item_t item;
299   m_task_t t;
300   int first_time = 1;
301   m_process_t process = MSG_process_self();
302
303   xbt_assert1((channel>=0) && (channel < msg_global->max_channel),"Invalid channel %d",channel);
304   if(PID) {
305     *PID = -1;
306   }
307
308   if(max_duration==0.0) {
309     return MSG_task_probe_from(channel);
310   } else {
311     CHECK_HOST();
312     h = MSG_host_self();
313     h_simdata = h->simdata;
314     
315     DEBUG2("Probing on channel %d (%s)", channel,h->name);
316     while(!(item = xbt_fifo_get_first_item(h->simdata->mbox[channel]))) {
317       if(max_duration>0) {
318         if(!first_time) {
319           MSG_RETURN(MSG_OK);
320         }
321       }
322       xbt_assert2(!(h_simdata->sleeping[channel]),
323                   "A process (%s(%d)) is already blocked on this channel",
324                   h_simdata->sleeping[channel]->name,
325                   h_simdata->sleeping[channel]->simdata->PID);
326       h_simdata->sleeping[channel] = process; /* I'm waiting. Wake me up when you're ready */
327       if(max_duration>0) {
328         __MSG_process_block(max_duration);
329       } else {
330         __MSG_process_block(-1);
331       }
332       if(surf_workstation_resource->extension_public->get_state(h_simdata->host) 
333          == SURF_CPU_OFF) {
334         MSG_RETURN(MSG_HOST_FAILURE);
335       }
336       h_simdata->sleeping[channel] = NULL;
337       first_time = 0;
338     }
339     if (!item || !(t = xbt_fifo_get_item_content(item))) {
340       MSG_RETURN(MSG_OK);
341     }
342     if(PID) {
343       *PID = MSG_process_get_PID(t->simdata->sender);
344     }
345     MSG_RETURN(MSG_OK);
346   }
347 }
348
349
350 /** \ingroup msg_gos_functions
351
352  * \brief Return the number of tasks waiting to be received on a \a
353    channel and sent by \a host.
354  *
355  * It takes two parameters.
356  * \param channel the channel on which the agent should be
357    listening. This value has to be >=0 and < than the maximal
358    number of channels fixed with MSG_set_channel_number().
359  * \param host the host that is to be watched.
360  * \return the number of tasks waiting to be received on \a channel
361    and sent by \a host.
362  */
363 int MSG_task_probe_from_host(int channel, m_host_t host)
364 {
365   simdata_host_t h_simdata = NULL;
366   xbt_fifo_item_t item;
367   m_task_t t;
368   int count = 0;
369   m_host_t h = NULL;
370   
371   xbt_assert1((channel>=0) && (channel < msg_global->max_channel),"Invalid channel %d",channel);
372   CHECK_HOST();
373   h = MSG_host_self();
374   h_simdata = h->simdata;
375
376   DEBUG2("Probing on channel %d (%s)", channel,h->name);
377    
378   xbt_fifo_foreach(h->simdata->mbox[channel],item,t,m_task_t) {
379     if(t->simdata->source==host) count++;
380   }
381    
382   return count;
383 }
384
385 /** \ingroup msg_gos_functions
386  * \brief Put a task on a channel of an host and waits for the end of the
387  * transmission.
388  *
389  * This function is used for describing the behavior of an agent. It
390  * takes three parameter.
391  * \param task a #m_task_t to send on another location. This task
392    will not be usable anymore when the function will return. There is
393    no automatic task duplication and you have to save your parameters
394    before calling this function. Tasks are unique and once it has been
395    sent to another location, you should not access it anymore. You do
396    not need to call MSG_task_destroy() but to avoid using, as an
397    effect of inattention, this task anymore, you definitely should
398    renitialize it with #MSG_TASK_UNINITIALIZED. Note that this task
399    can be transfered iff it has been correctly created with
400    MSG_task_create().
401  * \param dest the destination of the message
402  * \param channel the channel on which the agent should put this
403    task. This value has to be >=0 and < than the maximal number of
404    channels fixed with MSG_set_channel_number().
405  * \return #MSG_FATAL if \a task is not properly initialized and
406  * #MSG_OK otherwise.
407  */
408 MSG_error_t MSG_task_put(m_task_t task,
409                          m_host_t dest, m_channel_t channel)
410 {
411   m_process_t process = MSG_process_self();
412   simdata_task_t task_simdata = NULL;
413   e_surf_action_state_t state = SURF_ACTION_NOT_IN_THE_SYSTEM;
414   m_host_t local_host = NULL;
415   m_host_t remote_host = NULL;
416
417   CHECK_HOST();
418
419   xbt_assert1((channel>=0) && (channel < msg_global->max_channel),"Invalid channel %d",channel);
420
421   task_simdata = task->simdata;
422   task_simdata->sender = process;
423   task_simdata->source = MSG_process_get_host(process);
424   xbt_assert0(task_simdata->using==1,"Gargl!");
425   task_simdata->comm = NULL;
426   
427   local_host = ((simdata_process_t) process->simdata)->host;
428   remote_host = dest;
429
430   DEBUG4("Trying to send a task (%g Mb) from %s to %s on channel %d", 
431          task->simdata->message_size,local_host->name, remote_host->name, channel);
432
433   xbt_fifo_push(((simdata_host_t) remote_host->simdata)->
434                 mbox[channel], task);
435
436   PAJE_COMM_START(process,task,channel);
437     
438   if(remote_host->simdata->sleeping[channel]) {
439     DEBUG0("Somebody is listening. Let's wake him up!");
440     __MSG_process_unblock(remote_host->simdata->sleeping[channel]);
441   }
442
443   process->simdata->put_host = dest;
444   process->simdata->put_channel = channel;
445   while(!(task_simdata->comm)) {
446     DEBUG0("Communication not initiated yet. Let's block!");
447     __MSG_process_block(-1);
448   }
449   DEBUG0("Registering to this communication");
450   surf_workstation_resource->common_public->action_use(task_simdata->comm);
451   process->simdata->put_host = NULL;
452   process->simdata->put_channel = -1;
453
454
455   PAJE_PROCESS_PUSH_STATE(process,"C");  
456
457   state=surf_workstation_resource->common_public->action_get_state(task_simdata->comm);
458   while (state==SURF_ACTION_RUNNING) {
459     DEBUG0("Waiting for action termination");
460     __MSG_task_wait_event(process, task);
461     state=surf_workstation_resource->common_public->action_get_state(task_simdata->comm);
462   }
463   DEBUG0("Action terminated");
464
465   PAJE_PROCESS_POP_STATE(process);  
466
467   if(state == SURF_ACTION_DONE) {
468     if(surf_workstation_resource->common_public->action_free(task_simdata->comm)) 
469       task_simdata->comm = NULL;
470     MSG_task_destroy(task);
471     MSG_RETURN(MSG_OK);
472   } else if(surf_workstation_resource->extension_public->get_state(local_host->simdata->host) 
473             == SURF_CPU_OFF) {
474     if(surf_workstation_resource->common_public->action_free(task_simdata->comm)) 
475       task_simdata->comm = NULL;
476     MSG_task_destroy(task);
477     MSG_RETURN(MSG_HOST_FAILURE);
478   } else { 
479     if(surf_workstation_resource->common_public->action_free(task_simdata->comm)) 
480       task_simdata->comm = NULL;
481     MSG_task_destroy(task);
482     MSG_RETURN(MSG_TRANSFER_FAILURE);
483   }
484 }
485
486 /** \ingroup msg_gos_functions
487  * \brief Does exactly the same as MSG_task_put but with a bounded transmition 
488  * rate.
489  *
490  * \sa MSG_task_put
491  */
492 MSG_error_t MSG_task_put_bounded(m_task_t task,
493                                  m_host_t dest, m_channel_t channel,
494                                  double max_rate)
495 {
496   MSG_error_t res = MSG_OK;
497   task->simdata->rate=max_rate;
498   res = MSG_task_put(task, dest, channel);
499   task->simdata->rate=-1.0;
500   return(res);
501 }
502
503 /** \ingroup msg_gos_functions
504  * \brief Executes a task and waits for its termination.
505  *
506  * This function is used for describing the behavior of an agent. It
507  * takes only one parameter.
508  * \param task a #m_task_t to execute on the location on which the
509    agent is running.
510  * \return #MSG_FATAL if \a task is not properly initialized and
511  * #MSG_OK otherwise.
512  */
513 MSG_error_t MSG_task_execute(m_task_t task)
514 {
515   m_process_t process = MSG_process_self();
516   MSG_error_t res;
517
518   DEBUG1("Computing on %s", process->simdata->host->name);
519
520   __MSG_task_execute(process, task);
521
522   PAJE_PROCESS_PUSH_STATE(process,"E");  
523   res = __MSG_wait_for_computation(process,task);
524   PAJE_PROCESS_POP_STATE(process);
525   return res;
526 }
527
528 void __MSG_task_execute(m_process_t process, m_task_t task)
529 {
530   simdata_task_t simdata = NULL;
531
532   CHECK_HOST();
533
534   simdata = task->simdata;
535
536   simdata->compute = surf_workstation_resource->extension_public->
537     execute(MSG_process_get_host(process)->simdata->host,
538             simdata->computation_amount);
539   surf_workstation_resource->common_public->
540     set_priority(simdata->compute, simdata->priority);
541
542   surf_workstation_resource->common_public->action_set_data(simdata->compute,task);
543 }
544
545 MSG_error_t __MSG_wait_for_computation(m_process_t process, m_task_t task)
546 {
547   e_surf_action_state_t state = SURF_ACTION_NOT_IN_THE_SYSTEM;
548   simdata_task_t simdata = task->simdata;
549
550   XBT_IN4("(%p(%s) %p(%s))",process,process->name,task,task->name);
551   simdata->using++;
552   do {
553     __MSG_task_wait_event(process, task);
554     state=surf_workstation_resource->common_public->action_get_state(simdata->compute);
555   } while (state==SURF_ACTION_RUNNING);
556   simdata->using--;
557     
558
559   if(state == SURF_ACTION_DONE) {
560     if(surf_workstation_resource->common_public->action_free(simdata->compute)) 
561       simdata->compute = NULL;
562     simdata->computation_amount = 0.0;
563     XBT_OUT;
564     MSG_RETURN(MSG_OK);
565   } else if(surf_workstation_resource->extension_public->
566             get_state(MSG_process_get_host(process)->simdata->host) 
567             == SURF_CPU_OFF) {
568     if(surf_workstation_resource->common_public->action_free(simdata->compute)) 
569       simdata->compute = NULL;
570     XBT_OUT;
571     MSG_RETURN(MSG_HOST_FAILURE);
572   } else {
573     if(surf_workstation_resource->common_public->action_free(simdata->compute)) 
574       simdata->compute = NULL;
575     XBT_OUT;
576     MSG_RETURN(MSG_TASK_CANCELLED);
577   }
578 }
579 /** \ingroup m_task_management
580  * \brief Creates a new #m_task_t (a parallel one....).
581  *
582  * A constructor for #m_task_t taking six arguments and returning the 
583    corresponding object.
584  * \param name a name for the object. It is for user-level information
585    and can be NULL.
586  * \param host_nb the number of hosts implied in the parallel task.
587  * \param host_list an array of \p host_nb m_host_t.
588  * \param computation_amount an array of \p host_nb
589    doubles. computation_amount[i] is the total number of operations
590    that have to be performed on host_list[i].
591  * \param communication_amount an array of \p host_nb* \p host_nb doubles.
592  * \param data a pointer to any data may want to attach to the new
593    object.  It is for user-level information and can be NULL. It can
594    be retrieved with the function \ref MSG_task_get_data.
595  * \see m_task_t
596  * \return The new corresponding object.
597  */
598 m_task_t MSG_parallel_task_create(const char *name, 
599                                   int host_nb,
600                                   const m_host_t *host_list,
601                                   double *computation_amount,
602                                   double *communication_amount,
603                                   void *data)
604 {
605   simdata_task_t simdata = xbt_new0(s_simdata_task_t,1);
606   m_task_t task = xbt_new0(s_m_task_t,1);
607   int i;
608
609   /* Task structure */
610   task->name = xbt_strdup(name);
611   task->simdata = simdata;
612   task->data = data;
613
614   /* Simulator Data */
615   simdata->sleeping = xbt_dynar_new(sizeof(m_process_t),NULL);
616   simdata->rate = -1.0;
617   simdata->using = 1;
618   simdata->sender = NULL;
619   simdata->source = NULL;
620   simdata->host_nb = host_nb;
621   
622   simdata->host_list = xbt_new0(void *, host_nb);
623   simdata->comp_amount = computation_amount;
624   simdata->comm_amount = communication_amount;
625
626   for(i=0;i<host_nb;i++)
627     simdata->host_list[i] = host_list[i]->simdata->host;
628
629   return task;
630 }
631
632
633 static void __MSG_parallel_task_execute(m_process_t process, m_task_t task)
634 {
635   simdata_task_t simdata = NULL;
636
637   CHECK_HOST();
638
639   simdata = task->simdata;
640
641   xbt_assert0(simdata->host_nb,"This is not a parallel task. Go to hell.");
642
643   simdata->compute = surf_workstation_resource->extension_public->
644   execute_parallel_task(task->simdata->host_nb,
645                         task->simdata->host_list,
646                         task->simdata->comp_amount,
647                         task->simdata->comm_amount,
648                         1.0,
649                         -1.0);
650   if(simdata->compute)
651     surf_workstation_resource->common_public->action_set_data(simdata->compute,task);
652 }
653
654 MSG_error_t MSG_parallel_task_execute(m_task_t task)
655 {
656   m_process_t process = MSG_process_self();
657   MSG_error_t res;
658
659   DEBUG0("Computing on a tons of guys");
660   
661   __MSG_parallel_task_execute(process, task);
662
663   if(task->simdata->compute)
664     res = __MSG_wait_for_computation(process,task);
665   else 
666     res = MSG_OK;
667
668   return res;  
669 }
670
671
672 /** \ingroup msg_gos_functions
673  * \brief Sleep for the specified number of seconds
674  *
675  * Makes the current process sleep until \a time seconds have elapsed.
676  *
677  * \param nb_sec a number of second
678  */
679 MSG_error_t MSG_process_sleep(double nb_sec)
680 {
681   e_surf_action_state_t state = SURF_ACTION_NOT_IN_THE_SYSTEM;
682   m_process_t process = MSG_process_self();
683   m_task_t dummy = NULL;
684   simdata_task_t simdata = NULL;
685
686   CHECK_HOST();
687   dummy = MSG_task_create("MSG_sleep", nb_sec, 0.0, NULL);
688   simdata = dummy->simdata;
689
690   simdata->compute = surf_workstation_resource->extension_public->
691     sleep(MSG_process_get_host(process)->simdata->host,
692             simdata->computation_amount);
693   surf_workstation_resource->common_public->action_set_data(simdata->compute,dummy);
694
695   
696   simdata->using++;
697   do {
698     __MSG_task_wait_event(process, dummy);
699     state=surf_workstation_resource->common_public->action_get_state(simdata->compute);
700   } while (state==SURF_ACTION_RUNNING);
701   simdata->using--;
702     
703   if(state == SURF_ACTION_DONE) {
704     if(surf_workstation_resource->extension_public->
705        get_state(MSG_process_get_host(process)->simdata->host) 
706        == SURF_CPU_OFF) {
707       if(surf_workstation_resource->common_public->action_free(simdata->compute)) 
708         simdata->compute = NULL;
709       MSG_RETURN(MSG_HOST_FAILURE);
710     }
711     if(__MSG_process_isBlocked(process)) {
712       __MSG_process_unblock(MSG_process_self());
713     }
714     if(surf_workstation_resource->extension_public->
715        get_state(MSG_process_get_host(process)->simdata->host) 
716        == SURF_CPU_OFF) {
717       if(surf_workstation_resource->common_public->action_free(simdata->compute)) 
718         simdata->compute = NULL;
719       MSG_RETURN(MSG_HOST_FAILURE);
720     }
721     if(surf_workstation_resource->common_public->action_free(simdata->compute)) 
722       simdata->compute = NULL;
723     MSG_task_destroy(dummy);
724     MSG_RETURN(MSG_OK);
725   } else MSG_RETURN(MSG_HOST_FAILURE);
726 }
727
728 /** \ingroup msg_gos_functions
729  * \brief Return the number of MSG tasks currently running on
730  * the host of the current running process.
731  */
732 static int MSG_get_msgload(void) 
733 {
734   m_process_t process;
735    
736   CHECK_HOST();
737   
738   xbt_assert0(0, "This function is still to be specified correctly (what do you mean by 'load', exactly?). In the meantime, please don't use it");
739   process = MSG_process_self();
740   return xbt_fifo_size(process->simdata->host->simdata->process_list);
741 }
742
743 /** \ingroup msg_gos_functions
744  *
745  * \brief Return the last value returned by a MSG function (except
746  * MSG_get_errno...).
747  */
748 MSG_error_t MSG_get_errno(void)
749 {
750   return PROCESS_GET_ERRNO();
751 }