Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
Integrate Bruno's work on SIMIX onto main stream. Tests are broken, but it looks...
[simgrid.git] / src / msg / gos.c
1 /*     $Id$      */
2   
3 /* Copyright (c) 2002-2007 Arnaud Legrand.                                  */
4 /* Copyright (c) 2007 Bruno Donassolo.                                      */
5 /* All rights reserved.                                                     */
6
7 /* This program is free software; you can redistribute it and/or modify it
8  * under the terms of the license (GNU LGPL) which comes with this package. */
9   
10 #include "msg/private.h"
11 #include "xbt/sysdep.h"
12 #include "xbt/log.h"
13
14 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(msg_gos, msg, "Logging specific to MSG (gos)");
15
16 /** \defgroup msg_gos_functions MSG Operating System Functions
17  *  \brief This section describes the functions that can be used
18  *  by an agent for handling some task.
19  */
20
21 static MSG_error_t __MSG_task_get_with_time_out_from_host(m_task_t * task,
22                                                         m_channel_t channel,
23                                                         double max_duration,
24                                                         m_host_t host)
25 {
26
27   m_process_t process = MSG_process_self();
28   m_task_t t = NULL;
29   m_host_t h = NULL;
30   simdata_task_t t_simdata = NULL;
31   simdata_host_t h_simdata = NULL;
32   int first_time = 1;
33   xbt_fifo_item_t item = NULL;
34
35         smx_cond_t cond = NULL;                 //conditional wait if the task isn't on the channel yet
36
37   CHECK_HOST();
38   xbt_assert1((channel>=0) && (channel < msg_global->max_channel),"Invalid channel %d",channel);
39   /* Sanity check */
40   xbt_assert0(task,"Null pointer for the task\n");
41
42   if (*task) 
43     CRITICAL0("MSG_task_get() was asked to write in a non empty task struct.");
44
45   /* Get the task */
46   h = MSG_host_self();
47   h_simdata = h->simdata;
48
49   DEBUG2("Waiting for a task on channel %d (%s)", channel,h->name);
50
51         SIMIX_mutex_lock(h->simdata->mutex);
52   while (1) {
53                 if(xbt_fifo_size(h_simdata->mbox[channel])>0) {
54                         if(!host) {
55                                 t = xbt_fifo_shift(h_simdata->mbox[channel]);
56                                 break;
57                         } else {
58                                 xbt_fifo_foreach(h->simdata->mbox[channel],item,t,m_task_t) {
59                                         if(t->simdata->source==host) break;
60                                 }
61                                 if(item) {
62                                         xbt_fifo_remove_item(h->simdata->mbox[channel],item);
63                                         break;
64                                 } 
65                         }
66                 }
67                 
68                 if(max_duration>0) {
69                         if(!first_time) {
70                                 SIMIX_mutex_unlock(h->simdata->mutex);
71                                 h_simdata->sleeping[channel] = NULL;
72                                 SIMIX_cond_destroy(cond);
73                                 MSG_RETURN(MSG_TRANSFER_FAILURE);
74                         }
75                 }
76                 xbt_assert1(!(h_simdata->sleeping[channel]),"A process is already blocked on channel %d", channel);
77         
78                 cond = SIMIX_cond_init();
79                 h_simdata->sleeping[channel] = cond;
80                 if (max_duration > 0) {
81                         SIMIX_cond_wait_timeout(cond, h->simdata->mutex, max_duration);
82                 }
83                 else SIMIX_cond_wait(h_simdata->sleeping[channel],h->simdata->mutex);
84
85                 if(SIMIX_host_get_state(h_simdata->host)==0)
86       MSG_RETURN(MSG_HOST_FAILURE);
87
88                 first_time = 0;
89         }
90         SIMIX_mutex_unlock(h->simdata->mutex);
91
92   DEBUG1("OK, got a task (%s)", t->name);
93         /* clean conditional */
94         if (cond) {
95                 SIMIX_cond_destroy(cond);
96                 h_simdata->sleeping[channel] = NULL;
97         }
98
99   t_simdata = t->simdata;
100         t_simdata->receiver = process;
101   *task=t;
102
103         SIMIX_mutex_lock(t_simdata->mutex);
104
105   /* Transfer */
106   t_simdata->using++;
107         /* create SIMIX action to the communication */
108         t_simdata->comm = SIMIX_action_communicate(t_simdata->sender->simdata->host->simdata->host,
109                                                                                                                                                                                 process->simdata->host->simdata->host,t->name, t_simdata->message_size, 
110                                                                                                                                                                                 t_simdata->rate); 
111         /* if the process is suspend, create the action but stop its execution, it will be restart when the sender process resume */
112         if (MSG_process_is_suspended(t_simdata->sender)) {
113                 DEBUG1("Process sender (%s) suspended", t_simdata->sender->name);
114                 SIMIX_action_set_priority(t_simdata->comm,0);
115         }
116         process->simdata->waiting_task = t;
117         SIMIX_register_action_to_condition(t_simdata->comm, t_simdata->cond);
118         SIMIX_register_condition_to_action(t_simdata->comm, t_simdata->cond);
119         SIMIX_cond_wait(t_simdata->cond,t_simdata->mutex);
120         process->simdata->waiting_task = NULL;
121
122         /* the task has already finished and the pointer must be null*/
123         if (t->simdata->sender) {
124                 t->simdata->sender->simdata->waiting_task = NULL;
125                 /* action ended, set comm and compute = NULL, the actions is already destroyed in the main function */
126                 //t->simdata->comm = NULL;
127                 //t->simdata->compute = NULL;
128         }
129         /* for this process, don't need to change in get function*/
130         t->simdata->receiver = NULL;
131         SIMIX_mutex_unlock(t_simdata->mutex);
132
133
134         if(SIMIX_action_get_state(t_simdata->comm) == SURF_ACTION_DONE) {
135                 //t_simdata->comm = NULL;
136                 SIMIX_action_destroy(t_simdata->comm);
137                 t_simdata->comm = NULL;
138     MSG_RETURN(MSG_OK);
139         } else if (SIMIX_host_get_state(h_simdata->host)==0) {
140                 //t_simdata->comm = NULL;
141                 SIMIX_action_destroy(t_simdata->comm);
142                 t_simdata->comm = NULL;
143     MSG_RETURN(MSG_HOST_FAILURE);
144   } else { 
145                 //t_simdata->comm = NULL;
146                 SIMIX_action_destroy(t_simdata->comm);
147                 t_simdata->comm = NULL;
148     MSG_RETURN(MSG_TRANSFER_FAILURE);
149   }
150
151 }
152         
153 /** \ingroup msg_gos_functions
154  * \brief Listen on a channel and wait for receiving a task.
155  *
156  * It takes two parameters.
157  * \param task a memory location for storing a #m_task_t. It will
158    hold a task when this function will return. Thus \a task should not
159    be equal to \c NULL and \a *task should be equal to \c NULL. If one of
160    those two condition does not hold, there will be a warning message.
161  * \param channel the channel on which the agent should be
162    listening. This value has to be >=0 and < than the maximal
163    number of channels fixed with MSG_set_channel_number().
164  * \return #MSG_FATAL if \a task is equal to \c NULL, #MSG_WARNING
165  * if \a *task is not equal to \c NULL, and #MSG_OK otherwise.
166  */
167 MSG_error_t MSG_task_get(m_task_t * task,
168                          m_channel_t channel)
169 {
170   return MSG_task_get_with_time_out(task, channel, -1);
171 }
172
173 /** \ingroup msg_gos_functions
174  * \brief Listen on a channel and wait for receiving a task with a timeout.
175  *
176  * It takes three parameters.
177  * \param task a memory location for storing a #m_task_t. It will
178    hold a task when this function will return. Thus \a task should not
179    be equal to \c NULL and \a *task should be equal to \c NULL. If one of
180    those two condition does not hold, there will be a warning message.
181  * \param channel the channel on which the agent should be
182    listening. This value has to be >=0 and < than the maximal
183    number of channels fixed with MSG_set_channel_number().
184  * \param max_duration the maximum time to wait for a task before giving
185     up. In such a case, #MSG_TRANSFER_FAILURE will be returned, \a task 
186     will not be modified and will still be
187     equal to \c NULL when returning. 
188  * \return #MSG_FATAL if \a task is equal to \c NULL, #MSG_WARNING
189    if \a *task is not equal to \c NULL, and #MSG_OK otherwise.
190  */
191 MSG_error_t MSG_task_get_with_time_out(m_task_t * task,
192                                        m_channel_t channel,
193                                        double max_duration)
194 {
195   return __MSG_task_get_with_time_out_from_host(task, channel, max_duration, NULL);
196 }
197
198 /** \ingroup msg_gos_functions
199  * \brief Listen on \a channel and waits for receiving a task from \a host.
200  *
201  * It takes three parameters.
202  * \param task a memory location for storing a #m_task_t. It will
203    hold a task when this function will return. Thus \a task should not
204    be equal to \c NULL and \a *task should be equal to \c NULL. If one of
205    those two condition does not hold, there will be a warning message.
206  * \param channel the channel on which the agent should be
207    listening. This value has to be >=0 and < than the maximal
208    number of channels fixed with MSG_set_channel_number().
209  * \param host the host that is to be watched.
210  * \return #MSG_FATAL if \a task is equal to \c NULL, #MSG_WARNING
211    if \a *task is not equal to \c NULL, and #MSG_OK otherwise.
212  */
213 MSG_error_t MSG_task_get_from_host(m_task_t * task, int channel, 
214                                    m_host_t host)
215 {
216   return __MSG_task_get_with_time_out_from_host(task, channel, -1, host);
217 }
218
219 /** \ingroup msg_gos_functions
220  * \brief Test whether there is a pending communication on a channel.
221  *
222  * It takes one parameter.
223  * \param channel the channel on which the agent should be
224    listening. This value has to be >=0 and < than the maximal
225    number of channels fixed with MSG_set_channel_number().
226  * \return 1 if there is a pending communication and 0 otherwise
227  */
228 int MSG_task_Iprobe(m_channel_t channel)
229 {
230   m_host_t h = NULL;
231
232   xbt_assert1((channel>=0) && (channel < msg_global->max_channel),"Invalid channel %d",channel);
233   CHECK_HOST();
234
235   DEBUG2("Probing on channel %d (%s)", channel,h->name);
236
237   h = MSG_host_self();
238   return(xbt_fifo_get_first_item(h->simdata->mbox[channel])!=NULL);
239 }
240
241 /** \ingroup msg_gos_functions
242  * \brief Test whether there is a pending communication on a channel, and who sent it.
243  *
244  * It takes one parameter.
245  * \param channel the channel on which the agent should be
246    listening. This value has to be >=0 and < than the maximal
247    number of channels fixed with MSG_set_channel_number().
248  * \return -1 if there is no pending communication and the PID of the process who sent it otherwise
249  */
250 int MSG_task_probe_from(m_channel_t channel)
251 {
252   m_host_t h = NULL;
253   xbt_fifo_item_t item;
254   m_task_t t;
255
256   xbt_assert1((channel>=0) && (channel < msg_global->max_channel),"Invalid channel %d",channel);
257   CHECK_HOST();
258
259   h = MSG_host_self();
260
261   DEBUG2("Probing on channel %d (%s)", channel,h->name);
262    
263   item = xbt_fifo_get_first_item(h->simdata->mbox[channel]);
264   if ( (!item) || (!(t = xbt_fifo_get_item_content(item))) )
265     return -1;
266    
267   return MSG_process_get_PID(t->simdata->sender);
268 }
269
270 /** \ingroup msg_gos_functions
271  * \brief Wait for at most \a max_duration second for a task reception
272    on \a channel. *\a PID is updated with the PID of the first process
273    that triggered this event if any.
274  *
275  * It takes three parameters:
276  * \param channel the channel on which the agent should be
277    listening. This value has to be >=0 and < than the maximal.
278    number of channels fixed with MSG_set_channel_number().
279  * \param PID a memory location for storing an int.
280  * \param max_duration the maximum time to wait for a task before
281     giving up. In the case of a reception, *\a PID will be updated
282     with the PID of the first process to send a task.
283  * \return #MSG_HOST_FAILURE if the host is shut down in the meantime
284    and #MSG_OK otherwise.
285  */
286 MSG_error_t MSG_channel_select_from(m_channel_t channel, double max_duration,
287                 int *PID)
288 {
289         m_host_t h = NULL;
290         simdata_host_t h_simdata = NULL;
291         xbt_fifo_item_t item;
292         m_task_t t;
293         int first_time = 1;
294         smx_cond_t cond;
295
296         xbt_assert1((channel>=0) && (channel < msg_global->max_channel),"Invalid channel %d",channel);
297         if(PID) {
298                 *PID = -1;
299         }
300
301         if(max_duration==0.0) {
302                 *PID = MSG_task_probe_from(channel);
303                 MSG_RETURN(MSG_OK);
304         } else {
305                 CHECK_HOST();
306                 h = MSG_host_self();
307                 h_simdata = h->simdata;
308
309                 DEBUG2("Probing on channel %d (%s)", channel,h->name);
310                 while(!(item = xbt_fifo_get_first_item(h->simdata->mbox[channel]))) {
311                         if(max_duration>0) {
312                                 if(!first_time) {
313                                         MSG_RETURN(MSG_OK);
314                                 }
315                         }
316                         SIMIX_mutex_lock(h_simdata->mutex);
317                         xbt_assert1(!(h_simdata->sleeping[channel]),
318                                         "A process is already blocked on this channel %d", channel);
319                         cond = SIMIX_cond_init();
320                         h_simdata->sleeping[channel] = cond; /* I'm waiting. Wake me up when you're ready */
321                         if(max_duration>0) {
322                                 SIMIX_cond_wait_timeout(cond,h_simdata->mutex, max_duration);
323                         } else {
324                                 SIMIX_cond_wait(cond,h_simdata->mutex);
325                         }
326                         SIMIX_cond_destroy(cond);
327                         SIMIX_mutex_unlock(h_simdata->mutex);
328                         if(SIMIX_host_get_state(h_simdata->host)==0) {
329                                 MSG_RETURN(MSG_HOST_FAILURE);
330                         }
331                         h_simdata->sleeping[channel] = NULL;
332                         first_time = 0;
333                 }
334                 if (!item || !(t = xbt_fifo_get_item_content(item))) {
335                         MSG_RETURN(MSG_OK);
336                 }
337                 if(PID) {
338                         *PID = MSG_process_get_PID(t->simdata->sender);
339                 }
340                 MSG_RETURN(MSG_OK);
341         }
342 }
343
344
345 /** \ingroup msg_gos_functions
346
347  * \brief Return the number of tasks waiting to be received on a \a
348    channel and sent by \a host.
349  *
350  * It takes two parameters.
351  * \param channel the channel on which the agent should be
352    listening. This value has to be >=0 and < than the maximal
353    number of channels fixed with MSG_set_channel_number().
354  * \param host the host that is to be watched.
355  * \return the number of tasks waiting to be received on \a channel
356    and sent by \a host.
357  */
358 int MSG_task_probe_from_host(int channel, m_host_t host)
359 {
360   xbt_fifo_item_t item;
361   m_task_t t;
362   int count = 0;
363   m_host_t h = NULL;
364   
365   xbt_assert1((channel>=0) && (channel < msg_global->max_channel),"Invalid channel %d",channel);
366   CHECK_HOST();
367   h = MSG_host_self();
368
369   DEBUG2("Probing on channel %d (%s)", channel,h->name);
370    
371   xbt_fifo_foreach(h->simdata->mbox[channel],item,t,m_task_t) {
372     if(t->simdata->source==host) count++;
373   }
374    
375   return count;
376 }
377
378 /** \ingroup msg_gos_functions \brief Put a task on a channel of an
379  * host (with a timeout on the waiting of the destination host) and
380  * waits for the end of the transmission.
381  *
382  * This function is used for describing the behavior of an agent. It
383  * takes four parameter.
384  * \param task a #m_task_t to send on another location. This task
385    will not be usable anymore when the function will return. There is
386    no automatic task duplication and you have to save your parameters
387    before calling this function. Tasks are unique and once it has been
388    sent to another location, you should not access it anymore. You do
389    not need to call MSG_task_destroy() but to avoid using, as an
390    effect of inattention, this task anymore, you definitely should
391    renitialize it with #MSG_TASK_UNINITIALIZED. Note that this task
392    can be transfered iff it has been correctly created with
393    MSG_task_create().
394  * \param dest the destination of the message
395  * \param channel the channel on which the agent should put this
396    task. This value has to be >=0 and < than the maximal number of
397    channels fixed with MSG_set_channel_number().
398  * \param max_duration the maximum time to wait for a task before giving
399     up. In such a case, #MSG_TRANSFER_FAILURE will be returned, \a task 
400     will not be modified 
401  * \return #MSG_FATAL if \a task is not properly initialized and
402    #MSG_OK otherwise. Returns #MSG_HOST_FAILURE if the host on which
403    this function was called was shut down. Returns
404    #MSG_TRANSFER_FAILURE if the transfer could not be properly done
405    (network failure, dest failure, timeout...)
406  */
407 MSG_error_t MSG_task_put_with_timeout(m_task_t task, m_host_t dest, 
408                                       m_channel_t channel, double max_duration)
409 {
410
411
412   m_process_t process = MSG_process_self();
413   simdata_task_t task_simdata = NULL;
414   m_host_t local_host = NULL;
415   m_host_t remote_host = NULL;
416   CHECK_HOST();
417
418   xbt_assert1((channel>=0) && (channel < msg_global->max_channel),"Invalid channel %d",channel);
419
420   task_simdata = task->simdata;
421   task_simdata->sender = process;
422   task_simdata->source = MSG_process_get_host(process);
423   xbt_assert0(task_simdata->using==1,
424               "This task is still being used somewhere else. You cannot send it now. Go fix your code!");
425   task_simdata->comm = NULL;
426   
427   local_host = ((simdata_process_t) process->simdata)->host;
428   remote_host = dest;
429
430   DEBUG4("Trying to send a task (%g kB) from %s to %s on channel %d", 
431          task->simdata->message_size/1000,local_host->name, remote_host->name, channel);
432
433         SIMIX_mutex_lock(remote_host->simdata->mutex);
434   xbt_fifo_push(((simdata_host_t) remote_host->simdata)->
435                 mbox[channel], task);
436
437   
438   if(remote_host->simdata->sleeping[channel]) {
439     DEBUG0("Somebody is listening. Let's wake him up!");
440                 SIMIX_cond_signal(remote_host->simdata->sleeping[channel]);
441   }
442         SIMIX_mutex_unlock(remote_host->simdata->mutex);
443
444   process->simdata->put_host = dest;
445   process->simdata->put_channel = channel;
446         SIMIX_mutex_lock(task->simdata->mutex);
447  // DEBUG4("Task sent (%g kB) from %s to %s on channel %d, waiting...", task->simdata->message_size/1000,local_host->name, remote_host->name, channel);
448
449         process->simdata->waiting_task = task;
450         if (max_duration >0) {
451                 SIMIX_cond_wait_timeout(task->simdata->cond,task->simdata->mutex,max_duration);
452                 /* verify if the timeout happened and the communication didn't started yet */
453                 if (task->simdata->comm==NULL) {
454                         task->simdata->using--;
455                         process->simdata->waiting_task = NULL;
456                         xbt_fifo_remove(((simdata_host_t) remote_host->simdata)->mbox[channel],
457                         task);
458                         if (task->simdata->receiver) {
459                                 task->simdata->receiver->simdata->waiting_task = NULL;
460                         }
461                         task->simdata->sender = NULL;
462                         SIMIX_mutex_unlock(task->simdata->mutex);
463                         MSG_RETURN(MSG_TRANSFER_FAILURE);
464                 }
465         }
466         else {
467                 SIMIX_cond_wait(task->simdata->cond,task->simdata->mutex);
468         }
469
470         DEBUG1("Action terminated %s",task->name);    
471         task->simdata->using--;
472         process->simdata->waiting_task = NULL;
473         /* the task has already finished and the pointer must be null*/
474         if (task->simdata->receiver) {
475                 task->simdata->receiver->simdata->waiting_task = NULL;
476                 /* action ended, set comm and compute = NULL, the actions is already destroyed in the main function */
477         //      task->simdata->comm = NULL;
478                 //task->simdata->compute = NULL;
479         }
480         task->simdata->sender = NULL;
481         SIMIX_mutex_unlock(task->simdata->mutex);
482
483         if(SIMIX_action_get_state(task->simdata->comm) == SURF_ACTION_DONE) {
484     MSG_RETURN(MSG_OK);
485         } else if (SIMIX_host_get_state(local_host->simdata->host)==0) {
486     MSG_RETURN(MSG_HOST_FAILURE);
487   } else { 
488     MSG_RETURN(MSG_TRANSFER_FAILURE);
489   }
490 }
491
492 /** \ingroup msg_gos_functions
493  * \brief Put a task on a channel of an host and waits for the end of the
494  * transmission.
495  *
496  * This function is used for describing the behavior of an agent. It
497  * takes three parameter.
498  * \param task a #m_task_t to send on another location. This task
499    will not be usable anymore when the function will return. There is
500    no automatic task duplication and you have to save your parameters
501    before calling this function. Tasks are unique and once it has been
502    sent to another location, you should not access it anymore. You do
503    not need to call MSG_task_destroy() but to avoid using, as an
504    effect of inattention, this task anymore, you definitely should
505    renitialize it with #MSG_TASK_UNINITIALIZED. Note that this task
506    can be transfered iff it has been correctly created with
507    MSG_task_create().
508  * \param dest the destination of the message
509  * \param channel the channel on which the agent should put this
510    task. This value has to be >=0 and < than the maximal number of
511    channels fixed with MSG_set_channel_number().
512  * \return #MSG_FATAL if \a task is not properly initialized and
513  * #MSG_OK otherwise. Returns #MSG_HOST_FAILURE if the host on which
514  * this function was called was shut down. Returns
515  * #MSG_TRANSFER_FAILURE if the transfer could not be properly done
516  * (network failure, dest failure)
517  */
518 MSG_error_t MSG_task_put(m_task_t task,
519                          m_host_t dest, m_channel_t channel)
520 {
521   return MSG_task_put_with_timeout(task, dest, channel, -1.0);
522 }
523
524 /** \ingroup msg_gos_functions
525  * \brief Does exactly the same as MSG_task_put but with a bounded transmition 
526  * rate.
527  *
528  * \sa MSG_task_put
529  */
530 MSG_error_t MSG_task_put_bounded(m_task_t task,
531                                  m_host_t dest, m_channel_t channel,
532                                  double max_rate)
533 {
534   MSG_error_t res = MSG_OK;
535   task->simdata->rate=max_rate;
536   res = MSG_task_put(task, dest, channel);
537   return(res);
538 }
539
540 /** \ingroup msg_gos_functions
541  * \brief Executes a task and waits for its termination.
542  *
543  * This function is used for describing the behavior of an agent. It
544  * takes only one parameter.
545  * \param task a #m_task_t to execute on the location on which the
546    agent is running.
547  * \return #MSG_FATAL if \a task is not properly initialized and
548  * #MSG_OK otherwise.
549  */
550 MSG_error_t MSG_task_execute(m_task_t task)
551 {
552         simdata_task_t simdata = NULL;
553         m_process_t self = MSG_process_self();
554   CHECK_HOST();
555
556   simdata = task->simdata;
557   xbt_assert0((!simdata->compute)&&(task->simdata->using==1),
558               "This task is executed somewhere else. Go fix your code!");
559         
560         DEBUG1("Computing on %s", MSG_process_self()->simdata->host->name);
561   simdata->using++;
562         SIMIX_mutex_lock(simdata->mutex);
563   simdata->compute = SIMIX_action_execute(SIMIX_host_self(), task->name, simdata->computation_amount);
564         SIMIX_action_set_priority(simdata->compute, simdata->priority);
565
566         self->simdata->waiting_task = task;
567         SIMIX_register_action_to_condition(simdata->compute, simdata->cond);
568         SIMIX_register_condition_to_action(simdata->compute, simdata->cond);
569         SIMIX_cond_wait(simdata->cond, simdata->mutex);
570         self->simdata->waiting_task = NULL;
571
572         SIMIX_mutex_unlock(simdata->mutex);
573   simdata->using--;
574
575         if(SIMIX_action_get_state(task->simdata->compute) == SURF_ACTION_DONE) {
576                 /* action ended, set comm and compute = NULL, the actions is already destroyed in the main function */
577                 SIMIX_action_destroy(task->simdata->compute);
578                 simdata->computation_amount = 0.0;
579                 simdata->comm = NULL;
580                 simdata->compute = NULL;
581     MSG_RETURN(MSG_OK);
582         } else if (SIMIX_host_get_state(SIMIX_host_self())==0) {
583                 /* action ended, set comm and compute = NULL, the actions is already destroyed in the main function */
584                 SIMIX_action_destroy(task->simdata->compute);
585                 simdata->comm = NULL;
586                 simdata->compute = NULL;
587     MSG_RETURN(MSG_HOST_FAILURE);
588   } else { 
589                 /* action ended, set comm and compute = NULL, the actions is already destroyed in the main function */
590                 SIMIX_action_destroy(task->simdata->compute);
591                 simdata->comm = NULL;
592                 simdata->compute = NULL;
593     MSG_RETURN(MSG_TASK_CANCELLED);
594   }
595 }
596
597
598 /** \ingroup m_task_management
599  * \brief Creates a new #m_task_t (a parallel one....).
600  *
601  * A constructor for #m_task_t taking six arguments and returning the 
602    corresponding object.
603  * \param name a name for the object. It is for user-level information
604    and can be NULL.
605  * \param host_nb the number of hosts implied in the parallel task.
606  * \param host_list an array of \p host_nb m_host_t.
607  * \param computation_amount an array of \p host_nb
608    doubles. computation_amount[i] is the total number of operations
609    that have to be performed on host_list[i].
610  * \param communication_amount an array of \p host_nb* \p host_nb doubles.
611  * \param data a pointer to any data may want to attach to the new
612    object.  It is for user-level information and can be NULL. It can
613    be retrieved with the function \ref MSG_task_get_data.
614  * \see m_task_t
615  * \return The new corresponding object.
616  */
617 m_task_t MSG_parallel_task_create(const char *name, 
618                                   int host_nb,
619                                   const m_host_t *host_list,
620                                   double *computation_amount,
621                                   double *communication_amount,
622                                   void *data)
623 {
624   int i;
625   simdata_task_t simdata = xbt_new0(s_simdata_task_t,1);
626   m_task_t task = xbt_new0(s_m_task_t,1);
627   task->simdata = simdata;
628
629   /* Task structure */
630   task->name = xbt_strdup(name);
631   task->data = data;
632
633   /* Simulator Data */
634   simdata->computation_amount = 0;
635   simdata->message_size = 0;
636         simdata->cond = SIMIX_cond_init();
637         simdata->mutex = SIMIX_mutex_init();
638         simdata->compute = NULL;
639         simdata->comm = NULL;
640   simdata->rate = -1.0;
641   simdata->using = 1;
642   simdata->sender = NULL;
643   simdata->receiver = NULL;
644   simdata->source = NULL;
645
646   simdata->host_nb = host_nb;
647   simdata->host_list = xbt_new0(void *, host_nb);
648   simdata->comp_amount = computation_amount;
649   simdata->comm_amount = communication_amount;
650
651   for(i=0;i<host_nb;i++)
652     simdata->host_list[i] = host_list[i]->simdata->host;
653
654   return task;
655
656 }
657
658
659 MSG_error_t MSG_parallel_task_execute(m_task_t task)
660 {
661         simdata_task_t simdata = NULL;
662         m_process_t self = MSG_process_self();
663   CHECK_HOST();
664
665   simdata = task->simdata;
666   xbt_assert0((!simdata->compute)&&(task->simdata->using==1),
667               "This task is executed somewhere else. Go fix your code!");
668
669   xbt_assert0(simdata->host_nb,"This is not a parallel task. Go to hell.");
670         
671         DEBUG1("Computing on %s", MSG_process_self()->simdata->host->name);
672   simdata->using++;
673         SIMIX_mutex_lock(simdata->mutex);
674   simdata->compute = SIMIX_action_parallel_execute(task->name, simdata->host_nb, simdata->host_list, simdata->comp_amount, simdata->comm_amount, 1.0, -1.0);
675
676         self->simdata->waiting_task = task;
677         SIMIX_register_action_to_condition(simdata->compute, simdata->cond);
678         SIMIX_register_condition_to_action(simdata->compute, simdata->cond);
679         SIMIX_cond_wait(simdata->cond, simdata->mutex);
680         self->simdata->waiting_task = NULL;
681
682
683         SIMIX_mutex_unlock(simdata->mutex);
684   simdata->using--;
685
686         if(SIMIX_action_get_state(task->simdata->compute) == SURF_ACTION_DONE) {
687                 /* action ended, set comm and compute = NULL, the actions is already destroyed in the main function */
688                 SIMIX_action_destroy(task->simdata->compute);
689                 simdata->computation_amount = 0.0;
690                 simdata->comm = NULL;
691                 simdata->compute = NULL;
692     MSG_RETURN(MSG_OK);
693         } else if (SIMIX_host_get_state(SIMIX_host_self())==0) {
694                 /* action ended, set comm and compute = NULL, the actions is already destroyed in the main function */
695                 SIMIX_action_destroy(task->simdata->compute);
696                 simdata->comm = NULL;
697                 simdata->compute = NULL;
698     MSG_RETURN(MSG_HOST_FAILURE);
699   } else { 
700                 /* action ended, set comm and compute = NULL, the actions is already destroyed in the main function */
701                 SIMIX_action_destroy(task->simdata->compute);
702                 simdata->comm = NULL;
703                 simdata->compute = NULL;
704     MSG_RETURN(MSG_TASK_CANCELLED);
705   }     
706
707 }
708
709
710 /** \ingroup msg_gos_functions
711  * \brief Sleep for the specified number of seconds
712  *
713  * Makes the current process sleep until \a time seconds have elapsed.
714  *
715  * \param nb_sec a number of second
716  */
717 MSG_error_t MSG_process_sleep(double nb_sec)
718 {
719         smx_action_t act_sleep;
720         m_process_t proc = MSG_process_self();
721         smx_mutex_t mutex;
722         smx_cond_t cond;
723         /* create action to sleep */
724         act_sleep = SIMIX_action_sleep(SIMIX_process_get_host(proc->simdata->smx_process),nb_sec);
725         
726         mutex = SIMIX_mutex_init();
727         SIMIX_mutex_lock(mutex);
728         /* create conditional and register action to it */
729         cond = SIMIX_cond_init();
730
731         SIMIX_register_condition_to_action(act_sleep, cond);
732         SIMIX_register_action_to_condition(act_sleep, cond);
733         SIMIX_cond_wait(cond,mutex);
734         SIMIX_mutex_unlock(mutex);
735
736         /* remove variables */
737         SIMIX_cond_destroy(cond);
738         SIMIX_mutex_destroy(mutex);
739
740   if(SIMIX_action_get_state(act_sleep) == SURF_ACTION_DONE) {
741     if(SIMIX_host_get_state(SIMIX_host_self()) == SURF_CPU_OFF) {
742                         SIMIX_action_destroy(act_sleep);
743       MSG_RETURN(MSG_HOST_FAILURE);
744     }
745   }
746         else {
747                 SIMIX_action_destroy(act_sleep);
748                 MSG_RETURN(MSG_HOST_FAILURE);
749         }
750
751
752         MSG_RETURN(MSG_OK);
753 }
754
755 /** \ingroup msg_gos_functions
756  * \brief Return the number of MSG tasks currently running on
757  * the host of the current running process.
758  */
759 static int MSG_get_msgload(void) 
760 {
761         xbt_die("not implemented yet");
762         return 0;
763 }
764
765 /** \ingroup msg_gos_functions
766  *
767  * \brief Return the last value returned by a MSG function (except
768  * MSG_get_errno...).
769  */
770 MSG_error_t MSG_get_errno(void)
771 {
772   return PROCESS_GET_ERRNO();
773 }