[scan-build-py] use subprocess wrapper

llvm-svn: 293396
This commit is contained in:
Laszlo Nagy 2017-01-28 22:48:26 +00:00
parent a897f7cd40
commit 46fc18a9a9
6 changed files with 88 additions and 91 deletions

View File

@ -3,10 +3,13 @@
#
# This file is distributed under the University of Illinois Open Source
# License. See LICENSE.TXT for details.
"""
This module responsible to run the Clang static analyzer against any build
and generate reports.
"""
""" This module is a collection of methods commonly used in this project. """
import functools
import logging
import os
import os.path
import subprocess
import sys
def duplicate_check(method):
@ -33,16 +36,35 @@ def duplicate_check(method):
def tempdir():
""" Return the default temorary directory. """
from os import getenv
return getenv('TMPDIR', getenv('TEMP', getenv('TMP', '/tmp')))
return os.getenv('TMPDIR', os.getenv('TEMP', os.getenv('TMP', '/tmp')))
def run_command(command, cwd=None):
""" Run a given command and report the execution.
:param command: array of tokens
:param cwd: the working directory where the command will be executed
:return: output of the command
"""
def decode_when_needed(result):
""" check_output returns bytes or string depend on python version """
return result.decode('utf-8') if isinstance(result, bytes) else result
try:
directory = os.path.abspath(cwd) if cwd else os.getcwd()
logging.debug('exec command %s in %s', command, directory)
output = subprocess.check_output(command,
cwd=directory,
stderr=subprocess.STDOUT)
return decode_when_needed(output).splitlines()
except subprocess.CalledProcessError as ex:
ex.output = decode_when_needed(ex.output).splitlines()
raise ex
def initialize_logging(verbose_level):
""" Output content controlled by the verbosity level. """
import sys
import os.path
import logging
level = logging.WARNING - min(logging.WARNING, (10 * verbose_level))
if verbose_level <= 3:
@ -57,9 +79,6 @@ def initialize_logging(verbose_level):
def command_entry_point(function):
""" Decorator for command entry points. """
import functools
import logging
@functools.wraps(function)
def wrapper(*args, **kwargs):

View File

@ -9,8 +9,7 @@ Since Clang command line interface is so rich, but this project is using only
a subset of that, it makes sense to create a function specific wrapper. """
import re
import subprocess
import logging
from libscanbuild import run_command
from libscanbuild.shell import decode
__all__ = ['get_version', 'get_arguments', 'get_checkers']
@ -25,8 +24,9 @@ def get_version(clang):
:param clang: the compiler we are using
:return: the version string printed to stderr """
output = subprocess.check_output([clang, '-v'], stderr=subprocess.STDOUT)
return output.decode('utf-8').splitlines()[0]
output = run_command([clang, '-v'])
# the relevant version info is in the first line
return output[0]
def get_arguments(command, cwd):
@ -38,12 +38,11 @@ def get_arguments(command, cwd):
cmd = command[:]
cmd.insert(1, '-###')
logging.debug('exec command in %s: %s', cwd, ' '.join(cmd))
output = subprocess.check_output(cmd, cwd=cwd, stderr=subprocess.STDOUT)
output = run_command(cmd, cwd=cwd)
# The relevant information is in the last line of the output.
# Don't check if finding last line fails, would throw exception anyway.
last_line = output.decode('utf-8').splitlines()[-1]
last_line = output[-1]
if re.search(r'clang(.*): error:', last_line):
raise Exception(last_line)
return decode(last_line)
@ -141,9 +140,7 @@ def get_checkers(clang, plugins):
load = [elem for plugin in plugins for elem in ['-load', plugin]]
cmd = [clang, '-cc1'] + load + ['-analyzer-checker-help']
logging.debug('exec command: %s', ' '.join(cmd))
output = subprocess.check_output(cmd, stderr=subprocess.STDOUT)
lines = output.decode('utf-8').splitlines()
lines = run_command(cmd)
is_active_checker = is_active(get_active_checkers(clang, plugins))

View File

