Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
3ecbc09eecb8b822d4e7fb857547d02983bc534d
[simgrid.git] / src / msg / m_process.c
1 /*      $Id$     */
2
3 /* Copyright (c) 2002,2003,2004 Arnaud Legrand. All rights reserved.        */
4
5 /* This program is free software; you can redistribute it and/or modify it
6  * under the terms of the license (GNU LGPL) which comes with this package. */
7
8 #include "private.h"
9 #include "xbt/sysdep.h"
10 #include "xbt/log.h"
11 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(msg_process, msg,
12                                 "Logging specific to MSG (process)");
13
14 /** \defgroup m_process_management Management Functions of Agents
15  *  \brief This section describes the agent structure of MSG
16  *  (#m_process_t) and the functions for managing it.
17  *    \htmlonly <!-- DOXYGEN_NAVBAR_LABEL="Agents" --> \endhtmlonly
18  * 
19  *  We need to simulate many independent scheduling decisions, so
20  *  the concept of <em>process</em> is at the heart of the
21  *  simulator. A process may be defined as a <em>code</em>, with
22  *  some <em>private data</em>, executing in a <em>location</em>.
23  *  \see m_process_t
24  */
25
26 /******************************** Process ************************************/
27 /** \ingroup m_process_management
28  * \brief Creates and runs a new #m_process_t.
29  *
30  * Does exactly the same as #MSG_process_create_with_arguments but without 
31    providing standard arguments (\a argc, \a argv, \a start_time, \a kill_time).
32  * \sa MSG_process_create_with_arguments
33  */
34 m_process_t MSG_process_create(const char *name,
35                                m_process_code_t code, void *data,
36                                m_host_t host)
37 {
38   return MSG_process_create_with_arguments(name, code, data, host, -1, NULL);
39 }
40
41 static void MSG_process_cleanup(void *arg)
42 {
43
44   while(((m_process_t)arg)->simdata->paje_state) {
45     PAJE_PROCESS_POP_STATE((m_process_t)arg);
46   }
47
48   PAJE_PROCESS_FREE(arg);
49
50   xbt_fifo_remove(msg_global->process_list, arg);
51   xbt_fifo_remove(msg_global->process_to_run, arg);
52   xbt_fifo_remove(((m_process_t) arg)->simdata->host->simdata->process_list, arg);
53   free(((m_process_t) arg)->name);
54   ((m_process_t) arg)->name = NULL;
55   free(((m_process_t) arg)->simdata);
56   ((m_process_t) arg)->simdata = NULL;
57   free(arg);
58 }
59
60 /** \ingroup m_process_management
61  * \brief Creates and runs a new #m_process_t.
62
63  * A constructor for #m_process_t taking four arguments and returning the 
64  * corresponding object. The structure (and the corresponding thread) is
65  * created, and put in the list of ready process.
66  * \param name a name for the object. It is for user-level information
67    and can be NULL.
68  * \param code is a function describing the behavior of the agent. It
69    should then only use functions described in \ref
70    m_process_management (to create a new #m_process_t for example),
71    in \ref m_host_management (only the read-only functions i.e. whose
72    name contains the word get), in \ref m_task_management (to create
73    or destroy some #m_task_t for example) and in \ref
74    msg_gos_functions (to handle file transfers and task processing).
75  * \param data a pointer to any data one may want to attach to the new
76    object.  It is for user-level information and can be NULL. It can
77    be retrieved with the function \ref MSG_process_get_data.
78  * \param host the location where the new agent is executed.
79  * \param argc first argument passed to \a code
80  * \param argv second argument passed to \a code
81  * \see m_process_t
82  * \return The new corresponding object.
83  */
84 m_process_t MSG_process_create_with_arguments(const char *name,
85                                               m_process_code_t code, void *data,
86                                               m_host_t host, int argc, char **argv)
87 {
88   simdata_process_t simdata = xbt_new0(s_simdata_process_t,1);
89   m_process_t process = xbt_new0(s_m_process_t,1);
90   m_process_t self = NULL;
91
92   xbt_assert0(((code != NULL) && (host != NULL)), "Invalid parameters");
93   /* Simulator Data */
94
95   simdata->PID = msg_global->PID++;
96   simdata->host = host;
97   simdata->waiting_task = NULL;
98   simdata->argc = argc;
99   simdata->argv = argv;
100   simdata->context = xbt_context_new(code, NULL, NULL, 
101                                      MSG_process_cleanup, process, 
102                                      simdata->argc, simdata->argv);
103
104   if((self=msg_global->current_process)) {
105     simdata->PPID = MSG_process_get_PID(self);
106   } else {
107     simdata->PPID = -1;
108   }
109   simdata->last_errno=MSG_OK;
110
111
112   /* Process structure */
113   process->name = xbt_strdup(name);
114   process->simdata = simdata;
115   process->data = data;
116
117   xbt_fifo_unshift(host->simdata->process_list, process);
118
119   /* *************** FIX du current_process !!! *************** */
120   self = msg_global->current_process;
121   xbt_context_start(process->simdata->context);
122   msg_global->current_process = self;
123
124   xbt_fifo_unshift(msg_global->process_list, process);
125   DEBUG2("Inserting %s(%s) in the to_run list",process->name,
126          host->name);
127   xbt_fifo_unshift(msg_global->process_to_run, process);
128
129   PAJE_PROCESS_NEW(process);
130
131   return process;
132 }
133
134 /** \ingroup m_process_management
135  * \param process poor victim
136  *
137  * This function simply kills a \a process... scarry isn't it ? :)
138  */
139 void MSG_process_kill(m_process_t process)
140 {
141   int i;
142   simdata_process_t p_simdata = process->simdata;
143   simdata_host_t h_simdata= p_simdata->host->simdata;
144   int _cursor;
145   m_process_t proc = NULL;
146
147 /*   fprintf(stderr,"Killing %s(%d) on %s.\n",process->name, */
148 /*        p_simdata->PID,p_simdata->host->name); */
149   
150   for (i=0; i<msg_global->max_channel; i++) {
151     if (h_simdata->sleeping[i] == process) {
152       h_simdata->sleeping[i] = NULL;
153       break;
154     }
155   }
156   if (i==msg_global->max_channel) {
157     if(p_simdata->waiting_task) {
158       xbt_dynar_foreach(p_simdata->waiting_task->simdata->sleeping,_cursor,proc) {
159         if(proc==process) 
160           xbt_dynar_remove_at(p_simdata->waiting_task->simdata->sleeping,_cursor,&proc);
161       }
162       if(p_simdata->waiting_task->simdata->compute)
163         surf_workstation_resource->common_public->
164           action_free(p_simdata->waiting_task->simdata->compute);
165       else if (p_simdata->waiting_task->simdata->comm) {
166         surf_workstation_resource->common_public->
167           action_change_state(p_simdata->waiting_task->simdata->comm,SURF_ACTION_FAILED);
168         surf_workstation_resource->common_public->
169           action_free(p_simdata->waiting_task->simdata->comm);
170       } else 
171         CRITICAL0("UNKNOWN STATUS. Please report this bug.");
172     } else { /* Must be trying to put a task somewhere */
173       if(process==MSG_process_self()) {
174         return;
175       } else {
176         CRITICAL0("UNKNOWN STATUS. Please report this bug.");
177       }
178     }
179   }
180
181   xbt_fifo_remove(msg_global->process_to_run,process);
182   xbt_fifo_remove(msg_global->process_list,process);
183   xbt_context_free(process->simdata->context);
184 }
185
186 /** \ingroup m_process_management
187  * \brief Migrates an agent to another location.
188  *
189  * This functions checks whether \a process and \a host are valid pointers
190    and change the value of the #m_host_t on which \a process is running.
191  */
192 MSG_error_t MSG_process_change_host(m_process_t process, m_host_t host)
193 {
194   simdata_process_t simdata = NULL;
195
196   /* Sanity check */
197
198   xbt_assert0(((process) && (process->simdata)
199           && (host)), "Invalid parameters");
200   simdata = process->simdata;
201
202   xbt_fifo_remove(simdata->host->simdata->process_list,process);
203   simdata->host = host;
204   xbt_fifo_unshift(host->simdata->process_list,process);
205
206   return MSG_OK;
207 }
208
209 /** \ingroup m_process_management
210  * \brief Return the user data of a #m_process_t.
211  *
212  * This functions checks whether \a process is a valid pointer or not 
213    and return the user data associated to \a process if it is possible.
214  */
215 void *MSG_process_get_data(m_process_t process)
216 {
217   xbt_assert0((process != NULL), "Invalid parameters");
218
219   return (process->data);
220 }
221
222 /** \ingroup m_process_management
223  * \brief Set the user data of a #m_process_t.
224  *
225  * This functions checks whether \a process is a valid pointer or not 
226    and set the user data associated to \a process if it is possible.
227  */
228 MSG_error_t MSG_process_set_data(m_process_t process,void *data)
229 {
230   xbt_assert0((process != NULL), "Invalid parameters");
231   xbt_assert0((process->data == NULL), "Data already set");
232   
233   process->data = data;
234    
235   return MSG_OK;
236 }
237
238 /** \ingroup m_process_management
239  * \brief Return the location on which an agent is running.
240  *
241  * This functions checks whether \a process is a valid pointer or not 
242    and return the m_host_t corresponding to the location on which \a 
243    process is running.
244  */
245 m_host_t MSG_process_get_host(m_process_t process)
246 {
247   xbt_assert0(((process != NULL) && (process->simdata)), "Invalid parameters");
248
249   return (((simdata_process_t) process->simdata)->host);
250 }
251
252 /** \ingroup m_process_management
253  *
254  * \brief Return a #m_process_t given its PID.
255  *
256  * This functions search in the list of all the created m_process_t for a m_process_t 
257    whose PID is equal to \a PID. If no host is found, \c NULL is returned. 
258    Note that the PID are uniq in the whole simulation, not only on a given host.
259  */
260 m_process_t MSG_process_from_PID(int PID)
261 {
262   xbt_fifo_item_t i = NULL;
263   m_process_t process = NULL;
264
265   xbt_fifo_foreach(msg_global->process_list,i,process,m_process_t) {
266     if(MSG_process_get_PID(process) == PID) return process;
267   }
268   return NULL;
269 }
270
271 /** \ingroup m_process_management
272  * \brief Returns the process ID of \a process.
273  *
274  * This functions checks whether \a process is a valid pointer or not 
275    and return its PID.
276  */
277 int MSG_process_get_PID(m_process_t process)
278 {
279   xbt_assert0(((process != NULL) && (process->simdata)), "Invalid parameters");
280
281   return (((simdata_process_t) process->simdata)->PID);
282 }
283
284 /** \ingroup m_process_management
285  * \brief Returns the process ID of the parent of \a process.
286  *
287  * This functions checks whether \a process is a valid pointer or not 
288    and return its PID. Returns -1 if the agent has not been created by 
289    another agent.
290  */
291 int MSG_process_get_PPID(m_process_t process)
292 {
293   xbt_assert0(((process != NULL) && (process->simdata)), "Invalid parameters");
294
295   return (((simdata_process_t) process->simdata)->PPID);
296 }
297
298 /** \ingroup m_process_management
299  * \brief Return the name of an agent.
300  *
301  * This functions checks whether \a process is a valid pointer or not 
302    and return its name.
303  */
304 const char *MSG_process_get_name(m_process_t process)
305 {
306   xbt_assert0(((process != NULL) && (process->simdata)), "Invalid parameters");
307
308   return (process->name);
309 }
310
311 /** \ingroup m_process_management
312  * \brief Return the PID of the current agent.
313  *
314  * This functions returns the PID of the currently running #m_process_t.
315  */
316 int MSG_process_self_PID(void)
317 {
318   return (MSG_process_get_PID(MSG_process_self()));
319 }
320
321 /** \ingroup m_process_management
322  * \brief Return the PPID of the current agent.
323  *
324  * This functions returns the PID of the parent of the currently
325  * running #m_process_t.
326  */
327 int MSG_process_self_PPID(void)
328 {
329   return (MSG_process_get_PPID(MSG_process_self()));
330 }
331
332 /** \ingroup m_process_management
333  * \brief Return the current agent.
334  *
335  * This functions returns the currently running #m_process_t.
336  */
337 m_process_t MSG_process_self(void)
338 {
339   return msg_global ? msg_global->current_process : NULL;
340 }
341
342 /** \ingroup m_process_management
343  * \brief Suspend the process.
344  *
345  * This functions suspend the process by suspending the task on which
346  * it was waiting for the completion.
347  */
348 MSG_error_t MSG_process_suspend(m_process_t process)
349 {
350   simdata_process_t simdata = NULL;
351   simdata_task_t simdata_task = NULL;
352
353   XBT_IN2("(%p(%s))", process, process->name);
354
355   xbt_assert0(((process) && (process->simdata)), "Invalid parameters");
356
357   PAJE_PROCESS_PUSH_STATE(process,"S",NULL);
358
359   if(process!=MSG_process_self()) {
360     simdata = process->simdata;
361     
362     xbt_assert0(simdata->waiting_task,"Process not waiting for anything else. Weird !");
363
364     simdata_task = simdata->waiting_task->simdata;
365
366     simdata->suspended = 1;
367     if(simdata->blocked) {
368       XBT_OUT;
369       return MSG_OK;
370     }
371
372     xbt_assert0(((simdata_task->compute)||(simdata_task->comm))&&
373                 !((simdata_task->compute)&&(simdata_task->comm)),
374                 "Got a problem in deciding which action to choose !");
375     simdata->suspended = 1;
376     if(simdata_task->compute) 
377       surf_workstation_resource->common_public->suspend(simdata_task->compute);
378     else
379       surf_workstation_resource->common_public->suspend(simdata_task->comm);
380   } else {
381     m_task_t dummy = MSG_TASK_UNINITIALIZED;
382     dummy = MSG_task_create("suspended", 0.0, 0, NULL);
383
384     simdata = process->simdata;
385     simdata->suspended = 1;
386     __MSG_task_execute(process,dummy);
387     surf_workstation_resource->common_public->suspend(dummy->simdata->compute);
388     __MSG_wait_for_computation(process,dummy);
389     simdata->suspended = 0;
390
391     MSG_task_destroy(dummy);
392   }
393   XBT_OUT;
394   return MSG_OK;
395 }
396
397 /** \ingroup m_process_management
398  * \brief Resume a suspended process.
399  *
400  * This functions resume a suspended process by resuming the task on
401  * which it was waiting for the completion.
402  */
403 MSG_error_t MSG_process_resume(m_process_t process)
404 {
405   simdata_process_t simdata = NULL;
406   simdata_task_t simdata_task = NULL;
407
408   xbt_assert0(((process != NULL) && (process->simdata)), "Invalid parameters");
409   CHECK_HOST();
410
411   XBT_IN2("(%p(%s))", process, process->name);
412
413   if(process == MSG_process_self()) {
414     XBT_OUT;
415     MSG_RETURN(MSG_OK);
416   }
417
418   simdata = process->simdata;
419
420   if(simdata->blocked) {
421     PAJE_PROCESS_POP_STATE(process);
422
423     simdata->suspended = 0; /* He'll wake up by itself */
424     XBT_OUT;
425     MSG_RETURN(MSG_OK);
426   }
427
428   if(!(simdata->waiting_task)) {
429     xbt_assert0(0,"Process not waiting for anything else. Weird !");
430     XBT_OUT;
431     return MSG_WARNING;
432   }
433   simdata_task = simdata->waiting_task->simdata;
434
435
436   if(simdata_task->compute) {
437     surf_workstation_resource->common_public->resume(simdata_task->compute);
438     PAJE_PROCESS_POP_STATE(process);
439   }
440   else {
441     PAJE_PROCESS_POP_STATE(process);
442     surf_workstation_resource->common_public->resume(simdata_task->comm);
443   }
444
445   XBT_OUT;
446   MSG_RETURN(MSG_OK);
447 }
448
449 /** \ingroup m_process_management
450  * \brief Returns true if the process is suspended .
451  *
452  * This checks whether a process is suspended or not by inspecting the
453  * task on which it was waiting for the completion.
454  */
455 int MSG_process_is_suspended(m_process_t process)
456 {
457   xbt_assert0(((process != NULL) && (process->simdata)), "Invalid parameters");
458
459   return (process->simdata->suspended);
460 }
461
462 int __MSG_process_block(double max_duration, const char *info)
463 {
464   m_process_t process = MSG_process_self();
465   m_task_t dummy = MSG_TASK_UNINITIALIZED;
466   char blocked_name[512];
467   snprintf(blocked_name,512,"blocked [%s] (%s:%s)",
468           info, process->name, process->simdata->host->name);
469
470   XBT_IN1(": max_duration=%g",max_duration);
471
472   dummy = MSG_task_create(blocked_name, 0.0, 0, NULL);
473   
474   PAJE_PROCESS_PUSH_STATE(process,"B",NULL);
475
476   process->simdata->blocked=1;
477   __MSG_task_execute(process,dummy);
478   surf_workstation_resource->common_public->suspend(dummy->simdata->compute);
479   if(max_duration>=0)
480     surf_workstation_resource->common_public->set_max_duration(dummy->simdata->compute, 
481                                                                max_duration);
482   __MSG_wait_for_computation(process,dummy);
483   MSG_task_destroy(dummy);
484   process->simdata->blocked=0;
485
486   if(process->simdata->suspended) {
487     DEBUG0("I've been suspended in the meantime");    
488     MSG_process_suspend(process);
489     DEBUG0("I've been resumed, let's keep going");    
490   }
491
492   XBT_OUT;
493   return 1;
494 }
495
496 MSG_error_t __MSG_process_unblock(m_process_t process)
497 {
498   simdata_process_t simdata = NULL;
499   simdata_task_t simdata_task = NULL;
500
501   xbt_assert0(((process != NULL) && (process->simdata)), "Invalid parameters");
502   CHECK_HOST();
503
504   XBT_IN2(": %s unblocking %s", MSG_process_self()->name,process->name);
505
506   simdata = process->simdata;
507   if(!(simdata->waiting_task)) {
508     xbt_assert0(0,"Process not waiting for anything else. Weird !");
509     XBT_OUT;
510     return MSG_WARNING;
511   }
512   simdata_task = simdata->waiting_task->simdata;
513
514   xbt_assert0(simdata->blocked,"Process not blocked");
515
516   surf_workstation_resource->common_public->resume(simdata_task->compute);
517
518   PAJE_PROCESS_POP_STATE(process);
519
520   XBT_OUT;
521
522   MSG_RETURN(MSG_OK);
523 }
524
525 int __MSG_process_isBlocked(m_process_t process)
526 {
527   xbt_assert0(((process != NULL) && (process->simdata)), "Invalid parameters");
528
529   return (process->simdata->blocked);
530 }