Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
c1ddbe54a4b8731c338cef13dd9d5df4f17440a6
[simgrid.git] / src / msg / gos.c
1 /*     $Id$      */
2
3 /* Copyright (c) 2002-2007 Arnaud Legrand.                                  */
4 /* Copyright (c) 2007 Bruno Donassolo.                                      */
5 /* All rights reserved.                                                     */
6
7 /* This program is free software; you can redistribute it and/or modify it
8  * under the terms of the license (GNU LGPL) which comes with this package. */
9
10 #include "msg/private.h"
11 #include "xbt/sysdep.h"
12 #include "xbt/log.h"
13
14 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(msg_gos, msg,
15                                 "Logging specific to MSG (gos)");
16
17 /** \defgroup msg_gos_functions MSG Operating System Functions
18  *  \brief This section describes the functions that can be used
19  *  by an agent for handling some task.
20  */
21
22 static MSG_error_t __MSG_task_get_with_time_out_from_host(m_task_t * task,
23                                                           m_channel_t
24                                                           channel,
25                                                           double
26                                                           max_duration,
27                                                           m_host_t host)
28 {
29
30   m_process_t process = MSG_process_self();
31   m_task_t t = NULL;
32   m_host_t h = NULL;
33   simdata_task_t t_simdata = NULL;
34   simdata_host_t h_simdata = NULL;
35   int first_time = 1;
36   xbt_fifo_item_t item = NULL;
37
38   smx_cond_t cond = NULL;       //conditional wait if the task isn't on the channel yet
39
40   CHECK_HOST();
41   xbt_assert1((channel >= 0)
42               && (channel < msg_global->max_channel), "Invalid channel %d",
43               channel);
44   /* Sanity check */
45   xbt_assert0(task, "Null pointer for the task\n");
46
47   if (*task)
48     CRITICAL0
49         ("MSG_task_get() was asked to write in a non empty task struct.");
50
51   /* Get the task */
52   h = MSG_host_self();
53   h_simdata = h->simdata;
54
55   DEBUG2("Waiting for a task on channel %d (%s)", channel, h->name);
56
57   SIMIX_mutex_lock(h->simdata->mutex);
58   while (1) {
59     if (xbt_fifo_size(h_simdata->mbox[channel]) > 0) {
60       if (!host) {
61         t = xbt_fifo_shift(h_simdata->mbox[channel]);
62         break;
63       } else {
64         xbt_fifo_foreach(h->simdata->mbox[channel], item, t, m_task_t) {
65           if (t->simdata->source == host)
66             break;
67         }
68         if (item) {
69           xbt_fifo_remove_item(h->simdata->mbox[channel], item);
70           break;
71         }
72       }
73     }
74
75     if (max_duration > 0) {
76       if (!first_time) {
77         SIMIX_mutex_unlock(h->simdata->mutex);
78         h_simdata->sleeping[channel] = NULL;
79         SIMIX_cond_destroy(cond);
80         MSG_RETURN(MSG_TRANSFER_FAILURE);
81       }
82     }
83     xbt_assert1(!(h_simdata->sleeping[channel]),
84                 "A process is already blocked on channel %d", channel);
85
86     cond = SIMIX_cond_init();
87     h_simdata->sleeping[channel] = cond;
88     if (max_duration > 0) {
89       SIMIX_cond_wait_timeout(cond, h->simdata->mutex, max_duration);
90     } else
91       SIMIX_cond_wait(h_simdata->sleeping[channel], h->simdata->mutex);
92
93     if (SIMIX_host_get_state(h_simdata->s_host) == 0)
94       MSG_RETURN(MSG_HOST_FAILURE);
95
96     first_time = 0;
97   }
98   SIMIX_mutex_unlock(h->simdata->mutex);
99
100   DEBUG1("OK, got a task (%s)", t->name);
101   /* clean conditional */
102   if (cond) {
103     SIMIX_cond_destroy(cond);
104     h_simdata->sleeping[channel] = NULL;
105   }
106
107   t_simdata = t->simdata;
108   t_simdata->receiver = process;
109   *task = t;
110
111   SIMIX_mutex_lock(t_simdata->mutex);
112
113   /* Transfer */
114   t_simdata->using++;
115   /* create SIMIX action to the communication */
116   t_simdata->comm =
117       SIMIX_action_communicate(t_simdata->sender->simdata->m_host->
118                                simdata->s_host,
119                                process->simdata->m_host->simdata->s_host,
120                                t->name, t_simdata->message_size,
121                                t_simdata->rate);
122   /* if the process is suspend, create the action but stop its execution, it will be restart when the sender process resume */
123   if (MSG_process_is_suspended(t_simdata->sender)) {
124     DEBUG1("Process sender (%s) suspended", t_simdata->sender->name);
125     SIMIX_action_set_priority(t_simdata->comm, 0);
126   }
127   process->simdata->waiting_task = t;
128   SIMIX_register_action_to_condition(t_simdata->comm, t_simdata->cond);
129   SIMIX_cond_wait(t_simdata->cond, t_simdata->mutex);
130   SIMIX_unregister_action_to_condition(t_simdata->comm, t_simdata->cond);
131   process->simdata->waiting_task = NULL;
132
133   /* the task has already finished and the pointer must be null */
134   if (t->simdata->sender) {
135     t->simdata->sender->simdata->waiting_task = NULL;
136     /* action ended, set comm and compute = NULL, the actions is already destroyed in the main function */
137     //t->simdata->comm = NULL;
138     //t->simdata->compute = NULL;
139   }
140   /* for this process, don't need to change in get function */
141   t->simdata->receiver = NULL;
142   SIMIX_mutex_unlock(t_simdata->mutex);
143
144
145   if (SIMIX_action_get_state(t_simdata->comm) == SURF_ACTION_DONE) {
146     //t_simdata->comm = NULL;
147     SIMIX_action_destroy(t_simdata->comm);
148     t_simdata->comm = NULL;
149     MSG_RETURN(MSG_OK);
150   } else if (SIMIX_host_get_state(h_simdata->s_host) == 0) {
151     //t_simdata->comm = NULL;
152     SIMIX_action_destroy(t_simdata->comm);
153     t_simdata->comm = NULL;
154     MSG_RETURN(MSG_HOST_FAILURE);
155   } else {
156     //t_simdata->comm = NULL;
157     SIMIX_action_destroy(t_simdata->comm);
158     t_simdata->comm = NULL;
159     MSG_RETURN(MSG_TRANSFER_FAILURE);
160   }
161
162 }
163
164 /** \ingroup msg_gos_functions
165  * \brief Listen on a channel and wait for receiving a task.
166  *
167  * It takes two parameters.
168  * \param task a memory location for storing a #m_task_t. It will
169    hold a task when this function will return. Thus \a task should not
170    be equal to \c NULL and \a *task should be equal to \c NULL. If one of
171    those two condition does not hold, there will be a warning message.
172  * \param channel the channel on which the agent should be
173    listening. This value has to be >=0 and < than the maximal
174    number of channels fixed with MSG_set_channel_number().
175  * \return #MSG_FATAL if \a task is equal to \c NULL, #MSG_WARNING
176  * if \a *task is not equal to \c NULL, and #MSG_OK otherwise.
177  */
178 MSG_error_t MSG_task_get(m_task_t * task, m_channel_t channel)
179 {
180   return MSG_task_get_with_time_out(task, channel, -1);
181 }
182
183 /** \ingroup msg_gos_functions
184  * \brief Listen on a channel and wait for receiving a task with a timeout.
185  *
186  * It takes three parameters.
187  * \param task a memory location for storing a #m_task_t. It will
188    hold a task when this function will return. Thus \a task should not
189    be equal to \c NULL and \a *task should be equal to \c NULL. If one of
190    those two condition does not hold, there will be a warning message.
191  * \param channel the channel on which the agent should be
192    listening. This value has to be >=0 and < than the maximal
193    number of channels fixed with MSG_set_channel_number().
194  * \param max_duration the maximum time to wait for a task before giving
195     up. In such a case, #MSG_TRANSFER_FAILURE will be returned, \a task 
196     will not be modified and will still be
197     equal to \c NULL when returning. 
198  * \return #MSG_FATAL if \a task is equal to \c NULL, #MSG_WARNING
199    if \a *task is not equal to \c NULL, and #MSG_OK otherwise.
200  */
201 MSG_error_t MSG_task_get_with_time_out(m_task_t * task,
202                                        m_channel_t channel,
203                                        double max_duration)
204 {
205   return __MSG_task_get_with_time_out_from_host(task, channel,
206                                                 max_duration, NULL);
207 }
208
209 /** \ingroup msg_gos_functions
210  * \brief Listen on \a channel and waits for receiving a task from \a host.
211  *
212  * It takes three parameters.
213  * \param task a memory location for storing a #m_task_t. It will
214    hold a task when this function will return. Thus \a task should not
215    be equal to \c NULL and \a *task should be equal to \c NULL. If one of
216    those two condition does not hold, there will be a warning message.
217  * \param channel the channel on which the agent should be
218    listening. This value has to be >=0 and < than the maximal
219    number of channels fixed with MSG_set_channel_number().
220  * \param host the host that is to be watched.
221  * \return #MSG_FATAL if \a task is equal to \c NULL, #MSG_WARNING
222    if \a *task is not equal to \c NULL, and #MSG_OK otherwise.
223  */
224 MSG_error_t MSG_task_get_from_host(m_task_t * task, int channel,
225                                    m_host_t host)
226 {
227   return __MSG_task_get_with_time_out_from_host(task, channel, -1, host);
228 }
229
230 /** \ingroup msg_gos_functions
231  * \brief Test whether there is a pending communication on a channel.
232  *
233  * It takes one parameter.
234  * \param channel the channel on which the agent should be
235    listening. This value has to be >=0 and < than the maximal
236    number of channels fixed with MSG_set_channel_number().
237  * \return 1 if there is a pending communication and 0 otherwise
238  */
239 int MSG_task_Iprobe(m_channel_t channel)
240 {
241   m_host_t h = NULL;
242
243   xbt_assert1((channel >= 0)
244               && (channel < msg_global->max_channel), "Invalid channel %d",
245               channel);
246   CHECK_HOST();
247
248   DEBUG2("Probing on channel %d (%s)", channel, h->name);
249
250   h = MSG_host_self();
251   return (xbt_fifo_get_first_item(h->simdata->mbox[channel]) != NULL);
252 }
253
254 /** \ingroup msg_gos_functions
255  * \brief Test whether there is a pending communication on a channel, and who sent it.
256  *
257  * It takes one parameter.
258  * \param channel the channel on which the agent should be
259    listening. This value has to be >=0 and < than the maximal
260    number of channels fixed with MSG_set_channel_number().
261  * \return -1 if there is no pending communication and the PID of the process who sent it otherwise
262  */
263 int MSG_task_probe_from(m_channel_t channel)
264 {
265   m_host_t h = NULL;
266   xbt_fifo_item_t item;
267   m_task_t t;
268
269   xbt_assert1((channel >= 0)
270               && (channel < msg_global->max_channel), "Invalid channel %d",
271               channel);
272   CHECK_HOST();
273
274   h = MSG_host_self();
275
276   DEBUG2("Probing on channel %d (%s)", channel, h->name);
277
278   item = xbt_fifo_get_first_item(h->simdata->mbox[channel]);
279   if ((!item) || (!(t = xbt_fifo_get_item_content(item))))
280     return -1;
281
282   return MSG_process_get_PID(t->simdata->sender);
283 }
284
285 /** \ingroup msg_gos_functions
286  * \brief Wait for at most \a max_duration second for a task reception
287    on \a channel. *\a PID is updated with the PID of the first process
288    that triggered this event if any.
289  *
290  * It takes three parameters:
291  * \param channel the channel on which the agent should be
292    listening. This value has to be >=0 and < than the maximal.
293    number of channels fixed with MSG_set_channel_number().
294  * \param PID a memory location for storing an int.
295  * \param max_duration the maximum time to wait for a task before
296     giving up. In the case of a reception, *\a PID will be updated
297     with the PID of the first process to send a task.
298  * \return #MSG_HOST_FAILURE if the host is shut down in the meantime
299    and #MSG_OK otherwise.
300  */
301 MSG_error_t MSG_channel_select_from(m_channel_t channel,
302                                     double max_duration, int *PID)
303 {
304   m_host_t h = NULL;
305   simdata_host_t h_simdata = NULL;
306   xbt_fifo_item_t item;
307   m_task_t t;
308   int first_time = 1;
309   smx_cond_t cond;
310
311   xbt_assert1((channel >= 0)
312               && (channel < msg_global->max_channel), "Invalid channel %d",
313               channel);
314   if (PID) {
315     *PID = -1;
316   }
317
318   if (max_duration == 0.0) {
319     *PID = MSG_task_probe_from(channel);
320     MSG_RETURN(MSG_OK);
321   } else {
322     CHECK_HOST();
323     h = MSG_host_self();
324     h_simdata = h->simdata;
325
326     DEBUG2("Probing on channel %d (%s)", channel, h->name);
327     while (!(item = xbt_fifo_get_first_item(h->simdata->mbox[channel]))) {
328       if (max_duration > 0) {
329         if (!first_time) {
330           MSG_RETURN(MSG_OK);
331         }
332       }
333       SIMIX_mutex_lock(h_simdata->mutex);
334       xbt_assert1(!(h_simdata->sleeping[channel]),
335                   "A process is already blocked on this channel %d",
336                   channel);
337       cond = SIMIX_cond_init();
338       h_simdata->sleeping[channel] = cond;      /* I'm waiting. Wake me up when you're ready */
339       if (max_duration > 0) {
340         SIMIX_cond_wait_timeout(cond, h_simdata->mutex, max_duration);
341       } else {
342         SIMIX_cond_wait(cond, h_simdata->mutex);
343       }
344       SIMIX_cond_destroy(cond);
345       SIMIX_mutex_unlock(h_simdata->mutex);
346       if (SIMIX_host_get_state(h_simdata->s_host) == 0) {
347         MSG_RETURN(MSG_HOST_FAILURE);
348       }
349       h_simdata->sleeping[channel] = NULL;
350       first_time = 0;
351     }
352     if (!item || !(t = xbt_fifo_get_item_content(item))) {
353       MSG_RETURN(MSG_OK);
354     }
355     if (PID) {
356       *PID = MSG_process_get_PID(t->simdata->sender);
357     }
358     MSG_RETURN(MSG_OK);
359   }
360 }
361
362
363 /** \ingroup msg_gos_functions
364
365  * \brief Return the number of tasks waiting to be received on a \a
366    channel and sent by \a host.
367  *
368  * It takes two parameters.
369  * \param channel the channel on which the agent should be
370    listening. This value has to be >=0 and < than the maximal
371    number of channels fixed with MSG_set_channel_number().
372  * \param host the host that is to be watched.
373  * \return the number of tasks waiting to be received on \a channel
374    and sent by \a host.
375  */
376 int MSG_task_probe_from_host(int channel, m_host_t host)
377 {
378   xbt_fifo_item_t item;
379   m_task_t t;
380   int count = 0;
381   m_host_t h = NULL;
382
383   xbt_assert1((channel >= 0)
384               && (channel < msg_global->max_channel), "Invalid channel %d",
385               channel);
386   CHECK_HOST();
387   h = MSG_host_self();
388
389   DEBUG2("Probing on channel %d (%s)", channel, h->name);
390
391   xbt_fifo_foreach(h->simdata->mbox[channel], item, t, m_task_t) {
392     if (t->simdata->source == host)
393       count++;
394   }
395
396   return count;
397 }
398
399 /** \ingroup msg_gos_functions \brief Put a task on a channel of an
400  * host (with a timeout on the waiting of the destination host) and
401  * waits for the end of the transmission.
402  *
403  * This function is used for describing the behavior of an agent. It
404  * takes four parameter.
405  * \param task a #m_task_t to send on another location. This task
406    will not be usable anymore when the function will return. There is
407    no automatic task duplication and you have to save your parameters
408    before calling this function. Tasks are unique and once it has been
409    sent to another location, you should not access it anymore. You do
410    not need to call MSG_task_destroy() but to avoid using, as an
411    effect of inattention, this task anymore, you definitely should
412    renitialize it with #MSG_TASK_UNINITIALIZED. Note that this task
413    can be transfered iff it has been correctly created with
414    MSG_task_create().
415  * \param dest the destination of the message
416  * \param channel the channel on which the agent should put this
417    task. This value has to be >=0 and < than the maximal number of
418    channels fixed with MSG_set_channel_number().
419  * \param max_duration the maximum time to wait for a task before giving
420     up. In such a case, #MSG_TRANSFER_FAILURE will be returned, \a task 
421     will not be modified 
422  * \return #MSG_FATAL if \a task is not properly initialized and
423    #MSG_OK otherwise. Returns #MSG_HOST_FAILURE if the host on which
424    this function was called was shut down. Returns
425    #MSG_TRANSFER_FAILURE if the transfer could not be properly done
426    (network failure, dest failure, timeout...)
427  */
428 MSG_error_t MSG_task_put_with_timeout(m_task_t task, m_host_t dest,
429                                       m_channel_t channel,
430                                       double max_duration)
431 {
432
433
434   m_process_t process = MSG_process_self();
435   simdata_task_t task_simdata = NULL;
436   m_host_t local_host = NULL;
437   m_host_t remote_host = NULL;
438   CHECK_HOST();
439
440   xbt_assert1((channel >= 0)
441               && (channel < msg_global->max_channel), "Invalid channel %d",
442               channel);
443
444   task_simdata = task->simdata;
445   task_simdata->sender = process;
446   task_simdata->source = MSG_process_get_host(process);
447   xbt_assert0(task_simdata->using == 1,
448               "This task is still being used somewhere else. You cannot send it now. Go fix your code!");
449   task_simdata->comm = NULL;
450
451   local_host = ((simdata_process_t) process->simdata)->m_host;
452   remote_host = dest;
453
454   DEBUG4("Trying to send a task (%g kB) from %s to %s on channel %d",
455          task->simdata->message_size / 1000, local_host->name,
456          remote_host->name, channel);
457
458   SIMIX_mutex_lock(remote_host->simdata->mutex);
459   xbt_fifo_push(((simdata_host_t) remote_host->simdata)->
460                 mbox[channel], task);
461
462
463   if (remote_host->simdata->sleeping[channel]) {
464     DEBUG0("Somebody is listening. Let's wake him up!");
465     SIMIX_cond_signal(remote_host->simdata->sleeping[channel]);
466   }
467   SIMIX_mutex_unlock(remote_host->simdata->mutex);
468
469   process->simdata->put_host = dest;
470   process->simdata->put_channel = channel;
471   SIMIX_mutex_lock(task->simdata->mutex);
472   // DEBUG4("Task sent (%g kB) from %s to %s on channel %d, waiting...", task->simdata->message_size/1000,local_host->name, remote_host->name, channel);
473
474   process->simdata->waiting_task = task;
475   if (max_duration > 0) {
476     SIMIX_cond_wait_timeout(task->simdata->cond, task->simdata->mutex,
477                             max_duration);
478     /* verify if the timeout happened and the communication didn't started yet */
479     if (task->simdata->comm == NULL) {
480       task->simdata->using--;
481       process->simdata->waiting_task = NULL;
482       xbt_fifo_remove(((simdata_host_t) remote_host->simdata)->
483                       mbox[channel], task);
484       if (task->simdata->receiver) {
485         task->simdata->receiver->simdata->waiting_task = NULL;
486       }
487       task->simdata->sender = NULL;
488       SIMIX_mutex_unlock(task->simdata->mutex);
489       MSG_RETURN(MSG_TRANSFER_FAILURE);
490     }
491   } else {
492     SIMIX_cond_wait(task->simdata->cond, task->simdata->mutex);
493   }
494
495   DEBUG1("Action terminated %s", task->name);
496   task->simdata->using--;
497   process->simdata->waiting_task = NULL;
498   /* the task has already finished and the pointer must be null */
499   if (task->simdata->receiver) {
500     task->simdata->receiver->simdata->waiting_task = NULL;
501     /* action ended, set comm and compute = NULL, the actions is already destroyed in the main function */
502     //      task->simdata->comm = NULL;
503     //task->simdata->compute = NULL;
504   }
505   task->simdata->sender = NULL;
506   SIMIX_mutex_unlock(task->simdata->mutex);
507
508   if (SIMIX_action_get_state(task->simdata->comm) == SURF_ACTION_DONE) {
509     MSG_RETURN(MSG_OK);
510   } else if (SIMIX_host_get_state(local_host->simdata->s_host) == 0) {
511     MSG_RETURN(MSG_HOST_FAILURE);
512   } else {
513     MSG_RETURN(MSG_TRANSFER_FAILURE);
514   }
515 }
516
517 /** \ingroup msg_gos_functions
518  * \brief Put a task on a channel of an host and waits for the end of the
519  * transmission.
520  *
521  * This function is used for describing the behavior of an agent. It
522  * takes three parameter.
523  * \param task a #m_task_t to send on another location. This task
524    will not be usable anymore when the function will return. There is
525    no automatic task duplication and you have to save your parameters
526    before calling this function. Tasks are unique and once it has been
527    sent to another location, you should not access it anymore. You do
528    not need to call MSG_task_destroy() but to avoid using, as an
529    effect of inattention, this task anymore, you definitely should
530    renitialize it with #MSG_TASK_UNINITIALIZED. Note that this task
531    can be transfered iff it has been correctly created with
532    MSG_task_create().
533  * \param dest the destination of the message
534  * \param channel the channel on which the agent should put this
535    task. This value has to be >=0 and < than the maximal number of
536    channels fixed with MSG_set_channel_number().
537  * \return #MSG_FATAL if \a task is not properly initialized and
538  * #MSG_OK otherwise. Returns #MSG_HOST_FAILURE if the host on which
539  * this function was called was shut down. Returns
540  * #MSG_TRANSFER_FAILURE if the transfer could not be properly done
541  * (network failure, dest failure)
542  */
543 MSG_error_t MSG_task_put(m_task_t task, m_host_t dest, m_channel_t channel)
544 {
545   return MSG_task_put_with_timeout(task, dest, channel, -1.0);
546 }
547
548 /** \ingroup msg_gos_functions
549  * \brief Does exactly the same as MSG_task_put but with a bounded transmition 
550  * rate.
551  *
552  * \sa MSG_task_put
553  */
554 MSG_error_t MSG_task_put_bounded(m_task_t task,
555                                  m_host_t dest, m_channel_t channel,
556                                  double max_rate)
557 {
558   MSG_error_t res = MSG_OK;
559   task->simdata->rate = max_rate;
560   res = MSG_task_put(task, dest, channel);
561   return (res);
562 }
563
564 /** \ingroup msg_gos_functions
565  * \brief Executes a task and waits for its termination.
566  *
567  * This function is used for describing the behavior of an agent. It
568  * takes only one parameter.
569  * \param task a #m_task_t to execute on the location on which the
570    agent is running.
571  * \return #MSG_FATAL if \a task is not properly initialized and
572  * #MSG_OK otherwise.
573  */
574 MSG_error_t MSG_task_execute(m_task_t task)
575 {
576   simdata_task_t simdata = NULL;
577   m_process_t self = MSG_process_self();
578   CHECK_HOST();
579
580   simdata = task->simdata;
581   xbt_assert0((!simdata->compute) && (task->simdata->using == 1),
582               "This task is executed somewhere else. Go fix your code!");
583
584   DEBUG1("Computing on %s", MSG_process_self()->simdata->m_host->name);
585   simdata->using++;
586   SIMIX_mutex_lock(simdata->mutex);
587   simdata->compute =
588       SIMIX_action_execute(SIMIX_host_self(), task->name,
589                            simdata->computation_amount);
590   SIMIX_action_set_priority(simdata->compute, simdata->priority);
591
592   self->simdata->waiting_task = task;
593   SIMIX_register_action_to_condition(simdata->compute, simdata->cond);
594   SIMIX_cond_wait(simdata->cond, simdata->mutex);
595   SIMIX_unregister_action_to_condition(simdata->compute, simdata->cond);
596   self->simdata->waiting_task = NULL;
597
598   SIMIX_mutex_unlock(simdata->mutex);
599   simdata->using--;
600
601   if (SIMIX_action_get_state(task->simdata->compute) == SURF_ACTION_DONE) {
602     /* action ended, set comm and compute = NULL, the actions is already destroyed in the main function */
603     SIMIX_action_destroy(task->simdata->compute);
604     simdata->computation_amount = 0.0;
605     simdata->comm = NULL;
606     simdata->compute = NULL;
607     MSG_RETURN(MSG_OK);
608   } else if (SIMIX_host_get_state(SIMIX_host_self()) == 0) {
609     /* action ended, set comm and compute = NULL, the actions is already destroyed in the main function */
610     SIMIX_action_destroy(task->simdata->compute);
611     simdata->comm = NULL;
612     simdata->compute = NULL;
613     MSG_RETURN(MSG_HOST_FAILURE);
614   } else {
615     /* action ended, set comm and compute = NULL, the actions is already destroyed in the main function */
616     SIMIX_action_destroy(task->simdata->compute);
617     simdata->comm = NULL;
618     simdata->compute = NULL;
619     MSG_RETURN(MSG_TASK_CANCELLED);
620   }
621 }
622
623
624 /** \ingroup m_task_management
625  * \brief Creates a new #m_task_t (a parallel one....).
626  *
627  * A constructor for #m_task_t taking six arguments and returning the 
628    corresponding object.
629  * \param name a name for the object. It is for user-level information
630    and can be NULL.
631  * \param host_nb the number of hosts implied in the parallel task.
632  * \param host_list an array of \p host_nb m_host_t.
633  * \param computation_amount an array of \p host_nb
634    doubles. computation_amount[i] is the total number of operations
635    that have to be performed on host_list[i].
636  * \param communication_amount an array of \p host_nb* \p host_nb doubles.
637  * \param data a pointer to any data may want to attach to the new
638    object.  It is for user-level information and can be NULL. It can
639    be retrieved with the function \ref MSG_task_get_data.
640  * \see m_task_t
641  * \return The new corresponding object.
642  */
643 m_task_t MSG_parallel_task_create(const char *name,
644                                   int host_nb,
645                                   const m_host_t * host_list,
646                                   double *computation_amount,
647                                   double *communication_amount, void *data)
648 {
649   int i;
650   simdata_task_t simdata = xbt_new0(s_simdata_task_t, 1);
651   m_task_t task = xbt_new0(s_m_task_t, 1);
652   task->simdata = simdata;
653
654   /* Task structure */
655   task->name = xbt_strdup(name);
656   task->data = data;
657
658   /* Simulator Data */
659   simdata->computation_amount = 0;
660   simdata->message_size = 0;
661   simdata->cond = SIMIX_cond_init();
662   simdata->mutex = SIMIX_mutex_init();
663   simdata->compute = NULL;
664   simdata->comm = NULL;
665   simdata->rate = -1.0;
666   simdata->using = 1;
667   simdata->sender = NULL;
668   simdata->receiver = NULL;
669   simdata->source = NULL;
670
671   simdata->host_nb = host_nb;
672   simdata->host_list = xbt_new0(void *, host_nb);
673   simdata->comp_amount = computation_amount;
674   simdata->comm_amount = communication_amount;
675
676   for (i = 0; i < host_nb; i++)
677     simdata->host_list[i] = host_list[i]->simdata->s_host;
678
679   return task;
680
681 }
682
683
684 MSG_error_t MSG_parallel_task_execute(m_task_t task)
685 {
686   simdata_task_t simdata = NULL;
687   m_process_t self = MSG_process_self();
688   CHECK_HOST();
689
690   simdata = task->simdata;
691   xbt_assert0((!simdata->compute) && (task->simdata->using == 1),
692               "This task is executed somewhere else. Go fix your code!");
693
694   xbt_assert0(simdata->host_nb,
695               "This is not a parallel task. Go to hell.");
696
697   DEBUG1("Computing on %s", MSG_process_self()->simdata->m_host->name);
698   simdata->using++;
699   SIMIX_mutex_lock(simdata->mutex);
700   simdata->compute =
701       SIMIX_action_parallel_execute(task->name, simdata->host_nb,
702                                     simdata->host_list,
703                                     simdata->comp_amount,
704                                     simdata->comm_amount, 1.0, -1.0);
705
706   self->simdata->waiting_task = task;
707   SIMIX_register_action_to_condition(simdata->compute, simdata->cond);
708   SIMIX_cond_wait(simdata->cond, simdata->mutex);
709   SIMIX_unregister_action_to_condition(simdata->compute, simdata->cond);
710   self->simdata->waiting_task = NULL;
711
712
713   SIMIX_mutex_unlock(simdata->mutex);
714   simdata->using--;
715
716   if (SIMIX_action_get_state(task->simdata->compute) == SURF_ACTION_DONE) {
717     /* action ended, set comm and compute = NULL, the actions is already destroyed in the main function */
718     SIMIX_action_destroy(task->simdata->compute);
719     simdata->computation_amount = 0.0;
720     simdata->comm = NULL;
721     simdata->compute = NULL;
722     MSG_RETURN(MSG_OK);
723   } else if (SIMIX_host_get_state(SIMIX_host_self()) == 0) {
724     /* action ended, set comm and compute = NULL, the actions is already destroyed in the main function */
725     SIMIX_action_destroy(task->simdata->compute);
726     simdata->comm = NULL;
727     simdata->compute = NULL;
728     MSG_RETURN(MSG_HOST_FAILURE);
729   } else {
730     /* action ended, set comm and compute = NULL, the actions is already destroyed in the main function */
731     SIMIX_action_destroy(task->simdata->compute);
732     simdata->comm = NULL;
733     simdata->compute = NULL;
734     MSG_RETURN(MSG_TASK_CANCELLED);
735   }
736
737 }
738
739
740 /** \ingroup msg_gos_functions
741  * \brief Sleep for the specified number of seconds
742  *
743  * Makes the current process sleep until \a time seconds have elapsed.
744  *
745  * \param nb_sec a number of second
746  */
747 MSG_error_t MSG_process_sleep(double nb_sec)
748 {
749   smx_action_t act_sleep;
750   m_process_t proc = MSG_process_self();
751   smx_mutex_t mutex;
752   smx_cond_t cond;
753   /* create action to sleep */
754   act_sleep =
755       SIMIX_action_sleep(SIMIX_process_get_host(proc->simdata->s_process),
756                          nb_sec);
757
758   mutex = SIMIX_mutex_init();
759   SIMIX_mutex_lock(mutex);
760   /* create conditional and register action to it */
761   cond = SIMIX_cond_init();
762
763   SIMIX_register_action_to_condition(act_sleep, cond);
764   SIMIX_cond_wait(cond, mutex);
765   SIMIX_unregister_action_to_condition(act_sleep, cond);
766   SIMIX_mutex_unlock(mutex);
767
768   /* remove variables */
769   SIMIX_cond_destroy(cond);
770   SIMIX_mutex_destroy(mutex);
771
772   if (SIMIX_action_get_state(act_sleep) == SURF_ACTION_DONE) {
773     if (SIMIX_host_get_state(SIMIX_host_self()) == SURF_CPU_OFF) {
774       SIMIX_action_destroy(act_sleep);
775       MSG_RETURN(MSG_HOST_FAILURE);
776     }
777   } else {
778     SIMIX_action_destroy(act_sleep);
779     MSG_RETURN(MSG_HOST_FAILURE);
780   }
781
782
783   MSG_RETURN(MSG_OK);
784 }
785
786 /** \ingroup msg_gos_functions
787  * \brief Return the number of MSG tasks currently running on
788  * the host of the current running process.
789  */
790 static int MSG_get_msgload(void)
791 {
792   xbt_die("not implemented yet");
793   return 0;
794 }
795
796 /** \ingroup msg_gos_functions
797  *
798  * \brief Return the last value returned by a MSG function (except
799  * MSG_get_errno...).
800  */
801 MSG_error_t MSG_get_errno(void)
802 {
803   return PROCESS_GET_ERRNO();
804 }