diff --git a/binlog2sql/binlog2sql.py b/binlog2sql/binlog2sql.py
index 1bccc09..f38041d 100755
--- a/binlog2sql/binlog2sql.py
+++ b/binlog2sql/binlog2sql.py
@@ -7,8 +7,9 @@ from pymysqlreplication import BinLogStreamReader
 from pymysqlreplication.row_event import (
     WriteRowsEvent,
     UpdateRowsEvent,
-    DeleteRowsEvent
+    DeleteRowsEvent,
 )
+from pymysqlreplication.event import QueryEvent
 
 def command_line_parser():
     """Returns a command line parser used for binlog2sql"""
@@ -57,10 +58,10 @@ def fix_object(value):
     else:
         return value
 
-def concat_sql_from_binlogevent(cursor, binlogevent, row , flashback=False, popPk=False):
+def concat_sql_from_binlogevent(cursor, binlogevent, row=None, flashback=False, popPk=False):
     if flashback and popPk:
         raise ValueError('only one of flashback or popPk can be True')
-    if type(binlogevent) not in (WriteRowsEvent, UpdateRowsEvent, DeleteRowsEvent):
+    if type(binlogevent) not in (WriteRowsEvent, UpdateRowsEvent, DeleteRowsEvent, QueryEvent):
         raise ValueError('binlogevent must be WriteRowsEvent, UpdateRowsEvent or DeleteRowsEvent')
 
     sql = ''
@@ -70,20 +71,22 @@ def concat_sql_from_binlogevent(cursor, binlogevent, row , flashback=False, popP
                 binlogevent.table,
                 ' AND '.join(map(compare_items, row['values'].items()))
             )
-            sql = cursor.mogrify(template, row['values'].values())
+#            print template
+#            print row['values'].values()
+            sql = cursor.mogrify(template, map(fix_object, row['values'].values()))
         elif isinstance(binlogevent, DeleteRowsEvent):
             template = 'INSERT INTO {0}({1}) VALUES ({2});'.format(
                 binlogevent.table,
                 ', '.join(map(lambda k: '`%s`'%k, row['values'].keys())),
                 ', '.join(['%s'] * len(row['values']))
             )
-            sql = cursor.mogrify(template, row['values'].values())
+            sql = cursor.mogrify(template, map(fix_object, row['values'].values()))
         elif isinstance(binlogevent, UpdateRowsEvent):
             template = 'UPDATE {0} SET {1} WHERE {2} LIMIT 1;'.format(
                 binlogevent.table,
                 ', '.join(['`%s`=%%s'%k for k in row['before_values'].keys()]),
                 ' AND '.join(map(compare_items, row['after_values'].items())))
-            sql = cursor.mogrify(template, row['before_values'].values()+row['after_values'].values())
+            sql = cursor.mogrify(template, map(fix_object, row['before_values'].values()+row['after_values'].values()))
     else:
         if isinstance(binlogevent, WriteRowsEvent):
             if popPk:
@@ -95,20 +98,24 @@ def concat_sql_from_binlogevent(cursor, binlogevent, row , flashback=False, popP
                 ', '.join(map(lambda k: '`%s`'%k, row['values'].keys())),
                 ', '.join(['%s'] * len(row['values']))
             )
-            sql = cursor.mogrify(template, row['values'].values())
+            sql = cursor.mogrify(template, map(fix_object, row['values'].values()))
         elif isinstance(binlogevent, DeleteRowsEvent):
             template ='DELETE FROM {0} WHERE {1} LIMIT 1;'.format(
                 binlogevent.table,
                 ' AND '.join(map(compare_items, row['values'].items()))
             )
-            sql = cursor.mogrify(template, row['values'].values())
+            sql = cursor.mogrify(template, map(fix_object, row['values'].values()))
         elif isinstance(binlogevent, UpdateRowsEvent):
             template = 'UPDATE {0} SET {1} WHERE {2} LIMIT 1;'.format(
                 binlogevent.table,
                 ', '.join(['`%s`=%%s'%k for k in row['after_values'].keys()]),
                 ' AND '.join(map(compare_items, row['before_values'].items()))
             )
-            sql = cursor.mogrify(template, row['after_values'].values()+row['before_values'].values())
+            sql = cursor.mogrify(template, map(fix_object, row['after_values'].values()+row['before_values'].values()))
+        elif isinstance(binlogevent, QueryEvent):
+            sql ='USE {0};\n{1};'.format(
+                binlogevent.schema, fix_object(binlogevent.query)
+            )
 
     return sql
 
@@ -165,27 +172,36 @@ class Binlog2sql(object):
         cur = self.connection.cursor()
         tmpFile = 'tmp.%s.%s.tmp' % (self.connectionSettings['host'],self.connectionSettings['port']) # to simplify code, we do not use file lock for tmpFile.
         ftmp = open(tmpFile ,"w")
+        flagExit = 0
         try:
             for binlogevent in stream:
-                if (stream.log_file == self.endFile and stream.log_pos >= self.endPos) or (stream.log_file == self.eofFile and stream.log_pos >= self.eofPos):
+                if (stream.log_file == self.endFile and stream.log_pos > self.endPos) or (stream.log_file == self.eofFile and stream.log_pos > self.eofPos):
                     break
-                if type(binlogevent) not in (WriteRowsEvent, UpdateRowsEvent, DeleteRowsEvent):
-                    continue
-
-                for row in binlogevent.rows:
-                    sql = concat_sql_from_binlogevent(cursor=cur, binlogevent=binlogevent, row=row , flashback=self.flashback, popPk=self.popPk)
-                    if self.flashback:
-                        ftmp.write(sql + '\n')
-                    else:
-                        print sql
+                elif (stream.log_file == self.endFile and stream.log_pos == self.endPos) or (stream.log_file == self.eofFile and stream.log_pos == self.eofPos):
+                    flagExit = 1
+                if isinstance(binlogevent, QueryEvent):
+                    if binlogevent.query != 'BEGIN':
+                        sql = concat_sql_from_binlogevent(cursor=cur, binlogevent=binlogevent, flashback=self.flashback, popPk=self.popPk)
+                        if sql:
+                            print sql
+                elif type(binlogevent) in (WriteRowsEvent, UpdateRowsEvent, DeleteRowsEvent):
+                    for row in binlogevent.rows:
+                        sql = concat_sql_from_binlogevent(cursor=cur, binlogevent=binlogevent, row=row , flashback=self.flashback, popPk=self.popPk)
+                        if self.flashback:
+                            ftmp.write(sql + '\n')
+                        else:
+                            print sql
+
+                if flagExit:
+                    break
+
+            ftmp.close()
 
             if self.flashback:
                 # doesn't work if you can't fit the whole file in memory.
                 # need to be optimized
                 for line in reversed(open(tmpFile).readlines()):
                     print line.rstrip()
-        finally:
-            ftmp.close()
             os.remove(tmpFile)
 
         cur.close()
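
A minimal sketch (not part of the patch itself) of the string the new QueryEvent branch builds; the schema and query values below are invented for illustration, and the fix_object call used in the patch is omitted to keep the snippet self-contained:

    # hypothetical QueryEvent attributes
    schema = 'test'
    query = 'ALTER TABLE t1 ADD COLUMN c1 INT'
    sql = 'USE {0};\n{1};'.format(schema, query)
    print sql
    # USE test;
    # ALTER TABLE t1 ADD COLUMN c1 INT;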