Added test check to verify change feeds get cleaned up at the end of blob granule tests (#8322)
* implemented check, but it doesn't always work * cleanup
This commit is contained in:
parent
9799329b99
commit
8592f7e253
|
@ -19,6 +19,7 @@
|
|||
*/
|
||||
|
||||
#include "fdbserver/BlobGranuleValidation.actor.h"
|
||||
#include "fdbserver/Knobs.h"
|
||||
#include "flow/actorcompiler.h" // has to be last include
|
||||
|
||||
ACTOR Future<std::pair<RangeResult, Version>> readFromFDB(Database cx, KeyRange range) {
|
||||
|
@ -332,3 +333,97 @@ ACTOR Future<Void> validateGranuleSummaries(Database cx,
|
|||
throw e;
|
||||
}
|
||||
}
|
||||
|
||||
struct feed_cmp_f {
|
||||
bool operator()(const std::pair<Key, KeyRange>& lhs, const std::pair<Key, KeyRange>& rhs) const {
|
||||
if (lhs.second.begin == rhs.second.begin) {
|
||||
return lhs.second.end < rhs.second.end;
|
||||
}
|
||||
return lhs.second.begin < rhs.second.begin;
|
||||
}
|
||||
};
|
||||
|
||||
ACTOR Future<std::vector<std::pair<Key, KeyRange>>> getActiveFeeds(Transaction* tr) {
|
||||
RangeResult feedResult = wait(tr->getRange(changeFeedKeys, 10000));
|
||||
ASSERT(!feedResult.more);
|
||||
std::vector<std::pair<Key, KeyRange>> results;
|
||||
for (auto& it : feedResult) {
|
||||
Key feedKey = it.key.removePrefix(changeFeedPrefix);
|
||||
KeyRange feedRange;
|
||||
Version version;
|
||||
ChangeFeedStatus status;
|
||||
|
||||
std::tie(feedRange, version, status) = decodeChangeFeedValue(it.value);
|
||||
results.push_back({ feedKey, feedRange });
|
||||
}
|
||||
|
||||
std::sort(results.begin(), results.end(), feed_cmp_f());
|
||||
|
||||
return results;
|
||||
}
|
||||
|
||||
// TODO: add debug parameter
|
||||
// FIXME: this check currently assumes blob granules are the only users of change feeds, and will fail if that is not
|
||||
// the case
|
||||
ACTOR Future<Void> checkFeedCleanup(Database cx, bool debug) {
|
||||
if (SERVER_KNOBS->BLOB_WORKER_FORCE_FLUSH_CLEANUP_DELAY < 0) {
|
||||
// no guarantee of feed cleanup, return
|
||||
return Void();
|
||||
}
|
||||
// big extra timeout just because simulation can take a while to quiesce
|
||||
state double checkTimeoutOnceStable = 300.0 + 2 * SERVER_KNOBS->BLOB_WORKER_FORCE_FLUSH_CLEANUP_DELAY;
|
||||
state Optional<double> stableTimestamp;
|
||||
state Standalone<VectorRef<KeyRangeRef>> lastGranules;
|
||||
|
||||
state Transaction tr(cx);
|
||||
loop {
|
||||
try {
|
||||
// get set of current granules. if different than last set of granules
|
||||
state Standalone<VectorRef<KeyRangeRef>> granules = wait(tr.getBlobGranuleRanges(normalKeys, 10000));
|
||||
state std::vector<std::pair<Key, KeyRange>> activeFeeds = wait(getActiveFeeds(&tr));
|
||||
|
||||
// TODO REMOVE
|
||||
if (debug) {
|
||||
fmt::print("{0} granules and {1} active feeds found\n", granules.size(), activeFeeds.size());
|
||||
}
|
||||
/*fmt::print("Granules:\n");
|
||||
for (auto& it : granules) {
|
||||
fmt::print(" [{0} - {1})\n", it.begin.printable(), it.end.printable());
|
||||
}*/
|
||||
bool allPresent = granules.size() == activeFeeds.size();
|
||||
for (int i = 0; allPresent && i < granules.size(); i++) {
|
||||
if (granules[i] != activeFeeds[i].second) {
|
||||
if (debug) {
|
||||
fmt::print("Feed {0} for [{1} - {2}) still exists despite no granule!\n",
|
||||
activeFeeds[i].first.printable(),
|
||||
activeFeeds[i].second.begin.printable(),
|
||||
activeFeeds[i].second.end.printable());
|
||||
}
|
||||
allPresent = false;
|
||||
break;
|
||||
}
|
||||
}
|
||||
if (allPresent) {
|
||||
if (debug) {
|
||||
fmt::print("Feed Cleanup Check Complete\n");
|
||||
}
|
||||
return Void();
|
||||
}
|
||||
if (granules != lastGranules) {
|
||||
stableTimestamp.reset();
|
||||
} else if (!stableTimestamp.present()) {
|
||||
stableTimestamp = now();
|
||||
}
|
||||
lastGranules = granules;
|
||||
|
||||
// ensure this converges within a time window of granules becoming stable
|
||||
if (stableTimestamp.present()) {
|
||||
ASSERT(now() - stableTimestamp.get() <= checkTimeoutOnceStable);
|
||||
}
|
||||
|
||||
wait(delay(2.0));
|
||||
} catch (Error& e) {
|
||||
wait(tr.onError(e));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -60,6 +60,8 @@ ACTOR Future<Void> validateGranuleSummaries(Database cx,
|
|||
Optional<TenantName> tenantName,
|
||||
Promise<Void> testComplete);
|
||||
|
||||
ACTOR Future<Void> checkFeedCleanup(Database cx, bool debug);
|
||||
|
||||
#include "flow/unactorcompiler.h"
|
||||
|
||||
#endif
|
||||
|
|
|
@ -1007,6 +1007,13 @@ struct BlobGranuleCorrectnessWorkload : TestWorkload {
|
|||
ACTOR Future<bool> _check(Database cx, BlobGranuleCorrectnessWorkload* self) {
|
||||
// check error counts, and do an availability check at the end
|
||||
state std::vector<Future<bool>> results;
|
||||
state Future<Void> checkFeedCleanupFuture;
|
||||
if (self->clientId == 0) {
|
||||
checkFeedCleanupFuture = checkFeedCleanup(cx, BGW_DEBUG);
|
||||
} else {
|
||||
checkFeedCleanupFuture = Future<Void>(Void());
|
||||
}
|
||||
|
||||
for (auto& it : self->directories) {
|
||||
results.push_back(self->checkDirectory(cx, self, it));
|
||||
}
|
||||
|
@ -1015,6 +1022,7 @@ struct BlobGranuleCorrectnessWorkload : TestWorkload {
|
|||
bool dirSuccess = wait(f);
|
||||
allSuccessful &= dirSuccess;
|
||||
}
|
||||
wait(checkFeedCleanupFuture);
|
||||
return allSuccessful;
|
||||
}
|
||||
|
||||
|
|
|
@ -1029,6 +1029,13 @@ struct BlobGranuleVerifierWorkload : TestWorkload {
|
|||
wait(self->setUpBlobRange(cx));
|
||||
}
|
||||
|
||||
state Future<Void> checkFeedCleanupFuture;
|
||||
if (self->clientId == 0) {
|
||||
checkFeedCleanupFuture = checkFeedCleanup(cx, BGV_DEBUG);
|
||||
} else {
|
||||
checkFeedCleanupFuture = Future<Void>(Void());
|
||||
}
|
||||
|
||||
state Version readVersion = wait(self->doGrv(&tr));
|
||||
state Version startReadVersion = readVersion;
|
||||
state int checks = 0;
|
||||
|
@ -1125,6 +1132,7 @@ struct BlobGranuleVerifierWorkload : TestWorkload {
|
|||
}
|
||||
|
||||
state bool dataPassed = wait(self->checkAllData(cx, self));
|
||||
wait(checkFeedCleanupFuture);
|
||||
|
||||
state bool result =
|
||||
availabilityPassed && dataPassed && self->mismatches == 0 && (checks > 0) && (self->timeTravelTooOld == 0);
|
||||
|
|
Loading…
Reference in New Issue