Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
feb7153b6d4f71447ffdc1291bf15154a62920fc
[simgrid.git] / src / msg / gos.c
1 /*      $Id$     */
2
3 /* Copyright (c) 2002,2003,2004 Arnaud Legrand. All rights reserved.        */
4
5 /* This program is free software; you can redistribute it and/or modify it
6  * under the terms of the license (GNU LGPL) which comes with this package. */
7
8 #include"private.h"
9 #include"xbt/sysdep.h"
10 #include "xbt/error.h"
11 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(gos, msg,
12                                 "Logging specific to MSG (gos)");
13
14 /** \ingroup msg_gos_functions
15  * \brief This function is now deprecated and useless. Please stop using it.
16  */
17
18 MSG_error_t MSG_process_start(m_process_t process)
19 {
20   xbt_assert0(0,"This function is now deprecated and useless. Please stop using it.");
21   
22   return MSG_OK;
23 }
24
25 /** \ingroup msg_gos_functions
26  * \brief Listen on a channel and wait for receiving a task.
27  *
28  * It takes two parameter.
29  * \param task a memory location for storing a #m_task_t. It will
30    hold a task when this function will return. Thus \a task should not
31    be equal to \c NULL and \a *task should be equal to \c NULL. If one of
32    those two condition does not hold, there will be a warning message.
33  * \param channel the channel on which the agent should be
34    listening. This value has to be >=0 and < than the maximal
35    number of channels fixed with MSG_set_channel_number().
36  * \return #MSG_FATAL if \a task is equal to \c NULL, #MSG_WARNING
37  * if \a *task is not equal to \c NULL, and #MSG_OK otherwise.
38  */
39 MSG_error_t MSG_task_get(m_task_t * task,
40                          m_channel_t channel)
41 {
42   m_process_t process = MSG_process_self();
43   m_task_t t = NULL;
44   m_host_t h = NULL;
45   simdata_task_t t_simdata = NULL;
46   simdata_host_t h_simdata = NULL;
47   int warning = 0;
48   e_surf_action_state_t state = SURF_ACTION_NOT_IN_THE_SYSTEM;
49   
50   CHECK_HOST();
51   /* Sanity check */
52   xbt_assert0(task,"Null pointer for the task\n");
53
54   if (*task) 
55     CRITICAL0("MSG_task_get() was asked to write in a non empty task struct.");
56
57   /* Get the task */
58   h = MSG_host_self();
59   h_simdata = h->simdata;
60   while ((t = xbt_fifo_pop(h_simdata->mbox[channel])) == NULL) {
61     xbt_assert0(!(h_simdata->sleeping[channel]),
62                 "A process is already blocked on this channel");
63     h_simdata->sleeping[channel] = process; /* I'm waiting. Wake me up when you're ready */
64     __MSG_process_block();
65     if(surf_workstation_resource->extension_public->get_state(h_simdata->host) 
66        == SURF_CPU_OFF)
67       MSG_RETURN(MSG_HOST_FAILURE);
68     h_simdata->sleeping[channel] = NULL;
69     /* OK, we should both be ready now. Are you there ? */
70   }
71
72   t_simdata = t->simdata;
73   /*   *task = __MSG_task_copy(t); */
74   *task=t;
75
76   /* Transfer */
77   t_simdata->using++;
78
79   t_simdata->comm = surf_workstation_resource->extension_public->
80     communicate(MSG_process_get_host(t_simdata->sender)->simdata->host,
81                 h->simdata->host, t_simdata->message_size,t_simdata->rate);
82   
83   surf_workstation_resource->common_public->action_set_data(t_simdata->comm,t);
84
85   if(__MSG_process_isBlocked(t_simdata->sender)) 
86     __MSG_process_unblock(t_simdata->sender);
87
88   do {
89     __MSG_task_wait_event(process, t);
90     state=surf_workstation_resource->common_public->action_get_state(t_simdata->comm);
91   } while (state==SURF_ACTION_RUNNING);
92
93   if(state == SURF_ACTION_DONE) MSG_RETURN(MSG_OK);
94   else if(surf_workstation_resource->extension_public->get_state(h_simdata->host) 
95           == SURF_CPU_OFF)
96     MSG_RETURN(MSG_HOST_FAILURE);
97   else MSG_RETURN(MSG_TRANSFER_FAILURE);
98 }
99
100 /** \ingroup msg_gos_functions
101  * \brief Test whether there is a pending communication on a channel.
102  *
103  * It takes one parameter.
104  * \param channel the channel on which the agent should be
105    listening. This value has to be >=0 and < than the maximal
106    number of channels fixed with MSG_set_channel_number().
107  * \return 1 if there is a pending communication and 0 otherwise
108  */
109 int MSG_task_Iprobe(m_channel_t channel)
110 {
111   m_host_t h = NULL;
112   simdata_host_t h_simdata = NULL;
113
114   CHECK_HOST();
115   h = MSG_host_self();
116   h_simdata = h->simdata;
117   return(xbt_fifo_getFirstItem(h_simdata->mbox[channel])!=NULL);
118 }
119
120 /** \ingroup msg_gos_functions
121  * \brief Test whether there is a pending communication on a channel, and who sent it.
122  *
123  * It takes one parameter.
124  * \param channel the channel on which the agent should be
125    listening. This value has to be >=0 and < than the maximal
126    number of channels fixed with MSG_set_channel_number().
127  * \return -1 if there is no pending communication and the PID of the process who sent it otherwise
128  */
129 int MSG_task_probe_from(m_channel_t channel)
130 {
131   m_host_t h = NULL;
132   simdata_host_t h_simdata = NULL;
133   xbt_fifo_item_t item;
134   m_task_t t;
135
136   CHECK_HOST();
137   h = MSG_host_self();
138   h_simdata = h->simdata;
139    
140   item = xbt_fifo_getFirstItem(((simdata_host_t)h->simdata)->mbox[channel]);
141   if (!item || !(t = xbt_fifo_get_item_content(item)) || (simdata_task_t)t->simdata)
142     return -1;
143    
144   return MSG_process_get_PID(((simdata_task_t)t->simdata)->sender);
145 }
146
147 /** \ingroup msg_gos_functions
148  * \brief Put a task on a channel of an host and waits for the end of the
149  * transmission.
150  *
151  * This function is used for describing the behavior of an agent. It
152  * takes three parameter.
153  * \param task a #m_task_t to send on another location. This task
154    will not be usable anymore when the function will return. There is
155    no automatic task duplication and you have to save your parameters
156    before calling this function. Tasks are unique and once it has been
157    sent to another location, you should not access it anymore. You do
158    not need to call MSG_task_destroy() but to avoid using, as an
159    effect of inattention, this task anymore, you definitely should
160    renitialize it with #MSG_TASK_UNINITIALIZED. Note that this task
161    can be transfered iff it has been correctly created with
162    MSG_task_create().
163  * \param dest the destination of the message
164  * \param channel the channel on which the agent should put this
165    task. This value has to be >=0 and < than the maximal number of
166    channels fixed with MSG_set_channel_number().
167  * \return #MSG_FATAL if \a task is not properly initialized and
168  * #MSG_OK otherwise.
169  */
170 MSG_error_t MSG_task_put(m_task_t task,
171                          m_host_t dest, m_channel_t channel)
172 {
173   m_process_t process = MSG_process_self();
174   simdata_task_t task_simdata = NULL;
175   e_surf_action_state_t state = SURF_ACTION_NOT_IN_THE_SYSTEM;
176   m_host_t local_host = NULL;
177   m_host_t remote_host = NULL;
178
179   CHECK_HOST();
180
181   task_simdata = task->simdata;
182   task_simdata->sender = process;
183   xbt_assert0(task_simdata->using==1,"Gargl!");
184   task_simdata->comm = NULL;
185   
186   local_host = ((simdata_process_t) process->simdata)->host;
187   remote_host = dest;
188
189   xbt_fifo_push(((simdata_host_t) remote_host->simdata)->
190                 mbox[channel], task);
191     
192   if(remote_host->simdata->sleeping[channel]) 
193     __MSG_process_unblock(remote_host->simdata->sleeping[channel]);
194 /*   else { */
195   process->simdata->put_host = dest;
196   process->simdata->put_channel = channel;
197   while(!(task_simdata->comm)) 
198     __MSG_process_block();
199   process->simdata->put_host = NULL;
200   process->simdata->put_channel = -1;
201 /*   } */
202
203   state=surf_workstation_resource->common_public->action_get_state(task_simdata->comm);
204   while (state==SURF_ACTION_RUNNING) {
205     __MSG_task_wait_event(process, task);
206     state=surf_workstation_resource->common_public->action_get_state(task_simdata->comm);
207   }
208     
209   MSG_task_destroy(task);
210
211   if(state == SURF_ACTION_DONE) MSG_RETURN(MSG_OK);
212   else if(surf_workstation_resource->extension_public->get_state(local_host->simdata->host) 
213           == SURF_CPU_OFF)
214     MSG_RETURN(MSG_HOST_FAILURE);
215   else MSG_RETURN(MSG_TRANSFER_FAILURE);
216 }
217
218 MSG_error_t MSG_task_put_bounded(m_task_t task,
219                                  m_host_t dest, m_channel_t channel,
220                                  long double max_rate)
221 {
222   task->simdata->rate=max_rate;
223   return(MSG_task_put(task, dest, channel));
224 }
225
226 /** \ingroup msg_gos_functions
227  * \brief Executes a task and waits for its termination.
228  *
229  * This function is used for describing the behavior of an agent. It
230  * takes only one parameter.
231  * \param task a #m_task_t to execute on the location on which the
232    agent is running.
233  * \return #MSG_FATAL if \a task is not properly initialized and
234  * #MSG_OK otherwise.
235  */
236 MSG_error_t MSG_task_execute(m_task_t task)
237 {
238   m_process_t process = MSG_process_self();
239
240   __MSG_task_execute(process, task);
241   return __MSG_wait_for_computation(process,task);
242 }
243
244 void __MSG_task_execute(m_process_t process, m_task_t task)
245 {
246   simdata_task_t simdata = NULL;
247
248   CHECK_HOST();
249
250   simdata = task->simdata;
251
252   simdata->compute = surf_workstation_resource->extension_public->
253     execute(MSG_process_get_host(process)->simdata->host,
254             simdata->computation_amount);
255   surf_workstation_resource->common_public->action_set_data(simdata->compute,task);
256 }
257
258 MSG_error_t __MSG_wait_for_computation(m_process_t process, m_task_t task)
259 {
260   e_surf_action_state_t state = SURF_ACTION_NOT_IN_THE_SYSTEM;
261   simdata_task_t simdata = task->simdata;
262
263   simdata->using++;
264   do {
265     __MSG_task_wait_event(process, task);
266     state=surf_workstation_resource->common_public->action_get_state(simdata->compute);
267   } while (state==SURF_ACTION_RUNNING);
268   simdata->using--;
269     
270
271   if(state == SURF_ACTION_DONE) MSG_RETURN(MSG_OK);
272   else if(surf_workstation_resource->extension_public->
273           get_state(MSG_process_get_host(process)->simdata->host) 
274           == SURF_CPU_OFF)
275     MSG_RETURN(MSG_HOST_FAILURE);
276   else MSG_RETURN(MSG_TRANSFER_FAILURE);
277 }
278
279 /** \ingroup msg_gos_functions
280  * \brief Sleep for the specified number of seconds
281  *
282  * Makes the current process sleep until \a time seconds have elapsed.
283  *
284  * \param nb_sec a number of second
285  */
286 MSG_error_t MSG_process_sleep(long double nb_sec)
287 {
288   e_surf_action_state_t state = SURF_ACTION_NOT_IN_THE_SYSTEM;
289   m_process_t process = MSG_process_self();
290   m_task_t dummy = NULL;
291   simdata_task_t simdata = NULL;
292
293   CHECK_HOST();
294   dummy = MSG_task_create("MSG_sleep", nb_sec, 0.0, NULL);
295   simdata = dummy->simdata;
296
297   simdata->compute = surf_workstation_resource->extension_public->
298     sleep(MSG_process_get_host(process)->simdata->host,
299             simdata->computation_amount);
300   surf_workstation_resource->common_public->action_set_data(simdata->compute,dummy);
301
302   
303   simdata->using++;
304   do {
305     __MSG_task_wait_event(process, dummy);
306     state=surf_workstation_resource->common_public->action_get_state(simdata->compute);
307   } while (state==SURF_ACTION_RUNNING);
308   simdata->using--;
309     
310   if(state == SURF_ACTION_DONE) {
311     if(surf_workstation_resource->extension_public->
312        get_state(MSG_process_get_host(process)->simdata->host) 
313        == SURF_CPU_OFF)
314       MSG_RETURN(MSG_HOST_FAILURE);
315
316     if(__MSG_process_isBlocked(process)) {
317       __MSG_process_unblock(MSG_process_self());
318     }
319     if(surf_workstation_resource->extension_public->
320        get_state(MSG_process_get_host(process)->simdata->host) 
321        == SURF_CPU_OFF)
322       MSG_RETURN(MSG_HOST_FAILURE);
323     MSG_task_destroy(dummy);
324     MSG_RETURN(MSG_OK);
325   } else MSG_RETURN(MSG_HOST_FAILURE);
326 }
327
328 /** \ingroup msg_gos_functions
329  * \brief Return the number of MSG tasks currently running on a
330  * the host of the current running process.
331  */
332 int MSG_get_msgload(void) 
333 {
334   CHECK_HOST();
335   xbt_assert0(0,"Not implemented yet!");
336   
337   return 1;
338 }
339
340 /** \ingroup msg_gos_functions
341  *
342  * \brief Return the the last value returned by a MSG function (except
343  * MSG_get_errno...).
344  */
345 MSG_error_t MSG_get_errno(void)
346 {
347   return PROCESS_GET_ERRNO();
348 }
349