Blob integration (#6808)

* Fixing leaked stream by explicitly notifying failure before the destructor runs

* better logic to prevent races in change feed fetching

* Found new race that makes assert incorrect

* handle server overloaded in initial read from fdb

* Handling more blob error types in granule retry

* Fixing rollback metadata problem, added better debugging

* Fixing version race when fetching change feed metadata

* Better racing split request handling

* fixing assert

* Handle change feed popped check in the blob worker

* fix: do not use a RYW transaction for a versionstamp because of randomize API version (#6768)

* more merge conflict issues

* Change feed destroy fixes

* Fixing change feed destroy and move race

* Check error condition in BG file req

* Using relative endpoints for blob worker interface

* Fixing bug in previous fix

* More destroy and move race fixes

* Don't update empty version on destroy in case it gets rolled back. moved() and removing will take care of ensuring it is not read

* Bug fix (#6796)

* fix: do not use a RYW transaction for a versionstamp because of randomize API version

* fix: if the initialSnapshotVersion was pruned, granule history was incorrect

* added a way to compress null bytes in printable()

* Fixing durability issue with moving and destroying change feeds

* Adding fix to avoid fully deleting files of a granule that child granules still need in order to re-snapshot

* More destroy and move races

* Fixing change feed destroy and pop races

* Renaming bg prune to purge, and adding a C api and unit test for it

* more cleanup

* review comments

* Observability for granule purging

* better handling for change feed not registered

* Fixed purging bugs (#6815)

* fix: do not use a RYW transaction for a versionstamp because of randomize API version

* fix: if the initialSnapshotVersion was pruned, granule history was incorrect

* added a way to compress null bytes in printable()

* fixed a few purging bugs

Co-authored-by: Evan Tschannen <evan.tschannen@snowflake.com>
Authored by Josh Slocum on 2022-04-08 16:15:25 -05:00, committed via GitHub
parent 37054af7e2
commit 6276cebad9
22 changed files with 966 additions and 340 deletions

View File

@ -466,6 +466,27 @@ extern "C" DLLEXPORT FDBFuture* fdb_database_get_server_protocol(FDBDatabase* db
}).extractPtr());
}
extern "C" DLLEXPORT FDBFuture* fdb_database_purge_blob_granules(FDBDatabase* db,
uint8_t const* begin_key_name,
int begin_key_name_length,
uint8_t const* end_key_name,
int end_key_name_length,
int64_t purge_version,
fdb_bool_t force) {
return (FDBFuture*)(DB(db)
->purgeBlobGranules(KeyRangeRef(StringRef(begin_key_name, begin_key_name_length),
StringRef(end_key_name, end_key_name_length)),
purge_version,
force)
.extractPtr());
}
extern "C" DLLEXPORT FDBFuture* fdb_database_wait_purge_granules_complete(FDBDatabase* db,
uint8_t const* purge_key_name,
int purge_key_name_length) {
return (
FDBFuture*)(DB(db)->waitPurgeGranulesComplete(StringRef(purge_key_name, purge_key_name_length)).extractPtr());
}
extern "C" DLLEXPORT fdb_error_t fdb_tenant_create_transaction(FDBTenant* tenant, FDBTransaction** out_transaction) {
CATCH_AND_RETURN(*out_transaction = (FDBTransaction*)TENANT(tenant)->createTransaction().extractPtr(););
}
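
For orientation, a minimal sketch of how a client might drive these two new C API entry points end to end. It assumes an already-opened FDBDatabase* db and a caller-chosen purge_version; fdb_check stands in for the caller's own error handling (it is the helper used in the unit test later in this diff), and the key range and force value are illustrative only.

// Sketch only: request a purge of ["a", "b") at purge_version.
FDBFuture* purgeF = fdb_database_purge_blob_granules(
    db, (const uint8_t*)"a", 1, (const uint8_t*)"b", 1, purge_version, 0 /* force */);
fdb_check(fdb_future_block_until_ready(purgeF));

// The future resolves to the key that tracks this purge request.
const uint8_t* purge_key;
int purge_key_length;
fdb_check(fdb_future_get_key(purgeF, &purge_key, &purge_key_length));

// Purging is asynchronous; waiting on the returned key guarantees completion.
FDBFuture* waitF = fdb_database_wait_purge_granules_complete(db, purge_key, purge_key_length);
fdb_check(fdb_future_block_until_ready(waitF));
fdb_check(fdb_future_get_error(waitF));

fdb_future_destroy(waitF);
fdb_future_destroy(purgeF); // purge_key is owned by purgeF, so destroy it last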

View File

@ -299,6 +299,18 @@ DLLEXPORT WARN_UNUSED_RESULT double fdb_database_get_main_thread_busyness(FDBDat
DLLEXPORT WARN_UNUSED_RESULT FDBFuture* fdb_database_get_server_protocol(FDBDatabase* db, uint64_t expected_version);
DLLEXPORT WARN_UNUSED_RESULT FDBFuture* fdb_database_purge_blob_granules(FDBDatabase* db,
uint8_t const* begin_key_name,
int begin_key_name_length,
uint8_t const* end_key_name,
int end_key_name_length,
int64_t purge_version,
fdb_bool_t force);
DLLEXPORT WARN_UNUSED_RESULT FDBFuture* fdb_database_wait_purge_granules_complete(FDBDatabase* db,
uint8_t const* purge_key_name,
int purge_key_name_length);
DLLEXPORT WARN_UNUSED_RESULT fdb_error_t fdb_tenant_create_transaction(FDBTenant* tenant,
FDBTransaction** out_transaction);

View File

@ -130,6 +130,25 @@ EmptyFuture Database::create_snapshot(FDBDatabase* db,
return EmptyFuture(fdb_database_create_snapshot(db, uid, uid_length, snap_command, snap_command_length));
}
KeyFuture Database::purge_blob_granules(FDBDatabase* db,
std::string_view begin_key,
std::string_view end_key,
int64_t purge_version,
fdb_bool_t force) {
return KeyFuture(fdb_database_purge_blob_granules(db,
(const uint8_t*)begin_key.data(),
begin_key.size(),
(const uint8_t*)end_key.data(),
end_key.size(),
purge_version,
force));
}
EmptyFuture Database::wait_purge_granules_complete(FDBDatabase* db, std::string_view purge_key) {
return EmptyFuture(
fdb_database_wait_purge_granules_complete(db, (const uint8_t*)purge_key.data(), purge_key.size()));
}
// Tenant
Tenant::Tenant(FDBDatabase* db, const uint8_t* name, int name_length) {
if (fdb_error_t err = fdb_database_open_tenant(db, name, name_length, &tenant)) {

View File

@ -97,6 +97,7 @@ public:
private:
friend class Transaction;
friend class Database;
KeyFuture(FDBFuture* f) : Future(f) {}
};
@ -201,6 +202,14 @@ public:
int uid_length,
const uint8_t* snap_command,
int snap_command_length);
static KeyFuture purge_blob_granules(FDBDatabase* db,
std::string_view begin_key,
std::string_view end_key,
int64_t purge_version,
fdb_bool_t force);
static EmptyFuture wait_purge_granules_complete(FDBDatabase* db, std::string_view purge_key);
};
class Tenant final {

View File

@ -2592,7 +2592,6 @@ TEST_CASE("Blob Granule Functions") {
}
// write some data
insert_data(db, create_data({ { "bg1", "a" }, { "bg2", "b" }, { "bg3", "c" } }));
// because wiring up files is non-trivial, just test the calls complete with the expected no_materialize error
@ -2709,6 +2708,42 @@ TEST_CASE("Blob Granule Functions") {
tr.reset();
break;
}
// do a purge + wait at that version to purge everything before originalReadVersion
fdb::KeyFuture purgeKeyFuture =
fdb::Database::purge_blob_granules(db, key("bg"), key("bh"), originalReadVersion, false);
fdb_check(wait_future(purgeKeyFuture));
const uint8_t* purgeKeyData;
int purgeKeyLen;
fdb_check(purgeKeyFuture.get(&purgeKeyData, &purgeKeyLen));
std::string purgeKey((const char*)purgeKeyData, purgeKeyLen);
fdb::EmptyFuture waitPurgeFuture = fdb::Database::wait_purge_granules_complete(db, purgeKey);
fdb_check(wait_future(waitPurgeFuture));
// re-read again at the purge version to make sure it is still valid
while (1) {
fdb_check(tr.set_option(FDB_TR_OPTION_READ_YOUR_WRITES_DISABLE, nullptr, 0));
fdb::KeyValueArrayResult r =
tr.read_blob_granules(key("bg"), key("bh"), 0, originalReadVersion, granuleContext);
fdb_error_t err = r.get(&out_kv, &out_count, &out_more);
if (err && err != 2037 /* blob_granule_not_materialized */) {
fdb::EmptyFuture f2 = tr.on_error(err);
fdb_check(wait_future(f2));
continue;
}
CHECK(err == 2037 /* blob_granule_not_materialized */);
tr.reset();
break;
}
}
int main(int argc, char** argv) {

View File

@ -44,7 +44,18 @@ struct BlobWorkerInterface {
BlobWorkerInterface() {}
explicit BlobWorkerInterface(const struct LocalityData& l, UID id) : locality(l), myId(id) {}
void initEndpoints() {}
void initEndpoints() {
// TODO: specify endpoint priorities?
std::vector<std::pair<FlowReceiver*, TaskPriority>> streams;
streams.push_back(waitFailure.getReceiver());
streams.push_back(blobGranuleFileRequest.getReceiver());
streams.push_back(assignBlobRangeRequest.getReceiver());
streams.push_back(revokeBlobRangeRequest.getReceiver());
streams.push_back(granuleAssignmentsRequest.getReceiver());
streams.push_back(granuleStatusStreamRequest.getReceiver());
streams.push_back(haltBlobWorker.getReceiver());
FlowTransport::transport().addEndpoints(streams);
}
UID id() const { return myId; }
NetworkAddress address() const { return blobGranuleFileRequest.getEndpoint().getPrimaryAddress(); }
NetworkAddress stableAddress() const { return blobGranuleFileRequest.getEndpoint().getStableAddress(); }
@ -54,16 +65,22 @@ struct BlobWorkerInterface {
template <class Archive>
void serialize(Archive& ar) {
serializer(ar,
waitFailure,
blobGranuleFileRequest,
assignBlobRangeRequest,
revokeBlobRangeRequest,
granuleAssignmentsRequest,
granuleStatusStreamRequest,
haltBlobWorker,
locality,
myId);
// use adjusted endpoints
serializer(ar, myId, locality, waitFailure);
if (Archive::isDeserializing) {
blobGranuleFileRequest =
RequestStream<struct BlobGranuleFileRequest>(waitFailure.getEndpoint().getAdjustedEndpoint(1));
assignBlobRangeRequest =
RequestStream<struct AssignBlobRangeRequest>(waitFailure.getEndpoint().getAdjustedEndpoint(2));
revokeBlobRangeRequest =
RequestStream<struct RevokeBlobRangeRequest>(waitFailure.getEndpoint().getAdjustedEndpoint(3));
granuleAssignmentsRequest =
RequestStream<struct GetGranuleAssignmentsRequest>(waitFailure.getEndpoint().getAdjustedEndpoint(4));
granuleStatusStreamRequest =
RequestStream<struct GranuleStatusStreamRequest>(waitFailure.getEndpoint().getAdjustedEndpoint(5));
haltBlobWorker =
RequestStream<struct HaltBlobWorkerRequest>(waitFailure.getEndpoint().getAdjustedEndpoint(6));
}
}
};
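
The serializer above relies on the stream order registered in initEndpoints() matching the indices passed to getAdjustedEndpoint() during deserialization: waitFailure is the base endpoint (index 0), blobGranuleFileRequest is index 1, and so on. As a hedged illustration of the same relative-endpoint convention, a hypothetical two-stream interface might look like the following; ExampleInterface, ExampleRequest, and the member names are invented for illustration.

struct ExampleInterface {
    RequestStream<ReplyPromise<Void>> base;       // index 0: always serialized explicitly
    RequestStream<struct ExampleRequest> doWork;  // index 1: rebuilt from the base endpoint

    void initEndpoints() {
        // The registration order here defines the adjusted-endpoint indices used below.
        std::vector<std::pair<FlowReceiver*, TaskPriority>> streams;
        streams.push_back(base.getReceiver());
        streams.push_back(doWork.getReceiver());
        FlowTransport::transport().addEndpoints(streams);
    }

    template <class Archive>
    void serialize(Archive& ar) {
        serializer(ar, base); // only the base endpoint travels over the wire
        if (Archive::isDeserializing) {
            doWork = RequestStream<struct ExampleRequest>(base.getEndpoint().getAdjustedEndpoint(1));
        }
    }
};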

View File

@ -372,6 +372,9 @@ public:
Future<std::vector<OverlappingChangeFeedEntry>> getOverlappingChangeFeeds(KeyRangeRef ranges, Version minVersion);
Future<Void> popChangeFeedMutations(Key rangeID, Version version);
Future<Key> purgeBlobGranules(KeyRange keyRange, Version purgeVersion, bool force = false);
Future<Void> waitPurgeGranulesComplete(Key purgeKey);
// private:
explicit DatabaseContext(Reference<AsyncVar<Reference<IClusterConnectionRecord>>> connectionRecord,
Reference<AsyncVar<ClientDBInfo>> clientDBInfo,

View File

@ -152,6 +152,11 @@ public:
// Management API, create snapshot
virtual ThreadFuture<Void> createSnapshot(const StringRef& uid, const StringRef& snapshot_command) = 0;
// Purge blob granules API. purgeBlobGranules is asynchronous; calling waitPurgeGranulesComplete afterwards guarantees
// completion.
virtual ThreadFuture<Key> purgeBlobGranules(const KeyRangeRef& keyRange, Version purgeVersion, bool force) = 0;
virtual ThreadFuture<Void> waitPurgeGranulesComplete(const KeyRef& purgeKey) = 0;
// Interface to manage shared state across multiple connections to the same Database
virtual ThreadFuture<DatabaseSharedState*> createSharedState() = 0;
virtual void setSharedState(DatabaseSharedState* p) = 0;

View File

@ -516,6 +516,38 @@ ThreadFuture<ProtocolVersion> DLDatabase::getServerProtocol(Optional<ProtocolVer
});
}
ThreadFuture<Key> DLDatabase::purgeBlobGranules(const KeyRangeRef& keyRange, Version purgeVersion, bool force) {
if (!api->purgeBlobGranules) {
return unsupported_operation();
}
FdbCApi::FDBFuture* f = api->purgeBlobGranules(db,
keyRange.begin.begin(),
keyRange.begin.size(),
keyRange.end.begin(),
keyRange.end.size(),
purgeVersion,
force);
return toThreadFuture<Key>(api, f, [](FdbCApi::FDBFuture* f, FdbCApi* api) {
const uint8_t* key;
int keyLength;
FdbCApi::fdb_error_t error = api->futureGetKey(f, &key, &keyLength);
ASSERT(!error);
// The memory for this is stored in the FDBFuture and is released when the future gets destroyed
return Key(KeyRef(key, keyLength), Arena());
});
}
ThreadFuture<Void> DLDatabase::waitPurgeGranulesComplete(const KeyRef& purgeKey) {
if (!api->waitPurgeGranulesComplete) {
return unsupported_operation();
}
FdbCApi::FDBFuture* f = api->waitPurgeGranulesComplete(db, purgeKey.begin(), purgeKey.size());
return toThreadFuture<Void>(api, f, [](FdbCApi::FDBFuture* f, FdbCApi* api) { return Void(); });
}
// DLApi
// Loads the specified function from a dynamic library
@ -590,6 +622,15 @@ void DLApi::init() {
loadClientFunction(
&api->databaseCreateSnapshot, lib, fdbCPath, "fdb_database_create_snapshot", headerVersion >= 700);
loadClientFunction(
&api->purgeBlobGranules, lib, fdbCPath, "fdb_database_purge_blob_granules", headerVersion >= 710);
loadClientFunction(&api->waitPurgeGranulesComplete,
lib,
fdbCPath,
"fdb_database_wait_purge_granules_complete",
headerVersion >= 710);
loadClientFunction(
&api->tenantCreateTransaction, lib, fdbCPath, "fdb_tenant_create_transaction", headerVersion >= 710);
loadClientFunction(&api->tenantDestroy, lib, fdbCPath, "fdb_tenant_destroy", headerVersion >= 710);
@ -1442,6 +1483,17 @@ double MultiVersionDatabase::getMainThreadBusyness() {
return localClientBusyness;
}
ThreadFuture<Key> MultiVersionDatabase::purgeBlobGranules(const KeyRangeRef& keyRange,
Version purgeVersion,
bool force) {
auto f = dbState->db ? dbState->db->purgeBlobGranules(keyRange, purgeVersion, force) : ThreadFuture<Key>(Never());
return abortableFuture(f, dbState->dbVar->get().onChange);
}
ThreadFuture<Void> MultiVersionDatabase::waitPurgeGranulesComplete(const KeyRef& purgeKey) {
auto f = dbState->db ? dbState->db->waitPurgeGranulesComplete(purgeKey) : ThreadFuture<Void>(Never());
return abortableFuture(f, dbState->dbVar->get().onChange);
}
// Returns the protocol version reported by the coordinator this client is connected to
// If an expected version is given, the future won't return until the protocol version is different than expected
// Note: this will never return if the server is running a protocol from FDB 5.0 or older

View File

@ -156,6 +156,16 @@ struct FdbCApi : public ThreadSafeReferenceCounted<FdbCApi> {
double (*databaseGetMainThreadBusyness)(FDBDatabase* database);
FDBFuture* (*databaseGetServerProtocol)(FDBDatabase* database, uint64_t expectedVersion);
FDBFuture* (*purgeBlobGranules)(FDBDatabase* db,
uint8_t const* begin_key_name,
int begin_key_name_length,
uint8_t const* end_key_name,
int end_key_name_length,
int64_t purge_version,
fdb_bool_t force);
FDBFuture* (*waitPurgeGranulesComplete)(FDBDatabase* db, uint8_t const* purge_key_name, int purge_key_name_length);
// Tenant
fdb_error_t (*tenantCreateTransaction)(FDBTenant* tenant, FDBTransaction** outTransaction);
void (*tenantDestroy)(FDBTenant* tenant);
@ -438,6 +448,9 @@ public:
ThreadFuture<Void> forceRecoveryWithDataLoss(const StringRef& dcid) override;
ThreadFuture<Void> createSnapshot(const StringRef& uid, const StringRef& snapshot_command) override;
ThreadFuture<Key> purgeBlobGranules(const KeyRangeRef& keyRange, Version purgeVersion, bool force) override;
ThreadFuture<Void> waitPurgeGranulesComplete(const KeyRef& purgeKey) override;
ThreadFuture<DatabaseSharedState*> createSharedState() override;
void setSharedState(DatabaseSharedState* p) override;
@ -716,6 +729,9 @@ public:
ThreadFuture<Void> forceRecoveryWithDataLoss(const StringRef& dcid) override;
ThreadFuture<Void> createSnapshot(const StringRef& uid, const StringRef& snapshot_command) override;
ThreadFuture<Key> purgeBlobGranules(const KeyRangeRef& keyRange, Version purgeVersion, bool force) override;
ThreadFuture<Void> waitPurgeGranulesComplete(const KeyRef& purgeKey) override;
ThreadFuture<DatabaseSharedState*> createSharedState() override;
void setSharedState(DatabaseSharedState* p) override;

View File

@ -9184,3 +9184,86 @@ Future<Void> DatabaseContext::popChangeFeedMutations(Key rangeID, Version versio
Reference<DatabaseContext::TransactionT> DatabaseContext::createTransaction() {
return makeReference<ReadYourWritesTransaction>(Database(Reference<DatabaseContext>::addRef(this)));
}
ACTOR Future<Key> purgeBlobGranulesActor(Reference<DatabaseContext> db,
KeyRange range,
Version purgeVersion,
bool force) {
state Database cx(db);
state Transaction tr(cx);
state Key purgeKey;
// FIXME: implement force
if (!force) {
throw unsupported_operation();
}
loop {
try {
tr.setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
tr.setOption(FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE);
Value purgeValue = blobGranulePurgeValueFor(purgeVersion, range, force);
tr.atomicOp(
addVersionStampAtEnd(blobGranulePurgeKeys.begin), purgeValue, MutationRef::SetVersionstampedKey);
tr.set(blobGranulePurgeChangeKey, deterministicRandom()->randomUniqueID().toString());
state Future<Standalone<StringRef>> fTrVs = tr.getVersionstamp();
wait(tr.commit());
Standalone<StringRef> vs = wait(fTrVs);
purgeKey = blobGranulePurgeKeys.begin.withSuffix(vs);
if (BG_REQUEST_DEBUG) {
fmt::print("purgeBlobGranules for range [{0} - {1}) at version {2} registered {3}\n",
range.begin.printable(),
range.end.printable(),
purgeVersion,
purgeKey.printable());
}
break;
} catch (Error& e) {
if (BG_REQUEST_DEBUG) {
fmt::print("purgeBlobGranules for range [{0} - {1}) at version {2} encountered error {3}\n",
range.begin.printable(),
range.end.printable(),
purgeVersion,
e.name());
}
wait(tr.onError(e));
}
}
return purgeKey;
}
Future<Key> DatabaseContext::purgeBlobGranules(KeyRange range, Version purgeVersion, bool force) {
return purgeBlobGranulesActor(Reference<DatabaseContext>::addRef(this), range, purgeVersion, force);
}
ACTOR Future<Void> waitPurgeGranulesCompleteActor(Reference<DatabaseContext> db, Key purgeKey) {
state Database cx(db);
state Reference<ReadYourWritesTransaction> tr = makeReference<ReadYourWritesTransaction>(cx);
loop {
try {
tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
tr->setOption(FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE);
Optional<Value> purgeVal = wait(tr->get(purgeKey));
if (!purgeVal.present()) {
if (BG_REQUEST_DEBUG) {
fmt::print("purgeBlobGranules for {0} succeeded\n", purgeKey.printable());
}
return Void();
}
if (BG_REQUEST_DEBUG) {
fmt::print("purgeBlobGranules for {0} watching\n", purgeKey.printable());
}
state Future<Void> watchFuture = tr->watch(purgeKey);
wait(tr->commit());
wait(watchFuture);
tr->reset();
} catch (Error& e) {
wait(tr->onError(e));
}
}
}
Future<Void> DatabaseContext::waitPurgeGranulesComplete(Key purgeKey) {
return waitPurgeGranulesCompleteActor(Reference<DatabaseContext>::addRef(this), purgeKey);
}
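
Taken together, the intended call pattern for the two new DatabaseContext methods is to register the purge and then block on the returned versionstamped key. A minimal sketch, assuming an already-open Database cx; the actor name purgeAndWait is illustrative only.

ACTOR Future<Void> purgeAndWait(Database cx, KeyRange range, Version purgeVersion, bool force) {
    // Registers the purge intent; the returned versionstamped key identifies this request.
    state Key purgeKey = wait(cx->purgeBlobGranules(range, purgeVersion, force));
    // Resolves once the blob manager has cleared the intent key, i.e. the purge is complete.
    wait(cx->waitPurgeGranulesComplete(purgeKey));
    return Void();
}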

View File

@ -1156,9 +1156,9 @@ const KeyRangeRef blobGranuleMappingKeys(LiteralStringRef("\xff\x02/bgm/"), Lite
const KeyRangeRef blobGranuleLockKeys(LiteralStringRef("\xff\x02/bgl/"), LiteralStringRef("\xff\x02/bgl0"));
const KeyRangeRef blobGranuleSplitKeys(LiteralStringRef("\xff\x02/bgs/"), LiteralStringRef("\xff\x02/bgs0"));
const KeyRangeRef blobGranuleHistoryKeys(LiteralStringRef("\xff\x02/bgh/"), LiteralStringRef("\xff\x02/bgh0"));
const KeyRangeRef blobGranulePruneKeys(LiteralStringRef("\xff\x02/bgp/"), LiteralStringRef("\xff\x02/bgp0"));
const KeyRangeRef blobGranulePurgeKeys(LiteralStringRef("\xff\x02/bgp/"), LiteralStringRef("\xff\x02/bgp0"));
const KeyRangeRef blobGranuleVersionKeys(LiteralStringRef("\xff\x02/bgv/"), LiteralStringRef("\xff\x02/bgv0"));
const KeyRef blobGranulePruneChangeKey = LiteralStringRef("\xff\x02/bgpChange");
const KeyRef blobGranulePurgeChangeKey = LiteralStringRef("\xff\x02/bgpChange");
const uint8_t BG_FILE_TYPE_DELTA = 'D';
const uint8_t BG_FILE_TYPE_SNAPSHOT = 'S';
@ -1215,7 +1215,7 @@ std::tuple<Standalone<StringRef>, int64_t, int64_t, int64_t> decodeBlobGranuleFi
return std::tuple(filename, offset, length, fullFileLength);
}
const Value blobGranulePruneValueFor(Version version, KeyRange range, bool force) {
const Value blobGranulePurgeValueFor(Version version, KeyRange range, bool force) {
BinaryWriter wr(IncludeVersion(ProtocolVersion::withBlobGranule()));
wr << version;
wr << range;
@ -1223,7 +1223,7 @@ const Value blobGranulePruneValueFor(Version version, KeyRange range, bool force
return wr.toValue();
}
std::tuple<Version, KeyRange, bool> decodeBlobGranulePruneValue(ValueRef const& value) {
std::tuple<Version, KeyRange, bool> decodeBlobGranulePurgeValue(ValueRef const& value) {
Version version;
KeyRange range;
bool force;
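
For reference, a small hedged sketch of how the renamed helpers round-trip a purge intent; purgeVersion, range, and force are assumed to be supplied by the caller.

// Encode the intent value that gets written under blobGranulePurgeKeys.
Value purgeValue = blobGranulePurgeValueFor(purgeVersion, range, force);

// Decode it again, e.g. when the blob manager scans pending purge intents.
Version decodedVersion;
KeyRange decodedRange;
bool decodedForce;
std::tie(decodedVersion, decodedRange, decodedForce) = decodeBlobGranulePurgeValue(purgeValue);
ASSERT(decodedVersion == purgeVersion && decodedRange == range && decodedForce == force);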

View File

@ -569,9 +569,9 @@ extern const KeyRangeRef blobGranuleSplitKeys;
extern const KeyRangeRef blobGranuleHistoryKeys;
// \xff\x02/bgp/(start,end) = (version, force)
extern const KeyRangeRef blobGranulePruneKeys;
extern const KeyRangeRef blobGranulePurgeKeys;
extern const KeyRangeRef blobGranuleVersionKeys;
extern const KeyRef blobGranulePruneChangeKey;
extern const KeyRef blobGranulePurgeChangeKey;
const Key blobGranuleFileKeyFor(UID granuleID, Version fileVersion, uint8_t fileType);
std::tuple<UID, Version, uint8_t> decodeBlobGranuleFileKey(KeyRef const& key);
@ -580,8 +580,8 @@ const KeyRange blobGranuleFileKeyRangeFor(UID granuleID);
const Value blobGranuleFileValueFor(StringRef const& filename, int64_t offset, int64_t length, int64_t fullFileLength);
std::tuple<Standalone<StringRef>, int64_t, int64_t, int64_t> decodeBlobGranuleFileValue(ValueRef const& value);
const Value blobGranulePruneValueFor(Version version, KeyRange range, bool force);
std::tuple<Version, KeyRange, bool> decodeBlobGranulePruneValue(ValueRef const& value);
const Value blobGranulePurgeValueFor(Version version, KeyRange range, bool force);
std::tuple<Version, KeyRange, bool> decodeBlobGranulePurgeValue(ValueRef const& value);
const Value blobGranuleMappingValueFor(UID const& workerID);
UID decodeBlobGranuleMappingValue(ValueRef const& value);

View File

@ -127,6 +127,20 @@ ThreadFuture<ProtocolVersion> ThreadSafeDatabase::getServerProtocol(Optional<Pro
[db, expectedVersion]() -> Future<ProtocolVersion> { return db->getClusterProtocol(expectedVersion); });
}
ThreadFuture<Key> ThreadSafeDatabase::purgeBlobGranules(const KeyRangeRef& keyRange, Version purgeVersion, bool force) {
DatabaseContext* db = this->db;
KeyRange range = keyRange;
return onMainThread([db, range, purgeVersion, force]() -> Future<Key> {
return db->purgeBlobGranules(range, purgeVersion, force);
});
}
ThreadFuture<Void> ThreadSafeDatabase::waitPurgeGranulesComplete(const KeyRef& purgeKey) {
DatabaseContext* db = this->db;
Key key = purgeKey;
return onMainThread([db, key]() -> Future<Void> { return db->waitPurgeGranulesComplete(key); });
}
ThreadSafeDatabase::ThreadSafeDatabase(std::string connFilename, int apiVersion) {
ClusterConnectionFile* connFile =
new ClusterConnectionFile(ClusterConnectionFile::lookupClusterFileName(connFilename).first);
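
Both wrappers above copy their arguments into owning types (KeyRange, Key) before capturing them, because the lambda runs later on the network thread, when the caller's KeyRangeRef/KeyRef memory may already be gone. A hedged sketch of that convention in isolation, assuming a DatabaseContext* db and a caller-owned KeyRef purgeKeyRef (names are illustrative):

Key purgeKey = purgeKeyRef; // deep copy first: the ref's memory may not outlive this call
ThreadFuture<Void> done =
    onMainThread([db, purgeKey]() -> Future<Void> { return db->waitPurgeGranulesComplete(purgeKey); });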

View File

@ -59,6 +59,9 @@ public:
ThreadFuture<Void> forceRecoveryWithDataLoss(const StringRef& dcid) override;
ThreadFuture<Void> createSnapshot(const StringRef& uid, const StringRef& snapshot_command) override;
ThreadFuture<Key> purgeBlobGranules(const KeyRangeRef& keyRange, Version purgeVersion, bool force) override;
ThreadFuture<Void> waitPurgeGranulesComplete(const KeyRef& purgeKey) override;
ThreadFuture<DatabaseSharedState*> createSharedState() override;
void setSharedState(DatabaseSharedState* p) override;

View File

@ -326,6 +326,7 @@ struct NetNotifiedQueueWithAcknowledgements final : NotifiedQueue<T>,
AcknowledgementReceiver acknowledgements;
Endpoint requestStreamEndpoint;
bool sentError = false;
bool notifiedFailed = false;
Promise<Void> onConnect;
NetNotifiedQueueWithAcknowledgements(int futures, int promises)
@ -402,14 +403,20 @@ struct NetNotifiedQueueWithAcknowledgements final : NotifiedQueue<T>,
return res;
}
~NetNotifiedQueueWithAcknowledgements() {
if (acknowledgements.getRawEndpoint().isValid() && acknowledgements.isRemoteEndpoint() && !this->hasError()) {
void notifyFailed() {
if (!notifiedFailed && acknowledgements.getRawEndpoint().isValid() && acknowledgements.isRemoteEndpoint() &&
!this->hasError()) {
// Notify the server that a client is not using this ReplyPromiseStream anymore
FlowTransport::transport().sendUnreliable(
SerializeSource<ErrorOr<AcknowledgementReply>>(operation_obsolete()),
acknowledgements.getEndpoint(TaskPriority::ReadSocket),
false);
notifiedFailed = true;
}
}
~NetNotifiedQueueWithAcknowledgements() {
notifyFailed();
if (isRemoteEndpoint() && !sentError && !acknowledgements.failures.isReady()) {
// Notify the client ReplyPromiseStream was cancelled before sending an error, so the storage server must
// have died
@ -505,6 +512,8 @@ public:
return queue->onConnect.getFuture();
}
void notifyFailed() { queue->notifyFailed(); }
~ReplyPromiseStream() {
if (queue)
queue->delPromiseRef();

View File

@ -321,6 +321,8 @@ void endStreamOnDisconnect(Future<Void> signal,
wait(signal || stream.onConnected());
}
}
// Notify BEFORE dropping the last reference, so broken_promise is sent on the stream before the destructor is called
stream.notifyFailed();
}
}

View File

@ -216,7 +216,7 @@ struct SplitEvaluation {
struct BlobManagerStats {
CounterCollection cc;
// FIXME: pruning stats
// FIXME: purging stats
Counter granuleSplits;
Counter granuleWriteHotSplits;
@ -226,6 +226,10 @@ struct BlobManagerStats {
Counter ccMismatches;
Counter ccTimeouts;
Counter ccErrors;
Counter purgesProcessed;
Counter granulesFullyPurged;
Counter granulesPartiallyPurged;
Counter filesPurged;
Future<Void> logger;
// Current stats maintained for a given blob worker process
@ -233,7 +237,9 @@ struct BlobManagerStats {
: cc("BlobManagerStats", id.toString()), granuleSplits("GranuleSplits", cc),
granuleWriteHotSplits("GranuleWriteHotSplits", cc), ccGranulesChecked("CCGranulesChecked", cc),
ccRowsChecked("CCRowsChecked", cc), ccBytesChecked("CCBytesChecked", cc), ccMismatches("CCMismatches", cc),
ccTimeouts("CCTimeouts", cc), ccErrors("CCErrors", cc) {
ccTimeouts("CCTimeouts", cc), ccErrors("CCErrors", cc), purgesProcessed("PurgesProcessed", cc),
granulesFullyPurged("GranulesFullyPurged", cc), granulesPartiallyPurged("GranulesPartiallyPurged", cc),
filesPurged("FilesPurged", cc) {
specialCounter(cc, "WorkerCount", [workers]() { return workers->size(); });
logger = traceCounters("BlobManagerMetrics", id, interval, &cc, "BlobManagerMetrics");
}
@ -438,6 +444,7 @@ ACTOR Future<UID> pickWorkerForAssign(Reference<BlobManagerData> bmData) {
ACTOR Future<Void> doRangeAssignment(Reference<BlobManagerData> bmData,
RangeAssignment assignment,
Optional<UID> workerID,
int64_t epoch,
int64_t seqNo) {
// WorkerId is set, except in case of assigning to any worker. Then we pick the worker to assign to in here
@ -468,7 +475,7 @@ ACTOR Future<Void> doRangeAssignment(Reference<BlobManagerData> bmData,
assignment.isAssign ? "assigning" : "revoking",
assignment.keyRange.begin.printable(),
assignment.keyRange.end.printable(),
bmData->epoch,
epoch,
seqNo,
workerID.get().toString());
}
@ -481,7 +488,7 @@ ACTOR Future<Void> doRangeAssignment(Reference<BlobManagerData> bmData,
AssignBlobRangeRequest req;
req.keyRange = KeyRangeRef(StringRef(req.arena, assignment.keyRange.begin),
StringRef(req.arena, assignment.keyRange.end));
req.managerEpoch = bmData->epoch;
req.managerEpoch = epoch;
req.managerSeqno = seqNo;
req.type = assignment.assign.get().type;
@ -497,7 +504,7 @@ ACTOR Future<Void> doRangeAssignment(Reference<BlobManagerData> bmData,
RevokeBlobRangeRequest req;
req.keyRange = KeyRangeRef(StringRef(req.arena, assignment.keyRange.begin),
StringRef(req.arena, assignment.keyRange.end));
req.managerEpoch = bmData->epoch;
req.managerEpoch = epoch;
req.managerSeqno = seqNo;
req.dispose = assignment.revoke.get().dispose;
@ -637,10 +644,10 @@ ACTOR Future<Void> rangeAssigner(Reference<BlobManagerData> bmData) {
}
count++;
}
ASSERT(count == 1);
if (skip) {
continue;
}
ASSERT(count == 1);
if (assignment.worker.present() && assignment.worker.get().isValid()) {
if (BM_DEBUG) {
@ -653,7 +660,7 @@ ACTOR Future<Void> rangeAssigner(Reference<BlobManagerData> bmData) {
bmData->workerAssignments.insert(assignment.keyRange, workerId);
bmData->assignsInProgress.insert(assignment.keyRange,
doRangeAssignment(bmData, assignment, workerId, seqNo));
doRangeAssignment(bmData, assignment, workerId, bmData->epoch, seqNo));
// If we know about the worker and this is not a continue, then this is a new range for the worker
if (bmData->workerStats.count(workerId) &&
assignment.assign.get().type != AssignRequestType::Continue) {
@ -662,8 +669,8 @@ ACTOR Future<Void> rangeAssigner(Reference<BlobManagerData> bmData) {
} else {
// Ensure the key boundaries are updated before we pick a worker
bmData->workerAssignments.insert(assignment.keyRange, UID());
bmData->assignsInProgress.insert(assignment.keyRange,
doRangeAssignment(bmData, assignment, Optional<UID>(), seqNo));
bmData->assignsInProgress.insert(
assignment.keyRange, doRangeAssignment(bmData, assignment, Optional<UID>(), bmData->epoch, seqNo));
}
} else {
@ -677,7 +684,8 @@ ACTOR Future<Void> rangeAssigner(Reference<BlobManagerData> bmData) {
if (existingRange.range() == assignment.keyRange && existingRange.cvalue() == assignment.worker.get()) {
bmData->workerAssignments.insert(assignment.keyRange, UID());
}
bmData->addActor.send(doRangeAssignment(bmData, assignment, assignment.worker.get(), seqNo));
bmData->addActor.send(
doRangeAssignment(bmData, assignment, assignment.worker.get(), bmData->epoch, seqNo));
} else {
auto currentAssignments = bmData->workerAssignments.intersectingRanges(assignment.keyRange);
for (auto& it : currentAssignments) {
@ -693,7 +701,7 @@ ACTOR Future<Void> rangeAssigner(Reference<BlobManagerData> bmData) {
}
// revoke the range for the worker that owns it, not the worker specified in the revoke
bmData->addActor.send(doRangeAssignment(bmData, assignment, it.value(), seqNo));
bmData->addActor.send(doRangeAssignment(bmData, assignment, it.value(), bmData->epoch, seqNo));
}
bmData->workerAssignments.insert(assignment.keyRange, UID());
}
@ -1356,26 +1364,6 @@ ACTOR Future<Void> monitorBlobWorkerStatus(Reference<BlobManagerData> bmData, Bl
// back is to split the range.
ASSERT(rep.doSplit);
// only evaluate for split if this worker currently owns the granule in this blob manager's mapping
auto currGranuleAssignment = bmData->workerAssignments.rangeContaining(rep.granuleRange.begin);
if (!(currGranuleAssignment.begin() == rep.granuleRange.begin &&
currGranuleAssignment.end() == rep.granuleRange.end &&
currGranuleAssignment.cvalue() == bwInterf.id())) {
if (BM_DEBUG) {
fmt::print("Manager {0} ignoring status from BW {1} for granule [{2} - {3}) since BW {4} owns "
"[{5} - {6}).\n",
bmData->epoch,
bwInterf.id().toString().substr(0, 5),
rep.granuleRange.begin.printable(),
rep.granuleRange.end.printable(),
currGranuleAssignment.cvalue().toString().substr(0, 5),
currGranuleAssignment.begin().printable(),
currGranuleAssignment.end().printable());
}
// FIXME: could send revoke request
continue;
}
// FIXME: We will need to go over all splits in the range once we're doing merges, instead of first one
auto lastSplitEval = bmData->splitEvaluations.rangeContaining(rep.granuleRange.begin);
if (rep.granuleRange.begin == lastSplitEval.begin() && rep.granuleRange.end == lastSplitEval.end() &&
@ -1386,46 +1374,67 @@ ACTOR Future<Void> monitorBlobWorkerStatus(Reference<BlobManagerData> bmData, Bl
rep.granuleRange.begin.printable(),
rep.granuleRange.end.printable());
}
} else {
ASSERT(lastSplitEval.cvalue().epoch < rep.epoch ||
(lastSplitEval.cvalue().epoch == rep.epoch && lastSplitEval.cvalue().seqno < rep.seqno));
if (lastSplitEval.cvalue().inProgress.isValid() && !lastSplitEval.cvalue().inProgress.isReady()) {
TEST(true); // racing BM splits
// For example, one worker asked BM to split, then died, granule was moved, new worker asks to
// split on recovery. We need to ensure that they are semantically the same split.
// We will just rely on the in-progress split to finish
if (BM_DEBUG) {
fmt::print("Manager {0} got split request for [{1} - {2}) @ ({3}, {4}), but already in "
"progress from [{5} - {6}) @ ({7}, {8})\n",
bmData->epoch,
rep.granuleRange.begin.printable().c_str(),
rep.granuleRange.end.printable().c_str(),
rep.epoch,
rep.seqno,
lastSplitEval.begin().printable().c_str(),
lastSplitEval.end().printable().c_str(),
lastSplitEval.cvalue().epoch,
lastSplitEval.cvalue().seqno);
}
// ignore the request, they will retry
} else {
if (BM_DEBUG) {
fmt::print("Manager {0} evaluating [{1} - {2}) @ ({3}, {4}) for split\n",
bmData->epoch,
rep.granuleRange.begin.printable().c_str(),
rep.granuleRange.end.printable().c_str(),
rep.epoch,
rep.seqno);
}
Future<Void> doSplitEval = maybeSplitRange(bmData,
bwInterf.id(),
rep.granuleRange,
rep.granuleID,
rep.startVersion,
rep.writeHotSplit);
bmData->splitEvaluations.insert(rep.granuleRange,
SplitEvaluation(rep.epoch, rep.seqno, doSplitEval));
} else if (!(lastSplitEval.cvalue().epoch < rep.epoch ||
(lastSplitEval.cvalue().epoch == rep.epoch && lastSplitEval.cvalue().seqno < rep.seqno))) {
TEST(true); // BM got out-of-date split request
if (BM_DEBUG) {
fmt::print(
"Manager {0} ignoring status from BW {1} for granule [{2} - {3}) since it already processed"
"[{4} - {5}) @ ({6}, {7}).\n",
bmData->epoch,
bwInterf.id().toString().substr(0, 5),
rep.granuleRange.begin.printable(),
rep.granuleRange.end.printable(),
lastSplitEval.begin().printable(),
lastSplitEval.end().printable(),
lastSplitEval.cvalue().epoch,
lastSplitEval.cvalue().seqno);
}
// revoke range from the out-of-date worker, but bypass rangeAssigner and hack (epoch, seqno) to be
// (requesting epoch, requesting seqno + 1) to ensure there is no race with later reassigning the range
// to the worker at a later version
RangeAssignment revokeOld;
revokeOld.isAssign = false;
revokeOld.worker = bwInterf.id();
revokeOld.keyRange = rep.granuleRange;
revokeOld.revoke = RangeRevokeData(false);
bmData->addActor.send(
doRangeAssignment(bmData, revokeOld, bwInterf.id(), rep.epoch, rep.seqno + 1));
} else if (lastSplitEval.cvalue().inProgress.isValid() &&
!lastSplitEval.cvalue().inProgress.isReady()) {
TEST(true); // racing BM splits
// For example, one worker asked BM to split, then died, granule was moved, new worker asks to
// split on recovery. We need to ensure that they are semantically the same split.
// We will just rely on the in-progress split to finish
if (BM_DEBUG) {
fmt::print("Manager {0} got split request for [{1} - {2}) @ ({3}, {4}), but already in "
"progress from [{5} - {6}) @ ({7}, {8})\n",
bmData->epoch,
rep.granuleRange.begin.printable().c_str(),
rep.granuleRange.end.printable().c_str(),
rep.epoch,
rep.seqno,
lastSplitEval.begin().printable().c_str(),
lastSplitEval.end().printable().c_str(),
lastSplitEval.cvalue().epoch,
lastSplitEval.cvalue().seqno);
}
// ignore the request, they will retry
} else {
if (BM_DEBUG) {
fmt::print("Manager {0} evaluating [{1} - {2}) @ ({3}, {4}) for split\n",
bmData->epoch,
rep.granuleRange.begin.printable().c_str(),
rep.granuleRange.end.printable().c_str(),
rep.epoch,
rep.seqno);
}
Future<Void> doSplitEval = maybeSplitRange(
bmData, bwInterf.id(), rep.granuleRange, rep.granuleID, rep.startVersion, rep.writeHotSplit);
bmData->splitEvaluations.insert(rep.granuleRange,
SplitEvaluation(rep.epoch, rep.seqno, doSplitEval));
}
}
} catch (Error& e) {
@ -2160,23 +2169,84 @@ ACTOR Future<GranuleFiles> loadHistoryFiles(Reference<BlobManagerData> bmData, U
}
}
// FIXME: trace events for pruning
// FIXME: trace events for purging
ACTOR Future<Void> canDeleteFullGranule(Reference<BlobManagerData> self, UID granuleId) {
state Transaction tr(self->db);
state KeyRange splitRange = blobGranuleSplitKeyRangeFor(granuleId);
loop {
try {
tr.setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
tr.setOption(FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE);
state RangeResult splitState = wait(tr.getRange(splitRange, SERVER_KNOBS->BG_MAX_SPLIT_FANOUT));
state int i = 0;
state bool retry = false;
for (; i < splitState.size(); i++) {
UID parent, child;
BlobGranuleSplitState st;
Version v;
std::tie(parent, child) = decodeBlobGranuleSplitKey(splitState[i].key);
std::tie(st, v) = decodeBlobGranuleSplitValue(splitState[i].value);
// if split state is done, this granule has definitely persisted a snapshot
if (st >= BlobGranuleSplitState::Done) {
continue;
}
// if split state isn't even assigned, this granule has definitely not persisted a snapshot
if (st <= BlobGranuleSplitState::Initialized) {
retry = true;
break;
}
ASSERT(st == BlobGranuleSplitState::Assigned);
// if assigned, granule may or may not have snapshotted. Check files to confirm. Since a re-snapshot is
// the first file written for a new granule, any files present mean it has re-snapshotted from this
// granule
KeyRange granuleFileRange = blobGranuleFileKeyRangeFor(child);
RangeResult files = wait(tr.getRange(granuleFileRange, 1));
if (files.empty()) {
retry = true;
break;
}
}
if (retry) {
tr.reset();
wait(delay(1.0));
} else {
if (splitState.empty() || !splitState.more) {
break;
}
splitRange = KeyRangeRef(keyAfter(splitState.back().key), splitRange.end);
}
} catch (Error& e) {
wait(tr.onError(e));
}
}
return Void();
}
/*
* Deletes all files pertaining to the granule with id granuleId and
* also removes the history entry for this granule from the system keyspace
* TODO: ensure cannot fully delete granule that is still splitting!
*/
ACTOR Future<Void> fullyDeleteGranule(Reference<BlobManagerData> self, UID granuleId, Key historyKey) {
ACTOR Future<Void> fullyDeleteGranule(Reference<BlobManagerData> self,
UID granuleId,
Key historyKey,
Version purgeVersion) {
if (BM_DEBUG) {
fmt::print("Fully deleting granule {0}: init\n", granuleId.toString());
}
// if granule is still splitting and files are needed for new sub-granules to re-snapshot, we can only partially
// delete the granule, since we need to keep the last snapshot and deltas for splitting
wait(canDeleteFullGranule(self, granuleId));
// get files
GranuleFiles files = wait(loadHistoryFiles(self->db, granuleId));
std::vector<Future<Void>> deletions;
std::vector<std::string> filesToDelete; // TODO: remove, just for debugging
state std::vector<std::string> filesToDelete; // TODO: remove, just for debugging
for (auto snapshotFile : files.snapshotFiles) {
std::string fname = snapshotFile.filename;
@ -2191,7 +2261,7 @@ ACTOR Future<Void> fullyDeleteGranule(Reference<BlobManagerData> self, UID granu
}
if (BM_DEBUG) {
fmt::print("Fully deleting granule {0}: deleting {1} files\n", granuleId.toString(), deletions.size());
fmt::print("Fully deleting granule {0}: deleting {1} files\n", granuleId.toString(), filesToDelete.size());
for (auto filename : filesToDelete) {
fmt::print(" - {}\n", filename.c_str());
}
@ -2228,18 +2298,27 @@ ACTOR Future<Void> fullyDeleteGranule(Reference<BlobManagerData> self, UID granu
fmt::print("Fully deleting granule {0}: success\n", granuleId.toString());
}
TraceEvent("GranuleFullPurge", self->id)
.detail("Epoch", self->epoch)
.detail("GranuleID", granuleId)
.detail("PurgeVersion", purgeVersion)
.detail("FilesPurged", filesToDelete.size());
++self->stats.granulesFullyPurged;
self->stats.filesPurged += filesToDelete.size();
return Void();
}
/*
* For the granule with id granuleId, finds the first snapshot file at a
* version <= pruneVersion and deletes all files older than it.
* version <= purgeVersion and deletes all files older than it.
*
* Assumption: this granule's startVersion might change because the first snapshot
* file might be deleted. We will need to ensure we don't rely on the granule's startVersion
* (that's persisted as part of the key), but rather use the granule's first snapshot's version when needed
*/
ACTOR Future<Void> partiallyDeleteGranule(Reference<BlobManagerData> self, UID granuleId, Version pruneVersion) {
ACTOR Future<Void> partiallyDeleteGranule(Reference<BlobManagerData> self, UID granuleId, Version purgeVersion) {
if (BM_DEBUG) {
fmt::print("Partially deleting granule {0}: init\n", granuleId.toString());
}
@ -2247,7 +2326,7 @@ ACTOR Future<Void> partiallyDeleteGranule(Reference<BlobManagerData> self, UID g
// get files
GranuleFiles files = wait(loadHistoryFiles(self->db, granuleId));
// represents the version of the latest snapshot file in this granule with G.version < pruneVersion
// represents the version of the latest snapshot file in this granule with G.version < purgeVersion
Version latestSnapshotVersion = invalidVersion;
state std::vector<Future<Void>> deletions; // deletion work per file
@ -2262,8 +2341,8 @@ ACTOR Future<Void> partiallyDeleteGranule(Reference<BlobManagerData> self, UID g
deletions.emplace_back(self->bstore->deleteFile(fname));
deletedFileKeys.emplace_back(blobGranuleFileKeyFor(granuleId, files.snapshotFiles[idx].version, 'S'));
filesToDelete.emplace_back(fname);
} else if (files.snapshotFiles[idx].version <= pruneVersion) {
// otherwise if this is the FIRST snapshot file with version < pruneVersion,
} else if (files.snapshotFiles[idx].version <= purgeVersion) {
// otherwise if this is the FIRST snapshot file with version < purgeVersion,
// then we found our latestSnapshotVersion (FIRST since we are traversing in reverse)
latestSnapshotVersion = files.snapshotFiles[idx].version;
}
@ -2289,19 +2368,19 @@ ACTOR Future<Void> partiallyDeleteGranule(Reference<BlobManagerData> self, UID g
}
if (BM_DEBUG) {
fmt::print("Partially deleting granule {0}: deleting {1} files\n", granuleId.toString(), deletions.size());
fmt::print("Partially deleting granule {0}: deleting {1} files\n", granuleId.toString(), filesToDelete.size());
for (auto filename : filesToDelete) {
fmt::print(" - {0}\n", filename);
}
}
// TODO: the following comment relies on the assumption that BWs will not get requests to
// read data that was already pruned. confirm assumption is fine. otherwise, we'd need
// to communicate with BWs here and have them ack the pruneVersion
// read data that was already purged. confirm assumption is fine. otherwise, we'd need
// to communicate with BWs here and have them ack the purgeVersion
// delete the files before the corresponding metadata.
// this could lead to dangling pointers in fdb, but we should never read data older than
// pruneVersion anyways, and we can clean up the keys the next time around.
// purgeVersion anyways, and we can clean up the keys the next time around.
// deleting files before corresponding metadata reduces the # of orphaned files.
wait(waitForAll(deletions));
@ -2329,26 +2408,41 @@ ACTOR Future<Void> partiallyDeleteGranule(Reference<BlobManagerData> self, UID g
if (BM_DEBUG) {
fmt::print("Partially deleting granule {0}: success\n", granuleId.toString());
}
TraceEvent("GranulePartialPurge", self->id)
.detail("Epoch", self->epoch)
.detail("GranuleID", granuleId)
.detail("PurgeVersion", purgeVersion)
.detail("FilesPurged", filesToDelete.size());
++self->stats.granulesPartiallyPurged;
self->stats.filesPurged += filesToDelete.size();
return Void();
}
/*
* This method is used to prune the range [startKey, endKey) at (and including) pruneVersion.
* This method is used to purge the range [startKey, endKey) at (and including) purgeVersion.
* To do this, we do a BFS traversal starting at the active granules. Then we classify granules
* in the history as nodes that can be fully deleted (i.e. their files and history can be deleted)
* and nodes that can be partially deleted (i.e. some of their files can be deleted).
* Once all this is done, we finally clear the pruneIntent key, if possible, to indicate we are done
* processing this prune intent.
* Once all this is done, we finally clear the purgeIntent key, if possible, to indicate we are done
* processing this purge intent.
*/
ACTOR Future<Void> pruneRange(Reference<BlobManagerData> self, KeyRangeRef range, Version pruneVersion, bool force) {
ACTOR Future<Void> purgeRange(Reference<BlobManagerData> self, KeyRangeRef range, Version purgeVersion, bool force) {
if (BM_DEBUG) {
fmt::print("pruneRange starting for range [{0} - {1}) @ pruneVersion={2}, force={3}\n",
fmt::print("purgeRange starting for range [{0} - {1}) @ purgeVersion={2}, force={3}\n",
range.begin.printable(),
range.end.printable(),
pruneVersion,
purgeVersion,
force);
}
TraceEvent("PurgeGranulesBegin", self->id)
.detail("Epoch", self->epoch)
.detail("Range", range)
.detail("PurgeVersion", purgeVersion)
.detail("Force", force);
// queue of <range, startVersion, endVersion> for BFS traversal of history
state std::queue<std::tuple<KeyRange, Version, Version>> historyEntryQueue;
@ -2371,18 +2465,18 @@ ACTOR Future<Void> pruneRange(Reference<BlobManagerData> self, KeyRangeRef range
state KeyRangeMap<UID>::iterator activeRange;
for (activeRange = activeRanges.begin(); activeRange != activeRanges.end(); ++activeRange) {
if (BM_DEBUG) {
fmt::print("Checking if active range [{0} - {1}), owned by BW {2}, should be pruned\n",
fmt::print("Checking if active range [{0} - {1}), owned by BW {2}, should be purged\n",
activeRange.begin().printable(),
activeRange.end().printable(),
activeRange.value().toString());
}
// assumption: prune boundaries must respect granule boundaries
// assumption: purge boundaries must respect granule boundaries
if (activeRange.begin() < range.begin || activeRange.end() > range.end) {
continue;
}
// TODO: if this is a force prune, then revoke the assignment from the corresponding BW first
// TODO: if this is a force purge, then revoke the assignment from the corresponding BW first
// so that it doesn't try to interact with the granule (i.e. force it to give up gLock).
// we'll need some way to ack that the revoke was successful
@ -2456,17 +2550,17 @@ ACTOR Future<Void> pruneRange(Reference<BlobManagerData> self, KeyRangeRef range
}
// There are three cases this granule can fall into:
// - if the granule's end version is at or before the prune version or this is a force delete,
// - if the granule's end version is at or before the purge version or this is a force delete,
// this granule should be completely deleted
// - else if the startVersion <= pruneVersion, then G.startVersion < pruneVersion < G.endVersion
// - else if the startVersion <= purgeVersion, then G.startVersion < purgeVersion < G.endVersion
// and so this granule should be partially deleted
// - otherwise, this granule is active, so don't schedule it for deletion
if (force || endVersion <= pruneVersion) {
if (force || endVersion <= purgeVersion) {
if (BM_DEBUG) {
fmt::print("Granule {0} will be FULLY deleted\n", currHistoryNode.granuleID.toString());
}
toFullyDelete.push_back({ currHistoryNode.granuleID, historyKey });
} else if (startVersion < pruneVersion) {
} else if (startVersion < purgeVersion) {
if (BM_DEBUG) {
fmt::print("Granule {0} will be partially deleted\n", currHistoryNode.granuleID.toString());
}
@ -2513,70 +2607,79 @@ ACTOR Future<Void> pruneRange(Reference<BlobManagerData> self, KeyRangeRef range
// we won't run into any issues with trying to "re-delete" a blob file since deleting
// a file that doesn't exist is considered successful
state std::vector<Future<Void>> partialDeletions;
state int i;
if (BM_DEBUG) {
fmt::print("{0} granules to fully delete\n", toFullyDelete.size());
}
for (i = toFullyDelete.size() - 1; i >= 0; --i) {
UID granuleId;
state UID granuleId;
Key historyKey;
std::tie(granuleId, historyKey) = toFullyDelete[i];
// FIXME: consider batching into a single txn (need to take care of txn size limit)
if (BM_DEBUG) {
fmt::print("About to fully delete granule {0}\n", granuleId.toString());
}
wait(fullyDeleteGranule(self, granuleId, historyKey));
wait(fullyDeleteGranule(self, granuleId, historyKey, purgeVersion));
}
if (BM_DEBUG) {
fmt::print("{0} granules to partially delete\n", toPartiallyDelete.size());
}
std::vector<Future<Void>> partialDeletions;
for (i = toPartiallyDelete.size() - 1; i >= 0; --i) {
UID granuleId = toPartiallyDelete[i];
if (BM_DEBUG) {
fmt::print("About to partially delete granule {0}\n", granuleId.toString());
}
partialDeletions.emplace_back(partiallyDeleteGranule(self, granuleId, pruneVersion));
partialDeletions.emplace_back(partiallyDeleteGranule(self, granuleId, purgeVersion));
}
wait(waitForAll(partialDeletions));
// Now that all the necessary granules and their files have been deleted, we can
// clear the pruneIntent key to signify that the work is done. However, there could have been
// another pruneIntent that got written for this table while we were processing this one.
// clear the purgeIntent key to signify that the work is done. However, there could have been
// another purgeIntent that got written for this table while we were processing this one.
// If that is the case, we should not clear the key. Otherwise, we can just clear the key.
if (BM_DEBUG) {
fmt::print("Successfully pruned range [{0} - {1}) at pruneVersion={2}\n",
fmt::print("Successfully purged range [{0} - {1}) at purgeVersion={2}\n",
range.begin.printable(),
range.end.printable(),
pruneVersion);
purgeVersion);
}
TraceEvent("PurgeGranulesComplete", self->id)
.detail("Epoch", self->epoch)
.detail("Range", range)
.detail("PurgeVersion", purgeVersion)
.detail("Force", force);
++self->stats.purgesProcessed;
return Void();
}
/*
* This monitor watches for changes to a key K that gets updated whenever there is a new prune intent.
* On this change, we scan through all blobGranulePruneKeys (which look like <startKey, endKey>=<prune_version,
* force>) and prune any intents.
* This monitor watches for changes to a key K that gets updated whenever there is a new purge intent.
* On this change, we scan through all blobGranulePurgeKeys (which look like <startKey, endKey>=<purge_version,
* force>) and purge any intents.
*
* Once the prune has succeeded, we clear the key IF the version is still the same one that was pruned.
* That way, if another prune intent arrived for the same range while we were working on an older one,
* Once the purge has succeeded, we clear the key IF the version is still the same one that was purged.
* That way, if another purge intent arrived for the same range while we were working on an older one,
* we wouldn't end up clearing the intent.
*
* When watching for changes, we might end up in scenarios where we failed to do the work
* for a prune intent even though the watch was triggered (maybe the BM had a blip). This is problematic
* if the intent is a force and there isn't another prune intent for quite some time. To remedy this,
* if we don't see a watch change in X (configurable) seconds, we will just sweep through the prune intents,
* for a purge intent even though the watch was triggered (maybe the BM had a blip). This is problematic
* if the intent is a force and there isn't another purge intent for quite some time. To remedy this,
* if we don't see a watch change in X (configurable) seconds, we will just sweep through the purge intents,
* consolidating any work we might have missed before.
*
* Note: we could potentially use a changefeed here to get the exact pruneIntent that was added
* Note: we could potentially use a changefeed here to get the exact purgeIntent that was added
* rather than iterating through all of them, but this might have too much overhead for latency
* improvements we don't really need here (also we need to go over all prune intents anyways in the
* case that the timer is up before any new prune intents arrive).
* improvements we don't really need here (also we need to go over all purge intents anyways in the
* case that the timer is up before any new purge intents arrive).
*/
ACTOR Future<Void> monitorPruneKeys(Reference<BlobManagerData> self) {
ACTOR Future<Void> monitorPurgeKeys(Reference<BlobManagerData> self) {
self->initBStore();
loop {
@ -2585,35 +2688,35 @@ ACTOR Future<Void> monitorPruneKeys(Reference<BlobManagerData> self) {
tr->setOption(FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE);
// Wait for the watch to change, or some time to expire (whichever comes first)
// before checking through the prune intents. We write a UID into the change key value
// before checking through the purge intents. We write a UID into the change key value
// so that we can still recognize when the watch key has been changed while we weren't
// monitoring it
state Key lastPruneKey = blobGranulePruneKeys.begin;
state Key lastPurgeKey = blobGranulePurgeKeys.begin;
loop {
tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
tr->setOption(FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE);
state std::vector<Future<Void>> prunes;
state CoalescedKeyRangeMap<std::pair<Version, bool>> pruneMap;
pruneMap.insert(allKeys, std::make_pair<Version, bool>(0, false));
state std::vector<Future<Void>> purges;
state CoalescedKeyRangeMap<std::pair<Version, bool>> purgeMap;
purgeMap.insert(allKeys, std::make_pair<Version, bool>(0, false));
try {
// TODO: replace 10000 with a knob
state RangeResult pruneIntents = wait(tr->getRange(blobGranulePruneKeys, BUGGIFY ? 1 : 10000));
if (pruneIntents.size()) {
state RangeResult purgeIntents = wait(tr->getRange(blobGranulePurgeKeys, BUGGIFY ? 1 : 10000));
if (purgeIntents.size()) {
int rangeIdx = 0;
for (; rangeIdx < pruneIntents.size(); ++rangeIdx) {
Version pruneVersion;
for (; rangeIdx < purgeIntents.size(); ++rangeIdx) {
Version purgeVersion;
KeyRange range;
bool force;
std::tie(pruneVersion, range, force) =
decodeBlobGranulePruneValue(pruneIntents[rangeIdx].value);
auto ranges = pruneMap.intersectingRanges(range);
std::tie(purgeVersion, range, force) =
decodeBlobGranulePurgeValue(purgeIntents[rangeIdx].value);
auto ranges = purgeMap.intersectingRanges(range);
bool foundConflict = false;
for (auto it : ranges) {
if ((it.value().second && !force && it.value().first < pruneVersion) ||
(!it.value().second && force && pruneVersion < it.value().first)) {
if ((it.value().second && !force && it.value().first < purgeVersion) ||
(!it.value().second && force && purgeVersion < it.value().first)) {
foundConflict = true;
break;
}
@ -2621,39 +2724,41 @@ ACTOR Future<Void> monitorPruneKeys(Reference<BlobManagerData> self) {
if (foundConflict) {
break;
}
pruneMap.insert(range, std::make_pair(pruneVersion, force));
purgeMap.insert(range, std::make_pair(purgeVersion, force));
fmt::print("about to prune range [{0} - {1}) @ {2}, force={3}\n",
range.begin.printable(),
range.end.printable(),
pruneVersion,
force ? "T" : "F");
if (BM_DEBUG) {
fmt::print("about to purge range [{0} - {1}) @ {2}, force={3}\n",
range.begin.printable(),
range.end.printable(),
purgeVersion,
force ? "T" : "F");
}
}
lastPruneKey = pruneIntents[rangeIdx - 1].key;
lastPurgeKey = purgeIntents[rangeIdx - 1].key;
for (auto it : pruneMap.ranges()) {
for (auto it : purgeMap.ranges()) {
if (it.value().first > 0) {
prunes.emplace_back(pruneRange(self, it.range(), it.value().first, it.value().second));
purges.emplace_back(purgeRange(self, it.range(), it.value().first, it.value().second));
}
}
// wait for this set of prunes to complete before starting the next ones since if we
// prune a range R at version V and while we are doing that, the time expires, we will
// end up trying to prune the same range again since the work isn't finished and the
// prunes will race
// wait for this set of purges to complete before starting the next ones since if we
// purge a range R at version V and while we are doing that, the time expires, we will
// end up trying to purge the same range again since the work isn't finished and the
// purges will race
//
// TODO: this isn't that efficient though. Instead we could keep metadata as part of the
// BM's memory that tracks which prunes are active. Once done, we can mark that work as
// done. If the BM fails then all prunes will fail and so the next BM will have a clear
// BM's memory that tracks which purges are active. Once done, we can mark that work as
// done. If the BM fails then all purges will fail and so the next BM will have a clear
// set of metadata (i.e. no work in progress) so we will end up doing the work in the
// new BM
wait(waitForAll(prunes));
wait(waitForAll(purges));
break;
} else {
state Future<Void> watchPruneIntentsChange = tr->watch(blobGranulePruneChangeKey);
state Future<Void> watchPurgeIntentsChange = tr->watch(blobGranulePurgeChangeKey);
wait(tr->commit());
wait(watchPruneIntentsChange);
wait(watchPurgeIntentsChange);
tr->reset();
}
} catch (Error& e) {
@ -2666,7 +2771,7 @@ ACTOR Future<Void> monitorPruneKeys(Reference<BlobManagerData> self) {
try {
tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
tr->setOption(FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE);
tr->clear(KeyRangeRef(blobGranulePruneKeys.begin, keyAfter(lastPruneKey)));
tr->clear(KeyRangeRef(blobGranulePurgeKeys.begin, keyAfter(lastPurgeKey)));
wait(tr->commit());
break;
} catch (Error& e) {
@ -2675,7 +2780,7 @@ ACTOR Future<Void> monitorPruneKeys(Reference<BlobManagerData> self) {
}
if (BM_DEBUG) {
printf("Done pruning current set of prune intents.\n");
printf("Done clearing current set of purge intents.\n");
}
}
}
@ -2876,7 +2981,7 @@ ACTOR Future<Void> blobManager(BlobManagerInterface bmInterf,
self->addActor.send(doLockChecks(self));
self->addActor.send(monitorClientRanges(self));
self->addActor.send(monitorPruneKeys(self));
self->addActor.send(monitorPurgeKeys(self));
if (SERVER_KNOBS->BG_CONSISTENCY_CHECK_ENABLED) {
self->addActor.send(bgConsistencyCheck(self));
}
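
Referring back to the classification in purgeRange above (fully delete when force or endVersion <= purgeVersion, partially delete when startVersion < purgeVersion, otherwise leave the granule alone), a hedged worked example with invented granule versions and purgeVersion = 100, force = false:

// granule A: startVersion=10,  endVersion=50   -> endVersion <= purgeVersion  -> fully deleted
// granule B: startVersion=80,  endVersion=200  -> startVersion < purgeVersion -> partially deleted
// granule C: startVersion=150, still active    -> untouched by this purge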

View File

@ -86,6 +86,7 @@ struct GranuleMetadata : NonCopyable, ReferenceCounted<GranuleMetadata> {
NotifiedVersion durableSnapshotVersion; // same as delta vars, except for snapshots
Version pendingSnapshotVersion = 0;
Version initialSnapshotVersion = invalidVersion;
Version historyVersion = invalidVersion;
Version knownCommittedVersion;
int64_t originalEpoch;
@ -756,7 +757,11 @@ ACTOR Future<BlobFileIndex> dumpInitialSnapshotFromFDB(Reference<BlobWorkerData>
bytesRead);
}
state Error err = e;
wait(tr->onError(e));
if (e.code() == error_code_server_overloaded) {
wait(delay(FLOW_KNOBS->PREVENT_FAST_SPIN_DELAY));
} else {
wait(tr->onError(e));
}
retries++;
TEST(true); // Granule initial snapshot failed
// FIXME: why can't we suppress the error event?

@ -935,13 +940,8 @@ ACTOR Future<BlobFileIndex> checkSplitAndReSnapshot(Reference<BlobWorkerData> bw
break;
}
bwData->currentManagerStatusStream.get().send(GranuleStatusReply(metadata->keyRange,
true,
writeHot,
statusEpoch,
statusSeqno,
granuleID,
metadata->initialSnapshotVersion));
bwData->currentManagerStatusStream.get().send(GranuleStatusReply(
metadata->keyRange, true, writeHot, statusEpoch, statusSeqno, granuleID, metadata->historyVersion));
break;
} catch (Error& e) {
if (e.code() == error_code_operation_cancelled) {
@ -1037,10 +1037,14 @@ static void handleCompletedDeltaFile(Reference<BlobWorkerData> bwData,
// if we get an i/o error updating files, or a rollback, reassign the granule to ourselves and start fresh
static bool granuleCanRetry(const Error& e) {
switch (e.code()) {
case error_code_please_reboot:
case error_code_io_error:
case error_code_io_timeout:
// FIXME: handle connection errors in tighter retry loop around individual files.
// FIXME: if these requests fail at a high enough rate, the whole worker should be marked as unhealthy and its
// granules should be moved away, as there may be some problem with this host contacting blob storage
case error_code_http_request_failed:
case error_code_connection_failed:
case error_code_lookup_failed: // dns
return true;
default:
return false;
@ -1119,10 +1123,15 @@ static Version doGranuleRollback(Reference<GranuleMetadata> metadata,
}
metadata->pendingDeltaVersion = cfRollbackVersion;
if (BW_DEBUG) {
fmt::print("[{0} - {1}) rollback discarding all {2} in-memory mutations\n",
fmt::print("[{0} - {1}) rollback discarding all {2} in-memory mutations",
metadata->keyRange.begin.printable(),
metadata->keyRange.end.printable(),
metadata->currentDeltas.size());
if (metadata->currentDeltas.size()) {
fmt::print(
" {0} - {1}", metadata->currentDeltas.front().version, metadata->currentDeltas.back().version);
}
fmt::print("\n");
}
// discard all in-memory mutations
@ -1150,6 +1159,8 @@ static Version doGranuleRollback(Reference<GranuleMetadata> metadata,
// FIXME: could binary search?
int mIdx = metadata->currentDeltas.size() - 1;
Version firstDiscarded = invalidVersion;
Version lastDiscarded = invalidVersion;
while (mIdx >= 0) {
if (metadata->currentDeltas[mIdx].version <= rollbackVersion) {
break;
@ -1157,19 +1168,37 @@ static Version doGranuleRollback(Reference<GranuleMetadata> metadata,
for (auto& m : metadata->currentDeltas[mIdx].mutations) {
metadata->bufferedDeltaBytes -= m.totalSize();
}
if (firstDiscarded == invalidVersion) {
firstDiscarded = metadata->currentDeltas[mIdx].version;
}
lastDiscarded = metadata->currentDeltas[mIdx].version;
mIdx--;
}
mIdx++;
if (BW_DEBUG) {
fmt::print("[{0} - {1}) rollback discarding {2} in-memory mutations, {3} mutations and {4} bytes left\n",
fmt::print("[{0} - {1}) rollback discarding {2} in-memory mutations",
metadata->keyRange.begin.printable(),
metadata->keyRange.end.printable(),
metadata->currentDeltas.size() - mIdx,
mIdx,
metadata->bufferedDeltaBytes);
metadata->currentDeltas.size() - mIdx - 1);
if (firstDiscarded != invalidVersion) {
fmt::print(" {0} - {1}", lastDiscarded, firstDiscarded);
}
fmt::print(", {0} mutations", mIdx);
if (mIdx >= 0) {
fmt::print(
" ({0} - {1})", metadata->currentDeltas.front().version, metadata->currentDeltas[mIdx].version);
}
fmt::print(" and {0} bytes left\n", metadata->bufferedDeltaBytes);
}
metadata->currentDeltas.resize(metadata->currentDeltas.arena(), mIdx);
if (mIdx < 0) {
metadata->currentDeltas = Standalone<GranuleDeltas>();
metadata->bufferedDeltaBytes = 0;
} else {
metadata->currentDeltas.resize(metadata->currentDeltas.arena(), mIdx + 1);
}
// delete all deltas in rollback range, but we can optimize here to just skip the uncommitted mutations
// directly and immediately pop the rollback out of inProgress to completed
@ -1328,6 +1357,7 @@ ACTOR Future<Void> blobGranuleUpdateFiles(Reference<BlobWorkerData> bwData,
metadata->pendingSnapshotVersion = metadata->files.snapshotFiles.back().version;
metadata->durableSnapshotVersion.set(metadata->pendingSnapshotVersion);
metadata->initialSnapshotVersion = metadata->files.snapshotFiles.front().version;
metadata->historyVersion = startState.history.get().version;
} else {
if (startState.blobFilesToSnapshot.present()) {
startVersion = startState.previousDurableVersion;
@ -1350,6 +1380,7 @@ ACTOR Future<Void> blobGranuleUpdateFiles(Reference<BlobWorkerData> bwData,
}
metadata->initialSnapshotVersion = startVersion;
metadata->pendingSnapshotVersion = startVersion;
metadata->historyVersion = startState.history.present() ? startState.history.get().version : startVersion;
}
metadata->durableDeltaVersion.set(startVersion);
@ -1459,8 +1490,16 @@ ACTOR Future<Void> blobGranuleUpdateFiles(Reference<BlobWorkerData> bwData,
}
ASSERT(mutations.front().version > metadata->bufferedDeltaVersion);
// If this assert trips we should have gotten change_feed_popped from SS and didn't
ASSERT(mutations.front().version >= metadata->activeCFData.get()->popVersion);
// Rare race from merge cursor where no individual server detected popped in their response
if (mutations.front().version < metadata->activeCFData.get()->popVersion) {
TEST(true); // Blob Worker detected popped instead of change feed
TraceEvent("BlobWorkerChangeFeedPopped", bwData->id)
.detail("Granule", metadata->keyRange)
.detail("GranuleID", startState.granuleID)
.detail("MutationVersion", mutations.front().version)
.detail("PopVersion", metadata->activeCFData.get()->popVersion);
throw change_feed_popped();
}
}
when(wait(inFlightFiles.empty() ? Never() : success(inFlightFiles.front().future))) {}
}
@ -1623,6 +1662,7 @@ ACTOR Future<Void> blobGranuleUpdateFiles(Reference<BlobWorkerData> bwData,
metadata->activeCFData.set(cfData);
justDidRollback = true;
lastDeltaVersion = cfRollbackVersion;
break;
}
}
@ -1841,6 +1881,12 @@ ACTOR Future<Void> blobGranuleUpdateFiles(Reference<BlobWorkerData> bwData,
}
}
} catch (Error& e) {
if (BW_DEBUG) {
fmt::print("Granule file updater for [{0} - {1}) got error {2}, exiting\n",
metadata->keyRange.begin.printable(),
metadata->keyRange.end.printable(),
e.name());
}
// Free last change feed data
metadata->activeCFData.set(Reference<ChangeFeedData>());
@ -1871,12 +1917,6 @@ ACTOR Future<Void> blobGranuleUpdateFiles(Reference<BlobWorkerData> bwData,
return Void();
}
++bwData->stats.granuleUpdateErrors;
if (BW_DEBUG) {
fmt::print("Granule file updater for [{0} - {1}) got error {2}, exiting\n",
metadata->keyRange.begin.printable(),
metadata->keyRange.end.printable(),
e.name());
}
if (granuleCanRetry(e)) {
TEST(true); // Granule close and re-open on error
@ -2002,6 +2042,14 @@ ACTOR Future<Void> blobGranuleLoadHistory(Reference<BlobWorkerData> bwData,
int skipped = historyEntryStack.size() - 1 - i;
while (i >= 0) {
auto intersectingRanges = bwData->granuleHistory.intersectingRanges(historyEntryStack[i]->range);
std::vector<std::pair<KeyRange, Reference<GranuleHistoryEntry>>> newerHistory;
for (auto& r : intersectingRanges) {
if (r.value().isValid() && r.value()->endVersion >= historyEntryStack[i]->endVersion) {
newerHistory.push_back(std::make_pair(r.range(), r.value()));
}
}
auto prevRanges = bwData->granuleHistory.rangeContaining(historyEntryStack[i]->range.begin);
if (prevRanges.value().isValid() &&
@ -2012,6 +2060,9 @@ ACTOR Future<Void> blobGranuleLoadHistory(Reference<BlobWorkerData> bwData,
}
bwData->granuleHistory.insert(historyEntryStack[i]->range, historyEntryStack[i]);
for (auto& it : newerHistory) {
bwData->granuleHistory.insert(it.first, it.second);
}
i--;
}
@ -2137,7 +2188,7 @@ ACTOR Future<Void> doBlobGranuleFileRequest(Reference<BlobWorkerData> bwData, Bl
if (req.beginVersion > 0) {
fmt::print("{0} - {1}\n", req.beginVersion, req.readVersion);
} else {
fmt::print("{}", req.readVersion);
fmt::print("{}\n", req.readVersion);
}
}
@ -2210,7 +2261,7 @@ ACTOR Future<Void> doBlobGranuleFileRequest(Reference<BlobWorkerData> bwData, Bl
state KeyRange chunkRange;
state GranuleFiles chunkFiles;
if (metadata->initialSnapshotVersion > req.readVersion) {
if (req.readVersion < metadata->historyVersion) {
TEST(true); // Granule Time Travel Read
// this is a time travel query, find previous granule
if (metadata->historyLoaded.canBeSet()) {
@ -2226,7 +2277,7 @@ ACTOR Future<Void> doBlobGranuleFileRequest(Reference<BlobWorkerData> bwData, Bl
Reference<GranuleHistoryEntry> cur = bwData->granuleHistory.rangeContaining(historySearchKey).value();
// FIXME: use skip pointers here
Version expectedEndVersion = metadata->initialSnapshotVersion;
Version expectedEndVersion = metadata->historyVersion;
if (cur.isValid()) {
ASSERT(cur->endVersion == expectedEndVersion);
}
@ -2269,17 +2320,22 @@ ACTOR Future<Void> doBlobGranuleFileRequest(Reference<BlobWorkerData> bwData, Bl
}
if (chunkFiles.snapshotFiles.empty()) {
// a snapshot file must have been pruned
// a snapshot file must have been purged
throw blob_granule_transaction_too_old();
}
ASSERT(!chunkFiles.deltaFiles.empty());
ASSERT(chunkFiles.deltaFiles.back().version > req.readVersion);
if (chunkFiles.snapshotFiles.front().version > req.readVersion) {
// a snapshot file must have been pruned
// a snapshot file must have been purged
throw blob_granule_transaction_too_old();
}
} else {
if (req.readVersion < metadata->initialSnapshotVersion) {
// a snapshot file must have been pruned
throw blob_granule_transaction_too_old();
}
TEST(true); // Granule Active Read
// this is an active granule query
loop {
@ -2287,7 +2343,7 @@ ACTOR Future<Void> doBlobGranuleFileRequest(Reference<BlobWorkerData> bwData, Bl
throw wrong_shard_server();
}
Future<Void> waitForVersionFuture = waitForVersion(metadata, req.readVersion);
if (waitForVersionFuture.isReady()) {
if (waitForVersionFuture.isReady() && !waitForVersionFuture.isError()) {
// didn't wait, so no need to check rollback stuff
break;
}
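The file-request path above now distinguishes three cases by version. Below is a minimal sketch of that classification with a hypothetical helper; it ignores the additional purge checks performed inside the time-travel branch and is not part of the patch:

enum class GranuleReadPath { TimeTravel, TooOld, Active };

static GranuleReadPath classifyGranuleRead(Version readVersion,
                                           Version historyVersion,
                                           Version initialSnapshotVersion) {
    if (readVersion < historyVersion) {
        // read predates this granule's history entry: walk the granule history to a parent granule
        return GranuleReadPath::TimeTravel;
    }
    if (readVersion < initialSnapshotVersion) {
        // the snapshot covering this version was purged
        return GranuleReadPath::TooOld;
    }
    // otherwise serve from this granule's current files and in-memory deltas
    return GranuleReadPath::Active;
}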

View File

@ -440,6 +440,8 @@ struct ChangeFeedInfo : ReferenceCounted<ChangeFeedInfo> {
Version metadataCreateVersion = invalidVersion;
bool removing = false;
bool destroyed = false;
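// set when a re-fetched feed had been marked removing: the metadata gap from the move-away
// means the feed may have been destroyed in the meantime and must be re-validated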
bool possiblyDestroyed = false;
KeyRangeMap<std::unordered_map<UID, Promise<Void>>> moveTriggers;
@ -472,6 +474,13 @@ struct ChangeFeedInfo : ReferenceCounted<ChangeFeedInfo> {
}
// TODO: may be more cleanup possible here
}
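// Mark this feed as logically gone: removing stops in-progress fetches, destroyed prevents it
// from being re-created by a later metadata fetch, and waiters on moves/new mutations are woken
// so they can observe the destruction.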
void destroy(Version destroyVersion) {
removing = true;
destroyed = true;
moved(range);
newMutations.trigger();
}
};
class ServerWatchMetadata : public ReferenceCounted<ServerWatchMetadata> {
@ -1911,6 +1920,12 @@ ACTOR Future<Void> overlappingChangeFeedsQ(StorageServer* data, OverlappingChang
for (auto& it : rangeIds) {
reply.rangeIds.push_back(OverlappingChangeFeedEntry(
it.first, std::get<0>(it.second), std::get<1>(it.second), std::get<2>(it.second)));
TraceEvent(SevDebug, "OverlappingChangeFeedEntry", data->thisServerID)
.detail("MinVersion", req.minVersion)
.detail("FeedID", it.first)
.detail("Range", std::get<0>(it.second))
.detail("EmptyVersion", std::get<1>(it.second))
.detail("StopVersion", std::get<2>(it.second));
}
// Make sure all of the metadata we are sending won't get rolled back
@ -4702,6 +4717,9 @@ ACTOR Future<Void> tryGetRange(PromiseStream<RangeResult> results, Transaction*
}
}
// global validation that missing refreshed feeds were previously destroyed
static std::unordered_set<Key> allDestroyedChangeFeeds;
// We have to store the version the change feed was stopped at in the SS instead of just the stopped status
// In addition to simplifying stopping logic, it enables communicating stopped status when fetching change feeds
// from other SS correctly
@ -4742,33 +4760,35 @@ ACTOR Future<Void> changeFeedPopQ(StorageServer* self, ChangeFeedPopRequest req)
.detail("RangeID", req.rangeID.printable())
.detail("Version", req.version)
.detail("SSVersion", self->version.get())
.detail("Range", req.range.toString());
.detail("Range", req.range);
if (req.version - 1 > feed->second->emptyVersion) {
feed->second->emptyVersion = req.version - 1;
while (!feed->second->mutations.empty() && feed->second->mutations.front().version < req.version) {
feed->second->mutations.pop_front();
}
Version durableVersion = self->data().getLatestVersion();
auto& mLV = self->addVersionToMutationLog(durableVersion);
self->addMutationToMutationLog(
mLV,
MutationRef(
MutationRef::SetValue,
persistChangeFeedKeys.begin.toString() + feed->second->id.toString(),
changeFeedSSValue(feed->second->range, feed->second->emptyVersion + 1, feed->second->stopVersion)));
if (feed->second->storageVersion != invalidVersion) {
++self->counters.kvSystemClearRanges;
self->addMutationToMutationLog(mLV,
MutationRef(MutationRef::ClearRange,
changeFeedDurableKey(feed->second->id, 0),
changeFeedDurableKey(feed->second->id, req.version)));
if (req.version > feed->second->storageVersion) {
feed->second->storageVersion = invalidVersion;
feed->second->durableVersion = invalidVersion;
if (!feed->second->destroyed) {
Version durableVersion = self->data().getLatestVersion();
auto& mLV = self->addVersionToMutationLog(durableVersion);
self->addMutationToMutationLog(
mLV,
MutationRef(
MutationRef::SetValue,
persistChangeFeedKeys.begin.toString() + feed->second->id.toString(),
changeFeedSSValue(feed->second->range, feed->second->emptyVersion + 1, feed->second->stopVersion)));
if (feed->second->storageVersion != invalidVersion) {
++self->counters.kvSystemClearRanges;
self->addMutationToMutationLog(mLV,
MutationRef(MutationRef::ClearRange,
changeFeedDurableKey(feed->second->id, 0),
changeFeedDurableKey(feed->second->id, req.version)));
if (req.version > feed->second->storageVersion) {
feed->second->storageVersion = invalidVersion;
feed->second->durableVersion = invalidVersion;
}
}
wait(self->durableVersion.whenAtLeast(durableVersion));
}
wait(self->durableVersion.whenAtLeast(durableVersion));
}
req.reply.send(Void());
return Void();
@ -4947,7 +4967,9 @@ ACTOR Future<Version> fetchChangeFeedApplier(StorageServer* data,
.errorUnsuppressed(e)
.detail("RangeID", rangeId.printable())
.detail("Range", range.toString())
.detail("EndVersion", endVersion);
.detail("EndVersion", endVersion)
.detail("Removing", changeFeedInfo->removing)
.detail("Destroyed", changeFeedInfo->destroyed);
throw;
}
}
@ -5044,6 +5066,7 @@ ACTOR Future<Version> fetchChangeFeed(StorageServer* data,
}
}
state bool seenNotRegistered = false;
loop {
try {
Version maxFetched = wait(fetchChangeFeedApplier(data,
@ -5060,19 +5083,110 @@ ACTOR Future<Version> fetchChangeFeed(StorageServer* data,
throw;
}
}
wait(delay(FLOW_KNOBS->PREVENT_FAST_SPIN_DELAY));
// TODO REMOVE
fmt::print("DBG: SS {} Feed {} possibly destroyed {}, {} metadata create, {} desired committed\n",
data->thisServerID.toString().substr(0, 4),
changeFeedInfo->id.printable(),
changeFeedInfo->possiblyDestroyed,
changeFeedInfo->metadataCreateVersion,
data->desiredOldestVersion.get());
// There are two reasons for change_feed_not_registered:
// 1. The feed was just created, but the SS mutation stream is ahead of the GRV that fetchChangeFeedApplier
// uses to read the change feed data from the database. In this case we need to wait and retry
// 2. The feed was destroyed, but we missed a metadata update telling us this. In this case we need to destroy
// the feed
// endVersion >= the metadata create version, so we can safely use it as a proxy
if (beginVersion != 0 || seenNotRegistered || endVersion <= data->desiredOldestVersion.get()) {
// If any of these are true, the feed must be destroyed.
Version cleanupVersion = data->data().getLatestVersion();
TraceEvent(SevDebug, "DestroyingChangeFeedFromFetch", data->thisServerID)
.detail("RangeID", changeFeedInfo->id.printable())
.detail("Range", changeFeedInfo->range.toString())
.detail("Version", cleanupVersion);
if (g_network->isSimulated()) {
ASSERT(allDestroyedChangeFeeds.count(changeFeedInfo->id));
}
Key beginClearKey = changeFeedInfo->id.withPrefix(persistChangeFeedKeys.begin);
auto& mLV = data->addVersionToMutationLog(cleanupVersion);
data->addMutationToMutationLog(
mLV, MutationRef(MutationRef::ClearRange, beginClearKey, keyAfter(beginClearKey)));
++data->counters.kvSystemClearRanges;
data->addMutationToMutationLog(mLV,
MutationRef(MutationRef::ClearRange,
changeFeedDurableKey(changeFeedInfo->id, 0),
changeFeedDurableKey(changeFeedInfo->id, cleanupVersion)));
++data->counters.kvSystemClearRanges;
changeFeedInfo->destroy(cleanupVersion);
data->changeFeedCleanupDurable[changeFeedInfo->id] = cleanupVersion;
for (auto& it : data->changeFeedRemovals) {
it.second.send(changeFeedInfo->id);
}
return invalidVersion;
}
// otherwise assume the feed just hasn't been created on the SS we tried to read it from yet, wait for it to
// definitely be committed and retry
seenNotRegistered = true;
wait(data->desiredOldestVersion.whenAtLeast(endVersion));
}
}
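// A condensed sketch (hypothetical helper, not part of the patch) of the destroy-vs-retry decision
// above: change_feed_not_registered only implies the feed was destroyed once the "feed not yet
// durably created" race has been ruled out.
static bool notRegisteredMeansDestroyed(Version beginVersion,
                                        bool seenNotRegistered,
                                        Version endVersion,
                                        Version desiredOldestVersion) {
    // beginVersion != 0: part of this feed was already fetched, so it must have existed
    // seenNotRegistered: we already waited for endVersion to become durable and retried once
    // endVersion <= desiredOldestVersion: the SS has advanced past the feed's metadata create
    // version, so a still-live feed would have been visible to the fetch
    return beginVersion != 0 || seenNotRegistered || endVersion <= desiredOldestVersion;
}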
ACTOR Future<std::vector<Key>> fetchChangeFeedMetadata(StorageServer* data,
KeyRange keys,
Version fetchVersion,
PromiseStream<Key> removals) {
PromiseStream<Key> removals,
UID fetchKeysID) {
// Wait for current TLog batch to finish to ensure that we're fetching metadata at a version >= the version of the
// ChangeServerKeys mutation. This guarantees we don't miss any metadata between the previous batch's version
// (data->version) and the mutation version.
wait(data->version.whenAtLeast(data->version.get() + 1));
state Version fetchVersion = data->version.get();
TraceEvent(SevDebug, "FetchChangeFeedMetadata", data->thisServerID)
.detail("Range", keys.toString())
.detail("FetchVersion", fetchVersion);
state std::vector<OverlappingChangeFeedEntry> feeds =
wait(data->cx->getOverlappingChangeFeeds(keys, fetchVersion + 1));
.detail("Range", keys)
.detail("FetchVersion", fetchVersion)
.detail("FKID", fetchKeysID);
state std::set<Key> refreshedFeedIds;
state std::set<Key> destroyedFeedIds;
// before fetching feeds from other SS's, refresh any feeds we already have that are being marked as removed
auto ranges = data->keyChangeFeed.intersectingRanges(keys);
for (auto& r : ranges) {
for (auto& cfInfo : r.value()) {
auto feedCleanup = data->changeFeedCleanupDurable.find(cfInfo->id);
if (feedCleanup != data->changeFeedCleanupDurable.end() && cfInfo->removing && !cfInfo->destroyed) {
TEST(true); // re-fetching feed scheduled for deletion! Un-mark it as removing
destroyedFeedIds.insert(cfInfo->id);
cfInfo->removing = false;
// because we now have a gap in the metadata, it's possible this feed was destroyed
cfInfo->possiblyDestroyed = true;
// reset fetch versions because everything previously fetched was cleaned up
cfInfo->fetchVersion = invalidVersion;
cfInfo->durableFetchVersion = NotifiedVersion();
TraceEvent(SevDebug, "ResetChangeFeedInfo", data->thisServerID)
.detail("RangeID", cfInfo->id.printable())
.detail("Range", cfInfo->range)
.detail("FetchVersion", fetchVersion)
.detail("EmptyVersion", cfInfo->emptyVersion)
.detail("StopVersion", cfInfo->stopVersion)
.detail("FKID", fetchKeysID);
}
}
}
state std::vector<OverlappingChangeFeedEntry> feeds = wait(data->cx->getOverlappingChangeFeeds(keys, fetchVersion));
// handle change feeds removed while fetching overlapping
while (removals.getFuture().isReady()) {
Key remove = waitNext(removals.getFuture());
for (int i = 0; i < feeds.size(); i++) {
@ -5081,6 +5195,7 @@ ACTOR Future<std::vector<Key>> fetchChangeFeedMetadata(StorageServer* data,
}
}
}
std::vector<Key> feedIds;
feedIds.reserve(feeds.size());
// create change feed metadata if it does not exist
@ -5093,16 +5208,23 @@ ACTOR Future<std::vector<Key>> fetchChangeFeedMetadata(StorageServer* data,
TraceEvent(SevDebug, "FetchedChangeFeedInfo", data->thisServerID)
.detail("RangeID", cfEntry.rangeId.printable())
.detail("Range", cfEntry.range.toString())
.detail("Range", cfEntry.range)
.detail("FetchVersion", fetchVersion)
.detail("EmptyVersion", cfEntry.emptyVersion)
.detail("StopVersion", cfEntry.stopVersion)
.detail("Existing", existing)
.detail("CleanupPendingVersion", cleanupPending ? cleanupEntry->second : invalidVersion);
.detail("CleanupPendingVersion", cleanupPending ? cleanupEntry->second : invalidVersion)
.detail("FKID", fetchKeysID);
bool addMutationToLog = false;
Reference<ChangeFeedInfo> changeFeedInfo;
auto fid = destroyedFeedIds.find(cfEntry.rangeId);
if (fid != destroyedFeedIds.end()) {
refreshedFeedIds.insert(cfEntry.rangeId);
destroyedFeedIds.erase(fid);
}
if (!existing) {
TEST(cleanupPending); // Fetch change feed which is cleanup pending. This means there was a move away and a
// move back, this will remake the metadata
@ -5123,30 +5245,26 @@ ACTOR Future<std::vector<Key>> fetchChangeFeedMetadata(StorageServer* data,
addMutationToLog = true;
} else {
changeFeedInfo = existingEntry->second;
auto feedCleanup = data->changeFeedCleanupDurable.find(cfEntry.rangeId);
if (changeFeedInfo->destroyed) {
// race where multiple fetches retrieved this same overlapping change feed; one realized the feed was
// missing and marked it removed+destroyed, then this one fetched the same info
continue;
}
// we checked all feeds we already owned in this range at the start to reset them if they were removing, and
// this actor would have been cancelled if a later remove happened
ASSERT(!changeFeedInfo->removing);
if (cfEntry.stopVersion < changeFeedInfo->stopVersion) {
TEST(true); // Change feed updated stop version from fetch metadata
changeFeedInfo->stopVersion = cfEntry.stopVersion;
addMutationToLog = true;
}
if (feedCleanup != data->changeFeedCleanupDurable.end() && changeFeedInfo->removing) {
TEST(true); // re-fetching feed scheduled for deletion! Un-mark it as removing
if (cfEntry.emptyVersion < data->version.get()) {
changeFeedInfo->emptyVersion = cfEntry.emptyVersion;
}
changeFeedInfo->removing = false;
// reset fetch versions because everything previously fetched was cleaned up
changeFeedInfo->fetchVersion = invalidVersion;
changeFeedInfo->durableFetchVersion = NotifiedVersion();
// Since cleanup put a mutation in the log to delete the change feed data, put one in the log to restore
// it
// We may just want to refactor this so updateStorage does explicit deletes based on
// changeFeedCleanupDurable and not use the mutation log at all for the change feed metadata cleanup.
// Then we wouldn't have to reset anything here
// don't update empty version past SS version if SS is behind, it can cause issues
if (cfEntry.emptyVersion < data->version.get() && cfEntry.emptyVersion > changeFeedInfo->emptyVersion) {
TEST(true); // Change feed updated empty version from fetch metadata
changeFeedInfo->emptyVersion = cfEntry.emptyVersion;
addMutationToLog = true;
}
}
@ -5166,6 +5284,84 @@ ACTOR Future<std::vector<Key>> fetchChangeFeedMetadata(StorageServer* data,
}
}
}
TEST(!refreshedFeedIds.empty()); // Feed refreshed between move away and move back
TEST(!destroyedFeedIds.empty()); // Feed destroyed between move away and move back
for (auto& feedId : refreshedFeedIds) {
auto existingEntry = data->uidChangeFeed.find(feedId);
if (existingEntry == data->uidChangeFeed.end() || existingEntry->second->destroyed) {
TEST(true); // feed refreshed
continue;
}
// Since cleanup put a mutation in the log to delete the change feed data, put one in the log to restore
// it
// We may just want to refactor this so updateStorage does explicit deletes based on
// changeFeedCleanupDurable and not use the mutation log at all for the change feed metadata cleanup.
// Then we wouldn't have to reset anything here or above
// Do the mutation log update here instead of above to ensure we only add it back to the mutation log if we're
// sure it wasn't deleted in the metadata gap
Version metadataVersion = data->data().getLatestVersion();
auto& mLV = data->addVersionToMutationLog(metadataVersion);
data->addMutationToMutationLog(
mLV,
MutationRef(MutationRef::SetValue,
persistChangeFeedKeys.begin.toString() + existingEntry->second->id.toString(),
changeFeedSSValue(existingEntry->second->range,
existingEntry->second->emptyVersion + 1,
existingEntry->second->stopVersion)));
TraceEvent(SevDebug, "PersistingResetChangeFeedInfo", data->thisServerID)
.detail("RangeID", existingEntry->second->id.printable())
.detail("Range", existingEntry->second->range)
.detail("FetchVersion", fetchVersion)
.detail("EmptyVersion", existingEntry->second->emptyVersion)
.detail("StopVersion", existingEntry->second->stopVersion)
.detail("FKID", fetchKeysID)
.detail("MetadataVersion", metadataVersion);
}
for (auto& feedId : destroyedFeedIds) {
auto existingEntry = data->uidChangeFeed.find(feedId);
if (existingEntry == data->uidChangeFeed.end() || existingEntry->second->destroyed) {
TEST(true); // feed refreshed but then destroyed elsewhere
continue;
}
// TODO REMOVE print
fmt::print("DBG: SS {} fetching feed {} was refreshed but not present!! assuming destroyed\n",
data->thisServerID.toString().substr(0, 4),
feedId.printable());
Version cleanupVersion = data->data().getLatestVersion();
TraceEvent(SevDebug, "DestroyingChangeFeedFromFetchMetadata", data->thisServerID)
.detail("RangeID", feedId.printable())
.detail("Range", existingEntry->second->range)
.detail("Version", cleanupVersion)
.detail("FKID", fetchKeysID);
if (g_network->isSimulated()) {
ASSERT(allDestroyedChangeFeeds.count(feedId));
}
Key beginClearKey = feedId.withPrefix(persistChangeFeedKeys.begin);
auto& mLV = data->addVersionToMutationLog(cleanupVersion);
data->addMutationToMutationLog(mLV,
MutationRef(MutationRef::ClearRange, beginClearKey, keyAfter(beginClearKey)));
++data->counters.kvSystemClearRanges;
data->addMutationToMutationLog(mLV,
MutationRef(MutationRef::ClearRange,
changeFeedDurableKey(feedId, 0),
changeFeedDurableKey(feedId, cleanupVersion)));
++data->counters.kvSystemClearRanges;
existingEntry->second->destroy(cleanupVersion);
data->changeFeedCleanupDurable[feedId] = cleanupVersion;
for (auto& it : data->changeFeedRemovals) {
it.second.send(feedId);
}
}
return feedIds;
}
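// Condensed summary (comments only, not part of the patch) of the reconciliation above when a
// range moves back to this server:
//   1. every locally-known feed in the range that was marked removing is tentatively treated as
//      destroyed (destroyedFeedIds), un-marked removing, and has its fetch versions reset;
//   2. feeds the database still reports as overlapping move from destroyedFeedIds to
//      refreshedFeedIds;
//   3. refreshedFeedIds get their persistChangeFeedKeys entry re-written, undoing the delete that
//      the earlier cleanup logged;
//   4. anything left in destroyedFeedIds really was destroyed during the metadata gap, so it is
//      cleaned up durably and its removal is signalled to in-flight fetches.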
@ -5221,7 +5417,6 @@ ACTOR Future<std::unordered_map<Key, Version>> dispatchChangeFeeds(StorageServer
}
}
if (done) {
data->changeFeedRemovals.erase(fetchKeysID);
return feedMaxFetched;
}
}
@ -5286,8 +5481,7 @@ ACTOR Future<Void> fetchKeys(StorageServer* data, AddingShard* shard) {
state PromiseStream<Key> removals;
data->changeFeedRemovals[fetchKeysID] = removals;
state Future<std::vector<Key>> fetchCFMetadata =
fetchChangeFeedMetadata(data, keys, data->version.get(), removals);
state Future<std::vector<Key>> fetchCFMetadata = fetchChangeFeedMetadata(data, keys, removals, fetchKeysID);
validate(data);
@ -5632,6 +5826,8 @@ ACTOR Future<Void> fetchKeys(StorageServer* data, AddingShard* shard) {
}
}
data->changeFeedRemovals.erase(fetchKeysID);
shard->phase = AddingShard::Waiting;
// Similar to transferred version, but wait for all feed data and
@ -5944,7 +6140,6 @@ void changeServerKeys(StorageServer* data,
auto feed = data->uidChangeFeed.find(f.first);
if (feed != data->uidChangeFeed.end()) {
feed->second->emptyVersion = version - 1;
feed->second->removing = true;
feed->second->moved(feed->second->range);
feed->second->newMutations.trigger();
@ -6246,7 +6441,10 @@ private:
feed->second->durableVersion = invalidVersion;
}
}
addMutationToLog = true;
if (!feed->second->destroyed) {
// if feed is destroyed, adding an extra mutation here would re-create it if SS restarted
addMutationToLog = true;
}
}
} else if (status == ChangeFeedStatus::CHANGE_FEED_CREATE && createdFeed) {
@ -6282,13 +6480,12 @@ private:
changeFeedDurableKey(feed->second->id, currentVersion)));
++data->counters.kvSystemClearRanges;
feed->second->emptyVersion = currentVersion - 1;
feed->second->stopVersion = currentVersion;
feed->second->removing = true;
feed->second->moved(feed->second->range);
feed->second->newMutations.trigger();
feed->second->destroy(currentVersion);
data->changeFeedCleanupDurable[feed->first] = cleanupVersion;
if (g_network->isSimulated()) {
allDestroyedChangeFeeds.insert(changeFeedId);
}
}
if (status == ChangeFeedStatus::CHANGE_FEED_DESTROY) {

View File

@ -62,8 +62,9 @@ struct BlobGranuleVerifierWorkload : TestWorkload {
int64_t timeTravelTooOld = 0;
int64_t rowsRead = 0;
int64_t bytesRead = 0;
int64_t purges = 0;
std::vector<Future<Void>> clients;
bool enablePruning;
bool enablePurging;
DatabaseConfiguration config;
@ -79,7 +80,7 @@ struct BlobGranuleVerifierWorkload : TestWorkload {
timeTravelLimit = getOption(options, LiteralStringRef("timeTravelLimit"), testDuration);
timeTravelBufferSize = getOption(options, LiteralStringRef("timeTravelBufferSize"), 100000000);
threads = getOption(options, LiteralStringRef("threads"), 1);
enablePruning = getOption(options, LiteralStringRef("enablePruning"), false /*sharedRandomNumber % 2 == 0*/);
enablePurging = getOption(options, LiteralStringRef("enablePurging"), false /*sharedRandomNumber % 2 == 0*/);
ASSERT(threads >= 1);
if (BGV_DEBUG) {
@ -177,60 +178,6 @@ struct BlobGranuleVerifierWorkload : TestWorkload {
OldRead(KeyRange range, Version v, RangeResult oldResult) : range(range), v(v), oldResult(oldResult) {}
};
// utility to prune <range> at pruneVersion=<version> with the <force> flag
ACTOR Future<Void> pruneAtVersion(Database cx, KeyRange range, Version version, bool force) {
state Reference<ReadYourWritesTransaction> tr = makeReference<ReadYourWritesTransaction>(cx);
state Key pruneKey;
loop {
try {
tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
tr->setOption(FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE);
Value pruneValue = blobGranulePruneValueFor(version, range, force);
tr->atomicOp(
addVersionStampAtEnd(blobGranulePruneKeys.begin), pruneValue, MutationRef::SetVersionstampedKey);
tr->set(blobGranulePruneChangeKey, deterministicRandom()->randomUniqueID().toString());
state Future<Standalone<StringRef>> fTrVs = tr->getVersionstamp();
wait(tr->commit());
Standalone<StringRef> vs = wait(fTrVs);
pruneKey = blobGranulePruneKeys.begin.withSuffix(vs);
if (BGV_DEBUG) {
fmt::print("pruneAtVersion for range [{0} - {1}) at version {2} succeeded\n",
range.begin.printable(),
range.end.printable(),
version);
}
break;
} catch (Error& e) {
if (BGV_DEBUG) {
fmt::print("pruneAtVersion for range [{0} - {1}) at version {2} encountered error {3}\n",
range.begin.printable(),
range.end.printable(),
version,
e.name());
}
wait(tr->onError(e));
}
}
tr->reset();
loop {
try {
tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
tr->setOption(FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE);
Optional<Value> pruneVal = wait(tr->get(pruneKey));
if (!pruneVal.present()) {
return Void();
}
state Future<Void> watchFuture = tr->watch(pruneKey);
wait(tr->commit());
wait(watchFuture);
} catch (Error& e) {
wait(tr->onError(e));
}
}
}
ACTOR Future<Void> killBlobWorkers(Database cx, BlobGranuleVerifierWorkload* self) {
state Transaction tr(cx);
state std::set<UID> knownWorkers;
@ -272,12 +219,12 @@ struct BlobGranuleVerifierWorkload : TestWorkload {
}
}
ACTOR Future<Void> verifyGranules(Database cx, BlobGranuleVerifierWorkload* self, bool allowPruning) {
ACTOR Future<Void> verifyGranules(Database cx, BlobGranuleVerifierWorkload* self, bool allowPurging) {
state double last = now();
state double endTime = last + self->testDuration;
state std::map<double, OldRead> timeTravelChecks;
state int64_t timeTravelChecksMemory = 0;
state Version prevPruneVersion = -1;
state Version prevPurgeVersion = -1;
state UID dbgId = debugRandom()->randomUniqueID();
TraceEvent("BlobGranuleVerifierStart");
@ -300,25 +247,27 @@ struct BlobGranuleVerifierWorkload : TestWorkload {
state OldRead oldRead = timeTravelIt->second;
timeTravelChecksMemory -= oldRead.oldResult.expectedSize();
timeTravelIt = timeTravelChecks.erase(timeTravelIt);
if (prevPruneVersion == -1) {
prevPruneVersion = oldRead.v;
if (prevPurgeVersion == -1) {
prevPurgeVersion = oldRead.v;
}
// advance iterator before doing the read, so if it gets an error we don't retry it
try {
state Version newPruneVersion = 0;
state bool doPruning = allowPruning && deterministicRandom()->random01() < 0.5;
if (doPruning) {
Version maxPruneVersion = oldRead.v;
state Version newPurgeVersion = 0;
state bool doPurging = allowPurging && deterministicRandom()->random01() < 0.5;
if (doPurging) {
Version maxPurgeVersion = oldRead.v;
for (auto& it : timeTravelChecks) {
maxPruneVersion = std::min(it.second.v, maxPruneVersion);
maxPurgeVersion = std::min(it.second.v, maxPurgeVersion);
}
if (prevPruneVersion < maxPruneVersion) {
newPruneVersion = deterministicRandom()->randomInt64(prevPruneVersion, maxPruneVersion);
prevPruneVersion = std::max(prevPruneVersion, newPruneVersion);
wait(self->pruneAtVersion(cx, normalKeys, newPruneVersion, false));
if (prevPurgeVersion < maxPurgeVersion) {
newPurgeVersion = deterministicRandom()->randomInt64(prevPurgeVersion, maxPurgeVersion);
prevPurgeVersion = std::max(prevPurgeVersion, newPurgeVersion);
Key purgeKey = wait(cx->purgeBlobGranules(normalKeys, newPurgeVersion, false));
wait(cx->waitPurgeGranulesComplete(purgeKey));
self->purges++;
} else {
doPruning = false;
doPurging = false;
}
}
std::pair<RangeResult, Standalone<VectorRef<BlobGranuleChunkRef>>> reReadResult =
@ -328,12 +277,12 @@ struct BlobGranuleVerifierWorkload : TestWorkload {
}
self->timeTravelReads++;
if (doPruning) {
if (doPurging) {
wait(self->killBlobWorkers(cx, self));
std::pair<RangeResult, Standalone<VectorRef<BlobGranuleChunkRef>>> versionRead =
wait(readFromBlob(cx, self->bstore, oldRead.range, 0, prevPruneVersion));
wait(readFromBlob(cx, self->bstore, oldRead.range, 0, prevPurgeVersion));
try {
Version minSnapshotVersion = newPruneVersion;
Version minSnapshotVersion = newPurgeVersion;
for (auto& it : versionRead.second) {
minSnapshotVersion = std::min(minSnapshotVersion, it.snapshotVersion);
}
@ -395,10 +344,10 @@ struct BlobGranuleVerifierWorkload : TestWorkload {
Future<Void> start(Database const& cx) override {
clients.reserve(threads + 1);
clients.push_back(timeout(findGranules(cx, this), testDuration, Void()));
if (enablePruning && clientId == 0) {
if (enablePurging && clientId == 0) {
clients.push_back(
timeout(reportErrors(verifyGranules(cx, this, true), "BlobGranuleVerifier"), testDuration, Void()));
} else if (!enablePruning) {
} else if (!enablePurging) {
for (int i = 0; i < threads; i++) {
clients.push_back(timeout(
reportErrors(verifyGranules(cx, this, false), "BlobGranuleVerifier"), testDuration, Void()));
@ -518,6 +467,7 @@ struct BlobGranuleVerifierWorkload : TestWorkload {
fmt::print(" {} time travel reads\n", self->timeTravelReads);
fmt::print(" {} rows\n", self->rowsRead);
fmt::print(" {} bytes\n", self->bytesRead);
fmt::print(" {} purges\n", self->purges);
// FIXME: add above as details to trace event
TraceEvent("BlobGranuleVerifierChecked").detail("Result", result);

View File

@ -35,6 +35,7 @@
#define TRACE_DEFAULT_ROLL_SIZE (10 << 20)
#define TRACE_DEFAULT_MAX_LOGS_SIZE (10 * TRACE_DEFAULT_ROLL_SIZE)
#define PRINTABLE_COMPRESS_NULLS 0
inline int fastrand() {
static int g_seed = 0;
@ -343,20 +344,37 @@ struct TraceableStringImpl : std::true_type {
}
std::string result;
result.reserve(size - nonPrintables + (nonPrintables * 4) + numBackslashes);
int numNull = 0;
for (auto iter = TraceableString<T>::begin(value); !TraceableString<T>::atEnd(value, iter); ++iter) {
if (*iter == '\\') {
if (numNull > 0) {
result += format("[%d]", numNull);
numNull = 0;
}
result.push_back('\\');
result.push_back('\\');
} else if (isPrintable(*iter)) {
if (numNull > 0) {
result += format("[%d]", numNull);
numNull = 0;
}
result.push_back(*iter);
} else {
const uint8_t byte = *iter;
result.push_back('\\');
result.push_back('x');
result.push_back(base16Char(byte / 16));
result.push_back(base16Char(byte));
if (PRINTABLE_COMPRESS_NULLS && byte == 0) {
numNull++;
} else {
result.push_back('\\');
result.push_back('x');
result.push_back(base16Char(byte / 16));
result.push_back(base16Char(byte));
}
}
}
if (numNull > 0) {
result += format("[%d]", numNull);
numNull = 0;
}
return result;
}
};
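For illustration, a minimal standalone sketch of the same null-run compression (the real logic above lives inside TraceableStringImpl and only applies when PRINTABLE_COMPRESS_NULLS is set); with it, a value such as "a\x00\x00\x00b" renders as "a[3]b". The helper name is hypothetical and it assumes printable non-null input, whereas the real code also escapes other non-printable bytes:

#include <cstdio>
#include <string>

std::string compressNullRuns(const std::string& value) {
    std::string result;
    int numNull = 0;
    auto flush = [&] {
        if (numNull > 0) {
            char buf[16];
            std::snprintf(buf, sizeof(buf), "[%d]", numNull); // emit the run as a count
            result += buf;
            numNull = 0;
        }
    };
    for (unsigned char c : value) {
        if (c == 0) {
            numNull++; // count consecutive null bytes instead of escaping each one
        } else {
            flush();
            result.push_back(c); // assume printable here for the sketch
        }
    }
    flush();
    return result;
}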