Added popping to change feed operations workload
This commit is contained in:
parent
142ed7713e
commit
83ae15012d
|
@ -2197,12 +2197,10 @@ static std::deque<Standalone<MutationsAndVersionRef>>::const_iterator searchChan
|
|||
break;
|
||||
}
|
||||
lastEnd = currentEnd + 1;
|
||||
jump = std::min((int)(currentEnd - mutations.begin()), jump);
|
||||
currentEnd -= jump;
|
||||
jump <<= 1;
|
||||
}
|
||||
if (currentEnd < mutations.begin()) {
|
||||
currentEnd = mutations.begin();
|
||||
}
|
||||
auto ret = std::lower_bound(currentEnd, lastEnd, searchKey, MutationsAndVersionRef::OrderByVersion());
|
||||
// TODO REMOVE: for validation
|
||||
if (ret != mutations.end()) {
|
||||
|
|
|
@ -39,7 +39,6 @@
|
|||
#define DEBUG_KEY ""_sr
|
||||
|
||||
#define DEBUG_CF(feedKey) (feedKey.printable() == DEBUG_KEY)
|
||||
// #define DEBUG_CF(feedKey) true
|
||||
|
||||
// TODO refactor this to deterministicRandom() from here and BlobGranuleFiles
|
||||
// picks a number between 2^minExp and 2^maxExp, but uniformly distributed over exponential buckets 2^n an 2^n+1
|
||||
|
@ -56,6 +55,17 @@ int randomExp(int minExp, int maxExp) {
|
|||
return deterministicRandom()->randomInt(val, val * 2);
|
||||
}
|
||||
|
||||
ACTOR Future<Void> doPop(Database cx, Key key, Key feedID, Version version, Version* doneOut) {
|
||||
wait(cx->popChangeFeedMutations(feedID, version));
|
||||
if (*doneOut < version) {
|
||||
*doneOut = version;
|
||||
}
|
||||
if (DEBUG_CF(key)) {
|
||||
fmt::print("DBG) {0} Popped through {1}\n", key.printable(), version);
|
||||
}
|
||||
return Void();
|
||||
}
|
||||
|
||||
struct FeedTestData : ReferenceCounted<FeedTestData>, NonCopyable {
|
||||
Key key;
|
||||
KeyRange keyRange;
|
||||
|
@ -64,21 +74,35 @@ struct FeedTestData : ReferenceCounted<FeedTestData>, NonCopyable {
|
|||
Future<Void> liveReader;
|
||||
bool lastCleared = false;
|
||||
|
||||
Version popVersion;
|
||||
std::vector<Future<Void>> pops;
|
||||
Version poppingVersion;
|
||||
Version poppedVersion;
|
||||
Optional<Version> stopVersion;
|
||||
bool destroying;
|
||||
bool destroyed;
|
||||
bool complete;
|
||||
|
||||
int popWindow;
|
||||
int popDelayWindow;
|
||||
|
||||
std::deque<std::pair<Version, Optional<Value>>> writesByVersion;
|
||||
|
||||
// these were all committed
|
||||
std::deque<std::pair<Version, Optional<Value>>> pendingCheck;
|
||||
NotifiedVersion checkVersion;
|
||||
|
||||
FeedTestData(Key key)
|
||||
FeedTestData(Key key, bool doPops)
|
||||
: key(key), keyRange(KeyRangeRef(key, keyAfter(key))), feedID(key.withPrefix(LiteralStringRef("CF"))), nextVal(0),
|
||||
lastCleared(false), popVersion(0), destroying(false), destroyed(false), complete(false), checkVersion(0) {}
|
||||
lastCleared(false), poppingVersion(0), poppedVersion(0), destroying(false), destroyed(false), complete(false),
|
||||
checkVersion(0) {
|
||||
if (doPops) {
|
||||
popWindow = randomExp(1, 8);
|
||||
popDelayWindow = deterministicRandom()->randomInt(0, 2) * randomExp(1, 4);
|
||||
} else {
|
||||
popWindow = -1;
|
||||
popDelayWindow = -1;
|
||||
}
|
||||
}
|
||||
|
||||
Value nextValue() {
|
||||
std::string v = std::to_string(nextVal);
|
||||
|
@ -99,6 +123,21 @@ struct FeedTestData : ReferenceCounted<FeedTestData>, NonCopyable {
|
|||
complete = true;
|
||||
checkVersion.set(checkVersion.get() + 1);
|
||||
}
|
||||
|
||||
void pop(Database cx, Version v) {
|
||||
if (DEBUG_CF(key)) {
|
||||
fmt::print("DBG) {0} Popping through {1}\n", key.printable(), v);
|
||||
}
|
||||
ASSERT(poppingVersion < v);
|
||||
poppingVersion = v;
|
||||
while (!writesByVersion.empty() && v > writesByVersion.front().first) {
|
||||
writesByVersion.pop_front();
|
||||
}
|
||||
while (!pendingCheck.empty() && v > pendingCheck.front().first) {
|
||||
pendingCheck.pop_front();
|
||||
}
|
||||
pops.push_back(doPop(cx, key, feedID, v, &poppedVersion));
|
||||
}
|
||||
};
|
||||
|
||||
static void rollbackFeed(Key key,
|
||||
|
@ -143,12 +182,6 @@ static void checkNextResult(Key key,
|
|||
// these asserts are correctness of change feed invariants. TODO add trace events for when they fail
|
||||
// to debug
|
||||
|
||||
if (checkData.front().second.present()) {
|
||||
ASSERT(buffered.front().mutations[0].param2 == checkData.front().second.get());
|
||||
} else {
|
||||
ASSERT(buffered.front().mutations[0].param2 == keyAfter(key));
|
||||
}
|
||||
|
||||
// Handle case where txn retried and wrote same value twice. checkData's version is the committed one, so the same
|
||||
// update may appear at an earlier version. This is fine, as long as it then actually appears at the committed
|
||||
// version
|
||||
|
@ -167,15 +200,23 @@ static void checkNextResult(Key key,
|
|||
}
|
||||
ASSERT(checkData.front().first >= buffered.front().version);
|
||||
|
||||
if (checkData.front().second.present()) {
|
||||
ASSERT(buffered.front().mutations[0].type == MutationRef::SetValue);
|
||||
ASSERT(buffered.front().mutations[0].param2 == checkData.front().second.get());
|
||||
} else {
|
||||
ASSERT(buffered.front().mutations[0].type == MutationRef::ClearRange);
|
||||
ASSERT(buffered.front().mutations[0].param2 == keyAfter(key));
|
||||
}
|
||||
|
||||
if (checkData.front().first == buffered.front().version) {
|
||||
checkData.pop_front();
|
||||
}
|
||||
buffered.pop_front();
|
||||
}
|
||||
|
||||
// TODO launch pops occasionally
|
||||
ACTOR Future<Void> liveReader(Database cx, Reference<FeedTestData> data, Version begin) {
|
||||
state Version lastCheckVersion = 0;
|
||||
state Version nextCheckVersion = 0;
|
||||
state std::deque<Standalone<MutationsAndVersionRef>> buffered;
|
||||
state Reference<ChangeFeedData> results = makeReference<ChangeFeedData>();
|
||||
state Future<Void> stream =
|
||||
|
@ -186,6 +227,7 @@ ACTOR Future<Void> liveReader(Database cx, Reference<FeedTestData> data, Version
|
|||
// done
|
||||
return Void();
|
||||
}
|
||||
nextCheckVersion = data->pendingCheck.empty() ? invalidVersion : data->pendingCheck.front().first;
|
||||
choose {
|
||||
when(Standalone<VectorRef<MutationsAndVersionRef>> res = waitNext(results->mutations.getFuture())) {
|
||||
for (auto& it : res) {
|
||||
|
@ -220,21 +262,49 @@ ACTOR Future<Void> liveReader(Database cx, Reference<FeedTestData> data, Version
|
|||
when(wait(data->checkVersion.whenAtLeast(lastCheckVersion + 1))) {
|
||||
// wake loop and start new whenAtLeast whenever checkVersion is set
|
||||
lastCheckVersion = data->checkVersion.get();
|
||||
if (DEBUG_CF(data->key)) {
|
||||
fmt::print("DBG) {0} Live can check through {1}\n", data->key.printable(), lastCheckVersion);
|
||||
}
|
||||
}
|
||||
when(wait(data->pendingCheck.empty() ? Never()
|
||||
: results->whenAtLeast(data->pendingCheck.front().first))) {
|
||||
Version v = buffered.front().version;
|
||||
if (DEBUG_CF(data->key)) {
|
||||
fmt::print("DBG) {0} Live checking through {1}\n",
|
||||
data->key.printable(),
|
||||
data->pendingCheck.front().first);
|
||||
|
||||
if (data->pendingCheck.empty() || data->pendingCheck.front().first > nextCheckVersion) {
|
||||
// pendingCheck wasn't empty before whenAtLeast, and nextCheckVersion = the front version, so if
|
||||
// either of these are true, the data was popped concurrently and we can move on to checking the
|
||||
// next value
|
||||
TEST(true); // popped while waiting for whenAtLeast to check next value
|
||||
continue;
|
||||
}
|
||||
checkNextResult(data->key, buffered, data->pendingCheck);
|
||||
if (DEBUG_CF(data->key)) {
|
||||
fmt::print("DBG) {0} Live Checked through {1}\n", data->key.printable(), v);
|
||||
while (!buffered.empty() && buffered.front().version < data->poppingVersion) {
|
||||
TEST(true); // live reader ignoring data that is being popped
|
||||
buffered.pop_front();
|
||||
}
|
||||
if (buffered.empty()) {
|
||||
if (data->poppingVersion < data->pendingCheck.front().first) {
|
||||
fmt::print("DBG) {0} Buffered empty after ready for check, and data not popped! popped "
|
||||
"{1}, popping {2}, check {3}\n",
|
||||
data->key.printable(),
|
||||
data->poppedVersion,
|
||||
data->poppingVersion,
|
||||
data->pendingCheck.front().first);
|
||||
}
|
||||
ASSERT(data->poppingVersion >= data->pendingCheck.front().first);
|
||||
data->pendingCheck.pop_front();
|
||||
} else {
|
||||
Version v = buffered.front().version;
|
||||
if (DEBUG_CF(data->key)) {
|
||||
fmt::print("DBG) {0} Live checking through {1}\n",
|
||||
data->key.printable(),
|
||||
data->pendingCheck.front().first);
|
||||
}
|
||||
checkNextResult(data->key, buffered, data->pendingCheck);
|
||||
if (DEBUG_CF(data->key)) {
|
||||
fmt::print("DBG) {0} Live Checked through {1}\n", data->key.printable(), v);
|
||||
}
|
||||
|
||||
if (data->popDelayWindow >= 0 && data->popWindow >= 0 &&
|
||||
data->writesByVersion.size() == data->popWindow + data->popDelayWindow) {
|
||||
data->pop(cx, data->writesByVersion[data->popWindow - 1].first + 1);
|
||||
ASSERT(data->writesByVersion.size() == data->popDelayWindow);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -251,6 +321,10 @@ ACTOR Future<Void> historicReader(Database cx, Reference<FeedTestData> data, Ver
|
|||
state Reference<ChangeFeedData> results = makeReference<ChangeFeedData>();
|
||||
state Future<Void> stream = cx->getChangeFeedStream(results, data->feedID, begin, end, data->keyRange);
|
||||
|
||||
if (DEBUG_CF(data->key)) {
|
||||
fmt::print("DBG) {0} Starting historical read {1} - {2}\n", data->key.printable(), begin, end);
|
||||
}
|
||||
|
||||
// TODO could cpu optimize this
|
||||
for (auto& it : data->writesByVersion) {
|
||||
if (it.first >= end) {
|
||||
|
@ -286,9 +360,18 @@ ACTOR Future<Void> historicReader(Database cx, Reference<FeedTestData> data, Ver
|
|||
}
|
||||
}
|
||||
|
||||
while (!buffered.empty() && buffered.front().version < data->poppingVersion) {
|
||||
// ignore data
|
||||
buffered.pop_front();
|
||||
}
|
||||
while (!checkData.empty() && checkData.front().first < data->poppingVersion) {
|
||||
checkData.pop_front();
|
||||
}
|
||||
|
||||
while (!checkData.empty() && !buffered.empty()) {
|
||||
checkNextResult(data->key, buffered, checkData);
|
||||
}
|
||||
// Change feed missing data it should have
|
||||
ASSERT(checkData.empty());
|
||||
if (!buffered.empty()) {
|
||||
fmt::print("ERROR {0}: buffered still has {1} for historic read {2} - {3}!\n",
|
||||
|
@ -303,6 +386,7 @@ ACTOR Future<Void> historicReader(Database cx, Reference<FeedTestData> data, Ver
|
|||
buffered[i].mutations.size());
|
||||
}
|
||||
}
|
||||
// Change feed read extra data it shouldn't have
|
||||
ASSERT(buffered.empty());
|
||||
|
||||
return Void();
|
||||
|
@ -313,7 +397,8 @@ enum Op {
|
|||
READ = 1,
|
||||
UPDATE_CLEAR = 2,
|
||||
STOP = 3,
|
||||
OP_COUNT = 4 /* keep this last */
|
||||
POP = 4,
|
||||
OP_COUNT = 5 /* keep this last */
|
||||
};
|
||||
|
||||
struct ChangeFeedOperationsWorkload : TestWorkload {
|
||||
|
@ -324,6 +409,7 @@ struct ChangeFeedOperationsWorkload : TestWorkload {
|
|||
bool clientsDisjointKeyspace;
|
||||
bool clearKeyWhenDestroy;
|
||||
double clearFrequency;
|
||||
int popMode;
|
||||
|
||||
int opWeights[Op::OP_COUNT];
|
||||
int totalOpWeight;
|
||||
|
@ -348,6 +434,8 @@ struct ChangeFeedOperationsWorkload : TestWorkload {
|
|||
rand /= 2;
|
||||
bool noCreateDelete = rand % 10 == 0;
|
||||
rand /= 10;
|
||||
popMode = rand % 3; // 0=none, 1=read-driven, 2=op-driven
|
||||
rand /= 3;
|
||||
|
||||
ASSERT(clientId >= 0);
|
||||
ASSERT(clientId < clientCount);
|
||||
|
@ -367,6 +455,9 @@ struct ChangeFeedOperationsWorkload : TestWorkload {
|
|||
if (noCreateDelete) {
|
||||
opWeights[Op::CREATE_DELETE] = 0;
|
||||
}
|
||||
if (popMode != 2) {
|
||||
opWeights[Op::POP] = 0;
|
||||
}
|
||||
|
||||
std::string weightString = "|";
|
||||
totalOpWeight = 0;
|
||||
|
@ -423,7 +514,7 @@ struct ChangeFeedOperationsWorkload : TestWorkload {
|
|||
ACTOR Future<Void> createNewFeed(Database cx, ChangeFeedOperationsWorkload* self) {
|
||||
state Transaction tr(cx);
|
||||
state Key key = self->unusedNewRandomKey();
|
||||
state Reference<FeedTestData> feedData = makeReference<FeedTestData>(key);
|
||||
state Reference<FeedTestData> feedData = makeReference<FeedTestData>(key, self->popMode == 1);
|
||||
state Value initialValue = feedData->nextValue();
|
||||
|
||||
if (DEBUG_CF(key)) {
|
||||
|
@ -475,19 +566,55 @@ struct ChangeFeedOperationsWorkload : TestWorkload {
|
|||
return _check(cx, this);
|
||||
}
|
||||
|
||||
ACTOR Future<Void> checkFeed(Database cx, ChangeFeedOperationsWorkload* self, Reference<FeedTestData> feedData) {
|
||||
state int popIdx;
|
||||
feedData->testComplete();
|
||||
|
||||
if (DEBUG_CF(feedData->key)) {
|
||||
fmt::print("Final check {0} waiting on live reader\n", feedData->key.printable());
|
||||
}
|
||||
// wait on live reader and pops to make sure they complete without error
|
||||
wait(feedData->liveReader);
|
||||
if (DEBUG_CF(feedData->key)) {
|
||||
fmt::print("Final check {0} waiting on {1} pops\n", feedData->key.printable(), feedData->pops.size());
|
||||
}
|
||||
for (popIdx = 0; popIdx < feedData->pops.size(); popIdx++) {
|
||||
wait(feedData->pops[popIdx]);
|
||||
}
|
||||
|
||||
// do final check, read everything not popped
|
||||
if (DEBUG_CF(feedData->key)) {
|
||||
fmt::print("Final check {0} waiting on data check\n", feedData->key.printable(), feedData->pops.size());
|
||||
}
|
||||
wait(self->doRead(cx, feedData, feedData->writesByVersion.size()));
|
||||
|
||||
// ensure reading [0, poppedVersion) returns no results
|
||||
if (feedData->poppedVersion > 0) {
|
||||
if (DEBUG_CF(feedData->key)) {
|
||||
fmt::print(
|
||||
"Final check {0} waiting on read popped check\n", feedData->key.printable(), feedData->pops.size());
|
||||
}
|
||||
// TODO need to make this not ignore popped, otherwise this is a no-op
|
||||
wait(historicReader(cx, feedData, 0, feedData->poppedVersion));
|
||||
}
|
||||
|
||||
return Void();
|
||||
}
|
||||
|
||||
ACTOR Future<bool> _check(Database cx, ChangeFeedOperationsWorkload* self) {
|
||||
TraceEvent("ChangeFeedOperationsCheck").detail("FeedCount", self->data.size()).log();
|
||||
state int i;
|
||||
for (i = 0; i < self->data.size(); i++) {
|
||||
fmt::print("Checking {0} feeds\n", self->data.size()); // TODO REMOVE
|
||||
state std::vector<Future<Void>> feedChecks;
|
||||
for (int i = 0; i < self->data.size(); i++) {
|
||||
if (self->data[i]->destroying) {
|
||||
continue;
|
||||
}
|
||||
self->data[i]->testComplete();
|
||||
wait(self->data[i]->liveReader);
|
||||
// TODO: wait on active reader to be caught up and make sure they completed
|
||||
// do final check of non-destroyed feeds by reading everything
|
||||
wait(self->doRead(cx, self, i, self->data[i]->writesByVersion.size()));
|
||||
if (DEBUG_CF(self->data[i]->key)) {
|
||||
fmt::print("Final check {0}\n", self->data[i]->key.printable());
|
||||
}
|
||||
feedChecks.push_back(self->checkFeed(cx, self, self->data[i]));
|
||||
}
|
||||
wait(waitForAll(feedChecks));
|
||||
// TODO maybe check that all destroyed feeds are actually destroyed?
|
||||
TraceEvent("ChangeFeedOperationsCheckComplete");
|
||||
return true;
|
||||
|
@ -495,8 +622,7 @@ struct ChangeFeedOperationsWorkload : TestWorkload {
|
|||
|
||||
void getMetrics(std::vector<PerfMetric>& m) override {}
|
||||
|
||||
ACTOR Future<Void> stopFeed(Database cx, ChangeFeedOperationsWorkload* self, int feedIdx) {
|
||||
state Reference<FeedTestData> feedData = self->data[feedIdx];
|
||||
ACTOR Future<Void> stopFeed(Database cx, Reference<FeedTestData> feedData) {
|
||||
state Transaction tr(cx);
|
||||
if (DEBUG_CF(feedData->key)) {
|
||||
fmt::print("DBG) {0} Stopping\n", feedData->key.printable());
|
||||
|
@ -520,6 +646,12 @@ struct ChangeFeedOperationsWorkload : TestWorkload {
|
|||
}
|
||||
}
|
||||
|
||||
void popFeed(Database cx, Reference<FeedTestData> feedData) {
|
||||
if (!feedData->writesByVersion.empty()) {
|
||||
feedData->pop(cx, feedData->writesByVersion.front().first + 1);
|
||||
}
|
||||
}
|
||||
|
||||
ACTOR Future<Void> destroyFeed(Database cx, ChangeFeedOperationsWorkload* self, int feedIdx) {
|
||||
state Reference<FeedTestData> feedData = self->data[feedIdx];
|
||||
state Transaction tr(cx);
|
||||
|
@ -538,6 +670,7 @@ struct ChangeFeedOperationsWorkload : TestWorkload {
|
|||
|
||||
feedData->destroyed = true;
|
||||
// remove feed from list
|
||||
ASSERT(self->data[feedIdx]->key == feedData->key);
|
||||
swapAndPop(&self->data, feedIdx);
|
||||
if (DEBUG_CF(feedData->key)) {
|
||||
fmt::print("DBG) {0} Destroyed @ {1}\n", feedData->key.printable(), tr.getCommittedVersion());
|
||||
|
@ -549,8 +682,10 @@ struct ChangeFeedOperationsWorkload : TestWorkload {
|
|||
}
|
||||
}
|
||||
|
||||
ACTOR Future<Void> doRead(Database cx, ChangeFeedOperationsWorkload* self, int feedIdx, int targetReadWidth) {
|
||||
state Reference<FeedTestData> feedData = self->data[feedIdx];
|
||||
ACTOR Future<Void> doRead(Database cx, Reference<FeedTestData> feedData, int targetReadWidth) {
|
||||
if (feedData->writesByVersion.empty()) {
|
||||
return Void();
|
||||
}
|
||||
Version beginVersion;
|
||||
Version endVersion;
|
||||
if (targetReadWidth >= feedData->writesByVersion.size()) {
|
||||
|
@ -582,9 +717,10 @@ struct ChangeFeedOperationsWorkload : TestWorkload {
|
|||
return Void();
|
||||
}
|
||||
|
||||
ACTOR Future<Void> doUpdateClear(Database cx, ChangeFeedOperationsWorkload* self, int feedIdx) {
|
||||
ACTOR Future<Void> doUpdateClear(Database cx,
|
||||
ChangeFeedOperationsWorkload* self,
|
||||
Reference<FeedTestData> feedData) {
|
||||
state Transaction tr(cx);
|
||||
state Reference<FeedTestData> feedData = self->data[feedIdx];
|
||||
state Optional<Value> updateValue;
|
||||
|
||||
// if value is already not set, don't do a clear, otherwise pick either
|
||||
|
@ -645,11 +781,13 @@ struct ChangeFeedOperationsWorkload : TestWorkload {
|
|||
}
|
||||
} else if (op == Op::READ) {
|
||||
// relatively small random read
|
||||
wait(self->doRead(cx, self, feedIdx, randomExp(2, 8)));
|
||||
wait(self->doRead(cx, self->data[feedIdx], randomExp(2, 8)));
|
||||
} else if (op == Op::UPDATE_CLEAR) {
|
||||
wait(self->doUpdateClear(cx, self, feedIdx));
|
||||
wait(self->doUpdateClear(cx, self, self->data[feedIdx]));
|
||||
} else if (op == Op::STOP) {
|
||||
wait(self->stopFeed(cx, self, feedIdx));
|
||||
wait(self->stopFeed(cx, self->data[feedIdx]));
|
||||
} else if (op == Op::POP) {
|
||||
self->popFeed(cx, self->data[feedIdx]);
|
||||
} else {
|
||||
fmt::print("Invalid Op {0}\n", op);
|
||||
ASSERT(false);
|
||||
|
|
Loading…
Reference in New Issue