From aa1d983132edf5873a47eb5f4e59ce916479522e Mon Sep 17 00:00:00 2001 From: Hui Liu Date: Wed, 1 Feb 2023 13:56:07 -0800 Subject: [PATCH] Truncate logs after force-flushing cold blob granules --- fdbclient/ServerKnobs.cpp | 1 + fdbclient/SystemData.cpp | 1 + fdbclient/include/fdbclient/ServerKnobs.h | 1 + fdbclient/include/fdbclient/SystemData.h | 1 + fdbserver/BlobManager.actor.cpp | 95 ++++++++++++++++++- fdbserver/BlobManifest.actor.cpp | 14 +++ fdbserver/BlobMigrator.actor.cpp | 18 +--- .../fdbserver/BlobGranuleServerCommon.actor.h | 1 + .../workloads/BlobRestoreWorkload.actor.cpp | 8 +- 9 files changed, 122 insertions(+), 18 deletions(-) diff --git a/fdbclient/ServerKnobs.cpp b/fdbclient/ServerKnobs.cpp index 6e3e7ff5c3..8fdb2ec719 100644 --- a/fdbclient/ServerKnobs.cpp +++ b/fdbclient/ServerKnobs.cpp @@ -1060,6 +1060,7 @@ void ServerKnobs::initialize(Randomize randomize, ClientKnobs* clientKnobs, IsSi init( BLOB_RESTORE_MANIFEST_URL, isSimulated ? "file://simfdb/fdbblob/manifest" : "" ); init( BLOB_RESTORE_MANIFEST_FILE_MAX_SIZE, isSimulated ? 10000 : 10000000 ); init( BLOB_RESTORE_MANIFEST_RETENTION_MAX, 10 ); + init( BLOB_RESTORE_MLOGS_RETENTION_SECS, isSimulated ? 120 : 3600 * 24 * 14 ); init( BGCC_TIMEOUT, isSimulated ? 10.0 : 120.0 ); init( BGCC_MIN_INTERVAL, isSimulated ? 
1.0 : 10.0 ); diff --git a/fdbclient/SystemData.cpp b/fdbclient/SystemData.cpp index f497f6677c..97cab0ca64 100644 --- a/fdbclient/SystemData.cpp +++ b/fdbclient/SystemData.cpp @@ -1742,6 +1742,7 @@ Standalone decodeBlobRestoreArg(ValueRef const& value) { } const Key blobManifestVersionKey = "\xff\x02/blobManifestVersion"_sr; +const Key blobGranulesLastFlushKey = "\xff\x02/blobGranulesLastFlushTs"_sr; const KeyRangeRef idempotencyIdKeys("\xff\x02/idmp/"_sr, "\xff\x02/idmp0"_sr); const KeyRef idempotencyIdsExpiredVersion("\xff\x02/idmpExpiredVersion"_sr); diff --git a/fdbclient/include/fdbclient/ServerKnobs.h b/fdbclient/include/fdbclient/ServerKnobs.h index 08c1a270c3..46abbe0979 100644 --- a/fdbclient/include/fdbclient/ServerKnobs.h +++ b/fdbclient/include/fdbclient/ServerKnobs.h @@ -1039,6 +1039,7 @@ public: std::string BLOB_RESTORE_MANIFEST_URL; int BLOB_RESTORE_MANIFEST_FILE_MAX_SIZE; int BLOB_RESTORE_MANIFEST_RETENTION_MAX; + int BLOB_RESTORE_MLOGS_RETENTION_SECS; // Blob metadata int64_t BLOB_METADATA_CACHE_TTL; diff --git a/fdbclient/include/fdbclient/SystemData.h b/fdbclient/include/fdbclient/SystemData.h index 8a1ca2482c..8e31a175be 100644 --- a/fdbclient/include/fdbclient/SystemData.h +++ b/fdbclient/include/fdbclient/SystemData.h @@ -733,6 +733,7 @@ const KeyRange decodeBlobRestoreArgKeyFor(const KeyRef key); const Value blobRestoreArgValueFor(BlobRestoreArg args); Standalone decodeBlobRestoreArg(ValueRef const& value); extern const Key blobManifestVersionKey; +extern const Key blobGranulesLastFlushKey; extern const KeyRangeRef idempotencyIdKeys; extern const KeyRef idempotencyIdsExpiredVersion; diff --git a/fdbserver/BlobManager.actor.cpp b/fdbserver/BlobManager.actor.cpp index dcbdfbbddd..70fb4e168c 100644 --- a/fdbserver/BlobManager.actor.cpp +++ b/fdbserver/BlobManager.actor.cpp @@ -19,14 +19,20 @@ */ #include +#include #include #include #include #include #include +#include "fdbclient/BackupContainer.h" +#include "fdbclient/ClientBooleanParams.h" 
+#include "fdbclient/KeyBackedTypes.h"
 #include "fdbclient/ServerKnobs.h"
 #include "fdbrpc/simulator.h"
+#include "flow/CodeProbe.h"
+#include "flow/serialize.h"
 #include "fmt/format.h"
 #include "fdbclient/BackupContainerFileSystem.h"
 #include "fdbclient/BlobGranuleCommon.h"
@@ -38,6 +44,7 @@
 #include "fdbclient/ReadYourWrites.h"
 #include "fdbclient/SystemData.h"
 #include "fdbclient/Tuple.h"
+#include "fdbclient/BackupAgent.actor.h"
 #include "fdbserver/BlobManagerInterface.h"
 #include "fdbserver/Knobs.h"
 #include "fdbserver/BlobGranuleValidation.actor.h"
@@ -5243,6 +5250,103 @@ ACTOR Future<Void> bgConsistencyCheck(Reference<BlobManagerData> bmData) {
 	}
 }
 
