Try to change the bot from merge-decision prediction to latency prediction

This commit is contained in:
zhangxunhui 2021-08-21 15:41:34 +08:00
parent eddb7cc916
commit 347e056102
41 changed files with 1551 additions and 0 deletions

0
app/__init__.py Normal file
View File

0
app/db/__init__.py Normal file
View File

60
app/db/alembic/env.py Normal file
View File

@ -0,0 +1,60 @@
# Alembic migration environment.
# Builds the database URL from the project's .env.yaml (via ConfigLoader)
# instead of hard-coding credentials in alembic.ini.
from logging.config import fileConfig
from sqlalchemy import engine_from_config
from sqlalchemy import pool
from alembic import context
import sys, pathlib
sys.path.append(str(pathlib.Path(__file__).resolve().parents[3]))  # make the `app` package importable
from app.utils.config_loader import ConfigLoader

# this is the Alembic Config object, which provides
# access to the values within the .ini file in use.
config = context.config

# Interpret the config file for Python logging.
# This line sets up loggers basically.
fileConfig(config.config_file_name)

# add your model's MetaData object here
# for 'autogenerate' support
# from myapp import mymodel
# target_metadata = mymodel.Base.metadata
from app.db.tables import sqlalchemy_orm
target_metadata = sqlalchemy_orm.Base.metadata

env = ConfigLoader().load_env()
connection_url = "mysql+pymysql://{user}:{password}@{host}:{port}/{db}".format(user=env["MYSQL"]["USER"], password=env["MYSQL"]["PASSWORD"], host=env["MYSQL"]["HOST"], port=env["MYSQL"]["PORT"], db=env["MYSQL"]["DB"])
config.set_main_option("sqlalchemy.url", connection_url) # we need to add this in main option, because the configuration of sqlalchemy.url is in .env.yaml, not in alembic.ini

# other values from the config, defined by the needs of env.py,
# can be acquired:
# my_important_option = config.get_main_option("my_important_option")
# ... etc.


def run_migrations_online():
    """Run migrations in 'online' mode.

    In this scenario we need to create an Engine
    and associate a connection with the context.
    """
    connectable = engine_from_config(
        config.get_section(config.config_ini_section),
        prefix="sqlalchemy.",
        poolclass=pool.NullPool,
    )
    with connectable.connect() as connection:
        context.configure(
            connection=connection, target_metadata=target_metadata
        )
        with context.begin_transaction():
            context.run_migrations()


# Offline mode is not supported here; migrations always run online.
run_migrations_online()

View File

@ -0,0 +1,24 @@
"""${message}
Revision ID: ${up_revision}
Revises: ${down_revision | comma,n}
Create Date: ${create_date}
"""
from alembic import op
import sqlalchemy as sa
${imports if imports else ""}
# revision identifiers, used by Alembic.
revision = ${repr(up_revision)}
down_revision = ${repr(down_revision)}
branch_labels = ${repr(branch_labels)}
depends_on = ${repr(depends_on)}
def upgrade():
${upgrades if upgrades else "pass"}
def downgrade():
${downgrades if downgrades else "pass"}

View File

@ -0,0 +1,57 @@
"""add table pull_requests
Revision ID: f96a0f70e7a1
Revises:
Create Date: 2021-08-21 14:26:21.109899
"""
from alembic import op
import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision = 'f96a0f70e7a1'
down_revision = None
branch_labels = None
depends_on = None
def upgrade():
# ### commands auto generated by Alembic - please adjust! ###
op.create_table('comments',
sa.Column('id', sa.Integer(), nullable=False),
sa.Column('pull_request_id', sa.Integer(), nullable=False),
sa.Column('comment_id', sa.Integer(), nullable=False),
sa.Column('created_at', sa.TIMESTAMP(), nullable=False),
sa.Column('body', sa.Text(), nullable=True),
sa.PrimaryKeyConstraint('id')
)
op.create_index(op.f('ix_comments_pull_request_id'), 'comments', ['pull_request_id'], unique=False)
op.create_table('pull_requests',
sa.Column('id', sa.Integer(), nullable=False),
sa.Column('owner_login', sa.VARCHAR(length=255), nullable=False),
sa.Column('repo_name', sa.VARCHAR(length=255), nullable=False),
sa.Column('installation_id', sa.Integer(), nullable=False),
sa.Column('number', sa.Integer(), nullable=False),
sa.Column('state', sa.VARCHAR(length=255), nullable=False),
sa.Column('locked', sa.Integer(), nullable=True),
sa.Column('created_at', sa.TIMESTAMP(), nullable=False),
sa.Column('last_comment_at', sa.TIMESTAMP(), nullable=True),
sa.Column('comment_or_not', sa.Integer(), nullable=True),
sa.PrimaryKeyConstraint('id')
)
op.create_index(op.f('ix_pull_requests_locked'), 'pull_requests', ['locked'], unique=False)
op.create_index(op.f('ix_pull_requests_state'), 'pull_requests', ['state'], unique=False)
op.create_index('owner_repo_num', 'pull_requests', ['owner_login', 'repo_name', 'number'], unique=True)
# ### end Alembic commands ###
def downgrade():
# ### commands auto generated by Alembic - please adjust! ###
op.drop_index('owner_repo_num', table_name='pull_requests')
op.drop_index(op.f('ix_pull_requests_state'), table_name='pull_requests')
op.drop_index(op.f('ix_pull_requests_locked'), table_name='pull_requests')
op.drop_table('pull_requests')
op.drop_index(op.f('ix_comments_pull_request_id'), table_name='comments')
op.drop_table('comments')
# ### end Alembic commands ###

View File

View File

@ -0,0 +1,18 @@
import sys, pathlib, traceback
sys.path.append(str(pathlib.Path(__file__).resolve().parents[3]))
from sqlalchemy.ext.asyncio import AsyncEngine, create_async_engine
from typing import Optional
from app.utils.config_loader import ConfigLoader
class BaseOperator:
    """Base class for DB operators; owns the shared SQLAlchemy async engine."""

    # Created in __init__; stays None if engine creation fails.
    engine: Optional[AsyncEngine] = None

    def __init__(self) -> None:
        try:
            env = ConfigLoader().load_env()
            # Build the aiomysql connection URL from the MYSQL section of .env.yaml.
            url = "mysql+aiomysql://{user}:{password}@{host}:{port}/{db}".format(user=env["MYSQL"]["USER"], password=env["MYSQL"]["PASSWORD"], host=env["MYSQL"]["HOST"], port=env["MYSQL"]["PORT"], db=env["MYSQL"]["DB"])
            # echo=True logs every SQL statement — presumably a debugging aid; confirm for production.
            self.engine = create_async_engine(url, echo=True)
        except Exception as e:
            # NOTE(review): failures are printed and swallowed, leaving engine=None;
            # subclasses will then fail later when using self.engine — confirm intent.
            print("error with initialization of BaseOperator: %s" % (repr(e)))
            print(traceback.format_exc())

View File

