Future-proof blob granules with full file size

This commit is contained in:
Josh Slocum 2022-03-28 14:48:12 -05:00
parent 2f8e9d9de0
commit 61474d5d54
7 changed files with 46 additions and 26 deletions

View File

@ -52,19 +52,20 @@ struct BlobFilePointerRef {
StringRef filename;
int64_t offset;
int64_t length;
int64_t fullFileLength;
BlobFilePointerRef() {}
BlobFilePointerRef(Arena& to, const std::string& filename, int64_t offset, int64_t length)
: filename(to, filename), offset(offset), length(length) {}
BlobFilePointerRef(Arena& to, const std::string& filename, int64_t offset, int64_t length, int64_t fullFileLength)
: filename(to, filename), offset(offset), length(length), fullFileLength(fullFileLength) {}
template <class Ar>
void serialize(Ar& ar) {
serializer(ar, filename, offset, length);
serializer(ar, filename, offset, length, fullFileLength);
}
std::string toString() const {
std::stringstream ss;
ss << filename.toString() << ":" << offset << ":" << length;
ss << filename.toString() << ":" << offset << ":" << length << ":" << fullFileLength;
return std::move(ss).str();
}
};

View File

@ -240,22 +240,27 @@ static void startLoad(const ReadBlobGranuleContext granuleContext,
// Start load process for all files in chunk
if (chunk.snapshotFile.present()) {
std::string snapshotFname = chunk.snapshotFile.get().filename.toString();
// FIXME: full file length won't always be length of read
// FIXME: remove when we implement file multiplexing
ASSERT(chunk.snapshotFile.get().offset == 0);
ASSERT(chunk.snapshotFile.get().length == chunk.snapshotFile.get().fullFileLength);
loadIds.snapshotId = granuleContext.start_load_f(snapshotFname.c_str(),
snapshotFname.size(),
chunk.snapshotFile.get().offset,
chunk.snapshotFile.get().length,
chunk.snapshotFile.get().length,
chunk.snapshotFile.get().fullFileLength,
granuleContext.userContext);
}
loadIds.deltaIds.reserve(chunk.deltaFiles.size());
for (int deltaFileIdx = 0; deltaFileIdx < chunk.deltaFiles.size(); deltaFileIdx++) {
std::string deltaFName = chunk.deltaFiles[deltaFileIdx].filename.toString();
// FIXME: remove when we implement file multiplexing
ASSERT(chunk.deltaFiles[deltaFileIdx].offset == 0);
ASSERT(chunk.deltaFiles[deltaFileIdx].length == chunk.deltaFiles[deltaFileIdx].fullFileLength);
int64_t deltaLoadId = granuleContext.start_load_f(deltaFName.c_str(),
deltaFName.size(),
chunk.deltaFiles[deltaFileIdx].offset,
chunk.deltaFiles[deltaFileIdx].length,
chunk.deltaFiles[deltaFileIdx].length,
chunk.deltaFiles[deltaFileIdx].fullFileLength,
granuleContext.userContext);
loadIds.deltaIds.push_back(deltaLoadId);
}

View File

@ -1190,23 +1190,26 @@ const KeyRange blobGranuleFileKeyRangeFor(UID granuleID) {
return KeyRangeRef(startKey, strinc(startKey));
}
const Value blobGranuleFileValueFor(StringRef const& filename, int64_t offset, int64_t length) {
const Value blobGranuleFileValueFor(StringRef const& filename, int64_t offset, int64_t length, int64_t fullFileLength) {
BinaryWriter wr(IncludeVersion(ProtocolVersion::withBlobGranule()));
wr << filename;
wr << offset;
wr << length;
wr << fullFileLength;
return wr.toValue();
}
std::tuple<Standalone<StringRef>, int64_t, int64_t> decodeBlobGranuleFileValue(ValueRef const& value) {
std::tuple<Standalone<StringRef>, int64_t, int64_t, int64_t> decodeBlobGranuleFileValue(ValueRef const& value) {
StringRef filename;
int64_t offset;
int64_t length;
int64_t fullFileLength;
BinaryReader reader(value, IncludeVersion());
reader >> filename;
reader >> offset;
reader >> length;
return std::tuple(filename, offset, length);
reader >> fullFileLength;
return std::tuple(filename, offset, length, fullFileLength);
}
const Value blobGranulePruneValueFor(Version version, KeyRange range, bool force) {

View File

@ -572,8 +572,8 @@ const Key blobGranuleFileKeyFor(UID granuleID, Version fileVersion, uint8_t file
std::tuple<UID, Version, uint8_t> decodeBlobGranuleFileKey(KeyRef const& key);
const KeyRange blobGranuleFileKeyRangeFor(UID granuleID);
const Value blobGranuleFileValueFor(StringRef const& filename, int64_t offset, int64_t length);
std::tuple<Standalone<StringRef>, int64_t, int64_t> decodeBlobGranuleFileValue(ValueRef const& value);
const Value blobGranuleFileValueFor(StringRef const& filename, int64_t offset, int64_t length, int64_t fullFileLength);
std::tuple<Standalone<StringRef>, int64_t, int64_t, int64_t> decodeBlobGranuleFileValue(ValueRef const& value);
const Value blobGranulePruneValueFor(Version version, KeyRange range, bool force);
std::tuple<Version, KeyRange, bool> decodeBlobGranulePruneValue(ValueRef const& value);

View File

@ -60,13 +60,14 @@ ACTOR Future<Void> readGranuleFiles(Transaction* tr, Key* startKey, Key endKey,
Standalone<StringRef> filename;
int64_t offset;
int64_t length;
int64_t fullFileLength;
std::tie(gid, version, fileType) = decodeBlobGranuleFileKey(it.key);
ASSERT(gid == granuleID);
std::tie(filename, offset, length) = decodeBlobGranuleFileValue(it.value);
std::tie(filename, offset, length, fullFileLength) = decodeBlobGranuleFileValue(it.value);
BlobFileIndex idx(version, filename.toString(), offset, length);
BlobFileIndex idx(version, filename.toString(), offset, length, fullFileLength);
if (fileType == 'S') {
ASSERT(files->snapshotFiles.empty() || files->snapshotFiles.back().version < idx.version);
files->snapshotFiles.push_back(idx);
@ -168,14 +169,16 @@ void GranuleFiles::getFiles(Version beginVersion,
Version lastIncluded = invalidVersion;
if (snapshotF != snapshotFiles.end()) {
chunk.snapshotVersion = snapshotF->version;
chunk.snapshotFile = BlobFilePointerRef(replyArena, snapshotF->filename, snapshotF->offset, snapshotF->length);
chunk.snapshotFile = BlobFilePointerRef(
replyArena, snapshotF->filename, snapshotF->offset, snapshotF->length, snapshotF->fullFileLength);
lastIncluded = chunk.snapshotVersion;
} else {
chunk.snapshotVersion = invalidVersion;
}
while (deltaF != deltaFiles.end() && deltaF->version < readVersion) {
chunk.deltaFiles.emplace_back_deep(replyArena, deltaF->filename, deltaF->offset, deltaF->length);
chunk.deltaFiles.emplace_back_deep(
replyArena, deltaF->filename, deltaF->offset, deltaF->length, deltaF->fullFileLength);
deltaBytesCounter += deltaF->length;
ASSERT(lastIncluded < deltaF->version);
lastIncluded = deltaF->version;
@ -183,7 +186,8 @@ void GranuleFiles::getFiles(Version beginVersion,
}
// include last delta file that passes readVersion, if it exists
if (deltaF != deltaFiles.end() && lastIncluded < readVersion) {
chunk.deltaFiles.emplace_back_deep(replyArena, deltaF->filename, deltaF->offset, deltaF->length);
chunk.deltaFiles.emplace_back_deep(
replyArena, deltaF->filename, deltaF->offset, deltaF->length, deltaF->fullFileLength);
deltaBytesCounter += deltaF->length;
lastIncluded = deltaF->version;
}
@ -194,7 +198,7 @@ static std::string makeTestFileName(Version v) {
}
static BlobFileIndex makeTestFile(Version v, int64_t len) {
return BlobFileIndex(v, makeTestFileName(v), 0, len);
return BlobFileIndex(v, makeTestFileName(v), 0, len, len);
}
static void checkFile(int expectedVersion, const BlobFilePointerRef& actualFile) {

View File

@ -49,11 +49,12 @@ struct BlobFileIndex {
std::string filename;
int64_t offset;
int64_t length;
int64_t fullFileLength;
BlobFileIndex() {}
BlobFileIndex(Version version, std::string filename, int64_t offset, int64_t length)
: version(version), filename(filename), offset(offset), length(length) {}
BlobFileIndex(Version version, std::string filename, int64_t offset, int64_t length, int64_t fullFileLength)
: version(version), filename(filename), offset(offset), length(length), fullFileLength(fullFileLength) {}
// compare on version
bool operator<(const BlobFileIndex& r) const { return version < r.version; }

View File

@ -511,7 +511,8 @@ ACTOR Future<BlobFileIndex> writeDeltaFile(Reference<BlobWorkerData> bwData,
numIterations++;
Key dfKey = blobGranuleFileKeyFor(granuleID, currentDeltaVersion, 'D');
Value dfValue = blobGranuleFileValueFor(fname, 0, serializedSize);
// TODO change once we support file multiplexing
Value dfValue = blobGranuleFileValueFor(fname, 0, serializedSize, serializedSize);
tr->set(dfKey, dfValue);
if (oldGranuleComplete.present()) {
@ -538,7 +539,8 @@ ACTOR Future<BlobFileIndex> writeDeltaFile(Reference<BlobWorkerData> bwData,
if (BUGGIFY_WITH_PROB(0.01)) {
wait(delay(deterministicRandom()->random01()));
}
return BlobFileIndex(currentDeltaVersion, fname, 0, serializedSize);
// FIXME: change when we implement multiplexing
return BlobFileIndex(currentDeltaVersion, fname, 0, serializedSize, serializedSize);
} catch (Error& e) {
wait(tr->onError(e));
}
@ -648,7 +650,8 @@ ACTOR Future<BlobFileIndex> writeSnapshot(Reference<BlobWorkerData> bwData,
wait(readAndCheckGranuleLock(tr, keyRange, epoch, seqno));
numIterations++;
Key snapshotFileKey = blobGranuleFileKeyFor(granuleID, version, 'S');
Key snapshotFileValue = blobGranuleFileValueFor(fname, 0, serializedSize);
// TODO change once we support file multiplexing
Key snapshotFileValue = blobGranuleFileValueFor(fname, 0, serializedSize, serializedSize);
tr->set(snapshotFileKey, snapshotFileValue);
// create granule history at version if this is a new granule with the initial dump from FDB
if (createGranuleHistory) {
@ -692,7 +695,8 @@ ACTOR Future<BlobFileIndex> writeSnapshot(Reference<BlobWorkerData> bwData,
wait(delay(deterministicRandom()->random01()));
}
return BlobFileIndex(version, fname, 0, serializedSize);
// FIXME: change when we implement multiplexing
return BlobFileIndex(version, fname, 0, serializedSize, serializedSize);
}
ACTOR Future<BlobFileIndex> dumpInitialSnapshotFromFDB(Reference<BlobWorkerData> bwData,
@ -797,7 +801,8 @@ ACTOR Future<BlobFileIndex> compactFromBlob(Reference<BlobWorkerData> bwData,
ASSERT(snapshotVersion < version);
chunk.snapshotFile = BlobFilePointerRef(filenameArena, snapshotF.filename, snapshotF.offset, snapshotF.length);
chunk.snapshotFile = BlobFilePointerRef(
filenameArena, snapshotF.filename, snapshotF.offset, snapshotF.length, snapshotF.fullFileLength);
compactBytesRead += snapshotF.length;
int deltaIdx = files.deltaFiles.size() - 1;
while (deltaIdx >= 0 && files.deltaFiles[deltaIdx].version > snapshotVersion) {
@ -807,7 +812,8 @@ ACTOR Future<BlobFileIndex> compactFromBlob(Reference<BlobWorkerData> bwData,
Version lastDeltaVersion = invalidVersion;
while (deltaIdx < files.deltaFiles.size() && files.deltaFiles[deltaIdx].version <= version) {
BlobFileIndex deltaF = files.deltaFiles[deltaIdx];
chunk.deltaFiles.emplace_back_deep(filenameArena, deltaF.filename, deltaF.offset, deltaF.length);
chunk.deltaFiles.emplace_back_deep(
filenameArena, deltaF.filename, deltaF.offset, deltaF.length, deltaF.fullFileLength);
compactBytesRead += deltaF.length;
lastDeltaVersion = files.deltaFiles[deltaIdx].version;
deltaIdx++;