diff --git a/fdbcli/BlobRestoreCommand.actor.cpp b/fdbcli/BlobRestoreCommand.actor.cpp
new file mode 100644
index 0000000000..fad60d7d74
--- /dev/null
+++ b/fdbcli/BlobRestoreCommand.actor.cpp
@@ -0,0 +1,47 @@
+/*
+ * BlobRestoreCommand.actor.cpp
+ *
+ * This source file is part of the FoundationDB open source project
+ *
+ * Copyright 2013-2022 Apple Inc. and the FoundationDB project authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "fdbcli/fdbcli.actor.h"
+#include "fdbclient/FDBOptions.h"
+#include "fdbclient/FDBTypes.h"
+#include "fdbclient/NativeAPI.actor.h"
+#include "fdbclient/SystemData.h"
+#include "flow/actorcompiler.h" // This must be the last #include.
+
+namespace fdb_cli {
+
+ACTOR Future<bool> blobRestoreCommandActor(Database localDb, std::vector<StringRef> tokens) {
+    if (tokens.size() != 1 && tokens.size() != 2) {
+        printUsage(tokens[0]);
+        return false;
+    }
+
+    state bool success = false;
+    wait(store(success, localDb->blobRestore(normalKeys)));
+    if (success) {
+        fmt::print("Started blob restore for the full cluster. Please use the 'status' command to check progress.\n");
+    } else {
+        fmt::print("Failed to start a new blob restore while there is a pending one.\n");
+    }
+    return success;
+}
+
+CommandFactory blobRestoreFactory("blobrestore", CommandHelp("blobrestore", "", ""));
+} // namespace fdb_cli
diff --git a/fdbcli/fdbcli.actor.cpp b/fdbcli/fdbcli.actor.cpp
index c2473e1f59..a82c86436e 100644
--- a/fdbcli/fdbcli.actor.cpp
+++ b/fdbcli/fdbcli.actor.cpp
@@ -1416,6 +1416,13 @@ ACTOR Future<int> cli(CLIOptions opt, LineNoise* plinenoise, Reference
diff --git a/fdbcli/fdbcli.actor.h b/fdbcli/fdbcli.actor.h
--- a/fdbcli/fdbcli.actor.h
+++ b/fdbcli/fdbcli.actor.h
 ACTOR Future<bool> blobRangeCommandActor(Database localDb,
                                          Optional<TenantMapEntry> tenantEntry,
                                          std::vector<StringRef> tokens);
 ACTOR Future<bool> blobKeyCommandActor(Database localDb,
                                        Optional<TenantMapEntry> tenantEntry,
                                        std::vector<StringRef> tokens);
+// blobrestore command
+ACTOR Future<bool> blobRestoreCommandActor(Database localDb, std::vector<StringRef> tokens);
+
 // maintenance command
 ACTOR Future<bool> setHealthyZone(Reference<IDatabase> db, StringRef zoneId, double seconds, bool printWarning = false);
 ACTOR Future<bool> clearHealthyZone(Reference<IDatabase> db,
diff --git a/fdbclient/NativeAPI.actor.cpp b/fdbclient/NativeAPI.actor.cpp
index 0345bfee39..93b6716afb 100644
--- a/fdbclient/NativeAPI.actor.cpp
+++ b/fdbclient/NativeAPI.actor.cpp
@@ -10915,6 +10915,37 @@ Future<Standalone<VectorRef<KeyRangeRef>>> DatabaseContext::listBlobbifiedRanges(
     return listBlobbifiedRangesActor(Reference<DatabaseContext>::addRef(this), range, rangeLimit, tenantName);
 }
 
+ACTOR Future<bool> blobRestoreActor(Reference<DatabaseContext> cx, KeyRange range) {
+    state Database db(cx);
+    state Reference<ReadYourWritesTransaction> tr = makeReference<ReadYourWritesTransaction>(db);
+    loop {
+        try {
+            tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
+            tr->setOption(FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE);
+            state Key key = blobRestoreCommandKeyFor(range);
+            Optional<Value> value = wait(tr->get(key));
+            if (value.present()) {
+                Standalone<BlobRestoreStatus> status = decodeBlobRestoreStatus(value.get());
+                if (status.progress < 100) {
+                    return false; // stop if there is an in-progress restore
+                }
+            }
+            Standalone<BlobRestoreStatus> status;
+            status.progress = 0;
+            Value newValue = blobRestoreCommandValueFor(status);
+            tr->set(key, newValue);
+            wait(tr->commit());
+            return true;
+        } catch (Error& e) {
+            wait(tr->onError(e));
+        }
+    }
+}
+
+Future<bool> DatabaseContext::blobRestore(KeyRange range) {
+    return blobRestoreActor(Reference<DatabaseContext>::addRef(this), range);
+}
+
 int64_t getMaxKeySize(KeyRef const& key) {
     return getMaxWriteKeySize(key, true);
 }
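For orientation (not part of the patch): the new entry point is idempotent at the command-key level, so a second request is rejected while one is still pending. A minimal caller sketch in the same style, assuming a connected Database handle named db:

    // Sketch: kicking off a full-cluster blob restore from client code.
    ACTOR Future<Void> startClusterRestore(Database db) {
        bool started = wait(db->blobRestore(normalKeys));
        if (!started) {
            // blobRestoreActor found an existing record with progress < 100.
            fmt::print("Blob restore already pending; not starting another.\n");
        }
        return Void();
    }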
diff --git a/fdbclient/SystemData.cpp b/fdbclient/SystemData.cpp
index 39aca67c2e..4ebc2a0e80 100644
--- a/fdbclient/SystemData.cpp
+++ b/fdbclient/SystemData.cpp
@@ -1660,6 +1660,36 @@ BlobWorkerInterface decodeBlobWorkerListValue(ValueRef const& value) {
     return interf;
 }
 
+const KeyRangeRef blobRestoreCommandKeys("\xff\x02/blobRestoreCommand/"_sr, "\xff\x02/blobRestoreCommand0"_sr);
+
+const Value blobRestoreCommandKeyFor(const KeyRangeRef range) {
+    BinaryWriter wr(AssumeVersion(ProtocolVersion::withBlobGranule()));
+    wr.serializeBytes(blobRestoreCommandKeys.begin);
+    wr << range;
+    return wr.toValue();
+}
+
+const KeyRange decodeBlobRestoreCommandKeyFor(const KeyRef key) {
+    KeyRange range;
+    BinaryReader reader(key.removePrefix(blobRestoreCommandKeys.begin),
+                        AssumeVersion(ProtocolVersion::withBlobGranule()));
+    reader >> range;
+    return range;
+}
+
+const Value blobRestoreCommandValueFor(BlobRestoreStatus status) {
+    BinaryWriter wr(IncludeVersion(ProtocolVersion::withBlobGranule()));
+    wr << status;
+    return wr.toValue();
+}
+
+Standalone<BlobRestoreStatus> decodeBlobRestoreStatus(ValueRef const& value) {
+    Standalone<BlobRestoreStatus> status;
+    BinaryReader reader(value, IncludeVersion());
+    reader >> status;
+    return status;
+}
+
 const KeyRangeRef storageQuotaKeys("\xff/storageQuota/"_sr, "\xff/storageQuota0"_sr);
 const KeyRef storageQuotaPrefix = storageQuotaKeys.begin;
diff --git a/fdbclient/include/fdbclient/BlobGranuleCommon.h b/fdbclient/include/fdbclient/BlobGranuleCommon.h
index 7cdc72fb71..23abc7d974 100644
--- a/fdbclient/include/fdbclient/BlobGranuleCommon.h
+++ b/fdbclient/include/fdbclient/BlobGranuleCommon.h
@@ -313,4 +313,15 @@ struct BlobManifest {
     }
 };
 
+// Defines blob restore status
+struct BlobRestoreStatus {
+    constexpr static FileIdentifier file_identifier = 378657;
+    int progress;
+
+    template <class Ar>
+    void serialize(Ar& ar) {
+        serializer(ar, progress);
+    }
+};
+
 #endif
diff --git a/fdbclient/include/fdbclient/DatabaseContext.h b/fdbclient/include/fdbclient/DatabaseContext.h
index f28da0399a..a16baaffe2 100644
--- a/fdbclient/include/fdbclient/DatabaseContext.h
+++ b/fdbclient/include/fdbclient/DatabaseContext.h
@@ -403,6 +403,7 @@ public:
     Future<bool> verifyBlobRange(const KeyRange& range, Optional<Version> version, Optional<TenantName> tenantName = {});
+    Future<bool> blobRestore(const KeyRange range);
 
     // private:
     explicit DatabaseContext(Reference<AsyncVar<Reference<IClusterConnectionRecord>>> connectionRecord,
diff --git a/fdbclient/include/fdbclient/SystemData.h b/fdbclient/include/fdbclient/SystemData.h
index 1393dc63b1..de30ddede5 100644
--- a/fdbclient/include/fdbclient/SystemData.h
+++ b/fdbclient/include/fdbclient/SystemData.h
@@ -710,6 +710,13 @@ UID decodeBlobWorkerListKey(KeyRef const& key);
 const Value blobWorkerListValue(BlobWorkerInterface const& interface);
 BlobWorkerInterface decodeBlobWorkerListValue(ValueRef const& value);
 
+// Blob restore command
+extern const KeyRangeRef blobRestoreCommandKeys;
+const Value blobRestoreCommandKeyFor(const KeyRangeRef range);
+const KeyRange decodeBlobRestoreCommandKeyFor(const KeyRef key);
+const Value blobRestoreCommandValueFor(BlobRestoreStatus status);
+Standalone<BlobRestoreStatus> decodeBlobRestoreStatus(ValueRef const& value);
+
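The four codecs above pair up: blobRestoreCommandKeyFor with decodeBlobRestoreCommandKeyFor for the key, and blobRestoreCommandValueFor with decodeBlobRestoreStatus for the value. A small round-trip sketch, assuming only the declarations from SystemData.h:

    // Sketch: round-tripping a blob restore command record through the codecs.
    void blobRestoreCodecRoundTrip() {
        // The key encodes the target range under \xff\x02/blobRestoreCommand/.
        Key key = blobRestoreCommandKeyFor(normalKeys);
        KeyRange range = decodeBlobRestoreCommandKeyFor(key);
        ASSERT(range == normalKeys);

        // The value encodes the progress percentage, versioned via IncludeVersion.
        Standalone<BlobRestoreStatus> status;
        status.progress = 42;
        Value value = blobRestoreCommandValueFor(status);
        ASSERT(decodeBlobRestoreStatus(value).progress == 42);
    }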
 // Storage quota per tenant
 // "\xff/storageQuota/[[tenantName]]" := "[[quota]]"
 extern const KeyRangeRef storageQuotaKeys;
diff --git a/fdbserver/BlobManager.actor.cpp b/fdbserver/BlobManager.actor.cpp
index 3d4af419ea..b9cdebc507 100644
--- a/fdbserver/BlobManager.actor.cpp
+++ b/fdbserver/BlobManager.actor.cpp
@@ -388,6 +388,8 @@ struct BlobManagerData : NonCopyable, ReferenceCounted<BlobManagerData> {
     Promise<Void> iAmReplaced;
 
+    bool isFullRestoreMode = false;
+
     BlobManagerData(UID id,
                     Reference<AsyncVar<ServerDBInfo> const> dbInfo,
                     Database db,
@@ -3537,7 +3539,10 @@ ACTOR Future<Void> recoverBlobManager(Reference<BlobManagerData> bmData) {
     bmData->startRecruiting.trigger();
 
     bmData->initBStore();
-    if (isFullRestoreMode()) {
+
+    bool isFullRestore = wait(isFullRestoreMode(bmData->db, normalKeys));
+    bmData->isFullRestoreMode = isFullRestore;
+    if (bmData->isFullRestoreMode) {
         wait(loadManifest(bmData->db, bmData->bstore));
 
         int64_t epoc = wait(lastBlobEpoc(bmData->db, bmData->bstore));
@@ -5297,11 +5302,8 @@ ACTOR Future<Void> backupManifest(Reference<BlobManagerData> bmData) {
     bmData->initBStore();
     loop {
-        bool pendingSplit = wait(hasPendingSplit(bmData));
-        if (!pendingSplit) {
-            wait(dumpManifest(bmData->db, bmData->bstore, bmData->epoch, bmData->manifestDumperSeqNo));
-            bmData->manifestDumperSeqNo++;
-        }
+        wait(dumpManifest(bmData->db, bmData->bstore, bmData->epoch, bmData->manifestDumperSeqNo));
+        bmData->manifestDumperSeqNo++;
         wait(delay(SERVER_KNOBS->BLOB_MANIFEST_BACKUP_INTERVAL));
     }
 }
@@ -5370,7 +5372,7 @@ ACTOR Future<Void> blobManager(BlobManagerInterface bmInterf,
     if (SERVER_KNOBS->BG_ENABLE_MERGING) {
         self->addActor.send(granuleMergeChecker(self));
     }
-    if (SERVER_KNOBS->BLOB_MANIFEST_BACKUP && !isFullRestoreMode()) {
+    if (SERVER_KNOBS->BLOB_MANIFEST_BACKUP && !self->isFullRestoreMode) {
         self->addActor.send(backupManifest(self));
     }
diff --git a/fdbserver/BlobManifest.actor.cpp b/fdbserver/BlobManifest.actor.cpp
index fd896a9e38..230ef0cc3c 100644
--- a/fdbserver/BlobManifest.actor.cpp
+++ b/fdbserver/BlobManifest.actor.cpp
@@ -60,7 +60,7 @@ struct BlobManifestFile {
     int64_t seqNo{ 0 };
 
     BlobManifestFile(const std::string& path) {
-        if (sscanf(path.c_str(), MANIFEST_FOLDER "/manifest.%" SCNd64 ".%" SCNd64, &epoch, &seqNo) == 2) {
+        if (sscanf(path.c_str(), MANIFEST_FOLDER "/" MANIFEST_FOLDER ".%" SCNd64 ".%" SCNd64, &epoch, &seqNo) == 2) {
             fileName = path;
         }
     }
@@ -76,7 +76,7 @@ struct BlobManifestFile {
         BlobManifestFile file(path);
         return file.epoch > 0 && file.seqNo > 0;
     };
-    BackupContainerFileSystem::FilesAndSizesT filesAndSizes = wait(reader->listFiles(MANIFEST_FOLDER, filter));
+    BackupContainerFileSystem::FilesAndSizesT filesAndSizes = wait(reader->listFiles(MANIFEST_FOLDER "/", filter));
 
     std::vector<BlobManifestFile> result;
     for (auto& f : filesAndSizes) {
@@ -107,6 +107,9 @@ public:
         try {
             state Standalone<BlobManifest> manifest;
             Standalone<VectorRef<KeyValueRef>> rows = wait(getSystemKeys(self));
+            if (rows.size() == 0) {
+                return Void();
+            }
             manifest.rows = rows;
             Value data = encode(manifest);
             wait(writeToFile(self, data));
@@ -153,7 +156,8 @@ private:
         state std::string fullPath;
 
         std::tie(writer, fullPath) = self->blobConn_->createForWrite(MANIFEST_FOLDER);
-        state std::string fileName = format(MANIFEST_FOLDER "/manifest.%lld.%lld", self->epoch_, self->seqNo_);
+        state std::string fileName =
+            format(MANIFEST_FOLDER "/" MANIFEST_FOLDER ".%lld.%lld", self->epoch_, self->seqNo_);
         state Reference<IBackupFile> file = wait(writer->writeFile(fileName));
         wait(file->append(data.begin(), data.size()));
         wait(file->finish());
@@ -453,3 +457,26 @@ ACTOR Future<int64_t> lastBlobEpoc(Database db, Reference<BlobConnectionProvider> blobConn) {
+
+ACTOR Future<bool> isFullRestoreMode(Database db, KeyRangeRef keys) {
+    state Transaction tr(db);
+    loop {
+        tr.setOption(FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE);
+        tr.setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
+        tr.setOption(FDBTransactionOptions::LOCK_AWARE);
+        try {
+            RangeResult ranges = wait(tr.getRange(blobRestoreCommandKeys, CLIENT_KNOBS->TOO_MANY));
+            for (auto& r : ranges) {
+                KeyRange keyRange = decodeBlobRestoreCommandKeyFor(r.key);
+                if (keyRange.contains(keys)) {
+                    Standalone<BlobRestoreStatus> status = decodeBlobRestoreStatus(r.value);
+                    return status.progress < 100; // in progress until it reaches 100
+                }
+            }
+            return false;
+        } catch (Error& e) {
+            wait(tr.onError(e));
+        }
+    }
+}
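Both the writer (writeToFile) and the parser (BlobManifestFile) now build the manifest file name from the same MANIFEST_FOLDER token, so the two format strings cannot drift apart again. A self-contained sketch of the naming round trip, with the macro value assumed to be "manifest" for illustration:

    // Sketch: manifest file naming and parsing kept in sync via one macro.
    #include <cinttypes>
    #include <cstdio>

    #define MANIFEST_FOLDER "manifest"

    int main() {
        int64_t epoch = 7, seqNo = 42;
        char name[256];
        // Writer side, mirroring format(MANIFEST_FOLDER "/" MANIFEST_FOLDER ".%lld.%lld", ...).
        snprintf(name, sizeof(name), MANIFEST_FOLDER "/" MANIFEST_FOLDER ".%" PRId64 ".%" PRId64, epoch, seqNo);
        // Reader side, mirroring BlobManifestFile: accept only names that parse back.
        int64_t e = 0, s = 0;
        bool valid = sscanf(name, MANIFEST_FOLDER "/" MANIFEST_FOLDER ".%" SCNd64 ".%" SCNd64, &e, &s) == 2;
        return (valid && e == epoch && s == seqNo) ? 0 : 1;
    }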
diff --git a/fdbserver/BlobMigrator.actor.cpp b/fdbserver/BlobMigrator.actor.cpp
index dd7d5f3ea3..0c23ed7904 100644
--- a/fdbserver/BlobMigrator.actor.cpp
+++ b/fdbserver/BlobMigrator.actor.cpp
@@ -21,6 +21,7 @@
 #include "flow/ActorCollection.h"
 #include "flow/FastRef.h"
 #include "flow/IRandom.h"
+#include "flow/Trace.h"
 #include "flow/flow.h"
 #include "fdbclient/StorageServerInterface.h"
 #include "fdbclient/BlobConnectionProvider.h"
@@ -63,14 +64,7 @@ public:
     // Start migration
     ACTOR static Future<Void> start(Reference<BlobMigrator> self) {
-        if (!isFullRestoreMode()) {
-            return Void();
-        }
-        wait(delay(10)); // TODO need to wait for a signal for readiness of blob manager
-
-        BlobGranuleRestoreVersionVector granules = wait(listBlobGranules(self->db_, self->blobConn_));
-        self->blobGranules_ = granules;
-
+        wait(checkIfReadyForMigration(self));
         wait(prepare(self, normalKeys));
         wait(advanceVersion(self));
         wait(serverLoop(self));
         return Void();
     }
 
 private:
+    // Check if blob manifest is loaded so that blob migration can start
+    ACTOR static Future<Void> checkIfReadyForMigration(Reference<BlobMigrator> self) {
+        loop {
+            bool isFullRestore = wait(isFullRestoreMode(self->db_, normalKeys));
+            if (isFullRestore) {
+                BlobGranuleRestoreVersionVector granules = wait(listBlobGranules(self->db_, self->blobConn_));
+                if (!granules.empty()) {
+                    self->blobGranules_ = granules;
+                    for (BlobGranuleRestoreVersion granule : granules) {
+                        TraceEvent("RestorableGranule")
+                            .detail("GranuleId", granule.granuleID.toString())
+                            .detail("KeyRange", granule.keyRange.toString())
+                            .detail("Version", granule.version)
+                            .detail("SizeInBytes", granule.sizeInBytes);
+                    }
+                    return Void();
+                }
+            }
+            wait(delay(SERVER_KNOBS->BLOB_MIGRATOR_CHECK_INTERVAL));
+        }
+    }
+
     // Prepare for data migration for given key range.
     ACTOR static Future<Void> prepare(Reference<BlobMigrator> self, KeyRangeRef keys) {
         // Register as a storage server, so that DataDistributor could start data movement after
@@ -136,8 +152,9 @@ private:
                 }
             }
             if (owning) {
-                dprint("Unassign {} from storage server {}\n", keys.toString(), id.toString());
                 wait(krmSetRange(&tr, serverKeysPrefixFor(id), keys, serverKeysFalse));
+                dprint("Unassign {} from storage server {}\n", keys.toString(), id.toString());
+                TraceEvent("UnassignKeys").detail("Keys", keys.toString()).detail("From", id.toString());
             }
         }
         wait(tr.commit());
@@ -185,8 +202,10 @@ private:
                 // Calculated progress
                 int64_t total = sizeInBytes(self);
                 int progress = (total - incompleted) * 100 / total;
-                bool done = incompleted == 0;
-                dprint("Progress {} :{}%. done {}\n", serverID.toString(), progress, done);
+                state bool done = incompleted == 0;
+                dprint("Migration progress: {}%. done {}\n", progress, done);
+                TraceEvent("BlobMigratorProgress").detail("Progress", progress).detail("Done", done);
+                wait(updateProgress(self, normalKeys, progress));
                 return done;
             } catch (Error& e) {
                 wait(tr.onError(e));
@@ -194,6 +213,32 @@ private:
             }
         }
     }
 
+    // Update restore progress
+    ACTOR static Future<Void> updateProgress(Reference<BlobMigrator> self, KeyRangeRef range, int progress) {
+        state Transaction tr(self->db_);
+        loop {
+            try {
+                tr.setOption(FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE);
+                tr.setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
+                tr.setOption(FDBTransactionOptions::LOCK_AWARE);
+                state Key key = blobRestoreCommandKeyFor(range);
+                Optional<Value> value = wait(tr.get(key));
+                if (value.present()) {
+                    Standalone<BlobRestoreStatus> status = decodeBlobRestoreStatus(value.get());
+                    if (progress > status.progress) {
+                        status.progress = progress;
+                        Value updatedValue = blobRestoreCommandValueFor(status);
+                        tr.set(key, updatedValue);
+                        wait(tr.commit());
+                    }
+                }
+                return Void();
+            } catch (Error& e) {
+                wait(tr.onError(e));
+            }
+        }
+    }
+
     // Advance version, so that future commits will have a larger version than the restored data
     ACTOR static Future<Void> advanceVersion(Reference<BlobMigrator> self) {
         state Transaction tr(self->db_);
@@ -207,6 +252,7 @@ private:
             if (currentVersion <= expectedVersion) {
                 tr.set(minRequiredCommitVersionKey, BinaryWriter::toValue(expectedVersion + 1, Unversioned()));
                 dprint("Advance version from {} to {}\n", currentVersion, expectedVersion);
+                TraceEvent("AdvanceVersion").detail("Current", currentVersion).detail("New", expectedVersion);
                 wait(tr.commit());
             }
             return Void();
@@ -218,7 +264,7 @@ private:
     // Main server loop
     ACTOR static Future<Void> serverLoop(Reference<BlobMigrator> self) {
-        self->actors_.add(waitFailureServer(self->interf_.ssi.waitFailure.getFuture()));
+        self->actors_.add(waitFailureServer(self->interf_.waitFailure.getFuture()));
         self->actors_.add(logProgress(self));
         self->actors_.add(handleRequest(self));
         self->actors_.add(handleUnsupportedRequest(self));
@@ -226,6 +272,7 @@ private:
         loop {
             try {
                 choose {
                     when(HaltBlobMigratorRequest req = waitNext(self->interf_.haltBlobMigrator.getFuture())) {
+                        dprint("Stopping blob migrator {}\n", self->interf_.id().toString());
                         req.reply.send(Void());
                         TraceEvent("BlobMigratorHalted", self->interf_.id()).detail("ReqID", req.requesterID);
                         break;
@@ -237,6 +284,8 @@ private:
                 throw;
             }
         }
+        self->actors_.clear(true);
+        dprint("Stopped blob migrator {}\n", self->interf_.id().toString());
         return Void();
     }
@@ -267,7 +316,7 @@ private:
                     req.reply.send(rep);
                 }
                 when(GetStorageMetricsRequest req = waitNext(ssi.getStorageMetrics.getFuture())) {
-                    fmt::print("Handle GetStorageMetrics\n");
+                    // fmt::print("Handle GetStorageMetrics\n");
                     StorageMetrics metrics;
                     metrics.bytes = sizeInBytes(self);
                     GetStorageMetricsReply resp;
@@ -331,7 +380,7 @@ private:
                     req.reply.sendError(unsupported_operation());
                 }
                 when(UpdateCommitCostRequest req = waitNext(ssi.updateCommitCostRequest.getFuture())) {
-                    dprint("Unsupported UpdateCommitCostRequest\n");
+                    // dprint("Unsupported UpdateCommitCostRequest\n");
                     req.reply.sendError(unsupported_operation());
                 }
                 when(FetchCheckpointKeyValuesRequest req = waitNext(ssi.fetchCheckpointKeyValues.getFuture())) {
@@ -358,9 +407,9 @@ private:
     ACTOR static Future<Void> processStorageQueuingMetricsRequest(StorageQueuingMetricsRequest req) {
-        dprint("Unsupported StorageQueuingMetricsRequest\n");
-        // FIXME get rid of this delay. it's a temp solution to avoid starvaion scheduling of DD
-        // processes
+        // dprint("Unsupported StorageQueuingMetricsRequest\n");
+        // FIXME get rid of this delay. it's a temp solution to avoid starvation scheduling of DD
+        // processes
         wait(delay(1));
         req.reply.sendError(unsupported_operation());
         return Void();
     }
@@ -398,7 +447,8 @@
 // Main entry point
 ACTOR Future<Void> blobMigrator(BlobMigratorInterface interf, Reference<AsyncVar<ServerDBInfo> const> dbInfo) {
-    fmt::print("Start blob migrator {} \n", interf.id().toString());
+    TraceEvent("StartBlobMigrator").detail("Interface", interf.id().toString());
+    dprint("Starting blob migrator {}\n", interf.id().toString());
     try {
         Reference<BlobMigrator> self = makeReference<BlobMigrator>(dbInfo, interf);
         wait(BlobMigrator::start(self));
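updateProgress only ever moves the recorded percentage forward, so the command key doubles as a progress indicator that any client can poll. A hedged sketch of such a reader (a hypothetical helper, not part of the patch):

    // Sketch: reading blob restore progress back from the command key.
    ACTOR Future<int> readRestoreProgress(Database db, KeyRange range) {
        state Transaction tr(db);
        loop {
            try {
                tr.setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
                tr.setOption(FDBTransactionOptions::LOCK_AWARE);
                Optional<Value> value = wait(tr.get(blobRestoreCommandKeyFor(range)));
                if (!value.present()) {
                    return -1; // no restore was ever requested for this range
                }
                // 0 means requested, 100 means done, anything between is in flight.
                Standalone<BlobRestoreStatus> status = decodeBlobRestoreStatus(value.get());
                return status.progress;
            } catch (Error& e) {
                wait(tr.onError(e));
            }
        }
    }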
diff --git a/fdbserver/BlobWorker.actor.cpp b/fdbserver/BlobWorker.actor.cpp
index ccd7284fe1..a9ab173095 100644
--- a/fdbserver/BlobWorker.actor.cpp
+++ b/fdbserver/BlobWorker.actor.cpp
@@ -292,6 +292,8 @@ struct BlobWorkerData : NonCopyable, ReferenceCounted<BlobWorkerData> {
     int64_t lastResidentMemory = 0;
     double lastResidentMemoryCheckTime = -100.0;
 
+    bool isFullRestoreMode = false;
+
     BlobWorkerData(UID id, Reference<AsyncVar<ServerDBInfo> const> dbInfo, Database db)
       : id(id), db(db), tenantData(BGTenantMap(dbInfo)), dbInfo(dbInfo),
         initialSnapshotLock(new FlowLock(SERVER_KNOBS->BLOB_WORKER_INITIAL_SNAPSHOT_PARALLELISM)),
@@ -2146,7 +2148,7 @@ ACTOR Future<Void> blobGranuleUpdateFiles(Reference<BlobWorkerData> bwData,
     }
 
     // No need to start Change Feed in full restore mode
-    if (isFullRestoreMode())
+    if (bwData->isFullRestoreMode)
         return Void();
 
     checkMergeCandidate = granuleCheckMergeCandidate(bwData,
@@ -3588,7 +3590,7 @@ ACTOR Future<Void> doBlobGranuleFileRequest(Reference<BlobWorkerData> bwData, BlobGranuleFileRequest req) {
             state Reference<GranuleMetadata> metadata = m;
             // state Version granuleBeginVersion = req.beginVersion;
             // skip waiting for CF ready for recovery mode
-            if (!isFullRestoreMode()) {
+            if (!bwData->isFullRestoreMode) {
                 choose {
                     when(wait(metadata->readable.getFuture())) {}
                     when(wait(metadata->cancelled.getFuture())) { throw wrong_shard_server(); }
@@ -3646,7 +3648,7 @@
             // this is an active granule query
             loop {
                 // skip check since CF doesn't start for bare metal recovery mode
-                if (isFullRestoreMode()) {
+                if (bwData->isFullRestoreMode) {
                     break;
                 }
                 if (!metadata->activeCFData.get().isValid() || !metadata->cancelled.canBeSet()) {
@@ -3689,7 +3691,7 @@
             // if feed was popped by another worker and BW only got empty versions, it wouldn't itself see that it
             // got popped, but we can still reject the read. In theory this should never happen with other
             // protections, but it's a useful and inexpensive sanity check
-            if (!isFullRestoreMode()) {
+            if (!bwData->isFullRestoreMode) {
                 Version emptyVersion = metadata->activeCFData.get()->popVersion - 1;
                 if (req.readVersion > metadata->durableDeltaVersion.get() &&
                     emptyVersion > metadata->bufferedDeltaVersion) {
@@ -3995,6 +3997,9 @@ ACTOR Future<GranuleStartState> openGranule(Reference<BlobWorkerData> bwData, AssignBlobRangeRequest req) {
         throw granule_assignment_conflict();
     }
 
+    bool isFullRestore = wait(isFullRestoreMode(bwData->db, req.keyRange));
+    bwData->isFullRestoreMode = isFullRestore;
+
     Optional<Value> prevLockValue = wait(fLockValue);
     state bool hasPrevOwner = prevLockValue.present();
     state bool createChangeFeed = false;
@@ -4069,7 +4074,7 @@
     }
 
     // for recovery mode - don't create change feed, don't create snapshot
-    if (isFullRestoreMode()) {
+    if (bwData->isFullRestoreMode) {
         createChangeFeed = false;
         info.doSnapshot = false;
         GranuleFiles granuleFiles = wait(loadPreviousFiles(&tr, info.granuleID));
@@ -4091,7 +4096,7 @@
         }
     }
 
-    if (createChangeFeed && !isFullRestoreMode()) {
+    if (createChangeFeed && !bwData->isFullRestoreMode) {
         // create new change feed for new version of granule
         wait(updateChangeFeed(
             &tr, granuleIDToCFKey(info.granuleID), ChangeFeedStatus::CHANGE_FEED_CREATE, req.keyRange));
@@ -4103,7 +4108,8 @@
     // If anything in previousGranules, need to do the handoff logic and set
     // ret.previousChangeFeedId, and the previous durable version will come from the previous
     // granules
-    if (info.history.present() && info.history.get().value.parentVersions.size() > 0 && !isFullRestoreMode()) {
+    if (info.history.present() && info.history.get().value.parentVersions.size() > 0 &&
+        !bwData->isFullRestoreMode) {
         CODE_PROBE(true, "Granule open found parent");
         if (info.history.get().value.parentVersions.size() == 1) { // split
             state KeyRangeRef parentRange(info.history.get().value.parentBoundaries[0],
diff --git a/fdbserver/ClusterController.actor.cpp b/fdbserver/ClusterController.actor.cpp
index a8600d5a18..cc23e68f8d 100644
--- a/fdbserver/ClusterController.actor.cpp
+++ b/fdbserver/ClusterController.actor.cpp
@@ -23,6 +23,7 @@
 #include
 #include
 #include
+#include
 #include "fdbclient/FDBTypes.h"
@@ -691,7 +692,7 @@ void checkBetterSingletons(ClusterControllerData* self) {
     WorkerDetails newMGWorker;
     if (self->db.blobGranulesEnabled.get()) {
         newBMWorker = findNewProcessForSingleton(self, ProcessClass::BlobManager, id_used);
-        if (isFullRestoreMode()) {
+        if (self->db.blobRestoreEnabled.get()) {
             newMGWorker = findNewProcessForSingleton(self, ProcessClass::BlobMigrator, id_used);
         }
     }
@@ -710,7 +711,7 @@ void checkBetterSingletons(ClusterControllerData* self) {
     ProcessClass::Fitness bestFitnessForMG;
     if (self->db.blobGranulesEnabled.get()) {
         bestFitnessForBM = findBestFitnessForSingleton(self, newBMWorker, ProcessClass::BlobManager);
-        if (isFullRestoreMode()) {
+        if (self->db.blobRestoreEnabled.get()) {
             bestFitnessForMG = findBestFitnessForSingleton(self, newMGWorker, ProcessClass::BlobManager);
         }
     }
@@ -744,7 +745,7 @@ void checkBetterSingletons(ClusterControllerData* self) {
     if (self->db.blobGranulesEnabled.get()) {
         bmHealthy = isHealthySingleton(
             self, newBMWorker, bmSingleton, bestFitnessForBM, self->recruitingBlobManagerID);
-        if (isFullRestoreMode()) {
+        if (self->db.blobRestoreEnabled.get()) {
             mgHealthy = isHealthySingleton(
                 self, newMGWorker, mgSingleton, bestFitnessForMG, self->recruitingBlobMigratorID);
         }
     }
@@ -775,7 +776,7 @@ void checkBetterSingletons(ClusterControllerData* self) {
     if (self->db.blobGranulesEnabled.get()) {
         currBMProcessId = bmSingleton.interface.get().locality.processId();
         newBMProcessId = newBMWorker.interf.locality.processId();
-        if (isFullRestoreMode()) {
+        if (self->db.blobRestoreEnabled.get()) {
            currMGProcessId = mgSingleton.interface.get().locality.processId();
            newMGProcessId = newMGWorker.interf.locality.processId();
         }
     }
@@ -792,7 +793,7 @@ void checkBetterSingletons(ClusterControllerData* self) {
     if (self->db.blobGranulesEnabled.get()) {
         currPids.emplace_back(currBMProcessId);
         newPids.emplace_back(newBMProcessId);
-        if (isFullRestoreMode()) {
+        if (self->db.blobRestoreEnabled.get()) {
             currPids.emplace_back(currMGProcessId);
             newPids.emplace_back(newMGProcessId);
         }
     }
@@ -810,7 +811,7 @@ void checkBetterSingletons(ClusterControllerData* self) {
     if (!self->db.blobGranulesEnabled.get()) {
         ASSERT(currColocMap[currBMProcessId] == 0);
         ASSERT(newColocMap[newBMProcessId] == 0);
-        if (isFullRestoreMode()) {
+        if (self->db.blobRestoreEnabled.get()) {
             ASSERT(currColocMap[currMGProcessId] == 0);
             ASSERT(newColocMap[newMGProcessId] == 0);
         }
     }
@@ -836,7 +837,7 @@ void checkBetterSingletons(ClusterControllerData* self) {
         ddSingleton.recruit(self);
     } else if (self->db.blobGranulesEnabled.get() && newColocMap[newBMProcessId] < currColocMap[currBMProcessId]) {
         bmSingleton.recruit(self);
-    } else if (self->db.blobGranulesEnabled.get() && isFullRestoreMode() &&
+    } else if (self->db.blobGranulesEnabled.get() && self->db.blobRestoreEnabled.get() &&
                newColocMap[newMGProcessId] < currColocMap[currMGProcessId]) {
         mgSingleton.recruit(self);
     } else if (SERVER_KNOBS->ENABLE_ENCRYPTION && newColocMap[newEKPProcessId] < currColocMap[currEKPProcessId]) {
@@ -1404,13 +1405,13 @@ ACTOR Future<Void> registerWorker(RegisterWorkerRequest req,
             self, w, currSingleton, registeringSingleton, self->recruitingRatekeeperID);
     }
 
-    if (self->db.blobGranulesEnabled.get() && isFullRestoreMode() && req.blobManagerInterf.present()) {
+    if (self->db.blobGranulesEnabled.get() && req.blobManagerInterf.present()) {
         auto currSingleton = BlobManagerSingleton(self->db.serverInfo->get().blobManager);
         auto registeringSingleton = BlobManagerSingleton(req.blobManagerInterf);
         haltRegisteringOrCurrentSingleton(
             self, w, currSingleton, registeringSingleton, self->recruitingBlobManagerID);
     }
-    if (req.blobMigratorInterf.present()) {
+    if (req.blobMigratorInterf.present() && self->db.blobRestoreEnabled.get()) {
         auto currSingleton = BlobMigratorSingleton(self->db.serverInfo->get().blobMigrator);
         auto registeringSingleton = BlobMigratorSingleton(req.blobMigratorInterf);
         haltRegisteringOrCurrentSingleton(
             self, w, currSingleton, registeringSingleton, self->recruitingBlobMigratorID);
@@ -2553,6 +2554,43 @@ ACTOR Future<int64_t> getNextBMEpoch(ClusterControllerData* self) {
     }
 }
 
+ACTOR Future<Void> watchBlobRestoreCommand(ClusterControllerData* self) {
+    state Reference<ReadYourWritesTransaction> tr = makeReference<ReadYourWritesTransaction>(self->cx);
+    state Key blobRestoreCommandKey = blobRestoreCommandKeyFor(normalKeys);
+    loop {
+        try {
+            tr->reset();
+            tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
+            tr->setOption(FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE);
+            Optional<Value> blobRestoreCommand = wait(tr->get(blobRestoreCommandKey));
+            if (blobRestoreCommand.present()) {
+                Standalone<BlobRestoreStatus> status = decodeBlobRestoreStatus(blobRestoreCommand.get());
+                TraceEvent("WatchBlobRestoreCommand").detail("Progress", status.progress);
+                if (status.progress == 0) {
+                    self->db.blobRestoreEnabled.set(true);
+                    if (self->db.blobGranulesEnabled.get()) {
+                        const auto& blobManager = self->db.serverInfo->get().blobManager;
+                        if (blobManager.present()) {
+                            BlobManagerSingleton(blobManager)
+                                .haltBlobGranules(self, blobManager.get().locality.processId());
+                        }
+                        const auto& blobMigrator = self->db.serverInfo->get().blobMigrator;
+                        if (blobMigrator.present()) {
+                            BlobMigratorSingleton(blobMigrator).halt(self, blobMigrator.get().locality.processId());
+                        }
+                    }
+                }
+            }
+
+            state Future<Void> watch = tr->watch(blobRestoreCommandKey);
+            wait(tr->commit());
+            wait(watch);
+        } catch (Error& e) {
+            wait(tr->onError(e));
+        }
+    }
+}
+
 ACTOR Future<Void> startBlobMigrator(ClusterControllerData* self, double waitTime) {
     // If master fails at the same time, give it a chance to clear master PID.
     // Also wait to avoid too many consecutive recruits in a small time window.
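watchBlobRestoreCommand uses the standard FDB watch idiom: read the key, register a watch, commit, then block until the key changes, and go around the loop to re-read. Reduced to its skeleton (a sketch, not the controller code):

    // Sketch: the read / watch / commit / wait loop behind watchBlobRestoreCommand.
    ACTOR Future<Void> watchKeyForever(Database db, Key key) {
        state Reference<ReadYourWritesTransaction> tr = makeReference<ReadYourWritesTransaction>(db);
        loop {
            try {
                tr->reset();
                tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
                Optional<Value> value = wait(tr->get(key));
                // ... react to the current value here ...
                state Future<Void> watch = tr->watch(key);
                wait(tr->commit()); // the watch is armed only after a successful commit
                wait(watch); // fires when the key's value changes
            } catch (Error& e) {
                wait(tr->onError(e));
            }
        }
    }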
@@ -2629,9 +2667,8 @@ ACTOR Future<Void> monitorBlobMigrator(ClusterControllerData* self) {
     }
     loop {
         if (self->db.serverInfo->get().blobMigrator.present() && !self->recruitBlobMigrator.get()) {
-            state Future<Void> wfClient =
-                waitFailureClient(self->db.serverInfo->get().blobMigrator.get().ssi.waitFailure,
-                                  SERVER_KNOBS->BLOB_MIGRATOR_FAILURE_TIME);
+            state Future<Void> wfClient = waitFailureClient(self->db.serverInfo->get().blobMigrator.get().waitFailure,
+                                                            SERVER_KNOBS->BLOB_MIGRATOR_FAILURE_TIME);
             loop {
                 choose {
                     when(wait(wfClient)) {
@@ -2643,11 +2680,11 @@ ACTOR Future<Void> monitorBlobMigrator(ClusterControllerData* self) {
                     when(wait(self->recruitBlobMigrator.onChange())) {}
                 }
             }
-        } else if (self->db.blobGranulesEnabled.get() && isFullRestoreMode()) {
+        } else if (self->db.blobGranulesEnabled.get() && self->db.blobRestoreEnabled.get()) {
             // if there is no blob migrator present but blob granules are now enabled, recruit a BM
             wait(startBlobMigrator(self, recruitThrottler.newRecruitment()));
         } else {
-            wait(self->db.blobGranulesEnabled.onChange());
+            wait(self->db.blobGranulesEnabled.onChange() || self->db.blobRestoreEnabled.onChange());
         }
     }
 }
@@ -2778,7 +2815,7 @@ ACTOR Future<Void> monitorBlobManager(ClusterControllerData* self) {
                     const auto& blobManager = self->db.serverInfo->get().blobManager;
                     BlobManagerSingleton(blobManager)
                         .haltBlobGranules(self, blobManager.get().locality.processId());
-                    if (isFullRestoreMode()) {
+                    if (self->db.blobRestoreEnabled.get()) {
                         const auto& blobMigrator = self->db.serverInfo->get().blobMigrator;
                         BlobMigratorSingleton(blobMigrator).halt(self, blobMigrator.get().locality.processId());
                     }
@@ -3079,8 +3116,9 @@ ACTOR Future<Void> clusterControllerCore(ClusterControllerFullInterface interf,
     self.addActor.send(monitorDataDistributor(&self));
     self.addActor.send(monitorRatekeeper(&self));
     self.addActor.send(monitorBlobManager(&self));
-    self.addActor.send(monitorBlobMigrator(&self));
     self.addActor.send(watchBlobGranulesConfigKey(&self));
+    self.addActor.send(monitorBlobMigrator(&self));
+    self.addActor.send(watchBlobRestoreCommand(&self));
     self.addActor.send(monitorConsistencyScan(&self));
     self.addActor.send(metaclusterMetricsUpdater(&self));
     self.addActor.send(dbInfoUpdater(&self));
diff --git a/fdbserver/include/fdbserver/BlobGranuleServerCommon.actor.h b/fdbserver/include/fdbserver/BlobGranuleServerCommon.actor.h
index 59b226202e..4e39e53ee9 100644
--- a/fdbserver/include/fdbserver/BlobGranuleServerCommon.actor.h
+++ b/fdbserver/include/fdbserver/BlobGranuleServerCommon.actor.h
@@ -162,10 +162,7 @@ ACTOR Future<Void> loadManifest(Database db, Reference<BlobConnectionProvider> blobConn);
 ACTOR Future<Void> printRestoreSummary(Database db, Reference<BlobConnectionProvider> blobConn);
 ACTOR Future<BlobGranuleRestoreVersionVector> listBlobGranules(Database db, Reference<BlobConnectionProvider> blobConn);
 ACTOR Future<int64_t> lastBlobEpoc(Database db, Reference<BlobConnectionProvider> blobConn);
-
-inline bool isFullRestoreMode() {
-    return SERVER_KNOBS->BLOB_FULL_RESTORE_MODE;
-};
+ACTOR Future<bool> isFullRestoreMode(Database db, KeyRangeRef range);
 
 #include "flow/unactorcompiler.h"
diff --git a/fdbserver/include/fdbserver/BlobMigratorInterface.h b/fdbserver/include/fdbserver/BlobMigratorInterface.h
index 5b9cb6b97a..32fb6b0c0b 100644
--- a/fdbserver/include/fdbserver/BlobMigratorInterface.h
+++ b/fdbserver/include/fdbserver/BlobMigratorInterface.h
@@ -30,6 +30,7 @@ struct BlobMigratorInterface {
     constexpr static FileIdentifier file_identifier = 869199;
 
     RequestStream<HaltBlobMigratorRequest> haltBlobMigrator;
+    RequestStream<ReplyPromise<Void>> waitFailure;
     LocalityData locality;
     UID uniqueID;
     StorageServerInterface ssi;
@@ -48,7 +49,7 @@ struct BlobMigratorInterface {
     template <class Archive>
     void serialize(Archive& ar) {
-        serializer(ar, locality, uniqueID, haltBlobMigrator);
+        serializer(ar, locality, uniqueID, haltBlobMigrator, waitFailure);
     }
 };
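Exposing waitFailure on BlobMigratorInterface (and serializing it) is what lets the cluster controller supervise the migrator like its other singletons: the migrator answers pings via waitFailureServer while the controller watches via waitFailureClient. The pairing, quoted from the hunks above:

    // Migrator side (BlobMigrator.actor.cpp, serverLoop):
    //     self->actors_.add(waitFailureServer(self->interf_.waitFailure.getFuture()));
    // Controller side (ClusterController.actor.cpp, monitorBlobMigrator):
    //     state Future<Void> wfClient = waitFailureClient(self->db.serverInfo->get().blobMigrator.get().waitFailure,
    //                                                     SERVER_KNOBS->BLOB_MIGRATOR_FAILURE_TIME);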
diff --git a/fdbserver/include/fdbserver/ClusterController.actor.h b/fdbserver/include/fdbserver/ClusterController.actor.h
index b3f9aa27d7..a4ceea4592 100644
--- a/fdbserver/include/fdbserver/ClusterController.actor.h
+++ b/fdbserver/include/fdbserver/ClusterController.actor.h
@@ -144,6 +144,7 @@ public:
     Future<Void> clientCounter;
     int clientCount;
     AsyncVar<bool> blobGranulesEnabled;
+    AsyncVar<bool> blobRestoreEnabled;
     ClusterType clusterType = ClusterType::STANDALONE;
     Optional<ClusterName> metaclusterName;
     Optional<MetaclusterRegistrationEntry> metaclusterRegistration;
@@ -159,7 +160,7 @@ public:
             TaskPriority::DefaultEndpoint, LockAware::True)), // SOMEDAY: Locality!
         unfinishedRecoveries(0), logGenerations(0), cachePopulated(false), clientCount(0),
-        blobGranulesEnabled(config.blobGranulesEnabled) {
+        blobGranulesEnabled(config.blobGranulesEnabled), blobRestoreEnabled(false) {
         clientCounter = countClients(this);
     }
diff --git a/fdbserver/storageserver.actor.cpp b/fdbserver/storageserver.actor.cpp
index a6bb04860b..32489d6ec9 100644
--- a/fdbserver/storageserver.actor.cpp
+++ b/fdbserver/storageserver.actor.cpp
@@ -6138,6 +6138,7 @@ ACTOR Future<Standalone<VectorRef<BlobGranuleChunkRef>>> tryReadBlobGranules(Transaction* tr,
     loop {
         try {
             Standalone<VectorRef<BlobGranuleChunkRef>> chunks = wait(tr->readBlobGranules(keys, 0, readVersion));
+            TraceEvent(SevDebug, "ReadBlobGranules").detail("Keys", keys).detail("Chunks", chunks.size());
             return chunks;
         } catch (Error& e) {
             if (retryCount >= maxRetryCount) {
@@ -6169,10 +6170,7 @@ ACTOR Future<Void> tryGetRangeFromBlob(PromiseStream<RangeResult> results,
         for (i = 0; i < chunks.size(); ++i) {
             state KeyRangeRef chunkRange = chunks[i].keyRange;
             state RangeResult rows = wait(readBlobGranule(chunks[i], keys, 0, fetchVersion, blobConn));
-            TraceEvent("ReadBlobData")
-                .detail("Rows", rows.size())
-                .detail("ChunkRange", chunkRange.toString())
-                .detail("Keys", keys.toString());
+            TraceEvent(SevDebug, "ReadBlobData").detail("Rows", rows.size()).detail("ChunkRange", chunkRange);
             if (rows.size() == 0) {
                 rows.readThrough = KeyRef(rows.arena(), std::min(chunkRange.end, keys.end));
             }
@@ -6185,7 +6183,7 @@ ACTOR Future<Void> tryGetRangeFromBlob(PromiseStream<RangeResult> results,
     } catch (Error& e) {
         TraceEvent(SevWarn, "ReadBlobDataFailure")
             .suppressFor(5.0)
-            .detail("Keys", keys.toString())
+            .detail("Keys", keys)
             .detail("FetchVersion", fetchVersion)
             .detail("Error", e.what());
         tr->reset();
@@ -6994,7 +6992,8 @@ ACTOR Future<Void> fetchKeys(StorageServer* data, AddingShard* shard) {
     // We must also ensure we have fetched all change feed metadata BEFORE changing the phase to fetching to ensure
     // change feed mutations get applied correctly
     state std::vector<Key> changeFeedsToFetch;
-    if (!isFullRestoreMode()) {
+    state bool isFullRestore = wait(isFullRestoreMode(data->cx, keys));
+    if (!isFullRestore) {
         std::vector<Key> _cfToFetch = wait(fetchCFMetadata);
         changeFeedsToFetch = _cfToFetch;
     }
@@ -7072,7 +7071,7 @@ ACTOR Future<Void> fetchKeys(StorageServer* data, AddingShard* shard) {
         state PromiseStream<RangeResult> results;
         state Future<Void> hold;
-        if (SERVER_KNOBS->FETCH_USING_BLOB) {
+        if (isFullRestore) {
             hold = tryGetRangeFromBlob(results, &tr, keys, fetchVersion, data->blobConn);
         } else {
             hold = tryGetRange(results, &tr, keys);
         }
@@ -7110,7 +7109,6 @@ ACTOR Future<Void> fetchKeys(StorageServer* data, AddingShard* shard) {
                                           data->thisServerID);
             }
         }
-
        metricReporter.addFetchedBytes(expectedBlockSize, this_block.size());
 
        // Write this_block to storage
diff --git a/fdbserver/worker.actor.cpp b/fdbserver/worker.actor.cpp
index 0bd2bcc507..d39ab98026 100644
--- a/fdbserver/worker.actor.cpp
+++ b/fdbserver/worker.actor.cpp
@@ -2335,6 +2335,7 @@ ACTOR Future<Void> workerServer(Reference<IClusterConnectionRecord> connRecord,
     } else {
         startRole(Role::BLOB_MIGRATOR, recruited.id(), interf.id());
         DUMPTOKEN(recruited.haltBlobMigrator);
+        DUMPTOKEN(recruited.waitFailure);
         DUMPTOKEN(recruited.ssi.getValue);
         DUMPTOKEN(recruited.ssi.getKey);
         DUMPTOKEN(recruited.ssi.getKeyValues);
@@ -2345,7 +2346,6 @@ ACTOR Future<Void> workerServer(Reference<IClusterConnectionRecord> connRecord,
         DUMPTOKEN(recruited.ssi.getReadHotRanges);
         DUMPTOKEN(recruited.ssi.getRangeSplitPoints);
         DUMPTOKEN(recruited.ssi.getStorageMetrics);
-        DUMPTOKEN(recruited.ssi.waitFailure);
         DUMPTOKEN(recruited.ssi.getQueuingMetrics);
         DUMPTOKEN(recruited.ssi.getKeyValueStoreType);
         DUMPTOKEN(recruited.ssi.watchValue);