用例集执行完成后,发送邮件进行通知。

This commit is contained in:
jerrylizilong 2018-09-19 11:47:32 +08:00
parent 74c60ef7fe
commit 7a56be7421
12 changed files with 116 additions and 88 deletions

View File

@ -1,3 +1,6 @@
# update : 2018-09-19
# 新增执行完成后发送邮件
# update : 2018-09-14
## 优化步骤出错处理: 使用了 retrying 模块。 需要新安装该模块: pip install retrying
## 部分步骤进行了调整,建议重新初始化 keywords 表的数据。
@ -68,8 +71,8 @@ git clone https://github.com/jerrylizilong/autotest_platform.git
cd autotest_platform
pip3 install -r requirements.txt
```
- 3. 数据库配置: 创建数据库,并执行 init.sql 建表并初始化配置数据。
- 4. 配置: 修改 app/config.py 中关于数据库部分的配置: host、port、database、user、password
- 3. 数据库配置: 创建数据库,并执行 init.sql 建表并初始化配置数据。 修改 app/config.py 中关于数据库部分的配置: host、port、database、user、password。
- 4. 邮件配置:修改 app/config.py 中关于邮件部分的配置
## 启动:
### 1.启动 flask

View File

@ -2,7 +2,7 @@ DBtype = '2' # '1' : sqlite, 2: mysql
host='localhost'
port='3306'
user='root'
password='jerryli'
password='yourpassword'
database='test_auto_new'
isUseATX=True
@ -22,9 +22,12 @@ else:
logPath = currentPath + '/log/'
screen_shot_path = currentPath +'/static/screenshot/'
server_host = 'smtp.163.com'
server_port = '25'
from_email = 'youaccount@163.com'
server_user = from_email
server_password = 'yourpassword'
is_email_enable = False
flask_host = 'http://localhost:5000' # 邮件中的报告链接会使用
smtp_server_host = 'smtp.163.com'
smtp_server_port = '25'
smtp_from_email = 'youraccount@163.com'
smtp_default_to_email = 'youraccount@163.com'
smtp_server_user = smtp_from_email
smtp_server_password = 'yourpassword'

View File

@ -5,12 +5,9 @@ from app.db import test_task_manage
def main():
# test_task_manage.test_task_manage().update_test_suite_check()
idList = test_task_manage.test_task_manage().test_suite_list()
idList1 = test_task_manage.test_task_manage().test_case_list()
# log.log().logger.info(idList)
result1=0
result2=0
if len(idList):
for caselist in idList:
test_suite_id = caselist[0]
@ -27,22 +24,15 @@ def main():
else:
threadNum = 0
runType = ''
# if runType == '':
# test_task_manage.test_task_manage().update_test_suite(test_suite_id, '3')
# else:
# test_task_manage.test_task_manage().update_test_suite(test_suite_id, '2')
process.process().runmain(test_suite_id, threadNum, runType)
# test_task_manage.test_task_manage().update_test_suite_check()
result1 = 0
else:
# log.log().logger.info('no test is available!')
result1=1
if len(idList1):
threadNum = 1
process.process().multipleRun(idList1, threadNum)
result2 = 0
else:
# log.log().logger.info('no test is available!')
result2=1
result = result1 +result2
return result

View File

@ -112,7 +112,7 @@ class hubs():
return results
def getDevices(self):
url = config.ATXHost+'/list'
url = config.ATXHost + '/list'
response, content = util.util().send(url)
content = json.loads(content)
deviceList = []
@ -125,7 +125,7 @@ class hubs():
#获取设备列表信息
def getDevicesList(self):
url = config.ATXHost+'/list'
url = config.ATXHost + '/list'
response, content = util.util().send(url)
content = json.loads(content)
deviceLists=[]

View File

@ -5,7 +5,7 @@ from app import config
class log :
def __init__(self) :
self.logfile =config.logPath + 'core-service.log'
self.logfile = config.logPath + 'core-service.log'
logzero.logfile(self.logfile, maxBytes = 1e6, backupCount = 3)
import logging
formatter = logging.Formatter('%(asctime)-15s - [%(filename)s: %(lineno)s] -%(levelname)s: %(message)s');

View File

@ -69,53 +69,3 @@ class util():
m.update(preosign)
return m.hexdigest()
def sendEmail(self,to_email, msg1,filename):
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart,MIMEBase
from email.header import Header
from email.mime.image import MIMEImage
from email import encoders
from email.utils import parseaddr,formataddr
import smtplib
from app import config
server_host = config.server_host
server_port = config.server_port
from_email = config.from_email
server_user = config.server_user
server_password = config.server_password
def _format_addrs(s):
name,addr = parseaddr(s)
return formataddr((Header(name,'utf-8').encode(),addr))
# msg = MIMEText(msg1,'plain','utf-8')
msg = MIMEMultipart()
msg['From']=_format_addrs('Autotest platform: %s' %from_email)
msg['To']=_format_addrs('User : %s' %to_email)
msg['Subject']=Header(msg1,'utf-8').encode()
msg.attach(MIMEText(msg1,'plain','utf-8'))
# msg.attach()
fp = open(file=filename.replace(u'\u202a',''),mode='rb')
msgImage = MIMEImage(fp.read())
msgImage['Content-Type'] = 'application/octet-stream'
msgImage['Content-Disposition'] = 'attachment;filename="1.png"'
fp.close()
# msgImage.add_header('Content-ID', '<image1>')
msg.attach(msgImage)
server = smtplib.SMTP(server_host,server_port)
server.starttls()
server.set_debuglevel(1)
try:
server.login(server_user,server_password)
server.sendmail(from_email,[to_email],msg.as_string())
except smtplib.SMTPAuthenticationError as e:
print(e)
except smtplib.SMTPDataError as e:
print(e)
server.quit()

View File

@ -307,17 +307,17 @@ class test_batch_manage(object):
sql = 'SELECT id FROM test_suite WHERE (STATUS = 0 OR STATUS = 2 ) ; '
result = useDB.useDB().search(sql)
check_result = ''
test_suite_list = []
if len(result):
for id in result:
sql = 'SELECT COUNT(1) FROM test_batch WHERE test_suite_id = %s AND STATUS in (0,4);' %id
for test_suite_id in result:
sql = 'SELECT COUNT(1) FROM test_batch WHERE test_suite_id = %s AND STATUS in (0,4);' %test_suite_id
result1 = useDB.useDB().search(sql)
# print(result1[0][0])
if len(result1) and (result1[0][0]==0):
if check_result !='':
check_result += ','
check_result +=(str(id[0]))
# print(check_result)
return check_result
check_result +=(str(test_suite_id[0]))
test_suite_list.append(test_suite_id[0])
return check_result,test_suite_list
def set_test_running(self,id,deviceList=[]):

View File

@ -1,18 +1,13 @@
from app.core import log
from app import useDB
from app.db import test_batch_manage
class test_task_manage():
def test_suite_list(self):
sql = 'select id,run_type from test_suite where status in (0,2);'
# sql = sql.substitute(type = type)
# log.log().logger.info(sql)
idList = useDB.useDB().search(sql)
return idList
def test_case_list(self):
sql = 'select id, steps,browser_type from test_batch where status in (0) and test_suite_id = 0;'
# sql = sql.substitute(type = type)
# log.log().logger.info(sql)
idList = useDB.useDB().search(sql)
return idList
@ -20,12 +15,15 @@ class test_task_manage():
import string
sql = string.Template('update test_suite set status = $status, runCount = runCount+1 where id = "$relateId";')
sql = sql.substitute(relateId = relateId,status=status)
# log.log().logger.info(sql)
useDB.useDB().insert(sql)
def update_test_suite_check(self):
check_list = test_batch_manage.test_batch_manage().search_done_test_suite()
check_list,test_suite_list = test_batch_manage.test_batch_manage().search_done_test_suite()
# print(check_list,test_suite_list)
if len(check_list):
sql = 'update test_suite set status = 1 where id in (%s);' %check_list
# print(sql)
useDB.useDB().insert(sql)
useDB.useDB().insert(sql)
from app import config
if config.is_email_enable:
from app.util import sendEmail
sendEmail.sendEmail().send_test_results(test_suite_list)

View File

@ -49,3 +49,4 @@ def run_all():
#插入数据库
test_unittest_manage.test_unittest_manage().new_unittest_case(reportName, start_time, end_time, reportFileName)
run_all()

View File

@ -52,7 +52,7 @@ class mysqlDB(object):
def connect(self):
# change root password to yours:
import mysql.connector
conn = mysql.connector.connect(host=config.host,port=config.port, user=config.user, password=config.password, database=config.database)
conn = mysql.connector.connect(host=config.db_host, port=config.db_port, user=config.db_user, password=config.db_password, database=config.database)
return conn
def search(self, sql):

0
app/util/__init__.py Normal file
View File

83
app/util/sendEmail.py Normal file
View File

@ -0,0 +1,83 @@
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart
from email.header import Header
from email.mime.image import MIMEImage
from email.utils import parseaddr, formataddr
import smtplib
from app import config
from app.core import log
class sendEmail(object):
def __init__(self):
self.server_host = config.smtp_server_host
self.server_port = config.smtp_server_port
self.from_email = config.smtp_from_email
self.default_to_email=config.smtp_default_to_email
self.server_user = config.smtp_server_user
self.server_password = config.smtp_server_password
def _format_addrs(self,s):
name, addr = parseaddr(s)
return formataddr((Header(name, 'utf-8').encode(), addr))
def init_MIMEMultipart(self,to_email,title,content,filename=''):
msg = MIMEMultipart()
msg['From'] = self._format_addrs('Autotest platform: %s' % self.from_email)
msg['To'] = self._format_addrs('User : %s' % to_email)
msg['Subject'] = Header(title, 'utf-8').encode()
msg.attach(MIMEText(content, 'plain', 'utf-8'))
if filename !='':
fp = open(file=filename.replace(u'\u202a', ''), mode='rb')
msgImage = MIMEImage(fp.read())
msgImage['Content-Type'] = 'application/octet-stream'
msgImage['Content-Disposition'] = 'attachment;filename="1.png"'
fp.close()
msg.attach(msgImage)
return msg
def init_server(self):
server = smtplib.SMTP(self.server_host, self.server_port)
server.starttls()
server.set_debuglevel(1)
return server
def sendEmail(self, to_email,title = 'test result', content='', filename=''):
filename.replace("\\", '/')
server = self.init_server()
msg = self.init_MIMEMultipart(to_email,title,content,filename)
try:
server.login(self.server_user, self.server_password)
server.sendmail(self.from_email, [to_email], msg.as_string())
except smtplib.SMTPAuthenticationError as e:
print(e)
except smtplib.SMTPDataError as e:
print(e)
server.quit()
def send_test_result(self,test_suit_id,to_email= []):
if len(to_email)==0:
to_email = [self.default_to_email]
from app.db import test_batch_manage,test_suite_manage
test_suit_id = str(test_suit_id)
result = test_batch_manage.test_batch_manage().show_test_batch_status(test_suit_id)
# print(result)
test_title = 'test result for batch : ' + test_suit_id +'-'+test_suite_manage.test_suite_manage().search_test_suite(test_suit_id,'name')[0][0]
test_result = 'id: %s, 总用例数:%s, 成功用例数:%s, 失败用例数: %s, 通过率: %s , 报告地址: %s' % (
test_suit_id, result['total'], result['success'], result['fail'], result['successRate'],
config.flask_host + '/test_batch_detail?test_suite_id=' + test_suit_id)
self.sendEmail(to_email,test_title,test_result)
def send_test_results(self,test_suite_list):
for test_suite_id in test_suite_list:
send_time = 3
while send_time:
log.log().logger.info('sending email for test suite: %s' % str(test_suite_id))
try:
self.send_test_result(test_suite_id)
log.log().logger.info('sended email for test suite: %s' % str(test_suite_id))
break
except:
log.log().logger.info('send error for the %s time!' %(4-send_time))
send_time += -1