mirror of https://github.com/silx-kit/pyFAI.git
566 lines
18 KiB
Python
Executable File
566 lines
18 KiB
Python
Executable File
#!/usr/bin/env python3
|
|
# coding: utf-8
|
|
# /*##########################################################################
|
|
#
|
|
# Copyright (C) 2015-2018 European Synchrotron Radiation Facility
|
|
#
|
|
# Permission is hereby granted, free of charge, to any person obtaining a copy
|
|
# of this software and associated documentation files (the "Software"), to deal
|
|
# in the Software without restriction, including without limitation the rights
|
|
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
|
|
# copies of the Software, and to permit persons to whom the Software is
|
|
# furnished to do so, subject to the following conditions:
|
|
#
|
|
# The above copyright notice and this permission notice shall be included in
|
|
# all copies or substantial portions of the Software.
|
|
#
|
|
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
|
|
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
|
|
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
|
|
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
|
|
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
|
|
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
|
|
# THE SOFTWARE.
|
|
#
|
|
# ###########################################################################*/
|
|
"""Run the tests of the project.
|
|
|
|
This script expects a suite function in <project_package>.test,
|
|
which returns a unittest.TestSuite.
|
|
|
|
Test coverage dependencies: coverage, lxml.
|
|
"""
|
|
|
|
__authors__ = ["Jérôme Kieffer", "Thomas Vincent"]
|
|
__date__ = "18/12/2018"
|
|
__license__ = "MIT"
|
|
|
|
import distutils.util
|
|
import logging
|
|
import os
|
|
import subprocess
|
|
import sys
|
|
import time
|
|
import unittest
|
|
import collections
|
|
from argparse import ArgumentParser
|
|
|
|
|
|
class StreamHandlerUnittestReady(logging.StreamHandler):
|
|
"""The unittest class TestResult redefine sys.stdout/err to capture
|
|
stdout/err from tests and to display them only when a test fail.
|
|
This class allow to use unittest stdout-capture by using the last sys.stdout
|
|
and not a cached one.
|
|
"""
|
|
|
|
def emit(self, record):
|
|
"""
|
|
:type record: logging.LogRecord
|
|
"""
|
|
self.stream = sys.stderr
|
|
super(StreamHandlerUnittestReady, self).emit(record)
|
|
|
|
def flush(self):
|
|
pass
|
|
|
|
|
|
def createBasicHandler():
|
|
"""Create the handler using the basic configuration"""
|
|
hdlr = StreamHandlerUnittestReady()
|
|
fs = logging.BASIC_FORMAT
|
|
dfs = None
|
|
fmt = logging.Formatter(fs, dfs)
|
|
hdlr.setFormatter(fmt)
|
|
return hdlr
|
|
|
|
|
|
# Use an handler compatible with unittests, else use_buffer is not working
|
|
logging.root.addHandler(createBasicHandler())
|
|
|
|
# Capture all default warnings
|
|
logging.captureWarnings(True)
|
|
import warnings
|
|
warnings.simplefilter('default')
|
|
|
|
logger = logging.getLogger("run_tests")
|
|
logger.setLevel(logging.WARNING)
|
|
|
|
logger.info("Python %s %s", sys.version, tuple.__itemsize__ * 8)
|
|
|
|
|
|
try:
|
|
import resource
|
|
except ImportError:
|
|
resource = None
|
|
logger.warning("resource module missing")
|
|
|
|
try:
|
|
import importlib
|
|
importer = importlib.import_module
|
|
except ImportError:
|
|
def importer(name):
|
|
module = __import__(name)
|
|
# returns the leaf module, instead of the root module
|
|
subnames = name.split(".")
|
|
subnames.pop(0)
|
|
for subname in subnames:
|
|
module = getattr(module, subname)
|
|
return module
|
|
|
|
try:
|
|
import numpy
|
|
except ImportError:
|
|
logger.warning("numpy missing")
|
|
else:
|
|
logger.info("Numpy %s", numpy.version.version)
|
|
|
|
try:
|
|
import scipy
|
|
except ImportError:
|
|
logger.warning("Scipy missing")
|
|
else:
|
|
logger.info("Scipy %s from %s", scipy.version.version, scipy.__path__)
|
|
|
|
try:
|
|
import fabio
|
|
except ImportError:
|
|
logger.warning("FabIO missing")
|
|
else:
|
|
logger.info("FabIO %s", fabio.version)
|
|
|
|
try:
|
|
import h5py
|
|
except Exception as error:
|
|
logger.warning("h5py missing: %s", error)
|
|
else:
|
|
logger.info("h5py %s", h5py.version.version)
|
|
|
|
try:
|
|
import Cython
|
|
except ImportError:
|
|
logger.warning("Cython missing")
|
|
else:
|
|
logger.info("Cython %s", Cython.__version__)
|
|
|
|
|
|
def get_project_name(root_dir):
|
|
"""Retrieve project name by running python setup.py --name in root_dir.
|
|
|
|
:param str root_dir: Directory where to run the command.
|
|
:return: The name of the project stored in root_dir
|
|
"""
|
|
logger.debug("Getting project name in %s", root_dir)
|
|
p = subprocess.Popen([sys.executable, "setup.py", "--name"],
|
|
shell=False, cwd=root_dir, stdout=subprocess.PIPE)
|
|
name, _stderr_data = p.communicate()
|
|
logger.debug("subprocess ended with rc= %s", p.returncode)
|
|
return name.split()[-1].decode('ascii')
|
|
|
|
|
|
class TextTestResultWithSkipList(unittest.TextTestResult):
|
|
"""Override default TextTestResult to display list of skipped tests at the
|
|
end
|
|
"""
|
|
|
|
def printErrors(self):
|
|
unittest.TextTestResult.printErrors(self)
|
|
# Print skipped tests at the end
|
|
self.printGroupedList("SKIPPED", self.skipped)
|
|
|
|
def printGroupedList(self, flavour, errors):
|
|
grouped = collections.OrderedDict()
|
|
|
|
for test, err in errors:
|
|
if err in grouped:
|
|
grouped[err] = grouped[err] + [test]
|
|
else:
|
|
grouped[err] = [test]
|
|
|
|
for err, tests in grouped.items():
|
|
self.stream.writeln(self.separator1)
|
|
for test in tests:
|
|
self.stream.writeln("%s: %s" % (flavour, self.getDescription(test)))
|
|
self.stream.writeln(self.separator2)
|
|
self.stream.writeln("%s" % err)
|
|
|
|
|
|
class ProfileTextTestResult(unittest.TextTestRunner.resultclass):
|
|
|
|
def __init__(self, *arg, **kwarg):
|
|
unittest.TextTestRunner.resultclass.__init__(self, *arg, **kwarg)
|
|
self.logger = logging.getLogger("memProf")
|
|
self.logger.setLevel(min(logging.INFO, logging.root.level))
|
|
self.logger.handlers.append(logging.FileHandler("profile.log"))
|
|
|
|
def startTest(self, test):
|
|
unittest.TextTestRunner.resultclass.startTest(self, test)
|
|
if resource:
|
|
self.__mem_start = \
|
|
resource.getrusage(resource.RUSAGE_SELF).ru_maxrss
|
|
self.__time_start = time.time()
|
|
|
|
def stopTest(self, test):
|
|
unittest.TextTestRunner.resultclass.stopTest(self, test)
|
|
# see issue 311. For other platform, get size of ru_maxrss in "man getrusage"
|
|
if sys.platform == "darwin":
|
|
ratio = 1e-6
|
|
else:
|
|
ratio = 1e-3
|
|
if resource:
|
|
memusage = (resource.getrusage(resource.RUSAGE_SELF).ru_maxrss -
|
|
self.__mem_start) * ratio
|
|
else:
|
|
memusage = 0
|
|
self.logger.info("Time: %.3fs \t RAM: %.3f Mb\t%s",
|
|
time.time() - self.__time_start,
|
|
memusage, test.id())
|
|
|
|
|
|
def report_uncovered_files(cov, build_dir, inject_xml=None):
|
|
"""
|
|
Generate a report of all modules uncovered by the tests
|
|
|
|
:param cov: test coverage instance
|
|
:param str build_dir: Build directory
|
|
:return: Text report
|
|
"""
|
|
if build_dir.endswith("pyFAI"):
|
|
build_dir = os.path.dirname(build_dir)
|
|
import fnmatch
|
|
|
|
existing_files = []
|
|
for root, _dirnames, filenames in os.walk(PROJECT_NAME):
|
|
for filename in fnmatch.filter(filenames, '*.py'):
|
|
existing_files.append(os.path.join(root, filename))
|
|
existing_files = filter(lambda x: not x.endswith("/setup.py"), existing_files)
|
|
existing_files = filter(lambda x: "/test/" not in x, existing_files)
|
|
existing_files = filter(lambda x: "/third_party/" not in x, existing_files)
|
|
|
|
if inject_xml is None:
|
|
import tempfile
|
|
fd, fn = tempfile.mkstemp(suffix=".xml")
|
|
os.close(fd)
|
|
cov.xml_report(outfile=fn)
|
|
else:
|
|
fn = inject_xml
|
|
from lxml import etree
|
|
xml = etree.parse(fn)
|
|
classes = xml.xpath("//class")
|
|
|
|
build_dir = os.path.abspath(build_dir)
|
|
|
|
covered_files = []
|
|
|
|
for cl in classes:
|
|
filename = cl.get("filename")
|
|
filename = os.path.abspath(filename)
|
|
if filename.startswith(build_dir):
|
|
filename = filename[len(build_dir) + 1:]
|
|
covered_files.append(filename)
|
|
|
|
uncovered_files = set(existing_files) - set(covered_files)
|
|
uncovered_files = sorted(list(uncovered_files))
|
|
text = ""
|
|
text += "Uncovered files (%d)\n" % len(uncovered_files)
|
|
text += "-" * (len(text) - 1) + "\n"
|
|
for filename in uncovered_files:
|
|
text += "* %s\n" % filename
|
|
|
|
return text
|
|
|
|
|
|
def report_rst(cov, package, version="0.0.0", base="", inject_xml=None):
|
|
"""
|
|
Generate a report of test coverage in RST (for Sphinx inclusion)
|
|
|
|
:param cov: test coverage instance
|
|
:param str package: Name of the package
|
|
:param str base: base directory of modules to include in the report
|
|
:return: RST string
|
|
"""
|
|
if inject_xml is None:
|
|
import tempfile
|
|
fd, fn = tempfile.mkstemp(suffix=".xml")
|
|
os.close(fd)
|
|
cov.xml_report(outfile=fn)
|
|
else:
|
|
fn = inject_xml
|
|
from lxml import etree
|
|
xml = etree.parse(fn)
|
|
classes = xml.xpath("//class")
|
|
|
|
line0 = "Test coverage report for %s" % package
|
|
res = [line0, "=" * len(line0), ""]
|
|
res.append("Measured on *%s* version %s, %s" %
|
|
(package, version, time.strftime("%d/%m/%Y")))
|
|
res += ["",
|
|
".. csv-table:: Test suite coverage",
|
|
' :header: "Name", "Stmts", "Exec", "Cover"',
|
|
' :widths: 35, 8, 8, 8',
|
|
'']
|
|
tot_sum_lines = 0
|
|
tot_sum_hits = 0
|
|
|
|
for cl in classes:
|
|
name = cl.get("name")
|
|
fname = cl.get("filename")
|
|
if os.path.abspath(fname).startswith(base):
|
|
lines = cl.find("lines").getchildren()
|
|
hits = [int(i.get("hits")) for i in lines]
|
|
|
|
sum_hits = sum(hits)
|
|
sum_lines = len(lines)
|
|
|
|
cover = 100.0 * sum_hits / sum_lines if sum_lines else 0
|
|
|
|
if base:
|
|
name = os.path.relpath(fname, base)
|
|
|
|
res.append(' "%s", "%s", "%s", "%.1f %%"' %
|
|
(name, sum_lines, sum_hits, cover))
|
|
tot_sum_lines += sum_lines
|
|
tot_sum_hits += sum_hits
|
|
res.append("")
|
|
res.append(' "%s total", "%s", "%s", "%.1f %%"' %
|
|
(package, tot_sum_lines, tot_sum_hits,
|
|
100.0 * tot_sum_hits / tot_sum_lines if tot_sum_lines else 0))
|
|
res.append("")
|
|
return os.linesep.join(res)
|
|
|
|
|
|
def is_debug_python():
|
|
"""Returns true if the Python interpreter is in debug mode."""
|
|
try:
|
|
import sysconfig
|
|
except ImportError: # pragma nocover
|
|
# Python < 2.7
|
|
import distutils.sysconfig as sysconfig
|
|
|
|
if sysconfig.get_config_var("Py_DEBUG"):
|
|
return True
|
|
|
|
return hasattr(sys, "gettotalrefcount")
|
|
|
|
|
|
def build_project(name, root_dir):
|
|
"""Run python setup.py build for the project.
|
|
|
|
Build directory can be modified by environment variables.
|
|
|
|
:param str name: Name of the project.
|
|
:param str root_dir: Root directory of the project
|
|
:return: The path to the directory were build was performed
|
|
"""
|
|
platform = distutils.util.get_platform()
|
|
architecture = "lib.%s-%i.%i" % (platform,
|
|
sys.version_info[0], sys.version_info[1])
|
|
if is_debug_python():
|
|
architecture += "-pydebug"
|
|
|
|
if os.environ.get("PYBUILD_NAME") == name:
|
|
# we are in the debian packaging way
|
|
home = os.environ.get("PYTHONPATH", "").split(os.pathsep)[-1]
|
|
elif os.environ.get("BUILDPYTHONPATH"):
|
|
home = os.path.abspath(os.environ.get("BUILDPYTHONPATH", ""))
|
|
else:
|
|
home = os.path.join(root_dir, "build", architecture)
|
|
|
|
logger.warning("Building %s to %s", name, home)
|
|
p = subprocess.Popen([sys.executable, "setup.py", "build"],
|
|
shell=False, cwd=root_dir)
|
|
logger.debug("subprocess ended with rc= %s", p.wait())
|
|
return home
|
|
|
|
|
|
def import_project_module(project_name, project_dir):
|
|
"""Import project module, from the system of from the project directory"""
|
|
# Prevent importing from source directory
|
|
if (os.path.dirname(os.path.abspath(__file__)) == os.path.abspath(sys.path[0])):
|
|
removed_from_sys_path = sys.path.pop(0)
|
|
logger.info("Patched sys.path, removed: '%s'", removed_from_sys_path)
|
|
|
|
if "--installed" in sys.argv:
|
|
try:
|
|
module = importer(project_name)
|
|
except ImportError:
|
|
raise ImportError(
|
|
"%s not installed: Cannot run tests on installed version" %
|
|
PROJECT_NAME)
|
|
else: # Use built source
|
|
build_dir = build_project(project_name, project_dir)
|
|
|
|
sys.path.insert(0, build_dir)
|
|
logger.warning("Patched sys.path, added: '%s'", build_dir)
|
|
module = importer(project_name)
|
|
return module
|
|
|
|
|
|
def get_test_options(project_module):
|
|
"""Returns the test options if available, else None"""
|
|
module_name = project_module.__name__ + '.test.utilstest'
|
|
logger.info('Import %s', module_name)
|
|
try:
|
|
test_utils = importer(module_name)
|
|
except ImportError:
|
|
logger.warning("No module named '%s'. No test options available.", module_name)
|
|
return None
|
|
|
|
test_options = getattr(test_utils, "test_options", None)
|
|
return test_options
|
|
|
|
|
|
PROJECT_DIR = os.path.dirname(os.path.abspath(__file__))
|
|
PROJECT_NAME = get_project_name(PROJECT_DIR)
|
|
logger.info("Project name: %s", PROJECT_NAME)
|
|
|
|
project_module = import_project_module(PROJECT_NAME, PROJECT_DIR)
|
|
PROJECT_VERSION = getattr(project_module, 'version', '')
|
|
PROJECT_PATH = project_module.__path__[0]
|
|
|
|
test_options = get_test_options(project_module)
|
|
"""Contains extra configuration for the tests."""
|
|
|
|
|
|
epilog = """Environment variables:
|
|
WITH_QT_TEST=False to disable graphical tests
|
|
PYFAI_OPENCL=False to disable OpenCL tests.
|
|
PYFAI_LOW_MEM: set to True to skip all tests >100Mb
|
|
WITH_GL_TEST=False to disable tests using OpenGL
|
|
"""
|
|
parser = ArgumentParser(description='Run the tests.',
|
|
epilog=epilog)
|
|
|
|
parser.add_argument("--installed",
|
|
action="store_true", dest="installed", default=False,
|
|
help=("Test the installed version instead of" +
|
|
"building from the source"))
|
|
parser.add_argument("-c", "--coverage", dest="coverage",
|
|
action="store_true", default=False,
|
|
help=("Report code coverage" +
|
|
"(requires 'coverage' and 'lxml' module)"))
|
|
parser.add_argument("-m", "--memprofile", dest="memprofile",
|
|
action="store_true", default=False,
|
|
help="Report memory profiling")
|
|
parser.add_argument("-v", "--verbose", default=0,
|
|
action="count", dest="verbose",
|
|
help="Increase verbosity. Option -v prints additional " +
|
|
"INFO messages. Use -vv for full verbosity, " +
|
|
"including debug messages and test help strings.")
|
|
parser.add_argument("--qt-binding", dest="qt_binding", default=None,
|
|
help="Force using a Qt binding, from 'PyQt4', 'PyQt5', or 'PySide'")
|
|
if test_options is not None:
|
|
test_options.add_parser_argument(parser)
|
|
|
|
default_test_name = "%s.test.suite" % PROJECT_NAME
|
|
parser.add_argument("test_name", nargs='*',
|
|
default=(default_test_name,),
|
|
help="Test names to run (Default: %s)" % default_test_name)
|
|
options = parser.parse_args()
|
|
sys.argv = [sys.argv[0]]
|
|
|
|
|
|
test_verbosity = 1
|
|
use_buffer = True
|
|
if options.verbose == 1:
|
|
logging.root.setLevel(logging.INFO)
|
|
logger.info("Set log level: INFO")
|
|
test_verbosity = 2
|
|
use_buffer = False
|
|
elif options.verbose > 1:
|
|
logging.root.setLevel(logging.DEBUG)
|
|
logger.info("Set log level: DEBUG")
|
|
test_verbosity = 2
|
|
use_buffer = False
|
|
|
|
if options.coverage:
|
|
logger.info("Running test-coverage")
|
|
import coverage
|
|
try:
|
|
coverage_class = coverage.Coverage
|
|
except AttributeError:
|
|
coverage_class = coverage.coverage
|
|
print("|%s|" % PROJECT_NAME)
|
|
cov = coverage_class(include=["*/%s/*" % PROJECT_NAME],
|
|
omit=["*test*", "*third_party*", "*/setup.py"])
|
|
cov.start()
|
|
|
|
if options.qt_binding:
|
|
binding = options.qt_binding.lower()
|
|
if binding == "pyqt4":
|
|
logger.info("Force using PyQt4")
|
|
if sys.version < "3.0.0":
|
|
try:
|
|
import sip
|
|
sip.setapi("QString", 2)
|
|
sip.setapi("QVariant", 2)
|
|
except Exception:
|
|
logger.warning("Cannot set sip API")
|
|
import PyQt4.QtCore # noqa
|
|
elif binding == "pyqt5":
|
|
logger.info("Force using PyQt5")
|
|
import PyQt5.QtCore # noqa
|
|
elif binding == "pyside":
|
|
logger.info("Force using PySide")
|
|
import PySide.QtCore # noqa
|
|
elif binding == "pyside2":
|
|
logger.info("Force using PySide2")
|
|
import PySide2.QtCore # noqa
|
|
else:
|
|
raise ValueError("Qt binding '%s' is unknown" % options.qt_binding)
|
|
|
|
# Run the tests
|
|
runnerArgs = {}
|
|
runnerArgs["verbosity"] = test_verbosity
|
|
runnerArgs["buffer"] = use_buffer
|
|
if options.memprofile:
|
|
runnerArgs["resultclass"] = ProfileTextTestResult
|
|
else:
|
|
runnerArgs["resultclass"] = TextTestResultWithSkipList
|
|
runner = unittest.TextTestRunner(**runnerArgs)
|
|
|
|
logger.warning("Test %s %s from %s",
|
|
PROJECT_NAME, PROJECT_VERSION, PROJECT_PATH)
|
|
|
|
test_module_name = PROJECT_NAME + '.test'
|
|
logger.info('Import %s', test_module_name)
|
|
test_module = importer(test_module_name)
|
|
test_suite = unittest.TestSuite()
|
|
|
|
if test_options is not None:
|
|
# Configure the test options according to the command lines and the the environment
|
|
test_options.configure(options)
|
|
else:
|
|
logger.warning("No test options available.")
|
|
|
|
|
|
if not options.test_name:
|
|
# Do not use test loader to avoid cryptic exception
|
|
# when an error occur during import
|
|
project_test_suite = getattr(test_module, 'suite')
|
|
test_suite.addTest(project_test_suite())
|
|
else:
|
|
test_suite.addTest(
|
|
unittest.defaultTestLoader.loadTestsFromNames(options.test_name))
|
|
|
|
# Display the result when using CTRL-C
|
|
unittest.installHandler()
|
|
|
|
result = runner.run(test_suite)
|
|
|
|
if result.wasSuccessful():
|
|
exit_status = 0
|
|
else:
|
|
exit_status = 1
|
|
|
|
|
|
if options.coverage:
|
|
cov.stop()
|
|
cov.save()
|
|
with open("coverage.rst", "w") as fn:
|
|
fn.write(report_rst(cov, PROJECT_NAME, PROJECT_VERSION, PROJECT_PATH))
|
|
print(cov.report())
|
|
print("")
|
|
print(report_uncovered_files(cov, PROJECT_PATH))
|
|
|
|
sys.exit(exit_status)
|