Merge branch 'main' of github.com:apple/foundationdb into sfc-gh-dadkins/tlog-queue-metrics

This commit is contained in:
Dan Adkins 2023-02-28 09:26:03 -08:00
commit 035314b277
39 changed files with 1544 additions and 295 deletions

View File

@ -504,6 +504,57 @@ An |database-blurb1| Modifications to a database are performed via transactions.
Returns a value where 0 indicates that the client is idle and 1 (or larger) indicates that the client is saturated. By default, this value is updated every second.
.. function:: FDBFuture* fdb_database_get_client_status(FDBDatabase* db)
Returns a JSON string containing database client-side status information. At the top level, the report describes the status of the
Multi-Version Client database: its initialization state, the protocol version, and the available client versions. The report schema is:
.. code-block::
{ "Healthy": <overall health status, true or false>,
"InitializationState": <initializing|initialization_failed|created|incompatible|closed>,
"InitializationError": <initialization error code, present if initialization failed>,
"ProtocolVersion" : <determined protocol version of the cluster, present if determined>,
"ConnectionRecord" : <connection file name or connection string>,
"DatabaseStatus" : <Native Database status report, present if successfully retrieved>,
"ErrorRetrievingDatabaseStatus" : <error code of retrieving status of the Native Database, present if failed>,
"AvailableClients" : [
{ "ProtocolVersion" : <protocol version of the client>,
"ReleaseVersion" : <release version of the client>,
"ThreadIndex" : <the index of the client thread serving this database>
}, ...
]
}
The status of the actual version-specific database is embedded within the ``DatabaseStatus`` attribute. It lists the addresses of various FDB
server roles the client is aware of and their connection status. The schema of the ``DatabaseStatus`` object is:
.. code-block::
{ "Healthy" : <overall health status: true or false>,
"ClusterID" : <UUID>,
"Coordinators" : [ <address>, ... ],
"CurrentCoordinator" : <address>
"GrvProxies" : [ <address>, ... ],
"CommitProxies" : [ <address>, ... ],
"StorageServers" : [ { "Address" : <address>, "SSID" : <Storage Server ID> }, ... ],
"Connections" : [
{ "Address" : <address>,
"Status" : <failed|connected|connecting|disconnected>,
"Compatible" : <is protocol version compatible with the client>,
"ConnectFailedCount" : <number of failed connection attempts>,
"LastConnectTime" : <elapsed time in seconds since the last connection attempt>,
"PingCount" : <total ping count>,
"PingTimeoutCount" : <number of ping timeouts>,
"BytesSampleTime" : <elapsed time of the reported the bytes received and sent values>,
"BytesReceived" : <bytes received>,
"BytesSent" : <bytes sent>,
"ProtocolVersion" : <protocol version of the server, missing if unknown>
},
...
]
}
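A minimal sketch of fetching and printing this report through the C API (illustrative only: it assumes the network thread is already running and that the JSON bytes are read with ``fdb_future_get_key``, as for other string-valued futures; check ``fdb_c.h`` for your API version):
.. code-block:: cpp

    #include <cstdio>
    #include <foundationdb/fdb_c.h> // assumes FDB_API_VERSION was defined before this include

    void printClientStatus(FDBDatabase* db) {
        FDBFuture* f = fdb_database_get_client_status(db);
        fdb_error_t err = fdb_future_block_until_ready(f);
        if (!err)
            err = fdb_future_get_error(f);
        if (!err) {
            uint8_t const* json;
            int len;
            err = fdb_future_get_key(f, &json, &len); // the JSON report bytes
            if (!err)
                printf("%.*s\n", len, (const char*)json);
        }
        fdb_future_destroy(f);
    }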
Tenant
======

View File

@ -183,6 +183,23 @@ The ``defaulttenant`` command configures ``fdbcli`` to run its commands without
The active tenant cannot be changed while a transaction (using ``begin``) is open.
datadistribution
----------------
The ``datadistribution`` command is used to enable or disable features of the data distributor.
Its syntax is:
- ``datadistribution <on|off>``. Fully enable or disable the data distributor.
- ``datadistribution <enable|disable> <ssfailure|rebalance|rebalance_disk|rebalance_read>``. Enable or disable individual data distribution features, as in the example after this list.
ssfailure
Whether a storage server failure triggers data movement to repair replicas.
rebalance_disk
If enabled, the data distributor moves data so that every storage server uses a similar amount of disk space.
rebalance_read
If enabled, the data distributor moves data to balance read bandwidth among storage servers. This feature requires ``knob_read_sampling_enabled=true``.
rebalance
Controls both rebalance_disk and rebalance_read.
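For example, ``datadistribution disable rebalance_read`` turns off read-aware rebalancing while leaving disk rebalancing in effect, and ``datadistribution enable ssfailure`` re-enables failure-triggered replica repair.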
exclude
-------
@ -428,7 +445,7 @@ The available process classes are ``unset``, ``storage``, ``transaction``, ``res
setknob
-------
The ``setknob`` command can be used to set knobs dynamically. Its syntax is ``setknob <KNOBNAME> <KNOBVALUE> [CONFIGCLASS]``. If not present in a ``begin/commit`` block, the CLI will prompt for a description of the change.
Note that :ref:`characters can be escaped <cli-escaping>` when specifying keys (or values) in ``fdbcli``.
@ -480,7 +497,7 @@ create
Creates a new tenant in the cluster.
``NAME`` - The desired name of the tenant. The name can be any byte string that does not begin with the ``\xff`` byte.
``TENANT_GROUP`` - The tenant group the tenant will be placed in.

View File

@ -9,6 +9,7 @@ Release Notes
Features
--------
- Added the read-aware data distribution feature, which balances read bandwidth among storage servers.
Performance
-----------

View File

@ -461,28 +461,38 @@ ACTOR Future<bool> tenantGetCommand(Reference<IDatabase> db, std::vector<StringR
int64_t id;
std::string prefix;
std::string tenantState;
std::string tenantLockState;
std::string lockId;
std::string tenantGroup;
std::string assignedCluster;
std::string error;
doc.get("id", id);
doc.get("prefix.printable", prefix);
doc.get("lock_state", tenantLockState);
bool hasTenantState = doc.tryGet("tenant_state", tenantState);
bool hasLockId = doc.tryGet("lock_id", lockId);
bool hasTenantGroup = doc.tryGet("tenant_group.printable", tenantGroup);
bool hasAssignedCluster = doc.tryGet("assigned_cluster.printable", assignedCluster);
bool hasError = doc.tryGet("error", error);
fmt::print(" id: {}\n", id);
fmt::print(" prefix: {}\n", printable(prefix).c_str());
fmt::print(" prefix: {}\n", printable(prefix));
if (hasTenantState) {
fmt::print(" tenant state: {}\n", printable(tenantState).c_str());
fmt::print(" tenant state: {}\n", printable(tenantState));
}
fmt::print(" lock state: {}\n", tenantLockState);
if (hasLockId) {
fmt::print(" lock id: {}\n", lockId);
}
if (hasTenantGroup) {
fmt::print(" tenant group: {}\n", tenantGroup.c_str());
fmt::print(" tenant group: {}\n", tenantGroup);
}
if (hasAssignedCluster) {
fmt::print(" assigned cluster: {}\n", printable(assignedCluster).c_str());
fmt::print(" assigned cluster: {}\n", printable(assignedCluster));
}
if (hasError) {
fmt::print(" error: {}\n", error);
@ -664,6 +674,101 @@ ACTOR Future<bool> tenantRenameCommand(Reference<IDatabase> db, std::vector<Stri
return true;
}
ACTOR Future<bool> tenantLockCommand(Reference<IDatabase> db, std::vector<StringRef> tokens) {
state UID uid;
state Reference<ITransaction> tr;
state StringRef name;
state Key nameKey;
state TenantAPI::TenantLockState desiredLockState;
state int uidIdx;
if (tokens[1] == "lock"_sr && (tokens.size() < 3 || tokens.size() > 5)) {
fmt::print("Usage: tenant lock <NAME> [w|rw] [UID]\n\n");
fmt::print("Locks a tenant for read-write or read-only with a given UID.\n");
fmt::print("By default a read-write lock is created.\n");
fmt::print("If no UID is passed, fdbcli will generate one.\n");
fmt::print("UID has to be a 16-byte number represented in hex.\n");
return false;
} else if (tokens[1] == "unlock"_sr && tokens.size() != 4) {
fmt::print("Usage: tenant unlock <NAME> <UID>\n\n");
return false;
}
name = tokens[2];
nameKey = tenantMapSpecialKeyRange.begin.withSuffix(name);
if (tokens[1] == "unlock"_sr) {
uidIdx = 3;
desiredLockState = TenantAPI::TenantLockState::UNLOCKED;
} else {
uidIdx = 4;
if (tokens.size() > 3) {
if (tokens[3] == "w"_sr) {
desiredLockState = TenantAPI::TenantLockState::READ_ONLY;
} else if (tokens[3] == "rw"_sr) {
desiredLockState = TenantAPI::TenantLockState::LOCKED;
} else {
fmt::print(stderr, "ERROR: Invalid lock type `{}'\n", tokens[3]);
return false;
}
} else {
desiredLockState = TenantAPI::TenantLockState::LOCKED;
}
}
if (tokens.size() > uidIdx) {
try {
auto uidStr = tokens[uidIdx].toString();
if (uidStr.size() < 32) {
// UID::fromString expects the string to be exactly 32 characters long, but the uid might be shorter
// if the most significant byte[s] are 0. So we need to pad
uidStr.insert(0, 32 - uidStr.size(), '0');
}
uid = UID::fromStringThrowsOnFailure(uidStr);
} catch (Error& e) {
ASSERT(e.code() == error_code_operation_failed);
fmt::print(stderr, "ERROR: Couldn't not parse `{}' as a valid UID", tokens[uidIdx].toString());
return false;
}
} else {
ASSERT(desiredLockState != TenantAPI::TenantLockState::UNLOCKED);
uid = deterministicRandom()->randomUniqueID();
}
tr = db->createTransaction();
loop {
try {
tr->setOption(FDBTransactionOptions::READ_SYSTEM_KEYS);
ClusterType clusterType = wait(TenantAPI::getClusterType(tr));
if (clusterType == ClusterType::METACLUSTER_MANAGEMENT) {
wait(MetaclusterAPI::changeTenantLockState(db, name, desiredLockState, uid));
} else {
tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
auto f = tr->get(nameKey);
Optional<Value> entry = wait(safeThreadFutureToFuture(f));
if (!entry.present()) {
fmt::print(stderr, "ERROR: Tenant `{}' does not exist\n", name);
return false;
}
auto tenantId = getTenantId(entry.get());
wait(TenantAPI::changeLockState(tr.getPtr(), tenantId, desiredLockState, uid));
wait(safeThreadFutureToFuture(tr->commit()));
}
if (desiredLockState != TenantAPI::TenantLockState::UNLOCKED) {
fmt::print("Locked tenant `{}' with UID `{}'\n", name.toString(), uid.toString());
} else {
fmt::print("Unlocked tenant `{}'\n", name.toString());
}
return true;
} catch (Error& e) {
if (e.code() == error_code_tenant_locked) {
if (desiredLockState == TenantAPI::TenantLockState::UNLOCKED) {
fmt::print(stderr, "ERROR: Wrong lock UID\n");
} else {
fmt::print(stderr, "ERROR: Tenant locked with a different UID\n");
}
return false;
}
wait(safeThreadFutureToFuture(tr->onError(e)));
}
}
}
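The zero-padding rule above is easy to get wrong, so here is the same normalization as a self-contained sketch (plain C++, no flow dependencies; the helper name is hypothetical):
#include <string>
// UID::fromString expects exactly 32 hex characters; shorter strings, whose
// most significant zero bytes were dropped, are left-padded back to full width.
std::string padUidString(std::string s) {
    if (s.size() < 32)
        s.insert(0, 32 - s.size(), '0');
    return s; // e.g. padUidString("abc") yields 29 '0' characters followed by "abc"
}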
// tenant command
Future<bool> tenantCommand(Reference<IDatabase> db, std::vector<StringRef> tokens) {
if (tokens.size() == 1) {
@ -683,6 +788,10 @@ Future<bool> tenantCommand(Reference<IDatabase> db, std::vector<StringRef> token
return tenantConfigureCommand(db, tokens);
} else if (tokencmp(tokens[1], "rename")) {
return tenantRenameCommand(db, tokens);
} else if (tokencmp(tokens[1], "lock")) {
return tenantLockCommand(db, tokens);
} else if (tokencmp(tokens[1], "unlock")) {
return tenantLockCommand(db, tokens);
} else {
printUsage(tokens[0]);
return true;
@ -699,14 +808,15 @@ Future<bool> tenantCommandForwarder(Reference<IDatabase> db, std::vector<StringR
}
return tenantCommand(db, forwardedTokens);
} // namespace fdb_cli
}
void tenantGenerator(const char* text,
const char* line,
std::vector<std::string>& lc,
std::vector<StringRef> const& tokens) {
if (tokens.size() == 1) {
const char* opts[] = { "create", "delete", "deleteId", "list", "get", "configure", "rename", nullptr };
const char* opts[] = { "create", "delete", "deleteId", "list", "get",
"configure", "rename", "lock", "unlock", nullptr };
arrayGenerator(text, line, opts, lc);
} else if (tokens.size() == 3 && tokencmp(tokens[1], "create")) {
const char* opts[] = { "tenant_group=", nullptr };
@ -725,6 +835,11 @@ void tenantGenerator(const char* text,
const char* opts[] = { "ignore_capacity_limit", nullptr };
arrayGenerator(text, line, opts, lc);
}
} else if (tokencmp(tokens[1], "lock")) {
if (tokens.size() == 3) {
const char* opts[] = { "w", "rw", nullptr };
arrayGenerator(text, line, opts, lc);
}
}
}
@ -769,6 +884,12 @@ std::vector<const char*> tenantHintGenerator(std::vector<StringRef> const& token
} else if (tokencmp(tokens[1], "rename") && tokens.size() < 4) {
static std::vector<const char*> opts = { "<OLD_NAME>", "<NEW_NAME>" };
return std::vector<const char*>(opts.begin() + tokens.size() - 2, opts.end());
} else if (tokencmp(tokens[1], "lock") && tokens.size() < 5) {
static std::vector<const char*> opts = { "<NAME>", "[w|rw]", "[UID]" };
return std::vector<const char*>(opts.begin() + tokens.size() - 2, opts.end());
} else if (tokencmp(tokens[1], "unlock") && tokens.size() < 4) {
static std::vector<const char*> opts = { "<NAME>", "<UID>" };
return std::vector<const char*>(opts.begin() + tokens.size() - 2, opts.end());
} else {
return {};
}
@ -781,7 +902,9 @@ CommandFactory tenantRegisterFactory("tenant",
"`list' prints a list of tenants in the cluster.\n"
"`get' prints the metadata for a particular tenant.\n"
"`configure' modifies the configuration for a tenant.\n"
"`rename' changes the name of a tenant.\n"),
"`rename' changes the name of a tenant.\n"
"`lock` locks a tenant.\n"
"`unlock` unlocks a tenant.\n"),
&tenantGenerator,
&tenantHintGenerator);

View File

@ -1009,15 +1009,74 @@ def tenant_list(logger):
assert output == "ERROR: unrecognized tenant state(s) `14z'."
@enable_logging()
def tenant_lock(logger):
logger.debug("Create tenant")
setup_tenants(["tenant"])
logger.debug("Write test key")
run_fdbcli_command("usetenant tenant; writemode on; set foo bar")
logger.debug("Lock tenant in read-only mode")
output = run_fdbcli_command("tenant lock tenant w")
output = output.strip()
logger.debug("output: {}".format(output))
start_string = "Locked tenant `tenant' with UID `"
assert output.startswith(start_string)
assert output.endswith("'")
uid_str = output[len(start_string) : -1]
assert len(uid_str) == 32
logger.debug("Verify tenant is readable")
output = run_fdbcli_command("usetenant tenant; get foo").strip()
logger.debug("output: {}".format(output))
lines = output.split("\n")
assert lines[-1] == "`foo' is `bar'"
logger.debug("Verify tenant is NOT writeable")
output = run_fdbcli_command_and_get_error(
"usetenant tenant; writemode on; set foo bar2"
).strip()
logger.debug("output: {}".format(output))
assert output == "ERROR: Tenant is locked (2144)"
logger.debug('Unlock tenant with UID "{}"'.format(uid_str))
output = run_fdbcli_command("tenant unlock tenant {}".format(uid_str))
logger.debug("output: {}".format(output.strip()))
assert output.strip() == "Unlocked tenant `tenant'"
logger.debug("Lock tenant in rw mode")
output = run_fdbcli_command("tenant lock tenant rw {}".format(uid_str)).strip()
logger.debug("output: {}".format(output))
assert output == "Locked tenant `tenant' with UID `{}'".format(uid_str)
logger.debug("Verify tenant is NOT readable")
output = run_fdbcli_command_and_get_error("usetenant tenant; get foo").strip()
logger.debug("output: {}".format(output))
assert output == "ERROR: Tenant is locked (2144)"
logger.debug("Verify tenant is NOT writeable")
output = run_fdbcli_command_and_get_error(
"usetenant tenant; writemode on; set foo bar2"
).strip()
logger.debug("output: {}".format(output))
assert output == "ERROR: Tenant is locked (2144)"
logger.debug("Unlock tenant")
output = run_fdbcli_command("tenant unlock tenant {}".format(uid_str))
logger.debug("output: {}".format(output.strip()))
assert output.strip() == "Unlocked tenant `tenant'"
@enable_logging()
def tenant_get(logger):
setup_tenants(["tenant", "tenant2 tenant_group=tenant_group2"])
output = run_fdbcli_command("tenant get tenant")
lines = output.split("\n")
assert len(lines) == 2
assert len(lines) == 3
assert lines[0].strip().startswith("id: ")
assert lines[1].strip().startswith("prefix: ")
assert lines[2].strip() == "lock state: unlocked"
output = run_fdbcli_command("tenant get tenant JSON")
json_output = json.loads(output, strict=False)
@ -1034,13 +1093,16 @@ def tenant_get(logger):
assert len(json_output["tenant"]["prefix"]) == 2
assert "base64" in json_output["tenant"]["prefix"]
assert "printable" in json_output["tenant"]["prefix"]
assert "lock_state" in json_output["tenant"]
assert json_output["tenant"]["lock_state"] == "unlocked"
output = run_fdbcli_command("tenant get tenant2")
lines = output.split("\n")
assert len(lines) == 3
assert len(lines) == 4
assert lines[0].strip().startswith("id: ")
assert lines[1].strip().startswith("prefix: ")
assert lines[2].strip() == "tenant group: tenant_group2"
assert lines[2].strip() == "lock state: unlocked"
assert lines[3].strip() == "tenant group: tenant_group2"
output = run_fdbcli_command("tenant get tenant2 JSON")
json_output = json.loads(output, strict=False)
@ -1054,6 +1116,8 @@ def tenant_get(logger):
assert "base64" in json_output["tenant"]["name"]
assert "printable" in json_output["tenant"]["name"]
assert "prefix" in json_output["tenant"]
assert "lock_state" in json_output["tenant"]
assert json_output["tenant"]["lock_state"] == "unlocked"
assert "tenant_group" in json_output["tenant"]
assert len(json_output["tenant"]["tenant_group"]) == 2
assert "base64" in json_output["tenant"]["tenant_group"]
@ -1074,8 +1138,8 @@ def tenant_configure(logger):
output = run_fdbcli_command("tenant get tenant")
lines = output.split("\n")
assert len(lines) == 3
assert lines[2].strip() == "tenant group: tenant_group1"
assert len(lines) == 4
assert lines[3].strip() == "tenant group: tenant_group1"
output = run_fdbcli_command("tenant configure tenant unset tenant_group")
assert output == "The configuration for tenant `tenant' has been updated"
@ -1087,7 +1151,7 @@ def tenant_configure(logger):
output = run_fdbcli_command("tenant get tenant")
lines = output.split("\n")
assert len(lines) == 2
assert len(lines) == 3
output = run_fdbcli_command_and_get_error(
"tenant configure tenant tenant_group=tenant_group1 tenant_group=tenant_group2"
@ -1330,6 +1394,7 @@ def tenants():
run_tenant_test(tenant_old_commands)
run_tenant_test(tenant_group_list)
run_tenant_test(tenant_group_get)
run_tenant_test(tenant_lock)
@enable_logging()

View File

@ -19,6 +19,8 @@
*/
#include "fdbclient/BackupAgent.actor.h"
#include "fdbclient/BackupContainer.h"
#include "flow/BooleanParam.h"
#ifdef BUILD_AZURE_BACKUP
#include "fdbclient/BackupContainerAzureBlobStore.h"
#endif
@ -34,6 +36,8 @@
#include "flow/actorcompiler.h" // This must be the last #include.
FDB_DEFINE_BOOLEAN_PARAM(IncludeKeyRangeMap);
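For context, FDB_DECLARE_BOOLEAN_PARAM/FDB_DEFINE_BOOLEAN_PARAM generate a strongly typed bool wrapper so call sites must name the flag (IncludeKeyRangeMap::True) instead of passing a bare true. A simplified sketch of the idiom (not the macro's actual expansion):
// Hypothetical stand-in for the generated type; the real macro additionally
// provides the IncludeKeyRangeMap::True / IncludeKeyRangeMap::False constants.
class IncludeKeyRangeMapParam {
    bool value;
public:
    constexpr explicit IncludeKeyRangeMapParam(bool v) : value(v) {}
    constexpr operator bool() const { return value; }
};
// Call sites then read:
//   writeKeyspaceSnapshotFile(files, keys, totalBytes, IncludeKeyRangeMapParam(true));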
class BackupContainerFileSystemImpl {
public:
// TODO: Do this more efficiently, as the range file list for a snapshot could potentially be hundreds of
@ -159,7 +163,8 @@ public:
ACTOR static Future<Void> writeKeyspaceSnapshotFile(Reference<BackupContainerFileSystem> bc,
std::vector<std::string> fileNames,
std::vector<std::pair<Key, Key>> beginEndKeys,
int64_t totalBytes) {
int64_t totalBytes,
IncludeKeyRangeMap includeKeyRangeMap) {
ASSERT(!fileNames.empty() && fileNames.size() == beginEndKeys.size());
state Version minVer = std::numeric_limits<Version>::max();
@ -188,11 +193,13 @@ public:
doc.create("beginVersion") = minVer;
doc.create("endVersion") = maxVer;
auto ranges = doc.subDoc("keyRanges");
for (int i = 0; i < beginEndKeys.size(); i++) {
auto fileDoc = ranges.subDoc(fileNames[i], /*split=*/false);
fileDoc.create("beginKey") = beginEndKeys[i].first.toString();
fileDoc.create("endKey") = beginEndKeys[i].second.toString();
if (includeKeyRangeMap) {
auto ranges = doc.subDoc("keyRanges");
for (int i = 0; i < beginEndKeys.size(); i++) {
auto fileDoc = ranges.subDoc(fileNames[i], /*split=*/false);
fileDoc.create("beginKey") = beginEndKeys[i].first.toString();
fileDoc.create("endKey") = beginEndKeys[i].second.toString();
}
}
wait(yield());
@ -941,13 +948,9 @@ public:
std::pair<std::vector<RangeFile>, std::map<std::string, KeyRange>> results =
wait(bc->readKeyspaceSnapshot(snapshots[i]));
// Old backup does not have metadata about key ranges and can not be filtered with key ranges.
if (keyRangesFilter.size() && results.second.empty() && !results.first.empty()) {
throw backup_not_filterable_with_key_ranges();
}
// Filter by keyRangesFilter.
if (keyRangesFilter.empty()) {
// If there is no key ranges filter for the restore OR if the snapshot contains no per-file key range info
// then return all of the range files
if (keyRangesFilter.empty() || results.second.empty()) {
restorable.ranges = std::move(results.first);
restorable.keyRanges = std::move(results.second);
minKeyRangeVersion = snapshots[i].beginVersion;
@ -1216,9 +1219,10 @@ BackupContainerFileSystem::readKeyspaceSnapshot(KeyspaceSnapshotFile snapshot) {
Future<Void> BackupContainerFileSystem::writeKeyspaceSnapshotFile(const std::vector<std::string>& fileNames,
const std::vector<std::pair<Key, Key>>& beginEndKeys,
int64_t totalBytes) {
int64_t totalBytes,
IncludeKeyRangeMap includeKeyRangeMap) {
return BackupContainerFileSystemImpl::writeKeyspaceSnapshotFile(
Reference<BackupContainerFileSystem>::addRef(this), fileNames, beginEndKeys, totalBytes);
Reference<BackupContainerFileSystem>::addRef(this), fileNames, beginEndKeys, totalBytes, includeKeyRangeMap);
};
Future<std::vector<LogFile>> BackupContainerFileSystem::listLogFiles(Version beginVersion,
@ -1738,8 +1742,10 @@ ACTOR Future<Void> testBackupContainer(std::string url,
wait(testWriteSnapshotFile(range, begin, end, blockSize));
if (deterministicRandom()->random01() < .2) {
writes.push_back(c->writeKeyspaceSnapshotFile(
snapshots.rbegin()->second, snapshotBeginEndKeys.rbegin()->second, snapshotSizes.rbegin()->second));
writes.push_back(c->writeKeyspaceSnapshotFile(snapshots.rbegin()->second,
snapshotBeginEndKeys.rbegin()->second,
snapshotSizes.rbegin()->second,
IncludeKeyRangeMap(BUGGIFY)));
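// Note: BUGGIFY flips randomly in simulation, so this test exercises both manifest
// formats (with and without the embedded key range map).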
snapshots[v] = {};
snapshotBeginEndKeys[v] = {};
snapshotSizes[v] = 0;

View File

@ -3148,6 +3148,7 @@ struct BackupSnapshotManifest : BackupTaskFuncBase {
Reference<Task> task) {
state BackupConfig config(task);
state Reference<IBackupContainer> bc;
state DatabaseConfiguration dbConfig;
state Reference<ReadYourWritesTransaction> tr(new ReadYourWritesTransaction(cx));
@ -3163,6 +3164,7 @@ struct BackupSnapshotManifest : BackupTaskFuncBase {
tr->setOption(FDBTransactionOptions::LOCK_AWARE);
wait(taskBucket->keepRunning(tr, task));
wait(store(dbConfig, getDatabaseConfiguration(cx)));
if (!bc) {
// Backup container must be present if we're still here
@ -3186,8 +3188,8 @@ struct BackupSnapshotManifest : BackupTaskFuncBase {
}
}
std::vector<std::string> files;
std::vector<std::pair<Key, Key>> beginEndKeys;
state std::vector<std::string> files;
state std::vector<std::pair<Key, Key>> beginEndKeys;
state Version maxVer = 0;
state Version minVer = std::numeric_limits<Version>::max();
state int64_t totalBytes = 0;
@ -3228,7 +3230,14 @@ struct BackupSnapshotManifest : BackupTaskFuncBase {
}
Params.endVersion().set(task, maxVer);
wait(bc->writeKeyspaceSnapshotFile(files, beginEndKeys, totalBytes));
// Omit the per-file key range map (used by key-range-filtered restores) from the
// 'manifest' file when encryption-at-rest is enabled
wait(bc->writeKeyspaceSnapshotFile(files,
beginEndKeys,
totalBytes,
dbConfig.encryptionAtRestMode.isEncryptionEnabled()
? IncludeKeyRangeMap::False
: IncludeKeyRangeMap::True));
TraceEvent(SevInfo, "FileBackupWroteSnapshotManifest")
.detail("BackupUID", config.getUid())

View File

@ -152,6 +152,7 @@ TenantMapEntry MetaclusterTenantMapEntry::toTenantMapEntry() const {
TenantMapEntry entry;
entry.tenantName = tenantName;
entry.tenantLockState = tenantLockState;
entry.tenantLockId = tenantLockId;
entry.tenantGroup = tenantGroup;
entry.configurationSequenceNum = configurationSequenceNum;
if (id >= 0) {
@ -165,6 +166,7 @@ MetaclusterTenantMapEntry MetaclusterTenantMapEntry::fromTenantMapEntry(TenantMa
MetaclusterTenantMapEntry entry;
entry.tenantName = source.tenantName;
entry.tenantLockState = source.tenantLockState;
entry.tenantLockId = source.tenantLockId;
entry.tenantGroup = source.tenantGroup;
entry.configurationSequenceNum = source.configurationSequenceNum;
if (source.id >= 0) {
@ -195,6 +197,10 @@ std::string MetaclusterTenantMapEntry::toJson() const {
}
tenantEntry["lock_state"] = TenantAPI::tenantLockStateToString(tenantLockState);
if (tenantLockId.present()) {
tenantEntry["lock_id"] = tenantLockId.get().toString();
}
if (tenantState == MetaclusterAPI::TenantState::RENAMING) {
ASSERT(renameDestination.present());
tenantEntry["rename_destination"] = binaryToJson(renameDestination.get());
@ -206,11 +212,13 @@ std::string MetaclusterTenantMapEntry::toJson() const {
}
bool MetaclusterTenantMapEntry::matchesConfiguration(MetaclusterTenantMapEntry const& other) const {
return tenantGroup == other.tenantGroup;
return tenantGroup == other.tenantGroup && tenantLockState == other.tenantLockState &&
tenantLockId == other.tenantLockId;
}
bool MetaclusterTenantMapEntry::matchesConfiguration(TenantMapEntry const& other) const {
return tenantGroup == other.tenantGroup;
return tenantGroup == other.tenantGroup && tenantLockState == other.tenantLockState &&
tenantLockId == other.tenantLockId;
}
void MetaclusterTenantMapEntry::configure(Standalone<StringRef> parameter, Optional<Value> value) {
@ -226,9 +234,10 @@ void MetaclusterTenantMapEntry::configure(Standalone<StringRef> parameter, Optio
bool MetaclusterTenantMapEntry::operator==(MetaclusterTenantMapEntry const& other) const {
return id == other.id && tenantName == other.tenantName && tenantState == other.tenantState &&
tenantLockState == other.tenantLockState && tenantGroup == other.tenantGroup &&
assignedCluster == other.assignedCluster && configurationSequenceNum == other.configurationSequenceNum &&
renameDestination == other.renameDestination && error == other.error;
tenantLockState == other.tenantLockState && tenantLockId == other.tenantLockId &&
tenantGroup == other.tenantGroup && assignedCluster == other.assignedCluster &&
configurationSequenceNum == other.configurationSequenceNum && renameDestination == other.renameDestination &&
error == other.error;
}
bool MetaclusterTenantMapEntry::operator!=(MetaclusterTenantMapEntry const& other) const {

View File

@ -6691,7 +6691,8 @@ ACTOR static Future<Void> tryCommit(Reference<TransactionState> trState, CommitT
e.code() != error_code_batch_transaction_throttled && e.code() != error_code_tag_throttled &&
e.code() != error_code_process_behind && e.code() != error_code_future_version &&
e.code() != error_code_tenant_not_found && e.code() != error_code_illegal_tenant_access &&
e.code() != error_code_proxy_tag_throttled && e.code() != error_code_storage_quota_exceeded) {
e.code() != error_code_proxy_tag_throttled && e.code() != error_code_storage_quota_exceeded &&
e.code() != error_code_tenant_locked) {
TraceEvent(SevError, "TryCommitError").error(e);
}
if (trState->trLogInfo)

View File

@ -168,9 +168,9 @@ void ServerKnobs::initialize(Randomize randomize, ClientKnobs* clientKnobs, IsSi
init( PRIORITY_ENFORCE_MOVE_OUT_OF_PHYSICAL_SHARD, 960 ); if( randomize && BUGGIFY ) PRIORITY_ENFORCE_MOVE_OUT_OF_PHYSICAL_SHARD = 360; // Set as the lowest priority
// Data distribution
init( SHARD_ENCODE_LOCATION_METADATA, false ); if( randomize && BUGGIFY ) SHARD_ENCODE_LOCATION_METADATA = true;
init( ENABLE_DD_PHYSICAL_SHARD, false ); // EXPERIMENTAL; If true, SHARD_ENCODE_LOCATION_METADATA must be true; When true, optimization of data move between DCs is disabled
init( MAX_PHYSICAL_SHARD_BYTES, 500000000 ); // 500 MB; for ENABLE_DD_PHYSICAL_SHARD; smaller leads to larger number of physicalShard per storage server
init( SHARD_ENCODE_LOCATION_METADATA, false ); if( randomize && BUGGIFY ) SHARD_ENCODE_LOCATION_METADATA = true;
init( ENABLE_DD_PHYSICAL_SHARD, false ); // EXPERIMENTAL; If true, SHARD_ENCODE_LOCATION_METADATA must be true; When true, optimization of data move between DCs is disabled
init( MAX_PHYSICAL_SHARD_BYTES, 10000000 ); // 10 MB; for ENABLE_DD_PHYSICAL_SHARD; smaller leads to larger number of physicalShard per storage server
init( PHYSICAL_SHARD_METRICS_DELAY, 300.0 ); // 300 seconds; for ENABLE_DD_PHYSICAL_SHARD
init( ANONYMOUS_PHYSICAL_SHARD_TRANSITION_TIME, 600.0 ); if( randomize && BUGGIFY ) ANONYMOUS_PHYSICAL_SHARD_TRANSITION_TIME = 0.0; // 600 seconds; for ENABLE_DD_PHYSICAL_SHARD
init( READ_REBALANCE_CPU_THRESHOLD, 15.0 );
@ -471,6 +471,7 @@ void ServerKnobs::initialize(Randomize randomize, ClientKnobs* clientKnobs, IsSi
init( ROCKSDB_MAX_BACKGROUND_JOBS, 2 ); // RocksDB default.
init( ROCKSDB_DELETE_OBSOLETE_FILE_PERIOD, 21600 ); // 6h, RocksDB default.
init( ROCKSDB_PHYSICAL_SHARD_CLEAN_UP_DELAY, isSimulated ? 10.0 : 300.0 ); // Delays shard clean up, must be larger than ROCKSDB_READ_VALUE_TIMEOUT to prevent reading deleted shard.
init( ROCKSDB_EMPTY_RANGE_CHECK, isSimulated );
// Leader election
bool longLeaderElection = randomize && BUGGIFY;
@ -697,6 +698,7 @@ void ServerKnobs::initialize(Randomize randomize, ClientKnobs* clientKnobs, IsSi
init( STORAGE_DURABILITY_LAG_HARD_MAX, 2000e6 ); if( smallStorageTarget ) STORAGE_DURABILITY_LAG_HARD_MAX = 100e6;
init( STORAGE_DURABILITY_LAG_SOFT_MAX, 250e6 ); if( smallStorageTarget ) STORAGE_DURABILITY_LAG_SOFT_MAX = 10e6;
init( STORAGE_INCLUDE_FEED_STORAGE_QUEUE, true ); if ( randomize && BUGGIFY ) STORAGE_INCLUDE_FEED_STORAGE_QUEUE = false;
init( STORAGE_SHARD_CONSISTENCY_CHECK_INTERVAL, 0.0); if ( isSimulated ) STORAGE_SHARD_CONSISTENCY_CHECK_INTERVAL = 5.0;
//FIXME: Low priority reads are disabled by assigning very high knob values, reduce knobs for 7.0
init( LOW_PRIORITY_STORAGE_QUEUE_BYTES, 775e8 ); if( smallStorageTarget ) LOW_PRIORITY_STORAGE_QUEUE_BYTES = 1750e3;
@ -786,7 +788,7 @@ void ServerKnobs::initialize(Randomize randomize, ClientKnobs* clientKnobs, IsSi
init( READ_HOT_SUB_RANGE_CHUNK_SIZE, 10000000); // 10MB
init( EMPTY_READ_PENALTY, 20 ); // 20 bytes
init( DD_SHARD_COMPARE_LIMIT, 1000 );
init( READ_SAMPLING_ENABLED, false ); if ( randomize && BUGGIFY ) READ_SAMPLING_ENABLED = true;// enable/disable read sampling
init( READ_SAMPLING_ENABLED, true ); if ( randomize && BUGGIFY ) READ_SAMPLING_ENABLED = false;// enable/disable read sampling
//Storage Server
init( STORAGE_LOGGING_DELAY, 5.0 );

View File

@ -139,12 +139,16 @@ std::string TenantMapEntry::toJson() const {
}
tenantEntry["lock_state"] = TenantAPI::tenantLockStateToString(tenantLockState);
if (tenantLockId.present()) {
tenantEntry["lock_id"] = tenantLockId.get().toString();
}
return json_spirit::write_string(json_spirit::mValue(tenantEntry));
}
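// Illustration of the added field (UID value hypothetical): a locked tenant's JSON now
// carries "lock_id", e.g. "lock_id": "0123456789abcdef0123456789abcdef", alongside
// "lock_state"; an unlocked tenant reports "lock_state": "unlocked" and omits "lock_id".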
bool TenantMapEntry::matchesConfiguration(TenantMapEntry const& other) const {
return tenantGroup == other.tenantGroup;
return tenantGroup == other.tenantGroup && tenantLockState == other.tenantLockState &&
tenantLockId == other.tenantLockId;
}
void TenantMapEntry::configure(Standalone<StringRef> parameter, Optional<Value> value) {
@ -158,7 +162,8 @@ void TenantMapEntry::configure(Standalone<StringRef> parameter, Optional<Value>
bool TenantMapEntry::operator==(TenantMapEntry const& other) const {
return id == other.id && tenantName == other.tenantName && tenantLockState == other.tenantLockState &&
tenantGroup == other.tenantGroup && configurationSequenceNum == other.configurationSequenceNum;
tenantLockId == other.tenantLockId && tenantGroup == other.tenantGroup &&
configurationSequenceNum == other.configurationSequenceNum;
}
bool TenantMapEntry::operator!=(TenantMapEntry const& other) const {

View File

@ -29,6 +29,8 @@
#include "fdbclient/ReadYourWrites.h"
#include <vector>
FDB_DECLARE_BOOLEAN_PARAM(IncludeKeyRangeMap);
class ReadYourWritesTransaction;
Future<Optional<int64_t>> timeKeeperEpochsFromVersion(Version const& v, Reference<ReadYourWritesTransaction> const& tr);
@ -246,7 +248,8 @@ public:
// snapshot of the key ranges this backup is targeting.
virtual Future<Void> writeKeyspaceSnapshotFile(const std::vector<std::string>& fileNames,
const std::vector<std::pair<Key, Key>>& beginEndKeys,
int64_t totalBytes) = 0;
int64_t totalBytes,
IncludeKeyRangeMap includeKeyRangeMap) = 0;
// Open a file for read by name
virtual Future<Reference<IAsyncFile>> readFile(const std::string& name) = 0;

View File

@ -27,8 +27,6 @@
#include "fdbclient/FDBTypes.h"
#include "flow/Trace.h"
#include "fdbclient/BackupContainer.h"
/* BackupContainerFileSystem implements a backup container which stores files in a nested folder structure.
* Inheritors must only defined methods for writing, reading, deleting, sizing, and listing files.
*
@ -126,7 +124,8 @@ public:
Future<Void> writeKeyspaceSnapshotFile(const std::vector<std::string>& fileNames,
const std::vector<std::pair<Key, Key>>& beginEndKeys,
int64_t totalBytes) final;
int64_t totalBytes,
IncludeKeyRangeMap includeKeyRangeMap) final;
// List log files, unsorted, which contain data at any version >= beginVersion and <= targetVersion.
// "partitioned" flag indicates if new partitioned mutation logs or old logs should be listed.

View File

@ -86,6 +86,7 @@ class CoalescedKeyRefRangeMap : public RangeMap<KeyRef, Val, KeyRangeRef, Metric
public:
explicit CoalescedKeyRefRangeMap(Val v = Val(), Key endKey = allKeys.end)
: RangeMap<KeyRef, Val, KeyRangeRef, Metric, MetricFunc>(endKey, v), mapEnd(endKey) {}
void operator=(CoalescedKeyRefRangeMap&& r) noexcept {
mapEnd = std::move(r.mapEnd);
RangeMap<KeyRef, Val, KeyRangeRef, Metric, MetricFunc>::operator=(std::move(r));
@ -100,10 +101,14 @@ class CoalescedKeyRangeMap : public RangeMap<Key, Val, KeyRangeRef, Metric, Metr
public:
explicit CoalescedKeyRangeMap(Val v = Val(), Key endKey = allKeys.end)
: RangeMap<Key, Val, KeyRangeRef, Metric, MetricFunc>(endKey, v), mapEnd(endKey) {}
void operator=(CoalescedKeyRangeMap&& r) noexcept {
mapEnd = std::move(r.mapEnd);
RangeMap<Key, Val, KeyRangeRef, Metric, MetricFunc>::operator=(std::move(r));
}
CoalescedKeyRangeMap(CoalescedKeyRangeMap&& source) = default;
void insert(const KeyRangeRef& keys, const Val& value);
void insert(const KeyRef& key, const Val& value);
Key mapEnd;
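A minimal usage sketch of the move support added above (illustrative; assumes flow's KeyRangeMap headers and string literals):
// Transfer a populated map without copying the underlying range tree.
CoalescedKeyRangeMap<int> src(0);
src.insert(KeyRangeRef("a"_sr, "m"_sr), 1);
CoalescedKeyRangeMap<int> dst(0);
dst = std::move(src); // uses the move assignment defined above; src is left moved-from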

View File

@ -140,6 +140,7 @@ struct MetaclusterTenantMapEntry {
TenantName tenantName;
MetaclusterAPI::TenantState tenantState = MetaclusterAPI::TenantState::READY;
TenantAPI::TenantLockState tenantLockState = TenantAPI::TenantLockState::UNLOCKED;
Optional<UID> tenantLockId;
Optional<TenantGroupName> tenantGroup;
ClusterName assignedCluster;
int64_t configurationSequenceNum = 0;
@ -180,6 +181,7 @@ struct MetaclusterTenantMapEntry {
tenantName,
tenantState,
tenantLockState,
tenantLockId,
tenantGroup,
assignedCluster,
configurationSequenceNum,

View File

@ -140,6 +140,7 @@ struct ManagementClusterMetadata {
static KeyBackedMap<ClusterName, int64_t, TupleCodec<ClusterName>, BinaryCodec<int64_t>> clusterTenantCount;
// A set of (cluster name, tenant name, tenant ID) tuples ordered by cluster
// Renaming tenants are stored twice in the index, with the destination name stored with ID -1
static KeyBackedSet<Tuple> clusterTenantIndex;
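// Illustrative example of the double entry (values hypothetical): while tenant 7 on
// cluster `c1' is renamed from `old' to `new', the index holds both ("c1", "old", 7)
// and ("c1", "new", -1); when the rename commits, the -1 entry is replaced by
// ("c1", "new", 7) (see RenameTenantImpl further down in this diff).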
// A set of (cluster, tenant group name) tuples ordered by cluster
@ -745,6 +746,8 @@ struct RemoveClusterImpl {
updatedEntry.clusterState = DataClusterState::REMOVING;
updatedEntry.capacity.numTenantGroups = 0;
MetaclusterMetadata::activeRestoreIds().erase(tr, self->clusterName);
updateClusterMetadata(tr,
self->ctx.clusterName.get(),
self->ctx.dataClusterMetadata.get(),
@ -799,10 +802,11 @@ struct RemoveClusterImpl {
return Void();
}
// Returns true if all tenants have been purged
ACTOR static Future<bool> purgeTenants(RemoveClusterImpl* self,
Reference<typename DB::TransactionT> tr,
std::pair<Tuple, Tuple> clusterTupleRange) {
// Returns a pair of bools. The first will be true if all tenants have been purged, and the second will be true if
// any tenants have been purged
ACTOR static Future<std::pair<bool, bool>> purgeTenants(RemoveClusterImpl* self,
Reference<typename DB::TransactionT> tr,
std::pair<Tuple, Tuple> clusterTupleRange) {
ASSERT(self->ctx.dataClusterMetadata.get().entry.clusterState == DataClusterState::REMOVING);
// Get the list of tenants
@ -813,12 +817,14 @@ struct RemoveClusterImpl {
state KeyBackedRangeResult<Tuple> tenantEntries = wait(tenantEntriesFuture);
// Erase each tenant from the tenant map on the management cluster
std::set<int64_t> erasedTenants;
int64_t erasedTenants = 0;
for (Tuple entry : tenantEntries.results) {
int64_t tenantId = entry.getInt(2);
ASSERT(entry.getString(0) == self->ctx.clusterName.get());
erasedTenants.insert(tenantId);
ManagementClusterMetadata::tenantMetadata().tenantMap.erase(tr, tenantId);
if (tenantId != TenantInfo::INVALID_TENANT) {
++erasedTenants;
ManagementClusterMetadata::tenantMetadata().tenantMap.erase(tr, tenantId);
}
ManagementClusterMetadata::tenantMetadata().tenantNameIndex.erase(tr, entry.getString(1));
ManagementClusterMetadata::tenantMetadata().lastTenantModification.setVersionstamp(tr, Versionstamp(), 0);
}
@ -831,12 +837,11 @@ struct RemoveClusterImpl {
Tuple::makeTuple(self->ctx.clusterName.get(), keyAfter(tenantEntries.results.rbegin()->getString(1))));
}
ManagementClusterMetadata::tenantMetadata().tenantCount.atomicOp(
tr, -erasedTenants.size(), MutationRef::AddValue);
ManagementClusterMetadata::tenantMetadata().tenantCount.atomicOp(tr, -erasedTenants, MutationRef::AddValue);
ManagementClusterMetadata::clusterTenantCount.atomicOp(
tr, self->ctx.clusterName.get(), -erasedTenants.size(), MutationRef::AddValue);
tr, self->ctx.clusterName.get(), -erasedTenants, MutationRef::AddValue);
return !tenantEntries.more;
return std::make_pair(!tenantEntries.more, !tenantEntries.results.empty());
}
// Returns true if all tenant groups have been purged
@ -877,7 +882,6 @@ struct RemoveClusterImpl {
ManagementClusterMetadata::dataClusters().erase(tr, ctx.clusterName.get());
ManagementClusterMetadata::dataClusterConnectionRecords.erase(tr, ctx.clusterName.get());
ManagementClusterMetadata::clusterTenantCount.erase(tr, ctx.clusterName.get());
MetaclusterMetadata::activeRestoreIds().erase(tr, ctx.clusterName.get());
}
// Removes the next set of metadata from the management cluster; returns true when all specified
@ -885,25 +889,23 @@ struct RemoveClusterImpl {
ACTOR static Future<bool> managementClusterPurgeSome(RemoveClusterImpl* self,
Reference<typename DB::TransactionT> tr,
std::pair<Tuple, Tuple> clusterTupleRange,
bool* deleteTenants,
bool* deleteTenantGroups) {
bool* deleteTenants) {
if (deleteTenants) {
bool deletedAllTenants = wait(purgeTenants(self, tr, clusterTupleRange));
if (!deletedAllTenants) {
std::pair<bool, bool> deleteResult = wait(purgeTenants(self, tr, clusterTupleRange));
// If we didn't delete everything, return and try again on the next iteration
if (!deleteResult.first) {
return false;
}
*deleteTenants = false;
// If there was nothing to delete, then we don't have to try purging tenants again the next time
*deleteTenants = deleteResult.second;
}
if (deleteTenantGroups) {
bool deletedAllTenantGroups = wait(purgeTenantGroups(self, tr, clusterTupleRange));
if (!deletedAllTenantGroups) {
return false;
}
*deleteTenantGroups = false;
bool deletedAllTenantGroups = wait(purgeTenantGroups(self, tr, clusterTupleRange));
if (!deletedAllTenantGroups) {
return false;
}
self->removeDataClusterEntry(tr);
return true;
}
@ -913,15 +915,12 @@ struct RemoveClusterImpl {
Tuple::makeTuple(self->ctx.clusterName.get()), Tuple::makeTuple(keyAfter(self->ctx.clusterName.get())));
state bool deleteTenants = true;
state bool deleteTenantGroups = true;
loop {
bool clearedAll = wait(self->ctx.runManagementTransaction(
[self = self,
clusterTupleRange = clusterTupleRange,
deleteTenants = &deleteTenants,
deleteTenantGroups = &deleteTenantGroups](Reference<typename DB::TransactionT> tr) {
return managementClusterPurgeSome(self, tr, clusterTupleRange, deleteTenants, deleteTenantGroups);
[self = self, clusterTupleRange = clusterTupleRange, deleteTenants = &deleteTenants](
Reference<typename DB::TransactionT> tr) {
return managementClusterPurgeSome(self, tr, clusterTupleRange, deleteTenants);
}));
if (clearedAll) {
@ -929,6 +928,11 @@ struct RemoveClusterImpl {
}
}
wait(self->ctx.runManagementTransaction([self = self](Reference<typename DB::TransactionT> tr) {
self->removeDataClusterEntry(tr);
return Future<Void>(Void());
}));
TraceEvent("RemovedDataCluster").detail("Name", self->ctx.clusterName.get());
return Void();
}
@ -1068,6 +1072,10 @@ struct RegisterClusterImpl {
self->clusterEntry.allocated = ClusterUsage();
self->clusterEntry.id = deterministicRandom()->randomUniqueID();
// If we happen to have any orphaned restore IDs from a previous time this cluster was in a metacluster,
// erase them now.
MetaclusterMetadata::activeRestoreIds().erase(tr, self->clusterName);
ManagementClusterMetadata::dataClusters().set(tr, self->clusterName, self->clusterEntry);
ManagementClusterMetadata::dataClusterConnectionRecords.set(tr, self->clusterName, self->connectionString);
} else if (dataClusterMetadata.get().entry.clusterState == DataClusterState::REMOVING) {
@ -1509,22 +1517,22 @@ struct RestoreClusterImpl {
// Store the cluster entry for the restored cluster
ACTOR static Future<Void> registerRestoringClusterInManagementCluster(RestoreClusterImpl* self,
Reference<typename DB::TransactionT> tr) {
state DataClusterEntry clusterEntry;
clusterEntry.id = self->dataClusterId;
clusterEntry.clusterState = DataClusterState::RESTORING;
state Optional<DataClusterMetadata> dataClusterMetadata = wait(tryGetClusterTransaction(tr, self->clusterName));
if (dataClusterMetadata.present() &&
(dataClusterMetadata.get().entry.clusterState != DataClusterState::RESTORING ||
dataClusterMetadata.get().entry.id != clusterEntry.id ||
!dataClusterMetadata.get().matchesConfiguration(
DataClusterMetadata(clusterEntry, self->connectionString)))) {
!self->dataClusterId.isValid() || dataClusterMetadata.get().entry.id != self->dataClusterId)) {
TraceEvent("RestoredClusterAlreadyExists").detail("ClusterName", self->clusterName);
throw cluster_already_exists();
} else if (!self->restoreDryRun) {
MetaclusterMetadata::activeRestoreIds().addReadConflictKey(tr, self->clusterName);
MetaclusterMetadata::activeRestoreIds().set(tr, self->clusterName, self->restoreId);
self->dataClusterId = deterministicRandom()->randomUniqueID();
DataClusterEntry clusterEntry;
clusterEntry.id = self->dataClusterId;
clusterEntry.clusterState = DataClusterState::RESTORING;
ManagementClusterMetadata::dataClusters().set(tr, self->clusterName, clusterEntry);
ManagementClusterMetadata::dataClusterConnectionRecords.set(tr, self->clusterName, self->connectionString);
@ -1778,18 +1786,24 @@ struct RestoreClusterImpl {
// A data cluster tenant is not present on the management cluster
if (managementEntry == self->mgmtClusterTenantMap.end() ||
managementEntry->second.assignedCluster != self->clusterName) {
managementEntry->second.assignedCluster != self->clusterName ||
managementEntry->second.tenantState == TenantState::REMOVING) {
if (self->restoreDryRun) {
if (managementEntry == self->mgmtClusterTenantMap.end()) {
self->messages.push_back(fmt::format("Delete missing tenant `{}' with ID {} on data cluster",
printable(tenantEntry.tenantName),
tenantEntry.id));
} else {
} else if (managementEntry->second.assignedCluster != self->clusterName) {
self->messages.push_back(fmt::format(
"Delete tenant `{}' with ID {} on data cluster because it is now located on the cluster `{}'",
printable(tenantEntry.tenantName),
tenantEntry.id,
printable(managementEntry->second.assignedCluster)));
} else {
self->messages.push_back(
fmt::format("Delete tenant `{}' with ID {} on data cluster because it is in the REMOVING state",
printable(tenantEntry.tenantName),
tenantEntry.id));
}
} else {
wait(self->runRestoreDataClusterTransaction([tenantEntry = tenantEntry](Reference<ITransaction> tr) {
@ -1803,21 +1817,24 @@ struct RestoreClusterImpl {
state MetaclusterTenantMapEntry managementTenant = managementEntry->second;
// Rename
state bool renamed = tenantName != managementTenant.tenantName;
state TenantName managementTenantName = managementTenant.tenantState != TenantState::RENAMING
? managementTenant.tenantName
: managementTenant.renameDestination.get();
state bool renamed = tenantName != managementTenantName;
if (renamed) {
state TenantName temporaryName;
state bool usingTemporaryName = self->dataClusterTenantNames.count(managementTenant.tenantName) > 0;
state bool usingTemporaryName = self->dataClusterTenantNames.count(managementTenantName) > 0;
if (usingTemporaryName) {
temporaryName = metaclusterTemporaryRenamePrefix.withSuffix(managementTenant.tenantName);
temporaryName = metaclusterTemporaryRenamePrefix.withSuffix(managementTenantName);
} else {
temporaryName = managementTenant.tenantName;
temporaryName = managementTenantName;
}
if (self->restoreDryRun) {
self->messages.push_back(fmt::format("Rename tenant `{}' with ID {} to `{}' on data cluster{}",
printable(tenantEntry.tenantName),
tenantEntry.id,
printable(managementTenant.tenantName),
printable(managementTenantName),
usingTemporaryName ? " via temporary name" : ""));
} else {
wait(self->runRestoreDataClusterTransaction(
@ -1843,7 +1860,6 @@ struct RestoreClusterImpl {
bool configurationChanged = !managementTenant.matchesConfiguration(tenantEntry);
if (configurationChanged ||
managementTenant.configurationSequenceNum != tenantEntry.configurationSequenceNum) {
ASSERT(managementTenant.configurationSequenceNum >= tenantEntry.configurationSequenceNum);
if (self->restoreDryRun) {
// If this is an update to the internal sequence number only and we are also renaming the tenant,
// we don't need to report anything. The internal metadata update is (at least partially) caused
@ -1856,11 +1872,15 @@ struct RestoreClusterImpl {
configurationChanged ? "" : " (internal metadata only)"));
}
} else {
wait(self->runRestoreDataClusterTransaction(
[self = self, managementTenant = managementTenant](Reference<ITransaction> tr) {
return updateTenantConfiguration(
self, tr, managementTenant.id, managementTenant.toTenantMapEntry());
}));
wait(self->runRestoreDataClusterTransaction([self = self,
managementTenant = managementTenant,
tenantEntry = tenantEntry,
tenantName = tenantName](Reference<ITransaction> tr) {
ASSERT_GE(managementTenant.configurationSequenceNum, tenantEntry.configurationSequenceNum);
TenantMapEntry updatedEntry = managementTenant.toTenantMapEntry();
updatedEntry.tenantName = tenantName;
return updateTenantConfiguration(self, tr, managementTenant.id, updatedEntry);
}));
// SOMEDAY: we could mark the tenant in the management cluster as READY if it is in the
// UPDATING_CONFIGURATION state
}
@ -1870,7 +1890,7 @@ struct RestoreClusterImpl {
}
}
Future<Void> renameTenantBatch(std::vector<std::pair<TenantName, MetaclusterTenantMapEntry>> tenantsToRename) {
Future<Void> renameTenantBatch(std::map<TenantName, TenantMapEntry> tenantsToRename) {
return runRestoreDataClusterTransaction([this, tenantsToRename](Reference<ITransaction> tr) {
std::vector<Future<Void>> renameFutures;
for (auto t : tenantsToRename) {
@ -1891,15 +1911,23 @@ struct RestoreClusterImpl {
if (!self->restoreDryRun) {
state int reconcileIndex;
state std::vector<std::pair<TenantName, MetaclusterTenantMapEntry>> tenantsToRename;
state std::map<TenantName, TenantMapEntry> tenantsToRename;
for (reconcileIndex = 0; reconcileIndex < reconcileFutures.size(); ++reconcileIndex) {
Optional<std::pair<TenantName, MetaclusterTenantMapEntry>> const& result =
reconcileFutures[reconcileIndex].get();
if (result.present() && result.get().first.startsWith(metaclusterTemporaryRenamePrefix) &&
result.get().first != result.get().second.tenantName) {
tenantsToRename.push_back(result.get());
if (tenantsToRename.size() >= CLIENT_KNOBS->METACLUSTER_RESTORE_BATCH_SIZE) {
wait(self->renameTenantBatch(tenantsToRename));
if (result.present() && result.get().first.startsWith(metaclusterTemporaryRenamePrefix)) {
TenantMapEntry destinationTenant = result.get().second.toTenantMapEntry();
if (result.get().second.renameDestination.present()) {
destinationTenant.tenantName = result.get().second.renameDestination.get();
}
if (result.get().first != destinationTenant.tenantName) {
tenantsToRename[result.get().first] = destinationTenant;
if (tenantsToRename.size() >= CLIENT_KNOBS->METACLUSTER_RESTORE_BATCH_SIZE) {
wait(self->renameTenantBatch(tenantsToRename));
}
}
}
}
@ -1980,7 +2008,15 @@ struct RestoreClusterImpl {
Optional<MetaclusterTenantMapEntry> existingEntry = wait(tryGetTenantTransaction(tr, tenantEntry.tenantName));
if (existingEntry.present()) {
if (existingEntry.get().assignedCluster == self->clusterName) {
ASSERT(existingEntry.get().matchesConfiguration(tenantEntry));
if (existingEntry.get().id != tenantEntry.id ||
!existingEntry.get().matchesConfiguration(tenantEntry)) {
ASSERT(self->restoreDryRun);
self->messages.push_back(
fmt::format("The tenant `{}' was modified concurrently with the restore dry-run",
printable(tenantEntry.tenantName)));
throw tenant_already_exists();
}
// This is a retry, so return success
return false;
} else {
@ -1995,7 +2031,6 @@ struct RestoreClusterImpl {
managementEntry.assignedCluster = self->clusterName;
if (!self->restoreDryRun) {
ManagementClusterMetadata::tenantMetadata().tenantMap.set(tr, managementEntry.id, managementEntry);
ManagementClusterMetadata::tenantMetadata().tenantNameIndex.set(
tr, managementEntry.tenantName, managementEntry.id);
@ -2111,21 +2146,21 @@ struct RestoreClusterImpl {
}
if (tenantBatch.size() == CLIENT_KNOBS->METACLUSTER_RESTORE_BATCH_SIZE) {
wait(runTransaction(self->ctx.managementDb,
[self = self, tenantBatch = tenantBatch](Reference<typename DB::TransactionT> tr) {
tr->setOption(FDBTransactionOptions::READ_SYSTEM_KEYS);
return addTenantBatchToManagementCluster(self, tr, tenantBatch);
}));
wait(self->runRestoreManagementTransaction(
[self = self, tenantBatch = tenantBatch](Reference<typename DB::TransactionT> tr) {
tr->setOption(FDBTransactionOptions::READ_SYSTEM_KEYS);
return addTenantBatchToManagementCluster(self, tr, tenantBatch);
}));
tenantBatch.clear();
}
}
if (!tenantBatch.empty()) {
wait(runTransaction(self->ctx.managementDb,
[self = self, tenantBatch = tenantBatch](Reference<typename DB::TransactionT> tr) {
tr->setOption(FDBTransactionOptions::READ_SYSTEM_KEYS);
return addTenantBatchToManagementCluster(self, tr, tenantBatch);
}));
wait(self->runRestoreManagementTransaction(
[self = self, tenantBatch = tenantBatch](Reference<typename DB::TransactionT> tr) {
tr->setOption(FDBTransactionOptions::READ_SYSTEM_KEYS);
return addTenantBatchToManagementCluster(self, tr, tenantBatch);
}));
}
if (self->restoreDryRun) {
@ -2203,8 +2238,6 @@ struct RestoreClusterImpl {
}
ACTOR static Future<Void> runManagementClusterRepopulate(RestoreClusterImpl* self) {
self->dataClusterId = deterministicRandom()->randomUniqueID();
// Record the data cluster in the management cluster
wait(self->ctx.runManagementTransaction([self = self](Reference<typename DB::TransactionT> tr) {
return registerRestoringClusterInManagementCluster(self, tr);
@ -2689,8 +2722,9 @@ struct DeleteTenantImpl {
ManagementClusterMetadata::clusterTenantIndex.erase(
tr,
Tuple::makeTuple(
tenantEntry.get().assignedCluster, tenantEntry.get().renameDestination.get(), self->tenantId));
Tuple::makeTuple(tenantEntry.get().assignedCluster,
tenantEntry.get().renameDestination.get(),
TenantInfo::INVALID_TENANT));
}
// Remove the tenant from its tenant group
@ -2871,6 +2905,8 @@ struct ConfigureTenantImpl {
TenantName tenantName;
std::map<Standalone<StringRef>, Optional<Value>> configurationParameters;
IgnoreCapacityLimit ignoreCapacityLimit = IgnoreCapacityLimit::False;
Optional<TenantAPI::TenantLockState> lockState;
Optional<UID> lockId;
// Parameters set in updateManagementCluster
MetaclusterTenantMapEntry updatedEntry;
@ -2882,6 +2918,12 @@ struct ConfigureTenantImpl {
: ctx(managementDb), tenantName(tenantName), configurationParameters(configurationParameters),
ignoreCapacityLimit(ignoreCapacityLimit) {}
ConfigureTenantImpl(Reference<DB> managementDb,
TenantName tenantName,
TenantAPI::TenantLockState lockState,
UID lockId)
: ctx(managementDb), tenantName(tenantName), lockState(lockState), lockId(lockId) {}
// This verifies that the tenant group can be changed, and if so it updates all of the tenant group data
// structures. It does not update the TenantMapEntry stored in the tenant map.
ACTOR static Future<Void> updateTenantGroup(ConfigureTenantImpl* self,
@ -2971,6 +3013,9 @@ struct ConfigureTenantImpl {
self->updatedEntry = tenantEntry.get();
self->updatedEntry.tenantState = MetaclusterAPI::TenantState::UPDATING_CONFIGURATION;
ASSERT_EQ(self->lockState.present(), self->lockId.present());
ASSERT_NE(self->lockState.present(), self->configurationParameters.size() > 0);
state std::map<Standalone<StringRef>, Optional<Value>>::iterator configItr;
for (configItr = self->configurationParameters.begin(); configItr != self->configurationParameters.end();
++configItr) {
@ -2988,12 +3033,24 @@ struct ConfigureTenantImpl {
self->updatedEntry.configure(configItr->first, configItr->second);
}
if (self->lockState.present()) {
TenantAPI::checkLockState(tenantEntry.get(), self->lockState.get(), self->lockId.get());
self->updatedEntry.tenantLockState = self->lockState.get();
if (self->updatedEntry.tenantLockState == TenantAPI::TenantLockState::UNLOCKED) {
self->updatedEntry.tenantLockId = {};
} else {
self->updatedEntry.tenantLockId = self->lockId.get();
}
}
if (self->updatedEntry.matchesConfiguration(tenantEntry.get()) &&
tenantEntry.get().tenantState == MetaclusterAPI::TenantState::READY) {
return false;
}
++self->updatedEntry.configurationSequenceNum;
ASSERT_EQ(self->updatedEntry.tenantLockState != TenantAPI::TenantLockState::UNLOCKED,
self->updatedEntry.tenantLockId.present());
ManagementClusterMetadata::tenantMetadata().tenantMap.set(tr, self->updatedEntry.id, self->updatedEntry);
ManagementClusterMetadata::tenantMetadata().lastTenantModification.setVersionstamp(tr, Versionstamp(), 0);
@ -3060,6 +3117,16 @@ Future<Void> configureTenant(Reference<DB> db,
return Void();
}
ACTOR template <class DB>
Future<Void> changeTenantLockState(Reference<DB> db,
TenantName name,
TenantAPI::TenantLockState lockState,
UID lockId) {
state ConfigureTenantImpl<DB> impl(db, name, lockState, lockId);
wait(impl.run());
return Void();
}
template <class DB>
struct RenameTenantImpl {
MetaclusterOperationContext<DB> ctx;
@ -3144,7 +3211,7 @@ struct RenameTenantImpl {
// Updated indexes to include the new tenant
ManagementClusterMetadata::clusterTenantIndex.insert(
tr, Tuple::makeTuple(updatedEntry.assignedCluster, self->newName, self->tenantId));
tr, Tuple::makeTuple(updatedEntry.assignedCluster, self->newName, TenantInfo::INVALID_TENANT));
return Void();
}
@ -3195,6 +3262,10 @@ struct RenameTenantImpl {
// Remove the tenant from the cluster -> tenant index
ManagementClusterMetadata::clusterTenantIndex.erase(
tr, Tuple::makeTuple(updatedEntry.assignedCluster, self->oldName, self->tenantId));
ManagementClusterMetadata::clusterTenantIndex.erase(
tr, Tuple::makeTuple(updatedEntry.assignedCluster, self->newName, TenantInfo::INVALID_TENANT));
ManagementClusterMetadata::clusterTenantIndex.insert(
tr, Tuple::makeTuple(updatedEntry.assignedCluster, self->newName, self->tenantId));
}
return Void();

View File

@ -385,6 +385,7 @@ public:
int64_t ROCKSDB_MAX_BACKGROUND_JOBS;
int64_t ROCKSDB_DELETE_OBSOLETE_FILE_PERIOD;
double ROCKSDB_PHYSICAL_SHARD_CLEAN_UP_DELAY;
bool ROCKSDB_EMPTY_RANGE_CHECK;
// Leader election
int MAX_NOTIFICATIONS;
@ -796,6 +797,7 @@ public:
int STORAGE_SERVER_READ_CONCURRENCY;
std::string STORAGESERVER_READTYPE_PRIORITY_MAP;
int SPLIT_METRICS_MAX_ROWS;
double STORAGE_SHARD_CONSISTENCY_CHECK_INTERVAL;
// Wait Failure
int MAX_OUTSTANDING_WAIT_FAILURE_REQUESTS;

View File

@ -82,6 +82,7 @@ struct TenantMapEntry {
Key prefix;
TenantName tenantName;
TenantAPI::TenantLockState tenantLockState = TenantAPI::TenantLockState::UNLOCKED;
Optional<UID> tenantLockId;
Optional<TenantGroupName> tenantGroup;
int64_t configurationSequenceNum = 0;
@ -109,7 +110,7 @@ struct TenantMapEntry {
template <class Ar>
void serialize(Ar& ar) {
serializer(ar, id, tenantName, tenantLockState, tenantGroup, configurationSequenceNum);
serializer(ar, id, tenantName, tenantLockState, tenantLockId, tenantGroup, configurationSequenceNum);
if constexpr (Ar::isDeserializing) {
if (id >= 0) {
prefix = TenantAPI::idToPrefix(id);
@ -203,7 +204,6 @@ struct TenantMetadataSpecification {
KeyBackedObjectMap<int64_t, typename TenantTypes::TenantMapEntryT, decltype(IncludeVersion()), TenantIdCodec>
tenantMap;
KeyBackedMap<TenantName, int64_t> tenantNameIndex;
KeyBackedMap<int64_t, UID> lockId;
KeyBackedProperty<int64_t> lastTenantId;
KeyBackedBinaryValue<int64_t> tenantCount;
KeyBackedSet<int64_t> tenantTombstones;
@ -216,9 +216,8 @@ struct TenantMetadataSpecification {
TenantMetadataSpecification(KeyRef prefix)
: subspace(prefix.withSuffix("tenant/"_sr)), tenantMap(subspace.withSuffix("map/"_sr), IncludeVersion()),
tenantNameIndex(subspace.withSuffix("nameIndex/"_sr)), lockId(subspace.withSuffix("lockId"_sr)),
lastTenantId(subspace.withSuffix("lastId"_sr)), tenantCount(subspace.withSuffix("count"_sr)),
tenantTombstones(subspace.withSuffix("tombstones/"_sr)),
tenantNameIndex(subspace.withSuffix("nameIndex/"_sr)), lastTenantId(subspace.withSuffix("lastId"_sr)),
tenantCount(subspace.withSuffix("count"_sr)), tenantTombstones(subspace.withSuffix("tombstones/"_sr)),
tombstoneCleanupData(subspace.withSuffix("tombstoneCleanup"_sr), IncludeVersion()),
tenantGroupTenantIndex(subspace.withSuffix("tenantGroup/tenantIndex/"_sr)),
tenantGroupMap(subspace.withSuffix("tenantGroup/map/"_sr), IncludeVersion()),
@ -232,7 +231,6 @@ struct TenantMetadata {
static inline auto& subspace() { return instance().subspace; }
static inline auto& tenantMap() { return instance().tenantMap; }
static inline auto& tenantNameIndex() { return instance().tenantNameIndex; }
static inline auto& tenantLockId() { return instance().lockId; }
static inline auto& lastTenantId() { return instance().lastTenantId; }
static inline auto& tenantCount() { return instance().tenantCount; }
static inline auto& tenantTombstones() { return instance().tenantTombstones; }

View File

@ -372,7 +372,6 @@ Future<Void> deleteTenantTransaction(Transaction tr,
// This is idempotent because we only erase an entry from the tenant map if it is present
TenantMetadata::tenantMap().erase(tr, tenantId);
TenantMetadata::tenantLockId().erase(tr, tenantId);
TenantMetadata::tenantNameIndex().erase(tr, tenantEntry.get().tenantName);
TenantMetadata::tenantCount().atomicOp(tr, -1, MutationRef::AddValue);
TenantMetadata::lastTenantModification().setVersionstamp(tr, Versionstamp(), 0);
@ -488,35 +487,39 @@ Future<Void> configureTenantTransaction(Transaction tr,
}
}
ASSERT_EQ(updatedTenantEntry.tenantLockId.present(),
updatedTenantEntry.tenantLockState != TenantLockState::UNLOCKED);
return Void();
}
template <class TenantMapEntryT>
bool checkLockState(TenantMapEntryT entry, TenantLockState desiredLockState, UID lockId) {
if (entry.tenantLockId == lockId && entry.tenantLockState == desiredLockState) {
return true;
}
if (entry.tenantLockId.present() && entry.tenantLockId.get() != lockId) {
throw tenant_locked();
}
return false;
}
ACTOR template <class Transaction>
Future<Void> changeLockState(Transaction* tr, int64_t tenant, TenantLockState desiredLockState, UID lockID) {
state TenantMapEntry entry;
tr->setOption(FDBTransactionOptions::RAW_ACCESS);
tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
tr->setOption(FDBTransactionOptions::LOCK_AWARE);
tr->setOption(FDBTransactionOptions::READ_LOCK_AWARE);
wait(store(entry, TenantAPI::getTenantTransaction(tr, tenant)));
Optional<UID> currLockID = wait(TenantMetadata::tenantLockId().get(tr, tenant));
if (currLockID.present()) {
if (currLockID.get() != lockID) {
throw tenant_locked();
} else if (desiredLockState != TenantLockState::UNLOCKED) {
// already executed -- this can happen if we're in a retry loop
return Void();
}
// otherwise we can now continue with unlock
}
TenantMapEntry newState = entry;
newState.tenantLockState = desiredLockState;
wait(configureTenantTransaction(tr, entry, newState));
if (desiredLockState == TenantLockState::UNLOCKED) {
TenantMetadata::tenantLockId().erase(tr, tenant);
} else {
TenantMetadata::tenantLockId().set(tr, tenant, lockID);
Future<Void> changeLockState(Transaction tr, int64_t tenant, TenantLockState desiredLockState, UID lockId) {
state Future<Void> tenantModeCheck = TenantAPI::checkTenantMode(tr, ClusterType::STANDALONE);
state TenantMapEntry entry = wait(TenantAPI::getTenantTransaction(tr, tenant));
wait(tenantModeCheck);
if (!checkLockState(entry, desiredLockState, lockId)) {
TenantMapEntry newState = entry;
newState.tenantLockState = desiredLockState;
newState.tenantLockId = (desiredLockState == TenantLockState::UNLOCKED) ? Optional<UID>() : lockId;
wait(configureTenantTransaction(tr, entry, newState));
}
return Void();
}
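
The two helpers above make lock transitions idempotent: a retry that observes its own lockId and the desired state is a no-op, a different lockId always throws, and unlocking clears the lockId. Below is a minimal plain-C++ sketch of the same decision table, with hypothetical Entry/LockState types standing in for the FDB ones.

.. code-block::

#include <cstdint>
#include <optional>
#include <stdexcept>

enum class LockState { Unlocked, ReadOnly, Locked };

struct Entry {
    LockState lockState = LockState::Unlocked;
    std::optional<uint64_t> lockId; // present iff lockState != Unlocked
};

// Returns true if the desired state is already in effect for this lockId
// (the retry case); throws if a different lockId holds the tenant.
bool checkLockState(const Entry& e, LockState desired, uint64_t lockId) {
    if (e.lockId == lockId && e.lockState == desired)
        return true;
    if (e.lockId.has_value() && *e.lockId != lockId)
        throw std::runtime_error("tenant_locked");
    return false;
}

// Applies the transition only when it is not already in effect.
void changeLockState(Entry& e, LockState desired, uint64_t lockId) {
    if (!checkLockState(e, desired, lockId)) {
        e.lockState = desired;
        e.lockId = (desired == LockState::Unlocked) ? std::optional<uint64_t>()
                                                    : std::optional<uint64_t>(lockId);
    }
}
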

View File

@ -210,6 +210,8 @@ public:
void validateCoalesced();
void operator=(RangeMap&& r) noexcept { map = std::move(r.map); }
RangeMap(RangeMap&& source) = default;
// void clear( const Val& value ) { ranges.clear(); ranges.insert(std::make_pair(Key(),value)); }
void clear() { map.clear(); }
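
The commented-out variant above would reset the map to a single default-valued range rather than emptying it. A toy sketch of the distinction, assuming a std::map keyed by range-start:

.. code-block::

#include <map>
#include <string>

// Toy range map: each entry maps a range-start key to the value in effect
// from that key up to the next entry's key.
struct ToyRangeMap {
    std::map<std::string, int> map;

    // Empties the map entirely, as the new clear() above does.
    void clear() { map.clear(); }

    // Resets to a single range covering all keys with a default value,
    // which is what the commented-out clear(const Val&) would have done.
    void clear(int value) {
        map.clear();
        map.emplace(std::string(), value); // "" sorts before every other key
    }
};
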

View File

@ -341,6 +341,11 @@ public:
std::set<std::pair<std::string, unsigned>> corruptedBlocks;
// Validate at-rest encryption guarantees. If enabled, tests should inject a known 'marker' into Keys and/or Values
// inserted into FDB by the workload. On shutdown, all test-generated files (under simfdb/) are scanned to check
// whether the 'plaintext marker' is present.
Optional<std::string> dataAtRestPlaintextMarker;
flowGlobalType global(int id) const final;
void setGlobal(size_t id, flowGlobalType v) final;

View File

@ -458,6 +458,7 @@ ACTOR Future<Void> loadBlobMetadataForTenants(BGTenantMap* self, std::vector<Blo
ASSERT(SERVER_KNOBS->BG_METADATA_SOURCE == "tenant");
ASSERT(!tenantsToLoad.empty());
state EKPGetLatestBlobMetadataRequest req;
state double retrySleep = 0.1;
for (const auto tenantId : tenantsToLoad) {
req.domainIds.emplace_back(tenantId);
}
@ -465,33 +466,69 @@ ACTOR Future<Void> loadBlobMetadataForTenants(BGTenantMap* self, std::vector<Blo
// FIXME: if one tenant gets an error, don't kill whole process
state double startTime = now();
loop {
Future<EKPGetLatestBlobMetadataReply> requestFuture;
if (self->dbInfo.isValid() && self->dbInfo->get().encryptKeyProxy.present()) {
req.reply.reset();
requestFuture =
brokenPromiseToNever(self->dbInfo->get().encryptKeyProxy.get().getLatestBlobMetadata.getReply(req));
} else {
requestFuture = Never();
}
choose {
when(EKPGetLatestBlobMetadataReply rep = wait(requestFuture)) {
ASSERT(rep.blobMetadataDetails.size() == req.domainIds.size());
// the response is not guaranteed to be in the same order as the request
for (auto& metadata : rep.blobMetadataDetails) {
auto info = self->tenantInfoById.find(metadata.domainId);
if (info == self->tenantInfoById.end()) {
continue;
}
auto dataEntry = self->tenantData.rangeContaining(info->second.prefix);
ASSERT(dataEntry.begin() == info->second.prefix);
dataEntry.cvalue()->updateBStore(metadata);
}
double elapsed = now() - startTime;
BlobCipherMetrics::getInstance()->getBlobMetadataLatency.addMeasurement(elapsed);
return Void();
try {
Future<EKPGetLatestBlobMetadataReply> requestFuture;
if (self->dbInfo.isValid() && self->dbInfo->get().encryptKeyProxy.present()) {
req.reply.reset();
requestFuture =
brokenPromiseToNever(self->dbInfo->get().encryptKeyProxy.get().getLatestBlobMetadata.getReply(req));
} else {
requestFuture = Never();
}
when(wait(self->dbInfo->onChange())) {}
choose {
when(EKPGetLatestBlobMetadataReply rep = wait(requestFuture)) {
ASSERT(rep.blobMetadataDetails.size() <= req.domainIds.size());
// the response is not guaranteed to be in the same order as the request
for (auto& metadata : rep.blobMetadataDetails) {
auto info = self->tenantInfoById.find(metadata.domainId);
if (info == self->tenantInfoById.end()) {
continue;
}
auto dataEntry = self->tenantData.rangeContaining(info->second.prefix);
ASSERT(dataEntry.begin() == info->second.prefix);
dataEntry.cvalue()->updateBStore(metadata);
}
double elapsed = now() - startTime;
BlobCipherMetrics::getInstance()->getBlobMetadataLatency.addMeasurement(elapsed);
if (rep.blobMetadataDetails.size() == req.domainIds.size()) {
return Void();
}
// if not all tenants were included in the response (or on error, below), retry with the missing ones
std::unordered_set<BlobMetadataDomainId> missingIds;
for (auto& id : req.domainIds) {
missingIds.insert(id);
}
for (auto& metadata : rep.blobMetadataDetails) {
missingIds.erase(metadata.domainId);
}
ASSERT(missingIds.size() > 0);
TraceEvent(SevWarn, "BlobMetadataFetchMissingTenants")
.suppressFor(30.0)
.detail("Count", missingIds.size());
CODE_PROBE(true, "blob metadata fetch missing tenants");
req.domainIds.clear();
for (auto& id : missingIds) {
req.domainIds.push_back(id);
}
}
when(wait(self->dbInfo->onChange())) {
// reset retry sleep
retrySleep = 0.1;
}
}
} catch (Error& e) {
if (e.code() == error_code_operation_cancelled) {
throw e;
}
CODE_PROBE(true, "blob metadata fetch error");
TraceEvent(SevWarn, "BlobMetadataFetchError").errorUnsuppressed(e).suppressFor(30.0);
}
wait(delay(retrySleep));
retrySleep = std::min(10.0, retrySleep * 1.5);
}
}
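
The rewritten loop above retries only the domain IDs missing from a partial reply, backs off exponentially, and resets the delay whenever dbInfo changes. A standalone sketch of the bookkeeping, assuming integer domain IDs:

.. code-block::

#include <algorithm>
#include <cstdint>
#include <unordered_set>
#include <vector>

// Given the requested IDs and the IDs actually answered, compute the set
// that still needs to be re-requested on the next attempt.
std::vector<int64_t> missingIds(const std::vector<int64_t>& requested,
                                const std::vector<int64_t>& answered) {
    std::unordered_set<int64_t> missing(requested.begin(), requested.end());
    for (int64_t id : answered)
        missing.erase(id);
    return std::vector<int64_t>(missing.begin(), missing.end());
}

// Backoff matching the loop above: start at 100ms, grow 1.5x per failed
// attempt, cap at 10 seconds.
double nextRetrySleep(double current) {
    return std::min(10.0, current * 1.5);
}
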

View File

@ -64,7 +64,8 @@
#define BM_DEBUG false
#define BM_PURGE_DEBUG false
void handleClientBlobRange(KeyRangeMap<bool>* knownBlobRanges,
void handleClientBlobRange(int64_t epoch,
KeyRangeMap<bool>* knownBlobRanges,
Arena& ar,
VectorRef<KeyRangeRef>* rangesToAdd,
VectorRef<KeyRangeRef>* rangesToRemove,
@ -84,14 +85,16 @@ void handleClientBlobRange(KeyRangeMap<bool>* knownBlobRanges,
KeyRangeRef overlap(overlapStart, overlapEnd);
if (rangeActive) {
if (BM_DEBUG) {
fmt::print("BM Adding client range [{0} - {1})\n",
fmt::print("BM {0} Adding client range [{1} - {2})\n",
epoch,
overlapStart.printable().c_str(),
overlapEnd.printable().c_str());
}
rangesToAdd->push_back_deep(ar, overlap);
} else {
if (BM_DEBUG) {
fmt::print("BM Removing client range [{0} - {1})\n",
fmt::print("BM {0} Removing client range [{1} - {2})\n",
epoch,
overlapStart.printable().c_str(),
overlapEnd.printable().c_str());
}
@ -102,7 +105,8 @@ void handleClientBlobRange(KeyRangeMap<bool>* knownBlobRanges,
knownBlobRanges->insert(keyRange, rangeActive);
}
void updateClientBlobRanges(KeyRangeMap<bool>* knownBlobRanges,
void updateClientBlobRanges(int64_t epoch,
KeyRangeMap<bool>* knownBlobRanges,
RangeResult dbBlobRanges,
Arena& ar,
VectorRef<KeyRangeRef>* rangesToAdd,
@ -126,11 +130,11 @@ void updateClientBlobRanges(KeyRangeMap<bool>* knownBlobRanges,
if (dbBlobRanges.size() == 0) {
// special case. Nothing in the DB, reset knownBlobRanges and revoke all existing ranges from workers
handleClientBlobRange(
knownBlobRanges, ar, rangesToAdd, rangesToRemove, normalKeys.begin, normalKeys.end, false);
epoch, knownBlobRanges, ar, rangesToAdd, rangesToRemove, normalKeys.begin, normalKeys.end, false);
} else {
if (dbBlobRanges[0].key > normalKeys.begin) {
handleClientBlobRange(
knownBlobRanges, ar, rangesToAdd, rangesToRemove, normalKeys.begin, dbBlobRanges[0].key, false);
epoch, knownBlobRanges, ar, rangesToAdd, rangesToRemove, normalKeys.begin, dbBlobRanges[0].key, false);
}
for (int i = 0; i < dbBlobRanges.size() - 1; i++) {
if (dbBlobRanges[i].key >= normalKeys.end) {
@ -142,7 +146,8 @@ void updateClientBlobRanges(KeyRangeMap<bool>* knownBlobRanges,
bool active = dbBlobRanges[i].value == blobRangeActive;
if (active) {
if (BM_DEBUG) {
fmt::print("BM sees client range [{0} - {1})\n",
fmt::print("BM {0} sees client range [{1} - {2})\n",
epoch,
dbBlobRanges[i].key.printable(),
dbBlobRanges[i + 1].key.printable());
}
@ -157,10 +162,11 @@ void updateClientBlobRanges(KeyRangeMap<bool>* knownBlobRanges,
endKey = normalKeys.end;
}
handleClientBlobRange(
knownBlobRanges, ar, rangesToAdd, rangesToRemove, dbBlobRanges[i].key, endKey, active);
epoch, knownBlobRanges, ar, rangesToAdd, rangesToRemove, dbBlobRanges[i].key, endKey, active);
}
if (dbBlobRanges[dbBlobRanges.size() - 1].key < normalKeys.end) {
handleClientBlobRange(knownBlobRanges,
handleClientBlobRange(epoch,
knownBlobRanges,
ar,
rangesToAdd,
rangesToRemove,
@ -1357,7 +1363,8 @@ ACTOR Future<Void> monitorClientRanges(Reference<BlobManagerData> bmData) {
VectorRef<KeyRangeRef> rangesToAdd;
VectorRef<KeyRangeRef> rangesToRemove;
updateClientBlobRanges(&bmData->knownBlobRanges, results, ar, &rangesToAdd, &rangesToRemove);
updateClientBlobRanges(
bmData->epoch, &bmData->knownBlobRanges, results, ar, &rangesToAdd, &rangesToRemove);
if (needToCoalesce) {
// recovery has granules instead of known ranges in here. We need to do so to identify any parts of
@ -1412,7 +1419,8 @@ ACTOR Future<Void> monitorClientRanges(Reference<BlobManagerData> bmData) {
for (auto f : splitFutures) {
state BlobGranuleSplitPoints splitPoints = wait(f);
if (BM_DEBUG) {
fmt::print("Split client range [{0} - {1}) into {2} ranges:\n",
fmt::print("BM {0} Splitting client range [{1} - {2}) into {3} ranges.\n",
bmData->epoch,
splitPoints.keys[0].printable(),
splitPoints.keys[splitPoints.keys.size() - 1].printable(),
splitPoints.keys.size() - 1);
@ -1422,6 +1430,14 @@ ACTOR Future<Void> monitorClientRanges(Reference<BlobManagerData> bmData) {
// picks up the same ranges
wait(writeInitialGranuleMapping(bmData, splitPoints));
if (BM_DEBUG) {
fmt::print("BM {0} Split client range [{1} - {2}) into {3} ranges:\n",
bmData->epoch,
splitPoints.keys[0].printable(),
splitPoints.keys[splitPoints.keys.size() - 1].printable(),
splitPoints.keys.size() - 1);
}
for (int i = 0; i < splitPoints.keys.size() - 1; i++) {
KeyRange range = KeyRange(KeyRangeRef(splitPoints.keys[i], splitPoints.keys[i + 1]));
// only add the client range if this is the first BM or it's not already assigned
@ -4524,6 +4540,61 @@ ACTOR Future<Void> partiallyDeleteGranule(Reference<BlobManagerData> self,
return Void();
}
ACTOR Future<Void> waitForcePurgeBlobbified(Reference<BlobManagerData> self, KeyRangeRef range) {
// To avoid races with a range still being blobbified, wait to initiate force purge until blobbification is complete
// Also needs to be idempotent with previous purges though, so if range is already purging, skip this check
if (BM_PURGE_DEBUG) {
fmt::print("BM {0} waitForcePurgeBlobbified [{1} - {2}): start\n",
self->epoch,
range.begin.printable(),
range.end.printable());
}
state Transaction tr(self->db);
loop {
tr.setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
tr.setOption(FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE);
tr.setOption(FDBTransactionOptions::LOCK_AWARE);
try {
state Future<Optional<Version>> verifyFuture =
timeout(self->db->verifyBlobRange(range, latestVersion), 10.0);
ForcedPurgeState purgeState = wait(getForcePurgedState(&tr, range));
if (purgeState != ForcedPurgeState::NonePurged) {
// FIXME: likely does not solve the issue if SomePurged but not all, since some ranges might not be
// blobbified. An example would be if [C - D) was purged first, then later [A - F) was.
if (BM_PURGE_DEBUG) {
fmt::print("BM {0} waitForcePurgeComplete [{1} - {2}): already purged\n",
self->epoch,
range.begin.printable(),
range.end.printable());
}
break;
}
Optional<Version> verifyVersion = wait(verifyFuture);
if (verifyVersion.present() && verifyVersion.get() != invalidVersion) {
if (BM_PURGE_DEBUG) {
fmt::print("BM {0} waitForcePurgeComplete [{1} - {2}): verified blobbified\n",
self->epoch,
range.begin.printable(),
range.end.printable());
}
break;
}
tr.reset();
wait(delay(1.0));
// TODO: remove; this trace flags a key range stuck waiting to be blobbified
TraceEvent("WaitForcePurgeBlobbifiedBlocking", self->id)
.suppressFor(60.0)
.detail("Epoch", self->epoch)
.detail("Range", range);
} catch (Error& e) {
wait(tr.onError(e));
}
}
return Void();
}
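
waitForcePurgeBlobbified polls: each pass first checks whether the range was already force-purged (the idempotent retry case), then whether blobbification has verified, and otherwise sleeps and tries again. A minimal sketch of that control flow, with hypothetical predicates in place of the transactional checks:

.. code-block::

#include <chrono>
#include <functional>
#include <thread>

// Poll until either the range was already force-purged (nothing to wait
// for) or blobbification has verified. alreadyPurged/verified are
// hypothetical stand-ins for the transactional checks above.
void waitUntilBlobbifiedOrPurged(const std::function<bool()>& alreadyPurged,
                                 const std::function<bool()>& verified) {
    while (true) {
        if (alreadyPurged())
            return; // idempotent with a previous purge
        if (verified())
            return; // safe to initiate the force purge now
        std::this_thread::sleep_for(std::chrono::seconds(1));
    }
}
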
/*
* This method is used to purge the range [startKey, endKey) at (and including) purgeVersion.
* To do this, we do a BFS traversal starting at the active granules. Then we classify granules
@ -4563,7 +4634,28 @@ ACTOR Future<Void> purgeRange(Reference<BlobManagerData> self, KeyRangeRef range
state Transaction tr(self->db);
// FIXME: this should be completely separate by known blob range, to avoid any races or issues
// if range isn't in known blob ranges, do nothing after writing force purge range to database
state std::vector<KeyRange> knownPurgeRanges;
auto knownRanges = self->knownBlobRanges.intersectingRanges(range);
for (auto& it : knownRanges) {
if (it.cvalue()) {
knownPurgeRanges.push_back(range & it.range());
}
}
if (force) {
// before setting force purge in the database, make sure all ranges to purge are actually blobbified to avoid
// races
// FIXME: this can also apply to non-force-purges but for retention purges it's not as bad since it'll likely
// get another one soon
std::vector<Future<Void>> waitForBlobbifies;
waitForBlobbifies.reserve(knownPurgeRanges.size());
for (auto& it : knownPurgeRanges) {
waitForBlobbifies.push_back(waitForcePurgeBlobbified(self, it));
}
wait(waitForAll(waitForBlobbifies));
// TODO could clean this up after force purge is done, but it's safer not to
self->forcePurgingRanges.insert(range, true);
// set force purged range, to prevent future operations on this range
@ -4595,17 +4687,7 @@ ACTOR Future<Void> purgeRange(Reference<BlobManagerData> self, KeyRangeRef range
tr.setOption(FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE);
}
// if range isn't in known blob ranges, do nothing after writing force purge range to database
bool anyKnownRanges = false;
auto knownRanges = self->knownBlobRanges.intersectingRanges(range);
for (auto& it : knownRanges) {
if (it.cvalue()) {
anyKnownRanges = true;
break;
}
}
if (!anyKnownRanges) {
if (knownPurgeRanges.empty()) {
CODE_PROBE(true, "skipping purge because not in known blob ranges");
TraceEvent("PurgeGranulesSkippingUnknownRange", self->id)
.detail("Epoch", self->epoch)
@ -5015,10 +5097,12 @@ ACTOR Future<Void> purgeRange(Reference<BlobManagerData> self, KeyRangeRef range
ACTOR Future<Void> monitorPurgeKeys(Reference<BlobManagerData> self) {
self->initBStore();
// wait for BM to be fully recovered and have loaded hard boundaries before starting purges
wait(self->doneRecovering.getFuture());
wait(self->loadedClientRanges.getFuture());
loop {
state Reference<ReadYourWritesTransaction> tr = makeReference<ReadYourWritesTransaction>(self->db);
tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
tr->setOption(FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE);
// Wait for the watch to change, or some time to expire (whichever comes first)
// before checking through the purge intents. We write a UID into the change key value
@ -5036,6 +5120,7 @@ ACTOR Future<Void> monitorPurgeKeys(Reference<BlobManagerData> self) {
state CoalescedKeyRangeMap<std::pair<Version, bool>> purgeMap;
purgeMap.insert(allKeys, std::make_pair<Version, bool>(0, false));
try {
wait(checkManagerLock(tr, self));
// TODO: replace 10000 with a knob
state RangeResult purgeIntents = wait(tr->getRange(blobGranulePurgeKeys, BUGGIFY ? 1 : 10000));
if (purgeIntents.size()) {
@ -5088,8 +5173,27 @@ ACTOR Future<Void> monitorPurgeKeys(Reference<BlobManagerData> self) {
// done. If the BM fails, then all purges will fail, and the next BM will have a clean
// set of metadata (i.e. no work in progress), so we will end up redoing the work in the
// new BM.
try {
wait(waitForAll(purges));
} catch (Error& e) {
// These should not get an error that then causes a transaction retry loop. All error handling
// should be done in the purge calls
if (e.code() == error_code_operation_cancelled ||
e.code() == error_code_blob_manager_replaced) {
throw e;
}
// FIXME: refactor this into a function on BlobManagerData
TraceEvent(SevError, "BlobManagerUnexpectedErrorPurgeRanges", self->id)
.error(e)
.detail("Epoch", self->epoch);
ASSERT_WE_THINK(false);
wait(waitForAll(purges));
// if not simulation, kill the BM
if (self->iAmReplaced.canBeSet()) {
self->iAmReplaced.sendError(e);
}
throw e;
}
break;
} else {
state Future<Void> watchPurgeIntentsChange = tr->watch(blobGranulePurgeChangeKey);
@ -5102,6 +5206,11 @@ ACTOR Future<Void> monitorPurgeKeys(Reference<BlobManagerData> self) {
}
}
if (BUGGIFY && self->maybeInjectTargetedRestart()) {
wait(delay(0)); // should be cancelled
ASSERT(false);
}
tr->reset();
loop {
try {
@ -5109,6 +5218,7 @@ ACTOR Future<Void> monitorPurgeKeys(Reference<BlobManagerData> self) {
tr->setOption(FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE);
tr->setOption(FDBTransactionOptions::LOCK_AWARE);
tr->clear(KeyRangeRef(blobGranulePurgeKeys.begin, keyAfter(lastPurgeKey)));
wait(checkManagerLock(tr, self));
wait(tr->commit());
break;
} catch (Error& e) {
@ -5635,7 +5745,7 @@ TEST_CASE("/blobmanager/updateranges") {
kbrRanges.clear();
added.clear();
removed.clear();
updateClientBlobRanges(&knownBlobRanges, dbDataAB, ar, &added, &removed);
updateClientBlobRanges(1, &knownBlobRanges, dbDataAB, ar, &added, &removed);
ASSERT(added.size() == 1);
ASSERT(added[0] == rangeAB);
@ -5655,7 +5765,7 @@ TEST_CASE("/blobmanager/updateranges") {
kbrRanges.clear();
added.clear();
removed.clear();
updateClientBlobRanges(&knownBlobRanges, dbDataEmpty, ar, &added, &removed);
updateClientBlobRanges(1, &knownBlobRanges, dbDataEmpty, ar, &added, &removed);
ASSERT(added.size() == 0);
@ -5670,7 +5780,7 @@ TEST_CASE("/blobmanager/updateranges") {
kbrRanges.clear();
added.clear();
removed.clear();
updateClientBlobRanges(&knownBlobRanges, dbDataAB_CD, ar, &added, &removed);
updateClientBlobRanges(1, &knownBlobRanges, dbDataAB_CD, ar, &added, &removed);
ASSERT(added.size() == 2);
ASSERT(added[0] == rangeAB);
@ -5695,7 +5805,7 @@ TEST_CASE("/blobmanager/updateranges") {
kbrRanges.clear();
added.clear();
removed.clear();
updateClientBlobRanges(&knownBlobRanges, dbDataAD, ar, &added, &removed);
updateClientBlobRanges(1, &knownBlobRanges, dbDataAD, ar, &added, &removed);
ASSERT(added.size() == 1);
ASSERT(added[0] == rangeBC);
@ -5715,7 +5825,7 @@ TEST_CASE("/blobmanager/updateranges") {
kbrRanges.clear();
added.clear();
removed.clear();
updateClientBlobRanges(&knownBlobRanges, dbDataAC, ar, &added, &removed);
updateClientBlobRanges(1, &knownBlobRanges, dbDataAC, ar, &added, &removed);
ASSERT(added.size() == 0);
@ -5735,7 +5845,7 @@ TEST_CASE("/blobmanager/updateranges") {
kbrRanges.clear();
added.clear();
removed.clear();
updateClientBlobRanges(&knownBlobRanges, dbDataBC, ar, &added, &removed);
updateClientBlobRanges(1, &knownBlobRanges, dbDataBC, ar, &added, &removed);
ASSERT(added.size() == 0);
@ -5755,7 +5865,7 @@ TEST_CASE("/blobmanager/updateranges") {
kbrRanges.clear();
added.clear();
removed.clear();
updateClientBlobRanges(&knownBlobRanges, dbDataBD, ar, &added, &removed);
updateClientBlobRanges(1, &knownBlobRanges, dbDataBD, ar, &added, &removed);
ASSERT(added.size() == 1);
ASSERT(added[0] == rangeCD);
@ -5775,7 +5885,7 @@ TEST_CASE("/blobmanager/updateranges") {
kbrRanges.clear();
added.clear();
removed.clear();
updateClientBlobRanges(&knownBlobRanges, dbDataAD, ar, &added, &removed);
updateClientBlobRanges(1, &knownBlobRanges, dbDataAD, ar, &added, &removed);
ASSERT(added.size() == 1);
ASSERT(added[0] == rangeAB);
@ -5795,7 +5905,7 @@ TEST_CASE("/blobmanager/updateranges") {
kbrRanges.clear();
added.clear();
removed.clear();
updateClientBlobRanges(&knownBlobRanges, dbDataAB_CD, ar, &added, &removed);
updateClientBlobRanges(1, &knownBlobRanges, dbDataAB_CD, ar, &added, &removed);
ASSERT(added.size() == 0);
@ -5819,7 +5929,7 @@ TEST_CASE("/blobmanager/updateranges") {
kbrRanges.clear();
added.clear();
removed.clear();
updateClientBlobRanges(&knownBlobRanges, dbDataBC, ar, &added, &removed);
updateClientBlobRanges(1, &knownBlobRanges, dbDataBC, ar, &added, &removed);
ASSERT(added.size() == 1);
ASSERT(added[0] == rangeBC);

View File

@ -748,13 +748,15 @@ public:
}
std::string propValue = "";
ASSERT(shard->db->GetProperty(shard->cf, rocksdb::DB::Properties::kCFStats, &propValue));
TraceEvent(SevInfo, "PhysicalShardCFStats").detail("ShardId", id).detail("Detail", propValue);
TraceEvent(SevInfo, "PhysicalShardCFStats")
.detail("PhysicalShardID", id)
.detail("Detail", propValue);
// Get compression ratio for each level.
rocksdb::ColumnFamilyMetaData cfMetadata;
shard->db->GetColumnFamilyMetaData(shard->cf, &cfMetadata);
TraceEvent e(SevInfo, "PhysicalShardLevelStats");
e.detail("ShardId", id);
e.detail("PhysicalShardID", id);
std::string levelProp;
for (auto it = cfMetadata.levels.begin(); it != cfMetadata.levels.end(); ++it) {
std::string propValue = "";
@ -816,7 +818,8 @@ public:
}
physicalShards[shard->id] = shard;
columnFamilyMap[handle->GetID()] = handle;
TraceEvent(SevVerbose, "ShardedRocksInitPhysicalShard", this->logId).detail("PhysicalShard", shard->id);
TraceEvent(SevVerbose, "ShardedRocksInitPhysicalShard", this->logId)
.detail("PhysicalShardID", shard->id);
}
std::set<std::string> unusedShards(columnFamilies.begin(), columnFamilies.end());
@ -995,7 +998,7 @@ public:
}
PhysicalShard* addRange(KeyRange range, std::string id) {
TraceEvent(SevInfo, "ShardedRocksAddRangeBegin", this->logId)
TraceEvent(SevVerbose, "ShardedRocksAddRangeBegin", this->logId)
.detail("Range", range)
.detail("PhysicalShardID", id);
@ -1003,11 +1006,23 @@ public:
auto ranges = dataShardMap.intersectingRanges(range);
for (auto it = ranges.begin(); it != ranges.end(); ++it) {
if (it.value() != nullptr && it.value()->physicalShard->id != id) {
TraceEvent(SevError, "ShardedRocksAddOverlappingRanges")
.detail("IntersectingRange", it->range())
.detail("DataShardRange", it->value()->range)
.detail("PhysicalShard", it->value()->physicalShard->toString());
if (it.value()) {
if (it.value()->physicalShard->id == id) {
TraceEvent(SevError, "ShardedRocksDBAddRange")
.detail("ErrorType", "RangeAlreadyExist")
.detail("IntersectingRange", it->range())
.detail("DataShardRange", it->value()->range)
.detail("ExpectedShardId", id)
.detail("PhysicalShardID", it->value()->physicalShard->toString());
} else {
TraceEvent(SevError, "ShardedRocksDBAddRange")
.detail("ErrorType", "ConflictingRange")
.detail("IntersectingRange", it->range())
.detail("DataShardRange", it->value()->range)
.detail("ExpectedShardId", id)
.detail("PhysicalShardID", it->value()->physicalShard->toString());
}
return nullptr;
}
}
@ -1023,7 +1038,7 @@ public:
validate();
TraceEvent(SevInfo, "ShardedRocksAddRangeEnd", this->logId)
TraceEvent(SevInfo, "ShardedRocksDBRangeAdded", this->logId)
.detail("Range", range)
.detail("PhysicalShardID", id);
@ -1032,7 +1047,6 @@ public:
std::vector<std::string> removeRange(KeyRange range) {
TraceEvent(SevInfo, "ShardedRocksRemoveRangeBegin", this->logId).detail("Range", range);
std::vector<std::string> shardIds;
std::vector<DataShard*> newShards;
@ -1049,6 +1063,22 @@ public:
auto existingShard = it.value()->physicalShard;
auto shardRange = it.range();
if (SERVER_KNOBS->ROCKSDB_EMPTY_RANGE_CHECK) {
// Consistency check: the range being removed is expected to already be empty.
RangeResult rangeResult;
auto bytesRead = readRangeInDb(existingShard, range, 1, UINT16_MAX, &rangeResult);
if (bytesRead > 0) {
TraceEvent(SevError, "ShardedRocksDBRangeNotEmpty")
.detail("PhysicalShard", existingShard->toString())
.detail("Range", range)
.detail("DataShardRange", shardRange);
}
// Force clear range.
writeBatch->DeleteRange(it.value()->physicalShard->cf, toSlice(range.begin), toSlice(range.end));
dirtyShards->insert(it.value()->physicalShard);
}
TraceEvent(SevDebug, "ShardedRocksRemoveRange")
.detail("Range", range)
.detail("IntersectingRange", shardRange)
@ -1056,6 +1086,7 @@ public:
.detail("PhysicalShard", existingShard->toString());
ASSERT(it.value()->range == shardRange); // Ranges should be consistent.
if (range.contains(shardRange)) {
existingShard->dataShards.erase(shardRange.begin.toString());
TraceEvent(SevInfo, "ShardedRocksRemovedRange")
@ -1077,9 +1108,11 @@ public:
if (shardRange.begin < range.begin) {
auto dataShard =
std::make_unique<DataShard>(KeyRange(KeyRangeRef(shardRange.begin, range.begin)), existingShard);
newShards.push_back(dataShard.get());
const std::string msg = "Shrink shard from " + Traceable<KeyRangeRef>::toString(shardRange) + " to " +
Traceable<KeyRangeRef>::toString(dataShard->range);
existingShard->dataShards[shardRange.begin.toString()] = std::move(dataShard);
logShardEvent(existingShard->id, shardRange, ShardOp::MODIFY_RANGE, SevInfo, msg);
}
@ -1087,9 +1120,11 @@ public:
if (shardRange.end > range.end) {
auto dataShard =
std::make_unique<DataShard>(KeyRange(KeyRangeRef(range.end, shardRange.end)), existingShard);
newShards.push_back(dataShard.get());
const std::string msg = "Shrink shard from " + Traceable<KeyRangeRef>::toString(shardRange) + " to " +
Traceable<KeyRangeRef>::toString(dataShard->range);
existingShard->dataShards[range.end.toString()] = std::move(dataShard);
logShardEvent(existingShard->id, shardRange, ShardOp::MODIFY_RANGE, SevInfo, msg);
}
@ -1199,7 +1234,7 @@ public:
.detail("Action", "PersistRangeMapping")
.detail("BeginKey", it.range().begin)
.detail("EndKey", it.range().end)
.detail("ShardId", it.value()->physicalShard->id);
.detail("PhysicalShardID", it.value()->physicalShard->id);
} else {
// Empty range.
@ -1208,7 +1243,7 @@ public:
.detail("Action", "PersistRangeMapping")
.detail("BeginKey", it.range().begin)
.detail("EndKey", it.range().end)
.detail("ShardId", "None");
.detail("PhysicalShardID", "None");
}
lastKey = it.range().end;
}
@ -1303,6 +1338,23 @@ public:
return dataMap;
}
CoalescedKeyRangeMap<std::string> getExistingRanges() {
CoalescedKeyRangeMap<std::string> existingRanges;
existingRanges.insert(allKeys, "");
for (auto it : dataShardMap.intersectingRanges(allKeys)) {
if (!it.value()) {
continue;
}
if (it.value()->physicalShard->id == "kvs-metadata") {
continue;
}
existingRanges.insert(it.range(), it.value()->physicalShard->id);
}
return existingRanges;
}
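
getExistingRanges flattens the data-shard map into a range-to-physical-shard-id mapping, skipping null entries and the internal "kvs-metadata" shard. A sketch over a sorted vector of (rangeBegin, shardId) pairs, assuming adjacent ranges with equal IDs should coalesce:

.. code-block::

#include <string>
#include <utility>
#include <vector>

// Flatten a sorted list of (rangeBegin, shardId) pairs into coalesced
// ranges, reporting the internal metadata shard as unassigned (""), in
// the spirit of getExistingRanges() above.
std::vector<std::pair<std::string, std::string>>
coalesce(std::vector<std::pair<std::string, std::string>> ranges) {
    std::vector<std::pair<std::string, std::string>> out;
    for (auto& [begin, id] : ranges) {
        if (id == "kvs-metadata")
            id.clear(); // internal shard is not part of the data mapping
        if (!out.empty() && out.back().second == id)
            continue; // same id as the previous range: already covered
        out.emplace_back(std::move(begin), std::move(id));
    }
    return out;
}
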
void validate() {
TraceEvent(SevVerbose, "ShardedRocksValidateShardManager", this->logId);
for (auto s = dataShardMap.ranges().begin(); s != dataShardMap.ranges().end(); ++s) {
@ -1316,6 +1368,10 @@ public:
e.detail("Shard", "Empty");
}
if (shard != nullptr) {
if (shard->range != static_cast<KeyRangeRef>(s->range())) {
TraceEvent(SevWarn, "ShardRangeMismatch").detail("Range", s->range());
}
ASSERT(shard->range == static_cast<KeyRangeRef>(s->range()));
ASSERT(shard->physicalShard != nullptr);
auto it = shard->physicalShard->dataShards.find(shard->range.begin.toString());
@ -3041,11 +3097,13 @@ struct ShardedRocksDBKeyValueStore : IKeyValueStore {
break;
}
auto shards = shardManager->getPendingDeletionShards(cleanUpDelay);
auto a = new Writer::RemoveShardAction(shards);
Future<Void> f = a->done.getFuture();
writeThread->post(a);
TraceEvent(SevInfo, "ShardedRocksDB").detail("DeleteEmptyShards", shards.size());
wait(f);
if (shards.size() > 0) {
auto a = new Writer::RemoveShardAction(shards);
Future<Void> f = a->done.getFuture();
writeThread->post(a);
TraceEvent(SevInfo, "ShardedRocksDB").detail("DeleteEmptyShards", shards.size());
wait(f);
}
}
} catch (Error& e) {
if (e.code() != error_code_actor_cancelled) {
@ -3107,6 +3165,8 @@ struct ShardedRocksDBKeyValueStore : IKeyValueStore {
return EncryptionAtRestMode(EncryptionAtRestMode::DISABLED);
}
CoalescedKeyRangeMap<std::string> getExistingRanges() override { return shardManager.getExistingRanges(); }
std::shared_ptr<ShardedRocksDBState> rState;
rocksdb::Options dbOptions;
ShardManager shardManager;

View File

@ -21,6 +21,7 @@
#include "fdbclient/BlobCipher.h"
#include "fdbrpc/sim_validation.h"
#include "fdbrpc/simulator.h"
#include "fdbserver/KmsConnectorInterface.h"
#include "fdbserver/Knobs.h"
@ -221,6 +222,20 @@ ACTOR Future<Void> blobMetadataLookup(KmsConnectorInterface interf, KmsConnBlobM
wait(delay(deterministicRandom()->random01())); // simulate network delay
// buggify errors or omitted tenants in response
if (!g_simulator->speedUpSimulation && BUGGIFY_WITH_PROB(0.01)) {
if (deterministicRandom()->coinflip()) {
// remove some number of tenants from the response
int targetSize = deterministicRandom()->randomInt(0, rep.metadataDetails.size());
while (rep.metadataDetails.size() > targetSize) {
swapAndPop(&rep.metadataDetails, deterministicRandom()->randomInt(0, rep.metadataDetails.size()));
}
} else {
req.reply.sendError(operation_failed());
return Void();
}
}
req.reply.send(rep);
return Void();
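
The buggified path above drops a random subset of tenants from the reply via swapAndPop, which removes an arbitrary element in O(1) at the cost of ordering. A minimal sketch of that helper:

.. code-block::

#include <cstddef>
#include <utility>
#include <vector>

// Remove the element at idx in O(1) by swapping it with the last element
// and popping; order is not preserved, which is fine for a reply vector.
template <class T>
void swapAndPop(std::vector<T>& v, size_t idx) {
    std::swap(v[idx], v.back());
    v.pop_back();
}

Shrinking the reply to a random targetSize is then just a loop of swapAndPop calls at random indices, as in the code above.
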

View File

@ -85,6 +85,9 @@ public:
// Persists key range and physical shard mapping.
virtual void persistRangeMapping(KeyRangeRef range, bool isAdd) {}
// Returns key range to physical shard mapping.
virtual CoalescedKeyRangeMap<std::string> getExistingRanges() { throw not_implemented(); }
// To debug MEMORY_RADIXTREE type ONLY
// Returns (1) how many key & value pairs have been inserted (2) how many nodes have been created (3) how many
// key size is less than 12 bytes

View File

@ -226,10 +226,12 @@ private:
ASSERT(tenantMapItr != managementData.tenantData.tenantMap.end());
MetaclusterTenantMapEntry const& metaclusterEntry = tenantMapItr->second;
ASSERT_EQ(entry.id, metaclusterEntry.id);
ASSERT(entry.tenantName == metaclusterEntry.tenantName);
if (!self->allowPartialMetaclusterOperations) {
ASSERT_EQ(metaclusterEntry.tenantState, MetaclusterAPI::TenantState::READY);
ASSERT(entry.tenantName == metaclusterEntry.tenantName);
} else if (entry.tenantName != metaclusterEntry.tenantName) {
ASSERT(entry.tenantName == metaclusterEntry.renameDestination);
}
if (metaclusterEntry.tenantState != MetaclusterAPI::TenantState::UPDATING_CONFIGURATION &&
metaclusterEntry.tenantState != MetaclusterAPI::TenantState::REMOVING) {
@ -240,6 +242,8 @@ private:
if (entry.configurationSequenceNum == metaclusterEntry.configurationSequenceNum) {
ASSERT(entry.tenantGroup == metaclusterEntry.tenantGroup);
ASSERT_EQ(entry.tenantLockState, metaclusterEntry.tenantLockState);
ASSERT(entry.tenantLockId == metaclusterEntry.tenantLockId);
}
}

View File

@ -164,11 +164,17 @@ private:
ASSERT_EQ(t.size(), 3);
TenantName tenantName = t.getString(1);
int64_t tenantId = t.getInt(2);
bool renaming = tenantId == TenantInfo::INVALID_TENANT;
if (renaming) {
tenantId = self->managementMetadata.tenantData.tenantNameIndex[tenantName];
}
MetaclusterTenantMapEntry const& entry = self->managementMetadata.tenantData.tenantMap[tenantId];
bool renaming =
entry.tenantState == MetaclusterAPI::TenantState::RENAMING && entry.renameDestination == tenantName;
ASSERT(tenantName == entry.tenantName || renaming);
if (!renaming) {
if (renaming) {
ASSERT(entry.tenantState == MetaclusterAPI::TenantState::RENAMING ||
entry.tenantState == MetaclusterAPI::TenantState::REMOVING);
ASSERT(entry.renameDestination == tenantName);
} else {
ASSERT(entry.tenantName == tenantName);
ASSERT(self->managementMetadata.clusterTenantMap[t.getString(0)].insert(tenantId).second);
}
}

View File

@ -76,6 +76,8 @@ private:
} else {
ASSERT(!tenantsInTenantGroupIndex.count(tenantId));
}
ASSERT_NE(tenantMapEntry.tenantLockState == TenantAPI::TenantLockState::UNLOCKED,
tenantMapEntry.tenantLockId.present());
}
}

View File

@ -395,6 +395,8 @@ struct StorageServerDisk {
void persistRangeMapping(KeyRangeRef range, bool isAdd) { storage->persistRangeMapping(range, isAdd); }
CoalescedKeyRangeMap<std::string> getExistingRanges() { return storage->getExistingRanges(); }
Future<Void> getError() { return storage->getError(); }
Future<Void> init() { return storage->init(); }
Future<Void> canCommit() { return storage->canCommit(); }
@ -11197,6 +11199,72 @@ ACTOR Future<Void> serveChangeFeedVersionUpdateRequests(
}
}
ACTOR Future<Void> storageEngineConsistencyCheck(StorageServer* self) {
if (SERVER_KNOBS->STORAGE_SHARD_CONSISTENCY_CHECK_INTERVAL <= 0.0) {
return Void();
}
if (self->storage.getKeyValueStoreType() != KeyValueStoreType::SSD_SHARDED_ROCKSDB) {
return Void();
}
loop {
wait(delay(SERVER_KNOBS->STORAGE_SHARD_CONSISTENCY_CHECK_INTERVAL));
// Only validate when storage server and storage engine are expected to have the same shard mapping.
while (self->pendingAddRanges.size() > 0 || self->pendingRemoveRanges.size() > 0) {
wait(delay(5.0));
}
CoalescedKeyRangeMap<std::string> currentShards;
currentShards.insert(allKeys, "");
auto fullRange = self->shards.ranges();
for (auto it = fullRange.begin(); it != fullRange.end(); ++it) {
if (!it.value()) {
continue;
}
if (it.value()->assigned()) {
currentShards.insert(it.range(), format("%016llx", it.value()->shardId));
}
}
auto kvShards = self->storage.getExistingRanges();
TraceEvent(SevInfo, "StorageEngineConsistencyCheckStarted").log();
auto kvRanges = kvShards.ranges();
for (auto it = kvRanges.begin(); it != kvRanges.end(); ++it) {
if (it.value() == "") {
continue;
}
for (auto v : currentShards.intersectingRanges(it.range())) {
if (v.value() == "") {
TraceEvent(SevWarn, "MissingShardSS").detail("Range", v.range()).detail("ShardIdKv", it.value());
} else if (v.value() != it.value()) {
TraceEvent(SevWarn, "ShardMismatch")
.detail("Range", v.range())
.detail("ShardIdKv", it.value())
.detail("ShardIdSs", v.value());
}
}
}
for (auto it : currentShards.ranges()) {
if (it.value() == "") {
continue;
}
for (auto v : kvShards.intersectingRanges(it.range())) {
if (v.value() == "") {
TraceEvent(SevWarn, "MissingShardKv").detail("Range", v.range()).detail("ShardIdSS", it.value());
}
}
}
TraceEvent(SevInfo, "StorageEngineConsistencyCheckComplete");
}
}
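
storageEngineConsistencyCheck cross-validates two range-to-shard-id maps in both directions: ranges assigned in the storage server but missing or mismatched in the engine, then ranges present in the engine but unassigned in the server. A sketch comparing two maps keyed by range-begin, assuming both sides share range boundaries and "" means unassigned:

.. code-block::

#include <cstdio>
#include <map>
#include <string>

// Report mismatches between the storage server's view (ss) and the
// storage engine's view (kv). Assumes both maps share range boundaries,
// keyed here by range-begin; "" means unassigned.
void crossValidate(const std::map<std::string, std::string>& ss,
                   const std::map<std::string, std::string>& kv) {
    for (const auto& [begin, kvId] : kv) {
        if (kvId.empty()) continue;
        auto it = ss.find(begin);
        if (it == ss.end() || it->second.empty())
            std::printf("MissingShardSS at %s (kv=%s)\n", begin.c_str(), kvId.c_str());
        else if (it->second != kvId)
            std::printf("ShardMismatch at %s (kv=%s ss=%s)\n",
                        begin.c_str(), kvId.c_str(), it->second.c_str());
    }
    for (const auto& [begin, ssId] : ss) {
        if (ssId.empty()) continue;
        auto it = kv.find(begin);
        if (it == kv.end() || it->second.empty())
            std::printf("MissingShardKv at %s (ss=%s)\n", begin.c_str(), ssId.c_str());
    }
}
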
ACTOR Future<Void> reportStorageServerState(StorageServer* self) {
if (!SERVER_KNOBS->REPORT_DD_METRICS) {
return Void();
@ -11253,6 +11321,7 @@ ACTOR Future<Void> storageServerCore(StorageServer* self, StorageServerInterface
self->actors.add(serveChangeFeedVersionUpdateRequests(self, ssi.changeFeedVersionUpdate.getFuture()));
self->actors.add(traceRole(Role::STORAGE_SERVER, ssi.id()));
self->actors.add(reportStorageServerState(self));
self->actors.add(storageEngineConsistencyCheck(self));
self->transactionTagCounter.startNewInterval();
self->actors.add(

View File

@ -19,10 +19,19 @@
*/
#include <boost/algorithm/string/predicate.hpp>
#include <boost/filesystem.hpp>
#include <boost/filesystem/directory.hpp>
#include <boost/filesystem/file_status.hpp>
#include <boost/filesystem/path.hpp>
#include <boost/range/iterator_range.hpp>
#include <boost/range/iterator_range_core.hpp>
#include <cinttypes>
#include <fstream>
#include <functional>
#include <istream>
#include <iterator>
#include <map>
#include <streambuf>
#include <toml.hpp>
#include "flow/ActorCollection.h"
@ -53,6 +62,7 @@
#include "fdbserver/Knobs.h"
#include "fdbserver/WorkerInterface.actor.h"
#include "flow/Platform.h"
#include "flow/actorcompiler.h" // This must be the last #include.
FDB_DEFINE_BOOLEAN_PARAM(UntrustedMode);
@ -1781,6 +1791,53 @@ ACTOR Future<Void> initializeSimConfig(Database db) {
}
}
void encryptionAtRestPlaintextMarkerCheck() {
if (!g_network->isSimulated() || !g_simulator->dataAtRestPlaintextMarker.present()) {
// Encryption at-rest was not enabled, do nothing
return;
}
namespace fs = boost::filesystem;
printf("EncryptionAtRestPlaintextMarkerCheckStart\n");
TraceEvent("EncryptionAtRestPlaintextMarkerCheckStart");
fs::path p("simfdb/");
fs::recursive_directory_iterator end;
int scanned = 0;
bool success = true;
// Enumerate all files in the "simfdb/" folder and look for "marker" string
for (fs::recursive_directory_iterator itr(p); itr != end; ++itr) {
if (boost::filesystem::is_regular_file(itr->path())) {
std::ifstream f(itr->path().string().c_str());
if (f) {
std::string buf;
int count = 0;
while (std::getline(f, buf)) {
// SOMEDAY: using 'std::boyer_moore_horspool_searcher' would significantly improve search
// time
if (buf.find(g_simulator->dataAtRestPlaintextMarker.get()) != std::string::npos) {
TraceEvent(SevError, "EncryptionAtRestPlaintextMarkerCheckPanic")
.detail("Filename", itr->path().string())
.detail("LineBuf", buf)
.detail("Marker", g_simulator->dataAtRestPlaintextMarker.get());
success = false;
}
count++;
}
TraceEvent("EncryptionAtRestPlaintextMarkerCheckScanned")
.detail("Filename", itr->path().string())
.detail("NumLines", count);
scanned++;
} else {
TraceEvent(SevError, "FileOpenError").detail("Filename", itr->path().string());
}
}
ASSERT(success);
}
printf("EncryptionAtRestPlaintextMarkerCheckEnd NumFiles: %d\n", scanned);
TraceEvent("EncryptionAtRestPlaintextMarkerCheckEnd").detail("NumFiles", scanned);
}
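
encryptionAtRestPlaintextMarkerCheck walks everything under simfdb/ and fails the test if any file still contains the plaintext marker. An equivalent standalone sketch using std::filesystem (the production code uses boost::filesystem):

.. code-block::

#include <filesystem>
#include <fstream>
#include <string>

// Returns the number of files scanned; sets leaked if the plaintext
// marker was found anywhere under root (meaning at-rest encryption
// leaked cleartext to disk).
int scanForPlaintextMarker(const std::filesystem::path& root,
                           const std::string& marker,
                           bool& leaked) {
    int scanned = 0;
    leaked = false;
    for (const auto& entry : std::filesystem::recursive_directory_iterator(root)) {
        if (!entry.is_regular_file())
            continue;
        std::ifstream f(entry.path());
        std::string line;
        while (std::getline(f, line)) {
            if (line.find(marker) != std::string::npos)
                leaked = true;
        }
        ++scanned;
    }
    return scanned;
}
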
/**
* \brief Test orchestrator: sends test specification to testers in the right order and collects the results.
*
@ -2009,6 +2066,8 @@ ACTOR Future<Void> runTests(Reference<AsyncVar<Optional<struct ClusterController
}
printf("\n");
encryptionAtRestPlaintextMarkerCheck();
return Void();
}

View File

@ -18,13 +18,25 @@
* limitations under the License.
*/
#include "fdbclient/FDBTypes.h"
#include "fdbclient/ManagementAPI.actor.h"
#include "fdbclient/TenantManagement.actor.h"
#include "fdbrpc/simulator.h"
#include "fdbrpc/TenantInfo.h"
#include "fdbserver/QuietDatabase.h"
#include "fdbserver/MutationTracking.h"
#include "fdbserver/workloads/workloads.actor.h"
#include "fdbserver/workloads/ApiWorkload.h"
#include "fdbserver/workloads/MemoryKeyValueStore.h"
#include "flow/IRandom.h"
#include "flow/actorcompiler.h" // This must be the last #include.
#include "flow/genericactors.actor.h"
// Validate at-rest encryption guarantees. If enabled, the test injects a known 'marker' into Keys and/or Values
// inserted into FDB by the workload. On shutdown, all test-generated files (under simfdb/) are scanned to check
// whether the 'plaintext marker' is present.
const std::string ENCRYPTION_AT_REST_MARKER_STRING = "Expecto..Patronum...";
// An enum of API operation types used in the random test
enum OperationType { SET, GET, GET_RANGE, GET_RANGE_SELECTOR, GET_KEY, CLEAR, CLEAR_RANGE, UNINITIALIZED };
@ -98,6 +110,9 @@ public:
// Maximum time to reset DB to the original state
double resetDBTimeout;
// Validate data at-rest encryption guarantees
int validateEncryptionAtRest;
ApiCorrectnessWorkload(WorkloadContext const& wcx)
: ApiWorkload(wcx), numRandomOperations("Num Random Operations") {
numGets = getOption(options, "numGets"_sr, 1000);
@ -114,6 +129,11 @@ public:
int maxTransactionBytes = getOption(options, "maxTransactionBytes"_sr, 500000);
maxKeysPerTransaction = std::max(1, maxTransactionBytes / (maxValueLength + maxLongKeyLength));
validateEncryptionAtRest =
g_network->isSimulated()
? getOption(options, "validateEncryptionAtRest"_sr, deterministicRandom()->coinflip() ? 1 : 0)
: 0;
resetDBTimeout = getOption(options, "resetDBTimeout"_sr, 1800.0);
if (maxTransactionBytes > 500000) {
@ -143,6 +163,15 @@ public:
}
ACTOR Future<Void> performSetup(Database cx, ApiCorrectnessWorkload* self) {
DatabaseConfiguration dbConfig = wait(getDatabaseConfiguration(cx));
if (g_network->isSimulated() && dbConfig.encryptionAtRestMode.isEncryptionEnabled() &&
self->validateEncryptionAtRest) {
TraceEvent("EncryptionAtRestPlainTextMarkerCheckEnabled")
.detail("EncryptionMode", dbConfig.encryptionAtRestMode.toString())
.detail("DataAtRestMarker", ENCRYPTION_AT_REST_MARKER_STRING);
g_simulator->dataAtRestPlaintextMarker = ENCRYPTION_AT_REST_MARKER_STRING;
}
// Choose a random transaction type (NativeAPI, ReadYourWrites, ThreadSafe, MultiVersion)
std::vector<TransactionType> types;
types.push_back(NATIVE);

View File

@ -18,10 +18,21 @@
* limitations under the License.
*/
#include "fdbserver/workloads/ApiWorkload.h"
#include "fdbclient/FDBTypes.h"
#include "fdbclient/MultiVersionTransaction.h"
#include "fdbrpc/simulator.h"
#include "fdbrpc/TenantInfo.h"
#include "flow/Arena.h"
#include "flow/FastRef.h"
#include "flow/IRandom.h"
#include <cinttypes>
#include "fmt/format.h"
#include "fdbserver/workloads/ApiWorkload.h"
#include "fdbclient/MultiVersionTransaction.h"
#include "flow/actorcompiler.h" // This must be the last #include.
// Clears the keyspace used by this test
@ -249,6 +260,18 @@ Key ApiWorkload::generateKey(VectorRef<KeyValueRef> const& data,
keyBuffer[0] = deterministicRandom()->randomInt(0, 255);
}
// If encryption validation is enabled, slip the "marker pattern" in at a random location in the generated key
if (g_network->isSimulated() && g_simulator->dataAtRestPlaintextMarker.present() &&
keyLength + 1 > g_simulator->dataAtRestPlaintextMarker.get().size()) {
int len = keyLength - g_simulator->dataAtRestPlaintextMarker.get().size();
// Avoid updating the first byte of the key
int idx = len > 1 ? deterministicRandom()->randomInt(1, len) : 1;
memcpy(&keyBuffer[idx],
g_simulator->dataAtRestPlaintextMarker.get().c_str(),
g_simulator->dataAtRestPlaintextMarker.get().size());
//TraceEvent(SevDebug, "ModifiedKey").suppressFor(5).detail("Key", keyBuffer);
}
keyBuffer[keyLength] = '\0';
Key key(prefix + keyBuffer);
@ -276,7 +299,19 @@ Key ApiWorkload::selectRandomKey(VectorRef<KeyValueRef> const& data, double prob
// Generates a random value
Value ApiWorkload::generateValue(int minValueLength, int maxValueLength) {
int valueLength = deterministicRandom()->randomInt(minValueLength, maxValueLength + 1);
return Value(std::string(valueLength, 'x'));
std::string ret(valueLength, 'x');
// If encryption validation is enabled, slip the "marker pattern" in at a random location in the generated value
if (g_network->isSimulated() && g_simulator->dataAtRestPlaintextMarker.present() &&
valueLength > g_simulator->dataAtRestPlaintextMarker.get().size()) {
int len = valueLength - g_simulator->dataAtRestPlaintextMarker.get().size();
int idx = deterministicRandom()->randomInt(0, len);
memcpy(&ret[idx],
g_simulator->dataAtRestPlaintextMarker.get().c_str(),
g_simulator->dataAtRestPlaintextMarker.get().size());
//TraceEvent("ModifiedValue").suppressFor(5).detail("Value", ret);
}
return Value(ret);
}
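
Both generators slip the marker into the payload at a random offset so that, if at-rest encryption leaks plaintext, the marker shows up verbatim in the on-disk files scanned at shutdown. A sketch of the value-side injection, assuming a std::mt19937 in place of deterministicRandom():

.. code-block::

#include <cstring>
#include <random>
#include <string>

// Build a valueLength-byte filler value and, if the marker fits, overwrite
// a random offset with it so the plaintext marker ends up inside the value.
std::string generateMarkedValue(int valueLength, const std::string& marker, std::mt19937& rng) {
    std::string ret(valueLength, 'x');
    if (static_cast<size_t>(valueLength) > marker.size()) {
        std::uniform_int_distribution<int> dist(0, valueLength - static_cast<int>(marker.size()) - 1);
        std::memcpy(&ret[dist(rng)], marker.c_str(), marker.size());
    }
    return ret;
}
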
// Generates a random value

View File

@ -84,9 +84,10 @@ struct AuthzSecurityWorkload : TestWorkload {
publicNonTenantRequestPositive, tLogReadNegative, keyLocationLeakNegative, bgLocationLeakNegative,
crossTenantBGLocPositive, crossTenantBGLocNegative, crossTenantBGReqPositive, crossTenantBGReqNegative,
crossTenantBGReadPositive, crossTenantBGReadNegative, crossTenantGetGranulesPositive,
crossTenantGetGranulesNegative;
crossTenantGetGranulesNegative, blobbifyNegative, unblobbifyNegative, listBlobNegative, verifyBlobNegative,
flushBlobNegative, purgeBlobNegative;
std::vector<std::function<Future<Void>(Database cx)>> testFunctions;
bool checkBlobGranules;
bool checkBlobGranules, checkBlobManagement;
AuthzSecurityWorkload(WorkloadContext const& wcx)
: TestWorkload(wcx), crossTenantGetPositive("CrossTenantGetPositive"),
@ -98,7 +99,10 @@ struct AuthzSecurityWorkload : TestWorkload {
crossTenantBGReqPositive("CrossTenantBGReqPositive"), crossTenantBGReqNegative("CrossTenantBGReqNegative"),
crossTenantBGReadPositive("CrossTenantBGReadPositive"), crossTenantBGReadNegative("CrossTenantBGReadNegative"),
crossTenantGetGranulesPositive("CrossTenantGetGranulesPositive"),
crossTenantGetGranulesNegative("CrossTenantGetGranulesNegative") {
crossTenantGetGranulesNegative("CrossTenantGetGranulesNegative"), blobbifyNegative("BlobbifyNegative"),
unblobbifyNegative("UnblobbifyNegative"), listBlobNegative("ListBlobNegative"),
verifyBlobNegative("VerifyBlobNegative"), flushBlobNegative("FlushBlobNegative"),
purgeBlobNegative("PurgeBlobNegative") {
testDuration = getOption(options, "testDuration"_sr, 10.0);
transactionsPerSecond = getOption(options, "transactionsPerSecond"_sr, 500.0) / clientCount;
actorCount = getOption(options, "actorsPerClient"_sr, transactionsPerSecond / 5);
@ -106,6 +110,9 @@ struct AuthzSecurityWorkload : TestWorkload {
anotherTenantName = getOption(options, "tenantB"_sr, "authzSecurityTestTenant"_sr);
tLogConfigKey = getOption(options, "tLogConfigKey"_sr, "TLogInterface"_sr);
checkBlobGranules = getOption(options, "checkBlobGranules"_sr, false);
checkBlobManagement =
checkBlobGranules && getOption(options, "checkBlobManagement"_sr, sharedRandomNumber % 2 == 0);
sharedRandomNumber /= 2;
ASSERT(g_network->isSimulated());
// make it comfortably longer than the timeout of the workload
@ -142,6 +149,14 @@ struct AuthzSecurityWorkload : TestWorkload {
return testCrossTenantGetGranulesDisallowed(this, cx, PositiveTestcase::False);
});
}
if (checkBlobManagement) {
testFunctions.push_back([this](Database cx) { return testBlobbifyDisallowed(this, cx); });
testFunctions.push_back([this](Database cx) { return testUnblobbifyDisallowed(this, cx); });
testFunctions.push_back([this](Database cx) { return testListBlobDisallowed(this, cx); });
testFunctions.push_back([this](Database cx) { return testVerifyBlobDisallowed(this, cx); });
testFunctions.push_back([this](Database cx) { return testFlushBlobDisallowed(this, cx); });
testFunctions.push_back([this](Database cx) { return testPurgeBlobDisallowed(this, cx); });
}
}
Future<Void> setup(Database const& cx) override {
@ -178,6 +193,11 @@ struct AuthzSecurityWorkload : TestWorkload {
crossTenantBGReadNegative.getValue() > 0 && crossTenantGetGranulesPositive.getValue() > 0 &&
crossTenantGetGranulesNegative.getValue() > 0;
}
if (checkBlobManagement) {
success &= blobbifyNegative.getValue() > 0 && unblobbifyNegative.getValue() > 0 &&
listBlobNegative.getValue() > 0 && verifyBlobNegative.getValue() > 0 &&
flushBlobNegative.getValue() > 0 && purgeBlobNegative.getValue() > 0;
}
return success;
}
@ -200,6 +220,14 @@ struct AuthzSecurityWorkload : TestWorkload {
m.push_back(crossTenantGetGranulesPositive.getMetric());
m.push_back(crossTenantGetGranulesNegative.getMetric());
}
if (checkBlobManagement) {
m.push_back(blobbifyNegative.getMetric());
m.push_back(unblobbifyNegative.getMetric());
m.push_back(listBlobNegative.getMetric());
m.push_back(verifyBlobNegative.getMetric());
m.push_back(flushBlobNegative.getMetric());
m.push_back(purgeBlobNegative.getMetric());
}
}
void setAuthToken(Transaction& tr, StringRef token) {
@ -776,6 +804,77 @@ struct AuthzSecurityWorkload : TestWorkload {
return Void();
}
ACTOR static Future<Void> checkBlobManagementNegative(AuthzSecurityWorkload* self,
std::string opName,
Future<Void> op,
PerfIntCounter* counter) {
try {
wait(op);
ASSERT(false);
} catch (Error& e) {
if (e.code() == error_code_operation_cancelled) {
throw e;
}
if (e.code() == error_code_permission_denied) {
++(*counter);
} else {
TraceEvent(SevError, "AuthzSecurityBlobManagementAllowed")
.detail("OpType", opName)
.detail("TenantId", self->tenant->id());
}
}
return Void();
}
ACTOR static Future<Void> testBlobbifyDisallowed(AuthzSecurityWorkload* self, Database cx) {
Future<Void> op;
if (deterministicRandom()->coinflip()) {
op = success(cx->blobbifyRange(normalKeys, self->tenant));
} else {
op = success(cx->blobbifyRangeBlocking(normalKeys, self->tenant));
}
wait(checkBlobManagementNegative(self, "Blobbify", op, &self->blobbifyNegative));
return Void();
}
ACTOR static Future<Void> testUnblobbifyDisallowed(AuthzSecurityWorkload* self, Database cx) {
wait(checkBlobManagementNegative(
self, "Unblobbify", success(cx->unblobbifyRange(normalKeys, self->tenant)), &self->unblobbifyNegative));
return Void();
}
ACTOR static Future<Void> testListBlobDisallowed(AuthzSecurityWorkload* self, Database cx) {
wait(checkBlobManagementNegative(self,
"ListBlob",
success(cx->listBlobbifiedRanges(normalKeys, 1000, self->tenant)),
&self->listBlobNegative));
return Void();
}
ACTOR static Future<Void> testVerifyBlobDisallowed(AuthzSecurityWorkload* self, Database cx) {
wait(checkBlobManagementNegative(
self, "VerifyBlob", success(cx->verifyBlobRange(normalKeys, {}, self->tenant)), &self->verifyBlobNegative));
return Void();
}
ACTOR static Future<Void> testFlushBlobDisallowed(AuthzSecurityWorkload* self, Database cx) {
wait(checkBlobManagementNegative(
self,
"FlushBlob",
success(cx->flushBlobRange(normalKeys, {}, deterministicRandom()->coinflip(), self->tenant)),
&self->flushBlobNegative));
return Void();
}
ACTOR static Future<Void> testPurgeBlobDisallowed(AuthzSecurityWorkload* self, Database cx) {
wait(checkBlobManagementNegative(
self,
"PurgeBlob",
success(cx->purgeBlobGranules(normalKeys, 1, self->tenant, deterministicRandom()->coinflip())),
&self->purgeBlobNegative));
return Void();
}
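
Each negative test above funnels through checkBlobManagementNegative, which asserts that the management operation fails with permission_denied and bumps the corresponding counter. A generic plain-C++ sketch of the same expect-error pattern, with a hypothetical PermissionDenied error type:

.. code-block::

#include <functional>
#include <stdexcept>

struct PermissionDenied : std::runtime_error {
    PermissionDenied() : std::runtime_error("permission_denied") {}
};

// Runs op and returns true only if it failed with PermissionDenied;
// succeeding at all, or failing with another error, is a test failure.
bool expectPermissionDenied(const std::function<void()>& op) {
    try {
        op();
        return false; // op was allowed: authorization hole
    } catch (const PermissionDenied&) {
        return true; // expected outcome for an unauthorized tenant
    } catch (...) {
        return false; // wrong error
    }
}
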
ACTOR static Future<Void> runTestClient(AuthzSecurityWorkload* self, Database cx) {
state double lastTime = now();
state double delay = self->actorCount / self->transactionsPerSecond;

View File

@ -50,7 +50,7 @@ struct MetaclusterManagementConcurrencyWorkload : TestWorkload {
double testDuration;
MetaclusterManagementConcurrencyWorkload(WorkloadContext const& wcx) : TestWorkload(wcx) {
testDuration = getOption(options, "testDuration"_sr, 120.0);
testDuration = getOption(options, "testDuration"_sr, 90.0);
}
Future<Void> setup(Database const& cx) override { return _setup(cx, this); }
@ -83,11 +83,6 @@ struct MetaclusterManagementConcurrencyWorkload : TestWorkload {
ClusterName chooseClusterName() { return dataDbIndex[deterministicRandom()->randomInt(0, dataDbIndex.size())]; }
static Future<Void> verifyClusterRecovered(Database db) {
return success(runTransaction(db.getReference(),
[](Reference<ReadYourWritesTransaction> tr) { return tr->getReadVersion(); }));
}
ACTOR static Future<Void> registerCluster(MetaclusterManagementConcurrencyWorkload* self) {
state ClusterName clusterName = self->chooseClusterName();
state Database dataDb = self->dataDbs[clusterName];
@ -120,22 +115,27 @@ struct MetaclusterManagementConcurrencyWorkload : TestWorkload {
.error(e)
.detail("ClusterName", clusterName);
if (e.code() != error_code_cluster_already_exists && e.code() != error_code_cluster_not_empty &&
e.code() != error_code_cluster_already_registered && e.code() != error_code_cluster_removed) {
e.code() != error_code_cluster_already_registered && e.code() != error_code_cluster_removed &&
e.code() != error_code_cluster_restoring) {
TraceEvent(SevError, "MetaclusterManagementConcurrencyRegisterClusterFailure", debugId)
.error(e)
.detail("ClusterName", clusterName);
ASSERT(false);
}
wait(success(errorOr(MetaclusterAPI::removeCluster(
self->managementDb, clusterName, ClusterType::METACLUSTER_MANAGEMENT, ForceRemove::True))));
return Void();
}
wait(verifyClusterRecovered(dataDb));
return Void();
}
ACTOR static Future<Void> removeCluster(MetaclusterManagementConcurrencyWorkload* self) {
state ClusterName clusterName = self->chooseClusterName();
state Database dataDb = self->dataDbs[clusterName];
state ForceRemove forceRemove(deterministicRandom()->coinflip());
state UID debugId = deterministicRandom()->randomUniqueID();
@ -166,7 +166,6 @@ struct MetaclusterManagementConcurrencyWorkload : TestWorkload {
return Void();
}
wait(verifyClusterRecovered(dataDb));
return Void();
}
@ -307,7 +306,7 @@ struct MetaclusterManagementConcurrencyWorkload : TestWorkload {
.detail("NewNumTenantGroups", newNumTenantGroups.orDefault(-1))
.detail("NewConnectionString", connectionString.map(&ClusterConnectionString::toString).orDefault(""));
if (e.code() != error_code_cluster_not_found && e.code() != error_code_cluster_removed &&
e.code() != error_code_invalid_metacluster_operation) {
e.code() != error_code_invalid_metacluster_operation && e.code() != error_code_cluster_restoring) {
TraceEvent(SevError, "MetaclusterManagementConcurrencyConfigureClusterFailure")
.error(e)
.detail("ClusterName", clusterName);
@ -318,13 +317,104 @@ struct MetaclusterManagementConcurrencyWorkload : TestWorkload {
return Void();
}
ACTOR static Future<Void> restoreCluster(MetaclusterManagementConcurrencyWorkload* self) {
state ClusterName clusterName = self->chooseClusterName();
state Database db = self->dataDbs[clusterName];
state ApplyManagementClusterUpdates applyManagementClusterUpdates(deterministicRandom()->coinflip());
state ForceJoin forceJoin(deterministicRandom()->coinflip());
state bool removeFirst = !applyManagementClusterUpdates && deterministicRandom()->coinflip();
state UID debugId = deterministicRandom()->randomUniqueID();
try {
TraceEvent(SevDebug, "MetaclusterManagementConcurrencyRestore", debugId)
.detail("ClusterName", clusterName)
.detail("ApplyManagementClusterUpdates", applyManagementClusterUpdates);
if (removeFirst) {
TraceEvent(SevDebug, "MetaclusterManagementConcurrencyRestoreRemoveDataCluster", debugId)
.detail("ClusterName", clusterName)
.detail("ApplyManagementClusterUpdates", applyManagementClusterUpdates);
wait(success(MetaclusterAPI::removeCluster(
self->managementDb, clusterName, ClusterType::METACLUSTER_MANAGEMENT, ForceRemove::True)));
TraceEvent(SevDebug, "MetaclusterManagementConcurrencyRestoreRemovedDataCluster", debugId)
.detail("ClusterName", clusterName)
.detail("ApplyManagementClusterUpdates", applyManagementClusterUpdates);
}
state std::vector<std::string> messages;
if (deterministicRandom()->coinflip()) {
TraceEvent(SevDebug, "MetaclusterManagementConcurrencyRestoreDryRun", debugId)
.detail("ClusterName", clusterName)
.detail("ApplyManagementClusterUpdates", applyManagementClusterUpdates);
wait(MetaclusterAPI::restoreCluster(self->managementDb,
clusterName,
db->getConnectionRecord()->getConnectionString(),
applyManagementClusterUpdates,
RestoreDryRun::True,
forceJoin,
&messages));
TraceEvent(SevDebug, "MetaclusterManagementConcurrencyRestoreDryRunDone", debugId)
.detail("ClusterName", clusterName)
.detail("ApplyManagementClusterUpdates", applyManagementClusterUpdates);
messages.clear();
}
wait(MetaclusterAPI::restoreCluster(self->managementDb,
clusterName,
db->getConnectionRecord()->getConnectionString(),
applyManagementClusterUpdates,
RestoreDryRun::False,
forceJoin,
&messages));
TraceEvent(SevDebug, "MetaclusterManagementConcurrencyRestoreComplete", debugId)
.detail("ClusterName", clusterName)
.detail("ApplyManagementClusterUpdates", applyManagementClusterUpdates);
} catch (Error& e) {
TraceEvent(SevDebug, "MetaclusterManagementConcurrencyRestoreError", debugId)
.error(e)
.detail("ClusterName", clusterName)
.detail("ApplyManagementClusterUpdates", applyManagementClusterUpdates);
if (e.code() == error_code_cluster_already_registered) {
ASSERT(!forceJoin || applyManagementClusterUpdates == ApplyManagementClusterUpdates::False);
} else if (applyManagementClusterUpdates == ApplyManagementClusterUpdates::True &&
e.code() == error_code_invalid_data_cluster) {
// Restoring a data cluster can fail if the cluster is not actually a data cluster registered with
// the metacluster
} else if (applyManagementClusterUpdates == ApplyManagementClusterUpdates::False &&
(e.code() == error_code_cluster_already_exists || e.code() == error_code_tenant_already_exists ||
e.code() == error_code_invalid_tenant_configuration)) {
// Repopulating a management cluster can fail if the cluster is already in the metacluster
} else if (e.code() != error_code_cluster_not_found && e.code() != error_code_cluster_removed &&
e.code() != error_code_conflicting_restore) {
TraceEvent(SevError, "MetaclusterManagementConcurrencyRestoreFailure", debugId)
.error(e)
.detail("ClusterName", clusterName)
.detail("ApplyManagementClusterUpdates", applyManagementClusterUpdates);
ASSERT(false);
}
wait(success(errorOr(MetaclusterAPI::removeCluster(
self->managementDb, clusterName, ClusterType::METACLUSTER_MANAGEMENT, ForceRemove::True))));
}
return Void();
}
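
The cleanup path above wraps removeCluster in errorOr(success(...)); a minimal sketch of those flow combinators, with signatures assumed from flow's genericactors.actor.h:

// Hedged sketch of the combinators used in the cleanup wait() above (assumed signatures):
//   success(Future<T> f)  -> Future<Void>        : discard f's value, keep completion/errors
//   errorOr(Future<T> f)  -> Future<ErrorOr<T>>  : never throws; a thrown error becomes a value
// Composed as errorOr(success(...)), the cluster removal becomes best-effort cleanup whose
// failures are swallowed rather than failing the workload.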
Future<Void> start(Database const& cx) override { return _start(cx, this); }
ACTOR static Future<Void> _start(Database cx, MetaclusterManagementConcurrencyWorkload* self) {
state double start = now();
// Run a random sequence of metacluster management operations for the duration of the test
while (now() < start + self->testDuration) {
state int operation = deterministicRandom()->randomInt(0, 6);
if (operation == 0) {
wait(registerCluster(self));
} else if (operation == 1) {
@ -335,6 +425,8 @@ struct MetaclusterManagementConcurrencyWorkload : TestWorkload {
wait(getCluster(self));
} else if (operation == 4) {
wait(configureCluster(self));
} else if (operation == 5) {
wait(restoreCluster(self));
}
}
View File
@ -85,9 +85,9 @@ struct TenantLock : TestWorkload {
UID lockID) {
state Reference<Tenant> tenant = makeReference<Tenant>(db, name);
state ReadYourWritesTransaction tr(db);
loop {
try {
tr.setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
wait(tenant->ready());
wait(TenantAPI::changeLockState(&tr, tenant->id(), desiredState, lockID));
wait(tr.commit());
View File
@ -25,6 +25,7 @@
#include "fdbclient/GenericManagementAPI.actor.h"
#include "fdbclient/MetaclusterManagement.actor.h"
#include "fdbclient/ReadYourWrites.h"
#include "fdbclient/Tenant.h"
#include "fdbclient/TenantManagement.actor.h"
#include "fdbclient/ThreadSafeTransaction.h"
#include "fdbrpc/simulator.h"
@ -214,7 +215,8 @@ struct TenantManagementConcurrencyWorkload : TestWorkload {
.error(e)
.detail("TenantName", entry.tenantName)
.detail("TenantGroup", entry.tenantGroup);
if (e.code() == error_code_metacluster_no_capacity || e.code() == error_code_cluster_removed ||
e.code() == error_code_cluster_restoring) {
ASSERT(self->useMetacluster && !self->createMetacluster);
} else if (e.code() == error_code_tenant_removed) {
ASSERT(self->useMetacluster);
@ -254,7 +256,7 @@ struct TenantManagementConcurrencyWorkload : TestWorkload {
TraceEvent(SevDebug, "TenantManagementConcurrencyDeleteTenantError", debugId)
.error(e)
.detail("TenantName", tenant);
if (e.code() == error_code_cluster_removed || e.code() == error_code_cluster_restoring) {
ASSERT(self->useMetacluster && !self->createMetacluster);
} else if (e.code() != error_code_tenant_not_found) {
TraceEvent(SevError, "TenantManagementConcurrencyDeleteTenantFailure", debugId)
@ -326,7 +328,7 @@ struct TenantManagementConcurrencyWorkload : TestWorkload {
.error(e)
.detail("TenantName", tenant)
.detail("TenantGroup", tenantGroup);
if (e.code() == error_code_cluster_removed || e.code() == error_code_cluster_restoring) {
ASSERT(self->useMetacluster && !self->createMetacluster);
} else if (e.code() == error_code_cluster_no_capacity ||
e.code() == error_code_invalid_tenant_configuration) {
@ -371,7 +373,7 @@ struct TenantManagementConcurrencyWorkload : TestWorkload {
.error(e)
.detail("OldTenantName", oldTenant)
.detail("NewTenantName", newTenant);
if (e.code() == error_code_cluster_removed || e.code() == error_code_cluster_restoring) {
ASSERT(self->useMetacluster && !self->createMetacluster);
} else if (e.code() == error_code_invalid_tenant_state || e.code() == error_code_tenant_removed ||
e.code() == error_code_cluster_no_capacity) {
@ -387,6 +389,89 @@ struct TenantManagementConcurrencyWorkload : TestWorkload {
}
}
ACTOR static Future<Void> changeLockStateImpl(TenantManagementConcurrencyWorkload* self,
TenantName tenant,
TenantAPI::TenantLockState lockState,
bool useExistingId) {
state UID lockId;
if (self->useMetacluster) {
MetaclusterTenantMapEntry entry = wait(MetaclusterAPI::getTenant(self->mvDb, tenant));
if (useExistingId && entry.tenantLockId.present()) {
lockId = entry.tenantLockId.get();
} else {
lockId = deterministicRandom()->randomUniqueID();
}
wait(MetaclusterAPI::changeTenantLockState(self->mvDb, tenant, lockState, lockId));
} else {
state Reference<ReadYourWritesTransaction> tr = self->dataDb->createTransaction();
loop {
try {
tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
TenantMapEntry entry = wait(TenantAPI::getTenantTransaction(tr, tenant));
if (useExistingId && entry.tenantLockId.present()) {
lockId = entry.tenantLockId.get();
} else {
lockId = deterministicRandom()->randomUniqueID();
}
wait(TenantAPI::changeLockState(tr, entry.id, lockState, lockId));
wait(buggifiedCommit(tr, BUGGIFY_WITH_PROB(0.1)));
break;
} catch (Error& e) {
wait(tr->onError(e));
}
}
}
return Void();
}
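
The lock states cycled by the callers of changeLockStateImpl come from TenantAPI::TenantLockState; a reference sketch of the enum, with names assumed from fdbclient/Tenant.h:

// Assumed shape of the enum behind deterministicRandom()->randomInt(0, 3) below:
enum class TenantLockState : uint8_t {
	UNLOCKED, // normal reads and writes
	READ_ONLY, // reads allowed; writes need a lock-aware option
	LOCKED // all access needs a lock-aware option
};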
ACTOR static Future<Void> changeLockState(TenantManagementConcurrencyWorkload* self) {
state TenantName tenant = self->chooseTenantName();
state TenantAPI::TenantLockState lockState = (TenantAPI::TenantLockState)deterministicRandom()->randomInt(0, 3);
state bool useExistingId = deterministicRandom()->coinflip();
state UID debugId = deterministicRandom()->randomUniqueID();
try {
loop {
TraceEvent(SevDebug, "TenantManagementConcurrencyChangingTenantLockState", debugId)
.detail("TenantName", tenant)
.detail("TenantLockState", TenantAPI::tenantLockStateToString(lockState))
.detail("UseExistingId", useExistingId);
Optional<Void> result = wait(timeout(changeLockStateImpl(self, tenant, lockState, useExistingId), 30));
if (result.present()) {
TraceEvent(SevDebug, "TenantManagementConcurrencyChangedTenantLockState", debugId)
.detail("TenantName", tenant)
.detail("TenantLockState", TenantAPI::tenantLockStateToString(lockState))
.detail("UseExistingId", useExistingId);
break;
}
}
return Void();
} catch (Error& e) {
TraceEvent(SevDebug, "TenantManagementConcurrencyChangeLockStateError", debugId)
.error(e)
.detail("TenantName", tenant)
.detail("TenantLockState", TenantAPI::tenantLockStateToString(lockState))
.detail("UseExistingId", useExistingId);
if (e.code() == error_code_cluster_removed) {
ASSERT(self->useMetacluster && !self->createMetacluster);
} else if (e.code() != error_code_tenant_not_found && e.code() != error_code_tenant_locked) {
TraceEvent(SevError, "TenantManagementConcurrencyChangeLockStateFailure", debugId)
.error(e)
.detail("TenantName", tenant)
.detail("TenantLockState", TenantAPI::tenantLockStateToString(lockState))
.detail("UseExistingId", useExistingId);
ASSERT(false);
}
return Void();
}
}
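
The 30-second wrapper above uses flow's timeout combinator; a short sketch of its contract (signature assumed from flow/genericactors.actor.h):

// timeout(what, seconds) resolves to an Optional<T>:
//   present()  -> `what` completed within the deadline, and its value is inside
//   !present() -> the deadline fired first; the enclosing loop simply retries
// Dropping the timed-out future cancels the underlying actor, so each retry
// starts a fresh changeLockStateImpl attempt rather than racing the stale one.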
Future<Void> start(Database const& cx) override { return _start(cx, this); }
ACTOR static Future<Void> _start(Database cx, TenantManagementConcurrencyWorkload* self) {
state double start = now();
@ -402,6 +487,8 @@ struct TenantManagementConcurrencyWorkload : TestWorkload {
wait(configureTenant(self));
} else if (operation == 3) {
wait(renameTenant(self));
} else if (operation == 4) {
wait(changeLockState(self));
}
}
View File
@ -53,6 +53,8 @@ struct TenantManagementWorkload : TestWorkload {
struct TenantTestData {
Reference<Tenant> tenant;
Optional<TenantGroupName> tenantGroup;
TenantAPI::TenantLockState lockState = TenantAPI::TenantLockState::UNLOCKED;
Optional<UID> lockId;
bool empty;
TenantTestData() : empty(true) {}
@ -672,9 +674,11 @@ struct TenantManagementWorkload : TestWorkload {
} else {
ASSERT(tenantsToCreate.size() == 1);
TraceEvent(SevError, "CreateTenantFailure")
.errorUnsuppressed(e)
.detail("TenantName", tenantsToCreate.begin()->first);
ASSERT(false);
}
return Void();
}
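
This commit also migrates the fatal-path traces from .error(e) to .errorUnsuppressed(e); a sketch of the distinction, as commonly described for flow's TraceEvent API:

// .error(e)             : may suppress the whole event for certain errors (notably
//                         actor_cancelled), so a SevError trace could silently vanish.
// .errorUnsuppressed(e) : always attaches the error and emits the event, which is
//                         the right choice immediately before an ASSERT(false).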
@ -684,9 +688,11 @@ struct TenantManagementWorkload : TestWorkload {
wait(tr->onError(e));
} catch (Error& e) {
for (auto [tenant, _] : tenantsToCreate) {
TraceEvent(SevError, "CreateTenantFailure").error(e).detail("TenantName", tenant);
TraceEvent(SevError, "CreateTenantFailure")
.errorUnsuppressed(e)
.detail("TenantName", tenant);
}
ASSERT(false);
}
}
}
@ -722,7 +728,7 @@ struct TenantManagementWorkload : TestWorkload {
}
}
ACTOR static Future<bool> clearTenantData(TenantManagementWorkload* self, TenantName tenantName) {
state Transaction clearTr(self->dataDb, self->createdTenants[tenantName].tenant);
loop {
try {
@ -730,9 +736,14 @@ struct TenantManagementWorkload : TestWorkload {
wait(clearTr.commit());
auto itr = self->createdTenants.find(tenantName);
ASSERT(itr != self->createdTenants.end());
ASSERT_EQ(itr->second.lockState, TenantAPI::TenantLockState::UNLOCKED);
itr->second.empty = true;
return true;
} catch (Error& e) {
if (e.code() == error_code_tenant_locked) {
ASSERT_NE(self->createdTenants[tenantName].lockState, TenantAPI::TenantLockState::UNLOCKED);
return false;
}
wait(clearTr.onError(e));
}
}
@ -841,18 +852,20 @@ struct TenantManagementWorkload : TestWorkload {
if (alreadyExists || endTenant.present()) {
for (tenantItr = tenants.begin(); tenantItr != tenants.end(); ++tenantItr) {
// For most tenants, we will delete the contents and make them empty
state bool cleared = false;
if (deterministicRandom()->random01() < 0.9) {
wait(store(cleared, clearTenantData(self, tenantItr->first)));
// watch the tenant to be deleted
if (cleared && watchTenantCheck) {
watchFutures.emplace_back(
tenantItr->first,
errorOr(watchTenant(self, self->createdTenants[tenantItr->first].tenant)));
}
}
// Otherwise, if the tenant was not cleared (including when it was locked), just report
// its current emptiness
if (!cleared) {
auto itr = self->createdTenants.find(tenantItr->first);
ASSERT(itr != self->createdTenants.end());
isEmpty = isEmpty && itr->second.empty;
@ -861,9 +874,10 @@ struct TenantManagementWorkload : TestWorkload {
}
} catch (Error& e) {
TraceEvent(SevError, "DeleteTenantFailure")
.errorUnsuppressed(e)
.detail("TenantName", beginTenant)
.detail("EndTenant", endTenant);
ASSERT(false);
return Void();
}
@ -1013,9 +1027,10 @@ struct TenantManagementWorkload : TestWorkload {
ASSERT(!existedAtStart && !endTenant.present());
} else {
TraceEvent(SevError, "DeleteTenantFailure")
.errorUnsuppressed(e)
.detail("TenantName", beginTenant)
.detail("EndTenant", endTenant);
ASSERT(false);
}
return Void();
}
@ -1026,10 +1041,10 @@ struct TenantManagementWorkload : TestWorkload {
wait(tr->onError(e));
} catch (Error& e) {
TraceEvent(SevError, "DeleteTenantFailure")
.errorUnsuppressed(e)
.detail("TenantName", beginTenant)
.detail("EndTenant", endTenant);
ASSERT(false);
}
}
}
@ -1043,6 +1058,8 @@ struct TenantManagementWorkload : TestWorkload {
state Transaction tr(self->dataDb, self->createdTenants[tenantName].tenant);
loop {
try {
tr.setOption(FDBTransactionOptions::READ_LOCK_AWARE);
// We only ever store a single key in each tenant. Therefore we expect a range read of the entire
// tenant to return either 0 or 1 keys, depending on whether that key has been set.
state RangeResult result = wait(tr.getRange(KeyRangeRef(""_sr, "\xff"_sr), 2));
@ -1191,8 +1208,8 @@ struct TenantManagementWorkload : TestWorkload {
}
if (!retry) {
TraceEvent(SevError, "GetTenantFailure").error(error).detail("TenantName", tenant);
return Void();
TraceEvent(SevError, "GetTenantFailure").errorUnsuppressed(error).detail("TenantName", tenant);
ASSERT(false);
}
}
}
@ -1294,11 +1311,11 @@ struct TenantManagementWorkload : TestWorkload {
if (!retry) {
TraceEvent(SevError, "ListTenantFailure")
.errorUnsuppressed(error)
.detail("BeginTenant", beginTenant)
.detail("EndTenant", endTenant);
ASSERT(false);
}
}
}
@ -1322,6 +1339,7 @@ struct TenantManagementWorkload : TestWorkload {
if (!tData.empty) {
loop {
try {
insertTr.setOption(FDBTransactionOptions::LOCK_AWARE);
insertTr.set(self->keyName, newTenantName);
wait(insertTr.commit());
break;
@ -1469,9 +1487,9 @@ struct TenantManagementWorkload : TestWorkload {
wait(tr->onError(e));
} catch (Error& e) {
TraceEvent(SevError, "RenameTenantFailure")
.errorUnsuppressed(e)
.detail("TenantRenames", describe(tenantRenames));
ASSERT(false);
}
}
}
@ -1595,6 +1613,7 @@ struct TenantManagementWorkload : TestWorkload {
ASSERT(!hasSystemTenantGroup);
ASSERT(!specialKeysUseInvalidTuple);
ASSERT(!assignToDifferentCluster);
ASSERT_EQ(operationType == OperationType::METACLUSTER, self->useMetacluster);
Versionstamp currentVersionstamp = wait(getLastTenantModification(self, operationType));
if (configurationChanging) {
ASSERT_GT(currentVersionstamp.version, originalReadVersion);
@ -1637,8 +1656,10 @@ struct TenantManagementWorkload : TestWorkload {
try {
wait(tr->onError(e));
} catch (Error&) {
TraceEvent(SevError, "ConfigureTenantFailure").error(error).detail("TenantName", tenant);
return Void();
TraceEvent(SevError, "ConfigureTenantFailure")
.errorUnsuppressed(error)
.detail("TenantName", tenant);
ASSERT(false);
}
}
}
@ -1705,8 +1726,10 @@ struct TenantManagementWorkload : TestWorkload {
}
if (!retry) {
TraceEvent(SevError, "GetTenantGroupFailure").error(error).detail("TenantGroupName", tenantGroup);
return Void();
TraceEvent(SevError, "GetTenantGroupFailure")
.errorUnsuppressed(error)
.detail("TenantGroupName", tenantGroup);
ASSERT(false);
}
}
}
@ -1808,53 +1831,189 @@ struct TenantManagementWorkload : TestWorkload {
if (!retry) {
TraceEvent(SevError, "ListTenantGroupFailure")
.errorUnsuppressed(error)
.detail("BeginTenant", beginTenantGroup)
.detail("EndTenant", endTenantGroup);
ASSERT(false);
}
}
}
}
ACTOR static Future<Void> operateOnTenantKey(TenantManagementWorkload* self) {
if (self->allTestTenants.size() == 0) {
return Void();
}
state Reference<Tenant> tenant = deterministicRandom()->randomChoice(self->allTestTenants);
state Reference<ReadYourWritesTransaction> tr = makeReference<ReadYourWritesTransaction>(self->dataDb, tenant);
state TenantName tName = tenant->name.get();
state bool tenantPresent = false;
state TenantTestData tData = TenantTestData();
int mode = deterministicRandom()->randomInt(0, 3);
state bool readKey = mode == 0 || mode == 2;
state bool writeKey = mode == 1 || mode == 2;
state bool clearKey = deterministicRandom()->coinflip();
state bool lockAware = deterministicRandom()->coinflip();
state TenantAPI::TenantLockState lockState = TenantAPI::TenantLockState::UNLOCKED;
auto itr = self->createdTenants.find(tName);
if (itr != self->createdTenants.end() && itr->second.tenant->id() == tenant->id()) {
tenantPresent = true;
tData = itr->second;
lockState = itr->second.lockState;
}
state bool keyPresent = tenantPresent && !tData.empty;
state bool maybeCommitted = false;
loop {
try {
if (lockAware) {
tr->setOption(writeKey || deterministicRandom()->coinflip()
? FDBTransactionOptions::LOCK_AWARE
: FDBTransactionOptions::READ_LOCK_AWARE);
}
if (readKey) {
Optional<Value> val = wait(tr->get(self->keyName));
if (val.present()) {
ASSERT((keyPresent && val.get() == tName) || (maybeCommitted && writeKey && !clearKey));
} else {
ASSERT((tenantPresent && tData.empty) || (maybeCommitted && writeKey && clearKey));
}
ASSERT(lockAware || lockState != TenantAPI::TenantLockState::LOCKED);
}
if (writeKey) {
if (clearKey) {
tr->clear(self->keyName);
} else {
tr->set(self->keyName, tName);
}
wait(tr->commit());
ASSERT(lockAware || lockState == TenantAPI::TenantLockState::UNLOCKED);
self->createdTenants[tName].empty = clearKey;
}
break;
} catch (Error& e) {
state Error err = e;
if (err.code() == error_code_tenant_not_found) {
ASSERT(!tenantPresent);
CODE_PROBE(true, "Attempted to read key from non-existent tenant");
return Void();
} else if (err.code() == error_code_tenant_locked) {
ASSERT(!lockAware);
if (!writeKey) {
ASSERT_EQ(lockState, TenantAPI::TenantLockState::LOCKED);
} else {
ASSERT_NE(lockState, TenantAPI::TenantLockState::UNLOCKED);
}
return Void();
}
try {
maybeCommitted = maybeCommitted || err.code() == error_code_commit_unknown_result;
wait(tr->onError(err));
} catch (Error& error) {
TraceEvent(SevError, "ReadKeyFailure").errorUnsuppressed(error).detail("TenantName", tenant);
ASSERT(false);
}
}
}
return Void();
}
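
operateOnTenantKey exercises the two lock-aware transaction options; a hedged illustration of their expected semantics (the option names are real FDBTransactionOptions; enforcement happens server-side):

// READ_LOCK_AWARE : reads succeed even when the tenant is LOCKED; commits still fail.
// LOCK_AWARE      : both reads and commits bypass the tenant lock.
tr->setOption(FDBTransactionOptions::READ_LOCK_AWARE); // sufficient for a read-only transaction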
ACTOR static Future<Void> changeLockStateImpl(Reference<ReadYourWritesTransaction> tr,
TenantName tenant,
TenantAPI::TenantLockState lockState,
UID lockId,
OperationType operationType,
TenantManagementWorkload* self) {
if (operationType == OperationType::MANAGEMENT_TRANSACTION) {
tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
TenantMapEntry entry = wait(TenantAPI::getTenantTransaction(tr, tenant));
wait(TenantAPI::changeLockState(tr, entry.id, lockState, lockId));
wait(tr->commit());
} else if (operationType == OperationType::METACLUSTER) {
wait(MetaclusterAPI::changeTenantLockState(self->mvDb, tenant, lockState, lockId));
} else {
// We don't have a special keys or database variant of this function
ASSERT(false);
}
return Void();
}
ACTOR static Future<Void> changeLockState(TenantManagementWorkload* self) {
state OperationType operationType =
deterministicRandom()->coinflip() ? OperationType::METACLUSTER : OperationType::MANAGEMENT_TRANSACTION;
state TenantName tenant = self->chooseTenantName(true);
auto itr = self->createdTenants.find(tenant);
state bool exists = itr != self->createdTenants.end();
state Reference<ReadYourWritesTransaction> tr = makeReference<ReadYourWritesTransaction>(self->dataDb);
state TenantAPI::TenantLockState lockState = TenantAPI::TenantLockState(deterministicRandom()->randomInt(0, 3));
state UID lockId = deterministicRandom()->coinflip() || !exists || !itr->second.lockId.present()
? deterministicRandom()->randomUniqueID()
: itr->second.lockId.get();
state bool lockStateChanging = exists && itr->second.lockState != lockState;
state bool legalLockChange =
exists && (itr->second.lockId == lockId || (itr->second.lockState == TenantAPI::TenantLockState::UNLOCKED));
state Version originalReadVersion = wait(self->getLatestReadVersion(self, operationType));
loop {
try {
wait(changeLockStateImpl(tr, tenant, lockState, lockId, operationType, self));
ASSERT(exists);
ASSERT(legalLockChange);
ASSERT_EQ(operationType == OperationType::METACLUSTER, self->useMetacluster);
auto itr = self->createdTenants.find(tenant);
ASSERT(itr != self->createdTenants.end());
itr->second.lockState = lockState;
if (lockState != TenantAPI::TenantLockState::UNLOCKED) {
itr->second.lockId = lockId;
}
Versionstamp currentVersionstamp = wait(getLastTenantModification(self, operationType));
if (lockStateChanging) {
ASSERT_GT(currentVersionstamp.version, originalReadVersion);
}
return Void();
} catch (Error& e) {
state Error error = e;
if (e.code() == error_code_tenant_not_found) {
ASSERT(!exists);
return Void();
} else if (e.code() == error_code_invalid_metacluster_operation) {
ASSERT_NE(operationType == OperationType::METACLUSTER, self->useMetacluster);
return Void();
} else if (e.code() == error_code_tenant_locked) {
ASSERT(!legalLockChange);
return Void();
}
try {
wait(tr->onError(e));
} catch (Error&) {
TraceEvent(SevError, "ConfigureTenantFailure")
.errorUnsuppressed(error)
.detail("TenantName", tenant);
ASSERT(false);
}
}
}
}
Future<Void> start(Database const& cx) override {
if (clientId == 0 || !singleClient) {
return _start(cx, this);
@ -1879,7 +2038,7 @@ struct TenantManagementWorkload : TestWorkload {
// Run a random sequence of tenant management operations for the duration of the test
while (now() < start + self->testDuration) {
state int operation = deterministicRandom()->randomInt(0, 10);
state Future<Void> logger = delayedTrace(operation);
if (operation == 0) {
wait(createTenant(self));
@ -1898,7 +2057,9 @@ struct TenantManagementWorkload : TestWorkload {
} else if (operation == 7) {
wait(listTenantGroups(self));
} else if (operation == 8) {
wait(operateOnTenantKey(self));
} else if (operation == 9) {
wait(changeLockState(self));
}
}
View File
@ -27,6 +27,8 @@
#include <openssl/err.h>
#include <openssl/rand.h>
#include <fmt/format.h>
#include "flow/DeterministicRandom.h"
#include "flow/Error.h"
#include "flow/Hostname.h"
@ -130,7 +132,7 @@ Reference<IRandom> nondeterministicRandom() {
}
std::string UID::toString() const {
return fmt::format("{:016x}{:016x}", part[0], part[1]);
}
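
The toString() change above swaps flow's printf-style format() for fmt::format(); both render each 64-bit half as 16 zero-padded lowercase hex digits. A standalone sanity check with illustrative values:

#include <fmt/format.h>
#include <cstdint>
#include <cstdio>

int main() {
	uint64_t part0 = 0xdeadbeefULL, part1 = 0x1ULL;
	// Both lines print 00000000deadbeef0000000000000001
	std::printf("%016llx%016llx\n", (unsigned long long)part0, (unsigned long long)part1);
	fmt::print("{:016x}{:016x}\n", part0, part1);
	return 0;
}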
UID UID::fromString(std::string const& s) {