@ -0,0 +1,155 @@
import sys, pathlib, asyncio, traceback
sys.path.append(str(pathlib.Path(__file__).resolve().parents[3]))
from app.db.operators.base_operator import BaseOperator
from app.models.pull_request import PullRequest
from app.models.scheduler import PRScheduler
from app.models.user import User
from app.models.repository import Repository
from app.models.installation import Installation
from sqlalchemy import text
from app.utils.config_loader import ConfigLoader
from typing import List
class PullRequestOperator(BaseOperator):
    """Async database operator for the `pull_requests` and `comments` tables."""

    def __init__(self) -> None:
        super().__init__()

    async def query_pull_request_id(self, pr: PullRequest) -> int:
        """Return the surrogate `id` of the row matching (owner, repo, number).

        Returns None when no row matches or when a DB error occurs
        (errors are printed and swallowed).
        """
        try:
            async with self.engine.connect() as connection:
                result: int = None
                q = """
                select id
                from pull_requests
                where owner_login=:owner_login and repo_name=:repo_name and number=:number;
                """
                query_result = await connection.execute(text(q), {'owner_login': pr.owner.login, 'repo_name': pr.repo.name, 'number': pr.number})
                # the unique index owner_repo_num means at most one row matches
                for row in query_result:
                    result = row["id"]
                    break
                return result
        except Exception as e:
            print("error with func query_pull_request_id: %s" % (repr(e)))
            print(traceback.format_exc())

    async def insert_pull_request(self, pr: PullRequest, installation: Installation) -> None:
        """Upsert a pull request row.

        Inserts a new row when (owner, repo, number) is unseen; otherwise
        updates state/locked/installation_id on the existing row.
        """
        try:
            async with self.engine.connect() as connection:
                # existence probe first (select-then-insert/update, not an atomic upsert)
                q = """
                select id
                from pull_requests
                where owner_login=:owner_login and repo_name=:repo_name and number=:number;
                """
                query_result = await connection.execute(text(q), {'owner_login': pr.owner.login, 'repo_name': pr.repo.name, 'number': pr.number})
                if query_result.rowcount == 0:
                    q = """
                    insert into pull_requests (
                    owner_login, repo_name, number, state, created_at, locked, last_comment_at, installation_id
                    ) values (:owner_login, :repo_name, :number, :state, :created_at, :locked, :last_comment_at, :installation_id);
                    """
                    await connection.execute(text(q), {
                        "owner_login": pr.owner.login,
                        "repo_name": pr.repo.name,
                        "number": pr.number,
                        "state": pr.state,
                        "created_at": pr.created_at,
                        "locked": pr.locked,
                        "last_comment_at": pr.last_comment_at,
                        "installation_id": installation.id
                    })
                else:
                    q = """
                    update pull_requests set
                    state=:state,
                    locked=:locked,
                    installation_id=:installation_id
                    where owner_login=:owner_login and repo_name=:repo_name and number=:number;
                    """
                    await connection.execute(text(q), {
                        "state": pr.state,
                        "locked": pr.locked,
                        "installation_id": installation.id,
                        "owner_login": pr.owner.login,
                        "repo_name": pr.repo.name,
                        "number": pr.number
                    })
                await connection.commit()
        except Exception as e:
            print("error with func insert_pull_request: %s" % (repr(e)))
            print(traceback.format_exc())

    async def update_pull_request_comment(self, pr: PullRequest, last_comment_at, comment_or_not: int, comment_id: int, comment_body: str) -> None:
        """Record a newly posted comment.

        Updates last_comment_at/comment_or_not on the PR row, then inserts
        a row into `comments` referencing the PR's surrogate id.
        """
        try:
            async with self.engine.connect() as connection:
                q = """
                update pull_requests set
                last_comment_at=:last_comment_at,
                comment_or_not=:comment_or_not
                where owner_login=:owner_login and repo_name=:repo_name and number=:number;
                """
                await connection.execute(text(q), {
                    "last_comment_at": last_comment_at,
                    "comment_or_not": comment_or_not,
                    "owner_login": pr.owner.login,
                    "repo_name": pr.repo.name,
                    "number": pr.number
                })
                # NOTE(review): this helper opens a second connection while the
                # update above is still uncommitted on this one; the id lookup
                # works only because it reads a pre-existing row — confirm.
                pr_id = await self.query_pull_request_id(pr)
                q = """
                insert into comments (
                pull_request_id, comment_id, created_at, body
                ) values (:pull_request_id, :comment_id, :created_at, :body);
                """
                await connection.execute(text(q), {
                    "pull_request_id": pr_id,
                    "comment_id": comment_id,
                    "created_at": last_comment_at,
                    "body": comment_body
                })
                await connection.commit()
        except Exception as e:
            print("error with func update_pull_request_comment: %s" % (repr(e)))
            print(traceback.format_exc())

    async def query_prScheduler_4_scheduler(self) -> List[PRScheduler]:
        """Return open, unlocked PRs that never got a comment or whose last
        comment is older than SERVICE.REMIND_EVERY_HOURS, oldest first.
        """
        try:
            result: List[PRScheduler] = []
            env = ConfigLoader().load_env()
            async with self.engine.connect() as connection:
                # REMIND_EVERY_HOURS is interpolated via %s because it comes
                # from our own config file, not user input; everything else
                # would go through bound parameters.
                q = """
                select pr.id, pr.owner_login, pr.repo_name, pr.number, pr.installation_id
                from pull_requests pr
                where state='open' and locked=0 and
                (last_comment_at is null or
                last_comment_at < TIMESTAMPADD(HOUR, -%s, UTC_TIMESTAMP()))
                order by last_comment_at asc
                """
                query_result = await connection.execute(text(q % env["SERVICE"]["REMIND_EVERY_HOURS"]))
                for row in query_result:
                    pr = PRScheduler(installation=Installation(id=row['installation_id']), pr=PullRequest(owner=User(login=row['owner_login']), repo=Repository(name=row['repo_name']), number=row['number'], id=row['id']))
                    result.append(pr)
                return result
        except Exception as e:
            print("error with func query_prScheduler_4_scheduler: %s" % (repr(e)))
            print(traceback.format_exc())
if __name__ == "__main__":
async def test_insert_pull_requests() -> None:
prOp = PullRequestOperator()
await prOp.query_prScheduler_4_scheduler()
# await prOp.insert_pull_request(PullRequest(owner=User("test2"), repo=Repository("test"), number=1, state="open", locked=1, created_at='2020-01-01 00:00:00'), installation=Installation(id=1))
loop = asyncio.get_event_loop()
loop.run_until_complete(test_insert_pull_requests())

View File

@ -0,0 +1,26 @@
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Column, Integer, VARCHAR, TIMESTAMP, Text, Index
Base = declarative_base()
class PullRequest(Base):
    """ORM mapping for the `pull_requests` table (one row per tracked PR)."""
    __tablename__ = "pull_requests"
    id = Column(Integer, primary_key=True)
    owner_login = Column(VARCHAR(255), nullable=False)
    repo_name = Column(VARCHAR(255), nullable=False)
    # GitHub App installation through which this repository is accessed
    installation_id = Column(Integer, nullable=False)
    number = Column(Integer, nullable=False)
    state = Column(VARCHAR(255), nullable=False, index=True)
    # int used as a boolean flag (0/1), matching the migration's schema
    locked = Column(Integer, nullable=True, index=True)
    created_at = Column(TIMESTAMP, nullable=False)
    # timestamp of the most recent recorded comment, if any
    last_comment_at = Column(TIMESTAMP, nullable=True)
    comment_or_not = Column(Integer, nullable=True)

# A pull request is uniquely identified by (owner, repo, number).
Index("owner_repo_num", PullRequest.owner_login, PullRequest.repo_name, PullRequest.number, unique=True)
class Comment(Base):
    """ORM mapping for the `comments` table: one row per comment on a PR."""
    __tablename__ = "comments"
    id = Column(Integer, primary_key=True)
    # surrogate id of the owning pull_requests row (no FK constraint declared)
    pull_request_id = Column(Integer, nullable=False, index=True)
    comment_id = Column(Integer, nullable=False)
    created_at = Column(TIMESTAMP, nullable=False)
    body = Column(Text, nullable=True)

View File

@ -0,0 +1,9 @@
from typing import Optional
class Installation():
    """A GitHub App installation, identified by its installation id."""

    id: int
    app_id: Optional[int] = None

    def __init__(self, id, app_id: Optional[int] = None) -> None:
        # app_id is optional: it is only known once the app identity is resolved
        self.id, self.app_id = id, app_id

11
app/models/jwt_query.py Normal file
View File

@ -0,0 +1,11 @@
from typing import List
from typing import Dict
class JWTQuery():
    """Bundle of request headers and target URL for a JWT-authenticated API call."""

    headers: Dict
    url: str

    def __init__(self, headers, url) -> None:
        # keep the prepared headers and the endpoint together for later use
        self.url = url
        self.headers = headers

15
app/models/pr_comment.py Normal file
View File

@ -0,0 +1,15 @@
import sys, pathlib
sys.path.append(str(pathlib.Path(__file__).resolve().parents[2]))
from app.models.pull_request import PullRequest
from app.models.user import User
from typing import Optional
class PRComment():
    """A comment on a pull request, optionally attributed to a sender."""

    pr: PullRequest
    body: str
    sender: User

    def __init__(self, pr: PullRequest, body: str, sender: Optional[User] = None) -> None:
        # sender stays None when the commenting user is unknown or irrelevant
        self.sender = sender
        self.body = body
        self.pr = pr

9
app/models/pr_label.py Normal file
View File

@ -0,0 +1,9 @@
from app.models.user import User
import sys, pathlib
sys.path.append(str(pathlib.Path(__file__).resolve().parents[2]))
from app.models.pull_request import PullRequest
class PRLabel():
    """Label data attached to a pull request.

    NOTE(review): only attribute annotations are declared — there is no
    __init__, so instances never actually receive `pr` or `body`.
    Presumably an unfinished stub; confirm before relying on it.
    """

    pr: PullRequest
    body: str

View File

@ -0,0 +1,27 @@
from optparse import Option
import sys, pathlib
sys.path.append(str(pathlib.Path(__file__).resolve().parents[2]))
from app.models.user import User
from app.models.repository import Repository
from typing import Optional
from datetime import datetime
class PullRequest():
    """In-memory representation of a GitHub pull request (not the ORM row)."""

    id: int
    owner: User
    repo: Repository
    number: int
    state: str
    locked: bool
    created_at: datetime
    last_comment_at: datetime

    def __init__(self, owner, repo, number, state: Optional[str] = None, locked: Optional[bool] = None, created_at: Optional[datetime] = None, last_comment_at: Optional[datetime] = None, id: Optional[int] = None) -> None:
        # identity fields (always supplied)
        self.owner = owner
        self.repo = repo
        self.number = number
        # optional metadata, filled in when known
        self.id = id
        self.state = state
        self.locked = locked
        self.created_at = created_at
        self.last_comment_at = last_comment_at

5
app/models/repository.py Normal file
View File

@ -0,0 +1,5 @@
class Repository():
    """A GitHub repository, identified by its bare name (no owner prefix)."""

    name: str

    def __init__(self, name) -> None:
        self.name = name

17
app/models/scheduler.py Normal file
View File

