[analyzer] SATest: Add posibility to download source from git and zip

Differential Revision: https://reviews.llvm.org/D81564
This commit is contained in:
Valeriy Savchenko 2020-06-02 17:46:50 +03:00
parent fb4b565212
commit bbb8f17136
2 changed files with 126 additions and 61 deletions

View File

@ -1,7 +1,8 @@
import json
import os
from typing import Any, Dict, List, NamedTuple, Optional
from enum import Enum
from typing import Any, Dict, List, NamedTuple, Optional, Tuple
JSON = Dict[str, Any]
@ -10,12 +11,21 @@ JSON = Dict[str, Any]
DEFAULT_MAP_FILE = "projects.json"
class DownloadType(str, Enum):
GIT = "git"
ZIP = "zip"
SCRIPT = "script"
class ProjectInfo(NamedTuple):
"""
Information about a project to analyze.
"""
name: str
mode: int
source: DownloadType = DownloadType.SCRIPT
origin: str = ""
commit: str = ""
enabled: bool = True
@ -73,12 +83,29 @@ class ProjectMap:
name: str = raw_project["name"]
build_mode: int = raw_project["mode"]
enabled: bool = raw_project.get("enabled", True)
return ProjectInfo(name, build_mode, enabled)
source: DownloadType = raw_project.get("source", "zip")
if source == DownloadType.GIT:
origin, commit = ProjectMap._get_git_params(raw_project)
else:
origin, commit = "", ""
return ProjectInfo(name, build_mode, source, origin, commit,
enabled)
except KeyError as e:
raise ValueError(
f"Project info is required to have a '{e.args[0]}' field")
@staticmethod
def _get_git_params(raw_project: JSON) -> Tuple[str, str]:
try:
return raw_project["origin"], raw_project["commit"]
except KeyError as e:
raise ValueError(
f"Profect info is required to have a '{e.args[0]}' field "
f"if it has a 'git' source")
@staticmethod
def _create_empty(path: str):
ProjectMap._save([], path)

View File

