增加数据库查询方法

This commit is contained in:
leozhanggg 2021-03-12 17:24:16 +08:00
parent 18ec95acde
commit d076f9c9ab
6 changed files with 299 additions and 0 deletions

5
comm/db/__init__.py Normal file
View File

@ -0,0 +1,5 @@
from .querySolr import search_solr
from .queryHBase import PhoenixServer
from .queryES import elastic_search
from .queryMysql import MysqlServer
from .queryRedis import exec_redis

66
comm/db/queryES.py Normal file
View File

@ -0,0 +1,66 @@
# -*- coding:utf-8 -*-
# @Time : 2021/03/09
# @Author : Leo Zhang
# @File : queryES.py
# **************************
import elasticsearch
import logging
import re
def elastic_search(address, db, sql):
"""获取ElasticSearch数据
:param address: ES地址(格式ip:port)
:param db: 索引库名
:param sql: sql查询语句
:return:
"""
# 初始化ES
try:
es = elasticsearch.Elasticsearch([address])
except Exception as e:
raise Exception('连接异常>>> \n{}'.format(e))
# 检索SQL语句
exp = r"^select (.*?) from (.*?) where (.*?)$"
res = re.findall(exp, sql.strip())
fields, table, conditions = res[0]
# 重新拼接查询条件
query = ''
for each in conditions.split(' and '):
key, value = each.replace('\'', '').replace('\"', '').split('=')
if query:
query += '&& '
query += '{}:"{}" '.format(key, value)
# 执行查询
db_table = db + '@' + table
# query_str = 'GET http://{0}/{1}/_search?q={2}'.format(address, db_table, query)
# logging.info('执行查询>>> {}'.format(query_str))
try:
# 查询ES并截取hits字段
data = es.search(index=db_table, q=query)
hits = data['hits']['hits']
if hits:
# 截取字段值_source
new_hits = list()
for hit in hits:
new_hits.append(hit['_source'])
# 判断是否返回全部字段
if fields == '*':
return new_hits
else:
# 返回指定字段
source_new = list()
for hit in new_hits:
result = dict()
for each in fields.split(','):
try:
result[each] = hit[each]
except KeyError:
raise KeyError('ES未知字段>>> {}'.format(each))
source_new.append(result)
return source_new
except Exception as e:
raise Exception('查询异常>>> \n{}'.format(e))

56
comm/db/queryHBase.py Normal file
View File

@ -0,0 +1,56 @@
# -*- coding:utf-8 -*-
# @Time : 2021/03/09
# @Author : Leo Zhang
# @File : queryHBase.py
# **************************
# import phoenixdb
# import phoenixdb.cursor
from comm import *
import logging
class PhoenixServer(object):
"""
封装HBase常用方法
"""
def __init__(self, address):
url = 'http://{}/'.format(address)
logging.debug('Connect HBase: {}'.format(url))
try:
self.conn = phoenixdb.connect(url, autocommit=True)
except Exception as e:
raise Exception('连接异常>>> {}'.format(e))
# 增加、修改、删除命令语句
def execute(self, sql):
try:
# 创建游标
cursor = self.conn.cursor(cursor_factory=phoenixdb.cursor.Cursor)
# 执行sql语句
cursor.execute(sql)
# 关闭游标
cursor.close()
except Exception as e:
raise Exception('执行异常>>> {}'.format(e))
# 查询所有数据,多个值
def query(self, sql, is_dict):
try:
# 判断是否需要返回结果为字典类型
if is_dict:
cursor = self.conn.cursor(cursor_factory=phoenixdb.cursor.DictCursor)
else:
cursor = self.conn.cursor(cursor_factory=phoenixdb.cursor.Cursor)
# 执行sql语句
cursor.execute(sql)
# 查询结果
data = cursor.fetchall()
# 关闭游标
cursor.close()
return data
except Exception as e:
raise Exception('查询异常>>> {}'.format(e))
# 关闭数据库连接
def close(self):
self.conn.close()

61
comm/db/queryMysql.py Normal file
View File

