Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
7507c3309dddbffe95f240602a4a6e5b32959769
[simgrid.git] / src / msg / m_process.c
1 /*      $Id$     */
2
3 /* Copyright (c) 2002,2003,2004 Arnaud Legrand. All rights reserved.        */
4
5 /* This program is free software; you can redistribute it and/or modify it
6  * under the terms of the license (GNU LGPL) which comes with this package. */
7
8 #include "private.h"
9 #include "xbt/sysdep.h"
10 #include "xbt/log.h"
11 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(m_process, msg,
12                                 "Logging specific to MSG (process)");
13
14 /** \defgroup m_process_management Management Functions of Agents
15  *  \brief This section describes the agent structure of MSG
16  *  (#m_process_t) and the functions for managing it.
17  *
18  *  We need to simulate many independent scheduling decisions, so
19  *  the concept of <em>process</em> is at the heart of the
20  *  simulator. A process may be defined as a <em>code</em>, with
21  *  some <em>private data</em>, executing in a <em>location</em>.
22  *  \see m_process_t
23  */
24
25 /******************************** Process ************************************/
26 /** \ingroup m_process_management
27  * \brief Creates and runs a new #m_process_t.
28  *
29  * Does exactly the same as #MSG_process_create_with_arguments but without 
30    providing standard arguments (\a argc, \a argv, \a start_time, \a kill_time).
31  * \sa MSG_process_create_with_arguments
32  */
33 m_process_t MSG_process_create(const char *name,
34                                m_process_code_t code, void *data,
35                                m_host_t host)
36 {
37   return MSG_process_create_with_arguments(name, code, data, host, -1, NULL);
38 }
39
40 static void MSG_process_cleanup(void *arg)
41 {
42
43   while(((m_process_t)arg)->simdata->paje_state) {
44     PAJE_PROCESS_POP_STATE((m_process_t)arg);
45   }
46
47   PAJE_PROCESS_FREE(arg);
48
49   xbt_fifo_remove(msg_global->process_list, arg);
50   xbt_fifo_remove(msg_global->process_to_run, arg);
51   xbt_fifo_remove(((m_process_t) arg)->simdata->host->simdata->process_list, arg);
52   free(((m_process_t) arg)->name);
53   ((m_process_t) arg)->name = NULL;
54   free(((m_process_t) arg)->simdata);
55   ((m_process_t) arg)->simdata = NULL;
56   free(arg);
57 }
58
59 /** \ingroup m_process_management
60  * \brief Creates and runs a new #m_process_t.
61
62  * A constructor for #m_process_t taking four arguments and returning the 
63  * corresponding object. The structure (and the corresponding thread) is
64  * created, and put in the list of ready process.
65  * \param name a name for the object. It is for user-level information
66    and can be NULL.
67  * \param code is a function describing the behavior of the agent. It
68    should then only use functions described in \ref
69    m_process_management (to create a new #m_process_t for example),
70    in \ref m_host_management (only the read-only functions i.e. whose
71    name contains the word get), in \ref m_task_management (to create
72    or destroy some #m_task_t for example) and in \ref
73    msg_gos_functions (to handle file transfers and task processing).
74  * \param data a pointer to any data one may want to attach to the new
75    object.  It is for user-level information and can be NULL. It can
76    be retrieved with the function \ref MSG_process_get_data.
77  * \param host the location where the new agent is executed.
78  * \param argc first argument passed to \a code
79  * \param argv second argument passed to \a code
80  * \see m_process_t
81  * \return The new corresponding object.
82  */
83 m_process_t MSG_process_create_with_arguments(const char *name,
84                                               m_process_code_t code, void *data,
85                                               m_host_t host, int argc, char **argv)
86 {
87   simdata_process_t simdata = xbt_new0(s_simdata_process_t,1);
88   m_process_t process = xbt_new0(s_m_process_t,1);
89   m_process_t self = NULL;
90
91   xbt_assert0(((code != NULL) && (host != NULL)), "Invalid parameters");
92   /* Simulator Data */
93
94   simdata->PID = msg_global->PID++;
95   simdata->host = host;
96   simdata->waiting_task = NULL;
97   simdata->argc = argc;
98   simdata->argv = argv;
99   simdata->context = xbt_context_new(code, NULL, NULL, 
100                                      MSG_process_cleanup, process, 
101                                      simdata->argc, simdata->argv);
102
103   if((self=msg_global->current_process)) {
104     simdata->PPID = MSG_process_get_PID(self);
105   } else {
106     simdata->PPID = -1;
107   }
108   simdata->last_errno=MSG_OK;
109
110
111   /* Process structure */
112   process->name = xbt_strdup(name);
113   process->simdata = simdata;
114   process->data = data;
115
116   xbt_fifo_unshift(host->simdata->process_list, process);
117
118   /* /////////////// FIX du current_process !!! ////////////// */
119   self = msg_global->current_process;
120   xbt_context_start(process->simdata->context);
121   msg_global->current_process = self;
122
123   xbt_fifo_unshift(msg_global->process_list, process);
124   DEBUG2("Inserting %s(%s) in the to_run list",process->name,
125          host->name);
126   xbt_fifo_unshift(msg_global->process_to_run, process);
127
128   PAJE_PROCESS_NEW(process);
129
130   return process;
131 }
132
133 /** \ingroup m_process_management
134  * \param process poor victim
135  *
136  * This function simply kills a \a process... scarry isn't it ? :)
137  */
138 void MSG_process_kill(m_process_t process)
139 {
140   int i;
141   simdata_process_t p_simdata = process->simdata;
142   simdata_host_t h_simdata= p_simdata->host->simdata;
143   int _cursor;
144   m_process_t proc = NULL;
145
146 /*   fprintf(stderr,"Killing %s(%d) on %s.\n",process->name, */
147 /*        p_simdata->PID,p_simdata->host->name); */
148   
149   for (i=0; i<msg_global->max_channel; i++) {
150     if (h_simdata->sleeping[i] == process) {
151       h_simdata->sleeping[i] = NULL;
152       break;
153     }
154   }
155   if (i==msg_global->max_channel) {
156     if(p_simdata->waiting_task) {
157       xbt_dynar_foreach(p_simdata->waiting_task->simdata->sleeping,_cursor,proc) {
158         if(proc==process) 
159           xbt_dynar_remove_at(p_simdata->waiting_task->simdata->sleeping,_cursor,&proc);
160       }
161       if(p_simdata->waiting_task->simdata->compute)
162         surf_workstation_resource->common_public->
163           action_free(p_simdata->waiting_task->simdata->compute);
164       else if (p_simdata->waiting_task->simdata->comm) {
165         surf_workstation_resource->common_public->
166           action_change_state(p_simdata->waiting_task->simdata->comm,SURF_ACTION_FAILED);
167         surf_workstation_resource->common_public->
168           action_free(p_simdata->waiting_task->simdata->comm);
169       } else 
170         CRITICAL0("UNKNOWN STATUS. Please report this bug.");
171     } else { /* Must be trying to put a task somewhere */
172       if(process==MSG_process_self()) {
173         return;
174       } else {
175         CRITICAL0("UNKNOWN STATUS. Please report this bug.");
176       }
177     }
178   }
179
180   xbt_fifo_remove(msg_global->process_to_run,process);
181   xbt_fifo_remove(msg_global->process_list,process);
182   xbt_context_free(process->simdata->context);
183 }
184
185 /** \ingroup m_process_management
186  * \brief Migrates an agent to another location.
187  *
188  * This functions checks whether \a process and \a host are valid pointers
189    and change the value of the #m_host_t on which \a process is running.
190  */
191 MSG_error_t MSG_process_change_host(m_process_t process, m_host_t host)
192 {
193   simdata_process_t simdata = NULL;
194
195   /* Sanity check */
196
197   xbt_assert0(((process) && (process->simdata)
198           && (host)), "Invalid parameters");
199   simdata = process->simdata;
200
201   xbt_fifo_remove(simdata->host->simdata->process_list,process);
202   simdata->host = host;
203   xbt_fifo_unshift(host->simdata->process_list,process);
204
205   return MSG_OK;
206 }
207
208 /** \ingroup m_process_management
209  * \brief Return the user data of a #m_process_t.
210  *
211  * This functions checks whether \a process is a valid pointer or not 
212    and return the user data associated to \a process if it is possible.
213  */
214 void *MSG_process_get_data(m_process_t process)
215 {
216   xbt_assert0((process != NULL), "Invalid parameters");
217
218   return (process->data);
219 }
220
221 /** \ingroup m_process_management
222  * \brief Set the user data of a #m_process_t.
223  *
224  * This functions checks whether \a process is a valid pointer or not 
225    and set the user data associated to \a process if it is possible.
226  */
227 MSG_error_t MSG_process_set_data(m_process_t process,void *data)
228 {
229   xbt_assert0((process != NULL), "Invalid parameters");
230   xbt_assert0((process->data == NULL), "Data already set");
231   
232   process->data = data;
233    
234   return MSG_OK;
235 }
236
237 /** \ingroup m_process_management
238  * \brief Return the location on which an agent is running.
239  *
240  * This functions checks whether \a process is a valid pointer or not 
241    and return the m_host_t corresponding to the location on which \a 
242    process is running.
243  */
244 m_host_t MSG_process_get_host(m_process_t process)
245 {
246   xbt_assert0(((process != NULL) && (process->simdata)), "Invalid parameters");
247
248   return (((simdata_process_t) process->simdata)->host);
249 }
250
251 /** \ingroup m_process_management
252  *
253  * \brief Return a #m_process_t given its PID.
254  *
255  * This functions search in the list of all the created m_process_t for a m_process_t 
256    whose PID is equal to \a PID. If no host is found, \c NULL is returned. 
257    Note that the PID are uniq in the whole simulation, not only on a given host.
258  */
259 m_process_t MSG_process_from_PID(int PID)
260 {
261   xbt_fifo_item_t i = NULL;
262   m_process_t process = NULL;
263
264   xbt_fifo_foreach(msg_global->process_list,i,process,m_process_t) {
265     if(MSG_process_get_PID(process) == PID) return process;
266   }
267   return NULL;
268 }
269
270 /** \ingroup m_process_management
271  * \brief Returns the process ID of \a process.
272  *
273  * This functions checks whether \a process is a valid pointer or not 
274    and return its PID.
275  */
276 int MSG_process_get_PID(m_process_t process)
277 {
278   xbt_assert0(((process != NULL) && (process->simdata)), "Invalid parameters");
279
280   return (((simdata_process_t) process->simdata)->PID);
281 }
282
283 /** \ingroup m_process_management
284  * \brief Returns the process ID of the parent of \a process.
285  *
286  * This functions checks whether \a process is a valid pointer or not 
287    and return its PID. Returns -1 if the agent has not been created by 
288    another agent.
289  */
290 int MSG_process_get_PPID(m_process_t process)
291 {
292   xbt_assert0(((process != NULL) && (process->simdata)), "Invalid parameters");
293
294   return (((simdata_process_t) process->simdata)->PPID);
295 }
296
297 /** \ingroup m_process_management
298  * \brief Return the name of an agent.
299  *
300  * This functions checks whether \a process is a valid pointer or not 
301    and return its name.
302  */
303 const char *MSG_process_get_name(m_process_t process)
304 {
305   xbt_assert0(((process != NULL) && (process->simdata)), "Invalid parameters");
306
307   return (process->name);
308 }
309
310 /** \ingroup m_process_management
311  * \brief Return the PID of the current agent.
312  *
313  * This functions returns the PID of the currently running #m_process_t.
314  */
315 int MSG_process_self_PID(void)
316 {
317   return (MSG_process_get_PID(MSG_process_self()));
318 }
319
320 /** \ingroup m_process_management
321  * \brief Return the PPID of the current agent.
322  *
323  * This functions returns the PID of the parent of the currently
324  * running #m_process_t.
325  */
326 int MSG_process_self_PPID(void)
327 {
328   return (MSG_process_get_PPID(MSG_process_self()));
329 }
330
331 /** \ingroup m_process_management
332  * \brief Return the current agent.
333  *
334  * This functions returns the currently running #m_process_t.
335  */
336 m_process_t MSG_process_self(void)
337 {
338   return msg_global ? msg_global->current_process : NULL;
339 }
340
341 /** \ingroup m_process_management
342  * \brief Suspend the process.
343  *
344  * This functions suspend the process by suspending the task on which
345  * it was waiting for the completion.
346  */
347 MSG_error_t MSG_process_suspend(m_process_t process)
348 {
349   simdata_process_t simdata = NULL;
350   simdata_task_t simdata_task = NULL;
351
352   XBT_IN2("(%p(%s))", process, process->name);
353
354   xbt_assert0(((process) && (process->simdata)), "Invalid parameters");
355
356   PAJE_PROCESS_PUSH_STATE(process,"S");
357
358   if(process!=MSG_process_self()) {
359     simdata = process->simdata;
360     
361     xbt_assert0(simdata->waiting_task,"Process not waiting for anything else. Weird !");
362
363     simdata_task = simdata->waiting_task->simdata;
364
365     simdata->suspended = 1;
366     if(simdata->blocked) {
367       XBT_OUT;
368       return MSG_OK;
369     }
370
371     xbt_assert0(((simdata_task->compute)||(simdata_task->comm))&&
372                 !((simdata_task->compute)&&(simdata_task->comm)),
373                 "Got a problem in deciding which action to choose !");
374     simdata->suspended = 1;
375     if(simdata_task->compute) 
376       surf_workstation_resource->common_public->suspend(simdata_task->compute);
377     else
378       surf_workstation_resource->common_public->suspend(simdata_task->comm);
379   } else {
380     m_task_t dummy = MSG_TASK_UNINITIALIZED;
381     dummy = MSG_task_create("suspended", 0.0, 0, NULL);
382
383     simdata = process->simdata;
384     simdata->suspended = 1;
385     __MSG_task_execute(process,dummy);
386     surf_workstation_resource->common_public->suspend(dummy->simdata->compute);
387     __MSG_wait_for_computation(process,dummy);
388     simdata->suspended = 0;
389
390     MSG_task_destroy(dummy);
391   }
392   XBT_OUT;
393   return MSG_OK;
394 }
395
396 /** \ingroup m_process_management
397  * \brief Resume a suspended process.
398  *
399  * This functions resume a suspended process by resuming the task on
400  * which it was waiting for the completion.
401  */
402 MSG_error_t MSG_process_resume(m_process_t process)
403 {
404   simdata_process_t simdata = NULL;
405   simdata_task_t simdata_task = NULL;
406
407   xbt_assert0(((process != NULL) && (process->simdata)), "Invalid parameters");
408   CHECK_HOST();
409
410   XBT_IN2("(%p(%s))", process, process->name);
411
412   if(process == MSG_process_self()) {
413     XBT_OUT;
414     MSG_RETURN(MSG_OK);
415   }
416
417   simdata = process->simdata;
418
419   if(simdata->blocked) {
420     PAJE_PROCESS_POP_STATE(process);
421
422     simdata->suspended = 0; /* He'll wake up by itself */
423     XBT_OUT;
424     MSG_RETURN(MSG_OK);
425   }
426
427   if(!(simdata->waiting_task)) {
428     xbt_assert0(0,"Process not waiting for anything else. Weird !");
429     XBT_OUT;
430     return MSG_WARNING;
431   }
432   simdata_task = simdata->waiting_task->simdata;
433
434
435   if(simdata_task->compute) {
436     surf_workstation_resource->common_public->resume(simdata_task->compute);
437     PAJE_PROCESS_POP_STATE(process);
438   }
439   else {
440     PAJE_PROCESS_POP_STATE(process);
441     surf_workstation_resource->common_public->resume(simdata_task->comm);
442   }
443
444   XBT_OUT;
445   MSG_RETURN(MSG_OK);
446 }
447
448 /** \ingroup m_process_management
449  * \brief Returns true if the process is suspended .
450  *
451  * This checks whether a process is suspended or not by inspecting the
452  * task on which it was waiting for the completion.
453  */
454 int MSG_process_is_suspended(m_process_t process)
455 {
456   xbt_assert0(((process != NULL) && (process->simdata)), "Invalid parameters");
457
458   return (process->simdata->suspended);
459 }
460
461 static char blocked_name[512];
462
463 int __MSG_process_block(double max_duration)
464 {
465   m_process_t process = MSG_process_self();
466
467   m_task_t dummy = MSG_TASK_UNINITIALIZED;
468   snprintf(blocked_name,512,"blocked (%s:%s)",process->name,
469           process->simdata->host->name);
470
471   XBT_IN1(": max_duration=%g",max_duration);
472
473   dummy = MSG_task_create(blocked_name, 0.0, 0, NULL);
474   
475   PAJE_PROCESS_PUSH_STATE(process,"B");
476
477   process->simdata->blocked=1;
478   __MSG_task_execute(process,dummy);
479   surf_workstation_resource->common_public->suspend(dummy->simdata->compute);
480   if(max_duration>=0)
481     surf_workstation_resource->common_public->set_max_duration(dummy->simdata->compute, 
482                                                                max_duration);
483   __MSG_wait_for_computation(process,dummy);
484   MSG_task_destroy(dummy);
485   process->simdata->blocked=0;
486
487   if(process->simdata->suspended) {
488     DEBUG0("I've been suspended in the meantime");    
489     MSG_process_suspend(process);
490     DEBUG0("I've been resumed, let's keep going");    
491   }
492
493   XBT_OUT;
494   return 1;
495 }
496
497 MSG_error_t __MSG_process_unblock(m_process_t process)
498 {
499   simdata_process_t simdata = NULL;
500   simdata_task_t simdata_task = NULL;
501
502   xbt_assert0(((process != NULL) && (process->simdata)), "Invalid parameters");
503   CHECK_HOST();
504
505   XBT_IN2(": %s unblocking %s", MSG_process_self()->name,process->name);
506
507   simdata = process->simdata;
508   if(!(simdata->waiting_task)) {
509     xbt_assert0(0,"Process not waiting for anything else. Weird !");
510     XBT_OUT;
511     return MSG_WARNING;
512   }
513   simdata_task = simdata->waiting_task->simdata;
514
515   xbt_assert0(simdata->blocked,"Process not blocked");
516
517   surf_workstation_resource->common_public->resume(simdata_task->compute);
518
519   PAJE_PROCESS_POP_STATE(process);
520
521   XBT_OUT;
522
523   MSG_RETURN(MSG_OK);
524 }
525
526 int __MSG_process_isBlocked(m_process_t process)
527 {
528   xbt_assert0(((process != NULL) && (process->simdata)), "Invalid parameters");
529
530   return (process->simdata->blocked);
531 }