@ -0,0 +1,17 @@
import sys, pathlib
sys.path.append(str(pathlib.Path(__file__).resolve().parents[2]))
from app.models.installation import Installation
from app.models.pull_request import PullRequest
class SchedulerModel():
    """Base payload for scheduler jobs; keeps the installation to act under."""

    installation: Installation

    def __init__(self, installation: Installation) -> None:
        # remembered for the job to use later
        self.installation = installation
class PRScheduler(SchedulerModel):
    """Scheduler payload pairing one pull request with its installation."""

    pr: PullRequest

    def __init__(self, installation: Installation, pr: PullRequest) -> None:
        # installation handling is delegated to the base class
        super().__init__(installation)
        self.pr = pr

35
app/models/trigger.py Normal file
View File

@ -0,0 +1,35 @@
import sys, pathlib
from typing import Optional
sys.path.append(str(pathlib.Path(__file__).resolve().parents[2]))
from app.models.pull_request import PullRequest
from app.models.installation import Installation
from app.models.repository import Repository
from app.models.user import User
class Trigger():
    """Base class for events the bot reacts to; carries the installation context."""

    installation: Installation

    def __init__(self, installation) -> None:
        # every trigger knows which installation it belongs to
        self.installation = installation
class PRTrigger(Trigger):
    """A pull-request webhook event with its action (e.g. "opened")."""

    repo: Optional[Repository] = None
    sender: Optional[User] = None
    pr: PullRequest
    action: str

    def __init__(self, installation, repo, sender, pr, action) -> None:
        # zero-argument super() is equivalent to super(PRTrigger, self) here
        super().__init__(installation)
        self.action = action
        self.pr = pr
        self.repo = repo
        self.sender = sender
class PRSchedulerTrigger(Trigger):
    """A pull-request trigger raised by the scheduler rather than a webhook."""

    pr: PullRequest

    def __init__(self, installation, pr) -> None:
        # zero-argument super() is equivalent to super(PRSchedulerTrigger, self) here
        super().__init__(installation)
        self.pr = pr

5
app/models/user.py Normal file
View File

@ -0,0 +1,5 @@
class User():
    """A GitHub user, identified by login name."""

    login: str

    def __init__(self, login) -> None:
        self.login = login

49
app/pr-latency-bot.py Normal file
View File

@ -0,0 +1,49 @@
from configparser import ConfigParser
import re, sys, pathlib, traceback
sys.path.append(str(pathlib.Path(__file__).resolve().parents[1]))
import json5 as json
from app.services.comments import return_pr_latency
from app.services.triggers import parseTriggers
from app.models.trigger import Trigger, PRTrigger
from wsgiref.simple_server import make_server
from app.prediction_service.trainer import Trainer
from app.utils.global_variables import GlobalVariable
from app.services.queries import query_app_id
from app.services.scheduler import Scheduler
from app.utils.config_loader import ConfigLoader
def application(environ, start_response) -> list:
    """WSGI entry point for webhook deliveries.

    Reads and parses the request body as JSON, and when the event is a
    newly opened pull request, posts the latency prediction back.
    Always responds 200 with the body b"success".
    """
    start_response('200 OK', [('Content-Type', 'application/json')])
    request_body = environ["wsgi.input"].read(int(environ.get("CONTENT_LENGTH", 0)))
    json_str = request_body.decode('utf-8')
    # NOTE(review): this blanket ' -> " substitution corrupts payloads whose
    # text fields contain apostrophes; presumably a workaround for some
    # non-standard input — confirm it is still needed (json5 is already
    # more tolerant than strict JSON).
    json_str = re.sub('\'','\"', json_str)
    json_dict = json.loads(json_str)
    trigger: Trigger = parseTriggers(json_dict)
    # only when the trigger is open pull request can return the result
    if trigger is not None and type(trigger) == PRTrigger and trigger.action == "opened":
        return_pr_latency(trigger)
    return ["success".encode('utf-8')]
if __name__ == "__main__":
try:
# train the prediction model
GlobalVariable.trainer = Trainer()
# request appId from GitHub
GlobalVariable.appId = query_app_id()
s = Scheduler()
service_port = ConfigLoader().load_env()["SERVICE"]["PORT"]
httpd = make_server("0.0.0.0", service_port , application)
print("serving http on port {0}...".format(str(service_port)))
httpd.serve_forever()
except Exception as e:
print("error with the start of the application in main.py: %s" % (repr(e)))
print(traceback.format_exc())

View File

View File

@ -0,0 +1,10 @@
trainer:
mode: "old" # the mode of trainer, old or new/for now, only old mode is supported
dataset_url: "https://zenodo.org/record/4837135/files/new_pullreq.csv?download=1" # only need to set when the mode is old
dataset_name: "trainer_dataset.csv" # the dataset name you set after downloading
model_names:
submission: "app/prediction_service/trained_model_submission.sav"
process: "app/prediction_service/trained_model_process.sav"
factor_list:
submission: ["core_member", "num_commits_open", "files_added_open", "prev_pullreqs", "open_pr_num", "account_creation_days", "first_pr", "files_changed_open", "project_age", "stars", "description_length", "followers"]
process: ["lifetime_minutes", "has_comments", "core_member", "num_commits_close", "files_added_close", "prev_pullreqs", "open_pr_num", "account_creation_days", "first_pr", "files_changed_close", "project_age", "reopen_or_not", "stars", "description_length", "followers"]

View File

