Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
Add a new function: MSG_task_get_with_timeout. That should be very convenient to...
[simgrid.git] / src / msg / m_process.c
1 /*      $Id$     */
2
3 /* Copyright (c) 2002,2003,2004 Arnaud Legrand. All rights reserved.        */
4
5 /* This program is free software; you can redistribute it and/or modify it
6  * under the terms of the license (GNU LGPL) which comes with this package. */
7
8 #include"private.h"
9 #include"xbt/sysdep.h"
10 #include "xbt/error.h"
11 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(m_process, msg,
12                                 "Logging specific to MSG (process)");
13
14 /** \defgroup m_process_management Management Functions of Agents
15  *  \brief This section describes the agent structure of MSG
16  *  (#m_process_t) and the functions for managing it.
17  *
18  *  We need to simulate many independent scheduling decisions, so
19  *  the concept of <em>process</em> is at the heart of the
20  *  simulator. A process may be defined as a <em>code</em>, with
21  *  some <em>private data</em>, executing in a <em>location</em>.
22  *  \see m_process_t
23  */
24
25 /******************************** Process ************************************/
26 /** \ingroup m_process_management
27  * \brief Creates and runs a new #m_process_t.
28  *
29  * Does exactly the same as #MSG_process_create_with_arguments but without 
30    providing standard arguments (\a argc, \a argv, \a start_time, \a kill_time).
31  * \sa MSG_process_create_with_arguments
32  */
33 m_process_t MSG_process_create(const char *name,
34                                m_process_code_t code, void *data,
35                                m_host_t host)
36 {
37   return MSG_process_create_with_arguments(name, code, data, host, -1, NULL);
38 }
39
40 static void MSG_process_cleanup(void *arg)
41 {
42
43   while(((m_process_t)arg)->simdata->paje_state) {
44     PAJE_PROCESS_POP_STATE((m_process_t)arg);
45   }
46
47   PAJE_PROCESS_FREE(arg);
48
49   xbt_fifo_remove(msg_global->process_list, arg);
50   xbt_fifo_remove(msg_global->process_to_run, arg);
51   xbt_fifo_remove(((m_process_t) arg)->simdata->host->simdata->process_list, arg);
52   free(((m_process_t) arg)->name);
53   free(((m_process_t) arg)->simdata);
54   free(arg);
55 }
56
57 /** \ingroup m_process_management
58  * \brief Creates and runs a new #m_process_t.
59
60  * A constructor for #m_process_t taking four arguments and returning the 
61  * corresponding object. The structure (and the corresponding thread) is
62  * created, and put in the list of ready process.
63  * \param name a name for the object. It is for user-level information
64    and can be NULL.
65  * \param code is a function describing the behavior of the agent. It
66    should then only use functions described in \ref
67    m_process_management (to create a new #m_process_t for example),
68    in \ref m_host_management (only the read-only functions i.e. whose
69    name contains the word get), in \ref m_task_management (to create
70    or destroy some #m_task_t for example) and in \ref
71    msg_gos_functions (to handle file transfers and task processing).
72  * \param data a pointer to any data may want to attach to the new
73    object.  It is for user-level information and can be NULL. It can
74    be retrieved with the function \ref MSG_process_get_data.
75  * \param host the location where the new agent is executed.
76  * \param argc first argument passed to \a code
77  * \param argv second argument passed to \a code
78  * \see m_process_t
79  * \return The new corresponding object.
80  */
81 m_process_t MSG_process_create_with_arguments(const char *name,
82                                               m_process_code_t code, void *data,
83                                               m_host_t host, int argc, char **argv)
84 {
85   simdata_process_t simdata = xbt_new0(s_simdata_process_t,1);
86   m_process_t process = xbt_new0(s_m_process_t,1);
87   m_process_t self = NULL;
88
89   xbt_assert0(((code != NULL) && (host != NULL)), "Invalid parameters");
90   /* Simulator Data */
91
92   simdata->PID = msg_global->PID++;
93   simdata->host = host;
94   simdata->waiting_task = NULL;
95   simdata->argc = argc;
96   simdata->argv = argv;
97   simdata->context = xbt_context_new(code, NULL, NULL, 
98                                      MSG_process_cleanup, process, 
99                                      simdata->argc, simdata->argv);
100
101   if((self=msg_global->current_process)) {
102     simdata->PPID = MSG_process_get_PID(self);
103   } else {
104     simdata->PPID = -1;
105   }
106   simdata->last_errno=MSG_OK;
107
108
109   /* Process structure */
110   process->name = xbt_strdup(name);
111   process->simdata = simdata;
112   process->data = data;
113
114   xbt_fifo_push(host->simdata->process_list, process);
115
116   /* /////////////// FIX du current_process !!! ////////////// */
117   self = msg_global->current_process;
118   xbt_context_start(process->simdata->context);
119   msg_global->current_process = self;
120
121   xbt_fifo_push(msg_global->process_list, process);
122   xbt_fifo_push(msg_global->process_to_run, process);
123
124   PAJE_PROCESS_NEW(process);
125
126   return process;
127 }
128
129 /** \ingroup m_process_management
130  * \param process poor victim
131  *
132  * This function simply kills a \a process... scarry isn't it ? :)
133  */
134 void MSG_process_kill(m_process_t process)
135 {
136   int i;
137   simdata_process_t p_simdata = process->simdata;
138   simdata_host_t h_simdata= p_simdata->host->simdata;
139   int _cursor;
140   m_process_t proc = NULL;
141
142 /*   fprintf(stderr,"Killing %s(%d) on %s.\n",process->name, */
143 /*        p_simdata->PID,p_simdata->host->name); */
144   
145   for (i=0; i<msg_global->max_channel; i++) {
146     if (h_simdata->sleeping[i] == process) {
147       h_simdata->sleeping[i] = NULL;
148       break;
149     }
150   }
151   if (i==msg_global->max_channel) {
152     if(p_simdata->waiting_task) {
153       xbt_dynar_foreach(p_simdata->waiting_task->simdata->sleeping,_cursor,proc) {
154         if(proc==process) 
155           xbt_dynar_remove_at(p_simdata->waiting_task->simdata->sleeping,_cursor,&proc);
156       }
157       if(p_simdata->waiting_task->simdata->compute)
158         surf_workstation_resource->common_public->
159           action_free(p_simdata->waiting_task->simdata->compute);
160       else if (p_simdata->waiting_task->simdata->comm) {
161         surf_workstation_resource->common_public->
162           action_change_state(p_simdata->waiting_task->simdata->comm,SURF_ACTION_FAILED);
163         surf_workstation_resource->common_public->
164           action_free(p_simdata->waiting_task->simdata->comm);
165       } else 
166         CRITICAL0("UNKNOWN STATUS. Please report this bug.");
167     } else { /* Must be trying to put a task somewhere */
168       if(process==MSG_process_self()) {
169         return;
170       } else {
171         CRITICAL0("UNKNOWN STATUS. Please report this bug.");
172       }
173     }
174   }
175
176   xbt_fifo_remove(msg_global->process_to_run,process);
177   xbt_fifo_remove(msg_global->process_list,process);
178   xbt_context_free(process->simdata->context);
179 }
180
181 /** \ingroup m_process_management
182  * \brief Migrates an agent to another location.
183  *
184  * This functions checks whether \a process and \a host are valid pointers
185    and change the value of the #m_host_t on which \a process is running.
186  */
187 MSG_error_t MSG_process_change_host(m_process_t process, m_host_t host)
188 {
189   simdata_process_t simdata = NULL;
190
191   /* Sanity check */
192
193   xbt_assert0(((process) && (process->simdata)
194           && (host)), "Invalid parameters");
195   simdata = process->simdata;
196
197   xbt_fifo_remove(simdata->host->simdata->process_list,process);
198   simdata->host = host;
199   xbt_fifo_push(host->simdata->process_list,process);
200
201   return MSG_OK;
202 }
203
204 /** \ingroup m_process_management
205  * \brief Return the user data of a #m_process_t.
206  *
207  * This functions checks whether \a process is a valid pointer or not 
208    and return the user data associated to \a process if it is possible.
209  */
210 void *MSG_process_get_data(m_process_t process)
211 {
212   xbt_assert0((process != NULL), "Invalid parameters");
213
214   return (process->data);
215 }
216
217 /** \ingroup m_process_management
218  * \brief Set the user data of a #m_process_t.
219  *
220  * This functions checks whether \a process is a valid pointer or not 
221    and set the user data associated to \a process if it is possible.
222  */
223 MSG_error_t MSG_process_set_data(m_process_t process,void *data)
224 {
225   xbt_assert0((process != NULL), "Invalid parameters");
226   xbt_assert0((process->data == NULL), "Data already set");
227   
228   process->data = data;
229    
230   return MSG_OK;
231 }
232
233 /** \ingroup m_process_management
234  * \brief Return the location on which an agent is running.
235  *
236  * This functions checks whether \a process is a valid pointer or not 
237    and return the m_host_t corresponding to the location on which \a 
238    process is running.
239  */
240 m_host_t MSG_process_get_host(m_process_t process)
241 {
242   xbt_assert0(((process != NULL) && (process->simdata)), "Invalid parameters");
243
244   return (((simdata_process_t) process->simdata)->host);
245 }
246
247 /** \ingroup m_process_management
248  *
249  * \brief Return a #m_process_t given its PID.
250  *
251  * This functions search in the list of all the created m_process_t for a m_process_t 
252    whose PID is equal to \a PID. If no host is found, \c NULL is returned. 
253    Note that the PID are uniq in the whole simulation, not only on a given host.
254  */
255 m_process_t MSG_process_from_PID(int PID)
256 {
257   xbt_fifo_item_t i = NULL;
258   m_process_t process = NULL;
259
260   xbt_fifo_foreach(msg_global->process_list,i,process,m_process_t) {
261     if(MSG_process_get_PID(process) == PID) return process;
262   }
263   return NULL;
264 }
265
266 /** \ingroup m_process_management
267  * \brief Returns the process ID of \a process.
268  *
269  * This functions checks whether \a process is a valid pointer or not 
270    and return its PID.
271  */
272 int MSG_process_get_PID(m_process_t process)
273 {
274   xbt_assert0(((process != NULL) && (process->simdata)), "Invalid parameters");
275
276   return (((simdata_process_t) process->simdata)->PID);
277 }
278
279 /** \ingroup m_process_management
280  * \brief Returns the process ID of the parent of \a process.
281  *
282  * This functions checks whether \a process is a valid pointer or not 
283    and return its PID. Returns -1 if the agent has not been created by 
284    another agent.
285  */
286 int MSG_process_get_PPID(m_process_t process)
287 {
288   xbt_assert0(((process != NULL) && (process->simdata)), "Invalid parameters");
289
290   return (((simdata_process_t) process->simdata)->PPID);
291 }
292
293 /** \ingroup m_process_management
294  * \brief Return the name of an agent.
295  *
296  * This functions checks whether \a process is a valid pointer or not 
297    and return its name.
298  */
299 const char *MSG_process_get_name(m_process_t process)
300 {
301   xbt_assert0(((process != NULL) && (process->simdata)), "Invalid parameters");
302
303   return (process->name);
304 }
305
306 /** \ingroup m_process_management
307  * \brief Return the PID of the current agent.
308  *
309  * This functions returns the PID of the currently running #m_process_t.
310  */
311 int MSG_process_self_PID(void)
312 {
313   return (MSG_process_get_PID(MSG_process_self()));
314 }
315
316 /** \ingroup m_process_management
317  * \brief Return the PPID of the current agent.
318  *
319  * This functions returns the PID of the parent of the currently
320  * running #m_process_t.
321  */
322 int MSG_process_self_PPID(void)
323 {
324   return (MSG_process_get_PPID(MSG_process_self()));
325 }
326
327 /** \ingroup m_process_management
328  * \brief Return the current agent.
329  *
330  * This functions returns the currently running #m_process_t.
331  */
332 m_process_t MSG_process_self(void)
333 {
334   return msg_global ? msg_global->current_process : NULL;
335 }
336
337 /** \ingroup m_process_management
338  * \brief Suspend the process.
339  *
340  * This functions suspend the process by suspending the task on which
341  * it was waiting for the completion.
342  */
343 MSG_error_t MSG_process_suspend(m_process_t process)
344 {
345   simdata_process_t simdata = NULL;
346   simdata_task_t simdata_task = NULL;
347   int i;
348
349   xbt_assert0(((process) && (process->simdata)), "Invalid parameters");
350
351   PAJE_PROCESS_PUSH_STATE(process,"S");
352
353   if(process!=MSG_process_self()) {
354     simdata = process->simdata;
355     
356     xbt_assert0(simdata->waiting_task,"Process not waiting for anything else. Weird !");
357
358     simdata_task = simdata->waiting_task->simdata;
359
360     simdata->suspended = 1;
361     if(simdata->blocked) return MSG_OK;
362
363     xbt_assert0(((simdata_task->compute)||(simdata_task->comm))&&
364                 !((simdata_task->compute)&&(simdata_task->comm)),
365                 "Got a problem in deciding which action to choose !");
366     simdata->suspended = 1;
367     if(simdata_task->compute) 
368       surf_workstation_resource->common_public->suspend(simdata_task->compute);
369     else
370       surf_workstation_resource->common_public->suspend(simdata_task->comm);
371   } else {
372     m_task_t dummy = MSG_TASK_UNINITIALIZED;
373     dummy = MSG_task_create("suspended", 0.0, 0, NULL);
374
375     simdata = process->simdata;
376     simdata->suspended = 1;
377     __MSG_task_execute(process,dummy);
378     surf_workstation_resource->common_public->suspend(dummy->simdata->compute);
379     __MSG_wait_for_computation(process,dummy);
380     simdata->suspended = 0;
381
382     MSG_task_destroy(dummy);
383   }
384   return MSG_OK;
385 }
386
387 /** \ingroup m_process_management
388  * \brief Resume a suspended process.
389  *
390  * This functions resume a suspended process by resuming the task on
391  * which it was waiting for the completion.
392  */
393 MSG_error_t MSG_process_resume(m_process_t process)
394 {
395   simdata_process_t simdata = NULL;
396   simdata_task_t simdata_task = NULL;
397   int i;
398
399   xbt_assert0(((process != NULL) && (process->simdata)), "Invalid parameters");
400   CHECK_HOST();
401
402   simdata = process->simdata;
403
404
405   if(simdata->blocked) {
406     PAJE_PROCESS_POP_STATE(process);
407
408     simdata->suspended = 0; /* He'll wake up by itself */
409     MSG_RETURN(MSG_OK);
410   }
411
412   if(!(simdata->waiting_task)) {
413     xbt_assert0(0,"Process not waiting for anything else. Weird !");
414     return MSG_WARNING;
415   }
416   simdata_task = simdata->waiting_task->simdata;
417
418
419   if(simdata_task->compute) {
420     surf_workstation_resource->common_public->resume(simdata_task->compute);
421     PAJE_PROCESS_POP_STATE(process);
422   }
423   else {
424     PAJE_PROCESS_POP_STATE(process);
425     surf_workstation_resource->common_public->resume(simdata_task->comm);
426   }
427
428   MSG_RETURN(MSG_OK);
429 }
430
431 /** \ingroup m_process_management
432  * \brief Returns true if the process is suspended .
433  *
434  * This checks whether a process is suspended or not by inspecting the
435  * task on which it was waiting for the completion.
436  */
437 int MSG_process_isSuspended(m_process_t process)
438 {
439   xbt_assert0(((process != NULL) && (process->simdata)), "Invalid parameters");
440
441   return (process->simdata->suspended);
442 }
443
444 int __MSG_process_block(double max_duration)
445 {
446   m_process_t process = MSG_process_self();
447
448   m_task_t dummy = MSG_TASK_UNINITIALIZED;
449   dummy = MSG_task_create("blocked", 0.0, 0, NULL);
450   
451   PAJE_PROCESS_PUSH_STATE(process,"B");
452
453   process->simdata->blocked=1;
454   __MSG_task_execute(process,dummy);
455   surf_workstation_resource->common_public->suspend(dummy->simdata->compute);
456   if(max_duration>=0)
457     surf_workstation_resource->common_public->set_max_duration(dummy->simdata->compute, 
458                                                                max_duration);
459   __MSG_wait_for_computation(process,dummy);
460   process->simdata->blocked=0;
461
462   if(process->simdata->suspended)
463     MSG_process_suspend(process);
464   
465   MSG_task_destroy(dummy);
466
467   return 1;
468 }
469
470 MSG_error_t __MSG_process_unblock(m_process_t process)
471 {
472   simdata_process_t simdata = NULL;
473   simdata_task_t simdata_task = NULL;
474   int i;
475
476   xbt_assert0(((process != NULL) && (process->simdata)), "Invalid parameters");
477   CHECK_HOST();
478
479   simdata = process->simdata;
480   if(!(simdata->waiting_task)) {
481     xbt_assert0(0,"Process not waiting for anything else. Weird !");
482     return MSG_WARNING;
483   }
484   simdata_task = simdata->waiting_task->simdata;
485
486   xbt_assert0(simdata->blocked,"Process not blocked");
487
488   surf_workstation_resource->common_public->resume(simdata_task->compute);
489
490   PAJE_PROCESS_POP_STATE(process);
491
492   MSG_RETURN(MSG_OK);
493 }
494
495 int __MSG_process_isBlocked(m_process_t process)
496 {
497   xbt_assert0(((process != NULL) && (process->simdata)), "Invalid parameters");
498
499   return (process->simdata->blocked);
500 }