This commit is contained in:
Josh Slocum 2022-03-24 09:53:42 -05:00
parent 989dd8d7eb
commit 3cd1e5599e
10 changed files with 40 additions and 40 deletions

View File

@ -132,13 +132,6 @@ static void applyDeltas(const GranuleDeltas& deltas,
const MutationsAndVersionRef* mutationIt = deltas.begin();
// prune beginVersion if necessary
if (beginVersion > deltas.front().version) {
if (beginVersion > deltas.back().version) {
printf("beginVersion=%lld, deltas.front=%lld, deltas.back=%lld, deltas.size=%d\n",
beginVersion,
deltas.front().version,
deltas.back().version,
deltas.size());
}
ASSERT(beginVersion <= deltas.back().version);
// binary search for beginVersion
mutationIt = std::lower_bound(deltas.begin(),
@ -172,7 +165,7 @@ static Arena loadDeltaFile(StringRef deltaData,
reader.deserialize(FileIdentifierFor<GranuleDeltas>::value, deltas, parseArena);
if (BG_READ_DEBUG) {
fmt::print("Parsed {}} deltas from file\n", deltas.size());
fmt::print("Parsed {} deltas from file\n", deltas.size());
}
// TODO REMOVE sanity check

View File

@ -117,9 +117,11 @@ void GranuleFiles::getFiles(Version beginVersion,
Version readVersion,
bool canCollapse,
BlobGranuleChunkRef& chunk,
Arena& replyArena) const {
Arena& replyArena,
int64_t& deltaBytesCounter) const {
BlobFileIndex dummyIndex; // for searching
// if beginVersion == 0 or we can collapse, find the latest snapshot <= readVersion
auto snapshotF = snapshotFiles.end();
if (beginVersion == 0 || canCollapse) {
dummyIndex.version = readVersion;
@ -138,7 +140,8 @@ void GranuleFiles::getFiles(Version beginVersion,
deltaF = std::lower_bound(deltaFiles.begin(), deltaFiles.end(), dummyIndex);
if (canCollapse) {
ASSERT(snapshotF != snapshotFiles.end());
// see if delta files up to snapshotVersion are smaller or larger than snapshotBytes in total
// If we can collapse, see if delta files up to snapshotVersion are smaller or larger than snapshotBytes in
// total
auto deltaFCopy = deltaF;
int64_t snapshotBytes = snapshotF->length;
while (deltaFCopy != deltaFiles.end() && deltaFCopy->version <= snapshotF->version && snapshotBytes > 0) {
@ -171,10 +174,9 @@ void GranuleFiles::getFiles(Version beginVersion,
chunk.snapshotVersion = invalidVersion;
}
int64_t deltaBytes = 0;
while (deltaF != deltaFiles.end() && deltaF->version < readVersion) {
chunk.deltaFiles.emplace_back_deep(replyArena, deltaF->filename, deltaF->offset, deltaF->length);
deltaBytes += deltaF->length;
deltaBytesCounter += deltaF->length;
ASSERT(lastIncluded < deltaF->version);
lastIncluded = deltaF->version;
deltaF++;
@ -182,12 +184,9 @@ void GranuleFiles::getFiles(Version beginVersion,
// include last delta file that passes readVersion, if it exists
if (deltaF != deltaFiles.end() && lastIncluded < readVersion) {
chunk.deltaFiles.emplace_back_deep(replyArena, deltaF->filename, deltaF->offset, deltaF->length);
deltaBytes += deltaF->length;
deltaBytesCounter += deltaF->length;
lastIncluded = deltaF->version;
}
// TODO wire this up,
// bwData->stats.readReqDeltaBytesReturned += deltaBytes;
}
static std::string makeTestFileName(Version v) {
@ -210,8 +209,9 @@ static void checkFiles(const GranuleFiles& f,
std::vector<int> expectedDeltaVersions) {
Arena a;
BlobGranuleChunkRef chunk;
f.getFiles(beginVersion, readVersion, canCollapse, chunk, a);
fmt::print("results({0}, {1}, {2}):\nEXPECTED: snapshot={3}\n deltas ({4}):\n",
int64_t deltaBytes = 0;
f.getFiles(beginVersion, readVersion, canCollapse, chunk, a, deltaBytes);
fmt::print("results({0}, {1}, {2}):\nEXPECTED:\n snapshot={3}\n deltas ({4}):\n",
beginVersion,
readVersion,
canCollapse ? "T" : "F",

View File

@ -69,7 +69,8 @@ struct GranuleFiles {
Version readVersion,
bool canCollapse,
BlobGranuleChunkRef& chunk,
Arena& replyArena) const;
Arena& replyArena,
int64_t& deltaBytesCounter) const;
};
class Transaction;

View File

@ -2313,7 +2313,10 @@ ACTOR Future<Void> doBlobGranuleFileRequest(Reference<BlobWorkerData> bwData, Bl
chunk.includedVersion = req.readVersion;
chunk.keyRange = KeyRangeRef(StringRef(rep.arena, chunkRange.begin), StringRef(rep.arena, chunkRange.end));
chunkFiles.getFiles(granuleBeginVersion, req.readVersion, req.canCollapseBegin, chunk, rep.arena);
int64_t deltaBytes = 0;
chunkFiles.getFiles(
granuleBeginVersion, req.readVersion, req.canCollapseBegin, chunk, rep.arena, deltaBytes);
bwData->stats.readReqDeltaBytesReturned += deltaBytes;
if (granuleBeginVersion > 0 && chunk.snapshotFile.present()) {
TEST(true); // collapsed begin version request for efficiency
didCollapse = true;
@ -2378,8 +2381,6 @@ ACTOR Future<Void> doBlobGranuleFileRequest(Reference<BlobWorkerData> bwData, Bl
req.reply.send(rep);
--bwData->stats.activeReadRequests;
} catch (Error& e) {
// TODO REMOVE
fmt::print("Error in BGFRequest {0}\n", e.name());
if (e.code() == error_code_operation_cancelled) {
req.reply.sendError(wrong_shard_server());
throw;

View File

@ -1,5 +1,5 @@
[[test]]
testTitle = 'BGServerCommonUnit'
testTitle = 'BlobGranuleServerCommonUnit'
useDB = false
startDelay = 0

View File

@ -0,0 +1,10 @@
[[test]]
testTitle = 'BlobGranuleFileUnit'
useDB = false
startDelay = 0
[[test.workload]]
testName = 'UnitTests'
maxTestCases = 0
testsMatching = /blobgranule/files/

View File

@ -1,7 +0,0 @@
testTitle=UnitTests
startDelay=0
useDB=false
testName=UnitTests
maxTestCases=0
testsMatching=/blobgranule/

View File

@ -0,0 +1,9 @@
[[test]]
testTitle = 'BlobManagerUnit'
useDB = false
startDelay = 0
[[test.workload]]
testName = 'UnitTests'
maxTestCases = 0
testsMatching = /blobmanager/

View File

@ -1,7 +0,0 @@
testTitle=UnitTests
startDelay=0
useDB=false
testName=UnitTests
maxTestCases=0
testsMatching=/blobmanager/

View File

@ -51,8 +51,8 @@ if(WITH_PYTHON)
add_fdb_test(TEST_FILES BandwidthThrottle.txt IGNORE)
add_fdb_test(TEST_FILES BigInsert.txt IGNORE)
add_fdb_test(TEST_FILES BGServerCommonUnit.toml)
add_fdb_test(TEST_FILES BlobGranuleFileUnit.txt) # TODO change these to toml
add_fdb_test(TEST_FILES BlobManagerUnit.txt)
add_fdb_test(TEST_FILES BlobGranuleFileUnit.toml)
add_fdb_test(TEST_FILES BlobManagerUnit.toml)
add_fdb_test(TEST_FILES ConsistencyCheck.txt IGNORE)
add_fdb_test(TEST_FILES DDMetricsExclude.txt IGNORE)
add_fdb_test(TEST_FILES DataDistributionMetrics.txt IGNORE)