Fix filtering of potential backup mutations in commit proxy and backup worker; add code probe to ensure we are testing default backup sharing and add some tests to hit it
This commit is contained in:
parent
a7c200b16c
commit
3353103d9d
|
@ -1220,11 +1220,9 @@ VectorRef<KeyRangeRef> const& getSystemBackupRanges() {
|
|||
return systemBackupRanges;
|
||||
}
|
||||
|
||||
KeyRangeMap<bool> const& backupMutationMask() {
|
||||
KeyRangeMap<bool> const& systemBackupMutationMask() {
|
||||
static KeyRangeMap<bool> mask;
|
||||
if (mask.size() == 1) {
|
||||
mask.insert(normalKeys, true);
|
||||
mask.insert(metadataVersionKey, true);
|
||||
for (auto r : getSystemBackupRanges()) {
|
||||
mask.insert(r, true);
|
||||
}
|
||||
|
|
|
@ -2092,6 +2092,8 @@ struct StartFullBackupTaskFunc : TaskFuncBase {
|
|||
if (uidRange == targetRange) {
|
||||
destUidValue = it.value;
|
||||
found = true;
|
||||
CODE_PROBE(targetRange == getDefaultBackupSharedRange(),
|
||||
"DR mutation sharing with default backup");
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -4608,6 +4608,8 @@ public:
|
|||
if (uidRange == targetRange) {
|
||||
destUidValue = it.value;
|
||||
found = true;
|
||||
CODE_PROBE(targetRange == getDefaultBackupSharedRange(),
|
||||
"Backup mutation sharing with default backup");
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1007,7 +1007,7 @@ void simulateBlobFailure();
|
|||
|
||||
void addDefaultBackupRanges(Standalone<VectorRef<KeyRangeRef>>& backupKeys);
|
||||
VectorRef<KeyRangeRef> const& getSystemBackupRanges();
|
||||
KeyRangeMap<bool> const& backupMutationMask();
|
||||
KeyRangeMap<bool> const& systemBackupMutationMask();
|
||||
|
||||
template <class Container>
|
||||
bool isDefaultBackup(Container ranges) {
|
||||
|
|
|
@ -54,10 +54,10 @@ struct VersionedMessage {
|
|||
Version getVersion() const { return version.version; }
|
||||
uint32_t getSubVersion() const { return version.sub; }
|
||||
|
||||
// Returns true if the message is a mutation that should be backed up, i.e.,
|
||||
// either key is not in system key space or is not a metadataVersionKey.
|
||||
bool isBackupMessage(MutationRef* m,
|
||||
const std::unordered_map<BlobCipherDetails, Reference<BlobCipherKey>>& cipherKeys) {
|
||||
// Returns true if the message is a mutation that could be backed up (normal keys, system key backup ranges, or the
|
||||
// metadata version key)
|
||||
bool isCandidateBackupMessage(MutationRef* m,
|
||||
const std::unordered_map<BlobCipherDetails, Reference<BlobCipherKey>>& cipherKeys) {
|
||||
for (Tag tag : tags) {
|
||||
if (tag.locality == tagLocalitySpecial || tag.locality == tagLocalityTxs) {
|
||||
return false; // skip Txs mutations
|
||||
|
@ -82,7 +82,21 @@ struct VersionedMessage {
|
|||
// We use dedicated arena for decrypt buffer, as the other arena is used to count towards backup lock bytes.
|
||||
*m = m->decrypt(cipherKeys, decryptArena, BlobCipherMetrics::BACKUP, &message);
|
||||
}
|
||||
return backupMutationMask().rangeContaining(m->param1).value();
|
||||
|
||||
// Return true if the mutation intersects any legal backup ranges
|
||||
if (normalKeys.contains(m->param1) || m->param1 == metadataVersionKey) {
|
||||
return true;
|
||||
} else if (m->type != MutationRef::Type::ClearRange) {
|
||||
return systemBackupMutationMask().rangeContaining(m->param1).value();
|
||||
} else {
|
||||
for (auto& r : systemBackupMutationMask().intersectingRanges(KeyRangeRef(m->param1, m->param2))) {
|
||||
if (r->value()) {
|
||||
return true;
|
||||
}
|
||||
}
|
||||
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
void collectCipherDetailIfEncrypted(std::unordered_set<BlobCipherDetails>& cipherDetails) {
|
||||
|
@ -789,7 +803,7 @@ ACTOR Future<Void> saveMutationsToFile(BackupData* self,
|
|||
for (idx = 0; idx < numMsg; idx++) {
|
||||
auto& message = self->messages[idx];
|
||||
MutationRef m;
|
||||
if (!message.isBackupMessage(&m, cipherKeys))
|
||||
if (!message.isCandidateBackupMessage(&m, cipherKeys))
|
||||
continue;
|
||||
|
||||
DEBUG_MUTATION("addMutation", message.version.version, m)
|
||||
|
|
|
@ -1445,9 +1445,26 @@ ACTOR Future<Void> assignMutationsToStorageServers(CommitBatchContext* self) {
|
|||
UNREACHABLE();
|
||||
}
|
||||
|
||||
// Check on backing up key, if backup ranges are defined and a normal key
|
||||
if (!(pProxyCommitData->vecBackupKeys.size() > 1 &&
|
||||
backupMutationMask().rangeContaining(m.param1).value())) {
|
||||
if (pProxyCommitData->vecBackupKeys.size() <= 1) {
|
||||
continue;
|
||||
}
|
||||
|
||||
// Check whether the mutation intersects any legal backup ranges
|
||||
// If so, it will be clamped to the intersecting range(s) later
|
||||
bool hasCandidateBackupKeys = false;
|
||||
if (normalKeys.contains(m.param1) || m.param1 == metadataVersionKey) {
|
||||
hasCandidateBackupKeys = true;
|
||||
} else if (m.type != MutationRef::Type::ClearRange) {
|
||||
hasCandidateBackupKeys = systemBackupMutationMask().rangeContaining(m.param1).value();
|
||||
} else {
|
||||
for (auto& r : systemBackupMutationMask().intersectingRanges(KeyRangeRef(m.param1, m.param2))) {
|
||||
if (r->value()) {
|
||||
hasCandidateBackupKeys = true;
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
if (!hasCandidateBackupKeys) {
|
||||
continue;
|
||||
}
|
||||
|
||||
|
|
|
@ -26,11 +26,13 @@
|
|||
#include "fdbclient/TenantManagement.actor.h"
|
||||
#include "fdbserver/workloads/workloads.actor.h"
|
||||
#include "fdbserver/workloads/BulkSetup.actor.h"
|
||||
#include "flow/IRandom.h"
|
||||
#include "flow/actorcompiler.h" // This must be the last #include.
|
||||
|
||||
// A workload which tests the correctness of the backup and restore process
|
||||
struct BackupAndRestoreCorrectnessWorkload : TestWorkload {
|
||||
double backupAfter, restoreAfter, abortAndRestartAfter;
|
||||
double minBackupAfter;
|
||||
double backupStartAt, restoreStartAfterBackupFinished, stopDifferentialAfter;
|
||||
Key backupTag;
|
||||
int backupRangesCount, backupRangeLengthMax;
|
||||
|
@ -43,11 +45,16 @@ struct BackupAndRestoreCorrectnessWorkload : TestWorkload {
|
|||
bool allowPauses;
|
||||
bool shareLogRange;
|
||||
bool shouldSkipRestoreRanges;
|
||||
bool defaultBackup;
|
||||
Optional<std::string> encryptionKeyFileName;
|
||||
|
||||
BackupAndRestoreCorrectnessWorkload(WorkloadContext const& wcx) : TestWorkload(wcx) {
|
||||
locked.set(sharedRandomNumber % 2);
|
||||
backupAfter = getOption(options, "backupAfter"_sr, 10.0);
|
||||
double minBackupAfter = getOption(options, "minBackupAfter"_sr, backupAfter);
|
||||
if (backupAfter > minBackupAfter) {
|
||||
backupAfter = deterministicRandom()->random01() * (backupAfter - minBackupAfter) + minBackupAfter;
|
||||
}
|
||||
restoreAfter = getOption(options, "restoreAfter"_sr, 35.0);
|
||||
performRestore = getOption(options, "performRestore"_sr, true);
|
||||
backupTag = getOption(options, "backupTag"_sr, BackupAgentBase::getDefaultTag());
|
||||
|
@ -71,6 +78,7 @@ struct BackupAndRestoreCorrectnessWorkload : TestWorkload {
|
|||
agentRequest = getOption(options, "simBackupAgents"_sr, true);
|
||||
allowPauses = getOption(options, "allowPauses"_sr, true);
|
||||
shareLogRange = getOption(options, "shareLogRange"_sr, false);
|
||||
defaultBackup = getOption(options, "defaultBackup"_sr, false);
|
||||
|
||||
std::vector<std::string> restorePrefixesToInclude =
|
||||
getOption(options, "restorePrefixesToInclude"_sr, std::vector<std::string>());
|
||||
|
@ -83,7 +91,9 @@ struct BackupAndRestoreCorrectnessWorkload : TestWorkload {
|
|||
TraceEvent("BARW_ClientId").detail("Id", wcx.clientId);
|
||||
UID randomID = nondeterministicRandom()->randomUniqueID();
|
||||
TraceEvent("BARW_PerformRestore", randomID).detail("Value", performRestore);
|
||||
if (shareLogRange) {
|
||||
if (defaultBackup) {
|
||||
addDefaultBackupRanges(backupRanges);
|
||||
} else if (shareLogRange) {
|
||||
bool beforePrefix = sharedRandomNumber & 1;
|
||||
if (beforePrefix)
|
||||
backupRanges.push_back_deep(backupRanges.arena(), KeyRangeRef(normalKeys.begin, "\xfe\xff\xfe"_sr));
|
||||
|
@ -171,7 +181,7 @@ struct BackupAndRestoreCorrectnessWorkload : TestWorkload {
|
|||
state bool adjusted = false;
|
||||
state TenantMapEntry entry;
|
||||
|
||||
if (cx->defaultTenant.present() || BUGGIFY) {
|
||||
if (!self->defaultBackup && (cx->defaultTenant.present() || BUGGIFY)) {
|
||||
if (cx->defaultTenant.present()) {
|
||||
wait(store(entry, TenantAPI::getTenant(cx.getReference(), cx->defaultTenant.get())));
|
||||
|
||||
|
|
|
@ -35,7 +35,6 @@ struct BackupToDBCorrectnessWorkload : TestWorkload {
|
|||
double backupStartAt, restoreStartAfterBackupFinished, stopDifferentialAfter;
|
||||
Key backupTag, restoreTag;
|
||||
Key backupPrefix, extraPrefix;
|
||||
bool beforePrefix;
|
||||
int backupRangesCount, backupRangeLengthMax;
|
||||
bool differentialBackup, performRestore, agentRequest;
|
||||
Standalone<VectorRef<KeyRangeRef>> backupRanges;
|
||||
|
@ -43,11 +42,16 @@ struct BackupToDBCorrectnessWorkload : TestWorkload {
|
|||
Database extraDB;
|
||||
LockDB locked{ false };
|
||||
bool shareLogRange;
|
||||
bool defaultBackup;
|
||||
UID destUid;
|
||||
|
||||
BackupToDBCorrectnessWorkload(WorkloadContext const& wcx) : TestWorkload(wcx) {
|
||||
locked.set(sharedRandomNumber % 2);
|
||||
backupAfter = getOption(options, "backupAfter"_sr, 10.0);
|
||||
double minBackupAfter = getOption(options, "minBackupAfter"_sr, backupAfter);
|
||||
if (backupAfter > minBackupAfter) {
|
||||
backupAfter = deterministicRandom()->random01() * (backupAfter - minBackupAfter) + minBackupAfter;
|
||||
}
|
||||
restoreAfter = getOption(options, "restoreAfter"_sr, 35.0);
|
||||
performRestore = getOption(options, "performRestore"_sr, true);
|
||||
backupTag = getOption(options, "backupTag"_sr, BackupAgentBase::getDefaultTag());
|
||||
|
@ -74,26 +78,31 @@ struct BackupToDBCorrectnessWorkload : TestWorkload {
|
|||
: 0.0);
|
||||
agentRequest = getOption(options, "simDrAgents"_sr, true);
|
||||
shareLogRange = getOption(options, "shareLogRange"_sr, false);
|
||||
defaultBackup = getOption(options, "defaultBackup"_sr, false);
|
||||
|
||||
// Use sharedRandomNumber if shareLogRange is true so that we can ensure backup and DR both back up the same
|
||||
// range
|
||||
beforePrefix = shareLogRange ? (sharedRandomNumber & 1) : (deterministicRandom()->random01() < 0.5);
|
||||
bool beforePrefix = shareLogRange ? (sharedRandomNumber & 1) : (deterministicRandom()->random01() < 0.5);
|
||||
|
||||
if (beforePrefix) {
|
||||
extraPrefix = backupPrefix.withPrefix("\xfe\xff\xfe"_sr);
|
||||
backupPrefix = backupPrefix.withPrefix("\xfe\xff\xff"_sr);
|
||||
} else {
|
||||
extraPrefix = backupPrefix.withPrefix("\x00\x00\x01"_sr);
|
||||
backupPrefix = backupPrefix.withPrefix("\x00\x00\00"_sr);
|
||||
if (!defaultBackup) {
|
||||
if (beforePrefix) {
|
||||
extraPrefix = backupPrefix.withPrefix("\xfe\xff\xfe"_sr);
|
||||
backupPrefix = backupPrefix.withPrefix("\xfe\xff\xff"_sr);
|
||||
} else {
|
||||
extraPrefix = backupPrefix.withPrefix("\x00\x00\x01"_sr);
|
||||
backupPrefix = backupPrefix.withPrefix("\x00\x00\00"_sr);
|
||||
}
|
||||
|
||||
ASSERT(backupPrefix != StringRef());
|
||||
}
|
||||
|
||||
ASSERT(backupPrefix != StringRef());
|
||||
|
||||
KeyRef beginRange;
|
||||
KeyRef endRange;
|
||||
UID randomID = nondeterministicRandom()->randomUniqueID();
|
||||
|
||||
if (shareLogRange) {
|
||||
if (defaultBackup) {
|
||||
addDefaultBackupRanges(backupRanges);
|
||||
} else if (shareLogRange) {
|
||||
if (beforePrefix)
|
||||
backupRanges.push_back_deep(backupRanges.arena(), KeyRangeRef(normalKeys.begin, "\xfe\xff\xfe"_sr));
|
||||
else
|
||||
|
@ -145,7 +154,7 @@ struct BackupToDBCorrectnessWorkload : TestWorkload {
|
|||
}
|
||||
|
||||
ACTOR Future<Void> _setup(Database cx, BackupToDBCorrectnessWorkload* self) {
|
||||
if (cx->defaultTenant.present() || BUGGIFY) {
|
||||
if (!self->defaultBackup && (cx->defaultTenant.present() || BUGGIFY)) {
|
||||
if (cx->defaultTenant.present()) {
|
||||
TenantMapEntry entry = wait(TenantAPI::getTenant(cx.getReference(), cx->defaultTenant.get()));
|
||||
|
||||
|
@ -628,7 +637,7 @@ struct BackupToDBCorrectnessWorkload : TestWorkload {
|
|||
state UID logUid = wait(backupAgent.getLogUid(self->extraDB, self->backupTag));
|
||||
|
||||
// Occasionally start yet another backup that might still be running when we restore
|
||||
if (!self->locked && BUGGIFY) {
|
||||
if (!self->locked && self->extraPrefix != self->backupPrefix && BUGGIFY) {
|
||||
TraceEvent("BARW_SubmitBackup2", randomID).detail("Tag", printable(self->backupTag));
|
||||
try {
|
||||
extraBackup = backupAgent.submitBackup(self->extraDB,
|
||||
|
|
|
@ -335,6 +335,7 @@ if(WITH_PYTHON)
|
|||
add_fdb_test(TEST_FILES slow/Serializability.toml)
|
||||
add_fdb_test(TEST_FILES slow/SharedBackupCorrectness.toml)
|
||||
add_fdb_test(TEST_FILES slow/SharedBackupToDBCorrectness.toml)
|
||||
add_fdb_test(TEST_FILES slow/SharedDefaultBackupCorrectness.toml)
|
||||
add_fdb_test(TEST_FILES slow/StorefrontTest.toml)
|
||||
add_fdb_test(TEST_FILES slow/SwizzledApiCorrectness.toml)
|
||||
add_fdb_test(TEST_FILES slow/SwizzledCycleTest.toml)
|
||||
|
|
|
@ -0,0 +1,35 @@
|
|||
[configuration]
|
||||
extraDatabaseMode = 'Single'
|
||||
|
||||
[[test]]
|
||||
testTitle = 'SharedDefaultBackupToFileThenDB'
|
||||
clearAfterTest = false
|
||||
simBackupAgents = 'BackupToFileAndDB'
|
||||
|
||||
[[test.workload]]
|
||||
testName = 'Cycle'
|
||||
nodeCount = 3000
|
||||
transactionsPerSecond = 500.0
|
||||
testDuration = 30.0
|
||||
expectedRate = 0
|
||||
|
||||
[[test.workload]]
|
||||
testName = 'BackupAndRestoreCorrectness'
|
||||
backupTag = 'backup1'
|
||||
backupAfter = 20.0
|
||||
minBackupAfter = 10.0
|
||||
restoreAfter = 60.0
|
||||
shareLogRange = true
|
||||
performRestore = true
|
||||
allowPauses = false
|
||||
defaultBackup = true
|
||||
|
||||
[[test.workload]]
|
||||
testName = 'BackupToDBCorrectness'
|
||||
backupTag = 'backup2'
|
||||
backupAfter = 20.0
|
||||
minBackupAfter = 10.0
|
||||
restoreAfter = 60.0
|
||||
performRestore = false
|
||||
shareLogRange = true
|
||||
defaultBackup = true
|
Loading…
Reference in New Issue