revert checksum
This commit is contained in:
parent
560da75257
commit
ce3f09699f
|
@ -50,68 +50,6 @@ constexpr UID WLTOKEN_ENDPOINT_NOT_FOUND(-1, 0);
|
|||
constexpr UID WLTOKEN_PING_PACKET(-1, 1);
|
||||
const uint64_t TOKEN_STREAM_FLAG = 1;
|
||||
|
||||
struct IChecksum {
|
||||
public:
|
||||
// Width in number of bytes
|
||||
virtual int width() const = 0;
|
||||
// Append to checksum
|
||||
virtual void append(std::string_view bytes) = 0;
|
||||
// Write checksum to preallocated buffer
|
||||
virtual void writeSum(char* buffer) = 0;
|
||||
};
|
||||
|
||||
struct CRC32 : IChecksum {
|
||||
public:
|
||||
CRC32() {
|
||||
checksum = uint32_t(0);
|
||||
checksumWidth = sizeof(checksum);
|
||||
}
|
||||
int width() const override {
|
||||
return checksumWidth;
|
||||
}
|
||||
void append(std::string_view bytes) override {
|
||||
checksum = crc32c_append(checksum, (uint8_t*)bytes.data(), bytes.size());
|
||||
}
|
||||
void writeSum(char* buffer) override {
|
||||
std::memcpy(buffer, &checksum, checksumWidth);
|
||||
reset();
|
||||
}
|
||||
private:
|
||||
void reset() {
|
||||
checksum = 0;
|
||||
}
|
||||
|
||||
uint32_t checksum;
|
||||
int checksumWidth;
|
||||
};
|
||||
|
||||
static const std::map<ProtocolVersion, std::unique_ptr<IChecksum>> protocolToChecksum = []() {
|
||||
std::map<ProtocolVersion, std::unique_ptr<IChecksum>> m;
|
||||
m[ProtocolVersion(0)] = std::make_unique<CRC32>(CRC32());
|
||||
return m;
|
||||
}();
|
||||
|
||||
IChecksum* getMinChecksum(ProtocolVersion protocol) {
|
||||
ASSERT(protocol >= protocolToChecksum.begin()->first);
|
||||
|
||||
// returns an iterator to the first pair in protocolToChecksum with key strictly greater than protocol
|
||||
auto protocolChecksumIter = protocolToChecksum.upper_bound(protocol);
|
||||
--protocolChecksumIter;
|
||||
|
||||
return protocolChecksumIter->second.get();
|
||||
}
|
||||
|
||||
TEST_CASE("fdbrpc/ChecksumAlgorithm/GetAlgorithm") {
|
||||
const ProtocolVersion IMPLEMENTED_PROTOCOL_VERSION = ProtocolVersion(0x0FDB00B070010001LL);
|
||||
|
||||
IChecksum* resOne = getMinChecksum(ProtocolVersion(0));
|
||||
ASSERT(resOne == protocolToChecksum.at(ProtocolVersion(0)).get());
|
||||
|
||||
IChecksum* resTwo = getMinChecksum(IMPLEMENTED_PROTOCOL_VERSION);
|
||||
ASSERT(resTwo == protocolToChecksum.at(ProtocolVersion(0)).get());
|
||||
return Void();
|
||||
}
|
||||
|
||||
class EndpointMap : NonCopyable {
|
||||
public:
|
||||
// Reserve space for this many wellKnownEndpoints
|
||||
|
@ -945,24 +883,27 @@ static void scanPackets(TransportData* transport, uint8_t*& unprocessed_begin, c
|
|||
|
||||
const bool checksumEnabled = !peerAddress.isTLS();
|
||||
loop {
|
||||
uint32_t packetLen;
|
||||
uint32_t packetLen, packetChecksum;
|
||||
|
||||
IChecksum* checksum = getMinChecksum(std::min(g_network->protocolVersion(), peerProtocolVersion));
|
||||
int checksumWidth = checksumEnabled ? checksum->width() : 0;
|
||||
if(e-p < sizeof(packetLen) + checksumWidth) break;
|
||||
|
||||
packetLen = *(uint32_t*)p; p += PACKET_LEN_WIDTH;
|
||||
//Retrieve packet length and checksum
|
||||
if (checksumEnabled) {
|
||||
if (e-p < sizeof(uint32_t) * 2) break;
|
||||
packetLen = *(uint32_t*)p; p += PACKET_LEN_WIDTH;
|
||||
packetChecksum = *(uint32_t*)p; p += sizeof(uint32_t);
|
||||
} else {
|
||||
if (e-p < sizeof(uint32_t)) break;
|
||||
packetLen = *(uint32_t*)p; p += PACKET_LEN_WIDTH;
|
||||
}
|
||||
|
||||
if (packetLen > FLOW_KNOBS->PACKET_LIMIT) {
|
||||
TraceEvent(SevError, "PacketLimitExceeded").detail("FromPeer", peerAddress.toString()).detail("Length", (int)packetLen);
|
||||
throw platform_error();
|
||||
}
|
||||
if(e < p + packetLen + checksumWidth) break;
|
||||
|
||||
if (e-p<packetLen) break;
|
||||
ASSERT( packetLen >= sizeof(UID) );
|
||||
|
||||
if(checksumEnabled) {
|
||||
char* checksumStart = (char*)p;
|
||||
p += checksum->width();
|
||||
if (checksumEnabled) {
|
||||
bool isBuggifyEnabled = false;
|
||||
if(g_network->isSimulated() && g_network->now() - g_simulator.lastConnectionFailure > g_simulator.connectionFailuresDisableDuration && BUGGIFY_WITH_PROB(0.0001)) {
|
||||
g_simulator.lastConnectionFailure = g_network->now();
|
||||
|
@ -984,37 +925,21 @@ static void scanPackets(TransportData* transport, uint8_t*& unprocessed_begin, c
|
|||
}
|
||||
}
|
||||
|
||||
char* checksumBuffer = (char*) alloca(checksum->width());
|
||||
checksum->append(std::string_view((char*)p, packetLen));
|
||||
checksum->writeSum(checksumBuffer);
|
||||
|
||||
// store stringified checksums in hex for trace
|
||||
std::stringstream packetStream;
|
||||
std::stringstream calculatedStream;
|
||||
packetStream << std::hex;
|
||||
calculatedStream << std::hex;
|
||||
for(int i = 0; i<checksum->width(); i++){
|
||||
packetStream << (int)checksumStart[i];
|
||||
calculatedStream << (int)checksumBuffer[i];
|
||||
}
|
||||
std::string packetChecksum = packetStream.str();
|
||||
std::string calculatedChecksum = calculatedStream.str();
|
||||
|
||||
if(memcmp(checksumBuffer, checksumStart, checksum->width()) != 0) {
|
||||
uint32_t calculatedChecksum = crc32c_append(0, p, packetLen);
|
||||
if (calculatedChecksum != packetChecksum) {
|
||||
if (isBuggifyEnabled) {
|
||||
TraceEvent(SevInfo, "ChecksumMismatchExp").detail("PacketChecksum", packetChecksum).detail("CalculatedChecksum", calculatedChecksum);
|
||||
TraceEvent(SevInfo, "ChecksumMismatchExp").detail("PacketChecksum", (int)packetChecksum).detail("CalculatedChecksum", (int)calculatedChecksum);
|
||||
} else {
|
||||
TraceEvent(SevWarnAlways, "ChecksumMismatchUnexp").detail("PacketChecksum", packetChecksum).detail("CalculatedChecksum", calculatedChecksum);
|
||||
TraceEvent(SevWarnAlways, "ChecksumMismatchUnexp").detail("PacketChecksum", (int)packetChecksum).detail("CalculatedChecksum", (int)calculatedChecksum);
|
||||
}
|
||||
throw checksum_failed();
|
||||
} else {
|
||||
if (isBuggifyEnabled) {
|
||||
TraceEvent(SevError, "ChecksumMatchUnexp").detail("PacketChecksum", packetChecksum).detail("CalculatedChecksum", calculatedChecksum);
|
||||
TraceEvent(SevError, "ChecksumMatchUnexp").detail("PacketChecksum", (int)packetChecksum).detail("CalculatedChecksum", (int)calculatedChecksum);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
#if VALGRIND
|
||||
VALGRIND_CHECK_MEM_IS_DEFINED(p, packetLen);
|
||||
#endif
|
||||
|
@ -1052,17 +977,13 @@ static int getNewBufferSize(const uint8_t* begin, const uint8_t* end, const Netw
|
|||
if (len < PACKET_LEN_WIDTH) {
|
||||
return FLOW_KNOBS->MIN_PACKET_BUFFER_BYTES;
|
||||
}
|
||||
ASSERT(peerProtocolVersion != ProtocolVersion());
|
||||
const uint32_t packetLen = *(uint32_t*)begin;
|
||||
|
||||
if (packetLen > FLOW_KNOBS->PACKET_LIMIT) {
|
||||
TraceEvent(SevError, "PacketLimitExceeded").detail("FromPeer", peerAddress.toString()).detail("Length", (int)packetLen);
|
||||
throw platform_error();
|
||||
}
|
||||
|
||||
int checksumWidth = !peerAddress.isTLS() ? getMinChecksum(std::min(g_network->protocolVersion(), peerProtocolVersion))->width() : 0;
|
||||
return std::max<uint32_t>(FLOW_KNOBS->MIN_PACKET_BUFFER_BYTES,
|
||||
packetLen + checksumWidth + sizeof(uint32_t) * 2);
|
||||
packetLen + sizeof(uint32_t) * (peerAddress.isTLS() ? 2 : 3));
|
||||
}
|
||||
|
||||
ACTOR static Future<Void> connectionReader(
|
||||
|
@ -1484,12 +1405,10 @@ static ReliablePacket* sendPacket(TransportData* self, Reference<Peer> peer, ISe
|
|||
|
||||
// Reserve some space for packet length and checksum, write them after serializing data
|
||||
SplitBuffer packetInfoBuffer;
|
||||
|
||||
IChecksum* checksum = getMinChecksum(g_network->protocolVersion());
|
||||
uint32_t len;
|
||||
uint32_t len, checksum = 0;
|
||||
int packetInfoSize = PACKET_LEN_WIDTH;
|
||||
if (checksumEnabled) {
|
||||
packetInfoSize += checksum->width();
|
||||
packetInfoSize += sizeof(checksum);
|
||||
}
|
||||
|
||||
wr.writeAhead(packetInfoSize , &packetInfoBuffer);
|
||||
|
@ -1511,7 +1430,7 @@ static ReliablePacket* sendPacket(TransportData* self, Reference<Peer> peer, ISe
|
|||
while (checksumUnprocessedLength > 0) {
|
||||
uint32_t processLength =
|
||||
std::min(checksumUnprocessedLength, (uint32_t)(checksumPb->bytes_written - prevBytesWritten));
|
||||
checksum->append(std::string_view((char*)checksumPb->data() + prevBytesWritten, processLength));
|
||||
checksum = crc32c_append(checksum, checksumPb->data() + prevBytesWritten, processLength);
|
||||
checksumUnprocessedLength -= processLength;
|
||||
checksumPb = checksumPb->nextPacketBuffer();
|
||||
prevBytesWritten = 0;
|
||||
|
@ -1521,9 +1440,7 @@ static ReliablePacket* sendPacket(TransportData* self, Reference<Peer> peer, ISe
|
|||
// Write packet length and checksum into packet buffer
|
||||
packetInfoBuffer.write(&len, sizeof(len));
|
||||
if (checksumEnabled) {
|
||||
char* checksumBuffer = (char*) alloca(checksum->width());
|
||||
checksum->writeSum(checksumBuffer);
|
||||
packetInfoBuffer.write(checksumBuffer, checksum->width(), sizeof(len));
|
||||
packetInfoBuffer.write(&checksum, sizeof(checksum), sizeof(len));
|
||||
}
|
||||
|
||||
if (len > FLOW_KNOBS->PACKET_LIMIT) {
|
||||
|
|
|
@ -20,10 +20,6 @@
|
|||
|
||||
#ifndef FLOW_TRANSPORT_H
|
||||
#define FLOW_TRANSPORT_H
|
||||
#include "flow/ProtocolVersion.h"
|
||||
#include "flow/serialize.h"
|
||||
#include <cstdint>
|
||||
#include <memory>
|
||||
#pragma once
|
||||
|
||||
#include <algorithm>
|
||||
|
@ -31,6 +27,7 @@
|
|||
#include "flow/genericactors.actor.h"
|
||||
#include "flow/network.h"
|
||||
#include "flow/FileIdentifier.h"
|
||||
#include "flow/ProtocolVersion.h"
|
||||
#include "flow/Net2Packet.h"
|
||||
#include "fdbrpc/ContinuousSample.h"
|
||||
|
||||
|
|
Loading…
Reference in New Issue