fix: last event process

This commit is contained in:
danfengcao 2016-12-05 16:27:27 +08:00
parent 6fdadafe58
commit 0e106ff966
1 changed files with 36 additions and 20 deletions

View File

@ -7,8 +7,9 @@ from pymysqlreplication import BinLogStreamReader
from pymysqlreplication.row_event import (
WriteRowsEvent,
UpdateRowsEvent,
DeleteRowsEvent
DeleteRowsEvent,
)
from pymysqlreplication.event import QueryEvent
def command_line_parser():
"""Returns a command line parser used for binlog2sql"""
@ -57,10 +58,10 @@ def fix_object(value):
else:
return value
def concat_sql_from_binlogevent(cursor, binlogevent, row , flashback=False, popPk=False):
def concat_sql_from_binlogevent(cursor, binlogevent, row=None, flashback=False, popPk=False):
if flashback and popPk:
raise ValueError('only one of flashback or popPk can be True')
if type(binlogevent) not in (WriteRowsEvent, UpdateRowsEvent, DeleteRowsEvent):
if type(binlogevent) not in (WriteRowsEvent, UpdateRowsEvent, DeleteRowsEvent, QueryEvent):
raise ValueError('binlogevent must be WriteRowsEvent, UpdateRowsEvent or DeleteRowsEvent')
sql = ''
@ -70,20 +71,22 @@ def concat_sql_from_binlogevent(cursor, binlogevent, row , flashback=False, popP
binlogevent.table,
' AND '.join(map(compare_items, row['values'].items()))
)
sql = cursor.mogrify(template, row['values'].values())
# print template
# print row['values'].values()
sql = cursor.mogrify(template, map(fix_object, row['values'].values()))
elif isinstance(binlogevent, DeleteRowsEvent):
template = 'INSERT INTO {0}({1}) VALUES ({2});'.format(
binlogevent.table,
', '.join(map(lambda k: '`%s`'%k, row['values'].keys())),
', '.join(['%s'] * len(row['values']))
)
sql = cursor.mogrify(template, row['values'].values())
sql = cursor.mogrify(template, map(fix_object, row['values'].values()))
elif isinstance(binlogevent, UpdateRowsEvent):
template = 'UPDATE {0} SET {1} WHERE {2} LIMIT 1;'.format(
binlogevent.table,
', '.join(['`%s`=%%s'%k for k in row['before_values'].keys()]),
' AND '.join(map(compare_items, row['after_values'].items())))
sql = cursor.mogrify(template, row['before_values'].values()+row['after_values'].values())
sql = cursor.mogrify(template, map(fix_object, row['before_values'].values()+row['after_values'].values()))
else:
if isinstance(binlogevent, WriteRowsEvent):
if popPk:
@ -95,20 +98,24 @@ def concat_sql_from_binlogevent(cursor, binlogevent, row , flashback=False, popP
', '.join(map(lambda k: '`%s`'%k, row['values'].keys())),
', '.join(['%s'] * len(row['values']))
)
sql = cursor.mogrify(template, row['values'].values())
sql = cursor.mogrify(template, map(fix_object, row['values'].values()))
elif isinstance(binlogevent, DeleteRowsEvent):
template ='DELETE FROM {0} WHERE {1} LIMIT 1;'.format(
binlogevent.table,
' AND '.join(map(compare_items, row['values'].items()))
)
sql = cursor.mogrify(template, row['values'].values())
sql = cursor.mogrify(template, map(fix_object, row['values'].values()))
elif isinstance(binlogevent, UpdateRowsEvent):
template = 'UPDATE {0} SET {1} WHERE {2} LIMIT 1;'.format(
binlogevent.table,
', '.join(['`%s`=%%s'%k for k in row['after_values'].keys()]),
' AND '.join(map(compare_items, row['before_values'].items()))
)
sql = cursor.mogrify(template, row['after_values'].values()+row['before_values'].values())
sql = cursor.mogrify(template, map(fix_object, row['after_values'].values()+row['before_values'].values()))
elif isinstance(binlogevent, QueryEvent):
sql ='USE {0};\n{1};'.format(
binlogevent.schema, fix_object(binlogevent.query)
)
return sql
@ -165,12 +172,20 @@ class Binlog2sql(object):
cur = self.connection.cursor()
tmpFile = 'tmp.%s.%s.tmp' % (self.connectionSettings['host'],self.connectionSettings['port']) # to simplify code, we do not use file lock for tmpFile.
ftmp = open(tmpFile ,"w")
flagExit = 0
try:
for binlogevent in stream:
if (stream.log_file == self.endFile and stream.log_pos >= self.endPos) or (stream.log_file == self.eofFile and stream.log_pos >= self.eofPos):
if (stream.log_file == self.endFile and stream.log_pos > self.endPos) or (stream.log_file == self.eofFile and stream.log_pos > self.eofPos):
break
if type(binlogevent) not in (WriteRowsEvent, UpdateRowsEvent, DeleteRowsEvent):
continue
elif (stream.log_file == self.endFile and stream.log_pos == self.endPos) or (stream.log_file == self.eofFile and stream.log_pos == self.eofPos):
flagExit = 1
if isinstance(binlogevent, QueryEvent):
if binlogevent.query != 'BEGIN':
sql = concat_sql_from_binlogevent(cursor=cur, binlogevent=binlogevent, flashback=self.flashback, popPk=self.popPk)
if sql:
print sql
elif type(binlogevent) in (WriteRowsEvent, UpdateRowsEvent, DeleteRowsEvent):
for row in binlogevent.rows:
sql = concat_sql_from_binlogevent(cursor=cur, binlogevent=binlogevent, row=row , flashback=self.flashback, popPk=self.popPk)
if self.flashback:
@ -178,14 +193,15 @@ class Binlog2sql(object):
else:
print sql
if flagExit:
break
ftmp.close()
if self.flashback:
# doesn't work if you can't fit the whole file in memory.
# need to be optimized
for line in reversed(open(tmpFile).readlines()):
print line.rstrip()
finally:
ftmp.close()
os.remove(tmpFile)
cur.close()