Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
MSG_task_get is now fair...
[simgrid.git] / src / msg / gos.c
1 /*      $Id$     */
2
3 /* Copyright (c) 2002,2003,2004 Arnaud Legrand. All rights reserved.        */
4
5 /* This program is free software; you can redistribute it and/or modify it
6  * under the terms of the license (GNU LGPL) which comes with this package. */
7
8 #include"private.h"
9 #include"xbt/sysdep.h"
10 #include "xbt/error.h"
11 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(gos, msg,
12                                 "Logging specific to MSG (gos)");
13
14 /** \defgroup msg_gos_functions MSG Operating System Functions
15  *  \brief This section describes the functions that can be used
16  *  by an agent for handling some task.
17  */
18
19 /** \ingroup msg_gos_functions
20  * \brief This function is now deprecated and useless. Please stop using it.
21  */
22 MSG_error_t MSG_process_start(m_process_t process)
23 {
24   xbt_assert0(0,"This function is now deprecated and useless. Please stop using it.");
25   
26   return MSG_OK;
27 }
28
29 /** \ingroup msg_gos_functions
30  * \brief Listen on a channel and wait for receiving a task.
31  *
32  * It takes two parameter.
33  * \param task a memory location for storing a #m_task_t. It will
34    hold a task when this function will return. Thus \a task should not
35    be equal to \c NULL and \a *task should be equal to \c NULL. If one of
36    those two condition does not hold, there will be a warning message.
37  * \param channel the channel on which the agent should be
38    listening. This value has to be >=0 and < than the maximal
39    number of channels fixed with MSG_set_channel_number().
40  * \return #MSG_FATAL if \a task is equal to \c NULL, #MSG_WARNING
41  * if \a *task is not equal to \c NULL, and #MSG_OK otherwise.
42  */
43 MSG_error_t MSG_task_get(m_task_t * task,
44                          m_channel_t channel)
45 {
46   m_process_t process = MSG_process_self();
47   m_task_t t = NULL;
48   m_host_t h = NULL;
49   simdata_task_t t_simdata = NULL;
50   simdata_host_t h_simdata = NULL;
51   int warning = 0;
52   e_surf_action_state_t state = SURF_ACTION_NOT_IN_THE_SYSTEM;
53   
54   CHECK_HOST();
55   /* Sanity check */
56   xbt_assert0(task,"Null pointer for the task\n");
57
58   if (*task) 
59     CRITICAL0("MSG_task_get() was asked to write in a non empty task struct.");
60
61   /* Get the task */
62   h = MSG_host_self();
63   h_simdata = h->simdata;
64
65   DEBUG2("Waiting for a task on channel %d (%s)", channel,h->name);
66
67   while ((t = xbt_fifo_shift(h_simdata->mbox[channel])) == NULL) {
68     xbt_assert2(!(h_simdata->sleeping[channel]),
69                 "A process (%s(%d)) is already blocked on this channel",
70                 h_simdata->sleeping[channel]->name,
71                 h_simdata->sleeping[channel]->simdata->PID);
72     h_simdata->sleeping[channel] = process; /* I'm waiting. Wake me up when you're ready */
73     __MSG_process_block();
74     if(surf_workstation_resource->extension_public->get_state(h_simdata->host) 
75        == SURF_CPU_OFF)
76       MSG_RETURN(MSG_HOST_FAILURE);
77     h_simdata->sleeping[channel] = NULL;
78     /* OK, we should both be ready now. Are you there ? */
79   }
80
81   t_simdata = t->simdata;
82   /*   *task = __MSG_task_copy(t); */
83   *task=t;
84
85   /* Transfer */
86   t_simdata->using++;
87
88   t_simdata->comm = surf_workstation_resource->extension_public->
89     communicate(MSG_process_get_host(t_simdata->sender)->simdata->host,
90                 h->simdata->host, t_simdata->message_size,t_simdata->rate);
91   
92   surf_workstation_resource->common_public->action_set_data(t_simdata->comm,t);
93
94   if(__MSG_process_isBlocked(t_simdata->sender)) 
95     __MSG_process_unblock(t_simdata->sender);
96
97   PAJE_PROCESS_PUSH_STATE(process,"C");  
98
99   do {
100     __MSG_task_wait_event(process, t);
101     state=surf_workstation_resource->common_public->action_get_state(t_simdata->comm);
102   } while (state==SURF_ACTION_RUNNING);
103
104   if(t->simdata->using>1) {
105     xbt_fifo_unshift(msg_global->process_to_run,process);
106     xbt_context_yield();
107   }
108
109   PAJE_PROCESS_POP_STATE(process);  
110   PAJE_COMM_STOP(process,t,channel);
111
112   if(state == SURF_ACTION_DONE) MSG_RETURN(MSG_OK);
113   else if(surf_workstation_resource->extension_public->get_state(h_simdata->host) 
114           == SURF_CPU_OFF)
115     MSG_RETURN(MSG_HOST_FAILURE);
116   else MSG_RETURN(MSG_TRANSFER_FAILURE);
117 }
118
119 /** \ingroup msg_gos_functions
120  * \brief Test whether there is a pending communication on a channel.
121  *
122  * It takes one parameter.
123  * \param channel the channel on which the agent should be
124    listening. This value has to be >=0 and < than the maximal
125    number of channels fixed with MSG_set_channel_number().
126  * \return 1 if there is a pending communication and 0 otherwise
127  */
128 int MSG_task_Iprobe(m_channel_t channel)
129 {
130   m_host_t h = NULL;
131   simdata_host_t h_simdata = NULL;
132
133   DEBUG2("Probing on channel %d (%s)", channel,h->name);
134   CHECK_HOST();
135   h = MSG_host_self();
136   h_simdata = h->simdata;
137   return(xbt_fifo_getFirstItem(h_simdata->mbox[channel])!=NULL);
138 }
139
140 /** \ingroup msg_gos_functions
141  * \brief Test whether there is a pending communication on a channel, and who sent it.
142  *
143  * It takes one parameter.
144  * \param channel the channel on which the agent should be
145    listening. This value has to be >=0 and < than the maximal
146    number of channels fixed with MSG_set_channel_number().
147  * \return -1 if there is no pending communication and the PID of the process who sent it otherwise
148  */
149 int MSG_task_probe_from(m_channel_t channel)
150 {
151   m_host_t h = NULL;
152   simdata_host_t h_simdata = NULL;
153   xbt_fifo_item_t item;
154   m_task_t t;
155
156   CHECK_HOST();
157   h = MSG_host_self();
158   h_simdata = h->simdata;
159
160   DEBUG2("Probing on channel %d (%s)", channel,h->name);
161    
162   item = xbt_fifo_getFirstItem(h->simdata->mbox[channel]);
163   if (!item || !(t = xbt_fifo_get_item_content(item)))
164     return -1;
165    
166   return MSG_process_get_PID(t->simdata->sender);
167 }
168
169 /** \ingroup msg_gos_functions
170  * \brief Put a task on a channel of an host and waits for the end of the
171  * transmission.
172  *
173  * This function is used for describing the behavior of an agent. It
174  * takes three parameter.
175  * \param task a #m_task_t to send on another location. This task
176    will not be usable anymore when the function will return. There is
177    no automatic task duplication and you have to save your parameters
178    before calling this function. Tasks are unique and once it has been
179    sent to another location, you should not access it anymore. You do
180    not need to call MSG_task_destroy() but to avoid using, as an
181    effect of inattention, this task anymore, you definitely should
182    renitialize it with #MSG_TASK_UNINITIALIZED. Note that this task
183    can be transfered iff it has been correctly created with
184    MSG_task_create().
185  * \param dest the destination of the message
186  * \param channel the channel on which the agent should put this
187    task. This value has to be >=0 and < than the maximal number of
188    channels fixed with MSG_set_channel_number().
189  * \return #MSG_FATAL if \a task is not properly initialized and
190  * #MSG_OK otherwise.
191  */
192 MSG_error_t MSG_task_put(m_task_t task,
193                          m_host_t dest, m_channel_t channel)
194 {
195   m_process_t process = MSG_process_self();
196   simdata_task_t task_simdata = NULL;
197   e_surf_action_state_t state = SURF_ACTION_NOT_IN_THE_SYSTEM;
198   m_host_t local_host = NULL;
199   m_host_t remote_host = NULL;
200
201   CHECK_HOST();
202
203   task_simdata = task->simdata;
204   task_simdata->sender = process;
205   xbt_assert0(task_simdata->using==1,"Gargl!");
206   task_simdata->comm = NULL;
207   
208   local_host = ((simdata_process_t) process->simdata)->host;
209   remote_host = dest;
210
211   DEBUG4("Trying to send a task (%lg Mb) from %s to %s on channel %d", 
212          task->simdata->message_size,local_host->name, remote_host->name, channel);
213
214   xbt_fifo_push(((simdata_host_t) remote_host->simdata)->
215                 mbox[channel], task);
216
217   PAJE_COMM_START(process,task,channel);
218     
219   if(remote_host->simdata->sleeping[channel]) 
220     __MSG_process_unblock(remote_host->simdata->sleeping[channel]);
221
222   process->simdata->put_host = dest;
223   process->simdata->put_channel = channel;
224   while(!(task_simdata->comm)) 
225     __MSG_process_block();
226   process->simdata->put_host = NULL;
227   process->simdata->put_channel = -1;
228
229
230   PAJE_PROCESS_PUSH_STATE(process,"C");  
231
232   state=surf_workstation_resource->common_public->action_get_state(task_simdata->comm);
233   while (state==SURF_ACTION_RUNNING) {
234     __MSG_task_wait_event(process, task);
235     state=surf_workstation_resource->common_public->action_get_state(task_simdata->comm);
236   }
237     
238   MSG_task_destroy(task);
239
240   PAJE_PROCESS_POP_STATE(process);  
241
242   if(state == SURF_ACTION_DONE) MSG_RETURN(MSG_OK);
243   else if(surf_workstation_resource->extension_public->get_state(local_host->simdata->host) 
244           == SURF_CPU_OFF)
245     MSG_RETURN(MSG_HOST_FAILURE);
246   else MSG_RETURN(MSG_TRANSFER_FAILURE);
247 }
248
249 /** \ingroup msg_gos_functions
250  * \brief Does exactly the same as MSG_task_put but with a bounded transmition 
251  * rate.
252  *
253  * \sa MSG_task_put
254  */
255 MSG_error_t MSG_task_put_bounded(m_task_t task,
256                                  m_host_t dest, m_channel_t channel,
257                                  double max_rate)
258 {
259   task->simdata->rate=max_rate;
260   return(MSG_task_put(task, dest, channel));
261   task->simdata->rate=-1.0;
262 }
263
264 /** \ingroup msg_gos_functions
265  * \brief Executes a task and waits for its termination.
266  *
267  * This function is used for describing the behavior of an agent. It
268  * takes only one parameter.
269  * \param task a #m_task_t to execute on the location on which the
270    agent is running.
271  * \return #MSG_FATAL if \a task is not properly initialized and
272  * #MSG_OK otherwise.
273  */
274 MSG_error_t MSG_task_execute(m_task_t task)
275 {
276   m_process_t process = MSG_process_self();
277   MSG_error_t res;
278
279   DEBUG1("Computing on %s", process->simdata->host->name);
280
281   __MSG_task_execute(process, task);
282
283   PAJE_PROCESS_PUSH_STATE(process,"E");  
284   res = __MSG_wait_for_computation(process,task);
285   PAJE_PROCESS_POP_STATE(process);
286   return res;
287 }
288
289 void __MSG_task_execute(m_process_t process, m_task_t task)
290 {
291   simdata_task_t simdata = NULL;
292
293   CHECK_HOST();
294
295   simdata = task->simdata;
296
297   simdata->compute = surf_workstation_resource->extension_public->
298     execute(MSG_process_get_host(process)->simdata->host,
299             simdata->computation_amount);
300   surf_workstation_resource->common_public->action_set_data(simdata->compute,task);
301 }
302
303 MSG_error_t __MSG_wait_for_computation(m_process_t process, m_task_t task)
304 {
305   e_surf_action_state_t state = SURF_ACTION_NOT_IN_THE_SYSTEM;
306   simdata_task_t simdata = task->simdata;
307
308   simdata->using++;
309   do {
310     __MSG_task_wait_event(process, task);
311     state=surf_workstation_resource->common_public->action_get_state(simdata->compute);
312   } while (state==SURF_ACTION_RUNNING);
313   simdata->using--;
314     
315
316   if(state == SURF_ACTION_DONE) MSG_RETURN(MSG_OK);
317   else if(surf_workstation_resource->extension_public->
318           get_state(MSG_process_get_host(process)->simdata->host) 
319           == SURF_CPU_OFF)
320     MSG_RETURN(MSG_HOST_FAILURE);
321   else MSG_RETURN(MSG_TRANSFER_FAILURE);
322 }
323
324 /** \ingroup msg_gos_functions
325  * \brief Sleep for the specified number of seconds
326  *
327  * Makes the current process sleep until \a time seconds have elapsed.
328  *
329  * \param nb_sec a number of second
330  */
331 MSG_error_t MSG_process_sleep(double nb_sec)
332 {
333   e_surf_action_state_t state = SURF_ACTION_NOT_IN_THE_SYSTEM;
334   m_process_t process = MSG_process_self();
335   m_task_t dummy = NULL;
336   simdata_task_t simdata = NULL;
337
338   CHECK_HOST();
339   dummy = MSG_task_create("MSG_sleep", nb_sec, 0.0, NULL);
340   simdata = dummy->simdata;
341
342   simdata->compute = surf_workstation_resource->extension_public->
343     sleep(MSG_process_get_host(process)->simdata->host,
344             simdata->computation_amount);
345   surf_workstation_resource->common_public->action_set_data(simdata->compute,dummy);
346
347   
348   simdata->using++;
349   do {
350     __MSG_task_wait_event(process, dummy);
351     state=surf_workstation_resource->common_public->action_get_state(simdata->compute);
352   } while (state==SURF_ACTION_RUNNING);
353   simdata->using--;
354     
355   if(state == SURF_ACTION_DONE) {
356     if(surf_workstation_resource->extension_public->
357        get_state(MSG_process_get_host(process)->simdata->host) 
358        == SURF_CPU_OFF)
359       MSG_RETURN(MSG_HOST_FAILURE);
360
361     if(__MSG_process_isBlocked(process)) {
362       __MSG_process_unblock(MSG_process_self());
363     }
364     if(surf_workstation_resource->extension_public->
365        get_state(MSG_process_get_host(process)->simdata->host) 
366        == SURF_CPU_OFF)
367       MSG_RETURN(MSG_HOST_FAILURE);
368     MSG_task_destroy(dummy);
369     MSG_RETURN(MSG_OK);
370   } else MSG_RETURN(MSG_HOST_FAILURE);
371 }
372
373 /** \ingroup msg_gos_functions
374  * \brief Return the number of MSG tasks currently running on a
375  * the host of the current running process.
376  */
377 int MSG_get_msgload(void) 
378 {
379   CHECK_HOST();
380   xbt_assert0(0,"Not implemented yet!");
381   
382   return 1;
383 }
384
385 /** \ingroup msg_gos_functions
386  *
387  * \brief Return the the last value returned by a MSG function (except
388  * MSG_get_errno...).
389  */
390 MSG_error_t MSG_get_errno(void)
391 {
392   return PROCESS_GET_ERRNO();
393 }