增加数据库数据处理、查询、校验

This commit is contained in:
leozhanggg 2021-03-12 17:27:13 +08:00
parent d076f9c9ab
commit b4c8836af6
6 changed files with 296 additions and 22 deletions

View File

@ -0,0 +1,2 @@
import phoenixdb
import phoenixdb.cursor

View File

@ -160,7 +160,7 @@ def write_case_yaml(har_path):
check["expected_result"] = result_name
else:
check["expected_result"] = expected_request
test_case["check"] = check
test_case["check_body"] = check
test_case_list.append(test_case)
# 合并测试信息、用例信息

View File

@ -6,6 +6,10 @@
import re
import allure
import operator
import logging
from decimal import Decimal
from comm.unit import readRelevance, replaceRelevance
from comm.unit import queryDatabase as qdb
def check_json(src_data, dst_data):
@ -32,36 +36,120 @@ def check_json(src_data, dst_data):
raise Exception("JSON校验内容非dict格式{}".format(src_data))
def check_database(actual, expected, mark=''):
"""校验数据库
:param actual: 实际结果
:param expected: 期望结果
:param mark: 标识
:return:
"""
if isinstance(actual, dict) and isinstance(expected, dict):
result = list()
logging.info('校验数据库{}>>>'.format(mark))
content = '\n%(key)-20s%(actual)-40s%(expected)-40s%(result)-10s' \
% {'key': 'KEY', 'actual': 'ACTUAL', 'expected': 'EXPECTED', 'result': 'RESULT'}
for key in expected:
if key in actual:
actual_value = actual[key]
else:
actual_value = None
expected_value = expected[key]
if actual_value or expected_value:
if isinstance(actual_value, (int, float, Decimal)):
if int(actual_value) == int(expected_value):
rst = 'PASS'
else:
rst = 'FAIL'
else:
if str(actual_value) == str(expected_value):
rst = 'PASS'
else:
rst = 'FAIL'
else:
rst = 'PASS'
result.append(rst)
line = '%(key)-20s%(actual)-40s%(expected)-40s%(result)-10s' \
% {'key': key, 'actual': str(actual_value) + ' ',
'expected': str(expected_value) + ' ', 'result': rst}
content = content + '\n' + line
logging.info(content)
allure.attach(name="校验数据库详情{}".format(mark[-1]), body=str(content))
if 'FAIL' in result:
raise AssertionError('校验数据库{}未通过!'.format(mark))
elif isinstance(actual, list) and isinstance(expected, list):
result = list()
logging.info('校验数据库{}>>>'.format(mark))
content = '\n%(key)-25s%(actual)-35s%(expected)-35s%(result)-10s' \
% {'key': 'INDEX', 'actual': 'ACTUAL', 'expected': 'EXPECTED', 'result': 'RESULT'}
for index in range(len(expected)):
if index < len(actual):
actual_value = actual[index]
else:
actual_value = None
expected_value = expected[index]
if actual_value or expected_value:
if isinstance(actual_value, (int, float, Decimal)):
if int(actual_value) == int(expected_value):
rst = 'PASS'
else:
rst = 'FAIL'
else:
if str(actual_value) == str(expected_value):
rst = 'PASS'
else:
rst = 'FAIL'
else:
rst = 'PASS'
result.append(rst)
line = '%(key)-25s%(actual)-35s%(expected)-35s%(result)-10s' \
% {'key': index, 'actual': str(actual_value) + ' ',
'expected': str(expected_value) + ' ', 'result': rst}
content = content + '\n' + line
logging.info(content)
allure.attach(name="校验数据库详情{}".format(mark[-1]), body=str(content))
if 'FAIL' in result:
raise AssertionError('校验数据库{}未通过!'.format(mark))
else:
logging.info('校验数据库{}>>>'.format(mark))
logging.info('ACTUAL: {}\nEXPECTED: {}'.format(actual, expected))
if str(expected) != str(actual):
raise AssertionError('校验数据库{}未通过!'.format(mark))
def check_result(case_data, code, data):
"""
校验测试结果
:param case_data: 用例数据
:param code: HTTP状态
:param code: 接口状态码
:param data: 返回的接口json数据
:return:
"""
try:
# 获取用例检查信息
check_type = case_data['check']['check_type']
expected_code = case_data['check']['expected_code']
expected_result = case_data['check']['expected_result']
check_type = case_data['check_body']['check_type']
expected_code = case_data['check_body']['expected_code']
expected_result = case_data['check_body']['expected_result']
except Exception as e:
raise KeyError('获取用例检查信息失败:{}'.format(e))
# 接口数据校验
if check_type == 'no_check':
with allure.step("不校验结果"):
with allure.step("不校验接口结果"):
pass
elif check_type == 'check_code':
with allure.step("HTTP状态码校验"):
with allure.step("仅校验接口状态码"):
allure.attach(name="实际code", body=str(code))
allure.attach(name="期望code", body=str(expected_code))
allure.attach(name='实际data', body=str(data))
if int(code) != expected_code:
raise Exception("http状态码错误\n %s != %s" % (code, expected_code))
raise Exception("接口状态码错误!\n %s != %s" % (code, expected_code))
elif check_type == 'check_json':
with allure.step("JSON格式校验结果"):
with allure.step("JSON格式校验接口"):
allure.attach(name="实际code", body=str(code))
allure.attach(name="期望code", body=str(expected_code))
allure.attach(name='实际data', body=str(data))
@ -71,10 +159,10 @@ def check_result(case_data, code, data):
data = "{}"
check_json(expected_result, data)
else:
raise Exception("http状态码错误!\n %s != %s" % (code, expected_code))
raise Exception("接口状态码错误!\n %s != %s" % (code, expected_code))
elif check_type == 'entirely_check':
with allure.step("完全校验结果"):
with allure.step("完全校验接口结果"):
allure.attach(name="实际code", body=str(code))
allure.attach(name="期望code", body=str(expected_code))
allure.attach(name='实际data', body=str(data))
@ -84,7 +172,7 @@ def check_result(case_data, code, data):
if not result:
raise Exception("完全校验失败! %s ! = %s" % (expected_result, data))
else:
raise Exception("http状态码错误!\n %s != %s" % (code, expected_code))
raise Exception("接口状态码错误!\n %s != %s" % (code, expected_code))
elif check_type == 'regular_check':
if int(code) == expected_code:
@ -96,7 +184,7 @@ def check_result(case_data, code, data):
allure.attach('校验完成结果\n', str(result))
else:
result = re.findall(expected_result.replace("\"", "\'"), str(data))
with allure.step("正则匹配校验结果"):
with allure.step("正则校验接口结果"):
allure.attach(name="实际code", body=str(code))
allure.attach(name="期望code", body=str(expected_code))
allure.attach(name='实际data', body=str(data))
@ -108,7 +196,65 @@ def check_result(case_data, code, data):
except KeyError:
raise Exception("正则校验执行失败! %s\n正则表达式为空时" % expected_result)
else:
raise Exception("http状态码错误!\n %s != %s" % (code, expected_code))
raise Exception("接口状态码错误!\n %s != %s" % (code, expected_code))
else:
raise Exception("无该校验方式%s" % check_type)
raise Exception("无该接口校验方式%s" % check_type)
# 判断是否存在数据库校验标识
if 'check_db' in case_data:
check_db = case_data['check_db']
# 获取数据库期望结果:获取期望结果-获取关联值-替换关联值
data['parameter'] = case_data['parameter']
__relevance = readRelevance.get_relevance(data, check_db)
check_db = replaceRelevance.replace(check_db, __relevance)
# 循环校验数据库
for each in check_db:
try:
check_type = each['check_type']
execute_sql = each['execute_sql']
expected_result = each['expected_result']
except KeyError as e:
raise KeyError('【check_db】存在错误字段\n{}'.format(e))
except TypeError:
raise KeyError("【check_db】类型错误期望<class 'list'>,而不是%s" % type(expected_result))
if not isinstance(expected_result, list):
raise KeyError("【expected_result】类型错误期望<class 'list'>,而不是%s" % type(expected_result))
# 检索SQL语句
exp = r"^select (.*?) from (.*?) where (.*?)$"
res = re.findall(exp, execute_sql.strip())[0]
for r in res:
if not each:
msg = '标准格式: ' + exp
raise Exception('无效SQL>>> {}\n{}'.format(execute_sql, msg))
# 判断数据库检查类型
if check_type == 'mysql':
actual = qdb.query_mysql(execute_sql)
elif check_type == 'hbase':
actual = qdb.query_hbase(execute_sql)
elif check_type == 'solr':
actual = qdb.query_solr(execute_sql)
elif check_type == 'es':
actual = qdb.query_es(execute_sql)
else:
raise Exception("无该数据库校验方式%s" % check_type)
# 增加输出并进行数据校验
mark = check_type.replace('check_', '').upper() + '['+res[1]+']'
with allure.step("校验数据库{}".format(mark)):
allure.attach(name="实际结果", body=str(actual))
allure.attach(name='期望结果', body=str(expected_result))
# expected_num = each['expected_num']
# allure.attach(name="实际行数", body=str(len(actual)))
# allure.attach(name='期望行数', body=str(expected_num))
# # 验证数据库实际结果数量是否正确
# if len(actual) != int(expected_num):
# raise AssertionError('校验数据库{}行数未通过!'.format(mark))
# 检查实际结果中第一条结果值 ***************
for index, expected in enumerate(expected_result):
try:
check_database(actual[index], expected, mark+str(index))
except IndexError:
raise IndexError('校验数据库{}失败,期望结果超出实际条目!'.format(mark+str(index)))

