Changing API and file format to full V1 specification

Josh Slocum 2021-08-24 10:05:46 -05:00
parent 42a781016e
commit fb2eef38fc
5 changed files with 170 additions and 208 deletions

View File

@@ -34,26 +34,25 @@
ACTOR Future<Arena> readSnapshotFile(Reference<S3BlobStoreEndpoint> bstore,
std::string bucket,
std::string filename,
BlobFilenameRef f,
KeyRangeRef keyRange,
std::map<KeyRef, ValueRef>* dataMap) {
try {
state Arena arena;
// printf("Starting read of snapshot file %s\n", filename.c_str());
state Reference<AsyncFileS3BlobStoreRead> reader =
makeReference<AsyncFileS3BlobStoreRead>(bstore, bucket, filename);
state int64_t size = wait(reader->size());
makeReference<AsyncFileS3BlobStoreRead>(bstore, bucket, f.filename.toString());
// printf("Got snapshot file size %lld\n", size);
state uint8_t* data = new (arena) uint8_t[size];
state uint8_t* data = new (arena) uint8_t[f.length];
// printf("Reading %lld bytes from snapshot file %s\n", size, filename.c_str());
int readSize = wait(reader->read(data, size, 0));
int readSize = wait(reader->read(data, f.length, f.offset));
// printf("Read %lld bytes from snapshot file %s\n", readSize, filename.c_str());
ASSERT(size == readSize);
ASSERT(f.length == readSize);
// weird stuff for deserializing vector and arenas
Arena parseArena;
GranuleSnapshot snapshot;
StringRef dataRef(data, size);
StringRef dataRef(data, f.length);
ArenaObjectReader rdr(arena, dataRef, Unversioned());
rdr.deserialize(FileIdentifierFor<GranuleSnapshot>::value, snapshot, parseArena);
arena.dependsOn(parseArena);
@@ -93,59 +92,60 @@ ACTOR Future<Arena> readSnapshotFile(Reference<S3BlobStoreEndpoint> bstore,
}*/
printf("Started with %d rows from snapshot file %s after pruning to [%s - %s)\n",
dataMap->size(),
filename.c_str(),
f.toString().c_str(),
keyRange.begin.printable().c_str(),
keyRange.end.printable().c_str());
return arena;
} catch (Error& e) {
printf("Reading snapshot file %s got error %s\n", filename.c_str(), e.name());
printf("Reading snapshot file %s got error %s\n", f.toString().c_str(), e.name());
throw e;
}
}
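
The change above replaces whole-object reads (size taken from reader->size(), offset 0) with slice reads driven by BlobFilenameRef: fetch exactly f.length bytes starting at f.offset, then assert the read was complete. A minimal standalone sketch of that pattern, with std::ifstream standing in for AsyncFileS3BlobStoreRead (readFileSlice is an illustrative name, not from this commit):

#include <cstdint>
#include <fstream>
#include <stdexcept>
#include <string>
#include <vector>

// Read exactly `length` bytes starting at `offset`, mirroring the
// ASSERT(f.length == readSize) checks in readSnapshotFile/readDeltaFile.
std::vector<uint8_t> readFileSlice(const std::string& filename, int64_t offset, int64_t length) {
    std::ifstream in(filename, std::ios::binary);
    if (!in)
        throw std::runtime_error("cannot open " + filename);
    in.seekg(offset);
    std::vector<uint8_t> data(static_cast<size_t>(length));
    in.read(reinterpret_cast<char*>(data.data()), length);
    if (in.gcount() != length)
        throw std::runtime_error("short read from " + filename);
    return data;
}
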
ACTOR Future<Standalone<GranuleDeltas>> readDeltaFile(Reference<S3BlobStoreEndpoint> bstore,
std::string bucket,
std::string filename,
BlobFilenameRef f,
KeyRangeRef keyRange,
Version readVersion) {
try {
// printf("Starting read of delta file %s\n", filename.c_str());
state Standalone<GranuleDeltas> result;
state Reference<AsyncFileS3BlobStoreRead> reader =
makeReference<AsyncFileS3BlobStoreRead>(bstore, bucket, filename);
state int64_t size = wait(reader->size());
makeReference<AsyncFileS3BlobStoreRead>(bstore, bucket, f.filename.toString());
// printf("Got delta file size %lld\n", size);
state uint8_t* data = new (result.arena()) uint8_t[size];
state uint8_t* data = new (result.arena()) uint8_t[f.length];
// printf("Reading %lld bytes from delta file %s into %p\n", size, filename.c_str(), data);
int readSize = wait(reader->read(data, size, 0));
int readSize = wait(reader->read(data, f.length, f.offset));
// printf("Read %d bytes from delta file %s\n", readSize, filename.c_str());
ASSERT(size == readSize);
ASSERT(f.length == readSize);
// Don't do range or version filtering in here since we'd have to copy/rewrite the deltas and it might starve
// snapshot read task, do it in main thread
// weirdness with vector refs and arenas here
Arena parseArena;
StringRef dataRef(data, size);
StringRef dataRef(data, f.length);
ArenaObjectReader rdr(result.arena(), dataRef, Unversioned());
rdr.deserialize(FileIdentifierFor<GranuleDeltas>::value, result.contents(), parseArena);
result.arena().dependsOn(parseArena);
// result.contents() = ObjectReader::fromStringRef<GranuleDeltas>(dataRef, Unversioned());
printf("Parsed %d deltas from delta file %s\n", result.size(), filename.c_str());
printf("Parsed %d deltas from delta file %s\n", result.size(), f.toString().c_str());
// TODO REMOVE sanity check
for (int i = 0; i < result.size() - 1; i++) {
if (result[i].v > result[i + 1].v) {
printf("BG VERSION ORDER VIOLATION IN DELTA FILE: '%lld', '%lld'\n", result[i].v, result[i + 1].v);
if (result[i].version > result[i + 1].version) {
printf("BG VERSION ORDER VIOLATION IN DELTA FILE: '%lld', '%lld'\n",
result[i].version,
result[i + 1].version);
}
ASSERT(result[i].v <= result[i + 1].v);
ASSERT(result[i].version <= result[i + 1].version);
}
return result;
} catch (Error& e) {
printf("Reading delta file %s got error %s\n", filename.c_str(), e.name());
printf("Reading delta file %s got error %s\n", f.toString().c_str(), e.name());
throw e;
}
}
@@ -242,19 +242,23 @@ static void applyDeltas(std::map<KeyRef, ValueRef>* dataMap,
GranuleDeltas deltas,
KeyRangeRef keyRange,
Version readVersion) {
for (MutationAndVersion& delta : deltas) {
if (delta.v > readVersion) {
for (MutationsAndVersionRef& delta : deltas) {
if (delta.version > readVersion) {
break;
}
applyDelta(dataMap, arena, keyRange, delta.m);
for (auto& m : delta.mutations) {
applyDelta(dataMap, arena, keyRange, m);
}
}
}
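
GranuleDeltas entries are now MutationsAndVersionRef batches (one version, many mutations) rather than single MutationAndVersion pairs, so applyDeltas gains an inner loop and stops at the first batch past readVersion. A simplified standalone model of that loop over a std::map, with plain structs standing in for the flow types:

#include <cstdint>
#include <map>
#include <string>
#include <vector>

struct Mutation {
    enum Type { SetValue, ClearRange } type;
    std::string param1, param2; // key/value for sets, [begin, end) for clears
};

struct MutationsAndVersion {
    int64_t version;
    std::vector<Mutation> mutations; // every mutation committed at this version
};

void applyDeltaBatches(std::map<std::string, std::string>& dataMap,
                       const std::vector<MutationsAndVersion>& deltas,
                       int64_t readVersion) {
    for (const auto& batch : deltas) {
        if (batch.version > readVersion)
            break; // batches are version-ordered; nothing later applies
        for (const auto& m : batch.mutations) {
            if (m.type == Mutation::SetValue)
                dataMap[m.param1] = m.param2;
            else // ClearRange: erase keys in [param1, param2)
                dataMap.erase(dataMap.lower_bound(m.param1), dataMap.lower_bound(m.param2));
        }
    }
}
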
ACTOR Future<RangeResult> readBlobGranule(BlobGranuleChunk chunk,
ACTOR Future<RangeResult> readBlobGranule(BlobGranuleChunkRef chunk,
KeyRangeRef keyRange,
Version readVersion,
Reference<S3BlobStoreEndpoint> bstore,
std::string bucket) {
// TODO REMOVE with V2 of protocol
ASSERT(readVersion == chunk.includedVersion);
// Arena to hold all allocations for applying deltas. Most of it, and the arenas produced by reading the files,
// will likely be tossed if there are a significant number of mutations, so we copy at the end instead of doing a
// dependsOn.
@@ -264,12 +268,15 @@ ACTOR Future<RangeResult> readBlobGranule(BlobGranuleChunk chunk,
try {
state std::map<KeyRef, ValueRef> dataMap;
Future<Arena> readSnapshotFuture =
readSnapshotFile(bstore, bucket, chunk.snapshotFileName.toString(), keyRange, &dataMap);
chunk.snapshotFile.present()
? readSnapshotFile(bstore, bucket, chunk.snapshotFile.get(), keyRange, &dataMap)
: Future<Arena>(Arena());
state std::vector<Future<Standalone<GranuleDeltas>>> readDeltaFutures;
readDeltaFutures.reserve(chunk.deltaFileNames.size());
for (StringRef deltaFileName : chunk.deltaFileNames) {
readDeltaFutures.push_back(readDeltaFile(bstore, bucket, deltaFileName.toString(), keyRange, readVersion));
readDeltaFutures.reserve(chunk.deltaFiles.size());
for (BlobFilenameRef deltaFile : chunk.deltaFiles) {
readDeltaFutures.push_back(readDeltaFile(bstore, bucket, deltaFile, keyRange, readVersion));
}
Arena snapshotArena = wait(readSnapshotFuture);
@@ -289,7 +296,7 @@ ACTOR Future<RangeResult> readBlobGranule(BlobGranuleChunk chunk,
RangeResult ret;
for (auto& it : dataMap) {
ret.push_back_deep(ret.arena(), KeyValueRef(it.first, it.second));
// TODO for large reads, probably wait to yield periodically here for slowTask
// TODO for large reads, probably wait to yield periodically here for SlowTask
}
return ret;

View File

@@ -35,7 +35,7 @@
// Reads the fileset in the reply using the provided blob store, and filters data and mutations by key + version from
// the request
ACTOR Future<RangeResult> readBlobGranule(BlobGranuleChunk chunk,
ACTOR Future<RangeResult> readBlobGranule(BlobGranuleChunkRef chunk,
KeyRangeRef keyRange,
Version readVersion,
Reference<S3BlobStoreEndpoint> bstore,

View File

@@ -22,7 +22,8 @@
#define FDBCLIENT_BLOBWORKERINTERFACE_H
#pragma once
#include "fdbclient/CommitProxyInterface.h"
#include "fdbclient/CommitTransaction.h"
#include "fdbclient/StorageServerInterface.h" // just for MutationsAndVersion, TODO pull that out maybe?
#include "fdbclient/FDBTypes.h"
#include "fdbrpc/fdbrpc.h"
#include "fdbrpc/Locality.h"
@@ -51,26 +52,11 @@ struct BlobWorkerInterface {
}
};
struct MutationAndVersion {
constexpr static FileIdentifier file_identifier = 4268041;
MutationRef m;
Version v;
MutationAndVersion() {}
MutationAndVersion(Arena& to, MutationRef m, Version v) : m(to, m), v(v) {}
MutationAndVersion(Arena& to, const MutationAndVersion& from) : m(to, from.m), v(from.v) {}
template <class Ar>
void serialize(Ar& ar) {
serializer(ar, m, v);
}
};
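
MutationAndVersion is removed in favor of MutationsAndVersionRef from StorageServerInterface.h (hence the include added above), which carries all mutations committed at a given version in a single entry.
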
// TODO should name all things that don't have their own arena *Ref
// file format of actual blob files
struct GranuleSnapshot : VectorRef<KeyValueRef> {
constexpr static FileIdentifier file_identifier = 4268040;
constexpr static FileIdentifier file_identifier = 1300395;
template <class Ar>
void serialize(Ar& ar) {
@@ -78,12 +64,35 @@ struct GranuleSnapshot : VectorRef<KeyValueRef> {
}
};
struct GranuleDeltas : VectorRef<MutationAndVersion> {
constexpr static FileIdentifier file_identifier = 4268042;
struct GranuleDeltas : VectorRef<MutationsAndVersionRef> {
constexpr static FileIdentifier file_identifier = 8563013;
template <class Ar>
void serialize(Ar& ar) {
serializer(ar, ((VectorRef<MutationAndVersion>&)*this));
serializer(ar, ((VectorRef<MutationsAndVersionRef>&)*this));
}
};
// TODO better name?
struct BlobFilenameRef {
constexpr static FileIdentifier file_identifier = 5253554;
StringRef filename;
int64_t offset;
int64_t length;
BlobFilenameRef() {}
BlobFilenameRef(Arena& to, std::string filename, int64_t offset, int64_t length)
: filename(to, filename), offset(offset), length(length) {}
template <class Ar>
void serialize(Ar& ar) {
serializer(ar, filename, offset, length);
}
std::string toString() {
std::stringstream ss;
ss << filename.toString() << ":" << offset << ":" << length;
return std::move(ss).str();
}
};
@@ -91,16 +100,17 @@ struct GranuleDeltas : VectorRef<MutationAndVersion> {
// TODO could filter out delta files that don't intersect the key range being requested?
// TODO since client request passes version, we don't need to include the version of each mutation in the response if we
// pruned it there
struct BlobGranuleChunk {
struct BlobGranuleChunkRef {
constexpr static FileIdentifier file_identifier = 991434;
KeyRangeRef keyRange;
StringRef snapshotFileName;
VectorRef<StringRef> deltaFileNames;
Version includedVersion;
Optional<BlobFilenameRef> snapshotFile; // not set if it's an incremental read
VectorRef<BlobFilenameRef> deltaFiles;
GranuleDeltas newDeltas;
template <class Ar>
void serialize(Ar& ar) {
serializer(ar, keyRange, snapshotFileName, deltaFileNames, newDeltas);
serializer(ar, keyRange, includedVersion, snapshotFile, deltaFiles, newDeltas);
}
};
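
Taken together, a BlobGranuleChunkRef is a recipe for a read at includedVersion: an optional snapshot slice (absent for incremental reads), delta-file slices to apply on top, and newDeltas for mutations newer than the last delta file. A sketch of how a consumer might materialize one; loadSnapshotSlice and applyDeltaSlice are hypothetical stubs for the real readers in the first file of this commit:

#include <cstdint>
#include <map>
#include <optional>
#include <string>
#include <vector>

struct FileSlice {
    std::string filename;
    int64_t offset = 0;
    int64_t length = 0;
};

// Stubs: the real readers fetch each slice from the blob store and
// deserialize GranuleSnapshot / GranuleDeltas from it.
std::map<std::string, std::string> loadSnapshotSlice(const FileSlice&) { return {}; }
void applyDeltaSlice(std::map<std::string, std::string>&, const FileSlice&, int64_t) {}

std::map<std::string, std::string> materializeChunk(const std::optional<FileSlice>& snapshotFile,
                                                    const std::vector<FileSlice>& deltaFiles,
                                                    int64_t readVersion) {
    // No snapshot slice means an incremental read: start from empty.
    std::map<std::string, std::string> data =
        snapshotFile ? loadSnapshotSlice(*snapshotFile) : std::map<std::string, std::string>();
    // Delta files are ordered oldest to newest; newDeltas would be applied last.
    for (const auto& df : deltaFiles)
        applyDeltaSlice(data, df, readVersion);
    return data;
}
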
@@ -108,7 +118,7 @@ struct BlobGranuleFileReply {
// TODO is there a "proper" way to generate file_identifier?
constexpr static FileIdentifier file_identifier = 6858612;
Arena arena;
VectorRef<BlobGranuleChunk> chunks;
VectorRef<BlobGranuleChunkRef> chunks;
template <class Ar>
void serialize(Ar& ar) {
@@ -119,10 +129,10 @@ struct BlobGranuleFileReply {
// TODO could do a reply promise stream of file mutations to bound memory requirements?
// Have to load whole snapshot file into memory though so it doesn't actually matter too much
struct BlobGranuleFileRequest {
constexpr static FileIdentifier file_identifier = 4150141;
Arena arena;
KeyRangeRef keyRange;
Version beginVersion = 0;
Version readVersion;
ReplyPromise<BlobGranuleFileReply> reply;
@@ -130,12 +140,12 @@ struct BlobGranuleFileRequest {
template <class Ar>
void serialize(Ar& ar) {
serializer(ar, keyRange, readVersion, reply, arena);
serializer(ar, keyRange, beginVersion, readVersion, reply, arena);
}
};
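
The new beginVersion field is a placeholder for V2 incremental reads: a nonzero beginVersion would request only the changes since that version, the case for which snapshotFile was made Optional. In this commit the worker still asserts beginVersion == 0 (see handleBlobGranuleFileRequest below).
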
struct AssignBlobRangeRequest {
constexpr static FileIdentifier file_identifier = 4150141;
constexpr static FileIdentifier file_identifier = 4844288;
Arena arena;
KeyRangeRef keyRange;
Version assignVersion;

View File

@@ -32,12 +32,23 @@
#include "flow/actorcompiler.h" // has to be last include
// TODO might need to use IBackupFile instead of blob store interface to support non-s3 things like azure?
struct BlobFileIndex {
Version version;
std::string filename;
int64_t offset;
int64_t length;
BlobFileIndex(Version version, std::string filename, int64_t offset, int64_t length)
: version(version), filename(filename), offset(offset), length(length) {}
};
struct GranuleMetadata : NonCopyable, ReferenceCounted<GranuleMetadata> {
std::deque<std::pair<Version, std::string>> snapshotFiles;
std::deque<std::pair<Version, std::string>> deltaFiles;
std::deque<BlobFileIndex> snapshotFiles;
std::deque<BlobFileIndex> deltaFiles;
GranuleDeltas currentDeltas;
uint64_t bytesInNewDeltaFiles = 0;
Version lastWriteVersion = 0;
Version currentDeltaVersion = 0;
uint64_t currentDeltaBytes = 0;
Arena deltaArena;
@@ -78,15 +89,21 @@ struct BlobWorkerData {
~BlobWorkerData() { printf("Destroying blob worker data for %s\n", id.toString().c_str()); }
};
static Value getFileValue(std::string fname, int64_t offset, int64_t length) {
Tuple fileValue;
fileValue.append(fname).append(offset).append(length);
return fileValue.getDataAsStandalone();
}
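
getFileValue fixes the value half of the new file-index schema: under the blobGranuleFileKeys prefix, each key is the Tuple (rangeBegin, rangeEnd, "snapshot" or "delta", version) and each value is the Tuple (filename, offset, length). Note that both writers below still pass offset 0 and the full serialized size, so the slice indirection is forward-looking. A sketch of the matching decoder, assuming the Tuple::unpack/getString/getInt API in fdbclient/Tuple.h (this helper is not part of the commit):

#include "fdbclient/Tuple.h"

// Hypothetical inverse of getFileValue: unpack (filename, offset, length)
// from a value stored under blobGranuleFileKeys.
static void decodeFileValue(Value const& val, std::string& fname, int64_t& offset, int64_t& length) {
    Tuple t = Tuple::unpack(val);
    fname = t.getString(0).toString();
    offset = t.getInt(1);
    length = t.getInt(2);
}
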
// TODO add granule locks
ACTOR Future<std::pair<Version, std::string>> writeDeltaFile(BlobWorkerData* bwData,
KeyRange keyRange,
GranuleDeltas const* deltasToWrite) {
ACTOR Future<BlobFileIndex> writeDeltaFile(BlobWorkerData* bwData,
KeyRange keyRange,
GranuleDeltas const* deltasToWrite,
Version currentDeltaVersion) {
// TODO some sort of directory structure would be useful?
state Version lastVersion = deltasToWrite->back().v;
state std::string fname = deterministicRandom()->randomUniqueID().toString() + "_T" +
std::to_string((uint64_t)(1000.0 * now())) + "_V" + std::to_string(lastVersion) +
std::to_string((uint64_t)(1000.0 * now())) + "_V" + std::to_string(currentDeltaVersion) +
".delta";
state Value serialized = ObjectWriter::toValue(*deltasToWrite, Unversioned());
@@ -104,25 +121,27 @@ ACTOR Future<std::pair<Version, std::string>> writeDeltaFile(BlobWorkerData* bwD
try {
Tuple deltaFileKey;
deltaFileKey.append(keyRange.begin).append(keyRange.end);
deltaFileKey.append(LiteralStringRef("delta")).append(lastVersion);
tr->set(deltaFileKey.getDataAsStandalone().withPrefix(blobGranuleFileKeys.begin), fname);
deltaFileKey.append(LiteralStringRef("delta")).append(currentDeltaVersion);
tr->set(deltaFileKey.getDataAsStandalone().withPrefix(blobGranuleFileKeys.begin),
getFileValue(fname, 0, serialized.size()));
wait(tr->commit());
printf("blob worker updated fdb with delta file %s of size %d at version %lld\n",
fname.c_str(),
serialized.size(),
lastVersion);
return std::pair<Version, std::string>(lastVersion, fname);
currentDeltaVersion);
return BlobFileIndex(currentDeltaVersion, fname, 0, serialized.size());
} catch (Error& e) {
wait(tr->onError(e));
}
}
}
ACTOR Future<std::string> writeSnapshot(BlobWorkerData* bwData,
KeyRange keyRange,
Version version,
PromiseStream<RangeResult> rows) {
ACTOR Future<BlobFileIndex> writeSnapshot(BlobWorkerData* bwData,
KeyRange keyRange,
Version version,
PromiseStream<RangeResult> rows) {
// TODO some sort of directory structure would be useful maybe?
state std::string fname = deterministicRandom()->randomUniqueID().toString() + "_T" +
std::to_string((uint64_t)(1000.0 * now())) + "_V" + std::to_string(version) + ".snapshot";
@@ -182,7 +201,8 @@ ACTOR Future<std::string> writeSnapshot(BlobWorkerData* bwData,
try {
loop {
try {
tr->set(snapshotFileKey.getDataAsStandalone().withPrefix(blobGranuleFileKeys.begin), fname);
tr->set(snapshotFileKey.getDataAsStandalone().withPrefix(blobGranuleFileKeys.begin),
getFileValue(fname, 0, serialized.size()));
wait(tr->commit());
break;
} catch (Error& e) {
@@ -202,10 +222,10 @@ ACTOR Future<std::string> writeSnapshot(BlobWorkerData* bwData,
fname.c_str(),
serialized.size());
return fname;
return BlobFileIndex(version, fname, 0, serialized.size());
}
ACTOR Future<std::pair<Version, std::string>> dumpInitialSnapshotFromFDB(BlobWorkerData* bwData, KeyRange keyRange) {
ACTOR Future<BlobFileIndex> dumpInitialSnapshotFromFDB(BlobWorkerData* bwData, KeyRange keyRange) {
printf("Dumping snapshot from FDB for [%s - %s)\n",
keyRange.begin.printable().c_str(),
keyRange.end.printable().c_str());
@@ -216,7 +236,7 @@ ACTOR Future<std::pair<Version, std::string>> dumpInitialSnapshotFromFDB(BlobWor
try {
state Version readVersion = wait(tr->getReadVersion());
state PromiseStream<RangeResult> rowsStream;
state Future<std::string> snapshotWriter = writeSnapshot(bwData, keyRange, readVersion, rowsStream);
state Future<BlobFileIndex> snapshotWriter = writeSnapshot(bwData, keyRange, readVersion, rowsStream);
state Key beginKey = keyRange.begin;
loop {
@@ -230,8 +250,8 @@ ACTOR Future<std::pair<Version, std::string>> dumpInitialSnapshotFromFDB(BlobWor
break;
}
}
std::string fname = wait(snapshotWriter);
return std::pair(readVersion, fname);
BlobFileIndex f = wait(snapshotWriter);
return f;
} catch (Error& e) {
printf("Dumping snapshot from FDB for [%s - %s) got error %s\n",
keyRange.begin.printable().c_str(),
@@ -242,7 +262,7 @@ ACTOR Future<std::pair<Version, std::string>> dumpInitialSnapshotFromFDB(BlobWor
}
}
ACTOR Future<std::pair<Version, std::string>> compactFromBlob(BlobWorkerData* bwData, KeyRange keyRange) {
ACTOR Future<BlobFileIndex> compactFromBlob(BlobWorkerData* bwData, KeyRange keyRange) {
printf("Compacting snapshot from blob for [%s - %s)\n",
keyRange.begin.printable().c_str(),
keyRange.end.printable().c_str());
@@ -253,20 +273,22 @@ ACTOR Future<std::pair<Version, std::string>> compactFromBlob(BlobWorkerData* bw
ASSERT(!metadata->snapshotFiles.empty());
ASSERT(!metadata->deltaFiles.empty());
ASSERT(metadata->currentDeltas.empty());
state Version version = metadata->deltaFiles.back().first;
state Version version = metadata->deltaFiles.back().version;
state Arena filenameArena;
state BlobGranuleChunk chunk;
state BlobGranuleChunkRef chunk;
state Version snapshotVersion = metadata->snapshotFiles.back().first;
chunk.snapshotFileName = StringRef(filenameArena, metadata->snapshotFiles.back().second);
state Version snapshotVersion = metadata->snapshotFiles.back().version;
BlobFileIndex snapshotF = metadata->snapshotFiles.back();
chunk.snapshotFile = BlobFilenameRef(filenameArena, snapshotF.filename, snapshotF.offset, snapshotF.length);
int deltaIdx = metadata->deltaFiles.size() - 1;
while (deltaIdx >= 0 && metadata->deltaFiles[deltaIdx].first > snapshotVersion) {
while (deltaIdx >= 0 && metadata->deltaFiles[deltaIdx].version > snapshotVersion) {
deltaIdx--;
}
deltaIdx++;
while (deltaIdx < metadata->deltaFiles.size()) {
chunk.deltaFileNames.push_back_deep(filenameArena, metadata->deltaFiles[deltaIdx].second);
BlobFileIndex deltaF = metadata->deltaFiles[deltaIdx];
chunk.deltaFiles.emplace_back_deep(filenameArena, deltaF.filename, deltaF.offset, deltaF.length);
deltaIdx++;
}
@@ -275,22 +297,22 @@ ACTOR Future<std::pair<Version, std::string>> compactFromBlob(BlobWorkerData* bw
keyRange.end.printable().c_str(),
version);
printf(" SnapshotFile:\n %s\n", chunk.snapshotFileName.toString().c_str());
printf(" SnapshotFile:\n %s\n", chunk.snapshotFile.get().toString().c_str());
printf(" DeltaFiles:\n");
for (auto& df : chunk.deltaFileNames) {
for (auto& df : chunk.deltaFiles) {
printf(" %s\n", df.toString().c_str());
}
loop {
try {
state PromiseStream<RangeResult> rowsStream;
state Future<std::string> snapshotWriter = writeSnapshot(bwData, keyRange, version, rowsStream);
state Future<BlobFileIndex> snapshotWriter = writeSnapshot(bwData, keyRange, version, rowsStream);
RangeResult newGranule = wait(readBlobGranule(chunk, keyRange, version, bwData->bstore, bwData->bucket));
rowsStream.send(std::move(newGranule));
rowsStream.sendError(end_of_stream());
std::string fname = wait(snapshotWriter);
return std::pair(version, fname);
BlobFileIndex f = wait(snapshotWriter);
return f;
} catch (Error& e) {
// TODO better error handling eventually - should retry unless the error is because another worker took over
// the range
@@ -303,95 +325,6 @@ ACTOR Future<std::pair<Version, std::string>> compactFromBlob(BlobWorkerData* bw
}
}
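
compactFromBlob re-snapshots a granule out of its own blob files: it assembles a BlobGranuleChunkRef from the latest snapshot file and the delta files after it, streams the merged rows from readBlobGranule into writeSnapshot, and returns the new file's BlobFileIndex, with no rescan of FDB.
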
/*ACTOR Future<std::pair<Version, std::string>> dumpSnapshotFromFDB(BlobWorkerData* bwData, KeyRange keyRange) {
printf("Dumping snapshot from FDB for [%s - %s)\n",
keyRange.begin.printable().c_str(),
keyRange.end.printable().c_str());
state Reference<ReadYourWritesTransaction> tr = makeReference<ReadYourWritesTransaction>(bwData->db);
tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
loop {
state std::string fname = "";
try {
state Version readVersion = wait(tr->getReadVersion());
fname = deterministicRandom()->randomUniqueID().toString() + "_T" +
std::to_string((uint64_t)(1000.0 * now())) + "_V" + std::to_string(readVersion) + ".snapshot";
// TODO some sort of directory structure would be useful?
state Arena arena;
state GranuleSnapshot allRows;
// TODO would be fairly easy to change this to a promise stream, and optionally build from blobGranuleReader
// instead
state Key beginKey = keyRange.begin;
loop {
// TODO knob for limit?
RangeResult res = wait(tr->getRange(KeyRangeRef(beginKey, keyRange.end), 1000));
arena.dependsOn(res.arena());
allRows.append(arena, res.begin(), res.size());
if (res.more) {
beginKey = keyAfter(res.back().key);
} else {
break;
}
}
printf("Granule [%s - %s) read %d snapshot rows from fdb\n",
keyRange.begin.printable().c_str(),
keyRange.end.printable().c_str(),
allRows.size());
if (allRows.size() < 10) {
for (auto& row : allRows) {
printf(" %s=%s\n", row.key.printable().c_str(), row.value.printable().c_str());
}
}
// TODO REMOVE sanity check!
for (int i = 0; i < allRows.size() - 1; i++) {
if (allRows[i].key >= allRows[i + 1].key) {
printf("SORT ORDER VIOLATION IN SNAPSHOT FILE: %s, %s\n",
allRows[i].key.printable().c_str(),
allRows[i + 1].key.printable().c_str());
}
ASSERT(allRows[i].key < allRows[i + 1].key);
}
// TODO is this easy to read as a flatbuffer from reader? Need to be sure about this data format
state Value serialized = ObjectWriter::toValue(allRows, Unversioned());
// write to s3 using multi part upload
state Reference<AsyncFileS3BlobStoreWrite> objectFile =
makeReference<AsyncFileS3BlobStoreWrite>(bwData->bstore, bwData->bucket, fname);
wait(objectFile->write(serialized.begin(), serialized.size(), 0));
wait(objectFile->sync());
// TODO could move this into separate txn to avoid the timeout, it'll need to be separate later anyway
// object uploaded successfully, save it to system key space (TODO later - and memory file history)
// TODO add conflict range for writes?
Tuple snapshotFileKey;
snapshotFileKey.append(keyRange.begin).append(keyRange.end);
snapshotFileKey.append(LiteralStringRef("snapshot")).append(readVersion);
tr->set(snapshotFileKey.getDataAsStandalone().withPrefix(blobGranuleFileKeys.begin), fname);
wait(tr->commit());
printf("Granule [%s - %s) committed new snapshot file %s with %d bytes\n\n",
keyRange.begin.printable().c_str(),
keyRange.end.printable().c_str(),
fname.c_str(),
serialized.size());
return std::pair<Version, std::string>(readVersion, fname);
} catch (Error& e) {
// TODO REMOVE
printf("dump range txn got error %s\n", e.name());
if (fname != "") {
// TODO delete unsuccessfully written file
bwData->bstore->deleteObject(bwData->bucket, fname);
printf("deleting s3 object %s\n", fname.c_str());
}
wait(tr->onError(e));
}
}
}*/
ACTOR Future<std::pair<Key, Version>> createRangeFeed(BlobWorkerData* bwData, KeyRange keyRange) {
state Key rangeFeedID = StringRef(deterministicRandom()->randomUniqueID().toString());
state Transaction tr(bwData->db);
@@ -425,17 +358,20 @@ ACTOR Future<Void> blobGranuleUpdateFiles(BlobWorkerData* bwData, Reference<Gran
metadata->keyRange.end.printable().c_str(),
rangeFeedData.second);
std::pair<Version, std::string> newSnapshotFile = wait(dumpInitialSnapshotFromFDB(bwData, metadata->keyRange));
ASSERT(rangeFeedData.second <= newSnapshotFile.first);
BlobFileIndex newSnapshotFile = wait(dumpInitialSnapshotFromFDB(bwData, metadata->keyRange));
ASSERT(rangeFeedData.second <= newSnapshotFile.version);
metadata->snapshotFiles.push_back(newSnapshotFile);
metadata->lastWriteVersion = newSnapshotFile.first;
metadata->lastWriteVersion = newSnapshotFile.version;
metadata->currentDeltaVersion = metadata->lastWriteVersion;
rangeFeedFuture = bwData->db->getRangeFeedStream(
rangeFeedStream, rangeFeedData.first, newSnapshotFile.first + 1, maxVersion, metadata->keyRange);
rangeFeedStream, rangeFeedData.first, newSnapshotFile.version + 1, maxVersion, metadata->keyRange);
loop {
state Standalone<VectorRef<MutationsAndVersionRef>> mutations = waitNext(rangeFeedStream.getFuture());
// TODO should maybe change mutation buffer to MutationsAndVersionRef instead of MutationAndVersion
for (auto& deltas : mutations) {
if (!deltas.mutations.empty()) {
metadata->currentDeltas.emplace_back_deep(metadata->deltaArena, deltas);
}
for (auto& delta : deltas.mutations) {
// TODO REMOVE!!! Just for initial debugging
/*printf("BlobWorker [%s - %s) Got Mutation @ %lld: %s\n",
@@ -444,24 +380,25 @@ ACTOR Future<Void> blobGranuleUpdateFiles(BlobWorkerData* bwData, Reference<Gran
deltas.version,
delta.toString().c_str());*/
metadata->currentDeltas.emplace_back_deep(metadata->deltaArena, delta, deltas.version);
// 8 for version, 1 for type, 4 for each param length then actual param size
metadata->currentDeltaBytes += 17 + delta.param1.size() + delta.param2.size();
}
ASSERT(metadata->currentDeltaVersion <= deltas.version);
metadata->currentDeltaVersion = deltas.version;
// TODO handle version batch barriers
if (metadata->currentDeltaBytes >= SERVER_KNOBS->BG_DELTA_FILE_TARGET_BYTES &&
metadata->currentDeltas.back().v > metadata->lastWriteVersion) {
metadata->currentDeltaVersion > metadata->lastWriteVersion) {
printf("Granule [%s - %s) flushing delta file after %d bytes\n",
metadata->keyRange.begin.printable().c_str(),
metadata->keyRange.end.printable().c_str(),
metadata->currentDeltaBytes);
std::pair<Version, std::string> newDeltaFile =
wait(writeDeltaFile(bwData, metadata->keyRange, &metadata->currentDeltas));
BlobFileIndex newDeltaFile = wait(writeDeltaFile(
bwData, metadata->keyRange, &metadata->currentDeltas, metadata->currentDeltaVersion));
// add new delta file
metadata->deltaFiles.push_back(newDeltaFile);
metadata->lastWriteVersion = newDeltaFile.first;
metadata->lastWriteVersion = metadata->currentDeltaVersion;
metadata->bytesInNewDeltaFiles += metadata->currentDeltaBytes;
// reset current deltas
@@ -471,8 +408,8 @@ ACTOR Future<Void> blobGranuleUpdateFiles(BlobWorkerData* bwData, Reference<Gran
printf("Popping range feed %s at %lld\n\n",
rangeFeedData.first.printable().c_str(),
newDeltaFile.first);
wait(bwData->db->popRangeFeedMutations(rangeFeedData.first, newDeltaFile.first));
metadata->lastWriteVersion);
wait(bwData->db->popRangeFeedMutations(rangeFeedData.first, metadata->lastWriteVersion));
}
if (metadata->bytesInNewDeltaFiles >= SERVER_KNOBS->BG_DELTA_BYTES_BEFORE_COMPACT) {
@@ -484,11 +421,11 @@ ACTOR Future<Void> blobGranuleUpdateFiles(BlobWorkerData* bwData, Reference<Gran
// reassigned
// TODO: this could read from FDB read previous snapshot + delta files instead if it knew there was
// a large range clear at the end or it knew the granule was small, or something
std::pair<Version, std::string> newSnapshotFile = wait(compactFromBlob(bwData, metadata->keyRange));
BlobFileIndex newSnapshotFile = wait(compactFromBlob(bwData, metadata->keyRange));
// add new snapshot file
metadata->snapshotFiles.push_back(newSnapshotFile);
metadata->lastWriteVersion = newSnapshotFile.first;
metadata->lastWriteVersion = newSnapshotFile.version;
// reset metadata
metadata->bytesInNewDeltaFiles = 0;
@@ -505,6 +442,8 @@ ACTOR Future<Void> blobGranuleUpdateFiles(BlobWorkerData* bwData, Reference<Gran
}
static void handleBlobGranuleFileRequest(BlobWorkerData* bwData, const BlobGranuleFileRequest& req) {
// TODO REMOVE in api V2
ASSERT(req.beginVersion == 0);
BlobGranuleFileReply rep;
auto checkRanges = bwData->granuleMetadata.intersectingRanges(req.keyRange);
@@ -551,13 +490,15 @@ static void handleBlobGranuleFileRequest(BlobWorkerData* bwData, const BlobGranu
Reference<GranuleMetadata> metadata = r.value();
// FIXME: eventually need to handle waiting for granule's committed version to catch up to the request version
// before copying mutations into reply's arena, to ensure read isn't stale
BlobGranuleChunk chunk;
BlobGranuleChunkRef chunk;
// TODO change in V2
chunk.includedVersion = req.readVersion;
chunk.keyRange =
KeyRangeRef(StringRef(rep.arena, metadata->keyRange.begin), StringRef(rep.arena, r.value()->keyRange.end));
// handle snapshot files
int i = metadata->snapshotFiles.size() - 1;
while (i >= 0 && metadata->snapshotFiles[i].first > req.readVersion) {
while (i >= 0 && metadata->snapshotFiles[i].version > req.readVersion) {
i--;
}
// if version is older than oldest snapshot file (or no snapshot files), throw too old
@@ -566,13 +507,14 @@ static void handleBlobGranuleFileRequest(BlobWorkerData* bwData, const BlobGranu
req.reply.sendError(transaction_too_old());
return;
}
chunk.snapshotFileName = StringRef(rep.arena, metadata->snapshotFiles[i].second);
Version snapshotVersion = metadata->snapshotFiles[i].first;
BlobFileIndex snapshotF = metadata->snapshotFiles[i];
chunk.snapshotFile = BlobFilenameRef(rep.arena, snapshotF.filename, snapshotF.offset, snapshotF.length);
Version snapshotVersion = metadata->snapshotFiles[i].version;
// handle delta files
i = metadata->deltaFiles.size() - 1;
// skip delta files that are too new
while (i >= 0 && metadata->deltaFiles[i].first > req.readVersion) {
while (i >= 0 && metadata->deltaFiles[i].version > req.readVersion) {
i--;
}
if (i < metadata->deltaFiles.size() - 1) {
@@ -580,21 +522,22 @@ static void handleBlobGranuleFileRequest(BlobWorkerData* bwData, const BlobGranu
}
// only include delta files after the snapshot file
int j = i;
while (j >= 0 && metadata->deltaFiles[j].first > snapshotVersion) {
while (j >= 0 && metadata->deltaFiles[j].version > snapshotVersion) {
j--;
}
j++;
while (j <= i) {
chunk.deltaFileNames.push_back_deep(rep.arena, metadata->deltaFiles[j].second);
BlobFileIndex deltaF = metadata->deltaFiles[j];
chunk.deltaFiles.emplace_back_deep(rep.arena, deltaF.filename, deltaF.offset, deltaF.length);
j++;
}
// new deltas (if version is larger than version of last delta file)
// FIXME: do trivial key bounds here if key range is not fully contained in request key range
if (!metadata->deltaFiles.size() || req.readVersion >= metadata->deltaFiles.back().first) {
if (!metadata->deltaFiles.size() || req.readVersion >= metadata->deltaFiles.back().version) {
rep.arena.dependsOn(metadata->deltaArena);
for (auto& delta : metadata->currentDeltas) {
if (delta.v <= req.readVersion) {
if (delta.version <= req.readVersion) {
chunk.newDeltas.push_back_deep(rep.arena, delta);
}
}
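
The selection logic above is the heart of serving a request at readVersion: the newest snapshot file at or before readVersion (else transaction_too_old), then the delta files newer than that snapshot, then in-memory deltas at or below readVersion. A standalone sketch of those rules; the hunk is truncated where the delta index is widened by one file, so that step is a presumption here:

#include <cstdint>
#include <vector>

struct FileIdx { int64_t version; };

// Pick the files needed to serve readVersion. Returns false when
// readVersion predates the oldest snapshot (the transaction_too_old case).
bool selectFiles(const std::vector<FileIdx>& snapshots, const std::vector<FileIdx>& deltas,
                 int64_t readVersion, int& snapshotIdx, int& deltaBegin, int& deltaEnd) {
    snapshotIdx = (int)snapshots.size() - 1;
    while (snapshotIdx >= 0 && snapshots[snapshotIdx].version > readVersion)
        snapshotIdx--;
    if (snapshotIdx < 0)
        return false;
    int64_t snapshotVersion = snapshots[snapshotIdx].version;

    // Skip delta files that are too new, then (presumably, given the
    // truncated hunk) step forward one file so mutations between the last
    // included version and readVersion are still covered.
    int i = (int)deltas.size() - 1;
    while (i >= 0 && deltas[i].version > readVersion)
        i--;
    if (i < (int)deltas.size() - 1)
        i++;

    // Only include delta files after the snapshot.
    int j = i;
    while (j >= 0 && deltas[j].version > snapshotVersion)
        j--;
    deltaBegin = j + 1;
    deltaEnd = i; // inclusive; empty when deltaEnd < deltaBegin
    return true;
}
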

View File

@@ -3208,17 +3208,19 @@ ACTOR Future<Void> doBlobGranuleRequests(ClusterControllerData* self, Ratekeeper
chunk.keyRange.begin.printable().c_str(),
chunk.keyRange.end.printable().c_str());
printf(" SnapshotFile:\n %s\n", chunk.snapshotFileName.toString().c_str());
printf(" SnapshotFile:\n %s\n",
chunk.snapshotFile.present() ? chunk.snapshotFile.get().toString().c_str() : "<none>");
printf(" DeltaFiles:\n");
for (auto& df : chunk.deltaFileNames) {
for (auto& df : chunk.deltaFiles) {
printf(" %s\n", df.toString().c_str());
}
printf(" Deltas: (%d)", chunk.newDeltas.size());
if (chunk.newDeltas.size() > 0) {
printf(" with version [%lld - %lld]",
chunk.newDeltas[0].v,
chunk.newDeltas[chunk.newDeltas.size() - 1].v);
chunk.newDeltas[0].version,
chunk.newDeltas[chunk.newDeltas.size() - 1].version);
}
printf(" IncludedVersion: %lld\n", chunk.includedVersion);
printf("\n\n");
}
state PromiseStream<RangeResult> results;