From a5b4212990b9ac0c323b0b5f65589d42377e7065 Mon Sep 17 00:00:00 2001 From: Josh Slocum Date: Wed, 15 Mar 2023 08:27:46 -0500 Subject: [PATCH] adding blob granule logical size --- fdbclient/ClientKnobs.cpp | 1 + fdbclient/SystemData.cpp | 20 ++++++++++++--- fdbclient/include/fdbclient/ClientKnobs.h | 1 + fdbclient/include/fdbclient/SystemData.h | 3 ++- fdbserver/BlobGranuleServerCommon.actor.cpp | 9 ++++--- fdbserver/BlobManifest.actor.cpp | 3 ++- fdbserver/BlobWorker.actor.cpp | 25 ++++++++++--------- .../fdbserver/BlobGranuleServerCommon.actor.h | 16 +++++++++--- flow/ProtocolVersion.h.cmake | 1 + flow/ProtocolVersions.cmake | 1 + .../from_7.3.0/BlobGranuleRestartCycle-1.toml | 3 +++ .../from_7.3.0/BlobGranuleRestartLarge-1.toml | 3 +++ 12 files changed, 62 insertions(+), 24 deletions(-) diff --git a/fdbclient/ClientKnobs.cpp b/fdbclient/ClientKnobs.cpp index 1f6076443d..d64d7fc102 100644 --- a/fdbclient/ClientKnobs.cpp +++ b/fdbclient/ClientKnobs.cpp @@ -287,6 +287,7 @@ void ClientKnobs::initialize(Randomize randomize) { init( BG_MAX_GRANULE_PARALLELISM, 10 ); init( BG_TOO_MANY_GRANULES, 20000 ); init( BLOB_METADATA_REFRESH_INTERVAL, 3600 ); if ( randomize && BUGGIFY ) { BLOB_METADATA_REFRESH_INTERVAL = deterministicRandom()->randomInt(5, 120); } + init( ENABLE_BLOB_GRANULE_FILE_LOGICAL_SIZE, false ); if ( randomize && BUGGIFY ) { ENABLE_BLOB_GRANULE_FILE_LOGICAL_SIZE = true; } init( CHANGE_QUORUM_BAD_STATE_RETRY_TIMES, 3 ); init( CHANGE_QUORUM_BAD_STATE_RETRY_DELAY, 2.0 ); diff --git a/fdbclient/SystemData.cpp b/fdbclient/SystemData.cpp index aecb9ebb1d..ad8c87e073 100644 --- a/fdbclient/SystemData.cpp +++ b/fdbclient/SystemData.cpp @@ -1424,22 +1424,30 @@ const Value blobGranuleFileValueFor(StringRef const& filename, int64_t offset, int64_t length, int64_t fullFileLength, + int64_t logicalSize, Optional cipherKeysMeta) { - BinaryWriter wr(IncludeVersion(ProtocolVersion::withBlobGranule())); + auto protocolVersion = CLIENT_KNOBS->ENABLE_BLOB_GRANULE_FILE_LOGICAL_SIZE + ? ProtocolVersion::withBlobGranuleFileLogicalSize() + : ProtocolVersion::withBlobGranule(); + BinaryWriter wr(IncludeVersion(protocolVersion)); wr << filename; wr << offset; wr << length; wr << fullFileLength; wr << cipherKeysMeta; + if (CLIENT_KNOBS->ENABLE_BLOB_GRANULE_FILE_LOGICAL_SIZE) { + wr << logicalSize; + } return wr.toValue(); } -std::tuple, int64_t, int64_t, int64_t, Optional> +std::tuple, int64_t, int64_t, int64_t, int64_t, Optional> decodeBlobGranuleFileValue(ValueRef const& value) { StringRef filename; int64_t offset; int64_t length; int64_t fullFileLength; + int64_t logicalSize; Optional cipherKeysMeta; BinaryReader reader(value, IncludeVersion()); @@ -1448,7 +1456,13 @@ decodeBlobGranuleFileValue(ValueRef const& value) { reader >> length; reader >> fullFileLength; reader >> cipherKeysMeta; - return std::tuple(filename, offset, length, fullFileLength, cipherKeysMeta); + if (reader.protocolVersion().hasBlobGranuleFileLogicalSize()) { + reader >> logicalSize; + } else { + // fall back to estimating logical size as physical size + logicalSize = length; + } + return std::tuple(filename, offset, length, fullFileLength, logicalSize, cipherKeysMeta); } const Value blobGranulePurgeValueFor(Version version, KeyRange range, bool force) { diff --git a/fdbclient/include/fdbclient/ClientKnobs.h b/fdbclient/include/fdbclient/ClientKnobs.h index 1893f17016..c9926e7033 100644 --- a/fdbclient/include/fdbclient/ClientKnobs.h +++ b/fdbclient/include/fdbclient/ClientKnobs.h @@ -278,6 +278,7 @@ public: int BG_MAX_GRANULE_PARALLELISM; int BG_TOO_MANY_GRANULES; int64_t BLOB_METADATA_REFRESH_INTERVAL; + bool ENABLE_BLOB_GRANULE_FILE_LOGICAL_SIZE; // The coordinator key/value in storage server might be inconsistent to the value stored in the cluster file. // This might happen when a recovery is happening together with a cluster controller coordinator key change. diff --git a/fdbclient/include/fdbclient/SystemData.h b/fdbclient/include/fdbclient/SystemData.h index 9f81c04d3a..5de82a7316 100644 --- a/fdbclient/include/fdbclient/SystemData.h +++ b/fdbclient/include/fdbclient/SystemData.h @@ -671,8 +671,9 @@ const Value blobGranuleFileValueFor( int64_t offset, int64_t length, int64_t fullFileLength, + int64_t logicalSize, Optional cipherKeysMeta = Optional()); -std::tuple, int64_t, int64_t, int64_t, Optional> +std::tuple, int64_t, int64_t, int64_t, int64_t, Optional> decodeBlobGranuleFileValue(ValueRef const& value); const Value blobGranulePurgeValueFor(Version version, KeyRange range, bool force); diff --git a/fdbserver/BlobGranuleServerCommon.actor.cpp b/fdbserver/BlobGranuleServerCommon.actor.cpp index cfd00e0965..a328cf781b 100644 --- a/fdbserver/BlobGranuleServerCommon.actor.cpp +++ b/fdbserver/BlobGranuleServerCommon.actor.cpp @@ -74,14 +74,17 @@ ACTOR Future readGranuleFiles(Transaction* tr, Key* startKey, Key endKey, int64_t offset; int64_t length; int64_t fullFileLength; + int64_t logicalSize; Optional cipherKeysMeta; std::tie(gid, version, fileType) = decodeBlobGranuleFileKey(it.key); ASSERT(gid == granuleID); - std::tie(filename, offset, length, fullFileLength, cipherKeysMeta) = decodeBlobGranuleFileValue(it.value); + std::tie(filename, offset, length, fullFileLength, logicalSize, cipherKeysMeta) = + decodeBlobGranuleFileValue(it.value); - BlobFileIndex idx(version, filename.toString(), offset, length, fullFileLength, cipherKeysMeta); + BlobFileIndex idx( + version, filename.toString(), offset, length, fullFileLength, logicalSize, cipherKeysMeta); if (fileType == 'S') { ASSERT(files->snapshotFiles.empty() || files->snapshotFiles.back().version < idx.version); files->snapshotFiles.push_back(idx); @@ -250,7 +253,7 @@ static std::string makeTestFileName(Version v) { } static BlobFileIndex makeTestFile(Version v, int64_t len) { - return BlobFileIndex(v, makeTestFileName(v), 0, len, len); + return BlobFileIndex(v, makeTestFileName(v), 0, len, len, len); } static void checkFile(int expectedVersion, const BlobFilePointerRef& actualFile) { diff --git a/fdbserver/BlobManifest.actor.cpp b/fdbserver/BlobManifest.actor.cpp index 366dead89f..d29b3a376f 100644 --- a/fdbserver/BlobManifest.actor.cpp +++ b/fdbserver/BlobManifest.actor.cpp @@ -768,10 +768,11 @@ private: int64_t offset; int64_t length; int64_t fullFileLength; + int64_t logicalSize; Optional cipherKeysMeta; std::tie(gid, version, fileType) = decodeBlobGranuleFileKey(row.key); - std::tie(filename, offset, length, fullFileLength, cipherKeysMeta) = + std::tie(filename, offset, length, fullFileLength, logicalSize, cipherKeysMeta) = decodeBlobGranuleFileValue(row.value); GranuleFileVersion vs = { version, fileType, filename.toString(), length }; files.push_back(vs); diff --git a/fdbserver/BlobWorker.actor.cpp b/fdbserver/BlobWorker.actor.cpp index c069f1c2d6..94a3e2cb3f 100644 --- a/fdbserver/BlobWorker.actor.cpp +++ b/fdbserver/BlobWorker.actor.cpp @@ -882,8 +882,9 @@ ACTOR Future writeDeltaFile(Reference bwData, SERVER_KNOBS->BG_DELTA_FILE_TARGET_CHUNK_BYTES, compressFilter, cipherKeysCtx); + state size_t logicalSize = deltasToWrite.expectedSize(); state size_t serializedSize = serialized.size(); - bwData->stats.compressionBytesRaw += deltasToWrite.expectedSize(); + bwData->stats.compressionBytesRaw += logicalSize; bwData->stats.compressionBytesFinal += serializedSize; // Free up deltasToWrite here to reduce memory @@ -930,7 +931,8 @@ ACTOR Future writeDeltaFile(Reference bwData, Key dfKey = blobGranuleFileKeyFor(granuleID, currentDeltaVersion, 'D'); // TODO change once we support file multiplexing - Value dfValue = blobGranuleFileValueFor(fname, 0, serializedSize, serializedSize, cipherKeysMeta); + Value dfValue = + blobGranuleFileValueFor(fname, 0, serializedSize, serializedSize, logicalSize, cipherKeysMeta); tr->set(dfKey, dfValue); if (oldGranuleComplete.present()) { @@ -974,7 +976,8 @@ ACTOR Future writeDeltaFile(Reference bwData, bwData->stats.deltaUpdateSample.addMeasurement(duration); // FIXME: change when we implement multiplexing - return BlobFileIndex(currentDeltaVersion, fname, 0, serializedSize, serializedSize, cipherKeysMeta); + return BlobFileIndex( + currentDeltaVersion, fname, 0, serializedSize, serializedSize, logicalSize, cipherKeysMeta); } catch (Error& e) { wait(tr->onError(e)); } @@ -1077,7 +1080,7 @@ ACTOR Future writeEmptyDeltaFile(Reference bwData wait(delay(deterministicRandom()->random01())); } - return BlobFileIndex(currentDeltaVersion, "", 0, 0, 0, {}); + return BlobFileIndex(currentDeltaVersion, "", 0, 0, 0, 0, {}); } catch (Error& e) { wait(tr->onError(e)); } @@ -1186,8 +1189,9 @@ ACTOR Future writeSnapshot(Reference bwData, SERVER_KNOBS->BG_SNAPSHOT_FILE_TARGET_CHUNK_BYTES, compressFilter, cipherKeysCtx); + state size_t logicalSize = snapshot.expectedSize(); state size_t serializedSize = serialized.size(); - bwData->stats.compressionBytesRaw += snapshot.expectedSize(); + bwData->stats.compressionBytesRaw += logicalSize; bwData->stats.compressionBytesFinal += serializedSize; // free snapshot to reduce memory @@ -1238,7 +1242,7 @@ ACTOR Future writeSnapshot(Reference bwData, Key snapshotFileKey = blobGranuleFileKeyFor(granuleID, version, 'S'); // TODO change once we support file multiplexing Key snapshotFileValue = - blobGranuleFileValueFor(fname, 0, serializedSize, serializedSize, cipherKeysMeta); + blobGranuleFileValueFor(fname, 0, serializedSize, serializedSize, logicalSize, cipherKeysMeta); tr->set(snapshotFileKey, snapshotFileValue); // create granule history at version if this is a new granule with the initial dump from FDB if (initialSnapshot) { @@ -1295,7 +1299,7 @@ ACTOR Future writeSnapshot(Reference bwData, } // FIXME: change when we implement multiplexing - return BlobFileIndex(version, fname, 0, serializedSize, serializedSize, cipherKeysMeta); + return BlobFileIndex(version, fname, 0, serializedSize, serializedSize, logicalSize, cipherKeysMeta); } ACTOR Future dumpInitialSnapshotFromFDB(Reference bwData, @@ -2035,11 +2039,10 @@ Version doGranuleRollback(Reference metadata, metadata->bufferedDeltaVersion = cfRollbackVersion; // calculate number of bytes in durable delta files after last snapshot - // FIXME: this assumes delta file serialized size ~= logical size, which is false with compression for (int i = metadata->files.deltaFiles.size() - 1; i >= 0 && metadata->files.deltaFiles[i].version > metadata->pendingSnapshotVersion; i--) { - metadata->bytesInNewDeltaFiles += metadata->files.deltaFiles[i].length; + metadata->bytesInNewDeltaFiles += metadata->files.deltaFiles[i].logicalSize; } // Track that this rollback happened, since we have to re-read mutations up to the rollback @@ -2375,9 +2378,7 @@ ACTOR Future blobGranuleUpdateFiles(Reference bwData, Version snapshotVersion = files.snapshotFiles.back().version; for (int i = files.deltaFiles.size() - 1; i >= 0; i--) { if (files.deltaFiles[i].version > snapshotVersion) { - // FIXME: this assumes delta file serialized size ~= logical size, which is false with - // compression - metadata->bytesInNewDeltaFiles += files.deltaFiles[i].length; + metadata->bytesInNewDeltaFiles += files.deltaFiles[i].logicalSize; } } } diff --git a/fdbserver/include/fdbserver/BlobGranuleServerCommon.actor.h b/fdbserver/include/fdbserver/BlobGranuleServerCommon.actor.h index c22e8edc7c..d181578332 100644 --- a/fdbserver/include/fdbserver/BlobGranuleServerCommon.actor.h +++ b/fdbserver/include/fdbserver/BlobGranuleServerCommon.actor.h @@ -45,21 +45,29 @@ struct BlobFileIndex { int64_t offset; int64_t length; int64_t fullFileLength; + int64_t logicalSize; Optional cipherKeysMeta; BlobFileIndex() {} - BlobFileIndex(Version version, std::string filename, int64_t offset, int64_t length, int64_t fullFileLength) - : version(version), filename(filename), offset(offset), length(length), fullFileLength(fullFileLength) {} - BlobFileIndex(Version version, std::string filename, int64_t offset, int64_t length, int64_t fullFileLength, + int64_t logicalSize) + : version(version), filename(filename), offset(offset), length(length), fullFileLength(fullFileLength), + logicalSize(logicalSize) {} + + BlobFileIndex(Version version, + std::string filename, + int64_t offset, + int64_t length, + int64_t fullFileLength, + int64_t logicalSize, Optional ciphKeysMeta) : version(version), filename(filename), offset(offset), length(length), fullFileLength(fullFileLength), - cipherKeysMeta(ciphKeysMeta) {} + logicalSize(logicalSize), cipherKeysMeta(ciphKeysMeta) {} // compare on version bool operator<(const BlobFileIndex& r) const { return version < r.version; } diff --git a/flow/ProtocolVersion.h.cmake b/flow/ProtocolVersion.h.cmake index e6642b2fa9..2f4f3fce2f 100644 --- a/flow/ProtocolVersion.h.cmake +++ b/flow/ProtocolVersion.h.cmake @@ -174,6 +174,7 @@ public: // introduced features PROTOCOL_VERSION_FEATURE(@FDB_PV_BLOB_GRANULE_FILE@, BlobGranuleFile); PROTOCOL_VERSION_FEATURE(@FDB_ENCRYPTED_SNAPSHOT_BACKUP_FILE@, EncryptedSnapshotBackupFile); PROTOCOL_VERSION_FEATURE(@FDB_PV_CLUSTER_ID_SPECIAL_KEY@, ClusterIdSpecialKey); + PROTOCOL_VERSION_FEATURE(@FDB_PV_BLOB_GRANULE_FILE_LOGICAL_SIZE@, BlobGranuleFileLogicalSize); }; template <> diff --git a/flow/ProtocolVersions.cmake b/flow/ProtocolVersions.cmake index 9ef5b3ff81..4091edbf49 100644 --- a/flow/ProtocolVersions.cmake +++ b/flow/ProtocolVersions.cmake @@ -90,3 +90,4 @@ set(FDB_PV_SHARD_ENCODE_LOCATION_METADATA "0x0FDB00B072000000LL") set(FDB_PV_BLOB_GRANULE_FILE "0x0FDB00B072000000LL") set(FDB_ENCRYPTED_SNAPSHOT_BACKUP_FILE "0x0FDB00B072000000LL") set(FDB_PV_CLUSTER_ID_SPECIAL_KEY "0x0FDB00B072000000LL") +set(FDB_PV_BLOB_GRANULE_FILE_LOGICAL_SIZE "0x0FDB00B072000000LL") diff --git a/tests/restarting/from_7.3.0/BlobGranuleRestartCycle-1.toml b/tests/restarting/from_7.3.0/BlobGranuleRestartCycle-1.toml index 0b26a83469..ce915ae960 100644 --- a/tests/restarting/from_7.3.0/BlobGranuleRestartCycle-1.toml +++ b/tests/restarting/from_7.3.0/BlobGranuleRestartCycle-1.toml @@ -9,6 +9,9 @@ injectSSDelay = true # FIXME: re-enable rocks at some point storageEngineExcludeTypes = [4, 5] +[[knobs]] +enable_blob_granule_file_logical_size=false + [[test]] testTitle = 'BlobGranuleRestartCycle' clearAfterTest=false diff --git a/tests/restarting/from_7.3.0/BlobGranuleRestartLarge-1.toml b/tests/restarting/from_7.3.0/BlobGranuleRestartLarge-1.toml index faeaa96b5c..7d62d02f1d 100644 --- a/tests/restarting/from_7.3.0/BlobGranuleRestartLarge-1.toml +++ b/tests/restarting/from_7.3.0/BlobGranuleRestartLarge-1.toml @@ -9,6 +9,9 @@ injectSSDelay = true # FIXME: re-enable rocks at some point storageEngineExcludeTypes = [4, 5] +[[knobs]] +enable_blob_granule_file_logical_size=false + [[test]] testTitle = 'BlobGranuleRestartLarge' clearAfterTest=false