+// Store the current now() value, truncated to whole seconds, under blobGranulesLastFlushKey as the time of the
+// latest forced granule flush. Keeps retrying through Transaction::onError until the commit succeeds.
+ACTOR Future<Void> updateLastFlushTs(Database db) {
+	state Transaction tr(db);
+	loop {
+		try {
+			tr.setOption(FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE);
+			tr.setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
+			tr.setOption(FDBTransactionOptions::LOCK_AWARE);
+			KeyBackedProperty<int64_t> lastFlushTs(blobGranulesLastFlushKey);
+			int64_t epochs = (int64_t)now();
+			lastFlushTs.set(&tr, epochs);
+			wait(tr.commit());
+			return Void();
+		} catch (Error& e) {
+			// Report through the trace log instead of printing to stdout
+			TraceEvent("UpdateLastFlushTsError").error(e);
+			wait(tr.onError(e));
+		}
+	}
+}
+
+// Read the last forced-flush time stored under blobGranulesLastFlushKey. 0 is passed to getD() as the default
+// (presumably returned when the key was never written). Retries through Transaction::onError.
+ACTOR Future<int64_t> getLastFlushTs(Database db) {
+	state Transaction tr(db);
+	loop {
+		try {
+			tr.setOption(FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE);
+			tr.setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
+			tr.setOption(FDBTransactionOptions::LOCK_AWARE);
+			KeyBackedProperty<int64_t> lastFlushTs(blobGranulesLastFlushKey);
+			state int64_t ret;
+			wait(store(ret, lastFlushTs.getD(&tr, Snapshot::False, 0)));
+			return ret;
+		} catch (Error& e) {
+			wait(tr.onError(e));
+		}
+	}
+}
+
+// Best-effort maintenance step. When more than BLOB_RESTORE_MLOGS_RETENTION_SECS have passed since the recorded
+// last flush, ask blob workers to flush all of normalKeys up to the contiguous end of the mutation log backup,
+// record the new flush time, then expire mutation log data older than the retention window. Errors other than
+// actor cancellation are traced and swallowed so the caller is never failed by this step.
+ACTOR Future<Void> maybeFlushAndTruncateMutationLogs(Reference<BlobManagerData> bmData) {
+	try {
+		int64_t lastFlushTs = wait(getLastFlushTs(bmData->db));
+		bool shouldFlush = (now() - lastFlushTs) > SERVER_KNOBS->BLOB_RESTORE_MLOGS_RETENTION_SECS;
+		if (!shouldFlush) {
+			TraceEvent("SkipBlobGranulesFlush").detail("LastFlushTs", lastFlushTs);
+			return Void();
+		}
+
+		state std::string mlogsUrl = wait(getMutationLogUrl());
+		state Reference<IBackupContainer> bc = IBackupContainer::openContainer(mlogsUrl, {}, {});
+		state BackupDescription desc = wait(bc->describeBackup(true));
+		if (!desc.contiguousLogEnd.present()) {
+			TraceEvent("SkipBlobGranulesFlush").detail("LogUrl", mlogsUrl);
+			return Void(); // skip truncation if no valid backup for mutation logs
+		}
+
+		state Version logEndVersion = desc.contiguousLogEnd.get();
+		// Force flush in-memory data of all blob workers until end of log
+		FlushGranuleRequest req(bmData->epoch, normalKeys, logEndVersion, false);
+		wait(success(doBlobGranuleRequests(bmData->db, normalKeys, req, &BlobWorkerInterface::flushGranuleRequest)));
+		wait(updateLastFlushTs(bmData->db));
+
+		// Truncate mutation logs to max retention period
+		Reference<ReadYourWritesTransaction> tr(new ReadYourWritesTransaction(bmData->db));
+		Optional<int64_t> logEndEpochs = wait(timeKeeperEpochsFromVersion(logEndVersion, tr));
+		if (!logEndEpochs.present()) {
+			TraceEvent("SkipMutationLogTruncation").detail("LogEndVersion", logEndVersion);
+			return Void(); // skip truncation if the log end version has no known timestamp
+		}
+
+		// Find timestamp and version to truncate
+		int64_t epochs = logEndEpochs.get() - SERVER_KNOBS->BLOB_RESTORE_MLOGS_RETENTION_SECS;
+		if (epochs <= 0) {
+			TraceEvent("SkipMutationLogTruncation").detail("Epochs", epochs);
+			return Void();
+		}
+		state std::string timestamp = BackupAgentBase::formatTime(epochs);
+		state Version truncVersion = wait(timeKeeperVersionFromDatetime(timestamp, bmData->db));
+
+		wait(bc->expireData(truncVersion, true));
+		TraceEvent("TruncateMutationLogs").detail("Version", truncVersion).detail("Timestamp", timestamp);
+		CODE_PROBE(true, "Flush blob granules and truncate mutation logs");
+	} catch (Error& e) {
+		if (e.code() == error_code_actor_cancelled) {
+			throw e; // cancellation must propagate instead of being swallowed by the catch-all
+		}
+		TraceEvent("TruncateMutationLogsError").error(e); // skip and retry next time
+	}
+	return Void();
+}
+
 ACTOR Future<Void> backupManifest(Reference<BlobManagerData> bmData) {
 	bmData->initBStore();
 
@@ -5256,8 +5360,9 @@ ACTOR Future<Void>
backupManifest(Reference<BlobManagerData> bmData) {
 				break;
 			}
 		}
