Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
Add a new function: MSG_task_get_with_timeout. That should be very convenient to...
[simgrid.git] / src / msg / gos.c
1 /*      $Id$     */
2
3 /* Copyright (c) 2002,2003,2004 Arnaud Legrand. All rights reserved.        */
4
5 /* This program is free software; you can redistribute it and/or modify it
6  * under the terms of the license (GNU LGPL) which comes with this package. */
7
8 #include"private.h"
9 #include"xbt/sysdep.h"
10 #include "xbt/error.h"
11 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(gos, msg,
12                                 "Logging specific to MSG (gos)");
13
14 /** \defgroup msg_gos_functions MSG Operating System Functions
15  *  \brief This section describes the functions that can be used
16  *  by an agent for handling some task.
17  */
18
19 /** \ingroup msg_gos_functions
20  * \brief This function is now deprecated and useless. Please stop using it.
21  */
22 MSG_error_t MSG_process_start(m_process_t process)
23 {
24   xbt_assert0(0,"This function is now deprecated and useless. Please stop using it.");
25   
26   return MSG_OK;
27 }
28
29 /** \ingroup msg_gos_functions
30  * \brief Listen on a channel and wait for receiving a task.
31  *
32  * It takes two parameters.
33  * \param task a memory location for storing a #m_task_t. It will
34    hold a task when this function will return. Thus \a task should not
35    be equal to \c NULL and \a *task should be equal to \c NULL. If one of
36    those two condition does not hold, there will be a warning message.
37  * \param channel the channel on which the agent should be
38    listening. This value has to be >=0 and < than the maximal
39    number of channels fixed with MSG_set_channel_number().
40  * \return #MSG_FATAL if \a task is equal to \c NULL, #MSG_WARNING
41  * if \a *task is not equal to \c NULL, and #MSG_OK otherwise.
42  */
43 MSG_error_t MSG_task_get(m_task_t * task,
44                          m_channel_t channel)
45 {
46   return MSG_task_get_with_time_out(task, channel, -1);
47 }
48
49 /** \ingroup msg_gos_functions
50  * \brief Listen on a channel and wait for receiving a task with a timeout.
51  *
52  * It takes three parameters.
53  * \param task a memory location for storing a #m_task_t. It will
54    hold a task when this function will return. Thus \a task should not
55    be equal to \c NULL and \a *task should be equal to \c NULL. If one of
56    those two condition does not hold, there will be a warning message.
57  * \param channel the channel on which the agent should be
58    listening. This value has to be >=0 and < than the maximal
59    number of channels fixed with MSG_set_channel_number().
60  * \param timeout the maximum time to wait for a task before giving
61     up. In such a case, \a task will not be modified and will still be
62     equal to \c NULL when returning.
63  * \return #MSG_FATAL if \a task is equal to \c NULL, #MSG_WARNING
64    if \a *task is not equal to \c NULL, and #MSG_OK otherwise.
65  */
66
67 MSG_error_t MSG_task_get_with_time_out(m_task_t * task,
68                                        m_channel_t channel,
69                                        double max_duration)
70 {
71   m_process_t process = MSG_process_self();
72   m_task_t t = NULL;
73   m_host_t h = NULL;
74   simdata_task_t t_simdata = NULL;
75   simdata_host_t h_simdata = NULL;
76   int warning = 0;
77   int first_time = 1;
78   e_surf_action_state_t state = SURF_ACTION_NOT_IN_THE_SYSTEM;
79   
80   CHECK_HOST();
81   /* Sanity check */
82   xbt_assert0(task,"Null pointer for the task\n");
83
84   if (*task) 
85     CRITICAL0("MSG_task_get() was asked to write in a non empty task struct.");
86
87   /* Get the task */
88   h = MSG_host_self();
89   h_simdata = h->simdata;
90
91   DEBUG2("Waiting for a task on channel %d (%s)", channel,h->name);
92
93   while ((t = xbt_fifo_shift(h_simdata->mbox[channel])) == NULL) {
94     if(max_duration>0) {
95       if(!first_time) {
96         MSG_RETURN(MSG_OK);
97       }
98     }
99     xbt_assert2(!(h_simdata->sleeping[channel]),
100                 "A process (%s(%d)) is already blocked on this channel",
101                 h_simdata->sleeping[channel]->name,
102                 h_simdata->sleeping[channel]->simdata->PID);
103     h_simdata->sleeping[channel] = process; /* I'm waiting. Wake me up when you're ready */
104     if(max_duration>0) {
105       __MSG_process_block(max_duration);
106     } else {
107       __MSG_process_block(-1);
108     }
109     if(surf_workstation_resource->extension_public->get_state(h_simdata->host) 
110        == SURF_CPU_OFF)
111       MSG_RETURN(MSG_HOST_FAILURE);
112     h_simdata->sleeping[channel] = NULL;
113     first_time = 0;
114     /* OK, we should both be ready now. Are you there ? */
115   }
116
117   t_simdata = t->simdata;
118   /*   *task = __MSG_task_copy(t); */
119   *task=t;
120
121   /* Transfer */
122   t_simdata->using++;
123
124   t_simdata->comm = surf_workstation_resource->extension_public->
125     communicate(MSG_process_get_host(t_simdata->sender)->simdata->host,
126                 h->simdata->host, t_simdata->message_size,t_simdata->rate);
127   
128   surf_workstation_resource->common_public->action_set_data(t_simdata->comm,t);
129
130   if(__MSG_process_isBlocked(t_simdata->sender)) 
131     __MSG_process_unblock(t_simdata->sender);
132
133   PAJE_PROCESS_PUSH_STATE(process,"C");  
134
135   do {
136     __MSG_task_wait_event(process, t);
137     state=surf_workstation_resource->common_public->action_get_state(t_simdata->comm);
138   } while (state==SURF_ACTION_RUNNING);
139
140   if(t->simdata->using>1) {
141     xbt_fifo_unshift(msg_global->process_to_run,process);
142     xbt_context_yield();
143   }
144
145   PAJE_PROCESS_POP_STATE(process);
146   PAJE_COMM_STOP(process,t,channel);
147
148   if(state == SURF_ACTION_DONE) {
149     if(surf_workstation_resource->common_public->action_free(t_simdata->comm)) 
150       t_simdata->comm = NULL;
151     MSG_RETURN(MSG_OK);
152   } else if(surf_workstation_resource->extension_public->get_state(h_simdata->host) 
153           == SURF_CPU_OFF) {
154     if(surf_workstation_resource->common_public->action_free(t_simdata->comm)) 
155       t_simdata->comm = NULL;
156     MSG_RETURN(MSG_HOST_FAILURE);
157   } else {
158     if(surf_workstation_resource->common_public->action_free(t_simdata->comm)) 
159       t_simdata->comm = NULL;
160     MSG_RETURN(MSG_TRANSFER_FAILURE);
161   }
162 }
163
164 /** \ingroup msg_gos_functions
165  * \brief Test whether there is a pending communication on a channel.
166  *
167  * It takes one parameter.
168  * \param channel the channel on which the agent should be
169    listening. This value has to be >=0 and < than the maximal
170    number of channels fixed with MSG_set_channel_number().
171  * \return 1 if there is a pending communication and 0 otherwise
172  */
173 int MSG_task_Iprobe(m_channel_t channel)
174 {
175   m_host_t h = NULL;
176   simdata_host_t h_simdata = NULL;
177
178   DEBUG2("Probing on channel %d (%s)", channel,h->name);
179   CHECK_HOST();
180   h = MSG_host_self();
181   h_simdata = h->simdata;
182   return(xbt_fifo_getFirstItem(h_simdata->mbox[channel])!=NULL);
183 }
184
185 /** \ingroup msg_gos_functions
186  * \brief Test whether there is a pending communication on a channel, and who sent it.
187  *
188  * It takes one parameter.
189  * \param channel the channel on which the agent should be
190    listening. This value has to be >=0 and < than the maximal
191    number of channels fixed with MSG_set_channel_number().
192  * \return -1 if there is no pending communication and the PID of the process who sent it otherwise
193  */
194 int MSG_task_probe_from(m_channel_t channel)
195 {
196   m_host_t h = NULL;
197   simdata_host_t h_simdata = NULL;
198   xbt_fifo_item_t item;
199   m_task_t t;
200
201   CHECK_HOST();
202   h = MSG_host_self();
203   h_simdata = h->simdata;
204
205   DEBUG2("Probing on channel %d (%s)", channel,h->name);
206    
207   item = xbt_fifo_getFirstItem(h->simdata->mbox[channel]);
208   if (!item || !(t = xbt_fifo_get_item_content(item)))
209     return -1;
210    
211   return MSG_process_get_PID(t->simdata->sender);
212 }
213
214 /** \ingroup msg_gos_functions
215  * \brief Put a task on a channel of an host and waits for the end of the
216  * transmission.
217  *
218  * This function is used for describing the behavior of an agent. It
219  * takes three parameter.
220  * \param task a #m_task_t to send on another location. This task
221    will not be usable anymore when the function will return. There is
222    no automatic task duplication and you have to save your parameters
223    before calling this function. Tasks are unique and once it has been
224    sent to another location, you should not access it anymore. You do
225    not need to call MSG_task_destroy() but to avoid using, as an
226    effect of inattention, this task anymore, you definitely should
227    renitialize it with #MSG_TASK_UNINITIALIZED. Note that this task
228    can be transfered iff it has been correctly created with
229    MSG_task_create().
230  * \param dest the destination of the message
231  * \param channel the channel on which the agent should put this
232    task. This value has to be >=0 and < than the maximal number of
233    channels fixed with MSG_set_channel_number().
234  * \return #MSG_FATAL if \a task is not properly initialized and
235  * #MSG_OK otherwise.
236  */
237 MSG_error_t MSG_task_put(m_task_t task,
238                          m_host_t dest, m_channel_t channel)
239 {
240   m_process_t process = MSG_process_self();
241   simdata_task_t task_simdata = NULL;
242   e_surf_action_state_t state = SURF_ACTION_NOT_IN_THE_SYSTEM;
243   m_host_t local_host = NULL;
244   m_host_t remote_host = NULL;
245
246   CHECK_HOST();
247
248   task_simdata = task->simdata;
249   task_simdata->sender = process;
250   xbt_assert0(task_simdata->using==1,"Gargl!");
251   task_simdata->comm = NULL;
252   
253   local_host = ((simdata_process_t) process->simdata)->host;
254   remote_host = dest;
255
256   DEBUG4("Trying to send a task (%lg Mb) from %s to %s on channel %d", 
257          task->simdata->message_size,local_host->name, remote_host->name, channel);
258
259   xbt_fifo_push(((simdata_host_t) remote_host->simdata)->
260                 mbox[channel], task);
261
262   PAJE_COMM_START(process,task,channel);
263     
264   if(remote_host->simdata->sleeping[channel]) 
265     __MSG_process_unblock(remote_host->simdata->sleeping[channel]);
266
267   process->simdata->put_host = dest;
268   process->simdata->put_channel = channel;
269   while(!(task_simdata->comm)) 
270     __MSG_process_block(-1);
271   surf_workstation_resource->common_public->action_use(task_simdata->comm);
272   process->simdata->put_host = NULL;
273   process->simdata->put_channel = -1;
274
275
276   PAJE_PROCESS_PUSH_STATE(process,"C");  
277
278   state=surf_workstation_resource->common_public->action_get_state(task_simdata->comm);
279   while (state==SURF_ACTION_RUNNING) {
280     __MSG_task_wait_event(process, task);
281     state=surf_workstation_resource->common_public->action_get_state(task_simdata->comm);
282   }
283     
284   MSG_task_destroy(task);
285
286   PAJE_PROCESS_POP_STATE(process);  
287
288   if(state == SURF_ACTION_DONE) {
289     if(surf_workstation_resource->common_public->action_free(task_simdata->comm)) 
290       task_simdata->comm = NULL;
291     MSG_RETURN(MSG_OK);
292   } else if(surf_workstation_resource->extension_public->get_state(local_host->simdata->host) 
293             == SURF_CPU_OFF) {
294     if(surf_workstation_resource->common_public->action_free(task_simdata->comm)) 
295       task_simdata->comm = NULL;
296     MSG_RETURN(MSG_HOST_FAILURE);
297   } else { 
298     if(surf_workstation_resource->common_public->action_free(task_simdata->comm)) 
299       task_simdata->comm = NULL;
300     MSG_RETURN(MSG_TRANSFER_FAILURE);
301   }
302 }
303
304 /** \ingroup msg_gos_functions
305  * \brief Does exactly the same as MSG_task_put but with a bounded transmition 
306  * rate.
307  *
308  * \sa MSG_task_put
309  */
310 MSG_error_t MSG_task_put_bounded(m_task_t task,
311                                  m_host_t dest, m_channel_t channel,
312                                  double max_rate)
313 {
314   task->simdata->rate=max_rate;
315   return(MSG_task_put(task, dest, channel));
316   task->simdata->rate=-1.0;
317 }
318
319 /** \ingroup msg_gos_functions
320  * \brief Executes a task and waits for its termination.
321  *
322  * This function is used for describing the behavior of an agent. It
323  * takes only one parameter.
324  * \param task a #m_task_t to execute on the location on which the
325    agent is running.
326  * \return #MSG_FATAL if \a task is not properly initialized and
327  * #MSG_OK otherwise.
328  */
329 MSG_error_t MSG_task_execute(m_task_t task)
330 {
331   m_process_t process = MSG_process_self();
332   MSG_error_t res;
333
334   DEBUG1("Computing on %s", process->simdata->host->name);
335
336   __MSG_task_execute(process, task);
337
338   PAJE_PROCESS_PUSH_STATE(process,"E");  
339   res = __MSG_wait_for_computation(process,task);
340   PAJE_PROCESS_POP_STATE(process);
341   return res;
342 }
343
344 void __MSG_task_execute(m_process_t process, m_task_t task)
345 {
346   simdata_task_t simdata = NULL;
347
348   CHECK_HOST();
349
350   simdata = task->simdata;
351
352   simdata->compute = surf_workstation_resource->extension_public->
353     execute(MSG_process_get_host(process)->simdata->host,
354             simdata->computation_amount);
355   surf_workstation_resource->common_public->action_set_data(simdata->compute,task);
356 }
357
358 MSG_error_t __MSG_wait_for_computation(m_process_t process, m_task_t task)
359 {
360   e_surf_action_state_t state = SURF_ACTION_NOT_IN_THE_SYSTEM;
361   simdata_task_t simdata = task->simdata;
362
363   simdata->using++;
364   do {
365     __MSG_task_wait_event(process, task);
366     state=surf_workstation_resource->common_public->action_get_state(simdata->compute);
367   } while (state==SURF_ACTION_RUNNING);
368   simdata->using--;
369     
370
371   if(state == SURF_ACTION_DONE) {
372     if(surf_workstation_resource->common_public->action_free(simdata->compute)) 
373       simdata->compute = NULL;
374     MSG_RETURN(MSG_OK);
375   } else if(surf_workstation_resource->extension_public->
376             get_state(MSG_process_get_host(process)->simdata->host) 
377             == SURF_CPU_OFF) {
378     if(surf_workstation_resource->common_public->action_free(simdata->compute)) 
379       simdata->compute = NULL;
380     MSG_RETURN(MSG_HOST_FAILURE);
381   } else {
382     if(surf_workstation_resource->common_public->action_free(simdata->compute)) 
383       simdata->compute = NULL;
384     MSG_RETURN(MSG_TRANSFER_FAILURE);
385   }
386 }
387
388 /** \ingroup msg_gos_functions
389  * \brief Sleep for the specified number of seconds
390  *
391  * Makes the current process sleep until \a time seconds have elapsed.
392  *
393  * \param nb_sec a number of second
394  */
395 MSG_error_t MSG_process_sleep(double nb_sec)
396 {
397   e_surf_action_state_t state = SURF_ACTION_NOT_IN_THE_SYSTEM;
398   m_process_t process = MSG_process_self();
399   m_task_t dummy = NULL;
400   simdata_task_t simdata = NULL;
401
402   CHECK_HOST();
403   dummy = MSG_task_create("MSG_sleep", nb_sec, 0.0, NULL);
404   simdata = dummy->simdata;
405
406   simdata->compute = surf_workstation_resource->extension_public->
407     sleep(MSG_process_get_host(process)->simdata->host,
408             simdata->computation_amount);
409   surf_workstation_resource->common_public->action_set_data(simdata->compute,dummy);
410
411   
412   simdata->using++;
413   do {
414     __MSG_task_wait_event(process, dummy);
415     state=surf_workstation_resource->common_public->action_get_state(simdata->compute);
416   } while (state==SURF_ACTION_RUNNING);
417   simdata->using--;
418     
419   if(state == SURF_ACTION_DONE) {
420     if(surf_workstation_resource->extension_public->
421        get_state(MSG_process_get_host(process)->simdata->host) 
422        == SURF_CPU_OFF) {
423       if(surf_workstation_resource->common_public->action_free(simdata->compute)) 
424         simdata->compute = NULL;
425       MSG_RETURN(MSG_HOST_FAILURE);
426     }
427     if(__MSG_process_isBlocked(process)) {
428       __MSG_process_unblock(MSG_process_self());
429     }
430     if(surf_workstation_resource->extension_public->
431        get_state(MSG_process_get_host(process)->simdata->host) 
432        == SURF_CPU_OFF) {
433       if(surf_workstation_resource->common_public->action_free(simdata->compute)) 
434         simdata->compute = NULL;
435       MSG_RETURN(MSG_HOST_FAILURE);
436     }
437     if(surf_workstation_resource->common_public->action_free(simdata->compute)) 
438       simdata->compute = NULL;
439     MSG_task_destroy(dummy);
440     MSG_RETURN(MSG_OK);
441   } else MSG_RETURN(MSG_HOST_FAILURE);
442 }
443
444 /** \ingroup msg_gos_functions
445  * \brief Return the number of MSG tasks currently running on a
446  * the host of the current running process.
447  */
448 int MSG_get_msgload(void) 
449 {
450   m_process_t process;
451    
452   CHECK_HOST();
453   
454   process = MSG_process_self();
455   return xbt_fifo_size(process->simdata->host->simdata->process_list);
456 }
457
458 /** \ingroup msg_gos_functions
459  *
460  * \brief Return the the last value returned by a MSG function (except
461  * MSG_get_errno...).
462  */
463 MSG_error_t MSG_get_errno(void)
464 {
465   return PROCESS_GET_ERRNO();
466 }