@ -0,0 +1,369 @@
# used to get all factors for prediction
import sys, pathlib, requests, json, datetime, traceback
sys.path.append(str(pathlib.Path(__file__).resolve().parents[2]))
from app.models.pull_request import PullRequest
from app.models.installation import Installation
from app.services.authentication import getToken
from app.utils.time_operator import TimeOperator
from app.utils.word_counter import WordCounter
from app.models.installation import Installation
from app.models.pull_request import PullRequest
from app.models.user import User
from app.models.repository import Repository
class FactorGetter():
    """Collects the prediction factors for one pull request from the GitHub
    REST and GraphQL APIs, using an installation access token."""

    pr: PullRequest                 # target PR: owner login, repo name, number (at minimum)
    installation: Installation      # installation used to mint the access token
    token: str                      # installation access token for API calls

    def __init__(self, pr: PullRequest, installation: Installation) -> None:
        self.pr = pr
        self.installation = installation
        self.token = getToken(self.installation)

    def lifetime_minutes(self):
        """Minutes elapsed since the PR was created.

        Fetches created_at via the REST API when it is not already known.
        Returns None on error (printed and swallowed).
        """
        try:
            if self.pr.created_at is None:
                # /repos/{owner}/{repo}/pulls/{pull_number}
                headers = {'Authorization': 'token ' + self.token, 'Accept': 'application/vnd.github.v3+json'}
                url = "https://api.github.com/repos/{owner}/{repo}/pulls/{pull_request_number}".format(owner=self.pr.owner.login, repo=self.pr.repo.name, pull_request_number=self.pr.number)
                response = requests.get(url, headers=headers)
                if response.status_code != 200:
                    raise Exception("error with func lifetime_minutes: code: %s, message: %s" % (response.status_code, json.loads(response.text)["message"]))
                self.pr.created_at = TimeOperator().convertTZTime2TimeStamp(response.json()["created_at"])
            # computed from the (possibly just-fetched) creation time
            return int((datetime.datetime.utcnow() - self.pr.created_at).total_seconds() / 60.0)
        except Exception as e:
            print("error with func lifetime_minutes: %s" % (repr(e)))
            print(traceback.format_exc())

    def has_comments(self):
        """Probe the issue timeline for comments — apparently unfinished.

        NOTE(review): the timeline endpoint returns a JSON list, so indexing
        it with ["created_at"] raises TypeError; together with the leftover
        debug print this method looks dead — query_pr_infos computes
        has_comments instead. Confirm and remove.
        """
        try:
            # /repos/{owner}/{repo}/issues/{issue_number}/timeline
            headers = {'Authorization': 'token ' + self.token, 'Accept': 'application/vnd.github.mockingbird-preview+json'}
            url = "https://api.github.com/repos/{owner}/{repo}/issues/{issue_number}/timeline?per_page=100".format(owner=self.pr.owner.login, repo=self.pr.repo.name, issue_number=self.pr.number)
            response = requests.get(url, headers=headers)
            if response.status_code != 200:
                raise Exception("error with func has_comments: code: %s, message: %s" % (response.status_code, json.loads(response.text)["message"]))
            events = response.json()["created_at"]
            print("pause")
        except Exception as e:
            print("error with func has_comments: %s" % (repr(e)))
            print(traceback.format_exc())

    def query_pr_infos(self):
        """Collect all prediction factors via up to five GraphQL queries.

        Returns a dict of factor name -> value, or None on error
        (printed and swallowed).
        """
        try:
            # use graphql and query all the information
            '''
            1. lifetime_minutes: createdAt
            2. has_comments: issue comments: comments; pull request comment: reviews; commit comments: commits
            3. core_member: collaborators
            4. num_commits: commits-totalCount (need recrawl according to the totalCount)
            5. files_added: files-nodes (need recrawl according to the totalCount)
            6. prev_pullreqs: (depend on defaultBranchRef)
            7. open_pr_num: pullRequests (states: [OPEN]) { totalCount }
            8. account_creation_days: user-createdAt
            9. first_pr: pullRequests(baseRefName: "main") { totalCount }
            10. files_changed: changedFiles
            11. project_age: Repository-createdAt
            12. reopen_or_not: timeline-REOPENED_EVENT
            13. stars: stargazerCount
            14. description_length: bodyText
            15. followers: followers
            '''
            # first query the author:login, repo:default branch, number of changed files, number of commits; and all the attributed that do not depend on these variables
            query = """
            query {
                repository(owner: "%s", name: "%s") {
                    pullRequest(number: %s) {
                        createdAt
                        comments { totalCount }
                        reviews (first:1, states: COMMENTED) { totalCount }
                        commits { totalCount }
                        changedFiles
                        timelineItems (itemTypes: [REOPENED_EVENT]) { totalCount }
                        bodyText
                        title
                        author {
                            login
                        }
                    }
                    pullRequests { totalCount }
                    createdAt
                    stargazerCount
                }
            }
            """
            headers = {'Authorization': 'Bearer ' + self.token}
            url = "https://api.github.com/graphql"
            values = {"query": query % (self.pr.owner.login, self.pr.repo.name, self.pr.number)}
            response = requests.post(url=url, headers = headers, json=values)
            response.encoding = 'utf-8'
            if response.status_code != 200:
                raise Exception("error with func query_pr_infos: code: %s, message: %s" % (response.status_code, json.loads(response.text)["message"]))
            content = response.json()
            author_login = content["data"]["repository"]["pullRequest"]["author"]["login"]
            # factors
            lifetime_minutes = int((datetime.datetime.utcnow() - TimeOperator().convertTZTime2TimeStamp(content["data"]["repository"]["pullRequest"]["createdAt"])).total_seconds() / 60)
            has_issue_comments = 1 if content["data"]["repository"]["pullRequest"]["comments"]["totalCount"] > 0 else 0
            has_pr_comments = 1 if content["data"]["repository"]["pullRequest"]["reviews"]["totalCount"] > 0 else 0
            num_commits = content["data"]["repository"]["pullRequest"]["commits"]["totalCount"]
            all_pr_num = content["data"]["repository"]["pullRequests"]["totalCount"]
            files_changed = content["data"]["repository"]["pullRequest"]["changedFiles"]
            # repository age in months (30-day months)
            project_age = int((datetime.datetime.utcnow() - TimeOperator().convertTZTime2TimeStamp(content["data"]["repository"]["createdAt"])).total_seconds() / 60 / 60 / 24 / 30)
            reopen_or_not = 1 if content["data"]["repository"]["pullRequest"]["timelineItems"]["totalCount"] > 0 else 0
            stars = content["data"]["repository"]["stargazerCount"]
            description_length = WordCounter().count([content["data"]["repository"]["pullRequest"]["bodyText"], content["data"]["repository"]["pullRequest"]["title"]])
            # second query factors including: account_creation_days, core_member, prev_pullreqs, first_pr, followers
            query = """
            query {
                repository(owner: "%s", name: "%s") {
                    collaborators (query: "%s") { edges{permission} }
                }
                user(login: "%s") {
                    createdAt
                    followers { totalCount }
                }
            }
            """
            values = {"query": query % (self.pr.owner.login, self.pr.repo.name, author_login, author_login)}
            response = requests.post(url=url, headers = headers, json=values)
            response.encoding = 'utf-8'
            if response.status_code != 200:
                raise Exception("error with func query_pr_infos: code: %s, message: %s" % (response.status_code, json.loads(response.text)["message"]))
            content = response.json()
            account_creation_days = int((datetime.datetime.utcnow() - TimeOperator().convertTZTime2TimeStamp(content["data"]["user"]["createdAt"])).total_seconds() / 60 / 60 / 24)
            # author counts as core when listed as a collaborator with more than READ permission
            core_member = 1 if content["data"]["repository"]["collaborators"] is not None and content["data"]["repository"]["collaborators"]["edges"][0]["permission"] != "READ" else 0
            followers = content["data"]["user"]["followers"]["totalCount"]
            # third query has_commit_comments -> has comments
            has_comments = None
            if has_issue_comments == 0 and has_pr_comments == 0:
                # no issue/review comments: fall back to scanning commit comments
                if num_commits <= 100:
                    # single page is enough
                    query = """
                    query {
                        repository(owner: "%s", name: "%s") {
                            pullRequest(number: %s) {
                                commits (first:100) { nodes { commit { comments { totalCount } } } }
                            }
                        }
                    }
                    """
                    values = {"query": query % (self.pr.owner.login, self.pr.repo.name, self.pr.number)}
                    response = requests.post(url=url, headers = headers, json=values)
                    response.encoding = 'utf-8'
                    if response.status_code != 200:
                        raise Exception("error with func query_pr_infos: code: %s, message: %s" % (response.status_code, json.loads(response.text)["message"]))
                    content = response.json()
                    for commit in content["data"]["repository"]["pullRequest"]["commits"]["nodes"]:
                        if commit["commit"]["comments"]["totalCount"] > 0:
                            has_comments = 1
                            break
                else:
                    # paginate through commits via pageInfo cursors
                    page = 0
                    endCursor = None
                    while(True):
                        page += 1
                        if page == 1:
                            query = """
                            query {
                                repository(owner: "%s", name: "%s") {
                                    pullRequest(number: %s) {
                                        commits (first:100) { nodes { commit { comments { totalCount } } } pageInfo { endCursor hasNextPage } }
                                    }
                                }
                            }
                            """
                            values = {"query": query % (self.pr.owner.login, self.pr.repo.name, self.pr.number)}
                        else:
                            query = """
                            query {
                                repository(owner: "%s", name: "%s") {
                                    pullRequest(number: %s) {
                                        commits (first:100, after:"%s") { nodes { commit { comments { totalCount } } } pageInfo { endCursor hasNextPage } }
                                    }
                                }
                            }
                            """
                            values = {"query": query % (self.pr.owner.login, self.pr.repo.name, self.pr.number, endCursor)}
                        response = requests.post(url=url, headers = headers, json=values)
                        response.encoding = 'utf-8'
                        if response.status_code != 200:
                            raise Exception("error with func query_pr_infos: code: %s, message: %s" % (response.status_code, json.loads(response.text)["message"]))
                        content = response.json()
                        for commit in content["data"]["repository"]["pullRequest"]["commits"]["nodes"]:
                            if commit["commit"]["comments"]["totalCount"] > 0:
                                has_comments = 1
                                break
                        hasNextPage = content["data"]["repository"]["pullRequest"]["commits"]["pageInfo"]["hasNextPage"]
                        endCursor = content["data"]["repository"]["pullRequest"]["commits"]["pageInfo"]["endCursor"]
                        # stop as soon as a comment is found or pages run out
                        if has_comments is not None or hasNextPage == False:
                            break
                if has_comments is None:
                    has_comments = 0
            else:
                # issue or review comments already prove the PR has comments
                has_comments = 1
            # fourth query files_added
            files_added = 0
            if files_changed <= 100:
                query = """
                query {
                    repository(owner: "%s", name: "%s") {
                        pullRequest(number: %s) {
                            files (first: 100) { nodes { additions deletions } }
                        }
                    }
                }
                """
                values = {"query": query % (self.pr.owner.login, self.pr.repo.name, self.pr.number)}
                response = requests.post(url=url, headers = headers, json=values)
                response.encoding = 'utf-8'
                if response.status_code != 200:
                    raise Exception("error with func query_pr_infos: code: %s, message: %s" % (response.status_code, json.loads(response.text)["message"]))
                content = response.json()
                # a file with only additions is treated as newly added
                for file in content["data"]["repository"]["pullRequest"]["files"]["nodes"]:
                    if file["additions"] > 0 and file["deletions"] == 0:
                        files_added += 1
            else:
                # paginate through changed files
                page = 0
                endCursor = None
                while(True):
                    page += 1
                    if page == 1:
                        query = """
                        query {
                            repository(owner: "%s", name: "%s") {
                                pullRequest(number: %s) {
                                    files (first:100) { nodes { additions deletions } pageInfo { endCursor hasNextPage } }
                                }
                            }
                        }
                        """
                        values = {"query": query % (self.pr.owner.login, self.pr.repo.name, self.pr.number)}
                    else:
                        query = """
                        query {
                            repository(owner: "%s", name: "%s") {
                                pullRequest(number: %s) {
                                    files (first:100, after:"%s") { nodes { additions deletions } pageInfo { endCursor hasNextPage } }
                                }
                            }
                        }
                        """
                        values = {"query": query % (self.pr.owner.login, self.pr.repo.name, self.pr.number, endCursor)}
                    response = requests.post(url=url, headers = headers, json=values)
                    response.encoding = 'utf-8'
                    if response.status_code != 200:
                        raise Exception("error with func query_pr_infos: code: %s, message: %s" % (response.status_code, json.loads(response.text)["message"]))
                    content = response.json()
                    for file in content["data"]["repository"]["pullRequest"]["files"]["nodes"]:
                        if file["additions"] > 0 and file["deletions"] == 0:
                            files_added += 1
                    hasNextPage = content["data"]["repository"]["pullRequest"]["files"]["pageInfo"]["hasNextPage"]
                    endCursor = content["data"]["repository"]["pullRequest"]["files"]["pageInfo"]["endCursor"]
                    if hasNextPage == False:
                        break
            # fifth query all the pull requests for: open_pr_num, prev_pullreqs, first_pr
            open_pr_num = 0
            prev_pullreqs = 0
            if all_pr_num <= 100:
                query = """
                query {
                    repository(owner: "%s", name: "%s") {
                        pullRequests(first: 100) {
                            nodes { author { login } state }
                        }
                    }
                }
                """
                values = {"query": query % (self.pr.owner.login, self.pr.repo.name)}
                response = requests.post(url=url, headers = headers, json=values)
                response.encoding = 'utf-8'
                if response.status_code != 200:
                    raise Exception("error with func query_pr_infos: code: %s, message: %s" % (response.status_code, json.loads(response.text)["message"]))
                content = response.json()
                for prTmp in content["data"]["repository"]["pullRequests"]["nodes"]:
                    if prTmp["state"] == "OPEN":
                        open_pr_num += 1
                    if prTmp["author"]["login"] == author_login:
                        prev_pullreqs += 1
                # exclude the current PR itself from both counts
                open_pr_num = open_pr_num - 1 if open_pr_num > 0 else open_pr_num
                prev_pullreqs = prev_pullreqs - 1 if prev_pullreqs > 0 else prev_pullreqs
                first_pr = 0 if prev_pullreqs > 0 else 1
            else:
                # paginate through all pull requests of the repository
                page = 0
                endCursor = None
                while(True):
                    page += 1
                    if page == 1:
                        query = """
                        query {
                            repository(owner: "%s", name: "%s") {
                                pullRequests(first: 100) {
                                    nodes { author { login } state }
                                    pageInfo { endCursor hasNextPage }
                                }
                            }
                        }
                        """
                        values = {"query": query % (self.pr.owner.login, self.pr.repo.name)}
                    else:
                        query = """
                        query {
                            repository(owner: "%s", name: "%s") {
                                pullRequests(first: 100, after: "%s") {
                                    nodes { author { login } state }
                                    pageInfo { endCursor hasNextPage }
                                }
                            }
                        }
                        """
                        values = {"query": query % (self.pr.owner.login, self.pr.repo.name, endCursor)}
                    response = requests.post(url=url, headers = headers, json=values)
                    response.encoding = 'utf-8'
                    if response.status_code != 200:
                        raise Exception("error with func query_pr_infos: code: %s, message: %s" % (response.status_code, json.loads(response.text)["message"]))
                    content = response.json()
                    for prTmp in content["data"]["repository"]["pullRequests"]["nodes"]:
                        if prTmp["state"] == "OPEN":
                            open_pr_num += 1
                        if prTmp["author"]["login"] == author_login:
                            prev_pullreqs += 1
                    hasNextPage = content["data"]["repository"]["pullRequests"]["pageInfo"]["hasNextPage"]
                    endCursor = content["data"]["repository"]["pullRequests"]["pageInfo"]["endCursor"]
                    if hasNextPage == False:
                        break
                # exclude the current PR itself from both counts
                open_pr_num = open_pr_num - 1 if open_pr_num > 0 else open_pr_num
                prev_pullreqs = prev_pullreqs - 1 if prev_pullreqs > 0 else prev_pullreqs
                first_pr = 0 if prev_pullreqs > 0 else 1
            # return results
            result = {
                "lifetime_minutes": lifetime_minutes, "has_comments": has_comments, "core_member": core_member, "num_commits": num_commits, "files_added": files_added, "prev_pullreqs": prev_pullreqs, "open_pr_num": open_pr_num, "account_creation_days": account_creation_days, "first_pr": first_pr, "files_changed": files_changed, "project_age": project_age, "reopen_or_not": reopen_or_not, "stars": stars, "description_length": description_length, "followers": followers
            }
            return result
        except Exception as e:
            print("error with func query_pr_infos: %s" % (repr(e)))
            print(traceback.format_exc())
