Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
Surf is not broken anymore. Those tests should be run.
[simgrid.git] / src / msg / gos.c
1 /*      $Id$     */
2
3 /* Copyright (c) 2002,2003,2004 Arnaud Legrand. All rights reserved.        */
4
5 /* This program is free software; you can redistribute it and/or modify it
6  * under the terms of the license (GNU LGPL) which comes with this package. */
7
8 #include"private.h"
9 #include"xbt/sysdep.h"
10 #include "xbt/error.h"
11 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(gos, msg,
12                                 "Logging specific to MSG (gos)");
13
14 /** \ingroup msg_gos_functions
15  * \brief This function is now deprecated and useless. Please stop using it.
16  */
17 MSG_error_t MSG_process_start(m_process_t process)
18 {
19   xbt_assert0(0,"This function is now deprecated and useless. Please stop using it.");
20   
21   return MSG_OK;
22 }
23
24 /** \ingroup msg_gos_functions
25  * \brief Listen on a channel and wait for receiving a task.
26  *
27  * It takes two parameter.
28  * \param task a memory location for storing a #m_task_t. It will
29    hold a task when this function will return. Thus \a task should not
30    be equal to \c NULL and \a *task should be equal to \c NULL. If one of
31    those two condition does not hold, there will be a warning message.
32  * \param channel the channel on which the agent should be
33    listening. This value has to be >=0 and < than the maximal
34    number of channels fixed with MSG_set_channel_number().
35  * \return #MSG_FATAL if \a task is equal to \c NULL, #MSG_WARNING
36  * if \a *task is not equal to \c NULL, and #MSG_OK otherwise.
37  */
38 MSG_error_t MSG_task_get(m_task_t * task,
39                          m_channel_t channel)
40 {
41   m_process_t process = MSG_process_self();
42   m_task_t t = NULL;
43   m_host_t h = NULL;
44   simdata_task_t t_simdata = NULL;
45   simdata_host_t h_simdata = NULL;
46   int warning = 0;
47   e_surf_action_state_t state = SURF_ACTION_NOT_IN_THE_SYSTEM;
48   
49   CHECK_HOST();
50   /* Sanity check */
51   xbt_assert0(task,"Null pointer for the task\n");
52
53   if (*task) 
54     CRITICAL0("MSG_task_get() was asked to write in a non empty task struct.");
55
56   /* Get the task */
57   h = MSG_host_self();
58   h_simdata = h->simdata;
59   while ((t = xbt_fifo_pop(h_simdata->mbox[channel])) == NULL) {
60     xbt_assert2(!(h_simdata->sleeping[channel]),
61                 "A process (%s(%d)) is already blocked on this channel",
62                 h_simdata->sleeping[channel]->name,
63                 h_simdata->sleeping[channel]->simdata->PID);
64     h_simdata->sleeping[channel] = process; /* I'm waiting. Wake me up when you're ready */
65     __MSG_process_block();
66     if(surf_workstation_resource->extension_public->get_state(h_simdata->host) 
67        == SURF_CPU_OFF)
68       MSG_RETURN(MSG_HOST_FAILURE);
69     h_simdata->sleeping[channel] = NULL;
70     /* OK, we should both be ready now. Are you there ? */
71   }
72
73   t_simdata = t->simdata;
74   /*   *task = __MSG_task_copy(t); */
75   *task=t;
76
77   /* Transfer */
78   t_simdata->using++;
79
80   t_simdata->comm = surf_workstation_resource->extension_public->
81     communicate(MSG_process_get_host(t_simdata->sender)->simdata->host,
82                 h->simdata->host, t_simdata->message_size,t_simdata->rate);
83   
84   surf_workstation_resource->common_public->action_set_data(t_simdata->comm,t);
85
86   if(__MSG_process_isBlocked(t_simdata->sender)) 
87     __MSG_process_unblock(t_simdata->sender);
88
89   //  PAJE_PROCESS_STATE(process,"C");  
90   PAJE_PROCESS_PUSH_STATE(process,"C");  
91
92   do {
93     __MSG_task_wait_event(process, t);
94     state=surf_workstation_resource->common_public->action_get_state(t_simdata->comm);
95   } while (state==SURF_ACTION_RUNNING);
96
97   if(t->simdata->using>1) {
98     xbt_fifo_unshift(msg_global->process_to_run,process);
99     xbt_context_yield();
100   }
101
102   PAJE_PROCESS_POP_STATE(process);  
103   PAJE_COMM_STOP(process,t,channel);
104
105   if(state == SURF_ACTION_DONE) MSG_RETURN(MSG_OK);
106   else if(surf_workstation_resource->extension_public->get_state(h_simdata->host) 
107           == SURF_CPU_OFF)
108     MSG_RETURN(MSG_HOST_FAILURE);
109   else MSG_RETURN(MSG_TRANSFER_FAILURE);
110 }
111
112 /** \ingroup msg_gos_functions
113  * \brief Test whether there is a pending communication on a channel.
114  *
115  * It takes one parameter.
116  * \param channel the channel on which the agent should be
117    listening. This value has to be >=0 and < than the maximal
118    number of channels fixed with MSG_set_channel_number().
119  * \return 1 if there is a pending communication and 0 otherwise
120  */
121 int MSG_task_Iprobe(m_channel_t channel)
122 {
123   m_host_t h = NULL;
124   simdata_host_t h_simdata = NULL;
125
126   CHECK_HOST();
127   h = MSG_host_self();
128   h_simdata = h->simdata;
129   return(xbt_fifo_getFirstItem(h_simdata->mbox[channel])!=NULL);
130 }
131
132 /** \ingroup msg_gos_functions
133  * \brief Test whether there is a pending communication on a channel, and who sent it.
134  *
135  * It takes one parameter.
136  * \param channel the channel on which the agent should be
137    listening. This value has to be >=0 and < than the maximal
138    number of channels fixed with MSG_set_channel_number().
139  * \return -1 if there is no pending communication and the PID of the process who sent it otherwise
140  */
141 int MSG_task_probe_from(m_channel_t channel)
142 {
143   m_host_t h = NULL;
144   simdata_host_t h_simdata = NULL;
145   xbt_fifo_item_t item;
146   m_task_t t;
147
148   CHECK_HOST();
149   h = MSG_host_self();
150   h_simdata = h->simdata;
151    
152   item = xbt_fifo_getFirstItem(((simdata_host_t)h->simdata)->mbox[channel]);
153   if (!item || !(t = xbt_fifo_get_item_content(item)) || (simdata_task_t)t->simdata)
154     return -1;
155    
156   return MSG_process_get_PID(((simdata_task_t)t->simdata)->sender);
157 }
158
159 /** \ingroup msg_gos_functions
160  * \brief Put a task on a channel of an host and waits for the end of the
161  * transmission.
162  *
163  * This function is used for describing the behavior of an agent. It
164  * takes three parameter.
165  * \param task a #m_task_t to send on another location. This task
166    will not be usable anymore when the function will return. There is
167    no automatic task duplication and you have to save your parameters
168    before calling this function. Tasks are unique and once it has been
169    sent to another location, you should not access it anymore. You do
170    not need to call MSG_task_destroy() but to avoid using, as an
171    effect of inattention, this task anymore, you definitely should
172    renitialize it with #MSG_TASK_UNINITIALIZED. Note that this task
173    can be transfered iff it has been correctly created with
174    MSG_task_create().
175  * \param dest the destination of the message
176  * \param channel the channel on which the agent should put this
177    task. This value has to be >=0 and < than the maximal number of
178    channels fixed with MSG_set_channel_number().
179  * \return #MSG_FATAL if \a task is not properly initialized and
180  * #MSG_OK otherwise.
181  */
182 MSG_error_t MSG_task_put(m_task_t task,
183                          m_host_t dest, m_channel_t channel)
184 {
185   m_process_t process = MSG_process_self();
186   simdata_task_t task_simdata = NULL;
187   e_surf_action_state_t state = SURF_ACTION_NOT_IN_THE_SYSTEM;
188   m_host_t local_host = NULL;
189   m_host_t remote_host = NULL;
190
191   CHECK_HOST();
192
193   task_simdata = task->simdata;
194   task_simdata->sender = process;
195   xbt_assert0(task_simdata->using==1,"Gargl!");
196   task_simdata->comm = NULL;
197   
198   local_host = ((simdata_process_t) process->simdata)->host;
199   remote_host = dest;
200
201   xbt_fifo_push(((simdata_host_t) remote_host->simdata)->
202                 mbox[channel], task);
203
204   PAJE_COMM_START(process,task,channel);
205     
206   if(remote_host->simdata->sleeping[channel]) 
207     __MSG_process_unblock(remote_host->simdata->sleeping[channel]);
208 /*   else { */
209   process->simdata->put_host = dest;
210   process->simdata->put_channel = channel;
211   while(!(task_simdata->comm)) 
212     __MSG_process_block();
213   process->simdata->put_host = NULL;
214   process->simdata->put_channel = -1;
215 /*   } */
216
217   // PAJE_PROCESS_STATE(process,"C");  
218   PAJE_PROCESS_PUSH_STATE(process,"C");  
219
220   state=surf_workstation_resource->common_public->action_get_state(task_simdata->comm);
221   while (state==SURF_ACTION_RUNNING) {
222     __MSG_task_wait_event(process, task);
223     state=surf_workstation_resource->common_public->action_get_state(task_simdata->comm);
224   }
225     
226   MSG_task_destroy(task);
227
228   PAJE_PROCESS_POP_STATE(process);  
229
230   if(state == SURF_ACTION_DONE) MSG_RETURN(MSG_OK);
231   else if(surf_workstation_resource->extension_public->get_state(local_host->simdata->host) 
232           == SURF_CPU_OFF)
233     MSG_RETURN(MSG_HOST_FAILURE);
234   else MSG_RETURN(MSG_TRANSFER_FAILURE);
235 }
236
237 /** \ingroup msg_gos_functions
238  * \brief Does exactly the same as MSG_task_put but with a bounded transmition 
239  * rate.
240  *
241  * \sa MSG_task_put
242  */
243 MSG_error_t MSG_task_put_bounded(m_task_t task,
244                                  m_host_t dest, m_channel_t channel,
245                                  long double max_rate)
246 {
247   task->simdata->rate=max_rate;
248   return(MSG_task_put(task, dest, channel));
249   task->simdata->rate=-1.0;
250 }
251
252 /** \ingroup msg_gos_functions
253  * \brief Executes a task and waits for its termination.
254  *
255  * This function is used for describing the behavior of an agent. It
256  * takes only one parameter.
257  * \param task a #m_task_t to execute on the location on which the
258    agent is running.
259  * \return #MSG_FATAL if \a task is not properly initialized and
260  * #MSG_OK otherwise.
261  */
262 MSG_error_t MSG_task_execute(m_task_t task)
263 {
264   m_process_t process = MSG_process_self();
265   MSG_error_t res;
266   __MSG_task_execute(process, task);
267
268   //  PAJE_PROCESS_STATE(process,"E");  
269   PAJE_PROCESS_PUSH_STATE(process,"E");  
270   res = __MSG_wait_for_computation(process,task);
271   PAJE_PROCESS_POP_STATE(process);
272   return res;
273 }
274
275 void __MSG_task_execute(m_process_t process, m_task_t task)
276 {
277   simdata_task_t simdata = NULL;
278
279   CHECK_HOST();
280
281   simdata = task->simdata;
282
283   simdata->compute = surf_workstation_resource->extension_public->
284     execute(MSG_process_get_host(process)->simdata->host,
285             simdata->computation_amount);
286   surf_workstation_resource->common_public->action_set_data(simdata->compute,task);
287 }
288
289 MSG_error_t __MSG_wait_for_computation(m_process_t process, m_task_t task)
290 {
291   e_surf_action_state_t state = SURF_ACTION_NOT_IN_THE_SYSTEM;
292   simdata_task_t simdata = task->simdata;
293
294   simdata->using++;
295   do {
296     __MSG_task_wait_event(process, task);
297     state=surf_workstation_resource->common_public->action_get_state(simdata->compute);
298   } while (state==SURF_ACTION_RUNNING);
299   simdata->using--;
300     
301
302   if(state == SURF_ACTION_DONE) MSG_RETURN(MSG_OK);
303   else if(surf_workstation_resource->extension_public->
304           get_state(MSG_process_get_host(process)->simdata->host) 
305           == SURF_CPU_OFF)
306     MSG_RETURN(MSG_HOST_FAILURE);
307   else MSG_RETURN(MSG_TRANSFER_FAILURE);
308 }
309
310 /** \ingroup msg_gos_functions
311  * \brief Sleep for the specified number of seconds
312  *
313  * Makes the current process sleep until \a time seconds have elapsed.
314  *
315  * \param nb_sec a number of second
316  */
317 MSG_error_t MSG_process_sleep(long double nb_sec)
318 {
319   e_surf_action_state_t state = SURF_ACTION_NOT_IN_THE_SYSTEM;
320   m_process_t process = MSG_process_self();
321   m_task_t dummy = NULL;
322   simdata_task_t simdata = NULL;
323
324   CHECK_HOST();
325   dummy = MSG_task_create("MSG_sleep", nb_sec, 0.0, NULL);
326   simdata = dummy->simdata;
327
328   simdata->compute = surf_workstation_resource->extension_public->
329     sleep(MSG_process_get_host(process)->simdata->host,
330             simdata->computation_amount);
331   surf_workstation_resource->common_public->action_set_data(simdata->compute,dummy);
332
333   
334   simdata->using++;
335   do {
336     __MSG_task_wait_event(process, dummy);
337     state=surf_workstation_resource->common_public->action_get_state(simdata->compute);
338   } while (state==SURF_ACTION_RUNNING);
339   simdata->using--;
340     
341   if(state == SURF_ACTION_DONE) {
342     if(surf_workstation_resource->extension_public->
343        get_state(MSG_process_get_host(process)->simdata->host) 
344        == SURF_CPU_OFF)
345       MSG_RETURN(MSG_HOST_FAILURE);
346
347     if(__MSG_process_isBlocked(process)) {
348       __MSG_process_unblock(MSG_process_self());
349     }
350     if(surf_workstation_resource->extension_public->
351        get_state(MSG_process_get_host(process)->simdata->host) 
352        == SURF_CPU_OFF)
353       MSG_RETURN(MSG_HOST_FAILURE);
354     MSG_task_destroy(dummy);
355     MSG_RETURN(MSG_OK);
356   } else MSG_RETURN(MSG_HOST_FAILURE);
357 }
358
359 /** \ingroup msg_gos_functions
360  * \brief Return the number of MSG tasks currently running on a
361  * the host of the current running process.
362  */
363 int MSG_get_msgload(void) 
364 {
365   CHECK_HOST();
366   xbt_assert0(0,"Not implemented yet!");
367   
368   return 1;
369 }
370
371 /** \ingroup msg_gos_functions
372  *
373  * \brief Return the the last value returned by a MSG function (except
374  * MSG_get_errno...).
375  */
376 MSG_error_t MSG_get_errno(void)
377 {
378   return PROCESS_GET_ERRNO();
379 }