From 00c270fc3f833ce1a281c2696f3c372833158031 Mon Sep 17 00:00:00 2001 From: Hui Liu Date: Mon, 14 Nov 2022 10:47:18 -0800 Subject: [PATCH] BlobManifest - add limits for getRange and transactions for resilency with large manifest --- fdbclient/ServerKnobs.cpp | 1 + fdbclient/include/fdbclient/ServerKnobs.h | 1 + fdbserver/BlobManifest.actor.cpp | 157 +++++++++++++++++----- 3 files changed, 122 insertions(+), 37 deletions(-) diff --git a/fdbclient/ServerKnobs.cpp b/fdbclient/ServerKnobs.cpp index 431278ee22..6542d6e972 100644 --- a/fdbclient/ServerKnobs.cpp +++ b/fdbclient/ServerKnobs.cpp @@ -1025,6 +1025,7 @@ void ServerKnobs::initialize(Randomize randomize, ClientKnobs* clientKnobs, IsSi init( BLOB_MANIFEST_BACKUP_INTERVAL, isSimulated ? 5.0 : 30.0 ); init( BLOB_FULL_RESTORE_MODE, false ); init( BLOB_MIGRATOR_CHECK_INTERVAL, isSimulated ? 1.0 : 5.0); + init( BLOB_MANIFEST_RW_ROWS, isSimulated ? 10 : 1000); init( BGCC_TIMEOUT, isSimulated ? 10.0 : 120.0 ); init( BGCC_MIN_INTERVAL, isSimulated ? 1.0 : 10.0 ); diff --git a/fdbclient/include/fdbclient/ServerKnobs.h b/fdbclient/include/fdbclient/ServerKnobs.h index 8b398ccc0a..c98912f606 100644 --- a/fdbclient/include/fdbclient/ServerKnobs.h +++ b/fdbclient/include/fdbclient/ServerKnobs.h @@ -996,6 +996,7 @@ public: double BLOB_MANIFEST_BACKUP_INTERVAL; bool BLOB_FULL_RESTORE_MODE; double BLOB_MIGRATOR_CHECK_INTERVAL; + int BLOB_MANIFEST_RW_ROWS; // Blob metadata int64_t BLOB_METADATA_CACHE_TTL; diff --git a/fdbserver/BlobManifest.actor.cpp b/fdbserver/BlobManifest.actor.cpp index 230ef0cc3c..45f4496b98 100644 --- a/fdbserver/BlobManifest.actor.cpp +++ b/fdbserver/BlobManifest.actor.cpp @@ -24,6 +24,7 @@ #include "fdbclient/BackupContainer.h" #include "fdbclient/BlobGranuleCommon.h" +#include "fdbclient/ClientBooleanParams.h" #include "fdbserver/Knobs.h" #include "flow/FastRef.h" #include "flow/Trace.h" @@ -137,10 +138,23 @@ private: blobRangeKeys // Key ranges managed by blob }; for (auto range : ranges) { - // todo use getRangeStream for better performance - RangeResult result = wait(tr.getRange(range, GetRangeLimits::BYTE_LIMIT_UNLIMITED)); - for (auto& row : result) { - rows.push_back_deep(rows.arena(), KeyValueRef(row.key, row.value)); + state GetRangeLimits limits(SERVER_KNOBS->BLOB_MANIFEST_RW_ROWS); + limits.minRows = 0; + state KeySelectorRef begin = firstGreaterOrEqual(range.begin); + state KeySelectorRef end = firstGreaterOrEqual(range.end); + loop { + RangeResult result = wait(tr.getRange(begin, end, limits, Snapshot::True)); + for (auto& row : result) { + rows.push_back_deep(rows.arena(), KeyValueRef(row.key, row.value)); + } + if (!result.more) { + break; + } + if (result.readThrough.present()) { + begin = firstGreaterOrEqual(result.readThrough.get()); + } else { + begin = firstGreaterThan(result.end()[-1].key); + } } } return rows; @@ -152,6 +166,13 @@ private: // Write data to blob manifest file ACTOR static Future writeToFile(Reference self, Value data) { + static int32_t lastWrittenBytes = 0; + if (data.size() == lastWrittenBytes) { + dprint("Skip writting blob manifest with same size {}\n", lastWrittenBytes); + return Void(); + } + lastWrittenBytes = data.size(); + state Reference writer; state std::string fullPath; @@ -212,7 +233,7 @@ public: ACTOR static Future execute(Reference self) { try { Value data = wait(readFromFile(self)); - Standalone manifest = decode(data); + state Standalone manifest = decode(data); wait(writeSystemKeys(self, manifest.rows)); BlobGranuleRestoreVersionVector _ = wait(listGranules(self)); } catch (Error& e) { @@ -231,13 +252,32 @@ public: tr.setOption(FDBTransactionOptions::LOCK_AWARE); try { - std::vector granules; + state Standalone> blobRanges; + // Read all granules + state GetRangeLimits limits(SERVER_KNOBS->BLOB_MANIFEST_RW_ROWS); + limits.minRows = 0; + state KeySelectorRef begin = firstGreaterOrEqual(blobGranuleMappingKeys.begin); + state KeySelectorRef end = firstGreaterOrEqual(blobGranuleMappingKeys.end); + loop { + RangeResult rows = wait(tr.getRange(begin, end, limits, Snapshot::True)); + for (auto& row : rows) { + blobRanges.push_back_deep(blobRanges.arena(), row.key); + } + if (!rows.more) { + break; + } + if (rows.readThrough.present()) { + begin = firstGreaterOrEqual(rows.readThrough.get()); + } else { + begin = firstGreaterThan(rows.end()[-1].key); + } + } + + // check each granule range state int i = 0; - auto limit = GetRangeLimits::BYTE_LIMIT_UNLIMITED; - state RangeResult blobRanges = wait(tr.getRange(blobGranuleMappingKeys, limit)); for (i = 0; i < blobRanges.size() - 1; i++) { - Key startKey = blobRanges[i].key.removePrefix(blobGranuleMappingKeys.begin); - Key endKey = blobRanges[i + 1].key.removePrefix(blobGranuleMappingKeys.begin); + Key startKey = blobRanges[i].removePrefix(blobGranuleMappingKeys.begin); + Key endKey = blobRanges[i + 1].removePrefix(blobGranuleMappingKeys.begin); state KeyRange granuleRange = KeyRangeRef(startKey, endKey); try { Standalone granule = wait(getGranule(&tr, granuleRange)); @@ -300,17 +340,32 @@ private: // Write system keys to database ACTOR static Future writeSystemKeys(Reference self, VectorRef rows) { + state int start = 0; + state int end = 0; + for (start = 0; start < rows.size(); start = end) { + end = std::min(start + SERVER_KNOBS->BLOB_MANIFEST_RW_ROWS, rows.size()); + wait(writeSystemKeys(self, rows, start, end)); + } + return Void(); + } + + // Write system keys from start index to end(exclusive), so that we don't exceed the limit of transaction limit + ACTOR static Future writeSystemKeys(Reference self, + VectorRef rows, + int start, + int end) { state Transaction tr(self->db_); loop { tr.setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS); tr.setOption(FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE); tr.setOption(FDBTransactionOptions::LOCK_AWARE); try { - for (auto& row : rows) { - tr.set(row.key, row.value); + for (int i = start; i < end; ++i) { + tr.set(rows[i].key, rows[i].value); } wait(tr.commit()); - dprint("Blob manifest loaded {} rows\n", rows.size()); + dprint("Blob manifest loaded rows from {} to {}\n", start, end); + TraceEvent("BlobManifestLoader").detail("RowStart", start).detail("RowEnd", end); return Void(); } catch (Error& e) { wait(tr.onError(e)); @@ -324,8 +379,7 @@ private: KeyRange historyKeyRange = blobGranuleHistoryKeyRangeFor(range); // reverse lookup so that the first row is the newest version state RangeResult results = - wait(tr->getRange(historyKeyRange, GetRangeLimits::BYTE_LIMIT_UNLIMITED, Snapshot::False, Reverse::True)); - + wait(tr->getRange(historyKeyRange, GetRangeLimits::BYTE_LIMIT_UNLIMITED, Snapshot::True, Reverse::True)); for (KeyValueRef row : results) { state KeyRange keyRange; state Version version; @@ -367,24 +421,39 @@ private: // List all files for given granule ACTOR static Future> listGranuleFiles(Transaction* tr, UID granuleID) { + state std::vector files; + state KeyRange fileKeyRange = blobGranuleFileKeyRangeFor(granuleID); - RangeResult results = wait(tr->getRange(fileKeyRange, GetRangeLimits::BYTE_LIMIT_UNLIMITED)); + state GetRangeLimits limits(SERVER_KNOBS->BLOB_MANIFEST_RW_ROWS); + limits.minRows = 0; + state KeySelectorRef begin = firstGreaterOrEqual(fileKeyRange.begin); + state KeySelectorRef end = firstGreaterOrEqual(fileKeyRange.end); + loop { + RangeResult results = wait(tr->getRange(begin, end, limits, Snapshot::True)); + for (auto& row : results) { + UID gid; + Version version; + uint8_t fileType; + Standalone filename; + int64_t offset; + int64_t length; + int64_t fullFileLength; + Optional cipherKeysMeta; - std::vector files; - for (auto& row : results) { - UID gid; - Version version; - uint8_t fileType; - Standalone filename; - int64_t offset; - int64_t length; - int64_t fullFileLength; - Optional cipherKeysMeta; - - std::tie(gid, version, fileType) = decodeBlobGranuleFileKey(row.key); - std::tie(filename, offset, length, fullFileLength, cipherKeysMeta) = decodeBlobGranuleFileValue(row.value); - GranuleFileVersion vs = { version, fileType, filename.toString(), length }; - files.push_back(vs); + std::tie(gid, version, fileType) = decodeBlobGranuleFileKey(row.key); + std::tie(filename, offset, length, fullFileLength, cipherKeysMeta) = + decodeBlobGranuleFileValue(row.value); + GranuleFileVersion vs = { version, fileType, filename.toString(), length }; + files.push_back(vs); + } + if (!results.more) { + break; + } + if (results.readThrough.present()) { + begin = firstGreaterOrEqual(results.readThrough.get()); + } else { + begin = firstGreaterThan(results.end()[-1].key); + } } return files; } @@ -466,12 +535,26 @@ ACTOR Future isFullRestoreMode(Database db, KeyRangeRef keys) { tr.setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS); tr.setOption(FDBTransactionOptions::LOCK_AWARE); try { - RangeResult ranges = wait(tr.getRange(blobRestoreCommandKeys, CLIENT_KNOBS->TOO_MANY)); - for (auto& r : ranges) { - KeyRange keyRange = decodeBlobRestoreCommandKeyFor(r.key); - if (keyRange.contains(keys)) { - Standalone status = decodeBlobRestoreStatus(r.value); - return status.progress < 100; // progress is less than 100 + state GetRangeLimits limits(SERVER_KNOBS->BLOB_MANIFEST_RW_ROWS); + limits.minRows = 0; + state KeySelectorRef begin = firstGreaterOrEqual(blobRestoreCommandKeys.begin); + state KeySelectorRef end = firstGreaterOrEqual(blobRestoreCommandKeys.end); + loop { + RangeResult ranges = wait(tr.getRange(begin, end, limits, Snapshot::True)); + for (auto& r : ranges) { + KeyRange keyRange = decodeBlobRestoreCommandKeyFor(r.key); + if (keyRange.contains(keys)) { + Standalone status = decodeBlobRestoreStatus(r.value); + return status.progress < 100; // progress is less than 100 + } + } + if (!ranges.more) { + break; + } + if (ranges.readThrough.present()) { + begin = firstGreaterOrEqual(ranges.readThrough.get()); + } else { + begin = firstGreaterThan(ranges.end()[-1].key); } } return false;