From 9a1464228111ff7378a838bc29af2543bbb321ca Mon Sep 17 00:00:00 2001 From: dfcao Date: Fri, 15 Dec 2017 10:31:16 +0800 Subject: [PATCH] support python3 --- .gitignore | 3 +++ README.md | 2 +- binlog2sql/binlog2sql.py | 46 +++++++++++----------------------- binlog2sql/binlog2sql_util.py | 47 ++++++++++++++++++++++++++++++----- 4 files changed, 60 insertions(+), 38 deletions(-) diff --git a/.gitignore b/.gitignore index 72364f9..4c9b811 100644 --- a/.gitignore +++ b/.gitignore @@ -1,3 +1,6 @@ +*~ +.idea/ + # Byte-compiled / optimized / DLL files __pycache__/ *.py[cod] diff --git a/README.md b/README.md index b63ead1..b618c4d 100644 --- a/README.md +++ b/README.md @@ -16,7 +16,7 @@ binlog2sql 正常维护。应用于大众点评线上环境。线上环境的操作,请在对MySQL**相当熟悉**的同学指导下进行 * 已测试环境 - * Python 2.6, 2.7 + * Python 2.6, 2.7, 3.4 * MySQL 5.6 diff --git a/binlog2sql/binlog2sql.py b/binlog2sql/binlog2sql.py index 89e641f..e486b02 100755 --- a/binlog2sql/binlog2sql.py +++ b/binlog2sql/binlog2sql.py @@ -1,7 +1,6 @@ #!/usr/bin/env python # -*- coding: utf-8 -*- -import os import sys import datetime import pymysql @@ -12,7 +11,8 @@ from pymysqlreplication.row_event import ( DeleteRowsEvent, ) from pymysqlreplication.event import QueryEvent, RotateEvent, FormatDescriptionEvent -from binlog2sql_util import command_line_args, concat_sql_from_binlog_event, create_unique_file, reversed_lines +from binlog2sql_util import command_line_args, concat_sql_from_binlog_event, create_unique_file, \ + temp_open, print_rollback_sql class Binlog2sql(object): @@ -35,7 +35,7 @@ class Binlog2sql(object): if start_time: self.start_time = datetime.datetime.strptime(start_time, "%Y-%m-%d %H:%M:%S") else: - self.start_time = datetime.datetime.strptime('1970-01-01 00:00:00', "%Y-%m-%d %H:%M:%S") + self.start_time = datetime.datetime.strptime('1980-01-01 00:00:00', "%Y-%m-%d %H:%M:%S") if stop_time: self.stop_time = datetime.datetime.strptime(stop_time, "%Y-%m-%d %H:%M:%S") else: @@ -69,19 +69,21 @@ class Binlog2sql(object): log_file=self.start_file, log_pos=self.start_pos, only_schemas=self.only_schemas, only_tables=self.only_tables, resume_stream=True) - cursor = self.connection.cursor() - # to simplify code, we do not use flock for tmp_file. - tmp_file = create_unique_file('%s.%s' % (self.conn_setting['host'], self.conn_setting['port'])) - f_tmp = open(tmp_file, "w") flag_last_event = False e_start_pos, last_pos = stream.log_pos, stream.log_pos - try: + # to simplify code, we do not use flock for tmp_file. + tmp_file = create_unique_file('%s.%s' % (self.conn_setting['host'], self.conn_setting['port'])) + with temp_open(tmp_file, "w") as f_tmp, self.connection as cursor: for binlog_event in stream: if not self.stop_never: + try: + event_time = datetime.datetime.fromtimestamp(binlog_event.timestamp) + except OSError: + event_time = datetime.datetime(1980, 1, 1, 0, 0) if (stream.log_file == self.end_file and stream.log_pos == self.end_pos) or \ (stream.log_file == self.eof_file and stream.log_pos == self.eof_pos): flag_last_event = True - elif datetime.datetime.fromtimestamp(binlog_event.timestamp) < self.start_time: + elif event_time < self.start_time: if not (isinstance(binlog_event, RotateEvent) or isinstance(binlog_event, FormatDescriptionEvent)): last_pos = binlog_event.packet.log_pos @@ -89,7 +91,7 @@ class Binlog2sql(object): elif (stream.log_file not in self.binlogList) or \ (self.end_pos and stream.log_file == self.end_file and stream.log_pos > self.end_pos) or \ (stream.log_file == self.eof_file and stream.log_pos > self.eof_pos) or \ - (datetime.datetime.fromtimestamp(binlog_event.timestamp) >= self.stop_time): + (event_time >= self.stop_time): break # else: # raise ValueError('unknown binlog file or position') @@ -116,36 +118,18 @@ class Binlog2sql(object): last_pos = binlog_event.packet.log_pos if flag_last_event: break + + stream.close() f_tmp.close() - if self.flashback: - self.print_rollback_sql(filename=tmp_file) - finally: - os.remove(tmp_file) - cursor.close() - stream.close() + print_rollback_sql(filename=tmp_file) return True - @staticmethod - def print_rollback_sql(filename): - """print rollback sql from tmp_file""" - with open(filename) as f_tmp: - sleep_interval = 1000 - i = 0 - for line in reversed_lines(f_tmp): - print(line.rstrip()) - if i >= sleep_interval: - print('SELECT SLEEP(1);') - i = 0 - else: - i += 1 - def __del__(self): pass if __name__ == '__main__': - args = command_line_args(sys.argv[1:]) conn_setting = {'host': args.host, 'port': args.port, 'user': args.user, 'passwd': args.password, 'charset': 'utf8'} binlog2sql = Binlog2sql(connection_settings=conn_setting, start_file=args.start_file, start_pos=args.start_pos, diff --git a/binlog2sql/binlog2sql_util.py b/binlog2sql/binlog2sql_util.py index b10a7e1..d8c8330 100755 --- a/binlog2sql/binlog2sql_util.py +++ b/binlog2sql/binlog2sql_util.py @@ -5,6 +5,7 @@ import os import sys import argparse import datetime +from contextlib import contextmanager from pymysqlreplication.row_event import ( WriteRowsEvent, UpdateRowsEvent, @@ -12,6 +13,11 @@ from pymysqlreplication.row_event import ( ) from pymysqlreplication.event import QueryEvent +if sys.version > '3': + PY3PLUS = True +else: + PY3PLUS = False + def is_valid_datetime(string): try: @@ -33,6 +39,16 @@ def create_unique_file(filename): return result_file +@contextmanager +def temp_open(filename, mode): + f = open(filename, mode) + try: + yield f + finally: + f.close() + os.remove(filename) + + def parse_args(): """parse args for binlog2sql""" @@ -105,8 +121,9 @@ def command_line_args(args): return args -def compare_items((k, v)): - #caution: if v is NULL, may need to process +def compare_items(items): + # caution: if v is NULL, may need to process + (k, v) = items if v is None: return '`%s` IS %%s' % k else: @@ -115,7 +132,9 @@ def compare_items((k, v)): def fix_object(value): """Fixes python objects so that they can be properly inserted into SQL queries""" - if isinstance(value, unicode): + if PY3PLUS and isinstance(value, bytes): + return value.decode('utf-8') + elif not PY3PLUS and isinstance(value, unicode): return value.encode('utf-8') else: return value @@ -166,7 +185,7 @@ def generate_sql_pattern(binlog_event, row=None, flashback=False, no_pk=False): binlog_event.schema, binlog_event.table, ', '.join(['`%s`=%%s' % x for x in row['before_values'].keys()]), ' AND '.join(map(compare_items, row['after_values'].items()))) - values = map(fix_object, row['before_values'].values()+row['after_values'].values()) + values = map(fix_object, list(row['before_values'].values())+list(row['after_values'].values())) else: if isinstance(binlog_event, WriteRowsEvent): if no_pk: @@ -193,15 +212,31 @@ def generate_sql_pattern(binlog_event, row=None, flashback=False, no_pk=False): ', '.join(['`%s`=%%s' % k for k in row['after_values'].keys()]), ' AND '.join(map(compare_items, row['before_values'].items())) ) - values = map(fix_object, row['after_values'].values()+row['before_values'].values()) + values = map(fix_object, list(row['after_values'].values())+list(row['before_values'].values())) - return {'template': template, 'values': values} + return {'template': template, 'values': list(values)} + + +def print_rollback_sql(filename): + """print rollback sql from tmp_file""" + with open(filename, "rb") as f_tmp: + sleep_interval = 1000 + i = 0 + for line in reversed_lines(f_tmp): + print(line.rstrip()) + if i >= sleep_interval: + print('SELECT SLEEP(1);') + i = 0 + else: + i += 1 def reversed_lines(fin): """Generate the lines of file in reverse order.""" part = '' for block in reversed_blocks(fin): + if PY3PLUS: + block = block.decode("utf-8") for c in reversed(block): if c == '\n' and part: yield part[::-1]