bad_clone_prediction/CloneOperator.py

115 lines
4.0 KiB
Python

# Purpose: this module defines the class that performs clone detection.
import os
import pathlib
import subprocess
from typing import List
from dulwich.repo import Repo
from ConfigOperator import ConfigOperator
from models.RepoInfo import RepoInfo
from MySQLOperator import MySQLOperator
# Directory that contains this module; the NIL jar is looked up relative to it.
current_folder = os.path.dirname(__file__)
# Location of the NIL jar (<module dir>/bin/NIL.jar), stored as a plain string.
NIL_path = str(pathlib.Path(current_folder) / "bin/NIL.jar")
class CloneOperator(object):
    """Prepare source files for NIL clone detection and run the detector.

    Both public methods are guarded by a row of the "steps" table
    (``mysqlOp.tablename_dict["steps"]``): when that row's ``handled`` value
    is truthy the method returns without doing anything, otherwise it does
    its work and then sets ``handled`` to 1.
    """

    # Log line searched for in NIL's output to decide whether the run finished.
    _NIL_END_MARKER = "[main] INFO jp.ac.osaka_u.sdl.nil.NILMain - End"

    def __init__(self) -> None:
        pass

    @staticmethod
    def _is_step_handled(mysqlOp: "MySQLOperator", step_name: str) -> bool:
        """Return whether the steps table flags *step_name* as handled."""
        mysqlOp.cursor.execute(
            "select handled from `{steps_tablename}` where step_name=%s".format(
                steps_tablename=mysqlOp.tablename_dict["steps"]
            ),
            # One-element tuple: ``(step_name)`` without the comma is a str.
            (step_name,),
        )
        # The row is indexed by column name, so the cursor is expected to
        # return mapping-style rows.
        return bool(mysqlOp.cursor.fetchone()["handled"])

    @staticmethod
    def _mark_step_handled(mysqlOp: "MySQLOperator", step_name: str) -> None:
        """Set ``handled`` to 1 for *step_name* and commit the change."""
        mysqlOp.cursor.execute(
            "update `{steps_tablename}` set handled=%s where step_name=%s".format(
                steps_tablename=mysqlOp.tablename_dict["steps"]
            ),
            (1, step_name),
        )
        mysqlOp.connection.commit()

    def prepare_files(
        self,
        repoInfo: "RepoInfo",
        target_folder: str,
        mysqlOp: "MySQLOperator",
        configOp: "ConfigOperator",
    ):
        """
        Function: write blob contents into source code files for clone detection
        params:
            - repoInfo: the RepoInfo object whose bare repository
              (``bare_repo_path``) holds the blobs
            - target_folder: the folder the source code files are written to;
              created when missing
            - mysqlOp: provides the steps table and ``get_supported_blobs``
            - configOp: its ``config["NIL"]["lang"]`` value is passed to
              ``get_supported_blobs`` as ``langs``

        Each blob entry is indexed as (blob sha, suffix), both bytes; the file
        is named "<sha>.<suffix>". Files that already exist are left alone.
        """
        step_name = "blob target folder extraction"
        if self._is_step_handled(mysqlOp, step_name):
            return

        blobs = mysqlOp.get_supported_blobs(
            langs=configOp.config["NIL"]["lang"],
        )
        os.makedirs(target_folder, exist_ok=True)

        # The repository is opened once, and only if at least one file is
        # actually missing, instead of once per blob; it is always closed.
        repo = None
        try:
            for entry in blobs:
                blob_sha, suffix = entry[0], entry[1]
                target_path = os.path.join(
                    target_folder, (blob_sha + b"." + suffix).decode()
                )
                if os.path.exists(target_path):
                    continue
                if repo is None:
                    repo = Repo(repoInfo.bare_repo_path)
                # Read the blob before creating the file: a failed lookup must
                # not leave an empty file behind, because existing files are
                # skipped on later runs.
                data = repo.object_store[blob_sha].data
                with open(target_path, "wb") as f:
                    f.write(data)
        finally:
            if repo is not None:
                repo.close()

        self._mark_step_handled(mysqlOp, step_name)

    def run(self, target_folder: str, nil_config: dict, mysqlOp: "MySQLOperator"):
        """
        Function: run NIL clone detection
        params:
            - target_folder: the folder path for detection; NIL is started with
              this folder as its working directory and writes result.csv there
            - nil_config: mapping with "mit", "mil", "thread_num" and
              "partition_num"; each value is converted with int()
            - mysqlOp: provides the steps table used to skip/record the step

        When the end marker is missing from NIL's output, the output is printed.
        """
        step_name = "clone detection"
        if self._is_step_handled(mysqlOp, step_name):
            return

        # Argument list + cwd instead of a shell string: paths containing
        # spaces or shell metacharacters are passed through unchanged.
        command = [
            "java",
            "-jar",
            NIL_path,
            "-s",
            "./",
            "-mit",
            str(int(nil_config["mit"])),
            "-mil",
            str(int(nil_config["mil"])),
            "-t",
            str(int(nil_config["thread_num"])),
            "-o",
            "result.csv",
            "-p",
            str(int(nil_config["partition_num"])),
        ]
        try:
            completed = subprocess.run(
                command,
                cwd=target_folder,
                stdout=subprocess.PIPE,
                stderr=subprocess.STDOUT,
            )
            output = completed.stdout.decode("utf-8", errors="replace")
        except OSError as exc:
            # The shell-based version reported a missing folder or missing
            # java through its output instead of raising; keep that contract.
            output = "failed to launch NIL: {}".format(exc)

        if self._NIL_END_MARKER not in output:
            print(output)

        # NOTE(review): the step is recorded as handled even when the end
        # marker was not seen (unchanged from the previous behaviour) -
        # confirm whether a failed run should be retried instead.
        self._mark_step_handled(mysqlOp, step_name)
if __name__ == "__main__":
    # Direct execution only prints a marker; no clone detection is started.
    print("finish")