Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
18094ea14b5acb715ebe160ade87a1c2d3db20c9
[simgrid.git] / src / msg / gos.c
1 /*      $Id$     */
2
3 /* Copyright (c) 2002,2003,2004 Arnaud Legrand. All rights reserved.        */
4
5 /* This program is free software; you can redistribute it and/or modify it
6  * under the terms of the license (GNU LGPL) which comes with this package. */
7
8 #include"private.h"
9 #include"xbt/sysdep.h"
10 #include "xbt/error.h"
11 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(gos, msg,
12                                 "Logging specific to MSG (gos)");
13
14 /** \ingroup msg_gos_functions
15  * \brief This function is now deprecated and useless. Please stop using it.
16  */
17
18 MSG_error_t MSG_process_start(m_process_t process)
19 {
20   xbt_assert0(0,"This function is now deprecated and useless. Please stop using it.");
21   
22   return MSG_OK;
23 }
24
25 /** \ingroup msg_gos_functions
26  * \brief Listen on a channel and wait for receiving a task.
27  *
28  * It takes two parameter.
29  * \param task a memory location for storing a #m_task_t. It will
30    hold a task when this function will return. Thus \a task should not
31    be equal to \c NULL and \a *task should be equal to \c NULL. If one of
32    those two condition does not hold, there will be a warning message.
33  * \param channel the channel on which the agent should be
34    listening. This value has to be >=0 and < than the maximal
35    number of channels fixed with MSG_set_channel_number().
36  * \return #MSG_FATAL if \a task is equal to \c NULL, #MSG_WARNING
37  * if \a *task is not equal to \c NULL, and #MSG_OK otherwise.
38  */
39 MSG_error_t MSG_task_get(m_task_t * task,
40                          m_channel_t channel)
41 {
42   m_process_t process = MSG_process_self();
43   m_task_t t = NULL;
44   m_host_t h = NULL;
45   simdata_task_t t_simdata = NULL;
46   simdata_host_t h_simdata = NULL;
47   int warning = 0;
48   e_surf_action_state_t state = SURF_ACTION_NOT_IN_THE_SYSTEM;
49   
50   CHECK_HOST();
51   /* Sanity check */
52   xbt_assert0(task,"Null pointer for the task\n");
53
54   if (*task) 
55     CRITICAL0("MSG_task_get() was asked to write in a non empty task struct.");
56
57   /* Get the task */
58   h = MSG_host_self();
59   h_simdata = h->simdata;
60   while ((t = xbt_fifo_pop(h_simdata->mbox[channel])) == NULL) {
61     xbt_assert0(!(h_simdata->sleeping[channel]),
62                 "A process is already blocked on this channel");
63     h_simdata->sleeping[channel] = process; /* I'm waiting. Wake me up when you're ready */
64     MSG_process_suspend(process);
65     if(surf_workstation_resource->extension_public->get_state(h_simdata->host) 
66        == SURF_CPU_OFF)
67       MSG_RETURN(MSG_HOST_FAILURE);
68     h_simdata->sleeping[channel] = NULL;
69     /* OK, we should both be ready now. Are you there ? */
70   }
71
72   t_simdata = t->simdata;
73   /*   *task = __MSG_task_copy(t); */
74   *task=t;
75
76   /* Transfer */
77   t_simdata->using++;
78   t_simdata->comm = surf_workstation_resource->extension_public->
79     communicate(MSG_process_get_host(t_simdata->sender)->simdata->host,
80                 h->simdata->host, t_simdata->message_size);
81   surf_workstation_resource->common_public->action_set_data(t_simdata->comm,t);
82
83   do {
84     __MSG_task_wait_event(process, t);
85     state=surf_workstation_resource->common_public->action_get_state(t_simdata->comm);
86   } while (state==SURF_ACTION_RUNNING);
87
88   if(state == SURF_ACTION_DONE) MSG_RETURN(MSG_OK);
89   else if(surf_workstation_resource->extension_public->get_state(h_simdata->host) 
90           == SURF_CPU_OFF)
91     MSG_RETURN(MSG_HOST_FAILURE);
92   else MSG_RETURN(MSG_TRANSFER_FAILURE);
93 }
94
95 /** \ingroup msg_gos_functions
96  * \brief Test whether there is a pending communication on a channel.
97  *
98  * It takes one parameter.
99  * \param channel the channel on which the agent should be
100    listening. This value has to be >=0 and < than the maximal
101    number of channels fixed with MSG_set_channel_number().
102  * \return 1 if there is a pending communication and 0 otherwise
103  */
104 int MSG_task_Iprobe(m_channel_t channel)
105 {
106   m_host_t h = NULL;
107   simdata_host_t h_simdata = NULL;
108
109   CHECK_HOST();
110   h = MSG_host_self();
111   h_simdata = h->simdata;
112   return(xbt_fifo_getFirstItem(h_simdata->mbox[channel])!=NULL);
113 }
114
115 /** \ingroup msg_gos_functions
116  * \brief Put a task on a channel of an host and waits for the end of the
117  * transmission.
118  *
119  * This function is used for describing the behavior of an agent. It
120  * takes three parameter.
121  * \param task a #m_task_t to send on another location. This task
122    will not be usable anymore when the function will return. There is
123    no automatic task duplication and you have to save your parameters
124    before calling this function. Tasks are unique and once it has been
125    sent to another location, you should not access it anymore. You do
126    not need to call MSG_task_destroy() but to avoid using, as an
127    effect of inattention, this task anymore, you definitely should
128    renitialize it with #MSG_TASK_UNINITIALIZED. Note that this task
129    can be transfered iff it has been correctly created with
130    MSG_task_create().
131  * \param dest the destination of the message
132  * \param channel the channel on which the agent should put this
133    task. This value has to be >=0 and < than the maximal number of
134    channels fixed with MSG_set_channel_number().
135  * \return #MSG_FATAL if \a task is not properly initialized and
136  * #MSG_OK otherwise.
137  */
138 MSG_error_t MSG_task_put(m_task_t task,
139                          m_host_t dest, m_channel_t channel)
140 {
141   m_process_t process = MSG_process_self();
142   simdata_task_t task_simdata = NULL;
143   e_surf_action_state_t state = SURF_ACTION_NOT_IN_THE_SYSTEM;
144   m_host_t local_host = NULL;
145   m_host_t remote_host = NULL;
146
147   CHECK_HOST();
148
149   task_simdata = task->simdata;
150   task_simdata->sender = process;
151
152   
153   local_host = ((simdata_process_t) process->simdata)->host;
154   remote_host = dest;
155
156   xbt_fifo_push(((simdata_host_t) remote_host->simdata)->
157                 mbox[channel], task);
158     
159   if(remote_host->simdata->sleeping[channel]) 
160     MSG_process_resume(remote_host->simdata->sleeping[channel]);
161   else {
162     process->simdata->put_host = dest;
163     process->simdata->put_channel = channel;
164     MSG_process_suspend(process);
165     process->simdata->put_host = NULL;
166     process->simdata->put_channel = -1;
167   }
168
169   do {
170     __MSG_task_wait_event(process, task);
171     state=surf_workstation_resource->common_public->action_get_state(task_simdata->comm);
172   } while (state==SURF_ACTION_RUNNING);
173     
174   MSG_task_destroy(task);
175
176   if(state == SURF_ACTION_DONE) MSG_RETURN(MSG_OK);
177   else if(surf_workstation_resource->extension_public->get_state(local_host->simdata->host) 
178           == SURF_CPU_OFF)
179     MSG_RETURN(MSG_HOST_FAILURE);
180   else MSG_RETURN(MSG_TRANSFER_FAILURE);
181 }
182
183 /** \ingroup msg_gos_functions
184  * \brief Executes a task and waits for its termination.
185  *
186  * This function is used for describing the behavior of an agent. It
187  * takes only one parameter.
188  * \param task a #m_task_t to execute on the location on which the
189    agent is running.
190  * \return #MSG_FATAL if \a task is not properly initialized and
191  * #MSG_OK otherwise.
192  */
193 MSG_error_t MSG_task_execute(m_task_t task)
194 {
195   m_process_t process = MSG_process_self();
196
197   __MSG_task_execute(process, task);
198   return __MSG_wait_for_computation(process,task);
199 }
200
201 void __MSG_task_execute(m_process_t process, m_task_t task)
202 {
203   simdata_task_t simdata = NULL;
204
205   CHECK_HOST();
206
207   simdata = task->simdata;
208
209   simdata->compute = surf_workstation_resource->extension_public->
210     execute(MSG_process_get_host(process)->simdata->host,
211             simdata->computation_amount);
212   surf_workstation_resource->common_public->action_set_data(simdata->compute,task);
213 }
214
215 MSG_error_t __MSG_wait_for_computation(m_process_t process, m_task_t task)
216 {
217   e_surf_action_state_t state = SURF_ACTION_NOT_IN_THE_SYSTEM;
218   simdata_task_t simdata = task->simdata;
219
220   simdata->using++;
221   do {
222     __MSG_task_wait_event(process, task);
223     state=surf_workstation_resource->common_public->action_get_state(simdata->compute);
224   } while (state==SURF_ACTION_RUNNING);
225   simdata->using--;
226     
227
228   if(state == SURF_ACTION_DONE) MSG_RETURN(MSG_OK);
229   else if(surf_workstation_resource->extension_public->
230           get_state(MSG_process_get_host(process)->simdata->host) 
231           == SURF_CPU_OFF)
232     MSG_RETURN(MSG_HOST_FAILURE);
233   else MSG_RETURN(MSG_TRANSFER_FAILURE);
234 }
235
236 /** \ingroup msg_gos_functions
237  * \brief Sleep for the specified number of seconds
238  *
239  * Makes the current process sleep until \a time seconds have elapsed.
240  *
241  * \param nb_sec a number of second
242  */
243 MSG_error_t MSG_process_sleep(long double nb_sec)
244 {
245   e_surf_action_state_t state = SURF_ACTION_NOT_IN_THE_SYSTEM;
246   m_process_t process = MSG_process_self();
247   m_task_t dummy = NULL;
248   simdata_task_t simdata = NULL;
249
250   CHECK_HOST();
251   dummy = MSG_task_create("MSG_sleep", nb_sec, 0.0, NULL);
252   simdata = dummy->simdata;
253
254   simdata->compute = surf_workstation_resource->extension_public->
255     sleep(MSG_process_get_host(process)->simdata->host,
256             simdata->computation_amount);
257   surf_workstation_resource->common_public->action_set_data(simdata->compute,dummy);
258
259   
260   simdata->using++;
261   do {
262     __MSG_task_wait_event(process, dummy);
263     state=surf_workstation_resource->common_public->action_get_state(simdata->compute);
264   } while (state==SURF_ACTION_RUNNING);
265   simdata->using--;
266     
267   if(state == SURF_ACTION_DONE) {
268     if(surf_workstation_resource->extension_public->
269        get_state(MSG_process_get_host(process)->simdata->host) 
270        == SURF_CPU_OFF)
271       MSG_RETURN(MSG_HOST_FAILURE);
272
273     if(MSG_process_isSuspended(process)) {
274       MSG_process_suspend(MSG_process_self());
275     }
276     if(surf_workstation_resource->extension_public->
277        get_state(MSG_process_get_host(process)->simdata->host) 
278        == SURF_CPU_OFF)
279       MSG_RETURN(MSG_HOST_FAILURE);
280     MSG_task_destroy(dummy);
281     MSG_RETURN(MSG_OK);
282   } else MSG_RETURN(MSG_HOST_FAILURE);
283 }
284
285 /** \ingroup msg_gos_functions
286  * \brief Return the number of MSG tasks currently running on a
287  * the host of the current running process.
288  */
289 int MSG_get_msgload(void) 
290 {
291   CHECK_HOST();
292   xbt_assert0(0,"Not implemented yet!");
293   
294   return 1;
295 }
296
297 /** \ingroup msg_gos_functions
298  *
299  * \brief Return the the last value returned by a MSG function (except
300  * MSG_get_errno...).
301  */
302 MSG_error_t MSG_get_errno(void)
303 {
304   return PROCESS_GET_ERRNO();
305 }
306