change IChecksum interface and remove coordinator protocol version optional

This commit is contained in:
Richard Chen 2020-10-19 21:34:31 +00:00
parent 340e21fa93
commit c84a5f4014
5 changed files with 54 additions and 77 deletions

View File

@ -1011,7 +1011,6 @@ const char* MultiVersionApi::getClientVersion() {
ThreadFuture<uint64_t> MultiVersionApi::getServerProtocol(const char *clusterFilePath) {
// is this hacky
return api->localClient->api->getServerProtocol(clusterFilePath);
}

View File

@ -4170,63 +4170,36 @@ Future<Standalone<StringRef>> Transaction::getVersionstamp() {
return versionstampPromise.getFuture();
}
// Will not throw, will just return non-present Optional if error
ACTOR Future<Optional<ProtocolVersion>> coordinatorProtocolsFetcher(Reference<ClusterConnectionFile> f, bool *quorum_reachable) {
try {
state ClientCoordinators coord(f);
ACTOR Future<ProtocolVersion> coordinatorProtocolsFetcher(Reference<ClusterConnectionFile> f) {
state ClientCoordinators coord(f);
state vector<Future<Optional<LeaderInfo>>> leaderServers;
for (int i = 0; i < coord.clientLeaderServers.size(); i++) {
leaderServers.push_back(retryBrokenPromise(coord.clientLeaderServers[i].getLeader, GetLeaderRequest(coord.clusterKey, UID()), TaskPriority::CoordinationReply));
}
state vector<Future<ProtocolInfoReply>> coordProtocols;
coordProtocols.reserve(coord.clientLeaderServers.size());
for (int i = 0; i < coord.clientLeaderServers.size(); i++) {
RequestStream<ProtocolInfoRequest> requestStream{ Endpoint{
{ coord.clientLeaderServers[i].getLeader.getEndpoint().addresses }, WLTOKEN_PROTOCOL_INFO } };
coordProtocols.push_back(retryBrokenPromise(requestStream, ProtocolInfoRequest{}));
}
state vector<Future<ProtocolInfoReply>> coordProtocols;
coordProtocols.reserve(coord.clientLeaderServers.size());
for (int i = 0; i < coord.clientLeaderServers.size(); i++) {
RequestStream<ProtocolInfoRequest> requestStream{ Endpoint{
{ coord.clientLeaderServers[i].getLeader.getEndpoint().addresses }, WLTOKEN_PROTOCOL_INFO } };
coordProtocols.push_back(retryBrokenPromise(requestStream, ProtocolInfoRequest{}));
}
wait(smartQuorum(coordProtocols, coordProtocols.size() / 2 + 1, 1.5));
wait(smartQuorum(leaderServers, leaderServers.size() / 2 + 1, 1.5) &&
smartQuorum(coordProtocols, coordProtocols.size() / 2 + 1, 1.5) ||
delay(2.0));
*quorum_reachable = quorum(leaderServers, leaderServers.size() / 2 + 1).isReady();
if(quorum(coordProtocols, coordProtocols.size() / 2 + 1).isReady()) {
std::unordered_map<uint64_t, int> protocolCount;
for (int i = 0; i < leaderServers.size(); i++) {
if(coordProtocols[i].isReady()) {
protocolCount[coordProtocols[i].get().version.version()]++;
}
}
uint64_t majorityProtocol = std::max_element(protocolCount.begin(), protocolCount.end(), [](const std::pair<uint64_t, int>& l, const std::pair<uint64_t, int>& r){
return l.second < r.second;
})->first;
return ProtocolVersion(majorityProtocol);
}
else {
return Optional<ProtocolVersion>();
std::unordered_map<uint64_t, int> protocolCount;
for(int i = 0; i<coordProtocols.size(); i++) {
if(coordProtocols[i].isReady()) {
protocolCount[coordProtocols[i].get().version.version()]++;
}
}
catch (Error &e){
*quorum_reachable = false;
return Optional<ProtocolVersion>();
}
uint64_t majorityProtocol = std::max_element(protocolCount.begin(), protocolCount.end(), [](const std::pair<uint64_t, int>& l, const std::pair<uint64_t, int>& r){
return l.second < r.second;
})->first;
return ProtocolVersion(majorityProtocol);
}
ACTOR Future<uint64_t> getCoordinatorProtocols(Reference<ClusterConnectionFile> f) {
state bool quorum_reachable = false;
Optional<ProtocolVersion> protocolVersionOptional = wait(coordinatorProtocolsFetcher(f, &quorum_reachable));
if(!protocolVersionOptional.present()) {
// what to do if optional isn't present?
return 0;
}
return protocolVersionOptional.get().version();
// let client know if server is present but before this feature is introduced
ProtocolVersion protocolVersion = wait(coordinatorProtocolsFetcher(f));
return protocolVersion.version();
}
uint32_t Transaction::getSize() {

View File

@ -298,13 +298,6 @@ ThreadFuture<int64_t> ThreadSafeTransaction::getApproximateSize() {
return onMainThread([tr]() -> Future<int64_t> { return tr->getApproximateSize(); });
}
// ThreadFuture<uint64_t> ThreadSafeTransaction::getProtocolVersion() {
// ReadYourWritesTransaction *tr = this->tr;
// return onMainThread( [tr]() -> Future< uint64_t > {
// return tr->getProtocolVersion();
// } );
// }
ThreadFuture<Standalone<StringRef>> ThreadSafeTransaction::getVersionstamp() {
ReadYourWritesTransaction *tr = this->tr;
return onMainThread([tr]() -> Future < Standalone<StringRef> > {

View File

@ -331,20 +331,21 @@ int CRC32::width() const {
return checksumWidth;
}
void CRC32::append(const uint8_t* data, size_t processLength) {
checksum = crc32c_append(std::get<uint32_t>(checksum), data, processLength);
void CRC32::append(std::string_view bytes) {
checksum = crc32c_append(std::get<uint32_t>(checksum), (uint8_t*)bytes.data(), bytes.size());
}
void CRC32::writeSum(SplitBuffer& buffer, int offset) {
buffer.write(&checksum, checksumWidth, offset);
void CRC32::writeSum(char* buffer) {
uint32_t cs = std::get<uint32_t>(checksum);
std::memcpy(buffer, &cs, checksumWidth);
reset();
}
bool CRC32::checkSum(uint8_t*& p) {
uint32_t packetLen = *(uint32_t*)p; p += sizeof(uint32_t);
uint32_t packetChecksum = *(uint32_t*)p; p += sizeof(uint32_t);
bool CRC32::checkSum(const char* buffer) {
uint32_t packetLen = *(uint32_t*)buffer; buffer += sizeof(uint32_t);
uint32_t packetChecksum = *(uint32_t*)buffer; buffer += sizeof(uint32_t);
append(p, packetLen);
append(std::string_view(buffer, packetLen));
bool matches = std::get<uint32_t>(checksum) == packetChecksum;
reset();
return matches;
@ -866,16 +867,17 @@ static void scanPackets(TransportData* transport, uint8_t*& unprocessed_begin, c
int checksumWidth = checksumEnabled ? checksum->width() : 0;
if(e-p < sizeof(packetLen) + checksumWidth) break;
packetLen = *(uint32_t*)p;
packetLen = *(uint32_t*)p; p += sizeof(uint32_t);
if (packetLen > FLOW_KNOBS->PACKET_LIMIT) {
TraceEvent(SevError, "PacketLimitExceeded").detail("FromPeer", peerAddress.toString()).detail("Length", (int)packetLen);
throw platform_error();
}
if(e < p + sizeof(packetLen) + packetLen + checksumWidth) break;
if(e < p + packetLen + checksumWidth) break;
ASSERT( packetLen >= sizeof(UID) );
if(checksumEnabled) {
p += sizeof(uint32_t);
bool isBuggifyEnabled = false;
if(g_network->isSimulated() && g_network->now() - g_simulator.lastConnectionFailure > g_simulator.connectionFailuresDisableDuration && BUGGIFY_WITH_PROB(0.0001)) {
g_simulator.lastConnectionFailure = g_network->now();
@ -896,7 +898,8 @@ static void scanPackets(TransportData* transport, uint8_t*& unprocessed_begin, c
}
}
}
if (!checksum->checkSum(p)) {
if (!checksum->checkSum((char*) unprocessed_begin)) {
if (isBuggifyEnabled) {
// how important is it to have the correct vs calculated checksum in the trace?
// TraceEvent(SevInfo, "ChecksumMismatchExp").detail("PacketChecksum", (int)packetChecksum).detail("CalculatedChecksum", (int)std::get<uint32_t>(checksum));
@ -1407,7 +1410,7 @@ static ReliablePacket* sendPacket(TransportData* self, Reference<Peer> peer, ISe
while (checksumUnprocessedLength > 0) {
uint32_t processLength =
std::min(checksumUnprocessedLength, (uint32_t)(checksumPb->bytes_written - prevBytesWritten));
checksum->append(checksumPb->data() + prevBytesWritten, processLength);
checksum->append(std::string_view((char*)checksumPb->data() + prevBytesWritten, processLength));
checksumUnprocessedLength -= processLength;
checksumPb = checksumPb->nextPacketBuffer();
prevBytesWritten = 0;
@ -1417,7 +1420,11 @@ static ReliablePacket* sendPacket(TransportData* self, Reference<Peer> peer, ISe
// Write packet length and checksum into packet buffer
packetInfoBuffer.write(&len, sizeof(len));
if (checksumEnabled) {
checksum->writeSum(packetInfoBuffer, sizeof(len));
char* checksumBuffer = (char*) alloca(checksum->width());
checksum->writeSum(checksumBuffer);
packetInfoBuffer.write(checksumBuffer, checksum->width(), sizeof(len));
uint8_t* begin = packetInfoBuffer.begin;
begin += sizeof(uint32_t);
}
if (len > FLOW_KNOBS->PACKET_LIMIT) {

View File

@ -254,13 +254,18 @@ inline bool Endpoint::isLocal() const {
}
struct IChecksum {
// width in number of bytes
// Width in number of bytes
virtual int width() const = 0;
virtual void append(const uint8_t* data, size_t processLength) = 0;
virtual void writeSum(SplitBuffer& buffer, int offset) = 0;
virtual bool checkSum(uint8_t*& unprocessed_begin) = 0;
// Append to checksum
virtual void append(std::string_view bytes) = 0;
// Write checksum to preallocated buffer
virtual void writeSum(char* buffer) = 0;
// Validate checksum of packet. buffer ptr points to beginning of packetlen
virtual bool checkSum(const char* buffer) = 0;
// Reset checksum
virtual void reset() = 0;
// variant that allows for checksum of varying widths
std::variant<uint32_t> checksum;
int checksumWidth;
};
@ -268,9 +273,9 @@ struct IChecksum {
struct CRC32 : IChecksum {
CRC32();
int width() const override;
void append(const uint8_t* data, size_t processLength) override;
void writeSum(SplitBuffer& buffer, int offset) override;
bool checkSum(uint8_t*& unprocessed_begin) override;
void append(std::string_view bytes) override;
void writeSum(char* buffer) override;
bool checkSum(const char* buffer) override;
void reset() override;
};