Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
8297d8f71df37184fab7f641c7630b538117fc23
[simgrid.git] / src / msg / gos.c
1 /*     $Id$      */
2
3 /* Copyright (c) 2002-2007 Arnaud Legrand.                                  */
4 /* Copyright (c) 2007 Bruno Donassolo.                                      */
5 /* All rights reserved.                                                     */
6
7 /* This program is free software; you can redistribute it and/or modify it
8  * under the terms of the license (GNU LGPL) which comes with this package. */
9
10 #include "msg/private.h"
11 #include "xbt/sysdep.h"
12 #include "xbt/log.h"
13 #include "msg_mailbox.h"
14
15
16 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(msg_gos, msg,
17                                 "Logging specific to MSG (gos)");
18
19 /** \defgroup msg_gos_functions MSG Operating System Functions
20  *  \brief This section describes the functions that can be used
21  *  by an agent for handling some task.
22  */
23
24 MSG_error_t MSG_task_get_ext(m_task_t * task,
25                              m_channel_t channel,
26                              double max_duration,
27                              m_host_t host)
28 {
29
30   m_process_t process = MSG_process_self();
31   m_task_t t = NULL;
32   m_host_t h = NULL;
33   simdata_task_t t_simdata = NULL;
34   simdata_host_t h_simdata = NULL;
35   int first_time = 1;
36   xbt_fifo_item_t item = NULL;
37
38   smx_cond_t cond = NULL;       //conditional wait if the task isn't on the channel yet
39
40   CHECK_HOST();
41   xbt_assert1((channel >= 0)
42               && (channel < msg_global->max_channel), "Invalid channel %d",
43               channel);
44   /* Sanity check */
45   xbt_assert0(task, "Null pointer for the task storage");
46
47   if (*task)
48     CRITICAL0
49         ("MSG_task_get() was asked to write in a non empty task struct.");
50
51   /* Get the task */
52   h = MSG_host_self();
53   h_simdata = h->simdata;
54
55   DEBUG2("Waiting for a task on channel %d (%s)", channel, h->name);
56
57   SIMIX_mutex_lock(h->simdata->mutex);
58   while (1) {
59     if (xbt_fifo_size(h_simdata->mbox[channel]) > 0) {
60       if (!host) {
61         t = xbt_fifo_shift(h_simdata->mbox[channel]);
62         break;
63       } else {
64         xbt_fifo_foreach(h->simdata->mbox[channel], item, t, m_task_t) {
65           if (t->simdata->source == host)
66             break;
67         }
68         if (item) {
69           xbt_fifo_remove_item(h->simdata->mbox[channel], item);
70           break;
71         }
72       }
73     }
74
75     if (max_duration > 0) {
76       if (!first_time) {
77         SIMIX_mutex_unlock(h->simdata->mutex);
78         h_simdata->sleeping[channel] = NULL;
79         SIMIX_cond_destroy(cond);
80         MSG_RETURN(MSG_TRANSFER_FAILURE);
81       }
82     }
83     xbt_assert1(!(h_simdata->sleeping[channel]),
84                 "A process is already blocked on channel %d", channel);
85
86     cond = SIMIX_cond_init();
87     h_simdata->sleeping[channel] = cond;
88     if (max_duration > 0)
89       SIMIX_cond_wait_timeout(cond, h->simdata->mutex, max_duration);
90     else 
91       SIMIX_cond_wait(h_simdata->sleeping[channel], h->simdata->mutex);
92
93     if (SIMIX_host_get_state(h_simdata->smx_host) == 0)
94       MSG_RETURN(MSG_HOST_FAILURE);
95
96     first_time = 0;
97   }
98   SIMIX_mutex_unlock(h->simdata->mutex);
99
100   DEBUG1("OK, got a task (%s)", t->name);
101   /* clean conditional */
102   if (cond) {
103     SIMIX_cond_destroy(cond);
104     h_simdata->sleeping[channel] = NULL;
105   }
106
107   t_simdata = t->simdata;
108   t_simdata->receiver = process;
109   *task = t;
110
111   SIMIX_mutex_lock(t_simdata->mutex);
112
113   /* Transfer */
114   /* create SIMIX action to the communication */
115   t_simdata->comm =
116       SIMIX_action_communicate(t_simdata->sender->simdata->m_host->
117                                simdata->smx_host,
118                                process->simdata->m_host->simdata->smx_host,
119                                t->name, t_simdata->message_size,
120                                t_simdata->rate);
121   /* if the process is suspend, create the action but stop its execution, it will be restart when the sender process resume */
122   if (MSG_process_is_suspended(t_simdata->sender)) {
123     DEBUG1("Process sender (%s) suspended", t_simdata->sender->name);
124     SIMIX_action_set_priority(t_simdata->comm, 0);
125   }
126   process->simdata->waiting_task = t;
127   SIMIX_register_action_to_condition(t_simdata->comm, t_simdata->cond);
128         while (1) {
129                 SIMIX_cond_wait(t_simdata->cond, t_simdata->mutex);
130                 if (SIMIX_action_get_state(t_simdata->comm) != SURF_ACTION_RUNNING)
131                         break;
132         }
133   SIMIX_unregister_action_to_condition(t_simdata->comm, t_simdata->cond);
134   process->simdata->waiting_task = NULL;
135
136   /* the task has already finished and the pointer must be null */
137   if (t->simdata->sender) {
138     t->simdata->sender->simdata->waiting_task = NULL;
139     /* action ended, set comm and compute = NULL, the actions is already destroyed in the main function */
140     //t->simdata->comm = NULL;
141     //t->simdata->compute = NULL;
142   }
143   /* for this process, don't need to change in get function */
144   t->simdata->receiver = NULL;
145   SIMIX_mutex_unlock(t_simdata->mutex);
146
147
148   if (SIMIX_action_get_state(t_simdata->comm) == SURF_ACTION_DONE) {
149     //t_simdata->comm = NULL;
150     SIMIX_action_destroy(t_simdata->comm);
151     t_simdata->comm = NULL;
152                 t_simdata->using--;
153     MSG_RETURN(MSG_OK);
154   } else if (SIMIX_host_get_state(h_simdata->smx_host) == 0) {
155     //t_simdata->comm = NULL;
156     SIMIX_action_destroy(t_simdata->comm);
157     t_simdata->comm = NULL;
158                 t_simdata->using--;
159     MSG_RETURN(MSG_HOST_FAILURE);
160   } else {
161     //t_simdata->comm = NULL;
162     SIMIX_action_destroy(t_simdata->comm);
163     t_simdata->comm = NULL;
164                 t_simdata->using--;
165     MSG_RETURN(MSG_TRANSFER_FAILURE);
166   }
167
168 }
169
170 /** \ingroup msg_gos_functions
171  * \brief Listen on a channel and wait for receiving a task.
172  *
173  * It takes two parameters.
174  * \param task a memory location for storing a #m_task_t. It will
175    hold a task when this function will return. Thus \a task should not
176    be equal to \c NULL and \a *task should be equal to \c NULL. If one of
177    those two condition does not hold, there will be a warning message.
178  * \param channel the channel on which the agent should be
179    listening. This value has to be >=0 and < than the maximal
180    number of channels fixed with MSG_set_channel_number().
181  * \return #MSG_FATAL if \a task is equal to \c NULL, #MSG_WARNING
182  * if \a *task is not equal to \c NULL, and #MSG_OK otherwise.
183  */
184 MSG_error_t MSG_task_get(m_task_t * task, m_channel_t channel)
185 {
186   return MSG_task_get_with_time_out(task, channel, -1);
187 }
188
189 /** \ingroup msg_gos_functions
190  * \brief Listen on a channel and wait for receiving a task with a timeout.
191  *
192  * It takes three parameters.
193  * \param task a memory location for storing a #m_task_t. It will
194    hold a task when this function will return. Thus \a task should not
195    be equal to \c NULL and \a *task should be equal to \c NULL. If one of
196    those two condition does not hold, there will be a warning message.
197  * \param channel the channel on which the agent should be
198    listening. This value has to be >=0 and < than the maximal
199    number of channels fixed with MSG_set_channel_number().
200  * \param max_duration the maximum time to wait for a task before giving
201     up. In such a case, #MSG_TRANSFER_FAILURE will be returned, \a task 
202     will not be modified and will still be
203     equal to \c NULL when returning. 
204  * \return #MSG_FATAL if \a task is equal to \c NULL, #MSG_WARNING
205    if \a *task is not equal to \c NULL, and #MSG_OK otherwise.
206  */
207 MSG_error_t MSG_task_get_with_time_out(m_task_t * task,
208                                        m_channel_t channel,
209                                        double max_duration)
210 {
211   return MSG_task_get_ext(task, channel, max_duration, NULL);
212 }
213
214 /** \ingroup msg_gos_functions
215  * \brief Listen on \a channel and waits for receiving a task from \a host.
216  *
217  * It takes three parameters.
218  * \param task a memory location for storing a #m_task_t. It will
219    hold a task when this function will return. Thus \a task should not
220    be equal to \c NULL and \a *task should be equal to \c NULL. If one of
221    those two condition does not hold, there will be a warning message.
222  * \param channel the channel on which the agent should be
223    listening. This value has to be >=0 and < than the maximal
224    number of channels fixed with MSG_set_channel_number().
225  * \param host the host that is to be watched.
226  * \return #MSG_FATAL if \a task is equal to \c NULL, #MSG_WARNING
227    if \a *task is not equal to \c NULL, and #MSG_OK otherwise.
228  */
229 MSG_error_t MSG_task_get_from_host(m_task_t * task, int channel,
230                                    m_host_t host)
231 {
232   return MSG_task_get_ext(task, channel, -1, host);
233 }
234
235 /** \ingroup msg_gos_functions
236  * \brief Test whether there is a pending communication on a channel.
237  *
238  * It takes one parameter.
239  * \param channel the channel on which the agent should be
240    listening. This value has to be >=0 and < than the maximal
241    number of channels fixed with MSG_set_channel_number().
242  * \return 1 if there is a pending communication and 0 otherwise
243  */
244 int MSG_task_Iprobe(m_channel_t channel)
245 {
246   m_host_t h = NULL;
247
248   xbt_assert1((channel >= 0)
249               && (channel < msg_global->max_channel), "Invalid channel %d",
250               channel);
251   CHECK_HOST();
252
253   DEBUG2("Probing on channel %d (%s)", channel, h->name);
254
255   h = MSG_host_self();
256   return (xbt_fifo_get_first_item(h->simdata->mbox[channel]) != NULL);
257 }
258
259 /** \ingroup msg_gos_functions
260  * \brief Test whether there is a pending communication on a channel, and who sent it.
261  *
262  * It takes one parameter.
263  * \param channel the channel on which the agent should be
264    listening. This value has to be >=0 and < than the maximal
265    number of channels fixed with MSG_set_channel_number().
266  * \return -1 if there is no pending communication and the PID of the process who sent it otherwise
267  */
268 int MSG_task_probe_from(m_channel_t channel)
269 {
270   m_host_t h = NULL;
271   xbt_fifo_item_t item;
272   m_task_t t;
273
274   xbt_assert1((channel >= 0)
275               && (channel < msg_global->max_channel), "Invalid channel %d",
276               channel);
277   CHECK_HOST();
278
279   h = MSG_host_self();
280
281   DEBUG2("Probing on channel %d (%s)", channel, h->name);
282
283   item = xbt_fifo_get_first_item(h->simdata->mbox[channel]);
284   if ((!item) || (!(t = xbt_fifo_get_item_content(item))))
285     return -1;
286
287   return MSG_process_get_PID(t->simdata->sender);
288 }
289
290 /** \ingroup msg_gos_functions
291  * \brief Wait for at most \a max_duration second for a task reception
292    on \a channel. 
293  
294  * \a PID is updated with the PID of the first process that triggered this event if any.
295  *
296  * It takes three parameters:
297  * \param channel the channel on which the agent should be
298    listening. This value has to be >=0 and < than the maximal.
299    number of channels fixed with MSG_set_channel_number().
300  * \param PID a memory location for storing an int.
301  * \param max_duration the maximum time to wait for a task before
302     giving up. In the case of a reception, *\a PID will be updated
303     with the PID of the first process to send a task.
304  * \return #MSG_HOST_FAILURE if the host is shut down in the meantime
305    and #MSG_OK otherwise.
306  */
307 MSG_error_t MSG_channel_select_from(m_channel_t channel,
308                                     double max_duration, int *PID)
309 {
310   m_host_t h = NULL;
311   simdata_host_t h_simdata = NULL;
312   xbt_fifo_item_t item;
313   m_task_t t;
314   int first_time = 1;
315   smx_cond_t cond;
316
317   xbt_assert1((channel >= 0)
318               && (channel < msg_global->max_channel), "Invalid channel %d",
319               channel);
320   if (PID) {
321     *PID = -1;
322   }
323
324   if (max_duration == 0.0) {
325     *PID = MSG_task_probe_from(channel);
326     MSG_RETURN(MSG_OK);
327   } else {
328     CHECK_HOST();
329     h = MSG_host_self();
330     h_simdata = h->simdata;
331
332     DEBUG2("Probing on channel %d (%s)", channel, h->name);
333     while (!(item = xbt_fifo_get_first_item(h->simdata->mbox[channel]))) {
334       if (max_duration > 0) {
335         if (!first_time) {
336           MSG_RETURN(MSG_OK);
337         }
338       }
339       SIMIX_mutex_lock(h_simdata->mutex);
340       xbt_assert1(!(h_simdata->sleeping[channel]),
341                   "A process is already blocked on this channel %d",
342                   channel);
343       cond = SIMIX_cond_init();
344       h_simdata->sleeping[channel] = cond;      /* I'm waiting. Wake me up when you're ready */
345       if (max_duration > 0) {
346         SIMIX_cond_wait_timeout(cond, h_simdata->mutex, max_duration);
347       } else {
348         SIMIX_cond_wait(cond, h_simdata->mutex);
349       }
350       SIMIX_cond_destroy(cond);
351       SIMIX_mutex_unlock(h_simdata->mutex);
352       if (SIMIX_host_get_state(h_simdata->smx_host) == 0) {
353         MSG_RETURN(MSG_HOST_FAILURE);
354       }
355       h_simdata->sleeping[channel] = NULL;
356       first_time = 0;
357     }
358     if (!item || !(t = xbt_fifo_get_item_content(item))) {
359       MSG_RETURN(MSG_OK);
360     }
361     if (PID) {
362       *PID = MSG_process_get_PID(t->simdata->sender);
363     }
364     MSG_RETURN(MSG_OK);
365   }
366 }
367
368
369 /** \ingroup msg_gos_functions
370
371  * \brief Return the number of tasks waiting to be received on a \a
372    channel and sent by \a host.
373  *
374  * It takes two parameters.
375  * \param channel the channel on which the agent should be
376    listening. This value has to be >=0 and < than the maximal
377    number of channels fixed with MSG_set_channel_number().
378  * \param host the host that is to be watched.
379  * \return the number of tasks waiting to be received on \a channel
380    and sent by \a host.
381  */
382 int MSG_task_probe_from_host(int channel, m_host_t host)
383 {
384   xbt_fifo_item_t item;
385   m_task_t t;
386   int count = 0;
387   m_host_t h = NULL;
388
389   xbt_assert1((channel >= 0)
390               && (channel < msg_global->max_channel), "Invalid channel %d",
391               channel);
392   CHECK_HOST();
393   h = MSG_host_self();
394
395   DEBUG2("Probing on channel %d (%s)", channel, h->name);
396
397   xbt_fifo_foreach(h->simdata->mbox[channel], item, t, m_task_t) {
398     if (t->simdata->source == host)
399       count++;
400   }
401
402   return count;
403 }
404
405 /** \ingroup msg_gos_functions \brief Put a task on a channel of an
406  * host (with a timeout on the waiting of the destination host) and
407  * waits for the end of the transmission.
408  *
409  * This function is used for describing the behavior of an agent. It
410  * takes four parameter.
411  * \param task a #m_task_t to send on another location. This task
412    will not be usable anymore when the function will return. There is
413    no automatic task duplication and you have to save your parameters
414    before calling this function. Tasks are unique and once it has been
415    sent to another location, you should not access it anymore. You do
416    not need to call MSG_task_destroy() but to avoid using, as an
417    effect of inattention, this task anymore, you definitely should
418    renitialize it with #MSG_TASK_UNINITIALIZED. Note that this task
419    can be transfered iff it has been correctly created with
420    MSG_task_create().
421  * \param dest the destination of the message
422  * \param channel the channel on which the agent should put this
423    task. This value has to be >=0 and < than the maximal number of
424    channels fixed with MSG_set_channel_number().
425  * \param max_duration the maximum time to wait for a task before giving
426     up. In such a case, #MSG_TRANSFER_FAILURE will be returned, \a task 
427     will not be modified 
428  * \return #MSG_FATAL if \a task is not properly initialized and
429    #MSG_OK otherwise. Returns #MSG_HOST_FAILURE if the host on which
430    this function was called was shut down. Returns
431    #MSG_TRANSFER_FAILURE if the transfer could not be properly done
432    (network failure, dest failure, timeout...)
433  */
434 MSG_error_t MSG_task_put_with_timeout(m_task_t task, m_host_t dest,
435                                       m_channel_t channel,
436                                       double max_duration)
437 {
438
439
440   m_process_t process = MSG_process_self();
441   simdata_task_t task_simdata = NULL;
442   m_host_t local_host = NULL;
443   m_host_t remote_host = NULL;
444   CHECK_HOST();
445
446   xbt_assert1((channel >= 0)
447               && (channel < msg_global->max_channel), "Invalid channel %d",
448               channel);
449
450   task_simdata = task->simdata;
451   task_simdata->sender = process;
452   task_simdata->source = MSG_process_get_host(process);
453   xbt_assert0(task_simdata->using == 1,
454               "This task is still being used somewhere else. You cannot send it now. Go fix your code!");
455   task_simdata->comm = NULL;
456
457         task_simdata->using++;
458   local_host = ((simdata_process_t) process->simdata)->m_host;
459   remote_host = dest;
460
461   DEBUG4("Trying to send a task (%g kB) from %s to %s on channel %d",
462          task->simdata->message_size / 1000, local_host->name,
463          remote_host->name, channel);
464
465   SIMIX_mutex_lock(remote_host->simdata->mutex);
466   xbt_fifo_push(((simdata_host_t) remote_host->simdata)->
467                 mbox[channel], task);
468
469
470   if (remote_host->simdata->sleeping[channel]) {
471     DEBUG0("Somebody is listening. Let's wake him up!");
472     SIMIX_cond_signal(remote_host->simdata->sleeping[channel]);
473   }
474   SIMIX_mutex_unlock(remote_host->simdata->mutex);
475
476   process->simdata->put_host = dest;
477   process->simdata->put_channel = channel;
478   SIMIX_mutex_lock(task->simdata->mutex);
479   // DEBUG4("Task sent (%g kB) from %s to %s on channel %d, waiting...", task->simdata->message_size/1000,local_host->name, remote_host->name, channel);
480
481   process->simdata->waiting_task = task;
482   if (max_duration > 0) {
483     xbt_ex_t e;
484                 double time;
485                 double time_elapsed;
486                 time = SIMIX_get_clock();
487     TRY {
488                 /*verify if the action that ends is the correct. Call the wait_timeout with the new time. If the timeout occurs, an exception is raised */
489                         while (1) {
490                         time_elapsed = SIMIX_get_clock() - time;
491                                 SIMIX_cond_wait_timeout(task->simdata->cond, task->simdata->mutex,
492                                                 max_duration-time_elapsed);
493                                 if ((task->simdata->comm != NULL) &&
494                                                 (SIMIX_action_get_state(task->simdata->comm) != SURF_ACTION_RUNNING))
495                                         break;
496                         }
497     } CATCH(e) {
498       if(e.category==timeout_error) {
499         xbt_ex_free(e);
500         /* verify if the timeout happened and the communication didn't started yet */
501         if (task->simdata->comm == NULL) {
502           process->simdata->waiting_task = NULL;
503           xbt_fifo_remove(((simdata_host_t) remote_host->simdata)->
504                           mbox[channel], task);
505           if (task->simdata->receiver) {
506             task->simdata->receiver->simdata->waiting_task = NULL;
507           }
508           task->simdata->sender = NULL;
509           SIMIX_mutex_unlock(task->simdata->mutex);
510           MSG_RETURN(MSG_TRANSFER_FAILURE);
511         }
512       } else {
513         RETHROW;
514       }
515     }
516   } else {
517                 while (1) {
518                         SIMIX_cond_wait(task->simdata->cond, task->simdata->mutex);
519                         if (SIMIX_action_get_state(task->simdata->comm) != SURF_ACTION_RUNNING)
520                                 break;
521                 }
522   }
523
524   DEBUG1("Action terminated %s", task->name);
525   process->simdata->waiting_task = NULL;
526   /* the task has already finished and the pointer must be null */
527   if (task->simdata->receiver) {
528     task->simdata->receiver->simdata->waiting_task = NULL;
529     /* action ended, set comm and compute = NULL, the actions is already destroyed in the main function */
530     //      task->simdata->comm = NULL;
531     //task->simdata->compute = NULL;
532   }
533   task->simdata->sender = NULL;
534   SIMIX_mutex_unlock(task->simdata->mutex);
535
536   if (SIMIX_action_get_state(task->simdata->comm) == SURF_ACTION_DONE) {
537     MSG_RETURN(MSG_OK);
538   } else if (SIMIX_host_get_state(local_host->simdata->smx_host) == 0) {
539     MSG_RETURN(MSG_HOST_FAILURE);
540   } else {
541     MSG_RETURN(MSG_TRANSFER_FAILURE);
542   }
543 }
544
545 /** \ingroup msg_gos_functions
546  * \brief Put a task on a channel of an host and waits for the end of the
547  * transmission.
548  *
549  * This function is used for describing the behavior of an agent. It
550  * takes three parameter.
551  * \param task a #m_task_t to send on another location. This task
552    will not be usable anymore when the function will return. There is
553    no automatic task duplication and you have to save your parameters
554    before calling this function. Tasks are unique and once it has been
555    sent to another location, you should not access it anymore. You do
556    not need to call MSG_task_destroy() but to avoid using, as an
557    effect of inattention, this task anymore, you definitely should
558    renitialize it with #MSG_TASK_UNINITIALIZED. Note that this task
559    can be transfered iff it has been correctly created with
560    MSG_task_create().
561  * \param dest the destination of the message
562  * \param channel the channel on which the agent should put this
563    task. This value has to be >=0 and < than the maximal number of
564    channels fixed with MSG_set_channel_number().
565  * \return #MSG_FATAL if \a task is not properly initialized and
566  * #MSG_OK otherwise. Returns #MSG_HOST_FAILURE if the host on which
567  * this function was called was shut down. Returns
568  * #MSG_TRANSFER_FAILURE if the transfer could not be properly done
569  * (network failure, dest failure)
570  */
571 MSG_error_t MSG_task_put(m_task_t task, m_host_t dest, m_channel_t channel)
572 {
573   return MSG_task_put_with_timeout(task, dest, channel, -1.0);
574 }
575
576 /** \ingroup msg_gos_functions
577  * \brief Does exactly the same as MSG_task_put but with a bounded transmition 
578  * rate.
579  *
580  * \sa MSG_task_put
581  */
582 MSG_error_t MSG_task_put_bounded(m_task_t task,
583                                  m_host_t dest, m_channel_t channel,
584                                  double max_rate)
585 {
586   MSG_error_t res = MSG_OK;
587   task->simdata->rate = max_rate;
588   res = MSG_task_put(task, dest, channel);
589   return (res);
590 }
591
592 /** \ingroup msg_gos_functions
593  * \brief Executes a task and waits for its termination.
594  *
595  * This function is used for describing the behavior of an agent. It
596  * takes only one parameter.
597  * \param task a #m_task_t to execute on the location on which the
598    agent is running.
599  * \return #MSG_FATAL if \a task is not properly initialized and
600  * #MSG_OK otherwise.
601  */
602 MSG_error_t MSG_task_execute(m_task_t task)
603 {
604   simdata_task_t simdata = NULL;
605   m_process_t self = MSG_process_self();
606   CHECK_HOST();
607
608   simdata = task->simdata;
609   xbt_assert0((!simdata->compute) && (task->simdata->using == 1),
610               "This task is executed somewhere else. Go fix your code!");
611
612   DEBUG1("Computing on %s", MSG_process_self()->simdata->m_host->name);
613   simdata->using++;
614   SIMIX_mutex_lock(simdata->mutex);
615   simdata->compute =
616       SIMIX_action_execute(SIMIX_host_self(), task->name,
617                            simdata->computation_amount);
618   SIMIX_action_set_priority(simdata->compute, simdata->priority);
619
620   self->simdata->waiting_task = task;
621   SIMIX_register_action_to_condition(simdata->compute, simdata->cond);
622   SIMIX_cond_wait(simdata->cond, simdata->mutex);
623   SIMIX_unregister_action_to_condition(simdata->compute, simdata->cond);
624   self->simdata->waiting_task = NULL;
625
626   SIMIX_mutex_unlock(simdata->mutex);
627   simdata->using--;
628
629   if (SIMIX_action_get_state(task->simdata->compute) == SURF_ACTION_DONE) {
630     /* action ended, set comm and compute = NULL, the actions is already destroyed in the main function */
631     SIMIX_action_destroy(task->simdata->compute);
632     simdata->computation_amount = 0.0;
633     simdata->comm = NULL;
634     simdata->compute = NULL;
635     MSG_RETURN(MSG_OK);
636   } else if (SIMIX_host_get_state(SIMIX_host_self()) == 0) {
637     /* action ended, set comm and compute = NULL, the actions is already destroyed in the main function */
638     SIMIX_action_destroy(task->simdata->compute);
639     simdata->comm = NULL;
640     simdata->compute = NULL;
641     MSG_RETURN(MSG_HOST_FAILURE);
642   } else {
643     /* action ended, set comm and compute = NULL, the actions is already destroyed in the main function */
644     SIMIX_action_destroy(task->simdata->compute);
645     simdata->comm = NULL;
646     simdata->compute = NULL;
647     MSG_RETURN(MSG_TASK_CANCELLED);
648   }
649 }
650
651
652 /** \ingroup m_task_management
653  * \brief Creates a new #m_task_t (a parallel one....).
654  *
655  * A constructor for #m_task_t taking six arguments and returning the 
656    corresponding object.
657  * \param name a name for the object. It is for user-level information
658    and can be NULL.
659  * \param host_nb the number of hosts implied in the parallel task.
660  * \param host_list an array of \p host_nb m_host_t.
661  * \param computation_amount an array of \p host_nb
662    doubles. computation_amount[i] is the total number of operations
663    that have to be performed on host_list[i].
664  * \param communication_amount an array of \p host_nb* \p host_nb doubles.
665  * \param data a pointer to any data may want to attach to the new
666    object.  It is for user-level information and can be NULL. It can
667    be retrieved with the function \ref MSG_task_get_data.
668  * \see m_task_t
669  * \return The new corresponding object.
670  */
671 m_task_t MSG_parallel_task_create(const char *name,
672                                   int host_nb,
673                                   const m_host_t * host_list,
674                                   double *computation_amount,
675                                   double *communication_amount, void *data)
676 {
677   int i;
678   simdata_task_t simdata = xbt_new0(s_simdata_task_t, 1);
679   m_task_t task = xbt_new0(s_m_task_t, 1);
680   task->simdata = simdata;
681
682   /* Task structure */
683   task->name = xbt_strdup(name);
684   task->data = data;
685
686   /* Simulator Data */
687   simdata->computation_amount = 0;
688   simdata->message_size = 0;
689   simdata->cond = SIMIX_cond_init();
690   simdata->mutex = SIMIX_mutex_init();
691   simdata->compute = NULL;
692   simdata->comm = NULL;
693   simdata->rate = -1.0;
694   simdata->using = 1;
695   simdata->sender = NULL;
696   simdata->receiver = NULL;
697   simdata->source = NULL;
698
699   simdata->host_nb = host_nb;
700   simdata->host_list = xbt_new0(smx_host_t, host_nb);
701   simdata->comp_amount = computation_amount;
702   simdata->comm_amount = communication_amount;
703
704   for (i = 0; i < host_nb; i++)
705     simdata->host_list[i] = host_list[i]->simdata->smx_host;
706
707   return task;
708
709 }
710
711
712 MSG_error_t MSG_parallel_task_execute(m_task_t task)
713 {
714   simdata_task_t simdata = NULL;
715   m_process_t self = MSG_process_self();
716   CHECK_HOST();
717
718   simdata = task->simdata;
719   xbt_assert0((!simdata->compute) && (task->simdata->using == 1),
720               "This task is executed somewhere else. Go fix your code!");
721
722   xbt_assert0(simdata->host_nb,
723               "This is not a parallel task. Go to hell.");
724
725   DEBUG1("Computing on %s", MSG_process_self()->simdata->m_host->name);
726   simdata->using++;
727   SIMIX_mutex_lock(simdata->mutex);
728   simdata->compute =
729       SIMIX_action_parallel_execute(task->name, simdata->host_nb,
730                                     simdata->host_list,
731                                     simdata->comp_amount,
732                                     simdata->comm_amount, 1.0, -1.0);
733
734   self->simdata->waiting_task = task;
735   SIMIX_register_action_to_condition(simdata->compute, simdata->cond);
736   SIMIX_cond_wait(simdata->cond, simdata->mutex);
737   SIMIX_unregister_action_to_condition(simdata->compute, simdata->cond);
738   self->simdata->waiting_task = NULL;
739
740
741   SIMIX_mutex_unlock(simdata->mutex);
742   simdata->using--;
743
744   if (SIMIX_action_get_state(task->simdata->compute) == SURF_ACTION_DONE) {
745     /* action ended, set comm and compute = NULL, the actions is already destroyed in the main function */
746     SIMIX_action_destroy(task->simdata->compute);
747     simdata->computation_amount = 0.0;
748     simdata->comm = NULL;
749     simdata->compute = NULL;
750     MSG_RETURN(MSG_OK);
751   } else if (SIMIX_host_get_state(SIMIX_host_self()) == 0) {
752     /* action ended, set comm and compute = NULL, the actions is already destroyed in the main function */
753     SIMIX_action_destroy(task->simdata->compute);
754     simdata->comm = NULL;
755     simdata->compute = NULL;
756     MSG_RETURN(MSG_HOST_FAILURE);
757   } else {
758     /* action ended, set comm and compute = NULL, the actions is already destroyed in the main function */
759     SIMIX_action_destroy(task->simdata->compute);
760     simdata->comm = NULL;
761     simdata->compute = NULL;
762     MSG_RETURN(MSG_TASK_CANCELLED);
763   }
764
765 }
766
767
768 /** \ingroup msg_gos_functions
769  * \brief Sleep for the specified number of seconds
770  *
771  * Makes the current process sleep until \a time seconds have elapsed.
772  *
773  * \param nb_sec a number of second
774  */
775 MSG_error_t MSG_process_sleep(double nb_sec)
776 {
777   smx_action_t act_sleep;
778   m_process_t proc = MSG_process_self();
779   smx_mutex_t mutex;
780   smx_cond_t cond;
781   /* create action to sleep */
782   act_sleep =
783       SIMIX_action_sleep(SIMIX_process_get_host(proc->simdata->s_process),
784                          nb_sec);
785
786   mutex = SIMIX_mutex_init();
787   SIMIX_mutex_lock(mutex);
788   /* create conditional and register action to it */
789   cond = SIMIX_cond_init();
790
791   SIMIX_register_action_to_condition(act_sleep, cond);
792   SIMIX_cond_wait(cond, mutex);
793   SIMIX_unregister_action_to_condition(act_sleep, cond);
794   SIMIX_mutex_unlock(mutex);
795
796   /* remove variables */
797   SIMIX_cond_destroy(cond);
798   SIMIX_mutex_destroy(mutex);
799
800   if (SIMIX_action_get_state(act_sleep) == SURF_ACTION_DONE) {
801     if (SIMIX_host_get_state(SIMIX_host_self()) == SURF_CPU_OFF) {
802       SIMIX_action_destroy(act_sleep);
803       MSG_RETURN(MSG_HOST_FAILURE);
804     }
805   } else {
806     SIMIX_action_destroy(act_sleep);
807     MSG_RETURN(MSG_HOST_FAILURE);
808   }
809
810
811   MSG_RETURN(MSG_OK);
812 }
813
814 /** \ingroup msg_gos_functions
815  * \brief Return the number of MSG tasks currently running on
816  * the host of the current running process.
817  */
818 static int MSG_get_msgload(void)
819 {
820   xbt_die("not implemented yet");
821   return 0;
822 }
823
824 /** \ingroup msg_gos_functions
825  *
826  * \brief Return the last value returned by a MSG function (except
827  * MSG_get_errno...).
828  */
829 MSG_error_t MSG_get_errno(void)
830 {
831   return PROCESS_GET_ERRNO();
832 }
833
834 MSG_error_t 
835 MSG_task_receive_ext(m_task_t* task, const char* alias, double timeout, m_host_t host)
836 {
837         m_process_t process = MSG_process_self();
838         m_task_t t = NULL;
839         m_host_t h = NULL;
840         simdata_task_t t_simdata = NULL;
841         simdata_host_t h_simdata = NULL;
842         int first_time = 1;
843         xbt_fifo_item_t item = NULL;
844         
845         smx_cond_t cond = NULL; //conditional wait if the task isn't on the channel yet
846         
847         /* get the mailbox from the alias (if the mailbox doesn't exist, create it) */
848         msg_mailbox_t mailbox = MSG_mailbox_get_by_alias(alias);
849         
850         CHECK_HOST();
851         
852         /* Sanity check */
853         xbt_assert0(task, "Null pointer for the task storage");
854         
855         if (*task)
856                 CRITICAL0("MSG_task_get() was asked to write in a non empty task struct.");
857         
858         /* Get the task */
859         h = MSG_host_self();
860         h_simdata = h->simdata;
861         
862         DEBUG2("Waiting for a task on channel aliased by %s (%s)", alias, h->name);
863         
864         SIMIX_mutex_lock(h->simdata->mutex);
865         
866         while (1) 
867         {
868                 /* if the mailbox is empty (has no task */
869                 if(!MSG_mailbox_is_empty(mailbox))
870                 {
871                         if(!host) 
872                         {
873                                 /* pop the head of the mailbox */
874                                 t = MSG_mailbox_pop_head(mailbox);
875                                 break;
876                         } 
877                         else 
878                         {
879                                 /* get the first task of the host */
880                                 if(NULL != (t = MSG_mailbox_get_first_host_task(mailbox,host)))
881                                         break;
882                         }
883                 }
884         
885                 if(timeout > 0) 
886                 {
887                         if (!first_time) 
888                         {
889                                 SIMIX_mutex_unlock(h->simdata->mutex);
890                                 /* set the simix condition of the mailbox to NULL */
891                                 MSG_mailbox_set_cond(mailbox, NULL);
892                                 SIMIX_cond_destroy(cond);
893                                 MSG_RETURN(MSG_TRANSFER_FAILURE);
894                         }
895                 }
896                 
897                 xbt_assert1(!MSG_mailbox_get_cond(mailbox),"A process is already blocked on the channel aliased by %s", alias);
898                 
899                 cond = SIMIX_cond_init();
900                 
901                 /* set the condition of the mailbox */
902                 MSG_mailbox_set_cond(mailbox, cond);
903                 
904                 if (timeout > 0)
905                         SIMIX_cond_wait_timeout(cond, h->simdata->mutex, timeout);
906                 else 
907                         SIMIX_cond_wait(MSG_mailbox_get_cond(mailbox), h->simdata->mutex);
908                         
909         
910                 if (SIMIX_host_get_state(h_simdata->smx_host) == 0)
911                         MSG_RETURN(MSG_HOST_FAILURE);
912         
913                 first_time = 0;
914         }
915         
916         SIMIX_mutex_unlock(h->simdata->mutex);
917         
918         DEBUG1("OK, got a task (%s)", t->name);
919         /* clean conditional */
920         if (cond) 
921         {
922                 SIMIX_cond_destroy(cond);
923                 
924                 MSG_mailbox_set_cond(mailbox,NULL);
925         }
926         
927         t_simdata = t->simdata;
928         t_simdata->receiver = process;
929         *task = t;
930         
931         SIMIX_mutex_lock(t_simdata->mutex);
932         
933         /* Transfer */
934         /* create SIMIX action to the communication */
935         t_simdata->comm = SIMIX_action_communicate(
936                                                                                                 t_simdata->sender->simdata->m_host->simdata->smx_host,
937                                                                                                 process->simdata->m_host->simdata->smx_host,
938                                                                                                 t->name, 
939                                                                                                 t_simdata->message_size,
940                                                                                                 t_simdata->rate
941                                                                                         );
942         
943         /* if the process is suspend, create the action but stop its execution, it will be restart when the sender process resume */
944         if (MSG_process_is_suspended(t_simdata->sender)) 
945         {
946                 DEBUG1("Process sender (%s) suspended", t_simdata->sender->name);
947                 SIMIX_action_set_priority(t_simdata->comm, 0);
948         }
949         
950         process->simdata->waiting_task = t;
951         SIMIX_register_action_to_condition(t_simdata->comm, t_simdata->cond);
952         
953         while (1) 
954         {
955                 SIMIX_cond_wait(t_simdata->cond, t_simdata->mutex);
956                 
957                 if (SIMIX_action_get_state(t_simdata->comm) != SURF_ACTION_RUNNING)
958                         break;
959         }
960         
961         SIMIX_unregister_action_to_condition(t_simdata->comm, t_simdata->cond);
962                 process->simdata->waiting_task = NULL;
963         
964         /* the task has already finished and the pointer must be null */
965         if (t->simdata->sender) 
966         {
967                 t->simdata->sender->simdata->waiting_task = NULL;
968         }
969         
970         /* for this process, don't need to change in get function */
971         t->simdata->receiver = NULL;
972         SIMIX_mutex_unlock(t_simdata->mutex);
973         
974         
975         if (SIMIX_action_get_state(t_simdata->comm) == SURF_ACTION_DONE) 
976         {
977                 SIMIX_action_destroy(t_simdata->comm);
978                 t_simdata->comm = NULL;
979                 t_simdata->using--;
980                 MSG_RETURN(MSG_OK);
981         } 
982         else if (SIMIX_host_get_state(h_simdata->smx_host) == 0) 
983         {
984                 SIMIX_action_destroy(t_simdata->comm);
985                 t_simdata->comm = NULL;
986                 t_simdata->using--;
987                 MSG_RETURN(MSG_HOST_FAILURE);
988         } 
989         else 
990         {
991                 SIMIX_action_destroy(t_simdata->comm);
992                 t_simdata->comm = NULL;
993                 t_simdata->using--;
994                 MSG_RETURN(MSG_TRANSFER_FAILURE);
995         }
996 }
997
998 MSG_error_t 
999 MSG_task_receive_with_time_out(m_task_t * task, const char* alias, double timeout)
1000 {
1001         return MSG_task_receive_ext(task, alias, timeout, NULL);                
1002 }
1003                              
1004 MSG_error_t 
1005 MSG_task_receive(m_task_t * task, const char* alias)
1006 {
1007         return MSG_task_receive_with_time_out(task, alias, -1); 
1008 }
1009
1010 int 
1011 MSG_task_listen(const char* alias)
1012 {
1013         CHECK_HOST();
1014         
1015         DEBUG2("Probing on channel aliased by %s (%s)", alias, MSG_host_self()->name);
1016         
1017         return !MSG_mailbox_is_empty(MSG_mailbox_get_by_alias(alias));
1018 }
1019
1020 int 
1021 MSG_task_listen_from_host(const char* alias, m_host_t host)
1022 {
1023
1024         return MSG_mailbox_get_count_host_tasks(MSG_mailbox_get_by_alias(alias),host);
1025                         
1026 }
1027
1028 MSG_error_t 
1029 MSG_alias_select_from(const char* alias, double timeout, int* PID)
1030 {
1031         m_host_t h = NULL;
1032         simdata_host_t h_simdata = NULL;
1033         m_task_t t;
1034         int first_time = 1;
1035         smx_cond_t cond;
1036         msg_mailbox_t mailbox;
1037         
1038         if (PID) 
1039         {
1040                 *PID = -1;
1041         }
1042         
1043         if(timeout == 0.0) 
1044         {
1045                 *PID = MSG_task_listen_from(alias);
1046                 MSG_RETURN(MSG_OK);
1047         } 
1048         else 
1049         {
1050                 CHECK_HOST();
1051                 h = MSG_host_self();
1052                 h_simdata = h->simdata;
1053         
1054                 DEBUG2("Probing on alias %s (%s)", alias, h->name);
1055                 
1056                 mailbox = MSG_mailbox_get_by_alias(alias);      
1057                 
1058                 while(MSG_mailbox_is_empty(mailbox))
1059                 {
1060                         if(timeout > 0) 
1061                         {
1062                                 if (!first_time) 
1063                                 {
1064                                         MSG_RETURN(MSG_OK);
1065                                 }
1066                         }
1067                         
1068                         SIMIX_mutex_lock(h_simdata->mutex);
1069                         
1070                         xbt_assert1(!MSG_mailbox_get_cond(mailbox),"A process is already blocked on this alias %s",alias);
1071                         
1072                         cond = SIMIX_cond_init();
1073                         
1074                         MSG_mailbox_set_cond(mailbox, cond);
1075                         
1076                         if (timeout > 0) 
1077                         {
1078                                 SIMIX_cond_wait_timeout(cond, h_simdata->mutex, timeout);
1079                         } 
1080                         else 
1081                         {
1082                                 SIMIX_cond_wait(cond, h_simdata->mutex);
1083                         }
1084                         
1085                         SIMIX_cond_destroy(cond);
1086                         SIMIX_mutex_unlock(h_simdata->mutex);
1087                         
1088                         if (SIMIX_host_get_state(h_simdata->smx_host) == 0) 
1089                         {
1090                                 MSG_RETURN(MSG_HOST_FAILURE);
1091                         }
1092                         
1093                         MSG_mailbox_set_cond(mailbox,NULL);
1094                         first_time = 0;
1095                 }
1096                 
1097                 if(NULL == (t = MSG_mailbox_get_head(mailbox)))
1098                         MSG_RETURN(MSG_OK);
1099                                 
1100                 
1101                 if (PID) 
1102                 {
1103                         *PID = MSG_process_get_PID(t->simdata->sender);
1104                 }
1105                 
1106                 MSG_RETURN(MSG_OK);
1107         }
1108 }
1109                                     
1110 MSG_error_t 
1111 MSG_task_send_with_timeout(m_task_t task, const char* alias, double timeout)
1112 {
1113         m_process_t process = MSG_process_self();
1114         const char* hostname;
1115         simdata_task_t task_simdata = NULL;
1116         m_host_t local_host = NULL;
1117         m_host_t remote_host = NULL;
1118         smx_cond_t cond = NULL;
1119         
1120         /* get the mailbox from the alias (if the mailbox doesn't exist, create it) */
1121         msg_mailbox_t mailbox = MSG_mailbox_get_by_alias(alias);
1122
1123         CHECK_HOST();
1124         
1125         task_simdata = task->simdata;
1126         task_simdata->sender = process;
1127         task_simdata->source = MSG_process_get_host(process);
1128         
1129         xbt_assert0(task_simdata->using == 1,"This task is still being used somewhere else. You cannot send it now. Go fix your code!");
1130         
1131         task_simdata->comm = NULL;
1132         
1133         task_simdata->using++;
1134         local_host = ((simdata_process_t) process->simdata)->m_host;
1135         
1136         /* get the host name containing the mailbox */
1137         hostname = MSG_mailbox_get_hostname(mailbox);
1138
1139         remote_host = MSG_get_host_by_name(hostname);
1140
1141         if(NULL == remote_host)
1142                 THROW1(not_found_error,0,"Host %s not fount", hostname);
1143
1144
1145         DEBUG4("Trying to send a task (%g kB) from %s to %s on the channel aliased by the alias %s",task->simdata->message_size / 1000, local_host->name,remote_host->name, MSG_mailbox_get_alias(mailbox));
1146         
1147         SIMIX_mutex_lock(remote_host->simdata->mutex);
1148
1149         /* put the task in the mailbox */
1150         MSG_mailbox_put(mailbox,task);
1151         
1152         if(NULL != (cond = MSG_mailbox_get_cond(mailbox)))
1153         {
1154                 DEBUG0("Somebody is listening. Let's wake him up!");
1155                 SIMIX_cond_signal(cond);
1156         }
1157
1158         
1159         
1160         SIMIX_mutex_unlock(remote_host->simdata->mutex);
1161         
1162         SIMIX_mutex_lock(task->simdata->mutex);
1163
1164         process->simdata->waiting_task = task;
1165         
1166         if(timeout > 0) 
1167         {
1168                 xbt_ex_t e;
1169                 double time;
1170                 double time_elapsed;
1171                 time = SIMIX_get_clock();
1172                 
1173                 TRY 
1174                 {
1175                         /*verify if the action that ends is the correct. Call the wait_timeout with the new time. If the timeout occurs, an exception is raised */
1176                         while (1) 
1177                         {
1178                                 time_elapsed = SIMIX_get_clock() - time;
1179                                 SIMIX_cond_wait_timeout(task->simdata->cond, task->simdata->mutex,timeout - time_elapsed);
1180                                 
1181                                 if ((task->simdata->comm != NULL) && (SIMIX_action_get_state(task->simdata->comm) != SURF_ACTION_RUNNING))
1182                                         break;
1183                         }
1184                 } 
1185                 CATCH(e) 
1186                 {
1187                         if(e.category==timeout_error) 
1188                         {
1189                                 xbt_ex_free(e);
1190                                 /* verify if the timeout happened and the communication didn't started yet */
1191                                 if (task->simdata->comm == NULL) 
1192                                 {
1193                                         process->simdata->waiting_task = NULL;
1194                                         
1195                                         /* remove the task from the mailbox */
1196                                         MSG_mailbox_remove(mailbox,task);
1197                                         
1198                                         if (task->simdata->receiver) 
1199                                         {
1200                                                 task->simdata->receiver->simdata->waiting_task = NULL;
1201                                         }
1202                                         
1203                                         task->simdata->sender = NULL;
1204                                         
1205                                         SIMIX_mutex_unlock(task->simdata->mutex);
1206                                         MSG_RETURN(MSG_TRANSFER_FAILURE);
1207                                 }
1208                         } 
1209                         else 
1210                         {
1211                                 RETHROW;
1212                         }
1213                 }
1214         } 
1215         else 
1216         {
1217                 while (1) 
1218                 {
1219                         SIMIX_cond_wait(task->simdata->cond, task->simdata->mutex);
1220                         
1221                         if (SIMIX_action_get_state(task->simdata->comm) != SURF_ACTION_RUNNING)
1222                                 break;
1223                 }
1224         }
1225         
1226         DEBUG1("Action terminated %s", task->name);
1227         process->simdata->waiting_task = NULL;
1228         
1229         /* the task has already finished and the pointer must be null */
1230         if (task->simdata->receiver) 
1231         {
1232                 task->simdata->receiver->simdata->waiting_task = NULL;
1233         }
1234         
1235         task->simdata->sender = NULL;
1236         SIMIX_mutex_unlock(task->simdata->mutex);
1237
1238         
1239         if (SIMIX_action_get_state(task->simdata->comm) == SURF_ACTION_DONE)
1240         {
1241                 MSG_RETURN(MSG_OK);
1242         } 
1243         else if (SIMIX_host_get_state(local_host->simdata->smx_host) == 0) 
1244         {
1245                 MSG_RETURN(MSG_HOST_FAILURE);
1246         } 
1247         else 
1248         {
1249                 MSG_RETURN(MSG_TRANSFER_FAILURE);
1250         }
1251 }
1252
1253 MSG_error_t 
1254 MSG_task_send(m_task_t task,const char* alias)
1255 {
1256         return MSG_task_send_with_timeout(task, alias, -1);
1257 }
1258
1259
1260 MSG_error_t 
1261 MSG_task_send_bounded(m_task_t task, const char* alias, double rate)
1262 {
1263         task->simdata->rate = rate;
1264         return MSG_task_send(task, alias);
1265 }
1266
1267 int
1268 MSG_task_listen_from(const char* alias)
1269 {
1270         m_host_t h = NULL;
1271         m_task_t t;
1272
1273         CHECK_HOST();
1274
1275         h = MSG_host_self();
1276
1277         DEBUG2("Probing on alias %s(%s)", alias, h->name);
1278
1279         if(NULL == (t = MSG_mailbox_get_head(MSG_mailbox_get_by_alias(alias))))
1280                 return -1;
1281
1282         return MSG_process_get_PID(t->simdata->sender);
1283 }
1284