Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
d255d1d5ff48e0aff42fb1714c693317faee62b5
[simgrid.git] / src / msg / gos.c
1 /*      $Id$     */
2
3 /* Copyright (c) 2002,2003,2004 Arnaud Legrand. All rights reserved.        */
4
5 /* This program is free software; you can redistribute it and/or modify it
6  * under the terms of the license (GNU LGPL) which comes with this package. */
7
8 #include "private.h"
9 #include "xbt/sysdep.h"
10 #include "xbt/log.h"
11 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(msg_gos, msg,
12                                 "Logging specific to MSG (gos)");
13
14 /** \defgroup msg_gos_functions MSG Operating System Functions
15  *  \brief This section describes the functions that can be used
16  *  by an agent for handling some task.
17  */
18
19 static MSG_error_t __MSG_task_get_with_time_out_from_host(m_task_t * task,
20                                                         m_channel_t channel,
21                                                         double max_duration,
22                                                         m_host_t host)
23 {
24   m_process_t process = MSG_process_self();
25   m_task_t t = NULL;
26   m_host_t h = NULL;
27   simdata_task_t t_simdata = NULL;
28   simdata_host_t h_simdata = NULL;
29   int first_time = 1;
30   e_surf_action_state_t state = SURF_ACTION_NOT_IN_THE_SYSTEM;
31   xbt_fifo_item_t item = NULL;
32
33   CHECK_HOST();
34   xbt_assert1((channel>=0) && (channel < msg_global->max_channel),"Invalid channel %d",channel);
35   /* Sanity check */
36   xbt_assert0(task,"Null pointer for the task\n");
37
38   if (*task) 
39     CRITICAL0("MSG_task_get() was asked to write in a non empty task struct.");
40
41   /* Get the task */
42   h = MSG_host_self();
43   h_simdata = h->simdata;
44
45   DEBUG2("Waiting for a task on channel %d (%s)", channel,h->name);
46
47   while (1) {
48     if(xbt_fifo_size(h_simdata->mbox[channel])>0) {
49       if(!host) {
50         t = xbt_fifo_shift(h_simdata->mbox[channel]);
51         break;
52       } else {
53         xbt_fifo_foreach(h->simdata->mbox[channel],item,t,m_task_t) {
54           if(t->simdata->source==host) break;
55         }
56         if(item) {
57           xbt_fifo_remove_item(h->simdata->mbox[channel],item);
58           break;
59         } 
60       }
61     }
62                                                        
63     if(max_duration>0) {
64       if(!first_time) {
65         PAJE_PROCESS_POP_STATE(process);
66         PAJE_COMM_STOP(process,t,channel);
67         MSG_RETURN(MSG_TRANSFER_FAILURE);
68       }
69     }
70     xbt_assert3(!(h_simdata->sleeping[channel]),
71                 "A process (%s(%d)) is already blocked on channel %d",
72                 h_simdata->sleeping[channel]->name,
73                 h_simdata->sleeping[channel]->simdata->PID,
74                 channel);
75     h_simdata->sleeping[channel] = process; /* I'm waiting. Wake me up when you're ready */
76     if(max_duration>0) {
77       __MSG_process_block(max_duration,"");
78     } else {
79       __MSG_process_block(-1,"");
80     }
81     h_simdata->sleeping[channel] = NULL;
82     first_time = 0;
83     if(surf_workstation_resource->extension_public->get_state(h_simdata->host) 
84        == SURF_CPU_OFF)
85       MSG_RETURN(MSG_HOST_FAILURE);
86     /* OK, we should both be ready now. Are you there ? */
87   }
88
89   DEBUG1("OK, got a task (%s)", t->name);
90
91   t_simdata = t->simdata;
92   /*   *task = __MSG_task_copy(t); */
93   *task=t;
94
95   /* Transfer */
96   t_simdata->using++;
97
98   while(MSG_process_is_suspended(t_simdata->sender)) {
99     DEBUG1("Oooups, the sender (%s) has been suspended in the meantime. Let's wait for him", 
100            t_simdata->sender->name);
101     m_task_t task_to_wait_for = t_simdata->sender->simdata->waiting_task;
102     if(__MSG_process_isBlocked(t_simdata->sender)) {
103       DEBUG0("He's blocked. Let's wait for him to go in the suspended state");
104       __MSG_process_unblock(t_simdata->sender);
105       task_to_wait_for->simdata->using++;
106       __MSG_task_wait_event(process, task_to_wait_for);
107       MSG_task_destroy(task_to_wait_for);
108     } else {
109       DEBUG0("He's suspended. Let's wait for him to go in the resumed state");
110       task_to_wait_for->simdata->using++;
111       __MSG_task_wait_event(process, task_to_wait_for);
112       MSG_task_destroy(task_to_wait_for);
113       DEBUG0("He's resumed. He should block again. So let's free him.");
114       __MSG_process_unblock(t_simdata->sender);
115       break;
116     }
117   }
118   DEBUG0("Calling SURF for communication creation");
119   t_simdata->comm = surf_workstation_resource->extension_public->
120     communicate(MSG_process_get_host(t_simdata->sender)->simdata->host,
121                 h->simdata->host, t_simdata->message_size,t_simdata->rate);
122   
123   surf_workstation_resource->common_public->action_set_data(t_simdata->comm,t);
124
125   if(__MSG_process_isBlocked(t_simdata->sender)) {
126     DEBUG1("Unblocking %s",t_simdata->sender->name);
127     __MSG_process_unblock(t_simdata->sender);
128   }
129
130   PAJE_PROCESS_PUSH_STATE(process,"C",t);  
131
132   do {
133     DEBUG0("Waiting for action termination");
134     __MSG_task_wait_event(process, t);
135     state=surf_workstation_resource->common_public->action_get_state(t_simdata->comm);
136   } while (state==SURF_ACTION_RUNNING);
137   DEBUG0("Action terminated");
138
139   if(t->simdata->using>1) {
140     xbt_fifo_unshift(msg_global->process_to_run,process);
141     xbt_context_yield();
142   }
143
144   PAJE_PROCESS_POP_STATE(process);
145   PAJE_COMM_STOP(process,t,channel);
146
147   if(state == SURF_ACTION_DONE) {
148     if(surf_workstation_resource->common_public->action_free(t_simdata->comm)) 
149       t_simdata->comm = NULL;
150     MSG_RETURN(MSG_OK);
151   } else if(surf_workstation_resource->extension_public->get_state(h_simdata->host) 
152           == SURF_CPU_OFF) {
153     if(surf_workstation_resource->common_public->action_free(t_simdata->comm)) 
154       t_simdata->comm = NULL;
155     MSG_RETURN(MSG_HOST_FAILURE);
156   } else {
157     if(surf_workstation_resource->common_public->action_free(t_simdata->comm)) 
158       t_simdata->comm = NULL;
159     MSG_RETURN(MSG_TRANSFER_FAILURE);
160   }
161 }
162
163 /** \ingroup msg_gos_functions
164  * \brief Listen on a channel and wait for receiving a task.
165  *
166  * It takes two parameters.
167  * \param task a memory location for storing a #m_task_t. It will
168    hold a task when this function will return. Thus \a task should not
169    be equal to \c NULL and \a *task should be equal to \c NULL. If one of
170    those two condition does not hold, there will be a warning message.
171  * \param channel the channel on which the agent should be
172    listening. This value has to be >=0 and < than the maximal
173    number of channels fixed with MSG_set_channel_number().
174  * \return #MSG_FATAL if \a task is equal to \c NULL, #MSG_WARNING
175  * if \a *task is not equal to \c NULL, and #MSG_OK otherwise.
176  */
177 MSG_error_t MSG_task_get(m_task_t * task,
178                          m_channel_t channel)
179 {
180   return MSG_task_get_with_time_out(task, channel, -1);
181 }
182
183 /** \ingroup msg_gos_functions
184  * \brief Listen on a channel and wait for receiving a task with a timeout.
185  *
186  * It takes three parameters.
187  * \param task a memory location for storing a #m_task_t. It will
188    hold a task when this function will return. Thus \a task should not
189    be equal to \c NULL and \a *task should be equal to \c NULL. If one of
190    those two condition does not hold, there will be a warning message.
191  * \param channel the channel on which the agent should be
192    listening. This value has to be >=0 and < than the maximal
193    number of channels fixed with MSG_set_channel_number().
194  * \param max_duration the maximum time to wait for a task before giving
195     up. In such a case, #MSG_TRANSFER_FAILURE will be returned, \a task 
196     will not be modified and will still be
197     equal to \c NULL when returning. 
198  * \return #MSG_FATAL if \a task is equal to \c NULL, #MSG_WARNING
199    if \a *task is not equal to \c NULL, and #MSG_OK otherwise.
200  */
201 MSG_error_t MSG_task_get_with_time_out(m_task_t * task,
202                                        m_channel_t channel,
203                                        double max_duration)
204 {
205   return __MSG_task_get_with_time_out_from_host(task, channel, max_duration, NULL);
206 }
207
208 /** \ingroup msg_gos_functions
209  * \brief Listen on \a channel and waits for receiving a task from \a host.
210  *
211  * It takes three parameters.
212  * \param task a memory location for storing a #m_task_t. It will
213    hold a task when this function will return. Thus \a task should not
214    be equal to \c NULL and \a *task should be equal to \c NULL. If one of
215    those two condition does not hold, there will be a warning message.
216  * \param channel the channel on which the agent should be
217    listening. This value has to be >=0 and < than the maximal
218    number of channels fixed with MSG_set_channel_number().
219  * \param host the host that is to be watched.
220  * \return #MSG_FATAL if \a task is equal to \c NULL, #MSG_WARNING
221    if \a *task is not equal to \c NULL, and #MSG_OK otherwise.
222  */
223 MSG_error_t MSG_task_get_from_host(m_task_t * task, int channel, 
224                                    m_host_t host)
225 {
226   return __MSG_task_get_with_time_out_from_host(task, channel, -1, host);
227 }
228
229 /** \ingroup msg_gos_functions
230  * \brief Test whether there is a pending communication on a channel.
231  *
232  * It takes one parameter.
233  * \param channel the channel on which the agent should be
234    listening. This value has to be >=0 and < than the maximal
235    number of channels fixed with MSG_set_channel_number().
236  * \return 1 if there is a pending communication and 0 otherwise
237  */
238 int MSG_task_Iprobe(m_channel_t channel)
239 {
240   m_host_t h = NULL;
241   simdata_host_t h_simdata = NULL;
242
243   xbt_assert1((channel>=0) && (channel < msg_global->max_channel),"Invalid channel %d",channel);
244   DEBUG2("Probing on channel %d (%s)", channel,h->name);
245   CHECK_HOST();
246   h = MSG_host_self();
247   h_simdata = h->simdata;
248   return(xbt_fifo_get_first_item(h_simdata->mbox[channel])!=NULL);
249 }
250
251 /** \ingroup msg_gos_functions
252  * \brief Test whether there is a pending communication on a channel, and who sent it.
253  *
254  * It takes one parameter.
255  * \param channel the channel on which the agent should be
256    listening. This value has to be >=0 and < than the maximal
257    number of channels fixed with MSG_set_channel_number().
258  * \return -1 if there is no pending communication and the PID of the process who sent it otherwise
259  */
260 int MSG_task_probe_from(m_channel_t channel)
261 {
262   m_host_t h = NULL;
263   simdata_host_t h_simdata = NULL;
264   xbt_fifo_item_t item;
265   m_task_t t;
266
267   xbt_assert1((channel>=0) && (channel < msg_global->max_channel),"Invalid channel %d",channel);
268   CHECK_HOST();
269   h = MSG_host_self();
270   h_simdata = h->simdata;
271
272   DEBUG2("Probing on channel %d (%s)", channel,h->name);
273    
274   item = xbt_fifo_get_first_item(h->simdata->mbox[channel]);
275   if (!item || !(t = xbt_fifo_get_item_content(item)))
276     return -1;
277    
278   return MSG_process_get_PID(t->simdata->sender);
279 }
280
281 /** \ingroup msg_gos_functions
282  * \brief Wait for at most \a max_duration second for a task reception
283    on \a channel. *\a PID is updated with the PID of the first process
284    that triggered this event if any.
285  *
286  * It takes three parameters:
287  * \param channel the channel on which the agent should be
288    listening. This value has to be >=0 and < than the maximal.
289    number of channels fixed with MSG_set_channel_number().
290  * \param PID a memory location for storing an int.
291  * \param max_duration the maximum time to wait for a task before
292     giving up. In the case of a reception, *\a PID will be updated
293     with the PID of the first process to send a task.
294  * \return #MSG_HOST_FAILURE if the host is shut down in the meantime
295    and #MSG_OK otherwise.
296  */
297 MSG_error_t MSG_channel_select_from(m_channel_t channel, double max_duration,
298                                     int *PID)
299 {
300   m_host_t h = NULL;
301   simdata_host_t h_simdata = NULL;
302   xbt_fifo_item_t item;
303   m_task_t t;
304   int first_time = 1;
305   m_process_t process = MSG_process_self();
306
307   xbt_assert1((channel>=0) && (channel < msg_global->max_channel),"Invalid channel %d",channel);
308   if(PID) {
309     *PID = -1;
310   }
311
312   if(max_duration==0.0) {
313     *PID = MSG_task_probe_from(channel);
314     MSG_RETURN(MSG_OK);
315   } else {
316     CHECK_HOST();
317     h = MSG_host_self();
318     h_simdata = h->simdata;
319     
320     DEBUG2("Probing on channel %d (%s)", channel,h->name);
321     while(!(item = xbt_fifo_get_first_item(h->simdata->mbox[channel]))) {
322       if(max_duration>0) {
323         if(!first_time) {
324           MSG_RETURN(MSG_OK);
325         }
326       }
327       xbt_assert2(!(h_simdata->sleeping[channel]),
328                   "A process (%s(%d)) is already blocked on this channel",
329                   h_simdata->sleeping[channel]->name,
330                   h_simdata->sleeping[channel]->simdata->PID);
331       h_simdata->sleeping[channel] = process; /* I'm waiting. Wake me up when you're ready */
332       if(max_duration>0) {
333         __MSG_process_block(max_duration,"");
334       } else {
335         __MSG_process_block(-1,"");
336       }
337       if(surf_workstation_resource->extension_public->get_state(h_simdata->host) 
338          == SURF_CPU_OFF) {
339         MSG_RETURN(MSG_HOST_FAILURE);
340       }
341       h_simdata->sleeping[channel] = NULL;
342       first_time = 0;
343     }
344     if (!item || !(t = xbt_fifo_get_item_content(item))) {
345       MSG_RETURN(MSG_OK);
346     }
347     if(PID) {
348       *PID = MSG_process_get_PID(t->simdata->sender);
349     }
350     MSG_RETURN(MSG_OK);
351   }
352 }
353
354
355 /** \ingroup msg_gos_functions
356
357  * \brief Return the number of tasks waiting to be received on a \a
358    channel and sent by \a host.
359  *
360  * It takes two parameters.
361  * \param channel the channel on which the agent should be
362    listening. This value has to be >=0 and < than the maximal
363    number of channels fixed with MSG_set_channel_number().
364  * \param host the host that is to be watched.
365  * \return the number of tasks waiting to be received on \a channel
366    and sent by \a host.
367  */
368 int MSG_task_probe_from_host(int channel, m_host_t host)
369 {
370   simdata_host_t h_simdata = NULL;
371   xbt_fifo_item_t item;
372   m_task_t t;
373   int count = 0;
374   m_host_t h = NULL;
375   
376   xbt_assert1((channel>=0) && (channel < msg_global->max_channel),"Invalid channel %d",channel);
377   CHECK_HOST();
378   h = MSG_host_self();
379   h_simdata = h->simdata;
380
381   DEBUG2("Probing on channel %d (%s)", channel,h->name);
382    
383   xbt_fifo_foreach(h->simdata->mbox[channel],item,t,m_task_t) {
384     if(t->simdata->source==host) count++;
385   }
386    
387   return count;
388 }
389
390 /** \ingroup msg_gos_functions \brief Put a task on a channel of an
391  * host (with a timeout on the waiting of the destination host) and
392  * waits for the end of the transmission.
393  *
394  * This function is used for describing the behavior of an agent. It
395  * takes four parameter.
396  * \param task a #m_task_t to send on another location. This task
397    will not be usable anymore when the function will return. There is
398    no automatic task duplication and you have to save your parameters
399    before calling this function. Tasks are unique and once it has been
400    sent to another location, you should not access it anymore. You do
401    not need to call MSG_task_destroy() but to avoid using, as an
402    effect of inattention, this task anymore, you definitely should
403    renitialize it with #MSG_TASK_UNINITIALIZED. Note that this task
404    can be transfered iff it has been correctly created with
405    MSG_task_create().
406  * \param dest the destination of the message
407  * \param channel the channel on which the agent should put this
408    task. This value has to be >=0 and < than the maximal number of
409    channels fixed with MSG_set_channel_number().
410  * \param max_duration the maximum time to wait for a task before giving
411     up. In such a case, #MSG_TRANSFER_FAILURE will be returned, \a task 
412     will not be modified 
413  * \return #MSG_FATAL if \a task is not properly initialized and
414    #MSG_OK otherwise. Returns #MSG_HOST_FAILURE if the host on which
415    this function was called was shut down. Returns
416    #MSG_TRANSFER_FAILURE if the transfer could not be properly done
417    (network failure, dest failure, timeout...)
418  */
419 MSG_error_t MSG_task_put_with_timeout(m_task_t task, m_host_t dest, 
420                                       m_channel_t channel, double max_duration)
421 {
422   m_process_t process = MSG_process_self();
423   simdata_task_t task_simdata = NULL;
424   e_surf_action_state_t state = SURF_ACTION_NOT_IN_THE_SYSTEM;
425   m_host_t local_host = NULL;
426   m_host_t remote_host = NULL;
427   int first_time = 1;
428
429   CHECK_HOST();
430
431   xbt_assert1((channel>=0) && (channel < msg_global->max_channel),"Invalid channel %d",channel);
432
433   task_simdata = task->simdata;
434   task_simdata->sender = process;
435   task_simdata->source = MSG_process_get_host(process);
436   xbt_assert0(task_simdata->using==1,
437               "This taks is still being used somewhere else. You cannot send it now. Go fix your code!");
438   task_simdata->comm = NULL;
439   
440   local_host = ((simdata_process_t) process->simdata)->host;
441   remote_host = dest;
442
443   DEBUG4("Trying to send a task (%g kB) from %s to %s on channel %d", 
444          task->simdata->message_size/1000,local_host->name, remote_host->name, channel);
445
446   xbt_fifo_push(((simdata_host_t) remote_host->simdata)->
447                 mbox[channel], task);
448
449   PAJE_COMM_START(process,task,channel);
450     
451   if(remote_host->simdata->sleeping[channel]) {
452     DEBUG0("Somebody is listening. Let's wake him up!");
453     __MSG_process_unblock(remote_host->simdata->sleeping[channel]);
454   }
455
456   process->simdata->put_host = dest;
457   process->simdata->put_channel = channel;
458   while(!(task_simdata->comm)) {
459     if(max_duration>0) {
460       if(!first_time) {
461         MSG_RETURN(MSG_TRANSFER_FAILURE);
462       }
463     }
464     DEBUG0("Communication not initiated yet. Let's block!");
465     if(max_duration>0)
466       __MSG_process_block(max_duration,task->name);
467     else
468       __MSG_process_block(-1,task->name);
469
470     first_time = 0;
471
472     if(surf_workstation_resource->extension_public->
473        get_state(local_host->simdata->host) == SURF_CPU_OFF) {
474       xbt_fifo_remove(((simdata_host_t) remote_host->simdata)->mbox[channel],
475                       task);
476       MSG_task_destroy(task);
477       MSG_RETURN(MSG_HOST_FAILURE);
478     }
479   }
480   DEBUG0("Registering to this communication");
481   surf_workstation_resource->common_public->action_use(task_simdata->comm);
482   process->simdata->put_host = NULL;
483   process->simdata->put_channel = -1;
484
485
486   PAJE_PROCESS_PUSH_STATE(process,"C",task);  
487
488   state=surf_workstation_resource->common_public->action_get_state(task_simdata->comm);
489   while (state==SURF_ACTION_RUNNING) {
490     DEBUG0("Waiting for action termination");
491     __MSG_task_wait_event(process, task);
492     state=surf_workstation_resource->common_public->action_get_state(task_simdata->comm);
493   }
494   DEBUG0("Action terminated");
495   task->simdata->rate=-1.0; /* Sets the rate back to default */
496
497   PAJE_PROCESS_POP_STATE(process);  
498
499   if(state == SURF_ACTION_DONE) {
500     if(surf_workstation_resource->common_public->action_free(task_simdata->comm)) 
501       task_simdata->comm = NULL;
502     MSG_task_destroy(task);
503     MSG_RETURN(MSG_OK);
504   } else if(surf_workstation_resource->extension_public->get_state(local_host->simdata->host) 
505             == SURF_CPU_OFF) {
506     if(surf_workstation_resource->common_public->action_free(task_simdata->comm)) 
507       task_simdata->comm = NULL;
508     MSG_task_destroy(task);
509     MSG_RETURN(MSG_HOST_FAILURE);
510   } else { 
511     if(surf_workstation_resource->common_public->action_free(task_simdata->comm)) 
512       task_simdata->comm = NULL;
513     MSG_task_destroy(task);
514     MSG_RETURN(MSG_TRANSFER_FAILURE);
515   }
516 }
517 /** \ingroup msg_gos_functions
518  * \brief Put a task on a channel of an host and waits for the end of the
519  * transmission.
520  *
521  * This function is used for describing the behavior of an agent. It
522  * takes three parameter.
523  * \param task a #m_task_t to send on another location. This task
524    will not be usable anymore when the function will return. There is
525    no automatic task duplication and you have to save your parameters
526    before calling this function. Tasks are unique and once it has been
527    sent to another location, you should not access it anymore. You do
528    not need to call MSG_task_destroy() but to avoid using, as an
529    effect of inattention, this task anymore, you definitely should
530    renitialize it with #MSG_TASK_UNINITIALIZED. Note that this task
531    can be transfered iff it has been correctly created with
532    MSG_task_create().
533  * \param dest the destination of the message
534  * \param channel the channel on which the agent should put this
535    task. This value has to be >=0 and < than the maximal number of
536    channels fixed with MSG_set_channel_number().
537  * \return #MSG_FATAL if \a task is not properly initialized and
538  * #MSG_OK otherwise. Returns #MSG_HOST_FAILURE if the host on which
539  * this function was called was shut down. Returns
540  * #MSG_TRANSFER_FAILURE if the transfer could not be properly done
541  * (network failure, dest failure)
542  */
543 MSG_error_t MSG_task_put(m_task_t task,
544                          m_host_t dest, m_channel_t channel)
545 {
546   return MSG_task_put_with_timeout(task, dest, channel, -1.0);
547 }
548
549 /** \ingroup msg_gos_functions
550  * \brief Does exactly the same as MSG_task_put but with a bounded transmition 
551  * rate.
552  *
553  * \sa MSG_task_put
554  */
555 MSG_error_t MSG_task_put_bounded(m_task_t task,
556                                  m_host_t dest, m_channel_t channel,
557                                  double max_rate)
558 {
559   MSG_error_t res = MSG_OK;
560   task->simdata->rate=max_rate;
561   res = MSG_task_put(task, dest, channel);
562   return(res);
563 }
564
565 /** \ingroup msg_gos_functions
566  * \brief Executes a task and waits for its termination.
567  *
568  * This function is used for describing the behavior of an agent. It
569  * takes only one parameter.
570  * \param task a #m_task_t to execute on the location on which the
571    agent is running.
572  * \return #MSG_FATAL if \a task is not properly initialized and
573  * #MSG_OK otherwise.
574  */
575 MSG_error_t MSG_task_execute(m_task_t task)
576 {
577   m_process_t process = MSG_process_self();
578   MSG_error_t res;
579
580   DEBUG1("Computing on %s", process->simdata->host->name);
581
582   __MSG_task_execute(process, task);
583
584   PAJE_PROCESS_PUSH_STATE(process,"E",task);  
585   res = __MSG_wait_for_computation(process,task);
586   PAJE_PROCESS_POP_STATE(process);
587   return res;
588 }
589
590 void __MSG_task_execute(m_process_t process, m_task_t task)
591 {
592   simdata_task_t simdata = NULL;
593
594   CHECK_HOST();
595
596   simdata = task->simdata;
597   xbt_assert0((!simdata->compute)&&(task->simdata->using==1),
598               "This taks is executed somewhere else. Go fix your code!");
599   task->simdata->using++;
600   simdata->compute = surf_workstation_resource->extension_public->
601     execute(MSG_process_get_host(process)->simdata->host,
602             simdata->computation_amount);
603   surf_workstation_resource->common_public->
604     set_priority(simdata->compute, simdata->priority);
605
606   surf_workstation_resource->common_public->action_set_data(simdata->compute,task);
607   task->simdata->using--;
608 }
609
610 MSG_error_t __MSG_wait_for_computation(m_process_t process, m_task_t task)
611 {
612   e_surf_action_state_t state = SURF_ACTION_NOT_IN_THE_SYSTEM;
613   simdata_task_t simdata = task->simdata;
614
615   XBT_IN4("(%p(%s) %p(%s))",process,process->name,task,task->name);
616   simdata->using++;
617   do {
618     __MSG_task_wait_event(process, task);
619     state=surf_workstation_resource->common_public->action_get_state(simdata->compute);
620   } while (state==SURF_ACTION_RUNNING);
621   simdata->using--;
622     
623
624   if(state == SURF_ACTION_DONE) {
625     if(surf_workstation_resource->common_public->action_free(simdata->compute)) 
626       simdata->compute = NULL;
627     simdata->computation_amount = 0.0;
628     XBT_OUT;
629     MSG_RETURN(MSG_OK);
630   } else if(surf_workstation_resource->extension_public->
631             get_state(MSG_process_get_host(process)->simdata->host) 
632             == SURF_CPU_OFF) {
633     if(surf_workstation_resource->common_public->action_free(simdata->compute)) 
634       simdata->compute = NULL;
635     XBT_OUT;
636     MSG_RETURN(MSG_HOST_FAILURE);
637   } else {
638     if(surf_workstation_resource->common_public->action_free(simdata->compute)) 
639       simdata->compute = NULL;
640     XBT_OUT;
641     MSG_RETURN(MSG_TASK_CANCELLED);
642   }
643 }
644 /** \ingroup m_task_management
645  * \brief Creates a new #m_task_t (a parallel one....).
646  *
647  * A constructor for #m_task_t taking six arguments and returning the 
648    corresponding object.
649  * \param name a name for the object. It is for user-level information
650    and can be NULL.
651  * \param host_nb the number of hosts implied in the parallel task.
652  * \param host_list an array of \p host_nb m_host_t.
653  * \param computation_amount an array of \p host_nb
654    doubles. computation_amount[i] is the total number of operations
655    that have to be performed on host_list[i].
656  * \param communication_amount an array of \p host_nb* \p host_nb doubles.
657  * \param data a pointer to any data may want to attach to the new
658    object.  It is for user-level information and can be NULL. It can
659    be retrieved with the function \ref MSG_task_get_data.
660  * \see m_task_t
661  * \return The new corresponding object.
662  */
663 m_task_t MSG_parallel_task_create(const char *name, 
664                                   int host_nb,
665                                   const m_host_t *host_list,
666                                   double *computation_amount,
667                                   double *communication_amount,
668                                   void *data)
669 {
670   simdata_task_t simdata = xbt_new0(s_simdata_task_t,1);
671   m_task_t task = xbt_new0(s_m_task_t,1);
672   int i;
673
674   /* Task structure */
675   task->name = xbt_strdup(name);
676   task->simdata = simdata;
677   task->data = data;
678
679   /* Simulator Data */
680   simdata->sleeping = xbt_dynar_new(sizeof(m_process_t),NULL);
681   simdata->rate = -1.0;
682   simdata->using = 1;
683   simdata->sender = NULL;
684   simdata->source = NULL;
685   simdata->host_nb = host_nb;
686   
687   simdata->host_list = xbt_new0(void *, host_nb);
688   simdata->comp_amount = computation_amount;
689   simdata->comm_amount = communication_amount;
690
691   for(i=0;i<host_nb;i++)
692     simdata->host_list[i] = host_list[i]->simdata->host;
693
694   return task;
695 }
696
697
698 static void __MSG_parallel_task_execute(m_process_t process, m_task_t task)
699 {
700   simdata_task_t simdata = NULL;
701
702   CHECK_HOST();
703
704   simdata = task->simdata;
705
706   xbt_assert0(simdata->host_nb,"This is not a parallel task. Go to hell.");
707
708   simdata->compute = surf_workstation_resource->extension_public->
709   execute_parallel_task(task->simdata->host_nb,
710                         task->simdata->host_list,
711                         task->simdata->comp_amount,
712                         task->simdata->comm_amount,
713                         1.0,
714                         -1.0);
715   if(simdata->compute)
716     surf_workstation_resource->common_public->action_set_data(simdata->compute,task);
717 }
718
719 MSG_error_t MSG_parallel_task_execute(m_task_t task)
720 {
721   m_process_t process = MSG_process_self();
722   MSG_error_t res;
723
724   DEBUG0("Computing on a tons of guys");
725   
726   __MSG_parallel_task_execute(process, task);
727
728   if(task->simdata->compute)
729     res = __MSG_wait_for_computation(process,task);
730   else 
731     res = MSG_OK;
732
733   return res;  
734 }
735
736
737 /** \ingroup msg_gos_functions
738  * \brief Sleep for the specified number of seconds
739  *
740  * Makes the current process sleep until \a time seconds have elapsed.
741  *
742  * \param nb_sec a number of second
743  */
744 MSG_error_t MSG_process_sleep(double nb_sec)
745 {
746   e_surf_action_state_t state = SURF_ACTION_NOT_IN_THE_SYSTEM;
747   m_process_t process = MSG_process_self();
748   m_task_t dummy = NULL;
749   simdata_task_t simdata = NULL;
750
751   CHECK_HOST();
752   dummy = MSG_task_create("MSG_sleep", nb_sec, 0.0, NULL);
753   simdata = dummy->simdata;
754
755   simdata->compute = surf_workstation_resource->extension_public->
756     sleep(MSG_process_get_host(process)->simdata->host,
757             simdata->computation_amount);
758   surf_workstation_resource->common_public->action_set_data(simdata->compute,dummy);
759
760   
761   simdata->using++;
762   do {
763     __MSG_task_wait_event(process, dummy);
764     state=surf_workstation_resource->common_public->action_get_state(simdata->compute);
765   } while (state==SURF_ACTION_RUNNING);
766   simdata->using--;
767     
768   if(state == SURF_ACTION_DONE) {
769     if(surf_workstation_resource->extension_public->
770        get_state(MSG_process_get_host(process)->simdata->host) 
771        == SURF_CPU_OFF) {
772       if(surf_workstation_resource->common_public->action_free(simdata->compute)) 
773         simdata->compute = NULL;
774       MSG_RETURN(MSG_HOST_FAILURE);
775     }
776     if(__MSG_process_isBlocked(process)) {
777       __MSG_process_unblock(MSG_process_self());
778     }
779     if(surf_workstation_resource->extension_public->
780        get_state(MSG_process_get_host(process)->simdata->host) 
781        == SURF_CPU_OFF) {
782       if(surf_workstation_resource->common_public->action_free(simdata->compute)) 
783         simdata->compute = NULL;
784       MSG_RETURN(MSG_HOST_FAILURE);
785     }
786     if(surf_workstation_resource->common_public->action_free(simdata->compute)) 
787       simdata->compute = NULL;
788     MSG_task_destroy(dummy);
789     MSG_RETURN(MSG_OK);
790   } else MSG_RETURN(MSG_HOST_FAILURE);
791 }
792
793 /** \ingroup msg_gos_functions
794  * \brief Return the number of MSG tasks currently running on
795  * the host of the current running process.
796  */
797 static int MSG_get_msgload(void) 
798 {
799   m_process_t process;
800    
801   CHECK_HOST();
802   
803   xbt_assert0(0, "This function is still to be specified correctly (what do you mean by 'load', exactly?). In the meantime, please don't use it");
804   process = MSG_process_self();
805   return xbt_fifo_size(process->simdata->host->simdata->process_list);
806 }
807
808 /** \ingroup msg_gos_functions
809  *
810  * \brief Return the last value returned by a MSG function (except
811  * MSG_get_errno...).
812  */
813 MSG_error_t MSG_get_errno(void)
814 {
815   return PROCESS_GET_ERRNO();
816 }