diff --git a/comm/db/__init__.py b/comm/db/__init__.py new file mode 100644 index 0000000..39bd233 --- /dev/null +++ b/comm/db/__init__.py @@ -0,0 +1,5 @@ +from .querySolr import search_solr +from .queryHBase import PhoenixServer +from .queryES import elastic_search +from .queryMysql import MysqlServer +from .queryRedis import exec_redis diff --git a/comm/db/queryES.py b/comm/db/queryES.py new file mode 100644 index 0000000..8671bb7 --- /dev/null +++ b/comm/db/queryES.py @@ -0,0 +1,66 @@ +# -*- coding:utf-8 -*- +# @Time : 2021/03/09 +# @Author : Leo Zhang +# @File : queryES.py +# ************************** +import elasticsearch +import logging +import re + + +def elastic_search(address, db, sql): + """获取ElasticSearch数据 + + :param address: ES地址(格式:ip:port) + :param db: 索引库名 + :param sql: sql查询语句 + :return: + """ + # 初始化ES + try: + es = elasticsearch.Elasticsearch([address]) + except Exception as e: + raise Exception('连接异常>>> \n{}'.format(e)) + + # 检索SQL语句 + exp = r"^select (.*?) from (.*?) where (.*?)$" + res = re.findall(exp, sql.strip()) + fields, table, conditions = res[0] + # 重新拼接查询条件 + query = '' + for each in conditions.split(' and '): + key, value = each.replace('\'', '').replace('\"', '').split('=') + if query: + query += '&& ' + query += '{}:"{}" '.format(key, value) + + # 执行查询 + db_table = db + '@' + table + # query_str = 'GET http://{0}/{1}/_search?q={2}'.format(address, db_table, query) + # logging.info('执行查询>>> {}'.format(query_str)) + try: + # 查询ES,并截取hits字段 + data = es.search(index=db_table, q=query) + hits = data['hits']['hits'] + if hits: + # 截取字段值_source + new_hits = list() + for hit in hits: + new_hits.append(hit['_source']) + # 判断是否返回全部字段 + if fields == '*': + return new_hits + else: + # 返回指定字段 + source_new = list() + for hit in new_hits: + result = dict() + for each in fields.split(','): + try: + result[each] = hit[each] + except KeyError: + raise KeyError('ES未知字段>>> {}'.format(each)) + source_new.append(result) + return source_new + except Exception as e: + raise Exception('查询异常>>> \n{}'.format(e)) diff --git a/comm/db/queryHBase.py b/comm/db/queryHBase.py new file mode 100644 index 0000000..fc4c69e --- /dev/null +++ b/comm/db/queryHBase.py @@ -0,0 +1,56 @@ +# -*- coding:utf-8 -*- +# @Time : 2021/03/09 +# @Author : Leo Zhang +# @File : queryHBase.py +# ************************** +# import phoenixdb +# import phoenixdb.cursor +from comm import * +import logging + + +class PhoenixServer(object): + """ + 封装HBase常用方法。 + """ + def __init__(self, address): + url = 'http://{}/'.format(address) + logging.debug('Connect HBase: {}'.format(url)) + try: + self.conn = phoenixdb.connect(url, autocommit=True) + except Exception as e: + raise Exception('连接异常>>> {}'.format(e)) + + # 增加、修改、删除命令语句 + def execute(self, sql): + try: + # 创建游标 + cursor = self.conn.cursor(cursor_factory=phoenixdb.cursor.Cursor) + # 执行sql语句 + cursor.execute(sql) + # 关闭游标 + cursor.close() + except Exception as e: + raise Exception('执行异常>>> {}'.format(e)) + + # 查询所有数据,多个值 + def query(self, sql, is_dict): + try: + # 判断是否需要返回结果为字典类型 + if is_dict: + cursor = self.conn.cursor(cursor_factory=phoenixdb.cursor.DictCursor) + else: + cursor = self.conn.cursor(cursor_factory=phoenixdb.cursor.Cursor) + # 执行sql语句 + cursor.execute(sql) + # 查询结果 + data = cursor.fetchall() + # 关闭游标 + cursor.close() + return data + except Exception as e: + raise Exception('查询异常>>> {}'.format(e)) + + # 关闭数据库连接 + def close(self): + self.conn.close() diff --git a/comm/db/queryMysql.py b/comm/db/queryMysql.py new file mode 100644 index 0000000..7252eef --- /dev/null +++ b/comm/db/queryMysql.py @@ -0,0 +1,61 @@ +# -*- coding:utf-8 -*- +# @Time : 2021/03/09 +# @Author : Leo Zhang +# @File : queryMysql.py +# ************************** +import pymysql +import logging + + +class MysqlServer: + """ + 封装MySQL常用方法。 + """ + def __init__(self, host, port, db, user, passwd, charset='utf8'): + # 初始化数据库 + logging.debug('Connect MySQL: host={host}, port={port}, db={db}, user={user}, passwd={passwd}' + .format(host=host, port=port, db=db, user=user, passwd=passwd)) + try: + self.conn = pymysql.connect(host=host, port=port, db=db, user=user, passwd=passwd, charset=charset) + except Exception as e: + raise Exception('连接异常>>> {}'.format(e)) + + # 增加、修改、删除命令语句 + def execute(self, sql): + try: + # 创建游标 + cur = self.conn.cursor(cursor=pymysql.cursors.Cursor) + # 执行sql语句 + cur.execute(sql) + # 提交事务 + self.conn.commit() + # 关闭游标 + cur.close() + except Exception as e: + # 出错时回滚 + self.conn.rollback() + raise Exception('执行异常>>> {}'.format(e)) + + # 查询所有数据,多个值 + def query(self, sql, is_dict): + try: + # 检查当前连接是否已关闭并进行重接 + self.conn.ping(reconnect=True) + # 判断是否需要返回结果为字典类型 + if is_dict: + cur = self.conn.cursor(cursor=pymysql.cursors.DictCursor) + else: + cur = self.conn.cursor(cursor=pymysql.cursors.Cursor) + # 执行sql语句 + cur.execute(sql) + # 查询结果 + data = cur.fetchall() + # 关闭游标 + cur.close() + return data + except Exception as e: + raise Exception('查询异常>>> {}'.format(e)) + + # 关闭数据库连接 + def close(self): + self.conn.close() diff --git a/comm/db/queryRedis.py b/comm/db/queryRedis.py new file mode 100644 index 0000000..75da966 --- /dev/null +++ b/comm/db/queryRedis.py @@ -0,0 +1,48 @@ +# -*- coding:utf-8 -*- +# @Time : 2021/03/09 +# @Author : Leo Zhang +# @File : queryRedis.py +# ************************** +import logging +import redis + + +def exec_redis(address, auth, db_num, db_type, key): + """获取redis数据 + + :param address: 内存库地址(格式:ip:port) + :param auth: 内存库密码 + :param db_num: 查询内存库号 + :param db_type: 数据库类型 + :param key: 查询key值 + :return: + """ + # 初始化redis连接 + host, port = address.split(':') + logging.debug('Redis Info: host={}, port={}, auth={}, db={}, type={}' + .format(host, port, auth, db_num, db_type)) + conn = redis.StrictRedis(host=host, port=int(port), db=db_num, + password=auth, decode_responses=True) + logging.info('执行查询>>> db={}, key={}'.format(db_num, key)) + + # 获取string类型指定key的对应值 + if 'string' in db_type: + result = conn.get(key) + # 获取hash类型指定key的所有数据 + elif 'hash' in db_type: + result = conn.hgetall(key) + # 获取zset类型指定key的前50行数据 + elif 'zset' in db_type: + result = conn.zrange(key, 0, 50, desc=False, + withscores=True, score_cast_func=int) + # 获取set类型指定key的所有数据 + elif 'set' in db_type: + result = conn.smembers(key) + # 获取list类型指定key的前10个数据 + elif 'list' in db_type: + result = conn.lrange(key, 0, 10) + else: + raise Exception('未知的数据类型{},请检查dbinfo配置'.format(db_type)) + + logging.info('查询结果>>> {}'.format(result)) + return result diff --git a/comm/db/querySolr.py b/comm/db/querySolr.py new file mode 100644 index 0000000..297686f --- /dev/null +++ b/comm/db/querySolr.py @@ -0,0 +1,63 @@ +# -*- coding:utf-8 -*- +# @Time : 2021/03/12 +# @Author : Leo Zhang +# @File : querySolr.py +# ************************* +import pysolr +import logging +import re + + +def search_solr(address, sql): + """获取Solr数据 + + :param address: 地址(格式:ip:port) + :param sql: sql查询语句 + :return: + """ + # 检索SQL语句 + exp = r"^select (.*?) from (.*?) where (.*?)$" + res = re.findall(exp, sql.strip()) + fields, db, conditions = res[0] + + # 重新拼接查询条件 + query = '' + for each in conditions.split(' and '): + key, value = each.replace('\'', '').replace('\"', '').split('=') + if query: + query += '&& ' + query += '{}:"{}" '.format(key, value) + + # 初始化Solr + try: + base_url = 'http://{0}/solr/{1}/'.format(address, db) + solr = pysolr.Solr(base_url) + except Exception as e: + raise Exception('连接异常>>> \n{}'.format(e)) + + # 执行查询 + query_str = 'GET {0}select?q={1}'.format(base_url, query) + logging.info('执行查询>>> {}'.format(query_str)) + try: + data = solr.search(query) + result = list() + for each in data: + result.append(each) + if result: + # 判断是否返回全部字段 + if fields == '*': + return result + else: + # 返回指定字段 + result_new = list() + for res in result: + line = dict() + for each in fields.split(','): + try: + line[each] = res[each] + except KeyError: + raise KeyError('Solr未知字段>>> {}'.format(each)) + result_new.append(result) + return result_new + except Exception as e: + raise Exception('查询异常>>> \n{}'.format(e))