Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
06f85c3810baf994665201c3b113c47d514bd295
[simgrid.git] / src / msg / m_process.c
1 /*      $Id$     */
2
3 /* Copyright (c) 2002,2003,2004 Arnaud Legrand. All rights reserved.        */
4
5 /* This program is free software; you can redistribute it and/or modify it
6  * under the terms of the license (GNU LGPL) which comes with this package. */
7
8 #include "private.h"
9 #include "xbt/sysdep.h"
10 #include "xbt/log.h"
11 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(msg_process, msg,
12                                 "Logging specific to MSG (process)");
13
14 /** \defgroup m_process_management Management Functions of Agents
15  *  \brief This section describes the agent structure of MSG
16  *  (#m_process_t) and the functions for managing it.
17  *    \htmlonly <!-- DOXYGEN_NAVBAR_LABEL="Agents" --> \endhtmlonly
18  * 
19  *  We need to simulate many independent scheduling decisions, so
20  *  the concept of <em>process</em> is at the heart of the
21  *  simulator. A process may be defined as a <em>code</em>, with
22  *  some <em>private data</em>, executing in a <em>location</em>.
23  *  \see m_process_t
24  */
25
26 /******************************** Process ************************************/
27 /** \ingroup m_process_management
28  * \brief Creates and runs a new #m_process_t.
29  *
30  * Does exactly the same as #MSG_process_create_with_arguments but without 
31    providing standard arguments (\a argc, \a argv, \a start_time, \a kill_time).
32  * \sa MSG_process_create_with_arguments
33  */
34 m_process_t MSG_process_create(const char *name,
35                                m_process_code_t code, void *data,
36                                m_host_t host)
37 {
38   return MSG_process_create_with_arguments(name, code, data, host, -1, NULL);
39 }
40
41 static void MSG_process_cleanup(void *arg)
42 {
43
44   while(((m_process_t)arg)->simdata->paje_state) {
45     PAJE_PROCESS_POP_STATE((m_process_t)arg);
46   }
47
48   PAJE_PROCESS_FREE(arg);
49
50   xbt_fifo_remove(msg_global->process_list, arg);
51   xbt_fifo_remove(msg_global->process_to_run, arg);
52   xbt_fifo_remove(((m_process_t) arg)->simdata->host->simdata->process_list, arg);
53   free(((m_process_t) arg)->name);
54   ((m_process_t) arg)->name = NULL;
55   free(((m_process_t) arg)->simdata);
56   ((m_process_t) arg)->simdata = NULL;
57   free(arg);
58 }
59
60 /** \ingroup m_process_management
61  * \brief Creates and runs a new #m_process_t.
62
63  * A constructor for #m_process_t taking four arguments and returning the 
64  * corresponding object. The structure (and the corresponding thread) is
65  * created, and put in the list of ready process.
66  * \param name a name for the object. It is for user-level information
67    and can be NULL.
68  * \param code is a function describing the behavior of the agent. It
69    should then only use functions described in \ref
70    m_process_management (to create a new #m_process_t for example),
71    in \ref m_host_management (only the read-only functions i.e. whose
72    name contains the word get), in \ref m_task_management (to create
73    or destroy some #m_task_t for example) and in \ref
74    msg_gos_functions (to handle file transfers and task processing).
75  * \param data a pointer to any data one may want to attach to the new
76    object.  It is for user-level information and can be NULL. It can
77    be retrieved with the function \ref MSG_process_get_data.
78  * \param host the location where the new agent is executed.
79  * \param argc first argument passed to \a code
80  * \param argv second argument passed to \a code
81  * \see m_process_t
82  * \return The new corresponding object.
83  */
84 m_process_t MSG_process_create_with_arguments(const char *name,
85                                               m_process_code_t code, void *data,
86                                               m_host_t host, int argc, char **argv)
87 {
88   simdata_process_t simdata = xbt_new0(s_simdata_process_t,1);
89   m_process_t process = xbt_new0(s_m_process_t,1);
90   m_process_t self = NULL;
91
92   xbt_assert0(((code != NULL) && (host != NULL)), "Invalid parameters");
93   /* Simulator Data */
94
95   simdata->PID = msg_global->PID++;
96   simdata->host = host;
97   simdata->waiting_task = NULL;
98   simdata->argc = argc;
99   simdata->argv = argv;
100   simdata->context = xbt_context_new(code, NULL, NULL, 
101                                      MSG_process_cleanup, process, 
102                                      simdata->argc, simdata->argv);
103
104   if((self=msg_global->current_process)) {
105     simdata->PPID = MSG_process_get_PID(self);
106   } else {
107     simdata->PPID = -1;
108   }
109   simdata->last_errno=MSG_OK;
110
111
112   /* Process structure */
113   process->name = xbt_strdup(name);
114   process->simdata = simdata;
115   process->data = data;
116
117   xbt_fifo_unshift(host->simdata->process_list, process);
118
119   /* *************** FIX du current_process !!! *************** */
120   self = msg_global->current_process;
121   xbt_context_start(process->simdata->context);
122   msg_global->current_process = self;
123
124   xbt_fifo_unshift(msg_global->process_list, process);
125   DEBUG2("Inserting %s(%s) in the to_run list",process->name,
126          host->name);
127   xbt_fifo_unshift(msg_global->process_to_run, process);
128
129   PAJE_PROCESS_NEW(process);
130
131   return process;
132 }
133
134 /** \ingroup m_process_management
135  * \param process poor victim
136  *
137  * This function simply kills a \a process... scarry isn't it ? :)
138  */
139 void MSG_process_kill(m_process_t process)
140 {
141   int i;
142   simdata_process_t p_simdata = process->simdata;
143   simdata_host_t h_simdata= p_simdata->host->simdata;
144   int _cursor;
145   m_process_t proc = NULL;
146
147   DEBUG3("Killing %s(%d) on %s",process->name, p_simdata->PID,
148          p_simdata->host->name);
149   
150   for (i=0; i<msg_global->max_channel; i++) {
151     if (h_simdata->sleeping[i] == process) {
152       h_simdata->sleeping[i] = NULL;
153       break;
154     }
155   }
156   
157   if(p_simdata->waiting_task) {
158     xbt_dynar_foreach(p_simdata->waiting_task->simdata->sleeping,_cursor,proc) {
159       if(proc==process) 
160         xbt_dynar_remove_at(p_simdata->waiting_task->simdata->sleeping,_cursor,&proc);
161     }
162     if(p_simdata->waiting_task->simdata->compute)
163       surf_workstation_resource->common_public->
164         action_free(p_simdata->waiting_task->simdata->compute);
165     else if (p_simdata->waiting_task->simdata->comm) {
166       surf_workstation_resource->common_public->
167         action_change_state(p_simdata->waiting_task->simdata->comm,SURF_ACTION_FAILED);
168       surf_workstation_resource->common_public->
169         action_free(p_simdata->waiting_task->simdata->comm);
170     } else {
171       xbt_die("UNKNOWN STATUS. Please report this bug.");
172     }
173   }
174
175   if ((i==msg_global->max_channel) && (process!=MSG_process_self()) && 
176       (!p_simdata->waiting_task)) {
177     xbt_die("UNKNOWN STATUS. Please report this bug.");
178   }
179
180   xbt_fifo_remove(msg_global->process_to_run,process);
181   xbt_fifo_remove(msg_global->process_list,process);
182   xbt_context_free(process->simdata->context);
183
184   if(process==MSG_process_self()) {
185     /* I just killed myself */
186     xbt_context_yield();
187   }
188 }
189
190 /** \ingroup m_process_management
191  * \brief Migrates an agent to another location.
192  *
193  * This functions checks whether \a process and \a host are valid pointers
194    and change the value of the #m_host_t on which \a process is running.
195  */
196 MSG_error_t MSG_process_change_host(m_process_t process, m_host_t host)
197 {
198   simdata_process_t simdata = NULL;
199
200   /* Sanity check */
201
202   xbt_assert0(((process) && (process->simdata)
203           && (host)), "Invalid parameters");
204   simdata = process->simdata;
205
206   xbt_fifo_remove(simdata->host->simdata->process_list,process);
207   simdata->host = host;
208   xbt_fifo_unshift(host->simdata->process_list,process);
209
210   return MSG_OK;
211 }
212
213 /** \ingroup m_process_management
214  * \brief Return the user data of a #m_process_t.
215  *
216  * This functions checks whether \a process is a valid pointer or not 
217    and return the user data associated to \a process if it is possible.
218  */
219 void *MSG_process_get_data(m_process_t process)
220 {
221   xbt_assert0((process != NULL), "Invalid parameters");
222
223   return (process->data);
224 }
225
226 /** \ingroup m_process_management
227  * \brief Set the user data of a #m_process_t.
228  *
229  * This functions checks whether \a process is a valid pointer or not 
230    and set the user data associated to \a process if it is possible.
231  */
232 MSG_error_t MSG_process_set_data(m_process_t process,void *data)
233 {
234   xbt_assert0((process != NULL), "Invalid parameters");
235   xbt_assert0((process->data == NULL), "Data already set");
236   
237   process->data = data;
238    
239   return MSG_OK;
240 }
241
242 /** \ingroup m_process_management
243  * \brief Return the location on which an agent is running.
244  *
245  * This functions checks whether \a process is a valid pointer or not 
246    and return the m_host_t corresponding to the location on which \a 
247    process is running.
248  */
249 m_host_t MSG_process_get_host(m_process_t process)
250 {
251   xbt_assert0(((process != NULL) && (process->simdata)), "Invalid parameters");
252
253   return (((simdata_process_t) process->simdata)->host);
254 }
255
256 /** \ingroup m_process_management
257  *
258  * \brief Return a #m_process_t given its PID.
259  *
260  * This functions search in the list of all the created m_process_t for a m_process_t 
261    whose PID is equal to \a PID. If no host is found, \c NULL is returned. 
262    Note that the PID are uniq in the whole simulation, not only on a given host.
263  */
264 m_process_t MSG_process_from_PID(int PID)
265 {
266   xbt_fifo_item_t i = NULL;
267   m_process_t process = NULL;
268
269   xbt_fifo_foreach(msg_global->process_list,i,process,m_process_t) {
270     if(MSG_process_get_PID(process) == PID) return process;
271   }
272   return NULL;
273 }
274
275 /** \ingroup m_process_management
276  * \brief Returns the process ID of \a process.
277  *
278  * This functions checks whether \a process is a valid pointer or not 
279    and return its PID.
280  */
281 int MSG_process_get_PID(m_process_t process)
282 {
283   xbt_assert0(((process != NULL) && (process->simdata)), "Invalid parameters");
284
285   return (((simdata_process_t) process->simdata)->PID);
286 }
287
288 /** \ingroup m_process_management
289  * \brief Returns the process ID of the parent of \a process.
290  *
291  * This functions checks whether \a process is a valid pointer or not 
292    and return its PID. Returns -1 if the agent has not been created by 
293    another agent.
294  */
295 int MSG_process_get_PPID(m_process_t process)
296 {
297   xbt_assert0(((process != NULL) && (process->simdata)), "Invalid parameters");
298
299   return (((simdata_process_t) process->simdata)->PPID);
300 }
301
302 /** \ingroup m_process_management
303  * \brief Return the name of an agent.
304  *
305  * This functions checks whether \a process is a valid pointer or not 
306    and return its name.
307  */
308 const char *MSG_process_get_name(m_process_t process)
309 {
310   xbt_assert0(((process != NULL) && (process->simdata)), "Invalid parameters");
311
312   return (process->name);
313 }
314
315 /** \ingroup m_process_management
316  * \brief Return the PID of the current agent.
317  *
318  * This functions returns the PID of the currently running #m_process_t.
319  */
320 int MSG_process_self_PID(void)
321 {
322   return (MSG_process_get_PID(MSG_process_self()));
323 }
324
325 /** \ingroup m_process_management
326  * \brief Return the PPID of the current agent.
327  *
328  * This functions returns the PID of the parent of the currently
329  * running #m_process_t.
330  */
331 int MSG_process_self_PPID(void)
332 {
333   return (MSG_process_get_PPID(MSG_process_self()));
334 }
335
336 /** \ingroup m_process_management
337  * \brief Return the current agent.
338  *
339  * This functions returns the currently running #m_process_t.
340  */
341 m_process_t MSG_process_self(void)
342 {
343   return msg_global ? msg_global->current_process : NULL;
344 }
345
346 /** \ingroup m_process_management
347  * \brief Suspend the process.
348  *
349  * This functions suspend the process by suspending the task on which
350  * it was waiting for the completion.
351  */
352 MSG_error_t MSG_process_suspend(m_process_t process)
353 {
354   simdata_process_t simdata = NULL;
355   simdata_task_t simdata_task = NULL;
356
357   XBT_IN2("(%p(%s))", process, process->name);
358
359   xbt_assert0(((process) && (process->simdata)), "Invalid parameters");
360
361   PAJE_PROCESS_PUSH_STATE(process,"S",NULL);
362
363   if(process!=MSG_process_self()) {
364     simdata = process->simdata;
365     
366     xbt_assert0(simdata->waiting_task,"Process not waiting for anything else. Weird !");
367
368     simdata_task = simdata->waiting_task->simdata;
369
370     simdata->suspended = 1;
371     if(simdata->blocked) {
372       XBT_OUT;
373       return MSG_OK;
374     }
375
376     xbt_assert0(((simdata_task->compute)||(simdata_task->comm))&&
377                 !((simdata_task->compute)&&(simdata_task->comm)),
378                 "Got a problem in deciding which action to choose !");
379     simdata->suspended = 1;
380     if(simdata_task->compute) 
381       surf_workstation_resource->common_public->suspend(simdata_task->compute);
382     else
383       surf_workstation_resource->common_public->suspend(simdata_task->comm);
384   } else {
385     m_task_t dummy = MSG_TASK_UNINITIALIZED;
386     dummy = MSG_task_create("suspended", 0.0, 0, NULL);
387
388     simdata = process->simdata;
389     simdata->suspended = 1;
390     __MSG_task_execute(process,dummy);
391     surf_workstation_resource->common_public->suspend(dummy->simdata->compute);
392     __MSG_wait_for_computation(process,dummy);
393     simdata->suspended = 0;
394
395     MSG_task_destroy(dummy);
396   }
397   XBT_OUT;
398   return MSG_OK;
399 }
400
401 /** \ingroup m_process_management
402  * \brief Resume a suspended process.
403  *
404  * This functions resume a suspended process by resuming the task on
405  * which it was waiting for the completion.
406  */
407 MSG_error_t MSG_process_resume(m_process_t process)
408 {
409   simdata_process_t simdata = NULL;
410   simdata_task_t simdata_task = NULL;
411
412   xbt_assert0(((process != NULL) && (process->simdata)), "Invalid parameters");
413   CHECK_HOST();
414
415   XBT_IN2("(%p(%s))", process, process->name);
416
417   if(process == MSG_process_self()) {
418     XBT_OUT;
419     MSG_RETURN(MSG_OK);
420   }
421
422   simdata = process->simdata;
423
424   if(simdata->blocked) {
425     PAJE_PROCESS_POP_STATE(process);
426
427     simdata->suspended = 0; /* He'll wake up by itself */
428     XBT_OUT;
429     MSG_RETURN(MSG_OK);
430   }
431
432   if(!(simdata->waiting_task)) {
433     xbt_assert0(0,"Process not waiting for anything else. Weird !");
434     XBT_OUT;
435     return MSG_WARNING;
436   }
437   simdata_task = simdata->waiting_task->simdata;
438
439
440   if(simdata_task->compute) {
441     surf_workstation_resource->common_public->resume(simdata_task->compute);
442     PAJE_PROCESS_POP_STATE(process);
443   }
444   else {
445     PAJE_PROCESS_POP_STATE(process);
446     surf_workstation_resource->common_public->resume(simdata_task->comm);
447   }
448
449   XBT_OUT;
450   MSG_RETURN(MSG_OK);
451 }
452
453 /** \ingroup m_process_management
454  * \brief Returns true if the process is suspended .
455  *
456  * This checks whether a process is suspended or not by inspecting the
457  * task on which it was waiting for the completion.
458  */
459 int MSG_process_is_suspended(m_process_t process)
460 {
461   xbt_assert0(((process != NULL) && (process->simdata)), "Invalid parameters");
462
463   return (process->simdata->suspended);
464 }
465
466 int __MSG_process_block(double max_duration, const char *info)
467 {
468   m_process_t process = MSG_process_self();
469   m_task_t dummy = MSG_TASK_UNINITIALIZED;
470   char blocked_name[512];
471   snprintf(blocked_name,512,"blocked [%s] (%s:%s)",
472           info, process->name, process->simdata->host->name);
473
474   XBT_IN1(": max_duration=%g",max_duration);
475
476   dummy = MSG_task_create(blocked_name, 0.0, 0, NULL);
477   
478   PAJE_PROCESS_PUSH_STATE(process,"B",NULL);
479
480   process->simdata->blocked=1;
481   __MSG_task_execute(process,dummy);
482   surf_workstation_resource->common_public->suspend(dummy->simdata->compute);
483   if(max_duration>=0)
484     surf_workstation_resource->common_public->set_max_duration(dummy->simdata->compute, 
485                                                                max_duration);
486   __MSG_wait_for_computation(process,dummy);
487   MSG_task_destroy(dummy);
488   process->simdata->blocked=0;
489
490   if(process->simdata->suspended) {
491     DEBUG0("I've been suspended in the meantime");    
492     MSG_process_suspend(process);
493     DEBUG0("I've been resumed, let's keep going");    
494   }
495
496   XBT_OUT;
497   return 1;
498 }
499
500 MSG_error_t __MSG_process_unblock(m_process_t process)
501 {
502   simdata_process_t simdata = NULL;
503   simdata_task_t simdata_task = NULL;
504
505   xbt_assert0(((process != NULL) && (process->simdata)), "Invalid parameters");
506   CHECK_HOST();
507
508   XBT_IN2(": %s unblocking %s", MSG_process_self()->name,process->name);
509
510   simdata = process->simdata;
511   if(!(simdata->waiting_task)) {
512     xbt_assert0(0,"Process not waiting for anything else. Weird !");
513     XBT_OUT;
514     return MSG_WARNING;
515   }
516   simdata_task = simdata->waiting_task->simdata;
517
518   xbt_assert0(simdata->blocked,"Process not blocked");
519
520   surf_workstation_resource->common_public->resume(simdata_task->compute);
521
522   PAJE_PROCESS_POP_STATE(process);
523
524   XBT_OUT;
525
526   MSG_RETURN(MSG_OK);
527 }
528
529 int __MSG_process_isBlocked(m_process_t process)
530 {
531   xbt_assert0(((process != NULL) && (process->simdata)), "Invalid parameters");
532
533   return (process->simdata->blocked);
534 }