Enable Accumulative Checksum in MutationRef (#11225)

* code clean up and add accumulative checksum bits to mutation ref

* address comments and fix issues

* address comments

* propagate acs index from commit proxy to storage server

* address comments

* address comments

* address comments

* address comments
This commit is contained in:
Zhe Wang 2024-03-11 09:51:31 -07:00 committed by GitHub
parent 19e3f3e2dd
commit b10c7107bb
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
12 changed files with 434 additions and 133 deletions

View File

@ -337,6 +337,7 @@ void ClientKnobs::initialize(Randomize randomize) {
init( SIM_KMS_VAULT_MAX_KEYS, 4096 );
init( ENABLE_MUTATION_CHECKSUM, false ); // if ( randomize && BUGGIFY ) ENABLE_MUTATION_CHECKSUM = true; Enable this after deserialiser is ported to 7.3.
init( ENABLE_ACCUMULATIVE_CHECKSUM, false ); // if ( randomize && BUGGIFY ) ENABLE_ACCUMULATIVE_CHECKSUM = true; Enable this after deserialiser is ported to 7.3.
// clang-format on
}

View File

@ -301,7 +301,7 @@ inline void transformVersionstampMutation(MutationRef& mutation,
StringRef MutationRef::*param,
Version version,
uint16_t transactionNumber) {
mutation.removeChecksum();
mutation.clearChecksumAndAccumulativeIndex();
if ((mutation.*param).size() >= 4) {
int32_t pos = parseVersionstampOffset(mutation.*param);
mutation.*param = (mutation.*param).substr(0, (mutation.*param).size() - 4);

View File

@ -336,6 +336,7 @@ public:
int SIM_KMS_VAULT_MAX_KEYS;
bool ENABLE_MUTATION_CHECKSUM;
bool ENABLE_ACCUMULATIVE_CHECKSUM;
ClientKnobs(Randomize randomize);
void initialize(Randomize randomize);

View File

@ -61,12 +61,15 @@ static const char* typeString[] = { "SetValue",
"Reserved_For_SpanContextMessage",
"Reserved_For_OTELSpanContextMessage",
"Encrypted",
"AccumulativeChecksum",
"MAX_ATOMIC_OP" };
struct MutationRef {
static const int OVERHEAD_BYTES = 12; // 12 is the size of Header in MutationList entries
static const uint8_t CHECKSUM_FLAG_MASK = 128U;
enum Type : uint8_t {
static const uint8_t CHECKSUM_FLAG_MASK = 128U; // 10000000, the first bit indicates if checksum is set
static const uint8_t ACCUMULATIVE_CHECKSUM_INDEX_FLAG_MASK =
64U; // 01000000, the second bit indicates if the acs index is set
enum Type : uint8_t { // At most 64 types is available, since the first two bits have been reserved
SetValue = 0,
ClearRange,
AddValue,
@ -91,23 +94,28 @@ struct MutationRef {
Reserved_For_SpanContextMessage /* See fdbserver/SpanContextMessage.h */,
Reserved_For_OTELSpanContextMessage,
Encrypted, /* Represents an encrypted mutation and cannot be used directly before decrypting */
AccumulativeChecksum, /* Used to propagate accumulative checksum from commit proxy to storage server */
MAX_ATOMIC_OP
};
// This is stored this way for serialization purposes. Note: the first bit of `type` is used to indicate whether a
// checksum is appended to `param2`, when checksum is enabled, the bit is set during serialization, and reset during
// deserialization.
// deserialization. The second bit of `type` indicates whether an accumulativeChecksumIndex is appended to `param2`.
uint8_t type;
StringRef param1, param2;
Optional<uint32_t> checksum;
Optional<uint32_t> checksum; // 4 bytes for checksum
Optional<uint16_t> accumulativeChecksumIndex; // 2 bytes for indexing accumulative checksum
bool corrupted;
MutationRef() : type(MAX_ATOMIC_OP) {}
MutationRef(Type t, StringRef a, StringRef b) : type(t), param1(a), param2(b) {}
MutationRef(Arena& to, Type t, StringRef a, StringRef b) : type(t), param1(to, a), param2(to, b) {}
MutationRef() : type(MAX_ATOMIC_OP), corrupted(false) {}
MutationRef(Type t, StringRef a, StringRef b) : type(t), param1(a), param2(b), corrupted(false) {}
MutationRef(Arena& to, Type t, StringRef a, StringRef b)
: type(t), param1(to, a), param2(to, b), corrupted(false) {}
MutationRef(Arena& to, const MutationRef& from)
: type(from.type), param1(to, from.param1), param2(to, from.param2) {}
: type(from.type), param1(to, from.param1), param2(to, from.param2), corrupted(false) {}
int totalSize() const {
return OVERHEAD_BYTES + param1.size() + param2.size() + (checksum.present() ? sizeof(uint32_t) + 1 : 1);
return OVERHEAD_BYTES + param1.size() + param2.size() + (checksum.present() ? sizeof(uint32_t) + 1 : 1) +
(accumulativeChecksumIndex.present() ? sizeof(uint16_t) + 1 : 1);
}
int expectedSize() const { return param1.size() + param2.size(); }
int weightedTotalSize() const {
@ -123,59 +131,170 @@ struct MutationRef {
std::string toString() const {
std::string checksumStr;
if (checksum.present()) {
checksumStr = format("checksum: %s ", std::to_string(checksum.get()).c_str());
std::string accumulativeChecksumIndexStr;
if (this->checksum.present()) {
checksumStr = format("checksum: %s ", std::to_string(this->checksum.get()).c_str());
if (this->accumulativeChecksumIndex.present()) {
accumulativeChecksumIndexStr = format("accumulativechecksumindex: %s ",
std::to_string(this->accumulativeChecksumIndex.get()).c_str());
}
}
uint8_t cType = type & ~CHECKSUM_FLAG_MASK;
return format("%scode: %s param1: %s param2: %s",
uint8_t mType = this->type & ~CHECKSUM_FLAG_MASK;
mType = mType & ~ACCUMULATIVE_CHECKSUM_INDEX_FLAG_MASK;
return format("%s%scode: %s param1: %s param2: %s",
checksumStr.c_str(),
cType < MutationRef::MAX_ATOMIC_OP ? typeString[(int)cType] : "Unset",
accumulativeChecksumIndexStr.c_str(),
mType < MutationRef::MAX_ATOMIC_OP ? typeString[(int)mType] : "Unset",
printable(param1).c_str(),
printable(param2).c_str());
}
uint8_t typeWithChecksum() const { return this->type | CHECKSUM_FLAG_MASK; }
Optional<uint32_t> removeChecksum() {
this->checksum.reset();
if (!withChecksum()) {
return Optional<uint32_t>();
}
ASSERT(this->param2.size() >= 4);
const int idx = this->param2.size() - 4;
const uint32_t pc = *(const uint32_t*)(this->param2.substr(idx).begin());
this->type &= ~CHECKSUM_FLAG_MASK;
this->param2 = this->param2.substr(0, idx);
return pc;
}
// Return true if the mutation param2 carries checksum
bool withChecksum() const { return this->type & CHECKSUM_FLAG_MASK; }
bool isAtomicOp() const { return (ATOMIC_MASK & (1 << type)) != 0; }
bool isValid() const { return type < MAX_ATOMIC_OP; }
// Return true if the mutation param2 carries acs index
bool withAccumulativeChecksumIndex() const { return this->type & ACCUMULATIVE_CHECKSUM_INDEX_FLAG_MASK; }
uint32_t populateChecksum() {
ASSERT(!this->withChecksum());
uint32_t c = crc32c_append(static_cast<uint32_t>(this->type), param1.begin(), param1.size());
crc32c_append(c, param2.begin(), param2.size());
if (this->checksum.present()) {
if (this->checksum.get() != c) {
TraceEvent(SevError, "MutationRefChecksumMismatch")
.detail("CalculatedChecksum", std::to_string(c))
.detail("Mutatino", toString());
}
} else {
this->checksum = c;
}
return c;
// Set checksum bit to mutation type
uint8_t createTypeWithChecksum(uint8_t inType) const { return inType | CHECKSUM_FLAG_MASK; }
// Set acs bit to mutation type
uint8_t createTypeWithAccumulativeChecksumIndex(uint8_t inType) const {
return inType | ACCUMULATIVE_CHECKSUM_INDEX_FLAG_MASK;
}
bool isAtomicOp() const { return (ATOMIC_MASK & (1 << type)) != 0; }
bool isValid() const {
static_assert(MAX_ATOMIC_OP < 64U); // 2 bits have been reserved for checksum and accumulative checksum index
return type < MAX_ATOMIC_OP;
}
// If mutation checksum is enabled, we must set accumulative index before serialization
// Once accumulative index is set, it cannot change over time
void setAccumulativeChecksumIndex(uint16_t index) {
if (this->accumulativeChecksumIndex.present()) {
ASSERT(this->accumulativeChecksumIndex.get() == index);
} else {
this->accumulativeChecksumIndex = index;
}
return;
}
// If param2 includes checksum suffix, remove the suffix and set it to this->checksum
// Unflag the corresponding bit in type
// This operation must be after removing the acs index if exists
void offloadChecksum() {
if (this->checksum.present()) {
TraceEvent(SevError, "MutationRefUnexpectedError")
.detail("Reason", "Internal checksum has been set when offloading checksum")
.detail("Mutation", toString());
this->corrupted = true;
}
if (!withChecksum()) {
return;
}
if (withAccumulativeChecksumIndex()) { // Removing checksum must be after removing acs index
TraceEvent(SevError, "MutationRefUnexpectedError")
.detail("Reason", "Type contains acs index flag when offloading checksum")
.detail("Mutation", toString());
this->corrupted = true;
}
this->type &= ~CHECKSUM_FLAG_MASK;
int index = this->param2.size() - 4;
this->checksum = *(const uint32_t*)(this->param2.substr(index, 4).begin());
this->param2 = this->param2.substr(0, index);
}
// If param2 includes acs index suffix, remove the suffix and set it to this->accumulativeChecksumIndex
// Unflag the corresponding bit in type
void offloadAccumulativeChecksumIndex() {
if (this->accumulativeChecksumIndex.present()) {
TraceEvent(SevError, "MutationRefUnexpectedError")
.detail("Reason", "Internal acs index has been set when offloading acs index")
.detail("Mutation", this->toString());
this->corrupted = true;
}
if (!withAccumulativeChecksumIndex()) {
return;
}
this->type &= ~ACCUMULATIVE_CHECKSUM_INDEX_FLAG_MASK;
int index = this->param2.size() - 2;
this->accumulativeChecksumIndex = *(const uint16_t*)(this->param2.substr(index, 2).begin());
this->param2 = this->param2.substr(0, index);
}
// Clear checksum and acs index if exist
void clearChecksumAndAccumulativeIndex() {
this->checksum.reset();
this->accumulativeChecksumIndex.reset();
}
// Validate param2 correctness
void validateParam2() {
if (withChecksum() && withAccumulativeChecksumIndex()) {
if (this->param2.size() < 6) { // 4 bytes for checksum, 2 bytes for accumulative index
TraceEvent(SevError, "MutationRefUnexpectedError")
.detail("Reason", "Param2 size is wrong with both checksum and acs index")
.detail("Param2Size", this->param2.size())
.detail("Mutation", toString());
this->corrupted = true;
}
} else if (withChecksum() && !withAccumulativeChecksumIndex()) {
if (this->param2.size() < 4) { // 4 bytes for checksum
TraceEvent(SevError, "MutationRefUnexpectedError")
.detail("Reason", "Param2 size is wrong with checksum and without acs index")
.detail("Param2Size", this->param2.size())
.detail("Mutation", toString());
this->corrupted = true;
}
}
}
// Generate 32 bits checksum and set it to this->checksum
void populateChecksum() {
if (withChecksum()) {
TraceEvent(SevError, "MutationRefUnexpectedError")
.detail("Reason", "Type already has checksum flag when populating checksum")
.detail("Mutation", toString());
this->corrupted = true;
}
uint32_t crc = static_cast<uint32_t>(this->type);
crc = crc32c_append(crc, this->param1.begin(), this->param1.size());
crc = crc32c_append(crc, this->param2.begin(), this->param2.size());
if (this->checksum.present() && this->checksum.get() != crc) {
TraceEvent(SevError, "MutationRefUnexpectedError")
.detail("Reason", "Checksum mismatch when populating a new checksum")
.detail("CalculatedChecksum", std::to_string(crc))
.detail("Mutation", this->toString());
this->corrupted = true;
}
this->checksum = crc;
}
// Calculate crc based on type and param1 and param2 and compare the crc with this->checksum
bool validateChecksum() const {
if (!checksum.present()) {
if (this->corrupted) {
TraceEvent(SevError, "MutationRefUnexpectedError")
.detail("Reason", "Mutation has been marked as corrupted")
.detail("Mutation", this->toString());
return false;
}
if (!this->checksum.present()) {
return true;
}
uint32_t crc = static_cast<uint32_t>(this->type);
crc = crc32c_append(crc, this->param1.begin(), this->param1.size());
crc = crc32c_append(crc, this->param2.begin(), this->param2.size());
if (crc != static_cast<uint32_t>(this->checksum.get())) {
TraceEvent(SevError, "MutationRefUnexpectedError")
.detail("Reason", "Mutation checksum mismatch")
.detail("Mutation", this->toString())
.detail("ExistingChecksum", this->checksum.get())
.detail("NewChecksum", crc);
return false;
} else {
return true;
}
uint32_t c = crc32c_append(static_cast<uint32_t>(this->type), param1.begin(), param1.size());
crc32c_append(c, param2.begin(), param2.size());
return c == checksum.get();
}
template <class Ar>
@ -184,35 +303,62 @@ struct MutationRef {
StringRef empty;
if (!isEncrypted() && ar.protocolVersion().hasMutationChecksum() &&
CLIENT_KNOBS->ENABLE_MUTATION_CHECKSUM) {
uint32_t c = populateChecksum();
StringRef cs = StringRef((uint8_t*)&c, 4);
uint8_t cType = this->typeWithChecksum();
serializer(ar, cType, param2, cs);
// Attach checksum at first, then attach acs index
populateChecksum();
uint8_t cType = createTypeWithChecksum(this->type);
uint32_t cs = this->checksum.get();
Standalone<StringRef> cEmpty = empty.withSuffix(StringRef((uint8_t*)&cs, 4));
if (CLIENT_KNOBS->ENABLE_ACCUMULATIVE_CHECKSUM && this->accumulativeChecksumIndex.present()) {
cType = createTypeWithAccumulativeChecksumIndex(cType);
uint16_t acsIdx = this->accumulativeChecksumIndex.get();
cEmpty = cEmpty.withSuffix(StringRef((uint8_t*)&acsIdx, 2));
}
serializer(ar, cType, param2, cEmpty);
} else {
serializer(ar, type, param2, empty);
}
} else if (!isEncrypted() && ar.isSerializing && ar.protocolVersion().hasMutationChecksum() &&
CLIENT_KNOBS->ENABLE_MUTATION_CHECKSUM) {
uint32_t c = populateChecksum();
StringRef cs = StringRef((uint8_t*)&c, 4);
uint8_t cType = this->typeWithChecksum();
Standalone<StringRef> param2WithChecksum = param2.withSuffix(cs);
StringRef p2 = param2WithChecksum;
serializer(ar, cType, param1, p2);
// Attach checksum at first, then attach acs index
populateChecksum();
uint8_t cType = createTypeWithChecksum(this->type);
uint32_t cs = this->checksum.get();
Standalone<StringRef> cParam2 = param2.withSuffix(StringRef((uint8_t*)&cs, 4));
if (CLIENT_KNOBS->ENABLE_ACCUMULATIVE_CHECKSUM && this->accumulativeChecksumIndex.present()) {
cType = createTypeWithAccumulativeChecksumIndex(cType);
uint16_t acsIdx = this->accumulativeChecksumIndex.get();
cParam2 = cParam2.withSuffix(StringRef((uint8_t*)&acsIdx, 2));
}
serializer(ar, cType, param1, cParam2);
} else {
serializer(ar, type, param1, param2);
}
if (ar.isDeserializing) {
validateParam2();
if (withChecksum()) {
checksum = removeChecksum();
// Remove acs index at first, then remove checksum
if (withAccumulativeChecksumIndex()) {
offloadAccumulativeChecksumIndex();
}
offloadChecksum();
}
validateParam2();
if (type == ClearRange && param2 == StringRef() && param1 != StringRef()) {
ASSERT(param1[param1.size() - 1] == '\x00');
if (param1[param1.size() - 1] != '\x00') {
TraceEvent(SevError, "MutationRefUnexpectedError")
.detail("Reason", "Param1 is not end with \\x00 for single key clear range")
.detail("Param1", param1)
.detail("Mutation", toString());
this->corrupted = true;
}
param2 = param1;
param1 = param2.substr(0, param2.size() - 1);
}
validateChecksum();
if (!validateChecksum()) {
TraceEvent(SevError, "MutationRefCorruptionDetected").detail("Mutation", this->toString());
this->corrupted = true;
}
}
}
@ -579,24 +725,4 @@ struct EncryptedMutationsAndVersionRef {
};
};
TEST_CASE("noSim/CommitTransaction/MutationRef") {
printf("testing MutationRef encoding/decoding\n");
MutationRef m(MutationRef::SetValue, "TestKey"_sr, "TestValue"_sr);
BinaryWriter wr(AssumeVersion(ProtocolVersion::withMutationChecksum()));
wr << m;
Standalone<StringRef> value = wr.toValue();
TraceEvent("EncodedMutation").detail("RawBytes", value);
BinaryReader rd(value, AssumeVersion(ProtocolVersion::withBlobGranule()));
Standalone<MutationRef> de;
rd >> de;
printf("Deserialized mutation: %s\n", de.toString().c_str());
return Void();
}
#endif

View File

@ -0,0 +1,78 @@
/*
* AccumulativeChecksumUtil.cpp
*
* This source file is part of the FoundationDB open source project
*
* Copyright 2013-2022 Apple Inc. and the FoundationDB project authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "fdbserver/AccumulativeChecksumUtil.h"
#include "fdbserver/Knobs.h"
uint16_t getCommitProxyAccumulativeChecksumIndex(uint16_t commitProxyIndex) {
// We leave flexibility in acs index generated from different components
// Acs index ends with 1 indicates the mutation is from a commit proxy
return commitProxyIndex * 10 + 1;
}
bool validateAccumulativeChecksumIndexAtStorageServer(MutationRef m) {
if (m.checksum.present() && CLIENT_KNOBS->ENABLE_ACCUMULATIVE_CHECKSUM &&
(!m.accumulativeChecksumIndex.present())) {
TraceEvent(SevError, "ACSIndexNotPresent").detail("Mutation", m);
return false;
} else if (m.checksum.present() && CLIENT_KNOBS->ENABLE_ACCUMULATIVE_CHECKSUM &&
m.accumulativeChecksumIndex.present() &&
m.accumulativeChecksumIndex.get() == invalidAccumulativeChecksumIndex) {
TraceEvent(SevError, "ACSIndexNotSet").detail("Mutation", m);
return false;
}
return true;
}
TEST_CASE("noSim/AccumulativeChecksum/MutationRef") {
printf("testing MutationRef encoding/decoding\n");
MutationRef m(MutationRef::SetValue, "TestKey"_sr, "TestValue"_sr);
if (CLIENT_KNOBS->ENABLE_ACCUMULATIVE_CHECKSUM) {
m.setAccumulativeChecksumIndex(512);
}
BinaryWriter wr(AssumeVersion(ProtocolVersion::withMutationChecksum()));
wr << m;
Standalone<StringRef> value = wr.toValue();
TraceEvent("EncodedMutation").detail("RawBytes", value);
BinaryReader rd(value, AssumeVersion(ProtocolVersion::withMutationChecksum()));
Standalone<MutationRef> de;
rd >> de;
printf("Deserialized mutation: %s\n", de.toString().c_str());
if (de.type != m.type || de.param1 != m.param1 || de.param2 != m.param2) {
TraceEvent(SevError, "MutationMismatch")
.detail("OldType", m.type)
.detail("NewType", de.type)
.detail("OldParam1", m.param1)
.detail("NewParam1", de.param1)
.detail("OldParam2", m.param2)
.detail("NewParam2", de.param2);
ASSERT(false);
}
ASSERT(de.validateChecksum());
return Void();
}

View File

@ -25,6 +25,7 @@
#include "fdbclient/Notified.h"
#include "fdbclient/SystemData.h"
#include "fdbclient/Tenant.h"
#include "fdbserver/AccumulativeChecksumUtil.h"
#include "fdbserver/ApplyMetadataMutation.h"
#include "fdbserver/IKeyValueStore.h"
#include "fdbserver/Knobs.h"
@ -87,7 +88,8 @@ public:
storageCache(&proxyCommitData_.storageCache), tag_popped(&proxyCommitData_.tag_popped),
tssMapping(&proxyCommitData_.tssMapping), tenantMap(&proxyCommitData_.tenantMap),
tenantNameIndex(&proxyCommitData_.tenantNameIndex), lockedTenants(&proxyCommitData_.lockedTenants),
initialCommit(initialCommit_), provisionalCommitProxy(provisionalCommitProxy_) {
initialCommit(initialCommit_), provisionalCommitProxy(provisionalCommitProxy_),
accumulativeChecksumIndex(getCommitProxyAccumulativeChecksumIndex(proxyCommitData_.commitProxyIndex)) {
if (encryptMode.isEncryptionEnabled()) {
ASSERT(cipherKeys != nullptr);
ASSERT(cipherKeys->count(SYSTEM_KEYSPACE_ENCRYPT_DOMAIN_ID) > 0);
@ -106,7 +108,8 @@ public:
cipherKeys(cipherKeys_), encryptMode(encryptMode), txnStateStore(resolverData_.txnStateStore),
toCommit(resolverData_.toCommit), confChange(resolverData_.confChanges), logSystem(resolverData_.logSystem),
popVersion(resolverData_.popVersion), keyInfo(resolverData_.keyInfo), storageCache(resolverData_.storageCache),
initialCommit(resolverData_.initialCommit), forResolver(true) {
initialCommit(resolverData_.initialCommit), forResolver(true),
accumulativeChecksumIndex(resolverAccumulativeChecksumIndex) {
if (encryptMode.isEncryptionEnabled()) {
ASSERT(cipherKeys != nullptr);
ASSERT(cipherKeys->count(SYSTEM_KEYSPACE_ENCRYPT_DOMAIN_ID) > 0);
@ -167,6 +170,9 @@ private:
// true if called from a provisional commit proxy
bool provisionalCommitProxy = false;
// indicate which commit proxy / resolver applies mutations
uint16_t accumulativeChecksumIndex = invalidAccumulativeChecksumIndex;
private:
// The following variables are used internally
@ -258,7 +264,8 @@ private:
Tag tag = decodeServerTagValue(
txnStateStore->readValue(serverTagKeyFor(serverKeysDecodeServer(m.param1))).get().get());
MutationRef privatized = m;
privatized.removeChecksum();
privatized.clearChecksumAndAccumulativeIndex();
privatized.setAccumulativeChecksumIndex(accumulativeChecksumIndex);
privatized.param1 = m.param1.withPrefix(systemKeys.begin, arena);
TraceEvent(SevDebug, "SendingPrivateMutation", dbgid)
.detail("Original", m)
@ -282,7 +289,8 @@ private:
if (toCommit) {
MutationRef privatized = m;
privatized.removeChecksum();
privatized.clearChecksumAndAccumulativeIndex();
privatized.setAccumulativeChecksumIndex(accumulativeChecksumIndex);
privatized.param1 = m.param1.withPrefix(systemKeys.begin, arena);
TraceEvent("ServerTag", dbgid).detail("Server", id).detail("Tag", tag.toString());
@ -326,7 +334,8 @@ private:
// This is done to make the storage servers aware of the cached key-ranges
if (toCommit) {
MutationRef privatized = m;
privatized.removeChecksum();
privatized.clearChecksumAndAccumulativeIndex();
privatized.setAccumulativeChecksumIndex(accumulativeChecksumIndex);
privatized.param1 = m.param1.withPrefix(systemKeys.begin, arena);
//TraceEvent(SevDebug, "SendingPrivateMutation", dbgid).detail("Original", m.toString()).detail("Privatized", privatized.toString());
cachedRangeInfo[k] = privatized;
@ -349,7 +358,8 @@ private:
// Create a private mutation for cache servers
// This is done to make the cache servers aware of the cached key-ranges
MutationRef privatized = m;
privatized.removeChecksum();
privatized.clearChecksumAndAccumulativeIndex();
privatized.setAccumulativeChecksumIndex(accumulativeChecksumIndex);
privatized.param1 = m.param1.withPrefix(systemKeys.begin, arena);
TraceEvent(SevDebug, "SendingPrivatized_CacheTag", dbgid).detail("M", privatized);
toCommit->addTag(cacheTag);
@ -390,7 +400,8 @@ private:
if (toCommit && keyInfo) {
KeyRange r = std::get<0>(decodeChangeFeedValue(m.param2));
MutationRef privatized = m;
privatized.removeChecksum();
privatized.clearChecksumAndAccumulativeIndex();
privatized.setAccumulativeChecksumIndex(accumulativeChecksumIndex);
privatized.param1 = m.param1.withPrefix(systemKeys.begin, arena);
auto ranges = keyInfo->intersectingRanges(r);
auto firstRange = ranges.begin();
@ -455,7 +466,8 @@ private:
if (toCommit) {
// send private mutation to SS that it now has a TSS pair
MutationRef privatized = m;
privatized.removeChecksum();
privatized.clearChecksumAndAccumulativeIndex();
privatized.setAccumulativeChecksumIndex(accumulativeChecksumIndex);
privatized.param1 = m.param1.withPrefix(systemKeys.begin, arena);
Optional<Value> tagV = txnStateStore->readValue(serverTagKeyFor(ssId)).get();
@ -488,7 +500,8 @@ private:
Optional<Value> tagV = txnStateStore->readValue(serverTagKeyFor(ssi.tssPairID.get())).get();
if (tagV.present()) {
MutationRef privatized = m;
privatized.removeChecksum();
privatized.clearChecksumAndAccumulativeIndex();
privatized.setAccumulativeChecksumIndex(accumulativeChecksumIndex);
privatized.param1 = m.param1.withPrefix(systemKeys.begin, arena);
TraceEvent(SevDebug, "SendingPrivatized_TSSQuarantine", dbgid).detail("M", privatized);
toCommit->addTag(decodeServerTagValue(tagV.get()));
@ -654,7 +667,8 @@ private:
}
MutationRef privatized = m;
privatized.removeChecksum();
privatized.clearChecksumAndAccumulativeIndex();
privatized.setAccumulativeChecksumIndex(accumulativeChecksumIndex);
privatized.param1 = m.param1.withPrefix(systemKeys.begin, arena);
TraceEvent(SevDebug, "SendingPrivatized_GlobalKeys", dbgid).detail("M", privatized);
toCommit->addTags(allTags);
@ -678,7 +692,8 @@ private:
}
const Tag tag = decodeServerTagValue(tagValue.get());
MutationRef privatized = m;
privatized.removeChecksum();
privatized.clearChecksumAndAccumulativeIndex();
privatized.setAccumulativeChecksumIndex(accumulativeChecksumIndex);
privatized.param1 = m.param1.withPrefix(systemKeys.begin, arena);
TraceEvent("SendingPrivateMutationCheckpoint", dbgid)
.detail("Original", m)
@ -795,7 +810,8 @@ private:
toCommit->addTags(allTags);
MutationRef privatized = m;
privatized.removeChecksum();
privatized.clearChecksumAndAccumulativeIndex();
privatized.setAccumulativeChecksumIndex(accumulativeChecksumIndex);
privatized.param1 = m.param1.withPrefix(systemKeys.begin, arena);
writeMutation(privatized);
}
@ -925,7 +941,8 @@ private:
if (toCommit) {
MutationRef privatized = m;
privatized.removeChecksum();
privatized.clearChecksumAndAccumulativeIndex();
privatized.setAccumulativeChecksumIndex(accumulativeChecksumIndex);
privatized.param1 = kv.key.withPrefix(systemKeys.begin, arena);
privatized.param2 = keyAfter(privatized.param1, arena);
@ -949,7 +966,8 @@ private:
Optional<Value> tagV = txnStateStore->readValue(serverTagKeyFor(ssi.tssPairID.get())).get();
if (tagV.present()) {
MutationRef privatized = m;
privatized.removeChecksum();
privatized.clearChecksumAndAccumulativeIndex();
privatized.setAccumulativeChecksumIndex(accumulativeChecksumIndex);
privatized.param1 = maybeTssRange.begin.withPrefix(systemKeys.begin, arena);
privatized.param2 =
keyAfter(maybeTssRange.begin, arena).withPrefix(systemKeys.begin, arena);
@ -1147,7 +1165,8 @@ private:
// send private mutation to SS to notify that it no longer has a tss pair
if (Optional<Value> tagV = txnStateStore->readValue(serverTagKeyFor(ssId)).get(); tagV.present()) {
MutationRef privatized = m;
privatized.removeChecksum();
privatized.clearChecksumAndAccumulativeIndex();
privatized.setAccumulativeChecksumIndex(accumulativeChecksumIndex);
privatized.param1 = m.param1.withPrefix(systemKeys.begin, arena);
privatized.param2 = m.param2.withPrefix(systemKeys.begin, arena);
TraceEvent(SevDebug, "SendingPrivatized_ClearTSSMapping", dbgid).detail("M", privatized);
@ -1175,7 +1194,8 @@ private:
tagV.present()) {
MutationRef privatized = m;
privatized.removeChecksum();
privatized.clearChecksumAndAccumulativeIndex();
privatized.setAccumulativeChecksumIndex(accumulativeChecksumIndex);
privatized.param1 = m.param1.withPrefix(systemKeys.begin, arena);
privatized.param2 = m.param2.withPrefix(systemKeys.begin, arena);
TraceEvent(SevDebug, "SendingPrivatized_ClearTSSQuarantine", dbgid).detail("M", privatized);
@ -1256,7 +1276,8 @@ private:
toCommit->addTags(allTags);
MutationRef privatized;
privatized.removeChecksum();
privatized.clearChecksumAndAccumulativeIndex();
privatized.setAccumulativeChecksumIndex(accumulativeChecksumIndex);
privatized.type = MutationRef::ClearRange;
privatized.param1 = systemKeys.begin.withSuffix(std::max(range.begin, subspace.begin), arena);
if (range.end < subspace.end) {

View File

@ -200,11 +200,13 @@ ACTOR Future<Void> newCommitProxies(Reference<ClusterRecoveryData> self, Recruit
req.recoveryTransactionVersion = self->recoveryTransactionVersion;
req.firstProxy = i == 0;
req.encryptMode = getEncryptionAtRest(self->configuration);
req.commitProxyIndex = i;
TraceEvent("CommitProxyReplies", self->dbgid)
.detail("WorkerID", recr.commitProxies[i].id())
.detail("RecoveryTxnVersion", self->recoveryTransactionVersion)
.detail("EncryptMode", req.encryptMode.toString())
.detail("FirstProxy", req.firstProxy ? "True" : "False");
.detail("FirstProxy", req.firstProxy ? "True" : "False")
.detail("CommitProxyIndex", req.commitProxyIndex);
initializationReplies.push_back(
transformErrors(throwErrorOr(recr.commitProxies[i].commitProxy.getReplyUnlessFailedFor(
req, SERVER_KNOBS->TLOG_TIMEOUT, SERVER_KNOBS->MASTER_FAILURE_SLOPE_DURING_RECOVERY)),

View File

@ -40,6 +40,7 @@
#include "fdbclient/TransactionLineage.h"
#include "fdbrpc/TenantInfo.h"
#include "fdbrpc/sim_validation.h"
#include "fdbserver/AccumulativeChecksumUtil.h"
#include "fdbserver/ApplyMetadataMutation.h"
#include "fdbserver/ConflictSet.h"
#include "fdbserver/DataDistributorInterface.h"
@ -561,6 +562,10 @@ ACTOR Future<Void> addBackupMutations(ProxyCommitData* self,
backupMutation.param2 = val.substr(
part * CLIENT_KNOBS->MUTATION_BLOCK_SIZE,
std::min(val.size() - part * CLIENT_KNOBS->MUTATION_BLOCK_SIZE, CLIENT_KNOBS->MUTATION_BLOCK_SIZE));
if (CLIENT_KNOBS->ENABLE_ACCUMULATIVE_CHECKSUM) {
backupMutation.setAccumulativeChecksumIndex(
getCommitProxyAccumulativeChecksumIndex(self->commitProxyIndex));
}
// Write the last part of the mutation to the serialization, if the buffer is not defined
if (!partBuffer) {
@ -1981,6 +1986,10 @@ ACTOR Future<Void> assignMutationsToStorageServers(CommitBatchContext* self) {
}
state MutationRef m = (*pMutations)[mutationNum];
if (CLIENT_KNOBS->ENABLE_ACCUMULATIVE_CHECKSUM) {
m.setAccumulativeChecksumIndex(
getCommitProxyAccumulativeChecksumIndex(self->pProxyCommitData->commitProxyIndex));
}
state Optional<MutationRef> encryptedMutation =
encryptedMutations->size() > 0 ? (*encryptedMutations)[mutationNum] : Optional<MutationRef>();
state Arena arena;
@ -2213,37 +2222,46 @@ ACTOR Future<Void> postResolution(CommitBatchContext* self) {
&self->computeStart));
}
buildIdempotencyIdMutations(self->trs,
self->idempotencyKVBuilder,
self->commitVersion,
self->committed,
ConflictBatch::TransactionCommitted,
self->locked,
[&](const KeyValue& kv) {
MutationRef idempotencyIdSet;
idempotencyIdSet.type = MutationRef::Type::SetValue;
idempotencyIdSet.param1 = kv.key;
idempotencyIdSet.param2 = kv.value;
auto& tags = pProxyCommitData->tagsForKey(kv.key);
self->toCommit.addTags(tags);
if (self->pProxyCommitData->encryptMode.isEncryptionEnabled()) {
CODE_PROBE(true, "encrypting idempotency mutation");
EncryptCipherDomainId domainId =
getEncryptDetailsFromMutationRef(self->pProxyCommitData, idempotencyIdSet);
MutationRef encryptedMutation = idempotencyIdSet.encrypt(
self->cipherKeys, domainId, self->arena, BlobCipherMetrics::TLOG);
ASSERT(encryptedMutation.isEncrypted());
self->toCommit.writeTypedMessage(encryptedMutation);
} else {
self->toCommit.writeTypedMessage(idempotencyIdSet);
}
});
buildIdempotencyIdMutations(
self->trs,
self->idempotencyKVBuilder,
self->commitVersion,
self->committed,
ConflictBatch::TransactionCommitted,
self->locked,
[&](const KeyValue& kv) {
MutationRef idempotencyIdSet;
idempotencyIdSet.type = MutationRef::Type::SetValue;
idempotencyIdSet.param1 = kv.key;
idempotencyIdSet.param2 = kv.value;
if (CLIENT_KNOBS->ENABLE_ACCUMULATIVE_CHECKSUM) {
idempotencyIdSet.setAccumulativeChecksumIndex(
getCommitProxyAccumulativeChecksumIndex(self->pProxyCommitData->commitProxyIndex));
}
auto& tags = pProxyCommitData->tagsForKey(kv.key);
self->toCommit.addTags(tags);
if (self->pProxyCommitData->encryptMode.isEncryptionEnabled()) {
CODE_PROBE(true, "encrypting idempotency mutation");
EncryptCipherDomainId domainId =
getEncryptDetailsFromMutationRef(self->pProxyCommitData, idempotencyIdSet);
MutationRef encryptedMutation =
idempotencyIdSet.encrypt(self->cipherKeys, domainId, self->arena, BlobCipherMetrics::TLOG);
ASSERT(encryptedMutation.isEncrypted());
self->toCommit.writeTypedMessage(encryptedMutation);
} else {
self->toCommit.writeTypedMessage(idempotencyIdSet);
}
});
state int i = 0;
for (i = 0; i < pProxyCommitData->idempotencyClears.size(); i++) {
auto& tags = pProxyCommitData->tagsForKey(pProxyCommitData->idempotencyClears[i].param1);
self->toCommit.addTags(tags);
// We already have an arena with an appropriate lifetime handy
Arena& arena = pProxyCommitData->idempotencyClears.arena();
if (CLIENT_KNOBS->ENABLE_ACCUMULATIVE_CHECKSUM) {
pProxyCommitData->idempotencyClears[i].setAccumulativeChecksumIndex(
getCommitProxyAccumulativeChecksumIndex(self->pProxyCommitData->commitProxyIndex));
}
WriteMutationRefVar var = wait(writeMutation(
self, SYSTEM_KEYSPACE_ENCRYPT_DOMAIN_ID, &pProxyCommitData->idempotencyClears[i], nullptr, &arena));
ASSERT(std::holds_alternative<MutationRef>(var));
@ -3718,7 +3736,8 @@ ACTOR Future<Void> commitProxyServerCore(CommitProxyInterface proxy,
bool firstProxy,
std::string whitelistBinPaths,
EncryptionAtRestMode encryptMode,
bool provisional) {
bool provisional,
uint16_t commitProxyIndex) {
state ProxyCommitData commitData(proxy.id(),
master,
proxy.getConsistentReadVersion,
@ -3727,7 +3746,8 @@ ACTOR Future<Void> commitProxyServerCore(CommitProxyInterface proxy,
db,
firstProxy,
encryptMode,
provisional);
provisional,
commitProxyIndex);
state Future<Sequence> sequenceFuture = (Sequence)0;
state PromiseStream<std::pair<std::vector<CommitTransactionRequest>, int>> batchedCommits;
@ -3959,7 +3979,8 @@ ACTOR Future<Void> commitProxyServer(CommitProxyInterface proxy,
req.firstProxy,
whitelistBinPaths,
req.encryptMode,
proxy.provisional);
proxy.provisional,
req.commitProxyIndex);
wait(core || updateLocalDbInfo(db, localDb, req.recoveryCount, proxy));
} catch (Error& e) {
TraceEvent("CommitProxyTerminated", proxy.id()).errorUnsuppressed(e);

View File

@ -0,0 +1,34 @@
/*
* AccumulativeChecksumUtil.h
*
* This source file is part of the FoundationDB open source project
*
* Copyright 2013-2022 Apple Inc. and the FoundationDB project authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#ifndef ACCUMULATIVECHECKSUMUTIL_H
#define ACCUMULATIVECHECKSUMUTIL_H
#pragma once
#include "fdbclient/CommitTransaction.h"
static const uint16_t invalidAccumulativeChecksumIndex = 0;
static const uint16_t resolverAccumulativeChecksumIndex = 2;
uint16_t getCommitProxyAccumulativeChecksumIndex(uint16_t commitProxyIndex);
bool validateAccumulativeChecksumIndexAtStorageServer(MutationRef m);
#endif

View File

@ -224,6 +224,7 @@ struct ProxyCommitData {
KeyRangeMap<bool> cacheInfo;
std::map<Key, ApplyMutationsData> uid_applyMutationsData;
bool firstProxy;
uint16_t commitProxyIndex; // decided when the cluster controller recruits commit proxies
double lastCoalesceTime;
bool locked;
Optional<Value> metadataVersion;
@ -335,7 +336,8 @@ struct ProxyCommitData {
Reference<AsyncVar<ServerDBInfo> const> db,
bool firstProxy,
EncryptionAtRestMode encryptMode,
bool provisional)
bool provisional,
uint16_t commitProxyIndex)
: dbgid(dbgid), commitBatchesMemBytesCount(0),
stats(dbgid, &version, &committedVersion, &commitBatchesMemBytesCount, &tenantMap), master(master),
logAdapter(nullptr), txnStateStore(nullptr), committedVersion(recoveryTransactionVersion),
@ -347,7 +349,7 @@ struct ProxyCommitData {
cx(openDBOnServer(db, TaskPriority::DefaultEndpoint, LockAware::True)), db(db),
singleKeyMutationEvent("SingleKeyMutation"_sr), lastTxsPop(0), popRemoteTxs(false), lastStartCommit(0),
lastCommitLatency(SERVER_KNOBS->REQUIRED_MIN_RECOVERY_DURATION), lastCommitTime(0), lastMasterReset(now()),
lastResolverReset(now()) {
lastResolverReset(now()), commitProxyIndex(commitProxyIndex) {
commitComputePerOperation.resize(SERVER_KNOBS->PROXY_COMPUTE_BUCKETS, 0.0);
}
};

View File

@ -739,11 +739,19 @@ struct InitializeCommitProxyRequest {
bool firstProxy;
ReplyPromise<CommitProxyInterface> reply;
EncryptionAtRestMode encryptMode;
uint16_t commitProxyIndex;
template <class Ar>
void serialize(Ar& ar) {
serializer(
ar, master, masterLifetime, recoveryCount, recoveryTransactionVersion, firstProxy, reply, encryptMode);
serializer(ar,
master,
masterLifetime,
recoveryCount,
recoveryTransactionVersion,
firstProxy,
reply,
encryptMode,
commitProxyIndex);
}
};

View File

@ -70,6 +70,7 @@
#include "fdbrpc/sim_validation.h"
#include "fdbrpc/Smoother.h"
#include "fdbrpc/Stats.h"
#include "fdbserver/AccumulativeChecksumUtil.h"
#include "fdbserver/DataDistribution.actor.h"
#include "fdbserver/FDBExecHelper.actor.h"
#include "fdbclient/GetEncryptCipherKeys.h"
@ -11211,7 +11212,13 @@ ACTOR Future<Void> update(StorageServer* data, bool* pReceivedUpdate) {
decryptionTime += decryptionTimeV;
}
} else {
ASSERT(msg.validateChecksum());
if (!msg.validateChecksum() || !validateAccumulativeChecksumIndexAtStorageServer(msg)) {
TraceEvent(SevError, "ValidateChecksumOrAcsIndexError", data->thisServerID)
.detail("Mutation", msg)
.detail("ResolverGeneratePrivateMutation",
SERVER_KNOBS->PROXY_USE_RESOLVER_PRIVATE_MUTATIONS);
ASSERT(false);
}
}
// TraceEvent(SevDebug, "SSReadingLog", data->thisServerID).detail("Mutation", msg);