Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
bab95282a2c7be99fd2161f6dedd5c433f20900b
[simgrid.git] / src / msg / gos.c
1 /*      $Id$     */
2
3 /* Copyright (c) 2002,2003,2004 Arnaud Legrand. All rights reserved.        */
4
5 /* This program is free software; you can redistribute it and/or modify it
6  * under the terms of the license (GNU LGPL) which comes with this package. */
7
8 #include "private.h"
9 #include "xbt/sysdep.h"
10 #include "xbt/log.h"
11 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(msg_gos, msg,
12                                 "Logging specific to MSG (gos)");
13
14 /** \defgroup msg_gos_functions MSG Operating System Functions
15  *  \brief This section describes the functions that can be used
16  *  by an agent for handling some task.
17  */
18
19 static MSG_error_t __MSG_task_get_with_time_out_from_host(m_task_t * task,
20                                                         m_channel_t channel,
21                                                         double max_duration,
22                                                         m_host_t host)
23 {
24   m_process_t process = MSG_process_self();
25   m_task_t t = NULL;
26   m_host_t h = NULL;
27   m_task_t task_to_wait_for;
28   simdata_task_t t_simdata = NULL;
29   simdata_host_t h_simdata = NULL;
30   int first_time = 1;
31   e_surf_action_state_t state = SURF_ACTION_NOT_IN_THE_SYSTEM;
32   xbt_fifo_item_t item = NULL;
33
34   CHECK_HOST();
35   xbt_assert1((channel>=0) && (channel < msg_global->max_channel),"Invalid channel %d",channel);
36   /* Sanity check */
37   xbt_assert0(task,"Null pointer for the task\n");
38
39   if (*task) 
40     CRITICAL0("MSG_task_get() was asked to write in a non empty task struct.");
41
42   /* Get the task */
43   h = MSG_host_self();
44   h_simdata = h->simdata;
45
46   DEBUG2("Waiting for a task on channel %d (%s)", channel,h->name);
47
48   while (1) {
49     if(xbt_fifo_size(h_simdata->mbox[channel])>0) {
50       if(!host) {
51         t = xbt_fifo_shift(h_simdata->mbox[channel]);
52         break;
53       } else {
54         xbt_fifo_foreach(h->simdata->mbox[channel],item,t,m_task_t) {
55           if(t->simdata->source==host) break;
56         }
57         if(item) {
58           xbt_fifo_remove_item(h->simdata->mbox[channel],item);
59           break;
60         } 
61       }
62     }
63                                                        
64     if(max_duration>0) {
65       if(!first_time) {
66         PAJE_PROCESS_POP_STATE(process);
67         MSG_RETURN(MSG_TRANSFER_FAILURE);
68       }
69     }
70     xbt_assert3(!(h_simdata->sleeping[channel]),
71                 "A process (%s(%d)) is already blocked on channel %d",
72                 h_simdata->sleeping[channel]->name,
73                 h_simdata->sleeping[channel]->simdata->PID,
74                 channel);
75     h_simdata->sleeping[channel] = process; /* I'm waiting. Wake me up when you're ready */
76     if(max_duration>0) {
77       __MSG_process_block(max_duration,"");
78     } else {
79       __MSG_process_block(-1,"");
80     }
81     h_simdata->sleeping[channel] = NULL;
82     first_time = 0;
83     if(surf_workstation_resource->extension_public->get_state(h_simdata->host) 
84        == SURF_CPU_OFF)
85       MSG_RETURN(MSG_HOST_FAILURE);
86     /* OK, we should both be ready now. Are you there ? */
87   }
88
89   DEBUG1("OK, got a task (%s)", t->name);
90
91   t_simdata = t->simdata;
92   /*   *task = __MSG_task_copy(t); */
93   *task=t;
94
95   /* Transfer */
96   t_simdata->using++;
97
98   while(MSG_process_is_suspended(t_simdata->sender)) {
99     DEBUG1("Oooups, the sender (%s) has been suspended in the meantime. Let's wait for him", 
100            t_simdata->sender->name);
101     task_to_wait_for = t_simdata->sender->simdata->waiting_task;
102     if(__MSG_process_isBlocked(t_simdata->sender)) {
103       DEBUG0("He's blocked. Let's wait for him to go in the suspended state");
104       __MSG_process_unblock(t_simdata->sender);
105       task_to_wait_for->simdata->using++;
106       __MSG_task_wait_event(process, task_to_wait_for);
107       MSG_task_destroy(task_to_wait_for);
108     } else {
109       DEBUG0("He's suspended. Let's wait for him to go in the resumed state");
110       task_to_wait_for->simdata->using++;
111       __MSG_task_wait_event(process, task_to_wait_for);
112       MSG_task_destroy(task_to_wait_for);
113       DEBUG0("He's resumed. He should block again. So let's free him.");
114       __MSG_process_unblock(t_simdata->sender);
115       break;
116     }
117   }
118   DEBUG0("Calling SURF for communication creation");
119   t_simdata->comm = surf_workstation_resource->extension_public->
120     communicate(MSG_process_get_host(t_simdata->sender)->simdata->host,
121                 h->simdata->host, t_simdata->message_size,t_simdata->rate);
122   
123   surf_workstation_resource->common_public->action_set_data(t_simdata->comm,t);
124
125   if(__MSG_process_isBlocked(t_simdata->sender)) {
126     DEBUG1("Unblocking %s",t_simdata->sender->name);
127     __MSG_process_unblock(t_simdata->sender);
128   }
129
130   PAJE_PROCESS_PUSH_STATE(process,"C",t);  
131
132   do {
133     DEBUG0("Waiting for action termination");
134     __MSG_task_wait_event(process, t);
135     state=surf_workstation_resource->common_public->action_get_state(t_simdata->comm);
136   } while (state==SURF_ACTION_RUNNING);
137   DEBUG0("Action terminated");
138
139   if(t->simdata->using>1) {
140     xbt_fifo_unshift(msg_global->process_to_run,process);
141     xbt_context_yield();
142   }
143
144   PAJE_PROCESS_POP_STATE(process);
145   PAJE_COMM_STOP(process,t,channel);
146
147   if(state == SURF_ACTION_DONE) {
148     if(surf_workstation_resource->common_public->action_free(t_simdata->comm)) 
149       t_simdata->comm = NULL;
150     MSG_RETURN(MSG_OK);
151   } else if(surf_workstation_resource->extension_public->get_state(h_simdata->host) 
152           == SURF_CPU_OFF) {
153     if(surf_workstation_resource->common_public->action_free(t_simdata->comm)) 
154       t_simdata->comm = NULL;
155     MSG_RETURN(MSG_HOST_FAILURE);
156   } else {
157     if(surf_workstation_resource->common_public->action_free(t_simdata->comm)) 
158       t_simdata->comm = NULL;
159     MSG_RETURN(MSG_TRANSFER_FAILURE);
160   }
161 }
162
163 /** \ingroup msg_gos_functions
164  * \brief Listen on a channel and wait for receiving a task.
165  *
166  * It takes two parameters.
167  * \param task a memory location for storing a #m_task_t. It will
168    hold a task when this function will return. Thus \a task should not
169    be equal to \c NULL and \a *task should be equal to \c NULL. If one of
170    those two condition does not hold, there will be a warning message.
171  * \param channel the channel on which the agent should be
172    listening. This value has to be >=0 and < than the maximal
173    number of channels fixed with MSG_set_channel_number().
174  * \return #MSG_FATAL if \a task is equal to \c NULL, #MSG_WARNING
175  * if \a *task is not equal to \c NULL, and #MSG_OK otherwise.
176  */
177 MSG_error_t MSG_task_get(m_task_t * task,
178                          m_channel_t channel)
179 {
180   return MSG_task_get_with_time_out(task, channel, -1);
181 }
182
183 /** \ingroup msg_gos_functions
184  * \brief Listen on a channel and wait for receiving a task with a timeout.
185  *
186  * It takes three parameters.
187  * \param task a memory location for storing a #m_task_t. It will
188    hold a task when this function will return. Thus \a task should not
189    be equal to \c NULL and \a *task should be equal to \c NULL. If one of
190    those two condition does not hold, there will be a warning message.
191  * \param channel the channel on which the agent should be
192    listening. This value has to be >=0 and < than the maximal
193    number of channels fixed with MSG_set_channel_number().
194  * \param max_duration the maximum time to wait for a task before giving
195     up. In such a case, #MSG_TRANSFER_FAILURE will be returned, \a task 
196     will not be modified and will still be
197     equal to \c NULL when returning. 
198  * \return #MSG_FATAL if \a task is equal to \c NULL, #MSG_WARNING
199    if \a *task is not equal to \c NULL, and #MSG_OK otherwise.
200  */
201 MSG_error_t MSG_task_get_with_time_out(m_task_t * task,
202                                        m_channel_t channel,
203                                        double max_duration)
204 {
205   return __MSG_task_get_with_time_out_from_host(task, channel, max_duration, NULL);
206 }
207
208 /** \ingroup msg_gos_functions
209  * \brief Listen on \a channel and waits for receiving a task from \a host.
210  *
211  * It takes three parameters.
212  * \param task a memory location for storing a #m_task_t. It will
213    hold a task when this function will return. Thus \a task should not
214    be equal to \c NULL and \a *task should be equal to \c NULL. If one of
215    those two condition does not hold, there will be a warning message.
216  * \param channel the channel on which the agent should be
217    listening. This value has to be >=0 and < than the maximal
218    number of channels fixed with MSG_set_channel_number().
219  * \param host the host that is to be watched.
220  * \return #MSG_FATAL if \a task is equal to \c NULL, #MSG_WARNING
221    if \a *task is not equal to \c NULL, and #MSG_OK otherwise.
222  */
223 MSG_error_t MSG_task_get_from_host(m_task_t * task, int channel, 
224                                    m_host_t host)
225 {
226   return __MSG_task_get_with_time_out_from_host(task, channel, -1, host);
227 }
228
229 /** \ingroup msg_gos_functions
230  * \brief Test whether there is a pending communication on a channel.
231  *
232  * It takes one parameter.
233  * \param channel the channel on which the agent should be
234    listening. This value has to be >=0 and < than the maximal
235    number of channels fixed with MSG_set_channel_number().
236  * \return 1 if there is a pending communication and 0 otherwise
237  */
238 int MSG_task_Iprobe(m_channel_t channel)
239 {
240   m_host_t h = NULL;
241
242   xbt_assert1((channel>=0) && (channel < msg_global->max_channel),"Invalid channel %d",channel);
243   DEBUG2("Probing on channel %d (%s)", channel,h->name);
244   CHECK_HOST();
245   h = MSG_host_self();
246   return(xbt_fifo_get_first_item(h->simdata->mbox[channel])!=NULL);
247 }
248
249 /** \ingroup msg_gos_functions
250  * \brief Test whether there is a pending communication on a channel, and who sent it.
251  *
252  * It takes one parameter.
253  * \param channel the channel on which the agent should be
254    listening. This value has to be >=0 and < than the maximal
255    number of channels fixed with MSG_set_channel_number().
256  * \return -1 if there is no pending communication and the PID of the process who sent it otherwise
257  */
258 int MSG_task_probe_from(m_channel_t channel)
259 {
260   m_host_t h = NULL;
261   xbt_fifo_item_t item;
262   m_task_t t;
263
264   xbt_assert1((channel>=0) && (channel < msg_global->max_channel),"Invalid channel %d",channel);
265   CHECK_HOST();
266   h = MSG_host_self();
267
268   DEBUG2("Probing on channel %d (%s)", channel,h->name);
269    
270   item = xbt_fifo_get_first_item(h->simdata->mbox[channel]);
271   if (!item || !(t = xbt_fifo_get_item_content(item)))
272     return -1;
273    
274   return MSG_process_get_PID(t->simdata->sender);
275 }
276
277 /** \ingroup msg_gos_functions
278  * \brief Wait for at most \a max_duration second for a task reception
279    on \a channel. *\a PID is updated with the PID of the first process
280    that triggered this event if any.
281  *
282  * It takes three parameters:
283  * \param channel the channel on which the agent should be
284    listening. This value has to be >=0 and < than the maximal.
285    number of channels fixed with MSG_set_channel_number().
286  * \param PID a memory location for storing an int.
287  * \param max_duration the maximum time to wait for a task before
288     giving up. In the case of a reception, *\a PID will be updated
289     with the PID of the first process to send a task.
290  * \return #MSG_HOST_FAILURE if the host is shut down in the meantime
291    and #MSG_OK otherwise.
292  */
293 MSG_error_t MSG_channel_select_from(m_channel_t channel, double max_duration,
294                                     int *PID)
295 {
296   m_host_t h = NULL;
297   simdata_host_t h_simdata = NULL;
298   xbt_fifo_item_t item;
299   m_task_t t;
300   int first_time = 1;
301   m_process_t process = MSG_process_self();
302
303   xbt_assert1((channel>=0) && (channel < msg_global->max_channel),"Invalid channel %d",channel);
304   if(PID) {
305     *PID = -1;
306   }
307
308   if(max_duration==0.0) {
309     *PID = MSG_task_probe_from(channel);
310     MSG_RETURN(MSG_OK);
311   } else {
312     CHECK_HOST();
313     h = MSG_host_self();
314     h_simdata = h->simdata;
315     
316     DEBUG2("Probing on channel %d (%s)", channel,h->name);
317     while(!(item = xbt_fifo_get_first_item(h->simdata->mbox[channel]))) {
318       if(max_duration>0) {
319         if(!first_time) {
320           MSG_RETURN(MSG_OK);
321         }
322       }
323       xbt_assert2(!(h_simdata->sleeping[channel]),
324                   "A process (%s(%d)) is already blocked on this channel",
325                   h_simdata->sleeping[channel]->name,
326                   h_simdata->sleeping[channel]->simdata->PID);
327       h_simdata->sleeping[channel] = process; /* I'm waiting. Wake me up when you're ready */
328       if(max_duration>0) {
329         __MSG_process_block(max_duration,"");
330       } else {
331         __MSG_process_block(-1,"");
332       }
333       if(surf_workstation_resource->extension_public->get_state(h_simdata->host) 
334          == SURF_CPU_OFF) {
335         MSG_RETURN(MSG_HOST_FAILURE);
336       }
337       h_simdata->sleeping[channel] = NULL;
338       first_time = 0;
339     }
340     if (!item || !(t = xbt_fifo_get_item_content(item))) {
341       MSG_RETURN(MSG_OK);
342     }
343     if(PID) {
344       *PID = MSG_process_get_PID(t->simdata->sender);
345     }
346     MSG_RETURN(MSG_OK);
347   }
348 }
349
350
351 /** \ingroup msg_gos_functions
352
353  * \brief Return the number of tasks waiting to be received on a \a
354    channel and sent by \a host.
355  *
356  * It takes two parameters.
357  * \param channel the channel on which the agent should be
358    listening. This value has to be >=0 and < than the maximal
359    number of channels fixed with MSG_set_channel_number().
360  * \param host the host that is to be watched.
361  * \return the number of tasks waiting to be received on \a channel
362    and sent by \a host.
363  */
364 int MSG_task_probe_from_host(int channel, m_host_t host)
365 {
366   xbt_fifo_item_t item;
367   m_task_t t;
368   int count = 0;
369   m_host_t h = NULL;
370   
371   xbt_assert1((channel>=0) && (channel < msg_global->max_channel),"Invalid channel %d",channel);
372   CHECK_HOST();
373   h = MSG_host_self();
374
375   DEBUG2("Probing on channel %d (%s)", channel,h->name);
376    
377   xbt_fifo_foreach(h->simdata->mbox[channel],item,t,m_task_t) {
378     if(t->simdata->source==host) count++;
379   }
380    
381   return count;
382 }
383
384 /** \ingroup msg_gos_functions \brief Put a task on a channel of an
385  * host (with a timeout on the waiting of the destination host) and
386  * waits for the end of the transmission.
387  *
388  * This function is used for describing the behavior of an agent. It
389  * takes four parameter.
390  * \param task a #m_task_t to send on another location. This task
391    will not be usable anymore when the function will return. There is
392    no automatic task duplication and you have to save your parameters
393    before calling this function. Tasks are unique and once it has been
394    sent to another location, you should not access it anymore. You do
395    not need to call MSG_task_destroy() but to avoid using, as an
396    effect of inattention, this task anymore, you definitely should
397    renitialize it with #MSG_TASK_UNINITIALIZED. Note that this task
398    can be transfered iff it has been correctly created with
399    MSG_task_create().
400  * \param dest the destination of the message
401  * \param channel the channel on which the agent should put this
402    task. This value has to be >=0 and < than the maximal number of
403    channels fixed with MSG_set_channel_number().
404  * \param max_duration the maximum time to wait for a task before giving
405     up. In such a case, #MSG_TRANSFER_FAILURE will be returned, \a task 
406     will not be modified 
407  * \return #MSG_FATAL if \a task is not properly initialized and
408    #MSG_OK otherwise. Returns #MSG_HOST_FAILURE if the host on which
409    this function was called was shut down. Returns
410    #MSG_TRANSFER_FAILURE if the transfer could not be properly done
411    (network failure, dest failure, timeout...)
412  */
413 MSG_error_t MSG_task_put_with_timeout(m_task_t task, m_host_t dest, 
414                                       m_channel_t channel, double max_duration)
415 {
416   m_process_t process = MSG_process_self();
417   simdata_task_t task_simdata = NULL;
418   e_surf_action_state_t state = SURF_ACTION_NOT_IN_THE_SYSTEM;
419   m_host_t local_host = NULL;
420   m_host_t remote_host = NULL;
421   int first_time = 1;
422
423   CHECK_HOST();
424
425   xbt_assert1((channel>=0) && (channel < msg_global->max_channel),"Invalid channel %d",channel);
426
427   task_simdata = task->simdata;
428   task_simdata->sender = process;
429   task_simdata->source = MSG_process_get_host(process);
430   xbt_assert0(task_simdata->using==1,
431               "This taks is still being used somewhere else. You cannot send it now. Go fix your code!");
432   task_simdata->comm = NULL;
433   
434   local_host = ((simdata_process_t) process->simdata)->host;
435   remote_host = dest;
436
437   DEBUG4("Trying to send a task (%g kB) from %s to %s on channel %d", 
438          task->simdata->message_size/1000,local_host->name, remote_host->name, channel);
439
440   xbt_fifo_push(((simdata_host_t) remote_host->simdata)->
441                 mbox[channel], task);
442
443   PAJE_COMM_START(process,task,channel);
444     
445   if(remote_host->simdata->sleeping[channel]) {
446     DEBUG0("Somebody is listening. Let's wake him up!");
447     __MSG_process_unblock(remote_host->simdata->sleeping[channel]);
448   }
449
450   process->simdata->put_host = dest;
451   process->simdata->put_channel = channel;
452   while(!(task_simdata->comm)) {
453     if(max_duration>0) {
454       if(!first_time) {
455         PAJE_PROCESS_POP_STATE(process);
456         PAJE_COMM_STOP(process,task,channel);
457         MSG_RETURN(MSG_TRANSFER_FAILURE);
458       }
459     }
460     DEBUG0("Communication not initiated yet. Let's block!");
461     if(max_duration>0)
462       __MSG_process_block(max_duration,task->name);
463     else
464       __MSG_process_block(-1,task->name);
465
466     first_time = 0;
467
468     if(surf_workstation_resource->extension_public->
469        get_state(local_host->simdata->host) == SURF_CPU_OFF) {
470       xbt_fifo_remove(((simdata_host_t) remote_host->simdata)->mbox[channel],
471                       task);
472       PAJE_PROCESS_POP_STATE(process);
473       PAJE_COMM_STOP(process,task,channel);
474       MSG_task_destroy(task);
475       MSG_RETURN(MSG_HOST_FAILURE);
476     }
477   }
478   DEBUG0("Registering to this communication");
479   surf_workstation_resource->common_public->action_use(task_simdata->comm);
480   process->simdata->put_host = NULL;
481   process->simdata->put_channel = -1;
482
483
484   PAJE_PROCESS_PUSH_STATE(process,"C",task);  
485
486   state=surf_workstation_resource->common_public->action_get_state(task_simdata->comm);
487   while (state==SURF_ACTION_RUNNING) {
488     DEBUG0("Waiting for action termination");
489     __MSG_task_wait_event(process, task);
490     state=surf_workstation_resource->common_public->action_get_state(task_simdata->comm);
491   }
492   DEBUG0("Action terminated");
493   task->simdata->rate=-1.0; /* Sets the rate back to default */
494
495   PAJE_PROCESS_POP_STATE(process);  
496
497   if(state == SURF_ACTION_DONE) {
498     if(surf_workstation_resource->common_public->action_free(task_simdata->comm)) 
499       task_simdata->comm = NULL;
500     MSG_task_destroy(task);
501     MSG_RETURN(MSG_OK);
502   } else if(surf_workstation_resource->extension_public->get_state(local_host->simdata->host) 
503             == SURF_CPU_OFF) {
504     if(surf_workstation_resource->common_public->action_free(task_simdata->comm)) 
505       task_simdata->comm = NULL;
506     MSG_task_destroy(task);
507     MSG_RETURN(MSG_HOST_FAILURE);
508   } else { 
509     if(surf_workstation_resource->common_public->action_free(task_simdata->comm)) 
510       task_simdata->comm = NULL;
511     MSG_task_destroy(task);
512     MSG_RETURN(MSG_TRANSFER_FAILURE);
513   }
514 }
515 /** \ingroup msg_gos_functions
516  * \brief Put a task on a channel of an host and waits for the end of the
517  * transmission.
518  *
519  * This function is used for describing the behavior of an agent. It
520  * takes three parameter.
521  * \param task a #m_task_t to send on another location. This task
522    will not be usable anymore when the function will return. There is
523    no automatic task duplication and you have to save your parameters
524    before calling this function. Tasks are unique and once it has been
525    sent to another location, you should not access it anymore. You do
526    not need to call MSG_task_destroy() but to avoid using, as an
527    effect of inattention, this task anymore, you definitely should
528    renitialize it with #MSG_TASK_UNINITIALIZED. Note that this task
529    can be transfered iff it has been correctly created with
530    MSG_task_create().
531  * \param dest the destination of the message
532  * \param channel the channel on which the agent should put this
533    task. This value has to be >=0 and < than the maximal number of
534    channels fixed with MSG_set_channel_number().
535  * \return #MSG_FATAL if \a task is not properly initialized and
536  * #MSG_OK otherwise. Returns #MSG_HOST_FAILURE if the host on which
537  * this function was called was shut down. Returns
538  * #MSG_TRANSFER_FAILURE if the transfer could not be properly done
539  * (network failure, dest failure)
540  */
541 MSG_error_t MSG_task_put(m_task_t task,
542                          m_host_t dest, m_channel_t channel)
543 {
544   return MSG_task_put_with_timeout(task, dest, channel, -1.0);
545 }
546
547 /** \ingroup msg_gos_functions
548  * \brief Does exactly the same as MSG_task_put but with a bounded transmition 
549  * rate.
550  *
551  * \sa MSG_task_put
552  */
553 MSG_error_t MSG_task_put_bounded(m_task_t task,
554                                  m_host_t dest, m_channel_t channel,
555                                  double max_rate)
556 {
557   MSG_error_t res = MSG_OK;
558   task->simdata->rate=max_rate;
559   res = MSG_task_put(task, dest, channel);
560   return(res);
561 }
562
563 /** \ingroup msg_gos_functions
564  * \brief Executes a task and waits for its termination.
565  *
566  * This function is used for describing the behavior of an agent. It
567  * takes only one parameter.
568  * \param task a #m_task_t to execute on the location on which the
569    agent is running.
570  * \return #MSG_FATAL if \a task is not properly initialized and
571  * #MSG_OK otherwise.
572  */
573 MSG_error_t MSG_task_execute(m_task_t task)
574 {
575   m_process_t process = MSG_process_self();
576   MSG_error_t res;
577
578   DEBUG1("Computing on %s", process->simdata->host->name);
579
580   __MSG_task_execute(process, task);
581
582   PAJE_PROCESS_PUSH_STATE(process,"E",task);  
583   res = __MSG_wait_for_computation(process,task);
584   PAJE_PROCESS_POP_STATE(process);
585   return res;
586 }
587
588 void __MSG_task_execute(m_process_t process, m_task_t task)
589 {
590   simdata_task_t simdata = NULL;
591
592   CHECK_HOST();
593
594   simdata = task->simdata;
595   xbt_assert0((!simdata->compute)&&(task->simdata->using==1),
596               "This taks is executed somewhere else. Go fix your code!");
597   simdata->using++;
598   simdata->compute = surf_workstation_resource->extension_public->
599     execute(MSG_process_get_host(process)->simdata->host,
600             simdata->computation_amount);
601   surf_workstation_resource->common_public->
602     set_priority(simdata->compute, simdata->priority);
603
604   surf_workstation_resource->common_public->action_set_data(simdata->compute,task);
605   simdata->using--;
606 }
607
608 MSG_error_t __MSG_wait_for_computation(m_process_t process, m_task_t task)
609 {
610   e_surf_action_state_t state = SURF_ACTION_NOT_IN_THE_SYSTEM;
611   simdata_task_t simdata = task->simdata;
612
613   XBT_IN4("(%p(%s) %p(%s))",process,process->name,task,task->name);
614   simdata->using++;
615   do {
616     __MSG_task_wait_event(process, task);
617     state=surf_workstation_resource->common_public->action_get_state(simdata->compute);
618   } while (state==SURF_ACTION_RUNNING);
619   simdata->using--;
620     
621
622   if(state == SURF_ACTION_DONE) {
623     if(surf_workstation_resource->common_public->action_free(simdata->compute)) 
624       simdata->compute = NULL;
625     simdata->computation_amount = 0.0;
626     XBT_OUT;
627     MSG_RETURN(MSG_OK);
628   } else if(surf_workstation_resource->extension_public->
629             get_state(MSG_process_get_host(process)->simdata->host) 
630             == SURF_CPU_OFF) {
631     if(surf_workstation_resource->common_public->action_free(simdata->compute)) 
632       simdata->compute = NULL;
633     XBT_OUT;
634     MSG_RETURN(MSG_HOST_FAILURE);
635   } else {
636     if(surf_workstation_resource->common_public->action_free(simdata->compute)) 
637       simdata->compute = NULL;
638     XBT_OUT;
639     MSG_RETURN(MSG_TASK_CANCELLED);
640   }
641 }
642 /** \ingroup m_task_management
643  * \brief Creates a new #m_task_t (a parallel one....).
644  *
645  * A constructor for #m_task_t taking six arguments and returning the 
646    corresponding object.
647  * \param name a name for the object. It is for user-level information
648    and can be NULL.
649  * \param host_nb the number of hosts implied in the parallel task.
650  * \param host_list an array of \p host_nb m_host_t.
651  * \param computation_amount an array of \p host_nb
652    doubles. computation_amount[i] is the total number of operations
653    that have to be performed on host_list[i].
654  * \param communication_amount an array of \p host_nb* \p host_nb doubles.
655  * \param data a pointer to any data may want to attach to the new
656    object.  It is for user-level information and can be NULL. It can
657    be retrieved with the function \ref MSG_task_get_data.
658  * \see m_task_t
659  * \return The new corresponding object.
660  */
661 m_task_t MSG_parallel_task_create(const char *name, 
662                                   int host_nb,
663                                   const m_host_t *host_list,
664                                   double *computation_amount,
665                                   double *communication_amount,
666                                   void *data)
667 {
668   simdata_task_t simdata = xbt_new0(s_simdata_task_t,1);
669   m_task_t task = xbt_new0(s_m_task_t,1);
670   int i;
671
672   /* Task structure */
673   task->name = xbt_strdup(name);
674   task->simdata = simdata;
675   task->data = data;
676
677   /* Simulator Data */
678   simdata->sleeping = xbt_dynar_new(sizeof(m_process_t),NULL);
679   simdata->rate = -1.0;
680   simdata->using = 1;
681   simdata->sender = NULL;
682   simdata->source = NULL;
683   simdata->host_nb = host_nb;
684   
685   simdata->host_list = xbt_new0(void *, host_nb);
686   simdata->comp_amount = computation_amount;
687   simdata->comm_amount = communication_amount;
688
689   for(i=0;i<host_nb;i++)
690     simdata->host_list[i] = host_list[i]->simdata->host;
691
692   return task;
693 }
694
695
696 static void __MSG_parallel_task_execute(m_process_t process, m_task_t task)
697 {
698   simdata_task_t simdata = NULL;
699
700   CHECK_HOST();
701
702   simdata = task->simdata;
703
704   xbt_assert0(simdata->host_nb,"This is not a parallel task. Go to hell.");
705
706   simdata->compute = surf_workstation_resource->extension_public->
707   execute_parallel_task(task->simdata->host_nb,
708                         task->simdata->host_list,
709                         task->simdata->comp_amount,
710                         task->simdata->comm_amount,
711                         1.0,
712                         -1.0);
713   if(simdata->compute)
714     surf_workstation_resource->common_public->action_set_data(simdata->compute,task);
715 }
716
717 MSG_error_t MSG_parallel_task_execute(m_task_t task)
718 {
719   m_process_t process = MSG_process_self();
720   MSG_error_t res;
721
722   DEBUG0("Computing on a tons of guys");
723   
724   __MSG_parallel_task_execute(process, task);
725
726   if(task->simdata->compute)
727     res = __MSG_wait_for_computation(process,task);
728   else 
729     res = MSG_OK;
730
731   return res;  
732 }
733
734
735 /** \ingroup msg_gos_functions
736  * \brief Sleep for the specified number of seconds
737  *
738  * Makes the current process sleep until \a time seconds have elapsed.
739  *
740  * \param nb_sec a number of second
741  */
742 MSG_error_t MSG_process_sleep(double nb_sec)
743 {
744   e_surf_action_state_t state = SURF_ACTION_NOT_IN_THE_SYSTEM;
745   m_process_t process = MSG_process_self();
746   m_task_t dummy = NULL;
747   simdata_task_t simdata = NULL;
748   
749   xbt_assert1(nb_sec>=0,"Invalid duration %g",nb_sec);
750
751   CHECK_HOST();
752   dummy = MSG_task_create("MSG_sleep", nb_sec, 0.0, NULL);
753   simdata = dummy->simdata;
754
755   simdata->compute = surf_workstation_resource->extension_public->
756     sleep(MSG_process_get_host(process)->simdata->host,
757             simdata->computation_amount);
758   surf_workstation_resource->common_public->action_set_data(simdata->compute,dummy);
759
760   
761   simdata->using++;
762   do {
763     __MSG_task_wait_event(process, dummy);
764     state=surf_workstation_resource->common_public->action_get_state(simdata->compute);
765   } while (state==SURF_ACTION_RUNNING);
766   simdata->using--;
767     
768   if(state == SURF_ACTION_DONE) {
769     if(surf_workstation_resource->extension_public->
770        get_state(MSG_process_get_host(process)->simdata->host) 
771        == SURF_CPU_OFF) {
772       if(surf_workstation_resource->common_public->action_free(simdata->compute)) 
773         simdata->compute = NULL;
774       MSG_RETURN(MSG_HOST_FAILURE);
775     }
776     if(__MSG_process_isBlocked(process)) {
777       __MSG_process_unblock(MSG_process_self());
778     }
779     if(surf_workstation_resource->extension_public->
780        get_state(MSG_process_get_host(process)->simdata->host) 
781        == SURF_CPU_OFF) {
782       if(surf_workstation_resource->common_public->action_free(simdata->compute)) 
783         simdata->compute = NULL;
784       MSG_RETURN(MSG_HOST_FAILURE);
785     }
786     if(surf_workstation_resource->common_public->action_free(simdata->compute)) 
787       simdata->compute = NULL;
788     MSG_task_destroy(dummy);
789     MSG_RETURN(MSG_OK);
790   } else MSG_RETURN(MSG_HOST_FAILURE);
791 }
792
793 /** \ingroup msg_gos_functions
794  * \brief Return the number of MSG tasks currently running on
795  * the host of the current running process.
796  */
797 static int MSG_get_msgload(void) 
798 {
799   m_process_t process;
800    
801   CHECK_HOST();
802   
803   xbt_assert0(0, "This function is still to be specified correctly (what do you mean by 'load', exactly?). In the meantime, please don't use it");
804   process = MSG_process_self();
805   return xbt_fifo_size(process->simdata->host->simdata->process_list);
806 }
807
808 /** \ingroup msg_gos_functions
809  *
810  * \brief Return the last value returned by a MSG function (except
811  * MSG_get_errno...).
812  */
813 MSG_error_t MSG_get_errno(void)
814 {
815   return PROCESS_GET_ERRNO();
816 }