Merge branch 'main' into s3_sdk_credentials

This commit is contained in:
Josh Slocum 2022-03-24 18:30:47 -05:00
commit b41bb33d15
21 changed files with 508 additions and 140 deletions

View File

@ -119,29 +119,43 @@ static void applyDelta(KeyRangeRef keyRange, MutationRef m, std::map<KeyRef, Val
static void applyDeltas(const GranuleDeltas& deltas,
KeyRangeRef keyRange,
Version beginVersion,
Version readVersion,
Version& lastFileEndVersion,
std::map<KeyRef, ValueRef>& dataMap) {
if (!deltas.empty()) {
// check that consecutive delta file versions are disjoint
ASSERT(lastFileEndVersion < deltas.front().version);
if (deltas.empty()) {
return;
}
for (const MutationsAndVersionRef& delta : deltas) {
if (delta.version > readVersion) {
// check that consecutive delta file versions are disjoint
ASSERT(lastFileEndVersion < deltas.front().version);
const MutationsAndVersionRef* mutationIt = deltas.begin();
// prune beginVersion if necessary
if (beginVersion > deltas.front().version) {
ASSERT(beginVersion <= deltas.back().version);
// binary search for beginVersion
mutationIt = std::lower_bound(deltas.begin(),
deltas.end(),
MutationsAndVersionRef(beginVersion, 0),
MutationsAndVersionRef::OrderByVersion());
}
while (mutationIt != deltas.end()) {
if (mutationIt->version > readVersion) {
lastFileEndVersion = readVersion;
return;
}
for (auto& m : delta.mutations) {
for (auto& m : mutationIt->mutations) {
applyDelta(keyRange, m, dataMap);
}
mutationIt++;
}
if (!deltas.empty()) {
lastFileEndVersion = deltas.back().version;
}
lastFileEndVersion = deltas.back().version;
}
static Arena loadDeltaFile(StringRef deltaData,
KeyRangeRef keyRange,
Version beginVersion,
Version readVersion,
Version& lastFileEndVersion,
std::map<KeyRef, ValueRef>& dataMap) {
@ -151,7 +165,7 @@ static Arena loadDeltaFile(StringRef deltaData,
reader.deserialize(FileIdentifierFor<GranuleDeltas>::value, deltas, parseArena);
if (BG_READ_DEBUG) {
fmt::print("Parsed {}} deltas from file\n", deltas.size());
fmt::print("Parsed {} deltas from file\n", deltas.size());
}
// TODO REMOVE sanity check
@ -163,19 +177,18 @@ static Arena loadDeltaFile(StringRef deltaData,
ASSERT(deltas[i].version <= deltas[i + 1].version);
}
applyDeltas(deltas, keyRange, readVersion, lastFileEndVersion, dataMap);
applyDeltas(deltas, keyRange, beginVersion, readVersion, lastFileEndVersion, dataMap);
return parseArena;
}
RangeResult materializeBlobGranule(const BlobGranuleChunkRef& chunk,
KeyRangeRef keyRange,
Version beginVersion,
Version readVersion,
Optional<StringRef> snapshotData,
StringRef deltaFileData[]) {
// TODO REMOVE with V2 of protocol
// TODO REMOVE with early replying
ASSERT(readVersion == chunk.includedVersion);
ASSERT(chunk.snapshotFile.present());
ASSERT(snapshotData.present());
// Arena to hold all allocations for applying deltas. Most of it, and the arenas produced by reading the files,
// will likely be tossed if there are a significant number of mutations, so we copy at the end instead of doing a
@ -195,13 +208,14 @@ RangeResult materializeBlobGranule(const BlobGranuleChunkRef& chunk,
fmt::print("Applying {} delta files\n", chunk.deltaFiles.size());
}
for (int deltaIdx = 0; deltaIdx < chunk.deltaFiles.size(); deltaIdx++) {
Arena deltaArena = loadDeltaFile(deltaFileData[deltaIdx], keyRange, readVersion, lastFileEndVersion, dataMap);
Arena deltaArena =
loadDeltaFile(deltaFileData[deltaIdx], keyRange, beginVersion, readVersion, lastFileEndVersion, dataMap);
arena.dependsOn(deltaArena);
}
if (BG_READ_DEBUG) {
fmt::print("Applying {} memory deltas\n", chunk.newDeltas.size());
}
applyDeltas(chunk.newDeltas, keyRange, readVersion, lastFileEndVersion, dataMap);
applyDeltas(chunk.newDeltas, keyRange, beginVersion, readVersion, lastFileEndVersion, dataMap);
RangeResult ret;
for (auto& it : dataMap) {
@ -262,7 +276,7 @@ ErrorOr<RangeResult> loadAndMaterializeBlobGranules(const Standalone<VectorRef<B
}
// materialize rows from chunk
chunkRows = materializeBlobGranule(chunk, keyRange, readVersion, snapshotData, deltaData);
chunkRows = materializeBlobGranule(chunk, keyRange, beginVersion, readVersion, snapshotData, deltaData);
results.arena().dependsOn(chunkRows.arena());
results.append(results.arena(), chunkRows.begin(), chunkRows.size());
@ -278,8 +292,7 @@ ErrorOr<RangeResult> loadAndMaterializeBlobGranules(const Standalone<VectorRef<B
}
}
// FIXME: re-enable test!
TEST_CASE(":/blobgranule/files/applyDelta") {
TEST_CASE("/blobgranule/files/applyDelta") {
printf("Testing blob granule delta applying\n");
Arena a;

View File

@ -33,6 +33,7 @@ ErrorOr<RangeResult> loadAndMaterializeBlobGranules(const Standalone<VectorRef<B
RangeResult materializeBlobGranule(const BlobGranuleChunkRef& chunk,
KeyRangeRef keyRange,
Version beginVersion,
Version readVersion,
Optional<StringRef> snapshotData,
StringRef deltaFileData[]);

View File

@ -28,6 +28,7 @@
#include "fdbclient/BlobGranuleReader.actor.h"
#include "fdbclient/BlobWorkerCommon.h"
#include "fdbclient/BlobWorkerInterface.h"
#include "fdbclient/FDBTypes.h"
#include "flow/actorcompiler.h" // This must be the last #include.
// TODO more efficient data structure besides std::map? PTree is unnecessary since this isn't versioned, but some other
@ -52,7 +53,6 @@ ACTOR Future<Standalone<StringRef>> readFile(Reference<BackupContainerFileSystem
StringRef dataRef(data, f.length);
return Standalone<StringRef>(dataRef, arena);
} catch (Error& e) {
printf("Reading file %s got error %s\n", f.toString().c_str(), e.name());
throw e;
}
}
@ -64,22 +64,25 @@ ACTOR Future<Standalone<StringRef>> readFile(Reference<BackupContainerFileSystem
// sub-functions that BlobGranuleFiles actually exposes?
ACTOR Future<RangeResult> readBlobGranule(BlobGranuleChunkRef chunk,
KeyRangeRef keyRange,
Version beginVersion,
Version readVersion,
Reference<BackupContainerFileSystem> bstore,
Optional<BlobWorkerStats*> stats) {
// TODO REMOVE with V2 of protocol
// TODO REMOVE with early replying
ASSERT(readVersion == chunk.includedVersion);
ASSERT(chunk.snapshotFile.present());
state Arena arena;
try {
Future<Standalone<StringRef>> readSnapshotFuture = readFile(bstore, chunk.snapshotFile.get());
state std::vector<Future<Standalone<StringRef>>> readDeltaFutures;
if (stats.present()) {
++stats.get()->s3GetReqs;
Future<Standalone<StringRef>> readSnapshotFuture;
if (chunk.snapshotFile.present()) {
readSnapshotFuture = readFile(bstore, chunk.snapshotFile.get());
if (stats.present()) {
++stats.get()->s3GetReqs;
}
}
state std::vector<Future<Standalone<StringRef>>> readDeltaFutures;
readDeltaFutures.reserve(chunk.deltaFiles.size());
for (BlobFilePointerRef deltaFile : chunk.deltaFiles) {
@ -89,8 +92,12 @@ ACTOR Future<RangeResult> readBlobGranule(BlobGranuleChunkRef chunk,
}
}
state Standalone<StringRef> snapshotData = wait(readSnapshotFuture);
arena.dependsOn(snapshotData.arena());
state Optional<StringRef> snapshotData; // not present if snapshotFile isn't present
if (chunk.snapshotFile.present()) {
state Standalone<StringRef> s = wait(readSnapshotFuture);
arena.dependsOn(s.arena());
snapshotData = s;
}
state int numDeltaFiles = chunk.deltaFiles.size();
state StringRef* deltaData = new (arena) StringRef[numDeltaFiles];
@ -103,10 +110,9 @@ ACTOR Future<RangeResult> readBlobGranule(BlobGranuleChunkRef chunk,
arena.dependsOn(data.arena());
}
return materializeBlobGranule(chunk, keyRange, readVersion, snapshotData, deltaData);
return materializeBlobGranule(chunk, keyRange, beginVersion, readVersion, snapshotData, deltaData);
} catch (Error& e) {
printf("Reading blob granule got error %s\n", e.name());
throw e;
}
}
@ -121,18 +127,12 @@ ACTOR Future<Void> readBlobGranules(BlobGranuleFileRequest request,
try {
state int i;
for (i = 0; i < reply.chunks.size(); i++) {
/*printf("ReadBlobGranules processing chunk %d [%s - %s)\n",
i,
reply.chunks[i].keyRange.begin.printable().c_str(),
reply.chunks[i].keyRange.end.printable().c_str());*/
RangeResult chunkResult =
wait(readBlobGranule(reply.chunks[i], request.keyRange, request.readVersion, bstore));
RangeResult chunkResult = wait(
readBlobGranule(reply.chunks[i], request.keyRange, request.beginVersion, request.readVersion, bstore));
results.send(std::move(chunkResult));
}
// printf("ReadBlobGranules done, sending EOS\n");
results.sendError(end_of_stream());
} catch (Error& e) {
printf("ReadBlobGranules got error %s\n", e.name());
results.sendError(e);
}

View File

@ -40,6 +40,7 @@
// the request
ACTOR Future<RangeResult> readBlobGranule(BlobGranuleChunkRef chunk,
KeyRangeRef keyRange,
Version beginVersion,
Version readVersion,
Reference<BackupContainerFileSystem> bstore,
Optional<BlobWorkerStats*> stats = Optional<BlobWorkerStats*>());

View File

@ -38,6 +38,8 @@ struct BlobWorkerStats {
Counter commitVersionChecks;
Counter granuleUpdateErrors;
Counter granuleRequestTimeouts;
Counter readRequestsWithBegin;
Counter readRequestsCollapsed;
int numRangesAssigned;
int mutationBytesBuffered;
@ -59,6 +61,7 @@ struct BlobWorkerStats {
readReqTotalFilesReturned("ReadReqTotalFilesReturned", cc),
readReqDeltaBytesReturned("ReadReqDeltaBytesReturned", cc), commitVersionChecks("CommitVersionChecks", cc),
granuleUpdateErrors("GranuleUpdateErrors", cc), granuleRequestTimeouts("GranuleRequestTimeouts", cc),
readRequestsWithBegin("ReadRequestsWithBegin", cc), readRequestsCollapsed("ReadRequestsCollapsed", cc),
numRangesAssigned(0), mutationBytesBuffered(0), activeReadRequests(0) {
specialCounter(cc, "NumRangesAssigned", [this]() { return this->numRangesAssigned; });
specialCounter(cc, "MutationBytesBuffered", [this]() { return this->mutationBytesBuffered; });

View File

@ -86,13 +86,14 @@ struct BlobGranuleFileRequest {
KeyRangeRef keyRange;
Version beginVersion = 0;
Version readVersion;
bool canCollapseBegin = true;
ReplyPromise<BlobGranuleFileReply> reply;
BlobGranuleFileRequest() {}
template <class Ar>
void serialize(Ar& ar) {
serializer(ar, keyRange, beginVersion, readVersion, reply, arena);
serializer(ar, keyRange, beginVersion, readVersion, canCollapseBegin, reply, arena);
}
};

View File

@ -7442,6 +7442,7 @@ ACTOR Future<Standalone<VectorRef<BlobGranuleChunkRef>>> readBlobGranulesActor(
req.keyRange = KeyRangeRef(StringRef(req.arena, granuleStartKey), StringRef(req.arena, granuleEndKey));
req.beginVersion = begin;
req.readVersion = rv;
req.canCollapseBegin = true; // TODO make this a parameter once we support it
std::vector<Reference<ReferencedInterface<BlobWorkerInterface>>> v;
v.push_back(

View File

@ -1791,8 +1791,6 @@ Future<Standalone<VectorRef<BlobGranuleChunkRef>>> ReadYourWritesTransaction::re
Version begin,
Optional<Version> readVersion,
Version* readVersionOut) {
// Remove in V2 of API
ASSERT(begin == 0);
if (!options.readYourWritesDisabled) {
return blob_granule_no_ryw();

View File

@ -18,6 +18,7 @@
* limitations under the License.
*/
#include "contrib/fmt-8.1.1/include/fmt/format.h"
#include "fdbclient/SystemData.h"
#include "fdbclient/BlobGranuleCommon.h"
#include "fdbserver/BlobGranuleServerCommon.actor.h"
@ -25,6 +26,7 @@
#include "fdbclient/FDBTypes.h"
#include "fdbclient/ReadYourWrites.h"
#include "flow/Arena.h"
#include "flow/UnitTest.h"
#include "flow/actorcompiler.h" // has to be last include
// Gets the latest granule history node for range that was persisted
@ -102,3 +104,252 @@ ACTOR Future<GranuleFiles> loadHistoryFiles(Database cx, UID granuleID) {
}
}
}
// Normally a beginVersion != 0 means the caller wants all mutations between beginVersion and readVersion, instead of
// the latest snapshot before readVersion plus the deltas after the snapshot. When canCollapse is set, beginVersion is
// essentially just an optimization hint: the caller is still concerned with reconstructing rows at readVersion, it
// just knows it doesn't need anything before beginVersion.
// Normally this can eliminate the need for a snapshot and return only a small number of deltas. But in a highly
// active key range, the granule may have a snapshot file at version X, where beginVersion < X <= readVersion. In this
// case, if the number of bytes in delta files between beginVersion and X is larger than the snapshot file at version
// X, it is strictly more efficient (in terms of files and bytes read) to use the snapshot file at version X instead.
// Populates `chunk` with the snapshot/delta file pointers needed to reconstruct rows at readVersion,
// using beginVersion as a lower bound on the history the caller cares about.
//   - beginVersion == 0: latest snapshot <= readVersion plus the delta files after it.
//   - beginVersion > 0 && !canCollapse: delta-only read starting at beginVersion (no snapshot file).
//   - beginVersion > 0 && canCollapse: beginVersion is only a hint; collapse to a snapshot read when
//     the delta files the snapshot would replace total at least as many bytes as the snapshot file.
// File pointers are allocated in replyArena; deltaBytesCounter is incremented by the total length of
// the delta files selected.
// Assumes snapshotFiles and deltaFiles are sorted by version, and (when beginVersion == 0 or
// canCollapse) that a snapshot at or before readVersion exists — ASSERTs otherwise.
void GranuleFiles::getFiles(Version beginVersion,
                            Version readVersion,
                            bool canCollapse,
                            BlobGranuleChunkRef& chunk,
                            Arena& replyArena,
                            int64_t& deltaBytesCounter) const {
    BlobFileIndex dummyIndex; // for searching

    // if beginVersion == 0 or we can collapse, find the latest snapshot <= readVersion
    auto snapshotF = snapshotFiles.end();
    if (beginVersion == 0 || canCollapse) {
        dummyIndex.version = readVersion;
        snapshotF = std::lower_bound(snapshotFiles.begin(), snapshotFiles.end(), dummyIndex);
        if (snapshotF == snapshotFiles.end() || snapshotF->version > readVersion) {
            // lower_bound landed on the first snapshot past readVersion (or the end); step back to
            // the latest snapshot at or before readVersion
            ASSERT(snapshotF != snapshotFiles.begin());
            snapshotF--;
        }
        ASSERT(snapshotF != snapshotFiles.end());
        ASSERT(snapshotF->version <= readVersion);
    }

    auto deltaF = deltaFiles.end();
    if (beginVersion > 0) {
        // prune beginVersion: skip delta files entirely before beginVersion
        dummyIndex.version = beginVersion;
        deltaF = std::lower_bound(deltaFiles.begin(), deltaFiles.end(), dummyIndex);
        if (canCollapse) {
            ASSERT(snapshotF != snapshotFiles.end());
            // If we can collapse, see if delta files up to snapshotVersion are smaller or larger
            // than snapshotBytes in total
            auto deltaFCopy = deltaF;
            int64_t snapshotBytes = snapshotF->length;
            while (deltaFCopy != deltaFiles.end() && deltaFCopy->version <= snapshotF->version && snapshotBytes > 0) {
                snapshotBytes -= deltaFCopy->length;
                deltaFCopy++;
            }
            // if delta files contain the same or more bytes as the snapshot with collapse, do the collapse
            if (snapshotBytes > 0) {
                // don't collapse, clear snapshotF and just do delta files
                // (deltas were strictly smaller than the snapshot, so a delta-only read is cheaper)
                snapshotF = snapshotFiles.end();
            } else {
                // do snapshot instead of previous deltas
                dummyIndex.version = snapshotF->version;
                deltaF = std::upper_bound(deltaFiles.begin(), deltaFiles.end(), dummyIndex);
                ASSERT(deltaF == deltaFiles.end() || deltaF->version > snapshotF->version);
            }
        }
    } else {
        // beginVersion == 0: only need delta files strictly after the chosen snapshot's version
        dummyIndex.version = snapshotF->version;
        deltaF = std::upper_bound(deltaFiles.begin(), deltaFiles.end(), dummyIndex);
        ASSERT(deltaF == deltaFiles.end() || deltaF->version > snapshotF->version);
    }

    Version lastIncluded = invalidVersion;
    if (snapshotF != snapshotFiles.end()) {
        chunk.snapshotVersion = snapshotF->version;
        chunk.snapshotFile = BlobFilePointerRef(replyArena, snapshotF->filename, snapshotF->offset, snapshotF->length);
        lastIncluded = chunk.snapshotVersion;
    } else {
        chunk.snapshotVersion = invalidVersion;
    }

    // add delta files up to (but not including) the first one at or past readVersion
    while (deltaF != deltaFiles.end() && deltaF->version < readVersion) {
        chunk.deltaFiles.emplace_back_deep(replyArena, deltaF->filename, deltaF->offset, deltaF->length);
        deltaBytesCounter += deltaF->length;
        ASSERT(lastIncluded < deltaF->version);
        lastIncluded = deltaF->version;
        deltaF++;
    }
    // include last delta file that passes readVersion, if it exists
    if (deltaF != deltaFiles.end() && lastIncluded < readVersion) {
        chunk.deltaFiles.emplace_back_deep(replyArena, deltaF->filename, deltaF->offset, deltaF->length);
        deltaBytesCounter += deltaF->length;
        lastIncluded = deltaF->version;
    }
}
// Builds the synthetic filename ("test<version>") used by the granule-file unit tests below.
static std::string makeTestFileName(Version v) {
    std::string name("test");
    name += std::to_string(v);
    return name;
}
// Constructs a test BlobFileIndex at version v with a synthetic filename, zero offset,
// and the given length (length drives the collapse-threshold logic under test).
static BlobFileIndex makeTestFile(Version v, int64_t len) {
    const std::string name = makeTestFileName(v);
    return BlobFileIndex(v, name, 0, len);
}
// Asserts that a file pointer returned by getFiles refers to the test file for expectedVersion.
static void checkFile(int expectedVersion, const BlobFilePointerRef& actualFile) {
    const std::string expectedName = makeTestFileName(expectedVersion);
    const std::string actualName = actualFile.filename.toString();
    ASSERT(expectedName == actualName);
}
// Test helper: runs GranuleFiles::getFiles with the given parameters and asserts that the chunk it
// builds contains exactly the expected snapshot file (if any) and the expected delta files, in
// order. Prints expected vs. actual file lists first so failures are easy to diagnose.
static void checkFiles(const GranuleFiles& f,
                       Version beginVersion,
                       Version readVersion,
                       bool canCollapse,
                       Optional<int> expectedSnapshotVersion,
                       std::vector<int> expectedDeltaVersions) {
    Arena a;
    BlobGranuleChunkRef chunk;
    int64_t deltaBytes = 0;
    f.getFiles(beginVersion, readVersion, canCollapse, chunk, a, deltaBytes);
    fmt::print("results({0}, {1}, {2}):\nEXPECTED:\n snapshot={3}\n deltas ({4}):\n",
               beginVersion,
               readVersion,
               canCollapse ? "T" : "F",
               expectedSnapshotVersion.present() ? makeTestFileName(expectedSnapshotVersion.get()).c_str() : "<N/A>",
               expectedDeltaVersions.size());
    for (int d : expectedDeltaVersions) {
        fmt::print(" {}\n", makeTestFileName(d));
    }
    fmt::print("ACTUAL:\n snapshot={0}\n deltas ({1}):\n",
               chunk.snapshotFile.present() ? chunk.snapshotFile.get().filename.toString().c_str() : "<N/A>",
               chunk.deltaFiles.size());
    for (auto& it : chunk.deltaFiles) {
        fmt::print(" {}\n", it.filename.toString());
    }
    printf("\n\n\n");
    // snapshot presence and identity must match
    ASSERT(expectedSnapshotVersion.present() == chunk.snapshotFile.present());
    if (expectedSnapshotVersion.present()) {
        checkFile(expectedSnapshotVersion.get(), chunk.snapshotFile.get());
    }
    // delta file list must match exactly, in order
    ASSERT(expectedDeltaVersions.size() == chunk.deltaFiles.size());
    for (int i = 0; i < expectedDeltaVersions.size(); i++) {
        checkFile(expectedDeltaVersions[i], chunk.deltaFiles[i]);
    }
}
/*
* Files:
* S @ 100 (10 bytes)
* D @ 150 (5 bytes)
* D @ 200 (6 bytes)
* S @ 200 (15 bytes)
* D @ 250 (7 bytes)
* D @ 300 (8 bytes)
* S @ 300 (10 bytes)
* D @ 350 (4 bytes)
*/
// Unit test for GranuleFiles::getFiles covering beginVersion pruning and the canCollapse
// cost heuristic, using the file layout described in the comment above.
TEST_CASE("/blobgranule/server/common/granulefiles") {
    // simple cases first

    // single snapshot file, no deltas
    GranuleFiles files;
    files.snapshotFiles.push_back(makeTestFile(100, 10));

    printf("Just snapshot\n");

    checkFiles(files, 0, 100, false, 100, {});
    checkFiles(files, 0, 200, false, 100, {});

    printf("Small test\n");
    // add delta files with re-snapshot at end
    files.deltaFiles.push_back(makeTestFile(150, 5));
    files.deltaFiles.push_back(makeTestFile(200, 6));
    files.snapshotFiles.push_back(makeTestFile(200, 15));

    // check different read versions with beginVersion=0
    checkFiles(files, 0, 100, false, 100, {});
    checkFiles(files, 0, 101, false, 100, { 150 });
    checkFiles(files, 0, 149, false, 100, { 150 });
    checkFiles(files, 0, 150, false, 100, { 150 });
    checkFiles(files, 0, 151, false, 100, { 150, 200 });
    checkFiles(files, 0, 199, false, 100, { 150, 200 });
    checkFiles(files, 0, 200, false, 200, {});
    checkFiles(files, 0, 300, false, 200, {});

    // Test all cases of beginVersion + readVersion. Because delta files are smaller than snapshot at 200, this should
    // be the same with and without collapse
    checkFiles(files, 100, 200, false, Optional<int>(), { 150, 200 });
    checkFiles(files, 100, 300, false, Optional<int>(), { 150, 200 });
    checkFiles(files, 101, 199, false, Optional<int>(), { 150, 200 });
    checkFiles(files, 149, 151, false, Optional<int>(), { 150, 200 });
    checkFiles(files, 149, 150, false, Optional<int>(), { 150 });
    checkFiles(files, 150, 151, false, Optional<int>(), { 150, 200 });
    checkFiles(files, 151, 200, false, Optional<int>(), { 200 });

    checkFiles(files, 100, 200, true, Optional<int>(), { 150, 200 });
    checkFiles(files, 100, 300, true, Optional<int>(), { 150, 200 });
    checkFiles(files, 101, 199, true, Optional<int>(), { 150, 200 });
    checkFiles(files, 149, 151, true, Optional<int>(), { 150, 200 });
    checkFiles(files, 149, 150, true, Optional<int>(), { 150 });
    checkFiles(files, 150, 151, true, Optional<int>(), { 150, 200 });
    checkFiles(files, 151, 200, true, Optional<int>(), { 200 });

    printf("Larger test\n");
    // add more delta files and snapshots to check collapse logic
    files.deltaFiles.push_back(makeTestFile(250, 7));
    files.deltaFiles.push_back(makeTestFile(300, 8));
    files.snapshotFiles.push_back(makeTestFile(300, 10));
    files.deltaFiles.push_back(makeTestFile(350, 4));

    checkFiles(files, 0, 300, false, 300, {});
    checkFiles(files, 0, 301, false, 300, { 350 });
    checkFiles(files, 0, 400, false, 300, { 350 });

    // check delta files without collapse
    checkFiles(files, 100, 301, false, Optional<int>(), { 150, 200, 250, 300, 350 });
    checkFiles(files, 100, 300, false, Optional<int>(), { 150, 200, 250, 300 });
    checkFiles(files, 100, 251, false, Optional<int>(), { 150, 200, 250, 300 });
    checkFiles(files, 100, 250, false, Optional<int>(), { 150, 200, 250 });

    checkFiles(files, 151, 300, false, Optional<int>(), { 200, 250, 300 });
    checkFiles(files, 151, 301, false, Optional<int>(), { 200, 250, 300, 350 });
    checkFiles(files, 151, 400, false, Optional<int>(), { 200, 250, 300, 350 });

    checkFiles(files, 201, 300, false, Optional<int>(), { 250, 300 });
    checkFiles(files, 201, 301, false, Optional<int>(), { 250, 300, 350 });
    checkFiles(files, 201, 400, false, Optional<int>(), { 250, 300, 350 });

    checkFiles(files, 251, 300, false, Optional<int>(), { 300 });
    checkFiles(files, 251, 301, false, Optional<int>(), { 300, 350 });
    checkFiles(files, 251, 400, false, Optional<int>(), { 300, 350 });
    checkFiles(files, 301, 400, false, Optional<int>(), { 350 });
    checkFiles(files, 351, 400, false, Optional<int>(), {});

    // check with collapse
    // these 2 collapse because the delta files at 150+200+250+300 are larger than the snapshot at 300
    checkFiles(files, 100, 301, true, 300, { 350 });
    checkFiles(files, 100, 300, true, 300, {});
    // these 2 don't collapse because 150+200 delta files are smaller than the snapshot at 200
    checkFiles(files, 100, 251, true, Optional<int>(), { 150, 200, 250, 300 });
    checkFiles(files, 100, 250, true, Optional<int>(), { 150, 200, 250 });

    // these 3 do collapse because the delta files at 200+250+300 are larger than the snapshot at 300
    checkFiles(files, 151, 300, true, 300, {});
    checkFiles(files, 151, 301, true, 300, { 350 });
    checkFiles(files, 151, 400, true, 300, { 350 });

    // these 3 do collapse because the delta files at 250+300 are larger than the snapshot at 300
    checkFiles(files, 201, 300, true, 300, {});
    checkFiles(files, 201, 301, true, 300, { 350 });
    checkFiles(files, 201, 400, true, 300, { 350 });

    // these don't collapse because the delta file at 300 is smaller than the snapshot at 300
    checkFiles(files, 251, 300, true, Optional<int>(), { 300 });
    checkFiles(files, 251, 301, true, Optional<int>(), { 300, 350 });
    checkFiles(files, 251, 400, true, Optional<int>(), { 300, 350 });
    checkFiles(files, 301, 400, true, Optional<int>(), { 350 });
    checkFiles(files, 351, 400, true, Optional<int>(), {});

    return Void();
}

View File

@ -54,12 +54,23 @@ struct BlobFileIndex {
BlobFileIndex(Version version, std::string filename, int64_t offset, int64_t length)
: version(version), filename(filename), offset(offset), length(length) {}
// compare on version
bool operator<(const BlobFileIndex& r) const { return version < r.version; }
};
// FIXME: initialize these to smaller default sizes to save a bit of memory, particularly snapshotFiles
// Stores the files that comprise a blob granule
struct GranuleFiles {
std::deque<BlobFileIndex> snapshotFiles;
std::deque<BlobFileIndex> deltaFiles;
std::vector<BlobFileIndex> snapshotFiles;
std::vector<BlobFileIndex> deltaFiles;
void getFiles(Version beginVersion,
Version readVersion,
bool canCollapse,
BlobGranuleChunkRef& chunk,
Arena& replyArena,
int64_t& deltaBytesCounter) const;
};
class Transaction;

View File

@ -2778,7 +2778,7 @@ ACTOR Future<Void> blobManager(BlobManagerInterface bmInterf,
// DB has [A - B) and [C - D). They should show up in knownBlobRanges, and [B - C) should be in removed.
// DB has [B - C). It should show up in knownBlobRanges, [B - C) should be in added, and [A - B) and [C - D)
// should be in removed.
TEST_CASE(":/blobmanager/updateranges") {
TEST_CASE("/blobmanager/updateranges") {
KeyRangeMap<bool> knownBlobRanges(false, normalKeys.end);
Arena ar;

View File

@ -18,6 +18,7 @@
* limitations under the License.
*/
#include <limits>
#include <tuple>
#include <utility>
#include <vector>
@ -43,9 +44,10 @@
#include "flow/Error.h"
#include "flow/IRandom.h"
#include "flow/Trace.h"
#include "flow/actorcompiler.h" // has to be last include
#include "flow/network.h"
#include "flow/actorcompiler.h" // has to be last include
#define BW_DEBUG false
#define BW_REQUEST_DEBUG false
@ -832,7 +834,7 @@ ACTOR Future<BlobFileIndex> compactFromBlob(Reference<BlobWorkerData> bwData,
rowsStream,
false);
RangeResult newGranule =
wait(readBlobGranule(chunk, metadata->keyRange, version, bwData->bstore, &bwData->stats));
wait(readBlobGranule(chunk, metadata->keyRange, 0, version, bwData->bstore, &bwData->stats));
bwData->stats.bytesReadFromS3ForCompaction += compactBytesRead;
rowsStream.send(std::move(newGranule));
@ -2093,16 +2095,25 @@ ACTOR Future<Void> waitForVersion(Reference<GranuleMetadata> metadata, Version v
ACTOR Future<Void> doBlobGranuleFileRequest(Reference<BlobWorkerData> bwData, BlobGranuleFileRequest req) {
if (BW_REQUEST_DEBUG) {
fmt::print("BW {0} processing blobGranuleFileRequest for range [{1} - {2}) @ {3}\n",
fmt::print("BW {0} processing blobGranuleFileRequest for range [{1} - {2}) @ ",
bwData->id.toString(),
req.keyRange.begin.printable(),
req.keyRange.end.printable(),
req.readVersion);
if (req.beginVersion > 0) {
fmt::print("{0} - {1}\n", req.beginVersion, req.readVersion);
} else {
fmt::print("{}", req.readVersion);
}
}
state bool didCollapse = false;
try {
// TODO REMOVE in api V2
ASSERT(req.beginVersion == 0);
// TODO remove requirement for canCollapseBegin once we implement early replying
ASSERT(req.beginVersion == 0 || req.canCollapseBegin);
if (req.beginVersion != 0) {
ASSERT(req.beginVersion > 0);
}
state BlobGranuleFileReply rep;
state std::vector<Reference<GranuleMetadata>> granules;
@ -2150,6 +2161,7 @@ ACTOR Future<Void> doBlobGranuleFileRequest(Reference<BlobWorkerData> bwData, Bl
continue;
}
state Reference<GranuleMetadata> metadata = m;
state Version granuleBeginVersion = req.beginVersion;
choose {
when(wait(metadata->readable.getFuture())) {}
@ -2290,67 +2302,30 @@ ACTOR Future<Void> doBlobGranuleFileRequest(Reference<BlobWorkerData> bwData, Bl
// granule is up to date, do read
ASSERT(metadata->cancelled.canBeSet());
// Right now we force a collapse if the version range crosses granule boundaries, for simplicity
if (granuleBeginVersion <= chunkFiles.snapshotFiles.front().version) {
TEST(true); // collapsed begin version request because of boundaries
didCollapse = true;
granuleBeginVersion = 0;
}
BlobGranuleChunkRef chunk;
// TODO change in V2
// TODO change with early reply
chunk.includedVersion = req.readVersion;
chunk.keyRange = KeyRangeRef(StringRef(rep.arena, chunkRange.begin), StringRef(rep.arena, chunkRange.end));
// handle snapshot files
// TODO refactor the "find snapshot file" logic to GranuleFiles?
// FIXME: binary search instead of linear search, especially when file count is large
int i = chunkFiles.snapshotFiles.size() - 1;
while (i >= 0 && chunkFiles.snapshotFiles[i].version > req.readVersion) {
i--;
}
// because of granule history, we should always be able to find the desired snapshot
// version, and have thrown blob_granule_transaction_too_old earlier if not possible.
if (i < 0) {
fmt::print("req @ {0} >= initial snapshot {1} but can't find snapshot in ({2}) files:\n",
req.readVersion,
metadata->initialSnapshotVersion,
chunkFiles.snapshotFiles.size());
for (auto& f : chunkFiles.snapshotFiles) {
fmt::print(" {0}", f.version);
}
}
ASSERT(i >= 0);
BlobFileIndex snapshotF = chunkFiles.snapshotFiles[i];
chunk.snapshotFile = BlobFilePointerRef(rep.arena, snapshotF.filename, snapshotF.offset, snapshotF.length);
Version snapshotVersion = chunkFiles.snapshotFiles[i].version;
chunk.snapshotVersion = snapshotVersion;
// handle delta files
// cast this to an int so i going to -1 still compares properly
int lastDeltaFileIdx = chunkFiles.deltaFiles.size() - 1;
i = lastDeltaFileIdx;
// skip delta files that are too new
while (i >= 0 && chunkFiles.deltaFiles[i].version > req.readVersion) {
i--;
}
if (i < lastDeltaFileIdx) {
// we skipped one file at the end with a larger read version, this will actually contain
// our query version, so add it back.
i++;
}
// only include delta files after the snapshot file
int j = i;
while (j >= 0 && chunkFiles.deltaFiles[j].version > snapshotVersion) {
j--;
}
j++;
while (j <= i) {
BlobFileIndex deltaF = chunkFiles.deltaFiles[j];
chunk.deltaFiles.emplace_back_deep(rep.arena, deltaF.filename, deltaF.offset, deltaF.length);
bwData->stats.readReqDeltaBytesReturned += deltaF.length;
j++;
int64_t deltaBytes = 0;
chunkFiles.getFiles(
granuleBeginVersion, req.readVersion, req.canCollapseBegin, chunk, rep.arena, deltaBytes);
bwData->stats.readReqDeltaBytesReturned += deltaBytes;
if (granuleBeginVersion > 0 && chunk.snapshotFile.present()) {
TEST(true); // collapsed begin version request for efficiency
didCollapse = true;
}
// new deltas (if version is larger than version of last delta file)
// FIXME: do trivial key bounds here if key range is not fully contained in request key
// range
if (req.readVersion > metadata->durableDeltaVersion.get()) {
if (req.readVersion > metadata->durableDeltaVersion.get() && !metadata->currentDeltas.empty()) {
if (metadata->durableDeltaVersion.get() != metadata->pendingDeltaVersion) {
fmt::print("real-time read [{0} - {1}) @ {2} doesn't have mutations!! durable={3}, pending={4}\n",
metadata->keyRange.begin.printable(),
@ -2359,13 +2334,32 @@ ACTOR Future<Void> doBlobGranuleFileRequest(Reference<BlobWorkerData> bwData, Bl
metadata->durableDeltaVersion.get(),
metadata->pendingDeltaVersion);
}
// prune mutations based on begin version, if possible
ASSERT(metadata->durableDeltaVersion.get() == metadata->pendingDeltaVersion);
// FIXME: I think we can remove this dependsOn since we are doing push_back_deep
rep.arena.dependsOn(metadata->currentDeltas.arena());
for (auto& delta : metadata->currentDeltas) {
if (delta.version > req.readVersion) {
MutationsAndVersionRef* mutationIt = metadata->currentDeltas.begin();
if (granuleBeginVersion > metadata->currentDeltas.back().version) {
TEST(true); // beginVersion pruning all in-memory mutations
mutationIt = metadata->currentDeltas.end();
} else if (granuleBeginVersion > metadata->currentDeltas.front().version) {
// binary search for beginVersion
TEST(true); // beginVersion pruning some in-memory mutations
mutationIt = std::lower_bound(metadata->currentDeltas.begin(),
metadata->currentDeltas.end(),
MutationsAndVersionRef(granuleBeginVersion, 0),
MutationsAndVersionRef::OrderByVersion());
}
// add mutations to response
while (mutationIt != metadata->currentDeltas.end()) {
if (mutationIt->version > req.readVersion) {
TEST(true); // readVersion pruning some in-memory mutations
break;
}
chunk.newDeltas.push_back_deep(rep.arena, delta);
chunk.newDeltas.push_back_deep(rep.arena, *mutationIt);
mutationIt++;
}
}
@ -2376,11 +2370,17 @@ ACTOR Future<Void> doBlobGranuleFileRequest(Reference<BlobWorkerData> bwData, Bl
wait(yield(TaskPriority::DefaultEndpoint));
}
// do these together to keep them synchronous
if (req.beginVersion != 0) {
++bwData->stats.readRequestsWithBegin;
}
if (didCollapse) {
++bwData->stats.readRequestsCollapsed;
}
ASSERT(!req.reply.isSet());
req.reply.send(rep);
--bwData->stats.activeReadRequests;
} catch (Error& e) {
// fmt::print("Error in BGFRequest {0}\n", e.name());
if (e.code() == error_code_operation_cancelled) {
req.reply.sendError(wrong_shard_server());
throw;

View File

@ -50,11 +50,12 @@ struct RelocateData {
std::vector<UID> completeSources;
std::vector<UID> completeDests;
bool wantsNewServers;
bool cancellable;
TraceInterval interval;
RelocateData()
: priority(-1), boundaryPriority(-1), healthPriority(-1), startTime(-1), workFactor(0), wantsNewServers(false),
interval("QueuedRelocation") {}
cancellable(false), interval("QueuedRelocation") {}
explicit RelocateData(RelocateShard const& rs)
: keys(rs.keys), priority(rs.priority), boundaryPriority(isBoundaryPriority(rs.priority) ? rs.priority : -1),
healthPriority(isHealthPriority(rs.priority) ? rs.priority : -1), startTime(now()),
@ -63,7 +64,7 @@ struct RelocateData {
rs.priority == SERVER_KNOBS->PRIORITY_REBALANCE_UNDERUTILIZED_TEAM ||
rs.priority == SERVER_KNOBS->PRIORITY_SPLIT_SHARD ||
rs.priority == SERVER_KNOBS->PRIORITY_TEAM_REDUNDANT),
interval("QueuedRelocation") {}
cancellable(true), interval("QueuedRelocation") {}
static bool isHealthPriority(int priority) {
return priority == SERVER_KNOBS->PRIORITY_POPULATE_REGION ||
@ -610,19 +611,23 @@ struct DDQueueData {
.detail(
"Problem",
"the key range in the inFlight map matches the key range in the RelocateData message");
} else if (it->value().cancellable) {
TraceEvent(SevError, "DDQueueValidateError13")
.detail("Problem", "key range is cancellable but not in flight!")
.detail("Range", it->range());
}
}
for (auto it = busymap.begin(); it != busymap.end(); ++it) {
for (int i = 0; i < it->second.ledger.size() - 1; i++) {
if (it->second.ledger[i] < it->second.ledger[i + 1])
TraceEvent(SevError, "DDQueueValidateError13")
TraceEvent(SevError, "DDQueueValidateError14")
.detail("Problem", "ascending ledger problem")
.detail("LedgerLevel", i)
.detail("LedgerValueA", it->second.ledger[i])
.detail("LedgerValueB", it->second.ledger[i + 1]);
if (it->second.ledger[i] < 0.0)
TraceEvent(SevError, "DDQueueValidateError14")
TraceEvent(SevError, "DDQueueValidateError15")
.detail("Problem", "negative ascending problem")
.detail("LedgerLevel", i)
.detail("LedgerValue", it->second.ledger[i]);
@ -632,13 +637,13 @@ struct DDQueueData {
for (auto it = destBusymap.begin(); it != destBusymap.end(); ++it) {
for (int i = 0; i < it->second.ledger.size() - 1; i++) {
if (it->second.ledger[i] < it->second.ledger[i + 1])
TraceEvent(SevError, "DDQueueValidateError15")
TraceEvent(SevError, "DDQueueValidateError16")
.detail("Problem", "ascending ledger problem")
.detail("LedgerLevel", i)
.detail("LedgerValueA", it->second.ledger[i])
.detail("LedgerValueB", it->second.ledger[i + 1]);
if (it->second.ledger[i] < 0.0)
TraceEvent(SevError, "DDQueueValidateError16")
TraceEvent(SevError, "DDQueueValidateError17")
.detail("Problem", "negative ascending problem")
.detail("LedgerLevel", i)
.detail("LedgerValue", it->second.ledger[i]);
@ -954,7 +959,7 @@ struct DDQueueData {
auto containedRanges = inFlight.containedRanges(rd.keys);
std::vector<RelocateData> cancellableRelocations;
for (auto it = containedRanges.begin(); it != containedRanges.end(); ++it) {
if (inFlightActors.liveActorAt(it->range().begin)) {
if (it.value().cancellable) {
cancellableRelocations.push_back(it->value());
}
}
@ -1180,6 +1185,12 @@ ACTOR Future<Void> dataDistributionRelocator(DDQueueData* self, RelocateData rd,
// TODO different trace event + knob for overloaded? Could wait on an async var for done moves
}
// set cancellable to false on inFlight's entry for this key range
auto inFlightRange = self->inFlight.rangeContaining(rd.keys.begin);
ASSERT(inFlightRange.range() == rd.keys);
ASSERT(inFlightRange.value().randomId == rd.randomId);
inFlightRange.value().cancellable = false;
destIds.clear();
state std::vector<UID> healthyIds;
state std::vector<UID> extraIds;

View File

@ -272,15 +272,20 @@ struct BlobGranuleCorrectnessWorkload : TestWorkload {
}
// FIXME: typedef this pair type and/or chunk list
ACTOR Future<std::pair<RangeResult, Standalone<VectorRef<BlobGranuleChunkRef>>>>
readFromBlob(Database cx, BlobGranuleCorrectnessWorkload* self, KeyRange range, Version version) {
ACTOR Future<std::pair<RangeResult, Standalone<VectorRef<BlobGranuleChunkRef>>>> readFromBlob(
Database cx,
BlobGranuleCorrectnessWorkload* self,
KeyRange range,
Version beginVersion,
Version readVersion) {
state RangeResult out;
state Standalone<VectorRef<BlobGranuleChunkRef>> chunks;
state Transaction tr(cx);
loop {
try {
Standalone<VectorRef<BlobGranuleChunkRef>> chunks_ = wait(tr.readBlobGranules(range, 0, version));
Standalone<VectorRef<BlobGranuleChunkRef>> chunks_ =
wait(tr.readBlobGranules(range, beginVersion, readVersion));
chunks = chunks_;
break;
} catch (Error& e) {
@ -289,7 +294,7 @@ struct BlobGranuleCorrectnessWorkload : TestWorkload {
}
for (const BlobGranuleChunkRef& chunk : chunks) {
RangeResult chunkRows = wait(readBlobGranule(chunk, range, version, self->bstore));
RangeResult chunkRows = wait(readBlobGranule(chunk, range, beginVersion, readVersion, self->bstore));
out.arena().dependsOn(chunkRows.arena());
out.append(out.arena(), chunkRows.begin(), chunkRows.size());
}
@ -321,7 +326,7 @@ struct BlobGranuleCorrectnessWorkload : TestWorkload {
Version rv = wait(self->doGrv(&tr));
state Version readVersion = rv;
std::pair<RangeResult, Standalone<VectorRef<BlobGranuleChunkRef>>> blob =
wait(self->readFromBlob(cx, self, threadData->directoryRange, readVersion));
wait(self->readFromBlob(cx, self, threadData->directoryRange, 0, readVersion));
fmt::print("Directory {0} got {1} RV {2}\n",
threadData->directoryID,
doSetup ? "initial" : "final",
@ -349,6 +354,7 @@ struct BlobGranuleCorrectnessWorkload : TestWorkload {
const Optional<Value>& blobValue,
uint32_t startKey,
uint32_t endKey,
Version beginVersion,
Version readVersion,
const std::pair<RangeResult, Standalone<VectorRef<BlobGranuleChunkRef>>>& blob) {
threadData->mismatches++;
@ -360,11 +366,13 @@ struct BlobGranuleCorrectnessWorkload : TestWorkload {
ev.detail("DirectoryID", format("%08x", threadData->directoryID))
.detail("RangeStart", format("%08x", startKey))
.detail("RangeEnd", format("%08x", endKey))
.detail("BeginVersion", beginVersion)
.detail("Version", readVersion);
fmt::print("Found mismatch! Request for dir {0} [{1} - {2}) @ {3}\n",
fmt::print("Found mismatch! Request for dir {0} [{1} - {2}) @ {3} - {4}\n",
format("%08x", threadData->directoryID),
format("%08x", startKey),
format("%08x", endKey),
beginVersion,
readVersion);
if (lastMatching.present()) {
fmt::print(" last correct: {}\n", lastMatching.get().printable());
@ -456,6 +464,29 @@ struct BlobGranuleCorrectnessWorkload : TestWorkload {
readVersion);
}
// because each chunk could be separately collapsed or not if we set beginVersion, we have to track it by chunk
KeyRangeMap<Version> beginVersionByChunk;
beginVersionByChunk.insert(normalKeys, 0);
int beginCollapsed = 0;
int beginNotCollapsed = 0;
for (auto& chunk : blob.second) {
if (!chunk.snapshotFile.present()) {
ASSERT(beginVersion > 0);
ASSERT(chunk.snapshotVersion == invalidVersion);
beginCollapsed++;
beginVersionByChunk.insert(chunk.keyRange, beginVersion);
} else {
ASSERT(chunk.snapshotVersion != invalidVersion);
if (beginVersion > 0) {
beginNotCollapsed++;
}
}
}
TEST(beginCollapsed > 0); // BGCorrectness got collapsed request with beginVersion > 0
TEST(beginNotCollapsed > 0); // BGCorrectness got un-collapsed request with beginVersion > 0
TEST(beginCollapsed > 0 &&
beginNotCollapsed > 0); // BGCorrectness got both collapsed and uncollapsed in the same request!
while (checkIt != threadData->keyData.end() && checkIt->first < endKeyExclusive) {
uint32_t key = checkIt->first;
if (DEBUG_READ_OP(threadData->directoryID, readVersion)) {
@ -475,6 +506,16 @@ struct BlobGranuleCorrectnessWorkload : TestWorkload {
for (; idIdx < checkIt->second.writes.size() && checkIt->second.writes[idIdx].writeVersion <= readVersion;
idIdx++) {
Key nextKeyShouldBe = threadData->getKey(key, idIdx);
Version keyBeginVersion = beginVersionByChunk.rangeContaining(nextKeyShouldBe).cvalue();
if (keyBeginVersion > checkIt->second.writes[idIdx].writeVersion) {
if (DEBUG_READ_OP(threadData->directoryID, readVersion)) {
fmt::print("DBG READ: Skip ID {0} written @ {1} < beginVersion {2}\n",
idIdx,
checkIt->second.writes[idIdx].clearVersion,
keyBeginVersion);
}
continue;
}
if (DEBUG_READ_OP(threadData->directoryID, readVersion)) {
fmt::print("DBG READ: Checking ID {0} ({1}) written @ {2}\n",
format("%08x", idIdx),
@ -491,6 +532,7 @@ struct BlobGranuleCorrectnessWorkload : TestWorkload {
Optional<Value>(),
startKeyInclusive,
endKeyExclusive,
beginVersion,
readVersion,
blob);
return false;
@ -509,6 +551,7 @@ struct BlobGranuleCorrectnessWorkload : TestWorkload {
Optional<Value>(),
startKeyInclusive,
endKeyExclusive,
beginVersion,
readVersion,
blob);
return false;
@ -523,6 +566,7 @@ struct BlobGranuleCorrectnessWorkload : TestWorkload {
blob.first[resultIdx].value,
startKeyInclusive,
endKeyExclusive,
beginVersion,
readVersion,
blob);
return false;
@ -545,6 +589,7 @@ struct BlobGranuleCorrectnessWorkload : TestWorkload {
Optional<Value>(),
startKeyInclusive,
endKeyExclusive,
beginVersion,
readVersion,
blob);
return false;
@ -565,6 +610,7 @@ struct BlobGranuleCorrectnessWorkload : TestWorkload {
state double targetReadBytesPerSec = threadData->targetByteRate * 4;
ASSERT(targetReadBytesPerSec > 0);
state Version beginVersion;
state Version readVersion;
TraceEvent("BlobGranuleCorrectnessReaderStart").log();
@ -610,26 +656,42 @@ struct BlobGranuleCorrectnessWorkload : TestWorkload {
state KeyRange range = KeyRangeRef(threadData->getKey(startKey, 0), threadData->getKey(endKey, 0));
// pick read version
// TODO could also pick begin version here
ASSERT(threadData->writeVersions.back() >= threadData->minSuccessfulReadVersion);
size_t readVersionIdx;
// randomly choose up to date vs time travel read
if (deterministicRandom()->random01() < 0.5) {
threadData->reads++;
readVersionIdx = threadData->writeVersions.size() - 1;
readVersion = threadData->writeVersions.back();
} else {
threadData->timeTravelReads++;
size_t startIdx = 0;
loop {
int readVersionIdx = deterministicRandom()->randomInt(0, threadData->writeVersions.size());
readVersionIdx = deterministicRandom()->randomInt(startIdx, threadData->writeVersions.size());
readVersion = threadData->writeVersions[readVersionIdx];
if (readVersion >= threadData->minSuccessfulReadVersion) {
break;
} else {
startIdx = readVersionIdx + 1;
}
}
}
// randomly choose begin version or not
beginVersion = 0;
if (deterministicRandom()->random01() < 0.5) {
int startIdx = 0;
int endIdxExclusive = readVersionIdx + 1;
// Choose skewed towards later versions. It's ok if beginVersion isn't readable though because it
// will collapse
size_t beginVersionIdx = (size_t)std::sqrt(
deterministicRandom()->randomInt(startIdx * startIdx, endIdxExclusive * endIdxExclusive));
beginVersion = threadData->writeVersions[beginVersionIdx];
}
std::pair<RangeResult, Standalone<VectorRef<BlobGranuleChunkRef>>> blob =
wait(self->readFromBlob(cx, self, range, readVersion));
self->validateResult(threadData, blob, startKey, endKey, 0, readVersion);
wait(self->readFromBlob(cx, self, range, beginVersion, readVersion));
self->validateResult(threadData, blob, startKey, endKey, beginVersion, readVersion);
int resultBytes = blob.first.expectedSize();
threadData->rowsRead += blob.first.size();
@ -822,7 +884,7 @@ struct BlobGranuleCorrectnessWorkload : TestWorkload {
fmt::print("Directory {0} doing final data check @ {1}\n", threadData->directoryID, readVersion);
}
std::pair<RangeResult, Standalone<VectorRef<BlobGranuleChunkRef>>> blob =
wait(self->readFromBlob(cx, self, threadData->directoryRange, readVersion));
wait(self->readFromBlob(cx, self, threadData->directoryRange, 0, readVersion));
result = self->validateResult(threadData, blob, 0, std::numeric_limits<uint32_t>::max(), 0, readVersion);
finalRowsValidated = blob.first.size();

View File

@ -225,7 +225,7 @@ struct BlobGranuleVerifierWorkload : TestWorkload {
}
for (const BlobGranuleChunkRef& chunk : chunks) {
RangeResult chunkRows = wait(readBlobGranule(chunk, range, version, self->bstore));
RangeResult chunkRows = wait(readBlobGranule(chunk, range, 0, version, self->bstore));
out.arena().dependsOn(chunkRows.arena());
out.append(out.arena(), chunkRows.begin(), chunkRows.size());
}

View File

@ -0,0 +1,9 @@
[[test]]
testTitle = 'BlobGranuleServerCommonUnit'
useDB = false
startDelay = 0
    [[test.workload]]
    testName = 'UnitTests'
    maxTestCases = 0
    # Pattern must be a quoted string: bare /…/ values are invalid TOML
    testsMatching = '/blobgranule/server/common/'

View File

@ -0,0 +1,10 @@
[[test]]
testTitle = 'BlobGranuleFileUnit'
useDB = false
startDelay = 0
    [[test.workload]]
    testName = 'UnitTests'
    maxTestCases = 0
    # Pattern must be a quoted string: bare /…/ values are invalid TOML
    testsMatching = '/blobgranule/files/'

View File

@ -1,7 +0,0 @@
testTitle=UnitTests
startDelay=0
useDB=false
testName=UnitTests
maxTestCases=0
testsMatching=/blobgranule/

View File

@ -0,0 +1,9 @@
[[test]]
testTitle = 'BlobManagerUnit'
useDB = false
startDelay = 0
    [[test.workload]]
    testName = 'UnitTests'
    maxTestCases = 0
    # Pattern must be a quoted string: bare /…/ values are invalid TOML
    testsMatching = '/blobmanager/'

View File

@ -1,7 +0,0 @@
testTitle=UnitTests
startDelay=0
useDB=false
testName=UnitTests
maxTestCases=0
testsMatching=/blobmanager/

View File

@ -50,8 +50,9 @@ if(WITH_PYTHON)
add_fdb_test(TEST_FILES s3VersionHeaders.txt IGNORE)
add_fdb_test(TEST_FILES BandwidthThrottle.txt IGNORE)
add_fdb_test(TEST_FILES BigInsert.txt IGNORE)
add_fdb_test(TEST_FILES BlobGranuleFileUnit.txt)
add_fdb_test(TEST_FILES BlobManagerUnit.txt)
add_fdb_test(TEST_FILES BGServerCommonUnit.toml)
add_fdb_test(TEST_FILES BlobGranuleFileUnit.toml)
add_fdb_test(TEST_FILES BlobManagerUnit.toml)
add_fdb_test(TEST_FILES ConsistencyCheck.txt IGNORE)
add_fdb_test(TEST_FILES DDMetricsExclude.txt IGNORE)
add_fdb_test(TEST_FILES DataDistributionMetrics.txt IGNORE)