bad_clone_prediction/FunctionIdUpdater.py

124 lines
4.7 KiB
Python

from typing import List
import GlobalConstants
from models.MethodInfo import MethodInfo
from models.RepoInfo import RepoInfo
from MySQLOperator import MySQLOperator
def update_function_id(repoInfo: RepoInfo, mysqlOp: MySQLOperator):
step_name = "updating function id"
mysqlOp.cursor.execute(
"select handled from `{steps_tablename}` where step_name=%s".format(
steps_tablename=mysqlOp.tablename_dict["steps"]
),
(step_name),
)
handled = mysqlOp.cursor.fetchone()["handled"]
if not handled:
mysqlOp.cursor.execute(
"update `{blob_method_tablename}` set function_id=null".format(
blob_method_tablename=mysqlOp.tablename_dict["blob_methods"]
)
)
def find_function_id() -> int:
"""
Function: create a new function id
params:
- method_ids
return
- the related function id
"""
mysqlOp.cursor.execute(
"select max(function_id) as max_function_id from `{blob_method_tablename}`".format(
blob_method_tablename=mysqlOp.tablename_dict["blob_methods"]
)
)
max_function_id = mysqlOp.cursor.fetchone()["max_function_id"]
if max_function_id is None:
max_function_id = 0
return max_function_id + 1
def get_related_methods(method_id: int) -> List[int]:
sql = """
with recursive temp as (
select `method_id_2` from `{method_function_relation_tablename}` where `method_id_1`={method_id_1}
union distinct
select bm.`method_id_2` from `{method_function_relation_tablename}` bm inner join temp on temp.`method_id_2`=bm.`method_id_1`
)
select `method_id_2` from temp;
""".format(
method_function_relation_tablename=mysqlOp.tablename_dict[
"method_function_relations"
],
method_id_1=method_id,
)
mysqlOp.cursor.execute(sql)
method_ids_2 = mysqlOp.cursor.fetchall()
method_ids_2 = [method_id["method_id_2"] for method_id in method_ids_2]
sql = """
with recursive temp as (
select `method_id_1` from `{method_function_relation_tablename}` where `method_id_2`={method_id_2}
union distinct
select bm.`method_id_1` from `{method_function_relation_tablename}` bm inner join temp on temp.`method_id_1`=bm.`method_id_2`
)
select `method_id_1` from temp;
""".format(
method_function_relation_tablename=mysqlOp.tablename_dict[
"method_function_relations"
],
method_id_2=method_id,
)
mysqlOp.cursor.execute(sql)
method_ids_1 = mysqlOp.cursor.fetchall()
method_ids_1 = [method_id["method_id_1"] for method_id in method_ids_1]
method_ids = [method_id]
method_ids = list(
(set(method_ids).union(set(method_ids_1))).union(set(method_ids_2))
)
return method_ids
mysqlOp.cursor.execute(
"select id from `{blob_method_tablename}` where function_id is null".format(
blob_method_tablename=mysqlOp.tablename_dict["blob_methods"]
)
)
methods = mysqlOp.cursor.fetchall()
methods = [method["id"] for method in methods]
while len(methods) > 0:
method_id = methods[0]
method_ids = get_related_methods(method_id=method_id)
function_id = find_function_id()
method_ids_str = [str(method_id) for method_id in method_ids]
method_id_str = "({method_id_str})".format(
method_id_str=",".join(method_ids_str)
)
mysqlOp.cursor.execute(
"update `{blob_method_tablename}` set function_id=%s where id in {method_id_str}".format(
blob_method_tablename=mysqlOp.tablename_dict["blob_methods"],
method_id_str=method_id_str,
),
((function_id,)),
)
mysqlOp.connection.commit()
for method_id in method_ids:
if method_id in methods:
methods.remove(method_id)
# update steps table
mysqlOp.cursor.execute(
"update `{steps_tablename}` set handled=%s where step_name=%s".format(
steps_tablename=mysqlOp.tablename_dict["steps"]
),
(1, step_name),
)
mysqlOp.connection.commit()