Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
Ooups. How come my code ever worked without changing that ?...
[simgrid.git] / src / msg / gos.c
1 /*     $Id$      */
2
3 /* Copyright (c) 2002-2007 Arnaud Legrand.                                  */
4 /* Copyright (c) 2007 Bruno Donassolo.                                      */
5 /* All rights reserved.                                                     */
6
7 /* This program is free software; you can redistribute it and/or modify it
8  * under the terms of the license (GNU LGPL) which comes with this package. */
9
10 #include "msg/private.h"
11 #include "xbt/sysdep.h"
12 #include "xbt/log.h"
13
14 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(msg_gos, msg,
15                                 "Logging specific to MSG (gos)");
16
17 /** \defgroup msg_gos_functions MSG Operating System Functions
18  *  \brief This section describes the functions that can be used
19  *  by an agent for handling some task.
20  */
21
22 static MSG_error_t __MSG_task_get_with_time_out_from_host(m_task_t * task,
23                                                           m_channel_t
24                                                           channel,
25                                                           double
26                                                           max_duration,
27                                                           m_host_t host)
28 {
29
30   m_process_t process = MSG_process_self();
31   m_task_t t = NULL;
32   m_host_t h = NULL;
33   simdata_task_t t_simdata = NULL;
34   simdata_host_t h_simdata = NULL;
35   int first_time = 1;
36   xbt_fifo_item_t item = NULL;
37
38   smx_cond_t cond = NULL;       //conditional wait if the task isn't on the channel yet
39
40   CHECK_HOST();
41   xbt_assert1((channel >= 0)
42               && (channel < msg_global->max_channel), "Invalid channel %d",
43               channel);
44   /* Sanity check */
45   xbt_assert0(task, "Null pointer for the task\n");
46
47   if (*task)
48     CRITICAL0
49         ("MSG_task_get() was asked to write in a non empty task struct.");
50
51   /* Get the task */
52   h = MSG_host_self();
53   h_simdata = h->simdata;
54
55   DEBUG2("Waiting for a task on channel %d (%s)", channel, h->name);
56
57   SIMIX_mutex_lock(h->simdata->mutex);
58   while (1) {
59     if (xbt_fifo_size(h_simdata->mbox[channel]) > 0) {
60       if (!host) {
61         t = xbt_fifo_shift(h_simdata->mbox[channel]);
62         break;
63       } else {
64         xbt_fifo_foreach(h->simdata->mbox[channel], item, t, m_task_t) {
65           if (t->simdata->source == host)
66             break;
67         }
68         if (item) {
69           xbt_fifo_remove_item(h->simdata->mbox[channel], item);
70           break;
71         }
72       }
73     }
74
75     if (max_duration > 0) {
76       if (!first_time) {
77         SIMIX_mutex_unlock(h->simdata->mutex);
78         h_simdata->sleeping[channel] = NULL;
79         SIMIX_cond_destroy(cond);
80         MSG_RETURN(MSG_TRANSFER_FAILURE);
81       }
82     }
83     xbt_assert1(!(h_simdata->sleeping[channel]),
84                 "A process is already blocked on channel %d", channel);
85
86     cond = SIMIX_cond_init();
87     h_simdata->sleeping[channel] = cond;
88     if (max_duration > 0)
89       SIMIX_cond_wait_timeout(cond, h->simdata->mutex, max_duration);
90     else 
91       SIMIX_cond_wait(h_simdata->sleeping[channel], h->simdata->mutex);
92
93     if (SIMIX_host_get_state(h_simdata->s_host) == 0)
94       MSG_RETURN(MSG_HOST_FAILURE);
95
96     first_time = 0;
97   }
98   SIMIX_mutex_unlock(h->simdata->mutex);
99
100   DEBUG1("OK, got a task (%s)", t->name);
101   /* clean conditional */
102   if (cond) {
103     SIMIX_cond_destroy(cond);
104     h_simdata->sleeping[channel] = NULL;
105   }
106
107   t_simdata = t->simdata;
108   t_simdata->receiver = process;
109   *task = t;
110
111   SIMIX_mutex_lock(t_simdata->mutex);
112
113   /* Transfer */
114   /* create SIMIX action to the communication */
115   t_simdata->comm =
116       SIMIX_action_communicate(t_simdata->sender->simdata->m_host->
117                                simdata->s_host,
118                                process->simdata->m_host->simdata->s_host,
119                                t->name, t_simdata->message_size,
120                                t_simdata->rate);
121   /* if the process is suspend, create the action but stop its execution, it will be restart when the sender process resume */
122   if (MSG_process_is_suspended(t_simdata->sender)) {
123     DEBUG1("Process sender (%s) suspended", t_simdata->sender->name);
124     SIMIX_action_set_priority(t_simdata->comm, 0);
125   }
126   process->simdata->waiting_task = t;
127   SIMIX_register_action_to_condition(t_simdata->comm, t_simdata->cond);
128         while (1) {
129                 SIMIX_cond_wait(t_simdata->cond, t_simdata->mutex);
130                 if (SIMIX_action_get_state(t_simdata->comm) != SURF_ACTION_RUNNING)
131                         break;
132         }
133   SIMIX_unregister_action_to_condition(t_simdata->comm, t_simdata->cond);
134   process->simdata->waiting_task = NULL;
135
136   /* the task has already finished and the pointer must be null */
137   if (t->simdata->sender) {
138     t->simdata->sender->simdata->waiting_task = NULL;
139     /* action ended, set comm and compute = NULL, the actions is already destroyed in the main function */
140     //t->simdata->comm = NULL;
141     //t->simdata->compute = NULL;
142   }
143   /* for this process, don't need to change in get function */
144   t->simdata->receiver = NULL;
145   SIMIX_mutex_unlock(t_simdata->mutex);
146
147
148   if (SIMIX_action_get_state(t_simdata->comm) == SURF_ACTION_DONE) {
149     //t_simdata->comm = NULL;
150     SIMIX_action_destroy(t_simdata->comm);
151     t_simdata->comm = NULL;
152                 t_simdata->using--;
153     MSG_RETURN(MSG_OK);
154   } else if (SIMIX_host_get_state(h_simdata->s_host) == 0) {
155     //t_simdata->comm = NULL;
156     SIMIX_action_destroy(t_simdata->comm);
157     t_simdata->comm = NULL;
158                 t_simdata->using--;
159     MSG_RETURN(MSG_HOST_FAILURE);
160   } else {
161     //t_simdata->comm = NULL;
162     SIMIX_action_destroy(t_simdata->comm);
163     t_simdata->comm = NULL;
164                 t_simdata->using--;
165     MSG_RETURN(MSG_TRANSFER_FAILURE);
166   }
167
168 }
169
170 /** \ingroup msg_gos_functions
171  * \brief Listen on a channel and wait for receiving a task.
172  *
173  * It takes two parameters.
174  * \param task a memory location for storing a #m_task_t. It will
175    hold a task when this function will return. Thus \a task should not
176    be equal to \c NULL and \a *task should be equal to \c NULL. If one of
177    those two condition does not hold, there will be a warning message.
178  * \param channel the channel on which the agent should be
179    listening. This value has to be >=0 and < than the maximal
180    number of channels fixed with MSG_set_channel_number().
181  * \return #MSG_FATAL if \a task is equal to \c NULL, #MSG_WARNING
182  * if \a *task is not equal to \c NULL, and #MSG_OK otherwise.
183  */
184 MSG_error_t MSG_task_get(m_task_t * task, m_channel_t channel)
185 {
186   return MSG_task_get_with_time_out(task, channel, -1);
187 }
188
189 /** \ingroup msg_gos_functions
190  * \brief Listen on a channel and wait for receiving a task with a timeout.
191  *
192  * It takes three parameters.
193  * \param task a memory location for storing a #m_task_t. It will
194    hold a task when this function will return. Thus \a task should not
195    be equal to \c NULL and \a *task should be equal to \c NULL. If one of
196    those two condition does not hold, there will be a warning message.
197  * \param channel the channel on which the agent should be
198    listening. This value has to be >=0 and < than the maximal
199    number of channels fixed with MSG_set_channel_number().
200  * \param max_duration the maximum time to wait for a task before giving
201     up. In such a case, #MSG_TRANSFER_FAILURE will be returned, \a task 
202     will not be modified and will still be
203     equal to \c NULL when returning. 
204  * \return #MSG_FATAL if \a task is equal to \c NULL, #MSG_WARNING
205    if \a *task is not equal to \c NULL, and #MSG_OK otherwise.
206  */
207 MSG_error_t MSG_task_get_with_time_out(m_task_t * task,
208                                        m_channel_t channel,
209                                        double max_duration)
210 {
211   return __MSG_task_get_with_time_out_from_host(task, channel,
212                                                 max_duration, NULL);
213 }
214
215 /** \ingroup msg_gos_functions
216  * \brief Listen on \a channel and waits for receiving a task from \a host.
217  *
218  * It takes three parameters.
219  * \param task a memory location for storing a #m_task_t. It will
220    hold a task when this function will return. Thus \a task should not
221    be equal to \c NULL and \a *task should be equal to \c NULL. If one of
222    those two condition does not hold, there will be a warning message.
223  * \param channel the channel on which the agent should be
224    listening. This value has to be >=0 and < than the maximal
225    number of channels fixed with MSG_set_channel_number().
226  * \param host the host that is to be watched.
227  * \return #MSG_FATAL if \a task is equal to \c NULL, #MSG_WARNING
228    if \a *task is not equal to \c NULL, and #MSG_OK otherwise.
229  */
230 MSG_error_t MSG_task_get_from_host(m_task_t * task, int channel,
231                                    m_host_t host)
232 {
233   return __MSG_task_get_with_time_out_from_host(task, channel, -1, host);
234 }
235
236 /** \ingroup msg_gos_functions
237  * \brief Test whether there is a pending communication on a channel.
238  *
239  * It takes one parameter.
240  * \param channel the channel on which the agent should be
241    listening. This value has to be >=0 and < than the maximal
242    number of channels fixed with MSG_set_channel_number().
243  * \return 1 if there is a pending communication and 0 otherwise
244  */
245 int MSG_task_Iprobe(m_channel_t channel)
246 {
247   m_host_t h = NULL;
248
249   xbt_assert1((channel >= 0)
250               && (channel < msg_global->max_channel), "Invalid channel %d",
251               channel);
252   CHECK_HOST();
253
254   DEBUG2("Probing on channel %d (%s)", channel, h->name);
255
256   h = MSG_host_self();
257   return (xbt_fifo_get_first_item(h->simdata->mbox[channel]) != NULL);
258 }
259
260 /** \ingroup msg_gos_functions
261  * \brief Test whether there is a pending communication on a channel, and who sent it.
262  *
263  * It takes one parameter.
264  * \param channel the channel on which the agent should be
265    listening. This value has to be >=0 and < than the maximal
266    number of channels fixed with MSG_set_channel_number().
267  * \return -1 if there is no pending communication and the PID of the process who sent it otherwise
268  */
269 int MSG_task_probe_from(m_channel_t channel)
270 {
271   m_host_t h = NULL;
272   xbt_fifo_item_t item;
273   m_task_t t;
274
275   xbt_assert1((channel >= 0)
276               && (channel < msg_global->max_channel), "Invalid channel %d",
277               channel);
278   CHECK_HOST();
279
280   h = MSG_host_self();
281
282   DEBUG2("Probing on channel %d (%s)", channel, h->name);
283
284   item = xbt_fifo_get_first_item(h->simdata->mbox[channel]);
285   if ((!item) || (!(t = xbt_fifo_get_item_content(item))))
286     return -1;
287
288   return MSG_process_get_PID(t->simdata->sender);
289 }
290
291 /** \ingroup msg_gos_functions
292  * \brief Wait for at most \a max_duration second for a task reception
293    on \a channel. *\a PID is updated with the PID of the first process
294    that triggered this event if any.
295  *
296  * It takes three parameters:
297  * \param channel the channel on which the agent should be
298    listening. This value has to be >=0 and < than the maximal.
299    number of channels fixed with MSG_set_channel_number().
300  * \param PID a memory location for storing an int.
301  * \param max_duration the maximum time to wait for a task before
302     giving up. In the case of a reception, *\a PID will be updated
303     with the PID of the first process to send a task.
304  * \return #MSG_HOST_FAILURE if the host is shut down in the meantime
305    and #MSG_OK otherwise.
306  */
307 MSG_error_t MSG_channel_select_from(m_channel_t channel,
308                                     double max_duration, int *PID)
309 {
310   m_host_t h = NULL;
311   simdata_host_t h_simdata = NULL;
312   xbt_fifo_item_t item;
313   m_task_t t;
314   int first_time = 1;
315   smx_cond_t cond;
316
317   xbt_assert1((channel >= 0)
318               && (channel < msg_global->max_channel), "Invalid channel %d",
319               channel);
320   if (PID) {
321     *PID = -1;
322   }
323
324   if (max_duration == 0.0) {
325     *PID = MSG_task_probe_from(channel);
326     MSG_RETURN(MSG_OK);
327   } else {
328     CHECK_HOST();
329     h = MSG_host_self();
330     h_simdata = h->simdata;
331
332     DEBUG2("Probing on channel %d (%s)", channel, h->name);
333     while (!(item = xbt_fifo_get_first_item(h->simdata->mbox[channel]))) {
334       if (max_duration > 0) {
335         if (!first_time) {
336           MSG_RETURN(MSG_OK);
337         }
338       }
339       SIMIX_mutex_lock(h_simdata->mutex);
340       xbt_assert1(!(h_simdata->sleeping[channel]),
341                   "A process is already blocked on this channel %d",
342                   channel);
343       cond = SIMIX_cond_init();
344       h_simdata->sleeping[channel] = cond;      /* I'm waiting. Wake me up when you're ready */
345       if (max_duration > 0) {
346         SIMIX_cond_wait_timeout(cond, h_simdata->mutex, max_duration);
347       } else {
348         SIMIX_cond_wait(cond, h_simdata->mutex);
349       }
350       SIMIX_cond_destroy(cond);
351       SIMIX_mutex_unlock(h_simdata->mutex);
352       if (SIMIX_host_get_state(h_simdata->s_host) == 0) {
353         MSG_RETURN(MSG_HOST_FAILURE);
354       }
355       h_simdata->sleeping[channel] = NULL;
356       first_time = 0;
357     }
358     if (!item || !(t = xbt_fifo_get_item_content(item))) {
359       MSG_RETURN(MSG_OK);
360     }
361     if (PID) {
362       *PID = MSG_process_get_PID(t->simdata->sender);
363     }
364     MSG_RETURN(MSG_OK);
365   }
366 }
367
368
369 /** \ingroup msg_gos_functions
370
371  * \brief Return the number of tasks waiting to be received on a \a
372    channel and sent by \a host.
373  *
374  * It takes two parameters.
375  * \param channel the channel on which the agent should be
376    listening. This value has to be >=0 and < than the maximal
377    number of channels fixed with MSG_set_channel_number().
378  * \param host the host that is to be watched.
379  * \return the number of tasks waiting to be received on \a channel
380    and sent by \a host.
381  */
382 int MSG_task_probe_from_host(int channel, m_host_t host)
383 {
384   xbt_fifo_item_t item;
385   m_task_t t;
386   int count = 0;
387   m_host_t h = NULL;
388
389   xbt_assert1((channel >= 0)
390               && (channel < msg_global->max_channel), "Invalid channel %d",
391               channel);
392   CHECK_HOST();
393   h = MSG_host_self();
394
395   DEBUG2("Probing on channel %d (%s)", channel, h->name);
396
397   xbt_fifo_foreach(h->simdata->mbox[channel], item, t, m_task_t) {
398     if (t->simdata->source == host)
399       count++;
400   }
401
402   return count;
403 }
404
405 /** \ingroup msg_gos_functions \brief Put a task on a channel of an
406  * host (with a timeout on the waiting of the destination host) and
407  * waits for the end of the transmission.
408  *
409  * This function is used for describing the behavior of an agent. It
410  * takes four parameter.
411  * \param task a #m_task_t to send on another location. This task
412    will not be usable anymore when the function will return. There is
413    no automatic task duplication and you have to save your parameters
414    before calling this function. Tasks are unique and once it has been
415    sent to another location, you should not access it anymore. You do
416    not need to call MSG_task_destroy() but to avoid using, as an
417    effect of inattention, this task anymore, you definitely should
418    renitialize it with #MSG_TASK_UNINITIALIZED. Note that this task
419    can be transfered iff it has been correctly created with
420    MSG_task_create().
421  * \param dest the destination of the message
422  * \param channel the channel on which the agent should put this
423    task. This value has to be >=0 and < than the maximal number of
424    channels fixed with MSG_set_channel_number().
425  * \param max_duration the maximum time to wait for a task before giving
426     up. In such a case, #MSG_TRANSFER_FAILURE will be returned, \a task 
427     will not be modified 
428  * \return #MSG_FATAL if \a task is not properly initialized and
429    #MSG_OK otherwise. Returns #MSG_HOST_FAILURE if the host on which
430    this function was called was shut down. Returns
431    #MSG_TRANSFER_FAILURE if the transfer could not be properly done
432    (network failure, dest failure, timeout...)
433  */
434 MSG_error_t MSG_task_put_with_timeout(m_task_t task, m_host_t dest,
435                                       m_channel_t channel,
436                                       double max_duration)
437 {
438
439
440   m_process_t process = MSG_process_self();
441   simdata_task_t task_simdata = NULL;
442   m_host_t local_host = NULL;
443   m_host_t remote_host = NULL;
444   CHECK_HOST();
445
446   xbt_assert1((channel >= 0)
447               && (channel < msg_global->max_channel), "Invalid channel %d",
448               channel);
449
450   task_simdata = task->simdata;
451   task_simdata->sender = process;
452   task_simdata->source = MSG_process_get_host(process);
453   xbt_assert0(task_simdata->using == 1,
454               "This task is still being used somewhere else. You cannot send it now. Go fix your code!");
455   task_simdata->comm = NULL;
456
457         task_simdata->using++;
458   local_host = ((simdata_process_t) process->simdata)->m_host;
459   remote_host = dest;
460
461   DEBUG4("Trying to send a task (%g kB) from %s to %s on channel %d",
462          task->simdata->message_size / 1000, local_host->name,
463          remote_host->name, channel);
464
465   SIMIX_mutex_lock(remote_host->simdata->mutex);
466   xbt_fifo_push(((simdata_host_t) remote_host->simdata)->
467                 mbox[channel], task);
468
469
470   if (remote_host->simdata->sleeping[channel]) {
471     DEBUG0("Somebody is listening. Let's wake him up!");
472     SIMIX_cond_signal(remote_host->simdata->sleeping[channel]);
473   }
474   SIMIX_mutex_unlock(remote_host->simdata->mutex);
475
476   process->simdata->put_host = dest;
477   process->simdata->put_channel = channel;
478   SIMIX_mutex_lock(task->simdata->mutex);
479   // DEBUG4("Task sent (%g kB) from %s to %s on channel %d, waiting...", task->simdata->message_size/1000,local_host->name, remote_host->name, channel);
480
481   process->simdata->waiting_task = task;
482   if (max_duration > 0) {
483     xbt_ex_t e;
484                 double time;
485                 double time_elapsed;
486                 time = SIMIX_get_clock();
487     TRY {
488                 /*verify if the action that ends is the correct. Call the wait_timeout with the new time. If the timeout occurs, an exception is raised */
489                         while (1) {
490                         time_elapsed = SIMIX_get_clock() - time;
491                                 SIMIX_cond_wait_timeout(task->simdata->cond, task->simdata->mutex,
492                                                 max_duration-time_elapsed);
493                                 if ((task->simdata->comm != NULL) &&
494                                                 (SIMIX_action_get_state(task->simdata->comm) != SURF_ACTION_RUNNING))
495                                         break;
496                         }
497     } CATCH(e) {
498       if(e.category==timeout_error) {
499         xbt_ex_free(e);
500         /* verify if the timeout happened and the communication didn't started yet */
501         if (task->simdata->comm == NULL) {
502           process->simdata->waiting_task = NULL;
503           xbt_fifo_remove(((simdata_host_t) remote_host->simdata)->
504                           mbox[channel], task);
505           if (task->simdata->receiver) {
506             task->simdata->receiver->simdata->waiting_task = NULL;
507           }
508           task->simdata->sender = NULL;
509           SIMIX_mutex_unlock(task->simdata->mutex);
510           MSG_RETURN(MSG_TRANSFER_FAILURE);
511         }
512       } else {
513         RETHROW;
514       }
515     }
516   } else {
517                 while (1) {
518                         SIMIX_cond_wait(task->simdata->cond, task->simdata->mutex);
519                         if (SIMIX_action_get_state(task->simdata->comm) != SURF_ACTION_RUNNING)
520                                 break;
521                 }
522   }
523
524   DEBUG1("Action terminated %s", task->name);
525   process->simdata->waiting_task = NULL;
526   /* the task has already finished and the pointer must be null */
527   if (task->simdata->receiver) {
528     task->simdata->receiver->simdata->waiting_task = NULL;
529     /* action ended, set comm and compute = NULL, the actions is already destroyed in the main function */
530     //      task->simdata->comm = NULL;
531     //task->simdata->compute = NULL;
532   }
533   task->simdata->sender = NULL;
534   SIMIX_mutex_unlock(task->simdata->mutex);
535
536   if (SIMIX_action_get_state(task->simdata->comm) == SURF_ACTION_DONE) {
537     MSG_RETURN(MSG_OK);
538   } else if (SIMIX_host_get_state(local_host->simdata->s_host) == 0) {
539     MSG_RETURN(MSG_HOST_FAILURE);
540   } else {
541     MSG_RETURN(MSG_TRANSFER_FAILURE);
542   }
543 }
544
545 /** \ingroup msg_gos_functions
546  * \brief Put a task on a channel of an host and waits for the end of the
547  * transmission.
548  *
549  * This function is used for describing the behavior of an agent. It
550  * takes three parameter.
551  * \param task a #m_task_t to send on another location. This task
552    will not be usable anymore when the function will return. There is
553    no automatic task duplication and you have to save your parameters
554    before calling this function. Tasks are unique and once it has been
555    sent to another location, you should not access it anymore. You do
556    not need to call MSG_task_destroy() but to avoid using, as an
557    effect of inattention, this task anymore, you definitely should
558    renitialize it with #MSG_TASK_UNINITIALIZED. Note that this task
559    can be transfered iff it has been correctly created with
560    MSG_task_create().
561  * \param dest the destination of the message
562  * \param channel the channel on which the agent should put this
563    task. This value has to be >=0 and < than the maximal number of
564    channels fixed with MSG_set_channel_number().
565  * \return #MSG_FATAL if \a task is not properly initialized and
566  * #MSG_OK otherwise. Returns #MSG_HOST_FAILURE if the host on which
567  * this function was called was shut down. Returns
568  * #MSG_TRANSFER_FAILURE if the transfer could not be properly done
569  * (network failure, dest failure)
570  */
571 MSG_error_t MSG_task_put(m_task_t task, m_host_t dest, m_channel_t channel)
572 {
573   return MSG_task_put_with_timeout(task, dest, channel, -1.0);
574 }
575
576 /** \ingroup msg_gos_functions
577  * \brief Does exactly the same as MSG_task_put but with a bounded transmition 
578  * rate.
579  *
580  * \sa MSG_task_put
581  */
582 MSG_error_t MSG_task_put_bounded(m_task_t task,
583                                  m_host_t dest, m_channel_t channel,
584                                  double max_rate)
585 {
586   MSG_error_t res = MSG_OK;
587   task->simdata->rate = max_rate;
588   res = MSG_task_put(task, dest, channel);
589   return (res);
590 }
591
592 /** \ingroup msg_gos_functions
593  * \brief Executes a task and waits for its termination.
594  *
595  * This function is used for describing the behavior of an agent. It
596  * takes only one parameter.
597  * \param task a #m_task_t to execute on the location on which the
598    agent is running.
599  * \return #MSG_FATAL if \a task is not properly initialized and
600  * #MSG_OK otherwise.
601  */
602 MSG_error_t MSG_task_execute(m_task_t task)
603 {
604   simdata_task_t simdata = NULL;
605   m_process_t self = MSG_process_self();
606   CHECK_HOST();
607
608   simdata = task->simdata;
609   xbt_assert0((!simdata->compute) && (task->simdata->using == 1),
610               "This task is executed somewhere else. Go fix your code!");
611
612   DEBUG1("Computing on %s", MSG_process_self()->simdata->m_host->name);
613   simdata->using++;
614   SIMIX_mutex_lock(simdata->mutex);
615   simdata->compute =
616       SIMIX_action_execute(SIMIX_host_self(), task->name,
617                            simdata->computation_amount);
618   SIMIX_action_set_priority(simdata->compute, simdata->priority);
619
620   self->simdata->waiting_task = task;
621   SIMIX_register_action_to_condition(simdata->compute, simdata->cond);
622   SIMIX_cond_wait(simdata->cond, simdata->mutex);
623   SIMIX_unregister_action_to_condition(simdata->compute, simdata->cond);
624   self->simdata->waiting_task = NULL;
625
626   SIMIX_mutex_unlock(simdata->mutex);
627   simdata->using--;
628
629   if (SIMIX_action_get_state(task->simdata->compute) == SURF_ACTION_DONE) {
630     /* action ended, set comm and compute = NULL, the actions is already destroyed in the main function */
631     SIMIX_action_destroy(task->simdata->compute);
632     simdata->computation_amount = 0.0;
633     simdata->comm = NULL;
634     simdata->compute = NULL;
635     MSG_RETURN(MSG_OK);
636   } else if (SIMIX_host_get_state(SIMIX_host_self()) == 0) {
637     /* action ended, set comm and compute = NULL, the actions is already destroyed in the main function */
638     SIMIX_action_destroy(task->simdata->compute);
639     simdata->comm = NULL;
640     simdata->compute = NULL;
641     MSG_RETURN(MSG_HOST_FAILURE);
642   } else {
643     /* action ended, set comm and compute = NULL, the actions is already destroyed in the main function */
644     SIMIX_action_destroy(task->simdata->compute);
645     simdata->comm = NULL;
646     simdata->compute = NULL;
647     MSG_RETURN(MSG_TASK_CANCELLED);
648   }
649 }
650
651
652 /** \ingroup m_task_management
653  * \brief Creates a new #m_task_t (a parallel one....).
654  *
655  * A constructor for #m_task_t taking six arguments and returning the 
656    corresponding object.
657  * \param name a name for the object. It is for user-level information
658    and can be NULL.
659  * \param host_nb the number of hosts implied in the parallel task.
660  * \param host_list an array of \p host_nb m_host_t.
661  * \param computation_amount an array of \p host_nb
662    doubles. computation_amount[i] is the total number of operations
663    that have to be performed on host_list[i].
664  * \param communication_amount an array of \p host_nb* \p host_nb doubles.
665  * \param data a pointer to any data may want to attach to the new
666    object.  It is for user-level information and can be NULL. It can
667    be retrieved with the function \ref MSG_task_get_data.
668  * \see m_task_t
669  * \return The new corresponding object.
670  */
671 m_task_t MSG_parallel_task_create(const char *name,
672                                   int host_nb,
673                                   const m_host_t * host_list,
674                                   double *computation_amount,
675                                   double *communication_amount, void *data)
676 {
677   int i;
678   simdata_task_t simdata = xbt_new0(s_simdata_task_t, 1);
679   m_task_t task = xbt_new0(s_m_task_t, 1);
680   task->simdata = simdata;
681
682   /* Task structure */
683   task->name = xbt_strdup(name);
684   task->data = data;
685
686   /* Simulator Data */
687   simdata->computation_amount = 0;
688   simdata->message_size = 0;
689   simdata->cond = SIMIX_cond_init();
690   simdata->mutex = SIMIX_mutex_init();
691   simdata->compute = NULL;
692   simdata->comm = NULL;
693   simdata->rate = -1.0;
694   simdata->using = 1;
695   simdata->sender = NULL;
696   simdata->receiver = NULL;
697   simdata->source = NULL;
698
699   simdata->host_nb = host_nb;
700   simdata->host_list = xbt_new0(smx_host_t, host_nb);
701   simdata->comp_amount = computation_amount;
702   simdata->comm_amount = communication_amount;
703
704   for (i = 0; i < host_nb; i++)
705     simdata->host_list[i] = host_list[i]->simdata->s_host;
706
707   return task;
708
709 }
710
711
712 MSG_error_t MSG_parallel_task_execute(m_task_t task)
713 {
714   simdata_task_t simdata = NULL;
715   m_process_t self = MSG_process_self();
716   CHECK_HOST();
717
718   simdata = task->simdata;
719   xbt_assert0((!simdata->compute) && (task->simdata->using == 1),
720               "This task is executed somewhere else. Go fix your code!");
721
722   xbt_assert0(simdata->host_nb,
723               "This is not a parallel task. Go to hell.");
724
725   DEBUG1("Computing on %s", MSG_process_self()->simdata->m_host->name);
726   simdata->using++;
727   SIMIX_mutex_lock(simdata->mutex);
728   simdata->compute =
729       SIMIX_action_parallel_execute(task->name, simdata->host_nb,
730                                     simdata->host_list,
731                                     simdata->comp_amount,
732                                     simdata->comm_amount, 1.0, -1.0);
733
734   self->simdata->waiting_task = task;
735   SIMIX_register_action_to_condition(simdata->compute, simdata->cond);
736   SIMIX_cond_wait(simdata->cond, simdata->mutex);
737   SIMIX_unregister_action_to_condition(simdata->compute, simdata->cond);
738   self->simdata->waiting_task = NULL;
739
740
741   SIMIX_mutex_unlock(simdata->mutex);
742   simdata->using--;
743
744   if (SIMIX_action_get_state(task->simdata->compute) == SURF_ACTION_DONE) {
745     /* action ended, set comm and compute = NULL, the actions is already destroyed in the main function */
746     SIMIX_action_destroy(task->simdata->compute);
747     simdata->computation_amount = 0.0;
748     simdata->comm = NULL;
749     simdata->compute = NULL;
750     MSG_RETURN(MSG_OK);
751   } else if (SIMIX_host_get_state(SIMIX_host_self()) == 0) {
752     /* action ended, set comm and compute = NULL, the actions is already destroyed in the main function */
753     SIMIX_action_destroy(task->simdata->compute);
754     simdata->comm = NULL;
755     simdata->compute = NULL;
756     MSG_RETURN(MSG_HOST_FAILURE);
757   } else {
758     /* action ended, set comm and compute = NULL, the actions is already destroyed in the main function */
759     SIMIX_action_destroy(task->simdata->compute);
760     simdata->comm = NULL;
761     simdata->compute = NULL;
762     MSG_RETURN(MSG_TASK_CANCELLED);
763   }
764
765 }
766
767
768 /** \ingroup msg_gos_functions
769  * \brief Sleep for the specified number of seconds
770  *
771  * Makes the current process sleep until \a time seconds have elapsed.
772  *
773  * \param nb_sec a number of second
774  */
775 MSG_error_t MSG_process_sleep(double nb_sec)
776 {
777   smx_action_t act_sleep;
778   m_process_t proc = MSG_process_self();
779   smx_mutex_t mutex;
780   smx_cond_t cond;
781   /* create action to sleep */
782   act_sleep =
783       SIMIX_action_sleep(SIMIX_process_get_host(proc->simdata->s_process),
784                          nb_sec);
785
786   mutex = SIMIX_mutex_init();
787   SIMIX_mutex_lock(mutex);
788   /* create conditional and register action to it */
789   cond = SIMIX_cond_init();
790
791   SIMIX_register_action_to_condition(act_sleep, cond);
792   SIMIX_cond_wait(cond, mutex);
793   SIMIX_unregister_action_to_condition(act_sleep, cond);
794   SIMIX_mutex_unlock(mutex);
795
796   /* remove variables */
797   SIMIX_cond_destroy(cond);
798   SIMIX_mutex_destroy(mutex);
799
800   if (SIMIX_action_get_state(act_sleep) == SURF_ACTION_DONE) {
801     if (SIMIX_host_get_state(SIMIX_host_self()) == SURF_CPU_OFF) {
802       SIMIX_action_destroy(act_sleep);
803       MSG_RETURN(MSG_HOST_FAILURE);
804     }
805   } else {
806     SIMIX_action_destroy(act_sleep);
807     MSG_RETURN(MSG_HOST_FAILURE);
808   }
809
810
811   MSG_RETURN(MSG_OK);
812 }
813
814 /** \ingroup msg_gos_functions
815  * \brief Return the number of MSG tasks currently running on
816  * the host of the current running process.
817  */
818 static int MSG_get_msgload(void)
819 {
820   xbt_die("not implemented yet");
821   return 0;
822 }
823
824 /** \ingroup msg_gos_functions
825  *
826  * \brief Return the last value returned by a MSG function (except
827  * MSG_get_errno...).
828  */
829 MSG_error_t MSG_get_errno(void)
830 {
831   return PROCESS_GET_ERRNO();
832 }