From c85b984c3a979c71a8144c53342791d2ca44a32e Mon Sep 17 00:00:00 2001 From: Hui Liu Date: Sat, 3 Dec 2022 08:45:23 -0800 Subject: [PATCH] blobrestore to previous point of time --- fdbcli/BlobRestoreCommand.actor.cpp | 14 ++- fdbclient/NativeAPI.actor.cpp | 11 ++- fdbclient/SystemData.cpp | 29 +++++++ .../include/fdbclient/BlobGranuleCommon.h | 13 +++ fdbclient/include/fdbclient/DatabaseContext.h | 2 +- fdbclient/include/fdbclient/SystemData.h | 5 ++ fdbserver/BlobManifest.actor.cpp | 56 ++++++++++++ fdbserver/BlobMigrator.actor.cpp | 85 +++++++++++-------- .../fdbserver/BlobGranuleServerCommon.actor.h | 2 + fdbserver/storageserver.actor.cpp | 15 +++- .../workloads/BlobRestoreWorkload.actor.cpp | 3 +- 11 files changed, 189 insertions(+), 46 deletions(-) diff --git a/fdbcli/BlobRestoreCommand.actor.cpp b/fdbcli/BlobRestoreCommand.actor.cpp index 5738782602..f5176037c0 100644 --- a/fdbcli/BlobRestoreCommand.actor.cpp +++ b/fdbcli/BlobRestoreCommand.actor.cpp @@ -33,8 +33,18 @@ ACTOR Future blobRestoreCommandActor(Database localDb, std::vector version; + if (tokens.size() > 1) { + Version v; + if (sscanf(tokens[1].toString().c_str(), "%" PRId64, &v) != 1) { + printUsage(tokens[0]); + return false; + } + version = v; + } + state bool success = false; - wait(store(success, localDb->blobRestore(normalKeys))); + wait(store(success, localDb->blobRestore(normalKeys, version))); if (success) { fmt::print( "Started blob restore for the full cluster. Please use 'status details' command to check progress.\n"); @@ -44,5 +54,5 @@ ACTOR Future blobRestoreCommandActor(Database localDb, std::vector>> DatabaseContext::listBlobbifiedRanges return listBlobbifiedRangesActor(Reference::addRef(this), range, rangeLimit, tenantName); } -ACTOR Future blobRestoreActor(Reference cx, KeyRange range) { +ACTOR Future blobRestoreActor(Reference cx, KeyRange range, Optional version) { state Database db(cx); state Reference tr = makeReference(db); loop { @@ -10937,6 +10937,11 @@ ACTOR Future blobRestoreActor(Reference cx, KeyRange rang BlobRestoreStatus status(BlobRestorePhase::INIT); Value newValue = blobRestoreCommandValueFor(status); tr->set(key, newValue); + + BlobRestoreArg arg(version); + Value argValue = blobRestoreArgValueFor(arg); + tr->set(blobRestoreArgKeyFor(range), argValue); + wait(tr->commit()); return true; } catch (Error& e) { @@ -10945,8 +10950,8 @@ ACTOR Future blobRestoreActor(Reference cx, KeyRange rang } } -Future DatabaseContext::blobRestore(KeyRange range) { - return blobRestoreActor(Reference::addRef(this), range); +Future DatabaseContext::blobRestore(KeyRange range, Optional version) { + return blobRestoreActor(Reference::addRef(this), range, version); } int64_t getMaxKeySize(KeyRef const& key) { diff --git a/fdbclient/SystemData.cpp b/fdbclient/SystemData.cpp index c8a32faa02..1f82ab3ee0 100644 --- a/fdbclient/SystemData.cpp +++ b/fdbclient/SystemData.cpp @@ -1708,6 +1708,35 @@ Standalone decodeBlobRestoreStatus(ValueRef const& value) { return status; } +const KeyRangeRef blobRestoreArgKeys("\xff\x02/blobRestoreArgs/"_sr, "\xff\x02/blobRestoreArgs0"_sr); + +const Value blobRestoreArgKeyFor(const KeyRangeRef range) { + BinaryWriter wr(AssumeVersion(ProtocolVersion::withBlobGranule())); + wr.serializeBytes(blobRestoreArgKeys.begin); + wr << range; + return wr.toValue(); +} + +const KeyRange decodeBlobRestoreArgKeyFor(const KeyRef key) { + KeyRange range; + BinaryReader reader(key.removePrefix(blobRestoreArgKeys.begin), AssumeVersion(ProtocolVersion::withBlobGranule())); + reader >> range; + return range; +} + +const Value blobRestoreArgValueFor(BlobRestoreArg args) { + BinaryWriter wr(IncludeVersion(ProtocolVersion::withBlobGranule())); + wr << args; + return wr.toValue(); +} + +Standalone decodeBlobRestoreArg(ValueRef const& value) { + Standalone args; + BinaryReader reader(value, IncludeVersion()); + reader >> args; + return args; +} + const KeyRangeRef storageQuotaKeys("\xff/storageQuota/"_sr, "\xff/storageQuota0"_sr); const KeyRef storageQuotaPrefix = storageQuotaKeys.begin; diff --git a/fdbclient/include/fdbclient/BlobGranuleCommon.h b/fdbclient/include/fdbclient/BlobGranuleCommon.h index c12c0f70e2..12dd47cda3 100644 --- a/fdbclient/include/fdbclient/BlobGranuleCommon.h +++ b/fdbclient/include/fdbclient/BlobGranuleCommon.h @@ -339,4 +339,17 @@ struct BlobRestoreStatus { } }; +struct BlobRestoreArg { + constexpr static FileIdentifier file_identifier = 947689; + Optional version; + + BlobRestoreArg() {} + BlobRestoreArg(Optional v) : version(v){}; + + template + void serialize(Ar& ar) { + serializer(ar, version); + } +}; + #endif diff --git a/fdbclient/include/fdbclient/DatabaseContext.h b/fdbclient/include/fdbclient/DatabaseContext.h index 6c6e2cb772..fc5850b395 100644 --- a/fdbclient/include/fdbclient/DatabaseContext.h +++ b/fdbclient/include/fdbclient/DatabaseContext.h @@ -412,7 +412,7 @@ public: Future verifyBlobRange(const KeyRange& range, Optional version, Optional tenantName = {}); - Future blobRestore(const KeyRange range); + Future blobRestore(const KeyRange range, Optional version); // private: explicit DatabaseContext(Reference>> connectionRecord, diff --git a/fdbclient/include/fdbclient/SystemData.h b/fdbclient/include/fdbclient/SystemData.h index 766ca3af93..2ec563e8d8 100644 --- a/fdbclient/include/fdbclient/SystemData.h +++ b/fdbclient/include/fdbclient/SystemData.h @@ -726,6 +726,11 @@ const Value blobRestoreCommandKeyFor(const KeyRangeRef range); const KeyRange decodeBlobRestoreCommandKeyFor(const KeyRef key); const Value blobRestoreCommandValueFor(BlobRestoreStatus status); Standalone decodeBlobRestoreStatus(ValueRef const& value); +extern const KeyRangeRef blobRestoreArgKeys; +const Value blobRestoreArgKeyFor(const KeyRangeRef range); +const KeyRange decodeBlobRestoreArgKeyFor(const KeyRef key); +const Value blobRestoreArgValueFor(BlobRestoreArg args); +Standalone decodeBlobRestoreArg(ValueRef const& value); // Storage quota per tenant // "\xff/storageQuota/[[tenantGroupName]]" := "[[quota]]" diff --git a/fdbserver/BlobManifest.actor.cpp b/fdbserver/BlobManifest.actor.cpp index 387e17b6df..100da79501 100644 --- a/fdbserver/BlobManifest.actor.cpp +++ b/fdbserver/BlobManifest.actor.cpp @@ -626,3 +626,59 @@ ACTOR Future> getRestoreStatus(Database db, KeyRange } return result; } + +// Get restore argument +ACTOR Future> getRestoreArg(Database db, KeyRangeRef keys) { + state Transaction tr(db); + state Optional result; + loop { + try { + tr.setOption(FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE); + tr.setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS); + tr.setOption(FDBTransactionOptions::LOCK_AWARE); + try { + state GetRangeLimits limits(SERVER_KNOBS->BLOB_MANIFEST_RW_ROWS); + limits.minRows = 0; + state KeySelectorRef begin = firstGreaterOrEqual(blobRestoreArgKeys.begin); + state KeySelectorRef end = firstGreaterOrEqual(blobRestoreArgKeys.end); + loop { + RangeResult ranges = wait(tr.getRange(begin, end, limits, Snapshot::True)); + for (auto& r : ranges) { + KeyRange keyRange = decodeBlobRestoreArgKeyFor(r.key); + if (keys.intersects(keyRange)) { + Standalone arg = decodeBlobRestoreArg(r.value); + result = arg; + return result; + } + } + if (!ranges.more) { + break; + } + if (ranges.readThrough.present()) { + begin = firstGreaterOrEqual(ranges.readThrough.get()); + } else { + begin = firstGreaterThan(ranges.back().key); + } + } + return result; + } catch (Error& e) { + wait(tr.onError(e)); + } + + } catch (Error& e) { + wait(tr.onError(e)); + } + } +} + +// Get restore target version. Return defaultVersion if no restore argument available for the range +ACTOR Future getRestoreTargetVersion(Database db, KeyRangeRef range, Version defaultVersion) { + Optional arg = wait(getRestoreArg(db, range)); + Version expected = defaultVersion; + if (arg.present()) { + if (arg.get().version.present()) { + return arg.get().version.get(); + } + } + return expected; +} diff --git a/fdbserver/BlobMigrator.actor.cpp b/fdbserver/BlobMigrator.actor.cpp index 01a9891825..02a4bc0513 100644 --- a/fdbserver/BlobMigrator.actor.cpp +++ b/fdbserver/BlobMigrator.actor.cpp @@ -18,6 +18,7 @@ * limitations under the License. */ #include +#include #include #include "fdbclient/ClientBooleanParams.h" #include "fdbserver/RestoreUtil.h" @@ -315,51 +316,60 @@ private: } } - // Find mutation logs url - static std::string mlogsUrl(Reference self) { - std::string url = SERVER_KNOBS->BLOB_RESTORE_MLOGS_URL; - - // A special case for local directory. - // See FileBackupAgent.actor.cpp. if the container string describes a local directory then "/backup-" - // will be added to it. so we need to append this directory name to the url - if (url.find("file://") == 0) { - std::string path = url.substr(7); - path.erase(path.find_last_not_of("\\/") + 1); // Remove trailing slashes - std::vector dirs = platform::listDirectories(path); - if (dirs.empty()) { - TraceEvent(SevError, "BlobMigratorMissingMutationLogs").detail("Url", url); - throw restore_missing_data(); + // Check if we need to apply mutation logs. If all granules have data up to targetVersion, we don't need to apply + // mutation logs + static bool needApplyLogs(Reference self, Version targetVersion) { + for (auto& granule : self->blobGranules_) { + if (granule.version < targetVersion) { + // at least one granule doesn't have data up to target version, so we'll need to apply mutation logs + return true; } - // Pick the newest backup folder - std::sort(dirs.begin(), dirs.end()); - std::string name = dirs.back(); - url.erase(url.find_last_not_of("\\/") + 1); // Remove trailing slashes - return url + "/" + name; } - return url; + return false; } // Apply mutation logs to blob granules so that they reach to a consistent version for all blob granules ACTOR static Future applyMutationLogs(Reference self) { - state std::string mutationLogsUrl = mlogsUrl(self); - TraceEvent("ApplyMutationLogs", self->interf_.id()).detail("Url", mutationLogsUrl); - // check last version in mutation logs - Optional proxy; // unused - Optional encryptionKeyFile; // unused - state Reference bc = - IBackupContainer::openContainer(mutationLogsUrl, proxy, encryptionKeyFile); + + state std::string baseUrl = SERVER_KNOBS->BLOB_RESTORE_MLOGS_URL; + state std::vector containers = wait(IBackupContainer::listContainers(baseUrl, {})); + if (containers.size() == 0) { + TraceEvent("MissingMutationLogs", self->interf_.id()).detail("Url", baseUrl); + throw restore_missing_data(); + } + state Reference bc = IBackupContainer::openContainer(containers.front(), {}, {}); BackupDescription desc = wait(bc->describeBackup()); if (!desc.contiguousLogEnd.present()) { - TraceEvent("InvalidMutationLogs").detail("Url", mutationLogsUrl); + TraceEvent("InvalidMutationLogs").detail("Url", baseUrl); throw restore_missing_data(); } if (!desc.minLogBegin.present()) { - TraceEvent("InvalidMutationLogs").detail("Url", mutationLogsUrl); + TraceEvent("InvalidMutationLogs").detail("Url", baseUrl); throw restore_missing_data(); } state Version minLogVersion = desc.minLogBegin.get(); state Version maxLogVersion = desc.contiguousLogEnd.get() - 1; + state Version targetVersion = wait(getRestoreTargetVersion(self->db_, normalKeys, maxLogVersion)); + if (targetVersion < maxLogVersion) { + if (!needApplyLogs(self, targetVersion)) { + TraceEvent("SkipMutationLogs").detail("TargetVersion", targetVersion); + dprint("Skip mutation logs as all granules are at version {}\n", targetVersion); + return Void(); + } + } + + if (targetVersion < minLogVersion) { + TraceEvent("MissingMutationLogs") + .detail("MinLogVersion", minLogVersion) + .detail("TargetVersion", maxLogVersion); + throw restore_missing_data(); + } + if (targetVersion > maxLogVersion) { + TraceEvent("SkipTargetVersion") + .detail("MaxLogVersion", maxLogVersion) + .detail("TargetVersion", targetVersion); + } // restore to target version state Standalone> ranges; @@ -370,11 +380,12 @@ private: .detail("Granule", granule.granuleID) .detail("GranuleVersion", granule.version) .detail("MinLogVersion", minLogVersion) - .detail("MaxLogVersion", maxLogVersion); + .detail("MaxLogVersion", maxLogVersion) + .detail("TargetVersion", targetVersion); throw restore_missing_data(); } // no need to apply mutation logs if granule is already on that version - if (granule.version < maxLogVersion) { + if (granule.version < targetVersion) { ranges.push_back(ranges.arena(), granule.keyRange); // Blob granule ends at granule.version(inclusive), so we need to apply mutation logs // after granule.version(exclusive). @@ -391,9 +402,11 @@ private: throw restore_missing_data(); } std::string tagName = "blobrestore-" + self->interf_.id().shortString(); - TraceEvent("ApplyMutationLogs", self->interf_.id()).detail("Version", minLogVersion); + TraceEvent("ApplyMutationLogs", self->interf_.id()) + .detail("MinVer", minLogVersion) + .detail("MaxVer", maxLogVersion); - wait(submitRestore(self, KeyRef(tagName), KeyRef(mutationLogsUrl), ranges, beginVersions)); + wait(submitRestore(self, KeyRef(tagName), KeyRef(containers.front()), ranges, beginVersions, targetVersion)); return Void(); } @@ -402,7 +415,8 @@ private: Key tagName, Key mutationLogsUrl, Standalone> ranges, - Standalone> beginVersions) { + Standalone> beginVersions, + Version endVersion) { state Optional proxy; // unused state Optional origDb; // unused @@ -415,7 +429,7 @@ private: ranges, beginVersions, WaitForComplete::True, - invalidVersion, + endVersion, Verbose::True, ""_sr, // addPrefix ""_sr, // removePrefix @@ -423,6 +437,7 @@ private: UnlockDB::True, OnlyApplyMutationLogs::True)); TraceEvent("ApplyMutationLogsComplete", self->interf_.id()).detail("Version", version); + dprint("Restore to version {} done. Target version {} \n", version, endVersion); return Void(); } diff --git a/fdbserver/include/fdbserver/BlobGranuleServerCommon.actor.h b/fdbserver/include/fdbserver/BlobGranuleServerCommon.actor.h index 660b87a053..b6ff89fbcb 100644 --- a/fdbserver/include/fdbserver/BlobGranuleServerCommon.actor.h +++ b/fdbserver/include/fdbserver/BlobGranuleServerCommon.actor.h @@ -169,6 +169,8 @@ ACTOR Future updateRestoreStatus(Database db, Optional expectedPhase); ACTOR Future> getRestoreRangeStatus(Database db, KeyRangeRef keys); ACTOR Future> getRestoreStatus(Database db, KeyRangeRef range); +ACTOR Future> getRestoreArg(Database db, KeyRangeRef range); +ACTOR Future getRestoreTargetVersion(Database db, KeyRangeRef range, Version defaultVersion); #include "flow/unactorcompiler.h" #endif diff --git a/fdbserver/storageserver.actor.cpp b/fdbserver/storageserver.actor.cpp index 9e287eb630..ba58ce0523 100644 --- a/fdbserver/storageserver.actor.cpp +++ b/fdbserver/storageserver.actor.cpp @@ -6339,7 +6339,10 @@ ACTOR Future>> tryReadBlobGranules(Tra loop { try { Standalone> chunks = wait(tr->readBlobGranules(keys, 0, readVersion)); - TraceEvent(SevDebug, "ReadBlobGranules").detail("Keys", keys).detail("Chunks", chunks.size()); + TraceEvent(SevDebug, "ReadBlobGranules") + .detail("Keys", keys) + .detail("Chunks", chunks.size()) + .detail("FetchVersion", fetchVersion); return chunks; } catch (Error& e) { if (retryCount >= maxRetryCount) { @@ -6372,7 +6375,10 @@ ACTOR Future tryGetRangeFromBlob(PromiseStream results, for (i = 0; i < chunks.size(); ++i) { state KeyRangeRef chunkRange = chunks[i].keyRange; state RangeResult rows = wait(readBlobGranule(chunks[i], keys, 0, fetchVersion, blobConn)); - TraceEvent(SevDebug, "ReadBlobData").detail("Rows", rows.size()).detail("ChunkRange", chunkRange); + TraceEvent(SevDebug, "ReadBlobData") + .detail("Rows", rows.size()) + .detail("ChunkRange", chunkRange) + .detail("FetchVersion", fetchVersion); if (rows.size() == 0) { rows.readThrough = KeyRef(rows.arena(), std::min(chunkRange.end, keys.end)); } @@ -7298,12 +7304,13 @@ ACTOR Future fetchKeys(StorageServer* data, AddingShard* shard) { state PromiseStream results; state Future hold; if (isFullRestore) { - std::pair rangeStatus = wait(getRestoreRangeStatus(data->cx, keys)); + state std::pair rangeStatus = wait(getRestoreRangeStatus(data->cx, keys)); // Read from blob only when it's copying data for full restore. Otherwise it may cause data corruptions // e.g we don't want to copy from blob any more when it's applying mutation logs(APPLYING_MLOGS) if (rangeStatus.second.phase == BlobRestorePhase::COPYING_DATA || rangeStatus.second.phase == BlobRestorePhase::ERROR) { - hold = tryGetRangeFromBlob(results, &tr, rangeStatus.first, fetchVersion, data->blobConn); + Version targetVersion = wait(getRestoreTargetVersion(data->cx, keys, fetchVersion)); + hold = tryGetRangeFromBlob(results, &tr, rangeStatus.first, targetVersion, data->blobConn); } else { hold = tryGetRange(results, &tr, keys); } diff --git a/fdbserver/workloads/BlobRestoreWorkload.actor.cpp b/fdbserver/workloads/BlobRestoreWorkload.actor.cpp index ece278723b..3c5081c9b1 100644 --- a/fdbserver/workloads/BlobRestoreWorkload.actor.cpp +++ b/fdbserver/workloads/BlobRestoreWorkload.actor.cpp @@ -24,6 +24,7 @@ #include "fdbclient/BackupAgent.actor.h" #include "fdbclient/BackupContainer.h" #include "fdbclient/BackupContainerFileSystem.h" +#include "fdbclient/FDBTypes.h" #include "fdbclient/Knobs.h" #include "fdbclient/SystemData.h" #include "fdbserver/workloads/workloads.actor.h" @@ -72,7 +73,7 @@ struct BlobRestoreWorkload : TestWorkload { if (self->performRestore_) { fmt::print("Perform blob restore\n"); - wait(store(result, self->extraDb_->blobRestore(normalKeys))); + wait(store(result, self->extraDb_->blobRestore(normalKeys, {}))); state std::vector> futures; futures.push_back(self->runBackupAgent(self));