Blob Worker side of beginVersion done, added unit test
This commit is contained in:
parent
ad6a63c16d
commit
1b1182f414
|
@ -172,7 +172,7 @@ RangeResult materializeBlobGranule(const BlobGranuleChunkRef& chunk,
|
|||
Version readVersion,
|
||||
Optional<StringRef> snapshotData,
|
||||
StringRef deltaFileData[]) {
|
||||
// TODO REMOVE with V2 of protocol
|
||||
// TODO REMOVE with early replying
|
||||
ASSERT(readVersion == chunk.includedVersion);
|
||||
ASSERT(chunk.snapshotFile.present());
|
||||
ASSERT(snapshotData.present());
|
||||
|
@ -278,8 +278,7 @@ ErrorOr<RangeResult> loadAndMaterializeBlobGranules(const Standalone<VectorRef<B
|
|||
}
|
||||
}
|
||||
|
||||
// FIXME: re-enable test!
|
||||
TEST_CASE(":/blobgranule/files/applyDelta") {
|
||||
TEST_CASE("/blobgranule/files/applyDelta") {
|
||||
printf("Testing blob granule delta applying\n");
|
||||
Arena a;
|
||||
|
||||
|
|
|
@ -52,7 +52,6 @@ ACTOR Future<Standalone<StringRef>> readFile(Reference<BackupContainerFileSystem
|
|||
StringRef dataRef(data, f.length);
|
||||
return Standalone<StringRef>(dataRef, arena);
|
||||
} catch (Error& e) {
|
||||
printf("Reading file %s got error %s\n", f.toString().c_str(), e.name());
|
||||
throw e;
|
||||
}
|
||||
}
|
||||
|
@ -68,7 +67,7 @@ ACTOR Future<RangeResult> readBlobGranule(BlobGranuleChunkRef chunk,
|
|||
Reference<BackupContainerFileSystem> bstore,
|
||||
Optional<BlobWorkerStats*> stats) {
|
||||
|
||||
// TODO REMOVE with V2 of protocol
|
||||
// TODO REMOVE with early replying
|
||||
ASSERT(readVersion == chunk.includedVersion);
|
||||
ASSERT(chunk.snapshotFile.present());
|
||||
|
||||
|
@ -106,7 +105,6 @@ ACTOR Future<RangeResult> readBlobGranule(BlobGranuleChunkRef chunk,
|
|||
return materializeBlobGranule(chunk, keyRange, readVersion, snapshotData, deltaData);
|
||||
|
||||
} catch (Error& e) {
|
||||
printf("Reading blob granule got error %s\n", e.name());
|
||||
throw e;
|
||||
}
|
||||
}
|
||||
|
@ -121,18 +119,12 @@ ACTOR Future<Void> readBlobGranules(BlobGranuleFileRequest request,
|
|||
try {
|
||||
state int i;
|
||||
for (i = 0; i < reply.chunks.size(); i++) {
|
||||
/*printf("ReadBlobGranules processing chunk %d [%s - %s)\n",
|
||||
i,
|
||||
reply.chunks[i].keyRange.begin.printable().c_str(),
|
||||
reply.chunks[i].keyRange.end.printable().c_str());*/
|
||||
RangeResult chunkResult =
|
||||
wait(readBlobGranule(reply.chunks[i], request.keyRange, request.readVersion, bstore));
|
||||
results.send(std::move(chunkResult));
|
||||
}
|
||||
// printf("ReadBlobGranules done, sending EOS\n");
|
||||
results.sendError(end_of_stream());
|
||||
} catch (Error& e) {
|
||||
printf("ReadBlobGranules got error %s\n", e.name());
|
||||
results.sendError(e);
|
||||
}
|
||||
|
||||
|
|
|
@ -38,6 +38,8 @@ struct BlobWorkerStats {
|
|||
Counter commitVersionChecks;
|
||||
Counter granuleUpdateErrors;
|
||||
Counter granuleRequestTimeouts;
|
||||
Counter readRequestsWithBegin;
|
||||
Counter readRequestsCollapsed;
|
||||
|
||||
int numRangesAssigned;
|
||||
int mutationBytesBuffered;
|
||||
|
@ -59,6 +61,7 @@ struct BlobWorkerStats {
|
|||
readReqTotalFilesReturned("ReadReqTotalFilesReturned", cc),
|
||||
readReqDeltaBytesReturned("ReadReqDeltaBytesReturned", cc), commitVersionChecks("CommitVersionChecks", cc),
|
||||
granuleUpdateErrors("GranuleUpdateErrors", cc), granuleRequestTimeouts("GranuleRequestTimeouts", cc),
|
||||
readRequestsWithBegin("ReadRequestsWithBegin", cc), readRequestsCollapsed("ReadRequestsCollapsed", cc),
|
||||
numRangesAssigned(0), mutationBytesBuffered(0), activeReadRequests(0) {
|
||||
specialCounter(cc, "NumRangesAssigned", [this]() { return this->numRangesAssigned; });
|
||||
specialCounter(cc, "MutationBytesBuffered", [this]() { return this->mutationBytesBuffered; });
|
||||
|
|
|
@ -86,13 +86,14 @@ struct BlobGranuleFileRequest {
|
|||
KeyRangeRef keyRange;
|
||||
Version beginVersion = 0;
|
||||
Version readVersion;
|
||||
bool canCollapseBegin = true;
|
||||
ReplyPromise<BlobGranuleFileReply> reply;
|
||||
|
||||
BlobGranuleFileRequest() {}
|
||||
|
||||
template <class Ar>
|
||||
void serialize(Ar& ar) {
|
||||
serializer(ar, keyRange, beginVersion, readVersion, reply, arena);
|
||||
serializer(ar, keyRange, beginVersion, readVersion, canCollapseBegin, reply, arena);
|
||||
}
|
||||
};
|
||||
|
||||
|
|
|
@ -1791,8 +1791,6 @@ Future<Standalone<VectorRef<BlobGranuleChunkRef>>> ReadYourWritesTransaction::re
|
|||
Version begin,
|
||||
Optional<Version> readVersion,
|
||||
Version* readVersionOut) {
|
||||
// Remove in V2 of API
|
||||
ASSERT(begin == 0);
|
||||
|
||||
if (!options.readYourWritesDisabled) {
|
||||
return blob_granule_no_ryw();
|
||||
|
|
|
@ -18,6 +18,7 @@
|
|||
* limitations under the License.
|
||||
*/
|
||||
|
||||
#include "contrib/fmt-8.1.1/include/fmt/format.h"
|
||||
#include "fdbclient/SystemData.h"
|
||||
#include "fdbclient/BlobGranuleCommon.h"
|
||||
#include "fdbserver/BlobGranuleServerCommon.actor.h"
|
||||
|
@ -25,6 +26,7 @@
|
|||
#include "fdbclient/FDBTypes.h"
|
||||
#include "fdbclient/ReadYourWrites.h"
|
||||
#include "flow/Arena.h"
|
||||
#include "flow/UnitTest.h"
|
||||
#include "flow/actorcompiler.h" // has to be last include
|
||||
|
||||
// Gets the latest granule history node for range that was persisted
|
||||
|
@ -102,3 +104,252 @@ ACTOR Future<GranuleFiles> loadHistoryFiles(Database cx, UID granuleID) {
|
|||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Normally a beginVersion != 0 means the caller wants all mutations between beginVersion and readVersion, instead of
|
||||
// the latest snapshot before readVersion + deltas after the snapshot. When canCollapse is set, the beginVersion is
|
||||
// essentially just an optimization hint. The caller is still concerned with reconstructing rows at readVersion, it just
|
||||
// knows it doesn't need anything before beginVersion.
|
||||
// Normally this can eliminate the need for a snapshot and just return a small amount of deltas. But in a highly active
|
||||
// key range, the granule may have a snapshot file at version X, where beginVersion < X <= readVersion. In this case, if
|
||||
// the number of bytes in delta files between beginVersion and X is larger than the snapshot file at version X, it is
|
||||
// strictly more efficient (in terms of files and bytes read) to just use the snapshot file at version X instead.
|
||||
void GranuleFiles::getFiles(Version beginVersion,
|
||||
Version readVersion,
|
||||
bool canCollapse,
|
||||
BlobGranuleChunkRef& chunk,
|
||||
Arena& replyArena) const {
|
||||
BlobFileIndex dummyIndex; // for searching
|
||||
|
||||
auto snapshotF = snapshotFiles.end();
|
||||
if (beginVersion == 0 || canCollapse) {
|
||||
dummyIndex.version = readVersion;
|
||||
snapshotF = std::lower_bound(snapshotFiles.begin(), snapshotFiles.end(), dummyIndex);
|
||||
if (snapshotF == snapshotFiles.end() || snapshotF->version > readVersion) {
|
||||
ASSERT(snapshotF != snapshotFiles.begin());
|
||||
snapshotF--;
|
||||
}
|
||||
ASSERT(snapshotF != snapshotFiles.end());
|
||||
ASSERT(snapshotF->version <= readVersion);
|
||||
}
|
||||
|
||||
auto deltaF = deltaFiles.end();
|
||||
if (beginVersion > 0) {
|
||||
dummyIndex.version = beginVersion;
|
||||
deltaF = std::lower_bound(deltaFiles.begin(), deltaFiles.end(), dummyIndex);
|
||||
if (canCollapse) {
|
||||
ASSERT(snapshotF != snapshotFiles.end());
|
||||
// see if delta files up to snapshotVersion are smaller or larger than snapshotBytes in total
|
||||
auto deltaFCopy = deltaF;
|
||||
int64_t snapshotBytes = snapshotF->length;
|
||||
while (deltaFCopy != deltaFiles.end() && deltaFCopy->version <= snapshotF->version && snapshotBytes > 0) {
|
||||
snapshotBytes -= deltaFCopy->length;
|
||||
deltaFCopy++;
|
||||
}
|
||||
// if delta files contain the same or more bytes as the snapshot with collapse, do the collapse
|
||||
if (snapshotBytes > 0) {
|
||||
// don't collapse, clear snapshotF and just do delta files
|
||||
snapshotF = snapshotFiles.end();
|
||||
} else {
|
||||
// do snapshot instead of previous deltas
|
||||
dummyIndex.version = snapshotF->version;
|
||||
deltaF = std::upper_bound(deltaFiles.begin(), deltaFiles.end(), dummyIndex);
|
||||
ASSERT(deltaF == deltaFiles.end() || deltaF->version > snapshotF->version);
|
||||
}
|
||||
}
|
||||
} else {
|
||||
dummyIndex.version = snapshotF->version;
|
||||
deltaF = std::upper_bound(deltaFiles.begin(), deltaFiles.end(), dummyIndex);
|
||||
ASSERT(deltaF == deltaFiles.end() || deltaF->version > snapshotF->version);
|
||||
}
|
||||
|
||||
Version lastIncluded = invalidVersion;
|
||||
if (snapshotF != snapshotFiles.end()) {
|
||||
chunk.snapshotVersion = snapshotF->version;
|
||||
chunk.snapshotFile = BlobFilePointerRef(replyArena, snapshotF->filename, snapshotF->offset, snapshotF->length);
|
||||
lastIncluded = chunk.snapshotVersion;
|
||||
} else {
|
||||
chunk.snapshotVersion = invalidVersion;
|
||||
}
|
||||
|
||||
int64_t deltaBytes = 0;
|
||||
while (deltaF != deltaFiles.end() && deltaF->version < readVersion) {
|
||||
chunk.deltaFiles.emplace_back_deep(replyArena, deltaF->filename, deltaF->offset, deltaF->length);
|
||||
deltaBytes += deltaF->length;
|
||||
ASSERT(lastIncluded < deltaF->version);
|
||||
lastIncluded = deltaF->version;
|
||||
deltaF++;
|
||||
}
|
||||
// include last delta file that passes readVersion, if it exists
|
||||
if (deltaF != deltaFiles.end() && lastIncluded < readVersion) {
|
||||
chunk.deltaFiles.emplace_back_deep(replyArena, deltaF->filename, deltaF->offset, deltaF->length);
|
||||
deltaBytes += deltaF->length;
|
||||
lastIncluded = deltaF->version;
|
||||
}
|
||||
|
||||
// TODO wire this up,
|
||||
// bwData->stats.readReqDeltaBytesReturned += deltaBytes;
|
||||
}
|
||||
|
||||
static std::string makeTestFileName(Version v) {
|
||||
return "test" + std::to_string(v);
|
||||
}
|
||||
|
||||
static BlobFileIndex makeTestFile(Version v, int64_t len) {
|
||||
return BlobFileIndex(v, makeTestFileName(v), 0, len);
|
||||
}
|
||||
|
||||
static void checkFile(int expectedVersion, const BlobFilePointerRef& actualFile) {
|
||||
ASSERT(makeTestFileName(expectedVersion) == actualFile.filename.toString());
|
||||
}
|
||||
|
||||
static void checkFiles(const GranuleFiles& f,
|
||||
Version beginVersion,
|
||||
Version readVersion,
|
||||
bool canCollapse,
|
||||
Optional<int> expectedSnapshotVersion,
|
||||
std::vector<int> expectedDeltaVersions) {
|
||||
Arena a;
|
||||
BlobGranuleChunkRef chunk;
|
||||
f.getFiles(beginVersion, readVersion, canCollapse, chunk, a);
|
||||
fmt::print("results({0}, {1}, {2}):\nEXPECTED: snapshot={3}\n deltas ({4}):\n",
|
||||
beginVersion,
|
||||
readVersion,
|
||||
canCollapse ? "T" : "F",
|
||||
expectedSnapshotVersion.present() ? makeTestFileName(expectedSnapshotVersion.get()).c_str() : "<N/A>",
|
||||
expectedDeltaVersions.size());
|
||||
for (int d : expectedDeltaVersions) {
|
||||
fmt::print(" {}\n", makeTestFileName(d));
|
||||
}
|
||||
fmt::print("ACTUAL:\n snapshot={0}\n deltas ({1}):\n",
|
||||
chunk.snapshotFile.present() ? chunk.snapshotFile.get().filename.toString().c_str() : "<N/A>",
|
||||
chunk.deltaFiles.size());
|
||||
for (auto& it : chunk.deltaFiles) {
|
||||
fmt::print(" {}\n", it.filename.toString());
|
||||
}
|
||||
printf("\n\n\n");
|
||||
ASSERT(expectedSnapshotVersion.present() == chunk.snapshotFile.present());
|
||||
if (expectedSnapshotVersion.present()) {
|
||||
checkFile(expectedSnapshotVersion.get(), chunk.snapshotFile.get());
|
||||
}
|
||||
ASSERT(expectedDeltaVersions.size() == chunk.deltaFiles.size());
|
||||
for (int i = 0; i < expectedDeltaVersions.size(); i++) {
|
||||
checkFile(expectedDeltaVersions[i], chunk.deltaFiles[i]);
|
||||
}
|
||||
}
|
||||
|
||||
/*
|
||||
* Files:
|
||||
* S @ 100 (10 bytes)
|
||||
* D @ 150 (5 bytes)
|
||||
* D @ 200 (6 bytes)
|
||||
* S @ 200 (15 bytes)
|
||||
* D @ 250 (7 bytes)
|
||||
* D @ 300 (8 bytes)
|
||||
* S @ 300 (10 bytes)
|
||||
* D @ 350 (4 bytes)
|
||||
*/
|
||||
TEST_CASE("/blobgranule/server/common/granulefiles") {
|
||||
// simple cases first
|
||||
|
||||
// single snapshot file, no deltas
|
||||
GranuleFiles files;
|
||||
files.snapshotFiles.push_back(makeTestFile(100, 10));
|
||||
|
||||
printf("Just snapshot\n");
|
||||
|
||||
checkFiles(files, 0, 100, false, 100, {});
|
||||
checkFiles(files, 0, 200, false, 100, {});
|
||||
|
||||
printf("Small test\n");
|
||||
// add delta files with re-snapshot at end
|
||||
files.deltaFiles.push_back(makeTestFile(150, 5));
|
||||
files.deltaFiles.push_back(makeTestFile(200, 6));
|
||||
files.snapshotFiles.push_back(makeTestFile(200, 15));
|
||||
|
||||
// check different read versions with beginVersion=0
|
||||
checkFiles(files, 0, 100, false, 100, {});
|
||||
checkFiles(files, 0, 101, false, 100, { 150 });
|
||||
checkFiles(files, 0, 149, false, 100, { 150 });
|
||||
checkFiles(files, 0, 150, false, 100, { 150 });
|
||||
checkFiles(files, 0, 151, false, 100, { 150, 200 });
|
||||
checkFiles(files, 0, 199, false, 100, { 150, 200 });
|
||||
checkFiles(files, 0, 200, false, 200, {});
|
||||
checkFiles(files, 0, 300, false, 200, {});
|
||||
|
||||
// Test all cases of beginVersion + readVersion. Because delta files are smaller than snapshot at 200, this should
|
||||
// be the same with and without collapse
|
||||
checkFiles(files, 100, 200, false, Optional<int>(), { 150, 200 });
|
||||
checkFiles(files, 100, 300, false, Optional<int>(), { 150, 200 });
|
||||
checkFiles(files, 101, 199, false, Optional<int>(), { 150, 200 });
|
||||
checkFiles(files, 149, 151, false, Optional<int>(), { 150, 200 });
|
||||
checkFiles(files, 149, 150, false, Optional<int>(), { 150 });
|
||||
checkFiles(files, 150, 151, false, Optional<int>(), { 150, 200 });
|
||||
checkFiles(files, 151, 200, false, Optional<int>(), { 200 });
|
||||
|
||||
checkFiles(files, 100, 200, true, Optional<int>(), { 150, 200 });
|
||||
checkFiles(files, 100, 300, true, Optional<int>(), { 150, 200 });
|
||||
checkFiles(files, 101, 199, true, Optional<int>(), { 150, 200 });
|
||||
checkFiles(files, 149, 151, true, Optional<int>(), { 150, 200 });
|
||||
checkFiles(files, 149, 150, true, Optional<int>(), { 150 });
|
||||
checkFiles(files, 150, 151, true, Optional<int>(), { 150, 200 });
|
||||
checkFiles(files, 151, 200, true, Optional<int>(), { 200 });
|
||||
|
||||
printf("Larger test\n");
|
||||
// add more delta files and snapshots to check collapse logic
|
||||
files.deltaFiles.push_back(makeTestFile(250, 7));
|
||||
files.deltaFiles.push_back(makeTestFile(300, 8));
|
||||
files.snapshotFiles.push_back(makeTestFile(300, 10));
|
||||
files.deltaFiles.push_back(makeTestFile(350, 4));
|
||||
|
||||
checkFiles(files, 0, 300, false, 300, {});
|
||||
checkFiles(files, 0, 301, false, 300, { 350 });
|
||||
checkFiles(files, 0, 400, false, 300, { 350 });
|
||||
|
||||
// check delta files without collapse
|
||||
|
||||
checkFiles(files, 100, 301, false, Optional<int>(), { 150, 200, 250, 300, 350 });
|
||||
checkFiles(files, 100, 300, false, Optional<int>(), { 150, 200, 250, 300 });
|
||||
checkFiles(files, 100, 251, false, Optional<int>(), { 150, 200, 250, 300 });
|
||||
checkFiles(files, 100, 250, false, Optional<int>(), { 150, 200, 250 });
|
||||
|
||||
checkFiles(files, 151, 300, false, Optional<int>(), { 200, 250, 300 });
|
||||
checkFiles(files, 151, 301, false, Optional<int>(), { 200, 250, 300, 350 });
|
||||
checkFiles(files, 151, 400, false, Optional<int>(), { 200, 250, 300, 350 });
|
||||
|
||||
checkFiles(files, 201, 300, false, Optional<int>(), { 250, 300 });
|
||||
checkFiles(files, 201, 301, false, Optional<int>(), { 250, 300, 350 });
|
||||
checkFiles(files, 201, 400, false, Optional<int>(), { 250, 300, 350 });
|
||||
|
||||
checkFiles(files, 251, 300, false, Optional<int>(), { 300 });
|
||||
checkFiles(files, 251, 301, false, Optional<int>(), { 300, 350 });
|
||||
checkFiles(files, 251, 400, false, Optional<int>(), { 300, 350 });
|
||||
checkFiles(files, 301, 400, false, Optional<int>(), { 350 });
|
||||
checkFiles(files, 351, 400, false, Optional<int>(), {});
|
||||
|
||||
// check with collapse
|
||||
// these 2 collapse because the delta files at 150+200+250+300 are larger than the snapshot at 300
|
||||
checkFiles(files, 100, 301, true, 300, { 350 });
|
||||
checkFiles(files, 100, 300, true, 300, {});
|
||||
// these 2 don't collapse because 150+200 delta files are smaller than the snapshot at 200
|
||||
checkFiles(files, 100, 251, true, Optional<int>(), { 150, 200, 250, 300 });
|
||||
checkFiles(files, 100, 250, true, Optional<int>(), { 150, 200, 250 });
|
||||
|
||||
// these 3 do collapse because the delta files at 200+250+300 are larger than the snapshot at 300
|
||||
checkFiles(files, 151, 300, true, 300, {});
|
||||
checkFiles(files, 151, 301, true, 300, { 350 });
|
||||
checkFiles(files, 151, 400, true, 300, { 350 });
|
||||
|
||||
// these 3 do collapse because the delta files at 250+300 are larger than the snapshot at 300
|
||||
checkFiles(files, 201, 300, true, 300, {});
|
||||
checkFiles(files, 201, 301, true, 300, { 350 });
|
||||
checkFiles(files, 201, 400, true, 300, { 350 });
|
||||
|
||||
// these don't collapse because the delta file at 300 is smaller than the snapshot at 300
|
||||
checkFiles(files, 251, 300, true, Optional<int>(), { 300 });
|
||||
checkFiles(files, 251, 301, true, Optional<int>(), { 300, 350 });
|
||||
checkFiles(files, 251, 400, true, Optional<int>(), { 300, 350 });
|
||||
checkFiles(files, 301, 400, true, Optional<int>(), { 350 });
|
||||
checkFiles(files, 351, 400, true, Optional<int>(), {});
|
||||
|
||||
return Void();
|
||||
}
|
|
@ -54,12 +54,22 @@ struct BlobFileIndex {
|
|||
|
||||
BlobFileIndex(Version version, std::string filename, int64_t offset, int64_t length)
|
||||
: version(version), filename(filename), offset(offset), length(length) {}
|
||||
|
||||
// compare on version
|
||||
bool operator<(const BlobFileIndex& r) const { return version < r.version; }
|
||||
};
|
||||
|
||||
// FIXME: initialize these to smaller default sizes to save a bit of memory, particularly snapshotFiles
|
||||
// Stores the files that comprise a blob granule
|
||||
struct GranuleFiles {
|
||||
std::deque<BlobFileIndex> snapshotFiles;
|
||||
std::deque<BlobFileIndex> deltaFiles;
|
||||
std::vector<BlobFileIndex> snapshotFiles;
|
||||
std::vector<BlobFileIndex> deltaFiles;
|
||||
|
||||
void getFiles(Version beginVersion,
|
||||
Version readVersion,
|
||||
bool canCollapse,
|
||||
BlobGranuleChunkRef& chunk,
|
||||
Arena& replyArena) const;
|
||||
};
|
||||
|
||||
class Transaction;
|
||||
|
|
|
@ -2778,7 +2778,7 @@ ACTOR Future<Void> blobManager(BlobManagerInterface bmInterf,
|
|||
// DB has [A - B) and [C - D). They should show up in knownBlobRanges, and [B - C) should be in removed.
|
||||
// DB has [B - C). It should show up in knownBlobRanges, [B - C) should be in added, and [A - B) and [C - D)
|
||||
// should be in removed.
|
||||
TEST_CASE(":/blobmanager/updateranges") {
|
||||
TEST_CASE("/blobmanager/updateranges") {
|
||||
KeyRangeMap<bool> knownBlobRanges(false, normalKeys.end);
|
||||
Arena ar;
|
||||
|
||||
|
|
|
@ -18,6 +18,7 @@
|
|||
* limitations under the License.
|
||||
*/
|
||||
|
||||
#include <limits>
|
||||
#include <tuple>
|
||||
#include <utility>
|
||||
#include <vector>
|
||||
|
@ -43,9 +44,10 @@
|
|||
#include "flow/Error.h"
|
||||
#include "flow/IRandom.h"
|
||||
#include "flow/Trace.h"
|
||||
#include "flow/actorcompiler.h" // has to be last include
|
||||
#include "flow/network.h"
|
||||
|
||||
#include "flow/actorcompiler.h" // has to be last include
|
||||
|
||||
#define BW_DEBUG false
|
||||
#define BW_REQUEST_DEBUG false
|
||||
|
||||
|
@ -2100,9 +2102,13 @@ ACTOR Future<Void> doBlobGranuleFileRequest(Reference<BlobWorkerData> bwData, Bl
|
|||
req.readVersion);
|
||||
}
|
||||
|
||||
state bool didCollapse = false;
|
||||
try {
|
||||
// TODO REMOVE in api V2
|
||||
ASSERT(req.beginVersion == 0);
|
||||
// TODO remove requirement for canCollapseBegin once we implement early replying
|
||||
ASSERT(req.beginVersion == 0 || req.canCollapseBegin);
|
||||
if (req.beginVersion != 0) {
|
||||
ASSERT(req.beginVersion > 0);
|
||||
}
|
||||
state BlobGranuleFileReply rep;
|
||||
state std::vector<Reference<GranuleMetadata>> granules;
|
||||
|
||||
|
@ -2150,6 +2156,7 @@ ACTOR Future<Void> doBlobGranuleFileRequest(Reference<BlobWorkerData> bwData, Bl
|
|||
continue;
|
||||
}
|
||||
state Reference<GranuleMetadata> metadata = m;
|
||||
state Version granuleBeginVersion = req.beginVersion;
|
||||
|
||||
choose {
|
||||
when(wait(metadata->readable.getFuture())) {}
|
||||
|
@ -2290,67 +2297,25 @@ ACTOR Future<Void> doBlobGranuleFileRequest(Reference<BlobWorkerData> bwData, Bl
|
|||
// granule is up to date, do read
|
||||
ASSERT(metadata->cancelled.canBeSet());
|
||||
|
||||
// Right now we force a collapse if the version range crosses granule boundaries, for simplicity
|
||||
if (chunkFiles.snapshotFiles.front().version < granuleBeginVersion) {
|
||||
didCollapse = true;
|
||||
granuleBeginVersion = 0;
|
||||
}
|
||||
BlobGranuleChunkRef chunk;
|
||||
// TODO change in V2
|
||||
// TODO change with early reply
|
||||
chunk.includedVersion = req.readVersion;
|
||||
chunk.keyRange = KeyRangeRef(StringRef(rep.arena, chunkRange.begin), StringRef(rep.arena, chunkRange.end));
|
||||
|
||||
// handle snapshot files
|
||||
// TODO refactor the "find snapshot file" logic to GranuleFiles?
|
||||
// FIXME: binary search instead of linear search, especially when file count is large
|
||||
int i = chunkFiles.snapshotFiles.size() - 1;
|
||||
while (i >= 0 && chunkFiles.snapshotFiles[i].version > req.readVersion) {
|
||||
i--;
|
||||
}
|
||||
// because of granule history, we should always be able to find the desired snapshot
|
||||
// version, and have thrown blob_granule_transaction_too_old earlier if not possible.
|
||||
if (i < 0) {
|
||||
fmt::print("req @ {0} >= initial snapshot {1} but can't find snapshot in ({2}) files:\n",
|
||||
req.readVersion,
|
||||
metadata->initialSnapshotVersion,
|
||||
chunkFiles.snapshotFiles.size());
|
||||
for (auto& f : chunkFiles.snapshotFiles) {
|
||||
fmt::print(" {0}", f.version);
|
||||
}
|
||||
}
|
||||
ASSERT(i >= 0);
|
||||
|
||||
BlobFileIndex snapshotF = chunkFiles.snapshotFiles[i];
|
||||
chunk.snapshotFile = BlobFilePointerRef(rep.arena, snapshotF.filename, snapshotF.offset, snapshotF.length);
|
||||
Version snapshotVersion = chunkFiles.snapshotFiles[i].version;
|
||||
chunk.snapshotVersion = snapshotVersion;
|
||||
|
||||
// handle delta files
|
||||
// cast this to an int so i going to -1 still compares properly
|
||||
int lastDeltaFileIdx = chunkFiles.deltaFiles.size() - 1;
|
||||
i = lastDeltaFileIdx;
|
||||
// skip delta files that are too new
|
||||
while (i >= 0 && chunkFiles.deltaFiles[i].version > req.readVersion) {
|
||||
i--;
|
||||
}
|
||||
if (i < lastDeltaFileIdx) {
|
||||
// we skipped one file at the end with a larger read version, this will actually contain
|
||||
// our query version, so add it back.
|
||||
i++;
|
||||
}
|
||||
// only include delta files after the snapshot file
|
||||
int j = i;
|
||||
while (j >= 0 && chunkFiles.deltaFiles[j].version > snapshotVersion) {
|
||||
j--;
|
||||
}
|
||||
j++;
|
||||
while (j <= i) {
|
||||
BlobFileIndex deltaF = chunkFiles.deltaFiles[j];
|
||||
chunk.deltaFiles.emplace_back_deep(rep.arena, deltaF.filename, deltaF.offset, deltaF.length);
|
||||
bwData->stats.readReqDeltaBytesReturned += deltaF.length;
|
||||
j++;
|
||||
chunkFiles.getFiles(granuleBeginVersion, req.readVersion, req.canCollapseBegin, chunk, rep.arena);
|
||||
if (granuleBeginVersion > 0 && chunk.snapshotFile.present()) {
|
||||
didCollapse = true;
|
||||
}
|
||||
|
||||
// new deltas (if version is larger than version of last delta file)
|
||||
// FIXME: do trivial key bounds here if key range is not fully contained in request key
|
||||
// range
|
||||
|
||||
if (req.readVersion > metadata->durableDeltaVersion.get()) {
|
||||
if (req.readVersion > metadata->durableDeltaVersion.get() && metadata->currentDeltas.size()) {
|
||||
if (metadata->durableDeltaVersion.get() != metadata->pendingDeltaVersion) {
|
||||
fmt::print("real-time read [{0} - {1}) @ {2} doesn't have mutations!! durable={3}, pending={4}\n",
|
||||
metadata->keyRange.begin.printable(),
|
||||
|
@ -2359,13 +2324,31 @@ ACTOR Future<Void> doBlobGranuleFileRequest(Reference<BlobWorkerData> bwData, Bl
|
|||
metadata->durableDeltaVersion.get(),
|
||||
metadata->pendingDeltaVersion);
|
||||
}
|
||||
|
||||
// prune mutations based on begin version, if possible
|
||||
ASSERT(metadata->durableDeltaVersion.get() == metadata->pendingDeltaVersion);
|
||||
rep.arena.dependsOn(metadata->currentDeltas.arena());
|
||||
for (auto& delta : metadata->currentDeltas) {
|
||||
if (delta.version > req.readVersion) {
|
||||
MutationsAndVersionRef* mutationIt = metadata->currentDeltas.begin();
|
||||
if (granuleBeginVersion > metadata->currentDeltas.back().version) {
|
||||
TEST(true); // beginVersion pruning all in-memory mutations
|
||||
mutationIt = metadata->currentDeltas.end();
|
||||
} else if (granuleBeginVersion > metadata->currentDeltas.front().version) {
|
||||
// binary search for beginVersion
|
||||
TEST(true); // beginVersion pruning some in-memory mutations
|
||||
mutationIt = std::lower_bound(metadata->currentDeltas.begin(),
|
||||
metadata->currentDeltas.end(),
|
||||
MutationsAndVersionRef(granuleBeginVersion, 0),
|
||||
MutationsAndVersionRef::OrderByVersion());
|
||||
}
|
||||
|
||||
// add mutations to response
|
||||
while (mutationIt != metadata->currentDeltas.end()) {
|
||||
if (mutationIt->version > req.readVersion) {
|
||||
TEST(true); // readVersion pruning some in-memory mutations
|
||||
break;
|
||||
}
|
||||
chunk.newDeltas.push_back_deep(rep.arena, delta);
|
||||
chunk.newDeltas.push_back_deep(rep.arena, *mutationIt);
|
||||
mutationIt++;
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -2376,11 +2359,19 @@ ACTOR Future<Void> doBlobGranuleFileRequest(Reference<BlobWorkerData> bwData, Bl
|
|||
|
||||
wait(yield(TaskPriority::DefaultEndpoint));
|
||||
}
|
||||
// do these together to keep them synchronous
|
||||
if (req.beginVersion != 0) {
|
||||
++bwData->stats.readRequestsWithBegin;
|
||||
}
|
||||
if (didCollapse) {
|
||||
++bwData->stats.readRequestsCollapsed;
|
||||
}
|
||||
ASSERT(!req.reply.isSet());
|
||||
req.reply.send(rep);
|
||||
--bwData->stats.activeReadRequests;
|
||||
} catch (Error& e) {
|
||||
// fmt::print("Error in BGFRequest {0}\n", e.name());
|
||||
// TODO REMOVE
|
||||
fmt::print("Error in BGFRequest {0}\n", e.name());
|
||||
if (e.code() == error_code_operation_cancelled) {
|
||||
req.reply.sendError(wrong_shard_server());
|
||||
throw;
|
||||
|
|
|
@ -0,0 +1,9 @@
|
|||
[[test]]
|
||||
testTitle = 'BGServerCommonUnit'
|
||||
useDB = false
|
||||
startDelay = 0
|
||||
|
||||
[[test.workload]]
|
||||
testName = 'UnitTests'
|
||||
maxTestCases = 0
|
||||
testsMatching = /blobgranule/server/common/
|
|
@ -50,7 +50,8 @@ if(WITH_PYTHON)
|
|||
add_fdb_test(TEST_FILES s3VersionHeaders.txt IGNORE)
|
||||
add_fdb_test(TEST_FILES BandwidthThrottle.txt IGNORE)
|
||||
add_fdb_test(TEST_FILES BigInsert.txt IGNORE)
|
||||
add_fdb_test(TEST_FILES BlobGranuleFileUnit.txt)
|
||||
add_fdb_test(TEST_FILES BGServerCommonUnit.toml)
|
||||
add_fdb_test(TEST_FILES BlobGranuleFileUnit.txt) # TODO change these to toml
|
||||
add_fdb_test(TEST_FILES BlobManagerUnit.txt)
|
||||
add_fdb_test(TEST_FILES ConsistencyCheck.txt IGNORE)
|
||||
add_fdb_test(TEST_FILES DDMetricsExclude.txt IGNORE)
|
||||
|
|
Loading…
Reference in New Issue