Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
handle 0-size tasks.
[simgrid.git] / src / msg / gos.c
1 /*     $Id$      */
2
3 /* Copyright (c) 2002-2007 Arnaud Legrand.                                  */
4 /* Copyright (c) 2007 Bruno Donassolo.                                      */
5 /* All rights reserved.                                                     */
6
7 /* This program is free software; you can redistribute it and/or modify it
8  * under the terms of the license (GNU LGPL) which comes with this package. */
9
10 #include "msg/private.h"
11 #include "xbt/sysdep.h"
12 #include "xbt/log.h"
13
14 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(msg_gos, msg,
15                                 "Logging specific to MSG (gos)");
16
17 /** \defgroup msg_gos_functions MSG Operating System Functions
18  *  \brief This section describes the functions that can be used
19  *  by an agent for handling some task.
20  */
21
22 static MSG_error_t __MSG_task_get_with_time_out_from_host(m_task_t * task,
23                                                           m_channel_t
24                                                           channel,
25                                                           double
26                                                           max_duration,
27                                                           m_host_t host)
28 {
29
30   m_process_t process = MSG_process_self();
31   m_task_t t = NULL;
32   m_host_t h = NULL;
33   simdata_task_t t_simdata = NULL;
34   simdata_host_t h_simdata = NULL;
35   int first_time = 1;
36   xbt_fifo_item_t item = NULL;
37
38   smx_cond_t cond = NULL;       //conditional wait if the task isn't on the channel yet
39
40   CHECK_HOST();
41   xbt_assert1((channel >= 0)
42               && (channel < msg_global->max_channel), "Invalid channel %d",
43               channel);
44   /* Sanity check */
45   xbt_assert0(task, "Null pointer for the task\n");
46
47   if (*task)
48     CRITICAL0
49         ("MSG_task_get() was asked to write in a non empty task struct.");
50
51   /* Get the task */
52   h = MSG_host_self();
53   h_simdata = h->simdata;
54
55   DEBUG2("Waiting for a task on channel %d (%s)", channel, h->name);
56
57   SIMIX_mutex_lock(h->simdata->mutex);
58   while (1) {
59     if (xbt_fifo_size(h_simdata->mbox[channel]) > 0) {
60       if (!host) {
61         t = xbt_fifo_shift(h_simdata->mbox[channel]);
62         break;
63       } else {
64         xbt_fifo_foreach(h->simdata->mbox[channel], item, t, m_task_t) {
65           if (t->simdata->source == host)
66             break;
67         }
68         if (item) {
69           xbt_fifo_remove_item(h->simdata->mbox[channel], item);
70           break;
71         }
72       }
73     }
74
75     if (max_duration > 0) {
76       if (!first_time) {
77         SIMIX_mutex_unlock(h->simdata->mutex);
78         h_simdata->sleeping[channel] = NULL;
79         SIMIX_cond_destroy(cond);
80         MSG_RETURN(MSG_TRANSFER_FAILURE);
81       }
82     }
83     xbt_assert1(!(h_simdata->sleeping[channel]),
84                 "A process is already blocked on channel %d", channel);
85
86     cond = SIMIX_cond_init();
87     h_simdata->sleeping[channel] = cond;
88     if (max_duration > 0)
89       SIMIX_cond_wait_timeout(cond, h->simdata->mutex, max_duration);
90     else 
91       SIMIX_cond_wait(h_simdata->sleeping[channel], h->simdata->mutex);
92
93     if (SIMIX_host_get_state(h_simdata->s_host) == 0)
94       MSG_RETURN(MSG_HOST_FAILURE);
95
96     first_time = 0;
97   }
98   SIMIX_mutex_unlock(h->simdata->mutex);
99
100   DEBUG1("OK, got a task (%s)", t->name);
101   /* clean conditional */
102   if (cond) {
103     SIMIX_cond_destroy(cond);
104     h_simdata->sleeping[channel] = NULL;
105   }
106
107   t_simdata = t->simdata;
108   t_simdata->receiver = process;
109   *task = t;
110
111   SIMIX_mutex_lock(t_simdata->mutex);
112
113   /* Transfer */
114   t_simdata->using++;
115   /* create SIMIX action to the communication */
116   t_simdata->comm =
117       SIMIX_action_communicate(t_simdata->sender->simdata->m_host->
118                                simdata->s_host,
119                                process->simdata->m_host->simdata->s_host,
120                                t->name, t_simdata->message_size,
121                                t_simdata->rate);
122   /* if the process is suspend, create the action but stop its execution, it will be restart when the sender process resume */
123   if (MSG_process_is_suspended(t_simdata->sender)) {
124     DEBUG1("Process sender (%s) suspended", t_simdata->sender->name);
125     SIMIX_action_set_priority(t_simdata->comm, 0);
126   }
127   process->simdata->waiting_task = t;
128   SIMIX_register_action_to_condition(t_simdata->comm, t_simdata->cond);
129   SIMIX_cond_wait(t_simdata->cond, t_simdata->mutex);
130   SIMIX_unregister_action_to_condition(t_simdata->comm, t_simdata->cond);
131   process->simdata->waiting_task = NULL;
132
133   /* the task has already finished and the pointer must be null */
134   if (t->simdata->sender) {
135     t->simdata->sender->simdata->waiting_task = NULL;
136     /* action ended, set comm and compute = NULL, the actions is already destroyed in the main function */
137     //t->simdata->comm = NULL;
138     //t->simdata->compute = NULL;
139   }
140   /* for this process, don't need to change in get function */
141   t->simdata->receiver = NULL;
142   SIMIX_mutex_unlock(t_simdata->mutex);
143
144
145   if (SIMIX_action_get_state(t_simdata->comm) == SURF_ACTION_DONE) {
146     //t_simdata->comm = NULL;
147     SIMIX_action_destroy(t_simdata->comm);
148     t_simdata->comm = NULL;
149     MSG_RETURN(MSG_OK);
150   } else if (SIMIX_host_get_state(h_simdata->s_host) == 0) {
151     //t_simdata->comm = NULL;
152     SIMIX_action_destroy(t_simdata->comm);
153     t_simdata->comm = NULL;
154     MSG_RETURN(MSG_HOST_FAILURE);
155   } else {
156     //t_simdata->comm = NULL;
157     SIMIX_action_destroy(t_simdata->comm);
158     t_simdata->comm = NULL;
159     MSG_RETURN(MSG_TRANSFER_FAILURE);
160   }
161
162 }
163
164 /** \ingroup msg_gos_functions
165  * \brief Listen on a channel and wait for receiving a task.
166  *
167  * It takes two parameters.
168  * \param task a memory location for storing a #m_task_t. It will
169    hold a task when this function will return. Thus \a task should not
170    be equal to \c NULL and \a *task should be equal to \c NULL. If one of
171    those two condition does not hold, there will be a warning message.
172  * \param channel the channel on which the agent should be
173    listening. This value has to be >=0 and < than the maximal
174    number of channels fixed with MSG_set_channel_number().
175  * \return #MSG_FATAL if \a task is equal to \c NULL, #MSG_WARNING
176  * if \a *task is not equal to \c NULL, and #MSG_OK otherwise.
177  */
178 MSG_error_t MSG_task_get(m_task_t * task, m_channel_t channel)
179 {
180   return MSG_task_get_with_time_out(task, channel, -1);
181 }
182
183 /** \ingroup msg_gos_functions
184  * \brief Listen on a channel and wait for receiving a task with a timeout.
185  *
186  * It takes three parameters.
187  * \param task a memory location for storing a #m_task_t. It will
188    hold a task when this function will return. Thus \a task should not
189    be equal to \c NULL and \a *task should be equal to \c NULL. If one of
190    those two condition does not hold, there will be a warning message.
191  * \param channel the channel on which the agent should be
192    listening. This value has to be >=0 and < than the maximal
193    number of channels fixed with MSG_set_channel_number().
194  * \param max_duration the maximum time to wait for a task before giving
195     up. In such a case, #MSG_TRANSFER_FAILURE will be returned, \a task 
196     will not be modified and will still be
197     equal to \c NULL when returning. 
198  * \return #MSG_FATAL if \a task is equal to \c NULL, #MSG_WARNING
199    if \a *task is not equal to \c NULL, and #MSG_OK otherwise.
200  */
201 MSG_error_t MSG_task_get_with_time_out(m_task_t * task,
202                                        m_channel_t channel,
203                                        double max_duration)
204 {
205   return __MSG_task_get_with_time_out_from_host(task, channel,
206                                                 max_duration, NULL);
207 }
208
209 /** \ingroup msg_gos_functions
210  * \brief Listen on \a channel and waits for receiving a task from \a host.
211  *
212  * It takes three parameters.
213  * \param task a memory location for storing a #m_task_t. It will
214    hold a task when this function will return. Thus \a task should not
215    be equal to \c NULL and \a *task should be equal to \c NULL. If one of
216    those two condition does not hold, there will be a warning message.
217  * \param channel the channel on which the agent should be
218    listening. This value has to be >=0 and < than the maximal
219    number of channels fixed with MSG_set_channel_number().
220  * \param host the host that is to be watched.
221  * \return #MSG_FATAL if \a task is equal to \c NULL, #MSG_WARNING
222    if \a *task is not equal to \c NULL, and #MSG_OK otherwise.
223  */
224 MSG_error_t MSG_task_get_from_host(m_task_t * task, int channel,
225                                    m_host_t host)
226 {
227   return __MSG_task_get_with_time_out_from_host(task, channel, -1, host);
228 }
229
230 /** \ingroup msg_gos_functions
231  * \brief Test whether there is a pending communication on a channel.
232  *
233  * It takes one parameter.
234  * \param channel the channel on which the agent should be
235    listening. This value has to be >=0 and < than the maximal
236    number of channels fixed with MSG_set_channel_number().
237  * \return 1 if there is a pending communication and 0 otherwise
238  */
239 int MSG_task_Iprobe(m_channel_t channel)
240 {
241   m_host_t h = NULL;
242
243   xbt_assert1((channel >= 0)
244               && (channel < msg_global->max_channel), "Invalid channel %d",
245               channel);
246   CHECK_HOST();
247
248   DEBUG2("Probing on channel %d (%s)", channel, h->name);
249
250   h = MSG_host_self();
251   return (xbt_fifo_get_first_item(h->simdata->mbox[channel]) != NULL);
252 }
253
254 /** \ingroup msg_gos_functions
255  * \brief Test whether there is a pending communication on a channel, and who sent it.
256  *
257  * It takes one parameter.
258  * \param channel the channel on which the agent should be
259    listening. This value has to be >=0 and < than the maximal
260    number of channels fixed with MSG_set_channel_number().
261  * \return -1 if there is no pending communication and the PID of the process who sent it otherwise
262  */
263 int MSG_task_probe_from(m_channel_t channel)
264 {
265   m_host_t h = NULL;
266   xbt_fifo_item_t item;
267   m_task_t t;
268
269   xbt_assert1((channel >= 0)
270               && (channel < msg_global->max_channel), "Invalid channel %d",
271               channel);
272   CHECK_HOST();
273
274   h = MSG_host_self();
275
276   DEBUG2("Probing on channel %d (%s)", channel, h->name);
277
278   item = xbt_fifo_get_first_item(h->simdata->mbox[channel]);
279   if ((!item) || (!(t = xbt_fifo_get_item_content(item))))
280     return -1;
281
282   return MSG_process_get_PID(t->simdata->sender);
283 }
284
285 /** \ingroup msg_gos_functions
286  * \brief Wait for at most \a max_duration second for a task reception
287    on \a channel. *\a PID is updated with the PID of the first process
288    that triggered this event if any.
289  *
290  * It takes three parameters:
291  * \param channel the channel on which the agent should be
292    listening. This value has to be >=0 and < than the maximal.
293    number of channels fixed with MSG_set_channel_number().
294  * \param PID a memory location for storing an int.
295  * \param max_duration the maximum time to wait for a task before
296     giving up. In the case of a reception, *\a PID will be updated
297     with the PID of the first process to send a task.
298  * \return #MSG_HOST_FAILURE if the host is shut down in the meantime
299    and #MSG_OK otherwise.
300  */
301 MSG_error_t MSG_channel_select_from(m_channel_t channel,
302                                     double max_duration, int *PID)
303 {
304   m_host_t h = NULL;
305   simdata_host_t h_simdata = NULL;
306   xbt_fifo_item_t item;
307   m_task_t t;
308   int first_time = 1;
309   smx_cond_t cond;
310
311   xbt_assert1((channel >= 0)
312               && (channel < msg_global->max_channel), "Invalid channel %d",
313               channel);
314   if (PID) {
315     *PID = -1;
316   }
317
318   if (max_duration == 0.0) {
319     *PID = MSG_task_probe_from(channel);
320     MSG_RETURN(MSG_OK);
321   } else {
322     CHECK_HOST();
323     h = MSG_host_self();
324     h_simdata = h->simdata;
325
326     DEBUG2("Probing on channel %d (%s)", channel, h->name);
327     while (!(item = xbt_fifo_get_first_item(h->simdata->mbox[channel]))) {
328       if (max_duration > 0) {
329         if (!first_time) {
330           MSG_RETURN(MSG_OK);
331         }
332       }
333       SIMIX_mutex_lock(h_simdata->mutex);
334       xbt_assert1(!(h_simdata->sleeping[channel]),
335                   "A process is already blocked on this channel %d",
336                   channel);
337       cond = SIMIX_cond_init();
338       h_simdata->sleeping[channel] = cond;      /* I'm waiting. Wake me up when you're ready */
339       if (max_duration > 0) {
340         SIMIX_cond_wait_timeout(cond, h_simdata->mutex, max_duration);
341       } else {
342         SIMIX_cond_wait(cond, h_simdata->mutex);
343       }
344       SIMIX_cond_destroy(cond);
345       SIMIX_mutex_unlock(h_simdata->mutex);
346       if (SIMIX_host_get_state(h_simdata->s_host) == 0) {
347         MSG_RETURN(MSG_HOST_FAILURE);
348       }
349       h_simdata->sleeping[channel] = NULL;
350       first_time = 0;
351     }
352     if (!item || !(t = xbt_fifo_get_item_content(item))) {
353       MSG_RETURN(MSG_OK);
354     }
355     if (PID) {
356       *PID = MSG_process_get_PID(t->simdata->sender);
357     }
358     MSG_RETURN(MSG_OK);
359   }
360 }
361
362
363 /** \ingroup msg_gos_functions
364
365  * \brief Return the number of tasks waiting to be received on a \a
366    channel and sent by \a host.
367  *
368  * It takes two parameters.
369  * \param channel the channel on which the agent should be
370    listening. This value has to be >=0 and < than the maximal
371    number of channels fixed with MSG_set_channel_number().
372  * \param host the host that is to be watched.
373  * \return the number of tasks waiting to be received on \a channel
374    and sent by \a host.
375  */
376 int MSG_task_probe_from_host(int channel, m_host_t host)
377 {
378   xbt_fifo_item_t item;
379   m_task_t t;
380   int count = 0;
381   m_host_t h = NULL;
382
383   xbt_assert1((channel >= 0)
384               && (channel < msg_global->max_channel), "Invalid channel %d",
385               channel);
386   CHECK_HOST();
387   h = MSG_host_self();
388
389   DEBUG2("Probing on channel %d (%s)", channel, h->name);
390
391   xbt_fifo_foreach(h->simdata->mbox[channel], item, t, m_task_t) {
392     if (t->simdata->source == host)
393       count++;
394   }
395
396   return count;
397 }
398
399 /** \ingroup msg_gos_functions \brief Put a task on a channel of an
400  * host (with a timeout on the waiting of the destination host) and
401  * waits for the end of the transmission.
402  *
403  * This function is used for describing the behavior of an agent. It
404  * takes four parameter.
405  * \param task a #m_task_t to send on another location. This task
406    will not be usable anymore when the function will return. There is
407    no automatic task duplication and you have to save your parameters
408    before calling this function. Tasks are unique and once it has been
409    sent to another location, you should not access it anymore. You do
410    not need to call MSG_task_destroy() but to avoid using, as an
411    effect of inattention, this task anymore, you definitely should
412    renitialize it with #MSG_TASK_UNINITIALIZED. Note that this task
413    can be transfered iff it has been correctly created with
414    MSG_task_create().
415  * \param dest the destination of the message
416  * \param channel the channel on which the agent should put this
417    task. This value has to be >=0 and < than the maximal number of
418    channels fixed with MSG_set_channel_number().
419  * \param max_duration the maximum time to wait for a task before giving
420     up. In such a case, #MSG_TRANSFER_FAILURE will be returned, \a task 
421     will not be modified 
422  * \return #MSG_FATAL if \a task is not properly initialized and
423    #MSG_OK otherwise. Returns #MSG_HOST_FAILURE if the host on which
424    this function was called was shut down. Returns
425    #MSG_TRANSFER_FAILURE if the transfer could not be properly done
426    (network failure, dest failure, timeout...)
427  */
428 MSG_error_t MSG_task_put_with_timeout(m_task_t task, m_host_t dest,
429                                       m_channel_t channel,
430                                       double max_duration)
431 {
432
433
434   m_process_t process = MSG_process_self();
435   simdata_task_t task_simdata = NULL;
436   m_host_t local_host = NULL;
437   m_host_t remote_host = NULL;
438   CHECK_HOST();
439
440   xbt_assert1((channel >= 0)
441               && (channel < msg_global->max_channel), "Invalid channel %d",
442               channel);
443
444   task_simdata = task->simdata;
445   task_simdata->sender = process;
446   task_simdata->source = MSG_process_get_host(process);
447   xbt_assert0(task_simdata->using == 1,
448               "This task is still being used somewhere else. You cannot send it now. Go fix your code!");
449   task_simdata->comm = NULL;
450
451   local_host = ((simdata_process_t) process->simdata)->m_host;
452   remote_host = dest;
453
454   DEBUG4("Trying to send a task (%g kB) from %s to %s on channel %d",
455          task->simdata->message_size / 1000, local_host->name,
456          remote_host->name, channel);
457
458   SIMIX_mutex_lock(remote_host->simdata->mutex);
459   xbt_fifo_push(((simdata_host_t) remote_host->simdata)->
460                 mbox[channel], task);
461
462
463   if (remote_host->simdata->sleeping[channel]) {
464     DEBUG0("Somebody is listening. Let's wake him up!");
465     SIMIX_cond_signal(remote_host->simdata->sleeping[channel]);
466   }
467   SIMIX_mutex_unlock(remote_host->simdata->mutex);
468
469   process->simdata->put_host = dest;
470   process->simdata->put_channel = channel;
471   SIMIX_mutex_lock(task->simdata->mutex);
472   // DEBUG4("Task sent (%g kB) from %s to %s on channel %d, waiting...", task->simdata->message_size/1000,local_host->name, remote_host->name, channel);
473
474   process->simdata->waiting_task = task;
475   if (max_duration > 0) {
476     xbt_ex_t e;
477     TRY {
478       SIMIX_cond_wait_timeout(task->simdata->cond, task->simdata->mutex,
479                               max_duration);
480     } CATCH(e) {
481       if(e.category==timeout_error) {
482         xbt_ex_free(e);
483         /* verify if the timeout happened and the communication didn't started yet */
484         if (task->simdata->comm == NULL) {
485           task->simdata->using--;
486           process->simdata->waiting_task = NULL;
487           xbt_fifo_remove(((simdata_host_t) remote_host->simdata)->
488                           mbox[channel], task);
489           if (task->simdata->receiver) {
490             task->simdata->receiver->simdata->waiting_task = NULL;
491           }
492           task->simdata->sender = NULL;
493           SIMIX_mutex_unlock(task->simdata->mutex);
494           MSG_RETURN(MSG_TRANSFER_FAILURE);
495         }
496       } else {
497         RETHROW;
498       }
499     }
500   } else {
501     SIMIX_cond_wait(task->simdata->cond, task->simdata->mutex);
502   }
503
504   DEBUG1("Action terminated %s", task->name);
505   task->simdata->using--;
506   process->simdata->waiting_task = NULL;
507   /* the task has already finished and the pointer must be null */
508   if (task->simdata->receiver) {
509     task->simdata->receiver->simdata->waiting_task = NULL;
510     /* action ended, set comm and compute = NULL, the actions is already destroyed in the main function */
511     //      task->simdata->comm = NULL;
512     //task->simdata->compute = NULL;
513   }
514   task->simdata->sender = NULL;
515   SIMIX_mutex_unlock(task->simdata->mutex);
516
517   if (SIMIX_action_get_state(task->simdata->comm) == SURF_ACTION_DONE) {
518     MSG_RETURN(MSG_OK);
519   } else if (SIMIX_host_get_state(local_host->simdata->s_host) == 0) {
520     MSG_RETURN(MSG_HOST_FAILURE);
521   } else {
522     MSG_RETURN(MSG_TRANSFER_FAILURE);
523   }
524 }
525
526 /** \ingroup msg_gos_functions
527  * \brief Put a task on a channel of an host and waits for the end of the
528  * transmission.
529  *
530  * This function is used for describing the behavior of an agent. It
531  * takes three parameter.
532  * \param task a #m_task_t to send on another location. This task
533    will not be usable anymore when the function will return. There is
534    no automatic task duplication and you have to save your parameters
535    before calling this function. Tasks are unique and once it has been
536    sent to another location, you should not access it anymore. You do
537    not need to call MSG_task_destroy() but to avoid using, as an
538    effect of inattention, this task anymore, you definitely should
539    renitialize it with #MSG_TASK_UNINITIALIZED. Note that this task
540    can be transfered iff it has been correctly created with
541    MSG_task_create().
542  * \param dest the destination of the message
543  * \param channel the channel on which the agent should put this
544    task. This value has to be >=0 and < than the maximal number of
545    channels fixed with MSG_set_channel_number().
546  * \return #MSG_FATAL if \a task is not properly initialized and
547  * #MSG_OK otherwise. Returns #MSG_HOST_FAILURE if the host on which
548  * this function was called was shut down. Returns
549  * #MSG_TRANSFER_FAILURE if the transfer could not be properly done
550  * (network failure, dest failure)
551  */
552 MSG_error_t MSG_task_put(m_task_t task, m_host_t dest, m_channel_t channel)
553 {
554   return MSG_task_put_with_timeout(task, dest, channel, -1.0);
555 }
556
557 /** \ingroup msg_gos_functions
558  * \brief Does exactly the same as MSG_task_put but with a bounded transmition 
559  * rate.
560  *
561  * \sa MSG_task_put
562  */
563 MSG_error_t MSG_task_put_bounded(m_task_t task,
564                                  m_host_t dest, m_channel_t channel,
565                                  double max_rate)
566 {
567   MSG_error_t res = MSG_OK;
568   task->simdata->rate = max_rate;
569   res = MSG_task_put(task, dest, channel);
570   return (res);
571 }
572
573 /** \ingroup msg_gos_functions
574  * \brief Executes a task and waits for its termination.
575  *
576  * This function is used for describing the behavior of an agent. It
577  * takes only one parameter.
578  * \param task a #m_task_t to execute on the location on which the
579    agent is running.
580  * \return #MSG_FATAL if \a task is not properly initialized and
581  * #MSG_OK otherwise.
582  */
583 MSG_error_t MSG_task_execute(m_task_t task)
584 {
585   simdata_task_t simdata = NULL;
586   m_process_t self = MSG_process_self();
587   CHECK_HOST();
588
589   simdata = task->simdata;
590   xbt_assert0((!simdata->compute) && (task->simdata->using == 1),
591               "This task is executed somewhere else. Go fix your code!");
592
593   DEBUG1("Computing on %s", MSG_process_self()->simdata->m_host->name);
594   simdata->using++;
595   SIMIX_mutex_lock(simdata->mutex);
596   simdata->compute =
597       SIMIX_action_execute(SIMIX_host_self(), task->name,
598                            simdata->computation_amount);
599   SIMIX_action_set_priority(simdata->compute, simdata->priority);
600
601   self->simdata->waiting_task = task;
602   SIMIX_register_action_to_condition(simdata->compute, simdata->cond);
603   SIMIX_cond_wait(simdata->cond, simdata->mutex);
604   SIMIX_unregister_action_to_condition(simdata->compute, simdata->cond);
605   self->simdata->waiting_task = NULL;
606
607   SIMIX_mutex_unlock(simdata->mutex);
608   simdata->using--;
609
610   if (SIMIX_action_get_state(task->simdata->compute) == SURF_ACTION_DONE) {
611     /* action ended, set comm and compute = NULL, the actions is already destroyed in the main function */
612     SIMIX_action_destroy(task->simdata->compute);
613     simdata->computation_amount = 0.0;
614     simdata->comm = NULL;
615     simdata->compute = NULL;
616     MSG_RETURN(MSG_OK);
617   } else if (SIMIX_host_get_state(SIMIX_host_self()) == 0) {
618     /* action ended, set comm and compute = NULL, the actions is already destroyed in the main function */
619     SIMIX_action_destroy(task->simdata->compute);
620     simdata->comm = NULL;
621     simdata->compute = NULL;
622     MSG_RETURN(MSG_HOST_FAILURE);
623   } else {
624     /* action ended, set comm and compute = NULL, the actions is already destroyed in the main function */
625     SIMIX_action_destroy(task->simdata->compute);
626     simdata->comm = NULL;
627     simdata->compute = NULL;
628     MSG_RETURN(MSG_TASK_CANCELLED);
629   }
630 }
631
632
633 /** \ingroup m_task_management
634  * \brief Creates a new #m_task_t (a parallel one....).
635  *
636  * A constructor for #m_task_t taking six arguments and returning the 
637    corresponding object.
638  * \param name a name for the object. It is for user-level information
639    and can be NULL.
640  * \param host_nb the number of hosts implied in the parallel task.
641  * \param host_list an array of \p host_nb m_host_t.
642  * \param computation_amount an array of \p host_nb
643    doubles. computation_amount[i] is the total number of operations
644    that have to be performed on host_list[i].
645  * \param communication_amount an array of \p host_nb* \p host_nb doubles.
646  * \param data a pointer to any data may want to attach to the new
647    object.  It is for user-level information and can be NULL. It can
648    be retrieved with the function \ref MSG_task_get_data.
649  * \see m_task_t
650  * \return The new corresponding object.
651  */
652 m_task_t MSG_parallel_task_create(const char *name,
653                                   int host_nb,
654                                   const m_host_t * host_list,
655                                   double *computation_amount,
656                                   double *communication_amount, void *data)
657 {
658   int i;
659   simdata_task_t simdata = xbt_new0(s_simdata_task_t, 1);
660   m_task_t task = xbt_new0(s_m_task_t, 1);
661   task->simdata = simdata;
662
663   /* Task structure */
664   task->name = xbt_strdup(name);
665   task->data = data;
666
667   /* Simulator Data */
668   simdata->computation_amount = 0;
669   simdata->message_size = 0;
670   simdata->cond = SIMIX_cond_init();
671   simdata->mutex = SIMIX_mutex_init();
672   simdata->compute = NULL;
673   simdata->comm = NULL;
674   simdata->rate = -1.0;
675   simdata->using = 1;
676   simdata->sender = NULL;
677   simdata->receiver = NULL;
678   simdata->source = NULL;
679
680   simdata->host_nb = host_nb;
681   simdata->host_list = xbt_new0(void *, host_nb);
682   simdata->comp_amount = computation_amount;
683   simdata->comm_amount = communication_amount;
684
685   for (i = 0; i < host_nb; i++)
686     simdata->host_list[i] = host_list[i]->simdata->s_host;
687
688   return task;
689
690 }
691
692
693 MSG_error_t MSG_parallel_task_execute(m_task_t task)
694 {
695   simdata_task_t simdata = NULL;
696   m_process_t self = MSG_process_self();
697   CHECK_HOST();
698
699   simdata = task->simdata;
700   xbt_assert0((!simdata->compute) && (task->simdata->using == 1),
701               "This task is executed somewhere else. Go fix your code!");
702
703   xbt_assert0(simdata->host_nb,
704               "This is not a parallel task. Go to hell.");
705
706   DEBUG1("Computing on %s", MSG_process_self()->simdata->m_host->name);
707   simdata->using++;
708   SIMIX_mutex_lock(simdata->mutex);
709   simdata->compute =
710       SIMIX_action_parallel_execute(task->name, simdata->host_nb,
711                                     simdata->host_list,
712                                     simdata->comp_amount,
713                                     simdata->comm_amount, 1.0, -1.0);
714
715   self->simdata->waiting_task = task;
716   SIMIX_register_action_to_condition(simdata->compute, simdata->cond);
717   SIMIX_cond_wait(simdata->cond, simdata->mutex);
718   SIMIX_unregister_action_to_condition(simdata->compute, simdata->cond);
719   self->simdata->waiting_task = NULL;
720
721
722   SIMIX_mutex_unlock(simdata->mutex);
723   simdata->using--;
724
725   if (SIMIX_action_get_state(task->simdata->compute) == SURF_ACTION_DONE) {
726     /* action ended, set comm and compute = NULL, the actions is already destroyed in the main function */
727     SIMIX_action_destroy(task->simdata->compute);
728     simdata->computation_amount = 0.0;
729     simdata->comm = NULL;
730     simdata->compute = NULL;
731     MSG_RETURN(MSG_OK);
732   } else if (SIMIX_host_get_state(SIMIX_host_self()) == 0) {
733     /* action ended, set comm and compute = NULL, the actions is already destroyed in the main function */
734     SIMIX_action_destroy(task->simdata->compute);
735     simdata->comm = NULL;
736     simdata->compute = NULL;
737     MSG_RETURN(MSG_HOST_FAILURE);
738   } else {
739     /* action ended, set comm and compute = NULL, the actions is already destroyed in the main function */
740     SIMIX_action_destroy(task->simdata->compute);
741     simdata->comm = NULL;
742     simdata->compute = NULL;
743     MSG_RETURN(MSG_TASK_CANCELLED);
744   }
745
746 }
747
748
749 /** \ingroup msg_gos_functions
750  * \brief Sleep for the specified number of seconds
751  *
752  * Makes the current process sleep until \a time seconds have elapsed.
753  *
754  * \param nb_sec a number of second
755  */
756 MSG_error_t MSG_process_sleep(double nb_sec)
757 {
758   smx_action_t act_sleep;
759   m_process_t proc = MSG_process_self();
760   smx_mutex_t mutex;
761   smx_cond_t cond;
762   /* create action to sleep */
763   act_sleep =
764       SIMIX_action_sleep(SIMIX_process_get_host(proc->simdata->s_process),
765                          nb_sec);
766
767   mutex = SIMIX_mutex_init();
768   SIMIX_mutex_lock(mutex);
769   /* create conditional and register action to it */
770   cond = SIMIX_cond_init();
771
772   SIMIX_register_action_to_condition(act_sleep, cond);
773   SIMIX_cond_wait(cond, mutex);
774   SIMIX_unregister_action_to_condition(act_sleep, cond);
775   SIMIX_mutex_unlock(mutex);
776
777   /* remove variables */
778   SIMIX_cond_destroy(cond);
779   SIMIX_mutex_destroy(mutex);
780
781   if (SIMIX_action_get_state(act_sleep) == SURF_ACTION_DONE) {
782     if (SIMIX_host_get_state(SIMIX_host_self()) == SURF_CPU_OFF) {
783       SIMIX_action_destroy(act_sleep);
784       MSG_RETURN(MSG_HOST_FAILURE);
785     }
786   } else {
787     SIMIX_action_destroy(act_sleep);
788     MSG_RETURN(MSG_HOST_FAILURE);
789   }
790
791
792   MSG_RETURN(MSG_OK);
793 }
794
795 /** \ingroup msg_gos_functions
796  * \brief Return the number of MSG tasks currently running on
797  * the host of the current running process.
798  */
799 static int MSG_get_msgload(void)
800 {
801   xbt_die("not implemented yet");
802   return 0;
803 }
804
805 /** \ingroup msg_gos_functions
806  *
807  * \brief Return the last value returned by a MSG function (except
808  * MSG_get_errno...).
809  */
810 MSG_error_t MSG_get_errno(void)
811 {
812   return PROCESS_GET_ERRNO();
813 }