Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
Replace tesh.pl by pesh.py a much better version of tesh in python.
[simgrid.git] / tools / tesh / tesh.py
1 #!/usr/bin/env python
2 # -*- coding: utf-8 -*-
3 """
4
5 tesh -- testing shell
6 ========================
7
8 Copyright (c) 2012-2015. The SimGrid Team.
9 All rights reserved.
10
11 This program is free software; you can redistribute it and/or modify it
12 under the terms of the license (GNU LGPL) which comes with this package.
13
14 #TODO: <glesserd> emptty: does "! output display" also disable the comparaison with the output ?  yes
15
16 #TODO: child of child of child printf => tester dans rsg...
17
18 #TODO: regular expression in output
19 #TODO: linked regular expression in output
20
21
22 """
23
24
25 import sys, os
26 import shlex
27 import re
28 import difflib
29 import signal
30 import argparse
31
32 if sys.version_info[0] == 3:
33     import subprocess
34     import _thread
35 elif sys.version_info[0] < 3:
36     import subprocess32 as subprocess
37     import thread as _thread
38 else:
39     raise "This program has not been made to exist this long"
40
41
42
43 ##############
44 #
45 # Utilities
46 #
47 #
48
49
50 # Singleton metaclass that works in Python 2 & 3
51 # http://stackoverflow.com/questions/6760685/creating-a-singleton-in-python
52 class _Singleton(type):
53     """ A metaclass that creates a Singleton base class when called. """
54     _instances = {}
55     def __call__(cls, *args, **kwargs):
56         if cls not in cls._instances:
57             cls._instances[cls] = super(_Singleton, cls).__call__(*args, **kwargs)
58         return cls._instances[cls]
59 class Singleton(_Singleton('SingletonMeta', (object,), {})): pass
60
61 SIGNALS_TO_NAMES_DICT = dict((getattr(signal, n), n) \
62     for n in dir(signal) if n.startswith('SIG') and '_' not in n )
63
64
65
66 #exit correctly
67 def exit(errcode):
68     #If you do not flush some prints are skipped
69     sys.stdout.flush()
70     #os._exit exit even when executed within a thread
71     os._exit(errcode)
72
73
74 def fatal_error(msg):
75     print("[Tesh/CRITICAL] "+str(msg))
76     exit(1)
77
78
79 #Set an environment variable.
80 # arg must be a string with the format "variable=value"
81 def setenv(arg):
82     print("[Tesh/INFO] setenv "+arg)
83     t = arg.split("=")
84     os.environ[t[0]] = t[1]
85     #os.putenv(t[0], t[1]) does not work
86     #see http://stackoverflow.com/questions/17705419/python-os-environ-os-putenv-usr-bin-env
87
88
89 #http://stackoverflow.com/questions/30734967/how-to-expand-environment-variables-in-python-as-bash-does
90 def expandvars2(path):
91     return re.sub(r'(?<!\\)\$[A-Za-z_][A-Za-z0-9_]*', '', os.path.expandvars(path))
92
93 # https://github.com/Cadair/jupyter_environment_kernels/issues/10
94 try:
95     FileNotFoundError
96 except NameError:
97     #py2
98     FileNotFoundError = OSError
99
100
101 ##############
102 #
103 # Classes
104 #
105 #
106
107
108
109 # read file line per line (and concat line that ends with "\")
110 class FileReader(Singleton):
111     def __init__(self, filename):
112         if filename is None:
113             self.filename = "(stdin)"
114             self.f = sys.stdin
115         else:
116             self.filename_raw = filename
117             self.filename = os.path.basename(filename)
118             self.f = open(self.filename_raw)
119         
120         self.linenumber = 0
121
122     def linenumber(self):
123         return self.linenumber
124     
125     def __repr__(self):
126         return self.filename+":"+str(self.linenumber)
127     
128     def readfullline(self):
129         try:
130             line = next(self.f)
131             self.linenumber += 1
132         except StopIteration:
133             return None
134         if line[-1] == "\n":
135             txt = line[0:-1]
136         else:
137             txt = line
138         while len(line) > 1 and line[-2] == "\\":
139             txt = txt[0:-1]
140             line = next(self.f)
141             self.linenumber += 1
142             txt += line[0:-1]
143         return txt
144
145
146 #keep the state of tesh (mostly configuration values)
147 class TeshState(Singleton):
148     def __init__(self):
149         self.threads = []
150         self.args_suffix = ""
151         self.ignore_regexps_common = []
152     
153     def add_thread(self, thread):
154         self.threads.append(thread)
155     
156     def join_all_threads(self):
157         for t in self.threads:
158             t.acquire()
159             t.release()
160
161
162
163
164 #Command line object
165 class Cmd(object):
166     def __init__(self):
167         self.input_pipe = []
168         self.output_pipe_stdout = []
169         self.output_pipe_stderr = []
170         self.timeout = 5
171         self.args = None
172         self.linenumber = -1
173         
174         self.background = False
175         self.cwd = None
176         
177         self.ignore_output = False
178         self.expect_return = 0
179         
180         self.output_display = False
181         
182         self.sort = -1
183         
184         self.ignore_regexps = TeshState().ignore_regexps_common
185
186     def add_input_pipe(self, l):
187         self.input_pipe.append(l)
188
189     def add_output_pipe_stdout(self, l):
190         self.output_pipe_stdout.append(l)
191
192     def add_output_pipe_stderr(self, l):
193         self.output_pipe_stderr.append(l)
194
195     def set_cmd(self, args, linenumber):
196         self.args = args
197         self.linenumber = linenumber
198     
199     def add_ignore(self, txt):
200         self.ignore_regexps.append(re.compile(txt))
201     
202     def remove_ignored_lines(self, lines):
203         for ign in self.ignore_regexps:
204                 lines = [l for l in lines if not ign.match(l)]
205         return lines
206
207
208     def _cmd_mkfile(self, argline):
209         filename = argline[len("mkfile "):]
210         file = open(filename, "w")
211         if file is None:
212             fatal_error("Unable to create file "+filename)
213         file.write("\n".join(self.input_pipe))
214         file.close()
215
216     def _cmd_cd(self, argline):
217         args = shlex.split(argline)
218         if len(args) != 2:
219             fatal_error("Too many arguments to cd")
220         try:
221             os.chdir(args[1])
222             print("[Tesh/INFO] change directory to "+args[1])
223         except FileNotFoundError:
224             print("Chdir to "+args[1]+" failed: No such file or directory")
225             print("Test suite `"+FileReader().filename+"': NOK (system error)")
226             exit(4)
227
228
229     #Run the Cmd if possible.
230     # Return False if nothing has been ran.
231     def run_if_possible(self):
232         if self.can_run():
233             if self.background:
234                 #Python threads loose the cwd
235                 self.cwd = os.getcwd()
236                 lock = _thread.allocate_lock()
237                 lock.acquire()
238                 TeshState().add_thread(lock)
239                 _thread.start_new_thread( Cmd._run, (self, lock) )
240             else:
241                 self._run()
242             return True
243         else:
244             return False
245
246
247     def _run(self, lock=None):
248         #Python threads loose the cwd
249         if self.cwd is not None:
250             os.chdir(self.cwd)
251             self.cwd = None
252         
253         #retrocompatibility: support ${aaa:=.} variable format
254         def replace_perl_variables(m):
255             vname = m.group(1)
256             vdefault = m.group(2)
257             if vname in os.environ:
258                 return "$"+vname
259             else:
260                 return vdefault
261         self.args = re.sub(r"\${(\w+):=([^}]*)}", replace_perl_variables, self.args)
262
263         #replace bash environment variables ($THINGS) to their values
264         self.args = expandvars2(self.args)
265         
266         if re.match("^mkfile ", self.args) is not None:
267             self._cmd_mkfile(self.args)
268             if lock is not None: lock.release()
269             return
270         
271         if re.match("^cd ", self.args) is not None:
272             self._cmd_cd(self.args)
273             if lock is not None: lock.release()
274             return
275         
276         self.args += TeshState().args_suffix
277         
278         print("["+FileReader().filename+":"+str(self.linenumber)+"] "+self.args)
279         
280         args = shlex.split(self.args)
281         #print (args)
282         try:
283             proc = subprocess.Popen(args, bufsize=1, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, universal_newlines=True)
284         except OSError as e:
285             if e.errno == 8:
286                 e.strerror += "\nOSError: [Errno 8] Executed scripts should start with shebang line (like #!/bin/sh)"
287             raise e
288
289         try:
290             (stdout_data, stderr_data) = proc.communicate("\n".join(self.input_pipe), self.timeout)
291         except subprocess.TimeoutExpired:
292             print("Test suite `"+FileReader().filename+"': NOK (<"+FileReader().filename+":"+str(self.linenumber)+"> timeout after "+str(self.timeout)+" sec)")
293             exit(3)
294
295         if self.output_display:
296             print(stdout_data)
297
298         #remove text colors
299         ansi_escape = re.compile(r'\x1b[^m]*m')
300         stdout_data = ansi_escape.sub('', stdout_data)
301         
302         #print ((stdout_data, stderr_data))
303         
304         if self.ignore_output:
305             print("(ignoring the output of <"+FileReader().filename+":"+str(self.linenumber)+"> as requested)")
306         else:
307             stdouta = stdout_data.split("\n")
308             while len(stdouta) > 0 and stdouta[-1] == "":
309                 del stdouta[-1]
310             stdouta = self.remove_ignored_lines(stdouta)
311
312             #the "sort" bash command is case unsensitive,
313             # we mimic its behaviour
314             if self.sort == 0:
315                 stdouta.sort(key=lambda x: x.lower())
316                 self.output_pipe_stdout.sort(key=lambda x: x.lower())
317             elif self.sort > 0:
318                 stdouta.sort(key=lambda x: x[:self.sort].lower())
319                 self.output_pipe_stdout.sort(key=lambda x: x[:self.sort].lower())
320             
321             diff = list(difflib.unified_diff(self.output_pipe_stdout, stdouta,lineterm="",fromfile='expected', tofile='obtained'))
322             if len(diff) > 0: 
323                 print("Output of <"+FileReader().filename+":"+str(self.linenumber)+"> mismatch:")
324                 for line in diff:
325                     print(line)
326                 print("Test suite `"+FileReader().filename+"': NOK (<"+str(FileReader())+"> output mismatch)")
327                 if lock is not None: lock.release()
328                 exit(2)
329         
330         #print ((proc.returncode, self.expect_return))
331         
332         if proc.returncode != self.expect_return:
333             if proc.returncode >= 0:
334                 print("Test suite `"+FileReader().filename+"': NOK (<"+str(FileReader())+"> returned code "+str(proc.returncode)+")")
335                 if lock is not None: lock.release()
336                 exit(2)
337             else:
338                 print("Test suite `"+FileReader().filename+"': NOK (<"+str(FileReader())+"> got signal "+SIGNALS_TO_NAMES_DICT[-proc.returncode]+")")
339                 if lock is not None: lock.release()
340                 exit(-proc.returncode)
341             
342         if lock is not None: lock.release()
343     
344     
345     
346     def can_run(self):
347         return self.args is not None
348
349
350
351
352
353
354
355
356
357
358 ##############
359 #
360 # Main
361 #
362 #
363
364
365
366 if __name__ == '__main__':
367     
368     parser = argparse.ArgumentParser(description='tesh -- testing shell', add_help=True)
369     group1 = parser.add_argument_group('Options')
370     group1.add_argument('teshfile', nargs='?', help='Name of teshfile, stdin if omitted')
371     group1.add_argument('--cd', metavar='some/directory', help='ask tesh to switch the working directory before launching the tests')
372     group1.add_argument('--setenv', metavar='var=value', action='append', help='set a specific environment variable')
373     group1.add_argument('--cfg', metavar='arg', help='add parameter --cfg=arg to each command line')
374     group1.add_argument('--log', metavar='arg', help='add parameter --log=arg to each command line')
375     group1.add_argument('--enable-coverage', action='store_true', help='ignore output lines starting with "profiling:"')
376
377     try:
378         options = parser.parse_args()
379     except:
380         exit(1)
381
382     if options.cd is not None:
383         os.chdir(options.cd)
384
385     if options.enable_coverage:
386         print("Enable coverage")
387         TeshState().ignore_regexps_common = [re.compile("^profiling:")]
388     
389     if options.teshfile is None:
390         f = FileReader(None)
391         print("Test suite from stdin")
392     else:
393         f = FileReader(options.teshfile)
394         print("Test suite '"+f.filename+"'")
395     
396     if options.setenv is not None:
397         for e in options.setenv:
398             setenv(e)
399     
400     if options.cfg is not None:
401         TeshState().args_suffix += " --cfg="+options.cfg
402     if options.log is not None:
403         TeshState().args_suffix += " --log="+options.log
404     
405     
406     #cmd holds the current command line
407     # tech commands will add some parameters to it
408     # when ready, we execute it.
409     cmd = Cmd()
410     
411     line = f.readfullline()
412     while line is not None:
413         #print(">>============="+line+"==<<")
414         if len(line) == 0:
415             #print ("END CMD block")
416             if cmd.run_if_possible():
417                 cmd = Cmd()
418         
419         elif line[0] == "#":
420             pass
421         
422         elif line[0:2] == "p ":
423             print("["+str(FileReader())+"] "+line[2:])
424         
425         elif line[0:2] == "< ":
426             cmd.add_input_pipe(line[2:])
427         elif line[0:1] == "<":
428             cmd.add_input_pipe(line[1:])
429             
430         elif line[0:2] == "> ":
431             cmd.add_output_pipe_stdout(line[2:])
432         elif line[0:1] == ">":
433             cmd.add_output_pipe_stdout(line[1:])
434             
435         elif line[0:2] == "$ ":
436             if cmd.run_if_possible():
437                 cmd = Cmd()
438             cmd.set_cmd(line[2:], f.linenumber)
439         
440         elif line[0:2] == "& ":
441             if cmd.run_if_possible():
442                 cmd = Cmd()
443             cmd.set_cmd(line[2:], f.linenumber)
444             cmd.background = True
445         
446         elif line[0:15] == "! output ignore":
447             cmd.ignore_output = True
448             #print("cmd.ignore_output = True")
449         elif line[0:16] == "! output display":
450             cmd.output_display = True
451             cmd.ignore_output = True
452         elif line[0:15] == "! expect return":
453             cmd.expect_return = int(line[16:])
454             #print("expect return "+str(int(line[16:])))
455         elif line[0:15] == "! expect signal":
456             sig = line[16:]
457             #get the signal integer value from the signal module
458             if sig not in signal.__dict__:
459                 fatal_error("unrecognized signal '"+sig+"'")
460             sig = int(signal.__dict__[sig])
461             #popen return -signal when a process ends with a signal
462             cmd.expect_return = -sig
463         elif line[0:len("! timeout ")] == "! timeout ":
464             if "no" in line[len("! timeout "):]:
465                 cmd.timeout = None
466             else:
467                 cmd.timeout = int(line[len("! timeout "):])
468             
469         elif line[0:len("! output sort")] == "! output sort":
470             if len(line) >= len("! output sort "):
471                 sort = int(line[len("! output sort "):])
472             else:
473                 sort = 0
474             cmd.sort = sort
475         elif line[0:len("! setenv ")] == "! setenv ":
476             setenv(line[len("! setenv "):])
477         
478         elif line[0:len("! ignore ")] == "! ignore ":
479             cmd.add_ignore(line[len("! ignore "):])
480         
481         else:
482             fatal_error("UNRECOGNIZED OPTION")
483             
484         
485         line = f.readfullline()
486
487     cmd.run_if_possible()
488     
489     TeshState().join_all_threads()
490     
491     if f.filename == "(stdin)":
492         print("Test suite from stdin OK")
493     else:
494         print("Test suite `"+f.filename+"' OK")
495
496
497
498
499