# bad_clone_prediction/GitOperator.py
# 154 lines, 5.4 KiB, Python

import logging
import os
import pathlib
import subprocess
from typing import List

from dulwich.objects import Blob, Commit, Tree
from dulwich.repo import Repo

from models.BlobInfo import BlobInfo
from models.RepoInfo import RepoInfo
class GitOperator(object):
    """Clone a repository as a bare repo and read its objects with dulwich.

    Every method works on the repository described by ``self.repoInfo``;
    only its ``git_url`` and ``bare_repo_path`` attributes are read here.
    """

    def __init__(self, repoInfo: RepoInfo) -> None:
        """
        Function: remember the repository this operator works on
        params:
            - repoInfo: description of the target repository
        """
        self.repoInfo = repoInfo

    def download_repo(self):
        """
        Function: bare-clone ``repoInfo.git_url`` into ``repoInfo.bare_repo_path``

        Nothing is done when the target path already exists. The call is
        best effort: a failing clone is reported through ``logging`` and
        never raises. The URL and the path are handed to git verbatim as
        separate arguments, so no shell expansion or word splitting applies.
        """
        if os.path.exists(self.repoInfo.bare_repo_path):
            return

        # An argument list (no shell) keeps spaces and shell metacharacters in
        # the URL/path from being interpreted; "--" stops git from reading a
        # URL that starts with "-" as an option.
        command = [
            "git",
            "clone",
            "--bare",
            "--",
            str(self.repoInfo.git_url),
            str(self.repoInfo.bare_repo_path),
        ]
        logger = logging.getLogger(__name__)
        try:
            # run() drains the pipe while waiting, so a chatty clone cannot
            # block on a full pipe buffer the way Popen(...).wait() can.
            completed = subprocess.run(
                command,
                stdout=subprocess.PIPE,
                stderr=subprocess.STDOUT,
                check=False,
            )
        except OSError as error:
            # e.g. the git executable is not installed
            logger.warning("could not run git clone: %s", error)
            return

        if completed.returncode != 0:
            logger.warning(
                "git clone of %s exited with status %d: %s",
                self.repoInfo.git_url,
                completed.returncode,
                completed.stdout.decode("utf-8", errors="replace").strip(),
            )

    def extract_blobs_4_commit(self, commit: Commit) -> List[BlobInfo]:
        """
        Function: extract blobs for a commit
        params:
            - commit: the target Commit object
        return:
            - a list of BlobInfo objects, one per entry reachable from the
              commit's tree whose object is present in the repository
        """
        # A single Repo is shared by the traversal, the lookups and every
        # BlobInfo. It is deliberately left open because the BlobInfo objects
        # keep a reference to it.
        repo = Repo(self.repoInfo.bare_repo_path)
        object_store = repo.object_store

        blob_infos = []
        for entry in object_store.iter_tree_contents(commit.tree):
            is_present = object_store.contains_loose(
                entry.sha
            ) or object_store.contains_packed(entry.sha)
            if not is_present:
                # the object cannot be found in the repo
                # (presumably e.g. a submodule link) - skip it
                continue

            # Git paths are raw bytes that need not be valid UTF-8;
            # surrogateescape round-trips such bytes instead of raising.
            # pathlib rewrites the path with the separators of this OS.
            decoded_path = entry.path.decode("utf-8", errors="surrogateescape")
            normalized_path = str(pathlib.Path(decoded_path)).encode(
                "utf-8", errors="surrogateescape"
            )
            blob_infos.append(
                BlobInfo(
                    repo=repo,
                    commit=commit,
                    filepath=normalized_path,
                    blob=object_store[entry.sha],
                )
            )
        return blob_infos

    def _extract_objects_of_type(self, type_name: bytes) -> list:
        """
        Function: load every object of one kind from the object store
        params:
            - type_name: dulwich type name to keep, e.g. b"commit" or b"blob"
        return:
            - the matching objects, in object-store iteration order
        """
        repo = Repo(self.repoInfo.bare_repo_path)
        try:
            object_store = repo.object_store
            matching = []
            # snapshot the ids before loading any object
            for object_sha in list(object_store):
                obj = object_store[object_sha]
                if obj.type_name == type_name:
                    matching.append(obj)
            return matching
        finally:
            # release the files the repository opened for this scan
            repo.close()

    def extract_commits(self) -> List[Commit]:
        """
        Function: extract commits of the target repository
        return:
            - every commit object found in the object store
        """
        return self._extract_objects_of_type(b"commit")

    def extract_blobs(self) -> List[Blob]:
        """
        Function: extract blobs of the target repository
        return:
            - every blob object found in the object store
        """
        return self._extract_objects_of_type(b"blob")
if __name__ == "__main__":
    import sys

    # Manual smoke test: open a bare repository and report how many commits
    # it contains. The path may be given as the first command-line argument;
    # without one the original hard-coded location is used.
    default_bare_repo_path = "/home/zxh/programs/bad_clone/bad_clone_prediction/bare_repos/JoanZapata@base-adapter-helper"
    bare_repo_path = sys.argv[1] if len(sys.argv) > 1 else default_bare_repo_path

    gitOp = GitOperator(repoInfo=RepoInfo(bare_repo_path=bare_repo_path))
    # GitOperator defines no get_all_heads(), so the former call raised
    # AttributeError; use a method the class actually provides.
    print(len(gitOp.extract_commits()))