Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
Now, a process can be killed and all process are killed at the end of the simulation.
[simgrid.git] / src / msg / gos.c
1 /*      $Id$     */
2
3 /* Copyright (c) 2002,2003,2004 Arnaud Legrand. All rights reserved.        */
4
5 /* This program is free software; you can redistribute it and/or modify it
6  * under the terms of the license (GNU LGPL) which comes with this package. */
7
8 #include"private.h"
9 #include"xbt/sysdep.h"
10 #include "xbt/error.h"
11 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(gos, msg,
12                                 "Logging specific to MSG (gos)");
13
14 /** \ingroup msg_gos_functions
15  * \brief This function is now deprecated and useless. Please stop using it.
16  */
17
18 MSG_error_t MSG_process_start(m_process_t process)
19 {
20   xbt_assert0(0,"This function is now deprecated and useless. Please stop using it.");
21   
22   return MSG_OK;
23 }
24
25 /** \ingroup msg_gos_functions
26  * \brief Listen on a channel and wait for receiving a task.
27  *
28  * It takes two parameter.
29  * \param task a memory location for storing a #m_task_t. It will
30    hold a task when this function will return. Thus \a task should not
31    be equal to \c NULL and \a *task should be equal to \c NULL. If one of
32    those two condition does not hold, there will be a warning message.
33  * \param channel the channel on which the agent should be
34    listening. This value has to be >=0 and < than the maximal
35    number of channels fixed with MSG_set_channel_number().
36  * \return #MSG_FATAL if \a task is equal to \c NULL, #MSG_WARNING
37  * if \a *task is not equal to \c NULL, and #MSG_OK otherwise.
38  */
39 MSG_error_t MSG_task_get(m_task_t * task,
40                          m_channel_t channel)
41 {
42   m_process_t process = MSG_process_self();
43   m_task_t t = NULL;
44   m_host_t h = NULL;
45   simdata_task_t t_simdata = NULL;
46   simdata_host_t h_simdata = NULL;
47   int warning = 0;
48   e_surf_action_state_t state = SURF_ACTION_NOT_IN_THE_SYSTEM;
49   
50   CHECK_HOST();
51   /* Sanity check */
52   xbt_assert0(task,"Null pointer for the task\n");
53
54   if (*task) 
55     CRITICAL0("MSG_task_get() was asked to write in a non empty task struct.");
56
57   /* Get the task */
58   h = MSG_host_self();
59   h_simdata = h->simdata;
60   while ((t = xbt_fifo_pop(h_simdata->mbox[channel])) == NULL) {
61     xbt_assert0(!(h_simdata->sleeping[channel]),
62                 "A process is already blocked on this channel");
63     h_simdata->sleeping[channel] = process; /* I'm waiting. Wake me up when you're ready */
64     __MSG_process_block();
65     if(surf_workstation_resource->extension_public->get_state(h_simdata->host) 
66        == SURF_CPU_OFF)
67       MSG_RETURN(MSG_HOST_FAILURE);
68     h_simdata->sleeping[channel] = NULL;
69     /* OK, we should both be ready now. Are you there ? */
70   }
71
72   t_simdata = t->simdata;
73   /*   *task = __MSG_task_copy(t); */
74   *task=t;
75
76   /* Transfer */
77   t_simdata->using++;
78
79   t_simdata->comm = surf_workstation_resource->extension_public->
80     communicate(MSG_process_get_host(t_simdata->sender)->simdata->host,
81                 h->simdata->host, t_simdata->message_size,t_simdata->rate);
82   
83   surf_workstation_resource->common_public->action_set_data(t_simdata->comm,t);
84
85   if(__MSG_process_isBlocked(t_simdata->sender)) 
86     __MSG_process_unblock(t_simdata->sender);
87
88   do {
89     __MSG_task_wait_event(process, t);
90     state=surf_workstation_resource->common_public->action_get_state(t_simdata->comm);
91   } while (state==SURF_ACTION_RUNNING);
92
93   if(t->simdata->using>1) {
94     xbt_fifo_unshift(msg_global->process_to_run,process);
95     xbt_context_yield();
96   }
97
98   if(state == SURF_ACTION_DONE) MSG_RETURN(MSG_OK);
99   else if(surf_workstation_resource->extension_public->get_state(h_simdata->host) 
100           == SURF_CPU_OFF)
101     MSG_RETURN(MSG_HOST_FAILURE);
102   else MSG_RETURN(MSG_TRANSFER_FAILURE);
103 }
104
105 /** \ingroup msg_gos_functions
106  * \brief Test whether there is a pending communication on a channel.
107  *
108  * It takes one parameter.
109  * \param channel the channel on which the agent should be
110    listening. This value has to be >=0 and < than the maximal
111    number of channels fixed with MSG_set_channel_number().
112  * \return 1 if there is a pending communication and 0 otherwise
113  */
114 int MSG_task_Iprobe(m_channel_t channel)
115 {
116   m_host_t h = NULL;
117   simdata_host_t h_simdata = NULL;
118
119   CHECK_HOST();
120   h = MSG_host_self();
121   h_simdata = h->simdata;
122   return(xbt_fifo_getFirstItem(h_simdata->mbox[channel])!=NULL);
123 }
124
125 /** \ingroup msg_gos_functions
126  * \brief Test whether there is a pending communication on a channel, and who sent it.
127  *
128  * It takes one parameter.
129  * \param channel the channel on which the agent should be
130    listening. This value has to be >=0 and < than the maximal
131    number of channels fixed with MSG_set_channel_number().
132  * \return -1 if there is no pending communication and the PID of the process who sent it otherwise
133  */
134 int MSG_task_probe_from(m_channel_t channel)
135 {
136   m_host_t h = NULL;
137   simdata_host_t h_simdata = NULL;
138   xbt_fifo_item_t item;
139   m_task_t t;
140
141   CHECK_HOST();
142   h = MSG_host_self();
143   h_simdata = h->simdata;
144    
145   item = xbt_fifo_getFirstItem(((simdata_host_t)h->simdata)->mbox[channel]);
146   if (!item || !(t = xbt_fifo_get_item_content(item)) || (simdata_task_t)t->simdata)
147     return -1;
148    
149   return MSG_process_get_PID(((simdata_task_t)t->simdata)->sender);
150 }
151
152 /** \ingroup msg_gos_functions
153  * \brief Put a task on a channel of an host and waits for the end of the
154  * transmission.
155  *
156  * This function is used for describing the behavior of an agent. It
157  * takes three parameter.
158  * \param task a #m_task_t to send on another location. This task
159    will not be usable anymore when the function will return. There is
160    no automatic task duplication and you have to save your parameters
161    before calling this function. Tasks are unique and once it has been
162    sent to another location, you should not access it anymore. You do
163    not need to call MSG_task_destroy() but to avoid using, as an
164    effect of inattention, this task anymore, you definitely should
165    renitialize it with #MSG_TASK_UNINITIALIZED. Note that this task
166    can be transfered iff it has been correctly created with
167    MSG_task_create().
168  * \param dest the destination of the message
169  * \param channel the channel on which the agent should put this
170    task. This value has to be >=0 and < than the maximal number of
171    channels fixed with MSG_set_channel_number().
172  * \return #MSG_FATAL if \a task is not properly initialized and
173  * #MSG_OK otherwise.
174  */
175 MSG_error_t MSG_task_put(m_task_t task,
176                          m_host_t dest, m_channel_t channel)
177 {
178   m_process_t process = MSG_process_self();
179   simdata_task_t task_simdata = NULL;
180   e_surf_action_state_t state = SURF_ACTION_NOT_IN_THE_SYSTEM;
181   m_host_t local_host = NULL;
182   m_host_t remote_host = NULL;
183
184   CHECK_HOST();
185
186   task_simdata = task->simdata;
187   task_simdata->sender = process;
188   xbt_assert0(task_simdata->using==1,"Gargl!");
189   task_simdata->comm = NULL;
190   
191   local_host = ((simdata_process_t) process->simdata)->host;
192   remote_host = dest;
193
194   xbt_fifo_push(((simdata_host_t) remote_host->simdata)->
195                 mbox[channel], task);
196     
197   if(remote_host->simdata->sleeping[channel]) 
198     __MSG_process_unblock(remote_host->simdata->sleeping[channel]);
199 /*   else { */
200   process->simdata->put_host = dest;
201   process->simdata->put_channel = channel;
202   while(!(task_simdata->comm)) 
203     __MSG_process_block();
204   process->simdata->put_host = NULL;
205   process->simdata->put_channel = -1;
206 /*   } */
207
208   state=surf_workstation_resource->common_public->action_get_state(task_simdata->comm);
209   while (state==SURF_ACTION_RUNNING) {
210     __MSG_task_wait_event(process, task);
211     state=surf_workstation_resource->common_public->action_get_state(task_simdata->comm);
212   }
213     
214   MSG_task_destroy(task);
215
216   if(state == SURF_ACTION_DONE) MSG_RETURN(MSG_OK);
217   else if(surf_workstation_resource->extension_public->get_state(local_host->simdata->host) 
218           == SURF_CPU_OFF)
219     MSG_RETURN(MSG_HOST_FAILURE);
220   else MSG_RETURN(MSG_TRANSFER_FAILURE);
221 }
222
223 MSG_error_t MSG_task_put_bounded(m_task_t task,
224                                  m_host_t dest, m_channel_t channel,
225                                  long double max_rate)
226 {
227   task->simdata->rate=max_rate;
228   return(MSG_task_put(task, dest, channel));
229 }
230
231 /** \ingroup msg_gos_functions
232  * \brief Executes a task and waits for its termination.
233  *
234  * This function is used for describing the behavior of an agent. It
235  * takes only one parameter.
236  * \param task a #m_task_t to execute on the location on which the
237    agent is running.
238  * \return #MSG_FATAL if \a task is not properly initialized and
239  * #MSG_OK otherwise.
240  */
241 MSG_error_t MSG_task_execute(m_task_t task)
242 {
243   m_process_t process = MSG_process_self();
244
245   __MSG_task_execute(process, task);
246   return __MSG_wait_for_computation(process,task);
247 }
248
249 void __MSG_task_execute(m_process_t process, m_task_t task)
250 {
251   simdata_task_t simdata = NULL;
252
253   CHECK_HOST();
254
255   simdata = task->simdata;
256
257   simdata->compute = surf_workstation_resource->extension_public->
258     execute(MSG_process_get_host(process)->simdata->host,
259             simdata->computation_amount);
260   surf_workstation_resource->common_public->action_set_data(simdata->compute,task);
261 }
262
263 MSG_error_t __MSG_wait_for_computation(m_process_t process, m_task_t task)
264 {
265   e_surf_action_state_t state = SURF_ACTION_NOT_IN_THE_SYSTEM;
266   simdata_task_t simdata = task->simdata;
267
268   simdata->using++;
269   do {
270     __MSG_task_wait_event(process, task);
271     state=surf_workstation_resource->common_public->action_get_state(simdata->compute);
272   } while (state==SURF_ACTION_RUNNING);
273   simdata->using--;
274     
275
276   if(state == SURF_ACTION_DONE) MSG_RETURN(MSG_OK);
277   else if(surf_workstation_resource->extension_public->
278           get_state(MSG_process_get_host(process)->simdata->host) 
279           == SURF_CPU_OFF)
280     MSG_RETURN(MSG_HOST_FAILURE);
281   else MSG_RETURN(MSG_TRANSFER_FAILURE);
282 }
283
284 /** \ingroup msg_gos_functions
285  * \brief Sleep for the specified number of seconds
286  *
287  * Makes the current process sleep until \a time seconds have elapsed.
288  *
289  * \param nb_sec a number of second
290  */
291 MSG_error_t MSG_process_sleep(long double nb_sec)
292 {
293   e_surf_action_state_t state = SURF_ACTION_NOT_IN_THE_SYSTEM;
294   m_process_t process = MSG_process_self();
295   m_task_t dummy = NULL;
296   simdata_task_t simdata = NULL;
297
298   CHECK_HOST();
299   dummy = MSG_task_create("MSG_sleep", nb_sec, 0.0, NULL);
300   simdata = dummy->simdata;
301
302   simdata->compute = surf_workstation_resource->extension_public->
303     sleep(MSG_process_get_host(process)->simdata->host,
304             simdata->computation_amount);
305   surf_workstation_resource->common_public->action_set_data(simdata->compute,dummy);
306
307   
308   simdata->using++;
309   do {
310     __MSG_task_wait_event(process, dummy);
311     state=surf_workstation_resource->common_public->action_get_state(simdata->compute);
312   } while (state==SURF_ACTION_RUNNING);
313   simdata->using--;
314     
315   if(state == SURF_ACTION_DONE) {
316     if(surf_workstation_resource->extension_public->
317        get_state(MSG_process_get_host(process)->simdata->host) 
318        == SURF_CPU_OFF)
319       MSG_RETURN(MSG_HOST_FAILURE);
320
321     if(__MSG_process_isBlocked(process)) {
322       __MSG_process_unblock(MSG_process_self());
323     }
324     if(surf_workstation_resource->extension_public->
325        get_state(MSG_process_get_host(process)->simdata->host) 
326        == SURF_CPU_OFF)
327       MSG_RETURN(MSG_HOST_FAILURE);
328     MSG_task_destroy(dummy);
329     MSG_RETURN(MSG_OK);
330   } else MSG_RETURN(MSG_HOST_FAILURE);
331 }
332
333 /** \ingroup msg_gos_functions
334  * \brief Return the number of MSG tasks currently running on a
335  * the host of the current running process.
336  */
337 int MSG_get_msgload(void) 
338 {
339   CHECK_HOST();
340   xbt_assert0(0,"Not implemented yet!");
341   
342   return 1;
343 }
344
345 /** \ingroup msg_gos_functions
346  *
347  * \brief Return the the last value returned by a MSG function (except
348  * MSG_get_errno...).
349  */
350 MSG_error_t MSG_get_errno(void)
351 {
352   return PROCESS_GET_ERRNO();
353 }
354