Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
Merge branch 'fluidio' into 'master'
[simgrid.git] / teshsuite / smpi / MBI / MBIutils.py
1 # Copyright 2021-2022. The MBI project. All rights reserved.
2 # This program is free software; you can redistribute it and/or modify it under the terms of the license (GNU GPL).
3
4 import os
5 import time
6 import subprocess
7 import sys
8 import re
9 import shlex
10 import select
11 import signal
12 import hashlib
13
14 class AbstractTool:
15     def ensure_image(self, params="", dockerparams=""):
16         """Verify that this is executed from the right docker image, and complain if not."""
17         if os.path.exists("/MBI") or os.path.exists("trust_the_installation"):
18             print("This seems to be a MBI docker image. Good.")
19         else:
20             print("Please run this script in a MBI docker image. Run these commands:")
21             print("  docker build -f Dockerfile -t mpi-bugs-initiative:latest . # Only the first time")
22             print(f"  docker run -it --rm --name MIB --volume $(pwd):/MBI {dockerparams}mpi-bugs-initiative /MBI/MBI.py {params}")
23             sys.exit(1)
24
25     def build(self, rootdir, cached=True):
26         """Rebuilds the tool binaries. By default, we try to reuse the existing build."""
27         print("Nothing to do to rebuild the tool binaries.")
28
29     def setup(self, rootdir):
30         """
31         Ensure that this tool (previously built) is usable in this environment: setup the PATH, etc.
32         This is called only once for all tests, from the logs directory.
33         """
34         # pass
35
36     def run(self, execcmd, filename, binary, num_id, timeout, batchinfo):
37         """Compile that test code and anaylse it with the Tool if needed (a cache system should be used)"""
38         # pass
39
40     def teardown(self):
41         """
42         Clean the results of all test runs: remove temp files and binaries.
43         This is called only once for all tests, from the logs directory.
44         """
45         # pass
46
47     def parse(self, cachefile):
48         """Read the result of a previous run from the cache, and compute the test outcome"""
49         return 'failure'
50
51 # Associate all possible detailed outcome to a given error scope. Scopes must be sorted alphabetically.
52 possible_details = {
53     # scope limited to one call
54     'InvalidBuffer':'AInvalidParam', 'InvalidCommunicator':'AInvalidParam', 'InvalidDatatype':'AInvalidParam', 'InvalidRoot':'AInvalidParam', 'InvalidTag':'AInvalidParam', 'InvalidWindow':'AInvalidParam', 'InvalidOperator':'AInvalidParam', 'InvalidOtherArg':'AInvalidParam', 'ActualDatatype':'AInvalidParam',
55     'InvalidSrcDest':'AInvalidParam',
56     # scope: Process-wide
57 #    'OutOfInitFini':'BInitFini',
58     'CommunicatorLeak':'BResLeak', 'DatatypeLeak':'BResLeak', 'GroupLeak':'BResLeak', 'OperatorLeak':'BResLeak', 'TypeLeak':'BResLeak', 'RequestLeak':'BResLeak',
59     'MissingStart':'BReqLifecycle', 'MissingWait':'BReqLifecycle',
60     'MissingEpoch':'BEpochLifecycle','DoubleEpoch':'BEpochLifecycle',
61     'LocalConcurrency':'BLocalConcurrency',
62     # scope: communicator
63     'CallMatching':'DMatch',
64     'CommunicatorMatching':'CMatch', 'DatatypeMatching':'CMatch', 'OperatorMatching':'CMatch', 'RootMatching':'CMatch', 'TagMatching':'CMatch',
65     'MessageRace':'DRace',
66
67     'GlobalConcurrency':'DGlobalConcurrency',
68     # larger scope
69     'BufferingHazard':'EBufferingHazard',
70     'OK':'FOK'}
71
72 error_scope = {
73     'AInvalidParam':'single call',
74     'BResLeak':'single process',
75 #    'BInitFini':'single process',
76     'BReqLifecycle':'single process',
77     'BEpochLifecycle':'single process',
78     'BLocalConcurrency':'single process',
79     'CMatch':'multi-processes',
80     'DRace':'multi-processes',
81     'DMatch':'multi-processes',
82     'DGlobalConcurrency':'multi-processes',
83     'EBufferingHazard':'system',
84     'FOK':'correct executions'
85 }
86
87 displayed_name = {
88     'AInvalidParam':'Invalid parameter',
89     'BResLeak':'Resource leak',
90 #    'BInitFini':'MPI call before initialization/after finalization',
91     'BReqLifecycle':'Request lifecycle',
92     'BLocalConcurrency':'Local concurrency',
93     'CMatch':'Parameter matching',
94     'DMatch':"Call ordering",
95     'DRace':'Message race',
96     'DGlobalConcurrency':'Global concurrency',
97     'EBufferingHazard':'Buffering hazard',
98     'FOK':"Correct execution",
99
100     'aislinn':'Aislinn', 'civl':'CIVL', 'hermes':'Hermes', 'isp':'ISP', 'itac':'ITAC', 'simgrid':'Mc SimGrid', 'smpi':'SMPI', 'smpivg':'SMPI+VG', 'mpisv':'MPI-SV', 'must':'MUST', 'parcoach':'PARCOACH'
101 }
102
103 def parse_one_code(filename):
104     """
105     Reads the header of the provided filename, and extract a list of todo item, each of them being a (cmd, expect, test_num) tupple.
106     The test_num is useful to build a log file containing both the binary and the test_num, when there is more than one test in the same binary.
107     """
108     res = []
109     test_num = 0
110     with open(filename, "r") as input_file:
111         state = 0  # 0: before header; 1: in header; 2; after header
112         line_num = 1
113         for line in input_file:
114             if re.match(".*BEGIN_MBI_TESTS.*", line):
115                 if state == 0:
116                     state = 1
117                 else:
118                     raise ValueError(f"MBI_TESTS header appears a second time at line {line_num}: \n{line}")
119             elif re.match(".*END_MBI_TESTS.*", line):
120                 if state == 1:
121                     state = 2
122                 else:
123                     raise ValueError(f"Unexpected end of MBI_TESTS header at line {line_num}: \n{line}")
124             if state == 1 and re.match(r'\s+\$ ?.*', line):
125                 m = re.match(r'\s+\$ ?(.*)', line)
126                 cmd = m.group(1)
127                 nextline = next(input_file)
128                 detail = 'OK'
129                 if re.match('[ |]*OK *', nextline):
130                     expect = 'OK'
131                 else:
132                     m = re.match('[ |]*ERROR: *(.*)', nextline)
133                     if not m:
134                         raise ValueError(
135                             f"\n{filename}:{line_num}: MBI parse error: Test not followed by a proper 'ERROR' line:\n{line}{nextline}")
136                     expect = 'ERROR'
137                     detail = m.group(1)
138                     if detail not in possible_details:
139                         raise ValueError(
140                             f"\n{filename}:{line_num}: MBI parse error: Detailled outcome {detail} is not one of the allowed ones.")
141                 test = {'filename': filename, 'id': test_num, 'cmd': cmd, 'expect': expect, 'detail': detail}
142                 res.append(test.copy())
143                 test_num += 1
144                 line_num += 1
145
146     if state == 0:
147         raise ValueError(f"MBI_TESTS header not found in file '{filename}'.")
148     if state == 1:
149         raise ValueError(f"MBI_TESTS header not properly ended in file '{filename}'.")
150
151     if len(res) == 0:
152         raise ValueError(f"No test found in {filename}. Please fix it.")
153     return res
154
155 def categorize(tool, toolname, test_id, expected):
156     outcome = tool.parse(test_id)
157
158     if not os.path.exists(f'{test_id}.elapsed') and not os.path.exists(f'logs/{toolname}/{test_id}.elapsed'):
159         if outcome == 'failure':
160             elapsed = 0
161         else:
162             raise ValueError(f"Invalid test result: {test_id}.txt exists but not {test_id}.elapsed")
163     else:
164         with open(f'{test_id}.elapsed' if os.path.exists(f'{test_id}.elapsed') else f'logs/{toolname}/{test_id}.elapsed', 'r') as infile:
165             elapsed = infile.read()
166
167     # Properly categorize this run
168     if outcome == 'timeout':
169         res_category = 'timeout'
170         if elapsed is None:
171             diagnostic = 'hard timeout'
172         else:
173             diagnostic = f'timeout after {elapsed} sec'
174     elif outcome == 'failure' or outcome == 'segfault':
175         res_category = 'failure'
176         diagnostic = 'tool error, or test not run'
177     elif outcome == 'UNIMPLEMENTED':
178         res_category = 'unimplemented'
179         diagnostic = 'coverage issue'
180     elif outcome == 'other':
181         res_category = 'other'
182         diagnostic = 'inconclusive run'
183     elif expected == 'OK':
184         if outcome == 'OK':
185             res_category = 'TRUE_NEG'
186             diagnostic = 'correctly reported no error'
187         else:
188             res_category = 'FALSE_POS'
189             diagnostic = 'reported an error in a correct code'
190     elif expected == 'ERROR':
191         if outcome == 'OK':
192             res_category = 'FALSE_NEG'
193             diagnostic = 'failed to detect an error'
194         else:
195             res_category = 'TRUE_POS'
196             diagnostic = 'correctly detected an error'
197     else:
198         raise ValueError(f"Unexpected expectation: {expected} (must be OK or ERROR)")
199
200     return (res_category, elapsed, diagnostic, outcome)
201
202
203 def run_cmd(buildcmd, execcmd, cachefile, filename, binary, timeout, batchinfo, read_line_lambda=None):
204     """
205     Runs the test on need. Returns True if the test was ran, and False if it was cached.
206
207     The result is cached if possible, and the test is rerun only if the `test.txt` (containing the tool output) or the `test.elapsed` (containing the timing info) do not exist, or if `test.md5sum` (containing the md5sum of the code to compile) does not match.
208
209     Parameters:
210      - buildcmd and execcmd are shell commands to run. buildcmd can be any shell line (incuding && groups), but execcmd must be a single binary to run.
211      - cachefile is the name of the test
212      - filename is the source file containing the code
213      - binary the file name in which to compile the code
214      - batchinfo: something like "1/1" to say that this run is the only batch (see -b parameter of MBI.py)
215      - read_line_lambda: a lambda to which each line of the tool output is feed ASAP. It allows MUST to interrupt the execution when a deadlock is reported.
216     """
217     if os.path.exists(f'{cachefile}.txt') and os.path.exists(f'{cachefile}.elapsed') and os.path.exists(f'{cachefile}.md5sum'):
218         hash_md5 = hashlib.md5()
219         with open(filename, 'rb') as sourcefile:
220             for chunk in iter(lambda: sourcefile.read(4096), b""):
221                 hash_md5.update(chunk)
222         newdigest = hash_md5.hexdigest()
223         with open(f'{cachefile}.md5sum', 'r') as md5file:
224             olddigest = md5file.read()
225         #print(f'Old digest: {olddigest}; New digest: {newdigest}')
226         if olddigest == newdigest:
227             print(f" (result cached -- digest: {olddigest})")
228             return False
229         os.remove(f'{cachefile}.txt')
230
231     print(f"Wait up to {timeout} seconds")
232
233     start_time = time.time()
234     if buildcmd is None:
235         output = f"No need to compile {binary}.c (batchinfo:{batchinfo})\n\n"
236     else:
237         output = f"Compiling {binary}.c (batchinfo:{batchinfo})\n\n"
238         output += f"$ {buildcmd}\n"
239
240         compil = subprocess.run(buildcmd, shell=True, check=True, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
241         if compil.stdout is not None:
242             output += str(compil.stdout, errors='replace')
243         if compil.returncode != 0:
244             output += f"Compilation of {binary}.c raised an error (retcode: {compil.returncode})"
245             for line in (output.split('\n')):
246                 print(f"| {line}", file=sys.stderr)
247             with open(f'{cachefile}.elapsed', 'w') as outfile:
248                 outfile.write(str(time.time() - start_time))
249             with open(f'{cachefile}.txt', 'w') as outfile:
250                 outfile.write(output)
251             return True
252
253     output += f"\n\nExecuting the command\n $ {execcmd}\n"
254     for line in (output.split('\n')):
255         print(f"| {line}", file=sys.stderr)
256
257     # We run the subprocess and parse its output line by line, so that we can kill it as soon as it detects a timeout
258     process = subprocess.Popen(shlex.split(execcmd), stdout=subprocess.PIPE,
259                                stderr=subprocess.STDOUT, preexec_fn=os.setsid)
260     poll_obj = select.poll()
261     poll_obj.register(process.stdout, select.POLLIN)
262
263     pid = process.pid
264     pgid = os.getpgid(pid)  # We need that to forcefully kill subprocesses when leaving
265     while True:
266         if poll_obj.poll(5):  # Something to read? Do check the timeout status every 5 sec if not
267             line = process.stdout.readline()
268             # From byte array to string, replacing non-representable strings with question marks
269             line = str(line, errors='replace')
270             output = output + line
271             print(f"| {line}", end='', file=sys.stderr)
272             if read_line_lambda != None:
273                 read_line_lambda(line, process)
274         if time.time() - start_time > timeout:
275             with open(f'{cachefile}.timeout', 'w') as outfile:
276                 outfile.write(f'{time.time() - start_time} seconds')
277             break
278         if process.poll() is not None:  # The subprocess ended. Grab all existing output, and return
279             line = 'more'
280             while line != None and line != '':
281                 line = process.stdout.readline()
282                 if line is not None:
283                     # From byte array to string, replacing non-representable strings with question marks
284                     line = str(line, errors='replace')
285                     output = output + line
286                     print(f"| {line}", end='', file=sys.stderr)
287
288             break
289
290     # We want to clean all forked processes in all cases, no matter whether they are still running (timeout) or supposed to be off. The runners easily get clogged with zombies :(
291     try:
292         os.killpg(pgid, signal.SIGTERM)  # Terminate all forked processes, to make sure it's clean whatever the tool does
293         process.terminate()  # No op if it's already stopped but useful on timeouts
294         time.sleep(0.2)  # allow some time for the tool to finish its childs
295         os.killpg(pgid, signal.SIGKILL)  # Finish 'em all, manually
296         os.kill(pid, signal.SIGKILL)  # die! die! die!
297     except ProcessLookupError:
298         pass  # OK, it's gone now
299
300     elapsed = time.time() - start_time
301
302     rc = process.poll()
303     if rc < 0:
304         status = f"Command killed by signal {-rc}, elapsed time: {elapsed}\n"
305     else:
306         status = f"Command return code: {rc}, elapsed time: {elapsed}\n"
307     print(status)
308     output += status
309
310     with open(f'{cachefile}.elapsed', 'w') as outfile:
311         outfile.write(str(elapsed))
312
313     with open(f'{cachefile}.txt', 'w') as outfile:
314         outfile.write(output)
315     with open(f'{cachefile}.md5sum', 'w') as outfile:
316         hashed = hashlib.md5()
317         with open(filename, 'rb') as sourcefile:
318             for chunk in iter(lambda: sourcefile.read(4096), b""):
319                 hashed.update(chunk)
320         outfile.write(hashed.hexdigest())
321
322     return True