diff --git a/fdbclient/ServerKnobs.cpp b/fdbclient/ServerKnobs.cpp index 6d74c2f67a..3fe97059b2 100644 --- a/fdbclient/ServerKnobs.cpp +++ b/fdbclient/ServerKnobs.cpp @@ -744,6 +744,13 @@ void ServerKnobs::initialize(Randomize randomize, ClientKnobs* clientKnobs, IsSi init( LATENCY_SAMPLE_SIZE, 100000 ); init( LATENCY_METRICS_LOGGING_INTERVAL, 60.0 ); + // Blob granlues + init( BG_URL, "" ); + init( BG_BUCKET, "" ); + init( BG_SNAPSHOT_FILE_TARGET_BYTES, 10000000 ); + init( BG_DELTA_BYTES_BEFORE_COMPACT, BG_SNAPSHOT_FILE_TARGET_BYTES/2 ); + init( BG_DELTA_FILE_TARGET_BYTES, BG_DELTA_BYTES_BEFORE_COMPACT/10 ); + // clang-format on if (clientKnobs) { diff --git a/fdbclient/ServerKnobs.h b/fdbclient/ServerKnobs.h index 79600d49bb..2c344567f5 100644 --- a/fdbclient/ServerKnobs.h +++ b/fdbclient/ServerKnobs.h @@ -692,6 +692,15 @@ public: int LATENCY_SAMPLE_SIZE; double LATENCY_METRICS_LOGGING_INTERVAL; + // blob granule stuff + // TODO better place to put this or wire up blob config? + + std::string BG_URL; + std::string BG_BUCKET; + int BG_SNAPSHOT_FILE_TARGET_BYTES; + int BG_DELTA_FILE_TARGET_BYTES; + int BG_DELTA_BYTES_BEFORE_COMPACT; + ServerKnobs(Randomize, ClientKnobs*, IsSimulated); void initialize(Randomize, ClientKnobs*, IsSimulated); }; diff --git a/fdbclient/SystemData.cpp b/fdbclient/SystemData.cpp index 83afbc0053..7cace4ca21 100644 --- a/fdbclient/SystemData.cpp +++ b/fdbclient/SystemData.cpp @@ -1039,6 +1039,11 @@ const KeyRangeRef configClassKeys("\xff\xff/configClasses/"_sr, "\xff\xff/config const KeyRef blobRangeChangeKey = LiteralStringRef("\xff\x02/blobRangeChange"); const KeyRangeRef blobRangeKeys(LiteralStringRef("\xff/blobRange/"), LiteralStringRef("\xff/blobRange0")); +// TODO need something for tracking "current" granules + +// blob range file data +const KeyRangeRef blobGranuleKeys(LiteralStringRef("\xff/blobGranules/"), LiteralStringRef("\xff/blobGranules0")); + // for tests void testSSISerdes(StorageServerInterface const& ssi, bool useFB) { printf("ssi=\nid=%s\nlocality=%s\nisTss=%s\ntssId=%s\naddress=%s\ngetValue=%s\n\n\n", diff --git a/fdbclient/SystemData.h b/fdbclient/SystemData.h index e14373cdbe..883797990f 100644 --- a/fdbclient/SystemData.h +++ b/fdbclient/SystemData.h @@ -505,6 +505,11 @@ extern const KeyRangeRef configClassKeys; extern const KeyRef blobRangeChangeKey; extern const KeyRangeRef blobRangeKeys; +// blob granule keys + +// \xff/blobGranule/(startKey, endKey, {snapshot,delta}, version) = [[filename]] +extern const KeyRangeRef blobGranuleKeys; + #pragma clang diagnostic pop #endif diff --git a/fdbserver/Ratekeeper.actor.cpp b/fdbserver/Ratekeeper.actor.cpp index bf1daa192e..d42d2ae222 100644 --- a/fdbserver/Ratekeeper.actor.cpp +++ b/fdbserver/Ratekeeper.actor.cpp @@ -24,6 +24,9 @@ #include "fdbrpc/Smoother.h" #include "fdbrpc/simulator.h" #include "fdbclient/ReadYourWrites.h" +#include "fdbclient/Tuple.h" // TODO REMOVE +#include "fdbclient/S3BlobStore.h" // TODO REMOVE +#include "fdbclient/AsyncFileS3BlobStore.actor.h" // TODO REMOVE #include "fdbclient/TagThrottle.h" #include "fdbserver/Knobs.h" #include "fdbserver/DataDistribution.actor.h" @@ -1408,8 +1411,144 @@ ACTOR Future configurationMonitor(RatekeeperData* self) { } } +// TODO MOVE +// TODO is there a "proper" way to pick file identifiers? +struct GranuleSnapshot : VectorRef { + + constexpr static FileIdentifier file_identifier = 4268040; + + template + void serialize(Ar& ar) { + serializer(ar, ((VectorRef&)*this)); + } +}; + +/* +struct MutationRefAndVersion { + constexpr static FileIdentifier file_identifier = 4268041; + MutationRef m; + Version v; + + template + void serialize(Ar& ar) { + serializer(ar, m, v); + } +}; + +struct GranuleDeltas : VectorRef { + constexpr static FileIdentifier file_identifier = 4268042; + + template + void serialize(Ar& ar) { + serializer(ar, ((VectorRef&)*this)); + } +}; +*/ + +// TODO might need to use IBackupFile instead of blob store interface to support non-s3 things like azure +ACTOR Future blobWorker(RatekeeperData* self, + Reference bstore, + std::string bucket, + Key startKey, + Key endKey) { + printf("Blob worker starting for [%s - %s) for bucket %s\n", + startKey.printable().c_str(), + endKey.printable().c_str(), + bucket.c_str()); + state Tuple keyRangeTuple; + keyRangeTuple.append(startKey).append(endKey); + + // just periodically dump range to file for now + loop { + wait(delay(60.0)); + printf("Blob worker dumping range [%s - %s)\n", startKey.printable().c_str(), endKey.printable().c_str()); + + state Reference tr = makeReference(self->db); + tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS); + + loop { + state std::string fname = ""; + try { + state Version readVersion = wait(tr->getReadVersion()); + fname = deterministicRandom()->randomUniqueID().toString() + "_T" + + std::to_string((uint64_t)(1000.0 * now())) + "_V" + std::to_string(readVersion) + ".snapshot"; + + // TODO some sort of directory structure would be useful? + state Arena arena; + state GranuleSnapshot allRows; + + state Key beginKey = startKey; + loop { + // TODO knob for limit? + RangeResult res = wait(tr->getRange(KeyRangeRef(beginKey, endKey), 1000)); + printf("blob worker read %d%s rows\n", res.size(), res.more ? "+" : ""); + arena.dependsOn(res.arena()); + allRows.append(arena, res.begin(), res.size()); + if (res.more) { + beginKey = keyAfter(res.back().key); + } else { + break; + } + } + + printf("Blob worker read %d rows from fdb\n", allRows.size()); + // TODO is this easy to read as a flatbuffer from reader? Need to be sure about this data format + Value serialized = ObjectWriter::toValue(allRows, Unversioned()); + + printf("blob worker writing %d bytes\n", serialized.size()); + + // write to s3 using multi part upload + state Reference objectFile = + makeReference(bstore, bucket, fname); + wait(objectFile->write(serialized.begin(), serialized.size(), 0)); + wait(objectFile->sync()); + + printf("blob worker wrote to s3 file %s\n", fname.c_str()); + + // TODO could move this into separate txn to avoid the timeout, it'll need to be separate later anyway + // object uploaded successfully, save it to system key space (TODO later - and memory file history) + // TODO add conflict range for writes? + Tuple snapshotFileKey = keyRangeTuple; + snapshotFileKey.append(LiteralStringRef("snapshot")).append(readVersion); + tr->set(snapshotFileKey.getDataAsStandalone().withPrefix(blobGranuleKeys.begin), fname); + wait(tr->commit()); + printf("Blob worker committed new snapshot file for range\n"); + break; + } catch (Error& e) { + // TODO REMOVE + printf("dump range txn got error %s\n", e.name()); + if (fname != "") { + // TODO delete unsuccessfully written file + bstore->deleteObject(bucket, fname); + printf("deleting s3 object %s\n", fname.c_str()); + } + wait(tr->onError(e)); + } + } + } +} + // TODO MOVE ELSEWHERE ACTOR Future blobManagerPoc(RatekeeperData* self) { + state Future currentWorker; + state Reference bstore; + state std::string bucket = SERVER_KNOBS->BG_BUCKET; + + printf("Initializing blob manager s3 stuff\n"); + try { + printf("constructing s3blobstoreendpoint from %s\n", SERVER_KNOBS->BG_URL.c_str()); + bstore = S3BlobStoreEndpoint::fromString(SERVER_KNOBS->BG_URL); + printf("checking if bucket %s exists\n", bucket.c_str()); + bool bExists = wait(bstore->bucketExists(bucket)); + if (!bExists) { + printf("Bucket %s does not exist!\n", bucket.c_str()); + return Void(); + } + } catch (Error& e) { + printf("Blob manager got s3 init error %s\n", e.name()); + return Void(); + } + loop { state Reference tr = makeReference(self->db); @@ -1433,6 +1572,7 @@ ACTOR Future blobManagerPoc(RatekeeperData* self) { results[i].key.printable().c_str(), results[i + 1].key.printable().c_str()); foundRange = true; + currentWorker = blobWorker(self, bstore, bucket, results[i].key, results[i + 1].key); } } @@ -1470,7 +1610,14 @@ ACTOR Future ratekeeper(RatekeeperInterface rkInterf, ReferenceTAG_MEASUREMENT_INTERVAL)); // TODO MOVE eventually - self.addActor.send(blobManagerPoc(&self)); + if (SERVER_KNOBS->BG_URL == "" || SERVER_KNOBS->BG_BUCKET == "") { + printf("not starting blob manager poc, no url/bucket configured\n"); + } else { + printf("Starting blob manager with url=%s and bucket=%s\n", + SERVER_KNOBS->BG_URL.c_str(), + SERVER_KNOBS->BG_BUCKET.c_str()); + self.addActor.send(blobManagerPoc(&self)); + } TraceEvent("RkTLogQueueSizeParameters", rkInterf.id()) .detail("Target", SERVER_KNOBS->TARGET_BYTES_PER_TLOG)