Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
82ce8e6630eb96fa2c8307b8c796fff5f35e6d80
[simgrid.git] / src / msg / gos.c
1 /*     $Id$      */
2
3 /* Copyright (c) 2002-2007 Arnaud Legrand.                                  */
4 /* Copyright (c) 2007 Bruno Donassolo.                                      */
5 /* All rights reserved.                                                     */
6
7 /* This program is free software; you can redistribute it and/or modify it
8  * under the terms of the license (GNU LGPL) which comes with this package. */
9
10 #include "msg/private.h"
11 #include "xbt/sysdep.h"
12 #include "xbt/log.h"
13
14 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(msg_gos, msg,
15                                 "Logging specific to MSG (gos)");
16
17 /** \defgroup msg_gos_functions MSG Operating System Functions
18  *  \brief This section describes the functions that can be used
19  *  by an agent for handling some task.
20  */
21
22 MSG_error_t MSG_task_get_ext(m_task_t * task,
23                              m_channel_t channel,
24                              double max_duration,
25                              m_host_t host)
26 {
27
28   m_process_t process = MSG_process_self();
29   m_task_t t = NULL;
30   m_host_t h = NULL;
31   simdata_task_t t_simdata = NULL;
32   simdata_host_t h_simdata = NULL;
33   int first_time = 1;
34   xbt_fifo_item_t item = NULL;
35
36   smx_cond_t cond = NULL;       //conditional wait if the task isn't on the channel yet
37
38   CHECK_HOST();
39   xbt_assert1((channel >= 0)
40               && (channel < msg_global->max_channel), "Invalid channel %d",
41               channel);
42   /* Sanity check */
43   xbt_assert0(task, "Null pointer for the task storage");
44
45   if (*task)
46     CRITICAL0
47         ("MSG_task_get() was asked to write in a non empty task struct.");
48
49   /* Get the task */
50   h = MSG_host_self();
51   h_simdata = h->simdata;
52
53   DEBUG2("Waiting for a task on channel %d (%s)", channel, h->name);
54
55   SIMIX_mutex_lock(h->simdata->mutex);
56   while (1) {
57     if (xbt_fifo_size(h_simdata->mbox[channel]) > 0) {
58       if (!host) {
59         t = xbt_fifo_shift(h_simdata->mbox[channel]);
60         break;
61       } else {
62         xbt_fifo_foreach(h->simdata->mbox[channel], item, t, m_task_t) {
63           if (t->simdata->source == host)
64             break;
65         }
66         if (item) {
67           xbt_fifo_remove_item(h->simdata->mbox[channel], item);
68           break;
69         }
70       }
71     }
72
73     if (max_duration > 0) {
74       if (!first_time) {
75         SIMIX_mutex_unlock(h->simdata->mutex);
76         h_simdata->sleeping[channel] = NULL;
77         SIMIX_cond_destroy(cond);
78         MSG_RETURN(MSG_TRANSFER_FAILURE);
79       }
80     }
81     xbt_assert1(!(h_simdata->sleeping[channel]),
82                 "A process is already blocked on channel %d", channel);
83
84     cond = SIMIX_cond_init();
85     h_simdata->sleeping[channel] = cond;
86     if (max_duration > 0)
87       SIMIX_cond_wait_timeout(cond, h->simdata->mutex, max_duration);
88     else 
89       SIMIX_cond_wait(h_simdata->sleeping[channel], h->simdata->mutex);
90
91     if (SIMIX_host_get_state(h_simdata->smx_host) == 0)
92       MSG_RETURN(MSG_HOST_FAILURE);
93
94     first_time = 0;
95   }
96   SIMIX_mutex_unlock(h->simdata->mutex);
97
98   DEBUG1("OK, got a task (%s)", t->name);
99   /* clean conditional */
100   if (cond) {
101     SIMIX_cond_destroy(cond);
102     h_simdata->sleeping[channel] = NULL;
103   }
104
105   t_simdata = t->simdata;
106   t_simdata->receiver = process;
107   *task = t;
108
109   SIMIX_mutex_lock(t_simdata->mutex);
110
111   /* Transfer */
112   /* create SIMIX action to the communication */
113   t_simdata->comm =
114       SIMIX_action_communicate(t_simdata->sender->simdata->m_host->
115                                simdata->smx_host,
116                                process->simdata->m_host->simdata->smx_host,
117                                t->name, t_simdata->message_size,
118                                t_simdata->rate);
119   /* if the process is suspend, create the action but stop its execution, it will be restart when the sender process resume */
120   if (MSG_process_is_suspended(t_simdata->sender)) {
121     DEBUG1("Process sender (%s) suspended", t_simdata->sender->name);
122     SIMIX_action_set_priority(t_simdata->comm, 0);
123   }
124   process->simdata->waiting_task = t;
125   SIMIX_register_action_to_condition(t_simdata->comm, t_simdata->cond);
126         while (1) {
127                 SIMIX_cond_wait(t_simdata->cond, t_simdata->mutex);
128                 if (SIMIX_action_get_state(t_simdata->comm) != SURF_ACTION_RUNNING)
129                         break;
130         }
131   SIMIX_unregister_action_to_condition(t_simdata->comm, t_simdata->cond);
132   process->simdata->waiting_task = NULL;
133
134   /* the task has already finished and the pointer must be null */
135   if (t->simdata->sender) {
136     t->simdata->sender->simdata->waiting_task = NULL;
137     /* action ended, set comm and compute = NULL, the actions is already destroyed in the main function */
138     //t->simdata->comm = NULL;
139     //t->simdata->compute = NULL;
140   }
141   /* for this process, don't need to change in get function */
142   t->simdata->receiver = NULL;
143   SIMIX_mutex_unlock(t_simdata->mutex);
144
145
146   if (SIMIX_action_get_state(t_simdata->comm) == SURF_ACTION_DONE) {
147     //t_simdata->comm = NULL;
148     SIMIX_action_destroy(t_simdata->comm);
149     t_simdata->comm = NULL;
150                 t_simdata->using--;
151     MSG_RETURN(MSG_OK);
152   } else if (SIMIX_host_get_state(h_simdata->smx_host) == 0) {
153     //t_simdata->comm = NULL;
154     SIMIX_action_destroy(t_simdata->comm);
155     t_simdata->comm = NULL;
156                 t_simdata->using--;
157     MSG_RETURN(MSG_HOST_FAILURE);
158   } else {
159     //t_simdata->comm = NULL;
160     SIMIX_action_destroy(t_simdata->comm);
161     t_simdata->comm = NULL;
162                 t_simdata->using--;
163     MSG_RETURN(MSG_TRANSFER_FAILURE);
164   }
165
166 }
167
168 /** \ingroup msg_gos_functions
169  * \brief Listen on a channel and wait for receiving a task.
170  *
171  * It takes two parameters.
172  * \param task a memory location for storing a #m_task_t. It will
173    hold a task when this function will return. Thus \a task should not
174    be equal to \c NULL and \a *task should be equal to \c NULL. If one of
175    those two condition does not hold, there will be a warning message.
176  * \param channel the channel on which the agent should be
177    listening. This value has to be >=0 and < than the maximal
178    number of channels fixed with MSG_set_channel_number().
179  * \return #MSG_FATAL if \a task is equal to \c NULL, #MSG_WARNING
180  * if \a *task is not equal to \c NULL, and #MSG_OK otherwise.
181  */
182 MSG_error_t MSG_task_get(m_task_t * task, m_channel_t channel)
183 {
184   return MSG_task_get_with_time_out(task, channel, -1);
185 }
186
187 /** \ingroup msg_gos_functions
188  * \brief Listen on a channel and wait for receiving a task with a timeout.
189  *
190  * It takes three parameters.
191  * \param task a memory location for storing a #m_task_t. It will
192    hold a task when this function will return. Thus \a task should not
193    be equal to \c NULL and \a *task should be equal to \c NULL. If one of
194    those two condition does not hold, there will be a warning message.
195  * \param channel the channel on which the agent should be
196    listening. This value has to be >=0 and < than the maximal
197    number of channels fixed with MSG_set_channel_number().
198  * \param max_duration the maximum time to wait for a task before giving
199     up. In such a case, #MSG_TRANSFER_FAILURE will be returned, \a task 
200     will not be modified and will still be
201     equal to \c NULL when returning. 
202  * \return #MSG_FATAL if \a task is equal to \c NULL, #MSG_WARNING
203    if \a *task is not equal to \c NULL, and #MSG_OK otherwise.
204  */
205 MSG_error_t MSG_task_get_with_time_out(m_task_t * task,
206                                        m_channel_t channel,
207                                        double max_duration)
208 {
209   return MSG_task_get_ext(task, channel, max_duration, NULL);
210 }
211
212 /** \ingroup msg_gos_functions
213  * \brief Listen on \a channel and waits for receiving a task from \a host.
214  *
215  * It takes three parameters.
216  * \param task a memory location for storing a #m_task_t. It will
217    hold a task when this function will return. Thus \a task should not
218    be equal to \c NULL and \a *task should be equal to \c NULL. If one of
219    those two condition does not hold, there will be a warning message.
220  * \param channel the channel on which the agent should be
221    listening. This value has to be >=0 and < than the maximal
222    number of channels fixed with MSG_set_channel_number().
223  * \param host the host that is to be watched.
224  * \return #MSG_FATAL if \a task is equal to \c NULL, #MSG_WARNING
225    if \a *task is not equal to \c NULL, and #MSG_OK otherwise.
226  */
227 MSG_error_t MSG_task_get_from_host(m_task_t * task, int channel,
228                                    m_host_t host)
229 {
230   return MSG_task_get_ext(task, channel, -1, host);
231 }
232
233 /** \ingroup msg_gos_functions
234  * \brief Test whether there is a pending communication on a channel.
235  *
236  * It takes one parameter.
237  * \param channel the channel on which the agent should be
238    listening. This value has to be >=0 and < than the maximal
239    number of channels fixed with MSG_set_channel_number().
240  * \return 1 if there is a pending communication and 0 otherwise
241  */
242 int MSG_task_Iprobe(m_channel_t channel)
243 {
244   m_host_t h = NULL;
245
246   xbt_assert1((channel >= 0)
247               && (channel < msg_global->max_channel), "Invalid channel %d",
248               channel);
249   CHECK_HOST();
250
251   DEBUG2("Probing on channel %d (%s)", channel, h->name);
252
253   h = MSG_host_self();
254   return (xbt_fifo_get_first_item(h->simdata->mbox[channel]) != NULL);
255 }
256
257 /** \ingroup msg_gos_functions
258  * \brief Test whether there is a pending communication on a channel, and who sent it.
259  *
260  * It takes one parameter.
261  * \param channel the channel on which the agent should be
262    listening. This value has to be >=0 and < than the maximal
263    number of channels fixed with MSG_set_channel_number().
264  * \return -1 if there is no pending communication and the PID of the process who sent it otherwise
265  */
266 int MSG_task_probe_from(m_channel_t channel)
267 {
268   m_host_t h = NULL;
269   xbt_fifo_item_t item;
270   m_task_t t;
271
272   xbt_assert1((channel >= 0)
273               && (channel < msg_global->max_channel), "Invalid channel %d",
274               channel);
275   CHECK_HOST();
276
277   h = MSG_host_self();
278
279   DEBUG2("Probing on channel %d (%s)", channel, h->name);
280
281   item = xbt_fifo_get_first_item(h->simdata->mbox[channel]);
282   if ((!item) || (!(t = xbt_fifo_get_item_content(item))))
283     return -1;
284
285   return MSG_process_get_PID(t->simdata->sender);
286 }
287
288 /** \ingroup msg_gos_functions
289  * \brief Wait for at most \a max_duration second for a task reception
290    on \a channel. 
291  
292  * \a PID is updated with the PID of the first process that triggered this event if any.
293  *
294  * It takes three parameters:
295  * \param channel the channel on which the agent should be
296    listening. This value has to be >=0 and < than the maximal.
297    number of channels fixed with MSG_set_channel_number().
298  * \param PID a memory location for storing an int.
299  * \param max_duration the maximum time to wait for a task before
300     giving up. In the case of a reception, *\a PID will be updated
301     with the PID of the first process to send a task.
302  * \return #MSG_HOST_FAILURE if the host is shut down in the meantime
303    and #MSG_OK otherwise.
304  */
305 MSG_error_t MSG_channel_select_from(m_channel_t channel,
306                                     double max_duration, int *PID)
307 {
308   m_host_t h = NULL;
309   simdata_host_t h_simdata = NULL;
310   xbt_fifo_item_t item;
311   m_task_t t;
312   int first_time = 1;
313   smx_cond_t cond;
314
315   xbt_assert1((channel >= 0)
316               && (channel < msg_global->max_channel), "Invalid channel %d",
317               channel);
318   if (PID) {
319     *PID = -1;
320   }
321
322   if (max_duration == 0.0) {
323     *PID = MSG_task_probe_from(channel);
324     MSG_RETURN(MSG_OK);
325   } else {
326     CHECK_HOST();
327     h = MSG_host_self();
328     h_simdata = h->simdata;
329
330     DEBUG2("Probing on channel %d (%s)", channel, h->name);
331     while (!(item = xbt_fifo_get_first_item(h->simdata->mbox[channel]))) {
332       if (max_duration > 0) {
333         if (!first_time) {
334           MSG_RETURN(MSG_OK);
335         }
336       }
337       SIMIX_mutex_lock(h_simdata->mutex);
338       xbt_assert1(!(h_simdata->sleeping[channel]),
339                   "A process is already blocked on this channel %d",
340                   channel);
341       cond = SIMIX_cond_init();
342       h_simdata->sleeping[channel] = cond;      /* I'm waiting. Wake me up when you're ready */
343       if (max_duration > 0) {
344         SIMIX_cond_wait_timeout(cond, h_simdata->mutex, max_duration);
345       } else {
346         SIMIX_cond_wait(cond, h_simdata->mutex);
347       }
348       SIMIX_cond_destroy(cond);
349       SIMIX_mutex_unlock(h_simdata->mutex);
350       if (SIMIX_host_get_state(h_simdata->smx_host) == 0) {
351         MSG_RETURN(MSG_HOST_FAILURE);
352       }
353       h_simdata->sleeping[channel] = NULL;
354       first_time = 0;
355     }
356     if (!item || !(t = xbt_fifo_get_item_content(item))) {
357       MSG_RETURN(MSG_OK);
358     }
359     if (PID) {
360       *PID = MSG_process_get_PID(t->simdata->sender);
361     }
362     MSG_RETURN(MSG_OK);
363   }
364 }
365
366
367 /** \ingroup msg_gos_functions
368
369  * \brief Return the number of tasks waiting to be received on a \a
370    channel and sent by \a host.
371  *
372  * It takes two parameters.
373  * \param channel the channel on which the agent should be
374    listening. This value has to be >=0 and < than the maximal
375    number of channels fixed with MSG_set_channel_number().
376  * \param host the host that is to be watched.
377  * \return the number of tasks waiting to be received on \a channel
378    and sent by \a host.
379  */
380 int MSG_task_probe_from_host(int channel, m_host_t host)
381 {
382   xbt_fifo_item_t item;
383   m_task_t t;
384   int count = 0;
385   m_host_t h = NULL;
386
387   xbt_assert1((channel >= 0)
388               && (channel < msg_global->max_channel), "Invalid channel %d",
389               channel);
390   CHECK_HOST();
391   h = MSG_host_self();
392
393   DEBUG2("Probing on channel %d (%s)", channel, h->name);
394
395   xbt_fifo_foreach(h->simdata->mbox[channel], item, t, m_task_t) {
396     if (t->simdata->source == host)
397       count++;
398   }
399
400   return count;
401 }
402
403 /** \ingroup msg_gos_functions \brief Put a task on a channel of an
404  * host (with a timeout on the waiting of the destination host) and
405  * waits for the end of the transmission.
406  *
407  * This function is used for describing the behavior of an agent. It
408  * takes four parameter.
409  * \param task a #m_task_t to send on another location. This task
410    will not be usable anymore when the function will return. There is
411    no automatic task duplication and you have to save your parameters
412    before calling this function. Tasks are unique and once it has been
413    sent to another location, you should not access it anymore. You do
414    not need to call MSG_task_destroy() but to avoid using, as an
415    effect of inattention, this task anymore, you definitely should
416    renitialize it with #MSG_TASK_UNINITIALIZED. Note that this task
417    can be transfered iff it has been correctly created with
418    MSG_task_create().
419  * \param dest the destination of the message
420  * \param channel the channel on which the agent should put this
421    task. This value has to be >=0 and < than the maximal number of
422    channels fixed with MSG_set_channel_number().
423  * \param max_duration the maximum time to wait for a task before giving
424     up. In such a case, #MSG_TRANSFER_FAILURE will be returned, \a task 
425     will not be modified 
426  * \return #MSG_FATAL if \a task is not properly initialized and
427    #MSG_OK otherwise. Returns #MSG_HOST_FAILURE if the host on which
428    this function was called was shut down. Returns
429    #MSG_TRANSFER_FAILURE if the transfer could not be properly done
430    (network failure, dest failure, timeout...)
431  */
432 MSG_error_t MSG_task_put_with_timeout(m_task_t task, m_host_t dest,
433                                       m_channel_t channel,
434                                       double max_duration)
435 {
436
437
438   m_process_t process = MSG_process_self();
439   simdata_task_t task_simdata = NULL;
440   m_host_t local_host = NULL;
441   m_host_t remote_host = NULL;
442   CHECK_HOST();
443
444   xbt_assert1((channel >= 0)
445               && (channel < msg_global->max_channel), "Invalid channel %d",
446               channel);
447
448   task_simdata = task->simdata;
449   task_simdata->sender = process;
450   task_simdata->source = MSG_process_get_host(process);
451   xbt_assert0(task_simdata->using == 1,
452               "This task is still being used somewhere else. You cannot send it now. Go fix your code!");
453   task_simdata->comm = NULL;
454
455         task_simdata->using++;
456   local_host = ((simdata_process_t) process->simdata)->m_host;
457   remote_host = dest;
458
459   DEBUG4("Trying to send a task (%g kB) from %s to %s on channel %d",
460          task->simdata->message_size / 1000, local_host->name,
461          remote_host->name, channel);
462
463   SIMIX_mutex_lock(remote_host->simdata->mutex);
464   xbt_fifo_push(((simdata_host_t) remote_host->simdata)->
465                 mbox[channel], task);
466
467
468   if (remote_host->simdata->sleeping[channel]) {
469     DEBUG0("Somebody is listening. Let's wake him up!");
470     SIMIX_cond_signal(remote_host->simdata->sleeping[channel]);
471   }
472   SIMIX_mutex_unlock(remote_host->simdata->mutex);
473
474   process->simdata->put_host = dest;
475   process->simdata->put_channel = channel;
476   SIMIX_mutex_lock(task->simdata->mutex);
477   // DEBUG4("Task sent (%g kB) from %s to %s on channel %d, waiting...", task->simdata->message_size/1000,local_host->name, remote_host->name, channel);
478
479   process->simdata->waiting_task = task;
480   if (max_duration > 0) {
481     xbt_ex_t e;
482                 double time;
483                 double time_elapsed;
484                 time = SIMIX_get_clock();
485     TRY {
486                 /*verify if the action that ends is the correct. Call the wait_timeout with the new time. If the timeout occurs, an exception is raised */
487                         while (1) {
488                         time_elapsed = SIMIX_get_clock() - time;
489                                 SIMIX_cond_wait_timeout(task->simdata->cond, task->simdata->mutex,
490                                                 max_duration-time_elapsed);
491                                 if ((task->simdata->comm != NULL) &&
492                                                 (SIMIX_action_get_state(task->simdata->comm) != SURF_ACTION_RUNNING))
493                                         break;
494                         }
495     } CATCH(e) {
496       if(e.category==timeout_error) {
497         xbt_ex_free(e);
498         /* verify if the timeout happened and the communication didn't started yet */
499         if (task->simdata->comm == NULL) {
500           process->simdata->waiting_task = NULL;
501           xbt_fifo_remove(((simdata_host_t) remote_host->simdata)->
502                           mbox[channel], task);
503           if (task->simdata->receiver) {
504             task->simdata->receiver->simdata->waiting_task = NULL;
505           }
506           task->simdata->sender = NULL;
507           SIMIX_mutex_unlock(task->simdata->mutex);
508           MSG_RETURN(MSG_TRANSFER_FAILURE);
509         }
510       } else {
511         RETHROW;
512       }
513     }
514   } else {
515                 while (1) {
516                         SIMIX_cond_wait(task->simdata->cond, task->simdata->mutex);
517                         if (SIMIX_action_get_state(task->simdata->comm) != SURF_ACTION_RUNNING)
518                                 break;
519                 }
520   }
521
522   DEBUG1("Action terminated %s", task->name);
523   process->simdata->waiting_task = NULL;
524   /* the task has already finished and the pointer must be null */
525   if (task->simdata->receiver) {
526     task->simdata->receiver->simdata->waiting_task = NULL;
527     /* action ended, set comm and compute = NULL, the actions is already destroyed in the main function */
528     //      task->simdata->comm = NULL;
529     //task->simdata->compute = NULL;
530   }
531   task->simdata->sender = NULL;
532   SIMIX_mutex_unlock(task->simdata->mutex);
533
534   if (SIMIX_action_get_state(task->simdata->comm) == SURF_ACTION_DONE) {
535     MSG_RETURN(MSG_OK);
536   } else if (SIMIX_host_get_state(local_host->simdata->smx_host) == 0) {
537     MSG_RETURN(MSG_HOST_FAILURE);
538   } else {
539     MSG_RETURN(MSG_TRANSFER_FAILURE);
540   }
541 }
542
543 /** \ingroup msg_gos_functions
544  * \brief Put a task on a channel of an host and waits for the end of the
545  * transmission.
546  *
547  * This function is used for describing the behavior of an agent. It
548  * takes three parameter.
549  * \param task a #m_task_t to send on another location. This task
550    will not be usable anymore when the function will return. There is
551    no automatic task duplication and you have to save your parameters
552    before calling this function. Tasks are unique and once it has been
553    sent to another location, you should not access it anymore. You do
554    not need to call MSG_task_destroy() but to avoid using, as an
555    effect of inattention, this task anymore, you definitely should
556    renitialize it with #MSG_TASK_UNINITIALIZED. Note that this task
557    can be transfered iff it has been correctly created with
558    MSG_task_create().
559  * \param dest the destination of the message
560  * \param channel the channel on which the agent should put this
561    task. This value has to be >=0 and < than the maximal number of
562    channels fixed with MSG_set_channel_number().
563  * \return #MSG_FATAL if \a task is not properly initialized and
564  * #MSG_OK otherwise. Returns #MSG_HOST_FAILURE if the host on which
565  * this function was called was shut down. Returns
566  * #MSG_TRANSFER_FAILURE if the transfer could not be properly done
567  * (network failure, dest failure)
568  */
569 MSG_error_t MSG_task_put(m_task_t task, m_host_t dest, m_channel_t channel)
570 {
571   return MSG_task_put_with_timeout(task, dest, channel, -1.0);
572 }
573
574 /** \ingroup msg_gos_functions
575  * \brief Does exactly the same as MSG_task_put but with a bounded transmition 
576  * rate.
577  *
578  * \sa MSG_task_put
579  */
580 MSG_error_t MSG_task_put_bounded(m_task_t task,
581                                  m_host_t dest, m_channel_t channel,
582                                  double max_rate)
583 {
584   MSG_error_t res = MSG_OK;
585   task->simdata->rate = max_rate;
586   res = MSG_task_put(task, dest, channel);
587   return (res);
588 }
589
590 /** \ingroup msg_gos_functions
591  * \brief Executes a task and waits for its termination.
592  *
593  * This function is used for describing the behavior of an agent. It
594  * takes only one parameter.
595  * \param task a #m_task_t to execute on the location on which the
596    agent is running.
597  * \return #MSG_FATAL if \a task is not properly initialized and
598  * #MSG_OK otherwise.
599  */
600 MSG_error_t MSG_task_execute(m_task_t task)
601 {
602   simdata_task_t simdata = NULL;
603   m_process_t self = MSG_process_self();
604   CHECK_HOST();
605
606   simdata = task->simdata;
607   xbt_assert0((!simdata->compute) && (task->simdata->using == 1),
608               "This task is executed somewhere else. Go fix your code!");
609
610   DEBUG1("Computing on %s", MSG_process_self()->simdata->m_host->name);
611   simdata->using++;
612   SIMIX_mutex_lock(simdata->mutex);
613   simdata->compute =
614       SIMIX_action_execute(SIMIX_host_self(), task->name,
615                            simdata->computation_amount);
616   SIMIX_action_set_priority(simdata->compute, simdata->priority);
617
618   self->simdata->waiting_task = task;
619   SIMIX_register_action_to_condition(simdata->compute, simdata->cond);
620   SIMIX_cond_wait(simdata->cond, simdata->mutex);
621   SIMIX_unregister_action_to_condition(simdata->compute, simdata->cond);
622   self->simdata->waiting_task = NULL;
623
624   SIMIX_mutex_unlock(simdata->mutex);
625   simdata->using--;
626
627   if (SIMIX_action_get_state(task->simdata->compute) == SURF_ACTION_DONE) {
628     /* action ended, set comm and compute = NULL, the actions is already destroyed in the main function */
629     SIMIX_action_destroy(task->simdata->compute);
630     simdata->computation_amount = 0.0;
631     simdata->comm = NULL;
632     simdata->compute = NULL;
633     MSG_RETURN(MSG_OK);
634   } else if (SIMIX_host_get_state(SIMIX_host_self()) == 0) {
635     /* action ended, set comm and compute = NULL, the actions is already destroyed in the main function */
636     SIMIX_action_destroy(task->simdata->compute);
637     simdata->comm = NULL;
638     simdata->compute = NULL;
639     MSG_RETURN(MSG_HOST_FAILURE);
640   } else {
641     /* action ended, set comm and compute = NULL, the actions is already destroyed in the main function */
642     SIMIX_action_destroy(task->simdata->compute);
643     simdata->comm = NULL;
644     simdata->compute = NULL;
645     MSG_RETURN(MSG_TASK_CANCELLED);
646   }
647 }
648
649
650 /** \ingroup m_task_management
651  * \brief Creates a new #m_task_t (a parallel one....).
652  *
653  * A constructor for #m_task_t taking six arguments and returning the 
654    corresponding object.
655  * \param name a name for the object. It is for user-level information
656    and can be NULL.
657  * \param host_nb the number of hosts implied in the parallel task.
658  * \param host_list an array of \p host_nb m_host_t.
659  * \param computation_amount an array of \p host_nb
660    doubles. computation_amount[i] is the total number of operations
661    that have to be performed on host_list[i].
662  * \param communication_amount an array of \p host_nb* \p host_nb doubles.
663  * \param data a pointer to any data may want to attach to the new
664    object.  It is for user-level information and can be NULL. It can
665    be retrieved with the function \ref MSG_task_get_data.
666  * \see m_task_t
667  * \return The new corresponding object.
668  */
669 m_task_t MSG_parallel_task_create(const char *name,
670                                   int host_nb,
671                                   const m_host_t * host_list,
672                                   double *computation_amount,
673                                   double *communication_amount, void *data)
674 {
675   int i;
676   simdata_task_t simdata = xbt_new0(s_simdata_task_t, 1);
677   m_task_t task = xbt_new0(s_m_task_t, 1);
678   task->simdata = simdata;
679
680   /* Task structure */
681   task->name = xbt_strdup(name);
682   task->data = data;
683
684   /* Simulator Data */
685   simdata->computation_amount = 0;
686   simdata->message_size = 0;
687   simdata->cond = SIMIX_cond_init();
688   simdata->mutex = SIMIX_mutex_init();
689   simdata->compute = NULL;
690   simdata->comm = NULL;
691   simdata->rate = -1.0;
692   simdata->using = 1;
693   simdata->sender = NULL;
694   simdata->receiver = NULL;
695   simdata->source = NULL;
696
697   simdata->host_nb = host_nb;
698   simdata->host_list = xbt_new0(smx_host_t, host_nb);
699   simdata->comp_amount = computation_amount;
700   simdata->comm_amount = communication_amount;
701
702   for (i = 0; i < host_nb; i++)
703     simdata->host_list[i] = host_list[i]->simdata->smx_host;
704
705   return task;
706
707 }
708
709
710 MSG_error_t MSG_parallel_task_execute(m_task_t task)
711 {
712   simdata_task_t simdata = NULL;
713   m_process_t self = MSG_process_self();
714   CHECK_HOST();
715
716   simdata = task->simdata;
717   xbt_assert0((!simdata->compute) && (task->simdata->using == 1),
718               "This task is executed somewhere else. Go fix your code!");
719
720   xbt_assert0(simdata->host_nb,
721               "This is not a parallel task. Go to hell.");
722
723   DEBUG1("Computing on %s", MSG_process_self()->simdata->m_host->name);
724   simdata->using++;
725   SIMIX_mutex_lock(simdata->mutex);
726   simdata->compute =
727       SIMIX_action_parallel_execute(task->name, simdata->host_nb,
728                                     simdata->host_list,
729                                     simdata->comp_amount,
730                                     simdata->comm_amount, 1.0, -1.0);
731
732   self->simdata->waiting_task = task;
733   SIMIX_register_action_to_condition(simdata->compute, simdata->cond);
734   SIMIX_cond_wait(simdata->cond, simdata->mutex);
735   SIMIX_unregister_action_to_condition(simdata->compute, simdata->cond);
736   self->simdata->waiting_task = NULL;
737
738
739   SIMIX_mutex_unlock(simdata->mutex);
740   simdata->using--;
741
742   if (SIMIX_action_get_state(task->simdata->compute) == SURF_ACTION_DONE) {
743     /* action ended, set comm and compute = NULL, the actions is already destroyed in the main function */
744     SIMIX_action_destroy(task->simdata->compute);
745     simdata->computation_amount = 0.0;
746     simdata->comm = NULL;
747     simdata->compute = NULL;
748     MSG_RETURN(MSG_OK);
749   } else if (SIMIX_host_get_state(SIMIX_host_self()) == 0) {
750     /* action ended, set comm and compute = NULL, the actions is already destroyed in the main function */
751     SIMIX_action_destroy(task->simdata->compute);
752     simdata->comm = NULL;
753     simdata->compute = NULL;
754     MSG_RETURN(MSG_HOST_FAILURE);
755   } else {
756     /* action ended, set comm and compute = NULL, the actions is already destroyed in the main function */
757     SIMIX_action_destroy(task->simdata->compute);
758     simdata->comm = NULL;
759     simdata->compute = NULL;
760     MSG_RETURN(MSG_TASK_CANCELLED);
761   }
762
763 }
764
765
766 /** \ingroup msg_gos_functions
767  * \brief Sleep for the specified number of seconds
768  *
769  * Makes the current process sleep until \a time seconds have elapsed.
770  *
771  * \param nb_sec a number of second
772  */
773 MSG_error_t MSG_process_sleep(double nb_sec)
774 {
775   smx_action_t act_sleep;
776   m_process_t proc = MSG_process_self();
777   smx_mutex_t mutex;
778   smx_cond_t cond;
779   /* create action to sleep */
780   act_sleep =
781       SIMIX_action_sleep(SIMIX_process_get_host(proc->simdata->s_process),
782                          nb_sec);
783
784   mutex = SIMIX_mutex_init();
785   SIMIX_mutex_lock(mutex);
786   /* create conditional and register action to it */
787   cond = SIMIX_cond_init();
788
789   SIMIX_register_action_to_condition(act_sleep, cond);
790   SIMIX_cond_wait(cond, mutex);
791   SIMIX_unregister_action_to_condition(act_sleep, cond);
792   SIMIX_mutex_unlock(mutex);
793
794   /* remove variables */
795   SIMIX_cond_destroy(cond);
796   SIMIX_mutex_destroy(mutex);
797
798   if (SIMIX_action_get_state(act_sleep) == SURF_ACTION_DONE) {
799     if (SIMIX_host_get_state(SIMIX_host_self()) == SURF_CPU_OFF) {
800       SIMIX_action_destroy(act_sleep);
801       MSG_RETURN(MSG_HOST_FAILURE);
802     }
803   } else {
804     SIMIX_action_destroy(act_sleep);
805     MSG_RETURN(MSG_HOST_FAILURE);
806   }
807
808
809   MSG_RETURN(MSG_OK);
810 }
811
812 /** \ingroup msg_gos_functions
813  * \brief Return the number of MSG tasks currently running on
814  * the host of the current running process.
815  */
816 static int MSG_get_msgload(void)
817 {
818   xbt_die("not implemented yet");
819   return 0;
820 }
821
822 /** \ingroup msg_gos_functions
823  *
824  * \brief Return the last value returned by a MSG function (except
825  * MSG_get_errno...).
826  */
827 MSG_error_t MSG_get_errno(void)
828 {
829   return PROCESS_GET_ERRNO();
830 }