Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
Smarter assert in MSG_task_execute and take care of failures as suggested by
[simgrid.git] / src / msg / gos.c
1 /*      $Id$     */
2
3 /* Copyright (c) 2002,2003,2004 Arnaud Legrand. All rights reserved.        */
4
5 /* This program is free software; you can redistribute it and/or modify it
6  * under the terms of the license (GNU LGPL) which comes with this package. */
7
8 #include "private.h"
9 #include "xbt/sysdep.h"
10 #include "xbt/log.h"
11 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(gos, msg,
12                                 "Logging specific to MSG (gos)");
13
14 /** \defgroup msg_gos_functions MSG Operating System Functions
15  *  \brief This section describes the functions that can be used
16  *  by an agent for handling some task.
17  */
18
19 static MSG_error_t __MSG_task_get_with_time_out_from_host(m_task_t * task,
20                                                         m_channel_t channel,
21                                                         double max_duration,
22                                                         m_host_t host)
23 {
24   m_process_t process = MSG_process_self();
25   m_task_t t = NULL;
26   m_host_t h = NULL;
27   simdata_task_t t_simdata = NULL;
28   simdata_host_t h_simdata = NULL;
29   int first_time = 1;
30   e_surf_action_state_t state = SURF_ACTION_NOT_IN_THE_SYSTEM;
31   xbt_fifo_item_t item = NULL;
32
33   CHECK_HOST();
34   xbt_assert1((channel>=0) && (channel < msg_global->max_channel),"Invalid channel %d",channel);
35   /* Sanity check */
36   xbt_assert0(task,"Null pointer for the task\n");
37
38   if (*task) 
39     CRITICAL0("MSG_task_get() was asked to write in a non empty task struct.");
40
41   /* Get the task */
42   h = MSG_host_self();
43   h_simdata = h->simdata;
44
45   DEBUG2("Waiting for a task on channel %d (%s)", channel,h->name);
46
47   while (1) {
48     if(xbt_fifo_size(h_simdata->mbox[channel])>0) {
49       if(!host) {
50         t = xbt_fifo_shift(h_simdata->mbox[channel]);
51         break;
52       } else {
53         xbt_fifo_foreach(h->simdata->mbox[channel],item,t,m_task_t) {
54           if(t->simdata->source==host) break;
55         }
56         if(item) {
57           xbt_fifo_remove_item(h->simdata->mbox[channel],item);
58           break;
59         } 
60       }
61     }
62                                                        
63     if(max_duration>0) {
64       if(!first_time) {
65         MSG_RETURN(MSG_OK);
66       }
67     }
68     xbt_assert2(!(h_simdata->sleeping[channel]),
69                 "A process (%s(%d)) is already blocked on this channel",
70                 h_simdata->sleeping[channel]->name,
71                 h_simdata->sleeping[channel]->simdata->PID);
72     h_simdata->sleeping[channel] = process; /* I'm waiting. Wake me up when you're ready */
73     if(max_duration>0) {
74       __MSG_process_block(max_duration);
75     } else {
76       __MSG_process_block(-1);
77     }
78     h_simdata->sleeping[channel] = NULL;
79     first_time = 0;
80     if(surf_workstation_resource->extension_public->get_state(h_simdata->host) 
81        == SURF_CPU_OFF)
82       MSG_RETURN(MSG_HOST_FAILURE);
83     /* OK, we should both be ready now. Are you there ? */
84   }
85
86   DEBUG1("OK, got a task (%s)", t->name);
87
88   t_simdata = t->simdata;
89   /*   *task = __MSG_task_copy(t); */
90   *task=t;
91
92   /* Transfer */
93   t_simdata->using++;
94
95   while(MSG_process_is_suspended(t_simdata->sender)) {
96     DEBUG1("Oooups, the sender (%s) has been suspended in the meantime. Let's wait for him", 
97            t_simdata->sender->name);
98     m_task_t task_to_wait_for = t_simdata->sender->simdata->waiting_task;
99     if(__MSG_process_isBlocked(t_simdata->sender)) {
100       DEBUG0("He's blocked. Let's wait for him to go in the suspended state");
101       __MSG_process_unblock(t_simdata->sender);
102       task_to_wait_for->simdata->using++;
103       __MSG_task_wait_event(process, task_to_wait_for);
104       MSG_task_destroy(task_to_wait_for);
105     } else {
106       DEBUG0("He's suspended. Let's wait for him to go in the resumed state");
107       task_to_wait_for->simdata->using++;
108       __MSG_task_wait_event(process, task_to_wait_for);
109       MSG_task_destroy(task_to_wait_for);
110       DEBUG0("He's resumed. He should block again. So let's free him.");
111       __MSG_process_unblock(t_simdata->sender);
112       break;
113     }
114   }
115   DEBUG0("Calling SURF for communication creation");
116   t_simdata->comm = surf_workstation_resource->extension_public->
117     communicate(MSG_process_get_host(t_simdata->sender)->simdata->host,
118                 h->simdata->host, t_simdata->message_size,t_simdata->rate);
119   
120   surf_workstation_resource->common_public->action_set_data(t_simdata->comm,t);
121
122   if(__MSG_process_isBlocked(t_simdata->sender)) {
123     DEBUG1("Unblocking %s",t_simdata->sender->name);
124     __MSG_process_unblock(t_simdata->sender);
125   }
126
127   PAJE_PROCESS_PUSH_STATE(process,"C",t);  
128
129   do {
130     DEBUG0("Waiting for action termination");
131     __MSG_task_wait_event(process, t);
132     state=surf_workstation_resource->common_public->action_get_state(t_simdata->comm);
133   } while (state==SURF_ACTION_RUNNING);
134   DEBUG0("Action terminated");
135
136   if(t->simdata->using>1) {
137     xbt_fifo_unshift(msg_global->process_to_run,process);
138     xbt_context_yield();
139   }
140
141   PAJE_PROCESS_POP_STATE(process);
142   PAJE_COMM_STOP(process,t,channel);
143
144   if(state == SURF_ACTION_DONE) {
145     if(surf_workstation_resource->common_public->action_free(t_simdata->comm)) 
146       t_simdata->comm = NULL;
147     MSG_RETURN(MSG_OK);
148   } else if(surf_workstation_resource->extension_public->get_state(h_simdata->host) 
149           == SURF_CPU_OFF) {
150     if(surf_workstation_resource->common_public->action_free(t_simdata->comm)) 
151       t_simdata->comm = NULL;
152     MSG_RETURN(MSG_HOST_FAILURE);
153   } else {
154     if(surf_workstation_resource->common_public->action_free(t_simdata->comm)) 
155       t_simdata->comm = NULL;
156     MSG_RETURN(MSG_TRANSFER_FAILURE);
157   }
158 }
159
160 /** \ingroup msg_gos_functions
161  * \brief Listen on a channel and wait for receiving a task.
162  *
163  * It takes two parameters.
164  * \param task a memory location for storing a #m_task_t. It will
165    hold a task when this function will return. Thus \a task should not
166    be equal to \c NULL and \a *task should be equal to \c NULL. If one of
167    those two condition does not hold, there will be a warning message.
168  * \param channel the channel on which the agent should be
169    listening. This value has to be >=0 and < than the maximal
170    number of channels fixed with MSG_set_channel_number().
171  * \return #MSG_FATAL if \a task is equal to \c NULL, #MSG_WARNING
172  * if \a *task is not equal to \c NULL, and #MSG_OK otherwise.
173  */
174 MSG_error_t MSG_task_get(m_task_t * task,
175                          m_channel_t channel)
176 {
177   return MSG_task_get_with_time_out(task, channel, -1);
178 }
179
180 /** \ingroup msg_gos_functions
181  * \brief Listen on a channel and wait for receiving a task with a timeout.
182  *
183  * It takes three parameters.
184  * \param task a memory location for storing a #m_task_t. It will
185    hold a task when this function will return. Thus \a task should not
186    be equal to \c NULL and \a *task should be equal to \c NULL. If one of
187    those two condition does not hold, there will be a warning message.
188  * \param channel the channel on which the agent should be
189    listening. This value has to be >=0 and < than the maximal
190    number of channels fixed with MSG_set_channel_number().
191  * \param max_duration the maximum time to wait for a task before giving
192     up. In such a case, \a task will not be modified and will still be
193     equal to \c NULL when returning.
194  * \return #MSG_FATAL if \a task is equal to \c NULL, #MSG_WARNING
195    if \a *task is not equal to \c NULL, and #MSG_OK otherwise.
196  */
197 MSG_error_t MSG_task_get_with_time_out(m_task_t * task,
198                                        m_channel_t channel,
199                                        double max_duration)
200 {
201   return __MSG_task_get_with_time_out_from_host(task, channel, max_duration, NULL);
202 }
203
204 /** \ingroup msg_gos_functions
205  * \brief Listen on \a channel and waits for receiving a task from \a host.
206  *
207  * It takes three parameters.
208  * \param task a memory location for storing a #m_task_t. It will
209    hold a task when this function will return. Thus \a task should not
210    be equal to \c NULL and \a *task should be equal to \c NULL. If one of
211    those two condition does not hold, there will be a warning message.
212  * \param channel the channel on which the agent should be
213    listening. This value has to be >=0 and < than the maximal
214    number of channels fixed with MSG_set_channel_number().
215  * \param host the host that is to be watched.
216  * \return #MSG_FATAL if \a task is equal to \c NULL, #MSG_WARNING
217    if \a *task is not equal to \c NULL, and #MSG_OK otherwise.
218  */
219 MSG_error_t MSG_task_get_from_host(m_task_t * task, int channel, 
220                                    m_host_t host)
221 {
222   return __MSG_task_get_with_time_out_from_host(task, channel, -1, host);
223 }
224
225 /** \ingroup msg_gos_functions
226  * \brief Test whether there is a pending communication on a channel.
227  *
228  * It takes one parameter.
229  * \param channel the channel on which the agent should be
230    listening. This value has to be >=0 and < than the maximal
231    number of channels fixed with MSG_set_channel_number().
232  * \return 1 if there is a pending communication and 0 otherwise
233  */
234 int MSG_task_Iprobe(m_channel_t channel)
235 {
236   m_host_t h = NULL;
237   simdata_host_t h_simdata = NULL;
238
239   xbt_assert1((channel>=0) && (channel < msg_global->max_channel),"Invalid channel %d",channel);
240   DEBUG2("Probing on channel %d (%s)", channel,h->name);
241   CHECK_HOST();
242   h = MSG_host_self();
243   h_simdata = h->simdata;
244   return(xbt_fifo_get_first_item(h_simdata->mbox[channel])!=NULL);
245 }
246
247 /** \ingroup msg_gos_functions
248  * \brief Test whether there is a pending communication on a channel, and who sent it.
249  *
250  * It takes one parameter.
251  * \param channel the channel on which the agent should be
252    listening. This value has to be >=0 and < than the maximal
253    number of channels fixed with MSG_set_channel_number().
254  * \return -1 if there is no pending communication and the PID of the process who sent it otherwise
255  */
256 int MSG_task_probe_from(m_channel_t channel)
257 {
258   m_host_t h = NULL;
259   simdata_host_t h_simdata = NULL;
260   xbt_fifo_item_t item;
261   m_task_t t;
262
263   xbt_assert1((channel>=0) && (channel < msg_global->max_channel),"Invalid channel %d",channel);
264   CHECK_HOST();
265   h = MSG_host_self();
266   h_simdata = h->simdata;
267
268   DEBUG2("Probing on channel %d (%s)", channel,h->name);
269    
270   item = xbt_fifo_get_first_item(h->simdata->mbox[channel]);
271   if (!item || !(t = xbt_fifo_get_item_content(item)))
272     return -1;
273    
274   return MSG_process_get_PID(t->simdata->sender);
275 }
276
277 /** \ingroup msg_gos_functions
278  * \brief Wait for at most \a max_duration second for a task reception
279    on \a channel. *\a PID is updated with the PID of the first process
280    that triggered this event if any.
281  *
282  * It takes three parameters:
283  * \param channel the channel on which the agent should be
284    listening. This value has to be >=0 and < than the maximal.
285    number of channels fixed with MSG_set_channel_number().
286  * \param PID a memory location for storing an int.
287  * \param max_duration the maximum time to wait for a task before
288     giving up. In the case of a reception, *\a PID will be updated
289     with the PID of the first process to send a task.
290  * \return #MSG_HOST_FAILURE if the host is shut down in the meantime
291    and #MSG_OK otherwise.
292  */
293 MSG_error_t MSG_channel_select_from(m_channel_t channel, double max_duration,
294                                     int *PID)
295 {
296   m_host_t h = NULL;
297   simdata_host_t h_simdata = NULL;
298   xbt_fifo_item_t item;
299   m_task_t t;
300   int first_time = 1;
301   m_process_t process = MSG_process_self();
302
303   xbt_assert1((channel>=0) && (channel < msg_global->max_channel),"Invalid channel %d",channel);
304   if(PID) {
305     *PID = -1;
306   }
307
308   if(max_duration==0.0) {
309     *PID = MSG_task_probe_from(channel);
310     MSG_RETURN(MSG_OK);
311   } else {
312     CHECK_HOST();
313     h = MSG_host_self();
314     h_simdata = h->simdata;
315     
316     DEBUG2("Probing on channel %d (%s)", channel,h->name);
317     while(!(item = xbt_fifo_get_first_item(h->simdata->mbox[channel]))) {
318       if(max_duration>0) {
319         if(!first_time) {
320           MSG_RETURN(MSG_OK);
321         }
322       }
323       xbt_assert2(!(h_simdata->sleeping[channel]),
324                   "A process (%s(%d)) is already blocked on this channel",
325                   h_simdata->sleeping[channel]->name,
326                   h_simdata->sleeping[channel]->simdata->PID);
327       h_simdata->sleeping[channel] = process; /* I'm waiting. Wake me up when you're ready */
328       if(max_duration>0) {
329         __MSG_process_block(max_duration);
330       } else {
331         __MSG_process_block(-1);
332       }
333       if(surf_workstation_resource->extension_public->get_state(h_simdata->host) 
334          == SURF_CPU_OFF) {
335         MSG_RETURN(MSG_HOST_FAILURE);
336       }
337       h_simdata->sleeping[channel] = NULL;
338       first_time = 0;
339     }
340     if (!item || !(t = xbt_fifo_get_item_content(item))) {
341       MSG_RETURN(MSG_OK);
342     }
343     if(PID) {
344       *PID = MSG_process_get_PID(t->simdata->sender);
345     }
346     MSG_RETURN(MSG_OK);
347   }
348 }
349
350
351 /** \ingroup msg_gos_functions
352
353  * \brief Return the number of tasks waiting to be received on a \a
354    channel and sent by \a host.
355  *
356  * It takes two parameters.
357  * \param channel the channel on which the agent should be
358    listening. This value has to be >=0 and < than the maximal
359    number of channels fixed with MSG_set_channel_number().
360  * \param host the host that is to be watched.
361  * \return the number of tasks waiting to be received on \a channel
362    and sent by \a host.
363  */
364 int MSG_task_probe_from_host(int channel, m_host_t host)
365 {
366   simdata_host_t h_simdata = NULL;
367   xbt_fifo_item_t item;
368   m_task_t t;
369   int count = 0;
370   m_host_t h = NULL;
371   
372   xbt_assert1((channel>=0) && (channel < msg_global->max_channel),"Invalid channel %d",channel);
373   CHECK_HOST();
374   h = MSG_host_self();
375   h_simdata = h->simdata;
376
377   DEBUG2("Probing on channel %d (%s)", channel,h->name);
378    
379   xbt_fifo_foreach(h->simdata->mbox[channel],item,t,m_task_t) {
380     if(t->simdata->source==host) count++;
381   }
382    
383   return count;
384 }
385
386 /** \ingroup msg_gos_functions
387  * \brief Put a task on a channel of an host and waits for the end of the
388  * transmission.
389  *
390  * This function is used for describing the behavior of an agent. It
391  * takes three parameter.
392  * \param task a #m_task_t to send on another location. This task
393    will not be usable anymore when the function will return. There is
394    no automatic task duplication and you have to save your parameters
395    before calling this function. Tasks are unique and once it has been
396    sent to another location, you should not access it anymore. You do
397    not need to call MSG_task_destroy() but to avoid using, as an
398    effect of inattention, this task anymore, you definitely should
399    renitialize it with #MSG_TASK_UNINITIALIZED. Note that this task
400    can be transfered iff it has been correctly created with
401    MSG_task_create().
402  * \param dest the destination of the message
403  * \param channel the channel on which the agent should put this
404    task. This value has to be >=0 and < than the maximal number of
405    channels fixed with MSG_set_channel_number().
406  * \return #MSG_FATAL if \a task is not properly initialized and
407  * #MSG_OK otherwise.
408  */
409 MSG_error_t MSG_task_put(m_task_t task,
410                          m_host_t dest, m_channel_t channel)
411 {
412   m_process_t process = MSG_process_self();
413   simdata_task_t task_simdata = NULL;
414   e_surf_action_state_t state = SURF_ACTION_NOT_IN_THE_SYSTEM;
415   m_host_t local_host = NULL;
416   m_host_t remote_host = NULL;
417
418   CHECK_HOST();
419
420   xbt_assert1((channel>=0) && (channel < msg_global->max_channel),"Invalid channel %d",channel);
421
422   task_simdata = task->simdata;
423   task_simdata->sender = process;
424   task_simdata->source = MSG_process_get_host(process);
425   xbt_assert0(task_simdata->using==1,
426               "This taks is still being used somewhere else. You cannot send it now. Go fix your code!");
427   task_simdata->comm = NULL;
428   
429   local_host = ((simdata_process_t) process->simdata)->host;
430   remote_host = dest;
431
432   DEBUG4("Trying to send a task (%g Mb) from %s to %s on channel %d", 
433          task->simdata->message_size,local_host->name, remote_host->name, channel);
434
435   xbt_fifo_push(((simdata_host_t) remote_host->simdata)->
436                 mbox[channel], task);
437
438   PAJE_COMM_START(process,task,channel);
439     
440   if(remote_host->simdata->sleeping[channel]) {
441     DEBUG0("Somebody is listening. Let's wake him up!");
442     __MSG_process_unblock(remote_host->simdata->sleeping[channel]);
443   }
444
445   process->simdata->put_host = dest;
446   process->simdata->put_channel = channel;
447   while(!(task_simdata->comm)) {
448     DEBUG0("Communication not initiated yet. Let's block!");
449     __MSG_process_block(-1);
450     if(surf_workstation_resource->extension_public->
451        get_state(remote_host->simdata->host) == SURF_CPU_OFF) {
452       xbt_fifo_remove(((simdata_host_t) remote_host->simdata)->mbox[channel],
453                       task);
454       MSG_task_destroy(task);
455       MSG_RETURN(MSG_HOST_FAILURE);
456     }
457   }
458   DEBUG0("Registering to this communication");
459   surf_workstation_resource->common_public->action_use(task_simdata->comm);
460   process->simdata->put_host = NULL;
461   process->simdata->put_channel = -1;
462
463
464   PAJE_PROCESS_PUSH_STATE(process,"C",task);  
465
466   state=surf_workstation_resource->common_public->action_get_state(task_simdata->comm);
467   while (state==SURF_ACTION_RUNNING) {
468     DEBUG0("Waiting for action termination");
469     __MSG_task_wait_event(process, task);
470     state=surf_workstation_resource->common_public->action_get_state(task_simdata->comm);
471   }
472   DEBUG0("Action terminated");
473   task->simdata->rate=-1.0; /* Sets the rate back to default */
474
475   PAJE_PROCESS_POP_STATE(process);  
476
477   if(state == SURF_ACTION_DONE) {
478     if(surf_workstation_resource->common_public->action_free(task_simdata->comm)) 
479       task_simdata->comm = NULL;
480     MSG_task_destroy(task);
481     MSG_RETURN(MSG_OK);
482   } else if(surf_workstation_resource->extension_public->get_state(local_host->simdata->host) 
483             == SURF_CPU_OFF) {
484     if(surf_workstation_resource->common_public->action_free(task_simdata->comm)) 
485       task_simdata->comm = NULL;
486     MSG_task_destroy(task);
487     MSG_RETURN(MSG_HOST_FAILURE);
488   } else { 
489     if(surf_workstation_resource->common_public->action_free(task_simdata->comm)) 
490       task_simdata->comm = NULL;
491     MSG_task_destroy(task);
492     MSG_RETURN(MSG_TRANSFER_FAILURE);
493   }
494 }
495
496 /** \ingroup msg_gos_functions
497  * \brief Does exactly the same as MSG_task_put but with a bounded transmition 
498  * rate.
499  *
500  * \sa MSG_task_put
501  */
502 MSG_error_t MSG_task_put_bounded(m_task_t task,
503                                  m_host_t dest, m_channel_t channel,
504                                  double max_rate)
505 {
506   MSG_error_t res = MSG_OK;
507   task->simdata->rate=max_rate;
508   res = MSG_task_put(task, dest, channel);
509   return(res);
510 }
511
512 /** \ingroup msg_gos_functions
513  * \brief Executes a task and waits for its termination.
514  *
515  * This function is used for describing the behavior of an agent. It
516  * takes only one parameter.
517  * \param task a #m_task_t to execute on the location on which the
518    agent is running.
519  * \return #MSG_FATAL if \a task is not properly initialized and
520  * #MSG_OK otherwise.
521  */
522 MSG_error_t MSG_task_execute(m_task_t task)
523 {
524   m_process_t process = MSG_process_self();
525   MSG_error_t res;
526
527   DEBUG1("Computing on %s", process->simdata->host->name);
528
529   __MSG_task_execute(process, task);
530
531   PAJE_PROCESS_PUSH_STATE(process,"E",task);  
532   res = __MSG_wait_for_computation(process,task);
533   PAJE_PROCESS_POP_STATE(process);
534   return res;
535 }
536
537 void __MSG_task_execute(m_process_t process, m_task_t task)
538 {
539   simdata_task_t simdata = NULL;
540
541   CHECK_HOST();
542
543   simdata = task->simdata;
544   xbt_assert0((!simdata->compute)&&(task->simdata->using==1),
545               "This taks is executed somewhere else. Go fix your code!");
546   task->simdata->using++;
547   simdata->compute = surf_workstation_resource->extension_public->
548     execute(MSG_process_get_host(process)->simdata->host,
549             simdata->computation_amount);
550   surf_workstation_resource->common_public->
551     set_priority(simdata->compute, simdata->priority);
552
553   surf_workstation_resource->common_public->action_set_data(simdata->compute,task);
554   task->simdata->using--;
555 }
556
557 MSG_error_t __MSG_wait_for_computation(m_process_t process, m_task_t task)
558 {
559   e_surf_action_state_t state = SURF_ACTION_NOT_IN_THE_SYSTEM;
560   simdata_task_t simdata = task->simdata;
561
562   XBT_IN4("(%p(%s) %p(%s))",process,process->name,task,task->name);
563   simdata->using++;
564   do {
565     __MSG_task_wait_event(process, task);
566     state=surf_workstation_resource->common_public->action_get_state(simdata->compute);
567   } while (state==SURF_ACTION_RUNNING);
568   simdata->using--;
569     
570
571   if(state == SURF_ACTION_DONE) {
572     if(surf_workstation_resource->common_public->action_free(simdata->compute)) 
573       simdata->compute = NULL;
574     simdata->computation_amount = 0.0;
575     XBT_OUT;
576     MSG_RETURN(MSG_OK);
577   } else if(surf_workstation_resource->extension_public->
578             get_state(MSG_process_get_host(process)->simdata->host) 
579             == SURF_CPU_OFF) {
580     if(surf_workstation_resource->common_public->action_free(simdata->compute)) 
581       simdata->compute = NULL;
582     XBT_OUT;
583     MSG_RETURN(MSG_HOST_FAILURE);
584   } else {
585     if(surf_workstation_resource->common_public->action_free(simdata->compute)) 
586       simdata->compute = NULL;
587     XBT_OUT;
588     MSG_RETURN(MSG_TASK_CANCELLED);
589   }
590 }
591 /** \ingroup m_task_management
592  * \brief Creates a new #m_task_t (a parallel one....).
593  *
594  * A constructor for #m_task_t taking six arguments and returning the 
595    corresponding object.
596  * \param name a name for the object. It is for user-level information
597    and can be NULL.
598  * \param host_nb the number of hosts implied in the parallel task.
599  * \param host_list an array of \p host_nb m_host_t.
600  * \param computation_amount an array of \p host_nb
601    doubles. computation_amount[i] is the total number of operations
602    that have to be performed on host_list[i].
603  * \param communication_amount an array of \p host_nb* \p host_nb doubles.
604  * \param data a pointer to any data may want to attach to the new
605    object.  It is for user-level information and can be NULL. It can
606    be retrieved with the function \ref MSG_task_get_data.
607  * \see m_task_t
608  * \return The new corresponding object.
609  */
610 m_task_t MSG_parallel_task_create(const char *name, 
611                                   int host_nb,
612                                   const m_host_t *host_list,
613                                   double *computation_amount,
614                                   double *communication_amount,
615                                   void *data)
616 {
617   simdata_task_t simdata = xbt_new0(s_simdata_task_t,1);
618   m_task_t task = xbt_new0(s_m_task_t,1);
619   int i;
620
621   /* Task structure */
622   task->name = xbt_strdup(name);
623   task->simdata = simdata;
624   task->data = data;
625
626   /* Simulator Data */
627   simdata->sleeping = xbt_dynar_new(sizeof(m_process_t),NULL);
628   simdata->rate = -1.0;
629   simdata->using = 1;
630   simdata->sender = NULL;
631   simdata->source = NULL;
632   simdata->host_nb = host_nb;
633   
634   simdata->host_list = xbt_new0(void *, host_nb);
635   simdata->comp_amount = computation_amount;
636   simdata->comm_amount = communication_amount;
637
638   for(i=0;i<host_nb;i++)
639     simdata->host_list[i] = host_list[i]->simdata->host;
640
641   return task;
642 }
643
644
645 static void __MSG_parallel_task_execute(m_process_t process, m_task_t task)
646 {
647   simdata_task_t simdata = NULL;
648
649   CHECK_HOST();
650
651   simdata = task->simdata;
652
653   xbt_assert0(simdata->host_nb,"This is not a parallel task. Go to hell.");
654
655   simdata->compute = surf_workstation_resource->extension_public->
656   execute_parallel_task(task->simdata->host_nb,
657                         task->simdata->host_list,
658                         task->simdata->comp_amount,
659                         task->simdata->comm_amount,
660                         1.0,
661                         -1.0);
662   if(simdata->compute)
663     surf_workstation_resource->common_public->action_set_data(simdata->compute,task);
664 }
665
666 MSG_error_t MSG_parallel_task_execute(m_task_t task)
667 {
668   m_process_t process = MSG_process_self();
669   MSG_error_t res;
670
671   DEBUG0("Computing on a tons of guys");
672   
673   __MSG_parallel_task_execute(process, task);
674
675   if(task->simdata->compute)
676     res = __MSG_wait_for_computation(process,task);
677   else 
678     res = MSG_OK;
679
680   return res;  
681 }
682
683
684 /** \ingroup msg_gos_functions
685  * \brief Sleep for the specified number of seconds
686  *
687  * Makes the current process sleep until \a time seconds have elapsed.
688  *
689  * \param nb_sec a number of second
690  */
691 MSG_error_t MSG_process_sleep(double nb_sec)
692 {
693   e_surf_action_state_t state = SURF_ACTION_NOT_IN_THE_SYSTEM;
694   m_process_t process = MSG_process_self();
695   m_task_t dummy = NULL;
696   simdata_task_t simdata = NULL;
697
698   CHECK_HOST();
699   dummy = MSG_task_create("MSG_sleep", nb_sec, 0.0, NULL);
700   simdata = dummy->simdata;
701
702   simdata->compute = surf_workstation_resource->extension_public->
703     sleep(MSG_process_get_host(process)->simdata->host,
704             simdata->computation_amount);
705   surf_workstation_resource->common_public->action_set_data(simdata->compute,dummy);
706
707   
708   simdata->using++;
709   do {
710     __MSG_task_wait_event(process, dummy);
711     state=surf_workstation_resource->common_public->action_get_state(simdata->compute);
712   } while (state==SURF_ACTION_RUNNING);
713   simdata->using--;
714     
715   if(state == SURF_ACTION_DONE) {
716     if(surf_workstation_resource->extension_public->
717        get_state(MSG_process_get_host(process)->simdata->host) 
718        == SURF_CPU_OFF) {
719       if(surf_workstation_resource->common_public->action_free(simdata->compute)) 
720         simdata->compute = NULL;
721       MSG_RETURN(MSG_HOST_FAILURE);
722     }
723     if(__MSG_process_isBlocked(process)) {
724       __MSG_process_unblock(MSG_process_self());
725     }
726     if(surf_workstation_resource->extension_public->
727        get_state(MSG_process_get_host(process)->simdata->host) 
728        == SURF_CPU_OFF) {
729       if(surf_workstation_resource->common_public->action_free(simdata->compute)) 
730         simdata->compute = NULL;
731       MSG_RETURN(MSG_HOST_FAILURE);
732     }
733     if(surf_workstation_resource->common_public->action_free(simdata->compute)) 
734       simdata->compute = NULL;
735     MSG_task_destroy(dummy);
736     MSG_RETURN(MSG_OK);
737   } else MSG_RETURN(MSG_HOST_FAILURE);
738 }
739
740 /** \ingroup msg_gos_functions
741  * \brief Return the number of MSG tasks currently running on
742  * the host of the current running process.
743  */
744 static int MSG_get_msgload(void) 
745 {
746   m_process_t process;
747    
748   CHECK_HOST();
749   
750   xbt_assert0(0, "This function is still to be specified correctly (what do you mean by 'load', exactly?). In the meantime, please don't use it");
751   process = MSG_process_self();
752   return xbt_fifo_size(process->simdata->host->simdata->process_list);
753 }
754
755 /** \ingroup msg_gos_functions
756  *
757  * \brief Return the last value returned by a MSG function (except
758  * MSG_get_errno...).
759  */
760 MSG_error_t MSG_get_errno(void)
761 {
762   return PROCESS_GET_ERRNO();
763 }