@ -44,7 +44,7 @@ variable. It should contain a comma separated list.
"""
import CmpRuns
import SATestUtils
from ProjectMap import ProjectInfo, ProjectMap
from ProjectMap import DownloadType, ProjectInfo, ProjectMap
import argparse
import glob
@ -57,6 +57,7 @@ import shutil
import sys
import threading
import time
import zipfile
from queue import Queue
# mypy has problems finding InvalidFileException in the module
@ -198,63 +199,6 @@ def run_cleanup_script(directory: str, build_log_file: IO):
verbose=VERBOSE)
def download_and_patch(directory: str, build_log_file: IO):
"""
Download the project and apply the local patchfile if it exists.
"""
cached_source = os.path.join(directory, CACHED_SOURCE_DIR_NAME)
# If the we don't already have the cached source, run the project's
# download script to download it.
if not os.path.exists(cached_source):
download(directory, build_log_file)
if not os.path.exists(cached_source):
stderr(f"Error: '{cached_source}' not found after download.\n")
exit(1)
patched_source = os.path.join(directory, PATCHED_SOURCE_DIR_NAME)
# Remove potentially stale patched source.
if os.path.exists(patched_source):
shutil.rmtree(patched_source)
# Copy the cached source and apply any patches to the copy.
shutil.copytree(cached_source, patched_source, symlinks=True)
apply_patch(directory, build_log_file)
def download(directory: str, build_log_file: IO):
"""
Run the script to download the project, if it exists.
"""
script_path = os.path.join(directory, DOWNLOAD_SCRIPT)
SATestUtils.run_script(script_path, build_log_file, directory,
out=LOCAL.stdout, err=LOCAL.stderr,
verbose=VERBOSE)
def apply_patch(directory: str, build_log_file: IO):
patchfile_path = os.path.join(directory, PATCHFILE_NAME)
patched_source = os.path.join(directory, PATCHED_SOURCE_DIR_NAME)
if not os.path.exists(patchfile_path):
stdout(" No local patches.\n")
return
stdout(" Applying patch.\n")
try:
check_call(f"patch -p1 < '{patchfile_path}'",
cwd=patched_source,
stderr=build_log_file,
stdout=build_log_file,
shell=True)
except CalledProcessError:
stderr(f"Error: Patch failed. "
f"See {build_log_file.name} for details.\n")
sys.exit(1)
class TestInfo(NamedTuple):
"""
Information about a project and settings for its analysis.
@ -430,7 +374,7 @@ class ProjectTester:
# Build and analyze the project.
with open(build_log_path, "w+") as build_log_file:
if self.project.mode == 1:
download_and_patch(directory, build_log_file)
self._download_and_patch(directory, build_log_file)
run_cleanup_script(directory, build_log_file)
self.scan_build(directory, output_dir, build_log_file)
else:
@ -587,6 +531,100 @@ class ProjectTester:
return out
def _download_and_patch(self, directory: str, build_log_file: IO):
"""
Download the project and apply the local patchfile if it exists.
"""
cached_source = os.path.join(directory, CACHED_SOURCE_DIR_NAME)
# If the we don't already have the cached source, run the project's
# download script to download it.
if not os.path.exists(cached_source):
self._download(directory, build_log_file)
if not os.path.exists(cached_source):
stderr(f"Error: '{cached_source}' not found after download.\n")
exit(1)
patched_source = os.path.join(directory, PATCHED_SOURCE_DIR_NAME)
# Remove potentially stale patched source.
if os.path.exists(patched_source):
shutil.rmtree(patched_source)
# Copy the cached source and apply any patches to the copy.
shutil.copytree(cached_source, patched_source, symlinks=True)
self._apply_patch(directory, build_log_file)
def _download(self, directory: str, build_log_file: IO):
"""
Run the script to download the project, if it exists.
"""
if self.project.source == DownloadType.GIT:
self._download_from_git(directory, build_log_file)
elif self.project.source == DownloadType.ZIP:
self._unpack_zip(directory, build_log_file)
elif self.project.source == DownloadType.SCRIPT:
self._run_download_script(directory, build_log_file)
else:
raise ValueError(
f"Unknown source type '{self.project.source}' is found "
f"for the '{self.project.name}' project")
def _download_from_git(self, directory: str, build_log_file: IO):
cached_source = os.path.join(directory, CACHED_SOURCE_DIR_NAME)
check_call(f"git clone {self.project.origin} {cached_source}",
cwd=directory, stderr=build_log_file,
stdout=build_log_file, shell=True)
check_call(f"git checkout --quiet {self.project.commit}",
cwd=cached_source, stderr=build_log_file,
stdout=build_log_file, shell=True)
def _unpack_zip(self, directory: str, build_log_file: IO):
zip_files = list(glob.glob(os.path.join(directory, "/*.zip")))
if len(zip_files) == 0:
raise ValueError(
f"Couldn't find any zip files to unpack for the "
f"'{self.project.name}' project")
if len(zip_files) > 1:
raise ValueError(
f"Couldn't decide which of the zip files ({zip_files}) "
f"for the '{self.project.name}' project to unpack")
with zipfile.ZipFile(zip_files[0], "r") as zip_file:
zip_file.extractall(os.path.join(directory,
CACHED_SOURCE_DIR_NAME))
@staticmethod
def _run_download_script(directory: str, build_log_file: IO):
script_path = os.path.join(directory, DOWNLOAD_SCRIPT)
SATestUtils.run_script(script_path, build_log_file, directory,
out=LOCAL.stdout, err=LOCAL.stderr,
verbose=VERBOSE)
@staticmethod
def _apply_patch(directory: str, build_log_file: IO):
patchfile_path = os.path.join(directory, PATCHFILE_NAME)
patched_source = os.path.join(directory, PATCHED_SOURCE_DIR_NAME)
if not os.path.exists(patchfile_path):
stdout(" No local patches.\n")
return
stdout(" Applying patch.\n")
try:
check_call(f"patch -p1 < '{patchfile_path}'",
cwd=patched_source,
stderr=build_log_file,
stdout=build_log_file,
shell=True)
except CalledProcessError:
stderr(f"Error: Patch failed. "
f"See {build_log_file.name} for details.\n")
sys.exit(1)
class TestProjectThread(threading.Thread):
def __init__(self, tasks_queue: TestQueue,