Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
oldies
[simgrid.git] / src / msg / gos.c
1 /*      $Id$     */
2
3 /* Copyright (c) 2002,2003,2004 Arnaud Legrand. All rights reserved.        */
4
5 /* This program is free software; you can redistribute it and/or modify it
6  * under the terms of the license (GNU LGPL) which comes with this package. */
7
8 #include "private.h"
9 #include "xbt/sysdep.h"
10 #include "xbt/log.h"
11 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(msg_gos, msg,
12                                 "Logging specific to MSG (gos)");
13
14 /** \defgroup msg_gos_functions MSG Operating System Functions
15  *  \brief This section describes the functions that can be used
16  *  by an agent for handling some task.
17  */
18
19 static MSG_error_t __MSG_task_get_with_time_out_from_host(m_task_t * task,
20                                                         m_channel_t channel,
21                                                         double max_duration,
22                                                         m_host_t host)
23 {
24   m_process_t process = MSG_process_self();
25   m_task_t t = NULL;
26   m_host_t h = NULL;
27   simdata_task_t t_simdata = NULL;
28   simdata_host_t h_simdata = NULL;
29   int first_time = 1;
30   e_surf_action_state_t state = SURF_ACTION_NOT_IN_THE_SYSTEM;
31   xbt_fifo_item_t item = NULL;
32
33   CHECK_HOST();
34   xbt_assert1((channel>=0) && (channel < msg_global->max_channel),"Invalid channel %d",channel);
35   /* Sanity check */
36   xbt_assert0(task,"Null pointer for the task\n");
37
38   if (*task) 
39     CRITICAL0("MSG_task_get() was asked to write in a non empty task struct.");
40
41   /* Get the task */
42   h = MSG_host_self();
43   h_simdata = h->simdata;
44
45   DEBUG2("Waiting for a task on channel %d (%s)", channel,h->name);
46
47   while (1) {
48     if(xbt_fifo_size(h_simdata->mbox[channel])>0) {
49       if(!host) {
50         t = xbt_fifo_shift(h_simdata->mbox[channel]);
51         break;
52       } else {
53         xbt_fifo_foreach(h->simdata->mbox[channel],item,t,m_task_t) {
54           if(t->simdata->source==host) break;
55         }
56         if(item) {
57           xbt_fifo_remove_item(h->simdata->mbox[channel],item);
58           break;
59         } 
60       }
61     }
62                                                        
63     if(max_duration>0) {
64       if(!first_time) {
65         PAJE_PROCESS_POP_STATE(process);
66         PAJE_COMM_STOP(process,t,channel);
67         MSG_RETURN(MSG_TRANSFER_FAILURE);
68       }
69     }
70     xbt_assert3(!(h_simdata->sleeping[channel]),
71                 "A process (%s(%d)) is already blocked on channel %d",
72                 h_simdata->sleeping[channel]->name,
73                 h_simdata->sleeping[channel]->simdata->PID,
74                 channel);
75     h_simdata->sleeping[channel] = process; /* I'm waiting. Wake me up when you're ready */
76     if(max_duration>0) {
77       __MSG_process_block(max_duration,"");
78     } else {
79       __MSG_process_block(-1,"");
80     }
81     h_simdata->sleeping[channel] = NULL;
82     first_time = 0;
83     if(surf_workstation_resource->extension_public->get_state(h_simdata->host) 
84        == SURF_CPU_OFF)
85       MSG_RETURN(MSG_HOST_FAILURE);
86     /* OK, we should both be ready now. Are you there ? */
87   }
88
89   DEBUG1("OK, got a task (%s)", t->name);
90
91   t_simdata = t->simdata;
92   /*   *task = __MSG_task_copy(t); */
93   *task=t;
94
95   /* Transfer */
96   t_simdata->using++;
97
98   while(MSG_process_is_suspended(t_simdata->sender)) {
99     DEBUG1("Oooups, the sender (%s) has been suspended in the meantime. Let's wait for him", 
100            t_simdata->sender->name);
101     m_task_t task_to_wait_for = t_simdata->sender->simdata->waiting_task;
102     if(__MSG_process_isBlocked(t_simdata->sender)) {
103       DEBUG0("He's blocked. Let's wait for him to go in the suspended state");
104       __MSG_process_unblock(t_simdata->sender);
105       task_to_wait_for->simdata->using++;
106       __MSG_task_wait_event(process, task_to_wait_for);
107       MSG_task_destroy(task_to_wait_for);
108     } else {
109       DEBUG0("He's suspended. Let's wait for him to go in the resumed state");
110       task_to_wait_for->simdata->using++;
111       __MSG_task_wait_event(process, task_to_wait_for);
112       MSG_task_destroy(task_to_wait_for);
113       DEBUG0("He's resumed. He should block again. So let's free him.");
114       __MSG_process_unblock(t_simdata->sender);
115       break;
116     }
117   }
118   DEBUG0("Calling SURF for communication creation");
119   t_simdata->comm = surf_workstation_resource->extension_public->
120     communicate(MSG_process_get_host(t_simdata->sender)->simdata->host,
121                 h->simdata->host, t_simdata->message_size,t_simdata->rate);
122   
123   surf_workstation_resource->common_public->action_set_data(t_simdata->comm,t);
124
125   if(__MSG_process_isBlocked(t_simdata->sender)) {
126     DEBUG1("Unblocking %s",t_simdata->sender->name);
127     __MSG_process_unblock(t_simdata->sender);
128   }
129
130   PAJE_PROCESS_PUSH_STATE(process,"C",t);  
131
132   do {
133     DEBUG0("Waiting for action termination");
134     __MSG_task_wait_event(process, t);
135     state=surf_workstation_resource->common_public->action_get_state(t_simdata->comm);
136   } while (state==SURF_ACTION_RUNNING);
137   DEBUG0("Action terminated");
138
139   if(t->simdata->using>1) {
140     xbt_fifo_unshift(msg_global->process_to_run,process);
141     xbt_context_yield();
142   }
143
144   PAJE_PROCESS_POP_STATE(process);
145   PAJE_COMM_STOP(process,t,channel);
146
147   if(state == SURF_ACTION_DONE) {
148     if(surf_workstation_resource->common_public->action_free(t_simdata->comm)) 
149       t_simdata->comm = NULL;
150     MSG_RETURN(MSG_OK);
151   } else if(surf_workstation_resource->extension_public->get_state(h_simdata->host) 
152           == SURF_CPU_OFF) {
153     if(surf_workstation_resource->common_public->action_free(t_simdata->comm)) 
154       t_simdata->comm = NULL;
155     MSG_RETURN(MSG_HOST_FAILURE);
156   } else {
157     if(surf_workstation_resource->common_public->action_free(t_simdata->comm)) 
158       t_simdata->comm = NULL;
159     MSG_RETURN(MSG_TRANSFER_FAILURE);
160   }
161 }
162
163 /** \ingroup msg_gos_functions
164  * \brief Listen on a channel and wait for receiving a task.
165  *
166  * It takes two parameters.
167  * \param task a memory location for storing a #m_task_t. It will
168    hold a task when this function will return. Thus \a task should not
169    be equal to \c NULL and \a *task should be equal to \c NULL. If one of
170    those two condition does not hold, there will be a warning message.
171  * \param channel the channel on which the agent should be
172    listening. This value has to be >=0 and < than the maximal
173    number of channels fixed with MSG_set_channel_number().
174  * \return #MSG_FATAL if \a task is equal to \c NULL, #MSG_WARNING
175  * if \a *task is not equal to \c NULL, and #MSG_OK otherwise.
176  */
177 MSG_error_t MSG_task_get(m_task_t * task,
178                          m_channel_t channel)
179 {
180   return MSG_task_get_with_time_out(task, channel, -1);
181 }
182
183 /** \ingroup msg_gos_functions
184  * \brief Listen on a channel and wait for receiving a task with a timeout.
185  *
186  * It takes three parameters.
187  * \param task a memory location for storing a #m_task_t. It will
188    hold a task when this function will return. Thus \a task should not
189    be equal to \c NULL and \a *task should be equal to \c NULL. If one of
190    those two condition does not hold, there will be a warning message.
191  * \param channel the channel on which the agent should be
192    listening. This value has to be >=0 and < than the maximal
193    number of channels fixed with MSG_set_channel_number().
194  * \param max_duration the maximum time to wait for a task before giving
195     up. In such a case, #MSG_TRANSFER_FAILURE will be returned, \a task 
196     will not be modified and will still be
197     equal to \c NULL when returning. 
198  * \return #MSG_FATAL if \a task is equal to \c NULL, #MSG_WARNING
199    if \a *task is not equal to \c NULL, and #MSG_OK otherwise.
200  */
201 MSG_error_t MSG_task_get_with_time_out(m_task_t * task,
202                                        m_channel_t channel,
203                                        double max_duration)
204 {
205   return __MSG_task_get_with_time_out_from_host(task, channel, max_duration, NULL);
206 }
207
208 /** \ingroup msg_gos_functions
209  * \brief Listen on \a channel and waits for receiving a task from \a host.
210  *
211  * It takes three parameters.
212  * \param task a memory location for storing a #m_task_t. It will
213    hold a task when this function will return. Thus \a task should not
214    be equal to \c NULL and \a *task should be equal to \c NULL. If one of
215    those two condition does not hold, there will be a warning message.
216  * \param channel the channel on which the agent should be
217    listening. This value has to be >=0 and < than the maximal
218    number of channels fixed with MSG_set_channel_number().
219  * \param host the host that is to be watched.
220  * \return #MSG_FATAL if \a task is equal to \c NULL, #MSG_WARNING
221    if \a *task is not equal to \c NULL, and #MSG_OK otherwise.
222  */
223 MSG_error_t MSG_task_get_from_host(m_task_t * task, int channel, 
224                                    m_host_t host)
225 {
226   return __MSG_task_get_with_time_out_from_host(task, channel, -1, host);
227 }
228
229 /** \ingroup msg_gos_functions
230  * \brief Test whether there is a pending communication on a channel.
231  *
232  * It takes one parameter.
233  * \param channel the channel on which the agent should be
234    listening. This value has to be >=0 and < than the maximal
235    number of channels fixed with MSG_set_channel_number().
236  * \return 1 if there is a pending communication and 0 otherwise
237  */
238 int MSG_task_Iprobe(m_channel_t channel)
239 {
240   m_host_t h = NULL;
241   simdata_host_t h_simdata = NULL;
242
243   xbt_assert1((channel>=0) && (channel < msg_global->max_channel),"Invalid channel %d",channel);
244   DEBUG2("Probing on channel %d (%s)", channel,h->name);
245   CHECK_HOST();
246   h = MSG_host_self();
247   h_simdata = h->simdata;
248   return(xbt_fifo_get_first_item(h_simdata->mbox[channel])!=NULL);
249 }
250
251 /** \ingroup msg_gos_functions
252  * \brief Test whether there is a pending communication on a channel, and who sent it.
253  *
254  * It takes one parameter.
255  * \param channel the channel on which the agent should be
256    listening. This value has to be >=0 and < than the maximal
257    number of channels fixed with MSG_set_channel_number().
258  * \return -1 if there is no pending communication and the PID of the process who sent it otherwise
259  */
260 int MSG_task_probe_from(m_channel_t channel)
261 {
262   m_host_t h = NULL;
263   simdata_host_t h_simdata = NULL;
264   xbt_fifo_item_t item;
265   m_task_t t;
266
267   xbt_assert1((channel>=0) && (channel < msg_global->max_channel),"Invalid channel %d",channel);
268   CHECK_HOST();
269   h = MSG_host_self();
270   h_simdata = h->simdata;
271
272   DEBUG2("Probing on channel %d (%s)", channel,h->name);
273    
274   item = xbt_fifo_get_first_item(h->simdata->mbox[channel]);
275   if (!item || !(t = xbt_fifo_get_item_content(item)))
276     return -1;
277    
278   return MSG_process_get_PID(t->simdata->sender);
279 }
280
281 /** \ingroup msg_gos_functions
282  * \brief Wait for at most \a max_duration second for a task reception
283    on \a channel. *\a PID is updated with the PID of the first process
284    that triggered this event if any.
285  *
286  * It takes three parameters:
287  * \param channel the channel on which the agent should be
288    listening. This value has to be >=0 and < than the maximal.
289    number of channels fixed with MSG_set_channel_number().
290  * \param PID a memory location for storing an int.
291  * \param max_duration the maximum time to wait for a task before
292     giving up. In the case of a reception, *\a PID will be updated
293     with the PID of the first process to send a task.
294  * \return #MSG_HOST_FAILURE if the host is shut down in the meantime
295    and #MSG_OK otherwise.
296  */
297 MSG_error_t MSG_channel_select_from(m_channel_t channel, double max_duration,
298                                     int *PID)
299 {
300   m_host_t h = NULL;
301   simdata_host_t h_simdata = NULL;
302   xbt_fifo_item_t item;
303   m_task_t t;
304   int first_time = 1;
305   m_process_t process = MSG_process_self();
306
307   xbt_assert1((channel>=0) && (channel < msg_global->max_channel),"Invalid channel %d",channel);
308   if(PID) {
309     *PID = -1;
310   }
311
312   if(max_duration==0.0) {
313     *PID = MSG_task_probe_from(channel);
314     MSG_RETURN(MSG_OK);
315   } else {
316     CHECK_HOST();
317     h = MSG_host_self();
318     h_simdata = h->simdata;
319     
320     DEBUG2("Probing on channel %d (%s)", channel,h->name);
321     while(!(item = xbt_fifo_get_first_item(h->simdata->mbox[channel]))) {
322       if(max_duration>0) {
323         if(!first_time) {
324           MSG_RETURN(MSG_OK);
325         }
326       }
327       xbt_assert2(!(h_simdata->sleeping[channel]),
328                   "A process (%s(%d)) is already blocked on this channel",
329                   h_simdata->sleeping[channel]->name,
330                   h_simdata->sleeping[channel]->simdata->PID);
331       h_simdata->sleeping[channel] = process; /* I'm waiting. Wake me up when you're ready */
332       if(max_duration>0) {
333         __MSG_process_block(max_duration,"");
334       } else {
335         __MSG_process_block(-1,"");
336       }
337       if(surf_workstation_resource->extension_public->get_state(h_simdata->host) 
338          == SURF_CPU_OFF) {
339         MSG_RETURN(MSG_HOST_FAILURE);
340       }
341       h_simdata->sleeping[channel] = NULL;
342       first_time = 0;
343     }
344     if (!item || !(t = xbt_fifo_get_item_content(item))) {
345       MSG_RETURN(MSG_OK);
346     }
347     if(PID) {
348       *PID = MSG_process_get_PID(t->simdata->sender);
349     }
350     MSG_RETURN(MSG_OK);
351   }
352 }
353
354
355 /** \ingroup msg_gos_functions
356
357  * \brief Return the number of tasks waiting to be received on a \a
358    channel and sent by \a host.
359  *
360  * It takes two parameters.
361  * \param channel the channel on which the agent should be
362    listening. This value has to be >=0 and < than the maximal
363    number of channels fixed with MSG_set_channel_number().
364  * \param host the host that is to be watched.
365  * \return the number of tasks waiting to be received on \a channel
366    and sent by \a host.
367  */
368 int MSG_task_probe_from_host(int channel, m_host_t host)
369 {
370   simdata_host_t h_simdata = NULL;
371   xbt_fifo_item_t item;
372   m_task_t t;
373   int count = 0;
374   m_host_t h = NULL;
375   
376   xbt_assert1((channel>=0) && (channel < msg_global->max_channel),"Invalid channel %d",channel);
377   CHECK_HOST();
378   h = MSG_host_self();
379   h_simdata = h->simdata;
380
381   DEBUG2("Probing on channel %d (%s)", channel,h->name);
382    
383   xbt_fifo_foreach(h->simdata->mbox[channel],item,t,m_task_t) {
384     if(t->simdata->source==host) count++;
385   }
386    
387   return count;
388 }
389
390 /** \ingroup msg_gos_functions \brief Put a task on a channel of an
391  * host (with a timeout on the waiting of the destination host) and
392  * waits for the end of the transmission.
393  *
394  * This function is used for describing the behavior of an agent. It
395  * takes four parameter.
396  * \param task a #m_task_t to send on another location. This task
397    will not be usable anymore when the function will return. There is
398    no automatic task duplication and you have to save your parameters
399    before calling this function. Tasks are unique and once it has been
400    sent to another location, you should not access it anymore. You do
401    not need to call MSG_task_destroy() but to avoid using, as an
402    effect of inattention, this task anymore, you definitely should
403    renitialize it with #MSG_TASK_UNINITIALIZED. Note that this task
404    can be transfered iff it has been correctly created with
405    MSG_task_create().
406  * \param dest the destination of the message
407  * \param channel the channel on which the agent should put this
408    task. This value has to be >=0 and < than the maximal number of
409    channels fixed with MSG_set_channel_number().
410  * \param max_duration the maximum time to wait for a task before giving
411     up. In such a case, #MSG_TRANSFER_FAILURE will be returned, \a task 
412     will not be modified 
413  * \return #MSG_FATAL if \a task is not properly initialized and
414    #MSG_OK otherwise. Returns #MSG_HOST_FAILURE if the host on which
415    this function was called was shut down. Returns
416    #MSG_TRANSFER_FAILURE if the transfer could not be properly done
417    (network failure, dest failure, timeout...)
418  */
419 MSG_error_t MSG_task_put_with_timeout(m_task_t task, m_host_t dest, 
420                                       m_channel_t channel, double max_duration)
421 {
422   m_process_t process = MSG_process_self();
423   simdata_task_t task_simdata = NULL;
424   e_surf_action_state_t state = SURF_ACTION_NOT_IN_THE_SYSTEM;
425   m_host_t local_host = NULL;
426   m_host_t remote_host = NULL;
427   int first_time = 1;
428
429   CHECK_HOST();
430
431   xbt_assert1((channel>=0) && (channel < msg_global->max_channel),"Invalid channel %d",channel);
432
433   task_simdata = task->simdata;
434   task_simdata->sender = process;
435   task_simdata->source = MSG_process_get_host(process);
436   xbt_assert0(task_simdata->using==1,
437               "This taks is still being used somewhere else. You cannot send it now. Go fix your code!");
438   task_simdata->comm = NULL;
439   
440   local_host = ((simdata_process_t) process->simdata)->host;
441   remote_host = dest;
442
443   DEBUG4("Trying to send a task (%g kB) from %s to %s on channel %d", 
444          task->simdata->message_size/1000,local_host->name, remote_host->name, channel);
445
446   xbt_fifo_push(((simdata_host_t) remote_host->simdata)->
447                 mbox[channel], task);
448
449   PAJE_COMM_START(process,task,channel);
450     
451   if(remote_host->simdata->sleeping[channel]) {
452     DEBUG0("Somebody is listening. Let's wake him up!");
453     __MSG_process_unblock(remote_host->simdata->sleeping[channel]);
454   }
455
456   process->simdata->put_host = dest;
457   process->simdata->put_channel = channel;
458   while(!(task_simdata->comm)) {
459     if(max_duration>0) {
460       if(!first_time) {
461         PAJE_PROCESS_POP_STATE(process);
462         PAJE_COMM_STOP(process,task,channel);
463         MSG_RETURN(MSG_TRANSFER_FAILURE);
464       }
465     }
466     DEBUG0("Communication not initiated yet. Let's block!");
467     if(max_duration>0)
468       __MSG_process_block(max_duration,task->name);
469     else
470       __MSG_process_block(-1,task->name);
471
472     first_time = 0;
473
474     if(surf_workstation_resource->extension_public->
475        get_state(local_host->simdata->host) == SURF_CPU_OFF) {
476       xbt_fifo_remove(((simdata_host_t) remote_host->simdata)->mbox[channel],
477                       task);
478       PAJE_PROCESS_POP_STATE(process);
479       PAJE_COMM_STOP(process,task,channel);
480       MSG_task_destroy(task);
481       MSG_RETURN(MSG_HOST_FAILURE);
482     }
483   }
484   DEBUG0("Registering to this communication");
485   surf_workstation_resource->common_public->action_use(task_simdata->comm);
486   process->simdata->put_host = NULL;
487   process->simdata->put_channel = -1;
488
489
490   PAJE_PROCESS_PUSH_STATE(process,"C",task);  
491
492   state=surf_workstation_resource->common_public->action_get_state(task_simdata->comm);
493   while (state==SURF_ACTION_RUNNING) {
494     DEBUG0("Waiting for action termination");
495     __MSG_task_wait_event(process, task);
496     state=surf_workstation_resource->common_public->action_get_state(task_simdata->comm);
497   }
498   DEBUG0("Action terminated");
499   task->simdata->rate=-1.0; /* Sets the rate back to default */
500
501   PAJE_PROCESS_POP_STATE(process);  
502
503   if(state == SURF_ACTION_DONE) {
504     if(surf_workstation_resource->common_public->action_free(task_simdata->comm)) 
505       task_simdata->comm = NULL;
506     MSG_task_destroy(task);
507     MSG_RETURN(MSG_OK);
508   } else if(surf_workstation_resource->extension_public->get_state(local_host->simdata->host) 
509             == SURF_CPU_OFF) {
510     if(surf_workstation_resource->common_public->action_free(task_simdata->comm)) 
511       task_simdata->comm = NULL;
512     MSG_task_destroy(task);
513     MSG_RETURN(MSG_HOST_FAILURE);
514   } else { 
515     if(surf_workstation_resource->common_public->action_free(task_simdata->comm)) 
516       task_simdata->comm = NULL;
517     MSG_task_destroy(task);
518     MSG_RETURN(MSG_TRANSFER_FAILURE);
519   }
520 }
521 /** \ingroup msg_gos_functions
522  * \brief Put a task on a channel of an host and waits for the end of the
523  * transmission.
524  *
525  * This function is used for describing the behavior of an agent. It
526  * takes three parameter.
527  * \param task a #m_task_t to send on another location. This task
528    will not be usable anymore when the function will return. There is
529    no automatic task duplication and you have to save your parameters
530    before calling this function. Tasks are unique and once it has been
531    sent to another location, you should not access it anymore. You do
532    not need to call MSG_task_destroy() but to avoid using, as an
533    effect of inattention, this task anymore, you definitely should
534    renitialize it with #MSG_TASK_UNINITIALIZED. Note that this task
535    can be transfered iff it has been correctly created with
536    MSG_task_create().
537  * \param dest the destination of the message
538  * \param channel the channel on which the agent should put this
539    task. This value has to be >=0 and < than the maximal number of
540    channels fixed with MSG_set_channel_number().
541  * \return #MSG_FATAL if \a task is not properly initialized and
542  * #MSG_OK otherwise. Returns #MSG_HOST_FAILURE if the host on which
543  * this function was called was shut down. Returns
544  * #MSG_TRANSFER_FAILURE if the transfer could not be properly done
545  * (network failure, dest failure)
546  */
547 MSG_error_t MSG_task_put(m_task_t task,
548                          m_host_t dest, m_channel_t channel)
549 {
550   return MSG_task_put_with_timeout(task, dest, channel, -1.0);
551 }
552
553 /** \ingroup msg_gos_functions
554  * \brief Does exactly the same as MSG_task_put but with a bounded transmition 
555  * rate.
556  *
557  * \sa MSG_task_put
558  */
559 MSG_error_t MSG_task_put_bounded(m_task_t task,
560                                  m_host_t dest, m_channel_t channel,
561                                  double max_rate)
562 {
563   MSG_error_t res = MSG_OK;
564   task->simdata->rate=max_rate;
565   res = MSG_task_put(task, dest, channel);
566   return(res);
567 }
568
569 /** \ingroup msg_gos_functions
570  * \brief Executes a task and waits for its termination.
571  *
572  * This function is used for describing the behavior of an agent. It
573  * takes only one parameter.
574  * \param task a #m_task_t to execute on the location on which the
575    agent is running.
576  * \return #MSG_FATAL if \a task is not properly initialized and
577  * #MSG_OK otherwise.
578  */
579 MSG_error_t MSG_task_execute(m_task_t task)
580 {
581   m_process_t process = MSG_process_self();
582   MSG_error_t res;
583
584   DEBUG1("Computing on %s", process->simdata->host->name);
585
586   __MSG_task_execute(process, task);
587
588   PAJE_PROCESS_PUSH_STATE(process,"E",task);  
589   res = __MSG_wait_for_computation(process,task);
590   PAJE_PROCESS_POP_STATE(process);
591   return res;
592 }
593
594 void __MSG_task_execute(m_process_t process, m_task_t task)
595 {
596   simdata_task_t simdata = NULL;
597
598   CHECK_HOST();
599
600   simdata = task->simdata;
601   xbt_assert0((!simdata->compute)&&(task->simdata->using==1),
602               "This taks is executed somewhere else. Go fix your code!");
603   task->simdata->using++;
604   simdata->compute = surf_workstation_resource->extension_public->
605     execute(MSG_process_get_host(process)->simdata->host,
606             simdata->computation_amount);
607   surf_workstation_resource->common_public->
608     set_priority(simdata->compute, simdata->priority);
609
610   surf_workstation_resource->common_public->action_set_data(simdata->compute,task);
611   task->simdata->using--;
612 }
613
614 MSG_error_t __MSG_wait_for_computation(m_process_t process, m_task_t task)
615 {
616   e_surf_action_state_t state = SURF_ACTION_NOT_IN_THE_SYSTEM;
617   simdata_task_t simdata = task->simdata;
618
619   XBT_IN4("(%p(%s) %p(%s))",process,process->name,task,task->name);
620   simdata->using++;
621   do {
622     __MSG_task_wait_event(process, task);
623     state=surf_workstation_resource->common_public->action_get_state(simdata->compute);
624   } while (state==SURF_ACTION_RUNNING);
625   simdata->using--;
626     
627
628   if(state == SURF_ACTION_DONE) {
629     if(surf_workstation_resource->common_public->action_free(simdata->compute)) 
630       simdata->compute = NULL;
631     simdata->computation_amount = 0.0;
632     XBT_OUT;
633     MSG_RETURN(MSG_OK);
634   } else if(surf_workstation_resource->extension_public->
635             get_state(MSG_process_get_host(process)->simdata->host) 
636             == SURF_CPU_OFF) {
637     if(surf_workstation_resource->common_public->action_free(simdata->compute)) 
638       simdata->compute = NULL;
639     XBT_OUT;
640     MSG_RETURN(MSG_HOST_FAILURE);
641   } else {
642     if(surf_workstation_resource->common_public->action_free(simdata->compute)) 
643       simdata->compute = NULL;
644     XBT_OUT;
645     MSG_RETURN(MSG_TASK_CANCELLED);
646   }
647 }
648 /** \ingroup m_task_management
649  * \brief Creates a new #m_task_t (a parallel one....).
650  *
651  * A constructor for #m_task_t taking six arguments and returning the 
652    corresponding object.
653  * \param name a name for the object. It is for user-level information
654    and can be NULL.
655  * \param host_nb the number of hosts implied in the parallel task.
656  * \param host_list an array of \p host_nb m_host_t.
657  * \param computation_amount an array of \p host_nb
658    doubles. computation_amount[i] is the total number of operations
659    that have to be performed on host_list[i].
660  * \param communication_amount an array of \p host_nb* \p host_nb doubles.
661  * \param data a pointer to any data may want to attach to the new
662    object.  It is for user-level information and can be NULL. It can
663    be retrieved with the function \ref MSG_task_get_data.
664  * \see m_task_t
665  * \return The new corresponding object.
666  */
667 m_task_t MSG_parallel_task_create(const char *name, 
668                                   int host_nb,
669                                   const m_host_t *host_list,
670                                   double *computation_amount,
671                                   double *communication_amount,
672                                   void *data)
673 {
674   simdata_task_t simdata = xbt_new0(s_simdata_task_t,1);
675   m_task_t task = xbt_new0(s_m_task_t,1);
676   int i;
677
678   /* Task structure */
679   task->name = xbt_strdup(name);
680   task->simdata = simdata;
681   task->data = data;
682
683   /* Simulator Data */
684   simdata->sleeping = xbt_dynar_new(sizeof(m_process_t),NULL);
685   simdata->rate = -1.0;
686   simdata->using = 1;
687   simdata->sender = NULL;
688   simdata->source = NULL;
689   simdata->host_nb = host_nb;
690   
691   simdata->host_list = xbt_new0(void *, host_nb);
692   simdata->comp_amount = computation_amount;
693   simdata->comm_amount = communication_amount;
694
695   for(i=0;i<host_nb;i++)
696     simdata->host_list[i] = host_list[i]->simdata->host;
697
698   return task;
699 }
700
701
702 static void __MSG_parallel_task_execute(m_process_t process, m_task_t task)
703 {
704   simdata_task_t simdata = NULL;
705
706   CHECK_HOST();
707
708   simdata = task->simdata;
709
710   xbt_assert0(simdata->host_nb,"This is not a parallel task. Go to hell.");
711
712   simdata->compute = surf_workstation_resource->extension_public->
713   execute_parallel_task(task->simdata->host_nb,
714                         task->simdata->host_list,
715                         task->simdata->comp_amount,
716                         task->simdata->comm_amount,
717                         1.0,
718                         -1.0);
719   if(simdata->compute)
720     surf_workstation_resource->common_public->action_set_data(simdata->compute,task);
721 }
722
723 MSG_error_t MSG_parallel_task_execute(m_task_t task)
724 {
725   m_process_t process = MSG_process_self();
726   MSG_error_t res;
727
728   DEBUG0("Computing on a tons of guys");
729   
730   __MSG_parallel_task_execute(process, task);
731
732   if(task->simdata->compute)
733     res = __MSG_wait_for_computation(process,task);
734   else 
735     res = MSG_OK;
736
737   return res;  
738 }
739
740
741 /** \ingroup msg_gos_functions
742  * \brief Sleep for the specified number of seconds
743  *
744  * Makes the current process sleep until \a time seconds have elapsed.
745  *
746  * \param nb_sec a number of second
747  */
748 MSG_error_t MSG_process_sleep(double nb_sec)
749 {
750   e_surf_action_state_t state = SURF_ACTION_NOT_IN_THE_SYSTEM;
751   m_process_t process = MSG_process_self();
752   m_task_t dummy = NULL;
753   simdata_task_t simdata = NULL;
754
755   CHECK_HOST();
756   dummy = MSG_task_create("MSG_sleep", nb_sec, 0.0, NULL);
757   simdata = dummy->simdata;
758
759   simdata->compute = surf_workstation_resource->extension_public->
760     sleep(MSG_process_get_host(process)->simdata->host,
761             simdata->computation_amount);
762   surf_workstation_resource->common_public->action_set_data(simdata->compute,dummy);
763
764   
765   simdata->using++;
766   do {
767     __MSG_task_wait_event(process, dummy);
768     state=surf_workstation_resource->common_public->action_get_state(simdata->compute);
769   } while (state==SURF_ACTION_RUNNING);
770   simdata->using--;
771     
772   if(state == SURF_ACTION_DONE) {
773     if(surf_workstation_resource->extension_public->
774        get_state(MSG_process_get_host(process)->simdata->host) 
775        == SURF_CPU_OFF) {
776       if(surf_workstation_resource->common_public->action_free(simdata->compute)) 
777         simdata->compute = NULL;
778       MSG_RETURN(MSG_HOST_FAILURE);
779     }
780     if(__MSG_process_isBlocked(process)) {
781       __MSG_process_unblock(MSG_process_self());
782     }
783     if(surf_workstation_resource->extension_public->
784        get_state(MSG_process_get_host(process)->simdata->host) 
785        == SURF_CPU_OFF) {
786       if(surf_workstation_resource->common_public->action_free(simdata->compute)) 
787         simdata->compute = NULL;
788       MSG_RETURN(MSG_HOST_FAILURE);
789     }
790     if(surf_workstation_resource->common_public->action_free(simdata->compute)) 
791       simdata->compute = NULL;
792     MSG_task_destroy(dummy);
793     MSG_RETURN(MSG_OK);
794   } else MSG_RETURN(MSG_HOST_FAILURE);
795 }
796
797 /** \ingroup msg_gos_functions
798  * \brief Return the number of MSG tasks currently running on
799  * the host of the current running process.
800  */
801 static int MSG_get_msgload(void) 
802 {
803   m_process_t process;
804    
805   CHECK_HOST();
806   
807   xbt_assert0(0, "This function is still to be specified correctly (what do you mean by 'load', exactly?). In the meantime, please don't use it");
808   process = MSG_process_self();
809   return xbt_fifo_size(process->simdata->host->simdata->process_list);
810 }
811
812 /** \ingroup msg_gos_functions
813  *
814  * \brief Return the last value returned by a MSG function (except
815  * MSG_get_errno...).
816  */
817 MSG_error_t MSG_get_errno(void)
818 {
819   return PROCESS_GET_ERRNO();
820 }