@ -0,0 +1,61 @@
# -*- coding:utf-8 -*-
# @Time : 2021/03/09
# @Author : Leo Zhang
# @File : queryMysql.py
# **************************
import pymysql
import logging
class MysqlServer:
"""
封装MySQL常用方法
"""
def __init__(self, host, port, db, user, passwd, charset='utf8'):
# 初始化数据库
logging.debug('Connect MySQL: host={host}, port={port}, db={db}, user={user}, passwd={passwd}'
.format(host=host, port=port, db=db, user=user, passwd=passwd))
try:
self.conn = pymysql.connect(host=host, port=port, db=db, user=user, passwd=passwd, charset=charset)
except Exception as e:
raise Exception('连接异常>>> {}'.format(e))
# 增加、修改、删除命令语句
def execute(self, sql):
try:
# 创建游标
cur = self.conn.cursor(cursor=pymysql.cursors.Cursor)
# 执行sql语句
cur.execute(sql)
# 提交事务
self.conn.commit()
# 关闭游标
cur.close()
except Exception as e:
# 出错时回滚
self.conn.rollback()
raise Exception('执行异常>>> {}'.format(e))
# 查询所有数据,多个值
def query(self, sql, is_dict):
try:
# 检查当前连接是否已关闭并进行重接
self.conn.ping(reconnect=True)
# 判断是否需要返回结果为字典类型
if is_dict:
cur = self.conn.cursor(cursor=pymysql.cursors.DictCursor)
else:
cur = self.conn.cursor(cursor=pymysql.cursors.Cursor)
# 执行sql语句
cur.execute(sql)
# 查询结果
data = cur.fetchall()
# 关闭游标
cur.close()
return data
except Exception as e:
raise Exception('查询异常>>> {}'.format(e))
# 关闭数据库连接
def close(self):
self.conn.close()

48
comm/db/queryRedis.py Normal file
View File

@ -0,0 +1,48 @@
# -*- coding:utf-8 -*-
# @Time : 2021/03/09
# @Author : Leo Zhang
# @File : queryRedis.py
# **************************
import logging
import redis
def exec_redis(address, auth, db_num, db_type, key):
"""获取redis数据
:param address: 内存库地址(格式ip:port)
:param auth: 内存库密码
:param db_num: 查询内存库号
:param db_type: 数据库类型
:param key: 查询key值
:return:
"""
# 初始化redis连接
host, port = address.split(':')
logging.debug('Redis Info: host={}, port={}, auth={}, db={}, type={}'
.format(host, port, auth, db_num, db_type))
conn = redis.StrictRedis(host=host, port=int(port), db=db_num,
password=auth, decode_responses=True)
logging.info('执行查询>>> db={}, key={}'.format(db_num, key))
# 获取string类型指定key的对应值
if 'string' in db_type:
result = conn.get(key)
# 获取hash类型指定key的所有数据
elif 'hash' in db_type:
result = conn.hgetall(key)
# 获取zset类型指定key的前50行数据
elif 'zset' in db_type:
result = conn.zrange(key, 0, 50, desc=False,
withscores=True, score_cast_func=int)
# 获取set类型指定key的所有数据
elif 'set' in db_type:
result = conn.smembers(key)
# 获取list类型指定key的前10个数据
elif 'list' in db_type:
result = conn.lrange(key, 0, 10)
else:
raise Exception('未知的数据类型{}请检查dbinfo配置'.format(db_type))
logging.info('查询结果>>> {}'.format(result))
return result

63
comm/db/querySolr.py Normal file
View File

@ -0,0 +1,63 @@
# -*- coding:utf-8 -*-
# @Time : 2021/03/12
# @Author : Leo Zhang
# @File : querySolr.py
# *************************
import pysolr
import logging
import re
def search_solr(address, sql):
"""获取Solr数据
:param address: 地址(格式ip:port)
:param sql: sql查询语句
:return:
"""
# 检索SQL语句
exp = r"^select (.*?) from (.*?) where (.*?)$"
res = re.findall(exp, sql.strip())
fields, db, conditions = res[0]
# 重新拼接查询条件
query = ''
for each in conditions.split(' and '):
key, value = each.replace('\'', '').replace('\"', '').split('=')
if query:
query += '&& '
query += '{}:"{}" '.format(key, value)
# 初始化Solr
try:
base_url = 'http://{0}/solr/{1}/'.format(address, db)
solr = pysolr.Solr(base_url)
except Exception as e:
raise Exception('连接异常>>> \n{}'.format(e))
# 执行查询
query_str = 'GET {0}select?q={1}'.format(base_url, query)
logging.info('执行查询>>> {}'.format(query_str))
try:
data = solr.search(query)
result = list()
for each in data:
result.append(each)
if result:
# 判断是否返回全部字段
if fields == '*':
return result
else:
# 返回指定字段
result_new = list()
for res in result:
line = dict()
for each in fields.split(','):
try:
line[each] = res[each]
except KeyError:
raise KeyError('Solr未知字段>>> {}'.format(each))
result_new.append(result)
return result_new
except Exception as e:
raise Exception('查询异常>>> \n{}'.format(e))