adding blob granule logical size
This commit is contained in:
parent
2890b9943a
commit
a5b4212990
|
@ -287,6 +287,7 @@ void ClientKnobs::initialize(Randomize randomize) {
|
|||
init( BG_MAX_GRANULE_PARALLELISM, 10 );
|
||||
init( BG_TOO_MANY_GRANULES, 20000 );
|
||||
init( BLOB_METADATA_REFRESH_INTERVAL, 3600 ); if ( randomize && BUGGIFY ) { BLOB_METADATA_REFRESH_INTERVAL = deterministicRandom()->randomInt(5, 120); }
|
||||
init( ENABLE_BLOB_GRANULE_FILE_LOGICAL_SIZE, false ); if ( randomize && BUGGIFY ) { ENABLE_BLOB_GRANULE_FILE_LOGICAL_SIZE = true; }
|
||||
|
||||
init( CHANGE_QUORUM_BAD_STATE_RETRY_TIMES, 3 );
|
||||
init( CHANGE_QUORUM_BAD_STATE_RETRY_DELAY, 2.0 );
|
||||
|
|
|
@ -1424,22 +1424,30 @@ const Value blobGranuleFileValueFor(StringRef const& filename,
|
|||
int64_t offset,
|
||||
int64_t length,
|
||||
int64_t fullFileLength,
|
||||
int64_t logicalSize,
|
||||
Optional<BlobGranuleCipherKeysMeta> cipherKeysMeta) {
|
||||
BinaryWriter wr(IncludeVersion(ProtocolVersion::withBlobGranule()));
|
||||
auto protocolVersion = CLIENT_KNOBS->ENABLE_BLOB_GRANULE_FILE_LOGICAL_SIZE
|
||||
? ProtocolVersion::withBlobGranuleFileLogicalSize()
|
||||
: ProtocolVersion::withBlobGranule();
|
||||
BinaryWriter wr(IncludeVersion(protocolVersion));
|
||||
wr << filename;
|
||||
wr << offset;
|
||||
wr << length;
|
||||
wr << fullFileLength;
|
||||
wr << cipherKeysMeta;
|
||||
if (CLIENT_KNOBS->ENABLE_BLOB_GRANULE_FILE_LOGICAL_SIZE) {
|
||||
wr << logicalSize;
|
||||
}
|
||||
return wr.toValue();
|
||||
}
|
||||
|
||||
std::tuple<Standalone<StringRef>, int64_t, int64_t, int64_t, Optional<BlobGranuleCipherKeysMeta>>
|
||||
std::tuple<Standalone<StringRef>, int64_t, int64_t, int64_t, int64_t, Optional<BlobGranuleCipherKeysMeta>>
|
||||
decodeBlobGranuleFileValue(ValueRef const& value) {
|
||||
StringRef filename;
|
||||
int64_t offset;
|
||||
int64_t length;
|
||||
int64_t fullFileLength;
|
||||
int64_t logicalSize;
|
||||
Optional<BlobGranuleCipherKeysMeta> cipherKeysMeta;
|
||||
|
||||
BinaryReader reader(value, IncludeVersion());
|
||||
|
@ -1448,7 +1456,13 @@ decodeBlobGranuleFileValue(ValueRef const& value) {
|
|||
reader >> length;
|
||||
reader >> fullFileLength;
|
||||
reader >> cipherKeysMeta;
|
||||
return std::tuple(filename, offset, length, fullFileLength, cipherKeysMeta);
|
||||
if (reader.protocolVersion().hasBlobGranuleFileLogicalSize()) {
|
||||
reader >> logicalSize;
|
||||
} else {
|
||||
// fall back to estimating logical size as physical size
|
||||
logicalSize = length;
|
||||
}
|
||||
return std::tuple(filename, offset, length, fullFileLength, logicalSize, cipherKeysMeta);
|
||||
}
|
||||
|
||||
const Value blobGranulePurgeValueFor(Version version, KeyRange range, bool force) {
|
||||
|
|
|
@ -278,6 +278,7 @@ public:
|
|||
int BG_MAX_GRANULE_PARALLELISM;
|
||||
int BG_TOO_MANY_GRANULES;
|
||||
int64_t BLOB_METADATA_REFRESH_INTERVAL;
|
||||
bool ENABLE_BLOB_GRANULE_FILE_LOGICAL_SIZE;
|
||||
|
||||
// The coordinator key/value in storage server might be inconsistent to the value stored in the cluster file.
|
||||
// This might happen when a recovery is happening together with a cluster controller coordinator key change.
|
||||
|
|
|
@ -671,8 +671,9 @@ const Value blobGranuleFileValueFor(
|
|||
int64_t offset,
|
||||
int64_t length,
|
||||
int64_t fullFileLength,
|
||||
int64_t logicalSize,
|
||||
Optional<BlobGranuleCipherKeysMeta> cipherKeysMeta = Optional<BlobGranuleCipherKeysMeta>());
|
||||
std::tuple<Standalone<StringRef>, int64_t, int64_t, int64_t, Optional<BlobGranuleCipherKeysMeta>>
|
||||
std::tuple<Standalone<StringRef>, int64_t, int64_t, int64_t, int64_t, Optional<BlobGranuleCipherKeysMeta>>
|
||||
decodeBlobGranuleFileValue(ValueRef const& value);
|
||||
|
||||
const Value blobGranulePurgeValueFor(Version version, KeyRange range, bool force);
|
||||
|
|
|
@ -74,14 +74,17 @@ ACTOR Future<Void> readGranuleFiles(Transaction* tr, Key* startKey, Key endKey,
|
|||
int64_t offset;
|
||||
int64_t length;
|
||||
int64_t fullFileLength;
|
||||
int64_t logicalSize;
|
||||
Optional<BlobGranuleCipherKeysMeta> cipherKeysMeta;
|
||||
|
||||
std::tie(gid, version, fileType) = decodeBlobGranuleFileKey(it.key);
|
||||
ASSERT(gid == granuleID);
|
||||
|
||||
std::tie(filename, offset, length, fullFileLength, cipherKeysMeta) = decodeBlobGranuleFileValue(it.value);
|
||||
std::tie(filename, offset, length, fullFileLength, logicalSize, cipherKeysMeta) =
|
||||
decodeBlobGranuleFileValue(it.value);
|
||||
|
||||
BlobFileIndex idx(version, filename.toString(), offset, length, fullFileLength, cipherKeysMeta);
|
||||
BlobFileIndex idx(
|
||||
version, filename.toString(), offset, length, fullFileLength, logicalSize, cipherKeysMeta);
|
||||
if (fileType == 'S') {
|
||||
ASSERT(files->snapshotFiles.empty() || files->snapshotFiles.back().version < idx.version);
|
||||
files->snapshotFiles.push_back(idx);
|
||||
|
@ -250,7 +253,7 @@ static std::string makeTestFileName(Version v) {
|
|||
}
|
||||
|
||||
static BlobFileIndex makeTestFile(Version v, int64_t len) {
|
||||
return BlobFileIndex(v, makeTestFileName(v), 0, len, len);
|
||||
return BlobFileIndex(v, makeTestFileName(v), 0, len, len, len);
|
||||
}
|
||||
|
||||
static void checkFile(int expectedVersion, const BlobFilePointerRef& actualFile) {
|
||||
|
|
|
@ -768,10 +768,11 @@ private:
|
|||
int64_t offset;
|
||||
int64_t length;
|
||||
int64_t fullFileLength;
|
||||
int64_t logicalSize;
|
||||
Optional<BlobGranuleCipherKeysMeta> cipherKeysMeta;
|
||||
|
||||
std::tie(gid, version, fileType) = decodeBlobGranuleFileKey(row.key);
|
||||
std::tie(filename, offset, length, fullFileLength, cipherKeysMeta) =
|
||||
std::tie(filename, offset, length, fullFileLength, logicalSize, cipherKeysMeta) =
|
||||
decodeBlobGranuleFileValue(row.value);
|
||||
GranuleFileVersion vs = { version, fileType, filename.toString(), length };
|
||||
files.push_back(vs);
|
||||
|
|
|
@ -882,8 +882,9 @@ ACTOR Future<BlobFileIndex> writeDeltaFile(Reference<BlobWorkerData> bwData,
|
|||
SERVER_KNOBS->BG_DELTA_FILE_TARGET_CHUNK_BYTES,
|
||||
compressFilter,
|
||||
cipherKeysCtx);
|
||||
state size_t logicalSize = deltasToWrite.expectedSize();
|
||||
state size_t serializedSize = serialized.size();
|
||||
bwData->stats.compressionBytesRaw += deltasToWrite.expectedSize();
|
||||
bwData->stats.compressionBytesRaw += logicalSize;
|
||||
bwData->stats.compressionBytesFinal += serializedSize;
|
||||
|
||||
// Free up deltasToWrite here to reduce memory
|
||||
|
@ -930,7 +931,8 @@ ACTOR Future<BlobFileIndex> writeDeltaFile(Reference<BlobWorkerData> bwData,
|
|||
|
||||
Key dfKey = blobGranuleFileKeyFor(granuleID, currentDeltaVersion, 'D');
|
||||
// TODO change once we support file multiplexing
|
||||
Value dfValue = blobGranuleFileValueFor(fname, 0, serializedSize, serializedSize, cipherKeysMeta);
|
||||
Value dfValue =
|
||||
blobGranuleFileValueFor(fname, 0, serializedSize, serializedSize, logicalSize, cipherKeysMeta);
|
||||
tr->set(dfKey, dfValue);
|
||||
|
||||
if (oldGranuleComplete.present()) {
|
||||
|
@ -974,7 +976,8 @@ ACTOR Future<BlobFileIndex> writeDeltaFile(Reference<BlobWorkerData> bwData,
|
|||
bwData->stats.deltaUpdateSample.addMeasurement(duration);
|
||||
|
||||
// FIXME: change when we implement multiplexing
|
||||
return BlobFileIndex(currentDeltaVersion, fname, 0, serializedSize, serializedSize, cipherKeysMeta);
|
||||
return BlobFileIndex(
|
||||
currentDeltaVersion, fname, 0, serializedSize, serializedSize, logicalSize, cipherKeysMeta);
|
||||
} catch (Error& e) {
|
||||
wait(tr->onError(e));
|
||||
}
|
||||
|
@ -1077,7 +1080,7 @@ ACTOR Future<BlobFileIndex> writeEmptyDeltaFile(Reference<BlobWorkerData> bwData
|
|||
wait(delay(deterministicRandom()->random01()));
|
||||
}
|
||||
|
||||
return BlobFileIndex(currentDeltaVersion, "", 0, 0, 0, {});
|
||||
return BlobFileIndex(currentDeltaVersion, "", 0, 0, 0, 0, {});
|
||||
} catch (Error& e) {
|
||||
wait(tr->onError(e));
|
||||
}
|
||||
|
@ -1186,8 +1189,9 @@ ACTOR Future<BlobFileIndex> writeSnapshot(Reference<BlobWorkerData> bwData,
|
|||
SERVER_KNOBS->BG_SNAPSHOT_FILE_TARGET_CHUNK_BYTES,
|
||||
compressFilter,
|
||||
cipherKeysCtx);
|
||||
state size_t logicalSize = snapshot.expectedSize();
|
||||
state size_t serializedSize = serialized.size();
|
||||
bwData->stats.compressionBytesRaw += snapshot.expectedSize();
|
||||
bwData->stats.compressionBytesRaw += logicalSize;
|
||||
bwData->stats.compressionBytesFinal += serializedSize;
|
||||
|
||||
// free snapshot to reduce memory
|
||||
|
@ -1238,7 +1242,7 @@ ACTOR Future<BlobFileIndex> writeSnapshot(Reference<BlobWorkerData> bwData,
|
|||
Key snapshotFileKey = blobGranuleFileKeyFor(granuleID, version, 'S');
|
||||
// TODO change once we support file multiplexing
|
||||
Key snapshotFileValue =
|
||||
blobGranuleFileValueFor(fname, 0, serializedSize, serializedSize, cipherKeysMeta);
|
||||
blobGranuleFileValueFor(fname, 0, serializedSize, serializedSize, logicalSize, cipherKeysMeta);
|
||||
tr->set(snapshotFileKey, snapshotFileValue);
|
||||
// create granule history at version if this is a new granule with the initial dump from FDB
|
||||
if (initialSnapshot) {
|
||||
|
@ -1295,7 +1299,7 @@ ACTOR Future<BlobFileIndex> writeSnapshot(Reference<BlobWorkerData> bwData,
|
|||
}
|
||||
|
||||
// FIXME: change when we implement multiplexing
|
||||
return BlobFileIndex(version, fname, 0, serializedSize, serializedSize, cipherKeysMeta);
|
||||
return BlobFileIndex(version, fname, 0, serializedSize, serializedSize, logicalSize, cipherKeysMeta);
|
||||
}
|
||||
|
||||
ACTOR Future<BlobFileIndex> dumpInitialSnapshotFromFDB(Reference<BlobWorkerData> bwData,
|
||||
|
@ -2035,11 +2039,10 @@ Version doGranuleRollback(Reference<GranuleMetadata> metadata,
|
|||
metadata->bufferedDeltaVersion = cfRollbackVersion;
|
||||
|
||||
// calculate number of bytes in durable delta files after last snapshot
|
||||
// FIXME: this assumes delta file serialized size ~= logical size, which is false with compression
|
||||
for (int i = metadata->files.deltaFiles.size() - 1;
|
||||
i >= 0 && metadata->files.deltaFiles[i].version > metadata->pendingSnapshotVersion;
|
||||
i--) {
|
||||
metadata->bytesInNewDeltaFiles += metadata->files.deltaFiles[i].length;
|
||||
metadata->bytesInNewDeltaFiles += metadata->files.deltaFiles[i].logicalSize;
|
||||
}
|
||||
|
||||
// Track that this rollback happened, since we have to re-read mutations up to the rollback
|
||||
|
@ -2375,9 +2378,7 @@ ACTOR Future<Void> blobGranuleUpdateFiles(Reference<BlobWorkerData> bwData,
|
|||
Version snapshotVersion = files.snapshotFiles.back().version;
|
||||
for (int i = files.deltaFiles.size() - 1; i >= 0; i--) {
|
||||
if (files.deltaFiles[i].version > snapshotVersion) {
|
||||
// FIXME: this assumes delta file serialized size ~= logical size, which is false with
|
||||
// compression
|
||||
metadata->bytesInNewDeltaFiles += files.deltaFiles[i].length;
|
||||
metadata->bytesInNewDeltaFiles += files.deltaFiles[i].logicalSize;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -45,21 +45,29 @@ struct BlobFileIndex {
|
|||
int64_t offset;
|
||||
int64_t length;
|
||||
int64_t fullFileLength;
|
||||
int64_t logicalSize;
|
||||
Optional<BlobGranuleCipherKeysMeta> cipherKeysMeta;
|
||||
|
||||
BlobFileIndex() {}
|
||||
|
||||
BlobFileIndex(Version version, std::string filename, int64_t offset, int64_t length, int64_t fullFileLength)
|
||||
: version(version), filename(filename), offset(offset), length(length), fullFileLength(fullFileLength) {}
|
||||
|
||||
BlobFileIndex(Version version,
|
||||
std::string filename,
|
||||
int64_t offset,
|
||||
int64_t length,
|
||||
int64_t fullFileLength,
|
||||
int64_t logicalSize)
|
||||
: version(version), filename(filename), offset(offset), length(length), fullFileLength(fullFileLength),
|
||||
logicalSize(logicalSize) {}
|
||||
|
||||
BlobFileIndex(Version version,
|
||||
std::string filename,
|
||||
int64_t offset,
|
||||
int64_t length,
|
||||
int64_t fullFileLength,
|
||||
int64_t logicalSize,
|
||||
Optional<BlobGranuleCipherKeysMeta> ciphKeysMeta)
|
||||
: version(version), filename(filename), offset(offset), length(length), fullFileLength(fullFileLength),
|
||||
cipherKeysMeta(ciphKeysMeta) {}
|
||||
logicalSize(logicalSize), cipherKeysMeta(ciphKeysMeta) {}
|
||||
|
||||
// compare on version
|
||||
bool operator<(const BlobFileIndex& r) const { return version < r.version; }
|
||||
|
|
|
@ -174,6 +174,7 @@ public: // introduced features
|
|||
PROTOCOL_VERSION_FEATURE(@FDB_PV_BLOB_GRANULE_FILE@, BlobGranuleFile);
|
||||
PROTOCOL_VERSION_FEATURE(@FDB_ENCRYPTED_SNAPSHOT_BACKUP_FILE@, EncryptedSnapshotBackupFile);
|
||||
PROTOCOL_VERSION_FEATURE(@FDB_PV_CLUSTER_ID_SPECIAL_KEY@, ClusterIdSpecialKey);
|
||||
PROTOCOL_VERSION_FEATURE(@FDB_PV_BLOB_GRANULE_FILE_LOGICAL_SIZE@, BlobGranuleFileLogicalSize);
|
||||
};
|
||||
|
||||
template <>
|
||||
|
|
|
@ -90,3 +90,4 @@ set(FDB_PV_SHARD_ENCODE_LOCATION_METADATA "0x0FDB00B072000000LL")
|
|||
set(FDB_PV_BLOB_GRANULE_FILE "0x0FDB00B072000000LL")
|
||||
set(FDB_ENCRYPTED_SNAPSHOT_BACKUP_FILE "0x0FDB00B072000000LL")
|
||||
set(FDB_PV_CLUSTER_ID_SPECIAL_KEY "0x0FDB00B072000000LL")
|
||||
set(FDB_PV_BLOB_GRANULE_FILE_LOGICAL_SIZE "0x0FDB00B072000000LL")
|
||||
|
|
|
@ -9,6 +9,9 @@ injectSSDelay = true
|
|||
# FIXME: re-enable rocks at some point
|
||||
storageEngineExcludeTypes = [4, 5]
|
||||
|
||||
[[knobs]]
|
||||
enable_blob_granule_file_logical_size=false
|
||||
|
||||
[[test]]
|
||||
testTitle = 'BlobGranuleRestartCycle'
|
||||
clearAfterTest=false
|
||||
|
|
|
@ -9,6 +9,9 @@ injectSSDelay = true
|
|||
# FIXME: re-enable rocks at some point
|
||||
storageEngineExcludeTypes = [4, 5]
|
||||
|
||||
[[knobs]]
|
||||
enable_blob_granule_file_logical_size=false
|
||||
|
||||
[[test]]
|
||||
testTitle = 'BlobGranuleRestartLarge'
|
||||
clearAfterTest=false
|
||||
|
|
Loading…
Reference in New Issue