diff --git a/fdbserver/BlobManager.actor.cpp b/fdbserver/BlobManager.actor.cpp
index 1c719fba3d..a6f9429be6 100644
--- a/fdbserver/BlobManager.actor.cpp
+++ b/fdbserver/BlobManager.actor.cpp
@@ -5328,7 +5328,7 @@ ACTOR Future<Void> backupManifest(Reference<BlobManagerData> bmData) {
 }
 
 // Simulation validation that multiple blob managers aren't started with the same epoch within same cluster
-static std::map<std::pair<std::string, int64_t>, UID> managerEpochsSeen;
+static std::map<std::pair<UID, int64_t>, UID> managerEpochsSeen;
 
 ACTOR Future<Void> checkBlobManagerEpoch(Reference<AsyncVar<ServerDBInfo> const> dbInfo, int64_t epoch, UID dbgid) {
 	loop {
@@ -5339,11 +5339,37 @@
 	}
 }
 
+ACTOR Future<UID> fetchClusterId(Database db) {
+	state Reference<ReadYourWritesTransaction> tr = makeReference<ReadYourWritesTransaction>(db);
+	loop {
+		try {
+			tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
+			tr->setOption(FDBTransactionOptions::LOCK_AWARE);
+			Optional<Value> clusterIdVal = wait(tr->get(clusterIdKey));
+			if (clusterIdVal.present()) {
+				UID clusterId = BinaryReader::fromStringRef<UID>(clusterIdVal.get(), IncludeVersion());
+				return clusterId;
+			}
+			wait(delay(FLOW_KNOBS->PREVENT_FAST_SPIN_DELAY, TaskPriority::BlobManager));
+			tr->reset();
+		} catch (Error& e) {
+			wait(tr->onError(e));
+		}
+	}
+}
+
 ACTOR Future<Void> blobManager(BlobManagerInterface bmInterf,
                                Reference<AsyncVar<ServerDBInfo> const> dbInfo,
                                int64_t epoch) {
+	state Reference<BlobManagerData> self =
+	    makeReference<BlobManagerData>(bmInterf.id(),
+	                                   dbInfo,
+	                                   openDBOnServer(dbInfo, TaskPriority::DefaultEndpoint, LockAware::True),
+	                                   bmInterf.locality.dcId(),
+	                                   epoch);
+
 	if (g_network->isSimulated()) {
-		std::string clusterId = dbInfo->get().clusterInterface.id().shortString();
+		UID clusterId = wait(fetchClusterId(self->db));
 		auto clusterEpoc = std::make_pair(clusterId, epoch);
 		bool managerEpochAlreadySeen = managerEpochsSeen.count(clusterEpoc);
 		if (managerEpochAlreadySeen) {
@@ -5356,13 +5382,6 @@ ACTOR Future<Void> blobManager(BlobManagerInterface bmInterf,
 		ASSERT(!managerEpochAlreadySeen);
 		managerEpochsSeen[clusterEpoc] = bmInterf.id();
 	}
-	state Reference<BlobManagerData> self =
-	    makeReference<BlobManagerData>(bmInterf.id(),
-	                                   dbInfo,
-	                                   openDBOnServer(dbInfo, TaskPriority::DefaultEndpoint, LockAware::True),
-	                                   bmInterf.locality.dcId(),
-	                                   epoch);
-
 	state Future<Void> collection = actorCollection(self->addActor.getFuture());
 
 	if (BM_DEBUG) {
diff --git a/fdbserver/BlobManifest.actor.cpp b/fdbserver/BlobManifest.actor.cpp
index a7cc15ef71..46f25e7966 100644
--- a/fdbserver/BlobManifest.actor.cpp
+++ b/fdbserver/BlobManifest.actor.cpp
@@ -128,7 +128,6 @@ private:
 			blobGranuleMappingKeys, // Map granule to workers. Track the active granules
 			blobGranuleFileKeys, // Map a granule version to granule files. Track files for a granule
 			blobGranuleHistoryKeys, // Map granule to its parents and parent bundaries. for time-travel read
-			blobGranuleSplitKeys, // Granule split state to recover from a splitting granule
 			blobRangeKeys // Key ranges managed by blob
 		};
 		for (auto range : ranges) {
diff --git a/fdbserver/BlobMigrator.actor.cpp b/fdbserver/BlobMigrator.actor.cpp
index ad59d075c0..2b441cc27e 100644
--- a/fdbserver/BlobMigrator.actor.cpp
+++ b/fdbserver/BlobMigrator.actor.cpp
@@ -370,7 +370,10 @@ private:
 			// no need to apply mutation logs if granule is already on that version
 			if (granule.version < maxLogVersion) {
 				ranges.push_back(ranges.arena(), granule.keyRange);
-				beginVersions.push_back(beginVersions.arena(), granule.version + 1);
+				// Blob granule ends at granule.version (inclusive), so we need to apply mutation logs
+				// after granule.version (exclusive).
+				beginVersions.push_back(beginVersions.arena(), granule.version);
+				TraceEvent("ApplyMutationLogVersion").detail("GID", granule.granuleID).detail("Ver", granule.version);
 			}
 		}
 		Optional<RestorableFileSet> restoreSet =
@@ -503,7 +506,7 @@ private:
 			choose {
 				when(SplitRangeRequest req = waitNext(ssi.getRangeSplitPoints.getFuture())) {
 					dprint("Unsupported SplitRangeRequest\n");
-					req.reply.sendError(unsupported_operation());
+					req.reply.sendError(broken_promise());
 				}
 				when(StorageQueuingMetricsRequest req = waitNext(ssi.getQueuingMetrics.getFuture())) {
 					self->actors_.add(processStorageQueuingMetricsRequest(req));
diff --git a/fdbserver/workloads/BlobRestoreWorkload.actor.cpp b/fdbserver/workloads/BlobRestoreWorkload.actor.cpp
index f11f9a07df..8bee93cd9c 100644
--- a/fdbserver/workloads/BlobRestoreWorkload.actor.cpp
+++ b/fdbserver/workloads/BlobRestoreWorkload.actor.cpp
@@ -110,7 +110,6 @@ struct BlobRestoreWorkload : TestWorkload {
 		}
 	}
 
-	//
 	ACTOR static Future<Void> copyToOriginalDb(Database cx, BlobRestoreWorkload* self) {
 		state RangeResult data;
diff --git a/fdbserver/workloads/IncrementalBackup.actor.cpp b/fdbserver/workloads/IncrementalBackup.actor.cpp
index 64992c3ca4..fcf95d7f48 100644
--- a/fdbserver/workloads/IncrementalBackup.actor.cpp
+++ b/fdbserver/workloads/IncrementalBackup.actor.cpp
@@ -116,18 +116,21 @@ struct IncrementalBackupWorkload : TestWorkload {
 		state int tries = 0;
 		loop {
 			tries++;
-			BackupDescription desc = wait(backupContainer->describeBackup(true));
+			state BackupDescription desc = wait(backupContainer->describeBackup(true));
 			TraceEvent("IBackupVersionGate")
 			    .detail("MaxLogEndVersion", desc.maxLogEnd.present() ? desc.maxLogEnd.get() : invalidVersion)
 			    .detail("ContiguousLogEndVersion",
 			            desc.contiguousLogEnd.present() ? desc.contiguousLogEnd.get() : invalidVersion)
 			    .detail("TargetVersion", v);
-			if (self->waitRetries != -1 && tries > self->waitRetries)
-				break;
-			if (!desc.contiguousLogEnd.present())
+
+			if (!desc.contiguousLogEnd.present()) {
+				wait(delay(5.0));
 				continue;
+			}
 			if (desc.contiguousLogEnd.get() >= v)
 				break;
+			if (self->waitRetries != -1 && tries > self->waitRetries)
+				break;
 			// Avoid spamming requests with a delay
 			wait(delay(5.0));
 		}
diff --git a/tests/fast/BlobRestoreBasic.toml b/tests/fast/BlobRestoreBasic.toml
index 785f51cb47..2b9b57e19c 100644
--- a/tests/fast/BlobRestoreBasic.toml
+++ b/tests/fast/BlobRestoreBasic.toml
@@ -34,7 +34,6 @@ waitForQuiescence = false
     tag = 'default'
     submitOnly = true
     waitForBackup = true
-    waitRetries = 100
 
 [[test]]
 testTitle = 'CycleTest'
@@ -55,7 +54,6 @@ clearAfterTest = false
     tag = 'default'
     waitForBackup = true
    stopBackup = true
-    waitRetries = 500
 
 [[test]]
 testTitle = 'BlobRestore'
@@ -67,3 +65,15 @@ waitForQuiescence = false
     [[test.workload]]
     testName = 'BlobRestoreWorkload'
     performRestore = true
+
+
+[[test]]
+testTitle = 'VerifyCycle'
+checkOnly = true
+
+    [[test.workload]]
+    testName = 'Cycle'
+    nodeCount = 3000
+    transactionsPerSecond = 3000.0
+    testDuration = 10.0
+    expectedRate = 0