Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
It was hard to use FIFOs when xbt_fifo_push was not mentioned in the doc
[simgrid.git] / src / msg / gos.c
1 /*      $Id$     */
2
3 /* Copyright (c) 2002,2003,2004 Arnaud Legrand. All rights reserved.        */
4
5 /* This program is free software; you can redistribute it and/or modify it
6  * under the terms of the license (GNU LGPL) which comes with this package. */
7
8 #include "private.h"
9 #include "xbt/sysdep.h"
10 #include "xbt/log.h"
11 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(msg_gos, msg,
12                                 "Logging specific to MSG (gos)");
13
14 /** \defgroup msg_gos_functions MSG Operating System Functions
15  *  \brief This section describes the functions that can be used
16  *  by an agent for handling some task.
17  */
18
19 static MSG_error_t __MSG_task_get_with_time_out_from_host(m_task_t * task,
20                                                         m_channel_t channel,
21                                                         double max_duration,
22                                                         m_host_t host)
23 {
24   m_process_t process = MSG_process_self();
25   m_task_t t = NULL;
26   m_host_t h = NULL;
27   simdata_task_t t_simdata = NULL;
28   simdata_host_t h_simdata = NULL;
29   int first_time = 1;
30   e_surf_action_state_t state = SURF_ACTION_NOT_IN_THE_SYSTEM;
31   xbt_fifo_item_t item = NULL;
32
33   CHECK_HOST();
34   xbt_assert1((channel>=0) && (channel < msg_global->max_channel),"Invalid channel %d",channel);
35   /* Sanity check */
36   xbt_assert0(task,"Null pointer for the task\n");
37
38   if (*task) 
39     CRITICAL0("MSG_task_get() was asked to write in a non empty task struct.");
40
41   /* Get the task */
42   h = MSG_host_self();
43   h_simdata = h->simdata;
44
45   DEBUG2("Waiting for a task on channel %d (%s)", channel,h->name);
46
47   while (1) {
48     if(xbt_fifo_size(h_simdata->mbox[channel])>0) {
49       if(!host) {
50         t = xbt_fifo_shift(h_simdata->mbox[channel]);
51         break;
52       } else {
53         xbt_fifo_foreach(h->simdata->mbox[channel],item,t,m_task_t) {
54           if(t->simdata->source==host) break;
55         }
56         if(item) {
57           xbt_fifo_remove_item(h->simdata->mbox[channel],item);
58           break;
59         } 
60       }
61     }
62                                                        
63     if(max_duration>0) {
64       if(!first_time) {
65         PAJE_PROCESS_POP_STATE(process);
66         PAJE_COMM_STOP(process,t,channel);
67         MSG_RETURN(MSG_TRANSFER_FAILURE);
68       }
69     }
70     xbt_assert3(!(h_simdata->sleeping[channel]),
71                 "A process (%s(%d)) is already blocked on channel %d",
72                 h_simdata->sleeping[channel]->name,
73                 h_simdata->sleeping[channel]->simdata->PID,
74                 channel);
75     h_simdata->sleeping[channel] = process; /* I'm waiting. Wake me up when you're ready */
76     if(max_duration>0) {
77       __MSG_process_block(max_duration,"");
78     } else {
79       __MSG_process_block(-1,"");
80     }
81     h_simdata->sleeping[channel] = NULL;
82     first_time = 0;
83     if(surf_workstation_resource->extension_public->get_state(h_simdata->host) 
84        == SURF_CPU_OFF)
85       PAJE_PROCESS_POP_STATE(process);
86       PAJE_COMM_STOP(process,t,channel);
87       MSG_RETURN(MSG_HOST_FAILURE);
88     /* OK, we should both be ready now. Are you there ? */
89   }
90
91   DEBUG1("OK, got a task (%s)", t->name);
92
93   t_simdata = t->simdata;
94   /*   *task = __MSG_task_copy(t); */
95   *task=t;
96
97   /* Transfer */
98   t_simdata->using++;
99
100   while(MSG_process_is_suspended(t_simdata->sender)) {
101     DEBUG1("Oooups, the sender (%s) has been suspended in the meantime. Let's wait for him", 
102            t_simdata->sender->name);
103     m_task_t task_to_wait_for = t_simdata->sender->simdata->waiting_task;
104     if(__MSG_process_isBlocked(t_simdata->sender)) {
105       DEBUG0("He's blocked. Let's wait for him to go in the suspended state");
106       __MSG_process_unblock(t_simdata->sender);
107       task_to_wait_for->simdata->using++;
108       __MSG_task_wait_event(process, task_to_wait_for);
109       MSG_task_destroy(task_to_wait_for);
110     } else {
111       DEBUG0("He's suspended. Let's wait for him to go in the resumed state");
112       task_to_wait_for->simdata->using++;
113       __MSG_task_wait_event(process, task_to_wait_for);
114       MSG_task_destroy(task_to_wait_for);
115       DEBUG0("He's resumed. He should block again. So let's free him.");
116       __MSG_process_unblock(t_simdata->sender);
117       break;
118     }
119   }
120   DEBUG0("Calling SURF for communication creation");
121   t_simdata->comm = surf_workstation_resource->extension_public->
122     communicate(MSG_process_get_host(t_simdata->sender)->simdata->host,
123                 h->simdata->host, t_simdata->message_size,t_simdata->rate);
124   
125   surf_workstation_resource->common_public->action_set_data(t_simdata->comm,t);
126
127   if(__MSG_process_isBlocked(t_simdata->sender)) {
128     DEBUG1("Unblocking %s",t_simdata->sender->name);
129     __MSG_process_unblock(t_simdata->sender);
130   }
131
132   PAJE_PROCESS_PUSH_STATE(process,"C",t);  
133
134   do {
135     DEBUG0("Waiting for action termination");
136     __MSG_task_wait_event(process, t);
137     state=surf_workstation_resource->common_public->action_get_state(t_simdata->comm);
138   } while (state==SURF_ACTION_RUNNING);
139   DEBUG0("Action terminated");
140
141   if(t->simdata->using>1) {
142     xbt_fifo_unshift(msg_global->process_to_run,process);
143     xbt_context_yield();
144   }
145
146   PAJE_PROCESS_POP_STATE(process);
147   PAJE_COMM_STOP(process,t,channel);
148
149   if(state == SURF_ACTION_DONE) {
150     if(surf_workstation_resource->common_public->action_free(t_simdata->comm)) 
151       t_simdata->comm = NULL;
152     MSG_RETURN(MSG_OK);
153   } else if(surf_workstation_resource->extension_public->get_state(h_simdata->host) 
154           == SURF_CPU_OFF) {
155     if(surf_workstation_resource->common_public->action_free(t_simdata->comm)) 
156       t_simdata->comm = NULL;
157     MSG_RETURN(MSG_HOST_FAILURE);
158   } else {
159     if(surf_workstation_resource->common_public->action_free(t_simdata->comm)) 
160       t_simdata->comm = NULL;
161     MSG_RETURN(MSG_TRANSFER_FAILURE);
162   }
163 }
164
165 /** \ingroup msg_gos_functions
166  * \brief Listen on a channel and wait for receiving a task.
167  *
168  * It takes two parameters.
169  * \param task a memory location for storing a #m_task_t. It will
170    hold a task when this function will return. Thus \a task should not
171    be equal to \c NULL and \a *task should be equal to \c NULL. If one of
172    those two condition does not hold, there will be a warning message.
173  * \param channel the channel on which the agent should be
174    listening. This value has to be >=0 and < than the maximal
175    number of channels fixed with MSG_set_channel_number().
176  * \return #MSG_FATAL if \a task is equal to \c NULL, #MSG_WARNING
177  * if \a *task is not equal to \c NULL, and #MSG_OK otherwise.
178  */
179 MSG_error_t MSG_task_get(m_task_t * task,
180                          m_channel_t channel)
181 {
182   return MSG_task_get_with_time_out(task, channel, -1);
183 }
184
185 /** \ingroup msg_gos_functions
186  * \brief Listen on a channel and wait for receiving a task with a timeout.
187  *
188  * It takes three parameters.
189  * \param task a memory location for storing a #m_task_t. It will
190    hold a task when this function will return. Thus \a task should not
191    be equal to \c NULL and \a *task should be equal to \c NULL. If one of
192    those two condition does not hold, there will be a warning message.
193  * \param channel the channel on which the agent should be
194    listening. This value has to be >=0 and < than the maximal
195    number of channels fixed with MSG_set_channel_number().
196  * \param max_duration the maximum time to wait for a task before giving
197     up. In such a case, #MSG_TRANSFER_FAILURE will be returned, \a task 
198     will not be modified and will still be
199     equal to \c NULL when returning. 
200  * \return #MSG_FATAL if \a task is equal to \c NULL, #MSG_WARNING
201    if \a *task is not equal to \c NULL, and #MSG_OK otherwise.
202  */
203 MSG_error_t MSG_task_get_with_time_out(m_task_t * task,
204                                        m_channel_t channel,
205                                        double max_duration)
206 {
207   return __MSG_task_get_with_time_out_from_host(task, channel, max_duration, NULL);
208 }
209
210 /** \ingroup msg_gos_functions
211  * \brief Listen on \a channel and waits for receiving a task from \a host.
212  *
213  * It takes three parameters.
214  * \param task a memory location for storing a #m_task_t. It will
215    hold a task when this function will return. Thus \a task should not
216    be equal to \c NULL and \a *task should be equal to \c NULL. If one of
217    those two condition does not hold, there will be a warning message.
218  * \param channel the channel on which the agent should be
219    listening. This value has to be >=0 and < than the maximal
220    number of channels fixed with MSG_set_channel_number().
221  * \param host the host that is to be watched.
222  * \return #MSG_FATAL if \a task is equal to \c NULL, #MSG_WARNING
223    if \a *task is not equal to \c NULL, and #MSG_OK otherwise.
224  */
225 MSG_error_t MSG_task_get_from_host(m_task_t * task, int channel, 
226                                    m_host_t host)
227 {
228   return __MSG_task_get_with_time_out_from_host(task, channel, -1, host);
229 }
230
231 /** \ingroup msg_gos_functions
232  * \brief Test whether there is a pending communication on a channel.
233  *
234  * It takes one parameter.
235  * \param channel the channel on which the agent should be
236    listening. This value has to be >=0 and < than the maximal
237    number of channels fixed with MSG_set_channel_number().
238  * \return 1 if there is a pending communication and 0 otherwise
239  */
240 int MSG_task_Iprobe(m_channel_t channel)
241 {
242   m_host_t h = NULL;
243   simdata_host_t h_simdata = NULL;
244
245   xbt_assert1((channel>=0) && (channel < msg_global->max_channel),"Invalid channel %d",channel);
246   DEBUG2("Probing on channel %d (%s)", channel,h->name);
247   CHECK_HOST();
248   h = MSG_host_self();
249   h_simdata = h->simdata;
250   return(xbt_fifo_get_first_item(h_simdata->mbox[channel])!=NULL);
251 }
252
253 /** \ingroup msg_gos_functions
254  * \brief Test whether there is a pending communication on a channel, and who sent it.
255  *
256  * It takes one parameter.
257  * \param channel the channel on which the agent should be
258    listening. This value has to be >=0 and < than the maximal
259    number of channels fixed with MSG_set_channel_number().
260  * \return -1 if there is no pending communication and the PID of the process who sent it otherwise
261  */
262 int MSG_task_probe_from(m_channel_t channel)
263 {
264   m_host_t h = NULL;
265   simdata_host_t h_simdata = NULL;
266   xbt_fifo_item_t item;
267   m_task_t t;
268
269   xbt_assert1((channel>=0) && (channel < msg_global->max_channel),"Invalid channel %d",channel);
270   CHECK_HOST();
271   h = MSG_host_self();
272   h_simdata = h->simdata;
273
274   DEBUG2("Probing on channel %d (%s)", channel,h->name);
275    
276   item = xbt_fifo_get_first_item(h->simdata->mbox[channel]);
277   if (!item || !(t = xbt_fifo_get_item_content(item)))
278     return -1;
279    
280   return MSG_process_get_PID(t->simdata->sender);
281 }
282
283 /** \ingroup msg_gos_functions
284  * \brief Wait for at most \a max_duration second for a task reception
285    on \a channel. *\a PID is updated with the PID of the first process
286    that triggered this event if any.
287  *
288  * It takes three parameters:
289  * \param channel the channel on which the agent should be
290    listening. This value has to be >=0 and < than the maximal.
291    number of channels fixed with MSG_set_channel_number().
292  * \param PID a memory location for storing an int.
293  * \param max_duration the maximum time to wait for a task before
294     giving up. In the case of a reception, *\a PID will be updated
295     with the PID of the first process to send a task.
296  * \return #MSG_HOST_FAILURE if the host is shut down in the meantime
297    and #MSG_OK otherwise.
298  */
299 MSG_error_t MSG_channel_select_from(m_channel_t channel, double max_duration,
300                                     int *PID)
301 {
302   m_host_t h = NULL;
303   simdata_host_t h_simdata = NULL;
304   xbt_fifo_item_t item;
305   m_task_t t;
306   int first_time = 1;
307   m_process_t process = MSG_process_self();
308
309   xbt_assert1((channel>=0) && (channel < msg_global->max_channel),"Invalid channel %d",channel);
310   if(PID) {
311     *PID = -1;
312   }
313
314   if(max_duration==0.0) {
315     *PID = MSG_task_probe_from(channel);
316     MSG_RETURN(MSG_OK);
317   } else {
318     CHECK_HOST();
319     h = MSG_host_self();
320     h_simdata = h->simdata;
321     
322     DEBUG2("Probing on channel %d (%s)", channel,h->name);
323     while(!(item = xbt_fifo_get_first_item(h->simdata->mbox[channel]))) {
324       if(max_duration>0) {
325         if(!first_time) {
326           MSG_RETURN(MSG_OK);
327         }
328       }
329       xbt_assert2(!(h_simdata->sleeping[channel]),
330                   "A process (%s(%d)) is already blocked on this channel",
331                   h_simdata->sleeping[channel]->name,
332                   h_simdata->sleeping[channel]->simdata->PID);
333       h_simdata->sleeping[channel] = process; /* I'm waiting. Wake me up when you're ready */
334       if(max_duration>0) {
335         __MSG_process_block(max_duration,"");
336       } else {
337         __MSG_process_block(-1,"");
338       }
339       if(surf_workstation_resource->extension_public->get_state(h_simdata->host) 
340          == SURF_CPU_OFF) {
341         MSG_RETURN(MSG_HOST_FAILURE);
342       }
343       h_simdata->sleeping[channel] = NULL;
344       first_time = 0;
345     }
346     if (!item || !(t = xbt_fifo_get_item_content(item))) {
347       MSG_RETURN(MSG_OK);
348     }
349     if(PID) {
350       *PID = MSG_process_get_PID(t->simdata->sender);
351     }
352     MSG_RETURN(MSG_OK);
353   }
354 }
355
356
357 /** \ingroup msg_gos_functions
358
359  * \brief Return the number of tasks waiting to be received on a \a
360    channel and sent by \a host.
361  *
362  * It takes two parameters.
363  * \param channel the channel on which the agent should be
364    listening. This value has to be >=0 and < than the maximal
365    number of channels fixed with MSG_set_channel_number().
366  * \param host the host that is to be watched.
367  * \return the number of tasks waiting to be received on \a channel
368    and sent by \a host.
369  */
370 int MSG_task_probe_from_host(int channel, m_host_t host)
371 {
372   simdata_host_t h_simdata = NULL;
373   xbt_fifo_item_t item;
374   m_task_t t;
375   int count = 0;
376   m_host_t h = NULL;
377   
378   xbt_assert1((channel>=0) && (channel < msg_global->max_channel),"Invalid channel %d",channel);
379   CHECK_HOST();
380   h = MSG_host_self();
381   h_simdata = h->simdata;
382
383   DEBUG2("Probing on channel %d (%s)", channel,h->name);
384    
385   xbt_fifo_foreach(h->simdata->mbox[channel],item,t,m_task_t) {
386     if(t->simdata->source==host) count++;
387   }
388    
389   return count;
390 }
391
392 /** \ingroup msg_gos_functions \brief Put a task on a channel of an
393  * host (with a timeout on the waiting of the destination host) and
394  * waits for the end of the transmission.
395  *
396  * This function is used for describing the behavior of an agent. It
397  * takes four parameter.
398  * \param task a #m_task_t to send on another location. This task
399    will not be usable anymore when the function will return. There is
400    no automatic task duplication and you have to save your parameters
401    before calling this function. Tasks are unique and once it has been
402    sent to another location, you should not access it anymore. You do
403    not need to call MSG_task_destroy() but to avoid using, as an
404    effect of inattention, this task anymore, you definitely should
405    renitialize it with #MSG_TASK_UNINITIALIZED. Note that this task
406    can be transfered iff it has been correctly created with
407    MSG_task_create().
408  * \param dest the destination of the message
409  * \param channel the channel on which the agent should put this
410    task. This value has to be >=0 and < than the maximal number of
411    channels fixed with MSG_set_channel_number().
412  * \param max_duration the maximum time to wait for a task before giving
413     up. In such a case, #MSG_TRANSFER_FAILURE will be returned, \a task 
414     will not be modified 
415  * \return #MSG_FATAL if \a task is not properly initialized and
416    #MSG_OK otherwise. Returns #MSG_HOST_FAILURE if the host on which
417    this function was called was shut down. Returns
418    #MSG_TRANSFER_FAILURE if the transfer could not be properly done
419    (network failure, dest failure, timeout...)
420  */
421 MSG_error_t MSG_task_put_with_timeout(m_task_t task, m_host_t dest, 
422                                       m_channel_t channel, double max_duration)
423 {
424   m_process_t process = MSG_process_self();
425   simdata_task_t task_simdata = NULL;
426   e_surf_action_state_t state = SURF_ACTION_NOT_IN_THE_SYSTEM;
427   m_host_t local_host = NULL;
428   m_host_t remote_host = NULL;
429   int first_time = 1;
430
431   CHECK_HOST();
432
433   xbt_assert1((channel>=0) && (channel < msg_global->max_channel),"Invalid channel %d",channel);
434
435   task_simdata = task->simdata;
436   task_simdata->sender = process;
437   task_simdata->source = MSG_process_get_host(process);
438   xbt_assert0(task_simdata->using==1,
439               "This taks is still being used somewhere else. You cannot send it now. Go fix your code!");
440   task_simdata->comm = NULL;
441   
442   local_host = ((simdata_process_t) process->simdata)->host;
443   remote_host = dest;
444
445   DEBUG4("Trying to send a task (%g kB) from %s to %s on channel %d", 
446          task->simdata->message_size/1000,local_host->name, remote_host->name, channel);
447
448   xbt_fifo_push(((simdata_host_t) remote_host->simdata)->
449                 mbox[channel], task);
450
451   PAJE_COMM_START(process,task,channel);
452     
453   if(remote_host->simdata->sleeping[channel]) {
454     DEBUG0("Somebody is listening. Let's wake him up!");
455     __MSG_process_unblock(remote_host->simdata->sleeping[channel]);
456   }
457
458   process->simdata->put_host = dest;
459   process->simdata->put_channel = channel;
460   while(!(task_simdata->comm)) {
461     if(max_duration>0) {
462       if(!first_time) {
463         PAJE_PROCESS_POP_STATE(process);
464         PAJE_COMM_STOP(process,task,channel);
465         MSG_RETURN(MSG_TRANSFER_FAILURE);
466       }
467     }
468     DEBUG0("Communication not initiated yet. Let's block!");
469     if(max_duration>0)
470       __MSG_process_block(max_duration,task->name);
471     else
472       __MSG_process_block(-1,task->name);
473
474     first_time = 0;
475
476     if(surf_workstation_resource->extension_public->
477        get_state(local_host->simdata->host) == SURF_CPU_OFF) {
478       xbt_fifo_remove(((simdata_host_t) remote_host->simdata)->mbox[channel],
479                       task);
480       PAJE_PROCESS_POP_STATE(process);
481       PAJE_COMM_STOP(process,task,channel);
482       MSG_task_destroy(task);
483       MSG_RETURN(MSG_HOST_FAILURE);
484     }
485   }
486   DEBUG0("Registering to this communication");
487   surf_workstation_resource->common_public->action_use(task_simdata->comm);
488   process->simdata->put_host = NULL;
489   process->simdata->put_channel = -1;
490
491
492   PAJE_PROCESS_PUSH_STATE(process,"C",task);  
493
494   state=surf_workstation_resource->common_public->action_get_state(task_simdata->comm);
495   while (state==SURF_ACTION_RUNNING) {
496     DEBUG0("Waiting for action termination");
497     __MSG_task_wait_event(process, task);
498     state=surf_workstation_resource->common_public->action_get_state(task_simdata->comm);
499   }
500   DEBUG0("Action terminated");
501   task->simdata->rate=-1.0; /* Sets the rate back to default */
502
503   PAJE_PROCESS_POP_STATE(process);  
504
505   if(state == SURF_ACTION_DONE) {
506     if(surf_workstation_resource->common_public->action_free(task_simdata->comm)) 
507       task_simdata->comm = NULL;
508     MSG_task_destroy(task);
509     MSG_RETURN(MSG_OK);
510   } else if(surf_workstation_resource->extension_public->get_state(local_host->simdata->host) 
511             == SURF_CPU_OFF) {
512     if(surf_workstation_resource->common_public->action_free(task_simdata->comm)) 
513       task_simdata->comm = NULL;
514     MSG_task_destroy(task);
515     MSG_RETURN(MSG_HOST_FAILURE);
516   } else { 
517     if(surf_workstation_resource->common_public->action_free(task_simdata->comm)) 
518       task_simdata->comm = NULL;
519     MSG_task_destroy(task);
520     MSG_RETURN(MSG_TRANSFER_FAILURE);
521   }
522 }
523 /** \ingroup msg_gos_functions
524  * \brief Put a task on a channel of an host and waits for the end of the
525  * transmission.
526  *
527  * This function is used for describing the behavior of an agent. It
528  * takes three parameter.
529  * \param task a #m_task_t to send on another location. This task
530    will not be usable anymore when the function will return. There is
531    no automatic task duplication and you have to save your parameters
532    before calling this function. Tasks are unique and once it has been
533    sent to another location, you should not access it anymore. You do
534    not need to call MSG_task_destroy() but to avoid using, as an
535    effect of inattention, this task anymore, you definitely should
536    renitialize it with #MSG_TASK_UNINITIALIZED. Note that this task
537    can be transfered iff it has been correctly created with
538    MSG_task_create().
539  * \param dest the destination of the message
540  * \param channel the channel on which the agent should put this
541    task. This value has to be >=0 and < than the maximal number of
542    channels fixed with MSG_set_channel_number().
543  * \return #MSG_FATAL if \a task is not properly initialized and
544  * #MSG_OK otherwise. Returns #MSG_HOST_FAILURE if the host on which
545  * this function was called was shut down. Returns
546  * #MSG_TRANSFER_FAILURE if the transfer could not be properly done
547  * (network failure, dest failure)
548  */
549 MSG_error_t MSG_task_put(m_task_t task,
550                          m_host_t dest, m_channel_t channel)
551 {
552   return MSG_task_put_with_timeout(task, dest, channel, -1.0);
553 }
554
555 /** \ingroup msg_gos_functions
556  * \brief Does exactly the same as MSG_task_put but with a bounded transmition 
557  * rate.
558  *
559  * \sa MSG_task_put
560  */
561 MSG_error_t MSG_task_put_bounded(m_task_t task,
562                                  m_host_t dest, m_channel_t channel,
563                                  double max_rate)
564 {
565   MSG_error_t res = MSG_OK;
566   task->simdata->rate=max_rate;
567   res = MSG_task_put(task, dest, channel);
568   return(res);
569 }
570
571 /** \ingroup msg_gos_functions
572  * \brief Executes a task and waits for its termination.
573  *
574  * This function is used for describing the behavior of an agent. It
575  * takes only one parameter.
576  * \param task a #m_task_t to execute on the location on which the
577    agent is running.
578  * \return #MSG_FATAL if \a task is not properly initialized and
579  * #MSG_OK otherwise.
580  */
581 MSG_error_t MSG_task_execute(m_task_t task)
582 {
583   m_process_t process = MSG_process_self();
584   MSG_error_t res;
585
586   DEBUG1("Computing on %s", process->simdata->host->name);
587
588   __MSG_task_execute(process, task);
589
590   PAJE_PROCESS_PUSH_STATE(process,"E",task);  
591   res = __MSG_wait_for_computation(process,task);
592   PAJE_PROCESS_POP_STATE(process);
593   return res;
594 }
595
596 void __MSG_task_execute(m_process_t process, m_task_t task)
597 {
598   simdata_task_t simdata = NULL;
599
600   CHECK_HOST();
601
602   simdata = task->simdata;
603   xbt_assert0((!simdata->compute)&&(task->simdata->using==1),
604               "This taks is executed somewhere else. Go fix your code!");
605   task->simdata->using++;
606   simdata->compute = surf_workstation_resource->extension_public->
607     execute(MSG_process_get_host(process)->simdata->host,
608             simdata->computation_amount);
609   surf_workstation_resource->common_public->
610     set_priority(simdata->compute, simdata->priority);
611
612   surf_workstation_resource->common_public->action_set_data(simdata->compute,task);
613   task->simdata->using--;
614 }
615
616 MSG_error_t __MSG_wait_for_computation(m_process_t process, m_task_t task)
617 {
618   e_surf_action_state_t state = SURF_ACTION_NOT_IN_THE_SYSTEM;
619   simdata_task_t simdata = task->simdata;
620
621   XBT_IN4("(%p(%s) %p(%s))",process,process->name,task,task->name);
622   simdata->using++;
623   do {
624     __MSG_task_wait_event(process, task);
625     state=surf_workstation_resource->common_public->action_get_state(simdata->compute);
626   } while (state==SURF_ACTION_RUNNING);
627   simdata->using--;
628     
629
630   if(state == SURF_ACTION_DONE) {
631     if(surf_workstation_resource->common_public->action_free(simdata->compute)) 
632       simdata->compute = NULL;
633     simdata->computation_amount = 0.0;
634     XBT_OUT;
635     MSG_RETURN(MSG_OK);
636   } else if(surf_workstation_resource->extension_public->
637             get_state(MSG_process_get_host(process)->simdata->host) 
638             == SURF_CPU_OFF) {
639     if(surf_workstation_resource->common_public->action_free(simdata->compute)) 
640       simdata->compute = NULL;
641     XBT_OUT;
642     MSG_RETURN(MSG_HOST_FAILURE);
643   } else {
644     if(surf_workstation_resource->common_public->action_free(simdata->compute)) 
645       simdata->compute = NULL;
646     XBT_OUT;
647     MSG_RETURN(MSG_TASK_CANCELLED);
648   }
649 }
650 /** \ingroup m_task_management
651  * \brief Creates a new #m_task_t (a parallel one....).
652  *
653  * A constructor for #m_task_t taking six arguments and returning the 
654    corresponding object.
655  * \param name a name for the object. It is for user-level information
656    and can be NULL.
657  * \param host_nb the number of hosts implied in the parallel task.
658  * \param host_list an array of \p host_nb m_host_t.
659  * \param computation_amount an array of \p host_nb
660    doubles. computation_amount[i] is the total number of operations
661    that have to be performed on host_list[i].
662  * \param communication_amount an array of \p host_nb* \p host_nb doubles.
663  * \param data a pointer to any data may want to attach to the new
664    object.  It is for user-level information and can be NULL. It can
665    be retrieved with the function \ref MSG_task_get_data.
666  * \see m_task_t
667  * \return The new corresponding object.
668  */
669 m_task_t MSG_parallel_task_create(const char *name, 
670                                   int host_nb,
671                                   const m_host_t *host_list,
672                                   double *computation_amount,
673                                   double *communication_amount,
674                                   void *data)
675 {
676   simdata_task_t simdata = xbt_new0(s_simdata_task_t,1);
677   m_task_t task = xbt_new0(s_m_task_t,1);
678   int i;
679
680   /* Task structure */
681   task->name = xbt_strdup(name);
682   task->simdata = simdata;
683   task->data = data;
684
685   /* Simulator Data */
686   simdata->sleeping = xbt_dynar_new(sizeof(m_process_t),NULL);
687   simdata->rate = -1.0;
688   simdata->using = 1;
689   simdata->sender = NULL;
690   simdata->source = NULL;
691   simdata->host_nb = host_nb;
692   
693   simdata->host_list = xbt_new0(void *, host_nb);
694   simdata->comp_amount = computation_amount;
695   simdata->comm_amount = communication_amount;
696
697   for(i=0;i<host_nb;i++)
698     simdata->host_list[i] = host_list[i]->simdata->host;
699
700   return task;
701 }
702
703
704 static void __MSG_parallel_task_execute(m_process_t process, m_task_t task)
705 {
706   simdata_task_t simdata = NULL;
707
708   CHECK_HOST();
709
710   simdata = task->simdata;
711
712   xbt_assert0(simdata->host_nb,"This is not a parallel task. Go to hell.");
713
714   simdata->compute = surf_workstation_resource->extension_public->
715   execute_parallel_task(task->simdata->host_nb,
716                         task->simdata->host_list,
717                         task->simdata->comp_amount,
718                         task->simdata->comm_amount,
719                         1.0,
720                         -1.0);
721   if(simdata->compute)
722     surf_workstation_resource->common_public->action_set_data(simdata->compute,task);
723 }
724
725 MSG_error_t MSG_parallel_task_execute(m_task_t task)
726 {
727   m_process_t process = MSG_process_self();
728   MSG_error_t res;
729
730   DEBUG0("Computing on a tons of guys");
731   
732   __MSG_parallel_task_execute(process, task);
733
734   if(task->simdata->compute)
735     res = __MSG_wait_for_computation(process,task);
736   else 
737     res = MSG_OK;
738
739   return res;  
740 }
741
742
743 /** \ingroup msg_gos_functions
744  * \brief Sleep for the specified number of seconds
745  *
746  * Makes the current process sleep until \a time seconds have elapsed.
747  *
748  * \param nb_sec a number of second
749  */
750 MSG_error_t MSG_process_sleep(double nb_sec)
751 {
752   e_surf_action_state_t state = SURF_ACTION_NOT_IN_THE_SYSTEM;
753   m_process_t process = MSG_process_self();
754   m_task_t dummy = NULL;
755   simdata_task_t simdata = NULL;
756
757   CHECK_HOST();
758   dummy = MSG_task_create("MSG_sleep", nb_sec, 0.0, NULL);
759   simdata = dummy->simdata;
760
761   simdata->compute = surf_workstation_resource->extension_public->
762     sleep(MSG_process_get_host(process)->simdata->host,
763             simdata->computation_amount);
764   surf_workstation_resource->common_public->action_set_data(simdata->compute,dummy);
765
766   
767   simdata->using++;
768   do {
769     __MSG_task_wait_event(process, dummy);
770     state=surf_workstation_resource->common_public->action_get_state(simdata->compute);
771   } while (state==SURF_ACTION_RUNNING);
772   simdata->using--;
773     
774   if(state == SURF_ACTION_DONE) {
775     if(surf_workstation_resource->extension_public->
776        get_state(MSG_process_get_host(process)->simdata->host) 
777        == SURF_CPU_OFF) {
778       if(surf_workstation_resource->common_public->action_free(simdata->compute)) 
779         simdata->compute = NULL;
780       MSG_RETURN(MSG_HOST_FAILURE);
781     }
782     if(__MSG_process_isBlocked(process)) {
783       __MSG_process_unblock(MSG_process_self());
784     }
785     if(surf_workstation_resource->extension_public->
786        get_state(MSG_process_get_host(process)->simdata->host) 
787        == SURF_CPU_OFF) {
788       if(surf_workstation_resource->common_public->action_free(simdata->compute)) 
789         simdata->compute = NULL;
790       MSG_RETURN(MSG_HOST_FAILURE);
791     }
792     if(surf_workstation_resource->common_public->action_free(simdata->compute)) 
793       simdata->compute = NULL;
794     MSG_task_destroy(dummy);
795     MSG_RETURN(MSG_OK);
796   } else MSG_RETURN(MSG_HOST_FAILURE);
797 }
798
799 /** \ingroup msg_gos_functions
800  * \brief Return the number of MSG tasks currently running on
801  * the host of the current running process.
802  */
803 static int MSG_get_msgload(void) 
804 {
805   m_process_t process;
806    
807   CHECK_HOST();
808   
809   xbt_assert0(0, "This function is still to be specified correctly (what do you mean by 'load', exactly?). In the meantime, please don't use it");
810   process = MSG_process_self();
811   return xbt_fifo_size(process->simdata->host->simdata->process_list);
812 }
813
814 /** \ingroup msg_gos_functions
815  *
816  * \brief Return the last value returned by a MSG function (except
817  * MSG_get_errno...).
818  */
819 MSG_error_t MSG_get_errno(void)
820 {
821   return PROCESS_GET_ERRNO();
822 }