View File

@ -94,11 +94,11 @@ def init_premise(test_info, case_data, case_path):
logging.debug("请求参数处理结果:{}".format(parameter))
# 获取当前接口期望结果:获取期望结果-获取关联值-替换关联值
expected_rs = read_json(case_data['summary'], case_data['check']['expected_result'], case_path)
expected_rs = read_json(case_data['summary'], case_data['check_body']['expected_result'], case_path)
parameter['data'] = data
__relevance = readRelevance.get_relevance(parameter, expected_rs, __relevance)
expected_rs = replaceRelevance.replace(expected_rs, __relevance)
case_data['check']['expected_result'] = expected_rs
case_data['check_body']['expected_result'] = expected_rs
logging.debug("期望返回处理结果:{}".format(case_data))
break
else:
@ -115,10 +115,10 @@ def init_premise(test_info, case_data, case_path):
logging.debug("请求参数处理结果:{}".format(parameter))
# 获取当前接口期望结果:获取期望结果-获取关联值-替换关联值
expected_rs = read_json(case_data['summary'], case_data['check']['expected_result'], case_path)
expected_rs = read_json(case_data['summary'], case_data['check_body']['expected_result'], case_path)
__relevance = readRelevance.get_relevance(parameter, expected_rs, __relevance)
expected_rs = replaceRelevance.replace(expected_rs, __relevance)
case_data['check']['expected_result'] = expected_rs
case_data['check_body']['expected_result'] = expected_rs
logging.debug("期望返回处理结果:{}".format(case_data))
return test_info, case_data

