Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
ease the reduction of the number of tests, do reduce the number of tests so that...
[simgrid.git] / src / msg / gos.c
1 /*      $Id$     */
2
3 /* Copyright (c) 2002,2003,2004 Arnaud Legrand. All rights reserved.        */
4
5 /* This program is free software; you can redistribute it and/or modify it
6  * under the terms of the license (GNU LGPL) which comes with this package. */
7
8 #include"private.h"
9 #include"xbt/sysdep.h"
10 #include "xbt/error.h"
11 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(gos, msg,
12                                 "Logging specific to MSG (gos)");
13
14 /** \ingroup msg_gos_functions
15  * \brief This function is now deprecated and useless. Please stop using it.
16  */
17 MSG_error_t MSG_process_start(m_process_t process)
18 {
19   xbt_assert0(0,"This function is now deprecated and useless. Please stop using it.");
20   
21   return MSG_OK;
22 }
23
24 /** \ingroup msg_gos_functions
25  * \brief Listen on a channel and wait for receiving a task.
26  *
27  * It takes two parameter.
28  * \param task a memory location for storing a #m_task_t. It will
29    hold a task when this function will return. Thus \a task should not
30    be equal to \c NULL and \a *task should be equal to \c NULL. If one of
31    those two condition does not hold, there will be a warning message.
32  * \param channel the channel on which the agent should be
33    listening. This value has to be >=0 and < than the maximal
34    number of channels fixed with MSG_set_channel_number().
35  * \return #MSG_FATAL if \a task is equal to \c NULL, #MSG_WARNING
36  * if \a *task is not equal to \c NULL, and #MSG_OK otherwise.
37  */
38 MSG_error_t MSG_task_get(m_task_t * task,
39                          m_channel_t channel)
40 {
41   m_process_t process = MSG_process_self();
42   m_task_t t = NULL;
43   m_host_t h = NULL;
44   simdata_task_t t_simdata = NULL;
45   simdata_host_t h_simdata = NULL;
46   int warning = 0;
47   e_surf_action_state_t state = SURF_ACTION_NOT_IN_THE_SYSTEM;
48   
49   CHECK_HOST();
50   /* Sanity check */
51   xbt_assert0(task,"Null pointer for the task\n");
52
53   if (*task) 
54     CRITICAL0("MSG_task_get() was asked to write in a non empty task struct.");
55
56   /* Get the task */
57   h = MSG_host_self();
58   h_simdata = h->simdata;
59   while ((t = xbt_fifo_pop(h_simdata->mbox[channel])) == NULL) {
60     xbt_assert2(!(h_simdata->sleeping[channel]),
61                 "A process (%s(%d)) is already blocked on this channel",
62                 h_simdata->sleeping[channel]->name,
63                 h_simdata->sleeping[channel]->simdata->PID);
64     h_simdata->sleeping[channel] = process; /* I'm waiting. Wake me up when you're ready */
65     __MSG_process_block();
66     if(surf_workstation_resource->extension_public->get_state(h_simdata->host) 
67        == SURF_CPU_OFF)
68       MSG_RETURN(MSG_HOST_FAILURE);
69     h_simdata->sleeping[channel] = NULL;
70     /* OK, we should both be ready now. Are you there ? */
71   }
72
73   t_simdata = t->simdata;
74   /*   *task = __MSG_task_copy(t); */
75   *task=t;
76
77   /* Transfer */
78   t_simdata->using++;
79
80   t_simdata->comm = surf_workstation_resource->extension_public->
81     communicate(MSG_process_get_host(t_simdata->sender)->simdata->host,
82                 h->simdata->host, t_simdata->message_size,t_simdata->rate);
83   
84   surf_workstation_resource->common_public->action_set_data(t_simdata->comm,t);
85
86   if(__MSG_process_isBlocked(t_simdata->sender)) 
87     __MSG_process_unblock(t_simdata->sender);
88
89   PAJE_PROCESS_STATE(process,"C");  
90
91   do {
92     __MSG_task_wait_event(process, t);
93     state=surf_workstation_resource->common_public->action_get_state(t_simdata->comm);
94   } while (state==SURF_ACTION_RUNNING);
95
96   if(t->simdata->using>1) {
97     xbt_fifo_unshift(msg_global->process_to_run,process);
98     xbt_context_yield();
99   }
100
101   PAJE_COMM_STOP(process,t,channel);
102
103   if(state == SURF_ACTION_DONE) MSG_RETURN(MSG_OK);
104   else if(surf_workstation_resource->extension_public->get_state(h_simdata->host) 
105           == SURF_CPU_OFF)
106     MSG_RETURN(MSG_HOST_FAILURE);
107   else MSG_RETURN(MSG_TRANSFER_FAILURE);
108 }
109
110 /** \ingroup msg_gos_functions
111  * \brief Test whether there is a pending communication on a channel.
112  *
113  * It takes one parameter.
114  * \param channel the channel on which the agent should be
115    listening. This value has to be >=0 and < than the maximal
116    number of channels fixed with MSG_set_channel_number().
117  * \return 1 if there is a pending communication and 0 otherwise
118  */
119 int MSG_task_Iprobe(m_channel_t channel)
120 {
121   m_host_t h = NULL;
122   simdata_host_t h_simdata = NULL;
123
124   CHECK_HOST();
125   h = MSG_host_self();
126   h_simdata = h->simdata;
127   return(xbt_fifo_getFirstItem(h_simdata->mbox[channel])!=NULL);
128 }
129
130 /** \ingroup msg_gos_functions
131  * \brief Test whether there is a pending communication on a channel, and who sent it.
132  *
133  * It takes one parameter.
134  * \param channel the channel on which the agent should be
135    listening. This value has to be >=0 and < than the maximal
136    number of channels fixed with MSG_set_channel_number().
137  * \return -1 if there is no pending communication and the PID of the process who sent it otherwise
138  */
139 int MSG_task_probe_from(m_channel_t channel)
140 {
141   m_host_t h = NULL;
142   simdata_host_t h_simdata = NULL;
143   xbt_fifo_item_t item;
144   m_task_t t;
145
146   CHECK_HOST();
147   h = MSG_host_self();
148   h_simdata = h->simdata;
149    
150   item = xbt_fifo_getFirstItem(((simdata_host_t)h->simdata)->mbox[channel]);
151   if (!item || !(t = xbt_fifo_get_item_content(item)) || (simdata_task_t)t->simdata)
152     return -1;
153    
154   return MSG_process_get_PID(((simdata_task_t)t->simdata)->sender);
155 }
156
157 /** \ingroup msg_gos_functions
158  * \brief Put a task on a channel of an host and waits for the end of the
159  * transmission.
160  *
161  * This function is used for describing the behavior of an agent. It
162  * takes three parameter.
163  * \param task a #m_task_t to send on another location. This task
164    will not be usable anymore when the function will return. There is
165    no automatic task duplication and you have to save your parameters
166    before calling this function. Tasks are unique and once it has been
167    sent to another location, you should not access it anymore. You do
168    not need to call MSG_task_destroy() but to avoid using, as an
169    effect of inattention, this task anymore, you definitely should
170    renitialize it with #MSG_TASK_UNINITIALIZED. Note that this task
171    can be transfered iff it has been correctly created with
172    MSG_task_create().
173  * \param dest the destination of the message
174  * \param channel the channel on which the agent should put this
175    task. This value has to be >=0 and < than the maximal number of
176    channels fixed with MSG_set_channel_number().
177  * \return #MSG_FATAL if \a task is not properly initialized and
178  * #MSG_OK otherwise.
179  */
180 MSG_error_t MSG_task_put(m_task_t task,
181                          m_host_t dest, m_channel_t channel)
182 {
183   m_process_t process = MSG_process_self();
184   simdata_task_t task_simdata = NULL;
185   e_surf_action_state_t state = SURF_ACTION_NOT_IN_THE_SYSTEM;
186   m_host_t local_host = NULL;
187   m_host_t remote_host = NULL;
188
189   CHECK_HOST();
190
191   task_simdata = task->simdata;
192   task_simdata->sender = process;
193   xbt_assert0(task_simdata->using==1,"Gargl!");
194   task_simdata->comm = NULL;
195   
196   local_host = ((simdata_process_t) process->simdata)->host;
197   remote_host = dest;
198
199   xbt_fifo_push(((simdata_host_t) remote_host->simdata)->
200                 mbox[channel], task);
201
202   PAJE_COMM_START(process,task,channel);
203     
204   if(remote_host->simdata->sleeping[channel]) 
205     __MSG_process_unblock(remote_host->simdata->sleeping[channel]);
206 /*   else { */
207   process->simdata->put_host = dest;
208   process->simdata->put_channel = channel;
209   while(!(task_simdata->comm)) 
210     __MSG_process_block();
211   process->simdata->put_host = NULL;
212   process->simdata->put_channel = -1;
213 /*   } */
214
215   PAJE_PROCESS_STATE(process,"C");  
216
217   state=surf_workstation_resource->common_public->action_get_state(task_simdata->comm);
218   while (state==SURF_ACTION_RUNNING) {
219     __MSG_task_wait_event(process, task);
220     state=surf_workstation_resource->common_public->action_get_state(task_simdata->comm);
221   }
222     
223   MSG_task_destroy(task);
224
225   if(state == SURF_ACTION_DONE) MSG_RETURN(MSG_OK);
226   else if(surf_workstation_resource->extension_public->get_state(local_host->simdata->host) 
227           == SURF_CPU_OFF)
228     MSG_RETURN(MSG_HOST_FAILURE);
229   else MSG_RETURN(MSG_TRANSFER_FAILURE);
230 }
231
232 /** \ingroup msg_gos_functions
233  * \brief Does exactly the same as MSG_task_put but with a bounded transmition 
234  * rate.
235  *
236  * \sa MSG_task_put
237  */
238 MSG_error_t MSG_task_put_bounded(m_task_t task,
239                                  m_host_t dest, m_channel_t channel,
240                                  long double max_rate)
241 {
242   task->simdata->rate=max_rate;
243   return(MSG_task_put(task, dest, channel));
244   task->simdata->rate=-1.0;
245 }
246
247 /** \ingroup msg_gos_functions
248  * \brief Executes a task and waits for its termination.
249  *
250  * This function is used for describing the behavior of an agent. It
251  * takes only one parameter.
252  * \param task a #m_task_t to execute on the location on which the
253    agent is running.
254  * \return #MSG_FATAL if \a task is not properly initialized and
255  * #MSG_OK otherwise.
256  */
257 MSG_error_t MSG_task_execute(m_task_t task)
258 {
259   m_process_t process = MSG_process_self();
260
261   __MSG_task_execute(process, task);
262   PAJE_PROCESS_STATE(process,"E");  
263   return __MSG_wait_for_computation(process,task);
264 }
265
266 void __MSG_task_execute(m_process_t process, m_task_t task)
267 {
268   simdata_task_t simdata = NULL;
269
270   CHECK_HOST();
271
272   simdata = task->simdata;
273
274   simdata->compute = surf_workstation_resource->extension_public->
275     execute(MSG_process_get_host(process)->simdata->host,
276             simdata->computation_amount);
277   surf_workstation_resource->common_public->action_set_data(simdata->compute,task);
278 }
279
280 MSG_error_t __MSG_wait_for_computation(m_process_t process, m_task_t task)
281 {
282   e_surf_action_state_t state = SURF_ACTION_NOT_IN_THE_SYSTEM;
283   simdata_task_t simdata = task->simdata;
284
285   simdata->using++;
286   do {
287     __MSG_task_wait_event(process, task);
288     state=surf_workstation_resource->common_public->action_get_state(simdata->compute);
289   } while (state==SURF_ACTION_RUNNING);
290   simdata->using--;
291     
292
293   if(state == SURF_ACTION_DONE) MSG_RETURN(MSG_OK);
294   else if(surf_workstation_resource->extension_public->
295           get_state(MSG_process_get_host(process)->simdata->host) 
296           == SURF_CPU_OFF)
297     MSG_RETURN(MSG_HOST_FAILURE);
298   else MSG_RETURN(MSG_TRANSFER_FAILURE);
299 }
300
301 /** \ingroup msg_gos_functions
302  * \brief Sleep for the specified number of seconds
303  *
304  * Makes the current process sleep until \a time seconds have elapsed.
305  *
306  * \param nb_sec a number of second
307  */
308 MSG_error_t MSG_process_sleep(long double nb_sec)
309 {
310   e_surf_action_state_t state = SURF_ACTION_NOT_IN_THE_SYSTEM;
311   m_process_t process = MSG_process_self();
312   m_task_t dummy = NULL;
313   simdata_task_t simdata = NULL;
314
315   CHECK_HOST();
316   dummy = MSG_task_create("MSG_sleep", nb_sec, 0.0, NULL);
317   simdata = dummy->simdata;
318
319   simdata->compute = surf_workstation_resource->extension_public->
320     sleep(MSG_process_get_host(process)->simdata->host,
321             simdata->computation_amount);
322   surf_workstation_resource->common_public->action_set_data(simdata->compute,dummy);
323
324   
325   simdata->using++;
326   do {
327     __MSG_task_wait_event(process, dummy);
328     state=surf_workstation_resource->common_public->action_get_state(simdata->compute);
329   } while (state==SURF_ACTION_RUNNING);
330   simdata->using--;
331     
332   if(state == SURF_ACTION_DONE) {
333     if(surf_workstation_resource->extension_public->
334        get_state(MSG_process_get_host(process)->simdata->host) 
335        == SURF_CPU_OFF)
336       MSG_RETURN(MSG_HOST_FAILURE);
337
338     if(__MSG_process_isBlocked(process)) {
339       __MSG_process_unblock(MSG_process_self());
340     }
341     if(surf_workstation_resource->extension_public->
342        get_state(MSG_process_get_host(process)->simdata->host) 
343        == SURF_CPU_OFF)
344       MSG_RETURN(MSG_HOST_FAILURE);
345     MSG_task_destroy(dummy);
346     MSG_RETURN(MSG_OK);
347   } else MSG_RETURN(MSG_HOST_FAILURE);
348 }
349
350 /** \ingroup msg_gos_functions
351  * \brief Return the number of MSG tasks currently running on a
352  * the host of the current running process.
353  */
354 int MSG_get_msgload(void) 
355 {
356   CHECK_HOST();
357   xbt_assert0(0,"Not implemented yet!");
358   
359   return 1;
360 }
361
362 /** \ingroup msg_gos_functions
363  *
364  * \brief Return the the last value returned by a MSG function (except
365  * MSG_get_errno...).
366  */
367 MSG_error_t MSG_get_errno(void)
368 {
369   return PROCESS_GET_ERRNO();
370 }