@ -31,7 +31,7 @@ import argparse
import logging
import subprocess
from libear import build_libear, TemporaryDirectory
from libscanbuild import command_entry_point
from libscanbuild import command_entry_point, run_command
from libscanbuild import duplicate_check, tempdir, initialize_logging
from libscanbuild.compilation import split_command
from libscanbuild.shell import encode, decode
@ -44,6 +44,7 @@ US = chr(0x1f)
COMPILER_WRAPPER_CC = 'intercept-cc'
COMPILER_WRAPPER_CXX = 'intercept-c++'
WRAPPER_ONLY_PLATFORMS = frozenset({'win32', 'cygwin'})
@command_entry_point
@ -238,24 +239,21 @@ def is_preload_disabled(platform):
the path and, if so, (2) whether the output of executing 'csrutil status'
contains 'System Integrity Protection status: enabled'.
Same problem on linux when SELinux is enabled. The status query program
'sestatus' and the output when it's enabled 'SELinux status: enabled'. """
:param platform: name of the platform (returned by sys.platform),
:return: True if library preload will fail by the dynamic linker. """
if platform == 'darwin':
pattern = re.compile(r'System Integrity Protection status:\s+enabled')
if platform in WRAPPER_ONLY_PLATFORMS:
return True
elif platform == 'darwin':
command = ['csrutil', 'status']
elif platform in {'linux', 'linux2'}:
pattern = re.compile(r'SELinux status:\s+enabled')
command = ['sestatus']
pattern = re.compile(r'System Integrity Protection status:\s+enabled')
try:
return any(pattern.match(line) for line in run_command(command))
except:
return False
else:
return False
try:
lines = subprocess.check_output(command).decode('utf-8')
return any((pattern.match(line) for line in lines.splitlines()))
except:
return False
def entry_hash(entry):
""" Implement unique hash method for compilation database entries. """

View File

@ -12,6 +12,7 @@ import tempfile
import functools
import subprocess
import logging
from libscanbuild import run_command
from libscanbuild.compilation import classify_source, compiler_language
from libscanbuild.clang import get_version, get_arguments
from libscanbuild.shell import decode
@ -100,7 +101,7 @@ def run(opts):
@require(['clang', 'directory', 'flags', 'file', 'output_dir', 'language',
'error_type', 'error_output', 'exit_code'])
'error_output', 'exit_code'])
def report_failure(opts):
""" Create report when analyzer failed.
@ -108,30 +109,36 @@ def report_failure(opts):
randomly. The compiler output also captured into '.stderr.txt' file.
And some more execution context also saved into '.info.txt' file. """
def extension(opts):
def extension():
""" Generate preprocessor file extension. """
mapping = {'objective-c++': '.mii', 'objective-c': '.mi', 'c++': '.ii'}
return mapping.get(opts['language'], '.i')
def destination(opts):
def destination():
""" Creates failures directory if not exits yet. """
name = os.path.join(opts['output_dir'], 'failures')
if not os.path.isdir(name):
os.makedirs(name)
return name
failures_dir = os.path.join(opts['output_dir'], 'failures')
if not os.path.isdir(failures_dir):
os.makedirs(failures_dir)
return failures_dir
error = opts['error_type']
(handle, name) = tempfile.mkstemp(suffix=extension(opts),
# Classify error type: when Clang terminated by a signal it's a 'Crash'.
# (python subprocess Popen.returncode is negative when child terminated
# by signal.) Everything else is 'Other Error'.
error = 'crash' if opts['exit_code'] < 0 else 'other_error'
# Create preprocessor output file name. (This is blindly following the
# Perl implementation.)
(handle, name) = tempfile.mkstemp(suffix=extension(),
prefix='clang_' + error + '_',
dir=destination(opts))
dir=destination())
os.close(handle)
# Execute Clang again, but run the syntax check only.
cwd = opts['directory']
cmd = get_arguments([opts['clang'], '-fsyntax-only', '-E'] +
opts['flags'] + [opts['file'], '-o', name], cwd)
logging.debug('exec command in %s: %s', cwd, ' '.join(cmd))
subprocess.call(cmd, cwd=cwd)
cmd = get_arguments(
[opts['clang'], '-fsyntax-only', '-E'
] + opts['flags'] + [opts['file'], '-o', name], cwd)
run_command(cmd, cwd=cwd)
# write general information about the crash
with open(name + '.info.txt', 'w') as handle:
handle.write(opts['file'] + os.linesep)
@ -144,11 +151,6 @@ def report_failure(opts):
with open(name + '.stderr.txt', 'w') as handle:
handle.writelines(opts['error_output'])
handle.close()
# return with the previous step exit code and output
return {
'error_output': opts['error_output'],
'exit_code': opts['exit_code']
}
@require(['clang', 'directory', 'flags', 'direct_args', 'file', 'output_dir',
@ -158,7 +160,7 @@ def run_analyzer(opts, continuation=report_failure):
output of the analysis and returns with it. If failure reports are
requested, it calls the continuation to generate it. """
def output():
def target():
""" Creates output file name for reports. """
if opts['output_format'] in {'plist', 'plist-html'}:
(handle, name) = tempfile.mkstemp(prefix='report-',
@ -168,30 +170,20 @@ def run_analyzer(opts, continuation=report_failure):
return name
return opts['output_dir']
cwd = opts['directory']
cmd = get_arguments([opts['clang'], '--analyze'] + opts['direct_args'] +
opts['flags'] + [opts['file'], '-o', output()],
cwd)
logging.debug('exec command in %s: %s', cwd, ' '.join(cmd))
child = subprocess.Popen(cmd,
cwd=cwd,
universal_newlines=True,
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT)
output = child.stdout.readlines()
child.stdout.close()
# do report details if it were asked
child.wait()
if opts.get('output_failures', False) and child.returncode:
error_type = 'crash' if child.returncode & 127 else 'other_error'
opts.update({
'error_type': error_type,
'error_output': output,
'exit_code': child.returncode
})
return continuation(opts)
# return the output for logging and exit code for testing
return {'error_output': output, 'exit_code': child.returncode}
try:
cwd = opts['directory']
cmd = get_arguments([opts['clang'], '--analyze'] +
opts['direct_args'] + opts['flags'] +
[opts['file'], '-o', target()],
cwd)
output = run_command(cmd, cwd=cwd)
return {'error_output': output, 'exit_code': 0}
except subprocess.CalledProcessError as ex:
result = {'error_output': ex.output, 'exit_code': ex.returncode}
if opts.get('output_failures', False):
opts.update(result)
continuation(opts)
return result
@require(['flags', 'force_debug'])

View File

@ -65,11 +65,10 @@ class InterceptUtilTest(unittest.TestCase):
DISABLED = 'disabled'
OSX = 'darwin'
LINUX = 'linux'
with libear.TemporaryDirectory() as tmpdir:
saved = os.environ['PATH']
try:
saved = os.environ['PATH']
os.environ['PATH'] = tmpdir + ':' + saved
create_csrutil(tmpdir, ENABLED)
@ -77,21 +76,14 @@ class InterceptUtilTest(unittest.TestCase):
create_csrutil(tmpdir, DISABLED)
self.assertFalse(sut.is_preload_disabled(OSX))
create_sestatus(tmpdir, ENABLED)
self.assertTrue(sut.is_preload_disabled(LINUX))
create_sestatus(tmpdir, DISABLED)
self.assertFalse(sut.is_preload_disabled(LINUX))
finally:
os.environ['PATH'] = saved
saved = os.environ['PATH']
try:
saved = os.environ['PATH']
os.environ['PATH'] = ''
# shall be false when it's not in the path
self.assertFalse(sut.is_preload_disabled(OSX))
self.assertFalse(sut.is_preload_disabled(LINUX))
self.assertFalse(sut.is_preload_disabled('unix'))
finally:

View File

@ -150,7 +150,6 @@ class RunAnalyzerTest(unittest.TestCase):
def test_run_analyzer_crash_and_forwarded(self):
content = "int div(int n, int d) { return n / d }"
(_, fwds) = RunAnalyzerTest.run_analyzer(content, True)
self.assertEqual('crash', fwds['error_type'])
self.assertEqual(1, fwds['exit_code'])
self.assertTrue(len(fwds['error_output']) > 0)