126
comm/unit/queryDatabase.py Normal file
View File

@ -0,0 +1,126 @@
# -*- coding:utf-8 -*-
# @Time : 2021/03/09
# @Author : Leo Zhang
# @File : queryDatabase.py
# **************************
from comm.utils.readYaml import read_yaml_data
from config import DB_CONFIG, PROJECT_NAME
from comm.db import *
import logging
import time
import re
dbcfg = read_yaml_data(DB_CONFIG)[PROJECT_NAME]
def query_mysql(sql):
"""查询MySQL数据
:param sql: sql查询语句
:return:
"""
# 获取配置信息
timeout = dbcfg['timeout']
address = dbcfg['mysql_info']['address']
user = dbcfg['mysql_info']['user']
auth = dbcfg['mysql_info']['auth']
db = dbcfg['mysql_info']['db']
# 初始化MySQL
host, port = address.split(':')
mysql = MysqlServer(host, int(port), db, user, auth)
logging.info('执行查询>>> {}'.format(sql))
# 循环查询
for i in range(int(timeout)):
try:
result = mysql.query(sql, is_dict=True)
mysql.close()
if result:
return result
else:
time.sleep(1)
except Exception as e:
raise Exception('查询异常>>> {}'.format(e))
else:
return []
def query_hbase(sql):
"""查询HBase数据
:param sql: sql查询语句
:return:
"""
# 获取配置信息
timeout = dbcfg['timeout']
address = dbcfg['hbase_info']['address']
db = dbcfg['hbase_info']['db']
# 检索SQL语句
exp = r"^select .*? from (.*?) where .*?$"
table = re.findall(exp, sql.strip())[0]
# 添加数据库
if '.' not in table:
sql = sql.strip().replace(table, db+'.'+table)
# 初始化HBase
hbase = PhoenixServer(address)
logging.info('执行查询>>> {}'.format(sql))
# 循环查询
for i in range(int(timeout)):
try:
result = hbase.query(sql, is_dict=True)
if result:
return result
else:
time.sleep(1)
except Exception as e:
raise Exception('查询异常>>> {}'.format(e))
else:
return []
def query_es(sql):
"""查询ES数据
:param sql: sql查询语句
:return:
"""
# 获取配置信息
timeout = dbcfg['timeout']
address = dbcfg['es_info']['address']
db = dbcfg['es_info']['db']
logging.info('执行查询>>> {}'.format(sql))
# 循环查询
for i in range(int(timeout)):
try:
result = elastic_search(address, db, sql)
if result:
return result
else:
time.sleep(1)
except Exception as e:
raise Exception('查询异常>>> {}'.format(e))
else:
return []
def query_solr(sql):
"""查询solr数据
:param sql: sql查询语句
:return:
"""
# 获取配置信息
timeout = dbcfg['timeout']
address = dbcfg['solr_info']['address']
logging.info('执行查询>>> {}'.format(sql))
# 循环查询
for i in range(int(timeout)):
try:
result = search_solr(address, sql)
if result:
return result
else:
time.sleep(1)
except Exception as e:
raise Exception('查询异常>>> {}'.format(e))
else:
return []

View File

@ -111,12 +111,12 @@ def generate_date(expr=''):
def generate_datetime(expr=''):
"""生成日期时间对象(含时分秒毫秒)
"""生成日期时间对象(含时分秒)
:param expr: 日期表达式"d-1"代表日期减1
:return:
"""
now = datetime.datetime.now()
now = datetime.datetime.now().replace(microsecond=0)
if expr:
try:
mark = expr[:1]