Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
379ca36228474ca44fe74a59b2b8033cad2412e3
[simgrid.git] / src / msg / gos.c
1 /*      $Id$     */
2
3 /* Copyright (c) 2002,2003,2004 Arnaud Legrand. All rights reserved.        */
4
5 /* This program is free software; you can redistribute it and/or modify it
6  * under the terms of the license (GNU LGPL) which comes with this package. */
7
8 #include "private.h"
9 #include "xbt/sysdep.h"
10 #include "xbt/log.h"
11 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(msg_gos, msg,
12                                 "Logging specific to MSG (gos)");
13
14 /** \defgroup msg_gos_functions MSG Operating System Functions
15  *  \brief This section describes the functions that can be used
16  *  by an agent for handling some task.
17  */
18
19 static MSG_error_t __MSG_task_get_with_time_out_from_host(m_task_t * task,
20                                                         m_channel_t channel,
21                                                         double max_duration,
22                                                         m_host_t host)
23 {
24   m_process_t process = MSG_process_self();
25   m_task_t t = NULL;
26   m_host_t h = NULL;
27   simdata_task_t t_simdata = NULL;
28   simdata_host_t h_simdata = NULL;
29   int first_time = 1;
30   e_surf_action_state_t state = SURF_ACTION_NOT_IN_THE_SYSTEM;
31   xbt_fifo_item_t item = NULL;
32
33   CHECK_HOST();
34   xbt_assert1((channel>=0) && (channel < msg_global->max_channel),"Invalid channel %d",channel);
35   /* Sanity check */
36   xbt_assert0(task,"Null pointer for the task\n");
37
38   if (*task) 
39     CRITICAL0("MSG_task_get() was asked to write in a non empty task struct.");
40
41   /* Get the task */
42   h = MSG_host_self();
43   h_simdata = h->simdata;
44
45   DEBUG2("Waiting for a task on channel %d (%s)", channel,h->name);
46
47   while (1) {
48     if(xbt_fifo_size(h_simdata->mbox[channel])>0) {
49       if(!host) {
50         t = xbt_fifo_shift(h_simdata->mbox[channel]);
51         break;
52       } else {
53         xbt_fifo_foreach(h->simdata->mbox[channel],item,t,m_task_t) {
54           if(t->simdata->source==host) break;
55         }
56         if(item) {
57           xbt_fifo_remove_item(h->simdata->mbox[channel],item);
58           break;
59         } 
60       }
61     }
62                                                        
63     if(max_duration>0) {
64       if(!first_time) {
65         MSG_RETURN(MSG_TRANSFER_FAILURE);
66       }
67     }
68     xbt_assert3(!(h_simdata->sleeping[channel]),
69                 "A process (%s(%d)) is already blocked on channel %d",
70                 h_simdata->sleeping[channel]->name,
71                 h_simdata->sleeping[channel]->simdata->PID,
72                 channel);
73     h_simdata->sleeping[channel] = process; /* I'm waiting. Wake me up when you're ready */
74     if(max_duration>0) {
75       __MSG_process_block(max_duration,"");
76     } else {
77       __MSG_process_block(-1,"");
78     }
79     h_simdata->sleeping[channel] = NULL;
80     first_time = 0;
81     if(surf_workstation_resource->extension_public->get_state(h_simdata->host) 
82        == SURF_CPU_OFF)
83       MSG_RETURN(MSG_HOST_FAILURE);
84     /* OK, we should both be ready now. Are you there ? */
85   }
86
87   DEBUG1("OK, got a task (%s)", t->name);
88
89   t_simdata = t->simdata;
90   /*   *task = __MSG_task_copy(t); */
91   *task=t;
92
93   /* Transfer */
94   t_simdata->using++;
95
96   while(MSG_process_is_suspended(t_simdata->sender)) {
97     DEBUG1("Oooups, the sender (%s) has been suspended in the meantime. Let's wait for him", 
98            t_simdata->sender->name);
99     m_task_t task_to_wait_for = t_simdata->sender->simdata->waiting_task;
100     if(__MSG_process_isBlocked(t_simdata->sender)) {
101       DEBUG0("He's blocked. Let's wait for him to go in the suspended state");
102       __MSG_process_unblock(t_simdata->sender);
103       task_to_wait_for->simdata->using++;
104       __MSG_task_wait_event(process, task_to_wait_for);
105       MSG_task_destroy(task_to_wait_for);
106     } else {
107       DEBUG0("He's suspended. Let's wait for him to go in the resumed state");
108       task_to_wait_for->simdata->using++;
109       __MSG_task_wait_event(process, task_to_wait_for);
110       MSG_task_destroy(task_to_wait_for);
111       DEBUG0("He's resumed. He should block again. So let's free him.");
112       __MSG_process_unblock(t_simdata->sender);
113       break;
114     }
115   }
116   DEBUG0("Calling SURF for communication creation");
117   t_simdata->comm = surf_workstation_resource->extension_public->
118     communicate(MSG_process_get_host(t_simdata->sender)->simdata->host,
119                 h->simdata->host, t_simdata->message_size,t_simdata->rate);
120   
121   surf_workstation_resource->common_public->action_set_data(t_simdata->comm,t);
122
123   if(__MSG_process_isBlocked(t_simdata->sender)) {
124     DEBUG1("Unblocking %s",t_simdata->sender->name);
125     __MSG_process_unblock(t_simdata->sender);
126   }
127
128   PAJE_PROCESS_PUSH_STATE(process,"C",t);  
129
130   do {
131     DEBUG0("Waiting for action termination");
132     __MSG_task_wait_event(process, t);
133     state=surf_workstation_resource->common_public->action_get_state(t_simdata->comm);
134   } while (state==SURF_ACTION_RUNNING);
135   DEBUG0("Action terminated");
136
137   if(t->simdata->using>1) {
138     xbt_fifo_unshift(msg_global->process_to_run,process);
139     xbt_context_yield();
140   }
141
142   PAJE_PROCESS_POP_STATE(process);
143   PAJE_COMM_STOP(process,t,channel);
144
145   if(state == SURF_ACTION_DONE) {
146     if(surf_workstation_resource->common_public->action_free(t_simdata->comm)) 
147       t_simdata->comm = NULL;
148     MSG_RETURN(MSG_OK);
149   } else if(surf_workstation_resource->extension_public->get_state(h_simdata->host) 
150           == SURF_CPU_OFF) {
151     if(surf_workstation_resource->common_public->action_free(t_simdata->comm)) 
152       t_simdata->comm = NULL;
153     MSG_RETURN(MSG_HOST_FAILURE);
154   } else {
155     if(surf_workstation_resource->common_public->action_free(t_simdata->comm)) 
156       t_simdata->comm = NULL;
157     MSG_RETURN(MSG_TRANSFER_FAILURE);
158   }
159 }
160
161 /** \ingroup msg_gos_functions
162  * \brief Listen on a channel and wait for receiving a task.
163  *
164  * It takes two parameters.
165  * \param task a memory location for storing a #m_task_t. It will
166    hold a task when this function will return. Thus \a task should not
167    be equal to \c NULL and \a *task should be equal to \c NULL. If one of
168    those two condition does not hold, there will be a warning message.
169  * \param channel the channel on which the agent should be
170    listening. This value has to be >=0 and < than the maximal
171    number of channels fixed with MSG_set_channel_number().
172  * \return #MSG_FATAL if \a task is equal to \c NULL, #MSG_WARNING
173  * if \a *task is not equal to \c NULL, and #MSG_OK otherwise.
174  */
175 MSG_error_t MSG_task_get(m_task_t * task,
176                          m_channel_t channel)
177 {
178   return MSG_task_get_with_time_out(task, channel, -1);
179 }
180
181 /** \ingroup msg_gos_functions
182  * \brief Listen on a channel and wait for receiving a task with a timeout.
183  *
184  * It takes three parameters.
185  * \param task a memory location for storing a #m_task_t. It will
186    hold a task when this function will return. Thus \a task should not
187    be equal to \c NULL and \a *task should be equal to \c NULL. If one of
188    those two condition does not hold, there will be a warning message.
189  * \param channel the channel on which the agent should be
190    listening. This value has to be >=0 and < than the maximal
191    number of channels fixed with MSG_set_channel_number().
192  * \param max_duration the maximum time to wait for a task before giving
193     up. In such a case, #MSG_TRANSFER_FAILURE will be returned, \a task 
194     will not be modified and will still be
195     equal to \c NULL when returning. 
196  * \return #MSG_FATAL if \a task is equal to \c NULL, #MSG_WARNING
197    if \a *task is not equal to \c NULL, and #MSG_OK otherwise.
198  */
199 MSG_error_t MSG_task_get_with_time_out(m_task_t * task,
200                                        m_channel_t channel,
201                                        double max_duration)
202 {
203   return __MSG_task_get_with_time_out_from_host(task, channel, max_duration, NULL);
204 }
205
206 /** \ingroup msg_gos_functions
207  * \brief Listen on \a channel and waits for receiving a task from \a host.
208  *
209  * It takes three parameters.
210  * \param task a memory location for storing a #m_task_t. It will
211    hold a task when this function will return. Thus \a task should not
212    be equal to \c NULL and \a *task should be equal to \c NULL. If one of
213    those two condition does not hold, there will be a warning message.
214  * \param channel the channel on which the agent should be
215    listening. This value has to be >=0 and < than the maximal
216    number of channels fixed with MSG_set_channel_number().
217  * \param host the host that is to be watched.
218  * \return #MSG_FATAL if \a task is equal to \c NULL, #MSG_WARNING
219    if \a *task is not equal to \c NULL, and #MSG_OK otherwise.
220  */
221 MSG_error_t MSG_task_get_from_host(m_task_t * task, int channel, 
222                                    m_host_t host)
223 {
224   return __MSG_task_get_with_time_out_from_host(task, channel, -1, host);
225 }
226
227 /** \ingroup msg_gos_functions
228  * \brief Test whether there is a pending communication on a channel.
229  *
230  * It takes one parameter.
231  * \param channel the channel on which the agent should be
232    listening. This value has to be >=0 and < than the maximal
233    number of channels fixed with MSG_set_channel_number().
234  * \return 1 if there is a pending communication and 0 otherwise
235  */
236 int MSG_task_Iprobe(m_channel_t channel)
237 {
238   m_host_t h = NULL;
239   simdata_host_t h_simdata = NULL;
240
241   xbt_assert1((channel>=0) && (channel < msg_global->max_channel),"Invalid channel %d",channel);
242   DEBUG2("Probing on channel %d (%s)", channel,h->name);
243   CHECK_HOST();
244   h = MSG_host_self();
245   h_simdata = h->simdata;
246   return(xbt_fifo_get_first_item(h_simdata->mbox[channel])!=NULL);
247 }
248
249 /** \ingroup msg_gos_functions
250  * \brief Test whether there is a pending communication on a channel, and who sent it.
251  *
252  * It takes one parameter.
253  * \param channel the channel on which the agent should be
254    listening. This value has to be >=0 and < than the maximal
255    number of channels fixed with MSG_set_channel_number().
256  * \return -1 if there is no pending communication and the PID of the process who sent it otherwise
257  */
258 int MSG_task_probe_from(m_channel_t channel)
259 {
260   m_host_t h = NULL;
261   simdata_host_t h_simdata = NULL;
262   xbt_fifo_item_t item;
263   m_task_t t;
264
265   xbt_assert1((channel>=0) && (channel < msg_global->max_channel),"Invalid channel %d",channel);
266   CHECK_HOST();
267   h = MSG_host_self();
268   h_simdata = h->simdata;
269
270   DEBUG2("Probing on channel %d (%s)", channel,h->name);
271    
272   item = xbt_fifo_get_first_item(h->simdata->mbox[channel]);
273   if (!item || !(t = xbt_fifo_get_item_content(item)))
274     return -1;
275    
276   return MSG_process_get_PID(t->simdata->sender);
277 }
278
279 /** \ingroup msg_gos_functions
280  * \brief Wait for at most \a max_duration second for a task reception
281    on \a channel. *\a PID is updated with the PID of the first process
282    that triggered this event if any.
283  *
284  * It takes three parameters:
285  * \param channel the channel on which the agent should be
286    listening. This value has to be >=0 and < than the maximal.
287    number of channels fixed with MSG_set_channel_number().
288  * \param PID a memory location for storing an int.
289  * \param max_duration the maximum time to wait for a task before
290     giving up. In the case of a reception, *\a PID will be updated
291     with the PID of the first process to send a task.
292  * \return #MSG_HOST_FAILURE if the host is shut down in the meantime
293    and #MSG_OK otherwise.
294  */
295 MSG_error_t MSG_channel_select_from(m_channel_t channel, double max_duration,
296                                     int *PID)
297 {
298   m_host_t h = NULL;
299   simdata_host_t h_simdata = NULL;
300   xbt_fifo_item_t item;
301   m_task_t t;
302   int first_time = 1;
303   m_process_t process = MSG_process_self();
304
305   xbt_assert1((channel>=0) && (channel < msg_global->max_channel),"Invalid channel %d",channel);
306   if(PID) {
307     *PID = -1;
308   }
309
310   if(max_duration==0.0) {
311     *PID = MSG_task_probe_from(channel);
312     MSG_RETURN(MSG_OK);
313   } else {
314     CHECK_HOST();
315     h = MSG_host_self();
316     h_simdata = h->simdata;
317     
318     DEBUG2("Probing on channel %d (%s)", channel,h->name);
319     while(!(item = xbt_fifo_get_first_item(h->simdata->mbox[channel]))) {
320       if(max_duration>0) {
321         if(!first_time) {
322           MSG_RETURN(MSG_OK);
323         }
324       }
325       xbt_assert2(!(h_simdata->sleeping[channel]),
326                   "A process (%s(%d)) is already blocked on this channel",
327                   h_simdata->sleeping[channel]->name,
328                   h_simdata->sleeping[channel]->simdata->PID);
329       h_simdata->sleeping[channel] = process; /* I'm waiting. Wake me up when you're ready */
330       if(max_duration>0) {
331         __MSG_process_block(max_duration,"");
332       } else {
333         __MSG_process_block(-1,"");
334       }
335       if(surf_workstation_resource->extension_public->get_state(h_simdata->host) 
336          == SURF_CPU_OFF) {
337         MSG_RETURN(MSG_HOST_FAILURE);
338       }
339       h_simdata->sleeping[channel] = NULL;
340       first_time = 0;
341     }
342     if (!item || !(t = xbt_fifo_get_item_content(item))) {
343       MSG_RETURN(MSG_OK);
344     }
345     if(PID) {
346       *PID = MSG_process_get_PID(t->simdata->sender);
347     }
348     MSG_RETURN(MSG_OK);
349   }
350 }
351
352
353 /** \ingroup msg_gos_functions
354
355  * \brief Return the number of tasks waiting to be received on a \a
356    channel and sent by \a host.
357  *
358  * It takes two parameters.
359  * \param channel the channel on which the agent should be
360    listening. This value has to be >=0 and < than the maximal
361    number of channels fixed with MSG_set_channel_number().
362  * \param host the host that is to be watched.
363  * \return the number of tasks waiting to be received on \a channel
364    and sent by \a host.
365  */
366 int MSG_task_probe_from_host(int channel, m_host_t host)
367 {
368   simdata_host_t h_simdata = NULL;
369   xbt_fifo_item_t item;
370   m_task_t t;
371   int count = 0;
372   m_host_t h = NULL;
373   
374   xbt_assert1((channel>=0) && (channel < msg_global->max_channel),"Invalid channel %d",channel);
375   CHECK_HOST();
376   h = MSG_host_self();
377   h_simdata = h->simdata;
378
379   DEBUG2("Probing on channel %d (%s)", channel,h->name);
380    
381   xbt_fifo_foreach(h->simdata->mbox[channel],item,t,m_task_t) {
382     if(t->simdata->source==host) count++;
383   }
384    
385   return count;
386 }
387
388 /** \ingroup msg_gos_functions \brief Put a task on a channel of an
389  * host (with a timeout on the waiting of the destination host) and
390  * waits for the end of the transmission.
391  *
392  * This function is used for describing the behavior of an agent. It
393  * takes four parameter.
394  * \param task a #m_task_t to send on another location. This task
395    will not be usable anymore when the function will return. There is
396    no automatic task duplication and you have to save your parameters
397    before calling this function. Tasks are unique and once it has been
398    sent to another location, you should not access it anymore. You do
399    not need to call MSG_task_destroy() but to avoid using, as an
400    effect of inattention, this task anymore, you definitely should
401    renitialize it with #MSG_TASK_UNINITIALIZED. Note that this task
402    can be transfered iff it has been correctly created with
403    MSG_task_create().
404  * \param dest the destination of the message
405  * \param channel the channel on which the agent should put this
406    task. This value has to be >=0 and < than the maximal number of
407    channels fixed with MSG_set_channel_number().
408  * \param max_duration the maximum time to wait for a task before giving
409     up. In such a case, #MSG_TRANSFER_FAILURE will be returned, \a task 
410     will not be modified 
411  * \return #MSG_FATAL if \a task is not properly initialized and
412    #MSG_OK otherwise. Returns #MSG_HOST_FAILURE if the host on which
413    this function was called was shut down. Returns
414    #MSG_TRANSFER_FAILURE if the transfer could not be properly done
415    (network failure, dest failure, timeout...)
416  */
417 MSG_error_t MSG_task_put_with_timeout(m_task_t task, m_host_t dest, 
418                                       m_channel_t channel, double max_duration)
419 {
420   m_process_t process = MSG_process_self();
421   simdata_task_t task_simdata = NULL;
422   e_surf_action_state_t state = SURF_ACTION_NOT_IN_THE_SYSTEM;
423   m_host_t local_host = NULL;
424   m_host_t remote_host = NULL;
425   int first_time = 1;
426
427   CHECK_HOST();
428
429   xbt_assert1((channel>=0) && (channel < msg_global->max_channel),"Invalid channel %d",channel);
430
431   task_simdata = task->simdata;
432   task_simdata->sender = process;
433   task_simdata->source = MSG_process_get_host(process);
434   xbt_assert0(task_simdata->using==1,
435               "This taks is still being used somewhere else. You cannot send it now. Go fix your code!");
436   task_simdata->comm = NULL;
437   
438   local_host = ((simdata_process_t) process->simdata)->host;
439   remote_host = dest;
440
441   DEBUG4("Trying to send a task (%g kB) from %s to %s on channel %d", 
442          task->simdata->message_size/1000,local_host->name, remote_host->name, channel);
443
444   xbt_fifo_push(((simdata_host_t) remote_host->simdata)->
445                 mbox[channel], task);
446
447   PAJE_COMM_START(process,task,channel);
448     
449   if(remote_host->simdata->sleeping[channel]) {
450     DEBUG0("Somebody is listening. Let's wake him up!");
451     __MSG_process_unblock(remote_host->simdata->sleeping[channel]);
452   }
453
454   process->simdata->put_host = dest;
455   process->simdata->put_channel = channel;
456   while(!(task_simdata->comm)) {
457     if(max_duration>0) {
458       if(!first_time) {
459         MSG_RETURN(MSG_TRANSFER_FAILURE);
460       }
461     }
462     DEBUG0("Communication not initiated yet. Let's block!");
463     if(max_duration>0)
464       __MSG_process_block(max_duration,task->name);
465     else
466       __MSG_process_block(-1,task->name);
467
468     first_time = 0;
469
470     if(surf_workstation_resource->extension_public->
471        get_state(local_host->simdata->host) == SURF_CPU_OFF) {
472       xbt_fifo_remove(((simdata_host_t) remote_host->simdata)->mbox[channel],
473                       task);
474       MSG_task_destroy(task);
475       MSG_RETURN(MSG_HOST_FAILURE);
476     }
477   }
478   DEBUG0("Registering to this communication");
479   surf_workstation_resource->common_public->action_use(task_simdata->comm);
480   process->simdata->put_host = NULL;
481   process->simdata->put_channel = -1;
482
483
484   PAJE_PROCESS_PUSH_STATE(process,"C",task);  
485
486   state=surf_workstation_resource->common_public->action_get_state(task_simdata->comm);
487   while (state==SURF_ACTION_RUNNING) {
488     DEBUG0("Waiting for action termination");
489     __MSG_task_wait_event(process, task);
490     state=surf_workstation_resource->common_public->action_get_state(task_simdata->comm);
491   }
492   DEBUG0("Action terminated");
493   task->simdata->rate=-1.0; /* Sets the rate back to default */
494
495   PAJE_PROCESS_POP_STATE(process);  
496
497   if(state == SURF_ACTION_DONE) {
498     if(surf_workstation_resource->common_public->action_free(task_simdata->comm)) 
499       task_simdata->comm = NULL;
500     MSG_task_destroy(task);
501     MSG_RETURN(MSG_OK);
502   } else if(surf_workstation_resource->extension_public->get_state(local_host->simdata->host) 
503             == SURF_CPU_OFF) {
504     if(surf_workstation_resource->common_public->action_free(task_simdata->comm)) 
505       task_simdata->comm = NULL;
506     MSG_task_destroy(task);
507     MSG_RETURN(MSG_HOST_FAILURE);
508   } else { 
509     if(surf_workstation_resource->common_public->action_free(task_simdata->comm)) 
510       task_simdata->comm = NULL;
511     MSG_task_destroy(task);
512     MSG_RETURN(MSG_TRANSFER_FAILURE);
513   }
514 }
515 /** \ingroup msg_gos_functions
516  * \brief Put a task on a channel of an host and waits for the end of the
517  * transmission.
518  *
519  * This function is used for describing the behavior of an agent. It
520  * takes three parameter.
521  * \param task a #m_task_t to send on another location. This task
522    will not be usable anymore when the function will return. There is
523    no automatic task duplication and you have to save your parameters
524    before calling this function. Tasks are unique and once it has been
525    sent to another location, you should not access it anymore. You do
526    not need to call MSG_task_destroy() but to avoid using, as an
527    effect of inattention, this task anymore, you definitely should
528    renitialize it with #MSG_TASK_UNINITIALIZED. Note that this task
529    can be transfered iff it has been correctly created with
530    MSG_task_create().
531  * \param dest the destination of the message
532  * \param channel the channel on which the agent should put this
533    task. This value has to be >=0 and < than the maximal number of
534    channels fixed with MSG_set_channel_number().
535  * \return #MSG_FATAL if \a task is not properly initialized and
536  * #MSG_OK otherwise. Returns #MSG_HOST_FAILURE if the host on which
537  * this function was called was shut down. Returns
538  * #MSG_TRANSFER_FAILURE if the transfer could not be properly done
539  * (network failure, dest failure)
540  */
541 MSG_error_t MSG_task_put(m_task_t task,
542                          m_host_t dest, m_channel_t channel)
543 {
544   return MSG_task_put_with_timeout(task, dest, channel, -1.0);
545 }
546
547 /** \ingroup msg_gos_functions
548  * \brief Does exactly the same as MSG_task_put but with a bounded transmition 
549  * rate.
550  *
551  * \sa MSG_task_put
552  */
553 MSG_error_t MSG_task_put_bounded(m_task_t task,
554                                  m_host_t dest, m_channel_t channel,
555                                  double max_rate)
556 {
557   MSG_error_t res = MSG_OK;
558   task->simdata->rate=max_rate;
559   res = MSG_task_put(task, dest, channel);
560   return(res);
561 }
562
563 /** \ingroup msg_gos_functions
564  * \brief Executes a task and waits for its termination.
565  *
566  * This function is used for describing the behavior of an agent. It
567  * takes only one parameter.
568  * \param task a #m_task_t to execute on the location on which the
569    agent is running.
570  * \return #MSG_FATAL if \a task is not properly initialized and
571  * #MSG_OK otherwise.
572  */
573 MSG_error_t MSG_task_execute(m_task_t task)
574 {
575   m_process_t process = MSG_process_self();
576   MSG_error_t res;
577
578   DEBUG1("Computing on %s", process->simdata->host->name);
579
580   __MSG_task_execute(process, task);
581
582   PAJE_PROCESS_PUSH_STATE(process,"E",task);  
583   res = __MSG_wait_for_computation(process,task);
584   PAJE_PROCESS_POP_STATE(process);
585   return res;
586 }
587
588 void __MSG_task_execute(m_process_t process, m_task_t task)
589 {
590   simdata_task_t simdata = NULL;
591
592   CHECK_HOST();
593
594   simdata = task->simdata;
595   xbt_assert0((!simdata->compute)&&(task->simdata->using==1),
596               "This taks is executed somewhere else. Go fix your code!");
597   task->simdata->using++;
598   simdata->compute = surf_workstation_resource->extension_public->
599     execute(MSG_process_get_host(process)->simdata->host,
600             simdata->computation_amount);
601   surf_workstation_resource->common_public->
602     set_priority(simdata->compute, simdata->priority);
603
604   surf_workstation_resource->common_public->action_set_data(simdata->compute,task);
605   task->simdata->using--;
606 }
607
608 MSG_error_t __MSG_wait_for_computation(m_process_t process, m_task_t task)
609 {
610   e_surf_action_state_t state = SURF_ACTION_NOT_IN_THE_SYSTEM;
611   simdata_task_t simdata = task->simdata;
612
613   XBT_IN4("(%p(%s) %p(%s))",process,process->name,task,task->name);
614   simdata->using++;
615   do {
616     __MSG_task_wait_event(process, task);
617     state=surf_workstation_resource->common_public->action_get_state(simdata->compute);
618   } while (state==SURF_ACTION_RUNNING);
619   simdata->using--;
620     
621
622   if(state == SURF_ACTION_DONE) {
623     if(surf_workstation_resource->common_public->action_free(simdata->compute)) 
624       simdata->compute = NULL;
625     simdata->computation_amount = 0.0;
626     XBT_OUT;
627     MSG_RETURN(MSG_OK);
628   } else if(surf_workstation_resource->extension_public->
629             get_state(MSG_process_get_host(process)->simdata->host) 
630             == SURF_CPU_OFF) {
631     if(surf_workstation_resource->common_public->action_free(simdata->compute)) 
632       simdata->compute = NULL;
633     XBT_OUT;
634     MSG_RETURN(MSG_HOST_FAILURE);
635   } else {
636     if(surf_workstation_resource->common_public->action_free(simdata->compute)) 
637       simdata->compute = NULL;
638     XBT_OUT;
639     MSG_RETURN(MSG_TASK_CANCELLED);
640   }
641 }
642 /** \ingroup m_task_management
643  * \brief Creates a new #m_task_t (a parallel one....).
644  *
645  * A constructor for #m_task_t taking six arguments and returning the 
646    corresponding object.
647  * \param name a name for the object. It is for user-level information
648    and can be NULL.
649  * \param host_nb the number of hosts implied in the parallel task.
650  * \param host_list an array of \p host_nb m_host_t.
651  * \param computation_amount an array of \p host_nb
652    doubles. computation_amount[i] is the total number of operations
653    that have to be performed on host_list[i].
654  * \param communication_amount an array of \p host_nb* \p host_nb doubles.
655  * \param data a pointer to any data may want to attach to the new
656    object.  It is for user-level information and can be NULL. It can
657    be retrieved with the function \ref MSG_task_get_data.
658  * \see m_task_t
659  * \return The new corresponding object.
660  */
661 m_task_t MSG_parallel_task_create(const char *name, 
662                                   int host_nb,
663                                   const m_host_t *host_list,
664                                   double *computation_amount,
665                                   double *communication_amount,
666                                   void *data)
667 {
668   simdata_task_t simdata = xbt_new0(s_simdata_task_t,1);
669   m_task_t task = xbt_new0(s_m_task_t,1);
670   int i;
671
672   /* Task structure */
673   task->name = xbt_strdup(name);
674   task->simdata = simdata;
675   task->data = data;
676
677   /* Simulator Data */
678   simdata->sleeping = xbt_dynar_new(sizeof(m_process_t),NULL);
679   simdata->rate = -1.0;
680   simdata->using = 1;
681   simdata->sender = NULL;
682   simdata->source = NULL;
683   simdata->host_nb = host_nb;
684   
685   simdata->host_list = xbt_new0(void *, host_nb);
686   simdata->comp_amount = computation_amount;
687   simdata->comm_amount = communication_amount;
688
689   for(i=0;i<host_nb;i++)
690     simdata->host_list[i] = host_list[i]->simdata->host;
691
692   return task;
693 }
694
695
696 static void __MSG_parallel_task_execute(m_process_t process, m_task_t task)
697 {
698   simdata_task_t simdata = NULL;
699
700   CHECK_HOST();
701
702   simdata = task->simdata;
703
704   xbt_assert0(simdata->host_nb,"This is not a parallel task. Go to hell.");
705
706   simdata->compute = surf_workstation_resource->extension_public->
707   execute_parallel_task(task->simdata->host_nb,
708                         task->simdata->host_list,
709                         task->simdata->comp_amount,
710                         task->simdata->comm_amount,
711                         1.0,
712                         -1.0);
713   if(simdata->compute)
714     surf_workstation_resource->common_public->action_set_data(simdata->compute,task);
715 }
716
717 MSG_error_t MSG_parallel_task_execute(m_task_t task)
718 {
719   m_process_t process = MSG_process_self();
720   MSG_error_t res;
721
722   DEBUG0("Computing on a tons of guys");
723   
724   __MSG_parallel_task_execute(process, task);
725
726   if(task->simdata->compute)
727     res = __MSG_wait_for_computation(process,task);
728   else 
729     res = MSG_OK;
730
731   return res;  
732 }
733
734
735 /** \ingroup msg_gos_functions
736  * \brief Sleep for the specified number of seconds
737  *
738  * Makes the current process sleep until \a time seconds have elapsed.
739  *
740  * \param nb_sec a number of second
741  */
742 MSG_error_t MSG_process_sleep(double nb_sec)
743 {
744   e_surf_action_state_t state = SURF_ACTION_NOT_IN_THE_SYSTEM;
745   m_process_t process = MSG_process_self();
746   m_task_t dummy = NULL;
747   simdata_task_t simdata = NULL;
748
749   CHECK_HOST();
750   dummy = MSG_task_create("MSG_sleep", nb_sec, 0.0, NULL);
751   simdata = dummy->simdata;
752
753   simdata->compute = surf_workstation_resource->extension_public->
754     sleep(MSG_process_get_host(process)->simdata->host,
755             simdata->computation_amount);
756   surf_workstation_resource->common_public->action_set_data(simdata->compute,dummy);
757
758   
759   simdata->using++;
760   do {
761     __MSG_task_wait_event(process, dummy);
762     state=surf_workstation_resource->common_public->action_get_state(simdata->compute);
763   } while (state==SURF_ACTION_RUNNING);
764   simdata->using--;
765     
766   if(state == SURF_ACTION_DONE) {
767     if(surf_workstation_resource->extension_public->
768        get_state(MSG_process_get_host(process)->simdata->host) 
769        == SURF_CPU_OFF) {
770       if(surf_workstation_resource->common_public->action_free(simdata->compute)) 
771         simdata->compute = NULL;
772       MSG_RETURN(MSG_HOST_FAILURE);
773     }
774     if(__MSG_process_isBlocked(process)) {
775       __MSG_process_unblock(MSG_process_self());
776     }
777     if(surf_workstation_resource->extension_public->
778        get_state(MSG_process_get_host(process)->simdata->host) 
779        == SURF_CPU_OFF) {
780       if(surf_workstation_resource->common_public->action_free(simdata->compute)) 
781         simdata->compute = NULL;
782       MSG_RETURN(MSG_HOST_FAILURE);
783     }
784     if(surf_workstation_resource->common_public->action_free(simdata->compute)) 
785       simdata->compute = NULL;
786     MSG_task_destroy(dummy);
787     MSG_RETURN(MSG_OK);
788   } else MSG_RETURN(MSG_HOST_FAILURE);
789 }
790
791 /** \ingroup msg_gos_functions
792  * \brief Return the number of MSG tasks currently running on
793  * the host of the current running process.
794  */
795 static int MSG_get_msgload(void) 
796 {
797   m_process_t process;
798    
799   CHECK_HOST();
800   
801   xbt_assert0(0, "This function is still to be specified correctly (what do you mean by 'load', exactly?). In the meantime, please don't use it");
802   process = MSG_process_self();
803   return xbt_fifo_size(process->simdata->host->simdata->process_list);
804 }
805
806 /** \ingroup msg_gos_functions
807  *
808  * \brief Return the last value returned by a MSG function (except
809  * MSG_get_errno...).
810  */
811 MSG_error_t MSG_get_errno(void)
812 {
813   return PROCESS_GET_ERRNO();
814 }