POC worker that dumps fdb range to an s3 object
parent a931f70cfb
commit e83b27387a
@@ -744,6 +744,13 @@ void ServerKnobs::initialize(Randomize randomize, ClientKnobs* clientKnobs, IsSi
    init( LATENCY_SAMPLE_SIZE, 100000 );
    init( LATENCY_METRICS_LOGGING_INTERVAL, 60.0 );

    // Blob granules
    init( BG_URL, "" );
    init( BG_BUCKET, "" );
    init( BG_SNAPSHOT_FILE_TARGET_BYTES, 10000000 );
    init( BG_DELTA_BYTES_BEFORE_COMPACT, BG_SNAPSHOT_FILE_TARGET_BYTES/2 );
    init( BG_DELTA_FILE_TARGET_BYTES, BG_DELTA_BYTES_BEFORE_COMPACT/10 );

    // clang-format on

    if (clientKnobs) {
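Both the URL and bucket knobs default to empty, so the POC stays off unless they are overridden, presumably through the usual knob-override paths (for example fdbserver's --knob_bg_url / --knob_bg_bucket arguments or the corresponding knob_* entries in foundationdb.conf); BG_URL is fed to S3BlobStoreEndpoint::fromString below, i.e. the same blobstore:// URL format used for backup destinations. The size knobs are derived from one another: a 10 MB snapshot target, compaction after 5 MB of accumulated deltas, and 0.5 MB delta files.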
@@ -692,6 +692,15 @@ public:
    int LATENCY_SAMPLE_SIZE;
    double LATENCY_METRICS_LOGGING_INTERVAL;

    // blob granule stuff
    // TODO better place to put this or wire up blob config?

    std::string BG_URL;
    std::string BG_BUCKET;
    int BG_SNAPSHOT_FILE_TARGET_BYTES;
    int BG_DELTA_FILE_TARGET_BYTES;
    int BG_DELTA_BYTES_BEFORE_COMPACT;

    ServerKnobs(Randomize, ClientKnobs*, IsSimulated);
    void initialize(Randomize, ClientKnobs*, IsSimulated);
};
@@ -1039,6 +1039,11 @@ const KeyRangeRef configClassKeys("\xff\xff/configClasses/"_sr, "\xff\xff/config
const KeyRef blobRangeChangeKey = LiteralStringRef("\xff\x02/blobRangeChange");
const KeyRangeRef blobRangeKeys(LiteralStringRef("\xff/blobRange/"), LiteralStringRef("\xff/blobRange0"));

// TODO need something for tracking "current" granules

// blob range file data
const KeyRangeRef blobGranuleKeys(LiteralStringRef("\xff/blobGranules/"), LiteralStringRef("\xff/blobGranules0"));

// for tests
void testSSISerdes(StorageServerInterface const& ssi, bool useFB) {
    printf("ssi=\nid=%s\nlocality=%s\nisTss=%s\ntssId=%s\naddress=%s\ngetValue=%s\n\n\n",
@@ -505,6 +505,11 @@ extern const KeyRangeRef configClassKeys;
extern const KeyRef blobRangeChangeKey;
extern const KeyRangeRef blobRangeKeys;

// blob granule keys

// \xff/blobGranule/(startKey, endKey, {snapshot,delta}, version) = [[filename]]
extern const KeyRangeRef blobGranuleKeys;

#pragma clang diagnostic pop

#endif
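As a reader's-eye illustration of that key schema (not part of the commit), here is a minimal sketch of unpacking one of these keys with the same fdbclient Tuple helpers the worker uses below. The field order mirrors the comment above; the function name is hypothetical.

#include "fdbclient/Tuple.h"
#include "fdbclient/SystemData.h"

// Hypothetical helper: unpack a \xff/blobGranules/ key back into its parts.
void printGranuleFileKey(KeyRef key, ValueRef filename) {
    Tuple t = Tuple::unpack(key.removePrefix(blobGranuleKeys.begin));
    Standalone<StringRef> startKey = t.getString(0);
    Standalone<StringRef> endKey = t.getString(1);
    Standalone<StringRef> fileType = t.getString(2); // "snapshot" or "delta"
    int64_t version = t.getInt(3);
    printf("[%s - %s) %s @ %lld -> %s\n",
           startKey.printable().c_str(),
           endKey.printable().c_str(),
           fileType.toString().c_str(),
           (long long)version,
           filename.printable().c_str());
}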
@@ -24,6 +24,9 @@
#include "fdbrpc/Smoother.h"
#include "fdbrpc/simulator.h"
#include "fdbclient/ReadYourWrites.h"
#include "fdbclient/Tuple.h" // TODO REMOVE
#include "fdbclient/S3BlobStore.h" // TODO REMOVE
#include "fdbclient/AsyncFileS3BlobStore.actor.h" // TODO REMOVE
#include "fdbclient/TagThrottle.h"
#include "fdbserver/Knobs.h"
#include "fdbserver/DataDistribution.actor.h"
@@ -1408,8 +1411,144 @@ ACTOR Future<Void> configurationMonitor(RatekeeperData* self) {
    }
}

// TODO MOVE
// TODO is there a "proper" way to pick file identifiers?
struct GranuleSnapshot : VectorRef<KeyValueRef> {

    constexpr static FileIdentifier file_identifier = 4268040;

    template <class Ar>
    void serialize(Ar& ar) {
        serializer(ar, ((VectorRef<KeyValueRef>&)*this));
    }
};
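GranuleSnapshot above is the entire on-object format for a snapshot: a flatbuffer-serialized vector of key/value pairs. As an aside (not part of the commit), the intended round trip with the ObjectWriter/ObjectReader pair looks roughly like the sketch below; whether the deserialized refs need a longer-lived arena is exactly the data-format TODO flagged in the worker further down.

// Sketch only: serialize a snapshot and decode it again in-process.
Arena arena;
GranuleSnapshot rows;
rows.push_back_deep(arena, KeyValueRef(LiteralStringRef("k"), LiteralStringRef("v")));

Value serialized = ObjectWriter::toValue(rows, Unversioned());

GranuleSnapshot readBack;
ObjectReader reader(serialized.begin(), Unversioned());
reader.deserialize(readBack); // refs stay valid only while 'reader' and 'serialized' are alive
ASSERT(readBack.size() == rows.size());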
/*
struct MutationRefAndVersion {
    constexpr static FileIdentifier file_identifier = 4268041;
    MutationRef m;
    Version v;

    template <class Ar>
    void serialize(Ar& ar) {
        serializer(ar, m, v);
    }
};

struct GranuleDeltas : VectorRef<MutationRefAndVersion> {
    constexpr static FileIdentifier file_identifier = 4268042;

    template <class Ar>
    void serialize(Ar& ar) {
        serializer(ar, ((VectorRef<MutationRefAndVersion>&)*this));
    }
};
*/
// TODO might need to use IBackupFile instead of blob store interface to support non-s3 things like azure
ACTOR Future<Void> blobWorker(RatekeeperData* self,
                              Reference<S3BlobStoreEndpoint> bstore,
                              std::string bucket,
                              Key startKey,
                              Key endKey) {
    printf("Blob worker starting for [%s - %s) for bucket %s\n",
           startKey.printable().c_str(),
           endKey.printable().c_str(),
           bucket.c_str());
    state Tuple keyRangeTuple;
    keyRangeTuple.append(startKey).append(endKey);

    // just periodically dump range to file for now
    loop {
        wait(delay(60.0));
        printf("Blob worker dumping range [%s - %s)\n", startKey.printable().c_str(), endKey.printable().c_str());

        state Reference<ReadYourWritesTransaction> tr = makeReference<ReadYourWritesTransaction>(self->db);
        tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);

        loop {
            state std::string fname = "";
            try {
                state Version readVersion = wait(tr->getReadVersion());
                fname = deterministicRandom()->randomUniqueID().toString() + "_T" +
                        std::to_string((uint64_t)(1000.0 * now())) + "_V" + std::to_string(readVersion) + ".snapshot";

                // TODO some sort of directory structure would be useful?
                state Arena arena;
                state GranuleSnapshot allRows;

                state Key beginKey = startKey;
                loop {
                    // TODO knob for limit?
                    RangeResult res = wait(tr->getRange(KeyRangeRef(beginKey, endKey), 1000));
                    printf("blob worker read %d%s rows\n", res.size(), res.more ? "+" : "");
                    arena.dependsOn(res.arena());
                    allRows.append(arena, res.begin(), res.size());
                    if (res.more) {
                        beginKey = keyAfter(res.back().key);
                    } else {
                        break;
                    }
                }

                printf("Blob worker read %d rows from fdb\n", allRows.size());
                // TODO is this easy to read as a flatbuffer from reader? Need to be sure about this data format
                Value serialized = ObjectWriter::toValue(allRows, Unversioned());

                printf("blob worker writing %d bytes\n", serialized.size());

                // write to s3 using multi part upload
                state Reference<AsyncFileS3BlobStoreWrite> objectFile =
                    makeReference<AsyncFileS3BlobStoreWrite>(bstore, bucket, fname);
                wait(objectFile->write(serialized.begin(), serialized.size(), 0));
                wait(objectFile->sync());

                printf("blob worker wrote to s3 file %s\n", fname.c_str());

                // TODO could move this into separate txn to avoid the timeout, it'll need to be separate later anyway
                // object uploaded successfully, save it to system key space (TODO later - and memory file history)
                // TODO add conflict range for writes?
                Tuple snapshotFileKey = keyRangeTuple;
                snapshotFileKey.append(LiteralStringRef("snapshot")).append(readVersion);
                tr->set(snapshotFileKey.getDataAsStandalone().withPrefix(blobGranuleKeys.begin), fname);
                wait(tr->commit());
                printf("Blob worker committed new snapshot file for range\n");
                break;
            } catch (Error& e) {
                // TODO REMOVE
                printf("dump range txn got error %s\n", e.name());
                if (fname != "") {
                    // TODO delete unsuccessfully written file
                    bstore->deleteObject(bucket, fname);
                    printf("deleting s3 object %s\n", fname.c_str());
                }
                wait(tr->onError(e));
            }
        }
    }
}
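The worker above only writes. For completeness, here is a rough sketch (not in this commit) of a matching read path, pulling the object back with the read-side sibling of the file class used above and decoding it; the helper name is hypothetical and the same flatbuffer/arena lifetime caveat as noted earlier applies.

// Hypothetical read path, assuming AsyncFileS3BlobStoreRead from fdbclient/AsyncFileS3BlobStore.actor.h.
ACTOR Future<Void> dumpSnapshotObject(Reference<S3BlobStoreEndpoint> bstore, std::string bucket, std::string fname) {
    // Fetch the whole object into memory.
    state Reference<AsyncFileS3BlobStoreRead> objectFile =
        makeReference<AsyncFileS3BlobStoreRead>(bstore, bucket, fname);
    state int64_t size = wait(objectFile->size());
    state Standalone<StringRef> data = makeString(size);
    int readBytes = wait(objectFile->read(mutateString(data), data.size(), 0));
    ASSERT(readBytes == data.size());

    // Decode while 'data' and 'reader' are still alive.
    GranuleSnapshot rows;
    ObjectReader reader(data.begin(), Unversioned());
    reader.deserialize(rows);
    printf("read %d rows back from %s\n", rows.size(), fname.c_str());
    return Void();
}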
// TODO MOVE ELSEWHERE
ACTOR Future<Void> blobManagerPoc(RatekeeperData* self) {
    state Future<Void> currentWorker;
    state Reference<S3BlobStoreEndpoint> bstore;
    state std::string bucket = SERVER_KNOBS->BG_BUCKET;

    printf("Initializing blob manager s3 stuff\n");
    try {
        printf("constructing s3blobstoreendpoint from %s\n", SERVER_KNOBS->BG_URL.c_str());
        bstore = S3BlobStoreEndpoint::fromString(SERVER_KNOBS->BG_URL);
        printf("checking if bucket %s exists\n", bucket.c_str());
        bool bExists = wait(bstore->bucketExists(bucket));
        if (!bExists) {
            printf("Bucket %s does not exist!\n", bucket.c_str());
            return Void();
        }
    } catch (Error& e) {
        printf("Blob manager got s3 init error %s\n", e.name());
        return Void();
    }

    loop {
        state Reference<ReadYourWritesTransaction> tr = makeReference<ReadYourWritesTransaction>(self->db);
@@ -1433,6 +1572,7 @@ ACTOR Future<Void> blobManagerPoc(RatekeeperData* self) {
                       results[i].key.printable().c_str(),
                       results[i + 1].key.printable().c_str());
                foundRange = true;
                currentWorker = blobWorker(self, bstore, bucket, results[i].key, results[i + 1].key);
            }
        }
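The fragment above is the manager's scan of blobRangeKeys: consecutive boundary keys under \xff/blobRange/ are paired up and a found pair becomes a worker's range. Nothing in this commit writes those boundaries yet, so as a purely hypothetical illustration, seeding one range for the POC might look like the transaction below (the value written is arbitrary, since the scan only looks at keys; whether blobRangeChangeKey also needs to be touched is not shown in this diff).

// Hypothetical seeding helper, not part of the commit.
ACTOR Future<Void> addBlobRangePoc(Database db, Key startKey, Key endKey) {
    state Reference<ReadYourWritesTransaction> tr = makeReference<ReadYourWritesTransaction>(db);
    loop {
        try {
            tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
            // Two boundary keys under \xff/blobRange/ that the manager will pair up.
            tr->set(startKey.withPrefix(blobRangeKeys.begin), LiteralStringRef(""));
            tr->set(endKey.withPrefix(blobRangeKeys.begin), LiteralStringRef(""));
            wait(tr->commit());
            return Void();
        } catch (Error& e) {
            wait(tr->onError(e));
        }
    }
}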
@@ -1470,7 +1610,14 @@ ACTOR Future<Void> ratekeeper(RatekeeperInterface rkInterf, Reference<AsyncVar<S
        recurring([selfPtr]() { refreshStorageServerCommitCost(selfPtr); }, SERVER_KNOBS->TAG_MEASUREMENT_INTERVAL));

    // TODO MOVE eventually
    self.addActor.send(blobManagerPoc(&self));
    if (SERVER_KNOBS->BG_URL == "" || SERVER_KNOBS->BG_BUCKET == "") {
        printf("not starting blob manager poc, no url/bucket configured\n");
    } else {
        printf("Starting blob manager with url=%s and bucket=%s\n",
               SERVER_KNOBS->BG_URL.c_str(),
               SERVER_KNOBS->BG_BUCKET.c_str());
        self.addActor.send(blobManagerPoc(&self));
    }

    TraceEvent("RkTLogQueueSizeParameters", rkInterf.id())
        .detail("Target", SERVER_KNOBS->TARGET_BYTES_PER_TLOG)