if __name__ == "__main__":
factorGetter = FactorGetter(
pr=PullRequest(owner=User(login="zhangxunhui"), repo=Repository(name="bot-pullreq-decision"), number=5),
installation=Installation(id=18836058)
)
result = factorGetter.query_pr_infos()
print("finish")

View File

@ -0,0 +1,71 @@
# predict using the trained model
import sys, pathlib, traceback
from typing import List, Dict
sys.path.append(str(pathlib.Path(__file__).resolve().parents[2]))
from app.models.pull_request import PullRequest
from app.models.installation import Installation
from app.prediction_service.factor_getter import FactorGetter
from app.utils.config_loader import ConfigLoader
from app.prediction_service.trainer import Trainer
'''
TODO (translated): switch to the proper factor list for latency prediction and
update the prediction function and its callers; after predicting, apply the
power transform (2^Y) to obtain the final number of minutes; also rework the
returned result so that it conveys the magnitude of the predicted latency
instead of a boolean.
'''
class Predictor():
    """Predicts a pull-request outcome using the models held by a Trainer.

    NOTE(review): this class is mid-migration from merge-decision to latency
    prediction (see the module-level TODO); `predict` still returns a bool.
    """
    # trained model using only submission-time factors
    modelSubmission = None
    # trained model including process factors (e.g., number of comments)
    modelProcess = None
    # which model to use: "submission" or "process"
    type: str

    def __init__(self, trainer: Trainer, type: str) -> None:
        # borrow the already-trained models from the shared Trainer instance
        self.modelSubmission = trainer.modelSubmission
        self.modelProcess = trainer.modelProcess
        self.type = type

    def _get_factors(self, pr: PullRequest, installation: Installation) -> Dict:
        """Fetch the factor values for one PR (delegates to FactorGetter.query_pr_infos)."""
        result: Dict
        result = FactorGetter(pr, installation).query_pr_infos()
        return result

    def _factor_cut_suffix(self, s, suffixList):
        """Strip the first matching suffix in suffixList from s.

        Used to map configured factor names like "x_open"/"x_close" back to the
        plain factor key returned by FactorGetter. Returns None on error.
        """
        try:
            for suffix in suffixList:
                if s.endswith(suffix):
                    return s[:-len(suffix)]
            return s
        except Exception as e:
            print("error with func _factor_cut_suffix: %s" % (repr(e)))
            print(traceback.format_exc())

    def predict(self, pr: PullRequest, installation: Installation) -> bool:
        '''
        predict whether the pull request can be merged
        params:
            pr: with owner login, repo name and number
            installation: which installation is for
        return:
            can merge or not: bool (None if an error occurred)
        '''
        try:
            # get the factors for this pr
            factorDict = self._get_factors(pr, installation)
            factorList = ConfigLoader().load_prediction_service_config()["trainer"]["factor_list"][self.type]
            X_test = [factorDict[self._factor_cut_suffix(f, ["_open", "_close"])] for f in factorList]
            if self.type == "submission":
                predictions = self.modelSubmission.predict([X_test])
            elif self.type == "process":
                predictions = self.modelProcess.predict([X_test])
            # NOTE(review): Trainer now fits a LinearRegression (continuous output),
            # so an exact 0/1 comparison will almost always fall into the raise below.
            # This branch still assumes the old classification model — confirm during
            # the latency migration.
            if predictions[0] == 1:
                return True
            elif predictions[0] == 0:
                return False
            else:
                raise Exception("error with the prediction result of func predict.")
        except Exception as e:
            print("error with func predict: %s" % (repr(e)))
            print(traceback.format_exc())

View File

