Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
Getting rid of useless field
[simgrid.git] / src / msg / gos.c
1 /*      $Id$     */
2
3 /* Copyright (c) 2002,2003,2004 Arnaud Legrand. All rights reserved.        */
4
5 /* This program is free software; you can redistribute it and/or modify it
6  * under the terms of the license (GNU LGPL) which comes with this package. */
7
8 #include "private.h"
9 #include "xbt/sysdep.h"
10 #include "xbt/log.h"
11 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(msg_gos, msg,
12                                 "Logging specific to MSG (gos)");
13
14 /** \defgroup msg_gos_functions MSG Operating System Functions
15  *  \brief This section describes the functions that can be used
16  *  by an agent for handling some task.
17  */
18
19 static MSG_error_t __MSG_task_get_with_time_out_from_host(m_task_t * task,
20                                                         m_channel_t channel,
21                                                         double max_duration,
22                                                         m_host_t host)
23 {
24   m_process_t process = MSG_process_self();
25   m_task_t t = NULL;
26   m_host_t h = NULL;
27   simdata_task_t t_simdata = NULL;
28   simdata_host_t h_simdata = NULL;
29   int first_time = 1;
30   e_surf_action_state_t state = SURF_ACTION_NOT_IN_THE_SYSTEM;
31   xbt_fifo_item_t item = NULL;
32
33   CHECK_HOST();
34   xbt_assert1((channel>=0) && (channel < msg_global->max_channel),"Invalid channel %d",channel);
35   /* Sanity check */
36   xbt_assert0(task,"Null pointer for the task\n");
37
38   if (*task) 
39     CRITICAL0("MSG_task_get() was asked to write in a non empty task struct.");
40
41   /* Get the task */
42   h = MSG_host_self();
43   h_simdata = h->simdata;
44
45   DEBUG2("Waiting for a task on channel %d (%s)", channel,h->name);
46
47   while (1) {
48     if(xbt_fifo_size(h_simdata->mbox[channel])>0) {
49       if(!host) {
50         t = xbt_fifo_shift(h_simdata->mbox[channel]);
51         break;
52       } else {
53         xbt_fifo_foreach(h->simdata->mbox[channel],item,t,m_task_t) {
54           if(t->simdata->source==host) break;
55         }
56         if(item) {
57           xbt_fifo_remove_item(h->simdata->mbox[channel],item);
58           break;
59         } 
60       }
61     }
62                                                        
63     if(max_duration>0) {
64       if(!first_time) {
65         MSG_RETURN(MSG_OK);
66       }
67     }
68     xbt_assert3(!(h_simdata->sleeping[channel]),
69                 "A process (%s(%d)) is already blocked on channel %d",
70                 h_simdata->sleeping[channel]->name,
71                 h_simdata->sleeping[channel]->simdata->PID,
72                 channel);
73     h_simdata->sleeping[channel] = process; /* I'm waiting. Wake me up when you're ready */
74     if(max_duration>0) {
75       __MSG_process_block(max_duration);
76     } else {
77       __MSG_process_block(-1);
78     }
79     h_simdata->sleeping[channel] = NULL;
80     first_time = 0;
81     if(surf_workstation_resource->extension_public->get_state(h_simdata->host) 
82        == SURF_CPU_OFF)
83       MSG_RETURN(MSG_HOST_FAILURE);
84     /* OK, we should both be ready now. Are you there ? */
85   }
86
87   DEBUG1("OK, got a task (%s)", t->name);
88
89   t_simdata = t->simdata;
90   /*   *task = __MSG_task_copy(t); */
91   *task=t;
92
93   /* Transfer */
94   t_simdata->using++;
95
96   while(MSG_process_is_suspended(t_simdata->sender)) {
97     DEBUG1("Oooups, the sender (%s) has been suspended in the meantime. Let's wait for him", 
98            t_simdata->sender->name);
99     m_task_t task_to_wait_for = t_simdata->sender->simdata->waiting_task;
100     if(__MSG_process_isBlocked(t_simdata->sender)) {
101       DEBUG0("He's blocked. Let's wait for him to go in the suspended state");
102       __MSG_process_unblock(t_simdata->sender);
103       task_to_wait_for->simdata->using++;
104       __MSG_task_wait_event(process, task_to_wait_for);
105       MSG_task_destroy(task_to_wait_for);
106     } else {
107       DEBUG0("He's suspended. Let's wait for him to go in the resumed state");
108       task_to_wait_for->simdata->using++;
109       __MSG_task_wait_event(process, task_to_wait_for);
110       MSG_task_destroy(task_to_wait_for);
111       DEBUG0("He's resumed. He should block again. So let's free him.");
112       __MSG_process_unblock(t_simdata->sender);
113       break;
114     }
115   }
116   DEBUG0("Calling SURF for communication creation");
117   t_simdata->comm = surf_workstation_resource->extension_public->
118     communicate(MSG_process_get_host(t_simdata->sender)->simdata->host,
119                 h->simdata->host, t_simdata->message_size,t_simdata->rate);
120   
121   surf_workstation_resource->common_public->action_set_data(t_simdata->comm,t);
122
123   if(__MSG_process_isBlocked(t_simdata->sender)) {
124     DEBUG1("Unblocking %s",t_simdata->sender->name);
125     __MSG_process_unblock(t_simdata->sender);
126   }
127
128   PAJE_PROCESS_PUSH_STATE(process,"C",t);  
129
130   do {
131     DEBUG0("Waiting for action termination");
132     __MSG_task_wait_event(process, t);
133     state=surf_workstation_resource->common_public->action_get_state(t_simdata->comm);
134   } while (state==SURF_ACTION_RUNNING);
135   DEBUG0("Action terminated");
136
137   if(t->simdata->using>1) {
138     xbt_fifo_unshift(msg_global->process_to_run,process);
139     xbt_context_yield();
140   }
141
142   PAJE_PROCESS_POP_STATE(process);
143   PAJE_COMM_STOP(process,t,channel);
144
145   if(state == SURF_ACTION_DONE) {
146     if(surf_workstation_resource->common_public->action_free(t_simdata->comm)) 
147       t_simdata->comm = NULL;
148     MSG_RETURN(MSG_OK);
149   } else if(surf_workstation_resource->extension_public->get_state(h_simdata->host) 
150           == SURF_CPU_OFF) {
151     if(surf_workstation_resource->common_public->action_free(t_simdata->comm)) 
152       t_simdata->comm = NULL;
153     MSG_RETURN(MSG_HOST_FAILURE);
154   } else {
155     if(surf_workstation_resource->common_public->action_free(t_simdata->comm)) 
156       t_simdata->comm = NULL;
157     MSG_RETURN(MSG_TRANSFER_FAILURE);
158   }
159 }
160
161 /** \ingroup msg_gos_functions
162  * \brief Listen on a channel and wait for receiving a task.
163  *
164  * It takes two parameters.
165  * \param task a memory location for storing a #m_task_t. It will
166    hold a task when this function will return. Thus \a task should not
167    be equal to \c NULL and \a *task should be equal to \c NULL. If one of
168    those two condition does not hold, there will be a warning message.
169  * \param channel the channel on which the agent should be
170    listening. This value has to be >=0 and < than the maximal
171    number of channels fixed with MSG_set_channel_number().
172  * \return #MSG_FATAL if \a task is equal to \c NULL, #MSG_WARNING
173  * if \a *task is not equal to \c NULL, and #MSG_OK otherwise.
174  */
175 MSG_error_t MSG_task_get(m_task_t * task,
176                          m_channel_t channel)
177 {
178   return MSG_task_get_with_time_out(task, channel, -1);
179 }
180
181 /** \ingroup msg_gos_functions
182  * \brief Listen on a channel and wait for receiving a task with a timeout.
183  *
184  * It takes three parameters.
185  * \param task a memory location for storing a #m_task_t. It will
186    hold a task when this function will return. Thus \a task should not
187    be equal to \c NULL and \a *task should be equal to \c NULL. If one of
188    those two condition does not hold, there will be a warning message.
189  * \param channel the channel on which the agent should be
190    listening. This value has to be >=0 and < than the maximal
191    number of channels fixed with MSG_set_channel_number().
192  * \param max_duration the maximum time to wait for a task before giving
193     up. In such a case, \a task will not be modified and will still be
194     equal to \c NULL when returning.
195  * \return #MSG_FATAL if \a task is equal to \c NULL, #MSG_WARNING
196    if \a *task is not equal to \c NULL, and #MSG_OK otherwise.
197  */
198 MSG_error_t MSG_task_get_with_time_out(m_task_t * task,
199                                        m_channel_t channel,
200                                        double max_duration)
201 {
202   return __MSG_task_get_with_time_out_from_host(task, channel, max_duration, NULL);
203 }
204
205 /** \ingroup msg_gos_functions
206  * \brief Listen on \a channel and waits for receiving a task from \a host.
207  *
208  * It takes three parameters.
209  * \param task a memory location for storing a #m_task_t. It will
210    hold a task when this function will return. Thus \a task should not
211    be equal to \c NULL and \a *task should be equal to \c NULL. If one of
212    those two condition does not hold, there will be a warning message.
213  * \param channel the channel on which the agent should be
214    listening. This value has to be >=0 and < than the maximal
215    number of channels fixed with MSG_set_channel_number().
216  * \param host the host that is to be watched.
217  * \return #MSG_FATAL if \a task is equal to \c NULL, #MSG_WARNING
218    if \a *task is not equal to \c NULL, and #MSG_OK otherwise.
219  */
220 MSG_error_t MSG_task_get_from_host(m_task_t * task, int channel, 
221                                    m_host_t host)
222 {
223   return __MSG_task_get_with_time_out_from_host(task, channel, -1, host);
224 }
225
226 /** \ingroup msg_gos_functions
227  * \brief Test whether there is a pending communication on a channel.
228  *
229  * It takes one parameter.
230  * \param channel the channel on which the agent should be
231    listening. This value has to be >=0 and < than the maximal
232    number of channels fixed with MSG_set_channel_number().
233  * \return 1 if there is a pending communication and 0 otherwise
234  */
235 int MSG_task_Iprobe(m_channel_t channel)
236 {
237   m_host_t h = NULL;
238   simdata_host_t h_simdata = NULL;
239
240   xbt_assert1((channel>=0) && (channel < msg_global->max_channel),"Invalid channel %d",channel);
241   DEBUG2("Probing on channel %d (%s)", channel,h->name);
242   CHECK_HOST();
243   h = MSG_host_self();
244   h_simdata = h->simdata;
245   return(xbt_fifo_get_first_item(h_simdata->mbox[channel])!=NULL);
246 }
247
248 /** \ingroup msg_gos_functions
249  * \brief Test whether there is a pending communication on a channel, and who sent it.
250  *
251  * It takes one parameter.
252  * \param channel the channel on which the agent should be
253    listening. This value has to be >=0 and < than the maximal
254    number of channels fixed with MSG_set_channel_number().
255  * \return -1 if there is no pending communication and the PID of the process who sent it otherwise
256  */
257 int MSG_task_probe_from(m_channel_t channel)
258 {
259   m_host_t h = NULL;
260   simdata_host_t h_simdata = NULL;
261   xbt_fifo_item_t item;
262   m_task_t t;
263
264   xbt_assert1((channel>=0) && (channel < msg_global->max_channel),"Invalid channel %d",channel);
265   CHECK_HOST();
266   h = MSG_host_self();
267   h_simdata = h->simdata;
268
269   DEBUG2("Probing on channel %d (%s)", channel,h->name);
270    
271   item = xbt_fifo_get_first_item(h->simdata->mbox[channel]);
272   if (!item || !(t = xbt_fifo_get_item_content(item)))
273     return -1;
274    
275   return MSG_process_get_PID(t->simdata->sender);
276 }
277
278 /** \ingroup msg_gos_functions
279  * \brief Wait for at most \a max_duration second for a task reception
280    on \a channel. *\a PID is updated with the PID of the first process
281    that triggered this event if any.
282  *
283  * It takes three parameters:
284  * \param channel the channel on which the agent should be
285    listening. This value has to be >=0 and < than the maximal.
286    number of channels fixed with MSG_set_channel_number().
287  * \param PID a memory location for storing an int.
288  * \param max_duration the maximum time to wait for a task before
289     giving up. In the case of a reception, *\a PID will be updated
290     with the PID of the first process to send a task.
291  * \return #MSG_HOST_FAILURE if the host is shut down in the meantime
292    and #MSG_OK otherwise.
293  */
294 MSG_error_t MSG_channel_select_from(m_channel_t channel, double max_duration,
295                                     int *PID)
296 {
297   m_host_t h = NULL;
298   simdata_host_t h_simdata = NULL;
299   xbt_fifo_item_t item;
300   m_task_t t;
301   int first_time = 1;
302   m_process_t process = MSG_process_self();
303
304   xbt_assert1((channel>=0) && (channel < msg_global->max_channel),"Invalid channel %d",channel);
305   if(PID) {
306     *PID = -1;
307   }
308
309   if(max_duration==0.0) {
310     *PID = MSG_task_probe_from(channel);
311     MSG_RETURN(MSG_OK);
312   } else {
313     CHECK_HOST();
314     h = MSG_host_self();
315     h_simdata = h->simdata;
316     
317     DEBUG2("Probing on channel %d (%s)", channel,h->name);
318     while(!(item = xbt_fifo_get_first_item(h->simdata->mbox[channel]))) {
319       if(max_duration>0) {
320         if(!first_time) {
321           MSG_RETURN(MSG_OK);
322         }
323       }
324       xbt_assert2(!(h_simdata->sleeping[channel]),
325                   "A process (%s(%d)) is already blocked on this channel",
326                   h_simdata->sleeping[channel]->name,
327                   h_simdata->sleeping[channel]->simdata->PID);
328       h_simdata->sleeping[channel] = process; /* I'm waiting. Wake me up when you're ready */
329       if(max_duration>0) {
330         __MSG_process_block(max_duration);
331       } else {
332         __MSG_process_block(-1);
333       }
334       if(surf_workstation_resource->extension_public->get_state(h_simdata->host) 
335          == SURF_CPU_OFF) {
336         MSG_RETURN(MSG_HOST_FAILURE);
337       }
338       h_simdata->sleeping[channel] = NULL;
339       first_time = 0;
340     }
341     if (!item || !(t = xbt_fifo_get_item_content(item))) {
342       MSG_RETURN(MSG_OK);
343     }
344     if(PID) {
345       *PID = MSG_process_get_PID(t->simdata->sender);
346     }
347     MSG_RETURN(MSG_OK);
348   }
349 }
350
351
352 /** \ingroup msg_gos_functions
353
354  * \brief Return the number of tasks waiting to be received on a \a
355    channel and sent by \a host.
356  *
357  * It takes two parameters.
358  * \param channel the channel on which the agent should be
359    listening. This value has to be >=0 and < than the maximal
360    number of channels fixed with MSG_set_channel_number().
361  * \param host the host that is to be watched.
362  * \return the number of tasks waiting to be received on \a channel
363    and sent by \a host.
364  */
365 int MSG_task_probe_from_host(int channel, m_host_t host)
366 {
367   simdata_host_t h_simdata = NULL;
368   xbt_fifo_item_t item;
369   m_task_t t;
370   int count = 0;
371   m_host_t h = NULL;
372   
373   xbt_assert1((channel>=0) && (channel < msg_global->max_channel),"Invalid channel %d",channel);
374   CHECK_HOST();
375   h = MSG_host_self();
376   h_simdata = h->simdata;
377
378   DEBUG2("Probing on channel %d (%s)", channel,h->name);
379    
380   xbt_fifo_foreach(h->simdata->mbox[channel],item,t,m_task_t) {
381     if(t->simdata->source==host) count++;
382   }
383    
384   return count;
385 }
386
387 /** \ingroup msg_gos_functions
388  * \brief Put a task on a channel of an host and waits for the end of the
389  * transmission.
390  *
391  * This function is used for describing the behavior of an agent. It
392  * takes three parameter.
393  * \param task a #m_task_t to send on another location. This task
394    will not be usable anymore when the function will return. There is
395    no automatic task duplication and you have to save your parameters
396    before calling this function. Tasks are unique and once it has been
397    sent to another location, you should not access it anymore. You do
398    not need to call MSG_task_destroy() but to avoid using, as an
399    effect of inattention, this task anymore, you definitely should
400    renitialize it with #MSG_TASK_UNINITIALIZED. Note that this task
401    can be transfered iff it has been correctly created with
402    MSG_task_create().
403  * \param dest the destination of the message
404  * \param channel the channel on which the agent should put this
405    task. This value has to be >=0 and < than the maximal number of
406    channels fixed with MSG_set_channel_number().
407  * \return #MSG_FATAL if \a task is not properly initialized and
408  * #MSG_OK otherwise.
409  */
410 MSG_error_t MSG_task_put(m_task_t task,
411                          m_host_t dest, m_channel_t channel)
412 {
413   m_process_t process = MSG_process_self();
414   simdata_task_t task_simdata = NULL;
415   e_surf_action_state_t state = SURF_ACTION_NOT_IN_THE_SYSTEM;
416   m_host_t local_host = NULL;
417   m_host_t remote_host = NULL;
418
419   CHECK_HOST();
420
421   xbt_assert1((channel>=0) && (channel < msg_global->max_channel),"Invalid channel %d",channel);
422
423   task_simdata = task->simdata;
424   task_simdata->sender = process;
425   task_simdata->source = MSG_process_get_host(process);
426   xbt_assert0(task_simdata->using==1,
427               "This taks is still being used somewhere else. You cannot send it now. Go fix your code!");
428   task_simdata->comm = NULL;
429   
430   local_host = ((simdata_process_t) process->simdata)->host;
431   remote_host = dest;
432
433   DEBUG4("Trying to send a task (%g Mb) from %s to %s on channel %d", 
434          task->simdata->message_size,local_host->name, remote_host->name, channel);
435
436   xbt_fifo_push(((simdata_host_t) remote_host->simdata)->
437                 mbox[channel], task);
438
439   PAJE_COMM_START(process,task,channel);
440     
441   if(remote_host->simdata->sleeping[channel]) {
442     DEBUG0("Somebody is listening. Let's wake him up!");
443     __MSG_process_unblock(remote_host->simdata->sleeping[channel]);
444   }
445
446   process->simdata->put_host = dest;
447   process->simdata->put_channel = channel;
448   while(!(task_simdata->comm)) {
449     DEBUG0("Communication not initiated yet. Let's block!");
450     __MSG_process_block(-1);
451     if(surf_workstation_resource->extension_public->
452        get_state(local_host->simdata->host) == SURF_CPU_OFF) {
453       xbt_fifo_remove(((simdata_host_t) remote_host->simdata)->mbox[channel],
454                       task);
455       MSG_task_destroy(task);
456       MSG_RETURN(MSG_HOST_FAILURE);
457     }
458   }
459   DEBUG0("Registering to this communication");
460   surf_workstation_resource->common_public->action_use(task_simdata->comm);
461   process->simdata->put_host = NULL;
462   process->simdata->put_channel = -1;
463
464
465   PAJE_PROCESS_PUSH_STATE(process,"C",task);  
466
467   state=surf_workstation_resource->common_public->action_get_state(task_simdata->comm);
468   while (state==SURF_ACTION_RUNNING) {
469     DEBUG0("Waiting for action termination");
470     __MSG_task_wait_event(process, task);
471     state=surf_workstation_resource->common_public->action_get_state(task_simdata->comm);
472   }
473   DEBUG0("Action terminated");
474   task->simdata->rate=-1.0; /* Sets the rate back to default */
475
476   PAJE_PROCESS_POP_STATE(process);  
477
478   if(state == SURF_ACTION_DONE) {
479     if(surf_workstation_resource->common_public->action_free(task_simdata->comm)) 
480       task_simdata->comm = NULL;
481     MSG_task_destroy(task);
482     MSG_RETURN(MSG_OK);
483   } else if(surf_workstation_resource->extension_public->get_state(local_host->simdata->host) 
484             == SURF_CPU_OFF) {
485     if(surf_workstation_resource->common_public->action_free(task_simdata->comm)) 
486       task_simdata->comm = NULL;
487     MSG_task_destroy(task);
488     MSG_RETURN(MSG_HOST_FAILURE);
489   } else { 
490     if(surf_workstation_resource->common_public->action_free(task_simdata->comm)) 
491       task_simdata->comm = NULL;
492     MSG_task_destroy(task);
493     MSG_RETURN(MSG_TRANSFER_FAILURE);
494   }
495 }
496
497 /** \ingroup msg_gos_functions
498  * \brief Does exactly the same as MSG_task_put but with a bounded transmition 
499  * rate.
500  *
501  * \sa MSG_task_put
502  */
503 MSG_error_t MSG_task_put_bounded(m_task_t task,
504                                  m_host_t dest, m_channel_t channel,
505                                  double max_rate)
506 {
507   MSG_error_t res = MSG_OK;
508   task->simdata->rate=max_rate;
509   res = MSG_task_put(task, dest, channel);
510   return(res);
511 }
512
513 /** \ingroup msg_gos_functions
514  * \brief Executes a task and waits for its termination.
515  *
516  * This function is used for describing the behavior of an agent. It
517  * takes only one parameter.
518  * \param task a #m_task_t to execute on the location on which the
519    agent is running.
520  * \return #MSG_FATAL if \a task is not properly initialized and
521  * #MSG_OK otherwise.
522  */
523 MSG_error_t MSG_task_execute(m_task_t task)
524 {
525   m_process_t process = MSG_process_self();
526   MSG_error_t res;
527
528   DEBUG1("Computing on %s", process->simdata->host->name);
529
530   __MSG_task_execute(process, task);
531
532   PAJE_PROCESS_PUSH_STATE(process,"E",task);  
533   res = __MSG_wait_for_computation(process,task);
534   PAJE_PROCESS_POP_STATE(process);
535   return res;
536 }
537
538 void __MSG_task_execute(m_process_t process, m_task_t task)
539 {
540   simdata_task_t simdata = NULL;
541
542   CHECK_HOST();
543
544   simdata = task->simdata;
545   xbt_assert0((!simdata->compute)&&(task->simdata->using==1),
546               "This taks is executed somewhere else. Go fix your code!");
547   task->simdata->using++;
548   simdata->compute = surf_workstation_resource->extension_public->
549     execute(MSG_process_get_host(process)->simdata->host,
550             simdata->computation_amount);
551   surf_workstation_resource->common_public->
552     set_priority(simdata->compute, simdata->priority);
553
554   surf_workstation_resource->common_public->action_set_data(simdata->compute,task);
555   task->simdata->using--;
556 }
557
558 MSG_error_t __MSG_wait_for_computation(m_process_t process, m_task_t task)
559 {
560   e_surf_action_state_t state = SURF_ACTION_NOT_IN_THE_SYSTEM;
561   simdata_task_t simdata = task->simdata;
562
563   XBT_IN4("(%p(%s) %p(%s))",process,process->name,task,task->name);
564   simdata->using++;
565   do {
566     __MSG_task_wait_event(process, task);
567     state=surf_workstation_resource->common_public->action_get_state(simdata->compute);
568   } while (state==SURF_ACTION_RUNNING);
569   simdata->using--;
570     
571
572   if(state == SURF_ACTION_DONE) {
573     if(surf_workstation_resource->common_public->action_free(simdata->compute)) 
574       simdata->compute = NULL;
575     simdata->computation_amount = 0.0;
576     XBT_OUT;
577     MSG_RETURN(MSG_OK);
578   } else if(surf_workstation_resource->extension_public->
579             get_state(MSG_process_get_host(process)->simdata->host) 
580             == SURF_CPU_OFF) {
581     if(surf_workstation_resource->common_public->action_free(simdata->compute)) 
582       simdata->compute = NULL;
583     XBT_OUT;
584     MSG_RETURN(MSG_HOST_FAILURE);
585   } else {
586     if(surf_workstation_resource->common_public->action_free(simdata->compute)) 
587       simdata->compute = NULL;
588     XBT_OUT;
589     MSG_RETURN(MSG_TASK_CANCELLED);
590   }
591 }
592 /** \ingroup m_task_management
593  * \brief Creates a new #m_task_t (a parallel one....).
594  *
595  * A constructor for #m_task_t taking six arguments and returning the 
596    corresponding object.
597  * \param name a name for the object. It is for user-level information
598    and can be NULL.
599  * \param host_nb the number of hosts implied in the parallel task.
600  * \param host_list an array of \p host_nb m_host_t.
601  * \param computation_amount an array of \p host_nb
602    doubles. computation_amount[i] is the total number of operations
603    that have to be performed on host_list[i].
604  * \param communication_amount an array of \p host_nb* \p host_nb doubles.
605  * \param data a pointer to any data may want to attach to the new
606    object.  It is for user-level information and can be NULL. It can
607    be retrieved with the function \ref MSG_task_get_data.
608  * \see m_task_t
609  * \return The new corresponding object.
610  */
611 m_task_t MSG_parallel_task_create(const char *name, 
612                                   int host_nb,
613                                   const m_host_t *host_list,
614                                   double *computation_amount,
615                                   double *communication_amount,
616                                   void *data)
617 {
618   simdata_task_t simdata = xbt_new0(s_simdata_task_t,1);
619   m_task_t task = xbt_new0(s_m_task_t,1);
620   int i;
621
622   /* Task structure */
623   task->name = xbt_strdup(name);
624   task->simdata = simdata;
625   task->data = data;
626
627   /* Simulator Data */
628   simdata->sleeping = xbt_dynar_new(sizeof(m_process_t),NULL);
629   simdata->rate = -1.0;
630   simdata->using = 1;
631   simdata->sender = NULL;
632   simdata->source = NULL;
633   simdata->host_nb = host_nb;
634   
635   simdata->host_list = xbt_new0(void *, host_nb);
636   simdata->comp_amount = computation_amount;
637   simdata->comm_amount = communication_amount;
638
639   for(i=0;i<host_nb;i++)
640     simdata->host_list[i] = host_list[i]->simdata->host;
641
642   return task;
643 }
644
645
646 static void __MSG_parallel_task_execute(m_process_t process, m_task_t task)
647 {
648   simdata_task_t simdata = NULL;
649
650   CHECK_HOST();
651
652   simdata = task->simdata;
653
654   xbt_assert0(simdata->host_nb,"This is not a parallel task. Go to hell.");
655
656   simdata->compute = surf_workstation_resource->extension_public->
657   execute_parallel_task(task->simdata->host_nb,
658                         task->simdata->host_list,
659                         task->simdata->comp_amount,
660                         task->simdata->comm_amount,
661                         1.0,
662                         -1.0);
663   if(simdata->compute)
664     surf_workstation_resource->common_public->action_set_data(simdata->compute,task);
665 }
666
667 MSG_error_t MSG_parallel_task_execute(m_task_t task)
668 {
669   m_process_t process = MSG_process_self();
670   MSG_error_t res;
671
672   DEBUG0("Computing on a tons of guys");
673   
674   __MSG_parallel_task_execute(process, task);
675
676   if(task->simdata->compute)
677     res = __MSG_wait_for_computation(process,task);
678   else 
679     res = MSG_OK;
680
681   return res;  
682 }
683
684
685 /** \ingroup msg_gos_functions
686  * \brief Sleep for the specified number of seconds
687  *
688  * Makes the current process sleep until \a time seconds have elapsed.
689  *
690  * \param nb_sec a number of second
691  */
692 MSG_error_t MSG_process_sleep(double nb_sec)
693 {
694   e_surf_action_state_t state = SURF_ACTION_NOT_IN_THE_SYSTEM;
695   m_process_t process = MSG_process_self();
696   m_task_t dummy = NULL;
697   simdata_task_t simdata = NULL;
698
699   CHECK_HOST();
700   dummy = MSG_task_create("MSG_sleep", nb_sec, 0.0, NULL);
701   simdata = dummy->simdata;
702
703   simdata->compute = surf_workstation_resource->extension_public->
704     sleep(MSG_process_get_host(process)->simdata->host,
705             simdata->computation_amount);
706   surf_workstation_resource->common_public->action_set_data(simdata->compute,dummy);
707
708   
709   simdata->using++;
710   do {
711     __MSG_task_wait_event(process, dummy);
712     state=surf_workstation_resource->common_public->action_get_state(simdata->compute);
713   } while (state==SURF_ACTION_RUNNING);
714   simdata->using--;
715     
716   if(state == SURF_ACTION_DONE) {
717     if(surf_workstation_resource->extension_public->
718        get_state(MSG_process_get_host(process)->simdata->host) 
719        == SURF_CPU_OFF) {
720       if(surf_workstation_resource->common_public->action_free(simdata->compute)) 
721         simdata->compute = NULL;
722       MSG_RETURN(MSG_HOST_FAILURE);
723     }
724     if(__MSG_process_isBlocked(process)) {
725       __MSG_process_unblock(MSG_process_self());
726     }
727     if(surf_workstation_resource->extension_public->
728        get_state(MSG_process_get_host(process)->simdata->host) 
729        == SURF_CPU_OFF) {
730       if(surf_workstation_resource->common_public->action_free(simdata->compute)) 
731         simdata->compute = NULL;
732       MSG_RETURN(MSG_HOST_FAILURE);
733     }
734     if(surf_workstation_resource->common_public->action_free(simdata->compute)) 
735       simdata->compute = NULL;
736     MSG_task_destroy(dummy);
737     MSG_RETURN(MSG_OK);
738   } else MSG_RETURN(MSG_HOST_FAILURE);
739 }
740
741 /** \ingroup msg_gos_functions
742  * \brief Return the number of MSG tasks currently running on
743  * the host of the current running process.
744  */
745 static int MSG_get_msgload(void) 
746 {
747   m_process_t process;
748    
749   CHECK_HOST();
750   
751   xbt_assert0(0, "This function is still to be specified correctly (what do you mean by 'load', exactly?). In the meantime, please don't use it");
752   process = MSG_process_self();
753   return xbt_fifo_size(process->simdata->host->simdata->process_list);
754 }
755
756 /** \ingroup msg_gos_functions
757  *
758  * \brief Return the last value returned by a MSG function (except
759  * MSG_get_errno...).
760  */
761 MSG_error_t MSG_get_errno(void)
762 {
763   return PROCESS_GET_ERRNO();
764 }