Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
Fight against synchronization issues on armageddon (ie, when a background task fail...
[simgrid.git] / tools / tesh / run_context.c
1 /* $Id$ */
2
3 /* run_context -- stuff in which TESH runs a command                        */
4
5 /* Copyright (c) 2007 Martin Quinson.                                       */
6 /* All rights reserved.                                                     */
7
8 /* This program is free software; you can redistribute it and/or modify it
9  * under the terms of the license (GNU LGPL) which comes with this package. */
10
11 #include "tesh.h"
12
13 #include <sys/types.h>
14 #include <sys/wait.h>
15
16 XBT_LOG_EXTERNAL_DEFAULT_CATEGORY(tesh);
17
18 xbt_dynar_t bg_jobs = NULL;
19 rctx_t armageddon_initiator = NULL;
20 xbt_os_mutex_t armageddon_mutex = NULL;
21
22 /* 
23  * Module management
24  */
25
26 static void kill_it(void*r) {  
27   rctx_t rctx = *(rctx_t*)r;
28
29   VERB2("Join thread %p which were running background cmd <%s>",rctx->runner,rctx->filepos);
30   xbt_os_thread_join(rctx->runner,NULL);
31   rctx_free(rctx);
32 }
33
34 void rctx_init(void) {
35   bg_jobs = xbt_dynar_new_sync(sizeof(rctx_t),kill_it);
36   armageddon_mutex = xbt_os_mutex_init();
37   armageddon_initiator = NULL;
38 }
39
40 void rctx_exit(void) {
41   if (bg_jobs) {
42     /* Do not use xbt_dynar_free or it will lock the dynar, preventing armageddon from working */
43     while (xbt_dynar_length(bg_jobs)) {
44        rctx_t rctx;
45        xbt_dynar_pop(bg_jobs,&rctx);
46        kill_it(&rctx);
47     }
48     xbt_dynar_free(&bg_jobs);
49   }
50   xbt_os_mutex_destroy(armageddon_mutex);
51 }
52
53 void rctx_wait_bg(void) {
54   if (bg_jobs) {
55     /* Do not use xbt_dynar_free or it will lock the dynar, preventing armageddon from working */
56     while (xbt_dynar_length(bg_jobs)) {
57        rctx_t rctx;
58        xbt_dynar_pop(bg_jobs,&rctx);
59        kill_it(&rctx);
60     }
61     xbt_dynar_free(&bg_jobs);
62   }
63   bg_jobs = xbt_dynar_new_sync(sizeof(rctx_t),kill_it);
64 }
65
66 void rctx_armageddon(rctx_t initiator, int exitcode) {
67   rctx_t rctx;
68
69   DEBUG2("Armageddon request by <%s> (exit=%d)",initiator->filepos,exitcode);
70   xbt_os_mutex_lock(armageddon_mutex);
71   if (armageddon_initiator != NULL) {
72     VERB0("Armageddon already started. Let it go");
73     xbt_os_mutex_unlock(initiator->interruption);
74     xbt_os_mutex_unlock(armageddon_mutex);
75     return;
76   }
77   DEBUG1("Armageddon request by <%s> got the lock. Let's go amok",initiator->filepos);
78   armageddon_initiator = initiator;
79   xbt_os_mutex_unlock(armageddon_mutex);
80
81   /* Kill any background commands */
82   while (xbt_dynar_length(bg_jobs)) {
83     xbt_dynar_pop(bg_jobs,&rctx);
84     if (rctx != initiator) {
85       INFO2("Kill <%s> because <%s> failed",rctx->filepos,initiator->filepos);
86       xbt_os_mutex_lock(rctx->interruption);
87       rctx->interrupted = 1;
88       xbt_os_mutex_unlock(rctx->interruption);
89       INFO2("Do Kill <%s> because <%s> failed",rctx->filepos,initiator->filepos);
90       if (!rctx->reader_done) {
91         kill(rctx->pid,SIGTERM);
92         usleep(100);
93         kill(rctx->pid,SIGKILL);          
94       }
95     }
96   }
97
98   VERB0("Shut everything down!");
99   exit(exitcode);
100 }
101
102 /*
103  * Memory management
104  */
105
106 void rctx_empty(rctx_t rc) {
107   if (rc->cmd)
108     free(rc->cmd);
109   rc->cmd = NULL;
110   if (rc->filepos)
111     free(rc->filepos);
112   rc->filepos = NULL;
113   rc->is_empty = 1;
114   rc->is_background = 0;
115   rc->is_stoppable = 0;
116   rc->output = e_output_check;
117   rc->brokenpipe = 0;
118   rc->timeout = 0;
119   rc->interrupted = 0;
120   buff_empty(rc->input);
121   buff_empty(rc->output_wanted);
122   buff_empty(rc->output_got);
123 }
124
125 rctx_t rctx_new() {
126   rctx_t res = xbt_new0(s_rctx_t,1);
127
128   res->input=buff_new();
129   res->output_wanted=buff_new();
130   res->output_got=buff_new();
131   res->interruption = xbt_os_mutex_init();
132   rctx_empty(res);
133   return res;
134 }
135
136 void rctx_free(rctx_t rctx) {
137   DEBUG1("RCTX: Free %p", rctx);
138   rctx_dump(rctx,"free");
139   if (!rctx)
140     return;
141
142   if (rctx->cmd)
143     free(rctx->cmd);
144   if (rctx->filepos)
145     free(rctx->filepos);
146   xbt_os_mutex_destroy(rctx->interruption);
147   buff_free(rctx->input);
148   buff_free(rctx->output_got);
149   buff_free(rctx->output_wanted);
150   free(rctx);
151 }
152
153 void rctx_dump(rctx_t rctx, const char *str) {
154   DEBUG9("%s RCTX %p={in%p={%d,%10s}, want={%d,%10s}, out={%d,%10s}}",
155          str, rctx,
156          rctx->input,              rctx->input->used,        rctx->input->data,
157          rctx->output_wanted->used,rctx->output_wanted->data,
158          rctx->output_got->used,   rctx->output_got->data);
159   DEBUG5("%s RCTX %p=[cmd%p=%10s, pid=%d]",
160          str,rctx,rctx->cmd,rctx->cmd,rctx->pid);
161
162 }
163
164 /*
165  * Getting instructions from the file
166  */
167
168 void rctx_pushline(const char* filepos, char kind, char *line) {
169   
170   switch (kind) {
171   case '$':
172   case '&':
173     if (rctx->cmd) {
174       if (!rctx->is_empty) {
175         ERROR2("[%s] More than one command in this chunk of lines (previous: %s).\n"
176                " Dunno which input/output belongs to which command.",
177                filepos,rctx->cmd);
178         ERROR1("Test suite `%s': NOK (syntax error)",testsuite_name);
179         rctx_armageddon(rctx,1);
180         return;
181       }
182       rctx_start();
183       VERB1("[%s] More than one command in this chunk of lines",filepos);
184     }
185     if (kind == '&')
186       rctx->is_background = 1;
187     else
188       rctx->is_background = 0;
189       
190     rctx->cmd = xbt_strdup(line);
191     rctx->filepos = xbt_strdup(filepos);
192     INFO3("[%s] %s%s",filepos,rctx->cmd,
193           ((rctx->is_background)?" (background command)":""));
194
195     break;
196     
197   case '<':
198     rctx->is_empty = 0;
199     buff_append(rctx->input,line);
200     buff_append(rctx->input,"\n");
201     break;
202
203   case '>':
204     rctx->is_empty = 0;
205     buff_append(rctx->output_wanted,line);
206     buff_append(rctx->output_wanted,"\n");
207     break;
208
209   case '!':
210     if (rctx->cmd)
211       rctx_start();
212
213     if (!strncmp(line,"timeout no",strlen("timeout no"))) {
214       VERB1("[%s] (disable timeout)", filepos);
215       timeout_value = -1;
216     } else if (!strncmp(line,"timeout ",strlen("timeout "))) {
217       timeout_value=atoi(line+strlen("timeout"));
218       VERB2("[%s] (new timeout value: %d)",
219              filepos,timeout_value);
220
221     } else if (!strncmp(line,"expect signal ",strlen("expect signal "))) {
222       rctx->expected_signal = strdup(line + strlen("expect signal "));
223       xbt_str_trim(rctx->expected_signal," \n");
224            VERB2("[%s] (next command must raise signal %s)", 
225                  filepos, rctx->expected_signal);
226
227     } else if (!strncmp(line,"expect return ",strlen("expect return "))) {
228       rctx->expected_return = atoi(line+strlen("expect return "));
229       VERB2("[%s] (next command must return code %d)",
230             filepos, rctx->expected_return);
231
232     } else if (!strncmp(line,"output ignore",strlen("output ignore"))) {
233       rctx->output = e_output_ignore;
234       VERB1("[%s] (ignore output of next command)", filepos);
235        
236     } else if (!strncmp(line,"output display",strlen("output display"))) {
237       rctx->output = e_output_display;
238       VERB1("[%s] (ignore output of next command)", filepos);
239        
240     } else {
241       ERROR2("%s: Malformed metacommand: %s",filepos,line);
242       ERROR1("Test suite `%s': NOK (syntax error)",testsuite_name);
243       rctx_armageddon(rctx,1);
244       return;
245     }
246     break;
247   }
248 }
249
250 /* 
251  * Actually doing the job
252  */
253
254 /* The IO of the childs are handled by the two following threads
255    (one pair per child) */
256
257 static void* thread_writer(void *r) {
258   int posw;
259   rctx_t rctx = (rctx_t)r;
260   for (posw=0; posw<rctx->input->used && !rctx->brokenpipe; ) {
261     int got;
262     DEBUG1("Still %d chars to write",rctx->input->used-posw);
263     got=write(rctx->child_to,rctx->input->data+posw,rctx->input->used-posw);
264     if (got>0)
265       posw+=got;
266     if (got<0) {
267       if (errno == EPIPE) {
268         rctx->brokenpipe = 1;
269       } else if (errno!=EINTR && errno!=EAGAIN && errno!=EPIPE) {
270         perror("Error while writing input to child");
271         ERROR1("Test suite `%s': NOK (system error)",testsuite_name);
272         rctx_armageddon(rctx,4);
273         return NULL;
274       }
275     }
276     DEBUG1("written %d chars so far",posw);
277
278     if (got <= 0)
279       usleep(100);
280   }
281   rctx->input->data[0]='\0';
282   rctx->input->used=0;
283   close(rctx->child_to);
284
285   return NULL;
286 }
287 static void *thread_reader(void *r) {
288   rctx_t rctx = (rctx_t)r;
289   char *buffout=malloc(4096);
290   int posr, got_pid;
291
292   do {
293     posr=read(rctx->child_from,buffout,4095);
294     if (posr<0 && errno!=EINTR && errno!=EAGAIN) {
295       perror("Error while reading output of child");
296       ERROR1("Test suite `%s': NOK (system error)", testsuite_name);
297       rctx_armageddon(rctx,4);
298       return NULL;
299     }
300     if (posr>0) {
301       buffout[posr]='\0';
302       buff_append(rctx->output_got,buffout);
303     } else {
304       usleep(100);
305     }
306   } while (!rctx->timeout && posr!=0);
307   free(buffout);
308
309   /* let this thread wait for the child so that the main thread can detect the timeout without blocking on the wait */
310   got_pid = waitpid(rctx->pid,&rctx->status,0);
311   if (got_pid != rctx->pid) {
312     perror(bprintf("Cannot wait for the child %s",rctx->cmd));
313     ERROR1("Test suite `%s': NOK (system error)", testsuite_name);
314     rctx_armageddon(rctx,4);
315     return NULL;
316   }
317    
318   rctx->reader_done = 1;
319   return NULL;
320
321
322 /* Start a new child, plug the pipes as expected and fire up the 
323    helping threads. Is also waits for the child to end if this is a 
324    foreground job, or fire up a thread to wait otherwise. */
325
326 void rctx_start(void) {
327   int child_in[2];
328   int child_out[2];
329
330   VERB2("Start %s %s",rctx->cmd,(rctx->is_background?"(background job)":""));
331   if (pipe(child_in) || pipe(child_out)) {
332     perror("Cannot open the pipes");
333     ERROR1("Test suite `%s': NOK (system error)", testsuite_name);
334     rctx_armageddon(rctx,4);
335   }
336
337   rctx->pid=fork();
338   if (rctx->pid<0) {
339     perror("Cannot fork the command");
340     ERROR1("Test suite `%s': NOK (system error)", testsuite_name);
341     rctx_armageddon(rctx,4);
342     return;
343   }
344
345   if (rctx->pid) { /* father */
346     close(child_in[0]);
347     rctx->child_to = child_in[1];
348
349     close(child_out[1]);
350     rctx->child_from = child_out[0];
351
352     if (timeout_value > 0)
353        rctx->end_time = time(NULL) + timeout_value;
354     else 
355        rctx->end_time = -1;
356
357     rctx->reader_done = 0;
358     rctx->reader = xbt_os_thread_create("reader",thread_reader,(void*)rctx);
359     rctx->writer = xbt_os_thread_create("writer",thread_writer,(void*)rctx);
360
361   } else { /* child */
362
363     close(child_in[1]);
364     dup2(child_in[0],0);
365     close(child_in[0]);
366
367     close(child_out[0]);
368     dup2(child_out[1],1);
369     dup2(child_out[1],2);
370     close(child_out[1]);
371
372     execlp ("/bin/sh", "sh", "-c", rctx->cmd, NULL);
373   }
374
375   rctx->is_stoppable = 1;
376
377   if (!rctx->is_background) {
378     rctx_wait(rctx);
379   } else {
380     /* Damn. Copy the rctx and launch a thread to handle it */
381     rctx_t old = rctx;
382     xbt_os_thread_t runner;
383
384     rctx = rctx_new();
385     DEBUG2("RCTX: new bg=%p, new fg=%p",old,rctx);
386
387     DEBUG2("Launch a thread to wait for %s %d",old->cmd,old->pid);
388     runner = xbt_os_thread_create(old->cmd,rctx_wait,(void*)old);
389     old->runner = runner;
390     VERB3("Launched thread %p to wait for %s %d",
391           runner,old->cmd, old->pid);
392     xbt_dynar_push(bg_jobs,&old);
393   }
394 }
395
396 /* Waits for the child to end (or to timeout), and check its 
397    ending conditions. This is launched from rctx_start but either in main
398    thread (for foreground jobs) or in a separate one for background jobs. 
399    That explains the prototype, forced by xbt_os_thread_create. */
400
401 void *rctx_wait(void* r) {
402   rctx_t rctx = (rctx_t)r;
403   int errcode = 0;
404   int now = time(NULL);
405     
406   rctx_dump(rctx,"wait");
407
408   if (!rctx->is_stoppable) 
409     THROW1(unknown_error,0,"Cmd '%s' not started yet. Cannot wait it",
410            rctx->cmd);
411
412   /* Wait for the child to die or the timeout to happen (or an armageddon to happen) */
413   while (!rctx->interrupted && !rctx->reader_done && (rctx->end_time <0 ||rctx->end_time >= now)) {
414     usleep(100);
415     now = time(NULL);
416   }
417    
418   xbt_os_mutex_lock(rctx->interruption);
419   if (!rctx->interrupted && rctx->end_time > 0 && rctx->end_time < now) {    
420     INFO1("<%s> timeouted. Kill the process.",rctx->filepos);
421     rctx->timeout = 1;
422     kill(rctx->pid,SIGTERM);
423     usleep(100);
424     kill(rctx->pid,SIGKILL);    
425     rctx->reader_done = 1;
426   }
427    
428   /* Make sure helper threads die.
429      Cannot block since they wait for the child we just killed
430      if not already dead. */
431   xbt_os_thread_join(rctx->writer,NULL);
432   xbt_os_thread_join(rctx->reader,NULL);
433
434   /*  xbt_os_mutex_unlock(rctx->interruption);
435   if (rctx->interrupted)
436     return NULL;
437     xbt_os_mutex_lock(rctx->interruption);*/
438  
439   buff_chomp(rctx->output_got);
440   buff_chomp(rctx->output_wanted);
441   buff_trim(rctx->output_got);
442   buff_trim(rctx->output_wanted);
443
444   /* Check for broken pipe */
445   if (rctx->brokenpipe)
446     VERB0("Warning: Child did not consume all its input (I got broken pipe)");
447
448   /* Check for timeouts */
449   if (rctx->timeout) {
450     if (rctx->output_got->data[0])
451       INFO2("<%s> Output on timeout:\n%s",
452             rctx->filepos,rctx->output_got->data);
453     else
454       INFO1("<%s> No output before timeout",
455             rctx->filepos);
456     ERROR3("Test suite `%s': NOK (<%s> timeout after %d sec)", 
457            testsuite_name,rctx->filepos,timeout_value);
458     DEBUG2("<%s> Interrupted = %d", rctx->filepos, rctx->interrupted);
459     if (!rctx->interrupted) {
460       rctx_armageddon(rctx, 3);
461       return NULL;
462     }
463   }
464       
465   DEBUG2("RCTX=%p (pid=%d)",rctx,rctx->pid);
466   DEBUG3("Status(%s|%d)=%d",rctx->cmd,rctx->pid,rctx->status);
467
468   if (!rctx->interrupted) {
469     if (WIFSIGNALED(rctx->status) && !rctx->expected_signal) {
470       ERROR3("Test suite `%s': NOK (<%s> got signal %s)", 
471              testsuite_name, rctx->filepos,
472              signal_name(WTERMSIG(rctx->status),NULL));
473       errcode = WTERMSIG(rctx->status)+4;       
474     }
475     
476     if (WIFSIGNALED(rctx->status) && rctx->expected_signal &&
477         strcmp(signal_name(WTERMSIG(rctx->status),rctx->expected_signal),
478                rctx->expected_signal)) {
479       ERROR4("Test suite `%s': NOK (%s got signal %s instead of %s)", 
480              testsuite_name, rctx->filepos,
481              signal_name(WTERMSIG(rctx->status),rctx->expected_signal),
482              rctx->expected_signal);
483       errcode = WTERMSIG(rctx->status)+4;       
484     }
485     
486     if (!WIFSIGNALED(rctx->status) && rctx->expected_signal) {
487       ERROR3("Test suite `%s': NOK (child %s expected signal %s)", 
488              testsuite_name, rctx->filepos,
489              rctx->expected_signal);
490       errcode = 5;
491     }
492     
493     if (WIFEXITED(rctx->status) && WEXITSTATUS(rctx->status) != rctx->expected_return ) {
494       if (rctx->expected_return) 
495         ERROR4("Test suite `%s': NOK (<%s> returned code %d instead of %d)",
496                testsuite_name, rctx->filepos,
497                WEXITSTATUS(rctx->status), rctx->expected_return);
498       else
499         ERROR3("Test suite `%s': NOK (<%s> returned code %d)",
500                testsuite_name, rctx->filepos, WEXITSTATUS(rctx->status));
501       errcode = 40+WEXITSTATUS(rctx->status);
502       
503     }
504     rctx->expected_return = 0;
505   
506     if(rctx->expected_signal){
507       free(rctx->expected_signal);
508       rctx->expected_signal = NULL;
509     }
510   }
511
512   if (   rctx->output == e_output_check
513       && (    rctx->output_got->used != rctx->output_wanted->used
514            || strcmp(rctx->output_got->data, rctx->output_wanted->data))) {
515     if (XBT_LOG_ISENABLED(tesh,xbt_log_priority_info)) {
516        char *diff= xbt_str_diff(rctx->output_wanted->data,rctx->output_got->data);        
517        ERROR2("Output of <%s> mismatch:\n%s",rctx->filepos,diff);
518        free(diff);
519     }     
520     ERROR2("Test suite `%s': NOK (<%s> output mismatch)", 
521            testsuite_name,rctx->filepos);
522      
523     errcode=2;
524   } else if (rctx->output == e_output_ignore) {
525     INFO1("(ignoring the output of <%s> as requested)",rctx->filepos);
526   } else if (rctx->output == e_output_display) {
527     xbt_dynar_t a = xbt_str_split(rctx->output_got->data, "\n");
528     char *out = xbt_str_join(a,"\n||");
529     xbt_dynar_free(&a);
530     INFO1("Here is the (ignored) command output: \n||%s",out);
531     free(out);
532   } else if (errcode || rctx->interrupted) {
533     /* checking output, and matching */
534     xbt_dynar_t a = xbt_str_split(rctx->output_got->data, "\n");
535     char *out = xbt_str_join(a,"\n||");
536     xbt_dynar_free(&a);
537     INFO2("Output of <%s> so far: \n||%s",rctx->filepos,out);
538     free(out);    
539   }
540
541   if (!rctx->is_background) {
542     rctx_empty(rctx);
543   }
544   if (errcode) {
545     if (!rctx->interrupted) {
546       rctx_armageddon(rctx, errcode);
547       return NULL;
548     }
549   }
550
551   xbt_os_mutex_unlock(rctx->interruption);
552   return NULL;
553 }
554