Real-time corruption detection with accumulative checksum (#11255)

* acs framework

* code refactor and fix bugs

* add ss crash loop protector

* use sharedptr instead of raw pointer

* fixed critical bugs and add private mutation acs to the framework

* enable ACS for all mutations except for clear serverTag mutation and fix bugs

* fix restarting tests

* refactor code and fix bugs

* fix AccumulativeChecksumState toString

* fix bugs

* allow all mutations in acs and fixed bugs

* fix bugs and code cleanup

* code clean up for adding recovery support

* simplify code and support recovery

* clear acs state at ss

* fix bug

* terminate validator if ss will be removed in the current batch

* simplify code

* add trace

* address comments

* optimize code

* deep copy when adding mutation to acs validator

* wrap encode and decode persist acs key

* make acstable private

* remove useless func

* remove useless func

* remove epoch in ACS validator

* add acs mutation counter in SS metrics

* code cleanup and make knob check better

* make mutation buffer global

* simplify code

* add comments

* make knob randomly set

* address comments

* ss reboot after acs mismatch found
This commit is contained in:
Zhe Wang 2024-04-04 15:03:44 -07:00 committed by GitHub
parent 8635035cc5
commit 33eecd0775
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
24 changed files with 999 additions and 132 deletions

View File

@ -336,8 +336,9 @@ void ClientKnobs::initialize(Randomize randomize) {
init( REST_KMS_ALLOW_NOT_SECURE_CONNECTION, false ); if ( randomize && BUGGIFY ) REST_KMS_ALLOW_NOT_SECURE_CONNECTION = !REST_KMS_ALLOW_NOT_SECURE_CONNECTION;
init( SIM_KMS_VAULT_MAX_KEYS, 4096 );
init( ENABLE_MUTATION_CHECKSUM, false ); // if ( randomize && BUGGIFY ) ENABLE_MUTATION_CHECKSUM = true; Enable this after deserialiser is ported to 7.3.
init( ENABLE_ACCUMULATIVE_CHECKSUM, false ); // if ( randomize && BUGGIFY ) ENABLE_ACCUMULATIVE_CHECKSUM = true; Enable this after deserialiser is ported to 7.3.
init( ENABLE_MUTATION_CHECKSUM, false ); if ( randomize && BUGGIFY ) ENABLE_MUTATION_CHECKSUM = true; // Enable this after deserialiser is ported to 7.3.
init( ENABLE_ACCUMULATIVE_CHECKSUM, false ); if ( randomize && BUGGIFY ) ENABLE_ACCUMULATIVE_CHECKSUM = true; // Enable this after deserialiser is ported to 7.3.
init( ENABLE_ACCUMULATIVE_CHECKSUM_LOGGING, false );
// clang-format on
}

View File

@ -303,6 +303,19 @@ const KeyRangeRef readConflictRangeKeysRange =
const KeyRangeRef writeConflictRangeKeysRange = KeyRangeRef("\xff\xff/transaction/write_conflict_range/"_sr,
"\xff\xff/transaction/write_conflict_range/\xff\xff"_sr);
const KeyRef accumulativeChecksumKey = "\xff\xff/accumulativeChecksum"_sr;
// Serializes an accumulative checksum state into a value, including the protocol version,
// so that it can be carried as the payload of an acs mutation.
const Value accumulativeChecksumValue(const AccumulativeChecksumState& acsState) {
    Value encoded = ObjectWriter::toValue(acsState, IncludeVersion());
    return encoded;
}
// Inverse of accumulativeChecksumValue(): reads a versioned, object-serialized
// AccumulativeChecksumState back out of the given value.
AccumulativeChecksumState decodeAccumulativeChecksum(const ValueRef& value) {
    ObjectReader versionedReader(value.begin(), IncludeVersion());
    AccumulativeChecksumState decoded;
    versionedReader.deserialize(decoded);
    return decoded;
}
const KeyRangeRef auditKeys = KeyRangeRef("\xff/audits/"_sr, "\xff/audits0"_sr);
const KeyRef auditPrefix = auditKeys.begin;
const KeyRangeRef auditRanges = KeyRangeRef("\xff/auditRanges/"_sr, "\xff/auditRanges0"_sr);
@ -1239,6 +1252,9 @@ bool isBackupLogMutation(const MutationRef& m) {
return isSingleKeyMutation((MutationRef::Type)m.type) &&
(backupLogKeys.contains(m.param1) || applyLogKeys.contains(m.param1));
}
// An acs mutation is a SetValue whose key is exactly accumulativeChecksumKey.
bool isAccumulativeChecksumMutation(const MutationRef& m) {
    if (m.type != MutationRef::SetValue) {
        return false;
    }
    return m.param1 == accumulativeChecksumKey;
}
// static_assert( backupLogKeys.begin.size() == backupLogPrefixBytes, "backupLogPrefixBytes incorrect" );
const KeyRef backupVersionKey = "\xff/backupDataFormat"_sr;
const ValueRef backupVersionValue = "4"_sr;

View File

@ -0,0 +1,52 @@
/*
* AccumulativeChecksum.h
*
* This source file is part of the FoundationDB open source project
*
* Copyright 2013-2024 Apple Inc. and the FoundationDB project authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#ifndef FDBCLIENT_ACCUMULATIVECHECKSUM_H
#define FDBCLIENT_ACCUMULATIVECHECKSUM_H
#pragma once
#include "fdbclient/FDBTypes.h"
#include "fdbrpc/fdbrpc.h"
// Snapshot of an accumulative checksum (acs): the checksum value itself together with the
// acs index, version and log epoch it is associated with.
// The struct is serializable; fields are serialized in the order acsIndex, acs, version, epoch.
struct AccumulativeChecksumState {
    constexpr static FileIdentifier file_identifier = 13804380;

    // Default state: zero checksum, zero index, invalid version, epoch 0.
    // Initializers are listed in member declaration order, which is the order in which
    // C++ actually initializes them (avoids -Wreorder).
    AccumulativeChecksumState() : acs(0), acsIndex(0), version(invalidVersion), epoch(0) {}

    AccumulativeChecksumState(uint16_t acsIndex, uint32_t acs, Version version, LogEpoch epoch)
      : acs(acs), acsIndex(acsIndex), version(version), epoch(epoch) {}

    // Human-readable rendering of all four fields, used in trace events.
    std::string toString() const {
        return "AccumulativeChecksumState: [ACS Index]: " + std::to_string(acsIndex) +
               ", [Acs]: " + std::to_string(acs) + ", [Version]: " + std::to_string(version) +
               ", [Epoch]: " + std::to_string(epoch);
    }

    template <class Ar>
    void serialize(Ar& ar) {
        serializer(ar, acsIndex, acs, version, epoch);
    }

    uint32_t acs; // accumulated checksum value
    uint16_t acsIndex; // index identifying the acs stream this state belongs to
    Version version; // version associated with this checksum value
    LogEpoch epoch; // log epoch associated with this checksum value
};
#endif

View File

@ -336,7 +336,10 @@ public:
int SIM_KMS_VAULT_MAX_KEYS;
bool ENABLE_MUTATION_CHECKSUM;
// Enable to start accumulative checksum population and validation
bool ENABLE_ACCUMULATIVE_CHECKSUM;
// Enable to log verbose trace events related to the accumulative checksum
bool ENABLE_ACCUMULATIVE_CHECKSUM_LOGGING;
ClientKnobs(Randomize randomize);
void initialize(Randomize randomize);

View File

@ -94,7 +94,6 @@ struct MutationRef {
Reserved_For_SpanContextMessage /* See fdbserver/SpanContextMessage.h */,
Reserved_For_OTELSpanContextMessage,
Encrypted, /* Represents an encrypted mutation and cannot be used directly before decrypting */
AccumulativeChecksum, /* Used to propagate accumulative checksum from commit proxy to storage server */
MAX_ATOMIC_OP
};
@ -172,11 +171,27 @@ struct MutationRef {
// If mutation checksum is enabled, we must set accumulative index before serialization
// Once accumulative index is set, it cannot change over time
void setAccumulativeChecksumIndex(uint16_t index) {
if (this->accumulativeChecksumIndex.present()) {
ASSERT(this->accumulativeChecksumIndex.get() == index);
} else {
this->accumulativeChecksumIndex = index;
if (!CLIENT_KNOBS->ENABLE_ACCUMULATIVE_CHECKSUM) {
return;
}
if (withAccumulativeChecksumIndex()) {
TraceEvent(SevError, "MutationRefUnexpectedError")
.setMaxFieldLength(-1)
.setMaxEventLength(-1)
.detail("Reason", "Type already has acsIndex flag when setting acsIndex")
.detail("Mutation", toString());
this->corrupted = true;
}
if (this->accumulativeChecksumIndex.present() && this->accumulativeChecksumIndex.get() != index) {
TraceEvent(SevError, "MutationRefUnexpectedError")
.setMaxFieldLength(-1)
.setMaxEventLength(-1)
.detail("Reason", "AcsIndex mismatch existing one when setting a new acsIndex")
.detail("NewAcsIndex", index)
.detail("Mutation", toString());
this->corrupted = true;
}
this->accumulativeChecksumIndex = index;
return;
}
@ -186,6 +201,8 @@ struct MutationRef {
void offloadChecksum() {
if (this->checksum.present()) {
TraceEvent(SevError, "MutationRefUnexpectedError")
.setMaxFieldLength(-1)
.setMaxEventLength(-1)
.detail("Reason", "Internal checksum has been set when offloading checksum")
.detail("Mutation", toString());
this->corrupted = true;
@ -195,6 +212,8 @@ struct MutationRef {
}
if (withAccumulativeChecksumIndex()) { // Removing checksum must be after removing acs index
TraceEvent(SevError, "MutationRefUnexpectedError")
.setMaxFieldLength(-1)
.setMaxEventLength(-1)
.detail("Reason", "Type contains acs index flag when offloading checksum")
.detail("Mutation", toString());
this->corrupted = true;
@ -210,6 +229,8 @@ struct MutationRef {
void offloadAccumulativeChecksumIndex() {
if (this->accumulativeChecksumIndex.present()) {
TraceEvent(SevError, "MutationRefUnexpectedError")
.setMaxFieldLength(-1)
.setMaxEventLength(-1)
.detail("Reason", "Internal acs index has been set when offloading acs index")
.detail("Mutation", this->toString());
this->corrupted = true;
@ -234,16 +255,20 @@ struct MutationRef {
if (withChecksum() && withAccumulativeChecksumIndex()) {
if (this->param2.size() < 6) { // 4 bytes for checksum, 2 bytes for accumulative index
TraceEvent(SevError, "MutationRefUnexpectedError")
.setMaxFieldLength(-1)
.setMaxEventLength(-1)
.detail("Reason", "Param2 size is wrong with both checksum and acs index")
.detail("Param2Size", this->param2.size())
.detail("Param2", this->param2)
.detail("Mutation", toString());
this->corrupted = true;
}
} else if (withChecksum() && !withAccumulativeChecksumIndex()) {
if (this->param2.size() < 4) { // 4 bytes for checksum
TraceEvent(SevError, "MutationRefUnexpectedError")
.setMaxFieldLength(-1)
.setMaxEventLength(-1)
.detail("Reason", "Param2 size is wrong with checksum and without acs index")
.detail("Param2Size", this->param2.size())
.detail("Param2", this->param2)
.detail("Mutation", toString());
this->corrupted = true;
}
@ -252,8 +277,13 @@ struct MutationRef {
// Generate 32 bits checksum and set it to this->checksum
void populateChecksum() {
if (!CLIENT_KNOBS->ENABLE_MUTATION_CHECKSUM) {
return;
}
if (withChecksum()) {
TraceEvent(SevError, "MutationRefUnexpectedError")
.setMaxFieldLength(-1)
.setMaxEventLength(-1)
.detail("Reason", "Type already has checksum flag when populating checksum")
.detail("Mutation", toString());
this->corrupted = true;
@ -263,6 +293,8 @@ struct MutationRef {
crc = crc32c_append(crc, this->param2.begin(), this->param2.size());
if (this->checksum.present() && this->checksum.get() != crc) {
TraceEvent(SevError, "MutationRefUnexpectedError")
.setMaxFieldLength(-1)
.setMaxEventLength(-1)
.detail("Reason", "Checksum mismatch when populating a new checksum")
.detail("CalculatedChecksum", std::to_string(crc))
.detail("Mutation", this->toString());
@ -275,6 +307,8 @@ struct MutationRef {
bool validateChecksum() const {
if (this->corrupted) {
TraceEvent(SevError, "MutationRefUnexpectedError")
.setMaxFieldLength(-1)
.setMaxEventLength(-1)
.detail("Reason", "Mutation has been marked as corrupted")
.detail("Mutation", this->toString());
return false;
@ -287,6 +321,8 @@ struct MutationRef {
crc = crc32c_append(crc, this->param2.begin(), this->param2.size());
if (crc != static_cast<uint32_t>(this->checksum.get())) {
TraceEvent(SevError, "MutationRefUnexpectedError")
.setMaxFieldLength(-1)
.setMaxEventLength(-1)
.detail("Reason", "Mutation checksum mismatch")
.detail("Mutation", this->toString())
.detail("ExistingChecksum", this->checksum.get())
@ -347,6 +383,8 @@ struct MutationRef {
if (type == ClearRange && param2 == StringRef() && param1 != StringRef()) {
if (param1[param1.size() - 1] != '\x00') {
TraceEvent(SevError, "MutationRefUnexpectedError")
.setMaxFieldLength(-1)
.setMaxEventLength(-1)
.detail("Reason", "Param1 is not end with \\x00 for single key clear range")
.detail("Param1", param1)
.detail("Mutation", toString());
@ -356,7 +394,10 @@ struct MutationRef {
param1 = param2.substr(0, param2.size() - 1);
}
if (!validateChecksum()) {
TraceEvent(SevError, "MutationRefCorruptionDetected").detail("Mutation", this->toString());
TraceEvent(SevError, "MutationRefCorruptionDetected")
.setMaxFieldLength(-1)
.setMaxEventLength(-1)
.detail("Mutation", this->toString());
this->corrupted = true;
}
}

View File

@ -24,6 +24,7 @@
// Functions and constants documenting the organization of the reserved keyspace in the database beginning with "\xFF"
#include "fdbclient/AccumulativeChecksum.h"
#include "fdbclient/FDBTypes.h"
#include "fdbclient/BlobWorkerInterface.h" // TODO move the functions that depend on this out of here and into BlobWorkerInterface.h to remove this dependency
#include "fdbclient/StorageServerInterface.h"
@ -133,6 +134,10 @@ void decodeKeyServersValue(RangeResult result,
bool missingIsError = true);
bool isSystemKey(KeyRef key);
extern const KeyRef accumulativeChecksumKey;
const Value accumulativeChecksumValue(const AccumulativeChecksumState& acsState);
AccumulativeChecksumState decodeAccumulativeChecksum(const ValueRef& value);
extern const KeyRangeRef auditKeys;
extern const KeyRef auditPrefix;
extern const KeyRangeRef auditRanges;
@ -613,6 +618,9 @@ extern const KeyRangeRef applyLogKeys;
// Returns true if m is a blog (backup log) or alog (apply log) mutation
bool isBackupLogMutation(const MutationRef& m);
// Returns true if m is an acs mutation: a mutation carrying accumulative checksum value
bool isAccumulativeChecksumMutation(const MutationRef& m);
extern const KeyRef backupVersionKey;
extern const ValueRef backupVersionValue;
extern const int backupVersion;

View File

@ -21,32 +21,283 @@
#include "fdbserver/AccumulativeChecksumUtil.h"
#include "fdbserver/Knobs.h"
uint16_t getCommitProxyAccumulativeChecksumIndex(uint16_t commitProxyIndex) {
// We leave flexibility in acs index generated from different components
// Acs index ends with 1 indicates the mutation is from a commit proxy
return commitProxyIndex * 10 + 1;
void updateMutationWithAcsAndAddMutationToAcsBuilder(std::shared_ptr<AccumulativeChecksumBuilder> acsBuilder,
MutationRef& mutation,
Tag inputTag,
uint16_t acsIndex,
LogEpoch epoch,
Version commitVersion,
UID commitProxyId) {
mutation.populateChecksum();
mutation.setAccumulativeChecksumIndex(acsIndex);
acsBuilder->addMutation(mutation, inputTag, epoch, commitProxyId, commitVersion);
return;
}
bool validateAccumulativeChecksumIndexAtStorageServer(MutationRef m) {
if (m.checksum.present() && CLIENT_KNOBS->ENABLE_ACCUMULATIVE_CHECKSUM &&
(!m.accumulativeChecksumIndex.present())) {
TraceEvent(SevError, "ACSIndexNotPresent").detail("Mutation", m);
return false;
} else if (m.checksum.present() && CLIENT_KNOBS->ENABLE_ACCUMULATIVE_CHECKSUM &&
m.accumulativeChecksumIndex.present() &&
m.accumulativeChecksumIndex.get() == invalidAccumulativeChecksumIndex) {
TraceEvent(SevError, "ACSIndexNotSet").detail("Mutation", m);
return false;
void updateMutationWithAcsAndAddMutationToAcsBuilder(std::shared_ptr<AccumulativeChecksumBuilder> acsBuilder,
MutationRef& mutation,
const std::vector<Tag>& inputTags,
uint16_t acsIndex,
LogEpoch epoch,
Version commitVersion,
UID commitProxyId) {
mutation.populateChecksum();
mutation.setAccumulativeChecksumIndex(acsIndex);
for (const auto& inputTag : inputTags) {
acsBuilder->addMutation(mutation, inputTag, epoch, commitProxyId, commitVersion);
}
return true;
return;
}
void updateMutationWithAcsAndAddMutationToAcsBuilder(std::shared_ptr<AccumulativeChecksumBuilder> acsBuilder,
MutationRef& mutation,
const std::set<Tag>& inputTags,
uint16_t acsIndex,
LogEpoch epoch,
Version commitVersion,
UID commitProxyId) {
mutation.populateChecksum();
mutation.setAccumulativeChecksumIndex(acsIndex);
for (const auto& inputTag : inputTags) {
acsBuilder->addMutation(mutation, inputTag, epoch, commitProxyId, commitVersion);
}
return;
}
// Folds the checksum of `mutation` into the acs entry kept for `tag`.
// Tags for which tagSupportAccumulativeChecksum() returns false are ignored.
// Requires both the mutation-checksum and accumulative-checksum knobs to be enabled.
void AccumulativeChecksumBuilder::addMutation(const MutationRef& mutation,
                                              Tag tag,
                                              LogEpoch epoch,
                                              UID commitProxyId,
                                              Version commitVersion) {
    ASSERT(CLIENT_KNOBS->ENABLE_MUTATION_CHECKSUM && CLIENT_KNOBS->ENABLE_ACCUMULATIVE_CHECKSUM);
    if (!tagSupportAccumulativeChecksum(tag)) {
        return;
    }
    // Remember the pre-update value (0 when the tag has no entry yet); it is only reported in the trace.
    const auto existing = acsTable.find(tag);
    const uint32_t previousAcs = (existing == acsTable.end()) ? 0 : existing->second.acs;
    const uint32_t updatedAcs = updateTable(tag, mutation.checksum.get(), commitVersion, epoch);
    if (!CLIENT_KNOBS->ENABLE_ACCUMULATIVE_CHECKSUM_LOGGING) {
        return;
    }
    TraceEvent(SevInfo, "AcsBuilderAddMutation", commitProxyId)
        .detail("AcsTag", tag)
        .detail("AcsIndex", mutation.accumulativeChecksumIndex.get())
        .detail("CommitVersion", commitVersion)
        .detail("OldAcs", previousAcs)
        .detail("NewAcs", updatedAcs)
        .detail("Mutation", mutation);
}
// Updates the acs entry for `tag` with one more mutation checksum and returns the new acs.
// A tag without an entry starts from `checksum` itself; otherwise the existing acs is combined
// with `checksum` via calculateAccumulativeChecksum(). Also advances currentVersion to `version`.
uint32_t AccumulativeChecksumBuilder::updateTable(Tag tag, uint32_t checksum, Version version, LogEpoch epoch) {
    ASSERT(CLIENT_KNOBS->ENABLE_MUTATION_CHECKSUM && CLIENT_KNOBS->ENABLE_ACCUMULATIVE_CHECKSUM);
    auto entry = acsTable.find(tag);
    if (entry != acsTable.end()) {
        // Versions must not go backwards, neither for this tag nor for the builder as a whole.
        ASSERT(version >= entry->second.version);
        ASSERT(version >= currentVersion);
        const uint32_t accumulated = calculateAccumulativeChecksum(entry->second.acs, checksum);
        entry->second = AccumulativeChecksumState(acsIndex, accumulated, version, epoch);
        currentVersion = version;
        return accumulated;
    }
    acsTable[tag] = AccumulativeChecksumState(acsIndex, checksum, version, epoch);
    currentVersion = version;
    return checksum;
}
// Drops any acs entry currently stored for `tag`, so that the next mutation added for this tag
// starts a fresh entry. Optionally traces whether an entry existed.
void AccumulativeChecksumBuilder::newTag(Tag tag, UID ssid, Version commitVersion) {
    ASSERT(CLIENT_KNOBS->ENABLE_MUTATION_CHECKSUM && CLIENT_KNOBS->ENABLE_ACCUMULATIVE_CHECKSUM);
    const bool hadEntry = acsTable.erase(tag) > 0;
    if (!CLIENT_KNOBS->ENABLE_ACCUMULATIVE_CHECKSUM_LOGGING) {
        return;
    }
    TraceEvent(SevInfo, "AcsBuilderNewAcsTag")
        .detail("AcsIndex", acsIndex)
        .detail("AcsTag", tag)
        .detail("CommitVersion", commitVersion)
        .detail("Exist", hadEntry)
        .detail("SSID", ssid);
}
void AccumulativeChecksumValidator::addMutation(const MutationRef& mutation, UID ssid, Tag tag, Version ssVersion) {
ASSERT(CLIENT_KNOBS->ENABLE_MUTATION_CHECKSUM && CLIENT_KNOBS->ENABLE_ACCUMULATIVE_CHECKSUM);
ASSERT(mutation.checksum.present() && mutation.accumulativeChecksumIndex.present());
const uint16_t& acsIndex = mutation.accumulativeChecksumIndex.get();
Version atAcsVersion = 0;
if (!mutationBuffer.empty()) {
ASSERT(mutationBuffer[0].accumulativeChecksumIndex.present());
if (mutationBuffer[0].accumulativeChecksumIndex.get() != acsIndex) {
TraceEvent(SevError, "AcsValidatorMissingAcs", ssid)
.detail("AcsTag", tag)
.detail("AcsIndex", acsIndex)
.detail("MissingAcsIndex", mutationBuffer[0].accumulativeChecksumIndex.get())
.detail("Mutation", mutation.toString())
.detail("LastAcsVersion", atAcsVersion)
.detail("SSVersion", ssVersion);
}
}
mutationBuffer.push_back(mutationBuffer.arena(), mutation);
totalAddedMutations++;
if (CLIENT_KNOBS->ENABLE_ACCUMULATIVE_CHECKSUM_LOGGING) {
TraceEvent(SevInfo, "AcsValidatorAddMutation", ssid)
.detail("AcsTag", tag)
.detail("AcsIndex", acsIndex)
.detail("Mutation", mutation.toString())
.detail("LastAcsVersion", atAcsVersion)
.detail("SSVersion", ssVersion);
}
}
// Validates the buffered mutations against an incoming acs mutation.
//
// acsMutationState: the acs state decoded from the acs mutation.
// ssid, tag, ssVersion: only used to annotate trace events.
//
// Returns:
//  - acsMutationState when it has been accepted (either there was no entry for its acs index,
//    or the buffered mutations aggregate to the same acs value);
//  - an empty Optional when the acs mutation is older (by version or epoch) than the stored entry.
// Throws please_reboot() when the aggregated checksum does not match the acs mutation.
// The mutation buffer is cleared on every non-throwing path.
Optional<AccumulativeChecksumState> AccumulativeChecksumValidator::processAccumulativeChecksum(
    const AccumulativeChecksumState& acsMutationState,
    UID ssid,
    Tag tag,
    Version ssVersion) {
    ASSERT(CLIENT_KNOBS->ENABLE_MUTATION_CHECKSUM && CLIENT_KNOBS->ENABLE_ACCUMULATIVE_CHECKSUM);
    const LogEpoch& epoch = acsMutationState.epoch;
    const uint16_t& acsIndex = acsMutationState.acsIndex;
    auto it = acsTable.find(acsIndex);
    if (it == acsTable.end()) {
        // Unexpected. Since we assign acs mutation in commit batch
        // So, there must be acs entry set up when adding the mutations of the batch
        acsTable[acsIndex] = acsMutationState;
        mutationBuffer.clear();
        if (CLIENT_KNOBS->ENABLE_ACCUMULATIVE_CHECKSUM_LOGGING) {
            TraceEvent(SevError, "AcsValidatorAcsMutationSkip", ssid)
                .detail("Reason", "No Entry")
                .detail("AcsTag", tag)
                .detail("AcsIndex", acsIndex)
                .detail("SSVersion", ssVersion)
                .detail("Epoch", epoch);
        }
        return acsMutationState;
    }
    if ((acsMutationState.version < it->second.version || acsMutationState.epoch < it->second.epoch)) {
        mutationBuffer.clear();
        if (CLIENT_KNOBS->ENABLE_ACCUMULATIVE_CHECKSUM_LOGGING) {
            TraceEvent(SevError, "AcsValidatorAcsMutationSkip", ssid)
                .detail("Reason", "Acs Mutation Too Old")
                .detail("AcsTag", tag)
                .detail("AcsIndex", acsIndex)
                .detail("SSVersion", ssVersion)
                .detail("AcsMutation", acsMutationState.toString())
                .detail("Epoch", epoch);
        }
        return Optional<AccumulativeChecksumState>();
    }
    // Clear the old acs state if new epoch comes.
    // Everything needed from the old entry is read BEFORE erasing it: erase() invalidates `it`,
    // so the iterator must not be dereferenced afterwards.
    const bool cleared = acsMutationState.epoch > it->second.epoch;
    const uint32_t oldAcs = cleared ? initialAccumulativeChecksum : it->second.acs;
    const Version oldVersion = cleared ? 0 : it->second.version;
    if (cleared) {
        acsTable.erase(it);
    }
    // Apply mutations in cache to acs
    ASSERT(mutationBuffer.size() >= 1);
    uint32_t newAcs = aggregateAcs(oldAcs, mutationBuffer);
    checkedMutations = checkedMutations + mutationBuffer.size();
    checkedVersions = checkedVersions + 1;
    Version newVersion = acsMutationState.version;
    if (newAcs != acsMutationState.acs) {
        TraceEvent(SevError, "AcsValidatorAcsMutationMismatch", ssid)
            .detail("AcsTag", tag)
            .detail("AcsIndex", acsIndex)
            .detail("SSVersion", ssVersion)
            .detail("FromAcs", oldAcs)
            .detail("FromVersion", oldVersion)
            .detail("ToAcs", newAcs)
            .detail("ToVersion", newVersion)
            .detail("AcsToValidate", acsMutationState.acs)
            .detail("Epoch", acsMutationState.epoch)
            .detail("Cleared", cleared);
        throw please_reboot();
    } else {
        if (CLIENT_KNOBS->ENABLE_ACCUMULATIVE_CHECKSUM_LOGGING) {
            TraceEvent(SevInfo, "AcsValidatorAcsMutationValidated", ssid)
                .detail("AcsTag", tag)
                .detail("AcsIndex", acsIndex)
                .detail("SSVersion", ssVersion)
                .detail("FromAcs", oldAcs)
                .detail("FromVersion", oldVersion)
                .detail("ToAcs", newAcs)
                .detail("ToVersion", newVersion)
                .detail("Epoch", acsMutationState.epoch)
                .detail("Cleared", cleared);
        }
    }
    // Store by key rather than through `it`: the entry may have been erased above, in which case
    // this re-creates it; otherwise it overwrites the existing entry.
    acsTable[acsIndex] = acsMutationState;
    mutationBuffer.clear();
    return acsMutationState;
}
// Installs `acsState` as the table entry for its acs index, replacing any existing entry.
// ssid, tag and ssVersion are only used for the optional trace event.
void AccumulativeChecksumValidator::restore(const AccumulativeChecksumState& acsState,
                                            UID ssid,
                                            Tag tag,
                                            Version ssVersion) {
    ASSERT(CLIENT_KNOBS->ENABLE_MUTATION_CHECKSUM && CLIENT_KNOBS->ENABLE_ACCUMULATIVE_CHECKSUM);
    const uint16_t& restoredIndex = acsState.acsIndex;
    acsTable[restoredIndex] = acsState;
    if (!CLIENT_KNOBS->ENABLE_ACCUMULATIVE_CHECKSUM_LOGGING) {
        return;
    }
    TraceEvent(SevInfo, "AcsValidatorRestore", ssid)
        .detail("AcsIndex", restoredIndex)
        .detail("AcsTag", tag)
        .detail("AcsState", acsState.toString())
        .detail("SSVersion", ssVersion)
        .detail("Epoch", acsState.epoch);
}
// Reports an error if mutations are still buffered, i.e. they were never validated against
// an acs mutation.
// NOTE(review): despite the name, this does not empty mutationBuffer; confirm whether callers
// rely on that.
void AccumulativeChecksumValidator::clearCache(UID ssid, Tag tag, Version ssVersion) {
    ASSERT(CLIENT_KNOBS->ENABLE_MUTATION_CHECKSUM && CLIENT_KNOBS->ENABLE_ACCUMULATIVE_CHECKSUM);
    if (mutationBuffer.empty()) {
        return;
    }
    TraceEvent(SevError, "AcsValidatorCachedMutationNotChecked", ssid)
        .detail("AcsTag", tag)
        .detail("SSVersion", ssVersion);
}
// Returns the checkedMutations counter and resets it to zero.
uint64_t AccumulativeChecksumValidator::getAndClearCheckedMutations() {
    ASSERT(CLIENT_KNOBS->ENABLE_MUTATION_CHECKSUM && CLIENT_KNOBS->ENABLE_ACCUMULATIVE_CHECKSUM);
    const uint64_t snapshot = checkedMutations;
    checkedMutations = 0;
    return snapshot;
}
// Returns the checkedVersions counter and resets it to zero.
uint64_t AccumulativeChecksumValidator::getAndClearCheckedVersions() {
    ASSERT(CLIENT_KNOBS->ENABLE_MUTATION_CHECKSUM && CLIENT_KNOBS->ENABLE_ACCUMULATIVE_CHECKSUM);
    const uint64_t snapshot = checkedVersions;
    checkedVersions = 0;
    return snapshot;
}
// Returns the totalMutations counter and resets it to zero.
uint64_t AccumulativeChecksumValidator::getAndClearTotalMutations() {
    ASSERT(CLIENT_KNOBS->ENABLE_MUTATION_CHECKSUM && CLIENT_KNOBS->ENABLE_ACCUMULATIVE_CHECKSUM);
    const uint64_t snapshot = totalMutations;
    totalMutations = 0;
    return snapshot;
}
// Returns the totalAcsMutations counter and resets it to zero.
uint64_t AccumulativeChecksumValidator::getAndClearTotalAcsMutations() {
    ASSERT(CLIENT_KNOBS->ENABLE_MUTATION_CHECKSUM && CLIENT_KNOBS->ENABLE_ACCUMULATIVE_CHECKSUM);
    const uint64_t snapshot = totalAcsMutations;
    totalAcsMutations = 0;
    return snapshot;
}
// Returns the totalAddedMutations counter and resets it to zero.
uint64_t AccumulativeChecksumValidator::getAndClearTotalAddedMutations() {
    ASSERT(CLIENT_KNOBS->ENABLE_MUTATION_CHECKSUM && CLIENT_KNOBS->ENABLE_ACCUMULATIVE_CHECKSUM);
    const uint64_t snapshot = totalAddedMutations;
    totalAddedMutations = 0;
    return snapshot;
}
TEST_CASE("noSim/AccumulativeChecksum/MutationRef") {
printf("testing MutationRef encoding/decoding\n");
MutationRef m(MutationRef::SetValue, "TestKey"_sr, "TestValue"_sr);
if (CLIENT_KNOBS->ENABLE_ACCUMULATIVE_CHECKSUM) {
m.setAccumulativeChecksumIndex(512);
}
m.setAccumulativeChecksumIndex(512);
BinaryWriter wr(AssumeVersion(ProtocolVersion::withMutationChecksum()));
wr << m;
@ -74,5 +325,26 @@ TEST_CASE("noSim/AccumulativeChecksum/MutationRef") {
ASSERT(de.validateChecksum());
Standalone<MutationRef> acsMutation;
LogEpoch epoch = 0;
uint16_t acsIndex = 1;
Standalone<StringRef> param2 = accumulativeChecksumValue(AccumulativeChecksumState(acsIndex, 1, 20, epoch));
acsMutation.type = MutationRef::SetValue;
acsMutation.param1 = accumulativeChecksumKey;
acsMutation.param2 = param2;
acsMutation.setAccumulativeChecksumIndex(1);
acsMutation.populateChecksum();
BinaryWriter acsWr(IncludeVersion());
acsWr << acsMutation;
Standalone<StringRef> acsValue = acsWr.toValue();
BinaryReader acsRd(acsValue, IncludeVersion());
Standalone<MutationRef> acsDe;
acsRd >> acsDe;
if (acsDe.type != MutationRef::SetValue || acsDe.param1 != accumulativeChecksumKey || acsDe.param2 != param2) {
TraceEvent(SevError, "AcsMutationMismatch");
ASSERT(false);
}
ASSERT(acsDe.validateChecksum());
return Void();
}

View File

@ -63,7 +63,7 @@ public:
const VectorRef<MutationRef>& mutations_,
IKeyValueStore* txnStateStore_)
: spanContext(spanContext_), dbgid(dbgid_), arena(arena_), mutations(mutations_), txnStateStore(txnStateStore_),
confChange(dummyConfChange), encryptMode(EncryptionAtRestMode::DISABLED) {}
confChange(dummyConfChange), encryptMode(EncryptionAtRestMode::DISABLED), epoch(Optional<LogEpoch>()) {}
ApplyMetadataMutationsImpl(const SpanContext& spanContext_,
Arena& arena_,
@ -89,7 +89,8 @@ public:
tssMapping(&proxyCommitData_.tssMapping), tenantMap(&proxyCommitData_.tenantMap),
tenantNameIndex(&proxyCommitData_.tenantNameIndex), lockedTenants(&proxyCommitData_.lockedTenants),
initialCommit(initialCommit_), provisionalCommitProxy(provisionalCommitProxy_),
accumulativeChecksumIndex(getCommitProxyAccumulativeChecksumIndex(proxyCommitData_.commitProxyIndex)) {
accumulativeChecksumIndex(getCommitProxyAccumulativeChecksumIndex(proxyCommitData_.commitProxyIndex)),
acsBuilder(proxyCommitData_.acsBuilder), epoch(proxyCommitData_.epoch) {
if (encryptMode.isEncryptionEnabled()) {
ASSERT(cipherKeys != nullptr);
ASSERT(cipherKeys->count(SYSTEM_KEYSPACE_ENCRYPT_DOMAIN_ID) > 0);
@ -97,6 +98,8 @@ public:
ASSERT(cipherKeys->count(ENCRYPT_HEADER_DOMAIN_ID));
}
}
// If commit proxy, epoch must be set
ASSERT(toCommit == nullptr || epoch.present());
}
ApplyMetadataMutationsImpl(const SpanContext& spanContext_,
@ -109,7 +112,7 @@ public:
toCommit(resolverData_.toCommit), confChange(resolverData_.confChanges), logSystem(resolverData_.logSystem),
popVersion(resolverData_.popVersion), keyInfo(resolverData_.keyInfo), storageCache(resolverData_.storageCache),
initialCommit(resolverData_.initialCommit), forResolver(true),
accumulativeChecksumIndex(resolverAccumulativeChecksumIndex) {
accumulativeChecksumIndex(resolverAccumulativeChecksumIndex), epoch(Optional<LogEpoch>()) {
if (encryptMode.isEncryptionEnabled()) {
ASSERT(cipherKeys != nullptr);
ASSERT(cipherKeys->count(SYSTEM_KEYSPACE_ENCRYPT_DOMAIN_ID) > 0);
@ -173,6 +176,10 @@ private:
// indicate which commit proxy / resolver applies mutations
uint16_t accumulativeChecksumIndex = invalidAccumulativeChecksumIndex;
std::shared_ptr<AccumulativeChecksumBuilder> acsBuilder = nullptr;
Optional<LogEpoch> epoch;
private:
// The following variables are used internally
@ -265,7 +272,6 @@ private:
txnStateStore->readValue(serverTagKeyFor(serverKeysDecodeServer(m.param1))).get().get());
MutationRef privatized = m;
privatized.clearChecksumAndAccumulativeIndex();
privatized.setAccumulativeChecksumIndex(accumulativeChecksumIndex);
privatized.param1 = m.param1.withPrefix(systemKeys.begin, arena);
TraceEvent(SevDebug, "SendingPrivateMutation", dbgid)
.detail("Original", m)
@ -274,6 +280,10 @@ private:
.detail("TagKey", serverTagKeyFor(serverKeysDecodeServer(m.param1)))
.detail("Tag", tag.toString());
if (acsBuilder != nullptr) {
updateMutationWithAcsAndAddMutationToAcsBuilder(
acsBuilder, privatized, tag, accumulativeChecksumIndex, epoch.get(), version, dbgid);
}
toCommit->addTag(tag);
writeMutation(privatized);
}
@ -287,14 +297,23 @@ private:
UID id = decodeServerTagKey(m.param1);
Tag tag = decodeServerTagValue(m.param2);
// At this point, this tag will be visible to others
// So, acsBuilder should create a brand new acsState for this tag
// If there exists an old acsState, overwrite it
if (acsBuilder != nullptr) {
acsBuilder->newTag(tag, id, version);
}
if (toCommit) {
MutationRef privatized = m;
privatized.clearChecksumAndAccumulativeIndex();
privatized.setAccumulativeChecksumIndex(accumulativeChecksumIndex);
privatized.param1 = m.param1.withPrefix(systemKeys.begin, arena);
TraceEvent("ServerTag", dbgid).detail("Server", id).detail("Tag", tag.toString());
TraceEvent(SevDebug, "SendingPrivatized_ServerTag", dbgid).detail("M", "LogProtocolMessage");
if (acsBuilder != nullptr) {
updateMutationWithAcsAndAddMutationToAcsBuilder(
acsBuilder, privatized, tag, accumulativeChecksumIndex, epoch.get(), version, dbgid);
}
toCommit->addTag(tag);
toCommit->writeTypedMessage(LogProtocolMessage(), true);
TraceEvent(SevDebug, "SendingPrivatized_ServerTag", dbgid).detail("M", privatized);
@ -335,7 +354,6 @@ private:
if (toCommit) {
MutationRef privatized = m;
privatized.clearChecksumAndAccumulativeIndex();
privatized.setAccumulativeChecksumIndex(accumulativeChecksumIndex);
privatized.param1 = m.param1.withPrefix(systemKeys.begin, arena);
//TraceEvent(SevDebug, "SendingPrivateMutation", dbgid).detail("Original", m.toString()).detail("Privatized", privatized.toString());
cachedRangeInfo[k] = privatized;
@ -359,9 +377,12 @@ private:
// This is done to make the cache servers aware of the cached key-ranges
MutationRef privatized = m;
privatized.clearChecksumAndAccumulativeIndex();
privatized.setAccumulativeChecksumIndex(accumulativeChecksumIndex);
privatized.param1 = m.param1.withPrefix(systemKeys.begin, arena);
TraceEvent(SevDebug, "SendingPrivatized_CacheTag", dbgid).detail("M", privatized);
if (acsBuilder != nullptr) {
updateMutationWithAcsAndAddMutationToAcsBuilder(
acsBuilder, privatized, cacheTag, accumulativeChecksumIndex, epoch.get(), version, dbgid);
}
toCommit->addTag(cacheTag);
writeMutation(privatized);
}
@ -401,13 +422,21 @@ private:
KeyRange r = std::get<0>(decodeChangeFeedValue(m.param2));
MutationRef privatized = m;
privatized.clearChecksumAndAccumulativeIndex();
privatized.setAccumulativeChecksumIndex(accumulativeChecksumIndex);
privatized.param1 = m.param1.withPrefix(systemKeys.begin, arena);
auto ranges = keyInfo->intersectingRanges(r);
auto firstRange = ranges.begin();
++firstRange;
if (firstRange == ranges.end()) {
ranges.begin().value().populateTags();
if (acsBuilder != nullptr) {
updateMutationWithAcsAndAddMutationToAcsBuilder(acsBuilder,
privatized,
ranges.begin().value().tags,
accumulativeChecksumIndex,
epoch.get(),
version,
dbgid);
}
toCommit->addTags(ranges.begin().value().tags);
} else {
std::set<Tag> allSources;
@ -415,6 +444,10 @@ private:
r.value().populateTags();
allSources.insert(r.value().tags.begin(), r.value().tags.end());
}
if (acsBuilder != nullptr) {
updateMutationWithAcsAndAddMutationToAcsBuilder(
acsBuilder, privatized, allSources, accumulativeChecksumIndex, epoch.get(), version, dbgid);
}
toCommit->addTags(allSources);
}
TraceEvent(SevDebug, "SendingPrivatized_ChangeFeed", dbgid).detail("M", privatized);
@ -467,12 +500,20 @@ private:
// send private mutation to SS that it now has a TSS pair
MutationRef privatized = m;
privatized.clearChecksumAndAccumulativeIndex();
privatized.setAccumulativeChecksumIndex(accumulativeChecksumIndex);
privatized.param1 = m.param1.withPrefix(systemKeys.begin, arena);
Optional<Value> tagV = txnStateStore->readValue(serverTagKeyFor(ssId)).get();
if (tagV.present()) {
TraceEvent(SevDebug, "SendingPrivatized_TSSID", dbgid).detail("M", privatized);
if (acsBuilder != nullptr) {
updateMutationWithAcsAndAddMutationToAcsBuilder(acsBuilder,
privatized,
decodeServerTagValue(tagV.get()),
accumulativeChecksumIndex,
epoch.get(),
version,
dbgid);
}
toCommit->addTag(decodeServerTagValue(tagV.get()));
writeMutation(privatized);
}
@ -501,9 +542,17 @@ private:
if (tagV.present()) {
MutationRef privatized = m;
privatized.clearChecksumAndAccumulativeIndex();
privatized.setAccumulativeChecksumIndex(accumulativeChecksumIndex);
privatized.param1 = m.param1.withPrefix(systemKeys.begin, arena);
TraceEvent(SevDebug, "SendingPrivatized_TSSQuarantine", dbgid).detail("M", privatized);
if (acsBuilder != nullptr) {
updateMutationWithAcsAndAddMutationToAcsBuilder(acsBuilder,
privatized,
decodeServerTagValue(tagV.get()),
accumulativeChecksumIndex,
epoch.get(),
version,
dbgid);
}
toCommit->addTag(decodeServerTagValue(tagV.get()));
writeMutation(privatized);
}
@ -668,9 +717,12 @@ private:
MutationRef privatized = m;
privatized.clearChecksumAndAccumulativeIndex();
privatized.setAccumulativeChecksumIndex(accumulativeChecksumIndex);
privatized.param1 = m.param1.withPrefix(systemKeys.begin, arena);
TraceEvent(SevDebug, "SendingPrivatized_GlobalKeys", dbgid).detail("M", privatized);
if (acsBuilder != nullptr) {
updateMutationWithAcsAndAddMutationToAcsBuilder(
acsBuilder, privatized, allTags, accumulativeChecksumIndex, epoch.get(), version, dbgid);
}
toCommit->addTags(allTags);
writeMutation(privatized);
}
@ -693,7 +745,6 @@ private:
const Tag tag = decodeServerTagValue(tagValue.get());
MutationRef privatized = m;
privatized.clearChecksumAndAccumulativeIndex();
privatized.setAccumulativeChecksumIndex(accumulativeChecksumIndex);
privatized.param1 = m.param1.withPrefix(systemKeys.begin, arena);
TraceEvent("SendingPrivateMutationCheckpoint", dbgid)
.detail("Original", m)
@ -703,6 +754,11 @@ private:
.detail("Tag", tag.toString())
.detail("Checkpoint", checkpoint.toString());
if (acsBuilder != nullptr) {
updateMutationWithAcsAndAddMutationToAcsBuilder(
acsBuilder, privatized, tag, accumulativeChecksumIndex, epoch.get(), version, dbgid);
}
toCommit->addTag(tag);
writeMutation(privatized);
}
@ -811,8 +867,12 @@ private:
MutationRef privatized = m;
privatized.clearChecksumAndAccumulativeIndex();
privatized.setAccumulativeChecksumIndex(accumulativeChecksumIndex);
privatized.param1 = m.param1.withPrefix(systemKeys.begin, arena);
if (acsBuilder != nullptr) {
updateMutationWithAcsAndAddMutationToAcsBuilder(
acsBuilder, privatized, allTags, accumulativeChecksumIndex, epoch.get(), version, dbgid);
}
writeMutation(privatized);
}
@ -942,12 +1002,16 @@ private:
if (toCommit) {
MutationRef privatized = m;
privatized.clearChecksumAndAccumulativeIndex();
privatized.setAccumulativeChecksumIndex(accumulativeChecksumIndex);
privatized.param1 = kv.key.withPrefix(systemKeys.begin, arena);
privatized.param2 = keyAfter(privatized.param1, arena);
TraceEvent(SevDebug, "SendingPrivatized_ClearServerTag", dbgid).detail("M", privatized);
if (acsBuilder != nullptr) {
updateMutationWithAcsAndAddMutationToAcsBuilder(
acsBuilder, privatized, tag, accumulativeChecksumIndex, epoch.get(), version, dbgid);
}
toCommit->addTag(tag);
writeMutation(privatized);
}
@ -967,13 +1031,22 @@ private:
if (tagV.present()) {
MutationRef privatized = m;
privatized.clearChecksumAndAccumulativeIndex();
privatized.setAccumulativeChecksumIndex(accumulativeChecksumIndex);
privatized.param1 = maybeTssRange.begin.withPrefix(systemKeys.begin, arena);
privatized.param2 =
keyAfter(maybeTssRange.begin, arena).withPrefix(systemKeys.begin, arena);
TraceEvent(SevDebug, "SendingPrivatized_TSSClearServerTag", dbgid)
.detail("M", privatized);
if (acsBuilder != nullptr) {
updateMutationWithAcsAndAddMutationToAcsBuilder(acsBuilder,
privatized,
decodeServerTagValue(tagV.get()),
accumulativeChecksumIndex,
epoch.get(),
version,
dbgid);
}
toCommit->addTag(decodeServerTagValue(tagV.get()));
writeMutation(privatized);
}
@ -1166,10 +1239,18 @@ private:
if (Optional<Value> tagV = txnStateStore->readValue(serverTagKeyFor(ssId)).get(); tagV.present()) {
MutationRef privatized = m;
privatized.clearChecksumAndAccumulativeIndex();
privatized.setAccumulativeChecksumIndex(accumulativeChecksumIndex);
privatized.param1 = m.param1.withPrefix(systemKeys.begin, arena);
privatized.param2 = m.param2.withPrefix(systemKeys.begin, arena);
TraceEvent(SevDebug, "SendingPrivatized_ClearTSSMapping", dbgid).detail("M", privatized);
if (acsBuilder != nullptr) {
updateMutationWithAcsAndAddMutationToAcsBuilder(acsBuilder,
privatized,
decodeServerTagValue(tagV.get()),
accumulativeChecksumIndex,
epoch.get(),
version,
dbgid);
}
toCommit->addTag(decodeServerTagValue(tagV.get()));
writeMutation(privatized);
}
@ -1195,10 +1276,18 @@ private:
MutationRef privatized = m;
privatized.clearChecksumAndAccumulativeIndex();
privatized.setAccumulativeChecksumIndex(accumulativeChecksumIndex);
privatized.param1 = m.param1.withPrefix(systemKeys.begin, arena);
privatized.param2 = m.param2.withPrefix(systemKeys.begin, arena);
TraceEvent(SevDebug, "SendingPrivatized_ClearTSSQuarantine", dbgid).detail("M", privatized);
if (acsBuilder != nullptr) {
updateMutationWithAcsAndAddMutationToAcsBuilder(acsBuilder,
privatized,
decodeServerTagValue(tagV.get()),
accumulativeChecksumIndex,
epoch.get(),
version,
dbgid);
}
toCommit->addTag(decodeServerTagValue(tagV.get()));
writeMutation(privatized);
}
@ -1277,7 +1366,6 @@ private:
MutationRef privatized;
privatized.clearChecksumAndAccumulativeIndex();
privatized.setAccumulativeChecksumIndex(accumulativeChecksumIndex);
privatized.type = MutationRef::ClearRange;
privatized.param1 = systemKeys.begin.withSuffix(std::max(range.begin, subspace.begin), arena);
if (range.end < subspace.end) {
@ -1285,6 +1373,10 @@ private:
} else {
privatized.param2 = systemKeys.begin.withSuffix(subspace.begin).withSuffix("\xff\xff"_sr, arena);
}
if (acsBuilder != nullptr) {
updateMutationWithAcsAndAddMutationToAcsBuilder(
acsBuilder, privatized, allTags, accumulativeChecksumIndex, epoch.get(), version, dbgid);
}
writeMutation(privatized);
}
@ -1401,8 +1493,16 @@ private:
TraceEvent(SevDebug, "SendingPrivatized_CachedKeyRange", dbgid)
.detail("MBegin", mutationBegin)
.detail("MEnd", mutationEnd);
if (acsBuilder != nullptr) {
updateMutationWithAcsAndAddMutationToAcsBuilder(
acsBuilder, mutationBegin, allTags, accumulativeChecksumIndex, epoch.get(), version, dbgid);
}
toCommit->addTags(allTags);
writeMutation(mutationBegin);
if (acsBuilder != nullptr) {
updateMutationWithAcsAndAddMutationToAcsBuilder(
acsBuilder, mutationEnd, allTags, accumulativeChecksumIndex, epoch.get(), version, dbgid);
}
toCommit->addTags(allTags);
writeMutation(mutationEnd);
}

View File

@ -22,6 +22,7 @@
#include <tuple>
#include <variant>
#include "fdbclient/AccumulativeChecksum.h"
#include "fdbclient/Atomic.h"
#include "fdbclient/BackupAgent.actor.h"
#include "fdbclient/BlobCipher.h"
@ -562,10 +563,6 @@ ACTOR Future<Void> addBackupMutations(ProxyCommitData* self,
backupMutation.param2 = val.substr(
part * CLIENT_KNOBS->MUTATION_BLOCK_SIZE,
std::min(val.size() - part * CLIENT_KNOBS->MUTATION_BLOCK_SIZE, CLIENT_KNOBS->MUTATION_BLOCK_SIZE));
if (CLIENT_KNOBS->ENABLE_ACCUMULATIVE_CHECKSUM) {
backupMutation.setAccumulativeChecksumIndex(
getCommitProxyAccumulativeChecksumIndex(self->commitProxyIndex));
}
// Write the last part of the mutation to the serialization, if the buffer is not defined
if (!partBuffer) {
@ -585,6 +582,18 @@ ACTOR Future<Void> addBackupMutations(ProxyCommitData* self,
auto& tags = self->tagsForKey(backupMutation.param1);
toCommit->addTags(tags);
if (self->acsBuilder != nullptr) {
updateMutationWithAcsAndAddMutationToAcsBuilder(
self->acsBuilder,
backupMutation,
tags,
getCommitProxyAccumulativeChecksumIndex(self->commitProxyIndex),
self->epoch,
commitVersion,
self->dbgid);
}
toCommit->writeTypedMessage(backupMutation);
// if (DEBUG_MUTATION("BackupProxyCommit", commitVersion, backupMutation)) {
@ -1939,6 +1948,38 @@ double pushToBackupMutations(CommitBatchContext* self,
return encryptionTime;
}
// Issues one accumulative checksum (ACS) mutation per tag whose ACS state was updated at the
// current commit version. Each ACS mutation is a private SetValue on accumulativeChecksumKey
// whose value encodes (acsIndex, acs, commitVersion, epoch); it is tagged with the tag it
// describes and written into self->toCommit.
//
// Precondition: the commit proxy has an ACS builder (asserted).
void addAccumulativeChecksumMutations(CommitBatchContext* self) {
	ASSERT(self->pProxyCommitData->acsBuilder != nullptr);
	const uint16_t acsIndex = getCommitProxyAccumulativeChecksumIndex(self->pProxyCommitData->commitProxyIndex);
	for (const auto& [tag, acsState] : self->pProxyCommitData->acsBuilder->getAcsTable()) {
		ASSERT(tagSupportAccumulativeChecksum(tag));
		ASSERT(acsState.version <= self->commitVersion);
		if (acsState.version < self->commitVersion) {
			// Have not updated in the current commit batch
			// So, need not send acs mutation for this tag
			continue;
		}
		ASSERT(acsState.epoch == self->pProxyCommitData->epoch);
		// MutationRef::param2 does not own its bytes. Keep the encoded value in a named local so
		// that the memory it points at stays valid until the mutation has been traced and
		// serialized below. Assigning the helper's temporary result directly would leave param2
		// referencing storage released at the end of that statement.
		// NOTE(review): assumes accumulativeChecksumValue() returns an arena-owning Value - confirm.
		const auto acsValue = accumulativeChecksumValue(
		    AccumulativeChecksumState(acsIndex, acsState.acs, self->commitVersion, self->pProxyCommitData->epoch));
		MutationRef acsMutation;
		acsMutation.type = MutationRef::SetValue;
		acsMutation.param1 = accumulativeChecksumKey; // private mutation
		acsMutation.param2 = acsValue;
		acsMutation.setAccumulativeChecksumIndex(acsIndex);
		if (CLIENT_KNOBS->ENABLE_ACCUMULATIVE_CHECKSUM_LOGGING) {
			TraceEvent(SevInfo, "AcsBuilderIssueAccumulativeChecksumMutation", self->pProxyCommitData->dbgid)
			    .detail("Acs", acsState.acs)
			    .detail("AcsTag", tag)
			    .detail("AcsIndex", acsIndex)
			    .detail("CommitVersion", self->commitVersion)
			    .detail("Mutation", acsMutation)
			    .detail("CommitProxyIndex", self->pProxyCommitData->commitProxyIndex);
		}
		self->toCommit.addTag(tag);
		self->toCommit.writeTypedMessage(acsMutation);
	}
}
/// This second pass through committed transactions assigns the actual mutations to the appropriate storage servers'
/// tags
ACTOR Future<Void> assignMutationsToStorageServers(CommitBatchContext* self) {
@ -1983,10 +2024,6 @@ ACTOR Future<Void> assignMutationsToStorageServers(CommitBatchContext* self) {
}
state MutationRef m = (*pMutations)[mutationNum];
if (CLIENT_KNOBS->ENABLE_ACCUMULATIVE_CHECKSUM) {
m.setAccumulativeChecksumIndex(
getCommitProxyAccumulativeChecksumIndex(self->pProxyCommitData->commitProxyIndex));
}
state Optional<MutationRef> encryptedMutation =
encryptedMutations->size() > 0 ? (*encryptedMutations)[mutationNum] : Optional<MutationRef>();
state Arena arena;
@ -2040,6 +2077,17 @@ ACTOR Future<Void> assignMutationsToStorageServers(CommitBatchContext* self) {
ASSERT(encryptedMutation.get().isEncrypted());
}
if (pProxyCommitData->acsBuilder != nullptr) {
updateMutationWithAcsAndAddMutationToAcsBuilder(
pProxyCommitData->acsBuilder,
m,
tags,
getCommitProxyAccumulativeChecksumIndex(pProxyCommitData->commitProxyIndex),
pProxyCommitData->epoch,
self->commitVersion,
pProxyCommitData->dbgid);
}
WriteMutationRefVar var =
wait(writeMutation(self, encryptDomain, &m, &encryptedMutation, &arena, &curEncryptionTime));
totalEncryptionTime += curEncryptionTime;
@ -2058,6 +2106,17 @@ ACTOR Future<Void> assignMutationsToStorageServers(CommitBatchContext* self) {
ranges.begin().value().populateTags();
self->toCommit.addTags(ranges.begin().value().tags);
if (pProxyCommitData->acsBuilder != nullptr) {
updateMutationWithAcsAndAddMutationToAcsBuilder(
pProxyCommitData->acsBuilder,
m,
ranges.begin().value().tags,
getCommitProxyAccumulativeChecksumIndex(pProxyCommitData->commitProxyIndex),
pProxyCommitData->epoch,
self->commitVersion,
pProxyCommitData->dbgid);
}
// check whether clear is sampled
if (checkSample && !trCost->get().clearIdxCosts.empty() &&
trCost->get().clearIdxCosts[0].first == mutationNum) {
@ -2098,6 +2157,17 @@ ACTOR Future<Void> assignMutationsToStorageServers(CommitBatchContext* self) {
.detail("Dbgid", pProxyCommitData->dbgid)
.detail("To", allSources);
self->toCommit.addTags(allSources);
if (self->pProxyCommitData->acsBuilder != nullptr) {
updateMutationWithAcsAndAddMutationToAcsBuilder(
pProxyCommitData->acsBuilder,
m,
allSources,
getCommitProxyAccumulativeChecksumIndex(pProxyCommitData->commitProxyIndex),
pProxyCommitData->epoch,
self->commitVersion,
pProxyCommitData->dbgid);
}
}
if (pProxyCommitData->needsCacheTag(clearRange)) {
@ -2231,10 +2301,6 @@ ACTOR Future<Void> postResolution(CommitBatchContext* self) {
idempotencyIdSet.type = MutationRef::Type::SetValue;
idempotencyIdSet.param1 = kv.key;
idempotencyIdSet.param2 = kv.value;
if (CLIENT_KNOBS->ENABLE_ACCUMULATIVE_CHECKSUM) {
idempotencyIdSet.setAccumulativeChecksumIndex(
getCommitProxyAccumulativeChecksumIndex(self->pProxyCommitData->commitProxyIndex));
}
auto& tags = pProxyCommitData->tagsForKey(kv.key);
self->toCommit.addTags(tags);
if (self->pProxyCommitData->encryptMode.isEncryptionEnabled()) {
@ -2246,6 +2312,16 @@ ACTOR Future<Void> postResolution(CommitBatchContext* self) {
ASSERT(encryptedMutation.isEncrypted());
self->toCommit.writeTypedMessage(encryptedMutation);
} else {
if (pProxyCommitData->acsBuilder != nullptr) {
updateMutationWithAcsAndAddMutationToAcsBuilder(
pProxyCommitData->acsBuilder,
idempotencyIdSet,
tags,
getCommitProxyAccumulativeChecksumIndex(pProxyCommitData->commitProxyIndex),
pProxyCommitData->epoch,
self->commitVersion,
pProxyCommitData->dbgid);
}
self->toCommit.writeTypedMessage(idempotencyIdSet);
}
});
@ -2255,9 +2331,15 @@ ACTOR Future<Void> postResolution(CommitBatchContext* self) {
self->toCommit.addTags(tags);
// We already have an arena with an appropriate lifetime handy
Arena& arena = pProxyCommitData->idempotencyClears.arena();
if (CLIENT_KNOBS->ENABLE_ACCUMULATIVE_CHECKSUM) {
pProxyCommitData->idempotencyClears[i].setAccumulativeChecksumIndex(
getCommitProxyAccumulativeChecksumIndex(self->pProxyCommitData->commitProxyIndex));
if (pProxyCommitData->acsBuilder != nullptr) {
updateMutationWithAcsAndAddMutationToAcsBuilder(
pProxyCommitData->acsBuilder,
pProxyCommitData->idempotencyClears[i],
tags,
getCommitProxyAccumulativeChecksumIndex(pProxyCommitData->commitProxyIndex),
pProxyCommitData->epoch,
self->commitVersion,
pProxyCommitData->dbgid);
}
WriteMutationRefVar var = wait(writeMutation(
self, SYSTEM_KEYSPACE_ENCRYPT_DOMAIN_ID, &pProxyCommitData->idempotencyClears[i], nullptr, &arena));
@ -2350,6 +2432,10 @@ ACTOR Future<Void> postResolution(CommitBatchContext* self) {
if (SERVER_KNOBS->ENABLE_VERSION_VECTOR) {
tpcvMap = self->tpcvMap;
}
if (self->pProxyCommitData->acsBuilder != nullptr) {
// Issue acs mutation at the end of this commit batch
addAccumulativeChecksumMutations(self);
}
self->loggingComplete = pProxyCommitData->logSystem->push(self->prevVersion,
self->commitVersion,
pProxyCommitData->committedVersion.get(),
@ -3746,7 +3832,8 @@ ACTOR Future<Void> commitProxyServerCore(CommitProxyInterface proxy,
firstProxy,
encryptMode,
provisional,
commitProxyIndex);
commitProxyIndex,
epoch);
state Future<Sequence> sequenceFuture = (Sequence)0;
state PromiseStream<std::pair<std::vector<CommitTransactionRequest>, int>> batchedCommits;

View File

@ -3,7 +3,7 @@
*
* This source file is part of the FoundationDB open source project
*
* Copyright 2013-2022 Apple Inc. and the FoundationDB project authors
* Copyright 2013-2024 Apple Inc. and the FoundationDB project authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -22,13 +22,165 @@
#define ACCUMULATIVECHECKSUMUTIL_H
#pragma once
#include "fdbclient/AccumulativeChecksum.h"
#include "fdbclient/CommitTransaction.h"
#include "fdbclient/SystemData.h"
static const uint16_t invalidAccumulativeChecksumIndex = 0;
static const uint16_t resolverAccumulativeChecksumIndex = 2;
static const uint32_t initialAccumulativeChecksum = 0;
uint16_t getCommitProxyAccumulativeChecksumIndex(uint16_t commitProxyIndex);
// Maps a commit proxy index to the ACS index used for mutations issued by that proxy.
inline uint16_t getCommitProxyAccumulativeChecksumIndex(uint16_t commitProxyIndex) {
	// The units digit identifies the kind of component that produced the mutation, which
	// leaves room for other components to generate their own ACS indexes.
	// An ACS index ending in 1 indicates the mutation comes from a commit proxy.
	const int proxyAcsIndex = commitProxyIndex * 10 + 1;
	return static_cast<uint16_t>(proxyAcsIndex);
}
bool validateAccumulativeChecksumIndexAtStorageServer(MutationRef m);
// Folds one mutation checksum into a running accumulative checksum (bitwise XOR).
inline uint32_t calculateAccumulativeChecksum(uint32_t currentAccumulativeChecksum, uint32_t newChecksum) {
	uint32_t updatedAcs = currentAccumulativeChecksum;
	updatedAcs ^= newChecksum;
	return updatedAcs;
}
// Returns true when ACS values are tracked for the given tag.
// Only tags with a non-negative locality are supported.
inline bool tagSupportAccumulativeChecksum(Tag tag) {
	// TODO: add log router tag, i.e., -2, so that new backup (backup workers) can be supported.
	const bool hasNegativeLocality = tag.locality < 0;
	return !hasNegativeLocality;
}
// Define how to aggregate ACS values of a vector of mutations from a starting ACS
inline uint32_t aggregateAcs(uint32_t startAcs, Standalone<VectorRef<MutationRef>> mutations) {
uint32_t newAcs = startAcs;
for (const auto& mutation : mutations) {
ASSERT(mutation.checksum.present());
newAcs = calculateAccumulativeChecksum(newAcs, mutation.checksum.get());
}
return newAcs;
}
// A builder that generates accumulative checksums (ACS) and keeps track of
// the accumulative checksum for each tag.
// Currently, accumulative checksum only supports mutations generated by
// a commit proxy, with encryption disabled, and with storage server tags
// (aka. locality >= 0).
class AccumulativeChecksumBuilder {
public:
	AccumulativeChecksumBuilder(uint16_t acsIndex) : acsIndex(acsIndex), currentVersion(0) {}

	// Called when commit proxy applies a new tag assignment mutation
	// At this time, this method erases the corresponding ACS value of the tag
	void newTag(Tag tag, UID ssid, Version commitVersion);

	// Called when commit proxy assigns tags to a mutation (e.g. mutation, private mutation)
	// Updates the ACS value for the input tag assigned to the mutation
	void addMutation(const MutationRef& mutation, Tag tag, LogEpoch epoch, UID commitProxyId, Version commitVersion);

	// Return read-only ACS map.
	// Const-qualified: the accessor does not modify the builder, so it is also usable
	// through a const reference or pointer to the builder.
	const std::unordered_map<Tag, AccumulativeChecksumState>& getAcsTable() const { return acsTable; }

private:
	uint16_t acsIndex; // Essentially, this is the ID of commit proxy
	Version currentVersion;
	std::unordered_map<Tag, AccumulativeChecksumState> acsTable; // per-tag ACS state

	// Update ACS state of the input tag to the input values
	uint32_t updateTable(Tag tag, uint32_t checksum, Version version, LogEpoch epoch);
};
// Modifies the input mutation in place by populating its checksum and setting the ACS index
// in the mutation ref, then adds the mutation, together with the input tag(s), to the ACS builder.
// Three overloads are provided; they differ only in how the tag(s) are supplied.
//
// Overload taking a single tag.
void updateMutationWithAcsAndAddMutationToAcsBuilder(std::shared_ptr<AccumulativeChecksumBuilder> acsBuilder,
MutationRef& mutation,
Tag inputTag,
uint16_t acsIndex,
LogEpoch epoch,
Version commitVersion,
UID commitProxyId);
// Overload taking a vector of tags.
void updateMutationWithAcsAndAddMutationToAcsBuilder(std::shared_ptr<AccumulativeChecksumBuilder> acsBuilder,
MutationRef& mutation,
const std::vector<Tag>& inputTags,
uint16_t acsIndex,
LogEpoch epoch,
Version commitVersion,
UID commitProxyId);
// Overload taking a set of tags.
void updateMutationWithAcsAndAddMutationToAcsBuilder(std::shared_ptr<AccumulativeChecksumBuilder> acsBuilder,
MutationRef& mutation,
const std::set<Tag>& inputTags,
uint16_t acsIndex,
LogEpoch epoch,
Version commitVersion,
UID commitProxyId);
// Validates, on the storage server (SS) side, that the accumulative checksum (ACS)
// is correct for each version that has mutations.
class AccumulativeChecksumValidator {
public:
	AccumulativeChecksumValidator() {}

	// Invoked when SS pulls a non-ACS mutation.
	// The mutation is appended to the mutation buffer.
	void addMutation(const MutationRef& mutation, UID ssid, Tag tag, Version ssVersion);

	// Invoked when SS receives an ACS mutation:
	//  - consumes the current mutation buffer to generate an ACS value;
	//  - validates the generated ACS against the ACS carried by the ACS mutation;
	//  - reports an error if the ACS values mismatch;
	//  - updates the acs table using the ACS mutation.
	// Returns the acs state to persist (a mutation is issued to persist the ACS state
	// after this method is called).
	Optional<AccumulativeChecksumState> processAccumulativeChecksum(const AccumulativeChecksumState& acsMutationState,
	                                                                UID ssid,
	                                                                Tag tag,
	                                                                Version ssVersion);

	// Invoked when SS restores from persisted private data.
	// Overwrites the existing acsState with the input acsState for the same acsIndex.
	void restore(const AccumulativeChecksumState& acsState, UID ssid, Tag tag, Version ssVersion);

	// Invoked after SS has applied the mutations pulled in a round.
	// The buffer is expected to be empty at this point, because the ACS mutation at the
	// end of each version that has mutations consumes it. If ACS mutations are missing,
	// however, the buffer is not consumed; in that case the existing mutation buffer is
	// forcibly cleared to keep memory usage bounded. The ACS value mismatch will be
	// reported when the next ACS mutation arrives.
	void clearCache(UID ssid, Tag tag, Version ssVersion);

	// Invoked when StorageMetrics is generated.
	// Each getter returns the current counter and resets it, so that StorageMetrics
	// reports the amount accumulated since the previous StorageMetrics report.
	uint64_t getAndClearCheckedMutations();

	uint64_t getAndClearCheckedVersions();

	uint64_t getAndClearTotalMutations();

	uint64_t getAndClearTotalAcsMutations();

	uint64_t getAndClearTotalAddedMutations();

	// Invoked when SS pulls a mutation
	void incrementTotalMutations() { ++totalMutations; }

	// Invoked when SS receives an ACS mutation
	void incrementTotalAcsMutations() { ++totalAcsMutations; }

private:
	// ACS state keyed by ACS index
	std::unordered_map<uint16_t, AccumulativeChecksumState> acsTable;

	// Every mutation is first added to mutationBuffer. The buffered mutations are
	// consumed to generate an ACS value once SS receives the first following ACS mutation.
	Standalone<VectorRef<MutationRef>> mutationBuffer;

	uint64_t checkedMutations = 0; // the number of mutations checked by ACS
	uint64_t checkedVersions = 0; // the number of versions checked by ACS
	uint64_t totalMutations = 0; // the number of mutations received by SS
	uint64_t totalAcsMutations = 0; // the number of ACS mutations received by SS
	uint64_t totalAddedMutations = 0; // the number of mutations added to mutationBuffer
};
#endif

View File

@ -29,6 +29,7 @@
#include "fdbclient/GetEncryptCipherKeys.h"
#include "fdbclient/Tenant.h"
#include "fdbrpc/Stats.h"
#include "fdbserver/AccumulativeChecksumUtil.h"
#include "fdbserver/Knobs.h"
#include "fdbserver/LogSystem.h"
#include "fdbserver/LogSystemDiskQueueAdapter.h"
@ -224,7 +225,6 @@ struct ProxyCommitData {
KeyRangeMap<bool> cacheInfo;
std::map<Key, ApplyMutationsData> uid_applyMutationsData;
bool firstProxy;
uint16_t commitProxyIndex; // decided when the cluster controller recruits commit proxies
double lastCoalesceTime;
bool locked;
Optional<Value> metadataVersion;
@ -271,6 +271,10 @@ struct ProxyCommitData {
AsyncVar<bool> triggerCommit;
uint16_t commitProxyIndex; // decided when the cluster controller recruits commit proxies
std::shared_ptr<AccumulativeChecksumBuilder> acsBuilder = nullptr;
LogEpoch epoch;
// The tag related to a storage server rarely change, so we keep a vector of tags for each key range to be slightly
// more CPU efficient. When a tag related to a storage server does change, we empty out all of these vectors to
// signify they must be repopulated. We do not repopulate them immediately to avoid a slow task.
@ -337,7 +341,8 @@ struct ProxyCommitData {
bool firstProxy,
EncryptionAtRestMode encryptMode,
bool provisional,
uint16_t commitProxyIndex)
uint16_t commitProxyIndex,
LogEpoch epoch)
: dbgid(dbgid), commitBatchesMemBytesCount(0),
stats(dbgid, &version, &committedVersion, &commitBatchesMemBytesCount, &tenantMap), master(master),
logAdapter(nullptr), txnStateStore(nullptr), committedVersion(recoveryTransactionVersion),
@ -349,7 +354,13 @@ struct ProxyCommitData {
cx(openDBOnServer(db, TaskPriority::DefaultEndpoint, LockAware::True)), db(db),
singleKeyMutationEvent("SingleKeyMutation"_sr), lastTxsPop(0), popRemoteTxs(false), lastStartCommit(0),
lastCommitLatency(SERVER_KNOBS->REQUIRED_MIN_RECOVERY_DURATION), lastCommitTime(0), lastMasterReset(now()),
lastResolverReset(now()), commitProxyIndex(commitProxyIndex) {
lastResolverReset(now()), commitProxyIndex(commitProxyIndex),
acsBuilder(CLIENT_KNOBS->ENABLE_MUTATION_CHECKSUM && CLIENT_KNOBS->ENABLE_ACCUMULATIVE_CHECKSUM &&
!encryptMode.isEncryptionEnabled()
? std::make_shared<AccumulativeChecksumBuilder>(
getCommitProxyAccumulativeChecksumIndex(commitProxyIndex))
: nullptr),
epoch(epoch) {
commitComputePerOperation.resize(SERVER_KNOBS->PROXY_COMPUTE_BUCKETS, 0.0);
}
};

View File

@ -213,6 +213,24 @@ static const std::string serverCheckpointFolder = "serverCheckpoints";
static const std::string checkpointBytesSampleTempFolder = "/metadata_temp";
static const std::string fetchedCheckpointFolder = "fetchedCheckpoints";
// Key range under which accumulative checksum (ACS) state is persisted.
// The range begins at "<PERSIST_PREFIX>AccumulativeChecksum/" and ends at
// "<PERSIST_PREFIX>AccumulativeChecksum0"; '0' is the character immediately after '/',
// so the range spans every key that starts with the "AccumulativeChecksum/" prefix.
static const KeyRangeRef persistAccumulativeChecksumKeys =
KeyRangeRef(PERSIST_PREFIX "AccumulativeChecksum/"_sr, PERSIST_PREFIX "AccumulativeChecksum0"_sr);
// Builds the persisted ACS key for (epoch, acsIndex):
//   persistAccumulativeChecksumKeys.begin + 8-byte big-endian epoch + 2-byte big-endian acsIndex
inline Key encodePersistAccumulativeChecksumKey(LogEpoch epoch, uint16_t acsIndex) {
	const LogEpoch epochBigEndian = bigEndian64(epoch);
	const uint16_t acsIndexBigEndian = bigEndian16(acsIndex);
	const StringRef epochBytes(reinterpret_cast<const uint8_t*>(&epochBigEndian), 8);
	const StringRef acsIndexBytes(reinterpret_cast<const uint8_t*>(&acsIndexBigEndian), 2);
	return persistAccumulativeChecksumKeys.begin.withSuffix(epochBytes).withSuffix(acsIndexBytes);
}
inline std::pair<LogEpoch, uint16_t> decodePersistAccumulativeChecksumKey(const Key& key) {
StringRef keyStr = key.removePrefix(persistAccumulativeChecksumKeys.begin);
LogEpoch epoch = *(const uint64_t*)(keyStr.substr(0, 8).begin());
uint16_t acsIndex = *(const uint16_t*)(keyStr.substr(8, 2).begin());
return std::make_pair(fromBigEndian64(epoch), fromBigEndian16(acsIndex));
}
// MoveInUpdates caches new updates of a move-in shard, before that shard is ready to accept writes.
struct MoveInUpdates {
MoveInUpdates() : spilled(MoveInUpdatesSpilled::False) {}
@ -549,6 +567,8 @@ struct StorageServerDisk {
int64_t& bytesLeft,
UnlimitedCommitBytes unlimitedCommitBytes);
void makeVersionDurable(Version version);
void makeAccumulativeChecksumDurable(const AccumulativeChecksumState& acsState);
void clearAccumulativeChecksumState(const AccumulativeChecksumState& acsState);
void makeTssQuarantineDurable();
Future<bool> restoreDurableState();
@ -1617,6 +1637,8 @@ public:
// Tenant metadata to manage connection to blob store for fetchKeys()
BGTenantMap tenantData;
std::shared_ptr<AccumulativeChecksumValidator> acsValidator = nullptr;
StorageServer(IKeyValueStore* storage,
Reference<AsyncVar<ServerDBInfo> const> const& db,
StorageServerInterface const& ssi,
@ -1679,7 +1701,10 @@ public:
busiestWriteTagContext(ssi.id()), getEncryptCipherKeysMonitor(encryptionMonitor), counters(this),
storageServerSourceTLogIDEventHolder(
makeReference<EventCacheHolder>(ssi.id().toString() + "/StorageServerSourceTLogID")),
tenantData(db) {
tenantData(db),
acsValidator(CLIENT_KNOBS->ENABLE_MUTATION_CHECKSUM && CLIENT_KNOBS->ENABLE_ACCUMULATIVE_CHECKSUM
? std::make_shared<AccumulativeChecksumValidator>()
: nullptr) {
readPriorityRanks = parseStringToVector<int>(SERVER_KNOBS->STORAGESERVER_READTYPE_PRIORITY_MAP, ',');
ASSERT(readPriorityRanks.size() > (int)ReadType::MAX);
version.initMetric("StorageServer.Version"_sr, counters.cc.getId());
@ -10522,7 +10547,6 @@ public:
if (MUTATION_TRACKING_ENABLED) {
DEBUG_MUTATION("SSUpdateMutation", ver, m, data->thisServerID).detail("FromFetch", fromFetch);
}
splitMutation(data, data->shards, m, encryptedMutation, ver, fromFetch);
}
@ -10912,6 +10936,22 @@ private:
.detail("KeyCount", keyCount)
.detail("ValSize", valSize)
.detail("Seed", seed);
} else if (isAccumulativeChecksumMutation(m)) {
if (data->acsValidator != nullptr) {
ASSERT(m.checksum.present() && m.accumulativeChecksumIndex.present());
AccumulativeChecksumState acsMutationState = decodeAccumulativeChecksum(m.param2);
Optional<AccumulativeChecksumState> stateToPersist = data->acsValidator->processAccumulativeChecksum(
acsMutationState, data->thisServerID, data->tag, data->version.get());
if (stateToPersist.present()) {
auto& mLV = data->addVersionToMutationLog(data->data().getLatestVersion());
data->addMutationToMutationLog(
mLV,
MutationRef(MutationRef::SetValue,
encodePersistAccumulativeChecksumKey(stateToPersist.get().epoch,
stateToPersist.get().acsIndex),
accumulativeChecksumValue(stateToPersist.get())));
}
}
} else {
ASSERT(false); // Unknown private mutation
}
@ -11200,7 +11240,7 @@ ACTOR Future<Void> update(StorageServer* data, bool* pReceivedUpdate) {
cloneReader >> msg;
ASSERT(data->encryptionMode.present());
ASSERT(!data->encryptionMode.get().isEncryptionEnabled() || msg.isEncrypted() ||
isBackupLogMutation(msg));
isBackupLogMutation(msg) || isAccumulativeChecksumMutation(msg));
if (msg.isEncrypted()) {
if (!cipherKeys.present()) {
msg.updateEncryptCipherDetails(cipherDetails);
@ -11212,16 +11252,21 @@ ACTOR Future<Void> update(StorageServer* data, bool* pReceivedUpdate) {
decryptionTime += decryptionTimeV;
}
} else {
if (!msg.validateChecksum() || !validateAccumulativeChecksumIndexAtStorageServer(msg)) {
TraceEvent(SevError, "ValidateChecksumOrAcsIndexError", data->thisServerID)
.detail("Mutation", msg)
.detail("ResolverGeneratePrivateMutation",
SERVER_KNOBS->PROXY_USE_RESOLVER_PRIVATE_MUTATIONS);
if (!msg.validateChecksum()) {
TraceEvent(SevError, "ValidateChecksumError", data->thisServerID)
.setMaxFieldLength(-1)
.setMaxEventLength(-1)
.detail("Mutation", msg);
ASSERT(false);
}
}
// TraceEvent(SevDebug, "SSReadingLog", data->thisServerID).detail("Mutation", msg);
if (data->acsValidator != nullptr) {
data->acsValidator->incrementTotalMutations();
if (isAccumulativeChecksumMutation(msg)) {
data->acsValidator->incrementTotalAcsMutations();
}
}
if (!collectingCipherKeys) {
if (firstMutation && msg.param1.startsWith(systemKeys.end))
hasPrivateData = true;
@ -11368,7 +11413,7 @@ ACTOR Future<Void> update(StorageServer* data, bool* pReceivedUpdate) {
rd >> msg;
ASSERT(data->encryptionMode.present());
ASSERT(!data->encryptionMode.get().isEncryptionEnabled() || msg.isEncrypted() ||
isBackupLogMutation(msg));
isBackupLogMutation(msg) || isAccumulativeChecksumMutation(msg));
if (msg.isEncrypted()) {
ASSERT(cipherKeys.present());
encryptedMutation.mutation = msg;
@ -11377,6 +11422,11 @@ ACTOR Future<Void> update(StorageServer* data, bool* pReceivedUpdate) {
msg = msg.decrypt(
encryptedMutation.cipherKeys, rd.arena(), BlobCipherMetrics::TLOG, nullptr, &decryptionTimeV);
decryptionTime += decryptionTimeV;
} else if (data->acsValidator != nullptr && msg.checksum.present() &&
msg.accumulativeChecksumIndex.present() && !isAccumulativeChecksumMutation(msg)) {
// We have to check accumulative checksum when iterating through cloneCursor2,
// where ss removal by tag assignment takes effect immediately
data->acsValidator->addMutation(msg, data->thisServerID, data->tag, data->version.get());
}
Span span("SS:update"_loc, spanContext);
@ -11441,6 +11491,10 @@ ACTOR Future<Void> update(StorageServer* data, bool* pReceivedUpdate) {
}
}
if (data->acsValidator != nullptr) {
data->acsValidator->clearCache(data->thisServerID, data->tag, data->version.get());
}
if (SERVER_KNOBS->GENERATE_DATA_ENABLED && data->constructedData.size() && ver != invalidVersion) {
int mutationCount =
std::min(static_cast<int>(data->constructedData.size()), SERVER_KNOBS->GENERATE_DATA_PER_VERSION_MAX);
@ -12656,6 +12710,7 @@ ACTOR Future<bool> restoreDurableState(StorageServer* data, IKeyValueStore* stor
state Future<RangeResult> fMoveInShards = storage->readRange(persistMoveInShardsKeyRange());
state Future<RangeResult> fTenantMap = storage->readRange(persistTenantMapKeys);
state Future<RangeResult> fStorageShards = storage->readRange(persistStorageServerShardKeys);
state Future<RangeResult> fAccumulativeChecksum = storage->readRange(persistAccumulativeChecksumKeys);
state Promise<Void> byteSampleSampleRecovered;
state Promise<Void> startByteSampleRestore;
@ -12672,7 +12727,8 @@ ACTOR Future<bool> restoreDurableState(StorageServer* data, IKeyValueStore* stor
fCheckpoints,
fMoveInShards,
fTenantMap,
fStorageShards }));
fStorageShards,
fAccumulativeChecksum }));
wait(byteSampleSampleRecovered.getFuture());
TraceEvent("RestoringDurableState", data->thisServerID).log();
@ -12770,6 +12826,18 @@ ACTOR Future<bool> restoreDurableState(StorageServer* data, IKeyValueStore* stor
wait(yield());
}
// Restore acs validator from persisted disk
// Nothing is restored when no validator is attached (acsValidator is null).
if (data->acsValidator != nullptr) {
// Result of the readRange over persistAccumulativeChecksumKeys issued earlier in this actor.
RangeResult accumulativeChecksums = fAccumulativeChecksum.get();
// Count these rows toward the bytesRestored total.
data->bytesRestored += accumulativeChecksums.logicalSize();
for (int acsLoc = 0; acsLoc < accumulativeChecksums.size(); acsLoc++) {
// The key decodes to an (epoch, ACS index) pair; the value decodes to the persisted ACS state.
std::pair<LogEpoch, uint16_t> res = decodePersistAccumulativeChecksumKey(accumulativeChecksums[acsLoc].key);
AccumulativeChecksumState acsState = decodeAccumulativeChecksum(accumulativeChecksums[acsLoc].value);
// The key and the value must agree on both epoch and index; any mismatch trips the assertion.
ASSERT(res.first == acsState.epoch && res.second == acsState.acsIndex);
// Hand the decoded state to the validator along with this server's ID, tag, and version.
data->acsValidator->restore(acsState, data->thisServerID, data->tag, data->version.get());
}
}
state RangeResult assigned = fShardAssigned.get();
data->bytesRestored += assigned.logicalSize();
data->bytesRestored += fStorageShards.get().logicalSize();
@ -13215,6 +13283,13 @@ ACTOR Future<Void> metricsCore(StorageServer* self, StorageServerInterface ssi)
UID(self->thisServerID.first() ^ self->ssPairID.get().first(),
self->thisServerID.second() ^ self->ssPairID.get().second()));
}
// When an ACS validator is attached, add five of its counters to this trace event.
// NOTE(review): the getAndClear* names suggest each read also resets its counter, which would make
// every value below a "since last print" delta, not only the two whose detail names say so --
// confirm against the validator implementation.
if (self->acsValidator != nullptr) {
te.detail("ACSCheckedMutationsSinceLastPrint", self->acsValidator->getAndClearCheckedMutations());
te.detail("ACSCheckedVersionsSinceLastPrint", self->acsValidator->getAndClearCheckedVersions());
te.detail("TotalMutations", self->acsValidator->getAndClearTotalMutations());
te.detail("TotalAcsMutations", self->acsValidator->getAndClearTotalAcsMutations());
te.detail("TotalAddedMutations", self->acsValidator->getAndClearTotalAddedMutations());
}
}));
wait(serveStorageMetricsRequests(self, ssi));

View File

@ -297,17 +297,17 @@ if(WITH_PYTHON)
add_fdb_test(TEST_FILES rare/StatusBuilderPerf.toml)
add_fdb_test(TEST_FILES rare/TLogVersionMessagesOverheadFactor.toml)
add_fdb_test(
TEST_FILES restarting/from_5.0.0_until_6.3.0/CycleTestRestart-1.txt
restarting/from_5.0.0_until_6.3.0/CycleTestRestart-2.txt)
TEST_FILES restarting/from_5.0.0_until_6.3.0/CycleTestRestart-1.toml
restarting/from_5.0.0_until_6.3.0/CycleTestRestart-2.toml)
add_fdb_test(
TEST_FILES restarting/from_5.0.0_until_6.3.0/StorefrontTestRestart-1.txt
restarting/from_5.0.0_until_6.3.0/StorefrontTestRestart-2.txt)
TEST_FILES restarting/from_5.0.0_until_6.3.0/StorefrontTestRestart-1.toml
restarting/from_5.0.0_until_6.3.0/StorefrontTestRestart-2.toml)
add_fdb_test(
TEST_FILES restarting/from_5.1.7_until_6.3.0/DrUpgradeRestart-1.txt
restarting/from_5.1.7_until_6.3.0/DrUpgradeRestart-2.txt)
TEST_FILES restarting/from_5.1.7_until_6.3.0/DrUpgradeRestart-1.toml
restarting/from_5.1.7_until_6.3.0/DrUpgradeRestart-2.toml)
add_fdb_test(
TEST_FILES restarting/from_5.2.0_until_6.3.0/ClientTransactionProfilingCorrectness-1.txt
restarting/from_5.2.0_until_6.3.0/ClientTransactionProfilingCorrectness-2.txt)
TEST_FILES restarting/from_5.2.0_until_6.3.0/ClientTransactionProfilingCorrectness-1.toml
restarting/from_5.2.0_until_6.3.0/ClientTransactionProfilingCorrectness-2.toml)
add_fdb_test(
TEST_FILES restarting/from_6.3.13/ClientTransactionProfilingCorrectness-1.txt
restarting/from_6.3.13/ClientTransactionProfilingCorrectness-2.txt)

View File

@ -1,31 +1,38 @@
[[test]]
testTitle=Clogged
clearAfterTest=false
[[test.workload]]
testName=Cycle
transactionsPerSecond=500.0
nodeCount=2500
testDuration=10.0
expectedRate=0
[[test.workload]]
testName=RandomClogging
testDuration=10.0
[[test.workload]]
testName=Rollback
meanDelay=10.0
testDuration=10.0
[[test.workload]]
testName=Attrition
machinesToKill=10
machinesToLeave=3
reboot=true
testDuration=10.0
[[test.workload]]
testName=Attrition
machinesToKill=10
machinesToLeave=3
reboot=true
testDuration=10.0
[[test.workload]]
testName=SaveAndKill
restartInfoLocation=simfdb/restartInfo.ini
testDuration=10.0

View File

@ -1,26 +1,35 @@
testTitle=Clogged
[[knobs]]
enable_accumulative_checksum=false
[[test]]
testTitle='Clogged'
runSetup=false
testName=Cycle
[[test.workload]]
testName='Cycle'
transactionsPerSecond=2500.0
nodeCount=2500
testDuration=10.0
expectedRate=0
testName=RandomClogging
[[test.workload]]
testName='RandomClogging'
testDuration=10.0
testName=Rollback
[[test.workload]]
testName='Rollback'
meanDelay=10.0
testDuration=10.0
testName=Attrition
[[test.workload]]
testName='Attrition'
machinesToKill=10
machinesToLeave=3
reboot=true
testDuration=10.0
testName=Attrition
[[test.workload]]
testName='Attrition'
machinesToKill=10
machinesToLeave=3
reboot=true

View File

@ -0,0 +1,15 @@
[[test]]
testTitle=StorefrontTest
clearAfterTest=false
[[test.workload]]
testName=Storefront
actorsPerClient=20
transactionsPerSecond=200
itemCount=20000
maxOrderSize=4
[[test.workload]]
testName=SaveAndKill
restartInfoLocation=simfdb/restartInfo.ini
testDuration=10.0

View File

@ -1,10 +0,0 @@
testTitle=StorefrontTest
clearAfterTest=false
testName=Storefront
actorsPerClient=20
transactionsPerSecond=200
itemCount=20000
maxOrderSize=4
testName=SaveAndKill
restartInfoLocation=simfdb/restartInfo.ini
testDuration=10.0

View File

@ -0,0 +1,13 @@
# Test spec 'StorefrontTest': one test containing a single Storefront workload.
# Knob override: turn the accumulative checksum knob off for this spec.
[[knobs]]
enable_accumulative_checksum=false
[[test]]
testTitle='StorefrontTest'
# NOTE(review): presumably setup is skipped because an earlier phase already populated the
# database -- confirm against the spec that runs before this one.
runSetup=false
[[test.workload]]
testName='Storefront'
actorsPerClient=20
transactionsPerSecond=200
itemCount=20000
maxOrderSize=4

View File

@ -1,7 +0,0 @@
testTitle=StorefrontTest
runSetup=false
testName=Storefront
actorsPerClient=20
transactionsPerSecond=200
itemCount=20000
maxOrderSize=4

View File

@ -1,20 +1,25 @@
[configuration]
extraDB=3
[[test]]
testTitle=DrUpgrade
clearAfterTest=false
simBackupAgents=BackupToDB
[[test.workload]]
testName=Cycle
nodeCount=30000
transactionsPerSecond=1000.0
testDuration=30.0
expectedRate=0
[[test.workload]]
testName=BackupToDBUpgrade
backupAfter=10.0
stopDifferentialAfter=50.0
backupRangesCount=-1
[[test.workload]]
testName=SaveAndKill
restartInfoLocation=simfdb/restartInfo.ini
testDuration=40.0

View File

@ -0,0 +1,25 @@
# Test spec 'DrUpgrade': one test with two workloads, Cycle and BackupToDBUpgrade.
[configuration]
extraDatabaseMode='Local'
# Knob override: turn the accumulative checksum knob off for this spec.
[[knobs]]
enable_accumulative_checksum=false
[[test]]
testTitle='DrUpgrade'
# NOTE(review): presumably setup is skipped because an earlier phase already populated the
# database -- confirm against the spec that runs before this one.
runSetup=false
clearAfterTest=false
simBackupAgents='BackupToDB'
waitForQuiescenceBegin=false
[[test.workload]]
testName='Cycle'
nodeCount=30000
transactionsPerSecond=1000.0
testDuration=30.0
expectedRate=0
[[test.workload]]
testName='BackupToDBUpgrade'
backupAfter=10.0
backupRangesCount=-1
stopDifferentialAfter=70.0

View File

@ -1,18 +0,0 @@
extraDatabaseMode=Local
testTitle=DrUpgrade
runSetup=false
clearAfterTest=false
simBackupAgents=BackupToDB
waitForQuiescenceBegin=false
testName=Cycle
nodeCount=30000
transactionsPerSecond=1000.0
testDuration=30.0
expectedRate=0
testName=BackupToDBUpgrade
backupAfter=10.0
backupRangesCount=-1
stopDifferentialAfter=70.0

View File

@ -1,8 +1,10 @@
[[test]]
testTitle=ClientTransactionProfilingCorrectness
clearAfterTest=false
runSetup=true
timeout=2100
[[test.workload]]
testName=ApiCorrectness
numKeys=5000
onlyLowerCase=true
@ -22,8 +24,10 @@ timeout=2100
maxTransactionBytes=500000
randomTestDuration=30
[[test.workload]]
testName=ClientTransactionProfileCorrectness
[[test.workload]]
testName=SaveAndKill
restartInfoLocation=simfdb/restartInfo.ini
testDuration=60

View File

@ -1,9 +1,14 @@
testTitle=ClientTransactionProfilingCorrectness
[[knobs]]
enable_accumulative_checksum=false
[[test]]
testTitle='ClientTransactionProfilingCorrectness'
clearAfterTest=true
timeout=2100
runSetup=true
testName=ApiCorrectness
[[test.workload]]
testName='ApiCorrectness'
numKeys=5000
onlyLowerCase=true
shortKeysRatio=0.5
@ -22,5 +27,6 @@ runSetup=true
maxTransactionBytes=500000
randomTestDuration=60
testName=ClientTransactionProfileCorrectness
[[test.workload]]
testName='ClientTransactionProfileCorrectness'