llvm-project/lldb/test/dosep.py

216 lines
6.8 KiB
Python
Raw Normal View History

#!/usr/bin/env python
"""
Run the test suite using a separate process for each test file.
Each test will run with a time limit of 5 minutes by default.
Override the default time limit of 5 minutes by setting
the environment variable LLDB_TEST_TIMEOUT.
E.g., export LLDB_TEST_TIMEOUT=10m
Override the time limit for individual tests by setting
the environment variable LLDB_[TEST NAME]_TIMEOUT.
E.g., export LLDB_TESTCONCURRENTEVENTS_TIMEOUT=2m
Set to "0" to run without time limit.
E.g., export LLDB_TEST_TIMEOUT=0
or export LLDB_TESTCONCURRENTEVENTS_TIMEOUT=0
"""
import multiprocessing
import os
import platform
import shlex
import subprocess
import sys
from optparse import OptionParser
def get_timeout_command():
"""Search for a suitable timeout command."""
if sys.platform.startswith("win32"):
return None
try:
subprocess.call("timeout")
return "timeout"
except OSError:
pass
try:
subprocess.call("gtimeout")
return "gtimeout"
except OSError:
pass
return None
timeout_command = get_timeout_command()
default_timeout = os.getenv("LLDB_TEST_TIMEOUT") or "5m"
# Status codes for running command with timeout.
eTimedOut, ePassed, eFailed = 124, 0, 1
def call_with_timeout(command, timeout):
"""Run command with a timeout if possible."""
if os.name != "nt":
if timeout_command and timeout != "0":
return subprocess.call([timeout_command, timeout] + command,
stdin=subprocess.PIPE, close_fds=True)
return (ePassed if subprocess.call(command, stdin=subprocess.PIPE, close_fds=True) == 0
else eFailed)
else:
if timeout_command and timeout != "0":
return subprocess.call([timeout_command, timeout] + command,
stdin=subprocess.PIPE)
return (ePassed if subprocess.call(command, stdin=subprocess.PIPE) == 0
else eFailed)
def process_dir(root, files, test_root, dotest_options):
"""Examine a directory for tests, and invoke any found within it."""
timed_out = []
failed = []
passed = []
for name in files:
path = os.path.join(root, name)
# We're only interested in the test file with the "Test*.py" naming pattern.
if not name.startswith("Test") or not name.endswith(".py"):
continue
# Neither a symbolically linked file.
if os.path.islink(path):
continue
script_file = os.path.join(test_root, "dotest.py")
is_posix = (os.name == "posix")
split_args = shlex.split(dotest_options, posix=is_posix) if dotest_options else []
command = ([sys.executable, script_file] +
split_args +
["-p", name, root])
timeout_name = os.path.basename(os.path.splitext(name)[0]).upper()
timeout = os.getenv("LLDB_%s_TIMEOUT" % timeout_name) or default_timeout
exit_status = call_with_timeout(command, timeout)
if ePassed == exit_status:
passed.append(name)
else:
if eTimedOut == exit_status:
timed_out.append(name)
failed.append(name)
return (timed_out, failed, passed)
in_q = None
out_q = None
def process_dir_worker(arg_tuple):
"""Worker thread main loop when in multithreaded mode.
Takes one directory specification at a time and works on it."""
(root, files, test_root, dotest_options) = arg_tuple
return process_dir(root, files, test_root, dotest_options)
def walk_and_invoke(test_root, dotest_options, num_threads):
"""Look for matched files and invoke test driver on each one.
In single-threaded mode, each test driver is invoked directly.
In multi-threaded mode, submit each test driver to a worker
queue, and then wait for all to complete."""
# Collect the test files that we'll run.
test_work_items = []
for root, dirs, files in os.walk(test_root, topdown=False):
test_work_items.append((root, files, test_root, dotest_options))
# Run the items, either in a pool (for multicore speedup) or
# calling each individually.
if num_threads > 1:
pool = multiprocessing.Pool(num_threads)
test_results = pool.map(process_dir_worker, test_work_items)
else:
test_results = []
for work_item in test_work_items:
test_results.append(process_dir_worker(work_item))
timed_out = []
failed = []
passed = []
for test_result in test_results:
(dir_timed_out, dir_failed, dir_passed) = test_result
timed_out += dir_timed_out
failed += dir_failed
passed += dir_passed
return (timed_out, failed, passed)
def main():
test_root = sys.path[0]
parser = OptionParser(usage="""\
Run lldb test suite using a separate process for each test file.
Each test will run with a time limit of 5 minutes by default.
Override the default time limit of 5 minutes by setting
the environment variable LLDB_TEST_TIMEOUT.
E.g., export LLDB_TEST_TIMEOUT=10m
Override the time limit for individual tests by setting
the environment variable LLDB_[TEST NAME]_TIMEOUT.
E.g., export LLDB_TESTCONCURRENTEVENTS_TIMEOUT=2m
Set to "0" to run without time limit.
E.g., export LLDB_TEST_TIMEOUT=0
or export LLDB_TESTCONCURRENTEVENTS_TIMEOUT=0
""")
parser.add_option('-o', '--options',
type='string', action='store',
dest='dotest_options',
help="""The options passed to 'dotest.py' if specified.""")
parser.add_option('-t', '--threads',
type='int',
dest='num_threads',
help="""The number of threads to use when running tests separately.""")
opts, args = parser.parse_args()
dotest_options = opts.dotest_options
if opts.num_threads:
num_threads = opts.num_threads
else:
num_threads_str = os.environ.get("LLDB_TEST_THREADS")
if num_threads_str:
num_threads = int(num_threads_str)
else:
num_threads = multiprocessing.cpu_count()
if num_threads < 1:
num_threads = 1
system_info = " ".join(platform.uname())
(timed_out, failed, passed) = walk_and_invoke(test_root, dotest_options,
num_threads)
timed_out = set(timed_out)
num_tests = len(failed) + len(passed)
print "Ran %d tests." % num_tests
if len(failed) > 0:
failed.sort()
print "Failing Tests (%d)" % len(failed)
for f in failed:
print "%s: LLDB (suite) :: %s (%s)" % (
"TIMEOUT" if f in timed_out else "FAIL", f, system_info
)
sys.exit(1)
sys.exit(0)
if __name__ == '__main__':
main()