@ -0,0 +1,151 @@
# train the prediction model
# there are two modes:
# 1. old: train using our dataset (use data from all the repositories)
# 2. new: crawl new dataset for training (use only data in the target repository)
import sys, pathlib, requests
sys.path.append(str(pathlib.Path(__file__).resolve().parents[2]))
from typing import Optional, List
from app.utils.config_loader import ConfigLoader
# for training the model
import pickle, traceback
import pandas as pd
from sklearn import model_selection
from sklearn.linear_model import LinearRegression
from sklearn.metrics import r2_score
import numpy as np
class Trainer():
    """Trains/loads the pull-request latency prediction models.

    Two modes: "old" downloads a prepared public dataset and trains on it;
    "new" (per-repository crawling) is not implemented yet.
    """
    mode: Optional[str]
    datasetUrl: Optional[str]
    datasetName: Optional[str]
    '''
    We split models into two kinds:
    1. model trained using just the factors occurred at the submission time
    2. model trained using the process factors, e.g., the number of comments
    For users, they can also split according to the usage of CI tools, etc. Please have a look at the paper.
    '''
    modelSubmissionPath: Optional[str]
    modelSubmissionFactors: List[str]
    modelSubmission = None
    modelProcessPath: Optional[str]
    modelProcessFactors: List[str]
    modelProcess = None

    def __init__(self) -> None:
        """Read app/prediction_service/config.yaml, validate it, and train/load both models.

        Raises:
            Exception: when a required configuration key is missing or mode != "old".
        """
        config = ConfigLoader().load_prediction_service_config()
        if "trainer" not in config or "mode" not in config["trainer"]:
            raise Exception("error with the initialization of Trainer object: [trainer]->[mode] not in configuration")
        if "factor_list" not in config["trainer"] or "submission" not in config["trainer"]["factor_list"] or "process" not in config["trainer"]["factor_list"]:
            raise Exception("error with the initialization of Trainer object: [trainer]->[factor_list]->[submission/process] not in configuration")
        self.modelSubmissionFactors = config["trainer"]["factor_list"]["submission"]
        self.modelProcessFactors = config["trainer"]["factor_list"]["process"]
        self.mode = config['trainer']['mode']
        if self.mode == "old":
            # choose to use the old mode
            if "dataset_url" not in config["trainer"] or "dataset_name" not in config["trainer"]:
                raise Exception("error with the initialization of Trainer object: [trainer]->[dataset_url/name] not in configuration")
            self.datasetUrl = config["trainer"]["dataset_url"]
            self.datasetName = config["trainer"]["dataset_name"]
            if "model_names" not in config["trainer"] or "submission" not in config["trainer"]["model_names"] or "process" not in config["trainer"]["model_names"]:
                raise Exception("error with the initialization of Trainer object: [trainer]->[model_names] not in configuration")
            self.modelSubmissionPath = config["trainer"]["model_names"]["submission"]
            self.modelProcessPath = config["trainer"]["model_names"]["process"]
            self.train()  # initialize the models (self.modelSubmission/self.modelProcess)
        else:
            # choose to use the new mode
            raise Exception("new mode is not supported right now")

    def download_dataset(self):
        """Download the training dataset to self.datasetName unless it is already on disk."""
        if pathlib.Path(self.datasetName).is_file():
            print("already downloaded the training dataset")
        else:
            print("downloading training dataset from %s ..." % (self.datasetUrl))
            dataset = requests.get(self.datasetUrl)
            # bug fix: use a context manager so the file handle is closed (it was leaked before)
            with open(self.datasetName, 'wb') as f:
                f.write(dataset.content)
            print("finish downloading training dataset.")

    def _log_transfer(self, factors: List, df: pd.DataFrame) -> pd.DataFrame:
        """log2-transform the listed columns in place (+0.5 avoids log2(0)).

        Returns the (mutated) DataFrame, or None if an error occurred.
        """
        try:
            for factor in factors:
                if factor in df.columns:
                    df[factor] = np.log2(df[factor] + 0.5)
            return df
        except Exception as e:
            print("error with func _log_transfer: %s" % (repr(e)))
            print(traceback.format_exc())

    def _train_one_model(self, factors: List, dataPath: str, modelPath: str):
        '''
        This function is used to train one model (90% data for training and 10% for testing)
        params:
            factors: the list of factors for training the model
            dataPath: csv file holding the training data
            modelPath: where the pickled model is written
        return:
            the fitted LinearRegression model, or None on error
        '''
        try:
            df = pd.read_csv(dataPath, nrows=700000)
            # .copy() so the in-place log transform cannot trigger pandas' SettingWithCopyWarning
            Y = df[['lifetime_minutes']].copy()
            Y = self._log_transfer(['lifetime_minutes'], Y)
            X = df[factors].copy()
            continuous_factors = ConfigLoader().load_prediction_service_config()["trainer"]["factor_list"]["continuous"]  # read continuous variables
            X = self._log_transfer(continuous_factors, X)
            X_train, X_test, Y_train, Y_test = model_selection.train_test_split(X, Y, test_size=0.1, random_state=10)
            # Fit the model on training set
            model = LinearRegression()
            model.fit(X_train, Y_train)
            # Test the model performance
            Y_test_hat = model.predict(X_test)
            print("the performance of the trained model (r2_score):")
            print(r2_score(Y_test, Y_test_hat))
            # save the model to disk (bug fix: context manager closes the handle; it was leaked before)
            with open(modelPath, 'wb') as f:
                pickle.dump(model, f)
            return model
        except Exception as e:
            print("error with func _train_one_model: %s" % (repr(e)))
            print(traceback.format_exc())

    def train(self):
        '''
        train a model, save it to the model path and return it
        return:
            pickle models
        '''
        try:
            # reuse previously trained models when they already exist on disk
            if pathlib.Path(self.modelSubmissionPath).is_file():
                print("already trained the submission model")
                # bug fix: close the pickle file handle via a context manager (leaked before)
                with open(self.modelSubmissionPath, 'rb') as f:
                    self.modelSubmission = pickle.load(f)
            if pathlib.Path(self.modelProcessPath).is_file():
                print("already trained the process model")
                with open(self.modelProcessPath, 'rb') as f:
                    self.modelProcess = pickle.load(f)
            if self.modelSubmission is None:
                # download dataset
                self.download_dataset()
                # train the model
                self.modelSubmission = self._train_one_model(factors=self.modelSubmissionFactors, dataPath=self.datasetName, modelPath=self.modelSubmissionPath)
            if self.modelProcess is None:
                self.download_dataset()
                self.modelProcess = self._train_one_model(factors=self.modelProcessFactors, dataPath=self.datasetName, modelPath=self.modelProcessPath)
        except Exception as e:
            print("error with func train: %s" % (repr(e)))
            print(traceback.format_exc())
if __name__ == "__main__":
    # Ad-hoc entry point: constructing the Trainer downloads the dataset (if needed)
    # and trains or loads both models as a side effect of __init__.
    print("training the model...")
    trainer = Trainer()
    print("finish")

0
app/services/__init__.py Normal file
View File

View File

@ -0,0 +1,44 @@
# this is the authentication service
import sys, time, pathlib, traceback
sys.path.append(str(pathlib.Path(__file__).resolve().parents[2]))
import jwt
from app.utils.global_variables import GlobalVariable
from app.models.jwt_query import JWTQuery
from app.models.installation import Installation
from app.services.queries import query_access_token
from app.utils.config_loader import ConfigLoader
# JWT signing algorithm used with the App's private key in jwt.encode (see getToken below).
ALGORITHM = "RS256"
def getToken(installation: Installation) -> str:
    """Return a short-lived installation access token for this GitHub App.

    Reads the App's private key (path taken from .env.yaml), signs a JWT with it,
    and exchanges the JWT for an installation token. Returns None on any failure
    (the error is printed).
    """
    token = None
    try:
        env_config = ConfigLoader().load_env()
        if "APP" not in env_config or "PRIVATE_KEY_PATH" not in env_config['APP']:
            raise Exception("error with configuration .env.yaml")
        key_path: str = env_config['APP']['PRIVATE_KEY_PATH']
        with open(key_path) as key_file:
            private_pem = key_file.read()
        # iat is backdated 60s to tolerate clock drift; token lives 10 minutes
        now = int(time.time())
        claims = {
            "iat": now - 60,
            "exp": now + (10 * 60),
            "iss": GlobalVariable.appId
        }
        signed_jwt = jwt.encode(claims, private_pem, algorithm=ALGORITHM)
        # exchange the app JWT for an installation access token
        request_headers = {'Authorization': 'bearer ' + signed_jwt, "Accept": "application/vnd.github.v3+json"}
        request_url = "https://api.github.com/app/installations/{installation_id}/access_tokens".format(installation_id=installation.id)
        token = query_access_token(JWTQuery(headers=request_headers, url=request_url))
    except Exception as e:
        print("error with func getToken: %s" % repr(e))
        print(traceback.format_exc())
    finally:
        return token

75
app/services/comments.py Normal file
View File

@ -0,0 +1,75 @@
# the services related to labels
import sys, requests, pathlib, json, asyncio, datetime, traceback
sys.path.append(str(pathlib.Path(__file__).resolve().parents[2]))
from app.services.authentication import getToken
from app.models.pr_comment import PRComment
from app.models.trigger import *
from app.db.operators.pull_request_operator import PullRequestOperator
from app.utils.global_variables import GlobalVariable
from app.prediction_service.predictor import Predictor
# NOTE(review): these messages still carry the old merge-decision wording even though
# the bot is being migrated to latency prediction — presumably they should report the
# predicted latency in minutes; confirm before release.
LATENCY_ACCEPT = "This pull request can be merged"
LATENCY_REJECT = "This pull request cannot be merged"
def return_pr_latency(prTrigger: PRTrigger) -> bool:
    """Predict the latency for a freshly triggered PR, post it as an issue comment, and record it in the DB.

    Called from the (synchronous) webhook path; async DB calls are driven via the event loop.
    NOTE(review): despite the bool annotation there is no explicit return, so callers get None.
    """
    try:
        # insert/update db: pull_requests
        async def insert_pull_request() -> None:
            prOp = PullRequestOperator()
            await prOp.insert_pull_request(pr=prTrigger.pr, installation=prTrigger.installation)
        loop = asyncio.get_event_loop()
        loop.run_until_complete(insert_pull_request())
        # predict the result using the submission-time model
        latency = Predictor(trainer=GlobalVariable.trainer, type="submission").predict(pr=prTrigger.pr, installation=prTrigger.installation)
        # NOTE(review): `latency` is the bool returned by Predictor.predict; the accept/reject
        # wording is decision-era — confirm once the latency migration is complete.
        latency_comment = LATENCY_ACCEPT if latency else LATENCY_REJECT
        token = getToken(prTrigger.installation)
        comment = PRComment(pr=prTrigger.pr, body=latency_comment)
        headers = {'Authorization': 'token ' + token, 'Accept': 'application/vnd.github.v3+json'}
        url = "https://api.github.com/repos/{owner}/{repo}/issues/{pull_request_number}/comments".format(owner=comment.pr.owner.login, repo=comment.pr.repo.name, pull_request_number=comment.pr.number)
        data = {"body": comment.body}
        response = requests.post(url, data=json.dumps(data), headers=headers)
        if response.status_code != 201:
            raise Exception("error with func return_pr_latency: code: %s, message: %s" % (response.status_code, json.loads(response.text)["message"]))
        comment_id = response.json()["id"]
        comment_body = comment.body
        # insert/update db: pull_requests - last_comment_at, comment_or_not
        async def update_pull_request_comment() -> None:
            prOp = PullRequestOperator()
            await prOp.update_pull_request_comment(pr=prTrigger.pr, last_comment_at=datetime.datetime.utcnow(), comment_or_not=1, comment_id=comment_id, comment_body=comment_body)
        loop = asyncio.get_event_loop()
        loop.run_until_complete(update_pull_request_comment())
    except Exception as e:
        print("error with func return_pr_latency: %s" % (repr(e)))
        print(traceback.format_exc())
