Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
bugfix. Remove the task from the remote host fifo when there is a
[simgrid.git] / src / msg / gos.c
1 /*      $Id$     */
2
3 /* Copyright (c) 2002,2003,2004 Arnaud Legrand. All rights reserved.        */
4
5 /* This program is free software; you can redistribute it and/or modify it
6  * under the terms of the license (GNU LGPL) which comes with this package. */
7
8 #include "private.h"
9 #include "xbt/sysdep.h"
10 #include "xbt/log.h"
11 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(msg_gos, msg,
12                                 "Logging specific to MSG (gos)");
13
14 /** \defgroup msg_gos_functions MSG Operating System Functions
15  *  \brief This section describes the functions that can be used
16  *  by an agent for handling some task.
17  */
18
19 static MSG_error_t __MSG_task_get_with_time_out_from_host(m_task_t * task,
20                                                         m_channel_t channel,
21                                                         double max_duration,
22                                                         m_host_t host)
23 {
24   m_process_t process = MSG_process_self();
25   m_task_t t = NULL;
26   m_host_t h = NULL;
27   m_task_t task_to_wait_for;
28   simdata_task_t t_simdata = NULL;
29   simdata_host_t h_simdata = NULL;
30   int first_time = 1;
31   e_surf_action_state_t state = SURF_ACTION_NOT_IN_THE_SYSTEM;
32   xbt_fifo_item_t item = NULL;
33
34   CHECK_HOST();
35   xbt_assert1((channel>=0) && (channel < msg_global->max_channel),"Invalid channel %d",channel);
36   /* Sanity check */
37   xbt_assert0(task,"Null pointer for the task\n");
38
39   if (*task) 
40     CRITICAL0("MSG_task_get() was asked to write in a non empty task struct.");
41
42   /* Get the task */
43   h = MSG_host_self();
44   h_simdata = h->simdata;
45
46   DEBUG2("Waiting for a task on channel %d (%s)", channel,h->name);
47
48   while (1) {
49     if(xbt_fifo_size(h_simdata->mbox[channel])>0) {
50       if(!host) {
51         t = xbt_fifo_shift(h_simdata->mbox[channel]);
52         break;
53       } else {
54         xbt_fifo_foreach(h->simdata->mbox[channel],item,t,m_task_t) {
55           if(t->simdata->source==host) break;
56         }
57         if(item) {
58           xbt_fifo_remove_item(h->simdata->mbox[channel],item);
59           break;
60         } 
61       }
62     }
63                                                        
64     if(max_duration>0) {
65       if(!first_time) {
66         PAJE_PROCESS_POP_STATE(process);
67         MSG_RETURN(MSG_TRANSFER_FAILURE);
68       }
69     }
70     xbt_assert3(!(h_simdata->sleeping[channel]),
71                 "A process (%s(%d)) is already blocked on channel %d",
72                 h_simdata->sleeping[channel]->name,
73                 h_simdata->sleeping[channel]->simdata->PID,
74                 channel);
75     h_simdata->sleeping[channel] = process; /* I'm waiting. Wake me up when you're ready */
76     if(max_duration>0) {
77       __MSG_process_block(max_duration,"");
78     } else {
79       __MSG_process_block(-1,"");
80     }
81     h_simdata->sleeping[channel] = NULL;
82     first_time = 0;
83     if(surf_workstation_resource->extension_public->get_state(h_simdata->host) 
84        == SURF_CPU_OFF)
85       MSG_RETURN(MSG_HOST_FAILURE);
86     /* OK, we should both be ready now. Are you there ? */
87   }
88
89   DEBUG1("OK, got a task (%s)", t->name);
90
91   t_simdata = t->simdata;
92   /*   *task = __MSG_task_copy(t); */
93   *task=t;
94
95   /* Transfer */
96   t_simdata->using++;
97
98   while(MSG_process_is_suspended(t_simdata->sender)) {
99     DEBUG1("Oooups, the sender (%s) has been suspended in the meantime. Let's wait for him", 
100            t_simdata->sender->name);
101     task_to_wait_for = t_simdata->sender->simdata->waiting_task;
102     if(__MSG_process_isBlocked(t_simdata->sender)) {
103       DEBUG0("He's blocked. Let's wait for him to go in the suspended state");
104       __MSG_process_unblock(t_simdata->sender);
105       task_to_wait_for->simdata->using++;
106       __MSG_task_wait_event(process, task_to_wait_for);
107       MSG_task_destroy(task_to_wait_for);
108     } else {
109       DEBUG0("He's suspended. Let's wait for him to go in the resumed state");
110       task_to_wait_for->simdata->using++;
111       __MSG_task_wait_event(process, task_to_wait_for);
112       MSG_task_destroy(task_to_wait_for);
113       DEBUG0("He's resumed. He should block again. So let's free him.");
114       __MSG_process_unblock(t_simdata->sender);
115       break;
116     }
117   }
118   DEBUG0("Calling SURF for communication creation");
119   t_simdata->comm = surf_workstation_resource->extension_public->
120     communicate(MSG_process_get_host(t_simdata->sender)->simdata->host,
121                 h->simdata->host, t_simdata->message_size,t_simdata->rate);
122   
123   surf_workstation_resource->common_public->action_set_data(t_simdata->comm,t);
124
125   if(__MSG_process_isBlocked(t_simdata->sender)) {
126     DEBUG1("Unblocking %s",t_simdata->sender->name);
127     __MSG_process_unblock(t_simdata->sender);
128   }
129
130   PAJE_PROCESS_PUSH_STATE(process,"C",t);  
131
132   do {
133     DEBUG0("Waiting for action termination");
134     __MSG_task_wait_event(process, t);
135     state=surf_workstation_resource->common_public->action_get_state(t_simdata->comm);
136   } while (state==SURF_ACTION_RUNNING);
137   DEBUG0("Action terminated");
138
139   if(t->simdata->using>1) {
140     xbt_fifo_unshift(msg_global->process_to_run,process);
141     xbt_context_yield();
142   }
143
144   PAJE_PROCESS_POP_STATE(process);
145   PAJE_COMM_STOP(process,t,channel);
146
147   if(state == SURF_ACTION_DONE) {
148     if(surf_workstation_resource->common_public->action_free(t_simdata->comm)) 
149       t_simdata->comm = NULL;
150     MSG_RETURN(MSG_OK);
151   } else if(surf_workstation_resource->extension_public->get_state(h_simdata->host) 
152           == SURF_CPU_OFF) {
153     if(surf_workstation_resource->common_public->action_free(t_simdata->comm)) 
154       t_simdata->comm = NULL;
155     MSG_RETURN(MSG_HOST_FAILURE);
156   } else {
157     if(surf_workstation_resource->common_public->action_free(t_simdata->comm)) 
158       t_simdata->comm = NULL;
159     MSG_RETURN(MSG_TRANSFER_FAILURE);
160   }
161 }
162
163 /** \ingroup msg_gos_functions
164  * \brief Listen on a channel and wait for receiving a task.
165  *
166  * It takes two parameters.
167  * \param task a memory location for storing a #m_task_t. It will
168    hold a task when this function will return. Thus \a task should not
169    be equal to \c NULL and \a *task should be equal to \c NULL. If one of
170    those two condition does not hold, there will be a warning message.
171  * \param channel the channel on which the agent should be
172    listening. This value has to be >=0 and < than the maximal
173    number of channels fixed with MSG_set_channel_number().
174  * \return #MSG_FATAL if \a task is equal to \c NULL, #MSG_WARNING
175  * if \a *task is not equal to \c NULL, and #MSG_OK otherwise.
176  */
177 MSG_error_t MSG_task_get(m_task_t * task,
178                          m_channel_t channel)
179 {
180   return MSG_task_get_with_time_out(task, channel, -1);
181 }
182
183 /** \ingroup msg_gos_functions
184  * \brief Listen on a channel and wait for receiving a task with a timeout.
185  *
186  * It takes three parameters.
187  * \param task a memory location for storing a #m_task_t. It will
188    hold a task when this function will return. Thus \a task should not
189    be equal to \c NULL and \a *task should be equal to \c NULL. If one of
190    those two condition does not hold, there will be a warning message.
191  * \param channel the channel on which the agent should be
192    listening. This value has to be >=0 and < than the maximal
193    number of channels fixed with MSG_set_channel_number().
194  * \param max_duration the maximum time to wait for a task before giving
195     up. In such a case, #MSG_TRANSFER_FAILURE will be returned, \a task 
196     will not be modified and will still be
197     equal to \c NULL when returning. 
198  * \return #MSG_FATAL if \a task is equal to \c NULL, #MSG_WARNING
199    if \a *task is not equal to \c NULL, and #MSG_OK otherwise.
200  */
201 MSG_error_t MSG_task_get_with_time_out(m_task_t * task,
202                                        m_channel_t channel,
203                                        double max_duration)
204 {
205   return __MSG_task_get_with_time_out_from_host(task, channel, max_duration, NULL);
206 }
207
208 /** \ingroup msg_gos_functions
209  * \brief Listen on \a channel and waits for receiving a task from \a host.
210  *
211  * It takes three parameters.
212  * \param task a memory location for storing a #m_task_t. It will
213    hold a task when this function will return. Thus \a task should not
214    be equal to \c NULL and \a *task should be equal to \c NULL. If one of
215    those two condition does not hold, there will be a warning message.
216  * \param channel the channel on which the agent should be
217    listening. This value has to be >=0 and < than the maximal
218    number of channels fixed with MSG_set_channel_number().
219  * \param host the host that is to be watched.
220  * \return #MSG_FATAL if \a task is equal to \c NULL, #MSG_WARNING
221    if \a *task is not equal to \c NULL, and #MSG_OK otherwise.
222  */
223 MSG_error_t MSG_task_get_from_host(m_task_t * task, int channel, 
224                                    m_host_t host)
225 {
226   return __MSG_task_get_with_time_out_from_host(task, channel, -1, host);
227 }
228
229 /** \ingroup msg_gos_functions
230  * \brief Test whether there is a pending communication on a channel.
231  *
232  * It takes one parameter.
233  * \param channel the channel on which the agent should be
234    listening. This value has to be >=0 and < than the maximal
235    number of channels fixed with MSG_set_channel_number().
236  * \return 1 if there is a pending communication and 0 otherwise
237  */
238 int MSG_task_Iprobe(m_channel_t channel)
239 {
240   m_host_t h = NULL;
241
242   xbt_assert1((channel>=0) && (channel < msg_global->max_channel),"Invalid channel %d",channel);
243   DEBUG2("Probing on channel %d (%s)", channel,h->name);
244   CHECK_HOST();
245   h = MSG_host_self();
246   return(xbt_fifo_get_first_item(h->simdata->mbox[channel])!=NULL);
247 }
248
249 /** \ingroup msg_gos_functions
250  * \brief Test whether there is a pending communication on a channel, and who sent it.
251  *
252  * It takes one parameter.
253  * \param channel the channel on which the agent should be
254    listening. This value has to be >=0 and < than the maximal
255    number of channels fixed with MSG_set_channel_number().
256  * \return -1 if there is no pending communication and the PID of the process who sent it otherwise
257  */
258 int MSG_task_probe_from(m_channel_t channel)
259 {
260   m_host_t h = NULL;
261   xbt_fifo_item_t item;
262   m_task_t t;
263
264   xbt_assert1((channel>=0) && (channel < msg_global->max_channel),"Invalid channel %d",channel);
265   CHECK_HOST();
266   h = MSG_host_self();
267
268   DEBUG2("Probing on channel %d (%s)", channel,h->name);
269    
270   item = xbt_fifo_get_first_item(h->simdata->mbox[channel]);
271   if (!item || !(t = xbt_fifo_get_item_content(item)))
272     return -1;
273    
274   return MSG_process_get_PID(t->simdata->sender);
275 }
276
277 /** \ingroup msg_gos_functions
278  * \brief Wait for at most \a max_duration second for a task reception
279    on \a channel. *\a PID is updated with the PID of the first process
280    that triggered this event if any.
281  *
282  * It takes three parameters:
283  * \param channel the channel on which the agent should be
284    listening. This value has to be >=0 and < than the maximal.
285    number of channels fixed with MSG_set_channel_number().
286  * \param PID a memory location for storing an int.
287  * \param max_duration the maximum time to wait for a task before
288     giving up. In the case of a reception, *\a PID will be updated
289     with the PID of the first process to send a task.
290  * \return #MSG_HOST_FAILURE if the host is shut down in the meantime
291    and #MSG_OK otherwise.
292  */
293 MSG_error_t MSG_channel_select_from(m_channel_t channel, double max_duration,
294                                     int *PID)
295 {
296   m_host_t h = NULL;
297   simdata_host_t h_simdata = NULL;
298   xbt_fifo_item_t item;
299   m_task_t t;
300   int first_time = 1;
301   m_process_t process = MSG_process_self();
302
303   xbt_assert1((channel>=0) && (channel < msg_global->max_channel),"Invalid channel %d",channel);
304   if(PID) {
305     *PID = -1;
306   }
307
308   if(max_duration==0.0) {
309     *PID = MSG_task_probe_from(channel);
310     MSG_RETURN(MSG_OK);
311   } else {
312     CHECK_HOST();
313     h = MSG_host_self();
314     h_simdata = h->simdata;
315     
316     DEBUG2("Probing on channel %d (%s)", channel,h->name);
317     while(!(item = xbt_fifo_get_first_item(h->simdata->mbox[channel]))) {
318       if(max_duration>0) {
319         if(!first_time) {
320           MSG_RETURN(MSG_OK);
321         }
322       }
323       xbt_assert2(!(h_simdata->sleeping[channel]),
324                   "A process (%s(%d)) is already blocked on this channel",
325                   h_simdata->sleeping[channel]->name,
326                   h_simdata->sleeping[channel]->simdata->PID);
327       h_simdata->sleeping[channel] = process; /* I'm waiting. Wake me up when you're ready */
328       if(max_duration>0) {
329         __MSG_process_block(max_duration,"");
330       } else {
331         __MSG_process_block(-1,"");
332       }
333       if(surf_workstation_resource->extension_public->get_state(h_simdata->host) 
334          == SURF_CPU_OFF) {
335         MSG_RETURN(MSG_HOST_FAILURE);
336       }
337       h_simdata->sleeping[channel] = NULL;
338       first_time = 0;
339     }
340     if (!item || !(t = xbt_fifo_get_item_content(item))) {
341       MSG_RETURN(MSG_OK);
342     }
343     if(PID) {
344       *PID = MSG_process_get_PID(t->simdata->sender);
345     }
346     MSG_RETURN(MSG_OK);
347   }
348 }
349
350
351 /** \ingroup msg_gos_functions
352
353  * \brief Return the number of tasks waiting to be received on a \a
354    channel and sent by \a host.
355  *
356  * It takes two parameters.
357  * \param channel the channel on which the agent should be
358    listening. This value has to be >=0 and < than the maximal
359    number of channels fixed with MSG_set_channel_number().
360  * \param host the host that is to be watched.
361  * \return the number of tasks waiting to be received on \a channel
362    and sent by \a host.
363  */
364 int MSG_task_probe_from_host(int channel, m_host_t host)
365 {
366   xbt_fifo_item_t item;
367   m_task_t t;
368   int count = 0;
369   m_host_t h = NULL;
370   
371   xbt_assert1((channel>=0) && (channel < msg_global->max_channel),"Invalid channel %d",channel);
372   CHECK_HOST();
373   h = MSG_host_self();
374
375   DEBUG2("Probing on channel %d (%s)", channel,h->name);
376    
377   xbt_fifo_foreach(h->simdata->mbox[channel],item,t,m_task_t) {
378     if(t->simdata->source==host) count++;
379   }
380    
381   return count;
382 }
383
384 /** \ingroup msg_gos_functions \brief Put a task on a channel of an
385  * host (with a timeout on the waiting of the destination host) and
386  * waits for the end of the transmission.
387  *
388  * This function is used for describing the behavior of an agent. It
389  * takes four parameter.
390  * \param task a #m_task_t to send on another location. This task
391    will not be usable anymore when the function will return. There is
392    no automatic task duplication and you have to save your parameters
393    before calling this function. Tasks are unique and once it has been
394    sent to another location, you should not access it anymore. You do
395    not need to call MSG_task_destroy() but to avoid using, as an
396    effect of inattention, this task anymore, you definitely should
397    renitialize it with #MSG_TASK_UNINITIALIZED. Note that this task
398    can be transfered iff it has been correctly created with
399    MSG_task_create().
400  * \param dest the destination of the message
401  * \param channel the channel on which the agent should put this
402    task. This value has to be >=0 and < than the maximal number of
403    channels fixed with MSG_set_channel_number().
404  * \param max_duration the maximum time to wait for a task before giving
405     up. In such a case, #MSG_TRANSFER_FAILURE will be returned, \a task 
406     will not be modified 
407  * \return #MSG_FATAL if \a task is not properly initialized and
408    #MSG_OK otherwise. Returns #MSG_HOST_FAILURE if the host on which
409    this function was called was shut down. Returns
410    #MSG_TRANSFER_FAILURE if the transfer could not be properly done
411    (network failure, dest failure, timeout...)
412  */
413 MSG_error_t MSG_task_put_with_timeout(m_task_t task, m_host_t dest, 
414                                       m_channel_t channel, double max_duration)
415 {
416   m_process_t process = MSG_process_self();
417   simdata_task_t task_simdata = NULL;
418   e_surf_action_state_t state = SURF_ACTION_NOT_IN_THE_SYSTEM;
419   m_host_t local_host = NULL;
420   m_host_t remote_host = NULL;
421   int first_time = 1;
422
423   CHECK_HOST();
424
425   xbt_assert1((channel>=0) && (channel < msg_global->max_channel),"Invalid channel %d",channel);
426
427   task_simdata = task->simdata;
428   task_simdata->sender = process;
429   task_simdata->source = MSG_process_get_host(process);
430   xbt_assert0(task_simdata->using==1,
431               "This taks is still being used somewhere else. You cannot send it now. Go fix your code!");
432   task_simdata->comm = NULL;
433   
434   local_host = ((simdata_process_t) process->simdata)->host;
435   remote_host = dest;
436
437   DEBUG4("Trying to send a task (%g kB) from %s to %s on channel %d", 
438          task->simdata->message_size/1000,local_host->name, remote_host->name, channel);
439
440   xbt_fifo_push(((simdata_host_t) remote_host->simdata)->
441                 mbox[channel], task);
442
443   PAJE_COMM_START(process,task,channel);
444     
445   if(remote_host->simdata->sleeping[channel]) {
446     DEBUG0("Somebody is listening. Let's wake him up!");
447     __MSG_process_unblock(remote_host->simdata->sleeping[channel]);
448   }
449
450   process->simdata->put_host = dest;
451   process->simdata->put_channel = channel;
452   while(!(task_simdata->comm)) {
453     if(max_duration>0) {
454       if(!first_time) {
455         xbt_fifo_remove(((simdata_host_t) remote_host->simdata)->mbox[channel],
456                         task);
457         PAJE_PROCESS_POP_STATE(process);
458         PAJE_COMM_STOP(process,task,channel);
459         MSG_RETURN(MSG_TRANSFER_FAILURE);
460       }
461     }
462     DEBUG0("Communication not initiated yet. Let's block!");
463     if(max_duration>0)
464       __MSG_process_block(max_duration,task->name);
465     else
466       __MSG_process_block(-1,task->name);
467
468     first_time = 0;
469
470     if(surf_workstation_resource->extension_public->
471        get_state(local_host->simdata->host) == SURF_CPU_OFF) {
472       xbt_fifo_remove(((simdata_host_t) remote_host->simdata)->mbox[channel],
473                       task);
474       PAJE_PROCESS_POP_STATE(process);
475       PAJE_COMM_STOP(process,task,channel);
476       MSG_task_destroy(task);
477       MSG_RETURN(MSG_HOST_FAILURE);
478     }
479   }
480   DEBUG0("Registering to this communication");
481   surf_workstation_resource->common_public->action_use(task_simdata->comm);
482   process->simdata->put_host = NULL;
483   process->simdata->put_channel = -1;
484
485
486   PAJE_PROCESS_PUSH_STATE(process,"C",task);  
487
488   state=surf_workstation_resource->common_public->action_get_state(task_simdata->comm);
489   while (state==SURF_ACTION_RUNNING) {
490     DEBUG0("Waiting for action termination");
491     __MSG_task_wait_event(process, task);
492     state=surf_workstation_resource->common_public->action_get_state(task_simdata->comm);
493   }
494   DEBUG0("Action terminated");
495   task->simdata->rate=-1.0; /* Sets the rate back to default */
496
497   PAJE_PROCESS_POP_STATE(process);  
498
499   if(state == SURF_ACTION_DONE) {
500     if(surf_workstation_resource->common_public->action_free(task_simdata->comm)) 
501       task_simdata->comm = NULL;
502     MSG_task_destroy(task);
503     MSG_RETURN(MSG_OK);
504   } else if(surf_workstation_resource->extension_public->get_state(local_host->simdata->host) 
505             == SURF_CPU_OFF) {
506     if(surf_workstation_resource->common_public->action_free(task_simdata->comm)) 
507       task_simdata->comm = NULL;
508     MSG_task_destroy(task);
509     MSG_RETURN(MSG_HOST_FAILURE);
510   } else { 
511     if(surf_workstation_resource->common_public->action_free(task_simdata->comm)) 
512       task_simdata->comm = NULL;
513     MSG_task_destroy(task);
514     MSG_RETURN(MSG_TRANSFER_FAILURE);
515   }
516 }
517 /** \ingroup msg_gos_functions
518  * \brief Put a task on a channel of an host and waits for the end of the
519  * transmission.
520  *
521  * This function is used for describing the behavior of an agent. It
522  * takes three parameter.
523  * \param task a #m_task_t to send on another location. This task
524    will not be usable anymore when the function will return. There is
525    no automatic task duplication and you have to save your parameters
526    before calling this function. Tasks are unique and once it has been
527    sent to another location, you should not access it anymore. You do
528    not need to call MSG_task_destroy() but to avoid using, as an
529    effect of inattention, this task anymore, you definitely should
530    renitialize it with #MSG_TASK_UNINITIALIZED. Note that this task
531    can be transfered iff it has been correctly created with
532    MSG_task_create().
533  * \param dest the destination of the message
534  * \param channel the channel on which the agent should put this
535    task. This value has to be >=0 and < than the maximal number of
536    channels fixed with MSG_set_channel_number().
537  * \return #MSG_FATAL if \a task is not properly initialized and
538  * #MSG_OK otherwise. Returns #MSG_HOST_FAILURE if the host on which
539  * this function was called was shut down. Returns
540  * #MSG_TRANSFER_FAILURE if the transfer could not be properly done
541  * (network failure, dest failure)
542  */
543 MSG_error_t MSG_task_put(m_task_t task,
544                          m_host_t dest, m_channel_t channel)
545 {
546   return MSG_task_put_with_timeout(task, dest, channel, -1.0);
547 }
548
549 /** \ingroup msg_gos_functions
550  * \brief Does exactly the same as MSG_task_put but with a bounded transmition 
551  * rate.
552  *
553  * \sa MSG_task_put
554  */
555 MSG_error_t MSG_task_put_bounded(m_task_t task,
556                                  m_host_t dest, m_channel_t channel,
557                                  double max_rate)
558 {
559   MSG_error_t res = MSG_OK;
560   task->simdata->rate=max_rate;
561   res = MSG_task_put(task, dest, channel);
562   return(res);
563 }
564
565 /** \ingroup msg_gos_functions
566  * \brief Executes a task and waits for its termination.
567  *
568  * This function is used for describing the behavior of an agent. It
569  * takes only one parameter.
570  * \param task a #m_task_t to execute on the location on which the
571    agent is running.
572  * \return #MSG_FATAL if \a task is not properly initialized and
573  * #MSG_OK otherwise.
574  */
575 MSG_error_t MSG_task_execute(m_task_t task)
576 {
577   m_process_t process = MSG_process_self();
578   MSG_error_t res;
579
580   DEBUG1("Computing on %s", process->simdata->host->name);
581
582   __MSG_task_execute(process, task);
583
584   PAJE_PROCESS_PUSH_STATE(process,"E",task);  
585   res = __MSG_wait_for_computation(process,task);
586   PAJE_PROCESS_POP_STATE(process);
587   return res;
588 }
589
590 void __MSG_task_execute(m_process_t process, m_task_t task)
591 {
592   simdata_task_t simdata = NULL;
593
594   CHECK_HOST();
595
596   simdata = task->simdata;
597   xbt_assert0((!simdata->compute)&&(task->simdata->using==1),
598               "This taks is executed somewhere else. Go fix your code!");
599   simdata->using++;
600   simdata->compute = surf_workstation_resource->extension_public->
601     execute(MSG_process_get_host(process)->simdata->host,
602             simdata->computation_amount);
603   surf_workstation_resource->common_public->
604     set_priority(simdata->compute, simdata->priority);
605
606   surf_workstation_resource->common_public->action_set_data(simdata->compute,task);
607   simdata->using--;
608 }
609
610 MSG_error_t __MSG_wait_for_computation(m_process_t process, m_task_t task)
611 {
612   e_surf_action_state_t state = SURF_ACTION_NOT_IN_THE_SYSTEM;
613   simdata_task_t simdata = task->simdata;
614
615   XBT_IN4("(%p(%s) %p(%s))",process,process->name,task,task->name);
616   simdata->using++;
617   do {
618     __MSG_task_wait_event(process, task);
619     state=surf_workstation_resource->common_public->action_get_state(simdata->compute);
620   } while (state==SURF_ACTION_RUNNING);
621   simdata->using--;
622     
623
624   if(state == SURF_ACTION_DONE) {
625     if(surf_workstation_resource->common_public->action_free(simdata->compute)) 
626       simdata->compute = NULL;
627     simdata->computation_amount = 0.0;
628     XBT_OUT;
629     MSG_RETURN(MSG_OK);
630   } else if(surf_workstation_resource->extension_public->
631             get_state(MSG_process_get_host(process)->simdata->host) 
632             == SURF_CPU_OFF) {
633     if(surf_workstation_resource->common_public->action_free(simdata->compute)) 
634       simdata->compute = NULL;
635     XBT_OUT;
636     MSG_RETURN(MSG_HOST_FAILURE);
637   } else {
638     if(surf_workstation_resource->common_public->action_free(simdata->compute)) 
639       simdata->compute = NULL;
640     XBT_OUT;
641     MSG_RETURN(MSG_TASK_CANCELLED);
642   }
643 }
644 /** \ingroup m_task_management
645  * \brief Creates a new #m_task_t (a parallel one....).
646  *
647  * A constructor for #m_task_t taking six arguments and returning the 
648    corresponding object.
649  * \param name a name for the object. It is for user-level information
650    and can be NULL.
651  * \param host_nb the number of hosts implied in the parallel task.
652  * \param host_list an array of \p host_nb m_host_t.
653  * \param computation_amount an array of \p host_nb
654    doubles. computation_amount[i] is the total number of operations
655    that have to be performed on host_list[i].
656  * \param communication_amount an array of \p host_nb* \p host_nb doubles.
657  * \param data a pointer to any data may want to attach to the new
658    object.  It is for user-level information and can be NULL. It can
659    be retrieved with the function \ref MSG_task_get_data.
660  * \see m_task_t
661  * \return The new corresponding object.
662  */
663 m_task_t MSG_parallel_task_create(const char *name, 
664                                   int host_nb,
665                                   const m_host_t *host_list,
666                                   double *computation_amount,
667                                   double *communication_amount,
668                                   void *data)
669 {
670   simdata_task_t simdata = xbt_new0(s_simdata_task_t,1);
671   m_task_t task = xbt_new0(s_m_task_t,1);
672   int i;
673
674   /* Task structure */
675   task->name = xbt_strdup(name);
676   task->simdata = simdata;
677   task->data = data;
678
679   /* Simulator Data */
680   simdata->sleeping = xbt_dynar_new(sizeof(m_process_t),NULL);
681   simdata->rate = -1.0;
682   simdata->using = 1;
683   simdata->sender = NULL;
684   simdata->source = NULL;
685   simdata->host_nb = host_nb;
686   
687   simdata->host_list = xbt_new0(void *, host_nb);
688   simdata->comp_amount = computation_amount;
689   simdata->comm_amount = communication_amount;
690
691   for(i=0;i<host_nb;i++)
692     simdata->host_list[i] = host_list[i]->simdata->host;
693
694   return task;
695 }
696
697
698 static void __MSG_parallel_task_execute(m_process_t process, m_task_t task)
699 {
700   simdata_task_t simdata = NULL;
701
702   CHECK_HOST();
703
704   simdata = task->simdata;
705
706   xbt_assert0(simdata->host_nb,"This is not a parallel task. Go to hell.");
707
708   simdata->compute = surf_workstation_resource->extension_public->
709   execute_parallel_task(task->simdata->host_nb,
710                         task->simdata->host_list,
711                         task->simdata->comp_amount,
712                         task->simdata->comm_amount,
713                         1.0,
714                         -1.0);
715   if(simdata->compute)
716     surf_workstation_resource->common_public->action_set_data(simdata->compute,task);
717 }
718
719 MSG_error_t MSG_parallel_task_execute(m_task_t task)
720 {
721   m_process_t process = MSG_process_self();
722   MSG_error_t res;
723
724   DEBUG0("Computing on a tons of guys");
725   
726   __MSG_parallel_task_execute(process, task);
727
728   if(task->simdata->compute)
729     res = __MSG_wait_for_computation(process,task);
730   else 
731     res = MSG_OK;
732
733   return res;  
734 }
735
736
737 /** \ingroup msg_gos_functions
738  * \brief Sleep for the specified number of seconds
739  *
740  * Makes the current process sleep until \a time seconds have elapsed.
741  *
742  * \param nb_sec a number of second
743  */
744 MSG_error_t MSG_process_sleep(double nb_sec)
745 {
746   e_surf_action_state_t state = SURF_ACTION_NOT_IN_THE_SYSTEM;
747   m_process_t process = MSG_process_self();
748   m_task_t dummy = NULL;
749   simdata_task_t simdata = NULL;
750   
751   xbt_assert1(nb_sec>=0,"Invalid duration %g",nb_sec);
752
753   CHECK_HOST();
754   dummy = MSG_task_create("MSG_sleep", nb_sec, 0.0, NULL);
755   simdata = dummy->simdata;
756
757   simdata->compute = surf_workstation_resource->extension_public->
758     sleep(MSG_process_get_host(process)->simdata->host,
759             simdata->computation_amount);
760   surf_workstation_resource->common_public->action_set_data(simdata->compute,dummy);
761
762   
763   simdata->using++;
764   do {
765     __MSG_task_wait_event(process, dummy);
766     state=surf_workstation_resource->common_public->action_get_state(simdata->compute);
767   } while (state==SURF_ACTION_RUNNING);
768   simdata->using--;
769     
770   if(state == SURF_ACTION_DONE) {
771     if(surf_workstation_resource->extension_public->
772        get_state(MSG_process_get_host(process)->simdata->host) 
773        == SURF_CPU_OFF) {
774       if(surf_workstation_resource->common_public->action_free(simdata->compute)) 
775         simdata->compute = NULL;
776       MSG_RETURN(MSG_HOST_FAILURE);
777     }
778     if(__MSG_process_isBlocked(process)) {
779       __MSG_process_unblock(MSG_process_self());
780     }
781     if(surf_workstation_resource->extension_public->
782        get_state(MSG_process_get_host(process)->simdata->host) 
783        == SURF_CPU_OFF) {
784       if(surf_workstation_resource->common_public->action_free(simdata->compute)) 
785         simdata->compute = NULL;
786       MSG_RETURN(MSG_HOST_FAILURE);
787     }
788     if(surf_workstation_resource->common_public->action_free(simdata->compute)) 
789       simdata->compute = NULL;
790     MSG_task_destroy(dummy);
791     MSG_RETURN(MSG_OK);
792   } else MSG_RETURN(MSG_HOST_FAILURE);
793 }
794
795 /** \ingroup msg_gos_functions
796  * \brief Return the number of MSG tasks currently running on
797  * the host of the current running process.
798  */
799 static int MSG_get_msgload(void) 
800 {
801   m_process_t process;
802    
803   CHECK_HOST();
804   
805   xbt_assert0(0, "This function is still to be specified correctly (what do you mean by 'load', exactly?). In the meantime, please don't use it");
806   process = MSG_process_self();
807   return xbt_fifo_size(process->simdata->host->simdata->process_list);
808 }
809
810 /** \ingroup msg_gos_functions
811  *
812  * \brief Return the last value returned by a MSG function (except
813  * MSG_get_errno...).
814  */
815 MSG_error_t MSG_get_errno(void)
816 {
817   return PROCESS_GET_ERRNO();
818 }