Merge branch 'main' into storage-quota-in-tenant-metadata-space

This commit is contained in:
A.J. Beamon 2023-02-11 09:04:15 -08:00
commit 4579a4319d
19 changed files with 334 additions and 127 deletions

View File

@ -1037,7 +1037,7 @@ void printStatus(StatusObjectReader statusObj,
try {
double tx = -1, rx = -1, mCPUUtil = -1;
int64_t processTotalSize;
int64_t processRSS;
// Get the machine for this process
// StatusObjectReader mach = machinesMap[procObj["machine_id"].get_str()];
@ -1056,7 +1056,7 @@ void printStatus(StatusObjectReader statusObj,
}
}
procObj.get("memory.used_bytes", processTotalSize);
procObj.get("memory.rss_bytes", processRSS);
StatusObjectReader procCPUObj;
procObj.get("cpu", procCPUObj);
@ -1074,9 +1074,7 @@ void printStatus(StatusObjectReader statusObj,
if (procObj.get("disk.busy", diskBusy))
line += format("%3.0f%% disk IO;", 100.0 * diskBusy);
line += processTotalSize != -1
? format("%4.1f GB", processTotalSize / (1024.0 * 1024 * 1024))
: "";
line += processRSS != -1 ? format("%4.1f GB", processRSS / (1024.0 * 1024 * 1024)) : "";
double availableBytes;
if (procObj.get("memory.available_bytes", availableBytes))

View File

@ -39,6 +39,7 @@
#include "flow/serialize.h"
#include "flow/Trace.h"
#include "flow/UnitTest.h"
#include "flow/xxhash.h"
#include <chrono>
#include <cstring>
@ -90,6 +91,95 @@ uint32_t BlobCipherEncryptHeaderRef::getHeaderSize(const int flagVersion,
return total;
}
void BlobCipherEncryptHeaderRef::validateEncryptionHeaderDetails(const BlobCipherDetails& textCipherDetails,
const BlobCipherDetails& headerCipherDetails,
const StringRef& ivRef) const {
ASSERT(CLIENT_KNOBS->ENABLE_CONFIGURABLE_ENCRYPTION);
if (flagsVersion > CLIENT_KNOBS->ENCRYPT_HEADER_FLAGS_VERSION) {
TraceEvent("ValidateEncryptHeaderUnsupportedFlagVersion")
.detail("MaxSupportedVersion", CLIENT_KNOBS->ENCRYPT_HEADER_FLAGS_VERSION)
.detail("Version", flagsVersion);
throw not_implemented();
}
BlobCipherEncryptHeaderFlagsV1 flags = std::get<BlobCipherEncryptHeaderFlagsV1>(this->flags);
BlobCipherDetails persistedTextCipherDetails;
BlobCipherDetails persistedHeaderCipherDetails;
uint8_t* persistedIV = nullptr;
if (flags.authTokenMode == ENCRYPT_HEADER_AUTH_TOKEN_MODE_NONE) {
if (algoHeaderVersion > CLIENT_KNOBS->ENCRYPT_HEADER_AES_CTR_NO_AUTH_VERSION) {
TraceEvent("ValidateEncryptHeaderUnsupportedAlgoHeaderVersion")
.detail("AuthMode", "No-Auth")
.detail("MaxSupportedVersion", CLIENT_KNOBS->ENCRYPT_HEADER_AES_CTR_NO_AUTH_VERSION)
.detail("Version", algoHeaderVersion);
throw not_implemented();
}
persistedTextCipherDetails = std::get<AesCtrNoAuthV1>(this->algoHeader).cipherTextDetails;
persistedIV = (uint8_t*)(&std::get<AesCtrNoAuthV1>(this->algoHeader).iv[0]);
} else {
if (flags.authTokenAlgo == ENCRYPT_HEADER_AUTH_TOKEN_ALGO_HMAC_SHA) {
if (algoHeaderVersion > CLIENT_KNOBS->ENCRYPT_HEADER_AES_CTR_NO_AUTH_VERSION) {
TraceEvent("ValidateEncryptHeaderUnsupportedAlgoHeaderVersion")
.detail("AuthMode", "Hmac-Sha")
.detail("MaxSupportedVersion", CLIENT_KNOBS->ENCRYPT_HEADER_AES_CTR_HMAC_SHA_AUTH_VERSION)
.detail("Version", algoHeaderVersion);
}
persistedTextCipherDetails =
std::get<AesCtrWithAuthV1<AUTH_TOKEN_HMAC_SHA_SIZE>>(this->algoHeader).cipherTextDetails;
persistedHeaderCipherDetails =
std::get<AesCtrWithAuthV1<AUTH_TOKEN_HMAC_SHA_SIZE>>(this->algoHeader).cipherHeaderDetails;
persistedIV = (uint8_t*)(&std::get<AesCtrWithAuthV1<AUTH_TOKEN_HMAC_SHA_SIZE>>(this->algoHeader).iv[0]);
} else if (flags.authTokenAlgo == ENCRYPT_HEADER_AUTH_TOKEN_ALGO_AES_CMAC) {
if (algoHeaderVersion > CLIENT_KNOBS->ENCRYPT_HEADER_AES_CTR_AES_CMAC_AUTH_VERSION) {
TraceEvent("ValidateEncryptHeaderUnsupportedAlgoHeaderVersion")
.detail("AuthMode", "Aes-Cmac")
.detail("MaxSupportedVersion", CLIENT_KNOBS->ENCRYPT_HEADER_AES_CTR_AES_CMAC_AUTH_VERSION)
.detail("Version", algoHeaderVersion);
}
persistedTextCipherDetails =
std::get<AesCtrWithAuthV1<AUTH_TOKEN_AES_CMAC_SIZE>>(this->algoHeader).cipherTextDetails;
persistedHeaderCipherDetails =
std::get<AesCtrWithAuthV1<AUTH_TOKEN_AES_CMAC_SIZE>>(this->algoHeader).cipherHeaderDetails;
persistedIV = (uint8_t*)(&std::get<AesCtrWithAuthV1<AUTH_TOKEN_AES_CMAC_SIZE>>(this->algoHeader).iv[0]);
} else {
throw not_implemented();
}
}
// Validate encryption header 'cipherHeader' details sanity
if (flags.authTokenMode != ENCRYPT_HEADER_AUTH_TOKEN_MODE_NONE &&
headerCipherDetails != persistedHeaderCipherDetails) {
TraceEvent(SevError, "ValidateEncryptHeaderMismatch")
.detail("HeaderDomainId", headerCipherDetails.encryptDomainId)
.detail("PersistedHeaderDomainId", persistedHeaderCipherDetails.encryptDomainId)
.detail("HeaderBaseCipherId", headerCipherDetails.baseCipherId)
.detail("ExpectedHeaderBaseCipherId", persistedHeaderCipherDetails.baseCipherId)
.detail("HeaderSalt", headerCipherDetails.salt)
.detail("ExpectedHeaderSalt", persistedHeaderCipherDetails.salt);
throw encrypt_header_metadata_mismatch();
}
// Validate encryption header 'cipherText' details sanity
if (textCipherDetails != persistedTextCipherDetails) {
TraceEvent(SevError, "ValidateEncryptHeaderMismatch")
.detail("TextDomainId", textCipherDetails.encryptDomainId)
.detail("PersistedTextDomainId", persistedTextCipherDetails.encryptDomainId)
.detail("TextBaseCipherId", textCipherDetails.baseCipherId)
.detail("PersistedTextBaseCipherId", persistedTextCipherDetails.encryptDomainId)
.detail("TextSalt", textCipherDetails.salt)
.detail("PersistedTextSalt", persistedTextCipherDetails.salt);
throw encrypt_header_metadata_mismatch();
}
// Validate 'Initialization Vector' sanity
if (memcmp(ivRef.begin(), persistedIV, AES_256_IV_LENGTH) != 0) {
TraceEvent(SevError, "EncryptionHeader_IVMismatch")
.detail("IVChecksum", XXH3_64bits(ivRef.begin(), ivRef.size()))
.detail("ExpectedIVChecksum", XXH3_64bits(persistedIV, AES_256_IV_LENGTH));
throw encrypt_header_metadata_mismatch();
}
}
// BlobCipherMetrics methods
BlobCipherMetrics::CounterSet::CounterSet(CounterCollection& cc, std::string name)
@ -2185,6 +2275,9 @@ void testKeyCacheCleanup(const int minDomainId, const int maxDomainId) {
TEST_CASE("/blobCipher") {
DomainKeyMap domainKeyMap;
auto& g_knobs = IKnobCollection::getMutableGlobalKnobCollection();
g_knobs.setKnob("enable_configurable_encryption", KnobValueRef::create(bool{ true }));
const EncryptCipherDomainId minDomainId = 1;
const EncryptCipherDomainId maxDomainId = deterministicRandom()->randomInt(minDomainId, minDomainId + 10) + 5;
const EncryptCipherBaseKeyId minBaseCipherKeyId = 100;

View File

@ -253,6 +253,19 @@ void validateEncryptionHeaderDetails(const BlobGranuleFileEncryptionKeys& eKeys,
throw encrypt_header_metadata_mismatch();
}
}
void validateEncryptionHeaderDetails(const BlobGranuleFileEncryptionKeys& eKeys,
const BlobCipherEncryptHeaderRef& headerRef,
const StringRef& ivRef) {
headerRef.validateEncryptionHeaderDetails(BlobCipherDetails(eKeys.textCipherKey->getDomainId(),
eKeys.textCipherKey->getBaseCipherId(),
eKeys.textCipherKey->getSalt()),
BlobCipherDetails(eKeys.headerCipherKey->getDomainId(),
eKeys.headerCipherKey->getBaseCipherId(),
eKeys.headerCipherKey->getSalt()),
ivRef);
}
} // namespace
struct IndexBlock {
@ -287,6 +300,7 @@ struct IndexBlockRef {
TraceEvent(SevDebug, "IndexBlockEncrypt_Before").detail("Chksum", chksum);
}
Value serializedBuff = ObjectWriter::toValue(block, IncludeVersion(ProtocolVersion::withBlobGranuleFile()));
EncryptBlobCipherAes265Ctr encryptor(
eKeys.textCipherKey,
eKeys.headerCipherKey,
@ -294,11 +308,20 @@ struct IndexBlockRef {
AES_256_IV_LENGTH,
getEncryptAuthTokenMode(EncryptAuthTokenMode::ENCRYPT_HEADER_AUTH_TOKEN_MODE_SINGLE),
BlobCipherMetrics::BLOB_GRANULE);
Value serializedBuff = ObjectWriter::toValue(block, IncludeVersion(ProtocolVersion::withBlobGranuleFile()));
if (CLIENT_KNOBS->ENABLE_CONFIGURABLE_ENCRYPTION) {
BlobCipherEncryptHeaderRef headerRef;
buffer = encryptor.encrypt(
serializedBuff.contents().begin(), serializedBuff.contents().size(), &headerRef, arena);
Standalone<StringRef> serialized = BlobCipherEncryptHeaderRef::toStringRef(headerRef);
arena.dependsOn(serialized.arena());
encryptHeaderRef = serialized;
} else {
BlobCipherEncryptHeader header;
buffer = encryptor.encrypt(serializedBuff.contents().begin(), serializedBuff.contents().size(), &header, arena)
buffer =
encryptor.encrypt(serializedBuff.contents().begin(), serializedBuff.contents().size(), &header, arena)
->toStringRef();
encryptHeaderRef = BlobCipherEncryptHeader::toStringRef(header, arena);
}
if (BG_ENCRYPT_COMPRESS_DEBUG) {
XXH64_hash_t chksum = XXH3_64bits(buffer.begin(), buffer.size());
@ -316,15 +339,25 @@ struct IndexBlockRef {
XXH64_hash_t chksum = XXH3_64bits(idxRef.buffer.begin(), idxRef.buffer.size());
TraceEvent(SevDebug, "IndexBlockEncrypt_Before").detail("Chksum", chksum);
}
StringRef decrypted;
if (CLIENT_KNOBS->ENABLE_CONFIGURABLE_ENCRYPTION) {
BlobCipherEncryptHeaderRef headerRef =
BlobCipherEncryptHeaderRef::fromStringRef(idxRef.encryptHeaderRef.get());
validateEncryptionHeaderDetails(eKeys, headerRef, cipherKeysCtx.ivRef);
DecryptBlobCipherAes256Ctr decryptor(eKeys.textCipherKey,
eKeys.headerCipherKey,
cipherKeysCtx.ivRef.begin(),
BlobCipherMetrics::BLOB_GRANULE);
decrypted = decryptor.decrypt(idxRef.buffer.begin(), idxRef.buffer.size(), headerRef, arena);
} else {
BlobCipherEncryptHeader header = BlobCipherEncryptHeader::fromStringRef(idxRef.encryptHeaderRef.get());
validateEncryptionHeaderDetails(eKeys, header, cipherKeysCtx.ivRef);
DecryptBlobCipherAes256Ctr decryptor(
eKeys.textCipherKey, eKeys.headerCipherKey, cipherKeysCtx.ivRef.begin(), BlobCipherMetrics::BLOB_GRANULE);
StringRef decrypted =
decryptor.decrypt(idxRef.buffer.begin(), idxRef.buffer.size(), header, arena)->toStringRef();
DecryptBlobCipherAes256Ctr decryptor(eKeys.textCipherKey,
eKeys.headerCipherKey,
cipherKeysCtx.ivRef.begin(),
BlobCipherMetrics::BLOB_GRANULE);
decrypted = decryptor.decrypt(idxRef.buffer.begin(), idxRef.buffer.size(), header, arena)->toStringRef();
}
if (BG_ENCRYPT_COMPRESS_DEBUG) {
XXH64_hash_t chksum = XXH3_64bits(decrypted.begin(), decrypted.size());
@ -418,10 +451,18 @@ struct IndexBlobGranuleFileChunkRef {
AES_256_IV_LENGTH,
getEncryptAuthTokenMode(EncryptAuthTokenMode::ENCRYPT_HEADER_AUTH_TOKEN_MODE_SINGLE),
BlobCipherMetrics::BLOB_GRANULE);
if (CLIENT_KNOBS->ENABLE_CONFIGURABLE_ENCRYPTION) {
BlobCipherEncryptHeaderRef headerRef;
chunkRef.buffer = encryptor.encrypt(chunkRef.buffer.begin(), chunkRef.buffer.size(), &headerRef, arena);
Standalone<StringRef> serialized = BlobCipherEncryptHeaderRef::toStringRef(headerRef);
arena.dependsOn(serialized.arena());
chunkRef.encryptHeaderRef = serialized;
} else {
BlobCipherEncryptHeader header;
chunkRef.buffer =
encryptor.encrypt(chunkRef.buffer.begin(), chunkRef.buffer.size(), &header, arena)->toStringRef();
chunkRef.encryptHeaderRef = BlobCipherEncryptHeader::toStringRef(header, arena);
}
if (BG_ENCRYPT_COMPRESS_DEBUG) {
XXH64_hash_t chksum = XXH3_64bits(chunkRef.buffer.begin(), chunkRef.buffer.size());
@ -442,14 +483,26 @@ struct IndexBlobGranuleFileChunkRef {
TraceEvent(SevDebug, "BlobChunkDecrypt_Before").detail("Chksum", chksum);
}
StringRef decrypted;
if (CLIENT_KNOBS->ENABLE_CONFIGURABLE_ENCRYPTION) {
BlobCipherEncryptHeaderRef headerRef =
BlobCipherEncryptHeaderRef::fromStringRef(chunkRef.encryptHeaderRef.get());
validateEncryptionHeaderDetails(eKeys, headerRef, cipherKeysCtx.ivRef);
DecryptBlobCipherAes256Ctr decryptor(eKeys.textCipherKey,
eKeys.headerCipherKey,
cipherKeysCtx.ivRef.begin(),
BlobCipherMetrics::BLOB_GRANULE);
decrypted = decryptor.decrypt(chunkRef.buffer.begin(), chunkRef.buffer.size(), headerRef, arena);
} else {
BlobCipherEncryptHeader header = BlobCipherEncryptHeader::fromStringRef(chunkRef.encryptHeaderRef.get());
validateEncryptionHeaderDetails(eKeys, header, cipherKeysCtx.ivRef);
DecryptBlobCipherAes256Ctr decryptor(
eKeys.textCipherKey, eKeys.headerCipherKey, cipherKeysCtx.ivRef.begin(), BlobCipherMetrics::BLOB_GRANULE);
StringRef decrypted =
DecryptBlobCipherAes256Ctr decryptor(eKeys.textCipherKey,
eKeys.headerCipherKey,
cipherKeysCtx.ivRef.begin(),
BlobCipherMetrics::BLOB_GRANULE);
decrypted =
decryptor.decrypt(chunkRef.buffer.begin(), chunkRef.buffer.size(), header, arena)->toStringRef();
}
if (BG_ENCRYPT_COMPRESS_DEBUG) {
XXH64_hash_t chksum = XXH3_64bits(decrypted.begin(), decrypted.size());

View File

@ -36,6 +36,7 @@
#include "flow/Platform.h"
#include "flow/ProtocolVersion.h"
#include "flow/serialize.h"
#include "flow/Trace.h"
#include <boost/functional/hash.hpp>
#include <cinttypes>
@ -171,6 +172,7 @@ struct BlobCipherDetails {
bool operator==(const BlobCipherDetails& o) const {
return encryptDomainId == o.encryptDomainId && baseCipherId == o.baseCipherId && salt == o.salt;
}
bool operator!=(const BlobCipherDetails& o) const { return !(*this == o); }
template <class Ar>
void serialize(Ar& ar) {
@ -380,6 +382,7 @@ struct BlobCipherEncryptHeaderRef {
serializer(ar, flagsVersion, algoHeaderVersion);
if (ar.isSerializing) {
if (flagsVersion != 1) {
TraceEvent(SevWarn, "BlobCipherEncryptHeaderUnsupportedFlagVersion").detail("Version", flagsVersion);
throw not_implemented();
}
@ -389,7 +392,13 @@ struct BlobCipherEncryptHeaderRef {
authAlgo = (EncryptAuthTokenAlgo)f.authTokenAlgo;
serializer(ar, f);
if (encryptMode != ENCRYPT_CIPHER_MODE_AES_256_CTR || algoHeaderVersion != 1) {
if (encryptMode != ENCRYPT_CIPHER_MODE_AES_256_CTR) {
TraceEvent(SevWarn, "BlobCipherEncryptHeaderUnsupportedEncryptMode").detail("Mode", encryptMode);
throw not_implemented();
}
if (algoHeaderVersion != 1) {
TraceEvent(SevWarn, "BlobCipherEncryptHeaderUnsupportedAlgoHeaderVersion")
.detail("Version", algoHeaderVersion);
throw not_implemented();
}
@ -411,6 +420,7 @@ struct BlobCipherEncryptHeaderRef {
}
} else if (ar.isDeserializing) {
if (flagsVersion != 1) {
TraceEvent(SevWarn, "BlobCipherEncryptHeaderUnsupportedFlagVersion").detail("Version", flagsVersion);
throw not_implemented();
}
BlobCipherEncryptHeaderFlagsV1 f;
@ -420,7 +430,13 @@ struct BlobCipherEncryptHeaderRef {
authMode = (EncryptAuthTokenMode)f.authTokenMode;
authAlgo = (EncryptAuthTokenAlgo)f.authTokenAlgo;
if (encryptMode != ENCRYPT_CIPHER_MODE_AES_256_CTR || algoHeaderVersion != 1) {
if (encryptMode != ENCRYPT_CIPHER_MODE_AES_256_CTR) {
TraceEvent(SevWarn, "BlobCipherEncryptHeaderUnsupportedEncryptMode").detail("Mode", encryptMode);
throw not_implemented();
}
if (algoHeaderVersion != 1) {
TraceEvent(SevWarn, "BlobCipherEncryptHeaderUnsupportedAlgoHeaderVersion")
.detail("Version", algoHeaderVersion);
throw not_implemented();
}
@ -443,6 +459,10 @@ struct BlobCipherEncryptHeaderRef {
}
}
}
void validateEncryptionHeaderDetails(const BlobCipherDetails& textCipherDetails,
const BlobCipherDetails& headerCipherDetails,
const StringRef& ivRef) const;
};
#pragma pack(pop)

View File

@ -1136,45 +1136,6 @@ struct LogMessageVersion {
}
};
struct AddressExclusion {
IPAddress ip;
int port;
AddressExclusion() : ip(0), port(0) {}
explicit AddressExclusion(const IPAddress& ip) : ip(ip), port(0) {}
explicit AddressExclusion(const IPAddress& ip, int port) : ip(ip), port(port) {}
bool operator<(AddressExclusion const& r) const {
if (ip != r.ip)
return ip < r.ip;
return port < r.port;
}
bool operator==(AddressExclusion const& r) const { return ip == r.ip && port == r.port; }
bool isWholeMachine() const { return port == 0; }
bool isValid() const { return ip.isValid() || port != 0; }
bool excludes(NetworkAddress const& addr) const {
if (isWholeMachine())
return ip == addr.ip;
return ip == addr.ip && port == addr.port;
}
// This is for debugging and IS NOT to be used for serialization to persistant state
std::string toString() const {
if (!isWholeMachine())
return formatIpPort(ip, port);
return ip.toString();
}
static AddressExclusion parse(StringRef const&);
template <class Ar>
void serialize(Ar& ar) {
serializer(ar, ip, port);
}
};
inline bool addressExcluded(std::set<AddressExclusion> const& exclusions, NetworkAddress const& addr) {
return exclusions.count(AddressExclusion(addr.ip, addr.port)) || exclusions.count(AddressExclusion(addr.ip));
}

View File

@ -715,10 +715,10 @@ struct WaitMetricsRequest {
// Send a reversed range for min, max to receive an immediate report
constexpr static FileIdentifier file_identifier = 1795961;
Arena arena;
// Setting the tenantInfo makes the request tenant-aware. Need to set `minVersion` to a version where
// the tenant info was read.
// Setting the tenantInfo makes the request tenant-aware.
TenantInfo tenantInfo;
Version minVersion;
// Set `minVersion` to a version where the tenant info was read. Not needed for non-tenant-aware request.
Version minVersion = 0;
KeyRangeRef keys;
StorageMetrics min, max;
ReplyPromise<StorageMetrics> reply;

View File

@ -41,7 +41,7 @@ require (
github.com/beorn7/perks v1.0.1 // indirect
github.com/cespare/xxhash/v2 v2.1.2 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/emicklei/go-restful v2.9.5+incompatible // indirect
github.com/emicklei/go-restful v2.16.0+incompatible // indirect
github.com/go-openapi/jsonpointer v0.19.5 // indirect
github.com/go-openapi/jsonreference v0.19.5 // indirect
github.com/go-openapi/swag v0.19.14 // indirect

View File

@ -82,8 +82,9 @@ github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSs
github.com/docopt/docopt-go v0.0.0-20180111231733-ee0de3bc6815/go.mod h1:WwZ+bS3ebgob9U8Nd0kOddGdZWjyMGR8Wziv+TBNwSE=
github.com/elazarl/goproxy v0.0.0-20180725130230-947c36da3153/go.mod h1:/Zj4wYkgs4iZTTu3o/KG3Itv/qCCa8VVMlb3i9OVuzc=
github.com/emicklei/go-restful v0.0.0-20170410110728-ff4f55a20633/go.mod h1:otzb+WCGbkyDHkqmQmT5YD2WR4BBwUdeQoFo8l/7tVs=
github.com/emicklei/go-restful v2.9.5+incompatible h1:spTtZBk5DYEvbxMVutUuTyh1Ao2r4iyvLdACqsl/Ljk=
github.com/emicklei/go-restful v2.9.5+incompatible/go.mod h1:otzb+WCGbkyDHkqmQmT5YD2WR4BBwUdeQoFo8l/7tVs=
github.com/emicklei/go-restful v2.16.0+incompatible h1:rgqiKNjTnFQA6kkhFe16D8epTksy9HQ1MyrbDXSdYhM=
github.com/emicklei/go-restful v2.16.0+incompatible/go.mod h1:otzb+WCGbkyDHkqmQmT5YD2WR4BBwUdeQoFo8l/7tVs=
github.com/envoyproxy/go-control-plane v0.9.0/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4=
github.com/envoyproxy/go-control-plane v0.9.1-0.20191026205805-5f8ba28d4473/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4=
github.com/envoyproxy/go-control-plane v0.9.4/go.mod h1:6rpuAdCZL397s3pYoYcLgu1mIlRU8Am5FuJP05cCM98=

View File

@ -130,6 +130,7 @@ public:
KillType kt,
KillType* newKillType) const = 0;
virtual bool isAvailable() const = 0;
virtual std::vector<AddressExclusion> getAllAddressesInDCToExclude(Optional<Standalone<StringRef>> dcId) const = 0;
virtual bool datacenterDead(Optional<Standalone<StringRef>> dcId) const = 0;
virtual void displayWorkers() const;
ProtocolVersion protocolVersion() const override = 0;

View File

@ -1422,6 +1422,19 @@ public:
return canKillProcesses(processesLeft, processesDead, KillType::KillInstantly, nullptr);
}
std::vector<AddressExclusion> getAllAddressesInDCToExclude(Optional<Standalone<StringRef>> dcId) const override {
std::vector<AddressExclusion> addresses;
if (!dcId.present()) {
return addresses;
}
for (const auto& processInfo : getAllProcesses()) {
if (processInfo->locality.dcId() == dcId) {
addresses.emplace_back(processInfo->address.ip, processInfo->address.port);
}
}
return addresses;
}
bool datacenterDead(Optional<Standalone<StringRef>> dcId) const override {
if (!dcId.present()) {
return false;

View File

@ -176,6 +176,20 @@ public:
ACTOR static Future<Void> getTeam(DDTeamCollection* self, GetTeamRequest req) {
try {
wait(self->checkBuildTeams());
if (!self->primary && !self->readyToStart.isReady()) {
// When remote DC is not ready, DD shouldn't reply with a new team because
// a data movement to that team can't be completed and such a move
// may block the primary DC from reaching "storage_recovered".
auto team = self->findTeamFromServers(req.completeSources, /*wantHealthy=*/false);
TraceEvent("GetTeamNotReady", self->distributorId)
.suppressFor(1.0)
.detail("Primary", self->primary)
.detail("Team", team.present() ? describe(team.get()->getServerIDs()) : "");
req.reply.send(std::make_pair(team, true));
return Void();
}
// report the median available space
if (now() - self->lastMedianAvailableSpaceUpdate > SERVER_KNOBS->AVAILABLE_SPACE_UPDATE_DELAY) {
self->lastMedianAvailableSpaceUpdate = now();
@ -232,32 +246,15 @@ public:
bool wigglingBestOption = false; // best option contains server in paused wiggle state
Optional<Reference<IDataDistributionTeam>> bestOption;
std::vector<Reference<TCTeamInfo>> randomTeams;
const std::set<UID> completeSources(req.completeSources.begin(), req.completeSources.end());
// Note: this block does not apply any filters from the request
if (!req.wantsNewServers) {
for (int i = 0; i < req.completeSources.size(); i++) {
if (!self->server_info.count(req.completeSources[i])) {
continue;
}
auto const& teamList = self->server_info[req.completeSources[i]]->getTeams();
for (int j = 0; j < teamList.size(); j++) {
bool found = true;
auto serverIDs = teamList[j]->getServerIDs();
for (int k = 0; k < teamList[j]->size(); k++) {
if (!completeSources.count(serverIDs[k])) {
found = false;
break;
}
}
if (found && teamList[j]->isHealthy()) {
bestOption = teamList[j];
req.reply.send(std::make_pair(bestOption, foundSrc));
auto healthyTeam = self->findTeamFromServers(req.completeSources, /* wantHealthy=*/true);
if (healthyTeam.present()) {
req.reply.send(std::make_pair(healthyTeam, foundSrc));
return Void();
}
}
}
}
if (req.wantsTrueBest) {
ASSERT(!bestOption.present());
@ -355,35 +352,18 @@ public:
// Note: this block does not apply any filters from the request
if (!bestOption.present() && self->zeroHealthyTeams->get()) {
// Attempt to find the unhealthy source server team and return it
for (int i = 0; i < req.completeSources.size(); i++) {
if (!self->server_info.count(req.completeSources[i])) {
continue;
}
auto const& teamList = self->server_info[req.completeSources[i]]->getTeams();
for (int j = 0; j < teamList.size(); j++) {
bool found = true;
auto serverIDs = teamList[j]->getServerIDs();
for (int k = 0; k < teamList[j]->size(); k++) {
if (!completeSources.count(serverIDs[k])) {
found = false;
break;
}
}
if (found) {
bestOption = teamList[j];
req.reply.send(std::make_pair(bestOption, foundSrc));
auto healthyTeam = self->findTeamFromServers(req.completeSources, /* wantHealthy=*/false);
if (healthyTeam.present()) {
req.reply.send(std::make_pair(healthyTeam, foundSrc));
return Void();
}
}
}
}
// if (!bestOption.present()) {
// TraceEvent("GetTeamRequest").detail("Request", req.getDesc());
// self->traceAllInfo(true);
// }
req.reply.send(std::make_pair(bestOption, foundSrc));
return Void();
} catch (Error& e) {
if (e.code() != error_code_actor_cancelled)
@ -1929,6 +1909,7 @@ public:
.detail("AddressesExcluded", excludedResults.size())
.detail("AddressesFailed", failedResults.size())
.detail("LocalitiesExcluded", excludedLocalityResults.size())
.detail("Primary", self->isPrimary())
.detail("LocalitiesFailed", failedLocalityResults.size());
self->restartRecruiting.trigger();
@ -3023,6 +3004,7 @@ public:
.detail("UnhealthyServers", self->unhealthyServers)
.detail("ServerCount", self->server_info.size())
.detail("StorageTeamSize", self->configuration.storageTeamSize)
.detail("ZeroHealthy", self->zeroOptimalTeams.get())
.detail("HighestPriority", highestPriority)
.trackLatest(self->primary ? "TotalDataInFlight"
: "TotalDataInFlightRemote"); // This trace event's trackLatest
@ -3400,6 +3382,31 @@ bool DDTeamCollection::teamContainsFailedServer(Reference<TCTeamInfo> team) cons
return false;
}
Optional<Reference<IDataDistributionTeam>> DDTeamCollection::findTeamFromServers(const std::vector<UID>& servers,
bool wantHealthy) {
const std::set<UID> completeSources(servers.begin(), servers.end());
for (const auto& server : servers) {
if (!server_info.count(server)) {
continue;
}
auto const& teamList = server_info[server]->getTeams();
for (const auto& team : teamList) {
bool found = true;
for (const UID& s : team->getServerIDs()) {
if (!completeSources.count(s)) {
found = false;
break;
}
}
if (found && (!wantHealthy || team->isHealthy())) {
return team;
}
}
}
return Optional<Reference<IDataDistributionTeam>>();
}
Future<Void> DDTeamCollection::logOnCompletion(Future<Void> signal) {
return DDTeamCollectionImpl::logOnCompletion(this, signal);
}

View File

@ -364,7 +364,9 @@ public:
wait(self->loadDatabaseConfiguration());
self->initDcInfo();
TraceEvent("DDInitGotConfiguration", self->ddId).detail("Conf", self->configuration.toString());
TraceEvent("DDInitGotConfiguration", self->ddId)
.setMaxFieldLength(-1)
.detail("Conf", self->configuration.toString());
wait(self->updateReplicaKeys());
TraceEvent("DDInitUpdatedReplicaKeys", self->ddId).log();
@ -461,7 +463,7 @@ public:
teams.push_back(ShardsAffectedByTeamFailure::Team(iShard.remoteSrc, false));
}
if (traceShard) {
TraceEvent(SevDebug, "DDInitShard")
TraceEvent(SevDebug, "DDInitShard", self->ddId)
.detail("Keys", keys)
.detail("PrimarySrc", describe(iShard.primarySrc))
.detail("RemoteSrc", describe(iShard.remoteSrc))

View File

@ -802,13 +802,16 @@ ACTOR Future<Void> waitForShardReady(StorageServerInterface server,
try {
GetShardStateReply rep =
wait(server.getShardState.getReply(GetShardStateRequest(keys, mode), TaskPriority::MoveKeys));
TraceEvent("GetShardStateReadyDD").detail("RepVersion", rep.first).detail("MinVersion", rep.second).log();
TraceEvent("GetShardStateReadyDD", server.id())
.detail("RepVersion", rep.first)
.detail("MinVersion", rep.second)
.log();
if (rep.first >= minVersion) {
return Void();
}
wait(delayJittered(SERVER_KNOBS->SHARD_READY_DELAY, TaskPriority::MoveKeys));
} catch (Error& e) {
TraceEvent("GetShardStateReadyError").error(e).log();
TraceEvent("GetShardStateReadyError", server.id()).error(e).log();
if (e.code() != error_code_timed_out) {
if (e.code() != error_code_broken_promise)
throw e;
@ -1174,7 +1177,7 @@ ACTOR static Future<Void> finishMoveKeys(Database occ,
tssCount += tssReady[s].isReady() && !tssReady[s].isError();
TraceEvent readyServersEv(SevDebug, waitInterval.end(), relocationIntervalId);
readyServersEv.detail("ReadyServers", count);
readyServersEv.detail("ReadyServers", count).detail("Dests", dest.size());
if (tssReady.size()) {
readyServersEv.detail("ReadyTSS", tssCount);
}

View File

@ -23,7 +23,6 @@
#include <type_traits>
#include "fdbclient/FDBOptions.g.h"
#include "fdbclient/FDBTypes.h"
#include "fdbclient/SystemData.h"
#include "flow/ActorCollection.h"
#include "fdbrpc/simulator.h"
@ -715,7 +714,7 @@ ACTOR Future<Void> repairDeadDatacenter(Database cx,
Reference<AsyncVar<ServerDBInfo> const> dbInfo,
std::string context) {
if (g_network->isSimulated() && g_simulator->usableRegions > 1 && !g_simulator->quiesced) {
bool primaryDead = g_simulator->datacenterDead(g_simulator->primaryDcId);
state bool primaryDead = g_simulator->datacenterDead(g_simulator->primaryDcId);
bool remoteDead = g_simulator->datacenterDead(g_simulator->remoteDcId);
// FIXME: the primary and remote can both be considered dead because excludes are not handled properly by the
@ -731,6 +730,15 @@ ACTOR Future<Void> repairDeadDatacenter(Database cx,
.detail("RemoteDead", remoteDead)
.detail("PrimaryDead", primaryDead);
g_simulator->usableRegions = 1;
state std::vector<AddressExclusion> servers = g_simulator->getAllAddressesInDCToExclude(
primaryDead ? g_simulator->primaryDcId : g_simulator->remoteDcId);
wait(excludeServers(cx, servers, false));
TraceEvent(SevWarnAlways, "DisablingFearlessConfiguration")
.detail("Location", context)
.detail("Stage", "ServerExcluded")
.detail("Servers", describe(servers));
wait(success(ManagementAPI::changeConfig(
cx.getReference(),
(primaryDead ? g_simulator->disablePrimary : g_simulator->disableRemote) + " repopulate_anti_quorum=1",

View File

@ -10402,7 +10402,7 @@ TEST_CASE("Lredwood/correctness/btree") {
wait(verifyTask);
// Sometimes close and reopen before destructive sanity check
if (deterministicRandom()->coinflip()) {
if (!pagerMemoryOnly && deterministicRandom()->coinflip()) {
Future<Void> closedFuture = btree->onClosed();
btree->close();
wait(closedFuture);

View File

@ -312,6 +312,10 @@ protected:
// When configuration is changed, we may have machine teams with old storageTeamSize
Reference<TCMachineTeamInfo> findOneRandomMachineTeam(TCServerInfo const& chosenServer) const;
// Returns a server team from given "servers", empty team if not found.
// When "wantHealthy" is true, only return if the team is healthy.
Optional<Reference<IDataDistributionTeam>> findTeamFromServers(const std::vector<UID>& servers, bool wantHealthy);
Future<Void> logOnCompletion(Future<Void> signal);
void resetLocalitySet();

View File

@ -163,4 +163,45 @@ struct NetworkAddressList {
}
};
extern std::string formatIpPort(const IPAddress& ip, uint16_t port);
struct AddressExclusion {
IPAddress ip;
int port;
AddressExclusion() : ip(0), port(0) {}
explicit AddressExclusion(const IPAddress& ip) : ip(ip), port(0) {}
explicit AddressExclusion(const IPAddress& ip, int port) : ip(ip), port(port) {}
bool operator<(AddressExclusion const& r) const {
if (ip != r.ip)
return ip < r.ip;
return port < r.port;
}
bool operator==(AddressExclusion const& r) const { return ip == r.ip && port == r.port; }
bool isWholeMachine() const { return port == 0; }
bool isValid() const { return ip.isValid() || port != 0; }
bool excludes(NetworkAddress const& addr) const {
if (isWholeMachine())
return ip == addr.ip;
return ip == addr.ip && port == addr.port;
}
// This is for debugging and IS NOT to be used for serialization to persistant state
std::string toString() const {
if (!isWholeMachine())
return formatIpPort(ip, port);
return ip.toString();
}
static AddressExclusion parse(StringRef const&);
template <class Ar>
void serialize(Ar& ar) {
serializer(ar, ip, port);
}
};
#endif // FLOW_NETWORKADDRESS_H

View File

@ -12,6 +12,7 @@ encryptModes = ['domain_aware', 'cluster_aware']
bg_metadata_source = "tenant"
bg_key_tuple_truncate_offset = 1
enable_encryption = true
enable_configurable_encryption = true
[[test]]
testTitle = 'BlobGranuleCorrectness'

View File

@ -9,6 +9,7 @@ encryptModes = ['domain_aware', 'cluster_aware']
[[knobs]]
bg_metadata_source = "tenant"
enable_encryption = true
enable_configurable_encryption = true
[[test]]
testTitle = 'BlobGranuleCorrectness'