Merge branch 'main' of github.com:apple/foundationdb into debug2

Ankita Kejriwal 2023-02-08 09:04:17 -08:00
commit a5257144c6
16 changed files with 430 additions and 86 deletions


@ -445,6 +445,7 @@ elseif(NOT WIN32 AND NOT APPLE) # Linux Only
--target ${CMAKE_SYSTEM_PROCESSOR}
--outdir ${SHIM_LIB_OUTPUT_DIR}
--dlopen-callback=fdb_shim_dlopen_callback
--symbol-filter='^fdb_.*$$'
$<TARGET_FILE:fdb_c>
DEPENDS ${IMPLIBSO_SRC} fdb_c
COMMENT "Generating source code for C shim library")


@ -358,6 +358,9 @@ Examples:
parser.add_argument(
"--symbol-list", help="Path to file with symbols that should be present in wrapper " "(all by default)"
)
parser.add_argument(
"--symbol-filter", help="Regular expression filter on symbols to be wrapped", default=""
)
parser.add_argument("--symbol-prefix", metavar="PFX", help="Prefix wrapper symbols with PFX", default="")
parser.add_argument("-q", "--quiet", help="Do not print progress info", action="store_true")
parser.add_argument("--outdir", "-o", help="Path to create wrapper at", default="./")
@ -410,6 +413,9 @@ Examples:
)
syms = list(filter(is_exported, collect_syms(input_name)))
if args.symbol_filter:
pattern = re.compile(args.symbol_filter)
syms = [ s for s in syms if pattern.match(s["Name"]) ]
def is_data_symbol(s):
return (

New image file (168 KiB); file diff suppressed because one or more lines are too long.

design/storage-quota.md (new file, 56 lines added)

@ -0,0 +1,56 @@
## Storage Quota
When the `STORAGE_QUOTA_ENABLED` knob is turned on, the commit proxies reject transactions from any tenant group whose current storage usage exceeds its storage quota. This page describes the feature in more detail.
### Overview
The data distributor runs monitors that track the mapping from tenant groups to tenants, the storage quota for each tenant group, and the current storage bytes used by each tenant. It combines this information into a list of tenants that belong to groups currently over their storage quota. Each commit proxy periodically queries this block list and keeps a local copy. When a commit proxy receives a commit request for a transaction, it rejects the request if the transaction's tenant is in the block list.
This diagram shows the flow of information:
![Diagram](storage-quota-diagram.svg)
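Conceptually, the proxy-side enforcement reduces to a set-membership check against this cached block list. The sketch below is illustrative only; `tenantsOverStorageQuota` and `shouldRejectForQuota` are names made up for this example, not the actual commit proxy code.
```
#include <cstdint>
#include <unordered_set>

// Illustrative sketch only -- not the actual commit proxy implementation.
// The proxy periodically refreshes this set from the data distributor.
std::unordered_set<int64_t> tenantsOverStorageQuota;

// Reject a commit if its tenant is on the block list, unless the transaction
// explicitly bypasses storage quota enforcement (see below).
bool shouldRejectForQuota(int64_t tenantId, bool bypassStorageQuota) {
    return !bypassStorageQuota && tenantsOverStorageQuota.count(tenantId) > 0;
}
```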
### Storage Usage
The storage used by tenants is estimated using the `getEstimatedRangeSizeBytes()` function. This function aggregates the storage estimates from the storage servers that own shards for the given range. On a given storage server, the estimate is based on a byte sample, and the probability that a kv pair is sampled is:
`(size(key) + size(value)) / ((size(key) + 100) * 250)`.
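For example, under this formula a 20-byte key with a 100-byte value would be sampled with probability `(20 + 100) / ((20 + 100) * 250) = 1/250`, i.e. roughly 0.4% (these concrete sizes are only illustrative).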
### Tuple Layer
TODO: Update after https://github.com/apple/foundationdb/pull/9241
### fdbcli
The easiest way for an external client to interact with storage quotas is through `fdbcli`. To get the quota of a particular tenant group, run the following command:
```
fdbcli> quota get <tenant_group> storage
```
To set the quota, run:
```
fdbcli> quota set <tenant_group> storage <bytes>
```
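For example, to give a hypothetical tenant group named `team_a` a quota of 1 GB:
```
fdbcli> quota set team_a storage 1000000000
```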
To clear all quotas (storage as well as reserved and total throughput quotas), run:
```
fdbcli> quota clear <tenant_group>
```
### Bypass Quota Enforcement
Quota enforcement can be bypassed for a given transaction (for example, to allow a transaction that clears data) by using `Transaction::setOption()` to set `FDBTransactionOptions::BYPASS_STORAGE_QUOTA`. In this case, the commit proxy will not reject the transaction even if the tenant group is over its quota.
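As a minimal sketch (assuming a flow ACTOR context where `cx` is a `Database`; `rangeToClear` is a hypothetical range, and this snippet is not taken from the FoundationDB source), a cleanup transaction might opt out of quota enforcement like this:
```
// Sketch of a transaction that opts out of storage quota enforcement.
// Assumes this code runs inside a flow ACTOR.
state Transaction tr(cx);
loop {
    try {
        tr.setOption(FDBTransactionOptions::BYPASS_STORAGE_QUOTA);
        tr.clear(rangeToClear); // e.g. deleting data to get back under quota
        wait(tr.commit());
        break;
    } catch (Error& e) {
        wait(tr.onError(e));
    }
}
```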
### Server Knobs
The following knobs control the interval at which various monitors on the data distributor query for information. Adjusting these knobs changes the overall time between a tenant hitting its storage quota and its transactions being rejected; a sample configuration follows the list.
* `TENANT_CACHE_LIST_REFRESH_INTERVAL`: How often the list of tenants is refreshed on the data distributor.
* `TENANT_CACHE_STORAGE_USAGE_REFRESH_INTERVAL`: How often the storage bytes used by each tenant are refreshed on the data distributor.
* `TENANT_CACHE_STORAGE_QUOTA_REFRESH_INTERVAL`: How often the storage quota allocated to each tenant is refreshed on the data distributor.
* `CP_FETCH_TENANTS_OVER_STORAGE_QUOTA_INTERVAL`: How often the commit proxies send requests to the data distributor to fetch the list of tenants over storage quota.
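As a sketch of the syntax only (the values are arbitrary, not recommendations), these knobs can be overridden like any other server knob, for example in `foundationdb.conf`:
```
[fdbserver]
knob_tenant_cache_list_refresh_interval = 2
knob_tenant_cache_storage_usage_refresh_interval = 2
knob_tenant_cache_storage_quota_refresh_interval = 10
knob_cp_fetch_tenants_over_storage_quota_interval = 5
```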
### Testing
The `StorageQuotaTest.toml` test provides a simple end-to-end test for this feature. Quotas are set using the internal storage quota API in the `StorageQuota` workload. This is run after the `CreateTenant` workload, which creates the tenants.
In addition to the simulation test, the feature has also been tested on real FDB clusters, using `fdbcli` to set quotas, and a data generator to populate data in the tenant.
### Visibility
The data distributor produces a trace event every `SERVER_KNOBS->TENANT_CACHE_STORAGE_USAGE_TRACE_INTERVAL` seconds for each tenant group. This trace event has the type `StorageUsageUpdated`, and logs the quota as well as the current storage usage (in bytes) for the tenant group.


@ -23,6 +23,7 @@
#include "fdbclient/TenantManagement.actor.h"
#include "fdbrpc/TenantInfo.h"
#include "fdbrpc/simulator.h"
#include "flow/EncryptUtils.h"
#include "flow/FastRef.h"
#include "fmt/format.h"
#include "fdbclient/BackupAgent.actor.h"
@ -564,11 +565,12 @@ struct EncryptedRangeFileWriter : public IRangeFileWriter {
EncryptedRangeFileWriter(Database cx,
Arena* arena,
EncryptionAtRestMode encryptMode,
Optional<Reference<TenantEntryCache<Void>>> tenantCache,
Reference<IBackupFile> file = Reference<IBackupFile>(),
int blockSize = 0,
Options options = Options())
: cx(cx), arena(arena), file(file), encryptMode(encryptMode), blockSize(blockSize), blockEnd(0),
fileVersion(BACKUP_AGENT_ENCRYPTED_SNAPSHOT_FILE_VERSION), options(options) {
: cx(cx), arena(arena), file(file), encryptMode(encryptMode), tenantCache(tenantCache), blockSize(blockSize),
blockEnd(0), fileVersion(BACKUP_AGENT_ENCRYPTED_SNAPSHOT_FILE_VERSION), options(options) {
buffer = makeString(blockSize);
wPtr = mutateString(buffer);
}
@ -664,7 +666,8 @@ struct EncryptedRangeFileWriter : public IRangeFileWriter {
}
ACTOR static Future<Void> updateEncryptionKeysCtx(EncryptedRangeFileWriter* self, KeyRef key) {
state EncryptCipherDomainId curDomainId = getEncryptionDomainDetails(key, self->encryptMode);
state EncryptCipherDomainId curDomainId =
wait(getEncryptionDomainDetails(key, self->encryptMode, self->tenantCache));
state Reference<AsyncVar<ClientDBInfo> const> dbInfo = self->cx->clientInfo;
// Get text and header cipher key
@ -703,7 +706,10 @@ struct EncryptedRangeFileWriter : public IRangeFileWriter {
copyToBuffer(self, s->begin(), s->size());
}
static EncryptCipherDomainId getEncryptionDomainDetails(KeyRef key, EncryptionAtRestMode encryptMode) {
ACTOR static Future<EncryptCipherDomainId> getEncryptionDomainDetails(
KeyRef key,
EncryptionAtRestMode encryptMode,
Optional<Reference<TenantEntryCache<Void>>> tenantCache) {
if (isSystemKey(key)) {
return SYSTEM_KEYSPACE_ENCRYPT_DOMAIN_ID;
}
@ -712,7 +718,17 @@ struct EncryptedRangeFileWriter : public IRangeFileWriter {
}
// dealing with domain aware encryption so all keys should belong to a tenant
KeyRef tenantPrefix = KeyRef(key.begin(), TenantAPI::PREFIX_SIZE);
return TenantAPI::prefixToId(tenantPrefix);
state int64_t tenantId = TenantAPI::prefixToId(tenantPrefix);
// It's possible for the first and last key in a block (when writeKey is called) to not have a valid tenant
// prefix, since they mark the start and end of a range, in that case we denote them as having a default encrypt
// domain for the purpose of encrypting the block
if (tenantCache.present()) {
Optional<TenantEntryCachePayload<Void>> payload = wait(tenantCache.get()->getById(tenantId));
if (!payload.present()) {
return FDB_DEFAULT_ENCRYPT_DOMAIN_ID;
}
}
return tenantId;
}
// Handles the first block and internal blocks. Ends current block if needed.
@ -822,8 +838,10 @@ struct EncryptedRangeFileWriter : public IRangeFileWriter {
if (self->lastKey.size() == 0 || k.size() == 0) {
return false;
}
state EncryptCipherDomainId curKeyDomainId = getEncryptionDomainDetails(k, self->encryptMode);
state EncryptCipherDomainId prevKeyDomainId = getEncryptionDomainDetails(self->lastKey, self->encryptMode);
state EncryptCipherDomainId curKeyDomainId =
wait(getEncryptionDomainDetails(k, self->encryptMode, self->tenantCache));
state EncryptCipherDomainId prevKeyDomainId =
wait(getEncryptionDomainDetails(self->lastKey, self->encryptMode, self->tenantCache));
if (curKeyDomainId != prevKeyDomainId) {
CODE_PROBE(true, "crossed tenant boundaries");
wait(handleTenantBondary(self, k, v, writeValue, curKeyDomainId));
@ -859,7 +877,6 @@ struct EncryptedRangeFileWriter : public IRangeFileWriter {
(!self->cipherKeys.headerCipherKey.isValid() || !self->cipherKeys.textCipherKey.isValid())) {
wait(updateEncryptionKeysCtx(self, k));
}
// Need to account for extra "empty" value being written in the case of crossing tenant boundaries
int toWrite = sizeof(uint32_t) + k.size() + sizeof(uint32_t);
wait(newBlockIfNeeded(self, toWrite));
@ -889,6 +906,7 @@ struct EncryptedRangeFileWriter : public IRangeFileWriter {
Arena* arena;
EncryptionAtRestMode encryptMode;
Reference<IBackupFile> file;
Optional<Reference<TenantEntryCache<Void>>> tenantCache;
int blockSize;
private:
@ -1049,16 +1067,17 @@ ACTOR static Future<Void> decodeKVPairs(StringRefReader* reader,
// make sure that all keys in a block belong to exactly one tenant,
// unless its the last key in which case it can be a truncated (different) tenant prefix
if (encryptedBlock && g_network && g_network->isSimulated()) {
if (encryptedBlock && g_network && g_network->isSimulated() &&
!isReservedEncryptDomain(encryptHeader.get().cipherTextDetails.encryptDomainId)) {
ASSERT(encryptHeader.present());
state KeyRef curKey = KeyRef(k, kLen);
if (!prevDomainId.present()) {
EncryptCipherDomainId domainId =
EncryptedRangeFileWriter::getEncryptionDomainDetails(prevKey, encryptMode);
wait(EncryptedRangeFileWriter::getEncryptionDomainDetails(prevKey, encryptMode, tenantCache));
prevDomainId = domainId;
}
EncryptCipherDomainId curDomainId =
EncryptedRangeFileWriter::getEncryptionDomainDetails(curKey, encryptMode);
wait(EncryptedRangeFileWriter::getEncryptionDomainDetails(curKey, encryptMode, tenantCache));
if (!curKey.empty() && !prevKey.empty() && prevDomainId.get() != curDomainId) {
ASSERT(!done);
if (curDomainId != SYSTEM_KEYSPACE_ENCRYPT_DOMAIN_ID && curDomainId != FDB_DEFAULT_ENCRYPT_DOMAIN_ID) {
@ -1762,6 +1781,14 @@ struct BackupRangeTaskFunc : BackupTaskFuncBase {
state BackupConfig backup(task);
state Arena arena;
DatabaseConfiguration config = wait(getDatabaseConfiguration(cx));
state EncryptionAtRestMode encryptMode = config.encryptionAtRestMode;
state Optional<Reference<TenantEntryCache<Void>>> tenantCache;
if (encryptMode.mode == EncryptionAtRestMode::DOMAIN_AWARE) {
tenantCache = makeReference<TenantEntryCache<Void>>(cx, TenantEntryCacheRefreshMode::WATCH);
wait(tenantCache.get()->init());
}
// Don't need to check keepRunning(task) here because we will do that while finishing each output file, but
// if bc is false then clearly the backup is no longer in progress
state Reference<IBackupContainer> bc = wait(backup.backupContainer().getD(cx.getReference()));
@ -1829,8 +1856,6 @@ struct BackupRangeTaskFunc : BackupTaskFuncBase {
state Version snapshotBeginVersion;
state int64_t snapshotRangeFileCount;
DatabaseConfiguration config = wait(getDatabaseConfiguration(cx));
state EncryptionAtRestMode encryptMode = config.encryptionAtRestMode;
state Reference<ReadYourWritesTransaction> tr(new ReadYourWritesTransaction(cx));
loop {
try {
@ -1859,7 +1884,8 @@ struct BackupRangeTaskFunc : BackupTaskFuncBase {
// Initialize range file writer and write begin key
if (encryptMode.mode != EncryptionAtRestMode::DISABLED) {
CODE_PROBE(true, "using encrypted snapshot file writer");
rangeFile = std::make_unique<EncryptedRangeFileWriter>(cx, &arena, encryptMode, outFile, blockSize);
rangeFile = std::make_unique<EncryptedRangeFileWriter>(
cx, &arena, encryptMode, tenantCache, outFile, blockSize);
} else {
rangeFile = std::make_unique<RangeFileWriter>(outFile, blockSize);
}


@ -173,6 +173,9 @@ struct TenantIdCodec {
static int64_t unpack(Standalone<StringRef> val) { return bigEndian64(*(int64_t*)val.begin()); }
static Optional<int64_t> lowerBound(Standalone<StringRef> val) {
if (val.startsWith("\xff"_sr)) {
return {};
}
if (val.size() == 8) {
return unpack(val);
} else if (val.size() > 8) {


@ -559,7 +559,7 @@ public:
mapByTenantId[entry.id] = payload;
mapByTenantName[entry.tenantName] = payload;
TraceEvent("TenantEntryCachePut")
TraceEvent("TenantEntryCachePut", uid)
.detail("TenantName", entry.tenantName)
.detail("TenantNameExisting", existingName)
.detail("TenantID", entry.id)


@ -661,8 +661,7 @@ ACTOR Future<Void> updateGranuleSplitState(Transaction* tr,
state RangeResult totalState = wait(tr->getRange(currentRange, SERVER_KNOBS->BG_MAX_SPLIT_FANOUT + 1));
// FIXME: remove above conflict range?
tr->addWriteConflictRange(currentRange);
ASSERT_WE_THINK(!totalState.more && totalState.size() <= SERVER_KNOBS->BG_MAX_SPLIT_FANOUT);
// maybe someone decreased the knob, we should gracefully handle it not in simulation
// maybe someone decreased the knob, we should gracefully handle it
if (totalState.more || totalState.size() > SERVER_KNOBS->BG_MAX_SPLIT_FANOUT) {
RangeResult tryAgain = wait(tr->getRange(currentRange, 10000));
ASSERT(!tryAgain.more);


@ -640,9 +640,15 @@ public:
Promise<Version> versionPromise;
Optional<TagSet> tags;
Optional<UID> debugID;
int64_t tenantId;
ServerWatchMetadata(Key key, Optional<Value> value, Version version, Optional<TagSet> tags, Optional<UID> debugID)
: key(key), value(value), version(version), tags(tags), debugID(debugID) {}
ServerWatchMetadata(Key key,
Optional<Value> value,
Version version,
Optional<TagSet> tags,
Optional<UID> debugID,
int64_t tenantId)
: key(key), value(value), version(version), tags(tags), debugID(debugID), tenantId(tenantId) {}
};
struct BusiestWriteTagContext {
@ -774,7 +780,12 @@ private:
VersionedData versionedData;
std::map<Version, Standalone<VerUpdateRef>> mutationLog; // versions (durableVersion, version]
std::unordered_map<KeyRef, Reference<ServerWatchMetadata>> watchMap; // keep track of server watches
using WatchMapKey = std::pair<int64_t, Key>;
using WatchMapKeyHasher = boost::hash<WatchMapKey>;
using WatchMapValue = Reference<ServerWatchMetadata>;
using WatchMap_t = std::unordered_map<WatchMapKey, WatchMapValue, WatchMapKeyHasher>;
WatchMap_t watchMap; // keep track of server watches
public:
struct PendingNewShard {
@ -793,7 +804,7 @@ public:
std::map<Version, std::vector<CheckpointMetaData>> pendingCheckpoints; // Pending checkpoint requests
std::unordered_map<UID, CheckpointMetaData> checkpoints; // Existing and deleting checkpoints
std::unordered_map<UID, ICheckpointReader*> liveCheckpointReaders; // Active checkpoint readers
VersionedMap<int64_t, TenantName> tenantMap;
VersionedMap<int64_t, Void> tenantMap;
std::map<Version, std::vector<PendingNewShard>>
pendingAddRanges; // Pending requests to add ranges to physical shards
std::map<Version, std::vector<KeyRange>>
@ -833,9 +844,9 @@ public:
Reference<Histogram> readRangeKVPairsReturnedHistogram;
// watch map operations
Reference<ServerWatchMetadata> getWatchMetadata(KeyRef key) const;
Reference<ServerWatchMetadata> getWatchMetadata(KeyRef key, int64_t tenantId) const;
KeyRef setWatchMetadata(Reference<ServerWatchMetadata> metadata);
void deleteWatchMetadata(KeyRef key);
void deleteWatchMetadata(KeyRef key, int64_t tenantId);
void clearWatchMetadata();
// tenant map operations
@ -1100,6 +1111,7 @@ public:
Future<Void> durableInProgress;
AsyncMap<Key, bool> watches;
AsyncMap<int64_t, bool> tenantWatches;
int64_t watchBytes;
int64_t numWatches;
AsyncVar<bool> noRecentUpdates;
@ -1731,8 +1743,9 @@ void StorageServer::byteSampleApplyMutation(MutationRef const& m, Version ver) {
}
// watchMap Operations
Reference<ServerWatchMetadata> StorageServer::getWatchMetadata(KeyRef key) const {
const auto it = watchMap.find(key);
Reference<ServerWatchMetadata> StorageServer::getWatchMetadata(KeyRef key, int64_t tenantId) const {
const WatchMapKey mapKey(tenantId, key);
const auto it = watchMap.find(mapKey);
if (it == watchMap.end())
return Reference<ServerWatchMetadata>();
return it->second;
@ -1740,13 +1753,16 @@ Reference<ServerWatchMetadata> StorageServer::getWatchMetadata(KeyRef key) const
KeyRef StorageServer::setWatchMetadata(Reference<ServerWatchMetadata> metadata) {
KeyRef keyRef = metadata->key.contents();
int64_t tenantId = metadata->tenantId;
const WatchMapKey mapKey(tenantId, keyRef);
watchMap[keyRef] = metadata;
watchMap[mapKey] = metadata;
return keyRef;
}
void StorageServer::deleteWatchMetadata(KeyRef key) {
watchMap.erase(key);
void StorageServer::deleteWatchMetadata(KeyRef key, int64_t tenantId) {
const WatchMapKey mapKey(tenantId, key);
watchMap.erase(mapKey);
}
void StorageServer::clearWatchMetadata() {
@ -2223,11 +2239,10 @@ ACTOR Future<Void> getValueQ(StorageServer* data, GetValueRequest req) {
// must be kept alive until the watch is finished.
extern size_t WATCH_OVERHEAD_WATCHQ, WATCH_OVERHEAD_WATCHIMPL;
ACTOR Future<Version> watchWaitForValueChange(StorageServer* data, SpanContext parent, KeyRef key) {
ACTOR Future<Version> watchWaitForValueChange(StorageServer* data, SpanContext parent, KeyRef key, int64_t tenantId) {
state Location spanLocation = "SS:watchWaitForValueChange"_loc;
state Span span(spanLocation, parent);
state Reference<ServerWatchMetadata> metadata = data->getWatchMetadata(key);
state Reference<ServerWatchMetadata> metadata = data->getWatchMetadata(key, tenantId);
if (metadata->debugID.present())
g_traceBatch.addEvent("WatchValueDebug",
metadata->debugID.get().first(),
@ -2241,10 +2256,19 @@ ACTOR Future<Version> watchWaitForValueChange(StorageServer* data, SpanContext p
state Version minVersion = data->data().latestVersion;
state Future<Void> watchFuture = data->watches.onChange(metadata->key);
if (tenantId != TenantInfo::INVALID_TENANT) {
watchFuture = watchFuture || data->tenantWatches.onChange(tenantId);
}
state ReadOptions options;
loop {
try {
metadata = data->getWatchMetadata(key);
if (tenantId != TenantInfo::INVALID_TENANT) {
auto view = data->tenantMap.at(latestVersion);
if (view.find(tenantId) == view.end()) {
throw tenant_removed();
}
}
metadata = data->getWatchMetadata(key, tenantId);
state Version latest = data->version.get();
options.debugID = metadata->debugID;
@ -2304,6 +2328,7 @@ ACTOR Future<Version> watchWaitForValueChange(StorageServer* data, SpanContext p
// Simulate a trigger on the watch that results in the loop going around without the value changing
watchFuture = watchFuture || delay(deterministicRandom()->random01());
}
wait(watchFuture);
data->watchBytes -= watchBytes;
} catch (Error& e) {
@ -2319,15 +2344,19 @@ ACTOR Future<Version> watchWaitForValueChange(StorageServer* data, SpanContext p
}
watchFuture = data->watches.onChange(metadata->key);
if (tenantId != TenantInfo::INVALID_TENANT) {
watchFuture = watchFuture || data->tenantWatches.onChange(tenantId);
}
wait(data->version.whenAtLeast(data->data().latestVersion));
}
}
void checkCancelWatchImpl(StorageServer* data, WatchValueRequest req) {
Reference<ServerWatchMetadata> metadata = data->getWatchMetadata(req.key.contents());
Reference<ServerWatchMetadata> metadata = data->getWatchMetadata(req.key.contents(), req.tenantInfo.tenantId);
if (metadata.isValid() && metadata->versionPromise.getFutureReferenceCount() == 1) {
// last watch timed out so cancel watch_impl and delete key from the map
data->deleteWatchMetadata(req.key.contents());
data->deleteWatchMetadata(req.key.contents(), req.tenantInfo.tenantId);
metadata->watch_impl.cancel();
}
}
@ -5142,7 +5171,7 @@ ACTOR Future<GetMappedKeyValuesReply> mapKeyValues(StorageServer* data,
return result;
}
bool rangeIntersectsAnyTenant(VersionedMap<int64_t, TenantName>& tenantMap, KeyRangeRef range, Version ver) {
bool rangeIntersectsAnyTenant(VersionedMap<int64_t, Void>& tenantMap, KeyRangeRef range, Version ver) {
auto view = tenantMap.at(ver);
// There are no tenants, so we don't need to do any work
@ -5192,10 +5221,10 @@ bool rangeIntersectsAnyTenant(VersionedMap<int64_t, TenantName>& tenantMap, KeyR
TEST_CASE("/fdbserver/storageserver/rangeIntersectsAnyTenant") {
std::set<int64_t> entries = { 0, 2, 3, 4, 6 };
VersionedMap<int64_t, TenantName> tenantMap;
VersionedMap<int64_t, Void> tenantMap;
tenantMap.createNewVersion(1);
for (auto entry : entries) {
tenantMap.insert(entry, ""_sr);
tenantMap.insert(entry, Void());
}
// Before all tenants
@ -5280,13 +5309,13 @@ TEST_CASE("/fdbserver/storageserver/rangeIntersectsAnyTenant") {
}
TEST_CASE("/fdbserver/storageserver/randomRangeIntersectsAnyTenant") {
VersionedMap<int64_t, TenantName> tenantMap;
VersionedMap<int64_t, Void> tenantMap;
std::set<Key> tenantPrefixes;
tenantMap.createNewVersion(1);
int numEntries = deterministicRandom()->randomInt(0, 20);
for (int i = 0; i < numEntries; ++i) {
int64_t tenantId = deterministicRandom()->randomInt64(0, std::numeric_limits<int64_t>::max());
tenantMap.insert(tenantId, ""_sr);
tenantMap.insert(tenantId, Void());
tenantPrefixes.insert(TenantAPI::idToPrefix(tenantId));
}
@ -9023,7 +9052,7 @@ void StorageServer::insertTenant(StringRef tenantPrefix, TenantName tenantName,
if (version >= tenantMap.getLatestVersion()) {
int64_t tenantId = TenantAPI::prefixToId(tenantPrefix);
tenantMap.createNewVersion(version);
tenantMap.insert(tenantId, tenantName);
tenantMap.insert(tenantId, Void());
if (persist) {
auto& mLV = addVersionToMutationLog(version);
@ -9043,20 +9072,21 @@ void StorageServer::clearTenants(StringRef startTenant, StringRef endTenant, Ver
auto view = tenantMap.at(version);
auto& mLV = addVersionToMutationLog(version);
std::set<int64_t> tenantsToClear;
for (auto itr = view.begin(); itr != view.end(); ++itr) {
if (*itr >= startTenant && *itr < endTenant) {
// Trigger any watches on the prefix associated with the tenant.
Key tenantPrefix = TenantAPI::idToPrefix(itr.key());
TraceEvent("EraseTenant", thisServerID).detail("Tenant", itr.key()).detail("Version", version);
watches.sendError(tenantPrefix, strinc(tenantPrefix), tenant_removed());
tenantsToClear.insert(itr.key());
addMutationToMutationLog(mLV,
MutationRef(MutationRef::ClearRange,
tenantPrefix.withPrefix(persistTenantMapKeys.begin),
keyAfter(tenantPrefix.withPrefix(persistTenantMapKeys.begin))));
}
Optional<int64_t> startId = TenantIdCodec::lowerBound(startTenant);
Optional<int64_t> endId = TenantIdCodec::lowerBound(endTenant);
auto startItr = startId.present() ? view.lower_bound(startId.get()) : view.end();
auto endItr = endId.present() ? view.lower_bound(endId.get()) : view.end();
for (auto itr = startItr; itr != endItr; ++itr) {
auto mapKey = itr.key();
// Trigger any watches on the prefix associated with the tenant.
TraceEvent("EraseTenant", thisServerID).detail("TenantID", mapKey).detail("Version", version);
tenantWatches.sendError(mapKey, mapKey + 1, tenant_removed());
tenantsToClear.insert(mapKey);
}
addMutationToMutationLog(mLV,
MutationRef(MutationRef::ClearRange,
startTenant.withPrefix(persistTenantMapKeys.begin),
endTenant.withPrefix(persistTenantMapKeys.begin)));
for (auto tenantId : tenantsToClear) {
tenantMap.erase(tenantId);
@ -10052,7 +10082,7 @@ void StorageServerDisk::makeNewStorageServerDurable(const bool shardAware) {
auto view = data->tenantMap.atLatest();
for (auto itr = view.begin(); itr != view.end(); ++itr) {
storage->set(KeyValueRef(TenantAPI::idToPrefix(itr.key()).withPrefix(persistTenantMapKeys.begin), *itr));
storage->set(KeyValueRef(TenantAPI::idToPrefix(itr.key()).withPrefix(persistTenantMapKeys.begin), ""_sr));
}
}
@ -10547,7 +10577,7 @@ ACTOR Future<bool> restoreDurableState(StorageServer* data, IKeyValueStore* stor
auto const& result = tenantMap[tenantMapLoc];
int64_t tenantId = TenantAPI::prefixToId(result.key.substr(persistTenantMapKeys.begin.size()));
data->tenantMap.insert(tenantId, result.value);
data->tenantMap.insert(tenantId, Void());
TraceEvent("RestoringTenant", data->thisServerID)
.detail("Key", tenantMap[tenantMapLoc].key)
@ -11011,16 +11041,19 @@ ACTOR Future<Void> serveWatchValueRequestsImpl(StorageServer* self, FutureStream
loop {
getCurrentLineage()->modify(&TransactionLineage::txID) = UID();
state WatchValueRequest req = waitNext(stream);
state Reference<ServerWatchMetadata> metadata = self->getWatchMetadata(req.key.contents());
state Reference<ServerWatchMetadata> metadata =
self->getWatchMetadata(req.key.contents(), req.tenantInfo.tenantId);
state Span span("SS:serveWatchValueRequestsImpl"_loc, req.spanContext);
getCurrentLineage()->modify(&TransactionLineage::txID) = req.spanContext.traceID;
state ReadOptions options;
// case 1: no watch set for the current key
if (!metadata.isValid()) {
metadata = makeReference<ServerWatchMetadata>(req.key, req.value, req.version, req.tags, req.debugID);
metadata = makeReference<ServerWatchMetadata>(
req.key, req.value, req.version, req.tags, req.debugID, req.tenantInfo.tenantId);
KeyRef key = self->setWatchMetadata(metadata);
metadata->watch_impl = forward(watchWaitForValueChange(self, span.context, key), metadata->versionPromise);
metadata->watch_impl = forward(watchWaitForValueChange(self, span.context, key, req.tenantInfo.tenantId),
metadata->versionPromise);
self->actors.add(watchValueSendReply(self, req, metadata->versionPromise.getFuture(), span.context));
}
// case 2: there is a watch in the map and it has the same value so just update version
@ -11034,13 +11067,15 @@ ACTOR Future<Void> serveWatchValueRequestsImpl(StorageServer* self, FutureStream
}
// case 3: version in map has a lower version so trigger watch and create a new entry in map
else if (req.version > metadata->version) {
self->deleteWatchMetadata(req.key.contents());
self->deleteWatchMetadata(req.key.contents(), req.tenantInfo.tenantId);
metadata->versionPromise.send(req.version);
metadata->watch_impl.cancel();
metadata = makeReference<ServerWatchMetadata>(req.key, req.value, req.version, req.tags, req.debugID);
metadata = makeReference<ServerWatchMetadata>(
req.key, req.value, req.version, req.tags, req.debugID, req.tenantInfo.tenantId);
KeyRef key = self->setWatchMetadata(metadata);
metadata->watch_impl = forward(watchWaitForValueChange(self, span.context, key), metadata->versionPromise);
metadata->watch_impl = forward(watchWaitForValueChange(self, span.context, key, req.tenantInfo.tenantId),
metadata->versionPromise);
self->actors.add(watchValueSendReply(self, req, metadata->versionPromise.getFuture(), span.context));
}
@ -11061,20 +11096,21 @@ ACTOR Future<Void> serveWatchValueRequestsImpl(StorageServer* self, FutureStream
span.context, TenantInfo(), metadata->key, latest, metadata->tags, options, VersionVector());
state Future<Void> getValue = getValueQ(self, getReq);
GetValueReply reply = wait(getReq.reply.getFuture());
metadata = self->getWatchMetadata(req.key.contents());
metadata = self->getWatchMetadata(req.key.contents(), req.tenantInfo.tenantId);
if (metadata.isValid() && reply.value != metadata->value) { // valSS != valMap
self->deleteWatchMetadata(req.key.contents());
self->deleteWatchMetadata(req.key.contents(), req.tenantInfo.tenantId);
metadata->versionPromise.send(req.version);
metadata->watch_impl.cancel();
}
if (reply.value == req.value) { // valSS == valreq
metadata =
makeReference<ServerWatchMetadata>(req.key, req.value, req.version, req.tags, req.debugID);
metadata = makeReference<ServerWatchMetadata>(
req.key, req.value, req.version, req.tags, req.debugID, req.tenantInfo.tenantId);
KeyRef key = self->setWatchMetadata(metadata);
metadata->watch_impl =
forward(watchWaitForValueChange(self, span.context, key), metadata->versionPromise);
forward(watchWaitForValueChange(self, span.context, key, req.tenantInfo.tenantId),
metadata->versionPromise);
self->actors.add(
watchValueSendReply(self, req, metadata->versionPromise.getFuture(), span.context));
} else {


@ -121,20 +121,16 @@ struct RestoreBackupWorkload : TestWorkload {
if (config.tenantMode == TenantMode::REQUIRED) {
// restore system keys
state VectorRef<KeyRangeRef> systemBackupRanges = getSystemBackupRanges();
state int i;
for (i = 0; i < systemBackupRanges.size(); i++) {
wait(success(self->backupAgent.restore(cx,
cx,
"system_restore"_sr,
Key(self->backupContainer->getURL()),
self->backupContainer->getProxy(),
WaitForComplete::True,
::invalidVersion,
Verbose::True,
systemBackupRanges[i])));
}
// restore non-system keys
wait(success(self->backupAgent.restore(cx,
cx,
"system_restore"_sr,
Key(self->backupContainer->getURL()),
self->backupContainer->getProxy(),
getSystemBackupRanges(),
WaitForComplete::True,
::invalidVersion,
Verbose::True)));
// restore user data
wait(success(self->backupAgent.restore(cx,
cx,
self->tag,


@ -58,6 +58,8 @@ struct TenantManagementWorkload : TestWorkload {
TenantData() : empty(true) {}
TenantData(int64_t id, Optional<TenantGroupName> tenantGroup, bool empty)
: tenant(makeReference<Tenant>(id)), tenantGroup(tenantGroup), empty(empty) {}
TenantData(int64_t id, Optional<TenantName> tName, Optional<TenantGroupName> tenantGroup, bool empty)
: tenant(makeReference<Tenant>(id, tName)), tenantGroup(tenantGroup), empty(empty) {}
};
struct TenantGroupData {
@ -66,6 +68,9 @@ struct TenantManagementWorkload : TestWorkload {
std::map<TenantName, TenantData> createdTenants;
std::map<TenantGroupName, TenantGroupData> createdTenantGroups;
// Contains references to ALL tenants that were created by this client
// Possible to have been deleted, but will be tracked historically here
std::vector<Reference<Tenant>> allTestTenants;
int64_t maxId = -1;
const Key keyName = "key"_sr;
@ -562,8 +567,10 @@ struct TenantManagementWorkload : TestWorkload {
// Update our local tenant state to include the newly created one
self->maxId = entry.get().id;
self->createdTenants[tenantItr->first] =
TenantData(entry.get().id, tenantItr->second.tenantGroup, true);
TenantData tData =
TenantData(entry.get().id, tenantItr->first, tenantItr->second.tenantGroup, true);
self->createdTenants[tenantItr->first] = tData;
self->allTestTenants.push_back(tData.tenant);
// If this tenant has a tenant group, create or update the entry for it
if (tenantItr->second.tenantGroup.present()) {
@ -1260,6 +1267,7 @@ struct TenantManagementWorkload : TestWorkload {
ASSERT(!oldTenantEntry.present());
ASSERT(newTenantEntry.present());
TenantData tData = self->createdTenants[oldTenantName];
tData.tenant->name = newTenantName;
self->createdTenants[newTenantName] = tData;
self->createdTenants.erase(oldTenantName);
state Transaction insertTr(self->dataDb, tData.tenant);
@ -1720,6 +1728,43 @@ struct TenantManagementWorkload : TestWorkload {
}
}
ACTOR static Future<Void> readTenantKey(TenantManagementWorkload* self) {
if (self->allTestTenants.size() == 0) {
return Void();
}
state Reference<Tenant> tenant = deterministicRandom()->randomChoice(self->allTestTenants);
state Reference<ReadYourWritesTransaction> tr = makeReference<ReadYourWritesTransaction>(self->dataDb, tenant);
state TenantName tName = tenant->name.get();
state bool tenantPresent = false;
state TenantData tData = TenantData();
auto itr = self->createdTenants.find(tName);
if (itr != self->createdTenants.end() && itr->second.tenant->id() == tenant->id()) {
tenantPresent = true;
tData = itr->second;
}
state bool keyPresent = tenantPresent && !tData.empty;
loop {
try {
Optional<Value> val = wait(tr->get(self->keyName));
if (val.present()) {
ASSERT(keyPresent && val.get() == tName);
} else {
ASSERT(tenantPresent && tData.empty);
}
break;
} catch (Error& e) {
state Error err = e;
if (err.code() == error_code_tenant_not_found) {
ASSERT(!tenantPresent);
CODE_PROBE(true, "Attempted to read key from non-existent tenant");
return Void();
}
wait(tr->onError(err));
}
}
return Void();
}
Future<Void> start(Database const& cx) override {
if (clientId == 0 || !singleClient) {
return _start(cx, this);
@ -1733,7 +1778,7 @@ struct TenantManagementWorkload : TestWorkload {
// Run a random sequence of tenant management operations for the duration of the test
while (now() < start + self->testDuration) {
state int operation = deterministicRandom()->randomInt(0, 8);
state int operation = deterministicRandom()->randomInt(0, 9);
if (operation == 0) {
wait(createTenant(self));
} else if (operation == 1) {
@ -1750,6 +1795,8 @@ struct TenantManagementWorkload : TestWorkload {
wait(getTenantGroup(self));
} else if (operation == 7) {
wait(listTenantGroups(self));
} else if (operation == 8) {
wait(readTenantKey(self));
}
}


@ -354,6 +354,9 @@ if(WITH_PYTHON)
add_fdb_test(
TEST_FILES restarting/from_7.3.0/BlobGranuleRestartCycle-1.toml
restarting/from_7.3.0/BlobGranuleRestartCycle-2.toml)
add_fdb_test(
TEST_FILES restarting/from_7.3.0/BlobGranuleRestartLarge-1.toml
restarting/from_7.3.0/BlobGranuleRestartLarge-2.toml)
add_fdb_test(
TEST_FILES restarting/to_7.1.0_until_7.2.0/ConfigureStorageMigrationTestRestart-1.toml
restarting/to_7.1.0_until_7.2.0/ConfigureStorageMigrationTestRestart-2.toml)


@ -15,7 +15,8 @@ clearAfterTest=false
[[test.workload]]
testName = 'Cycle'
transactionsPerSecond = 250.0
transactionsPerSecond = 500.0
nodeCount = 2500
testDuration = 30.0
expectedRate = 0
@ -27,7 +28,30 @@ clearAfterTest=false
doForcePurge = false
initAtEnd = false
[[test.workload]]
testName = 'RandomClogging'
testDuration = 30.0
[[test.workload]]
testName = 'Rollback'
meanDelay = 30.0
testDuration = 30.0
[[test.workload]]
testName = 'Attrition'
machinesToKill = 10
machinesToLeave = 3
reboot = true
testDuration = 30.0
[[test.workload]]
testName = 'Attrition'
machinesToKill = 10
machinesToLeave = 3
reboot = true
testDuration = 30.0
[[test.workload]]
testName='SaveAndKill'
restartInfoLocation='simfdb/restartInfo.ini'
testDuration=90.0
testDuration=30.0


@ -16,10 +16,34 @@ runSetup=false
[[test.workload]]
testName = 'Cycle'
transactionsPerSecond = 250.0
transactionsPerSecond = 2500.0
nodeCount = 2500
testDuration = 30.0
expectedRate = 0
[[test.workload]]
testName = 'RandomClogging'
testDuration = 30.0
[[test.workload]]
testName = 'Rollback'
meanDelay = 30.0
testDuration = 30.0
[[test.workload]]
testName = 'Attrition'
machinesToKill = 10
machinesToLeave = 3
reboot = true
testDuration = 30.0
[[test.workload]]
testName = 'Attrition'
machinesToKill = 10
machinesToLeave = 3
reboot = true
testDuration = 30.0
[[test.workload]]
testName = 'BlobGranuleVerifier'
testDuration = 30.0


@ -0,0 +1,65 @@
# Blob Granules are only upgrade-able as of snowflake/release-71.2.3 and release
[configuration]
testClass = "BlobGranuleRestart"
blobGranulesEnabled = true
allowDefaultTenant = false
injectTargetedSSRestart = true
injectSSDelay = true
# FIXME: re-enable rocks at some point
storageEngineExcludeTypes = [4, 5]
[[test]]
testTitle = 'BlobGranuleRestartLarge'
clearAfterTest=false
[[test.workload]]
testName = 'ReadWrite'
testDuration = 60.0
transactionsPerSecond = 200
writesPerTransactionA = 5
readsPerTransactionA = 1
writesPerTransactionB = 10
readsPerTransactionB = 1
alpha = 0.5
nodeCount = 2000000
valueBytes = 128
discardEdgeMeasurements = false
warmingDelay = 10.0
setup = false
[[test.workload]]
testName = 'BlobGranuleVerifier'
testDuration = 60.0
# don't delete state after test
clearAndMergeCheck = false
doForcePurge = false
initAtEnd = false
[[test.workload]]
testName = 'RandomClogging'
testDuration = 60.0
[[test.workload]]
testName = 'Rollback'
meanDelay = 60.0
testDuration = 60.0
[[test.workload]]
testName = 'Attrition'
machinesToKill = 10
machinesToLeave = 3
reboot = true
testDuration = 60.0
[[test.workload]]
testName = 'Attrition'
machinesToKill = 10
machinesToLeave = 3
reboot = true
testDuration = 60.0
[[test.workload]]
testName='SaveAndKill'
restartInfoLocation='simfdb/restartInfo.ini'
testDuration=60.0


@ -0,0 +1,57 @@
# Blob Granules are only upgrade-able as of snowflake/release-71.2.3 and release-7.2
[configuration]
testClass = "BlobGranuleRestart"
blobGranulesEnabled = true
allowDefaultTenant = false
injectTargetedSSRestart = true
injectSSDelay = true
# FIXME: re-enable rocks at some point
storageEngineExcludeTypes = [4, 5]
[[test]]
testTitle = 'BlobGranuleRestartLarge'
clearAfterTest=false
runSetup=false
[[test.workload]]
testName = 'ReadWrite'
testDuration = 60.0
transactionsPerSecond = 200
writesPerTransactionA = 5
readsPerTransactionA = 1
writesPerTransactionB = 10
readsPerTransactionB = 1
alpha = 0.5
nodeCount = 2000000
valueBytes = 128
discardEdgeMeasurements = false
warmingDelay = 10.0
setup = false
[[test.workload]]
testName = 'RandomClogging'
testDuration = 60.0
[[test.workload]]
testName = 'Rollback'
meanDelay = 60.0
testDuration = 60.0
[[test.workload]]
testName = 'Attrition'
machinesToKill = 10
machinesToLeave = 3
reboot = true
testDuration = 60.0
[[test.workload]]
testName = 'Attrition'
machinesToKill = 10
machinesToLeave = 3
reboot = true
testDuration = 60.0
[[test.workload]]
testName = 'BlobGranuleVerifier'
testDuration = 60.0