async def return_pr_latency_schedular(prSchedulerTrigger: PRSchedulerTrigger) -> bool:
    """Predict latency for a scheduled PR with the process-time model and post/record the comment.

    Async counterpart of return_pr_latency, run from the background scheduler.
    NOTE(review): despite the bool annotation there is no explicit return, so callers get None.
    """
    try:
        # predict the result with the process-time model
        latency = Predictor(trainer=GlobalVariable.trainer, type="process").predict(pr=prSchedulerTrigger.pr, installation=prSchedulerTrigger.installation)
        latency_comment = LATENCY_ACCEPT if latency else LATENCY_REJECT
        token = getToken(prSchedulerTrigger.installation)
        # bug fix: the comment body was the raw prediction (`latency`) instead of the
        # prepared message text (`latency_comment`)
        comment = PRComment(pr=prSchedulerTrigger.pr, body=latency_comment)
        headers = {'Authorization': 'token ' + token, 'Accept': 'application/vnd.github.v3+json'}
        url = "https://api.github.com/repos/{owner}/{repo}/issues/{pull_request_number}/comments".format(owner=comment.pr.owner.login, repo=comment.pr.repo.name, pull_request_number=comment.pr.number)
        data = {"body": comment.body}
        response = requests.post(url, data=json.dumps(data), headers=headers)
        if response.status_code != 201:
            raise Exception("error with func return_pr_latency_schedular: code: %s, message: %s" % (response.status_code, json.loads(response.text)["message"]))
        comment_id = response.json()["id"]
        comment_body = comment.body
        # insert/update db: pull_requests - last_comment_at, comment_or_not
        prOp = PullRequestOperator()
        await prOp.update_pull_request_comment(pr=prSchedulerTrigger.pr, last_comment_at=datetime.datetime.utcnow(), comment_or_not=1, comment_id=comment_id, comment_body=comment_body)
    except Exception as e:
        print("error with func return_pr_latency_schedular: %s" % (repr(e)))
        print(traceback.format_exc())
if __name__ == "__main__":
    # NOTE(review): this call is broken — return_pr_latency requires a PRTrigger argument,
    # so running this module directly raises a TypeError; confirm and fix or remove.
    return_pr_latency()

89
app/services/queries.py Normal file
View File

@ -0,0 +1,89 @@
# query github using apis
import sys, requests, pathlib
sys.path.append(str(pathlib.Path(__file__).resolve().parents[2]))
import json, time, jwt, traceback
from typing import Dict, List
from app.models.installation import Installation
from app.models.jwt_query import JWTQuery
from app.utils.config_loader import ConfigLoader
from app.utils.global_variables import GlobalVariable
# JWT signing algorithm used with the App's private key in jwt.encode (see query_installations).
ALGORITHM = "RS256"
def query_installations() -> Dict:
    """Return a mapping from account login to Installation for every installation of this App.

    Authenticates as the GitHub App with a self-signed JWT and pages through
    /app/installations (100 per page). Returns whatever was collected so far
    (possibly an empty dict) if an error occurs; the error is printed.
    """
    installations_by_login: Dict = {}
    try:
        env_config = ConfigLoader().load_env()
        if "APP" not in env_config or "PRIVATE_KEY_PATH" not in env_config['APP']:
            raise Exception("error with configuration .env.yaml")
        key_path: str = env_config['APP']['PRIVATE_KEY_PATH']
        with open(key_path) as key_file:
            private_pem = key_file.read()
        # iat is backdated 60s to tolerate clock drift; token lives 10 minutes
        now = int(time.time())
        claims = {
            "iat": now - 60,
            "exp": now + (10 * 60),
            "iss": GlobalVariable.appId
        }
        app_jwt = jwt.encode(claims, private_pem, algorithm=ALGORITHM)
        request_headers = {'Authorization': 'bearer ' + app_jwt, "Accept": "application/vnd.github.v3+json"}
        base_url = "https://api.github.com/app/installations"
        per_page = 100
        page = 1
        while True:
            paged_url = base_url + "?per_page=%s&page=%s" % (per_page, page)
            response = requests.get(paged_url, headers=request_headers)
            if response.status_code != 200:
                raise Exception("error with func query_installations: code: %s, message: %s" % (response.status_code, json.loads(response.text)["message"]))
            batch = response.json()
            for item in batch:
                # an installation is related to a single account (user)
                installations_by_login[item["account"]["login"]] = Installation(id=item["id"], app_id=item["app_id"])
            if len(batch) < per_page:
                break
            page += 1
    except Exception as e:
        print("error with func query_installations: %s" % (repr(e)))
        print(traceback.format_exc())
    finally:
        return installations_by_login
def query_access_token(query: JWTQuery) -> str:
    """POST the prepared JWT request and return the installation access token.

    Returns None if the request fails (GitHub answers anything but 201); the
    error is printed rather than propagated.
    """
    token = None
    try:
        response = requests.post(query.url, headers=query.headers)
        if response.status_code == 201:
            token = response.json()["token"]
        else:
            raise Exception("error with func query_access_token: code: %s, message: %s" % (response.status_code, json.loads(response.text)["message"]))
    except Exception as e:
        print("error with func query_access_token: %s" % (repr(e)))
        print(traceback.format_exc())
    finally:
        return token
def query_app_id() -> int:
    """Look up this GitHub App's numeric id from its slug via the REST API.

    Uses the personal token and app slug from .env.yaml. Returns the id, or
    None (implicitly) on error — the error is printed.
    """
    try:
        result: int
        env = ConfigLoader().load_env()  # load the config once instead of twice
        app_slug = env["APP"]["APP_SLUG"]
        personal_token = env["APP"]["PERSONAL_TOKEN"]
        url = "https://api.github.com/apps/{app_slug}".format(app_slug=app_slug)
        headers = {'Authorization': 'Bearer ' + personal_token, "Accept": "application/vnd.github.v3+json"}
        response = requests.get(url, headers=headers)
        if response.status_code != 200:
            raise Exception("error with func query_app_id: code: %s, message: %s" % (response.status_code, json.loads(response.text)["message"]))
        result = response.json()["id"]
        return result
    except Exception as e:
        print("error with func query_app_id: %s" % (repr(e)))
        print(traceback.format_exc())

41
app/services/scheduler.py Normal file
View File

@ -0,0 +1,41 @@
import sys, pathlib, asyncio, traceback
sys.path.append(str(pathlib.Path(__file__).resolve().parents[2]))
from apscheduler.schedulers.background import BackgroundScheduler
from app.db.operators.pull_request_operator import PullRequestOperator
from app.models.scheduler import SchedulerModel
from typing import List
from app.models.trigger import PRSchedulerTrigger
from app.services.comments import return_pr_latency_schedular
from app.services.queries import query_installations
from app.utils.config_loader import ConfigLoader
class Scheduler():
    """Background job that periodically re-predicts latency for PRs stored in the DB."""
    sched: BackgroundScheduler

    def __init__(self) -> None:
        """Start a BackgroundScheduler running job_predict_latency on a fixed interval.

        The interval (minutes) comes from .env.yaml: SERVICE -> SCHEDULER -> CYCLE_MINUTES.
        """
        try:
            self.sched = BackgroundScheduler()
            self.sched.add_job(self.job_predict_latency, 'interval', minutes=ConfigLoader().load_env()["SERVICE"]["SCHEDULER"]["CYCLE_MINUTES"])
            self.sched.start()
            print("the schedular is started.")
        except Exception as e:
            print("error with the initialization of Scheduler: %s" % (repr(e)))
            print(traceback.format_exc())

    def job_predict_latency(self) -> None:
        """Query scheduled PRs from the DB and post an updated latency prediction for each."""
        try:
            # query prs
            prOp = PullRequestOperator()
            tasks: List[SchedulerModel] = asyncio.run(prOp.query_prScheduler_4_scheduler())
            # the installation_id may change, so we need to use bot_slug to get app_id and then get related installation_id
            installationDict = query_installations()
            # handle each task and predict the latency
            for task in tasks:
                # if user remove the installation, we also need to remove it......
                # NOTE(review): a removed installation makes installationDict[...] raise KeyError,
                # aborting the remaining tasks via the except below — confirm intended handling.
                asyncio.run(return_pr_latency_schedular(PRSchedulerTrigger(installation=installationDict[task.pr.owner.login], pr=task.pr)))
        except Exception as e:
            print("error with func job_predict_latency: %s" % (repr(e)))
            print(traceback.format_exc())

