Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
Merge branch 'add_wait_for_to_py_comm_binding' into 'master'
[simgrid.git] / teshsuite / smpi / MBI / MBIutils.py
1 # Copyright 2021-2022. The MBI project. All rights reserved. 
2 # This program is free software; you can redistribute it and/or modify it under the terms of the license (GNU GPL).
3
4 import os
5 import time
6 import subprocess
7 import sys
8 import re
9 import shlex
10 import select
11 import signal
12 import hashlib
13
14 class AbstractTool:
15     def ensure_image(self, params=""):
16         """Verify that this is executed from the right docker image, and complain if not."""
17         if os.path.exists("/MBI") or os.path.exists("trust_the_installation"):
18             print("This seems to be a MBI docker image. Good.")
19         else:
20             print("Please run this script in a MBI docker image. Run these commands:")
21             print("  docker build -f Dockerfile -t mpi-bugs-initiative:latest . # Only the first time")
22             print(f"  docker run -it --rm --name MIB --volume $(pwd):/MBI mpi-bugs-initiative /MBI/MBI.py {params}")
23             sys.exit(1)
24
25     def build(self, rootdir, cached=True):
26         """Rebuilds the tool binaries. By default, we try to reuse the existing build."""
27         print ("Nothing to do to rebuild the tool binaries.")
28
29     def setup(self, rootdir):
30         """
31         Ensure that this tool (previously built) is usable in this environment: setup the PATH, etc.
32         This is called only once for all tests, from the logs directory.
33         """
34         pass
35
36     def run(execcmd, filename, binary, id, timeout):
37         """Compile that test code and anaylse it with the Tool if needed (a cache system should be used)"""
38         pass
39
40     def teardown(self):
41         """
42         Clean the results of all test runs: remove temp files and binaries.
43         This is called only once for all tests, from the logs directory.
44         """
45         pass
46
47     def parse(self, cachefile):
48         """Read the result of a previous run from the cache, and compute the test outcome"""
49         return 'failure'
50
51 # Associate all possible detailed outcome to a given error scope. Scopes must be sorted alphabetically.
52 possible_details = {
53     # scope limited to one call
54     'InvalidBuffer':'AInvalidParam', 'InvalidCommunicator':'AInvalidParam', 'InvalidDatatype':'AInvalidParam', 'InvalidRoot':'AInvalidParam', 'InvalidTag':'AInvalidParam', 'InvalidWindow':'AInvalidParam', 'InvalidOperator':'AInvalidParam', 'InvalidOtherArg':'AInvalidParam', 'ActualDatatype':'AInvalidParam',
55     'InvalidSrcDest':'AInvalidParam', 
56     # scope: Process-wide
57 #    'OutOfInitFini':'BInitFini', 
58     'CommunicatorLeak':'BResLeak', 'DatatypeLeak':'BResLeak', 'GroupLeak':'BResLeak', 'OperatorLeak':'BResLeak', 'TypeLeak':'BResLeak', 'RequestLeak':'BResLeak',
59     'MissingStart':'BReqLifecycle', 'MissingWait':'BReqLifecycle',
60     'LocalConcurrency':'BLocalConcurrency',
61     # scope: communicator
62     'CallMatching':'DMatch', 
63     'CommunicatorMatching':'CMatch', 'DatatypeMatching':'CMatch', 'OperatorMatching':'CMatch', 'RootMatching':'CMatch', 'TagMatching':'CMatch',
64     'MessageRace':'DRace', 
65     
66     'GlobalConcurrency':'DGlobalConcurrency',
67     # larger scope
68 #    'BufferingHazard':'EBufferingHazard',
69     'OK':'FOK'}
70
71 error_scope = {
72     'AInvalidParam':'single call',
73     'BResLeak':'single process',
74 #    'BInitFini':'single process',
75     'BReqLifecycle':'single process',
76     'BLocalConcurrency':'single process',
77     'CMatch':'multi-processes',
78     'DRace':'multi-processes',
79     'DMatch':'multi-processes',
80     'DGlobalConcurrency':'multi-processes',
81 #    'EBufferingHazard':'system',
82     'FOK':'correct executions'
83 }
84
85 displayed_name = {
86     'AInvalidParam':'Invalid parameter',
87     'BResLeak':'Resource leak',
88 #    'BInitFini':'MPI call before initialization/after finalization',
89     'BReqLifecycle':'Request lifecycle',
90     'BLocalConcurrency':'Local concurrency',
91     'CMatch':'Parameter matching',
92     'DMatch':"Call ordering",
93     'DRace':'Message race',
94     'DGlobalConcurrency':'Global concurrency',
95     'EBufferingHazard':'Buffering hazard',
96     'FOK':"Correct execution",
97
98     'aislinn':'Aislinn','civl':'CIVL','hermes':'Hermes', 'isp':'ISP','itac':'ITAC', 'simgrid':'Mc SimGrid', 'smpi':'SMPI','smpivg':'SMPI+VG', 'mpisv':'MPI-SV', 'must':'MUST', 'parcoach':'PARCOACH'
99 }
100
101 def parse_one_code(filename):
102     """
103     Reads the header of the provided filename, and extract a list of todo item, each of them being a (cmd, expect, test_num) tupple.
104     The test_num is useful to build a log file containing both the binary and the test_num, when there is more than one test in the same binary.
105     """
106     res = []
107     test_num = 0
108     with open(filename, "r") as input:
109         state = 0  # 0: before header; 1: in header; 2; after header
110         line_num = 1
111         for line in input:
112             if re.match(".*BEGIN_MBI_TESTS.*", line):
113                 if state == 0:
114                     state = 1
115                 else:
116                     raise Exception(f"MBI_TESTS header appears a second time at line {line_num}: \n{line}")
117             elif re.match(".*END_MBI_TESTS.*", line):
118                 if state == 1:
119                     state = 2
120                 else:
121                     raise Exception(f"Unexpected end of MBI_TESTS header at line {line_num}: \n{line}")
122             if state == 1 and re.match("\s+\$ ?.*", line):
123                 m = re.match('\s+\$ ?(.*)', line)
124                 cmd = m.group(1)
125                 nextline = next(input)
126                 detail = 'OK'
127                 if re.match('[ |]*OK *', nextline):
128                     expect = 'OK'
129                 else:
130                     m = re.match('[ |]*ERROR: *(.*)', nextline)
131                     if not m:
132                         raise Exception(
133                             f"\n{filename}:{line_num}: MBI parse error: Test not followed by a proper 'ERROR' line:\n{line}{nextline}")
134                     expect = 'ERROR'
135                     detail = m.group(1)
136                     if detail not in possible_details:
137                         raise Exception(
138                             f"\n{filename}:{line_num}: MBI parse error: Detailled outcome {detail} is not one of the allowed ones.")
139                 test = {'filename': filename, 'id': test_num, 'cmd': cmd, 'expect': expect, 'detail': detail}
140                 res.append(test.copy())
141                 test_num += 1
142                 line_num += 1
143
144     if state == 0:
145         raise Exception(f"MBI_TESTS header not found in file '{filename}'.")
146     if state == 1:
147         raise Exception(f"MBI_TESTS header not properly ended in file '{filename}'.")
148
149     if len(res) == 0:
150         raise Exception(f"No test found in {filename}. Please fix it.")
151     return res
152
153 def categorize(tool, toolname, test_ID, expected):
154     outcome = tool.parse(test_ID)
155
156     if not os.path.exists(f'{test_ID}.elapsed') and not os.path.exists(f'logs/{toolname}/{test_ID}.elapsed'):
157         if outcome == 'failure':
158             elapsed = 0
159         else:
160             raise Exception(f"Invalid test result: {test_ID}.txt exists but not {test_ID}.elapsed")
161     else:
162         with open(f'{test_ID}.elapsed' if os.path.exists(f'{test_ID}.elapsed') else f'logs/{toolname}/{test_ID}.elapsed', 'r') as infile:
163             elapsed = infile.read()
164
165     # Properly categorize this run
166     if outcome == 'timeout':
167         res_category = 'timeout'
168         if elapsed is None:
169             diagnostic = f'hard timeout'
170         else:
171             diagnostic = f'timeout after {elapsed} sec'
172     elif outcome == 'failure':
173         res_category = 'failure'
174         diagnostic = f'tool error, or test not run'
175     elif outcome == 'UNIMPLEMENTED':
176         res_category = 'unimplemented'
177         diagnostic = f'coverage issue'
178     elif outcome == 'other':
179         res_category = 'other'
180         diagnostic = f'inconclusive run'
181     elif expected == 'OK':
182         if outcome == 'OK':
183             res_category = 'TRUE_NEG'
184             diagnostic = f'correctly reported no error'
185         else:
186             res_category = 'FALSE_POS'
187             diagnostic = f'reported an error in a correct code'
188     elif expected == 'ERROR':
189         if outcome == 'OK':
190             res_category = 'FALSE_NEG'
191             diagnostic = f'failed to detect an error'
192         else:
193             res_category = 'TRUE_POS'
194             diagnostic =  f'correctly detected an error'
195     else:
196         raise Exception(f"Unexpected expectation: {expected} (must be OK or ERROR)")
197
198     return (res_category, elapsed, diagnostic, outcome)
199
200
201 def run_cmd(buildcmd, execcmd, cachefile, filename, binary, timeout, batchinfo, read_line_lambda=None):
202     """
203     Runs the test on need. Returns True if the test was ran, and False if it was cached.
204     
205     The result is cached if possible, and the test is rerun only if the `test.txt` (containing the tool output) or the `test.elapsed` (containing the timing info) do not exist, or if `test.md5sum` (containing the md5sum of the code to compile) does not match.
206
207     Parameters:
208      - buildcmd and execcmd are shell commands to run. buildcmd can be any shell line (incuding && groups), but execcmd must be a single binary to run. 
209      - cachefile is the name of the test
210      - filename is the source file containing the code
211      - binary the file name in which to compile the code
212      - batchinfo: something like "1/1" to say that this run is the only batch (see -b parameter of MBI.py)
213      - read_line_lambda: a lambda to which each line of the tool output is feed ASAP. It allows MUST to interrupt the execution when a deadlock is reported.
214     """
215     if os.path.exists(f'{cachefile}.txt') and os.path.exists(f'{cachefile}.elapsed') and os.path.exists(f'{cachefile}.md5sum'):
216         hash_md5 = hashlib.md5()
217         with open(filename, 'rb') as sourcefile :
218             for chunk in iter(lambda: sourcefile.read(4096), b""):
219                 hash_md5.update(chunk)
220         newdigest = hash_md5.hexdigest()
221         with open(f'{cachefile}.md5sum', 'r') as md5file:
222             olddigest = md5file.read()
223         #print(f'Old digest: {olddigest}; New digest: {newdigest}')
224         if olddigest == newdigest:
225             print(f" (result cached -- digest: {olddigest})")
226             return False
227         else:
228             os.remove(f'{cachefile}.txt')
229
230     print(f"Wait up to {timeout} seconds")
231
232     start_time = time.time()
233     if buildcmd == None:
234         output = f"No need to compile {binary}.c (batchinfo:{batchinfo})\n\n"
235     else:
236         output = f"Compiling {binary}.c (batchinfo:{batchinfo})\n\n"
237         output += f"$ {buildcmd}\n"
238
239         compil = subprocess.run(buildcmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
240         if compil.stdout is not None:
241             output += str(compil.stdout, errors='replace')
242         if compil.returncode != 0:
243             output += f"Compilation of {binary}.c raised an error (retcode: {compil.returncode})"
244             for line in (output.split('\n')):
245                 print(f"| {line}", file=sys.stderr)
246             with open(f'{cachefile}.elapsed', 'w') as outfile:
247                 outfile.write(str(time.time() - start_time))
248             with open(f'{cachefile}.txt', 'w') as outfile:
249                 outfile.write(output)
250             return True
251
252     output += f"\n\nExecuting the command\n $ {execcmd}\n"
253     for line in (output.split('\n')):
254         print(f"| {line}", file=sys.stderr)
255
256     # We run the subprocess and parse its output line by line, so that we can kill it as soon as it detects a timeout
257     process = subprocess.Popen(shlex.split(execcmd), stdout=subprocess.PIPE,
258                                stderr=subprocess.STDOUT, preexec_fn=os.setsid)
259     poll_obj = select.poll()
260     poll_obj.register(process.stdout, select.POLLIN)
261
262     pid = process.pid
263     pgid = os.getpgid(pid)  # We need that to forcefully kill subprocesses when leaving
264     outcome = None
265     while True:
266         if poll_obj.poll(5):  # Something to read? Do check the timeout status every 5 sec if not
267             line = process.stdout.readline()
268             # From byte array to string, replacing non-representable strings with question marks
269             line = str(line, errors='replace')
270             output = output + line
271             print(f"| {line}", end='', file=sys.stderr)
272             if read_line_lambda != None:
273                 read_line_lambda(line, process)
274         if time.time() - start_time > timeout:
275             outcome = 'timeout'
276             with open(f'{cachefile}.timeout', 'w') as outfile:
277                 outfile.write(f'{time.time() - start_time} seconds')
278             break
279         if process.poll() is not None:  # The subprocess ended. Grab all existing output, and return
280             line = 'more'
281             while line != None and line != '':
282                 line = process.stdout.readline()
283                 if line is not None:
284                     # From byte array to string, replacing non-representable strings with question marks
285                     line = str(line, errors='replace')
286                     output = output + line
287                     print(f"| {line}", end='', file=sys.stderr)
288
289             break
290
291     # We want to clean all forked processes in all cases, no matter whether they are still running (timeout) or supposed to be off. The runners easily get clogged with zombies :(
292     try:
293         os.killpg(pgid, signal.SIGTERM)  # Terminate all forked processes, to make sure it's clean whatever the tool does
294         process.terminate()  # No op if it's already stopped but useful on timeouts
295         time.sleep(0.2)  # allow some time for the tool to finish its childs
296         os.killpg(pgid, signal.SIGKILL)  # Finish 'em all, manually
297         os.kill(pid, signal.SIGKILL)  # die! die! die!
298     except ProcessLookupError:
299         pass  # OK, it's gone now
300
301     elapsed = time.time() - start_time
302
303     rc = process.poll()
304     if rc < 0:
305         status = f"Command killed by signal {-rc}, elapsed time: {elapsed}\n"
306     else:
307         status = f"Command return code: {rc}, elapsed time: {elapsed}\n"
308     print(status)
309     output += status
310
311     with open(f'{cachefile}.elapsed', 'w') as outfile:
312         outfile.write(str(elapsed))
313
314     with open(f'{cachefile}.txt', 'w') as outfile:
315         outfile.write(output)
316     with open(f'{cachefile}.md5sum', 'w') as outfile:
317         hash = hashlib.md5()
318         with open(filename, 'rb') as sourcefile :
319             for chunk in iter(lambda: sourcefile.read(4096), b""):
320                 hash.update(chunk)
321         outfile.write(hash.hexdigest())
322     
323     return True