diff --git a/fdbserver/DataDistributionQueue.actor.cpp b/fdbserver/DataDistributionQueue.actor.cpp index 070114a890..4da77a88cf 100644 --- a/fdbserver/DataDistributionQueue.actor.cpp +++ b/fdbserver/DataDistributionQueue.actor.cpp @@ -540,7 +540,7 @@ struct DDQueue { }; struct ServerCounter { - enum CountType { ProposedSource = 0, QueuedSource, LaunchedSource, LaunchedDest }; + enum CountType : uint8_t { ProposedSource = 0, QueuedSource, LaunchedSource, LaunchedDest }; private: typedef std::array Item; // one for each CountType @@ -551,24 +551,29 @@ struct DDQueue { std::string toString(const Item& item) const { return format("%d %d %d %d", item[0], item[1], item[2], item[3]); } + void traceReasonItem(TraceEvent* event, const ReasonItem& item) const { for (int i = 0; i < item.size(); ++i) { event->detail(RelocateReason(i).toString(), toString(item[i])); } } - public: - void clear() { counter.clear(); } - bool has(const UID& id) const { return counter.find(id) != counter.end(); } - int& get(const UID& id, RelocateReason reason, CountType type) { + void increase(const UID& id, RelocateReason reason, CountType type) { int idx = (int)(reason); ASSERT(idx >= 0 && idx < 3); - return counter[id][idx][(int)type]; + counter[id][idx][(int)type] += 1; + } + + public: + void clear() { counter.clear(); } + + int get(const UID& id, RelocateReason reason, CountType type) const { + return counter.at(id)[(int)reason][(int)type]; } void increaseForTeam(const std::vector& ids, RelocateReason reason, CountType type) { for (auto& id : ids) { - get(id, reason, type)++; + increase(id, reason, type); } } @@ -579,6 +584,14 @@ struct DDQueue { traceReasonItem(&event, reasonItem); } } + + size_t size() const { return counter.size(); } + + // for random test + static CountType randomCountType() { + int i = deterministicRandom()->randomInt(0, (int)LaunchedDest + 1); + return (CountType)i; + } }; ActorCollectionNoErrors noErrorActors; // has to be the last one to be destroyed because other Actors may use it. @@ -705,6 +718,7 @@ struct DDQueue { suppressIntervals(0), rawProcessingUnhealthy(new AsyncVar(false)), rawProcessingWiggle(new AsyncVar(false)), unhealthyRelocations(0), movedKeyServersEventHolder(makeReference("MovedKeyServers")) {} + DDQueue() = default; void validate() { if (EXPENSIVE_VALIDATION) { @@ -1269,7 +1283,7 @@ struct DDQueue { serverCounter.traceAll(distributorId); serverCounter.clear(); }; - return recurring(f, SERVER_KNOBS->DD_QUEUE_COUNTER_REFRESH_INTERVAL, TaskPriority::FlushTrace); + return recurring(f, SERVER_KNOBS->DD_QUEUE_COUNTER_REFRESH_INTERVAL); } }; @@ -2460,3 +2474,27 @@ ACTOR Future dataDistributionQueue(Database cx, throw e; } } + +TEST_CASE("/DataDistribution/DDQueue/ServerCounterTrace") { + state double duration = 2 * SERVER_KNOBS->DD_QUEUE_COUNTER_REFRESH_INTERVAL; + state DDQueue self; + state Future counterFuture = self.periodicalRefreshCounter(); + state Future finishFuture = delay(duration); + std::cout << "Start trace counter unit test for " << duration << "s ...\n"; + loop choose { + when(wait(counterFuture)) {} + when(wait(finishFuture)) { break; } + when(wait(delayJittered(2.0))) { + std::vector team(3); + for (int i = 0; i < team.size(); ++i) { + team[i] = UID(deterministicRandom()->randomInt(1, 5), 0); + } + auto reason = RelocateReason(deterministicRandom()->randomInt(0, RelocateReason::typeCount())); + auto countType = DDQueue::ServerCounter::randomCountType(); + self.serverCounter.increaseForTeam(team, reason, countType); + ASSERT(self.serverCounter.get(team[0], reason, countType)); + } + } + std::cout << "Finished."; + return Void(); +} \ No newline at end of file diff --git a/fdbserver/include/fdbserver/DataDistribution.actor.h b/fdbserver/include/fdbserver/DataDistribution.actor.h index aeebd884c2..ff9d8d1ca3 100644 --- a/fdbserver/include/fdbserver/DataDistribution.actor.h +++ b/fdbserver/include/fdbserver/DataDistribution.actor.h @@ -55,6 +55,7 @@ public: } } operator int() const { return (int)value; } + constexpr static int8_t typeCount() { return 3; } private: Value value;