Try fix mysql protocol parse failure

This commit is contained in:
zhang2014 2020-09-15 20:28:42 +08:00
parent 6dd764bcfe
commit 106e05ab2f
6 changed files with 160 additions and 42 deletions

View File

@ -2,6 +2,7 @@
#include <DataTypes/DataTypeString.h>
#include <IO/ReadBufferFromString.h>
#include <IO/MySQLBinlogEventReadBuffer.h>
#include <IO/ReadHelpers.h>
#include <common/DateLUT.h>
#include <Common/FieldVisitors.h>
@ -100,9 +101,7 @@ namespace MySQLReplication
payload.readStrict(reinterpret_cast<char *>(schema.data()), schema_len);
payload.ignore(1);
size_t len = payload.available() - CHECKSUM_CRC32_SIGNATURE_LENGTH;
query.resize(len);
payload.readStrict(reinterpret_cast<char *>(query.data()), len);
readStringUntilEOF(query, payload);
if (query.starts_with("BEGIN") || query.starts_with("COMMIT"))
{
typ = QUERY_EVENT_MULTI_TXN_FLAG;
@ -285,7 +284,7 @@ namespace MySQLReplication
break;
}
while (payload.available() > CHECKSUM_CRC32_SIGNATURE_LENGTH)
while (!payload.eof())
{
parseRow(payload, columns_present_bitmap1);
if (header.type == UPDATE_ROWS_EVENT_V1 || header.type == UPDATE_ROWS_EVENT_V2)
@ -738,7 +737,7 @@ namespace MySQLReplication
payload.readStrict(reinterpret_cast<char *>(&gtid.seq_no), 8);
/// Skip others.
payload.ignore(payload.available() - CHECKSUM_CRC32_SIGNATURE_LENGTH);
payload.ignoreAll();
}
void GTIDEvent::dump(std::ostream & out) const
@ -804,46 +803,51 @@ namespace MySQLReplication
void MySQLFlavor::readPayloadImpl(ReadBuffer & payload)
{
UInt16 header = static_cast<unsigned char>(*payload.position());
MySQLBinlogEventReadBuffer event_payload(payload);
UInt16 header = static_cast<unsigned char>(*event_payload.position());
switch (header)
{
case PACKET_EOF:
throw ReplicationError("Master maybe lost", ErrorCodes::UNKNOWN_EXCEPTION);
case PACKET_ERR:
ERRPacket err;
err.readPayloadWithUnpacked(payload);
err.readPayloadWithUnpacked(event_payload);
throw ReplicationError(err.error_message, ErrorCodes::UNKNOWN_EXCEPTION);
}
// skip the header flag.
payload.ignore(1);
event_payload.ignore(1);
EventType event_type = static_cast<EventType>(*(payload.position() + 4));
EventType event_type = static_cast<EventType>(*(event_payload.position() + 4));
switch (event_type)
{
case FORMAT_DESCRIPTION_EVENT: {
case FORMAT_DESCRIPTION_EVENT:
{
event = std::make_shared<FormatDescriptionEvent>();
event->parseHeader(payload);
event->parseEvent(payload);
event->parseHeader(event_payload);
event->parseEvent(event_payload);
position.update(event);
break;
}
case ROTATE_EVENT: {
case ROTATE_EVENT:
{
event = std::make_shared<RotateEvent>();
event->parseHeader(payload);
event->parseEvent(payload);
event->parseHeader(event_payload);
event->parseEvent(event_payload);
position.update(event);
break;
}
case QUERY_EVENT: {
case QUERY_EVENT:
{
event = std::make_shared<QueryEvent>();
event->parseHeader(payload);
event->parseEvent(payload);
event->parseHeader(event_payload);
event->parseEvent(event_payload);
auto query = std::static_pointer_cast<QueryEvent>(event);
switch (query->typ)
{
case QUERY_EVENT_MULTI_TXN_FLAG:
case QUERY_EVENT_XA: {
case QUERY_EVENT_XA:
{
event = std::make_shared<DryRunEvent>();
break;
}
@ -852,68 +856,74 @@ namespace MySQLReplication
}
break;
}
case XID_EVENT: {
case XID_EVENT:
{
event = std::make_shared<XIDEvent>();
event->parseHeader(payload);
event->parseEvent(payload);
event->parseHeader(event_payload);
event->parseEvent(event_payload);
position.update(event);
break;
}
case TABLE_MAP_EVENT: {
case TABLE_MAP_EVENT:
{
event = std::make_shared<TableMapEvent>();
event->parseHeader(payload);
event->parseEvent(payload);
event->parseHeader(event_payload);
event->parseEvent(event_payload);
table_map = std::static_pointer_cast<TableMapEvent>(event);
break;
}
case WRITE_ROWS_EVENT_V1:
case WRITE_ROWS_EVENT_V2: {
case WRITE_ROWS_EVENT_V2:
{
if (do_replicate())
event = std::make_shared<WriteRowsEvent>(table_map);
else
event = std::make_shared<DryRunEvent>();
event->parseHeader(payload);
event->parseEvent(payload);
event->parseHeader(event_payload);
event->parseEvent(event_payload);
break;
}
case DELETE_ROWS_EVENT_V1:
case DELETE_ROWS_EVENT_V2: {
case DELETE_ROWS_EVENT_V2:
{
if (do_replicate())
event = std::make_shared<DeleteRowsEvent>(table_map);
else
event = std::make_shared<DryRunEvent>();
event->parseHeader(payload);
event->parseEvent(payload);
event->parseHeader(event_payload);
event->parseEvent(event_payload);
break;
}
case UPDATE_ROWS_EVENT_V1:
case UPDATE_ROWS_EVENT_V2: {
case UPDATE_ROWS_EVENT_V2:
{
if (do_replicate())
event = std::make_shared<UpdateRowsEvent>(table_map);
else
event = std::make_shared<DryRunEvent>();
event->parseHeader(payload);
event->parseEvent(payload);
event->parseHeader(event_payload);
event->parseEvent(event_payload);
break;
}
case GTID_EVENT: {
case GTID_EVENT:
{
event = std::make_shared<GTIDEvent>();
event->parseHeader(payload);
event->parseEvent(payload);
event->parseHeader(event_payload);
event->parseEvent(event_payload);
position.update(event);
break;
}
default: {
default:
{
event = std::make_shared<DryRunEvent>();
event->parseHeader(payload);
event->parseEvent(payload);
event->parseHeader(event_payload);
event->parseEvent(event_payload);
break;
}
}
payload.ignoreAll();
}
}

View File

@ -505,7 +505,7 @@ namespace MySQLReplication
class MySQLFlavor : public IFlavor
{
public:
void readPayloadImpl(ReadBuffer & payload) override;
void readPayloadImpl(ReadBuffer & event_payload) override;
String getName() const override { return "MySQL"; }
Position getPosition() const override { return position; }
BinlogEventPtr readOneEvent() override { return event; }

View File

@ -0,0 +1,61 @@
#include <IO/MySQLBinlogEventReadBuffer.h>
namespace DB
{
namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
}
MySQLBinlogEventReadBuffer::MySQLBinlogEventReadBuffer(ReadBuffer & in_)
: ReadBuffer(nullptr, 0, 0), in(in_)
{
}
bool MySQLBinlogEventReadBuffer::nextImpl()
{
if (hasPendingData())
return true;
if (in.eof())
return false;
if (likely(in.available() > CHECKSUM_CRC32_SIGNATURE_LENGTH))
{
working_buffer = ReadBuffer::Buffer(in.position(), in.buffer().end() - CHECKSUM_CRC32_SIGNATURE_LENGTH);
in.ignore(working_buffer.size());
return true;
}
if (checksum_buff_size == checksum_buff_limit)
{
in.readStrict(checksum_buf, CHECKSUM_CRC32_SIGNATURE_LENGTH);
checksum_buff_size = checksum_buff_limit = CHECKSUM_CRC32_SIGNATURE_LENGTH;
}
else
{
for (size_t index = 0; index < checksum_buff_size - checksum_buff_limit; ++index)
checksum_buf[index] = checksum_buf[checksum_buff_limit + index];
checksum_buff_size -= checksum_buff_limit;
size_t read_bytes = CHECKSUM_CRC32_SIGNATURE_LENGTH - checksum_buff_size;
in.readStrict(checksum_buf + checksum_buff_size, read_bytes); /// Minimum CHECKSUM_CRC32_SIGNATURE_LENGTH bytes
checksum_buff_size = checksum_buff_limit = CHECKSUM_CRC32_SIGNATURE_LENGTH;
}
if (in.eof())
return false;
if (in.available() < CHECKSUM_CRC32_SIGNATURE_LENGTH)
{
size_t left_move_size = CHECKSUM_CRC32_SIGNATURE_LENGTH - in.available();
checksum_buff_limit = checksum_buff_size - left_move_size;
}
working_buffer = ReadBuffer::Buffer(checksum_buf, checksum_buf + checksum_buff_limit);
return true;
}
}

View File

@ -0,0 +1,26 @@
#pragma once
#include <IO/ReadBuffer.h>
namespace DB
{
class MySQLBinlogEventReadBuffer : public ReadBuffer
{
protected:
static const size_t CHECKSUM_CRC32_SIGNATURE_LENGTH = 4;
ReadBuffer & in;
size_t checksum_buff_size = 0;
size_t checksum_buff_limit = 0;
char checksum_buf[CHECKSUM_CRC32_SIGNATURE_LENGTH];
bool nextImpl() override;
public:
MySQLBinlogEventReadBuffer(ReadBuffer & in_);
};
}

View File

@ -0,0 +1,20 @@
#include <gtest/gtest.h>
#include <Common/Exception.h>
#include <IO/ReadBufferFromMemory.h>
#include <IO/MySQLBinlogEventReadBuffer.h>
using namespace DB;
TEST(MySQLBinlogEventReadBuffer, CheckBoundary)
{
for (size_t index = 1; index < 4; ++index)
{
std::vector<char> memory_data(index, 0x01);
ReadBufferFromMemory nested_in(memory_data.data(), index);
MySQLBinlogEventReadBuffer binlog_in(nested_in);
EXPECT_THROW(binlog_in.ignore(), Exception);
}
}

View File

@ -28,6 +28,7 @@ SRCS(
MemoryReadWriteBuffer.cpp
MMapReadBufferFromFile.cpp
MMapReadBufferFromFileDescriptor.cpp
MySQLBinlogEventReadBuffer.cpp
MySQLPacketPayloadReadBuffer.cpp
MySQLPacketPayloadWriteBuffer.cpp
NullWriteBuffer.cpp