Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
5f586ccbb5421337afb84a0e92cef79affe52bbd
[simgrid.git] / src / msg / m_process.c
1 /*      $Id$     */
2
3 /* Copyright (c) 2002,2003,2004 Arnaud Legrand. All rights reserved.        */
4
5 /* This program is free software; you can redistribute it and/or modify it
6  * under the terms of the license (GNU LGPL) which comes with this package. */
7
8 #include"private.h"
9 #include"xbt/sysdep.h"
10 #include "xbt/error.h"
11 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(m_process, msg,
12                                 "Logging specific to MSG (process)");
13
14 /******************************** Process ************************************/
15 /** \ingroup m_process_management
16  * \brief Creates and runs a new #m_process_t.
17  *
18  * Does exactly the same as #MSG_process_create_with_arguments but without 
19    providing standard arguments (\a argc, \a argv).
20  * \sa MSG_process_create_with_arguments
21  */
22 m_process_t MSG_process_create(const char *name,
23                                m_process_code_t code, void *data,
24                                m_host_t host)
25 {
26   return MSG_process_create_with_arguments(name, code, data, host, -1, NULL);
27 }
28
29 static void MSG_process_cleanup(void *arg)
30 {
31
32   PAJE_PROCESS_FREE(arg);
33
34   xbt_fifo_remove(msg_global->process_list, arg);
35   xbt_fifo_remove(msg_global->process_to_run, arg);
36   xbt_fifo_remove(((m_process_t) arg)->simdata->host->simdata->process_list, arg);
37   xbt_free(((m_process_t) arg)->name);
38   xbt_free(((m_process_t) arg)->simdata);
39   xbt_free(arg);
40 }
41
42 /** \ingroup m_process_management
43  * \brief Creates and runs a new #m_process_t.
44
45  * A constructor for #m_process_t taking four arguments and returning the 
46  * corresponding object. The structure (and the corresponding thread) is
47  * created, and put in the list of ready process.
48  * \param name a name for the object. It is for user-level information
49    and can be NULL.
50  * \param code is a function describing the behavior of the agent. It
51    should then only use functions described in \ref
52    m_process_management (to create a new #m_process_t for example),
53    in \ref m_host_management (only the read-only functions i.e. whose
54    name contains the word get), in \ref m_task_management (to create
55    or destroy some #m_task_t for example) and in \ref
56    msg_gos_functions (to handle file transfers and task processing).
57  * \param data a pointer to any data may want to attach to the new
58    object.  It is for user-level information and can be NULL. It can
59    be retrieved with the function \ref MSG_process_get_data.
60  * \param host the location where the new agent is executed.
61  * \param argc first argument passed to \a code
62  * \param argv second argument passed to \a code
63  * \see m_process_t
64  * \return The new corresponding object.
65  */
66 m_process_t MSG_process_create_with_arguments(const char *name,
67                                               m_process_code_t code, void *data,
68                                               m_host_t host, int argc, char **argv)
69 {
70   simdata_process_t simdata = xbt_new0(s_simdata_process_t,1);
71   m_process_t process = xbt_new0(s_m_process_t,1);
72   m_process_t self = NULL;
73
74   xbt_assert0(((code != NULL) && (host != NULL)), "Invalid parameters");
75   /* Simulator Data */
76
77   simdata->PID = msg_global->PID++;
78   simdata->host = host;
79   simdata->waiting_task = NULL;
80   simdata->argc = argc;
81   simdata->argv = argv;
82   simdata->context = xbt_context_new(code, NULL, NULL, 
83                                      MSG_process_cleanup, process, 
84                                      simdata->argc, simdata->argv);
85
86   if((self=msg_global->current_process)) {
87     simdata->PPID = MSG_process_get_PID(self);
88   } else {
89     simdata->PPID = -1;
90   }
91   simdata->last_errno=MSG_OK;
92
93
94   /* Process structure */
95   process->name = xbt_strdup(name);
96   process->simdata = simdata;
97   process->data = data;
98
99   xbt_fifo_push(host->simdata->process_list, process);
100
101   /* /////////////// FIX du current_process !!! ////////////// */
102   self = msg_global->current_process;
103   xbt_context_start(process->simdata->context);
104   msg_global->current_process = self;
105
106   xbt_fifo_push(msg_global->process_list, process);
107   xbt_fifo_push(msg_global->process_to_run, process);
108
109   PAJE_PROCESS_NEW(process);
110
111   return process;
112 }
113
114 /** \ingroup m_process_management
115  * \param process poor victim
116  *
117  * This function simply kills a \a process... scarry isn't it ? :)
118  */
119 void MSG_process_kill(m_process_t process)
120 {
121   int i;
122   simdata_process_t p_simdata = process->simdata;
123   simdata_host_t h_simdata= p_simdata->host->simdata;
124
125 /*   fprintf(stderr,"Killing %s(%d) on %s.\n",process->name, */
126 /*        p_simdata->PID,p_simdata->host->name); */
127   
128   for (i=0; i<msg_global->max_channel; i++) {
129     if (h_simdata->sleeping[i] == process) {
130       h_simdata->sleeping[i] = NULL;
131       break;
132     }
133   }
134   if (i==msg_global->max_channel) {
135     if(p_simdata->waiting_task) {
136       if(p_simdata->waiting_task->simdata->compute)
137         surf_workstation_resource->common_public->
138           action_free(p_simdata->waiting_task->simdata->compute);
139       else if (p_simdata->waiting_task->simdata->comm)
140         surf_workstation_resource->common_public->
141           action_change_state(p_simdata->waiting_task->simdata->comm,SURF_ACTION_FAILED);
142       else
143         fprintf(stderr,"UNKNOWN STATUS. Please report this bug.\n");
144     } else { /* Must be trying to put a task somewhere */
145       fprintf(stderr,"UNKNOWN STATUS. Please report this bug.\n");
146     }
147   }
148
149   xbt_fifo_remove(msg_global->process_list,process);
150   xbt_context_free(process->simdata->context);
151 }
152
153 /** \ingroup m_process_management
154  * \brief Migrates an agent to another location.
155  *
156  * This functions checks whether \a process and \a host are valid pointers
157    and change the value of the #m_host_t on which \a process is running.
158  */
159 MSG_error_t MSG_process_change_host(m_process_t process, m_host_t host)
160 {
161   simdata_process_t simdata = NULL;
162
163   /* Sanity check */
164
165   xbt_assert0(((process) && (process->simdata)
166           && (host)), "Invalid parameters");
167   simdata = process->simdata;
168
169   xbt_fifo_remove(simdata->host->simdata->process_list,process);
170   simdata->host = host;
171   xbt_fifo_push(host->simdata->process_list,process);
172
173   return MSG_OK;
174 }
175
176 /** \ingroup m_process_management
177  * \brief Return the user data of a #m_process_t.
178  *
179  * This functions checks whether \a process is a valid pointer or not 
180    and return the user data associated to \a process if it is possible.
181  */
182 void *MSG_process_get_data(m_process_t process)
183 {
184   xbt_assert0((process != NULL), "Invalid parameters");
185
186   return (process->data);
187 }
188
189 /** \ingroup m_process_management
190  * \brief Set the user data of a #m_process_t.
191  *
192  * This functions checks whether \a process is a valid pointer or not 
193    and set the user data associated to \a process if it is possible.
194  */
195 MSG_error_t MSG_process_set_data(m_process_t process,void *data)
196 {
197   xbt_assert0((process != NULL), "Invalid parameters");
198   xbt_assert0((process->data == NULL), "Data already set");
199   
200   process->data = data;
201    
202   return MSG_OK;
203 }
204
205 /** \ingroup m_process_management
206  * \brief Return the location on which an agent is running.
207  *
208  * This functions checks whether \a process is a valid pointer or not 
209    and return the m_host_t corresponding to the location on which \a 
210    process is running.
211  */
212 m_host_t MSG_process_get_host(m_process_t process)
213 {
214   xbt_assert0(((process != NULL) && (process->simdata)), "Invalid parameters");
215
216   return (((simdata_process_t) process->simdata)->host);
217 }
218
219 /** \ingroup m_process_management
220  *
221  * \brief Return a #m_process_t given its PID.
222  *
223  * This functions search in the list of all the created m_process_t for a m_process_t 
224    whose PID is equal to \a PID. If no host is found, \c NULL is returned. 
225    Note that the PID are uniq in the whole simulation, not only on a given host.
226  */
227 m_process_t MSG_process_from_PID(int PID)
228 {
229   xbt_fifo_item_t i = NULL;
230   m_process_t process = NULL;
231
232   xbt_fifo_foreach(msg_global->process_list,i,process,m_process_t) {
233     if(MSG_process_get_PID(process) == PID) return process;
234   }
235   return NULL;
236 }
237
238 /** \ingroup m_process_management
239  * \brief Returns the process ID of \a process.
240  *
241  * This functions checks whether \a process is a valid pointer or not 
242    and return its PID.
243  */
244 int MSG_process_get_PID(m_process_t process)
245 {
246   xbt_assert0(((process != NULL) && (process->simdata)), "Invalid parameters");
247
248   return (((simdata_process_t) process->simdata)->PID);
249 }
250
251 /** \ingroup m_process_management
252  * \brief Returns the process ID of the parent of \a process.
253  *
254  * This functions checks whether \a process is a valid pointer or not 
255    and return its PID. Returns -1 if the agent has not been created by 
256    another agent.
257  */
258 int MSG_process_get_PPID(m_process_t process)
259 {
260   xbt_assert0(((process != NULL) && (process->simdata)), "Invalid parameters");
261
262   return (((simdata_process_t) process->simdata)->PPID);
263 }
264
265 /** \ingroup m_process_management
266  * \brief Return the name of an agent.
267  *
268  * This functions checks whether \a process is a valid pointer or not 
269    and return its name.
270  */
271 const char *MSG_process_get_name(m_process_t process)
272 {
273   xbt_assert0(((process != NULL) && (process->simdata)), "Invalid parameters");
274
275   return (process->name);
276 }
277
278 /** \ingroup m_process_management
279  * \brief Return the PID of the current agent.
280  *
281  * This functions returns the PID of the currently running #m_process_t.
282  */
283 int MSG_process_self_PID(void)
284 {
285   return (MSG_process_get_PID(MSG_process_self()));
286 }
287
288 /** \ingroup m_process_management
289  * \brief Return the PPID of the current agent.
290  *
291  * This functions returns the PID of the parent of the currently
292  * running #m_process_t.
293  */
294 int MSG_process_self_PPID(void)
295 {
296   return (MSG_process_get_PPID(MSG_process_self()));
297 }
298
299 /** \ingroup m_process_management
300  * \brief Return the current agent.
301  *
302  * This functions returns the currently running #m_process_t.
303  */
304 m_process_t MSG_process_self(void)
305 {
306   return msg_global->current_process;
307 }
308
309 /** \ingroup m_process_management
310  * \brief Suspend the process.
311  *
312  * This functions suspend the process by suspending the task on which
313  * it was waiting for the completion.
314  */
315 MSG_error_t MSG_process_suspend(m_process_t process)
316 {
317   simdata_process_t simdata = NULL;
318   simdata_task_t simdata_task = NULL;
319   int i;
320
321   xbt_assert0(((process) && (process->simdata)), "Invalid parameters");
322
323   //  PAJE_PROCESS_STATE(process,"S");
324   PAJE_PROCESS_PUSH_STATE(process,"S");
325
326   if(process!=MSG_process_self()) {
327     simdata = process->simdata;
328     
329     xbt_assert0(simdata->waiting_task,"Process not waiting for anything else. Weird !");
330
331     simdata_task = simdata->waiting_task->simdata;
332
333     simdata->suspended = 1;
334     if(simdata->blocked) return MSG_OK;
335
336     xbt_assert0(((simdata_task->compute)||(simdata_task->comm))&&
337                 !((simdata_task->compute)&&(simdata_task->comm)),
338                 "Got a problem in deciding which action to choose !");
339     simdata->suspended = 1;
340     if(simdata_task->compute) 
341       surf_workstation_resource->common_public->suspend(simdata_task->compute);
342     else
343       surf_workstation_resource->common_public->suspend(simdata_task->comm);
344   } else {
345     m_task_t dummy = MSG_TASK_UNINITIALIZED;
346     dummy = MSG_task_create("suspended", 0.0, 0, NULL);
347
348     simdata = process->simdata;
349     simdata->suspended = 1;
350     __MSG_task_execute(process,dummy);
351     surf_workstation_resource->common_public->suspend(dummy->simdata->compute);
352     __MSG_wait_for_computation(process,dummy);
353     simdata->suspended = 0;
354
355     MSG_task_destroy(dummy);
356   }
357   return MSG_OK;
358 }
359
360 /** \ingroup m_process_management
361  * \brief Resume a suspended process.
362  *
363  * This functions resume a suspended process by resuming the task on
364  * which it was waiting for the completion.
365  */
366 MSG_error_t MSG_process_resume(m_process_t process)
367 {
368   simdata_process_t simdata = NULL;
369   simdata_task_t simdata_task = NULL;
370   int i;
371
372   xbt_assert0(((process != NULL) && (process->simdata)), "Invalid parameters");
373   CHECK_HOST();
374
375   simdata = process->simdata;
376
377
378   if(simdata->blocked) {
379     //    PAJE_PROCESS_STATE(process,"B");
380     PAJE_PROCESS_POP_STATE(process);
381
382     simdata->suspended = 0; /* He'll wake up by itself */
383     MSG_RETURN(MSG_OK);
384   }
385
386   if(!(simdata->waiting_task)) {
387     xbt_assert0(0,"Process not waiting for anything else. Weird !");
388     return MSG_WARNING;
389   }
390   simdata_task = simdata->waiting_task->simdata;
391
392
393   if(simdata_task->compute) {
394     surf_workstation_resource->common_public->resume(simdata_task->compute);
395     PAJE_PROCESS_POP_STATE(process);
396     //    PAJE_PROCESS_STATE(process,"E");
397   }
398   else {
399     //    PAJE_PROCESS_STATE(process,"C");
400     PAJE_PROCESS_POP_STATE(process);
401     surf_workstation_resource->common_public->resume(simdata_task->comm);
402   }
403
404   MSG_RETURN(MSG_OK);
405 }
406
407 /** \ingroup m_process_management
408  * \brief Returns true if the process is suspended .
409  *
410  * This checks whether a process is suspended or not by inspecting the
411  * task on which it was waiting for the completion.
412  */
413 int MSG_process_isSuspended(m_process_t process)
414 {
415   xbt_assert0(((process != NULL) && (process->simdata)), "Invalid parameters");
416
417   return (process->simdata->suspended);
418 }
419
420 MSG_error_t __MSG_process_block(void)
421 {
422   m_process_t process = MSG_process_self();
423
424   m_task_t dummy = MSG_TASK_UNINITIALIZED;
425   dummy = MSG_task_create("blocked", 0.0, 0, NULL);
426   
427   //  PAJE_PROCESS_STATE(process,"B");
428   PAJE_PROCESS_PUSH_STATE(process,"B");
429
430   process->simdata->blocked=1;
431   __MSG_task_execute(process,dummy);
432   surf_workstation_resource->common_public->suspend(dummy->simdata->compute);
433   __MSG_wait_for_computation(process,dummy);
434   process->simdata->blocked=0;
435
436   if(process->simdata->suspended)
437     MSG_process_suspend(process);
438   
439   MSG_task_destroy(dummy);
440
441   return MSG_OK;
442 }
443
444 MSG_error_t __MSG_process_unblock(m_process_t process)
445 {
446   simdata_process_t simdata = NULL;
447   simdata_task_t simdata_task = NULL;
448   int i;
449
450   xbt_assert0(((process != NULL) && (process->simdata)), "Invalid parameters");
451   CHECK_HOST();
452
453   simdata = process->simdata;
454   if(!(simdata->waiting_task)) {
455     xbt_assert0(0,"Process not waiting for anything else. Weird !");
456     return MSG_WARNING;
457   }
458   simdata_task = simdata->waiting_task->simdata;
459
460   xbt_assert0(simdata->blocked,"Process not blocked");
461
462   surf_workstation_resource->common_public->resume(simdata_task->compute);
463
464   PAJE_PROCESS_POP_STATE(process);
465
466   MSG_RETURN(MSG_OK);
467 }
468
469 int __MSG_process_isBlocked(m_process_t process)
470 {
471   xbt_assert0(((process != NULL) && (process->simdata)), "Invalid parameters");
472
473   return (process->simdata->blocked);
474 }