Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
moving include/surf to src/include/surf (continued)
[simgrid.git] / src / msg / gos.c
1 /*      $Id$     */
2
3 /* Copyright (c) 2002,2003,2004 Arnaud Legrand. All rights reserved.        */
4
5 /* This program is free software; you can redistribute it and/or modify it
6  * under the terms of the license (GNU LGPL) which comes with this package. */
7
8 #include"private.h"
9 #include"xbt/sysdep.h"
10 #include "xbt/error.h"
11 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(gos, msg,
12                                 "Logging specific to MSG (gos)");
13
14 /** \ingroup msg_gos_functions
15  * \brief This function is now deprecated and useless. Please stop using it.
16  */
17 MSG_error_t MSG_process_start(m_process_t process)
18 {
19   xbt_assert0(0,"This function is now deprecated and useless. Please stop using it.");
20   
21   return MSG_OK;
22 }
23
24 /** \ingroup msg_gos_functions
25  * \brief Listen on a channel and wait for receiving a task.
26  *
27  * It takes two parameter.
28  * \param task a memory location for storing a #m_task_t. It will
29    hold a task when this function will return. Thus \a task should not
30    be equal to \c NULL and \a *task should be equal to \c NULL. If one of
31    those two condition does not hold, there will be a warning message.
32  * \param channel the channel on which the agent should be
33    listening. This value has to be >=0 and < than the maximal
34    number of channels fixed with MSG_set_channel_number().
35  * \return #MSG_FATAL if \a task is equal to \c NULL, #MSG_WARNING
36  * if \a *task is not equal to \c NULL, and #MSG_OK otherwise.
37  */
38 MSG_error_t MSG_task_get(m_task_t * task,
39                          m_channel_t channel)
40 {
41   m_process_t process = MSG_process_self();
42   m_task_t t = NULL;
43   m_host_t h = NULL;
44   simdata_task_t t_simdata = NULL;
45   simdata_host_t h_simdata = NULL;
46   int warning = 0;
47   e_surf_action_state_t state = SURF_ACTION_NOT_IN_THE_SYSTEM;
48   
49   CHECK_HOST();
50   /* Sanity check */
51   xbt_assert0(task,"Null pointer for the task\n");
52
53   if (*task) 
54     CRITICAL0("MSG_task_get() was asked to write in a non empty task struct.");
55
56   /* Get the task */
57   h = MSG_host_self();
58   h_simdata = h->simdata;
59   while ((t = xbt_fifo_pop(h_simdata->mbox[channel])) == NULL) {
60     xbt_assert0(!(h_simdata->sleeping[channel]),
61                 "A process is already blocked on this channel");
62     h_simdata->sleeping[channel] = process; /* I'm waiting. Wake me up when you're ready */
63     __MSG_process_block();
64     if(surf_workstation_resource->extension_public->get_state(h_simdata->host) 
65        == SURF_CPU_OFF)
66       MSG_RETURN(MSG_HOST_FAILURE);
67     h_simdata->sleeping[channel] = NULL;
68     /* OK, we should both be ready now. Are you there ? */
69   }
70
71   t_simdata = t->simdata;
72   /*   *task = __MSG_task_copy(t); */
73   *task=t;
74
75   /* Transfer */
76   t_simdata->using++;
77
78   t_simdata->comm = surf_workstation_resource->extension_public->
79     communicate(MSG_process_get_host(t_simdata->sender)->simdata->host,
80                 h->simdata->host, t_simdata->message_size,t_simdata->rate);
81   
82   surf_workstation_resource->common_public->action_set_data(t_simdata->comm,t);
83
84   if(__MSG_process_isBlocked(t_simdata->sender)) 
85     __MSG_process_unblock(t_simdata->sender);
86
87   do {
88     __MSG_task_wait_event(process, t);
89     state=surf_workstation_resource->common_public->action_get_state(t_simdata->comm);
90   } while (state==SURF_ACTION_RUNNING);
91
92   if(t->simdata->using>1) {
93     xbt_fifo_unshift(msg_global->process_to_run,process);
94     xbt_context_yield();
95   }
96
97   if(state == SURF_ACTION_DONE) MSG_RETURN(MSG_OK);
98   else if(surf_workstation_resource->extension_public->get_state(h_simdata->host) 
99           == SURF_CPU_OFF)
100     MSG_RETURN(MSG_HOST_FAILURE);
101   else MSG_RETURN(MSG_TRANSFER_FAILURE);
102 }
103
104 /** \ingroup msg_gos_functions
105  * \brief Test whether there is a pending communication on a channel.
106  *
107  * It takes one parameter.
108  * \param channel the channel on which the agent should be
109    listening. This value has to be >=0 and < than the maximal
110    number of channels fixed with MSG_set_channel_number().
111  * \return 1 if there is a pending communication and 0 otherwise
112  */
113 int MSG_task_Iprobe(m_channel_t channel)
114 {
115   m_host_t h = NULL;
116   simdata_host_t h_simdata = NULL;
117
118   CHECK_HOST();
119   h = MSG_host_self();
120   h_simdata = h->simdata;
121   return(xbt_fifo_getFirstItem(h_simdata->mbox[channel])!=NULL);
122 }
123
124 /** \ingroup msg_gos_functions
125  * \brief Test whether there is a pending communication on a channel, and who sent it.
126  *
127  * It takes one parameter.
128  * \param channel the channel on which the agent should be
129    listening. This value has to be >=0 and < than the maximal
130    number of channels fixed with MSG_set_channel_number().
131  * \return -1 if there is no pending communication and the PID of the process who sent it otherwise
132  */
133 int MSG_task_probe_from(m_channel_t channel)
134 {
135   m_host_t h = NULL;
136   simdata_host_t h_simdata = NULL;
137   xbt_fifo_item_t item;
138   m_task_t t;
139
140   CHECK_HOST();
141   h = MSG_host_self();
142   h_simdata = h->simdata;
143    
144   item = xbt_fifo_getFirstItem(((simdata_host_t)h->simdata)->mbox[channel]);
145   if (!item || !(t = xbt_fifo_get_item_content(item)) || (simdata_task_t)t->simdata)
146     return -1;
147    
148   return MSG_process_get_PID(((simdata_task_t)t->simdata)->sender);
149 }
150
151 /** \ingroup msg_gos_functions
152  * \brief Put a task on a channel of an host and waits for the end of the
153  * transmission.
154  *
155  * This function is used for describing the behavior of an agent. It
156  * takes three parameter.
157  * \param task a #m_task_t to send on another location. This task
158    will not be usable anymore when the function will return. There is
159    no automatic task duplication and you have to save your parameters
160    before calling this function. Tasks are unique and once it has been
161    sent to another location, you should not access it anymore. You do
162    not need to call MSG_task_destroy() but to avoid using, as an
163    effect of inattention, this task anymore, you definitely should
164    renitialize it with #MSG_TASK_UNINITIALIZED. Note that this task
165    can be transfered iff it has been correctly created with
166    MSG_task_create().
167  * \param dest the destination of the message
168  * \param channel the channel on which the agent should put this
169    task. This value has to be >=0 and < than the maximal number of
170    channels fixed with MSG_set_channel_number().
171  * \return #MSG_FATAL if \a task is not properly initialized and
172  * #MSG_OK otherwise.
173  */
174 MSG_error_t MSG_task_put(m_task_t task,
175                          m_host_t dest, m_channel_t channel)
176 {
177   m_process_t process = MSG_process_self();
178   simdata_task_t task_simdata = NULL;
179   e_surf_action_state_t state = SURF_ACTION_NOT_IN_THE_SYSTEM;
180   m_host_t local_host = NULL;
181   m_host_t remote_host = NULL;
182
183   CHECK_HOST();
184
185   task_simdata = task->simdata;
186   task_simdata->sender = process;
187   xbt_assert0(task_simdata->using==1,"Gargl!");
188   task_simdata->comm = NULL;
189   
190   local_host = ((simdata_process_t) process->simdata)->host;
191   remote_host = dest;
192
193   xbt_fifo_push(((simdata_host_t) remote_host->simdata)->
194                 mbox[channel], task);
195     
196   if(remote_host->simdata->sleeping[channel]) 
197     __MSG_process_unblock(remote_host->simdata->sleeping[channel]);
198 /*   else { */
199   process->simdata->put_host = dest;
200   process->simdata->put_channel = channel;
201   while(!(task_simdata->comm)) 
202     __MSG_process_block();
203   process->simdata->put_host = NULL;
204   process->simdata->put_channel = -1;
205 /*   } */
206
207   state=surf_workstation_resource->common_public->action_get_state(task_simdata->comm);
208   while (state==SURF_ACTION_RUNNING) {
209     __MSG_task_wait_event(process, task);
210     state=surf_workstation_resource->common_public->action_get_state(task_simdata->comm);
211   }
212     
213   MSG_task_destroy(task);
214
215   if(state == SURF_ACTION_DONE) MSG_RETURN(MSG_OK);
216   else if(surf_workstation_resource->extension_public->get_state(local_host->simdata->host) 
217           == SURF_CPU_OFF)
218     MSG_RETURN(MSG_HOST_FAILURE);
219   else MSG_RETURN(MSG_TRANSFER_FAILURE);
220 }
221
222 /** \ingroup msg_gos_functions
223  * \brief Does exactly the same as MSG_task_put but with a bounded transmition 
224  * rate.
225  *
226  * \sa MSG_task_put
227  */
228 MSG_error_t MSG_task_put_bounded(m_task_t task,
229                                  m_host_t dest, m_channel_t channel,
230                                  long double max_rate)
231 {
232   task->simdata->rate=max_rate;
233   return(MSG_task_put(task, dest, channel));
234 }
235
236 /** \ingroup msg_gos_functions
237  * \brief Executes a task and waits for its termination.
238  *
239  * This function is used for describing the behavior of an agent. It
240  * takes only one parameter.
241  * \param task a #m_task_t to execute on the location on which the
242    agent is running.
243  * \return #MSG_FATAL if \a task is not properly initialized and
244  * #MSG_OK otherwise.
245  */
246 MSG_error_t MSG_task_execute(m_task_t task)
247 {
248   m_process_t process = MSG_process_self();
249
250   __MSG_task_execute(process, task);
251   return __MSG_wait_for_computation(process,task);
252 }
253
254 void __MSG_task_execute(m_process_t process, m_task_t task)
255 {
256   simdata_task_t simdata = NULL;
257
258   CHECK_HOST();
259
260   simdata = task->simdata;
261
262   simdata->compute = surf_workstation_resource->extension_public->
263     execute(MSG_process_get_host(process)->simdata->host,
264             simdata->computation_amount);
265   surf_workstation_resource->common_public->action_set_data(simdata->compute,task);
266 }
267
268 MSG_error_t __MSG_wait_for_computation(m_process_t process, m_task_t task)
269 {
270   e_surf_action_state_t state = SURF_ACTION_NOT_IN_THE_SYSTEM;
271   simdata_task_t simdata = task->simdata;
272
273   simdata->using++;
274   do {
275     __MSG_task_wait_event(process, task);
276     state=surf_workstation_resource->common_public->action_get_state(simdata->compute);
277   } while (state==SURF_ACTION_RUNNING);
278   simdata->using--;
279     
280
281   if(state == SURF_ACTION_DONE) MSG_RETURN(MSG_OK);
282   else if(surf_workstation_resource->extension_public->
283           get_state(MSG_process_get_host(process)->simdata->host) 
284           == SURF_CPU_OFF)
285     MSG_RETURN(MSG_HOST_FAILURE);
286   else MSG_RETURN(MSG_TRANSFER_FAILURE);
287 }
288
289 /** \ingroup msg_gos_functions
290  * \brief Sleep for the specified number of seconds
291  *
292  * Makes the current process sleep until \a time seconds have elapsed.
293  *
294  * \param nb_sec a number of second
295  */
296 MSG_error_t MSG_process_sleep(long double nb_sec)
297 {
298   e_surf_action_state_t state = SURF_ACTION_NOT_IN_THE_SYSTEM;
299   m_process_t process = MSG_process_self();
300   m_task_t dummy = NULL;
301   simdata_task_t simdata = NULL;
302
303   CHECK_HOST();
304   dummy = MSG_task_create("MSG_sleep", nb_sec, 0.0, NULL);
305   simdata = dummy->simdata;
306
307   simdata->compute = surf_workstation_resource->extension_public->
308     sleep(MSG_process_get_host(process)->simdata->host,
309             simdata->computation_amount);
310   surf_workstation_resource->common_public->action_set_data(simdata->compute,dummy);
311
312   
313   simdata->using++;
314   do {
315     __MSG_task_wait_event(process, dummy);
316     state=surf_workstation_resource->common_public->action_get_state(simdata->compute);
317   } while (state==SURF_ACTION_RUNNING);
318   simdata->using--;
319     
320   if(state == SURF_ACTION_DONE) {
321     if(surf_workstation_resource->extension_public->
322        get_state(MSG_process_get_host(process)->simdata->host) 
323        == SURF_CPU_OFF)
324       MSG_RETURN(MSG_HOST_FAILURE);
325
326     if(__MSG_process_isBlocked(process)) {
327       __MSG_process_unblock(MSG_process_self());
328     }
329     if(surf_workstation_resource->extension_public->
330        get_state(MSG_process_get_host(process)->simdata->host) 
331        == SURF_CPU_OFF)
332       MSG_RETURN(MSG_HOST_FAILURE);
333     MSG_task_destroy(dummy);
334     MSG_RETURN(MSG_OK);
335   } else MSG_RETURN(MSG_HOST_FAILURE);
336 }
337
338 /** \ingroup msg_gos_functions
339  * \brief Return the number of MSG tasks currently running on a
340  * the host of the current running process.
341  */
342 int MSG_get_msgload(void) 
343 {
344   CHECK_HOST();
345   xbt_assert0(0,"Not implemented yet!");
346   
347   return 1;
348 }
349
350 /** \ingroup msg_gos_functions
351  *
352  * \brief Return the the last value returned by a MSG function (except
353  * MSG_get_errno...).
354  */
355 MSG_error_t MSG_get_errno(void)
356 {
357   return PROCESS_GET_ERRNO();
358 }