Merge pull request #9703 from sfc-gh-jslocum/bg_file_logical_size

adding blob granule logical size
This commit is contained in:
Evan Tschannen 2023-03-15 09:59:57 -07:00 committed by GitHub
commit 6c1d02a14f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
12 changed files with 62 additions and 24 deletions

View File

@ -287,6 +287,7 @@ void ClientKnobs::initialize(Randomize randomize) {
init( BG_MAX_GRANULE_PARALLELISM, 10 ); init( BG_MAX_GRANULE_PARALLELISM, 10 );
init( BG_TOO_MANY_GRANULES, 20000 ); init( BG_TOO_MANY_GRANULES, 20000 );
init( BLOB_METADATA_REFRESH_INTERVAL, 3600 ); if ( randomize && BUGGIFY ) { BLOB_METADATA_REFRESH_INTERVAL = deterministicRandom()->randomInt(5, 120); } init( BLOB_METADATA_REFRESH_INTERVAL, 3600 ); if ( randomize && BUGGIFY ) { BLOB_METADATA_REFRESH_INTERVAL = deterministicRandom()->randomInt(5, 120); }
init( ENABLE_BLOB_GRANULE_FILE_LOGICAL_SIZE, false ); if ( randomize && BUGGIFY ) { ENABLE_BLOB_GRANULE_FILE_LOGICAL_SIZE = true; }
init( CHANGE_QUORUM_BAD_STATE_RETRY_TIMES, 3 ); init( CHANGE_QUORUM_BAD_STATE_RETRY_TIMES, 3 );
init( CHANGE_QUORUM_BAD_STATE_RETRY_DELAY, 2.0 ); init( CHANGE_QUORUM_BAD_STATE_RETRY_DELAY, 2.0 );

View File

@ -1424,22 +1424,30 @@ const Value blobGranuleFileValueFor(StringRef const& filename,
int64_t offset, int64_t offset,
int64_t length, int64_t length,
int64_t fullFileLength, int64_t fullFileLength,
int64_t logicalSize,
Optional<BlobGranuleCipherKeysMeta> cipherKeysMeta) { Optional<BlobGranuleCipherKeysMeta> cipherKeysMeta) {
BinaryWriter wr(IncludeVersion(ProtocolVersion::withBlobGranule())); auto protocolVersion = CLIENT_KNOBS->ENABLE_BLOB_GRANULE_FILE_LOGICAL_SIZE
? ProtocolVersion::withBlobGranuleFileLogicalSize()
: ProtocolVersion::withBlobGranule();
BinaryWriter wr(IncludeVersion(protocolVersion));
wr << filename; wr << filename;
wr << offset; wr << offset;
wr << length; wr << length;
wr << fullFileLength; wr << fullFileLength;
wr << cipherKeysMeta; wr << cipherKeysMeta;
if (CLIENT_KNOBS->ENABLE_BLOB_GRANULE_FILE_LOGICAL_SIZE) {
wr << logicalSize;
}
return wr.toValue(); return wr.toValue();
} }
std::tuple<Standalone<StringRef>, int64_t, int64_t, int64_t, Optional<BlobGranuleCipherKeysMeta>> std::tuple<Standalone<StringRef>, int64_t, int64_t, int64_t, int64_t, Optional<BlobGranuleCipherKeysMeta>>
decodeBlobGranuleFileValue(ValueRef const& value) { decodeBlobGranuleFileValue(ValueRef const& value) {
StringRef filename; StringRef filename;
int64_t offset; int64_t offset;
int64_t length; int64_t length;
int64_t fullFileLength; int64_t fullFileLength;
int64_t logicalSize;
Optional<BlobGranuleCipherKeysMeta> cipherKeysMeta; Optional<BlobGranuleCipherKeysMeta> cipherKeysMeta;
BinaryReader reader(value, IncludeVersion()); BinaryReader reader(value, IncludeVersion());
@ -1448,7 +1456,13 @@ decodeBlobGranuleFileValue(ValueRef const& value) {
reader >> length; reader >> length;
reader >> fullFileLength; reader >> fullFileLength;
reader >> cipherKeysMeta; reader >> cipherKeysMeta;
return std::tuple(filename, offset, length, fullFileLength, cipherKeysMeta); if (reader.protocolVersion().hasBlobGranuleFileLogicalSize()) {
reader >> logicalSize;
} else {
// fall back to estimating logical size as physical size
logicalSize = length;
}
return std::tuple(filename, offset, length, fullFileLength, logicalSize, cipherKeysMeta);
} }
const Value blobGranulePurgeValueFor(Version version, KeyRange range, bool force) { const Value blobGranulePurgeValueFor(Version version, KeyRange range, bool force) {

View File

@ -278,6 +278,7 @@ public:
int BG_MAX_GRANULE_PARALLELISM; int BG_MAX_GRANULE_PARALLELISM;
int BG_TOO_MANY_GRANULES; int BG_TOO_MANY_GRANULES;
int64_t BLOB_METADATA_REFRESH_INTERVAL; int64_t BLOB_METADATA_REFRESH_INTERVAL;
bool ENABLE_BLOB_GRANULE_FILE_LOGICAL_SIZE;
// The coordinator key/value in storage server might be inconsistent to the value stored in the cluster file. // The coordinator key/value in storage server might be inconsistent to the value stored in the cluster file.
// This might happen when a recovery is happening together with a cluster controller coordinator key change. // This might happen when a recovery is happening together with a cluster controller coordinator key change.

View File

@ -671,8 +671,9 @@ const Value blobGranuleFileValueFor(
int64_t offset, int64_t offset,
int64_t length, int64_t length,
int64_t fullFileLength, int64_t fullFileLength,
int64_t logicalSize,
Optional<BlobGranuleCipherKeysMeta> cipherKeysMeta = Optional<BlobGranuleCipherKeysMeta>()); Optional<BlobGranuleCipherKeysMeta> cipherKeysMeta = Optional<BlobGranuleCipherKeysMeta>());
std::tuple<Standalone<StringRef>, int64_t, int64_t, int64_t, Optional<BlobGranuleCipherKeysMeta>> std::tuple<Standalone<StringRef>, int64_t, int64_t, int64_t, int64_t, Optional<BlobGranuleCipherKeysMeta>>
decodeBlobGranuleFileValue(ValueRef const& value); decodeBlobGranuleFileValue(ValueRef const& value);
const Value blobGranulePurgeValueFor(Version version, KeyRange range, bool force); const Value blobGranulePurgeValueFor(Version version, KeyRange range, bool force);

View File

@ -74,14 +74,17 @@ ACTOR Future<Void> readGranuleFiles(Transaction* tr, Key* startKey, Key endKey,
int64_t offset; int64_t offset;
int64_t length; int64_t length;
int64_t fullFileLength; int64_t fullFileLength;
int64_t logicalSize;
Optional<BlobGranuleCipherKeysMeta> cipherKeysMeta; Optional<BlobGranuleCipherKeysMeta> cipherKeysMeta;
std::tie(gid, version, fileType) = decodeBlobGranuleFileKey(it.key); std::tie(gid, version, fileType) = decodeBlobGranuleFileKey(it.key);
ASSERT(gid == granuleID); ASSERT(gid == granuleID);
std::tie(filename, offset, length, fullFileLength, cipherKeysMeta) = decodeBlobGranuleFileValue(it.value); std::tie(filename, offset, length, fullFileLength, logicalSize, cipherKeysMeta) =
decodeBlobGranuleFileValue(it.value);
BlobFileIndex idx(version, filename.toString(), offset, length, fullFileLength, cipherKeysMeta); BlobFileIndex idx(
version, filename.toString(), offset, length, fullFileLength, logicalSize, cipherKeysMeta);
if (fileType == 'S') { if (fileType == 'S') {
ASSERT(files->snapshotFiles.empty() || files->snapshotFiles.back().version < idx.version); ASSERT(files->snapshotFiles.empty() || files->snapshotFiles.back().version < idx.version);
files->snapshotFiles.push_back(idx); files->snapshotFiles.push_back(idx);
@ -250,7 +253,7 @@ static std::string makeTestFileName(Version v) {
} }
static BlobFileIndex makeTestFile(Version v, int64_t len) { static BlobFileIndex makeTestFile(Version v, int64_t len) {
return BlobFileIndex(v, makeTestFileName(v), 0, len, len); return BlobFileIndex(v, makeTestFileName(v), 0, len, len, len);
} }
static void checkFile(int expectedVersion, const BlobFilePointerRef& actualFile) { static void checkFile(int expectedVersion, const BlobFilePointerRef& actualFile) {

View File

@ -767,10 +767,11 @@ private:
int64_t offset; int64_t offset;
int64_t length; int64_t length;
int64_t fullFileLength; int64_t fullFileLength;
int64_t logicalSize;
Optional<BlobGranuleCipherKeysMeta> cipherKeysMeta; Optional<BlobGranuleCipherKeysMeta> cipherKeysMeta;
std::tie(gid, version, fileType) = decodeBlobGranuleFileKey(row.key); std::tie(gid, version, fileType) = decodeBlobGranuleFileKey(row.key);
std::tie(filename, offset, length, fullFileLength, cipherKeysMeta) = std::tie(filename, offset, length, fullFileLength, logicalSize, cipherKeysMeta) =
decodeBlobGranuleFileValue(row.value); decodeBlobGranuleFileValue(row.value);
GranuleFileVersion vs = { version, fileType, filename.toString(), length }; GranuleFileVersion vs = { version, fileType, filename.toString(), length };
files.push_back(vs); files.push_back(vs);

View File

@ -883,8 +883,9 @@ ACTOR Future<BlobFileIndex> writeDeltaFile(Reference<BlobWorkerData> bwData,
SERVER_KNOBS->BG_DELTA_FILE_TARGET_CHUNK_BYTES, SERVER_KNOBS->BG_DELTA_FILE_TARGET_CHUNK_BYTES,
compressFilter, compressFilter,
cipherKeysCtx); cipherKeysCtx);
state size_t logicalSize = deltasToWrite.expectedSize();
state size_t serializedSize = serialized.size(); state size_t serializedSize = serialized.size();
bwData->stats.compressionBytesRaw += deltasToWrite.expectedSize(); bwData->stats.compressionBytesRaw += logicalSize;
bwData->stats.compressionBytesFinal += serializedSize; bwData->stats.compressionBytesFinal += serializedSize;
// Free up deltasToWrite here to reduce memory // Free up deltasToWrite here to reduce memory
@ -931,7 +932,8 @@ ACTOR Future<BlobFileIndex> writeDeltaFile(Reference<BlobWorkerData> bwData,
Key dfKey = blobGranuleFileKeyFor(granuleID, currentDeltaVersion, 'D'); Key dfKey = blobGranuleFileKeyFor(granuleID, currentDeltaVersion, 'D');
// TODO change once we support file multiplexing // TODO change once we support file multiplexing
Value dfValue = blobGranuleFileValueFor(fname, 0, serializedSize, serializedSize, cipherKeysMeta); Value dfValue =
blobGranuleFileValueFor(fname, 0, serializedSize, serializedSize, logicalSize, cipherKeysMeta);
tr->set(dfKey, dfValue); tr->set(dfKey, dfValue);
if (oldGranuleComplete.present()) { if (oldGranuleComplete.present()) {
@ -975,7 +977,8 @@ ACTOR Future<BlobFileIndex> writeDeltaFile(Reference<BlobWorkerData> bwData,
bwData->stats.deltaUpdateSample.addMeasurement(duration); bwData->stats.deltaUpdateSample.addMeasurement(duration);
// FIXME: change when we implement multiplexing // FIXME: change when we implement multiplexing
return BlobFileIndex(currentDeltaVersion, fname, 0, serializedSize, serializedSize, cipherKeysMeta); return BlobFileIndex(
currentDeltaVersion, fname, 0, serializedSize, serializedSize, logicalSize, cipherKeysMeta);
} catch (Error& e) { } catch (Error& e) {
wait(tr->onError(e)); wait(tr->onError(e));
} }
@ -1078,7 +1081,7 @@ ACTOR Future<BlobFileIndex> writeEmptyDeltaFile(Reference<BlobWorkerData> bwData
wait(delay(deterministicRandom()->random01())); wait(delay(deterministicRandom()->random01()));
} }
return BlobFileIndex(currentDeltaVersion, "", 0, 0, 0, {}); return BlobFileIndex(currentDeltaVersion, "", 0, 0, 0, 0, {});
} catch (Error& e) { } catch (Error& e) {
wait(tr->onError(e)); wait(tr->onError(e));
} }
@ -1187,8 +1190,9 @@ ACTOR Future<BlobFileIndex> writeSnapshot(Reference<BlobWorkerData> bwData,
SERVER_KNOBS->BG_SNAPSHOT_FILE_TARGET_CHUNK_BYTES, SERVER_KNOBS->BG_SNAPSHOT_FILE_TARGET_CHUNK_BYTES,
compressFilter, compressFilter,
cipherKeysCtx); cipherKeysCtx);
state size_t logicalSize = snapshot.expectedSize();
state size_t serializedSize = serialized.size(); state size_t serializedSize = serialized.size();
bwData->stats.compressionBytesRaw += snapshot.expectedSize(); bwData->stats.compressionBytesRaw += logicalSize;
bwData->stats.compressionBytesFinal += serializedSize; bwData->stats.compressionBytesFinal += serializedSize;
// free snapshot to reduce memory // free snapshot to reduce memory
@ -1239,7 +1243,7 @@ ACTOR Future<BlobFileIndex> writeSnapshot(Reference<BlobWorkerData> bwData,
Key snapshotFileKey = blobGranuleFileKeyFor(granuleID, version, 'S'); Key snapshotFileKey = blobGranuleFileKeyFor(granuleID, version, 'S');
// TODO change once we support file multiplexing // TODO change once we support file multiplexing
Key snapshotFileValue = Key snapshotFileValue =
blobGranuleFileValueFor(fname, 0, serializedSize, serializedSize, cipherKeysMeta); blobGranuleFileValueFor(fname, 0, serializedSize, serializedSize, logicalSize, cipherKeysMeta);
tr->set(snapshotFileKey, snapshotFileValue); tr->set(snapshotFileKey, snapshotFileValue);
// create granule history at version if this is a new granule with the initial dump from FDB // create granule history at version if this is a new granule with the initial dump from FDB
if (initialSnapshot) { if (initialSnapshot) {
@ -1296,7 +1300,7 @@ ACTOR Future<BlobFileIndex> writeSnapshot(Reference<BlobWorkerData> bwData,
} }
// FIXME: change when we implement multiplexing // FIXME: change when we implement multiplexing
return BlobFileIndex(version, fname, 0, serializedSize, serializedSize, cipherKeysMeta); return BlobFileIndex(version, fname, 0, serializedSize, serializedSize, logicalSize, cipherKeysMeta);
} }
ACTOR Future<BlobFileIndex> dumpInitialSnapshotFromFDB(Reference<BlobWorkerData> bwData, ACTOR Future<BlobFileIndex> dumpInitialSnapshotFromFDB(Reference<BlobWorkerData> bwData,
@ -2036,11 +2040,10 @@ Version doGranuleRollback(Reference<GranuleMetadata> metadata,
metadata->bufferedDeltaVersion = cfRollbackVersion; metadata->bufferedDeltaVersion = cfRollbackVersion;
// calculate number of bytes in durable delta files after last snapshot // calculate number of bytes in durable delta files after last snapshot
// FIXME: this assumes delta file serialized size ~= logical size, which is false with compression
for (int i = metadata->files.deltaFiles.size() - 1; for (int i = metadata->files.deltaFiles.size() - 1;
i >= 0 && metadata->files.deltaFiles[i].version > metadata->pendingSnapshotVersion; i >= 0 && metadata->files.deltaFiles[i].version > metadata->pendingSnapshotVersion;
i--) { i--) {
metadata->bytesInNewDeltaFiles += metadata->files.deltaFiles[i].length; metadata->bytesInNewDeltaFiles += metadata->files.deltaFiles[i].logicalSize;
} }
// Track that this rollback happened, since we have to re-read mutations up to the rollback // Track that this rollback happened, since we have to re-read mutations up to the rollback
@ -2378,9 +2381,7 @@ ACTOR Future<Void> blobGranuleUpdateFiles(Reference<BlobWorkerData> bwData,
Version snapshotVersion = files.snapshotFiles.back().version; Version snapshotVersion = files.snapshotFiles.back().version;
for (int i = files.deltaFiles.size() - 1; i >= 0; i--) { for (int i = files.deltaFiles.size() - 1; i >= 0; i--) {
if (files.deltaFiles[i].version > snapshotVersion) { if (files.deltaFiles[i].version > snapshotVersion) {
// FIXME: this assumes delta file serialized size ~= logical size, which is false with metadata->bytesInNewDeltaFiles += files.deltaFiles[i].logicalSize;
// compression
metadata->bytesInNewDeltaFiles += files.deltaFiles[i].length;
} }
} }
} }

View File

@ -45,21 +45,29 @@ struct BlobFileIndex {
int64_t offset; int64_t offset;
int64_t length; int64_t length;
int64_t fullFileLength; int64_t fullFileLength;
int64_t logicalSize;
Optional<BlobGranuleCipherKeysMeta> cipherKeysMeta; Optional<BlobGranuleCipherKeysMeta> cipherKeysMeta;
BlobFileIndex() {} BlobFileIndex() {}
BlobFileIndex(Version version, std::string filename, int64_t offset, int64_t length, int64_t fullFileLength)
: version(version), filename(filename), offset(offset), length(length), fullFileLength(fullFileLength) {}
BlobFileIndex(Version version, BlobFileIndex(Version version,
std::string filename, std::string filename,
int64_t offset, int64_t offset,
int64_t length, int64_t length,
int64_t fullFileLength, int64_t fullFileLength,
int64_t logicalSize)
: version(version), filename(filename), offset(offset), length(length), fullFileLength(fullFileLength),
logicalSize(logicalSize) {}
BlobFileIndex(Version version,
std::string filename,
int64_t offset,
int64_t length,
int64_t fullFileLength,
int64_t logicalSize,
Optional<BlobGranuleCipherKeysMeta> ciphKeysMeta) Optional<BlobGranuleCipherKeysMeta> ciphKeysMeta)
: version(version), filename(filename), offset(offset), length(length), fullFileLength(fullFileLength), : version(version), filename(filename), offset(offset), length(length), fullFileLength(fullFileLength),
cipherKeysMeta(ciphKeysMeta) {} logicalSize(logicalSize), cipherKeysMeta(ciphKeysMeta) {}
// compare on version // compare on version
bool operator<(const BlobFileIndex& r) const { return version < r.version; } bool operator<(const BlobFileIndex& r) const { return version < r.version; }

View File

@ -174,6 +174,7 @@ public: // introduced features
PROTOCOL_VERSION_FEATURE(@FDB_PV_BLOB_GRANULE_FILE@, BlobGranuleFile); PROTOCOL_VERSION_FEATURE(@FDB_PV_BLOB_GRANULE_FILE@, BlobGranuleFile);
PROTOCOL_VERSION_FEATURE(@FDB_ENCRYPTED_SNAPSHOT_BACKUP_FILE@, EncryptedSnapshotBackupFile); PROTOCOL_VERSION_FEATURE(@FDB_ENCRYPTED_SNAPSHOT_BACKUP_FILE@, EncryptedSnapshotBackupFile);
PROTOCOL_VERSION_FEATURE(@FDB_PV_CLUSTER_ID_SPECIAL_KEY@, ClusterIdSpecialKey); PROTOCOL_VERSION_FEATURE(@FDB_PV_CLUSTER_ID_SPECIAL_KEY@, ClusterIdSpecialKey);
PROTOCOL_VERSION_FEATURE(@FDB_PV_BLOB_GRANULE_FILE_LOGICAL_SIZE@, BlobGranuleFileLogicalSize);
}; };
template <> template <>

View File

@ -90,3 +90,4 @@ set(FDB_PV_SHARD_ENCODE_LOCATION_METADATA "0x0FDB00B072000000LL")
set(FDB_PV_BLOB_GRANULE_FILE "0x0FDB00B072000000LL") set(FDB_PV_BLOB_GRANULE_FILE "0x0FDB00B072000000LL")
set(FDB_ENCRYPTED_SNAPSHOT_BACKUP_FILE "0x0FDB00B072000000LL") set(FDB_ENCRYPTED_SNAPSHOT_BACKUP_FILE "0x0FDB00B072000000LL")
set(FDB_PV_CLUSTER_ID_SPECIAL_KEY "0x0FDB00B072000000LL") set(FDB_PV_CLUSTER_ID_SPECIAL_KEY "0x0FDB00B072000000LL")
set(FDB_PV_BLOB_GRANULE_FILE_LOGICAL_SIZE "0x0FDB00B072000000LL")

View File

@ -9,6 +9,9 @@ injectSSDelay = true
# FIXME: re-enable rocks at some point # FIXME: re-enable rocks at some point
storageEngineExcludeTypes = [4, 5] storageEngineExcludeTypes = [4, 5]
[[knobs]]
enable_blob_granule_file_logical_size=false
[[test]] [[test]]
testTitle = 'BlobGranuleRestartCycle' testTitle = 'BlobGranuleRestartCycle'
clearAfterTest=false clearAfterTest=false

View File

@ -9,6 +9,9 @@ injectSSDelay = true
# FIXME: re-enable rocks at some point # FIXME: re-enable rocks at some point
storageEngineExcludeTypes = [4, 5] storageEngineExcludeTypes = [4, 5]
[[knobs]]
enable_blob_granule_file_logical_size=false
[[test]] [[test]]
testTitle = 'BlobGranuleRestartLarge' testTitle = 'BlobGranuleRestartLarge'
clearAfterTest=false clearAfterTest=false