Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
Fixed name inconsistency, etc.
[simgrid.git] / src / msg_simix / msg_simix_gos.c
1 #include "msg_simix_private.h"
2 #include "xbt/sysdep.h"
3 #include "xbt/log.h"
4
5 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(msg_gos, msg, "Logging specific to MSG (gos)");
6
7 /** \defgroup msg_gos_functions MSG Operating System Functions
8  *  \brief This section describes the functions that can be used
9  *  by an agent for handling some task.
10  */
11
12 static MSG_error_t __MSG_task_get_with_time_out_from_host(m_task_t * task,
13                                                         m_channel_t channel,
14                                                         double max_duration,
15                                                         m_host_t host)
16 {
17
18   m_process_t process = MSG_process_self();
19   m_task_t t = NULL;
20   m_host_t h = NULL;
21   simdata_task_t t_simdata = NULL;
22   simdata_host_t h_simdata = NULL;
23   int first_time = 1;
24   xbt_fifo_item_t item = NULL;
25
26         smx_cond_t cond = NULL;                 //conditional wait if the task isn't on the channel yet
27
28   CHECK_HOST();
29   xbt_assert1((channel>=0) && (channel < msg_global->max_channel),"Invalid channel %d",channel);
30   /* Sanity check */
31   xbt_assert0(task,"Null pointer for the task\n");
32
33   if (*task) 
34     CRITICAL0("MSG_task_get() was asked to write in a non empty task struct.");
35
36   /* Get the task */
37   h = MSG_host_self();
38   h_simdata = h->simdata;
39
40   DEBUG2("Waiting for a task on channel %d (%s)", channel,h->name);
41
42         SIMIX_mutex_lock(h->simdata->mutex);
43   while (1) {
44                 if(xbt_fifo_size(h_simdata->mbox[channel])>0) {
45                         if(!host) {
46                                 t = xbt_fifo_shift(h_simdata->mbox[channel]);
47                                 break;
48                         } else {
49                                 xbt_fifo_foreach(h->simdata->mbox[channel],item,t,m_task_t) {
50                                         if(t->simdata->source==host) break;
51                                 }
52                                 if(item) {
53                                         xbt_fifo_remove_item(h->simdata->mbox[channel],item);
54                                         break;
55                                 } 
56                         }
57                 }
58                 
59                 if(max_duration>0) {
60                         if(!first_time) {
61                                 SIMIX_mutex_unlock(h->simdata->mutex);
62                                 h_simdata->sleeping[channel] = NULL;
63                                 SIMIX_cond_destroy(cond);
64                                 MSG_RETURN(MSG_TRANSFER_FAILURE);
65                         }
66                 }
67                 xbt_assert1(!(h_simdata->sleeping[channel]),"A process is already blocked on channel %d", channel);
68         
69                 cond = SIMIX_cond_init();
70                 h_simdata->sleeping[channel] = cond;
71                 if (max_duration > 0) {
72                         SIMIX_cond_wait_timeout(cond, h->simdata->mutex, max_duration);
73                 }
74                 else SIMIX_cond_wait(h_simdata->sleeping[channel],h->simdata->mutex);
75
76                 if(SIMIX_host_get_state(h_simdata->host)==0)
77       MSG_RETURN(MSG_HOST_FAILURE);
78
79                 first_time = 0;
80         }
81         SIMIX_mutex_unlock(h->simdata->mutex);
82
83   DEBUG1("OK, got a task (%s)", t->name);
84         /* clean conditional */
85         if (cond) {
86                 SIMIX_cond_destroy(cond);
87                 h_simdata->sleeping[channel] = NULL;
88         }
89
90   t_simdata = t->simdata;
91         t_simdata->receiver = process;
92   *task=t;
93
94         SIMIX_mutex_lock(t_simdata->mutex);
95
96   /* Transfer */
97   t_simdata->using++;
98         /* create SIMIX action to the communication */
99         t_simdata->comm = SIMIX_action_communicate(t_simdata->sender->simdata->host->simdata->host,
100                                                                                                                                                                                 process->simdata->host->simdata->host,t->name, t_simdata->message_size, 
101                                                                                                                                                                                 t_simdata->rate); 
102         /* if the process is suspend, create the action but stop its execution, it will be restart when the sender process resume */
103         if (MSG_process_is_suspended(t_simdata->sender)) {
104                 DEBUG1("Process sender (%s) suspended", t_simdata->sender->name);
105                 SIMIX_action_set_priority(t_simdata->comm,0);
106         }
107         process->simdata->waiting_task = t;
108         SIMIX_register_action_to_condition(t_simdata->comm, t_simdata->cond);
109         SIMIX_register_condition_to_action(t_simdata->comm, t_simdata->cond);
110         SIMIX_cond_wait(t_simdata->cond,t_simdata->mutex);
111         process->simdata->waiting_task = NULL;
112
113         /* the task has already finished and the pointer must be null*/
114         if (t->simdata->sender) {
115                 t->simdata->sender->simdata->waiting_task = NULL;
116                 /* action ended, set comm and compute = NULL, the actions is already destroyed in the main function */
117                 //t->simdata->comm = NULL;
118                 //t->simdata->compute = NULL;
119         }
120         /* for this process, don't need to change in get function*/
121         t->simdata->receiver = NULL;
122         SIMIX_mutex_unlock(t_simdata->mutex);
123
124
125         if(SIMIX_action_get_state(t_simdata->comm) == SURF_ACTION_DONE) {
126                 //t_simdata->comm = NULL;
127                 SIMIX_action_destroy(t_simdata->comm);
128                 t_simdata->comm = NULL;
129     MSG_RETURN(MSG_OK);
130         } else if (SIMIX_host_get_state(h_simdata->host)==0) {
131                 //t_simdata->comm = NULL;
132                 SIMIX_action_destroy(t_simdata->comm);
133                 t_simdata->comm = NULL;
134     MSG_RETURN(MSG_HOST_FAILURE);
135   } else { 
136                 //t_simdata->comm = NULL;
137                 SIMIX_action_destroy(t_simdata->comm);
138                 t_simdata->comm = NULL;
139     MSG_RETURN(MSG_TRANSFER_FAILURE);
140   }
141
142 }
143         
144 /** \ingroup msg_gos_functions
145  * \brief Listen on a channel and wait for receiving a task.
146  *
147  * It takes two parameters.
148  * \param task a memory location for storing a #m_task_t. It will
149    hold a task when this function will return. Thus \a task should not
150    be equal to \c NULL and \a *task should be equal to \c NULL. If one of
151    those two condition does not hold, there will be a warning message.
152  * \param channel the channel on which the agent should be
153    listening. This value has to be >=0 and < than the maximal
154    number of channels fixed with MSG_set_channel_number().
155  * \return #MSG_FATAL if \a task is equal to \c NULL, #MSG_WARNING
156  * if \a *task is not equal to \c NULL, and #MSG_OK otherwise.
157  */
158 MSG_error_t MSG_task_get(m_task_t * task,
159                          m_channel_t channel)
160 {
161   return MSG_task_get_with_time_out(task, channel, -1);
162 }
163
164 /** \ingroup msg_gos_functions
165  * \brief Listen on a channel and wait for receiving a task with a timeout.
166  *
167  * It takes three parameters.
168  * \param task a memory location for storing a #m_task_t. It will
169    hold a task when this function will return. Thus \a task should not
170    be equal to \c NULL and \a *task should be equal to \c NULL. If one of
171    those two condition does not hold, there will be a warning message.
172  * \param channel the channel on which the agent should be
173    listening. This value has to be >=0 and < than the maximal
174    number of channels fixed with MSG_set_channel_number().
175  * \param max_duration the maximum time to wait for a task before giving
176     up. In such a case, #MSG_TRANSFER_FAILURE will be returned, \a task 
177     will not be modified and will still be
178     equal to \c NULL when returning. 
179  * \return #MSG_FATAL if \a task is equal to \c NULL, #MSG_WARNING
180    if \a *task is not equal to \c NULL, and #MSG_OK otherwise.
181  */
182 MSG_error_t MSG_task_get_with_time_out(m_task_t * task,
183                                        m_channel_t channel,
184                                        double max_duration)
185 {
186   return __MSG_task_get_with_time_out_from_host(task, channel, max_duration, NULL);
187 }
188
189 /** \ingroup msg_gos_functions
190  * \brief Listen on \a channel and waits for receiving a task from \a host.
191  *
192  * It takes three parameters.
193  * \param task a memory location for storing a #m_task_t. It will
194    hold a task when this function will return. Thus \a task should not
195    be equal to \c NULL and \a *task should be equal to \c NULL. If one of
196    those two condition does not hold, there will be a warning message.
197  * \param channel the channel on which the agent should be
198    listening. This value has to be >=0 and < than the maximal
199    number of channels fixed with MSG_set_channel_number().
200  * \param host the host that is to be watched.
201  * \return #MSG_FATAL if \a task is equal to \c NULL, #MSG_WARNING
202    if \a *task is not equal to \c NULL, and #MSG_OK otherwise.
203  */
204 MSG_error_t MSG_task_get_from_host(m_task_t * task, int channel, 
205                                    m_host_t host)
206 {
207   return __MSG_task_get_with_time_out_from_host(task, channel, -1, host);
208 }
209
210 /** \ingroup msg_gos_functions
211  * \brief Test whether there is a pending communication on a channel.
212  *
213  * It takes one parameter.
214  * \param channel the channel on which the agent should be
215    listening. This value has to be >=0 and < than the maximal
216    number of channels fixed with MSG_set_channel_number().
217  * \return 1 if there is a pending communication and 0 otherwise
218  */
219 int MSG_task_Iprobe(m_channel_t channel)
220 {
221   m_host_t h = NULL;
222
223   xbt_assert1((channel>=0) && (channel < msg_global->max_channel),"Invalid channel %d",channel);
224   CHECK_HOST();
225
226   DEBUG2("Probing on channel %d (%s)", channel,h->name);
227
228   h = MSG_host_self();
229   return(xbt_fifo_get_first_item(h->simdata->mbox[channel])!=NULL);
230 }
231
232 /** \ingroup msg_gos_functions
233  * \brief Test whether there is a pending communication on a channel, and who sent it.
234  *
235  * It takes one parameter.
236  * \param channel the channel on which the agent should be
237    listening. This value has to be >=0 and < than the maximal
238    number of channels fixed with MSG_set_channel_number().
239  * \return -1 if there is no pending communication and the PID of the process who sent it otherwise
240  */
241 int MSG_task_probe_from(m_channel_t channel)
242 {
243   m_host_t h = NULL;
244   xbt_fifo_item_t item;
245   m_task_t t;
246
247   xbt_assert1((channel>=0) && (channel < msg_global->max_channel),"Invalid channel %d",channel);
248   CHECK_HOST();
249
250   h = MSG_host_self();
251
252   DEBUG2("Probing on channel %d (%s)", channel,h->name);
253    
254   item = xbt_fifo_get_first_item(h->simdata->mbox[channel]);
255   if ( (!item) || (!(t = xbt_fifo_get_item_content(item))) )
256     return -1;
257    
258   return MSG_process_get_PID(t->simdata->sender);
259 }
260
261 /** \ingroup msg_gos_functions
262  * \brief Wait for at most \a max_duration second for a task reception
263    on \a channel. *\a PID is updated with the PID of the first process
264    that triggered this event if any.
265  *
266  * It takes three parameters:
267  * \param channel the channel on which the agent should be
268    listening. This value has to be >=0 and < than the maximal.
269    number of channels fixed with MSG_set_channel_number().
270  * \param PID a memory location for storing an int.
271  * \param max_duration the maximum time to wait for a task before
272     giving up. In the case of a reception, *\a PID will be updated
273     with the PID of the first process to send a task.
274  * \return #MSG_HOST_FAILURE if the host is shut down in the meantime
275    and #MSG_OK otherwise.
276  */
277 MSG_error_t MSG_channel_select_from(m_channel_t channel, double max_duration,
278                 int *PID)
279 {
280         m_host_t h = NULL;
281         simdata_host_t h_simdata = NULL;
282         xbt_fifo_item_t item;
283         m_task_t t;
284         int first_time = 1;
285         smx_cond_t cond;
286
287         xbt_assert1((channel>=0) && (channel < msg_global->max_channel),"Invalid channel %d",channel);
288         if(PID) {
289                 *PID = -1;
290         }
291
292         if(max_duration==0.0) {
293                 *PID = MSG_task_probe_from(channel);
294                 MSG_RETURN(MSG_OK);
295         } else {
296                 CHECK_HOST();
297                 h = MSG_host_self();
298                 h_simdata = h->simdata;
299
300                 DEBUG2("Probing on channel %d (%s)", channel,h->name);
301                 while(!(item = xbt_fifo_get_first_item(h->simdata->mbox[channel]))) {
302                         if(max_duration>0) {
303                                 if(!first_time) {
304                                         MSG_RETURN(MSG_OK);
305                                 }
306                         }
307                         SIMIX_mutex_lock(h_simdata->mutex);
308                         xbt_assert1(!(h_simdata->sleeping[channel]),
309                                         "A process is already blocked on this channel %d", channel);
310                         cond = SIMIX_cond_init();
311                         h_simdata->sleeping[channel] = cond; /* I'm waiting. Wake me up when you're ready */
312                         if(max_duration>0) {
313                                 SIMIX_cond_wait_timeout(cond,h_simdata->mutex, max_duration);
314                         } else {
315                                 SIMIX_cond_wait(cond,h_simdata->mutex);
316                         }
317                         SIMIX_cond_destroy(cond);
318                         SIMIX_mutex_unlock(h_simdata->mutex);
319                         if(SIMIX_host_get_state(h_simdata->host)==0) {
320                                 MSG_RETURN(MSG_HOST_FAILURE);
321                         }
322                         h_simdata->sleeping[channel] = NULL;
323                         first_time = 0;
324                 }
325                 if (!item || !(t = xbt_fifo_get_item_content(item))) {
326                         MSG_RETURN(MSG_OK);
327                 }
328                 if(PID) {
329                         *PID = MSG_process_get_PID(t->simdata->sender);
330                 }
331                 MSG_RETURN(MSG_OK);
332         }
333 }
334
335
336 /** \ingroup msg_gos_functions
337
338  * \brief Return the number of tasks waiting to be received on a \a
339    channel and sent by \a host.
340  *
341  * It takes two parameters.
342  * \param channel the channel on which the agent should be
343    listening. This value has to be >=0 and < than the maximal
344    number of channels fixed with MSG_set_channel_number().
345  * \param host the host that is to be watched.
346  * \return the number of tasks waiting to be received on \a channel
347    and sent by \a host.
348  */
349 int MSG_task_probe_from_host(int channel, m_host_t host)
350 {
351   xbt_fifo_item_t item;
352   m_task_t t;
353   int count = 0;
354   m_host_t h = NULL;
355   
356   xbt_assert1((channel>=0) && (channel < msg_global->max_channel),"Invalid channel %d",channel);
357   CHECK_HOST();
358   h = MSG_host_self();
359
360   DEBUG2("Probing on channel %d (%s)", channel,h->name);
361    
362   xbt_fifo_foreach(h->simdata->mbox[channel],item,t,m_task_t) {
363     if(t->simdata->source==host) count++;
364   }
365    
366   return count;
367 }
368
369 /** \ingroup msg_gos_functions \brief Put a task on a channel of an
370  * host (with a timeout on the waiting of the destination host) and
371  * waits for the end of the transmission.
372  *
373  * This function is used for describing the behavior of an agent. It
374  * takes four parameter.
375  * \param task a #m_task_t to send on another location. This task
376    will not be usable anymore when the function will return. There is
377    no automatic task duplication and you have to save your parameters
378    before calling this function. Tasks are unique and once it has been
379    sent to another location, you should not access it anymore. You do
380    not need to call MSG_task_destroy() but to avoid using, as an
381    effect of inattention, this task anymore, you definitely should
382    renitialize it with #MSG_TASK_UNINITIALIZED. Note that this task
383    can be transfered iff it has been correctly created with
384    MSG_task_create().
385  * \param dest the destination of the message
386  * \param channel the channel on which the agent should put this
387    task. This value has to be >=0 and < than the maximal number of
388    channels fixed with MSG_set_channel_number().
389  * \param max_duration the maximum time to wait for a task before giving
390     up. In such a case, #MSG_TRANSFER_FAILURE will be returned, \a task 
391     will not be modified 
392  * \return #MSG_FATAL if \a task is not properly initialized and
393    #MSG_OK otherwise. Returns #MSG_HOST_FAILURE if the host on which
394    this function was called was shut down. Returns
395    #MSG_TRANSFER_FAILURE if the transfer could not be properly done
396    (network failure, dest failure, timeout...)
397  */
398 MSG_error_t MSG_task_put_with_timeout(m_task_t task, m_host_t dest, 
399                                       m_channel_t channel, double max_duration)
400 {
401
402
403   m_process_t process = MSG_process_self();
404   simdata_task_t task_simdata = NULL;
405   m_host_t local_host = NULL;
406   m_host_t remote_host = NULL;
407   CHECK_HOST();
408
409   xbt_assert1((channel>=0) && (channel < msg_global->max_channel),"Invalid channel %d",channel);
410
411   task_simdata = task->simdata;
412   task_simdata->sender = process;
413   task_simdata->source = MSG_process_get_host(process);
414   xbt_assert0(task_simdata->using==1,
415               "This taks is still being used somewhere else. You cannot send it now. Go fix your code!");
416   task_simdata->comm = NULL;
417   
418   local_host = ((simdata_process_t) process->simdata)->host;
419   remote_host = dest;
420
421   DEBUG4("Trying to send a task (%g kB) from %s to %s on channel %d", 
422          task->simdata->message_size/1000,local_host->name, remote_host->name, channel);
423
424         SIMIX_mutex_lock(remote_host->simdata->mutex);
425   xbt_fifo_push(((simdata_host_t) remote_host->simdata)->
426                 mbox[channel], task);
427
428   
429   if(remote_host->simdata->sleeping[channel]) {
430     DEBUG0("Somebody is listening. Let's wake him up!");
431                 SIMIX_cond_signal(remote_host->simdata->sleeping[channel]);
432   }
433         SIMIX_mutex_unlock(remote_host->simdata->mutex);
434
435   process->simdata->put_host = dest;
436   process->simdata->put_channel = channel;
437         SIMIX_mutex_lock(task->simdata->mutex);
438  // DEBUG4("Task sent (%g kB) from %s to %s on channel %d, waiting...", task->simdata->message_size/1000,local_host->name, remote_host->name, channel);
439
440         process->simdata->waiting_task = task;
441         if (max_duration >0) {
442                 SIMIX_cond_wait_timeout(task->simdata->cond,task->simdata->mutex,max_duration);
443                 /* verify if the timeout happened and the communication didn't started yet */
444                 if (task->simdata->comm==NULL) {
445                         task->simdata->using--;
446                         process->simdata->waiting_task = NULL;
447                         if (task->simdata->receiver) {
448                                 task->simdata->receiver->simdata->waiting_task = NULL;
449                         }
450                         task->simdata->sender = NULL;
451                         SIMIX_mutex_unlock(task->simdata->mutex);
452                         MSG_RETURN(MSG_TRANSFER_FAILURE);
453                 }
454         }
455         else {
456                 SIMIX_cond_wait(task->simdata->cond,task->simdata->mutex);
457         }
458
459         DEBUG1("Action terminated %s",task->name);    
460         task->simdata->using--;
461         process->simdata->waiting_task = NULL;
462         /* the task has already finished and the pointer must be null*/
463         if (task->simdata->receiver) {
464                 task->simdata->receiver->simdata->waiting_task = NULL;
465                 /* action ended, set comm and compute = NULL, the actions is already destroyed in the main function */
466         //      task->simdata->comm = NULL;
467                 //task->simdata->compute = NULL;
468         }
469         task->simdata->sender = NULL;
470         SIMIX_mutex_unlock(task->simdata->mutex);
471
472         if(SIMIX_action_get_state(task->simdata->comm) == SURF_ACTION_DONE) {
473     MSG_RETURN(MSG_OK);
474         } else if (SIMIX_host_get_state(local_host->simdata->host)==0) {
475     MSG_RETURN(MSG_HOST_FAILURE);
476   } else { 
477     MSG_RETURN(MSG_TRANSFER_FAILURE);
478   }
479 }
480
481 /** \ingroup msg_gos_functions
482  * \brief Put a task on a channel of an host and waits for the end of the
483  * transmission.
484  *
485  * This function is used for describing the behavior of an agent. It
486  * takes three parameter.
487  * \param task a #m_task_t to send on another location. This task
488    will not be usable anymore when the function will return. There is
489    no automatic task duplication and you have to save your parameters
490    before calling this function. Tasks are unique and once it has been
491    sent to another location, you should not access it anymore. You do
492    not need to call MSG_task_destroy() but to avoid using, as an
493    effect of inattention, this task anymore, you definitely should
494    renitialize it with #MSG_TASK_UNINITIALIZED. Note that this task
495    can be transfered iff it has been correctly created with
496    MSG_task_create().
497  * \param dest the destination of the message
498  * \param channel the channel on which the agent should put this
499    task. This value has to be >=0 and < than the maximal number of
500    channels fixed with MSG_set_channel_number().
501  * \return #MSG_FATAL if \a task is not properly initialized and
502  * #MSG_OK otherwise. Returns #MSG_HOST_FAILURE if the host on which
503  * this function was called was shut down. Returns
504  * #MSG_TRANSFER_FAILURE if the transfer could not be properly done
505  * (network failure, dest failure)
506  */
507 MSG_error_t MSG_task_put(m_task_t task,
508                          m_host_t dest, m_channel_t channel)
509 {
510   return MSG_task_put_with_timeout(task, dest, channel, -1.0);
511 }
512
513 /** \ingroup msg_gos_functions
514  * \brief Does exactly the same as MSG_task_put but with a bounded transmition 
515  * rate.
516  *
517  * \sa MSG_task_put
518  */
519 MSG_error_t MSG_task_put_bounded(m_task_t task,
520                                  m_host_t dest, m_channel_t channel,
521                                  double max_rate)
522 {
523   MSG_error_t res = MSG_OK;
524   task->simdata->rate=max_rate;
525   res = MSG_task_put(task, dest, channel);
526   return(res);
527 }
528
529 /** \ingroup msg_gos_functions
530  * \brief Executes a task and waits for its termination.
531  *
532  * This function is used for describing the behavior of an agent. It
533  * takes only one parameter.
534  * \param task a #m_task_t to execute on the location on which the
535    agent is running.
536  * \return #MSG_FATAL if \a task is not properly initialized and
537  * #MSG_OK otherwise.
538  */
539 MSG_error_t MSG_task_execute(m_task_t task)
540 {
541         simdata_task_t simdata = NULL;
542         m_process_t self = MSG_process_self();
543   CHECK_HOST();
544
545   simdata = task->simdata;
546   xbt_assert0((!simdata->compute)&&(task->simdata->using==1),
547               "This taks is executed somewhere else. Go fix your code!");
548         
549         DEBUG1("Computing on %s", MSG_process_self()->simdata->host->name);
550   simdata->using++;
551         SIMIX_mutex_lock(simdata->mutex);
552   simdata->compute = SIMIX_action_execute(SIMIX_host_self(), task->name, simdata->computation_amount);
553         SIMIX_action_set_priority(simdata->compute, simdata->priority);
554
555         self->simdata->waiting_task = task;
556         SIMIX_register_action_to_condition(simdata->compute, simdata->cond);
557         SIMIX_register_condition_to_action(simdata->compute, simdata->cond);
558         SIMIX_cond_wait(simdata->cond, simdata->mutex);
559         self->simdata->waiting_task = NULL;
560
561         SIMIX_mutex_unlock(simdata->mutex);
562   simdata->using--;
563
564         if(SIMIX_action_get_state(task->simdata->compute) == SURF_ACTION_DONE) {
565                 /* action ended, set comm and compute = NULL, the actions is already destroyed in the main function */
566                 SIMIX_action_destroy(task->simdata->compute);
567                 simdata->computation_amount = 0.0;
568                 simdata->comm = NULL;
569                 simdata->compute = NULL;
570     MSG_RETURN(MSG_OK);
571         } else if (SIMIX_host_get_state(SIMIX_host_self())==0) {
572                 /* action ended, set comm and compute = NULL, the actions is already destroyed in the main function */
573                 SIMIX_action_destroy(task->simdata->compute);
574                 simdata->comm = NULL;
575                 simdata->compute = NULL;
576     MSG_RETURN(MSG_HOST_FAILURE);
577   } else { 
578                 /* action ended, set comm and compute = NULL, the actions is already destroyed in the main function */
579                 SIMIX_action_destroy(task->simdata->compute);
580                 simdata->comm = NULL;
581                 simdata->compute = NULL;
582     MSG_RETURN(MSG_TASK_CANCELLED);
583   }
584 }
585
586
587 /** \ingroup m_task_management
588  * \brief Creates a new #m_task_t (a parallel one....).
589  *
590  * A constructor for #m_task_t taking six arguments and returning the 
591    corresponding object.
592  * \param name a name for the object. It is for user-level information
593    and can be NULL.
594  * \param host_nb the number of hosts implied in the parallel task.
595  * \param host_list an array of \p host_nb m_host_t.
596  * \param computation_amount an array of \p host_nb
597    doubles. computation_amount[i] is the total number of operations
598    that have to be performed on host_list[i].
599  * \param communication_amount an array of \p host_nb* \p host_nb doubles.
600  * \param data a pointer to any data may want to attach to the new
601    object.  It is for user-level information and can be NULL. It can
602    be retrieved with the function \ref MSG_task_get_data.
603  * \see m_task_t
604  * \return The new corresponding object.
605  */
606 m_task_t MSG_parallel_task_create(const char *name, 
607                                   int host_nb,
608                                   const m_host_t *host_list,
609                                   double *computation_amount,
610                                   double *communication_amount,
611                                   void *data)
612 {
613   int i;
614   simdata_task_t simdata = xbt_new0(s_simdata_task_t,1);
615   m_task_t task = xbt_new0(s_m_task_t,1);
616   task->simdata = simdata;
617
618   /* Task structure */
619   task->name = xbt_strdup(name);
620   task->data = data;
621
622   /* Simulator Data */
623   simdata->computation_amount = 0;
624   simdata->message_size = 0;
625         simdata->cond = SIMIX_cond_init();
626         simdata->mutex = SIMIX_mutex_init();
627         simdata->compute = NULL;
628         simdata->comm = NULL;
629   simdata->rate = -1.0;
630   simdata->using = 1;
631   simdata->sender = NULL;
632   simdata->receiver = NULL;
633   simdata->source = NULL;
634
635   simdata->host_nb = host_nb;
636   simdata->host_list = xbt_new0(void *, host_nb);
637   simdata->comp_amount = computation_amount;
638   simdata->comm_amount = communication_amount;
639
640   for(i=0;i<host_nb;i++)
641     simdata->host_list[i] = host_list[i]->simdata->host;
642
643   return task;
644
645 }
646
647
648 MSG_error_t MSG_parallel_task_execute(m_task_t task)
649 {
650         simdata_task_t simdata = NULL;
651         m_process_t self = MSG_process_self();
652   CHECK_HOST();
653
654   simdata = task->simdata;
655   xbt_assert0((!simdata->compute)&&(task->simdata->using==1),
656               "This taks is executed somewhere else. Go fix your code!");
657
658   xbt_assert0(simdata->host_nb,"This is not a parallel task. Go to hell.");
659         
660         DEBUG1("Computing on %s", MSG_process_self()->simdata->host->name);
661   simdata->using++;
662         SIMIX_mutex_lock(simdata->mutex);
663   simdata->compute = SIMIX_action_parallel_execute(task->name, simdata->host_nb, simdata->host_list, simdata->comp_amount, simdata->comm_amount, 1.0, -1.0);
664
665         self->simdata->waiting_task = task;
666         SIMIX_register_action_to_condition(simdata->compute, simdata->cond);
667         SIMIX_register_condition_to_action(simdata->compute, simdata->cond);
668         SIMIX_cond_wait(simdata->cond, simdata->mutex);
669         self->simdata->waiting_task = NULL;
670
671
672         SIMIX_mutex_unlock(simdata->mutex);
673   simdata->using--;
674
675         if(SIMIX_action_get_state(task->simdata->compute) == SURF_ACTION_DONE) {
676                 /* action ended, set comm and compute = NULL, the actions is already destroyed in the main function */
677                 SIMIX_action_destroy(task->simdata->compute);
678                 simdata->computation_amount = 0.0;
679                 simdata->comm = NULL;
680                 simdata->compute = NULL;
681     MSG_RETURN(MSG_OK);
682         } else if (SIMIX_host_get_state(SIMIX_host_self())==0) {
683                 /* action ended, set comm and compute = NULL, the actions is already destroyed in the main function */
684                 SIMIX_action_destroy(task->simdata->compute);
685                 simdata->comm = NULL;
686                 simdata->compute = NULL;
687     MSG_RETURN(MSG_HOST_FAILURE);
688   } else { 
689                 /* action ended, set comm and compute = NULL, the actions is already destroyed in the main function */
690                 SIMIX_action_destroy(task->simdata->compute);
691                 simdata->comm = NULL;
692                 simdata->compute = NULL;
693     MSG_RETURN(MSG_TASK_CANCELLED);
694   }     
695
696 }
697
698
699 /** \ingroup msg_gos_functions
700  * \brief Sleep for the specified number of seconds
701  *
702  * Makes the current process sleep until \a time seconds have elapsed.
703  *
704  * \param nb_sec a number of second
705  */
706 MSG_error_t MSG_process_sleep(double nb_sec)
707 {
708         smx_action_t act_sleep;
709         m_process_t proc = MSG_process_self();
710         smx_mutex_t mutex;
711         smx_cond_t cond;
712         /* create action to sleep */
713         act_sleep = SIMIX_action_sleep(SIMIX_process_get_host(proc->simdata->smx_process),nb_sec);
714         
715         mutex = SIMIX_mutex_init();
716         SIMIX_mutex_lock(mutex);
717         /* create conditional and register action to it */
718         cond = SIMIX_cond_init();
719
720         SIMIX_register_condition_to_action(act_sleep, cond);
721         SIMIX_register_action_to_condition(act_sleep, cond);
722         SIMIX_cond_wait(cond,mutex);
723         SIMIX_mutex_unlock(mutex);
724
725         /* remove variables */
726         SIMIX_cond_destroy(cond);
727         SIMIX_mutex_destroy(mutex);
728
729   if(SIMIX_action_get_state(act_sleep) == SURF_ACTION_DONE) {
730     if(SIMIX_host_get_state(SIMIX_host_self()) == SURF_CPU_OFF) {
731                         SIMIX_action_destroy(act_sleep);
732       MSG_RETURN(MSG_HOST_FAILURE);
733     }
734   }
735         else {
736                 SIMIX_action_destroy(act_sleep);
737                 MSG_RETURN(MSG_HOST_FAILURE);
738         }
739
740
741         MSG_RETURN(MSG_OK);
742 }
743
744 /** \ingroup msg_gos_functions
745  * \brief Return the number of MSG tasks currently running on
746  * the host of the current running process.
747  */
748 static int MSG_get_msgload(void) 
749 {
750         xbt_die("not implemented yet");
751         return 0;
752 }
753
754 /** \ingroup msg_gos_functions
755  *
756  * \brief Return the last value returned by a MSG function (except
757  * MSG_get_errno...).
758  */
759 MSG_error_t MSG_get_errno(void)
760 {
761   return PROCESS_GET_ERRNO();
762 }