Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
73967cdcf2e846c16626ebf9243c0df92adfcc56
[simgrid.git] / src / msg / gos.c
1 /*     $Id$      */
2
3 /* Copyright (c) 2002-2007 Arnaud Legrand.                                  */
4 /* Copyright (c) 2007 Bruno Donassolo.                                      */
5 /* All rights reserved.                                                     */
6
7 /* This program is free software; you can redistribute it and/or modify it
8  * under the terms of the license (GNU LGPL) which comes with this package. */
9
10 #include "msg/private.h"
11 #include "xbt/sysdep.h"
12 #include "xbt/log.h"
13
14 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(msg_gos, msg,
15                                 "Logging specific to MSG (gos)");
16
17 /** \defgroup msg_gos_functions MSG Operating System Functions
18  *  \brief This section describes the functions that can be used
19  *  by an agent for handling some task.
20  */
21
22 static MSG_error_t __MSG_task_get_with_time_out_from_host(m_task_t * task,
23                                                           m_channel_t
24                                                           channel,
25                                                           double
26                                                           max_duration,
27                                                           m_host_t host)
28 {
29
30   m_process_t process = MSG_process_self();
31   m_task_t t = NULL;
32   m_host_t h = NULL;
33   simdata_task_t t_simdata = NULL;
34   simdata_host_t h_simdata = NULL;
35   int first_time = 1;
36   xbt_fifo_item_t item = NULL;
37
38   smx_cond_t cond = NULL;       //conditional wait if the task isn't on the channel yet
39
40   CHECK_HOST();
41   xbt_assert1((channel >= 0)
42               && (channel < msg_global->max_channel), "Invalid channel %d",
43               channel);
44   /* Sanity check */
45   xbt_assert0(task, "Null pointer for the task\n");
46
47   if (*task)
48     CRITICAL0
49         ("MSG_task_get() was asked to write in a non empty task struct.");
50
51   /* Get the task */
52   h = MSG_host_self();
53   h_simdata = h->simdata;
54
55   DEBUG2("Waiting for a task on channel %d (%s)", channel, h->name);
56
57   SIMIX_mutex_lock(h->simdata->mutex);
58   while (1) {
59     if (xbt_fifo_size(h_simdata->mbox[channel]) > 0) {
60       if (!host) {
61         t = xbt_fifo_shift(h_simdata->mbox[channel]);
62         break;
63       } else {
64         xbt_fifo_foreach(h->simdata->mbox[channel], item, t, m_task_t) {
65           if (t->simdata->source == host)
66             break;
67         }
68         if (item) {
69           xbt_fifo_remove_item(h->simdata->mbox[channel], item);
70           break;
71         }
72       }
73     }
74
75     if (max_duration > 0) {
76       if (!first_time) {
77         SIMIX_mutex_unlock(h->simdata->mutex);
78         h_simdata->sleeping[channel] = NULL;
79         SIMIX_cond_destroy(cond);
80         MSG_RETURN(MSG_TRANSFER_FAILURE);
81       }
82     }
83     xbt_assert1(!(h_simdata->sleeping[channel]),
84                 "A process is already blocked on channel %d", channel);
85
86     cond = SIMIX_cond_init();
87     h_simdata->sleeping[channel] = cond;
88     if (max_duration > 0) {
89       SIMIX_cond_wait_timeout(cond, h->simdata->mutex, max_duration);
90     } else
91       SIMIX_cond_wait(h_simdata->sleeping[channel], h->simdata->mutex);
92
93     if (SIMIX_host_get_state(h_simdata->s_host) == 0)
94       MSG_RETURN(MSG_HOST_FAILURE);
95
96     first_time = 0;
97   }
98   SIMIX_mutex_unlock(h->simdata->mutex);
99
100   DEBUG1("OK, got a task (%s)", t->name);
101   /* clean conditional */
102   if (cond) {
103     SIMIX_cond_destroy(cond);
104     h_simdata->sleeping[channel] = NULL;
105   }
106
107   t_simdata = t->simdata;
108   t_simdata->receiver = process;
109   *task = t;
110
111   SIMIX_mutex_lock(t_simdata->mutex);
112
113   /* Transfer */
114   t_simdata->using++;
115   /* create SIMIX action to the communication */
116   t_simdata->comm =
117       SIMIX_action_communicate(t_simdata->sender->simdata->m_host->
118                                simdata->s_host,
119                                process->simdata->m_host->simdata->s_host,
120                                t->name, t_simdata->message_size,
121                                t_simdata->rate);
122   /* if the process is suspend, create the action but stop its execution, it will be restart when the sender process resume */
123   if (MSG_process_is_suspended(t_simdata->sender)) {
124     DEBUG1("Process sender (%s) suspended", t_simdata->sender->name);
125     SIMIX_action_set_priority(t_simdata->comm, 0);
126   }
127   process->simdata->waiting_task = t;
128   SIMIX_register_action_to_condition(t_simdata->comm, t_simdata->cond);
129   SIMIX_cond_wait(t_simdata->cond, t_simdata->mutex);
130   process->simdata->waiting_task = NULL;
131
132   /* the task has already finished and the pointer must be null */
133   if (t->simdata->sender) {
134     t->simdata->sender->simdata->waiting_task = NULL;
135     /* action ended, set comm and compute = NULL, the actions is already destroyed in the main function */
136     //t->simdata->comm = NULL;
137     //t->simdata->compute = NULL;
138   }
139   /* for this process, don't need to change in get function */
140   t->simdata->receiver = NULL;
141   SIMIX_mutex_unlock(t_simdata->mutex);
142
143
144   if (SIMIX_action_get_state(t_simdata->comm) == SURF_ACTION_DONE) {
145     //t_simdata->comm = NULL;
146     SIMIX_action_destroy(t_simdata->comm);
147     t_simdata->comm = NULL;
148     MSG_RETURN(MSG_OK);
149   } else if (SIMIX_host_get_state(h_simdata->s_host) == 0) {
150     //t_simdata->comm = NULL;
151     SIMIX_action_destroy(t_simdata->comm);
152     t_simdata->comm = NULL;
153     MSG_RETURN(MSG_HOST_FAILURE);
154   } else {
155     //t_simdata->comm = NULL;
156     SIMIX_action_destroy(t_simdata->comm);
157     t_simdata->comm = NULL;
158     MSG_RETURN(MSG_TRANSFER_FAILURE);
159   }
160
161 }
162
163 /** \ingroup msg_gos_functions
164  * \brief Listen on a channel and wait for receiving a task.
165  *
166  * It takes two parameters.
167  * \param task a memory location for storing a #m_task_t. It will
168    hold a task when this function will return. Thus \a task should not
169    be equal to \c NULL and \a *task should be equal to \c NULL. If one of
170    those two condition does not hold, there will be a warning message.
171  * \param channel the channel on which the agent should be
172    listening. This value has to be >=0 and < than the maximal
173    number of channels fixed with MSG_set_channel_number().
174  * \return #MSG_FATAL if \a task is equal to \c NULL, #MSG_WARNING
175  * if \a *task is not equal to \c NULL, and #MSG_OK otherwise.
176  */
177 MSG_error_t MSG_task_get(m_task_t * task, m_channel_t channel)
178 {
179   return MSG_task_get_with_time_out(task, channel, -1);
180 }
181
182 /** \ingroup msg_gos_functions
183  * \brief Listen on a channel and wait for receiving a task with a timeout.
184  *
185  * It takes three parameters.
186  * \param task a memory location for storing a #m_task_t. It will
187    hold a task when this function will return. Thus \a task should not
188    be equal to \c NULL and \a *task should be equal to \c NULL. If one of
189    those two condition does not hold, there will be a warning message.
190  * \param channel the channel on which the agent should be
191    listening. This value has to be >=0 and < than the maximal
192    number of channels fixed with MSG_set_channel_number().
193  * \param max_duration the maximum time to wait for a task before giving
194     up. In such a case, #MSG_TRANSFER_FAILURE will be returned, \a task 
195     will not be modified and will still be
196     equal to \c NULL when returning. 
197  * \return #MSG_FATAL if \a task is equal to \c NULL, #MSG_WARNING
198    if \a *task is not equal to \c NULL, and #MSG_OK otherwise.
199  */
200 MSG_error_t MSG_task_get_with_time_out(m_task_t * task,
201                                        m_channel_t channel,
202                                        double max_duration)
203 {
204   return __MSG_task_get_with_time_out_from_host(task, channel,
205                                                 max_duration, NULL);
206 }
207
208 /** \ingroup msg_gos_functions
209  * \brief Listen on \a channel and waits for receiving a task from \a host.
210  *
211  * It takes three parameters.
212  * \param task a memory location for storing a #m_task_t. It will
213    hold a task when this function will return. Thus \a task should not
214    be equal to \c NULL and \a *task should be equal to \c NULL. If one of
215    those two condition does not hold, there will be a warning message.
216  * \param channel the channel on which the agent should be
217    listening. This value has to be >=0 and < than the maximal
218    number of channels fixed with MSG_set_channel_number().
219  * \param host the host that is to be watched.
220  * \return #MSG_FATAL if \a task is equal to \c NULL, #MSG_WARNING
221    if \a *task is not equal to \c NULL, and #MSG_OK otherwise.
222  */
223 MSG_error_t MSG_task_get_from_host(m_task_t * task, int channel,
224                                    m_host_t host)
225 {
226   return __MSG_task_get_with_time_out_from_host(task, channel, -1, host);
227 }
228
229 /** \ingroup msg_gos_functions
230  * \brief Test whether there is a pending communication on a channel.
231  *
232  * It takes one parameter.
233  * \param channel the channel on which the agent should be
234    listening. This value has to be >=0 and < than the maximal
235    number of channels fixed with MSG_set_channel_number().
236  * \return 1 if there is a pending communication and 0 otherwise
237  */
238 int MSG_task_Iprobe(m_channel_t channel)
239 {
240   m_host_t h = NULL;
241
242   xbt_assert1((channel >= 0)
243               && (channel < msg_global->max_channel), "Invalid channel %d",
244               channel);
245   CHECK_HOST();
246
247   DEBUG2("Probing on channel %d (%s)", channel, h->name);
248
249   h = MSG_host_self();
250   return (xbt_fifo_get_first_item(h->simdata->mbox[channel]) != NULL);
251 }
252
253 /** \ingroup msg_gos_functions
254  * \brief Test whether there is a pending communication on a channel, and who sent it.
255  *
256  * It takes one parameter.
257  * \param channel the channel on which the agent should be
258    listening. This value has to be >=0 and < than the maximal
259    number of channels fixed with MSG_set_channel_number().
260  * \return -1 if there is no pending communication and the PID of the process who sent it otherwise
261  */
262 int MSG_task_probe_from(m_channel_t channel)
263 {
264   m_host_t h = NULL;
265   xbt_fifo_item_t item;
266   m_task_t t;
267
268   xbt_assert1((channel >= 0)
269               && (channel < msg_global->max_channel), "Invalid channel %d",
270               channel);
271   CHECK_HOST();
272
273   h = MSG_host_self();
274
275   DEBUG2("Probing on channel %d (%s)", channel, h->name);
276
277   item = xbt_fifo_get_first_item(h->simdata->mbox[channel]);
278   if ((!item) || (!(t = xbt_fifo_get_item_content(item))))
279     return -1;
280
281   return MSG_process_get_PID(t->simdata->sender);
282 }
283
284 /** \ingroup msg_gos_functions
285  * \brief Wait for at most \a max_duration second for a task reception
286    on \a channel. *\a PID is updated with the PID of the first process
287    that triggered this event if any.
288  *
289  * It takes three parameters:
290  * \param channel the channel on which the agent should be
291    listening. This value has to be >=0 and < than the maximal.
292    number of channels fixed with MSG_set_channel_number().
293  * \param PID a memory location for storing an int.
294  * \param max_duration the maximum time to wait for a task before
295     giving up. In the case of a reception, *\a PID will be updated
296     with the PID of the first process to send a task.
297  * \return #MSG_HOST_FAILURE if the host is shut down in the meantime
298    and #MSG_OK otherwise.
299  */
300 MSG_error_t MSG_channel_select_from(m_channel_t channel,
301                                     double max_duration, int *PID)
302 {
303   m_host_t h = NULL;
304   simdata_host_t h_simdata = NULL;
305   xbt_fifo_item_t item;
306   m_task_t t;
307   int first_time = 1;
308   smx_cond_t cond;
309
310   xbt_assert1((channel >= 0)
311               && (channel < msg_global->max_channel), "Invalid channel %d",
312               channel);
313   if (PID) {
314     *PID = -1;
315   }
316
317   if (max_duration == 0.0) {
318     *PID = MSG_task_probe_from(channel);
319     MSG_RETURN(MSG_OK);
320   } else {
321     CHECK_HOST();
322     h = MSG_host_self();
323     h_simdata = h->simdata;
324
325     DEBUG2("Probing on channel %d (%s)", channel, h->name);
326     while (!(item = xbt_fifo_get_first_item(h->simdata->mbox[channel]))) {
327       if (max_duration > 0) {
328         if (!first_time) {
329           MSG_RETURN(MSG_OK);
330         }
331       }
332       SIMIX_mutex_lock(h_simdata->mutex);
333       xbt_assert1(!(h_simdata->sleeping[channel]),
334                   "A process is already blocked on this channel %d",
335                   channel);
336       cond = SIMIX_cond_init();
337       h_simdata->sleeping[channel] = cond;      /* I'm waiting. Wake me up when you're ready */
338       if (max_duration > 0) {
339         SIMIX_cond_wait_timeout(cond, h_simdata->mutex, max_duration);
340       } else {
341         SIMIX_cond_wait(cond, h_simdata->mutex);
342       }
343       SIMIX_cond_destroy(cond);
344       SIMIX_mutex_unlock(h_simdata->mutex);
345       if (SIMIX_host_get_state(h_simdata->s_host) == 0) {
346         MSG_RETURN(MSG_HOST_FAILURE);
347       }
348       h_simdata->sleeping[channel] = NULL;
349       first_time = 0;
350     }
351     if (!item || !(t = xbt_fifo_get_item_content(item))) {
352       MSG_RETURN(MSG_OK);
353     }
354     if (PID) {
355       *PID = MSG_process_get_PID(t->simdata->sender);
356     }
357     MSG_RETURN(MSG_OK);
358   }
359 }
360
361
362 /** \ingroup msg_gos_functions
363
364  * \brief Return the number of tasks waiting to be received on a \a
365    channel and sent by \a host.
366  *
367  * It takes two parameters.
368  * \param channel the channel on which the agent should be
369    listening. This value has to be >=0 and < than the maximal
370    number of channels fixed with MSG_set_channel_number().
371  * \param host the host that is to be watched.
372  * \return the number of tasks waiting to be received on \a channel
373    and sent by \a host.
374  */
375 int MSG_task_probe_from_host(int channel, m_host_t host)
376 {
377   xbt_fifo_item_t item;
378   m_task_t t;
379   int count = 0;
380   m_host_t h = NULL;
381
382   xbt_assert1((channel >= 0)
383               && (channel < msg_global->max_channel), "Invalid channel %d",
384               channel);
385   CHECK_HOST();
386   h = MSG_host_self();
387
388   DEBUG2("Probing on channel %d (%s)", channel, h->name);
389
390   xbt_fifo_foreach(h->simdata->mbox[channel], item, t, m_task_t) {
391     if (t->simdata->source == host)
392       count++;
393   }
394
395   return count;
396 }
397
398 /** \ingroup msg_gos_functions \brief Put a task on a channel of an
399  * host (with a timeout on the waiting of the destination host) and
400  * waits for the end of the transmission.
401  *
402  * This function is used for describing the behavior of an agent. It
403  * takes four parameter.
404  * \param task a #m_task_t to send on another location. This task
405    will not be usable anymore when the function will return. There is
406    no automatic task duplication and you have to save your parameters
407    before calling this function. Tasks are unique and once it has been
408    sent to another location, you should not access it anymore. You do
409    not need to call MSG_task_destroy() but to avoid using, as an
410    effect of inattention, this task anymore, you definitely should
411    renitialize it with #MSG_TASK_UNINITIALIZED. Note that this task
412    can be transfered iff it has been correctly created with
413    MSG_task_create().
414  * \param dest the destination of the message
415  * \param channel the channel on which the agent should put this
416    task. This value has to be >=0 and < than the maximal number of
417    channels fixed with MSG_set_channel_number().
418  * \param max_duration the maximum time to wait for a task before giving
419     up. In such a case, #MSG_TRANSFER_FAILURE will be returned, \a task 
420     will not be modified 
421  * \return #MSG_FATAL if \a task is not properly initialized and
422    #MSG_OK otherwise. Returns #MSG_HOST_FAILURE if the host on which
423    this function was called was shut down. Returns
424    #MSG_TRANSFER_FAILURE if the transfer could not be properly done
425    (network failure, dest failure, timeout...)
426  */
427 MSG_error_t MSG_task_put_with_timeout(m_task_t task, m_host_t dest,
428                                       m_channel_t channel,
429                                       double max_duration)
430 {
431
432
433   m_process_t process = MSG_process_self();
434   simdata_task_t task_simdata = NULL;
435   m_host_t local_host = NULL;
436   m_host_t remote_host = NULL;
437   CHECK_HOST();
438
439   xbt_assert1((channel >= 0)
440               && (channel < msg_global->max_channel), "Invalid channel %d",
441               channel);
442
443   task_simdata = task->simdata;
444   task_simdata->sender = process;
445   task_simdata->source = MSG_process_get_host(process);
446   xbt_assert0(task_simdata->using == 1,
447               "This task is still being used somewhere else. You cannot send it now. Go fix your code!");
448   task_simdata->comm = NULL;
449
450   local_host = ((simdata_process_t) process->simdata)->m_host;
451   remote_host = dest;
452
453   DEBUG4("Trying to send a task (%g kB) from %s to %s on channel %d",
454          task->simdata->message_size / 1000, local_host->name,
455          remote_host->name, channel);
456
457   SIMIX_mutex_lock(remote_host->simdata->mutex);
458   xbt_fifo_push(((simdata_host_t) remote_host->simdata)->
459                 mbox[channel], task);
460
461
462   if (remote_host->simdata->sleeping[channel]) {
463     DEBUG0("Somebody is listening. Let's wake him up!");
464     SIMIX_cond_signal(remote_host->simdata->sleeping[channel]);
465   }
466   SIMIX_mutex_unlock(remote_host->simdata->mutex);
467
468   process->simdata->put_host = dest;
469   process->simdata->put_channel = channel;
470   SIMIX_mutex_lock(task->simdata->mutex);
471   // DEBUG4("Task sent (%g kB) from %s to %s on channel %d, waiting...", task->simdata->message_size/1000,local_host->name, remote_host->name, channel);
472
473   process->simdata->waiting_task = task;
474   if (max_duration > 0) {
475     SIMIX_cond_wait_timeout(task->simdata->cond, task->simdata->mutex,
476                             max_duration);
477     /* verify if the timeout happened and the communication didn't started yet */
478     if (task->simdata->comm == NULL) {
479       task->simdata->using--;
480       process->simdata->waiting_task = NULL;
481       xbt_fifo_remove(((simdata_host_t) remote_host->simdata)->
482                       mbox[channel], task);
483       if (task->simdata->receiver) {
484         task->simdata->receiver->simdata->waiting_task = NULL;
485       }
486       task->simdata->sender = NULL;
487       SIMIX_mutex_unlock(task->simdata->mutex);
488       MSG_RETURN(MSG_TRANSFER_FAILURE);
489     }
490   } else {
491     SIMIX_cond_wait(task->simdata->cond, task->simdata->mutex);
492   }
493
494   DEBUG1("Action terminated %s", task->name);
495   task->simdata->using--;
496   process->simdata->waiting_task = NULL;
497   /* the task has already finished and the pointer must be null */
498   if (task->simdata->receiver) {
499     task->simdata->receiver->simdata->waiting_task = NULL;
500     /* action ended, set comm and compute = NULL, the actions is already destroyed in the main function */
501     //      task->simdata->comm = NULL;
502     //task->simdata->compute = NULL;
503   }
504   task->simdata->sender = NULL;
505   SIMIX_mutex_unlock(task->simdata->mutex);
506
507   if (SIMIX_action_get_state(task->simdata->comm) == SURF_ACTION_DONE) {
508     MSG_RETURN(MSG_OK);
509   } else if (SIMIX_host_get_state(local_host->simdata->s_host) == 0) {
510     MSG_RETURN(MSG_HOST_FAILURE);
511   } else {
512     MSG_RETURN(MSG_TRANSFER_FAILURE);
513   }
514 }
515
516 /** \ingroup msg_gos_functions
517  * \brief Put a task on a channel of an host and waits for the end of the
518  * transmission.
519  *
520  * This function is used for describing the behavior of an agent. It
521  * takes three parameter.
522  * \param task a #m_task_t to send on another location. This task
523    will not be usable anymore when the function will return. There is
524    no automatic task duplication and you have to save your parameters
525    before calling this function. Tasks are unique and once it has been
526    sent to another location, you should not access it anymore. You do
527    not need to call MSG_task_destroy() but to avoid using, as an
528    effect of inattention, this task anymore, you definitely should
529    renitialize it with #MSG_TASK_UNINITIALIZED. Note that this task
530    can be transfered iff it has been correctly created with
531    MSG_task_create().
532  * \param dest the destination of the message
533  * \param channel the channel on which the agent should put this
534    task. This value has to be >=0 and < than the maximal number of
535    channels fixed with MSG_set_channel_number().
536  * \return #MSG_FATAL if \a task is not properly initialized and
537  * #MSG_OK otherwise. Returns #MSG_HOST_FAILURE if the host on which
538  * this function was called was shut down. Returns
539  * #MSG_TRANSFER_FAILURE if the transfer could not be properly done
540  * (network failure, dest failure)
541  */
542 MSG_error_t MSG_task_put(m_task_t task, m_host_t dest, m_channel_t channel)
543 {
544   return MSG_task_put_with_timeout(task, dest, channel, -1.0);
545 }
546
547 /** \ingroup msg_gos_functions
548  * \brief Does exactly the same as MSG_task_put but with a bounded transmition 
549  * rate.
550  *
551  * \sa MSG_task_put
552  */
553 MSG_error_t MSG_task_put_bounded(m_task_t task,
554                                  m_host_t dest, m_channel_t channel,
555                                  double max_rate)
556 {
557   MSG_error_t res = MSG_OK;
558   task->simdata->rate = max_rate;
559   res = MSG_task_put(task, dest, channel);
560   return (res);
561 }
562
563 /** \ingroup msg_gos_functions
564  * \brief Executes a task and waits for its termination.
565  *
566  * This function is used for describing the behavior of an agent. It
567  * takes only one parameter.
568  * \param task a #m_task_t to execute on the location on which the
569    agent is running.
570  * \return #MSG_FATAL if \a task is not properly initialized and
571  * #MSG_OK otherwise.
572  */
573 MSG_error_t MSG_task_execute(m_task_t task)
574 {
575   simdata_task_t simdata = NULL;
576   m_process_t self = MSG_process_self();
577   CHECK_HOST();
578
579   simdata = task->simdata;
580   xbt_assert0((!simdata->compute) && (task->simdata->using == 1),
581               "This task is executed somewhere else. Go fix your code!");
582
583   DEBUG1("Computing on %s", MSG_process_self()->simdata->m_host->name);
584   simdata->using++;
585   SIMIX_mutex_lock(simdata->mutex);
586   simdata->compute =
587       SIMIX_action_execute(SIMIX_host_self(), task->name,
588                            simdata->computation_amount);
589   SIMIX_action_set_priority(simdata->compute, simdata->priority);
590
591   self->simdata->waiting_task = task;
592   SIMIX_register_action_to_condition(simdata->compute, simdata->cond);
593   SIMIX_cond_wait(simdata->cond, simdata->mutex);
594   self->simdata->waiting_task = NULL;
595
596   SIMIX_mutex_unlock(simdata->mutex);
597   simdata->using--;
598
599   if (SIMIX_action_get_state(task->simdata->compute) == SURF_ACTION_DONE) {
600     /* action ended, set comm and compute = NULL, the actions is already destroyed in the main function */
601     SIMIX_action_destroy(task->simdata->compute);
602     simdata->computation_amount = 0.0;
603     simdata->comm = NULL;
604     simdata->compute = NULL;
605     MSG_RETURN(MSG_OK);
606   } else if (SIMIX_host_get_state(SIMIX_host_self()) == 0) {
607     /* action ended, set comm and compute = NULL, the actions is already destroyed in the main function */
608     SIMIX_action_destroy(task->simdata->compute);
609     simdata->comm = NULL;
610     simdata->compute = NULL;
611     MSG_RETURN(MSG_HOST_FAILURE);
612   } else {
613     /* action ended, set comm and compute = NULL, the actions is already destroyed in the main function */
614     SIMIX_action_destroy(task->simdata->compute);
615     simdata->comm = NULL;
616     simdata->compute = NULL;
617     MSG_RETURN(MSG_TASK_CANCELLED);
618   }
619 }
620
621
622 /** \ingroup m_task_management
623  * \brief Creates a new #m_task_t (a parallel one....).
624  *
625  * A constructor for #m_task_t taking six arguments and returning the 
626    corresponding object.
627  * \param name a name for the object. It is for user-level information
628    and can be NULL.
629  * \param host_nb the number of hosts implied in the parallel task.
630  * \param host_list an array of \p host_nb m_host_t.
631  * \param computation_amount an array of \p host_nb
632    doubles. computation_amount[i] is the total number of operations
633    that have to be performed on host_list[i].
634  * \param communication_amount an array of \p host_nb* \p host_nb doubles.
635  * \param data a pointer to any data may want to attach to the new
636    object.  It is for user-level information and can be NULL. It can
637    be retrieved with the function \ref MSG_task_get_data.
638  * \see m_task_t
639  * \return The new corresponding object.
640  */
641 m_task_t MSG_parallel_task_create(const char *name,
642                                   int host_nb,
643                                   const m_host_t * host_list,
644                                   double *computation_amount,
645                                   double *communication_amount, void *data)
646 {
647   int i;
648   simdata_task_t simdata = xbt_new0(s_simdata_task_t, 1);
649   m_task_t task = xbt_new0(s_m_task_t, 1);
650   task->simdata = simdata;
651
652   /* Task structure */
653   task->name = xbt_strdup(name);
654   task->data = data;
655
656   /* Simulator Data */
657   simdata->computation_amount = 0;
658   simdata->message_size = 0;
659   simdata->cond = SIMIX_cond_init();
660   simdata->mutex = SIMIX_mutex_init();
661   simdata->compute = NULL;
662   simdata->comm = NULL;
663   simdata->rate = -1.0;
664   simdata->using = 1;
665   simdata->sender = NULL;
666   simdata->receiver = NULL;
667   simdata->source = NULL;
668
669   simdata->host_nb = host_nb;
670   simdata->host_list = xbt_new0(void *, host_nb);
671   simdata->comp_amount = computation_amount;
672   simdata->comm_amount = communication_amount;
673
674   for (i = 0; i < host_nb; i++)
675     simdata->host_list[i] = host_list[i]->simdata->s_host;
676
677   return task;
678
679 }
680
681
682 MSG_error_t MSG_parallel_task_execute(m_task_t task)
683 {
684   simdata_task_t simdata = NULL;
685   m_process_t self = MSG_process_self();
686   CHECK_HOST();
687
688   simdata = task->simdata;
689   xbt_assert0((!simdata->compute) && (task->simdata->using == 1),
690               "This task is executed somewhere else. Go fix your code!");
691
692   xbt_assert0(simdata->host_nb,
693               "This is not a parallel task. Go to hell.");
694
695   DEBUG1("Computing on %s", MSG_process_self()->simdata->m_host->name);
696   simdata->using++;
697   SIMIX_mutex_lock(simdata->mutex);
698   simdata->compute =
699       SIMIX_action_parallel_execute(task->name, simdata->host_nb,
700                                     simdata->host_list,
701                                     simdata->comp_amount,
702                                     simdata->comm_amount, 1.0, -1.0);
703
704   self->simdata->waiting_task = task;
705   SIMIX_register_action_to_condition(simdata->compute, simdata->cond);
706   SIMIX_cond_wait(simdata->cond, simdata->mutex);
707   self->simdata->waiting_task = NULL;
708
709
710   SIMIX_mutex_unlock(simdata->mutex);
711   simdata->using--;
712
713   if (SIMIX_action_get_state(task->simdata->compute) == SURF_ACTION_DONE) {
714     /* action ended, set comm and compute = NULL, the actions is already destroyed in the main function */
715     SIMIX_action_destroy(task->simdata->compute);
716     simdata->computation_amount = 0.0;
717     simdata->comm = NULL;
718     simdata->compute = NULL;
719     MSG_RETURN(MSG_OK);
720   } else if (SIMIX_host_get_state(SIMIX_host_self()) == 0) {
721     /* action ended, set comm and compute = NULL, the actions is already destroyed in the main function */
722     SIMIX_action_destroy(task->simdata->compute);
723     simdata->comm = NULL;
724     simdata->compute = NULL;
725     MSG_RETURN(MSG_HOST_FAILURE);
726   } else {
727     /* action ended, set comm and compute = NULL, the actions is already destroyed in the main function */
728     SIMIX_action_destroy(task->simdata->compute);
729     simdata->comm = NULL;
730     simdata->compute = NULL;
731     MSG_RETURN(MSG_TASK_CANCELLED);
732   }
733
734 }
735
736
737 /** \ingroup msg_gos_functions
738  * \brief Sleep for the specified number of seconds
739  *
740  * Makes the current process sleep until \a time seconds have elapsed.
741  *
742  * \param nb_sec a number of second
743  */
744 MSG_error_t MSG_process_sleep(double nb_sec)
745 {
746   smx_action_t act_sleep;
747   m_process_t proc = MSG_process_self();
748   smx_mutex_t mutex;
749   smx_cond_t cond;
750   /* create action to sleep */
751   act_sleep =
752       SIMIX_action_sleep(SIMIX_process_get_host(proc->simdata->s_process),
753                          nb_sec);
754
755   mutex = SIMIX_mutex_init();
756   SIMIX_mutex_lock(mutex);
757   /* create conditional and register action to it */
758   cond = SIMIX_cond_init();
759
760   SIMIX_register_action_to_condition(act_sleep, cond);
761   SIMIX_cond_wait(cond, mutex);
762   SIMIX_mutex_unlock(mutex);
763
764   /* remove variables */
765   SIMIX_cond_destroy(cond);
766   SIMIX_mutex_destroy(mutex);
767
768   if (SIMIX_action_get_state(act_sleep) == SURF_ACTION_DONE) {
769     if (SIMIX_host_get_state(SIMIX_host_self()) == SURF_CPU_OFF) {
770       SIMIX_action_destroy(act_sleep);
771       MSG_RETURN(MSG_HOST_FAILURE);
772     }
773   } else {
774     SIMIX_action_destroy(act_sleep);
775     MSG_RETURN(MSG_HOST_FAILURE);
776   }
777
778
779   MSG_RETURN(MSG_OK);
780 }
781
782 /** \ingroup msg_gos_functions
783  * \brief Return the number of MSG tasks currently running on
784  * the host of the current running process.
785  */
786 static int MSG_get_msgload(void)
787 {
788   xbt_die("not implemented yet");
789   return 0;
790 }
791
792 /** \ingroup msg_gos_functions
793  *
794  * \brief Return the last value returned by a MSG function (except
795  * MSG_get_errno...).
796  */
797 MSG_error_t MSG_get_errno(void)
798 {
799   return PROCESS_GET_ERRNO();
800 }