41
app/services/triggers.py Normal file
View File

@ -0,0 +1,41 @@
# handle triggers
import sys, pathlib, asyncio, traceback
sys.path.append(str(pathlib.Path(__file__).resolve().parents[2]))
from app.models.trigger import *
from app.models.repository import Repository
from app.models.user import User
from app.models.installation import Installation
from app.models.pull_request import PullRequest
from app.utils.time_operator import TimeOperator
from app.db.operators.pull_request_operator import PullRequestOperator
from typing import Dict
def parseTriggers(response: Dict) -> Trigger:
    """Turn a GitHub webhook payload into a Trigger object, persisting the PR on the way.

    Currently only pull_request payloads produce a PRTrigger. Returns None for
    irrelevant payloads (e.g., the event fired on app installation), for
    non-PR events, or on error (error printed).
    """
    try:
        if 'repository' not in response or "installation" not in response:
            return None  # this is not a right trigger, e.g., the trigger will also be touched when install the app
        # check whether the permissions have been admitted by the installation
        repo = Repository(name=response['repository']['name'])
        owner = User(login=response['repository']['full_name'].split("/")[0])
        sender = User(login=response['sender']['login'])
        installation = Installation(id=response['installation']['id'], app_id=None)  # app_id is not in the response Dict
        if "pull_request" in response:
            # create PRTrigger
            pr = PullRequest(owner=owner, repo=repo, number=response['number'], state=response['pull_request']['state'], locked=response['pull_request']['locked'], created_at=TimeOperator().convertTZTime2TimeStamp(response['pull_request']['created_at']))
            # insert into database
            async def insert_pr(pr: PullRequest) -> None:
                prOp = PullRequestOperator()
                prs = await prOp.insert_pull_request(pr=pr, installation=installation)
                return prs
            loop = asyncio.get_event_loop()
            loop.run_until_complete(insert_pr(pr))
            prTrigger = PRTrigger(installation=installation, repo=repo, sender=sender, pr=pr, action=response['action'])
            return prTrigger
    except Exception as e:
        print("error with func parseTriggers: %s" % (repr(e)))
        print(traceback.format_exc())

0
app/utils/__init__.py Normal file
View File

View File

@ -0,0 +1,27 @@
from typing import Dict
import yaml, sys, pathlib, traceback
sys.path.append(str(pathlib.Path(__file__).resolve().parents[2]))
class ConfigLoader():
    """Loads the YAML configuration files used by the app."""

    def _read_yaml(self, path: str, func_name: str) -> Dict:
        """Read a YAML file; on any failure print the error (tagged with func_name) and return {}."""
        data: Dict = {}
        try:
            with open(path) as fh:
                data = yaml.load(fh, Loader=yaml.FullLoader)
        except Exception as e:
            print("error with func %s: %s" % (func_name, repr(e)))
            print(traceback.format_exc())
        finally:
            return data

    def load_env(self) -> Dict:
        """Return the environment configuration from .env.yaml (empty dict on failure)."""
        return self._read_yaml(".env.yaml", "load_env")

    def load_prediction_service_config(self) -> Dict:
        """Return the prediction-service configuration (empty dict on failure)."""
        return self._read_yaml("app/prediction_service/config.yaml", "load_prediction_service_config")

View File

@ -0,0 +1,8 @@
import aiomysql
from typing import Optional
class GlobalVariable():
    """Process-wide singletons shared across the app's modules."""
    # shared aiomysql connection pool — presumably created by the application at startup; TODO confirm
    dbPool: Optional[aiomysql.pool.Pool] = None
    # the models for predicting pull request latency (a Trainer instance; left untyped here)
    trainer = None
    # numeric GitHub App id — presumably filled in at startup (see query_app_id); TODO confirm
    appId: int

View File

@ -0,0 +1,14 @@
import datetime, traceback
class TimeOperator():
    """Helpers for converting GitHub API timestamp strings."""

    def convertTZTime2TimeStamp(self, utc):
        """Parse an ISO-8601 UTC string (e.g. "2021-08-15T07:43:10Z") into a naive datetime.

        Returns None when parsing fails; the error is printed rather than raised.
        """
        parsed = None
        try:
            parsed = datetime.datetime.strptime(utc, "%Y-%m-%dT%H:%M:%SZ")
        except Exception as e:
            print('error with func convertTZTime2TimeStamp: %s' % (repr(e)))
            print(traceback.format_exc())
        finally:
            return parsed

17
app/utils/word_counter.py Normal file
View File

@ -0,0 +1,17 @@
from typing import List
import re, traceback
class WordCounter():
    """Counts word tokens across a list of strings."""

    def count(self, strs: List) -> int:
        r"""Return the total number of \w+ word tokens across all strings in strs.

        None entries are skipped. Returns None if an unexpected error occurs
        (the error is printed).
        """
        try:
            result: int = 0
            for s in strs:
                if s is None:
                    # bug fix: was `pass`, which fell through and crashed re.findall(None)
                    continue
                result += len(re.findall(r'\w+', s))
            return result
        except Exception as e:
            print("error with func count: %s" % (repr(e)))
            print(traceback.format_exc())

0
tests/__init__.py Normal file
View File

View File

View File

@ -0,0 +1,11 @@
import sys, pathlib
sys.path.append(str(pathlib.Path(__file__).resolve().parents[3]))
from app.db.operators.base_operator import BaseOperator
import pytest
from sqlalchemy.ext.asyncio import AsyncEngine
@pytest.mark.asyncio
async def test_base_operator() -> None:
    # The base operator should expose a SQLAlchemy AsyncEngine (requires the DB
    # configuration in .env.yaml to be present when the test runs).
    baseOp = BaseOperator()
    assert type(baseOp.engine) == AsyncEngine

View File

@ -0,0 +1,26 @@
from app.models.installation import Installation
from gettext import install
import sys, pathlib
sys.path.append(str(pathlib.Path(__file__).resolve().parents[3]))
from app.db.operators.pull_request_operator import PullRequestOperator
import pytest
from app.models.pull_request import PullRequest
from app.models.installation import Installation
from app.models.user import User
from app.models.repository import Repository
@pytest.mark.asyncio
async def test_insert_pull_requests() -> None:
    """Insert a synthetic PR and verify the operator accepts it (requires a test DB)."""
    prOp = PullRequestOperator()
    installation = Installation(id=1, app_id=None)
    assert installation is not None  # idiom fix: compare with None via `is not`, not `!=`
    pr = PullRequest(owner=User("zxh2"), repo=Repository("test"), number=1, state="open", locked=1, created_at='2020-01-01 00:00:00')
    assert pr is not None
    await prOp.insert_pull_request(pr=pr, installation=installation)
@pytest.mark.asyncio
async def test_query_prScheduler_4_scheduler() -> None:
    # With an empty database the scheduler query should return no tasks.
    # NOTE(review): assumes a clean test DB — fails against a DB with existing rows.
    prOp = PullRequestOperator()
    result = await prOp.query_prScheduler_4_scheduler()
    assert len(result) == 0

View File

@ -0,0 +1,35 @@
import sys, pathlib, datetime
sys.path.append(str(pathlib.Path(__file__).resolve().parents[2]))
from app.models.installation import Installation
from app.models.pull_request import PullRequest
from app.models.user import User
from app.models.repository import Repository
from app.utils.time_operator import TimeOperator
from app.prediction_service.factor_getter import FactorGetter
import pytest
def test_lifetime_minutes() -> None:
    # lifetime_minutes should equal whole minutes elapsed since the PR's known creation time.
    # NOTE(review): hits the live GitHub API and recomputes "now" here — flaky if the two
    # computations straddle a minute boundary; consider asserting with a +/-1 tolerance.
    factorGetter = FactorGetter(
        pr=PullRequest(owner=User(login="zhangxunhui"), repo=Repository(name="bot-pullreq-decision"), number=5),
        installation=Installation(id=18836058)
    )
    lifetime = factorGetter.lifetime_minutes()
    assert lifetime == int((datetime.datetime.utcnow() - TimeOperator().convertTZTime2TimeStamp("2021-08-15T07:43:10Z")).total_seconds()/60)
def test_has_comments() -> None:
    # Exercises FactorGetter.has_comments against a live PR.
    # NOTE(review): there is no assertion — the test only checks the call does not raise;
    # add an expected value once the fixture PR is stable.
    factorGetter = FactorGetter(
        pr=PullRequest(owner=User(login="zhangxunhui"), repo=Repository(name="bot-pullreq-decision"), number=5),
        installation=Installation(id=18836058)
    )
    has_comments = factorGetter.has_comments()
def test_query_pr_infos() -> None:
    # Exercises FactorGetter.query_pr_infos against a live PR.
    # NOTE(review): no assertion — only verifies the call completes; print("pause")
    # looks like a leftover debugging aid.
    factorGetter = FactorGetter(
        pr=PullRequest(owner=User(login="zhangxunhui"), repo=Repository(name="bot-pullreq-decision"), number=5),
        installation=Installation(id=18836058)
    )
    result = factorGetter.query_pr_infos()
    print("pause")