Add data verification at the end of BlobRestoreBasic.toml

This commit is contained in:
Hui Liu 2023-01-04 19:02:17 -08:00
parent 0849f60ef1
commit 4af55a274d
6 changed files with 52 additions and 19 deletions

View File

@ -5328,7 +5328,7 @@ ACTOR Future<Void> backupManifest(Reference<BlobManagerData> bmData) {
} }
// Simulation validation that multiple blob managers aren't started with the same epoch within same cluster // Simulation validation that multiple blob managers aren't started with the same epoch within same cluster
static std::map<std::pair<std::string, int64_t>, UID> managerEpochsSeen; static std::map<std::pair<UID, int64_t>, UID> managerEpochsSeen;
ACTOR Future<Void> checkBlobManagerEpoch(Reference<AsyncVar<ServerDBInfo> const> dbInfo, int64_t epoch, UID dbgid) { ACTOR Future<Void> checkBlobManagerEpoch(Reference<AsyncVar<ServerDBInfo> const> dbInfo, int64_t epoch, UID dbgid) {
loop { loop {
@ -5339,11 +5339,37 @@ ACTOR Future<Void> checkBlobManagerEpoch(Reference<AsyncVar<ServerDBInfo> const>
} }
} }
// Reads this cluster's unique identifier from the system keyspace (clusterIdKey),
// retrying until the key is present, and returns the decoded UID.
// NOTE(review): Flow ACTOR function — processed by the actor compiler; `state`
// variables persist across wait() suspension points.
ACTOR Future<UID> fetchClusterId(Database db) {
state Reference<ReadYourWritesTransaction> tr = makeReference<ReadYourWritesTransaction>(db);
loop {
try {
// System keys need explicit opt-in; LOCK_AWARE lets the read proceed even
// while the database is locked (presumably relevant during restore — confirm).
tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
tr->setOption(FDBTransactionOptions::LOCK_AWARE);
Optional<Value> clusterIdVal = wait(tr->get(clusterIdKey));
if (clusterIdVal.present()) {
UID clusterId = BinaryReader::fromStringRef<UID>(clusterIdVal.get(), IncludeVersion());
return clusterId;
}
// Cluster ID not written yet — back off briefly, then retry with a fresh
// transaction so we read at a newer version.
wait(delay(FLOW_KNOBS->PREVENT_FAST_SPIN_DELAY, TaskPriority::BlobManager));
tr->reset();
} catch (Error& e) {
// Standard FDB retry idiom: onError() delays/resets for retryable errors
// and rethrows fatal ones.
wait(tr->onError(e));
}
}
}
ACTOR Future<Void> blobManager(BlobManagerInterface bmInterf, ACTOR Future<Void> blobManager(BlobManagerInterface bmInterf,
Reference<AsyncVar<ServerDBInfo> const> dbInfo, Reference<AsyncVar<ServerDBInfo> const> dbInfo,
int64_t epoch) { int64_t epoch) {
state Reference<BlobManagerData> self =
makeReference<BlobManagerData>(bmInterf.id(),
dbInfo,
openDBOnServer(dbInfo, TaskPriority::DefaultEndpoint, LockAware::True),
bmInterf.locality.dcId(),
epoch);
if (g_network->isSimulated()) { if (g_network->isSimulated()) {
std::string clusterId = dbInfo->get().clusterInterface.id().shortString(); UID clusterId = wait(fetchClusterId(self->db));
auto clusterEpoc = std::make_pair(clusterId, epoch); auto clusterEpoc = std::make_pair(clusterId, epoch);
bool managerEpochAlreadySeen = managerEpochsSeen.count(clusterEpoc); bool managerEpochAlreadySeen = managerEpochsSeen.count(clusterEpoc);
if (managerEpochAlreadySeen) { if (managerEpochAlreadySeen) {
@ -5356,13 +5382,6 @@ ACTOR Future<Void> blobManager(BlobManagerInterface bmInterf,
ASSERT(!managerEpochAlreadySeen); ASSERT(!managerEpochAlreadySeen);
managerEpochsSeen[clusterEpoc] = bmInterf.id(); managerEpochsSeen[clusterEpoc] = bmInterf.id();
} }
state Reference<BlobManagerData> self =
makeReference<BlobManagerData>(bmInterf.id(),
dbInfo,
openDBOnServer(dbInfo, TaskPriority::DefaultEndpoint, LockAware::True),
bmInterf.locality.dcId(),
epoch);
state Future<Void> collection = actorCollection(self->addActor.getFuture()); state Future<Void> collection = actorCollection(self->addActor.getFuture());
if (BM_DEBUG) { if (BM_DEBUG) {

View File

@ -128,7 +128,6 @@ private:
blobGranuleMappingKeys, // Map granule to workers. Track the active granules blobGranuleMappingKeys, // Map granule to workers. Track the active granules
blobGranuleFileKeys, // Map a granule version to granule files. Track files for a granule blobGranuleFileKeys, // Map a granule version to granule files. Track files for a granule
blobGranuleHistoryKeys, // Map granule to its parents and parent bundaries. for time-travel read blobGranuleHistoryKeys, // Map granule to its parents and parent bundaries. for time-travel read
blobGranuleSplitKeys, // Granule split state to recover from a splitting granule
blobRangeKeys // Key ranges managed by blob blobRangeKeys // Key ranges managed by blob
}; };
for (auto range : ranges) { for (auto range : ranges) {

View File

@ -370,7 +370,10 @@ private:
// no need to apply mutation logs if granule is already on that version // no need to apply mutation logs if granule is already on that version
if (granule.version < maxLogVersion) { if (granule.version < maxLogVersion) {
ranges.push_back(ranges.arena(), granule.keyRange); ranges.push_back(ranges.arena(), granule.keyRange);
beginVersions.push_back(beginVersions.arena(), granule.version + 1); // Blob granule ends at granule.version(inclusive), so we need to apply mutation logs
// after granule.version(exclusive).
beginVersions.push_back(beginVersions.arena(), granule.version);
TraceEvent("ApplyMutationLogVersion").detail("GID", granule.granuleID).detail("Ver", granule.version);
} }
} }
Optional<RestorableFileSet> restoreSet = Optional<RestorableFileSet> restoreSet =
@ -503,7 +506,7 @@ private:
choose { choose {
when(SplitRangeRequest req = waitNext(ssi.getRangeSplitPoints.getFuture())) { when(SplitRangeRequest req = waitNext(ssi.getRangeSplitPoints.getFuture())) {
dprint("Unsupported SplitRangeRequest\n"); dprint("Unsupported SplitRangeRequest\n");
req.reply.sendError(unsupported_operation()); req.reply.sendError(broken_promise());
} }
when(StorageQueuingMetricsRequest req = waitNext(ssi.getQueuingMetrics.getFuture())) { when(StorageQueuingMetricsRequest req = waitNext(ssi.getQueuingMetrics.getFuture())) {
self->actors_.add(processStorageQueuingMetricsRequest(req)); self->actors_.add(processStorageQueuingMetricsRequest(req));

View File

@ -110,7 +110,6 @@ struct BlobRestoreWorkload : TestWorkload {
} }
} }
//
ACTOR static Future<Void> copyToOriginalDb(Database cx, BlobRestoreWorkload* self) { ACTOR static Future<Void> copyToOriginalDb(Database cx, BlobRestoreWorkload* self) {
state RangeResult data; state RangeResult data;

View File

@ -116,18 +116,21 @@ struct IncrementalBackupWorkload : TestWorkload {
state int tries = 0; state int tries = 0;
loop { loop {
tries++; tries++;
BackupDescription desc = wait(backupContainer->describeBackup(true)); state BackupDescription desc = wait(backupContainer->describeBackup(true));
TraceEvent("IBackupVersionGate") TraceEvent("IBackupVersionGate")
.detail("MaxLogEndVersion", desc.maxLogEnd.present() ? desc.maxLogEnd.get() : invalidVersion) .detail("MaxLogEndVersion", desc.maxLogEnd.present() ? desc.maxLogEnd.get() : invalidVersion)
.detail("ContiguousLogEndVersion", .detail("ContiguousLogEndVersion",
desc.contiguousLogEnd.present() ? desc.contiguousLogEnd.get() : invalidVersion) desc.contiguousLogEnd.present() ? desc.contiguousLogEnd.get() : invalidVersion)
.detail("TargetVersion", v); .detail("TargetVersion", v);
if (self->waitRetries != -1 && tries > self->waitRetries)
break; if (!desc.contiguousLogEnd.present()) {
if (!desc.contiguousLogEnd.present()) wait(delay(5.0));
continue; continue;
}
if (desc.contiguousLogEnd.get() >= v) if (desc.contiguousLogEnd.get() >= v)
break; break;
if (self->waitRetries != -1 && tries > self->waitRetries)
break;
// Avoid spamming requests with a delay // Avoid spamming requests with a delay
wait(delay(5.0)); wait(delay(5.0));
} }

View File

@ -34,7 +34,6 @@ waitForQuiescence = false
tag = 'default' tag = 'default'
submitOnly = true submitOnly = true
waitForBackup = true waitForBackup = true
waitRetries = 100
[[test]] [[test]]
testTitle = 'CycleTest' testTitle = 'CycleTest'
@ -55,7 +54,6 @@ clearAfterTest = false
tag = 'default' tag = 'default'
waitForBackup = true waitForBackup = true
stopBackup = true stopBackup = true
waitRetries = 500
[[test]] [[test]]
testTitle = 'BlobRestore' testTitle = 'BlobRestore'
@ -67,3 +65,15 @@ waitForQuiescence = false
[[test.workload]] [[test.workload]]
testName = 'BlobRestoreWorkload' testName = 'BlobRestoreWorkload'
performRestore = true performRestore = true
[[test]]
testTitle = 'VerifyCycle'
checkOnly = true
[[test.workload]]
testName = 'Cycle'
nodeCount = 3000
transactionsPerSecond = 3000.0
testDuration = 10.0
expectedRate = 0