Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
51ea845fc585d61c916dd21c3756d518acf47dc3
[simgrid.git] / src / msg / gos.c
1 /*      $Id$     */
2
3 /* Copyright (c) 2002,2003,2004 Arnaud Legrand. All rights reserved.        */
4
5 /* This program is free software; you can redistribute it and/or modify it
6  * under the terms of the license (GNU LGPL) which comes with this package. */
7
8 #include "private.h"
9 #include "xbt/sysdep.h"
10 #include "xbt/log.h"
11 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(msg_gos, msg,
12                                 "Logging specific to MSG (gos)");
13
14 /** \defgroup msg_gos_functions MSG Operating System Functions
15  *  \brief This section describes the functions that can be used
16  *  by an agent for handling some task.
17  */
18
19 static MSG_error_t __MSG_task_get_with_time_out_from_host(m_task_t * task,
20                                                         m_channel_t channel,
21                                                         double max_duration,
22                                                         m_host_t host)
23 {
24   m_process_t process = MSG_process_self();
25   m_task_t t = NULL;
26   m_host_t h = NULL;
27   m_task_t task_to_wait_for;
28   simdata_task_t t_simdata = NULL;
29   simdata_host_t h_simdata = NULL;
30   int first_time = 1;
31   e_surf_action_state_t state = SURF_ACTION_NOT_IN_THE_SYSTEM;
32   xbt_fifo_item_t item = NULL;
33
34   CHECK_HOST();
35   xbt_assert1((channel>=0) && (channel < msg_global->max_channel),"Invalid channel %d",channel);
36   /* Sanity check */
37   xbt_assert0(task,"Null pointer for the task\n");
38
39   if (*task) 
40     CRITICAL0("MSG_task_get() was asked to write in a non empty task struct.");
41
42   /* Get the task */
43   h = MSG_host_self();
44   h_simdata = h->simdata;
45
46   DEBUG2("Waiting for a task on channel %d (%s)", channel,h->name);
47
48   while (1) {
49     if(xbt_fifo_size(h_simdata->mbox[channel])>0) {
50       if(!host) {
51         t = xbt_fifo_shift(h_simdata->mbox[channel]);
52         break;
53       } else {
54         xbt_fifo_foreach(h->simdata->mbox[channel],item,t,m_task_t) {
55           if(t->simdata->source==host) break;
56         }
57         if(item) {
58           xbt_fifo_remove_item(h->simdata->mbox[channel],item);
59           break;
60         } 
61       }
62     }
63                                                        
64     if(max_duration>0) {
65       if(!first_time) {
66         PAJE_PROCESS_POP_STATE(process);
67         MSG_RETURN(MSG_TRANSFER_FAILURE);
68       }
69     }
70     xbt_assert3(!(h_simdata->sleeping[channel]),
71                 "A process (%s(%d)) is already blocked on channel %d",
72                 h_simdata->sleeping[channel]->name,
73                 h_simdata->sleeping[channel]->simdata->PID,
74                 channel);
75     h_simdata->sleeping[channel] = process; /* I'm waiting. Wake me up when you're ready */
76     if(max_duration>0) {
77       __MSG_process_block(max_duration,"");
78     } else {
79       __MSG_process_block(-1,"");
80     }
81     h_simdata->sleeping[channel] = NULL;
82     first_time = 0;
83     if(surf_workstation_resource->extension_public->get_state(h_simdata->host) 
84        == SURF_CPU_OFF)
85       MSG_RETURN(MSG_HOST_FAILURE);
86     /* OK, we should both be ready now. Are you there ? */
87   }
88
89   DEBUG1("OK, got a task (%s)", t->name);
90
91   t_simdata = t->simdata;
92   /*   *task = __MSG_task_copy(t); */
93   *task=t;
94
95   /* Transfer */
96   t_simdata->using++;
97
98   while(MSG_process_is_suspended(t_simdata->sender)) {
99     DEBUG1("Oooups, the sender (%s) has been suspended in the meantime. Let's wait for him", 
100            t_simdata->sender->name);
101     /*m_task_t task_to_wait_for = t_simdata->sender->simdata->waiting_task;*/
102     task_to_wait_for = t_simdata->sender->simdata->waiting_task;
103     if(__MSG_process_isBlocked(t_simdata->sender)) {
104       DEBUG0("He's blocked. Let's wait for him to go in the suspended state");
105       __MSG_process_unblock(t_simdata->sender);
106       task_to_wait_for->simdata->using++;
107       __MSG_task_wait_event(process, task_to_wait_for);
108       MSG_task_destroy(task_to_wait_for);
109     } else {
110       DEBUG0("He's suspended. Let's wait for him to go in the resumed state");
111       task_to_wait_for->simdata->using++;
112       __MSG_task_wait_event(process, task_to_wait_for);
113       MSG_task_destroy(task_to_wait_for);
114       DEBUG0("He's resumed. He should block again. So let's free him.");
115       __MSG_process_unblock(t_simdata->sender);
116       break;
117     }
118   }
119   DEBUG0("Calling SURF for communication creation");
120   t_simdata->comm = surf_workstation_resource->extension_public->
121     communicate(MSG_process_get_host(t_simdata->sender)->simdata->host,
122                 h->simdata->host, t_simdata->message_size,t_simdata->rate);
123   
124   surf_workstation_resource->common_public->action_set_data(t_simdata->comm,t);
125
126   if(__MSG_process_isBlocked(t_simdata->sender)) {
127     DEBUG1("Unblocking %s",t_simdata->sender->name);
128     __MSG_process_unblock(t_simdata->sender);
129   }
130
131   PAJE_PROCESS_PUSH_STATE(process,"C",t);  
132
133   do {
134     DEBUG0("Waiting for action termination");
135     __MSG_task_wait_event(process, t);
136     state=surf_workstation_resource->common_public->action_get_state(t_simdata->comm);
137   } while (state==SURF_ACTION_RUNNING);
138   DEBUG0("Action terminated");
139
140   if(t->simdata->using>1) {
141     xbt_fifo_unshift(msg_global->process_to_run,process);
142     xbt_context_yield();
143   }
144
145   PAJE_PROCESS_POP_STATE(process);
146   PAJE_COMM_STOP(process,t,channel);
147
148   if(state == SURF_ACTION_DONE) {
149     if(surf_workstation_resource->common_public->action_free(t_simdata->comm)) 
150       t_simdata->comm = NULL;
151     MSG_RETURN(MSG_OK);
152   } else if(surf_workstation_resource->extension_public->get_state(h_simdata->host) 
153           == SURF_CPU_OFF) {
154     if(surf_workstation_resource->common_public->action_free(t_simdata->comm)) 
155       t_simdata->comm = NULL;
156     MSG_RETURN(MSG_HOST_FAILURE);
157   } else {
158     if(surf_workstation_resource->common_public->action_free(t_simdata->comm)) 
159       t_simdata->comm = NULL;
160     MSG_RETURN(MSG_TRANSFER_FAILURE);
161   }
162 }
163
164 /** \ingroup msg_gos_functions
165  * \brief Listen on a channel and wait for receiving a task.
166  *
167  * It takes two parameters.
168  * \param task a memory location for storing a #m_task_t. It will
169    hold a task when this function will return. Thus \a task should not
170    be equal to \c NULL and \a *task should be equal to \c NULL. If one of
171    those two condition does not hold, there will be a warning message.
172  * \param channel the channel on which the agent should be
173    listening. This value has to be >=0 and < than the maximal
174    number of channels fixed with MSG_set_channel_number().
175  * \return #MSG_FATAL if \a task is equal to \c NULL, #MSG_WARNING
176  * if \a *task is not equal to \c NULL, and #MSG_OK otherwise.
177  */
178 MSG_error_t MSG_task_get(m_task_t * task,
179                          m_channel_t channel)
180 {
181   return MSG_task_get_with_time_out(task, channel, -1);
182 }
183
184 /** \ingroup msg_gos_functions
185  * \brief Listen on a channel and wait for receiving a task with a timeout.
186  *
187  * It takes three parameters.
188  * \param task a memory location for storing a #m_task_t. It will
189    hold a task when this function will return. Thus \a task should not
190    be equal to \c NULL and \a *task should be equal to \c NULL. If one of
191    those two condition does not hold, there will be a warning message.
192  * \param channel the channel on which the agent should be
193    listening. This value has to be >=0 and < than the maximal
194    number of channels fixed with MSG_set_channel_number().
195  * \param max_duration the maximum time to wait for a task before giving
196     up. In such a case, #MSG_TRANSFER_FAILURE will be returned, \a task 
197     will not be modified and will still be
198     equal to \c NULL when returning. 
199  * \return #MSG_FATAL if \a task is equal to \c NULL, #MSG_WARNING
200    if \a *task is not equal to \c NULL, and #MSG_OK otherwise.
201  */
202 MSG_error_t MSG_task_get_with_time_out(m_task_t * task,
203                                        m_channel_t channel,
204                                        double max_duration)
205 {
206   return __MSG_task_get_with_time_out_from_host(task, channel, max_duration, NULL);
207 }
208
209 /** \ingroup msg_gos_functions
210  * \brief Listen on \a channel and waits for receiving a task from \a host.
211  *
212  * It takes three parameters.
213  * \param task a memory location for storing a #m_task_t. It will
214    hold a task when this function will return. Thus \a task should not
215    be equal to \c NULL and \a *task should be equal to \c NULL. If one of
216    those two condition does not hold, there will be a warning message.
217  * \param channel the channel on which the agent should be
218    listening. This value has to be >=0 and < than the maximal
219    number of channels fixed with MSG_set_channel_number().
220  * \param host the host that is to be watched.
221  * \return #MSG_FATAL if \a task is equal to \c NULL, #MSG_WARNING
222    if \a *task is not equal to \c NULL, and #MSG_OK otherwise.
223  */
224 MSG_error_t MSG_task_get_from_host(m_task_t * task, int channel, 
225                                    m_host_t host)
226 {
227   return __MSG_task_get_with_time_out_from_host(task, channel, -1, host);
228 }
229
230 /** \ingroup msg_gos_functions
231  * \brief Test whether there is a pending communication on a channel.
232  *
233  * It takes one parameter.
234  * \param channel the channel on which the agent should be
235    listening. This value has to be >=0 and < than the maximal
236    number of channels fixed with MSG_set_channel_number().
237  * \return 1 if there is a pending communication and 0 otherwise
238  */
239 int MSG_task_Iprobe(m_channel_t channel)
240 {
241   m_host_t h = NULL;
242   simdata_host_t h_simdata = NULL;
243
244   xbt_assert1((channel>=0) && (channel < msg_global->max_channel),"Invalid channel %d",channel);
245   DEBUG2("Probing on channel %d (%s)", channel,h->name);
246   CHECK_HOST();
247   h = MSG_host_self();
248   h_simdata = h->simdata;
249   return(xbt_fifo_get_first_item(h_simdata->mbox[channel])!=NULL);
250 }
251
252 /** \ingroup msg_gos_functions
253  * \brief Test whether there is a pending communication on a channel, and who sent it.
254  *
255  * It takes one parameter.
256  * \param channel the channel on which the agent should be
257    listening. This value has to be >=0 and < than the maximal
258    number of channels fixed with MSG_set_channel_number().
259  * \return -1 if there is no pending communication and the PID of the process who sent it otherwise
260  */
261 int MSG_task_probe_from(m_channel_t channel)
262 {
263   m_host_t h = NULL;
264   simdata_host_t h_simdata = NULL;
265   xbt_fifo_item_t item;
266   m_task_t t;
267
268   xbt_assert1((channel>=0) && (channel < msg_global->max_channel),"Invalid channel %d",channel);
269   CHECK_HOST();
270   h = MSG_host_self();
271   h_simdata = h->simdata;
272
273   DEBUG2("Probing on channel %d (%s)", channel,h->name);
274    
275   item = xbt_fifo_get_first_item(h->simdata->mbox[channel]);
276   if (!item || !(t = xbt_fifo_get_item_content(item)))
277     return -1;
278    
279   return MSG_process_get_PID(t->simdata->sender);
280 }
281
282 /** \ingroup msg_gos_functions
283  * \brief Wait for at most \a max_duration second for a task reception
284    on \a channel. *\a PID is updated with the PID of the first process
285    that triggered this event if any.
286  *
287  * It takes three parameters:
288  * \param channel the channel on which the agent should be
289    listening. This value has to be >=0 and < than the maximal.
290    number of channels fixed with MSG_set_channel_number().
291  * \param PID a memory location for storing an int.
292  * \param max_duration the maximum time to wait for a task before
293     giving up. In the case of a reception, *\a PID will be updated
294     with the PID of the first process to send a task.
295  * \return #MSG_HOST_FAILURE if the host is shut down in the meantime
296    and #MSG_OK otherwise.
297  */
298 MSG_error_t MSG_channel_select_from(m_channel_t channel, double max_duration,
299                                     int *PID)
300 {
301   m_host_t h = NULL;
302   simdata_host_t h_simdata = NULL;
303   xbt_fifo_item_t item;
304   m_task_t t;
305   int first_time = 1;
306   m_process_t process = MSG_process_self();
307
308   xbt_assert1((channel>=0) && (channel < msg_global->max_channel),"Invalid channel %d",channel);
309   if(PID) {
310     *PID = -1;
311   }
312
313   if(max_duration==0.0) {
314     *PID = MSG_task_probe_from(channel);
315     MSG_RETURN(MSG_OK);
316   } else {
317     CHECK_HOST();
318     h = MSG_host_self();
319     h_simdata = h->simdata;
320     
321     DEBUG2("Probing on channel %d (%s)", channel,h->name);
322     while(!(item = xbt_fifo_get_first_item(h->simdata->mbox[channel]))) {
323       if(max_duration>0) {
324         if(!first_time) {
325           MSG_RETURN(MSG_OK);
326         }
327       }
328       xbt_assert2(!(h_simdata->sleeping[channel]),
329                   "A process (%s(%d)) is already blocked on this channel",
330                   h_simdata->sleeping[channel]->name,
331                   h_simdata->sleeping[channel]->simdata->PID);
332       h_simdata->sleeping[channel] = process; /* I'm waiting. Wake me up when you're ready */
333       if(max_duration>0) {
334         __MSG_process_block(max_duration,"");
335       } else {
336         __MSG_process_block(-1,"");
337       }
338       if(surf_workstation_resource->extension_public->get_state(h_simdata->host) 
339          == SURF_CPU_OFF) {
340         MSG_RETURN(MSG_HOST_FAILURE);
341       }
342       h_simdata->sleeping[channel] = NULL;
343       first_time = 0;
344     }
345     if (!item || !(t = xbt_fifo_get_item_content(item))) {
346       MSG_RETURN(MSG_OK);
347     }
348     if(PID) {
349       *PID = MSG_process_get_PID(t->simdata->sender);
350     }
351     MSG_RETURN(MSG_OK);
352   }
353 }
354
355
356 /** \ingroup msg_gos_functions
357
358  * \brief Return the number of tasks waiting to be received on a \a
359    channel and sent by \a host.
360  *
361  * It takes two parameters.
362  * \param channel the channel on which the agent should be
363    listening. This value has to be >=0 and < than the maximal
364    number of channels fixed with MSG_set_channel_number().
365  * \param host the host that is to be watched.
366  * \return the number of tasks waiting to be received on \a channel
367    and sent by \a host.
368  */
369 int MSG_task_probe_from_host(int channel, m_host_t host)
370 {
371   simdata_host_t h_simdata = NULL;
372   xbt_fifo_item_t item;
373   m_task_t t;
374   int count = 0;
375   m_host_t h = NULL;
376   
377   xbt_assert1((channel>=0) && (channel < msg_global->max_channel),"Invalid channel %d",channel);
378   CHECK_HOST();
379   h = MSG_host_self();
380   h_simdata = h->simdata;
381
382   DEBUG2("Probing on channel %d (%s)", channel,h->name);
383    
384   xbt_fifo_foreach(h->simdata->mbox[channel],item,t,m_task_t) {
385     if(t->simdata->source==host) count++;
386   }
387    
388   return count;
389 }
390
391 /** \ingroup msg_gos_functions \brief Put a task on a channel of an
392  * host (with a timeout on the waiting of the destination host) and
393  * waits for the end of the transmission.
394  *
395  * This function is used for describing the behavior of an agent. It
396  * takes four parameter.
397  * \param task a #m_task_t to send on another location. This task
398    will not be usable anymore when the function will return. There is
399    no automatic task duplication and you have to save your parameters
400    before calling this function. Tasks are unique and once it has been
401    sent to another location, you should not access it anymore. You do
402    not need to call MSG_task_destroy() but to avoid using, as an
403    effect of inattention, this task anymore, you definitely should
404    renitialize it with #MSG_TASK_UNINITIALIZED. Note that this task
405    can be transfered iff it has been correctly created with
406    MSG_task_create().
407  * \param dest the destination of the message
408  * \param channel the channel on which the agent should put this
409    task. This value has to be >=0 and < than the maximal number of
410    channels fixed with MSG_set_channel_number().
411  * \param max_duration the maximum time to wait for a task before giving
412     up. In such a case, #MSG_TRANSFER_FAILURE will be returned, \a task 
413     will not be modified 
414  * \return #MSG_FATAL if \a task is not properly initialized and
415    #MSG_OK otherwise. Returns #MSG_HOST_FAILURE if the host on which
416    this function was called was shut down. Returns
417    #MSG_TRANSFER_FAILURE if the transfer could not be properly done
418    (network failure, dest failure, timeout...)
419  */
420 MSG_error_t MSG_task_put_with_timeout(m_task_t task, m_host_t dest, 
421                                       m_channel_t channel, double max_duration)
422 {
423   m_process_t process = MSG_process_self();
424   simdata_task_t task_simdata = NULL;
425   e_surf_action_state_t state = SURF_ACTION_NOT_IN_THE_SYSTEM;
426   m_host_t local_host = NULL;
427   m_host_t remote_host = NULL;
428   int first_time = 1;
429
430   CHECK_HOST();
431
432   xbt_assert1((channel>=0) && (channel < msg_global->max_channel),"Invalid channel %d",channel);
433
434   task_simdata = task->simdata;
435   task_simdata->sender = process;
436   task_simdata->source = MSG_process_get_host(process);
437   xbt_assert0(task_simdata->using==1,
438               "This taks is still being used somewhere else. You cannot send it now. Go fix your code!");
439   task_simdata->comm = NULL;
440   
441   local_host = ((simdata_process_t) process->simdata)->host;
442   remote_host = dest;
443
444   DEBUG4("Trying to send a task (%g kB) from %s to %s on channel %d", 
445          task->simdata->message_size/1000,local_host->name, remote_host->name, channel);
446
447   xbt_fifo_push(((simdata_host_t) remote_host->simdata)->
448                 mbox[channel], task);
449
450   PAJE_COMM_START(process,task,channel);
451     
452   if(remote_host->simdata->sleeping[channel]) {
453     DEBUG0("Somebody is listening. Let's wake him up!");
454     __MSG_process_unblock(remote_host->simdata->sleeping[channel]);
455   }
456
457   process->simdata->put_host = dest;
458   process->simdata->put_channel = channel;
459   while(!(task_simdata->comm)) {
460     if(max_duration>0) {
461       if(!first_time) {
462         PAJE_PROCESS_POP_STATE(process);
463         PAJE_COMM_STOP(process,task,channel);
464         MSG_RETURN(MSG_TRANSFER_FAILURE);
465       }
466     }
467     DEBUG0("Communication not initiated yet. Let's block!");
468     if(max_duration>0)
469       __MSG_process_block(max_duration,task->name);
470     else
471       __MSG_process_block(-1,task->name);
472
473     first_time = 0;
474
475     if(surf_workstation_resource->extension_public->
476        get_state(local_host->simdata->host) == SURF_CPU_OFF) {
477       xbt_fifo_remove(((simdata_host_t) remote_host->simdata)->mbox[channel],
478                       task);
479       PAJE_PROCESS_POP_STATE(process);
480       PAJE_COMM_STOP(process,task,channel);
481       MSG_task_destroy(task);
482       MSG_RETURN(MSG_HOST_FAILURE);
483     }
484   }
485   DEBUG0("Registering to this communication");
486   surf_workstation_resource->common_public->action_use(task_simdata->comm);
487   process->simdata->put_host = NULL;
488   process->simdata->put_channel = -1;
489
490
491   PAJE_PROCESS_PUSH_STATE(process,"C",task);  
492
493   state=surf_workstation_resource->common_public->action_get_state(task_simdata->comm);
494   while (state==SURF_ACTION_RUNNING) {
495     DEBUG0("Waiting for action termination");
496     __MSG_task_wait_event(process, task);
497     state=surf_workstation_resource->common_public->action_get_state(task_simdata->comm);
498   }
499   DEBUG0("Action terminated");
500   task->simdata->rate=-1.0; /* Sets the rate back to default */
501
502   PAJE_PROCESS_POP_STATE(process);  
503
504   if(state == SURF_ACTION_DONE) {
505     if(surf_workstation_resource->common_public->action_free(task_simdata->comm)) 
506       task_simdata->comm = NULL;
507     MSG_task_destroy(task);
508     MSG_RETURN(MSG_OK);
509   } else if(surf_workstation_resource->extension_public->get_state(local_host->simdata->host) 
510             == SURF_CPU_OFF) {
511     if(surf_workstation_resource->common_public->action_free(task_simdata->comm)) 
512       task_simdata->comm = NULL;
513     MSG_task_destroy(task);
514     MSG_RETURN(MSG_HOST_FAILURE);
515   } else { 
516     if(surf_workstation_resource->common_public->action_free(task_simdata->comm)) 
517       task_simdata->comm = NULL;
518     MSG_task_destroy(task);
519     MSG_RETURN(MSG_TRANSFER_FAILURE);
520   }
521 }
522 /** \ingroup msg_gos_functions
523  * \brief Put a task on a channel of an host and waits for the end of the
524  * transmission.
525  *
526  * This function is used for describing the behavior of an agent. It
527  * takes three parameter.
528  * \param task a #m_task_t to send on another location. This task
529    will not be usable anymore when the function will return. There is
530    no automatic task duplication and you have to save your parameters
531    before calling this function. Tasks are unique and once it has been
532    sent to another location, you should not access it anymore. You do
533    not need to call MSG_task_destroy() but to avoid using, as an
534    effect of inattention, this task anymore, you definitely should
535    renitialize it with #MSG_TASK_UNINITIALIZED. Note that this task
536    can be transfered iff it has been correctly created with
537    MSG_task_create().
538  * \param dest the destination of the message
539  * \param channel the channel on which the agent should put this
540    task. This value has to be >=0 and < than the maximal number of
541    channels fixed with MSG_set_channel_number().
542  * \return #MSG_FATAL if \a task is not properly initialized and
543  * #MSG_OK otherwise. Returns #MSG_HOST_FAILURE if the host on which
544  * this function was called was shut down. Returns
545  * #MSG_TRANSFER_FAILURE if the transfer could not be properly done
546  * (network failure, dest failure)
547  */
548 MSG_error_t MSG_task_put(m_task_t task,
549                          m_host_t dest, m_channel_t channel)
550 {
551   return MSG_task_put_with_timeout(task, dest, channel, -1.0);
552 }
553
554 /** \ingroup msg_gos_functions
555  * \brief Does exactly the same as MSG_task_put but with a bounded transmition 
556  * rate.
557  *
558  * \sa MSG_task_put
559  */
560 MSG_error_t MSG_task_put_bounded(m_task_t task,
561                                  m_host_t dest, m_channel_t channel,
562                                  double max_rate)
563 {
564   MSG_error_t res = MSG_OK;
565   task->simdata->rate=max_rate;
566   res = MSG_task_put(task, dest, channel);
567   return(res);
568 }
569
570 /** \ingroup msg_gos_functions
571  * \brief Executes a task and waits for its termination.
572  *
573  * This function is used for describing the behavior of an agent. It
574  * takes only one parameter.
575  * \param task a #m_task_t to execute on the location on which the
576    agent is running.
577  * \return #MSG_FATAL if \a task is not properly initialized and
578  * #MSG_OK otherwise.
579  */
580 MSG_error_t MSG_task_execute(m_task_t task)
581 {
582   m_process_t process = MSG_process_self();
583   MSG_error_t res;
584
585   DEBUG1("Computing on %s", process->simdata->host->name);
586
587   __MSG_task_execute(process, task);
588
589   PAJE_PROCESS_PUSH_STATE(process,"E",task);  
590   res = __MSG_wait_for_computation(process,task);
591   PAJE_PROCESS_POP_STATE(process);
592   return res;
593 }
594
595 void __MSG_task_execute(m_process_t process, m_task_t task)
596 {
597   simdata_task_t simdata = NULL;
598
599   CHECK_HOST();
600
601   simdata = task->simdata;
602   xbt_assert0((!simdata->compute)&&(task->simdata->using==1),
603               "This taks is executed somewhere else. Go fix your code!");
604   task->simdata->using++;
605   simdata->compute = surf_workstation_resource->extension_public->
606     execute(MSG_process_get_host(process)->simdata->host,
607             simdata->computation_amount);
608   surf_workstation_resource->common_public->
609     set_priority(simdata->compute, simdata->priority);
610
611   surf_workstation_resource->common_public->action_set_data(simdata->compute,task);
612   task->simdata->using--;
613 }
614
615 MSG_error_t __MSG_wait_for_computation(m_process_t process, m_task_t task)
616 {
617   e_surf_action_state_t state = SURF_ACTION_NOT_IN_THE_SYSTEM;
618   simdata_task_t simdata = task->simdata;
619
620   XBT_IN4("(%p(%s) %p(%s))",process,process->name,task,task->name);
621   simdata->using++;
622   do {
623     __MSG_task_wait_event(process, task);
624     state=surf_workstation_resource->common_public->action_get_state(simdata->compute);
625   } while (state==SURF_ACTION_RUNNING);
626   simdata->using--;
627     
628
629   if(state == SURF_ACTION_DONE) {
630     if(surf_workstation_resource->common_public->action_free(simdata->compute)) 
631       simdata->compute = NULL;
632     simdata->computation_amount = 0.0;
633     XBT_OUT;
634     MSG_RETURN(MSG_OK);
635   } else if(surf_workstation_resource->extension_public->
636             get_state(MSG_process_get_host(process)->simdata->host) 
637             == SURF_CPU_OFF) {
638     if(surf_workstation_resource->common_public->action_free(simdata->compute)) 
639       simdata->compute = NULL;
640     XBT_OUT;
641     MSG_RETURN(MSG_HOST_FAILURE);
642   } else {
643     if(surf_workstation_resource->common_public->action_free(simdata->compute)) 
644       simdata->compute = NULL;
645     XBT_OUT;
646     MSG_RETURN(MSG_TASK_CANCELLED);
647   }
648 }
649 /** \ingroup m_task_management
650  * \brief Creates a new #m_task_t (a parallel one....).
651  *
652  * A constructor for #m_task_t taking six arguments and returning the 
653    corresponding object.
654  * \param name a name for the object. It is for user-level information
655    and can be NULL.
656  * \param host_nb the number of hosts implied in the parallel task.
657  * \param host_list an array of \p host_nb m_host_t.
658  * \param computation_amount an array of \p host_nb
659    doubles. computation_amount[i] is the total number of operations
660    that have to be performed on host_list[i].
661  * \param communication_amount an array of \p host_nb* \p host_nb doubles.
662  * \param data a pointer to any data may want to attach to the new
663    object.  It is for user-level information and can be NULL. It can
664    be retrieved with the function \ref MSG_task_get_data.
665  * \see m_task_t
666  * \return The new corresponding object.
667  */
668 m_task_t MSG_parallel_task_create(const char *name, 
669                                   int host_nb,
670                                   const m_host_t *host_list,
671                                   double *computation_amount,
672                                   double *communication_amount,
673                                   void *data)
674 {
675   simdata_task_t simdata = xbt_new0(s_simdata_task_t,1);
676   m_task_t task = xbt_new0(s_m_task_t,1);
677   int i;
678
679   /* Task structure */
680   task->name = xbt_strdup(name);
681   task->simdata = simdata;
682   task->data = data;
683
684   /* Simulator Data */
685   simdata->sleeping = xbt_dynar_new(sizeof(m_process_t),NULL);
686   simdata->rate = -1.0;
687   simdata->using = 1;
688   simdata->sender = NULL;
689   simdata->source = NULL;
690   simdata->host_nb = host_nb;
691   
692   simdata->host_list = xbt_new0(void *, host_nb);
693   simdata->comp_amount = computation_amount;
694   simdata->comm_amount = communication_amount;
695
696   for(i=0;i<host_nb;i++)
697     simdata->host_list[i] = host_list[i]->simdata->host;
698
699   return task;
700 }
701
702
703 static void __MSG_parallel_task_execute(m_process_t process, m_task_t task)
704 {
705   simdata_task_t simdata = NULL;
706
707   CHECK_HOST();
708
709   simdata = task->simdata;
710
711   xbt_assert0(simdata->host_nb,"This is not a parallel task. Go to hell.");
712
713   simdata->compute = surf_workstation_resource->extension_public->
714   execute_parallel_task(task->simdata->host_nb,
715                         task->simdata->host_list,
716                         task->simdata->comp_amount,
717                         task->simdata->comm_amount,
718                         1.0,
719                         -1.0);
720   if(simdata->compute)
721     surf_workstation_resource->common_public->action_set_data(simdata->compute,task);
722 }
723
724 MSG_error_t MSG_parallel_task_execute(m_task_t task)
725 {
726   m_process_t process = MSG_process_self();
727   MSG_error_t res;
728
729   DEBUG0("Computing on a tons of guys");
730   
731   __MSG_parallel_task_execute(process, task);
732
733   if(task->simdata->compute)
734     res = __MSG_wait_for_computation(process,task);
735   else 
736     res = MSG_OK;
737
738   return res;  
739 }
740
741
742 /** \ingroup msg_gos_functions
743  * \brief Sleep for the specified number of seconds
744  *
745  * Makes the current process sleep until \a time seconds have elapsed.
746  *
747  * \param nb_sec a number of second
748  */
749 MSG_error_t MSG_process_sleep(double nb_sec)
750 {
751   e_surf_action_state_t state = SURF_ACTION_NOT_IN_THE_SYSTEM;
752   m_process_t process = MSG_process_self();
753   m_task_t dummy = NULL;
754   simdata_task_t simdata = NULL;
755
756   CHECK_HOST();
757   dummy = MSG_task_create("MSG_sleep", nb_sec, 0.0, NULL);
758   simdata = dummy->simdata;
759
760   simdata->compute = surf_workstation_resource->extension_public->
761     sleep(MSG_process_get_host(process)->simdata->host,
762             simdata->computation_amount);
763   surf_workstation_resource->common_public->action_set_data(simdata->compute,dummy);
764
765   
766   simdata->using++;
767   do {
768     __MSG_task_wait_event(process, dummy);
769     state=surf_workstation_resource->common_public->action_get_state(simdata->compute);
770   } while (state==SURF_ACTION_RUNNING);
771   simdata->using--;
772     
773   if(state == SURF_ACTION_DONE) {
774     if(surf_workstation_resource->extension_public->
775        get_state(MSG_process_get_host(process)->simdata->host) 
776        == SURF_CPU_OFF) {
777       if(surf_workstation_resource->common_public->action_free(simdata->compute)) 
778         simdata->compute = NULL;
779       MSG_RETURN(MSG_HOST_FAILURE);
780     }
781     if(__MSG_process_isBlocked(process)) {
782       __MSG_process_unblock(MSG_process_self());
783     }
784     if(surf_workstation_resource->extension_public->
785        get_state(MSG_process_get_host(process)->simdata->host) 
786        == SURF_CPU_OFF) {
787       if(surf_workstation_resource->common_public->action_free(simdata->compute)) 
788         simdata->compute = NULL;
789       MSG_RETURN(MSG_HOST_FAILURE);
790     }
791     if(surf_workstation_resource->common_public->action_free(simdata->compute)) 
792       simdata->compute = NULL;
793     MSG_task_destroy(dummy);
794     MSG_RETURN(MSG_OK);
795   } else MSG_RETURN(MSG_HOST_FAILURE);
796 }
797
798 /** \ingroup msg_gos_functions
799  * \brief Return the number of MSG tasks currently running on
800  * the host of the current running process.
801  */
802 static int MSG_get_msgload(void) 
803 {
804   m_process_t process;
805    
806   CHECK_HOST();
807   
808   xbt_assert0(0, "This function is still to be specified correctly (what do you mean by 'load', exactly?). In the meantime, please don't use it");
809   process = MSG_process_self();
810   return xbt_fifo_size(process->simdata->host->simdata->process_list);
811 }
812
813 /** \ingroup msg_gos_functions
814  *
815  * \brief Return the last value returned by a MSG function (except
816  * MSG_get_errno...).
817  */
818 MSG_error_t MSG_get_errno(void)
819 {
820   return PROCESS_GET_ERRNO();
821 }