Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
cf354393b46deca7d0250d54b158179516041ce4
[simgrid.git] / src / msg / m_process.c
1 /*      $Id$     */
2
3 /* Copyright (c) 2002,2003,2004 Arnaud Legrand. All rights reserved.        */
4
5 /* This program is free software; you can redistribute it and/or modify it
6  * under the terms of the license (GNU LGPL) which comes with this package. */
7
8 #include "private.h"
9 #include "xbt/sysdep.h"
10 #include "xbt/log.h"
11 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(m_process, msg,
12                                 "Logging specific to MSG (process)");
13
14 /** \defgroup m_process_management Management Functions of Agents
15  *  \brief This section describes the agent structure of MSG
16  *  (#m_process_t) and the functions for managing it.
17  *
18  *  We need to simulate many independent scheduling decisions, so
19  *  the concept of <em>process</em> is at the heart of the
20  *  simulator. A process may be defined as a <em>code</em>, with
21  *  some <em>private data</em>, executing in a <em>location</em>.
22  *  \see m_process_t
23  */
24
25 /******************************** Process ************************************/
26 /** \ingroup m_process_management
27  * \brief Creates and runs a new #m_process_t.
28  *
29  * Does exactly the same as #MSG_process_create_with_arguments but without 
30    providing standard arguments (\a argc, \a argv, \a start_time, \a kill_time).
31  * \sa MSG_process_create_with_arguments
32  */
33 m_process_t MSG_process_create(const char *name,
34                                m_process_code_t code, void *data,
35                                m_host_t host)
36 {
37   return MSG_process_create_with_arguments(name, code, data, host, -1, NULL);
38 }
39
40 static void MSG_process_cleanup(void *arg)
41 {
42
43   while(((m_process_t)arg)->simdata->paje_state) {
44     PAJE_PROCESS_POP_STATE((m_process_t)arg);
45   }
46
47   PAJE_PROCESS_FREE(arg);
48
49   xbt_fifo_remove(msg_global->process_list, arg);
50   xbt_fifo_remove(msg_global->process_to_run, arg);
51   xbt_fifo_remove(((m_process_t) arg)->simdata->host->simdata->process_list, arg);
52   free(((m_process_t) arg)->name);
53   ((m_process_t) arg)->name = NULL;
54   free(((m_process_t) arg)->simdata);
55   ((m_process_t) arg)->simdata = NULL;
56   free(arg);
57 }
58
59 /** \ingroup m_process_management
60  * \brief Creates and runs a new #m_process_t.
61
62  * A constructor for #m_process_t taking four arguments and returning the 
63  * corresponding object. The structure (and the corresponding thread) is
64  * created, and put in the list of ready process.
65  * \param name a name for the object. It is for user-level information
66    and can be NULL.
67  * \param code is a function describing the behavior of the agent. It
68    should then only use functions described in \ref
69    m_process_management (to create a new #m_process_t for example),
70    in \ref m_host_management (only the read-only functions i.e. whose
71    name contains the word get), in \ref m_task_management (to create
72    or destroy some #m_task_t for example) and in \ref
73    msg_gos_functions (to handle file transfers and task processing).
74  * \param data a pointer to any data one may want to attach to the new
75    object.  It is for user-level information and can be NULL. It can
76    be retrieved with the function \ref MSG_process_get_data.
77  * \param host the location where the new agent is executed.
78  * \param argc first argument passed to \a code
79  * \param argv second argument passed to \a code
80  * \see m_process_t
81  * \return The new corresponding object.
82  */
83 m_process_t MSG_process_create_with_arguments(const char *name,
84                                               m_process_code_t code, void *data,
85                                               m_host_t host, int argc, char **argv)
86 {
87   simdata_process_t simdata = xbt_new0(s_simdata_process_t,1);
88   m_process_t process = xbt_new0(s_m_process_t,1);
89   m_process_t self = NULL;
90
91   xbt_assert0(((code != NULL) && (host != NULL)), "Invalid parameters");
92   /* Simulator Data */
93
94   simdata->PID = msg_global->PID++;
95   simdata->host = host;
96   simdata->waiting_task = NULL;
97   simdata->argc = argc;
98   simdata->argv = argv;
99   simdata->context = xbt_context_new(code, NULL, NULL, 
100                                      MSG_process_cleanup, process, 
101                                      simdata->argc, simdata->argv);
102
103   if((self=msg_global->current_process)) {
104     simdata->PPID = MSG_process_get_PID(self);
105   } else {
106     simdata->PPID = -1;
107   }
108   simdata->last_errno=MSG_OK;
109
110
111   /* Process structure */
112   process->name = xbt_strdup(name);
113   process->simdata = simdata;
114   process->data = data;
115
116   xbt_fifo_push(host->simdata->process_list, process);
117
118   /* /////////////// FIX du current_process !!! ////////////// */
119   self = msg_global->current_process;
120   xbt_context_start(process->simdata->context);
121   msg_global->current_process = self;
122
123   xbt_fifo_push(msg_global->process_list, process);
124   xbt_fifo_push(msg_global->process_to_run, process);
125
126   PAJE_PROCESS_NEW(process);
127
128   return process;
129 }
130
131 /** \ingroup m_process_management
132  * \param process poor victim
133  *
134  * This function simply kills a \a process... scarry isn't it ? :)
135  */
136 void MSG_process_kill(m_process_t process)
137 {
138   int i;
139   simdata_process_t p_simdata = process->simdata;
140   simdata_host_t h_simdata= p_simdata->host->simdata;
141   int _cursor;
142   m_process_t proc = NULL;
143
144 /*   fprintf(stderr,"Killing %s(%d) on %s.\n",process->name, */
145 /*        p_simdata->PID,p_simdata->host->name); */
146   
147   for (i=0; i<msg_global->max_channel; i++) {
148     if (h_simdata->sleeping[i] == process) {
149       h_simdata->sleeping[i] = NULL;
150       break;
151     }
152   }
153   if (i==msg_global->max_channel) {
154     if(p_simdata->waiting_task) {
155       xbt_dynar_foreach(p_simdata->waiting_task->simdata->sleeping,_cursor,proc) {
156         if(proc==process) 
157           xbt_dynar_remove_at(p_simdata->waiting_task->simdata->sleeping,_cursor,&proc);
158       }
159       if(p_simdata->waiting_task->simdata->compute)
160         surf_workstation_resource->common_public->
161           action_free(p_simdata->waiting_task->simdata->compute);
162       else if (p_simdata->waiting_task->simdata->comm) {
163         surf_workstation_resource->common_public->
164           action_change_state(p_simdata->waiting_task->simdata->comm,SURF_ACTION_FAILED);
165         surf_workstation_resource->common_public->
166           action_free(p_simdata->waiting_task->simdata->comm);
167       } else 
168         CRITICAL0("UNKNOWN STATUS. Please report this bug.");
169     } else { /* Must be trying to put a task somewhere */
170       if(process==MSG_process_self()) {
171         return;
172       } else {
173         CRITICAL0("UNKNOWN STATUS. Please report this bug.");
174       }
175     }
176   }
177
178   xbt_fifo_remove(msg_global->process_to_run,process);
179   xbt_fifo_remove(msg_global->process_list,process);
180   xbt_context_free(process->simdata->context);
181 }
182
183 /** \ingroup m_process_management
184  * \brief Migrates an agent to another location.
185  *
186  * This functions checks whether \a process and \a host are valid pointers
187    and change the value of the #m_host_t on which \a process is running.
188  */
189 MSG_error_t MSG_process_change_host(m_process_t process, m_host_t host)
190 {
191   simdata_process_t simdata = NULL;
192
193   /* Sanity check */
194
195   xbt_assert0(((process) && (process->simdata)
196           && (host)), "Invalid parameters");
197   simdata = process->simdata;
198
199   xbt_fifo_remove(simdata->host->simdata->process_list,process);
200   simdata->host = host;
201   xbt_fifo_push(host->simdata->process_list,process);
202
203   return MSG_OK;
204 }
205
206 /** \ingroup m_process_management
207  * \brief Return the user data of a #m_process_t.
208  *
209  * This functions checks whether \a process is a valid pointer or not 
210    and return the user data associated to \a process if it is possible.
211  */
212 void *MSG_process_get_data(m_process_t process)
213 {
214   xbt_assert0((process != NULL), "Invalid parameters");
215
216   return (process->data);
217 }
218
219 /** \ingroup m_process_management
220  * \brief Set the user data of a #m_process_t.
221  *
222  * This functions checks whether \a process is a valid pointer or not 
223    and set the user data associated to \a process if it is possible.
224  */
225 MSG_error_t MSG_process_set_data(m_process_t process,void *data)
226 {
227   xbt_assert0((process != NULL), "Invalid parameters");
228   xbt_assert0((process->data == NULL), "Data already set");
229   
230   process->data = data;
231    
232   return MSG_OK;
233 }
234
235 /** \ingroup m_process_management
236  * \brief Return the location on which an agent is running.
237  *
238  * This functions checks whether \a process is a valid pointer or not 
239    and return the m_host_t corresponding to the location on which \a 
240    process is running.
241  */
242 m_host_t MSG_process_get_host(m_process_t process)
243 {
244   xbt_assert0(((process != NULL) && (process->simdata)), "Invalid parameters");
245
246   return (((simdata_process_t) process->simdata)->host);
247 }
248
249 /** \ingroup m_process_management
250  *
251  * \brief Return a #m_process_t given its PID.
252  *
253  * This functions search in the list of all the created m_process_t for a m_process_t 
254    whose PID is equal to \a PID. If no host is found, \c NULL is returned. 
255    Note that the PID are uniq in the whole simulation, not only on a given host.
256  */
257 m_process_t MSG_process_from_PID(int PID)
258 {
259   xbt_fifo_item_t i = NULL;
260   m_process_t process = NULL;
261
262   xbt_fifo_foreach(msg_global->process_list,i,process,m_process_t) {
263     if(MSG_process_get_PID(process) == PID) return process;
264   }
265   return NULL;
266 }
267
268 /** \ingroup m_process_management
269  * \brief Returns the process ID of \a process.
270  *
271  * This functions checks whether \a process is a valid pointer or not 
272    and return its PID.
273  */
274 int MSG_process_get_PID(m_process_t process)
275 {
276   xbt_assert0(((process != NULL) && (process->simdata)), "Invalid parameters");
277
278   return (((simdata_process_t) process->simdata)->PID);
279 }
280
281 /** \ingroup m_process_management
282  * \brief Returns the process ID of the parent of \a process.
283  *
284  * This functions checks whether \a process is a valid pointer or not 
285    and return its PID. Returns -1 if the agent has not been created by 
286    another agent.
287  */
288 int MSG_process_get_PPID(m_process_t process)
289 {
290   xbt_assert0(((process != NULL) && (process->simdata)), "Invalid parameters");
291
292   return (((simdata_process_t) process->simdata)->PPID);
293 }
294
295 /** \ingroup m_process_management
296  * \brief Return the name of an agent.
297  *
298  * This functions checks whether \a process is a valid pointer or not 
299    and return its name.
300  */
301 const char *MSG_process_get_name(m_process_t process)
302 {
303   xbt_assert0(((process != NULL) && (process->simdata)), "Invalid parameters");
304
305   return (process->name);
306 }
307
308 /** \ingroup m_process_management
309  * \brief Return the PID of the current agent.
310  *
311  * This functions returns the PID of the currently running #m_process_t.
312  */
313 int MSG_process_self_PID(void)
314 {
315   return (MSG_process_get_PID(MSG_process_self()));
316 }
317
318 /** \ingroup m_process_management
319  * \brief Return the PPID of the current agent.
320  *
321  * This functions returns the PID of the parent of the currently
322  * running #m_process_t.
323  */
324 int MSG_process_self_PPID(void)
325 {
326   return (MSG_process_get_PPID(MSG_process_self()));
327 }
328
329 /** \ingroup m_process_management
330  * \brief Return the current agent.
331  *
332  * This functions returns the currently running #m_process_t.
333  */
334 m_process_t MSG_process_self(void)
335 {
336   return msg_global ? msg_global->current_process : NULL;
337 }
338
339 /** \ingroup m_process_management
340  * \brief Suspend the process.
341  *
342  * This functions suspend the process by suspending the task on which
343  * it was waiting for the completion.
344  */
345 MSG_error_t MSG_process_suspend(m_process_t process)
346 {
347   simdata_process_t simdata = NULL;
348   simdata_task_t simdata_task = NULL;
349
350   XBT_IN2("(%p(%s))", process, process->name);
351
352   xbt_assert0(((process) && (process->simdata)), "Invalid parameters");
353
354   PAJE_PROCESS_PUSH_STATE(process,"S");
355
356   if(process!=MSG_process_self()) {
357     simdata = process->simdata;
358     
359     xbt_assert0(simdata->waiting_task,"Process not waiting for anything else. Weird !");
360
361     simdata_task = simdata->waiting_task->simdata;
362
363     simdata->suspended = 1;
364     if(simdata->blocked) {
365       XBT_OUT;
366       return MSG_OK;
367     }
368
369     xbt_assert0(((simdata_task->compute)||(simdata_task->comm))&&
370                 !((simdata_task->compute)&&(simdata_task->comm)),
371                 "Got a problem in deciding which action to choose !");
372     simdata->suspended = 1;
373     if(simdata_task->compute) 
374       surf_workstation_resource->common_public->suspend(simdata_task->compute);
375     else
376       surf_workstation_resource->common_public->suspend(simdata_task->comm);
377   } else {
378     m_task_t dummy = MSG_TASK_UNINITIALIZED;
379     dummy = MSG_task_create("suspended", 0.0, 0, NULL);
380
381     simdata = process->simdata;
382     simdata->suspended = 1;
383     __MSG_task_execute(process,dummy);
384     surf_workstation_resource->common_public->suspend(dummy->simdata->compute);
385     __MSG_wait_for_computation(process,dummy);
386     simdata->suspended = 0;
387
388     MSG_task_destroy(dummy);
389   }
390   XBT_OUT;
391   return MSG_OK;
392 }
393
394 /** \ingroup m_process_management
395  * \brief Resume a suspended process.
396  *
397  * This functions resume a suspended process by resuming the task on
398  * which it was waiting for the completion.
399  */
400 MSG_error_t MSG_process_resume(m_process_t process)
401 {
402   simdata_process_t simdata = NULL;
403   simdata_task_t simdata_task = NULL;
404
405   xbt_assert0(((process != NULL) && (process->simdata)), "Invalid parameters");
406   CHECK_HOST();
407
408   XBT_IN2("(%p(%s))", process, process->name);
409
410   if(process == MSG_process_self()) {
411     XBT_OUT;
412     MSG_RETURN(MSG_OK);
413   }
414
415   simdata = process->simdata;
416
417   if(simdata->blocked) {
418     PAJE_PROCESS_POP_STATE(process);
419
420     simdata->suspended = 0; /* He'll wake up by itself */
421     XBT_OUT;
422     MSG_RETURN(MSG_OK);
423   }
424
425   if(!(simdata->waiting_task)) {
426     xbt_assert0(0,"Process not waiting for anything else. Weird !");
427     XBT_OUT;
428     return MSG_WARNING;
429   }
430   simdata_task = simdata->waiting_task->simdata;
431
432
433   if(simdata_task->compute) {
434     surf_workstation_resource->common_public->resume(simdata_task->compute);
435     PAJE_PROCESS_POP_STATE(process);
436   }
437   else {
438     PAJE_PROCESS_POP_STATE(process);
439     surf_workstation_resource->common_public->resume(simdata_task->comm);
440   }
441
442   XBT_OUT;
443   MSG_RETURN(MSG_OK);
444 }
445
446 /** \ingroup m_process_management
447  * \brief Returns true if the process is suspended .
448  *
449  * This checks whether a process is suspended or not by inspecting the
450  * task on which it was waiting for the completion.
451  */
452 int MSG_process_is_suspended(m_process_t process)
453 {
454   xbt_assert0(((process != NULL) && (process->simdata)), "Invalid parameters");
455
456   return (process->simdata->suspended);
457 }
458
459 static char blocked_name[512];
460
461 int __MSG_process_block(double max_duration)
462 {
463   m_process_t process = MSG_process_self();
464
465   m_task_t dummy = MSG_TASK_UNINITIALIZED;
466   snprintf(blocked_name,512,"blocked (%s:%s)",process->name,
467           process->simdata->host->name);
468
469   XBT_IN1(": max_duration=%g",max_duration);
470
471   dummy = MSG_task_create(blocked_name, 0.0, 0, NULL);
472   
473   PAJE_PROCESS_PUSH_STATE(process,"B");
474
475   process->simdata->blocked=1;
476   __MSG_task_execute(process,dummy);
477   surf_workstation_resource->common_public->suspend(dummy->simdata->compute);
478   if(max_duration>=0)
479     surf_workstation_resource->common_public->set_max_duration(dummy->simdata->compute, 
480                                                                max_duration);
481   __MSG_wait_for_computation(process,dummy);
482   MSG_task_destroy(dummy);
483   process->simdata->blocked=0;
484
485   if(process->simdata->suspended) {
486     DEBUG0("I've been suspended in the meantime");    
487     MSG_process_suspend(process);
488     DEBUG0("I've been resumed, let's keep going");    
489   }
490
491   XBT_OUT;
492   return 1;
493 }
494
495 MSG_error_t __MSG_process_unblock(m_process_t process)
496 {
497   simdata_process_t simdata = NULL;
498   simdata_task_t simdata_task = NULL;
499
500   xbt_assert0(((process != NULL) && (process->simdata)), "Invalid parameters");
501   CHECK_HOST();
502
503   XBT_IN2(": %s unblocking %s", MSG_process_self()->name,process->name);
504
505   simdata = process->simdata;
506   if(!(simdata->waiting_task)) {
507     xbt_assert0(0,"Process not waiting for anything else. Weird !");
508     XBT_OUT;
509     return MSG_WARNING;
510   }
511   simdata_task = simdata->waiting_task->simdata;
512
513   xbt_assert0(simdata->blocked,"Process not blocked");
514
515   surf_workstation_resource->common_public->resume(simdata_task->compute);
516
517   PAJE_PROCESS_POP_STATE(process);
518
519   XBT_OUT;
520
521   MSG_RETURN(MSG_OK);
522 }
523
524 int __MSG_process_isBlocked(m_process_t process)
525 {
526   xbt_assert0(((process != NULL) && (process->simdata)), "Invalid parameters");
527
528   return (process->simdata->blocked);
529 }