Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
611857398a3e0cb38bc5ac5a43e1751265a3bbd4
[simgrid.git] / src / msg / global.c
1 /*      $Id$     */
2
3 /* Copyright (c) 2002,2003,2004 Arnaud Legrand. All rights reserved.        */
4
5 /* This program is free software; you can redistribute it and/or modify it
6  * under the terms of the license (GNU LGPL) which comes with this package. */
7
8 #include "private.h"
9 #include "xbt/sysdep.h"
10 #include "xbt/log.h"
11 #include "xbt/ex.h" /* ex_backtrace_display */
12 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(msg_kernel, msg,
13                                 "Logging specific to MSG (kernel)");
14
15 int __stop_at_time = -1.0 ;
16
17 MSG_Global_t msg_global = NULL;
18
19 /* static void MarkAsFailed(m_task_t t, TBX_HashTable_t failedProcessList); */
20 /* static xbt_fifo_t MSG_buildFailedHostList(double a, double b); */
21
22 /** \defgroup msg_simulation   MSG simulation Functions
23  *  \brief This section describes the functions you need to know to
24  *  set up a simulation. You should have a look at \ref MSG_examples 
25  *  to have an overview of their usage.
26  *    \htmlonly <!-- DOXYGEN_NAVBAR_LABEL="Simulation functions" --> \endhtmlonly
27  */
28
29 /********************************* MSG **************************************/
30
31 /** \ingroup msg_simulation
32  * \brief Initialize some MSG internal data.
33  */
34 void MSG_global_init_args(int *argc, char **argv)
35 {
36   MSG_global_init(argc,argv);
37 }
38
39 /** \ingroup msg_simulation
40  * \brief Initialize some MSG internal data.
41  */
42 void MSG_global_init(int *argc, char **argv)
43 {
44   if (!msg_global) {
45     surf_init(argc, argv);      /* Initialize some common structures. Warning, it sets msg_global=NULL */
46      
47     msg_global = xbt_new0(s_MSG_Global_t,1);
48
49     xbt_context_init();
50     msg_global->host = xbt_fifo_new();
51     msg_global->process_to_run = xbt_fifo_new();
52     msg_global->process_list = xbt_fifo_new();
53     msg_global->max_channel = 0;
54     msg_global->current_process = NULL;
55     msg_global->registered_functions = xbt_dict_new();
56     msg_global->PID = 1;
57     msg_global->task_mallocator = xbt_mallocator_new(256,
58                                                      (pvoid_f_void_t*) task_mallocator_new_f,
59                                                      (void_f_pvoid_t*) task_mallocator_free_f,
60                                                      (void_f_pvoid_t*) task_mallocator_reset_f);
61   }
62 }
63
64 /** \ingroup msg_easier_life
65  * \brief Traces MSG events in the Paje format.
66  */
67 void MSG_paje_output(const char *filename)
68 {
69   int i;
70   const char *paje_preembule=
71     "%EventDef  PajeDefineContainerType 1\n"
72     "%  NewType string\n"
73     "%  ContainerType   string\n"
74     "%  NewName string\n"
75     "%EndEventDef\n"
76     "%EventDef  PajeDefineStateType     3\n"
77     "%  NewType string\n"
78     "%  ContainerType   string\n"
79     "%  NewName string\n"
80     "%EndEventDef\n"
81     "%EventDef  PajeDefineLinkType      5\n"
82     "%  NewType string\n"
83     "%  ContainerType   string\n"
84     "%  SourceContainerType     string\n"
85     "%  DestContainerType       string\n"
86     "%  NewName string\n"
87     "%EndEventDef\n"
88     "%EventDef  PajeDefineEntityValue   6\n"
89     "%  NewValue        string\n"
90     "%  EntityType      string\n"
91     "%  NewName string\n"
92     "%EndEventDef\n"
93     "%EventDef  PajeCreateContainer     7\n"
94     "%  Time    date\n"
95     "%  NewContainer    string\n"
96     "%  NewContainerType        string\n"
97     "%  Container       string\n"
98     "%  NewName string\n"
99     "%EndEventDef\n"
100     "%EventDef  PajeDestroyContainer    8\n"
101     "%  Time    date\n"
102     "%  Name    string\n"
103     "%  Type    string\n"
104     "%EndEventDef\n"
105     "%EventDef  PajeSetState    10\n"
106     "%  Time    date\n"
107     "%  EntityType      string\n"
108     "%  Container       string\n"
109     "%  Value   string\n"
110     "%EndEventDef\n"
111     "%EventDef  PajePushState   11\n"
112     "%  Time    date\n"
113     "%  EntityType      string\n"
114     "%  Container       string\n"
115     "%  Value   string\n"
116     "%  TaskName        string\n"
117     "%EndEventDef\n"
118     "%EventDef  PajePopState    12\n"
119     "%  Time    date\n"
120     "%  EntityType      string\n"
121     "%  Container       string\n"
122     "%EndEventDef\n"
123     "%EventDef  PajeStartLink   16\n"
124     "%  Time    date\n"
125     "%  EntityType      string\n"
126     "%  Container       string\n"
127     "%  Value   string\n"
128     "%  SourceContainer string\n"
129     "%  Key     string\n"
130     "%EndEventDef\n"
131     "%EventDef  PajeEndLink     17\n"
132     "%  Time    date\n"
133     "%  EntityType      string\n"
134     "%  Container       string\n"
135     "%  Value   string\n"
136     "%  DestContainer   string\n"
137     "%  Key     string\n"
138     "%EndEventDef\n";
139
140   const char *type_definitions = "1     Sim_t   0       Simulation_t\n"
141     "1  H_t     Sim_t   m_host_t\n"
142     "1  P_t     H_t     m_process_t\n"
143     "3  S_t     P_t     \"Process State\"\n"
144     "6  E       S_t     Executing\n"
145     "6  B       S_t     Blocked\n"
146     "6  C       S_t     Communicating\n"
147     "5  Comm    Sim_t   P_t     P_t     Communication_t\n";
148
149   const char *ext = ".trace";
150   int ext_len = strlen(ext);
151   int len;
152   m_host_t host;
153   m_process_t process;
154   xbt_fifo_item_t item = NULL;
155
156   xbt_assert0(msg_global, "Initialize MSG first\n");
157   xbt_assert0(!msg_global->paje_output, "Paje output already defined\n");
158   xbt_assert0(filename, "Need a real file name\n");
159
160   len = strlen(filename);
161   if((len<ext_len) || (strncmp(filename+len-ext_len,ext,ext_len))) {
162     CRITICAL2("The name of the Paje trace file \"%s\" does not end by \"%s\". Paje will cause difficulties to read it.\n",
163               filename,ext);
164   }
165
166   msg_global->paje_output=fopen(filename,"w");
167   xbt_assert1(msg_global->paje_output, "Failed to open %s \n",filename);
168
169   fprintf(msg_global->paje_output,"%s",paje_preembule);
170   fprintf(msg_global->paje_output,"%s",type_definitions);
171
172   /*    Channels    */
173   for(i=0; i<msg_global->max_channel; i++) {
174     fprintf(msg_global->paje_output, "6 COMM_%d Comm    \"Channel %d\"\n" ,i,i);
175   }
176   fprintf(msg_global->paje_output,
177           "7    0.0     CUR     Sim_t   0       \"MSG simulation\"\n");
178
179   /*    Hosts       */
180   xbt_fifo_foreach(msg_global->host,item,host,m_host_t) {
181     PAJE_HOST_NEW(host);
182   }
183
184   /*    Process       */
185   xbt_fifo_foreach(msg_global->process_list,item,process,m_process_t) {
186     PAJE_PROCESS_NEW(process);
187   }
188 }
189
190 /** \defgroup m_channel_management    Understanding channels
191  *  \brief This section briefly describes the channel notion of MSG
192  *  (#m_channel_t).
193  *    \htmlonly <!-- DOXYGEN_NAVBAR_LABEL="Channels" --> \endhtmlonly
194  * 
195  *
196  *  For convenience, the simulator provides the notion of channel
197  *  that is close to the tag notion in MPI. A channel is not a
198  *  socket. It doesn't need to be opened neither closed. It rather
199  *  corresponds to the ports opened on the different machines.
200  */
201
202
203 /** \ingroup m_channel_management
204  * \brief Set the number of channel in the simulation.
205  *
206  * This function has to be called to fix the number of channel in the
207    simulation before creating any host. Indeed, each channel is
208    represented by a different mailbox on each #m_host_t. This
209    function can then be called only once. This function takes only one
210    parameter.
211  * \param number the number of channel in the simulation. It has to be >0
212  */
213 MSG_error_t MSG_set_channel_number(int number)
214 {
215   xbt_assert0((msg_global) && (msg_global->max_channel == 0), "Channel number already set!");
216
217   msg_global->max_channel = number;
218
219   return MSG_OK;
220 }
221
222 /** \ingroup m_channel_management
223  * \brief Return the number of channel in the simulation.
224  *
225  * This function has to be called once the number of channel is fixed. I can't 
226    figure out a reason why anyone would like to call this function but nevermind.
227  * \return the number of channel in the simulation.
228  */
229 int MSG_get_channel_number(void)
230 {
231   xbt_assert0((msg_global)&&(msg_global->max_channel != 0), "Channel number not set yet!");
232
233   return msg_global->max_channel;
234 }
235
236 void __MSG_display_process_status(void)
237 {
238    m_process_t process = NULL;
239    xbt_fifo_item_t item = NULL;
240    int i;
241    int nbprocess=xbt_fifo_size(msg_global->process_list);
242    
243    INFO1("MSG: %d processes are still running, waiting for something.",
244          nbprocess);
245    /*  List the process and their state */
246    INFO0("MSG: <process>(<pid>) on <host>: <status>.");
247    xbt_fifo_foreach(msg_global->process_list,item,process,m_process_t) {
248       simdata_process_t p_simdata = (simdata_process_t) process->simdata;
249       simdata_host_t h_simdata=(simdata_host_t)p_simdata->host->simdata;
250       char *who;
251         
252       asprintf(&who,"MSG:  %s(%d) on %s: %s",
253                process->name,p_simdata->PID,
254                p_simdata->host->name,
255                (process->simdata->blocked)?"[blocked] "
256                :((process->simdata->suspended)?"[suspended] ":""));
257       
258       for (i=0; i<msg_global->max_channel; i++) {
259          if (h_simdata->sleeping[i] == process) {
260             INFO2("%s\tListening on channel %d",who,i);
261             break;
262          }
263       }
264       if (i==msg_global->max_channel) {
265          if(p_simdata->waiting_task) {
266             if(p_simdata->waiting_task->simdata->compute) {
267                if(p_simdata->put_host) {
268                   INFO4("%s\tTrying to send the task '%s' to Host %s, channel %d.",
269                         who, p_simdata->waiting_task->name,p_simdata->put_host->name, p_simdata->put_channel);
270                } else {
271                   INFO2("%s\tWaiting for %s to finish.",who,p_simdata->waiting_task->name);
272                }
273             } else if (p_simdata->waiting_task->simdata->comm) {
274                INFO2("%s\tWaiting for %s to be finished transfered.",
275                      who,p_simdata->waiting_task->name);
276             } else {
277               INFO1("%s\tUNKNOWN STATUS. Please report this bug.",who);
278             }
279 /*          The following would display the trace of where the maestro thread is, 
280             since this is the thread calling this. I'd like to get the other threads to 
281             run this to see where they were blocked, but I'm not sure of how to do this */
282 /*          xbt_backtrace_display(); */
283          } else { /* Must be trying to put a task somewhere */
284             INFO1("%s\tUNKNOWN STATUS. Please report this bug.",who);
285          }
286       }
287       free(who);
288    }
289 }
290
291 /* FIXME: Yeah, I'll do it in a portable maner one day [Mt] */
292 #include <signal.h>
293
294 static void inthandler(int ignored)
295 {
296    INFO0("CTRL-C pressed. Displaying status and bailing out");
297    __MSG_display_process_status();
298    exit(1);
299 }
300
301 /** \ingroup msg_simulation
302  * \brief Launch the MSG simulation
303  */
304 MSG_error_t MSG_main(void)
305 {
306   m_process_t process = NULL;
307   int i;
308   double elapsed_time = 0.0;
309   int state_modifications = 1;
310    
311   /* Prepare to display some more info when dying on Ctrl-C pressing */
312   signal(SIGINT,inthandler);
313    
314   /* Clean IO before the run */
315   fflush(stdout);
316   fflush(stderr);
317
318   surf_solve(); /* Takes traces into account. Returns 0.0 */
319 /* xbt_fifo_size(msg_global->process_to_run) */
320   while (1) {
321     xbt_context_empty_trash();
322     if(xbt_fifo_size(msg_global->process_to_run) && (elapsed_time>0)) {
323       DEBUG0("**************************************************");
324     }
325     if((__stop_at_time>0) && (MSG_get_clock() >= __stop_at_time)) {
326       DEBUG0("Let's stop here!");
327     }
328
329     while ((process = xbt_fifo_pop(msg_global->process_to_run))) {
330       DEBUG3("Scheduling %s(%d) on %s",      
331              process->name,process->simdata->PID,
332              process->simdata->host->name);
333       msg_global->current_process = process;
334 /*       fflush(NULL); */
335       xbt_context_schedule(process->simdata->context);
336       msg_global->current_process = NULL;
337     }
338     
339     {
340       surf_action_t action = NULL;
341       surf_resource_t resource = NULL;
342       m_task_t task = NULL;
343
344       void *fun = NULL;
345       void *arg = NULL;
346
347       xbt_dynar_foreach(resource_list, i, resource) {
348         if(xbt_swag_size(resource->common_public->states.failed_action_set) ||
349            xbt_swag_size(resource->common_public->states.done_action_set))
350           state_modifications = 1;
351       }
352       
353       if(!state_modifications) {
354         DEBUG1("%f : Calling surf_solve",MSG_get_clock());
355         elapsed_time = surf_solve();
356         DEBUG1("Elapsed_time %f",elapsed_time);
357         
358         if (elapsed_time<0.0) {
359           /*       fprintf(stderr, "We're done %g\n",elapsed_time); */
360           break;
361         }
362       }
363
364       while (surf_timer_resource->extension_public->get(&fun,(void*)&arg)) {
365         DEBUG2("got %p %p", fun, arg);
366         if(fun==MSG_process_create_with_arguments) {
367           process_arg_t args = arg;
368           DEBUG2("Launching %s on %s", args->name, args->host->name);
369           process = MSG_process_create_with_arguments(args->name, args->code, 
370                                                       args->data, args->host,
371                                                       args->argc,args->argv);
372           if(args->kill_time > MSG_get_clock()) {
373             surf_timer_resource->extension_public->set(args->kill_time, 
374                                                        (void*) &MSG_process_kill,
375                                                        (void*) process);
376           }
377           xbt_free(args);
378         }
379         if(fun==MSG_process_kill) {
380           process = arg;
381           DEBUG3("Killing %s(%d) on %s", process->name, process->simdata->PID, 
382                  process->simdata->host->name);
383           MSG_process_kill(process);
384         }
385       }
386       
387       xbt_dynar_foreach(resource_list, i, resource) {
388         while ((action =
389                 xbt_swag_extract(resource->common_public->states.
390                                  failed_action_set))) {
391           task = action->data;
392           if(task) {
393             int _cursor;
394             DEBUG1("** %s failed **",task->name);
395             xbt_dynar_foreach(task->simdata->sleeping,_cursor,process) {
396               DEBUG3("\t preparing to wake up %s(%d) on %s",         
397                      process->name,process->simdata->PID,
398                      process->simdata->host->name);
399               xbt_fifo_unshift(msg_global->process_to_run, process);
400             }
401             process=NULL;
402           }
403         }
404         while ((action =
405                 xbt_swag_extract(resource->common_public->states.
406                                  done_action_set))) {
407           task = action->data;
408           if(task) {
409             int _cursor;
410             DEBUG1("** %s done **",task->name);
411             xbt_dynar_foreach(task->simdata->sleeping,_cursor,process) {
412               DEBUG3("\t preparing to wake up %s(%d) on %s",         
413                      process->name,process->simdata->PID,
414                      process->simdata->host->name);
415               xbt_fifo_unshift(msg_global->process_to_run, process);
416             }
417             process=NULL;
418           }
419         }
420       }
421     }
422     state_modifications = 0;
423   }
424
425   if (xbt_fifo_size(msg_global->process_list) == 0) {
426     INFO0("Congratulations ! Simulation terminated : all processes are over");
427     return MSG_OK;
428   } else {
429     INFO0("Oops ! Deadlock or code not perfectly clean.");
430     __MSG_display_process_status();
431     if(XBT_LOG_ISENABLED(msg, xbt_log_priority_debug) ||
432        XBT_LOG_ISENABLED(msg_kernel, xbt_log_priority_debug)) {
433       DEBUG0("Aborting!");
434       xbt_abort();
435     }
436
437     INFO0("Return a Warning.");
438     return MSG_WARNING;
439   }
440 }
441
442 /** \ingroup msg_simulation
443  * \brief Kill all running process
444
445  * \param reset_PIDs should we reset the PID numbers. A negative
446  *   number means no reset and a positive number will be used to set the PID
447  *   of the next newly created process.
448  */
449 int MSG_process_killall(int reset_PIDs)
450 {
451   m_process_t p = NULL;
452   m_process_t self = MSG_process_self();
453
454   while((p=xbt_fifo_pop(msg_global->process_list))) {
455     if(p!=self) MSG_process_kill(p);
456   }
457
458   if(reset_PIDs>0) {
459     msg_global->PID = reset_PIDs; 
460     msg_global->session++;
461  }
462
463   xbt_context_empty_trash();
464
465   if(self) {
466     xbt_context_yield();
467   }
468
469   return msg_global->PID;
470 }
471
472 /** \ingroup msg_simulation
473  * \brief Clean the MSG simulation
474  */
475 MSG_error_t MSG_clean(void)
476 {
477   xbt_fifo_item_t i = NULL;
478   m_host_t h = NULL;
479   m_process_t p = NULL;
480
481
482   while((p=xbt_fifo_pop(msg_global->process_list))) {
483     MSG_process_kill(p);
484   }
485   xbt_context_exit();
486
487   xbt_fifo_foreach(msg_global->host,i,h,m_host_t) {
488     __MSG_host_destroy(h);
489   }
490   xbt_fifo_free(msg_global->host);
491   xbt_fifo_free(msg_global->process_to_run);
492   xbt_fifo_free(msg_global->process_list);
493   xbt_dict_free(&(msg_global->registered_functions));
494   xbt_mallocator_free(msg_global->task_mallocator);
495
496   if(msg_global->paje_output) {
497     fclose(msg_global->paje_output);
498     msg_global->paje_output = NULL;
499   }
500   msg_config_finalize();
501   free(msg_global);
502   surf_exit();
503
504   return MSG_OK;
505 }
506
507
508 /** \ingroup msg_easier_life
509  * \brief A clock (in second).
510  */
511 double MSG_get_clock(void) {
512   return surf_get_clock();
513 }
514