Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
56fe081a980dec1af538d497460300599d10c084
[simgrid.git] / src / msg_simix / msg_simix_gos.c
1 #include "msg_simix_private.h"
2 #include "xbt/sysdep.h"
3 #include "xbt/log.h"
4
5 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(msg_gos, msg, "Logging specific to MSG (gos)");
6
7 /** \defgroup msg_gos_functions MSG Operating System Functions
8  *  \brief This section describes the functions that can be used
9  *  by an agent for handling some task.
10  */
11
12 static MSG_error_t __MSG_task_get_with_time_out_from_host(m_task_t * task,
13                                                         m_channel_t channel,
14                                                         double max_duration,
15                                                         m_host_t host)
16 {
17
18   m_process_t process = MSG_process_self();
19   m_task_t t = NULL;
20   m_host_t h = NULL;
21   simdata_task_t t_simdata = NULL;
22   simdata_host_t h_simdata = NULL;
23   int first_time = 1;
24   xbt_fifo_item_t item = NULL;
25
26         smx_cond_t cond = NULL;                 //conditional wait if the task isn't on the channel yet
27
28   CHECK_HOST();
29   xbt_assert1((channel>=0) && (channel < msg_global->max_channel),"Invalid channel %d",channel);
30   /* Sanity check */
31   xbt_assert0(task,"Null pointer for the task\n");
32
33   if (*task) 
34     CRITICAL0("MSG_task_get() was asked to write in a non empty task struct.");
35
36   /* Get the task */
37   h = MSG_host_self();
38   h_simdata = h->simdata;
39
40   DEBUG2("Waiting for a task on channel %d (%s)", channel,h->name);
41
42         SIMIX_mutex_lock(h->simdata->mutex);
43   while (1) {
44                 if(xbt_fifo_size(h_simdata->mbox[channel])>0) {
45                         if(!host) {
46                                 t = xbt_fifo_shift(h_simdata->mbox[channel]);
47                                 break;
48                         } else {
49                                 xbt_fifo_foreach(h->simdata->mbox[channel],item,t,m_task_t) {
50                                         if(t->simdata->source==host) break;
51                                 }
52                                 if(item) {
53                                         xbt_fifo_remove_item(h->simdata->mbox[channel],item);
54                                         break;
55                                 } 
56                         }
57                 }
58                 
59                 if(max_duration>0) {
60                         if(!first_time) {
61                                 SIMIX_mutex_unlock(h->simdata->mutex);
62                                 h_simdata->sleeping[channel] = NULL;
63                                 SIMIX_cond_destroy(cond);
64                                 MSG_RETURN(MSG_TRANSFER_FAILURE);
65                         }
66                 }
67                 xbt_assert1(!(h_simdata->sleeping[channel]),"A process is already blocked on channel %d", channel);
68         
69                 cond = SIMIX_cond_init();
70                 h_simdata->sleeping[channel] = cond;
71                 if (max_duration > 0) {
72                         SIMIX_cond_wait_timeout(cond, h->simdata->mutex, max_duration);
73                 }
74                 else SIMIX_cond_wait(h_simdata->sleeping[channel],h->simdata->mutex);
75
76                 if(SIMIX_host_get_state(h_simdata->host)==0)
77       MSG_RETURN(MSG_HOST_FAILURE);
78
79                 first_time = 0;
80         }
81         SIMIX_mutex_unlock(h->simdata->mutex);
82
83   DEBUG1("OK, got a task (%s)", t->name);
84         /* clean conditional */
85         if (cond) {
86                 SIMIX_cond_destroy(cond);
87                 h_simdata->sleeping[channel] = NULL;
88         }
89
90   t_simdata = t->simdata;
91         t_simdata->receiver = process;
92   *task=t;
93
94         SIMIX_mutex_lock(t_simdata->mutex);
95
96   /* Transfer */
97   t_simdata->using++;
98         /* create SIMIX action to the communication */
99         t_simdata->comm = SIMIX_action_communicate(t_simdata->sender->simdata->host->simdata->host,
100                                                                                                                                                                                 process->simdata->host->simdata->host,t->name, t_simdata->message_size, 
101                                                                                                                                                                                 t_simdata->rate); 
102         /* if the process is suspend, create the action but stop its execution, it will be restart when the sender process resume */
103         if (MSG_process_is_suspended(t_simdata->sender)) {
104                 DEBUG1("Process sender (%s) suspended", t_simdata->sender->name);
105                 SIMIX_action_set_priority(t_simdata->comm,0);
106         }
107         process->simdata->waiting_task = t;
108         SIMIX_register_action_to_condition(t_simdata->comm, t_simdata->cond);
109         SIMIX_register_condition_to_action(t_simdata->comm, t_simdata->cond);
110         SIMIX_cond_wait(t_simdata->cond,t_simdata->mutex);
111         process->simdata->waiting_task = NULL;
112
113         /* the task has already finished and the pointer must be null*/
114         if (t->simdata->sender) {
115                 t->simdata->sender->simdata->waiting_task = NULL;
116                 /* action ended, set comm and compute = NULL, the actions is already destroyed in the main function */
117                 //t->simdata->comm = NULL;
118                 //t->simdata->compute = NULL;
119         }
120         /* for this process, don't need to change in get function*/
121         t->simdata->receiver = NULL;
122         SIMIX_mutex_unlock(t_simdata->mutex);
123
124
125         if(SIMIX_action_get_state(t_simdata->comm) == SURF_ACTION_DONE) {
126                 //t_simdata->comm = NULL;
127                 SIMIX_action_destroy(t_simdata->comm);
128                 t_simdata->comm = NULL;
129     MSG_RETURN(MSG_OK);
130         } else if (SIMIX_host_get_state(h_simdata->host)==0) {
131                 //t_simdata->comm = NULL;
132                 SIMIX_action_destroy(t_simdata->comm);
133                 t_simdata->comm = NULL;
134     MSG_RETURN(MSG_HOST_FAILURE);
135   } else { 
136                 //t_simdata->comm = NULL;
137                 SIMIX_action_destroy(t_simdata->comm);
138                 t_simdata->comm = NULL;
139     MSG_RETURN(MSG_TRANSFER_FAILURE);
140   }
141
142 }
143         
144 /** \ingroup msg_gos_functions
145  * \brief Listen on a channel and wait for receiving a task.
146  *
147  * It takes two parameters.
148  * \param task a memory location for storing a #m_task_t. It will
149    hold a task when this function will return. Thus \a task should not
150    be equal to \c NULL and \a *task should be equal to \c NULL. If one of
151    those two condition does not hold, there will be a warning message.
152  * \param channel the channel on which the agent should be
153    listening. This value has to be >=0 and < than the maximal
154    number of channels fixed with MSG_set_channel_number().
155  * \return #MSG_FATAL if \a task is equal to \c NULL, #MSG_WARNING
156  * if \a *task is not equal to \c NULL, and #MSG_OK otherwise.
157  */
158 MSG_error_t MSG_task_get(m_task_t * task,
159                          m_channel_t channel)
160 {
161   return MSG_task_get_with_time_out(task, channel, -1);
162 }
163
164 /** \ingroup msg_gos_functions
165  * \brief Listen on a channel and wait for receiving a task with a timeout.
166  *
167  * It takes three parameters.
168  * \param task a memory location for storing a #m_task_t. It will
169    hold a task when this function will return. Thus \a task should not
170    be equal to \c NULL and \a *task should be equal to \c NULL. If one of
171    those two condition does not hold, there will be a warning message.
172  * \param channel the channel on which the agent should be
173    listening. This value has to be >=0 and < than the maximal
174    number of channels fixed with MSG_set_channel_number().
175  * \param max_duration the maximum time to wait for a task before giving
176     up. In such a case, #MSG_TRANSFER_FAILURE will be returned, \a task 
177     will not be modified and will still be
178     equal to \c NULL when returning. 
179  * \return #MSG_FATAL if \a task is equal to \c NULL, #MSG_WARNING
180    if \a *task is not equal to \c NULL, and #MSG_OK otherwise.
181  */
182 MSG_error_t MSG_task_get_with_time_out(m_task_t * task,
183                                        m_channel_t channel,
184                                        double max_duration)
185 {
186   return __MSG_task_get_with_time_out_from_host(task, channel, max_duration, NULL);
187 }
188
189 /** \ingroup msg_gos_functions
190  * \brief Listen on \a channel and waits for receiving a task from \a host.
191  *
192  * It takes three parameters.
193  * \param task a memory location for storing a #m_task_t. It will
194    hold a task when this function will return. Thus \a task should not
195    be equal to \c NULL and \a *task should be equal to \c NULL. If one of
196    those two condition does not hold, there will be a warning message.
197  * \param channel the channel on which the agent should be
198    listening. This value has to be >=0 and < than the maximal
199    number of channels fixed with MSG_set_channel_number().
200  * \param host the host that is to be watched.
201  * \return #MSG_FATAL if \a task is equal to \c NULL, #MSG_WARNING
202    if \a *task is not equal to \c NULL, and #MSG_OK otherwise.
203  */
204 MSG_error_t MSG_task_get_from_host(m_task_t * task, int channel, 
205                                    m_host_t host)
206 {
207   return __MSG_task_get_with_time_out_from_host(task, channel, -1, host);
208 }
209
210 /** \ingroup msg_gos_functions
211  * \brief Test whether there is a pending communication on a channel.
212  *
213  * It takes one parameter.
214  * \param channel the channel on which the agent should be
215    listening. This value has to be >=0 and < than the maximal
216    number of channels fixed with MSG_set_channel_number().
217  * \return 1 if there is a pending communication and 0 otherwise
218  */
219 int MSG_task_Iprobe(m_channel_t channel)
220 {
221   m_host_t h = NULL;
222
223   xbt_assert1((channel>=0) && (channel < msg_global->max_channel),"Invalid channel %d",channel);
224   CHECK_HOST();
225
226   DEBUG2("Probing on channel %d (%s)", channel,h->name);
227
228   h = MSG_host_self();
229   return(xbt_fifo_get_first_item(h->simdata->mbox[channel])!=NULL);
230 }
231
232 /** \ingroup msg_gos_functions
233  * \brief Test whether there is a pending communication on a channel, and who sent it.
234  *
235  * It takes one parameter.
236  * \param channel the channel on which the agent should be
237    listening. This value has to be >=0 and < than the maximal
238    number of channels fixed with MSG_set_channel_number().
239  * \return -1 if there is no pending communication and the PID of the process who sent it otherwise
240  */
241 int MSG_task_probe_from(m_channel_t channel)
242 {
243   m_host_t h = NULL;
244   xbt_fifo_item_t item;
245   m_task_t t;
246
247   xbt_assert1((channel>=0) && (channel < msg_global->max_channel),"Invalid channel %d",channel);
248   CHECK_HOST();
249
250   h = MSG_host_self();
251
252   DEBUG2("Probing on channel %d (%s)", channel,h->name);
253    
254   item = xbt_fifo_get_first_item(h->simdata->mbox[channel]);
255   if ( (!item) || (!(t = xbt_fifo_get_item_content(item))) )
256     return -1;
257    
258   return MSG_process_get_PID(t->simdata->sender);
259 }
260
261 /** \ingroup msg_gos_functions
262  * \brief Wait for at most \a max_duration second for a task reception
263    on \a channel. *\a PID is updated with the PID of the first process
264    that triggered this event if any.
265  *
266  * It takes three parameters:
267  * \param channel the channel on which the agent should be
268    listening. This value has to be >=0 and < than the maximal.
269    number of channels fixed with MSG_set_channel_number().
270  * \param PID a memory location for storing an int.
271  * \param max_duration the maximum time to wait for a task before
272     giving up. In the case of a reception, *\a PID will be updated
273     with the PID of the first process to send a task.
274  * \return #MSG_HOST_FAILURE if the host is shut down in the meantime
275    and #MSG_OK otherwise.
276  */
277 MSG_error_t MSG_channel_select_from(m_channel_t channel, double max_duration,
278                 int *PID)
279 {
280         m_host_t h = NULL;
281         simdata_host_t h_simdata = NULL;
282         xbt_fifo_item_t item;
283         m_task_t t;
284         int first_time = 1;
285         smx_cond_t cond;
286
287         xbt_assert1((channel>=0) && (channel < msg_global->max_channel),"Invalid channel %d",channel);
288         if(PID) {
289                 *PID = -1;
290         }
291
292         if(max_duration==0.0) {
293                 *PID = MSG_task_probe_from(channel);
294                 MSG_RETURN(MSG_OK);
295         } else {
296                 CHECK_HOST();
297                 h = MSG_host_self();
298                 h_simdata = h->simdata;
299
300                 DEBUG2("Probing on channel %d (%s)", channel,h->name);
301                 while(!(item = xbt_fifo_get_first_item(h->simdata->mbox[channel]))) {
302                         if(max_duration>0) {
303                                 if(!first_time) {
304                                         MSG_RETURN(MSG_OK);
305                                 }
306                         }
307                         SIMIX_mutex_lock(h_simdata->mutex);
308                         xbt_assert1(!(h_simdata->sleeping[channel]),
309                                         "A process is already blocked on this channel %d", channel);
310                         cond = SIMIX_cond_init();
311                         h_simdata->sleeping[channel] = cond; /* I'm waiting. Wake me up when you're ready */
312                         if(max_duration>0) {
313                                 SIMIX_cond_wait_timeout(cond,h_simdata->mutex, max_duration);
314                         } else {
315                                 SIMIX_cond_wait(cond,h_simdata->mutex);
316                         }
317                         SIMIX_cond_destroy(cond);
318                         SIMIX_mutex_unlock(h_simdata->mutex);
319                         if(SIMIX_host_get_state(h_simdata->host)==0) {
320                                 MSG_RETURN(MSG_HOST_FAILURE);
321                         }
322                         h_simdata->sleeping[channel] = NULL;
323                         first_time = 0;
324                 }
325                 if (!item || !(t = xbt_fifo_get_item_content(item))) {
326                         MSG_RETURN(MSG_OK);
327                 }
328                 if(PID) {
329                         *PID = MSG_process_get_PID(t->simdata->sender);
330                 }
331                 MSG_RETURN(MSG_OK);
332         }
333 }
334
335
336 /** \ingroup msg_gos_functions
337
338  * \brief Return the number of tasks waiting to be received on a \a
339    channel and sent by \a host.
340  *
341  * It takes two parameters.
342  * \param channel the channel on which the agent should be
343    listening. This value has to be >=0 and < than the maximal
344    number of channels fixed with MSG_set_channel_number().
345  * \param host the host that is to be watched.
346  * \return the number of tasks waiting to be received on \a channel
347    and sent by \a host.
348  */
349 int MSG_task_probe_from_host(int channel, m_host_t host)
350 {
351   xbt_fifo_item_t item;
352   m_task_t t;
353   int count = 0;
354   m_host_t h = NULL;
355   
356   xbt_assert1((channel>=0) && (channel < msg_global->max_channel),"Invalid channel %d",channel);
357   CHECK_HOST();
358   h = MSG_host_self();
359
360   DEBUG2("Probing on channel %d (%s)", channel,h->name);
361    
362   xbt_fifo_foreach(h->simdata->mbox[channel],item,t,m_task_t) {
363     if(t->simdata->source==host) count++;
364   }
365    
366   return count;
367 }
368
369 /** \ingroup msg_gos_functions \brief Put a task on a channel of an
370  * host (with a timeout on the waiting of the destination host) and
371  * waits for the end of the transmission.
372  *
373  * This function is used for describing the behavior of an agent. It
374  * takes four parameter.
375  * \param task a #m_task_t to send on another location. This task
376    will not be usable anymore when the function will return. There is
377    no automatic task duplication and you have to save your parameters
378    before calling this function. Tasks are unique and once it has been
379    sent to another location, you should not access it anymore. You do
380    not need to call MSG_task_destroy() but to avoid using, as an
381    effect of inattention, this task anymore, you definitely should
382    renitialize it with #MSG_TASK_UNINITIALIZED. Note that this task
383    can be transfered iff it has been correctly created with
384    MSG_task_create().
385  * \param dest the destination of the message
386  * \param channel the channel on which the agent should put this
387    task. This value has to be >=0 and < than the maximal number of
388    channels fixed with MSG_set_channel_number().
389  * \param max_duration the maximum time to wait for a task before giving
390     up. In such a case, #MSG_TRANSFER_FAILURE will be returned, \a task 
391     will not be modified 
392  * \return #MSG_FATAL if \a task is not properly initialized and
393    #MSG_OK otherwise. Returns #MSG_HOST_FAILURE if the host on which
394    this function was called was shut down. Returns
395    #MSG_TRANSFER_FAILURE if the transfer could not be properly done
396    (network failure, dest failure, timeout...)
397  */
398 MSG_error_t MSG_task_put_with_timeout(m_task_t task, m_host_t dest, 
399                                       m_channel_t channel, double max_duration)
400 {
401
402
403   m_process_t process = MSG_process_self();
404   simdata_task_t task_simdata = NULL;
405   m_host_t local_host = NULL;
406   m_host_t remote_host = NULL;
407   CHECK_HOST();
408
409   xbt_assert1((channel>=0) && (channel < msg_global->max_channel),"Invalid channel %d",channel);
410
411   task_simdata = task->simdata;
412   task_simdata->sender = process;
413   task_simdata->source = MSG_process_get_host(process);
414   xbt_assert0(task_simdata->using==1,
415               "This taks is still being used somewhere else. You cannot send it now. Go fix your code!");
416   task_simdata->comm = NULL;
417   
418   local_host = ((simdata_process_t) process->simdata)->host;
419   remote_host = dest;
420
421   DEBUG4("Trying to send a task (%g kB) from %s to %s on channel %d", 
422          task->simdata->message_size/1000,local_host->name, remote_host->name, channel);
423
424         SIMIX_mutex_lock(remote_host->simdata->mutex);
425   xbt_fifo_push(((simdata_host_t) remote_host->simdata)->
426                 mbox[channel], task);
427
428   
429   if(remote_host->simdata->sleeping[channel]) {
430     DEBUG0("Somebody is listening. Let's wake him up!");
431                 SIMIX_cond_signal(remote_host->simdata->sleeping[channel]);
432   }
433         SIMIX_mutex_unlock(remote_host->simdata->mutex);
434
435   process->simdata->put_host = dest;
436   process->simdata->put_channel = channel;
437         SIMIX_mutex_lock(task->simdata->mutex);
438  // DEBUG4("Task sent (%g kB) from %s to %s on channel %d, waiting...", task->simdata->message_size/1000,local_host->name, remote_host->name, channel);
439
440         process->simdata->waiting_task = task;
441         if (max_duration >0) {
442                 SIMIX_cond_wait_timeout(task->simdata->cond,task->simdata->mutex,max_duration);
443                 /* verify if the timeout happened and the communication didn't started yet */
444                 if (task->simdata->comm==NULL) {
445                         task->simdata->using--;
446                         process->simdata->waiting_task = NULL;
447                         xbt_fifo_remove(((simdata_host_t) remote_host->simdata)->mbox[channel],
448                         task);
449                         if (task->simdata->receiver) {
450                                 task->simdata->receiver->simdata->waiting_task = NULL;
451                         }
452                         task->simdata->sender = NULL;
453                         SIMIX_mutex_unlock(task->simdata->mutex);
454                         MSG_RETURN(MSG_TRANSFER_FAILURE);
455                 }
456         }
457         else {
458                 SIMIX_cond_wait(task->simdata->cond,task->simdata->mutex);
459         }
460
461         DEBUG1("Action terminated %s",task->name);    
462         task->simdata->using--;
463         process->simdata->waiting_task = NULL;
464         /* the task has already finished and the pointer must be null*/
465         if (task->simdata->receiver) {
466                 task->simdata->receiver->simdata->waiting_task = NULL;
467                 /* action ended, set comm and compute = NULL, the actions is already destroyed in the main function */
468         //      task->simdata->comm = NULL;
469                 //task->simdata->compute = NULL;
470         }
471         task->simdata->sender = NULL;
472         SIMIX_mutex_unlock(task->simdata->mutex);
473
474         if(SIMIX_action_get_state(task->simdata->comm) == SURF_ACTION_DONE) {
475     MSG_RETURN(MSG_OK);
476         } else if (SIMIX_host_get_state(local_host->simdata->host)==0) {
477     MSG_RETURN(MSG_HOST_FAILURE);
478   } else { 
479     MSG_RETURN(MSG_TRANSFER_FAILURE);
480   }
481 }
482
483 /** \ingroup msg_gos_functions
484  * \brief Put a task on a channel of an host and waits for the end of the
485  * transmission.
486  *
487  * This function is used for describing the behavior of an agent. It
488  * takes three parameter.
489  * \param task a #m_task_t to send on another location. This task
490    will not be usable anymore when the function will return. There is
491    no automatic task duplication and you have to save your parameters
492    before calling this function. Tasks are unique and once it has been
493    sent to another location, you should not access it anymore. You do
494    not need to call MSG_task_destroy() but to avoid using, as an
495    effect of inattention, this task anymore, you definitely should
496    renitialize it with #MSG_TASK_UNINITIALIZED. Note that this task
497    can be transfered iff it has been correctly created with
498    MSG_task_create().
499  * \param dest the destination of the message
500  * \param channel the channel on which the agent should put this
501    task. This value has to be >=0 and < than the maximal number of
502    channels fixed with MSG_set_channel_number().
503  * \return #MSG_FATAL if \a task is not properly initialized and
504  * #MSG_OK otherwise. Returns #MSG_HOST_FAILURE if the host on which
505  * this function was called was shut down. Returns
506  * #MSG_TRANSFER_FAILURE if the transfer could not be properly done
507  * (network failure, dest failure)
508  */
509 MSG_error_t MSG_task_put(m_task_t task,
510                          m_host_t dest, m_channel_t channel)
511 {
512   return MSG_task_put_with_timeout(task, dest, channel, -1.0);
513 }
514
515 /** \ingroup msg_gos_functions
516  * \brief Does exactly the same as MSG_task_put but with a bounded transmition 
517  * rate.
518  *
519  * \sa MSG_task_put
520  */
521 MSG_error_t MSG_task_put_bounded(m_task_t task,
522                                  m_host_t dest, m_channel_t channel,
523                                  double max_rate)
524 {
525   MSG_error_t res = MSG_OK;
526   task->simdata->rate=max_rate;
527   res = MSG_task_put(task, dest, channel);
528   return(res);
529 }
530
531 /** \ingroup msg_gos_functions
532  * \brief Executes a task and waits for its termination.
533  *
534  * This function is used for describing the behavior of an agent. It
535  * takes only one parameter.
536  * \param task a #m_task_t to execute on the location on which the
537    agent is running.
538  * \return #MSG_FATAL if \a task is not properly initialized and
539  * #MSG_OK otherwise.
540  */
541 MSG_error_t MSG_task_execute(m_task_t task)
542 {
543         simdata_task_t simdata = NULL;
544         m_process_t self = MSG_process_self();
545   CHECK_HOST();
546
547   simdata = task->simdata;
548   xbt_assert0((!simdata->compute)&&(task->simdata->using==1),
549               "This taks is executed somewhere else. Go fix your code!");
550         
551         DEBUG1("Computing on %s", MSG_process_self()->simdata->host->name);
552   simdata->using++;
553         SIMIX_mutex_lock(simdata->mutex);
554   simdata->compute = SIMIX_action_execute(SIMIX_host_self(), task->name, simdata->computation_amount);
555         SIMIX_action_set_priority(simdata->compute, simdata->priority);
556
557         self->simdata->waiting_task = task;
558         SIMIX_register_action_to_condition(simdata->compute, simdata->cond);
559         SIMIX_register_condition_to_action(simdata->compute, simdata->cond);
560         SIMIX_cond_wait(simdata->cond, simdata->mutex);
561         self->simdata->waiting_task = NULL;
562
563         SIMIX_mutex_unlock(simdata->mutex);
564   simdata->using--;
565
566         if(SIMIX_action_get_state(task->simdata->compute) == SURF_ACTION_DONE) {
567                 /* action ended, set comm and compute = NULL, the actions is already destroyed in the main function */
568                 SIMIX_action_destroy(task->simdata->compute);
569                 simdata->computation_amount = 0.0;
570                 simdata->comm = NULL;
571                 simdata->compute = NULL;
572     MSG_RETURN(MSG_OK);
573         } else if (SIMIX_host_get_state(SIMIX_host_self())==0) {
574                 /* action ended, set comm and compute = NULL, the actions is already destroyed in the main function */
575                 SIMIX_action_destroy(task->simdata->compute);
576                 simdata->comm = NULL;
577                 simdata->compute = NULL;
578     MSG_RETURN(MSG_HOST_FAILURE);
579   } else { 
580                 /* action ended, set comm and compute = NULL, the actions is already destroyed in the main function */
581                 SIMIX_action_destroy(task->simdata->compute);
582                 simdata->comm = NULL;
583                 simdata->compute = NULL;
584     MSG_RETURN(MSG_TASK_CANCELLED);
585   }
586 }
587
588
589 /** \ingroup m_task_management
590  * \brief Creates a new #m_task_t (a parallel one....).
591  *
592  * A constructor for #m_task_t taking six arguments and returning the 
593    corresponding object.
594  * \param name a name for the object. It is for user-level information
595    and can be NULL.
596  * \param host_nb the number of hosts implied in the parallel task.
597  * \param host_list an array of \p host_nb m_host_t.
598  * \param computation_amount an array of \p host_nb
599    doubles. computation_amount[i] is the total number of operations
600    that have to be performed on host_list[i].
601  * \param communication_amount an array of \p host_nb* \p host_nb doubles.
602  * \param data a pointer to any data may want to attach to the new
603    object.  It is for user-level information and can be NULL. It can
604    be retrieved with the function \ref MSG_task_get_data.
605  * \see m_task_t
606  * \return The new corresponding object.
607  */
608 m_task_t MSG_parallel_task_create(const char *name, 
609                                   int host_nb,
610                                   const m_host_t *host_list,
611                                   double *computation_amount,
612                                   double *communication_amount,
613                                   void *data)
614 {
615   int i;
616   simdata_task_t simdata = xbt_new0(s_simdata_task_t,1);
617   m_task_t task = xbt_new0(s_m_task_t,1);
618   task->simdata = simdata;
619
620   /* Task structure */
621   task->name = xbt_strdup(name);
622   task->data = data;
623
624   /* Simulator Data */
625   simdata->computation_amount = 0;
626   simdata->message_size = 0;
627         simdata->cond = SIMIX_cond_init();
628         simdata->mutex = SIMIX_mutex_init();
629         simdata->compute = NULL;
630         simdata->comm = NULL;
631   simdata->rate = -1.0;
632   simdata->using = 1;
633   simdata->sender = NULL;
634   simdata->receiver = NULL;
635   simdata->source = NULL;
636
637   simdata->host_nb = host_nb;
638   simdata->host_list = xbt_new0(void *, host_nb);
639   simdata->comp_amount = computation_amount;
640   simdata->comm_amount = communication_amount;
641
642   for(i=0;i<host_nb;i++)
643     simdata->host_list[i] = host_list[i]->simdata->host;
644
645   return task;
646
647 }
648
649
650 MSG_error_t MSG_parallel_task_execute(m_task_t task)
651 {
652         simdata_task_t simdata = NULL;
653         m_process_t self = MSG_process_self();
654   CHECK_HOST();
655
656   simdata = task->simdata;
657   xbt_assert0((!simdata->compute)&&(task->simdata->using==1),
658               "This taks is executed somewhere else. Go fix your code!");
659
660   xbt_assert0(simdata->host_nb,"This is not a parallel task. Go to hell.");
661         
662         DEBUG1("Computing on %s", MSG_process_self()->simdata->host->name);
663   simdata->using++;
664         SIMIX_mutex_lock(simdata->mutex);
665   simdata->compute = SIMIX_action_parallel_execute(task->name, simdata->host_nb, simdata->host_list, simdata->comp_amount, simdata->comm_amount, 1.0, -1.0);
666
667         self->simdata->waiting_task = task;
668         SIMIX_register_action_to_condition(simdata->compute, simdata->cond);
669         SIMIX_register_condition_to_action(simdata->compute, simdata->cond);
670         SIMIX_cond_wait(simdata->cond, simdata->mutex);
671         self->simdata->waiting_task = NULL;
672
673
674         SIMIX_mutex_unlock(simdata->mutex);
675   simdata->using--;
676
677         if(SIMIX_action_get_state(task->simdata->compute) == SURF_ACTION_DONE) {
678                 /* action ended, set comm and compute = NULL, the actions is already destroyed in the main function */
679                 SIMIX_action_destroy(task->simdata->compute);
680                 simdata->computation_amount = 0.0;
681                 simdata->comm = NULL;
682                 simdata->compute = NULL;
683     MSG_RETURN(MSG_OK);
684         } else if (SIMIX_host_get_state(SIMIX_host_self())==0) {
685                 /* action ended, set comm and compute = NULL, the actions is already destroyed in the main function */
686                 SIMIX_action_destroy(task->simdata->compute);
687                 simdata->comm = NULL;
688                 simdata->compute = NULL;
689     MSG_RETURN(MSG_HOST_FAILURE);
690   } else { 
691                 /* action ended, set comm and compute = NULL, the actions is already destroyed in the main function */
692                 SIMIX_action_destroy(task->simdata->compute);
693                 simdata->comm = NULL;
694                 simdata->compute = NULL;
695     MSG_RETURN(MSG_TASK_CANCELLED);
696   }     
697
698 }
699
700
701 /** \ingroup msg_gos_functions
702  * \brief Sleep for the specified number of seconds
703  *
704  * Makes the current process sleep until \a time seconds have elapsed.
705  *
706  * \param nb_sec a number of second
707  */
708 MSG_error_t MSG_process_sleep(double nb_sec)
709 {
710         smx_action_t act_sleep;
711         m_process_t proc = MSG_process_self();
712         smx_mutex_t mutex;
713         smx_cond_t cond;
714         /* create action to sleep */
715         act_sleep = SIMIX_action_sleep(SIMIX_process_get_host(proc->simdata->smx_process),nb_sec);
716         
717         mutex = SIMIX_mutex_init();
718         SIMIX_mutex_lock(mutex);
719         /* create conditional and register action to it */
720         cond = SIMIX_cond_init();
721
722         SIMIX_register_condition_to_action(act_sleep, cond);
723         SIMIX_register_action_to_condition(act_sleep, cond);
724         SIMIX_cond_wait(cond,mutex);
725         SIMIX_mutex_unlock(mutex);
726
727         /* remove variables */
728         SIMIX_cond_destroy(cond);
729         SIMIX_mutex_destroy(mutex);
730
731   if(SIMIX_action_get_state(act_sleep) == SURF_ACTION_DONE) {
732     if(SIMIX_host_get_state(SIMIX_host_self()) == SURF_CPU_OFF) {
733                         SIMIX_action_destroy(act_sleep);
734       MSG_RETURN(MSG_HOST_FAILURE);
735     }
736   }
737         else {
738                 SIMIX_action_destroy(act_sleep);
739                 MSG_RETURN(MSG_HOST_FAILURE);
740         }
741
742
743         MSG_RETURN(MSG_OK);
744 }
745
746 /** \ingroup msg_gos_functions
747  * \brief Return the number of MSG tasks currently running on
748  * the host of the current running process.
749  */
750 static int MSG_get_msgload(void) 
751 {
752         xbt_die("not implemented yet");
753         return 0;
754 }
755
756 /** \ingroup msg_gos_functions
757  *
758  * \brief Return the last value returned by a MSG function (except
759  * MSG_get_errno...).
760  */
761 MSG_error_t MSG_get_errno(void)
762 {
763   return PROCESS_GET_ERRNO();
764 }