-		if (activeRanges) {
+		if (activeRanges && SERVER_KNOBS->BLOB_MANIFEST_BACKUP) {
 			if (bmData->manifestStore.isValid()) {
+				wait(maybeFlushAndTruncateMutationLogs(bmData));
 				wait(dumpManifest(bmData->db, bmData->manifestStore, bmData->epoch, bmData->manifestDumperSeqNo));
 				bmData->manifestDumperSeqNo++;
 			} else {
diff --git a/fdbserver/BlobManifest.actor.cpp b/fdbserver/BlobManifest.actor.cpp
index 852a50254c..7361ff8ab6 100644
--- a/fdbserver/BlobManifest.actor.cpp
+++ b/fdbserver/BlobManifest.actor.cpp
@@ -260,6 +260,7 @@ public:
 		self->segmentNo_ = 1;
 		self->totalRows_ = 0;
 		self->logicalSize_ = 0;
+		self->totalBytes_ = 0;
 		wait(waitForAll(self->pendingFutures_));
 		self->pendingFutures_.clear();
 		self->deleteSegmentFiles();
@@ -999,3 +1000,20 @@ ACTOR Future<Version> getManifestVersion(Database db) {
 		}
 	}
 }
+
+// Resolve the mutation log container URL from the BLOB_RESTORE_MLOGS_URL knob. For a file:// base URL the
+// containers under it are listed and the last entry of that listing is returned; any other URL is returned
+// unchanged. Throws blob_restore_missing_logs when a file:// base URL lists no containers.
+ACTOR Future<std::string> getMutationLogUrl() {
+	state std::string baseUrl = SERVER_KNOBS->BLOB_RESTORE_MLOGS_URL;
+	if (baseUrl.starts_with("file://")) {
+		state std::vector<std::string> containers = wait(IBackupContainer::listContainers(baseUrl, {}));
+		if (containers.size() == 0) {
+			// Keep the diagnostic that the inlined lookup in the blob migrator emitted before throwing
+			TraceEvent("MissingMutationLogs").detail("Url", baseUrl);
+			throw blob_restore_missing_logs();
+		}
+		return containers.back();
+	} else {
+		return baseUrl;
+	}
+}
diff --git a/fdbserver/BlobMigrator.actor.cpp b/fdbserver/BlobMigrator.actor.cpp
index 00434d11f9..4dc8898c2a 100644
--- a/fdbserver/BlobMigrator.actor.cpp
+++ b/fdbserver/BlobMigrator.actor.cpp
@@ -329,27 +329,15 @@ private:
 	// Apply mutation logs to blob granules so that they reach to a consistent version for all blob granules
 	ACTOR static Future<Void> applyMutationLogs(Reference<BlobMigrator> self) {
 		// check last version in mutation logs
-
-		state std::string baseUrl = SERVER_KNOBS->BLOB_RESTORE_MLOGS_URL;
-		state std::string mlogsUrl;
-		if (baseUrl.starts_with("file://")) {
-			state std::vector<std::string> containers = wait(IBackupContainer::listContainers(baseUrl, {}));
-			if (containers.size() == 0) {
-				TraceEvent("MissingMutationLogs", self->interf_.id()).detail("Url", baseUrl);
-				throw
restore_missing_data(); - } - mlogsUrl = containers.front(); - } else { - mlogsUrl = baseUrl; - } + state std::string mlogsUrl = wait(getMutationLogUrl()); state Reference bc = IBackupContainer::openContainer(mlogsUrl, {}, {}); BackupDescription desc = wait(bc->describeBackup()); if (!desc.contiguousLogEnd.present()) { - TraceEvent("InvalidMutationLogs").detail("Url", baseUrl); + TraceEvent("InvalidMutationLogs").detail("Url", SERVER_KNOBS->BLOB_RESTORE_MLOGS_URL); throw blob_restore_missing_logs(); } if (!desc.minLogBegin.present()) { - TraceEvent("InvalidMutationLogs").detail("Url", baseUrl); + TraceEvent("InvalidMutationLogs").detail("Url", SERVER_KNOBS->BLOB_RESTORE_MLOGS_URL); throw blob_restore_missing_logs(); } state Version minLogVersion = desc.minLogBegin.get(); diff --git a/fdbserver/include/fdbserver/BlobGranuleServerCommon.actor.h b/fdbserver/include/fdbserver/BlobGranuleServerCommon.actor.h index c52ad6e967..5a04573107 100644 --- a/fdbserver/include/fdbserver/BlobGranuleServerCommon.actor.h +++ b/fdbserver/include/fdbserver/BlobGranuleServerCommon.actor.h @@ -171,6 +171,7 @@ ACTOR Future> getRestoreStatus(Database db, KeyRange ACTOR Future> getRestoreArg(Database db, KeyRangeRef range); ACTOR Future getRestoreTargetVersion(Database db, KeyRangeRef range, Version defaultVersion); ACTOR Future getManifestVersion(Database db); +ACTOR Future getMutationLogUrl(); #include "flow/unactorcompiler.h" #endif diff --git a/fdbserver/workloads/BlobRestoreWorkload.actor.cpp b/fdbserver/workloads/BlobRestoreWorkload.actor.cpp index 3c5081c9b1..3f4b22e326 100644 --- a/fdbserver/workloads/BlobRestoreWorkload.actor.cpp +++ b/fdbserver/workloads/BlobRestoreWorkload.actor.cpp @@ -25,7 +25,6 @@ #include "fdbclient/BackupContainer.h" #include "fdbclient/BackupContainerFileSystem.h" #include "fdbclient/FDBTypes.h" -#include "fdbclient/Knobs.h" #include "fdbclient/SystemData.h" #include "fdbserver/workloads/workloads.actor.h" #include 
"fdbserver/BlobGranuleServerCommon.actor.h" @@ -73,6 +72,10 @@ struct BlobRestoreWorkload : TestWorkload { if (self->performRestore_) { fmt::print("Perform blob restore\n"); + // disable manifest backup and log truncation + KnobValueRef knobFalse = KnobValueRef::create(bool{ false }); + IKnobCollection::getMutableGlobalKnobCollection().setKnob("blob_manifest_backup", knobFalse); + wait(store(result, self->extraDb_->blobRestore(normalKeys, {}))); state std::vector> futures; @@ -178,8 +181,9 @@ struct BlobRestoreWorkload : TestWorkload { if (src[i].value != dest[i].value) { fmt::print("Value mismatch at {}\n", i); TraceEvent(SevError, "TestFailure") - .detail("Reason", "Key Mismatch") + .detail("Reason", "Value Mismatch") .detail("Index", i) + .detail("Key", src[i].key.printable()) .detail("SrcValue", src[i].value.printable()) .detail("DestValue", dest[i].value.printable()); return false;