Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
Now, any surf-based program can take --surf-path arguments to help locating platform...
[simgrid.git] / src / msg / gos.c
1 /*      $Id$     */
2
3 /* Copyright (c) 2002,2003,2004 Arnaud Legrand. All rights reserved.        */
4
5 /* This program is free software; you can redistribute it and/or modify it
6  * under the terms of the license (GNU LGPL) which comes with this package. */
7
8 #include"private.h"
9 #include"xbt/sysdep.h"
10 #include "xbt/error.h"
11 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(gos, msg,
12                                 "Logging specific to MSG (gos)");
13
14 /** \ingroup msg_gos_functions
15  * \brief This function is now deprecated and useless. Please stop using it.
16  */
17
18 MSG_error_t MSG_process_start(m_process_t process)
19 {
20   xbt_assert0(0,"This function is now deprecated and useless. Please stop using it.");
21   
22   return MSG_OK;
23 }
24
25 /** \ingroup msg_gos_functions
26  * \brief Listen on a channel and wait for receiving a task.
27  *
28  * It takes two parameter.
29  * \param task a memory location for storing a #m_task_t. It will
30    hold a task when this function will return. Thus \a task should not
31    be equal to \c NULL and \a *task should be equal to \c NULL. If one of
32    those two condition does not hold, there will be a warning message.
33  * \param channel the channel on which the agent should be
34    listening. This value has to be >=0 and < than the maximal
35    number of channels fixed with MSG_set_channel_number().
36  * \return #MSG_FATAL if \a task is equal to \c NULL, #MSG_WARNING
37  * if \a *task is not equal to \c NULL, and #MSG_OK otherwise.
38  */
39 MSG_error_t MSG_task_get(m_task_t * task,
40                          m_channel_t channel)
41 {
42   m_process_t process = MSG_process_self();
43   m_task_t t = NULL;
44   m_host_t h = NULL;
45   simdata_task_t t_simdata = NULL;
46   simdata_host_t h_simdata = NULL;
47   int warning = 0;
48   e_surf_action_state_t state = SURF_ACTION_NOT_IN_THE_SYSTEM;
49   
50   CHECK_HOST();
51   /* Sanity check */
52   xbt_assert0(task,"Null pointer for the task\n");
53
54   if (*task) 
55     CRITICAL0("MSG_task_get() was asked to write in a non empty task struct.");
56
57   /* Get the task */
58   h = MSG_host_self();
59   h_simdata = h->simdata;
60   while ((t = xbt_fifo_pop(h_simdata->mbox[channel])) == NULL) {
61     xbt_assert0(!(h_simdata->sleeping[channel]),
62                 "A process is already blocked on this channel");
63     h_simdata->sleeping[channel] = process; /* I'm waiting. Wake me up when you're ready */
64     MSG_process_suspend(process);
65     if(surf_workstation_resource->extension_public->get_state(h_simdata->host) 
66        == SURF_CPU_OFF)
67       MSG_RETURN(MSG_HOST_FAILURE);
68     h_simdata->sleeping[channel] = NULL;
69     /* OK, we should both be ready now. Are you there ? */
70   }
71
72   t_simdata = t->simdata;
73   /*   *task = __MSG_task_copy(t); */
74   *task=t;
75
76   /* Transfer */
77   t_simdata->using++;
78   t_simdata->comm = surf_workstation_resource->extension_public->
79     communicate(MSG_process_get_host(t_simdata->sender)->simdata->host,
80                 h->simdata->host, t_simdata->message_size);
81   surf_workstation_resource->common_public->action_set_data(t_simdata->comm,t);
82
83   if(MSG_process_isSuspended(t_simdata->sender)) 
84     MSG_process_resume(t_simdata->sender);
85
86   do {
87     __MSG_task_wait_event(process, t);
88     state=surf_workstation_resource->common_public->action_get_state(t_simdata->comm);
89   } while (state==SURF_ACTION_RUNNING);
90
91   if(state == SURF_ACTION_DONE) MSG_RETURN(MSG_OK);
92   else if(surf_workstation_resource->extension_public->get_state(h_simdata->host) 
93           == SURF_CPU_OFF)
94     MSG_RETURN(MSG_HOST_FAILURE);
95   else MSG_RETURN(MSG_TRANSFER_FAILURE);
96 }
97
98 /** \ingroup msg_gos_functions
99  * \brief Test whether there is a pending communication on a channel.
100  *
101  * It takes one parameter.
102  * \param channel the channel on which the agent should be
103    listening. This value has to be >=0 and < than the maximal
104    number of channels fixed with MSG_set_channel_number().
105  * \return 1 if there is a pending communication and 0 otherwise
106  */
107 int MSG_task_Iprobe(m_channel_t channel)
108 {
109   m_host_t h = NULL;
110   simdata_host_t h_simdata = NULL;
111
112   CHECK_HOST();
113   h = MSG_host_self();
114   h_simdata = h->simdata;
115   return(xbt_fifo_getFirstItem(h_simdata->mbox[channel])!=NULL);
116 }
117
118 /** \ingroup msg_gos_functions
119  * \brief Test whether there is a pending communication on a channel, and who sent it.
120  *
121  * It takes one parameter.
122  * \param channel the channel on which the agent should be
123    listening. This value has to be >=0 and < than the maximal
124    number of channels fixed with MSG_set_channel_number().
125  * \return -1 if there is no pending communication and the PID of the process who sent it otherwise
126  */
127 int MSG_task_probe_from(m_channel_t channel)
128 {
129   m_host_t h = NULL;
130   simdata_host_t h_simdata = NULL;
131   xbt_fifo_item_t item;
132   m_task_t t;
133
134   CHECK_HOST();
135   h = MSG_host_self();
136   h_simdata = h->simdata;
137    
138   item = xbt_fifo_getFirstItem(((simdata_host_t)h->simdata)->mbox[channel]);
139   if (!item || !(t = xbt_fifo_get_item_content(item)) || (simdata_task_t)t->simdata)
140     return -1;
141    
142   return MSG_process_get_PID(((simdata_task_t)t->simdata)->sender);
143 }
144
145 /** \ingroup msg_gos_functions
146  * \brief Put a task on a channel of an host and waits for the end of the
147  * transmission.
148  *
149  * This function is used for describing the behavior of an agent. It
150  * takes three parameter.
151  * \param task a #m_task_t to send on another location. This task
152    will not be usable anymore when the function will return. There is
153    no automatic task duplication and you have to save your parameters
154    before calling this function. Tasks are unique and once it has been
155    sent to another location, you should not access it anymore. You do
156    not need to call MSG_task_destroy() but to avoid using, as an
157    effect of inattention, this task anymore, you definitely should
158    renitialize it with #MSG_TASK_UNINITIALIZED. Note that this task
159    can be transfered iff it has been correctly created with
160    MSG_task_create().
161  * \param dest the destination of the message
162  * \param channel the channel on which the agent should put this
163    task. This value has to be >=0 and < than the maximal number of
164    channels fixed with MSG_set_channel_number().
165  * \return #MSG_FATAL if \a task is not properly initialized and
166  * #MSG_OK otherwise.
167  */
168 MSG_error_t MSG_task_put(m_task_t task,
169                          m_host_t dest, m_channel_t channel)
170 {
171   m_process_t process = MSG_process_self();
172   simdata_task_t task_simdata = NULL;
173   e_surf_action_state_t state = SURF_ACTION_NOT_IN_THE_SYSTEM;
174   m_host_t local_host = NULL;
175   m_host_t remote_host = NULL;
176
177   CHECK_HOST();
178
179   task_simdata = task->simdata;
180   task_simdata->sender = process;
181   xbt_assert0(task_simdata->using==1,"Gargl!");
182   task_simdata->comm = NULL;
183   
184   local_host = ((simdata_process_t) process->simdata)->host;
185   remote_host = dest;
186
187   xbt_fifo_push(((simdata_host_t) remote_host->simdata)->
188                 mbox[channel], task);
189     
190   if(remote_host->simdata->sleeping[channel]) 
191     MSG_process_resume(remote_host->simdata->sleeping[channel]);
192   else {
193     process->simdata->put_host = dest;
194     process->simdata->put_channel = channel;
195     while(!(task_simdata->comm)) 
196       MSG_process_suspend(process);
197     process->simdata->put_host = NULL;
198     process->simdata->put_channel = -1;
199   }
200
201   do {
202     __MSG_task_wait_event(process, task);
203     state=surf_workstation_resource->common_public->action_get_state(task_simdata->comm);
204   } while (state==SURF_ACTION_RUNNING);
205     
206   MSG_task_destroy(task);
207
208   if(state == SURF_ACTION_DONE) MSG_RETURN(MSG_OK);
209   else if(surf_workstation_resource->extension_public->get_state(local_host->simdata->host) 
210           == SURF_CPU_OFF)
211     MSG_RETURN(MSG_HOST_FAILURE);
212   else MSG_RETURN(MSG_TRANSFER_FAILURE);
213 }
214
215 /** \ingroup msg_gos_functions
216  * \brief Executes a task and waits for its termination.
217  *
218  * This function is used for describing the behavior of an agent. It
219  * takes only one parameter.
220  * \param task a #m_task_t to execute on the location on which the
221    agent is running.
222  * \return #MSG_FATAL if \a task is not properly initialized and
223  * #MSG_OK otherwise.
224  */
225 MSG_error_t MSG_task_execute(m_task_t task)
226 {
227   m_process_t process = MSG_process_self();
228
229   __MSG_task_execute(process, task);
230   return __MSG_wait_for_computation(process,task);
231 }
232
233 void __MSG_task_execute(m_process_t process, m_task_t task)
234 {
235   simdata_task_t simdata = NULL;
236
237   CHECK_HOST();
238
239   simdata = task->simdata;
240
241   simdata->compute = surf_workstation_resource->extension_public->
242     execute(MSG_process_get_host(process)->simdata->host,
243             simdata->computation_amount);
244   surf_workstation_resource->common_public->action_set_data(simdata->compute,task);
245 }
246
247 MSG_error_t __MSG_wait_for_computation(m_process_t process, m_task_t task)
248 {
249   e_surf_action_state_t state = SURF_ACTION_NOT_IN_THE_SYSTEM;
250   simdata_task_t simdata = task->simdata;
251
252   simdata->using++;
253   do {
254     __MSG_task_wait_event(process, task);
255     state=surf_workstation_resource->common_public->action_get_state(simdata->compute);
256   } while (state==SURF_ACTION_RUNNING);
257   simdata->using--;
258     
259
260   if(state == SURF_ACTION_DONE) MSG_RETURN(MSG_OK);
261   else if(surf_workstation_resource->extension_public->
262           get_state(MSG_process_get_host(process)->simdata->host) 
263           == SURF_CPU_OFF)
264     MSG_RETURN(MSG_HOST_FAILURE);
265   else MSG_RETURN(MSG_TRANSFER_FAILURE);
266 }
267
268 /** \ingroup msg_gos_functions
269  * \brief Sleep for the specified number of seconds
270  *
271  * Makes the current process sleep until \a time seconds have elapsed.
272  *
273  * \param nb_sec a number of second
274  */
275 MSG_error_t MSG_process_sleep(long double nb_sec)
276 {
277   e_surf_action_state_t state = SURF_ACTION_NOT_IN_THE_SYSTEM;
278   m_process_t process = MSG_process_self();
279   m_task_t dummy = NULL;
280   simdata_task_t simdata = NULL;
281
282   CHECK_HOST();
283   dummy = MSG_task_create("MSG_sleep", nb_sec, 0.0, NULL);
284   simdata = dummy->simdata;
285
286   simdata->compute = surf_workstation_resource->extension_public->
287     sleep(MSG_process_get_host(process)->simdata->host,
288             simdata->computation_amount);
289   surf_workstation_resource->common_public->action_set_data(simdata->compute,dummy);
290
291   
292   simdata->using++;
293   do {
294     __MSG_task_wait_event(process, dummy);
295     state=surf_workstation_resource->common_public->action_get_state(simdata->compute);
296   } while (state==SURF_ACTION_RUNNING);
297   simdata->using--;
298     
299   if(state == SURF_ACTION_DONE) {
300     if(surf_workstation_resource->extension_public->
301        get_state(MSG_process_get_host(process)->simdata->host) 
302        == SURF_CPU_OFF)
303       MSG_RETURN(MSG_HOST_FAILURE);
304
305     if(MSG_process_isSuspended(process)) {
306       MSG_process_suspend(MSG_process_self());
307     }
308     if(surf_workstation_resource->extension_public->
309        get_state(MSG_process_get_host(process)->simdata->host) 
310        == SURF_CPU_OFF)
311       MSG_RETURN(MSG_HOST_FAILURE);
312     MSG_task_destroy(dummy);
313     MSG_RETURN(MSG_OK);
314   } else MSG_RETURN(MSG_HOST_FAILURE);
315 }
316
317 /** \ingroup msg_gos_functions
318  * \brief Return the number of MSG tasks currently running on a
319  * the host of the current running process.
320  */
321 int MSG_get_msgload(void) 
322 {
323   CHECK_HOST();
324   xbt_assert0(0,"Not implemented yet!");
325   
326   return 1;
327 }
328
329 /** \ingroup msg_gos_functions
330  *
331  * \brief Return the the last value returned by a MSG function (except
332  * MSG_get_errno...).
333  */
334 MSG_error_t MSG_get_errno(void)
335 {
336   return PROCESS_GET_ERRNO();
337 }
338