Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
Add a test in MSG_task_execute to stop whenever a task is being executed on two diffe...
[simgrid.git] / src / msg / gos.c
1 /*      $Id$     */
2
3 /* Copyright (c) 2002,2003,2004 Arnaud Legrand. All rights reserved.        */
4
5 /* This program is free software; you can redistribute it and/or modify it
6  * under the terms of the license (GNU LGPL) which comes with this package. */
7
8 #include "private.h"
9 #include "xbt/sysdep.h"
10 #include "xbt/log.h"
11 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(gos, msg,
12                                 "Logging specific to MSG (gos)");
13
14 /** \defgroup msg_gos_functions MSG Operating System Functions
15  *  \brief This section describes the functions that can be used
16  *  by an agent for handling some task.
17  */
18
19 static MSG_error_t __MSG_task_get_with_time_out_from_host(m_task_t * task,
20                                                         m_channel_t channel,
21                                                         double max_duration,
22                                                         m_host_t host)
23 {
24   m_process_t process = MSG_process_self();
25   m_task_t t = NULL;
26   m_host_t h = NULL;
27   simdata_task_t t_simdata = NULL;
28   simdata_host_t h_simdata = NULL;
29   int first_time = 1;
30   e_surf_action_state_t state = SURF_ACTION_NOT_IN_THE_SYSTEM;
31   xbt_fifo_item_t item = NULL;
32
33   CHECK_HOST();
34   xbt_assert1((channel>=0) && (channel < msg_global->max_channel),"Invalid channel %d",channel);
35   /* Sanity check */
36   xbt_assert0(task,"Null pointer for the task\n");
37
38   if (*task) 
39     CRITICAL0("MSG_task_get() was asked to write in a non empty task struct.");
40
41   /* Get the task */
42   h = MSG_host_self();
43   h_simdata = h->simdata;
44
45   DEBUG2("Waiting for a task on channel %d (%s)", channel,h->name);
46
47   while (1) {
48     if(xbt_fifo_size(h_simdata->mbox[channel])>0) {
49       if(!host) {
50         t = xbt_fifo_shift(h_simdata->mbox[channel]);
51         break;
52       } else {
53         xbt_fifo_foreach(h->simdata->mbox[channel],item,t,m_task_t) {
54           if(t->simdata->source==host) break;
55         }
56         if(item) {
57           xbt_fifo_remove_item(h->simdata->mbox[channel],item);
58           break;
59         } 
60       }
61     }
62                                                        
63     if(max_duration>0) {
64       if(!first_time) {
65         MSG_RETURN(MSG_OK);
66       }
67     }
68     xbt_assert2(!(h_simdata->sleeping[channel]),
69                 "A process (%s(%d)) is already blocked on this channel",
70                 h_simdata->sleeping[channel]->name,
71                 h_simdata->sleeping[channel]->simdata->PID);
72     h_simdata->sleeping[channel] = process; /* I'm waiting. Wake me up when you're ready */
73     if(max_duration>0) {
74       __MSG_process_block(max_duration);
75     } else {
76       __MSG_process_block(-1);
77     }
78     h_simdata->sleeping[channel] = NULL;
79     first_time = 0;
80     if(surf_workstation_resource->extension_public->get_state(h_simdata->host) 
81        == SURF_CPU_OFF)
82       MSG_RETURN(MSG_HOST_FAILURE);
83     /* OK, we should both be ready now. Are you there ? */
84   }
85
86   DEBUG1("OK, got a task (%s)", t->name);
87
88   t_simdata = t->simdata;
89   /*   *task = __MSG_task_copy(t); */
90   *task=t;
91
92   /* Transfer */
93   t_simdata->using++;
94
95   while(MSG_process_is_suspended(t_simdata->sender)) {
96     DEBUG1("Oooups, the sender (%s) has been suspended in the meantime. Let's wait for him", 
97            t_simdata->sender->name);
98     m_task_t task_to_wait_for = t_simdata->sender->simdata->waiting_task;
99     if(__MSG_process_isBlocked(t_simdata->sender)) {
100       DEBUG0("He's blocked. Let's wait for him to go in the suspended state");
101       __MSG_process_unblock(t_simdata->sender);
102       task_to_wait_for->simdata->using++;
103       __MSG_task_wait_event(process, task_to_wait_for);
104       MSG_task_destroy(task_to_wait_for);
105     } else {
106       DEBUG0("He's suspended. Let's wait for him to go in the resumed state");
107       task_to_wait_for->simdata->using++;
108       __MSG_task_wait_event(process, task_to_wait_for);
109       MSG_task_destroy(task_to_wait_for);
110       DEBUG0("He's resumed. He should block again. So let's free him.");
111       __MSG_process_unblock(t_simdata->sender);
112       break;
113     }
114   }
115   DEBUG0("Calling SURF for communication creation");
116   t_simdata->comm = surf_workstation_resource->extension_public->
117     communicate(MSG_process_get_host(t_simdata->sender)->simdata->host,
118                 h->simdata->host, t_simdata->message_size,t_simdata->rate);
119   
120   surf_workstation_resource->common_public->action_set_data(t_simdata->comm,t);
121
122   if(__MSG_process_isBlocked(t_simdata->sender)) {
123     DEBUG1("Unblocking %s",t_simdata->sender->name);
124     __MSG_process_unblock(t_simdata->sender);
125   }
126
127   PAJE_PROCESS_PUSH_STATE(process,"C");  
128
129   do {
130     DEBUG0("Waiting for action termination");
131     __MSG_task_wait_event(process, t);
132     state=surf_workstation_resource->common_public->action_get_state(t_simdata->comm);
133   } while (state==SURF_ACTION_RUNNING);
134   DEBUG0("Action terminated");
135
136   if(t->simdata->using>1) {
137     xbt_fifo_unshift(msg_global->process_to_run,process);
138     xbt_context_yield();
139   }
140
141   PAJE_PROCESS_POP_STATE(process);
142   PAJE_COMM_STOP(process,t,channel);
143
144   if(state == SURF_ACTION_DONE) {
145     if(surf_workstation_resource->common_public->action_free(t_simdata->comm)) 
146       t_simdata->comm = NULL;
147     MSG_RETURN(MSG_OK);
148   } else if(surf_workstation_resource->extension_public->get_state(h_simdata->host) 
149           == SURF_CPU_OFF) {
150     if(surf_workstation_resource->common_public->action_free(t_simdata->comm)) 
151       t_simdata->comm = NULL;
152     MSG_RETURN(MSG_HOST_FAILURE);
153   } else {
154     if(surf_workstation_resource->common_public->action_free(t_simdata->comm)) 
155       t_simdata->comm = NULL;
156     MSG_RETURN(MSG_TRANSFER_FAILURE);
157   }
158 }
159
160 /** \ingroup msg_gos_functions
161  * \brief Listen on a channel and wait for receiving a task.
162  *
163  * It takes two parameters.
164  * \param task a memory location for storing a #m_task_t. It will
165    hold a task when this function will return. Thus \a task should not
166    be equal to \c NULL and \a *task should be equal to \c NULL. If one of
167    those two condition does not hold, there will be a warning message.
168  * \param channel the channel on which the agent should be
169    listening. This value has to be >=0 and < than the maximal
170    number of channels fixed with MSG_set_channel_number().
171  * \return #MSG_FATAL if \a task is equal to \c NULL, #MSG_WARNING
172  * if \a *task is not equal to \c NULL, and #MSG_OK otherwise.
173  */
174 MSG_error_t MSG_task_get(m_task_t * task,
175                          m_channel_t channel)
176 {
177   return MSG_task_get_with_time_out(task, channel, -1);
178 }
179
180 /** \ingroup msg_gos_functions
181  * \brief Listen on a channel and wait for receiving a task with a timeout.
182  *
183  * It takes three parameters.
184  * \param task a memory location for storing a #m_task_t. It will
185    hold a task when this function will return. Thus \a task should not
186    be equal to \c NULL and \a *task should be equal to \c NULL. If one of
187    those two condition does not hold, there will be a warning message.
188  * \param channel the channel on which the agent should be
189    listening. This value has to be >=0 and < than the maximal
190    number of channels fixed with MSG_set_channel_number().
191  * \param max_duration the maximum time to wait for a task before giving
192     up. In such a case, \a task will not be modified and will still be
193     equal to \c NULL when returning.
194  * \return #MSG_FATAL if \a task is equal to \c NULL, #MSG_WARNING
195    if \a *task is not equal to \c NULL, and #MSG_OK otherwise.
196  */
197 MSG_error_t MSG_task_get_with_time_out(m_task_t * task,
198                                        m_channel_t channel,
199                                        double max_duration)
200 {
201   return __MSG_task_get_with_time_out_from_host(task, channel, max_duration, NULL);
202 }
203
204 /** \ingroup msg_gos_functions
205  * \brief Listen on \a channel and waits for receiving a task from \a host.
206  *
207  * It takes three parameters.
208  * \param task a memory location for storing a #m_task_t. It will
209    hold a task when this function will return. Thus \a task should not
210    be equal to \c NULL and \a *task should be equal to \c NULL. If one of
211    those two condition does not hold, there will be a warning message.
212  * \param channel the channel on which the agent should be
213    listening. This value has to be >=0 and < than the maximal
214    number of channels fixed with MSG_set_channel_number().
215  * \param host the host that is to be watched.
216  * \return #MSG_FATAL if \a task is equal to \c NULL, #MSG_WARNING
217    if \a *task is not equal to \c NULL, and #MSG_OK otherwise.
218  */
219 MSG_error_t MSG_task_get_from_host(m_task_t * task, int channel, 
220                                    m_host_t host)
221 {
222   return __MSG_task_get_with_time_out_from_host(task, channel, -1, host);
223 }
224
225 /** \ingroup msg_gos_functions
226  * \brief Test whether there is a pending communication on a channel.
227  *
228  * It takes one parameter.
229  * \param channel the channel on which the agent should be
230    listening. This value has to be >=0 and < than the maximal
231    number of channels fixed with MSG_set_channel_number().
232  * \return 1 if there is a pending communication and 0 otherwise
233  */
234 int MSG_task_Iprobe(m_channel_t channel)
235 {
236   m_host_t h = NULL;
237   simdata_host_t h_simdata = NULL;
238
239   xbt_assert1((channel>=0) && (channel < msg_global->max_channel),"Invalid channel %d",channel);
240   DEBUG2("Probing on channel %d (%s)", channel,h->name);
241   CHECK_HOST();
242   h = MSG_host_self();
243   h_simdata = h->simdata;
244   return(xbt_fifo_get_first_item(h_simdata->mbox[channel])!=NULL);
245 }
246
247 /** \ingroup msg_gos_functions
248  * \brief Test whether there is a pending communication on a channel, and who sent it.
249  *
250  * It takes one parameter.
251  * \param channel the channel on which the agent should be
252    listening. This value has to be >=0 and < than the maximal
253    number of channels fixed with MSG_set_channel_number().
254  * \return -1 if there is no pending communication and the PID of the process who sent it otherwise
255  */
256 int MSG_task_probe_from(m_channel_t channel)
257 {
258   m_host_t h = NULL;
259   simdata_host_t h_simdata = NULL;
260   xbt_fifo_item_t item;
261   m_task_t t;
262
263   xbt_assert1((channel>=0) && (channel < msg_global->max_channel),"Invalid channel %d",channel);
264   CHECK_HOST();
265   h = MSG_host_self();
266   h_simdata = h->simdata;
267
268   DEBUG2("Probing on channel %d (%s)", channel,h->name);
269    
270   item = xbt_fifo_get_first_item(h->simdata->mbox[channel]);
271   if (!item || !(t = xbt_fifo_get_item_content(item)))
272     return -1;
273    
274   return MSG_process_get_PID(t->simdata->sender);
275 }
276
277 /** \ingroup msg_gos_functions
278  * \brief Wait for at most \a max_duration second for a task reception
279    on \a channel. *\a PID is updated with the PID of the first process
280    that triggered this event if any.
281  *
282  * It takes three parameters:
283  * \param channel the channel on which the agent should be
284    listening. This value has to be >=0 and < than the maximal.
285    number of channels fixed with MSG_set_channel_number().
286  * \param PID a memory location for storing an int.
287  * \param max_duration the maximum time to wait for a task before
288     giving up. In the case of a reception, *\a PID will be updated
289     with the PID of the first process to send a task.
290  * \return #MSG_HOST_FAILURE if the host is shut down in the meantime
291    and #MSG_OK otherwise.
292  */
293 MSG_error_t MSG_channel_select_from(m_channel_t channel, double max_duration,
294                                     int *PID)
295 {
296   m_host_t h = NULL;
297   simdata_host_t h_simdata = NULL;
298   xbt_fifo_item_t item;
299   m_task_t t;
300   int first_time = 1;
301   m_process_t process = MSG_process_self();
302
303   xbt_assert1((channel>=0) && (channel < msg_global->max_channel),"Invalid channel %d",channel);
304   if(PID) {
305     *PID = -1;
306   }
307
308   if(max_duration==0.0) {
309     *PID = MSG_task_probe_from(channel);
310     MSG_RETURN(MSG_OK);
311   } else {
312     CHECK_HOST();
313     h = MSG_host_self();
314     h_simdata = h->simdata;
315     
316     DEBUG2("Probing on channel %d (%s)", channel,h->name);
317     while(!(item = xbt_fifo_get_first_item(h->simdata->mbox[channel]))) {
318       if(max_duration>0) {
319         if(!first_time) {
320           MSG_RETURN(MSG_OK);
321         }
322       }
323       xbt_assert2(!(h_simdata->sleeping[channel]),
324                   "A process (%s(%d)) is already blocked on this channel",
325                   h_simdata->sleeping[channel]->name,
326                   h_simdata->sleeping[channel]->simdata->PID);
327       h_simdata->sleeping[channel] = process; /* I'm waiting. Wake me up when you're ready */
328       if(max_duration>0) {
329         __MSG_process_block(max_duration);
330       } else {
331         __MSG_process_block(-1);
332       }
333       if(surf_workstation_resource->extension_public->get_state(h_simdata->host) 
334          == SURF_CPU_OFF) {
335         MSG_RETURN(MSG_HOST_FAILURE);
336       }
337       h_simdata->sleeping[channel] = NULL;
338       first_time = 0;
339     }
340     if (!item || !(t = xbt_fifo_get_item_content(item))) {
341       MSG_RETURN(MSG_OK);
342     }
343     if(PID) {
344       *PID = MSG_process_get_PID(t->simdata->sender);
345     }
346     MSG_RETURN(MSG_OK);
347   }
348 }
349
350
351 /** \ingroup msg_gos_functions
352
353  * \brief Return the number of tasks waiting to be received on a \a
354    channel and sent by \a host.
355  *
356  * It takes two parameters.
357  * \param channel the channel on which the agent should be
358    listening. This value has to be >=0 and < than the maximal
359    number of channels fixed with MSG_set_channel_number().
360  * \param host the host that is to be watched.
361  * \return the number of tasks waiting to be received on \a channel
362    and sent by \a host.
363  */
364 int MSG_task_probe_from_host(int channel, m_host_t host)
365 {
366   simdata_host_t h_simdata = NULL;
367   xbt_fifo_item_t item;
368   m_task_t t;
369   int count = 0;
370   m_host_t h = NULL;
371   
372   xbt_assert1((channel>=0) && (channel < msg_global->max_channel),"Invalid channel %d",channel);
373   CHECK_HOST();
374   h = MSG_host_self();
375   h_simdata = h->simdata;
376
377   DEBUG2("Probing on channel %d (%s)", channel,h->name);
378    
379   xbt_fifo_foreach(h->simdata->mbox[channel],item,t,m_task_t) {
380     if(t->simdata->source==host) count++;
381   }
382    
383   return count;
384 }
385
386 /** \ingroup msg_gos_functions
387  * \brief Put a task on a channel of an host and waits for the end of the
388  * transmission.
389  *
390  * This function is used for describing the behavior of an agent. It
391  * takes three parameter.
392  * \param task a #m_task_t to send on another location. This task
393    will not be usable anymore when the function will return. There is
394    no automatic task duplication and you have to save your parameters
395    before calling this function. Tasks are unique and once it has been
396    sent to another location, you should not access it anymore. You do
397    not need to call MSG_task_destroy() but to avoid using, as an
398    effect of inattention, this task anymore, you definitely should
399    renitialize it with #MSG_TASK_UNINITIALIZED. Note that this task
400    can be transfered iff it has been correctly created with
401    MSG_task_create().
402  * \param dest the destination of the message
403  * \param channel the channel on which the agent should put this
404    task. This value has to be >=0 and < than the maximal number of
405    channels fixed with MSG_set_channel_number().
406  * \return #MSG_FATAL if \a task is not properly initialized and
407  * #MSG_OK otherwise.
408  */
409 MSG_error_t MSG_task_put(m_task_t task,
410                          m_host_t dest, m_channel_t channel)
411 {
412   m_process_t process = MSG_process_self();
413   simdata_task_t task_simdata = NULL;
414   e_surf_action_state_t state = SURF_ACTION_NOT_IN_THE_SYSTEM;
415   m_host_t local_host = NULL;
416   m_host_t remote_host = NULL;
417
418   CHECK_HOST();
419
420   xbt_assert1((channel>=0) && (channel < msg_global->max_channel),"Invalid channel %d",channel);
421
422   task_simdata = task->simdata;
423   task_simdata->sender = process;
424   task_simdata->source = MSG_process_get_host(process);
425   xbt_assert0(task_simdata->using==1,
426               "This taks is still being used somewhere else. You cannot send it now. Go fix your code!");
427   task_simdata->comm = NULL;
428   
429   local_host = ((simdata_process_t) process->simdata)->host;
430   remote_host = dest;
431
432   DEBUG4("Trying to send a task (%g Mb) from %s to %s on channel %d", 
433          task->simdata->message_size,local_host->name, remote_host->name, channel);
434
435   xbt_fifo_push(((simdata_host_t) remote_host->simdata)->
436                 mbox[channel], task);
437
438   PAJE_COMM_START(process,task,channel);
439     
440   if(remote_host->simdata->sleeping[channel]) {
441     DEBUG0("Somebody is listening. Let's wake him up!");
442     __MSG_process_unblock(remote_host->simdata->sleeping[channel]);
443   }
444
445   process->simdata->put_host = dest;
446   process->simdata->put_channel = channel;
447   while(!(task_simdata->comm)) {
448     DEBUG0("Communication not initiated yet. Let's block!");
449     __MSG_process_block(-1);
450   }
451   DEBUG0("Registering to this communication");
452   surf_workstation_resource->common_public->action_use(task_simdata->comm);
453   process->simdata->put_host = NULL;
454   process->simdata->put_channel = -1;
455
456
457   PAJE_PROCESS_PUSH_STATE(process,"C");  
458
459   state=surf_workstation_resource->common_public->action_get_state(task_simdata->comm);
460   while (state==SURF_ACTION_RUNNING) {
461     DEBUG0("Waiting for action termination");
462     __MSG_task_wait_event(process, task);
463     state=surf_workstation_resource->common_public->action_get_state(task_simdata->comm);
464   }
465   DEBUG0("Action terminated");
466   task->simdata->rate=-1.0; /* Sets the rate back to default */
467
468   PAJE_PROCESS_POP_STATE(process);  
469
470   if(state == SURF_ACTION_DONE) {
471     if(surf_workstation_resource->common_public->action_free(task_simdata->comm)) 
472       task_simdata->comm = NULL;
473     MSG_task_destroy(task);
474     MSG_RETURN(MSG_OK);
475   } else if(surf_workstation_resource->extension_public->get_state(local_host->simdata->host) 
476             == SURF_CPU_OFF) {
477     if(surf_workstation_resource->common_public->action_free(task_simdata->comm)) 
478       task_simdata->comm = NULL;
479     MSG_task_destroy(task);
480     MSG_RETURN(MSG_HOST_FAILURE);
481   } else { 
482     if(surf_workstation_resource->common_public->action_free(task_simdata->comm)) 
483       task_simdata->comm = NULL;
484     MSG_task_destroy(task);
485     MSG_RETURN(MSG_TRANSFER_FAILURE);
486   }
487 }
488
489 /** \ingroup msg_gos_functions
490  * \brief Does exactly the same as MSG_task_put but with a bounded transmition 
491  * rate.
492  *
493  * \sa MSG_task_put
494  */
495 MSG_error_t MSG_task_put_bounded(m_task_t task,
496                                  m_host_t dest, m_channel_t channel,
497                                  double max_rate)
498 {
499   MSG_error_t res = MSG_OK;
500   task->simdata->rate=max_rate;
501   res = MSG_task_put(task, dest, channel);
502   return(res);
503 }
504
505 /** \ingroup msg_gos_functions
506  * \brief Executes a task and waits for its termination.
507  *
508  * This function is used for describing the behavior of an agent. It
509  * takes only one parameter.
510  * \param task a #m_task_t to execute on the location on which the
511    agent is running.
512  * \return #MSG_FATAL if \a task is not properly initialized and
513  * #MSG_OK otherwise.
514  */
515 MSG_error_t MSG_task_execute(m_task_t task)
516 {
517   m_process_t process = MSG_process_self();
518   MSG_error_t res;
519
520   DEBUG1("Computing on %s", process->simdata->host->name);
521
522   __MSG_task_execute(process, task);
523
524   PAJE_PROCESS_PUSH_STATE(process,"E");  
525   res = __MSG_wait_for_computation(process,task);
526   PAJE_PROCESS_POP_STATE(process);
527   return res;
528 }
529
530 void __MSG_task_execute(m_process_t process, m_task_t task)
531 {
532   simdata_task_t simdata = NULL;
533
534   CHECK_HOST();
535
536   simdata = task->simdata;
537   xbt_assert0(!simdata->compute,"This taks is executed somewhere else. Go fix your code!");
538
539   simdata->compute = surf_workstation_resource->extension_public->
540     execute(MSG_process_get_host(process)->simdata->host,
541             simdata->computation_amount);
542   surf_workstation_resource->common_public->
543     set_priority(simdata->compute, simdata->priority);
544
545   surf_workstation_resource->common_public->action_set_data(simdata->compute,task);
546 }
547
548 MSG_error_t __MSG_wait_for_computation(m_process_t process, m_task_t task)
549 {
550   e_surf_action_state_t state = SURF_ACTION_NOT_IN_THE_SYSTEM;
551   simdata_task_t simdata = task->simdata;
552
553   XBT_IN4("(%p(%s) %p(%s))",process,process->name,task,task->name);
554   simdata->using++;
555   do {
556     __MSG_task_wait_event(process, task);
557     state=surf_workstation_resource->common_public->action_get_state(simdata->compute);
558   } while (state==SURF_ACTION_RUNNING);
559   simdata->using--;
560     
561
562   if(state == SURF_ACTION_DONE) {
563     if(surf_workstation_resource->common_public->action_free(simdata->compute)) 
564       simdata->compute = NULL;
565     simdata->computation_amount = 0.0;
566     XBT_OUT;
567     MSG_RETURN(MSG_OK);
568   } else if(surf_workstation_resource->extension_public->
569             get_state(MSG_process_get_host(process)->simdata->host) 
570             == SURF_CPU_OFF) {
571     if(surf_workstation_resource->common_public->action_free(simdata->compute)) 
572       simdata->compute = NULL;
573     XBT_OUT;
574     MSG_RETURN(MSG_HOST_FAILURE);
575   } else {
576     if(surf_workstation_resource->common_public->action_free(simdata->compute)) 
577       simdata->compute = NULL;
578     XBT_OUT;
579     MSG_RETURN(MSG_TASK_CANCELLED);
580   }
581 }
582 /** \ingroup m_task_management
583  * \brief Creates a new #m_task_t (a parallel one....).
584  *
585  * A constructor for #m_task_t taking six arguments and returning the 
586    corresponding object.
587  * \param name a name for the object. It is for user-level information
588    and can be NULL.
589  * \param host_nb the number of hosts implied in the parallel task.
590  * \param host_list an array of \p host_nb m_host_t.
591  * \param computation_amount an array of \p host_nb
592    doubles. computation_amount[i] is the total number of operations
593    that have to be performed on host_list[i].
594  * \param communication_amount an array of \p host_nb* \p host_nb doubles.
595  * \param data a pointer to any data may want to attach to the new
596    object.  It is for user-level information and can be NULL. It can
597    be retrieved with the function \ref MSG_task_get_data.
598  * \see m_task_t
599  * \return The new corresponding object.
600  */
601 m_task_t MSG_parallel_task_create(const char *name, 
602                                   int host_nb,
603                                   const m_host_t *host_list,
604                                   double *computation_amount,
605                                   double *communication_amount,
606                                   void *data)
607 {
608   simdata_task_t simdata = xbt_new0(s_simdata_task_t,1);
609   m_task_t task = xbt_new0(s_m_task_t,1);
610   int i;
611
612   /* Task structure */
613   task->name = xbt_strdup(name);
614   task->simdata = simdata;
615   task->data = data;
616
617   /* Simulator Data */
618   simdata->sleeping = xbt_dynar_new(sizeof(m_process_t),NULL);
619   simdata->rate = -1.0;
620   simdata->using = 1;
621   simdata->sender = NULL;
622   simdata->source = NULL;
623   simdata->host_nb = host_nb;
624   
625   simdata->host_list = xbt_new0(void *, host_nb);
626   simdata->comp_amount = computation_amount;
627   simdata->comm_amount = communication_amount;
628
629   for(i=0;i<host_nb;i++)
630     simdata->host_list[i] = host_list[i]->simdata->host;
631
632   return task;
633 }
634
635
636 static void __MSG_parallel_task_execute(m_process_t process, m_task_t task)
637 {
638   simdata_task_t simdata = NULL;
639
640   CHECK_HOST();
641
642   simdata = task->simdata;
643
644   xbt_assert0(simdata->host_nb,"This is not a parallel task. Go to hell.");
645
646   simdata->compute = surf_workstation_resource->extension_public->
647   execute_parallel_task(task->simdata->host_nb,
648                         task->simdata->host_list,
649                         task->simdata->comp_amount,
650                         task->simdata->comm_amount,
651                         1.0,
652                         -1.0);
653   if(simdata->compute)
654     surf_workstation_resource->common_public->action_set_data(simdata->compute,task);
655 }
656
657 MSG_error_t MSG_parallel_task_execute(m_task_t task)
658 {
659   m_process_t process = MSG_process_self();
660   MSG_error_t res;
661
662   DEBUG0("Computing on a tons of guys");
663   
664   __MSG_parallel_task_execute(process, task);
665
666   if(task->simdata->compute)
667     res = __MSG_wait_for_computation(process,task);
668   else 
669     res = MSG_OK;
670
671   return res;  
672 }
673
674
675 /** \ingroup msg_gos_functions
676  * \brief Sleep for the specified number of seconds
677  *
678  * Makes the current process sleep until \a time seconds have elapsed.
679  *
680  * \param nb_sec a number of second
681  */
682 MSG_error_t MSG_process_sleep(double nb_sec)
683 {
684   e_surf_action_state_t state = SURF_ACTION_NOT_IN_THE_SYSTEM;
685   m_process_t process = MSG_process_self();
686   m_task_t dummy = NULL;
687   simdata_task_t simdata = NULL;
688
689   CHECK_HOST();
690   dummy = MSG_task_create("MSG_sleep", nb_sec, 0.0, NULL);
691   simdata = dummy->simdata;
692
693   simdata->compute = surf_workstation_resource->extension_public->
694     sleep(MSG_process_get_host(process)->simdata->host,
695             simdata->computation_amount);
696   surf_workstation_resource->common_public->action_set_data(simdata->compute,dummy);
697
698   
699   simdata->using++;
700   do {
701     __MSG_task_wait_event(process, dummy);
702     state=surf_workstation_resource->common_public->action_get_state(simdata->compute);
703   } while (state==SURF_ACTION_RUNNING);
704   simdata->using--;
705     
706   if(state == SURF_ACTION_DONE) {
707     if(surf_workstation_resource->extension_public->
708        get_state(MSG_process_get_host(process)->simdata->host) 
709        == SURF_CPU_OFF) {
710       if(surf_workstation_resource->common_public->action_free(simdata->compute)) 
711         simdata->compute = NULL;
712       MSG_RETURN(MSG_HOST_FAILURE);
713     }
714     if(__MSG_process_isBlocked(process)) {
715       __MSG_process_unblock(MSG_process_self());
716     }
717     if(surf_workstation_resource->extension_public->
718        get_state(MSG_process_get_host(process)->simdata->host) 
719        == SURF_CPU_OFF) {
720       if(surf_workstation_resource->common_public->action_free(simdata->compute)) 
721         simdata->compute = NULL;
722       MSG_RETURN(MSG_HOST_FAILURE);
723     }
724     if(surf_workstation_resource->common_public->action_free(simdata->compute)) 
725       simdata->compute = NULL;
726     MSG_task_destroy(dummy);
727     MSG_RETURN(MSG_OK);
728   } else MSG_RETURN(MSG_HOST_FAILURE);
729 }
730
731 /** \ingroup msg_gos_functions
732  * \brief Return the number of MSG tasks currently running on
733  * the host of the current running process.
734  */
735 static int MSG_get_msgload(void) 
736 {
737   m_process_t process;
738    
739   CHECK_HOST();
740   
741   xbt_assert0(0, "This function is still to be specified correctly (what do you mean by 'load', exactly?). In the meantime, please don't use it");
742   process = MSG_process_self();
743   return xbt_fifo_size(process->simdata->host->simdata->process_list);
744 }
745
746 /** \ingroup msg_gos_functions
747  *
748  * \brief Return the last value returned by a MSG function (except
749  * MSG_get_errno...).
750  */
751 MSG_error_t MSG_get_errno(void)
752 {
753   return PROCESS_GET_ERRNO();
754 }