diff --git a/fdbclient/BlobGranuleFiles.cpp b/fdbclient/BlobGranuleFiles.cpp index a7e5dda5a3..6c2cd83a5f 100644 --- a/fdbclient/BlobGranuleFiles.cpp +++ b/fdbclient/BlobGranuleFiles.cpp @@ -172,7 +172,7 @@ RangeResult materializeBlobGranule(const BlobGranuleChunkRef& chunk, Version readVersion, Optional snapshotData, StringRef deltaFileData[]) { - // TODO REMOVE with V2 of protocol + // TODO REMOVE with early replying ASSERT(readVersion == chunk.includedVersion); ASSERT(chunk.snapshotFile.present()); ASSERT(snapshotData.present()); @@ -278,8 +278,7 @@ ErrorOr loadAndMaterializeBlobGranules(const Standalone> readFile(Reference(dataRef, arena); } catch (Error& e) { - printf("Reading file %s got error %s\n", f.toString().c_str(), e.name()); throw e; } } @@ -68,7 +67,7 @@ ACTOR Future readBlobGranule(BlobGranuleChunkRef chunk, Reference bstore, Optional stats) { - // TODO REMOVE with V2 of protocol + // TODO REMOVE with early replying ASSERT(readVersion == chunk.includedVersion); ASSERT(chunk.snapshotFile.present()); @@ -106,7 +105,6 @@ ACTOR Future readBlobGranule(BlobGranuleChunkRef chunk, return materializeBlobGranule(chunk, keyRange, readVersion, snapshotData, deltaData); } catch (Error& e) { - printf("Reading blob granule got error %s\n", e.name()); throw e; } } @@ -121,18 +119,12 @@ ACTOR Future readBlobGranules(BlobGranuleFileRequest request, try { state int i; for (i = 0; i < reply.chunks.size(); i++) { - /*printf("ReadBlobGranules processing chunk %d [%s - %s)\n", - i, - reply.chunks[i].keyRange.begin.printable().c_str(), - reply.chunks[i].keyRange.end.printable().c_str());*/ RangeResult chunkResult = wait(readBlobGranule(reply.chunks[i], request.keyRange, request.readVersion, bstore)); results.send(std::move(chunkResult)); } - // printf("ReadBlobGranules done, sending EOS\n"); results.sendError(end_of_stream()); } catch (Error& e) { - printf("ReadBlobGranules got error %s\n", e.name()); results.sendError(e); } diff --git a/fdbclient/BlobWorkerCommon.h 
b/fdbclient/BlobWorkerCommon.h index 2898412e73..49aed17985 100644 --- a/fdbclient/BlobWorkerCommon.h +++ b/fdbclient/BlobWorkerCommon.h @@ -38,6 +38,8 @@ struct BlobWorkerStats { Counter commitVersionChecks; Counter granuleUpdateErrors; Counter granuleRequestTimeouts; + Counter readRequestsWithBegin; + Counter readRequestsCollapsed; int numRangesAssigned; int mutationBytesBuffered; @@ -59,6 +61,7 @@ struct BlobWorkerStats { readReqTotalFilesReturned("ReadReqTotalFilesReturned", cc), readReqDeltaBytesReturned("ReadReqDeltaBytesReturned", cc), commitVersionChecks("CommitVersionChecks", cc), granuleUpdateErrors("GranuleUpdateErrors", cc), granuleRequestTimeouts("GranuleRequestTimeouts", cc), + readRequestsWithBegin("ReadRequestsWithBegin", cc), readRequestsCollapsed("ReadRequestsCollapsed", cc), numRangesAssigned(0), mutationBytesBuffered(0), activeReadRequests(0) { specialCounter(cc, "NumRangesAssigned", [this]() { return this->numRangesAssigned; }); specialCounter(cc, "MutationBytesBuffered", [this]() { return this->mutationBytesBuffered; }); diff --git a/fdbclient/BlobWorkerInterface.h b/fdbclient/BlobWorkerInterface.h index f69b73e1bc..5dd36b7128 100644 --- a/fdbclient/BlobWorkerInterface.h +++ b/fdbclient/BlobWorkerInterface.h @@ -86,13 +86,14 @@ struct BlobGranuleFileRequest { KeyRangeRef keyRange; Version beginVersion = 0; Version readVersion; + bool canCollapseBegin = true; ReplyPromise reply; BlobGranuleFileRequest() {} template void serialize(Ar& ar) { - serializer(ar, keyRange, beginVersion, readVersion, reply, arena); + serializer(ar, keyRange, beginVersion, readVersion, canCollapseBegin, reply, arena); } }; diff --git a/fdbclient/ReadYourWrites.actor.cpp b/fdbclient/ReadYourWrites.actor.cpp index c7b9629196..a16034963b 100644 --- a/fdbclient/ReadYourWrites.actor.cpp +++ b/fdbclient/ReadYourWrites.actor.cpp @@ -1791,8 +1791,6 @@ Future>> ReadYourWritesTransaction::re Version begin, Optional readVersion, Version* readVersionOut) { - // Remove in V2 of API 
- ASSERT(begin == 0); if (!options.readYourWritesDisabled) { return blob_granule_no_ryw(); diff --git a/fdbserver/BlobGranuleServerCommon.actor.cpp b/fdbserver/BlobGranuleServerCommon.actor.cpp index c47ed13199..28b4c1db44 100644 --- a/fdbserver/BlobGranuleServerCommon.actor.cpp +++ b/fdbserver/BlobGranuleServerCommon.actor.cpp @@ -18,6 +18,7 @@ * limitations under the License. */ +#include "contrib/fmt-8.1.1/include/fmt/format.h" #include "fdbclient/SystemData.h" #include "fdbclient/BlobGranuleCommon.h" #include "fdbserver/BlobGranuleServerCommon.actor.h" @@ -25,6 +26,7 @@ #include "fdbclient/FDBTypes.h" #include "fdbclient/ReadYourWrites.h" #include "flow/Arena.h" +#include "flow/UnitTest.h" #include "flow/actorcompiler.h" // has to be last include // Gets the latest granule history node for range that was persisted @@ -102,3 +104,252 @@ ACTOR Future loadHistoryFiles(Database cx, UID granuleID) { } } } + +// Normally a beginVersion != 0 means the caller wants all mutations between beginVersion and readVersion, instead of +// the latest snapshot before readVersion + deltas after the snapshot. When canCollapse is set, the beginVersion is +// essentially just an optimization hint. The caller is still concerned with reconstructing rows at readVersion, it just +// knows it doesn't need anything before beginVersion. +// Normally this can eliminate the need for a snapshot and just return a small amount of deltas. But in a highly active +// key range, the granule may have a snapshot file at version X, where beginVersion < X <= readVersion. In this case, if +// the number of bytes in delta files between beginVersion and X is larger than the snapshot file at version X, it is +// strictly more efficient (in terms of files and bytes read) to just use the snapshot file at version X instead. 
+// Fills `chunk` with the snapshot file (if any) and the delta files needed to read this granule at readVersion.
+// File pointers are copied into replyArena.
+//  - beginVersion == 0: the latest snapshot at or before readVersion, plus the delta files after that snapshot.
+//  - beginVersion > 0, !canCollapse: no snapshot; delta files starting at the first one with version >= beginVersion.
+//  - beginVersion > 0, canCollapse: as above, unless the delta files from beginVersion up to the candidate snapshot
+//    hold at least as many bytes as that snapshot, in which case the snapshot replaces those delta files.
+// In every case delta files are included up to and including the first one whose version reaches readVersion.
+// Asserts that a snapshot at or before readVersion exists whenever a snapshot is considered.
+void GranuleFiles::getFiles(Version beginVersion,
+                            Version readVersion,
+                            bool canCollapse,
+                            BlobGranuleChunkRef& chunk,
+                            Arena& replyArena) const {
+    BlobFileIndex dummyIndex; // search key for the binary searches below
+
+    // A snapshot is only a candidate for a full read (beginVersion == 0) or when collapsing is permitted.
+    auto snapshotF = snapshotFiles.end();
+    if (beginVersion == 0 || canCollapse) {
+        dummyIndex.version = readVersion;
+        snapshotF = std::lower_bound(snapshotFiles.begin(), snapshotFiles.end(), dummyIndex);
+        // lower_bound yields the first snapshot with version >= readVersion; step back unless it is an exact match
+        if (snapshotF == snapshotFiles.end() || snapshotF->version > readVersion) {
+            ASSERT(snapshotF != snapshotFiles.begin());
+            --snapshotF;
+        }
+        ASSERT(snapshotF != snapshotFiles.end());
+        ASSERT(snapshotF->version <= readVersion);
+    }
+
+    auto deltaF = deltaFiles.end();
+    if (beginVersion > 0) {
+        dummyIndex.version = beginVersion;
+        deltaF = std::lower_bound(deltaFiles.begin(), deltaFiles.end(), dummyIndex);
+        if (canCollapse) {
+            ASSERT(snapshotF != snapshotFiles.end());
+            // see if delta files up to snapshotVersion are smaller or larger than snapshotBytes in total
+            auto deltaFCopy = deltaF;
+            int64_t remainingSnapshotBytes = snapshotF->length;
+            while (deltaFCopy != deltaFiles.end() && deltaFCopy->version <= snapshotF->version &&
+                   remainingSnapshotBytes > 0) {
+                remainingSnapshotBytes -= deltaFCopy->length;
+                ++deltaFCopy;
+            }
+            // if delta files contain the same or more bytes as the snapshot with collapse, do the collapse
+            if (remainingSnapshotBytes > 0) {
+                // don't collapse, clear snapshotF and just do delta files
+                snapshotF = snapshotFiles.end();
+            } else {
+                // do snapshot instead of previous deltas
+                dummyIndex.version = snapshotF->version;
+                deltaF = std::upper_bound(deltaFiles.begin(), deltaFiles.end(), dummyIndex);
+                ASSERT(deltaF == deltaFiles.end() || deltaF->version > snapshotF->version);
+            }
+        }
+    } else {
+        // full read: start with the first delta file strictly after the chosen snapshot
+        dummyIndex.version = snapshotF->version;
+        deltaF = std::upper_bound(deltaFiles.begin(), deltaFiles.end(), dummyIndex);
+        ASSERT(deltaF == deltaFiles.end() || deltaF->version > snapshotF->version);
+    }
+
+    // highest version covered by the files added to the chunk so far
+    Version lastIncluded = invalidVersion;
+    if (snapshotF != snapshotFiles.end()) {
+        chunk.snapshotVersion = snapshotF->version;
+        chunk.snapshotFile = BlobFilePointerRef(replyArena, snapshotF->filename, snapshotF->offset, snapshotF->length);
+        lastIncluded = chunk.snapshotVersion;
+    } else {
+        chunk.snapshotVersion = invalidVersion;
+    }
+
+    int64_t deltaBytes = 0;
+    while (deltaF != deltaFiles.end() && deltaF->version < readVersion) {
+        chunk.deltaFiles.emplace_back_deep(replyArena, deltaF->filename, deltaF->offset, deltaF->length);
+        deltaBytes += deltaF->length;
+        ASSERT(lastIncluded < deltaF->version);
+        lastIncluded = deltaF->version;
+        ++deltaF;
+    }
+    // include last delta file that passes readVersion, if it exists
+    if (deltaF != deltaFiles.end() && lastIncluded < readVersion) {
+        chunk.deltaFiles.emplace_back_deep(replyArena, deltaF->filename, deltaF->offset, deltaF->length);
+        deltaBytes += deltaF->length;
+        lastIncluded = deltaF->version;
+    }
+
+    // TODO wire this up,
+    // bwData->stats.readReqDeltaBytesReturned += deltaBytes;
+}
+
+// Synthetic file name used by the unit test to identify a file by its version.
+static std::string makeTestFileName(Version v) {
+    return "test" + std::to_string(v);
+}
+
+// Builds a test file entry at version v with offset 0 and the given length.
+static BlobFileIndex makeTestFile(Version v, int64_t len) {
+    return BlobFileIndex(v, makeTestFileName(v), 0, len);
+}
+
+// Asserts that actualFile carries the synthetic name for expectedVersion.
+static void checkFile(int expectedVersion, const BlobFilePointerRef& actualFile) {
+    ASSERT(makeTestFileName(expectedVersion) == actualFile.filename.toString());
+}
+
+// Runs getFiles(beginVersion, readVersion, canCollapse) on f, prints expected vs. actual files, and asserts that the
+// returned snapshot (or its absence) and the ordered list of delta files match the expectations.
+static void checkFiles(const GranuleFiles& f,
+                       Version beginVersion,
+                       Version readVersion,
+                       bool canCollapse,
+                       Optional<int> expectedSnapshotVersion,
+                       const std::vector<int>& expectedDeltaVersions) {
+    Arena a;
+    BlobGranuleChunkRef chunk;
+    f.getFiles(beginVersion, readVersion, canCollapse, chunk, a);
+    fmt::print("results({0}, {1}, {2}):\nEXPECTED: snapshot={3}\n deltas ({4}):\n",
+               beginVersion,
+               readVersion,
+               canCollapse ? "T" : "F",
+               expectedSnapshotVersion.present() ? makeTestFileName(expectedSnapshotVersion.get()).c_str() : "",
+               expectedDeltaVersions.size());
+    for (int d : expectedDeltaVersions) {
+        fmt::print(" {}\n", makeTestFileName(d));
+    }
+    fmt::print("ACTUAL:\n snapshot={0}\n deltas ({1}):\n",
+               chunk.snapshotFile.present() ? chunk.snapshotFile.get().filename.toString().c_str() : "",
+               chunk.deltaFiles.size());
+    for (const auto& deltaFile : chunk.deltaFiles) {
+        fmt::print(" {}\n", deltaFile.filename.toString());
+    }
+    printf("\n\n\n");
+    ASSERT(expectedSnapshotVersion.present() == chunk.snapshotFile.present());
+    if (expectedSnapshotVersion.present()) {
+        checkFile(expectedSnapshotVersion.get(), chunk.snapshotFile.get());
+    }
+    ASSERT(expectedDeltaVersions.size() == chunk.deltaFiles.size());
+    // size_t index avoids the signed/unsigned comparison against size()
+    for (size_t i = 0; i < expectedDeltaVersions.size(); ++i) {
+        checkFile(expectedDeltaVersions[i], chunk.deltaFiles[i]);
+    }
+}
+
+/*
+ * Files:
+ * S @ 100 (10 bytes)
+ * D @ 150 (5 bytes)
+ * D @ 200 (6 bytes)
+ * S @ 200 (15 bytes)
+ * D @ 250 (7 bytes)
+ * D @ 300 (8 bytes)
+ * S @ 300 (10 bytes)
+ * D @ 350 (4 bytes)
+ */
+TEST_CASE("/blobgranule/server/common/granulefiles") {
+    // simple cases first
+
+    // single snapshot file, no deltas
+    GranuleFiles files;
+    files.snapshotFiles.push_back(makeTestFile(100, 10));
+
+    printf("Just snapshot\n");
+
+    checkFiles(files, 0, 100, false, 100, {});
+    checkFiles(files, 0, 200, false, 100, {});
+
+    printf("Small test\n");
+    // add delta files with re-snapshot at end
+    files.deltaFiles.push_back(makeTestFile(150, 5));
+    files.deltaFiles.push_back(makeTestFile(200, 6));
+    files.snapshotFiles.push_back(makeTestFile(200, 15));
+
+    // check different read versions with beginVersion=0
+    checkFiles(files, 0, 100, false, 100, {});
+    checkFiles(files, 0, 101, false, 100, { 150 });
+    checkFiles(files, 0, 149, false, 100, { 150 });
+    checkFiles(files, 0, 150, false, 100, { 150 });
+    checkFiles(files, 0, 151, false, 100, { 150, 200 });
+    checkFiles(files, 0, 199, false, 100, { 150, 200 });
+    checkFiles(files, 0, 200, false, 200, {});
+    checkFiles(files, 0, 300, false, 200, {});
+
+    // Test all cases of beginVersion + readVersion. Because delta files are smaller than snapshot at 200, this should
+    // be the same with and without collapse
+    checkFiles(files, 100, 200, false, Optional<int>(), { 150, 200 });
+    checkFiles(files, 100, 300, false, Optional<int>(), { 150, 200 });
+    checkFiles(files, 101, 199, false, Optional<int>(), { 150, 200 });
+    checkFiles(files, 149, 151, false, Optional<int>(), { 150, 200 });
+    checkFiles(files, 149, 150, false, Optional<int>(), { 150 });
+    checkFiles(files, 150, 151, false, Optional<int>(), { 150, 200 });
+    checkFiles(files, 151, 200, false, Optional<int>(), { 200 });
+
+    checkFiles(files, 100, 200, true, Optional<int>(), { 150, 200 });
+    checkFiles(files, 100, 300, true, Optional<int>(), { 150, 200 });
+    checkFiles(files, 101, 199, true, Optional<int>(), { 150, 200 });
+    checkFiles(files, 149, 151, true, Optional<int>(), { 150, 200 });
+    checkFiles(files, 149, 150, true, Optional<int>(), { 150 });
+    checkFiles(files, 150, 151, true, Optional<int>(), { 150, 200 });
+    checkFiles(files, 151, 200, true, Optional<int>(), { 200 });
+
+    printf("Larger test\n");
+    // add more delta files and snapshots to check collapse logic
+    files.deltaFiles.push_back(makeTestFile(250, 7));
+    files.deltaFiles.push_back(makeTestFile(300, 8));
+    files.snapshotFiles.push_back(makeTestFile(300, 10));
+    files.deltaFiles.push_back(makeTestFile(350, 4));
+
+    checkFiles(files, 0, 300, false, 300, {});
+    checkFiles(files, 0, 301, false, 300, { 350 });
+    checkFiles(files, 0, 400, false, 300, { 350 });
+
+    // check delta files without collapse
+
+    checkFiles(files, 100, 301, false, Optional<int>(), { 150, 200, 250, 300, 350 });
+    checkFiles(files, 100, 300, false, Optional<int>(), { 150, 200, 250, 300 });
+    checkFiles(files, 100, 251, false, Optional<int>(), { 150, 200, 250, 300 });
+    checkFiles(files, 100, 250, false, Optional<int>(), { 150, 200, 250 });
+
+    checkFiles(files, 151, 300, false, Optional<int>(), { 200, 250, 300 });
+    checkFiles(files, 151, 301, false, Optional<int>(), { 200, 250, 300, 350 });
+    checkFiles(files, 151, 400, false, Optional<int>(), { 200, 250, 300, 350 });
+
+    checkFiles(files, 201, 300, false, Optional<int>(), { 250, 300 });
+    checkFiles(files, 201, 301, false, Optional<int>(), { 250, 300, 350 });
+    checkFiles(files, 201, 400, false, Optional<int>(), { 250, 300, 350 });
+
+    checkFiles(files, 251, 300, false, Optional<int>(), { 300 });
+    checkFiles(files, 251, 301, false, Optional<int>(), { 300, 350 });
+    checkFiles(files, 251, 400, false, Optional<int>(), { 300, 350 });
+    checkFiles(files, 301, 400, false, Optional<int>(), { 350 });
+    checkFiles(files, 351, 400, false, Optional<int>(), {});
+
+    // check with collapse
+    // these 2 collapse because the delta files at 150+200+250+300 are larger than the snapshot at 300
+    checkFiles(files, 100, 301, true, 300, { 350 });
+    checkFiles(files, 100, 300, true, 300, {});
+    // these 2 don't collapse because 150+200 delta files are smaller than the snapshot at 200
+    checkFiles(files, 100, 251, true, Optional<int>(), { 150, 200, 250, 300 });
+    checkFiles(files, 100, 250, true, Optional<int>(), { 150, 200, 250 });
+
+    // these 3 do collapse because the delta files at 200+250+300 are larger than the snapshot at 300
+    checkFiles(files, 151, 300, true, 300, {});
+    checkFiles(files, 151, 301, true, 300, { 350 });
+    checkFiles(files, 151, 400, true, 300, { 350 });
+
+    // these 3 do collapse because the delta files at 250+300 are larger than the snapshot at 300
+    checkFiles(files, 201, 300, true, 300, {});
+    checkFiles(files, 201, 301, true, 300, { 350 });
+    checkFiles(files, 201, 400, true, 300, { 350 });
+
+    // these don't collapse because the delta file at 300 is smaller than the snapshot at 300
+    checkFiles(files, 251, 300, true, Optional<int>(), { 300 });
+    checkFiles(files, 251, 301, true, Optional<int>(), { 300, 350 });
+    checkFiles(files, 251, 400, true, Optional<int>(), { 300, 350 });
+    checkFiles(files, 301, 400, true, Optional<int>(), { 350 });
+    checkFiles(files, 351, 400, true, Optional<int>(), {});
+
+    return Void();
+}
\ No newline at
end of file diff --git a/fdbserver/BlobGranuleServerCommon.actor.h b/fdbserver/BlobGranuleServerCommon.actor.h index d48418c951..b088903967 100644 --- a/fdbserver/BlobGranuleServerCommon.actor.h +++ b/fdbserver/BlobGranuleServerCommon.actor.h @@ -54,12 +54,22 @@ struct BlobFileIndex { BlobFileIndex(Version version, std::string filename, int64_t offset, int64_t length) : version(version), filename(filename), offset(offset), length(length) {} + + // compare on version + bool operator<(const BlobFileIndex& r) const { return version < r.version; } }; +// FIXME: initialize these to smaller default sizes to save a bit of memory, particularly snapshotFiles // Stores the files that comprise a blob granule struct GranuleFiles { - std::deque snapshotFiles; - std::deque deltaFiles; + std::vector snapshotFiles; + std::vector deltaFiles; + + void getFiles(Version beginVersion, + Version readVersion, + bool canCollapse, + BlobGranuleChunkRef& chunk, + Arena& replyArena) const; }; class Transaction; diff --git a/fdbserver/BlobManager.actor.cpp b/fdbserver/BlobManager.actor.cpp index 2b6a4da2bf..b81fee7d70 100644 --- a/fdbserver/BlobManager.actor.cpp +++ b/fdbserver/BlobManager.actor.cpp @@ -2778,7 +2778,7 @@ ACTOR Future blobManager(BlobManagerInterface bmInterf, // DB has [A - B) and [C - D). They should show up in knownBlobRanges, and [B - C) should be in removed. // DB has [B - C). It should show up in knownBlobRanges, [B - C) should be in added, and [A - B) and [C - D) // should be in removed. -TEST_CASE(":/blobmanager/updateranges") { +TEST_CASE("/blobmanager/updateranges") { KeyRangeMap knownBlobRanges(false, normalKeys.end); Arena ar; diff --git a/fdbserver/BlobWorker.actor.cpp b/fdbserver/BlobWorker.actor.cpp index e939d71ec6..5b6d22c2b1 100644 --- a/fdbserver/BlobWorker.actor.cpp +++ b/fdbserver/BlobWorker.actor.cpp @@ -18,6 +18,7 @@ * limitations under the License. 
*/ +#include #include #include #include @@ -43,9 +44,10 @@ #include "flow/Error.h" #include "flow/IRandom.h" #include "flow/Trace.h" -#include "flow/actorcompiler.h" // has to be last include #include "flow/network.h" +#include "flow/actorcompiler.h" // has to be last include + #define BW_DEBUG false #define BW_REQUEST_DEBUG false @@ -2100,9 +2102,13 @@ ACTOR Future doBlobGranuleFileRequest(Reference bwData, Bl req.readVersion); } + state bool didCollapse = false; try { - // TODO REMOVE in api V2 - ASSERT(req.beginVersion == 0); + // TODO remove requirement for canCollapseBegin once we implement early replying + ASSERT(req.beginVersion == 0 || req.canCollapseBegin); + if (req.beginVersion != 0) { + ASSERT(req.beginVersion > 0); + } state BlobGranuleFileReply rep; state std::vector> granules; @@ -2150,6 +2156,7 @@ ACTOR Future doBlobGranuleFileRequest(Reference bwData, Bl continue; } state Reference metadata = m; + state Version granuleBeginVersion = req.beginVersion; choose { when(wait(metadata->readable.getFuture())) {} @@ -2290,67 +2297,25 @@ ACTOR Future doBlobGranuleFileRequest(Reference bwData, Bl // granule is up to date, do read ASSERT(metadata->cancelled.canBeSet()); + // Right now we force a collapse if the version range crosses granule boundaries, for simplicity + if (chunkFiles.snapshotFiles.front().version < granuleBeginVersion) { + didCollapse = true; + granuleBeginVersion = 0; + } BlobGranuleChunkRef chunk; - // TODO change in V2 + // TODO change with early reply chunk.includedVersion = req.readVersion; chunk.keyRange = KeyRangeRef(StringRef(rep.arena, chunkRange.begin), StringRef(rep.arena, chunkRange.end)); - // handle snapshot files - // TODO refactor the "find snapshot file" logic to GranuleFiles? 
- // FIXME: binary search instead of linear search, especially when file count is large - int i = chunkFiles.snapshotFiles.size() - 1; - while (i >= 0 && chunkFiles.snapshotFiles[i].version > req.readVersion) { - i--; - } - // because of granule history, we should always be able to find the desired snapshot - // version, and have thrown blob_granule_transaction_too_old earlier if not possible. - if (i < 0) { - fmt::print("req @ {0} >= initial snapshot {1} but can't find snapshot in ({2}) files:\n", - req.readVersion, - metadata->initialSnapshotVersion, - chunkFiles.snapshotFiles.size()); - for (auto& f : chunkFiles.snapshotFiles) { - fmt::print(" {0}", f.version); - } - } - ASSERT(i >= 0); - - BlobFileIndex snapshotF = chunkFiles.snapshotFiles[i]; - chunk.snapshotFile = BlobFilePointerRef(rep.arena, snapshotF.filename, snapshotF.offset, snapshotF.length); - Version snapshotVersion = chunkFiles.snapshotFiles[i].version; - chunk.snapshotVersion = snapshotVersion; - - // handle delta files - // cast this to an int so i going to -1 still compares properly - int lastDeltaFileIdx = chunkFiles.deltaFiles.size() - 1; - i = lastDeltaFileIdx; - // skip delta files that are too new - while (i >= 0 && chunkFiles.deltaFiles[i].version > req.readVersion) { - i--; - } - if (i < lastDeltaFileIdx) { - // we skipped one file at the end with a larger read version, this will actually contain - // our query version, so add it back. 
- i++; - } - // only include delta files after the snapshot file - int j = i; - while (j >= 0 && chunkFiles.deltaFiles[j].version > snapshotVersion) { - j--; - } - j++; - while (j <= i) { - BlobFileIndex deltaF = chunkFiles.deltaFiles[j]; - chunk.deltaFiles.emplace_back_deep(rep.arena, deltaF.filename, deltaF.offset, deltaF.length); - bwData->stats.readReqDeltaBytesReturned += deltaF.length; - j++; + chunkFiles.getFiles(granuleBeginVersion, req.readVersion, req.canCollapseBegin, chunk, rep.arena); + if (granuleBeginVersion > 0 && chunk.snapshotFile.present()) { + didCollapse = true; } // new deltas (if version is larger than version of last delta file) // FIXME: do trivial key bounds here if key range is not fully contained in request key // range - - if (req.readVersion > metadata->durableDeltaVersion.get()) { + if (req.readVersion > metadata->durableDeltaVersion.get() && metadata->currentDeltas.size()) { if (metadata->durableDeltaVersion.get() != metadata->pendingDeltaVersion) { fmt::print("real-time read [{0} - {1}) @ {2} doesn't have mutations!! 
durable={3}, pending={4}\n", metadata->keyRange.begin.printable(), @@ -2359,13 +2324,31 @@ ACTOR Future doBlobGranuleFileRequest(Reference bwData, Bl metadata->durableDeltaVersion.get(), metadata->pendingDeltaVersion); } + + // prune mutations based on begin version, if possible ASSERT(metadata->durableDeltaVersion.get() == metadata->pendingDeltaVersion); rep.arena.dependsOn(metadata->currentDeltas.arena()); - for (auto& delta : metadata->currentDeltas) { - if (delta.version > req.readVersion) { + MutationsAndVersionRef* mutationIt = metadata->currentDeltas.begin(); + if (granuleBeginVersion > metadata->currentDeltas.back().version) { + TEST(true); // beginVersion pruning all in-memory mutations + mutationIt = metadata->currentDeltas.end(); + } else if (granuleBeginVersion > metadata->currentDeltas.front().version) { + // binary search for beginVersion + TEST(true); // beginVersion pruning some in-memory mutations + mutationIt = std::lower_bound(metadata->currentDeltas.begin(), + metadata->currentDeltas.end(), + MutationsAndVersionRef(granuleBeginVersion, 0), + MutationsAndVersionRef::OrderByVersion()); + } + + // add mutations to response + while (mutationIt != metadata->currentDeltas.end()) { + if (mutationIt->version > req.readVersion) { + TEST(true); // readVersion pruning some in-memory mutations break; } - chunk.newDeltas.push_back_deep(rep.arena, delta); + chunk.newDeltas.push_back_deep(rep.arena, *mutationIt); + mutationIt++; } } @@ -2376,11 +2359,19 @@ ACTOR Future doBlobGranuleFileRequest(Reference bwData, Bl wait(yield(TaskPriority::DefaultEndpoint)); } + // do these together to keep them synchronous + if (req.beginVersion != 0) { + ++bwData->stats.readRequestsWithBegin; + } + if (didCollapse) { + ++bwData->stats.readRequestsCollapsed; + } ASSERT(!req.reply.isSet()); req.reply.send(rep); --bwData->stats.activeReadRequests; } catch (Error& e) { - // fmt::print("Error in BGFRequest {0}\n", e.name()); + // TODO REMOVE + fmt::print("Error in BGFRequest 
{0}\n", e.name()); if (e.code() == error_code_operation_cancelled) { req.reply.sendError(wrong_shard_server()); throw; diff --git a/tests/BGServerCommonUnit.toml b/tests/BGServerCommonUnit.toml new file mode 100644 index 0000000000..d7a5eba2ca --- /dev/null +++ b/tests/BGServerCommonUnit.toml @@ -0,0 +1,9 @@ +[[test]] +testTitle = 'BGServerCommonUnit' +useDB = false +startDelay = 0 + + [[test.workload]] + testName = 'UnitTests' + maxTestCases = 0 + testsMatching = /blobgranule/server/common/ \ No newline at end of file diff --git a/tests/CMakeLists.txt b/tests/CMakeLists.txt index 42713ad6cb..d9a3eca683 100644 --- a/tests/CMakeLists.txt +++ b/tests/CMakeLists.txt @@ -50,7 +50,8 @@ if(WITH_PYTHON) add_fdb_test(TEST_FILES s3VersionHeaders.txt IGNORE) add_fdb_test(TEST_FILES BandwidthThrottle.txt IGNORE) add_fdb_test(TEST_FILES BigInsert.txt IGNORE) - add_fdb_test(TEST_FILES BlobGranuleFileUnit.txt) + add_fdb_test(TEST_FILES BGServerCommonUnit.toml) + add_fdb_test(TEST_FILES BlobGranuleFileUnit.txt) # TODO change these to toml add_fdb_test(TEST_FILES BlobManagerUnit.txt) add_fdb_test(TEST_FILES ConsistencyCheck.txt IGNORE) add_fdb_test(TEST_FILES DDMetricsExclude.txt IGNORE)