forked from OSchip/llvm-project
216 lines
6.8 KiB
Python
Executable File
216 lines
6.8 KiB
Python
Executable File
#!/usr/bin/env python
|
|
|
|
"""
|
|
Run the test suite using a separate process for each test file.
|
|
|
|
Each test will run with a time limit of 5 minutes by default.
|
|
|
|
Override the default time limit of 5 minutes by setting
|
|
the environment variable LLDB_TEST_TIMEOUT.
|
|
|
|
E.g., export LLDB_TEST_TIMEOUT=10m
|
|
|
|
Override the time limit for individual tests by setting
|
|
the environment variable LLDB_[TEST NAME]_TIMEOUT.
|
|
|
|
E.g., export LLDB_TESTCONCURRENTEVENTS_TIMEOUT=2m
|
|
|
|
Set to "0" to run without time limit.
|
|
|
|
E.g., export LLDB_TEST_TIMEOUT=0
|
|
or export LLDB_TESTCONCURRENTEVENTS_TIMEOUT=0
|
|
"""
|
|
|
|
import multiprocessing
|
|
import os
|
|
import platform
|
|
import shlex
|
|
import subprocess
|
|
import sys
|
|
|
|
from optparse import OptionParser
|
|
|
|
def get_timeout_command():
|
|
"""Search for a suitable timeout command."""
|
|
if sys.platform.startswith("win32"):
|
|
return None
|
|
try:
|
|
subprocess.call("timeout")
|
|
return "timeout"
|
|
except OSError:
|
|
pass
|
|
try:
|
|
subprocess.call("gtimeout")
|
|
return "gtimeout"
|
|
except OSError:
|
|
pass
|
|
return None
|
|
|
|
timeout_command = get_timeout_command()
|
|
|
|
default_timeout = os.getenv("LLDB_TEST_TIMEOUT") or "5m"
|
|
|
|
# Status codes for running command with timeout.
|
|
eTimedOut, ePassed, eFailed = 124, 0, 1
|
|
|
|
def call_with_timeout(command, timeout):
|
|
"""Run command with a timeout if possible."""
|
|
if os.name != "nt":
|
|
if timeout_command and timeout != "0":
|
|
return subprocess.call([timeout_command, timeout] + command,
|
|
stdin=subprocess.PIPE, close_fds=True)
|
|
return (ePassed if subprocess.call(command, stdin=subprocess.PIPE, close_fds=True) == 0
|
|
else eFailed)
|
|
else:
|
|
if timeout_command and timeout != "0":
|
|
return subprocess.call([timeout_command, timeout] + command,
|
|
stdin=subprocess.PIPE)
|
|
return (ePassed if subprocess.call(command, stdin=subprocess.PIPE) == 0
|
|
else eFailed)
|
|
|
|
def process_dir(root, files, test_root, dotest_options):
|
|
"""Examine a directory for tests, and invoke any found within it."""
|
|
timed_out = []
|
|
failed = []
|
|
passed = []
|
|
for name in files:
|
|
path = os.path.join(root, name)
|
|
|
|
# We're only interested in the test file with the "Test*.py" naming pattern.
|
|
if not name.startswith("Test") or not name.endswith(".py"):
|
|
continue
|
|
|
|
# Neither a symbolically linked file.
|
|
if os.path.islink(path):
|
|
continue
|
|
|
|
script_file = os.path.join(test_root, "dotest.py")
|
|
is_posix = (os.name == "posix")
|
|
split_args = shlex.split(dotest_options, posix=is_posix) if dotest_options else []
|
|
command = ([sys.executable, script_file] +
|
|
split_args +
|
|
["-p", name, root])
|
|
|
|
timeout_name = os.path.basename(os.path.splitext(name)[0]).upper()
|
|
|
|
timeout = os.getenv("LLDB_%s_TIMEOUT" % timeout_name) or default_timeout
|
|
|
|
exit_status = call_with_timeout(command, timeout)
|
|
|
|
if ePassed == exit_status:
|
|
passed.append(name)
|
|
else:
|
|
if eTimedOut == exit_status:
|
|
timed_out.append(name)
|
|
failed.append(name)
|
|
return (timed_out, failed, passed)
|
|
|
|
in_q = None
|
|
out_q = None
|
|
|
|
def process_dir_worker(arg_tuple):
|
|
"""Worker thread main loop when in multithreaded mode.
|
|
Takes one directory specification at a time and works on it."""
|
|
(root, files, test_root, dotest_options) = arg_tuple
|
|
return process_dir(root, files, test_root, dotest_options)
|
|
|
|
def walk_and_invoke(test_root, dotest_options, num_threads):
|
|
"""Look for matched files and invoke test driver on each one.
|
|
In single-threaded mode, each test driver is invoked directly.
|
|
In multi-threaded mode, submit each test driver to a worker
|
|
queue, and then wait for all to complete."""
|
|
|
|
# Collect the test files that we'll run.
|
|
test_work_items = []
|
|
for root, dirs, files in os.walk(test_root, topdown=False):
|
|
test_work_items.append((root, files, test_root, dotest_options))
|
|
|
|
# Run the items, either in a pool (for multicore speedup) or
|
|
# calling each individually.
|
|
if num_threads > 1:
|
|
pool = multiprocessing.Pool(num_threads)
|
|
test_results = pool.map(process_dir_worker, test_work_items)
|
|
else:
|
|
test_results = []
|
|
for work_item in test_work_items:
|
|
test_results.append(process_dir_worker(work_item))
|
|
|
|
timed_out = []
|
|
failed = []
|
|
passed = []
|
|
|
|
for test_result in test_results:
|
|
(dir_timed_out, dir_failed, dir_passed) = test_result
|
|
timed_out += dir_timed_out
|
|
failed += dir_failed
|
|
passed += dir_passed
|
|
|
|
return (timed_out, failed, passed)
|
|
|
|
def main():
|
|
test_root = sys.path[0]
|
|
|
|
parser = OptionParser(usage="""\
|
|
Run lldb test suite using a separate process for each test file.
|
|
|
|
Each test will run with a time limit of 5 minutes by default.
|
|
|
|
Override the default time limit of 5 minutes by setting
|
|
the environment variable LLDB_TEST_TIMEOUT.
|
|
|
|
E.g., export LLDB_TEST_TIMEOUT=10m
|
|
|
|
Override the time limit for individual tests by setting
|
|
the environment variable LLDB_[TEST NAME]_TIMEOUT.
|
|
|
|
E.g., export LLDB_TESTCONCURRENTEVENTS_TIMEOUT=2m
|
|
|
|
Set to "0" to run without time limit.
|
|
|
|
E.g., export LLDB_TEST_TIMEOUT=0
|
|
or export LLDB_TESTCONCURRENTEVENTS_TIMEOUT=0
|
|
""")
|
|
parser.add_option('-o', '--options',
|
|
type='string', action='store',
|
|
dest='dotest_options',
|
|
help="""The options passed to 'dotest.py' if specified.""")
|
|
|
|
parser.add_option('-t', '--threads',
|
|
type='int',
|
|
dest='num_threads',
|
|
help="""The number of threads to use when running tests separately.""")
|
|
|
|
opts, args = parser.parse_args()
|
|
dotest_options = opts.dotest_options
|
|
|
|
if opts.num_threads:
|
|
num_threads = opts.num_threads
|
|
else:
|
|
num_threads_str = os.environ.get("LLDB_TEST_THREADS")
|
|
if num_threads_str:
|
|
num_threads = int(num_threads_str)
|
|
else:
|
|
num_threads = multiprocessing.cpu_count()
|
|
if num_threads < 1:
|
|
num_threads = 1
|
|
|
|
system_info = " ".join(platform.uname())
|
|
(timed_out, failed, passed) = walk_and_invoke(test_root, dotest_options,
|
|
num_threads)
|
|
timed_out = set(timed_out)
|
|
num_tests = len(failed) + len(passed)
|
|
|
|
print "Ran %d tests." % num_tests
|
|
if len(failed) > 0:
|
|
failed.sort()
|
|
print "Failing Tests (%d)" % len(failed)
|
|
for f in failed:
|
|
print "%s: LLDB (suite) :: %s (%s)" % (
|
|
"TIMEOUT" if f in timed_out else "FAIL", f, system_info
|
|
)
|
|
sys.exit(1)
|
|
sys.exit(0)
|
|
|
|
if __name__ == '__main__':
|
|
main()
|