Made subsequent snapshots read from blob files instead of FDB
This commit is contained in:
parent
2ae447eaaa
commit
42a781016e
|
@ -19,6 +19,7 @@
|
|||
*/
|
||||
|
||||
#include "fdbclient/DatabaseContext.h"
|
||||
#include "fdbclient/BlobGranuleReader.actor.h"
|
||||
#include "fdbclient/BlobWorkerInterface.h"
|
||||
#include "fdbclient/NativeAPI.actor.h"
|
||||
#include "fdbclient/Tuple.h"
|
||||
|
@ -27,6 +28,7 @@
|
|||
#include "fdbserver/BlobWorker.actor.h"
|
||||
#include "fdbserver/Knobs.h"
|
||||
#include "fdbserver/WaitFailure.h"
|
||||
#include "flow/Arena.h"
|
||||
#include "flow/actorcompiler.h" // has to be last include
|
||||
|
||||
// TODO might need to use IBackupFile instead of blob store interface to support non-s3 things like azure?
|
||||
|
@ -41,7 +43,6 @@ struct GranuleMetadata : NonCopyable, ReferenceCounted<GranuleMetadata> {
|
|||
|
||||
KeyRange keyRange;
|
||||
Future<Void> fileUpdaterFuture;
|
||||
PromiseStream<Version> snapshotVersions;
|
||||
|
||||
// FIXME: right now there is a dependency because this contains both the actual file/delta data as well as the
|
||||
// metadata (worker futures), so removing this reference from the map doesn't actually cancel the workers. It'd be
|
||||
|
@ -118,7 +119,93 @@ ACTOR Future<std::pair<Version, std::string>> writeDeltaFile(BlobWorkerData* bwD
|
|||
}
|
||||
}
|
||||
|
||||
ACTOR Future<std::pair<Version, std::string>> dumpSnapshotFromFDB(BlobWorkerData* bwData, KeyRange keyRange) {
|
||||
// Collects every batch of rows sent on `rows` until the stream ends with end_of_stream, serializes them as a single
// GranuleSnapshot, uploads the result to the blob store under a freshly generated file name, and then records that
// file name in the system key space under blobGranuleFileKeys, keyed by
// (keyRange.begin, keyRange.end, "snapshot", version).
//
// Returns the name of the uploaded snapshot file.
// Any error received on `rows` other than end_of_stream is rethrown. If the metadata transaction fails with an error
// that onError() does not retry, a delete of the uploaded object is issued and the error is rethrown.
ACTOR Future<std::string> writeSnapshot(BlobWorkerData* bwData,
                                        KeyRange keyRange,
                                        Version version,
                                        PromiseStream<RangeResult> rows) {
    // TODO some sort of directory structure would be useful maybe?
    state std::string fname = deterministicRandom()->randomUniqueID().toString() + "_T" +
                              std::to_string((uint64_t)(1000.0 * now())) + "_V" + std::to_string(version) + ".snapshot";
    // `arena` keeps the memory of every received batch alive for as long as `snapshot` references it.
    state Arena arena;
    state GranuleSnapshot snapshot;

    // Drain the stream; the producer signals completion by sending end_of_stream as an error.
    loop {
        try {
            RangeResult res = waitNext(rows.getFuture());
            arena.dependsOn(res.arena());
            snapshot.append(arena, res.begin(), res.size());
        } catch (Error& e) {
            if (e.code() == error_code_end_of_stream) {
                break;
            }
            throw e;
        }
    }

    printf("Granule [%s - %s) read %d snapshot rows\n",
           keyRange.begin.printable().c_str(),
           keyRange.end.printable().c_str(),
           snapshot.size());
    if (snapshot.size() < 10) {
        for (auto& row : snapshot) {
            printf("  %s=%s\n", row.key.printable().c_str(), row.value.printable().c_str());
        }
    }

    // TODO REMOVE sanity check!
    // Rows must arrive in strictly increasing key order; log the offending pair before asserting.
    for (int i = 0; i < snapshot.size() - 1; i++) {
        if (snapshot[i].key >= snapshot[i + 1].key) {
            printf("SORT ORDER VIOLATION IN SNAPSHOT FILE: %s, %s\n",
                   snapshot[i].key.printable().c_str(),
                   snapshot[i + 1].key.printable().c_str());
        }
        ASSERT(snapshot[i].key < snapshot[i + 1].key);
    }

    // TODO is this easy to read as a flatbuffer from reader? Need to be sure about this data format
    state Value serialized = ObjectWriter::toValue(snapshot, Unversioned());

    // write to s3 using multi part upload
    state Reference<AsyncFileS3BlobStoreWrite> objectFile =
        makeReference<AsyncFileS3BlobStoreWrite>(bwData->bstore, bwData->bucket, fname);
    wait(objectFile->write(serialized.begin(), serialized.size(), 0));
    wait(objectFile->sync());

    // object uploaded successfully, save it to system key space
    // TODO add conflict range for writes?
    state Tuple snapshotFileKey;
    snapshotFileKey.append(keyRange.begin).append(keyRange.end);
    snapshotFileKey.append(LiteralStringRef("snapshot")).append(version);

    state Reference<ReadYourWritesTransaction> tr = makeReference<ReadYourWritesTransaction>(bwData->db);
    try {
        loop {
            // Transaction options are cleared when onError() resets the transaction, so ACCESS_SYSTEM_KEYS has to be
            // re-applied on every attempt; otherwise a retried attempt writes the system key without it.
            tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
            try {
                tr->set(snapshotFileKey.getDataAsStandalone().withPrefix(blobGranuleFileKeys.begin), fname);
                wait(tr->commit());
                break;
            } catch (Error& e) {
                wait(tr->onError(e));
            }
        }
    } catch (Error& e) {
        // if transaction throws non-retryable error, delete s3 file before exiting
        printf("deleting s3 object %s after error %s\n", fname.c_str(), e.name());
        // NOTE(review): the result of deleteObject is discarded here - if it returns a Future, confirm the delete is
        // not cancelled as soon as that future goes out of scope.
        bwData->bstore->deleteObject(bwData->bucket, fname);
        throw e;
    }

    printf("Granule [%s - %s) committed new snapshot file %s with %d bytes\n\n",
           keyRange.begin.printable().c_str(),
           keyRange.end.printable().c_str(),
           fname.c_str(),
           serialized.size());

    return fname;
}
|
||||
|
||||
ACTOR Future<std::pair<Version, std::string>> dumpInitialSnapshotFromFDB(BlobWorkerData* bwData, KeyRange keyRange) {
|
||||
printf("Dumping snapshot from FDB for [%s - %s)\n",
|
||||
keyRange.begin.printable().c_str(),
|
||||
keyRange.end.printable().c_str());
|
||||
|
@ -126,92 +213,185 @@ ACTOR Future<std::pair<Version, std::string>> dumpSnapshotFromFDB(BlobWorkerData
|
|||
tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
|
||||
|
||||
loop {
|
||||
state std::string fname = "";
|
||||
try {
|
||||
state Version readVersion = wait(tr->getReadVersion());
|
||||
fname = deterministicRandom()->randomUniqueID().toString() + "_T" +
|
||||
std::to_string((uint64_t)(1000.0 * now())) + "_V" + std::to_string(readVersion) + ".snapshot";
|
||||
state PromiseStream<RangeResult> rowsStream;
|
||||
state Future<std::string> snapshotWriter = writeSnapshot(bwData, keyRange, readVersion, rowsStream);
|
||||
|
||||
// TODO some sort of directory structure would be useful?
|
||||
state Arena arena;
|
||||
state GranuleSnapshot allRows;
|
||||
|
||||
// TODO would be fairly easy to change this to a promise stream, and optionally build from blobGranuleReader
|
||||
// instead
|
||||
state Key beginKey = keyRange.begin;
|
||||
loop {
|
||||
// TODO knob for limit?
|
||||
RangeResult res = wait(tr->getRange(KeyRangeRef(beginKey, keyRange.end), 1000));
|
||||
/*printf("granule [%s - %s) read %d%s rows\n",
|
||||
keyRange.begin.printable().c_str(),
|
||||
keyRange.end.printable().c_str(),
|
||||
res.size(),
|
||||
res.more ? "+" : "");*/
|
||||
arena.dependsOn(res.arena());
|
||||
allRows.append(arena, res.begin(), res.size());
|
||||
rowsStream.send(res);
|
||||
if (res.more) {
|
||||
beginKey = keyAfter(res.back().key);
|
||||
} else {
|
||||
rowsStream.sendError(end_of_stream());
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
printf("Granule [%s - %s) read %d snapshot rows from fdb\n",
|
||||
keyRange.begin.printable().c_str(),
|
||||
keyRange.end.printable().c_str(),
|
||||
allRows.size());
|
||||
if (allRows.size() < 10) {
|
||||
for (auto& row : allRows) {
|
||||
printf(" %s=%s\n", row.key.printable().c_str(), row.value.printable().c_str());
|
||||
}
|
||||
}
|
||||
// TODO REMOVE sanity check!
|
||||
|
||||
for (int i = 0; i < allRows.size() - 1; i++) {
|
||||
if (allRows[i].key >= allRows[i + 1].key) {
|
||||
printf("SORT ORDER VIOLATION IN SNAPSHOT FILE: %s, %s\n",
|
||||
allRows[i].key.printable().c_str(),
|
||||
allRows[i + 1].key.printable().c_str());
|
||||
}
|
||||
ASSERT(allRows[i].key < allRows[i + 1].key);
|
||||
}
|
||||
|
||||
// TODO is this easy to read as a flatbuffer from reader? Need to be sure about this data format
|
||||
state Value serialized = ObjectWriter::toValue(allRows, Unversioned());
|
||||
|
||||
// write to s3 using multi part upload
|
||||
state Reference<AsyncFileS3BlobStoreWrite> objectFile =
|
||||
makeReference<AsyncFileS3BlobStoreWrite>(bwData->bstore, bwData->bucket, fname);
|
||||
wait(objectFile->write(serialized.begin(), serialized.size(), 0));
|
||||
wait(objectFile->sync());
|
||||
|
||||
// TODO could move this into separate txn to avoid the timeout, it'll need to be separate later anyway
|
||||
// object uploaded successfully, save it to system key space (TODO later - and memory file history)
|
||||
// TODO add conflict range for writes?
|
||||
Tuple snapshotFileKey;
|
||||
snapshotFileKey.append(keyRange.begin).append(keyRange.end);
|
||||
snapshotFileKey.append(LiteralStringRef("snapshot")).append(readVersion);
|
||||
tr->set(snapshotFileKey.getDataAsStandalone().withPrefix(blobGranuleFileKeys.begin), fname);
|
||||
wait(tr->commit());
|
||||
printf("Granule [%s - %s) committed new snapshot file %s with %d bytes\n\n",
|
||||
keyRange.begin.printable().c_str(),
|
||||
keyRange.end.printable().c_str(),
|
||||
fname.c_str(),
|
||||
serialized.size());
|
||||
return std::pair<Version, std::string>(readVersion, fname);
|
||||
std::string fname = wait(snapshotWriter);
|
||||
return std::pair(readVersion, fname);
|
||||
} catch (Error& e) {
|
||||
// TODO REMOVE
|
||||
printf("dump range txn got error %s\n", e.name());
|
||||
if (fname != "") {
|
||||
// TODO delete unsuccessfully written file
|
||||
bwData->bstore->deleteObject(bwData->bucket, fname);
|
||||
printf("deleting s3 object %s\n", fname.c_str());
|
||||
}
|
||||
printf("Dumping snapshot from FDB for [%s - %s) got error %s\n",
|
||||
keyRange.begin.printable().c_str(),
|
||||
keyRange.end.printable().c_str(),
|
||||
e.name());
|
||||
wait(tr->onError(e));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Produces a new snapshot file for the granule covering `keyRange` from blob data instead of from FDB: it reads the
// granule's most recent snapshot file together with every delta file newer than that snapshot via readBlobGranule,
// and streams the result into writeSnapshot.
//
// Returns (version, file name) of the new snapshot, where version is the version of the granule's last delta file.
// Asserts that the granule has at least one snapshot file and one delta file, and no entries in currentDeltas.
// Errors are logged and rethrown; nothing is retried yet.
ACTOR Future<std::pair<Version, std::string>> compactFromBlob(BlobWorkerData* bwData, KeyRange keyRange) {
    printf("Compacting snapshot from blob for [%s - %s)\n",
           keyRange.begin.printable().c_str(),
           keyRange.end.printable().c_str());

    // Only used before the first wait(), so it does not need to be a state variable.
    Reference<GranuleMetadata> granule = bwData->granuleMetadata.rangeContaining(keyRange.begin).value();
    ASSERT(granule->keyRange == keyRange);

    ASSERT(!granule->snapshotFiles.empty());
    ASSERT(!granule->deltaFiles.empty());
    ASSERT(granule->currentDeltas.empty());
    state Version version = granule->deltaFiles.back().first;

    state Arena filenameArena;
    state BlobGranuleChunk chunk;

    state Version snapshotVersion = granule->snapshotFiles.back().first;
    chunk.snapshotFileName = StringRef(filenameArena, granule->snapshotFiles.back().second);

    // Walk backwards from the end to find where the trailing run of delta files newer than the snapshot begins.
    int firstNewDelta = granule->deltaFiles.size();
    while (firstNewDelta > 0 && granule->deltaFiles[firstNewDelta - 1].first > snapshotVersion) {
        firstNewDelta--;
    }
    // Copy the names of those delta files into the chunk, oldest first.
    for (int idx = firstNewDelta; idx < granule->deltaFiles.size(); idx++) {
        chunk.deltaFileNames.push_back_deep(filenameArena, granule->deltaFiles[idx].second);
    }

    printf("Re-snapshotting [%s - %s) @ %lld\n",
           keyRange.begin.printable().c_str(),
           keyRange.end.printable().c_str(),
           version);

    printf("  SnapshotFile:\n    %s\n", chunk.snapshotFileName.toString().c_str());
    printf("  DeltaFiles:\n");
    for (auto& deltaName : chunk.deltaFileNames) {
        printf("    %s\n", deltaName.toString().c_str());
    }

    loop {
        try {
            // Start the writer first so it is already draining the stream when the merged rows are sent.
            state PromiseStream<RangeResult> rowsStream;
            state Future<std::string> snapshotWriter = writeSnapshot(bwData, keyRange, version, rowsStream);

            RangeResult mergedRows = wait(readBlobGranule(chunk, keyRange, version, bwData->bstore, bwData->bucket));
            // The whole granule is delivered as one batch, followed by the end-of-stream marker.
            rowsStream.send(std::move(mergedRows));
            rowsStream.sendError(end_of_stream());

            std::string snapshotName = wait(snapshotWriter);
            return std::pair(version, snapshotName);
        } catch (Error& e) {
            // TODO better error handling eventually - should retry unless the error is because another worker took
            // over the range
            printf("Compacting snapshot from blob for [%s - %s) got error %s\n",
                   keyRange.begin.printable().c_str(),
                   keyRange.end.printable().c_str(),
                   e.name());
            throw e;
        }
    }
}
|
||||
|
||||
/*ACTOR Future<std::pair<Version, std::string>> dumpSnapshotFromFDB(BlobWorkerData* bwData, KeyRange keyRange) {
|
||||
printf("Dumping snapshot from FDB for [%s - %s)\n",
|
||||
keyRange.begin.printable().c_str(),
|
||||
keyRange.end.printable().c_str());
|
||||
state Reference<ReadYourWritesTransaction> tr = makeReference<ReadYourWritesTransaction>(bwData->db);
|
||||
tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
|
||||
|
||||
loop {
|
||||
state std::string fname = "";
|
||||
try {
|
||||
state Version readVersion = wait(tr->getReadVersion());
|
||||
fname = deterministicRandom()->randomUniqueID().toString() + "_T" +
|
||||
std::to_string((uint64_t)(1000.0 * now())) + "_V" + std::to_string(readVersion) + ".snapshot";
|
||||
|
||||
// TODO some sort of directory structure would be useful?
|
||||
state Arena arena;
|
||||
state GranuleSnapshot allRows;
|
||||
|
||||
// TODO would be fairly easy to change this to a promise stream, and optionally build from blobGranuleReader
|
||||
// instead
|
||||
state Key beginKey = keyRange.begin;
|
||||
loop {
|
||||
// TODO knob for limit?
|
||||
RangeResult res = wait(tr->getRange(KeyRangeRef(beginKey, keyRange.end), 1000));
|
||||
arena.dependsOn(res.arena());
|
||||
allRows.append(arena, res.begin(), res.size());
|
||||
if (res.more) {
|
||||
beginKey = keyAfter(res.back().key);
|
||||
} else {
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
printf("Granule [%s - %s) read %d snapshot rows from fdb\n",
|
||||
keyRange.begin.printable().c_str(),
|
||||
keyRange.end.printable().c_str(),
|
||||
allRows.size());
|
||||
if (allRows.size() < 10) {
|
||||
for (auto& row : allRows) {
|
||||
printf(" %s=%s\n", row.key.printable().c_str(), row.value.printable().c_str());
|
||||
}
|
||||
}
|
||||
// TODO REMOVE sanity check!
|
||||
|
||||
for (int i = 0; i < allRows.size() - 1; i++) {
|
||||
if (allRows[i].key >= allRows[i + 1].key) {
|
||||
printf("SORT ORDER VIOLATION IN SNAPSHOT FILE: %s, %s\n",
|
||||
allRows[i].key.printable().c_str(),
|
||||
allRows[i + 1].key.printable().c_str());
|
||||
}
|
||||
ASSERT(allRows[i].key < allRows[i + 1].key);
|
||||
}
|
||||
|
||||
// TODO is this easy to read as a flatbuffer from reader? Need to be sure about this data format
|
||||
state Value serialized = ObjectWriter::toValue(allRows, Unversioned());
|
||||
|
||||
// write to s3 using multi part upload
|
||||
state Reference<AsyncFileS3BlobStoreWrite> objectFile =
|
||||
makeReference<AsyncFileS3BlobStoreWrite>(bwData->bstore, bwData->bucket, fname);
|
||||
wait(objectFile->write(serialized.begin(), serialized.size(), 0));
|
||||
wait(objectFile->sync());
|
||||
|
||||
// TODO could move this into separate txn to avoid the timeout, it'll need to be separate later anyway
|
||||
// object uploaded successfully, save it to system key space (TODO later - and memory file history)
|
||||
// TODO add conflict range for writes?
|
||||
Tuple snapshotFileKey;
|
||||
snapshotFileKey.append(keyRange.begin).append(keyRange.end);
|
||||
snapshotFileKey.append(LiteralStringRef("snapshot")).append(readVersion);
|
||||
tr->set(snapshotFileKey.getDataAsStandalone().withPrefix(blobGranuleFileKeys.begin), fname);
|
||||
wait(tr->commit());
|
||||
printf("Granule [%s - %s) committed new snapshot file %s with %d bytes\n\n",
|
||||
keyRange.begin.printable().c_str(),
|
||||
keyRange.end.printable().c_str(),
|
||||
fname.c_str(),
|
||||
serialized.size());
|
||||
return std::pair<Version, std::string>(readVersion, fname);
|
||||
} catch (Error& e) {
|
||||
// TODO REMOVE
|
||||
printf("dump range txn got error %s\n", e.name());
|
||||
if (fname != "") {
|
||||
// TODO delete unsuccessfully written file
|
||||
bwData->bstore->deleteObject(bwData->bucket, fname);
|
||||
printf("deleting s3 object %s\n", fname.c_str());
|
||||
}
|
||||
wait(tr->onError(e));
|
||||
}
|
||||
}
|
||||
}*/
|
||||
|
||||
ACTOR Future<std::pair<Key, Version>> createRangeFeed(BlobWorkerData* bwData, KeyRange keyRange) {
|
||||
state Key rangeFeedID = StringRef(deterministicRandom()->randomUniqueID().toString());
|
||||
state Transaction tr(bwData->db);
|
||||
|
@ -245,11 +425,10 @@ ACTOR Future<Void> blobGranuleUpdateFiles(BlobWorkerData* bwData, Reference<Gran
|
|||
metadata->keyRange.end.printable().c_str(),
|
||||
rangeFeedData.second);
|
||||
|
||||
std::pair<Version, std::string> newSnapshotFile = wait(dumpSnapshotFromFDB(bwData, metadata->keyRange));
|
||||
std::pair<Version, std::string> newSnapshotFile = wait(dumpInitialSnapshotFromFDB(bwData, metadata->keyRange));
|
||||
ASSERT(rangeFeedData.second <= newSnapshotFile.first);
|
||||
metadata->snapshotFiles.push_back(newSnapshotFile);
|
||||
metadata->lastWriteVersion = newSnapshotFile.first;
|
||||
metadata->snapshotVersions.send(newSnapshotFile.first);
|
||||
rangeFeedFuture = bwData->db->getRangeFeedStream(
|
||||
rangeFeedStream, rangeFeedData.first, newSnapshotFile.first + 1, maxVersion, metadata->keyRange);
|
||||
|
||||
|
@ -303,15 +482,13 @@ ACTOR Future<Void> blobGranuleUpdateFiles(BlobWorkerData* bwData, Reference<Gran
|
|||
metadata->bytesInNewDeltaFiles);
|
||||
// FIXME: instead of just doing new snapshot, it should offer shard back to blob manager and get
|
||||
// reassigned
|
||||
// FIXME: this should read previous snapshot + delta files instead, unless it knows it's really
|
||||
// small or there was a huge clear or something
|
||||
std::pair<Version, std::string> newSnapshotFile =
|
||||
wait(dumpSnapshotFromFDB(bwData, metadata->keyRange));
|
||||
// TODO: this could read from FDB instead of reading the previous snapshot + delta files if it knew there was
|
||||
// a large range clear at the end or it knew the granule was small, or something
|
||||
std::pair<Version, std::string> newSnapshotFile = wait(compactFromBlob(bwData, metadata->keyRange));
|
||||
|
||||
// add new snapshot file
|
||||
metadata->snapshotFiles.push_back(newSnapshotFile);
|
||||
metadata->lastWriteVersion = newSnapshotFile.first;
|
||||
metadata->snapshotVersions.send(newSnapshotFile.first);
|
||||
|
||||
// reset metadata
|
||||
metadata->bytesInNewDeltaFiles = 0;
|
||||
|
@ -436,7 +613,6 @@ static Reference<GranuleMetadata> constructNewBlobRange(BlobWorkerData* bwData,
|
|||
keyRange.end.printable().c_str());*/
|
||||
Reference<GranuleMetadata> newMetadata = makeReference<GranuleMetadata>();
|
||||
newMetadata->keyRange = keyRange;
|
||||
// newMetadata->rangeFeedFuture = fakeRangeFeed(newMetadata->rangeFeed, newMetadata->snapshotVersions, keyRange);
|
||||
newMetadata->fileUpdaterFuture = blobGranuleUpdateFiles(bwData, newMetadata);
|
||||
|
||||
return newMetadata;
|
||||
|
|
Loading…
Reference in New Issue