Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
Ok, Paje is in a sorry state, but this leads to segfaults so kill it
[simgrid.git] / src / msg / gos.c
1 /*      $Id$     */
2
3 /* Copyright (c) 2002,2003,2004 Arnaud Legrand. All rights reserved.        */
4
5 /* This program is free software; you can redistribute it and/or modify it
6  * under the terms of the license (GNU LGPL) which comes with this package. */
7
8 #include "private.h"
9 #include "xbt/sysdep.h"
10 #include "xbt/log.h"
11 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(msg_gos, msg,
12                                 "Logging specific to MSG (gos)");
13
14 /** \defgroup msg_gos_functions MSG Operating System Functions
15  *  \brief This section describes the functions that can be used
16  *  by an agent for handling some task.
17  */
18
19 static MSG_error_t __MSG_task_get_with_time_out_from_host(m_task_t * task,
20                                                         m_channel_t channel,
21                                                         double max_duration,
22                                                         m_host_t host)
23 {
24   m_process_t process = MSG_process_self();
25   m_task_t t = NULL;
26   m_host_t h = NULL;
27   simdata_task_t t_simdata = NULL;
28   simdata_host_t h_simdata = NULL;
29   int first_time = 1;
30   e_surf_action_state_t state = SURF_ACTION_NOT_IN_THE_SYSTEM;
31   xbt_fifo_item_t item = NULL;
32
33   CHECK_HOST();
34   xbt_assert1((channel>=0) && (channel < msg_global->max_channel),"Invalid channel %d",channel);
35   /* Sanity check */
36   xbt_assert0(task,"Null pointer for the task\n");
37
38   if (*task) 
39     CRITICAL0("MSG_task_get() was asked to write in a non empty task struct.");
40
41   /* Get the task */
42   h = MSG_host_self();
43   h_simdata = h->simdata;
44
45   DEBUG2("Waiting for a task on channel %d (%s)", channel,h->name);
46
47   while (1) {
48     if(xbt_fifo_size(h_simdata->mbox[channel])>0) {
49       if(!host) {
50         t = xbt_fifo_shift(h_simdata->mbox[channel]);
51         break;
52       } else {
53         xbt_fifo_foreach(h->simdata->mbox[channel],item,t,m_task_t) {
54           if(t->simdata->source==host) break;
55         }
56         if(item) {
57           xbt_fifo_remove_item(h->simdata->mbox[channel],item);
58           break;
59         } 
60       }
61     }
62                                                        
63     if(max_duration>0) {
64       if(!first_time) {
65         PAJE_PROCESS_POP_STATE(process);
66         MSG_RETURN(MSG_TRANSFER_FAILURE);
67       }
68     }
69     xbt_assert3(!(h_simdata->sleeping[channel]),
70                 "A process (%s(%d)) is already blocked on channel %d",
71                 h_simdata->sleeping[channel]->name,
72                 h_simdata->sleeping[channel]->simdata->PID,
73                 channel);
74     h_simdata->sleeping[channel] = process; /* I'm waiting. Wake me up when you're ready */
75     if(max_duration>0) {
76       __MSG_process_block(max_duration,"");
77     } else {
78       __MSG_process_block(-1,"");
79     }
80     h_simdata->sleeping[channel] = NULL;
81     first_time = 0;
82     if(surf_workstation_resource->extension_public->get_state(h_simdata->host) 
83        == SURF_CPU_OFF)
84       MSG_RETURN(MSG_HOST_FAILURE);
85     /* OK, we should both be ready now. Are you there ? */
86   }
87
88   DEBUG1("OK, got a task (%s)", t->name);
89
90   t_simdata = t->simdata;
91   /*   *task = __MSG_task_copy(t); */
92   *task=t;
93
94   /* Transfer */
95   t_simdata->using++;
96
97   while(MSG_process_is_suspended(t_simdata->sender)) {
98     DEBUG1("Oooups, the sender (%s) has been suspended in the meantime. Let's wait for him", 
99            t_simdata->sender->name);
100     m_task_t task_to_wait_for = t_simdata->sender->simdata->waiting_task;
101     if(__MSG_process_isBlocked(t_simdata->sender)) {
102       DEBUG0("He's blocked. Let's wait for him to go in the suspended state");
103       __MSG_process_unblock(t_simdata->sender);
104       task_to_wait_for->simdata->using++;
105       __MSG_task_wait_event(process, task_to_wait_for);
106       MSG_task_destroy(task_to_wait_for);
107     } else {
108       DEBUG0("He's suspended. Let's wait for him to go in the resumed state");
109       task_to_wait_for->simdata->using++;
110       __MSG_task_wait_event(process, task_to_wait_for);
111       MSG_task_destroy(task_to_wait_for);
112       DEBUG0("He's resumed. He should block again. So let's free him.");
113       __MSG_process_unblock(t_simdata->sender);
114       break;
115     }
116   }
117   DEBUG0("Calling SURF for communication creation");
118   t_simdata->comm = surf_workstation_resource->extension_public->
119     communicate(MSG_process_get_host(t_simdata->sender)->simdata->host,
120                 h->simdata->host, t_simdata->message_size,t_simdata->rate);
121   
122   surf_workstation_resource->common_public->action_set_data(t_simdata->comm,t);
123
124   if(__MSG_process_isBlocked(t_simdata->sender)) {
125     DEBUG1("Unblocking %s",t_simdata->sender->name);
126     __MSG_process_unblock(t_simdata->sender);
127   }
128
129   PAJE_PROCESS_PUSH_STATE(process,"C",t);  
130
131   do {
132     DEBUG0("Waiting for action termination");
133     __MSG_task_wait_event(process, t);
134     state=surf_workstation_resource->common_public->action_get_state(t_simdata->comm);
135   } while (state==SURF_ACTION_RUNNING);
136   DEBUG0("Action terminated");
137
138   if(t->simdata->using>1) {
139     xbt_fifo_unshift(msg_global->process_to_run,process);
140     xbt_context_yield();
141   }
142
143   PAJE_PROCESS_POP_STATE(process);
144   PAJE_COMM_STOP(process,t,channel);
145
146   if(state == SURF_ACTION_DONE) {
147     if(surf_workstation_resource->common_public->action_free(t_simdata->comm)) 
148       t_simdata->comm = NULL;
149     MSG_RETURN(MSG_OK);
150   } else if(surf_workstation_resource->extension_public->get_state(h_simdata->host) 
151           == SURF_CPU_OFF) {
152     if(surf_workstation_resource->common_public->action_free(t_simdata->comm)) 
153       t_simdata->comm = NULL;
154     MSG_RETURN(MSG_HOST_FAILURE);
155   } else {
156     if(surf_workstation_resource->common_public->action_free(t_simdata->comm)) 
157       t_simdata->comm = NULL;
158     MSG_RETURN(MSG_TRANSFER_FAILURE);
159   }
160 }
161
162 /** \ingroup msg_gos_functions
163  * \brief Listen on a channel and wait for receiving a task.
164  *
165  * It takes two parameters.
166  * \param task a memory location for storing a #m_task_t. It will
167    hold a task when this function will return. Thus \a task should not
168    be equal to \c NULL and \a *task should be equal to \c NULL. If one of
169    those two condition does not hold, there will be a warning message.
170  * \param channel the channel on which the agent should be
171    listening. This value has to be >=0 and < than the maximal
172    number of channels fixed with MSG_set_channel_number().
173  * \return #MSG_FATAL if \a task is equal to \c NULL, #MSG_WARNING
174  * if \a *task is not equal to \c NULL, and #MSG_OK otherwise.
175  */
176 MSG_error_t MSG_task_get(m_task_t * task,
177                          m_channel_t channel)
178 {
179   return MSG_task_get_with_time_out(task, channel, -1);
180 }
181
182 /** \ingroup msg_gos_functions
183  * \brief Listen on a channel and wait for receiving a task with a timeout.
184  *
185  * It takes three parameters.
186  * \param task a memory location for storing a #m_task_t. It will
187    hold a task when this function will return. Thus \a task should not
188    be equal to \c NULL and \a *task should be equal to \c NULL. If one of
189    those two condition does not hold, there will be a warning message.
190  * \param channel the channel on which the agent should be
191    listening. This value has to be >=0 and < than the maximal
192    number of channels fixed with MSG_set_channel_number().
193  * \param max_duration the maximum time to wait for a task before giving
194     up. In such a case, #MSG_TRANSFER_FAILURE will be returned, \a task 
195     will not be modified and will still be
196     equal to \c NULL when returning. 
197  * \return #MSG_FATAL if \a task is equal to \c NULL, #MSG_WARNING
198    if \a *task is not equal to \c NULL, and #MSG_OK otherwise.
199  */
200 MSG_error_t MSG_task_get_with_time_out(m_task_t * task,
201                                        m_channel_t channel,
202                                        double max_duration)
203 {
204   return __MSG_task_get_with_time_out_from_host(task, channel, max_duration, NULL);
205 }
206
207 /** \ingroup msg_gos_functions
208  * \brief Listen on \a channel and waits for receiving a task from \a host.
209  *
210  * It takes three parameters.
211  * \param task a memory location for storing a #m_task_t. It will
212    hold a task when this function will return. Thus \a task should not
213    be equal to \c NULL and \a *task should be equal to \c NULL. If one of
214    those two condition does not hold, there will be a warning message.
215  * \param channel the channel on which the agent should be
216    listening. This value has to be >=0 and < than the maximal
217    number of channels fixed with MSG_set_channel_number().
218  * \param host the host that is to be watched.
219  * \return #MSG_FATAL if \a task is equal to \c NULL, #MSG_WARNING
220    if \a *task is not equal to \c NULL, and #MSG_OK otherwise.
221  */
222 MSG_error_t MSG_task_get_from_host(m_task_t * task, int channel, 
223                                    m_host_t host)
224 {
225   return __MSG_task_get_with_time_out_from_host(task, channel, -1, host);
226 }
227
228 /** \ingroup msg_gos_functions
229  * \brief Test whether there is a pending communication on a channel.
230  *
231  * It takes one parameter.
232  * \param channel the channel on which the agent should be
233    listening. This value has to be >=0 and < than the maximal
234    number of channels fixed with MSG_set_channel_number().
235  * \return 1 if there is a pending communication and 0 otherwise
236  */
237 int MSG_task_Iprobe(m_channel_t channel)
238 {
239   m_host_t h = NULL;
240   simdata_host_t h_simdata = NULL;
241
242   xbt_assert1((channel>=0) && (channel < msg_global->max_channel),"Invalid channel %d",channel);
243   DEBUG2("Probing on channel %d (%s)", channel,h->name);
244   CHECK_HOST();
245   h = MSG_host_self();
246   h_simdata = h->simdata;
247   return(xbt_fifo_get_first_item(h_simdata->mbox[channel])!=NULL);
248 }
249
250 /** \ingroup msg_gos_functions
251  * \brief Test whether there is a pending communication on a channel, and who sent it.
252  *
253  * It takes one parameter.
254  * \param channel the channel on which the agent should be
255    listening. This value has to be >=0 and < than the maximal
256    number of channels fixed with MSG_set_channel_number().
257  * \return -1 if there is no pending communication and the PID of the process who sent it otherwise
258  */
259 int MSG_task_probe_from(m_channel_t channel)
260 {
261   m_host_t h = NULL;
262   simdata_host_t h_simdata = NULL;
263   xbt_fifo_item_t item;
264   m_task_t t;
265
266   xbt_assert1((channel>=0) && (channel < msg_global->max_channel),"Invalid channel %d",channel);
267   CHECK_HOST();
268   h = MSG_host_self();
269   h_simdata = h->simdata;
270
271   DEBUG2("Probing on channel %d (%s)", channel,h->name);
272    
273   item = xbt_fifo_get_first_item(h->simdata->mbox[channel]);
274   if (!item || !(t = xbt_fifo_get_item_content(item)))
275     return -1;
276    
277   return MSG_process_get_PID(t->simdata->sender);
278 }
279
280 /** \ingroup msg_gos_functions
281  * \brief Wait for at most \a max_duration second for a task reception
282    on \a channel. *\a PID is updated with the PID of the first process
283    that triggered this event if any.
284  *
285  * It takes three parameters:
286  * \param channel the channel on which the agent should be
287    listening. This value has to be >=0 and < than the maximal.
288    number of channels fixed with MSG_set_channel_number().
289  * \param PID a memory location for storing an int.
290  * \param max_duration the maximum time to wait for a task before
291     giving up. In the case of a reception, *\a PID will be updated
292     with the PID of the first process to send a task.
293  * \return #MSG_HOST_FAILURE if the host is shut down in the meantime
294    and #MSG_OK otherwise.
295  */
296 MSG_error_t MSG_channel_select_from(m_channel_t channel, double max_duration,
297                                     int *PID)
298 {
299   m_host_t h = NULL;
300   simdata_host_t h_simdata = NULL;
301   xbt_fifo_item_t item;
302   m_task_t t;
303   int first_time = 1;
304   m_process_t process = MSG_process_self();
305
306   xbt_assert1((channel>=0) && (channel < msg_global->max_channel),"Invalid channel %d",channel);
307   if(PID) {
308     *PID = -1;
309   }
310
311   if(max_duration==0.0) {
312     *PID = MSG_task_probe_from(channel);
313     MSG_RETURN(MSG_OK);
314   } else {
315     CHECK_HOST();
316     h = MSG_host_self();
317     h_simdata = h->simdata;
318     
319     DEBUG2("Probing on channel %d (%s)", channel,h->name);
320     while(!(item = xbt_fifo_get_first_item(h->simdata->mbox[channel]))) {
321       if(max_duration>0) {
322         if(!first_time) {
323           MSG_RETURN(MSG_OK);
324         }
325       }
326       xbt_assert2(!(h_simdata->sleeping[channel]),
327                   "A process (%s(%d)) is already blocked on this channel",
328                   h_simdata->sleeping[channel]->name,
329                   h_simdata->sleeping[channel]->simdata->PID);
330       h_simdata->sleeping[channel] = process; /* I'm waiting. Wake me up when you're ready */
331       if(max_duration>0) {
332         __MSG_process_block(max_duration,"");
333       } else {
334         __MSG_process_block(-1,"");
335       }
336       if(surf_workstation_resource->extension_public->get_state(h_simdata->host) 
337          == SURF_CPU_OFF) {
338         MSG_RETURN(MSG_HOST_FAILURE);
339       }
340       h_simdata->sleeping[channel] = NULL;
341       first_time = 0;
342     }
343     if (!item || !(t = xbt_fifo_get_item_content(item))) {
344       MSG_RETURN(MSG_OK);
345     }
346     if(PID) {
347       *PID = MSG_process_get_PID(t->simdata->sender);
348     }
349     MSG_RETURN(MSG_OK);
350   }
351 }
352
353
354 /** \ingroup msg_gos_functions
355
356  * \brief Return the number of tasks waiting to be received on a \a
357    channel and sent by \a host.
358  *
359  * It takes two parameters.
360  * \param channel the channel on which the agent should be
361    listening. This value has to be >=0 and < than the maximal
362    number of channels fixed with MSG_set_channel_number().
363  * \param host the host that is to be watched.
364  * \return the number of tasks waiting to be received on \a channel
365    and sent by \a host.
366  */
367 int MSG_task_probe_from_host(int channel, m_host_t host)
368 {
369   simdata_host_t h_simdata = NULL;
370   xbt_fifo_item_t item;
371   m_task_t t;
372   int count = 0;
373   m_host_t h = NULL;
374   
375   xbt_assert1((channel>=0) && (channel < msg_global->max_channel),"Invalid channel %d",channel);
376   CHECK_HOST();
377   h = MSG_host_self();
378   h_simdata = h->simdata;
379
380   DEBUG2("Probing on channel %d (%s)", channel,h->name);
381    
382   xbt_fifo_foreach(h->simdata->mbox[channel],item,t,m_task_t) {
383     if(t->simdata->source==host) count++;
384   }
385    
386   return count;
387 }
388
389 /** \ingroup msg_gos_functions \brief Put a task on a channel of an
390  * host (with a timeout on the waiting of the destination host) and
391  * waits for the end of the transmission.
392  *
393  * This function is used for describing the behavior of an agent. It
394  * takes four parameter.
395  * \param task a #m_task_t to send on another location. This task
396    will not be usable anymore when the function will return. There is
397    no automatic task duplication and you have to save your parameters
398    before calling this function. Tasks are unique and once it has been
399    sent to another location, you should not access it anymore. You do
400    not need to call MSG_task_destroy() but to avoid using, as an
401    effect of inattention, this task anymore, you definitely should
402    renitialize it with #MSG_TASK_UNINITIALIZED. Note that this task
403    can be transfered iff it has been correctly created with
404    MSG_task_create().
405  * \param dest the destination of the message
406  * \param channel the channel on which the agent should put this
407    task. This value has to be >=0 and < than the maximal number of
408    channels fixed with MSG_set_channel_number().
409  * \param max_duration the maximum time to wait for a task before giving
410     up. In such a case, #MSG_TRANSFER_FAILURE will be returned, \a task 
411     will not be modified 
412  * \return #MSG_FATAL if \a task is not properly initialized and
413    #MSG_OK otherwise. Returns #MSG_HOST_FAILURE if the host on which
414    this function was called was shut down. Returns
415    #MSG_TRANSFER_FAILURE if the transfer could not be properly done
416    (network failure, dest failure, timeout...)
417  */
418 MSG_error_t MSG_task_put_with_timeout(m_task_t task, m_host_t dest, 
419                                       m_channel_t channel, double max_duration)
420 {
421   m_process_t process = MSG_process_self();
422   simdata_task_t task_simdata = NULL;
423   e_surf_action_state_t state = SURF_ACTION_NOT_IN_THE_SYSTEM;
424   m_host_t local_host = NULL;
425   m_host_t remote_host = NULL;
426   int first_time = 1;
427
428   CHECK_HOST();
429
430   xbt_assert1((channel>=0) && (channel < msg_global->max_channel),"Invalid channel %d",channel);
431
432   task_simdata = task->simdata;
433   task_simdata->sender = process;
434   task_simdata->source = MSG_process_get_host(process);
435   xbt_assert0(task_simdata->using==1,
436               "This taks is still being used somewhere else. You cannot send it now. Go fix your code!");
437   task_simdata->comm = NULL;
438   
439   local_host = ((simdata_process_t) process->simdata)->host;
440   remote_host = dest;
441
442   DEBUG4("Trying to send a task (%g kB) from %s to %s on channel %d", 
443          task->simdata->message_size/1000,local_host->name, remote_host->name, channel);
444
445   xbt_fifo_push(((simdata_host_t) remote_host->simdata)->
446                 mbox[channel], task);
447
448   PAJE_COMM_START(process,task,channel);
449     
450   if(remote_host->simdata->sleeping[channel]) {
451     DEBUG0("Somebody is listening. Let's wake him up!");
452     __MSG_process_unblock(remote_host->simdata->sleeping[channel]);
453   }
454
455   process->simdata->put_host = dest;
456   process->simdata->put_channel = channel;
457   while(!(task_simdata->comm)) {
458     if(max_duration>0) {
459       if(!first_time) {
460         PAJE_PROCESS_POP_STATE(process);
461         PAJE_COMM_STOP(process,task,channel);
462         MSG_RETURN(MSG_TRANSFER_FAILURE);
463       }
464     }
465     DEBUG0("Communication not initiated yet. Let's block!");
466     if(max_duration>0)
467       __MSG_process_block(max_duration,task->name);
468     else
469       __MSG_process_block(-1,task->name);
470
471     first_time = 0;
472
473     if(surf_workstation_resource->extension_public->
474        get_state(local_host->simdata->host) == SURF_CPU_OFF) {
475       xbt_fifo_remove(((simdata_host_t) remote_host->simdata)->mbox[channel],
476                       task);
477       PAJE_PROCESS_POP_STATE(process);
478       PAJE_COMM_STOP(process,task,channel);
479       MSG_task_destroy(task);
480       MSG_RETURN(MSG_HOST_FAILURE);
481     }
482   }
483   DEBUG0("Registering to this communication");
484   surf_workstation_resource->common_public->action_use(task_simdata->comm);
485   process->simdata->put_host = NULL;
486   process->simdata->put_channel = -1;
487
488
489   PAJE_PROCESS_PUSH_STATE(process,"C",task);  
490
491   state=surf_workstation_resource->common_public->action_get_state(task_simdata->comm);
492   while (state==SURF_ACTION_RUNNING) {
493     DEBUG0("Waiting for action termination");
494     __MSG_task_wait_event(process, task);
495     state=surf_workstation_resource->common_public->action_get_state(task_simdata->comm);
496   }
497   DEBUG0("Action terminated");
498   task->simdata->rate=-1.0; /* Sets the rate back to default */
499
500   PAJE_PROCESS_POP_STATE(process);  
501
502   if(state == SURF_ACTION_DONE) {
503     if(surf_workstation_resource->common_public->action_free(task_simdata->comm)) 
504       task_simdata->comm = NULL;
505     MSG_task_destroy(task);
506     MSG_RETURN(MSG_OK);
507   } else if(surf_workstation_resource->extension_public->get_state(local_host->simdata->host) 
508             == SURF_CPU_OFF) {
509     if(surf_workstation_resource->common_public->action_free(task_simdata->comm)) 
510       task_simdata->comm = NULL;
511     MSG_task_destroy(task);
512     MSG_RETURN(MSG_HOST_FAILURE);
513   } else { 
514     if(surf_workstation_resource->common_public->action_free(task_simdata->comm)) 
515       task_simdata->comm = NULL;
516     MSG_task_destroy(task);
517     MSG_RETURN(MSG_TRANSFER_FAILURE);
518   }
519 }
520 /** \ingroup msg_gos_functions
521  * \brief Put a task on a channel of an host and waits for the end of the
522  * transmission.
523  *
524  * This function is used for describing the behavior of an agent. It
525  * takes three parameter.
526  * \param task a #m_task_t to send on another location. This task
527    will not be usable anymore when the function will return. There is
528    no automatic task duplication and you have to save your parameters
529    before calling this function. Tasks are unique and once it has been
530    sent to another location, you should not access it anymore. You do
531    not need to call MSG_task_destroy() but to avoid using, as an
532    effect of inattention, this task anymore, you definitely should
533    renitialize it with #MSG_TASK_UNINITIALIZED. Note that this task
534    can be transfered iff it has been correctly created with
535    MSG_task_create().
536  * \param dest the destination of the message
537  * \param channel the channel on which the agent should put this
538    task. This value has to be >=0 and < than the maximal number of
539    channels fixed with MSG_set_channel_number().
540  * \return #MSG_FATAL if \a task is not properly initialized and
541  * #MSG_OK otherwise. Returns #MSG_HOST_FAILURE if the host on which
542  * this function was called was shut down. Returns
543  * #MSG_TRANSFER_FAILURE if the transfer could not be properly done
544  * (network failure, dest failure)
545  */
546 MSG_error_t MSG_task_put(m_task_t task,
547                          m_host_t dest, m_channel_t channel)
548 {
549   return MSG_task_put_with_timeout(task, dest, channel, -1.0);
550 }
551
552 /** \ingroup msg_gos_functions
553  * \brief Does exactly the same as MSG_task_put but with a bounded transmition 
554  * rate.
555  *
556  * \sa MSG_task_put
557  */
558 MSG_error_t MSG_task_put_bounded(m_task_t task,
559                                  m_host_t dest, m_channel_t channel,
560                                  double max_rate)
561 {
562   MSG_error_t res = MSG_OK;
563   task->simdata->rate=max_rate;
564   res = MSG_task_put(task, dest, channel);
565   return(res);
566 }
567
568 /** \ingroup msg_gos_functions
569  * \brief Executes a task and waits for its termination.
570  *
571  * This function is used for describing the behavior of an agent. It
572  * takes only one parameter.
573  * \param task a #m_task_t to execute on the location on which the
574    agent is running.
575  * \return #MSG_FATAL if \a task is not properly initialized and
576  * #MSG_OK otherwise.
577  */
578 MSG_error_t MSG_task_execute(m_task_t task)
579 {
580   m_process_t process = MSG_process_self();
581   MSG_error_t res;
582
583   DEBUG1("Computing on %s", process->simdata->host->name);
584
585   __MSG_task_execute(process, task);
586
587   PAJE_PROCESS_PUSH_STATE(process,"E",task);  
588   res = __MSG_wait_for_computation(process,task);
589   PAJE_PROCESS_POP_STATE(process);
590   return res;
591 }
592
593 void __MSG_task_execute(m_process_t process, m_task_t task)
594 {
595   simdata_task_t simdata = NULL;
596
597   CHECK_HOST();
598
599   simdata = task->simdata;
600   xbt_assert0((!simdata->compute)&&(task->simdata->using==1),
601               "This taks is executed somewhere else. Go fix your code!");
602   task->simdata->using++;
603   simdata->compute = surf_workstation_resource->extension_public->
604     execute(MSG_process_get_host(process)->simdata->host,
605             simdata->computation_amount);
606   surf_workstation_resource->common_public->
607     set_priority(simdata->compute, simdata->priority);
608
609   surf_workstation_resource->common_public->action_set_data(simdata->compute,task);
610   task->simdata->using--;
611 }
612
613 MSG_error_t __MSG_wait_for_computation(m_process_t process, m_task_t task)
614 {
615   e_surf_action_state_t state = SURF_ACTION_NOT_IN_THE_SYSTEM;
616   simdata_task_t simdata = task->simdata;
617
618   XBT_IN4("(%p(%s) %p(%s))",process,process->name,task,task->name);
619   simdata->using++;
620   do {
621     __MSG_task_wait_event(process, task);
622     state=surf_workstation_resource->common_public->action_get_state(simdata->compute);
623   } while (state==SURF_ACTION_RUNNING);
624   simdata->using--;
625     
626
627   if(state == SURF_ACTION_DONE) {
628     if(surf_workstation_resource->common_public->action_free(simdata->compute)) 
629       simdata->compute = NULL;
630     simdata->computation_amount = 0.0;
631     XBT_OUT;
632     MSG_RETURN(MSG_OK);
633   } else if(surf_workstation_resource->extension_public->
634             get_state(MSG_process_get_host(process)->simdata->host) 
635             == SURF_CPU_OFF) {
636     if(surf_workstation_resource->common_public->action_free(simdata->compute)) 
637       simdata->compute = NULL;
638     XBT_OUT;
639     MSG_RETURN(MSG_HOST_FAILURE);
640   } else {
641     if(surf_workstation_resource->common_public->action_free(simdata->compute)) 
642       simdata->compute = NULL;
643     XBT_OUT;
644     MSG_RETURN(MSG_TASK_CANCELLED);
645   }
646 }
647 /** \ingroup m_task_management
648  * \brief Creates a new #m_task_t (a parallel one....).
649  *
650  * A constructor for #m_task_t taking six arguments and returning the 
651    corresponding object.
652  * \param name a name for the object. It is for user-level information
653    and can be NULL.
654  * \param host_nb the number of hosts implied in the parallel task.
655  * \param host_list an array of \p host_nb m_host_t.
656  * \param computation_amount an array of \p host_nb
657    doubles. computation_amount[i] is the total number of operations
658    that have to be performed on host_list[i].
659  * \param communication_amount an array of \p host_nb* \p host_nb doubles.
660  * \param data a pointer to any data may want to attach to the new
661    object.  It is for user-level information and can be NULL. It can
662    be retrieved with the function \ref MSG_task_get_data.
663  * \see m_task_t
664  * \return The new corresponding object.
665  */
666 m_task_t MSG_parallel_task_create(const char *name, 
667                                   int host_nb,
668                                   const m_host_t *host_list,
669                                   double *computation_amount,
670                                   double *communication_amount,
671                                   void *data)
672 {
673   simdata_task_t simdata = xbt_new0(s_simdata_task_t,1);
674   m_task_t task = xbt_new0(s_m_task_t,1);
675   int i;
676
677   /* Task structure */
678   task->name = xbt_strdup(name);
679   task->simdata = simdata;
680   task->data = data;
681
682   /* Simulator Data */
683   simdata->sleeping = xbt_dynar_new(sizeof(m_process_t),NULL);
684   simdata->rate = -1.0;
685   simdata->using = 1;
686   simdata->sender = NULL;
687   simdata->source = NULL;
688   simdata->host_nb = host_nb;
689   
690   simdata->host_list = xbt_new0(void *, host_nb);
691   simdata->comp_amount = computation_amount;
692   simdata->comm_amount = communication_amount;
693
694   for(i=0;i<host_nb;i++)
695     simdata->host_list[i] = host_list[i]->simdata->host;
696
697   return task;
698 }
699
700
701 static void __MSG_parallel_task_execute(m_process_t process, m_task_t task)
702 {
703   simdata_task_t simdata = NULL;
704
705   CHECK_HOST();
706
707   simdata = task->simdata;
708
709   xbt_assert0(simdata->host_nb,"This is not a parallel task. Go to hell.");
710
711   simdata->compute = surf_workstation_resource->extension_public->
712   execute_parallel_task(task->simdata->host_nb,
713                         task->simdata->host_list,
714                         task->simdata->comp_amount,
715                         task->simdata->comm_amount,
716                         1.0,
717                         -1.0);
718   if(simdata->compute)
719     surf_workstation_resource->common_public->action_set_data(simdata->compute,task);
720 }
721
722 MSG_error_t MSG_parallel_task_execute(m_task_t task)
723 {
724   m_process_t process = MSG_process_self();
725   MSG_error_t res;
726
727   DEBUG0("Computing on a tons of guys");
728   
729   __MSG_parallel_task_execute(process, task);
730
731   if(task->simdata->compute)
732     res = __MSG_wait_for_computation(process,task);
733   else 
734     res = MSG_OK;
735
736   return res;  
737 }
738
739
740 /** \ingroup msg_gos_functions
741  * \brief Sleep for the specified number of seconds
742  *
743  * Makes the current process sleep until \a time seconds have elapsed.
744  *
745  * \param nb_sec a number of second
746  */
747 MSG_error_t MSG_process_sleep(double nb_sec)
748 {
749   e_surf_action_state_t state = SURF_ACTION_NOT_IN_THE_SYSTEM;
750   m_process_t process = MSG_process_self();
751   m_task_t dummy = NULL;
752   simdata_task_t simdata = NULL;
753
754   CHECK_HOST();
755   dummy = MSG_task_create("MSG_sleep", nb_sec, 0.0, NULL);
756   simdata = dummy->simdata;
757
758   simdata->compute = surf_workstation_resource->extension_public->
759     sleep(MSG_process_get_host(process)->simdata->host,
760             simdata->computation_amount);
761   surf_workstation_resource->common_public->action_set_data(simdata->compute,dummy);
762
763   
764   simdata->using++;
765   do {
766     __MSG_task_wait_event(process, dummy);
767     state=surf_workstation_resource->common_public->action_get_state(simdata->compute);
768   } while (state==SURF_ACTION_RUNNING);
769   simdata->using--;
770     
771   if(state == SURF_ACTION_DONE) {
772     if(surf_workstation_resource->extension_public->
773        get_state(MSG_process_get_host(process)->simdata->host) 
774        == SURF_CPU_OFF) {
775       if(surf_workstation_resource->common_public->action_free(simdata->compute)) 
776         simdata->compute = NULL;
777       MSG_RETURN(MSG_HOST_FAILURE);
778     }
779     if(__MSG_process_isBlocked(process)) {
780       __MSG_process_unblock(MSG_process_self());
781     }
782     if(surf_workstation_resource->extension_public->
783        get_state(MSG_process_get_host(process)->simdata->host) 
784        == SURF_CPU_OFF) {
785       if(surf_workstation_resource->common_public->action_free(simdata->compute)) 
786         simdata->compute = NULL;
787       MSG_RETURN(MSG_HOST_FAILURE);
788     }
789     if(surf_workstation_resource->common_public->action_free(simdata->compute)) 
790       simdata->compute = NULL;
791     MSG_task_destroy(dummy);
792     MSG_RETURN(MSG_OK);
793   } else MSG_RETURN(MSG_HOST_FAILURE);
794 }
795
796 /** \ingroup msg_gos_functions
797  * \brief Return the number of MSG tasks currently running on
798  * the host of the current running process.
799  */
800 static int MSG_get_msgload(void) 
801 {
802   m_process_t process;
803    
804   CHECK_HOST();
805   
806   xbt_assert0(0, "This function is still to be specified correctly (what do you mean by 'load', exactly?). In the meantime, please don't use it");
807   process = MSG_process_self();
808   return xbt_fifo_size(process->simdata->host->simdata->process_list);
809 }
810
811 /** \ingroup msg_gos_functions
812  *
813  * \brief Return the last value returned by a MSG function (except
814  * MSG_get_errno...).
815  */
816 MSG_error_t MSG_get_errno(void)
817 {
818   return PROCESS_GET_ERRNO();
819 }