Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
Remove the unused variable item.
[simgrid.git] / src / msg / gos.c
1 /*     $Id$      */
2
3 /* Copyright (c) 2002-2007 Arnaud Legrand.                                  */
4 /* Copyright (c) 2007 Bruno Donassolo.                                      */
5 /* All rights reserved.                                                     */
6
7 /* This program is free software; you can redistribute it and/or modify it
8  * under the terms of the license (GNU LGPL) which comes with this package. */
9
10 #include "msg/private.h"
11 #include "xbt/sysdep.h"
12 #include "xbt/log.h"
13 #include "msg_mailbox.h"
14
15
16 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(msg_gos, msg,
17                                 "Logging specific to MSG (gos)");
18
19 /** \defgroup msg_gos_functions MSG Operating System Functions
20  *  \brief This section describes the functions that can be used
21  *  by an agent for handling some task.
22  */
23
24 MSG_error_t MSG_task_get_ext(m_task_t * task,
25                              m_channel_t channel,
26                              double max_duration,
27                              m_host_t host)
28 {
29
30   m_process_t process = MSG_process_self();
31   m_task_t t = NULL;
32   m_host_t h = NULL;
33   simdata_task_t t_simdata = NULL;
34   simdata_host_t h_simdata = NULL;
35   int first_time = 1;
36   xbt_fifo_item_t item = NULL;
37
38   smx_cond_t cond = NULL;       //conditional wait if the task isn't on the channel yet
39
40   CHECK_HOST();
41   xbt_assert1((channel >= 0)
42               && (channel < msg_global->max_channel), "Invalid channel %d",
43               channel);
44   /* Sanity check */
45   xbt_assert0(task, "Null pointer for the task storage");
46
47   if (*task)
48     CRITICAL0
49         ("MSG_task_get() was asked to write in a non empty task struct.");
50
51   /* Get the task */
52   h = MSG_host_self();
53   h_simdata = h->simdata;
54
55   DEBUG2("Waiting for a task on channel %d (%s)", channel, h->name);
56
57   SIMIX_mutex_lock(h->simdata->mutex);
58   while (1) {
59     if (xbt_fifo_size(h_simdata->mbox[channel]) > 0) {
60       if (!host) {
61         t = xbt_fifo_shift(h_simdata->mbox[channel]);
62         break;
63       } else {
64         xbt_fifo_foreach(h->simdata->mbox[channel], item, t, m_task_t) {
65           if (t->simdata->source == host)
66             break;
67         }
68         if (item) {
69           xbt_fifo_remove_item(h->simdata->mbox[channel], item);
70           break;
71         }
72       }
73     }
74
75     if (max_duration > 0) {
76       if (!first_time) {
77         SIMIX_mutex_unlock(h->simdata->mutex);
78         h_simdata->sleeping[channel] = NULL;
79         SIMIX_cond_destroy(cond);
80         MSG_RETURN(MSG_TRANSFER_FAILURE);
81       }
82     }
83     xbt_assert1(!(h_simdata->sleeping[channel]),
84                 "A process is already blocked on channel %d", channel);
85
86     cond = SIMIX_cond_init();
87     h_simdata->sleeping[channel] = cond;
88     if (max_duration > 0)
89       SIMIX_cond_wait_timeout(cond, h->simdata->mutex, max_duration);
90     else 
91       SIMIX_cond_wait(h_simdata->sleeping[channel], h->simdata->mutex);
92
93     if (SIMIX_host_get_state(h_simdata->smx_host) == 0)
94       MSG_RETURN(MSG_HOST_FAILURE);
95
96     first_time = 0;
97   }
98   SIMIX_mutex_unlock(h->simdata->mutex);
99
100   DEBUG1("OK, got a task (%s)", t->name);
101   /* clean conditional */
102   if (cond) {
103     SIMIX_cond_destroy(cond);
104     h_simdata->sleeping[channel] = NULL;
105   }
106
107   t_simdata = t->simdata;
108   t_simdata->receiver = process;
109   *task = t;
110
111   SIMIX_mutex_lock(t_simdata->mutex);
112
113   /* Transfer */
114   /* create SIMIX action to the communication */
115   t_simdata->comm =
116       SIMIX_action_communicate(t_simdata->sender->simdata->m_host->
117                                simdata->smx_host,
118                                process->simdata->m_host->simdata->smx_host,
119                                t->name, t_simdata->message_size,
120                                t_simdata->rate);
121   /* if the process is suspend, create the action but stop its execution, it will be restart when the sender process resume */
122   if (MSG_process_is_suspended(t_simdata->sender)) {
123     DEBUG1("Process sender (%s) suspended", t_simdata->sender->name);
124     SIMIX_action_set_priority(t_simdata->comm, 0);
125   }
126   process->simdata->waiting_task = t;
127   SIMIX_register_action_to_condition(t_simdata->comm, t_simdata->cond);
128         while (1) {
129                 SIMIX_cond_wait(t_simdata->cond, t_simdata->mutex);
130                 if (SIMIX_action_get_state(t_simdata->comm) != SURF_ACTION_RUNNING)
131                         break;
132         }
133   SIMIX_unregister_action_to_condition(t_simdata->comm, t_simdata->cond);
134   process->simdata->waiting_task = NULL;
135
136   /* the task has already finished and the pointer must be null */
137   if (t->simdata->sender) {
138     t->simdata->sender->simdata->waiting_task = NULL;
139     /* action ended, set comm and compute = NULL, the actions is already destroyed in the main function */
140     //t->simdata->comm = NULL;
141     //t->simdata->compute = NULL;
142   }
143   /* for this process, don't need to change in get function */
144   t->simdata->receiver = NULL;
145   SIMIX_mutex_unlock(t_simdata->mutex);
146
147
148   if (SIMIX_action_get_state(t_simdata->comm) == SURF_ACTION_DONE) {
149     //t_simdata->comm = NULL;
150     SIMIX_action_destroy(t_simdata->comm);
151     t_simdata->comm = NULL;
152                 t_simdata->using--;
153     MSG_RETURN(MSG_OK);
154   } else if (SIMIX_host_get_state(h_simdata->smx_host) == 0) {
155     //t_simdata->comm = NULL;
156     SIMIX_action_destroy(t_simdata->comm);
157     t_simdata->comm = NULL;
158                 t_simdata->using--;
159     MSG_RETURN(MSG_HOST_FAILURE);
160   } else {
161     //t_simdata->comm = NULL;
162     SIMIX_action_destroy(t_simdata->comm);
163     t_simdata->comm = NULL;
164                 t_simdata->using--;
165     MSG_RETURN(MSG_TRANSFER_FAILURE);
166   }
167
168 }
169
170 /** \ingroup msg_gos_functions
171  * \brief Listen on a channel and wait for receiving a task.
172  *
173  * It takes two parameters.
174  * \param task a memory location for storing a #m_task_t. It will
175    hold a task when this function will return. Thus \a task should not
176    be equal to \c NULL and \a *task should be equal to \c NULL. If one of
177    those two condition does not hold, there will be a warning message.
178  * \param channel the channel on which the agent should be
179    listening. This value has to be >=0 and < than the maximal
180    number of channels fixed with MSG_set_channel_number().
181  * \return #MSG_FATAL if \a task is equal to \c NULL, #MSG_WARNING
182  * if \a *task is not equal to \c NULL, and #MSG_OK otherwise.
183  */
184 MSG_error_t MSG_task_get(m_task_t * task, m_channel_t channel)
185 {
186   return MSG_task_get_with_time_out(task, channel, -1);
187 }
188
189 /** \ingroup msg_gos_functions
190  * \brief Listen on a channel and wait for receiving a task with a timeout.
191  *
192  * It takes three parameters.
193  * \param task a memory location for storing a #m_task_t. It will
194    hold a task when this function will return. Thus \a task should not
195    be equal to \c NULL and \a *task should be equal to \c NULL. If one of
196    those two condition does not hold, there will be a warning message.
197  * \param channel the channel on which the agent should be
198    listening. This value has to be >=0 and < than the maximal
199    number of channels fixed with MSG_set_channel_number().
200  * \param max_duration the maximum time to wait for a task before giving
201     up. In such a case, #MSG_TRANSFER_FAILURE will be returned, \a task 
202     will not be modified and will still be
203     equal to \c NULL when returning. 
204  * \return #MSG_FATAL if \a task is equal to \c NULL, #MSG_WARNING
205    if \a *task is not equal to \c NULL, and #MSG_OK otherwise.
206  */
207 MSG_error_t MSG_task_get_with_time_out(m_task_t * task,
208                                        m_channel_t channel,
209                                        double max_duration)
210 {
211   return MSG_task_get_ext(task, channel, max_duration, NULL);
212 }
213
214 /** \ingroup msg_gos_functions
215  * \brief Listen on \a channel and waits for receiving a task from \a host.
216  *
217  * It takes three parameters.
218  * \param task a memory location for storing a #m_task_t. It will
219    hold a task when this function will return. Thus \a task should not
220    be equal to \c NULL and \a *task should be equal to \c NULL. If one of
221    those two condition does not hold, there will be a warning message.
222  * \param channel the channel on which the agent should be
223    listening. This value has to be >=0 and < than the maximal
224    number of channels fixed with MSG_set_channel_number().
225  * \param host the host that is to be watched.
226  * \return #MSG_FATAL if \a task is equal to \c NULL, #MSG_WARNING
227    if \a *task is not equal to \c NULL, and #MSG_OK otherwise.
228  */
229 MSG_error_t MSG_task_get_from_host(m_task_t * task, int channel,
230                                    m_host_t host)
231 {
232   return MSG_task_get_ext(task, channel, -1, host);
233 }
234
235 /** \ingroup msg_gos_functions
236  * \brief Test whether there is a pending communication on a channel.
237  *
238  * It takes one parameter.
239  * \param channel the channel on which the agent should be
240    listening. This value has to be >=0 and < than the maximal
241    number of channels fixed with MSG_set_channel_number().
242  * \return 1 if there is a pending communication and 0 otherwise
243  */
244 int MSG_task_Iprobe(m_channel_t channel)
245 {
246   m_host_t h = NULL;
247
248   xbt_assert1((channel >= 0)
249               && (channel < msg_global->max_channel), "Invalid channel %d",
250               channel);
251   CHECK_HOST();
252
253   DEBUG2("Probing on channel %d (%s)", channel, h->name);
254
255   h = MSG_host_self();
256   return (xbt_fifo_get_first_item(h->simdata->mbox[channel]) != NULL);
257 }
258
259 /** \ingroup msg_gos_functions
260  * \brief Test whether there is a pending communication on a channel, and who sent it.
261  *
262  * It takes one parameter.
263  * \param channel the channel on which the agent should be
264    listening. This value has to be >=0 and < than the maximal
265    number of channels fixed with MSG_set_channel_number().
266  * \return -1 if there is no pending communication and the PID of the process who sent it otherwise
267  */
268 int MSG_task_probe_from(m_channel_t channel)
269 {
270   m_host_t h = NULL;
271   xbt_fifo_item_t item;
272   m_task_t t;
273
274   xbt_assert1((channel >= 0)
275               && (channel < msg_global->max_channel), "Invalid channel %d",
276               channel);
277   CHECK_HOST();
278
279   h = MSG_host_self();
280
281   DEBUG2("Probing on channel %d (%s)", channel, h->name);
282
283   item = xbt_fifo_get_first_item(h->simdata->mbox[channel]);
284   if ((!item) || (!(t = xbt_fifo_get_item_content(item))))
285     return -1;
286
287   return MSG_process_get_PID(t->simdata->sender);
288 }
289
290 /** \ingroup msg_gos_functions
291  * \brief Wait for at most \a max_duration second for a task reception
292    on \a channel. 
293  
294  * \a PID is updated with the PID of the first process that triggered this event if any.
295  *
296  * It takes three parameters:
297  * \param channel the channel on which the agent should be
298    listening. This value has to be >=0 and < than the maximal.
299    number of channels fixed with MSG_set_channel_number().
300  * \param PID a memory location for storing an int.
301  * \param max_duration the maximum time to wait for a task before
302     giving up. In the case of a reception, *\a PID will be updated
303     with the PID of the first process to send a task.
304  * \return #MSG_HOST_FAILURE if the host is shut down in the meantime
305    and #MSG_OK otherwise.
306  */
307 MSG_error_t MSG_channel_select_from(m_channel_t channel,
308                                     double max_duration, int *PID)
309 {
310   m_host_t h = NULL;
311   simdata_host_t h_simdata = NULL;
312   xbt_fifo_item_t item;
313   m_task_t t;
314   int first_time = 1;
315   smx_cond_t cond;
316
317   xbt_assert1((channel >= 0)
318               && (channel < msg_global->max_channel), "Invalid channel %d",
319               channel);
320   if (PID) {
321     *PID = -1;
322   }
323
324   if (max_duration == 0.0) {
325     *PID = MSG_task_probe_from(channel);
326     MSG_RETURN(MSG_OK);
327   } else {
328     CHECK_HOST();
329     h = MSG_host_self();
330     h_simdata = h->simdata;
331
332     DEBUG2("Probing on channel %d (%s)", channel, h->name);
333     while (!(item = xbt_fifo_get_first_item(h->simdata->mbox[channel]))) {
334       if (max_duration > 0) {
335         if (!first_time) {
336           MSG_RETURN(MSG_OK);
337         }
338       }
339       SIMIX_mutex_lock(h_simdata->mutex);
340       xbt_assert1(!(h_simdata->sleeping[channel]),
341                   "A process is already blocked on this channel %d",
342                   channel);
343       cond = SIMIX_cond_init();
344       h_simdata->sleeping[channel] = cond;      /* I'm waiting. Wake me up when you're ready */
345       if (max_duration > 0) {
346         SIMIX_cond_wait_timeout(cond, h_simdata->mutex, max_duration);
347       } else {
348         SIMIX_cond_wait(cond, h_simdata->mutex);
349       }
350       SIMIX_cond_destroy(cond);
351       SIMIX_mutex_unlock(h_simdata->mutex);
352       if (SIMIX_host_get_state(h_simdata->smx_host) == 0) {
353         MSG_RETURN(MSG_HOST_FAILURE);
354       }
355       h_simdata->sleeping[channel] = NULL;
356       first_time = 0;
357     }
358     if (!item || !(t = xbt_fifo_get_item_content(item))) {
359       MSG_RETURN(MSG_OK);
360     }
361     if (PID) {
362       *PID = MSG_process_get_PID(t->simdata->sender);
363     }
364     MSG_RETURN(MSG_OK);
365   }
366 }
367
368
369 /** \ingroup msg_gos_functions
370
371  * \brief Return the number of tasks waiting to be received on a \a
372    channel and sent by \a host.
373  *
374  * It takes two parameters.
375  * \param channel the channel on which the agent should be
376    listening. This value has to be >=0 and < than the maximal
377    number of channels fixed with MSG_set_channel_number().
378  * \param host the host that is to be watched.
379  * \return the number of tasks waiting to be received on \a channel
380    and sent by \a host.
381  */
382 int MSG_task_probe_from_host(int channel, m_host_t host)
383 {
384   xbt_fifo_item_t item;
385   m_task_t t;
386   int count = 0;
387   m_host_t h = NULL;
388
389   xbt_assert1((channel >= 0)
390               && (channel < msg_global->max_channel), "Invalid channel %d",
391               channel);
392   CHECK_HOST();
393   h = MSG_host_self();
394
395   DEBUG2("Probing on channel %d (%s)", channel, h->name);
396
397   xbt_fifo_foreach(h->simdata->mbox[channel], item, t, m_task_t) {
398     if (t->simdata->source == host)
399       count++;
400   }
401
402   return count;
403 }
404
405 /** \ingroup msg_gos_functions \brief Put a task on a channel of an
406  * host (with a timeout on the waiting of the destination host) and
407  * waits for the end of the transmission.
408  *
409  * This function is used for describing the behavior of an agent. It
410  * takes four parameter.
411  * \param task a #m_task_t to send on another location. This task
412    will not be usable anymore when the function will return. There is
413    no automatic task duplication and you have to save your parameters
414    before calling this function. Tasks are unique and once it has been
415    sent to another location, you should not access it anymore. You do
416    not need to call MSG_task_destroy() but to avoid using, as an
417    effect of inattention, this task anymore, you definitely should
418    renitialize it with #MSG_TASK_UNINITIALIZED. Note that this task
419    can be transfered iff it has been correctly created with
420    MSG_task_create().
421  * \param dest the destination of the message
422  * \param channel the channel on which the agent should put this
423    task. This value has to be >=0 and < than the maximal number of
424    channels fixed with MSG_set_channel_number().
425  * \param max_duration the maximum time to wait for a task before giving
426     up. In such a case, #MSG_TRANSFER_FAILURE will be returned, \a task 
427     will not be modified 
428  * \return #MSG_FATAL if \a task is not properly initialized and
429    #MSG_OK otherwise. Returns #MSG_HOST_FAILURE if the host on which
430    this function was called was shut down. Returns
431    #MSG_TRANSFER_FAILURE if the transfer could not be properly done
432    (network failure, dest failure, timeout...)
433  */
434 MSG_error_t MSG_task_put_with_timeout(m_task_t task, m_host_t dest,
435                                       m_channel_t channel,
436                                       double max_duration)
437 {
438
439
440   m_process_t process = MSG_process_self();
441   simdata_task_t task_simdata = NULL;
442   m_host_t local_host = NULL;
443   m_host_t remote_host = NULL;
444   CHECK_HOST();
445
446   xbt_assert1((channel >= 0)
447               && (channel < msg_global->max_channel), "Invalid channel %d",
448               channel);
449
450   task_simdata = task->simdata;
451   task_simdata->sender = process;
452   task_simdata->source = MSG_process_get_host(process);
453   xbt_assert0(task_simdata->using == 1,
454               "This task is still being used somewhere else. You cannot send it now. Go fix your code!");
455   task_simdata->comm = NULL;
456
457         task_simdata->using++;
458   local_host = ((simdata_process_t) process->simdata)->m_host;
459   remote_host = dest;
460
461   DEBUG4("Trying to send a task (%g kB) from %s to %s on channel %d",
462          task->simdata->message_size / 1000, local_host->name,
463          remote_host->name, channel);
464
465   SIMIX_mutex_lock(remote_host->simdata->mutex);
466   xbt_fifo_push(((simdata_host_t) remote_host->simdata)->
467                 mbox[channel], task);
468
469
470   if (remote_host->simdata->sleeping[channel]) {
471     DEBUG0("Somebody is listening. Let's wake him up!");
472     SIMIX_cond_signal(remote_host->simdata->sleeping[channel]);
473   }
474   SIMIX_mutex_unlock(remote_host->simdata->mutex);
475
476   process->simdata->put_host = dest;
477   process->simdata->put_channel = channel;
478   SIMIX_mutex_lock(task->simdata->mutex);
479   // DEBUG4("Task sent (%g kB) from %s to %s on channel %d, waiting...", task->simdata->message_size/1000,local_host->name, remote_host->name, channel);
480
481   process->simdata->waiting_task = task;
482   if (max_duration > 0) {
483     xbt_ex_t e;
484                 double time;
485                 double time_elapsed;
486                 time = SIMIX_get_clock();
487     TRY {
488                 /*verify if the action that ends is the correct. Call the wait_timeout with the new time. If the timeout occurs, an exception is raised */
489                         while (1) {
490                         time_elapsed = SIMIX_get_clock() - time;
491                                 SIMIX_cond_wait_timeout(task->simdata->cond, task->simdata->mutex,
492                                                 max_duration-time_elapsed);
493                                 if ((task->simdata->comm != NULL) &&
494                                                 (SIMIX_action_get_state(task->simdata->comm) != SURF_ACTION_RUNNING))
495                                         break;
496                         }
497     } CATCH(e) {
498       if(e.category==timeout_error) {
499         xbt_ex_free(e);
500         /* verify if the timeout happened and the communication didn't started yet */
501         if (task->simdata->comm == NULL) {
502           process->simdata->waiting_task = NULL;
503           xbt_fifo_remove(((simdata_host_t) remote_host->simdata)->
504                           mbox[channel], task);
505           if (task->simdata->receiver) {
506             task->simdata->receiver->simdata->waiting_task = NULL;
507           }
508           task->simdata->sender = NULL;
509           SIMIX_mutex_unlock(task->simdata->mutex);
510           MSG_RETURN(MSG_TRANSFER_FAILURE);
511         }
512       } else {
513         RETHROW;
514       }
515     }
516   } else {
517                 while (1) {
518                         SIMIX_cond_wait(task->simdata->cond, task->simdata->mutex);
519                         if (SIMIX_action_get_state(task->simdata->comm) != SURF_ACTION_RUNNING)
520                                 break;
521                 }
522   }
523
524   DEBUG1("Action terminated %s", task->name);
525   process->simdata->waiting_task = NULL;
526   /* the task has already finished and the pointer must be null */
527   if (task->simdata->receiver) {
528     task->simdata->receiver->simdata->waiting_task = NULL;
529     /* action ended, set comm and compute = NULL, the actions is already destroyed in the main function */
530     //      task->simdata->comm = NULL;
531     //task->simdata->compute = NULL;
532   }
533   task->simdata->sender = NULL;
534   SIMIX_mutex_unlock(task->simdata->mutex);
535
536   if (SIMIX_action_get_state(task->simdata->comm) == SURF_ACTION_DONE) {
537     MSG_RETURN(MSG_OK);
538   } else if (SIMIX_host_get_state(local_host->simdata->smx_host) == 0) {
539     MSG_RETURN(MSG_HOST_FAILURE);
540   } else {
541     MSG_RETURN(MSG_TRANSFER_FAILURE);
542   }
543 }
544
545 /** \ingroup msg_gos_functions
546  * \brief Put a task on a channel of an host and waits for the end of the
547  * transmission.
548  *
549  * This function is used for describing the behavior of an agent. It
550  * takes three parameter.
551  * \param task a #m_task_t to send on another location. This task
552    will not be usable anymore when the function will return. There is
553    no automatic task duplication and you have to save your parameters
554    before calling this function. Tasks are unique and once it has been
555    sent to another location, you should not access it anymore. You do
556    not need to call MSG_task_destroy() but to avoid using, as an
557    effect of inattention, this task anymore, you definitely should
558    renitialize it with #MSG_TASK_UNINITIALIZED. Note that this task
559    can be transfered iff it has been correctly created with
560    MSG_task_create().
561  * \param dest the destination of the message
562  * \param channel the channel on which the agent should put this
563    task. This value has to be >=0 and < than the maximal number of
564    channels fixed with MSG_set_channel_number().
565  * \return #MSG_FATAL if \a task is not properly initialized and
566  * #MSG_OK otherwise. Returns #MSG_HOST_FAILURE if the host on which
567  * this function was called was shut down. Returns
568  * #MSG_TRANSFER_FAILURE if the transfer could not be properly done
569  * (network failure, dest failure)
570  */
571 MSG_error_t MSG_task_put(m_task_t task, m_host_t dest, m_channel_t channel)
572 {
573   return MSG_task_put_with_timeout(task, dest, channel, -1.0);
574 }
575
576 /** \ingroup msg_gos_functions
577  * \brief Does exactly the same as MSG_task_put but with a bounded transmition 
578  * rate.
579  *
580  * \sa MSG_task_put
581  */
582 MSG_error_t MSG_task_put_bounded(m_task_t task,
583                                  m_host_t dest, m_channel_t channel,
584                                  double max_rate)
585 {
586   MSG_error_t res = MSG_OK;
587   task->simdata->rate = max_rate;
588   res = MSG_task_put(task, dest, channel);
589   return (res);
590 }
591
592 /** \ingroup msg_gos_functions
593  * \brief Executes a task and waits for its termination.
594  *
595  * This function is used for describing the behavior of an agent. It
596  * takes only one parameter.
597  * \param task a #m_task_t to execute on the location on which the
598    agent is running.
599  * \return #MSG_FATAL if \a task is not properly initialized and
600  * #MSG_OK otherwise.
601  */
602 MSG_error_t MSG_task_execute(m_task_t task)
603 {
604   simdata_task_t simdata = NULL;
605   m_process_t self = MSG_process_self();
606   CHECK_HOST();
607
608   simdata = task->simdata;
609   xbt_assert0((!simdata->compute) && (task->simdata->using == 1),
610               "This task is executed somewhere else. Go fix your code!");
611
612   DEBUG1("Computing on %s", MSG_process_self()->simdata->m_host->name);
613   simdata->using++;
614   SIMIX_mutex_lock(simdata->mutex);
615   simdata->compute =
616       SIMIX_action_execute(SIMIX_host_self(), task->name,
617                            simdata->computation_amount);
618   SIMIX_action_set_priority(simdata->compute, simdata->priority);
619
620   self->simdata->waiting_task = task;
621   SIMIX_register_action_to_condition(simdata->compute, simdata->cond);
622   SIMIX_cond_wait(simdata->cond, simdata->mutex);
623   SIMIX_unregister_action_to_condition(simdata->compute, simdata->cond);
624   self->simdata->waiting_task = NULL;
625
626   SIMIX_mutex_unlock(simdata->mutex);
627   simdata->using--;
628
629   if (SIMIX_action_get_state(task->simdata->compute) == SURF_ACTION_DONE) {
630     /* action ended, set comm and compute = NULL, the actions is already destroyed in the main function */
631     SIMIX_action_destroy(task->simdata->compute);
632     simdata->computation_amount = 0.0;
633     simdata->comm = NULL;
634     simdata->compute = NULL;
635     MSG_RETURN(MSG_OK);
636   } else if (SIMIX_host_get_state(SIMIX_host_self()) == 0) {
637     /* action ended, set comm and compute = NULL, the actions is already destroyed in the main function */
638     SIMIX_action_destroy(task->simdata->compute);
639     simdata->comm = NULL;
640     simdata->compute = NULL;
641     MSG_RETURN(MSG_HOST_FAILURE);
642   } else {
643     /* action ended, set comm and compute = NULL, the actions is already destroyed in the main function */
644     SIMIX_action_destroy(task->simdata->compute);
645     simdata->comm = NULL;
646     simdata->compute = NULL;
647     MSG_RETURN(MSG_TASK_CANCELLED);
648   }
649 }
650
651
652 /** \ingroup m_task_management
653  * \brief Creates a new #m_task_t (a parallel one....).
654  *
655  * A constructor for #m_task_t taking six arguments and returning the 
656    corresponding object.
657  * \param name a name for the object. It is for user-level information
658    and can be NULL.
659  * \param host_nb the number of hosts implied in the parallel task.
660  * \param host_list an array of \p host_nb m_host_t.
661  * \param computation_amount an array of \p host_nb
662    doubles. computation_amount[i] is the total number of operations
663    that have to be performed on host_list[i].
664  * \param communication_amount an array of \p host_nb* \p host_nb doubles.
665  * \param data a pointer to any data may want to attach to the new
666    object.  It is for user-level information and can be NULL. It can
667    be retrieved with the function \ref MSG_task_get_data.
668  * \see m_task_t
669  * \return The new corresponding object.
670  */
671 m_task_t MSG_parallel_task_create(const char *name,
672                                   int host_nb,
673                                   const m_host_t * host_list,
674                                   double *computation_amount,
675                                   double *communication_amount, void *data)
676 {
677   int i;
678   simdata_task_t simdata = xbt_new0(s_simdata_task_t, 1);
679   m_task_t task = xbt_new0(s_m_task_t, 1);
680   task->simdata = simdata;
681
682   /* Task structure */
683   task->name = xbt_strdup(name);
684   task->data = data;
685
686   /* Simulator Data */
687   simdata->computation_amount = 0;
688   simdata->message_size = 0;
689   simdata->cond = SIMIX_cond_init();
690   simdata->mutex = SIMIX_mutex_init();
691   simdata->compute = NULL;
692   simdata->comm = NULL;
693   simdata->rate = -1.0;
694   simdata->using = 1;
695   simdata->sender = NULL;
696   simdata->receiver = NULL;
697   simdata->source = NULL;
698
699   simdata->host_nb = host_nb;
700   simdata->host_list = xbt_new0(smx_host_t, host_nb);
701   simdata->comp_amount = computation_amount;
702   simdata->comm_amount = communication_amount;
703
704   for (i = 0; i < host_nb; i++)
705     simdata->host_list[i] = host_list[i]->simdata->smx_host;
706
707   return task;
708
709 }
710
711
712 MSG_error_t MSG_parallel_task_execute(m_task_t task)
713 {
714   simdata_task_t simdata = NULL;
715   m_process_t self = MSG_process_self();
716   CHECK_HOST();
717
718   simdata = task->simdata;
719   xbt_assert0((!simdata->compute) && (task->simdata->using == 1),
720               "This task is executed somewhere else. Go fix your code!");
721
722   xbt_assert0(simdata->host_nb,
723               "This is not a parallel task. Go to hell.");
724
725   DEBUG1("Computing on %s", MSG_process_self()->simdata->m_host->name);
726   simdata->using++;
727   SIMIX_mutex_lock(simdata->mutex);
728   simdata->compute =
729       SIMIX_action_parallel_execute(task->name, simdata->host_nb,
730                                     simdata->host_list,
731                                     simdata->comp_amount,
732                                     simdata->comm_amount, 1.0, -1.0);
733
734   self->simdata->waiting_task = task;
735   SIMIX_register_action_to_condition(simdata->compute, simdata->cond);
736   SIMIX_cond_wait(simdata->cond, simdata->mutex);
737   SIMIX_unregister_action_to_condition(simdata->compute, simdata->cond);
738   self->simdata->waiting_task = NULL;
739
740
741   SIMIX_mutex_unlock(simdata->mutex);
742   simdata->using--;
743
744   if (SIMIX_action_get_state(task->simdata->compute) == SURF_ACTION_DONE) {
745     /* action ended, set comm and compute = NULL, the actions is already destroyed in the main function */
746     SIMIX_action_destroy(task->simdata->compute);
747     simdata->computation_amount = 0.0;
748     simdata->comm = NULL;
749     simdata->compute = NULL;
750     MSG_RETURN(MSG_OK);
751   } else if (SIMIX_host_get_state(SIMIX_host_self()) == 0) {
752     /* action ended, set comm and compute = NULL, the actions is already destroyed in the main function */
753     SIMIX_action_destroy(task->simdata->compute);
754     simdata->comm = NULL;
755     simdata->compute = NULL;
756     MSG_RETURN(MSG_HOST_FAILURE);
757   } else {
758     /* action ended, set comm and compute = NULL, the actions is already destroyed in the main function */
759     SIMIX_action_destroy(task->simdata->compute);
760     simdata->comm = NULL;
761     simdata->compute = NULL;
762     MSG_RETURN(MSG_TASK_CANCELLED);
763   }
764
765 }
766
767
768 /** \ingroup msg_gos_functions
769  * \brief Sleep for the specified number of seconds
770  *
771  * Makes the current process sleep until \a time seconds have elapsed.
772  *
773  * \param nb_sec a number of second
774  */
775 MSG_error_t MSG_process_sleep(double nb_sec)
776 {
777   smx_action_t act_sleep;
778   m_process_t proc = MSG_process_self();
779   smx_mutex_t mutex;
780   smx_cond_t cond;
781   /* create action to sleep */
782   act_sleep =
783       SIMIX_action_sleep(SIMIX_process_get_host(proc->simdata->s_process),
784                          nb_sec);
785
786   mutex = SIMIX_mutex_init();
787   SIMIX_mutex_lock(mutex);
788   /* create conditional and register action to it */
789   cond = SIMIX_cond_init();
790
791   SIMIX_register_action_to_condition(act_sleep, cond);
792   SIMIX_cond_wait(cond, mutex);
793   SIMIX_unregister_action_to_condition(act_sleep, cond);
794   SIMIX_mutex_unlock(mutex);
795
796   /* remove variables */
797   SIMIX_cond_destroy(cond);
798   SIMIX_mutex_destroy(mutex);
799
800   if (SIMIX_action_get_state(act_sleep) == SURF_ACTION_DONE) {
801     if (SIMIX_host_get_state(SIMIX_host_self()) == SURF_CPU_OFF) {
802       SIMIX_action_destroy(act_sleep);
803       MSG_RETURN(MSG_HOST_FAILURE);
804     }
805   } else {
806     SIMIX_action_destroy(act_sleep);
807     MSG_RETURN(MSG_HOST_FAILURE);
808   }
809
810
811   MSG_RETURN(MSG_OK);
812 }
813
814 /** \ingroup msg_gos_functions
815  * \brief Return the number of MSG tasks currently running on
816  * the host of the current running process.
817  */
818 static int MSG_get_msgload(void)
819 {
820   xbt_die("not implemented yet");
821   return 0;
822 }
823
824 /** \ingroup msg_gos_functions
825  *
826  * \brief Return the last value returned by a MSG function (except
827  * MSG_get_errno...).
828  */
829 MSG_error_t MSG_get_errno(void)
830 {
831   return PROCESS_GET_ERRNO();
832 }
833
834 MSG_error_t 
835 MSG_task_receive_ext(m_task_t* task, const char* alias, double timeout, m_host_t host)
836 {
837         m_process_t process = MSG_process_self();
838         m_task_t t = NULL;
839         m_host_t h = NULL;
840         simdata_task_t t_simdata = NULL;
841         simdata_host_t h_simdata = NULL;
842         int first_time = 1;
843         
844         smx_cond_t cond = NULL; //conditional wait if the task isn't on the channel yet
845         
846         /* get the mailbox from the alias (if the mailbox doesn't exist, create it) */
847         msg_mailbox_t mailbox = MSG_mailbox_get_by_alias(alias);
848         
849         CHECK_HOST();
850         
851         /* Sanity check */
852         xbt_assert0(task, "Null pointer for the task storage");
853         
854         if (*task)
855                 CRITICAL0("MSG_task_get() was asked to write in a non empty task struct.");
856         
857         /* Get the task */
858         h = MSG_host_self();
859         h_simdata = h->simdata;
860         
861         DEBUG2("Waiting for a task on channel aliased by %s (%s)", alias, h->name);
862         
863         SIMIX_mutex_lock(h->simdata->mutex);
864         
865         while (1) 
866         {
867                 /* if the mailbox is empty (has no task */
868                 if(!MSG_mailbox_is_empty(mailbox))
869                 {
870                         if(!host) 
871                         {
872                                 /* pop the head of the mailbox */
873                                 t = MSG_mailbox_pop_head(mailbox);
874                                 break;
875                         } 
876                         else 
877                         {
878                                 /* get the first task of the host */
879                                 if(NULL != (t = MSG_mailbox_get_first_host_task(mailbox,host)))
880                                         break;
881                         }
882                 }
883         
884                 if(timeout > 0) 
885                 {
886                         if (!first_time) 
887                         {
888                                 SIMIX_mutex_unlock(h->simdata->mutex);
889                                 /* set the simix condition of the mailbox to NULL */
890                                 MSG_mailbox_set_cond(mailbox, NULL);
891                                 SIMIX_cond_destroy(cond);
892                                 MSG_RETURN(MSG_TRANSFER_FAILURE);
893                         }
894                 }
895                 
896                 xbt_assert1(!MSG_mailbox_get_cond(mailbox),"A process is already blocked on the channel aliased by %s", alias);
897                 
898                 cond = SIMIX_cond_init();
899                 
900                 /* set the condition of the mailbox */
901                 MSG_mailbox_set_cond(mailbox, cond);
902                 
903                 if (timeout > 0)
904                         SIMIX_cond_wait_timeout(cond, h->simdata->mutex, timeout);
905                 else 
906                         SIMIX_cond_wait(MSG_mailbox_get_cond(mailbox), h->simdata->mutex);
907                         
908         
909                 if (SIMIX_host_get_state(h_simdata->smx_host) == 0)
910                         MSG_RETURN(MSG_HOST_FAILURE);
911         
912                 first_time = 0;
913         }
914         
915         SIMIX_mutex_unlock(h->simdata->mutex);
916         
917         DEBUG1("OK, got a task (%s)", t->name);
918         /* clean conditional */
919         if (cond) 
920         {
921                 SIMIX_cond_destroy(cond);
922                 
923                 MSG_mailbox_set_cond(mailbox,NULL);
924         }
925         
926         t_simdata = t->simdata;
927         t_simdata->receiver = process;
928         *task = t;
929         
930         SIMIX_mutex_lock(t_simdata->mutex);
931         
932         /* Transfer */
933         /* create SIMIX action to the communication */
934         t_simdata->comm = SIMIX_action_communicate(
935                                                                                                 t_simdata->sender->simdata->m_host->simdata->smx_host,
936                                                                                                 process->simdata->m_host->simdata->smx_host,
937                                                                                                 t->name, 
938                                                                                                 t_simdata->message_size,
939                                                                                                 t_simdata->rate
940                                                                                         );
941         
942         /* if the process is suspend, create the action but stop its execution, it will be restart when the sender process resume */
943         if (MSG_process_is_suspended(t_simdata->sender)) 
944         {
945                 DEBUG1("Process sender (%s) suspended", t_simdata->sender->name);
946                 SIMIX_action_set_priority(t_simdata->comm, 0);
947         }
948         
949         process->simdata->waiting_task = t;
950         SIMIX_register_action_to_condition(t_simdata->comm, t_simdata->cond);
951         
952         while (1) 
953         {
954                 SIMIX_cond_wait(t_simdata->cond, t_simdata->mutex);
955                 
956                 if (SIMIX_action_get_state(t_simdata->comm) != SURF_ACTION_RUNNING)
957                         break;
958         }
959         
960         SIMIX_unregister_action_to_condition(t_simdata->comm, t_simdata->cond);
961                 process->simdata->waiting_task = NULL;
962         
963         /* the task has already finished and the pointer must be null */
964         if (t->simdata->sender) 
965         {
966                 t->simdata->sender->simdata->waiting_task = NULL;
967         }
968         
969         /* for this process, don't need to change in get function */
970         t->simdata->receiver = NULL;
971         SIMIX_mutex_unlock(t_simdata->mutex);
972         
973         
974         if (SIMIX_action_get_state(t_simdata->comm) == SURF_ACTION_DONE) 
975         {
976                 SIMIX_action_destroy(t_simdata->comm);
977                 t_simdata->comm = NULL;
978                 t_simdata->using--;
979                 MSG_RETURN(MSG_OK);
980         } 
981         else if (SIMIX_host_get_state(h_simdata->smx_host) == 0) 
982         {
983                 SIMIX_action_destroy(t_simdata->comm);
984                 t_simdata->comm = NULL;
985                 t_simdata->using--;
986                 MSG_RETURN(MSG_HOST_FAILURE);
987         } 
988         else 
989         {
990                 SIMIX_action_destroy(t_simdata->comm);
991                 t_simdata->comm = NULL;
992                 t_simdata->using--;
993                 MSG_RETURN(MSG_TRANSFER_FAILURE);
994         }
995 }
996
997 MSG_error_t 
998 MSG_task_receive_with_time_out(m_task_t * task, const char* alias, double timeout)
999 {
1000         return MSG_task_receive_ext(task, alias, timeout, NULL);                
1001 }
1002                              
1003 MSG_error_t 
1004 MSG_task_receive(m_task_t * task, const char* alias)
1005 {
1006         return MSG_task_receive_with_time_out(task, alias, -1); 
1007 }
1008
1009 int 
1010 MSG_task_listen(const char* alias)
1011 {
1012         CHECK_HOST();
1013         
1014         DEBUG2("Probing on channel aliased by %s (%s)", alias, MSG_host_self()->name);
1015         
1016         return !MSG_mailbox_is_empty(MSG_mailbox_get_by_alias(alias));
1017 }
1018
1019 int 
1020 MSG_task_listen_from_host(const char* alias, m_host_t host)
1021 {
1022
1023         return MSG_mailbox_get_count_host_tasks(MSG_mailbox_get_by_alias(alias),host);
1024                         
1025 }
1026
1027 MSG_error_t 
1028 MSG_alias_select_from(const char* alias, double timeout, int* PID)
1029 {
1030         m_host_t h = NULL;
1031         simdata_host_t h_simdata = NULL;
1032         m_task_t t;
1033         int first_time = 1;
1034         smx_cond_t cond;
1035         msg_mailbox_t mailbox;
1036         
1037         if (PID) 
1038         {
1039                 *PID = -1;
1040         }
1041         
1042         if(timeout == 0.0) 
1043         {
1044                 *PID = MSG_task_listen_from(alias);
1045                 MSG_RETURN(MSG_OK);
1046         } 
1047         else 
1048         {
1049                 CHECK_HOST();
1050                 h = MSG_host_self();
1051                 h_simdata = h->simdata;
1052         
1053                 DEBUG2("Probing on alias %s (%s)", alias, h->name);
1054                 
1055                 mailbox = MSG_mailbox_get_by_alias(alias);      
1056                 
1057                 while(MSG_mailbox_is_empty(mailbox))
1058                 {
1059                         if(timeout > 0) 
1060                         {
1061                                 if (!first_time) 
1062                                 {
1063                                         MSG_RETURN(MSG_OK);
1064                                 }
1065                         }
1066                         
1067                         SIMIX_mutex_lock(h_simdata->mutex);
1068                         
1069                         xbt_assert1(!MSG_mailbox_get_cond(mailbox),"A process is already blocked on this alias %s",alias);
1070                         
1071                         cond = SIMIX_cond_init();
1072                         
1073                         MSG_mailbox_set_cond(mailbox, cond);
1074                         
1075                         if (timeout > 0) 
1076                         {
1077                                 SIMIX_cond_wait_timeout(cond, h_simdata->mutex, timeout);
1078                         } 
1079                         else 
1080                         {
1081                                 SIMIX_cond_wait(cond, h_simdata->mutex);
1082                         }
1083                         
1084                         SIMIX_cond_destroy(cond);
1085                         SIMIX_mutex_unlock(h_simdata->mutex);
1086                         
1087                         if (SIMIX_host_get_state(h_simdata->smx_host) == 0) 
1088                         {
1089                                 MSG_RETURN(MSG_HOST_FAILURE);
1090                         }
1091                         
1092                         MSG_mailbox_set_cond(mailbox,NULL);
1093                         first_time = 0;
1094                 }
1095                 
1096                 if(NULL == (t = MSG_mailbox_get_head(mailbox)))
1097                         MSG_RETURN(MSG_OK);
1098                                 
1099                 
1100                 if (PID) 
1101                 {
1102                         *PID = MSG_process_get_PID(t->simdata->sender);
1103                 }
1104                 
1105                 MSG_RETURN(MSG_OK);
1106         }
1107 }
1108                                     
1109 MSG_error_t 
1110 MSG_task_send_with_timeout(m_task_t task, const char* alias, double timeout)
1111 {
1112         m_process_t process = MSG_process_self();
1113         const char* hostname;
1114         simdata_task_t task_simdata = NULL;
1115         m_host_t local_host = NULL;
1116         m_host_t remote_host = NULL;
1117         smx_cond_t cond = NULL;
1118         
1119         /* get the mailbox from the alias (if the mailbox doesn't exist, create it) */
1120         msg_mailbox_t mailbox = MSG_mailbox_get_by_alias(alias);
1121
1122         CHECK_HOST();
1123         
1124         task_simdata = task->simdata;
1125         task_simdata->sender = process;
1126         task_simdata->source = MSG_process_get_host(process);
1127         
1128         xbt_assert0(task_simdata->using == 1,"This task is still being used somewhere else. You cannot send it now. Go fix your code!");
1129         
1130         task_simdata->comm = NULL;
1131         
1132         task_simdata->using++;
1133         local_host = ((simdata_process_t) process->simdata)->m_host;
1134         
1135         /* get the host name containing the mailbox */
1136         hostname = MSG_mailbox_get_hostname(mailbox);
1137
1138         remote_host = MSG_get_host_by_name(hostname);
1139
1140         if(NULL == remote_host)
1141                 THROW1(not_found_error,0,"Host %s not fount", hostname);
1142
1143
1144         DEBUG4("Trying to send a task (%g kB) from %s to %s on the channel aliased by the alias %s",task->simdata->message_size / 1000, local_host->name,remote_host->name, MSG_mailbox_get_alias(mailbox));
1145         
1146         SIMIX_mutex_lock(remote_host->simdata->mutex);
1147
1148         /* put the task in the mailbox */
1149         MSG_mailbox_put(mailbox,task);
1150         
1151         if(NULL != (cond = MSG_mailbox_get_cond(mailbox)))
1152         {
1153                 DEBUG0("Somebody is listening. Let's wake him up!");
1154                 SIMIX_cond_signal(cond);
1155         }
1156
1157         
1158         
1159         SIMIX_mutex_unlock(remote_host->simdata->mutex);
1160         
1161         SIMIX_mutex_lock(task->simdata->mutex);
1162
1163         process->simdata->waiting_task = task;
1164         
1165         if(timeout > 0) 
1166         {
1167                 xbt_ex_t e;
1168                 double time;
1169                 double time_elapsed;
1170                 time = SIMIX_get_clock();
1171                 
1172                 TRY 
1173                 {
1174                         /*verify if the action that ends is the correct. Call the wait_timeout with the new time. If the timeout occurs, an exception is raised */
1175                         while (1) 
1176                         {
1177                                 time_elapsed = SIMIX_get_clock() - time;
1178                                 SIMIX_cond_wait_timeout(task->simdata->cond, task->simdata->mutex,timeout - time_elapsed);
1179                                 
1180                                 if ((task->simdata->comm != NULL) && (SIMIX_action_get_state(task->simdata->comm) != SURF_ACTION_RUNNING))
1181                                         break;
1182                         }
1183                 } 
1184                 CATCH(e) 
1185                 {
1186                         if(e.category==timeout_error) 
1187                         {
1188                                 xbt_ex_free(e);
1189                                 /* verify if the timeout happened and the communication didn't started yet */
1190                                 if (task->simdata->comm == NULL) 
1191                                 {
1192                                         process->simdata->waiting_task = NULL;
1193                                         
1194                                         /* remove the task from the mailbox */
1195                                         MSG_mailbox_remove(mailbox,task);
1196                                         
1197                                         if (task->simdata->receiver) 
1198                                         {
1199                                                 task->simdata->receiver->simdata->waiting_task = NULL;
1200                                         }
1201                                         
1202                                         task->simdata->sender = NULL;
1203                                         
1204                                         SIMIX_mutex_unlock(task->simdata->mutex);
1205                                         MSG_RETURN(MSG_TRANSFER_FAILURE);
1206                                 }
1207                         } 
1208                         else 
1209                         {
1210                                 RETHROW;
1211                         }
1212                 }
1213         } 
1214         else 
1215         {
1216                 while (1) 
1217                 {
1218                         SIMIX_cond_wait(task->simdata->cond, task->simdata->mutex);
1219                         
1220                         if (SIMIX_action_get_state(task->simdata->comm) != SURF_ACTION_RUNNING)
1221                                 break;
1222                 }
1223         }
1224         
1225         DEBUG1("Action terminated %s", task->name);
1226         process->simdata->waiting_task = NULL;
1227         
1228         /* the task has already finished and the pointer must be null */
1229         if (task->simdata->receiver) 
1230         {
1231                 task->simdata->receiver->simdata->waiting_task = NULL;
1232         }
1233         
1234         task->simdata->sender = NULL;
1235         SIMIX_mutex_unlock(task->simdata->mutex);
1236
1237         
1238         if (SIMIX_action_get_state(task->simdata->comm) == SURF_ACTION_DONE)
1239         {
1240                 MSG_RETURN(MSG_OK);
1241         } 
1242         else if (SIMIX_host_get_state(local_host->simdata->smx_host) == 0) 
1243         {
1244                 MSG_RETURN(MSG_HOST_FAILURE);
1245         } 
1246         else 
1247         {
1248                 MSG_RETURN(MSG_TRANSFER_FAILURE);
1249         }
1250 }
1251
1252 MSG_error_t 
1253 MSG_task_send(m_task_t task,const char* alias)
1254 {
1255         return MSG_task_send_with_timeout(task, alias, -1);
1256 }
1257
1258
1259 MSG_error_t 
1260 MSG_task_send_bounded(m_task_t task, const char* alias, double rate)
1261 {
1262         task->simdata->rate = rate;
1263         return MSG_task_send(task, alias);
1264 }
1265
1266 int
1267 MSG_task_listen_from(const char* alias)
1268 {
1269         m_host_t h = NULL;
1270         m_task_t t;
1271
1272         CHECK_HOST();
1273
1274         h = MSG_host_self();
1275
1276         DEBUG2("Probing on alias %s(%s)", alias, h->name);
1277
1278         if(NULL == (t = MSG_mailbox_get_head(MSG_mailbox_get_by_alias(alias))))
1279                 return -1;
1280
1281         return MSG_process_get_PID(t->simdata->sender);
1282 }
1283