Tenant Deletion Support for Backup Mutation Log (#9103)
tenant deletion support for backup mutation log
This commit is contained in:
parent
ca9464ae23
commit
330ac71630
|
@ -23,9 +23,13 @@
|
|||
|
||||
#include "fdbclient/BackupAgent.actor.h"
|
||||
#include "fdbclient/BlobCipher.h"
|
||||
#include "fdbclient/CommitTransaction.h"
|
||||
#include "fdbclient/GetEncryptCipherKeys.actor.h"
|
||||
#include "fdbclient/DatabaseContext.h"
|
||||
#include "fdbclient/ManagementAPI.actor.h"
|
||||
#include "fdbclient/Metacluster.h"
|
||||
#include "fdbclient/SystemData.h"
|
||||
#include "fdbclient/TenantManagement.actor.h"
|
||||
#include "fdbrpc/simulator.h"
|
||||
#include "flow/ActorCollection.h"
|
||||
#include "flow/actorcompiler.h" // has to be last include
|
||||
|
@ -266,7 +270,9 @@ ACTOR static Future<Void> decodeBackupLogValue(Arena* arena,
|
|||
Key removePrefix,
|
||||
Version version,
|
||||
Reference<KeyRangeMap<Version>> key_version,
|
||||
Database cx) {
|
||||
Database cx,
|
||||
std::unordered_map<int64_t, TenantName>* tenantMap,
|
||||
bool provisionalProxy) {
|
||||
try {
|
||||
state uint64_t offset(0);
|
||||
uint64_t protocolVersion = 0;
|
||||
|
@ -289,6 +295,7 @@ ACTOR static Future<Void> decodeBackupLogValue(Arena* arena,
|
|||
throw restore_missing_data();
|
||||
|
||||
state int originalOffset = offset;
|
||||
state DatabaseConfiguration config = wait(getDatabaseConfiguration(cx));
|
||||
|
||||
while (consumed < totalBytes) {
|
||||
uint32_t type = 0;
|
||||
|
@ -321,6 +328,20 @@ ACTOR static Future<Void> decodeBackupLogValue(Arena* arena,
|
|||
logValue = logValue.decrypt(cipherKeys, tempArena, BlobCipherMetrics::BACKUP);
|
||||
}
|
||||
ASSERT(!logValue.isEncrypted());
|
||||
|
||||
if (config.tenantMode == TenantMode::REQUIRED && !isSystemKey(logValue.param1)) {
|
||||
// If a tenant is not found for a given mutation then exclude it from the batch
|
||||
int64_t tenantId = TenantAPI::extractTenantIdFromMutation(logValue);
|
||||
ASSERT(tenantMap != nullptr);
|
||||
if (tenantMap->find(tenantId) == tenantMap->end()) {
|
||||
ASSERT(!provisionalProxy);
|
||||
TraceEvent("TenantNotFound").detail("Version", version).detail("TenantId", tenantId);
|
||||
CODE_PROBE(true, "mutation log restore tenant not found");
|
||||
consumed += BackupAgentBase::logHeaderSize + len1 + len2;
|
||||
continue;
|
||||
}
|
||||
}
|
||||
|
||||
MutationRef originalLogValue = logValue;
|
||||
|
||||
if (logValue.type == MutationRef::ClearRange) {
|
||||
|
@ -623,19 +644,21 @@ Future<Void> readCommitted(Database cx,
|
|||
cx, results, Void(), lock, range, groupBy, Terminator::True, AccessSystemKeys::True, LockAware::True);
|
||||
}
|
||||
|
||||
ACTOR Future<int> dumpData(Database cx,
|
||||
PromiseStream<RCGroup> results,
|
||||
Reference<FlowLock> lock,
|
||||
Key uid,
|
||||
Key addPrefix,
|
||||
Key removePrefix,
|
||||
PublicRequestStream<CommitTransactionRequest> commit,
|
||||
NotifiedVersion* committedVersion,
|
||||
Optional<Version> endVersion,
|
||||
Key rangeBegin,
|
||||
PromiseStream<Future<Void>> addActor,
|
||||
FlowLock* commitLock,
|
||||
Reference<KeyRangeMap<Version>> keyVersion) {
|
||||
ACTOR Future<int> kvMutationLogToTransactions(Database cx,
|
||||
PromiseStream<RCGroup> results,
|
||||
Reference<FlowLock> lock,
|
||||
Key uid,
|
||||
Key addPrefix,
|
||||
Key removePrefix,
|
||||
PublicRequestStream<CommitTransactionRequest> commit,
|
||||
NotifiedVersion* committedVersion,
|
||||
Optional<Version> endVersion,
|
||||
Key rangeBegin,
|
||||
PromiseStream<Future<Void>> addActor,
|
||||
FlowLock* commitLock,
|
||||
Reference<KeyRangeMap<Version>> keyVersion,
|
||||
std::unordered_map<int64_t, TenantName>* tenantMap,
|
||||
bool provisionalProxy) {
|
||||
state Version lastVersion = invalidVersion;
|
||||
state bool endOfStream = false;
|
||||
state int totalBytes = 0;
|
||||
|
@ -662,7 +685,9 @@ ACTOR Future<int> dumpData(Database cx,
|
|||
removePrefix,
|
||||
group.groupKey,
|
||||
keyVersion,
|
||||
cx));
|
||||
cx,
|
||||
tenantMap,
|
||||
provisionalProxy));
|
||||
newBeginVersion = group.groupKey + 1;
|
||||
if (mutationSize >= CLIENT_KNOBS->BACKUP_LOG_WRITE_BATCH_MAX_SIZE) {
|
||||
break;
|
||||
|
@ -763,7 +788,9 @@ ACTOR Future<Void> applyMutations(Database cx,
|
|||
Version* endVersion,
|
||||
PublicRequestStream<CommitTransactionRequest> commit,
|
||||
NotifiedVersion* committedVersion,
|
||||
Reference<KeyRangeMap<Version>> keyVersion) {
|
||||
Reference<KeyRangeMap<Version>> keyVersion,
|
||||
std::unordered_map<int64_t, TenantName>* tenantMap,
|
||||
bool provisionalProxy) {
|
||||
state FlowLock commitLock(CLIENT_KNOBS->BACKUP_LOCK_BYTES);
|
||||
state PromiseStream<Future<Void>> addActor;
|
||||
state Future<Void> error = actorCollection(addActor.getFuture());
|
||||
|
@ -800,19 +827,22 @@ ACTOR Future<Void> applyMutations(Database cx,
|
|||
|
||||
maxBytes = std::max<int>(maxBytes * CLIENT_KNOBS->APPLY_MAX_DECAY_RATE, CLIENT_KNOBS->APPLY_MIN_LOCK_BYTES);
|
||||
for (idx = 0; idx < ranges.size(); ++idx) {
|
||||
int bytes = wait(dumpData(cx,
|
||||
results[idx],
|
||||
locks[idx],
|
||||
uid,
|
||||
addPrefix,
|
||||
removePrefix,
|
||||
commit,
|
||||
committedVersion,
|
||||
idx == ranges.size() - 1 ? newEndVersion : Optional<Version>(),
|
||||
ranges[idx].begin,
|
||||
addActor,
|
||||
&commitLock,
|
||||
keyVersion));
|
||||
int bytes =
|
||||
wait(kvMutationLogToTransactions(cx,
|
||||
results[idx],
|
||||
locks[idx],
|
||||
uid,
|
||||
addPrefix,
|
||||
removePrefix,
|
||||
commit,
|
||||
committedVersion,
|
||||
idx == ranges.size() - 1 ? newEndVersion : Optional<Version>(),
|
||||
ranges[idx].begin,
|
||||
addActor,
|
||||
&commitLock,
|
||||
keyVersion,
|
||||
tenantMap,
|
||||
provisionalProxy));
|
||||
maxBytes = std::max<int>(CLIENT_KNOBS->APPLY_MAX_INCREASE_FACTOR * bytes, maxBytes);
|
||||
if (error.isError())
|
||||
throw error.getError();
|
||||
|
|
|
@ -659,7 +659,7 @@ struct EncryptedRangeFileWriter : public IRangeFileWriter {
|
|||
}
|
||||
|
||||
ACTOR static Future<Void> updateEncryptionKeysCtx(EncryptedRangeFileWriter* self, KeyRef key) {
|
||||
state EncryptCipherDomainId curDomainId = wait(getEncryptionDomainDetails(key, self->encryptMode));
|
||||
state EncryptCipherDomainId curDomainId = getEncryptionDomainDetails(key, self->encryptMode);
|
||||
state Reference<AsyncVar<ClientDBInfo> const> dbInfo = self->cx->clientInfo;
|
||||
|
||||
// Get text and header cipher key
|
||||
|
@ -698,10 +698,7 @@ struct EncryptedRangeFileWriter : public IRangeFileWriter {
|
|||
copyToBuffer(self, s->begin(), s->size());
|
||||
}
|
||||
|
||||
static bool isSystemKey(KeyRef key) { return key.size() && key[0] == systemKeys.begin[0]; }
|
||||
|
||||
ACTOR static Future<EncryptCipherDomainId> getEncryptionDomainDetailsImpl(KeyRef key,
|
||||
EncryptionAtRestMode encryptMode) {
|
||||
static EncryptCipherDomainId getEncryptionDomainDetails(KeyRef key, EncryptionAtRestMode encryptMode) {
|
||||
if (isSystemKey(key)) {
|
||||
return SYSTEM_KEYSPACE_ENCRYPT_DOMAIN_ID;
|
||||
}
|
||||
|
@ -713,10 +710,6 @@ struct EncryptedRangeFileWriter : public IRangeFileWriter {
|
|||
return TenantAPI::prefixToId(tenantPrefix);
|
||||
}
|
||||
|
||||
static Future<EncryptCipherDomainId> getEncryptionDomainDetails(KeyRef key, EncryptionAtRestMode encryptMode) {
|
||||
return getEncryptionDomainDetailsImpl(key, encryptMode);
|
||||
}
|
||||
|
||||
// Handles the first block and internal blocks. Ends current block if needed.
|
||||
// The final flag is used in simulation to pad the file's final block to a whole block size
|
||||
ACTOR static Future<Void> newBlock(EncryptedRangeFileWriter* self,
|
||||
|
@ -824,9 +817,8 @@ struct EncryptedRangeFileWriter : public IRangeFileWriter {
|
|||
if (self->lastKey.size() == 0 || k.size() == 0) {
|
||||
return false;
|
||||
}
|
||||
state EncryptCipherDomainId curKeyDomainId = wait(getEncryptionDomainDetails(k, self->encryptMode));
|
||||
state EncryptCipherDomainId prevKeyDomainId =
|
||||
wait(getEncryptionDomainDetails(self->lastKey, self->encryptMode));
|
||||
state EncryptCipherDomainId curKeyDomainId = getEncryptionDomainDetails(k, self->encryptMode);
|
||||
state EncryptCipherDomainId prevKeyDomainId = getEncryptionDomainDetails(self->lastKey, self->encryptMode);
|
||||
if (curKeyDomainId != prevKeyDomainId) {
|
||||
CODE_PROBE(true, "crossed tenant boundaries");
|
||||
wait(handleTenantBondary(self, k, v, writeValue, curKeyDomainId));
|
||||
|
@ -1057,11 +1049,11 @@ ACTOR static Future<Void> decodeKVPairs(StringRefReader* reader,
|
|||
state KeyRef curKey = KeyRef(k, kLen);
|
||||
if (!prevDomainId.present()) {
|
||||
EncryptCipherDomainId domainId =
|
||||
wait(EncryptedRangeFileWriter::getEncryptionDomainDetails(prevKey, encryptMode.get()));
|
||||
EncryptedRangeFileWriter::getEncryptionDomainDetails(prevKey, encryptMode.get());
|
||||
prevDomainId = domainId;
|
||||
}
|
||||
EncryptCipherDomainId curDomainId =
|
||||
wait(EncryptedRangeFileWriter::getEncryptionDomainDetails(curKey, encryptMode.get()));
|
||||
EncryptedRangeFileWriter::getEncryptionDomainDetails(curKey, encryptMode.get());
|
||||
if (!curKey.empty() && !prevKey.empty() && prevDomainId.get() != curDomainId) {
|
||||
ASSERT(!done);
|
||||
if (curDomainId != SYSTEM_KEYSPACE_ENCRYPT_DOMAIN_ID && curDomainId != FDB_DEFAULT_ENCRYPT_DOMAIN_ID) {
|
||||
|
@ -6171,7 +6163,7 @@ public:
|
|||
state Standalone<VectorRef<KeyRangeRef>> restoreRange;
|
||||
state Standalone<VectorRef<KeyRangeRef>> systemRestoreRange;
|
||||
for (auto r : ranges) {
|
||||
if (!config.encryptionAtRestMode.isEncryptionEnabled() || !r.intersects(getSystemBackupRanges())) {
|
||||
if (config.tenantMode != TenantMode::REQUIRED || !r.intersects(getSystemBackupRanges())) {
|
||||
restoreRange.push_back_deep(restoreRange.arena(), r);
|
||||
} else {
|
||||
KeyRangeRef normalKeyRange = r & normalKeys;
|
||||
|
|
|
@ -272,6 +272,10 @@ void decodeKeyServersValue(std::map<Tag, UID> const& tag_uid,
|
|||
std::sort(dest.begin(), dest.end());
|
||||
}
|
||||
|
||||
bool isSystemKey(KeyRef key) {
|
||||
return key.size() && key[0] == systemKeys.begin[0];
|
||||
}
|
||||
|
||||
const KeyRangeRef conflictingKeysRange =
|
||||
KeyRangeRef("\xff\xff/transaction/conflicting_keys/"_sr, "\xff\xff/transaction/conflicting_keys/\xff\xff"_sr);
|
||||
const ValueRef conflictingKeysTrue = "1"_sr;
|
||||
|
|
|
@ -20,6 +20,7 @@
|
|||
|
||||
#include <string>
|
||||
#include <map>
|
||||
#include "fdbclient/Atomic.h"
|
||||
#include "fdbclient/SystemData.h"
|
||||
#include "fdbclient/TenantManagement.actor.h"
|
||||
#include "fdbclient/Tuple.h"
|
||||
|
@ -37,4 +38,30 @@ TenantMode tenantModeForClusterType(ClusterType clusterType, TenantMode tenantMo
|
|||
}
|
||||
}
|
||||
|
||||
int64_t extractTenantIdFromMutation(MutationRef m) {
|
||||
ASSERT(!isSystemKey(m.param1));
|
||||
|
||||
if (isSingleKeyMutation((MutationRef::Type)m.type)) {
|
||||
// The first 8 bytes of the key of this OP is also an 8-byte number
|
||||
if (m.type == MutationRef::SetVersionstampedKey && m.param1.size() >= 4 &&
|
||||
parseVersionstampOffset(m.param1) < 8) {
|
||||
return TenantInfo::INVALID_TENANT;
|
||||
}
|
||||
} else {
|
||||
// Assumes clear range mutations are split on tenant boundaries
|
||||
ASSERT_EQ(m.type, MutationRef::Type::ClearRange);
|
||||
}
|
||||
|
||||
return extractTenantIdFromKeyRef(m.param1);
|
||||
}
|
||||
|
||||
int64_t extractTenantIdFromKeyRef(StringRef s) {
|
||||
if (s.size() < TenantAPI::PREFIX_SIZE) {
|
||||
return TenantInfo::INVALID_TENANT;
|
||||
}
|
||||
// Parse mutation key to determine tenant prefix
|
||||
StringRef prefix = s.substr(0, TenantAPI::PREFIX_SIZE);
|
||||
return TenantAPI::prefixToId(prefix, EnforceValidTenantId::False);
|
||||
}
|
||||
|
||||
} // namespace TenantAPI
|
||||
|
|
|
@ -574,7 +574,9 @@ ACTOR Future<Void> applyMutations(Database cx,
|
|||
Version* endVersion,
|
||||
PublicRequestStream<CommitTransactionRequest> commit,
|
||||
NotifiedVersion* committedVersion,
|
||||
Reference<KeyRangeMap<Version>> keyVersion);
|
||||
Reference<KeyRangeMap<Version>> keyVersion,
|
||||
std::unordered_map<int64_t, TenantName>* tenantMap,
|
||||
bool provisionalProxy);
|
||||
ACTOR Future<Void> cleanupBackup(Database cx, DeleteData deleteData);
|
||||
|
||||
using EBackupState = BackupAgentBase::EnumState;
|
||||
|
|
|
@ -91,6 +91,7 @@ void decodeKeyServersValue(RangeResult result,
|
|||
UID& srcID,
|
||||
UID& destID,
|
||||
bool missingIsError = true);
|
||||
bool isSystemKey(KeyRef key);
|
||||
|
||||
extern const KeyRangeRef auditKeys;
|
||||
extern const KeyRef auditPrefix;
|
||||
|
|
|
@ -106,6 +106,8 @@ Future<Void> checkTenantMode(Transaction tr, ClusterType expectedClusterType) {
|
|||
}
|
||||
|
||||
TenantMode tenantModeForClusterType(ClusterType clusterType, TenantMode tenantMode);
|
||||
int64_t extractTenantIdFromMutation(MutationRef m);
|
||||
int64_t extractTenantIdFromKeyRef(StringRef s);
|
||||
|
||||
// Returns true if the specified ID has already been deleted and false if not. If the ID is old enough
|
||||
// that we no longer keep tombstones for it, an error is thrown.
|
||||
|
|
|
@ -75,7 +75,8 @@ public:
|
|||
bool& confChange_,
|
||||
Version version,
|
||||
Version popVersion_,
|
||||
bool initialCommit_)
|
||||
bool initialCommit_,
|
||||
bool provisionalCommitProxy_)
|
||||
: spanContext(spanContext_), dbgid(proxyCommitData_.dbgid), arena(arena_), mutations(mutations_),
|
||||
txnStateStore(proxyCommitData_.txnStateStore), toCommit(toCommit_), cipherKeys(cipherKeys_),
|
||||
encryptMode(encryptMode), confChange(confChange_), logSystem(logSystem_), version(version),
|
||||
|
@ -85,7 +86,8 @@ public:
|
|||
commit(proxyCommitData_.commit), cx(proxyCommitData_.cx), committedVersion(&proxyCommitData_.committedVersion),
|
||||
storageCache(&proxyCommitData_.storageCache), tag_popped(&proxyCommitData_.tag_popped),
|
||||
tssMapping(&proxyCommitData_.tssMapping), tenantMap(&proxyCommitData_.tenantMap),
|
||||
tenantNameIndex(&proxyCommitData_.tenantNameIndex), initialCommit(initialCommit_) {}
|
||||
tenantNameIndex(&proxyCommitData_.tenantNameIndex), initialCommit(initialCommit_),
|
||||
provisionalCommitProxy(provisionalCommitProxy_) {}
|
||||
|
||||
ApplyMetadataMutationsImpl(const SpanContext& spanContext_,
|
||||
ResolverData& resolverData_,
|
||||
|
@ -145,6 +147,9 @@ private:
|
|||
// true if called from Resolver
|
||||
bool forResolver = false;
|
||||
|
||||
// true if called from a provisional commit proxy
|
||||
bool provisionalCommitProxy = false;
|
||||
|
||||
private:
|
||||
// The following variables are used internally
|
||||
|
||||
|
@ -499,7 +504,9 @@ private:
|
|||
&p.endVersion,
|
||||
commit,
|
||||
committedVersion,
|
||||
p.keyVersion);
|
||||
p.keyVersion,
|
||||
tenantMap,
|
||||
provisionalCommitProxy);
|
||||
}
|
||||
|
||||
void checkSetApplyMutationsKeyVersionMapRange(MutationRef m) {
|
||||
|
@ -1332,7 +1339,8 @@ void applyMetadataMutations(SpanContext const& spanContext,
|
|||
bool& confChange,
|
||||
Version version,
|
||||
Version popVersion,
|
||||
bool initialCommit) {
|
||||
bool initialCommit,
|
||||
bool provisionalCommitProxy) {
|
||||
ApplyMetadataMutationsImpl(spanContext,
|
||||
arena,
|
||||
mutations,
|
||||
|
@ -1344,7 +1352,8 @@ void applyMetadataMutations(SpanContext const& spanContext,
|
|||
confChange,
|
||||
version,
|
||||
popVersion,
|
||||
initialCommit)
|
||||
initialCommit,
|
||||
provisionalCommitProxy)
|
||||
.apply();
|
||||
}
|
||||
|
||||
|
|
|
@ -35,6 +35,7 @@
|
|||
#include "fdbclient/NativeAPI.actor.h"
|
||||
#include "fdbclient/SystemData.h"
|
||||
#include "fdbclient/Tenant.h"
|
||||
#include "fdbclient/TenantManagement.actor.h"
|
||||
#include "fdbclient/TransactionLineage.h"
|
||||
#include "fdbrpc/TenantInfo.h"
|
||||
#include "fdbrpc/sim_validation.h"
|
||||
|
@ -1094,7 +1095,8 @@ void applyMetadataEffect(CommitBatchContext* self) {
|
|||
self->forceRecovery,
|
||||
/* version= */ self->commitVersion,
|
||||
/* popVersion= */ 0,
|
||||
/* initialCommit */ false);
|
||||
/* initialCommit */ false,
|
||||
/* provisionalCommitProxy */ self->pProxyCommitData->provisional);
|
||||
}
|
||||
if (self->resolution[0].stateMutations[versionIndex][transactionIndex].mutations.size() &&
|
||||
self->firstStateMutations) {
|
||||
|
@ -1167,35 +1169,13 @@ void determineCommittedTransactions(CommitBatchContext* self) {
|
|||
}
|
||||
}
|
||||
|
||||
inline int64_t extractTenantIdFromKeyRef(StringRef s) {
|
||||
if (s.size() < TenantAPI::PREFIX_SIZE) {
|
||||
return TenantInfo::INVALID_TENANT;
|
||||
}
|
||||
// Parse mutation key to determine tenant prefix
|
||||
StringRef prefix = s.substr(0, TenantAPI::PREFIX_SIZE);
|
||||
return TenantAPI::prefixToId(prefix, EnforceValidTenantId::False);
|
||||
}
|
||||
|
||||
int64_t extractTenantIdFromSingleKeyMutation(MutationRef m) {
|
||||
ASSERT(!isSystemKey(m.param1));
|
||||
|
||||
// The first 8 bytes of the key of this OP is also an 8-byte number
|
||||
if (m.type == MutationRef::SetVersionstampedKey && m.param1.size() >= 4 && parseVersionstampOffset(m.param1) < 8) {
|
||||
return TenantInfo::INVALID_TENANT;
|
||||
}
|
||||
|
||||
ASSERT(isSingleKeyMutation((MutationRef::Type)m.type));
|
||||
|
||||
return extractTenantIdFromKeyRef(m.param1);
|
||||
}
|
||||
|
||||
// Return true if a single-key mutation is associated with a valid tenant id or a system key
|
||||
bool validTenantAccess(MutationRef m) {
|
||||
if (isSystemKey(m.param1))
|
||||
return true;
|
||||
|
||||
if (isSingleKeyMutation((MutationRef::Type)m.type)) {
|
||||
auto tenantId = extractTenantIdFromSingleKeyMutation(m);
|
||||
auto tenantId = TenantAPI::extractTenantIdFromMutation(m);
|
||||
// throw exception for invalid raw access
|
||||
if (tenantId == TenantInfo::INVALID_TENANT) {
|
||||
return false;
|
||||
|
@ -1203,8 +1183,8 @@ bool validTenantAccess(MutationRef m) {
|
|||
} else {
|
||||
// For clear range, we allow raw access
|
||||
ASSERT_EQ(m.type, MutationRef::Type::ClearRange);
|
||||
auto beginTenantId = extractTenantIdFromKeyRef(m.param1);
|
||||
auto endTenantId = extractTenantIdFromKeyRef(m.param2);
|
||||
auto beginTenantId = TenantAPI::extractTenantIdFromKeyRef(m.param1);
|
||||
auto endTenantId = TenantAPI::extractTenantIdFromKeyRef(m.param2);
|
||||
CODE_PROBE(beginTenantId != endTenantId, "Clear Range raw access or cross multiple tenants");
|
||||
}
|
||||
return true;
|
||||
|
@ -1257,7 +1237,8 @@ ACTOR Future<Void> applyMetadataToCommittedTransactions(CommitBatchContext* self
|
|||
self->forceRecovery,
|
||||
self->commitVersion,
|
||||
self->commitVersion + 1,
|
||||
/* initialCommit= */ false);
|
||||
/* initialCommit= */ false,
|
||||
/* provisionalCommitProxy */ self->pProxyCommitData->provisional);
|
||||
}
|
||||
|
||||
if (self->firstStateMutations) {
|
||||
|
@ -2905,7 +2886,8 @@ ACTOR Future<Void> processCompleteTransactionStateRequest(TransactionStateResolv
|
|||
confChanges,
|
||||
/* version= */ 0,
|
||||
/* popVersion= */ 0,
|
||||
/* initialCommit= */ true);
|
||||
/* initialCommit= */ true,
|
||||
/* provisionalCommitProxy */ pContext->pCommitData->provisional);
|
||||
} // loop
|
||||
|
||||
auto lockedKey = pContext->pTxnStateStore->readValue(databaseLockedKey).get();
|
||||
|
@ -2971,7 +2953,8 @@ ACTOR Future<Void> commitProxyServerCore(CommitProxyInterface proxy,
|
|||
Version recoveryTransactionVersion,
|
||||
bool firstProxy,
|
||||
std::string whitelistBinPaths,
|
||||
EncryptionAtRestMode encryptMode) {
|
||||
EncryptionAtRestMode encryptMode,
|
||||
bool provisional) {
|
||||
state ProxyCommitData commitData(proxy.id(),
|
||||
master,
|
||||
proxy.getConsistentReadVersion,
|
||||
|
@ -2979,7 +2962,8 @@ ACTOR Future<Void> commitProxyServerCore(CommitProxyInterface proxy,
|
|||
proxy.commit,
|
||||
db,
|
||||
firstProxy,
|
||||
encryptMode);
|
||||
encryptMode,
|
||||
provisional);
|
||||
|
||||
state Future<Sequence> sequenceFuture = (Sequence)0;
|
||||
state PromiseStream<std::pair<std::vector<CommitTransactionRequest>, int>> batchedCommits;
|
||||
|
@ -3160,7 +3144,8 @@ ACTOR Future<Void> commitProxyServer(CommitProxyInterface proxy,
|
|||
req.recoveryTransactionVersion,
|
||||
req.firstProxy,
|
||||
whitelistBinPaths,
|
||||
req.encryptMode);
|
||||
req.encryptMode,
|
||||
proxy.provisional);
|
||||
wait(core || checkRemoved(db, req.recoveryCount, proxy));
|
||||
} catch (Error& e) {
|
||||
TraceEvent("CommitProxyTerminated", proxy.id()).errorUnsuppressed(e);
|
||||
|
|
|
@ -99,17 +99,14 @@ void applyMetadataMutations(SpanContext const& spanContext,
|
|||
bool& confChange,
|
||||
Version version,
|
||||
Version popVersion,
|
||||
bool initialCommit);
|
||||
bool initialCommit,
|
||||
bool provisionalCommitProxy);
|
||||
void applyMetadataMutations(SpanContext const& spanContext,
|
||||
const UID& dbgid,
|
||||
Arena& arena,
|
||||
const VectorRef<MutationRef>& mutations,
|
||||
IKeyValueStore* txnStateStore);
|
||||
|
||||
inline bool isSystemKey(KeyRef key) {
|
||||
return key.size() && key[0] == systemKeys.begin[0];
|
||||
}
|
||||
|
||||
inline bool containsMetadataMutation(const VectorRef<MutationRef>& mutations) {
|
||||
for (auto const& m : mutations) {
|
||||
|
||||
|
|
|
@ -208,6 +208,7 @@ struct ProxyCommitData {
|
|||
bool locked;
|
||||
Optional<Value> metadataVersion;
|
||||
double commitBatchInterval;
|
||||
bool provisional;
|
||||
|
||||
int64_t localCommitBatchesStarted;
|
||||
NotifiedVersion latestLocalCommitBatchResolving;
|
||||
|
@ -306,13 +307,14 @@ struct ProxyCommitData {
|
|||
PublicRequestStream<CommitTransactionRequest> commit,
|
||||
Reference<AsyncVar<ServerDBInfo> const> db,
|
||||
bool firstProxy,
|
||||
EncryptionAtRestMode encryptMode)
|
||||
EncryptionAtRestMode encryptMode,
|
||||
bool provisional)
|
||||
: dbgid(dbgid), commitBatchesMemBytesCount(0),
|
||||
stats(dbgid, &version, &committedVersion, &commitBatchesMemBytesCount, &tenantMap), master(master),
|
||||
logAdapter(nullptr), txnStateStore(nullptr), committedVersion(recoveryTransactionVersion),
|
||||
minKnownCommittedVersion(0), version(0), lastVersionTime(0), commitVersionRequestNumber(1),
|
||||
mostRecentProcessedRequestNumber(0), firstProxy(firstProxy), encryptMode(encryptMode), lastCoalesceTime(0),
|
||||
locked(false), commitBatchInterval(SERVER_KNOBS->COMMIT_TRANSACTION_BATCH_INTERVAL_MIN),
|
||||
mostRecentProcessedRequestNumber(0), firstProxy(firstProxy), encryptMode(encryptMode), provisional(provisional),
|
||||
lastCoalesceTime(0), locked(false), commitBatchInterval(SERVER_KNOBS->COMMIT_TRANSACTION_BATCH_INTERVAL_MIN),
|
||||
localCommitBatchesStarted(0), getConsistentReadVersion(getConsistentReadVersion), commit(commit),
|
||||
cx(openDBOnServer(db, TaskPriority::DefaultEndpoint, LockAware::True)), db(db),
|
||||
singleKeyMutationEvent("SingleKeyMutation"_sr), lastTxsPop(0), popRemoteTxs(false), lastStartCommit(0),
|
||||
|
|
|
@ -699,7 +699,7 @@ struct BackupAndRestoreCorrectnessWorkload : TestWorkload {
|
|||
Standalone<VectorRef<KeyRangeRef>> modifiedRestoreRanges;
|
||||
Standalone<VectorRef<KeyRangeRef>> systemRestoreRanges;
|
||||
for (int i = 0; i < self->restoreRanges.size(); ++i) {
|
||||
if (!config.encryptionAtRestMode.isEncryptionEnabled() ||
|
||||
if (config.tenantMode != TenantMode::REQUIRED ||
|
||||
!self->restoreRanges[i].intersects(getSystemBackupRanges())) {
|
||||
modifiedRestoreRanges.push_back_deep(modifiedRestoreRanges.arena(), self->restoreRanges[i]);
|
||||
} else {
|
||||
|
|
|
@ -673,7 +673,7 @@ struct BackupToDBCorrectnessWorkload : TestWorkload {
|
|||
state Standalone<VectorRef<KeyRangeRef>> restoreRange;
|
||||
state Standalone<VectorRef<KeyRangeRef>> systemRestoreRange;
|
||||
for (auto r : self->backupRanges) {
|
||||
if (!config.encryptionAtRestMode.isEncryptionEnabled() || !r.intersects(getSystemBackupRanges())) {
|
||||
if (config.tenantMode != TenantMode::REQUIRED || !r.intersects(getSystemBackupRanges())) {
|
||||
restoreRange.push_back_deep(
|
||||
restoreRange.arena(),
|
||||
KeyRangeRef(r.begin.withPrefix(self->backupPrefix), r.end.withPrefix(self->backupPrefix)));
|
||||
|
|
|
@ -18,6 +18,7 @@
|
|||
* limitations under the License.
|
||||
*/
|
||||
|
||||
#include "fdbclient/TenantEntryCache.actor.h"
|
||||
#include "fdbclient/TenantManagement.actor.h"
|
||||
#include "fdbrpc/ContinuousSample.h"
|
||||
#include "fdbserver/Knobs.h"
|
||||
|
@ -36,6 +37,7 @@ struct BulkSetupWorkload : TestWorkload {
|
|||
double maxNumTenants;
|
||||
double minNumTenants;
|
||||
std::vector<TenantName> tenantNames;
|
||||
bool deleteTenants;
|
||||
double testDuration;
|
||||
|
||||
BulkSetupWorkload(WorkloadContext const& wcx) : TestWorkload(wcx) {
|
||||
|
@ -45,8 +47,9 @@ struct BulkSetupWorkload : TestWorkload {
|
|||
// maximum and minimum number of tenants per client
|
||||
maxNumTenants = getOption(options, "maxNumTenants"_sr, 0);
|
||||
minNumTenants = getOption(options, "minNumTenants"_sr, 0);
|
||||
deleteTenants = getOption(options, "deleteTenants"_sr, false);
|
||||
ASSERT(minNumTenants <= maxNumTenants);
|
||||
testDuration = getOption(options, "testDuration"_sr, 10.0);
|
||||
testDuration = getOption(options, "testDuration"_sr, -1);
|
||||
}
|
||||
|
||||
void getMetrics(std::vector<PerfMetric>& m) override {}
|
||||
|
@ -64,16 +67,17 @@ struct BulkSetupWorkload : TestWorkload {
|
|||
TraceEvent("BulkSetupTenantCreation").detail("NumTenants", numTenantsToCreate);
|
||||
if (numTenantsToCreate > 0) {
|
||||
std::vector<Future<Void>> tenantFutures;
|
||||
state std::vector<TenantName> tenantNames;
|
||||
for (int i = 0; i < numTenantsToCreate; i++) {
|
||||
TenantMapEntry entry;
|
||||
workload->tenantNames.push_back(TenantName(format("BulkSetupTenant_%04d", i)));
|
||||
tenantNames.push_back(TenantName(format("BulkSetupTenant_%04d", i)));
|
||||
TraceEvent("CreatingTenant")
|
||||
.detail("Tenant", workload->tenantNames.back())
|
||||
.detail("Tenant", tenantNames.back())
|
||||
.detail("TenantGroup", entry.tenantGroup);
|
||||
tenantFutures.push_back(
|
||||
success(TenantAPI::createTenant(cx.getReference(), workload->tenantNames.back())));
|
||||
tenantFutures.push_back(success(TenantAPI::createTenant(cx.getReference(), tenantNames.back())));
|
||||
}
|
||||
wait(waitForAll(tenantFutures));
|
||||
workload->tenantNames = tenantNames;
|
||||
}
|
||||
wait(bulkSetup(cx,
|
||||
workload,
|
||||
|
@ -89,6 +93,49 @@ struct BulkSetupWorkload : TestWorkload {
|
|||
0,
|
||||
0,
|
||||
workload->tenantNames));
|
||||
// We want to ensure that tenant deletion happens before the restore phase starts
|
||||
if (workload->deleteTenants) {
|
||||
state Reference<TenantEntryCache<Void>> tenantCache =
|
||||
makeReference<TenantEntryCache<Void>>(cx, TenantEntryCacheRefreshMode::WATCH);
|
||||
wait(tenantCache->init());
|
||||
state int numTenantsToDelete = deterministicRandom()->randomInt(0, workload->tenantNames.size() + 1);
|
||||
if (numTenantsToDelete > 0) {
|
||||
state int i;
|
||||
for (i = 0; i < numTenantsToDelete; i++) {
|
||||
state TenantName tenant = deterministicRandom()->randomChoice(workload->tenantNames);
|
||||
Optional<TenantEntryCachePayload<Void>> payload = wait(tenantCache->getByName(tenant));
|
||||
ASSERT(payload.present());
|
||||
state int64_t tenantId = payload.get().entry.id;
|
||||
TraceEvent("BulkSetupTenantDeletionClearing")
|
||||
.detail("TenantName", tenant)
|
||||
.detail("TenantId", tenantId)
|
||||
.detail("TotalNumTenants", workload->tenantNames.size());
|
||||
// clear the tenant
|
||||
state ReadYourWritesTransaction tr = ReadYourWritesTransaction(cx, tenant);
|
||||
loop {
|
||||
try {
|
||||
tr.clear(normalKeys);
|
||||
wait(tr.commit());
|
||||
break;
|
||||
} catch (Error& e) {
|
||||
wait(tr.onError(e));
|
||||
}
|
||||
}
|
||||
// delete the tenant
|
||||
wait(success(TenantAPI::deleteTenant(cx.getReference(), tenant)));
|
||||
for (auto it = workload->tenantNames.begin(); it != workload->tenantNames.end(); it++) {
|
||||
if (*it == tenant) {
|
||||
workload->tenantNames.erase(it);
|
||||
break;
|
||||
}
|
||||
}
|
||||
TraceEvent("BulkSetupTenantDeletionDone")
|
||||
.detail("TenantName", tenant)
|
||||
.detail("TenantId", tenantId)
|
||||
.detail("TotalNumTenants", workload->tenantNames.size());
|
||||
}
|
||||
}
|
||||
}
|
||||
return Void();
|
||||
}
|
||||
|
||||
|
@ -96,7 +143,11 @@ struct BulkSetupWorkload : TestWorkload {
|
|||
|
||||
Future<Void> setup(Database const& cx) override {
|
||||
if (clientId == 0) {
|
||||
return timeout(_setup(this, cx), testDuration, Void());
|
||||
if (testDuration > 0) {
|
||||
return timeout(_setup(this, cx), testDuration, Void());
|
||||
} else {
|
||||
return _setup(this, cx);
|
||||
}
|
||||
}
|
||||
return Void();
|
||||
}
|
||||
|
|
|
@ -237,7 +237,7 @@ struct IncrementalBackupWorkload : TestWorkload {
|
|||
state Standalone<VectorRef<KeyRangeRef>> restoreRange;
|
||||
state Standalone<VectorRef<KeyRangeRef>> systemRestoreRange;
|
||||
for (auto r : backupRanges) {
|
||||
if (!config.encryptionAtRestMode.isEncryptionEnabled() || !r.intersects(getSystemBackupRanges())) {
|
||||
if (config.tenantMode != TenantMode::REQUIRED || !r.intersects(getSystemBackupRanges())) {
|
||||
restoreRange.push_back_deep(restoreRange.arena(), r);
|
||||
} else {
|
||||
KeyRangeRef normalKeyRange = r & normalKeys;
|
||||
|
|
|
@ -118,7 +118,7 @@ struct RestoreBackupWorkload : TestWorkload {
|
|||
wait(waitOnBackup(self, cx));
|
||||
wait(clearDatabase(cx));
|
||||
|
||||
if (config.encryptionAtRestMode.isEncryptionEnabled()) {
|
||||
if (config.tenantMode == TenantMode::REQUIRED) {
|
||||
// restore system keys
|
||||
VectorRef<KeyRangeRef> systemBackupRanges = getSystemBackupRanges();
|
||||
state std::vector<Future<Version>> restores;
|
||||
|
|
|
@ -58,7 +58,7 @@ struct RestoreFromBlobWorkload : TestWorkload {
|
|||
state DatabaseConfiguration config = wait(getDatabaseConfiguration(cx));
|
||||
|
||||
wait(delay(self->restoreAfter));
|
||||
if (config.encryptionAtRestMode.isEncryptionEnabled()) {
|
||||
if (config.tenantMode == TenantMode::REQUIRED) {
|
||||
// restore system keys followed by user keys
|
||||
wait(success(backupAgent.restore(
|
||||
cx, {}, self->backupTag, self->backupURL, {}, getSystemBackupRanges(), self->waitForComplete)));
|
||||
|
|
|
@ -162,6 +162,7 @@ if(WITH_PYTHON)
|
|||
add_fdb_test(TEST_FILES fast/FuzzApiCorrectness.toml)
|
||||
add_fdb_test(TEST_FILES fast/FuzzApiCorrectnessClean.toml)
|
||||
add_fdb_test(TEST_FILES fast/IncrementalBackup.toml)
|
||||
add_fdb_test(TEST_FILES fast/IncrementalBackupWithTenantDeletion.toml)
|
||||
add_fdb_test(TEST_FILES fast/IncrementTest.toml)
|
||||
add_fdb_test(TEST_FILES fast/InventoryTestAlmostReadOnly.toml)
|
||||
add_fdb_test(TEST_FILES fast/InventoryTestSomeWrites.toml)
|
||||
|
|
|
@ -17,7 +17,7 @@ simBackupAgents = 'BackupToFile'
|
|||
[[test.workload]]
|
||||
testName = 'BulkLoadWithTenants'
|
||||
maxNumTenants = 100
|
||||
minNumTenants = 0
|
||||
minNumTenants = 1
|
||||
transactionsPerSecond = 2500.0
|
||||
testDuration = 30.0
|
||||
|
||||
|
|
|
@ -0,0 +1,43 @@
|
|||
[configuration]
|
||||
allowDefaultTenant = false
|
||||
tenantModes = ['required']
|
||||
allowCreatingTenants = false
|
||||
|
||||
[[test]]
|
||||
testTitle = 'SubmitBackup'
|
||||
simBackupAgents = 'BackupToFile'
|
||||
runConsistencyCheck = false
|
||||
|
||||
[[test.workload]]
|
||||
testName = 'IncrementalBackup'
|
||||
tag = 'default'
|
||||
submitOnly = true
|
||||
waitForBackup = true
|
||||
|
||||
[[test]]
|
||||
testTitle = 'BulkLoad'
|
||||
clearAfterTest = true
|
||||
simBackupAgents = 'BackupToFile'
|
||||
|
||||
[[test.workload]]
|
||||
testName = 'BulkLoadWithTenants'
|
||||
maxNumTenants = 100
|
||||
minNumTenants = 1
|
||||
transactionsPerSecond = 3000.0
|
||||
deleteTenants = true
|
||||
|
||||
[[test.workload]]
|
||||
testName = 'IncrementalBackup'
|
||||
tag = 'default'
|
||||
waitForBackup = true
|
||||
stopBackup = true
|
||||
|
||||
[[test]]
|
||||
testTitle = 'SubmitRestore'
|
||||
clearAfterTest = false
|
||||
simBackupAgents = 'BackupToFile'
|
||||
|
||||
[[test.workload]]
|
||||
testName = 'IncrementalBackup'
|
||||
tag = 'default'
|
||||
restoreOnly = true
|
|
@ -1,5 +1,7 @@
|
|||
[configuration]
|
||||
extraDatabaseMode = 'Single'
|
||||
# required tenant mode is not supported for Disaster Recovery yet
|
||||
tenantModes = ['disabled', 'optional']
|
||||
|
||||
[[test]]
|
||||
testTitle = 'ApiCorrectnessTest'
|
||||
|
|
|
@ -1,5 +1,7 @@
|
|||
[configuration]
|
||||
extraDatabaseMode = 'Single'
|
||||
# required tenant mode is not supported for Disaster Recovery yet
|
||||
tenantModes = ['disabled', 'optional']
|
||||
|
||||
[[test]]
|
||||
testTitle = 'SharedDefaultBackupToFileThenDB'
|
||||
|
|
|
@ -1,5 +1,7 @@
|
|||
[configuration]
|
||||
extraDatabaseMode = 'Single'
|
||||
# required tenant mode is not supported for Disaster Recovery yet
|
||||
tenantModes = ['disabled', 'optional']
|
||||
|
||||
[[test]]
|
||||
testTitle = 'VersionStampBackupToDB'
|
||||
|
|
|
@ -1,5 +1,7 @@
|
|||
[configuration]
|
||||
extraDatabaseMode = 'Single'
|
||||
# required tenant mode is not supported for Disaster Recovery yet
|
||||
tenantModes = ['disabled', 'optional']
|
||||
|
||||
[[test]]
|
||||
testTitle = 'VersionStampCorrectnessTest'
|
||||
|
|
|
@ -1,6 +1,8 @@
|
|||
[configuration]
|
||||
StderrSeverity = 30
|
||||
extraDatabaseMode = 'Single'
|
||||
# required tenant mode is not supported for Disaster Recovery yet
|
||||
tenantModes = ['disabled', 'optional']
|
||||
|
||||
[[test]]
|
||||
testTitle = 'WriteDuringReadTest'
|
||||
|
|
Loading…
Reference in New Issue