Merge branch 'main' of https://github.com/apple/foundationdb into fix/main/tenantList
This commit is contained in:
commit
a9c7632c83
|
@ -23,6 +23,7 @@
|
|||
#include "fdbclient/FDBOptions.g.h"
|
||||
#include "fdbclient/IClientApi.h"
|
||||
#include "fdbclient/Knobs.h"
|
||||
#include "fdbclient/Metacluster.h"
|
||||
#include "fdbclient/MetaclusterManagement.actor.h"
|
||||
#include "fdbclient/Schemas.h"
|
||||
|
||||
|
@ -30,6 +31,7 @@
|
|||
#include "flow/FastRef.h"
|
||||
#include "flow/ThreadHelper.actor.h"
|
||||
#include "flow/actorcompiler.h" // This must be the last #include.
|
||||
#include <string>
|
||||
|
||||
namespace fdb_cli {
|
||||
|
||||
|
@ -83,14 +85,23 @@ void printMetaclusterConfigureOptionsUsage() {
|
|||
|
||||
// metacluster create command
|
||||
ACTOR Future<bool> metaclusterCreateCommand(Reference<IDatabase> db, std::vector<StringRef> tokens) {
|
||||
if (tokens.size() != 3) {
|
||||
fmt::print("Usage: metacluster create_experimental <NAME>\n\n");
|
||||
if (tokens.size() != 4) {
|
||||
fmt::print("Usage: metacluster create_experimental <NAME> <TENANT_ID_PREFIX>\n\n");
|
||||
fmt::print("Configures the cluster to be a management cluster in a metacluster.\n");
|
||||
fmt::print("NAME is an identifier used to distinguish this metacluster from other metaclusters.\n");
|
||||
fmt::print("TENANT_ID_PREFIX is an integer in the range [0,32767] inclusive which is prepended to all tenant "
|
||||
"ids in the metacluster.\n");
|
||||
return false;
|
||||
}
|
||||
|
||||
Optional<std::string> errorStr = wait(MetaclusterAPI::createMetacluster(db, tokens[2]));
|
||||
int64_t tenantIdPrefix = std::stoi(tokens[3].toString());
|
||||
if (tenantIdPrefix < TenantAPI::TENANT_ID_PREFIX_MIN_VALUE ||
|
||||
tenantIdPrefix > TenantAPI::TENANT_ID_PREFIX_MAX_VALUE) {
|
||||
fmt::print("TENANT_ID_PREFIX must be in the range [0,32767] inclusive\n");
|
||||
return false;
|
||||
}
|
||||
|
||||
Optional<std::string> errorStr = wait(MetaclusterAPI::createMetacluster(db, tokens[2], tenantIdPrefix));
|
||||
if (errorStr.present()) {
|
||||
fmt::print("ERROR: {}.\n", errorStr.get());
|
||||
} else {
|
||||
|
@ -420,7 +431,7 @@ std::vector<const char*> metaclusterHintGenerator(std::vector<StringRef> const&
|
|||
if (tokens.size() == 1) {
|
||||
return { "<create_experimental|decommission|register|remove|configure|list|get|status>", "[ARGS]" };
|
||||
} else if (tokencmp(tokens[1], "create_experimental")) {
|
||||
return { "<NAME>" };
|
||||
return { "<NAME> <TENANT_ID_PREFIX>" };
|
||||
} else if (tokencmp(tokens[1], "decommission")) {
|
||||
return {};
|
||||
} else if (tokencmp(tokens[1], "register") && tokens.size() < 5) {
|
||||
|
|
|
@ -21,6 +21,7 @@
|
|||
#include "fdbcli/fdbcli.actor.h"
|
||||
#include "fdbclient/ManagementAPI.actor.h"
|
||||
#include "fdbclient/SystemData.h"
|
||||
#include "fdbclient/TagThrottle.actor.h"
|
||||
#include "flow/actorcompiler.h" // This must be the last include
|
||||
|
||||
namespace {
|
||||
|
@ -60,22 +61,25 @@ ACTOR Future<Void> getQuota(Reference<IDatabase> db, TransactionTag tag, QuotaTy
|
|||
loop {
|
||||
tr->setOption(FDBTransactionOptions::READ_SYSTEM_KEYS);
|
||||
try {
|
||||
state ThreadFuture<Optional<Value>> resultFuture =
|
||||
tr->get(quotaType == QuotaType::STORAGE ? storageQuotaKey(tag) : ThrottleApi::getTagQuotaKey(tag));
|
||||
Optional<Value> v = wait(safeThreadFutureToFuture(resultFuture));
|
||||
if (!v.present()) {
|
||||
fmt::print("<empty>\n");
|
||||
} else {
|
||||
if (quotaType == QuotaType::STORAGE) {
|
||||
int64_t storageQuota = BinaryReader::fromStringRef<int64_t>(v.get(), Unversioned());
|
||||
fmt::print("{}\n", storageQuota);
|
||||
return Void();
|
||||
if (quotaType == QuotaType::STORAGE) {
|
||||
Optional<int64_t> value = wait(TenantMetadata::storageQuota().get(tr, tag));
|
||||
if (value.present()) {
|
||||
fmt::print("{}\n", value.get());
|
||||
} else {
|
||||
fmt::print("<empty>\n");
|
||||
}
|
||||
auto const quota = ThrottleApi::TagQuotaValue::fromValue(v.get());
|
||||
if (quotaType == QuotaType::TOTAL) {
|
||||
fmt::print("{}\n", quota.totalQuota);
|
||||
} else {
|
||||
state ThreadFuture<Optional<Value>> resultFuture = tr->get(ThrottleApi::getTagQuotaKey(tag));
|
||||
Optional<Value> v = wait(safeThreadFutureToFuture(resultFuture));
|
||||
Optional<ThrottleApi::TagQuotaValue> quota =
|
||||
v.map([](Value val) { return ThrottleApi::TagQuotaValue::fromValue(val); });
|
||||
|
||||
if (!quota.present()) {
|
||||
fmt::print("<empty>\n");
|
||||
} else if (quotaType == QuotaType::TOTAL) {
|
||||
fmt::print("{}\n", quota.get().totalQuota);
|
||||
} else if (quotaType == QuotaType::RESERVED) {
|
||||
fmt::print("{}\n", quota.reservedQuota);
|
||||
fmt::print("{}\n", quota.get().reservedQuota);
|
||||
}
|
||||
}
|
||||
return Void();
|
||||
|
@ -91,7 +95,7 @@ ACTOR Future<Void> setQuota(Reference<IDatabase> db, TransactionTag tag, QuotaTy
|
|||
tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
|
||||
try {
|
||||
if (quotaType == QuotaType::STORAGE) {
|
||||
tr->set(storageQuotaKey(tag), BinaryWriter::toValue<int64_t>(value, Unversioned()));
|
||||
TenantMetadata::storageQuota().set(tr, tag, value);
|
||||
} else {
|
||||
state ThreadFuture<Optional<Value>> resultFuture = tr->get(ThrottleApi::getTagQuotaKey(tag));
|
||||
Optional<Value> v = wait(safeThreadFutureToFuture(resultFuture));
|
||||
|
@ -129,7 +133,7 @@ ACTOR Future<Void> clearQuota(Reference<IDatabase> db, TransactionTag tag) {
|
|||
loop {
|
||||
tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
|
||||
try {
|
||||
tr->clear(storageQuotaKey(tag));
|
||||
TenantMetadata::storageQuota().erase(tr, tag);
|
||||
tr->clear(ThrottleApi::getTagQuotaKey(tag));
|
||||
wait(safeThreadFutureToFuture(tr->commit()));
|
||||
fmt::print("Successfully cleared quota.\n");
|
||||
|
|
|
@ -1037,7 +1037,7 @@ void printStatus(StatusObjectReader statusObj,
|
|||
|
||||
try {
|
||||
double tx = -1, rx = -1, mCPUUtil = -1;
|
||||
int64_t processTotalSize;
|
||||
int64_t processRSS;
|
||||
|
||||
// Get the machine for this process
|
||||
// StatusObjectReader mach = machinesMap[procObj["machine_id"].get_str()];
|
||||
|
@ -1056,7 +1056,7 @@ void printStatus(StatusObjectReader statusObj,
|
|||
}
|
||||
}
|
||||
|
||||
procObj.get("memory.used_bytes", processTotalSize);
|
||||
procObj.get("memory.rss_bytes", processRSS);
|
||||
|
||||
StatusObjectReader procCPUObj;
|
||||
procObj.get("cpu", procCPUObj);
|
||||
|
@ -1074,9 +1074,7 @@ void printStatus(StatusObjectReader statusObj,
|
|||
if (procObj.get("disk.busy", diskBusy))
|
||||
line += format("%3.0f%% disk IO;", 100.0 * diskBusy);
|
||||
|
||||
line += processTotalSize != -1
|
||||
? format("%4.1f GB", processTotalSize / (1024.0 * 1024 * 1024))
|
||||
: "";
|
||||
line += processRSS != -1 ? format("%4.1f GB", processRSS / (1024.0 * 1024 * 1024)) : "";
|
||||
|
||||
double availableBytes;
|
||||
if (procObj.get("memory.available_bytes", availableBytes))
|
||||
|
|
|
@ -2,6 +2,7 @@
|
|||
import argparse
|
||||
import os
|
||||
import subprocess
|
||||
import random
|
||||
|
||||
from argparse import RawDescriptionHelpFormatter
|
||||
|
||||
|
@ -34,8 +35,8 @@ def get_cluster_connection_str(cluster_file_path):
|
|||
return conn_str
|
||||
|
||||
|
||||
def metacluster_create(cluster_file, name):
    """Run the fdbcli `metacluster create_experimental` command for `name`.

    The command is issued against the cluster identified by `cluster_file`;
    whatever `run_fdbcli_command` returns is passed back to the caller.
    """
    command = "metacluster create_experimental"
    result = run_fdbcli_command(cluster_file, command, name)
    return result
|
||||
def metacluster_create(cluster_file, name, tenant_id_prefix):
    """Run the fdbcli `metacluster create_experimental` command.

    `name` and `tenant_id_prefix` are forwarded as the command arguments; the
    prefix is converted to its string form first. Whatever `run_fdbcli_command`
    returns is passed back to the caller.
    """
    command = "metacluster create_experimental"
    prefix_arg = str(tenant_id_prefix)
    return run_fdbcli_command(cluster_file, command, name, prefix_arg)
|
||||
|
||||
|
||||
def metacluster_register(management_cluster_file, data_cluster_file, name):
|
||||
|
@ -75,7 +76,8 @@ if __name__ == "__main__":
|
|||
names = ['meta_mgmt']
|
||||
names.extend(['data{}'.format(i) for i in range(1, num_clusters)])
|
||||
|
||||
metacluster_create(cluster_files[0], names[0])
|
||||
tenant_id_prefix = random.randint(0, 32767)
|
||||
metacluster_create(cluster_files[0], names[0], tenant_id_prefix)
|
||||
for (cf, name) in zip(cluster_files[1:], names[1:]):
|
||||
output = metacluster_register(cluster_files[0], cf, name)
|
||||
|
||||
|
|
|
@ -39,6 +39,7 @@
|
|||
#include "flow/serialize.h"
|
||||
#include "flow/Trace.h"
|
||||
#include "flow/UnitTest.h"
|
||||
#include "flow/xxhash.h"
|
||||
|
||||
#include <chrono>
|
||||
#include <cstring>
|
||||
|
@ -90,6 +91,95 @@ uint32_t BlobCipherEncryptHeaderRef::getHeaderSize(const int flagVersion,
|
|||
return total;
|
||||
}
|
||||
|
||||
void BlobCipherEncryptHeaderRef::validateEncryptionHeaderDetails(const BlobCipherDetails& textCipherDetails,
|
||||
const BlobCipherDetails& headerCipherDetails,
|
||||
const StringRef& ivRef) const {
|
||||
ASSERT(CLIENT_KNOBS->ENABLE_CONFIGURABLE_ENCRYPTION);
|
||||
|
||||
if (flagsVersion > CLIENT_KNOBS->ENCRYPT_HEADER_FLAGS_VERSION) {
|
||||
TraceEvent("ValidateEncryptHeaderUnsupportedFlagVersion")
|
||||
.detail("MaxSupportedVersion", CLIENT_KNOBS->ENCRYPT_HEADER_FLAGS_VERSION)
|
||||
.detail("Version", flagsVersion);
|
||||
throw not_implemented();
|
||||
}
|
||||
|
||||
BlobCipherEncryptHeaderFlagsV1 flags = std::get<BlobCipherEncryptHeaderFlagsV1>(this->flags);
|
||||
BlobCipherDetails persistedTextCipherDetails;
|
||||
BlobCipherDetails persistedHeaderCipherDetails;
|
||||
uint8_t* persistedIV = nullptr;
|
||||
|
||||
if (flags.authTokenMode == ENCRYPT_HEADER_AUTH_TOKEN_MODE_NONE) {
|
||||
if (algoHeaderVersion > CLIENT_KNOBS->ENCRYPT_HEADER_AES_CTR_NO_AUTH_VERSION) {
|
||||
TraceEvent("ValidateEncryptHeaderUnsupportedAlgoHeaderVersion")
|
||||
.detail("AuthMode", "No-Auth")
|
||||
.detail("MaxSupportedVersion", CLIENT_KNOBS->ENCRYPT_HEADER_AES_CTR_NO_AUTH_VERSION)
|
||||
.detail("Version", algoHeaderVersion);
|
||||
throw not_implemented();
|
||||
}
|
||||
persistedTextCipherDetails = std::get<AesCtrNoAuthV1>(this->algoHeader).cipherTextDetails;
|
||||
persistedIV = (uint8_t*)(&std::get<AesCtrNoAuthV1>(this->algoHeader).iv[0]);
|
||||
} else {
|
||||
if (flags.authTokenAlgo == ENCRYPT_HEADER_AUTH_TOKEN_ALGO_HMAC_SHA) {
|
||||
if (algoHeaderVersion > CLIENT_KNOBS->ENCRYPT_HEADER_AES_CTR_NO_AUTH_VERSION) {
|
||||
TraceEvent("ValidateEncryptHeaderUnsupportedAlgoHeaderVersion")
|
||||
.detail("AuthMode", "Hmac-Sha")
|
||||
.detail("MaxSupportedVersion", CLIENT_KNOBS->ENCRYPT_HEADER_AES_CTR_HMAC_SHA_AUTH_VERSION)
|
||||
.detail("Version", algoHeaderVersion);
|
||||
}
|
||||
persistedTextCipherDetails =
|
||||
std::get<AesCtrWithAuthV1<AUTH_TOKEN_HMAC_SHA_SIZE>>(this->algoHeader).cipherTextDetails;
|
||||
persistedHeaderCipherDetails =
|
||||
std::get<AesCtrWithAuthV1<AUTH_TOKEN_HMAC_SHA_SIZE>>(this->algoHeader).cipherHeaderDetails;
|
||||
persistedIV = (uint8_t*)(&std::get<AesCtrWithAuthV1<AUTH_TOKEN_HMAC_SHA_SIZE>>(this->algoHeader).iv[0]);
|
||||
} else if (flags.authTokenAlgo == ENCRYPT_HEADER_AUTH_TOKEN_ALGO_AES_CMAC) {
|
||||
if (algoHeaderVersion > CLIENT_KNOBS->ENCRYPT_HEADER_AES_CTR_AES_CMAC_AUTH_VERSION) {
|
||||
TraceEvent("ValidateEncryptHeaderUnsupportedAlgoHeaderVersion")
|
||||
.detail("AuthMode", "Aes-Cmac")
|
||||
.detail("MaxSupportedVersion", CLIENT_KNOBS->ENCRYPT_HEADER_AES_CTR_AES_CMAC_AUTH_VERSION)
|
||||
.detail("Version", algoHeaderVersion);
|
||||
}
|
||||
persistedTextCipherDetails =
|
||||
std::get<AesCtrWithAuthV1<AUTH_TOKEN_AES_CMAC_SIZE>>(this->algoHeader).cipherTextDetails;
|
||||
persistedHeaderCipherDetails =
|
||||
std::get<AesCtrWithAuthV1<AUTH_TOKEN_AES_CMAC_SIZE>>(this->algoHeader).cipherHeaderDetails;
|
||||
persistedIV = (uint8_t*)(&std::get<AesCtrWithAuthV1<AUTH_TOKEN_AES_CMAC_SIZE>>(this->algoHeader).iv[0]);
|
||||
} else {
|
||||
throw not_implemented();
|
||||
}
|
||||
}
|
||||
|
||||
// Validate encryption header 'cipherHeader' details sanity
|
||||
if (flags.authTokenMode != ENCRYPT_HEADER_AUTH_TOKEN_MODE_NONE &&
|
||||
headerCipherDetails != persistedHeaderCipherDetails) {
|
||||
TraceEvent(SevError, "ValidateEncryptHeaderMismatch")
|
||||
.detail("HeaderDomainId", headerCipherDetails.encryptDomainId)
|
||||
.detail("PersistedHeaderDomainId", persistedHeaderCipherDetails.encryptDomainId)
|
||||
.detail("HeaderBaseCipherId", headerCipherDetails.baseCipherId)
|
||||
.detail("ExpectedHeaderBaseCipherId", persistedHeaderCipherDetails.baseCipherId)
|
||||
.detail("HeaderSalt", headerCipherDetails.salt)
|
||||
.detail("ExpectedHeaderSalt", persistedHeaderCipherDetails.salt);
|
||||
throw encrypt_header_metadata_mismatch();
|
||||
}
|
||||
// Validate encryption header 'cipherText' details sanity
|
||||
if (textCipherDetails != persistedTextCipherDetails) {
|
||||
TraceEvent(SevError, "ValidateEncryptHeaderMismatch")
|
||||
.detail("TextDomainId", textCipherDetails.encryptDomainId)
|
||||
.detail("PersistedTextDomainId", persistedTextCipherDetails.encryptDomainId)
|
||||
.detail("TextBaseCipherId", textCipherDetails.baseCipherId)
|
||||
.detail("PersistedTextBaseCipherId", persistedTextCipherDetails.encryptDomainId)
|
||||
.detail("TextSalt", textCipherDetails.salt)
|
||||
.detail("PersistedTextSalt", persistedTextCipherDetails.salt);
|
||||
throw encrypt_header_metadata_mismatch();
|
||||
}
|
||||
// Validate 'Initialization Vector' sanity
|
||||
if (memcmp(ivRef.begin(), persistedIV, AES_256_IV_LENGTH) != 0) {
|
||||
TraceEvent(SevError, "EncryptionHeader_IVMismatch")
|
||||
.detail("IVChecksum", XXH3_64bits(ivRef.begin(), ivRef.size()))
|
||||
.detail("ExpectedIVChecksum", XXH3_64bits(persistedIV, AES_256_IV_LENGTH));
|
||||
throw encrypt_header_metadata_mismatch();
|
||||
}
|
||||
}
|
||||
|
||||
// BlobCipherMetrics methods
|
||||
|
||||
BlobCipherMetrics::CounterSet::CounterSet(CounterCollection& cc, std::string name)
|
||||
|
@ -2185,6 +2275,9 @@ void testKeyCacheCleanup(const int minDomainId, const int maxDomainId) {
|
|||
|
||||
TEST_CASE("/blobCipher") {
|
||||
DomainKeyMap domainKeyMap;
|
||||
auto& g_knobs = IKnobCollection::getMutableGlobalKnobCollection();
|
||||
g_knobs.setKnob("enable_configurable_encryption", KnobValueRef::create(bool{ true }));
|
||||
|
||||
const EncryptCipherDomainId minDomainId = 1;
|
||||
const EncryptCipherDomainId maxDomainId = deterministicRandom()->randomInt(minDomainId, minDomainId + 10) + 5;
|
||||
const EncryptCipherBaseKeyId minBaseCipherKeyId = 100;
|
||||
|
|
|
@ -253,6 +253,19 @@ void validateEncryptionHeaderDetails(const BlobGranuleFileEncryptionKeys& eKeys,
|
|||
throw encrypt_header_metadata_mismatch();
|
||||
}
|
||||
}
|
||||
|
||||
void validateEncryptionHeaderDetails(const BlobGranuleFileEncryptionKeys& eKeys,
|
||||
const BlobCipherEncryptHeaderRef& headerRef,
|
||||
const StringRef& ivRef) {
|
||||
headerRef.validateEncryptionHeaderDetails(BlobCipherDetails(eKeys.textCipherKey->getDomainId(),
|
||||
eKeys.textCipherKey->getBaseCipherId(),
|
||||
eKeys.textCipherKey->getSalt()),
|
||||
BlobCipherDetails(eKeys.headerCipherKey->getDomainId(),
|
||||
eKeys.headerCipherKey->getBaseCipherId(),
|
||||
eKeys.headerCipherKey->getSalt()),
|
||||
ivRef);
|
||||
}
|
||||
|
||||
} // namespace
|
||||
|
||||
struct IndexBlock {
|
||||
|
@ -287,6 +300,7 @@ struct IndexBlockRef {
|
|||
TraceEvent(SevDebug, "IndexBlockEncrypt_Before").detail("Chksum", chksum);
|
||||
}
|
||||
|
||||
Value serializedBuff = ObjectWriter::toValue(block, IncludeVersion(ProtocolVersion::withBlobGranuleFile()));
|
||||
EncryptBlobCipherAes265Ctr encryptor(
|
||||
eKeys.textCipherKey,
|
||||
eKeys.headerCipherKey,
|
||||
|
@ -294,11 +308,20 @@ struct IndexBlockRef {
|
|||
AES_256_IV_LENGTH,
|
||||
getEncryptAuthTokenMode(EncryptAuthTokenMode::ENCRYPT_HEADER_AUTH_TOKEN_MODE_SINGLE),
|
||||
BlobCipherMetrics::BLOB_GRANULE);
|
||||
Value serializedBuff = ObjectWriter::toValue(block, IncludeVersion(ProtocolVersion::withBlobGranuleFile()));
|
||||
BlobCipherEncryptHeader header;
|
||||
buffer = encryptor.encrypt(serializedBuff.contents().begin(), serializedBuff.contents().size(), &header, arena)
|
||||
->toStringRef();
|
||||
encryptHeaderRef = BlobCipherEncryptHeader::toStringRef(header, arena);
|
||||
if (CLIENT_KNOBS->ENABLE_CONFIGURABLE_ENCRYPTION) {
|
||||
BlobCipherEncryptHeaderRef headerRef;
|
||||
buffer = encryptor.encrypt(
|
||||
serializedBuff.contents().begin(), serializedBuff.contents().size(), &headerRef, arena);
|
||||
Standalone<StringRef> serialized = BlobCipherEncryptHeaderRef::toStringRef(headerRef);
|
||||
arena.dependsOn(serialized.arena());
|
||||
encryptHeaderRef = serialized;
|
||||
} else {
|
||||
BlobCipherEncryptHeader header;
|
||||
buffer =
|
||||
encryptor.encrypt(serializedBuff.contents().begin(), serializedBuff.contents().size(), &header, arena)
|
||||
->toStringRef();
|
||||
encryptHeaderRef = BlobCipherEncryptHeader::toStringRef(header, arena);
|
||||
}
|
||||
|
||||
if (BG_ENCRYPT_COMPRESS_DEBUG) {
|
||||
XXH64_hash_t chksum = XXH3_64bits(buffer.begin(), buffer.size());
|
||||
|
@ -316,15 +339,25 @@ struct IndexBlockRef {
|
|||
XXH64_hash_t chksum = XXH3_64bits(idxRef.buffer.begin(), idxRef.buffer.size());
|
||||
TraceEvent(SevDebug, "IndexBlockEncrypt_Before").detail("Chksum", chksum);
|
||||
}
|
||||
|
||||
BlobCipherEncryptHeader header = BlobCipherEncryptHeader::fromStringRef(idxRef.encryptHeaderRef.get());
|
||||
|
||||
validateEncryptionHeaderDetails(eKeys, header, cipherKeysCtx.ivRef);
|
||||
|
||||
DecryptBlobCipherAes256Ctr decryptor(
|
||||
eKeys.textCipherKey, eKeys.headerCipherKey, cipherKeysCtx.ivRef.begin(), BlobCipherMetrics::BLOB_GRANULE);
|
||||
StringRef decrypted =
|
||||
decryptor.decrypt(idxRef.buffer.begin(), idxRef.buffer.size(), header, arena)->toStringRef();
|
||||
StringRef decrypted;
|
||||
if (CLIENT_KNOBS->ENABLE_CONFIGURABLE_ENCRYPTION) {
|
||||
BlobCipherEncryptHeaderRef headerRef =
|
||||
BlobCipherEncryptHeaderRef::fromStringRef(idxRef.encryptHeaderRef.get());
|
||||
validateEncryptionHeaderDetails(eKeys, headerRef, cipherKeysCtx.ivRef);
|
||||
DecryptBlobCipherAes256Ctr decryptor(eKeys.textCipherKey,
|
||||
eKeys.headerCipherKey,
|
||||
cipherKeysCtx.ivRef.begin(),
|
||||
BlobCipherMetrics::BLOB_GRANULE);
|
||||
decrypted = decryptor.decrypt(idxRef.buffer.begin(), idxRef.buffer.size(), headerRef, arena);
|
||||
} else {
|
||||
BlobCipherEncryptHeader header = BlobCipherEncryptHeader::fromStringRef(idxRef.encryptHeaderRef.get());
|
||||
validateEncryptionHeaderDetails(eKeys, header, cipherKeysCtx.ivRef);
|
||||
DecryptBlobCipherAes256Ctr decryptor(eKeys.textCipherKey,
|
||||
eKeys.headerCipherKey,
|
||||
cipherKeysCtx.ivRef.begin(),
|
||||
BlobCipherMetrics::BLOB_GRANULE);
|
||||
decrypted = decryptor.decrypt(idxRef.buffer.begin(), idxRef.buffer.size(), header, arena)->toStringRef();
|
||||
}
|
||||
|
||||
if (BG_ENCRYPT_COMPRESS_DEBUG) {
|
||||
XXH64_hash_t chksum = XXH3_64bits(decrypted.begin(), decrypted.size());
|
||||
|
@ -418,10 +451,18 @@ struct IndexBlobGranuleFileChunkRef {
|
|||
AES_256_IV_LENGTH,
|
||||
getEncryptAuthTokenMode(EncryptAuthTokenMode::ENCRYPT_HEADER_AUTH_TOKEN_MODE_SINGLE),
|
||||
BlobCipherMetrics::BLOB_GRANULE);
|
||||
BlobCipherEncryptHeader header;
|
||||
chunkRef.buffer =
|
||||
encryptor.encrypt(chunkRef.buffer.begin(), chunkRef.buffer.size(), &header, arena)->toStringRef();
|
||||
chunkRef.encryptHeaderRef = BlobCipherEncryptHeader::toStringRef(header, arena);
|
||||
if (CLIENT_KNOBS->ENABLE_CONFIGURABLE_ENCRYPTION) {
|
||||
BlobCipherEncryptHeaderRef headerRef;
|
||||
chunkRef.buffer = encryptor.encrypt(chunkRef.buffer.begin(), chunkRef.buffer.size(), &headerRef, arena);
|
||||
Standalone<StringRef> serialized = BlobCipherEncryptHeaderRef::toStringRef(headerRef);
|
||||
arena.dependsOn(serialized.arena());
|
||||
chunkRef.encryptHeaderRef = serialized;
|
||||
} else {
|
||||
BlobCipherEncryptHeader header;
|
||||
chunkRef.buffer =
|
||||
encryptor.encrypt(chunkRef.buffer.begin(), chunkRef.buffer.size(), &header, arena)->toStringRef();
|
||||
chunkRef.encryptHeaderRef = BlobCipherEncryptHeader::toStringRef(header, arena);
|
||||
}
|
||||
|
||||
if (BG_ENCRYPT_COMPRESS_DEBUG) {
|
||||
XXH64_hash_t chksum = XXH3_64bits(chunkRef.buffer.begin(), chunkRef.buffer.size());
|
||||
|
@ -442,14 +483,26 @@ struct IndexBlobGranuleFileChunkRef {
|
|||
TraceEvent(SevDebug, "BlobChunkDecrypt_Before").detail("Chksum", chksum);
|
||||
}
|
||||
|
||||
BlobCipherEncryptHeader header = BlobCipherEncryptHeader::fromStringRef(chunkRef.encryptHeaderRef.get());
|
||||
|
||||
validateEncryptionHeaderDetails(eKeys, header, cipherKeysCtx.ivRef);
|
||||
|
||||
DecryptBlobCipherAes256Ctr decryptor(
|
||||
eKeys.textCipherKey, eKeys.headerCipherKey, cipherKeysCtx.ivRef.begin(), BlobCipherMetrics::BLOB_GRANULE);
|
||||
StringRef decrypted =
|
||||
decryptor.decrypt(chunkRef.buffer.begin(), chunkRef.buffer.size(), header, arena)->toStringRef();
|
||||
StringRef decrypted;
|
||||
if (CLIENT_KNOBS->ENABLE_CONFIGURABLE_ENCRYPTION) {
|
||||
BlobCipherEncryptHeaderRef headerRef =
|
||||
BlobCipherEncryptHeaderRef::fromStringRef(chunkRef.encryptHeaderRef.get());
|
||||
validateEncryptionHeaderDetails(eKeys, headerRef, cipherKeysCtx.ivRef);
|
||||
DecryptBlobCipherAes256Ctr decryptor(eKeys.textCipherKey,
|
||||
eKeys.headerCipherKey,
|
||||
cipherKeysCtx.ivRef.begin(),
|
||||
BlobCipherMetrics::BLOB_GRANULE);
|
||||
decrypted = decryptor.decrypt(chunkRef.buffer.begin(), chunkRef.buffer.size(), headerRef, arena);
|
||||
} else {
|
||||
BlobCipherEncryptHeader header = BlobCipherEncryptHeader::fromStringRef(chunkRef.encryptHeaderRef.get());
|
||||
validateEncryptionHeaderDetails(eKeys, header, cipherKeysCtx.ivRef);
|
||||
DecryptBlobCipherAes256Ctr decryptor(eKeys.textCipherKey,
|
||||
eKeys.headerCipherKey,
|
||||
cipherKeysCtx.ivRef.begin(),
|
||||
BlobCipherMetrics::BLOB_GRANULE);
|
||||
decrypted =
|
||||
decryptor.decrypt(chunkRef.buffer.begin(), chunkRef.buffer.size(), header, arena)->toStringRef();
|
||||
}
|
||||
|
||||
if (BG_ENCRYPT_COMPRESS_DEBUG) {
|
||||
XXH64_hash_t chksum = XXH3_64bits(decrypted.begin(), decrypted.size());
|
||||
|
|
|
@ -2745,27 +2745,6 @@ bool schemaMatch(json_spirit::mValue const& schemaValue,
|
|||
}
|
||||
}
|
||||
|
||||
// Stores `quota` as the storage quota of `tenantGroupName` inside transaction `tr`.
// The value is written as an unversioned binary int64_t under storageQuotaKey(tenantGroupName).
// This function does not commit the transaction.
void setStorageQuota(Transaction& tr, StringRef tenantGroupName, int64_t quota) {
	tr.setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
	const Value encodedQuota = BinaryWriter::toValue<int64_t>(quota, Unversioned());
	tr.set(storageQuotaKey(tenantGroupName), encodedQuota);
}
|
||||
|
||||
// Removes the storage quota entry of `tenantGroupName` inside transaction `tr`.
// This function does not commit the transaction.
void clearStorageQuota(Transaction& tr, StringRef tenantGroupName) {
	tr.setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
	const Key quotaKey = storageQuotaKey(tenantGroupName);
	tr.clear(quotaKey);
}
|
||||
|
||||
// Reads the storage quota of `tenantGroupName` through transaction `tr`.
// Returns an empty Optional when no value is stored under storageQuotaKey(tenantGroupName); otherwise the
// stored bytes are decoded as an unversioned binary int64_t.
ACTOR Future<Optional<int64_t>> getStorageQuota(Transaction* tr, StringRef tenantGroupName) {
	tr->setOption(FDBTransactionOptions::READ_SYSTEM_KEYS);
	state Optional<Value> storedQuota = wait(tr->get(storageQuotaKey(tenantGroupName)));
	if (storedQuota.present()) {
		return BinaryReader::fromStringRef<int64_t>(storedQuota.get(), Unversioned());
	}
	return Optional<int64_t>();
}
|
||||
|
||||
std::string ManagementAPI::generateErrorMessage(const CoordinatorsResult& res) {
|
||||
// Note: the error message here should not be changed if possible
|
||||
// If you do change the message here,
|
||||
|
|
|
@ -1743,13 +1743,6 @@ Standalone<BlobRestoreArg> decodeBlobRestoreArg(ValueRef const& value) {
|
|||
|
||||
const Key blobManifestVersionKey = "\xff\x02/blobManifestVersion"_sr;
|
||||
|
||||
const KeyRangeRef storageQuotaKeys("\xff/storageQuota/"_sr, "\xff/storageQuota0"_sr);
|
||||
const KeyRef storageQuotaPrefix = storageQuotaKeys.begin;
|
||||
|
||||
// Builds the key under which the storage quota of `tenantGroupName` is kept: the tenant group name with
// storageQuotaPrefix prepended.
Key storageQuotaKey(StringRef tenantGroupName) {
	Key quotaKey = tenantGroupName.withPrefix(storageQuotaPrefix);
	return quotaKey;
}
|
||||
|
||||
const KeyRangeRef idempotencyIdKeys("\xff\x02/idmp/"_sr, "\xff\x02/idmp0"_sr);
|
||||
const KeyRef idempotencyIdsExpiredVersion("\xff\x02/idmpExpiredVersion"_sr);
|
||||
|
||||
|
|
|
@ -196,6 +196,11 @@ Key TenantMetadata::tenantMapPrivatePrefix() {
|
|||
return _prefix;
|
||||
}
|
||||
|
||||
// Accessor for the key-backed int64_t property holding the tenant id prefix. The property is keyed by the
// tenant metadata subspace with the "idPrefix" suffix and is constructed once, on first call.
KeyBackedProperty<int64_t>& TenantMetadata::tenantIdPrefix() {
	static KeyBackedProperty<int64_t> idPrefixProperty(
	    TenantMetadata::instance().subspace.withSuffix("idPrefix"_sr));
	return idPrefixProperty;
}
|
||||
|
||||
TEST_CASE("/fdbclient/libb64/base64decoder") {
|
||||
Standalone<StringRef> buf = makeString(100);
|
||||
for (int i = 0; i < 1000; ++i) {
|
||||
|
|
|
@ -24,6 +24,7 @@
|
|||
#include "fdbclient/SystemData.h"
|
||||
#include "fdbclient/TenantManagement.actor.h"
|
||||
#include "fdbclient/Tuple.h"
|
||||
#include "flow/Trace.h"
|
||||
#include "flow/actorcompiler.h" // has to be last include
|
||||
|
||||
namespace TenantAPI {
|
||||
|
@ -66,4 +67,30 @@ int64_t extractTenantIdFromKeyRef(StringRef s) {
|
|||
return TenantAPI::prefixToId(prefix, EnforceValidTenantId::False);
|
||||
}
|
||||
|
||||
// validates whether the lastTenantId and the nextTenantId share the same 2 byte prefix
|
||||
bool nextTenantIdPrefixMatches(int64_t lastTenantId, int64_t nextTenantId) {
|
||||
if (getTenantIdPrefix(nextTenantId) != getTenantIdPrefix(lastTenantId)) {
|
||||
TraceEvent(g_network->isSimulated() ? SevWarnAlways : SevError, "TenantIdPrefixMismatch")
|
||||
.detail("CurrentTenantId", lastTenantId)
|
||||
.detail("NewTenantId", nextTenantId)
|
||||
.detail("CurrentTenantIdPrefix", getTenantIdPrefix(lastTenantId))
|
||||
.detail("NewTenantIdPrefix", getTenantIdPrefix(nextTenantId));
|
||||
return false;
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
// returns the maximum allowable tenant id in which the 2 byte prefix is not overriden
|
||||
int64_t getMaxAllowableTenantId(int64_t curTenantId) {
|
||||
// The maximum tenant id allowed is 1 for the first 48 bits (6 bytes) with the first 16 bits (2 bytes) being the
|
||||
// tenant prefix
|
||||
int64_t maxTenantId = curTenantId | 0xFFFFFFFFFFFFLL;
|
||||
ASSERT(maxTenantId > 0);
|
||||
return maxTenantId;
|
||||
}
|
||||
|
||||
// Extracts the tenant id prefix: the bits of tenantId above the low 48 bits.
int64_t getTenantIdPrefix(int64_t tenantId) {
	const int prefixShiftBits = 48;
	return tenantId >> prefixShiftBits;
}
|
||||
|
||||
} // namespace TenantAPI
|
||||
|
|
|
@ -36,6 +36,7 @@
|
|||
#include "flow/Platform.h"
|
||||
#include "flow/ProtocolVersion.h"
|
||||
#include "flow/serialize.h"
|
||||
#include "flow/Trace.h"
|
||||
|
||||
#include <boost/functional/hash.hpp>
|
||||
#include <cinttypes>
|
||||
|
@ -171,6 +172,7 @@ struct BlobCipherDetails {
|
|||
bool operator==(const BlobCipherDetails& o) const {
|
||||
return encryptDomainId == o.encryptDomainId && baseCipherId == o.baseCipherId && salt == o.salt;
|
||||
}
|
||||
bool operator!=(const BlobCipherDetails& o) const { return !(*this == o); }
|
||||
|
||||
template <class Ar>
|
||||
void serialize(Ar& ar) {
|
||||
|
@ -380,6 +382,7 @@ struct BlobCipherEncryptHeaderRef {
|
|||
serializer(ar, flagsVersion, algoHeaderVersion);
|
||||
if (ar.isSerializing) {
|
||||
if (flagsVersion != 1) {
|
||||
TraceEvent(SevWarn, "BlobCipherEncryptHeaderUnsupportedFlagVersion").detail("Version", flagsVersion);
|
||||
throw not_implemented();
|
||||
}
|
||||
|
||||
|
@ -389,7 +392,13 @@ struct BlobCipherEncryptHeaderRef {
|
|||
authAlgo = (EncryptAuthTokenAlgo)f.authTokenAlgo;
|
||||
serializer(ar, f);
|
||||
|
||||
if (encryptMode != ENCRYPT_CIPHER_MODE_AES_256_CTR || algoHeaderVersion != 1) {
|
||||
if (encryptMode != ENCRYPT_CIPHER_MODE_AES_256_CTR) {
|
||||
TraceEvent(SevWarn, "BlobCipherEncryptHeaderUnsupportedEncryptMode").detail("Mode", encryptMode);
|
||||
throw not_implemented();
|
||||
}
|
||||
if (algoHeaderVersion != 1) {
|
||||
TraceEvent(SevWarn, "BlobCipherEncryptHeaderUnsupportedAlgoHeaderVersion")
|
||||
.detail("Version", algoHeaderVersion);
|
||||
throw not_implemented();
|
||||
}
|
||||
|
||||
|
@ -411,6 +420,7 @@ struct BlobCipherEncryptHeaderRef {
|
|||
}
|
||||
} else if (ar.isDeserializing) {
|
||||
if (flagsVersion != 1) {
|
||||
TraceEvent(SevWarn, "BlobCipherEncryptHeaderUnsupportedFlagVersion").detail("Version", flagsVersion);
|
||||
throw not_implemented();
|
||||
}
|
||||
BlobCipherEncryptHeaderFlagsV1 f;
|
||||
|
@ -420,7 +430,13 @@ struct BlobCipherEncryptHeaderRef {
|
|||
authMode = (EncryptAuthTokenMode)f.authTokenMode;
|
||||
authAlgo = (EncryptAuthTokenAlgo)f.authTokenAlgo;
|
||||
|
||||
if (encryptMode != ENCRYPT_CIPHER_MODE_AES_256_CTR || algoHeaderVersion != 1) {
|
||||
if (encryptMode != ENCRYPT_CIPHER_MODE_AES_256_CTR) {
|
||||
TraceEvent(SevWarn, "BlobCipherEncryptHeaderUnsupportedEncryptMode").detail("Mode", encryptMode);
|
||||
throw not_implemented();
|
||||
}
|
||||
if (algoHeaderVersion != 1) {
|
||||
TraceEvent(SevWarn, "BlobCipherEncryptHeaderUnsupportedAlgoHeaderVersion")
|
||||
.detail("Version", algoHeaderVersion);
|
||||
throw not_implemented();
|
||||
}
|
||||
|
||||
|
@ -443,6 +459,10 @@ struct BlobCipherEncryptHeaderRef {
|
|||
}
|
||||
}
|
||||
}
|
||||
|
||||
void validateEncryptionHeaderDetails(const BlobCipherDetails& textCipherDetails,
|
||||
const BlobCipherDetails& headerCipherDetails,
|
||||
const StringRef& ivRef) const;
|
||||
};
|
||||
|
||||
#pragma pack(pop)
|
||||
|
|
|
@ -1136,45 +1136,6 @@ struct LogMessageVersion {
|
|||
}
|
||||
};
|
||||
|
||||
struct AddressExclusion {
|
||||
IPAddress ip;
|
||||
int port;
|
||||
|
||||
AddressExclusion() : ip(0), port(0) {}
|
||||
explicit AddressExclusion(const IPAddress& ip) : ip(ip), port(0) {}
|
||||
explicit AddressExclusion(const IPAddress& ip, int port) : ip(ip), port(port) {}
|
||||
|
||||
bool operator<(AddressExclusion const& r) const {
|
||||
if (ip != r.ip)
|
||||
return ip < r.ip;
|
||||
return port < r.port;
|
||||
}
|
||||
bool operator==(AddressExclusion const& r) const { return ip == r.ip && port == r.port; }
|
||||
|
||||
bool isWholeMachine() const { return port == 0; }
|
||||
bool isValid() const { return ip.isValid() || port != 0; }
|
||||
|
||||
bool excludes(NetworkAddress const& addr) const {
|
||||
if (isWholeMachine())
|
||||
return ip == addr.ip;
|
||||
return ip == addr.ip && port == addr.port;
|
||||
}
|
||||
|
||||
// This is for debugging and IS NOT to be used for serialization to persistant state
|
||||
std::string toString() const {
|
||||
if (!isWholeMachine())
|
||||
return formatIpPort(ip, port);
|
||||
return ip.toString();
|
||||
}
|
||||
|
||||
static AddressExclusion parse(StringRef const&);
|
||||
|
||||
template <class Ar>
|
||||
void serialize(Ar& ar) {
|
||||
serializer(ar, ip, port);
|
||||
}
|
||||
};
|
||||
|
||||
inline bool addressExcluded(std::set<AddressExclusion> const& exclusions, NetworkAddress const& addr) {
|
||||
return exclusions.count(AddressExclusion(addr.ip, addr.port)) || exclusions.count(AddressExclusion(addr.ip));
|
||||
}
|
||||
|
|
|
@ -166,10 +166,5 @@ bool schemaMatch(json_spirit::mValue const& schema,
|
|||
// storage nodes
|
||||
ACTOR Future<Void> mgmtSnapCreate(Database cx, Standalone<StringRef> snapCmd, UID snapUID);
|
||||
|
||||
// Set/clear/get the storage quota for the given tenant group
|
||||
void setStorageQuota(Transaction& tr, StringRef tenantGroupName, int64_t quota);
|
||||
void clearStorageQuota(Transaction& tr, StringRef tenantGroupName);
|
||||
ACTOR Future<Optional<int64_t>> getStorageQuota(Transaction* tr, StringRef tenantGroupName);
|
||||
|
||||
#include "flow/unactorcompiler.h"
|
||||
#endif
|
||||
|
|
|
@ -451,9 +451,11 @@ Future<Void> managementClusterCheckEmpty(Transaction tr) {
|
|||
}
|
||||
|
||||
ACTOR template <class DB>
|
||||
Future<Optional<std::string>> createMetacluster(Reference<DB> db, ClusterName name) {
|
||||
Future<Optional<std::string>> createMetacluster(Reference<DB> db, ClusterName name, int64_t tenantIdPrefix) {
|
||||
state Reference<typename DB::TransactionT> tr = db->createTransaction();
|
||||
state Optional<UID> metaclusterUid;
|
||||
ASSERT(tenantIdPrefix >= TenantAPI::TENANT_ID_PREFIX_MIN_VALUE &&
|
||||
tenantIdPrefix <= TenantAPI::TENANT_ID_PREFIX_MAX_VALUE);
|
||||
|
||||
loop {
|
||||
try {
|
||||
|
@ -462,7 +464,7 @@ Future<Optional<std::string>> createMetacluster(Reference<DB> db, ClusterName na
|
|||
state Future<Optional<MetaclusterRegistrationEntry>> metaclusterRegistrationFuture =
|
||||
MetaclusterMetadata::metaclusterRegistration().get(tr);
|
||||
|
||||
wait(managementClusterCheckEmpty(tr));
|
||||
state Future<Void> metaclusterEmptinessCheck = managementClusterCheckEmpty(tr);
|
||||
|
||||
Optional<MetaclusterRegistrationEntry> existingRegistration = wait(metaclusterRegistrationFuture);
|
||||
if (existingRegistration.present()) {
|
||||
|
@ -477,6 +479,8 @@ Future<Optional<std::string>> createMetacluster(Reference<DB> db, ClusterName na
|
|||
}
|
||||
}
|
||||
|
||||
wait(metaclusterEmptinessCheck);
|
||||
|
||||
if (!metaclusterUid.present()) {
|
||||
metaclusterUid = deterministicRandom()->randomUniqueID();
|
||||
}
|
||||
|
@ -484,6 +488,8 @@ Future<Optional<std::string>> createMetacluster(Reference<DB> db, ClusterName na
|
|||
MetaclusterMetadata::metaclusterRegistration().set(
|
||||
tr, MetaclusterRegistrationEntry(name, metaclusterUid.get()));
|
||||
|
||||
TenantMetadata::tenantIdPrefix().set(tr, tenantIdPrefix);
|
||||
|
||||
wait(buggifiedCommit(tr, BUGGIFY_WITH_PROB(0.1)));
|
||||
break;
|
||||
} catch (Error& e) {
|
||||
|
@ -491,6 +497,8 @@ Future<Optional<std::string>> createMetacluster(Reference<DB> db, ClusterName na
|
|||
}
|
||||
}
|
||||
|
||||
TraceEvent("CreatedMetacluster").detail("Name", name).detail("Prefix", tenantIdPrefix);
|
||||
|
||||
return Optional<std::string>();
|
||||
}
|
||||
|
||||
|
@ -1266,8 +1274,18 @@ struct CreateTenantImpl {
|
|||
state Future<Void> setClusterFuture = self->ctx.setCluster(tr, assignment.first);
|
||||
|
||||
// Create a tenant entry in the management cluster
|
||||
Optional<int64_t> lastId = wait(ManagementClusterMetadata::tenantMetadata().lastTenantId.get(tr));
|
||||
self->tenantEntry.setId(lastId.orDefault(-1) + 1);
|
||||
state Optional<int64_t> lastId = wait(ManagementClusterMetadata::tenantMetadata().lastTenantId.get(tr));
|
||||
// If the last tenant id is not present fetch the prefix from system keys and make it the prefix for the next
|
||||
// allocated tenant id
|
||||
if (!lastId.present()) {
|
||||
Optional<int64_t> tenantIdPrefix = wait(TenantMetadata::tenantIdPrefix().get(tr));
|
||||
ASSERT(tenantIdPrefix.present());
|
||||
lastId = tenantIdPrefix.get() << 48;
|
||||
}
|
||||
if (!TenantAPI::nextTenantIdPrefixMatches(lastId.get(), lastId.get() + 1)) {
|
||||
throw cluster_no_capacity();
|
||||
}
|
||||
self->tenantEntry.setId(lastId.get() + 1);
|
||||
ManagementClusterMetadata::tenantMetadata().lastTenantId.set(tr, self->tenantEntry.id);
|
||||
|
||||
self->tenantEntry.tenantState = TenantState::REGISTERING;
|
||||
|
|
|
@ -35,6 +35,14 @@
|
|||
#include "fdbclient/ReadYourWrites.h"
|
||||
#include "flow/actorcompiler.h" // This must be the last #include.
|
||||
|
||||
// Runs a RYW transaction in a retry loop on the given Database.
|
||||
//
|
||||
// Takes a function func that accepts a Reference<ReadYourWritesTransaction> as a parameter and returns a non-Void
|
||||
// Future. This function is run inside the transaction, and when the transaction is successfully committed the result of
|
||||
// the function is returned.
|
||||
//
|
||||
// The supplied function should be idempotent. Otherwise, outcome of this function will depend on how many times the
|
||||
// transaction is retried.
|
||||
ACTOR template <class Function>
|
||||
Future<decltype(std::declval<Function>()(Reference<ReadYourWritesTransaction>()).getValue())> runRYWTransaction(
|
||||
Database cx,
|
||||
|
@ -42,7 +50,6 @@ Future<decltype(std::declval<Function>()(Reference<ReadYourWritesTransaction>())
|
|||
state Reference<ReadYourWritesTransaction> tr(new ReadYourWritesTransaction(cx));
|
||||
loop {
|
||||
try {
|
||||
// func should be idempotent; otherwise, retry will get undefined result
|
||||
state decltype(std::declval<Function>()(Reference<ReadYourWritesTransaction>()).getValue()) result =
|
||||
wait(func(tr));
|
||||
wait(tr->commit());
|
||||
|
@ -53,6 +60,27 @@ Future<decltype(std::declval<Function>()(Reference<ReadYourWritesTransaction>())
|
|||
}
|
||||
}
|
||||
|
||||
// Runs a RYW transaction in a retry loop on the given Database.
|
||||
//
|
||||
// Takes a function func that accepts a Reference<ReadYourWritesTransaction> as a parameter and returns a Void
|
||||
// Future. This function is run inside the transaction.
|
||||
//
|
||||
// The supplied function should be idempotent. Otherwise, outcome of this function will depend on how many times the
|
||||
// transaction is retried.
|
||||
ACTOR template <class Function>
|
||||
Future<Void> runRYWTransactionVoid(Database cx, Function func) {
|
||||
// The transaction is created once, before the loop, and reused by every iteration.
state Reference<ReadYourWritesTransaction> tr(new ReadYourWritesTransaction(cx));
|
||||
loop {
|
||||
try {
|
||||
// Run the caller-supplied work against the transaction, then commit it.
wait(func(tr));
|
||||
wait(tr->commit());
|
||||
// Both steps completed without throwing: the actor finishes here.
return Void();
|
||||
} catch (Error& e) {
|
||||
// Hand the error to the transaction; once onError completes, the loop runs func again.
// NOTE(review): presumably onError rethrows errors that are not retryable - confirm.
wait(tr->onError(e));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
ACTOR template <class Function>
|
||||
Future<decltype(std::declval<Function>()(Reference<ReadYourWritesTransaction>()).getValue())>
|
||||
runRYWTransactionFailIfLocked(Database cx, Function func) {
|
||||
|
|
|
@ -715,10 +715,10 @@ struct WaitMetricsRequest {
|
|||
// Send a reversed range for min, max to receive an immediate report
|
||||
constexpr static FileIdentifier file_identifier = 1795961;
|
||||
Arena arena;
|
||||
// Setting the tenantInfo makes the request tenant-aware. Need to set `minVersion` to a version where
|
||||
// the tenant info was read.
|
||||
// Setting the tenantInfo makes the request tenant-aware.
|
||||
TenantInfo tenantInfo;
|
||||
Version minVersion;
|
||||
// Set `minVersion` to a version where the tenant info was read. Not needed for non-tenant-aware requests.
|
||||
Version minVersion = 0;
|
||||
KeyRangeRef keys;
|
||||
StorageMetrics min, max;
|
||||
ReplyPromise<StorageMetrics> reply;
|
||||
|
|
|
@ -734,12 +734,6 @@ const Value blobRestoreArgValueFor(BlobRestoreArg args);
|
|||
Standalone<BlobRestoreArg> decodeBlobRestoreArg(ValueRef const& value);
|
||||
extern const Key blobManifestVersionKey;
|
||||
|
||||
// Storage quota per tenant
|
||||
// "\xff/storageQuota/[[tenantGroupName]]" := "[[quota]]"
|
||||
extern const KeyRangeRef storageQuotaKeys;
|
||||
extern const KeyRef storageQuotaPrefix;
|
||||
Key storageQuotaKey(StringRef tenantGroupName);
|
||||
|
||||
extern const KeyRangeRef idempotencyIdKeys;
|
||||
extern const KeyRef idempotencyIdsExpiredVersion;
|
||||
|
||||
|
|
|
@ -203,6 +203,7 @@ struct TenantMetadataSpecification {
|
|||
KeyBackedObjectProperty<TenantTombstoneCleanupData, decltype(IncludeVersion())> tombstoneCleanupData;
|
||||
KeyBackedSet<Tuple> tenantGroupTenantIndex;
|
||||
KeyBackedObjectMap<TenantGroupName, TenantGroupEntry, decltype(IncludeVersion()), NullCodec> tenantGroupMap;
|
||||
KeyBackedMap<TenantGroupName, int64_t> storageQuota;
|
||||
KeyBackedBinaryValue<Versionstamp> lastTenantModification;
|
||||
|
||||
TenantMetadataSpecification(KeyRef prefix)
|
||||
|
@ -212,6 +213,7 @@ struct TenantMetadataSpecification {
|
|||
tombstoneCleanupData(subspace.withSuffix("tombstoneCleanup"_sr), IncludeVersion()),
|
||||
tenantGroupTenantIndex(subspace.withSuffix("tenantGroup/tenantIndex/"_sr)),
|
||||
tenantGroupMap(subspace.withSuffix("tenantGroup/map/"_sr), IncludeVersion()),
|
||||
storageQuota(subspace.withSuffix("storageQuota/"_sr)),
|
||||
lastTenantModification(subspace.withSuffix("lastModification"_sr)) {}
|
||||
};
|
||||
|
||||
|
@ -227,7 +229,11 @@ struct TenantMetadata {
|
|||
static inline auto& tombstoneCleanupData() { return instance().tombstoneCleanupData; }
|
||||
static inline auto& tenantGroupTenantIndex() { return instance().tenantGroupTenantIndex; }
|
||||
static inline auto& tenantGroupMap() { return instance().tenantGroupMap; }
|
||||
static inline auto& storageQuota() { return instance().storageQuota; }
|
||||
static inline auto& lastTenantModification() { return instance().lastTenantModification; }
|
||||
// This system key stores the tenant id prefix that is used during metacluster/standalone cluster creation. If the
|
||||
// key is not present then we will assume the prefix to be 0
|
||||
static KeyBackedProperty<int64_t>& tenantIdPrefix();
|
||||
|
||||
static Key tenantMapPrivatePrefix();
|
||||
};
|
||||
|
|
|
@ -20,8 +20,11 @@
|
|||
|
||||
#pragma once
|
||||
#include "fdbclient/ClientBooleanParams.h"
|
||||
#include "fdbclient/Knobs.h"
|
||||
#include "fdbclient/Tenant.h"
|
||||
#include "flow/IRandom.h"
|
||||
#include "flow/ThreadHelper.actor.h"
|
||||
#include <algorithm>
|
||||
#if defined(NO_INTELLISENSE) && !defined(FDBCLIENT_TENANT_MANAGEMENT_ACTOR_G_H)
|
||||
#define FDBCLIENT_TENANT_MANAGEMENT_ACTOR_G_H
|
||||
#include "fdbclient/TenantManagement.actor.g.h"
|
||||
|
@ -37,6 +40,9 @@
|
|||
|
||||
namespace TenantAPI {
|
||||
|
||||
static const int TENANT_ID_PREFIX_MIN_VALUE = 0;
|
||||
static const int TENANT_ID_PREFIX_MAX_VALUE = 32767;
|
||||
|
||||
template <class Transaction>
|
||||
Future<Optional<TenantMapEntry>> tryGetTenantTransaction(Transaction tr, int64_t tenantId) {
|
||||
tr->setOption(FDBTransactionOptions::RAW_ACCESS);
|
||||
|
@ -120,6 +126,9 @@ Future<Void> checkTenantMode(Transaction tr, ClusterType expectedClusterType) {
|
|||
TenantMode tenantModeForClusterType(ClusterType clusterType, TenantMode tenantMode);
|
||||
int64_t extractTenantIdFromMutation(MutationRef m);
|
||||
int64_t extractTenantIdFromKeyRef(StringRef s);
|
||||
bool nextTenantIdPrefixMatches(int64_t lastTenantId, int64_t nextTenantId);
|
||||
int64_t getMaxAllowableTenantId(int64_t curTenantId);
|
||||
int64_t getTenantIdPrefix(int64_t tenantId);
|
||||
|
||||
// Returns true if the specified ID has already been deleted and false if not. If the ID is old enough
|
||||
// that we no longer keep tombstones for it, an error is thrown.
|
||||
|
@ -216,11 +225,20 @@ createTenantTransaction(Transaction tr, TenantMapEntry tenantEntry, ClusterType
|
|||
|
||||
ACTOR template <class Transaction>
|
||||
Future<int64_t> getNextTenantId(Transaction tr) {
|
||||
Optional<int64_t> lastId = wait(TenantMetadata::lastTenantId().get(tr));
|
||||
int64_t tenantId = lastId.orDefault(-1) + 1;
|
||||
state Optional<int64_t> lastId = wait(TenantMetadata::lastTenantId().get(tr));
|
||||
if (!lastId.present()) {
|
||||
// If the last tenant id is not present, fetch the tenantIdPrefix (if any) and initialize the lastId
|
||||
int64_t tenantIdPrefix = wait(TenantMetadata::tenantIdPrefix().getD(tr, Snapshot::False, 0));
|
||||
// Shift by 6 bytes to make the prefix the first two bytes of the tenant id
|
||||
lastId = tenantIdPrefix << 48;
|
||||
}
|
||||
int64_t tenantId = lastId.get() + 1;
|
||||
if (BUGGIFY) {
|
||||
tenantId += deterministicRandom()->randomSkewedUInt32(1, 1e9);
|
||||
}
|
||||
if (!TenantAPI::nextTenantIdPrefixMatches(lastId.get(), tenantId)) {
|
||||
throw cluster_no_capacity();
|
||||
}
|
||||
return tenantId;
|
||||
}
|
||||
|
||||
|
|
|
@ -134,9 +134,13 @@ private:
|
|||
TenantMetadata::tenantCount().getD(&ryw->getTransaction(), Snapshot::False, 0);
|
||||
int64_t _nextId = wait(TenantAPI::getNextTenantId(&ryw->getTransaction()));
|
||||
state int64_t nextId = _nextId;
|
||||
ASSERT(nextId > 0);
|
||||
|
||||
state std::vector<Future<bool>> createFutures;
|
||||
for (auto const& [tenant, config] : tenants) {
|
||||
if (!TenantAPI::nextTenantIdPrefixMatches(nextId - 1, nextId)) {
|
||||
throw cluster_no_capacity();
|
||||
}
|
||||
createFutures.push_back(createTenant(ryw, tenant, config, nextId++, tenantGroupNetTenantDelta));
|
||||
}
|
||||
|
||||
|
|
|
@ -41,7 +41,7 @@ require (
|
|||
github.com/beorn7/perks v1.0.1 // indirect
|
||||
github.com/cespare/xxhash/v2 v2.1.2 // indirect
|
||||
github.com/davecgh/go-spew v1.1.1 // indirect
|
||||
github.com/emicklei/go-restful v2.9.5+incompatible // indirect
|
||||
github.com/emicklei/go-restful v2.16.0+incompatible // indirect
|
||||
github.com/go-openapi/jsonpointer v0.19.5 // indirect
|
||||
github.com/go-openapi/jsonreference v0.19.5 // indirect
|
||||
github.com/go-openapi/swag v0.19.14 // indirect
|
||||
|
|
|
@ -82,8 +82,9 @@ github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSs
|
|||
github.com/docopt/docopt-go v0.0.0-20180111231733-ee0de3bc6815/go.mod h1:WwZ+bS3ebgob9U8Nd0kOddGdZWjyMGR8Wziv+TBNwSE=
|
||||
github.com/elazarl/goproxy v0.0.0-20180725130230-947c36da3153/go.mod h1:/Zj4wYkgs4iZTTu3o/KG3Itv/qCCa8VVMlb3i9OVuzc=
|
||||
github.com/emicklei/go-restful v0.0.0-20170410110728-ff4f55a20633/go.mod h1:otzb+WCGbkyDHkqmQmT5YD2WR4BBwUdeQoFo8l/7tVs=
|
||||
github.com/emicklei/go-restful v2.9.5+incompatible h1:spTtZBk5DYEvbxMVutUuTyh1Ao2r4iyvLdACqsl/Ljk=
|
||||
github.com/emicklei/go-restful v2.9.5+incompatible/go.mod h1:otzb+WCGbkyDHkqmQmT5YD2WR4BBwUdeQoFo8l/7tVs=
|
||||
github.com/emicklei/go-restful v2.16.0+incompatible h1:rgqiKNjTnFQA6kkhFe16D8epTksy9HQ1MyrbDXSdYhM=
|
||||
github.com/emicklei/go-restful v2.16.0+incompatible/go.mod h1:otzb+WCGbkyDHkqmQmT5YD2WR4BBwUdeQoFo8l/7tVs=
|
||||
github.com/envoyproxy/go-control-plane v0.9.0/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4=
|
||||
github.com/envoyproxy/go-control-plane v0.9.1-0.20191026205805-5f8ba28d4473/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4=
|
||||
github.com/envoyproxy/go-control-plane v0.9.4/go.mod h1:6rpuAdCZL397s3pYoYcLgu1mIlRU8Am5FuJP05cCM98=
|
||||
|
|
|
@ -130,6 +130,7 @@ public:
|
|||
KillType kt,
|
||||
KillType* newKillType) const = 0;
|
||||
virtual bool isAvailable() const = 0;
|
||||
virtual std::vector<AddressExclusion> getAllAddressesInDCToExclude(Optional<Standalone<StringRef>> dcId) const = 0;
|
||||
virtual bool datacenterDead(Optional<Standalone<StringRef>> dcId) const = 0;
|
||||
virtual void displayWorkers() const;
|
||||
ProtocolVersion protocolVersion() const override = 0;
|
||||
|
|
|
@ -1422,6 +1422,19 @@ public:
|
|||
return canKillProcesses(processesLeft, processesDead, KillType::KillInstantly, nullptr);
|
||||
}
|
||||
|
||||
// Builds an ip:port exclusion for every process returned by getAllProcesses() whose
// locality dcId equals `dcId`. An absent `dcId` produces an empty list.
std::vector<AddressExclusion> getAllAddressesInDCToExclude(Optional<Standalone<StringRef>> dcId) const override {
	std::vector<AddressExclusion> exclusions;
	if (dcId.present()) {
		for (const auto& proc : getAllProcesses()) {
			// Skip processes that live in a different datacenter.
			if (!(proc->locality.dcId() == dcId)) {
				continue;
			}
			exclusions.emplace_back(proc->address.ip, proc->address.port);
		}
	}
	return exclusions;
}
|
||||
|
||||
bool datacenterDead(Optional<Standalone<StringRef>> dcId) const override {
|
||||
if (!dcId.present()) {
|
||||
return false;
|
||||
|
|
|
@ -176,6 +176,20 @@ public:
|
|||
ACTOR static Future<Void> getTeam(DDTeamCollection* self, GetTeamRequest req) {
|
||||
try {
|
||||
wait(self->checkBuildTeams());
|
||||
|
||||
if (!self->primary && !self->readyToStart.isReady()) {
|
||||
// When remote DC is not ready, DD shouldn't reply with a new team because
|
||||
// a data movement to that team can't be completed and such a move
|
||||
// may block the primary DC from reaching "storage_recovered".
|
||||
auto team = self->findTeamFromServers(req.completeSources, /*wantHealthy=*/false);
|
||||
TraceEvent("GetTeamNotReady", self->distributorId)
|
||||
.suppressFor(1.0)
|
||||
.detail("Primary", self->primary)
|
||||
.detail("Team", team.present() ? describe(team.get()->getServerIDs()) : "");
|
||||
req.reply.send(std::make_pair(team, true));
|
||||
return Void();
|
||||
}
|
||||
|
||||
// report the median available space
|
||||
if (now() - self->lastMedianAvailableSpaceUpdate > SERVER_KNOBS->AVAILABLE_SPACE_UPDATE_DELAY) {
|
||||
self->lastMedianAvailableSpaceUpdate = now();
|
||||
|
@ -232,30 +246,13 @@ public:
|
|||
bool wigglingBestOption = false; // best option contains server in paused wiggle state
|
||||
Optional<Reference<IDataDistributionTeam>> bestOption;
|
||||
std::vector<Reference<TCTeamInfo>> randomTeams;
|
||||
const std::set<UID> completeSources(req.completeSources.begin(), req.completeSources.end());
|
||||
|
||||
// Note: this block does not apply any filters from the request
|
||||
if (!req.wantsNewServers) {
|
||||
for (int i = 0; i < req.completeSources.size(); i++) {
|
||||
if (!self->server_info.count(req.completeSources[i])) {
|
||||
continue;
|
||||
}
|
||||
auto const& teamList = self->server_info[req.completeSources[i]]->getTeams();
|
||||
for (int j = 0; j < teamList.size(); j++) {
|
||||
bool found = true;
|
||||
auto serverIDs = teamList[j]->getServerIDs();
|
||||
for (int k = 0; k < teamList[j]->size(); k++) {
|
||||
if (!completeSources.count(serverIDs[k])) {
|
||||
found = false;
|
||||
break;
|
||||
}
|
||||
}
|
||||
if (found && teamList[j]->isHealthy()) {
|
||||
bestOption = teamList[j];
|
||||
req.reply.send(std::make_pair(bestOption, foundSrc));
|
||||
return Void();
|
||||
}
|
||||
}
|
||||
auto healthyTeam = self->findTeamFromServers(req.completeSources, /* wantHealthy=*/true);
|
||||
if (healthyTeam.present()) {
|
||||
req.reply.send(std::make_pair(healthyTeam, foundSrc));
|
||||
return Void();
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -355,26 +352,10 @@ public:
|
|||
// Note: this block does not apply any filters from the request
|
||||
if (!bestOption.present() && self->zeroHealthyTeams->get()) {
|
||||
// Attempt to find the unhealthy source server team and return it
|
||||
for (int i = 0; i < req.completeSources.size(); i++) {
|
||||
if (!self->server_info.count(req.completeSources[i])) {
|
||||
continue;
|
||||
}
|
||||
auto const& teamList = self->server_info[req.completeSources[i]]->getTeams();
|
||||
for (int j = 0; j < teamList.size(); j++) {
|
||||
bool found = true;
|
||||
auto serverIDs = teamList[j]->getServerIDs();
|
||||
for (int k = 0; k < teamList[j]->size(); k++) {
|
||||
if (!completeSources.count(serverIDs[k])) {
|
||||
found = false;
|
||||
break;
|
||||
}
|
||||
}
|
||||
if (found) {
|
||||
bestOption = teamList[j];
|
||||
req.reply.send(std::make_pair(bestOption, foundSrc));
|
||||
return Void();
|
||||
}
|
||||
}
|
||||
auto healthyTeam = self->findTeamFromServers(req.completeSources, /* wantHealthy=*/false);
|
||||
if (healthyTeam.present()) {
|
||||
req.reply.send(std::make_pair(healthyTeam, foundSrc));
|
||||
return Void();
|
||||
}
|
||||
}
|
||||
// if (!bestOption.present()) {
|
||||
|
@ -383,7 +364,6 @@ public:
|
|||
// }
|
||||
|
||||
req.reply.send(std::make_pair(bestOption, foundSrc));
|
||||
|
||||
return Void();
|
||||
} catch (Error& e) {
|
||||
if (e.code() != error_code_actor_cancelled)
|
||||
|
@ -1929,6 +1909,7 @@ public:
|
|||
.detail("AddressesExcluded", excludedResults.size())
|
||||
.detail("AddressesFailed", failedResults.size())
|
||||
.detail("LocalitiesExcluded", excludedLocalityResults.size())
|
||||
.detail("Primary", self->isPrimary())
|
||||
.detail("LocalitiesFailed", failedLocalityResults.size());
|
||||
|
||||
self->restartRecruiting.trigger();
|
||||
|
@ -3023,6 +3004,7 @@ public:
|
|||
.detail("UnhealthyServers", self->unhealthyServers)
|
||||
.detail("ServerCount", self->server_info.size())
|
||||
.detail("StorageTeamSize", self->configuration.storageTeamSize)
|
||||
.detail("ZeroHealthy", self->zeroOptimalTeams.get())
|
||||
.detail("HighestPriority", highestPriority)
|
||||
.trackLatest(self->primary ? "TotalDataInFlight"
|
||||
: "TotalDataInFlightRemote"); // This trace event's trackLatest
|
||||
|
@ -3400,6 +3382,31 @@ bool DDTeamCollection::teamContainsFailedServer(Reference<TCTeamInfo> team) cons
|
|||
return false;
|
||||
}
|
||||
|
||||
// Looks through the teams of each server in `servers` for a team whose members are all
// contained in `servers`. The first such team is returned; when `wantHealthy` is true a
// team only qualifies if it also reports isHealthy(). Returns an empty Optional when no
// team qualifies.
Optional<Reference<IDataDistributionTeam>> DDTeamCollection::findTeamFromServers(const std::vector<UID>& servers,
                                                                                 bool wantHealthy) {
	// Set view of the candidate servers for membership tests.
	const std::set<UID> candidateIds(servers.begin(), servers.end());

	for (const auto& serverId : servers) {
		auto infoIt = server_info.find(serverId);
		if (infoIt == server_info.end()) {
			// No entry for this server in server_info, so it has no teams to inspect.
			continue;
		}

		for (const auto& candidate : infoIt->second->getTeams()) {
			// The team qualifies only if every one of its members is among the given servers.
			bool allMembersGiven = true;
			for (const UID& member : candidate->getServerIDs()) {
				if (candidateIds.find(member) == candidateIds.end()) {
					allMembersGiven = false;
					break;
				}
			}
			if (!allMembersGiven) {
				continue;
			}
			if (wantHealthy && !candidate->isHealthy()) {
				continue;
			}
			return candidate;
		}
	}

	return Optional<Reference<IDataDistributionTeam>>();
}
|
||||
|
||||
// Thin forwarder: hands this collection and `signal` to DDTeamCollectionImpl::logOnCompletion
// and returns the future it produces.
Future<Void> DDTeamCollection::logOnCompletion(Future<Void> signal) {
|
||||
return DDTeamCollectionImpl::logOnCompletion(this, signal);
|
||||
}
|
||||
|
|
|
@ -364,7 +364,9 @@ public:
|
|||
|
||||
wait(self->loadDatabaseConfiguration());
|
||||
self->initDcInfo();
|
||||
TraceEvent("DDInitGotConfiguration", self->ddId).detail("Conf", self->configuration.toString());
|
||||
TraceEvent("DDInitGotConfiguration", self->ddId)
|
||||
.setMaxFieldLength(-1)
|
||||
.detail("Conf", self->configuration.toString());
|
||||
|
||||
wait(self->updateReplicaKeys());
|
||||
TraceEvent("DDInitUpdatedReplicaKeys", self->ddId).log();
|
||||
|
@ -461,7 +463,7 @@ public:
|
|||
teams.push_back(ShardsAffectedByTeamFailure::Team(iShard.remoteSrc, false));
|
||||
}
|
||||
if (traceShard) {
|
||||
TraceEvent(SevDebug, "DDInitShard")
|
||||
TraceEvent(SevDebug, "DDInitShard", self->ddId)
|
||||
.detail("Keys", keys)
|
||||
.detail("PrimarySrc", describe(iShard.primarySrc))
|
||||
.detail("RemoteSrc", describe(iShard.remoteSrc))
|
||||
|
|
|
@ -802,13 +802,16 @@ ACTOR Future<Void> waitForShardReady(StorageServerInterface server,
|
|||
try {
|
||||
GetShardStateReply rep =
|
||||
wait(server.getShardState.getReply(GetShardStateRequest(keys, mode), TaskPriority::MoveKeys));
|
||||
TraceEvent("GetShardStateReadyDD").detail("RepVersion", rep.first).detail("MinVersion", rep.second).log();
|
||||
TraceEvent("GetShardStateReadyDD", server.id())
|
||||
.detail("RepVersion", rep.first)
|
||||
.detail("MinVersion", rep.second)
|
||||
.log();
|
||||
if (rep.first >= minVersion) {
|
||||
return Void();
|
||||
}
|
||||
wait(delayJittered(SERVER_KNOBS->SHARD_READY_DELAY, TaskPriority::MoveKeys));
|
||||
} catch (Error& e) {
|
||||
TraceEvent("GetShardStateReadyError").error(e).log();
|
||||
TraceEvent("GetShardStateReadyError", server.id()).error(e).log();
|
||||
if (e.code() != error_code_timed_out) {
|
||||
if (e.code() != error_code_broken_promise)
|
||||
throw e;
|
||||
|
@ -1174,7 +1177,7 @@ ACTOR static Future<Void> finishMoveKeys(Database occ,
|
|||
tssCount += tssReady[s].isReady() && !tssReady[s].isError();
|
||||
|
||||
TraceEvent readyServersEv(SevDebug, waitInterval.end(), relocationIntervalId);
|
||||
readyServersEv.detail("ReadyServers", count);
|
||||
readyServersEv.detail("ReadyServers", count).detail("Dests", dest.size());
|
||||
if (tssReady.size()) {
|
||||
readyServersEv.detail("ReadyTSS", tssCount);
|
||||
}
|
||||
|
|
|
@ -23,7 +23,6 @@
|
|||
#include <type_traits>
|
||||
|
||||
#include "fdbclient/FDBOptions.g.h"
|
||||
#include "fdbclient/FDBTypes.h"
|
||||
#include "fdbclient/SystemData.h"
|
||||
#include "flow/ActorCollection.h"
|
||||
#include "fdbrpc/simulator.h"
|
||||
|
@ -715,7 +714,7 @@ ACTOR Future<Void> repairDeadDatacenter(Database cx,
|
|||
Reference<AsyncVar<ServerDBInfo> const> dbInfo,
|
||||
std::string context) {
|
||||
if (g_network->isSimulated() && g_simulator->usableRegions > 1 && !g_simulator->quiesced) {
|
||||
bool primaryDead = g_simulator->datacenterDead(g_simulator->primaryDcId);
|
||||
state bool primaryDead = g_simulator->datacenterDead(g_simulator->primaryDcId);
|
||||
bool remoteDead = g_simulator->datacenterDead(g_simulator->remoteDcId);
|
||||
|
||||
// FIXME: the primary and remote can both be considered dead because excludes are not handled properly by the
|
||||
|
@ -731,6 +730,15 @@ ACTOR Future<Void> repairDeadDatacenter(Database cx,
|
|||
.detail("RemoteDead", remoteDead)
|
||||
.detail("PrimaryDead", primaryDead);
|
||||
g_simulator->usableRegions = 1;
|
||||
|
||||
state std::vector<AddressExclusion> servers = g_simulator->getAllAddressesInDCToExclude(
|
||||
primaryDead ? g_simulator->primaryDcId : g_simulator->remoteDcId);
|
||||
wait(excludeServers(cx, servers, false));
|
||||
TraceEvent(SevWarnAlways, "DisablingFearlessConfiguration")
|
||||
.detail("Location", context)
|
||||
.detail("Stage", "ServerExcluded")
|
||||
.detail("Servers", describe(servers));
|
||||
|
||||
wait(success(ManagementAPI::changeConfig(
|
||||
cx.getReference(),
|
||||
(primaryDead ? g_simulator->disablePrimary : g_simulator->disableRemote) + " repopulate_anti_quorum=1",
|
||||
|
|
|
@ -21,6 +21,8 @@
|
|||
#include <limits>
|
||||
#include <string>
|
||||
|
||||
#include "fdbclient/FDBOptions.g.h"
|
||||
#include "fdbclient/KeyBackedTypes.h"
|
||||
#include "fdbclient/SystemData.h"
|
||||
#include "fdbclient/FDBTypes.h"
|
||||
#include "fdbclient/Tenant.h"
|
||||
|
@ -188,26 +190,26 @@ public:
|
|||
ACTOR static Future<Void> monitorStorageQuota(TenantCache* tenantCache) {
|
||||
TraceEvent(SevInfo, "StartingTenantCacheStorageQuotaMonitor", tenantCache->id()).log();
|
||||
|
||||
state Transaction tr(tenantCache->dbcx());
|
||||
state Reference<ReadYourWritesTransaction> tr = tenantCache->dbcx()->createTransaction();
|
||||
|
||||
loop {
|
||||
try {
|
||||
state RangeResult currentQuotas = wait(tr.getRange(storageQuotaKeys, CLIENT_KNOBS->TOO_MANY));
|
||||
tr->setOption(FDBTransactionOptions::READ_SYSTEM_KEYS);
|
||||
state KeyBackedRangeResult<std::pair<TenantGroupName, int64_t>> currentQuotas =
|
||||
wait(TenantMetadata::storageQuota().getRange(tr, {}, {}, CLIENT_KNOBS->MAX_TENANTS_PER_CLUSTER));
|
||||
// Reset the quota for all groups; this essentially sets the quota to `max` for groups where the
|
||||
// quota might have been cleared (i.e., groups that will not be returned in `getRange` request above).
|
||||
for (auto& [group, storage] : tenantCache->tenantStorageMap) {
|
||||
storage.quota = std::numeric_limits<int64_t>::max();
|
||||
}
|
||||
for (const auto kv : currentQuotas) {
|
||||
const TenantGroupName group = kv.key.removePrefix(storageQuotaPrefix);
|
||||
const int64_t quota = BinaryReader::fromStringRef<int64_t>(kv.value, Unversioned());
|
||||
tenantCache->tenantStorageMap[group].quota = quota;
|
||||
for (const auto& [groupName, quota] : currentQuotas.results) {
|
||||
tenantCache->tenantStorageMap[groupName].quota = quota;
|
||||
}
|
||||
tr.reset();
|
||||
tr->reset();
|
||||
wait(delay(SERVER_KNOBS->TENANT_CACHE_STORAGE_QUOTA_REFRESH_INTERVAL));
|
||||
} catch (Error& e) {
|
||||
TraceEvent("TenantCacheGetStorageQuotaError", tenantCache->id()).error(e);
|
||||
wait(tr.onError(e));
|
||||
wait(tr->onError(e));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -312,6 +312,10 @@ protected:
|
|||
// When configuration is changed, we may have machine teams with old storageTeamSize
|
||||
Reference<TCMachineTeamInfo> findOneRandomMachineTeam(TCServerInfo const& chosenServer) const;
|
||||
|
||||
// Returns a server team from given "servers", empty team if not found.
|
||||
// When "wantHealthy" is true, only return if the team is healthy.
|
||||
Optional<Reference<IDataDistributionTeam>> findTeamFromServers(const std::vector<UID>& servers, bool wantHealthy);
|
||||
|
||||
Future<Void> logOnCompletion(Future<Void> signal);
|
||||
|
||||
void resetLocalitySet();
|
||||
|
|
|
@ -24,6 +24,8 @@
|
|||
// When actually compiled (NO_INTELLISENSE), include the generated version of this file. In intellisense use the source
|
||||
// version.
|
||||
#include "fdbclient/FDBOptions.g.h"
|
||||
#include "fdbclient/Tenant.h"
|
||||
#include "fdbclient/TenantManagement.actor.h"
|
||||
#include "flow/BooleanParam.h"
|
||||
#if defined(NO_INTELLISENSE) && !defined(WORKLOADS_METACLUSTER_CONSISTENCY_ACTOR_G_H)
|
||||
#define WORKLOADS_METACLUSTER_CONSISTENCY_ACTOR_G_H
|
||||
|
@ -60,6 +62,8 @@ private:
|
|||
|
||||
int64_t tenantCount;
|
||||
RangeResult systemTenantSubspaceKeys;
|
||||
Optional<int64_t> tenantIdPrefix;
|
||||
Optional<int64_t> lastTenantId;
|
||||
};
|
||||
|
||||
ManagementClusterData managementMetadata;
|
||||
|
@ -76,36 +80,41 @@ private:
|
|||
try {
|
||||
managementTr->setOption(FDBTransactionOptions::READ_SYSTEM_KEYS);
|
||||
state typename transaction_future_type<typename DB::TransactionT, RangeResult>::type
|
||||
systemTenantSubspaceKeysFuture = managementTr->getRange(prefixRange(TenantMetadata::subspace()), 1);
|
||||
systemTenantSubspaceKeysFuture = managementTr->getRange(prefixRange(TenantMetadata::subspace()), 2);
|
||||
|
||||
wait(store(self->managementMetadata.metaclusterRegistration,
|
||||
MetaclusterMetadata::metaclusterRegistration().get(managementTr)) &&
|
||||
store(self->managementMetadata.dataClusters,
|
||||
MetaclusterAPI::listClustersTransaction(
|
||||
managementTr, ""_sr, "\xff\xff"_sr, CLIENT_KNOBS->MAX_DATA_CLUSTERS + 1)) &&
|
||||
store(self->managementMetadata.clusterCapacityTuples,
|
||||
MetaclusterAPI::ManagementClusterMetadata::clusterCapacityIndex.getRange(
|
||||
managementTr, {}, {}, CLIENT_KNOBS->MAX_DATA_CLUSTERS)) &&
|
||||
store(self->managementMetadata.clusterTenantCounts,
|
||||
MetaclusterAPI::ManagementClusterMetadata::clusterTenantCount.getRange(
|
||||
managementTr, {}, {}, CLIENT_KNOBS->MAX_DATA_CLUSTERS)) &&
|
||||
store(self->managementMetadata.clusterTenantTuples,
|
||||
MetaclusterAPI::ManagementClusterMetadata::clusterTenantIndex.getRange(
|
||||
managementTr, {}, {}, metaclusterMaxTenants)) &&
|
||||
store(self->managementMetadata.clusterTenantGroupTuples,
|
||||
MetaclusterAPI::ManagementClusterMetadata::clusterTenantGroupIndex.getRange(
|
||||
managementTr, {}, {}, metaclusterMaxTenants)) &&
|
||||
store(self->managementMetadata.tenantCount,
|
||||
MetaclusterAPI::ManagementClusterMetadata::tenantMetadata().tenantCount.getD(
|
||||
managementTr, Snapshot::False, 0)) &&
|
||||
store(tenantList,
|
||||
MetaclusterAPI::ManagementClusterMetadata::tenantMetadata().tenantMap.getRange(
|
||||
managementTr, {}, {}, metaclusterMaxTenants)) &&
|
||||
store(self->managementMetadata.tenantGroups,
|
||||
MetaclusterAPI::ManagementClusterMetadata::tenantMetadata().tenantGroupMap.getRange(
|
||||
managementTr, {}, {}, metaclusterMaxTenants)) &&
|
||||
store(self->managementMetadata.systemTenantSubspaceKeys,
|
||||
safeThreadFutureToFuture(systemTenantSubspaceKeysFuture)));
|
||||
wait(
|
||||
store(self->managementMetadata.tenantIdPrefix,
|
||||
TenantMetadata::tenantIdPrefix().get(managementTr)) &&
|
||||
store(self->managementMetadata.lastTenantId,
|
||||
MetaclusterAPI::ManagementClusterMetadata::tenantMetadata().lastTenantId.get(managementTr)) &&
|
||||
store(self->managementMetadata.metaclusterRegistration,
|
||||
MetaclusterMetadata::metaclusterRegistration().get(managementTr)) &&
|
||||
store(self->managementMetadata.dataClusters,
|
||||
MetaclusterAPI::listClustersTransaction(
|
||||
managementTr, ""_sr, "\xff\xff"_sr, CLIENT_KNOBS->MAX_DATA_CLUSTERS + 1)) &&
|
||||
store(self->managementMetadata.clusterCapacityTuples,
|
||||
MetaclusterAPI::ManagementClusterMetadata::clusterCapacityIndex.getRange(
|
||||
managementTr, {}, {}, CLIENT_KNOBS->MAX_DATA_CLUSTERS)) &&
|
||||
store(self->managementMetadata.clusterTenantCounts,
|
||||
MetaclusterAPI::ManagementClusterMetadata::clusterTenantCount.getRange(
|
||||
managementTr, {}, {}, CLIENT_KNOBS->MAX_DATA_CLUSTERS)) &&
|
||||
store(self->managementMetadata.clusterTenantTuples,
|
||||
MetaclusterAPI::ManagementClusterMetadata::clusterTenantIndex.getRange(
|
||||
managementTr, {}, {}, metaclusterMaxTenants)) &&
|
||||
store(self->managementMetadata.clusterTenantGroupTuples,
|
||||
MetaclusterAPI::ManagementClusterMetadata::clusterTenantGroupIndex.getRange(
|
||||
managementTr, {}, {}, metaclusterMaxTenants)) &&
|
||||
store(self->managementMetadata.tenantCount,
|
||||
MetaclusterAPI::ManagementClusterMetadata::tenantMetadata().tenantCount.getD(
|
||||
managementTr, Snapshot::False, 0)) &&
|
||||
store(tenantList,
|
||||
MetaclusterAPI::ManagementClusterMetadata::tenantMetadata().tenantMap.getRange(
|
||||
managementTr, {}, {}, metaclusterMaxTenants)) &&
|
||||
store(self->managementMetadata.tenantGroups,
|
||||
MetaclusterAPI::ManagementClusterMetadata::tenantMetadata().tenantGroupMap.getRange(
|
||||
managementTr, {}, {}, metaclusterMaxTenants)) &&
|
||||
store(self->managementMetadata.systemTenantSubspaceKeys,
|
||||
safeThreadFutureToFuture(systemTenantSubspaceKeysFuture)));
|
||||
|
||||
break;
|
||||
} catch (Error& e) {
|
||||
|
@ -155,6 +164,12 @@ private:
|
|||
!managementMetadata.tenantGroups.more);
|
||||
ASSERT_EQ(managementMetadata.clusterTenantGroupTuples.results.size(),
|
||||
managementMetadata.tenantGroups.results.size());
|
||||
ASSERT(managementMetadata.tenantIdPrefix.present());
|
||||
|
||||
if (managementMetadata.lastTenantId.present()) {
|
||||
ASSERT(TenantAPI::getTenantIdPrefix(managementMetadata.lastTenantId.get()) ==
|
||||
managementMetadata.tenantIdPrefix.get());
|
||||
}
|
||||
|
||||
// Parse the cluster capacity index. Check that no cluster is represented in the index more than once.
|
||||
std::map<ClusterName, int64_t> clusterAllocatedMap;
|
||||
|
@ -203,6 +218,7 @@ private:
|
|||
std::set<TenantGroupName> processedTenantGroups;
|
||||
for (auto [tenantId, entry] : managementMetadata.tenantMap) {
|
||||
ASSERT(entry.assignedCluster.present());
|
||||
ASSERT(TenantAPI::getTenantIdPrefix(tenantId) == managementMetadata.tenantIdPrefix.get());
|
||||
|
||||
// Each tenant should be assigned to the same cluster where it is stored in the cluster tenant index
|
||||
auto clusterItr = managementMetadata.clusterTenantMap.find(entry.assignedCluster.get());
|
||||
|
@ -241,8 +257,8 @@ private:
|
|||
ASSERT(clusterItr->second.count(name));
|
||||
}
|
||||
|
||||
// We should not be storing any data in the `\xff` tenant subspace.
|
||||
ASSERT(managementMetadata.systemTenantSubspaceKeys.empty());
|
||||
// The only key in the `\xff` tenant subspace should be the tenant id prefix
|
||||
ASSERT(managementMetadata.systemTenantSubspaceKeys.size() == 1);
|
||||
}
|
||||
|
||||
ACTOR static Future<Void> validateDataCluster(MetaclusterConsistencyCheck* self,
|
||||
|
@ -257,11 +273,13 @@ private:
|
|||
|
||||
state TenantConsistencyCheck<IDatabase> tenantConsistencyCheck(dataDb);
|
||||
wait(tenantConsistencyCheck.run());
|
||||
state Optional<int64_t> lastTenantId;
|
||||
|
||||
loop {
|
||||
try {
|
||||
dataTr->setOption(FDBTransactionOptions::READ_SYSTEM_KEYS);
|
||||
wait(store(dataClusterRegistration, MetaclusterMetadata::metaclusterRegistration().get(dataTr)) &&
|
||||
wait(store(lastTenantId, TenantMetadata::lastTenantId().get(dataTr)) &&
|
||||
store(dataClusterRegistration, MetaclusterMetadata::metaclusterRegistration().get(dataTr)) &&
|
||||
store(dataClusterTenantList,
|
||||
TenantMetadata::tenantMap().getRange(
|
||||
dataTr, {}, {}, CLIENT_KNOBS->MAX_TENANTS_PER_CLUSTER + 1)) &&
|
||||
|
@ -275,6 +293,10 @@ private:
|
|||
}
|
||||
}
|
||||
|
||||
if (lastTenantId.present()) {
|
||||
ASSERT(TenantAPI::getTenantIdPrefix(lastTenantId.get()) == self->managementMetadata.tenantIdPrefix.get());
|
||||
}
|
||||
|
||||
state std::map<int64_t, TenantMapEntry> dataClusterTenantMap(dataClusterTenantList.results.begin(),
|
||||
dataClusterTenantList.results.end());
|
||||
state std::map<TenantGroupName, TenantGroupEntry> dataClusterTenantGroupMap(
|
||||
|
@ -312,6 +334,7 @@ private:
|
|||
TenantMapEntry const& metaclusterEntry = self->managementMetadata.tenantMap[tenantId];
|
||||
ASSERT(!entry.assignedCluster.present());
|
||||
ASSERT_EQ(entry.id, metaclusterEntry.id);
|
||||
ASSERT(TenantAPI::getTenantIdPrefix(entry.id) == self->managementMetadata.tenantIdPrefix.get());
|
||||
ASSERT(entry.tenantName == metaclusterEntry.tenantName);
|
||||
|
||||
ASSERT_EQ(entry.tenantState, TenantState::READY);
|
||||
|
|
|
@ -86,12 +86,17 @@ struct MetaclusterManagementWorkload : TestWorkload {
|
|||
|
||||
int maxTenants;
|
||||
int maxTenantGroups;
|
||||
int64_t tenantIdPrefix;
|
||||
double testDuration;
|
||||
|
||||
// Reads the workload's tunables from the test options. When "tenantIdPrefix" is not supplied,
// a prefix is drawn from the deterministic random source within the
// [TENANT_ID_PREFIX_MIN_VALUE, TENANT_ID_PREFIX_MAX_VALUE + 1) arguments passed to randomInt.
MetaclusterManagementWorkload(WorkloadContext const& wcx) : TestWorkload(wcx) {
	// Tenant count is capped at 1e8 - 1 regardless of the requested value.
	const auto requestedTenants = getOption(options, "maxTenants"_sr, 1000);
	maxTenants = std::min<int>(1e8 - 1, requestedTenants);

	// Tenant group count may not exceed twice the tenant count.
	const auto requestedTenantGroups = getOption(options, "maxTenantGroups"_sr, 20);
	maxTenantGroups = std::min<int>(2 * maxTenants, requestedTenantGroups);

	testDuration = getOption(options, "testDuration"_sr, 120.0);

	// The random fallback is always drawn (as in an argument expression), even if the option is set.
	const auto randomPrefix = deterministicRandom()->randomInt(TenantAPI::TENANT_ID_PREFIX_MIN_VALUE,
	                                                           TenantAPI::TENANT_ID_PREFIX_MAX_VALUE + 1);
	tenantIdPrefix = getOption(options, "tenantIdPrefix"_sr, randomPrefix);
}
|
||||
|
||||
// Adds the "Attrition" workload name to `out`, the set of failure-injection workloads
// that this workload asks to have disabled.
void disableFailureInjectionWorkloads(std::set<std::string>& out) const override {
	out.insert("Attrition");
}
|
||||
|
@ -121,8 +126,8 @@ struct MetaclusterManagementWorkload : TestWorkload {
|
|||
self->dataDbs[self->dataDbIndex.back()] =
|
||||
DataClusterData(Database::createSimulatedExtraDatabase(connectionString, cx->defaultTenant));
|
||||
}
|
||||
|
||||
wait(success(MetaclusterAPI::createMetacluster(cx.getReference(), "management_cluster"_sr)));
|
||||
wait(success(
|
||||
MetaclusterAPI::createMetacluster(cx.getReference(), "management_cluster"_sr, self->tenantIdPrefix)));
|
||||
return Void();
|
||||
}
|
||||
|
||||
|
@ -501,6 +506,7 @@ struct MetaclusterManagementWorkload : TestWorkload {
|
|||
ASSERT(hasCapacity);
|
||||
ASSERT(entry.assignedCluster.present());
|
||||
ASSERT(entry.tenantGroup == tenantGroup);
|
||||
ASSERT(TenantAPI::getTenantIdPrefix(entry.id) == self->tenantIdPrefix);
|
||||
|
||||
if (tenantGroup.present()) {
|
||||
auto tenantGroupData =
|
||||
|
|
|
@ -18,7 +18,10 @@
|
|||
* limitations under the License.
|
||||
*/
|
||||
|
||||
#include "fdbclient/FDBOptions.g.h"
|
||||
#include "fdbclient/ManagementAPI.actor.h"
|
||||
#include "fdbclient/ReadYourWrites.h"
|
||||
#include "fdbclient/RunRYWTransaction.actor.h"
|
||||
#include "fdbclient/SystemData.h"
|
||||
#include "fdbclient/Tenant.h"
|
||||
#include "fdbclient/TenantManagement.actor.h"
|
||||
|
@ -88,8 +91,8 @@ struct StorageQuotaWorkload : TestWorkload {
|
|||
state int64_t quota = size - 1;
|
||||
|
||||
// Check that the quota set/get functions work as expected.
|
||||
wait(setStorageQuotaHelper(cx, self->group, quota));
|
||||
state Optional<int64_t> quotaRead = wait(getStorageQuotaHelper(cx, self->group));
|
||||
wait(setStorageQuota(cx, self->group, quota));
|
||||
state Optional<int64_t> quotaRead = wait(getStorageQuota(cx, self->group));
|
||||
ASSERT(quotaRead.present() && quotaRead.get() == quota);
|
||||
|
||||
if (!SERVER_KNOBS->STORAGE_QUOTA_ENABLED) {
|
||||
|
@ -109,9 +112,9 @@ struct StorageQuotaWorkload : TestWorkload {
|
|||
// Increase the quota or clear the quota. Check that writes to both the tenants are now able to commit.
|
||||
if (deterministicRandom()->coinflip()) {
|
||||
quota = size * 2;
|
||||
wait(setStorageQuotaHelper(cx, self->group, quota));
|
||||
wait(setStorageQuota(cx, self->group, quota));
|
||||
} else {
|
||||
wait(clearStorageQuotaHelper(cx, self->group));
|
||||
wait(clearStorageQuota(cx, self->group));
|
||||
}
|
||||
state bool committed1 = wait(tryWrite(self, cx, self->tenant, /*bypassQuota=*/false, /*expectOk=*/true));
|
||||
ASSERT(committed1);
|
||||
|
@ -144,42 +147,30 @@ struct StorageQuotaWorkload : TestWorkload {
|
|||
}
|
||||
}
|
||||
|
||||
ACTOR static Future<Void> setStorageQuotaHelper(Database cx, TenantGroupName tenantGroupName, int64_t quota) {
|
||||
state Transaction tr(cx);
|
||||
loop {
|
||||
try {
|
||||
setStorageQuota(tr, tenantGroupName, quota);
|
||||
wait(tr.commit());
|
||||
return Void();
|
||||
} catch (Error& e) {
|
||||
wait(tr.onError(e));
|
||||
}
|
||||
}
|
||||
// Writes `quota` as the storage quota of tenant group `tenantGroupName`.
// The write is issued with ACCESS_SYSTEM_KEYS and executed through runRYWTransactionVoid.
static Future<Void> setStorageQuota(Database cx, TenantGroupName tenantGroupName, int64_t quota) {
	auto writeQuota = [tenantGroupName, quota](Reference<ReadYourWritesTransaction> tr) -> Future<Void> {
		tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
		TenantMetadata::storageQuota().set(tr, tenantGroupName, quota);
		return Void();
	};
	return runRYWTransactionVoid(cx, writeQuota);
}
|
||||
|
||||
ACTOR static Future<Void> clearStorageQuotaHelper(Database cx, TenantGroupName tenantGroupName) {
|
||||
state Transaction tr(cx);
|
||||
loop {
|
||||
try {
|
||||
clearStorageQuota(tr, tenantGroupName);
|
||||
wait(tr.commit());
|
||||
return Void();
|
||||
} catch (Error& e) {
|
||||
wait(tr.onError(e));
|
||||
}
|
||||
}
|
||||
// Erases the storage quota entry of tenant group `tenantGroupName`.
// The erase is issued with ACCESS_SYSTEM_KEYS and executed through runRYWTransactionVoid.
static Future<Void> clearStorageQuota(Database cx, TenantGroupName tenantGroupName) {
	auto eraseQuota = [tenantGroupName](Reference<ReadYourWritesTransaction> tr) -> Future<Void> {
		tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
		TenantMetadata::storageQuota().erase(tr, tenantGroupName);
		return Void();
	};
	return runRYWTransactionVoid(cx, eraseQuota);
}
|
||||
|
||||
ACTOR static Future<Optional<int64_t>> getStorageQuotaHelper(Database cx, TenantGroupName tenantGroupName) {
|
||||
state Transaction tr(cx);
|
||||
loop {
|
||||
try {
|
||||
state Optional<int64_t> quota = wait(getStorageQuota(&tr, tenantGroupName));
|
||||
return quota;
|
||||
} catch (Error& e) {
|
||||
wait(tr.onError(e));
|
||||
}
|
||||
}
|
||||
// Reads the storage quota of tenant group `tenantGroupName`; the result is an empty Optional
// when the lookup yields no value. The read is issued with READ_SYSTEM_KEYS through runRYWTransaction.
static Future<Optional<int64_t>> getStorageQuota(Database cx, TenantGroupName tenantGroupName) {
	auto readQuota = [tenantGroupName](Reference<ReadYourWritesTransaction> tr) {
		tr->setOption(FDBTransactionOptions::READ_SYSTEM_KEYS);
		return TenantMetadata::storageQuota().get(tr, tenantGroupName);
	};
	return runRYWTransaction(cx, readQuota);
}
|
||||
|
||||
ACTOR static Future<bool> tryWrite(StorageQuotaWorkload* self,
|
||||
|
|
|
@ -0,0 +1,188 @@
|
|||
/*
|
||||
* TenantCapacityLimits.actor.cpp
|
||||
*
|
||||
* This source file is part of the FoundationDB open source project
|
||||
*
|
||||
* Copyright 2013-2022 Apple Inc. and the FoundationDB project authors
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
#include <cstdint>
|
||||
#include <limits>
|
||||
#include "fdbclient/ClusterConnectionMemoryRecord.h"
|
||||
#include "fdbclient/FDBOptions.g.h"
|
||||
#include "fdbclient/GenericManagementAPI.actor.h"
|
||||
#include "fdbclient/Metacluster.h"
|
||||
#include "fdbclient/MetaclusterManagement.actor.h"
|
||||
#include "fdbclient/ReadYourWrites.h"
|
||||
#include "fdbclient/RunRYWTransaction.actor.h"
|
||||
#include "fdbclient/Tenant.h"
|
||||
#include "fdbclient/TenantManagement.actor.h"
|
||||
#include "fdbclient/TenantSpecialKeys.actor.h"
|
||||
#include "fdbclient/ThreadSafeTransaction.h"
|
||||
#include "fdbrpc/simulator.h"
|
||||
#include "fdbserver/workloads/MetaclusterConsistency.actor.h"
|
||||
#include "fdbserver/workloads/workloads.actor.h"
|
||||
#include "fdbserver/Knobs.h"
|
||||
#include "flow/BooleanParam.h"
|
||||
#include "flow/Error.h"
|
||||
#include "flow/IRandom.h"
|
||||
#include "flow/ThreadHelper.actor.h"
|
||||
#include "flow/Trace.h"
|
||||
#include "flow/flow.h"
|
||||
#include "flow/actorcompiler.h" // This must be the last #include.
|
||||
|
||||
struct TenantCapacityLimits : TestWorkload {
|
||||
static constexpr auto NAME = "TenantCapacityLimits";
|
||||
|
||||
Reference<IDatabase> managementDb;
|
||||
Database dataDb;
|
||||
|
||||
int64_t tenantIdPrefix;
|
||||
bool useMetacluster = false;
|
||||
const Key specialKeysTenantMapPrefix = SpecialKeySpace::getModuleRange(SpecialKeySpace::MODULE::MANAGEMENT)
|
||||
.begin.withSuffix(TenantRangeImpl::submoduleRange.begin)
|
||||
.withSuffix(TenantRangeImpl::mapSubRange.begin);
|
||||
|
||||
TenantCapacityLimits(WorkloadContext const& wcx) : TestWorkload(wcx) {
|
||||
tenantIdPrefix = getOption(options,
|
||||
"tenantIdPrefix"_sr,
|
||||
deterministicRandom()->randomInt(TenantAPI::TENANT_ID_PREFIX_MIN_VALUE,
|
||||
TenantAPI::TENANT_ID_PREFIX_MAX_VALUE + 1));
|
||||
if (clientId == 0) {
|
||||
useMetacluster = deterministicRandom()->coinflip();
|
||||
}
|
||||
}
|
||||
|
||||
Future<Void> setup(Database const& cx) override {
|
||||
if (clientId == 0) {
|
||||
return _setup(cx, this);
|
||||
} else {
|
||||
return Void();
|
||||
}
|
||||
}
|
||||
ACTOR static Future<Void> _setup(Database cx, TenantCapacityLimits* self) {
|
||||
if (self->useMetacluster) {
|
||||
Reference<IDatabase> threadSafeHandle =
|
||||
wait(unsafeThreadFutureToFuture(ThreadSafeDatabase::createFromExistingDatabase(cx)));
|
||||
|
||||
MultiVersionApi::api->selectApiVersion(cx->apiVersion.version());
|
||||
self->managementDb = MultiVersionDatabase::debugCreateFromExistingDatabase(threadSafeHandle);
|
||||
|
||||
wait(success(
|
||||
MetaclusterAPI::createMetacluster(cx.getReference(), "management_cluster"_sr, self->tenantIdPrefix)));
|
||||
|
||||
DataClusterEntry entry;
|
||||
entry.capacity.numTenantGroups = 1e9;
|
||||
wait(MetaclusterAPI::registerCluster(
|
||||
self->managementDb, "test_data_cluster"_sr, g_simulator->extraDatabases[0], entry));
|
||||
|
||||
ASSERT(g_simulator->extraDatabases.size() == 1);
|
||||
self->dataDb = Database::createSimulatedExtraDatabase(g_simulator->extraDatabases[0], cx->defaultTenant);
|
||||
// wait for tenant mode change on dataDB
|
||||
wait(success(self->waitDataDbTenantModeChange()));
|
||||
} else {
|
||||
self->dataDb = cx;
|
||||
}
|
||||
|
||||
return Void();
|
||||
}
|
||||
|
||||
Future<Optional<Key>> waitDataDbTenantModeChange() const {
|
||||
return runRYWTransaction(dataDb, [](Reference<ReadYourWritesTransaction> tr) {
|
||||
tr->setOption(FDBTransactionOptions::READ_SYSTEM_KEYS);
|
||||
return tr->get("\xff"_sr); // just a meaningless read
|
||||
});
|
||||
}
|
||||
|
||||
Future<Void> start(Database const& cx) override {
|
||||
if (clientId == 0) {
|
||||
return _start(cx, this);
|
||||
} else {
|
||||
return Void();
|
||||
}
|
||||
}
|
||||
ACTOR static Future<Void> _start(Database cx, TenantCapacityLimits* self) {
|
||||
if (self->useMetacluster) {
|
||||
// Set the max tenant id for the metacluster
|
||||
state Reference<ITransaction> tr = self->managementDb->createTransaction();
|
||||
loop {
|
||||
try {
|
||||
tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
|
||||
int64_t maxTenantId = TenantAPI::getMaxAllowableTenantId(self->tenantIdPrefix << 48);
|
||||
MetaclusterAPI::ManagementClusterMetadata::tenantMetadata().lastTenantId.set(tr, maxTenantId);
|
||||
wait(safeThreadFutureToFuture(tr->commit()));
|
||||
break;
|
||||
} catch (Error& e) {
|
||||
wait(safeThreadFutureToFuture(tr->onError(e)));
|
||||
}
|
||||
}
|
||||
// Attempt to create a tenant on the metacluster which should fail since the cluster is at capacity
|
||||
try {
|
||||
TenantMapEntry entry;
|
||||
entry.tenantName = "test_tenant_metacluster"_sr;
|
||||
wait(MetaclusterAPI::createTenant(self->managementDb, entry, AssignClusterAutomatically::True));
|
||||
ASSERT(false);
|
||||
} catch (Error& e) {
|
||||
ASSERT(e.code() == error_code_cluster_no_capacity);
|
||||
}
|
||||
} else {
|
||||
// set the max tenant id for the standalone cluster
|
||||
state Reference<ReadYourWritesTransaction> dataTr = makeReference<ReadYourWritesTransaction>(self->dataDb);
|
||||
loop {
|
||||
try {
|
||||
dataTr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
|
||||
int64_t maxTenantId = TenantAPI::getMaxAllowableTenantId(0);
|
||||
TenantMetadata::lastTenantId().set(dataTr, maxTenantId);
|
||||
wait(dataTr->commit());
|
||||
break;
|
||||
} catch (Error& e) {
|
||||
wait(dataTr->onError(e));
|
||||
}
|
||||
}
|
||||
// Use the management database api to create a tenant which should fail since the cluster is at capacity
|
||||
try {
|
||||
wait(success(TenantAPI::createTenant(self->dataDb.getReference(), "test_tenant_management_api"_sr)));
|
||||
ASSERT(false);
|
||||
} catch (Error& e) {
|
||||
ASSERT(e.code() == error_code_cluster_no_capacity);
|
||||
}
|
||||
|
||||
// use special keys to create a tenant which should fail since the cluster is at capacity
|
||||
loop {
|
||||
try {
|
||||
dataTr->reset();
|
||||
dataTr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
|
||||
dataTr->setOption(FDBTransactionOptions::SPECIAL_KEY_SPACE_ENABLE_WRITES);
|
||||
TenantMapEntry entry;
|
||||
dataTr->set(self->specialKeysTenantMapPrefix.withSuffix("test_tenant_special_keys"_sr), ""_sr);
|
||||
wait(dataTr->commit());
|
||||
ASSERT(false);
|
||||
} catch (Error& e) {
|
||||
if (e.code() == error_code_cluster_no_capacity) {
|
||||
break;
|
||||
}
|
||||
wait(dataTr->onError(e));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return Void();
|
||||
}
|
||||
|
||||
Future<bool> check(Database const& cx) override { return true; }
|
||||
void getMetrics(std::vector<PerfMetric>& m) override {}
|
||||
};
|
||||
|
||||
WorkloadFactory<TenantCapacityLimits> TenantCapacityLimitsFactory;
|
|
@ -101,7 +101,11 @@ struct TenantManagementConcurrencyWorkload : TestWorkload {
|
|||
self->mvDb = MultiVersionDatabase::debugCreateFromExistingDatabase(threadSafeHandle);
|
||||
|
||||
if (self->useMetacluster && self->clientId == 0) {
|
||||
wait(success(MetaclusterAPI::createMetacluster(cx.getReference(), "management_cluster"_sr)));
|
||||
wait(success(MetaclusterAPI::createMetacluster(
|
||||
cx.getReference(),
|
||||
"management_cluster"_sr,
|
||||
deterministicRandom()->randomInt(TenantAPI::TENANT_ID_PREFIX_MIN_VALUE,
|
||||
TenantAPI::TENANT_ID_PREFIX_MAX_VALUE + 1))));
|
||||
|
||||
DataClusterEntry entry;
|
||||
entry.capacity.numTenantGroups = 1e9;
|
||||
|
|
|
@ -103,6 +103,7 @@ struct TenantManagementWorkload : TestWorkload {
|
|||
Reference<IDatabase> mvDb;
|
||||
Database dataDb;
|
||||
bool hasNoTenantKey = false; // whether this workload has non-tenant key
|
||||
int64_t tenantIdPrefix = 0;
|
||||
|
||||
// This test exercises multiple different ways to work with tenants
|
||||
enum class OperationType {
|
||||
|
@ -236,7 +237,8 @@ struct TenantManagementWorkload : TestWorkload {
|
|||
if (self->useMetacluster) {
|
||||
fmt::print("Create metacluster and register data cluster ... \n");
|
||||
// Configure the metacluster (this changes the tenant mode)
|
||||
wait(success(MetaclusterAPI::createMetacluster(cx.getReference(), "management_cluster"_sr)));
|
||||
wait(success(
|
||||
MetaclusterAPI::createMetacluster(cx.getReference(), "management_cluster"_sr, self->tenantIdPrefix)));
|
||||
|
||||
DataClusterEntry entry;
|
||||
entry.capacity.numTenantGroups = 1e9;
|
||||
|
@ -549,6 +551,7 @@ struct TenantManagementWorkload : TestWorkload {
|
|||
|
||||
ASSERT(entry.present());
|
||||
ASSERT(entry.get().id > self->maxId);
|
||||
ASSERT(TenantAPI::getTenantIdPrefix(entry.get().id) == self->tenantIdPrefix);
|
||||
ASSERT(entry.get().tenantGroup == tenantItr->second.tenantGroup);
|
||||
ASSERT(entry.get().tenantState == TenantState::READY);
|
||||
|
||||
|
@ -561,6 +564,7 @@ struct TenantManagementWorkload : TestWorkload {
|
|||
wait(TenantAPI::tryGetTenant(self->dataDb.getReference(), tenantItr->first));
|
||||
ASSERT(dataEntry.present());
|
||||
ASSERT(dataEntry.get().id == entry.get().id);
|
||||
ASSERT(TenantAPI::getTenantIdPrefix(dataEntry.get().id) == self->tenantIdPrefix);
|
||||
ASSERT(dataEntry.get().tenantGroup == entry.get().tenantGroup);
|
||||
ASSERT(dataEntry.get().tenantState == TenantState::READY);
|
||||
}
|
||||
|
|
|
@ -163,4 +163,45 @@ struct NetworkAddressList {
|
|||
}
|
||||
};
|
||||
|
||||
extern std::string formatIpPort(const IPAddress& ip, uint16_t port);
|
||||
|
||||
struct AddressExclusion {
|
||||
IPAddress ip;
|
||||
int port;
|
||||
|
||||
AddressExclusion() : ip(0), port(0) {}
|
||||
explicit AddressExclusion(const IPAddress& ip) : ip(ip), port(0) {}
|
||||
explicit AddressExclusion(const IPAddress& ip, int port) : ip(ip), port(port) {}
|
||||
|
||||
bool operator<(AddressExclusion const& r) const {
|
||||
if (ip != r.ip)
|
||||
return ip < r.ip;
|
||||
return port < r.port;
|
||||
}
|
||||
bool operator==(AddressExclusion const& r) const { return ip == r.ip && port == r.port; }
|
||||
|
||||
bool isWholeMachine() const { return port == 0; }
|
||||
bool isValid() const { return ip.isValid() || port != 0; }
|
||||
|
||||
bool excludes(NetworkAddress const& addr) const {
|
||||
if (isWholeMachine())
|
||||
return ip == addr.ip;
|
||||
return ip == addr.ip && port == addr.port;
|
||||
}
|
||||
|
||||
// This is for debugging and IS NOT to be used for serialization to persistant state
|
||||
std::string toString() const {
|
||||
if (!isWholeMachine())
|
||||
return formatIpPort(ip, port);
|
||||
return ip.toString();
|
||||
}
|
||||
|
||||
static AddressExclusion parse(StringRef const&);
|
||||
|
||||
template <class Ar>
|
||||
void serialize(Ar& ar) {
|
||||
serializer(ar, ip, port);
|
||||
}
|
||||
};
|
||||
|
||||
#endif // FLOW_NETWORKADDRESS_H
|
||||
|
|
|
@ -417,6 +417,7 @@ if(WITH_PYTHON)
|
|||
add_fdb_test(TEST_FILES slow/SwizzledRollbackTimeLapseIncrement.toml)
|
||||
add_fdb_test(TEST_FILES slow/SwizzledTenantManagement.toml)
|
||||
add_fdb_test(TEST_FILES slow/SwizzledTenantManagementMetacluster.toml)
|
||||
add_fdb_test(TEST_FILES slow/TenantCapacityLimits.toml)
|
||||
add_fdb_test(TEST_FILES slow/TenantManagement.toml)
|
||||
add_fdb_test(TEST_FILES slow/TenantManagementConcurrency.toml)
|
||||
add_fdb_test(TEST_FILES slow/VersionStampBackupToDB.toml)
|
||||
|
|
|
@ -4,8 +4,7 @@ maxTLogVersion=6
|
|||
disableHostname=true
|
||||
disableEncryption=true
|
||||
storageEngineExcludeTypes=[3, 4]
|
||||
allowDefaultTenant=false
|
||||
allowCreatingTenants=false
|
||||
tenantModes=['disabled']
|
||||
|
||||
[[knobs]]
|
||||
# This can be removed once the lower bound of this downgrade test is a version that understands the new protocol
|
||||
|
|
|
@ -4,8 +4,7 @@ maxTLogVersion = 6
|
|||
disableTss = true
|
||||
disableHostname = true
|
||||
disableEncryption = true
|
||||
allowDefaultTenant = false
|
||||
allowCreatingTenants = false
|
||||
tenantModes=['disabled']
|
||||
|
||||
[[knobs]]
|
||||
# This can be removed once the lower bound of this downgrade test is a version that understands the new protocol
|
||||
|
|
|
@ -4,8 +4,7 @@ maxTLogVersion=6
|
|||
disableHostname=true
|
||||
disableEncryption=true
|
||||
storageEngineExcludeTypes=[4]
|
||||
allowDefaultTenant=false
|
||||
allowCreatingTenants=false
|
||||
tenantModes=['disabled']
|
||||
|
||||
[[knobs]]
|
||||
# This can be removed once the lower bound of this downgrade test is a version that understands the new protocol
|
||||
|
|
|
@ -3,6 +3,7 @@ maxTLogVersion = 6
|
|||
disableTss = true
|
||||
disableHostname = true
|
||||
disableEncryption = true
|
||||
tenantModes=['disabled']
|
||||
|
||||
[[knobs]]
|
||||
# This can be removed once the lower bound of this downgrade test is a version that understands the new protocol
|
||||
|
|
|
@ -3,8 +3,7 @@ extraMachineCountDC = 2
|
|||
maxTLogVersion=6
|
||||
disableHostname=true
|
||||
storageEngineExcludeTypes=[4]
|
||||
allowDefaultTenant=false
|
||||
allowCreatingTenants=false
|
||||
tenantModes=['disabled']
|
||||
|
||||
[[knobs]]
|
||||
# This can be removed once the lower bound of this downgrade test is a version that understands the new protocol
|
||||
|
|
|
@ -12,6 +12,7 @@ encryptModes = ['domain_aware', 'cluster_aware']
|
|||
bg_metadata_source = "tenant"
|
||||
bg_key_tuple_truncate_offset = 1
|
||||
enable_encryption = true
|
||||
enable_configurable_encryption = true
|
||||
|
||||
[[test]]
|
||||
testTitle = 'BlobGranuleCorrectness'
|
||||
|
|
|
@ -9,6 +9,7 @@ encryptModes = ['domain_aware', 'cluster_aware']
|
|||
[[knobs]]
|
||||
bg_metadata_source = "tenant"
|
||||
enable_encryption = true
|
||||
enable_configurable_encryption = true
|
||||
|
||||
[[test]]
|
||||
testTitle = 'BlobGranuleCorrectness'
|
||||
|
|
|
@ -0,0 +1,14 @@
|
|||
[configuration]
|
||||
allowDefaultTenant = false
|
||||
tenantModes = ['optional', 'required']
|
||||
allowCreatingTenants = false
|
||||
extraDatabaseMode = 'Single'
|
||||
|
||||
[[test]]
|
||||
testTitle = 'TenantCapacityLimitTest'
|
||||
clearAfterTest = true
|
||||
timeout = 2100
|
||||
runSetup = true
|
||||
|
||||
[[test.workload]]
|
||||
testName = 'TenantCapacityLimits'
|
Loading…
Reference in New Issue