From 61474d5d548071183684bddf5e1d47b8f97424dd Mon Sep 17 00:00:00 2001 From: Josh Slocum Date: Mon, 28 Mar 2022 14:48:12 -0500 Subject: [PATCH] Future-proof blob granules with full file size --- fdbclient/BlobGranuleCommon.h | 9 +++++---- fdbclient/BlobGranuleFiles.cpp | 11 ++++++++--- fdbclient/SystemData.cpp | 9 ++++++--- fdbclient/SystemData.h | 4 ++-- fdbserver/BlobGranuleServerCommon.actor.cpp | 16 ++++++++++------ fdbserver/BlobGranuleServerCommon.actor.h | 5 +++-- fdbserver/BlobWorker.actor.cpp | 18 ++++++++++++------ 7 files changed, 46 insertions(+), 26 deletions(-) diff --git a/fdbclient/BlobGranuleCommon.h b/fdbclient/BlobGranuleCommon.h index c76e72342d..97074326d9 100644 --- a/fdbclient/BlobGranuleCommon.h +++ b/fdbclient/BlobGranuleCommon.h @@ -52,19 +52,20 @@ struct BlobFilePointerRef { StringRef filename; int64_t offset; int64_t length; + int64_t fullFileLength; BlobFilePointerRef() {} - BlobFilePointerRef(Arena& to, const std::string& filename, int64_t offset, int64_t length) - : filename(to, filename), offset(offset), length(length) {} + BlobFilePointerRef(Arena& to, const std::string& filename, int64_t offset, int64_t length, int64_t fullFileLength) + : filename(to, filename), offset(offset), length(length), fullFileLength(fullFileLength) {} template void serialize(Ar& ar) { - serializer(ar, filename, offset, length); + serializer(ar, filename, offset, length, fullFileLength); } std::string toString() const { std::stringstream ss; - ss << filename.toString() << ":" << offset << ":" << length; + ss << filename.toString() << ":" << offset << ":" << length << ":" << fullFileLength; return std::move(ss).str(); } }; diff --git a/fdbclient/BlobGranuleFiles.cpp b/fdbclient/BlobGranuleFiles.cpp index 1697722fb7..469573e3d9 100644 --- a/fdbclient/BlobGranuleFiles.cpp +++ b/fdbclient/BlobGranuleFiles.cpp @@ -240,22 +240,27 @@ static void startLoad(const ReadBlobGranuleContext granuleContext, // Start load process for all files in chunk if (chunk.snapshotFile.present()) { std::string snapshotFname = chunk.snapshotFile.get().filename.toString(); - // FIXME: full file length won't always be length of read + // FIXME: remove when we implement file multiplexing + ASSERT(chunk.snapshotFile.get().offset == 0); + ASSERT(chunk.snapshotFile.get().length == chunk.snapshotFile.get().fullFileLength); loadIds.snapshotId = granuleContext.start_load_f(snapshotFname.c_str(), snapshotFname.size(), chunk.snapshotFile.get().offset, chunk.snapshotFile.get().length, - chunk.snapshotFile.get().length, + chunk.snapshotFile.get().fullFileLength, granuleContext.userContext); } loadIds.deltaIds.reserve(chunk.deltaFiles.size()); for (int deltaFileIdx = 0; deltaFileIdx < chunk.deltaFiles.size(); deltaFileIdx++) { std::string deltaFName = chunk.deltaFiles[deltaFileIdx].filename.toString(); + // FIXME: remove when we implement file multiplexing + ASSERT(chunk.deltaFiles[deltaFileIdx].offset == 0); + ASSERT(chunk.deltaFiles[deltaFileIdx].length == chunk.deltaFiles[deltaFileIdx].fullFileLength); int64_t deltaLoadId = granuleContext.start_load_f(deltaFName.c_str(), deltaFName.size(), chunk.deltaFiles[deltaFileIdx].offset, chunk.deltaFiles[deltaFileIdx].length, - chunk.deltaFiles[deltaFileIdx].length, + chunk.deltaFiles[deltaFileIdx].fullFileLength, granuleContext.userContext); loadIds.deltaIds.push_back(deltaLoadId); } diff --git a/fdbclient/SystemData.cpp b/fdbclient/SystemData.cpp index 5c24441f32..42e0f83a26 100644 --- a/fdbclient/SystemData.cpp +++ b/fdbclient/SystemData.cpp @@ -1190,23 +1190,26 @@ const KeyRange blobGranuleFileKeyRangeFor(UID granuleID) { return KeyRangeRef(startKey, strinc(startKey)); } -const Value blobGranuleFileValueFor(StringRef const& filename, int64_t offset, int64_t length) { +const Value blobGranuleFileValueFor(StringRef const& filename, int64_t offset, int64_t length, int64_t fullFileLength) { BinaryWriter wr(IncludeVersion(ProtocolVersion::withBlobGranule())); wr << filename; wr << offset; wr << length; + wr << fullFileLength; return wr.toValue(); } -std::tuple, int64_t, int64_t> decodeBlobGranuleFileValue(ValueRef const& value) { +std::tuple, int64_t, int64_t, int64_t> decodeBlobGranuleFileValue(ValueRef const& value) { StringRef filename; int64_t offset; int64_t length; + int64_t fullFileLength; BinaryReader reader(value, IncludeVersion()); reader >> filename; reader >> offset; reader >> length; - return std::tuple(filename, offset, length); + reader >> fullFileLength; + return std::tuple(filename, offset, length, fullFileLength); } const Value blobGranulePruneValueFor(Version version, KeyRange range, bool force) { diff --git a/fdbclient/SystemData.h b/fdbclient/SystemData.h index 171130559e..fcbc20bf97 100644 --- a/fdbclient/SystemData.h +++ b/fdbclient/SystemData.h @@ -572,8 +572,8 @@ const Key blobGranuleFileKeyFor(UID granuleID, Version fileVersion, uint8_t file std::tuple decodeBlobGranuleFileKey(KeyRef const& key); const KeyRange blobGranuleFileKeyRangeFor(UID granuleID); -const Value blobGranuleFileValueFor(StringRef const& filename, int64_t offset, int64_t length); -std::tuple, int64_t, int64_t> decodeBlobGranuleFileValue(ValueRef const& value); +const Value blobGranuleFileValueFor(StringRef const& filename, int64_t offset, int64_t length, int64_t fullFileLength); +std::tuple, int64_t, int64_t, int64_t> decodeBlobGranuleFileValue(ValueRef const& value); const Value blobGranulePruneValueFor(Version version, KeyRange range, bool force); std::tuple decodeBlobGranulePruneValue(ValueRef const& value); diff --git a/fdbserver/BlobGranuleServerCommon.actor.cpp b/fdbserver/BlobGranuleServerCommon.actor.cpp index 4792984d62..35b8d2e22f 100644 --- a/fdbserver/BlobGranuleServerCommon.actor.cpp +++ b/fdbserver/BlobGranuleServerCommon.actor.cpp @@ -60,13 +60,14 @@ ACTOR Future readGranuleFiles(Transaction* tr, Key* startKey, Key endKey, Standalone filename; int64_t offset; int64_t length; + int64_t fullFileLength; std::tie(gid, version, fileType) = decodeBlobGranuleFileKey(it.key); ASSERT(gid == granuleID); - std::tie(filename, offset, length) = decodeBlobGranuleFileValue(it.value); + std::tie(filename, offset, length, fullFileLength) = decodeBlobGranuleFileValue(it.value); - BlobFileIndex idx(version, filename.toString(), offset, length); + BlobFileIndex idx(version, filename.toString(), offset, length, fullFileLength); if (fileType == 'S') { ASSERT(files->snapshotFiles.empty() || files->snapshotFiles.back().version < idx.version); files->snapshotFiles.push_back(idx); @@ -168,14 +169,16 @@ void GranuleFiles::getFiles(Version beginVersion, Version lastIncluded = invalidVersion; if (snapshotF != snapshotFiles.end()) { chunk.snapshotVersion = snapshotF->version; - chunk.snapshotFile = BlobFilePointerRef(replyArena, snapshotF->filename, snapshotF->offset, snapshotF->length); + chunk.snapshotFile = BlobFilePointerRef( + replyArena, snapshotF->filename, snapshotF->offset, snapshotF->length, snapshotF->fullFileLength); lastIncluded = chunk.snapshotVersion; } else { chunk.snapshotVersion = invalidVersion; } while (deltaF != deltaFiles.end() && deltaF->version < readVersion) { - chunk.deltaFiles.emplace_back_deep(replyArena, deltaF->filename, deltaF->offset, deltaF->length); + chunk.deltaFiles.emplace_back_deep( + replyArena, deltaF->filename, deltaF->offset, deltaF->length, deltaF->fullFileLength); deltaBytesCounter += deltaF->length; ASSERT(lastIncluded < deltaF->version); lastIncluded = deltaF->version; @@ -183,7 +186,8 @@ void GranuleFiles::getFiles(Version beginVersion, } // include last delta file that passes readVersion, if it exists if (deltaF != deltaFiles.end() && lastIncluded < readVersion) { - chunk.deltaFiles.emplace_back_deep(replyArena, deltaF->filename, deltaF->offset, deltaF->length); + chunk.deltaFiles.emplace_back_deep( + replyArena, deltaF->filename, deltaF->offset, deltaF->length, deltaF->fullFileLength); deltaBytesCounter += deltaF->length; lastIncluded = deltaF->version; } @@ -194,7 +198,7 @@ static std::string makeTestFileName(Version v) { } static BlobFileIndex makeTestFile(Version v, int64_t len) { - return BlobFileIndex(v, makeTestFileName(v), 0, len); + return BlobFileIndex(v, makeTestFileName(v), 0, len, len); } static void checkFile(int expectedVersion, const BlobFilePointerRef& actualFile) { diff --git a/fdbserver/BlobGranuleServerCommon.actor.h b/fdbserver/BlobGranuleServerCommon.actor.h index 399ae5b7b0..ea3f8c1e3b 100644 --- a/fdbserver/BlobGranuleServerCommon.actor.h +++ b/fdbserver/BlobGranuleServerCommon.actor.h @@ -49,11 +49,12 @@ struct BlobFileIndex { std::string filename; int64_t offset; int64_t length; + int64_t fullFileLength; BlobFileIndex() {} - BlobFileIndex(Version version, std::string filename, int64_t offset, int64_t length) - : version(version), filename(filename), offset(offset), length(length) {} + BlobFileIndex(Version version, std::string filename, int64_t offset, int64_t length, int64_t fullFileLength) + : version(version), filename(filename), offset(offset), length(length), fullFileLength(fullFileLength) {} // compare on version bool operator<(const BlobFileIndex& r) const { return version < r.version; } diff --git a/fdbserver/BlobWorker.actor.cpp b/fdbserver/BlobWorker.actor.cpp index 791ee7a05a..79d0981e70 100644 --- a/fdbserver/BlobWorker.actor.cpp +++ b/fdbserver/BlobWorker.actor.cpp @@ -511,7 +511,8 @@ ACTOR Future writeDeltaFile(Reference bwData, numIterations++; Key dfKey = blobGranuleFileKeyFor(granuleID, currentDeltaVersion, 'D'); - Value dfValue = blobGranuleFileValueFor(fname, 0, serializedSize); + // TODO change once we support file multiplexing + Value dfValue = blobGranuleFileValueFor(fname, 0, serializedSize, serializedSize); tr->set(dfKey, dfValue); if (oldGranuleComplete.present()) { @@ -538,7 +539,8 @@ ACTOR Future writeDeltaFile(Reference bwData, if (BUGGIFY_WITH_PROB(0.01)) { wait(delay(deterministicRandom()->random01())); } - return BlobFileIndex(currentDeltaVersion, fname, 0, serializedSize); + // FIXME: change when we implement multiplexing + return BlobFileIndex(currentDeltaVersion, fname, 0, serializedSize, serializedSize); } catch (Error& e) { wait(tr->onError(e)); } @@ -648,7 +650,8 @@ ACTOR Future writeSnapshot(Reference bwData, wait(readAndCheckGranuleLock(tr, keyRange, epoch, seqno)); numIterations++; Key snapshotFileKey = blobGranuleFileKeyFor(granuleID, version, 'S'); - Key snapshotFileValue = blobGranuleFileValueFor(fname, 0, serializedSize); + // TODO change once we support file multiplexing + Key snapshotFileValue = blobGranuleFileValueFor(fname, 0, serializedSize, serializedSize); tr->set(snapshotFileKey, snapshotFileValue); // create granule history at version if this is a new granule with the initial dump from FDB if (createGranuleHistory) { @@ -692,7 +695,8 @@ ACTOR Future writeSnapshot(Reference bwData, wait(delay(deterministicRandom()->random01())); } - return BlobFileIndex(version, fname, 0, serializedSize); + // FIXME: change when we implement multiplexing + return BlobFileIndex(version, fname, 0, serializedSize, serializedSize); } ACTOR Future dumpInitialSnapshotFromFDB(Reference bwData, @@ -797,7 +801,8 @@ ACTOR Future compactFromBlob(Reference bwData, ASSERT(snapshotVersion < version); - chunk.snapshotFile = BlobFilePointerRef(filenameArena, snapshotF.filename, snapshotF.offset, snapshotF.length); + chunk.snapshotFile = BlobFilePointerRef( + filenameArena, snapshotF.filename, snapshotF.offset, snapshotF.length, snapshotF.fullFileLength); compactBytesRead += snapshotF.length; int deltaIdx = files.deltaFiles.size() - 1; while (deltaIdx >= 0 && files.deltaFiles[deltaIdx].version > snapshotVersion) { @@ -807,7 +812,8 @@ ACTOR Future compactFromBlob(Reference bwData, Version lastDeltaVersion = invalidVersion; while (deltaIdx < files.deltaFiles.size() && files.deltaFiles[deltaIdx].version <= version) { BlobFileIndex deltaF = files.deltaFiles[deltaIdx]; - chunk.deltaFiles.emplace_back_deep(filenameArena, deltaF.filename, deltaF.offset, deltaF.length); + chunk.deltaFiles.emplace_back_deep( + filenameArena, deltaF.filename, deltaF.offset, deltaF.length, deltaF.fullFileLength); compactBytesRead += deltaF.length; lastDeltaVersion = files.deltaFiles[deltaIdx].version; deltaIdx++;