Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
61be3c9a4e45cc4f25f750f54226137dec1df73e
[simgrid.git] / src / msg / gos.c
1 /*      $Id$     */
2
3 /* Copyright (c) 2002,2003,2004 Arnaud Legrand. All rights reserved.        */
4
5 /* This program is free software; you can redistribute it and/or modify it
6  * under the terms of the license (GNU LGPL) which comes with this package. */
7
8 #include"private.h"
9 #include"xbt/sysdep.h"
10 #include "xbt/error.h"
11 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(gos, msg,
12                                 "Logging specific to MSG (gos)");
13
14 /** \defgroup msg_gos_functions MSG Operating System Functions
15  *  \brief This section describes the functions that can be used
16  *  by an agent for handling some task.
17  */
18
19 /** \ingroup msg_gos_functions
20  * \brief This function is now deprecated and useless. Please stop using it.
21  */
22 MSG_error_t MSG_process_start(m_process_t process)
23 {
24   xbt_assert0(0,"This function is now deprecated and useless. Please stop using it.");
25   
26   return MSG_OK;
27 }
28
29 /** \ingroup msg_gos_functions
30  * \brief Listen on a channel and wait for receiving a task.
31  *
32  * It takes two parameter.
33  * \param task a memory location for storing a #m_task_t. It will
34    hold a task when this function will return. Thus \a task should not
35    be equal to \c NULL and \a *task should be equal to \c NULL. If one of
36    those two condition does not hold, there will be a warning message.
37  * \param channel the channel on which the agent should be
38    listening. This value has to be >=0 and < than the maximal
39    number of channels fixed with MSG_set_channel_number().
40  * \return #MSG_FATAL if \a task is equal to \c NULL, #MSG_WARNING
41  * if \a *task is not equal to \c NULL, and #MSG_OK otherwise.
42  */
43 MSG_error_t MSG_task_get(m_task_t * task,
44                          m_channel_t channel)
45 {
46   m_process_t process = MSG_process_self();
47   m_task_t t = NULL;
48   m_host_t h = NULL;
49   simdata_task_t t_simdata = NULL;
50   simdata_host_t h_simdata = NULL;
51   int warning = 0;
52   e_surf_action_state_t state = SURF_ACTION_NOT_IN_THE_SYSTEM;
53   
54   CHECK_HOST();
55   /* Sanity check */
56   xbt_assert0(task,"Null pointer for the task\n");
57
58   if (*task) 
59     CRITICAL0("MSG_task_get() was asked to write in a non empty task struct.");
60
61   /* Get the task */
62   h = MSG_host_self();
63   h_simdata = h->simdata;
64
65   DEBUG2("Waiting for a task on channel %d (%s)", channel,h->name);
66
67   while ((t = xbt_fifo_shift(h_simdata->mbox[channel])) == NULL) {
68     xbt_assert2(!(h_simdata->sleeping[channel]),
69                 "A process (%s(%d)) is already blocked on this channel",
70                 h_simdata->sleeping[channel]->name,
71                 h_simdata->sleeping[channel]->simdata->PID);
72     h_simdata->sleeping[channel] = process; /* I'm waiting. Wake me up when you're ready */
73     __MSG_process_block();
74     if(surf_workstation_resource->extension_public->get_state(h_simdata->host) 
75        == SURF_CPU_OFF)
76       MSG_RETURN(MSG_HOST_FAILURE);
77     h_simdata->sleeping[channel] = NULL;
78     /* OK, we should both be ready now. Are you there ? */
79   }
80
81   t_simdata = t->simdata;
82   /*   *task = __MSG_task_copy(t); */
83   *task=t;
84
85   /* Transfer */
86   t_simdata->using++;
87
88   t_simdata->comm = surf_workstation_resource->extension_public->
89     communicate(MSG_process_get_host(t_simdata->sender)->simdata->host,
90                 h->simdata->host, t_simdata->message_size,t_simdata->rate);
91   
92   surf_workstation_resource->common_public->action_set_data(t_simdata->comm,t);
93
94   if(__MSG_process_isBlocked(t_simdata->sender)) 
95     __MSG_process_unblock(t_simdata->sender);
96
97   PAJE_PROCESS_PUSH_STATE(process,"C");  
98
99   do {
100     __MSG_task_wait_event(process, t);
101     state=surf_workstation_resource->common_public->action_get_state(t_simdata->comm);
102   } while (state==SURF_ACTION_RUNNING);
103
104   if(t->simdata->using>1) {
105     xbt_fifo_unshift(msg_global->process_to_run,process);
106     xbt_context_yield();
107   }
108
109   PAJE_PROCESS_POP_STATE(process);
110   PAJE_COMM_STOP(process,t,channel);
111
112   if(state == SURF_ACTION_DONE) {
113     if(surf_workstation_resource->common_public->action_free(t_simdata->comm)) 
114       t_simdata->comm = NULL;
115     MSG_RETURN(MSG_OK);
116   } else if(surf_workstation_resource->extension_public->get_state(h_simdata->host) 
117           == SURF_CPU_OFF) {
118     if(surf_workstation_resource->common_public->action_free(t_simdata->comm)) 
119       t_simdata->comm = NULL;
120     MSG_RETURN(MSG_HOST_FAILURE);
121   } else {
122     if(surf_workstation_resource->common_public->action_free(t_simdata->comm)) 
123       t_simdata->comm = NULL;
124     MSG_RETURN(MSG_TRANSFER_FAILURE);
125   }
126 }
127
128 /** \ingroup msg_gos_functions
129  * \brief Test whether there is a pending communication on a channel.
130  *
131  * It takes one parameter.
132  * \param channel the channel on which the agent should be
133    listening. This value has to be >=0 and < than the maximal
134    number of channels fixed with MSG_set_channel_number().
135  * \return 1 if there is a pending communication and 0 otherwise
136  */
137 int MSG_task_Iprobe(m_channel_t channel)
138 {
139   m_host_t h = NULL;
140   simdata_host_t h_simdata = NULL;
141
142   DEBUG2("Probing on channel %d (%s)", channel,h->name);
143   CHECK_HOST();
144   h = MSG_host_self();
145   h_simdata = h->simdata;
146   return(xbt_fifo_getFirstItem(h_simdata->mbox[channel])!=NULL);
147 }
148
149 /** \ingroup msg_gos_functions
150  * \brief Test whether there is a pending communication on a channel, and who sent it.
151  *
152  * It takes one parameter.
153  * \param channel the channel on which the agent should be
154    listening. This value has to be >=0 and < than the maximal
155    number of channels fixed with MSG_set_channel_number().
156  * \return -1 if there is no pending communication and the PID of the process who sent it otherwise
157  */
158 int MSG_task_probe_from(m_channel_t channel)
159 {
160   m_host_t h = NULL;
161   simdata_host_t h_simdata = NULL;
162   xbt_fifo_item_t item;
163   m_task_t t;
164
165   CHECK_HOST();
166   h = MSG_host_self();
167   h_simdata = h->simdata;
168
169   DEBUG2("Probing on channel %d (%s)", channel,h->name);
170    
171   item = xbt_fifo_getFirstItem(h->simdata->mbox[channel]);
172   if (!item || !(t = xbt_fifo_get_item_content(item)))
173     return -1;
174    
175   return MSG_process_get_PID(t->simdata->sender);
176 }
177
178 /** \ingroup msg_gos_functions
179  * \brief Put a task on a channel of an host and waits for the end of the
180  * transmission.
181  *
182  * This function is used for describing the behavior of an agent. It
183  * takes three parameter.
184  * \param task a #m_task_t to send on another location. This task
185    will not be usable anymore when the function will return. There is
186    no automatic task duplication and you have to save your parameters
187    before calling this function. Tasks are unique and once it has been
188    sent to another location, you should not access it anymore. You do
189    not need to call MSG_task_destroy() but to avoid using, as an
190    effect of inattention, this task anymore, you definitely should
191    renitialize it with #MSG_TASK_UNINITIALIZED. Note that this task
192    can be transfered iff it has been correctly created with
193    MSG_task_create().
194  * \param dest the destination of the message
195  * \param channel the channel on which the agent should put this
196    task. This value has to be >=0 and < than the maximal number of
197    channels fixed with MSG_set_channel_number().
198  * \return #MSG_FATAL if \a task is not properly initialized and
199  * #MSG_OK otherwise.
200  */
201 MSG_error_t MSG_task_put(m_task_t task,
202                          m_host_t dest, m_channel_t channel)
203 {
204   m_process_t process = MSG_process_self();
205   simdata_task_t task_simdata = NULL;
206   e_surf_action_state_t state = SURF_ACTION_NOT_IN_THE_SYSTEM;
207   m_host_t local_host = NULL;
208   m_host_t remote_host = NULL;
209
210   CHECK_HOST();
211
212   task_simdata = task->simdata;
213   task_simdata->sender = process;
214   xbt_assert0(task_simdata->using==1,"Gargl!");
215   task_simdata->comm = NULL;
216   
217   local_host = ((simdata_process_t) process->simdata)->host;
218   remote_host = dest;
219
220   DEBUG4("Trying to send a task (%lg Mb) from %s to %s on channel %d", 
221          task->simdata->message_size,local_host->name, remote_host->name, channel);
222
223   xbt_fifo_push(((simdata_host_t) remote_host->simdata)->
224                 mbox[channel], task);
225
226   PAJE_COMM_START(process,task,channel);
227     
228   if(remote_host->simdata->sleeping[channel]) 
229     __MSG_process_unblock(remote_host->simdata->sleeping[channel]);
230
231   process->simdata->put_host = dest;
232   process->simdata->put_channel = channel;
233   while(!(task_simdata->comm)) 
234     __MSG_process_block();
235   surf_workstation_resource->common_public->action_use(task_simdata->comm);
236   process->simdata->put_host = NULL;
237   process->simdata->put_channel = -1;
238
239
240   PAJE_PROCESS_PUSH_STATE(process,"C");  
241
242   state=surf_workstation_resource->common_public->action_get_state(task_simdata->comm);
243   while (state==SURF_ACTION_RUNNING) {
244     __MSG_task_wait_event(process, task);
245     state=surf_workstation_resource->common_public->action_get_state(task_simdata->comm);
246   }
247     
248   MSG_task_destroy(task);
249
250   PAJE_PROCESS_POP_STATE(process);  
251
252   if(state == SURF_ACTION_DONE) {
253     if(surf_workstation_resource->common_public->action_free(task_simdata->comm)) 
254       task_simdata->comm = NULL;
255     MSG_RETURN(MSG_OK);
256   } else if(surf_workstation_resource->extension_public->get_state(local_host->simdata->host) 
257             == SURF_CPU_OFF) {
258     if(surf_workstation_resource->common_public->action_free(task_simdata->comm)) 
259       task_simdata->comm = NULL;
260     MSG_RETURN(MSG_HOST_FAILURE);
261   } else { 
262     if(surf_workstation_resource->common_public->action_free(task_simdata->comm)) 
263       task_simdata->comm = NULL;
264     MSG_RETURN(MSG_TRANSFER_FAILURE);
265   }
266 }
267
268 /** \ingroup msg_gos_functions
269  * \brief Does exactly the same as MSG_task_put but with a bounded transmition 
270  * rate.
271  *
272  * \sa MSG_task_put
273  */
274 MSG_error_t MSG_task_put_bounded(m_task_t task,
275                                  m_host_t dest, m_channel_t channel,
276                                  double max_rate)
277 {
278   task->simdata->rate=max_rate;
279   return(MSG_task_put(task, dest, channel));
280   task->simdata->rate=-1.0;
281 }
282
283 /** \ingroup msg_gos_functions
284  * \brief Executes a task and waits for its termination.
285  *
286  * This function is used for describing the behavior of an agent. It
287  * takes only one parameter.
288  * \param task a #m_task_t to execute on the location on which the
289    agent is running.
290  * \return #MSG_FATAL if \a task is not properly initialized and
291  * #MSG_OK otherwise.
292  */
293 MSG_error_t MSG_task_execute(m_task_t task)
294 {
295   m_process_t process = MSG_process_self();
296   MSG_error_t res;
297
298   DEBUG1("Computing on %s", process->simdata->host->name);
299
300   __MSG_task_execute(process, task);
301
302   PAJE_PROCESS_PUSH_STATE(process,"E");  
303   res = __MSG_wait_for_computation(process,task);
304   PAJE_PROCESS_POP_STATE(process);
305   return res;
306 }
307
308 void __MSG_task_execute(m_process_t process, m_task_t task)
309 {
310   simdata_task_t simdata = NULL;
311
312   CHECK_HOST();
313
314   simdata = task->simdata;
315
316   simdata->compute = surf_workstation_resource->extension_public->
317     execute(MSG_process_get_host(process)->simdata->host,
318             simdata->computation_amount);
319   surf_workstation_resource->common_public->action_set_data(simdata->compute,task);
320 }
321
322 MSG_error_t __MSG_wait_for_computation(m_process_t process, m_task_t task)
323 {
324   e_surf_action_state_t state = SURF_ACTION_NOT_IN_THE_SYSTEM;
325   simdata_task_t simdata = task->simdata;
326
327   simdata->using++;
328   do {
329     __MSG_task_wait_event(process, task);
330     state=surf_workstation_resource->common_public->action_get_state(simdata->compute);
331   } while (state==SURF_ACTION_RUNNING);
332   simdata->using--;
333     
334
335   if(state == SURF_ACTION_DONE) {
336     if(surf_workstation_resource->common_public->action_free(simdata->compute)) 
337       simdata->compute = NULL;
338     MSG_RETURN(MSG_OK);
339   } else if(surf_workstation_resource->extension_public->
340             get_state(MSG_process_get_host(process)->simdata->host) 
341             == SURF_CPU_OFF) {
342     if(surf_workstation_resource->common_public->action_free(simdata->compute)) 
343       simdata->compute = NULL;
344     MSG_RETURN(MSG_HOST_FAILURE);
345   } else {
346     if(surf_workstation_resource->common_public->action_free(simdata->compute)) 
347       simdata->compute = NULL;
348     MSG_RETURN(MSG_TRANSFER_FAILURE);
349   }
350 }
351
352 /** \ingroup msg_gos_functions
353  * \brief Sleep for the specified number of seconds
354  *
355  * Makes the current process sleep until \a time seconds have elapsed.
356  *
357  * \param nb_sec a number of second
358  */
359 MSG_error_t MSG_process_sleep(double nb_sec)
360 {
361   e_surf_action_state_t state = SURF_ACTION_NOT_IN_THE_SYSTEM;
362   m_process_t process = MSG_process_self();
363   m_task_t dummy = NULL;
364   simdata_task_t simdata = NULL;
365
366   CHECK_HOST();
367   dummy = MSG_task_create("MSG_sleep", nb_sec, 0.0, NULL);
368   simdata = dummy->simdata;
369
370   simdata->compute = surf_workstation_resource->extension_public->
371     sleep(MSG_process_get_host(process)->simdata->host,
372             simdata->computation_amount);
373   surf_workstation_resource->common_public->action_set_data(simdata->compute,dummy);
374
375   
376   simdata->using++;
377   do {
378     __MSG_task_wait_event(process, dummy);
379     state=surf_workstation_resource->common_public->action_get_state(simdata->compute);
380   } while (state==SURF_ACTION_RUNNING);
381   simdata->using--;
382     
383   if(state == SURF_ACTION_DONE) {
384     if(surf_workstation_resource->extension_public->
385        get_state(MSG_process_get_host(process)->simdata->host) 
386        == SURF_CPU_OFF) {
387       if(surf_workstation_resource->common_public->action_free(simdata->compute)) 
388         simdata->compute = NULL;
389       MSG_RETURN(MSG_HOST_FAILURE);
390     }
391     if(__MSG_process_isBlocked(process)) {
392       __MSG_process_unblock(MSG_process_self());
393     }
394     if(surf_workstation_resource->extension_public->
395        get_state(MSG_process_get_host(process)->simdata->host) 
396        == SURF_CPU_OFF) {
397       if(surf_workstation_resource->common_public->action_free(simdata->compute)) 
398         simdata->compute = NULL;
399       MSG_RETURN(MSG_HOST_FAILURE);
400     }
401     if(surf_workstation_resource->common_public->action_free(simdata->compute)) 
402       simdata->compute = NULL;
403     MSG_task_destroy(dummy);
404     MSG_RETURN(MSG_OK);
405   } else MSG_RETURN(MSG_HOST_FAILURE);
406 }
407
408 /** \ingroup msg_gos_functions
409  * \brief Return the number of MSG tasks currently running on a
410  * the host of the current running process.
411  */
412 int MSG_get_msgload(void) 
413 {
414   m_process_t process;
415    
416   CHECK_HOST();
417   
418   process = MSG_process_self();
419   return xbt_fifo_size(process->simdata->host->simdata->process_list);
420 }
421
422 /** \ingroup msg_gos_functions
423  *
424  * \brief Return the the last value returned by a MSG function (except
425  * MSG_get_errno...).
426  */
427 MSG_error_t MSG_get_errno(void)
428 {
429   return PROCESS_GET_ERRNO();
430 }