Merge pull request '黄成波 pull request' (#1) from bobo_242/PRWelBot4SECourse:master into master

This commit is contained in:
observerw 2023-12-06 22:07:35 +08:00
commit 5598773d06
5 changed files with 71 additions and 77 deletions

View File

@ -1,31 +1,30 @@
#!/usr/bin/python
# 负责调用平台接口 完成各项操作
import sys
sys.path.append('..')
import requests
sys.path.append('..') # 将上一级目录添加到 sys.path用于导入上级目录的模块
import requests # 导入用于发送 HTTP 请求的库
import json
import config.baseConfig as baseconfig
from commons.logUtil import logger
import config.baseConfig as baseconfig # 导入基本配置信息
from commons.logUtil import logger # 导入自定义的日志工具模块
# 获取 pull状态新信息
# 获取 pull 状态新信息
def get_pull_infor(owner, repo, index):
url = baseconfig.apiUrl + "api/v1/{}/pulls/{}.json".format(repo, index)
response = requests.get(url, headers=baseconfig.header, proxies = baseconfig.proxies)
logger.info("get_pull_infor调用"+str(response.json()))
return response.json()
url = baseconfig.apiUrl + "api/v1/{}/pulls/{}.json".format(repo, index) # 构建请求 URL
response = requests.get(url, headers=baseconfig.header, proxies=baseconfig.proxies) # 发送 GET 请求
logger.info("get_pull_infor调用"+str(response.json())) # 记录调用日志
return response.json() # 返回响应的 JSON 数据
# 获取 添加pull评论信息
# 创建 pull 评论信息
def create_pull_comment(issue_id):
url = baseconfig.apiUrl + "api/issues/{}/journals.json".format(issue_id)
url = baseconfig.apiUrl + "api/issues/{}/journals.json".format(issue_id) # 构建请求 URL
COMMENT = "注意!\n该合并请求已创建满两小时,长时间未处理可能会降低贡献的质量和贡献者积极性。\n请及时处理!"
data = json.dumps({'content': COMMENT})
response = requests.post(url, data=data, headers=baseconfig.header, proxies = baseconfig.proxies)
logger.info("create_pull_comment调用"+str(response.json()))
return response.json()
data = json.dumps({'content': COMMENT}) # 构建请求数据
response = requests.post(url, data=data, headers=baseconfig.header, proxies=baseconfig.proxies) # 发送 POST 请求
logger.info("create_pull_comment调用"+str(response.json())) # 记录调用日志
return response.json() # 返回响应的 JSON 数据
if __name__ == '__main__':
get_pull_infor("wuxiaojun", "wuxiaojun/botreascrch", "3")
get_pull_infor("wuxiaojun", "wuxiaojun/botreascrch", "3") # 测试 get_pull_infor 函数

View File

@ -1,14 +1,14 @@
#!/usr/bin/python
# 负责调用平台接口 完成各项操作
import json
import sys
sys.path.append('..')
sys.path.append('..') # 将上一级目录添加到 sys.path用于导入上级目录的模块
import jwt
import time
import sys
# 通过bot基本信息字段获取JWT 和 token
# 通过 bot 基本信息字段获取 JWT 和 token
def getJWT():
if len(sys.argv) > 1:
pem = sys.argv[1]
@ -16,32 +16,30 @@ def getJWT():
# pem = input("Enter path of private PEM file: ")
pem = 'C:\\Users\\文学奖\\Downloads\\my-first-apps1.2023-02-05.private-key (1).pem' # (1)
# Get the App ID
# 获取 App ID
if len(sys.argv) > 2:
app_id = sys.argv[2]
else:
# app_id = input("Enter your APP ID: ")
app_id = 10009 #"CrnV5uYEmwMGu7H6osHHi-4aSyKdY99qRMw4i44Kfi4" #186702
app_id = 10009 # "CrnV5uYEmwMGu7H6osHHi-4aSyKdY99qRMw4i44Kfi4" #186702
# Open PEM
# 打开 PEM 文件
with open(pem, 'rb') as pem_file:
signing_key = jwt.jwk_from_pem(pem_file.read())
#signing_key = jwt.jwk_from_pem(privateKey)
payload = {
# Issued at time
# JWT 签发时间
'iat': int(time.time()),
# JWT expiration time (10 minutes maximum)
# JWT 过期时间 (最大为 10 分钟)
'exp': int(time.time()) + 600,
# GitHub App's identifier
# GitHub App 的标识符
'iss': app_id
}
# Create JWT
# 创建 JWT
jwt_instance = jwt.JWT()
encoded_jwt = jwt_instance.encode(payload, signing_key, alg='RS256')
print(f"JWT: ", encoded_jwt)
if __name__ == "__main__":
print("hello")
print("hello") # 测试输出

View File

@ -2,26 +2,25 @@
import logging
import logging.handlers
'''
日志工具类
'''
class Logging:
def __init__(self):
# log文件存储路径
# log 文件存储路径
self._log_filename = 'bot.log'
'''
%(levelno)s: 打印日志级别的数值
%(levelname)s: 打印日志级别名称
%(pathname)s: 打印当前执行程序的路径其实就是sys.argv[0]
%(pathname)s: 打印当前执行程序的路径其实就是 sys.argv[0]
%(filename)s: 打印当前执行程序名
%(funcName)s: 打印日志的当前函数
%(lineno)d: 打印日志的当前行号
%(asctime)s: 打印日志的时间
%(thread)d: 打印线程ID
%(thread)d: 打印线程 ID
%(threadName)s: 打印线程名称
%(process)d: 打印进程ID
%(process)d: 打印进程 ID
%(message)s: 打印日志信息
'''
# 日志信息输出格式
@ -29,8 +28,8 @@ class Logging:
'%(pathname)s - %(levelname)s: %(message)s')
# 创建一个日志对象
self._logger = logging.getLogger()
# 设置控制台日志的输出级别: 级别排序:CRITICAL > ERROR > WARNING > INFO > DEBUG
self._logger.setLevel(logging.INFO) # 大于info级别的日志信息都会被输出
# 设置控制台日志的输出级别: 级别排序: CRITICAL > ERROR > WARNING > INFO > DEBUG
self._logger.setLevel(logging.INFO) # 大于 info 级别的日志信息都会被输出
self.set_console_logger()
self.set_file_logger()
@ -53,5 +52,4 @@ class Logging:
def get_logger(self):
return self._logger
logger = Logging().get_logger()
logger = Logging().get_logger()

