Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
3831fa454f84562b40b15cb9366c0991e24512e8
[simgrid.git] / src / msg / gos.c
1 /*      $Id$     */
2
3 /* Copyright (c) 2002,2003,2004 Arnaud Legrand. All rights reserved.        */
4
5 /* This program is free software; you can redistribute it and/or modify it
6  * under the terms of the license (GNU LGPL) which comes with this package. */
7
8 #include"private.h"
9 #include"xbt/sysdep.h"
10 #include "xbt/error.h"
11 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(gos, msg,
12                                 "Logging specific to MSG (gos)");
13
14 /** \defgroup msg_gos_functions MSG Operating System Functions
15  *  \brief This section describes the functions that can be used
16  *  by an agent for handling some task.
17  */
18
19 /** \ingroup msg_gos_functions
20  * \brief This function is now deprecated and useless. Please stop using it.
21  */
22 MSG_error_t MSG_process_start(m_process_t process)
23 {
24   xbt_assert0(0,"This function is now deprecated and useless. Please stop using it.");
25   
26   return MSG_OK;
27 }
28
29 /** \ingroup msg_gos_functions
30  * \brief Listen on a channel and wait for receiving a task.
31  *
32  * It takes two parameter.
33  * \param task a memory location for storing a #m_task_t. It will
34    hold a task when this function will return. Thus \a task should not
35    be equal to \c NULL and \a *task should be equal to \c NULL. If one of
36    those two condition does not hold, there will be a warning message.
37  * \param channel the channel on which the agent should be
38    listening. This value has to be >=0 and < than the maximal
39    number of channels fixed with MSG_set_channel_number().
40  * \return #MSG_FATAL if \a task is equal to \c NULL, #MSG_WARNING
41  * if \a *task is not equal to \c NULL, and #MSG_OK otherwise.
42  */
43 MSG_error_t MSG_task_get(m_task_t * task,
44                          m_channel_t channel)
45 {
46   m_process_t process = MSG_process_self();
47   m_task_t t = NULL;
48   m_host_t h = NULL;
49   simdata_task_t t_simdata = NULL;
50   simdata_host_t h_simdata = NULL;
51   int warning = 0;
52   e_surf_action_state_t state = SURF_ACTION_NOT_IN_THE_SYSTEM;
53   
54   CHECK_HOST();
55   /* Sanity check */
56   xbt_assert0(task,"Null pointer for the task\n");
57
58   if (*task) 
59     CRITICAL0("MSG_task_get() was asked to write in a non empty task struct.");
60
61   /* Get the task */
62   h = MSG_host_self();
63   h_simdata = h->simdata;
64   while ((t = xbt_fifo_pop(h_simdata->mbox[channel])) == NULL) {
65     xbt_assert2(!(h_simdata->sleeping[channel]),
66                 "A process (%s(%d)) is already blocked on this channel",
67                 h_simdata->sleeping[channel]->name,
68                 h_simdata->sleeping[channel]->simdata->PID);
69     h_simdata->sleeping[channel] = process; /* I'm waiting. Wake me up when you're ready */
70     __MSG_process_block();
71     if(surf_workstation_resource->extension_public->get_state(h_simdata->host) 
72        == SURF_CPU_OFF)
73       MSG_RETURN(MSG_HOST_FAILURE);
74     h_simdata->sleeping[channel] = NULL;
75     /* OK, we should both be ready now. Are you there ? */
76   }
77
78   t_simdata = t->simdata;
79   /*   *task = __MSG_task_copy(t); */
80   *task=t;
81
82   /* Transfer */
83   t_simdata->using++;
84
85   t_simdata->comm = surf_workstation_resource->extension_public->
86     communicate(MSG_process_get_host(t_simdata->sender)->simdata->host,
87                 h->simdata->host, t_simdata->message_size,t_simdata->rate);
88   
89   surf_workstation_resource->common_public->action_set_data(t_simdata->comm,t);
90
91   if(__MSG_process_isBlocked(t_simdata->sender)) 
92     __MSG_process_unblock(t_simdata->sender);
93
94   PAJE_PROCESS_PUSH_STATE(process,"C");  
95
96   do {
97     __MSG_task_wait_event(process, t);
98     state=surf_workstation_resource->common_public->action_get_state(t_simdata->comm);
99   } while (state==SURF_ACTION_RUNNING);
100
101   if(t->simdata->using>1) {
102     xbt_fifo_unshift(msg_global->process_to_run,process);
103     xbt_context_yield();
104   }
105
106   PAJE_PROCESS_POP_STATE(process);  
107   PAJE_COMM_STOP(process,t,channel);
108
109   if(state == SURF_ACTION_DONE) MSG_RETURN(MSG_OK);
110   else if(surf_workstation_resource->extension_public->get_state(h_simdata->host) 
111           == SURF_CPU_OFF)
112     MSG_RETURN(MSG_HOST_FAILURE);
113   else MSG_RETURN(MSG_TRANSFER_FAILURE);
114 }
115
116 /** \ingroup msg_gos_functions
117  * \brief Test whether there is a pending communication on a channel.
118  *
119  * It takes one parameter.
120  * \param channel the channel on which the agent should be
121    listening. This value has to be >=0 and < than the maximal
122    number of channels fixed with MSG_set_channel_number().
123  * \return 1 if there is a pending communication and 0 otherwise
124  */
125 int MSG_task_Iprobe(m_channel_t channel)
126 {
127   m_host_t h = NULL;
128   simdata_host_t h_simdata = NULL;
129
130   CHECK_HOST();
131   h = MSG_host_self();
132   h_simdata = h->simdata;
133   return(xbt_fifo_getFirstItem(h_simdata->mbox[channel])!=NULL);
134 }
135
136 /** \ingroup msg_gos_functions
137  * \brief Test whether there is a pending communication on a channel, and who sent it.
138  *
139  * It takes one parameter.
140  * \param channel the channel on which the agent should be
141    listening. This value has to be >=0 and < than the maximal
142    number of channels fixed with MSG_set_channel_number().
143  * \return -1 if there is no pending communication and the PID of the process who sent it otherwise
144  */
145 int MSG_task_probe_from(m_channel_t channel)
146 {
147   m_host_t h = NULL;
148   simdata_host_t h_simdata = NULL;
149   xbt_fifo_item_t item;
150   m_task_t t;
151
152   CHECK_HOST();
153   h = MSG_host_self();
154   h_simdata = h->simdata;
155    
156   item = xbt_fifo_getFirstItem(((simdata_host_t)h->simdata)->mbox[channel]);
157   if (!item || !(t = xbt_fifo_get_item_content(item)) || (simdata_task_t)t->simdata)
158     return -1;
159    
160   return MSG_process_get_PID(((simdata_task_t)t->simdata)->sender);
161 }
162
163 /** \ingroup msg_gos_functions
164  * \brief Put a task on a channel of an host and waits for the end of the
165  * transmission.
166  *
167  * This function is used for describing the behavior of an agent. It
168  * takes three parameter.
169  * \param task a #m_task_t to send on another location. This task
170    will not be usable anymore when the function will return. There is
171    no automatic task duplication and you have to save your parameters
172    before calling this function. Tasks are unique and once it has been
173    sent to another location, you should not access it anymore. You do
174    not need to call MSG_task_destroy() but to avoid using, as an
175    effect of inattention, this task anymore, you definitely should
176    renitialize it with #MSG_TASK_UNINITIALIZED. Note that this task
177    can be transfered iff it has been correctly created with
178    MSG_task_create().
179  * \param dest the destination of the message
180  * \param channel the channel on which the agent should put this
181    task. This value has to be >=0 and < than the maximal number of
182    channels fixed with MSG_set_channel_number().
183  * \return #MSG_FATAL if \a task is not properly initialized and
184  * #MSG_OK otherwise.
185  */
186 MSG_error_t MSG_task_put(m_task_t task,
187                          m_host_t dest, m_channel_t channel)
188 {
189   m_process_t process = MSG_process_self();
190   simdata_task_t task_simdata = NULL;
191   e_surf_action_state_t state = SURF_ACTION_NOT_IN_THE_SYSTEM;
192   m_host_t local_host = NULL;
193   m_host_t remote_host = NULL;
194
195   CHECK_HOST();
196
197   task_simdata = task->simdata;
198   task_simdata->sender = process;
199   xbt_assert0(task_simdata->using==1,"Gargl!");
200   task_simdata->comm = NULL;
201   
202   local_host = ((simdata_process_t) process->simdata)->host;
203   remote_host = dest;
204
205   xbt_fifo_push(((simdata_host_t) remote_host->simdata)->
206                 mbox[channel], task);
207
208   PAJE_COMM_START(process,task,channel);
209     
210   if(remote_host->simdata->sleeping[channel]) 
211     __MSG_process_unblock(remote_host->simdata->sleeping[channel]);
212
213   process->simdata->put_host = dest;
214   process->simdata->put_channel = channel;
215   while(!(task_simdata->comm)) 
216     __MSG_process_block();
217   process->simdata->put_host = NULL;
218   process->simdata->put_channel = -1;
219
220
221   PAJE_PROCESS_PUSH_STATE(process,"C");  
222
223   state=surf_workstation_resource->common_public->action_get_state(task_simdata->comm);
224   while (state==SURF_ACTION_RUNNING) {
225     __MSG_task_wait_event(process, task);
226     state=surf_workstation_resource->common_public->action_get_state(task_simdata->comm);
227   }
228     
229   MSG_task_destroy(task);
230
231   PAJE_PROCESS_POP_STATE(process);  
232
233   if(state == SURF_ACTION_DONE) MSG_RETURN(MSG_OK);
234   else if(surf_workstation_resource->extension_public->get_state(local_host->simdata->host) 
235           == SURF_CPU_OFF)
236     MSG_RETURN(MSG_HOST_FAILURE);
237   else MSG_RETURN(MSG_TRANSFER_FAILURE);
238 }
239
240 /** \ingroup msg_gos_functions
241  * \brief Does exactly the same as MSG_task_put but with a bounded transmition 
242  * rate.
243  *
244  * \sa MSG_task_put
245  */
246 MSG_error_t MSG_task_put_bounded(m_task_t task,
247                                  m_host_t dest, m_channel_t channel,
248                                  double max_rate)
249 {
250   task->simdata->rate=max_rate;
251   return(MSG_task_put(task, dest, channel));
252   task->simdata->rate=-1.0;
253 }
254
255 /** \ingroup msg_gos_functions
256  * \brief Executes a task and waits for its termination.
257  *
258  * This function is used for describing the behavior of an agent. It
259  * takes only one parameter.
260  * \param task a #m_task_t to execute on the location on which the
261    agent is running.
262  * \return #MSG_FATAL if \a task is not properly initialized and
263  * #MSG_OK otherwise.
264  */
265 MSG_error_t MSG_task_execute(m_task_t task)
266 {
267   m_process_t process = MSG_process_self();
268   MSG_error_t res;
269   __MSG_task_execute(process, task);
270
271   PAJE_PROCESS_PUSH_STATE(process,"E");  
272   res = __MSG_wait_for_computation(process,task);
273   PAJE_PROCESS_POP_STATE(process);
274   return res;
275 }
276
277 void __MSG_task_execute(m_process_t process, m_task_t task)
278 {
279   simdata_task_t simdata = NULL;
280
281   CHECK_HOST();
282
283   simdata = task->simdata;
284
285   simdata->compute = surf_workstation_resource->extension_public->
286     execute(MSG_process_get_host(process)->simdata->host,
287             simdata->computation_amount);
288   surf_workstation_resource->common_public->action_set_data(simdata->compute,task);
289 }
290
291 MSG_error_t __MSG_wait_for_computation(m_process_t process, m_task_t task)
292 {
293   e_surf_action_state_t state = SURF_ACTION_NOT_IN_THE_SYSTEM;
294   simdata_task_t simdata = task->simdata;
295
296   simdata->using++;
297   do {
298     __MSG_task_wait_event(process, task);
299     state=surf_workstation_resource->common_public->action_get_state(simdata->compute);
300   } while (state==SURF_ACTION_RUNNING);
301   simdata->using--;
302     
303
304   if(state == SURF_ACTION_DONE) MSG_RETURN(MSG_OK);
305   else if(surf_workstation_resource->extension_public->
306           get_state(MSG_process_get_host(process)->simdata->host) 
307           == SURF_CPU_OFF)
308     MSG_RETURN(MSG_HOST_FAILURE);
309   else MSG_RETURN(MSG_TRANSFER_FAILURE);
310 }
311
312 /** \ingroup msg_gos_functions
313  * \brief Sleep for the specified number of seconds
314  *
315  * Makes the current process sleep until \a time seconds have elapsed.
316  *
317  * \param nb_sec a number of second
318  */
319 MSG_error_t MSG_process_sleep(double nb_sec)
320 {
321   e_surf_action_state_t state = SURF_ACTION_NOT_IN_THE_SYSTEM;
322   m_process_t process = MSG_process_self();
323   m_task_t dummy = NULL;
324   simdata_task_t simdata = NULL;
325
326   CHECK_HOST();
327   dummy = MSG_task_create("MSG_sleep", nb_sec, 0.0, NULL);
328   simdata = dummy->simdata;
329
330   simdata->compute = surf_workstation_resource->extension_public->
331     sleep(MSG_process_get_host(process)->simdata->host,
332             simdata->computation_amount);
333   surf_workstation_resource->common_public->action_set_data(simdata->compute,dummy);
334
335   
336   simdata->using++;
337   do {
338     __MSG_task_wait_event(process, dummy);
339     state=surf_workstation_resource->common_public->action_get_state(simdata->compute);
340   } while (state==SURF_ACTION_RUNNING);
341   simdata->using--;
342     
343   if(state == SURF_ACTION_DONE) {
344     if(surf_workstation_resource->extension_public->
345        get_state(MSG_process_get_host(process)->simdata->host) 
346        == SURF_CPU_OFF)
347       MSG_RETURN(MSG_HOST_FAILURE);
348
349     if(__MSG_process_isBlocked(process)) {
350       __MSG_process_unblock(MSG_process_self());
351     }
352     if(surf_workstation_resource->extension_public->
353        get_state(MSG_process_get_host(process)->simdata->host) 
354        == SURF_CPU_OFF)
355       MSG_RETURN(MSG_HOST_FAILURE);
356     MSG_task_destroy(dummy);
357     MSG_RETURN(MSG_OK);
358   } else MSG_RETURN(MSG_HOST_FAILURE);
359 }
360
361 /** \ingroup msg_gos_functions
362  * \brief Return the number of MSG tasks currently running on a
363  * the host of the current running process.
364  */
365 int MSG_get_msgload(void) 
366 {
367   CHECK_HOST();
368   xbt_assert0(0,"Not implemented yet!");
369   
370   return 1;
371 }
372
373 /** \ingroup msg_gos_functions
374  *
375  * \brief Return the the last value returned by a MSG function (except
376  * MSG_get_errno...).
377  */
378 MSG_error_t MSG_get_errno(void)
379 {
380   return PROCESS_GET_ERRNO();
381 }