Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
2bb88bf9af306729632e961fcb4435694aa02d26
[simgrid.git] / src / msg / gos.c
1 /*      $Id$     */
2
3 /* Copyright (c) 2002,2003,2004 Arnaud Legrand. All rights reserved.        */
4
5 /* This program is free software; you can redistribute it and/or modify it
6  * under the terms of the license (GNU LGPL) which comes with this package. */
7
8 #include"private.h"
9 #include"xbt/sysdep.h"
10 #include "xbt/error.h"
11 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(gos, msg,
12                                 "Logging specific to MSG (gos)");
13
14 /** \ingroup msg_gos_functions
15  * \brief This function is now deprecated and useless. Please stop using it.
16  */
17 MSG_error_t MSG_process_start(m_process_t process)
18 {
19   xbt_assert0(0,"This function is now deprecated and useless. Please stop using it.");
20   
21   return MSG_OK;
22 }
23
24 /** \ingroup msg_gos_functions
25  * \brief Listen on a channel and wait for receiving a task.
26  *
27  * It takes two parameter.
28  * \param task a memory location for storing a #m_task_t. It will
29    hold a task when this function will return. Thus \a task should not
30    be equal to \c NULL and \a *task should be equal to \c NULL. If one of
31    those two condition does not hold, there will be a warning message.
32  * \param channel the channel on which the agent should be
33    listening. This value has to be >=0 and < than the maximal
34    number of channels fixed with MSG_set_channel_number().
35  * \return #MSG_FATAL if \a task is equal to \c NULL, #MSG_WARNING
36  * if \a *task is not equal to \c NULL, and #MSG_OK otherwise.
37  */
38 MSG_error_t MSG_task_get(m_task_t * task,
39                          m_channel_t channel)
40 {
41   m_process_t process = MSG_process_self();
42   m_task_t t = NULL;
43   m_host_t h = NULL;
44   simdata_task_t t_simdata = NULL;
45   simdata_host_t h_simdata = NULL;
46   int warning = 0;
47   e_surf_action_state_t state = SURF_ACTION_NOT_IN_THE_SYSTEM;
48   
49   CHECK_HOST();
50   /* Sanity check */
51   xbt_assert0(task,"Null pointer for the task\n");
52
53   if (*task) 
54     CRITICAL0("MSG_task_get() was asked to write in a non empty task struct.");
55
56   /* Get the task */
57   h = MSG_host_self();
58   h_simdata = h->simdata;
59   while ((t = xbt_fifo_pop(h_simdata->mbox[channel])) == NULL) {
60     xbt_assert2(!(h_simdata->sleeping[channel]),
61                 "A process (%s(%d)) is already blocked on this channel",
62                 h_simdata->sleeping[channel]->name,
63                 h_simdata->sleeping[channel]->simdata->PID);
64     h_simdata->sleeping[channel] = process; /* I'm waiting. Wake me up when you're ready */
65     __MSG_process_block();
66     if(surf_workstation_resource->extension_public->get_state(h_simdata->host) 
67        == SURF_CPU_OFF)
68       MSG_RETURN(MSG_HOST_FAILURE);
69     h_simdata->sleeping[channel] = NULL;
70     /* OK, we should both be ready now. Are you there ? */
71   }
72
73   t_simdata = t->simdata;
74   /*   *task = __MSG_task_copy(t); */
75   *task=t;
76
77   /* Transfer */
78   t_simdata->using++;
79
80   t_simdata->comm = surf_workstation_resource->extension_public->
81     communicate(MSG_process_get_host(t_simdata->sender)->simdata->host,
82                 h->simdata->host, t_simdata->message_size,t_simdata->rate);
83   
84   surf_workstation_resource->common_public->action_set_data(t_simdata->comm,t);
85
86   if(__MSG_process_isBlocked(t_simdata->sender)) 
87     __MSG_process_unblock(t_simdata->sender);
88
89   PAJE_PROCESS_PUSH_STATE(process,"C");  
90
91   do {
92     __MSG_task_wait_event(process, t);
93     state=surf_workstation_resource->common_public->action_get_state(t_simdata->comm);
94   } while (state==SURF_ACTION_RUNNING);
95
96   if(t->simdata->using>1) {
97     xbt_fifo_unshift(msg_global->process_to_run,process);
98     xbt_context_yield();
99   }
100
101   PAJE_PROCESS_POP_STATE(process);  
102   PAJE_COMM_STOP(process,t,channel);
103
104   if(state == SURF_ACTION_DONE) MSG_RETURN(MSG_OK);
105   else if(surf_workstation_resource->extension_public->get_state(h_simdata->host) 
106           == SURF_CPU_OFF)
107     MSG_RETURN(MSG_HOST_FAILURE);
108   else MSG_RETURN(MSG_TRANSFER_FAILURE);
109 }
110
111 /** \ingroup msg_gos_functions
112  * \brief Test whether there is a pending communication on a channel.
113  *
114  * It takes one parameter.
115  * \param channel the channel on which the agent should be
116    listening. This value has to be >=0 and < than the maximal
117    number of channels fixed with MSG_set_channel_number().
118  * \return 1 if there is a pending communication and 0 otherwise
119  */
120 int MSG_task_Iprobe(m_channel_t channel)
121 {
122   m_host_t h = NULL;
123   simdata_host_t h_simdata = NULL;
124
125   CHECK_HOST();
126   h = MSG_host_self();
127   h_simdata = h->simdata;
128   return(xbt_fifo_getFirstItem(h_simdata->mbox[channel])!=NULL);
129 }
130
131 /** \ingroup msg_gos_functions
132  * \brief Test whether there is a pending communication on a channel, and who sent it.
133  *
134  * It takes one parameter.
135  * \param channel the channel on which the agent should be
136    listening. This value has to be >=0 and < than the maximal
137    number of channels fixed with MSG_set_channel_number().
138  * \return -1 if there is no pending communication and the PID of the process who sent it otherwise
139  */
140 int MSG_task_probe_from(m_channel_t channel)
141 {
142   m_host_t h = NULL;
143   simdata_host_t h_simdata = NULL;
144   xbt_fifo_item_t item;
145   m_task_t t;
146
147   CHECK_HOST();
148   h = MSG_host_self();
149   h_simdata = h->simdata;
150    
151   item = xbt_fifo_getFirstItem(((simdata_host_t)h->simdata)->mbox[channel]);
152   if (!item || !(t = xbt_fifo_get_item_content(item)) || (simdata_task_t)t->simdata)
153     return -1;
154    
155   return MSG_process_get_PID(((simdata_task_t)t->simdata)->sender);
156 }
157
158 /** \ingroup msg_gos_functions
159  * \brief Put a task on a channel of an host and waits for the end of the
160  * transmission.
161  *
162  * This function is used for describing the behavior of an agent. It
163  * takes three parameter.
164  * \param task a #m_task_t to send on another location. This task
165    will not be usable anymore when the function will return. There is
166    no automatic task duplication and you have to save your parameters
167    before calling this function. Tasks are unique and once it has been
168    sent to another location, you should not access it anymore. You do
169    not need to call MSG_task_destroy() but to avoid using, as an
170    effect of inattention, this task anymore, you definitely should
171    renitialize it with #MSG_TASK_UNINITIALIZED. Note that this task
172    can be transfered iff it has been correctly created with
173    MSG_task_create().
174  * \param dest the destination of the message
175  * \param channel the channel on which the agent should put this
176    task. This value has to be >=0 and < than the maximal number of
177    channels fixed with MSG_set_channel_number().
178  * \return #MSG_FATAL if \a task is not properly initialized and
179  * #MSG_OK otherwise.
180  */
181 MSG_error_t MSG_task_put(m_task_t task,
182                          m_host_t dest, m_channel_t channel)
183 {
184   m_process_t process = MSG_process_self();
185   simdata_task_t task_simdata = NULL;
186   e_surf_action_state_t state = SURF_ACTION_NOT_IN_THE_SYSTEM;
187   m_host_t local_host = NULL;
188   m_host_t remote_host = NULL;
189
190   CHECK_HOST();
191
192   task_simdata = task->simdata;
193   task_simdata->sender = process;
194   xbt_assert0(task_simdata->using==1,"Gargl!");
195   task_simdata->comm = NULL;
196   
197   local_host = ((simdata_process_t) process->simdata)->host;
198   remote_host = dest;
199
200   xbt_fifo_push(((simdata_host_t) remote_host->simdata)->
201                 mbox[channel], task);
202
203   PAJE_COMM_START(process,task,channel);
204     
205   if(remote_host->simdata->sleeping[channel]) 
206     __MSG_process_unblock(remote_host->simdata->sleeping[channel]);
207
208   process->simdata->put_host = dest;
209   process->simdata->put_channel = channel;
210   while(!(task_simdata->comm)) 
211     __MSG_process_block();
212   process->simdata->put_host = NULL;
213   process->simdata->put_channel = -1;
214
215
216   PAJE_PROCESS_PUSH_STATE(process,"C");  
217
218   state=surf_workstation_resource->common_public->action_get_state(task_simdata->comm);
219   while (state==SURF_ACTION_RUNNING) {
220     __MSG_task_wait_event(process, task);
221     state=surf_workstation_resource->common_public->action_get_state(task_simdata->comm);
222   }
223     
224   MSG_task_destroy(task);
225
226   PAJE_PROCESS_POP_STATE(process);  
227
228   if(state == SURF_ACTION_DONE) MSG_RETURN(MSG_OK);
229   else if(surf_workstation_resource->extension_public->get_state(local_host->simdata->host) 
230           == SURF_CPU_OFF)
231     MSG_RETURN(MSG_HOST_FAILURE);
232   else MSG_RETURN(MSG_TRANSFER_FAILURE);
233 }
234
235 /** \ingroup msg_gos_functions
236  * \brief Does exactly the same as MSG_task_put but with a bounded transmition 
237  * rate.
238  *
239  * \sa MSG_task_put
240  */
241 MSG_error_t MSG_task_put_bounded(m_task_t task,
242                                  m_host_t dest, m_channel_t channel,
243                                  long double max_rate)
244 {
245   task->simdata->rate=max_rate;
246   return(MSG_task_put(task, dest, channel));
247   task->simdata->rate=-1.0;
248 }
249
250 /** \ingroup msg_gos_functions
251  * \brief Executes a task and waits for its termination.
252  *
253  * This function is used for describing the behavior of an agent. It
254  * takes only one parameter.
255  * \param task a #m_task_t to execute on the location on which the
256    agent is running.
257  * \return #MSG_FATAL if \a task is not properly initialized and
258  * #MSG_OK otherwise.
259  */
260 MSG_error_t MSG_task_execute(m_task_t task)
261 {
262   m_process_t process = MSG_process_self();
263   MSG_error_t res;
264   __MSG_task_execute(process, task);
265
266   PAJE_PROCESS_PUSH_STATE(process,"E");  
267   res = __MSG_wait_for_computation(process,task);
268   PAJE_PROCESS_POP_STATE(process);
269   return res;
270 }
271
272 void __MSG_task_execute(m_process_t process, m_task_t task)
273 {
274   simdata_task_t simdata = NULL;
275
276   CHECK_HOST();
277
278   simdata = task->simdata;
279
280   simdata->compute = surf_workstation_resource->extension_public->
281     execute(MSG_process_get_host(process)->simdata->host,
282             simdata->computation_amount);
283   surf_workstation_resource->common_public->action_set_data(simdata->compute,task);
284 }
285
286 MSG_error_t __MSG_wait_for_computation(m_process_t process, m_task_t task)
287 {
288   e_surf_action_state_t state = SURF_ACTION_NOT_IN_THE_SYSTEM;
289   simdata_task_t simdata = task->simdata;
290
291   simdata->using++;
292   do {
293     __MSG_task_wait_event(process, task);
294     state=surf_workstation_resource->common_public->action_get_state(simdata->compute);
295   } while (state==SURF_ACTION_RUNNING);
296   simdata->using--;
297     
298
299   if(state == SURF_ACTION_DONE) MSG_RETURN(MSG_OK);
300   else if(surf_workstation_resource->extension_public->
301           get_state(MSG_process_get_host(process)->simdata->host) 
302           == SURF_CPU_OFF)
303     MSG_RETURN(MSG_HOST_FAILURE);
304   else MSG_RETURN(MSG_TRANSFER_FAILURE);
305 }
306
307 /** \ingroup msg_gos_functions
308  * \brief Sleep for the specified number of seconds
309  *
310  * Makes the current process sleep until \a time seconds have elapsed.
311  *
312  * \param nb_sec a number of second
313  */
314 MSG_error_t MSG_process_sleep(long double nb_sec)
315 {
316   e_surf_action_state_t state = SURF_ACTION_NOT_IN_THE_SYSTEM;
317   m_process_t process = MSG_process_self();
318   m_task_t dummy = NULL;
319   simdata_task_t simdata = NULL;
320
321   CHECK_HOST();
322   dummy = MSG_task_create("MSG_sleep", nb_sec, 0.0, NULL);
323   simdata = dummy->simdata;
324
325   simdata->compute = surf_workstation_resource->extension_public->
326     sleep(MSG_process_get_host(process)->simdata->host,
327             simdata->computation_amount);
328   surf_workstation_resource->common_public->action_set_data(simdata->compute,dummy);
329
330   
331   simdata->using++;
332   do {
333     __MSG_task_wait_event(process, dummy);
334     state=surf_workstation_resource->common_public->action_get_state(simdata->compute);
335   } while (state==SURF_ACTION_RUNNING);
336   simdata->using--;
337     
338   if(state == SURF_ACTION_DONE) {
339     if(surf_workstation_resource->extension_public->
340        get_state(MSG_process_get_host(process)->simdata->host) 
341        == SURF_CPU_OFF)
342       MSG_RETURN(MSG_HOST_FAILURE);
343
344     if(__MSG_process_isBlocked(process)) {
345       __MSG_process_unblock(MSG_process_self());
346     }
347     if(surf_workstation_resource->extension_public->
348        get_state(MSG_process_get_host(process)->simdata->host) 
349        == SURF_CPU_OFF)
350       MSG_RETURN(MSG_HOST_FAILURE);
351     MSG_task_destroy(dummy);
352     MSG_RETURN(MSG_OK);
353   } else MSG_RETURN(MSG_HOST_FAILURE);
354 }
355
356 /** \ingroup msg_gos_functions
357  * \brief Return the number of MSG tasks currently running on a
358  * the host of the current running process.
359  */
360 int MSG_get_msgload(void) 
361 {
362   CHECK_HOST();
363   xbt_assert0(0,"Not implemented yet!");
364   
365   return 1;
366 }
367
368 /** \ingroup msg_gos_functions
369  *
370  * \brief Return the the last value returned by a MSG function (except
371  * MSG_get_errno...).
372  */
373 MSG_error_t MSG_get_errno(void)
374 {
375   return PROCESS_GET_ERRNO();
376 }