View File

@ -1,35 +1,35 @@
#!/usr/bin/python
import sys
sys.path.append('..')
from flask import Flask, jsonify, request
from flask_cors import CORS, cross_origin
sys.path.append('..') # 将上一级目录添加到sys.path用于导入上级目录的模块
from flask import Flask, jsonify, request # 导入 Flask 相关模块
from flask_cors import CORS, cross_origin # 导入处理跨域请求的模块
import json
import services.pullService as pullService
import services.pullService as pullService # 导入自定义的拉取服务模块
import datetime
from commons.logUtil import logger
from commons.logUtil import logger # 导入自定义的日志工具模块
app = Flask(__name__)
app = Flask(__name__) # 创建 Flask 应用实例
# 测试
# 测试路由,返回一个 JSON 响应
@app.route('/hello', methods=['get'])
@cross_origin()
@cross_origin() # 处理跨域请求
def hello():
return jsonify({
"code": 0,
"msg": "hello"
})
# 功能:每当有一个新的PR创建/关闭时,根据状态执行存储或更新操作
# 处理 GitHub Webhook 的路由,根据 PR 的状态执行存储或更新操作
@app.route('/prwelcome', methods=['POST'])
@cross_origin()
@cross_origin() # 处理跨域请求
def pr_welcome():
try:
payload = request.json
payload = request.json # 从请求中获取 JSON 数据
print(payload)
logger.info("获取webhook信息成功:" + str(payload))
# # 判断是新创建issue还是issue的状态发生改变
logger.info("获取webhook信息成功:" + str(payload)) # 记录成功获取 Webhook 信息的日志
# 判断是新创建 PR 还是 PR 的状态发生改变
if payload["action"] == 'opened':
pull_id = payload["pull_request"]["id"]
index = payload["pull_request"]["number"]
@ -39,27 +39,25 @@ def pr_welcome():
created = datetime.datetime.strptime(time, "%Y-%m-%dT%H:%M:%S")
check_time = created + datetime.timedelta(hours=2)
values = (pull_id, index, owner, repo, created, check_time, 0)
# print(values)
pullService.insert_pull(values)
pullService.insert_pull(values) # 插入新 PR 的信息到数据库
# 关闭 更新数据库对应pr状态 (合并或者拒绝都属于关闭)
# 关闭 PR更新数据库对应 PR 状态(合并或拒绝都属于关闭)
elif payload["action"] == 'closed':
index = payload["pull_request"]["number"]
owner = payload["pull_request"]["user"]["login"]
repo = payload["repository"]["full_name"]
pullService.update_pull(index, owner, repo)
pullService.update_pull(index, owner, repo) # 更新数据库中对应 PR 的状态
return jsonify({
"code": 0,
"msg": "success"
})
except Exception as e:
return jsonify({
"code": -1,
"msg": "exception:" + str(e)
})
if __name__ == '__main__':
app.run(port=3090)
app.run(port=3090) # 如果脚本直接运行而非被导入,则运行 Flask 应用在指定端口

29
main.py
View File

@ -1,29 +1,30 @@
#!/usr/bin/python
# 项目主函数入口
import datetime
import time
from commons.logUtil import logger
from controllers.prController import app
from services.pullService import check_and_update_pull
from threading import Thread
import config.baseConfig as baseConfig
from commons.logUtil import logger # 导入日志工具
from controllers.prController import app # 导入应用实例
from services.pullService import check_and_update_pull # 导入检查和更新拉取的服务函数
from threading import Thread # 导入多线程模块
import config.baseConfig as baseConfig # 导入基本配置信息
# 定义一个监控循环的函数
def monitoring_loop():
while True:
check_and_update_pull()
check_and_update_pull() # 调用检查和更新拉取的服务函数
now = datetime.datetime.now()
logger.info('执行定时任务 :' + now.strftime('%Y-%m-%d %H:%M:%S'))
time.sleep(600)
logger.info('执行定时任务 :' + now.strftime('%Y-%m-%d %H:%M:%S')) # 记录定时任务执行的时间
time.sleep(600) # 等待10分钟
# 定义主运行函数
def run():
monitoring_thread = Thread(target=monitoring_loop)
monitoring_thread.start()
app.run(host=baseConfig.host, port=baseConfig.port)
monitoring_thread = Thread(target=monitoring_loop) # 创建一个线程,运行监控循环
monitoring_thread.start() # 启动监控线程
app.run(host=baseConfig.host, port=baseConfig.port) # 运行应用实例,指定主机和端口
# 如果脚本直接运行而非被导入则执行run函数
if __name__ == '__main__':
run()