Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
3e1a6e67a15534988a2ecb7072006ea80e449ba0
[simgrid.git] / src / msg / gos.c
1 /*      $Id$     */
2
3 /* Copyright (c) 2002,2003,2004 Arnaud Legrand. All rights reserved.        */
4
5 /* This program is free software; you can redistribute it and/or modify it
6  * under the terms of the license (GNU LGPL) which comes with this package. */
7
8 #include"private.h"
9 #include"xbt/sysdep.h"
10 #include "xbt/error.h"
11 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(gos, msg,
12                                 "Logging specific to MSG (gos)");
13
14 /** \ingroup msg_gos_functions
15  * \brief This function is now deprecated and useless. Please stop using it.
16  */
17
18 MSG_error_t MSG_process_start(m_process_t process)
19 {
20   xbt_assert0(0,"This function is now deprecated and useless. Please stop using it.");
21   
22   return MSG_OK;
23 }
24
25 /** \ingroup msg_gos_functions
26  * \brief Listen on a channel and wait for receiving a task.
27  *
28  * It takes two parameter.
29  * \param task a memory location for storing a #m_task_t. It will
30    hold a task when this function will return. Thus \a task should not
31    be equal to \c NULL and \a *task should be equal to \c NULL. If one of
32    those two condition does not hold, there will be a warning message.
33  * \param channel the channel on which the agent should be
34    listening. This value has to be >=0 and < than the maximal
35    number of channels fixed with MSG_set_channel_number().
36  * \return #MSG_FATAL if \a task is equal to \c NULL, #MSG_WARNING
37  * if \a *task is not equal to \c NULL, and #MSG_OK otherwise.
38  */
39 MSG_error_t MSG_task_get(m_task_t * task,
40                          m_channel_t channel)
41 {
42   m_process_t process = MSG_process_self();
43   m_task_t t = NULL;
44   m_host_t h = NULL;
45   simdata_task_t t_simdata = NULL;
46   simdata_host_t h_simdata = NULL;
47   int warning = 0;
48   e_surf_action_state_t state = SURF_ACTION_NOT_IN_THE_SYSTEM;
49   
50   CHECK_HOST();
51   /* Sanity check */
52   xbt_assert0(task,"Null pointer for the task\n");
53
54   if (*task) 
55     CRITICAL0("MSG_task_get() was asked to write in a non empty task struct.");
56
57   /* Get the task */
58   h = MSG_host_self();
59   h_simdata = h->simdata;
60   while ((t = xbt_fifo_pop(h_simdata->mbox[channel])) == NULL) {
61     xbt_assert0(!(h_simdata->sleeping[channel]),
62                 "A process is already blocked on this channel");
63     h_simdata->sleeping[channel] = process; /* I'm waiting. Wake me up when you're ready */
64     MSG_process_suspend(process);
65     if(surf_workstation_resource->extension_public->get_state(h_simdata->host) 
66        == SURF_CPU_OFF)
67       MSG_RETURN(MSG_HOST_FAILURE);
68     h_simdata->sleeping[channel] = NULL;
69     /* OK, we should both be ready now. Are you there ? */
70   }
71
72   t_simdata = t->simdata;
73   /*   *task = __MSG_task_copy(t); */
74   *task=t;
75
76   /* Transfer */
77   t_simdata->using++;
78   t_simdata->comm = surf_workstation_resource->extension_public->
79     communicate(MSG_process_get_host(t_simdata->sender)->simdata->host,
80                 h->simdata->host, t_simdata->message_size);
81   surf_workstation_resource->common_public->action_set_data(t_simdata->comm,t);
82
83   if(MSG_process_isSuspended(t_simdata->sender)) 
84     MSG_process_resume(t_simdata->sender);
85
86   do {
87     __MSG_task_wait_event(process, t);
88     state=surf_workstation_resource->common_public->action_get_state(t_simdata->comm);
89   } while (state==SURF_ACTION_RUNNING);
90
91   if(state == SURF_ACTION_DONE) MSG_RETURN(MSG_OK);
92   else if(surf_workstation_resource->extension_public->get_state(h_simdata->host) 
93           == SURF_CPU_OFF)
94     MSG_RETURN(MSG_HOST_FAILURE);
95   else MSG_RETURN(MSG_TRANSFER_FAILURE);
96 }
97
98 /** \ingroup msg_gos_functions
99  * \brief Test whether there is a pending communication on a channel.
100  *
101  * It takes one parameter.
102  * \param channel the channel on which the agent should be
103    listening. This value has to be >=0 and < than the maximal
104    number of channels fixed with MSG_set_channel_number().
105  * \return 1 if there is a pending communication and 0 otherwise
106  */
107 int MSG_task_Iprobe(m_channel_t channel)
108 {
109   m_host_t h = NULL;
110   simdata_host_t h_simdata = NULL;
111
112   CHECK_HOST();
113   h = MSG_host_self();
114   h_simdata = h->simdata;
115   return(xbt_fifo_getFirstItem(h_simdata->mbox[channel])!=NULL);
116 }
117
118 /** \ingroup msg_gos_functions
119  * \brief Put a task on a channel of an host and waits for the end of the
120  * transmission.
121  *
122  * This function is used for describing the behavior of an agent. It
123  * takes three parameter.
124  * \param task a #m_task_t to send on another location. This task
125    will not be usable anymore when the function will return. There is
126    no automatic task duplication and you have to save your parameters
127    before calling this function. Tasks are unique and once it has been
128    sent to another location, you should not access it anymore. You do
129    not need to call MSG_task_destroy() but to avoid using, as an
130    effect of inattention, this task anymore, you definitely should
131    renitialize it with #MSG_TASK_UNINITIALIZED. Note that this task
132    can be transfered iff it has been correctly created with
133    MSG_task_create().
134  * \param dest the destination of the message
135  * \param channel the channel on which the agent should put this
136    task. This value has to be >=0 and < than the maximal number of
137    channels fixed with MSG_set_channel_number().
138  * \return #MSG_FATAL if \a task is not properly initialized and
139  * #MSG_OK otherwise.
140  */
141 MSG_error_t MSG_task_put(m_task_t task,
142                          m_host_t dest, m_channel_t channel)
143 {
144   m_process_t process = MSG_process_self();
145   simdata_task_t task_simdata = NULL;
146   e_surf_action_state_t state = SURF_ACTION_NOT_IN_THE_SYSTEM;
147   m_host_t local_host = NULL;
148   m_host_t remote_host = NULL;
149
150   CHECK_HOST();
151
152   task_simdata = task->simdata;
153   task_simdata->sender = process;
154   xbt_assert0(task_simdata->using==1,"Gargl!");
155   task_simdata->comm = NULL;
156   
157   local_host = ((simdata_process_t) process->simdata)->host;
158   remote_host = dest;
159
160   xbt_fifo_push(((simdata_host_t) remote_host->simdata)->
161                 mbox[channel], task);
162     
163   if(remote_host->simdata->sleeping[channel]) 
164     MSG_process_resume(remote_host->simdata->sleeping[channel]);
165   else {
166     process->simdata->put_host = dest;
167     process->simdata->put_channel = channel;
168     while(!(task_simdata->comm)) 
169       MSG_process_suspend(process);
170     process->simdata->put_host = NULL;
171     process->simdata->put_channel = -1;
172   }
173
174   do {
175     __MSG_task_wait_event(process, task);
176     state=surf_workstation_resource->common_public->action_get_state(task_simdata->comm);
177   } while (state==SURF_ACTION_RUNNING);
178     
179   MSG_task_destroy(task);
180
181   if(state == SURF_ACTION_DONE) MSG_RETURN(MSG_OK);
182   else if(surf_workstation_resource->extension_public->get_state(local_host->simdata->host) 
183           == SURF_CPU_OFF)
184     MSG_RETURN(MSG_HOST_FAILURE);
185   else MSG_RETURN(MSG_TRANSFER_FAILURE);
186 }
187
188 /** \ingroup msg_gos_functions
189  * \brief Executes a task and waits for its termination.
190  *
191  * This function is used for describing the behavior of an agent. It
192  * takes only one parameter.
193  * \param task a #m_task_t to execute on the location on which the
194    agent is running.
195  * \return #MSG_FATAL if \a task is not properly initialized and
196  * #MSG_OK otherwise.
197  */
198 MSG_error_t MSG_task_execute(m_task_t task)
199 {
200   m_process_t process = MSG_process_self();
201
202   __MSG_task_execute(process, task);
203   return __MSG_wait_for_computation(process,task);
204 }
205
206 void __MSG_task_execute(m_process_t process, m_task_t task)
207 {
208   simdata_task_t simdata = NULL;
209
210   CHECK_HOST();
211
212   simdata = task->simdata;
213
214   simdata->compute = surf_workstation_resource->extension_public->
215     execute(MSG_process_get_host(process)->simdata->host,
216             simdata->computation_amount);
217   surf_workstation_resource->common_public->action_set_data(simdata->compute,task);
218 }
219
220 MSG_error_t __MSG_wait_for_computation(m_process_t process, m_task_t task)
221 {
222   e_surf_action_state_t state = SURF_ACTION_NOT_IN_THE_SYSTEM;
223   simdata_task_t simdata = task->simdata;
224
225   simdata->using++;
226   do {
227     __MSG_task_wait_event(process, task);
228     state=surf_workstation_resource->common_public->action_get_state(simdata->compute);
229   } while (state==SURF_ACTION_RUNNING);
230   simdata->using--;
231     
232
233   if(state == SURF_ACTION_DONE) MSG_RETURN(MSG_OK);
234   else if(surf_workstation_resource->extension_public->
235           get_state(MSG_process_get_host(process)->simdata->host) 
236           == SURF_CPU_OFF)
237     MSG_RETURN(MSG_HOST_FAILURE);
238   else MSG_RETURN(MSG_TRANSFER_FAILURE);
239 }
240
241 /** \ingroup msg_gos_functions
242  * \brief Sleep for the specified number of seconds
243  *
244  * Makes the current process sleep until \a time seconds have elapsed.
245  *
246  * \param nb_sec a number of second
247  */
248 MSG_error_t MSG_process_sleep(long double nb_sec)
249 {
250   e_surf_action_state_t state = SURF_ACTION_NOT_IN_THE_SYSTEM;
251   m_process_t process = MSG_process_self();
252   m_task_t dummy = NULL;
253   simdata_task_t simdata = NULL;
254
255   CHECK_HOST();
256   dummy = MSG_task_create("MSG_sleep", nb_sec, 0.0, NULL);
257   simdata = dummy->simdata;
258
259   simdata->compute = surf_workstation_resource->extension_public->
260     sleep(MSG_process_get_host(process)->simdata->host,
261             simdata->computation_amount);
262   surf_workstation_resource->common_public->action_set_data(simdata->compute,dummy);
263
264   
265   simdata->using++;
266   do {
267     __MSG_task_wait_event(process, dummy);
268     state=surf_workstation_resource->common_public->action_get_state(simdata->compute);
269   } while (state==SURF_ACTION_RUNNING);
270   simdata->using--;
271     
272   if(state == SURF_ACTION_DONE) {
273     if(surf_workstation_resource->extension_public->
274        get_state(MSG_process_get_host(process)->simdata->host) 
275        == SURF_CPU_OFF)
276       MSG_RETURN(MSG_HOST_FAILURE);
277
278     if(MSG_process_isSuspended(process)) {
279       MSG_process_suspend(MSG_process_self());
280     }
281     if(surf_workstation_resource->extension_public->
282        get_state(MSG_process_get_host(process)->simdata->host) 
283        == SURF_CPU_OFF)
284       MSG_RETURN(MSG_HOST_FAILURE);
285     MSG_task_destroy(dummy);
286     MSG_RETURN(MSG_OK);
287   } else MSG_RETURN(MSG_HOST_FAILURE);
288 }
289
290 /** \ingroup msg_gos_functions
291  * \brief Return the number of MSG tasks currently running on a
292  * the host of the current running process.
293  */
294 int MSG_get_msgload(void) 
295 {
296   CHECK_HOST();
297   xbt_assert0(0,"Not implemented yet!");
298   
299   return 1;
300 }
301
302 /** \ingroup msg_gos_functions
303  *
304  * \brief Return the the last value returned by a MSG function (except
305  * MSG_get_errno...).
306  */
307 MSG_error_t MSG_get_errno(void)
308 {
309   return PROCESS_GET_ERRNO();
310 }
311